bamboo_tools/tools/
bash_runtime.rs1use bamboo_infrastructure::{
2 build_command_environment, decode_process_line_lossy, hide_window_for_tokio_command,
3 preferred_bash_shell, trace_windows_command, CommandEnvironmentDiagnostics,
4};
5use dashmap::DashMap;
6use regex::Regex;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{Arc, OnceLock};
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14use tokio::time::{sleep, Duration};
15use tracing::warn;
16
/// Maximum number of output lines retained per session; once exceeded, the
/// oldest lines are evicted from the front of the buffer.
const MAX_OUTPUT_LINES: usize = 20_000;
/// How long (in seconds) a session stays registered after its status polling
/// loop ends, before it is removed from the session registry.
const COMPLETED_SESSION_TTL_SECS: u64 = 300;
19
/// A shell command running in the background together with its captured output.
///
/// Sessions are created by `spawn_background` and handed out as
/// `Arc<ShellSession>`; all mutable state lives behind shared handles so the
/// background tasks and callers observe the same data.
#[derive(Debug)]
pub struct ShellSession {
    /// Unique identifier of the session (generated as a UUID v4 string).
    pub id: String,
    /// The command string that was passed to the shell.
    pub command: String,
    /// Diagnostics describing the environment prepared for the command.
    pub environment: CommandEnvironmentDiagnostics,
    /// Handle to the spawned process; locked for status polling and for `kill`.
    child: Arc<Mutex<Child>>,
    /// Retained output lines (stdout and stderr interleaved), oldest first.
    output: Arc<Mutex<Vec<String>>>,
    /// Count of lines already evicted from the front of `output`, i.e. the
    /// absolute index of `output[0]`.
    base_index: Arc<Mutex<usize>>,
    /// Set to `false` once the session is no longer considered running
    /// (process exit, a polling failure, or a successful `kill`).
    running: Arc<AtomicBool>,
    /// Exit code stored after the process has been observed to exit; stays
    /// `None` before that or when the exit status carries no code.
    exit_code: Arc<Mutex<Option<i32>>>,
}
31
32impl ShellSession {
33 pub fn status(&self) -> &'static str {
34 if self.running.load(Ordering::Relaxed) {
35 "running"
36 } else {
37 "completed"
38 }
39 }
40
41 pub async fn exit_code(&self) -> Option<i32> {
42 *self.exit_code.lock().await
43 }
44
45 pub async fn read_output_since(
46 &self,
47 cursor: usize,
48 filter: Option<&Regex>,
49 ) -> (Vec<String>, usize, usize) {
50 let output = self.output.lock().await;
51 let base_index = self.base_index.lock().await;
52
53 let base = *base_index;
54 let effective_cursor = cursor.max(base);
55 let dropped_lines = effective_cursor.saturating_sub(cursor);
56 let start = effective_cursor.saturating_sub(base);
57 let new_lines = if start >= output.len() {
58 Vec::new()
59 } else {
60 output[start..]
61 .iter()
62 .filter(|line| filter.map(|re| re.is_match(line)).unwrap_or(true))
63 .cloned()
64 .collect()
65 };
66
67 let next_cursor = base + output.len();
68 (new_lines, next_cursor, dropped_lines)
69 }
70
71 pub async fn kill(&self) -> Result<(), String> {
72 let mut child = self.child.lock().await;
73 child
74 .kill()
75 .await
76 .map_err(|e| format!("Failed to kill shell '{}': {}", self.id, e))?;
77 self.running.store(false, Ordering::Relaxed);
78 Ok(())
79 }
80}
81
82fn sessions() -> &'static DashMap<String, Arc<ShellSession>> {
83 static SESSIONS: OnceLock<DashMap<String, Arc<ShellSession>>> = OnceLock::new();
84 SESSIONS.get_or_init(DashMap::new)
85}
86
87async fn push_line(output: &Arc<Mutex<Vec<String>>>, base_index: &Arc<Mutex<usize>>, line: String) {
88 let mut buffer = output.lock().await;
89 buffer.push(line);
90 if buffer.len() > MAX_OUTPUT_LINES {
91 let overflow = buffer.len() - MAX_OUTPUT_LINES;
92 buffer.drain(0..overflow);
93 let mut base = base_index.lock().await;
94 *base += overflow;
95 }
96}
97
98async fn pump_stream_lines<T>(
99 stream_name: &'static str,
100 reader: T,
101 output: Arc<Mutex<Vec<String>>>,
102 base_index: Arc<Mutex<usize>>,
103) where
104 T: tokio::io::AsyncRead + Unpin,
105{
106 let mut reader = BufReader::new(reader);
107 let mut line_bytes = Vec::new();
108
109 loop {
110 line_bytes.clear();
111 match reader.read_until(b'\n', &mut line_bytes).await {
112 Ok(0) => break,
113 Ok(_) => {
114 let line = decode_process_line_lossy(&mut line_bytes);
115 push_line(&output, &base_index, line).await;
116 }
117 Err(e) => {
118 warn!("Background shell {stream_name} read failed: {e}");
119 break;
120 }
121 }
122 }
123}
124
125pub async fn spawn_background(
126 command: &str,
127 cwd: Option<&Path>,
128) -> Result<Arc<ShellSession>, String> {
129 let shell = preferred_bash_shell();
130 trace_windows_command(
131 "agent.bash.background",
132 &shell.program,
133 [shell.arg, command],
134 );
135 let overrides = bamboo_infrastructure::Config::current_env_vars();
136 let prepared_env = build_command_environment(&overrides).await;
137 let mut cmd = Command::new(&shell.program);
138 hide_window_for_tokio_command(&mut cmd);
139 if let Some(cwd) = cwd {
140 cmd.current_dir(cwd);
141 }
142 prepared_env.apply_to_tokio_command(&mut cmd);
143 cmd.arg(shell.arg)
144 .arg(command)
145 .stdin(Stdio::null())
146 .stdout(Stdio::piped())
147 .stderr(Stdio::piped())
148 .kill_on_drop(true);
149
150 let mut child = cmd
151 .spawn()
152 .map_err(|e| format!("Failed to spawn background shell: {}", e))?;
153
154 let stdout = child
155 .stdout
156 .take()
157 .ok_or_else(|| "Failed to capture shell stdout".to_string())?;
158 let stderr = child
159 .stderr
160 .take()
161 .ok_or_else(|| "Failed to capture shell stderr".to_string())?;
162
163 let session_id = uuid::Uuid::new_v4().to_string();
164 let output = Arc::new(Mutex::new(Vec::new()));
165 let base_index = Arc::new(Mutex::new(0usize));
166 let running = Arc::new(AtomicBool::new(true));
167 let exit_code = Arc::new(Mutex::new(None));
168
169 let session = Arc::new(ShellSession {
170 id: session_id.clone(),
171 command: command.to_string(),
172 environment: prepared_env.diagnostics.clone(),
173 child: Arc::new(Mutex::new(child)),
174 output: output.clone(),
175 base_index: base_index.clone(),
176 running: running.clone(),
177 exit_code: exit_code.clone(),
178 });
179
180 {
181 let output = output.clone();
182 let base_index = base_index.clone();
183 tokio::spawn(async move {
184 pump_stream_lines("stdout", stdout, output, base_index).await;
185 });
186 }
187
188 {
189 let output = output.clone();
190 let base_index = base_index.clone();
191 tokio::spawn(async move {
192 pump_stream_lines("stderr", stderr, output, base_index).await;
193 });
194 }
195
196 {
197 let child = session.child.clone();
198 let session_id_for_gc = session_id.clone();
199 tokio::spawn(async move {
200 loop {
201 let poll = {
202 let mut guard = child.lock().await;
203 guard.try_wait()
204 };
205 match poll {
206 Ok(Some(status)) => {
207 *exit_code.lock().await = status.code();
208 running.store(false, Ordering::Relaxed);
209 break;
210 }
211 Ok(None) => {
212 sleep(Duration::from_millis(100)).await;
213 }
214 Err(_) => {
215 running.store(false, Ordering::Relaxed);
216 break;
217 }
218 }
219 }
220 sleep(Duration::from_secs(COMPLETED_SESSION_TTL_SECS)).await;
221 let _ = remove_shell(&session_id_for_gc);
222 });
223 }
224
225 sessions().insert(session_id, session.clone());
226 Ok(session)
227}
228
229pub fn get_shell(id: &str) -> Option<Arc<ShellSession>> {
230 sessions().get(id).map(|entry| entry.value().clone())
231}
232
233pub fn remove_shell(id: &str) -> Option<Arc<ShellSession>> {
234 sessions().remove(id).map(|(_, value)| value)
235}