Skip to main content

plm/
broker.rs

1use std::path::Path;
2use std::sync::mpsc::channel;
3use std::thread;
4use std::time::Duration;
5
6use futures::{
7    channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
8    future::FutureExt,
9    select,
10    sink::SinkExt,
11    stream::{Stream, StreamExt},
12};
13
14use log::debug;
15
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio_serial::{DataBits, FlowControl, Parity, Serial, SerialPortSettings, StopBits};
18use tokio_util::codec::*;
19
20use crate::error::*;
21use crate::frame::*;
22
23pub enum BrokerMessage {
24    AddListener {
25        listener: UnboundedSender<Frame>,
26    },
27    SendFrame {
28        frame: Frame,
29        responder: UnboundedSender<Result<Frame, Error>>,
30    },
31}
32
33pub struct Broker {
34    sender: UnboundedSender<BrokerMessage>,
35}
36
37async fn event_loop(
38    mut receiver: UnboundedReceiver<BrokerMessage>,
39    mut framed: Framed<impl AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, FrameCodec>,
40) {
41    let mut listeners = Vec::<UnboundedSender<Frame>>::new();
42
43    loop {
44        select! {
45            maybe_frame = framed.next().fuse() => match(maybe_frame) {
46                Some(Ok(frame)) => {
47                    debug!("Received Frame: {:02x?}", frame);
48
49                    let mut new_listeners = Vec::with_capacity(listeners.len());
50                    while let Some(mut listener) = listeners.pop() {
51                        if listener.send(frame.clone()).await.is_ok() {
52                            new_listeners.push(listener);
53                        }
54                    }
55
56                    listeners = new_listeners;
57                },
58                _ => break,
59            },
60            msg = receiver.next() => {
61                match (msg) {
62                    Some(BrokerMessage::AddListener{ listener }) => {
63                        listeners.push(listener);
64                    },
65                    Some(BrokerMessage::SendFrame{ frame, mut responder }) => {
66                        debug!("Sending Frame: {:02x?}", frame);
67                        if let Err(e) = framed.send(frame).await {
68                            let _ = responder.send(Err(e)).await;
69                            continue;
70                        }
71
72                        match framed.next().await {
73                            None => {
74                                let _ = responder.send(Err(Error::Disconnected)).await;
75                                break;
76                            },
77                            Some(response) => {
78                                debug!("Received Response: {:02x?}", response);
79                                let _ = responder.send(response).await;
80                            }
81                        }
82                    },
83                    None => break, // No more messages coming, exit
84                }
85            }
86        }
87    }
88}
89
90impl Broker {
91    pub fn from_path(path: impl AsRef<Path> + Send + 'static) -> Result<Broker, std::io::Error> {
92        let (sender, receiver) = unbounded();
93
94        let (init_sender, init_receiver) = channel();
95
96        thread::spawn(move || {
97            let mut rt = tokio::runtime::Runtime::new().unwrap();
98            rt.block_on(async move {
99                let settings = SerialPortSettings {
100                    baud_rate: 19200,
101                    data_bits: DataBits::Eight,
102                    flow_control: FlowControl::None,
103                    parity: Parity::None,
104                    stop_bits: StopBits::One,
105                    timeout: Duration::from_millis(100),
106                };
107
108                match Serial::from_path(path.as_ref(), &settings) {
109                    Ok(port) => {
110                        init_sender.send(Ok(())).unwrap();
111                        event_loop(receiver, Framed::new(port, FrameCodec())).await
112                    }
113                    Err(e) => init_sender.send(Err(e)).unwrap(),
114                }
115            });
116        });
117
118        // Make sure we were able to create the port
119        init_receiver.recv().unwrap()?;
120        Ok(Broker { sender })
121    }
122
123    pub fn new(handle: impl AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static) -> Broker {
124        let (sender, receiver) = unbounded();
125
126        thread::spawn(move || {
127            let mut rt = tokio::runtime::Runtime::new().unwrap();
128            rt.block_on(
129                async move { event_loop(receiver, Framed::new(handle, FrameCodec())).await },
130            );
131        });
132
133        Broker { sender }
134    }
135
136    pub async fn send(&mut self, frame: Frame) -> Result<Frame, Error> {
137        let (sender, mut receiver) = unbounded();
138        self.sender
139            .send(BrokerMessage::SendFrame {
140                frame,
141                responder: sender,
142            })
143            .await?;
144        receiver.next().await.ok_or_else(|| Error::Disconnected)?
145    }
146
147    pub async fn listen(&mut self) -> Result<impl Stream<Item = Frame>, Error> {
148        let (sender, receiver) = unbounded();
149        self.sender
150            .send(BrokerMessage::AddListener { listener: sender })
151            .await?;
152        Ok(receiver)
153    }
154}