use async_trait::async_trait;
use std::collections::HashMap;
use thiserror::Error;
use super::domain_types::{
AgentId, AgentLocation, AgentState, CapabilityName, Conversation, ConversationId,
FailureReason, FipaMessage, LocalAgent, MaxRetries, MessageCount, MessageId, MessageTimestamp,
NodeId, ProtocolName, RouteHops, RouteInfo,
};
/// Error type returned by the `MessageRouter`, `MessageStorage` and
/// `FailureHandler` trait methods declared in this file.
///
/// The `Display` text of each variant is the string in its `#[error(...)]`
/// attribute. Fields marked `#[source]` are what `thiserror` exposes through
/// `std::error::Error::source`.
///
/// NOTE(review): several variants both interpolate `{source}` into their
/// message and mark the same field `#[source]`; error reporters that walk the
/// source chain may therefore print the underlying cause twice — confirm this
/// is intended.
#[derive(Debug, Error)]
pub enum RouterError {
/// No agent matched `agent_id`.
#[error("Agent not found: {agent_id}")]
AgentNotFound { agent_id: AgentId },
/// A message of `size` bytes exceeded the `max_size` byte limit.
#[error("Message too large: {size} bytes (max: {max_size} bytes)")]
MessageTooLarge { size: usize, max_size: usize },
/// The queue named by `queue_type` is full.
#[error("Queue full: {queue_type}")]
QueueFull { queue_type: String },
/// A network failure, wrapping an arbitrary boxed cause.
#[error("Network error: {source}")]
NetworkError {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// A serialization failure, wrapping a `serde_json::Error`.
#[error("Serialization error: {source}")]
SerializationError {
#[source]
source: serde_json::Error,
},
/// An operation ran longer than `timeout_ms` milliseconds.
#[error("Timeout: operation took longer than {timeout_ms}ms")]
Timeout { timeout_ms: u64 },
/// A configuration problem described by the free-form `message`.
#[error("Configuration error: {message}")]
ConfigurationError { message: String },
/// A storage failure, wrapping an arbitrary boxed cause.
#[error("Storage error: {source}")]
StorageError {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// The circuit breaker for `node_id` is open.
#[error("Circuit breaker open for node: {node_id}")]
CircuitBreakerOpen { node_id: NodeId },
/// Validation of `field` failed for the stated `reason`.
#[error("Validation error: {field} - {reason}")]
ValidationError { field: String, reason: String },
/// The resource named by `resource` has been exhausted.
#[error("Resource exhausted: {resource}")]
ResourceExhausted { resource: String },
}
/// Error type returned by the `DeliveryEngine` trait methods declared in this
/// file.
///
/// The `Display` text of each variant is the string in its `#[error(...)]`
/// attribute; `#[source]` fields are exposed through
/// `std::error::Error::source`.
#[derive(Debug, Error)]
pub enum DeliveryError {
/// Local delivery failed; wraps an arbitrary boxed cause.
#[error("Local delivery failed: {source}")]
LocalDeliveryFailed {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// Remote delivery to `node_id` failed; wraps an arbitrary boxed cause.
#[error("Remote delivery failed to node {node_id}: {source}")]
RemoteDeliveryFailed {
node_id: NodeId,
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// The circuit breaker for `node_id` is open.
#[error("Circuit breaker open for node: {node_id}")]
CircuitBreakerOpen { node_id: NodeId },
/// The connection pool for `node_id` is exhausted.
#[error("Connection pool exhausted for node: {node_id}")]
ConnectionPoolExhausted { node_id: NodeId },
/// Serialization failed; wraps a `serde_json::Error`.
#[error("Serialization failed: {source}")]
SerializationFailed {
#[source]
source: serde_json::Error,
},
/// The retry limit `max_retries` was exceeded.
#[error("Retry limit exceeded: {max_retries}")]
RetryLimitExceeded { max_retries: MaxRetries },
}
/// Error type returned by the `ConversationManager` and `ConversationStorage`
/// trait methods declared in this file.
///
/// The `Display` text of each variant is the string in its `#[error(...)]`
/// attribute.
#[derive(Debug, Error)]
pub enum ConversationError {
/// No conversation matched `conversation_id`.
#[error("Conversation not found: {conversation_id}")]
ConversationNotFound { conversation_id: ConversationId },
/// A conversation had `count` participants, more than the allowed `max`.
#[error("Too many participants: {count} (max: {max})")]
TooManyParticipants { count: usize, max: usize },
/// A storage failure; wraps an arbitrary boxed cause exposed via
/// `std::error::Error::source`.
#[error("Storage error: {source}")]
StorageError {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// The conversation identified by `conversation_id` timed out.
#[error("Conversation timeout: {conversation_id}")]
ConversationTimeout { conversation_id: ConversationId },
}
/// Error type returned by the `AgentRegistry` and `AgentStorage` trait methods
/// declared in this file.
///
/// The `Display` text of each variant is the string in its `#[error(...)]`
/// attribute.
#[derive(Debug, Error)]
pub enum RegistryError {
/// An agent with `agent_id` is already registered.
#[error("Agent already registered: {agent_id}")]
AgentAlreadyRegistered { agent_id: AgentId },
/// No agent matched `agent_id`.
#[error("Agent not found: {agent_id}")]
AgentNotFound { agent_id: AgentId },
/// A state change from `from` to `to` was rejected. Both states are rendered
/// with their `Debug` formatting (`{:?}`) in the message.
#[error("Invalid agent state transition: {from:?} -> {to:?}")]
InvalidStateTransition { from: AgentState, to: AgentState },
/// A storage failure; wraps an arbitrary boxed cause exposed via
/// `std::error::Error::source`.
#[error("Storage error: {source}")]
StorageError {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
/// The health check for `agent_id` failed.
#[error("Health check failed for agent: {agent_id}")]
HealthCheckFailed { agent_id: AgentId },
}
/// Three-level health indicator returned by the `health_check` methods of
/// `MessageRouter` and `DeliveryEngine`, and embedded in
/// `DetailedHealthStatus` and `ComponentHealth`.
///
/// Comparable with `==` (derives `PartialEq`/`Eq`); the non-healthy variants
/// carry a free-form `reason` string that takes part in that comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
/// No problem reported.
Healthy,
/// Degraded, with a human-readable explanation.
Degraded { reason: String },
/// Unhealthy, with a human-readable explanation.
Unhealthy { reason: String },
}
/// Snapshot of router statistics, as returned by `MessageRouter::get_stats`.
///
/// This is a plain data carrier: nothing in this file computes or validates
/// the values, so the meaning of each field is whatever the producing
/// implementation assigns to it.
///
/// NOTE(review): the unit of the `routing_latency_*` fields and the scale of
/// `error_rate` / `cpu_usage_percent` (0–1 vs 0–100) are not stated anywhere
/// in this file — TODO confirm and document.
#[derive(Debug, Clone)]
pub struct RouterStats {
/// Message throughput, per second.
pub messages_per_second: f64,
/// Peak message throughput, per second.
pub peak_messages_per_second: f64,
/// Count of messages processed in total.
pub total_messages_processed: MessageCount,
/// Routing latency percentiles (p50 / p90 / p99 / p99.9, going by the field
/// names); unit unspecified here.
pub routing_latency_p50: u64,
pub routing_latency_p90: u64,
pub routing_latency_p99: u64,
pub routing_latency_p999: u64,
/// Count of errors in total.
pub total_errors: MessageCount,
/// Error rate; scale unspecified here.
pub error_rate: f64,
/// Error counts keyed by a string label for the error type.
pub errors_by_type: HashMap<String, MessageCount>,
/// Depth of the inbound queue.
pub inbound_queue_depth: usize,
/// Depth of the outbound queue.
pub outbound_queue_depth: usize,
/// Queue depth per agent, keyed by `AgentId`.
pub agent_queue_depths: HashMap<AgentId, usize>,
/// Number of conversations currently active.
pub active_conversations: usize,
/// Count of conversations in total.
pub total_conversations: MessageCount,
/// Average conversation length; presumably measured in messages — confirm.
pub average_conversation_length: f64,
/// Memory usage, in bytes per the field name.
pub memory_usage_bytes: usize,
/// CPU usage as a percentage; scale unspecified here.
pub cpu_usage_percent: f64,
/// Database size, in bytes per the field name.
pub database_size_bytes: usize,
}
/// Top-level routing interface: message routing, agent registration and
/// operational queries.
///
/// All methods are async (via `async_trait`), take `&self`, and report
/// failures as `RouterError`. Implementors must be `Send + Sync`.
/// The behavioural contract of each method is defined by the implementor;
/// the comments below describe only what the signatures establish.
#[async_trait]
pub trait MessageRouter: Send + Sync {
/// Takes ownership of `message` and yields a `MessageId` on success.
/// Presumably the id identifies the routed message — confirm with
/// implementors.
async fn route_message(&self, message: FipaMessage) -> Result<MessageId, RouterError>;
/// Registers `agent` together with the list of capabilities it offers.
async fn register_agent(
&self,
agent: LocalAgent,
capabilities: Vec<CapabilityName>,
) -> Result<(), RouterError>;
/// Deregisters the agent identified by `agent_id`.
async fn deregister_agent(&self, agent_id: AgentId) -> Result<(), RouterError>;
/// Sets the state of the agent identified by `agent_id` to `state`.
async fn update_agent_state(
&self,
agent_id: AgentId,
state: AgentState,
) -> Result<(), RouterError>;
/// Returns a `RouterStats` snapshot.
async fn get_stats(&self) -> Result<RouterStats, RouterError>;
/// Returns the router's current `HealthStatus`.
async fn health_check(&self) -> Result<HealthStatus, RouterError>;
/// Shuts the router down. Takes `&self`, so the value itself is not
/// consumed by the call.
async fn shutdown(&self) -> Result<(), RouterError>;
}
/// Interface for delivering messages to local agents or remote nodes.
///
/// All methods are async (via `async_trait`) and take `&self`; failures are
/// reported as `DeliveryError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait DeliveryEngine: Send + Sync {
/// Delivers `message` to the given local `agent`, yielding a `MessageId` on
/// success. Both arguments are taken by value.
async fn deliver_local(
&self,
message: FipaMessage,
agent: LocalAgent,
) -> Result<MessageId, DeliveryError>;
/// Delivers `message` to the node identified by `node_id`, yielding a
/// `MessageId` on success.
async fn deliver_remote(
&self,
message: FipaMessage,
node_id: NodeId,
) -> Result<MessageId, DeliveryError>;
/// Delivers a batch of messages and returns a vector of per-message
/// results rather than a single `Result`.
///
/// NOTE(review): presumably the output has one entry per input message, in
/// input order — the signature does not guarantee this; confirm.
async fn deliver_batch(
&self,
messages: Vec<FipaMessage>,
) -> Vec<Result<MessageId, DeliveryError>>;
/// Returns the engine's current `HealthStatus`.
async fn health_check(&self) -> Result<HealthStatus, DeliveryError>;
}
/// Interface for tracking conversations between agents.
///
/// All methods are async (via `async_trait`) and take `&self`; failures are
/// reported as `ConversationError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait ConversationManager: Send + Sync {
/// Returns the `Conversation` for `conversation_id`.
///
/// Going by the name, an implementation creates the conversation from
/// `participants` and the optional `protocol` when none exists yet — the
/// exact semantics are up to the implementor; confirm.
async fn get_or_create_conversation(
&self,
conversation_id: ConversationId,
participants: std::collections::HashSet<AgentId>,
protocol: Option<ProtocolName>,
) -> Result<Conversation, ConversationError>;
/// Updates the conversation identified by `conversation_id` using the
/// borrowed `message`.
async fn update_conversation(
&self,
conversation_id: ConversationId,
message: &FipaMessage,
) -> Result<(), ConversationError>;
/// Returns the conversations associated with `agent_id`.
async fn get_agent_conversations(
&self,
agent_id: AgentId,
) -> Result<Vec<Conversation>, ConversationError>;
/// Cleans up expired conversations and returns a `usize`; presumably the
/// number of conversations removed — confirm.
async fn cleanup_expired_conversations(&self) -> Result<usize, ConversationError>;
/// Returns a `ConversationStats` snapshot.
async fn get_conversation_stats(&self) -> Result<ConversationStats, ConversationError>;
}
/// Interface for looking up agents and maintaining registration, routing and
/// health information about them.
///
/// All methods are async (via `async_trait`) and take `&self`; failures are
/// reported as `RegistryError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait AgentRegistry: Send + Sync {
/// Resolves the borrowed `agent_id` to an `AgentLocation`.
async fn lookup(&self, agent_id: &AgentId) -> Result<AgentLocation, RegistryError>;
/// Registers a local `agent` together with its `capabilities`.
async fn register_local_agent(
&self,
agent: LocalAgent,
capabilities: Vec<CapabilityName>,
) -> Result<(), RegistryError>;
/// Deregisters the local agent identified by `agent_id`.
async fn deregister_local_agent(&self, agent_id: AgentId) -> Result<(), RegistryError>;
/// Records that `agent_id` is reachable via `node_id` with the given
/// `hops` value.
async fn update_remote_route(
&self,
agent_id: AgentId,
node_id: NodeId,
hops: RouteHops,
) -> Result<(), RegistryError>;
/// Returns the ids of agents associated with `capability`.
async fn find_agents_by_capability(
&self,
capability: &CapabilityName,
) -> Result<Vec<AgentId>, RegistryError>;
/// Returns the registered local agents.
async fn list_local_agents(&self) -> Result<Vec<LocalAgent>, RegistryError>;
/// Updates health information for `agent_id`: a healthy/unhealthy flag and
/// the timestamp of its last heartbeat.
async fn update_agent_health(
&self,
agent_id: AgentId,
is_healthy: bool,
last_heartbeat: MessageTimestamp,
) -> Result<(), RegistryError>;
}
/// Storage interface for `FipaMessage` values.
///
/// All methods are async (via `async_trait`) and take `&self`; failures are
/// reported as `RouterError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait MessageStorage: Send + Sync {
/// Stores the borrowed `message`.
async fn store_message(&self, message: &FipaMessage) -> Result<(), RouterError>;
/// Fetches the message with `message_id`; the `Option` in the return type
/// lets an implementation report "no such message" as `Ok(None)` rather
/// than as an error.
async fn get_message(&self, message_id: MessageId) -> Result<Option<FipaMessage>, RouterError>;
/// Removes the message with `message_id`.
async fn remove_message(&self, message_id: MessageId) -> Result<(), RouterError>;
/// Lists messages associated with `agent_id`.
///
/// `limit` is optional; presumably it caps the number of messages
/// returned and `None` means no cap — confirm with implementors.
async fn list_agent_messages(
&self,
agent_id: AgentId,
limit: Option<usize>,
) -> Result<Vec<FipaMessage>, RouterError>;
}
/// Storage interface for `Conversation` values.
///
/// All methods are async (via `async_trait`) and take `&self`; failures are
/// reported as `ConversationError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait ConversationStorage: Send + Sync {
/// Saves the borrowed `conversation`.
async fn save_conversation(&self, conversation: &Conversation)
-> Result<(), ConversationError>;
/// Loads the conversation with `conversation_id`; the `Option` in the
/// return type lets an implementation report a missing conversation as
/// `Ok(None)` rather than as an error.
async fn load_conversation(
&self,
conversation_id: ConversationId,
) -> Result<Option<Conversation>, ConversationError>;
/// Archives the borrowed `conversation`. How archiving differs from saving
/// is not specified in this file.
async fn archive_conversation(
&self,
conversation: &Conversation,
) -> Result<(), ConversationError>;
/// Lists the ids (not the full records) of conversations associated with
/// `agent_id`.
async fn list_agent_conversations(
&self,
agent_id: AgentId,
) -> Result<Vec<ConversationId>, ConversationError>;
}
/// Storage interface for agent registrations and routes.
///
/// All methods are async (via `async_trait`) and take `&self`; failures are
/// reported as `RegistryError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait AgentStorage: Send + Sync {
/// Saves the registration of `agent` together with its `capabilities`
/// (both borrowed).
async fn save_agent_registration(
&self,
agent: &LocalAgent,
capabilities: &[CapabilityName],
) -> Result<(), RegistryError>;
/// Loads the agent with `agent_id`; the `Option` in the return type lets an
/// implementation report a missing agent as `Ok(None)`.
async fn load_agent(&self, agent_id: AgentId) -> Result<Option<LocalAgent>, RegistryError>;
/// Removes the agent with `agent_id`.
async fn remove_agent(&self, agent_id: AgentId) -> Result<(), RegistryError>;
/// Saves `route` for `agent_id`.
async fn save_route(&self, agent_id: AgentId, route: &RouteInfo) -> Result<(), RegistryError>;
/// Loads the route for `agent_id`; the `Option` in the return type lets an
/// implementation report a missing route as `Ok(None)`.
async fn load_route(&self, agent_id: AgentId) -> Result<Option<RouteInfo>, RegistryError>;
/// Lists stored agents.
async fn list_agents(&self) -> Result<Vec<LocalAgent>, RegistryError>;
}
/// Interface for reacting to routing failures: retry scheduling and
/// dead-lettering.
///
/// All methods are async (via `async_trait`) and take `&self`; failures are
/// reported as `RouterError`. Implementors must be `Send + Sync`.
#[async_trait]
pub trait FailureHandler: Send + Sync {
/// Handles a routing failure for `message`, given the `error` that caused
/// it (both taken by value). Yields a `MessageId` on success; what a
/// successful outcome means (e.g. a recovered delivery) is up to the
/// implementor.
async fn handle_routing_failure(
&self,
message: FipaMessage,
error: RouterError,
) -> Result<MessageId, RouterError>;
/// Schedules a retry of `message`.
///
/// `retry_count` is a `u8`; presumably the number of attempts made so
/// far — confirm. NOTE(review): the `MaxRetries` domain type is used
/// elsewhere in this file for retry limits while this parameter is a raw
/// integer.
async fn schedule_retry(
&self,
message: FipaMessage,
retry_count: u8,
) -> Result<(), RouterError>;
/// Sends `message` to the dead-letter destination, tagged with `reason`.
async fn dead_letter(
&self,
message: FipaMessage,
reason: FailureReason,
) -> Result<(), RouterError>;
/// Returns a `DeadLetterStats` snapshot.
async fn get_dead_letter_stats(&self) -> Result<DeadLetterStats, RouterError>;
}
/// Interface of a circuit breaker.
///
/// All methods are async (via `async_trait`), take only `&self`, and are
/// infallible (none returns a `Result`). No method takes a node or target
/// identifier, so an instance presumably guards a single target — confirm.
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait CircuitBreaker: Send + Sync {
/// Records one successful operation.
async fn record_success(&self);
/// Records one failed operation.
async fn record_failure(&self);
/// Reports whether the breaker is open.
async fn is_open(&self) -> bool;
/// Returns the current `CircuitBreakerState`.
async fn get_state(&self) -> CircuitBreakerState;
}
/// Sink for routing, delivery, conversation and agent metrics.
///
/// Unlike the other traits in this file, this one is not `#[async_trait]`:
/// every method is synchronous, takes `&self`, and returns nothing, so
/// recording cannot report failure to the caller. Implementors must be
/// `Send + Sync`.
pub trait MetricsCollector: Send + Sync {
/// Records that `message` was routed, with the time it took.
fn record_message_routed(&self, message: &FipaMessage, duration: std::time::Duration);
/// Records a routing error.
fn record_routing_error(&self, error: &RouterError);
/// Records the outcome (`success`) and duration of one delivery.
fn record_delivery_metrics(&self, success: bool, duration: std::time::Duration);
/// Records that a conversation was created.
fn record_conversation_created(&self);
/// Records that the agent `agent_id` was registered.
fn record_agent_registered(&self, agent_id: AgentId);
/// Records that the agent `agent_id` was deregistered.
fn record_agent_deregistered(&self, agent_id: AgentId);
}
/// Snapshot of conversation statistics, as returned by
/// `ConversationManager::get_conversation_stats`.
///
/// This is a plain data carrier: nothing in this file computes or validates
/// the values.
#[derive(Debug, Clone)]
pub struct ConversationStats {
    /// Number of conversations currently active.
    pub total_active: usize,
    /// Count of conversations created in total.
    pub total_created: MessageCount,
    /// Average conversation duration, in milliseconds per the field name.
    pub average_duration_ms: u64,
    /// Average number of messages per conversation.
    pub average_message_count: f64,
    /// Distribution of conversations over participant counts.
    ///
    /// NOTE(review): presumably keyed by participant count with the number of
    /// conversations as the value — both sides are plain `usize`, so the
    /// direction is not visible from the type; confirm.
    pub participants_distribution: HashMap<usize, usize>,
}
/// Snapshot of dead-letter statistics, as returned by
/// `FailureHandler::get_dead_letter_stats`.
///
/// This is a plain data carrier: nothing in this file computes or validates
/// the values.
#[derive(Debug, Clone)]
pub struct DeadLetterStats {
/// Count of dead-lettered messages in total.
pub total_messages: MessageCount,
/// Message counts keyed by the `FailureReason` they were dead-lettered for.
pub messages_by_reason: HashMap<FailureReason, MessageCount>,
/// Age of the oldest message, in milliseconds per the field name.
/// Optional; presumably `None` when there are no messages — confirm.
pub oldest_message_age_ms: Option<u64>,
/// Size of the queue, in bytes per the field name.
pub queue_size_bytes: usize,
}
/// State reported by `CircuitBreaker::get_state`.
///
/// The variant names follow the usual circuit-breaker vocabulary; the
/// transition rules between them are not defined in this file and belong to
/// the implementor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CircuitBreakerState {
/// Closed state.
Closed,
/// Half-open state.
HalfOpen,
/// Open state; `CircuitBreakerOpen` errors elsewhere in this file refer to
/// this condition by name.
Open,
}
/// Reason categories for retrying an operation.
///
/// Derives `Hash` and `Eq`, so values can be used as `HashMap`/`HashSet` keys.
///
/// NOTE(review): nothing else in the visible part of this file references
/// this type (`FailureHandler::schedule_retry` does not take it) — confirm
/// where it is consumed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RetryReason {
/// A network error.
NetworkError,
/// A full queue.
QueueFull,
/// An unavailable agent.
AgentUnavailable,
/// An exhausted resource.
ResourceExhausted,
/// A timeout.
Timeout,
}
/// Health report that pairs an overall `HealthStatus` with per-component
/// detail.
///
/// NOTE(review): no trait method in the visible part of this file returns
/// this type (the `health_check` methods return plain `HealthStatus`) —
/// confirm where it is produced.
#[derive(Debug, Clone)]
pub struct DetailedHealthStatus {
/// Overall health.
pub overall: HealthStatus,
/// Health per component, keyed by a string component name.
pub components: HashMap<String, ComponentHealth>,
/// Uptime, in milliseconds per the field name.
pub uptime_ms: u64,
/// Text of the last error, if any.
pub last_error: Option<String>,
}
/// Health detail for a single component; used as the value type of
/// `DetailedHealthStatus::components`.
#[derive(Debug, Clone)]
pub struct ComponentHealth {
/// The component's health.
pub status: HealthStatus,
/// Timestamp of the last check. Note this reuses the `MessageTimestamp`
/// domain type rather than a dedicated one.
pub last_check: MessageTimestamp,
/// Number of errors recorded for the component; the counting window is not
/// specified here.
pub error_count: u32,
/// Free-form numeric metrics keyed by a string name.
pub metrics: HashMap<String, f64>,
}