use crate::messages::{
IsRequest, IsResponse, Message, MessageExt, Notification, Request,
Response,
};
use crate::{error, serialport, Error, Result};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use ommui_broadcast::{
Broadcaster, SubscriptionManager, SubscriptionRegistration,
};
use serialport::SerialPort;
use snafu::{OptionExt, ResultExt};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
/// Timeout in milliseconds.
///
/// Used both as the serial port timeout when the port is opened and as the
/// maximum time `send_and_receive` waits for the response to one request.
const RESPONSE_TIMEOUT_MILLIS: u64 = 2000;
/// A response read from the device, together with the sequence number that
/// was read alongside it.
///
/// Items of this type are broadcast by the receive thread to every response
/// subscriber.
#[derive(Clone, Debug)]
pub struct ResponseItem {
/// The decoded response message.
pub response: Response,
/// Sequence number returned by `Message::read_from` for this message.
pub sequence: u8,
}
/// A request bundled with the options byte and sequence number.
///
/// NOTE(review): this type is not referenced by any code visible in this
/// part of the file; presumably it is used elsewhere in the crate or is a
/// leftover — confirm.
#[derive(Clone, Debug)]
pub(crate) struct RequestItem {
/// The request message.
request: Request,
/// Options byte (presumably the same value passed to `write_to` — confirm).
options: u8,
/// Sequence number associated with the request.
sequence: u8,
}
/// A notification read from the device, together with the sequence number
/// that was read alongside it.
///
/// Items of this type are broadcast by the receive thread to every
/// notification subscriber.
#[derive(Clone, Debug)]
pub struct NotificationItem {
/// The decoded notification message.
pub notification: Notification,
/// Sequence number returned by `Message::read_from` for this message.
pub sequence: u8,
}
/// State of a connection that is shared between all clones of `Connection`
/// and guarded by a lock.
///
/// Dropping this value signals the receive thread to shut down and joins it
/// (see the `Drop` implementation).
struct ConnectionInnerLocked {
/// Name of the port as passed to the constructor; used in log messages.
port_id: String,
/// Port handle that requests are written to.
// NOTE(review): `Box<SerialPort>` looks like a bare trait object; consider
// spelling it `Box<dyn SerialPort>` once confirmed that `SerialPort` is a trait.
tx_port: Box<SerialPort>,
/// Join handle of the receive thread; taken (set to `None`) when joined.
rx_handle: Option<thread::JoinHandle<()>>,
/// Channel used to tell the receive thread to stop.
rx_shutdown_sender: Sender<()>,
/// Most recently used sequence number. Starts at 0 and is incremented
/// before each send, so the first request goes out with sequence 1.
sequence: u8,
}
// SAFETY (NOTE(review) — confirm): this impl is presumably needed because the
// boxed serial port is not `Sync`, while sharing `Arc<RwLock<_>>` between
// threads requires the inner type to be `Sync`. In the code visible here the
// value is only ever reached through `RwLock::write()`, i.e. with exclusive
// access, so no two threads observe it at the same time. Any future use of
// `RwLock::read()` would hand out shared references concurrently and would
// have to be re-audited. Using a `Mutex` instead of an `RwLock` would likely
// make this `unsafe impl` unnecessary.
unsafe impl Sync for ConnectionInnerLocked {}
impl ConnectionInnerLocked {
    /// Advances the sequence counter by one and returns the new value.
    ///
    /// The counter wraps around from 255 back to 0.
    fn get_next_sequence_number(&mut self) -> u8 {
        self.sequence = self.sequence.wrapping_add(1);
        self.sequence
    }

    /// Writes `request` to the transmit port using a fresh sequence number.
    ///
    /// Returns the sequence number the request was sent with, so the caller
    /// can match it against the sequence number of the response.
    ///
    /// # Errors
    ///
    /// Propagates any error reported by `Request::write_to`.
    fn send(&mut self, options: u8, request: &Request) -> Result<u8> {
        let seq = self.get_next_sequence_number();
        debug!("Sending with sequence number {}: {:?}", seq, request);
        request.write_to(&mut self.tx_port, options, seq)?;
        Ok(seq)
    }
}
/// Handle to a device connection over a serial port.
///
/// The handle is `Clone`; all clones share the same locked inner state
/// through an `Arc`. Responses and notifications are read by a background
/// thread and handed out through the subscription managers stored here.
#[derive(Clone)]
pub struct Connection {
/// Creates subscriptions for responses broadcast by the receive thread.
response_subscription_manager: SubscriptionManager<ResponseItem>,
/// Creates subscriptions for notifications broadcast by the receive thread.
notification_subscription_manager:
SubscriptionManager<NotificationItem>,
/// Creates subscriptions for "stale connection" events.
stale_connection_subscription_manager: SubscriptionManager<()>,
/// Broadcasts a `()` event when waiting for a response times out.
stale_connection_broadcaster: Broadcaster<()>,
/// Shared transmit-side state (port, sequence counter, receive thread handle).
inner: Arc<RwLock<ConnectionInnerLocked>>,
}
impl Drop for ConnectionInnerLocked {
    /// Asks the receive thread to shut down and waits for it to finish.
    ///
    /// This never panics: a failure to deliver the shutdown signal or to join
    /// the thread is logged instead, because a panic inside `drop` while the
    /// stack is already unwinding would abort the whole process.
    fn drop(&mut self) {
        // Sending fails only when the receiving side of the channel has been
        // dropped, which means the receive thread has already terminated
        // (for example because it panicked). There is nothing left to shut
        // down in that case, so just record it and go on to join the thread.
        if self.rx_shutdown_sender.send(()).is_err() {
            warn!(
                "xio rx thread for port {} terminated before shutdown was requested",
                self.port_id
            );
        }

        if let Some(rx_handle) = self.rx_handle.take() {
            info!("Waiting for port {} to close", self.port_id);
            match rx_handle.join() {
                Ok(()) => info!("Closed port {}", self.port_id),
                Err(_) => warn!("Failure joining xio rx thread"),
            }
        }
    }
}
impl Connection {
pub fn new_with_identifier(
port: &str,
identifier: &str,
) -> Result<Self> {
let mut settings = serialport::SerialPortSettings::default();
settings.timeout = Duration::from_millis(RESPONSE_TIMEOUT_MILLIS);
let tx_port = serialport::open_with_settings(port, &settings)
.context(error::SerialPort)?;
let mut rx_port =
tx_port.try_clone().context(error::SerialPort)?;
let mut response_broadcaster = Broadcaster::default();
let mut notification_broadcaster = Broadcaster::default();
let stale_connection_broadcaster = Broadcaster::default();
let response_subscription_manager =
response_broadcaster.subscription_manager();
let notification_subscription_manager =
notification_broadcaster.subscription_manager();
let stale_connection_subscription_manager =
stale_connection_broadcaster.subscription_manager();
let (rx_shutdown_sender, rx_shutdown_receiver) = bounded(1);
let rx_handle = {
Some(
thread::Builder::new()
.name(format!("xio_rx:{}", identifier))
.spawn(move || loop {
if rx_shutdown_receiver.try_recv().is_ok() {
return;
}
match Message::read_from(&mut rx_port) {
Ok((sequence, msg)) => match msg {
Message::Request(r) => {
warn!(
"Received a request message from \
a xio device which should never \
happen: {:?}",
r
);
}
Message::Response(response) => {
let response = ResponseItem {
sequence,
response,
};
response_broadcaster
.broadcast(response);
}
Message::Notification(notification) => {
let notification = NotificationItem {
sequence,
notification,
};
notification_broadcaster
.broadcast(notification);
}
},
Err(error::Error::Timeout { .. }) => {}
Err(error::Error::Io {
ref source, ..
}) if source.kind()
== std::io::ErrorKind::TimedOut =>
{
if rx_shutdown_receiver.try_recv().is_ok()
{
return;
}
}
Err(e) => {
if rx_shutdown_receiver.try_recv().is_ok()
{
return;
}
warn!("Error received: {:?}", e);
}
}
})
.context(error::ThreadCreation)?,
)
};
let inner = Arc::new(RwLock::new(ConnectionInnerLocked {
port_id: port.to_string(),
rx_handle,
tx_port,
rx_shutdown_sender,
sequence: 0u8,
}));
Ok(Connection {
inner,
response_subscription_manager,
notification_subscription_manager,
stale_connection_subscription_manager,
stale_connection_broadcaster,
})
}
pub fn new(port: &str) -> Result<Self> {
Self::new_with_identifier(port, port)
}
fn get_responses_receiver(
&mut self,
) -> Result<Receiver<ResponseItem>> {
self.response_subscription_manager
.subscribe()
.context(error::Broadcast)
}
pub fn get_notifications_receiver(
&mut self,
) -> Result<Receiver<NotificationItem>> {
self.notification_subscription_manager
.subscribe()
.context(error::Broadcast)
}
pub fn get_stale_connection_receiver(
&mut self,
) -> Result<Receiver<()>> {
self.stale_connection_subscription_manager
.subscribe()
.context(error::Broadcast)
}
}
/// Sends a request and waits for the matching response.
///
/// `Q` is the request type and `S` the response type; the `IsRequest` /
/// `IsResponse` bounds tie each request type to exactly one response type
/// and vice versa.
pub trait SendAndReceive<Q, S>
where
Q: IsRequest<Response = S>,
S: IsResponse<Request = Q>,
{
/// Error type returned when sending or receiving fails.
type Error;
/// Sends `request` and returns the response to it, or an error.
fn send_and_receive(
&mut self,
request: Q,
) -> std::result::Result<S, Self::Error>;
}
impl SendAndReceive<Vec<Request>, Vec<Response>> for Connection {
type Error = Error;
fn send_and_receive(
&mut self,
requests: Vec<Request>,
) -> Result<Vec<Response>> {
let receiver = self.get_responses_receiver()?;
let timeout = Duration::from_millis(RESPONSE_TIMEOUT_MILLIS);
let options = 0u8;
let mut write = self.inner.write().unwrap();
let mut stale_connection_broadcaster =
self.stale_connection_broadcaster.clone();
Ok(requests
.into_iter()
.map(|request| {
let expected_sequence = write.send(options, &request)?;
match receiver.recv_timeout(timeout) {
Ok(ResponseItem { sequence, response }) => {
if sequence == expected_sequence {
Ok(response)
} else {
error::XioSequenceError {
expected: expected_sequence,
received: sequence,
request,
response,
}
.fail()?
}
}
Err(RecvTimeoutError::Timeout) => {
warn!(
"Timeout while waiting for answer from device"
);
stale_connection_broadcaster.broadcast(());
error::Timeout.fail()?
}
Err(RecvTimeoutError::Disconnected) => {
error::ChannelSenderDisconnected.fail()?
}
}
})
.collect::<Result<Vec<_>>>()?)
}
}
impl SendAndReceive<Request, Response> for Connection {
type Error = Error;
fn send_and_receive(&mut self, request: Request) -> Result<Response> {
let mut responses = self.send_and_receive(vec![request])?;
let response = responses.drain(..).next();
response.context(error::ProgrammerError)
}
}
impl<Q, S> SendAndReceive<Q, S> for Connection
where
    Q: Into<Request> + IsRequest<Response = S>,
    S: super::messages::TryFromResponse + IsResponse<Request = Q>,
{
    type Error = Error;

    /// Sends a typed request and converts the reply into its typed response.
    ///
    /// The request is converted into a generic `Request`, sent through the
    /// single-request variant, and the generic reply is converted back with
    /// `TryFromResponse::try_from_response`.
    ///
    /// # Errors
    ///
    /// Fails when sending or receiving fails, or when the reply cannot be
    /// converted into `S`.
    fn send_and_receive(&mut self, request: Q) -> Result<S> {
        let generic_request: Request = request.into();
        let generic_response = self.send_and_receive(generic_request)?;
        let typed_response = S::try_from_response(generic_response)?;
        Ok(typed_response)
    }
}