use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use crate::command::Command;
use crate::config::{Parity, StopBits};
use crate::device::SerialDevice;
use crate::event::{Event, EventBus};
use crate::mapper::{LineEndingMapper, Mapper};
/// Returns the single-letter abbreviation for a parity setting.
///
/// Each variant maps to the upper-case initial of its name (`N`, `E`, `O`, `M`, `S`).
/// The match is deliberately exhaustive so that adding a `Parity` variant is a
/// compile error here rather than a silently wrong summary.
const fn parity_letter(p: Parity) -> char {
    match p {
        Parity::Space => 'S',
        Parity::Mark => 'M',
        Parity::Odd => 'O',
        Parity::Even => 'E',
        Parity::None => 'N',
    }
}
/// Returns the number of stop bits as a plain integer (`1` or `2`).
///
/// The match is exhaustive on purpose: a new `StopBits` variant must be given an
/// explicit number here before the crate compiles again.
const fn stop_bits_number(s: StopBits) -> u8 {
    match s {
        StopBits::Two => 2,
        StopBits::One => 1,
    }
}
/// Size, in bytes, of the scratch buffer that `Session::run` reads serial data into.
const READ_BUFFER_BYTES: usize = 4 * 1024;

/// Duration passed to the device when a `Command::SendBreak` is dispatched.
const SEND_BREAK_DURATION: Duration = Duration::from_millis(250);

/// One-line command summary published as a system message for `Command::Help`.
const HELP_TEXT: &str = concat!(
    "commands: ?/h help, q/x quit, c show config, t toggle DTR, ",
    "g toggle RTS, b<rate><Enter> set baud, \\ send break",
);
/// A running terminal session bound to one serial device.
///
/// The session owns the device, publishes what it reads on an [`EventBus`], and
/// writes bytes / executes commands that arrive on the same bus.
pub struct Session<D>
where
    D: SerialDevice + 'static,
{
    /// The serial device that is read from, written to, and reconfigured.
    device: D,
    /// Bus on which the session publishes its events and from which it receives
    /// `TxBytes` and `Command` events.
    bus: EventBus,
    /// Token whose cancellation makes the run loop exit.
    cancel: CancellationToken,
    /// Mapper applied to outgoing bytes before they are written to the device.
    omap: Box<dyn Mapper>,
    /// Mapper applied to bytes read from the device before they are published.
    imap: Box<dyn Mapper>,
    /// DTR state as tracked by the session; updated only after a successful toggle.
    dtr_asserted: bool,
    /// RTS state as tracked by the session; updated only after a successful toggle.
    rts_asserted: bool,
}
impl<D: SerialDevice + 'static> Session<D> {
#[must_use]
pub fn new(device: D) -> Self {
Self {
device,
bus: EventBus::default(),
cancel: CancellationToken::new(),
omap: Box::new(LineEndingMapper::default()),
imap: Box::new(LineEndingMapper::default()),
dtr_asserted: true,
rts_asserted: true,
}
}
#[must_use]
pub fn with_bus(device: D, bus: EventBus) -> Self {
Self {
device,
bus,
cancel: CancellationToken::new(),
omap: Box::new(LineEndingMapper::default()),
imap: Box::new(LineEndingMapper::default()),
dtr_asserted: true,
rts_asserted: true,
}
}
/// Replaces the mapper applied to outgoing bytes (those written to the device).
///
/// Builder-style: consumes the session and returns it with the new mapper installed.
#[must_use]
pub fn with_omap<M>(mut self, mapper: M) -> Self
where
    M: Mapper + 'static,
{
    let boxed: Box<dyn Mapper> = Box::new(mapper);
    self.omap = boxed;
    self
}
/// Replaces the mapper applied to incoming bytes (those read from the device).
///
/// Builder-style: consumes the session and returns it with the new mapper installed.
#[must_use]
pub fn with_imap<M>(mut self, mapper: M) -> Self
where
    M: Mapper + 'static,
{
    let boxed: Box<dyn Mapper> = Box::new(mapper);
    self.imap = boxed;
    self
}
/// Sets the DTR state the session assumes before any toggle has happened.
///
/// This only records `asserted` in the session; it does not call into the
/// device. The recorded value is the one the first DTR toggle inverts.
#[must_use]
pub const fn with_initial_dtr(mut self, asserted: bool) -> Self {
self.dtr_asserted = asserted;
self
}
/// Sets the RTS state the session assumes before any toggle has happened.
///
/// This only records `asserted` in the session; it does not call into the
/// device. The recorded value is the one the first RTS toggle inverts.
#[must_use]
pub const fn with_initial_rts(mut self, asserted: bool) -> Self {
self.rts_asserted = asserted;
self
}
#[must_use]
pub const fn bus(&self) -> &EventBus {
&self.bus
}
#[must_use]
pub fn cancellation_token(&self) -> CancellationToken {
self.cancel.clone()
}
/// Runs the session loop until it is cancelled, the device stops delivering
/// data, a device read or write fails, or the bus subscription is closed.
///
/// The loop publishes `Event::DeviceConnected` once, then repeatedly:
/// - publishes mapped device input as `Event::RxBytes`,
/// - writes mapped `Event::TxBytes` payloads to the device,
/// - executes `Event::Command` values via `dispatch_command`.
///
/// Device failures are reported as `Event::DeviceDisconnected` on the bus, not
/// through the return value: every exit path visible here returns `Ok(())`.
pub async fn run(mut self) -> crate::Result<()> {
// The subscription is created before the first event is published.
let mut subscriber = self.bus.subscribe();
self.bus.publish(Event::DeviceConnected);
// One scratch buffer, reused for every read.
let mut read_buf = vec![0_u8; READ_BUFFER_BYTES];
loop {
tokio::select! {
// `biased` makes `select!` poll the branches in the order written:
// cancellation first, then device input, then bus events.
// NOTE(review): with this ordering a read branch that is ready on every
// iteration would always be picked ahead of bus events — confirm that is
// acceptable for the expected data rates.
biased;
() = self.cancel.cancelled() => break,
res = self.device.read(&mut read_buf) => match res {
// A zero-length read is treated as end-of-stream.
Ok(0) => {
self.bus.publish(Event::DeviceDisconnected {
reason: "EOF on serial read".into(),
});
break;
}
// Only the `n` bytes actually read are mapped and published.
Ok(n) => {
let mapped = self.imap.map(&read_buf[..n]);
self.bus.publish(Event::RxBytes(mapped));
}
Err(err) => {
self.bus.publish(Event::DeviceDisconnected {
reason: format!("serial read failed: {err}"),
});
break;
}
},
msg = subscriber.recv() => match msg {
Ok(Event::TxBytes(bytes)) => {
let mapped = self.omap.map(&bytes);
// This await runs inside the branch handler, so the cancellation
// branch is not polled while the write is in progress.
if let Err(err) = self.device.write_all(&mapped).await {
self.bus.publish(Event::DeviceDisconnected {
reason: format!("serial write failed: {err}"),
});
break;
}
}
Ok(Event::Command(cmd)) => self.dispatch_command(cmd),
// Every other event kind is ignored (presumably including the events this
// session publishes itself — verify against `EventBus`). A `Lagged` error
// is also ignored and the loop keeps receiving.
Ok(_) | Err(broadcast::error::RecvError::Lagged(_)) => {}
// The subscription reports that it is closed; stop the session.
Err(broadcast::error::RecvError::Closed) => break,
},
}
}
Ok(())
}
/// Executes a single command and reports its outcome on the bus.
///
/// - `Quit` cancels the session token; nothing is published.
/// - `Help` and `ShowConfig` publish an `Event::SystemMessage`.
/// - `SetBaud` publishes `Event::ConfigChanged` with the device's configuration
///   after a successful change.
/// - `ToggleDtr` / `ToggleRts` invert the tracked line state, but only commit
///   the new state once the device accepted it.
/// - `SendBreak` asks the device for a break of `SEND_BREAK_DURATION`.
///
/// Any device error is published as `Event::Error`; this function itself never
/// fails.
fn dispatch_command(&mut self, cmd: Command) {
    // Label used when reporting a modem-control line state.
    const fn line_state(asserted: bool) -> &'static str {
        if asserted {
            "asserted"
        } else {
            "deasserted"
        }
    }

    match cmd {
        Command::Quit => self.cancel.cancel(),
        Command::Help => {
            let text = String::from(HELP_TEXT);
            self.bus.publish(Event::SystemMessage(text));
        }
        Command::ShowConfig => {
            let cfg = self.device.config();
            let summary = format!(
                "config: {} {}{}{} flow={:?}",
                cfg.baud_rate,
                cfg.data_bits.bits(),
                parity_letter(cfg.parity),
                stop_bits_number(cfg.stop_bits),
                cfg.flow_control,
            );
            self.bus.publish(Event::SystemMessage(summary));
        }
        Command::SetBaud(rate) => {
            if let Err(err) = self.device.set_baud_rate(rate) {
                self.bus.publish(Event::Error(Arc::new(err)));
                return;
            }
            // Report the configuration as the device now sees it.
            let applied = *self.device.config();
            self.bus.publish(Event::ConfigChanged(applied));
        }
        Command::ToggleDtr => {
            let target = !self.dtr_asserted;
            if let Err(err) = self.device.set_dtr(target) {
                self.bus.publish(Event::Error(Arc::new(err)));
                return;
            }
            // Commit the tracked state only after the device accepted it.
            self.dtr_asserted = target;
            let message = format!("DTR: {}", line_state(target));
            self.bus.publish(Event::SystemMessage(message));
        }
        Command::ToggleRts => {
            let target = !self.rts_asserted;
            if let Err(err) = self.device.set_rts(target) {
                self.bus.publish(Event::Error(Arc::new(err)));
                return;
            }
            // Commit the tracked state only after the device accepted it.
            self.rts_asserted = target;
            let message = format!("RTS: {}", line_state(target));
            self.bus.publish(Event::SystemMessage(message));
        }
        Command::SendBreak => {
            if let Err(err) = self.device.send_break(SEND_BREAK_DURATION) {
                self.bus.publish(Event::Error(Arc::new(err)));
                return;
            }
            let message = format!("sent {} ms break", SEND_BREAK_DURATION.as_millis());
            self.bus.publish(Event::SystemMessage(message));
        }
    }
}
}