Skip to main content

kora_cdc/
subscription.rs

//! Lightweight CDC subscriptions with cursor tracking.
//!
//! A [`Subscription`] provides a stateful read cursor over a
//! [`CdcRing`](crate::ring::CdcRing). Each call to
//! [`Subscription::poll`] returns the next batch of events and advances
//! the cursor, optionally filtering events by key with a glob pattern.
//!
//! Multiple subscriptions can be managed together through a
//! [`SubscriptionManager`], which assigns monotonic IDs and exposes
//! subscribe / unsubscribe lifecycle operations.
11
12use crate::ring::{CdcEvent, CdcReadResult, CdcRing};
13
/// A cursor-based CDC subscription.
///
/// Holds a read position into a [`CdcRing`](crate::ring::CdcRing) and an
/// optional glob pattern used to filter events by key. Call
/// [`poll`](Subscription::poll) to consume the next batch of matching events.
///
/// Implements [`Debug`](std::fmt::Debug) so a subscription can be logged or
/// shown in assertion failures.
#[derive(Debug)]
pub struct Subscription {
    /// Identifier supplied by whoever constructed the subscription.
    id: u64,
    /// Glob applied to event keys during `poll`; `None` disables filtering.
    pattern: Option<String>,
    /// Next sequence number to consume from the ring.
    cursor: u64,
}
24
25impl Subscription {
26    /// Create a subscription starting from sequence 0.
27    pub fn new(id: u64, pattern: Option<String>) -> Self {
28        Self {
29            id,
30            pattern,
31            cursor: 0,
32        }
33    }
34
35    /// Create a subscription starting from the given sequence number.
36    pub fn with_cursor(id: u64, pattern: Option<String>, cursor: u64) -> Self {
37        Self {
38            id,
39            pattern,
40            cursor,
41        }
42    }
43
44    /// Return the unique subscription ID.
45    pub fn id(&self) -> u64 {
46        self.id
47    }
48
49    /// Return the current read-cursor position (next sequence to consume).
50    pub fn cursor(&self) -> u64 {
51        self.cursor
52    }
53
54    /// Return the glob pattern, if one was set.
55    pub fn pattern(&self) -> Option<&str> {
56        self.pattern.as_deref()
57    }
58
59    /// Read up to `limit` new events from `ring`, advancing the cursor.
60    ///
61    /// When a pattern is set, only events whose key matches the glob are
62    /// included in the returned batch. The cursor still advances past
63    /// non-matching events so they are not re-examined on the next poll.
64    pub fn poll(&mut self, ring: &CdcRing, limit: usize) -> CdcReadResult {
65        let result = ring.read(self.cursor, limit);
66
67        let filtered_events: Vec<CdcEvent> = if let Some(ref pattern) = self.pattern {
68            result
69                .events
70                .into_iter()
71                .filter(|e| glob_match(pattern, &e.key))
72                .collect()
73        } else {
74            result.events
75        };
76
77        self.cursor = result.next_seq;
78
79        CdcReadResult {
80            events: filtered_events,
81            next_seq: result.next_seq,
82            gap: result.gap,
83        }
84    }
85
86    /// Reposition the read cursor to an arbitrary sequence number.
87    pub fn seek(&mut self, seq: u64) {
88        self.cursor = seq;
89    }
90}
91
92/// Registry for multiple CDC subscriptions.
93///
94/// Assigns monotonically increasing IDs and provides lookup, creation, and
95/// removal of [`Subscription`] instances.
96pub struct SubscriptionManager {
97    subscriptions: Vec<Subscription>,
98    next_id: u64,
99}
100
101impl SubscriptionManager {
102    /// Create an empty subscription manager.
103    pub fn new() -> Self {
104        Self {
105            subscriptions: Vec::new(),
106            next_id: 1,
107        }
108    }
109
110    /// Create a subscription starting at sequence 0 and return its ID.
111    pub fn subscribe(&mut self, pattern: Option<String>) -> u64 {
112        let id = self.next_id;
113        self.next_id += 1;
114        self.subscriptions.push(Subscription::new(id, pattern));
115        id
116    }
117
118    /// Create a subscription beginning at `cursor` and return its ID.
119    pub fn subscribe_at(&mut self, pattern: Option<String>, cursor: u64) -> u64 {
120        let id = self.next_id;
121        self.next_id += 1;
122        self.subscriptions
123            .push(Subscription::with_cursor(id, pattern, cursor));
124        id
125    }
126
127    /// Remove a subscription by ID. Returns `true` if it existed.
128    pub fn unsubscribe(&mut self, id: u64) -> bool {
129        let before = self.subscriptions.len();
130        self.subscriptions.retain(|s| s.id != id);
131        self.subscriptions.len() < before
132    }
133
134    /// Look up a subscription by ID, returning a mutable reference.
135    pub fn get_mut(&mut self, id: u64) -> Option<&mut Subscription> {
136        self.subscriptions.iter_mut().find(|s| s.id == id)
137    }
138
139    /// Return the number of active subscriptions.
140    pub fn len(&self) -> usize {
141        self.subscriptions.len()
142    }
143
144    /// Return `true` if there are no active subscriptions.
145    pub fn is_empty(&self) -> bool {
146        self.subscriptions.is_empty()
147    }
148}
149
150impl Default for SubscriptionManager {
151    fn default() -> Self {
152        Self::new()
153    }
154}
155
156/// Match a glob `pattern` against a UTF-8 `key`.
157///
158/// Supports `*` (zero or more characters) and `?` (exactly one character).
159/// Returns `false` for non-UTF-8 keys.
160fn glob_match(pattern: &str, key: &[u8]) -> bool {
161    let key_str = match std::str::from_utf8(key) {
162        Ok(s) => s,
163        Err(_) => return false,
164    };
165    glob_match_str(pattern.as_bytes(), key_str.as_bytes())
166}
167
/// Glob matcher over raw bytes.
///
/// * `*` matches any run of characters, including an empty one.
/// * `?` matches exactly one character, i.e. one whole UTF-8 encoded code
///   point of `text` (one byte for ASCII, up to four bytes otherwise).
/// * Every other pattern byte must equal the text byte at the same position.
///
/// Wildcards are always interpreted before literal comparison, so a `*` in
/// the pattern still acts as a wildcard when the text happens to contain a
/// literal `*` at that position.
///
/// Both slices are expected to hold valid UTF-8. For other input the function
/// still terminates and never panics, but "character" then means a byte plus
/// any continuation bytes (`0b10xx_xxxx`) that directly follow it.
///
/// The match is iterative: only the most recent `*` is remembered, and on a
/// mismatch that `*` is widened by one character and matching resumes right
/// after it.
fn glob_match_str(pattern: &[u8], text: &[u8]) -> bool {
    // Index just past the character that starts at `start`: the byte itself
    // plus every UTF-8 continuation byte that immediately follows it.
    fn char_end(text: &[u8], start: usize) -> usize {
        let mut end = start + 1;
        while end < text.len() && (text[end] & 0xC0) == 0x80 {
            end += 1;
        }
        end
    }

    let mut pi = 0;
    let mut ti = 0;
    // For the most recent `*`: (pattern index right after it, text index up
    // to which that `*` currently extends).
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pattern.get(pi) {
            // Wildcards come first so a literal `*` / `?` in the text cannot
            // swallow the wildcard meaning of the pattern byte.
            Some(b'*') => {
                backtrack = Some((pi + 1, ti));
                pi += 1;
            }
            Some(b'?') => {
                pi += 1;
                ti = char_end(text, ti);
            }
            Some(&literal) if literal == text[ti] => {
                pi += 1;
                ti += 1;
            }
            // Mismatch or pattern exhausted: let the last `*` absorb one more
            // character and retry, or fail if there was no `*`.
            _ => match backtrack {
                Some((resume_pi, star_ti)) => {
                    let widened = char_end(text, star_ti);
                    backtrack = Some((resume_pi, widened));
                    pi = resume_pi;
                    ti = widened;
                }
                None => return false,
            },
        }
    }

    // Text is consumed; whatever is left of the pattern may only be `*`s.
    pattern[pi..].iter().all(|&byte| byte == b'*')
}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ring::CdcOp;

    /// Build a ring via `CdcRing::new(100)` and push `n` `Set` events keyed
    /// `user:<i>` with value `val<i>`, passing `i` as the final `push` argument.
    fn make_ring_with_events(n: usize) -> CdcRing {
        let mut ring = CdcRing::new(100);
        for idx in 0..n {
            let key = format!("user:{}", idx).into_bytes();
            let value = format!("val{}", idx).into_bytes();
            ring.push(CdcOp::Set, key, Some(value), idx as u64);
        }
        ring
    }

    /// Polling without a pattern returns batches in order and tracks the cursor.
    #[test]
    fn test_subscription_poll() {
        let ring = make_ring_with_events(5);
        let mut sub = Subscription::new(1, None);

        let first = sub.poll(&ring, 3);
        assert_eq!(first.events.len(), 3);
        assert_eq!(sub.cursor(), 3);

        let second = sub.poll(&ring, 100);
        assert_eq!(second.events.len(), 2);
        assert_eq!(sub.cursor(), 5);

        // The ring is drained, so a further poll yields nothing.
        let third = sub.poll(&ring, 100);
        assert!(third.events.is_empty());
    }

    /// Only events whose key matches the glob are returned.
    #[test]
    fn test_subscription_with_pattern() {
        let keys: [&[u8]; 4] = [b"user:1", b"order:1", b"user:2", b"order:2"];
        let mut ring = CdcRing::new(100);
        for (stamp, key) in (1u64..).zip(keys.iter()) {
            ring.push(CdcOp::Set, key.to_vec(), None, stamp);
        }

        let mut sub = Subscription::new(1, Some("user:*".into()));
        let batch = sub.poll(&ring, 100);
        assert_eq!(batch.events.len(), 2);
        assert_eq!(batch.events[0].key, b"user:1");
        assert_eq!(batch.events[1].key, b"user:2");
    }

    /// After `seek`, polling resumes from the requested sequence.
    #[test]
    fn test_subscription_seek() {
        let ring = make_ring_with_events(10);
        let mut sub = Subscription::new(1, None);
        sub.seek(7);

        let batch = sub.poll(&ring, 100);
        assert_eq!(batch.events.len(), 3);
        assert_eq!(batch.events[0].seq, 7);
    }

    /// Subscriptions can be added and removed, and removal is reported once.
    #[test]
    fn test_manager_subscribe_unsubscribe() {
        let mut manager = SubscriptionManager::new();
        let unfiltered = manager.subscribe(None);
        let filtered = manager.subscribe(Some("user:*".into()));
        assert_eq!(manager.len(), 2);

        assert!(manager.unsubscribe(unfiltered));
        assert_eq!(manager.len(), 1);

        // A second removal of the same ID finds nothing.
        assert!(!manager.unsubscribe(unfiltered));

        assert!(manager.unsubscribe(filtered));
        assert!(manager.is_empty());
    }

    /// `subscribe_at` stores the requested starting cursor.
    #[test]
    fn test_manager_subscribe_at() {
        let mut manager = SubscriptionManager::new();
        let id = manager.subscribe_at(None, 42);
        let cursor = manager.get_mut(id).map(|sub| sub.cursor());
        assert_eq!(cursor, Some(42));
    }

    /// Table of glob patterns, keys, and the expected match result.
    #[test]
    fn test_glob_match_patterns() {
        let cases: [(&str, &[u8], bool); 8] = [
            ("*", b"anything", true),
            ("user:*", b"user:123", true),
            ("user:*", b"order:123", false),
            ("user:?", b"user:1", true),
            ("user:?", b"user:12", false),
            ("*:*", b"foo:bar", true),
            ("", b"", true),
            ("", b"x", false),
        ];
        for (pattern, key, expected) in cases.iter() {
            assert_eq!(
                glob_match(pattern, key),
                *expected,
                "pattern {:?} against key {:?}",
                pattern,
                key
            );
        }
    }
}