Skip to main content

oxios_kernel/
agent_lifecycle.rs

1//! Agent lifecycle management — fork, register, run, cleanup.
2//!
3//! Extracted from Orchestrator to reduce the god-object scope.
4//! Handles: fork agent → register A2A → check permissions →
5//! submit to scheduler → run → unregister → complete/fail.
6
7use anyhow::{bail, Result};
8use std::sync::Arc;
9
10use tokio::time::{timeout, Duration};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::AccessManager;
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{ExecutionResult, Seed};
20
21/// Manages the full lifecycle of a single agent from fork to cleanup.
22#[derive(Clone)]
23pub struct AgentLifecycleManager {
24    supervisor: Arc<dyn Supervisor>,
25    scheduler: Arc<AgentScheduler>,
26    access_manager: Arc<parking_lot::Mutex<AccessManager>>,
27    a2a: Arc<A2AProtocol>,
28    event_bus: EventBus,
29    /// Maximum execution time in seconds for agent tasks (0 = no limit).
30    max_execution_time_secs: u64,
31}
32
33impl AgentLifecycleManager {
34    /// Create a new lifecycle manager.
35    pub fn new(
36        supervisor: Arc<dyn Supervisor>,
37        scheduler: Arc<AgentScheduler>,
38        access_manager: Arc<parking_lot::Mutex<AccessManager>>,
39        a2a: Arc<A2AProtocol>,
40        event_bus: EventBus,
41        max_execution_time_secs: u64,
42    ) -> Self {
43        Self {
44            supervisor,
45            scheduler,
46            access_manager,
47            a2a,
48            event_bus,
49            max_execution_time_secs,
50        }
51    }
52
53    /// Fork an agent, register it in A2A and access control, submit to
54    /// scheduler, run the seed, then clean up.
55    pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
56        // 1. Fork
57        let agent_id = self.supervisor.fork(seed).await?;
58        let agent_name = format!("agent-{}", agent_id);
59        tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
60
61        // 2. Register A2A card
62        let card = self.build_agent_card(agent_id, &agent_name, seed);
63        if let Err(e) = self.a2a.registry().register_agent(card).await {
64            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
65        }
66
67        // 2b. Deliver any pending A2A messages to this agent
68        if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
69            tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
70        }
71
72        // 3. Ensure access permissions
73        self.ensure_permissions(&agent_name);
74
75        // 4. Submit and start task
76        get_metrics().agents_forked.inc();
77        let task =
78            ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
79        let task_id = self.scheduler.submit(task)?;
80        self.scheduler.start_task(task_id)?;
81
82        // 5. Run — always cleanup even on failure
83        let result = if self.max_execution_time_secs > 0 {
84            let exec_timeout = Duration::from_secs(self.max_execution_time_secs);
85            match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
86                Ok(Ok(r)) => r,
87                Ok(Err(e)) => {
88                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
89                    self.cleanup_on_failure(agent_id, task_id).await;
90                    return Err(e);
91                }
92                Err(_) => {
93                    let secs = exec_timeout.as_secs();
94                    tracing::warn!(agent_id = %agent_id, secs, "Agent execution timed out after {}s", secs);
95                    self.cleanup_on_failure(agent_id, task_id).await;
96                    bail!("Agent execution timed out after {} seconds", secs);
97                }
98            }
99        } else {
100            match self.supervisor.run_with_seed(agent_id, seed).await {
101                Ok(r) => r,
102                Err(e) => {
103                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
104                    self.cleanup_on_failure(agent_id, task_id).await;
105                    return Err(e);
106                }
107            }
108        };
109
110        // 6. Cleanup on success
111        self.cleanup(agent_id, task_id, &result).await;
112
113        Ok(result)
114    }
115
116    /// Kill an agent and clean up all registered state.
117    pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
118        if let Err(e) = self.supervisor.kill(agent_id).await {
119            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
120        }
121        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
122            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
123        }
124        let _ = self
125            .event_bus
126            .publish(KernelEvent::AgentStopped { id: agent_id });
127        Ok(())
128    }
129
130    /// Build an A2A agent card from the seed.
131    fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
132        let goal_lower = seed.goal.to_lowercase();
133
134        let mut card = AgentCard::new(
135            agent_id,
136            agent_name,
137            format!("Agent executing seed: {}", seed.goal),
138        )
139        .with_capability("execute-seed")
140        .with_status(AgentStatus::Starting);
141
142        // Infer capabilities from goal.
143        if goal_lower.contains("review") || goal_lower.contains("code") {
144            card = card.with_capability("code-review");
145        }
146        if goal_lower.contains("test") {
147            card = card.with_capability("testing");
148        }
149        if goal_lower.contains("refactor") || goal_lower.contains("improve") {
150            card = card.with_capability("refactoring");
151        }
152        if goal_lower.contains("write")
153            || goal_lower.contains("create")
154            || goal_lower.contains("implement")
155        {
156            card = card.with_capability("code-generation");
157        }
158        if goal_lower.contains("debug") || goal_lower.contains("fix") {
159            card = card.with_capability("debugging");
160        }
161
162        card
163    }
164
165    /// Ensure default tool permissions exist for an agent.
166    fn ensure_permissions(&self, agent_name: &str) {
167        let mut access = self.access_manager.lock();
168        for tool in ["bash", "read", "write", "edit", "grep", "find"] {
169            access.can_use_tool(agent_name, tool);
170        }
171        if access.get_permissions(agent_name).is_none() {
172            tracing::warn!(agent = %agent_name, "Agent has no permissions, using default");
173            access.get_or_create_permissions(agent_name);
174        }
175    }
176
177    /// Unregister A2A, complete/fail scheduler task.
178    async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
179        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
180            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
181        }
182        if result.success {
183            let _ = self.scheduler.complete_task(task_id);
184        } else {
185            let _ = self.scheduler.fail_task(task_id, &result.output);
186        }
187    }
188
189    /// Reap finished zombie tasks and log the cleanup.
190    pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
191        let reaped = self.scheduler.reap_zombies();
192        if !reaped.is_empty() {
193            tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
194            let mut access = self.access_manager.lock();
195            for task_id in &reaped {
196                access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
197            }
198        }
199        reaped
200    }
201
202    /// Cleanup when agent execution fails (no ExecutionResult available).
203    async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
204        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
205            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
206        }
207        let _ = self.scheduler.fail_task(task_id, "execution failed");
208    }
209}