Module message_router

High-performance async message router for agent communication

§Core Message Router for Caxton Multi-Agent System

This module implements a high-performance, async message router that enables agents to communicate without knowing infrastructure details. It provides:

  • 100K+ messages/second throughput through batching and connection pooling
  • Sub-millisecond local routing via O(1) HashMap lookups
  • Fault-tolerant remote routing with circuit breakers and retries
  • Conversation context management for multi-turn dialogues
  • Complete observability with OpenTelemetry integration
  • Type safety using domain types to eliminate primitive obsession
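
As an illustration of the type-safety point, the sketch below shows how a validated domain type with a `try_new` constructor might look. The name `ChannelCapacity` and the `try_new` pattern appear in this module's configuration example; the bounds, the `CapacityError` type, and the `as_usize` accessor are assumptions made for this sketch, not the crate's actual definitions.

```rust
use std::fmt;

/// Hypothetical validation error; the real crate's error type may differ.
#[derive(Debug, PartialEq, Eq)]
pub struct CapacityError {
    pub value: usize,
}

impl fmt::Display for CapacityError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "channel capacity {} is out of range", self.value)
    }
}

impl std::error::Error for CapacityError {}

/// Illustrative newtype: a queue capacity that can only hold validated values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ChannelCapacity(usize);

impl ChannelCapacity {
    // Assumed bounds, chosen only for this sketch.
    pub const MIN: usize = 1;
    pub const MAX: usize = 1_000_000;

    /// Validates once at construction so later code never re-checks the range.
    pub fn try_new(value: usize) -> Result<Self, CapacityError> {
        if (Self::MIN..=Self::MAX).contains(&value) {
            Ok(Self(value))
        } else {
            Err(CapacityError { value })
        }
    }

    pub fn as_usize(self) -> usize {
        self.0
    }
}
```

Because the inner field is private, a function that accepts `ChannelCapacity` cannot be handed an unchecked `usize` or a value meant for a different parameter.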

§Architecture Overview

The message router follows a coordination-first architecture (ADR-0014), using local SQLite storage and the SWIM gossip protocol for distributed coordination.

§Core Components

  • [MessageRouter]: Central coordination hub for message routing
  • [DeliveryEngine]: Handles actual message delivery (local/remote)
  • [ConversationManager]: Manages multi-turn conversation state
  • [AgentRegistry]: O(1) agent lookup with capability indexing
  • [FailureHandler]: Comprehensive error handling with retries and dead letter queue
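
To make the registry's lookup claims concrete, here is a minimal sketch of a two-map design: one hash map keyed by agent ID and one inverted index keyed by capability. `RegistrySketch`, its methods, and the `u64` stand-in for `AgentId` are hypothetical names for illustration; the real `AgentRegistry` may be structured differently.

```rust
use std::collections::{HashMap, HashSet};

/// Stand-in identifier; the real `AgentId` is a domain type in `caxton::domain_types`.
pub type AgentId = u64;

/// Illustrative registry: one map for direct lookup, one inverted index by capability.
#[derive(Default)]
pub struct RegistrySketch {
    agents: HashMap<AgentId, Vec<String>>,
    by_capability: HashMap<String, HashSet<AgentId>>,
}

impl RegistrySketch {
    /// Registers an agent, replacing any capabilities it had before.
    pub fn register(&mut self, id: AgentId, capabilities: &[&str]) {
        self.unregister(id);
        for cap in capabilities {
            self.by_capability
                .entry(cap.to_string())
                .or_default()
                .insert(id);
        }
        self.agents
            .insert(id, capabilities.iter().map(|c| c.to_string()).collect());
    }

    /// Removes an agent and cleans up its index entries. Returns false if unknown.
    pub fn unregister(&mut self, id: AgentId) -> bool {
        let Some(caps) = self.agents.remove(&id) else {
            return false;
        };
        for cap in caps {
            let now_empty = match self.by_capability.get_mut(&cap) {
                Some(set) => {
                    set.remove(&id);
                    set.is_empty()
                }
                None => false,
            };
            if now_empty {
                self.by_capability.remove(&cap);
            }
        }
        true
    }

    /// Average-case O(1) membership check by agent ID.
    pub fn contains(&self, id: AgentId) -> bool {
        self.agents.contains_key(&id)
    }

    /// One hash lookup finds the capability's agent set; copying it out is O(k).
    pub fn find_by_capability(&self, capability: &str) -> Vec<AgentId> {
        let mut ids: Vec<AgentId> = self
            .by_capability
            .get(capability)
            .map(|set| set.iter().copied().collect())
            .unwrap_or_default();
        ids.sort_unstable(); // deterministic order for callers
        ids
    }
}
```

In this sketch, locating the set of agents for a capability is a single hash lookup, while returning all k matching agents still costs O(k).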

§Message Flow

Client -> MessageRouter -> AgentRegistry -> DeliveryEngine -> Agent
             |               |                    |
             v               v                    v
        ConversationMgr  Capability Index   Local/Remote
             |               |              Delivery
             v               v
         SQLite Storage  Gossip Protocol

§Performance Characteristics

  • Local routing: < 1ms P99 latency
  • Remote routing: < 5ms P99 latency
  • Throughput: 100,000+ messages/second sustained
  • Memory usage: O(agents + conversations) with bounded caches
  • Agent lookup: O(1) time complexity
  • Capability discovery: O(1) with hash indexing

§Usage Example

use caxton::message_router::{MessageRouter, MessageRouterImpl, RouterConfig, FipaMessage,
    Performative, MessageContent, MessageId, MessageTimestamp, DeliveryOptions};
use caxton::domain_types::AgentId;

// Create router with production configuration
let config = RouterConfig::production();
let router = MessageRouterImpl::new(config).await?;

// Start background processing
router.start().await?;

// Route a message
let message = FipaMessage {
    performative: Performative::Inform,
    sender: AgentId::generate(),
    receiver: AgentId::generate(),
    content: MessageContent::try_new("Hello, world!".to_string().into_bytes()).unwrap(),
    language: None,
    ontology: None,
    protocol: None,
    conversation_id: None,
    reply_with: None,
    in_reply_to: None,
    message_id: MessageId::generate(),
    created_at: MessageTimestamp::now(),
    trace_context: None,
    delivery_options: DeliveryOptions::default(),
};
let message_id = router.route_message(message).await?;
println!("Message routed with ID: {}", message_id);

// Graceful shutdown
router.shutdown().await?;

§Configuration

The router supports development and production configurations:

use caxton::message_router::{RouterConfig, ChannelCapacity, MessageTimeoutMs};

// Development: High observability, smaller queues
let dev_config = RouterConfig::development();

// Production: Optimized for throughput and efficiency
let prod_config = RouterConfig::production();

// Custom configuration
let custom_config = RouterConfig::builder()
    .inbound_queue_size(ChannelCapacity::try_new(50_000).unwrap())
    .message_timeout_ms(MessageTimeoutMs::try_new(15_000).unwrap())
    .build()?;

§Error Handling

The router reports errors in four categories:

  • Validation errors: Invalid message format or size
  • Routing errors: Agent not found, network failures
  • Resource errors: Queue full, memory exhausted
  • Timeout errors: Operation exceeded configured limits

Failed messages are automatically retried with exponential backoff. Undeliverable messages are moved to a dead letter queue for analysis.
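
The retry behavior described above can be sketched as follows. This is a synchronous illustration using only the standard library; `RetryPolicy`, `Outcome`, `deliver_with_retry`, and the doubling schedule with a cap are assumptions for this example, and the router's actual policy and async implementation may differ.

```rust
use std::time::Duration;

/// Assumed retry policy; field names and values are illustrative only.
pub struct RetryPolicy {
    pub max_attempts: u32,
    pub base_delay: Duration,
    pub max_delay: Duration,
}

impl RetryPolicy {
    /// Delay before retry number `retry` (0-based): base * 2^retry, capped at `max_delay`.
    pub fn delay_for(&self, retry: u32) -> Duration {
        let factor = 2u32.saturating_pow(retry);
        self.base_delay.saturating_mul(factor).min(self.max_delay)
    }
}

#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    Delivered { attempts: u32 },
    DeadLettered { attempts: u32 },
}

/// Tries delivery up to `max_attempts` times, waiting between attempts.
/// A message that never succeeds is pushed onto `dead_letters`.
/// `wait` is injected so callers can sleep, or record delays in tests.
pub fn deliver_with_retry<M: Clone, E>(
    message: &M,
    policy: &RetryPolicy,
    dead_letters: &mut Vec<M>,
    mut try_deliver: impl FnMut(&M) -> Result<(), E>,
    mut wait: impl FnMut(Duration),
) -> Outcome {
    for attempt in 1..=policy.max_attempts {
        if try_deliver(message).is_ok() {
            return Outcome::Delivered { attempts: attempt };
        }
        // No point in waiting after the final failed attempt.
        if attempt < policy.max_attempts {
            wait(policy.delay_for(attempt - 1));
        }
    }
    dead_letters.push(message.clone());
    Outcome::DeadLettered {
        attempts: policy.max_attempts,
    }
}
```

Passing `std::thread::sleep` as `wait` gives real delays; production retry loops commonly also add random jitter so that many failing senders do not retry in lockstep.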

§Observability

Complete observability through OpenTelemetry:

  • Traces: End-to-end message flow with correlation
  • Metrics: Throughput, latency, error rates, queue depths
  • Logs: Structured logging with trace correlation
  • Health checks: Component health and performance monitoring

§Thread Safety

All components are thread-safe and optimized for concurrent access:

  • Low-contention concurrent data structures (DashMap, which uses sharded locks, and atomic operations)
  • Async/await throughout for non-blocking operations
  • Connection pooling for efficient resource usage
  • Bounded queues for back-pressure control
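
The back-pressure point can be illustrated with a bounded channel that rejects new work when full instead of growing without limit. The sketch uses the standard library's `sync_channel` to stay self-contained; the router itself is async and presumably uses an async channel, and `EnqueueError`, `try_enqueue`, and `bounded_inbound` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Illustrative error type mirroring the "queue full" resource error.
#[derive(Debug, PartialEq, Eq)]
pub enum EnqueueError {
    QueueFull,
    RouterStopped,
}

/// Creates a bounded inbound queue holding at most `capacity` pending messages.
pub fn bounded_inbound<T>(capacity: usize) -> (SyncSender<T>, Receiver<T>) {
    sync_channel(capacity)
}

/// Non-blocking enqueue: a full queue is reported to the caller immediately,
/// which lets the sender slow down, shed load, or retry later.
pub fn try_enqueue<T>(tx: &SyncSender<T>, msg: T) -> Result<(), EnqueueError> {
    match tx.try_send(msg) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Err(EnqueueError::QueueFull),
        Err(TrySendError::Disconnected(_)) => Err(EnqueueError::RouterStopped),
    }
}
```

With a capacity of 2, a third `try_enqueue` fails with `QueueFull` until the consumer receives a message, so memory use stays bounded under load.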

§Re-exports

pub use config::ConfigError;
pub use config::RouterConfig;
pub use config::RouterConfigBuilder;
pub use router::MessageRouterImpl;
pub use traits::MessageRouter;
pub use domain_types::*;
pub use traits::*;

§Modules

  • config: Router configuration for development and production environments
  • domain_types: Domain types for the message router module
  • router: Main message router implementation
  • traits: Trait definitions for message router components