use bytes::Bytes;
use rtcom_core::{CommandKeyParser, Event, EventBus, ParseOutput};
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::sync::CancellationToken;
/// Length, in bytes, of the fixed-size buffer that `run_stdin_reader` passes to
/// each `read` call; a single read can therefore deliver at most this many bytes.
#[allow(
dead_code,
reason = "used by run_stdin_reader; main wiring in issue #7"
)]
const READ_BUFFER_BYTES: usize = 256;
#[allow(dead_code, reason = "called by tests; main wiring lands in issue #7")]
pub async fn run_stdin_reader<R>(
mut reader: R,
bus: EventBus,
cancel: CancellationToken,
escape: u8,
) where
R: AsyncRead + Unpin,
{
let mut parser = CommandKeyParser::new(escape);
let mut read_buf = [0_u8; READ_BUFFER_BYTES];
loop {
tokio::select! {
biased;
() = cancel.cancelled() => break,
res = reader.read(&mut read_buf) => match res {
Ok(0) | Err(_) => break,
Ok(n) => {
for &byte in &read_buf[..n] {
match parser.feed(byte) {
ParseOutput::None => {}
ParseOutput::Data(b) => {
bus.publish(Event::TxBytes(Bytes::copy_from_slice(&[b])));
}
ParseOutput::Command(cmd) => {
bus.publish(Event::Command(cmd));
}
}
}
}
}
}
}
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use rtcom_core::{Command, Event};
    use tokio::io::{duplex, AsyncWriteExt};
    use tokio::time::timeout;

    use super::*;

    /// Escape byte passed to the reader in every test.
    const ESC: u8 = 0x14;
    /// Ceiling on any single await, so a stuck reader fails the test instead of hanging it.
    const STEP: Duration = Duration::from_millis(500);

    /// Returns the read half of a duplex pipe pre-loaded with `bytes`.
    ///
    /// The write half is dropped before returning, so once `bytes` has been
    /// consumed the reader sees end-of-input.
    async fn reader_with(bytes: &[u8]) -> tokio::io::DuplexStream {
        let (mut feed, source) = duplex(64);
        feed.write_all(bytes).await.unwrap();
        drop(feed);
        source
    }

    /// Waits up to `STEP` for the next event on `rx`, panicking on timeout or receive error.
    async fn next(rx: &mut tokio::sync::broadcast::Receiver<Event>) -> Event {
        let received = timeout(STEP, rx.recv()).await;
        received
            .expect("timed out waiting for event")
            .expect("bus closed unexpectedly")
    }

    /// Spawns the reader over `input` on a fresh bus, subscribing before the task starts.
    async fn start(
        input: &[u8],
    ) -> (
        tokio::sync::broadcast::Receiver<Event>,
        tokio::task::JoinHandle<()>,
    ) {
        let bus = EventBus::default();
        let rx = bus.subscribe();
        let cancel = CancellationToken::new();
        let reader = reader_with(input).await;
        let task = tokio::spawn(run_stdin_reader(reader, bus, cancel, ESC));
        (rx, task)
    }

    /// Asserts that the next event is a `TxBytes` whose payload equals `expected`.
    async fn expect_tx(rx: &mut tokio::sync::broadcast::Receiver<Event>, expected: &[u8]) {
        match next(rx).await {
            Event::TxBytes(payload) => assert_eq!(&payload[..], expected),
            other => panic!("unexpected: {other:?}"),
        }
    }

    /// Requires the reader task to finish within `STEP` and without panicking.
    async fn join(task: tokio::task::JoinHandle<()>) {
        timeout(STEP, task).await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn plain_bytes_become_tx_events() {
        let (mut rx, task) = start(b"hi").await;

        // Each input byte is expected as its own event, in order.
        expect_tx(&mut rx, b"h").await;
        expect_tx(&mut rx, b"i").await;

        join(task).await;
    }

    #[tokio::test]
    async fn escape_sequence_emits_command_event() {
        let (mut rx, task) = start(&[ESC, b'?']).await;

        let event = next(&mut rx).await;
        assert!(
            matches!(event, Event::Command(Command::Help)),
            "unexpected: {event:?}"
        );

        join(task).await;
    }

    #[tokio::test]
    async fn baud_change_sequence_emits_set_baud() {
        let input = [&[ESC, b'b'][..], &b"9600\r"[..]].concat();
        let (mut rx, task) = start(&input).await;

        let event = next(&mut rx).await;
        assert!(
            matches!(event, Event::Command(Command::SetBaud(9600))),
            "unexpected: {event:?}"
        );

        join(task).await;
    }

    #[tokio::test]
    async fn unknown_command_byte_does_not_publish_anything_but_drains_stream() {
        let (mut rx, task) = start(&[ESC, b'z', b'a']).await;

        // The first event seen must already be the trailing `a`.
        expect_tx(&mut rx, b"a").await;

        join(task).await;
    }

    #[tokio::test]
    async fn cancellation_stops_pending_read_promptly() {
        let bus = EventBus::default();
        let cancel = CancellationToken::new();
        // The write half stays alive and silent so the read never completes on its own.
        let (_keep_open, reader) = duplex(64);
        let reader_token = cancel.clone();
        let task = tokio::spawn(run_stdin_reader(reader, bus, reader_token, ESC));

        // Give the spawned task a chance to run before cancelling.
        tokio::task::yield_now().await;
        cancel.cancel();

        join(task).await;
    }

    #[tokio::test]
    async fn eof_terminates_task() {
        let bus = EventBus::default();
        let cancel = CancellationToken::new();
        let reader = reader_with(b"").await;
        let task = tokio::spawn(run_stdin_reader(reader, bus, cancel, ESC));

        join(task).await;
    }
}