1use anyhow::{bail, Result};
8use std::sync::Arc;
9
10use tokio::time::{timeout, Duration};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::AccessManager;
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{ExecutionResult, Seed};
20
/// Drives one agent run end to end: fork, A2A registration, permission
/// setup, scheduling, execution, and teardown.
///
/// Cloning is cheap with respect to the `Arc`-held collaborators: those
/// handles are shared, not duplicated.
#[derive(Clone)]
pub struct AgentLifecycleManager {
    /// Forks agents, runs them against a seed, and kills them.
    supervisor: Arc<dyn Supervisor>,
    /// Tracks the task that represents each agent run (submit, start,
    /// complete, fail) and reaps zombie tasks.
    scheduler: Arc<AgentScheduler>,
    /// Permission store and access log, guarded by a `parking_lot` mutex.
    access_manager: Arc<parking_lot::Mutex<AccessManager>>,
    /// A2A protocol handle: agent-card registry and pending-message delivery.
    a2a: Arc<A2AProtocol>,
    /// Bus on which `KernelEvent::AgentStopped` is published on termination.
    event_bus: EventBus,
    /// Upper bound for a single run, in seconds; `0` disables the timeout.
    max_execution_time_secs: u64,
}
32
33impl AgentLifecycleManager {
34 pub fn new(
36 supervisor: Arc<dyn Supervisor>,
37 scheduler: Arc<AgentScheduler>,
38 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
39 a2a: Arc<A2AProtocol>,
40 event_bus: EventBus,
41 max_execution_time_secs: u64,
42 ) -> Self {
43 Self {
44 supervisor,
45 scheduler,
46 access_manager,
47 a2a,
48 event_bus,
49 max_execution_time_secs,
50 }
51 }
52
53 pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
56 let agent_id = self.supervisor.fork(seed).await?;
58 let agent_name = format!("agent-{}", agent_id);
59 tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
60
61 let card = self.build_agent_card(agent_id, &agent_name, seed);
63 if let Err(e) = self.a2a.registry().register_agent(card).await {
64 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
65 }
66
67 if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
69 tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
70 }
71
72 self.ensure_permissions(&agent_name);
74
75 get_metrics().agents_forked.inc();
77 let task =
78 ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
79 let task_id = self.scheduler.submit(task)?;
80 self.scheduler.start_task(task_id)?;
81
82 let result = if self.max_execution_time_secs > 0 {
84 let exec_timeout = Duration::from_secs(self.max_execution_time_secs);
85 match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
86 Ok(Ok(r)) => r,
87 Ok(Err(e)) => {
88 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
89 self.cleanup_on_failure(agent_id, task_id).await;
90 return Err(e);
91 }
92 Err(_) => {
93 let secs = exec_timeout.as_secs();
94 tracing::warn!(agent_id = %agent_id, secs, "Agent execution timed out after {}s", secs);
95 self.cleanup_on_failure(agent_id, task_id).await;
96 bail!("Agent execution timed out after {} seconds", secs);
97 }
98 }
99 } else {
100 match self.supervisor.run_with_seed(agent_id, seed).await {
101 Ok(r) => r,
102 Err(e) => {
103 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
104 self.cleanup_on_failure(agent_id, task_id).await;
105 return Err(e);
106 }
107 }
108 };
109
110 self.cleanup(agent_id, task_id, &result).await;
112
113 Ok(result)
114 }
115
116 pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
118 if let Err(e) = self.supervisor.kill(agent_id).await {
119 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
120 }
121 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
122 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
123 }
124 let _ = self
125 .event_bus
126 .publish(KernelEvent::AgentStopped { id: agent_id });
127 Ok(())
128 }
129
130 fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
132 let goal_lower = seed.goal.to_lowercase();
133
134 let mut card = AgentCard::new(
135 agent_id,
136 agent_name,
137 format!("Agent executing seed: {}", seed.goal),
138 )
139 .with_capability("execute-seed")
140 .with_status(AgentStatus::Starting);
141
142 if goal_lower.contains("review") || goal_lower.contains("code") {
144 card = card.with_capability("code-review");
145 }
146 if goal_lower.contains("test") {
147 card = card.with_capability("testing");
148 }
149 if goal_lower.contains("refactor") || goal_lower.contains("improve") {
150 card = card.with_capability("refactoring");
151 }
152 if goal_lower.contains("write")
153 || goal_lower.contains("create")
154 || goal_lower.contains("implement")
155 {
156 card = card.with_capability("code-generation");
157 }
158 if goal_lower.contains("debug") || goal_lower.contains("fix") {
159 card = card.with_capability("debugging");
160 }
161
162 card
163 }
164
165 fn ensure_permissions(&self, agent_name: &str) {
167 let mut access = self.access_manager.lock();
168 for tool in ["bash", "read", "write", "edit", "grep", "find"] {
169 access.can_use_tool(agent_name, tool);
170 }
171 if access.get_permissions(agent_name).is_none() {
172 tracing::warn!(agent = %agent_name, "Agent has no permissions, using default");
173 access.get_or_create_permissions(agent_name);
174 }
175 }
176
177 async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
179 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
180 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
181 }
182 if result.success {
183 let _ = self.scheduler.complete_task(task_id);
184 } else {
185 let _ = self.scheduler.fail_task(task_id, &result.output);
186 }
187 }
188
189 pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
191 let reaped = self.scheduler.reap_zombies();
192 if !reaped.is_empty() {
193 tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
194 let mut access = self.access_manager.lock();
195 for task_id in &reaped {
196 access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
197 }
198 }
199 reaped
200 }
201
202 async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
204 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
205 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
206 }
207 let _ = self.scheduler.fail_task(task_id, "execution failed");
208 }
209}