xio_common 0.12.0

XIO commonly used functionality
Documentation
use crate::messages::{
    IsRequest, IsResponse, Message, MessageExt, Notification, Request,
    Response,
};
use crate::{error, serialport, Error, Result};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use ommui_broadcast::{
    Broadcaster, SubscriptionManager, SubscriptionRegistration,
};
use serialport::SerialPort;
use snafu::{OptionExt, ResultExt};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;

const RESPONSE_TIMEOUT_MILLIS: u64 = 2000;

#[derive(Clone, Debug)]
pub struct ResponseItem {
    pub response: Response,
    pub sequence: u8,
}

#[derive(Clone, Debug)]
pub(crate) struct RequestItem {
    request: Request,
    options: u8,
    sequence: u8,
}

#[derive(Clone, Debug)]
pub struct NotificationItem {
    pub notification: Notification,
    pub sequence: u8,
}

struct ConnectionInnerLocked {
    port_id: String,
    tx_port: Box<dyn SerialPort>,
    rx_handle: Option<thread::JoinHandle<()>>,
    rx_shutdown_sender: Sender<()>,
    sequence: u8,
}

// The tty port is sync, so we also mark this as sync
unsafe impl Sync for ConnectionInnerLocked {}

impl ConnectionInnerLocked {
    fn get_next_sequence_number(&mut self) -> u8 {
        let (next, _overflow) = self.sequence.overflowing_add(1);
        self.sequence = next;
        next
    }

    fn send(&mut self, options: u8, request: &Request) -> Result<u8> {
        let sequence = self.get_next_sequence_number();
        debug!("Sending with sequence number {}: {:?}", sequence, request);

        request.write_to(&mut self.tx_port, options, sequence)?;
        Ok(sequence)
    }
}

#[derive(Clone)]
pub struct Connection {
    response_subscription_manager: SubscriptionManager<ResponseItem>,
    notification_subscription_manager:
        SubscriptionManager<NotificationItem>,
    stale_connection_subscription_manager: SubscriptionManager<()>,
    stale_connection_broadcaster: Broadcaster<()>,
    inner: Arc<RwLock<ConnectionInnerLocked>>,
}

impl Drop for ConnectionInnerLocked {
    fn drop(&mut self) {
        // join the rx thread
        self.rx_shutdown_sender.send(()).unwrap();
        if let Some(rx_handle) = self.rx_handle.take() {
            info!("Waiting for port {} to close", self.port_id);
            if rx_handle.join().is_err() {
                warn!("Failure joining xio rx thread");
            } else {
                info!("Closed port {}", self.port_id);
            }
        }
    }
}

impl Connection {
    /// Create a Connection, and give it an identifier.
    ///
    /// The identifier gets inserted in the thread name, helping to identify
    /// threads when debugging.
    ///
    /// The resulting thread name is something like "xio_tx:<identifier>" or
    /// "xio_rx:<identifier>".
    pub fn new_with_identifier(
        port: &str,
        identifier: &str,
    ) -> Result<Self> {
        let mut settings = serialport::SerialPortSettings::default();
        settings.timeout = Duration::from_millis(RESPONSE_TIMEOUT_MILLIS);
        let tx_port = serialport::open_with_settings(port, &settings)
            .context(error::SerialPort)?;

        let mut rx_port =
            tx_port.try_clone().context(error::SerialPort)?;

        let mut response_broadcaster = Broadcaster::default();
        let mut notification_broadcaster = Broadcaster::default();
        let stale_connection_broadcaster = Broadcaster::default();
        let response_subscription_manager =
            response_broadcaster.subscription_manager();
        let notification_subscription_manager =
            notification_broadcaster.subscription_manager();
        let stale_connection_subscription_manager =
            stale_connection_broadcaster.subscription_manager();

        let (rx_shutdown_sender, rx_shutdown_receiver) = bounded(1);

        let rx_handle = {
            Some(
                thread::Builder::new()
                    .name(format!("xio_rx:{}", identifier))
                    .spawn(move || loop {
                        if rx_shutdown_receiver.try_recv().is_ok() {
                            return;
                        }
                        match Message::read_from(&mut rx_port) {
                            Ok((sequence, msg)) => match msg {
                                Message::Request(r) => {
                                    warn!(
                                        "Received a request message from \
                                         a xio device which should never \
                                         happen: {:?}",
                                        r
                                    );
                                }
                                Message::Response(response) => {
                                    let response = ResponseItem {
                                        sequence,
                                        response,
                                    };
                                    response_broadcaster
                                        .broadcast(response);
                                }
                                Message::Notification(notification) => {
                                    let notification = NotificationItem {
                                        sequence,
                                        notification,
                                    };
                                    notification_broadcaster
                                        .broadcast(notification);
                                }
                            },
                            Err(error::Error::Timeout { .. }) => {}
                            Err(error::Error::Io {
                                ref source, ..
                            }) if source.kind()
                                == std::io::ErrorKind::TimedOut =>
                            {
                                if rx_shutdown_receiver.try_recv().is_ok()
                                {
                                    return;
                                }
                            }
                            Err(e) => {
                                if rx_shutdown_receiver.try_recv().is_ok()
                                {
                                    return;
                                }
                                warn!("Error received: {:?}", e);
                            }
                        }
                    })
                    .context(error::ThreadCreation)?,
            )
        };
        let inner = Arc::new(RwLock::new(ConnectionInnerLocked {
            port_id: port.to_string(),
            rx_handle,
            tx_port,
            rx_shutdown_sender,
            sequence: 0u8,
        }));
        Ok(Connection {
            inner,
            response_subscription_manager,
            notification_subscription_manager,
            stale_connection_subscription_manager,
            stale_connection_broadcaster,
        })
    }

    pub fn new(port: &str) -> Result<Self> {
        Self::new_with_identifier(port, port)
    }

    fn get_responses_receiver(
        &mut self,
    ) -> Result<Receiver<ResponseItem>> {
        self.response_subscription_manager
            .subscribe()
            .context(error::Broadcast)
    }

    pub fn get_notifications_receiver(
        &mut self,
    ) -> Result<Receiver<NotificationItem>> {
        self.notification_subscription_manager
            .subscribe()
            .context(error::Broadcast)
    }

    pub fn get_stale_connection_receiver(
        &mut self,
    ) -> Result<Receiver<()>> {
        self.stale_connection_subscription_manager
            .subscribe()
            .context(error::Broadcast)
    }
}

pub trait SendAndReceive<Q, S>
where
    Q: IsRequest<Response = S>,
    S: IsResponse<Request = Q>,
{
    type Error;
    fn send_and_receive(
        &mut self,
        request: Q,
    ) -> std::result::Result<S, Self::Error>;
}

impl SendAndReceive<Vec<Request>, Vec<Response>> for Connection {
    type Error = Error;
    fn send_and_receive(
        &mut self,
        requests: Vec<Request>,
    ) -> Result<Vec<Response>> {
        let receiver = self.get_responses_receiver()?;
        let timeout = Duration::from_millis(RESPONSE_TIMEOUT_MILLIS);
        let options = 0u8;

        let mut write = self.inner.write().unwrap();
        let mut stale_connection_broadcaster =
            self.stale_connection_broadcaster.clone();
        Ok(requests
            .into_iter()
            .map(|request| {
                let expected_sequence = write.send(options, &request)?;
                match receiver.recv_timeout(timeout) {
                    Ok(ResponseItem { sequence, response }) => {
                        if sequence == expected_sequence {
                            Ok(response)
                        } else {
                            error::XioSequenceError {
                                expected: expected_sequence,
                                received: sequence,
                                request,
                                response,
                            }
                            .fail()?
                        }
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        warn!(
                            "Timeout while waiting for answer from device"
                        );
                        stale_connection_broadcaster.broadcast(());
                        error::Timeout.fail()?
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        error::ChannelSenderDisconnected.fail()?
                    }
                }
            })
            .collect::<Result<Vec<_>>>()?)
    }
}

impl SendAndReceive<Request, Response> for Connection {
    type Error = Error;
    fn send_and_receive(&mut self, request: Request) -> Result<Response> {
        let mut responses = self.send_and_receive(vec![request])?;
        let response = responses.drain(..).next();
        response.context(error::ProgrammerError)
    }
}

impl<Q, S> SendAndReceive<Q, S> for Connection
where
    Q: Into<Request> + IsRequest<Response = S>,
    S: super::messages::TryFromResponse + IsResponse<Request = Q>,
{
    type Error = Error;
    fn send_and_receive(&mut self, request: Q) -> Result<S> {
        let response = self.send_and_receive(request.into())?;
        Ok(S::try_from_response(response)?)
    }
}