1use anyhow::{bail, Result};
8use std::sync::Arc;
9
10use tokio::time::{timeout, Duration};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::AccessManager;
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{ExecutionResult, Seed};
20
21pub struct AgentLifecycleManager {
23 supervisor: Arc<dyn Supervisor>,
24 scheduler: Arc<AgentScheduler>,
25 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
26 a2a: Arc<A2AProtocol>,
27 event_bus: EventBus,
28 max_execution_time_secs: std::sync::atomic::AtomicU64,
30}
31
32impl Clone for AgentLifecycleManager {
33 fn clone(&self) -> Self {
34 Self {
35 supervisor: self.supervisor.clone(),
36 scheduler: self.scheduler.clone(),
37 access_manager: self.access_manager.clone(),
38 a2a: self.a2a.clone(),
39 event_bus: self.event_bus.clone(),
40 max_execution_time_secs: std::sync::atomic::AtomicU64::new(
41 self.max_execution_time_secs
42 .load(std::sync::atomic::Ordering::Relaxed),
43 ),
44 }
45 }
46}
47
48impl AgentLifecycleManager {
49 pub fn new(
51 supervisor: Arc<dyn Supervisor>,
52 scheduler: Arc<AgentScheduler>,
53 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
54 a2a: Arc<A2AProtocol>,
55 event_bus: EventBus,
56 max_execution_time_secs: u64,
57 ) -> Self {
58 Self {
59 supervisor,
60 scheduler,
61 access_manager,
62 a2a,
63 event_bus,
64 max_execution_time_secs: std::sync::atomic::AtomicU64::new(max_execution_time_secs),
65 }
66 }
67
68 pub fn set_max_execution_time(&self, secs: u64) {
70 self.max_execution_time_secs
71 .store(secs, std::sync::atomic::Ordering::Relaxed);
72 tracing::info!(
73 max_execution_time_secs = secs,
74 "Lifecycle config hot-reloaded"
75 );
76 }
77
78 pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
81 let agent_id = self.supervisor.fork(seed).await?;
83 let agent_name = format!("agent-{agent_id}");
84 tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
85
86 let card = self.build_agent_card(agent_id, &agent_name, seed);
88 if let Err(e) = self.a2a.registry().register_agent(card).await {
89 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
90 }
91
92 if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
94 tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
95 }
96
97 self.ensure_permissions(&agent_name);
99
100 get_metrics().agents_forked.inc();
102 let task =
103 ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
104 let task_id = self.scheduler.submit(task)?;
105 self.scheduler.start_task(task_id)?;
106
107 let max_secs = self
109 .max_execution_time_secs
110 .load(std::sync::atomic::Ordering::Relaxed);
111 let result = if max_secs > 0 {
112 let exec_timeout = Duration::from_secs(max_secs);
113 match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
114 Ok(Ok(r)) => r,
115 Ok(Err(e)) => {
116 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
117 self.cleanup_on_failure(agent_id, task_id).await;
118 return Err(e);
119 }
120 Err(_) => {
121 let secs = exec_timeout.as_secs();
122 tracing::warn!(agent_id = %agent_id, secs, "Agent execution timed out after {}s", secs);
123 self.cleanup_on_failure(agent_id, task_id).await;
124 bail!("Agent execution timed out after {secs} seconds");
125 }
126 }
127 } else {
128 match self.supervisor.run_with_seed(agent_id, seed).await {
129 Ok(r) => r,
130 Err(e) => {
131 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
132 self.cleanup_on_failure(agent_id, task_id).await;
133 return Err(e);
134 }
135 }
136 };
137
138 self.cleanup(agent_id, task_id, &result).await;
140
141 Ok(result)
142 }
143
144 pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
146 if let Err(e) = self.supervisor.kill(agent_id).await {
147 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
148 }
149 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
150 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
151 }
152 let _ = self
153 .event_bus
154 .publish(KernelEvent::AgentStopped { id: agent_id });
155 Ok(())
156 }
157
158 fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
160 let goal_lower = seed.goal.to_lowercase();
161
162 let mut card = AgentCard::new(
163 agent_id,
164 agent_name,
165 format!("Agent executing seed: {}", seed.goal),
166 )
167 .with_capability("execute-seed")
168 .with_status(AgentStatus::Starting);
169
170 if goal_lower.contains("review") || goal_lower.contains("code") {
172 card = card.with_capability("code-review");
173 }
174 if goal_lower.contains("test") {
175 card = card.with_capability("testing");
176 }
177 if goal_lower.contains("refactor") || goal_lower.contains("improve") {
178 card = card.with_capability("refactoring");
179 }
180 if goal_lower.contains("write")
181 || goal_lower.contains("create")
182 || goal_lower.contains("implement")
183 {
184 card = card.with_capability("code-generation");
185 }
186 if goal_lower.contains("debug") || goal_lower.contains("fix") {
187 card = card.with_capability("debugging");
188 }
189
190 card
191 }
192
193 fn ensure_permissions(&self, agent_name: &str) {
195 let mut access = self.access_manager.lock();
196 let perms = access.get_or_create_permissions(agent_name);
197 for tool in [
199 "bash", "read", "write", "edit", "grep", "find", "exec", "ls",
200 ] {
201 if !perms.allowed_tools.contains(tool) {
202 perms.allow_tool(tool);
203 }
204 }
205 }
206
207 async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
209 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
210 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
211 }
212 if result.success {
213 let _ = self.scheduler.complete_task(task_id);
214 } else {
215 let _ = self.scheduler.fail_task(task_id, &result.output);
216 }
217 }
218
219 pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
221 let reaped = self.scheduler.reap_zombies();
222 if !reaped.is_empty() {
223 tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
224 let mut access = self.access_manager.lock();
225 for task_id in &reaped {
226 access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
227 }
228 }
229 reaped
230 }
231
232 async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
234 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
235 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
236 }
237 let _ = self.scheduler.fail_task(task_id, "execution failed");
238 }
239}