Skip to main content

rtcom_core/
session.rs

1//! Session orchestrator: bridges a [`SerialDevice`] with the
2//! [`EventBus`].
3//!
4//! [`SerialDevice`]: crate::SerialDevice
5//! [`EventBus`]: crate::EventBus
6//!
7//! At v0.1 a [`Session`] runs a single task that multiplexes the serial
8//! device, the cancellation token, and the bus subscription via
9//! `tokio::select!`:
10//!
11//! - bytes arriving from the device → [`Event::RxBytes`];
12//! - [`Event::TxBytes`] published on the bus → bytes written to the device;
13//! - [`Event::Command`] published on the bus → handler dispatch (Issue #7);
14//! - cancellation token tripped or fatal I/O error → publish
15//!   [`Event::DeviceDisconnected`] (when applicable) and exit cleanly.
16//!
17//! The single-task model gives the dispatch handlers exclusive `&mut`
18//! access to the device, which is required for the synchronous control
19//! operations (`set_baud_rate`, `set_dtr`, `send_break`, ...). The
20//! tradeoff is that a long write momentarily delays reads — acceptable
21//! for an interactive serial terminal where writes are short and rare.
22//!
23//! Later issues plug in mappers (Issue #8), logging, scripting, and so
24//! on as additional bus subscribers.
25
26use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32
33use crate::command::Command;
34use crate::config::{Parity, SerialConfig, StopBits};
35use crate::device::SerialDevice;
36use crate::error::Result;
37use crate::event::{Event, EventBus};
38use crate::mapper::{LineEndingMapper, Mapper};
39
40const fn parity_letter(p: Parity) -> char {
41    match p {
42        Parity::None => 'N',
43        Parity::Even => 'E',
44        Parity::Odd => 'O',
45        Parity::Mark => 'M',
46        Parity::Space => 'S',
47    }
48}
49
50const fn stop_bits_number(s: StopBits) -> u8 {
51    match s {
52        StopBits::One => 1,
53        StopBits::Two => 2,
54    }
55}
56
/// Read buffer size. 4 KiB matches the typical USB-serial driver burst
/// granularity; larger buffers waste memory, smaller ones fragment events.
///
/// Sizes the single scratch buffer that [`Session::run`] allocates once
/// and reuses for every device read.
const READ_BUFFER_BYTES: usize = 4096;

/// Duration of the line break asserted by the `SendBreak` command.
///
/// Passed to `SerialDevice::send_break` and echoed, in milliseconds, in
/// the confirmation `SystemMessage`.
const SEND_BREAK_DURATION: Duration = Duration::from_millis(250);

/// Static cheatsheet text for `Command::Help`.
///
/// Published verbatim as an `Event::SystemMessage`; the trailing `\` on
/// the first source line is a string continuation, so the text is a
/// single line at runtime.
const HELP_TEXT: &str = "commands: ?/h help, q/x quit, c show config, t toggle DTR, \
                         g toggle RTS, b<rate><Enter> set baud, \\ send break";
67
/// Owns a serial device and a bus, and runs the I/O + command loop.
///
/// `Session` is generic over the device type so tests can substitute a
/// PTY pair (`SerialPortDevice::pair`) or, in the future, a fully mocked
/// backend without dynamic dispatch overhead.
///
/// Configure it with the `with_*` builders, then hand it to
/// [`Session::run`], which consumes the session.
pub struct Session<D: SerialDevice + 'static> {
    /// Serial endpoint that [`Session::run`] reads from and writes to,
    /// and that the command handlers reconfigure.
    device: D,
    /// Bus the session publishes its events on and subscribes to for
    /// `TxBytes` / `Command` events.
    bus: EventBus,
    /// Token that stops [`Session::run`]; `Command::Quit` trips it too.
    cancel: CancellationToken,
    /// Outbound mapper applied to `Event::TxBytes` payloads before they
    /// hit the device. Defaults to a no-op `LineEndingMapper::default()`.
    omap: Box<dyn Mapper>,
    /// Inbound mapper applied to bytes read from the device before they
    /// are republished as `Event::RxBytes`. Defaults to no-op.
    imap: Box<dyn Mapper>,
    /// Tracked DTR state. Initialised to `true` because `SerialDevice`
    /// gives no way to query the line, and most backends open with DTR
    /// asserted; the first toggle therefore deasserts.
    dtr_asserted: bool,
    /// Tracked RTS state. See `dtr_asserted` for the rationale.
    rts_asserted: bool,
}
90
91impl<D: SerialDevice + 'static> Session<D> {
92    /// Builds a session with a fresh bus and cancellation token,
93    /// no-op mappers on both directions.
94    #[must_use]
95    pub fn new(device: D) -> Self {
96        Self {
97            device,
98            bus: EventBus::default(),
99            cancel: CancellationToken::new(),
100            omap: Box::new(LineEndingMapper::default()),
101            imap: Box::new(LineEndingMapper::default()),
102            dtr_asserted: true,
103            rts_asserted: true,
104        }
105    }
106
107    /// Builds a session attached to a caller-supplied bus. Useful when
108    /// several subsystems already share a bus and the session should join
109    /// the existing fan-out instead of starting its own.
110    #[must_use]
111    pub fn with_bus(device: D, bus: EventBus) -> Self {
112        Self {
113            device,
114            bus,
115            cancel: CancellationToken::new(),
116            omap: Box::new(LineEndingMapper::default()),
117            imap: Box::new(LineEndingMapper::default()),
118            dtr_asserted: true,
119            rts_asserted: true,
120        }
121    }
122
123    /// Replaces the outbound mapper applied to `Event::TxBytes`
124    /// payloads before they reach the device.
125    #[must_use]
126    pub fn with_omap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
127        self.omap = Box::new(mapper);
128        self
129    }
130
131    /// Replaces the inbound mapper applied to bytes read from the
132    /// device before they are republished as `Event::RxBytes`.
133    #[must_use]
134    pub fn with_imap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
135        self.imap = Box::new(mapper);
136        self
137    }
138
    /// Tells the session what the DTR line's actual state is on the
    /// device. Use this when the caller has already issued a
    /// `set_dtr` (e.g. main applying `--lower-dtr` right after
    /// opening the port) so the cached state stays honest and the
    /// first `Command::ToggleDtr` produces the right transition.
    ///
    /// Defaults to `true` (asserted) — the typical OS state at open.
    ///
    /// This only seeds the cached flag; it does not touch the device.
    #[must_use]
    pub const fn with_initial_dtr(mut self, asserted: bool) -> Self {
        // Bookkeeping only: no `set_dtr` call is made here.
        self.dtr_asserted = asserted;
        self
    }
151
    /// Tells the session what the RTS line's actual state is. See
    /// [`with_initial_dtr`](Self::with_initial_dtr) for the rationale.
    ///
    /// This only seeds the cached flag; it does not touch the device.
    #[must_use]
    pub const fn with_initial_rts(mut self, asserted: bool) -> Self {
        // Bookkeeping only: no `set_rts` call is made here.
        self.rts_asserted = asserted;
        self
    }
159
    /// Returns a reference to the bus. Clone it before calling
    /// [`Session::run`] (which consumes `self`) if you need to publish or
    /// subscribe from outside the session.
    ///
    /// The returned bus is the same one the session loop subscribes on,
    /// so `Event::TxBytes` and `Event::Command` published through it are
    /// picked up by [`Session::run`].
    #[must_use]
    pub const fn bus(&self) -> &EventBus {
        &self.bus
    }
167
    /// Returns a clone of the cancellation token.
    ///
    /// Triggering [`CancellationToken::cancel`] on any clone causes
    /// [`Session::run`] to wind down and return. Grab the clone before
    /// calling `run`, since `run` consumes the session.
    #[must_use]
    pub fn cancellation_token(&self) -> CancellationToken {
        self.cancel.clone()
    }
176
    /// Drives the session to completion.
    ///
    /// Subscribes to the bus, publishes [`Event::DeviceConnected`], then
    /// loops until the cancellation token trips or a fatal I/O error
    /// terminates the device.
    ///
    /// Exit conditions, as implemented below:
    ///
    /// - cancellation token tripped → return, nothing published;
    /// - device read returns 0 bytes or an error → publish
    ///   [`Event::DeviceDisconnected`] and return;
    /// - device write fails → publish [`Event::DeviceDisconnected`] and
    ///   return;
    /// - bus reports `Closed` → return, nothing published.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`; the variant is reserved for
    /// startup failures introduced by later issues (e.g. mapper
    /// initialisation).
    pub async fn run(mut self) -> crate::Result<()> {
        // Subscribe first, then announce the connection, so subscribers
        // that already exist on the bus receive DeviceConnected.
        // NOTE(review): the previous wording said this ordering keeps the
        // loop from seeing its own event. If `EventBus` wraps a tokio
        // broadcast channel, a receiver created before the send DOES
        // receive it; that is harmless here because the `Ok(_)` arm
        // below ignores it. Confirm against the `EventBus` implementation.
        let mut subscriber = self.bus.subscribe();
        self.bus.publish(Event::DeviceConnected);

        // One scratch buffer, reused for every read.
        let mut read_buf = vec![0_u8; READ_BUFFER_BYTES];
        loop {
            tokio::select! {
                // `biased` polls the arms in source order, so a pending
                // cancellation always wins over I/O and bus traffic.
                biased;
                () = self.cancel.cancelled() => break,

                res = self.device.read(&mut read_buf) => match res {
                    // A zero-length read is treated as the device going away.
                    Ok(0) => {
                        self.bus.publish(Event::DeviceDisconnected {
                            reason: "EOF on serial read".into(),
                        });
                        break;
                    }
                    Ok(n) => {
                        // Only the first `n` bytes are valid for this read.
                        let mapped = self.imap.map(&read_buf[..n]);
                        self.bus.publish(Event::RxBytes(mapped));
                    }
                    Err(err) => {
                        self.bus.publish(Event::DeviceDisconnected {
                            reason: format!("serial read failed: {err}"),
                        });
                        break;
                    }
                },

                msg = subscriber.recv() => match msg {
                    Ok(Event::TxBytes(bytes)) => {
                        let mapped = self.omap.map(&bytes);
                        // The write is awaited inside the arm body, so
                        // neither cancellation nor reads are polled until
                        // it completes (the trade-off described in the
                        // module docs).
                        if let Err(err) = self.device.write_all(&mapped).await {
                            self.bus.publish(Event::DeviceDisconnected {
                                reason: format!("serial write failed: {err}"),
                            });
                            break;
                        }
                    }
                    // `Command::Quit` only trips the token; the loop exits
                    // on the next iteration through the first arm.
                    Ok(Event::Command(cmd)) => self.dispatch_command(cmd).await,
                    // Lagged: we missed some events but can resume.
                    // Other event variants are not the loop's concern.
                    Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
                    // Closed: no senders left, nothing more will arrive.
                    Err(broadcast::error::RecvError::Closed) => break,
                },
            }
        }
        Ok(())
    }
241
242    /// Apply a [`Command`] to the device and bus.
243    ///
244    /// Commands that mutate the device run synchronously here; success
245    /// emits [`Event::ConfigChanged`] (`ApplyConfig` / `SetBaud`) or
246    /// [`Event::ModemLinesChanged`] (line toggles / absolute sets),
247    /// failure emits [`Event::Error`]. The `async` signature exists so
248    /// the dispatcher can await [`Session::apply_config`] without
249    /// forking a task; the other arms are synchronous and perform no
250    /// awaits.
251    pub(crate) async fn dispatch_command(&mut self, cmd: Command) {
252        match cmd {
253            Command::Quit => self.cancel.cancel(),
254            Command::Help => {
255                self.bus.publish(Event::SystemMessage(HELP_TEXT.into()));
256            }
257            Command::ShowConfig => {
258                let cfg = self.device.config();
259                self.bus.publish(Event::SystemMessage(format!(
260                    "config: {} {}{}{} flow={:?}",
261                    cfg.baud_rate,
262                    cfg.data_bits.bits(),
263                    parity_letter(cfg.parity),
264                    stop_bits_number(cfg.stop_bits),
265                    cfg.flow_control,
266                )));
267            }
268            Command::SetBaud(rate) => match self.device.set_baud_rate(rate) {
269                Ok(()) => {
270                    self.bus
271                        .publish(Event::ConfigChanged(*self.device.config()));
272                }
273                Err(err) => {
274                    self.bus.publish(Event::Error(Arc::new(err)));
275                }
276            },
277            Command::ApplyConfig(cfg) => {
278                if let Err(err) = self.apply_config(cfg).await {
279                    self.bus.publish(Event::Error(Arc::new(err)));
280                }
281                // Success path: `apply_config` already published
282                // `ConfigChanged`.
283            }
284            Command::ToggleDtr => {
285                let new_state = !self.dtr_asserted;
286                self.apply_dtr(new_state);
287            }
288            Command::ToggleRts => {
289                let new_state = !self.rts_asserted;
290                self.apply_rts(new_state);
291            }
292            Command::SetDtrAbs(state) => self.apply_dtr(state),
293            Command::SetRtsAbs(state) => self.apply_rts(state),
294            Command::SendBreak => match self.device.send_break(SEND_BREAK_DURATION) {
295                Ok(()) => {
296                    self.bus.publish(Event::SystemMessage(format!(
297                        "sent {} ms break",
298                        SEND_BREAK_DURATION.as_millis()
299                    )));
300                }
301                Err(err) => {
302                    self.bus.publish(Event::Error(Arc::new(err)));
303                }
304            },
305            Command::OpenMenu => {
306                // T4 only wires the parser → event. The actual TUI
307                // subscriber lands in a later task; for now just
308                // broadcast the signal so late-bound listeners can
309                // observe it.
310                self.bus.publish(Event::MenuOpened);
311            }
312        }
313    }
314
315    /// Drive the DTR line to `new_state`, publishing a `SystemMessage`
316    /// and a [`Event::ModemLinesChanged`] on success, or
317    /// [`Event::Error`] on failure. Shared by `ToggleDtr` and
318    /// `SetDtrAbs` so both paths surface identical observable events.
319    fn apply_dtr(&mut self, new_state: bool) {
320        match self.device.set_dtr(new_state) {
321            Ok(()) => {
322                self.dtr_asserted = new_state;
323                self.bus.publish(Event::SystemMessage(format!(
324                    "DTR: {}",
325                    if new_state { "asserted" } else { "deasserted" }
326                )));
327                self.bus.publish(Event::ModemLinesChanged {
328                    dtr: self.dtr_asserted,
329                    rts: self.rts_asserted,
330                });
331            }
332            Err(err) => {
333                self.bus.publish(Event::Error(Arc::new(err)));
334            }
335        }
336    }
337
338    /// RTS counterpart to [`Self::apply_dtr`].
339    fn apply_rts(&mut self, new_state: bool) {
340        match self.device.set_rts(new_state) {
341            Ok(()) => {
342                self.rts_asserted = new_state;
343                self.bus.publish(Event::SystemMessage(format!(
344                    "RTS: {}",
345                    if new_state { "asserted" } else { "deasserted" }
346                )));
347                self.bus.publish(Event::ModemLinesChanged {
348                    dtr: self.dtr_asserted,
349                    rts: self.rts_asserted,
350                });
351            }
352            Err(err) => {
353                self.bus.publish(Event::Error(Arc::new(err)));
354            }
355        }
356    }
357
358    /// Apply a new [`SerialConfig`] to the device atomically.
359    ///
360    /// Applies `baud_rate → data_bits → stop_bits → parity → flow_control`
361    /// in that fixed order. On the first failing step, best-effort-rolls
362    /// back the previously-applied steps to the configuration that was
363    /// live at entry, returns the [`Error`](crate::Error) from the failing
364    /// step, and does not publish [`Event::ConfigChanged`]. On full
365    /// success, publishes [`Event::ConfigChanged`] with the device's
366    /// post-apply configuration and returns `Ok(())`.
367    ///
368    /// Fields whose new value equals the current value still go through
369    /// the setter call — the backend is free to short-circuit, and keeping
370    /// the apply sequence uniform avoids branchy rollback state.
371    ///
372    /// This method is `async` for forward compatibility with backends
373    /// whose setters may need to await (e.g. remote devices); the current
374    /// `serialport` backend is synchronous so the body performs no
375    /// awaits.
376    ///
377    /// # Errors
378    ///
379    /// Returns the first setter failure encountered. Rollback failures
380    /// are best-effort and silently swallowed — the device is already in
381    /// an inconsistent state by that point and surfacing a secondary
382    /// error would mask the original cause.
383    // `async` is deliberate: the public API is async so a future backend
384    // (e.g. a networked device whose setters must round-trip) can plug in
385    // without a breaking signature change. The current synchronous path
386    // simply performs no awaits.
387    #[allow(clippy::unused_async)]
388    pub async fn apply_config(&mut self, new: SerialConfig) -> Result<()> {
389        let snapshot = *self.device.config();
390
391        if let Err(e) = self.device.set_baud_rate(new.baud_rate) {
392            self.rollback(&snapshot);
393            return Err(e);
394        }
395        if let Err(e) = self.device.set_data_bits(new.data_bits) {
396            self.rollback(&snapshot);
397            return Err(e);
398        }
399        if let Err(e) = self.device.set_stop_bits(new.stop_bits) {
400            self.rollback(&snapshot);
401            return Err(e);
402        }
403        if let Err(e) = self.device.set_parity(new.parity) {
404            self.rollback(&snapshot);
405            return Err(e);
406        }
407        if let Err(e) = self.device.set_flow_control(new.flow_control) {
408            self.rollback(&snapshot);
409            return Err(e);
410        }
411
412        self.bus
413            .publish(Event::ConfigChanged(*self.device.config()));
414        Ok(())
415    }
416
417    /// Best-effort rollback to `snapshot`. Errors are intentionally
418    /// ignored: the device is already inconsistent, and we prefer to
419    /// surface the original failure to the caller.
420    fn rollback(&mut self, snapshot: &SerialConfig) {
421        let _ = self.device.set_baud_rate(snapshot.baud_rate);
422        let _ = self.device.set_data_bits(snapshot.data_bits);
423        let _ = self.device.set_stop_bits(snapshot.stop_bits);
424        let _ = self.device.set_parity(snapshot.parity);
425        let _ = self.device.set_flow_control(snapshot.flow_control);
426    }
427}
428
429#[cfg(test)]
430mod tests {
431    //! Unit tests for [`Session::apply_config`] using an in-module
432    //! [`MockDevice`]. The mock is intentionally not exposed outside
433    //! this file — integration tests use [`crate::SerialPortDevice::pair`]
434    //! which offers a real PTY but cannot drive setter failures.
435
436    use std::pin::Pin;
437    use std::task::{Context, Poll};
438    use std::time::Duration;
439
440    use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
441    use tokio::sync::broadcast::error::TryRecvError;
442
443    use super::{Event, Result, SerialDevice, Session};
444    use crate::command::Command;
445    use crate::config::{DataBits, FlowControl, ModemStatus, Parity, SerialConfig, StopBits};
446    use crate::error::Error;
447
    /// In-memory [`SerialDevice`] with programmable setter failures.
    ///
    /// Each setter can be armed to fail on its next call via the
    /// corresponding `fail_*` flag; the flag consumes itself (one-shot)
    /// so a rearmed setter fails exactly once.
    ///
    /// Because the flags are one-shot, the rollback that follows a
    /// failed `apply_config` step succeeds on the same setter.
    //
    // The five booleans model five independent one-shot triggers on the
    // five distinct setters; a state machine or enum would be strictly
    // more awkward for this "pick which steps blow up" harness.
    #[allow(clippy::struct_excessive_bools)]
    struct MockDevice {
        // Configuration reported by `config()`; successful setters write
        // straight into it.
        config: SerialConfig,
        // One-shot failure triggers, one per config setter.
        fail_baud: bool,
        fail_data_bits: bool,
        fail_stop_bits: bool,
        fail_parity: bool,
        fail_flow: bool,
    }
466
    impl MockDevice {
        /// Builds a mock that reports `config` and has every failure
        /// trigger disarmed; tests arm individual `fail_*` fields
        /// afterwards.
        const fn new(config: SerialConfig) -> Self {
            Self {
                config,
                fail_baud: false,
                fail_data_bits: false,
                fail_stop_bits: false,
                fail_parity: false,
                fail_flow: false,
            }
        }
    }
479
    // The mock never produces data: every poll reports `Pending` and the
    // waker is not registered, so a read on this device never completes.
    // The tests in this module call the command handlers directly and do
    // not depend on reads.
    impl AsyncRead for MockDevice {
        fn poll_read(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            _buf: &mut ReadBuf<'_>,
        ) -> Poll<std::io::Result<()>> {
            Poll::Pending
        }
    }
489
    // Write side is a sink: every write is reported as fully accepted and
    // the bytes are discarded; flush and shutdown succeed immediately.
    impl AsyncWrite for MockDevice {
        fn poll_write(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<std::io::Result<usize>> {
            // Claim the whole buffer was written without storing it.
            Poll::Ready(Ok(buf.len()))
        }
        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
        fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
            Poll::Ready(Ok(()))
        }
    }
505
506    impl SerialDevice for MockDevice {
507        fn set_baud_rate(&mut self, baud: u32) -> Result<()> {
508            if self.fail_baud {
509                self.fail_baud = false;
510                return Err(Error::InvalidConfig("mock: baud fail".into()));
511            }
512            self.config.baud_rate = baud;
513            Ok(())
514        }
515        fn set_data_bits(&mut self, bits: DataBits) -> Result<()> {
516            if self.fail_data_bits {
517                self.fail_data_bits = false;
518                return Err(Error::InvalidConfig("mock: data_bits fail".into()));
519            }
520            self.config.data_bits = bits;
521            Ok(())
522        }
523        fn set_stop_bits(&mut self, bits: StopBits) -> Result<()> {
524            if self.fail_stop_bits {
525                self.fail_stop_bits = false;
526                return Err(Error::InvalidConfig("mock: stop_bits fail".into()));
527            }
528            self.config.stop_bits = bits;
529            Ok(())
530        }
531        fn set_parity(&mut self, parity: Parity) -> Result<()> {
532            if self.fail_parity {
533                self.fail_parity = false;
534                return Err(Error::InvalidConfig("mock: parity fail".into()));
535            }
536            self.config.parity = parity;
537            Ok(())
538        }
539        fn set_flow_control(&mut self, flow: FlowControl) -> Result<()> {
540            if self.fail_flow {
541                self.fail_flow = false;
542                return Err(Error::InvalidConfig("mock: flow fail".into()));
543            }
544            self.config.flow_control = flow;
545            Ok(())
546        }
547        fn set_dtr(&mut self, _level: bool) -> Result<()> {
548            Ok(())
549        }
550        fn set_rts(&mut self, _level: bool) -> Result<()> {
551            Ok(())
552        }
553        fn send_break(&mut self, _duration: Duration) -> Result<()> {
554            Ok(())
555        }
556        fn modem_status(&mut self) -> Result<ModemStatus> {
557            Ok(ModemStatus::default())
558        }
559        fn config(&self) -> &SerialConfig {
560            &self.config
561        }
562    }
563
    /// Target configuration used by the `apply_config` tests: sets all
    /// five fields that `apply_config` drives and takes any remaining
    /// fields from `SerialConfig::default()`.
    // NOTE(review): the rollback tests are only meaningful if these
    // values differ from `SerialConfig::default()`; the defaults are not
    // visible in this file, so confirm against `crate::config`.
    fn new_cfg() -> SerialConfig {
        SerialConfig {
            baud_rate: 9600,
            data_bits: DataBits::Seven,
            stop_bits: StopBits::Two,
            parity: Parity::Even,
            flow_control: FlowControl::Hardware,
            ..SerialConfig::default()
        }
    }
574
    /// Happy path: every setter succeeds, the device ends up at the
    /// target configuration, and exactly that is announced on the bus.
    #[tokio::test]
    async fn apply_config_success_publishes_config_changed() {
        let device = MockDevice::new(SerialConfig::default());
        let mut session = Session::new(device);
        // Subscribe before acting so the published event is observable.
        let mut rx = session.bus().subscribe();

        let target = new_cfg();
        session
            .apply_config(target)
            .await
            .expect("apply_config should succeed");

        // Device state reflects the new config.
        let got = session.device.config();
        assert_eq!(got.baud_rate, target.baud_rate);
        assert_eq!(got.data_bits, target.data_bits);
        assert_eq!(got.stop_bits, target.stop_bits);
        assert_eq!(got.parity, target.parity);
        assert_eq!(got.flow_control, target.flow_control);

        // Event::ConfigChanged was published with the new config.
        match rx.try_recv() {
            Ok(Event::ConfigChanged(cfg)) => {
                assert_eq!(cfg.baud_rate, target.baud_rate);
                assert_eq!(cfg.flow_control, target.flow_control);
            }
            other => panic!("expected ConfigChanged, got {other:?}"),
        }
    }
604
    /// Rollback after earlier steps have already been applied.
    ///
    /// Despite the name, the armed setter is flow control, which is the
    /// LAST step in `apply_config`'s order. All four earlier setters
    /// have therefore succeeded when the failure hits, so this exercises
    /// the largest possible rollback.
    #[tokio::test]
    async fn apply_config_rolls_back_on_middle_failure() {
        // Start at default, arm the flow-control setter (final apply
        // step) to fail once.
        let mut device = MockDevice::new(SerialConfig::default());
        device.fail_flow = true;
        let initial = *device.config();

        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        let target = new_cfg();
        let err = session
            .apply_config(target)
            .await
            .expect_err("apply_config must fail when flow setter errors");
        assert!(matches!(err, Error::InvalidConfig(_)));

        // Device state was rolled back to the pre-apply snapshot.
        let got = session.device.config();
        assert_eq!(got.baud_rate, initial.baud_rate);
        assert_eq!(got.data_bits, initial.data_bits);
        assert_eq!(got.stop_bits, initial.stop_bits);
        assert_eq!(got.parity, initial.parity);
        assert_eq!(got.flow_control, initial.flow_control);

        // No ConfigChanged event was published.
        match rx.try_recv() {
            Err(TryRecvError::Empty) => {}
            Ok(Event::ConfigChanged(_)) => panic!("unexpected ConfigChanged after rollback"),
            other => panic!("unexpected bus state: {other:?}"),
        }
    }
637
    /// `Command::ApplyConfig` routed through `dispatch_command` reaches
    /// `apply_config` and results in a `ConfigChanged` event.
    #[tokio::test]
    async fn apply_config_command_dispatches_through_session() {
        let device = MockDevice::new(SerialConfig::default());
        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        let target = SerialConfig {
            baud_rate: 9600,
            ..SerialConfig::default()
        };
        session.dispatch_command(Command::ApplyConfig(target)).await;

        // The first (and only expected) event is the success notification.
        let ev = rx.try_recv().expect("ConfigChanged should be on the bus");
        match ev {
            Event::ConfigChanged(cfg) => assert_eq!(cfg.baud_rate, 9600),
            other => panic!("expected ConfigChanged, got {other:?}"),
        }
    }
656
    /// When `apply_config` fails, the dispatcher turns the returned
    /// error into an `Event::Error` on the bus.
    #[tokio::test]
    async fn apply_config_command_on_failure_publishes_error() {
        let mut device = MockDevice::new(SerialConfig::default());
        // Fail the very first setter so no step is applied.
        device.fail_baud = true;
        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        let target = SerialConfig {
            baud_rate: 9600,
            ..SerialConfig::default()
        };
        session.dispatch_command(Command::ApplyConfig(target)).await;

        // The error must be the first event: no ConfigChanged precedes it.
        match rx.try_recv() {
            Ok(Event::Error(_)) => {}
            other => panic!("expected Error, got {other:?}"),
        }
    }
675
    /// `SetDtrAbs` publishes a `SystemMessage` followed by
    /// `ModemLinesChanged` carrying both tracked line states.
    // NOTE(review): the command drives DTR to `true`, which is also the
    // session's default, so `assert!(dtr)` would still pass if the cached
    // state were never updated. The toggle test covers the transition.
    #[tokio::test]
    async fn set_dtr_abs_publishes_modem_lines_changed() {
        let device = MockDevice::new(SerialConfig::default());
        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        session.dispatch_command(Command::SetDtrAbs(true)).await;

        // Expected sequence: SystemMessage, ModemLinesChanged.
        match rx.recv().await.unwrap() {
            Event::SystemMessage(_) => {}
            other => panic!("expected SystemMessage, got {other:?}"),
        }
        match rx.recv().await.unwrap() {
            Event::ModemLinesChanged { dtr, rts } => {
                assert!(dtr);
                // `rts_asserted` defaults to `true` in Session::new.
                assert!(rts);
            }
            other => panic!("expected ModemLinesChanged, got {other:?}"),
        }
    }
698
    /// `SetRtsAbs(false)` lowers the tracked RTS state and reports it,
    /// leaving DTR at its default.
    #[tokio::test]
    async fn set_rts_abs_publishes_modem_lines_changed() {
        let device = MockDevice::new(SerialConfig::default());
        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        session.dispatch_command(Command::SetRtsAbs(false)).await;

        // Skip the human-readable message that precedes the state event.
        let _ = rx.recv().await; // SystemMessage
        match rx.recv().await.unwrap() {
            Event::ModemLinesChanged { dtr, rts } => {
                // DTR untouched (default `true`); RTS now deasserted.
                assert!(dtr);
                assert!(!rts);
            }
            other => panic!("expected ModemLinesChanged, got {other:?}"),
        }
    }
716
    /// `ToggleDtr` flips the tracked DTR state and publishes
    /// `ModemLinesChanged` after the `SystemMessage`.
    #[tokio::test]
    async fn toggle_dtr_now_also_publishes_modem_lines_changed() {
        let device = MockDevice::new(SerialConfig::default());
        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        session.dispatch_command(Command::ToggleDtr).await;

        let _ = rx.recv().await; // SystemMessage (existing pre-T17 behaviour)
        match rx.recv().await.unwrap() {
            Event::ModemLinesChanged { dtr, rts } => {
                // Toggle from the default (true) lowers DTR.
                assert!(!dtr);
                // RTS is reported too, unchanged from its default.
                assert!(rts);
            }
            other => panic!("expected ModemLinesChanged, got {other:?}"),
        }
    }
735
    /// `ToggleRts` flips the tracked RTS state and publishes
    /// `ModemLinesChanged` after the `SystemMessage`.
    #[tokio::test]
    async fn toggle_rts_now_also_publishes_modem_lines_changed() {
        let device = MockDevice::new(SerialConfig::default());
        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        session.dispatch_command(Command::ToggleRts).await;

        let _ = rx.recv().await; // SystemMessage
        match rx.recv().await.unwrap() {
            Event::ModemLinesChanged { dtr, rts } => {
                // DTR stays at its default; the toggle lowers RTS.
                assert!(dtr);
                assert!(!rts);
            }
            other => panic!("expected ModemLinesChanged, got {other:?}"),
        }
    }
753
    /// Failure on the first setter: nothing has been applied yet, the
    /// device must still match its initial configuration, and no
    /// `ConfigChanged` may be published.
    #[tokio::test]
    async fn apply_config_rolls_back_on_first_step_failure() {
        // Arm baud to fail — the very first step.
        let mut device = MockDevice::new(SerialConfig::default());
        device.fail_baud = true;
        let initial = *device.config();

        let mut session = Session::new(device);
        let mut rx = session.bus().subscribe();

        let target = new_cfg();
        let err = session
            .apply_config(target)
            .await
            .expect_err("apply_config must fail when baud setter errors");
        assert!(matches!(err, Error::InvalidConfig(_)));

        // Device state is unchanged. No setter succeeded before the
        // failing step, so the rollback (which still runs) only re-applies
        // the values that were already live.
        let got = session.device.config();
        assert_eq!(got, &initial);

        // No ConfigChanged event was published.
        match rx.try_recv() {
            Err(TryRecvError::Empty) => {}
            Ok(Event::ConfigChanged(_)) => panic!("unexpected ConfigChanged after rollback"),
            other => panic!("unexpected bus state: {other:?}"),
        }
    }
783}