async_stdin/
lib.rs

1//! Read from stdin over a Tokio channel
2//!
3//! This is useful for interactive programs that read from
4//! stdin while waiting for other events to occur.
5//!
6//! # Examples
7//!
8//! ```no_run
9//! use async_stdin::recv_from_stdin;
10//!
11//! #[tokio::main]
12//! async fn main() {
13//!    let mut rx = recv_from_stdin(10);
14//!    while let Some(s) = rx.recv().await {
15//!       println!("Received: {}", s);
16//!   }
17//! }
18//! ```
19use std::io::{stdin, BufRead, BufReader};
20use tokio::sync::mpsc;
21
22/// Returns a [`mpsc::Receiver`] that contains the input from [`stdin`]
23///
24/// This is accomplished by spawning a thread which continuously
25/// blocks on reading from [`stdin`] and sends input via [`mpsc::Sender`]
26///
27/// # Examples
28///
29/// ```no_run
30/// use async_stdin::recv_from_stdin;
31///
32/// #[tokio::main]
33/// async fn main() {
34///    let mut rx = recv_from_stdin(10);
35///    while let Some(s) = rx.recv().await {
36///       println!("Received: {}", s);
37///   }
38/// }
39/// ```
40pub fn recv_from_stdin(buffer_size: usize) -> mpsc::Receiver<String> {
41    let (tx, rx) = mpsc::channel::<String>(buffer_size);
42    let stdin = BufReader::new(stdin());
43    std::thread::spawn(move || read_loop(stdin, tx));
44    rx
45}
46
47fn read_loop<R>(reader: R, tx: mpsc::Sender<String>)
48where
49    R: BufRead,
50{
51    let mut lines = reader.lines();
52    loop {
53        if let Some(Ok(line)) = lines.next() {
54            let _ = tx.blocking_send(line);
55        }
56    }
57}
58
#[cfg(test)]
mod tests {
    use super::read_loop;
    use std::io::BufReader;
    use tokio::sync::mpsc;

    /// A line fed to `read_loop` through an in-memory reader must arrive on
    /// the channel unchanged.
    #[tokio::test]
    async fn test_blocking_read() {
        let (sender, mut receiver) = mpsc::channel::<String>(10);

        // Stand in for stdin with an in-memory reader. `read_loop` blocks, so
        // it gets its own thread, mirroring how `recv_from_stdin` runs it.
        let input = BufReader::new("hello".as_bytes());
        std::thread::spawn(move || read_loop(input, sender));

        let received = receiver.recv().await.unwrap();
        assert_eq!(received, "hello");
    }
}