Skip to main content

oxios_kernel/
agent_lifecycle.rs

//! Agent lifecycle management — fork, register, run, cleanup.
//!
//! Extracted from Orchestrator to reduce the god-object scope.
//! Handles: fork agent → register A2A → grant permissions →
//! submit to scheduler → run → unregister → complete/fail.
6
7use anyhow::{Result, bail};
8use std::sync::Arc;
9
10use tokio::time::{Duration, timeout};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::{AccessManager, Role, Subject};
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{Directive, ExecEnv, ExecutionResult, Seed};
20
21/// Manages the full lifecycle of a single agent from fork to cleanup.
22pub struct AgentLifecycleManager {
23    supervisor: Arc<dyn Supervisor>,
24    scheduler: Arc<AgentScheduler>,
25    access_manager: Arc<parking_lot::Mutex<AccessManager>>,
26    a2a: Arc<A2AProtocol>,
27    event_bus: EventBus,
28    /// Maximum execution time in seconds for agent tasks (0 = no limit).
29    max_execution_time_secs: std::sync::atomic::AtomicU64,
30    /// Default allowed tools from config.
31    allowed_tools: Vec<String>,
32    /// Whether agents get network access by default.
33    network_access: bool,
34    /// Workspace path for path sandbox.
35    workspace_path: String,
36}
37
38impl Clone for AgentLifecycleManager {
39    fn clone(&self) -> Self {
40        Self {
41            supervisor: self.supervisor.clone(),
42            scheduler: self.scheduler.clone(),
43            access_manager: self.access_manager.clone(),
44            a2a: self.a2a.clone(),
45            event_bus: self.event_bus.clone(),
46            max_execution_time_secs: std::sync::atomic::AtomicU64::new(
47                self.max_execution_time_secs
48                    .load(std::sync::atomic::Ordering::Relaxed),
49            ),
50            allowed_tools: self.allowed_tools.clone(),
51            network_access: self.network_access,
52            workspace_path: self.workspace_path.clone(),
53        }
54    }
55}
56
57impl AgentLifecycleManager {
58    /// Create a new lifecycle manager.
59    #[allow(clippy::too_many_arguments)]
60    pub fn new(
61        supervisor: Arc<dyn Supervisor>,
62        scheduler: Arc<AgentScheduler>,
63        access_manager: Arc<parking_lot::Mutex<AccessManager>>,
64        a2a: Arc<A2AProtocol>,
65        event_bus: EventBus,
66        max_execution_time_secs: u64,
67        allowed_tools: Vec<String>,
68        network_access: bool,
69        workspace_path: String,
70    ) -> Self {
71        Self {
72            supervisor,
73            scheduler,
74            access_manager,
75            a2a,
76            event_bus,
77            max_execution_time_secs: std::sync::atomic::AtomicU64::new(max_execution_time_secs),
78            allowed_tools,
79            network_access,
80            workspace_path,
81        }
82    }
83
84    /// Hot-reload max execution time without restart.
85    pub fn set_max_execution_time(&self, secs: u64) {
86        self.max_execution_time_secs
87            .store(secs, std::sync::atomic::Ordering::Relaxed);
88        tracing::info!(
89            max_execution_time_secs = secs,
90            "Lifecycle config hot-reloaded"
91        );
92    }
93
94    /// Fork an agent, register it in A2A and access control, submit to
95    /// scheduler, run the seed, then clean up.
96    pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
97        // 1. Fork
98        let agent_id = self.supervisor.fork(seed).await?;
99        let agent_name = format!("agent-{agent_id}");
100        tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
101
102        // 2. Register A2A card
103        let card = self.build_agent_card(agent_id, &agent_name, seed);
104        if let Err(e) = self.a2a.registry().register_agent(card).await {
105            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
106        }
107
108        // 2b. Deliver any pending A2A messages to this agent
109        if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
110            tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
111        }
112
113        // 3. Ensure access permissions
114        self.ensure_permissions(&agent_name);
115
116        // 4. Submit and start task
117        get_metrics().agents_forked.inc();
118        let task =
119            ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
120        let task_id = self.scheduler.submit(task)?;
121        self.scheduler.start_task(task_id)?;
122
123        // 5. Run — always cleanup even on failure
124        let max_secs = self
125            .max_execution_time_secs
126            .load(std::sync::atomic::Ordering::Relaxed);
127        let result = if max_secs > 0 {
128            let exec_timeout = Duration::from_secs(max_secs);
129            match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
130                Ok(Ok(r)) => r,
131                Ok(Err(e)) => {
132                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
133                    self.cleanup_on_failure(agent_id, task_id).await;
134                    return Err(e);
135                }
136                Err(_) => {
137                    let secs = exec_timeout.as_secs();
138                    tracing::warn!(
139                        agent_id = %agent_id,
140                        secs,
141                        "Agent execution timed out after {}s",
142                        secs
143                    );
144                    // Abort the detached execution body. Previously the
145                    // timeout only dropped the awaiting future while the
146                    // spawned task kept running — leaking tokens/resources.
147                    let _ = self.supervisor.kill(agent_id).await;
148                    self.cleanup_on_failure(agent_id, task_id).await;
149                    bail!("Agent execution timed out after {secs} seconds");
150                }
151            }
152        } else {
153            match self.supervisor.run_with_seed(agent_id, seed).await {
154                Ok(r) => r,
155                Err(e) => {
156                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
157                    self.cleanup_on_failure(agent_id, task_id).await;
158                    return Err(e);
159                }
160            }
161        };
162
163        // 6. Cleanup on success
164        self.cleanup(agent_id, task_id, &result).await;
165
166        Ok(result)
167    }
168
169    /// Fork an agent, register it in A2A and access control, submit to
170    /// scheduler, run the directive + exec env, then clean up (RFC-027).
171    ///
172    /// Mirrors [`spawn_and_run`](Self::spawn_and_run) but operates on the
173    /// unified-intent types. The legacy Seed variant stays for Phase 6.
174    pub async fn execute_directive(
175        &self,
176        directive: &Directive,
177        env: &ExecEnv,
178        priority: Priority,
179    ) -> Result<ExecutionResult> {
180        // 1. Fork
181        let agent_id = self.supervisor.fork_directive(directive, env).await?;
182        let agent_name = format!("agent-{agent_id}");
183        tracing::info!(agent_id = %agent_id, "Agent forked from directive");
184
185        // 2. Register A2A card
186        let card = self.build_agent_card_directive(agent_id, &agent_name, directive);
187        if let Err(e) = self.a2a.registry().register_agent(card).await {
188            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
189        }
190
191        // 2b. Deliver any pending A2A messages to this agent
192        if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
193            tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
194        }
195
196        // 3. Ensure access permissions
197        self.ensure_permissions(&agent_name);
198
199        // 4. Submit and start task
200        get_metrics().agents_forked.inc();
201        let task = ScheduledTask::for_agent(
202            agent_id,
203            format!("Execute directive '{}'", directive.goal),
204            priority,
205        );
206        let task_id = self.scheduler.submit(task)?;
207        self.scheduler.start_task(task_id)?;
208
209        // 5. Run — always cleanup even on failure
210        let max_secs = self
211            .max_execution_time_secs
212            .load(std::sync::atomic::Ordering::Relaxed);
213        let result = if max_secs > 0 {
214            let exec_timeout = Duration::from_secs(max_secs);
215            match timeout(
216                exec_timeout,
217                self.supervisor.run_with_directive(agent_id, directive, env),
218            )
219            .await
220            {
221                Ok(Ok(r)) => r,
222                Ok(Err(e)) => {
223                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
224                    self.cleanup_on_failure(agent_id, task_id).await;
225                    return Err(e);
226                }
227                Err(_) => {
228                    let secs = exec_timeout.as_secs();
229                    tracing::warn!(
230                        agent_id = %agent_id,
231                        secs,
232                        "Agent execution timed out after {}s",
233                        secs
234                    );
235                    // Abort the detached execution body. Previously the
236                    // timeout only dropped the awaiting future while the
237                    // spawned task kept running — leaking tokens/resources.
238                    let _ = self.supervisor.kill(agent_id).await;
239                    self.cleanup_on_failure(agent_id, task_id).await;
240                    bail!("Agent execution timed out after {secs} seconds");
241                }
242            }
243        } else {
244            match self
245                .supervisor
246                .run_with_directive(agent_id, directive, env)
247                .await
248            {
249                Ok(r) => r,
250                Err(e) => {
251                    tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
252                    self.cleanup_on_failure(agent_id, task_id).await;
253                    return Err(e);
254                }
255            }
256        };
257
258        // 6. Cleanup on success
259        self.cleanup(agent_id, task_id, &result).await;
260
261        Ok(result)
262    }
263
264    /// Execute a directive with feedback from a previous failed attempt (RFC-027).
265    ///
266    /// Injects the previous result's output and the review gaps into the
267    /// directive's constraints so the agent sees what went wrong.
268    pub async fn execute_with_feedback(
269        &self,
270        directive: &Directive,
271        env: &ExecEnv,
272        prev_result: &ExecutionResult,
273        gaps: &[String],
274        priority: Priority,
275    ) -> Result<ExecutionResult> {
276        // Augment the directive with feedback from the previous attempt.
277        let mut augmented = directive.clone();
278        let feedback = format!(
279            "## Previous attempt failed\n{}\n\n## Unmet criteria\n{}\n\n\
280             Review the above output and fix the unmet criteria.",
281            prev_result.output,
282            gaps.iter()
283                .enumerate()
284                .map(|(i, g)| format!("{}. {g}", i + 1))
285                .collect::<Vec<_>>()
286                .join("\n")
287        );
288        augmented.constraints.push(feedback);
289
290        self.execute_directive(&augmented, env, priority).await
291    }
292
293    /// Kill an agent and clean up all registered state.
294    pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
295        if let Err(e) = self.supervisor.kill(agent_id).await {
296            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
297        }
298        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
299            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
300        }
301        let _ = self.event_bus.publish(KernelEvent::AgentStopped {
302            id: agent_id,
303            success: false,
304        });
305        Ok(())
306    }
307
308    /// Build an A2A agent card from the seed.
309    fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
310        let goal_lower = seed.goal.to_lowercase();
311
312        let mut card = AgentCard::new(
313            agent_id,
314            agent_name,
315            format!("Agent executing seed: {}", seed.goal),
316        )
317        .with_capability("execute-seed")
318        .with_status(AgentStatus::Starting);
319
320        // Infer capabilities from goal.
321        if goal_lower.contains("review") || goal_lower.contains("code") {
322            card = card.with_capability("code-review");
323        }
324        if goal_lower.contains("test") {
325            card = card.with_capability("testing");
326        }
327        if goal_lower.contains("refactor") || goal_lower.contains("improve") {
328            card = card.with_capability("refactoring");
329        }
330        if goal_lower.contains("write")
331            || goal_lower.contains("create")
332            || goal_lower.contains("implement")
333        {
334            card = card.with_capability("code-generation");
335        }
336        if goal_lower.contains("debug") || goal_lower.contains("fix") {
337            card = card.with_capability("debugging");
338        }
339
340        card
341    }
342
343    /// Build an A2A agent card from a Directive (RFC-027).
344    ///
345    /// Mirrors [`build_agent_card`](Self::build_agent_card) but reads the
346    /// goal from a Directive. The card advertises `execute-directive` instead
347    /// of `execute-seed` so A2A consumers can distinguish the two paths.
348    fn build_agent_card_directive(
349        &self,
350        agent_id: AgentId,
351        agent_name: &str,
352        directive: &Directive,
353    ) -> AgentCard {
354        let goal_lower = directive.goal.to_lowercase();
355
356        let mut card = AgentCard::new(
357            agent_id,
358            agent_name,
359            format!("Agent executing directive: {}", directive.goal),
360        )
361        .with_capability("execute-directive")
362        .with_status(AgentStatus::Starting);
363
364        // Infer capabilities from goal.
365        if goal_lower.contains("review") || goal_lower.contains("code") {
366            card = card.with_capability("code-review");
367        }
368        if goal_lower.contains("test") {
369            card = card.with_capability("testing");
370        }
371        if goal_lower.contains("refactor") || goal_lower.contains("improve") {
372            card = card.with_capability("refactoring");
373        }
374        if goal_lower.contains("write")
375            || goal_lower.contains("create")
376            || goal_lower.contains("implement")
377        {
378            card = card.with_capability("code-generation");
379        }
380        if goal_lower.contains("debug") || goal_lower.contains("fix") {
381            card = card.with_capability("debugging");
382        }
383
384        card
385    }
386
387    /// Ensure default tool permissions exist for an agent.
388    ///
389    /// Applies config.toml `[security]` settings:
390    /// - `allowed_tools` → agent's tool set
391    /// - `network_access` → network permission
392    /// - workspace path → path sandbox
393    /// - RBAC `Superuser` role → allows all tools and paths
394    fn ensure_permissions(&self, agent_name: &str) {
395        let mut access = self.access_manager.lock();
396        let perms = access.get_or_create_permissions(agent_name);
397
398        // Grant all tools from config
399        for tool in &self.allowed_tools {
400            if !perms.allowed_tools.contains(tool.as_str()) {
401                perms.allow_tool(tool);
402            }
403        }
404
405        // Add workspace path to allowed paths
406        let ws_pattern = format!("{}/**", self.workspace_path.trim_end_matches('/'));
407        if !perms.allowed_paths.iter().any(|p| p == &ws_pattern) {
408            perms.allow_path(&ws_pattern);
409        }
410        // Also allow /tmp for agent temp files
411        if !perms.allowed_paths.iter().any(|p| p == "/tmp/**") {
412            perms.allow_path("/tmp/**");
413        }
414
415        // Apply network access from config
416        if self.network_access {
417            perms.enable_network();
418        }
419
420        // Assign Superuser RBAC role so AccessGate passes
421        // (config.toml already defines which tools are allowed)
422        let subject = Subject::Agent(
423            agent_name
424                .strip_prefix("agent-")
425                .and_then(|s| s.parse().ok())
426                .unwrap_or_default(),
427        );
428        access
429            .rbac_manager_mut()
430            .assign_role(subject, Role::Superuser);
431    }
432
433    /// Unregister A2A, complete/fail scheduler task.
434    async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
435        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
436            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
437        }
438        if result.success {
439            let _ = self.scheduler.complete_task(task_id);
440        } else {
441            let _ = self.scheduler.fail_task(task_id, &result.output);
442        }
443    }
444
445    /// Reap finished zombie tasks and log the cleanup.
446    pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
447        let reaped = self.scheduler.reap_zombies();
448        if !reaped.is_empty() {
449            tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
450            let mut access = self.access_manager.lock();
451            for task_id in &reaped {
452                access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
453            }
454        }
455        reaped
456    }
457
458    /// Cleanup when agent execution fails (no ExecutionResult available).
459    async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
460        if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
461            tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
462        }
463        let _ = self.scheduler.fail_task(task_id, "execution failed");
464    }
465}