use std::convert::TryFrom;
use std::io;
use std::path::Path;
use std::time::Duration;
use log::{debug, error, warn};
use futures::{
future::FutureExt,
select_biased,
stream::{Stream, StreamExt},
};
use futures_timer::Delay;
use crate::broker::*;
use crate::error::*;
use crate::frame::*;
use crate::message::*;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
const NUM_RETRIES: u8 = 20;
const RETRY_DELAY: Duration = Duration::from_millis(250);
pub const DEFAULT_TIMEOUT_DURATION: Duration = Duration::from_secs(10);
pub struct Modem {
broker: Broker,
}
impl Modem {
pub fn from_path(path: impl AsRef<Path> + Send + 'static) -> io::Result<Self> {
debug!("Creating Modem with path {}", path.as_ref().display());
let broker = Broker::from_path(path)?;
Ok(Self { broker })
}
pub fn new(handle: impl AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static) -> Modem {
Self {
broker: Broker::new(handle),
}
}
async fn send_frame(&mut self, frame: Frame) -> Result<Frame, Error> {
let mut retries = NUM_RETRIES;
loop {
retries -= 1;
debug!(
"Sending Frame (attempt {}) {:02x?}",
NUM_RETRIES - retries,
frame
);
match self.broker.send(frame.clone()).await {
Ok(response) => {
debug!("Received Response: {:02x?}", response);
return Ok(response);
}
Err(Error::NotAcknowledged) if retries > 0 => {
warn!("Frame not acknowledged, retrying after {:?}", RETRY_DELAY);
Delay::new(RETRY_DELAY).await;
continue;
}
e => {
error!("Failed to send frame, {:02x?}", e);
return e;
}
}
}
}
async fn send_message_direct(&mut self, message: Message) -> Result<Message, Error> {
debug!("Sending Message {:02x?}", message);
let mut listener = self.listen().await?;
if message.flags.contains(MessageFlags::EXTENDED) {
self.send_frame(Frame::ExtendedInsteonSend {
to: message.to,
flags: message.flags,
max_hops: message.max_hops,
cmd1: message.cmd1.into(),
cmd2: message.cmd2.into(),
data: message.data,
})
.await?;
} else {
self.send_frame(Frame::StandardInsteonSend {
to: message.to,
flags: message.flags,
max_hops: message.max_hops,
cmd1: message.cmd1.into(),
cmd2: message.cmd2.into(),
})
.await?;
}
while let Some(response) = listener.next().await {
debug!("Received Message: {:02x?}", response);
if message.is_ack(&response) {
return Ok(response);
}
}
Ok(message)
}
pub async fn send_message(&mut self, message: Message) -> Result<Message, Error> {
self.send_message_with_timeout(message, DEFAULT_TIMEOUT_DURATION)
.await
}
pub async fn send_message_with_timeout(
&mut self,
message: Message,
duration: Duration,
) -> Result<Message, Error> {
let mut delay = Delay::new(duration).fuse();
let mut sending = Box::pin(self.send_message_direct(message).fuse());
select_biased! {
e = delay => Err(Error::Timeout),
r = sending => r
}
}
pub async fn get_info(&mut self) -> Result<ModemInfo, Error> {
match self.send_frame(Frame::GetModemInfo).await? {
Frame::ModemInfo(info) => Ok(info),
_ => Err(Error::UnexpectedResponse),
}
}
pub async fn get_links(&mut self) -> Result<impl Iterator<Item = AllLinkRecord>, Error> {
let mut records = Vec::new();
let mut listener = self.listen_frames().await?;
self.send_frame(Frame::GetFirstAllLinkRecord).await?;
while let Some(frame) = listener.next().await {
match frame {
Frame::AllLinkRecord(record) => {
debug!("Got All Link {:?}", record);
records.push(record);
if let Err(Error::NotAcknowledged) =
self.broker.send(Frame::GetNextAllLinkRecord).await
{
break;
}
}
_ => return Err(Error::UnexpectedResponse),
}
}
Ok(records.into_iter())
}
async fn listen_frames(
&mut self,
) -> Result<impl Stream<Item = Frame> + Sync + Send + Unpin, Error> {
self.broker.listen().await
}
pub async fn listen(
&mut self,
) -> Result<impl Stream<Item = Message> + Sync + Send + Unpin, Error> {
Ok(Box::pin(self.broker.listen().await?.filter_map(
|frame| async {
if let Ok(message) = Message::try_from(frame) {
Some(message)
} else {
None
}
},
)))
}
pub async fn link_device(
&mut self,
address: Option<Address>,
mode: AllLinkMode,
group: u8,
) -> Result<AllLinkComplete, Error> {
self.send_frame(Frame::CancelAllLink).await?;
let mut listener = self.listen_frames().await?;
if let Some(address) = address {
self.send_message(
(
address,
Command::StartLinking,
Command::from(group),
MessageFlags::EXTENDED,
)
.into(),
)
.await?;
}
self.send_frame(Frame::StartAllLink { mode, group }).await?;
let mut result = Err(Error::UnexpectedResponse);
while let Some(frame) = listener.next().await {
match frame {
Frame::AllLinkComplete(info) => {
result = Ok(info);
break;
}
_ => continue,
}
}
drop(listener);
if let Some(address) = address {
let _ = self
.send_message(
(
address,
Command::CancelLinking,
Command::from(group),
MessageFlags::EXTENDED,
)
.into(),
)
.await; }
let _ = self.send_frame(Frame::CancelAllLink).await;
result
}
}
#[cfg(test)]
mod tests {
use super::*;
use lazy_static::lazy_static;
use std::{
env,
sync::{Arc, Mutex},
};
const MODEM_ENV_VAR: &str = "MODEM_PATH";
lazy_static! {
static ref MODEM: Arc<Mutex<Modem>> = {
pretty_env_logger::init();
Arc::new(Mutex::new(
Modem::from_path(env::var(MODEM_ENV_VAR).unwrap()).unwrap(),
))
};
}
macro_rules! assume_modem {
() => {
if env::var(MODEM_ENV_VAR).is_err() {
return ();
}
};
}
#[async_std::test]
async fn get_info() {
assume_modem!();
let info = MODEM.lock().unwrap().get_info().await.unwrap();
assert_eq!(info.category, 3);
}
#[async_std::test]
async fn get_links() {
assume_modem!();
let links: Vec<AllLinkRecord> = MODEM.lock().unwrap().get_links().await.unwrap().collect();
assert!(!links.is_empty());
}
#[test]
fn bad_path() {
assert!(Modem::from_path("/this/does/not/exist").is_err());
}
}