claude_code_acp/session/
background_processes.rs1use std::io;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicUsize, Ordering};
9
10use dashmap::DashMap;
11use tokio::process::Child;
12use tokio::sync::Mutex;
13
14use crate::session::wrapped_child::WrappedChild;
15
16#[derive(Debug)]
25pub enum ChildHandle {
26 Unwrapped {
28 child: Arc<Mutex<Child>>,
30 },
31 Wrapped {
33 child: Arc<Mutex<WrappedChild>>,
35 },
36}
37
38impl Clone for ChildHandle {
39 fn clone(&self) -> Self {
40 match self {
41 Self::Unwrapped { child } => Self::Unwrapped {
42 child: Arc::clone(child),
43 },
44 Self::Wrapped { child } => Self::Wrapped {
45 child: Arc::clone(child),
46 },
47 }
48 }
49}
50
51impl ChildHandle {
52 pub fn stdout(&self) -> Option<&tokio::process::ChildStdout> {
55 None }
57
58 pub fn stderr(&self) -> Option<&tokio::process::ChildStderr> {
61 None }
63
64 pub async fn kill(&mut self) -> io::Result<()> {
66 match self {
67 Self::Unwrapped { child } => {
68 let mut guard = child.lock().await;
69 guard.kill().await
70 }
71 Self::Wrapped { child } => {
72 let mut guard = child.lock().await;
73 guard.kill().await
74 }
75 }
76 }
77
78 pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
80 match self {
81 Self::Unwrapped { child } => {
82 let mut guard = child.lock().await;
83 guard.wait().await
84 }
85 Self::Wrapped { child } => {
86 let mut guard = child.lock().await;
87 guard.wait().await
88 }
89 }
90 }
91
92 pub fn try_wait(&mut self) -> io::Result<Option<std::process::ExitStatus>> {
94 match self {
96 Self::Unwrapped { child } => {
97 if let Ok(mut guard) = child.try_lock() {
98 guard.try_wait()
99 } else {
100 Ok(None)
101 }
102 }
103 Self::Wrapped { child } => {
104 if let Ok(mut guard) = child.try_lock() {
105 guard.try_wait()
106 } else {
107 Ok(None)
108 }
109 }
110 }
111 }
112}
113
/// How a background terminal ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TerminalExitStatus {
    /// Ended with the carried `i32` (presumably the process exit code; confirm
    /// against the code that constructs this variant).
    Exited(i32),
    /// Reported by [`as_str`](Self::as_str) as `"killed"`.
    Killed,
    /// Reported by [`as_str`](Self::as_str) as `"timedOut"`.
    TimedOut,
    /// Reported by [`as_str`](Self::as_str) as `"aborted"`.
    Aborted,
}

impl TerminalExitStatus {
    /// Returns the fixed string label for this status.
    ///
    /// The value carried by `Exited` does not affect the label.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Aborted => "aborted",
            Self::TimedOut => "timedOut",
            Self::Killed => "killed",
            Self::Exited(_) => "exited",
        }
    }
}
138
139#[derive(Debug)]
141pub enum BackgroundTerminal {
142 Running {
144 child: ChildHandle,
146 output_buffer: Arc<Mutex<String>>,
148 last_read_offset: Arc<AtomicUsize>,
151 },
152 Finished {
154 status: TerminalExitStatus,
156 final_output: String,
158 },
159}
160
161impl BackgroundTerminal {
162 pub fn new_running(child: ChildHandle) -> Self {
164 Self::Running {
165 child,
166 output_buffer: Arc::new(Mutex::new(String::new())),
167 last_read_offset: Arc::new(AtomicUsize::new(0)),
168 }
169 }
170
171 pub fn new_running_unwrapped(child: Child) -> Self {
174 Self::Running {
175 child: ChildHandle::Unwrapped {
176 child: Arc::new(Mutex::new(child)),
177 },
178 output_buffer: Arc::new(Mutex::new(String::new())),
179 last_read_offset: Arc::new(AtomicUsize::new(0)),
180 }
181 }
182
183 pub fn is_running(&self) -> bool {
185 matches!(self, Self::Running { .. })
186 }
187
188 pub fn status_str(&self) -> &'static str {
190 match self {
191 Self::Running { .. } => "running",
192 Self::Finished { status, .. } => status.as_str(),
193 }
194 }
195
196 pub async fn get_incremental_output(&self) -> String {
198 match self {
199 Self::Running {
200 output_buffer,
201 last_read_offset,
202 ..
203 } => {
204 let current_offset = last_read_offset.load(Ordering::Acquire);
206
207 let buffer = output_buffer.lock().await;
208 let new_output = buffer[current_offset..].to_string();
209 let new_len = buffer.len();
210 drop(buffer);
211
212 last_read_offset.store(new_len, Ordering::Release);
214
215 new_output
216 }
217 Self::Finished { final_output, .. } => final_output.clone(),
218 }
219 }
220
221 pub async fn append_output(&self, output: &str) {
223 if let Self::Running { output_buffer, .. } = self {
224 let mut buffer = output_buffer.lock().await;
225 buffer.push_str(output);
226 }
227 }
228
229 pub async fn get_all_output(&self) -> String {
231 match self {
232 Self::Running { output_buffer, .. } => {
233 let buffer = output_buffer.lock().await;
234 buffer.clone()
235 }
236 Self::Finished { final_output, .. } => final_output.clone(),
237 }
238 }
239
240 pub async fn finish(self, status: TerminalExitStatus) -> Self {
242 match self {
243 Self::Running { output_buffer, .. } => {
244 let final_output = output_buffer.lock().await.clone();
245 Self::Finished {
246 status,
247 final_output,
248 }
249 }
250 finished @ Self::Finished { .. } => finished,
251 }
252 }
253}
254
255#[derive(Debug, Default)]
257pub struct BackgroundProcessManager {
258 terminals: DashMap<String, BackgroundTerminal>,
260}
261
262impl BackgroundProcessManager {
263 pub fn new() -> Self {
265 Self {
266 terminals: DashMap::new(),
267 }
268 }
269
270 pub fn register(&self, shell_id: String, terminal: BackgroundTerminal) {
272 self.terminals.insert(shell_id, terminal);
273 }
274
275 pub fn has_terminal(&self, shell_id: &str) -> bool {
277 self.terminals.contains_key(shell_id)
278 }
279
280 pub fn get(
282 &self,
283 shell_id: &str,
284 ) -> Option<dashmap::mapref::one::Ref<'_, String, BackgroundTerminal>> {
285 self.terminals.get(shell_id)
286 }
287
288 pub fn remove(&self, shell_id: &str) -> Option<(String, BackgroundTerminal)> {
290 self.terminals.remove(shell_id)
291 }
292
293 pub async fn finish_terminal(&self, shell_id: &str, status: TerminalExitStatus) {
295 if let Some((id, terminal)) = self.terminals.remove(shell_id) {
296 let finished = terminal.finish(status).await;
297 self.terminals.insert(id, finished);
298 }
299 }
300
301 pub fn count(&self) -> usize {
303 self.terminals.len()
304 }
305
306 pub fn shell_ids(&self) -> Vec<String> {
308 self.terminals.iter().map(|r| r.key().clone()).collect()
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Every exit status maps to its expected string label.
    #[test]
    fn test_terminal_exit_status() {
        let expected = [
            (TerminalExitStatus::Exited(0), "exited"),
            (TerminalExitStatus::Killed, "killed"),
            (TerminalExitStatus::TimedOut, "timedOut"),
            (TerminalExitStatus::Aborted, "aborted"),
        ];

        for (status, label) in expected.iter() {
            assert_eq!(status.as_str(), *label);
        }
    }

    /// A freshly created manager holds no terminals.
    #[test]
    fn test_background_process_manager_new() {
        let terminal_count = BackgroundProcessManager::new().count();
        assert_eq!(terminal_count, 0);
    }

    /// Looking up an id that was never registered reports absence.
    #[test]
    fn test_background_process_manager_has_terminal() {
        let manager = BackgroundProcessManager::new();
        let found = manager.has_terminal("test-id");
        assert!(!found);
    }

    /// A `Finished` terminal reports its stored status and output.
    #[tokio::test]
    async fn test_background_terminal_finished() {
        let status = TerminalExitStatus::Exited(0);
        let final_output = "test output".to_string();
        let terminal = BackgroundTerminal::Finished {
            status,
            final_output,
        };

        assert!(!terminal.is_running());
        assert_eq!(terminal.status_str(), "exited");

        let all_output = terminal.get_all_output().await;
        assert_eq!(all_output, "test output");
    }
}