Skip to main content

missiond_core/core/
process_manager.rs

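//! Process Manager - Claude Code agent process lifecycle management
//!
//! Manages the lifecycle of Claude Code agents, one per slot: tracks each
//! agent's state, runs tasks by invoking the `claude` CLI in print mode
//! (`claude -p`), and broadcasts lifecycle events to subscribers.
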
5use crate::db::MissionDB;
6use crate::types::{Slot, Task};
7use anyhow::{anyhow, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10use std::path::PathBuf;
11use std::process::Stdio;
12use std::sync::{Arc, RwLock};
13use tokio::fs::OpenOptions;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::broadcast;
17use tracing::{debug, error, info, warn};
18
19/// Agent status
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "lowercase")]
22pub enum AgentStatus {
23    Stopped,
24    Starting,
25    Idle,
26    Busy,
27    Stopping,
28}
29
30impl AgentStatus {
31    pub fn as_str(&self) -> &'static str {
32        match self {
33            AgentStatus::Stopped => "stopped",
34            AgentStatus::Starting => "starting",
35            AgentStatus::Idle => "idle",
36            AgentStatus::Busy => "busy",
37            AgentStatus::Stopping => "stopping",
38        }
39    }
40}
41
42/// Agent process information
43#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(rename_all = "camelCase")]
45pub struct AgentProcess {
46    pub slot_id: String,
47    pub role: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub pid: Option<u32>,
50    pub status: AgentStatus,
51    #[serde(skip_serializing_if = "Option::is_none")]
52    pub session_id: Option<String>,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub started_at: Option<i64>,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pub current_task_id: Option<String>,
57    pub log_file: PathBuf,
58}
59
/// Options controlling how `ProcessManager::spawn` starts an agent.
///
/// The default is headless mode with no auto-restart registration.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SpawnOptions {
    /// Open a visible terminal window (macOS Terminal via `osascript`) instead of
    /// running headless.
    pub visible: bool,
    /// Register the slot for automatic restart.
    ///
    /// NOTE(review): the manager only records this flag (and `kill` clears it);
    /// no code that restarts crashed agents is visible alongside it.
    pub auto_restart: bool,
}
68
/// Outcome of a successful `ProcessManager::execute_task` call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecuteResult {
    /// Text of the last `result` event in Claude's `stream-json` output
    /// (empty if none was seen).
    pub result: String,
    /// Session id reported by Claude, falling back to the session that was
    /// resumed (empty if neither is known).
    pub session_id: String,
}
75
/// Lifecycle event broadcast by the `ProcessManager`.
///
/// Every variant carries the id of the slot it concerns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcessEvent {
    /// The agent finished spawning and is ready for tasks.
    AgentSpawned(String),
    /// The agent was stopped via `kill`.
    AgentKilled(String),
    /// The agent started executing a task.
    AgentBusy(String),
    /// The agent finished a task and is ready again.
    AgentIdle(String),
}
84
/// Process Manager
///
/// Manages the lifecycle of Claude Code agents, one per slot: their state, the
/// `claude` child processes they run, their session ids and per-slot log files.
pub struct ProcessManager {
    /// Per-slot agent state, keyed by slot id.
    processes: Arc<RwLock<HashMap<String, AgentProcess>>>,
    /// `claude` child processes tracked per slot id; `kill` removes and
    /// terminates the entry for its slot.
    child_processes: Arc<RwLock<HashMap<String, Child>>>,
    /// Database used to load and persist each slot's Claude session id.
    db: Arc<MissionDB>,
    /// Directory holding one `<slot_id>.log` file per slot.
    logs_dir: PathBuf,
    /// Slot ids spawned with `auto_restart` set; `kill` removes them.
    /// NOTE(review): nothing in this file reads this set to restart agents.
    auto_restart_slots: Arc<RwLock<HashSet<String>>>,
    /// Sender for lifecycle events; receivers are created via `subscribe`.
    event_tx: broadcast::Sender<ProcessEvent>,
}
96
97impl ProcessManager {
98    /// Create a new ProcessManager
99    pub fn new(db: Arc<MissionDB>, logs_dir: PathBuf) -> Self {
100        // Ensure logs directory exists
101        std::fs::create_dir_all(&logs_dir).ok();
102
103        let (event_tx, _) = broadcast::channel(100);
104
105        Self {
106            processes: Arc::new(RwLock::new(HashMap::new())),
107            child_processes: Arc::new(RwLock::new(HashMap::new())),
108            db,
109            logs_dir,
110            auto_restart_slots: Arc::new(RwLock::new(HashSet::new())),
111            event_tx,
112        }
113    }
114
115    /// Get event receiver
116    pub fn subscribe(&self) -> broadcast::Receiver<ProcessEvent> {
117        self.event_tx.subscribe()
118    }
119
120    /// Initialize a slot's process state
121    pub fn init_slot(&self, slot: &Slot) {
122        let log_file = self.logs_dir.join(format!("{}.log", slot.config.id));
123        let session_id = self.db.get_slot_session(&slot.config.id).ok().flatten();
124
125        let agent = AgentProcess {
126            slot_id: slot.config.id.clone(),
127            role: slot.config.role.clone(),
128            pid: None,
129            status: AgentStatus::Stopped,
130            session_id,
131            started_at: None,
132            current_task_id: None,
133            log_file,
134        };
135
136        let mut processes = self.processes.write().unwrap();
137        processes.insert(slot.config.id.clone(), agent);
138
139        debug!(slot_id = %slot.config.id, role = %slot.config.role, "Slot initialized");
140    }
141
142    /// Spawn an agent process
143    pub async fn spawn(&self, slot: &Slot, options: SpawnOptions) -> Result<AgentProcess> {
144        let slot_id = &slot.config.id;
145
146        // Get current agent state
147        {
148            let processes = self.processes.read().unwrap();
149            let agent = processes
150                .get(slot_id)
151                .ok_or_else(|| anyhow!("Slot not initialized: {}", slot_id))?;
152
153            if agent.status != AgentStatus::Stopped {
154                return Err(anyhow!(
155                    "Agent already running: {} (status: {:?})",
156                    slot_id,
157                    agent.status
158                ));
159            }
160        }
161
162        // Update to starting
163        {
164            let mut processes = self.processes.write().unwrap();
165            if let Some(agent) = processes.get_mut(slot_id) {
166                agent.status = AgentStatus::Starting;
167            }
168        }
169
170        if options.auto_restart {
171            let mut auto_restart = self.auto_restart_slots.write().unwrap();
172            auto_restart.insert(slot_id.clone());
173        }
174
175        // Spawn based on mode
176        if options.visible {
177            self.spawn_visible(slot).await?;
178        } else {
179            self.spawn_headless(slot).await?;
180        }
181
182        // Update to idle
183        let agent = {
184            let mut processes = self.processes.write().unwrap();
185            if let Some(agent) = processes.get_mut(slot_id) {
186                agent.status = AgentStatus::Idle;
187                agent.started_at = Some(chrono::Utc::now().timestamp_millis());
188                info!(slot_id = %slot_id, pid = ?agent.pid, "Agent spawned");
189                agent.clone()
190            } else {
191                return Err(anyhow!("Agent not found after spawn: {}", slot_id));
192            }
193        };
194
195        let _ = self.event_tx.send(ProcessEvent::AgentSpawned(slot_id.clone()));
196
197        Ok(agent)
198    }
199
200    /// Spawn in headless mode
201    async fn spawn_headless(&self, slot: &Slot) -> Result<()> {
202        // In headless mode, we don't start a persistent process.
203        // Instead, we run `claude -p` on demand when executing tasks.
204        debug!(slot_id = %slot.config.id, "Headless mode: ready for tasks");
205        Ok(())
206    }
207
208    /// Spawn in visible mode (open terminal window)
209    async fn spawn_visible(&self, slot: &Slot) -> Result<()> {
210        let default_cwd = std::env::current_dir()
211            .map(|p| p.to_string_lossy().to_string())
212            .unwrap_or_else(|_| ".".to_string());
213        let cwd = slot.config.cwd.as_deref().unwrap_or(&default_cwd);
214
215        // macOS: Use osascript to open Terminal
216        let script = format!(
217            r#"
218            tell application "Terminal"
219                do script "cd {} && echo 'Agent {} ready ({})' && read -p 'Press Enter to exit...'"
220                activate
221            end tell
222        "#,
223            cwd, slot.config.id, slot.config.role
224        );
225
226        let mut child = Command::new("osascript")
227            .arg("-e")
228            .arg(&script)
229            .spawn()?;
230
231        // Detach the process
232        child.wait().await?;
233
234        info!(slot_id = %slot.config.id, "Visible terminal opened");
235        Ok(())
236    }
237
238    /// Execute a task
239    pub async fn execute_task(&self, slot: &Slot, task: &Task) -> Result<ExecuteResult> {
240        let slot_id = &slot.config.id;
241
242        // Check agent state
243        let (session_id, log_file) = {
244            let mut processes = self.processes.write().unwrap();
245            let agent = processes
246                .get_mut(slot_id)
247                .ok_or_else(|| anyhow!("Slot not initialized: {}", slot_id))?;
248
249            if agent.status == AgentStatus::Stopped {
250                return Err(anyhow!("Agent not running: {}", slot_id));
251            }
252
253            if agent.status == AgentStatus::Busy {
254                return Err(anyhow!("Agent is busy: {}", slot_id));
255            }
256
257            // Mark as busy
258            agent.status = AgentStatus::Busy;
259            agent.current_task_id = Some(task.id.clone());
260
261            (agent.session_id.clone(), agent.log_file.clone())
262        };
263
264        let _ = self.event_tx.send(ProcessEvent::AgentBusy(slot_id.clone()));
265
266        // Open log file
267        let mut log_file_handle = OpenOptions::new()
268            .create(true)
269            .append(true)
270            .open(&log_file)
271            .await?;
272
273        let now = chrono::Utc::now().to_rfc3339();
274        log_file_handle
275            .write_all(format!("\n--- Task {} started at {} ---\n", task.id, now).as_bytes())
276            .await?;
277        log_file_handle
278            .write_all(format!("Prompt: {}\n\n", task.prompt).as_bytes())
279            .await?;
280
281        // Run claude command
282        let result = self
283            .run_claude_command(slot, task, session_id.as_deref(), &mut log_file_handle)
284            .await;
285
286        // Update agent state
287        {
288            let mut processes = self.processes.write().unwrap();
289            if let Some(agent) = processes.get_mut(slot_id) {
290                agent.status = AgentStatus::Idle;
291                agent.current_task_id = None;
292
293                if let Ok(ref res) = result {
294                    agent.session_id = Some(res.session_id.clone());
295                    let _ = self.db.set_slot_session(slot_id, &res.session_id);
296                }
297            }
298        }
299
300        let _ = self.event_tx.send(ProcessEvent::AgentIdle(slot_id.clone()));
301
302        // Write completion log
303        if let Ok(ref res) = result {
304            log_file_handle
305                .write_all(format!("\n--- Task {} completed ---\n", task.id).as_bytes())
306                .await?;
307            let preview = if res.result.len() > 500 {
308                format!("{}...", &res.result[..500])
309            } else {
310                res.result.clone()
311            };
312            log_file_handle
313                .write_all(format!("Result: {}\n", preview).as_bytes())
314                .await?;
315        }
316
317        result
318    }
319
320    /// Run the Claude command
321    async fn run_claude_command(
322        &self,
323        slot: &Slot,
324        task: &Task,
325        session_id: Option<&str>,
326        log_file: &mut tokio::fs::File,
327    ) -> Result<ExecuteResult> {
328        let slot_id = &slot.config.id;
329        let cwd = slot
330            .config
331            .cwd
332            .as_deref()
333            .map(PathBuf::from)
334            .unwrap_or_else(|| std::env::current_dir().unwrap());
335
336        let mut args = vec![
337            "-p".to_string(),
338            task.prompt.clone(),
339            "--output-format".to_string(),
340            "stream-json".to_string(),
341            "--verbose".to_string(),
342        ];
343
344        if let Some(sid) = session_id {
345            args.push("--resume".to_string());
346            args.push(sid.to_string());
347        }
348
349        debug!(slot_id = %slot_id, task_id = %task.id, cwd = ?cwd, "Running claude command");
350
351        let child = Command::new("claude")
352            .args(&args)
353            .current_dir(&cwd)
354            .stdin(Stdio::piped())
355            .stdout(Stdio::piped())
356            .stderr(Stdio::piped())
357            .spawn()?;
358
359        let pid = child.id();
360
361        // Store child process
362        {
363            let mut children = self.child_processes.write().unwrap();
364            children.insert(slot_id.clone(), child);
365        }
366
367        // Take references to stdout/stderr
368        let child = {
369            let mut children = self.child_processes.write().unwrap();
370            children.remove(slot_id)
371        };
372
373        let mut child = child.ok_or_else(|| anyhow!("Child process not found"))?;
374
375        let stdout = child.stdout.take().ok_or_else(|| anyhow!("No stdout"))?;
376        let stderr = child.stderr.take().ok_or_else(|| anyhow!("No stderr"))?;
377
378        // Update PID
379        {
380            let mut processes = self.processes.write().unwrap();
381            if let Some(agent) = processes.get_mut(slot_id) {
382                agent.pid = pid;
383            }
384        }
385
386        // Read stdout
387        let mut stdout_reader = BufReader::new(stdout).lines();
388        let mut result_text = String::new();
389        let mut final_session_id = session_id.map(String::from).unwrap_or_default();
390
391        // Spawn stderr reader
392        let slot_id_clone = slot_id.clone();
393        let stderr_handle = tokio::spawn(async move {
394            let mut stderr_reader = BufReader::new(stderr).lines();
395            while let Ok(Some(line)) = stderr_reader.next_line().await {
396                warn!(slot_id = %slot_id_clone, "[stderr] {}", line);
397            }
398        });
399
400        // Read stdout lines
401        while let Ok(Some(line)) = stdout_reader.next_line().await {
402            log_file.write_all(format!("{}\n", line).as_bytes()).await?;
403
404            // Parse stream-json events
405            if let Ok(event) = serde_json::from_str::<serde_json::Value>(&line) {
406                if let Some(event_type) = event.get("type").and_then(|t| t.as_str()) {
407                    match event_type {
408                        "result" => {
409                            if let Some(r) = event.get("result").and_then(|r| r.as_str()) {
410                                result_text = r.to_string();
411                            }
412                            if let Some(sid) = event.get("session_id").and_then(|s| s.as_str()) {
413                                final_session_id = sid.to_string();
414                            }
415                        }
416                        "system" => {
417                            if let Some(sid) = event.get("session_id").and_then(|s| s.as_str()) {
418                                final_session_id = sid.to_string();
419                            }
420                        }
421                        _ => {}
422                    }
423                }
424            }
425        }
426
427        // Wait for process
428        let status = child.wait().await?;
429
430        // Wait for stderr reader
431        let _ = stderr_handle.await;
432
433        // Clear PID
434        {
435            let mut processes = self.processes.write().unwrap();
436            if let Some(agent) = processes.get_mut(slot_id) {
437                agent.pid = None;
438            }
439        }
440
441        if status.success() {
442            Ok(ExecuteResult {
443                result: result_text,
444                session_id: final_session_id,
445            })
446        } else {
447            Err(anyhow!(
448                "Claude exited with code {}",
449                status.code().unwrap_or(-1)
450            ))
451        }
452    }
453
454    /// Kill an agent
455    pub async fn kill(&self, slot_id: &str) -> Result<()> {
456        // Check if agent exists
457        {
458            let processes = self.processes.read().unwrap();
459            let agent = processes
460                .get(slot_id)
461                .ok_or_else(|| anyhow!("Slot not found: {}", slot_id))?;
462
463            if agent.status == AgentStatus::Stopped {
464                return Ok(());
465            }
466        }
467
468        // Update to stopping
469        {
470            let mut processes = self.processes.write().unwrap();
471            if let Some(agent) = processes.get_mut(slot_id) {
472                agent.status = AgentStatus::Stopping;
473            }
474        }
475
476        // Remove from auto-restart
477        {
478            let mut auto_restart = self.auto_restart_slots.write().unwrap();
479            auto_restart.remove(slot_id);
480        }
481
482        // Kill child process if running
483        {
484            let mut children = self.child_processes.write().unwrap();
485            if let Some(mut child) = children.remove(slot_id) {
486                child.kill().await.ok();
487            }
488        }
489
490        // Update to stopped
491        {
492            let mut processes = self.processes.write().unwrap();
493            if let Some(agent) = processes.get_mut(slot_id) {
494                agent.status = AgentStatus::Stopped;
495                agent.pid = None;
496                agent.current_task_id = None;
497            }
498        }
499
500        info!(slot_id = %slot_id, "Agent killed");
501        let _ = self.event_tx.send(ProcessEvent::AgentKilled(slot_id.to_string()));
502
503        Ok(())
504    }
505
506    /// Restart an agent
507    pub async fn restart(&self, slot: &Slot, options: SpawnOptions) -> Result<AgentProcess> {
508        self.kill(&slot.config.id).await?;
509        self.spawn(slot, options).await
510    }
511
512    /// Get agent status
513    pub fn get_status(&self, slot_id: &str) -> Option<AgentProcess> {
514        let processes = self.processes.read().unwrap();
515        processes.get(slot_id).cloned()
516    }
517
518    /// Get all agent statuses
519    pub fn get_all_status(&self) -> Vec<AgentProcess> {
520        let processes = self.processes.read().unwrap();
521        processes.values().cloned().collect()
522    }
523
524    /// Check if agent is available (idle)
525    pub fn is_available(&self, slot_id: &str) -> bool {
526        let processes = self.processes.read().unwrap();
527        processes
528            .get(slot_id)
529            .map(|a| a.status == AgentStatus::Idle)
530            .unwrap_or(false)
531    }
532
533    /// Check if agent is running
534    pub fn is_running(&self, slot_id: &str) -> bool {
535        let processes = self.processes.read().unwrap();
536        processes
537            .get(slot_id)
538            .map(|a| a.status == AgentStatus::Idle || a.status == AgentStatus::Busy)
539            .unwrap_or(false)
540    }
541
542    /// Get statistics
543    pub fn get_stats(&self) -> ProcessStats {
544        let processes = self.processes.read().unwrap();
545
546        let mut stopped = 0;
547        let mut idle = 0;
548        let mut busy = 0;
549
550        for agent in processes.values() {
551            match agent.status {
552                AgentStatus::Stopped | AgentStatus::Stopping => stopped += 1,
553                AgentStatus::Idle | AgentStatus::Starting => idle += 1,
554                AgentStatus::Busy => busy += 1,
555            }
556        }
557
558        ProcessStats {
559            total: processes.len(),
560            stopped,
561            idle,
562            busy,
563        }
564    }
565
566    /// Shutdown all agents
567    pub async fn shutdown(&self) {
568        info!("Shutting down all agents...");
569
570        let slot_ids: Vec<String> = {
571            let processes = self.processes.read().unwrap();
572            processes.keys().cloned().collect()
573        };
574
575        for slot_id in slot_ids {
576            if let Err(e) = self.kill(&slot_id).await {
577                error!(slot_id = %slot_id, error = %e, "Error killing agent");
578            }
579        }
580
581        info!("All agents shut down");
582    }
583}
584
/// Agent counts by status, as produced by `ProcessManager::get_stats`.
///
/// `Starting` agents are counted as idle and `Stopping` agents as stopped, so
/// `stopped + idle + busy == total`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProcessStats {
    /// Number of registered slots.
    pub total: usize,
    /// Agents that are stopped or stopping.
    pub stopped: usize,
    /// Agents that are idle or starting.
    pub idle: usize,
    /// Agents executing a task.
    pub busy: usize,
}
593
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::SlotConfig;
    use tempfile::{tempdir, TempDir};

    /// A manager together with the temporary directories it uses.
    ///
    /// `TempDir` deletes its directory when dropped, so both directories must live
    /// as long as the manager. Fields drop in declaration order, so the manager
    /// (and its database handle) goes first.
    struct Fixture {
        manager: ProcessManager,
        _db_dir: TempDir,
        _logs_dir: TempDir,
    }

    fn fixture() -> Fixture {
        let db_dir = tempdir().unwrap();
        let db = Arc::new(MissionDB::open(db_dir.path().join("test.db")).unwrap());
        let logs_dir = tempdir().unwrap();
        let manager = ProcessManager::new(db, logs_dir.path().to_path_buf());
        Fixture {
            manager,
            _db_dir: db_dir,
            _logs_dir: logs_dir,
        }
    }

    /// Build a worker slot with the given id and description and no extra config.
    fn make_slot(id: &str, description: &str) -> Slot {
        Slot {
            config: SlotConfig {
                id: id.to_string(),
                role: "worker".to_string(),
                description: description.to_string(),
                cwd: None,
                mcp_config: None,
                auto_start: None,
            },
            session_id: None,
        }
    }

    #[tokio::test]
    async fn test_init_slot() {
        let fx = fixture();
        fx.manager.init_slot(&make_slot("test-slot", "Test slot"));

        let status = fx.manager.get_status("test-slot").unwrap();
        assert_eq!(status.slot_id, "test-slot");
        assert_eq!(status.status, AgentStatus::Stopped);
    }

    #[tokio::test]
    async fn test_get_stats() {
        let fx = fixture();
        fx.manager.init_slot(&make_slot("slot-1", "Slot 1"));
        fx.manager.init_slot(&make_slot("slot-2", "Slot 2"));

        let stats = fx.manager.get_stats();
        assert_eq!(stats.total, 2);
        assert_eq!(stats.stopped, 2);
        assert_eq!(stats.idle, 0);
        assert_eq!(stats.busy, 0);
    }

    #[tokio::test]
    async fn test_spawn_headless_and_kill() {
        let fx = fixture();
        let slot = make_slot("test-slot", "Test slot");
        fx.manager.init_slot(&slot);
        let mut events = fx.manager.subscribe();

        let agent = fx.manager.spawn(&slot, SpawnOptions::default()).await.unwrap();
        assert_eq!(agent.status, AgentStatus::Idle);
        assert!(agent.started_at.is_some());
        assert!(fx.manager.is_available("test-slot"));
        assert!(fx.manager.is_running("test-slot"));
        assert!(matches!(
            events.try_recv(),
            Ok(ProcessEvent::AgentSpawned(ref id)) if id == "test-slot"
        ));

        // A running agent cannot be spawned twice.
        assert!(fx.manager.spawn(&slot, SpawnOptions::default()).await.is_err());

        fx.manager.kill("test-slot").await.unwrap();
        let status = fx.manager.get_status("test-slot").unwrap();
        assert_eq!(status.status, AgentStatus::Stopped);
        assert!(!fx.manager.is_running("test-slot"));
        assert!(matches!(
            events.try_recv(),
            Ok(ProcessEvent::AgentKilled(ref id)) if id == "test-slot"
        ));

        // Killing an already-stopped agent is a no-op.
        fx.manager.kill("test-slot").await.unwrap();
    }

    #[tokio::test]
    async fn test_unknown_slot_errors() {
        let fx = fixture();
        let slot = make_slot("missing", "Never initialized");

        assert!(fx.manager.spawn(&slot, SpawnOptions::default()).await.is_err());
        assert!(fx.manager.kill("missing").await.is_err());
        assert!(fx.manager.get_status("missing").is_none());
        assert!(!fx.manager.is_available("missing"));
        assert!(!fx.manager.is_running("missing"));
    }
}