use anyhow::{Context as AnyhowContext, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
use crate::agent::{Task, TaskResult};
use crate::coordination::conversion::AgentMappingRegistry;
use crate::coordination::{AgentMessage as CCSwarmAgentMessage, CoordinationType};
use crate::orchestrator::DelegationDecision;
pub type AgentId = String;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CCSwarmMessage {
Base(CCSwarmAgentMessage),
Delegation {
task_id: String,
agent_id: AgentId,
decision: Box<DelegationDecision>,
},
QualityReview {
task_id: String,
agent_id: AgentId,
review_type: String,
},
SessionCommand {
session_id: String,
command: SessionCommandType,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SessionCommandType {
Start,
Pause,
Resume,
Terminate,
AttachAgent { agent_id: AgentId },
DetachAgent { agent_id: AgentId },
}
pub struct AIMessageBus {
channels: Arc<RwLock<HashMap<AgentId, mpsc::Sender<CCSwarmMessage>>>>,
mapping_registry: Arc<AgentMappingRegistry>,
}
impl Default for AIMessageBus {
fn default() -> Self {
Self::new()
}
}
impl AIMessageBus {
pub fn new() -> Self {
Self {
channels: Arc::new(RwLock::new(HashMap::new())),
mapping_registry: Arc::new(AgentMappingRegistry::new()),
}
}
pub async fn register_agent(&self, agent_id: AgentId) -> mpsc::Receiver<CCSwarmMessage> {
let (tx, rx) = mpsc::channel(100);
let mut channels = self.channels.write().await;
channels.insert(agent_id, tx);
rx
}
pub async fn send_to_agent(&self, agent_id: &AgentId, message: CCSwarmMessage) -> Result<()> {
let channels = self.channels.read().await;
if let Some(tx) = channels.get(agent_id) {
tx.send(message)
.await
.context("Failed to send message to agent")?;
}
Ok(())
}
pub async fn broadcast(&self, message: CCSwarmMessage) -> Result<()> {
let channels = self.channels.read().await;
for tx in channels.values() {
let _ = tx.send(message.clone()).await;
}
Ok(())
}
pub async fn send_task(&self, agent_id: &AgentId, task: Task) -> Result<()> {
let message = CCSwarmMessage::Base(CCSwarmAgentMessage::Coordination {
from_agent: "orchestrator".to_string(),
to_agent: agent_id.clone(),
message_type: CoordinationType::TaskDelegation,
payload: serde_json::to_value(&task)?,
});
self.send_to_agent(agent_id, message).await
}
pub async fn send_task_result(&self, agent_id: &AgentId, result: TaskResult) -> Result<()> {
let message = CCSwarmMessage::Base(CCSwarmAgentMessage::Coordination {
from_agent: agent_id.clone(),
to_agent: "orchestrator".to_string(),
message_type: CoordinationType::TaskCompletion,
payload: serde_json::to_value(&result)?,
});
self.broadcast(message).await
}
pub async fn unregister_agent(&self, agent_id: &AgentId) {
let mut channels = self.channels.write().await;
channels.remove(agent_id);
}
pub fn mapping_registry(&self) -> Arc<AgentMappingRegistry> {
Arc::clone(&self.mapping_registry)
}
}