Skip to main content

rtcom_core/
session.rs

1//! Session orchestrator: bridges a [`SerialDevice`] with the
2//! [`EventBus`].
3//!
4//! [`SerialDevice`]: crate::SerialDevice
5//! [`EventBus`]: crate::EventBus
6//!
7//! At v0.1 a [`Session`] runs a single task that multiplexes the serial
8//! device, the cancellation token, and the bus subscription via
9//! `tokio::select!`:
10//!
11//! - bytes arriving from the device → [`Event::RxBytes`];
12//! - [`Event::TxBytes`] published on the bus → bytes written to the device;
13//! - [`Event::Command`] published on the bus → handler dispatch (Issue #7);
14//! - cancellation token tripped or fatal I/O error → publish
15//!   [`Event::DeviceDisconnected`] (when applicable) and exit cleanly.
16//!
17//! The single-task model gives the dispatch handlers exclusive `&mut`
18//! access to the device, which is required for the synchronous control
19//! operations (`set_baud_rate`, `set_dtr`, `send_break`, ...). The
20//! tradeoff is that a long write momentarily delays reads — acceptable
21//! for an interactive serial terminal where writes are short and rare.
22//!
23//! Later issues plug in mappers (Issue #8), logging, scripting, and so
24//! on as additional bus subscribers.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32
33use crate::command::Command;
34use crate::config::{Parity, StopBits};
35use crate::device::SerialDevice;
36use crate::event::{Event, EventBus};
37use crate::mapper::{LineEndingMapper, Mapper};
38
39const fn parity_letter(p: Parity) -> char {
40    match p {
41        Parity::None => 'N',
42        Parity::Even => 'E',
43        Parity::Odd => 'O',
44        Parity::Mark => 'M',
45        Parity::Space => 'S',
46    }
47}
48
49const fn stop_bits_number(s: StopBits) -> u8 {
50    match s {
51        StopBits::One => 1,
52        StopBits::Two => 2,
53    }
54}
55
/// Size in bytes of the buffer handed to each device read.
///
/// 4 KiB matches the typical USB-serial driver burst granularity; larger
/// buffers waste memory, smaller ones fragment events.
const READ_BUFFER_BYTES: usize = 4 * 1024;

/// How long the line break requested by the `SendBreak` command lasts.
const SEND_BREAK_DURATION: Duration = Duration::from_millis(250);

/// One-line cheatsheet published in response to `Command::Help`.
///
/// Built from two literals so each source line stays short; the
/// resulting text is a single line with no embedded newline.
const HELP_TEXT: &str = concat!(
    "commands: ?/h help, q/x quit, c show config, t toggle DTR, ",
    "g toggle RTS, b<rate><Enter> set baud, \\ send break",
);
66
/// Owns a serial device and a bus, and runs the I/O + command loop.
///
/// `Session` is generic over the device type so tests can substitute a
/// PTY pair (`SerialPortDevice::pair`) or, in the future, a fully mocked
/// backend without dynamic dispatch overhead.
///
/// Build one with [`Session::new`] or [`Session::with_bus`], optionally
/// install mappers with [`Session::with_omap`] / [`Session::with_imap`],
/// then drive it with [`Session::run`], which consumes the session.
pub struct Session<D: SerialDevice + 'static> {
    /// Serial device the run loop reads from, writes to, and
    /// reconfigures when commands are dispatched.
    device: D,
    /// Bus the session publishes its events on and subscribes to for
    /// `Event::TxBytes` and `Event::Command`.
    bus: EventBus,
    /// Token whose cancellation makes [`Session::run`] exit. Clones are
    /// handed out by [`Session::cancellation_token`], and the `Quit`
    /// command cancels it from inside the loop.
    cancel: CancellationToken,
    /// Outbound mapper applied to `Event::TxBytes` payloads before they
    /// hit the device. Defaults to a no-op `LineEndingMapper::default()`.
    omap: Box<dyn Mapper>,
    /// Inbound mapper applied to bytes read from the device before they
    /// are republished as `Event::RxBytes`. Defaults to no-op.
    imap: Box<dyn Mapper>,
    /// Tracked DTR state. Initialised to `true` because `SerialDevice`
    /// gives no way to query the line, and most backends open with DTR
    /// asserted; the first toggle therefore deasserts.
    dtr_asserted: bool,
    /// Tracked RTS state. See `dtr_asserted` for the rationale.
    rts_asserted: bool,
}
89
90impl<D: SerialDevice + 'static> Session<D> {
91    /// Builds a session with a fresh bus and cancellation token,
92    /// no-op mappers on both directions.
93    #[must_use]
94    pub fn new(device: D) -> Self {
95        Self {
96            device,
97            bus: EventBus::default(),
98            cancel: CancellationToken::new(),
99            omap: Box::new(LineEndingMapper::default()),
100            imap: Box::new(LineEndingMapper::default()),
101            dtr_asserted: true,
102            rts_asserted: true,
103        }
104    }
105
106    /// Builds a session attached to a caller-supplied bus. Useful when
107    /// several subsystems already share a bus and the session should join
108    /// the existing fan-out instead of starting its own.
109    #[must_use]
110    pub fn with_bus(device: D, bus: EventBus) -> Self {
111        Self {
112            device,
113            bus,
114            cancel: CancellationToken::new(),
115            omap: Box::new(LineEndingMapper::default()),
116            imap: Box::new(LineEndingMapper::default()),
117            dtr_asserted: true,
118            rts_asserted: true,
119        }
120    }
121
122    /// Replaces the outbound mapper applied to `Event::TxBytes`
123    /// payloads before they reach the device.
124    #[must_use]
125    pub fn with_omap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
126        self.omap = Box::new(mapper);
127        self
128    }
129
130    /// Replaces the inbound mapper applied to bytes read from the
131    /// device before they are republished as `Event::RxBytes`.
132    #[must_use]
133    pub fn with_imap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
134        self.imap = Box::new(mapper);
135        self
136    }
137
138    /// Returns a reference to the bus. Clone it before calling
139    /// [`Session::run`] (which consumes `self`) if you need to publish or
140    /// subscribe from outside the session.
141    #[must_use]
142    pub const fn bus(&self) -> &EventBus {
143        &self.bus
144    }
145
146    /// Returns a clone of the cancellation token.
147    ///
148    /// Triggering [`CancellationToken::cancel`] on any clone causes
149    /// [`Session::run`] to wind down and return.
150    #[must_use]
151    pub fn cancellation_token(&self) -> CancellationToken {
152        self.cancel.clone()
153    }
154
155    /// Drives the session to completion.
156    ///
157    /// Subscribes to the bus, publishes [`Event::DeviceConnected`], then
158    /// loops until the cancellation token trips or a fatal I/O error
159    /// terminates the device.
160    ///
161    /// # Errors
162    ///
163    /// Currently always returns `Ok(())`; the variant is reserved for
164    /// startup failures introduced by later issues (e.g. mapper
165    /// initialisation).
166    pub async fn run(mut self) -> crate::Result<()> {
167        // Subscribe BEFORE publishing so the loop sees nothing it sent
168        // itself, but external pre-existing subscribers still get
169        // DeviceConnected.
170        let mut subscriber = self.bus.subscribe();
171        self.bus.publish(Event::DeviceConnected);
172
173        let mut read_buf = vec![0_u8; READ_BUFFER_BYTES];
174        loop {
175            tokio::select! {
176                biased;
177                () = self.cancel.cancelled() => break,
178
179                res = self.device.read(&mut read_buf) => match res {
180                    Ok(0) => {
181                        self.bus.publish(Event::DeviceDisconnected {
182                            reason: "EOF on serial read".into(),
183                        });
184                        break;
185                    }
186                    Ok(n) => {
187                        let mapped = self.imap.map(&read_buf[..n]);
188                        self.bus.publish(Event::RxBytes(mapped));
189                    }
190                    Err(err) => {
191                        self.bus.publish(Event::DeviceDisconnected {
192                            reason: format!("serial read failed: {err}"),
193                        });
194                        break;
195                    }
196                },
197
198                msg = subscriber.recv() => match msg {
199                    Ok(Event::TxBytes(bytes)) => {
200                        let mapped = self.omap.map(&bytes);
201                        if let Err(err) = self.device.write_all(&mapped).await {
202                            self.bus.publish(Event::DeviceDisconnected {
203                                reason: format!("serial write failed: {err}"),
204                            });
205                            break;
206                        }
207                    }
208                    Ok(Event::Command(cmd)) => self.dispatch_command(cmd),
209                    // Lagged: we missed some events but can resume.
210                    // Other event variants are not the loop's concern.
211                    Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
212                    // Closed: no senders left, nothing more will arrive.
213                    Err(broadcast::error::RecvError::Closed) => break,
214                },
215            }
216        }
217        Ok(())
218    }
219
220    /// Apply a [`Command`] to the device and bus.
221    ///
222    /// Commands that mutate the device run synchronously here; success
223    /// emits [`Event::ConfigChanged`] (when applicable), failure emits
224    /// [`Event::Error`]. The caller (the `Session::run` loop) does not
225    /// need to await anything: every operation either completes
226    /// immediately or is fire-and-forget.
227    fn dispatch_command(&mut self, cmd: Command) {
228        match cmd {
229            Command::Quit => self.cancel.cancel(),
230            Command::Help => {
231                self.bus.publish(Event::SystemMessage(HELP_TEXT.into()));
232            }
233            Command::ShowConfig => {
234                let cfg = self.device.config();
235                self.bus.publish(Event::SystemMessage(format!(
236                    "config: {} {}{}{} flow={:?}",
237                    cfg.baud_rate,
238                    cfg.data_bits.bits(),
239                    parity_letter(cfg.parity),
240                    stop_bits_number(cfg.stop_bits),
241                    cfg.flow_control,
242                )));
243            }
244            Command::SetBaud(rate) => match self.device.set_baud_rate(rate) {
245                Ok(()) => {
246                    self.bus
247                        .publish(Event::ConfigChanged(*self.device.config()));
248                }
249                Err(err) => {
250                    self.bus.publish(Event::Error(Arc::new(err)));
251                }
252            },
253            Command::ToggleDtr => {
254                let new_state = !self.dtr_asserted;
255                match self.device.set_dtr(new_state) {
256                    Ok(()) => {
257                        self.dtr_asserted = new_state;
258                        self.bus.publish(Event::SystemMessage(format!(
259                            "DTR: {}",
260                            if new_state { "asserted" } else { "deasserted" }
261                        )));
262                    }
263                    Err(err) => {
264                        self.bus.publish(Event::Error(Arc::new(err)));
265                    }
266                }
267            }
268            Command::ToggleRts => {
269                let new_state = !self.rts_asserted;
270                match self.device.set_rts(new_state) {
271                    Ok(()) => {
272                        self.rts_asserted = new_state;
273                        self.bus.publish(Event::SystemMessage(format!(
274                            "RTS: {}",
275                            if new_state { "asserted" } else { "deasserted" }
276                        )));
277                    }
278                    Err(err) => {
279                        self.bus.publish(Event::Error(Arc::new(err)));
280                    }
281                }
282            }
283            Command::SendBreak => match self.device.send_break(SEND_BREAK_DURATION) {
284                Ok(()) => {
285                    self.bus.publish(Event::SystemMessage(format!(
286                        "sent {} ms break",
287                        SEND_BREAK_DURATION.as_millis()
288                    )));
289                }
290                Err(err) => {
291                    self.bus.publish(Event::Error(Arc::new(err)));
292                }
293            },
294        }
295    }
296}