rtcom_core/session.rs
1//! Session orchestrator: bridges a [`SerialDevice`] with the
2//! [`EventBus`].
3//!
4//! [`SerialDevice`]: crate::SerialDevice
5//! [`EventBus`]: crate::EventBus
6//!
7//! At v0.1 a [`Session`] runs a single task that multiplexes the serial
8//! device, the cancellation token, and the bus subscription via
9//! `tokio::select!`:
10//!
11//! - bytes arriving from the device → [`Event::RxBytes`];
12//! - [`Event::TxBytes`] published on the bus → bytes written to the device;
13//! - [`Event::Command`] published on the bus → handler dispatch (Issue #7);
14//! - cancellation token tripped or fatal I/O error → publish
15//! [`Event::DeviceDisconnected`] (when applicable) and exit cleanly.
16//!
17//! The single-task model gives the dispatch handlers exclusive `&mut`
18//! access to the device, which is required for the synchronous control
19//! operations (`set_baud_rate`, `set_dtr`, `send_break`, ...). The
20//! tradeoff is that a long write momentarily delays reads — acceptable
21//! for an interactive serial terminal where writes are short and rare.
22//!
23//! Later issues plug in mappers (Issue #8), logging, scripting, and so
24//! on as additional bus subscribers.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32
33use crate::command::Command;
34use crate::config::{Parity, StopBits};
35use crate::device::SerialDevice;
36use crate::event::{Event, EventBus};
37use crate::mapper::{LineEndingMapper, Mapper};
38
39const fn parity_letter(p: Parity) -> char {
40 match p {
41 Parity::None => 'N',
42 Parity::Even => 'E',
43 Parity::Odd => 'O',
44 Parity::Mark => 'M',
45 Parity::Space => 'S',
46 }
47}
48
49const fn stop_bits_number(s: StopBits) -> u8 {
50 match s {
51 StopBits::One => 1,
52 StopBits::Two => 2,
53 }
54}
55
56/// Read buffer size. 4 KiB matches the typical USB-serial driver burst
57/// granularity; larger buffers waste memory, smaller ones fragment events.
58const READ_BUFFER_BYTES: usize = 4096;
59
60/// Duration of the line break asserted by the `SendBreak` command.
61const SEND_BREAK_DURATION: Duration = Duration::from_millis(250);
62
63/// Static cheatsheet text for `Command::Help`.
64const HELP_TEXT: &str = "commands: ?/h help, q/x quit, c show config, t toggle DTR, \
65 g toggle RTS, b<rate><Enter> set baud, \\ send break";
66
67/// Owns a serial device and a bus, and runs the I/O + command loop.
68///
69/// `Session` is generic over the device type so tests can substitute a
70/// PTY pair (`SerialPortDevice::pair`) or, in the future, a fully mocked
71/// backend without dynamic dispatch overhead.
72pub struct Session<D: SerialDevice + 'static> {
73 device: D,
74 bus: EventBus,
75 cancel: CancellationToken,
76 /// Outbound mapper applied to `Event::TxBytes` payloads before they
77 /// hit the device. Defaults to a no-op `LineEndingMapper::default()`.
78 omap: Box<dyn Mapper>,
79 /// Inbound mapper applied to bytes read from the device before they
80 /// are republished as `Event::RxBytes`. Defaults to no-op.
81 imap: Box<dyn Mapper>,
82 /// Tracked DTR state. Initialised to `true` because `SerialDevice`
83 /// gives no way to query the line, and most backends open with DTR
84 /// asserted; the first toggle therefore deasserts.
85 dtr_asserted: bool,
86 /// Tracked RTS state. See `dtr_asserted` for the rationale.
87 rts_asserted: bool,
88}
89
90impl<D: SerialDevice + 'static> Session<D> {
91 /// Builds a session with a fresh bus and cancellation token,
92 /// no-op mappers on both directions.
93 #[must_use]
94 pub fn new(device: D) -> Self {
95 Self {
96 device,
97 bus: EventBus::default(),
98 cancel: CancellationToken::new(),
99 omap: Box::new(LineEndingMapper::default()),
100 imap: Box::new(LineEndingMapper::default()),
101 dtr_asserted: true,
102 rts_asserted: true,
103 }
104 }
105
106 /// Builds a session attached to a caller-supplied bus. Useful when
107 /// several subsystems already share a bus and the session should join
108 /// the existing fan-out instead of starting its own.
109 #[must_use]
110 pub fn with_bus(device: D, bus: EventBus) -> Self {
111 Self {
112 device,
113 bus,
114 cancel: CancellationToken::new(),
115 omap: Box::new(LineEndingMapper::default()),
116 imap: Box::new(LineEndingMapper::default()),
117 dtr_asserted: true,
118 rts_asserted: true,
119 }
120 }
121
122 /// Replaces the outbound mapper applied to `Event::TxBytes`
123 /// payloads before they reach the device.
124 #[must_use]
125 pub fn with_omap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
126 self.omap = Box::new(mapper);
127 self
128 }
129
130 /// Replaces the inbound mapper applied to bytes read from the
131 /// device before they are republished as `Event::RxBytes`.
132 #[must_use]
133 pub fn with_imap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
134 self.imap = Box::new(mapper);
135 self
136 }
137
138 /// Returns a reference to the bus. Clone it before calling
139 /// [`Session::run`] (which consumes `self`) if you need to publish or
140 /// subscribe from outside the session.
141 #[must_use]
142 pub const fn bus(&self) -> &EventBus {
143 &self.bus
144 }
145
146 /// Returns a clone of the cancellation token.
147 ///
148 /// Triggering [`CancellationToken::cancel`] on any clone causes
149 /// [`Session::run`] to wind down and return.
150 #[must_use]
151 pub fn cancellation_token(&self) -> CancellationToken {
152 self.cancel.clone()
153 }
154
155 /// Drives the session to completion.
156 ///
157 /// Subscribes to the bus, publishes [`Event::DeviceConnected`], then
158 /// loops until the cancellation token trips or a fatal I/O error
159 /// terminates the device.
160 ///
161 /// # Errors
162 ///
163 /// Currently always returns `Ok(())`; the variant is reserved for
164 /// startup failures introduced by later issues (e.g. mapper
165 /// initialisation).
166 pub async fn run(mut self) -> crate::Result<()> {
167 // Subscribe BEFORE publishing so the loop sees nothing it sent
168 // itself, but external pre-existing subscribers still get
169 // DeviceConnected.
170 let mut subscriber = self.bus.subscribe();
171 self.bus.publish(Event::DeviceConnected);
172
173 let mut read_buf = vec![0_u8; READ_BUFFER_BYTES];
174 loop {
175 tokio::select! {
176 biased;
177 () = self.cancel.cancelled() => break,
178
179 res = self.device.read(&mut read_buf) => match res {
180 Ok(0) => {
181 self.bus.publish(Event::DeviceDisconnected {
182 reason: "EOF on serial read".into(),
183 });
184 break;
185 }
186 Ok(n) => {
187 let mapped = self.imap.map(&read_buf[..n]);
188 self.bus.publish(Event::RxBytes(mapped));
189 }
190 Err(err) => {
191 self.bus.publish(Event::DeviceDisconnected {
192 reason: format!("serial read failed: {err}"),
193 });
194 break;
195 }
196 },
197
198 msg = subscriber.recv() => match msg {
199 Ok(Event::TxBytes(bytes)) => {
200 let mapped = self.omap.map(&bytes);
201 if let Err(err) = self.device.write_all(&mapped).await {
202 self.bus.publish(Event::DeviceDisconnected {
203 reason: format!("serial write failed: {err}"),
204 });
205 break;
206 }
207 }
208 Ok(Event::Command(cmd)) => self.dispatch_command(cmd),
209 // Lagged: we missed some events but can resume.
210 // Other event variants are not the loop's concern.
211 Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
212 // Closed: no senders left, nothing more will arrive.
213 Err(broadcast::error::RecvError::Closed) => break,
214 },
215 }
216 }
217 Ok(())
218 }
219
220 /// Apply a [`Command`] to the device and bus.
221 ///
222 /// Commands that mutate the device run synchronously here; success
223 /// emits [`Event::ConfigChanged`] (when applicable), failure emits
224 /// [`Event::Error`]. The caller (the `Session::run` loop) does not
225 /// need to await anything: every operation either completes
226 /// immediately or is fire-and-forget.
227 fn dispatch_command(&mut self, cmd: Command) {
228 match cmd {
229 Command::Quit => self.cancel.cancel(),
230 Command::Help => {
231 self.bus.publish(Event::SystemMessage(HELP_TEXT.into()));
232 }
233 Command::ShowConfig => {
234 let cfg = self.device.config();
235 self.bus.publish(Event::SystemMessage(format!(
236 "config: {} {}{}{} flow={:?}",
237 cfg.baud_rate,
238 cfg.data_bits.bits(),
239 parity_letter(cfg.parity),
240 stop_bits_number(cfg.stop_bits),
241 cfg.flow_control,
242 )));
243 }
244 Command::SetBaud(rate) => match self.device.set_baud_rate(rate) {
245 Ok(()) => {
246 self.bus
247 .publish(Event::ConfigChanged(*self.device.config()));
248 }
249 Err(err) => {
250 self.bus.publish(Event::Error(Arc::new(err)));
251 }
252 },
253 Command::ToggleDtr => {
254 let new_state = !self.dtr_asserted;
255 match self.device.set_dtr(new_state) {
256 Ok(()) => {
257 self.dtr_asserted = new_state;
258 self.bus.publish(Event::SystemMessage(format!(
259 "DTR: {}",
260 if new_state { "asserted" } else { "deasserted" }
261 )));
262 }
263 Err(err) => {
264 self.bus.publish(Event::Error(Arc::new(err)));
265 }
266 }
267 }
268 Command::ToggleRts => {
269 let new_state = !self.rts_asserted;
270 match self.device.set_rts(new_state) {
271 Ok(()) => {
272 self.rts_asserted = new_state;
273 self.bus.publish(Event::SystemMessage(format!(
274 "RTS: {}",
275 if new_state { "asserted" } else { "deasserted" }
276 )));
277 }
278 Err(err) => {
279 self.bus.publish(Event::Error(Arc::new(err)));
280 }
281 }
282 }
283 Command::SendBreak => match self.device.send_break(SEND_BREAK_DURATION) {
284 Ok(()) => {
285 self.bus.publish(Event::SystemMessage(format!(
286 "sent {} ms break",
287 SEND_BREAK_DURATION.as_millis()
288 )));
289 }
290 Err(err) => {
291 self.bus.publish(Event::Error(Arc::new(err)));
292 }
293 },
294 }
295 }
296}