use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use crate::types::SlotId;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Message {
pub id: String,
pub from: SlotId,
pub to: SlotId,
pub content: String,
pub timestamp: DateTime<Utc>,
}
pub struct MessageBus {
inboxes: DashMap<String, Vec<Message>>,
}
impl Default for MessageBus {
fn default() -> Self {
Self::new()
}
}
impl MessageBus {
pub fn new() -> Self {
Self {
inboxes: DashMap::new(),
}
}
pub fn send(&self, from: SlotId, to: SlotId, content: String) -> String {
let message_id = generate_message_id();
let to_key = to.0.clone();
let message = Message {
id: message_id.clone(),
from,
to,
content,
timestamp: Utc::now(),
};
self.inboxes.entry(to_key).or_default().push(message);
message_id
}
pub fn read(&self, slot_id: &SlotId) -> Vec<Message> {
self.inboxes
.remove(&slot_id.0)
.map(|(_, messages)| messages)
.unwrap_or_default()
}
pub fn peek(&self, slot_id: &SlotId) -> Vec<Message> {
self.inboxes
.get(&slot_id.0)
.map(|entry| entry.clone())
.unwrap_or_default()
}
pub fn broadcast(&self, from: SlotId, recipients: &[SlotId], content: String) -> Vec<String> {
let mut ids = Vec::new();
for to in recipients {
if *to == from {
continue;
}
let id = self.send(from.clone(), to.clone(), content.clone());
ids.push(id);
}
ids
}
pub fn count(&self, slot_id: &SlotId) -> usize {
self.inboxes
.get(&slot_id.0)
.map(|entry| entry.len())
.unwrap_or(0)
}
}
fn generate_message_id() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
format!("msg-{nanos:x}")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_send_and_read() {
let bus = MessageBus::new();
let slot1 = SlotId("slot-1".to_string());
let slot2 = SlotId("slot-2".to_string());
let msg_id = bus.send(slot1.clone(), slot2.clone(), "hello".to_string());
assert!(!msg_id.is_empty());
let messages = bus.read(&slot2);
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].id, msg_id);
assert_eq!(messages[0].content, "hello");
assert_eq!(messages[0].from, slot1);
assert_eq!(messages[0].to, slot2);
let messages_again = bus.read(&slot2);
assert!(messages_again.is_empty());
}
#[test]
fn test_send_and_peek() {
let bus = MessageBus::new();
let slot1 = SlotId("slot-1".to_string());
let slot2 = SlotId("slot-2".to_string());
bus.send(slot1.clone(), slot2.clone(), "hello".to_string());
let messages = bus.peek(&slot2);
assert_eq!(messages.len(), 1);
let messages_again = bus.peek(&slot2);
assert_eq!(messages_again.len(), 1);
}
#[test]
fn test_multiple_senders() {
let bus = MessageBus::new();
let slot1 = SlotId("slot-1".to_string());
let slot2 = SlotId("slot-2".to_string());
let slot3 = SlotId("slot-3".to_string());
bus.send(slot1.clone(), slot3.clone(), "from slot1".to_string());
bus.send(slot2.clone(), slot3.clone(), "from slot2".to_string());
let messages = bus.read(&slot3);
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].from, slot1);
assert_eq!(messages[1].from, slot2);
}
#[test]
fn test_empty_inbox() {
let bus = MessageBus::new();
let slot1 = SlotId("slot-1".to_string());
let messages = bus.read(&slot1);
assert!(messages.is_empty());
let messages = bus.peek(&slot1);
assert!(messages.is_empty());
}
#[test]
fn test_broadcast() {
let bus = MessageBus::new();
let slot0 = SlotId("slot-0".to_string());
let slot1 = SlotId("slot-1".to_string());
let slot2 = SlotId("slot-2".to_string());
let slot3 = SlotId("slot-3".to_string());
let recipients = vec![slot0.clone(), slot1.clone(), slot2.clone(), slot3.clone()];
let ids = bus.broadcast(slot0.clone(), &recipients, "hello all".to_string());
assert_eq!(ids.len(), 3);
assert_eq!(bus.count(&slot1), 1);
assert_eq!(bus.count(&slot2), 1);
assert_eq!(bus.count(&slot3), 1);
assert_eq!(bus.count(&slot0), 0);
let msg = bus.read(&slot1);
assert_eq!(msg[0].content, "hello all");
assert_eq!(msg[0].from, slot0);
}
#[test]
fn test_count() {
let bus = MessageBus::new();
let slot1 = SlotId("slot-1".to_string());
let slot2 = SlotId("slot-2".to_string());
assert_eq!(bus.count(&slot1), 0);
bus.send(slot1.clone(), slot2.clone(), "msg1".to_string());
assert_eq!(bus.count(&slot2), 1);
bus.send(slot1.clone(), slot2.clone(), "msg2".to_string());
assert_eq!(bus.count(&slot2), 2);
bus.read(&slot2);
assert_eq!(bus.count(&slot2), 0);
}
}