Skip to main content

monocoque_core/
subscription.rs

1//! Subscription trie for efficient topic matching in XPUB/XSUB/SUB sockets.
2//!
3//! This provides a more efficient subscription matching mechanism than linear
4//! scanning, especially for large numbers of subscriptions.
5
6use bytes::Bytes;
7use std::collections::BTreeSet;
8
9/// A subscription entry with topic prefix
10#[derive(Debug, Clone)]
11pub struct Subscription {
12    /// Topic prefix (empty = subscribe to all)
13    pub prefix: Bytes,
14}
15
16impl Subscription {
17    /// Create a new subscription for a topic prefix
18    #[must_use]
19    pub const fn new(prefix: Bytes) -> Self {
20        Self { prefix }
21    }
22
23    /// Check if this subscription matches a given topic
24    #[must_use]
25    pub fn matches(&self, topic: &[u8]) -> bool {
26        // Empty prefix matches everything
27        if self.prefix.is_empty() {
28            return true;
29        }
30
31        // Check if topic starts with prefix
32        topic.len() >= self.prefix.len() && topic[..self.prefix.len()] == self.prefix[..]
33    }
34}
35
36/// Efficient subscription storage using a sorted set for O(log N) operations.
37///
38/// Backed by a `BTreeSet<Vec<u8>>` which allows O(log N) prefix searching:
39/// find the largest stored prefix ≤ the topic, then check if it is a prefix
40/// of the topic.  Subscribe/unsubscribe are also O(log N).
41#[derive(Debug, Default)]
42pub struct SubscriptionTrie {
43    prefixes: BTreeSet<Vec<u8>>,
44}
45
46impl SubscriptionTrie {
47    /// Create a new empty subscription trie
48    #[must_use]
49    pub fn new() -> Self {
50        Self {
51            prefixes: BTreeSet::new(),
52        }
53    }
54
55    /// Add a subscription
56    pub fn subscribe(&mut self, prefix: Bytes) {
57        self.prefixes.insert(prefix.to_vec());
58    }
59
60    /// Remove a subscription
61    pub fn unsubscribe(&mut self, prefix: &Bytes) {
62        self.prefixes.remove(prefix.as_ref());
63    }
64
65    /// Check if a topic matches any subscription
66    ///
67    /// Returns true if the topic should be delivered.
68    /// O(log N) using BTreeSet range lookup.
69    #[must_use]
70    pub fn matches(&self, topic: &[u8]) -> bool {
71        if self.prefixes.is_empty() {
72            return false;
73        }
74
75        // Check empty prefix first (matches everything)
76        if self.prefixes.contains(&[][..]) {
77            return true;
78        }
79
80        // Find the largest stored prefix <= topic.
81        // Any stored prefix that is a true prefix of `topic` must be <= topic
82        // in lexicographic order, so the best candidate is the largest such key.
83        use std::ops::Bound;
84        if let Some(candidate) = self
85            .prefixes
86            .range::<Vec<u8>, _>((Bound::Unbounded, Bound::Included(&topic.to_vec())))
87            .next_back()
88        {
89            if topic.starts_with(candidate.as_slice()) {
90                return true;
91            }
92        }
93
94        false
95    }
96
97    /// Get all subscriptions as a `Vec<Subscription>`.
98    #[must_use]
99    pub fn subscriptions(&self) -> Vec<Subscription> {
100        self.prefixes
101            .iter()
102            .map(|p| Subscription::new(Bytes::copy_from_slice(p)))
103            .collect()
104    }
105
106    /// Check if there are no subscriptions
107    #[must_use]
108    pub fn is_empty(&self) -> bool {
109        self.prefixes.is_empty()
110    }
111
112    /// Get the number of subscriptions
113    #[must_use]
114    pub fn len(&self) -> usize {
115        self.prefixes.len()
116    }
117
118    /// Clear all subscriptions
119    pub fn clear(&mut self) {
120        self.prefixes.clear();
121    }
122}
123
124/// Subscription event for XPUB socket
125#[derive(Debug, Clone, PartialEq, Eq)]
126pub enum SubscriptionEvent {
127    /// A peer subscribed to a topic
128    Subscribe(Bytes),
129    /// A peer unsubscribed from a topic
130    Unsubscribe(Bytes),
131}
132
133impl SubscriptionEvent {
134    /// Create a subscription event from a ZMTP subscription message
135    ///
136    /// Format: [0x01|0x00] [topic prefix...]
137    #[must_use]
138    pub fn from_message(msg: &[u8]) -> Option<Self> {
139        if msg.is_empty() {
140            return None;
141        }
142
143        let prefix = Bytes::copy_from_slice(&msg[1..]);
144        match msg[0] {
145            0x01 => Some(Self::Subscribe(prefix)),
146            0x00 => Some(Self::Unsubscribe(prefix)),
147            _ => None,
148        }
149    }
150
151    /// Encode this event as a ZMTP subscription message
152    #[must_use]
153    pub fn to_message(&self) -> Bytes {
154        let (cmd, prefix) = match self {
155            Self::Subscribe(p) => (0x01u8, p),
156            Self::Unsubscribe(p) => (0x00u8, p),
157        };
158
159        let mut msg = Vec::with_capacity(1 + prefix.len());
160        msg.push(cmd);
161        msg.extend_from_slice(prefix);
162        Bytes::from(msg)
163    }
164
165    /// Get the topic prefix
166    #[must_use]
167    pub const fn prefix(&self) -> &Bytes {
168        match self {
169            Self::Subscribe(p) | Self::Unsubscribe(p) => p,
170        }
171    }
172
173    /// Check if this is a subscribe event
174    #[must_use]
175    pub const fn is_subscribe(&self) -> bool {
176        matches!(self, Self::Subscribe(_))
177    }
178
179    /// Check if this is an unsubscribe event
180    #[must_use]
181    pub const fn is_unsubscribe(&self) -> bool {
182        matches!(self, Self::Unsubscribe(_))
183    }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// A non-empty prefix accepts topics that start with it and nothing else.
    #[test]
    fn test_subscription_matches() {
        let subscription = Subscription::new(Bytes::from_static(b"topic."));

        let accepted: [&[u8]; 2] = [b"topic.foo", b"topic.bar"];
        for topic in accepted.iter() {
            assert!(subscription.matches(topic));
        }

        let rejected: [&[u8]; 2] = [b"other.foo", b"topi"];
        for topic in rejected.iter() {
            assert!(!subscription.matches(topic));
        }
    }

    /// The empty prefix accepts every topic, including the empty topic.
    #[test]
    fn test_empty_subscription_matches_all() {
        let catch_all = Subscription::new(Bytes::new());

        assert!(catch_all.matches(b"anything"));
        assert!(catch_all.matches(b""));
    }

    /// Subscribe / match / unsubscribe round trip on a single prefix.
    #[test]
    fn test_trie_basic() {
        let prefix = Bytes::from_static(b"topic.");
        let mut store = SubscriptionTrie::new();

        // Nothing is delivered before any subscription exists.
        assert!(!store.matches(b"topic.foo"));

        store.subscribe(prefix.clone());
        assert!(store.matches(b"topic.foo"));
        assert!(!store.matches(b"other.foo"));

        store.unsubscribe(&prefix);
        assert!(!store.matches(b"topic.foo"));
    }

    /// Independent prefixes are matched independently.
    #[test]
    fn test_trie_multiple_subscriptions() {
        let mut store = SubscriptionTrie::new();
        store.subscribe(Bytes::from_static(b"topic."));
        store.subscribe(Bytes::from_static(b"events."));

        assert!(store.matches(b"topic.foo"));
        assert!(store.matches(b"events.bar"));
        assert!(!store.matches(b"other.baz"));
    }

    /// An empty stored prefix makes every topic match.
    #[test]
    fn test_trie_empty_prefix_matches_all() {
        let mut store = SubscriptionTrie::new();
        store.subscribe(Bytes::new());

        assert!(store.matches(b"anything"));
        assert!(store.matches(b""));
    }

    /// A topic shorter than the stored prefix must not match it.
    #[test]
    fn test_trie_no_false_prefix_match() {
        let mut store = SubscriptionTrie::new();
        store.subscribe(Bytes::from_static(b"topic."));

        // "topic" is a prefix of "topic." but "topic." is NOT a prefix of "topic"
        assert!(!store.matches(b"topic"));
        assert!(store.matches(b"topic."));
        assert!(store.matches(b"topic.sub"));
    }

    /// Subscribe events encode as `0x01` + prefix and parse back unchanged.
    #[test]
    fn test_subscription_event() {
        let event = SubscriptionEvent::Subscribe(Bytes::from_static(b"topic"));
        let wire = event.to_message();

        assert_eq!(wire.split_first(), Some((&0x01u8, &b"topic"[..])));

        let decoded = SubscriptionEvent::from_message(&wire).unwrap();
        assert_eq!(decoded, event);
    }

    /// Unsubscribe events encode as `0x00` + prefix and parse back unchanged.
    #[test]
    fn test_unsubscription_event() {
        let event = SubscriptionEvent::Unsubscribe(Bytes::from_static(b"topic"));
        let wire = event.to_message();

        assert_eq!(wire.split_first(), Some((&0x00u8, &b"topic"[..])));

        let decoded = SubscriptionEvent::from_message(&wire).unwrap();
        assert_eq!(decoded, event);
    }
}