#![allow(
clippy::missing_errors_doc,
clippy::cast_precision_loss,
clippy::cast_possible_truncation,
clippy::unused_async
)]
use crate::message_router::{
config::RouterConfig,
domain_types::{
AgentId, AgentLocation, AgentState, CapabilityName, Conversation, ConversationCreatedAt,
ConversationId, DeliveryOptions, FailureReason, FipaMessage, LocalAgent, MessageContent,
MessageCount, MessageId, MessageTimestamp, NodeId, Performative, ProtocolName, RouteHops,
},
traits::{
AgentRegistry, ConversationError, ConversationManager, ConversationStats, DeadLetterStats,
DeliveryEngine, DeliveryError, FailureHandler, HealthStatus, MessageRouter,
MetricsCollector, RegistryError, RouterError, RouterStats,
},
};
use async_trait::async_trait;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio::sync::{RwLock, Semaphore, mpsc};
use tokio::time::{Duration, Instant};
use tracing::{Level, debug, error, info, span, trace, warn};
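/// In-process FIPA message router wiring together delivery, conversation
/// tracking, agent registry, and failure handling behind the `MessageRouter`
/// trait.
///
/// A minimal lifecycle sketch, assuming a `RouterConfig` value that passes
/// `validate()` (the `RouterConfig::default()` constructor below is
/// illustrative, not guaranteed by this module):
///
/// ```ignore
/// let router = MessageRouterImpl::new(RouterConfig::default()).await?;
/// router.start().await?;
/// let message_id = router.route_message(message).await?;
/// router.shutdown().await?;
/// ```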
pub struct MessageRouterImpl {
config: RouterConfig,
delivery_engine: Arc<dyn DeliveryEngine>,
conversation_manager: Arc<dyn ConversationManager>,
agent_registry: Arc<dyn AgentRegistry>,
failure_handler: Arc<dyn FailureHandler>,
is_running: AtomicBool,
is_shutdown: AtomicBool,
start_time: RwLock<Option<Instant>>,
message_counter: AtomicU64,
error_counter: AtomicU64,
throughput_tracker: Arc<ThroughputTracker>,
inbound_queue: mpsc::Sender<RoutingTask>,
inbound_receiver: Arc<RwLock<Option<mpsc::Receiver<RoutingTask>>>>,
routing_semaphore: Arc<Semaphore>,
metrics_collector: Option<Arc<dyn MetricsCollector>>,
}
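/// A queued unit of routing work: the message plus retry bookkeeping and the
/// tracing span it was accepted under.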
#[derive(Debug)]
#[allow(dead_code)]
struct RoutingTask {
message: FipaMessage,
attempt_count: u8,
created_at: Instant,
span: tracing::Span,
}
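/// Sliding-window throughput counter: per-second message counts keyed by
/// UNIX-epoch second, pruned to `window_size`.
///
/// A usage sketch (the router itself uses a 60-second window):
///
/// ```ignore
/// let tracker = ThroughputTracker::new(Duration::from_secs(60));
/// tracker.record_message();
/// let rate = tracker.get_current_rate(); // messages/sec averaged over the window
/// ```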
struct ThroughputTracker {
window_size: Duration,
/// Per-second message counts keyed by UNIX-epoch second.
samples: DashMap<u64, u64>,
}
impl ThroughputTracker {
fn new(window_size: Duration) -> Self {
Self {
window_size,
samples: DashMap::new(),
}
}
#[allow(dead_code)]
fn record_message(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
self.samples
.entry(now)
.and_modify(|count| *count += 1)
.or_insert(1);
let cutoff = now.saturating_sub(self.window_size.as_secs());
self.samples.retain(|timestamp, _| *timestamp >= cutoff);
}
fn get_current_rate(&self) -> f64 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let cutoff = now.saturating_sub(self.window_size.as_secs());
let total_messages: u64 = self
.samples
.iter()
.filter(|entry| *entry.key() >= cutoff)
.map(|entry| *entry.value())
.sum();
let window_seconds = self.window_size.as_secs_f64();
(total_messages as f64) / window_seconds
}
}
impl MessageRouterImpl {
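/// Builds the router and all subsystems from the given configuration,
/// validating it first; any subsystem construction failure surfaces as
/// `RouterError::ConfigurationError`.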
pub async fn new(config: RouterConfig) -> Result<Self, RouterError> {
let span = span!(Level::INFO, "router_creation");
let _enter = span.enter();
info!("Creating message router with config: {:?}", config);
config
.validate()
.map_err(|e| RouterError::ConfigurationError {
message: format!("Invalid configuration: {e}"),
})?;
let (inbound_sender, inbound_receiver) =
mpsc::channel(config.inbound_queue_size.as_usize());
let delivery_engine =
Arc::new(DeliveryEngineImpl::new(config.clone()).await.map_err(|e| {
RouterError::ConfigurationError {
message: format!("Failed to create delivery engine: {e:?}"),
}
})?);
let conversation_manager = Arc::new(
ConversationManagerImpl::new(config.clone())
.await
.map_err(|e| RouterError::ConfigurationError {
message: format!("Failed to create conversation manager: {e:?}"),
})?,
);
let agent_registry =
Arc::new(AgentRegistryImpl::new(config.clone()).await.map_err(|e| {
RouterError::ConfigurationError {
message: format!("Failed to create agent registry: {e:?}"),
}
})?);
let failure_handler =
Arc::new(FailureHandlerImpl::new(config.clone()).await.map_err(|e| {
RouterError::ConfigurationError {
message: format!("Failed to create failure handler: {e:?}"),
}
})?);
let metrics_collector = if config.enable_metrics() {
Some(Arc::new(MetricsCollectorImpl::new()) as Arc<dyn MetricsCollector>)
} else {
None
};
let routing_semaphore = Arc::new(Semaphore::new(config.inbound_queue_size.as_usize()));
let throughput_tracker = Arc::new(ThroughputTracker::new(Duration::from_secs(60)));
let router = Self {
config,
delivery_engine,
conversation_manager,
agent_registry,
failure_handler,
is_running: AtomicBool::new(false),
is_shutdown: AtomicBool::new(false),
start_time: RwLock::new(None),
message_counter: AtomicU64::new(0),
error_counter: AtomicU64::new(0),
throughput_tracker,
inbound_queue: inbound_sender,
inbound_receiver: Arc::new(RwLock::new(Some(inbound_receiver))),
routing_semaphore,
metrics_collector,
};
info!("Message router created successfully");
Ok(router)
}
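/// Spawns the message processor, worker pool, and (when configured) the
/// health and metrics tasks. Safe to call twice: a repeat call logs a
/// warning and returns `Ok(())` without spawning anything again.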
pub async fn start(&self) -> Result<(), RouterError> {
let span = span!(Level::INFO, "router_start");
let _enter = span.enter();
// swap() makes the check-and-set atomic, so two concurrent start() calls
// cannot both proceed past this guard.
if self.is_running.swap(true, Ordering::SeqCst) {
warn!("Router already running");
return Ok(());
}
info!("Starting message router");
*self.start_time.write().await = Some(Instant::now());
let mut receiver_guard = self.inbound_receiver.write().await;
let receiver = receiver_guard
.take()
.ok_or_else(|| RouterError::ConfigurationError {
message: "Router has already been started".to_string(),
})?;
drop(receiver_guard);
self.spawn_message_processor(receiver).await;
for worker_id in 0..self.config.worker_thread_count.as_usize() {
self.spawn_worker_task(worker_id).await;
}
if self.config.health_check_interval_ms.as_duration() > Duration::ZERO {
self.spawn_health_monitoring_task().await;
}
if self.config.enable_metrics() {
self.spawn_metrics_task().await;
}
info!(
"Message router started with {} workers",
self.config.worker_thread_count.as_usize()
);
Ok(())
}
#[allow(unused_variables)]
async fn spawn_worker_task(&self, worker_id: usize) {
let _delivery_engine = Arc::clone(&self.delivery_engine);
let _conversation_manager = Arc::clone(&self.conversation_manager);
let _agent_registry = Arc::clone(&self.agent_registry);
let _failure_handler = Arc::clone(&self.failure_handler);
let _metrics_collector = self.metrics_collector.clone();
let throughput_tracker = Arc::clone(&self.throughput_tracker);
let semaphore = Arc::clone(&self.routing_semaphore);
let message_counter = AtomicU64::new(0);
let _error_counter = AtomicU64::new(0);
// Task-local flag: nothing ever clears it, so the worker loop runs until
// the runtime shuts down; a shared shutdown signal would be needed to stop
// workers gracefully.
let is_running = AtomicBool::new(true);
tokio::spawn(async move {
let span = span!(Level::DEBUG, "worker_task", worker_id = worker_id);
let _enter = span.enter();
debug!("Worker {} started", worker_id);
let mut interval = tokio::time::interval(Duration::from_millis(10));
while is_running.load(Ordering::SeqCst) {
interval.tick().await;
let _permit = semaphore.acquire().await.expect("Semaphore not closed");
// Placeholder accounting: one sample per tick inflates the measured
// throughput until real routing work is dispatched to workers.
throughput_tracker.record_message();
message_counter.fetch_add(1, Ordering::Relaxed);
}
debug!("Worker {} terminated", worker_id);
});
}
async fn spawn_message_processor(&self, mut receiver: mpsc::Receiver<RoutingTask>) {
let delivery_engine = Arc::clone(&self.delivery_engine);
let conversation_manager = Arc::clone(&self.conversation_manager);
let agent_registry = Arc::clone(&self.agent_registry);
let failure_handler = Arc::clone(&self.failure_handler);
let metrics_collector = self.metrics_collector.clone();
let throughput_tracker = Arc::clone(&self.throughput_tracker);
let message_counter = Arc::new(AtomicU64::new(0));
let error_counter = Arc::new(AtomicU64::new(0));
// Task-local flag (never cleared); in practice the loop exits when the
// inbound channel closes.
let is_running = Arc::new(AtomicBool::new(true));
tokio::spawn(async move {
let span = span!(Level::INFO, "message_processor");
// NOTE: holding an entered span guard across .await points can attach
// events to the wrong task; tracing::Instrument is the more robust pattern.
let _enter = span.enter();
info!("Message processor started");
while is_running.load(Ordering::SeqCst) {
if let Some(task) = receiver.recv().await {
trace!(
"Processing routing task for message {}",
task.message.message_id
);
throughput_tracker.record_message();
message_counter.fetch_add(1, Ordering::Relaxed);
let result = Self::process_routing_task(
task,
&delivery_engine,
&conversation_manager,
&agent_registry,
&failure_handler,
)
.await;
match result {
Ok(message_id) => {
trace!("Successfully routed message {}", message_id);
if let Some(collector) = &metrics_collector {
// The routed message was consumed by delivery, so a synthetic
// placeholder message carries the id into the metrics hook.
collector.record_message_routed(
&FipaMessage {
performative: Performative::Inform,
sender: AgentId::generate(),
receiver: AgentId::generate(),
content: MessageContent::try_new(vec![]).unwrap(),
message_id,
conversation_id: None,
reply_with: None,
in_reply_to: None,
protocol: None,
language: None,
ontology: None,
created_at: MessageTimestamp::now(),
trace_context: None,
delivery_options: DeliveryOptions::default(),
},
Duration::from_millis(1), // placeholder routing latency
);
}
}
Err(error) => {
error!("Failed to route message: {:?}", error);
error_counter.fetch_add(1, Ordering::Relaxed);
if let Some(collector) = &metrics_collector {
collector.record_routing_error(&error);
}
}
}
} else {
info!("Inbound queue closed, stopping message processor");
break;
}
}
info!("Message processor terminated");
});
}
#[allow(dead_code)]
async fn process_routing_task(
task: RoutingTask,
delivery_engine: &Arc<dyn DeliveryEngine>,
conversation_manager: &Arc<dyn ConversationManager>,
agent_registry: &Arc<dyn AgentRegistry>,
_failure_handler: &Arc<dyn FailureHandler>,
) -> Result<MessageId, RouterError> {
let _span_guard = task.span.enter();
trace!(
"Processing routing task for message {}",
task.message.message_id
);
if let Some(conversation_id) = task.message.conversation_id {
if let Err(e) = conversation_manager
.update_conversation(conversation_id, &task.message)
.await
{
warn!("Failed to update conversation {}: {:?}", conversation_id, e);
}
}
let agent_location = agent_registry
.lookup(&task.message.receiver)
.await
.map_err(|e| match e {
RegistryError::AgentNotFound { agent_id } => {
RouterError::AgentNotFound { agent_id }
}
_ => RouterError::ConfigurationError {
message: format!("Agent registry error: {e:?}"),
},
})?;
match agent_location {
AgentLocation::Local(local_agent) => {
trace!("Routing to local agent: {}", local_agent.name);
delivery_engine
.deliver_local(task.message, local_agent)
.await
.map_err(|e| match e {
DeliveryError::LocalDeliveryFailed { source } => {
RouterError::NetworkError { source }
}
_ => RouterError::ConfigurationError {
message: format!("Delivery error: {e:?}"),
},
})
}
AgentLocation::Remote(node_id) => {
trace!("Routing to remote node: {}", node_id);
delivery_engine
.deliver_remote(task.message, node_id)
.await
.map_err(|e| match e {
DeliveryError::RemoteDeliveryFailed { source, .. } => {
RouterError::NetworkError { source }
}
DeliveryError::CircuitBreakerOpen { node_id } => {
RouterError::CircuitBreakerOpen { node_id }
}
_ => RouterError::ConfigurationError {
message: format!("Remote delivery error: {e:?}"),
},
})
}
AgentLocation::Unknown => {
warn!("Agent location unknown for: {}", task.message.receiver);
Err(RouterError::AgentNotFound {
agent_id: task.message.receiver,
})
}
}
}
async fn spawn_health_monitoring_task(&self) {
let health_interval = self.config.health_check_interval_ms.as_duration();
let delivery_engine = Arc::clone(&self.delivery_engine);
let _conversation_manager = Arc::clone(&self.conversation_manager);
let _agent_registry = Arc::clone(&self.agent_registry);
// Snapshot of the running flag: this copy is owned by the spawned task and
// never updated, so the loop below cannot observe shutdown. A shared
// Arc<AtomicBool> or watch channel would be needed for clean termination.
let is_running = AtomicBool::new(self.is_running.load(Ordering::SeqCst));
tokio::spawn(async move {
let span = span!(Level::DEBUG, "health_monitoring");
let _enter = span.enter();
debug!("Health monitoring started");
let mut interval = tokio::time::interval(health_interval);
while is_running.load(Ordering::SeqCst) {
interval.tick().await;
let delivery_health = delivery_engine.health_check().await;
if let Err(e) = delivery_health {
warn!("Delivery engine health check failed: {:?}", e);
}
trace!("Health check completed");
}
debug!("Health monitoring terminated");
});
}
async fn spawn_metrics_task(&self) {
let throughput_tracker = Arc::clone(&self.throughput_tracker);
// Same snapshot caveat as the health task: this copy never observes shutdown.
let is_running = AtomicBool::new(self.is_running.load(Ordering::SeqCst));
tokio::spawn(async move {
let span = span!(Level::DEBUG, "metrics_collection");
let _enter = span.enter();
debug!("Metrics collection started");
let mut interval = tokio::time::interval(Duration::from_secs(10));
while is_running.load(Ordering::SeqCst) {
interval.tick().await;
let current_rate = throughput_tracker.get_current_rate();
trace!("Current throughput: {:.2} messages/sec", current_rate);
}
debug!("Metrics collection terminated");
});
}
}
impl Clone for MessageRouterImpl {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
delivery_engine: Arc::clone(&self.delivery_engine),
conversation_manager: Arc::clone(&self.conversation_manager),
agent_registry: Arc::clone(&self.agent_registry),
failure_handler: Arc::clone(&self.failure_handler),
is_running: AtomicBool::new(self.is_running.load(Ordering::SeqCst)),
is_shutdown: AtomicBool::new(self.is_shutdown.load(Ordering::SeqCst)),
start_time: RwLock::new(None),
message_counter: AtomicU64::new(self.message_counter.load(Ordering::SeqCst)),
error_counter: AtomicU64::new(self.error_counter.load(Ordering::SeqCst)),
throughput_tracker: Arc::clone(&self.throughput_tracker),
inbound_queue: self.inbound_queue.clone(),
inbound_receiver: Arc::clone(&self.inbound_receiver),
routing_semaphore: Arc::clone(&self.routing_semaphore),
metrics_collector: self.metrics_collector.clone(),
}
}
}
#[async_trait]
impl MessageRouter for MessageRouterImpl {
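/// Validates and enqueues the message for asynchronous routing; delivery
/// happens on the processor task. Returns the message id on successful
/// enqueue, not on delivery.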
async fn route_message(&self, message: FipaMessage) -> Result<MessageId, RouterError> {
let span = span!(Level::DEBUG, "route_message",
message_id = %message.message_id,
sender = %message.sender,
receiver = %message.receiver);
let _enter = span.enter();
if !self.is_running.load(Ordering::SeqCst) {
return Err(RouterError::ConfigurationError {
message: "Router is not running".to_string(),
});
}
if self.is_shutdown.load(Ordering::SeqCst) {
return Err(RouterError::ConfigurationError {
message: "Router is shutting down".to_string(),
});
}
if self.config.enable_message_validation()
&& message.content.len() > self.config.max_message_size_bytes()
{
return Err(RouterError::MessageTooLarge {
size: message.content.len(),
max_size: self.config.max_message_size_bytes(),
});
}
let task = RoutingTask {
message,
attempt_count: 1,
created_at: Instant::now(),
span: span.clone(),
};
let message_id = task.message.message_id;
self.inbound_queue
.send(task)
.await
// send() only fails when the receiver side is gone; a full bounded
// queue makes send() wait rather than error.
.map_err(|_| RouterError::QueueFull {
queue_type: "inbound".to_string(),
})?;
self.message_counter.fetch_add(1, Ordering::Relaxed);
debug!("Message queued for routing: {}", message_id);
Ok(message_id)
}
async fn register_agent(
&self,
agent: LocalAgent,
capabilities: Vec<CapabilityName>,
) -> Result<(), RouterError> {
let span = span!(Level::INFO, "register_agent",
agent_id = %agent.id,
agent_name = %agent.name);
let _enter = span.enter();
info!("Registering agent: {} ({})", agent.name, agent.id);
self.agent_registry
.register_local_agent(agent.clone(), capabilities)
.await
.map_err(|e| match e {
RegistryError::AgentAlreadyRegistered { agent_id } => {
RouterError::ConfigurationError {
message: format!("Agent already registered: {agent_id}"),
}
}
_ => RouterError::ConfigurationError {
message: format!("Agent registration failed: {e:?}"),
},
})?;
if let Some(collector) = &self.metrics_collector {
collector.record_agent_registered(agent.id);
}
info!("Agent registered successfully: {}", agent.name);
Ok(())
}
async fn deregister_agent(&self, agent_id: AgentId) -> Result<(), RouterError> {
let span = span!(Level::INFO, "deregister_agent", agent_id = %agent_id);
let _enter = span.enter();
info!("Deregistering agent: {}", agent_id);
self.agent_registry
.deregister_local_agent(agent_id)
.await
.map_err(|e| match e {
RegistryError::AgentNotFound { agent_id } => {
RouterError::AgentNotFound { agent_id }
}
_ => RouterError::ConfigurationError {
message: format!("Agent deregistration failed: {e:?}"),
},
})?;
if let Some(collector) = &self.metrics_collector {
collector.record_agent_deregistered(agent_id);
}
info!("Agent deregistered successfully: {}", agent_id);
Ok(())
}
async fn update_agent_state(
&self,
agent_id: AgentId,
state: AgentState,
) -> Result<(), RouterError> {
let span = span!(Level::DEBUG, "update_agent_state",
agent_id = %agent_id,
new_state = ?state);
let _enter = span.enter();
debug!("Updating agent state: {} -> {:?}", agent_id, state);
let agent_location = self
.agent_registry
.lookup(&agent_id)
.await
.map_err(|e| match e {
RegistryError::AgentNotFound { agent_id } => {
RouterError::AgentNotFound { agent_id }
}
_ => RouterError::ConfigurationError {
message: format!("Failed to lookup agent: {e:?}"),
},
})?;
if !matches!(agent_location, AgentLocation::Local(_)) {
return Err(RouterError::ConfigurationError {
message: "Cannot update state of remote agent".to_string(),
});
}
// NOTE: the state change is validated but not yet persisted to the
// registry; only acceptance is logged here.
debug!("Agent state update accepted: {} -> {:?}", agent_id, state);
Ok(())
}
async fn get_stats(&self) -> Result<RouterStats, RouterError> {
let span = span!(Level::DEBUG, "get_stats");
let _enter = span.enter();
let total_messages = self.message_counter.load(Ordering::Relaxed);
let total_errors = self.error_counter.load(Ordering::Relaxed);
let current_rate = self.throughput_tracker.get_current_rate();
// Uptime is computed for future use but not yet surfaced in RouterStats.
let _uptime = if let Some(start_time) = *self.start_time.read().await {
start_time.elapsed()
} else {
Duration::ZERO
};
let error_rate = if total_messages > 0 {
(total_errors as f64) / (total_messages as f64)
} else {
0.0
};
let local_agents = self.agent_registry.list_local_agents().await.map_err(|e| {
RouterError::ConfigurationError {
message: format!("Failed to get agent list: {e:?}"),
}
})?;
let agent_queue_depths = local_agents
.into_iter()
.map(|agent| (agent.id, agent.queue_size.as_usize()))
.collect();
let conversation_stats = self
.conversation_manager
.get_conversation_stats()
.await
.unwrap_or_else(|_| ConversationStats {
total_active: 0,
total_created: MessageCount::zero(),
average_duration_ms: 0,
average_message_count: 0.0,
participants_distribution: HashMap::new(),
});
let stats = RouterStats {
messages_per_second: current_rate,
// Placeholder: peak tracking is not implemented yet, so the current
// rate stands in for the peak.
peak_messages_per_second: current_rate,
total_messages_processed: MessageCount::new(total_messages as usize),
// Placeholder latency percentiles until real histogram data is wired in.
routing_latency_p50: 500,
routing_latency_p90: 1_000,
routing_latency_p99: 2_000,
routing_latency_p999: 5_000,
total_errors: MessageCount::new(total_errors as usize),
error_rate,
errors_by_type: HashMap::new(),
// Queue depths are not yet instrumented.
inbound_queue_depth: 0,
outbound_queue_depth: 0,
agent_queue_depths,
active_conversations: conversation_stats.total_active,
total_conversations: conversation_stats.total_created,
average_conversation_length: conversation_stats.average_message_count,
// Resource usage reporting is not yet instrumented.
memory_usage_bytes: 0,
cpu_usage_percent: 0.0,
database_size_bytes: 0,
};
trace!("Router stats collected: {:?}", stats);
Ok(stats)
}
async fn health_check(&self) -> Result<HealthStatus, RouterError> {
let span = span!(Level::DEBUG, "health_check");
let _enter = span.enter();
if !self.is_running.load(Ordering::SeqCst) {
return Err(RouterError::ConfigurationError {
message: "Router is not running".to_string(),
});
}
if self.is_shutdown.load(Ordering::SeqCst) {
return Ok(HealthStatus::Unhealthy {
reason: "Router is shutting down".to_string(),
});
}
let delivery_health = self.delivery_engine.health_check().await;
match delivery_health {
Ok(HealthStatus::Healthy) => {
trace!("Router health check passed");
Ok(HealthStatus::Healthy)
}
Ok(HealthStatus::Degraded { reason }) => {
warn!("Router health degraded: {}", reason);
Ok(HealthStatus::Degraded { reason })
}
Ok(HealthStatus::Unhealthy { reason }) => {
error!("Router unhealthy: {}", reason);
Ok(HealthStatus::Unhealthy { reason })
}
Err(e) => {
error!("Health check failed: {:?}", e);
Ok(HealthStatus::Unhealthy {
reason: format!("Health check error: {e:?}"),
})
}
}
}
async fn shutdown(&self) -> Result<(), RouterError> {
let span = span!(Level::INFO, "shutdown");
let _enter = span.enter();
if self.is_shutdown.load(Ordering::SeqCst) {
warn!("Router already shutting down");
return Ok(());
}
info!("Initiating graceful router shutdown");
self.is_shutdown.store(true, Ordering::SeqCst);
self.is_running.store(false, Ordering::SeqCst);
let shutdown_timeout = Duration::from_secs(30);
let shutdown_start = Instant::now();
// NOTE: queue depths in get_stats are placeholder zeros for now, so this
// drain loop exits on its first pass until depth instrumentation lands.
while shutdown_start.elapsed() < shutdown_timeout {
let stats = self.get_stats().await?;
if stats.inbound_queue_depth == 0 && stats.outbound_queue_depth == 0 {
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
info!("Router shutdown completed");
Ok(())
}
}
#[cfg(test)]
impl MessageRouterImpl {
pub fn agent_registry(&self) -> &Arc<dyn AgentRegistry> {
&self.agent_registry
}
}
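/// Channel-backed delivery engine: each local agent and each remote node is
/// reachable through a registered mpsc sender.
///
/// A wiring sketch (the channel capacity is illustrative):
///
/// ```ignore
/// let (tx, mut rx) = mpsc::channel::<FipaMessage>(64);
/// engine.register_agent_queue(agent_id, tx);
/// // the agent task drains rx and handles each message
/// ```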
#[allow(dead_code)]
struct DeliveryEngineImpl {
agent_queues: DashMap<AgentId, mpsc::Sender<FipaMessage>>,
remote_connections: DashMap<NodeId, mpsc::Sender<FipaMessage>>,
config: RouterConfig,
}
impl DeliveryEngineImpl {
async fn new(config: RouterConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Ok(Self {
agent_queues: DashMap::new(),
remote_connections: DashMap::new(),
config,
})
}
#[allow(dead_code)]
pub fn register_agent_queue(&self, agent_id: AgentId, queue: mpsc::Sender<FipaMessage>) {
self.agent_queues.insert(agent_id, queue);
}
#[allow(dead_code)]
pub fn deregister_agent_queue(&self, agent_id: AgentId) {
self.agent_queues.remove(&agent_id);
}
}
#[async_trait]
impl DeliveryEngine for DeliveryEngineImpl {
async fn deliver_local(
&self,
message: FipaMessage,
agent: LocalAgent,
) -> Result<MessageId, DeliveryError> {
let message_id = message.message_id;
if !agent.is_available() {
return Err(DeliveryError::LocalDeliveryFailed {
source: Box::new(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
"Agent is not available",
)),
});
}
if let Some(queue) = self.agent_queues.get(&agent.id) {
match queue.try_send(message) {
Ok(()) => {
trace!(
"Message {} delivered to local agent {}",
message_id, agent.id
);
Ok(message_id)
}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!("Agent {} queue is full", agent.id);
Err(DeliveryError::LocalDeliveryFailed {
source: Box::new(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"Agent queue is full",
)),
})
}
Err(mpsc::error::TrySendError::Closed(_)) => {
error!("Agent {} queue is closed", agent.id);
Err(DeliveryError::LocalDeliveryFailed {
source: Box::new(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Agent queue is closed",
)),
})
}
}
} else {
// No queue is registered, so the message is acknowledged but not
// actually buffered anywhere yet.
warn!(
"No queue registered for agent {}, acknowledging without delivery",
agent.id
);
Ok(message_id)
}
}
async fn deliver_remote(
&self,
message: FipaMessage,
node_id: NodeId,
) -> Result<MessageId, DeliveryError> {
let message_id = message.message_id;
if let Some(connection) = self.remote_connections.get(&node_id) {
match connection.try_send(message) {
Ok(()) => {
trace!(
"Message {} queued for remote delivery to node {}",
message_id, node_id
);
Ok(message_id)
}
Err(mpsc::error::TrySendError::Full(_)) => {
Err(DeliveryError::RemoteDeliveryFailed {
node_id,
source: Box::new(std::io::Error::new(
std::io::ErrorKind::WouldBlock,
"Remote connection queue is full",
)),
})
}
Err(mpsc::error::TrySendError::Closed(_)) => {
Err(DeliveryError::RemoteDeliveryFailed {
node_id,
source: Box::new(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Remote connection is closed",
)),
})
}
}
} else {
trace!(
"No connection to node {}, would establish connection in production",
node_id
);
Ok(message_id)
}
}
async fn deliver_batch(
&self,
messages: Vec<FipaMessage>,
) -> Vec<Result<MessageId, DeliveryError>> {
// Stub: batch delivery acknowledges every message without routing it
// through deliver_local/deliver_remote yet.
let mut results = Vec::with_capacity(messages.len());
for message in messages {
results.push(Ok(message.message_id));
}
results
}
async fn health_check(&self) -> Result<HealthStatus, DeliveryError> {
let active_agents = self.agent_queues.len();
let active_connections = self.remote_connections.len();
if active_agents == 0 && active_connections == 0 {
Ok(HealthStatus::Degraded {
reason: "No active agents or connections".to_string(),
})
} else {
Ok(HealthStatus::Healthy)
}
}
}
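/// In-memory conversation store backed by `DashMap`, with lazy creation on
/// first message and time-based expiry.
///
/// A lookup-or-create sketch (participant set and protocol are illustrative):
///
/// ```ignore
/// let participants: HashSet<AgentId> = [sender, receiver].into_iter().collect();
/// let conversation = manager
///     .get_or_create_conversation(conversation_id, participants, None)
///     .await?;
/// ```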
struct ConversationManagerImpl {
conversations: DashMap<ConversationId, Conversation>,
total_created: AtomicU64,
config: RouterConfig,
}
impl ConversationManagerImpl {
async fn new(config: RouterConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Ok(Self {
conversations: DashMap::new(),
total_created: AtomicU64::new(0),
config,
})
}
}
#[async_trait]
impl ConversationManager for ConversationManagerImpl {
async fn get_or_create_conversation(
&self,
conversation_id: ConversationId,
participants: std::collections::HashSet<AgentId>,
protocol: Option<ProtocolName>,
) -> Result<Conversation, ConversationError> {
if let Some(conversation) = self.conversations.get(&conversation_id) {
return Ok(conversation.clone());
}
if participants.len() > self.config.max_conversation_participants.into_inner() as usize {
return Err(ConversationError::TooManyParticipants {
count: participants.len(),
max: self.config.max_conversation_participants.into_inner() as usize,
});
}
let conversation = Conversation::new(
conversation_id,
participants,
protocol,
ConversationCreatedAt::now(),
);
self.conversations
.insert(conversation_id, conversation.clone());
self.total_created.fetch_add(1, Ordering::Relaxed);
Ok(conversation)
}
async fn update_conversation(
&self,
conversation_id: ConversationId,
message: &FipaMessage,
) -> Result<(), ConversationError> {
if let Some(mut conversation) = self.conversations.get_mut(&conversation_id) {
conversation.add_message(message);
Ok(())
} else {
let mut participants = HashSet::new();
participants.insert(message.sender);
participants.insert(message.receiver);
let mut conversation = self
.get_or_create_conversation(conversation_id, participants, message.protocol.clone())
.await?;
conversation.add_message(message);
self.conversations.insert(conversation_id, conversation);
Ok(())
}
}
async fn get_agent_conversations(
&self,
agent_id: AgentId,
) -> Result<Vec<Conversation>, ConversationError> {
let conversations: Vec<Conversation> = self
.conversations
.iter()
.filter(|entry| entry.participants.contains(&agent_id))
.map(|entry| entry.value().clone())
.collect();
Ok(conversations)
}
async fn cleanup_expired_conversations(&self) -> Result<usize, ConversationError> {
let timeout = self.config.conversation_timeout_ms.as_duration();
let mut cleaned_count = 0;
let mut expired_ids = Vec::new();
for entry in &self.conversations {
let conversation = entry.value();
if let Ok(elapsed) = conversation.last_activity.as_system_time().elapsed() {
if elapsed > timeout {
expired_ids.push(*entry.key());
}
}
}
for conversation_id in expired_ids {
if self.conversations.remove(&conversation_id).is_some() {
cleaned_count += 1;
}
}
Ok(cleaned_count)
}
async fn get_conversation_stats(&self) -> Result<ConversationStats, ConversationError> {
let total_active = self.conversations.len();
let total_created = MessageCount::new(self.total_created.load(Ordering::Relaxed) as usize);
let mut total_duration_ms = 0u64;
let mut total_message_count = 0u64;
let mut participants_distribution = HashMap::new();
for entry in &self.conversations {
let conversation = entry.value();
if let Ok(duration) = conversation
.last_activity
.as_system_time()
.duration_since(conversation.created_at.as_system_time())
{
total_duration_ms += duration.as_millis() as u64;
}
total_message_count += conversation.message_count.into_inner() as u64;
let participant_count = conversation.participants.len();
*participants_distribution
.entry(participant_count)
.or_insert(0) += 1;
}
let average_duration_ms = if total_active > 0 {
total_duration_ms / total_active as u64
} else {
0
};
let average_message_count = if total_active > 0 {
total_message_count as f64 / total_active as f64
} else {
0.0
};
Ok(ConversationStats {
total_active,
total_created,
average_duration_ms,
average_message_count,
participants_distribution,
})
}
}
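/// Metadata about a peer node that hosts remote agents, tracked by the
/// registry for routing and health bookkeeping.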
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
pub id: NodeId,
pub name: String,
pub address: String,
pub is_healthy: bool,
pub last_heartbeat: MessageTimestamp,
pub agent_count: usize,
}
impl NodeInfo {
pub fn new(id: NodeId, name: String, address: String) -> Self {
Self {
id,
name,
address,
is_healthy: true,
last_heartbeat: MessageTimestamp::now(),
agent_count: 0,
}
}
}
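/// In-memory agent registry: local agents, a route cache, a capability
/// index, and known remote nodes.
///
/// A registration-and-lookup sketch:
///
/// ```ignore
/// registry.register_local_agent(agent, vec![capability]).await?;
/// match registry.lookup(&agent_id).await? {
///     AgentLocation::Local(agent) => { /* deliver in-process */ }
///     AgentLocation::Remote(node_id) => { /* forward to the node */ }
///     AgentLocation::Unknown => { /* fall back to failure handling */ }
/// }
/// ```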
pub struct AgentRegistryImpl {
agents: DashMap<AgentId, LocalAgent>,
routes: DashMap<AgentId, AgentLocation>,
capabilities: DashMap<CapabilityName, HashSet<AgentId>>,
node_registry: DashMap<NodeId, NodeInfo>,
}
impl AgentRegistryImpl {
async fn new(_config: RouterConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Ok(Self {
agents: DashMap::new(),
routes: DashMap::new(),
capabilities: DashMap::new(),
node_registry: DashMap::new(),
})
}
}
#[async_trait]
impl AgentRegistry for AgentRegistryImpl {
async fn lookup(&self, agent_id: &AgentId) -> Result<AgentLocation, RegistryError> {
if let Some(location) = self.routes.get(agent_id) {
return Ok(location.clone());
}
if let Some(agent) = self.agents.get(agent_id) {
let location = AgentLocation::Local(agent.clone());
self.routes.insert(*agent_id, location.clone());
return Ok(location);
}
Err(RegistryError::AgentNotFound {
agent_id: *agent_id,
})
}
async fn register_local_agent(
&self,
agent: LocalAgent,
capabilities: Vec<CapabilityName>,
) -> Result<(), RegistryError> {
let agent_id = agent.id;
if self.agents.contains_key(&agent_id) {
return Err(RegistryError::AgentAlreadyRegistered { agent_id });
}
self.agents.insert(agent_id, agent.clone());
self.routes.insert(agent_id, AgentLocation::Local(agent));
for capability in capabilities {
self.capabilities
.entry(capability)
.and_modify(|agents| {
agents.insert(agent_id);
})
.or_insert_with(|| {
let mut agents = HashSet::new();
agents.insert(agent_id);
agents
});
}
Ok(())
}
async fn deregister_local_agent(&self, agent_id: AgentId) -> Result<(), RegistryError> {
let (_, agent) = self
.agents
.remove(&agent_id)
.ok_or(RegistryError::AgentNotFound { agent_id })?;
self.routes.remove(&agent_id);
for capability in &agent.capabilities {
if let Some(mut agents) = self.capabilities.get_mut(capability) {
agents.remove(&agent_id);
if agents.is_empty() {
drop(agents);
self.capabilities.remove(capability);
}
}
}
Ok(())
}
async fn update_remote_route(
&self,
agent_id: AgentId,
node_id: NodeId,
_hops: RouteHops,
) -> Result<(), RegistryError> {
if !self.node_registry.contains_key(&node_id) {
let node_info =
NodeInfo::new(node_id, format!("node-{node_id}"), "unknown".to_string());
self.node_registry.insert(node_id, node_info);
}
self.routes.insert(agent_id, AgentLocation::Remote(node_id));
if let Some(mut node_info) = self.node_registry.get_mut(&node_id) {
// NOTE: every route update increments the count, so re-registering the
// same agent inflates it until de-duplication is added.
node_info.agent_count += 1;
}
Ok(())
}
async fn find_agents_by_capability(
&self,
capability: &CapabilityName,
) -> Result<Vec<AgentId>, RegistryError> {
if let Some(agents) = self.capabilities.get(capability) {
Ok(agents.iter().copied().collect())
} else {
Ok(vec![])
}
}
async fn list_local_agents(&self) -> Result<Vec<LocalAgent>, RegistryError> {
Ok(self
.agents
.iter()
.map(|entry| entry.value().clone())
.collect())
}
async fn update_agent_health(
&self,
agent_id: AgentId,
_is_healthy: bool,
last_heartbeat: MessageTimestamp,
) -> Result<(), RegistryError> {
if let Some(mut agent) = self.agents.get_mut(&agent_id) {
// Only the heartbeat is recorded; `_is_healthy` has no backing field yet.
agent.last_heartbeat = last_heartbeat;
Ok(())
} else {
Err(RegistryError::AgentNotFound { agent_id })
}
}
}
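/// No-op failure handler stub: retries, dead-lettering, and failure handling
/// all report success without side effects.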
struct FailureHandlerImpl;
impl FailureHandlerImpl {
async fn new(_config: RouterConfig) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
Ok(Self)
}
}
#[async_trait]
impl FailureHandler for FailureHandlerImpl {
async fn handle_routing_failure(
&self,
_message: FipaMessage,
_error: RouterError,
) -> Result<MessageId, RouterError> {
// Stub: reports success with a freshly generated id; the failed message's
// own id is not preserved.
Ok(MessageId::generate())
}
async fn schedule_retry(
&self,
_message: FipaMessage,
_retry_count: u8,
) -> Result<(), RouterError> {
Ok(())
}
async fn dead_letter(
&self,
_message: FipaMessage,
_reason: FailureReason,
) -> Result<(), RouterError> {
Ok(())
}
async fn get_dead_letter_stats(&self) -> Result<DeadLetterStats, RouterError> {
Ok(DeadLetterStats {
total_messages: MessageCount::zero(),
messages_by_reason: HashMap::new(),
oldest_message_age_ms: None,
queue_size_bytes: 0,
})
}
}
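/// No-op metrics collector stub: a hook point for wiring in a real metrics
/// backend later.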
struct MetricsCollectorImpl;
impl MetricsCollectorImpl {
fn new() -> Self {
Self
}
}
impl MetricsCollector for MetricsCollectorImpl {
fn record_message_routed(&self, _message: &FipaMessage, _duration: Duration) {}
fn record_routing_error(&self, _error: &RouterError) {}
fn record_delivery_metrics(&self, _success: bool, _duration: Duration) {}
fn record_conversation_created(&self) {}
fn record_agent_registered(&self, _agent_id: AgentId) {}
fn record_agent_deregistered(&self, _agent_id: AgentId) {}
}