1mod connection;
2mod serial_port;
3
4use connection::Connection;
5use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
6use serial_port::{port_recv, port_send};
7use std::collections::VecDeque;
8use std::path::Path;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use std::{io, mem, thread};
12
/// Maximum time the worker thread waits for a request before it polls the
/// serial port on its own.
pub const POLLING_INTERVAL: Duration = Duration::from_millis(1);
14
15#[derive(Clone)]
30pub struct Arbiter {
31 conn: Arc<Connection>,
32 chan: Sender<Request>,
33}
34
35enum Request {
36 Clear(Clear),
37 Transmit(Transmit),
38 Receive(Receive),
39}
40
41struct Clear {
42 pub response: Sender<io::Result<()>>,
43}
44
45struct Transmit {
46 pub tx_bytes: Arc<[u8]>,
47 pub deadline: Instant,
48 pub response: Sender<io::Result<()>>,
49}
50
51struct Receive {
52 pub until: Option<u8>,
53 pub deadline: Option<Instant>,
54 pub response: Sender<io::Result<Option<Vec<u8>>>>,
55}
56
57struct WorkerThread {
58 buff: VecDeque<u8>,
59 conn: Arc<Connection>,
60 chan: Receiver<Request>,
61}
62
63impl Default for Arbiter {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl Arbiter {
70 pub fn new() -> Self {
73 let conn = Arc::new(Connection::new());
74
75 let (req_tx, req_rx) = bounded::<Request>(0);
77
78 let worker = WorkerThread::new(conn.clone(), req_rx);
80 worker.spawn();
81
82 Self { conn, chan: req_tx }
83 }
84
85 pub fn close(&self) {
87 self.conn.close();
88 }
89
90 pub fn is_open(&self) -> bool {
92 self.conn.is_open()
93 }
94
95 pub fn open(&self, path: impl AsRef<Path>) -> io::Result<()> {
97 self.conn.set_path(path);
98 self.conn.open().map(|_| ())
99 }
100
101 pub fn clear_rx_buff(&self) -> io::Result<()> {
103 let (response, result_ch) = bounded(1);
104 let request = Request::Clear(Clear { response });
105 if let Err(SendError { .. }) = self.chan.send(request) {
106 return Err(io::Error::other("Internal error"));
107 }
108 match result_ch.recv() {
109 Err(_) => Err(io::Error::other("Internal error")),
110 Ok(result) => result,
111 }
112 }
113
114 pub fn transmit(&self, tx_bytes: Arc<[u8]>, deadline: Instant) -> io::Result<()> {
116 let (response, result_ch) = bounded(1);
117 let request = Request::Transmit(Transmit {
118 tx_bytes,
119 deadline,
120 response,
121 });
122 if let Err(SendError { .. }) = self.chan.send(request) {
123 return Err(io::Error::other("Internal error"));
124 }
125 match result_ch.recv() {
126 Err(_) => Err(io::Error::other("Internal error")),
127 Ok(result) => result,
128 }
129 }
130
131 pub fn transmit_str(&self, str: impl AsRef<str>, deadline: Instant) -> io::Result<()> {
134 let tx_bytes = str.as_ref().as_bytes().into();
135 self.transmit(tx_bytes, deadline)
136 }
137
138 pub fn receive(
140 &self,
141 until: Option<u8>,
142 deadline: Option<Instant>,
143 ) -> io::Result<Option<Vec<u8>>> {
144 let (response, result_ch) = bounded(1);
145 let request = Request::Receive(Receive {
146 until,
147 deadline,
148 response,
149 });
150 if let Err(SendError { .. }) = self.chan.send(request) {
151 return Err(io::Error::other("Internal error"));
152 }
153 match result_ch.recv() {
154 Err(_) => Err(io::Error::other("Internal error")),
155 Ok(result) => result,
156 }
157 }
158
159 pub fn receive_string(
161 &self,
162 until: Option<u8>,
163 deadline: Option<Instant>,
164 ) -> io::Result<Option<String>> {
165 let result = self.receive(until, deadline)?;
166 Ok(result.map(|x| String::from_utf8_lossy(&x).to_string()))
167 }
168
169 pub fn set_cooloff_duration(&self, cooloff: Option<Duration>) {
173 self.conn.set_cooloff_duration(cooloff);
174 }
175}
176
177impl WorkerThread {
178 fn new(connection: Arc<Connection>, requests: Receiver<Request>) -> Self {
179 Self {
180 buff: VecDeque::new(),
181 conn: connection,
182 chan: requests,
183 }
184 }
185
186 fn spawn(mut self) {
187 thread::spawn(move || loop {
188 self.process();
189 });
190 }
191
192 fn process(&mut self) {
193 loop {
194 let request_recv = self.chan.recv_timeout(POLLING_INTERVAL);
195 match request_recv {
196 Err(RecvTimeoutError::Disconnected) => {
197 return;
199 }
200 Err(RecvTimeoutError::Timeout) => {
201 let _ = self.receive_from_port(None, None);
203 }
204 Ok(request) => match request {
205 Request::Clear(tx) => {
206 let result = if self.conn.is_open() {
207 self.receive_from_port(None, None)
208 } else {
209 Ok(())
210 };
211 self.buff.clear();
212 let _ = tx.response.try_send(result);
213 }
214 Request::Transmit(tx) => {
215 let result = self.transmit_to_port(tx.tx_bytes, tx.deadline);
216 let _ = tx.response.try_send(result);
217 }
218 Request::Receive(rx) => {
219 if let Some(delimiter) = rx.until {
221 let colltype = CollectKind::UntilOrNothing(delimiter);
223 if let Some(data) = self.collect_from_buff(colltype) {
224 let _ = rx.response.try_send(Ok(Some(data)));
226 continue;
227 }
228 }
229
230 if let Err(err) = self.receive_from_port(rx.until, rx.deadline) {
232 let _ = rx.response.try_send(Err(err));
234 continue;
235 }
236
237 let colltype = match rx.until {
239 None => CollectKind::Everything,
240 Some(delimiter) => CollectKind::UntilOrEverything(delimiter),
241 };
242 let data = self.collect_from_buff(colltype);
243 let _ = rx.response.try_send(Ok(data));
244 }
245 },
246 };
247 }
248 }
249
250 fn receive_from_port(
251 &mut self,
252 until: Option<u8>,
253 deadline: Option<Instant>,
254 ) -> io::Result<()> {
255 let file_mutex = self.conn.open()?;
256 let mut file = file_mutex.lock().unwrap();
257 let result = port_recv(&mut file, &mut self.buff, until, deadline);
258 if result.is_err() {
259 self.conn.close();
260 }
261 result
262 }
263
264 fn transmit_to_port(&mut self, data: Arc<[u8]>, deadline: Instant) -> io::Result<()> {
265 let file_mutex = self.conn.open()?;
266 let mut file = file_mutex.lock().unwrap();
267 let result = port_send(&mut file, &data, &mut self.buff, deadline);
268 if result.is_err() {
269 self.conn.close();
270 }
271 result
272 }
273
274 fn collect_from_buff(&mut self, collect: CollectKind) -> Option<Vec<u8>> {
276 if self.buff.is_empty() {
277 return None;
278 }
279 match collect {
280 CollectKind::Everything => self.collect_from_buff_everything(),
281 CollectKind::UntilOrEverything(delimiter) => {
282 if let Some(pos) = self.buff.iter().position(|x| x == &delimiter) {
283 self.collect_from_buff_count(pos + 1)
284 } else {
285 self.collect_from_buff_everything()
286 }
287 }
288 CollectKind::UntilOrNothing(delimiter) => {
289 if let Some(pos) = self.buff.iter().position(|x| x == &delimiter) {
290 self.collect_from_buff_count(pos + 1)
291 } else {
292 None
293 }
294 }
295 }
296 }
297
298 fn collect_from_buff_count(&mut self, count: usize) -> Option<Vec<u8>> {
300 if self.buff.is_empty() {
301 return None;
303 }
304 if self.buff.len() <= count {
305 return self.collect_from_buff_everything();
306 }
307 let mut data = self.buff.split_off(count);
309 mem::swap(&mut self.buff, &mut data);
310 Some(data.into())
311 }
312
313 fn collect_from_buff_everything(&mut self) -> Option<Vec<u8>> {
315 if self.buff.is_empty() {
316 return None;
317 }
318 let mut data = VecDeque::new();
319 mem::swap(&mut self.buff, &mut data);
320 Some(data.into())
321 }
322}
323
/// Selects how much of the receive buffer a collect operation takes.
enum CollectKind {
    /// Take the whole buffer.
    Everything,
    /// Take up to and including the first occurrence of the given byte, or
    /// the whole buffer when that byte is not present.
    UntilOrEverything(u8),
    /// Take up to and including the first occurrence of the given byte, or
    /// nothing when that byte is not present.
    UntilOrNothing(u8),
}