sericom_core/serial_actor/
mod.rs

1//! This module holds all of the code directly responsible for interacting
2//! with the serial connection and tasks within the program.
3
4use tracing::{Level, span};
5
6pub mod tasks;
7
8/// Represents messages/commands that are sent from worker tasks
9/// to the [`SerialActor`] to process.
10#[non_exhaustive]
11#[derive(Debug)]
12pub enum SerialMessage {
13    /// Instructs the [`SerialActor`] to write bytes (`Vec<u8>`) to the serial connection.
14    Write(Vec<u8>),
15    /// Instructs the [`SerialActor`] to send a 'break' signal over the serial connection.
16    SendBreak,
17    /// Instructs the [`SerialActor`] to shutdown the serial connection.
18    Shutdown,
19}
20
21/// Represents events from the [`SerialActor`] that will be
22/// received and processed by worker tasks accordingly.
23#[non_exhaustive]
24#[derive(Clone, Debug)]
25pub enum SerialEvent {
26    /// Sends data received by the [`SerialActor`] to its tasks.
27    Data(std::sync::Arc<[u8]>),
28    /// Sends the error message received by the [`SerialActor`] to its tasks to handle.
29    Error(String),
30    /// Tells the [`SerialActor`]s tasks that the serial connection has been closed.
31    ConnectionClosed,
32}
33
34/// Responsible for passing data and messages between the serial connection and tasks.
35/// It uses the Actor model to maintain a single source for communicating between the
36/// serial connection and tasks within the program.
37///
38/// It broadcasts [`SerialEvent`]s to worker tasks via a [`tokio::sync::broadcast`]
39/// channel, and receives [`SerialMessage`]s from worker tasks via a [`tokio::sync::mpsc`]
40/// channel.
41pub struct SerialActor {
42    connection: serial2_tokio::SerialPort,
43    command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
44    broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
45}
46
47impl SerialActor {
48    /// Constructs a [`SerialActor`] Takes a serial port connection,
49    /// receiver to a command channel, and a sender to a broadcast channel.
50    pub fn new(
51        connection: serial2_tokio::SerialPort,
52        command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
53        broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
54    ) -> Self {
55        Self {
56            connection,
57            command_rx,
58            broadcast_channel,
59        }
60    }
61
62    /// This is the heart and soul of the [`SerialActor`].
63    /// `sericom` uses the Actor model to receive data from a serial connection
64    /// and forward to other tasks for them to process. It also receives [`SerialEvent`]s
65    /// from tasks and handles them accordingly; writes/sends data to the device
66    /// over the serial connection and closes the connection when receiving
67    /// [`SerialMessage::Shutdown`], ultimately causing the other tasks to shutdown.
68    ///
69    /// Since data is sent byte-by-byte over a serial connection, `run` will
70    /// batch the data before sending it to other tasks to reduce the number of syscalls.
71    pub async fn run(mut self) {
72        let span = span!(Level::TRACE, "SerialActor");
73        let _enter = span.enter();
74        let mut buffer = vec![0u8; 4096];
75        loop {
76            tokio::select! {
77                // Handle commands/input from tasks
78                cmd = self.command_rx.recv() => {
79                    match cmd {
80                        Some(SerialMessage::Write(data)) => {
81                            if let Err(e) = self.connection.write_all(&data).await {
82                                self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
83                            }
84                        }
85                        Some(SerialMessage::Shutdown) => {
86                            self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
87                        }
88                        Some(SerialMessage::SendBreak) => {
89                            self.send_break().await;
90                        }
91                        None => break,
92                    }
93                }
94                // Handle reading data from serial connection
95                read_result = self.connection.read(&mut buffer) => {
96                    // tracing::event!(Level::TRACE, "Data read");
97                    match read_result {
98                        Ok(0) => {
99                            self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
100                            break;
101                        }
102                        Ok(n) => {
103                            let data: std::sync::Arc<[u8]> = buffer[..n].into();
104                            self.broadcast_channel.send(SerialEvent::Data(data)).ok();
105                        }
106                        Err(e) => {
107                            self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
108                            break;
109                        }
110                    }
111                }
112            }
113        }
114    }
115
116    async fn send_break(&mut self) {
117        use tokio::time::{Duration, sleep};
118        let _ = self.connection.set_break(true);
119        sleep(Duration::from_millis(500)).await;
120        let _ = self.connection.set_break(false);
121    }
122}