use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use crate::agents::architect::ArchitectGPT;
use crate::agents::backend::BackendGPT;
#[cfg(feature = "img")]
use crate::agents::designer::DesignerGPT;
use crate::agents::frontend::FrontendGPT;
use crate::agents::git::GitGPT;
use crate::agents::types::AgentType;
use crate::common::utils::Task;
use crate::message::parse_kv;
use iac_rs::prelude::*;
use std::env;
pub struct Orchestrator {
pub id: String,
pub signer: Signer,
pub verifier: Verifier,
pub agents: Arc<Mutex<HashMap<String, AgentType>>>,
}
impl Orchestrator {
pub async fn new(id: String, signer: Signer, verifier: Verifier) -> anyhow::Result<Self> {
Ok(Self {
id,
signer,
verifier,
agents: Arc::new(Mutex::new(HashMap::new())),
})
}
pub async fn run(&mut self) -> Result<()> {
let addr = env::var("ORCHESTRATOR_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8443".to_string());
let agents = Arc::clone(&self.agents);
let verifier = self.verifier.clone();
let id = self.id.clone();
let signer = self.signer.clone();
let mut server = Server::bind(&addr).await?;
info!("[*] \"Orchestrator\": Listening on {}", addr);
let server_handle = server.clone();
server.set_handler(move |(msg, conn)| {
let agents = Arc::clone(&agents);
let signer = signer.clone();
let value = id.clone();
let server_handle = server_handle.clone();
async move {
let mut agents = agents.lock().await;
let reply = match msg.msg_type {
MessageType::Create => {
let (_input, lang) = parse_kv(&msg.payload_json);
let lang_str = if lang.trim().is_empty() {
info!("[*] \"Orchestrator\": Language not specified, defaulting to 'python'");
"python".to_string()
} else {
lang
};
let language = Box::leak(lang_str.into_boxed_str());
let new_agent = match msg.to.as_str() {
"arch" => {
info!("[*] \"Orchestrator\": Creating Architect agent '{}'", msg.to);
Some(AgentType::Architect(ArchitectGPT::new("ArchitectGPT", "Architect agent").await))
}
"back" => {
info!("[*] \"Orchestrator\": Creating Backend agent '{}', language: {}", msg.to, language);
Some(AgentType::Backend(BackendGPT::new("BackendGPT", "Backend agent", language).await))
}
"front" => {
info!("[*] \"Orchestrator\": Creating Frontend agent '{}', language: {}", msg.to, language);
Some(AgentType::Frontend(FrontendGPT::new("FrontendGPT", "Frontend agent", language).await))
}
#[cfg(feature = "img")]
"design" => {
info!("[*] \"Orchestrator\": Creating Designer agent '{}'", msg.to);
Some(AgentType::Designer(DesignerGPT::new("DesignerGPT", "Designer agent").await))
}
#[cfg(feature = "git")]
"git" => {
info!("[*] \"Orchestrator\": Creating Git agent '{}'", msg.to);
Some(AgentType::Git(GitGPT::new("GitGPT", "Git agent").await))
}
_ => {
warn!("[*] \"Orchestrator\": Unknown agent type requested '{}'", msg.to);
None
}
};
if let Some(agent) = new_agent {
agents.insert(msg.to.clone(), agent);
format!("โ
Agent '{}' created", msg.to)
} else {
format!("โ Unknown agent type '{}'", msg.to)
}
}
MessageType::Terminate => {
if agents.remove(&msg.to).is_some() {
info!("[*] \"Orchestrator\": Agent '{}' terminated", msg.to);
format!("๐งน Agent '{}' terminated", msg.to)
} else {
warn!("[*] \"Orchestrator\": Attempted to terminate unknown agent '{}'", msg.to);
format!("โ Agent '{}' not found for termination", msg.to)
}
}
MessageType::Run => {
if let Some(agent) = agents.get_mut(&msg.to) {
info!("[*] \"Orchestrator\": Executing task for agent '{}'", msg.to);
let mut task = Task::from_payload(&msg.payload_json);
if let Err(e) = agent.execute(&mut task, true, false, 3).await {
error!("[*] \"Orchestrator\": Error executing task for agent '{}': {:?}", msg.to, e);
format!("โ Failed to execute task for agent '{}'", msg.to)
} else {
format!("โ
Executed task for agent '{}'", msg.to)
}
} else {
warn!("[*] \"Orchestrator\": Agent '{}' not found for running task", msg.to);
format!("โ Agent '{}' not found", msg.to)
}
}
_ => {
warn!("[*] \"Orchestrator\": Unsupported message type: {:?}", msg.msg_type);
format!("โ Unsupported message type: {:?}", msg.msg_type)
}
};
let response = Message::new(&value, &conn, MessageType::Reply, &reply);
if let Err(e) = server_handle.send(&conn, response, &signer).await {
error!("Failed to send reply: {:?}", e);
} else {
info!("[*] \"Orchestrator\": Reply sent to '{}'", conn);
}
Ok(())
}
});
if let Err(e) = server.run(verifier).await {
error!("[*] \"Orchestrator\": Server run error: {:?}", e);
return Err(e);
}
Ok(())
}
}