Skip to main content

claude_pool/
messaging.rs

1//! Inter-slot messaging for pool communication.
2//!
3//! Provides a message bus for slots to send and receive messages to each other.
4
5use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use serde::{Deserialize, Serialize};
8
9use crate::types::SlotId;
10
11/// A message sent from one slot to another.
12#[derive(Serialize, Deserialize, Clone, Debug)]
13pub struct Message {
14    /// Unique message ID.
15    pub id: String,
16    /// Sender slot ID.
17    pub from: SlotId,
18    /// Recipient slot ID.
19    pub to: SlotId,
20    /// Message content.
21    pub content: String,
22    /// Timestamp when message was created.
23    pub timestamp: DateTime<Utc>,
24}
25
26/// Message bus for inter-slot communication.
27///
28/// Implements a simple inbox-based messaging system where each slot
29/// has a queue of messages addressed to it.
30pub struct MessageBus {
31    /// Inboxes keyed by slot ID, containing vectors of messages.
32    inboxes: DashMap<String, Vec<Message>>,
33}
34
35impl Default for MessageBus {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl MessageBus {
42    /// Create a new message bus.
43    pub fn new() -> Self {
44        Self {
45            inboxes: DashMap::new(),
46        }
47    }
48
49    /// Send a message from one slot to another, returning the message ID.
50    pub fn send(&self, from: SlotId, to: SlotId, content: String) -> String {
51        let message_id = generate_message_id();
52        let to_key = to.0.clone();
53        let message = Message {
54            id: message_id.clone(),
55            from,
56            to,
57            content,
58            timestamp: Utc::now(),
59        };
60
61        self.inboxes.entry(to_key).or_default().push(message);
62
63        message_id
64    }
65
66    /// Read and drain all messages for a slot, returning them in order.
67    pub fn read(&self, slot_id: &SlotId) -> Vec<Message> {
68        self.inboxes
69            .remove(&slot_id.0)
70            .map(|(_, messages)| messages)
71            .unwrap_or_default()
72    }
73
74    /// Peek at all messages for a slot without removing them.
75    pub fn peek(&self, slot_id: &SlotId) -> Vec<Message> {
76        self.inboxes
77            .get(&slot_id.0)
78            .map(|entry| entry.clone())
79            .unwrap_or_default()
80    }
81
82    /// Broadcast a message from one slot to all other slots.
83    ///
84    /// Sends a copy of the message to every slot in `recipients` except the sender.
85    /// Returns the list of message IDs created.
86    pub fn broadcast(&self, from: SlotId, recipients: &[SlotId], content: String) -> Vec<String> {
87        let mut ids = Vec::new();
88        for to in recipients {
89            if *to == from {
90                continue;
91            }
92            let id = self.send(from.clone(), to.clone(), content.clone());
93            ids.push(id);
94        }
95        ids
96    }
97
98    /// Get the count of messages in a slot's inbox.
99    pub fn count(&self, slot_id: &SlotId) -> usize {
100        self.inboxes
101            .get(&slot_id.0)
102            .map(|entry| entry.len())
103            .unwrap_or(0)
104    }
105}
106
107fn generate_message_id() -> String {
108    use std::time::{SystemTime, UNIX_EPOCH};
109    let nanos = SystemTime::now()
110        .duration_since(UNIX_EPOCH)
111        .unwrap_or_default()
112        .as_nanos();
113    format!("msg-{nanos:x}")
114}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119
120    #[test]
121    fn test_send_and_read() {
122        let bus = MessageBus::new();
123        let slot1 = SlotId("slot-1".to_string());
124        let slot2 = SlotId("slot-2".to_string());
125
126        let msg_id = bus.send(slot1.clone(), slot2.clone(), "hello".to_string());
127        assert!(!msg_id.is_empty());
128
129        let messages = bus.read(&slot2);
130        assert_eq!(messages.len(), 1);
131        assert_eq!(messages[0].id, msg_id);
132        assert_eq!(messages[0].content, "hello");
133        assert_eq!(messages[0].from, slot1);
134        assert_eq!(messages[0].to, slot2);
135
136        // Verify inbox is empty after read
137        let messages_again = bus.read(&slot2);
138        assert!(messages_again.is_empty());
139    }
140
141    #[test]
142    fn test_send_and_peek() {
143        let bus = MessageBus::new();
144        let slot1 = SlotId("slot-1".to_string());
145        let slot2 = SlotId("slot-2".to_string());
146
147        bus.send(slot1.clone(), slot2.clone(), "hello".to_string());
148
149        let messages = bus.peek(&slot2);
150        assert_eq!(messages.len(), 1);
151
152        // Verify inbox still has message after peek
153        let messages_again = bus.peek(&slot2);
154        assert_eq!(messages_again.len(), 1);
155    }
156
157    #[test]
158    fn test_multiple_senders() {
159        let bus = MessageBus::new();
160        let slot1 = SlotId("slot-1".to_string());
161        let slot2 = SlotId("slot-2".to_string());
162        let slot3 = SlotId("slot-3".to_string());
163
164        bus.send(slot1.clone(), slot3.clone(), "from slot1".to_string());
165        bus.send(slot2.clone(), slot3.clone(), "from slot2".to_string());
166
167        let messages = bus.read(&slot3);
168        assert_eq!(messages.len(), 2);
169        assert_eq!(messages[0].from, slot1);
170        assert_eq!(messages[1].from, slot2);
171    }
172
173    #[test]
174    fn test_empty_inbox() {
175        let bus = MessageBus::new();
176        let slot1 = SlotId("slot-1".to_string());
177
178        let messages = bus.read(&slot1);
179        assert!(messages.is_empty());
180
181        let messages = bus.peek(&slot1);
182        assert!(messages.is_empty());
183    }
184
185    #[test]
186    fn test_broadcast() {
187        let bus = MessageBus::new();
188        let slot0 = SlotId("slot-0".to_string());
189        let slot1 = SlotId("slot-1".to_string());
190        let slot2 = SlotId("slot-2".to_string());
191        let slot3 = SlotId("slot-3".to_string());
192
193        let recipients = vec![slot0.clone(), slot1.clone(), slot2.clone(), slot3.clone()];
194        let ids = bus.broadcast(slot0.clone(), &recipients, "hello all".to_string());
195
196        // Should send to 3 recipients (not self)
197        assert_eq!(ids.len(), 3);
198
199        // Each recipient should have exactly one message
200        assert_eq!(bus.count(&slot1), 1);
201        assert_eq!(bus.count(&slot2), 1);
202        assert_eq!(bus.count(&slot3), 1);
203        assert_eq!(bus.count(&slot0), 0); // sender excluded
204
205        let msg = bus.read(&slot1);
206        assert_eq!(msg[0].content, "hello all");
207        assert_eq!(msg[0].from, slot0);
208    }
209
210    #[test]
211    fn test_count() {
212        let bus = MessageBus::new();
213        let slot1 = SlotId("slot-1".to_string());
214        let slot2 = SlotId("slot-2".to_string());
215
216        assert_eq!(bus.count(&slot1), 0);
217
218        bus.send(slot1.clone(), slot2.clone(), "msg1".to_string());
219        assert_eq!(bus.count(&slot2), 1);
220
221        bus.send(slot1.clone(), slot2.clone(), "msg2".to_string());
222        assert_eq!(bus.count(&slot2), 2);
223
224        bus.read(&slot2);
225        assert_eq!(bus.count(&slot2), 0);
226    }
227}