use anyhow::{Context, Result};
use async_channel::{Receiver, Sender};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tokio::fs;
use tracing::{debug, error, info};
use crate::agent::{AgentStatus, TaskResult};
/// Urgency attached to a message; carried by [`AgentMessage::HelpRequest`].
///
/// Variants are declared from least to most urgent, but no comparison traits are
/// derived here, so this declaration does not define an ordering between them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessagePriority {
/// Lowest urgency.
Low,
/// Ordinary urgency.
Normal,
/// Elevated urgency.
High,
/// Highest urgency.
Critical,
}
/// Kind tag for an [`AgentMessage::Coordination`] message (its `message_type` field).
///
/// The tag only labels the message; the accompanying `payload` is untyped JSON, so the
/// expected payload shape for each kind is a convention between sender and receiver.
// NOTE(review): the per-variant payload conventions are not visible in this file —
// confirm them against the producers/consumers before relying on a particular shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CoordinationType {
TaskFailure,
HelpResponse,
ResourceRequest,
StatusUpdate,
Information,
TaskDelegation,
TaskCompletion,
InformationRequest,
InformationResponse,
ReviewRequest,
ReviewResponse,
SyncRequest,
/// Any other kind, identified by an arbitrary string.
Custom(String),
}
pub mod ai_message_bus;
pub mod conversion;
pub mod dialogue;
pub mod mailbox;
pub use conversion::{
AgentMappingRegistry, UnifiedAgentInfo, convert_from_ai_session, convert_to_ai_session,
};
pub use mailbox::{AgentMailbox, MailboxMessage, MailboxPriority, MailboxSystem, MessageTarget};
/// Message carried by the [`CoordinationBus`] channel.
///
/// No serde attributes are applied, so the enum uses serde's default (externally tagged)
/// representation. `CoordinationBus::persist_message` writes each message to disk as
/// pretty-printed JSON in that representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AgentMessage {
/// Registration record for `agent_id`, with its capability names and free-form JSON metadata.
Registration {
agent_id: String,
capabilities: Vec<String>,
metadata: serde_json::Value,
},
/// Associates task `task_id` with `agent_id`; `task_data` is untyped JSON.
TaskAssignment {
task_id: String,
agent_id: String,
task_data: serde_json::Value,
},
/// Progress report from `agent_id` on `task_id`.
TaskProgress {
agent_id: String,
task_id: String,
// NOTE(review): the scale of `progress` (0–1 vs 0–100) is not fixed by this file — confirm.
progress: f32,
message: String,
},
/// Help request from `agent_id`, described by free-text `context` and a priority.
HelpRequest {
agent_id: String,
context: String,
priority: MessagePriority,
},
/// Status report for `agent_id`; `metrics` is untyped JSON.
StatusUpdate {
agent_id: String,
status: AgentStatus,
metrics: serde_json::Value,
},
/// Completion report from `agent_id` for `task_id`, carrying the task's result.
TaskCompleted {
agent_id: String,
task_id: String,
result: TaskResult,
},
/// Assistance request from `agent_id` concerning `task_id`, with a free-text reason.
RequestAssistance {
agent_id: String,
task_id: String,
reason: String,
},
/// List of issue descriptions reported by `agent_id` for `task_id`.
QualityIssue {
agent_id: String,
task_id: String,
issues: Vec<String>,
},
/// Typed coordination message between two agents; `payload` is untyped JSON.
Coordination {
from_agent: String,
to_agent: String,
message_type: CoordinationType,
payload: serde_json::Value,
},
/// Heartbeat from `agent_id`.
Heartbeat {
agent_id: String,
/// Used as the sort key by `CoordinationBus::load_persisted_messages`.
timestamp: DateTime<Utc>,
},
/// Free-text message from one agent to another.
InterAgentMessage {
from_agent: String,
to_agent: String,
message: String,
/// Used as the sort key by `CoordinationBus::load_persisted_messages`.
timestamp: DateTime<Utc>,
},
/// Record of a generated task, with its description and the reasoning behind it.
TaskGenerated {
task_id: String,
description: String,
reasoning: String,
},
/// Escape hatch: a message identified by a string `message_type` with untyped JSON `data`.
Custom {
message_type: String,
data: serde_json::Value,
},
}
/// Message bus that pairs an in-memory `async_channel` with a directory of JSON files.
///
/// The struct holds both ends of the channel, so a single value can send and receive.
/// Cloning the bus clones both channel handles and the directory path.
#[derive(Clone)]
pub struct CoordinationBus {
/// Sending half of the channel.
sender: Sender<AgentMessage>,
/// Receiving half of the channel.
receiver: Receiver<AgentMessage>,
/// Directory in which sent messages are persisted as JSON files.
message_dir: PathBuf,
}
impl CoordinationBus {
    /// Capacity of the bounded in-memory channel created by [`CoordinationBus::new`].
    const CHANNEL_CAPACITY: usize = 1000;
    /// Number of persisted message files that pruning leaves in `message_dir`.
    const MAX_PERSISTED_MESSAGES: usize = 1000;

    /// Creates a bus backed by a bounded channel and the `coordination/messages` directory.
    ///
    /// The directory path is relative; it is created, parents included, if missing.
    ///
    /// # Errors
    /// Fails if the message directory cannot be created.
    pub async fn new() -> Result<Self> {
        let (sender, receiver) = async_channel::bounded(Self::CHANNEL_CAPACITY);
        let message_dir = PathBuf::from("coordination/messages");
        fs::create_dir_all(&message_dir)
            .await
            .context("Failed to create message directory")?;
        Ok(Self {
            sender,
            receiver,
            message_dir,
        })
    }

    /// Queues `message` on the channel, then writes a JSON copy of it to disk.
    ///
    /// # Errors
    /// Fails if the channel send fails, or if persisting or pruning fails. In the
    /// latter case the message has already been queued, so an `Err` does not by itself
    /// mean the message was not delivered.
    pub async fn send_message(&self, message: AgentMessage) -> Result<()> {
        debug!("Sending message: {:?}", message);
        self.sender
            .send(message.clone())
            .await
            .context("Failed to send message through channel")?;
        self.persist_message(&message).await?;
        Ok(())
    }

    /// Waits for the next message on the channel.
    ///
    /// # Errors
    /// Fails if the channel receive fails.
    pub async fn receive_message(&self) -> Result<AgentMessage> {
        self.receiver
            .recv()
            .await
            .context("Failed to receive message from channel")
    }

    /// Returns the next message if one is immediately available.
    ///
    /// `None` covers every `try_recv` failure; the cause is not reported.
    pub fn try_receive_message(&self) -> Option<AgentMessage> {
        self.receiver.try_recv().ok()
    }

    /// Writes `message` as pretty-printed JSON to `{millis}-{uuid}.json` in `message_dir`,
    /// then prunes old files.
    async fn persist_message(&self, message: &AgentMessage) -> Result<()> {
        let timestamp = Utc::now();
        // The millisecond prefix records when the message was persisted;
        // `load_persisted_messages` reads it back as a fallback sort key.
        let filename = format!(
            "{}-{}.json",
            timestamp.timestamp_millis(),
            uuid::Uuid::new_v4()
        );
        let filepath = self.message_dir.join(&filename);
        let content = serde_json::to_string_pretty(message)?;
        fs::write(&filepath, content)
            .await
            .context("Failed to persist message")?;
        self.cleanup_old_messages().await?;
        Ok(())
    }

    /// Deletes the oldest files (by modification time) so that at most
    /// `MAX_PERSISTED_MESSAGES` regular files remain in `message_dir`.
    ///
    /// Individual deletion failures are logged and do not abort the pass.
    async fn cleanup_old_messages(&self) -> Result<()> {
        let mut entries = fs::read_dir(&self.message_dir).await?;
        let mut files = Vec::new();
        while let Some(entry) = entries.next_entry().await? {
            let Ok(metadata) = entry.metadata().await else {
                continue;
            };
            if metadata.is_file() {
                files.push((entry.path(), metadata.modified()?));
            }
        }

        // Nothing to prune: skip the sort entirely.
        if files.len() <= Self::MAX_PERSISTED_MESSAGES {
            return Ok(());
        }

        let excess = files.len() - Self::MAX_PERSISTED_MESSAGES;
        files.sort_by_key(|(_, modified)| *modified);
        for (path, _) in files.into_iter().take(excess) {
            if let Err(e) = fs::remove_file(&path).await {
                error!("Failed to remove old message file: {}", e);
            }
        }
        Ok(())
    }

    /// Extracts the millisecond timestamp that `persist_message` embeds at the start of
    /// each file name (`{millis}-{uuid}.json`). Returns `None` for any other name shape.
    fn persisted_at_millis(path: &std::path::Path) -> Option<i64> {
        let name = path.file_name()?.to_str()?;
        let (millis, _) = name.split_once('-')?;
        millis.parse().ok()
    }

    /// Loads every parseable message file from `message_dir`, oldest first.
    ///
    /// Files that cannot be read or parsed as an [`AgentMessage`] are skipped.
    /// Ordering is by millisecond timestamp:
    /// - `Heartbeat` and `InterAgentMessage` use their embedded `timestamp`;
    /// - other variants use the persistence time encoded in the file name;
    /// - files without that prefix sort as if persisted at load time.
    ///
    /// Each key is computed exactly once, so the sort is deterministic. Calling
    /// `Utc::now()` inside the sort closure would yield a different key on every
    /// comparison and break the sort's consistency requirement.
    ///
    /// # Errors
    /// Fails if the directory cannot be listed.
    pub async fn load_persisted_messages(&self) -> Result<Vec<AgentMessage>> {
        // Snapshot taken once so all undatable messages share one stable key.
        let loaded_at = Utc::now().timestamp_millis();
        let mut keyed: Vec<(i64, AgentMessage)> = Vec::new();
        let mut entries = fs::read_dir(&self.message_dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            let Ok(content) = fs::read_to_string(&path).await else {
                continue;
            };
            let Ok(message) = serde_json::from_str::<AgentMessage>(&content) else {
                continue;
            };
            let sort_key = match &message {
                AgentMessage::Heartbeat { timestamp, .. }
                | AgentMessage::InterAgentMessage { timestamp, .. } => {
                    timestamp.timestamp_millis()
                }
                _ => Self::persisted_at_millis(&path).unwrap_or(loaded_at),
            };
            keyed.push((sort_key, message));
        }
        // Stable sort: messages with equal keys keep their directory-listing order.
        keyed.sort_by_key(|(sort_key, _)| *sort_key);
        Ok(keyed.into_iter().map(|(_, message)| message).collect())
    }

    /// Returns a clone of the channel's sending half.
    ///
    /// Messages sent through it directly bypass `send_message`, so they are not persisted.
    pub fn get_sender(&self) -> Sender<AgentMessage> {
        self.sender.clone()
    }

    /// Closes the channel. Always returns `Ok`.
    pub async fn close(&self) -> Result<()> {
        self.sender.close();
        Ok(())
    }
}
/// File-backed task queue: each task is stored as `<task id>.json` inside `task_dir`.
#[derive(Clone)]
pub struct TaskQueue {
/// Directory holding one JSON file per queued task.
task_dir: PathBuf,
}
impl TaskQueue {
    /// Creates a queue stored in the relative directory `coordination/task-queue`.
    ///
    /// # Errors
    /// Fails if the directory cannot be created.
    pub async fn new() -> Result<Self> {
        Self::with_dir("coordination/task-queue").await
    }

    /// Creates a queue stored in `dir`, creating the directory (and parents) if missing.
    ///
    /// # Errors
    /// Fails if the directory cannot be created.
    pub async fn with_dir(dir: &str) -> Result<Self> {
        let task_dir = PathBuf::from(dir);
        fs::create_dir_all(&task_dir).await.with_context(|| {
            format!(
                "Failed to create task queue directory {}",
                task_dir.display()
            )
        })?;
        Ok(Self { task_dir })
    }

    /// Writes `task` as pretty-printed JSON to `<task.id>.json`, replacing any existing file.
    ///
    /// # Errors
    /// Fails if the task cannot be serialized or the file cannot be written.
    pub async fn add_task(&self, task: &crate::agent::Task) -> Result<()> {
        let filepath = self.task_dir.join(format!("{}.json", task.id));
        let content = serde_json::to_string_pretty(task)?;
        fs::write(&filepath, content)
            .await
            .with_context(|| format!("Failed to write task file {}", filepath.display()))?;
        info!("Task {} added to queue", task.id);
        Ok(())
    }

    /// Loads every parseable task file from the queue directory, highest priority first.
    ///
    /// Entries that cannot be read or do not deserialize as a task are skipped.
    ///
    /// # Errors
    /// Fails if the directory cannot be listed.
    pub async fn get_pending_tasks(&self) -> Result<Vec<crate::agent::Task>> {
        let mut tasks: Vec<crate::agent::Task> = Vec::new();
        let mut entries = fs::read_dir(&self.task_dir).await.with_context(|| {
            format!(
                "Failed to read task queue directory {}",
                self.task_dir.display()
            )
        })?;
        while let Some(entry) = entries.next_entry().await? {
            let Ok(content) = fs::read_to_string(entry.path()).await else {
                continue;
            };
            let Ok(task) = serde_json::from_str::<crate::agent::Task>(&content) else {
                continue;
            };
            tasks.push(task);
        }
        // `Reverse` flips the ascending sort so the largest priority value comes first.
        tasks.sort_by_key(|t| std::cmp::Reverse(t.priority));
        Ok(tasks)
    }

    /// Deletes the file for `task_id`.
    ///
    /// # Errors
    /// Fails if the file cannot be removed, including when it does not exist.
    pub async fn remove_task(&self, task_id: &str) -> Result<()> {
        let filepath = self.task_dir.join(format!("{}.json", task_id));
        fs::remove_file(&filepath)
            .await
            .with_context(|| format!("Failed to remove task file {}", filepath.display()))?;
        info!("Task {} removed from queue", task_id);
        Ok(())
    }
}
/// File-backed record of agent status: one `<agent id>.json` file per agent in `status_dir`.
#[derive(Clone)]
pub struct StatusTracker {
/// Directory holding one JSON status file per agent.
status_dir: PathBuf,
}
impl StatusTracker {
pub async fn new() -> Result<Self> {
let status_dir = PathBuf::from("coordination/agent-status");
fs::create_dir_all(&status_dir).await?;
Ok(Self { status_dir })
}
pub async fn update_status(
&self,
agent_id: &str,
status: &AgentStatus,
additional_info: serde_json::Value,
) -> Result<()> {
let filename = format!("{}.json", agent_id);
let filepath = self.status_dir.join(&filename);
let status_info = serde_json::json!({
"agent_id": agent_id,
"status": status,
"timestamp": Utc::now(),
"additional_info": additional_info,
});
let content = serde_json::to_string_pretty(&status_info)?;
fs::write(&filepath, content).await?;
Ok(())
}
pub async fn get_status(&self, agent_id: &str) -> Result<Option<serde_json::Value>> {
let filename = format!("{}.json", agent_id);
let filepath = self.status_dir.join(&filename);
if !filepath.exists() {
return Ok(None);
}
let content = fs::read_to_string(&filepath).await?;
let status: serde_json::Value = serde_json::from_str(&content)?;
Ok(Some(status))
}
pub async fn get_all_statuses(&self) -> Result<Vec<serde_json::Value>> {
let mut statuses = Vec::new();
let mut entries = fs::read_dir(&self.status_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if let Ok(content) = fs::read_to_string(entry.path()).await
&& let Ok(status) = serde_json::from_str(&content)
{
statuses.push(status);
}
}
Ok(statuses)
}
}