#![cfg_attr(docsrs, feature(doc_cfg))]
mod conn;
pub mod discovery;
mod io_loop;
mod message;
pub mod types;
#[cfg(feature = "timeout")]
use std::time::Duration;
use std::{io, net::IpAddr};
use crate::{
message::{
ac_capability::{AcCapability, AcCapabilityResponse},
ac_status::AcStatusMessage,
console_version::ConsoleVersionResponse,
zone_name::ZoneNameResponse,
zone_status::ZoneStatusMessage,
},
types::status::{CurrentStatus, StatusChange},
};
#[cfg(feature = "control")]
use crate::message::{
ac_control::{AcControl, AcControlMessage},
zone_control::{ZoneControl, ZoneControlMessage},
};
/// Client handle that owns the background I/O loop task and the channels used to talk to it.
///
/// Requests are forwarded to the task spawned by `io_loop::spawn`; status snapshots and
/// status changes are exposed through `subscribe_status` and `subscribe_changes`.
pub struct AirTouch5 {
/// Join handle of the I/O loop task paired with the one-shot sender used to signal it to
/// stop. Becomes `None` once `shutdown` or `Drop` has taken it.
conn_loop: Option<(
tokio::task::JoinHandle<()>,
tokio::sync::oneshot::Sender<()>,
)>,
/// Sending half of the request queue consumed by the I/O loop; each item carries the
/// request frame together with a one-shot sender for the reply.
channel_req: tokio::sync::mpsc::Sender<io_loop::Request>,
/// Clone of the watch sender handed to the I/O loop; used to create status receivers.
status: tokio::sync::watch::Sender<CurrentStatus>,
/// Weak handle to the broadcast sender handed to the I/O loop; must be upgraded before
/// a change receiver can be created.
changes: tokio::sync::broadcast::WeakSender<StatusChange>,
}
impl AirTouch5 {
/// Opens a connection to `addr` through `conn::Connection::new` and starts the I/O loop on it.
///
/// # Errors
/// Propagates any error returned while establishing the connection.
pub async fn new<A>(addr: A) -> io::Result<Self>
where
    A: tokio::net::ToSocketAddrs,
{
    Self::with_connection(conn::Connection::new(addr).await?).await
}
/// Opens a connection to a single IP address through `conn::Connection::with_ipaddr` and
/// starts the I/O loop on it.
///
/// # Errors
/// Propagates any error returned while establishing the connection.
pub async fn with_ipaddr(addr: IpAddr) -> io::Result<Self> {
    Self::with_connection(conn::Connection::with_ipaddr(addr).await?).await
}
/// Opens a connection by handing `addrs` to `conn::Connection::with_ipaddrs` and starts the
/// I/O loop on the resulting connection.
///
/// # Errors
/// Propagates any error returned while establishing the connection.
pub async fn with_ipaddrs(addrs: &[IpAddr]) -> io::Result<Self> {
    Self::with_connection(conn::Connection::with_ipaddrs(addrs).await?).await
}
/// Opens a connection from a textual address through `conn::Connection::with_str` and starts
/// the I/O loop on it.
///
/// # Errors
/// Propagates any error returned while establishing the connection.
pub async fn with_str(addr: &str) -> io::Result<Self> {
    Self::with_connection(conn::Connection::with_str(addr).await?).await
}
/// Wires up the channels shared with the I/O loop, spawns the loop on `connection`, and wraps
/// everything in a new handle.
///
/// In its current form this never returns `Err`; the `io::Result` return type only mirrors
/// the public constructors that call it.
async fn with_connection(connection: conn::Connection) -> io::Result<Self> {
    // Request queue: this handle keeps the sender, the loop consumes the receiver.
    let (req_tx, req_rx) = tokio::sync::mpsc::channel(4);
    // Stop signal: this handle keeps the sender, the loop waits on the receiver.
    let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();
    // The initial receivers are discarded; subscribers create their own later.
    let (watch_tx, _) = tokio::sync::watch::channel(CurrentStatus::default());
    let (cast_tx, _) = tokio::sync::broadcast::channel(32);

    // Both handles must be derived before the senders are moved into the loop.
    let status = watch_tx.clone();
    let changes = cast_tx.downgrade();

    let task = io_loop::spawn(connection, kill_rx, req_rx, cast_tx, watch_tx);

    Ok(Self {
        conn_loop: Some((task, kill_tx)),
        channel_req: req_tx,
        status,
        changes,
    })
}
/// Creates a receiver for broadcast `StatusChange` events.
///
/// Returns `None` when the weak broadcast sender held by this handle can no longer be
/// upgraded.
pub fn subscribe_changes(&self) -> Option<tokio::sync::broadcast::Receiver<StatusChange>> {
    let sender = self.changes.upgrade()?;
    Some(sender.subscribe())
}
/// Creates a watch receiver for the `CurrentStatus` value.
///
/// The current implementation always returns `Some`, because this handle keeps its own
/// watch sender.
pub fn subscribe_status(&self) -> Option<tokio::sync::watch::Receiver<CurrentStatus>> {
    let receiver = self.status.subscribe();
    Some(receiver)
}
/// Signals the I/O loop task to stop and waits for it to finish.
///
/// Does nothing and returns `Ok(())` when the loop has already been taken by an earlier
/// call. If the stop signal cannot be delivered, the task handle is dropped without being
/// awaited and `Ok(())` is returned.
///
/// # Errors
/// Returns the task's join error, converted into an `io::Error`, when awaiting it fails.
pub async fn shutdown(&mut self) -> io::Result<()> {
    let Some((task, kill)) = self.conn_loop.take() else {
        return Ok(());
    };
    if kill.send(()).is_err() {
        return Ok(());
    }
    task.await?;
    Ok(())
}
/// Queues `request` for the I/O loop and waits for the matching reply frame.
///
/// # Errors
/// Returns an error if the request cannot be queued, if the reply sender is dropped before
/// answering, or whatever error the I/O loop reports for this request.
async fn send_request(&self, request: conn::frame::Frame) -> io::Result<conn::frame::Frame> {
    let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
    if let Err(send_err) = self.channel_req.send((request, reply_tx)).await {
        return Err(io::Error::other(send_err));
    }
    match reply_rx.await {
        // The loop answers with an `io::Result`, which is passed through untouched.
        Ok(outcome) => outcome,
        Err(recv_err) => Err(io::Error::other(recv_err)),
    }
}
pub async fn zone_status(&self) -> io::Result<ZoneStatusMessage> {
let req = ZoneStatusMessage::request();
let f = self
.send_request(req.try_into().map_err(io::Error::other)?)
.await?;
let resp: ZoneStatusMessage = f.try_into()?;
Ok(resp)
}
/// Same as `zone_status`, but gives up once `duration` has elapsed.
///
/// Only compiled with the `timeout` feature.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from `zone_status`.
#[cfg(feature = "timeout")]
pub async fn zone_status_timeout(&self, duration: Duration) -> io::Result<ZoneStatusMessage> {
    let pending = self.zone_status();
    tokio::time::timeout(duration, pending).await?
}
pub async fn ac_status(&self) -> io::Result<AcStatusMessage> {
let req = AcStatusMessage::request();
let f = self
.send_request(req.try_into().map_err(io::Error::other)?)
.await?;
let resp: AcStatusMessage = f.try_into()?;
Ok(resp)
}
/// Same as `ac_status`, but gives up once `duration` has elapsed.
///
/// Only compiled with the `timeout` feature.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from `ac_status`.
#[cfg(feature = "timeout")]
pub async fn ac_status_timeout(&self, duration: Duration) -> io::Result<AcStatusMessage> {
    let pending = self.ac_status();
    tokio::time::timeout(duration, pending).await?
}
pub async fn ac_capabilities(&self) -> io::Result<AcCapabilityResponse> {
let req = message::ac_capability::AcCapabilityRequest::new(None);
let f = self
.send_request(req.try_into().map_err(io::Error::other)?)
.await?;
let resp: AcCapabilityResponse = f.try_into()?;
Ok(resp)
}
/// Same as `ac_capabilities`, but gives up once `duration` has elapsed.
///
/// Only compiled with the `timeout` feature.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from
/// `ac_capabilities`.
#[cfg(feature = "timeout")]
pub async fn ac_capabilities_timeout(
    &self,
    duration: Duration,
) -> io::Result<AcCapabilityResponse> {
    let pending = self.ac_capabilities();
    tokio::time::timeout(duration, pending).await?
}
pub async fn ac_capability(&self, ac_index: u8) -> io::Result<AcCapability> {
let req = message::ac_capability::AcCapabilityRequest::new(Some(ac_index));
let f = self
.send_request(req.try_into().map_err(io::Error::other)?)
.await?;
let mut resp: AcCapabilityResponse = f.try_into()?;
if let Some(ac) = resp.acs.remove(&ac_index) {
Ok(ac)
} else {
Err(io::Error::from(io::ErrorKind::NotFound))
}
}
/// Same as `ac_capability`, but gives up once `duration` has elapsed.
///
/// Only compiled with the `timeout` feature.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from
/// `ac_capability`.
#[cfg(feature = "timeout")]
pub async fn ac_capability_timeout(
    &self,
    ac_index: u8,
    duration: Duration,
) -> io::Result<AcCapability> {
    let pending = self.ac_capability(ac_index);
    tokio::time::timeout(duration, pending).await?
}
pub async fn console_version(&self) -> io::Result<ConsoleVersionResponse> {
let req = message::console_version::ConsoleVersionRequest::new();
let f = self
.send_request(req.try_into().map_err(io::Error::other)?)
.await?;
let resp: ConsoleVersionResponse = f.try_into()?;
Ok(resp)
}
/// Same as `console_version`, but gives up once `duration` has elapsed.
///
/// Only compiled with the `timeout` feature.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from
/// `console_version`.
#[cfg(feature = "timeout")]
pub async fn console_version_timeout(
    &self,
    duration: Duration,
) -> io::Result<ConsoleVersionResponse> {
    let pending = self.console_version();
    tokio::time::timeout(duration, pending).await?
}
pub async fn zone_names(&self) -> io::Result<ZoneNameResponse> {
let req = message::zone_name::ZoneNameRequest::new(None);
let f = self
.send_request(req.try_into().map_err(io::Error::other)?)
.await?;
let resp: ZoneNameResponse = f.try_into()?;
Ok(resp)
}
/// Same as `zone_names`, but gives up once `duration` has elapsed.
///
/// Only compiled with the `timeout` feature.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from `zone_names`.
#[cfg(feature = "timeout")]
pub async fn zone_names_timeout(&self, duration: Duration) -> io::Result<ZoneNameResponse> {
    let pending = self.zone_names();
    tokio::time::timeout(duration, pending).await?
}
pub async fn zone_name(&self, zone_idx: u8) -> io::Result<String> {
let req = message::zone_name::ZoneNameRequest::new(Some(zone_idx));
let f = self
.send_request(req.try_into().map_err(io::Error::other)?)
.await?;
let mut resp: ZoneNameResponse = f.try_into()?;
if let Some(name) = resp.zones.remove(&zone_idx) {
Ok(name)
} else {
Err(io::Error::other(format!("no zone {}", zone_idx)))
}
}
/// Same as `zone_name`, but gives up once `duration` has elapsed.
///
/// Only compiled with the `timeout` feature.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from `zone_name`.
#[cfg(feature = "timeout")]
pub async fn zone_name_timeout(&self, zone_idx: u8, duration: Duration) -> io::Result<String> {
    let pending = self.zone_name(zone_idx);
    tokio::time::timeout(duration, pending).await?
}
/// Sends `zone_control` for the single zone `zone_idx` by delegating to `control_zones`
/// with a one-element list.
///
/// Only compiled with the `control` feature.
///
/// # Errors
/// Returns any error from `control_zones`.
#[cfg(feature = "control")]
pub async fn control_zone(
    &self,
    zone_idx: u8,
    zone_control: ZoneControl,
) -> io::Result<ZoneStatusMessage> {
    let single = [(zone_idx, zone_control)];
    self.control_zones(single).await
}
/// Sends `zone_control` for the single zone `zone_idx` via `control_zones`, giving up once
/// `duration` has elapsed.
///
/// Only compiled when both the `control` and `timeout` features are enabled.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from
/// `control_zones`.
#[cfg(all(feature = "control", feature = "timeout"))]
pub async fn control_zone_timeout(
    &self,
    zone_idx: u8,
    zone_control: ZoneControl,
    duration: Duration,
) -> io::Result<ZoneStatusMessage> {
    let pending = self.control_zones([(zone_idx, zone_control)]);
    tokio::time::timeout(duration, pending).await?
}
/// Builds a `ZoneControlMessage` from `zones`, sends it, and decodes the reply as a
/// `ZoneStatusMessage`.
///
/// `zones` yields `(index, control)` pairs whose items convert into `u8` and `ZoneControl`.
///
/// Only compiled with the `control` feature.
///
/// # Errors
/// Returns an error if the message cannot be encoded into a frame, the exchange with the
/// I/O loop fails, or the reply frame cannot be decoded.
#[cfg(feature = "control")]
pub async fn control_zones<K, V, T>(&self, zones: T) -> io::Result<ZoneStatusMessage>
where
    K: Into<u8>,
    V: Into<ZoneControl>,
    T: IntoIterator<Item = (K, V)>,
{
    let frame: conn::frame::Frame = ZoneControlMessage::new(zones)
        .try_into()
        .map_err(io::Error::other)?;
    let reply = self.send_request(frame).await?;
    Ok(reply.try_into()?)
}
/// Same as `control_zones`, but gives up once `duration` has elapsed.
///
/// Only compiled when both the `control` and `timeout` features are enabled.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from
/// `control_zones`.
#[cfg(all(feature = "control", feature = "timeout"))]
pub async fn control_zones_timeout<K, V, T>(
    &self,
    zones: T,
    duration: Duration,
) -> io::Result<ZoneStatusMessage>
where
    K: Into<u8>,
    V: Into<ZoneControl>,
    T: IntoIterator<Item = (K, V)>,
{
    let pending = self.control_zones(zones);
    tokio::time::timeout(duration, pending).await?
}
/// Sends `ac_control` for the single AC `ac_idx` by delegating to `control_acs` with a
/// one-element list.
///
/// Only compiled with the `control` feature.
///
/// # Errors
/// Returns any error from `control_acs`.
#[cfg(feature = "control")]
pub async fn control_ac(
    &self,
    ac_idx: u8,
    ac_control: AcControl,
) -> io::Result<AcStatusMessage> {
    let single = [(ac_idx, ac_control)];
    self.control_acs(single).await
}
/// Sends `ac_control` for the single AC `ac_idx` via `control_acs`, giving up once
/// `duration` has elapsed.
///
/// Only compiled when both the `control` and `timeout` features are enabled.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from
/// `control_acs`.
#[cfg(all(feature = "control", feature = "timeout"))]
pub async fn control_ac_timeout(
    &self,
    ac_idx: u8,
    ac_control: AcControl,
    duration: Duration,
) -> io::Result<AcStatusMessage> {
    let pending = self.control_acs([(ac_idx, ac_control)]);
    tokio::time::timeout(duration, pending).await?
}
/// Builds an `AcControlMessage` from `acs`, sends it, and decodes the reply as an
/// `AcStatusMessage`.
///
/// `acs` yields `(index, control)` pairs whose items convert into `u8` and `AcControl`.
///
/// Only compiled with the `control` feature.
///
/// # Errors
/// Returns an error if the message cannot be encoded into a frame, the exchange with the
/// I/O loop fails, or the reply frame cannot be decoded.
#[cfg(feature = "control")]
pub async fn control_acs<K, V, T>(&self, acs: T) -> io::Result<AcStatusMessage>
where
    K: Into<u8>,
    V: Into<AcControl>,
    T: IntoIterator<Item = (K, V)>,
{
    let frame: conn::frame::Frame = AcControlMessage::new(acs)
        .try_into()
        .map_err(io::Error::other)?;
    let reply = self.send_request(frame).await?;
    Ok(reply.try_into()?)
}
/// Same as `control_acs`, but gives up once `duration` has elapsed.
///
/// Only compiled when both the `control` and `timeout` features are enabled.
///
/// # Errors
/// Returns the timeout error converted into an `io::Error`, or any error from
/// `control_acs`.
#[cfg(all(feature = "control", feature = "timeout"))]
pub async fn control_acs_timeout<K, V, T>(
    &self,
    acs: T,
    duration: Duration,
) -> io::Result<AcStatusMessage>
where
    K: Into<u8>,
    V: Into<AcControl>,
    T: IntoIterator<Item = (K, V)>,
{
    let pending = self.control_acs(acs);
    tokio::time::timeout(duration, pending).await?
}
}
/// Best-effort cleanup: signals the I/O loop to stop if `shutdown` has not already taken it.
impl Drop for AirTouch5 {
    fn drop(&mut self) {
        let Some((_task, kill)) = self.conn_loop.take() else {
            return;
        };
        // Drop cannot await the task, so a failed send is deliberately ignored.
        let _ = kill.send(());
    }
}
#[cfg(test)]
#[macro_use]
extern crate assert_matches;
#[cfg(test)]
mod tests {
// This module currently contains no test cases, so the glob import below would be
// reported as unused without the `allow`.
#[allow(unused_imports)]
use super::*;
}