shell_async/
shell_async.rs

1/*
2 *   Copyright (c) 2024 R3BL LLC
3 *   All rights reserved.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *   http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18//! This program uses the `r3bl_terminal_async` crate to provide a prompt and get user
19//! input, pass that to the `stdin` of a `bash` child process, and then display the output
//! from the child process in the terminal. The following steps outline what the
//! program does:
22//!
23//! # YouTube video of live coding this example
24//!
25//! Please watch the following video to see how this example was created.
26//! - [Build with Naz : Create an async shell in Rust](https://youtu.be/jXzFCDIJQag)
27//! - [YouTube channel](https://www.youtube.com/@developerlifecom?sub_confirmation=1)
28//!
29//! # Create some shared global variables
30//!
31//! - A broadcast channel to signal shutdown to the child process, and all the spawned
32//!   tasks.
33//! - [r3bl_terminal_async::TerminalAsync] to write to the terminal. This provides the
34//!   mechanism to collect user input and display output.
35//! - [tokio::process::Child] to spawn the child process (`bash`) and interact with it.
36//!   This child process lives as long as the `main` function and exits when the user
37//!   chooses to exit the program.
38//!   - The [tokio::process::Command] starts `bash`.
39//!   - Both `stdin` and `stdout` are piped using [std::process::Stdio::piped].
40//!
//! # 🧵 The main event loop simply waits for the following (on the current thread)
42//!
43//! - Start a main event loop (on the current thread):
//!   - It waits for the shutdown signal from the broadcast channel, and monitors the
//!     [r3bl_terminal_async::TerminalAsync] for user input. It writes the user input to the
//!     [tokio::process::ChildStdin].
47//!   - Any exit inputs (user types "exit" or "Ctrl+D") from the user are captured here and
48//!     sent to the shutdown broadcast channel. It also listens to the broadcast channel to
49//!     break out of the loop on shutdown.
50//!   - It [tokio::process::Child::kill]s the child process when it gets the exit signal.
//!   - It does not monitor the child process for output; the spawned task described
//!     below does that.
52//!
//! # 🚀 Spawn a new task to loop and read the output from the child process and display it
54//!
55//! - Spawn a task to loop:
56//!   - Read the [tokio::process::ChildStdout] and write it to the
57//!     [r3bl_terminal_async::SharedWriter].
58//!   - Also listen to the broadcast channel to break out of the loop on shutdown.
59//!
60//! # Run the binary
61//!
62//! ```text
//! ┌───────────────────────────────────┐
//! │ > cargo run --example shell_async │
//! └───────────────────────────────────┘
66//! ```
67//!
68//! Type the following commands to have a go at this.
69//!
70//! ```text
71//! msg="hello nadia!"
72//! echo $msg
73//! ```
74//!
75//! You should see something like the following.
76//!
77//! ```text
78//! [1606192] > msg="hello nadia!"
79//! [1606192] > echo $msg
80//! hello nadia!
81//! [1606192] >
82//! ```
83//!
84//! # Clean up any left over processes
85//! ```text
//! ┌───────────────────────────────┐
//! │ > killall -9 bash shell_async │
//! └───────────────────────────────┘
89//! ```
90//! This program uses the `r3bl_terminal_async` crate to provide a prompt and get user
91//! input, pass that to the `stdin` of a `bash` child process, and then display the output
92//! from the child process in the terminal.
93
94use std::io::Write as _;
95
96use crossterm::style::Stylize as _;
97use miette::IntoDiagnostic as _;
98use r3bl_core::{ok, SharedWriter};
99use r3bl_terminal_async::{ReadlineEvent,
100                          ReadlineEvent::{Eof, Interrupted, Line, Resized},
101                          TerminalAsync};
102use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _};
103
104#[tokio::main]
105#[allow(clippy::needless_return)]
106async fn main() -> miette::Result<()> {
107    // Create a broadcast channel for shutdown.
108    let (shutdown_sender, _) = tokio::sync::broadcast::channel::<()>(1);
109
110    // Create a long-running `bash` child process using tokio::process::Command.
111    let child_process_constructor::ChildProcessHandle {
112        pid,
113        child,
114        stdin,
115        stdout,
116        stderr,
117    } = child_process_constructor::new("bash")?;
118
119    // Create a `r3bl_terminal_async` instance.
120    let terminal_async_constructor::TerminalAsyncHandle {
121        terminal_async,
122        shared_writer,
123    } = terminal_async_constructor::new(pid).await?;
124
125    // Create 2 tasks, join on them:
126    // 1. monitor the output from the child process.
127    // 2. monitor the input from the user (and relay it to the child process).
128    _ = tokio::join!(
129        // New green thread.
130        monitor_child_output::spawn(
131            stdout,
132            stderr,
133            shared_writer.clone(),
134            shutdown_sender.clone()
135        ),
136        // Current thread.
137        monitor_user_input_and_send_to_child::start_event_loop(
138            stdin,
139            terminal_async,
140            child,
141            shutdown_sender.clone()
142        )
143    );
144
145    ok!()
146}
147
148pub mod monitor_user_input_and_send_to_child {
149    use super::*;
150
151    /// Determine the control flow of the program based on the [ReadlineEvent] received
152    /// from user input.
153    enum ControlFlow {
154        ShutdownKillChild,
155        ProcessLine(String),
156        Resized,
157    }
158
159    /// Convert a [miette::Result<ReadlineEvent>] to a [ControlFlow]. This leverages the
160    /// type system to make it simpler to reason about what to do with the user input.
161    impl From<miette::Result<ReadlineEvent>> for ControlFlow {
162        fn from(result_readline_event: miette::Result<ReadlineEvent>) -> Self {
163            match result_readline_event {
164                Ok(readline_event) => match readline_event {
165                    Line(input) => {
166                        let input = input.trim().to_string();
167                        if input == "exit" {
168                            ControlFlow::ShutdownKillChild
169                        } else {
170                            ControlFlow::ProcessLine(input)
171                        }
172                    }
173                    Eof | Interrupted => ControlFlow::ShutdownKillChild,
174                    Resized => ControlFlow::Resized,
175                },
176                _ => ControlFlow::ShutdownKillChild,
177            }
178        }
179    }
180
181    pub async fn start_event_loop(
182        mut stdin: tokio::process::ChildStdin,
183        mut terminal_async: TerminalAsync,
184        mut child: tokio::process::Child,
185        shutdown_sender: tokio::sync::broadcast::Sender<()>,
186    ) {
187        let mut shutdown_receiver = shutdown_sender.subscribe();
188
189        loop {
190            tokio::select! {
191                // Branch: Monitor shutdown signal. This is cancel safe as `recv()` is
192                // cancel safe.
193                _ = shutdown_receiver.recv() => {
194                    break;
195                }
196
197                // Branch: Monitor terminal_async for user input. This is cancel safe as
198                // `get_readline_event()` is cancel safe.
199                result_readline_event = terminal_async.get_readline_event() => {
200                    match ControlFlow::from(result_readline_event) {
201                        ControlFlow::ShutdownKillChild => {
202                            _ = child.kill().await;
203                            _= shutdown_sender.send(());
204                            break;
205                        }
206                        ControlFlow::ProcessLine(input) => {
207                            let input = format!("{}\n", input);
208                            _ = stdin.write_all(input.as_bytes()).await;
209                            _ = stdin.flush().await;
210                        }
211                        ControlFlow::Resized => {}
212                    }
213                }
214            }
215        }
216    }
217}
218
219pub mod monitor_child_output {
220    use super::*;
221
222    pub async fn spawn(
223        stdout: tokio::process::ChildStdout,
224        stderr: tokio::process::ChildStderr,
225        mut shared_writer: SharedWriter,
226        shutdown_sender: tokio::sync::broadcast::Sender<()>,
227    ) -> tokio::task::JoinHandle<()> {
228        let mut stdout_lines = tokio::io::BufReader::new(stdout).lines();
229        let mut stderr_lines = tokio::io::BufReader::new(stderr).lines();
230        let mut shutdown_receiver = shutdown_sender.subscribe();
231
232        tokio::spawn(async move {
233            loop {
234                // Branch: Monitor shutdown signal. This is cancel safe as `recv()` is
235                // cancel safe.
236                tokio::select! {
237                    _ = shutdown_receiver.recv() => {
238                        break;
239                    }
240
241                    // Branch: Monitor stdout for output from the child process. This is
242                    // cancel safe as `next_line()` is cancel safe.
243                    result_line = stdout_lines.next_line() => {
244                        match result_line {
245                            Ok(Some(line)) => {
246                                let line = line.to_string().green();
247                                _ = writeln!(shared_writer, "{}", line);
248                            },
249                            _ => {
250                                _ = shutdown_sender.send(());
251                                break;
252                            }
253                        }
254                    }
255
256                    // Branch: Monitor stderr for output from the child process. This is
257                    // cancel safe as `next_line()` is cancel safe.
258                    result_line = stderr_lines.next_line() => {
259                        match result_line {
260                            Ok(Some(line)) => {
261                                let line = line.to_string().red();
262                                _ = writeln!(shared_writer, "{}", line);
263                            }
264                            _ => {
265                                _= shutdown_sender.send(());
266                                break;
267                            }
268                        }
269                    },
270                }
271            }
272        })
273    }
274}
275
276pub mod terminal_async_constructor {
277    use super::*;
278
279    pub struct TerminalAsyncHandle {
280        pub terminal_async: TerminalAsync,
281        pub shared_writer: SharedWriter,
282    }
283
284    pub async fn new(pid: u32) -> miette::Result<TerminalAsyncHandle> {
285        let prompt = {
286            let prompt_seg_1 = "โ•ญ".magenta().on_dark_grey().to_string();
287            let prompt_seg_2 = format!("โ”ค{pid}โ”œ").magenta().on_dark_grey().to_string();
288            let prompt_seg_3 = "โ•ฎ".magenta().on_dark_grey().to_string();
289            format!("{}{}{} ", prompt_seg_1, prompt_seg_2, prompt_seg_3)
290        };
291
292        let Ok(Some(terminal_async)) = TerminalAsync::try_new(prompt.as_str()).await
293        else {
294            miette::bail!("Failed to create TerminalAsync instance");
295        };
296
297        let shared_writer = terminal_async.clone_shared_writer();
298
299        ok!(TerminalAsyncHandle {
300            terminal_async,
301            shared_writer
302        })
303    }
304}
305
306pub mod child_process_constructor {
307    use super::*;
308
309    pub struct ChildProcessHandle {
310        pub stdin: tokio::process::ChildStdin,
311        pub stdout: tokio::process::ChildStdout,
312        pub stderr: tokio::process::ChildStderr,
313        pub pid: u32,
314        pub child: tokio::process::Child,
315    }
316
317    pub fn new(program: &str) -> miette::Result<ChildProcessHandle> {
318        let mut child: tokio::process::Child = tokio::process::Command::new(program)
319            .stdin(std::process::Stdio::piped())
320            .stdout(std::process::Stdio::piped())
321            .stderr(std::process::Stdio::piped())
322            .spawn()
323            .into_diagnostic()?;
324
325        let stdout: tokio::process::ChildStdout = child
326            .stdout
327            .take()
328            .ok_or_else(|| miette::miette!("Failed to open stdout of child process"))?;
329
330        let stdin: tokio::process::ChildStdin = child
331            .stdin
332            .take()
333            .ok_or_else(|| miette::miette!("Failed to open stdin of child process"))?;
334
335        let stderr: tokio::process::ChildStderr = child
336            .stderr
337            .take()
338            .ok_or_else(|| miette::miette!("Failed to open stderr of child process"))?;
339
340        let pid = child
341            .id()
342            .ok_or_else(|| miette::miette!("Failed to get PID of child process"))?;
343
344        ok!(ChildProcessHandle {
345            pid,
346            child,
347            stdin,
348            stdout,
349            stderr,
350        })
351    }
352}