// xio_common 0.5.0
//
// XIO commonly used functionality
// (source listing extracted from the crate documentation)
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use failure::{Fail, ResultExt, SyncFailure};
use messages::{IsRequest, IsResponse, Message, MessageExt, Notification,
               Request, Response};
use serialport::SerialPort;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use {serialport, std, Error, ErrorKind, Result};

/// Shared source of request sequence numbers.
///
/// The counter lives behind an `Arc<Mutex<_>>`, so every clone of a holder
/// draws from the same sequence.
#[derive(Default, Clone)]
pub(crate) struct SequenceHolder {
    // Once AtomicU8 is out of experimental, it can be used here
    // See: https://github.com/rust-lang/rust/issues/32976
    seq: Arc<Mutex<u8>>,
}

impl SequenceHolder {
    /// Returns the current sequence number and advances the shared counter.
    ///
    /// The counter wraps from 255 back to 0 instead of overflowing, so the
    /// 256th request on a connection no longer panics in overflow-checked
    /// builds.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    pub fn request_sequence_number(&mut self) -> Result<u8> {
        let mut counter = self.seq.lock().unwrap();
        let current: u8 = *counter;
        // `current + 1` would overflow once the counter reaches u8::MAX.
        *counter = current.wrapping_add(1);
        Ok(current)
    }
}

/// Work item placed on the queue that the `xio_tx` thread consumes.
#[derive(Clone, Debug)]
pub(crate) enum RequestItem {
    /// A request to be written to the port together with the options byte
    /// and sequence number that are passed to `Request::write_to`.
    Request {
        request: Request,
        options: u8,
        sequence: u8,
    },
    /// Makes the tx thread leave its loop; sent when the `Connection` is
    /// dropped.
    Finish,
}

/// A response read from the port, paired with the sequence number it was
/// received with.
#[derive(Clone, Debug)]
pub struct ResponseItem {
    /// The decoded response message.
    pub response: Response,
    /// Sequence number read alongside the message; `Handle` compares it with
    /// the sequence number of the request it sent.
    pub sequence: u8,
}

/// A notification read from the port, paired with the sequence number it was
/// received with.
#[derive(Clone, Debug)]
pub struct NotificationItem {
    /// The decoded notification message.
    pub notification: Notification,
    /// Sequence number read alongside the message.
    pub sequence: u8,
}

/// A serial-port connection served by two background threads: `xio_tx`
/// writes queued requests, `xio_rx` reads messages and forwards responses
/// and notifications to registered receivers.
///
/// Dropping the connection signals both threads to stop and joins them.
pub struct Connection {
    /// The opened serial port; the worker threads operate on clones of it.
    port: Box<SerialPort>,
    /// Queue of requests (and the final `Finish` marker) for the tx thread.
    tx_request_sender: Sender<RequestItem>,

    /// Hands new response subscribers to the rx thread.
    register_response_sender: Sender<Sender<ResponseItem>>,
    /// Hands new notification subscribers to the rx thread.
    register_notification_sender: Sender<Sender<NotificationItem>>,

    /// Tells the rx thread to stop.
    rx_shutdown_sender: Sender<()>,

    /// Join handle of the tx thread; taken (set to `None`) while dropping.
    tx_handle: Option<thread::JoinHandle<()>>,
    /// Join handle of the rx thread; taken (set to `None`) while dropping.
    rx_handle: Option<thread::JoinHandle<()>>,

    /// Sequence counter shared with every `Handle` created from this
    /// connection.
    sequence: SequenceHolder,
}

impl Drop for Connection {
    fn drop(&mut self) {
        // make the threads finish their loop
        if let Err(e) = self.rx_shutdown_sender.send(()) {
            warn!(
                "Error putting item in rx_shutdown_sender: {:?}",
                e
            );
        }
        if let Err(e) = self.tx_request_sender.send(RequestItem::Finish) {
            warn!(
                "Error putting finish item in tx_request_sender: {:?}",
                e
            );
        }

        // join the threads
        let tx_result = if let Some(handle) = self.tx_handle.take() {
            handle.join()
        } else {
            Ok(())
        };
        let rx_result = if let Some(handle) = self.rx_handle.take() {
            handle.join()
        } else {
            Ok(())
        };

        if tx_result.is_err() {
            warn!("Failure joining xio tx thread");
        }
        if rx_result.is_err() {
            warn!("Failure joining xio rx thread");
        }
    }
}

impl Connection {
    /// Opens the serial port named `port` and starts the `xio_tx` and
    /// `xio_rx` worker threads.
    ///
    /// The port is opened with a 100 ms timeout. The tx thread writes every
    /// queued request until it sees `RequestItem::Finish`, the queue is
    /// closed, or a write fails. The rx thread reads messages in a loop and
    /// forwards responses and notifications to all registered receivers,
    /// dropping receivers whose channel has been closed. Newly registered
    /// receivers are picked up when the next message of their kind arrives.
    ///
    /// The rx thread checks for the shutdown signal before every read as
    /// well as after a failed read, so a device that keeps sending data
    /// cannot prevent the thread from terminating.
    ///
    /// # Errors
    ///
    /// Fails if the port cannot be opened or cloned, or if a worker thread
    /// cannot be spawned (`ErrorKind::ThreadCreation`).
    pub fn new(port: &str) -> Result<Self> {
        let mut settings = serialport::SerialPortSettings::default();
        settings.timeout = Duration::from_millis(100);
        let port = serialport::open_with_settings(port, &settings)?;

        let mut tx_port = port.try_clone()?;
        let mut rx_port = port.try_clone()?;

        let (register_response_sender, register_response_receiver) =
            unbounded::<Sender<ResponseItem>>();
        let (register_notification_sender, register_notification_receiver) =
            unbounded::<Sender<NotificationItem>>();

        let (tx_request_sender, tx_request_receiver) =
            unbounded::<RequestItem>();
        let (rx_shutdown_sender, rx_shutdown_receiver) = unbounded::<()>();

        let tx_handle = thread::Builder::new()
            .name("xio_tx".to_string())
            .spawn(move || {
                // Runs until `Finish` arrives or all senders are gone.
                for item in tx_request_receiver.iter() {
                    let (request, options, sequence) = match item {
                        RequestItem::Request {
                            request,
                            options,
                            sequence,
                        } => (request, options, sequence),
                        RequestItem::Finish => break,
                    };

                    debug!("Sending request {:?}", request);
                    if let Err(e) =
                        request.write_to(&mut tx_port, options, sequence)
                    {
                        warn!(
                            "Error writing to xio tx port, \
                             closing down: {:?}",
                            e
                        );
                        return;
                    }
                }
            })
            .context(ErrorKind::ThreadCreation)?;

        let rx_handle = thread::Builder::new()
            .name("xio_rx".to_string())
            .spawn(move || {
                let mut response_senders: Vec<Sender<ResponseItem>> =
                    Vec::new();
                let mut notification_senders: Vec<
                    Sender<NotificationItem>,
                > = Vec::new();

                loop {
                    // Poll the shutdown channel on every iteration. Checking
                    // only after a failed read would let a continuous stream
                    // of incoming messages keep this thread alive and block
                    // `Connection::drop` in `join()`.
                    if rx_shutdown_receiver.try_recv().is_ok() {
                        return;
                    }

                    match Message::read_from(&mut rx_port) {
                        Ok((sequence, msg)) => match msg {
                            Message::Request(r) => {
                                warn!(
                                    "Received a request message from \
                                     a xio device which should never \
                                     happen: {:?}",
                                    r
                                );
                            }
                            Message::Response(response) => {
                                // Pick up subscribers registered since the
                                // last response.
                                while let Ok(s) =
                                    register_response_receiver.try_recv()
                                {
                                    response_senders.push(s);
                                }

                                let response = ResponseItem {
                                    sequence,
                                    response,
                                };
                                // Broadcast; forget subscribers whose
                                // receiving end is gone.
                                response_senders.retain(|s| {
                                    s.send(response.clone()).is_ok()
                                });
                            }
                            Message::Notification(notification) => {
                                // Pick up subscribers registered since the
                                // last notification.
                                while let Ok(s) =
                                    register_notification_receiver
                                        .try_recv()
                                {
                                    notification_senders.push(s);
                                }

                                let notification = NotificationItem {
                                    sequence,
                                    notification,
                                };
                                // Broadcast; forget subscribers whose
                                // receiving end is gone.
                                notification_senders.retain(|s| {
                                    s.send(notification.clone()).is_ok()
                                });
                            }
                        },
                        Err(e) => {
                            // Exit without logging if the error coincides
                            // with a shutdown request.
                            if rx_shutdown_receiver.try_recv().is_ok() {
                                return;
                            }
                            if e.kind() != std::io::ErrorKind::TimedOut {
                                // might happen if a shutdown is in progress
                                warn!("Error received: {:?}", e);
                            }
                        }
                    }
                }
            })
            .context(ErrorKind::ThreadCreation)?;

        Ok(Connection {
            port,
            tx_request_sender,
            register_response_sender,
            register_notification_sender,
            rx_shutdown_sender,
            tx_handle: Some(tx_handle),
            rx_handle: Some(rx_handle),
            sequence: SequenceHolder::default(),
        })
    }

    /// Registers a new response subscriber and returns its receiving end.
    ///
    /// The sending half is handed to the rx thread through the registration
    /// channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ChannelReceiverDisconnected` if the registration
    /// channel is closed.
    pub fn add_response_rx(&mut self) -> Result<Receiver<ResponseItem>> {
        let (item_tx, item_rx) = unbounded::<ResponseItem>();
        let registration = self.register_response_sender
            .send(item_tx)
            .map_err(SyncFailure::new);
        registration.context(ErrorKind::ChannelReceiverDisconnected)?;
        Ok(item_rx)
    }

    /// Registers a new notification subscriber and returns its receiving
    /// end.
    ///
    /// The sending half is handed to the rx thread through the registration
    /// channel.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ChannelReceiverDisconnected` if the registration
    /// channel is closed.
    pub fn add_notification_rx(
        &mut self,
    ) -> Result<Receiver<NotificationItem>> {
        let (item_tx, item_rx) = unbounded::<NotificationItem>();
        let registration = self.register_notification_sender
            .send(item_tx)
            .map_err(SyncFailure::new);
        registration.context(ErrorKind::ChannelReceiverDisconnected)?;
        Ok(item_rx)
    }

    /// Creates a `Handle` for sending requests over this connection.
    ///
    /// The handle shares the connection's sequence counter and request
    /// queue, and gets its own freshly registered response receiver.
    ///
    /// # Errors
    ///
    /// Fails if the response receiver cannot be registered.
    pub fn get_handle(&mut self) -> Result<Handle> {
        let responses = self.add_response_rx()?;
        Ok(Handle {
            seq: self.sequence.clone(),
            tx: self.tx_request_sender.clone(),
            rx: Arc::new(Mutex::new(responses)),
        })
    }
}

/// Request/response round trip: submit a request of type `Q` and obtain the
/// response of type `S`.
///
/// The bounds tie the two types together: `S` must be the response type of
/// `Q` and `Q` the request type of `S`.
pub trait SendAndReceive<Q, S>
where
    Q: IsRequest<Response = S>,
    S: IsResponse<Request = Q>,
{
    /// Error type reported when the round trip fails.
    type Error;
    /// Sends `request` and returns the response, or `Self::Error` on
    /// failure.
    fn send_and_receive(
        &mut self,
        request: Q,
    ) -> std::result::Result<S, Self::Error>;
}

impl SendAndReceive<Vec<Request>, Vec<Response>> for Handle {
    type Error = Error;

    /// Queues every request (options byte `0`) first, then waits for the
    /// responses in the same order.
    ///
    /// Stops at the first failure: a failed send leaves the remaining
    /// requests unsent, a failed receive skips the remaining responses.
    fn send_and_receive(
        &mut self,
        request: Vec<Request>,
    ) -> Result<Vec<Response>> {
        let mut pending_sequences = Vec::with_capacity(request.len());
        for single in request {
            pending_sequences.push(self.send(0u8, single)?);
        }

        let mut responses = Vec::with_capacity(pending_sequences.len());
        for sequence in pending_sequences {
            responses.push(self.receive_next(sequence)?);
        }

        Ok(responses)
    }
}

impl SendAndReceive<Request, Response> for Handle {
    type Error = Error;

    /// Sends a single request by delegating to the batch implementation
    /// with a one-element vector and returns the first response.
    ///
    /// An empty batch result is reported as `ErrorKind::ProgrammerError`.
    fn send_and_receive(&mut self, request: Request) -> Result<Response> {
        let responses: Vec<Response> =
            self.send_and_receive(vec![request])?;
        match responses.into_iter().next() {
            Some(response) => Ok(response),
            None => Err(ErrorKind::ProgrammerError.into()),
        }
    }
}

impl<Q, S> SendAndReceive<Q, S> for Handle
where
    Q: Into<Request> + IsRequest<Response = S>,
    S: super::messages::TryFromResponse + IsResponse<Request = Q>,
{
    type Error = Error;
    fn send_and_receive(&mut self, request: Q) -> Result<S> {
        let response = self.send_and_receive(request.into())?;
        Ok(S::try_from_response(response)?)
    }
}

/// Request/response endpoint obtained from `Connection::get_handle`.
///
/// Clones share the sequence counter and the same response receiver (it
/// sits behind an `Arc<Mutex<_>>`), so each response is seen by only one of
/// the clones.
#[derive(Clone)]
pub struct Handle {
    /// Source of sequence numbers for outgoing requests.
    seq: SequenceHolder,
    /// Queue feeding the tx thread.
    tx: Sender<RequestItem>,
    /// Receiver for the responses forwarded by the rx thread.
    rx: Arc<Mutex<Receiver<ResponseItem>>>,
}

impl Handle {
    /// Queues `request` for transmission and returns the sequence number
    /// assigned to it.
    ///
    /// `options` is passed through unchanged to the tx thread.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::ChannelReceiverDisconnected` if the request queue
    /// is closed.
    pub fn send(&mut self, options: u8, request: Request) -> Result<u8> {
        let sequence = self.seq.request_sequence_number()?;
        let item = RequestItem::Request {
            request,
            options,
            sequence,
        };
        match self.tx.send(item) {
            Ok(()) => Ok(sequence),
            Err(e) => {
                Err(e.context(ErrorKind::ChannelReceiverDisconnected).into())
            }
        }
    }

    /// Waits for the response whose sequence number equals
    /// `expected_sequence`, discarding any other response received
    /// meanwhile.
    ///
    /// The wait is bounded by a total budget of 2 seconds (the same worst
    /// case as the former 20 x 100 ms attempts). Discarded responses do not
    /// use up the budget, so a backlog of stale responses no longer causes
    /// a premature timeout. The receiver lock is taken for at most 100 ms at
    /// a time so clones of this handle are not locked out for the whole
    /// wait.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::Timeout` if no matching response arrives in time.
    /// * `ErrorKind::ChannelReceiverDisconnected` if the response channel is
    ///   closed.
    ///
    /// # Panics
    ///
    /// Panics if the receiver mutex is poisoned.
    fn receive_next(&self, expected_sequence: u8) -> Result<Response> {
        let total_budget = Duration::from_millis(2000);
        let poll_slice = Duration::from_millis(100);
        let started = std::time::Instant::now();

        loop {
            let elapsed = started.elapsed();
            if elapsed >= total_budget {
                return Err(ErrorKind::Timeout.into());
            }
            // Never wait past the overall deadline.
            let wait = std::cmp::min(poll_slice, total_budget - elapsed);

            // The guard is a temporary, so the lock is released as soon as
            // this statement completes.
            let result = self.rx.lock().unwrap().recv_timeout(wait);
            match result {
                Ok(ResponseItem {
                    sequence,
                    response,
                }) => {
                    if expected_sequence == sequence {
                        return Ok(response);
                    }
                    // Response to a different request: skip it.
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(d @ RecvTimeoutError::Disconnected) => {
                    return Err(d.context(
                        ErrorKind::ChannelReceiverDisconnected,
                    ).into());
                }
            }
        }
    }
}