sericom_core/serial_actor/
mod.rs

1//! This module holds all of the code directly responsible for interacting
2//! with the serial connection and tasks within the program.
3
4pub mod tasks;
5
/// Commands that worker tasks send to the [`SerialActor`] for processing.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it. It can be cloned and compared by value,
/// which makes it straightforward to queue, duplicate, and assert on commands.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SerialMessage {
    /// Instructs the [`SerialActor`] to write the given bytes (`Vec<u8>`) to the serial connection.
    Write(Vec<u8>),
    /// Instructs the [`SerialActor`] to send a 'break' signal over the serial connection.
    SendBreak,
    /// Instructs the [`SerialActor`] to shut down the serial connection.
    Shutdown,
}
18
/// Events emitted by the [`SerialActor`] that worker tasks receive and
/// process accordingly.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it. Cloning is cheap for [`SerialEvent::Data`]
/// because the payload is reference-counted; events compare by value.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SerialEvent {
    /// Carries data received by the [`SerialActor`] to its tasks.
    Data(std::sync::Arc<[u8]>),
    /// Carries the error message received by the [`SerialActor`] to its tasks to handle.
    Error(String),
    /// Tells the [`SerialActor`]'s tasks that the serial connection has been closed.
    ConnectionClosed,
}
31
32/// Responsible for passing data and messages between the serial connection and tasks.
33/// It uses the Actor model to maintain a single source for communicating between the
34/// serial connection and tasks within the program.
35///
36/// It broadcasts [`SerialEvent`]s to worker tasks via a [`tokio::sync::broadcast`]
37/// channel, and receives [`SerialMessage`]s from worker tasks via a [`tokio::sync::mpsc`]
38/// channel.
39pub struct SerialActor {
40    connection: serial2_tokio::SerialPort,
41    command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
42    broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
43}
44
45impl SerialActor {
46    /// Constructs a [`SerialActor`] Takes a serial port connection,
47    /// receiver to a command channel, and a sender to a broadcast channel.
48    pub fn new(
49        connection: serial2_tokio::SerialPort,
50        command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
51        broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
52    ) -> Self {
53        Self {
54            connection,
55            command_rx,
56            broadcast_channel,
57        }
58    }
59
60    /// This is the heart and soul of the [`SerialActor`].
61    /// `sericom` uses the Actor model to receive data from a serial connection
62    /// and forward to other tasks for them to process. It also receives [`SerialEvent`]s
63    /// from tasks and handles them accordingly; writes/sends data to the device
64    /// over the serial connection and closes the connection when receiving
65    /// [`SerialMessage::Shutdown`], ultimately causing the other tasks to shutdown.
66    ///
67    /// Since data is sent byte-by-byte over a serial connection, `run` will
68    /// batch the data before sending it to other tasks to reduce the number of syscalls.
69    pub async fn run(mut self) {
70        let mut buffer = vec![0u8; 4096];
71        loop {
72            tokio::select! {
73                // Handle commands/input from tasks
74                cmd = self.command_rx.recv() => {
75                    match cmd {
76                        Some(SerialMessage::Write(data)) => {
77                            if let Err(e) = self.connection.write_all(&data).await {
78                                self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
79                            }
80                        }
81                        Some(SerialMessage::Shutdown) => {
82                            self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
83                        }
84                        Some(SerialMessage::SendBreak) => {
85                            self.send_break().await;
86                        }
87                        None => break,
88                    }
89                }
90                // Handle reading data from serial connection
91                read_result = self.connection.read(&mut buffer) => {
92                    match read_result {
93                        Ok(0) => {
94                            self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
95                            break;
96                        }
97                        Ok(n) => {
98                            let data: std::sync::Arc<[u8]> = buffer[..n].into();
99                            self.broadcast_channel.send(SerialEvent::Data(data)).ok();
100                        }
101                        Err(e) => {
102                            self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
103                            break;
104                        }
105                    }
106                }
107            }
108        }
109    }
110
111    async fn send_break(&mut self) {
112        use tokio::time::{Duration, sleep};
113        let _ = self.connection.set_break(true);
114        sleep(Duration::from_millis(500)).await;
115        let _ = self.connection.set_break(false);
116    }
117}