rsigma_runtime/io/stdin.rs
1use std::io::BufRead;
2
3use tokio::sync::mpsc;
4
5use super::{AckToken, EventSource, RawEvent};
6
7/// Reads events from stdin, one per line.
8///
9/// Reading happens on a dedicated OS thread (`std::thread`) that forwards
10/// each line over a bounded channel, rather than via `tokio::io::stdin()`.
11/// `tokio::io::stdin()` is implemented with an ordinary blocking read on a
12/// runtime-managed thread that cannot be cancelled; when no input is pending,
13/// that read parks until the next line or EOF and the Tokio runtime waits for
14/// it during shutdown. That makes the daemon hang on Ctrl+C / SIGTERM until
15/// more input arrives (see the `tokio::io::stdin` docs). Because a plain
16/// `std::thread` is not a runtime blocking task, the runtime does not wait for
17/// it at shutdown, so the daemon exits promptly even with an idle stdin.
18pub struct StdinSource {
19 rx: mpsc::Receiver<String>,
20}
21
22impl Default for StdinSource {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl StdinSource {
29 pub fn new() -> Self {
30 // Bounded so a slow pipeline back-pressures the reader thread (which
31 // then stops draining the OS stdin buffer) instead of buffering
32 // unboundedly.
33 let (tx, rx) = mpsc::channel(1024);
34 std::thread::Builder::new()
35 .name("rsigma-stdin".to_string())
36 .spawn(move || {
37 let stdin = std::io::stdin();
38 for line in stdin.lock().lines() {
39 match line {
40 // `blocking_send` only errors when the receiver is
41 // dropped, i.e. the daemon is shutting down; stop
42 // reading so the thread can exit.
43 Ok(line) => {
44 if tx.blocking_send(line).is_err() {
45 break;
46 }
47 }
48 Err(e) => {
49 tracing::warn!(error = %e, "Error reading stdin");
50 break;
51 }
52 }
53 }
54 })
55 .expect("failed to spawn stdin reader thread");
56 StdinSource { rx }
57 }
58}
59
60impl EventSource for StdinSource {
61 async fn recv(&mut self) -> Option<RawEvent> {
62 loop {
63 match self.rx.recv().await {
64 Some(line) if !line.trim().is_empty() => {
65 return Some(RawEvent {
66 payload: line,
67 ack_token: AckToken::Noop,
68 });
69 }
70 Some(_) => continue,
71 None => return None,
72 }
73 }
74 }
75}