1use crate::db::MissionDB;
6use crate::types::{Slot, Task};
7use anyhow::{anyhow, Result};
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10use std::path::PathBuf;
11use std::process::Stdio;
12use std::sync::{Arc, RwLock};
13use tokio::fs::OpenOptions;
14use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::broadcast;
17use tracing::{debug, error, info, warn};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[serde(rename_all = "lowercase")]
22pub enum AgentStatus {
23 Stopped,
24 Starting,
25 Idle,
26 Busy,
27 Stopping,
28}
29
30impl AgentStatus {
31 pub fn as_str(&self) -> &'static str {
32 match self {
33 AgentStatus::Stopped => "stopped",
34 AgentStatus::Starting => "starting",
35 AgentStatus::Idle => "idle",
36 AgentStatus::Busy => "busy",
37 AgentStatus::Stopping => "stopping",
38 }
39 }
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44#[serde(rename_all = "camelCase")]
45pub struct AgentProcess {
46 pub slot_id: String,
47 pub role: String,
48 #[serde(skip_serializing_if = "Option::is_none")]
49 pub pid: Option<u32>,
50 pub status: AgentStatus,
51 #[serde(skip_serializing_if = "Option::is_none")]
52 pub session_id: Option<String>,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub started_at: Option<i64>,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pub current_task_id: Option<String>,
57 pub log_file: PathBuf,
58}
59
/// Options controlling how an agent is spawned.
///
/// The default value has both flags turned off. The type is a pair of
/// booleans, so it is `Copy` and comparable; callers can reuse the same
/// options value across several spawn/restart calls.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SpawnOptions {
    /// Open a visible terminal window for the agent instead of running it
    /// headless.
    pub visible: bool,
    /// Register the slot in the manager's auto-restart set.
    pub auto_restart: bool,
}
68
/// Outcome of a successfully executed task.
#[derive(Clone, Debug)]
pub struct ExecuteResult {
    /// Final result text reported by the command.
    pub result: String,
    /// Session id associated with the run; it is stored on the slot so that
    /// the next task can resume the same session.
    pub session_id: String,
}
75
/// Notification broadcast by the process manager when an agent changes state.
///
/// Every variant carries the id of the affected slot.
#[derive(Clone, Debug)]
pub enum ProcessEvent {
    /// An agent finished spawning.
    AgentSpawned(String),
    /// An agent was killed.
    AgentKilled(String),
    /// An agent started executing a task.
    AgentBusy(String),
    /// An agent finished a task and is available again.
    AgentIdle(String),
}
84
85pub struct ProcessManager {
89 processes: Arc<RwLock<HashMap<String, AgentProcess>>>,
90 child_processes: Arc<RwLock<HashMap<String, Child>>>,
91 db: Arc<MissionDB>,
92 logs_dir: PathBuf,
93 auto_restart_slots: Arc<RwLock<HashSet<String>>>,
94 event_tx: broadcast::Sender<ProcessEvent>,
95}
96
97impl ProcessManager {
98 pub fn new(db: Arc<MissionDB>, logs_dir: PathBuf) -> Self {
100 std::fs::create_dir_all(&logs_dir).ok();
102
103 let (event_tx, _) = broadcast::channel(100);
104
105 Self {
106 processes: Arc::new(RwLock::new(HashMap::new())),
107 child_processes: Arc::new(RwLock::new(HashMap::new())),
108 db,
109 logs_dir,
110 auto_restart_slots: Arc::new(RwLock::new(HashSet::new())),
111 event_tx,
112 }
113 }
114
115 pub fn subscribe(&self) -> broadcast::Receiver<ProcessEvent> {
117 self.event_tx.subscribe()
118 }
119
120 pub fn init_slot(&self, slot: &Slot) {
122 let log_file = self.logs_dir.join(format!("{}.log", slot.config.id));
123 let session_id = self.db.get_slot_session(&slot.config.id).ok().flatten();
124
125 let agent = AgentProcess {
126 slot_id: slot.config.id.clone(),
127 role: slot.config.role.clone(),
128 pid: None,
129 status: AgentStatus::Stopped,
130 session_id,
131 started_at: None,
132 current_task_id: None,
133 log_file,
134 };
135
136 let mut processes = self.processes.write().unwrap();
137 processes.insert(slot.config.id.clone(), agent);
138
139 debug!(slot_id = %slot.config.id, role = %slot.config.role, "Slot initialized");
140 }
141
142 pub async fn spawn(&self, slot: &Slot, options: SpawnOptions) -> Result<AgentProcess> {
144 let slot_id = &slot.config.id;
145
146 {
148 let processes = self.processes.read().unwrap();
149 let agent = processes
150 .get(slot_id)
151 .ok_or_else(|| anyhow!("Slot not initialized: {}", slot_id))?;
152
153 if agent.status != AgentStatus::Stopped {
154 return Err(anyhow!(
155 "Agent already running: {} (status: {:?})",
156 slot_id,
157 agent.status
158 ));
159 }
160 }
161
162 {
164 let mut processes = self.processes.write().unwrap();
165 if let Some(agent) = processes.get_mut(slot_id) {
166 agent.status = AgentStatus::Starting;
167 }
168 }
169
170 if options.auto_restart {
171 let mut auto_restart = self.auto_restart_slots.write().unwrap();
172 auto_restart.insert(slot_id.clone());
173 }
174
175 if options.visible {
177 self.spawn_visible(slot).await?;
178 } else {
179 self.spawn_headless(slot).await?;
180 }
181
182 let agent = {
184 let mut processes = self.processes.write().unwrap();
185 if let Some(agent) = processes.get_mut(slot_id) {
186 agent.status = AgentStatus::Idle;
187 agent.started_at = Some(chrono::Utc::now().timestamp_millis());
188 info!(slot_id = %slot_id, pid = ?agent.pid, "Agent spawned");
189 agent.clone()
190 } else {
191 return Err(anyhow!("Agent not found after spawn: {}", slot_id));
192 }
193 };
194
195 let _ = self.event_tx.send(ProcessEvent::AgentSpawned(slot_id.clone()));
196
197 Ok(agent)
198 }
199
200 async fn spawn_headless(&self, slot: &Slot) -> Result<()> {
202 debug!(slot_id = %slot.config.id, "Headless mode: ready for tasks");
205 Ok(())
206 }
207
208 async fn spawn_visible(&self, slot: &Slot) -> Result<()> {
210 let default_cwd = std::env::current_dir()
211 .map(|p| p.to_string_lossy().to_string())
212 .unwrap_or_else(|_| ".".to_string());
213 let cwd = slot.config.cwd.as_deref().unwrap_or(&default_cwd);
214
215 let script = format!(
217 r#"
218 tell application "Terminal"
219 do script "cd {} && echo 'Agent {} ready ({})' && read -p 'Press Enter to exit...'"
220 activate
221 end tell
222 "#,
223 cwd, slot.config.id, slot.config.role
224 );
225
226 let mut child = Command::new("osascript")
227 .arg("-e")
228 .arg(&script)
229 .spawn()?;
230
231 child.wait().await?;
233
234 info!(slot_id = %slot.config.id, "Visible terminal opened");
235 Ok(())
236 }
237
238 pub async fn execute_task(&self, slot: &Slot, task: &Task) -> Result<ExecuteResult> {
240 let slot_id = &slot.config.id;
241
242 let (session_id, log_file) = {
244 let mut processes = self.processes.write().unwrap();
245 let agent = processes
246 .get_mut(slot_id)
247 .ok_or_else(|| anyhow!("Slot not initialized: {}", slot_id))?;
248
249 if agent.status == AgentStatus::Stopped {
250 return Err(anyhow!("Agent not running: {}", slot_id));
251 }
252
253 if agent.status == AgentStatus::Busy {
254 return Err(anyhow!("Agent is busy: {}", slot_id));
255 }
256
257 agent.status = AgentStatus::Busy;
259 agent.current_task_id = Some(task.id.clone());
260
261 (agent.session_id.clone(), agent.log_file.clone())
262 };
263
264 let _ = self.event_tx.send(ProcessEvent::AgentBusy(slot_id.clone()));
265
266 let mut log_file_handle = OpenOptions::new()
268 .create(true)
269 .append(true)
270 .open(&log_file)
271 .await?;
272
273 let now = chrono::Utc::now().to_rfc3339();
274 log_file_handle
275 .write_all(format!("\n--- Task {} started at {} ---\n", task.id, now).as_bytes())
276 .await?;
277 log_file_handle
278 .write_all(format!("Prompt: {}\n\n", task.prompt).as_bytes())
279 .await?;
280
281 let result = self
283 .run_claude_command(slot, task, session_id.as_deref(), &mut log_file_handle)
284 .await;
285
286 {
288 let mut processes = self.processes.write().unwrap();
289 if let Some(agent) = processes.get_mut(slot_id) {
290 agent.status = AgentStatus::Idle;
291 agent.current_task_id = None;
292
293 if let Ok(ref res) = result {
294 agent.session_id = Some(res.session_id.clone());
295 let _ = self.db.set_slot_session(slot_id, &res.session_id);
296 }
297 }
298 }
299
300 let _ = self.event_tx.send(ProcessEvent::AgentIdle(slot_id.clone()));
301
302 if let Ok(ref res) = result {
304 log_file_handle
305 .write_all(format!("\n--- Task {} completed ---\n", task.id).as_bytes())
306 .await?;
307 let preview = if res.result.len() > 500 {
308 format!("{}...", &res.result[..500])
309 } else {
310 res.result.clone()
311 };
312 log_file_handle
313 .write_all(format!("Result: {}\n", preview).as_bytes())
314 .await?;
315 }
316
317 result
318 }
319
320 async fn run_claude_command(
322 &self,
323 slot: &Slot,
324 task: &Task,
325 session_id: Option<&str>,
326 log_file: &mut tokio::fs::File,
327 ) -> Result<ExecuteResult> {
328 let slot_id = &slot.config.id;
329 let cwd = slot
330 .config
331 .cwd
332 .as_deref()
333 .map(PathBuf::from)
334 .unwrap_or_else(|| std::env::current_dir().unwrap());
335
336 let mut args = vec![
337 "-p".to_string(),
338 task.prompt.clone(),
339 "--output-format".to_string(),
340 "stream-json".to_string(),
341 "--verbose".to_string(),
342 ];
343
344 if let Some(sid) = session_id {
345 args.push("--resume".to_string());
346 args.push(sid.to_string());
347 }
348
349 debug!(slot_id = %slot_id, task_id = %task.id, cwd = ?cwd, "Running claude command");
350
351 let child = Command::new("claude")
352 .args(&args)
353 .current_dir(&cwd)
354 .stdin(Stdio::piped())
355 .stdout(Stdio::piped())
356 .stderr(Stdio::piped())
357 .spawn()?;
358
359 let pid = child.id();
360
361 {
363 let mut children = self.child_processes.write().unwrap();
364 children.insert(slot_id.clone(), child);
365 }
366
367 let child = {
369 let mut children = self.child_processes.write().unwrap();
370 children.remove(slot_id)
371 };
372
373 let mut child = child.ok_or_else(|| anyhow!("Child process not found"))?;
374
375 let stdout = child.stdout.take().ok_or_else(|| anyhow!("No stdout"))?;
376 let stderr = child.stderr.take().ok_or_else(|| anyhow!("No stderr"))?;
377
378 {
380 let mut processes = self.processes.write().unwrap();
381 if let Some(agent) = processes.get_mut(slot_id) {
382 agent.pid = pid;
383 }
384 }
385
386 let mut stdout_reader = BufReader::new(stdout).lines();
388 let mut result_text = String::new();
389 let mut final_session_id = session_id.map(String::from).unwrap_or_default();
390
391 let slot_id_clone = slot_id.clone();
393 let stderr_handle = tokio::spawn(async move {
394 let mut stderr_reader = BufReader::new(stderr).lines();
395 while let Ok(Some(line)) = stderr_reader.next_line().await {
396 warn!(slot_id = %slot_id_clone, "[stderr] {}", line);
397 }
398 });
399
400 while let Ok(Some(line)) = stdout_reader.next_line().await {
402 log_file.write_all(format!("{}\n", line).as_bytes()).await?;
403
404 if let Ok(event) = serde_json::from_str::<serde_json::Value>(&line) {
406 if let Some(event_type) = event.get("type").and_then(|t| t.as_str()) {
407 match event_type {
408 "result" => {
409 if let Some(r) = event.get("result").and_then(|r| r.as_str()) {
410 result_text = r.to_string();
411 }
412 if let Some(sid) = event.get("session_id").and_then(|s| s.as_str()) {
413 final_session_id = sid.to_string();
414 }
415 }
416 "system" => {
417 if let Some(sid) = event.get("session_id").and_then(|s| s.as_str()) {
418 final_session_id = sid.to_string();
419 }
420 }
421 _ => {}
422 }
423 }
424 }
425 }
426
427 let status = child.wait().await?;
429
430 let _ = stderr_handle.await;
432
433 {
435 let mut processes = self.processes.write().unwrap();
436 if let Some(agent) = processes.get_mut(slot_id) {
437 agent.pid = None;
438 }
439 }
440
441 if status.success() {
442 Ok(ExecuteResult {
443 result: result_text,
444 session_id: final_session_id,
445 })
446 } else {
447 Err(anyhow!(
448 "Claude exited with code {}",
449 status.code().unwrap_or(-1)
450 ))
451 }
452 }
453
454 pub async fn kill(&self, slot_id: &str) -> Result<()> {
456 {
458 let processes = self.processes.read().unwrap();
459 let agent = processes
460 .get(slot_id)
461 .ok_or_else(|| anyhow!("Slot not found: {}", slot_id))?;
462
463 if agent.status == AgentStatus::Stopped {
464 return Ok(());
465 }
466 }
467
468 {
470 let mut processes = self.processes.write().unwrap();
471 if let Some(agent) = processes.get_mut(slot_id) {
472 agent.status = AgentStatus::Stopping;
473 }
474 }
475
476 {
478 let mut auto_restart = self.auto_restart_slots.write().unwrap();
479 auto_restart.remove(slot_id);
480 }
481
482 {
484 let mut children = self.child_processes.write().unwrap();
485 if let Some(mut child) = children.remove(slot_id) {
486 child.kill().await.ok();
487 }
488 }
489
490 {
492 let mut processes = self.processes.write().unwrap();
493 if let Some(agent) = processes.get_mut(slot_id) {
494 agent.status = AgentStatus::Stopped;
495 agent.pid = None;
496 agent.current_task_id = None;
497 }
498 }
499
500 info!(slot_id = %slot_id, "Agent killed");
501 let _ = self.event_tx.send(ProcessEvent::AgentKilled(slot_id.to_string()));
502
503 Ok(())
504 }
505
506 pub async fn restart(&self, slot: &Slot, options: SpawnOptions) -> Result<AgentProcess> {
508 self.kill(&slot.config.id).await?;
509 self.spawn(slot, options).await
510 }
511
512 pub fn get_status(&self, slot_id: &str) -> Option<AgentProcess> {
514 let processes = self.processes.read().unwrap();
515 processes.get(slot_id).cloned()
516 }
517
518 pub fn get_all_status(&self) -> Vec<AgentProcess> {
520 let processes = self.processes.read().unwrap();
521 processes.values().cloned().collect()
522 }
523
524 pub fn is_available(&self, slot_id: &str) -> bool {
526 let processes = self.processes.read().unwrap();
527 processes
528 .get(slot_id)
529 .map(|a| a.status == AgentStatus::Idle)
530 .unwrap_or(false)
531 }
532
533 pub fn is_running(&self, slot_id: &str) -> bool {
535 let processes = self.processes.read().unwrap();
536 processes
537 .get(slot_id)
538 .map(|a| a.status == AgentStatus::Idle || a.status == AgentStatus::Busy)
539 .unwrap_or(false)
540 }
541
542 pub fn get_stats(&self) -> ProcessStats {
544 let processes = self.processes.read().unwrap();
545
546 let mut stopped = 0;
547 let mut idle = 0;
548 let mut busy = 0;
549
550 for agent in processes.values() {
551 match agent.status {
552 AgentStatus::Stopped | AgentStatus::Stopping => stopped += 1,
553 AgentStatus::Idle | AgentStatus::Starting => idle += 1,
554 AgentStatus::Busy => busy += 1,
555 }
556 }
557
558 ProcessStats {
559 total: processes.len(),
560 stopped,
561 idle,
562 busy,
563 }
564 }
565
566 pub async fn shutdown(&self) {
568 info!("Shutting down all agents...");
569
570 let slot_ids: Vec<String> = {
571 let processes = self.processes.read().unwrap();
572 processes.keys().cloned().collect()
573 };
574
575 for slot_id in slot_ids {
576 if let Err(e) = self.kill(&slot_id).await {
577 error!(slot_id = %slot_id, error = %e, "Error killing agent");
578 }
579 }
580
581 info!("All agents shut down");
582 }
583}
584
/// Number of registered agent slots, in total and per state bucket.
///
/// The type derives `Default`, `PartialEq` and `Eq` so that callers and
/// tests can build an all-zero value and compare whole snapshots.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ProcessStats {
    /// Number of registered slots.
    pub total: usize,
    /// Slots counted as stopped.
    pub stopped: usize,
    /// Slots counted as idle.
    pub idle: usize,
    /// Slots counted as busy.
    pub busy: usize,
}
593
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::SlotConfig;
    use tempfile::{tempdir, TempDir};

    /// Opens a database in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned with the database: dropping it
    /// deletes the directory, so it has to outlive the database handle.
    fn create_test_db() -> (Arc<MissionDB>, TempDir) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let db = Arc::new(MissionDB::open(db_path).unwrap());
        (db, dir)
    }

    /// Builds a worker slot with the given id and no optional settings.
    fn create_test_slot_with_id(id: &str, description: &str) -> Slot {
        Slot {
            config: SlotConfig {
                id: id.to_string(),
                role: "worker".to_string(),
                description: description.to_string(),
                cwd: None,
                mcp_config: None,
                auto_start: None,
            },
            session_id: None,
        }
    }

    fn create_test_slot() -> Slot {
        create_test_slot_with_id("test-slot", "Test slot")
    }

    #[tokio::test]
    async fn test_init_slot() {
        let (db, _db_dir) = create_test_db();
        // Keep the guard alive so the logs directory is cleaned up at the
        // end of the test rather than deleted and re-created untracked.
        let logs_dir = tempdir().unwrap();
        let manager = ProcessManager::new(db, logs_dir.path().to_path_buf());

        let slot = create_test_slot();
        manager.init_slot(&slot);

        let status = manager.get_status("test-slot").unwrap();
        assert_eq!(status.slot_id, "test-slot");
        assert_eq!(status.status, AgentStatus::Stopped);
    }

    #[tokio::test]
    async fn test_get_stats() {
        let (db, _db_dir) = create_test_db();
        let logs_dir = tempdir().unwrap();
        let manager = ProcessManager::new(db, logs_dir.path().to_path_buf());

        let slot1 = create_test_slot_with_id("slot-1", "Slot 1");
        let slot2 = create_test_slot_with_id("slot-2", "Slot 2");

        manager.init_slot(&slot1);
        manager.init_slot(&slot2);

        let stats = manager.get_stats();
        assert_eq!(stats.total, 2);
        assert_eq!(stats.stopped, 2);
        assert_eq!(stats.idle, 0);
        assert_eq!(stats.busy, 0);
    }
}