1use std::collections::HashMap;
9use std::marker::PhantomData;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15use tokio_util::sync::CancellationToken;
16use tracing::{debug, info, warn};
17
18use clawft_platform::Platform;
19
20use crate::capability::AgentCapabilities;
21use crate::error::{KernelError, KernelResult};
22use crate::ipc::KernelIpc;
23use crate::process::{Pid, ProcessEntry, ProcessState, ProcessTable, ResourceUsage};
24
/// Supervision restart strategy, loosely modeled on OTP supervisors.
///
/// Applied by `AgentSupervisor::handle_exit` when a process exits.
#[non_exhaustive]
#[cfg(feature = "os-patterns")]
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum RestartStrategy {
    /// Restart only the failed process (default).
    #[default]
    OneForOne,
    /// Restart the failed process and all of its running siblings.
    OneForAll,
    /// Restart the failed process and every running process with a higher
    /// pid (pid order as a proxy for start order).
    RestForOne,
    /// Never restart.
    /// NOTE(review): this is inverted relative to OTP, where "permanent"
    /// means always restart — confirm the intended semantics.
    Permanent,
    /// Restart only on an abnormal exit (non-zero exit code).
    Transient,
}
47
/// Restart budget: how many restarts are allowed within a sliding window.
#[cfg(feature = "os-patterns")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RestartBudget {
    /// Maximum number of restarts permitted within `within_secs`.
    pub max_restarts: u32,
    /// Length of the budget window, in seconds.
    pub within_secs: u64,
}
60
#[cfg(feature = "os-patterns")]
impl Default for RestartBudget {
    /// Default policy: at most 5 restarts within any 60-second window.
    fn default() -> Self {
        RestartBudget {
            within_secs: 60,
            max_restarts: 5,
        }
    }
}
70
/// Per-process restart bookkeeping for self-healing supervision.
#[cfg(feature = "os-patterns")]
pub struct RestartTracker {
    /// Restarts recorded in the current budget window.
    pub restart_count: u32,
    /// When the current budget window began.
    pub window_start: std::time::Instant,
    /// Time of the most recent recorded restart, if any.
    pub last_restart: Option<std::time::Instant>,
    /// Backoff computed for the most recent restart, in milliseconds.
    pub backoff_ms: u64,
}
83
#[cfg(feature = "os-patterns")]
impl RestartTracker {
    /// Create a tracker with a fresh window, zero restarts, and no backoff.
    pub fn new() -> Self {
        Self {
            window_start: std::time::Instant::now(),
            restart_count: 0,
            backoff_ms: 0,
            last_restart: None,
        }
    }

    /// Exponential backoff for the current restart count:
    /// `100ms * 2^(restart_count - 1)`, shift capped at 20, result capped
    /// at 30 seconds.
    pub fn next_backoff_ms(&self) -> u64 {
        let shift = self.restart_count.saturating_sub(1).min(20);
        100u64.saturating_mul(1u64 << shift).min(30_000)
    }

    /// Reset the counter when the budget window has elapsed.
    pub fn check_window(&mut self, budget: &RestartBudget) {
        let now = std::time::Instant::now();
        let elapsed_secs = now.duration_since(self.window_start).as_secs();
        if elapsed_secs > budget.within_secs {
            self.restart_count = 0;
            self.window_start = now;
        }
    }

    /// Record one restart attempt against the budget.
    ///
    /// Returns `true` while the attempt is still within budget, `false`
    /// once the budget is exceeded. Also refreshes `backoff_ms` and
    /// `last_restart`.
    pub fn record_restart(&mut self, budget: &RestartBudget) -> bool {
        self.check_window(budget);
        self.restart_count += 1;
        self.backoff_ms = self.next_backoff_ms();
        self.last_restart = Some(std::time::Instant::now());
        self.restart_count <= budget.max_restarts
    }

    /// `true` when the budget is used up within the current window.
    pub fn is_exhausted(&self, budget: &RestartBudget) -> bool {
        let elapsed_secs = self.window_start.elapsed().as_secs();
        elapsed_secs <= budget.within_secs && self.restart_count >= budget.max_restarts
    }

    /// Restarts still available in the current window (full budget if the
    /// window has already elapsed).
    pub fn remaining(&self, budget: &RestartBudget) -> u32 {
        if self.window_start.elapsed().as_secs() > budget.within_secs {
            budget.max_restarts
        } else {
            budget.max_restarts.saturating_sub(self.restart_count)
        }
    }

    /// Whether the strategy permits a restart for the given exit code.
    ///
    /// `Permanent` never restarts, `Transient` restarts only on non-zero
    /// exit codes, every other strategy always restarts.
    pub fn should_restart(strategy: &RestartStrategy, exit_code: i32) -> bool {
        if matches!(strategy, RestartStrategy::Permanent) {
            false
        } else if matches!(strategy, RestartStrategy::Transient) {
            exit_code != 0
        } else {
            true
        }
    }
}
160
#[cfg(feature = "os-patterns")]
impl Default for RestartTracker {
    /// Same as [`RestartTracker::new`].
    fn default() -> Self {
        RestartTracker::new()
    }
}
167
/// Result of comparing one resource's usage against its limit.
#[non_exhaustive]
#[cfg(feature = "os-patterns")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResourceCheckResult {
    /// Usage is below the warning threshold.
    // NOTE(review): `Ok` is never produced by `check_resource_usage` in
    // this file — confirm it is constructed elsewhere.
    Ok,
    /// Usage is at or above 80% of the limit but below the limit.
    Warning {
        /// Resource name ("memory", "cpu_time", "messages", "tool_calls").
        resource: String,
        /// Current usage value.
        current: u64,
        /// Configured limit.
        limit: u64,
    },
    /// Usage is at or above the limit.
    Exceeded {
        /// Resource name ("memory", "cpu_time", "messages", "tool_calls").
        resource: String,
        /// Current usage value.
        current: u64,
        /// Configured limit.
        limit: u64,
    },
}
190
#[cfg(feature = "os-patterns")]
/// Compare current resource usage against the configured limits.
///
/// A limit of 0 means "unlimited" and is skipped. Usage at or above 80% of
/// a limit yields a [`ResourceCheckResult::Warning`]; usage at or above
/// the limit yields [`ResourceCheckResult::Exceeded`]. Checked resources:
/// memory, cpu_time, messages, tool_calls.
pub fn check_resource_usage(
    usage: &ResourceUsage,
    limits: &crate::capability::ResourceLimits,
) -> Vec<ResourceCheckResult> {
    let checks = [
        ("memory", usage.memory_bytes, limits.max_memory_bytes),
        ("cpu_time", usage.cpu_time_ms, limits.max_cpu_time_ms),
        ("messages", usage.messages_sent, limits.max_messages),
        ("tool_calls", usage.tool_calls, limits.max_tool_calls),
    ];

    checks
        .into_iter()
        .filter(|&(_, _, limit)| limit != 0)
        .filter_map(|(name, current, limit)| {
            let ratio = current as f64 / limit as f64;
            if ratio >= 1.0 {
                Some(ResourceCheckResult::Exceeded {
                    resource: name.to_owned(),
                    current,
                    limit,
                })
            } else if ratio >= 0.8 {
                Some(ResourceCheckResult::Warning {
                    resource: name.to_owned(),
                    current,
                    limit,
                })
            } else {
                None
            }
        })
        .collect()
}
228
/// Execution backend for a spawned agent.
///
/// Only [`SpawnBackend::Native`] is accepted by `AgentSupervisor::spawn`;
/// the other variants are declared ahead of their runtimes and are rejected
/// with `KernelError::BackendNotAvailable`.
#[non_exhaustive]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SpawnBackend {
    /// In-process task on the host runtime (the only implemented backend).
    Native,
    /// WASM sandbox (not yet available).
    Wasm {
        /// Path to the WASM module to load.
        module: PathBuf,
    },
    /// Container runtime (not yet available).
    Container {
        /// Container image reference.
        image: String,
    },
    /// Trusted execution environment (not yet available).
    Tee {
        /// Enclave configuration.
        enclave: EnclaveConfig,
    },
    /// Delegation to a remote cluster node (not yet available).
    Remote {
        /// Identifier of the target node.
        node_id: String,
    },
}
261
/// Configuration for a [`SpawnBackend::Tee`] enclave.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnclaveConfig {
    /// Enclave technology identifier.
    // NOTE(review): valid values are not defined in this file — confirm
    // against the TEE runtime once it exists.
    pub enclave_type: String,
}
271
/// Request to spawn a new supervised agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnRequest {
    /// Logical identifier of the agent to run.
    pub agent_id: String,

    /// Capabilities to grant; `None` uses the supervisor's defaults.
    #[serde(default)]
    pub capabilities: Option<AgentCapabilities>,

    /// Pid of the spawning process, if any.
    #[serde(default)]
    pub parent_pid: Option<Pid>,

    /// Environment variables for the agent.
    // NOTE(review): `env` is carried on the request but not consumed by
    // `spawn` in this file — confirm where it takes effect.
    #[serde(default)]
    pub env: HashMap<String, String>,

    /// Execution backend; `None` is treated as native.
    #[serde(default)]
    pub backend: Option<SpawnBackend>,
}
298
/// Outcome of a successful spawn.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnResult {
    /// Pid assigned by the process table.
    pub pid: Pid,

    /// Agent id echoed from the request.
    pub agent_id: String,
}
308
/// Supervisor owning the lifecycle of agent processes: spawn, run, stop,
/// restart, and (with `os-patterns`) budgeted self-healing restarts.
pub struct AgentSupervisor<P: Platform> {
    /// Shared table of all known processes (the kernel occupies pid 0).
    process_table: Arc<ProcessTable>,
    /// Kernel IPC endpoint; exposed via [`Self::ipc`].
    kernel_ipc: Arc<KernelIpc>,
    /// Capabilities applied when a spawn request carries none.
    default_capabilities: AgentCapabilities,
    /// Join handles of currently running agent tasks, keyed by pid.
    running_agents: Arc<DashMap<Pid, tokio::task::JoinHandle<()>>>,
    /// Optional agent-to-agent message router.
    a2a_router: Option<Arc<crate::a2a::A2ARouter>>,
    /// Optional cron scheduling service.
    cron_service: Option<Arc<crate::cron::CronService>>,
    /// Optional resource-tree integration (registration + scoring).
    #[cfg(feature = "exochain")]
    tree_manager: Option<Arc<crate::tree_manager::TreeManager>>,
    /// Optional audit chain for lifecycle events.
    #[cfg(feature = "exochain")]
    chain_manager: Option<Arc<crate::chain::ChainManager>>,
    /// Registry of process monitors and links, notified on exit.
    #[cfg(feature = "os-patterns")]
    monitor_registry: Arc<crate::monitor::MonitorRegistry>,
    /// Strategy applied by [`Self::handle_exit`].
    #[cfg(feature = "os-patterns")]
    restart_strategy: RestartStrategy,
    /// Budget limiting restarts per pid per window.
    #[cfg(feature = "os-patterns")]
    restart_budget: RestartBudget,
    /// Per-pid restart bookkeeping.
    #[cfg(feature = "os-patterns")]
    restart_trackers: Arc<DashMap<Pid, RestartTracker>>,
    /// Ties the supervisor to a platform without storing one.
    _platform: PhantomData<P>,
}
351
352impl<P: Platform> AgentSupervisor<P> {
    /// Create a supervisor over the given process table and IPC endpoint.
    ///
    /// Optional subsystems (A2A router, cron, exochain managers) start
    /// unset and the restart policy starts at its defaults; attach extras
    /// with the `with_*` builder methods.
    pub fn new(
        process_table: Arc<ProcessTable>,
        kernel_ipc: Arc<KernelIpc>,
        default_capabilities: AgentCapabilities,
    ) -> Self {
        Self {
            process_table,
            kernel_ipc,
            default_capabilities,
            running_agents: Arc::new(DashMap::new()),
            a2a_router: None,
            cron_service: None,
            #[cfg(feature = "exochain")]
            tree_manager: None,
            #[cfg(feature = "exochain")]
            chain_manager: None,
            #[cfg(feature = "os-patterns")]
            monitor_registry: Arc::new(crate::monitor::MonitorRegistry::new()),
            #[cfg(feature = "os-patterns")]
            restart_strategy: RestartStrategy::default(),
            #[cfg(feature = "os-patterns")]
            restart_budget: RestartBudget::default(),
            #[cfg(feature = "os-patterns")]
            restart_trackers: Arc::new(DashMap::new()),
            _platform: PhantomData,
        }
    }
388
389 pub fn with_a2a_router(
394 mut self,
395 a2a_router: Arc<crate::a2a::A2ARouter>,
396 cron_service: Arc<crate::cron::CronService>,
397 ) -> Self {
398 self.a2a_router = Some(a2a_router);
399 self.cron_service = Some(cron_service);
400 self
401 }
402
    /// The configured A2A router, if any.
    pub fn a2a_router(&self) -> Option<&Arc<crate::a2a::A2ARouter>> {
        self.a2a_router.as_ref()
    }

    /// The configured cron service, if any.
    pub fn cron_service(&self) -> Option<&Arc<crate::cron::CronService>> {
        self.cron_service.as_ref()
    }
412
413 #[cfg(feature = "exochain")]
418 pub fn with_exochain(
419 mut self,
420 tree_manager: Option<Arc<crate::tree_manager::TreeManager>>,
421 chain_manager: Option<Arc<crate::chain::ChainManager>>,
422 ) -> Self {
423 self.tree_manager = tree_manager;
424 self.chain_manager = chain_manager;
425 self
426 }
427
428 #[cfg(feature = "os-patterns")]
430 pub fn with_restart_config(
431 mut self,
432 strategy: RestartStrategy,
433 budget: RestartBudget,
434 ) -> Self {
435 self.restart_strategy = strategy;
436 self.restart_budget = budget;
437 self
438 }
439
    #[cfg(feature = "os-patterns")]
    /// Registry of process monitors and links.
    pub fn monitor_registry(&self) -> &Arc<crate::monitor::MonitorRegistry> {
        &self.monitor_registry
    }

    #[cfg(feature = "os-patterns")]
    /// The active restart strategy.
    pub fn restart_strategy(&self) -> &RestartStrategy {
        &self.restart_strategy
    }

    #[cfg(feature = "os-patterns")]
    /// The active restart budget.
    pub fn restart_budget(&self) -> &RestartBudget {
        &self.restart_budget
    }
457
    #[cfg(feature = "os-patterns")]
    /// React to the exit of `pid`: notify monitors/links, then apply the
    /// configured restart strategy within the restart budget.
    ///
    /// Returns `(old_pid, SpawnResult)` for every process restarted as a
    /// consequence of this exit; empty when the strategy vetoes restarting
    /// or the budget is exhausted.
    pub fn handle_exit(
        &self,
        pid: Pid,
        exit_code: i32,
    ) -> Vec<(Pid, SpawnResult)> {
        use crate::monitor::ExitReason;

        // Exit code 0 is a normal exit; anything else counts as a crash.
        let reason = if exit_code == 0 {
            ExitReason::Normal
        } else {
            ExitReason::Crash(format!("exit code {exit_code}"))
        };

        // Notify monitors/links of the exit. The returned signal batches
        // are discarded here.
        // NOTE(review): confirm signal delivery happens inside the registry.
        let (_link_signals, _down_signals) =
            self.monitor_registry.process_exited(pid, &reason);

        let mut restarts = Vec::new();

        // Strategy veto: Permanent never restarts; Transient restarts only
        // abnormal (non-zero) exits.
        if !RestartTracker::should_restart(&self.restart_strategy, exit_code) {
            debug!(pid, exit_code, strategy = ?self.restart_strategy, "not restarting per strategy");
            return restarts;
        }

        // Charge this restart against the per-pid budget window.
        let mut tracker = self
            .restart_trackers
            .entry(pid)
            .or_default();

        let within_budget = tracker.record_restart(&self.restart_budget);
        let backoff_ms = tracker.backoff_ms;
        // Drop the DashMap entry guard before re-entering self below.
        drop(tracker);

        if !within_budget {
            warn!(
                pid,
                "restart budget exhausted for pid, escalating"
            );
            return restarts;
        }

        info!(
            pid,
            backoff_ms,
            strategy = ?self.restart_strategy,
            "scheduling restart after backoff"
        );

        // Audit the self-heal decision (best effort).
        #[cfg(feature = "exochain")]
        if let Some(ref cm) = self.chain_manager {
            cm.append(
                "supervisor",
                "agent.self_heal_restart",
                Some(serde_json::json!({
                    "pid": pid,
                    "exit_code": exit_code,
                    "strategy": format!("{:?}", self.restart_strategy),
                    "backoff_ms": backoff_ms,
                })),
            );
        }

        // NOTE(review): backoff_ms is computed and logged but the restarts
        // below are issued immediately — confirm the delay is enforced by
        // the caller.
        match self.restart_strategy {
            RestartStrategy::OneForOne | RestartStrategy::Transient => {
                if let Ok(result) = self.restart(pid) {
                    restarts.push((pid, result));
                }
            }
            RestartStrategy::OneForAll => {
                // Restart every other running agent first, then the failed
                // one.
                let siblings: Vec<ProcessEntry> = self
                    .process_table
                    .list()
                    .into_iter()
                    .filter(|e| e.pid != 0 && e.pid != pid && e.state == ProcessState::Running)
                    .collect();

                for sibling in &siblings {
                    if let Ok(result) = self.restart(sibling.pid) {
                        restarts.push((sibling.pid, result));
                    }
                }
                if let Ok(result) = self.restart(pid) {
                    restarts.push((pid, result));
                }
            }
            RestartStrategy::RestForOne => {
                // Pid order is used as a proxy for start order: restart all
                // running agents with a higher pid, then the failed one.
                let mut all: Vec<ProcessEntry> = self
                    .process_table
                    .list()
                    .into_iter()
                    .filter(|e| e.pid != 0 && e.state == ProcessState::Running)
                    .collect();
                all.sort_by_key(|e| e.pid);

                let after: Vec<Pid> = all
                    .iter()
                    .filter(|e| e.pid > pid)
                    .map(|e| e.pid)
                    .collect();

                for sibling_pid in &after {
                    if let Ok(result) = self.restart(*sibling_pid) {
                        restarts.push((*sibling_pid, result));
                    }
                }
                if let Ok(result) = self.restart(pid) {
                    restarts.push((pid, result));
                }
            }
            RestartStrategy::Permanent => {
                // Unreachable in practice: should_restart() already vetoed
                // Permanent above.
            }
        }

        restarts
    }
590
591 pub fn spawn(&self, request: SpawnRequest) -> KernelResult<SpawnResult> {
603 match &request.backend {
605 None | Some(SpawnBackend::Native) => { }
606 Some(SpawnBackend::Wasm { .. }) => {
607 return Err(KernelError::BackendNotAvailable {
608 backend: "wasm".into(),
609 reason: "WASM sandbox requires K3 (Wasmtime integration)".into(),
610 });
611 }
612 Some(SpawnBackend::Container { .. }) => {
613 return Err(KernelError::BackendNotAvailable {
614 backend: "container".into(),
615 reason: "container runtime requires K4 (Docker/Podman integration)".into(),
616 });
617 }
618 Some(SpawnBackend::Tee { .. }) => {
619 return Err(KernelError::BackendNotAvailable {
620 backend: "tee".into(),
621 reason: "TEE runtime requires K6+ and hardware support".into(),
622 });
623 }
624 Some(SpawnBackend::Remote { .. }) => {
625 return Err(KernelError::BackendNotAvailable {
626 backend: "remote".into(),
627 reason: "remote delegation requires K6 (cluster networking)".into(),
628 });
629 }
630 }
631
632 let caps = request
633 .capabilities
634 .unwrap_or_else(|| self.default_capabilities.clone());
635
636 info!(
637 agent_id = %request.agent_id,
638 parent_pid = ?request.parent_pid,
639 "spawning supervised agent"
640 );
641
642 let entry = ProcessEntry {
643 pid: 0, agent_id: request.agent_id.clone(),
645 state: ProcessState::Starting,
646 capabilities: caps,
647 resource_usage: ResourceUsage::default(),
648 cancel_token: CancellationToken::new(),
649 parent_pid: request.parent_pid,
650 };
651
652 let pid = self.process_table.insert(entry)?;
653
654 debug!(pid, agent_id = %request.agent_id, "agent spawned");
655
656 Ok(SpawnResult {
657 pid,
658 agent_id: request.agent_id,
659 })
660 }
661
    /// Spawn an agent and drive `work` to completion on a spawned task.
    ///
    /// `work` is called once with the new [`Pid`] and the process's
    /// [`CancellationToken`] and must return a future resolving to the
    /// agent's exit code. The process is marked `Running` before the task
    /// starts and `Exited(code)` when the future completes; the join handle
    /// is tracked in `running_agents` until the task removes itself.
    ///
    /// # Errors
    /// Propagates spawn failures (backend unavailable, table full).
    pub fn spawn_and_run<F, Fut>(
        &self,
        request: SpawnRequest,
        work: F,
    ) -> KernelResult<SpawnResult>
    where
        F: FnOnce(Pid, CancellationToken) -> Fut,
        Fut: std::future::Future<Output = i32> + Send + 'static,
    {
        // Snapshot before `request` is moved into `spawn`.
        #[cfg(feature = "exochain")]
        let parent_pid = request.parent_pid;

        let result = self.spawn(request)?;
        let pid = result.pid;

        let entry = self
            .process_table
            .get(pid)
            .ok_or(KernelError::ProcessNotFound { pid })?;
        let cancel_token = entry.cancel_token.clone();

        // Best-effort resource-tree registration; failure is logged only.
        #[cfg(feature = "exochain")]
        if let Some(ref tm) = self.tree_manager
            && let Err(e) = tm.register_agent(&result.agent_id, pid, &entry.capabilities)
        {
            warn!(error = %e, pid, "failed to register agent in resource tree");
        }

        let _ = self
            .process_table
            .update_state(pid, ProcessState::Running);

        // Audit the spawn on the event chain (best effort).
        #[cfg(feature = "exochain")]
        if let Some(ref cm) = self.chain_manager {
            cm.append(
                "supervisor",
                "agent.spawn",
                Some(serde_json::json!({
                    "agent_id": result.agent_id,
                    "pid": pid,
                    "parent_pid": parent_pid,
                })),
            );
        }

        // Owned clones that move into the spawned task.
        let process_table = Arc::clone(&self.process_table);
        let running_agents = Arc::clone(&self.running_agents);
        let agent_id = result.agent_id.clone();
        #[cfg(feature = "exochain")]
        let tree_manager = self.tree_manager.clone();
        #[cfg(feature = "exochain")]
        let chain_manager = self.chain_manager.clone();

        // The future is built eagerly, so only `Fut` (not `F`) needs Send.
        let future = work(pid, cancel_token);
        let handle = tokio::spawn(async move {
            let exit_code = future.await;

            // Record the exit; watchdog_sweep covers tasks that never
            // reach this line.
            let _ = process_table.update_state(pid, ProcessState::Exited(exit_code));

            // Blend an outcome-based scoring observation into the agent's
            // resource-tree node.
            // NOTE(review): the scoring constants look heuristic — confirm
            // their intended calibration.
            #[cfg(feature = "exochain")]
            if let Some(ref tm) = tree_manager {
                let agent_path = format!("/kernel/agents/{agent_id}");
                let rid = exo_resource_tree::ResourceId::new(&agent_path);

                let success = exit_code == 0;
                let observation = exo_resource_tree::NodeScoring {
                    trust: if success { 0.8 } else { 0.2 },
                    performance: if success { 0.7 } else { 0.3 },
                    difficulty: 0.5,
                    reward: if success { 0.6 } else { 0.1 },
                    reliability: if success { 0.9 } else { 0.1 },
                    velocity: 0.5,
                };
                if let Err(e) = tm.blend_scoring(&rid, &observation, 0.3) {
                    debug!(error = %e, pid, "scoring blend skipped (node may be unregistered)");
                }
            }

            // Remove the agent from the resource tree (best effort).
            #[cfg(feature = "exochain")]
            if let Some(ref tm) = tree_manager
                && let Err(e) = tm.unregister_agent(&agent_id, pid, exit_code)
            {
                tracing::warn!(error = %e, pid, "failed to unregister agent from tree");
            }

            // Audit the exit (best effort).
            #[cfg(feature = "exochain")]
            if let Some(ref cm) = chain_manager {
                cm.append(
                    "supervisor",
                    "agent.exit",
                    Some(serde_json::json!({
                        "agent_id": agent_id,
                        "pid": pid,
                        "exit_code": exit_code,
                    })),
                );
            }

            // Self-removal so running_task_count reflects live tasks.
            running_agents.remove(&pid);

            info!(pid, exit_code, agent_id = %agent_id, "agent task completed");
        });

        // NOTE(review): if the task finishes before this insert, its
        // self-removal above runs first and a finished handle is inserted
        // here; watchdog_sweep later reaps it — confirm that's acceptable.
        self.running_agents.insert(pid, handle);

        info!(pid, agent_id = %result.agent_id, "agent spawned and running");

        Ok(result)
    }
801
    /// Stop a running agent.
    ///
    /// Graceful stop marks the process `Stopping` and cancels its token,
    /// trusting the task to record its own exit. Force stop cancels the
    /// token, marks the process `Exited(-1)` and aborts the task outright.
    /// Stopping an already-exited process is a warning-logged no-op.
    ///
    /// # Errors
    /// Returns [`KernelError::ProcessNotFound`] for an unknown pid.
    pub fn stop(&self, pid: Pid, graceful: bool) -> KernelResult<()> {
        let entry = self
            .process_table
            .get(pid)
            .ok_or(KernelError::ProcessNotFound { pid })?;

        // Idempotent: stopping a dead process succeeds without effect.
        if matches!(entry.state, ProcessState::Exited(_)) {
            warn!(pid, "stop called on already-exited process");
            return Ok(());
        }

        if graceful {
            info!(pid, "gracefully stopping agent");
            // State stays Stopping until the task observes the token and
            // records its own exit code.
            let _ = self.process_table.update_state(pid, ProcessState::Stopping);
            entry.cancel_token.cancel();
        } else {
            info!(pid, "force stopping agent");
            entry.cancel_token.cancel();
            let _ = self
                .process_table
                .update_state(pid, ProcessState::Exited(-1));

            // Kill the task outright if we still track its handle.
            if let Some((_, handle)) = self.running_agents.remove(&pid) {
                handle.abort();
            }

            // Audit and tree cleanup for the force stop (best effort).
            #[cfg(feature = "exochain")]
            {
                if let Some(ref tm) = self.tree_manager {
                    let _ = tm.unregister_agent(&entry.agent_id, pid, -1);
                }
                if let Some(ref cm) = self.chain_manager {
                    cm.append(
                        "supervisor",
                        "agent.force_stop",
                        Some(serde_json::json!({
                            "agent_id": entry.agent_id,
                            "pid": pid,
                        })),
                    );
                }
            }
        }

        Ok(())
    }
868
    /// Restart an agent: gracefully stop the old pid, then spawn a fresh
    /// process with the same agent id and capabilities.
    ///
    /// The new entry records the old pid as its `parent_pid` so lineage is
    /// traceable across restarts. The request `env` is not preserved.
    ///
    /// # Errors
    /// Fails when `pid` is unknown or the new spawn cannot be inserted.
    pub fn restart(&self, pid: Pid) -> KernelResult<SpawnResult> {
        let old_entry = self
            .process_table
            .get(pid)
            .ok_or(KernelError::ProcessNotFound { pid })?;

        info!(pid, agent_id = %old_entry.agent_id, "restarting agent");

        self.stop(pid, true)?;

        // `old_entry.state` is a snapshot taken before the stop above, so a
        // previously-live process is force-marked Exited(0) here.
        // NOTE(review): consider re-reading the entry for the post-stop
        // state instead of using the stale snapshot.
        if !matches!(old_entry.state, ProcessState::Exited(_)) {
            let _ = self
                .process_table
                .update_state(pid, ProcessState::Exited(0));
        }

        // Re-spawn with the old capabilities, parented to the old pid.
        let request = SpawnRequest {
            agent_id: old_entry.agent_id.clone(),
            capabilities: Some(old_entry.capabilities.clone()),
            parent_pid: Some(pid),
            env: HashMap::new(),
            backend: None,
        };

        let result = self.spawn(request)?;

        // Audit the restart (best effort).
        #[cfg(feature = "exochain")]
        if let Some(ref cm) = self.chain_manager {
            cm.append(
                "supervisor",
                "agent.restart",
                Some(serde_json::json!({
                    "agent_id": result.agent_id,
                    "old_pid": pid,
                    "new_pid": result.pid,
                })),
            );
        }

        Ok(result)
    }
928
    /// Look up the process entry for `pid`.
    ///
    /// # Errors
    /// Returns [`KernelError::ProcessNotFound`] when no such process exists.
    pub fn inspect(&self, pid: Pid) -> KernelResult<ProcessEntry> {
        self.process_table
            .get(pid)
            .ok_or(KernelError::ProcessNotFound { pid })
    }
943
944 pub fn list_by_state(&self, state: ProcessState) -> Vec<ProcessEntry> {
946 self.process_table
947 .list()
948 .into_iter()
949 .filter(|e| e.state == state)
950 .collect()
951 }
952
953 pub fn list_agents(&self) -> Vec<ProcessEntry> {
955 self.process_table
956 .list()
957 .into_iter()
958 .filter(|e| e.pid != 0)
959 .collect()
960 }
961
    /// Shared process table.
    pub fn process_table(&self) -> &Arc<ProcessTable> {
        &self.process_table
    }

    /// Kernel IPC endpoint.
    pub fn ipc(&self) -> &Arc<KernelIpc> {
        &self.kernel_ipc
    }

    /// Capabilities granted when a spawn request carries none.
    pub fn default_capabilities(&self) -> &AgentCapabilities {
        &self.default_capabilities
    }
976
977 pub fn running_count(&self) -> usize {
979 self.process_table
980 .list()
981 .iter()
982 .filter(|e| e.pid != 0 && e.state == ProcessState::Running)
983 .count()
984 }
985
986 pub fn running_task_count(&self) -> usize {
988 self.running_agents.len()
989 }
990
991 pub fn abort_all(&self) {
993 for entry in self.running_agents.iter() {
994 entry.value().abort();
995 }
996 self.running_agents.clear();
997 }
998
    /// Reap finished tasks whose process entries are still marked Running.
    ///
    /// Awaits each finished join handle (already done, so this resolves at
    /// once) and records a synthetic exit code: -3 for a panicked task,
    /// -2 otherwise (task ended without recording its own exit, or was
    /// cancelled). Returns the reaped `(pid, exit_code)` pairs.
    pub async fn watchdog_sweep(&self) -> Vec<(Pid, i32)> {
        let mut reaped = Vec::new();

        // Snapshot first so we never await while iterating the DashMap.
        let finished_pids: Vec<Pid> = self
            .running_agents
            .iter()
            .filter(|entry| entry.value().is_finished())
            .map(|entry| *entry.key())
            .collect();

        for pid in finished_pids {
            if let Some((_, handle)) = self.running_agents.remove(&pid) {
                let exit_code = match handle.await {
                    // Completed but never recorded its own exit.
                    Ok(()) => -2,
                    // Task panicked.
                    Err(e) if e.is_panic() => -3,
                    // Cancelled or other join failure.
                    Err(_) => -2,
                };

                // Only entries still Running are stale; anything else has
                // already recorded its exit.
                if let Some(entry) = self.process_table.get(pid)
                    && entry.state == ProcessState::Running
                {
                    let _ = self
                        .process_table
                        .update_state(pid, ProcessState::Exited(exit_code));

                    // Audit the reap (best effort).
                    #[cfg(feature = "exochain")]
                    if let Some(ref cm) = self.chain_manager {
                        cm.append(
                            "watchdog",
                            "agent.watchdog_reap",
                            Some(serde_json::json!({
                                "pid": pid,
                                "exit_code": exit_code,
                                "agent_id": entry.agent_id,
                            })),
                        );
                    }

                    reaped.push((pid, exit_code));
                    info!(pid, exit_code, agent_id = %entry.agent_id, "watchdog reaped stale agent");
                }
            }
        }

        reaped
    }
1058
    /// Cancel every agent and wait up to `timeout` for their tasks.
    ///
    /// All cancellation tokens fire first; the tracked join handles are
    /// then awaited together under the timeout. Each result pair carries
    /// the recorded table exit code on a clean join (0 if none), -3 on a
    /// panic, or -1 on other join errors and for timeout-forced aborts.
    pub async fn shutdown_all(&self, timeout: std::time::Duration) -> Vec<(Pid, i32)> {
        // Signal every non-kernel process to stop.
        for entry in self.process_table.list() {
            if entry.pid == 0 {
                continue;
            }
            entry.cancel_token.cancel();
        }

        // Drain the handle map so nothing else can double-await these.
        let handles: Vec<(Pid, tokio::task::JoinHandle<()>)> = {
            let pids: Vec<Pid> = self.running_agents.iter().map(|e| *e.key()).collect();
            let mut collected = Vec::with_capacity(pids.len());
            for pid in pids {
                if let Some((pid, handle)) = self.running_agents.remove(&pid) {
                    collected.push((pid, handle));
                }
            }
            collected
        };

        if handles.is_empty() {
            return Vec::new();
        }

        let process_table = &self.process_table;

        let mut results = Vec::with_capacity(handles.len());

        match tokio::time::timeout(
            timeout,
            futures::future::join_all(
                handles
                    .into_iter()
                    .map(|(pid, handle)| async move { (pid, handle.await) }),
            ),
        )
        .await
        {
            Ok(join_results) => {
                for (pid, join_result) in join_results {
                    let exit_code = match join_result {
                        // Clean join: prefer the exit code the task recorded
                        // in the table, defaulting to 0.
                        Ok(()) => process_table
                            .get(pid)
                            .and_then(|e| match e.state {
                                ProcessState::Exited(code) => Some(code),
                                _ => None,
                            })
                            .unwrap_or(0),
                        Err(e) if e.is_panic() => -3,
                        Err(_) => -1,
                    };
                    results.push((pid, exit_code));
                }
            }
            Err(_elapsed) => {
                info!("shutdown timeout reached, aborting remaining agents");
                // Anything still in the map (e.g. re-inserted by a task)
                // gets aborted outright.
                // NOTE(review): the handles moved into the timed-out
                // join_all are dropped, which detaches rather than aborts
                // those tasks — confirm that's intended.
                let remaining: Vec<Pid> =
                    self.running_agents.iter().map(|e| *e.key()).collect();
                for pid in remaining {
                    if let Some((pid, handle)) = self.running_agents.remove(&pid) {
                        handle.abort();
                        let _ = process_table.update_state(pid, ProcessState::Exited(-1));
                        results.push((pid, -1));
                    }
                }

                // Fallback: force-mark any table entries still alive so the
                // caller always gets a record per surviving process.
                if results.is_empty() {
                    for entry in process_table.list() {
                        if entry.pid != 0 && !matches!(entry.state, ProcessState::Exited(_)) {
                            let _ = process_table.update_state(entry.pid, ProcessState::Exited(-1));
                            results.push((entry.pid, -1));
                        }
                    }
                }
            }
        }

        results
    }
1157
    #[cfg(feature = "exochain")]
    /// The resource tree manager, if configured.
    pub fn tree_manager(&self) -> Option<&Arc<crate::tree_manager::TreeManager>> {
        self.tree_manager.as_ref()
    }

    #[cfg(feature = "exochain")]
    /// The event chain manager, if configured.
    pub fn chain_manager(&self) -> Option<&Arc<crate::chain::ChainManager>> {
        self.chain_manager.as_ref()
    }
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173 use super::*;
1174 use clawft_core::bus::MessageBus;
1175
    // Supervisor over a fresh 16-slot process table with default caps.
    fn make_supervisor() -> AgentSupervisor<clawft_platform::NativePlatform> {
        let process_table = Arc::new(ProcessTable::new(16));
        let bus = Arc::new(MessageBus::new());
        let ipc = Arc::new(KernelIpc::new(bus));
        AgentSupervisor::new(process_table, ipc, AgentCapabilities::default())
    }

    // Minimal spawn request: default caps, no parent, native backend.
    fn simple_request(agent_id: &str) -> SpawnRequest {
        SpawnRequest {
            agent_id: agent_id.to_owned(),
            capabilities: None,
            parent_pid: None,
            env: HashMap::new(),
            backend: None,
        }
    }
1192
    // spawn() assigns a fresh pid and registers the entry in Starting state.
    #[test]
    fn spawn_creates_process_entry() {
        let sup = make_supervisor();
        let result = sup.spawn(simple_request("agent-1")).unwrap();

        assert!(result.pid > 0);
        assert_eq!(result.agent_id, "agent-1");

        let entry = sup.inspect(result.pid).unwrap();
        assert_eq!(entry.agent_id, "agent-1");
        assert_eq!(entry.state, ProcessState::Starting);
    }

    // A request without capabilities inherits the supervisor defaults.
    #[test]
    fn spawn_uses_default_capabilities() {
        let sup = make_supervisor();
        let result = sup.spawn(simple_request("agent-1")).unwrap();

        let entry = sup.inspect(result.pid).unwrap();
        assert!(entry.capabilities.can_spawn);
        assert!(entry.capabilities.can_ipc);
        assert!(entry.capabilities.can_exec_tools);
    }

    // Explicit capabilities on the request override the defaults.
    #[test]
    fn spawn_uses_custom_capabilities() {
        let sup = make_supervisor();
        let caps = AgentCapabilities {
            can_spawn: false,
            can_ipc: false,
            can_exec_tools: true,
            can_network: true,
            ..Default::default()
        };

        let request = SpawnRequest {
            agent_id: "restricted".to_owned(),
            capabilities: Some(caps.clone()),
            parent_pid: None,
            env: HashMap::new(),
            backend: None,
        };

        let result = sup.spawn(request).unwrap();
        let entry = sup.inspect(result.pid).unwrap();
        assert!(!entry.capabilities.can_spawn);
        assert!(!entry.capabilities.can_ipc);
        assert!(entry.capabilities.can_network);
    }

    // parent_pid on the request is recorded on the child's entry.
    #[test]
    fn spawn_with_parent_pid() {
        let sup = make_supervisor();
        let parent = sup.spawn(simple_request("parent")).unwrap();

        let request = SpawnRequest {
            agent_id: "child".to_owned(),
            capabilities: None,
            parent_pid: Some(parent.pid),
            env: HashMap::new(),
            backend: None,
        };

        let result = sup.spawn(request).unwrap();
        let entry = sup.inspect(result.pid).unwrap();
        assert_eq!(entry.parent_pid, Some(parent.pid));
    }

    // Spawning past the process-table capacity is rejected.
    #[test]
    fn spawn_fails_when_table_full() {
        let process_table = Arc::new(ProcessTable::new(2));
        let bus = Arc::new(MessageBus::new());
        let ipc = Arc::new(KernelIpc::new(bus));
        let sup: AgentSupervisor<clawft_platform::NativePlatform> =
            AgentSupervisor::new(process_table, ipc, AgentCapabilities::default());

        sup.spawn(simple_request("a1")).unwrap();
        sup.spawn(simple_request("a2")).unwrap();
        let result = sup.spawn(simple_request("a3"));
        assert!(result.is_err());
    }
1274
    // Graceful stop: state becomes Stopping and the token is cancelled.
    #[test]
    fn stop_graceful() {
        let sup = make_supervisor();
        let result = sup.spawn(simple_request("agent-1")).unwrap();

        sup.process_table()
            .update_state(result.pid, ProcessState::Running)
            .unwrap();

        sup.stop(result.pid, true).unwrap();

        let entry = sup.inspect(result.pid).unwrap();
        assert_eq!(entry.state, ProcessState::Stopping);
        assert!(entry.cancel_token.is_cancelled());
    }

    // Force stop: the token is cancelled immediately.
    #[test]
    fn stop_force() {
        let sup = make_supervisor();
        let result = sup.spawn(simple_request("agent-1")).unwrap();

        sup.process_table()
            .update_state(result.pid, ProcessState::Running)
            .unwrap();

        sup.stop(result.pid, false).unwrap();

        let entry = sup.inspect(result.pid).unwrap();
        assert!(entry.cancel_token.is_cancelled());
    }

    // Stopping an already-exited process succeeds without effect.
    #[test]
    fn stop_already_exited_is_idempotent() {
        let sup = make_supervisor();
        let result = sup.spawn(simple_request("agent-1")).unwrap();

        sup.process_table()
            .update_state(result.pid, ProcessState::Exited(0))
            .unwrap();

        sup.stop(result.pid, true).unwrap();
    }

    // Stopping an unknown pid yields an error.
    #[test]
    fn stop_nonexistent_pid_fails() {
        let sup = make_supervisor();
        let result = sup.stop(999, true);
        assert!(result.is_err());
    }
1328
    // restart() produces a new pid with the same agent id, parented to the
    // old pid.
    #[test]
    fn restart_creates_new_process() {
        let sup = make_supervisor();
        let original = sup.spawn(simple_request("agent-1")).unwrap();

        sup.process_table()
            .update_state(original.pid, ProcessState::Running)
            .unwrap();

        let restarted = sup.restart(original.pid).unwrap();

        assert_ne!(restarted.pid, original.pid);
        assert_eq!(restarted.agent_id, "agent-1");

        let new_entry = sup.inspect(restarted.pid).unwrap();
        assert_eq!(new_entry.parent_pid, Some(original.pid));
    }

    // restart() carries the old entry's capabilities to the new process.
    #[test]
    fn restart_preserves_capabilities() {
        let sup = make_supervisor();
        let caps = AgentCapabilities {
            can_spawn: false,
            can_network: true,
            ..Default::default()
        };

        let request = SpawnRequest {
            agent_id: "restricted".to_owned(),
            capabilities: Some(caps),
            parent_pid: None,
            env: HashMap::new(),
            backend: None,
        };

        let original = sup.spawn(request).unwrap();
        sup.process_table()
            .update_state(original.pid, ProcessState::Running)
            .unwrap();

        let restarted = sup.restart(original.pid).unwrap();
        let entry = sup.inspect(restarted.pid).unwrap();
        assert!(!entry.capabilities.can_spawn);
        assert!(entry.capabilities.can_network);
    }

    // list_by_state() filters entries by exact state.
    #[test]
    fn list_by_state() {
        let sup = make_supervisor();
        let r1 = sup.spawn(simple_request("a1")).unwrap();
        let r2 = sup.spawn(simple_request("a2")).unwrap();
        sup.spawn(simple_request("a3")).unwrap();

        sup.process_table()
            .update_state(r1.pid, ProcessState::Running)
            .unwrap();
        sup.process_table()
            .update_state(r2.pid, ProcessState::Running)
            .unwrap();

        let running = sup.list_by_state(ProcessState::Running);
        assert_eq!(running.len(), 2);

        let starting = sup.list_by_state(ProcessState::Starting);
        assert_eq!(starting.len(), 1);
    }
1399
    // list_agents() skips the kernel entry at pid 0.
    #[test]
    fn list_agents_excludes_kernel() {
        let sup = make_supervisor();

        let kernel_entry = ProcessEntry {
            pid: 0,
            agent_id: "kernel".to_owned(),
            state: ProcessState::Running,
            capabilities: AgentCapabilities::default(),
            resource_usage: ResourceUsage::default(),
            cancel_token: CancellationToken::new(),
            parent_pid: None,
        };
        sup.process_table().insert_with_pid(kernel_entry).unwrap();

        sup.spawn(simple_request("agent-1")).unwrap();

        let agents = sup.list_agents();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].agent_id, "agent-1");
    }

    // running_count() counts only non-kernel Running processes.
    #[test]
    fn running_count() {
        let sup = make_supervisor();
        let r1 = sup.spawn(simple_request("a1")).unwrap();
        let r2 = sup.spawn(simple_request("a2")).unwrap();
        sup.spawn(simple_request("a3")).unwrap();

        assert_eq!(sup.running_count(), 0);

        sup.process_table()
            .update_state(r1.pid, ProcessState::Running)
            .unwrap();
        assert_eq!(sup.running_count(), 1);

        sup.process_table()
            .update_state(r2.pid, ProcessState::Running)
            .unwrap();
        assert_eq!(sup.running_count(), 2);
    }

    // default_capabilities() exposes the caps passed at construction.
    #[test]
    fn default_capabilities_accessor() {
        let sup = make_supervisor();
        let caps = sup.default_capabilities();
        assert!(caps.can_spawn);
        assert!(caps.can_ipc);
        assert!(caps.can_exec_tools);
    }
1452
    // SpawnRequest survives a JSON round trip with all fields intact.
    #[test]
    fn spawn_request_serde_roundtrip() {
        let request = SpawnRequest {
            agent_id: "test".to_owned(),
            capabilities: Some(AgentCapabilities {
                can_spawn: false,
                ..Default::default()
            }),
            parent_pid: Some(5),
            env: HashMap::from([("KEY".into(), "VALUE".into())]),
            backend: Some(SpawnBackend::Native),
        };

        let json = serde_json::to_string(&request).unwrap();
        let restored: SpawnRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.agent_id, "test");
        assert_eq!(restored.parent_pid, Some(5));
        assert!(!restored.capabilities.unwrap().can_spawn);
    }

    // SpawnResult survives a JSON round trip.
    #[test]
    fn spawn_result_serde_roundtrip() {
        let result = SpawnResult {
            pid: 42,
            agent_id: "agent-42".to_owned(),
        };

        let json = serde_json::to_string(&result).unwrap();
        let restored: SpawnResult = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.pid, 42);
        assert_eq!(restored.agent_id, "agent-42");
    }
1485
1486 #[tokio::test]
1487 async fn spawn_and_run_executes_work() {
1488 let sup = make_supervisor();
1489
1490 let result = sup
1491 .spawn_and_run(simple_request("runner-1"), |_pid, _cancel| async { 0 })
1492 .unwrap();
1493
1494 assert!(result.pid > 0);
1495 assert_eq!(result.agent_id, "runner-1");
1496
1497 let entry = sup.inspect(result.pid).unwrap();
1499 assert_eq!(entry.state, ProcessState::Running);
1500
1501 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1503
1504 let entry = sup.inspect(result.pid).unwrap();
1506 assert!(matches!(entry.state, ProcessState::Exited(0)));
1507
1508 assert_eq!(sup.running_task_count(), 0);
1510 }
1511
1512 #[tokio::test]
1513 async fn spawn_and_run_respects_cancellation() {
1514 let sup = make_supervisor();
1515
1516 let result = sup
1517 .spawn_and_run(simple_request("cancellable"), |_pid, cancel| async move {
1518 cancel.cancelled().await;
1519 42
1520 })
1521 .unwrap();
1522
1523 assert_eq!(sup.running_task_count(), 1);
1524
1525 sup.stop(result.pid, true).unwrap();
1527
1528 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1530
1531 let entry = sup.inspect(result.pid).unwrap();
1532 assert!(matches!(entry.state, ProcessState::Exited(42)));
1533 assert_eq!(sup.running_task_count(), 0);
1534 }
1535
1536 #[tokio::test]
1537 async fn spawn_and_run_force_stop_aborts() {
1538 let sup = make_supervisor();
1539
1540 let result = sup
1541 .spawn_and_run(simple_request("force-me"), |_pid, cancel| async move {
1542 cancel.cancelled().await;
1543 0
1544 })
1545 .unwrap();
1546
1547 sup.stop(result.pid, false).unwrap();
1549
1550 let entry = sup.inspect(result.pid).unwrap();
1551 assert!(matches!(entry.state, ProcessState::Exited(-1)));
1552 assert_eq!(sup.running_task_count(), 0);
1553 }
1554
1555 #[tokio::test]
1556 async fn abort_all_clears_running_agents() {
1557 let sup = make_supervisor();
1558
1559 sup.spawn_and_run(simple_request("a1"), |_pid, cancel| async move {
1560 cancel.cancelled().await;
1561 0
1562 })
1563 .unwrap();
1564 sup.spawn_and_run(simple_request("a2"), |_pid, cancel| async move {
1565 cancel.cancelled().await;
1566 0
1567 })
1568 .unwrap();
1569
1570 assert_eq!(sup.running_task_count(), 2);
1571
1572 sup.abort_all();
1573
1574 assert_eq!(sup.running_task_count(), 0);
1575 }
1576
1577 #[tokio::test]
1578 async fn watchdog_sweep_reaps_finished_task() {
1579 let sup = make_supervisor();
1580
1581 let result = sup
1583 .spawn_and_run(simple_request("instant"), |_pid, _cancel| async { 0 })
1584 .unwrap();
1585
1586 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1588
1589 let reaped = sup.watchdog_sweep().await;
1592
1593 let entry = sup.inspect(result.pid).unwrap();
1596 assert!(
1597 matches!(entry.state, ProcessState::Exited(_)),
1598 "process should be Exited after sweep, got {:?}",
1599 entry.state
1600 );
1601
1602 assert_eq!(sup.running_task_count(), 0);
1604
1605 for (pid, code) in &reaped {
1607 assert_eq!(*pid, result.pid);
1608 assert!(*code == -2 || *code == -3);
1609 }
1610 }
1611
1612 #[tokio::test]
1613 async fn shutdown_all_graceful() {
1614 let sup = make_supervisor();
1615
1616 sup.spawn_and_run(simple_request("g1"), |_pid, cancel| async move {
1617 cancel.cancelled().await;
1618 0
1619 })
1620 .unwrap();
1621 sup.spawn_and_run(simple_request("g2"), |_pid, cancel| async move {
1622 cancel.cancelled().await;
1623 42
1624 })
1625 .unwrap();
1626
1627 assert_eq!(sup.running_task_count(), 2);
1628
1629 let results = sup
1630 .shutdown_all(std::time::Duration::from_secs(5))
1631 .await;
1632
1633 assert_eq!(results.len(), 2);
1634 assert_eq!(sup.running_task_count(), 0);
1635 }
1636
1637 #[tokio::test]
1638 async fn shutdown_all_timeout_aborts() {
1639 let sup = make_supervisor();
1640
1641 sup.spawn_and_run(simple_request("stubborn"), |_pid, _cancel| async move {
1643 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
1645 0
1646 })
1647 .unwrap();
1648
1649 assert_eq!(sup.running_task_count(), 1);
1650
1651 let results = sup
1653 .shutdown_all(std::time::Duration::from_millis(100))
1654 .await;
1655
1656 assert!(!results.is_empty());
1658 assert_eq!(sup.running_task_count(), 0);
1659 }
1660
1661 #[cfg(feature = "exochain")]
1662 #[tokio::test]
1663 async fn chain_logs_agent_spawn() {
1664 let process_table = Arc::new(ProcessTable::new(16));
1665 let bus = Arc::new(MessageBus::new());
1666 let ipc = Arc::new(KernelIpc::new(bus));
1667 let cm = Arc::new(crate::chain::ChainManager::new(0, 1000));
1668
1669 let sup: AgentSupervisor<clawft_platform::NativePlatform> =
1670 AgentSupervisor::new(process_table, ipc, AgentCapabilities::default())
1671 .with_exochain(None, Some(cm.clone()));
1672
1673 let request = SpawnRequest {
1674 agent_id: "chain-agent".to_owned(),
1675 capabilities: None,
1676 parent_pid: Some(99),
1677 env: HashMap::new(),
1678 backend: None,
1679 };
1680
1681 let result = sup
1682 .spawn_and_run(request, |_pid, _cancel| async { 0 })
1683 .unwrap();
1684
1685 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1687
1688 let events = cm.tail(10);
1690 let spawn_evt = events.iter().find(|e| e.kind == "agent.spawn");
1691
1692 assert!(spawn_evt.is_some(), "expected agent.spawn event on chain");
1693
1694 let payload = spawn_evt.unwrap().payload.as_ref().unwrap();
1695 assert_eq!(payload["agent_id"], "chain-agent");
1696 assert_eq!(payload["pid"], result.pid);
1697 assert_eq!(payload["parent_pid"], 99);
1698
1699 let exit_evt = events.iter().find(|e| e.kind == "agent.exit");
1701 assert!(exit_evt.is_some(), "expected agent.exit event on chain");
1702 }
1703
1704 #[test]
1707 fn spawn_native_explicit() {
1708 let sup = make_supervisor();
1709 let request = SpawnRequest {
1710 agent_id: "native-agent".to_owned(),
1711 capabilities: None,
1712 parent_pid: None,
1713 env: HashMap::new(),
1714 backend: Some(SpawnBackend::Native),
1715 };
1716 let result = sup.spawn(request).unwrap();
1717 assert!(result.pid > 0);
1718 assert_eq!(result.agent_id, "native-agent");
1719 }
1720
1721 #[test]
1722 fn spawn_backend_none_defaults_to_native() {
1723 let sup = make_supervisor();
1724 let request = SpawnRequest {
1725 agent_id: "default-agent".to_owned(),
1726 capabilities: None,
1727 parent_pid: None,
1728 env: HashMap::new(),
1729 backend: None,
1730 };
1731 let result = sup.spawn(request).unwrap();
1732 assert!(result.pid > 0);
1733 assert_eq!(result.agent_id, "default-agent");
1734 }
1735
1736 #[test]
1737 fn spawn_wasm_returns_not_available() {
1738 let sup = make_supervisor();
1739 let request = SpawnRequest {
1740 agent_id: "wasm-agent".to_owned(),
1741 capabilities: None,
1742 parent_pid: None,
1743 env: HashMap::new(),
1744 backend: Some(SpawnBackend::Wasm {
1745 module: PathBuf::from("/tmp/agent.wasm"),
1746 }),
1747 };
1748 let result = sup.spawn(request);
1749 assert!(result.is_err());
1750 let err = result.unwrap_err();
1751 let msg = err.to_string();
1752 assert!(msg.contains("wasm"), "error should mention wasm: {msg}");
1753 assert!(msg.contains("not available"), "error should say not available: {msg}");
1754 }
1755
1756 #[test]
1757 fn spawn_container_returns_not_available() {
1758 let sup = make_supervisor();
1759 let request = SpawnRequest {
1760 agent_id: "container-agent".to_owned(),
1761 capabilities: None,
1762 parent_pid: None,
1763 env: HashMap::new(),
1764 backend: Some(SpawnBackend::Container {
1765 image: "ghcr.io/test/agent:latest".into(),
1766 }),
1767 };
1768 let result = sup.spawn(request);
1769 assert!(result.is_err());
1770 let msg = result.unwrap_err().to_string();
1771 assert!(msg.contains("container"), "error should mention container: {msg}");
1772 }
1773
1774 #[test]
1775 fn spawn_tee_returns_not_available() {
1776 let sup = make_supervisor();
1777 let request = SpawnRequest {
1778 agent_id: "tee-agent".to_owned(),
1779 capabilities: None,
1780 parent_pid: None,
1781 env: HashMap::new(),
1782 backend: Some(SpawnBackend::Tee {
1783 enclave: EnclaveConfig {
1784 enclave_type: "sgx".into(),
1785 },
1786 }),
1787 };
1788 let result = sup.spawn(request);
1789 assert!(result.is_err());
1790 let msg = result.unwrap_err().to_string();
1791 assert!(msg.contains("tee"), "error should mention tee: {msg}");
1792 }
1793
1794 #[test]
1795 fn spawn_remote_returns_not_available() {
1796 let sup = make_supervisor();
1797 let request = SpawnRequest {
1798 agent_id: "remote-agent".to_owned(),
1799 capabilities: None,
1800 parent_pid: None,
1801 env: HashMap::new(),
1802 backend: Some(SpawnBackend::Remote {
1803 node_id: "node-42".into(),
1804 }),
1805 };
1806 let result = sup.spawn(request);
1807 assert!(result.is_err());
1808 let msg = result.unwrap_err().to_string();
1809 assert!(msg.contains("remote"), "error should mention remote: {msg}");
1810 }
1811
1812 #[cfg(feature = "os-patterns")]
1815 mod restart_tests {
1816 use super::*;
1817 use crate::supervisor::{
1818 RestartBudget, RestartStrategy, RestartTracker,
1819 ResourceCheckResult, check_resource_usage,
1820 };
1821 use crate::capability::ResourceLimits;
1822
1823 #[test]
1824 fn restart_strategy_default_is_one_for_one() {
1825 assert_eq!(RestartStrategy::default(), RestartStrategy::OneForOne);
1826 }
1827
1828 #[test]
1829 fn restart_strategy_serde_roundtrip() {
1830 let strategies = vec![
1831 RestartStrategy::OneForOne,
1832 RestartStrategy::OneForAll,
1833 RestartStrategy::RestForOne,
1834 ];
1835 for strategy in strategies {
1836 let json = serde_json::to_string(&strategy).unwrap();
1837 let restored: RestartStrategy = serde_json::from_str(&json).unwrap();
1838 assert_eq!(restored, strategy);
1839 }
1840 }
1841
1842 #[test]
1843 fn restart_budget_default() {
1844 let budget = RestartBudget::default();
1845 assert_eq!(budget.max_restarts, 5);
1846 assert_eq!(budget.within_secs, 60);
1847 }
1848
1849 #[test]
1850 fn restart_budget_serde_roundtrip() {
1851 let budget = RestartBudget {
1852 max_restarts: 3,
1853 within_secs: 30,
1854 };
1855 let json = serde_json::to_string(&budget).unwrap();
1856 let restored: RestartBudget = serde_json::from_str(&json).unwrap();
1857 assert_eq!(restored.max_restarts, 3);
1858 assert_eq!(restored.within_secs, 30);
1859 }
1860
1861 #[test]
1862 fn tracker_new_starts_at_zero() {
1863 let tracker = RestartTracker::new();
1864 assert_eq!(tracker.restart_count, 0);
1865 assert_eq!(tracker.backoff_ms, 0);
1866 assert!(tracker.last_restart.is_none());
1867 }
1868
1869 #[test]
1870 fn tracker_backoff_exponential() {
1871 let mut tracker = RestartTracker::new();
1872 let budget = RestartBudget {
1873 max_restarts: 10,
1874 within_secs: 60,
1875 };
1876
1877 tracker.record_restart(&budget);
1879 assert_eq!(tracker.backoff_ms, 100);
1880
1881 tracker.record_restart(&budget);
1883 assert_eq!(tracker.backoff_ms, 200);
1884
1885 tracker.record_restart(&budget);
1887 assert_eq!(tracker.backoff_ms, 400);
1888
1889 tracker.record_restart(&budget);
1891 assert_eq!(tracker.backoff_ms, 800);
1892 }
1893
1894 #[test]
1895 fn tracker_backoff_caps_at_30s() {
1896 let mut tracker = RestartTracker::new();
1897 tracker.restart_count = 20;
1898 let delay = tracker.next_backoff_ms();
1899 assert!(delay <= 30_000, "backoff should cap at 30s, got {delay}");
1900 }
1901
1902 #[test]
1903 fn tracker_budget_exceeded_returns_false() {
1904 let mut tracker = RestartTracker::new();
1905 let budget = RestartBudget {
1906 max_restarts: 2,
1907 within_secs: 60,
1908 };
1909
1910 assert!(tracker.record_restart(&budget)); assert!(tracker.record_restart(&budget)); assert!(!tracker.record_restart(&budget)); }
1914
1915 #[test]
1916 fn tracker_budget_within_returns_true() {
1917 let mut tracker = RestartTracker::new();
1918 let budget = RestartBudget {
1919 max_restarts: 5,
1920 within_secs: 60,
1921 };
1922
1923 for _ in 0..5 {
1924 assert!(tracker.record_restart(&budget));
1925 }
1926 }
1927
1928 #[test]
1929 fn tracker_records_last_restart() {
1930 let mut tracker = RestartTracker::new();
1931 let budget = RestartBudget::default();
1932 assert!(tracker.last_restart.is_none());
1933
1934 tracker.record_restart(&budget);
1935 assert!(tracker.last_restart.is_some());
1936 }
1937
1938 #[test]
1941 fn resource_check_within_limits() {
1942 let usage = ResourceUsage {
1943 memory_bytes: 100,
1944 cpu_time_ms: 100,
1945 tool_calls: 10,
1946 messages_sent: 10,
1947 };
1948 let limits = ResourceLimits::default(); let results = check_resource_usage(&usage, &limits);
1950 assert!(results.is_empty());
1951 }
1952
1953 #[test]
1954 fn resource_check_warning_at_80_percent() {
1955 let usage = ResourceUsage {
1956 memory_bytes: 220 * 1024 * 1024, cpu_time_ms: 100,
1958 tool_calls: 10,
1959 messages_sent: 10,
1960 };
1961 let limits = ResourceLimits::default();
1962 let results = check_resource_usage(&usage, &limits);
1963 assert_eq!(results.len(), 1);
1964 assert!(matches!(&results[0], ResourceCheckResult::Warning { resource, .. } if resource == "memory"));
1965 }
1966
1967 #[test]
1968 fn resource_check_exceeded_at_100_percent() {
1969 let usage = ResourceUsage {
1970 memory_bytes: 300 * 1024 * 1024, cpu_time_ms: 100,
1972 tool_calls: 10,
1973 messages_sent: 10,
1974 };
1975 let limits = ResourceLimits::default();
1976 let results = check_resource_usage(&usage, &limits);
1977 assert_eq!(results.len(), 1);
1978 assert!(matches!(&results[0], ResourceCheckResult::Exceeded { resource, .. } if resource == "memory"));
1979 }
1980
1981 #[test]
1982 fn resource_check_unlimited_skipped() {
1983 let usage = ResourceUsage {
1984 memory_bytes: 999_999_999,
1985 cpu_time_ms: 999_999_999,
1986 tool_calls: 999_999_999,
1987 messages_sent: 999_999_999,
1988 };
1989 let limits = ResourceLimits {
1990 max_memory_bytes: 0,
1991 max_cpu_time_ms: 0,
1992 max_tool_calls: 0,
1993 max_messages: 0,
1994 ..Default::default()
1995 };
1996 let results = check_resource_usage(&usage, &limits);
1997 assert!(results.is_empty(), "0 = unlimited should skip enforcement");
1998 }
1999
2000 #[test]
2001 fn resource_check_multiple_exceeded() {
2002 let limits = ResourceLimits {
2003 max_memory_bytes: 100,
2004 max_cpu_time_ms: 100,
2005 max_tool_calls: 10,
2006 max_messages: 10,
2007 ..Default::default()
2008 };
2009 let usage = ResourceUsage {
2010 memory_bytes: 200,
2011 cpu_time_ms: 200,
2012 tool_calls: 20,
2013 messages_sent: 20,
2014 };
2015 let results = check_resource_usage(&usage, &limits);
2016 assert_eq!(results.len(), 4);
2017 for r in &results {
2018 assert!(matches!(r, ResourceCheckResult::Exceeded { .. }));
2019 }
2020 }
2021
2022 #[test]
2023 fn resource_check_message_limit() {
2024 let limits = ResourceLimits {
2025 max_messages: 100,
2026 ..Default::default()
2027 };
2028 let usage = ResourceUsage {
2029 messages_sent: 100,
2030 ..Default::default()
2031 };
2032 let results = check_resource_usage(&usage, &limits);
2033 assert_eq!(results.len(), 1);
2034 assert!(matches!(&results[0], ResourceCheckResult::Exceeded { resource, .. } if resource == "messages"));
2035 }
2036
2037 #[test]
2038 fn resource_check_cpu_time_limit() {
2039 let limits = ResourceLimits {
2040 max_cpu_time_ms: 1000,
2041 ..Default::default()
2042 };
2043 let usage = ResourceUsage {
2044 cpu_time_ms: 1000,
2045 ..Default::default()
2046 };
2047 let results = check_resource_usage(&usage, &limits);
2048 assert_eq!(results.len(), 1);
2049 assert!(matches!(&results[0], ResourceCheckResult::Exceeded { resource, .. } if resource == "cpu_time"));
2050 }
2051
2052 #[test]
2055 fn restart_strategy_permanent_serde_roundtrip() {
2056 let strategy = RestartStrategy::Permanent;
2057 let json = serde_json::to_string(&strategy).unwrap();
2058 let restored: RestartStrategy = serde_json::from_str(&json).unwrap();
2059 assert_eq!(restored, RestartStrategy::Permanent);
2060 }
2061
2062 #[test]
2063 fn restart_strategy_transient_serde_roundtrip() {
2064 let strategy = RestartStrategy::Transient;
2065 let json = serde_json::to_string(&strategy).unwrap();
2066 let restored: RestartStrategy = serde_json::from_str(&json).unwrap();
2067 assert_eq!(restored, RestartStrategy::Transient);
2068 }
2069
2070 #[test]
2071 fn should_restart_permanent_never_restarts() {
2072 assert!(!RestartTracker::should_restart(&RestartStrategy::Permanent, 0));
2073 assert!(!RestartTracker::should_restart(&RestartStrategy::Permanent, 1));
2074 assert!(!RestartTracker::should_restart(&RestartStrategy::Permanent, -1));
2075 }
2076
2077 #[test]
2078 fn should_restart_transient_only_on_abnormal() {
2079 assert!(!RestartTracker::should_restart(&RestartStrategy::Transient, 0));
2081 assert!(RestartTracker::should_restart(&RestartStrategy::Transient, 1));
2083 assert!(RestartTracker::should_restart(&RestartStrategy::Transient, -1));
2084 assert!(RestartTracker::should_restart(&RestartStrategy::Transient, 42));
2085 }
2086
2087 #[test]
2088 fn should_restart_one_for_one_always() {
2089 assert!(RestartTracker::should_restart(&RestartStrategy::OneForOne, 0));
2090 assert!(RestartTracker::should_restart(&RestartStrategy::OneForOne, 1));
2091 }
2092
2093 #[test]
2094 fn should_restart_one_for_all_always() {
2095 assert!(RestartTracker::should_restart(&RestartStrategy::OneForAll, 0));
2096 assert!(RestartTracker::should_restart(&RestartStrategy::OneForAll, -1));
2097 }
2098
2099 #[test]
2100 fn should_restart_rest_for_one_always() {
2101 assert!(RestartTracker::should_restart(&RestartStrategy::RestForOne, 0));
2102 assert!(RestartTracker::should_restart(&RestartStrategy::RestForOne, 1));
2103 }
2104
2105 #[test]
2106 fn tracker_is_exhausted_when_at_max() {
2107 let mut tracker = RestartTracker::new();
2108 let budget = RestartBudget {
2109 max_restarts: 3,
2110 within_secs: 60,
2111 };
2112
2113 assert!(!tracker.is_exhausted(&budget));
2114
2115 tracker.record_restart(&budget); tracker.record_restart(&budget); assert!(!tracker.is_exhausted(&budget));
2118
2119 tracker.record_restart(&budget); assert!(tracker.is_exhausted(&budget));
2121 }
2122
2123 #[test]
2124 fn tracker_remaining_decreases() {
2125 let mut tracker = RestartTracker::new();
2126 let budget = RestartBudget {
2127 max_restarts: 5,
2128 within_secs: 60,
2129 };
2130
2131 assert_eq!(tracker.remaining(&budget), 5);
2132 tracker.record_restart(&budget);
2133 assert_eq!(tracker.remaining(&budget), 4);
2134 tracker.record_restart(&budget);
2135 assert_eq!(tracker.remaining(&budget), 3);
2136 }
2137
2138 #[test]
2139 fn tracker_remaining_saturates_at_zero() {
2140 let mut tracker = RestartTracker::new();
2141 let budget = RestartBudget {
2142 max_restarts: 2,
2143 within_secs: 60,
2144 };
2145
2146 tracker.record_restart(&budget); tracker.record_restart(&budget); assert_eq!(tracker.remaining(&budget), 0);
2149 tracker.record_restart(&budget); assert_eq!(tracker.remaining(&budget), 0);
2151 }
2152
2153 #[test]
2154 fn tracker_backoff_sequence_matches_spec() {
2155 let mut tracker = RestartTracker::new();
2157 let budget = RestartBudget {
2158 max_restarts: 20,
2159 within_secs: 600,
2160 };
2161
2162 let expected = [100, 200, 400, 800, 1600, 3200, 6400, 12800, 25600, 30000];
2163 for &exp in &expected {
2164 tracker.record_restart(&budget);
2165 assert_eq!(
2166 tracker.backoff_ms, exp,
2167 "restart #{}: expected {}ms, got {}ms",
2168 tracker.restart_count, exp, tracker.backoff_ms
2169 );
2170 }
2171 }
2172
2173 #[test]
2174 fn handle_exit_permanent_no_restart() {
2175 let sup = make_supervisor_with_strategy(
2176 RestartStrategy::Permanent,
2177 RestartBudget::default(),
2178 );
2179 let result = sup.spawn(simple_request("perm-agent")).unwrap();
2180 sup.process_table()
2181 .update_state(result.pid, ProcessState::Running)
2182 .unwrap();
2183
2184 let _ = sup.process_table().update_state(result.pid, ProcessState::Exited(1));
2186
2187 let restarts = sup.handle_exit(result.pid, 1);
2188 assert!(restarts.is_empty(), "Permanent strategy should never restart");
2189 }
2190
2191 #[test]
2192 fn handle_exit_transient_normal_no_restart() {
2193 let sup = make_supervisor_with_strategy(
2194 RestartStrategy::Transient,
2195 RestartBudget::default(),
2196 );
2197 let result = sup.spawn(simple_request("trans-agent")).unwrap();
2198 sup.process_table()
2199 .update_state(result.pid, ProcessState::Running)
2200 .unwrap();
2201
2202 let _ = sup.process_table().update_state(result.pid, ProcessState::Exited(0));
2203
2204 let restarts = sup.handle_exit(result.pid, 0);
2205 assert!(
2206 restarts.is_empty(),
2207 "Transient should not restart on normal exit (code 0)"
2208 );
2209 }
2210
2211 #[test]
2212 fn handle_exit_transient_abnormal_restarts() {
2213 let sup = make_supervisor_with_strategy(
2214 RestartStrategy::Transient,
2215 RestartBudget { max_restarts: 5, within_secs: 60 },
2216 );
2217 let result = sup.spawn(simple_request("trans-crash")).unwrap();
2218 sup.process_table()
2219 .update_state(result.pid, ProcessState::Running)
2220 .unwrap();
2221
2222 let _ = sup.process_table().update_state(result.pid, ProcessState::Exited(1));
2223
2224 let restarts = sup.handle_exit(result.pid, 1);
2225 assert_eq!(restarts.len(), 1, "Transient should restart on abnormal exit");
2226 assert_eq!(restarts[0].0, result.pid);
2227 }
2228
2229 #[test]
2230 fn handle_exit_one_for_one_restarts_only_failed() {
2231 let sup = make_supervisor_with_strategy(
2232 RestartStrategy::OneForOne,
2233 RestartBudget { max_restarts: 10, within_secs: 60 },
2234 );
2235 let r1 = sup.spawn(simple_request("ofo-a")).unwrap();
2236 let r2 = sup.spawn(simple_request("ofo-b")).unwrap();
2237 sup.process_table().update_state(r1.pid, ProcessState::Running).unwrap();
2238 sup.process_table().update_state(r2.pid, ProcessState::Running).unwrap();
2239
2240 let _ = sup.process_table().update_state(r1.pid, ProcessState::Exited(1));
2241
2242 let restarts = sup.handle_exit(r1.pid, 1);
2243 assert_eq!(restarts.len(), 1);
2245 assert_eq!(restarts[0].0, r1.pid);
2246
2247 let r2_entry = sup.inspect(r2.pid).unwrap();
2249 assert_eq!(r2_entry.state, ProcessState::Running);
2250 }
2251
2252 #[test]
2253 fn handle_exit_budget_exhausted_no_restart() {
2254 let sup = make_supervisor_with_strategy(
2255 RestartStrategy::OneForOne,
2256 RestartBudget { max_restarts: 1, within_secs: 60 },
2257 );
2258
2259 let r1 = sup.spawn(simple_request("budget-a")).unwrap();
2261 sup.process_table().update_state(r1.pid, ProcessState::Running).unwrap();
2262 let _ = sup.process_table().update_state(r1.pid, ProcessState::Exited(1));
2263
2264 let restarts1 = sup.handle_exit(r1.pid, 1);
2265 assert_eq!(restarts1.len(), 1, "first restart should succeed");
2266
2267 let new_pid = restarts1[0].1.pid;
2270 sup.process_table().update_state(new_pid, ProcessState::Running).unwrap();
2271 let _ = sup.process_table().update_state(r1.pid, ProcessState::Exited(1));
2272
2273 let restarts2 = sup.handle_exit(r1.pid, 1);
2274 assert!(restarts2.is_empty(), "budget should be exhausted after 1 restart");
2275 }
2276
2277 #[test]
2278 fn handle_exit_links_notify_monitor_registry() {
2279 let sup = make_supervisor_with_strategy(
2280 RestartStrategy::Permanent, RestartBudget::default(),
2282 );
2283 let r1 = sup.spawn(simple_request("linked-a")).unwrap();
2284 let r2 = sup.spawn(simple_request("linked-b")).unwrap();
2285
2286 sup.monitor_registry().link(r1.pid, r2.pid);
2288
2289 let _ref_id = sup.monitor_registry().monitor(r2.pid, r1.pid);
2291
2292 sup.process_table().update_state(r1.pid, ProcessState::Running).unwrap();
2293 let _ = sup.process_table().update_state(r1.pid, ProcessState::Exited(1));
2294
2295 let _restarts = sup.handle_exit(r1.pid, 1);
2297
2298 assert!(!sup.monitor_registry().is_linked(r1.pid, r2.pid));
2300 assert!(sup.monitor_registry().get_monitors(r1.pid).is_empty());
2302 }
2303
2304 #[test]
2305 fn supervisor_with_restart_config_builder() {
2306 let process_table = Arc::new(ProcessTable::new(16));
2307 let bus = Arc::new(clawft_core::bus::MessageBus::new());
2308 let ipc = Arc::new(KernelIpc::new(bus));
2309
2310 let sup: AgentSupervisor<clawft_platform::NativePlatform> =
2311 AgentSupervisor::new(process_table, ipc, AgentCapabilities::default())
2312 .with_restart_config(
2313 RestartStrategy::RestForOne,
2314 RestartBudget { max_restarts: 3, within_secs: 30 },
2315 );
2316
2317 assert_eq!(*sup.restart_strategy(), RestartStrategy::RestForOne);
2318 assert_eq!(sup.restart_budget().max_restarts, 3);
2319 assert_eq!(sup.restart_budget().within_secs, 30);
2320 }
2321
2322 fn make_supervisor_with_strategy(
2323 strategy: RestartStrategy,
2324 budget: RestartBudget,
2325 ) -> AgentSupervisor<clawft_platform::NativePlatform> {
2326 let process_table = Arc::new(ProcessTable::new(32));
2327 let bus = Arc::new(clawft_core::bus::MessageBus::new());
2328 let ipc = Arc::new(KernelIpc::new(bus));
2329 AgentSupervisor::new(process_table, ipc, AgentCapabilities::default())
2330 .with_restart_config(strategy, budget)
2331 }
2332 }
2333
2334 #[test]
2337 fn spawn_backend_native_serde_roundtrip() {
2338 let backend = SpawnBackend::Native;
2339 let json = serde_json::to_string(&backend).unwrap();
2340 let _: SpawnBackend = serde_json::from_str(&json).unwrap();
2341 }
2342
2343 #[test]
2344 fn spawn_backend_wasm_serde_roundtrip() {
2345 let backend = SpawnBackend::Wasm {
2346 module: PathBuf::from("/opt/modules/agent.wasm"),
2347 };
2348 let json = serde_json::to_string(&backend).unwrap();
2349 let restored: SpawnBackend = serde_json::from_str(&json).unwrap();
2350 assert!(matches!(restored, SpawnBackend::Wasm { .. }));
2351 }
2352
2353 #[test]
2354 fn spawn_backend_container_serde_roundtrip() {
2355 let backend = SpawnBackend::Container {
2356 image: "ghcr.io/org/agent:v1".into(),
2357 };
2358 let json = serde_json::to_string(&backend).unwrap();
2359 let restored: SpawnBackend = serde_json::from_str(&json).unwrap();
2360 assert!(matches!(restored, SpawnBackend::Container { .. }));
2361 }
2362
2363 #[test]
2364 fn spawn_backend_tee_serde_roundtrip() {
2365 let backend = SpawnBackend::Tee {
2366 enclave: EnclaveConfig {
2367 enclave_type: "sgx".into(),
2368 },
2369 };
2370 let json = serde_json::to_string(&backend).unwrap();
2371 let restored: SpawnBackend = serde_json::from_str(&json).unwrap();
2372 assert!(matches!(restored, SpawnBackend::Tee { .. }));
2373 }
2374
2375 #[test]
2376 fn spawn_backend_remote_serde_roundtrip() {
2377 let backend = SpawnBackend::Remote {
2378 node_id: "node-42".into(),
2379 };
2380 let json = serde_json::to_string(&backend).unwrap();
2381 let restored: SpawnBackend = serde_json::from_str(&json).unwrap();
2382 assert!(matches!(restored, SpawnBackend::Remote { .. }));
2383 }
2384
2385 #[test]
2386 fn enclave_config_serde_roundtrip() {
2387 let cfg = EnclaveConfig {
2388 enclave_type: "trustzone".into(),
2389 };
2390 let json = serde_json::to_string(&cfg).unwrap();
2391 let restored: EnclaveConfig = serde_json::from_str(&json).unwrap();
2392 assert_eq!(restored.enclave_type, "trustzone");
2393 }
2394
2395 #[test]
2396 fn spawn_request_with_backend_serde_roundtrip() {
2397 let req = SpawnRequest {
2398 agent_id: "test-agent".into(),
2399 capabilities: None,
2400 parent_pid: Some(42),
2401 env: HashMap::from([("LOG_LEVEL".into(), "debug".into())]),
2402 backend: Some(SpawnBackend::Native),
2403 };
2404 let json = serde_json::to_string(&req).unwrap();
2405 let restored: SpawnRequest = serde_json::from_str(&json).unwrap();
2406 assert_eq!(restored.agent_id, "test-agent");
2407 assert_eq!(restored.parent_pid, Some(42));
2408 assert!(restored.backend.is_some());
2409 }
2410
2411 #[test]
2412 fn spawn_request_minimal_json_deserializes() {
2413 let json = r#"{"agent_id":"minimal"}"#;
2414 let req: SpawnRequest = serde_json::from_str(json).unwrap();
2415 assert_eq!(req.agent_id, "minimal");
2416 assert!(req.capabilities.is_none());
2417 assert!(req.parent_pid.is_none());
2418 assert!(req.env.is_empty());
2419 assert!(req.backend.is_none());
2420 }
2421}