airtouch5 0.2.0

A library for communicating with AirTouch 5 air conditioning system control consoles
Documentation
use std::{collections::BTreeMap, io};

use tokio::sync::{broadcast, mpsc, oneshot, watch};

use crate::{
    conn::{frame::Frame, Connection, MessageKind},
    types::status::{CurrentStatus, StatusChange},
};

type Outstanding = BTreeMap<(u8, u8), oneshot::Sender<io::Result<Frame>>>;
pub(super) type Request = (Frame, oneshot::Sender<io::Result<Frame>>);

struct IoLoop {
    connection: Connection,
    channel_kill: oneshot::Receiver<()>,
    channel_die: Option<oneshot::Sender<io::Error>>,
    channel_req: mpsc::Receiver<Request>,
    channel_change: broadcast::Sender<StatusChange>,
    channel_status: watch::Sender<CurrentStatus>,
    outstanding: Outstanding,
}

impl IoLoop {
    /// Main IO Loop: select for
    /// - `channel_kill`: shutdown requested, clean up and die
    /// - `connection` as `Stream`: forward status messages to `broadcast_status()`,
    ///   and handle replies to control requests or extended messages by sending
    ///   them back to caller on its `oneshot`
    /// - `channel_req`: forward request to `connection` as `Sink` and register caller
    ///   as waiting for reply
    async fn run(&mut self) {
        use futures_util::sink::SinkExt;
        use tokio_stream::StreamExt;
        let mut running = true;
        while running {
            tokio::select! {
                // frame from the connection
                next = self.connection.next() => {
                    if let Some(Ok(frame)) = next {
                        // response or status frame
                        if let Some(tx) = self.outstanding.remove(&(frame.msg_id, frame.msg_type)) {
                            // a response, send back to caller
                            if frame.kind == MessageKind::StatusResponse {
                                // and also broadcast as a status update
                                self.broadcast_status(frame.clone());
                            }
                            let _ = tx.send(Ok(frame));
                        } else if frame.kind == MessageKind::StatusResponse {
                            // a status update to broadcast
                            self.broadcast_status(frame);
                        } else {
                            log::debug!("ignoring unexpected {:?} message", frame.kind);
                        }
                    } else {
                        // IO error or remote closed the connection
                        let e = match next {
                            Some(Err(e)) => e,
                            None => io::Error::from(io::ErrorKind::UnexpectedEof),
                            _ => panic!("unexpected next value"),
                        };
                        log::error!("{}", e);
                        if let Some(die_tx) = self.channel_die.take() {
                            let _ = die_tx.send(e);
                        }
                    }
                },
                Some((frame, ret_tx)) = self.channel_req.recv() => {
                    let (msg_id, msg_type) = (frame.msg_id, frame.msg_type);
                    match self.connection.send(frame).await {
                        Ok(_) => {
                            // register that we're expecting a response
                            self.outstanding.insert((msg_id, msg_type), ret_tx);
                        },
                        Err(e) => {
                            // failed to send, send error back to caller
                            // (ignore send failure if the caller has stopped waiting)
                            let _ = ret_tx.send(Err(e));
                       },
                    }
                }
                // shutdown request
                _ = &mut self.channel_kill => { running = false; },
            }
        }
    }

    fn broadcast_status(&mut self, frame: Frame) {
        if let Ok(change) = StatusChange::try_from(frame) {
            // TODO: detect whether actually modified and use `send_if_modified()`
            self.channel_status
                .send_modify(|status| status.apply(&change));
            let _ = self.channel_change.send(change);
        }
    }
}

pub(super) fn spawn(
    connection: Connection,
    channel_kill: oneshot::Receiver<()>,
    channel_req: mpsc::Receiver<Request>,
    channel_change: broadcast::Sender<StatusChange>,
    channel_status: watch::Sender<CurrentStatus>,
) -> tokio::task::JoinHandle<()> {
    let outstanding = BTreeMap::<_, _>::new();
    tokio::spawn(async move {
        let mut io_loop = IoLoop {
            connection,
            channel_kill,
            channel_die: None, /* FIXME */
            channel_req,
            channel_change,
            channel_status,
            outstanding,
        };
        io_loop.run().await;
    })
}