Skip to main content

de_mls/ds/
topic_filter.rs

1//! Transport-agnostic topic filter used by the app as a fast allowlist.
2use std::collections::HashSet;
3
4use tokio::sync::RwLock;
5
6use crate::ds::SUBTOPICS;
7
/// Key identifying a single topic: a group paired with one of its subtopics.
///
/// Derives `Eq` and `Hash` so it can be stored in a hash-based collection.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct TopicKey {
    /// Identifier of the group the topic belongs to.
    pub group_id: String,
    /// Name of the subtopic within that group.
    pub subtopic: String,
}
13
14impl TopicKey {
15    pub fn new(group_id: &str, subtopic: &str) -> Self {
16        Self {
17            group_id: group_id.to_string(),
18            subtopic: subtopic.to_string(),
19        }
20    }
21}
22
23/// Fast allowlist for inbound routing.
24#[derive(Default, Debug)]
25pub struct TopicFilter {
26    set: RwLock<HashSet<TopicKey>>,
27}
28
29impl TopicFilter {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Add all subtopics for a group.
35    pub async fn add_many(&self, group_name: &str) {
36        let mut w = self.set.write().await;
37        for sub in SUBTOPICS {
38            w.insert(TopicKey::new(group_name, sub));
39        }
40    }
41
42    /// Remove all subtopics for a group.
43    pub async fn remove_many(&self, group_name: &str) {
44        self.set.write().await.retain(|x| x.group_id != group_name);
45    }
46
47    /// Membership test (first-stage filter).
48    #[inline]
49    pub async fn contains(&self, group_id: &str, subtopic: &str) -> bool {
50        let key = TopicKey::new(group_id, subtopic);
51        self.set.read().await.contains(&key)
52    }
53
54    pub async fn snapshot(&self) -> Vec<TopicKey> {
55        self.set.read().await.iter().cloned().collect()
56    }
57}