#![allow(dead_code)]
use anyhow::Result;
use dashmap::DashMap;
use futures::future::join_all;
use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};
use tracing::{debug, info, warn};
pub mod context;
pub mod executor;
pub mod types;
pub use context::AgentContext;
pub use executor::AgentExecutor;
pub use types::{Agent, AgentId, AgentResult, AgentTask, TaskType};
/// Number of permits in the semaphore created by `AgentSystem::new`.
/// `spawn_agent` acquires one permit per agent, so this bounds how many
/// agents started through it can run at the same time.
const MAX_CONCURRENT_AGENTS: usize = 10;
/// Owns the shared executor, the registry of spawned agents, the sender side
/// of the task queue, and the semaphore that limits agent concurrency.
pub struct AgentSystem {
/// Shared executor: runs tasks received from the queue and stores/collects
/// agent results.
executor: Arc<AgentExecutor>,
/// Handles for agents started through `spawn_agent`, keyed by agent id.
active_agents: DashMap<AgentId, AgentHandle>,
/// Sender half of the bounded channel whose receiver is drained by the
/// background task started in `new`.
task_queue: mpsc::Sender<AgentTask>,
/// Concurrency gate for `spawn_agent`; created with `MAX_CONCURRENT_AGENTS`
/// permits.
semaphore: Arc<Semaphore>,
}
/// Bookkeeping entry kept in `active_agents` for one spawned agent.
struct AgentHandle {
/// The task the agent was spawned with.
task: AgentTask,
/// Handle to the agent's Tokio task; used to abort it and to check whether
/// it has finished.
abort_handle: tokio::task::AbortHandle,
}
impl AgentSystem {
pub fn new() -> Self {
let (tx, mut rx) = mpsc::channel::<AgentTask>(1000);
let executor = Arc::new(AgentExecutor::new());
let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_AGENTS));
let system = Self {
executor: executor.clone(),
active_agents: DashMap::new(),
task_queue: tx,
semaphore,
};
tokio::spawn(async move {
while let Some(task) = rx.recv().await {
let exec = executor.clone();
tokio::spawn(async move {
if let Err(e) = exec.execute(task).await {
warn!("Agent task failed: {}", e);
}
});
}
});
system
}
/// Starts `agent` on its own Tokio task and registers it in `active_agents`.
///
/// The call first waits for a semaphore permit, so it does not return until
/// a concurrency slot is free. The permit is moved into the spawned task and
/// is released when that task's future is dropped (on completion or abort).
///
/// On success the agent's result is handed to `AgentExecutor::store_result`;
/// on failure the error is logged with `warn!`. Neither outcome is reported
/// back to the caller of this method.
///
/// Before the new handle is registered, handles of agents that have already
/// finished are removed so the registry does not grow without bound when
/// `wait_for_all` is never called.
///
/// # Errors
///
/// Returns an error if acquiring the semaphore permit fails.
pub async fn spawn_agent<A>(&self, agent: A, task: AgentTask) -> Result<AgentId>
where
    A: Agent + Send + Sync + 'static,
{
    let agent_id = AgentId::new();
    let permit = self.semaphore.clone().acquire_owned().await?;
    let context = AgentContext::new(agent_id.clone(), task.clone());
    let executor = self.executor.clone();
    let agent_id_clone = agent_id.clone();
    let task_clone = task.clone();

    let handle = tokio::spawn(async move {
        // Keep the permit alive for the whole lifetime of the agent task.
        let _permit = permit;
        debug!("Agent {} started task: {:?}", agent_id_clone, task_clone);
        match agent.execute(context).await {
            Ok(result) => {
                debug!("Agent {} completed successfully", agent_id_clone);
                // NOTE(review): the outcome of `store_result` is deliberately
                // ignored here, as in the original — confirm that is intended.
                let _ = executor.store_result(agent_id_clone.clone(), result).await;
            }
            Err(e) => {
                warn!("Agent {} failed: {}", agent_id_clone, e);
            }
        }
    });

    // Drop bookkeeping for agents that already ran to completion; otherwise
    // every finished agent (and its cloned task) stays in the map forever.
    self.active_agents
        .retain(|_, active| !active.abort_handle.is_finished());

    self.active_agents.insert(
        agent_id.clone(),
        AgentHandle {
            task,
            abort_handle: handle.abort_handle(),
        },
    );
    Ok(agent_id)
}
/// Spawns every `(agent, task)` pair via `spawn_agent` and returns their ids
/// in input order.
///
/// All spawn futures are driven concurrently with `join_all` and awaited to
/// completion before any result is inspected.
///
/// # Errors
///
/// Returns the first error in input order. Agents that were spawned
/// successfully are not aborted when another spawn fails.
pub async fn spawn_agents<A>(&self, agents: Vec<(A, AgentTask)>) -> Result<Vec<AgentId>>
where
    A: Agent + Send + Sync + 'static,
{
    let pending: Vec<_> = agents
        .into_iter()
        .map(|(agent, task)| self.spawn_agent(agent, task))
        .collect();

    let mut ids = Vec::with_capacity(pending.len());
    for outcome in join_all(pending).await {
        ids.push(outcome?);
    }
    Ok(ids)
}
/// Waits until no registered agent task is still running.
///
/// Finished handles are pruned from `active_agents` before any sleeping
/// happens, so the method returns immediately when every registered agent
/// has already completed. While unfinished agents remain it polls again
/// every 100 ms. Agents registered while this method is waiting are waited
/// for as well.
pub async fn wait_for_all(&self) {
    let poll_interval = tokio::time::Duration::from_millis(100);
    loop {
        // Prune first: a registry holding only finished handles must not
        // cost a full poll interval.
        self.active_agents
            .retain(|_, handle| !handle.abort_handle.is_finished());
        if self.active_agents.is_empty() {
            return;
        }
        tokio::time::sleep(poll_interval).await;
    }
}
/// Returns the `(AgentId, AgentResult)` pairs produced by
/// `AgentExecutor::collect_results`.
///
/// NOTE(review): presumably these are the results stored through
/// `store_result` in `spawn_agent`; whether collecting drains them is decided
/// by the executor — confirm there.
pub async fn collect_results(&self) -> Vec<(AgentId, AgentResult)> {
self.executor.collect_results().await
}
/// Logs an info-level message and returns `Ok(())`.
///
/// No other work is performed here; the `Result` return type is never `Err`
/// in the current body.
pub async fn refresh_recent(&self) -> Result<()> {
info!("Refreshing recent activity...");
Ok(())
}
/// Aborts the agent registered under `agent_id`, if there is one.
///
/// The entry is removed from `active_agents`, its Tokio task is asked to
/// abort, and the action is logged at info level. Unknown ids are ignored
/// silently.
pub fn abort_agent(&self, agent_id: &AgentId) {
    let entry = match self.active_agents.remove(agent_id) {
        Some((_, entry)) => entry,
        // Nothing registered under this id: nothing to abort or log.
        None => return,
    };
    entry.abort_handle.abort();
    info!("Aborted agent {}", agent_id);
}
}
/// `Default` delegates to `AgentSystem::new`, so it also spawns the
/// background dispatcher task that `new` starts.
impl Default for AgentSystem {
fn default() -> Self {
Self::new()
}
}