Skip to main content

rsigma_runtime/io/
stdin.rs

1use std::io::BufRead;
2
3use tokio::sync::mpsc;
4
5use super::{AckToken, EventSource, RawEvent};
6
7/// Reads events from stdin, one per line.
8///
9/// Reading happens on a dedicated OS thread (`std::thread`) that forwards
10/// each line over a bounded channel, rather than via `tokio::io::stdin()`.
11/// `tokio::io::stdin()` is implemented with an ordinary blocking read on a
12/// runtime-managed thread that cannot be cancelled; when no input is pending,
13/// that read parks until the next line or EOF and the Tokio runtime waits for
14/// it during shutdown. That makes the daemon hang on Ctrl+C / SIGTERM until
15/// more input arrives (see the `tokio::io::stdin` docs). Because a plain
16/// `std::thread` is not a runtime blocking task, the runtime does not wait for
17/// it at shutdown, so the daemon exits promptly even with an idle stdin.
18pub struct StdinSource {
19    rx: mpsc::Receiver<String>,
20}
21
22impl Default for StdinSource {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl StdinSource {
29    pub fn new() -> Self {
30        // Bounded so a slow pipeline back-pressures the reader thread (which
31        // then stops draining the OS stdin buffer) instead of buffering
32        // unboundedly.
33        let (tx, rx) = mpsc::channel(1024);
34        std::thread::Builder::new()
35            .name("rsigma-stdin".to_string())
36            .spawn(move || {
37                let stdin = std::io::stdin();
38                for line in stdin.lock().lines() {
39                    match line {
40                        // `blocking_send` only errors when the receiver is
41                        // dropped, i.e. the daemon is shutting down; stop
42                        // reading so the thread can exit.
43                        Ok(line) => {
44                            if tx.blocking_send(line).is_err() {
45                                break;
46                            }
47                        }
48                        Err(e) => {
49                            tracing::warn!(error = %e, "Error reading stdin");
50                            break;
51                        }
52                    }
53                }
54            })
55            .expect("failed to spawn stdin reader thread");
56        StdinSource { rx }
57    }
58}
59
60impl EventSource for StdinSource {
61    async fn recv(&mut self) -> Option<RawEvent> {
62        loop {
63            match self.rx.recv().await {
64                Some(line) if !line.trim().is_empty() => {
65                    return Some(RawEvent {
66                        payload: line,
67                        ack_token: AckToken::Noop,
68                    });
69                }
70                Some(_) => continue,
71                None => return None,
72            }
73        }
74    }
75}