1use crate::ring::{CdcEvent, CdcReadResult, CdcRing};
13
14pub struct Subscription {
20 id: u64,
21 pattern: Option<String>,
22 cursor: u64,
23}
24
25impl Subscription {
26 pub fn new(id: u64, pattern: Option<String>) -> Self {
28 Self {
29 id,
30 pattern,
31 cursor: 0,
32 }
33 }
34
35 pub fn with_cursor(id: u64, pattern: Option<String>, cursor: u64) -> Self {
37 Self {
38 id,
39 pattern,
40 cursor,
41 }
42 }
43
44 pub fn id(&self) -> u64 {
46 self.id
47 }
48
49 pub fn cursor(&self) -> u64 {
51 self.cursor
52 }
53
54 pub fn pattern(&self) -> Option<&str> {
56 self.pattern.as_deref()
57 }
58
59 pub fn poll(&mut self, ring: &CdcRing, limit: usize) -> CdcReadResult {
65 let result = ring.read(self.cursor, limit);
66
67 let filtered_events: Vec<CdcEvent> = if let Some(ref pattern) = self.pattern {
68 result
69 .events
70 .into_iter()
71 .filter(|e| glob_match(pattern, &e.key))
72 .collect()
73 } else {
74 result.events
75 };
76
77 self.cursor = result.next_seq;
78
79 CdcReadResult {
80 events: filtered_events,
81 next_seq: result.next_seq,
82 gap: result.gap,
83 }
84 }
85
86 pub fn seek(&mut self, seq: u64) {
88 self.cursor = seq;
89 }
90}
91
92pub struct SubscriptionManager {
97 subscriptions: Vec<Subscription>,
98 next_id: u64,
99}
100
101impl SubscriptionManager {
102 pub fn new() -> Self {
104 Self {
105 subscriptions: Vec::new(),
106 next_id: 1,
107 }
108 }
109
110 pub fn subscribe(&mut self, pattern: Option<String>) -> u64 {
112 let id = self.next_id;
113 self.next_id += 1;
114 self.subscriptions.push(Subscription::new(id, pattern));
115 id
116 }
117
118 pub fn subscribe_at(&mut self, pattern: Option<String>, cursor: u64) -> u64 {
120 let id = self.next_id;
121 self.next_id += 1;
122 self.subscriptions
123 .push(Subscription::with_cursor(id, pattern, cursor));
124 id
125 }
126
127 pub fn unsubscribe(&mut self, id: u64) -> bool {
129 let before = self.subscriptions.len();
130 self.subscriptions.retain(|s| s.id != id);
131 self.subscriptions.len() < before
132 }
133
134 pub fn get_mut(&mut self, id: u64) -> Option<&mut Subscription> {
136 self.subscriptions.iter_mut().find(|s| s.id == id)
137 }
138
139 pub fn len(&self) -> usize {
141 self.subscriptions.len()
142 }
143
144 pub fn is_empty(&self) -> bool {
146 self.subscriptions.is_empty()
147 }
148}
149
150impl Default for SubscriptionManager {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156fn glob_match(pattern: &str, key: &[u8]) -> bool {
161 let key_str = match std::str::from_utf8(key) {
162 Ok(s) => s,
163 Err(_) => return false,
164 };
165 glob_match_str(pattern.as_bytes(), key_str.as_bytes())
166}
167
168fn glob_match_str(pattern: &[u8], text: &[u8]) -> bool {
169 let mut pi = 0;
170 let mut ti = 0;
171 let mut star_pi = usize::MAX;
172 let mut star_ti = 0;
173
174 while ti < text.len() {
175 if pi < pattern.len() && (pattern[pi] == b'?' || pattern[pi] == text[ti]) {
176 pi += 1;
177 ti += 1;
178 } else if pi < pattern.len() && pattern[pi] == b'*' {
179 star_pi = pi;
180 star_ti = ti;
181 pi += 1;
182 } else if star_pi != usize::MAX {
183 pi = star_pi + 1;
184 star_ti += 1;
185 ti = star_ti;
186 } else {
187 return false;
188 }
189 }
190
191 while pi < pattern.len() && pattern[pi] == b'*' {
192 pi += 1;
193 }
194
195 pi == pattern.len()
196}
197
198#[cfg(test)]
199mod tests {
200 use super::*;
201 use crate::ring::CdcOp;
202
203 fn make_ring_with_events(n: usize) -> CdcRing {
204 let mut ring = CdcRing::new(100);
205 for i in 0..n {
206 ring.push(
207 CdcOp::Set,
208 format!("user:{}", i).into_bytes(),
209 Some(format!("val{}", i).into_bytes()),
210 i as u64,
211 );
212 }
213 ring
214 }
215
216 #[test]
217 fn test_subscription_poll() {
218 let ring = make_ring_with_events(5);
219 let mut sub = Subscription::new(1, None);
220
221 let result = sub.poll(&ring, 3);
222 assert_eq!(result.events.len(), 3);
223 assert_eq!(sub.cursor(), 3);
224
225 let result = sub.poll(&ring, 100);
226 assert_eq!(result.events.len(), 2);
227 assert_eq!(sub.cursor(), 5);
228
229 let result = sub.poll(&ring, 100);
231 assert!(result.events.is_empty());
232 }
233
234 #[test]
235 fn test_subscription_with_pattern() {
236 let mut ring = CdcRing::new(100);
237 ring.push(CdcOp::Set, b"user:1".to_vec(), None, 1);
238 ring.push(CdcOp::Set, b"order:1".to_vec(), None, 2);
239 ring.push(CdcOp::Set, b"user:2".to_vec(), None, 3);
240 ring.push(CdcOp::Set, b"order:2".to_vec(), None, 4);
241
242 let mut sub = Subscription::new(1, Some("user:*".into()));
243 let result = sub.poll(&ring, 100);
244 assert_eq!(result.events.len(), 2);
245 assert_eq!(result.events[0].key, b"user:1");
246 assert_eq!(result.events[1].key, b"user:2");
247 }
248
249 #[test]
250 fn test_subscription_seek() {
251 let ring = make_ring_with_events(10);
252 let mut sub = Subscription::new(1, None);
253 sub.seek(7);
254
255 let result = sub.poll(&ring, 100);
256 assert_eq!(result.events.len(), 3);
257 assert_eq!(result.events[0].seq, 7);
258 }
259
260 #[test]
261 fn test_manager_subscribe_unsubscribe() {
262 let mut mgr = SubscriptionManager::new();
263 let id1 = mgr.subscribe(None);
264 let id2 = mgr.subscribe(Some("user:*".into()));
265
266 assert_eq!(mgr.len(), 2);
267 assert!(mgr.unsubscribe(id1));
268 assert_eq!(mgr.len(), 1);
269 assert!(!mgr.unsubscribe(id1)); assert!(mgr.unsubscribe(id2));
271 assert!(mgr.is_empty());
272 }
273
274 #[test]
275 fn test_manager_subscribe_at() {
276 let mut mgr = SubscriptionManager::new();
277 let id = mgr.subscribe_at(None, 42);
278 let sub = mgr.get_mut(id).unwrap();
279 assert_eq!(sub.cursor(), 42);
280 }
281
282 #[test]
283 fn test_glob_match_patterns() {
284 assert!(glob_match("*", b"anything"));
285 assert!(glob_match("user:*", b"user:123"));
286 assert!(!glob_match("user:*", b"order:123"));
287 assert!(glob_match("user:?", b"user:1"));
288 assert!(!glob_match("user:?", b"user:12"));
289 assert!(glob_match("*:*", b"foo:bar"));
290 assert!(glob_match("", b""));
291 assert!(!glob_match("", b"x"));
292 }
293}