1use std::collections::HashMap;
7
/// A recorded position within one partition of a stream.
///
/// Two checkpoints compare equal only when every field, including the
/// metadata map, is equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Checkpoint {
    /// Identifier of the stream the checkpoint belongs to.
    pub stream_id: String,
    /// Partition number within the stream.
    pub partition: u32,
    /// Offset recorded by this checkpoint (signed; negative values are not rejected).
    pub offset: i64,
    /// Caller-supplied timestamp. The unit is not interpreted by this type.
    pub timestamp: i64,
    /// Free-form key/value annotations attached to the checkpoint.
    pub metadata: HashMap<String, String>,
}

impl Checkpoint {
    /// Creates a checkpoint for `stream_id`/`partition` at `offset` and
    /// `timestamp`, starting with no metadata.
    pub fn new(stream_id: impl Into<String>, partition: u32, offset: i64, timestamp: i64) -> Self {
        let stream_id: String = stream_id.into();
        let metadata = HashMap::new();
        Self {
            stream_id,
            partition,
            offset,
            timestamp,
            metadata,
        }
    }

    /// Builder-style helper: stores `value` under `key` in the metadata map
    /// (replacing any previous value for that key) and hands the checkpoint
    /// back so calls can be chained.
    pub fn with_meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let pair: (String, String) = (key.into(), value.into());
        // Extending with a single pair has the same overwrite semantics as `insert`.
        self.metadata.extend(std::iter::once(pair));
        self
    }
}
45
46#[derive(Debug)]
52pub struct CheckpointStore {
53 checkpoints: HashMap<(String, u32), Checkpoint>,
55 history: Vec<Checkpoint>,
57 max_history: usize,
59}
60
61impl CheckpointStore {
62 pub fn new(max_history: usize) -> Self {
67 Self {
68 checkpoints: HashMap::new(),
69 history: Vec::new(),
70 max_history,
71 }
72 }
73
74 pub fn commit(&mut self, checkpoint: Checkpoint) {
79 let key = (checkpoint.stream_id.clone(), checkpoint.partition);
80 self.checkpoints.insert(key, checkpoint.clone());
81
82 self.history.push(checkpoint);
83 if self.history.len() > self.max_history {
85 let excess = self.history.len() - self.max_history;
86 self.history.drain(0..excess);
87 }
88 }
89
90 pub fn get(&self, stream_id: &str, partition: u32) -> Option<&Checkpoint> {
92 self.checkpoints.get(&(stream_id.to_owned(), partition))
93 }
94
95 pub fn latest_offset(&self, stream_id: &str, partition: u32) -> Option<i64> {
97 self.get(stream_id, partition).map(|c| c.offset)
98 }
99
100 pub fn reset(&mut self, stream_id: &str, partition: u32) {
105 self.checkpoints.remove(&(stream_id.to_owned(), partition));
106 }
107
108 pub fn reset_to(&mut self, stream_id: &str, partition: u32, offset: i64) -> bool {
113 let key = (stream_id.to_owned(), partition);
114 if let Some(cp) = self.checkpoints.get_mut(&key) {
115 cp.offset = offset;
116 true
117 } else {
118 false
119 }
120 }
121
122 pub fn all_streams(&self) -> Vec<&str> {
124 let mut seen: Vec<&str> = Vec::new();
125 for (stream_id, _partition) in self.checkpoints.keys() {
126 let s: &str = stream_id.as_str();
127 if !seen.contains(&s) {
128 seen.push(s);
129 }
130 }
131 seen
132 }
133
134 pub fn partitions(&self, stream_id: &str) -> Vec<u32> {
136 self.checkpoints
137 .keys()
138 .filter(|(s, _)| s == stream_id)
139 .map(|(_, p)| *p)
140 .collect()
141 }
142
143 pub fn history(&self, stream_id: &str, partition: u32) -> Vec<&Checkpoint> {
145 self.history
146 .iter()
147 .filter(|c| c.stream_id == stream_id && c.partition == partition)
148 .collect()
149 }
150
151 pub fn total_committed(&self) -> usize {
153 self.history.len()
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a checkpoint without metadata.
    fn checkpoint(stream: &str, partition: u32, offset: i64, ts: i64) -> Checkpoint {
        Checkpoint::new(stream, partition, offset, ts)
    }

    /// Builds a store bounded to `max_history` and commits `entries`, given
    /// as `(stream, partition, offset, timestamp)`, in order.
    fn store_with(max_history: usize, entries: &[(&str, u32, i64, i64)]) -> CheckpointStore {
        let mut store = CheckpointStore::new(max_history);
        for &(stream, partition, offset, ts) in entries {
            store.commit(checkpoint(stream, partition, offset, ts));
        }
        store
    }

    // ---- construction and commit ----

    #[test]
    fn test_new_store_is_empty() {
        let fresh = CheckpointStore::new(100);
        assert_eq!(fresh.total_committed(), 0);
        assert!(fresh.all_streams().is_empty());
    }

    #[test]
    fn test_commit_single() {
        let store = store_with(10, &[("topic-a", 0, 42, 1000)]);
        assert_eq!(store.latest_offset("topic-a", 0), Some(42));
        assert_eq!(store.total_committed(), 1);
    }

    #[test]
    fn test_commit_updates_latest() {
        let store = store_with(10, &[("topic-a", 0, 10, 1000), ("topic-a", 0, 20, 2000)]);
        assert_eq!(store.latest_offset("topic-a", 0), Some(20));
    }

    #[test]
    fn test_commit_multiple_partitions() {
        let store = store_with(10, &[("t", 0, 5, 100), ("t", 1, 15, 200), ("t", 2, 25, 300)]);

        let expected = [(0_u32, 5_i64), (1, 15), (2, 25)];
        for &(partition, offset) in expected.iter() {
            assert_eq!(store.latest_offset("t", partition), Some(offset));
        }
    }

    #[test]
    fn test_get_none_for_unknown() {
        let empty = CheckpointStore::new(10);
        assert!(empty.get("missing", 0).is_none());
        assert!(empty.latest_offset("missing", 0).is_none());
    }

    // ---- history ----

    #[test]
    fn test_history_is_ordered() {
        let mut store = CheckpointStore::new(50);
        (0..5_i64).for_each(|i| store.commit(checkpoint("events", 0, i, i * 100)));

        let log = store.history("events", 0);
        assert_eq!(log.len(), 5);
        assert_eq!(log[0].offset, 0);
        assert_eq!(log[4].offset, 4);
    }

    #[test]
    fn test_history_bounded_by_max() {
        let limit = 5_usize;
        let mut store = CheckpointStore::new(limit);
        (0..10_i64).for_each(|i| store.commit(checkpoint("stream", 0, i, i)));
        assert_eq!(store.total_committed(), limit);
    }

    #[test]
    fn test_history_only_for_matching_partition() {
        let store = store_with(50, &[("s", 0, 1, 1), ("s", 1, 2, 2), ("s", 0, 3, 3)]);

        assert_eq!(store.history("s", 0).len(), 2);
        assert_eq!(store.history("s", 1).len(), 1);
    }

    #[test]
    fn test_history_empty_for_unknown() {
        let empty = CheckpointStore::new(10);
        assert!(empty.history("none", 0).is_empty());
    }

    // ---- reset ----

    #[test]
    fn test_reset_removes_checkpoint() {
        let mut store = store_with(10, &[("r", 0, 99, 999)]);
        store.reset("r", 0);
        assert!(store.get("r", 0).is_none());
    }

    #[test]
    fn test_reset_preserves_history() {
        let mut store = store_with(10, &[("r", 0, 10, 1)]);
        store.reset("r", 0);
        assert_eq!(store.history("r", 0).len(), 1);
    }

    #[test]
    fn test_reset_nonexistent_is_noop() {
        let mut store = CheckpointStore::new(10);
        store.reset("phantom", 99);
        assert_eq!(store.total_committed(), 0);
    }

    // ---- reset_to ----

    #[test]
    fn test_reset_to_existing() {
        let mut store = store_with(10, &[("x", 0, 50, 100)]);
        assert!(store.reset_to("x", 0, 30));
        assert_eq!(store.latest_offset("x", 0), Some(30));
    }

    #[test]
    fn test_reset_to_nonexistent_returns_false() {
        let mut store = CheckpointStore::new(10);
        let updated = store.reset_to("none", 0, 10);
        assert_eq!(updated, false);
    }

    #[test]
    fn test_reset_to_negative_offset() {
        let mut store = store_with(10, &[("neg", 0, 100, 1)]);
        assert!(store.reset_to("neg", 0, -1));
        assert_eq!(store.latest_offset("neg", 0), Some(-1));
    }

    // ---- stream and partition listings ----

    #[test]
    fn test_all_streams_unique() {
        let store = store_with(20, &[("alpha", 0, 1, 1), ("alpha", 1, 2, 2), ("beta", 0, 3, 3)]);

        let mut names = store.all_streams();
        names.sort_unstable();
        assert_eq!(names, vec!["alpha", "beta"]);
    }

    #[test]
    fn test_partitions_for_stream() {
        let store = store_with(20, &[("p", 0, 1, 1), ("p", 2, 2, 2), ("p", 5, 3, 3)]);

        let mut ids = store.partitions("p");
        ids.sort_unstable();
        assert_eq!(ids, vec![0, 2, 5]);
    }

    #[test]
    fn test_partitions_empty_for_unknown_stream() {
        let empty = CheckpointStore::new(10);
        assert!(empty.partitions("unknown").is_empty());
    }

    // ---- metadata ----

    #[test]
    fn test_checkpoint_metadata() {
        let tagged = Checkpoint::new("stream", 0, 42, 1000)
            .with_meta("consumer_group", "grp1")
            .with_meta("host", "worker-01");

        let lookup = |key: &str| tagged.metadata.get(key).map(String::as_str);
        assert_eq!(lookup("consumer_group"), Some("grp1"));
        assert_eq!(lookup("host"), Some("worker-01"));
    }

    #[test]
    fn test_metadata_stored_in_checkpoint() {
        let mut store = CheckpointStore::new(10);
        store.commit(Checkpoint::new("s", 0, 1, 100).with_meta("key", "val"));

        let stored = store.get("s", 0).expect("checkpoint should exist");
        assert_eq!(stored.metadata.get("key").map(String::as_str), Some("val"));
    }

    // ---- stream isolation ----

    #[test]
    fn test_multiple_streams_independent() {
        let store = store_with(
            50,
            &[
                ("stream-1", 0, 100, 1000),
                ("stream-2", 0, 200, 2000),
                ("stream-1", 0, 150, 3000),
            ],
        );

        assert_eq!(store.latest_offset("stream-1", 0), Some(150));
        assert_eq!(store.latest_offset("stream-2", 0), Some(200));
    }

    #[test]
    fn test_streams_do_not_share_history() {
        let store = store_with(50, &[("a", 0, 1, 1), ("b", 0, 2, 2)]);

        for name in ["a", "b"].iter() {
            let log = store.history(name, 0);
            assert_eq!(log.len(), 1);
            assert_eq!(log[0].stream_id, *name);
        }
    }

    // ---- edge cases ----

    #[test]
    fn test_zero_max_history() {
        let store = store_with(0, &[("z", 0, 1, 1)]);
        assert_eq!(store.latest_offset("z", 0), Some(1));
        assert_eq!(store.total_committed(), 0);
    }

    #[test]
    fn test_large_offset() {
        let store = store_with(10, &[("huge", 0, i64::MAX, i64::MAX)]);
        assert_eq!(store.latest_offset("huge", 0), Some(i64::MAX));
    }

    #[test]
    fn test_checkpoint_equality() {
        let left = checkpoint("s", 0, 10, 100);
        let right = checkpoint("s", 0, 10, 100);
        assert_eq!(left, right);
    }

    #[test]
    fn test_history_across_partitions_in_same_store() {
        let mut store = CheckpointStore::new(50);
        for partition in 0..3_u32 {
            for offset in 0..3_i64 {
                let ts = i64::from(partition) * 10 + offset;
                store.commit(checkpoint("multi", partition, offset, ts));
            }
        }

        assert_eq!(store.total_committed(), 9);
        for partition in 0..3_u32 {
            assert_eq!(store.history("multi", partition).len(), 3);
        }
    }

    #[test]
    fn test_many_commits_bounded_history() {
        let mut store = CheckpointStore::new(20);
        (0..100_i64).for_each(|i| store.commit(checkpoint("bounded", 0, i, i)));

        assert!(store.total_committed() <= 20);
        assert_eq!(store.latest_offset("bounded", 0), Some(99));
    }

    #[test]
    fn test_all_streams_after_reset() {
        let mut store = store_with(20, &[("a", 0, 1, 1), ("b", 0, 2, 2)]);
        store.reset("a", 0);

        let names = store.all_streams();
        assert!(names.iter().all(|s| *s != "a"));
        assert!(names.iter().any(|s| *s == "b"));
    }

    #[test]
    fn test_partitions_after_reset() {
        let mut store = store_with(20, &[("s", 0, 1, 1), ("s", 1, 2, 2)]);
        store.reset("s", 0);

        let ids = store.partitions("s");
        assert!(ids.iter().all(|p| *p != 0));
        assert!(ids.iter().any(|p| *p == 1));
    }

    #[test]
    fn test_checkpoint_new_and_clone() {
        let original = Checkpoint::new("s", 3, 77, 999);
        let copy = original.clone();
        assert_eq!(original, copy);
    }

    #[test]
    fn test_commit_same_offset_twice() {
        let store = store_with(10, &[("dup", 0, 5, 1), ("dup", 0, 5, 2)]);
        assert_eq!(store.latest_offset("dup", 0), Some(5));
        assert_eq!(store.total_committed(), 2);
    }

    #[test]
    fn test_reset_to_zero() {
        let mut store = store_with(10, &[("zero", 0, 50, 1)]);
        let updated = store.reset_to("zero", 0, 0);
        assert!(updated);
        assert_eq!(store.latest_offset("zero", 0), Some(0));
    }
}