sericom_core/serial_actor/mod.rs
1//! This module holds all of the code directly responsible for interacting
2//! with the serial connection and tasks within the program.
3
4pub mod tasks;
5
6/// Represents messages/commands that are sent from worker tasks
7/// to the [`SerialActor`] to process.
8#[non_exhaustive]
9#[derive(Debug)]
10pub enum SerialMessage {
11 /// Instructs the [`SerialActor`] to write bytes (`Vec<u8>`) to the serial connection.
12 Write(Vec<u8>),
13 /// Instructs the [`SerialActor`] to send a 'break' signal over the serial connection.
14 SendBreak,
15 /// Instructs the [`SerialActor`] to shutdown the serial connection.
16 Shutdown,
17}
18
19/// Represents events from the [`SerialActor`] that will be
20/// received and processed by worker tasks accordingly.
21#[non_exhaustive]
22#[derive(Clone, Debug)]
23pub enum SerialEvent {
24 /// Sends data received by the [`SerialActor`] to its tasks.
25 Data(std::sync::Arc<[u8]>),
26 /// Sends the error message received by the [`SerialActor`] to its tasks to handle.
27 Error(String),
28 /// Tells the [`SerialActor`]s tasks that the serial connection has been closed.
29 ConnectionClosed,
30}
31
32/// Responsible for passing data and messages between the serial connection and tasks.
33/// It uses the Actor model to maintain a single source for communicating between the
34/// serial connection and tasks within the program.
35///
36/// It broadcasts [`SerialEvent`]s to worker tasks via a [`tokio::sync::broadcast`]
37/// channel, and receives [`SerialMessage`]s from worker tasks via a [`tokio::sync::mpsc`]
38/// channel.
39pub struct SerialActor {
40 connection: serial2_tokio::SerialPort,
41 command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
42 broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
43}
44
45impl SerialActor {
46 /// Constructs a [`SerialActor`] Takes a serial port connection,
47 /// receiver to a command channel, and a sender to a broadcast channel.
48 pub fn new(
49 connection: serial2_tokio::SerialPort,
50 command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
51 broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
52 ) -> Self {
53 Self {
54 connection,
55 command_rx,
56 broadcast_channel,
57 }
58 }
59
60 /// This is the heart and soul of the [`SerialActor`].
61 /// `sericom` uses the Actor model to receive data from a serial connection
62 /// and forward to other tasks for them to process. It also receives [`SerialEvent`]s
63 /// from tasks and handles them accordingly; writes/sends data to the device
64 /// over the serial connection and closes the connection when receiving
65 /// [`SerialMessage::Shutdown`], ultimately causing the other tasks to shutdown.
66 ///
67 /// Since data is sent byte-by-byte over a serial connection, `run` will
68 /// batch the data before sending it to other tasks to reduce the number of syscalls.
69 pub async fn run(mut self) {
70 let mut buffer = vec![0u8; 4096];
71 loop {
72 tokio::select! {
73 // Handle commands/input from tasks
74 cmd = self.command_rx.recv() => {
75 match cmd {
76 Some(SerialMessage::Write(data)) => {
77 if let Err(e) = self.connection.write_all(&data).await {
78 self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
79 }
80 }
81 Some(SerialMessage::Shutdown) => {
82 self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
83 }
84 Some(SerialMessage::SendBreak) => {
85 self.send_break().await;
86 }
87 None => break,
88 }
89 }
90 // Handle reading data from serial connection
91 read_result = self.connection.read(&mut buffer) => {
92 match read_result {
93 Ok(0) => {
94 self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
95 break;
96 }
97 Ok(n) => {
98 let data: std::sync::Arc<[u8]> = buffer[..n].into();
99 self.broadcast_channel.send(SerialEvent::Data(data)).ok();
100 }
101 Err(e) => {
102 self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
103 break;
104 }
105 }
106 }
107 }
108 }
109 }
110
111 async fn send_break(&mut self) {
112 use tokio::time::{Duration, sleep};
113 let _ = self.connection.set_break(true);
114 sleep(Duration::from_millis(500)).await;
115 let _ = self.connection.set_break(false);
116 }
117}