use std::{collections::HashMap, path::Path};
use chrono::Local;
use dashmap::DashSet;
use erased_serde::Serialize as ErasedSerialize;
use futures::{StreamExt, TryStreamExt, future::BoxFuture, stream};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use uuid::Uuid;
use crate::structs::{
agent::{Agent, AgentError},
conversation::{AgentConversation, Role},
persistence::{self, PersistenceError},
swarm::{MetadataSchemaMap, Swarm, SwarmError},
};
#[derive(Debug, Error)]
pub enum AgentRearrangeError {
#[error("Agent error: {0}")]
AgentError(#[from] AgentError),
#[error("FilePersistence error: {0}")]
FilePersistenceError(#[from] PersistenceError),
#[error("Flow validation error: {0}")]
FlowValidationError(String),
#[error("Agent '{0}' not found")]
AgentNotFound(String),
#[error("Invalid flow format: {0}")]
InvalidFlowFormat(String),
#[error("Duplicate agent names in flow are not allowed")]
DuplicateAgentNames,
#[error("Tasks or Agents are empty")]
EmptyTasksOrAgents,
#[error("Json error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Execution error: {0}")]
ExecutionError(String),
#[error("Join error: {0}")]
JoinError(#[from] tokio::task::JoinError),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum OutputType {
All,
Final,
List,
Dict,
}
impl Default for OutputType {
fn default() -> Self {
OutputType::All
}
}
#[derive(Default)]
pub struct AgentRearrangeBuilder {
name: String,
description: String,
agents: Vec<Box<dyn Agent>>,
flow: Option<String>,
max_loops: u32,
verbose: bool,
output_type: OutputType,
autosave: bool,
return_json: bool,
metadata_output_dir: String,
rules: Option<String>,
team_awareness: bool,
}
impl AgentRearrangeBuilder {
pub fn name(mut self, name: impl Into<String>) -> Self {
self.name = name.into();
self
}
pub fn description(mut self, description: impl Into<String>) -> Self {
self.description = description.into();
self
}
pub fn add_agent(mut self, agent: Box<dyn Agent>) -> Self {
self.agents.push(agent);
self
}
pub fn agents(self, agents: Vec<Box<dyn Agent>>) -> Self {
agents
.into_iter()
.fold(self, |builder, agent| builder.add_agent(agent))
}
pub fn flow(mut self, flow: impl Into<String>) -> Self {
self.flow = Some(flow.into());
self
}
pub fn max_loops(mut self, max_loops: u32) -> Self {
self.max_loops = max_loops;
self
}
pub fn verbose(mut self, verbose: bool) -> Self {
self.verbose = verbose;
self
}
pub fn output_type(mut self, output_type: OutputType) -> Self {
self.output_type = output_type;
self
}
pub fn autosave(mut self, autosave: bool) -> Self {
self.autosave = autosave;
self
}
pub fn return_json(mut self, return_json: bool) -> Self {
self.return_json = return_json;
self
}
pub fn metadata_output_dir(mut self, dir: impl Into<String>) -> Self {
self.metadata_output_dir = dir.into();
self
}
pub fn rules(mut self, rules: impl Into<String>) -> Self {
self.rules = Some(rules.into());
self
}
pub fn team_awareness(mut self, team_awareness: bool) -> Self {
self.team_awareness = team_awareness;
self
}
pub fn build(self) -> AgentRearrange {
AgentRearrange {
id: Uuid::new_v4().to_string(),
name: self.name,
description: self.description,
agents: self
.agents
.into_iter()
.map(|agent| (agent.name(), agent))
.collect(),
flow: self.flow.unwrap_or_default(),
max_loops: if self.max_loops > 0 {
self.max_loops
} else {
1
},
verbose: self.verbose,
output_type: self.output_type,
autosave: self.autosave,
return_json: self.return_json,
metadata_output_dir: self.metadata_output_dir,
conversation: AgentConversation::new("AgentRearrange".to_string()),
metadata_map: MetadataSchemaMap::default(),
tasks: DashSet::new(),
rules: self.rules,
team_awareness: self.team_awareness,
}
}
}
pub struct AgentRearrange {
id: String,
name: String,
description: String,
agents: HashMap<String, Box<dyn Agent>>,
flow: String,
max_loops: u32,
verbose: bool,
output_type: OutputType,
autosave: bool,
return_json: bool,
metadata_output_dir: String,
conversation: AgentConversation,
#[allow(dead_code)]
metadata_map: MetadataSchemaMap,
#[allow(dead_code)]
tasks: DashSet<String>,
rules: Option<String>,
team_awareness: bool,
}
impl Default for AgentRearrange {
fn default() -> Self {
Self {
id: Uuid::new_v4().to_string(),
name: "AgentRearrange".to_string(),
description: "A swarm of agents for rearranging tasks.".to_string(),
agents: HashMap::new(),
flow: String::new(),
max_loops: 1,
verbose: false,
output_type: OutputType::All,
autosave: false,
return_json: false,
metadata_output_dir: String::new(),
conversation: AgentConversation::new("AgentRearrange".to_string()),
metadata_map: MetadataSchemaMap::default(),
tasks: DashSet::new(),
rules: None,
team_awareness: false,
}
}
}
impl AgentRearrange {
pub fn builder() -> AgentRearrangeBuilder {
AgentRearrangeBuilder::default()
}
pub fn set_custom_flow(&mut self, flow: impl Into<String>) {
self.flow = flow.into();
if self.verbose {
tracing::info!("Custom flow set: {}", self.flow);
}
}
pub fn add_agent(&mut self, agent: Box<dyn Agent>) {
let agent_name = agent.name();
if self.verbose {
tracing::info!("Adding agent {} to the swarm", agent_name);
}
self.agents.insert(agent_name, agent);
}
pub fn remove_agent(&mut self, agent_name: &str) -> Option<Box<dyn Agent>> {
if self.verbose {
tracing::info!("Removing agent {} from the swarm", agent_name);
}
self.agents.remove(agent_name)
}
pub fn add_agents(&mut self, agents: Vec<Box<dyn Agent>>) {
for agent in agents {
self.add_agent(agent);
}
}
pub fn validate_flow(&self) -> Result<(), AgentRearrangeError> {
if self.flow.is_empty() {
return Err(AgentRearrangeError::FlowValidationError(
"Flow cannot be empty".to_string(),
));
}
let tasks: Vec<&str> = if self.flow.contains("->") {
self.flow.split("->").collect()
} else {
vec![self.flow.as_str()]
};
for task in tasks {
let agent_names: Vec<&str> = task.split(',').map(|name| name.trim()).collect();
for agent_name in agent_names {
if agent_name != "H" && !self.agents.contains_key(agent_name) {
return Err(AgentRearrangeError::AgentNotFound(agent_name.to_string()));
}
}
}
if self.verbose {
tracing::info!("Flow: {} is valid", self.flow);
}
Ok(())
}
pub async fn run(&mut self, task: impl Into<String>) -> Result<String, AgentRearrangeError> {
self.run_internal(task, None, None).await
}
async fn run_internal(
&mut self,
task: impl Into<String>,
_img: Option<String>,
_custom_tasks: Option<HashMap<String, String>>,
) -> Result<String, AgentRearrangeError> {
let task = task.into();
if self.verbose {
tracing::info!("Starting task execution: {}", task);
}
self.conversation
.add(Role::User("System".to_string()), task.clone());
self.validate_flow()?;
if let Some(rules) = &self.rules {
self.conversation.add(
Role::User("System".to_string()),
format!("Rules: {}", rules),
);
}
let tasks: Vec<&str> = if self.flow.contains("->") {
self.flow.split("->").collect()
} else {
vec![self.flow.as_str()]
};
let mut current_task = task.clone();
let mut response_map = HashMap::new();
for loop_count in 0..self.max_loops {
if self.verbose {
tracing::info!("Starting loop {}/{}", loop_count + 1, self.max_loops);
}
for task_step in tasks.iter() {
let agent_names: Vec<&str> = task_step.split(',').map(|name| name.trim()).collect();
if agent_names.len() > 1 {
if self.verbose {
tracing::info!("Running agents in parallel: {:?}", agent_names);
}
let parallel_results = self
.execute_agents_parallel(&agent_names, ¤t_task)
.await?;
for (agent_name, result) in parallel_results {
self.conversation
.add(Role::Assistant(agent_name.clone()), result.clone());
response_map.insert(agent_name, result);
}
} else {
let agent_name = agent_names[0];
if self.verbose {
tracing::info!("Running agent sequentially: {}", agent_name);
}
if agent_name == "H" {
if self.verbose {
tracing::info!("Human intervention point reached");
}
continue;
}
let agent = self.agents.get(agent_name).ok_or_else(|| {
AgentRearrangeError::AgentNotFound(agent_name.to_string())
})?;
let result = agent
.run(self.conversation.to_string())
.await
.map_err(AgentRearrangeError::AgentError)?;
self.conversation
.add(Role::Assistant(agent_name.to_string()), result.clone());
response_map.insert(agent_name.to_string(), result.clone());
current_task = result;
}
}
}
if self.verbose {
tracing::info!("Task execution completed");
}
let output = self.format_output(&response_map, ¤t_task);
if self.autosave {
self.save_metadata().await?;
}
Ok(output)
}
async fn execute_agents_parallel(
&self,
agent_names: &[&str],
task: &str,
) -> Result<HashMap<String, String>, AgentRearrangeError> {
let mut handles = Vec::new();
for agent_name in agent_names {
if *agent_name == "H" {
continue; }
let agent = self
.agents
.get(*agent_name)
.ok_or_else(|| AgentRearrangeError::AgentNotFound(agent_name.to_string()))?;
let task_clone = task.to_string();
let agent_name_clone = agent_name.to_string();
let agent_clone = agent.clone_box();
let handle = tokio::spawn(async move {
let result = agent_clone.run(task_clone).await;
(agent_name_clone, result)
});
handles.push(handle);
}
let mut results = HashMap::new();
for handle in handles {
let (agent_name, result) = handle.await?;
let result = result.map_err(AgentRearrangeError::AgentError)?;
results.insert(agent_name, result);
}
Ok(results)
}
fn format_output(&self, response_map: &HashMap<String, String>, final_result: &str) -> String {
match self.output_type {
OutputType::All => {
let mut output = String::new();
for (agent_name, response) in response_map {
output.push_str(&format!("{}: {}\n", agent_name, response));
}
output
},
OutputType::Final => final_result.to_string(),
OutputType::List => {
let responses: Vec<String> = response_map.values().cloned().collect();
if self.return_json {
serde_json::to_string(&responses).unwrap_or_else(|_| "[]".to_string())
} else {
responses.join("\n")
}
},
OutputType::Dict => {
if self.return_json {
serde_json::to_string(response_map).unwrap_or_else(|_| "{}".to_string())
} else {
response_map
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join("\n")
}
},
}
}
pub async fn batch_run(
&mut self,
tasks: Vec<String>,
batch_size: usize,
img: Option<Vec<String>>,
) -> Result<Vec<String>, AgentRearrangeError> {
if tasks.is_empty() {
return Err(AgentRearrangeError::EmptyTasksOrAgents);
}
let mut results = Vec::with_capacity(tasks.len());
for chunk in tasks.chunks(batch_size) {
let mut batch_handles = Vec::new();
for (i, task) in chunk.iter().enumerate() {
let img_path = img.as_ref().and_then(|imgs| imgs.get(i)).cloned();
let task_clone = task.clone();
let mut rearrange_clone = self.clone_for_task();
let handle = tokio::spawn(async move {
rearrange_clone
.run_internal(task_clone, img_path, None)
.await
});
batch_handles.push(handle);
}
for handle in batch_handles {
let result = handle.await??;
results.push(result);
}
}
Ok(results)
}
pub async fn concurrent_run(
&mut self,
tasks: Vec<String>,
img: Option<Vec<String>>,
max_concurrent: Option<usize>,
) -> Result<Vec<String>, AgentRearrangeError> {
if tasks.is_empty() {
return Err(AgentRearrangeError::EmptyTasksOrAgents);
}
let stream = stream::iter(tasks.into_iter().enumerate().map(|(i, task)| {
let img_path = img.as_ref().and_then(|imgs| imgs.get(i)).cloned();
let mut rearrange_clone = self.clone_for_task();
async move { rearrange_clone.run_internal(task, img_path, None).await }
}));
let results: Result<Vec<_>, _> = if let Some(max_concurrent) = max_concurrent {
stream.buffer_unordered(max_concurrent).try_collect().await
} else {
stream.buffer_unordered(8).try_collect().await };
results
}
fn clone_for_task(&self) -> Self {
let mut cloned_agents = HashMap::new();
for (name, agent) in &self.agents {
cloned_agents.insert(name.clone(), agent.clone_box());
}
Self {
id: self.id.clone(),
name: self.name.clone(),
description: self.description.clone(),
agents: cloned_agents,
flow: self.flow.clone(),
max_loops: self.max_loops,
verbose: self.verbose,
output_type: self.output_type.clone(),
autosave: false, return_json: self.return_json,
metadata_output_dir: self.metadata_output_dir.clone(),
conversation: AgentConversation::new(format!("{}-clone", self.name)),
metadata_map: MetadataSchemaMap::default(),
tasks: DashSet::new(),
rules: self.rules.clone(),
team_awareness: self.team_awareness,
}
}
async fn save_metadata(&self) -> Result<(), AgentRearrangeError> {
if !self.metadata_output_dir.is_empty() {
let metadata = self.to_metadata();
let path = Path::new(&self.metadata_output_dir).join(format!("{}.json", self.id));
let json_data = serde_json::to_string_pretty(&metadata)?;
persistence::save_to_file(json_data.as_bytes(), &path).await?;
}
Ok(())
}
fn to_metadata(&self) -> serde_json::Value {
serde_json::json!({
"id": self.id,
"name": self.name,
"description": self.description,
"flow": self.flow,
"max_loops": self.max_loops,
"agents": self.agents.keys().collect::<Vec<_>>(),
"conversation_length": self.conversation.history.len(),
"timestamp": Local::now().timestamp_millis(),
})
}
pub fn id(&self) -> &str {
&self.id
}
pub fn name(&self) -> &str {
&self.name
}
pub fn description(&self) -> &str {
&self.description
}
pub fn flow(&self) -> &str {
&self.flow
}
pub fn agent_names(&self) -> Vec<&String> {
self.agents.keys().collect()
}
pub fn conversation(&self) -> &AgentConversation {
&self.conversation
}
pub fn max_loops(&self) -> u32 {
self.max_loops
}
pub fn agent_count(&self) -> usize {
self.agents.len()
}
}
pub async fn rearrange(
name: impl Into<String>,
description: impl Into<String>,
agents: Vec<Box<dyn Agent>>,
flow: impl Into<String>,
task: impl Into<String>,
img: Option<String>,
) -> Result<String, AgentRearrangeError> {
let mut agent_system = AgentRearrange::builder()
.name(name)
.description(description)
.agents(agents)
.flow(flow)
.build();
agent_system.run_internal(task, img, None).await
}
impl Swarm for AgentRearrange {
fn run(&self, task: String) -> BoxFuture<'_, Result<Box<dyn ErasedSerialize>, SwarmError>> {
Box::pin(async move {
let mut rearrange_clone = self.clone_for_task();
match rearrange_clone.run_internal(task, None, None).await {
Ok(result) => {
let serialized: Box<dyn ErasedSerialize> = Box::new(result);
Ok(serialized)
},
Err(e) => Err(SwarmError::AgentRearrangeError(e)),
}
})
}
fn name(&self) -> &str {
&self.name
}
}