bamboo_agent/process/registry.rs
1//! Process management for external agent runs and Claude sessions
2//!
3//! This module provides process lifecycle management:
4//! - Registration and tracking of running processes
5//! - Graceful and forceful termination
6//! - Live output capture
7//! - Cross-platform process killing
8//!
9//! # Overview
10//!
11//! The process registry maintains a centralized record of all running agent processes
12//! and Claude sessions. It provides thread-safe access to process information and
13//! handles cross-platform process termination.
14//!
15//! # Architecture
16//!
17//! ```text
18//! ┌─────────────────────────────────────────┐
19//! │ ProcessRegistry (Central) │
20//! │ ┌───────────────────────────────────┐ │
21//! │ │ HashMap<run_id, ProcessHandle> │ │
22//! │ │ - Process metadata │ │
23//! │ │ - Child process handle │ │
24//! │ │ - Live output buffer │ │
25//! │ └───────────────────────────────────┘ │
26//! └─────────────────────────────────────────┘
27//! ▲ ▲
28//! │ │
29//! ┌──────┴─────┐ ┌──────┴─────┐
30//! │ Agent Run │ │ Claude │
31//! │ Process │ │ Session │
32//! └────────────┘ └────────────┘
33//! ```
34//!
35//! # Usage Example
36//!
37//! ```rust,no_run
38//! use bamboo_agent::process::registry::{ProcessRegistry, ProcessRegistrationConfig};
39//! use std::sync::{Arc, Mutex};
40//! use tokio::process::Command;
41//!
42//! #[tokio::main]
43//! async fn main() -> Result<(), String> {
44//! // Create registry
45//! let registry = ProcessRegistry::new();
46//!
47//! // Spawn a process
48//! let mut child = Command::new("my-agent")
49//! .arg("--task")
50//! .arg("analyze")
51//! .spawn()
52//! .map_err(|e| e.to_string())?;
53//!
54//! let pid = child.id().unwrap_or(0);
55//!
56//! // Register the process
57//! let config = ProcessRegistrationConfig {
58//! run_id: 1000001,
59//! agent_id: 1,
60//! agent_name: "CodeAnalyzer".to_string(),
61//! pid,
62//! project_path: "/project".to_string(),
63//! task: "analyze code".to_string(),
64//! model: "claude-3-5-sonnet".to_string(),
65//! };
66//!
67//! registry.register_process(config, child).await?;
68//!
69//! // Later, kill the process
70//! registry.kill_process(1000001).await?;
71//!
72//! Ok(())
73//! }
74//! ```
75
76use chrono::{DateTime, Utc};
77use serde::{Deserialize, Serialize};
78use std::collections::HashMap;
79use std::sync::{Arc, Mutex};
80use tokio::process::Child;
81use tokio::sync::Mutex as AsyncMutex;
82
83/// Type of process being tracked in the registry
84///
85/// This enum distinguishes between different types of processes that
86/// can be managed by the registry, each with their own metadata.
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub enum ProcessType {
89 /// An agent run process spawned by the server
90 AgentRun {
91 /// Unique identifier for the agent
92 agent_id: i64,
93 /// Human-readable name of the agent
94 agent_name: String,
95 },
96
97 /// A Claude interactive session process
98 ClaudeSession {
99 /// Session identifier for the Claude conversation
100 session_id: String,
101 },
102}
103
104/// Metadata about a registered process
105///
106/// Contains all the information needed to track, monitor, and manage
107/// a running process throughout its lifecycle.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct ProcessInfo {
110 /// Unique run identifier for this process execution
111 pub run_id: i64,
112
113 /// Type of process (agent run or Claude session)
114 pub process_type: ProcessType,
115
116 /// Operating system process ID
117 pub pid: u32,
118
119 /// Timestamp when the process was started
120 pub started_at: DateTime<Utc>,
121
122 /// Project directory where the process is running
123 pub project_path: String,
124
125 /// Task description or prompt being executed
126 pub task: String,
127
128 /// Model identifier being used (e.g., "claude-3-5-sonnet")
129 pub model: String,
130}
131
/// Parameters required to register an agent run.
///
/// Consumed by [`ProcessRegistry::register_process`] and
/// [`ProcessRegistry::register_sidecar_process`].
#[derive(Debug, Clone)]
pub struct ProcessRegistrationConfig {
    /// Run identifier under which the process is stored.
    pub run_id: i64,

    /// Numeric identifier of the agent.
    pub agent_id: i64,

    /// Display name of the agent.
    pub agent_name: String,

    /// PID reported by the operating system.
    pub pid: u32,

    /// Project directory associated with the run.
    pub project_path: String,

    /// Description of the task being executed.
    pub task: String,

    /// Identifier of the model in use.
    pub model: String,
}
159
160/// Internal handle to a registered process
161///
162/// Combines process metadata with runtime resources needed to
163/// manage the process lifecycle and capture output.
164#[allow(dead_code)]
165pub struct ProcessHandle {
166 /// Process metadata and configuration
167 pub info: ProcessInfo,
168
169 /// Handle to the child process (if available)
170 ///
171 /// This is wrapped in Arc<Mutex> to allow shared ownership
172 /// and thread-safe access for process management operations.
173 pub child: Arc<Mutex<Option<Child>>>,
174
175 /// Buffer for capturing live process output
176 ///
177 /// Stores stdout/stderr output as it's generated, allowing
178 /// clients to retrieve recent output at any time.
179 pub live_output: Arc<Mutex<String>>,
180}
181
182/// Central registry for managing running agent processes
183///
184/// The ProcessRegistry maintains a thread-safe map of all running processes,
185/// providing lifecycle management, monitoring, and termination capabilities.
186///
187/// # Thread Safety
188///
189/// The registry uses async-aware locking (`tokio::sync::Mutex`) for the
190/// process map to avoid blocking async tasks. Process handles use
191/// standard `std::sync::Mutex` for synchronous operations.
192///
193/// # Process Lifecycle
194///
195/// ```text
196/// 1. Register → Process added to registry with metadata
197/// 2. Running → Process executes, output is captured
198/// 3. Kill → Graceful shutdown attempted, then force kill
199/// 4. Cleanup → Process removed from registry
200/// ```
201pub struct ProcessRegistry {
202 /// Map of run IDs to process handles
203 processes: Arc<AsyncMutex<HashMap<i64, ProcessHandle>>>,
204
205 /// Counter for generating unique run IDs
206 ///
207 /// Starts at 1,000,000 to distinguish from lower numbered IDs
208 /// that might be used elsewhere in the system.
209 next_id: Arc<Mutex<i64>>,
210}
211
212impl ProcessRegistry {
213 /// Create a new empty process registry
214 ///
215 /// Initializes the registry with an empty process map and
216 /// sets the ID counter to start at 1,000,000.
217 ///
218 /// # Example
219 ///
220 /// ```ignore
221 /// use bamboo_agent::process::registry::ProcessRegistry;
222 ///
223 /// let registry = ProcessRegistry::new();
224 /// ```
225 pub fn new() -> Self {
226 Self {
227 processes: Arc::new(AsyncMutex::new(HashMap::new())),
228 next_id: Arc::new(Mutex::new(1000000)),
229 }
230 }
231
232 /// Generate a unique run ID
233 ///
234 /// Returns the next available run ID and increments the counter.
235 /// IDs start at 1,000,000 and increase sequentially.
236 ///
237 /// # Errors
238 ///
239 /// Returns an error if the ID counter mutex is poisoned.
240 ///
241 /// # Example
242 ///
243 /// ```ignore
244 /// use bamboo_agent::process::registry::ProcessRegistry;
245 ///
246 /// let registry = ProcessRegistry::new();
247 /// let id = registry.generate_id().unwrap();
248 /// assert!(id >= 1000000);
249 /// ```
250 pub fn generate_id(&self) -> Result<i64, String> {
251 let mut next_id = self.next_id.lock().map_err(|e| e.to_string())?;
252 let id = *next_id;
253 *next_id += 1;
254 Ok(id)
255 }
256
257 /// Register a new agent run process
258 ///
259 /// Adds a newly spawned agent process to the registry with its
260 /// configuration and process handle.
261 ///
262 /// # Arguments
263 ///
264 /// * `config` - Registration configuration with agent metadata
265 /// * `child` - The spawned child process handle
266 ///
267 /// # Returns
268 ///
269 /// Returns `Ok(())` if registration succeeds.
270 ///
271 /// # Errors
272 ///
273 /// Returns an error if the process map lock fails.
274 ///
275 /// # Example
276 ///
277 /// ```rust,no_run
278 /// use bamboo_agent::process::registry::{ProcessRegistry, ProcessRegistrationConfig};
279 /// use tokio::process::Command;
280 ///
281 /// #[tokio::main]
282 /// async fn main() -> Result<(), String> {
283 /// let registry = ProcessRegistry::new();
284 ///
285 /// let mut child = Command::new("agent-binary")
286 /// .spawn()
287 /// .map_err(|e| e.to_string())?;
288 ///
289 /// let config = ProcessRegistrationConfig {
290 /// run_id: 1000001,
291 /// agent_id: 1,
292 /// agent_name: "MyAgent".to_string(),
293 /// pid: child.id().unwrap_or(0),
294 /// project_path: "/project".to_string(),
295 /// task: "Analyze code".to_string(),
296 /// model: "claude-3-5-sonnet".to_string(),
297 /// };
298 ///
299 /// registry.register_process(config, child).await
300 /// }
301 /// ```
302 pub async fn register_process(
303 &self,
304 config: ProcessRegistrationConfig,
305 child: Child,
306 ) -> Result<(), String> {
307 let ProcessRegistrationConfig {
308 run_id,
309 agent_id,
310 agent_name,
311 pid,
312 project_path,
313 task,
314 model,
315 } = config;
316
317 let process_info = ProcessInfo {
318 run_id,
319 process_type: ProcessType::AgentRun {
320 agent_id,
321 agent_name,
322 },
323 pid,
324 started_at: Utc::now(),
325 project_path,
326 task,
327 model,
328 };
329
330 self.register_process_internal(run_id, process_info, child)
331 .await
332 }
333
334 /// Register a sidecar process without a direct child handle
335 ///
336 /// Used for processes that are managed externally (e.g., by Tauri)
337 /// but still need to be tracked in the registry for monitoring.
338 ///
339 /// # Arguments
340 ///
341 /// * `config` - Registration configuration with agent metadata
342 ///
343 /// # Returns
344 ///
345 /// Returns `Ok(())` if registration succeeds.
346 ///
347 /// # Errors
348 ///
349 /// Returns an error if the process map lock fails.
350 pub async fn register_sidecar_process(
351 &self,
352 config: ProcessRegistrationConfig,
353 ) -> Result<(), String> {
354 let ProcessRegistrationConfig {
355 run_id,
356 agent_id,
357 agent_name,
358 pid,
359 project_path,
360 task,
361 model,
362 } = config;
363
364 let process_info = ProcessInfo {
365 run_id,
366 process_type: ProcessType::AgentRun {
367 agent_id,
368 agent_name,
369 },
370 pid,
371 started_at: Utc::now(),
372 project_path,
373 task,
374 model,
375 };
376
377 let mut processes = self.processes.lock().await;
378
379 let process_handle = ProcessHandle {
380 info: process_info,
381 child: Arc::new(Mutex::new(None)),
382 live_output: Arc::new(Mutex::new(String::new())),
383 };
384
385 processes.insert(run_id, process_handle);
386 Ok(())
387 }
388
389 /// Register a Claude interactive session process
390 ///
391 /// Adds a Claude session to the registry, generating a unique run ID
392 /// if one isn't provided.
393 ///
394 /// # Arguments
395 ///
396 /// * `session_id` - Claude session identifier
397 /// * `pid` - Operating system process ID
398 /// * `project_path` - Project directory path
399 /// * `task` - Task description
400 /// * `model` - Model identifier
401 /// * `child` - Optional child process handle wrapped in Arc<Mutex>
402 ///
403 /// # Returns
404 ///
405 /// Returns the generated or provided run ID on success.
406 ///
407 /// # Errors
408 ///
409 /// Returns an error if ID generation or process map lock fails.
410 pub async fn register_claude_session(
411 &self,
412 session_id: String,
413 pid: u32,
414 project_path: String,
415 task: String,
416 model: String,
417 child: Arc<Mutex<Option<Child>>>,
418 ) -> Result<i64, String> {
419 let run_id = self.generate_id()?;
420
421 let process_info = ProcessInfo {
422 run_id,
423 process_type: ProcessType::ClaudeSession { session_id },
424 pid,
425 started_at: Utc::now(),
426 project_path,
427 task,
428 model,
429 };
430
431 let mut processes = self.processes.lock().await;
432
433 let process_handle = ProcessHandle {
434 info: process_info,
435 child,
436 live_output: Arc::new(Mutex::new(String::new())),
437 };
438
439 processes.insert(run_id, process_handle);
440 Ok(run_id)
441 }
442
443 /// Internal helper to register a process in the map
444 async fn register_process_internal(
445 &self,
446 run_id: i64,
447 process_info: ProcessInfo,
448 child: Child,
449 ) -> Result<(), String> {
450 let mut processes = self.processes.lock().await;
451
452 let process_handle = ProcessHandle {
453 info: process_info,
454 child: Arc::new(Mutex::new(Some(child))),
455 live_output: Arc::new(Mutex::new(String::new())),
456 };
457
458 processes.insert(run_id, process_handle);
459 Ok(())
460 }
461
462 /// Get all running Claude session processes
463 ///
464 /// Returns a list of all registered Claude sessions that are
465 /// currently tracked in the registry.
466 ///
467 /// # Returns
468 ///
469 /// Vector of `ProcessInfo` for all Claude sessions.
470 ///
471 /// # Errors
472 ///
473 /// Returns an error if the process map lock fails.
474 pub async fn get_running_claude_sessions(&self) -> Result<Vec<ProcessInfo>, String> {
475 let processes = self.processes.lock().await;
476 Ok(processes
477 .values()
478 .filter_map(|handle| match &handle.info.process_type {
479 ProcessType::ClaudeSession { .. } => Some(handle.info.clone()),
480 _ => None,
481 })
482 .collect())
483 }
484
485 /// Find a Claude session by its session ID
486 ///
487 /// Searches the registry for a Claude session matching the
488 /// provided session identifier.
489 ///
490 /// # Arguments
491 ///
492 /// * `session_id` - The Claude session ID to search for
493 ///
494 /// # Returns
495 ///
496 /// `Some(ProcessInfo)` if found, `None` otherwise.
497 ///
498 /// # Errors
499 ///
500 /// Returns an error if the process map lock fails.
501 pub async fn get_claude_session_by_id(
502 &self,
503 session_id: &str,
504 ) -> Result<Option<ProcessInfo>, String> {
505 let processes = self.processes.lock().await;
506 Ok(processes
507 .values()
508 .find(|handle| match &handle.info.process_type {
509 ProcessType::ClaudeSession { session_id: sid } => sid == session_id,
510 _ => false,
511 })
512 .map(|handle| handle.info.clone()))
513 }
514
515 /// Remove a process from the registry
516 ///
517 /// Unregisters a process by its run ID. This does NOT kill the process;
518 /// it only removes it from tracking.
519 ///
520 /// # Arguments
521 ///
522 /// * `run_id` - The run ID of the process to remove
523 ///
524 /// # Returns
525 ///
526 /// Returns `Ok(())` whether or not the process existed.
527 ///
528 /// # Errors
529 ///
530 /// Returns an error if the process map lock fails.
531 pub async fn unregister_process(&self, run_id: i64) -> Result<(), String> {
532 let mut processes = self.processes.lock().await;
533 processes.remove(&run_id);
534 Ok(())
535 }
536
537 /// Synchronous version for use in non-async contexts
538 ///
539 /// Uses `try_lock` to avoid blocking. If the lock is held,
540 /// the operation is skipped (process will be cleaned up later).
541 #[allow(dead_code)]
542 fn unregister_process_sync(&self, run_id: i64) -> Result<(), String> {
543 // Use try_lock to avoid blocking in sync context
544 // If we can't get the lock, that's okay - the process will be cleaned up later
545 if let Ok(mut processes) = self.processes.try_lock() {
546 processes.remove(&run_id);
547 }
548 Ok(())
549 }
550
551 /// Get all registered processes (agent runs and Claude sessions)
552 ///
553 /// Returns a list of all processes currently in the registry.
554 ///
555 /// # Returns
556 ///
557 /// Vector of `ProcessInfo` for all registered processes.
558 ///
559 /// # Errors
560 ///
561 /// Returns an error if the process map lock fails.
562 #[allow(dead_code)]
563 pub async fn get_running_processes(&self) -> Result<Vec<ProcessInfo>, String> {
564 let processes = self.processes.lock().await;
565 Ok(processes
566 .values()
567 .map(|handle| handle.info.clone())
568 .collect())
569 }
570
571 /// Get all running agent run processes
572 ///
573 /// Returns a list of all agent run processes (excluding Claude sessions)
574 /// currently tracked in the registry.
575 ///
576 /// # Returns
577 ///
578 /// Vector of `ProcessInfo` for agent run processes.
579 ///
580 /// # Errors
581 ///
582 /// Returns an error if the process map lock fails.
583 pub async fn get_running_agent_processes(&self) -> Result<Vec<ProcessInfo>, String> {
584 let processes = self.processes.lock().await;
585 Ok(processes
586 .values()
587 .filter_map(|handle| match &handle.info.process_type {
588 ProcessType::AgentRun { .. } => Some(handle.info.clone()),
589 _ => None,
590 })
591 .collect())
592 }
593
594 /// Get process information by run ID
595 ///
596 /// Retrieves metadata for a specific process.
597 ///
598 /// # Arguments
599 ///
600 /// * `run_id` - The run ID to look up
601 ///
602 /// # Returns
603 ///
604 /// `Some(ProcessInfo)` if found, `None` otherwise.
605 ///
606 /// # Errors
607 ///
608 /// Returns an error if the process map lock fails.
609 #[allow(dead_code)]
610 pub async fn get_process(&self, run_id: i64) -> Result<Option<ProcessInfo>, String> {
611 let processes = self.processes.lock().await;
612 Ok(processes.get(&run_id).map(|handle| handle.info.clone()))
613 }
614
615 /// Kill a process by run ID
616 ///
617 /// Attempts graceful shutdown first using the child process handle,
618 /// then falls back to system-level process termination if needed.
619 ///
620 /// # Process
621 ///
622 /// 1. Send kill signal via child handle
623 /// 2. Wait up to 5 seconds for graceful exit
624 /// 3. If timeout, use system kill command (kill -KILL or taskkill)
625 /// 4. Remove process from registry
626 ///
627 /// # Arguments
628 ///
629 /// * `run_id` - The run ID of the process to kill
630 ///
631 /// # Returns
632 ///
633 /// `Ok(true)` if the process was killed successfully,
634 /// `Ok(false)` if the process wasn't found.
635 ///
636 /// # Errors
637 ///
638 /// Returns an error if process termination fails critically.
639 ///
640 /// # Cross-Platform
641 ///
642 /// - Unix: Uses SIGTERM first, then SIGKILL if needed
643 /// - Windows: Uses taskkill /F
644 pub async fn kill_process(&self, run_id: i64) -> Result<bool, String> {
645 use tracing::{error, info, warn};
646
647 let (pid, child_arc) = {
648 let processes = self.processes.lock().await;
649 if let Some(handle) = processes.get(&run_id) {
650 (handle.info.pid, handle.child.clone())
651 } else {
652 warn!("Process {} not found in registry", run_id);
653 return Ok(false);
654 }
655 };
656
657 info!(
658 "Attempting graceful shutdown of process {} (PID: {})",
659 run_id, pid
660 );
661
662 let kill_sent = {
663 let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
664 if let Some(child) = child_guard.as_mut() {
665 match child.start_kill() {
666 Ok(_) => {
667 info!("Successfully sent kill signal to process {}", run_id);
668 true
669 }
670 Err(e) => {
671 error!("Failed to send kill signal to process {}: {}", run_id, e);
672 false
673 }
674 }
675 } else {
676 warn!(
677 "No child handle available for process {} (PID: {}), attempting system kill",
678 run_id, pid
679 );
680 false
681 }
682 };
683
684 if !kill_sent {
685 info!(
686 "Attempting fallback kill for process {} (PID: {})",
687 run_id, pid
688 );
689 match self.kill_process_by_pid(run_id, pid).await {
690 Ok(true) => return Ok(true),
691 Ok(false) => warn!(
692 "Fallback kill also failed for process {} (PID: {})",
693 run_id, pid
694 ),
695 Err(e) => error!("Error during fallback kill: {}", e),
696 }
697 }
698
699 let wait_result = tokio::time::timeout(tokio::time::Duration::from_secs(5), async {
700 loop {
701 let status = {
702 let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
703 if let Some(child) = child_guard.as_mut() {
704 match child.try_wait() {
705 Ok(Some(status)) => {
706 info!("Process {} exited with status: {:?}", run_id, status);
707 *child_guard = None;
708 Some(Ok::<(), String>(()))
709 }
710 Ok(None) => None,
711 Err(e) => {
712 error!("Error checking process status: {}", e);
713 Some(Err(e.to_string()))
714 }
715 }
716 } else {
717 Some(Ok(()))
718 }
719 };
720
721 match status {
722 Some(result) => return result,
723 None => {
724 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
725 }
726 }
727 }
728 })
729 .await;
730
731 match wait_result {
732 Ok(Ok(_)) => {
733 info!("Process {} exited gracefully", run_id);
734 }
735 Ok(Err(e)) => {
736 error!("Error waiting for process {}: {}", run_id, e);
737 }
738 Err(_) => {
739 warn!("Process {} didn't exit within 5 seconds after kill", run_id);
740 if let Ok(mut child_guard) = child_arc.lock() {
741 *child_guard = None;
742 }
743 let _ = self.kill_process_by_pid(run_id, pid).await;
744 }
745 }
746
747 self.unregister_process(run_id).await?;
748
749 Ok(true)
750 }
751
752 /// Kill a process by its operating system PID
753 ///
754 /// Uses system commands to terminate a process when the child
755 /// handle is not available or has already been dropped.
756 ///
757 /// # Arguments
758 ///
759 /// * `run_id` - Run ID for registry cleanup
760 /// * `pid` - Operating system process ID
761 ///
762 /// # Returns
763 ///
764 /// `Ok(true)` if kill succeeded, `Ok(false)` if it failed.
765 ///
766 /// # Errors
767 ///
768 /// Returns an error if the kill command cannot be executed.
769 ///
770 /// # Cross-Platform Behavior
771 ///
772 /// - Unix: Tries SIGTERM first, waits 2 seconds, then SIGKILL
773 /// - Windows: Uses taskkill /F immediately
774 pub async fn kill_process_by_pid(&self, run_id: i64, pid: u32) -> Result<bool, String> {
775 use tracing::{error, info, warn};
776
777 info!("Attempting to kill process {} by PID {}", run_id, pid);
778
779 let kill_result = if cfg!(target_os = "windows") {
780 let pid_str = pid.to_string();
781 crate::core::process_utils::trace_windows_command(
782 "process_registry.kill_process_by_pid",
783 "taskkill",
784 ["/F", "/PID", pid_str.as_str()],
785 );
786 let mut command = std::process::Command::new("taskkill");
787 crate::core::process_utils::hide_window_for_std_command(&mut command);
788 command.args(["/F", "/PID", &pid_str]).output()
789 } else {
790 let term_result = std::process::Command::new("kill")
791 .args(["-TERM", &pid.to_string()])
792 .output();
793
794 match &term_result {
795 Ok(output) if output.status.success() => {
796 info!("Sent SIGTERM to PID {}", pid);
797 tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
798
799 let check_result = std::process::Command::new("kill")
800 .args(["-0", &pid.to_string()])
801 .output();
802
803 if let Ok(output) = check_result {
804 if output.status.success() {
805 warn!(
806 "Process {} still running after SIGTERM, sending SIGKILL",
807 pid
808 );
809 std::process::Command::new("kill")
810 .args(["-KILL", &pid.to_string()])
811 .output()
812 } else {
813 term_result
814 }
815 } else {
816 term_result
817 }
818 }
819 _ => {
820 warn!("SIGTERM failed for PID {}, trying SIGKILL", pid);
821 std::process::Command::new("kill")
822 .args(["-KILL", &pid.to_string()])
823 .output()
824 }
825 }
826 };
827
828 match kill_result {
829 Ok(output) => {
830 if output.status.success() {
831 info!("Successfully killed process with PID {}", pid);
832 self.unregister_process(run_id).await?;
833 Ok(true)
834 } else {
835 let error_msg = String::from_utf8_lossy(&output.stderr);
836 warn!("Failed to kill PID {}: {}", pid, error_msg);
837 Ok(false)
838 }
839 }
840 Err(e) => {
841 error!("Failed to execute kill command for PID {}: {}", pid, e);
842 Err(format!("Failed to execute kill command: {}", e))
843 }
844 }
845 }
846
847 /// Check if a process is still running
848 ///
849 /// Uses `try_wait()` to check if the process has exited without blocking.
850 ///
851 /// # Arguments
852 ///
853 /// * `run_id` - The run ID to check
854 ///
855 /// # Returns
856 ///
857 /// `Ok(true)` if the process is still running,
858 /// `Ok(false)` if it has exited or doesn't exist.
859 ///
860 /// # Errors
861 ///
862 /// Returns an error if lock acquisition fails.
863 #[allow(dead_code)]
864 pub async fn is_process_running(&self, run_id: i64) -> Result<bool, String> {
865 let processes = self.processes.lock().await;
866
867 if let Some(handle) = processes.get(&run_id) {
868 let child_arc = handle.child.clone();
869 drop(processes);
870
871 let mut child_guard = child_arc.lock().map_err(|e| e.to_string())?;
872 if let Some(ref mut child) = child_guard.as_mut() {
873 match child.try_wait() {
874 Ok(Some(_)) => {
875 *child_guard = None;
876 Ok(false)
877 }
878 Ok(None) => Ok(true),
879 Err(_) => {
880 *child_guard = None;
881 Ok(false)
882 }
883 }
884 } else {
885 Ok(false)
886 }
887 } else {
888 Ok(false)
889 }
890 }
891
892 /// Append output to a process's live output buffer
893 ///
894 /// Adds a line of output to the process's output buffer for later retrieval.
895 ///
896 /// # Arguments
897 ///
898 /// * `run_id` - The run ID of the process
899 /// * `output` - The output text to append
900 ///
901 /// # Returns
902 ///
903 /// Returns `Ok(())` whether or not the process exists.
904 ///
905 /// # Errors
906 ///
907 /// Returns an error if lock acquisition fails.
908 pub async fn append_live_output(&self, run_id: i64, output: &str) -> Result<(), String> {
909 let processes = self.processes.lock().await;
910 if let Some(handle) = processes.get(&run_id) {
911 let mut live_output = handle.live_output.lock().map_err(|e| e.to_string())?;
912 live_output.push_str(output);
913 live_output.push('\n');
914 }
915 Ok(())
916 }
917
918 /// Retrieve the live output buffer for a process
919 ///
920 /// Gets all captured output for the specified process.
921 ///
922 /// # Arguments
923 ///
924 /// * `run_id` - The run ID of the process
925 ///
926 /// # Returns
927 ///
928 /// The accumulated output string, or empty string if process not found.
929 ///
930 /// # Errors
931 ///
932 /// Returns an error if lock acquisition fails.
933 pub async fn get_live_output(&self, run_id: i64) -> Result<String, String> {
934 let processes = self.processes.lock().await;
935 if let Some(handle) = processes.get(&run_id) {
936 let live_output = handle.live_output.lock().map_err(|e| e.to_string())?;
937 Ok(live_output.clone())
938 } else {
939 Ok(String::new())
940 }
941 }
942
943 /// Remove all finished processes from the registry
944 ///
945 /// Checks each registered process and removes those that have exited.
946 /// Useful for periodic cleanup.
947 ///
948 /// # Returns
949 ///
950 /// Vector of run IDs that were removed.
951 ///
952 /// # Errors
953 ///
954 /// Returns an error if process checks or lock acquisition fails.
955 #[allow(dead_code)]
956 pub async fn cleanup_finished_processes(&self) -> Result<Vec<i64>, String> {
957 let mut finished_runs = Vec::new();
958
959 {
960 let processes = self.processes.lock().await;
961 let run_ids: Vec<i64> = processes.keys().cloned().collect();
962 drop(processes);
963
964 for run_id in run_ids {
965 if !self.is_process_running(run_id).await? {
966 finished_runs.push(run_id);
967 }
968 }
969 }
970
971 {
972 let mut processes = self.processes.lock().await;
973 for run_id in &finished_runs {
974 processes.remove(run_id);
975 }
976 }
977
978 Ok(finished_runs)
979 }
980}
981
982impl Default for ProcessRegistry {
983 fn default() -> Self {
984 Self::new()
985 }
986}
987
#[cfg(test)]
mod tests {
    use super::*;

    /// Appended lines come back in order, each terminated by a newline.
    #[tokio::test]
    async fn test_append_and_get_live_output() {
        let registry = ProcessRegistry::new();

        // A session without a child handle is enough to exercise the buffer.
        let no_child = Arc::new(Mutex::new(None));
        let run_id = registry
            .register_claude_session(
                "session-1".to_string(),
                1234,
                "/tmp/project".to_string(),
                "task".to_string(),
                "model".to_string(),
                no_child,
            )
            .await
            .unwrap();

        for line in ["line1", "line2"] {
            registry.append_live_output(run_id, line).await.unwrap();
        }

        let captured = registry.get_live_output(run_id).await.unwrap();
        assert_eq!(captured, "line1\nline2\n");
    }
}
1013}