monocoque_core/pubsub/
index.rs1use bytes::Bytes;
11use smallvec::SmallVec;
12
/// Key under which a peer is recorded in a [`SubscriptionIndex`].
pub type PeerKey = u64;
16
17#[derive(Debug, Clone)]
18struct Subscription {
19 prefix: Bytes,
20 peers: SmallVec<[PeerKey; 4]>,
22}
23
24#[derive(Debug, Default)]
25pub struct SubscriptionIndex {
26 subs: Vec<Subscription>,
27}
28
29impl SubscriptionIndex {
30 #[must_use]
31 pub const fn new() -> Self {
32 Self { subs: Vec::new() }
33 }
34
35 #[inline]
36 #[must_use]
37 pub fn is_empty(&self) -> bool {
38 self.subs.is_empty()
39 }
40
41 pub fn subscribe(&mut self, peer: PeerKey, prefix: Bytes) {
47 match self.subs.binary_search_by(|s| s.prefix.cmp(&prefix)) {
48 Ok(idx) => {
49 let peers = &mut self.subs[idx].peers;
50 if !peers.contains(&peer) {
51 peers.push(peer);
52 }
53 }
54 Err(idx) => {
55 let mut peers = SmallVec::<[PeerKey; 4]>::new();
56 peers.push(peer);
57 self.subs.insert(idx, Subscription { prefix, peers });
58 }
59 }
60 }
61
62 pub fn unsubscribe(&mut self, peer: PeerKey, prefix: &Bytes) {
64 if let Ok(idx) = self.subs.binary_search_by(|s| s.prefix.cmp(prefix)) {
65 let peers = &mut self.subs[idx].peers;
66 if let Some(pos) = peers.iter().position(|p| *p == peer) {
67 peers.swap_remove(pos);
68 }
69 if peers.is_empty() {
70 self.subs.remove(idx);
71 }
72 }
73 }
74
75 pub fn remove_peer_everywhere(&mut self, peer: PeerKey) {
79 let mut i = 0usize;
80 while i < self.subs.len() {
81 let peers = &mut self.subs[i].peers;
82
83 if let Some(pos) = peers.iter().position(|p| *p == peer) {
84 peers.swap_remove(pos);
85 }
86
87 if peers.is_empty() {
88 self.subs.remove(i);
89 } else {
90 i += 1;
91 }
92 }
93 }
94
95 #[must_use]
104 pub fn match_topic(&self, topic: &[u8]) -> SmallVec<[PeerKey; 16]> {
105 let mut out: SmallVec<[PeerKey; 16]> = SmallVec::new();
106
107 for sub in &self.subs {
108 let p = sub.prefix.as_ref();
109
110 if p > topic {
113 break;
114 }
115
116 if topic.starts_with(p) {
117 out.extend_from_slice(&sub.peers);
118 }
119 }
120
121 if out.len() > 1 {
123 out.sort_unstable();
124 out.dedup();
125 }
126
127 out
128 }
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// Subscribes `peer` to a static byte-string prefix.
    fn sub(index: &mut SubscriptionIndex, peer: PeerKey, prefix: &'static [u8]) {
        index.subscribe(peer, Bytes::from_static(prefix));
    }

    /// Collects the peers matched for `topic` into a plain `Vec` for easy comparison.
    fn matched(index: &SubscriptionIndex, topic: &[u8]) -> Vec<PeerKey> {
        index.match_topic(topic).to_vec()
    }

    #[test]
    fn subscribe_and_match() {
        let mut index = SubscriptionIndex::new();
        sub(&mut index, 1, b"A");
        sub(&mut index, 2, b"AB");
        sub(&mut index, 3, b"B");

        // Both "A" and "AB" are prefixes of "ABC"; "B" is not.
        assert_eq!(matched(&index, b"ABC"), vec![1, 2]);
        assert_eq!(matched(&index, b"BANANA"), vec![3]);
    }

    #[test]
    fn dedup_nested_prefixes() {
        let mut index = SubscriptionIndex::new();
        sub(&mut index, 7, b"A");
        sub(&mut index, 7, b"AB");

        // Peer 7 matches through two prefixes but must be reported once.
        assert_eq!(matched(&index, b"ABCD"), vec![7]);
    }

    #[test]
    fn remove_peer_everywhere_cleans_empty_entries() {
        let mut index = SubscriptionIndex::new();
        sub(&mut index, 1, b"A");
        sub(&mut index, 2, b"A");
        sub(&mut index, 1, b"AB");

        index.remove_peer_everywhere(1);

        // Only peer 2's subscription to "A" is left.
        assert_eq!(matched(&index, b"ABCD"), vec![2]);
    }
}