Skip to main content

rustant_core/multi/
messaging.rs

1//! Inter-agent messaging — message bus and envelope types.
2//!
3//! Provides an in-process message bus for agents to communicate asynchronously.
4//! Each agent has a mailbox (bounded queue) to prevent memory exhaustion.
5
6use std::collections::HashMap;
7use uuid::Uuid;
8
9/// Payload types for inter-agent communication.
10#[derive(Debug, Clone)]
11pub enum AgentPayload {
12    /// Request to execute a task.
13    TaskRequest {
14        description: String,
15        args: HashMap<String, String>,
16    },
17    /// Result of a completed task.
18    TaskResult { success: bool, output: String },
19    /// Share a fact with another agent.
20    FactShare { key: String, value: String },
21    /// Query another agent's status.
22    StatusQuery,
23    /// Response to a status query.
24    StatusResponse {
25        agent_name: String,
26        active: bool,
27        pending_tasks: usize,
28    },
29    /// Request an agent to shut down.
30    Shutdown,
31    /// Progress update on a task.
32    Progress {
33        task_id: String,
34        percent: f32,
35        message: String,
36    },
37    /// Error report.
38    Error {
39        code: String,
40        message: String,
41        recoverable: bool,
42    },
43    /// Query another agent about a topic.
44    Query {
45        topic: String,
46        context: HashMap<String, String>,
47    },
48    /// Response to a query.
49    Response { topic: String, answer: String },
50}
51
52/// Priority levels for inter-agent messages.
53#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
54pub enum MessagePriority {
55    Low = 0,
56    Normal = 1,
57    High = 2,
58    Critical = 3,
59}
60
61/// An envelope wrapping a payload with routing information.
62#[derive(Debug, Clone)]
63pub struct AgentEnvelope {
64    /// Unique message ID.
65    pub id: Uuid,
66    /// Sender agent ID.
67    pub from: Uuid,
68    /// Recipient agent ID.
69    pub to: Uuid,
70    /// The payload.
71    pub payload: AgentPayload,
72    /// Timestamp of creation.
73    pub created_at: chrono::DateTime<chrono::Utc>,
74    /// Optional correlation ID for request/response pairing.
75    pub correlation_id: Option<Uuid>,
76    /// Message priority.
77    pub priority: MessagePriority,
78}
79
80impl AgentEnvelope {
81    /// Create a new envelope with default priority (Normal) and no correlation ID.
82    pub fn new(from: Uuid, to: Uuid, payload: AgentPayload) -> Self {
83        Self {
84            id: Uuid::new_v4(),
85            from,
86            to,
87            payload,
88            created_at: chrono::Utc::now(),
89            correlation_id: None,
90            priority: MessagePriority::Normal,
91        }
92    }
93
94    /// Set a correlation ID for request/response pairing.
95    pub fn with_correlation(mut self, id: Uuid) -> Self {
96        self.correlation_id = Some(id);
97        self
98    }
99
100    /// Set the message priority.
101    pub fn with_priority(mut self, priority: MessagePriority) -> Self {
102        self.priority = priority;
103        self
104    }
105}
106
107/// In-process message bus for inter-agent communication.
108/// Messages are stored in priority order (highest priority first, FIFO within same priority).
109pub struct MessageBus {
110    mailboxes: HashMap<Uuid, Vec<AgentEnvelope>>,
111    max_mailbox_size: usize,
112}
113
114impl MessageBus {
115    /// Create a new message bus with a maximum mailbox size per agent.
116    pub fn new(max_mailbox_size: usize) -> Self {
117        Self {
118            mailboxes: HashMap::new(),
119            max_mailbox_size,
120        }
121    }
122
123    /// Register a mailbox for an agent.
124    pub fn register(&mut self, agent_id: Uuid) {
125        self.mailboxes.entry(agent_id).or_default();
126    }
127
128    /// Remove a mailbox for an agent, discarding pending messages.
129    pub fn unregister(&mut self, agent_id: &Uuid) {
130        self.mailboxes.remove(agent_id);
131    }
132
133    /// Send a message to an agent's mailbox. Returns Err if the mailbox is full
134    /// or the recipient is not registered. Messages are inserted in priority order.
135    pub fn send(&mut self, envelope: AgentEnvelope) -> Result<(), String> {
136        let mailbox = self
137            .mailboxes
138            .get_mut(&envelope.to)
139            .ok_or_else(|| format!("Agent {} not registered", envelope.to))?;
140
141        if mailbox.len() >= self.max_mailbox_size {
142            return Err(format!(
143                "Mailbox for agent {} is full (max {})",
144                envelope.to, self.max_mailbox_size
145            ));
146        }
147
148        // Insert in sorted position: higher priority first, FIFO within same priority.
149        // We find the first position where the existing message has lower priority.
150        let pos = mailbox
151            .iter()
152            .position(|e| e.priority < envelope.priority)
153            .unwrap_or(mailbox.len());
154        mailbox.insert(pos, envelope);
155        Ok(())
156    }
157
158    /// Receive the highest-priority message from an agent's mailbox.
159    pub fn receive(&mut self, agent_id: &Uuid) -> Option<AgentEnvelope> {
160        self.mailboxes.get_mut(agent_id).and_then(|mb| {
161            if mb.is_empty() {
162                None
163            } else {
164                Some(mb.remove(0))
165            }
166        })
167    }
168
169    /// Peek at the highest-priority message without removing it.
170    pub fn peek(&self, agent_id: &Uuid) -> Option<&AgentEnvelope> {
171        self.mailboxes.get(agent_id).and_then(|mb| mb.first())
172    }
173
174    /// Number of pending messages for a specific agent.
175    pub fn pending_count(&self, agent_id: &Uuid) -> usize {
176        self.mailboxes.get(agent_id).map_or(0, |mb| mb.len())
177    }
178
179    /// Total pending messages across all mailboxes.
180    pub fn pending_count_all(&self) -> usize {
181        self.mailboxes.values().map(|mb| mb.len()).sum()
182    }
183
184    /// Number of registered mailboxes.
185    pub fn mailbox_count(&self) -> usize {
186        self.mailboxes.len()
187    }
188}
189
190impl Default for MessageBus {
191    fn default() -> Self {
192        Self::new(1000)
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199
200    #[test]
201    fn test_message_bus_register_and_unregister() {
202        let mut bus = MessageBus::new(10);
203        let id = Uuid::new_v4();
204        bus.register(id);
205        assert_eq!(bus.mailbox_count(), 1);
206
207        bus.unregister(&id);
208        assert_eq!(bus.mailbox_count(), 0);
209    }
210
211    #[test]
212    fn test_message_bus_send_and_receive() {
213        let mut bus = MessageBus::new(10);
214        let sender = Uuid::new_v4();
215        let receiver = Uuid::new_v4();
216        bus.register(sender);
217        bus.register(receiver);
218
219        let envelope = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
220        bus.send(envelope).unwrap();
221
222        assert_eq!(bus.pending_count(&receiver), 1);
223
224        let msg = bus.receive(&receiver).unwrap();
225        assert_eq!(msg.from, sender);
226        assert!(matches!(msg.payload, AgentPayload::StatusQuery));
227        assert_eq!(bus.pending_count(&receiver), 0);
228    }
229
230    #[test]
231    fn test_message_bus_send_to_unregistered() {
232        let mut bus = MessageBus::new(10);
233        let sender = Uuid::new_v4();
234        let ghost = Uuid::new_v4();
235        bus.register(sender);
236
237        let envelope = AgentEnvelope::new(sender, ghost, AgentPayload::Shutdown);
238        let result = bus.send(envelope);
239        assert!(result.is_err());
240        assert!(result.unwrap_err().contains("not registered"));
241    }
242
243    #[test]
244    fn test_message_bus_mailbox_full() {
245        let mut bus = MessageBus::new(2);
246        let sender = Uuid::new_v4();
247        let receiver = Uuid::new_v4();
248        bus.register(sender);
249        bus.register(receiver);
250
251        let e1 = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
252        let e2 = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
253        let e3 = AgentEnvelope::new(sender, receiver, AgentPayload::StatusQuery);
254
255        bus.send(e1).unwrap();
256        bus.send(e2).unwrap();
257        let result = bus.send(e3);
258        assert!(result.is_err());
259        assert!(result.unwrap_err().contains("full"));
260    }
261
262    #[test]
263    fn test_message_bus_pending_count_all() {
264        let mut bus = MessageBus::new(10);
265        let a = Uuid::new_v4();
266        let b = Uuid::new_v4();
267        bus.register(a);
268        bus.register(b);
269
270        bus.send(AgentEnvelope::new(a, b, AgentPayload::StatusQuery))
271            .unwrap();
272        bus.send(AgentEnvelope::new(b, a, AgentPayload::Shutdown))
273            .unwrap();
274
275        assert_eq!(bus.pending_count_all(), 2);
276    }
277
278    #[test]
279    fn test_message_bus_peek() {
280        let mut bus = MessageBus::new(10);
281        let a = Uuid::new_v4();
282        let b = Uuid::new_v4();
283        bus.register(a);
284        bus.register(b);
285
286        bus.send(AgentEnvelope::new(a, b, AgentPayload::StatusQuery))
287            .unwrap();
288
289        // Peek should not remove
290        let peeked = bus.peek(&b);
291        assert!(peeked.is_some());
292        assert_eq!(bus.pending_count(&b), 1);
293
294        // Receive should remove
295        let received = bus.receive(&b);
296        assert!(received.is_some());
297        assert_eq!(bus.pending_count(&b), 0);
298    }
299
300    #[test]
301    fn test_envelope_creation() {
302        let from = Uuid::new_v4();
303        let to = Uuid::new_v4();
304        let envelope = AgentEnvelope::new(
305            from,
306            to,
307            AgentPayload::FactShare {
308                key: "test-key".into(),
309                value: "test-value".into(),
310            },
311        );
312        assert_eq!(envelope.from, from);
313        assert_eq!(envelope.to, to);
314        if let AgentPayload::FactShare { key, value } = &envelope.payload {
315            assert_eq!(key, "test-key");
316            assert_eq!(value, "test-value");
317        } else {
318            panic!("Expected FactShare payload");
319        }
320    }
321
322    // --- Sprint 4B: New payload variants + MessagePriority ---
323
324    #[test]
325    fn test_payload_progress() {
326        let payload = AgentPayload::Progress {
327            task_id: "task-1".into(),
328            percent: 0.75,
329            message: "Almost done".into(),
330        };
331        if let AgentPayload::Progress {
332            task_id,
333            percent,
334            message,
335        } = &payload
336        {
337            assert_eq!(task_id, "task-1");
338            assert!((percent - 0.75).abs() < f32::EPSILON);
339            assert_eq!(message, "Almost done");
340        } else {
341            panic!("Expected Progress");
342        }
343    }
344
345    #[test]
346    fn test_payload_error_recoverable() {
347        let payload = AgentPayload::Error {
348            code: "E001".into(),
349            message: "Something went wrong".into(),
350            recoverable: true,
351        };
352        if let AgentPayload::Error {
353            code, recoverable, ..
354        } = &payload
355        {
356            assert_eq!(code, "E001");
357            assert!(recoverable);
358        } else {
359            panic!("Expected Error");
360        }
361    }
362
363    #[test]
364    fn test_payload_query_response() {
365        let query = AgentPayload::Query {
366            topic: "weather".into(),
367            context: HashMap::from([("city".into(), "SF".into())]),
368        };
369        let response = AgentPayload::Response {
370            topic: "weather".into(),
371            answer: "Sunny".into(),
372        };
373        if let AgentPayload::Query { topic, context } = &query {
374            assert_eq!(topic, "weather");
375            assert_eq!(context.get("city").unwrap(), "SF");
376        } else {
377            panic!("Expected Query");
378        }
379        if let AgentPayload::Response { topic, answer } = &response {
380            assert_eq!(topic, "weather");
381            assert_eq!(answer, "Sunny");
382        } else {
383            panic!("Expected Response");
384        }
385    }
386
387    #[test]
388    fn test_envelope_correlation_id() {
389        let from = Uuid::new_v4();
390        let to = Uuid::new_v4();
391        let corr = Uuid::new_v4();
392        let envelope =
393            AgentEnvelope::new(from, to, AgentPayload::StatusQuery).with_correlation(corr);
394        assert_eq!(envelope.correlation_id, Some(corr));
395    }
396
397    #[test]
398    fn test_envelope_priority_default_normal() {
399        let from = Uuid::new_v4();
400        let to = Uuid::new_v4();
401        let envelope = AgentEnvelope::new(from, to, AgentPayload::StatusQuery);
402        assert_eq!(envelope.priority, MessagePriority::Normal);
403    }
404
405    #[test]
406    fn test_envelope_with_priority_critical() {
407        let from = Uuid::new_v4();
408        let to = Uuid::new_v4();
409        let envelope = AgentEnvelope::new(from, to, AgentPayload::Shutdown)
410            .with_priority(MessagePriority::Critical);
411        assert_eq!(envelope.priority, MessagePriority::Critical);
412    }
413
414    #[test]
415    fn test_envelope_builder_chain() {
416        let from = Uuid::new_v4();
417        let to = Uuid::new_v4();
418        let corr = Uuid::new_v4();
419        let envelope = AgentEnvelope::new(from, to, AgentPayload::StatusQuery)
420            .with_correlation(corr)
421            .with_priority(MessagePriority::High);
422        assert_eq!(envelope.correlation_id, Some(corr));
423        assert_eq!(envelope.priority, MessagePriority::High);
424    }
425
426    // --- Sprint 4C: Priority queue tests ---
427
428    #[test]
429    fn test_priority_queue_critical_first() {
430        let mut bus = MessageBus::new(10);
431        let a = Uuid::new_v4();
432        let b = Uuid::new_v4();
433        bus.register(b);
434
435        // Send Normal first, then Critical
436        bus.send(
437            AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
438                .with_priority(MessagePriority::Normal),
439        )
440        .unwrap();
441        bus.send(
442            AgentEnvelope::new(a, b, AgentPayload::Shutdown)
443                .with_priority(MessagePriority::Critical),
444        )
445        .unwrap();
446
447        // Critical should come out first
448        let first = bus.receive(&b).unwrap();
449        assert_eq!(first.priority, MessagePriority::Critical);
450        let second = bus.receive(&b).unwrap();
451        assert_eq!(second.priority, MessagePriority::Normal);
452    }
453
454    #[test]
455    fn test_priority_queue_fifo_same_priority() {
456        let mut bus = MessageBus::new(10);
457        let a = Uuid::new_v4();
458        let b = Uuid::new_v4();
459        bus.register(b);
460
461        let e1 = AgentEnvelope::new(
462            a,
463            b,
464            AgentPayload::FactShare {
465                key: "first".into(),
466                value: "1".into(),
467            },
468        );
469        let e2 = AgentEnvelope::new(
470            a,
471            b,
472            AgentPayload::FactShare {
473                key: "second".into(),
474                value: "2".into(),
475            },
476        );
477        let id1 = e1.id;
478        let id2 = e2.id;
479
480        bus.send(e1).unwrap();
481        bus.send(e2).unwrap();
482
483        // Both Normal priority — should come out in FIFO order
484        let first = bus.receive(&b).unwrap();
485        assert_eq!(first.id, id1);
486        let second = bus.receive(&b).unwrap();
487        assert_eq!(second.id, id2);
488    }
489
490    #[test]
491    fn test_priority_queue_mixed_priorities() {
492        let mut bus = MessageBus::new(10);
493        let a = Uuid::new_v4();
494        let b = Uuid::new_v4();
495        bus.register(b);
496
497        bus.send(
498            AgentEnvelope::new(a, b, AgentPayload::StatusQuery).with_priority(MessagePriority::Low),
499        )
500        .unwrap();
501        bus.send(
502            AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
503                .with_priority(MessagePriority::High),
504        )
505        .unwrap();
506        bus.send(
507            AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
508                .with_priority(MessagePriority::Normal),
509        )
510        .unwrap();
511        bus.send(
512            AgentEnvelope::new(a, b, AgentPayload::Shutdown)
513                .with_priority(MessagePriority::Critical),
514        )
515        .unwrap();
516
517        assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::Critical);
518        assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::High);
519        assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::Normal);
520        assert_eq!(bus.receive(&b).unwrap().priority, MessagePriority::Low);
521    }
522
523    #[test]
524    fn test_priority_queue_empty() {
525        let mut bus = MessageBus::new(10);
526        let a = Uuid::new_v4();
527        bus.register(a);
528        assert!(bus.receive(&a).is_none());
529        assert!(bus.peek(&a).is_none());
530    }
531
532    #[test]
533    fn test_message_bus_priority_ordering() {
534        let mut bus = MessageBus::new(10);
535        let a = Uuid::new_v4();
536        let b = Uuid::new_v4();
537        bus.register(b);
538
539        // Send Normal, then Critical
540        bus.send(
541            AgentEnvelope::new(a, b, AgentPayload::StatusQuery)
542                .with_priority(MessagePriority::Normal),
543        )
544        .unwrap();
545        bus.send(
546            AgentEnvelope::new(a, b, AgentPayload::Shutdown)
547                .with_priority(MessagePriority::Critical),
548        )
549        .unwrap();
550
551        // Peek should show Critical
552        let peeked = bus.peek(&b).unwrap();
553        assert_eq!(peeked.priority, MessagePriority::Critical);
554    }
555}