Skip to main content

oxirs_stream/
stream_checkpoint.rs

1//! Stream checkpoint/offset tracking for at-least-once delivery.
2//!
3//! Provides `CheckpointStore` which commits consumer offsets per (stream_id, partition)
4//! pair and keeps a bounded history for replay and auditing.
5
6use std::collections::HashMap;
7
8// ──────────────────────────────────────────────────────────────────────────────
9// Types
10// ──────────────────────────────────────────────────────────────────────────────
11
/// One committed consumer position for a single partition of a stream.
///
/// Equality is field-wise: two checkpoints are equal only when every field,
/// including the metadata map, matches.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Checkpoint {
    /// Name of the logical stream this checkpoint belongs to.
    pub stream_id: String,
    /// Index of the partition inside that stream.
    pub partition: u32,
    /// Offset up to which records have been consumed (inclusive).
    pub offset: i64,
    /// Wall-clock commit time, in milliseconds since the Unix epoch.
    pub timestamp: i64,
    /// Free-form key/value annotations (e.g. consumer group, host, version).
    pub metadata: HashMap<String, String>,
}
26
27impl Checkpoint {
28    /// Create a minimal checkpoint with no metadata.
29    pub fn new(stream_id: impl Into<String>, partition: u32, offset: i64, timestamp: i64) -> Self {
30        Self {
31            stream_id: stream_id.into(),
32            partition,
33            offset,
34            timestamp,
35            metadata: HashMap::new(),
36        }
37    }
38
39    /// Builder helper: attach a single metadata entry.
40    pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
41        self.metadata.insert(key.into(), value.into());
42        self
43    }
44}
45
46/// Persistent checkpoint store with bounded history per (stream, partition) key.
47///
48/// The store always keeps the *latest* checkpoint as the authoritative offset
49/// and additionally retains up to `max_history` older entries for auditing or
50/// replay purposes.
51#[derive(Debug)]
52pub struct CheckpointStore {
53    /// Latest checkpoint per (stream_id, partition).
54    checkpoints: HashMap<(String, u32), Checkpoint>,
55    /// Ordered commit history (newest last).
56    history: Vec<Checkpoint>,
57    /// Maximum entries kept in `history`.
58    max_history: usize,
59}
60
61impl CheckpointStore {
62    /// Create a new store.
63    ///
64    /// # Parameters
65    /// - `max_history`: maximum number of historical checkpoints retained.
66    pub fn new(max_history: usize) -> Self {
67        Self {
68            checkpoints: HashMap::new(),
69            history: Vec::new(),
70            max_history,
71        }
72    }
73
74    /// Commit a checkpoint.
75    ///
76    /// Updates the latest offset for the (stream, partition) key and appends
77    /// the entry to the rolling history buffer.
78    pub fn commit(&mut self, checkpoint: Checkpoint) {
79        let key = (checkpoint.stream_id.clone(), checkpoint.partition);
80        self.checkpoints.insert(key, checkpoint.clone());
81
82        self.history.push(checkpoint);
83        // Trim history to the configured limit.
84        if self.history.len() > self.max_history {
85            let excess = self.history.len() - self.max_history;
86            self.history.drain(0..excess);
87        }
88    }
89
90    /// Return a reference to the latest checkpoint for a (stream, partition) pair.
91    pub fn get(&self, stream_id: &str, partition: u32) -> Option<&Checkpoint> {
92        self.checkpoints.get(&(stream_id.to_owned(), partition))
93    }
94
95    /// Return the latest committed offset, if any.
96    pub fn latest_offset(&self, stream_id: &str, partition: u32) -> Option<i64> {
97        self.get(stream_id, partition).map(|c| c.offset)
98    }
99
100    /// Remove the checkpoint for the given (stream, partition) key.
101    ///
102    /// Historical entries for this key are *not* removed so that auditing
103    /// still works.
104    pub fn reset(&mut self, stream_id: &str, partition: u32) {
105        self.checkpoints.remove(&(stream_id.to_owned(), partition));
106    }
107
108    /// Roll back the checkpoint to a specific offset.
109    ///
110    /// Returns `true` if a checkpoint existed and was updated, `false` otherwise.
111    /// Historical entries are preserved unchanged.
112    pub fn reset_to(&mut self, stream_id: &str, partition: u32, offset: i64) -> bool {
113        let key = (stream_id.to_owned(), partition);
114        if let Some(cp) = self.checkpoints.get_mut(&key) {
115            cp.offset = offset;
116            true
117        } else {
118            false
119        }
120    }
121
122    /// Return all stream IDs that have at least one active checkpoint.
123    pub fn all_streams(&self) -> Vec<&str> {
124        let mut seen: Vec<&str> = Vec::new();
125        for (stream_id, _partition) in self.checkpoints.keys() {
126            let s: &str = stream_id.as_str();
127            if !seen.contains(&s) {
128                seen.push(s);
129            }
130        }
131        seen
132    }
133
134    /// Return all active partitions for a given stream.
135    pub fn partitions(&self, stream_id: &str) -> Vec<u32> {
136        self.checkpoints
137            .keys()
138            .filter(|(s, _)| s == stream_id)
139            .map(|(_, p)| *p)
140            .collect()
141    }
142
143    /// Return historical checkpoints for a (stream, partition) pair, oldest first.
144    pub fn history(&self, stream_id: &str, partition: u32) -> Vec<&Checkpoint> {
145        self.history
146            .iter()
147            .filter(|c| c.stream_id == stream_id && c.partition == partition)
148            .collect()
149    }
150
151    /// Return the total number of commits across all partitions and streams.
152    pub fn total_committed(&self) -> usize {
153        self.history.len()
154    }
155}
156
157// ──────────────────────────────────────────────────────────────────────────────
158// Tests
159// ──────────────────────────────────────────────────────────────────────────────
160
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a checkpoint that carries no metadata.
    fn checkpoint_at(stream: &str, partition: u32, offset: i64, ts: i64) -> Checkpoint {
        Checkpoint::new(stream, partition, offset, ts)
    }

    // ---- Construction and commit -------------------------------------------

    /// A freshly created store has no history and no active streams.
    #[test]
    fn test_new_store_is_empty() {
        let fresh = CheckpointStore::new(100);
        assert!(fresh.all_streams().is_empty());
        assert_eq!(fresh.total_committed(), 0);
    }

    /// One commit is visible both as latest offset and in the history count.
    #[test]
    fn test_commit_single() {
        let mut store = CheckpointStore::new(10);
        store.commit(checkpoint_at("topic-a", 0, 42, 1000));
        assert_eq!(store.total_committed(), 1);
        assert_eq!(store.latest_offset("topic-a", 0), Some(42));
    }

    /// A later commit for the same key replaces the latest offset.
    #[test]
    fn test_commit_updates_latest() {
        let mut store = CheckpointStore::new(10);
        for &(offset, ts) in &[(10_i64, 1000_i64), (20, 2000)] {
            store.commit(checkpoint_at("topic-a", 0, offset, ts));
        }
        assert_eq!(store.latest_offset("topic-a", 0), Some(20));
    }

    /// Partitions of one stream track their offsets independently.
    #[test]
    fn test_commit_multiple_partitions() {
        let commits = [(0_u32, 5_i64, 100_i64), (1, 15, 200), (2, 25, 300)];
        let mut store = CheckpointStore::new(10);
        for &(partition, offset, ts) in &commits {
            store.commit(checkpoint_at("t", partition, offset, ts));
        }
        for &(partition, offset, _) in &commits {
            assert_eq!(store.latest_offset("t", partition), Some(offset));
        }
    }

    /// Lookups for a key that was never committed yield nothing.
    #[test]
    fn test_get_none_for_unknown() {
        let store = CheckpointStore::new(10);
        assert_eq!(store.get("missing", 0), None);
        assert_eq!(store.latest_offset("missing", 0), None);
    }

    // ---- History -----------------------------------------------------------

    /// History is returned oldest first.
    #[test]
    fn test_history_is_ordered() {
        let mut store = CheckpointStore::new(50);
        (0..5_i64).for_each(|n| store.commit(checkpoint_at("events", 0, n, n * 100)));

        let log = store.history("events", 0);
        assert_eq!(log.len(), 5);
        assert_eq!(log.first().map(|c| c.offset), Some(0));
        assert_eq!(log.last().map(|c| c.offset), Some(4));
    }

    /// `total_committed` reports the history length, which is capped.
    #[test]
    fn test_history_bounded_by_max() {
        let cap = 5_usize;
        let mut store = CheckpointStore::new(cap);
        for n in 0..10_i64 {
            store.commit(checkpoint_at("stream", 0, n, n));
        }
        assert_eq!(store.total_committed(), cap);
    }

    /// History queries only return entries of the requested partition.
    #[test]
    fn test_history_only_for_matching_partition() {
        let mut store = CheckpointStore::new(50);
        for &(partition, n) in &[(0_u32, 1_i64), (1, 2), (0, 3)] {
            store.commit(checkpoint_at("s", partition, n, n));
        }
        assert_eq!(store.history("s", 0).len(), 2);
        assert_eq!(store.history("s", 1).len(), 1);
    }

    /// Unknown keys have an empty history.
    #[test]
    fn test_history_empty_for_unknown() {
        let store = CheckpointStore::new(10);
        let log = store.history("none", 0);
        assert_eq!(log.len(), 0);
    }

    // ---- Reset -------------------------------------------------------------

    /// `reset` drops the active checkpoint.
    #[test]
    fn test_reset_removes_checkpoint() {
        let mut store = CheckpointStore::new(10);
        store.commit(checkpoint_at("r", 0, 99, 999));
        store.reset("r", 0);
        assert_eq!(store.get("r", 0), None);
    }

    /// `reset` leaves the already recorded history untouched.
    #[test]
    fn test_reset_preserves_history() {
        let mut store = CheckpointStore::new(10);
        store.commit(checkpoint_at("r", 0, 10, 1));
        store.reset("r", 0);
        let remaining = store.history("r", 0);
        assert_eq!(remaining.len(), 1);
    }

    /// Resetting a key that does not exist must neither panic nor record anything.
    #[test]
    fn test_reset_nonexistent_is_noop() {
        let mut store = CheckpointStore::new(10);
        store.reset("phantom", 99);
        assert_eq!(store.total_committed(), 0);
    }

    // ---- Reset-to ----------------------------------------------------------

    /// `reset_to` rewrites the offset of an existing checkpoint.
    #[test]
    fn test_reset_to_existing() {
        let mut store = CheckpointStore::new(10);
        store.commit(checkpoint_at("x", 0, 50, 100));
        assert!(store.reset_to("x", 0, 30));
        assert_eq!(store.latest_offset("x", 0), Some(30));
    }

    /// `reset_to` reports failure for a key without a checkpoint.
    #[test]
    fn test_reset_to_nonexistent_returns_false() {
        let mut store = CheckpointStore::new(10);
        let updated = store.reset_to("none", 0, 10);
        assert_eq!(updated, false);
    }

    /// Negative offsets are accepted by `reset_to`.
    #[test]
    fn test_reset_to_negative_offset() {
        let mut store = CheckpointStore::new(10);
        store.commit(checkpoint_at("neg", 0, 100, 1));
        assert!(store.reset_to("neg", 0, -1));
        assert_eq!(store.latest_offset("neg", 0), Some(-1));
    }

    // ---- Stream and partition listing --------------------------------------

    /// A stream with several partitions is listed once.
    #[test]
    fn test_all_streams_unique() {
        let mut store = CheckpointStore::new(20);
        for &(stream, partition, n) in &[("alpha", 0_u32, 1_i64), ("alpha", 1, 2), ("beta", 0, 3)] {
            store.commit(checkpoint_at(stream, partition, n, n));
        }

        let mut names = store.all_streams();
        names.sort_unstable();
        assert_eq!(names, ["alpha", "beta"]);
    }

    /// All committed partitions of a stream are reported.
    #[test]
    fn test_partitions_for_stream() {
        let mut store = CheckpointStore::new(20);
        for &(partition, n) in &[(0_u32, 1_i64), (2, 2), (5, 3)] {
            store.commit(checkpoint_at("p", partition, n, n));
        }

        let mut found = store.partitions("p");
        found.sort_unstable();
        assert_eq!(found, [0_u32, 2, 5]);
    }

    /// An unknown stream has no partitions.
    #[test]
    fn test_partitions_empty_for_unknown_stream() {
        let store = CheckpointStore::new(10);
        assert_eq!(store.partitions("unknown").len(), 0);
    }

    // ---- Metadata ----------------------------------------------------------

    /// `with_meta` can be chained to attach several entries.
    #[test]
    fn test_checkpoint_metadata() {
        let annotated = Checkpoint::new("stream", 0, 42, 1000)
            .with_meta("consumer_group", "grp1")
            .with_meta("host", "worker-01");
        let lookup = |key: &str| annotated.metadata.get(key).map(String::as_str);
        assert_eq!(lookup("consumer_group"), Some("grp1"));
        assert_eq!(lookup("host"), Some("worker-01"));
    }

    /// Metadata survives a round trip through the store.
    #[test]
    fn test_metadata_stored_in_checkpoint() {
        let mut store = CheckpointStore::new(10);
        store.commit(Checkpoint::new("s", 0, 1, 100).with_meta("key", "val"));
        let stored = store.get("s", 0).expect("checkpoint should exist");
        assert_eq!(stored.metadata.get("key").map(String::as_str), Some("val"));
    }

    // ---- Multi-stream isolation --------------------------------------------

    /// Commits to one stream never affect the latest offset of another.
    #[test]
    fn test_multiple_streams_independent() {
        let mut store = CheckpointStore::new(50);
        for &(stream, offset, ts) in &[
            ("stream-1", 100_i64, 1000_i64),
            ("stream-2", 200, 2000),
            ("stream-1", 150, 3000),
        ] {
            store.commit(checkpoint_at(stream, 0, offset, ts));
        }

        assert_eq!(store.latest_offset("stream-1", 0), Some(150));
        assert_eq!(store.latest_offset("stream-2", 0), Some(200));
    }

    /// History queries are filtered by stream.
    #[test]
    fn test_streams_do_not_share_history() {
        let mut store = CheckpointStore::new(50);
        store.commit(checkpoint_at("a", 0, 1, 1));
        store.commit(checkpoint_at("b", 0, 2, 2));

        for &stream in &["a", "b"] {
            let log = store.history(stream, 0);
            assert_eq!(log.len(), 1);
            assert_eq!(log[0].stream_id, stream);
        }
    }

    // ---- Edge cases --------------------------------------------------------

    /// With `max_history == 0` the checkpoint is committed but nothing is logged.
    #[test]
    fn test_zero_max_history() {
        let mut store = CheckpointStore::new(0);
        store.commit(checkpoint_at("z", 0, 1, 1));
        assert_eq!(store.total_committed(), 0);
        assert_eq!(store.latest_offset("z", 0), Some(1));
    }

    /// The largest representable offset is stored unchanged.
    #[test]
    fn test_large_offset() {
        let top = i64::MAX;
        let mut store = CheckpointStore::new(10);
        store.commit(checkpoint_at("huge", 0, top, top));
        assert_eq!(store.latest_offset("huge", 0), Some(top));
    }

    /// Checkpoints built from identical arguments compare equal.
    #[test]
    fn test_checkpoint_equality() {
        assert_eq!(checkpoint_at("s", 0, 10, 100), checkpoint_at("s", 0, 10, 100));
    }

    /// Interleaved partitions share one history but are queried separately.
    #[test]
    fn test_history_across_partitions_in_same_store() {
        let mut store = CheckpointStore::new(50);
        for partition in 0..3_u32 {
            for offset in 0..3_i64 {
                let ts = i64::from(partition) * 10 + offset;
                store.commit(checkpoint_at("multi", partition, offset, ts));
            }
        }
        // 3 partitions x 3 offsets = 9 commits in total.
        assert_eq!(store.total_committed(), 9);
        assert!((0..3_u32).all(|partition| store.history("multi", partition).len() == 3));
    }

    /// Many commits never grow the history past its limit.
    #[test]
    fn test_many_commits_bounded_history() {
        let mut store = CheckpointStore::new(20);
        (0..100_i64).for_each(|n| store.commit(checkpoint_at("bounded", 0, n, n)));
        assert!(store.total_committed() <= 20);
        // The latest offset is tracked independently of history trimming.
        assert_eq!(store.latest_offset("bounded", 0), Some(99));
    }

    /// A stream whose only checkpoint was reset is no longer listed.
    #[test]
    fn test_all_streams_after_reset() {
        let mut store = CheckpointStore::new(20);
        store.commit(checkpoint_at("a", 0, 1, 1));
        store.commit(checkpoint_at("b", 0, 2, 2));
        store.reset("a", 0);

        let names = store.all_streams();
        assert!(names.iter().all(|name| *name != "a"));
        assert!(names.iter().any(|name| *name == "b"));
    }

    /// A reset partition disappears from the partition listing.
    #[test]
    fn test_partitions_after_reset() {
        let mut store = CheckpointStore::new(20);
        store.commit(checkpoint_at("s", 0, 1, 1));
        store.commit(checkpoint_at("s", 1, 2, 2));
        store.reset("s", 0);

        let remaining = store.partitions("s");
        assert!(!remaining.iter().any(|&partition| partition == 0));
        assert!(remaining.iter().any(|&partition| partition == 1));
    }

    /// A clone compares equal to its source.
    #[test]
    fn test_checkpoint_new_and_clone() {
        let original = Checkpoint::new("s", 3, 77, 999);
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }

    /// Re-committing the same offset is recorded as a second history entry.
    #[test]
    fn test_commit_same_offset_twice() {
        let mut store = CheckpointStore::new(10);
        for ts in 1..=2_i64 {
            store.commit(checkpoint_at("dup", 0, 5, ts));
        }
        assert_eq!(store.total_committed(), 2);
        assert_eq!(store.latest_offset("dup", 0), Some(5));
    }

    /// Rolling back to offset zero is permitted.
    #[test]
    fn test_reset_to_zero() {
        let mut store = CheckpointStore::new(10);
        store.commit(checkpoint_at("zero", 0, 50, 1));
        let rolled_back = store.reset_to("zero", 0, 0);
        assert!(rolled_back);
        assert_eq!(store.latest_offset("zero", 0), Some(0));
    }
}