use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use failure::{Fail, ResultExt, SyncFailure};
use messages::{IsRequest, IsResponse, Message, MessageExt, Notification,
Request, Response};
use serialport::SerialPort;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use {serialport, std, Error, ErrorKind, Result};
/// Shared source of message sequence numbers.
///
/// The counter lives behind an `Arc<Mutex<_>>`, so every clone of a
/// `SequenceHolder` hands out numbers from the same sequence.
#[derive(Default, Clone)]
pub(crate) struct SequenceHolder {
    /// The next sequence number to hand out.
    seq: Arc<Mutex<u8>>,
}

impl SequenceHolder {
    /// Returns the current sequence number and advances the shared counter.
    ///
    /// The counter is a `u8` and wraps from 255 back to 0. This method
    /// currently never returns `Err`.
    pub fn request_sequence_number(&mut self) -> Result<u8> {
        // A plain `u8` is valid in every state, so a poisoned lock can be
        // recovered safely instead of panicking in every later caller.
        let mut next = self
            .seq
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let current = *next;
        // `current + 1` would panic at 255 in builds with overflow checks;
        // wrapping keeps the sequence going after 256 requests.
        *next = current.wrapping_add(1);
        Ok(current)
    }
}
/// Work item placed on the queue that feeds the transmit thread.
#[derive(Clone, Debug)]
pub(crate) enum RequestItem {
/// A request to write to the port, together with the `options` and
/// `sequence` values that are passed along to `write_to`.
Request {
request: Request,
options: u8,
sequence: u8,
},
/// Queued when the `Connection` is dropped; the transmit loop stops at
/// the first item that is not a `Request`.
Finish,
}
/// A response read from the port, paired with the sequence number that was
/// read alongside it.
#[derive(Clone, Debug)]
pub struct ResponseItem {
/// The decoded response message.
pub response: Response,
/// Sequence number returned by `Message::read_from` for this message.
pub sequence: u8,
}
/// A notification read from the port, paired with the sequence number that
/// was read alongside it.
#[derive(Clone, Debug)]
pub struct NotificationItem {
/// The decoded notification message.
pub notification: Notification,
/// Sequence number returned by `Message::read_from` for this message.
pub sequence: u8,
}
/// A serial connection served by two background threads: `xio_tx` writes
/// queued requests to the port and `xio_rx` reads messages from it and fans
/// them out to registered subscribers.
///
/// Dropping the connection signals both threads to stop and joins them.
pub struct Connection {
// The port as opened in `new`; the worker threads use clones of it.
port: Box<SerialPort>,
// Queue of items consumed by the `xio_tx` thread.
tx_request_sender: Sender<RequestItem>,
// Hands new response subscribers to the `xio_rx` thread.
register_response_sender: Sender<Sender<ResponseItem>>,
// Hands new notification subscribers to the `xio_rx` thread.
register_notification_sender: Sender<Sender<NotificationItem>>,
// Tells the `xio_rx` thread to exit.
rx_shutdown_sender: Sender<()>,
// Join handles are `Option`s so that `drop` can take ownership of them.
tx_handle: Option<thread::JoinHandle<()>>,
rx_handle: Option<thread::JoinHandle<()>>,
// Sequence counter; every `Handle` from `get_handle` receives a clone.
sequence: SequenceHolder,
}
impl Drop for Connection {
fn drop(&mut self) {
if let Err(e) = self.rx_shutdown_sender.send(()) {
warn!(
"Error putting item in rx_shutdown_sender: {:?}",
e
);
}
if let Err(e) = self.tx_request_sender.send(RequestItem::Finish) {
warn!(
"Error putting finish item in tx_request_sender: {:?}",
e
);
}
let tx_result = if let Some(handle) = self.tx_handle.take() {
handle.join()
} else {
Ok(())
};
let rx_result = if let Some(handle) = self.rx_handle.take() {
handle.join()
} else {
Ok(())
};
if tx_result.is_err() {
warn!("Failure joining xio tx thread");
}
if rx_result.is_err() {
warn!("Failure joining xio rx thread");
}
}
}
impl Connection {
pub fn new(port: &str) -> Result<Self> {
let mut settings = serialport::SerialPortSettings::default();
settings.timeout = Duration::from_millis(100);
let port = serialport::open_with_settings(port, &settings)?;
let mut tx_port = port.try_clone()?;
let mut rx_port = port.try_clone()?;
let (register_response_sender, register_response_receiver) =
unbounded::<Sender<ResponseItem>>();
let (register_notification_sender, register_notification_receiver) =
unbounded::<Sender<NotificationItem>>();
let (tx_request_sender, tx_request_receiver) =
unbounded::<RequestItem>();
let (rx_shutdown_sender, rx_shutdown_receiver) = unbounded::<()>();
let tx_handle = thread::Builder::new()
.name("xio_tx".to_string())
.spawn(move || {
let mut iter = tx_request_receiver.iter();
while let Some(RequestItem::Request {
request,
options,
sequence,
}) = iter.next()
{
debug!("Sending request {:?}", request);
if let Err(e) =
request.write_to(&mut tx_port, options, sequence)
{
warn!(
"Error writing to xio tx port, \
closing down: {:?}",
e
);
return;
}
}
})
.context(ErrorKind::ThreadCreation)?;
let rx_handle = {
thread::Builder::new()
.name("xio_rx".to_string())
.spawn(move || {
let mut response_senders: Vec<
Sender<ResponseItem>,
> = Vec::new();
let mut notification_senders: Vec<
Sender<NotificationItem>,
> = Vec::new();
loop {
match Message::read_from(&mut rx_port) {
Ok((sequence, msg)) => match msg {
Message::Request(r) => {
warn!(
"Received a request message from \
a xio device which should never \
happen: {:?}",
r
);
}
Message::Response(response) => {
while let Ok(s) =
register_response_receiver
.try_recv()
{
response_senders.push(s);
}
let response = ResponseItem {
sequence,
response,
};
response_senders = response_senders
.into_iter()
.filter(|s| {
s.send(response.clone())
.is_ok()
})
.collect();
}
Message::Notification(notification) => {
while let Ok(s) =
register_notification_receiver
.try_recv()
{
notification_senders.push(s);
}
let notification = NotificationItem {
sequence,
notification,
};
notification_senders =
notification_senders
.into_iter()
.filter(|s| {
s.send(
notification.clone(),
).is_ok()
})
.collect();
}
},
Err(e) => {
if let Ok(()) =
rx_shutdown_receiver.try_recv()
{
return;
}
if e.kind() != std::io::ErrorKind::TimedOut
{
warn!("Error received: {:?}", e);
}
}
}
}
})
.context(ErrorKind::ThreadCreation)?
};
Ok(Connection {
port,
tx_request_sender,
register_response_sender,
register_notification_sender,
rx_shutdown_sender,
tx_handle: Some(tx_handle),
rx_handle: Some(rx_handle),
sequence: SequenceHolder::default(),
})
}
/// Registers a new response subscriber and returns its receiving end.
///
/// The new sender is handed to the `xio_rx` thread through the
/// registration channel.
///
/// # Errors
///
/// Fails with `ErrorKind::ChannelReceiverDisconnected` when the
/// registration channel no longer has a receiver.
pub fn add_response_rx(&mut self) -> Result<Receiver<ResponseItem>> {
    let (subscriber_tx, subscriber_rx) = unbounded::<ResponseItem>();
    let registration = self
        .register_response_sender
        .send(subscriber_tx)
        .map_err(SyncFailure::new);
    registration.context(ErrorKind::ChannelReceiverDisconnected)?;
    Ok(subscriber_rx)
}
/// Registers a new notification subscriber and returns its receiving end.
///
/// The new sender is handed to the `xio_rx` thread through the
/// registration channel.
///
/// # Errors
///
/// Fails with `ErrorKind::ChannelReceiverDisconnected` when the
/// registration channel no longer has a receiver.
pub fn add_notification_rx(
    &mut self,
) -> Result<Receiver<NotificationItem>> {
    let (subscriber_tx, subscriber_rx) = unbounded::<NotificationItem>();
    let registration = self
        .register_notification_sender
        .send(subscriber_tx)
        .map_err(SyncFailure::new);
    registration.context(ErrorKind::ChannelReceiverDisconnected)?;
    Ok(subscriber_rx)
}
/// Creates a `Handle` for sending requests over this connection.
///
/// The handle shares the connection's sequence counter and request queue
/// and gets its own freshly registered response receiver.
///
/// # Errors
///
/// Propagates the error from `add_response_rx`.
pub fn get_handle(&mut self) -> Result<Handle> {
    let responses = self.add_response_rx()?;
    Ok(Handle {
        seq: self.sequence.clone(),
        tx: self.tx_request_sender.clone(),
        rx: Arc::new(Mutex::new(responses)),
    })
}
}
/// Request/response exchange for a request type `Q` and its matching
/// response type `S`.
///
/// The bounds tie the two together: `Q`'s response type must be `S` and
/// `S`'s request type must be `Q`.
pub trait SendAndReceive<Q, S>
where
Q: IsRequest<Response = S>,
S: IsResponse<Request = Q>,
{
/// Error type returned when the exchange fails.
type Error;
/// Takes `request` and returns the corresponding response, or an error.
fn send_and_receive(
&mut self,
request: Q,
) -> std::result::Result<S, Self::Error>;
}
impl SendAndReceive<Vec<Request>, Vec<Response>> for Handle {
    type Error = Error;

    /// Sends every request in `request` (with options `0`), then collects
    /// one response per request in the same order.
    ///
    /// All requests are queued before the first response is awaited.
    /// Processing stops at the first send or receive error.
    fn send_and_receive(
        &mut self,
        request: Vec<Request>,
    ) -> Result<Vec<Response>> {
        let mut expected_sequences = Vec::with_capacity(request.len());
        for single in request {
            expected_sequences.push(self.send(0u8, single)?);
        }

        let mut responses = Vec::with_capacity(expected_sequences.len());
        for sequence in expected_sequences {
            responses.push(self.receive_next(sequence)?);
        }
        Ok(responses)
    }
}
impl SendAndReceive<Request, Response> for Handle {
    type Error = Error;

    /// Sends a single request by wrapping it in a one-element batch and
    /// returns the first response of that batch.
    ///
    /// An empty batch result is reported as `ErrorKind::ProgrammerError`.
    fn send_and_receive(&mut self, request: Request) -> Result<Response> {
        let batch: Vec<Response> = self.send_and_receive(vec![request])?;
        match batch.into_iter().next() {
            Some(first) => Ok(first),
            None => Err(ErrorKind::ProgrammerError.into()),
        }
    }
}
impl<Q, S> SendAndReceive<Q, S> for Handle
where
    Q: Into<Request> + IsRequest<Response = S>,
    S: super::messages::TryFromResponse + IsResponse<Request = Q>,
{
    type Error = Error;

    /// Converts the typed request with `into()`, performs the exchange and
    /// converts the result back with `S::try_from_response`.
    fn send_and_receive(&mut self, request: Q) -> Result<S> {
        let raw_response = self.send_and_receive(request.into())?;
        let typed_response = S::try_from_response(raw_response)?;
        Ok(typed_response)
    }
}
/// Cloneable handle for sending requests and waiting for their responses.
///
/// Clones share the sequence counter, the request queue and (through the
/// `Arc<Mutex<_>>`) the same response receiver.
#[derive(Clone)]
pub struct Handle {
// Source of sequence numbers for outgoing requests.
seq: SequenceHolder,
// Queue on which `send` places outgoing requests.
tx: Sender<RequestItem>,
// Receiver that `receive_next` polls for responses.
rx: Arc<Mutex<Receiver<ResponseItem>>>,
}
impl Handle {
/// Queues `request` for transmission and returns the sequence number that
/// was assigned to it.
///
/// # Errors
///
/// Fails if no sequence number can be obtained, or with
/// `ErrorKind::ChannelReceiverDisconnected` when the request queue has no
/// receiver any more.
pub fn send(&mut self, options: u8, request: Request) -> Result<u8> {
    let sequence = self.seq.request_sequence_number()?;
    let item = RequestItem::Request {
        request,
        options,
        sequence,
    };
    match self.tx.send(item) {
        Ok(()) => Ok(sequence),
        Err(e) => {
            Err(e.context(ErrorKind::ChannelReceiverDisconnected).into())
        }
    }
}
/// Waits for the response that carries `expected_sequence`.
///
/// Responses with any other sequence number are discarded. The wait is
/// bounded by a wall-clock deadline of two seconds.
///
/// # Errors
///
/// * `ErrorKind::Timeout` if no matching response arrives in time.
/// * `ErrorKind::ChannelReceiverDisconnected` if the response channel is
///   disconnected.
fn receive_next(&self, expected_sequence: u8) -> Result<Response> {
    // The overall budget matches the former 20 polls of 100 ms. It is
    // measured in time rather than in receive attempts because every
    // subscriber is sent every response: a backlog of unrelated responses
    // must not use up the attempts and cause an immediate "timeout".
    let poll_interval = Duration::from_millis(100);
    let deadline =
        std::time::Instant::now() + Duration::from_millis(2000);

    loop {
        let now = std::time::Instant::now();
        if now >= deadline {
            return Err(ErrorKind::Timeout.into());
        }
        // Wait in short slices so the mutex is released regularly and
        // clones of this handle get a turn on the receiver.
        let wait = std::cmp::min(poll_interval, deadline - now);
        let result = self.rx.lock().unwrap().recv_timeout(wait);
        match result {
            Ok(ResponseItem {
                sequence,
                response,
            }) => {
                if expected_sequence == sequence {
                    return Ok(response);
                }
                // Not ours: drop it and keep waiting.
            }
            Err(RecvTimeoutError::Timeout) => {}
            Err(d @ RecvTimeoutError::Disconnected) => {
                return Err(d.context(
                    ErrorKind::ChannelReceiverDisconnected,
                ).into());
            }
        }
    }
}
}