1use std::sync::Arc;
27use std::time::Duration;
28
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32
33use crate::command::Command;
34use crate::config::{Parity, SerialConfig, StopBits};
35use crate::device::SerialDevice;
36use crate::error::Result;
37use crate::event::{Event, EventBus};
38use crate::mapper::{LineEndingMapper, Mapper};
39
40const fn parity_letter(p: Parity) -> char {
41 match p {
42 Parity::None => 'N',
43 Parity::Even => 'E',
44 Parity::Odd => 'O',
45 Parity::Mark => 'M',
46 Parity::Space => 'S',
47 }
48}
49
50const fn stop_bits_number(s: StopBits) -> u8 {
51 match s {
52 StopBits::One => 1,
53 StopBits::Two => 2,
54 }
55}
56
57const READ_BUFFER_BYTES: usize = 4096;
60
61const SEND_BREAK_DURATION: Duration = Duration::from_millis(250);
63
64const HELP_TEXT: &str = "commands: ?/h help, q/x quit, c show config, t toggle DTR, \
66 g toggle RTS, b<rate><Enter> set baud, \\ send break";
67
68pub struct Session<D: SerialDevice + 'static> {
74 device: D,
75 bus: EventBus,
76 cancel: CancellationToken,
77 omap: Box<dyn Mapper>,
80 imap: Box<dyn Mapper>,
83 dtr_asserted: bool,
87 rts_asserted: bool,
89}
90
91impl<D: SerialDevice + 'static> Session<D> {
92 #[must_use]
95 pub fn new(device: D) -> Self {
96 Self {
97 device,
98 bus: EventBus::default(),
99 cancel: CancellationToken::new(),
100 omap: Box::new(LineEndingMapper::default()),
101 imap: Box::new(LineEndingMapper::default()),
102 dtr_asserted: true,
103 rts_asserted: true,
104 }
105 }
106
107 #[must_use]
111 pub fn with_bus(device: D, bus: EventBus) -> Self {
112 Self {
113 device,
114 bus,
115 cancel: CancellationToken::new(),
116 omap: Box::new(LineEndingMapper::default()),
117 imap: Box::new(LineEndingMapper::default()),
118 dtr_asserted: true,
119 rts_asserted: true,
120 }
121 }
122
123 #[must_use]
126 pub fn with_omap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
127 self.omap = Box::new(mapper);
128 self
129 }
130
131 #[must_use]
134 pub fn with_imap<M: Mapper + 'static>(mut self, mapper: M) -> Self {
135 self.imap = Box::new(mapper);
136 self
137 }
138
139 #[must_use]
147 pub const fn with_initial_dtr(mut self, asserted: bool) -> Self {
148 self.dtr_asserted = asserted;
149 self
150 }
151
152 #[must_use]
155 pub const fn with_initial_rts(mut self, asserted: bool) -> Self {
156 self.rts_asserted = asserted;
157 self
158 }
159
160 #[must_use]
164 pub const fn bus(&self) -> &EventBus {
165 &self.bus
166 }
167
168 #[must_use]
173 pub fn cancellation_token(&self) -> CancellationToken {
174 self.cancel.clone()
175 }
176
177 pub async fn run(mut self) -> crate::Result<()> {
189 let mut subscriber = self.bus.subscribe();
193 self.bus.publish(Event::DeviceConnected);
194
195 let mut read_buf = vec![0_u8; READ_BUFFER_BYTES];
196 loop {
197 tokio::select! {
198 biased;
199 () = self.cancel.cancelled() => break,
200
201 res = self.device.read(&mut read_buf) => match res {
202 Ok(0) => {
203 self.bus.publish(Event::DeviceDisconnected {
204 reason: "EOF on serial read".into(),
205 });
206 break;
207 }
208 Ok(n) => {
209 let mapped = self.imap.map(&read_buf[..n]);
210 self.bus.publish(Event::RxBytes(mapped));
211 }
212 Err(err) => {
213 self.bus.publish(Event::DeviceDisconnected {
214 reason: format!("serial read failed: {err}"),
215 });
216 break;
217 }
218 },
219
220 msg = subscriber.recv() => match msg {
221 Ok(Event::TxBytes(bytes)) => {
222 let mapped = self.omap.map(&bytes);
223 if let Err(err) = self.device.write_all(&mapped).await {
224 self.bus.publish(Event::DeviceDisconnected {
225 reason: format!("serial write failed: {err}"),
226 });
227 break;
228 }
229 }
230 Ok(Event::Command(cmd)) => self.dispatch_command(cmd).await,
231 Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
234 Err(broadcast::error::RecvError::Closed) => break,
236 },
237 }
238 }
239 Ok(())
240 }
241
242 pub(crate) async fn dispatch_command(&mut self, cmd: Command) {
252 match cmd {
253 Command::Quit => self.cancel.cancel(),
254 Command::Help => {
255 self.bus.publish(Event::SystemMessage(HELP_TEXT.into()));
256 }
257 Command::ShowConfig => {
258 let cfg = self.device.config();
259 self.bus.publish(Event::SystemMessage(format!(
260 "config: {} {}{}{} flow={:?}",
261 cfg.baud_rate,
262 cfg.data_bits.bits(),
263 parity_letter(cfg.parity),
264 stop_bits_number(cfg.stop_bits),
265 cfg.flow_control,
266 )));
267 }
268 Command::SetBaud(rate) => match self.device.set_baud_rate(rate) {
269 Ok(()) => {
270 self.bus
271 .publish(Event::ConfigChanged(*self.device.config()));
272 }
273 Err(err) => {
274 self.bus.publish(Event::Error(Arc::new(err)));
275 }
276 },
277 Command::ApplyConfig(cfg) => {
278 if let Err(err) = self.apply_config(cfg).await {
279 self.bus.publish(Event::Error(Arc::new(err)));
280 }
281 }
284 Command::ToggleDtr => {
285 let new_state = !self.dtr_asserted;
286 self.apply_dtr(new_state);
287 }
288 Command::ToggleRts => {
289 let new_state = !self.rts_asserted;
290 self.apply_rts(new_state);
291 }
292 Command::SetDtrAbs(state) => self.apply_dtr(state),
293 Command::SetRtsAbs(state) => self.apply_rts(state),
294 Command::SendBreak => match self.device.send_break(SEND_BREAK_DURATION) {
295 Ok(()) => {
296 self.bus.publish(Event::SystemMessage(format!(
297 "sent {} ms break",
298 SEND_BREAK_DURATION.as_millis()
299 )));
300 }
301 Err(err) => {
302 self.bus.publish(Event::Error(Arc::new(err)));
303 }
304 },
305 Command::OpenMenu => {
306 self.bus.publish(Event::MenuOpened);
311 }
312 }
313 }
314
315 fn apply_dtr(&mut self, new_state: bool) {
320 match self.device.set_dtr(new_state) {
321 Ok(()) => {
322 self.dtr_asserted = new_state;
323 self.bus.publish(Event::SystemMessage(format!(
324 "DTR: {}",
325 if new_state { "asserted" } else { "deasserted" }
326 )));
327 self.bus.publish(Event::ModemLinesChanged {
328 dtr: self.dtr_asserted,
329 rts: self.rts_asserted,
330 });
331 }
332 Err(err) => {
333 self.bus.publish(Event::Error(Arc::new(err)));
334 }
335 }
336 }
337
338 fn apply_rts(&mut self, new_state: bool) {
340 match self.device.set_rts(new_state) {
341 Ok(()) => {
342 self.rts_asserted = new_state;
343 self.bus.publish(Event::SystemMessage(format!(
344 "RTS: {}",
345 if new_state { "asserted" } else { "deasserted" }
346 )));
347 self.bus.publish(Event::ModemLinesChanged {
348 dtr: self.dtr_asserted,
349 rts: self.rts_asserted,
350 });
351 }
352 Err(err) => {
353 self.bus.publish(Event::Error(Arc::new(err)));
354 }
355 }
356 }
357
358 #[allow(clippy::unused_async)]
388 pub async fn apply_config(&mut self, new: SerialConfig) -> Result<()> {
389 let snapshot = *self.device.config();
390
391 if let Err(e) = self.device.set_baud_rate(new.baud_rate) {
392 self.rollback(&snapshot);
393 return Err(e);
394 }
395 if let Err(e) = self.device.set_data_bits(new.data_bits) {
396 self.rollback(&snapshot);
397 return Err(e);
398 }
399 if let Err(e) = self.device.set_stop_bits(new.stop_bits) {
400 self.rollback(&snapshot);
401 return Err(e);
402 }
403 if let Err(e) = self.device.set_parity(new.parity) {
404 self.rollback(&snapshot);
405 return Err(e);
406 }
407 if let Err(e) = self.device.set_flow_control(new.flow_control) {
408 self.rollback(&snapshot);
409 return Err(e);
410 }
411
412 self.bus
413 .publish(Event::ConfigChanged(*self.device.config()));
414 Ok(())
415 }
416
417 fn rollback(&mut self, snapshot: &SerialConfig) {
421 let _ = self.device.set_baud_rate(snapshot.baud_rate);
422 let _ = self.device.set_data_bits(snapshot.data_bits);
423 let _ = self.device.set_stop_bits(snapshot.stop_bits);
424 let _ = self.device.set_parity(snapshot.parity);
425 let _ = self.device.set_flow_control(snapshot.flow_control);
426 }
427}
428
429#[cfg(test)]
430mod tests {
431 use std::pin::Pin;
437 use std::task::{Context, Poll};
438 use std::time::Duration;
439
440 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
441 use tokio::sync::broadcast::error::TryRecvError;
442
443 use super::{Event, Result, SerialDevice, Session};
444 use crate::command::Command;
445 use crate::config::{DataBits, FlowControl, ModemStatus, Parity, SerialConfig, StopBits};
446 use crate::error::Error;
447
448 #[allow(clippy::struct_excessive_bools)]
458 struct MockDevice {
459 config: SerialConfig,
460 fail_baud: bool,
461 fail_data_bits: bool,
462 fail_stop_bits: bool,
463 fail_parity: bool,
464 fail_flow: bool,
465 }
466
467 impl MockDevice {
468 const fn new(config: SerialConfig) -> Self {
469 Self {
470 config,
471 fail_baud: false,
472 fail_data_bits: false,
473 fail_stop_bits: false,
474 fail_parity: false,
475 fail_flow: false,
476 }
477 }
478 }
479
480 impl AsyncRead for MockDevice {
481 fn poll_read(
482 self: Pin<&mut Self>,
483 _cx: &mut Context<'_>,
484 _buf: &mut ReadBuf<'_>,
485 ) -> Poll<std::io::Result<()>> {
486 Poll::Pending
487 }
488 }
489
490 impl AsyncWrite for MockDevice {
491 fn poll_write(
492 self: Pin<&mut Self>,
493 _cx: &mut Context<'_>,
494 buf: &[u8],
495 ) -> Poll<std::io::Result<usize>> {
496 Poll::Ready(Ok(buf.len()))
497 }
498 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
499 Poll::Ready(Ok(()))
500 }
501 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
502 Poll::Ready(Ok(()))
503 }
504 }
505
506 impl SerialDevice for MockDevice {
507 fn set_baud_rate(&mut self, baud: u32) -> Result<()> {
508 if self.fail_baud {
509 self.fail_baud = false;
510 return Err(Error::InvalidConfig("mock: baud fail".into()));
511 }
512 self.config.baud_rate = baud;
513 Ok(())
514 }
515 fn set_data_bits(&mut self, bits: DataBits) -> Result<()> {
516 if self.fail_data_bits {
517 self.fail_data_bits = false;
518 return Err(Error::InvalidConfig("mock: data_bits fail".into()));
519 }
520 self.config.data_bits = bits;
521 Ok(())
522 }
523 fn set_stop_bits(&mut self, bits: StopBits) -> Result<()> {
524 if self.fail_stop_bits {
525 self.fail_stop_bits = false;
526 return Err(Error::InvalidConfig("mock: stop_bits fail".into()));
527 }
528 self.config.stop_bits = bits;
529 Ok(())
530 }
531 fn set_parity(&mut self, parity: Parity) -> Result<()> {
532 if self.fail_parity {
533 self.fail_parity = false;
534 return Err(Error::InvalidConfig("mock: parity fail".into()));
535 }
536 self.config.parity = parity;
537 Ok(())
538 }
539 fn set_flow_control(&mut self, flow: FlowControl) -> Result<()> {
540 if self.fail_flow {
541 self.fail_flow = false;
542 return Err(Error::InvalidConfig("mock: flow fail".into()));
543 }
544 self.config.flow_control = flow;
545 Ok(())
546 }
547 fn set_dtr(&mut self, _level: bool) -> Result<()> {
548 Ok(())
549 }
550 fn set_rts(&mut self, _level: bool) -> Result<()> {
551 Ok(())
552 }
553 fn send_break(&mut self, _duration: Duration) -> Result<()> {
554 Ok(())
555 }
556 fn modem_status(&mut self) -> Result<ModemStatus> {
557 Ok(ModemStatus::default())
558 }
559 fn config(&self) -> &SerialConfig {
560 &self.config
561 }
562 }
563
564 fn new_cfg() -> SerialConfig {
565 SerialConfig {
566 baud_rate: 9600,
567 data_bits: DataBits::Seven,
568 stop_bits: StopBits::Two,
569 parity: Parity::Even,
570 flow_control: FlowControl::Hardware,
571 ..SerialConfig::default()
572 }
573 }
574
575 #[tokio::test]
576 async fn apply_config_success_publishes_config_changed() {
577 let device = MockDevice::new(SerialConfig::default());
578 let mut session = Session::new(device);
579 let mut rx = session.bus().subscribe();
580
581 let target = new_cfg();
582 session
583 .apply_config(target)
584 .await
585 .expect("apply_config should succeed");
586
587 let got = session.device.config();
589 assert_eq!(got.baud_rate, target.baud_rate);
590 assert_eq!(got.data_bits, target.data_bits);
591 assert_eq!(got.stop_bits, target.stop_bits);
592 assert_eq!(got.parity, target.parity);
593 assert_eq!(got.flow_control, target.flow_control);
594
595 match rx.try_recv() {
597 Ok(Event::ConfigChanged(cfg)) => {
598 assert_eq!(cfg.baud_rate, target.baud_rate);
599 assert_eq!(cfg.flow_control, target.flow_control);
600 }
601 other => panic!("expected ConfigChanged, got {other:?}"),
602 }
603 }
604
605 #[tokio::test]
606 async fn apply_config_rolls_back_on_middle_failure() {
607 let mut device = MockDevice::new(SerialConfig::default());
609 device.fail_flow = true;
610 let initial = *device.config();
611
612 let mut session = Session::new(device);
613 let mut rx = session.bus().subscribe();
614
615 let target = new_cfg();
616 let err = session
617 .apply_config(target)
618 .await
619 .expect_err("apply_config must fail when flow setter errors");
620 assert!(matches!(err, Error::InvalidConfig(_)));
621
622 let got = session.device.config();
624 assert_eq!(got.baud_rate, initial.baud_rate);
625 assert_eq!(got.data_bits, initial.data_bits);
626 assert_eq!(got.stop_bits, initial.stop_bits);
627 assert_eq!(got.parity, initial.parity);
628 assert_eq!(got.flow_control, initial.flow_control);
629
630 match rx.try_recv() {
632 Err(TryRecvError::Empty) => {}
633 Ok(Event::ConfigChanged(_)) => panic!("unexpected ConfigChanged after rollback"),
634 other => panic!("unexpected bus state: {other:?}"),
635 }
636 }
637
638 #[tokio::test]
639 async fn apply_config_command_dispatches_through_session() {
640 let device = MockDevice::new(SerialConfig::default());
641 let mut session = Session::new(device);
642 let mut rx = session.bus().subscribe();
643
644 let target = SerialConfig {
645 baud_rate: 9600,
646 ..SerialConfig::default()
647 };
648 session.dispatch_command(Command::ApplyConfig(target)).await;
649
650 let ev = rx.try_recv().expect("ConfigChanged should be on the bus");
651 match ev {
652 Event::ConfigChanged(cfg) => assert_eq!(cfg.baud_rate, 9600),
653 other => panic!("expected ConfigChanged, got {other:?}"),
654 }
655 }
656
657 #[tokio::test]
658 async fn apply_config_command_on_failure_publishes_error() {
659 let mut device = MockDevice::new(SerialConfig::default());
660 device.fail_baud = true;
661 let mut session = Session::new(device);
662 let mut rx = session.bus().subscribe();
663
664 let target = SerialConfig {
665 baud_rate: 9600,
666 ..SerialConfig::default()
667 };
668 session.dispatch_command(Command::ApplyConfig(target)).await;
669
670 match rx.try_recv() {
671 Ok(Event::Error(_)) => {}
672 other => panic!("expected Error, got {other:?}"),
673 }
674 }
675
676 #[tokio::test]
677 async fn set_dtr_abs_publishes_modem_lines_changed() {
678 let device = MockDevice::new(SerialConfig::default());
679 let mut session = Session::new(device);
680 let mut rx = session.bus().subscribe();
681
682 session.dispatch_command(Command::SetDtrAbs(true)).await;
683
684 match rx.recv().await.unwrap() {
686 Event::SystemMessage(_) => {}
687 other => panic!("expected SystemMessage, got {other:?}"),
688 }
689 match rx.recv().await.unwrap() {
690 Event::ModemLinesChanged { dtr, rts } => {
691 assert!(dtr);
692 assert!(rts);
694 }
695 other => panic!("expected ModemLinesChanged, got {other:?}"),
696 }
697 }
698
699 #[tokio::test]
700 async fn set_rts_abs_publishes_modem_lines_changed() {
701 let device = MockDevice::new(SerialConfig::default());
702 let mut session = Session::new(device);
703 let mut rx = session.bus().subscribe();
704
705 session.dispatch_command(Command::SetRtsAbs(false)).await;
706
707 let _ = rx.recv().await; match rx.recv().await.unwrap() {
709 Event::ModemLinesChanged { dtr, rts } => {
710 assert!(dtr);
711 assert!(!rts);
712 }
713 other => panic!("expected ModemLinesChanged, got {other:?}"),
714 }
715 }
716
717 #[tokio::test]
718 async fn toggle_dtr_now_also_publishes_modem_lines_changed() {
719 let device = MockDevice::new(SerialConfig::default());
720 let mut session = Session::new(device);
721 let mut rx = session.bus().subscribe();
722
723 session.dispatch_command(Command::ToggleDtr).await;
724
725 let _ = rx.recv().await; match rx.recv().await.unwrap() {
727 Event::ModemLinesChanged { dtr, rts } => {
728 assert!(!dtr);
730 assert!(rts);
731 }
732 other => panic!("expected ModemLinesChanged, got {other:?}"),
733 }
734 }
735
736 #[tokio::test]
737 async fn toggle_rts_now_also_publishes_modem_lines_changed() {
738 let device = MockDevice::new(SerialConfig::default());
739 let mut session = Session::new(device);
740 let mut rx = session.bus().subscribe();
741
742 session.dispatch_command(Command::ToggleRts).await;
743
744 let _ = rx.recv().await; match rx.recv().await.unwrap() {
746 Event::ModemLinesChanged { dtr, rts } => {
747 assert!(dtr);
748 assert!(!rts);
749 }
750 other => panic!("expected ModemLinesChanged, got {other:?}"),
751 }
752 }
753
754 #[tokio::test]
755 async fn apply_config_rolls_back_on_first_step_failure() {
756 let mut device = MockDevice::new(SerialConfig::default());
758 device.fail_baud = true;
759 let initial = *device.config();
760
761 let mut session = Session::new(device);
762 let mut rx = session.bus().subscribe();
763
764 let target = new_cfg();
765 let err = session
766 .apply_config(target)
767 .await
768 .expect_err("apply_config must fail when baud setter errors");
769 assert!(matches!(err, Error::InvalidConfig(_)));
770
771 let got = session.device.config();
774 assert_eq!(got, &initial);
775
776 match rx.try_recv() {
778 Err(TryRecvError::Empty) => {}
779 Ok(Event::ConfigChanged(_)) => panic!("unexpected ConfigChanged after rollback"),
780 other => panic!("unexpected bus state: {other:?}"),
781 }
782 }
783}