Expand description
High-performance async message router for agent communication Core Message Router for Caxton Multi-Agent System
This module implements a high-performance, async message router that enables agents to communicate without knowing infrastructure details. It provides:
- 100K+ messages/second throughput through batching and connection pooling
- Sub-millisecond local routing via O(1) HashMap lookups
- Fault-tolerant remote routing with circuit breakers and retries
- Conversation context management for multi-turn dialogues
- Complete observability with OpenTelemetry integration
- Type safety using domain types to eliminate primitive obsession
§Architecture Overview
The message router follows a coordination-first architecture (ADR-0014) with local SQLite storage and SWIM gossip protocol for distributed coordination.
§Core Components
- [
MessageRouter]: Central coordination hub for message routing - [
DeliveryEngine]: Handles actual message delivery (local/remote) - [
ConversationManager]: Manages multi-turn conversation state - [
AgentRegistry]: O(1) agent lookup with capability indexing - [
FailureHandler]: Comprehensive error handling with retries and dead letter queue
§Message Flow
Client -> MessageRouter -> AgentRegistry -> DeliveryEngine -> Agent
| | |
v v v
ConversationMgr Capability Index Local/Remote
| | Delivery
v v
SQLite Storage Gossip Protocol§Performance Characteristics
- Local routing: < 1ms P99 latency
- Remote routing: < 5ms P99 latency
- Throughput: 100,000+ messages/second sustained
- Memory usage: O(agents + conversations) with bounded caches
- Agent lookup: O(1) time complexity
- Capability discovery: O(1) with hash indexing
§Usage Example
use caxton::message_router::{MessageRouter, MessageRouterImpl, RouterConfig, FipaMessage,
Performative, MessageContent, MessageId, MessageTimestamp, DeliveryOptions};
use caxton::domain_types::AgentId;
// Create router with production configuration
let config = RouterConfig::production();
let router = MessageRouterImpl::new(config).await?;
// Start background processing
router.start().await?;
// Route a message
let message = FipaMessage {
performative: Performative::Inform,
sender: AgentId::generate(),
receiver: AgentId::generate(),
content: MessageContent::try_new("Hello, world!".to_string().into_bytes()).unwrap(),
language: None,
ontology: None,
protocol: None,
conversation_id: None,
reply_with: None,
in_reply_to: None,
message_id: MessageId::generate(),
created_at: MessageTimestamp::now(),
trace_context: None,
delivery_options: DeliveryOptions::default(),
};
let message_id = router.route_message(message).await?;
println!("Message routed with ID: {}", message_id);
// Graceful shutdown
router.shutdown().await?;§Configuration
The router supports development and production configurations:
use caxton::message_router::{RouterConfig, ChannelCapacity, MessageTimeoutMs};
// Development: High observability, smaller queues
let dev_config = RouterConfig::development();
// Production: Optimized for throughput and efficiency
let prod_config = RouterConfig::production();
// Custom configuration
let custom_config = RouterConfig::builder()
.inbound_queue_size(ChannelCapacity::try_new(50_000).unwrap())
.message_timeout_ms(MessageTimeoutMs::try_new(15_000).unwrap())
.build()?;§Error Handling
The router provides comprehensive error handling:
- Validation errors: Invalid message format or size
- Routing errors: Agent not found, network failures
- Resource errors: Queue full, memory exhausted
- Timeout errors: Operation exceeded configured limits
Failed messages are automatically retried with exponential backoff. Undeliverable messages are moved to a dead letter queue for analysis.
§Observability
Complete observability through OpenTelemetry:
- Traces: End-to-end message flow with correlation
- Metrics: Throughput, latency, error rates, queue depths
- Logs: Structured logging with trace correlation
- Health checks: Component health and performance monitoring
§Thread Safety
All components are thread-safe and optimized for concurrent access:
- Lock-free data structures (DashMap, atomic operations)
- Async/await throughout for non-blocking operations
- Connection pooling for efficient resource usage
- Bounded queues for back-pressure control
Re-exports§
pub use config::ConfigError;pub use config::RouterConfig;pub use config::RouterConfigBuilder;pub use router::MessageRouterImpl;pub use traits::MessageRouter;pub use domain_types::*;pub use traits::*;
Modules§
- config
- Router configuration for development and production environments
- domain_
types - Domain types for the message router module
- router
- Main message router implementation
- traits
- Trait definitions for message router components