1use std::collections::HashMap;
7use uuid::Uuid;
8
9#[derive(Debug, Clone)]
11pub enum AgentPayload {
12 TaskRequest {
14 description: String,
15 args: HashMap<String, String>,
16 },
17 TaskResult { success: bool, output: String },
19 FactShare { key: String, value: String },
21 StatusQuery,
23 StatusResponse {
25 agent_name: String,
26 active: bool,
27 pending_tasks: usize,
28 },
29 Shutdown,
31 Progress {
33 task_id: String,
34 percent: f32,
35 message: String,
36 },
37 Error {
39 code: String,
40 message: String,
41 recoverable: bool,
42 },
43 Query {
45 topic: String,
46 context: HashMap<String, String>,
47 },
48 Response { topic: String, answer: String },
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
54pub enum MessagePriority {
55 Low = 0,
56 Normal = 1,
57 High = 2,
58 Critical = 3,
59}
60
61#[derive(Debug, Clone)]
63pub struct AgentEnvelope {
64 pub id: Uuid,
66 pub from: Uuid,
68 pub to: Uuid,
70 pub payload: AgentPayload,
72 pub created_at: chrono::DateTime<chrono::Utc>,
74 pub correlation_id: Option<Uuid>,
76 pub priority: MessagePriority,
78}
79
80impl AgentEnvelope {
81 pub fn new(from: Uuid, to: Uuid, payload: AgentPayload) -> Self {
83 Self {
84 id: Uuid::new_v4(),
85 from,
86 to,
87 payload,
88 created_at: chrono::Utc::now(),
89 correlation_id: None,
90 priority: MessagePriority::Normal,
91 }
92 }
93
94 pub fn with_correlation(mut self, id: Uuid) -> Self {
96 self.correlation_id = Some(id);
97 self
98 }
99
100 pub fn with_priority(mut self, priority: MessagePriority) -> Self {
102 self.priority = priority;
103 self
104 }
105}
106
107pub struct MessageBus {
110 mailboxes: HashMap<Uuid, Vec<AgentEnvelope>>,
111 max_mailbox_size: usize,
112}
113
114impl MessageBus {
115 pub fn new(max_mailbox_size: usize) -> Self {
117 Self {
118 mailboxes: HashMap::new(),
119 max_mailbox_size,
120 }
121 }
122
123 pub fn register(&mut self, agent_id: Uuid) {
125 self.mailboxes.entry(agent_id).or_default();
126 }
127
128 pub fn unregister(&mut self, agent_id: &Uuid) {
130 self.mailboxes.remove(agent_id);
131 }
132
133 pub fn send(&mut self, envelope: AgentEnvelope) -> Result<(), String> {
136 let mailbox = self
137 .mailboxes
138 .get_mut(&envelope.to)
139 .ok_or_else(|| format!("Agent {} not registered", envelope.to))?;
140
141 if mailbox.len() >= self.max_mailbox_size {
142 return Err(format!(
143 "Mailbox for agent {} is full (max {})",
144 envelope.to, self.max_mailbox_size
145 ));
146 }
147
148 let pos = mailbox
151 .iter()
152 .position(|e| e.priority < envelope.priority)
153 .unwrap_or(mailbox.len());
154 mailbox.insert(pos, envelope);
155 Ok(())
156 }
157
158 pub fn receive(&mut self, agent_id: &Uuid) -> Option<AgentEnvelope> {
160 self.mailboxes.get_mut(agent_id).and_then(|mb| {
161 if mb.is_empty() {
162 None
163 } else {
164 Some(mb.remove(0))
165 }
166 })
167 }
168
169 pub fn peek(&self, agent_id: &Uuid) -> Option<&AgentEnvelope> {
171 self.mailboxes.get(agent_id).and_then(|mb| mb.first())
172 }
173
174 pub fn pending_count(&self, agent_id: &Uuid) -> usize {
176 self.mailboxes.get(agent_id).map_or(0, |mb| mb.len())
177 }
178
179 pub fn pending_count_all(&self) -> usize {
181 self.mailboxes.values().map(|mb| mb.len()).sum()
182 }
183
184 pub fn mailbox_count(&self) -> usize {
186 self.mailboxes.len()
187 }
188}
189
190impl Default for MessageBus {
191 fn default() -> Self {
192 Self::new(1000)
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199
200 #[test]
201 fn test_message_bus_register_and_unregister() {
202 let mut bus = MessageBus::new(10);
203 let id = Uuid::new_v4();
204 bus.register(id);
205 assert_eq!(bus.mailbox_count(), 1);
206
207 bus.unregister(&id);
208 assert_eq!(bus.mailbox_count(), 0);
209 }
210
211 #[test]
212 fn test_message_bus_send_and_receive() {
213 let mut bus = MessageBus::new(10);
214 let sender = Uuid::new_v4();
215 let receiver = Uuid::new_v4();
216 bus.register(sender);
217 bus.register(receiver);
218
219 let envelope = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
220 bus.send(envelope).unwrap();
221
222 assert_eq!(bus.pending_count(&receiver), 1);
223
224 let msg = bus.receive(&receiver).unwrap();
225 assert_eq!(msg.from, sender);
226 assert!(matches!(msg.payload, AgentPayload::StatusQuery));
227 assert_eq!(bus.pending_count(&receiver), 0);
228 }
229
230 #[test]
231 fn test_message_bus_send_to_unregistered() {
232 let mut bus = MessageBus::new(10);
233 let sender = Uuid::new_v4();
234 let ghost = Uuid::new_v4();
235 bus.register(sender);
236
237 let envelope = AgentEnvelope::new(sender, ghost, AgentPayload::Shutdown);
238 let result = bus.send(envelope);
239 assert!(result.is_err());
240 assert!(result.unwrap_err().contains("not registered"));
241 }
242
243 #[test]
244 fn test_message_bus_mailbox_full() {
245 let mut bus = MessageBus::new(2);
246 let sender = Uuid::new_v4();
247 let receiver = Uuid::new_v4();
248 bus.register(sender);
249 bus.register(receiver);
250
251 let e1 = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
252 let e2 = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
253 let e3 = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
254
255 bus.send(e1).unwrap();
256 bus.send(e2).unwrap();
257 let result = bus.send(e3);
258 assert!(result.is_err());
259 assert!(result.unwrap_err().contains("full"));
260 }
261
262 #[test]
263 fn test_message_bus_pending_count_all() {
264 let mut bus = MessageBus::new(10);
265 let a = Uuid::new_v4();
266 let b = Uuid::new_v4();
267 bus.register(a);
268 bus.register(b);
269
270 bus.send(AgentEnvelope::new(a, b, AgentPayload::StatusQuery))
271 .unwrap();
272 bus.send(AgentEnvelope::new(b, a, AgentPayload::Shutdown))
273 .unwrap();
274
275 assert_eq!(bus.pending_count_all(), 2);
276 }
277
278 #[test]
279 fn test_message_bus_peek() {
280 let mut bus = MessageBus::new(10);
281 let a = Uuid::new_v4();
282 let b = Uuid::new_v4();
283 bus.register(a);
284 bus.register(b);
285
286 bus.send(AgentEnvelope::new(a, b, AgentPayload::StatusQuery))
287 .unwrap();
288
289 let peeked = bus.peek(&b);
291 assert!(peeked.is_some());
292 assert_eq!(bus.pending_count(&b), 1);
293
294 let received = bus.receive(&b);
296 assert!(received.is_some());
297 assert_eq!(bus.pending_count(&b), 0);
298 }
299
300 #[test]
301 fn test_envelope_creation() {
302 let from = Uuid::new_v4();
303 let to = Uuid::new_v4();
304 let envelope = AgentEnvelope::new(
305 from,
306 to,
307 AgentPayload::FactShare {
308 key: "test-key".into(),
309 value: "test-value".into(),
310 },
311 );
312 assert_eq!(envelope.from, from);
313 assert_eq!(envelope.to, to);
314 if let AgentPayload::FactShare { key, value } = &envelope.payload {
315 assert_eq!(key, "test-key");
316 assert_eq!(value, "test-value");
317 } else {
318 panic!("Expected FactShare payload");
319 }
320 }
321
322 #[test]
325 fn test_payload_progress() {
326 let payload = AgentPayload::Progress {
327 task_id: "task-1".into(),
328 percent: 0.75,
329 message: "Almost done".into(),
330 };
331 if let AgentPayload::Progress {
332 task_id,
333 percent,
334 message,
335 } = &payload
336 {
337 assert_eq!(task_id, "task-1");
338 assert!((percent - 0.75).abs() < f32::EPSILON);
339 assert_eq!(message, "Almost done");
340 } else {
341 panic!("Expected Progress");
342 }
343 }
344
345 #[test]
346 fn test_payload_error_recoverable() {
347 let payload = AgentPayload::Error {
348 code: "E001".into(),
349 message: "Something went wrong".into(),
350 recoverable: true,
351 };
352 if let AgentPayload::Error {
353 code, recoverable, ..
354 } = &payload
355 {
356 assert_eq!(code, "E001");
357 assert!(recoverable);
358 } else {
359 panic!("Expected Error");
360 }
361 }
362
363 #[test]
364 fn test_payload_query_response() {
365 let query = AgentPayload::Query {
366 topic: "weather".into(),
367 context: HashMap::from([("city".into(), "SF".into())]),
368 };
369 let response = AgentPayload::Response {
370 topic: "weather".into(),
371 answer: "Sunny".into(),
372 };
373 if let AgentPayload::Query { topic, context } = &query {
374 assert_eq!(topic, "weather");
375 assert_eq!(context.get("city").unwrap(), "SF");
376 } else {
377 panic!("Expected Query");
378 }
379 if let AgentPayload::Response { topic, answer } = &response {
380 assert_eq!(topic, "weather");
381 assert_eq!(answer, "Sunny");
382 } else {
383 panic!("Expected Response");
384 }
385 }
386
387 #[test]
388 fn test_envelope_correlation_id() {
389 let from = Uuid::new_v4();
390 let to = Uuid::new_v4();
391 let corr = Uuid::new_v4();
392 let envelope =
393 AgentEnvelope::new(from, to, AgentPayload::StatusQuery).with_correlation(corr);
394 assert_eq!(envelope.correlation_id, Some(corr));
395 }
396
397 #[test]
398 fn test_envelope_priority_default_normal() {
399 let from = Uuid::new_v4();
400 let to = Uuid::new_v4();
401 let envelope = AgentEnvelope::new(from, to, AgentPayload::StatusQuery);
402 assert_eq!(envelope.priority, MessagePriority::Normal);
403 }
404
405 #[test]
406 fn test_envelope_with_priority_critical() {
407 let from = Uuid::new_v4();
408 let to = Uuid::new_v4();
409 let envelope = AgentEnvelope::new(from, to, AgentPayload::Shutdown)
410 .with_priority(MessagePriority::Critical);
411 assert_eq!(envelope.priority, MessagePriority::Critical);
412 }
413
414 #[test]
415 fn test_envelope_builder_chain() {
416 let from = Uuid::new_v4();
417 let to = Uuid::new_v4();
418 let corr = Uuid::new_v4();
419 let envelope = AgentEnvelope::new(from, to, AgentPayload::StatusQuery)
420 .with_correlation(corr)
421 .with_priority(MessagePriority::High);
422 assert_eq!(envelope.correlation_id, Some(corr));
423 assert_eq!(envelope.priority, MessagePriority::High);
424 }
425
426 #[test]
429 fn test_priority_queue_critical_first() {
430 let mut bus = MessageBus::new(10);
431 let a = Uuid::new_v4();
432 let b = Uuid::new_v4();
433 bus.register(b);
434
435 bus.send(
437 AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
438 .with_priority(MessagePriority::Normal),
439 )
440 .unwrap();
441 bus.send(
442 AgentEnvelope::new(a, b, AgentPayload::Shutdown)
443 .with_priority(MessagePriority::Critical),
444 )
445 .unwrap();
446
447 let first = bus.receive(&b).unwrap();
449 assert_eq!(first.priority, MessagePriority::Critical);
450 let second = bus.receive(&b).unwrap();
451 assert_eq!(second.priority, MessagePriority::Normal);
452 }
453
454 #[test]
455 fn test_priority_queue_fifo_same_priority() {
456 let mut bus = MessageBus::new(10);
457 let a = Uuid::new_v4();
458 let b = Uuid::new_v4();
459 bus.register(b);
460
461 let e1 = AgentEnvelope::new(
462 a,
463 b,
464 AgentPayload::FactShare {
465 key: "first".into(),
466 value: "1".into(),
467 },
468 );
469 let e2 = AgentEnvelope::new(
470 a,
471 b,
472 AgentPayload::FactShare {
473 key: "second".into(),
474 value: "2".into(),
475 },
476 );
477 let id1 = e1.id;
478 let id2 = e2.id;
479
480 bus.send(e1).unwrap();
481 bus.send(e2).unwrap();
482
483 let first = bus.receive(&b).unwrap();
485 assert_eq!(first.id, id1);
486 let second = bus.receive(&b).unwrap();
487 assert_eq!(second.id, id2);
488 }
489
490 #[test]
491 fn test_priority_queue_mixed_priorities() {
492 let mut bus = MessageBus::new(10);
493 let a = Uuid::new_v4();
494 let b = Uuid::new_v4();
495 bus.register(b);
496
497 bus.send(
498 AgentEnvelope::new(a, b, AgentPayload::StatusQuery).with_priority(MessagePriority::Low),
499 )
500 .unwrap();
501 bus.send(
502 AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
503 .with_priority(MessagePriority::High),
504 )
505 .unwrap();
506 bus.send(
507 AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
508 .with_priority(MessagePriority::Normal),
509 )
510 .unwrap();
511 bus.send(
512 AgentEnvelope::new(a, b, AgentPayload::Shutdown)
513 .with_priority(MessagePriority::Critical),
514 )
515 .unwrap();
516
517 assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::Critical);
518 assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::High);
519 assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::Normal);
520 assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::Low);
521 }
522
523 #[test]
524 fn test_priority_queue_empty() {
525 let mut bus = MessageBus::new(10);
526 let a = Uuid::new_v4();
527 bus.register(a);
528 assert!(bus.receive(&a).is_none());
529 assert!(bus.peek(&a).is_none());
530 }
531
532 #[test]
533 fn test_message_bus_priority_ordering() {
534 let mut bus = MessageBus::new(10);
535 let a = Uuid::new_v4();
536 let b = Uuid::new_v4();
537 bus.register(b);
538
539 bus.send(
541 AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
542 .with_priority(MessagePriority::Normal),
543 )
544 .unwrap();
545 bus.send(
546 AgentEnvelope::new(a, b, AgentPayload::Shutdown)
547 .with_priority(MessagePriority::Critical),
548 )
549 .unwrap();
550
551 let peeked = bus.peek(&b).unwrap();
553 assert_eq!(peeked.priority, MessagePriority::Critical);
554 }
555}