Skip to main content

oxios_kernel/
agent_lifecycle.rs

1//! Agent lifecycle management — fork, register, run, cleanup.
2//!
3//! Extracted from Orchestrator to reduce the god-object scope.
4//! Handles: fork agent → register A2A → check permissions →
5//! submit to scheduler → run → unregister → complete/fail.
6
7use anyhow::{bail, Result};
8use std::sync::Arc;
9
10use tokio::time::{timeout, Duration};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::AccessManager;
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{ExecutionResult, Seed};
20
21/// Manages the full lifecycle of a single agent from fork to cleanup.
22pub struct AgentLifecycleManager {
23    supervisor: Arc<dyn Supervisor>,
24    scheduler: Arc<AgentScheduler>,
25    access_manager: Arc<parking_lot::Mutex<AccessManager>>,
26    a2a: Arc<A2AProtocol>,
27    event_bus: EventBus,
28    /// Maximum execution time in seconds for agent tasks (0 = no limit).
29    max_execution_time_secs: std::sync::atomic::AtomicU64,
30}
31
32impl Clone for AgentLifecycleManager {
33    fn clone(&self) -> Self {
34        Self {
35            supervisor: self.supervisor.clone(),
36            scheduler: self.scheduler.clone(),
37            access_manager: self.access_manager.clone(),
38            a2a: self.a2a.clone(),
39            event_bus: self.event_bus.clone(),
40            max_execution_time_secs: std::sync::atomic::AtomicU64::new(
41                self.max_execution_time_secs
42                    .load(std::sync::atomic::Ordering::Relaxed),
43            ),
44        }
45    }
46}
47
48impl AgentLifecycleManager {
49    /// Create a new lifecycle manager.
50    pub fn new(
51        supervisor: Arc<dyn Supervisor>,
52        scheduler: Arc<AgentScheduler>,
53        access_manager: Arc<parking_lot::Mutex<AccessManager>>,
54        a2a: Arc<A2AProtocol>,
55        event_bus: EventBus,
56        max_execution_time_secs: u64,
57    ) -> Self {
58        Self {
59            supervisor,
60            scheduler,
61            access_manager,
62            a2a,
63            event_bus,
64            max_execution_time_secs: std::sync::atomic::AtomicU64::new(max_execution_time_secs),
65        }
66    }
67
68    /// Hot-reload max execution time without restart.
69    pub fn set_max_execution_time(&self, secs: u64) {
70        self.max_execution_time_secs
71            .store(secs, std::sync::atomic::Ordering::Relaxed);
72        tracing::info!(
73            max_execution_time_secs = secs,
74            "Lifecycle config hot-reloaded"
75        );
76    }
77
78    /// Fork an agent, register it in A2A and access control, submit to
79    /// scheduler, run the seed, then clean up.
80    pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
81        // 1. Fork
82        let agent_id = self.supervisor.fork(seed).await?;
83        let agent_name = format!("agent-{agent_id}");
84        tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
85
86        // 2. Register A2A card
87        let card = self.build_agent_card(agent_id, &agent_name, seed);
88        if let Err(e) = self.a2a.registry().register_agent(card).await {
89            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
90        }
91
92        // 2b. Deliver any pending A2A messages to this agent
93        if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
94            tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
95        }
96
97        // 3. Ensure access permissions
98        self.ensure_permissions(&agent_name);
99
100        // 4. Submit and start task
101        get_metrics().agents_forked.inc();
102        let task =
103            ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
104        let task_id = self.scheduler.submit(task)?;
105        self.scheduler.start_task(task_id)?;
106
107        // 5. Run — always cleanup even on failure
108        let max_secs = self
109            .max_execution_time_secs
110            .load(std::sync::atomic::Ordering::Relaxed);
111        let result = if max_secs > 0 {
112            let exec_timeout = Duration::from_secs(max_secs);
113            match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
114                Ok(Ok(r)) => r,
115                Ok(Err(e)) => {
116                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
117                    self.cleanup_on_failure(agent_id, task_id).await;
118                    return Err(e);
119                }
120                Err(_) => {
121                    let secs = exec_timeout.as_secs();
122                    tracing::warn!(agent_id = %agent_id, secs, "Agent execution timed out after {}s", secs);
123                    self.cleanup_on_failure(agent_id, task_id).await;
124                    bail!("Agent execution timed out after {secs} seconds");
125                }
126            }
127        } else {
128            match self.supervisor.run_with_seed(agent_id, seed).await {
129                Ok(r) => r,
130                Err(e) => {
131                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
132                    self.cleanup_on_failure(agent_id, task_id).await;
133                    return Err(e);
134                }
135            }
136        };
137
138        // 6. Cleanup on success
139        self.cleanup(agent_id, task_id, &result).await;
140
141        Ok(result)
142    }
143
144    /// Kill an agent and clean up all registered state.
145    pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
146        if let Err(e) = self.supervisor.kill(agent_id).await {
147            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
148        }
149        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
150            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
151        }
152        let _ = self
153            .event_bus
154            .publish(KernelEvent::AgentStopped { id: agent_id });
155        Ok(())
156    }
157
158    /// Build an A2A agent card from the seed.
159    fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
160        let goal_lower = seed.goal.to_lowercase();
161
162        let mut card = AgentCard::new(
163            agent_id,
164            agent_name,
165            format!("Agent executing seed: {}", seed.goal),
166        )
167        .with_capability("execute-seed")
168        .with_status(AgentStatus::Starting);
169
170        // Infer capabilities from goal.
171        if goal_lower.contains("review") || goal_lower.contains("code") {
172            card = card.with_capability("code-review");
173        }
174        if goal_lower.contains("test") {
175            card = card.with_capability("testing");
176        }
177        if goal_lower.contains("refactor") || goal_lower.contains("improve") {
178            card = card.with_capability("refactoring");
179        }
180        if goal_lower.contains("write")
181            || goal_lower.contains("create")
182            || goal_lower.contains("implement")
183        {
184            card = card.with_capability("code-generation");
185        }
186        if goal_lower.contains("debug") || goal_lower.contains("fix") {
187            card = card.with_capability("debugging");
188        }
189
190        card
191    }
192
193    /// Ensure default tool permissions exist for an agent.
194    fn ensure_permissions(&self, agent_name: &str) {
195        let mut access = self.access_manager.lock();
196        let perms = access.get_or_create_permissions(agent_name);
197        // Grant default tool set to new agents
198        for tool in [
199            "bash", "read", "write", "edit", "grep", "find", "exec", "ls",
200        ] {
201            if !perms.allowed_tools.contains(tool) {
202                perms.allow_tool(tool);
203            }
204        }
205    }
206
207    /// Unregister A2A, complete/fail scheduler task.
208    async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
209        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
210            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
211        }
212        if result.success {
213            let _ = self.scheduler.complete_task(task_id);
214        } else {
215            let _ = self.scheduler.fail_task(task_id, &result.output);
216        }
217    }
218
219    /// Reap finished zombie tasks and log the cleanup.
220    pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
221        let reaped = self.scheduler.reap_zombies();
222        if !reaped.is_empty() {
223            tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
224            let mut access = self.access_manager.lock();
225            for task_id in &reaped {
226                access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
227            }
228        }
229        reaped
230    }
231
232    /// Cleanup when agent execution fails (no ExecutionResult available).
233    async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
234        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
235            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
236        }
237        let _ = self.scheduler.fail_task(task_id, "execution failed");
238    }
239}