flem_serial_rs/
lib.rs

1use flem::{
2    Status,
3    traits::Channel, 
4    Packet,
5};
6use serialport::SerialPort;
7use std::{
8    sync::{
9        mpsc::{self, Receiver, Sender},
10        Arc, Mutex,
11    },
12    thread,
13    thread::JoinHandle,
14    time::Duration,
15};
16
17type FlemSerialPort = Box<dyn SerialPort>;
18type FlemSerialTx = Option<Arc<Mutex<FlemSerialPort>>>;
19
20pub struct FlemSerial<const PACKET_SIZE: usize> {
21    tx_port: FlemSerialTx,
22    continue_listening: Arc<Mutex<bool>>,
23    rx_listener_handle: Option<JoinHandle<()>>,
24    tx_listener_handle: Option<JoinHandle<()>>,
25    connection_settings: ConnectionSettings,
26}
27#[derive(Debug)]
28pub enum FlemSerialErrors {
29    NoDeviceFoundByThatName,
30    MultipleDevicesFoundByThatName,
31    ErrorConnectingToDevice,
32}
33
34#[derive(Clone, Copy, Debug)]
35pub struct ConnectionSettings {
36    baud: u32,
37    parity: serialport::Parity,
38    flow_control: serialport::FlowControl,
39    data_bits: serialport::DataBits,
40    stop_bits: serialport::StopBits,
41}
42
43impl ConnectionSettings {
44    pub fn default() -> Self {
45        Self {
46            baud: 115200,
47            parity: serialport::Parity::None,
48            flow_control: serialport::FlowControl::None,
49            data_bits: serialport::DataBits::Eight,
50            stop_bits: serialport::StopBits::One,
51        }
52    }
53
54    /// Set baud rate
55    pub fn baud(&mut self, baud: u32) -> &mut Self {
56        self.baud = baud;
57        self
58    }
59
60    /// Set parity
61    pub fn parity(&mut self, parity: serialport::Parity) -> &mut Self {
62        self.parity = parity;
63        self
64    }
65
66    /// Set flow control
67    pub fn flow_control(&mut self, flow_control: serialport::FlowControl) -> &mut Self {
68        self.flow_control = flow_control;
69        self
70    }
71
72    /// Set data bits
73    pub fn data_bits(&mut self, data_bits: serialport::DataBits) -> &mut Self {
74        self.data_bits = data_bits;
75        self
76    }
77
78    /// Set stop bits
79    pub fn stop_bits(&mut self, stop_bits: serialport::StopBits) -> &mut Self {
80        self.stop_bits = stop_bits;
81        self
82    }
83}
84
85impl<const PACKET_SIZE: usize> FlemSerial<PACKET_SIZE> {
86    pub fn new() -> Self {
87        Self {
88            tx_port: None,
89            continue_listening: Arc::new(Mutex::new(false)),
90            rx_listener_handle: None,
91            tx_listener_handle: None,
92            connection_settings: ConnectionSettings::default(),
93        }
94    }
95
96    pub fn update_connection_settings(&mut self, connection_settings: ConnectionSettings) {
97        self.connection_settings = connection_settings;
98    }
99}
100
101impl<const PACKET_SIZE: usize> Channel<PACKET_SIZE> for FlemSerial<PACKET_SIZE> {
102    type Error = FlemSerialErrors;
103
104    /// Lists the ports detected by the SerialPort library. Returns None if
105    /// no serial ports are detected.
106    fn list_devices(&self) -> Vec<String> {
107        let mut vec_ports = Vec::new();
108
109        let ports = serialport::available_ports();
110
111        match ports {
112            Ok(valid_ports) => {
113                for port in valid_ports {
114                    println!("Found serial port: {}", port.port_name);
115                    vec_ports.push(port.port_name);
116                }
117            }
118            Err(_error) => {
119
120            }
121        }
122
123        vec_ports
124    }
125
126    /// Attempts to connect to a serial port with a set baud.
127    ///
128    /// Configures the serial port with the following settings:
129    /// * FlowControl: None
130    /// * Parity: None
131    /// * DataBits: 8
132    /// * StopBits: 1
133    fn connect(&mut self, port_name: &String) -> Result<(), Self::Error> {
134        let ports = serialport::available_ports().unwrap();
135
136        let filtered_ports: Vec<_> = ports
137            .iter()
138            .filter(|port| port.port_name == *port_name)
139            .collect();
140
141        match filtered_ports.len() {
142            0 => {
143                println!("No serial devices enumerated");
144                Err(FlemSerialErrors::NoDeviceFoundByThatName)
145            }
146            1 => {
147                println!("Found {}, attempting to connect...", port_name);
148                if let Ok(port) = serialport::new(port_name, self.connection_settings.baud)
149                    .flow_control(self.connection_settings.flow_control)
150                    .parity(self.connection_settings.parity)
151                    .data_bits(self.connection_settings.data_bits)
152                    .stop_bits(self.connection_settings.stop_bits)
153                    .open()
154                {
155                    self.tx_port = Some(Arc::new(Mutex::new(
156                        port.try_clone()
157                            .expect("Couldn't clone serial port for tx_port"),
158                    )));
159
160                    println!("Connection successful to {}", port_name);
161
162                    return Ok(());
163                } else {
164                    println!("Connection failed to {}", port_name);
165                    return Err(FlemSerialErrors::ErrorConnectingToDevice);
166                }
167            }
168            _ => {
169                println!(
170                    "Connection failed to {}, multiple devices by that name found",
171                    port_name
172                );
173                Err(FlemSerialErrors::MultipleDevicesFoundByThatName)
174            }
175        }
176    }
177
178    fn disconnect(&mut self) -> Result<(), Self::Error> {
179        self.unlisten().unwrap();
180
181        self.tx_port = None;
182
183        Ok(())
184    }
185
186    /// Spawns a new thread and listens for data on. Creates 2 threads, 1 to monitor incoming Rx bytes and 1 to monitor
187    /// packets that have been queued up to transmit.
188    ///
189    /// ## Arguments
190    /// * `rx_sleep_time_ms` - The amount of time to sleep between reads if there is no data present.
191    /// * `tx_sleep_time_ms` - The amount of time to sleep between checking the packet queue if no tx packets are present.
192    ///
193    /// ## Returns a tuple of (FlemRx, FlemTx):
194    /// * `FlemRx` - A struct that contains a `Receiver` queue of successfully checksumed packets and a handle to the Rx monitoring thread.
195    /// * `FlemTx` - A struct that contains a `Sender` that can be used to send packets and a handle to the Tx monitoring thread.
196    fn listen(
197        &mut self,
198        rx_sleep_time_ms: u64,
199        tx_sleep_time_ms: u64,
200    ) -> (Sender<Packet<PACKET_SIZE>>, Receiver<Packet<PACKET_SIZE>>) {
201        // Reset the continue_listening flag
202        *self.continue_listening.lock().unwrap() = true;
203
204        // Clone the continue_listening flag
205        let continue_listening_clone_rx = self.continue_listening.clone();
206        let continue_listening_clone_tx = self.continue_listening.clone();
207
208        // Create producer / consumer queues
209        let (tx_packet_from_program, packet_to_transmit) = mpsc::channel::<flem::Packet<PACKET_SIZE>>();
210        let (validated_packet, rx_packet_to_program) = mpsc::channel::<flem::Packet<PACKET_SIZE>>();
211
212        let mut local_rx_port = self
213            .tx_port
214            .as_mut()
215            .unwrap()
216            .lock()
217            .unwrap()
218            .try_clone()
219            .expect("Couldn't clone serial port for rx_port");
220
221        let mut local_tx_port = self
222            .tx_port
223            .as_mut()
224            .unwrap()
225            .lock()
226            .unwrap()
227            .try_clone()
228            .expect("Couldn't clone serial port for tx_port");
229
230        self.tx_listener_handle = Some(thread::spawn(move || {
231            println!("Starting Tx thread");
232            while *continue_listening_clone_tx.lock().unwrap() {
233                // Using a timeout so we can stop the thread when needed
234                match packet_to_transmit.recv_timeout(Duration::from_millis(tx_sleep_time_ms)) {
235                    Ok(packet) => {
236                        if let Ok(_) = local_tx_port.write_all(&packet.bytes()) {
237                            local_tx_port.flush().unwrap();
238                        }
239                    }
240                    Err(_error) => {
241                        // Timeouts are ok, just keep checking the Tx queue
242                    }
243                }
244            }
245
246            println!("Tx thread stopped");
247            *continue_listening_clone_tx.lock().unwrap() = false;
248        }));
249
250        self.rx_listener_handle = Some(thread::spawn(move || {
251            println!("Starting Rx thread");
252
253            let mut rx_buffer = [0 as u8; 2048];
254            let mut rx_packet = flem::Packet::<PACKET_SIZE>::new();
255
256            while *continue_listening_clone_rx.lock().unwrap() {
257                match local_rx_port.read(&mut rx_buffer) {
258                    Ok(bytes_to_read) => {
259                        // Check if there are any bytes, if there are no bytes,
260                        // put the thread to sleep
261                        if bytes_to_read == 0 {
262                            thread::sleep(Duration::from_millis(10));
263                        } else {
264                            for i in 0..bytes_to_read {
265                                match rx_packet.construct(rx_buffer[i]) {
266                                    Ok(_) => {
267                                        validated_packet.send(rx_packet.clone()).unwrap();
268                                        rx_packet.reset_lazy();
269                                    }
270                                    Err(error) => {
271                                        match error {
272                                            Status::PacketBuilding => {
273                                                // Normal, building packet DO NOT RESET!
274                                            }
275                                            Status::HeaderBytesNotFound => {
276                                                // Not unusual, keep scanning until packet lock occurs
277                                            }
278                                            Status::ChecksumError => {
279                                                println!("FLEM checksum error detected");
280                                                rx_packet.reset_lazy();
281                                            }
282                                            _ => {
283                                                println!("FLEM error detected: {:?}", error);
284                                                rx_packet.reset_lazy();
285                                            }
286                                        }
287                                    }
288                                }
289                            }
290                        }
291                    }
292                    Err(_error) => {
293                        // Library indicates to retry on errors, so that is
294                        // what we will do.
295                        thread::sleep(Duration::from_millis(rx_sleep_time_ms));
296                    }
297                }
298            }
299
300            println!("Rx thread stopped");
301            *continue_listening_clone_rx.lock().unwrap() = false;
302        }));
303
304        (
305            tx_packet_from_program,
306            rx_packet_to_program
307        )
308    }
309
310    /// Attempts to close down the Rx and Tx threads.
311    fn unlisten(&mut self) -> Result<(), Self::Error>{
312        *self.continue_listening.lock().unwrap() = false;
313
314        if self.tx_listener_handle.is_some() {
315            self.tx_listener_handle
316                .take()
317                .unwrap()
318                .join()
319                .expect("Couldn't join on the Tx thread");
320        }
321
322        if self.rx_listener_handle.is_some() {
323            self.rx_listener_handle
324                .take()
325                .unwrap()
326                .join()
327                .expect("Couldn't join on Rx thread");
328        }
329
330        Ok(())
331    }
332}