#![allow(unused_variables, unused_mut, dead_code)]
#![allow(clippy::cast_precision_loss)]
use caxton::message_router::{
AgentId, AgentName, AgentQueueSize, AgentState, ConversationId, DeliveryOptions, FipaMessage,
LocalAgent, MessageContent, MessageId, MessageRouter, MessageRouterImpl, MessageTimestamp,
Performative, RouterConfig, RouterError,
};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
/// Builds an `Inform` message from `sender` to `receiver` for use in tests.
///
/// The message carries a fixed payload, a freshly generated message id and
/// conversation id, the current timestamp and default delivery options. Every
/// other optional field is left as `None`.
///
/// # Panics
///
/// Panics if `MessageContent::try_new` rejects the fixed payload.
fn create_test_message(sender: AgentId, receiver: AgentId) -> FipaMessage {
    let payload = "Test message content".as_bytes().to_owned();
    let content = MessageContent::try_new(payload).unwrap();
    let message_id = MessageId::generate();
    let conversation_id = Some(ConversationId::generate());
    let created_at = MessageTimestamp::now();
    let delivery_options = DeliveryOptions::default();

    FipaMessage {
        performative: Performative::Inform,
        sender,
        receiver,
        content,
        message_id,
        conversation_id,
        created_at,
        delivery_options,
        // Optional metadata is not needed by the routing tests.
        reply_with: None,
        in_reply_to: None,
        protocol: None,
        language: None,
        ontology: None,
        trace_context: None,
    }
}
/// Builds a running `LocalAgent` named `test-agent-{id}` for use in tests.
///
/// The agent gets a freshly generated id, no capabilities, a heartbeat set to
/// the current time and the default queue size.
///
/// # Panics
///
/// Panics if `AgentName::try_new` rejects the generated name.
fn create_test_agent(id: u32) -> LocalAgent {
    let agent_id = AgentId::generate();
    let name = AgentName::try_new(format!("test-agent-{id}")).unwrap();
    let last_heartbeat = MessageTimestamp::now();
    let queue_size = AgentQueueSize::default();

    LocalAgent {
        id: agent_id,
        name,
        state: AgentState::Running,
        capabilities: Vec::new(),
        last_heartbeat,
        queue_size,
    }
}
/// Routes one message and checks that the call returns quickly and hands back
/// the id of the message that was submitted.
///
/// The returned id is compared against the id captured from the message before
/// routing. Comparing against a freshly generated id (as an inequality) can
/// never fail and therefore verifies nothing.
///
/// NOTE(review): the 10 ms bound is wall-clock based and may be sensitive to
/// machine load.
#[tokio::test]
async fn test_async_non_blocking_routing() {
    let router = MessageRouterImpl::new(RouterConfig::testing())
        .await
        .unwrap();
    router.start().await.unwrap();

    let agent1 = create_test_agent(1);
    let agent2 = create_test_agent(2);
    let message = create_test_message(agent1.id, agent2.id);
    // Capture the id before the message is moved into the router.
    let expected_id = message.message_id;

    let start = std::time::Instant::now();
    let message_id = router.route_message(message).await.unwrap();
    let duration = start.elapsed();

    assert!(
        duration < Duration::from_millis(10),
        "Routing should be non-blocking"
    );
    assert_eq!(message_id, expected_id, "Should return actual message ID");

    router.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_route_message_by_agent_id() {
let config = RouterConfig::testing();
let router = Arc::new(MessageRouterImpl::new(config).await.unwrap());
router.start().await.unwrap();
let (_tx1, _rx1) = mpsc::channel::<FipaMessage>(10);
let (_tx2, _rx2) = mpsc::channel::<FipaMessage>(10);
let agent1 = create_test_agent(1);
let agent2 = create_test_agent(2);
let message = create_test_message(agent1.id, agent2.id);
let msg_id = message.message_id;
let result = router.route_message(message).await;
assert!(result.is_ok() || matches!(result, Err(RouterError::AgentNotFound { .. })));
router.shutdown().await.unwrap();
}
/// Starts a router, builds a test agent, reads the router statistics and shuts
/// the router down.
///
/// NOTE(review): no registration or deregistration call is made and nothing is
/// asserted; this looks like a placeholder for a fuller test.
#[tokio::test]
async fn test_agent_registration_deregistration() {
    let router = MessageRouterImpl::new(RouterConfig::testing())
        .await
        .unwrap();
    router.start().await.unwrap();

    let agent = create_test_agent(1);
    let _agent_id = agent.id;

    // Only checks that statistics can be fetched; the values are not inspected.
    let _stats = router.get_stats().await.unwrap();

    router.shutdown().await.unwrap();
}
/// Routes a message between two freshly generated agent ids, waits briefly and
/// then reads the router statistics.
///
/// NOTE(review): either routing outcome is accepted and the statistics are not
/// inspected, so this only shows that the calls complete without panicking.
#[tokio::test]
async fn test_graceful_failure_handling() {
    let router = MessageRouterImpl::new(RouterConfig::testing())
        .await
        .unwrap();
    router.start().await.unwrap();

    let (sender, receiver) = (AgentId::generate(), AgentId::generate());
    let outcome = router
        .route_message(create_test_message(sender, receiver))
        .await;
    let (succeeded, failed) = (outcome.is_ok(), outcome.is_err());
    assert!(succeeded || failed);

    // Pause before reading the statistics.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _stats = router.get_stats().await.unwrap();

    router.shutdown().await.unwrap();
}
/// Sends three messages that share one conversation id, alternating direction
/// between two agents, and checks that the router reports at least one
/// conversation afterwards.
///
/// Every message after the first carries a freshly generated `in_reply_to` id.
/// Routing results are deliberately ignored.
#[tokio::test]
async fn test_conversation_context_maintenance() {
    let router = Arc::new(
        MessageRouterImpl::new(RouterConfig::testing())
            .await
            .unwrap(),
    );
    router.start().await.unwrap();

    let agent1 = create_test_agent(1);
    let agent2 = create_test_agent(2);
    let conversation_id = ConversationId::generate();

    for turn in 0..3 {
        // Even turns go agent1 -> agent2, odd turns go back the other way.
        let (from, to) = if turn % 2 == 0 {
            (agent1.id, agent2.id)
        } else {
            (agent2.id, agent1.id)
        };

        let mut message = create_test_message(from, to);
        message.conversation_id = Some(conversation_id);
        message.in_reply_to = (turn > 0).then(MessageId::generate);

        router.route_message(message).await.ok();
    }

    // Pause before reading the statistics.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let stats = router.get_stats().await.unwrap();
    let tracked = stats.total_conversations.into_inner();
    assert!(tracked > 0, "Should track conversations");

    router.shutdown().await.unwrap();
}
/// Routes a single message between two freshly generated agent ids.
///
/// NOTE(review): despite the name, no trace span is inspected here, and either
/// routing outcome is accepted, so this only shows that routing does not panic.
#[tokio::test]
async fn test_trace_span_observability() {
    let router = MessageRouterImpl::new(RouterConfig::testing())
        .await
        .unwrap();
    router.start().await.unwrap();

    let (sender, receiver) = (AgentId::generate(), AgentId::generate());
    let outcome = router
        .route_message(create_test_message(sender, receiver))
        .await;
    let (succeeded, failed) = (outcome.is_ok(), outcome.is_err());
    assert!(succeeded || failed);

    router.shutdown().await.unwrap();
}
/// Starts a router and routes one message between two test agents.
///
/// NOTE(review): the channels and the message copy created here are never
/// used, and the routing result is not checked; this looks like scaffolding
/// for a delivery assertion that has not been written yet.
#[tokio::test]
async fn test_local_agent_message_routing_works() {
    let router = Arc::new(
        MessageRouterImpl::new(RouterConfig::testing())
            .await
            .unwrap(),
    );
    router.start().await.unwrap();

    // Currently unused: nothing in this test connects these channels to the router.
    let (_first_tx, _first_rx) = mpsc::channel::<FipaMessage>(10);
    let (_second_tx, _second_rx) = mpsc::channel::<FipaMessage>(10);

    let agent1 = create_test_agent(1);
    let agent2 = create_test_agent(2);
    let message = create_test_message(agent1.id, agent2.id);
    let _msg_copy = message.clone();

    let _result = router.route_message(message).await;

    router.shutdown().await.unwrap();
}
#[tokio::test]
#[ignore = "Performance test - Run with: cargo test --ignored"]
async fn test_performance_100k_messages_per_second() {
let config = RouterConfig::production();
let router = Arc::new(MessageRouterImpl::new(config).await.unwrap());
router.start().await.unwrap();
let mut agents = Vec::new();
for i in 0..100 {
agents.push(create_test_agent(i));
}
let start = std::time::Instant::now();
let message_count = 100_000;
let mut handles = Vec::new();
for i in 0..message_count {
let router_clone = router.clone();
let sender = agents[i % 100].id;
let receiver = agents[(i + 1) % 100].id;
let handle = tokio::spawn(async move {
let message = create_test_message(sender, receiver);
router_clone.route_message(message).await
});
handles.push(handle);
if handles.len() >= 1000 {
for h in handles.drain(..) {
h.await.ok();
}
}
}
for h in handles {
h.await.ok();
}
let duration = start.elapsed();
let msgs_per_sec = (message_count as f64) / duration.as_secs_f64();
println!("Performance: {msgs_per_sec:.0} messages/second");
assert!(
msgs_per_sec >= 100_000.0,
"Should achieve 100K msgs/sec, got {msgs_per_sec:.0}"
);
router.shutdown().await.unwrap();
}
/// Routes 1000 messages one after another and checks that the router reports
/// at least one processed message afterwards.
///
/// NOTE(review): the ids of the sent messages are collected but never compared
/// with anything, so the test does not actually prove that no message was
/// lost. Routing results are ignored.
#[tokio::test]
async fn test_no_message_loss_normal_operation() {
    let router = Arc::new(
        MessageRouterImpl::new(RouterConfig::testing())
            .await
            .unwrap(),
    );
    router.start().await.unwrap();

    let mut sent_ids = HashSet::new();
    for _ in 0..1000 {
        let outgoing = create_test_message(AgentId::generate(), AgentId::generate());
        sent_ids.insert(outgoing.message_id);
        router.route_message(outgoing).await.ok();
    }

    // Pause before reading the statistics.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let total_processed = router
        .get_stats()
        .await
        .unwrap()
        .total_messages_processed
        .into_inner();
    assert!(total_processed > 0, "Messages should be processed");

    router.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_routing_scenario_high_concurrency() {
let config = RouterConfig::testing();
let router = Arc::new(MessageRouterImpl::new(config).await.unwrap());
router.start().await.unwrap();
let mut handles = Vec::new();
for i in 0..100 {
let router_clone = router.clone();
let handle = tokio::spawn(async move {
let message = create_test_message(AgentId::generate(), AgentId::generate());
router_clone.route_message(message).await
});
handles.push(handle);
}
let mut success_count = 0;
for h in handles {
if h.await.unwrap().is_ok() {
success_count += 1;
}
}
assert!(success_count > 0, "Some messages should route successfully");
router.shutdown().await.unwrap();
}
/// Starts a router with the testing configuration and shuts it down again.
///
/// NOTE(review): no message is sent or delivered here; the body only covers
/// the start/shutdown lifecycle.
#[tokio::test]
async fn test_end_to_end_delivery_integration() {
    let router = Arc::new(
        MessageRouterImpl::new(RouterConfig::testing())
            .await
            .unwrap(),
    );

    router.start().await.unwrap();
    router.shutdown().await.unwrap();
}
/// Routes ten messages and then checks the router statistics: a positive
/// processed-message count, a non-negative messages-per-second figure and a
/// positive p50 routing latency.
///
/// Routing results are ignored; only the statistics are asserted.
#[tokio::test]
async fn test_metrics_track_routing_performance() {
    let router = Arc::new(
        MessageRouterImpl::new(RouterConfig::testing())
            .await
            .unwrap(),
    );
    router.start().await.unwrap();

    for _ in 0..10 {
        let (sender, receiver) = (AgentId::generate(), AgentId::generate());
        router
            .route_message(create_test_message(sender, receiver))
            .await
            .ok();
    }

    // Pause before reading the statistics.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let stats = router.get_stats().await.unwrap();
    let processed = stats.total_messages_processed.into_inner();
    assert!(processed > 0, "Should track message count");
    assert!(stats.messages_per_second >= 0.0, "Should track throughput");
    assert!(stats.routing_latency_p50 > 0, "Should track latency");

    router.shutdown().await.unwrap();
}