Skip to main content

monocoque_core/pubsub/
index.rs

1//! PUB/SUB Subscription Index (Phase 3: Sorted Prefix Table)
2//!
3//! Design:
4//! - Keep subscriptions in a Vec sorted lexicographically by `prefix`.
5//! - subscribe/unsubscribe: O(log N) search + O(N) insert/remove shift (N ~ < 10k typical).
6//! - `match_topic` hot-path: cache-friendly forward scan with early-exit when prefix > topic.
7//! - Returns `SmallVec` of `PeerKeys` to avoid heap alloc in common cases.
8//! - Dedups results because peers may subscribe to overlapping prefixes.
9
10use bytes::Bytes;
11use smallvec::SmallVec;
12
/// Numeric handle that identifies a peer inside the subscription index.
///
/// The index stores these plain integers rather than `Bytes` identities or
/// channel senders, so the per-prefix peer lists stay small and cache-dense.
pub type PeerKey = u64;
16
17#[derive(Debug, Clone)]
18struct Subscription {
19    prefix: Bytes,
20    /// Inline up to 4 peers without heap allocation (common low fanout).
21    peers: SmallVec<[PeerKey; 4]>,
22}
23
24#[derive(Debug, Default)]
25pub struct SubscriptionIndex {
26    subs: Vec<Subscription>,
27}
28
29impl SubscriptionIndex {
30    #[must_use]
31    pub const fn new() -> Self {
32        Self { subs: Vec::new() }
33    }
34
35    #[inline]
36    #[must_use]
37    pub fn is_empty(&self) -> bool {
38        self.subs.is_empty()
39    }
40
41    /// Adds a subscription for `peer` to `prefix`.
42    ///
43    /// Complexity:
44    /// - O(log N) to find
45    /// - O(N) for insertion shift in the vec (acceptable for typical ZMQ sizes)
46    pub fn subscribe(&mut self, peer: PeerKey, prefix: Bytes) {
47        match self.subs.binary_search_by(|s| s.prefix.cmp(&prefix)) {
48            Ok(idx) => {
49                let peers = &mut self.subs[idx].peers;
50                if !peers.contains(&peer) {
51                    peers.push(peer);
52                }
53            }
54            Err(idx) => {
55                let mut peers = SmallVec::<[PeerKey; 4]>::new();
56                peers.push(peer);
57                self.subs.insert(idx, Subscription { prefix, peers });
58            }
59        }
60    }
61
62    /// Removes a subscription for `peer` from `prefix`.
63    pub fn unsubscribe(&mut self, peer: PeerKey, prefix: &Bytes) {
64        if let Ok(idx) = self.subs.binary_search_by(|s| s.prefix.cmp(prefix)) {
65            let peers = &mut self.subs[idx].peers;
66            if let Some(pos) = peers.iter().position(|p| *p == peer) {
67                peers.swap_remove(pos);
68            }
69            if peers.is_empty() {
70                self.subs.remove(idx);
71            }
72        }
73    }
74
75    /// Remove `peer` from every prefix (used on disconnect).
76    ///
77    /// Complexity: O(N) scan, acceptable on churn events.
78    pub fn remove_peer_everywhere(&mut self, peer: PeerKey) {
79        let mut i = 0usize;
80        while i < self.subs.len() {
81            let peers = &mut self.subs[i].peers;
82
83            if let Some(pos) = peers.iter().position(|p| *p == peer) {
84                peers.swap_remove(pos);
85            }
86
87            if peers.is_empty() {
88                self.subs.remove(i);
89            } else {
90                i += 1;
91            }
92        }
93    }
94
95    /// Match a topic against all subscriptions.
96    ///
97    /// Returns a deduplicated list of `PeerKeys`.
98    ///
99    /// Hot path characteristics:
100    /// - Forward scan over sorted prefixes.
101    /// - Early exit when prefix > topic lexicographically (cannot be a prefix).
102    /// - Starts-with check for actual prefix match.
103    #[must_use]
104    pub fn match_topic(&self, topic: &[u8]) -> SmallVec<[PeerKey; 16]> {
105        let mut out: SmallVec<[PeerKey; 16]> = SmallVec::new();
106
107        for sub in &self.subs {
108            let p = sub.prefix.as_ref();
109
110            // If prefix > topic, it cannot be a prefix of topic.
111            // e.g. prefix="apply" > topic="apple" => stop.
112            if p > topic {
113                break;
114            }
115
116            if topic.starts_with(p) {
117                out.extend_from_slice(&sub.peers);
118            }
119        }
120
121        // Dedup if needed (peer might have subscribed to nested prefixes).
122        if out.len() > 1 {
123            out.sort_unstable();
124            out.dedup();
125        }
126
127        out
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Nested prefixes both match; an unrelated prefix does not.
    #[test]
    fn subscribe_and_match() {
        let mut index = SubscriptionIndex::new();
        index.subscribe(1, Bytes::from_static(b"A"));
        index.subscribe(2, Bytes::from_static(b"AB"));
        index.subscribe(3, Bytes::from_static(b"B"));

        // "A" and "AB" are both prefixes of "ABC"; "B" is not.
        assert_eq!(index.match_topic(b"ABC").as_slice(), &[1, 2]);

        // Only "B" is a prefix of "BANANA".
        assert_eq!(index.match_topic(b"BANANA").as_slice(), &[3]);
    }

    /// A peer subscribed to overlapping prefixes is reported only once.
    #[test]
    fn dedup_nested_prefixes() {
        let mut index = SubscriptionIndex::new();
        index.subscribe(7, Bytes::from_static(b"A"));
        index.subscribe(7, Bytes::from_static(b"AB"));

        assert_eq!(index.match_topic(b"ABCD").as_slice(), &[7]);
    }

    /// Disconnecting a peer strips it from shared rows and drops rows it owned alone.
    #[test]
    fn remove_peer_everywhere_cleans_empty_entries() {
        let mut index = SubscriptionIndex::new();
        index.subscribe(1, Bytes::from_static(b"A"));
        index.subscribe(2, Bytes::from_static(b"A"));
        index.subscribe(1, Bytes::from_static(b"AB"));

        index.remove_peer_everywhere(1);

        // Peer 2 still holds "A"; the "AB" row had only peer 1 and is gone.
        assert_eq!(index.match_topic(b"ABCD").as_slice(), &[2]);
    }
}
174}