use anyhow::{bail, Result};
use std::sync::Arc;
use tokio::time::{timeout, Duration};
use crate::a2a::{A2AProtocol, AgentCard};
use crate::access_manager::AccessManager;
use crate::event_bus::{EventBus, KernelEvent};
use crate::metrics::get_metrics;
use crate::scheduler::{AgentScheduler, Priority, ScheduledTask};
use crate::supervisor::Supervisor;
use crate::types::{AgentId, AgentStatus};
use oxios_ouroboros::{ExecutionResult, Seed};
/// Drives a single agent through its lifecycle: fork, A2A registration,
/// permission setup, task scheduling, execution, and cleanup.
///
/// All collaborators are held behind shared handles, and the type derives
/// `Clone`.
#[derive(Clone)]
pub struct AgentLifecycleManager {
/// Used to fork agents, run them against a seed, and kill them.
supervisor: Arc<dyn Supervisor>,
/// Tracks the task associated with each agent run (submit, start,
/// complete, fail) and reaps zombie tasks.
scheduler: Arc<AgentScheduler>,
/// Holds per-agent tool permissions and the access log; guarded by a
/// `parking_lot::Mutex`.
access_manager: Arc<parking_lot::Mutex<AccessManager>>,
/// Agent-to-agent protocol handle; its registry stores the agent cards and
/// it delivers pending messages to newly forked agents.
a2a: Arc<A2AProtocol>,
/// Receives `KernelEvent::AgentStopped` when an agent is terminated.
event_bus: EventBus,
/// Upper bound, in seconds, on a single agent run. `0` disables the timeout.
max_execution_time_secs: u64,
}
impl AgentLifecycleManager {
pub fn new(
supervisor: Arc<dyn Supervisor>,
scheduler: Arc<AgentScheduler>,
access_manager: Arc<parking_lot::Mutex<AccessManager>>,
a2a: Arc<A2AProtocol>,
event_bus: EventBus,
max_execution_time_secs: u64,
) -> Self {
Self {
supervisor,
scheduler,
access_manager,
a2a,
event_bus,
max_execution_time_secs,
}
}
pub async fn spawn_and_run(&self, seed: &Seed, priority: Priority) -> Result<ExecutionResult> {
let agent_id = self.supervisor.fork(seed).await?;
let agent_name = format!("agent-{}", agent_id);
tracing::info!(agent_id = %agent_id, seed_id = %seed.id, "Agent forked");
let card = self.build_agent_card(agent_id, &agent_name, seed);
if let Err(e) = self.a2a.registry().register_agent(card).await {
tracing::warn!(agent_id = %agent_id, error = %e, "Failed to register A2A card");
}
if let Err(e) = self.a2a.deliver_pending_messages(agent_id).await {
tracing::debug!(agent_id = %agent_id, error = %e, "No pending A2A messages");
}
self.ensure_permissions(&agent_name);
get_metrics().agents_forked.inc();
let task =
ScheduledTask::for_agent(agent_id, format!("Execute seed '{}'", seed.goal), priority);
let task_id = self.scheduler.submit(task)?;
self.scheduler.start_task(task_id)?;
let result = if self.max_execution_time_secs > 0 {
let exec_timeout = Duration::from_secs(self.max_execution_time_secs);
match timeout(exec_timeout, self.supervisor.run_with_seed(agent_id, seed)).await {
Ok(Ok(r)) => r,
Ok(Err(e)) => {
tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
self.cleanup_on_failure(agent_id, task_id).await;
return Err(e);
}
Err(_) => {
let secs = exec_timeout.as_secs();
tracing::warn!(agent_id = %agent_id, secs, "Agent execution timed out after {}s", secs);
self.cleanup_on_failure(agent_id, task_id).await;
bail!("Agent execution timed out after {} seconds", secs);
}
}
} else {
match self.supervisor.run_with_seed(agent_id, seed).await {
Ok(r) => r,
Err(e) => {
tracing::warn!(agent_id = %agent_id, error = %e, "Agent execution failed, cleaning up");
self.cleanup_on_failure(agent_id, task_id).await;
return Err(e);
}
}
};
self.cleanup(agent_id, task_id, &result).await;
Ok(result)
}
pub async fn terminate(&self, agent_id: AgentId) -> Result<()> {
if let Err(e) = self.supervisor.kill(agent_id).await {
tracing::warn!(agent_id = %agent_id, error = %e, "Failed to kill agent");
}
if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
}
let _ = self
.event_bus
.publish(KernelEvent::AgentStopped { id: agent_id });
Ok(())
}
/// Builds the A2A card advertised for a freshly forked agent.
///
/// Every card carries the `execute-seed` capability and the `Starting`
/// status. Further capabilities are added when the lower-cased seed goal
/// contains one of the associated keywords (plain substring match).
fn build_agent_card(&self, agent_id: AgentId, agent_name: &str, seed: &Seed) -> AgentCard {
    // Keyword groups and the capability each group unlocks. The order here is
    // the order in which capabilities are appended to the card.
    const KEYWORD_CAPABILITIES: [(&[&str], &str); 5] = [
        (&["review", "code"], "code-review"),
        (&["test"], "testing"),
        (&["refactor", "improve"], "refactoring"),
        (&["write", "create", "implement"], "code-generation"),
        (&["debug", "fix"], "debugging"),
    ];

    let goal = seed.goal.to_lowercase();
    let base = AgentCard::new(
        agent_id,
        agent_name,
        format!("Agent executing seed: {}", seed.goal),
    )
    .with_capability("execute-seed")
    .with_status(AgentStatus::Starting);

    KEYWORD_CAPABILITIES
        .iter()
        .filter(|(keywords, _)| keywords.iter().any(|kw| goal.contains(*kw)))
        .fold(base, |card, (_, capability)| card.with_capability(*capability))
}
/// Makes sure the named agent is allowed to use the default tool set.
///
/// Looks up (or creates) the agent's permission entry while holding the
/// access-manager lock and allows each default tool that is not already in
/// `allowed_tools`.
fn ensure_permissions(&self, agent_name: &str) {
    const DEFAULT_TOOLS: [&str; 8] = [
        "bash", "read", "write", "edit", "grep", "find", "exec", "ls",
    ];

    let mut guard = self.access_manager.lock();
    let perms = guard.get_or_create_permissions(agent_name);
    for tool in DEFAULT_TOOLS {
        // Skip tools that were granted earlier.
        if perms.allowed_tools.contains(tool) {
            continue;
        }
        perms.allow_tool(tool);
    }
}
async fn cleanup(&self, agent_id: AgentId, task_id: uuid::Uuid, result: &ExecutionResult) {
if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
}
if result.success {
let _ = self.scheduler.complete_task(task_id);
} else {
let _ = self.scheduler.fail_task(task_id, &result.output);
}
}
/// Reaps zombie tasks from the scheduler and records each one in the access log.
///
/// Returns the ids reported by the scheduler. When that list is empty nothing
/// is logged and the access-manager lock is not taken.
pub fn reap_zombies(&self) -> Vec<uuid::Uuid> {
    let zombies = self.scheduler.reap_zombies();
    if zombies.is_empty() {
        return zombies;
    }

    tracing::warn!(count = zombies.len(), "Zombie tasks reaped");

    // One audit entry per reaped task, all written under a single lock.
    let mut audit = self.access_manager.lock();
    zombies.iter().for_each(|id| {
        audit.log_access("scheduler", "zombie_reap", &id.to_string(), true, None);
    });
    drop(audit);

    zombies
}
async fn cleanup_on_failure(&self, agent_id: AgentId, task_id: uuid::Uuid) {
if let Err(e) = self.a2a.registry().unregister_agent(agent_id).await {
tracing::warn!(agent_id = %agent_id, error = %e, "Failed to unregister A2A card");
}
let _ = self.scheduler.fail_task(task_id, "execution failed");
}
}