1use anyhow::{Result, bail};
8use std::sync::Arc;
9
10use tokio::time::{Duration, timeout};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::{AccessManager, Role, Subject};
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{Directive, ExecEnv, ExecutionResult, Seed};
20
21pub struct AgentLifecycleManager {
23 supervisor: Arc<dyn Supervisor>,
24 scheduler: Arc<AgentScheduler>,
25 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
26 a2a: Arc<A2AProtocol>,
27 event_bus: EventBus,
28 max_execution_time_secs: std::sync::atomic::AtomicU64,
30 allowed_tools: Vec<String>,
32 network_access: bool,
34 workspace_path: String,
36}
37
38impl Clone for AgentLifecycleManager {
39 fn clone(&self) -> Self {
40 Self {
41 supervisor: self.supervisor.clone(),
42 scheduler: self.scheduler.clone(),
43 access_manager: self.access_manager.clone(),
44 a2a: self.a2a.clone(),
45 event_bus: self.event_bus.clone(),
46 max_execution_time_secs: std::sync::atomic::AtomicU64::new(
47 self.max_execution_time_secs
48 .load(std::sync::atomic::Ordering::Relaxed),
49 ),
50 allowed_tools: self.allowed_tools.clone(),
51 network_access: self.network_access,
52 workspace_path: self.workspace_path.clone(),
53 }
54 }
55}
56
57impl AgentLifecycleManager {
58 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 supervisor: Arc<dyn Supervisor>,
62 scheduler: Arc<AgentScheduler>,
63 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
64 a2a: Arc<A2AProtocol>,
65 event_bus: EventBus,
66 max_execution_time_secs: u64,
67 allowed_tools: Vec<String>,
68 network_access: bool,
69 workspace_path: String,
70 ) -> Self {
71 Self {
72 supervisor,
73 scheduler,
74 access_manager,
75 a2a,
76 event_bus,
77 max_execution_time_secs: std::sync::atomic::AtomicU64::new(max_execution_time_secs),
78 allowed_tools,
79 network_access,
80 workspace_path,
81 }
82 }
83
84 pub fn set_max_execution_time(&self, secs: u64) {
86 self.max_execution_time_secs
87 .store(secs, std::sync::atomic::Ordering::Relaxed);
88 tracing::info!(
89 max_execution_time_secs = secs,
90 "Lifecycle config hot-reloaded"
91 );
92 }
93
94 pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
97 let agent_id = self.supervisor.fork(seed).await?;
99 let agent_name = format!("agent-{agent_id}");
100 tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
101
102 let card = self.build_agent_card(agent_id, &agent_name, seed);
104 if let Err(e) = self.a2a.registry().register_agent(card).await {
105 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
106 }
107
108 if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
110 tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
111 }
112
113 self.ensure_permissions(&agent_name);
115
116 get_metrics().agents_forked.inc();
118 let task =
119 ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
120 let task_id = self.scheduler.submit(task)?;
121 self.scheduler.start_task(task_id)?;
122
123 let max_secs = self
125 .max_execution_time_secs
126 .load(std::sync::atomic::Ordering::Relaxed);
127 let result = if max_secs > 0 {
128 let exec_timeout = Duration::from_secs(max_secs);
129 match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
130 Ok(Ok(r)) => r,
131 Ok(Err(e)) => {
132 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
133 self.cleanup_on_failure(agent_id, task_id).await;
134 return Err(e);
135 }
136 Err(_) => {
137 let secs = exec_timeout.as_secs();
138 tracing::warn!(
139 agent_id = %agent_id,
140 secs,
141 "Agent execution timed out after {}s",
142 secs
143 );
144 let _ = self.supervisor.kill(agent_id).await;
148 self.cleanup_on_failure(agent_id, task_id).await;
149 bail!("Agent execution timed out after {secs} seconds");
150 }
151 }
152 } else {
153 match self.supervisor.run_with_seed(agent_id, seed).await {
154 Ok(r) => r,
155 Err(e) => {
156 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
157 self.cleanup_on_failure(agent_id, task_id).await;
158 return Err(e);
159 }
160 }
161 };
162
163 self.cleanup(agent_id, task_id, &result).await;
165
166 Ok(result)
167 }
168
169 pub async fn execute_directive(
175 &self,
176 directive: &Directive,
177 env: &ExecEnv,
178 priority: Priority,
179 ) -> Result<ExecutionResult> {
180 let agent_id = self.supervisor.fork_directive(directive, env).await?;
182 let agent_name = format!("agent-{agent_id}");
183 tracing::info!(agent_id = %agent_id, "Agent forked from directive");
184
185 let card = self.build_agent_card_directive(agent_id, &agent_name, directive);
187 if let Err(e) = self.a2a.registry().register_agent(card).await {
188 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
189 }
190
191 if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
193 tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
194 }
195
196 self.ensure_permissions(&agent_name);
198
199 get_metrics().agents_forked.inc();
201 let task = ScheduledTask::for_agent(
202 agent_id,
203 format!("Execute directive '{}'", directive.goal),
204 priority,
205 );
206 let task_id = self.scheduler.submit(task)?;
207 self.scheduler.start_task(task_id)?;
208
209 let max_secs = self
211 .max_execution_time_secs
212 .load(std::sync::atomic::Ordering::Relaxed);
213 let result = if max_secs > 0 {
214 let exec_timeout = Duration::from_secs(max_secs);
215 match timeout(
216 exec_timeout,
217 self.supervisor.run_with_directive(agent_id, directive, env),
218 )
219 .await
220 {
221 Ok(Ok(r)) => r,
222 Ok(Err(e)) => {
223 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
224 self.cleanup_on_failure(agent_id, task_id).await;
225 return Err(e);
226 }
227 Err(_) => {
228 let secs = exec_timeout.as_secs();
229 tracing::warn!(
230 agent_id = %agent_id,
231 secs,
232 "Agent execution timed out after {}s",
233 secs
234 );
235 let _ = self.supervisor.kill(agent_id).await;
239 self.cleanup_on_failure(agent_id, task_id).await;
240 bail!("Agent execution timed out after {secs} seconds");
241 }
242 }
243 } else {
244 match self
245 .supervisor
246 .run_with_directive(agent_id, directive, env)
247 .await
248 {
249 Ok(r) => r,
250 Err(e) => {
251 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
252 self.cleanup_on_failure(agent_id, task_id).await;
253 return Err(e);
254 }
255 }
256 };
257
258 self.cleanup(agent_id, task_id, &result).await;
260
261 Ok(result)
262 }
263
264 pub async fn execute_with_feedback(
269 &self,
270 directive: &Directive,
271 env: &ExecEnv,
272 prev_result: &ExecutionResult,
273 gaps: &[String],
274 priority: Priority,
275 ) -> Result<ExecutionResult> {
276 let mut augmented = directive.clone();
278 let feedback = format!(
279 "## Previous attempt failed\n{}\n\n## Unmet criteria\n{}\n\n\
280 Review the above output and fix the unmet criteria.",
281 prev_result.output,
282 gaps.iter()
283 .enumerate()
284 .map(|(i, g)| format!("{}. {g}", i + 1))
285 .collect::<Vec<_>>()
286 .join("\n")
287 );
288 augmented.constraints.push(feedback);
289
290 self.execute_directive(&augmented, env, priority).await
291 }
292
293 pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
295 if let Err(e) = self.supervisor.kill(agent_id).await {
296 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
297 }
298 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
299 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
300 }
301 let _ = self.event_bus.publish(KernelEvent::AgentStopped {
302 id: agent_id,
303 success: false,
304 });
305 Ok(())
306 }
307
308 fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
310 let goal_lower = seed.goal.to_lowercase();
311
312 let mut card = AgentCard::new(
313 agent_id,
314 agent_name,
315 format!("Agent executing seed: {}", seed.goal),
316 )
317 .with_capability("execute-seed")
318 .with_status(AgentStatus::Starting);
319
320 if goal_lower.contains("review") || goal_lower.contains("code") {
322 card = card.with_capability("code-review");
323 }
324 if goal_lower.contains("test") {
325 card = card.with_capability("testing");
326 }
327 if goal_lower.contains("refactor") || goal_lower.contains("improve") {
328 card = card.with_capability("refactoring");
329 }
330 if goal_lower.contains("write")
331 || goal_lower.contains("create")
332 || goal_lower.contains("implement")
333 {
334 card = card.with_capability("code-generation");
335 }
336 if goal_lower.contains("debug") || goal_lower.contains("fix") {
337 card = card.with_capability("debugging");
338 }
339
340 card
341 }
342
343 fn build_agent_card_directive(
349 &self,
350 agent_id: AgentId,
351 agent_name: &str,
352 directive: &Directive,
353 ) -> AgentCard {
354 let goal_lower = directive.goal.to_lowercase();
355
356 let mut card = AgentCard::new(
357 agent_id,
358 agent_name,
359 format!("Agent executing directive: {}", directive.goal),
360 )
361 .with_capability("execute-directive")
362 .with_status(AgentStatus::Starting);
363
364 if goal_lower.contains("review") || goal_lower.contains("code") {
366 card = card.with_capability("code-review");
367 }
368 if goal_lower.contains("test") {
369 card = card.with_capability("testing");
370 }
371 if goal_lower.contains("refactor") || goal_lower.contains("improve") {
372 card = card.with_capability("refactoring");
373 }
374 if goal_lower.contains("write")
375 || goal_lower.contains("create")
376 || goal_lower.contains("implement")
377 {
378 card = card.with_capability("code-generation");
379 }
380 if goal_lower.contains("debug") || goal_lower.contains("fix") {
381 card = card.with_capability("debugging");
382 }
383
384 card
385 }
386
387 fn ensure_permissions(&self, agent_name: &str) {
395 let mut access = self.access_manager.lock();
396 let perms = access.get_or_create_permissions(agent_name);
397
398 for tool in &self.allowed_tools {
400 if !perms.allowed_tools.contains(tool.as_str()) {
401 perms.allow_tool(tool);
402 }
403 }
404
405 let ws_pattern = format!("{}/**", self.workspace_path.trim_end_matches('/'));
407 if !perms.allowed_paths.iter().any(|p| p == &ws_pattern) {
408 perms.allow_path(&ws_pattern);
409 }
410 if !perms.allowed_paths.iter().any(|p| p == "/tmp/**") {
412 perms.allow_path("/tmp/**");
413 }
414
415 if self.network_access {
417 perms.enable_network();
418 }
419
420 let subject = Subject::Agent(
423 agent_name
424 .strip_prefix("agent-")
425 .and_then(|s| s.parse().ok())
426 .unwrap_or_default(),
427 );
428 access
429 .rbac_manager_mut()
430 .assign_role(subject, Role::Superuser);
431 }
432
433 async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
435 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
436 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
437 }
438 if result.success {
439 let _ = self.scheduler.complete_task(task_id);
440 } else {
441 let _ = self.scheduler.fail_task(task_id, &result.output);
442 }
443 }
444
445 pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
447 let reaped = self.scheduler.reap_zombies();
448 if !reaped.is_empty() {
449 tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
450 let mut access = self.access_manager.lock();
451 for task_id in &reaped {
452 access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
453 }
454 }
455 reaped
456 }
457
458 async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
460 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
461 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
462 }
463 let _ = self.scheduler.fail_task(task_id, "execution failed");
464 }
465}