use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use anyhow::{Result, bail};
use oxios_ouroboros::{ExecutionResult, Seed};
use tokio::time::{Duration, timeout};

use crate::a2a::{A2AProtocol, AgentCard};
use crate::access_manager::{AccessManager, Role, Subject};
use crate::event_bus::{EventBus, KernelEvent};
use crate::metrics::get_metrics;
use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
use crate::supervisor::Supervisor;
use crate::types::{AgentId, AgentStatus};
20
21pub struct AgentLifecycleManager {
23 supervisor: Arc<dyn Supervisor>,
24 scheduler: Arc<AgentScheduler>,
25 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
26 a2a: Arc<A2AProtocol>,
27 event_bus: EventBus,
28 max_execution_time_secs: std::sync::atomic::AtomicU64,
30 allowed_tools: Vec<String>,
32 network_access: bool,
34 workspace_path: String,
36}
37
38impl Clone for AgentLifecycleManager {
39 fn clone(&self) -> Self {
40 Self {
41 supervisor: self.supervisor.clone(),
42 scheduler: self.scheduler.clone(),
43 access_manager: self.access_manager.clone(),
44 a2a: self.a2a.clone(),
45 event_bus: self.event_bus.clone(),
46 max_execution_time_secs: std::sync::atomic::AtomicU64::new(
47 self.max_execution_time_secs
48 .load(std::sync::atomic::Ordering::Relaxed),
49 ),
50 allowed_tools: self.allowed_tools.clone(),
51 network_access: self.network_access,
52 workspace_path: self.workspace_path.clone(),
53 }
54 }
55}
56
57impl AgentLifecycleManager {
58 #[allow(clippy::too_many_arguments)]
60 pub fn new(
61 supervisor: Arc<dyn Supervisor>,
62 scheduler: Arc<AgentScheduler>,
63 access_manager: Arc<parking_lot::Mutex<AccessManager>>,
64 a2a: Arc<A2AProtocol>,
65 event_bus: EventBus,
66 max_execution_time_secs: u64,
67 allowed_tools: Vec<String>,
68 network_access: bool,
69 workspace_path: String,
70 ) -> Self {
71 Self {
72 supervisor,
73 scheduler,
74 access_manager,
75 a2a,
76 event_bus,
77 max_execution_time_secs: std::sync::atomic::AtomicU64::new(max_execution_time_secs),
78 allowed_tools,
79 network_access,
80 workspace_path,
81 }
82 }
83
84 pub fn set_max_execution_time(&self, secs: u64) {
86 self.max_execution_time_secs
87 .store(secs, std::sync::atomic::Ordering::Relaxed);
88 tracing::info!(
89 max_execution_time_secs = secs,
90 "Lifecycle config hot-reloaded"
91 );
92 }
93
94 pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
97 let agent_id = self.supervisor.fork(seed).await?;
99 let agent_name = format!("agent-{agent_id}");
100 tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
101
102 let card = self.build_agent_card(agent_id, &agent_name, seed);
104 if let Err(e) = self.a2a.registry().register_agent(card).await {
105 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
106 }
107
108 if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
110 tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
111 }
112
113 self.ensure_permissions(&agent_name);
115
116 get_metrics().agents_forked.inc();
118 let task =
119 ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
120 let task_id = self.scheduler.submit(task)?;
121 self.scheduler.start_task(task_id)?;
122
123 let max_secs = self
125 .max_execution_time_secs
126 .load(std::sync::atomic::Ordering::Relaxed);
127 let result = if max_secs > 0 {
128 let exec_timeout = Duration::from_secs(max_secs);
129 match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
130 Ok(Ok(r)) => r,
131 Ok(Err(e)) => {
132 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
133 self.cleanup_on_failure(agent_id, task_id).await;
134 return Err(e);
135 }
136 Err(_) => {
137 let secs = exec_timeout.as_secs();
138 tracing::warn!(agent_id = %agent_id, secs, "Agent execution timed out after {}s", secs);
139 self.cleanup_on_failure(agent_id, task_id).await;
140 bail!("Agent execution timed out after {secs} seconds");
141 }
142 }
143 } else {
144 match self.supervisor.run_with_seed(agent_id, seed).await {
145 Ok(r) => r,
146 Err(e) => {
147 tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
148 self.cleanup_on_failure(agent_id, task_id).await;
149 return Err(e);
150 }
151 }
152 };
153
154 self.cleanup(agent_id, task_id, &result).await;
156
157 Ok(result)
158 }
159
160 pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
162 if let Err(e) = self.supervisor.kill(agent_id).await {
163 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
164 }
165 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
166 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
167 }
168 let _ = self
169 .event_bus
170 .publish(KernelEvent::AgentStopped { id: agent_id });
171 Ok(())
172 }
173
174 fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
176 let goal_lower = seed.goal.to_lowercase();
177
178 let mut card = AgentCard::new(
179 agent_id,
180 agent_name,
181 format!("Agent executing seed: {}", seed.goal),
182 )
183 .with_capability("execute-seed")
184 .with_status(AgentStatus::Starting);
185
186 if goal_lower.contains("review") || goal_lower.contains("code") {
188 card = card.with_capability("code-review");
189 }
190 if goal_lower.contains("test") {
191 card = card.with_capability("testing");
192 }
193 if goal_lower.contains("refactor") || goal_lower.contains("improve") {
194 card = card.with_capability("refactoring");
195 }
196 if goal_lower.contains("write")
197 || goal_lower.contains("create")
198 || goal_lower.contains("implement")
199 {
200 card = card.with_capability("code-generation");
201 }
202 if goal_lower.contains("debug") || goal_lower.contains("fix") {
203 card = card.with_capability("debugging");
204 }
205
206 card
207 }
208
209 fn ensure_permissions(&self, agent_name: &str) {
217 let mut access = self.access_manager.lock();
218 let perms = access.get_or_create_permissions(agent_name);
219
220 for tool in &self.allowed_tools {
222 if !perms.allowed_tools.contains(tool.as_str()) {
223 perms.allow_tool(tool);
224 }
225 }
226
227 let ws_pattern = format!("{}/**", self.workspace_path.trim_end_matches('/'));
229 if !perms.allowed_paths.iter().any(|p| p == &ws_pattern) {
230 perms.allow_path(&ws_pattern);
231 }
232 if !perms.allowed_paths.iter().any(|p| p == "/tmp/**") {
234 perms.allow_path("/tmp/**");
235 }
236
237 if self.network_access {
239 perms.enable_network();
240 }
241
242 let subject = Subject::Agent(
245 agent_name
246 .strip_prefix("agent-")
247 .and_then(|s| s.parse().ok())
248 .unwrap_or_default(),
249 );
250 access
251 .rbac_manager_mut()
252 .assign_role(subject, Role::Superuser);
253 }
254
255 async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
257 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
258 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
259 }
260 if result.success {
261 let _ = self.scheduler.complete_task(task_id);
262 } else {
263 let _ = self.scheduler.fail_task(task_id, &result.output);
264 }
265 }
266
267 pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
269 let reaped = self.scheduler.reap_zombies();
270 if !reaped.is_empty() {
271 tracing::warn!(count = reaped.len(), "Zombie tasks reaped");
272 let mut access = self.access_manager.lock();
273 for task_id in &reaped {
274 access.log_access("scheduler", "zombie_reap", &task_id.to_string(), true, None);
275 }
276 }
277 reaped
278 }
279
280 async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
282 if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
283 tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
284 }
285 let _ = self.scheduler.fail_task(task_id, "execution failed");
286 }
287}