1use chrono::{DateTime, Utc};
6use dashmap::DashMap;
7use serde::{Deserialize, Serialize};
8
9use crate::types::SlotId;
10
11#[derive(Serialize, Deserialize, Clone, Debug)]
13pub struct Message {
14 pub id: String,
16 pub from: SlotId,
18 pub to: SlotId,
20 pub content: String,
22 pub timestamp: DateTime<Utc>,
24}
25
26pub struct MessageBus {
31 inboxes: DashMap<String, Vec<Message>>,
33}
34
35impl Default for MessageBus {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl MessageBus {
42 pub fn new() -> Self {
44 Self {
45 inboxes: DashMap::new(),
46 }
47 }
48
49 pub fn send(&self, from: SlotId, to: SlotId, content: String) -> String {
51 let message_id = generate_message_id();
52 let to_key = to.0.clone();
53 let message = Message {
54 id: message_id.clone(),
55 from,
56 to,
57 content,
58 timestamp: Utc::now(),
59 };
60
61 self.inboxes.entry(to_key).or_default().push(message);
62
63 message_id
64 }
65
66 pub fn read(&self, slot_id: &SlotId) -> Vec<Message> {
68 self.inboxes
69 .remove(&slot_id.0)
70 .map(|(_, messages)| messages)
71 .unwrap_or_default()
72 }
73
74 pub fn peek(&self, slot_id: &SlotId) -> Vec<Message> {
76 self.inboxes
77 .get(&slot_id.0)
78 .map(|entry| entry.clone())
79 .unwrap_or_default()
80 }
81
82 pub fn broadcast(&self, from: SlotId, recipients: &[SlotId], content: String) -> Vec<String> {
87 let mut ids = Vec::new();
88 for to in recipients {
89 if *to == from {
90 continue;
91 }
92 let id = self.send(from.clone(), to.clone(), content.clone());
93 ids.push(id);
94 }
95 ids
96 }
97
98 pub fn count(&self, slot_id: &SlotId) -> usize {
100 self.inboxes
101 .get(&slot_id.0)
102 .map(|entry| entry.len())
103 .unwrap_or(0)
104 }
105}
106
107fn generate_message_id() -> String {
108 use std::time::{SystemTime, UNIX_EPOCH};
109 let nanos = SystemTime::now()
110 .duration_since(UNIX_EPOCH)
111 .unwrap_or_default()
112 .as_nanos();
113 format!("msg-{nanos:x}")
114}
115
116#[cfg(test)]
117mod tests {
118 use super::*;
119
120 #[test]
121 fn test_send_and_read() {
122 let bus = MessageBus::new();
123 let slot1 = SlotId("slot-1".to_string());
124 let slot2 = SlotId("slot-2".to_string());
125
126 let msg_id = bus.send(slot1.clone(), slot2.clone(), "hello".to_string());
127 assert!(!msg_id.is_empty());
128
129 let messages = bus.read(&slot2);
130 assert_eq!(messages.len(), 1);
131 assert_eq!(messages[0].id, msg_id);
132 assert_eq!(messages[0].content, "hello");
133 assert_eq!(messages[0].from, slot1);
134 assert_eq!(messages[0].to, slot2);
135
136 let messages_again = bus.read(&slot2);
138 assert!(messages_again.is_empty());
139 }
140
141 #[test]
142 fn test_send_and_peek() {
143 let bus = MessageBus::new();
144 let slot1 = SlotId("slot-1".to_string());
145 let slot2 = SlotId("slot-2".to_string());
146
147 bus.send(slot1.clone(), slot2.clone(), "hello".to_string());
148
149 let messages = bus.peek(&slot2);
150 assert_eq!(messages.len(), 1);
151
152 let messages_again = bus.peek(&slot2);
154 assert_eq!(messages_again.len(), 1);
155 }
156
157 #[test]
158 fn test_multiple_senders() {
159 let bus = MessageBus::new();
160 let slot1 = SlotId("slot-1".to_string());
161 let slot2 = SlotId("slot-2".to_string());
162 let slot3 = SlotId("slot-3".to_string());
163
164 bus.send(slot1.clone(), slot3.clone(), "from slot1".to_string());
165 bus.send(slot2.clone(), slot3.clone(), "from slot2".to_string());
166
167 let messages = bus.read(&slot3);
168 assert_eq!(messages.len(), 2);
169 assert_eq!(messages[0].from, slot1);
170 assert_eq!(messages[1].from, slot2);
171 }
172
173 #[test]
174 fn test_empty_inbox() {
175 let bus = MessageBus::new();
176 let slot1 = SlotId("slot-1".to_string());
177
178 let messages = bus.read(&slot1);
179 assert!(messages.is_empty());
180
181 let messages = bus.peek(&slot1);
182 assert!(messages.is_empty());
183 }
184
185 #[test]
186 fn test_broadcast() {
187 let bus = MessageBus::new();
188 let slot0 = SlotId("slot-0".to_string());
189 let slot1 = SlotId("slot-1".to_string());
190 let slot2 = SlotId("slot-2".to_string());
191 let slot3 = SlotId("slot-3".to_string());
192
193 let recipients = vec![slot0.clone(), slot1.clone(), slot2.clone(), slot3.clone()];
194 let ids = bus.broadcast(slot0.clone(), &recipients, "hello all".to_string());
195
196 assert_eq!(ids.len(), 3);
198
199 assert_eq!(bus.count(&slot1), 1);
201 assert_eq!(bus.count(&slot2), 1);
202 assert_eq!(bus.count(&slot3), 1);
203 assert_eq!(bus.count(&slot0), 0); let msg = bus.read(&slot1);
206 assert_eq!(msg[0].content, "hello all");
207 assert_eq!(msg[0].from, slot0);
208 }
209
210 #[test]
211 fn test_count() {
212 let bus = MessageBus::new();
213 let slot1 = SlotId("slot-1".to_string());
214 let slot2 = SlotId("slot-2".to_string());
215
216 assert_eq!(bus.count(&slot1), 0);
217
218 bus.send(slot1.clone(), slot2.clone(), "msg1".to_string());
219 assert_eq!(bus.count(&slot2), 1);
220
221 bus.send(slot1.clone(), slot2.clone(), "msg2".to_string());
222 assert_eq!(bus.count(&slot2), 2);
223
224 bus.read(&slot2);
225 assert_eq!(bus.count(&slot2), 0);
226 }
227}