monocoque_core/
subscription.rs1use bytes::Bytes;
7use std::collections::BTreeSet;
8
9#[derive(Debug, Clone)]
11pub struct Subscription {
12 pub prefix: Bytes,
14}
15
16impl Subscription {
17 #[must_use]
19 pub const fn new(prefix: Bytes) -> Self {
20 Self { prefix }
21 }
22
23 #[must_use]
25 pub fn matches(&self, topic: &[u8]) -> bool {
26 if self.prefix.is_empty() {
28 return true;
29 }
30
31 topic.len() >= self.prefix.len() && topic[..self.prefix.len()] == self.prefix[..]
33 }
34}
35
36#[derive(Debug, Default)]
42pub struct SubscriptionTrie {
43 prefixes: BTreeSet<Vec<u8>>,
44}
45
46impl SubscriptionTrie {
47 #[must_use]
49 pub fn new() -> Self {
50 Self {
51 prefixes: BTreeSet::new(),
52 }
53 }
54
55 pub fn subscribe(&mut self, prefix: Bytes) {
57 self.prefixes.insert(prefix.to_vec());
58 }
59
60 pub fn unsubscribe(&mut self, prefix: &Bytes) {
62 self.prefixes.remove(prefix.as_ref());
63 }
64
65 #[must_use]
70 pub fn matches(&self, topic: &[u8]) -> bool {
71 if self.prefixes.is_empty() {
72 return false;
73 }
74
75 if self.prefixes.contains(&[][..]) {
77 return true;
78 }
79
80 use std::ops::Bound;
84 if let Some(candidate) = self
85 .prefixes
86 .range::<Vec<u8>, _>((Bound::Unbounded, Bound::Included(&topic.to_vec())))
87 .next_back()
88 {
89 if topic.starts_with(candidate.as_slice()) {
90 return true;
91 }
92 }
93
94 false
95 }
96
97 #[must_use]
99 pub fn subscriptions(&self) -> Vec<Subscription> {
100 self.prefixes
101 .iter()
102 .map(|p| Subscription::new(Bytes::copy_from_slice(p)))
103 .collect()
104 }
105
106 #[must_use]
108 pub fn is_empty(&self) -> bool {
109 self.prefixes.is_empty()
110 }
111
112 #[must_use]
114 pub fn len(&self) -> usize {
115 self.prefixes.len()
116 }
117
118 pub fn clear(&mut self) {
120 self.prefixes.clear();
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
126pub enum SubscriptionEvent {
127 Subscribe(Bytes),
129 Unsubscribe(Bytes),
131}
132
133impl SubscriptionEvent {
134 #[must_use]
138 pub fn from_message(msg: &[u8]) -> Option<Self> {
139 if msg.is_empty() {
140 return None;
141 }
142
143 let prefix = Bytes::copy_from_slice(&msg[1..]);
144 match msg[0] {
145 0x01 => Some(Self::Subscribe(prefix)),
146 0x00 => Some(Self::Unsubscribe(prefix)),
147 _ => None,
148 }
149 }
150
151 #[must_use]
153 pub fn to_message(&self) -> Bytes {
154 let (cmd, prefix) = match self {
155 Self::Subscribe(p) => (0x01u8, p),
156 Self::Unsubscribe(p) => (0x00u8, p),
157 };
158
159 let mut msg = Vec::with_capacity(1 + prefix.len());
160 msg.push(cmd);
161 msg.extend_from_slice(prefix);
162 Bytes::from(msg)
163 }
164
165 #[must_use]
167 pub const fn prefix(&self) -> &Bytes {
168 match self {
169 Self::Subscribe(p) | Self::Unsubscribe(p) => p,
170 }
171 }
172
173 #[must_use]
175 pub const fn is_subscribe(&self) -> bool {
176 matches!(self, Self::Subscribe(_))
177 }
178
179 #[must_use]
181 pub const fn is_unsubscribe(&self) -> bool {
182 matches!(self, Self::Unsubscribe(_))
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscription accepts topics that start with its prefix and rejects
    /// unrelated or too-short topics.
    #[test]
    fn test_subscription_matches() {
        let subscription = Subscription::new(Bytes::from_static(b"topic."));

        // Topics carrying the full prefix.
        assert!(subscription.matches(b"topic.foo"));
        assert!(subscription.matches(b"topic.bar"));

        // A different prefix, and a topic shorter than the prefix.
        assert!(!subscription.matches(b"other.foo"));
        assert!(!subscription.matches(b"topi"));
    }

    /// The empty prefix accepts every topic, the empty topic included.
    #[test]
    fn test_empty_subscription_matches_all() {
        let catch_all = Subscription::new(Bytes::new());

        assert!(catch_all.matches(b"anything"));
        assert!(catch_all.matches(b""));
    }

    /// Subscribe / match / unsubscribe life cycle with a single prefix.
    #[test]
    fn test_trie_basic() {
        let mut set = SubscriptionTrie::new();

        // Nothing subscribed yet.
        assert!(!set.matches(b"topic.foo"));

        set.subscribe(Bytes::from_static(b"topic."));
        assert!(set.matches(b"topic.foo"));
        assert!(!set.matches(b"other.foo"));

        // After unsubscribing the topic is rejected again.
        set.unsubscribe(&Bytes::from_static(b"topic."));
        assert!(!set.matches(b"topic.foo"));
    }

    /// Several unrelated prefixes are matched independently of each other.
    #[test]
    fn test_trie_multiple_subscriptions() {
        let mut set = SubscriptionTrie::new();

        set.subscribe(Bytes::from_static(b"topic."));
        set.subscribe(Bytes::from_static(b"events."));

        assert!(set.matches(b"topic.foo"));
        assert!(set.matches(b"events.bar"));
        assert!(!set.matches(b"other.baz"));
    }

    /// An empty subscribed prefix turns the set into a catch-all.
    #[test]
    fn test_trie_empty_prefix_matches_all() {
        let mut set = SubscriptionTrie::new();
        set.subscribe(Bytes::new());

        assert!(set.matches(b"anything"));
        assert!(set.matches(b""));
    }

    /// A topic that is merely a prefix of the subscription does not match.
    #[test]
    fn test_trie_no_false_prefix_match() {
        let mut set = SubscriptionTrie::new();
        set.subscribe(Bytes::from_static(b"topic."));

        // One byte short of the subscribed prefix.
        assert!(!set.matches(b"topic"));
        // Exactly the prefix, and the prefix plus a suffix.
        assert!(set.matches(b"topic."));
        assert!(set.matches(b"topic.sub"));
    }

    /// A subscribe event encodes as `0x01` + prefix and decodes back to itself.
    #[test]
    fn test_subscription_event() {
        let event = SubscriptionEvent::Subscribe(Bytes::from_static(b"topic"));
        let wire = event.to_message();

        assert_eq!(wire[0], 0x01);
        assert_eq!(&wire[1..], b"topic");

        let decoded = SubscriptionEvent::from_message(&wire).unwrap();
        assert_eq!(decoded, event);
    }

    /// An unsubscribe event encodes as `0x00` + prefix and decodes back to
    /// itself.
    #[test]
    fn test_unsubscription_event() {
        let event = SubscriptionEvent::Unsubscribe(Bytes::from_static(b"topic"));
        let wire = event.to_message();

        assert_eq!(wire[0], 0x00);
        assert_eq!(&wire[1..], b"topic");

        let decoded = SubscriptionEvent::from_message(&wire).unwrap();
        assert_eq!(decoded, event);
    }
}