Skip to main content

rsigma_runtime/io/
stdin.rs

1use tokio::io::{AsyncBufReadExt, BufReader};
2
3use super::{AckToken, EventSource, RawEvent};
4
5/// Reads events from stdin, one per line.
6///
7/// Uses `tokio::io::stdin()` with `AsyncBufReadExt::lines()` for fully async
8/// reading without a background blocking thread.
9pub struct StdinSource {
10    lines: tokio::io::Lines<BufReader<tokio::io::Stdin>>,
11}
12
13impl Default for StdinSource {
14    fn default() -> Self {
15        Self::new()
16    }
17}
18
19impl StdinSource {
20    pub fn new() -> Self {
21        let stdin = tokio::io::stdin();
22        let lines = BufReader::new(stdin).lines();
23        StdinSource { lines }
24    }
25}
26
27impl EventSource for StdinSource {
28    async fn recv(&mut self) -> Option<RawEvent> {
29        loop {
30            match self.lines.next_line().await {
31                Ok(Some(line)) if !line.trim().is_empty() => {
32                    return Some(RawEvent {
33                        payload: line,
34                        ack_token: AckToken::Noop,
35                    });
36                }
37                Ok(Some(_)) => continue,
38                Ok(None) => return None,
39                Err(e) => {
40                    tracing::warn!(error = %e, "Error reading stdin");
41                    return None;
42                }
43            }
44        }
45    }
46}