Skip to main content

rio_rs/
message_router.rs

1//! Maps objects and their ids to different broadcast channels
2//!
3//! <div class="warning">
4//!
5//! # TODO
6//! - [ ] This component might be temporary. It serves as a router between different publishers and subscribers
7//! - [ ] I need a way to remove unused channels (use LRU)
8//! - [ ] Configure channel limits
9//!
10//! </div>
11
12use dashmap::DashMap;
13use tokio::sync::broadcast;
14
15use crate::protocol::pubsub::SubscriptionResponse;
16
17type InboxHashMap = DashMap<(String, String), broadcast::Sender<SubscriptionResponse>>;
18
19#[derive(Debug, Default, Clone)]
20pub struct MessageRouter {
21    inboxes: InboxHashMap,
22}
23
24impl MessageRouter {
25    pub fn create_subscription(
26        &self,
27        k1: String,
28        k2: String,
29    ) -> broadcast::Receiver<SubscriptionResponse> {
30        let sender = self.inboxes.entry((k1, k2)).or_insert_with(|| {
31            let (sender, _) = broadcast::channel(1_000);
32            sender
33        });
34        sender.subscribe()
35    }
36
37    pub fn publish(&self, k1: String, k2: String, message: SubscriptionResponse) {
38        let maybe_sender = self.inboxes.get_mut(&(k1, k2));
39        if let Some(sender) = maybe_sender {
40            sender.send(message).ok();
41        }
42    }
43}