Skip to main content

doido_cable/
pubsub.rs

1use doido_core::Result;
2use std::collections::HashMap;
3use tokio::sync::{broadcast, Mutex};
4
/// Buffer size handed to `broadcast::channel` whenever a stream's channel is created.
///
/// Per tokio's broadcast semantics this bounds how many messages are retained for
/// slow receivers; a receiver that falls further behind skips the oldest messages
/// and is told it lagged.
const CHANNEL_CAPACITY: usize = 128;
6
7#[async_trait::async_trait]
8pub trait PubSub: Send + Sync {
9    async fn subscribe(&self, stream: &str) -> Result<broadcast::Receiver<String>>;
10    async fn publish(&self, stream: &str, message: &str) -> Result<()>;
11    async fn unsubscribe(&self, stream: &str) -> Result<()>;
12}
13
14pub struct MemoryPubSub {
15    senders: Mutex<HashMap<String, broadcast::Sender<String>>>,
16}
17
18impl MemoryPubSub {
19    pub fn new() -> Self {
20        Self {
21            senders: Mutex::new(HashMap::new()),
22        }
23    }
24}
25
26impl Default for MemoryPubSub {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32#[async_trait::async_trait]
33impl PubSub for MemoryPubSub {
34    async fn subscribe(&self, stream: &str) -> Result<broadcast::Receiver<String>> {
35        let mut senders = self.senders.lock().await;
36        let sender = senders
37            .entry(stream.to_string())
38            .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
39        Ok(sender.subscribe())
40    }
41
42    async fn publish(&self, stream: &str, message: &str) -> Result<()> {
43        let mut senders = self.senders.lock().await;
44        let sender = senders
45            .entry(stream.to_string())
46            .or_insert_with(|| broadcast::channel(CHANNEL_CAPACITY).0);
47        let _ = sender.send(message.to_string()); // ignore if no subscribers
48        Ok(())
49    }
50
51    async fn unsubscribe(&self, stream: &str) -> Result<()> {
52        self.senders.lock().await.remove(stream);
53        Ok(())
54    }
55}