1use std::path::Path;
2use std::sync::mpsc::channel;
3use std::thread;
4use std::time::Duration;
5
6use futures::{
7 channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
8 future::FutureExt,
9 select,
10 sink::SinkExt,
11 stream::{Stream, StreamExt},
12};
13
14use log::debug;
15
16use tokio::io::{AsyncReadExt, AsyncWriteExt};
17use tokio_serial::{DataBits, FlowControl, Parity, Serial, SerialPortSettings, StopBits};
18use tokio_util::codec::*;
19
20use crate::error::*;
21use crate::frame::*;
22
23pub enum BrokerMessage {
24 AddListener {
25 listener: UnboundedSender<Frame>,
26 },
27 SendFrame {
28 frame: Frame,
29 responder: UnboundedSender<Result<Frame, Error>>,
30 },
31}
32
33pub struct Broker {
34 sender: UnboundedSender<BrokerMessage>,
35}
36
37async fn event_loop(
38 mut receiver: UnboundedReceiver<BrokerMessage>,
39 mut framed: Framed<impl AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static, FrameCodec>,
40) {
41 let mut listeners = Vec::<UnboundedSender<Frame>>::new();
42
43 loop {
44 select! {
45 maybe_frame = framed.next().fuse() => match(maybe_frame) {
46 Some(Ok(frame)) => {
47 debug!("Received Frame: {:02x?}", frame);
48
49 let mut new_listeners = Vec::with_capacity(listeners.len());
50 while let Some(mut listener) = listeners.pop() {
51 if listener.send(frame.clone()).await.is_ok() {
52 new_listeners.push(listener);
53 }
54 }
55
56 listeners = new_listeners;
57 },
58 _ => break,
59 },
60 msg = receiver.next() => {
61 match (msg) {
62 Some(BrokerMessage::AddListener{ listener }) => {
63 listeners.push(listener);
64 },
65 Some(BrokerMessage::SendFrame{ frame, mut responder }) => {
66 debug!("Sending Frame: {:02x?}", frame);
67 if let Err(e) = framed.send(frame).await {
68 let _ = responder.send(Err(e)).await;
69 continue;
70 }
71
72 match framed.next().await {
73 None => {
74 let _ = responder.send(Err(Error::Disconnected)).await;
75 break;
76 },
77 Some(response) => {
78 debug!("Received Response: {:02x?}", response);
79 let _ = responder.send(response).await;
80 }
81 }
82 },
83 None => break, }
85 }
86 }
87 }
88}
89
90impl Broker {
91 pub fn from_path(path: impl AsRef<Path> + Send + 'static) -> Result<Broker, std::io::Error> {
92 let (sender, receiver) = unbounded();
93
94 let (init_sender, init_receiver) = channel();
95
96 thread::spawn(move || {
97 let mut rt = tokio::runtime::Runtime::new().unwrap();
98 rt.block_on(async move {
99 let settings = SerialPortSettings {
100 baud_rate: 19200,
101 data_bits: DataBits::Eight,
102 flow_control: FlowControl::None,
103 parity: Parity::None,
104 stop_bits: StopBits::One,
105 timeout: Duration::from_millis(100),
106 };
107
108 match Serial::from_path(path.as_ref(), &settings) {
109 Ok(port) => {
110 init_sender.send(Ok(())).unwrap();
111 event_loop(receiver, Framed::new(port, FrameCodec())).await
112 }
113 Err(e) => init_sender.send(Err(e)).unwrap(),
114 }
115 });
116 });
117
118 init_receiver.recv().unwrap()?;
120 Ok(Broker { sender })
121 }
122
123 pub fn new(handle: impl AsyncReadExt + AsyncWriteExt + Unpin + Send + 'static) -> Broker {
124 let (sender, receiver) = unbounded();
125
126 thread::spawn(move || {
127 let mut rt = tokio::runtime::Runtime::new().unwrap();
128 rt.block_on(
129 async move { event_loop(receiver, Framed::new(handle, FrameCodec())).await },
130 );
131 });
132
133 Broker { sender }
134 }
135
136 pub async fn send(&mut self, frame: Frame) -> Result<Frame, Error> {
137 let (sender, mut receiver) = unbounded();
138 self.sender
139 .send(BrokerMessage::SendFrame {
140 frame,
141 responder: sender,
142 })
143 .await?;
144 receiver.next().await.ok_or_else(|| Error::Disconnected)?
145 }
146
147 pub async fn listen(&mut self) -> Result<impl Stream<Item = Frame>, Error> {
148 let (sender, receiver) = unbounded();
149 self.sender
150 .send(BrokerMessage::AddListener { listener: sender })
151 .await?;
152 Ok(receiver)
153 }
154}