rtcom_core/session.rs
1//! Session orchestrator: bridges a [`SerialDevice`] with the
2//! [`EventBus`].
3//!
4//! [`SerialDevice`]: crate::SerialDevice
5//! [`EventBus`]: crate::EventBus
6//!
7//! At v0.1 a [`Session`] runs a single task that multiplexes the serial
8//! device, the cancellation token, and the bus subscription via
9//! `tokio::select!`:
10//!
11//! - bytes arriving from the device → [`Event::RxBytes`];
12//! - [`Event::TxBytes`] published on the bus → bytes written to the device;
13//! - [`Event::Command`] published on the bus → handler dispatch (Issue #7);
14//! - cancellation token tripped or fatal I/O error → publish
15//! [`Event::DeviceDisconnected`] (when applicable) and exit cleanly.
16//!
17//! The single-task model gives the dispatch handlers exclusive `&mut`
18//! access to the device, which is required for the synchronous control
19//! operations (`set_baud_rate`, `set_dtr`, `send_break`, ...). The
20//! tradeoff is that a long write momentarily delays reads — acceptable
21//! for an interactive serial terminal where writes are short and rare.
22//!
23//! Later issues plug in mappers (Issue #8), logging, scripting, and so
24//! on as additional bus subscribers.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32
33use crate::command::Command;
34use crate::config::{Parity, StopBits};
35use crate::device::SerialDevice;
36use crate::event::{Event, EventBus};
37use crate::mapper::{LineEndingMapper, Mapper};
38
39const fn parity_letter(p: Parity) -> char {
40 match p {
41 Parity::None => 'N',
42 Parity::Even => 'E',
43 Parity::Odd => 'O',
44 Parity::Mark => 'M',
45 Parity::Space => 'S',
46 }
47}
48
49const fn stop_bits_number(s: StopBits) -> u8 {
50 match s {
51 StopBits::One => 1,
52 StopBits::Two => 2,
53 }
54}
55
56/// Read buffer size. 4 KiB matches the typical USB-serial driver burst
57/// granularity; larger buffers waste memory, smaller ones fragment events.
58const READ_BUFFER_BYTES: usize = 4096;
59
60/// Duration of the line break asserted by the `SendBreak` command.
61const SEND_BREAK_DURATION: Duration = Duration::from_millis(250);
62
63/// Static cheatsheet text for `Command::Help`.
64const HELP_TEXT: &str = "commands: ?/h help, q/x quit, c show config, t toggle DTR, \
65 g toggle RTS, b<rate><Enter> set baud, \\ send break";
66
67/// Owns a serial device and a bus, and runs the I/O + command loop.
68///
69/// `Session` is generic over the device type so tests can substitute a
70/// PTY pair (`SerialPortDevice::pair`) or, in the future, a fully mocked
71/// backend without dynamic dispatch overhead.
72pub struct Session<D: SerialDevice + 'static> {
73 device: D,
74 bus: EventBus,
75 cancel: CancellationToken,
76 /// Outbound mapper applied to `Event::TxBytes` payloads before they
77 /// hit the device. Defaults to a no-op `LineEndingMapper::default()`.
78 omap: Box<dyn Mapper>,
79 /// Inbound mapper applied to bytes read from the device before they
80 /// are republished as `Event::RxBytes`. Defaults to no-op.
81 imap: Box<dyn Mapper>,
82 /// Tracked DTR state. Initialised to `true` because `SerialDevice`
83 /// gives no way to query the line, and most backends open with DTR
84 /// asserted; the first toggle therefore deasserts.
85 dtr_asserted: bool,
86 /// Tracked RTS state. See `dtr_asserted` for the rationale.
87 rts_asserted: bool,
88}
89
90impl<D: SerialDevice + 'static> Session<D> {
91 /// Builds a session with a fresh bus and cancellation token,
92 /// no-op mappers on both directions.
93 #[must_use]
94 pub fn new(device: D) -> Self {
95 Self {
96 device,
97 bus: EventBus::default(),
98 cancel: CancellationToken::new(),
99 omap: Box::new(LineEndingMapper::default()),
100 imap: Box::new(LineEndingMapper::default()),
101 dtr_asserted: true,
102 rts_asserted: true,
103 }
104 }
105
106 /// Builds a session attached to a caller-supplied bus. Useful when
107 /// several subsystems already share a bus and the session should join
108 /// the existing fan-out instead of starting its own.
109 #[must_use]
110 pub fn with_bus(device: D, bus: EventBus) -> Self {
111 Self {
112 device,
113 bus,
114 cancel: CancellationToken::new(),
115 omap: Box::new(LineEndingMapper::default()),
116 imap: Box::new(LineEndingMapper::default()),
117 dtr_asserted: true,
118 rts_asserted: true,
119 }
120 }
121
122 /// Replaces the outbound mapper applied to `Event::TxBytes`
123 /// payloads before they reach the device.
124 #[must_use]
125 pub fn with_omap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
126 self.omap = Box::new(mapper);
127 self
128 }
129
130 /// Replaces the inbound mapper applied to bytes read from the
131 /// device before they are republished as `Event::RxBytes`.
132 #[must_use]
133 pub fn with_imap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
134 self.imap = Box::new(mapper);
135 self
136 }
137
138 /// Tells the session what the DTR line's actual state is on the
139 /// device. Use this when the caller has already issued a
140 /// `set_dtr` (e.g. main applying `--lower-dtr` right after
141 /// opening the port) so the cached state stays honest and the
142 /// first `Command::ToggleDtr` produces the right transition.
143 ///
144 /// Defaults to `true` (asserted) — the typical OS state at open.
145 #[must_use]
146 pub const fn with_initial_dtr(mut self, asserted: bool) -> Self {
147 self.dtr_asserted = asserted;
148 self
149 }
150
151 /// Tells the session what the RTS line's actual state is. See
152 /// [`with_initial_dtr`](Self::with_initial_dtr) for the rationale.
153 #[must_use]
154 pub const fn with_initial_rts(mut self, asserted: bool) -> Self {
155 self.rts_asserted = asserted;
156 self
157 }
158
159 /// Returns a reference to the bus. Clone it before calling
160 /// [`Session::run`] (which consumes `self`) if you need to publish or
161 /// subscribe from outside the session.
162 #[must_use]
163 pub const fn bus(&self) -> &EventBus {
164 &self.bus
165 }
166
167 /// Returns a clone of the cancellation token.
168 ///
169 /// Triggering [`CancellationToken::cancel`] on any clone causes
170 /// [`Session::run`] to wind down and return.
171 #[must_use]
172 pub fn cancellation_token(&self) -> CancellationToken {
173 self.cancel.clone()
174 }
175
176 /// Drives the session to completion.
177 ///
178 /// Subscribes to the bus, publishes [`Event::DeviceConnected`], then
179 /// loops until the cancellation token trips or a fatal I/O error
180 /// terminates the device.
181 ///
182 /// # Errors
183 ///
184 /// Currently always returns `Ok(())`; the variant is reserved for
185 /// startup failures introduced by later issues (e.g. mapper
186 /// initialisation).
187 pub async fn run(mut self) -> crate::Result<()> {
188 // Subscribe BEFORE publishing so the loop sees nothing it sent
189 // itself, but external pre-existing subscribers still get
190 // DeviceConnected.
191 let mut subscriber = self.bus.subscribe();
192 self.bus.publish(Event::DeviceConnected);
193
194 let mut read_buf = vec![0_u8; READ_BUFFER_BYTES];
195 loop {
196 tokio::select! {
197 biased;
198 () = self.cancel.cancelled() => break,
199
200 res = self.device.read(&mut read_buf) => match res {
201 Ok(0) => {
202 self.bus.publish(Event::DeviceDisconnected {
203 reason: "EOF on serial read".into(),
204 });
205 break;
206 }
207 Ok(n) => {
208 let mapped = self.imap.map(&read_buf[..n]);
209 self.bus.publish(Event::RxBytes(mapped));
210 }
211 Err(err) => {
212 self.bus.publish(Event::DeviceDisconnected {
213 reason: format!("serial read failed: {err}"),
214 });
215 break;
216 }
217 },
218
219 msg = subscriber.recv() => match msg {
220 Ok(Event::TxBytes(bytes)) => {
221 let mapped = self.omap.map(&bytes);
222 if let Err(err) = self.device.write_all(&mapped).await {
223 self.bus.publish(Event::DeviceDisconnected {
224 reason: format!("serial write failed: {err}"),
225 });
226 break;
227 }
228 }
229 Ok(Event::Command(cmd)) => self.dispatch_command(cmd),
230 // Lagged: we missed some events but can resume.
231 // Other event variants are not the loop's concern.
232 Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
233 // Closed: no senders left, nothing more will arrive.
234 Err(broadcast::error::RecvError::Closed) => break,
235 },
236 }
237 }
238 Ok(())
239 }
240
241 /// Apply a [`Command`] to the device and bus.
242 ///
243 /// Commands that mutate the device run synchronously here; success
244 /// emits [`Event::ConfigChanged`] (when applicable), failure emits
245 /// [`Event::Error`]. The caller (the `Session::run` loop) does not
246 /// need to await anything: every operation either completes
247 /// immediately or is fire-and-forget.
248 fn dispatch_command(&mut self, cmd: Command) {
249 match cmd {
250 Command::Quit => self.cancel.cancel(),
251 Command::Help => {
252 self.bus.publish(Event::SystemMessage(HELP_TEXT.into()));
253 }
254 Command::ShowConfig => {
255 let cfg = self.device.config();
256 self.bus.publish(Event::SystemMessage(format!(
257 "config: {} {}{}{} flow={:?}",
258 cfg.baud_rate,
259 cfg.data_bits.bits(),
260 parity_letter(cfg.parity),
261 stop_bits_number(cfg.stop_bits),
262 cfg.flow_control,
263 )));
264 }
265 Command::SetBaud(rate) => match self.device.set_baud_rate(rate) {
266 Ok(()) => {
267 self.bus
268 .publish(Event::ConfigChanged(*self.device.config()));
269 }
270 Err(err) => {
271 self.bus.publish(Event::Error(Arc::new(err)));
272 }
273 },
274 Command::ToggleDtr => {
275 let new_state = !self.dtr_asserted;
276 match self.device.set_dtr(new_state) {
277 Ok(()) => {
278 self.dtr_asserted = new_state;
279 self.bus.publish(Event::SystemMessage(format!(
280 "DTR: {}",
281 if new_state { "asserted" } else { "deasserted" }
282 )));
283 }
284 Err(err) => {
285 self.bus.publish(Event::Error(Arc::new(err)));
286 }
287 }
288 }
289 Command::ToggleRts => {
290 let new_state = !self.rts_asserted;
291 match self.device.set_rts(new_state) {
292 Ok(()) => {
293 self.rts_asserted = new_state;
294 self.bus.publish(Event::SystemMessage(format!(
295 "RTS: {}",
296 if new_state { "asserted" } else { "deasserted" }
297 )));
298 }
299 Err(err) => {
300 self.bus.publish(Event::Error(Arc::new(err)));
301 }
302 }
303 }
304 Command::SendBreak => match self.device.send_break(SEND_BREAK_DURATION) {
305 Ok(()) => {
306 self.bus.publish(Event::SystemMessage(format!(
307 "sent {} ms break",
308 SEND_BREAK_DURATION.as_millis()
309 )));
310 }
311 Err(err) => {
312 self.bus.publish(Event::Error(Arc::new(err)));
313 }
314 },
315 }
316 }
317}