use serde::Serialize;

8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
10pub enum CheckpointMode {
11 Passive,
13 Full,
15 Restart,
17 Truncate,
19}
/// Snapshot of the WAL counters a checkpoint plan is computed from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointState {
    /// Number of frames currently in the WAL.
    pub total_frames: u32,
    /// Number of frames already backfilled.
    pub backfilled_frames: u32,
    /// Highest frame backfill may advance to while a reader is active;
    /// `None` means no reader is active and the whole WAL is eligible.
    pub oldest_reader_frame: Option<u32>,
}

impl CheckpointState {
    /// Returns a copy in which `backfilled_frames` and `oldest_reader_frame`
    /// are clamped so that neither exceeds `total_frames`.
    ///
    /// `total_frames` itself is kept as-is, and a `None` reader stays `None`.
    #[must_use]
    pub fn normalized(self) -> Self {
        let cap = self.total_frames;
        Self {
            total_frames: cap,
            backfilled_frames: self.backfilled_frames.min(cap),
            oldest_reader_frame: self.oldest_reader_frame.map(|frame| frame.min(cap)),
        }
    }

    /// Returns how many frames have not been backfilled yet.
    ///
    /// Saturates at zero when `backfilled_frames` exceeds `total_frames`, so
    /// it is safe to call on a state that has not been normalized.
    #[must_use]
    pub const fn remaining_frames(self) -> u32 {
        self.total_frames.saturating_sub(self.backfilled_frames)
    }
}
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct CheckpointPlan {
60 pub mode: CheckpointMode,
62 pub frames_to_backfill: u32,
64 pub progress: CheckpointProgress,
66 pub blocked_by_readers: bool,
68 pub post_action: CheckpointPostAction,
70}
71
72impl CheckpointPlan {
73 #[must_use]
75 pub const fn completes_checkpoint(self) -> bool {
76 matches!(self.progress, CheckpointProgress::Complete)
77 }
78
79 #[must_use]
81 pub const fn should_reset_wal(self) -> bool {
82 matches!(self.post_action, CheckpointPostAction::ResetWal)
83 }
84
85 #[must_use]
87 pub const fn should_truncate_wal(self) -> bool {
88 matches!(self.post_action, CheckpointPostAction::TruncateWal)
89 }
90}
/// Whether a planned checkpoint backfills every remaining frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointProgress {
    /// Some frames will still be un-backfilled after the plan runs.
    Partial,
    /// The plan backfills all remaining frames (possibly zero).
    Complete,
}
/// WAL maintenance step a checkpoint plan requests after backfilling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointPostAction {
    /// Leave the WAL as it is.
    None,
    /// Reset the WAL (requested only by restart-mode plans).
    ResetWal,
    /// Truncate the WAL (requested only by truncate-mode plans).
    TruncateWal,
}
107#[must_use]
109pub fn plan_checkpoint(mode: CheckpointMode, state: CheckpointState) -> CheckpointPlan {
110 let state = state.normalized();
111 let remaining_frames = state.remaining_frames();
112 let has_active_reader = state.oldest_reader_frame.is_some();
113 let reader_limit = state.oldest_reader_frame.unwrap_or(state.total_frames);
114 let reader_eligible = reader_limit.saturating_sub(state.backfilled_frames);
115
116 match mode {
117 CheckpointMode::Passive => {
118 let frames_to_backfill = reader_eligible.min(remaining_frames);
119 CheckpointPlan {
120 mode,
121 frames_to_backfill,
122 progress: completion_for(frames_to_backfill, remaining_frames),
123 blocked_by_readers: false,
124 post_action: CheckpointPostAction::None,
125 }
126 }
127 CheckpointMode::Full => {
128 let frames_to_backfill = reader_eligible.min(remaining_frames);
129 let progress = completion_for(frames_to_backfill, remaining_frames);
130 CheckpointPlan {
131 mode,
132 frames_to_backfill,
133 progress,
134 blocked_by_readers: matches!(progress, CheckpointProgress::Partial),
135 post_action: CheckpointPostAction::None,
136 }
137 }
138 CheckpointMode::Restart => {
139 let frames_to_backfill = reader_eligible.min(remaining_frames);
140 let progress = completion_for(frames_to_backfill, remaining_frames);
141 let post_action = if matches!(progress, CheckpointProgress::Complete)
142 && !has_active_reader
143 && state.total_frames > 0
144 {
145 CheckpointPostAction::ResetWal
146 } else {
147 CheckpointPostAction::None
148 };
149 CheckpointPlan {
150 mode,
151 frames_to_backfill,
152 progress,
153 blocked_by_readers: has_active_reader,
154 post_action,
155 }
156 }
157 CheckpointMode::Truncate => {
158 let frames_to_backfill = reader_eligible.min(remaining_frames);
159 let progress = completion_for(frames_to_backfill, remaining_frames);
160 let post_action = if matches!(progress, CheckpointProgress::Complete)
161 && !has_active_reader
162 && state.total_frames > 0
163 {
164 CheckpointPostAction::TruncateWal
165 } else {
166 CheckpointPostAction::None
167 };
168 CheckpointPlan {
169 mode,
170 frames_to_backfill,
171 progress,
172 blocked_by_readers: has_active_reader,
173 post_action,
174 }
175 }
176 }
177}
179#[must_use]
180const fn completion_for(frames_to_backfill: u32, remaining_frames: u32) -> CheckpointProgress {
181 if frames_to_backfill == remaining_frames {
182 CheckpointProgress::Complete
183 } else {
184 CheckpointProgress::Partial
185 }
186}
// Unit tests for the checkpoint planner: per-mode planning behavior,
// normalization of out-of-range counters, and the derived traits on the
// public types.
#[cfg(test)]
mod tests {
    use super::{CheckpointMode, CheckpointState, plan_checkpoint};

    // ---- Per-mode planning behavior ----

    #[test]
    fn test_passive_respects_reader_limit() {
        let plan = plan_checkpoint(
            CheckpointMode::Passive,
            CheckpointState {
                total_frames: 100,
                backfilled_frames: 40,
                oldest_reader_frame: Some(65),
            },
        );

        assert_eq!(plan.frames_to_backfill, 25);
        assert!(!plan.completes_checkpoint());
        assert!(!plan.blocked_by_readers);
        assert!(!plan.should_reset_wal());
        assert!(!plan.should_truncate_wal());
    }

    #[test]
    fn test_full_marks_blocked_when_reader_pins_tail() {
        let plan = plan_checkpoint(
            CheckpointMode::Full,
            CheckpointState {
                total_frames: 200,
                backfilled_frames: 120,
                oldest_reader_frame: Some(150),
            },
        );

        assert_eq!(plan.frames_to_backfill, 30);
        assert!(!plan.completes_checkpoint());
        assert!(plan.blocked_by_readers);
        assert!(!plan.should_reset_wal());
        assert!(!plan.should_truncate_wal());
    }

    #[test]
    fn test_full_completes_without_readers() {
        let plan = plan_checkpoint(
            CheckpointMode::Full,
            CheckpointState {
                total_frames: 75,
                backfilled_frames: 60,
                oldest_reader_frame: None,
            },
        );

        assert_eq!(plan.frames_to_backfill, 15);
        assert!(plan.completes_checkpoint());
        assert!(!plan.blocked_by_readers);
    }

    #[test]
    fn test_restart_requires_reader_drain_before_reset() {
        let plan = plan_checkpoint(
            CheckpointMode::Restart,
            CheckpointState {
                total_frames: 90,
                backfilled_frames: 90,
                oldest_reader_frame: Some(90),
            },
        );

        assert_eq!(plan.frames_to_backfill, 0);
        assert!(plan.completes_checkpoint());
        assert!(plan.blocked_by_readers);
        assert!(!plan.should_reset_wal());
    }

    #[test]
    fn test_restart_resets_when_complete_and_reader_free() {
        let plan = plan_checkpoint(
            CheckpointMode::Restart,
            CheckpointState {
                total_frames: 64,
                backfilled_frames: 48,
                oldest_reader_frame: None,
            },
        );

        assert_eq!(plan.frames_to_backfill, 16);
        assert!(plan.completes_checkpoint());
        assert!(!plan.blocked_by_readers);
        assert!(plan.should_reset_wal());
    }

    #[test]
    fn test_truncate_requires_reader_drain_before_truncate() {
        let plan = plan_checkpoint(
            CheckpointMode::Truncate,
            CheckpointState {
                total_frames: 40,
                backfilled_frames: 40,
                oldest_reader_frame: Some(40),
            },
        );

        assert_eq!(plan.frames_to_backfill, 0);
        assert!(plan.completes_checkpoint());
        assert!(plan.blocked_by_readers);
        assert!(!plan.should_truncate_wal());
    }

    #[test]
    fn test_truncate_requests_truncate_when_complete_and_reader_free() {
        let plan = plan_checkpoint(
            CheckpointMode::Truncate,
            CheckpointState {
                total_frames: 10,
                backfilled_frames: 4,
                oldest_reader_frame: None,
            },
        );

        assert_eq!(plan.frames_to_backfill, 6);
        assert!(plan.completes_checkpoint());
        assert!(!plan.blocked_by_readers);
        assert!(plan.should_truncate_wal());
        assert!(!plan.should_reset_wal());
    }

    // ---- Edge cases: invalid counters, empty WAL, boundaries ----

    #[test]
    fn test_normalization_clamps_invalid_counters() {
        let plan = plan_checkpoint(
            CheckpointMode::Passive,
            CheckpointState {
                total_frames: 5,
                backfilled_frames: 99,
                oldest_reader_frame: Some(77),
            },
        );

        assert_eq!(plan.frames_to_backfill, 0);
        assert!(plan.completes_checkpoint());
    }

    #[test]
    fn test_empty_wal_all_modes_are_noop() {
        let empty = CheckpointState {
            total_frames: 0,
            backfilled_frames: 0,
            oldest_reader_frame: None,
        };
        for mode in [
            CheckpointMode::Passive,
            CheckpointMode::Full,
            CheckpointMode::Restart,
            CheckpointMode::Truncate,
        ] {
            let plan = plan_checkpoint(mode, empty);
            assert_eq!(plan.frames_to_backfill, 0, "{mode:?} on empty WAL");
            assert!(plan.completes_checkpoint(), "{mode:?} on empty WAL");
            assert!(!plan.blocked_by_readers, "{mode:?} on empty WAL");
            assert!(
                !plan.should_reset_wal() && !plan.should_truncate_wal(),
                "{mode:?} on empty WAL should not request post-actions"
            );
        }
    }

    #[test]
    fn test_passive_no_readers_backfills_all() {
        let plan = plan_checkpoint(
            CheckpointMode::Passive,
            CheckpointState {
                total_frames: 50,
                backfilled_frames: 20,
                oldest_reader_frame: None,
            },
        );
        assert_eq!(plan.frames_to_backfill, 30);
        assert!(plan.completes_checkpoint());
        assert!(!plan.blocked_by_readers);
    }

    #[test]
    fn test_already_fully_backfilled_is_complete() {
        let plan = plan_checkpoint(
            CheckpointMode::Full,
            CheckpointState {
                total_frames: 100,
                backfilled_frames: 100,
                oldest_reader_frame: Some(80),
            },
        );
        assert_eq!(plan.frames_to_backfill, 0);
        assert!(plan.completes_checkpoint());
        assert!(!plan.blocked_by_readers);
    }

    #[test]
    fn test_reader_at_exact_backfill_boundary_yields_zero_work() {
        let plan = plan_checkpoint(
            CheckpointMode::Passive,
            CheckpointState {
                total_frames: 100,
                backfilled_frames: 60,
                oldest_reader_frame: Some(60),
            },
        );
        assert_eq!(plan.frames_to_backfill, 0);
        assert!(!plan.completes_checkpoint());
    }

    #[test]
    fn test_restart_on_fully_backfilled_with_reader_blocks_reset() {
        let plan = plan_checkpoint(
            CheckpointMode::Restart,
            CheckpointState {
                total_frames: 50,
                backfilled_frames: 50,
                oldest_reader_frame: Some(50),
            },
        );
        assert_eq!(plan.frames_to_backfill, 0);
        assert!(plan.completes_checkpoint());
        assert!(plan.blocked_by_readers);
        assert!(!plan.should_reset_wal());
    }

    #[test]
    fn test_truncate_on_fully_backfilled_no_readers_truncates() {
        let plan = plan_checkpoint(
            CheckpointMode::Truncate,
            CheckpointState {
                total_frames: 50,
                backfilled_frames: 50,
                oldest_reader_frame: None,
            },
        );
        assert_eq!(plan.frames_to_backfill, 0);
        assert!(plan.completes_checkpoint());
        assert!(!plan.blocked_by_readers);
        assert!(plan.should_truncate_wal());
        assert!(!plan.should_reset_wal());
    }

    // ---- CheckpointState helpers ----

    #[test]
    fn test_remaining_frames_saturates_at_zero() {
        let state = CheckpointState {
            total_frames: 10,
            backfilled_frames: 10,
            oldest_reader_frame: None,
        };
        assert_eq!(state.remaining_frames(), 0);
        let over = CheckpointState {
            total_frames: 5,
            backfilled_frames: 99,
            oldest_reader_frame: None,
        };
        assert_eq!(over.remaining_frames(), 0);
    }

    #[test]
    fn test_normalized_clamps_reader_to_total() {
        let state = CheckpointState {
            total_frames: 20,
            backfilled_frames: 30,
            oldest_reader_frame: Some(50),
        };
        let n = state.normalized();
        assert_eq!(n.backfilled_frames, 20);
        assert_eq!(n.oldest_reader_frame, Some(20));
    }

    #[test]
    fn test_full_reader_at_backfill_boundary_is_blocked() {
        let plan = plan_checkpoint(
            CheckpointMode::Full,
            CheckpointState {
                total_frames: 100,
                backfilled_frames: 60,
                oldest_reader_frame: Some(60),
            },
        );
        assert_eq!(plan.frames_to_backfill, 0);
        assert!(!plan.completes_checkpoint());
        assert!(plan.blocked_by_readers);
    }

    #[test]
    fn test_passive_never_reports_blocked() {
        for reader in [Some(10), Some(50), None] {
            let plan = plan_checkpoint(
                CheckpointMode::Passive,
                CheckpointState {
                    total_frames: 50,
                    backfilled_frames: 0,
                    oldest_reader_frame: reader,
                },
            );
            assert!(
                !plan.blocked_by_readers,
                "Passive must never report blocked (reader={reader:?})"
            );
        }
    }

    #[test]
    fn test_restart_no_post_action_on_empty_wal() {
        let plan = plan_checkpoint(
            CheckpointMode::Restart,
            CheckpointState {
                total_frames: 0,
                backfilled_frames: 0,
                oldest_reader_frame: None,
            },
        );
        assert!(plan.completes_checkpoint());
        assert!(!plan.should_reset_wal());
    }

    #[test]
    fn test_normalized_is_idempotent() {
        let state = CheckpointState {
            total_frames: 10,
            backfilled_frames: 50,
            oldest_reader_frame: Some(99),
        };
        let n1 = state.normalized();
        let n2 = n1.normalized();
        assert_eq!(n1, n2);
    }

    #[test]
    fn test_normalized_none_reader_passes_through() {
        let state = CheckpointState {
            total_frames: 30,
            backfilled_frames: 10,
            oldest_reader_frame: None,
        };
        let n = state.normalized();
        assert_eq!(n.total_frames, 30);
        assert_eq!(n.backfilled_frames, 10);
        assert!(n.oldest_reader_frame.is_none());
    }

    #[test]
    fn test_reset_and_truncate_are_mutually_exclusive() {
        for mode in [
            CheckpointMode::Passive,
            CheckpointMode::Full,
            CheckpointMode::Restart,
            CheckpointMode::Truncate,
        ] {
            for reader in [Some(50), None] {
                let plan = plan_checkpoint(
                    mode,
                    CheckpointState {
                        total_frames: 50,
                        backfilled_frames: 0,
                        oldest_reader_frame: reader,
                    },
                );
                assert!(
                    !(plan.should_reset_wal() && plan.should_truncate_wal()),
                    "{mode:?} reader={reader:?}: reset and truncate must be mutually exclusive"
                );
            }
        }
    }

    // ---- Derived traits on the public types ----

    #[test]
    fn test_checkpoint_mode_copy_and_eq() {
        let a = CheckpointMode::Restart;
        let b = a;
        assert_eq!(a, b);
        assert_ne!(CheckpointMode::Passive, CheckpointMode::Full);
        assert_ne!(CheckpointMode::Restart, CheckpointMode::Truncate);
    }

    #[test]
    fn test_checkpoint_mode_debug_and_serialize() {
        let dbg = format!("{:?}", CheckpointMode::Truncate);
        assert!(dbg.contains("Truncate"));
        let json = serde_json::to_string(&CheckpointMode::Passive).unwrap();
        assert_eq!(json, "\"Passive\"");
        let json_full = serde_json::to_string(&CheckpointMode::Full).unwrap();
        assert_eq!(json_full, "\"Full\"");
    }

    #[test]
    fn test_checkpoint_state_clone_copy_debug() {
        let state = CheckpointState {
            total_frames: 100,
            backfilled_frames: 50,
            oldest_reader_frame: Some(75),
        };
        let copied = state;
        let cloned = state;
        assert_eq!(copied, cloned);
        let dbg = format!("{state:?}");
        assert!(dbg.contains("CheckpointState"));
        assert!(dbg.contains("total_frames"));
        assert!(dbg.contains("100"));
    }

    #[test]
    fn test_checkpoint_plan_clone_debug() {
        use super::{CheckpointPostAction, CheckpointProgress};
        let plan = plan_checkpoint(
            CheckpointMode::Restart,
            CheckpointState {
                total_frames: 20,
                backfilled_frames: 20,
                oldest_reader_frame: None,
            },
        );
        let cloned = plan;
        assert_eq!(plan, cloned);
        let dbg = format!("{plan:?}");
        assert!(dbg.contains("CheckpointPlan"));
        assert!(dbg.contains("Restart"));
        assert_eq!(plan.progress, CheckpointProgress::Complete);
        assert_eq!(plan.post_action, CheckpointPostAction::ResetWal);
    }

    #[test]
    fn test_progress_and_post_action_variants_eq_debug() {
        use super::{CheckpointPostAction, CheckpointProgress};
        assert_ne!(CheckpointProgress::Partial, CheckpointProgress::Complete);
        assert_eq!(CheckpointProgress::Partial, CheckpointProgress::Partial);
        assert_ne!(CheckpointPostAction::None, CheckpointPostAction::ResetWal);
        assert_ne!(
            CheckpointPostAction::ResetWal,
            CheckpointPostAction::TruncateWal
        );
        let dbg_prog = format!("{:?}", CheckpointProgress::Complete);
        assert!(dbg_prog.contains("Complete"));
        let dbg_act = format!("{:?}", CheckpointPostAction::TruncateWal);
        assert!(dbg_act.contains("TruncateWal"));
    }
}