shell_async/shell_async.rs
1/*
2 * Copyright (c) 2024 R3BL LLC
3 * All rights reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18//! This program uses the `r3bl_terminal_async` crate to provide a prompt and get user
19//! input, pass that to the `stdin` of a `bash` child process, and then display the output
20//! from the child process in the terminal. The followings steps outline what the program
21//! does:
22//!
23//! # YouTube video of live coding this example
24//!
25//! Please watch the following video to see how this example was created.
26//! - [Build with Naz : Create an async shell in Rust](https://youtu.be/jXzFCDIJQag)
27//! - [YouTube channel](https://www.youtube.com/@developerlifecom?sub_confirmation=1)
28//!
29//! # Create some shared global variables
30//!
31//! - A broadcast channel to signal shutdown to the child process, and all the spawned
32//! tasks.
33//! - [r3bl_terminal_async::TerminalAsync] to write to the terminal. This provides the
34//! mechanism to collect user input and display output.
35//! - [tokio::process::Child] to spawn the child process (`bash`) and interact with it.
36//! This child process lives as long as the `main` function and exits when the user
37//! chooses to exit the program.
38//! - The [tokio::process::Command] starts `bash`.
39//! - Both `stdin` and `stdout` are piped using [std::process::Stdio::piped].
40//!
41//! # ๐งต The main event loop simply waits for the following (on the current thread)
42//!
43//! - Start a main event loop (on the current thread):
44//! - The shutdown signal from the broadcast channel, and monitors the
45//! [r3bl_terminal_async::TerminalAsync] for user input. It writes the user input to the
46//! [tokio::process::ChildStdin].
47//! - Any exit inputs (user types "exit" or "Ctrl+D") from the user are captured here and
48//! sent to the shutdown broadcast channel. It also listens to the broadcast channel to
49//! break out of the loop on shutdown.
50//! - It [tokio::process::Child::kill]s the child process when it gets the exit signal.
51//! - It does not monitor the terminal for user input or the child process for output.
52//!
53//! # ๐ Spawn a new task to loop and read the output from the child process and display it
54//!
55//! - Spawn a task to loop:
56//! - Read the [tokio::process::ChildStdout] and write it to the
57//! [r3bl_terminal_async::SharedWriter].
58//! - Also listen to the broadcast channel to break out of the loop on shutdown.
59//!
60//! # Run the binary
61//!
62//! ```text
63//! โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
64//! โ > cargo run --example shell_async โ
65//! โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
66//! ```
67//!
68//! Type the following commands to have a go at this.
69//!
70//! ```text
71//! msg="hello nadia!"
72//! echo $msg
73//! ```
74//!
75//! You should see something like the following.
76//!
77//! ```text
78//! [1606192] > msg="hello nadia!"
79//! [1606192] > echo $msg
80//! hello nadia!
81//! [1606192] >
82//! ```
83//!
84//! # Clean up any left over processes
85//! ```text
86//! โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
87//! โ > killall -9 bash shell_async โ
88//! โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
89//! ```
90//! This program uses the `r3bl_terminal_async` crate to provide a prompt and get user
91//! input, pass that to the `stdin` of a `bash` child process, and then display the output
92//! from the child process in the terminal.
93
94use std::io::Write as _;
95
96use crossterm::style::Stylize as _;
97use miette::IntoDiagnostic as _;
98use r3bl_core::{ok, SharedWriter};
99use r3bl_terminal_async::{ReadlineEvent,
100 ReadlineEvent::{Eof, Interrupted, Line, Resized},
101 TerminalAsync};
102use tokio::io::{AsyncBufReadExt as _, AsyncWriteExt as _};
103
104#[tokio::main]
105#[allow(clippy::needless_return)]
106async fn main() -> miette::Result<()> {
107 // Create a broadcast channel for shutdown.
108 let (shutdown_sender, _) = tokio::sync::broadcast::channel::<()>(1);
109
110 // Create a long-running `bash` child process using tokio::process::Command.
111 let child_process_constructor::ChildProcessHandle {
112 pid,
113 child,
114 stdin,
115 stdout,
116 stderr,
117 } = child_process_constructor::new("bash")?;
118
119 // Create a `r3bl_terminal_async` instance.
120 let terminal_async_constructor::TerminalAsyncHandle {
121 terminal_async,
122 shared_writer,
123 } = terminal_async_constructor::new(pid).await?;
124
125 // Create 2 tasks, join on them:
126 // 1. monitor the output from the child process.
127 // 2. monitor the input from the user (and relay it to the child process).
128 _ = tokio::join!(
129 // New green thread.
130 monitor_child_output::spawn(
131 stdout,
132 stderr,
133 shared_writer.clone(),
134 shutdown_sender.clone()
135 ),
136 // Current thread.
137 monitor_user_input_and_send_to_child::start_event_loop(
138 stdin,
139 terminal_async,
140 child,
141 shutdown_sender.clone()
142 )
143 );
144
145 ok!()
146}
147
148pub mod monitor_user_input_and_send_to_child {
149 use super::*;
150
151 /// Determine the control flow of the program based on the [ReadlineEvent] received
152 /// from user input.
153 enum ControlFlow {
154 ShutdownKillChild,
155 ProcessLine(String),
156 Resized,
157 }
158
159 /// Convert a [miette::Result<ReadlineEvent>] to a [ControlFlow]. This leverages the
160 /// type system to make it simpler to reason about what to do with the user input.
161 impl From<miette::Result<ReadlineEvent>> for ControlFlow {
162 fn from(result_readline_event: miette::Result<ReadlineEvent>) -> Self {
163 match result_readline_event {
164 Ok(readline_event) => match readline_event {
165 Line(input) => {
166 let input = input.trim().to_string();
167 if input == "exit" {
168 ControlFlow::ShutdownKillChild
169 } else {
170 ControlFlow::ProcessLine(input)
171 }
172 }
173 Eof | Interrupted => ControlFlow::ShutdownKillChild,
174 Resized => ControlFlow::Resized,
175 },
176 _ => ControlFlow::ShutdownKillChild,
177 }
178 }
179 }
180
181 pub async fn start_event_loop(
182 mut stdin: tokio::process::ChildStdin,
183 mut terminal_async: TerminalAsync,
184 mut child: tokio::process::Child,
185 shutdown_sender: tokio::sync::broadcast::Sender<()>,
186 ) {
187 let mut shutdown_receiver = shutdown_sender.subscribe();
188
189 loop {
190 tokio::select! {
191 // Branch: Monitor shutdown signal. This is cancel safe as `recv()` is
192 // cancel safe.
193 _ = shutdown_receiver.recv() => {
194 break;
195 }
196
197 // Branch: Monitor terminal_async for user input. This is cancel safe as
198 // `get_readline_event()` is cancel safe.
199 result_readline_event = terminal_async.get_readline_event() => {
200 match ControlFlow::from(result_readline_event) {
201 ControlFlow::ShutdownKillChild => {
202 _ = child.kill().await;
203 _= shutdown_sender.send(());
204 break;
205 }
206 ControlFlow::ProcessLine(input) => {
207 let input = format!("{}\n", input);
208 _ = stdin.write_all(input.as_bytes()).await;
209 _ = stdin.flush().await;
210 }
211 ControlFlow::Resized => {}
212 }
213 }
214 }
215 }
216 }
217}
218
219pub mod monitor_child_output {
220 use super::*;
221
222 pub async fn spawn(
223 stdout: tokio::process::ChildStdout,
224 stderr: tokio::process::ChildStderr,
225 mut shared_writer: SharedWriter,
226 shutdown_sender: tokio::sync::broadcast::Sender<()>,
227 ) -> tokio::task::JoinHandle<()> {
228 let mut stdout_lines = tokio::io::BufReader::new(stdout).lines();
229 let mut stderr_lines = tokio::io::BufReader::new(stderr).lines();
230 let mut shutdown_receiver = shutdown_sender.subscribe();
231
232 tokio::spawn(async move {
233 loop {
234 // Branch: Monitor shutdown signal. This is cancel safe as `recv()` is
235 // cancel safe.
236 tokio::select! {
237 _ = shutdown_receiver.recv() => {
238 break;
239 }
240
241 // Branch: Monitor stdout for output from the child process. This is
242 // cancel safe as `next_line()` is cancel safe.
243 result_line = stdout_lines.next_line() => {
244 match result_line {
245 Ok(Some(line)) => {
246 let line = line.to_string().green();
247 _ = writeln!(shared_writer, "{}", line);
248 },
249 _ => {
250 _ = shutdown_sender.send(());
251 break;
252 }
253 }
254 }
255
256 // Branch: Monitor stderr for output from the child process. This is
257 // cancel safe as `next_line()` is cancel safe.
258 result_line = stderr_lines.next_line() => {
259 match result_line {
260 Ok(Some(line)) => {
261 let line = line.to_string().red();
262 _ = writeln!(shared_writer, "{}", line);
263 }
264 _ => {
265 _= shutdown_sender.send(());
266 break;
267 }
268 }
269 },
270 }
271 }
272 })
273 }
274}
275
276pub mod terminal_async_constructor {
277 use super::*;
278
279 pub struct TerminalAsyncHandle {
280 pub terminal_async: TerminalAsync,
281 pub shared_writer: SharedWriter,
282 }
283
284 pub async fn new(pid: u32) -> miette::Result<TerminalAsyncHandle> {
285 let prompt = {
286 let prompt_seg_1 = "โญ".magenta().on_dark_grey().to_string();
287 let prompt_seg_2 = format!("โค{pid}โ").magenta().on_dark_grey().to_string();
288 let prompt_seg_3 = "โฎ".magenta().on_dark_grey().to_string();
289 format!("{}{}{} ", prompt_seg_1, prompt_seg_2, prompt_seg_3)
290 };
291
292 let Ok(Some(terminal_async)) = TerminalAsync::try_new(prompt.as_str()).await
293 else {
294 miette::bail!("Failed to create TerminalAsync instance");
295 };
296
297 let shared_writer = terminal_async.clone_shared_writer();
298
299 ok!(TerminalAsyncHandle {
300 terminal_async,
301 shared_writer
302 })
303 }
304}
305
306pub mod child_process_constructor {
307 use super::*;
308
309 pub struct ChildProcessHandle {
310 pub stdin: tokio::process::ChildStdin,
311 pub stdout: tokio::process::ChildStdout,
312 pub stderr: tokio::process::ChildStderr,
313 pub pid: u32,
314 pub child: tokio::process::Child,
315 }
316
317 pub fn new(program: &str) -> miette::Result<ChildProcessHandle> {
318 let mut child: tokio::process::Child = tokio::process::Command::new(program)
319 .stdin(std::process::Stdio::piped())
320 .stdout(std::process::Stdio::piped())
321 .stderr(std::process::Stdio::piped())
322 .spawn()
323 .into_diagnostic()?;
324
325 let stdout: tokio::process::ChildStdout = child
326 .stdout
327 .take()
328 .ok_or_else(|| miette::miette!("Failed to open stdout of child process"))?;
329
330 let stdin: tokio::process::ChildStdin = child
331 .stdin
332 .take()
333 .ok_or_else(|| miette::miette!("Failed to open stdin of child process"))?;
334
335 let stderr: tokio::process::ChildStderr = child
336 .stderr
337 .take()
338 .ok_or_else(|| miette::miette!("Failed to open stderr of child process"))?;
339
340 let pid = child
341 .id()
342 .ok_or_else(|| miette::miette!("Failed to get PID of child process"))?;
343
344 ok!(ChildProcessHandle {
345 pid,
346 child,
347 stdin,
348 stdout,
349 stderr,
350 })
351 }
352}