serial_frame/lib.rs
1use serialport::SerialPort;
2
3use log::*;
4use std::thread;
5use std::thread::JoinHandle;
6
7use core::convert::TryFrom;
8use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError};
9use std::io::ErrorKind;
10
11pub mod common_types;
12
13pub use common_types::*;
14
15#[derive(Debug)]
16pub enum SerialFrameError {
17 CouldNotStart,
18 CouldNotSendStop,
19 SerialportDisconnected,
20 SerialThreadPaniced,
21 ReceiverDropped,
22 FailedConversion(Vec<u8>),
23}
24
25pub type Result<T> = core::result::Result<T, SerialFrameError>;
26
27/// Structure which can only be obtained by starting a SerialFrameSender structure, and can only be
28/// used to stop the resulting thread from the SerialFrameSender::start method. When this structure
29/// is dropped, the SerialFrameSender will also stop
30pub struct SerialFrameStopper {
31 handle: JoinHandle<()>,
32 stopsender: Sender<()>,
33}
34
35impl SerialFrameStopper {
36 pub fn stop(self) -> Result<()> {
37 self.stopsender
38 .send(())
39 .map_err(|_e| SerialFrameError::CouldNotSendStop)?;
40 self.handle
41 .join()
42 .map_err(|_e| SerialFrameError::SerialThreadPaniced)?;
43 Ok(())
44 }
45}
46
47/// The frame sender structure, this will create a SerialFrameSender, that once started will split
48/// incoming bytes from the serialport and send them framed by the separator
49///
50/// Ex: "This is one line\nAnd this is another\n"
51///
52/// will return "This is one line\n", and "This is another\n" in two separate vectors over the
53/// channel sent in when starting the thread
54pub struct SerialFrameSender {
55 separator: u8,
56 port: Box<dyn SerialPort>,
57 error_tx: Option<Sender<SerialFrameError>>,
58}
59
60impl SerialFrameSender {
61 pub fn new(separator: u8, port: Box<dyn SerialPort>) -> SerialFrameSender {
62 Self {
63 separator,
64 port,
65 error_tx: None,
66 }
67 }
68
69 pub fn add_error_channel(&mut self) -> Receiver<SerialFrameError> {
70 let (tx, rx) = unbounded();
71 self.error_tx = Some(tx);
72
73 rx
74 }
75
76 /// Consumes the SerialFrameSender and creates a new running thread, that will send complete
77 /// frames over the Channel it takes as input separated by the specified separator. It will
78 /// also try to convert those bytes into a Type that has implemented the TryFrom<Vec<u8>>
79 ///
80 /// Returned is structure that can be used to stop this thread, and thus unblock the serialport
81 /// or an error
82 pub fn start<T: 'static + Send + TryFrom<Vec<u8>>>(
83 mut self,
84 type_send: Sender<T>,
85 ) -> Result<SerialFrameStopper> {
86 let (stoptx, stoprx) = unbounded();
87
88 let handle = thread::Builder::new()
89 .name("SerialFrameSender".to_string())
90 .spawn(move || {
91 let mut buf: Vec<u8> = Vec::new();
92 let mut serial_byte = [0; 10240];
93
94 'thread: loop {
95 // Functionality to close the thread
96 match stoprx.try_recv() {
97 Err(TryRecvError::Empty) => {
98 match self.port.read(&mut serial_byte[..]) {
99 Ok(n) => {
100 buf.extend_from_slice(&serial_byte[..n]);
101 }
102 Err(ref e) if e.kind() == ErrorKind::TimedOut => {
103 trace!("{}", e);
104 }
105 // ends up here if unplugged
106 Err(e) => {
107 error!("{}", e);
108 if let Some(send) = &self.error_tx {
109 let res =
110 send.send(SerialFrameError::SerialportDisconnected);
111 if let Err(e) = res {
112 error!("Could not send error, quitting: {}", e);
113 }
114 }
115 break 'thread;
116 }
117 }
118
119 while let Some(end) = buf.iter().position(|&f| f == self.separator) {
120 trace!("end: {}", end);
121 let frame: Vec<u8> = buf.drain(..end + 1).collect();
122 trace!("frame: {:?}", frame);
123 // Try converting into the correct type and then sending it over
124 // the channel if successful
125 if let Ok(framed) = T::try_from(frame.clone()) {
126 let res = type_send.send(framed);
127 if let Err(e) = res {
128 error!("Could not send frame, quitting: {}", e);
129 if let Some(send) = self.error_tx {
130 let _res = send.send(SerialFrameError::ReceiverDropped);
131 }
132
133 break 'thread;
134 }
135 } else {
136 if let Some(send) = &self.error_tx {
137 let res = send.send(SerialFrameError::FailedConversion(
138 frame.clone(),
139 ));
140 if let Err(e) = res {
141 error!("Could not send frame, quitting: {}", e);
142 break 'thread;
143 }
144 }
145 }
146 }
147 }
148 Err(TryRecvError::Disconnected) => {
149 info!("Thread handle was dropped");
150 if let Some(panic) = &self.error_tx {
151 let _ = panic.send(SerialFrameError::ReceiverDropped);
152 }
153 }
154 _ => {
155 info!("Thread got stop request");
156 break 'thread;
157 }
158 }
159 }
160 });
161
162 let handle = handle.map_err(|_e| SerialFrameError::CouldNotStart)?;
163
164 let stopsend = SerialFrameStopper {
165 handle,
166 stopsender: stoptx,
167 };
168
169 Ok(stopsend)
170 }
171 pub fn stop(&mut self) -> () {}
172}
173
174#[cfg(test)]
175mod tests {
176 #[test]
177 fn it_works() {
178 assert_eq!(2 + 2, 4);
179 }
180}