serial_frame/
lib.rs

1use serialport::SerialPort;
2
3use log::*;
4use std::thread;
5use std::thread::JoinHandle;
6
7use core::convert::TryFrom;
8use crossbeam_channel::{unbounded, Receiver, Sender, TryRecvError};
9use std::io::ErrorKind;
10
11pub mod common_types;
12
13pub use common_types::*;
14
15#[derive(Debug)]
16pub enum SerialFrameError {
17    CouldNotStart,
18    CouldNotSendStop,
19    SerialportDisconnected,
20    SerialThreadPaniced,
21    ReceiverDropped,
22    FailedConversion(Vec<u8>),
23}
24
25pub type Result<T> = core::result::Result<T, SerialFrameError>;
26
27/// Structure which can only be obtained by starting a SerialFrameSender structure, and can only be
28/// used to stop the resulting thread from the SerialFrameSender::start method. When this structure
29/// is dropped, the SerialFrameSender will also stop
30pub struct SerialFrameStopper {
31    handle: JoinHandle<()>,
32    stopsender: Sender<()>,
33}
34
35impl SerialFrameStopper {
36    pub fn stop(self) -> Result<()> {
37        self.stopsender
38            .send(())
39            .map_err(|_e| SerialFrameError::CouldNotSendStop)?;
40        self.handle
41            .join()
42            .map_err(|_e| SerialFrameError::SerialThreadPaniced)?;
43        Ok(())
44    }
45}
46
47/// The frame sender structure, this will create a SerialFrameSender, that once started will split
48/// incoming bytes from the serialport and send them framed by the separator
49///
50/// Ex: "This is one line\nAnd this is another\n"
51///
52/// will return "This is one line\n", and "This is another\n" in two separate vectors over the
53/// channel sent in when starting the thread
54pub struct SerialFrameSender {
55    separator: u8,
56    port: Box<dyn SerialPort>,
57    error_tx: Option<Sender<SerialFrameError>>,
58}
59
60impl SerialFrameSender {
61    pub fn new(separator: u8, port: Box<dyn SerialPort>) -> SerialFrameSender {
62        Self {
63            separator,
64            port,
65            error_tx: None,
66        }
67    }
68
69    pub fn add_error_channel(&mut self) -> Receiver<SerialFrameError> {
70        let (tx, rx) = unbounded();
71        self.error_tx = Some(tx);
72
73        rx
74    }
75
76    /// Consumes the SerialFrameSender and creates a new running thread, that will send complete
77    /// frames over the Channel it takes as input separated by the specified separator. It will
78    /// also try to convert those bytes into a Type that has implemented the TryFrom<Vec<u8>>
79    ///
80    /// Returned is structure that can be used to stop this thread, and thus unblock the serialport
81    /// or an error
82    pub fn start<T: 'static + Send + TryFrom<Vec<u8>>>(
83        mut self,
84        type_send: Sender<T>,
85    ) -> Result<SerialFrameStopper> {
86        let (stoptx, stoprx) = unbounded();
87
88        let handle = thread::Builder::new()
89            .name("SerialFrameSender".to_string())
90            .spawn(move || {
91                let mut buf: Vec<u8> = Vec::new();
92                let mut serial_byte = [0; 10240];
93
94                'thread: loop {
95                    // Functionality to close the thread
96                    match stoprx.try_recv() {
97                        Err(TryRecvError::Empty) => {
98                            match self.port.read(&mut serial_byte[..]) {
99                                Ok(n) => {
100                                    buf.extend_from_slice(&serial_byte[..n]);
101                                }
102                                Err(ref e) if e.kind() == ErrorKind::TimedOut => {
103                                    trace!("{}", e);
104                                }
105                                // ends up here if unplugged
106                                Err(e) => {
107                                    error!("{}", e);
108                                    if let Some(send) = &self.error_tx {
109                                        let res =
110                                            send.send(SerialFrameError::SerialportDisconnected);
111                                        if let Err(e) = res {
112                                            error!("Could not send error, quitting: {}", e);
113                                        }
114                                    }
115                                    break 'thread;
116                                }
117                            }
118
119                            while let Some(end) = buf.iter().position(|&f| f == self.separator) {
120                                trace!("end: {}", end);
121                                let frame: Vec<u8> = buf.drain(..end + 1).collect();
122                                trace!("frame: {:?}", frame);
123                                // Try converting into the correct type and then sending it over
124                                // the channel if successful
125                                if let Ok(framed) = T::try_from(frame.clone()) {
126                                    let res = type_send.send(framed);
127                                    if let Err(e) = res {
128                                        error!("Could not send frame, quitting: {}", e);
129                                        if let Some(send) = self.error_tx {
130                                            let _res = send.send(SerialFrameError::ReceiverDropped);
131                                        }
132
133                                        break 'thread;
134                                    }
135                                } else {
136                                    if let Some(send) = &self.error_tx {
137                                        let res = send.send(SerialFrameError::FailedConversion(
138                                            frame.clone(),
139                                        ));
140                                        if let Err(e) = res {
141                                            error!("Could not send frame, quitting: {}", e);
142                                            break 'thread;
143                                        }
144                                    }
145                                }
146                            }
147                        }
148                        Err(TryRecvError::Disconnected) => {
149                            info!("Thread handle was dropped");
150                            if let Some(panic) = &self.error_tx {
151                                let _ = panic.send(SerialFrameError::ReceiverDropped);
152                            }
153                        }
154                        _ => {
155                            info!("Thread got stop request");
156                            break 'thread;
157                        }
158                    }
159                }
160            });
161
162        let handle = handle.map_err(|_e| SerialFrameError::CouldNotStart)?;
163
164        let stopsend = SerialFrameStopper {
165            handle,
166            stopsender: stoptx,
167        };
168
169        Ok(stopsend)
170    }
171    pub fn stop(&mut self) -> () {}
172}
173
174#[cfg(test)]
175mod tests {
176    #[test]
177    fn it_works() {
178        assert_eq!(2 + 2, 4);
179    }
180}