1use doido_core::Result;
2use std::collections::HashMap;
3use tokio::sync::{broadcast, Mutex};
4
/// Capacity argument passed to `broadcast::channel` whenever a per-stream
/// channel is created in this file.
const CHANNEL_CAPACITY: usize = 128;
6
/// Publish/subscribe interface addressed by stream name.
///
/// Messages are plain `String`s; subscribers consume them through a
/// `tokio::sync::broadcast::Receiver<String>`. Implementors must be
/// `Send + Sync`.
#[async_trait::async_trait]
pub trait PubSub: Send + Sync {
    /// Returns a broadcast receiver for the stream named `stream`.
    async fn subscribe(&self, stream: &str) -> Result<broadcast::Receiver<String>>;

    /// Publishes `message` on the stream named `stream`.
    async fn publish(&self, stream: &str, message: &str) -> Result<()>;

    /// Unsubscribes from the stream named `stream`.
    ///
    /// NOTE(review): the signature identifies only the stream, not an
    /// individual receiver, so what exactly is torn down is up to the
    /// implementor; `MemoryPubSub` in this file removes the stream's sender.
    async fn unsubscribe(&self, stream: &str) -> Result<()>;
}
13
/// [`PubSub`] implementation that keeps one broadcast sender per stream name
/// in a map held in memory.
pub struct MemoryPubSub {
    /// Stream name -> broadcast sender for that stream, behind an async mutex.
    senders: Mutex<HashMap<String, broadcast::Sender<String>>>,
}
17
18impl MemoryPubSub {
19 pub fn new() -> Self {
20 Self {
21 senders: Mutex::new(HashMap::new()),
22 }
23 }
24}
25
26impl Default for MemoryPubSub {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32#[async_trait::async_trait]
33impl PubSub for MemoryPubSub {
34 async fn subscribe(&self, stream: &str) -> Result<broadcast::Receiver<String>> {
35 let mut senders = self.senders.lock().await;
36 let sender = senders
37 .entry(stream.to_string())
38 .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
39 Ok(sender.subscribe())
40 }
41
42 async fn publish(&self, stream: &str, message: &str) -> Result<()> {
43 let mut senders = self.senders.lock().await;
44 let sender = senders
45 .entry(stream.to_string())
46 .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
47 let _ = sender.send(message.to_string()); Ok(())
49 }
50
51 async fn unsubscribe(&self, stream: &str) -> Result<()> {
52 self.senders.lock().await.remove(stream);
53 Ok(())
54 }
55}