serial_arbiter/
lib.rs

1mod connection;
2mod serial_port;
3
4use connection::Connection;
5use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
6use serial_port::{port_recv, port_send};
7use std::collections::VecDeque;
8use std::path::Path;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use std::{io, mem, thread};
12
13pub const POLLING_INTERVAL: Duration = Duration::from_millis(1);
14
15/// # Serial Port Arbiter
16///
17/// This is a Linux-only serial port library that offers the following benefits
18/// over directly using `/dev/tty`:
19/// 1. Opens the `/dev/tty` file with flags for non-blocking access.
20/// 2. Sets the `termios` flags to use the TTY in raw mode.
21/// 3. Prevents deadlocks caused by input buffer starvation.
22/// 4. Prevents data garbling by implementing transaction arbitration.
23/// 5. Gracefully handles interrupts and timeout errors.
24/// 6. Gracefully handles connection errors and automatically reconnects.
25/// 7. Provides a more convenient API than the raw `io::Read` and `io::Write`.
26///
27/// **This is an "async-less" library**, and it is intended to remain that way.  
28/// If you need asynchronous behavior, you can easily make it async-compatible in your own code.
29#[derive(Clone)]
30pub struct Arbiter {
31    conn: Arc<Connection>,
32    chan: Sender<Request>,
33}
34
35enum Request {
36    Clear(Clear),
37    Transmit(Transmit),
38    Receive(Receive),
39}
40
41struct Clear {
42    pub response: Sender<io::Result<()>>,
43}
44
45struct Transmit {
46    pub tx_bytes: Arc<[u8]>,
47    pub deadline: Instant,
48    pub response: Sender<io::Result<()>>,
49}
50
51struct Receive {
52    pub until: Option<u8>,
53    pub deadline: Option<Instant>,
54    pub response: Sender<io::Result<Option<Vec<u8>>>>,
55}
56
57struct WorkerThread {
58    buff: VecDeque<u8>,
59    conn: Arc<Connection>,
60    chan: Receiver<Request>,
61}
62
63impl Default for Arbiter {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69impl Arbiter {
70    /// Creates a new arbiter which will handle a serial port
71    /// connection defined by the given serial port builder.
72    pub fn new() -> Self {
73        let conn = Arc::new(Connection::new());
74
75        // Setup read and write channels
76        let (req_tx, req_rx) = bounded::<Request>(0);
77
78        // Spawn background thread
79        let worker = WorkerThread::new(conn.clone(), req_rx);
80        worker.spawn();
81
82        Self { conn, chan: req_tx }
83    }
84
85    /// Closes the serial port
86    pub fn close(&self) {
87        self.conn.close();
88    }
89
90    /// Returns true if the connection is open
91    pub fn is_open(&self) -> bool {
92        self.conn.is_open()
93    }
94
95    /// Opens the serial port.
96    pub fn open(&self, path: impl AsRef<Path>) -> io::Result<()> {
97        self.conn.set_path(path);
98        self.conn.open().map(|_| ())
99    }
100
101    /// Clear the Rx buffer of the serial port.
102    pub fn clear_rx_buff(&self) -> io::Result<()> {
103        let (response, result_ch) = bounded(1);
104        let request = Request::Clear(Clear { response });
105        if let Err(SendError { .. }) = self.chan.send(request) {
106            return Err(io::Error::other("Internal error"));
107        }
108        match result_ch.recv() {
109            Err(_) => Err(io::Error::other("Internal error")),
110            Ok(result) => result,
111        }
112    }
113
114    /// Transmits data to the serial port.
115    pub fn transmit(&self, tx_bytes: Arc<[u8]>, deadline: Instant) -> io::Result<()> {
116        let (response, result_ch) = bounded(1);
117        let request = Request::Transmit(Transmit {
118            tx_bytes,
119            deadline,
120            response,
121        });
122        if let Err(SendError { .. }) = self.chan.send(request) {
123            return Err(io::Error::other("Internal error"));
124        }
125        match result_ch.recv() {
126            Err(_) => Err(io::Error::other("Internal error")),
127            Ok(result) => result,
128        }
129    }
130
131    /// Transmits a string to the serial port.
132    /// Returns any bytes received during transmission.
133    pub fn transmit_str(&self, str: impl AsRef<str>, deadline: Instant) -> io::Result<()> {
134        let tx_bytes = str.as_ref().as_bytes().into();
135        self.transmit(tx_bytes, deadline)
136    }
137
138    /// Receives data from the serial port
139    pub fn receive(
140        &self,
141        until: Option<u8>,
142        deadline: Option<Instant>,
143    ) -> io::Result<Option<Vec<u8>>> {
144        let (response, result_ch) = bounded(1);
145        let request = Request::Receive(Receive {
146            until,
147            deadline,
148            response,
149        });
150        if let Err(SendError { .. }) = self.chan.send(request) {
151            return Err(io::Error::other("Internal error"));
152        }
153        match result_ch.recv() {
154            Err(_) => Err(io::Error::other("Internal error")),
155            Ok(result) => result,
156        }
157    }
158
159    /// Receives data from the serial port and converts to a String
160    pub fn receive_string(
161        &self,
162        until: Option<u8>,
163        deadline: Option<Instant>,
164    ) -> io::Result<Option<String>> {
165        let result = self.receive(until, deadline)?;
166        Ok(result.map(|x| String::from_utf8_lossy(&x).to_string()))
167    }
168
169    /// Change the duration of cooloff after disconnecting due to an error
170    /// and before a new connection attempt is made. If set to None then
171    /// another connect attepmpt is tried without any artificial delays.
172    pub fn set_cooloff_duration(&self, cooloff: Option<Duration>) {
173        self.conn.set_cooloff_duration(cooloff);
174    }
175}
176
177impl WorkerThread {
178    fn new(connection: Arc<Connection>, requests: Receiver<Request>) -> Self {
179        Self {
180            buff: VecDeque::new(),
181            conn: connection,
182            chan: requests,
183        }
184    }
185
186    fn spawn(mut self) {
187        thread::spawn(move || loop {
188            self.process();
189        });
190    }
191
192    fn process(&mut self) {
193        loop {
194            let request_recv = self.chan.recv_timeout(POLLING_INTERVAL);
195            match request_recv {
196                Err(RecvTimeoutError::Disconnected) => {
197                    // Stop signal
198                    return;
199                }
200                Err(RecvTimeoutError::Timeout) => {
201                    // Collect incomming data to avoid RX buffer starvation
202                    let _ = self.receive_from_port(None, None);
203                }
204                Ok(request) => match request {
205                    Request::Clear(tx) => {
206                        let result = if self.conn.is_open() {
207                            self.receive_from_port(None, None)
208                        } else {
209                            Ok(())
210                        };
211                        self.buff.clear();
212                        let _ = tx.response.try_send(result);
213                    }
214                    Request::Transmit(tx) => {
215                        let result = self.transmit_to_port(tx.tx_bytes, tx.deadline);
216                        let _ = tx.response.try_send(result);
217                    }
218                    Request::Receive(rx) => {
219                        // Check if we can skip reading from port
220                        if let Some(delimiter) = rx.until {
221                            // If we have all needed data
222                            let colltype = CollectKind::UntilOrNothing(delimiter);
223                            if let Some(data) = self.collect_from_buff(colltype) {
224                                // Return the data immediately
225                                let _ = rx.response.try_send(Ok(Some(data)));
226                                continue;
227                            }
228                        }
229
230                        // Receive all new available data from the port
231                        if let Err(err) = self.receive_from_port(rx.until, rx.deadline) {
232                            // Error when receiving data
233                            let _ = rx.response.try_send(Err(err));
234                            continue;
235                        }
236
237                        // Return collected data
238                        let colltype = match rx.until {
239                            None => CollectKind::Everything,
240                            Some(delimiter) => CollectKind::UntilOrEverything(delimiter),
241                        };
242                        let data = self.collect_from_buff(colltype);
243                        let _ = rx.response.try_send(Ok(data));
244                    }
245                },
246            };
247        }
248    }
249
250    fn receive_from_port(
251        &mut self,
252        until: Option<u8>,
253        deadline: Option<Instant>,
254    ) -> io::Result<()> {
255        let file_mutex = self.conn.open()?;
256        let mut file = file_mutex.lock().unwrap();
257        let result = port_recv(&mut file, &mut self.buff, until, deadline);
258        if result.is_err() {
259            self.conn.close();
260        }
261        result
262    }
263
264    fn transmit_to_port(&mut self, data: Arc<[u8]>, deadline: Instant) -> io::Result<()> {
265        let file_mutex = self.conn.open()?;
266        let mut file = file_mutex.lock().unwrap();
267        let result = port_send(&mut file, &data, &mut self.buff, deadline);
268        if result.is_err() {
269            self.conn.close();
270        }
271        result
272    }
273
274    /// Collect data from the RX FIFO buffer.
275    fn collect_from_buff(&mut self, collect: CollectKind) -> Option<Vec<u8>> {
276        if self.buff.is_empty() {
277            return None;
278        }
279        match collect {
280            CollectKind::Everything => self.collect_from_buff_everything(),
281            CollectKind::UntilOrEverything(delimiter) => {
282                if let Some(pos) = self.buff.iter().position(|x| x == &delimiter) {
283                    self.collect_from_buff_count(pos + 1)
284                } else {
285                    self.collect_from_buff_everything()
286                }
287            }
288            CollectKind::UntilOrNothing(delimiter) => {
289                if let Some(pos) = self.buff.iter().position(|x| x == &delimiter) {
290                    self.collect_from_buff_count(pos + 1)
291                } else {
292                    None
293                }
294            }
295        }
296    }
297
298    /// Collect the given count of elements from the RX FIFO buffer
299    fn collect_from_buff_count(&mut self, count: usize) -> Option<Vec<u8>> {
300        if self.buff.is_empty() {
301            // Return nothing
302            return None;
303        }
304        if self.buff.len() <= count {
305            return self.collect_from_buff_everything();
306        }
307        // Return part of the buffer
308        let mut data = self.buff.split_off(count);
309        mem::swap(&mut self.buff, &mut data);
310        Some(data.into())
311    }
312
313    /// Collect all data from the RX FIFO buffer
314    fn collect_from_buff_everything(&mut self) -> Option<Vec<u8>> {
315        if self.buff.is_empty() {
316            return None;
317        }
318        let mut data = VecDeque::new();
319        mem::swap(&mut self.buff, &mut data);
320        Some(data.into())
321    }
322}
323
324enum CollectKind {
325    /// Consume all data from the buffer
326    Everything,
327    /// Consume all data from the buffer but only until the given byte.
328    /// If the byte is not found then consume the whole buffer.
329    UntilOrEverything(u8),
330    /// Consume data from the buffer but only until the given byte.
331    /// If the byte is not found then do not consume any data from the buffer.
332    UntilOrNothing(u8),
333}