Skip to main content

bamboo_tools/tools/
bash_runtime.rs

1use bamboo_infrastructure::{
2    build_command_environment, decode_process_line_lossy, hide_window_for_tokio_command,
3    preferred_bash_shell, trace_windows_command, CommandEnvironmentDiagnostics,
4};
5use dashmap::DashMap;
6use regex::Regex;
7use std::path::Path;
8use std::process::Stdio;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{Arc, OnceLock};
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14use tokio::time::{sleep, Duration};
15use tracing::warn;
16
17const MAX_OUTPUT_LINES: usize = 20_000;
18const COMPLETED_SESSION_TTL_SECS: u64 = 300;
19
20#[derive(Debug)]
21pub struct ShellSession {
22    pub id: String,
23    pub command: String,
24    pub environment: CommandEnvironmentDiagnostics,
25    child: Arc<Mutex<Child>>,
26    output: Arc<Mutex<Vec<String>>>,
27    base_index: Arc<Mutex<usize>>,
28    running: Arc<AtomicBool>,
29    exit_code: Arc<Mutex<Option<i32>>>,
30}
31
32impl ShellSession {
33    pub fn status(&self) -> &'static str {
34        if self.running.load(Ordering::Relaxed) {
35            "running"
36        } else {
37            "completed"
38        }
39    }
40
41    pub async fn exit_code(&self) -> Option<i32> {
42        *self.exit_code.lock().await
43    }
44
45    pub async fn read_output_since(
46        &self,
47        cursor: usize,
48        filter: Option<&Regex>,
49    ) -> (Vec<String>, usize, usize) {
50        let output = self.output.lock().await;
51        let base_index = self.base_index.lock().await;
52
53        let base = *base_index;
54        let effective_cursor = cursor.max(base);
55        let dropped_lines = effective_cursor.saturating_sub(cursor);
56        let start = effective_cursor.saturating_sub(base);
57        let new_lines = if start >= output.len() {
58            Vec::new()
59        } else {
60            output[start..]
61                .iter()
62                .filter(|line| filter.map(|re| re.is_match(line)).unwrap_or(true))
63                .cloned()
64                .collect()
65        };
66
67        let next_cursor = base + output.len();
68        (new_lines, next_cursor, dropped_lines)
69    }
70
71    pub async fn kill(&self) -> Result<(), String> {
72        let mut child = self.child.lock().await;
73        child
74            .kill()
75            .await
76            .map_err(|e| format!("Failed to kill shell '{}': {}", self.id, e))?;
77        self.running.store(false, Ordering::Relaxed);
78        Ok(())
79    }
80}
81
82fn sessions() -> &'static DashMap<String, Arc<ShellSession>> {
83    static SESSIONS: OnceLock<DashMap<String, Arc<ShellSession>>> = OnceLock::new();
84    SESSIONS.get_or_init(DashMap::new)
85}
86
87async fn push_line(output: &Arc<Mutex<Vec<String>>>, base_index: &Arc<Mutex<usize>>, line: String) {
88    let mut buffer = output.lock().await;
89    buffer.push(line);
90    if buffer.len() > MAX_OUTPUT_LINES {
91        let overflow = buffer.len() - MAX_OUTPUT_LINES;
92        buffer.drain(0..overflow);
93        let mut base = base_index.lock().await;
94        *base += overflow;
95    }
96}
97
98async fn pump_stream_lines<T>(
99    stream_name: &'static str,
100    reader: T,
101    output: Arc<Mutex<Vec<String>>>,
102    base_index: Arc<Mutex<usize>>,
103) where
104    T: tokio::io::AsyncRead + Unpin,
105{
106    let mut reader = BufReader::new(reader);
107    let mut line_bytes = Vec::new();
108
109    loop {
110        line_bytes.clear();
111        match reader.read_until(b'\n', &mut line_bytes).await {
112            Ok(0) => break,
113            Ok(_) => {
114                let line = decode_process_line_lossy(&mut line_bytes);
115                push_line(&output, &base_index, line).await;
116            }
117            Err(e) => {
118                warn!("Background shell {stream_name} read failed: {e}");
119                break;
120            }
121        }
122    }
123}
124
125pub async fn spawn_background(
126    command: &str,
127    cwd: Option<&Path>,
128) -> Result<Arc<ShellSession>, String> {
129    let shell = preferred_bash_shell();
130    trace_windows_command(
131        "agent.bash.background",
132        &shell.program,
133        [shell.arg, command],
134    );
135    let overrides = bamboo_infrastructure::Config::current_env_vars();
136    let prepared_env = build_command_environment(&overrides).await;
137    let mut cmd = Command::new(&shell.program);
138    hide_window_for_tokio_command(&mut cmd);
139    if let Some(cwd) = cwd {
140        cmd.current_dir(cwd);
141    }
142    prepared_env.apply_to_tokio_command(&mut cmd);
143    cmd.arg(shell.arg)
144        .arg(command)
145        .stdin(Stdio::null())
146        .stdout(Stdio::piped())
147        .stderr(Stdio::piped())
148        .kill_on_drop(true);
149
150    let mut child = cmd
151        .spawn()
152        .map_err(|e| format!("Failed to spawn background shell: {}", e))?;
153
154    let stdout = child
155        .stdout
156        .take()
157        .ok_or_else(|| "Failed to capture shell stdout".to_string())?;
158    let stderr = child
159        .stderr
160        .take()
161        .ok_or_else(|| "Failed to capture shell stderr".to_string())?;
162
163    let session_id = uuid::Uuid::new_v4().to_string();
164    let output = Arc::new(Mutex::new(Vec::new()));
165    let base_index = Arc::new(Mutex::new(0usize));
166    let running = Arc::new(AtomicBool::new(true));
167    let exit_code = Arc::new(Mutex::new(None));
168
169    let session = Arc::new(ShellSession {
170        id: session_id.clone(),
171        command: command.to_string(),
172        environment: prepared_env.diagnostics.clone(),
173        child: Arc::new(Mutex::new(child)),
174        output: output.clone(),
175        base_index: base_index.clone(),
176        running: running.clone(),
177        exit_code: exit_code.clone(),
178    });
179
180    {
181        let output = output.clone();
182        let base_index = base_index.clone();
183        tokio::spawn(async move {
184            pump_stream_lines("stdout", stdout, output, base_index).await;
185        });
186    }
187
188    {
189        let output = output.clone();
190        let base_index = base_index.clone();
191        tokio::spawn(async move {
192            pump_stream_lines("stderr", stderr, output, base_index).await;
193        });
194    }
195
196    {
197        let child = session.child.clone();
198        let session_id_for_gc = session_id.clone();
199        tokio::spawn(async move {
200            loop {
201                let poll = {
202                    let mut guard = child.lock().await;
203                    guard.try_wait()
204                };
205                match poll {
206                    Ok(Some(status)) => {
207                        *exit_code.lock().await = status.code();
208                        running.store(false, Ordering::Relaxed);
209                        break;
210                    }
211                    Ok(None) => {
212                        sleep(Duration::from_millis(100)).await;
213                    }
214                    Err(_) => {
215                        running.store(false, Ordering::Relaxed);
216                        break;
217                    }
218                }
219            }
220            sleep(Duration::from_secs(COMPLETED_SESSION_TTL_SECS)).await;
221            let _ = remove_shell(&session_id_for_gc);
222        });
223    }
224
225    sessions().insert(session_id, session.clone());
226    Ok(session)
227}
228
229pub fn get_shell(id: &str) -> Option<Arc<ShellSession>> {
230    sessions().get(id).map(|entry| entry.value().clone())
231}
232
233pub fn remove_shell(id: &str) -> Option<Arc<ShellSession>> {
234    sessions().remove(id).map(|(_, value)| value)
235}