Skip to main content

fsqlite_wal/
checkpoint.rs

1//! WAL checkpoint planning primitives for PASSIVE/FULL/RESTART/TRUNCATE modes.
2//!
3//! This module models the mode semantics as deterministic pure functions so
4//! higher layers can execute checkpoint I/O while preserving mode behavior.
5
6use serde::Serialize;
7
8/// Checkpoint modes matching SQLite WAL checkpoint variants.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
10pub enum CheckpointMode {
11    /// Opportunistically backfill frames that do not require waiting.
12    Passive,
13    /// Attempt to backfill all frames, blocking completion if readers pin the tail.
14    Full,
15    /// Full checkpoint plus WAL reset when no readers remain.
16    Restart,
17    /// Restart checkpoint plus WAL truncation when no readers remain.
18    Truncate,
19}
20
/// Point-in-time view of the WAL counters that checkpoint planning reads.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointState {
    /// Index of the last valid frame in the WAL (SQLite's `mxFrame`).
    pub total_frames: u32,
    /// How many frames have already been backfilled (SQLite's `nBackfill`).
    pub backfilled_frames: u32,
    /// End-mark frame of the oldest active reader.
    ///
    /// `None` when no reader is active, i.e. nothing pins the WAL tail.
    pub oldest_reader_frame: Option<u32>,
}

impl CheckpointState {
    /// Return a copy whose counters are mutually consistent.
    ///
    /// `total_frames` is kept as is; `backfilled_frames` and the reader mark
    /// (when present) are capped so that neither exceeds it.
    #[must_use]
    pub fn normalized(self) -> Self {
        let cap = self.total_frames;
        Self {
            total_frames: cap,
            backfilled_frames: u32::min(self.backfilled_frames, cap),
            oldest_reader_frame: self.oldest_reader_frame.map(|mark| u32::min(mark, cap)),
        }
    }

    /// Count of frames that have not been backfilled yet.
    ///
    /// Saturates at zero when `backfilled_frames` exceeds `total_frames`.
    #[must_use]
    pub fn remaining_frames(self) -> u32 {
        let Self {
            total_frames,
            backfilled_frames,
            ..
        } = self;
        total_frames.saturating_sub(backfilled_frames)
    }
}
56
57/// Planned checkpoint actions for a single checkpoint decision.
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct CheckpointPlan {
60    /// Checkpoint mode used for this plan.
61    pub mode: CheckpointMode,
62    /// Number of additional frames to backfill immediately.
63    pub frames_to_backfill: u32,
64    /// Whether frame backfill completes at plan end.
65    pub progress: CheckpointProgress,
66    /// Whether active readers prevent mode completion behavior right now.
67    pub blocked_by_readers: bool,
68    /// Post-backfill action requested by the mode.
69    pub post_action: CheckpointPostAction,
70}
71
72impl CheckpointPlan {
73    /// Whether this plan fully completes frame backfill.
74    #[must_use]
75    pub const fn completes_checkpoint(self) -> bool {
76        matches!(self.progress, CheckpointProgress::Complete)
77    }
78
79    /// Whether this plan requests a WAL reset.
80    #[must_use]
81    pub const fn should_reset_wal(self) -> bool {
82        matches!(self.post_action, CheckpointPostAction::ResetWal)
83    }
84
85    /// Whether this plan requests WAL truncation.
86    #[must_use]
87    pub const fn should_truncate_wal(self) -> bool {
88        matches!(self.post_action, CheckpointPostAction::TruncateWal)
89    }
90}
91
/// How far backfill gets under a checkpoint plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointProgress {
    /// Frames remain pending after the planned backfill.
    Partial,
    /// The planned backfill covers every remaining frame.
    Complete,
}
98
/// Follow-up WAL operation a checkpoint plan asks for once backfill is done.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CheckpointPostAction {
    /// Nothing further is requested.
    None,
    /// Request a WAL reset.
    ResetWal,
    /// Request a WAL truncation.
    TruncateWal,
}
106
107/// Compute a deterministic checkpoint plan from mode and current state.
108#[must_use]
109pub fn plan_checkpoint(mode: CheckpointMode, state: CheckpointState) -> CheckpointPlan {
110    let state = state.normalized();
111    let remaining_frames = state.remaining_frames();
112    let has_active_reader = state.oldest_reader_frame.is_some();
113    let reader_limit = state.oldest_reader_frame.unwrap_or(state.total_frames);
114    let reader_eligible = reader_limit.saturating_sub(state.backfilled_frames);
115
116    match mode {
117        CheckpointMode::Passive => {
118            let frames_to_backfill = reader_eligible.min(remaining_frames);
119            CheckpointPlan {
120                mode,
121                frames_to_backfill,
122                progress: completion_for(frames_to_backfill, remaining_frames),
123                blocked_by_readers: false,
124                post_action: CheckpointPostAction::None,
125            }
126        }
127        CheckpointMode::Full => {
128            let frames_to_backfill = reader_eligible.min(remaining_frames);
129            let progress = completion_for(frames_to_backfill, remaining_frames);
130            CheckpointPlan {
131                mode,
132                frames_to_backfill,
133                progress,
134                blocked_by_readers: matches!(progress, CheckpointProgress::Partial),
135                post_action: CheckpointPostAction::None,
136            }
137        }
138        CheckpointMode::Restart => {
139            let frames_to_backfill = reader_eligible.min(remaining_frames);
140            let progress = completion_for(frames_to_backfill, remaining_frames);
141            let post_action = if matches!(progress, CheckpointProgress::Complete)
142                && !has_active_reader
143                && state.total_frames > 0
144            {
145                CheckpointPostAction::ResetWal
146            } else {
147                CheckpointPostAction::None
148            };
149            CheckpointPlan {
150                mode,
151                frames_to_backfill,
152                progress,
153                blocked_by_readers: has_active_reader,
154                post_action,
155            }
156        }
157        CheckpointMode::Truncate => {
158            let frames_to_backfill = reader_eligible.min(remaining_frames);
159            let progress = completion_for(frames_to_backfill, remaining_frames);
160            let post_action = if matches!(progress, CheckpointProgress::Complete)
161                && !has_active_reader
162                && state.total_frames > 0
163            {
164                CheckpointPostAction::TruncateWal
165            } else {
166                CheckpointPostAction::None
167            };
168            CheckpointPlan {
169                mode,
170                frames_to_backfill,
171                progress,
172                blocked_by_readers: has_active_reader,
173                post_action,
174            }
175        }
176    }
177}
178
179#[must_use]
180const fn completion_for(frames_to_backfill: u32, remaining_frames: u32) -> CheckpointProgress {
181    if frames_to_backfill == remaining_frames {
182        CheckpointProgress::Complete
183    } else {
184        CheckpointProgress::Partial
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::{
        CheckpointMode, CheckpointPlan, CheckpointPostAction, CheckpointProgress, CheckpointState,
        plan_checkpoint,
    };

    /// Every checkpoint mode, for tests that sweep across all of them.
    const ALL_MODES: [CheckpointMode; 4] = [
        CheckpointMode::Passive,
        CheckpointMode::Full,
        CheckpointMode::Restart,
        CheckpointMode::Truncate,
    ];

    /// Build a raw (un-normalized) state from its three counters.
    fn wal(total: u32, backfilled: u32, reader: Option<u32>) -> CheckpointState {
        CheckpointState {
            total_frames: total,
            backfilled_frames: backfilled,
            oldest_reader_frame: reader,
        }
    }

    /// Plan `mode` against a state built from the three counters.
    fn plan(
        mode: CheckpointMode,
        total: u32,
        backfilled: u32,
        reader: Option<u32>,
    ) -> CheckpointPlan {
        plan_checkpoint(mode, wal(total, backfilled, reader))
    }

    // --- Mode behavior -----------------------------------------------------

    #[test]
    fn test_passive_respects_reader_limit() {
        let outcome = plan(CheckpointMode::Passive, 100, 40, Some(65));

        assert_eq!(outcome.frames_to_backfill, 25);
        assert!(!outcome.completes_checkpoint());
        assert!(!outcome.blocked_by_readers);
        assert!(!outcome.should_reset_wal());
        assert!(!outcome.should_truncate_wal());
    }

    #[test]
    fn test_full_marks_blocked_when_reader_pins_tail() {
        let outcome = plan(CheckpointMode::Full, 200, 120, Some(150));

        assert_eq!(outcome.frames_to_backfill, 30);
        assert!(!outcome.completes_checkpoint());
        assert!(outcome.blocked_by_readers);
        assert!(!outcome.should_reset_wal());
        assert!(!outcome.should_truncate_wal());
    }

    #[test]
    fn test_full_completes_without_readers() {
        let outcome = plan(CheckpointMode::Full, 75, 60, None);

        assert_eq!(outcome.frames_to_backfill, 15);
        assert!(outcome.completes_checkpoint());
        assert!(!outcome.blocked_by_readers);
    }

    #[test]
    fn test_restart_requires_reader_drain_before_reset() {
        let outcome = plan(CheckpointMode::Restart, 90, 90, Some(90));

        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(outcome.completes_checkpoint());
        assert!(outcome.blocked_by_readers);
        assert!(!outcome.should_reset_wal());
    }

    #[test]
    fn test_restart_resets_when_complete_and_reader_free() {
        let outcome = plan(CheckpointMode::Restart, 64, 48, None);

        assert_eq!(outcome.frames_to_backfill, 16);
        assert!(outcome.completes_checkpoint());
        assert!(!outcome.blocked_by_readers);
        assert!(outcome.should_reset_wal());
    }

    #[test]
    fn test_truncate_requires_reader_drain_before_truncate() {
        let outcome = plan(CheckpointMode::Truncate, 40, 40, Some(40));

        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(outcome.completes_checkpoint());
        assert!(outcome.blocked_by_readers);
        assert!(!outcome.should_truncate_wal());
    }

    #[test]
    fn test_truncate_requests_truncate_when_complete_and_reader_free() {
        let outcome = plan(CheckpointMode::Truncate, 10, 4, None);

        assert_eq!(outcome.frames_to_backfill, 6);
        assert!(outcome.completes_checkpoint());
        assert!(!outcome.blocked_by_readers);
        assert!(outcome.should_truncate_wal());
        assert!(!outcome.should_reset_wal());
    }

    #[test]
    fn test_normalization_clamps_invalid_counters() {
        let outcome = plan(CheckpointMode::Passive, 5, 99, Some(77));

        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(outcome.completes_checkpoint());
    }

    #[test]
    fn test_empty_wal_all_modes_are_noop() {
        let empty = wal(0, 0, None);
        for mode in ALL_MODES {
            let outcome = plan_checkpoint(mode, empty);
            assert_eq!(outcome.frames_to_backfill, 0, "{mode:?} on empty WAL");
            assert!(outcome.completes_checkpoint(), "{mode:?} on empty WAL");
            assert!(!outcome.blocked_by_readers, "{mode:?} on empty WAL");
            assert!(
                !outcome.should_reset_wal() && !outcome.should_truncate_wal(),
                "{mode:?} on empty WAL should not request post-actions"
            );
        }
    }

    #[test]
    fn test_passive_no_readers_backfills_all() {
        let outcome = plan(CheckpointMode::Passive, 50, 20, None);
        assert_eq!(outcome.frames_to_backfill, 30);
        assert!(outcome.completes_checkpoint());
        assert!(!outcome.blocked_by_readers);
    }

    #[test]
    fn test_already_fully_backfilled_is_complete() {
        let outcome = plan(CheckpointMode::Full, 100, 100, Some(80));
        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(outcome.completes_checkpoint());
        assert!(!outcome.blocked_by_readers);
    }

    #[test]
    fn test_reader_at_exact_backfill_boundary_yields_zero_work() {
        let outcome = plan(CheckpointMode::Passive, 100, 60, Some(60));
        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(!outcome.completes_checkpoint());
    }

    #[test]
    fn test_restart_on_fully_backfilled_with_reader_blocks_reset() {
        let outcome = plan(CheckpointMode::Restart, 50, 50, Some(50));
        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(outcome.completes_checkpoint());
        assert!(outcome.blocked_by_readers);
        assert!(!outcome.should_reset_wal());
    }

    #[test]
    fn test_truncate_on_fully_backfilled_no_readers_truncates() {
        let outcome = plan(CheckpointMode::Truncate, 50, 50, None);
        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(outcome.completes_checkpoint());
        assert!(!outcome.blocked_by_readers);
        assert!(outcome.should_truncate_wal());
        assert!(!outcome.should_reset_wal());
    }

    // --- State helpers -----------------------------------------------------

    #[test]
    fn test_remaining_frames_saturates_at_zero() {
        let exact = wal(10, 10, None);
        assert_eq!(exact.remaining_frames(), 0);
        let over = wal(5, 99, None);
        assert_eq!(over.remaining_frames(), 0);
    }

    #[test]
    fn test_normalized_clamps_reader_to_total() {
        let n = wal(20, 30, Some(50)).normalized();
        assert_eq!(n.backfilled_frames, 20);
        assert_eq!(n.oldest_reader_frame, Some(20));
    }

    #[test]
    fn test_full_reader_at_backfill_boundary_is_blocked() {
        let outcome = plan(CheckpointMode::Full, 100, 60, Some(60));
        assert_eq!(outcome.frames_to_backfill, 0);
        assert!(!outcome.completes_checkpoint());
        assert!(outcome.blocked_by_readers);
    }

    #[test]
    fn test_passive_never_reports_blocked() {
        for reader in [Some(10), Some(50), None] {
            let outcome = plan(CheckpointMode::Passive, 50, 0, reader);
            assert!(
                !outcome.blocked_by_readers,
                "Passive must never report blocked (reader={reader:?})"
            );
        }
    }

    #[test]
    fn test_restart_no_post_action_on_empty_wal() {
        let outcome = plan(CheckpointMode::Restart, 0, 0, None);
        assert!(outcome.completes_checkpoint());
        assert!(!outcome.should_reset_wal());
    }

    #[test]
    fn test_normalized_is_idempotent() {
        let once = wal(10, 50, Some(99)).normalized();
        let twice = once.normalized();
        assert_eq!(once, twice);
    }

    #[test]
    fn test_normalized_none_reader_passes_through() {
        let n = wal(30, 10, None).normalized();
        assert_eq!(n.total_frames, 30);
        assert_eq!(n.backfilled_frames, 10);
        assert!(n.oldest_reader_frame.is_none());
    }

    #[test]
    fn test_reset_and_truncate_are_mutually_exclusive() {
        for mode in ALL_MODES {
            for reader in [Some(50), None] {
                let outcome = plan(mode, 50, 0, reader);
                assert!(
                    !(outcome.should_reset_wal() && outcome.should_truncate_wal()),
                    "{mode:?} reader={reader:?}: reset and truncate must be mutually exclusive"
                );
            }
        }
    }

    // --- Derived trait coverage --------------------------------------------

    #[test]
    fn test_checkpoint_mode_copy_and_eq() {
        let original = CheckpointMode::Restart;
        let duplicate = original;
        assert_eq!(original, duplicate);
        assert_ne!(CheckpointMode::Passive, CheckpointMode::Full);
        assert_ne!(CheckpointMode::Restart, CheckpointMode::Truncate);
    }

    #[test]
    fn test_checkpoint_mode_debug_and_serialize() {
        let rendered = format!("{:?}", CheckpointMode::Truncate);
        assert!(rendered.contains("Truncate"));
        let passive_json = serde_json::to_string(&CheckpointMode::Passive).unwrap();
        assert_eq!(passive_json, "\"Passive\"");
        let full_json = serde_json::to_string(&CheckpointMode::Full).unwrap();
        assert_eq!(full_json, "\"Full\"");
    }

    #[test]
    fn test_checkpoint_state_clone_copy_debug() {
        let state = wal(100, 50, Some(75));
        let first = state;
        let second = state;
        assert_eq!(first, second);
        let rendered = format!("{state:?}");
        assert!(rendered.contains("CheckpointState"));
        assert!(rendered.contains("total_frames"));
        assert!(rendered.contains("100"));
    }

    #[test]
    fn test_checkpoint_plan_clone_debug() {
        let outcome = plan(CheckpointMode::Restart, 20, 20, None);
        let duplicate = outcome;
        assert_eq!(outcome, duplicate);
        let rendered = format!("{outcome:?}");
        assert!(rendered.contains("CheckpointPlan"));
        assert!(rendered.contains("Restart"));
        assert_eq!(outcome.progress, CheckpointProgress::Complete);
        assert_eq!(outcome.post_action, CheckpointPostAction::ResetWal);
    }

    #[test]
    fn test_progress_and_post_action_variants_eq_debug() {
        assert_ne!(CheckpointProgress::Partial, CheckpointProgress::Complete);
        assert_eq!(CheckpointProgress::Partial, CheckpointProgress::Partial);
        assert_ne!(CheckpointPostAction::None, CheckpointPostAction::ResetWal);
        assert_ne!(
            CheckpointPostAction::ResetWal,
            CheckpointPostAction::TruncateWal
        );
        let progress_text = format!("{:?}", CheckpointProgress::Complete);
        assert!(progress_text.contains("Complete"));
        let action_text = format!("{:?}", CheckpointPostAction::TruncateWal);
        assert!(action_text.contains("TruncateWal"));
    }
}