caxton 0.1.4

A secure WebAssembly runtime for multi-agent systems
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
//! Test-Driven Development tests for Core Message Router
//! Following red-green-refactor cycle
//!
//! These tests are written FIRST, before implementation, to drive the design

#![allow(unused_variables, unused_mut, dead_code)]
#![allow(clippy::cast_precision_loss)] // For performance measurement calculations

use caxton::message_router::{
    AgentId, AgentName, AgentQueueSize, AgentState, ConversationId, DeliveryOptions, FipaMessage,
    LocalAgent, MessageContent, MessageId, MessageRouter, MessageRouterImpl, MessageTimestamp,
    Performative, RouterConfig, RouterError,
};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;

/// Builds an `Inform` message from `sender` to `receiver` for use in tests.
///
/// The message carries a fixed textual payload, a freshly generated message
/// ID and conversation ID, the current timestamp and default delivery
/// options. Every other optional FIPA field (`reply_with`, `in_reply_to`,
/// `protocol`, `language`, `ontology`, `trace_context`) is `None`.
///
/// # Panics
///
/// Panics if `MessageContent::try_new` rejects the fixed payload.
fn create_test_message(sender: AgentId, receiver: AgentId) -> FipaMessage {
    let payload = "Test message content".as_bytes().to_vec();
    let content = MessageContent::try_new(payload).unwrap();

    FipaMessage {
        performative: Performative::Inform,
        sender,
        receiver,
        content,
        message_id: MessageId::generate(),
        conversation_id: Some(ConversationId::generate()),
        reply_with: None,
        in_reply_to: None,
        protocol: None,
        language: None,
        ontology: None,
        created_at: MessageTimestamp::now(),
        trace_context: None,
        delivery_options: DeliveryOptions::default(),
    }
}

/// Builds a `Running` local agent for use in tests.
///
/// `id` only feeds the agent's display name (`test-agent-{id}`). The agent's
/// `AgentId` is freshly generated and unrelated to `id`. Capabilities start
/// empty, the heartbeat is "now", and the queue size is the default.
///
/// # Panics
///
/// Panics if `AgentName::try_new` rejects the generated name.
fn create_test_agent(id: u32) -> LocalAgent {
    let agent_id = AgentId::generate();
    let name = AgentName::try_new(format!("test-agent-{id}")).unwrap();

    LocalAgent {
        id: agent_id,
        name,
        state: AgentState::Running,
        capabilities: Vec::new(),
        last_heartbeat: MessageTimestamp::now(),
        queue_size: AgentQueueSize::default(),
    }
}

// ============================================================================
// ACCEPTANCE CRITERIA 1: Async message router processes messages without blocking
// ============================================================================

#[tokio::test]
async fn test_async_non_blocking_routing() {
    // Given: A message router with async processing
    let config = RouterConfig::testing();
    let router = MessageRouterImpl::new(config).await.unwrap();
    router.start().await.unwrap();

    // When: We send a message
    let agent1 = create_test_agent(1);
    let agent2 = create_test_agent(2);
    let message = create_test_message(agent1.id, agent2.id);
    // Capture the ID before `message` is moved into the router, so the ID the
    // router hands back can be checked against the one that was sent.
    let expected_id = message.message_id;

    // Then: The route_message call should return immediately (non-blocking)
    let start = std::time::Instant::now();
    let message_id = router.route_message(message).await.unwrap();
    let duration = start.elapsed();

    // Non-blocking means it returns quickly, not waiting for delivery
    assert!(
        duration < Duration::from_millis(10),
        "Routing should be non-blocking"
    );
    // Compare with the ID of the message actually sent. Comparing with a
    // freshly generated random ID would pass no matter what was returned.
    assert_eq!(message_id, expected_id, "Should return actual message ID");

    router.shutdown().await.unwrap();
}

// ============================================================================
// ACCEPTANCE CRITERIA 2: Messages are routed based on agent ID
// ============================================================================

#[tokio::test]
async fn test_route_message_by_agent_id() {
    // Given: Router with registered agents
    let config = RouterConfig::testing();
    let router = Arc::new(MessageRouterImpl::new(config).await.unwrap());
    router.start().await.unwrap();

    // Create test agents with message queues
    let (_tx1, _rx1) = mpsc::channel::<FipaMessage>(10);
    let (_tx2, _rx2) = mpsc::channel::<FipaMessage>(10);

    let agent1 = create_test_agent(1);
    let agent2 = create_test_agent(2);

    // TODO: Need to implement actual message queue injection
    // For now, we'll test that routing attempts to find the agent

    // When: We send a message to agent2
    let message = create_test_message(agent1.id, agent2.id);
    let msg_id = message.message_id;
    let result = router.route_message(message).await;

    // Then: Message should be routed (even if delivery fails due to no queue)
    assert!(result.is_ok() || matches!(result, Err(RouterError::AgentNotFound { .. })));

    router.shutdown().await.unwrap();
}

// ============================================================================
// ACCEPTANCE CRITERIA 3: Router handles agent registration and deregistration
// ============================================================================

#[tokio::test]
async fn test_agent_registration_deregistration() {
    // Given: a running message router
    let router = MessageRouterImpl::new(RouterConfig::testing()).await.unwrap();
    router.start().await.unwrap();

    let agent = create_test_agent(1);
    let agent_id = agent.id;

    // When: the agent is registered.
    // Note: there is no public registration API yet; registration is
    // currently internal only, which is the design gap this test exposes.

    // Then: the agent should be findable. Until registration is exposed, the
    // only observable step is that router stats can be fetched.
    let _stats = router.get_stats().await.unwrap();

    // When: the agent is deregistered.
    // Note: a public deregistration API is needed for this step as well.

    // Then: the agent should no longer be findable.

    router.shutdown().await.unwrap();
}

// ============================================================================
// ACCEPTANCE CRITERIA 4: Message delivery failures are handled gracefully
// ============================================================================

#[tokio::test]
async fn test_graceful_failure_handling() {
    // Given: Router with non-existent receiver
    let config = RouterConfig::testing();
    let router = MessageRouterImpl::new(config).await.unwrap();
    router.start().await.unwrap();

    // When: We send to non-existent agent
    let message = create_test_message(AgentId::generate(), AgentId::generate());
    // Then: Should handle gracefully (not panic). Both `Ok` and `Err` are
    // acceptable outcomes here, so the result is deliberately not asserted on.
    // Returning at all is the check.
    let _result = router.route_message(message).await;

    // And: Stats should reflect the failed attempt once background processing
    // has had time to run (same expectation as the other stats-based tests).
    tokio::time::sleep(Duration::from_millis(100)).await;
    let stats = router.get_stats().await.unwrap();
    assert!(
        stats.total_messages_processed.into_inner() > 0,
        "Failed deliveries should still be counted as processed"
    );

    router.shutdown().await.unwrap();
}

// ============================================================================
// ACCEPTANCE CRITERIA 5: Router maintains conversation context
// ============================================================================

#[tokio::test]
async fn test_conversation_context_maintenance() {
    // Given: Router with conversation tracking
    let config = RouterConfig::testing();
    let router = Arc::new(MessageRouterImpl::new(config).await.unwrap());
    router.start().await.unwrap();

    let agent1 = create_test_agent(1);
    let agent2 = create_test_agent(2);
    let conversation_id = ConversationId::generate();

    // When: We send multiple messages in a conversation
    for i in 0..3 {
        let mut message = create_test_message(
            if i % 2 == 0 { agent1.id } else { agent2.id },
            if i % 2 == 0 { agent2.id } else { agent1.id },
        );
        message.conversation_id = Some(conversation_id);
        if i > 0 {
            message.in_reply_to = Some(MessageId::generate());
        }

        router.route_message(message).await.ok();
    }

    // Then: Conversation should be tracked
    tokio::time::sleep(Duration::from_millis(100)).await;
    let stats = router.get_stats().await.unwrap();

    // Check that conversation tracking is working
    assert!(
        stats.total_conversations.into_inner() > 0,
        "Should track conversations"
    );

    router.shutdown().await.unwrap();
}

// ============================================================================
// ACCEPTANCE CRITERIA 6: Messages include trace and span IDs for observability
// ============================================================================

#[tokio::test]
async fn test_trace_span_observability() {
    // Given: Router with observability
    let config = RouterConfig::testing();
    let router = MessageRouterImpl::new(config).await.unwrap();
    router.start().await.unwrap();

    // When: We route a message
    let message = create_test_message(AgentId::generate(), AgentId::generate());

    // Then: Tracing should be active (we can't directly test spans without a collector).
    // This test only validates that routing with tracing enabled returns
    // instead of panicking. Either `Ok` or `Err` is acceptable, so the result
    // is intentionally ignored rather than checked with an always-true assert.
    let _result = router.route_message(message).await;

    router.shutdown().await.unwrap();
}

// ============================================================================
// DEFINITION OF DONE: Message routing works for local agents
// ============================================================================

#[tokio::test]
async fn test_local_agent_message_routing_works() {
    // KEY test: a routed message must actually land in the receiver's queue.
    //
    // NOTE(review): there are no assertions yet, so this currently passes
    // whenever setup, routing and shutdown complete without panicking.

    // Given: a running router and two local agents, each with its own mailbox
    let router = Arc::new(MessageRouterImpl::new(RouterConfig::testing()).await.unwrap());
    router.start().await.unwrap();

    let (agent1_tx, mut agent1_rx) = mpsc::channel::<FipaMessage>(10);
    let (agent2_tx, mut agent2_rx) = mpsc::channel::<FipaMessage>(10);

    let sender_agent = create_test_agent(1);
    let receiver_agent = create_test_agent(2);

    // TODO: agents must be registered together with their message queues.
    // This exposes an open design question: how does an agent receive messages?

    // When: the first agent sends a message to the second
    let outgoing = create_test_message(sender_agent.id, receiver_agent.id);
    let expected_delivery = outgoing.clone();
    let routing_result = router.route_message(outgoing).await;

    // Then: the message should appear on the receiver's queue.
    // This is the critical behavior that must eventually be asserted here.

    router.shutdown().await.unwrap();
}

// ============================================================================
// DEFINITION OF DONE: Performance meets 100K messages/second target
// ============================================================================

#[tokio::test]
#[ignore = "Performance test - Run with: cargo test --ignored"]
async fn test_performance_100k_messages_per_second() {
    // Given: Production configuration
    let config = RouterConfig::production();
    let router = Arc::new(MessageRouterImpl::new(config).await.unwrap());
    router.start().await.unwrap();

    // Setup: Create 100 agents
    let mut agents = Vec::new();
    for i in 0..100 {
        agents.push(create_test_agent(i));
    }

    // When: We send 100,000 messages
    let start = std::time::Instant::now();
    let message_count = 100_000;

    let mut handles = Vec::new();
    for i in 0..message_count {
        let router_clone = router.clone();
        let sender = agents[i % 100].id;
        let receiver = agents[(i + 1) % 100].id;

        let handle = tokio::spawn(async move {
            let message = create_test_message(sender, receiver);
            router_clone.route_message(message).await
        });

        handles.push(handle);

        // Batch spawning to avoid overwhelming tokio
        if handles.len() >= 1000 {
            for h in handles.drain(..) {
                h.await.ok();
            }
        }
    }

    // Wait for remaining
    for h in handles {
        h.await.ok();
    }

    let duration = start.elapsed();

    // Then: Should achieve 100K msgs/sec
    // Safe conversion: precision loss is acceptable for performance display
    let msgs_per_sec = (message_count as f64) / duration.as_secs_f64();
    println!("Performance: {msgs_per_sec:.0} messages/second");

    assert!(
        msgs_per_sec >= 100_000.0,
        "Should achieve 100K msgs/sec, got {msgs_per_sec:.0}"
    );

    router.shutdown().await.unwrap();
}

// ============================================================================
// DEFINITION OF DONE: No message loss under normal operation
// ============================================================================

#[tokio::test]
async fn test_no_message_loss_normal_operation() {
    // Given: a running router under normal, sequential load
    let router = Arc::new(MessageRouterImpl::new(RouterConfig::testing()).await.unwrap());
    router.start().await.unwrap();

    // When: 1000 messages are routed one after another. Each ID is recorded
    // for a future exact accounting check; routing errors are ignored here.
    let message_count = 1000;
    let mut sent_ids = HashSet::new();

    for _ in 0..message_count {
        let message = create_test_message(AgentId::generate(), AgentId::generate());
        sent_ids.insert(message.message_id);
        let _ = router.route_message(message).await;
    }

    // Then: after background processing has had time to run, every message
    // should be accounted for (routed or in the dead letter queue).
    tokio::time::sleep(Duration::from_millis(500)).await;
    let stats = router.get_stats().await.unwrap();
    let total_processed = stats.total_messages_processed.into_inner();

    // For now only check that processing happened at all, successful or not.
    assert!(total_processed > 0, "Messages should be processed");

    // Eventually: successful + failed + dead-lettered should equal total sent.

    router.shutdown().await.unwrap();
}

// ============================================================================
// DEFINITION OF DONE: Unit tests cover all routing scenarios
// ============================================================================

#[tokio::test]
async fn test_routing_scenario_high_concurrency() {
    // Given: Router with concurrent message flow
    let config = RouterConfig::testing();
    let router = Arc::new(MessageRouterImpl::new(config).await.unwrap());
    router.start().await.unwrap();

    // When: 100 concurrent senders
    let mut handles = Vec::new();
    for i in 0..100 {
        let router_clone = router.clone();
        let handle = tokio::spawn(async move {
            let message = create_test_message(AgentId::generate(), AgentId::generate());
            router_clone.route_message(message).await
        });
        handles.push(handle);
    }

    // Then: All should complete without deadlock
    let mut success_count = 0;
    for h in handles {
        if h.await.unwrap().is_ok() {
            success_count += 1;
        }
    }

    assert!(success_count > 0, "Some messages should route successfully");

    router.shutdown().await.unwrap();
}

// ============================================================================
// DEFINITION OF DONE: Integration tests verify end-to-end delivery
// ============================================================================

#[tokio::test]
async fn test_end_to_end_delivery_integration() {
    // End-to-end check of the full sender -> router -> receiver flow.
    //
    // NOTE(review): currently only starts and stops the router; the steps
    // below are still to be implemented once real delivery exists.

    // Given: a fully assembled, running router
    let router = Arc::new(MessageRouterImpl::new(RouterConfig::testing()).await.unwrap());
    router.start().await.unwrap();

    // TODO: once delivery is implemented, verify in order that:
    //   1. the router accepts the message,
    //   2. the receiving agent is looked up successfully,
    //   3. the message is queued for delivery,
    //   4. the message reaches the agent,
    //   5. a delivery confirmation comes back,
    //   6. the router stats are updated accordingly.

    router.shutdown().await.unwrap();
}

// ============================================================================
// DEFINITION OF DONE: Metrics track routing performance
// ============================================================================

#[tokio::test]
async fn test_metrics_track_routing_performance() {
    // Given: a running router that collects routing metrics
    let router = Arc::new(MessageRouterImpl::new(RouterConfig::testing()).await.unwrap());
    router.start().await.unwrap();

    // When: ten messages are routed; individual outcomes are ignored
    for _ in 0..10 {
        let _ = router
            .route_message(create_test_message(
                AgentId::generate(),
                AgentId::generate(),
            ))
            .await;
    }

    // Give background processing time to update the counters.
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Then: message count, throughput and latency metrics are populated
    let stats = router.get_stats().await.unwrap();

    assert!(
        stats.total_messages_processed.into_inner() > 0,
        "Should track message count"
    );
    assert!(stats.messages_per_second >= 0.0, "Should track throughput");
    assert!(stats.routing_latency_p50 > 0, "Should track latency");

    router.shutdown().await.unwrap();
}