1use crate::error::MultiError;
4use crate::types::Message;
5use std::collections::HashMap;
6use tokio::sync::{mpsc, RwLock};
7
8pub struct Mailbox {
13 senders: RwLock<HashMap<String, mpsc::Sender<Message>>>,
14 buffer_size: usize,
15}
16
17impl Mailbox {
18 pub fn new(buffer_size: usize) -> Self {
19 Self {
20 senders: RwLock::new(HashMap::new()),
21 buffer_size,
22 }
23 }
24
25 pub async fn register(&self, name: &str) -> mpsc::Receiver<Message> {
27 let (tx, rx) = mpsc::channel(self.buffer_size);
28 self.senders.write().await.insert(name.to_string(), tx);
29 rx
30 }
31
32 pub async fn send(&self, msg: Message) -> Result<(), MultiError> {
34 let senders = self.senders.read().await;
35 let tx = senders
36 .get(&msg.to)
37 .ok_or_else(|| MultiError::MailboxSend(format!("no agent '{}'", msg.to)))?;
38 tx.send(msg)
39 .await
40 .map_err(|e| MultiError::MailboxSend(e.to_string()))
41 }
42
43 pub async fn broadcast(&self, msg: Message) -> Result<(), MultiError> {
45 let senders = self.senders.read().await;
46 for (name, tx) in senders.iter() {
47 if *name != msg.from {
48 let mut m = msg.clone();
49 m.to = name.clone();
50 let _ = tx.send(m).await;
51 }
52 }
53 Ok(())
54 }
55
56 pub async fn unregister(&self, name: &str) {
58 self.senders.write().await.remove(name);
59 }
60}
61
62impl Default for Mailbox {
63 fn default() -> Self {
64 Self::new(64)
65 }
66}