Skip to main content

oxios_kernel/
agent_lifecycle.rs

//! Agent lifecycle management — fork, register, run, cleanup.
//!
//! Extracted from Orchestrator to reduce the god-object scope.
//! Handles: fork agent → register A2A card → grant default permissions →
//! submit to scheduler → run → unregister → complete/fail the task.
6
7use anyhow::{Result, bail};
8use std::sync::Arc;
9
10use tokio::time::{Duration, timeout};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::{AccessManager, Role, Subject};
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{ExecutionResult, Seed};
20
21/// Manages the full lifecycle of a single agent from fork to cleanup.
22pub struct AgentLifecycleManager {
23    supervisor: Arc<dyn Supervisor>,
24    scheduler: Arc<AgentScheduler>,
25    access_manager: Arc<parking_lot::Mutex<AccessManager>>,
26    a2a: Arc<A2AProtocol>,
27    event_bus: EventBus,
28    /// Maximum execution time in seconds for agent tasks (0 = no limit).
29    max_execution_time_secs: std::sync::atomic::AtomicU64,
30    /// Default allowed tools from config.
31    allowed_tools: Vec<String>,
32    /// Whether agents get network access by default.
33    network_access: bool,
34    /// Workspace path for path sandbox.
35    workspace_path: String,
36}
37
38impl Clone for AgentLifecycleManager {
39    fn clone(&self) -> Self {
40        Self {
41            supervisor: self.supervisor.clone(),
42            scheduler: self.scheduler.clone(),
43            access_manager: self.access_manager.clone(),
44            a2a: self.a2a.clone(),
45            event_bus: self.event_bus.clone(),
46            max_execution_time_secs: std::sync::atomic::AtomicU64::new(
47                self.max_execution_time_secs
48                    .load(std::sync::atomic::Ordering::Relaxed),
49            ),
50            allowed_tools: self.allowed_tools.clone(),
51            network_access: self.network_access,
52            workspace_path: self.workspace_path.clone(),
53        }
54    }
55}
56
57impl AgentLifecycleManager {
58    /// Create a new lifecycle manager.
59    #[allow(clippy::too_many_arguments)]
60    pub fn new(
61        supervisor: Arc<dyn Supervisor>,
62        scheduler: Arc<AgentScheduler>,
63        access_manager: Arc<parking_lot::Mutex<AccessManager>>,
64        a2a: Arc<A2AProtocol>,
65        event_bus: EventBus,
66        max_execution_time_secs: u64,
67        allowed_tools: Vec<String>,
68        network_access: bool,
69        workspace_path: String,
70    ) -> Self {
71        Self {
72            supervisor,
73            scheduler,
74            access_manager,
75            a2a,
76            event_bus,
77            max_execution_time_secs: std::sync::atomic::AtomicU64::new(max_execution_time_secs),
78            allowed_tools,
79            network_access,
80            workspace_path,
81        }
82    }
83
84    /// Hot-reload max execution time without restart.
85    pub fn set_max_execution_time(&self, secs: u64) {
86        self.max_execution_time_secs
87            .store(secs, std::sync::atomic::Ordering::Relaxed);
88        tracing::info!(
89            max_execution_time_secs = secs,
90            "Lifecycle config hot-reloaded"
91        );
92    }
93
94    /// Fork an agent, register it in A2A and access control, submit to
95    /// scheduler, run the seed, then clean up.
96    pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
97        // 1. Fork
98        let agent_id = self.supervisor.fork(seed).await?;
99        let agent_name = format!("agent-{agent_id}");
100        tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
101
102        // 2. Register A2A card
103        let card = self.build_agent_card(agent_id, &agent_name, seed);
104        if let Err(e) = self.a2a.registry().register_agent(card).await {
105            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
106        }
107
108        // 2b. Deliver any pending A2A messages to this agent
109        if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
110            tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
111        }
112
113        // 3. Ensure access permissions
114        self.ensure_permissions(&agent_name);
115
116        // 4. Submit and start task
117        get_metrics().agents_forked.inc();
118        let task =
119            ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
120        let task_id = self.scheduler.submit(task)?;
121        self.scheduler.start_task(task_id)?;
122
123        // 5. Run — always cleanup even on failure
124        let max_secs = self
125            .max_execution_time_secs
126            .load(std::sync::atomic::Ordering::Relaxed);
127        let result = if max_secs > 0 {
128            let exec_timeout = Duration::from_secs(max_secs);
129            match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
130                Ok(Ok(r)) => r,
131                Ok(Err(e)) => {
132                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
133                    self.cleanup_on_failure(agent_id, task_id).await;
134                    return Err(e);
135                }
136                Err(_) => {
137                    let secs = exec_timeout.as_secs();
138                    tracing::warn!(agent_id = %agent_id, secs, "Agent execution timed out after {}s", secs);
139                    self.cleanup_on_failure(agent_id, task_id).await;
140                    bail!("Agent execution timed out after {secs} seconds");
141                }
142            }
143        } else {
144            match self.supervisor.run_with_seed(agent_id, seed).await {
145                Ok(r) => r,
146                Err(e) => {
147                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
148                    self.cleanup_on_failure(agent_id, task_id).await;
149                    return Err(e);
150                }
151            }
152        };
153
154        // 6. Cleanup on success
155        self.cleanup(agent_id, task_id, &result).await;
156
157        Ok(result)
158    }
159
160    /// Kill an agent and clean up all registered state.
161    pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
162        if let Err(e) = self.supervisor.kill(agent_id).await {
163            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
164        }
165        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
166            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
167        }
168        let _ = self
169            .event_bus
170            .publish(KernelEvent::AgentStopped { id: agent_id });
171        Ok(())
172    }
173
174    /// Build an A2A agent card from the seed.
175    fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
176        let goal_lower = seed.goal.to_lowercase();
177
178        let mut card = AgentCard::new(
179            agent_id,
180            agent_name,
181            format!("Agent executing seed: {}", seed.goal),
182        )
183        .with_capability("execute-seed")
184        .with_status(AgentStatus::Starting);
185
186        // Infer capabilities from goal.
187        if goal_lower.contains("review") || goal_lower.contains("code") {
188            card = card.with_capability("code-review");
189        }
190        if goal_lower.contains("test") {
191            card = card.with_capability("testing");
192        }
193        if goal_lower.contains("refactor") || goal_lower.contains("improve") {
194            card = card.with_capability("refactoring");
195        }
196        if goal_lower.contains("write")
197            || goal_lower.contains("create")
198            || goal_lower.contains("implement")
199        {
200            card = card.with_capability("code-generation");
201        }
202        if goal_lower.contains("debug") || goal_lower.contains("fix") {
203            card = card.with_capability("debugging");
204        }
205
206        card
207    }
208
209    /// Ensure default tool permissions exist for an agent.
210    ///
211    /// Applies config.toml `[security]` settings:
212    /// - `allowed_tools` → agent's tool set
213    /// - `network_access` → network permission
214    /// - workspace path → path sandbox
215    /// - RBAC `Superuser` role → allows all tools and paths
216    fn ensure_permissions(&self, agent_name: &str) {
217        let mut access = self.access_manager.lock();
218        let perms = access.get_or_create_permissions(agent_name);
219
220        // Grant all tools from config
221        for tool in &self.allowed_tools {
222            if !perms.allowed_tools.contains(tool.as_str()) {
223                perms.allow_tool(tool);
224            }
225        }
226
227        // Add workspace path to allowed paths
228        let ws_pattern = format!("{}/**", self.workspace_path.trim_end_matches('/'));
229        if !perms.allowed_paths.iter().any(|p| p == &ws_pattern) {
230            perms.allow_path(&ws_pattern);
231        }
232        // Also allow /tmp for agent temp files
233        if !perms.allowed_paths.iter().any(|p| p == "/tmp/**") {
234            perms.allow_path("/tmp/**");
235        }
236
237        // Apply network access from config
238        if self.network_access {
239            perms.enable_network();
240        }
241
242        // Assign Superuser RBAC role so AccessGate passes
243        // (config.toml already defines which tools are allowed)
244        let subject = Subject::Agent(
245            agent_name
246                .strip_prefix("agent-")
247                .and_then(|s| s.parse().ok())
248                .unwrap_or_default(),
249        );
250        access
251            .rbac_manager_mut()
252            .assign_role(subject, Role::Superuser);
253    }
254
255    /// Unregister A2A, complete/fail scheduler task.
256    async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
257        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
258            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
259        }
260        if result.success {
261            let _ = self.scheduler.complete_task(task_id);
262        } else {
263            let _ = self.scheduler.fail_task(task_id, &result.output);
264        }
265    }
266
267    /// Reap finished zombie tasks and log the cleanup.
268    pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
269        let reaped = self.scheduler.reap_zombies();
270        if !reaped.is_empty() {
271            tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
272            let mut access = self.access_manager.lock();
273            for task_id in &reaped {
274                access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
275            }
276        }
277        reaped
278    }
279
280    /// Cleanup when agent execution fails (no ExecutionResult available).
281    async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
282        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
283            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
284        }
285        let _ = self.scheduler.fail_task(task_id, "execution failed");
286    }
287}