1use std::collections::HashMap;
8use std::fs::OpenOptions;
9use std::path::PathBuf;
10use std::process::{Child, Command};
11use std::time::Instant;
12
13use anyhow::{anyhow, Context, Result};
14
15use crate::commands::agents::{save_agents, AgentEntry};
16use crate::commands::logs;
17use crate::config::{resolve_identity, Config};
18
/// What a spawned agent is asked to do with its unit.
///
/// The variant selects which command template is used (`run` vs `plan`) and
/// which `IMP_MODE` value the child process sees.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentAction {
    /// Carry out the unit's work using the `run` template.
    Implement,
    /// Plan the unit using the `plan` template.
    Plan,
}
31
32impl std::fmt::Display for AgentAction {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 match self {
35 AgentAction::Implement => write!(f, "implement"),
36 AgentAction::Plan => write!(f, "plan"),
37 }
38 }
39}
40
41pub struct AgentProcess {
43 pub unit_id: String,
44 pub unit_title: String,
45 pub action: AgentAction,
46 pub pid: u32,
47 pub started_at: Instant,
48 pub log_path: PathBuf,
49 child: Child,
50}
51
52#[derive(Debug)]
54pub struct CompletedAgent {
55 pub unit_id: String,
56 pub unit_title: String,
57 pub action: AgentAction,
58 pub success: bool,
59 pub exit_code: Option<i32>,
60 pub duration: std::time::Duration,
61 pub log_path: PathBuf,
62}
63
64pub struct Spawner {
68 running: HashMap<String, AgentProcess>,
69}
70
/// Fills a command template for one unit.
///
/// Every occurrence of `{id}` in `template` is replaced with `unit_id`. Other
/// placeholders (such as `{model}`) are left untouched.
#[must_use]
pub fn substitute_template(template: &str, unit_id: &str) -> String {
    const ID_PLACEHOLDER: &str = "{id}";
    template.replace(ID_PLACEHOLDER, unit_id)
}
83
/// Fills a command template with a unit ID and, optionally, a model name.
///
/// `{id}` is always replaced with `unit_id`. `{model}` is replaced with
/// `model` only when one is given; with `None`, any `{model}` placeholder is
/// left in the output verbatim.
#[must_use]
pub fn substitute_template_with_model(
    template: &str,
    unit_id: &str,
    model: Option<&str>,
) -> String {
    let with_id = template.replace("{id}", unit_id);
    if let Some(model_name) = model {
        with_id.replace("{model}", model_name)
    } else {
        with_id
    }
}
99
100pub fn build_log_path(unit_id: &str) -> Result<PathBuf> {
105 let dir = logs::log_dir()?;
106 let safe_id = unit_id.replace('.', "_");
107 let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S");
108 Ok(dir.join(format!("{}-{}.log", safe_id, timestamp)))
109}
110
111impl Spawner {
116 #[must_use]
118 pub fn new() -> Self {
119 Self {
120 running: HashMap::new(),
121 }
122 }
123
124 pub fn spawn(
133 &mut self,
134 unit_id: &str,
135 unit_title: &str,
136 action: AgentAction,
137 config: &Config,
138 mana_dir: Option<&std::path::Path>,
139 ) -> Result<()> {
140 if self.running.contains_key(unit_id) {
141 return Err(anyhow!("Unit {} already has a running agent", unit_id));
142 }
143
144 let (template, model) = match action {
145 AgentAction::Implement => (
146 config
147 .run
148 .as_deref()
149 .ok_or_else(|| anyhow!("No run template configured"))?,
150 config.run_model.as_deref(),
151 ),
152 AgentAction::Plan => (
153 config
154 .plan
155 .as_deref()
156 .ok_or_else(|| anyhow!("No plan template configured"))?,
157 config.plan_model.as_deref(),
158 ),
159 };
160
161 let cmd = substitute_template_with_model(template, unit_id, model);
162 let log_path = build_log_path(unit_id)?;
163
164 let agent_identity = build_agent_identity(mana_dir);
166
167 claim_unit(unit_id, agent_identity.as_deref())?;
169
170 let log_file = OpenOptions::new()
172 .create(true)
173 .append(true)
174 .open(&log_path)
175 .with_context(|| format!("Failed to open log file: {}", log_path.display()))?;
176 let log_stderr = log_file
177 .try_clone()
178 .context("Failed to clone log file handle")?;
179
180 let imp_mode = match action {
182 AgentAction::Implement => "worker",
183 AgentAction::Plan => "planner",
184 };
185
186 let child = match Command::new("sh")
188 .args(["-c", &cmd])
189 .env("IMP_MODE", imp_mode)
190 .stdout(log_file)
191 .stderr(log_stderr)
192 .spawn()
193 {
194 Ok(child) => child,
195 Err(e) => {
196 let _ = release_unit(unit_id);
198 return Err(anyhow!("Failed to spawn agent for {}: {}", unit_id, e));
199 }
200 };
201
202 let pid = child.id();
203
204 let _ = register_agent(unit_id, unit_title, action, pid, &log_path);
206
207 self.running.insert(
208 unit_id.to_string(),
209 AgentProcess {
210 unit_id: unit_id.to_string(),
211 unit_title: unit_title.to_string(),
212 action,
213 pid,
214 started_at: Instant::now(),
215 log_path,
216 child,
217 },
218 );
219
220 Ok(())
221 }
222
223 pub fn check_completed(&mut self) -> Vec<CompletedAgent> {
229 let mut completed = Vec::new();
230 let mut finished_ids = Vec::new();
231
232 for (id, proc) in self.running.iter_mut() {
233 match proc.child.try_wait() {
234 Ok(Some(status)) => {
235 let success = status.success();
236 let exit_code = status.code();
237
238 if !success {
239 let _ = release_unit(id);
240 }
241
242 let _ = finish_agent(id, exit_code);
244
245 completed.push(CompletedAgent {
246 unit_id: id.clone(),
247 unit_title: proc.unit_title.clone(),
248 action: proc.action,
249 success,
250 exit_code,
251 duration: proc.started_at.elapsed(),
252 log_path: proc.log_path.clone(),
253 });
254 finished_ids.push(id.clone());
255 }
256 Ok(None) => {} Err(e) => {
258 eprintln!("Error checking agent for {}: {}", id, e);
259 let _ = release_unit(id);
260 let _ = finish_agent(id, Some(-1));
261 completed.push(CompletedAgent {
262 unit_id: id.clone(),
263 unit_title: proc.unit_title.clone(),
264 action: proc.action,
265 success: false,
266 exit_code: Some(-1),
267 duration: proc.started_at.elapsed(),
268 log_path: proc.log_path.clone(),
269 });
270 finished_ids.push(id.clone());
271 }
272 }
273 }
274
275 for id in finished_ids {
276 self.running.remove(&id);
277 }
278
279 completed
280 }
281
282 #[must_use]
284 pub fn running_count(&self) -> usize {
285 self.running.len()
286 }
287
288 #[must_use]
290 pub fn can_spawn(&self, max_concurrent: u32) -> bool {
291 (self.running.len() as u32) < max_concurrent
292 }
293
294 #[must_use]
296 pub fn list_running(&self) -> Vec<&AgentProcess> {
297 self.running.values().collect()
298 }
299
300 pub fn kill_all(&mut self) {
302 for (id, proc) in self.running.iter_mut() {
303 let _ = proc.child.kill();
304 let _ = proc.child.wait(); let _ = release_unit(id);
306 let _ = finish_agent(id, Some(-9));
307 }
308 self.running.clear();
309 }
310
311 pub fn shutdown_all(&mut self, grace_period: std::time::Duration) {
317 if self.running.is_empty() {
318 return;
319 }
320
321 for proc in self.running.values() {
323 unsafe {
324 libc::kill(proc.pid as i32, libc::SIGTERM);
325 }
326 }
327
328 let deadline = Instant::now() + grace_period;
330 loop {
331 let mut finished_ids = Vec::new();
332 for (id, proc) in self.running.iter_mut() {
333 if let Ok(Some(_)) = proc.child.try_wait() {
334 finished_ids.push(id.clone());
335 }
336 }
337 for id in &finished_ids {
338 let _ = release_unit(id);
339 let _ = finish_agent(id, Some(-15));
340 self.running.remove(id);
341 }
342 if self.running.is_empty() || Instant::now() >= deadline {
343 break;
344 }
345 std::thread::sleep(std::time::Duration::from_millis(100));
346 }
347
348 self.kill_all();
350 }
351}
352
353impl Default for Spawner {
354 fn default() -> Self {
355 Self::new()
356 }
357}
358
359fn build_agent_identity(mana_dir: Option<&std::path::Path>) -> Option<String> {
365 let pid = std::process::id();
366 let user = mana_dir.and_then(resolve_identity);
367 match user {
368 Some(u) => Some(format!("{}/agent-{}", u, pid)),
369 None => Some(format!("agent-{}", pid)),
370 }
371}
372
373fn claim_unit(unit_id: &str, by: Option<&str>) -> Result<()> {
375 let mut args = vec!["claim", unit_id, "--force"];
376 let by_owned;
377 if let Some(identity) = by {
378 args.push("--by");
379 by_owned = identity.to_string();
380 args.push(&by_owned);
381 }
382 let status = Command::new("mana")
383 .args(&args)
384 .stdout(std::process::Stdio::null())
385 .stderr(std::process::Stdio::null())
386 .status()
387 .with_context(|| format!("Failed to run mana claim {}", unit_id))?;
388
389 if !status.success() {
390 return Err(anyhow!(
391 "mana claim {} failed with exit code {}",
392 unit_id,
393 status.code().unwrap_or(-1)
394 ));
395 }
396 Ok(())
397}
398
399fn release_unit(unit_id: &str) -> Result<()> {
401 let status = Command::new("mana")
402 .args(["claim", unit_id, "--release"])
403 .stdout(std::process::Stdio::null())
404 .stderr(std::process::Stdio::null())
405 .status()
406 .with_context(|| format!("Failed to run mana claim {} --release", unit_id))?;
407
408 if !status.success() {
409 return Err(anyhow!(
410 "mana claim {} --release failed with exit code {}",
411 unit_id,
412 status.code().unwrap_or(-1)
413 ));
414 }
415 Ok(())
416}
417
418fn register_agent(
424 unit_id: &str,
425 unit_title: &str,
426 action: AgentAction,
427 pid: u32,
428 log_path: &std::path::Path,
429) -> Result<()> {
430 let mut agents = crate::commands::agents::load_agents().unwrap_or_default();
431 agents.insert(
432 unit_id.to_string(),
433 AgentEntry {
434 pid,
435 title: unit_title.to_string(),
436 action: action.to_string(),
437 started_at: chrono::Utc::now().timestamp(),
438 log_path: Some(log_path.display().to_string()),
439 finished_at: None,
440 exit_code: None,
441 },
442 );
443 save_agents(&agents)
444}
445
446fn finish_agent(unit_id: &str, exit_code: Option<i32>) -> Result<()> {
448 let mut agents = crate::commands::agents::load_agents().unwrap_or_default();
449 if let Some(entry) = agents.get_mut(unit_id) {
450 entry.finished_at = Some(chrono::Utc::now().timestamp());
451 entry.exit_code = exit_code;
452 save_agents(&agents)?;
453 }
454 Ok(())
455}
456
457pub fn log_dir() -> Result<PathBuf> {
465 logs::log_dir()
466}
467
468pub fn find_latest_log(unit_id: &str) -> Result<Option<PathBuf>> {
470 logs::find_latest_log(unit_id)
471}
472
473pub fn find_all_logs(unit_id: &str) -> Result<Vec<PathBuf>> {
475 logs::find_all_logs(unit_id)
476}
477
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::path::Path;
    use std::time::Duration;

    /// Upper bound on how long a test waits for a short-lived child to exit.
    const EXIT_TIMEOUT: Duration = Duration::from_secs(5);

    /// Spawns `program args...` with stdout and stderr redirected to a fresh
    /// file at `log_path`, the same way `Spawner::spawn` wires up agents.
    fn spawn_logged(program: &str, args: &[&str], log_path: &Path) -> Child {
        let log_file = File::create(log_path).unwrap();
        let log_stderr = log_file.try_clone().unwrap();
        Command::new(program)
            .args(args)
            .stdout(log_file)
            .stderr(log_stderr)
            .spawn()
            .unwrap()
    }

    /// Puts `child` into the spawner's running set directly, bypassing
    /// `Spawner::spawn` so that no `mana claim` is executed.
    fn track(
        spawner: &mut Spawner,
        id: &str,
        title: &str,
        action: AgentAction,
        child: Child,
        log_path: &Path,
    ) {
        spawner.running.insert(
            id.to_string(),
            AgentProcess {
                unit_id: id.to_string(),
                unit_title: title.to_string(),
                action,
                pid: child.id(),
                started_at: Instant::now(),
                log_path: log_path.to_path_buf(),
                child,
            },
        );
    }

    /// Polls `check_completed` until it reports something or `EXIT_TIMEOUT`
    /// passes. This replaces a fixed sleep that could flake on a loaded machine.
    fn wait_for_completed(spawner: &mut Spawner) -> Vec<CompletedAgent> {
        let deadline = Instant::now() + EXIT_TIMEOUT;
        loop {
            let completed = spawner.check_completed();
            if !completed.is_empty() || Instant::now() >= deadline {
                return completed;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    /// Builds a `Config` with the given `run`/`plan` templates and every other
    /// option at an inert value.
    fn test_config(run: Option<&str>, plan: Option<&str>) -> Config {
        Config {
            project: "test".to_string(),
            next_id: 1,
            auto_close_parent: true,
            run: run.map(str::to_owned),
            plan: plan.map(str::to_owned),
            max_loops: 10,
            max_concurrent: 4,
            poll_interval: 30,
            extends: vec![],
            rules_file: None,
            file_locking: false,
            on_close: None,
            on_fail: None,
            post_plan: None,
            verify_timeout: None,
            review: None,
            user: None,
            user_email: None,
            auto_commit: false,
            commit_template: None,
            research: None,
            run_model: None,
            plan_model: None,
            review_model: None,
            research_model: None,
            batch_verify: false,
            memory_reserve_mb: 0,
            notify: None,
            worktree: false,
        }
    }

    #[test]
    fn spawner_starts_empty() {
        let spawner = Spawner::new();
        assert_eq!(spawner.running_count(), 0);
        assert!(spawner.list_running().is_empty());
    }

    #[test]
    fn can_spawn_respects_max_concurrent() {
        let spawner = Spawner::new();
        assert!(spawner.can_spawn(4));
        assert!(spawner.can_spawn(1));
        assert!(!spawner.can_spawn(0));
    }

    #[test]
    fn can_spawn_false_when_full() {
        let mut spawner = Spawner::new();
        let log_path = std::env::temp_dir().join("test-spawner-full.log");
        let child = spawn_logged("sleep", &["60"], &log_path);
        track(&mut spawner, "1", "Test", AgentAction::Implement, child, &log_path);

        assert!(!spawner.can_spawn(1));
        assert!(spawner.can_spawn(2));

        // Clean up directly instead of via `kill_all`. That would also run
        // `mana claim 1 --release` and `finish_agent("1", ..)`, touching a
        // real unit "1" and its registry entry if they exist.
        if let Some(mut agent) = spawner.running.remove("1") {
            let _ = agent.child.kill();
            let _ = agent.child.wait();
        }
        let _ = std::fs::remove_file(&log_path);
    }

    #[test]
    fn log_dir_creates_directory() {
        let dir = log_dir().unwrap();
        assert!(dir.exists());
        assert!(dir.is_dir());
    }

    #[test]
    fn template_substitution_replaces_id() {
        assert_eq!(
            substitute_template("deli spawn {id}", "5.1"),
            "deli spawn 5.1"
        );
        assert_eq!(
            substitute_template(
                "claude -p 'implement unit {id} and run mana close {id}'",
                "42"
            ),
            "claude -p 'implement unit 42 and run mana close 42'"
        );
    }

    #[test]
    fn template_substitution_no_placeholder() {
        assert_eq!(substitute_template("echo hello", "5.1"), "echo hello");
    }

    #[test]
    fn template_substitution_multiple_placeholders() {
        assert_eq!(substitute_template("{id}-{id}-{id}", "3"), "3-3-3");
    }

    #[test]
    fn template_with_model_substitution() {
        assert_eq!(
            substitute_template_with_model(
                "claude --model {model} -p 'implement {id}'",
                "5",
                Some("sonnet")
            ),
            "claude --model sonnet -p 'implement 5'"
        );
    }

    #[test]
    fn template_with_model_none_leaves_placeholder() {
        assert_eq!(
            substitute_template_with_model("claude --model {model} -p 'implement {id}'", "5", None),
            "claude --model {model} -p 'implement 5'"
        );
    }

    #[test]
    fn template_with_model_no_model_placeholder() {
        assert_eq!(
            substitute_template_with_model("echo {id}", "5", Some("opus")),
            "echo 5"
        );
    }

    #[test]
    fn find_latest_log_returns_none_for_unknown() {
        let result = find_latest_log("nonexistent_spawner_test_99999").unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn find_all_logs_empty_for_unknown() {
        let result = find_all_logs("nonexistent_spawner_test_99999").unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn build_log_path_uses_safe_id() {
        let path = build_log_path("5.1").unwrap();
        let filename = path.file_name().unwrap().to_str().unwrap();
        assert!(filename.starts_with("5_1-"), "Got: {}", filename);
        assert!(filename.ends_with(".log"), "Got: {}", filename);
    }

    #[test]
    fn build_log_path_simple_id() {
        let path = build_log_path("42").unwrap();
        let filename = path.file_name().unwrap().to_str().unwrap();
        assert!(filename.starts_with("42-"), "Got: {}", filename);
        assert!(filename.ends_with(".log"), "Got: {}", filename);
    }

    #[test]
    fn check_completed_on_empty_spawner() {
        let mut spawner = Spawner::new();
        let completed = spawner.check_completed();
        assert!(completed.is_empty());
    }

    #[test]
    fn check_completed_detects_finished_process() {
        let mut spawner = Spawner::new();
        let log_path = std::env::temp_dir().join("test-spawner-finished.log");
        let child = spawn_logged("true", &[], &log_path);
        track(
            &mut spawner,
            "test-1",
            "Instant task",
            AgentAction::Implement,
            child,
            &log_path,
        );

        let completed = wait_for_completed(&mut spawner);
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].unit_id, "test-1");
        assert!(completed[0].success);
        assert_eq!(completed[0].exit_code, Some(0));
        assert_eq!(spawner.running_count(), 0);

        let _ = std::fs::remove_file(&log_path);
    }

    #[test]
    fn check_completed_detects_failed_process() {
        let mut spawner = Spawner::new();
        let log_path = std::env::temp_dir().join("test-spawner-failed.log");
        let child = spawn_logged("false", &[], &log_path);
        track(
            &mut spawner,
            "test-2",
            "Failing task",
            AgentAction::Plan,
            child,
            &log_path,
        );

        let completed = wait_for_completed(&mut spawner);
        assert_eq!(completed.len(), 1);
        assert_eq!(completed[0].unit_id, "test-2");
        assert!(!completed[0].success);
        assert_eq!(completed[0].exit_code, Some(1));
        assert_eq!(spawner.running_count(), 0);

        let _ = std::fs::remove_file(&log_path);
    }

    #[test]
    fn kill_all_clears_running() {
        let mut spawner = Spawner::new();
        let log_path = std::env::temp_dir().join("test-spawner-killall.log");
        let child = spawn_logged("sleep", &["60"], &log_path);
        track(
            &mut spawner,
            "test-3",
            "Long task",
            AgentAction::Implement,
            child,
            &log_path,
        );

        assert_eq!(spawner.running_count(), 1);
        spawner.kill_all();
        assert_eq!(spawner.running_count(), 0);

        let _ = std::fs::remove_file(&log_path);
    }

    #[test]
    fn spawn_errors_without_run_template() {
        let mut spawner = Spawner::new();
        let config = test_config(None, None);

        let result = spawner.spawn("1", "Test", AgentAction::Implement, &config, None);
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("No run template"), "Got: {}", msg);
    }

    #[test]
    fn spawn_errors_without_plan_template() {
        let mut spawner = Spawner::new();
        let config = test_config(Some("echo {id}"), None);

        let result = spawner.spawn("1", "Test", AgentAction::Plan, &config, None);
        assert!(result.is_err());
        let msg = result.unwrap_err().to_string();
        assert!(msg.contains("No plan template"), "Got: {}", msg);
    }

    #[test]
    fn default_creates_empty_spawner() {
        let spawner = Spawner::default();
        assert_eq!(spawner.running_count(), 0);
    }

    #[test]
    fn agent_action_display() {
        assert_eq!(AgentAction::Implement.to_string(), "implement");
        assert_eq!(AgentAction::Plan.to_string(), "plan");
    }
}