sericom_core/serial_actor/mod.rs
//! This module holds all of the code directly responsible for interacting
//! with the serial connection and with the tasks within the program.
3
4use tracing::{Level, span};
5
6pub mod tasks;
7
/// Commands that worker tasks submit to the [`SerialActor`], which
/// processes them one at a time.
#[derive(Debug)]
#[non_exhaustive]
pub enum SerialMessage {
    /// Ask the [`SerialActor`] to write the contained bytes (`Vec<u8>`) to the
    /// serial connection.
    Write(Vec<u8>),
    /// Ask the [`SerialActor`] to send a 'break' signal over the serial connection.
    SendBreak,
    /// Ask the [`SerialActor`] to shut down the serial connection.
    Shutdown,
}
20
/// Events published by the [`SerialActor`] for worker tasks to receive
/// and act on.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum SerialEvent {
    /// Bytes the [`SerialActor`] read from the serial connection. The payload is
    /// reference-counted, so cloning the event does not copy the bytes.
    Data(std::sync::Arc<[u8]>),
    /// The message of an error the [`SerialActor`] ran into, forwarded to its
    /// tasks to handle.
    Error(String),
    /// Tells the [`SerialActor`]'s tasks that the serial connection has been closed.
    ConnectionClosed,
}
33
34/// Responsible for passing data and messages between the serial connection and tasks.
35/// It uses the Actor model to maintain a single source for communicating between the
36/// serial connection and tasks within the program.
37///
38/// It broadcasts [`SerialEvent`]s to worker tasks via a [`tokio::sync::broadcast`]
39/// channel, and receives [`SerialMessage`]s from worker tasks via a [`tokio::sync::mpsc`]
40/// channel.
41pub struct SerialActor {
42 connection: serial2_tokio::SerialPort,
43 command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
44 broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
45}
46
47impl SerialActor {
48 /// Constructs a [`SerialActor`] Takes a serial port connection,
49 /// receiver to a command channel, and a sender to a broadcast channel.
50 pub fn new(
51 connection: serial2_tokio::SerialPort,
52 command_rx: tokio::sync::mpsc::Receiver<SerialMessage>,
53 broadcast_channel: tokio::sync::broadcast::Sender<SerialEvent>,
54 ) -> Self {
55 Self {
56 connection,
57 command_rx,
58 broadcast_channel,
59 }
60 }
61
62 /// This is the heart and soul of the [`SerialActor`].
63 /// `sericom` uses the Actor model to receive data from a serial connection
64 /// and forward to other tasks for them to process. It also receives [`SerialEvent`]s
65 /// from tasks and handles them accordingly; writes/sends data to the device
66 /// over the serial connection and closes the connection when receiving
67 /// [`SerialMessage::Shutdown`], ultimately causing the other tasks to shutdown.
68 ///
69 /// Since data is sent byte-by-byte over a serial connection, `run` will
70 /// batch the data before sending it to other tasks to reduce the number of syscalls.
71 pub async fn run(mut self) {
72 let span = span!(Level::TRACE, "SerialActor");
73 let _enter = span.enter();
74 let mut buffer = vec![0u8; 4096];
75 loop {
76 tokio::select! {
77 // Handle commands/input from tasks
78 cmd = self.command_rx.recv() => {
79 match cmd {
80 Some(SerialMessage::Write(data)) => {
81 if let Err(e) = self.connection.write_all(&data).await {
82 self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
83 }
84 }
85 Some(SerialMessage::Shutdown) => {
86 self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
87 }
88 Some(SerialMessage::SendBreak) => {
89 self.send_break().await;
90 }
91 None => break,
92 }
93 }
94 // Handle reading data from serial connection
95 read_result = self.connection.read(&mut buffer) => {
96 // tracing::event!(Level::TRACE, "Data read");
97 match read_result {
98 Ok(0) => {
99 self.broadcast_channel.send(SerialEvent::ConnectionClosed).ok();
100 break;
101 }
102 Ok(n) => {
103 let data: std::sync::Arc<[u8]> = buffer[..n].into();
104 self.broadcast_channel.send(SerialEvent::Data(data)).ok();
105 }
106 Err(e) => {
107 self.broadcast_channel.send(SerialEvent::Error(e.to_string())).ok();
108 break;
109 }
110 }
111 }
112 }
113 }
114 }
115
116 async fn send_break(&mut self) {
117 use tokio::time::{Duration, sleep};
118 let _ = self.connection.set_break(true);
119 sleep(Duration::from_millis(500)).await;
120 let _ = self.connection.set_break(false);
121 }
122}