Skip to main content

rtcom_core/
session.rs

1//! Session orchestrator: bridges a [`SerialDevice`] with the
2//! [`EventBus`].
3//!
4//! [`SerialDevice`]: crate::SerialDevice
5//! [`EventBus`]: crate::EventBus
6//!
7//! At v0.1 a [`Session`] runs a single task that multiplexes the serial
8//! device, the cancellation token, and the bus subscription via
9//! `tokio::select!`:
10//!
11//! - bytes arriving from the device → [`Event::RxBytes`];
12//! - [`Event::TxBytes`] published on the bus → bytes written to the device;
13//! - [`Event::Command`] published on the bus → handler dispatch (Issue #7);
14//! - cancellation token tripped or fatal I/O error → publish
15//!   [`Event::DeviceDisconnected`] (when applicable) and exit cleanly.
16//!
17//! The single-task model gives the dispatch handlers exclusive `&mut`
18//! access to the device, which is required for the synchronous control
19//! operations (`set_baud_rate`, `set_dtr`, `send_break`, ...). The
20//! tradeoff is that a long write momentarily delays reads — acceptable
21//! for an interactive serial terminal where writes are short and rare.
22//!
23//! Later issues plug in mappers (Issue #8), logging, scripting, and so
24//! on as additional bus subscribers.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32
33use crate::command::Command;
34use crate::config::{Parity, StopBits};
35use crate::device::SerialDevice;
36use crate::event::{Event, EventBus};
37use crate::mapper::{LineEndingMapper, Mapper};
38
39const fn parity_letter(p: Parity) -> char {
40    match p {
41        Parity::None => 'N',
42        Parity::Even => 'E',
43        Parity::Odd => 'O',
44        Parity::Mark => 'M',
45        Parity::Space => 'S',
46    }
47}
48
49const fn stop_bits_number(s: StopBits) -> u8 {
50    match s {
51        StopBits::One => 1,
52        StopBits::Two => 2,
53    }
54}
55
56/// Read buffer size. 4 KiB matches the typical USB-serial driver burst
57/// granularity; larger buffers waste memory, smaller ones fragment events.
58const READ_BUFFER_BYTES: usize = 4096;
59
60/// Duration of the line break asserted by the `SendBreak` command.
61const SEND_BREAK_DURATION: Duration = Duration::from_millis(250);
62
63/// Static cheatsheet text for `Command::Help`.
64const HELP_TEXT: &str = "commands: ?/h help, q/x quit, c show config, t toggle DTR, \
65                         g toggle RTS, b<rate><Enter> set baud, \\ send break";
66
67/// Owns a serial device and a bus, and runs the I/O + command loop.
68///
69/// `Session` is generic over the device type so tests can substitute a
70/// PTY pair (`SerialPortDevice::pair`) or, in the future, a fully mocked
71/// backend without dynamic dispatch overhead.
72pub struct Session<D: SerialDevice + 'static> {
73    device: D,
74    bus: EventBus,
75    cancel: CancellationToken,
76    /// Outbound mapper applied to `Event::TxBytes` payloads before they
77    /// hit the device. Defaults to a no-op `LineEndingMapper::default()`.
78    omap: Box<dyn Mapper>,
79    /// Inbound mapper applied to bytes read from the device before they
80    /// are republished as `Event::RxBytes`. Defaults to no-op.
81    imap: Box<dyn Mapper>,
82    /// Tracked DTR state. Initialised to `true` because `SerialDevice`
83    /// gives no way to query the line, and most backends open with DTR
84    /// asserted; the first toggle therefore deasserts.
85    dtr_asserted: bool,
86    /// Tracked RTS state. See `dtr_asserted` for the rationale.
87    rts_asserted: bool,
88}
89
90impl<D: SerialDevice + 'static> Session<D> {
91    /// Builds a session with a fresh bus and cancellation token,
92    /// no-op mappers on both directions.
93    #[must_use]
94    pub fn new(device: D) -> Self {
95        Self {
96            device,
97            bus: EventBus::default(),
98            cancel: CancellationToken::new(),
99            omap: Box::new(LineEndingMapper::default()),
100            imap: Box::new(LineEndingMapper::default()),
101            dtr_asserted: true,
102            rts_asserted: true,
103        }
104    }
105
106    /// Builds a session attached to a caller-supplied bus. Useful when
107    /// several subsystems already share a bus and the session should join
108    /// the existing fan-out instead of starting its own.
109    #[must_use]
110    pub fn with_bus(device: D, bus: EventBus) -> Self {
111        Self {
112            device,
113            bus,
114            cancel: CancellationToken::new(),
115            omap: Box::new(LineEndingMapper::default()),
116            imap: Box::new(LineEndingMapper::default()),
117            dtr_asserted: true,
118            rts_asserted: true,
119        }
120    }
121
122    /// Replaces the outbound mapper applied to `Event::TxBytes`
123    /// payloads before they reach the device.
124    #[must_use]
125    pub fn with_omap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
126        self.omap = Box::new(mapper);
127        self
128    }
129
130    /// Replaces the inbound mapper applied to bytes read from the
131    /// device before they are republished as `Event::RxBytes`.
132    #[must_use]
133    pub fn with_imap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
134        self.imap = Box::new(mapper);
135        self
136    }
137
138    /// Tells the session what the DTR line's actual state is on the
139    /// device. Use this when the caller has already issued a
140    /// `set_dtr` (e.g. main applying `--lower-dtr` right after
141    /// opening the port) so the cached state stays honest and the
142    /// first `Command::ToggleDtr` produces the right transition.
143    ///
144    /// Defaults to `true` (asserted) — the typical OS state at open.
145    #[must_use]
146    pub const fn with_initial_dtr(mut self, asserted: bool) -> Self {
147        self.dtr_asserted = asserted;
148        self
149    }
150
151    /// Tells the session what the RTS line's actual state is. See
152    /// [`with_initial_dtr`](Self::with_initial_dtr) for the rationale.
153    #[must_use]
154    pub const fn with_initial_rts(mut self, asserted: bool) -> Self {
155        self.rts_asserted = asserted;
156        self
157    }
158
159    /// Returns a reference to the bus. Clone it before calling
160    /// [`Session::run`] (which consumes `self`) if you need to publish or
161    /// subscribe from outside the session.
162    #[must_use]
163    pub const fn bus(&self) -> &EventBus {
164        &self.bus
165    }
166
167    /// Returns a clone of the cancellation token.
168    ///
169    /// Triggering [`CancellationToken::cancel`] on any clone causes
170    /// [`Session::run`] to wind down and return.
171    #[must_use]
172    pub fn cancellation_token(&self) -> CancellationToken {
173        self.cancel.clone()
174    }
175
176    /// Drives the session to completion.
177    ///
178    /// Subscribes to the bus, publishes [`Event::DeviceConnected`], then
179    /// loops until the cancellation token trips or a fatal I/O error
180    /// terminates the device.
181    ///
182    /// # Errors
183    ///
184    /// Currently always returns `Ok(())`; the variant is reserved for
185    /// startup failures introduced by later issues (e.g. mapper
186    /// initialisation).
187    pub async fn run(mut self) -> crate::Result<()> {
188        // Subscribe BEFORE publishing so the loop sees nothing it sent
189        // itself, but external pre-existing subscribers still get
190        // DeviceConnected.
191        let mut subscriber = self.bus.subscribe();
192        self.bus.publish(Event::DeviceConnected);
193
194        let mut read_buf = vec![0_u8; READ_BUFFER_BYTES];
195        loop {
196            tokio::select! {
197                biased;
198                () = self.cancel.cancelled() => break,
199
200                res = self.device.read(&mut read_buf) => match res {
201                    Ok(0) => {
202                        self.bus.publish(Event::DeviceDisconnected {
203                            reason: "EOF on serial read".into(),
204                        });
205                        break;
206                    }
207                    Ok(n) => {
208                        let mapped = self.imap.map(&read_buf[..n]);
209                        self.bus.publish(Event::RxBytes(mapped));
210                    }
211                    Err(err) => {
212                        self.bus.publish(Event::DeviceDisconnected {
213                            reason: format!("serial read failed: {err}"),
214                        });
215                        break;
216                    }
217                },
218
219                msg = subscriber.recv() => match msg {
220                    Ok(Event::TxBytes(bytes)) => {
221                        let mapped = self.omap.map(&bytes);
222                        if let Err(err) = self.device.write_all(&mapped).await {
223                            self.bus.publish(Event::DeviceDisconnected {
224                                reason: format!("serial write failed: {err}"),
225                            });
226                            break;
227                        }
228                    }
229                    Ok(Event::Command(cmd)) => self.dispatch_command(cmd),
230                    // Lagged: we missed some events but can resume.
231                    // Other event variants are not the loop's concern.
232                    Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
233                    // Closed: no senders left, nothing more will arrive.
234                    Err(broadcast::error::RecvError::Closed) => break,
235                },
236            }
237        }
238        Ok(())
239    }
240
241    /// Apply a [`Command`] to the device and bus.
242    ///
243    /// Commands that mutate the device run synchronously here; success
244    /// emits [`Event::ConfigChanged`] (when applicable), failure emits
245    /// [`Event::Error`]. The caller (the `Session::run` loop) does not
246    /// need to await anything: every operation either completes
247    /// immediately or is fire-and-forget.
248    fn dispatch_command(&mut self, cmd: Command) {
249        match cmd {
250            Command::Quit => self.cancel.cancel(),
251            Command::Help => {
252                self.bus.publish(Event::SystemMessage(HELP_TEXT.into()));
253            }
254            Command::ShowConfig => {
255                let cfg = self.device.config();
256                self.bus.publish(Event::SystemMessage(format!(
257                    "config: {} {}{}{} flow={:?}",
258                    cfg.baud_rate,
259                    cfg.data_bits.bits(),
260                    parity_letter(cfg.parity),
261                    stop_bits_number(cfg.stop_bits),
262                    cfg.flow_control,
263                )));
264            }
265            Command::SetBaud(rate) => match self.device.set_baud_rate(rate) {
266                Ok(()) => {
267                    self.bus
268                        .publish(Event::ConfigChanged(*self.device.config()));
269                }
270                Err(err) => {
271                    self.bus.publish(Event::Error(Arc::new(err)));
272                }
273            },
274            Command::ToggleDtr => {
275                let new_state = !self.dtr_asserted;
276                match self.device.set_dtr(new_state) {
277                    Ok(()) => {
278                        self.dtr_asserted = new_state;
279                        self.bus.publish(Event::SystemMessage(format!(
280                            "DTR: {}",
281                            if new_state { "asserted" } else { "deasserted" }
282                        )));
283                    }
284                    Err(err) => {
285                        self.bus.publish(Event::Error(Arc::new(err)));
286                    }
287                }
288            }
289            Command::ToggleRts => {
290                let new_state = !self.rts_asserted;
291                match self.device.set_rts(new_state) {
292                    Ok(()) => {
293                        self.rts_asserted = new_state;
294                        self.bus.publish(Event::SystemMessage(format!(
295                            "RTS: {}",
296                            if new_state { "asserted" } else { "deasserted" }
297                        )));
298                    }
299                    Err(err) => {
300                        self.bus.publish(Event::Error(Arc::new(err)));
301                    }
302                }
303            }
304            Command::SendBreak => match self.device.send_break(SEND_BREAK_DURATION) {
305                Ok(()) => {
306                    self.bus.publish(Event::SystemMessage(format!(
307                        "sent {} ms break",
308                        SEND_BREAK_DURATION.as_millis()
309                    )));
310                }
311                Err(err) => {
312                    self.bus.publish(Event::Error(Arc::new(err)));
313                }
314            },
315        }
316    }
317}