claude_code_acp/session/
background_processes.rs

1//! Background process management for terminal commands
2//!
3//! Manages background terminal processes that are started with `run_in_background=true`.
4//! Supports retrieving incremental output and killing running processes.
5
6use std::io;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9
10use dashmap::DashMap;
11use tokio::process::Child;
12use tokio::sync::Mutex;
13
14use crate::session::wrapped_child::WrappedChild;
15
16/// Child process handle that can be either wrapped or unwrapped
17///
18/// This enum allows us to support both:
19/// - Legacy tokio::process::Child (no process group)
20/// - WrappedChild with process group support (process-wrap)
21///
22/// Note: Clone implementation creates a new handle that shares the same child
23/// but does NOT clone stdout/stderr (those are taken by the first user).
24#[derive(Debug)]
25pub enum ChildHandle {
26    /// Unwrapped child (legacy, no process group support)
27    Unwrapped {
28        /// The child process
29        child: Arc<Mutex<Child>>,
30    },
31    /// Wrapped child with process group support (via process-wrap)
32    Wrapped {
33        /// The wrapped child
34        child: Arc<Mutex<WrappedChild>>,
35    },
36}
37
38impl Clone for ChildHandle {
39    fn clone(&self) -> Self {
40        match self {
41            Self::Unwrapped { child } => Self::Unwrapped {
42                child: Arc::clone(child),
43            },
44            Self::Wrapped { child } => Self::Wrapped {
45                child: Arc::clone(child),
46            },
47        }
48    }
49}
50
51impl ChildHandle {
52    /// Get stdout reference (only available if not yet taken)
53    /// Note: This always returns None after cloning because stdout/stderr are not cloned
54    pub fn stdout(&self) -> Option<&tokio::process::ChildStdout> {
55        None // Stdout/stderr are not available after cloning
56    }
57
58    /// Get stderr reference (only available if not yet taken)
59    /// Note: This always returns None after cloning because stdout/stderr are not cloned
60    pub fn stderr(&self) -> Option<&tokio::process::ChildStderr> {
61        None // Stdout/stderr are not available after cloning
62    }
63
64    /// Kill the process (and process group if wrapped)
65    pub async fn kill(&mut self) -> io::Result<()> {
66        match self {
67            Self::Unwrapped { child } => {
68                let mut guard = child.lock().await;
69                guard.kill().await
70            }
71            Self::Wrapped { child } => {
72                let mut guard = child.lock().await;
73                guard.kill().await
74            }
75        }
76    }
77
78    /// Wait for the process to exit
79    pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
80        match self {
81            Self::Unwrapped { child } => {
82                let mut guard = child.lock().await;
83                guard.wait().await
84            }
85            Self::Wrapped { child } => {
86                let mut guard = child.lock().await;
87                guard.wait().await
88            }
89        }
90    }
91
92    /// Try to wait without blocking
93    pub fn try_wait(&mut self) -> io::Result<Option<std::process::ExitStatus>> {
94        // For tokio::sync::Mutex, we need to use try_lock
95        match self {
96            Self::Unwrapped { child } => {
97                if let Ok(mut guard) = child.try_lock() {
98                    guard.try_wait()
99                } else {
100                    Ok(None)
101                }
102            }
103            Self::Wrapped { child } => {
104                if let Ok(mut guard) = child.try_lock() {
105                    guard.try_wait()
106                } else {
107                    Ok(None)
108                }
109            }
110        }
111    }
112}
113
114/// Terminal exit status
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116pub enum TerminalExitStatus {
117    /// Process exited normally with exit code
118    Exited(i32),
119    /// Process was killed by user
120    Killed,
121    /// Process timed out
122    TimedOut,
123    /// Process was aborted
124    Aborted,
125}
126
127impl TerminalExitStatus {
128    /// Get status string for API response
129    pub fn as_str(&self) -> &'static str {
130        match self {
131            Self::Exited(_) => "exited",
132            Self::Killed => "killed",
133            Self::TimedOut => "timedOut",
134            Self::Aborted => "aborted",
135        }
136    }
137}
138
139/// Background terminal state
140#[derive(Debug)]
141pub enum BackgroundTerminal {
142    /// Terminal is still running
143    Running {
144        /// The child process handle (wrapped or unwrapped)
145        child: ChildHandle,
146        /// Accumulated output buffer
147        output_buffer: Arc<Mutex<String>>,
148        /// Last read offset for incremental output
149        /// Using AtomicUsize for lock-free atomic operations
150        last_read_offset: Arc<AtomicUsize>,
151    },
152    /// Terminal has finished
153    Finished {
154        /// Exit status
155        status: TerminalExitStatus,
156        /// Final output
157        final_output: String,
158    },
159}
160
161impl BackgroundTerminal {
162    /// Create a new running terminal with a child handle
163    pub fn new_running(child: ChildHandle) -> Self {
164        Self::Running {
165            child,
166            output_buffer: Arc::new(Mutex::new(String::new())),
167            last_read_offset: Arc::new(AtomicUsize::new(0)),
168        }
169    }
170
171    /// Create a new running terminal from a legacy Child (unwrapped)
172    /// Note: stdout/stderr should be taken before creating the handle
173    pub fn new_running_unwrapped(child: Child) -> Self {
174        Self::Running {
175            child: ChildHandle::Unwrapped {
176                child: Arc::new(Mutex::new(child)),
177            },
178            output_buffer: Arc::new(Mutex::new(String::new())),
179            last_read_offset: Arc::new(AtomicUsize::new(0)),
180        }
181    }
182
183    /// Check if terminal is still running
184    pub fn is_running(&self) -> bool {
185        matches!(self, Self::Running { .. })
186    }
187
188    /// Get the status string
189    pub fn status_str(&self) -> &'static str {
190        match self {
191            Self::Running { .. } => "running",
192            Self::Finished { status, .. } => status.as_str(),
193        }
194    }
195
196    /// Get incremental output since last read
197    pub async fn get_incremental_output(&self) -> String {
198        match self {
199            Self::Running {
200                output_buffer,
201                last_read_offset,
202                ..
203            } => {
204                // Use atomic load to get current offset (lock-free)
205                let current_offset = last_read_offset.load(Ordering::Acquire);
206
207                let buffer = output_buffer.lock().await;
208                let new_output = buffer[current_offset..].to_string();
209                let new_len = buffer.len();
210                drop(buffer);
211
212                // Update offset using atomic store (lock-free)
213                last_read_offset.store(new_len, Ordering::Release);
214
215                new_output
216            }
217            Self::Finished { final_output, .. } => final_output.clone(),
218        }
219    }
220
221    /// Append output to the buffer (for running terminals)
222    pub async fn append_output(&self, output: &str) {
223        if let Self::Running { output_buffer, .. } = self {
224            let mut buffer = output_buffer.lock().await;
225            buffer.push_str(output);
226        }
227    }
228
229    /// Get all output
230    pub async fn get_all_output(&self) -> String {
231        match self {
232            Self::Running { output_buffer, .. } => {
233                let buffer = output_buffer.lock().await;
234                buffer.clone()
235            }
236            Self::Finished { final_output, .. } => final_output.clone(),
237        }
238    }
239
240    /// Transition to finished state
241    pub async fn finish(self, status: TerminalExitStatus) -> Self {
242        match self {
243            Self::Running { output_buffer, .. } => {
244                let final_output = output_buffer.lock().await.clone();
245                Self::Finished {
246                    status,
247                    final_output,
248                }
249            }
250            finished @ Self::Finished { .. } => finished,
251        }
252    }
253}
254
255/// Manager for background terminal processes
256#[derive(Debug, Default)]
257pub struct BackgroundProcessManager {
258    /// Map of shell ID to background terminal
259    terminals: DashMap<String, BackgroundTerminal>,
260}
261
262impl BackgroundProcessManager {
263    /// Create a new background process manager
264    pub fn new() -> Self {
265        Self {
266            terminals: DashMap::new(),
267        }
268    }
269
270    /// Register a new background terminal
271    pub fn register(&self, shell_id: String, terminal: BackgroundTerminal) {
272        self.terminals.insert(shell_id, terminal);
273    }
274
275    /// Check if a terminal exists
276    pub fn has_terminal(&self, shell_id: &str) -> bool {
277        self.terminals.contains_key(shell_id)
278    }
279
280    /// Get terminal by ID (returns reference for reading)
281    pub fn get(
282        &self,
283        shell_id: &str,
284    ) -> Option<dashmap::mapref::one::Ref<'_, String, BackgroundTerminal>> {
285        self.terminals.get(shell_id)
286    }
287
288    /// Remove terminal by ID
289    pub fn remove(&self, shell_id: &str) -> Option<(String, BackgroundTerminal)> {
290        self.terminals.remove(shell_id)
291    }
292
293    /// Update a terminal to finished state
294    pub async fn finish_terminal(&self, shell_id: &str, status: TerminalExitStatus) {
295        if let Some((id, terminal)) = self.terminals.remove(shell_id) {
296            let finished = terminal.finish(status).await;
297            self.terminals.insert(id, finished);
298        }
299    }
300
301    /// Get number of terminals
302    pub fn count(&self) -> usize {
303        self.terminals.len()
304    }
305
306    /// Get all shell IDs
307    pub fn shell_ids(&self) -> Vec<String> {
308        self.terminals.iter().map(|r| r.key().clone()).collect()
309    }
310}
311
312#[cfg(test)]
313mod tests {
314    use super::*;
315
316    #[test]
317    fn test_terminal_exit_status() {
318        assert_eq!(TerminalExitStatus::Exited(0).as_str(), "exited");
319        assert_eq!(TerminalExitStatus::Killed.as_str(), "killed");
320        assert_eq!(TerminalExitStatus::TimedOut.as_str(), "timedOut");
321        assert_eq!(TerminalExitStatus::Aborted.as_str(), "aborted");
322    }
323
324    #[test]
325    fn test_background_process_manager_new() {
326        let manager = BackgroundProcessManager::new();
327        assert_eq!(manager.count(), 0);
328    }
329
330    #[test]
331    fn test_background_process_manager_has_terminal() {
332        let manager = BackgroundProcessManager::new();
333        assert!(!manager.has_terminal("test-id"));
334    }
335
336    #[tokio::test]
337    async fn test_background_terminal_finished() {
338        let terminal = BackgroundTerminal::Finished {
339            status: TerminalExitStatus::Exited(0),
340            final_output: "test output".to_string(),
341        };
342
343        assert!(!terminal.is_running());
344        assert_eq!(terminal.status_str(), "exited");
345        assert_eq!(terminal.get_all_output().await, "test output");
346    }
347}