1use anyhow::{Result, bail};
8use std::sync::Arc;
9
10use tokio::time::{Duration, timeout};
11
12use crate::a2a::{A2AProtocol, AgentCard};
13use crate::access_manager::{AccessManager, Role, Subject};
14use crate::event_bus::{EventBus, KernelEvent};
15use crate::metrics::get_metrics;
16use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
17use crate::supervisor::Supervisor;
18use crate::types::{AgentId, AgentStatus};
19use oxios_ouroboros::{ExecutionResult, Seed};
20
/// Ties together the components involved in running an agent: the supervisor,
/// the task scheduler, access control, the A2A protocol handle and the kernel
/// event bus, plus the static configuration applied to every spawned agent.
pub struct AgentLifecycleManager {
    /// Forks, runs and kills agents.
    supervisor: Arc<dyn Supervisor>,
    /// Receives one task per agent run and tracks its start/complete/fail state.
    scheduler: Arc<AgentScheduler>,
    /// Shared access-control state: per-agent permissions, RBAC roles, access log.
    access_manager: Arc<parking_lot::Mutex<AccessManager>>,
    /// A2A protocol handle; its registry is where agent cards are (un)registered.
    a2a: Arc<A2AProtocol>,
    /// Bus on which `KernelEvent`s are published.
    event_bus: EventBus,
    /// Time limit for a single agent run, in seconds; `0` means no limit.
    /// Stored in an atomic so it can be changed through `&self`.
    max_execution_time_secs: std::sync::atomic::AtomicU64,
    /// Tools allowed for every agent before it runs.
    allowed_tools: Vec<String>,
    /// When `true`, network access is enabled in each agent's permissions.
    network_access: bool,
    /// Workspace root; agents are allowed the path pattern `<workspace_path>/**`.
    workspace_path: String,
}
37
38impl Clone for AgentLifecycleManager {
39 fn clone(&self) -> Self {
40 Self {
41 supervisor: self.supervisor.clone(),
42 scheduler: self.scheduler.clone(),
43 access_manager: self.access_manager.clone(),
44 a2a: self.a2a.clone(),
45 event_bus: self.event_bus.clone(),
46 max_execution_time_secs: std::sync::atomic::AtomicU64::new(
47 self.max_execution_time_secs
48 .load(std::sync::atomic::Ordering::Relaxed),
49 ),
50 allowed_tools: self.allowed_tools.clone(),
51 network_access: self.network_access,
52 workspace_path: self.workspace_path.clone(),
53 }
54 }
55}
56
57impl AgentLifecycleManager {
58 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 supervisor: Arc<dyn Supervisor>,
62 scheduler: Arc<AgentScheduler>,
63 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
64 a2a: Arc<A2AProtocol>,
65 event_bus: EventBus,
66 max_execution_time_secs: u64,
67 allowed_tools: Vec<String>,
68 network_access: bool,
69 workspace_path: String,
70 ) -> Self {
71 Self {
72 supervisor,
73 scheduler,
74 access_manager,
75 a2a,
76 event_bus,
77 max_execution_time_secs: std::sync::atomic::AtomicU64::new(max_execution_time_secs),
78 allowed_tools,
79 network_access,
80 workspace_path,
81 }
82 }
83
84 pub fn set_max_execution_time(&self, secs: u64) {
86 self.max_execution_time_secs
87 .store(secs, std::sync::atomic::Ordering::Relaxed);
88 tracing::info!(
89 max_execution_time_secs = secs,
90 "Lifecycle config hot-reloaded"
91 );
92 }
93
94 pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
97 let agent_id = self.supervisor.fork(seed).await?;
99 let agent_name = format!("agent-{agent_id}");
100 tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
101
102 let card = self.build_agent_card(agent_id, &agent_name, seed);
104 if let Err(e) = self.a2a.registry().register_agent(card).await {
105 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
106 }
107
108 if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
110 tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
111 }
112
113 self.ensure_permissions(&agent_name);
115
116 get_metrics().agents_forked.inc();
118 let task =
119 ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
120 let task_id = self.scheduler.submit(task)?;
121 self.scheduler.start_task(task_id)?;
122
123 let max_secs = self
125 .max_execution_time_secs
126 .load(std::sync::atomic::Ordering::Relaxed);
127 let result = if max_secs > 0 {
128 let exec_timeout = Duration::from_secs(max_secs);
129 match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
130 Ok(Ok(r)) => r,
131 Ok(Err(e)) => {
132 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
133 self.cleanup_on_failure(agent_id, task_id).await;
134 return Err(e);
135 }
136 Err(_) => {
137 let secs = exec_timeout.as_secs();
138 tracing::warn!(
139 agent_id = %agent_id,
140 secs,
141 "Agent execution timed out after {}s",
142 secs
143 );
144 let _ = self.supervisor.kill(agent_id).await;
148 self.cleanup_on_failure(agent_id, task_id).await;
149 bail!("Agent execution timed out after {secs} seconds");
150 }
151 }
152 } else {
153 match self.supervisor.run_with_seed(agent_id, seed).await {
154 Ok(r) => r,
155 Err(e) => {
156 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
157 self.cleanup_on_failure(agent_id, task_id).await;
158 return Err(e);
159 }
160 }
161 };
162
163 self.cleanup(agent_id, task_id, &result).await;
165
166 Ok(result)
167 }
168
169 pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
171 if let Err(e) = self.supervisor.kill(agent_id).await {
172 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
173 }
174 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
175 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
176 }
177 let _ = self
178 .event_bus
179 .publish(KernelEvent::AgentStopped { id: agent_id });
180 Ok(())
181 }
182
183 fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
185 let goal_lower = seed.goal.to_lowercase();
186
187 let mut card = AgentCard::new(
188 agent_id,
189 agent_name,
190 format!("Agent executing seed: {}", seed.goal),
191 )
192 .with_capability("execute-seed")
193 .with_status(AgentStatus::Starting);
194
195 if goal_lower.contains("review") || goal_lower.contains("code") {
197 card = card.with_capability("code-review");
198 }
199 if goal_lower.contains("test") {
200 card = card.with_capability("testing");
201 }
202 if goal_lower.contains("refactor") || goal_lower.contains("improve") {
203 card = card.with_capability("refactoring");
204 }
205 if goal_lower.contains("write")
206 || goal_lower.contains("create")
207 || goal_lower.contains("implement")
208 {
209 card = card.with_capability("code-generation");
210 }
211 if goal_lower.contains("debug") || goal_lower.contains("fix") {
212 card = card.with_capability("debugging");
213 }
214
215 card
216 }
217
218 fn ensure_permissions(&self, agent_name: &str) {
226 let mut access = self.access_manager.lock();
227 let perms = access.get_or_create_permissions(agent_name);
228
229 for tool in &self.allowed_tools {
231 if !perms.allowed_tools.contains(tool.as_str()) {
232 perms.allow_tool(tool);
233 }
234 }
235
236 let ws_pattern = format!("{}/**", self.workspace_path.trim_end_matches('/'));
238 if !perms.allowed_paths.iter().any(|p| p == &ws_pattern) {
239 perms.allow_path(&ws_pattern);
240 }
241 if !perms.allowed_paths.iter().any(|p| p == "/tmp/**") {
243 perms.allow_path("/tmp/**");
244 }
245
246 if self.network_access {
248 perms.enable_network();
249 }
250
251 let subject = Subject::Agent(
254 agent_name
255 .strip_prefix("agent-")
256 .and_then(|s| s.parse().ok())
257 .unwrap_or_default(),
258 );
259 access
260 .rbac_manager_mut()
261 .assign_role(subject, Role::Superuser);
262 }
263
264 async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
266 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
267 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
268 }
269 if result.success {
270 let _ = self.scheduler.complete_task(task_id);
271 } else {
272 let _ = self.scheduler.fail_task(task_id, &result.output);
273 }
274 }
275
276 pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
278 let reaped = self.scheduler.reap_zombies();
279 if !reaped.is_empty() {
280 tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
281 let mut access = self.access_manager.lock();
282 for task_id in &reaped {
283 access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
284 }
285 }
286 reaped
287 }
288
289 async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
291 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
292 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
293 }
294 let _ = self.scheduler.fail_task(task_id, "execution failed");
295 }
296}