Skip to main content

maw/oplog/
checkpoint.rs

1//! View checkpoints and log compaction (§5.5).
2//!
3//! Checkpoints are periodic snapshots of a [`MaterializedView`] stored as
4//! Annotate operations in the op log. They bound replay cost: instead of
5//! replaying from root, we replay from the latest checkpoint.
6//!
7//! # Checkpoint strategy
8//!
9//! Every N operations (configurable, default 100), the caller writes a
10//! Checkpoint annotation containing the serialized [`MaterializedView`].
11//! The checkpoint is deterministic: the same sequence of operations always
12//! produces the same checkpoint.
13//!
14//! # Compaction
15//!
16//! Compaction replaces all operations before a checkpoint with a single
17//! synthetic Create + Checkpoint pair. The old blobs remain in git's object
18//! store and will be garbage-collected by `git gc` when unreferenced.
19//!
20//! # Replay from checkpoint
21//!
22//! [`materialize_from_checkpoint`] reads the op log from head, restores the
23//! view from the newest checkpoint annotation, and replays only the operations
24//! after it (full replay if none exists); the result matches a full replay.
25
26#![allow(clippy::missing_errors_doc)]
27
28use std::collections::BTreeMap;
29use std::fmt;
30use std::path::Path;
31
32use serde::{Deserialize, Serialize};
33
34use crate::model::patch::PatchSet;
35use crate::model::types::{GitOid, WorkspaceId};
36use crate::oplog::read::{OpLogReadError, walk_chain};
37use crate::oplog::types::{OpPayload, Operation};
38use crate::oplog::view::{MaterializedView, ViewError, materialize_from_ops};
39use crate::oplog::write::{OpLogWriteError, append_operation};
40
41// ---------------------------------------------------------------------------
42// Constants
43// ---------------------------------------------------------------------------
44
/// The annotation key used for checkpoint data in the op log.
///
/// An `Annotate` operation whose key equals this string is treated as a
/// checkpoint by [`is_checkpoint`] and [`extract_checkpoint`].
pub const CHECKPOINT_KEY: &str = "checkpoint";

/// Default checkpoint interval: write a checkpoint every N operations.
///
/// Intended as the `interval` argument of [`should_checkpoint`] /
/// [`maybe_write_checkpoint`].
// NOTE(review): `dead_code` is allowed presumably because nothing in this
// crate reads the constant yet — confirm and drop the allow once it is used.
#[allow(dead_code)]
pub const DEFAULT_CHECKPOINT_INTERVAL: usize = 100;
51
52// ---------------------------------------------------------------------------
53// Checkpoint data
54// ---------------------------------------------------------------------------
55
/// Serialized checkpoint data stored in an Annotate operation.
///
/// Contains the full materialized view state at the point of the checkpoint.
/// [`create_checkpoint_op`] serializes this struct to a JSON object and uses
/// its top-level fields as the annotation's data map; [`extract_checkpoint`]
/// performs the reverse conversion.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheckpointData {
    /// The materialized view at the checkpoint.
    pub view: CheckpointView,

    /// Number of operations replayed to produce this checkpoint
    /// (copied from `MaterializedView::op_count` when the checkpoint is created).
    pub op_count: usize,

    /// The OID of the operation that triggered the checkpoint, as a string.
    pub trigger_oid: String,
}
70
/// Subset of [`MaterializedView`] that is checkpointed.
///
/// We serialize only the essential state, not the full `MaterializedView`
/// struct, to keep checkpoints forward-compatible. Typed identifiers are
/// stored as plain strings and re-validated by [`CheckpointView::to_view`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CheckpointView {
    /// The workspace this view belongs to (string form of the workspace id).
    pub workspace_id: String,

    /// Current epoch (from latest Create or Merge), as a string; `None` if
    /// the view had no epoch.
    pub epoch: Option<String>,

    /// Current patch set (serialized).
    pub patch_set: Option<PatchSet>,

    /// Patch set blob OID, as a string.
    pub patch_set_oid: Option<String>,

    /// Description.
    pub description: Option<String>,

    /// Annotations (excluding checkpoint annotations themselves), keyed by
    /// annotation key; each value is that annotation's data map.
    pub annotations: BTreeMap<String, BTreeMap<String, serde_json::Value>>,

    /// Whether workspace is destroyed.
    pub is_destroyed: bool,
}
98
99impl CheckpointView {
100    /// Convert a [`MaterializedView`] to checkpoint-serializable form.
101    #[must_use]
102    pub fn from_view(view: &MaterializedView) -> Self {
103        Self {
104            workspace_id: view.workspace_id.to_string(),
105            epoch: view.epoch.as_ref().map(|e| e.as_str().to_owned()),
106            patch_set: view.patch_set.clone(),
107            patch_set_oid: view.patch_set_oid.as_ref().map(|o| o.as_str().to_owned()),
108            description: view.description.clone(),
109            annotations: view
110                .annotations
111                .iter()
112                .filter(|(k, _)| k.as_str() != CHECKPOINT_KEY)
113                .map(|(k, v)| (k.clone(), v.clone()))
114                .collect(),
115            is_destroyed: view.is_destroyed,
116        }
117    }
118
119    /// Restore a [`MaterializedView`] from checkpoint data.
120    ///
121    /// # Errors
122    ///
123    /// Returns `CheckpointError::InvalidData` if OIDs cannot be parsed.
124    pub fn to_view(&self, op_count: usize) -> Result<MaterializedView, CheckpointError> {
125        use crate::model::types::EpochId;
126
127        let epoch = self
128            .epoch
129            .as_ref()
130            .map(|s| EpochId::new(s))
131            .transpose()
132            .map_err(|_| CheckpointError::InvalidData {
133                detail: format!("invalid epoch OID: {:?}", self.epoch),
134            })?;
135
136        let patch_set_oid = self
137            .patch_set_oid
138            .as_ref()
139            .map(|s| GitOid::new(s))
140            .transpose()
141            .map_err(|_| CheckpointError::InvalidData {
142                detail: format!("invalid patch_set OID: {:?}", self.patch_set_oid),
143            })?;
144
145        let ws_id =
146            WorkspaceId::new(&self.workspace_id).map_err(|_| CheckpointError::InvalidData {
147                detail: format!("invalid workspace_id: {:?}", self.workspace_id),
148            })?;
149
150        Ok(MaterializedView {
151            workspace_id: ws_id,
152            epoch,
153            patch_set: self.patch_set.clone(),
154            patch_set_oid,
155            description: self.description.clone(),
156            annotations: self.annotations.clone(),
157            op_count,
158            is_destroyed: self.is_destroyed,
159        })
160    }
161}
162
163// ---------------------------------------------------------------------------
164// Error type
165// ---------------------------------------------------------------------------
166
/// Errors from checkpoint operations.
///
/// The three wrapping variants have `From` conversions so `?` can be used on
/// op-log read/write and view results inside this module.
#[derive(Debug)]
pub enum CheckpointError {
    /// Op log read error.
    OpLogRead(OpLogReadError),

    /// Op log write error.
    OpLogWrite(OpLogWriteError),

    /// View materialization error.
    View(ViewError),

    /// Checkpoint data was malformed or unparseable.
    InvalidData {
        /// Description of what was wrong.
        detail: String,
    },

    /// No checkpoint found in the op log.
    NoCheckpoint {
        /// The workspace that has no checkpoint.
        workspace_id: WorkspaceId,
    },
}
191
192impl fmt::Display for CheckpointError {
193    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
194        match self {
195            Self::OpLogRead(e) => write!(f, "checkpoint: op log read error: {e}"),
196            Self::OpLogWrite(e) => write!(f, "checkpoint: op log write error: {e}"),
197            Self::View(e) => write!(f, "checkpoint: view error: {e}"),
198            Self::InvalidData { detail } => {
199                write!(f, "checkpoint: invalid data: {detail}")
200            }
201            Self::NoCheckpoint { workspace_id } => {
202                write!(
203                    f,
204                    "no checkpoint found for workspace '{workspace_id}'\n  \
205                     To fix: run checkpoint creation first."
206                )
207            }
208        }
209    }
210}
211
212impl std::error::Error for CheckpointError {
213    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
214        match self {
215            Self::OpLogRead(e) => Some(e),
216            Self::OpLogWrite(e) => Some(e),
217            Self::View(e) => Some(e),
218            _ => None,
219        }
220    }
221}
222
223impl From<OpLogReadError> for CheckpointError {
224    fn from(e: OpLogReadError) -> Self {
225        Self::OpLogRead(e)
226    }
227}
228
229impl From<OpLogWriteError> for CheckpointError {
230    fn from(e: OpLogWriteError) -> Self {
231        Self::OpLogWrite(e)
232    }
233}
234
235impl From<ViewError> for CheckpointError {
236    fn from(e: ViewError) -> Self {
237        Self::View(e)
238    }
239}
240
241// ---------------------------------------------------------------------------
242// Checkpoint detection
243// ---------------------------------------------------------------------------
244
245/// Check if an operation is a checkpoint annotation.
246#[must_use]
247pub fn is_checkpoint(op: &Operation) -> bool {
248    matches!(&op.payload, OpPayload::Annotate { key, .. } if key == CHECKPOINT_KEY)
249}
250
251/// Extract [`CheckpointData`] from a checkpoint annotation operation.
252///
253/// Returns `None` if the operation is not a checkpoint annotation or if
254/// the data cannot be parsed.
255#[must_use]
256pub fn extract_checkpoint(op: &Operation) -> Option<CheckpointData> {
257    match &op.payload {
258        OpPayload::Annotate { key, data } if key == CHECKPOINT_KEY => {
259            // Convert BTreeMap<String, Value> to CheckpointData via JSON
260            let value = serde_json::Value::Object(
261                data.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
262            );
263            serde_json::from_value(value).ok()
264        }
265        _ => None,
266    }
267}
268
269// ---------------------------------------------------------------------------
270// Checkpoint creation
271// ---------------------------------------------------------------------------
272
/// Decide whether a checkpoint is due at the given operation count.
///
/// Returns `true` only when both `interval` and `op_count` are non-zero and
/// `op_count` is an exact multiple of `interval`.
#[must_use]
pub const fn should_checkpoint(op_count: usize, interval: usize) -> bool {
    // A zero interval disables checkpointing; an empty log never needs one.
    if interval == 0 || op_count == 0 {
        return false;
    }
    op_count.is_multiple_of(interval)
}
280
281/// Create a checkpoint [`Operation`] from a materialized view.
282///
283/// The checkpoint is an Annotate operation with key `"checkpoint"` and
284/// the serialized checkpoint data as the value.
285///
286/// # Arguments
287/// * `view` — the materialized view to checkpoint.
288/// * `trigger_oid` — the OID of the operation that triggered this checkpoint.
289/// * `parent_oid` — the parent operation OID for the checkpoint op.
290#[must_use]
291pub fn create_checkpoint_op(
292    view: &MaterializedView,
293    trigger_oid: &GitOid,
294    parent_oid: &GitOid,
295) -> Operation {
296    let checkpoint_data = CheckpointData {
297        view: CheckpointView::from_view(view),
298        op_count: view.op_count,
299        trigger_oid: trigger_oid.as_str().to_owned(),
300    };
301
302    // Serialize CheckpointData into a BTreeMap<String, Value> for the annotation
303    let data_value = serde_json::to_value(&checkpoint_data).unwrap_or_default();
304    let data: BTreeMap<String, serde_json::Value> = match data_value {
305        serde_json::Value::Object(map) => map.into_iter().collect(),
306        _ => BTreeMap::new(),
307    };
308
309    Operation {
310        parent_ids: vec![parent_oid.clone()],
311        workspace_id: view.workspace_id.clone(),
312        timestamp: {
313            let dur = std::time::SystemTime::now()
314                .duration_since(std::time::UNIX_EPOCH)
315                .unwrap_or_default();
316            format!(
317                "{}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
318                1970 + dur.as_secs() / 31_557_600,
319                (dur.as_secs() % 31_557_600) / 2_629_800 + 1,
320                (dur.as_secs() % 2_629_800) / 86400 + 1,
321                (dur.as_secs() % 86400) / 3600,
322                (dur.as_secs() % 3600) / 60,
323                dur.as_secs() % 60,
324            )
325        },
326        payload: OpPayload::Annotate {
327            key: CHECKPOINT_KEY.to_owned(),
328            data,
329        },
330    }
331}
332
333/// Write a checkpoint to the op log if the interval has been reached.
334///
335/// This is the primary entry point for automated checkpoint creation.
336/// Call it after each operation is appended.
337///
338/// # Returns
339///
340/// `Some(oid)` if a checkpoint was written, `None` if not needed yet.
341///
342/// # Arguments
343/// * `root` — path to the git repository root.
344/// * `workspace_id` — workspace to checkpoint.
345/// * `current_view` — the just-materialized view.
346/// * `trigger_oid` — OID of the operation that just completed.
347/// * `current_head` — current head ref value (= `trigger_oid` usually).
348/// * `interval` — checkpoint every N operations.
349pub fn maybe_write_checkpoint(
350    root: &Path,
351    workspace_id: &WorkspaceId,
352    current_view: &MaterializedView,
353    trigger_oid: &GitOid,
354    current_head: &GitOid,
355    interval: usize,
356) -> Result<Option<GitOid>, CheckpointError> {
357    if !should_checkpoint(current_view.op_count, interval) {
358        return Ok(None);
359    }
360
361    let cp_op = create_checkpoint_op(current_view, trigger_oid, current_head);
362    let oid = append_operation(root, workspace_id, &cp_op, Some(current_head))?;
363
364    Ok(Some(oid))
365}
366
367// ---------------------------------------------------------------------------
368// Replay from checkpoint
369// ---------------------------------------------------------------------------
370
371/// Materialize a workspace view, starting from the latest checkpoint if one exists.
372///
373/// This is semantically equivalent to full replay but faster for long op logs:
374/// 1. Walk backwards from head until a checkpoint annotation is found.
375/// 2. Restore the view from the checkpoint.
376/// 3. Replay only the operations after the checkpoint.
377///
378/// If no checkpoint exists, falls back to full replay from root.
379///
380/// # Arguments
381/// * `root` — path to the git repository root.
382/// * `workspace_id` — workspace to materialize.
383/// * `read_patch_set` — callback to fetch patch-set blob contents.
384pub fn materialize_from_checkpoint<F>(
385    root: &Path,
386    workspace_id: &WorkspaceId,
387    read_patch_set: F,
388) -> Result<MaterializedView, CheckpointError>
389where
390    F: Fn(&GitOid) -> Result<PatchSet, ViewError>,
391{
392    // Walk the entire chain to find the latest checkpoint
393    let stop_pred: Option<&dyn Fn(&Operation) -> bool> = None;
394    let chain = walk_chain(root, workspace_id, None, stop_pred)?;
395
396    if chain.is_empty() {
397        return Err(CheckpointError::OpLogRead(OpLogReadError::NoHead {
398            workspace_id: workspace_id.clone(),
399        }));
400    }
401
402    // chain is in reverse chronological order (newest first)
403    // Find the latest checkpoint
404    let mut checkpoint_idx = None;
405    for (i, (_oid, op)) in chain.iter().enumerate() {
406        if is_checkpoint(op) {
407            checkpoint_idx = Some(i);
408            break; // First one found is the latest (chain is newest-first)
409        }
410    }
411
412    if let Some(cp_idx) = checkpoint_idx {
413        // Extract checkpoint data
414        let (_cp_oid, cp_op) = &chain[cp_idx];
415        let cp_data = extract_checkpoint(cp_op).ok_or_else(|| CheckpointError::InvalidData {
416            detail: "checkpoint annotation has unparseable data".to_owned(),
417        })?;
418
419        // Restore view from checkpoint
420        let mut view = cp_data.view.to_view(cp_data.op_count)?;
421
422        // Replay operations AFTER the checkpoint (newer operations)
423        // chain[0..cp_idx] are newer than the checkpoint, reversed for causal order
424        let post_checkpoint: Vec<_> = chain[..cp_idx].iter().rev().cloned().collect();
425
426        for (oid, op) in &post_checkpoint {
427            // Skip checkpoint annotations during replay
428            if is_checkpoint(op) {
429                view.op_count += 1;
430                continue;
431            }
432            replay_single_op(&mut view, oid, op, &read_patch_set)?;
433        }
434
435        Ok(view)
436    } else {
437        // No checkpoint found — full replay
438        let mut ops: Vec<_> = chain;
439        ops.reverse(); // causal order (oldest first)
440        let view = materialize_from_ops(workspace_id.clone(), &ops, read_patch_set)?;
441        Ok(view)
442    }
443}
444
445/// Replay a single operation on a mutable view (mirrors `apply_operation` in view.rs).
446fn replay_single_op<F>(
447    view: &mut MaterializedView,
448    _oid: &GitOid,
449    op: &Operation,
450    read_patch_set: &F,
451) -> Result<(), CheckpointError>
452where
453    F: Fn(&GitOid) -> Result<PatchSet, ViewError>,
454{
455    view.op_count += 1;
456
457    match &op.payload {
458        OpPayload::Create { epoch } => {
459            view.epoch = Some(epoch.clone());
460            view.patch_set = None;
461            view.patch_set_oid = None;
462            view.is_destroyed = false;
463        }
464
465        OpPayload::Snapshot { patch_set_oid } => {
466            let ps = read_patch_set(patch_set_oid)?;
467            view.patch_set = Some(ps);
468            view.patch_set_oid = Some(patch_set_oid.clone());
469        }
470
471        OpPayload::Compensate { .. } => {
472            view.patch_set = None;
473            view.patch_set_oid = None;
474        }
475
476        OpPayload::Merge { epoch_after, .. } => {
477            view.epoch = Some(epoch_after.clone());
478            view.patch_set = None;
479            view.patch_set_oid = None;
480        }
481
482        OpPayload::Describe { message } => {
483            view.description = Some(message.clone());
484        }
485
486        OpPayload::Annotate { key, data } => {
487            view.annotations.insert(key.clone(), data.clone());
488        }
489
490        OpPayload::Destroy => {
491            view.is_destroyed = true;
492        }
493    }
494
495    Ok(())
496}
497
498// ---------------------------------------------------------------------------
499// Log compaction
500// ---------------------------------------------------------------------------
501
/// Compact result containing the new head after compaction.
#[derive(Clone, Debug)]
pub struct CompactionResult {
    /// The new head OID after compaction (the unchanged head when there was
    /// nothing to compact).
    // NOTE(review): `dead_code` is allowed presumably because no caller reads
    // this field yet — confirm.
    #[allow(dead_code)]
    pub new_head: GitOid,

    /// Number of operations before compaction.
    pub ops_before: usize,

    /// Number of operations after compaction: the synthetic Create, the
    /// checkpoint, and any post-checkpoint ops. Equal to `ops_before` when
    /// nothing was compacted.
    pub ops_after: usize,
}
515
516/// Compact the op log for a workspace by replacing all operations before
517/// the latest checkpoint with a synthetic Create + Checkpoint pair.
518///
519/// This reduces the chain length while preserving the same materialized view.
520/// Old blobs remain in git's object store until `git gc` collects them.
521///
522/// # Strategy
523///
524/// 1. Walk chain, find latest checkpoint.
525/// 2. Create synthetic root: Create operation with the checkpoint's epoch.
526/// 3. Create checkpoint annotation on top of synthetic root.
527/// 4. Re-link post-checkpoint operations to point to the new checkpoint.
528/// 5. Update head ref.
529///
530/// # Arguments
531/// * `root` — path to the git repository root.
532/// * `workspace_id` — workspace to compact.
533///
534/// # Errors
535///
536/// Returns `CheckpointError::NoCheckpoint` if no checkpoint exists.
537pub fn compact(
538    root: &Path,
539    workspace_id: &WorkspaceId,
540) -> Result<CompactionResult, CheckpointError> {
541    let stop_pred: Option<&dyn Fn(&Operation) -> bool> = None;
542    let chain = walk_chain(root, workspace_id, None, stop_pred)?;
543
544    if chain.is_empty() {
545        return Err(CheckpointError::OpLogRead(OpLogReadError::NoHead {
546            workspace_id: workspace_id.clone(),
547        }));
548    }
549
550    let ops_before = chain.len();
551
552    // Find the latest checkpoint (chain is newest-first)
553    let mut checkpoint_idx = None;
554    for (i, (_oid, op)) in chain.iter().enumerate() {
555        if is_checkpoint(op) {
556            checkpoint_idx = Some(i);
557            break;
558        }
559    }
560
561    let cp_idx = checkpoint_idx.ok_or_else(|| CheckpointError::NoCheckpoint {
562        workspace_id: workspace_id.clone(),
563    })?;
564
565    // If checkpoint is the second-to-last or last op, not much to compact
566    if cp_idx >= chain.len() - 1 {
567        // Nothing to compact — checkpoint is already at or near root
568        return Ok(CompactionResult {
569            new_head: chain[0].0.clone(),
570            ops_before,
571            ops_after: ops_before,
572        });
573    }
574
575    let (_cp_oid, cp_op) = &chain[cp_idx];
576    let cp_data = extract_checkpoint(cp_op).ok_or_else(|| CheckpointError::InvalidData {
577        detail: "checkpoint annotation has unparseable data".to_owned(),
578    })?;
579
580    // Extract epoch from checkpoint
581    let epoch = cp_data
582        .view
583        .epoch
584        .as_ref()
585        .ok_or_else(|| CheckpointError::InvalidData {
586            detail: "checkpoint has no epoch".to_owned(),
587        })?;
588    let epoch_id =
589        crate::model::types::EpochId::new(epoch).map_err(|_| CheckpointError::InvalidData {
590            detail: format!("invalid epoch in checkpoint: {epoch}"),
591        })?;
592
593    // Step 1: Write synthetic Create (new root, no parents)
594    let synthetic_create = Operation {
595        parent_ids: vec![],
596        workspace_id: workspace_id.clone(),
597        timestamp: cp_op.timestamp.clone(),
598        payload: OpPayload::Create { epoch: epoch_id },
599    };
600
601    // We can't use append_operation because we're building a new chain.
602    // Write blobs directly and update the ref at the end.
603    let create_oid = crate::oplog::write::write_operation_blob(root, &synthetic_create)?;
604
605    // Step 2: Write checkpoint annotation on top of synthetic Create
606    let mut cp_annotate = cp_op.clone();
607    cp_annotate.parent_ids = vec![create_oid];
608    let cp_new_oid = crate::oplog::write::write_operation_blob(root, &cp_annotate)?;
609
610    // Step 3: Re-write post-checkpoint ops with updated parent pointers
611    // Post-checkpoint ops are chain[0..cp_idx], from newest to oldest
612    // We need to re-link them: the oldest post-checkpoint op should point to cp_new_oid
613    let post_ops: Vec<_> = chain[..cp_idx].iter().rev().cloned().collect(); // oldest first
614
615    let mut prev_oid = cp_new_oid;
616    let mut ops_after = 2; // synthetic create + checkpoint
617
618    for (_old_oid, mut op) in post_ops {
619        // Replace parent with prev_oid
620        op.parent_ids = vec![prev_oid.clone()];
621        let new_oid = crate::oplog::write::write_operation_blob(root, &op)?;
622        prev_oid = new_oid;
623        ops_after += 1;
624    }
625
626    // Step 4: Update head ref to point to the new chain head
627    let ref_name = crate::refs::workspace_head_ref(workspace_id.as_str());
628    let current_head = chain[0].0.clone();
629    crate::refs::write_ref_cas(root, &ref_name, &current_head, &prev_oid).map_err(|e| match e {
630        crate::refs::RefError::CasMismatch { .. } => {
631            CheckpointError::OpLogWrite(OpLogWriteError::CasMismatch {
632                workspace_id: workspace_id.clone(),
633            })
634        }
635        other => CheckpointError::OpLogWrite(OpLogWriteError::RefError(other)),
636    })?;
637
638    Ok(CompactionResult {
639        new_head: prev_oid,
640        ops_before,
641        ops_after,
642    })
643}
644
645// ---------------------------------------------------------------------------
646// Tests
647// ---------------------------------------------------------------------------
648
649#[cfg(test)]
650#[allow(clippy::all, clippy::pedantic, clippy::nursery)]
651mod tests {
652    use super::*;
653    use crate::model::patch::{FileId, PatchSet, PatchValue};
654    use crate::model::types::{EpochId, GitOid, WorkspaceId};
655    use crate::oplog::types::{OpPayload, Operation};
656    use crate::oplog::view::MaterializedView;
657    use std::collections::BTreeMap;
658    use std::path::PathBuf;
659
660    // -----------------------------------------------------------------------
661    // Helpers
662    // -----------------------------------------------------------------------
663
664    fn test_oid(c: char) -> GitOid {
665        GitOid::new(&c.to_string().repeat(40)).unwrap()
666    }
667
668    fn test_epoch(c: char) -> EpochId {
669        EpochId::new(&c.to_string().repeat(40)).unwrap()
670    }
671
672    fn test_ws(name: &str) -> WorkspaceId {
673        WorkspaceId::new(name).unwrap()
674    }
675
676    fn test_patch_set(epoch_char: char) -> PatchSet {
677        let mut patches = BTreeMap::new();
678        patches.insert(
679            PathBuf::from("src/main.rs"),
680            PatchValue::Add {
681                blob: test_oid('f'),
682                file_id: FileId::new(1),
683            },
684        );
685        PatchSet {
686            base_epoch: test_epoch(epoch_char),
687            patches,
688        }
689    }
690
691    fn mock_reader(ps: PatchSet) -> impl Fn(&GitOid) -> Result<PatchSet, ViewError> {
692        move |_oid| Ok(ps.clone())
693    }
694
695    fn make_op(ws: &str, payload: OpPayload) -> Operation {
696        Operation {
697            parent_ids: vec![],
698            workspace_id: test_ws(ws),
699            timestamp: "2026-02-19T12:00:00Z".to_owned(),
700            payload,
701        }
702    }
703
704    fn make_view(ws: &str, epoch_char: char, op_count: usize) -> MaterializedView {
705        MaterializedView {
706            workspace_id: test_ws(ws),
707            epoch: Some(test_epoch(epoch_char)),
708            patch_set: Some(test_patch_set(epoch_char)),
709            patch_set_oid: Some(test_oid('d')),
710            description: Some("test description".into()),
711            annotations: BTreeMap::new(),
712            op_count,
713            is_destroyed: false,
714        }
715    }
716
717    // -----------------------------------------------------------------------
718    // is_checkpoint
719    // -----------------------------------------------------------------------
720
721    #[test]
722    fn is_checkpoint_returns_true_for_checkpoint_annotate() {
723        let op = make_op(
724            "ws-1",
725            OpPayload::Annotate {
726                key: CHECKPOINT_KEY.to_owned(),
727                data: BTreeMap::new(),
728            },
729        );
730        assert!(is_checkpoint(&op));
731    }
732
733    #[test]
734    fn is_checkpoint_returns_false_for_other_annotate() {
735        let op = make_op(
736            "ws-1",
737            OpPayload::Annotate {
738                key: "validation".to_owned(),
739                data: BTreeMap::new(),
740            },
741        );
742        assert!(!is_checkpoint(&op));
743    }
744
745    #[test]
746    fn is_checkpoint_returns_false_for_non_annotate() {
747        let op = make_op("ws-1", OpPayload::Destroy);
748        assert!(!is_checkpoint(&op));
749    }
750
751    // -----------------------------------------------------------------------
752    // should_checkpoint
753    // -----------------------------------------------------------------------
754
755    #[test]
756    fn should_checkpoint_at_interval() {
757        assert!(should_checkpoint(100, 100));
758        assert!(should_checkpoint(200, 100));
759        assert!(should_checkpoint(50, 50));
760    }
761
762    #[test]
763    fn should_not_checkpoint_between_intervals() {
764        assert!(!should_checkpoint(99, 100));
765        assert!(!should_checkpoint(101, 100));
766        assert!(!should_checkpoint(1, 100));
767    }
768
769    #[test]
770    fn should_not_checkpoint_at_zero() {
771        assert!(!should_checkpoint(0, 100));
772    }
773
774    #[test]
775    fn should_not_checkpoint_with_zero_interval() {
776        assert!(!should_checkpoint(100, 0));
777    }
778
779    // -----------------------------------------------------------------------
780    // CheckpointView round-trip
781    // -----------------------------------------------------------------------
782
783    #[test]
784    fn checkpoint_view_from_and_to_view() {
785        let view = make_view("ws-1", 'a', 100);
786        let cp_view = CheckpointView::from_view(&view);
787        let restored = cp_view.to_view(100).unwrap();
788
789        assert_eq!(restored.workspace_id, view.workspace_id);
790        assert_eq!(restored.epoch, view.epoch);
791        assert_eq!(restored.patch_set, view.patch_set);
792        assert_eq!(restored.patch_set_oid, view.patch_set_oid);
793        assert_eq!(restored.description, view.description);
794        assert_eq!(restored.is_destroyed, view.is_destroyed);
795        assert_eq!(restored.op_count, view.op_count);
796    }
797
798    #[test]
799    fn checkpoint_view_filters_checkpoint_annotations() {
800        let mut view = make_view("ws-1", 'a', 100);
801        let mut checkpoint_data = BTreeMap::new();
802        checkpoint_data.insert("key".into(), serde_json::Value::String("val".into()));
803        view.annotations
804            .insert(CHECKPOINT_KEY.to_owned(), checkpoint_data);
805
806        let mut other_data = BTreeMap::new();
807        other_data.insert("passed".into(), serde_json::Value::Bool(true));
808        view.annotations.insert("validation".to_owned(), other_data);
809
810        let cp_view = CheckpointView::from_view(&view);
811
812        // Checkpoint annotation should be filtered out
813        assert!(!cp_view.annotations.contains_key(CHECKPOINT_KEY));
814        assert!(cp_view.annotations.contains_key("validation"));
815    }
816
817    #[test]
818    fn checkpoint_view_empty_epoch() {
819        let view = MaterializedView::empty(test_ws("ws-1"));
820        let cp_view = CheckpointView::from_view(&view);
821        let restored = cp_view.to_view(0).unwrap();
822
823        assert!(restored.epoch.is_none());
824        assert!(restored.patch_set.is_none());
825        assert!(!restored.is_destroyed);
826    }
827
828    #[test]
829    fn checkpoint_view_destroyed() {
830        let mut view = make_view("ws-1", 'a', 5);
831        view.is_destroyed = true;
832
833        let cp_view = CheckpointView::from_view(&view);
834        assert!(cp_view.is_destroyed);
835
836        let restored = cp_view.to_view(5).unwrap();
837        assert!(restored.is_destroyed);
838    }
839
840    // -----------------------------------------------------------------------
841    // CheckpointData serde
842    // -----------------------------------------------------------------------
843
844    #[test]
845    fn checkpoint_data_serde_roundtrip() {
846        let view = make_view("ws-1", 'a', 100);
847        let cp_data = CheckpointData {
848            view: CheckpointView::from_view(&view),
849            op_count: 100,
850            trigger_oid: test_oid('1').as_str().to_owned(),
851        };
852
853        let json = serde_json::to_value(&cp_data).unwrap();
854        let restored: CheckpointData = serde_json::from_value(json).unwrap();
855
856        assert_eq!(restored.op_count, 100);
857        assert_eq!(restored.trigger_oid, cp_data.trigger_oid);
858        assert_eq!(restored.view.workspace_id, "ws-1");
859    }
860
861    // -----------------------------------------------------------------------
862    // create_checkpoint_op
863    // -----------------------------------------------------------------------
864
/// The op built by `create_checkpoint_op` is recognised by `is_checkpoint`,
/// has the given parent as its only parent, and carries the view's workspace.
#[test]
fn create_checkpoint_op_produces_annotate_with_correct_key() {
    let source_view = make_view("ws-1", 'a', 100);
    let (trigger, parent) = (test_oid('1'), test_oid('2'));

    let checkpoint_op = create_checkpoint_op(&source_view, &trigger, &parent);

    assert!(is_checkpoint(&checkpoint_op));
    assert_eq!(checkpoint_op.parent_ids, vec![parent]);
    assert_eq!(checkpoint_op.workspace_id, test_ws("ws-1"));
}
877
/// Data written by `create_checkpoint_op` can be read back with
/// `extract_checkpoint`, including op count, trigger OID, and workspace id.
#[test]
fn create_checkpoint_op_data_is_extractable() {
    let source_view = make_view("ws-1", 'a', 100);
    let (trigger, parent) = (test_oid('1'), test_oid('2'));

    let checkpoint_op = create_checkpoint_op(&source_view, &trigger, &parent);
    let payload = extract_checkpoint(&checkpoint_op).expect("should extract checkpoint");

    assert_eq!(payload.op_count, 100);
    assert_eq!(payload.trigger_oid, trigger.as_str());
    assert_eq!(payload.view.workspace_id, "ws-1");
}
891
892    // -----------------------------------------------------------------------
893    // extract_checkpoint
894    // -----------------------------------------------------------------------
895
/// A `Destroy` operation carries no checkpoint data.
#[test]
fn extract_checkpoint_returns_none_for_non_checkpoint() {
    let destroy_op = make_op("ws-1", OpPayload::Destroy);

    let extracted = extract_checkpoint(&destroy_op);

    assert!(extracted.is_none());
}
901
/// An `Annotate` operation under a different key is not treated as a
/// checkpoint.
#[test]
fn extract_checkpoint_returns_none_for_wrong_key() {
    let unrelated_annotation = OpPayload::Annotate {
        key: String::from("not-a-checkpoint"),
        data: BTreeMap::new(),
    };
    let annotate_op = make_op("ws-1", unrelated_annotation);

    let extracted = extract_checkpoint(&annotate_op);

    assert!(extracted.is_none());
}
913
914    // -----------------------------------------------------------------------
915    // materialize_from_ops with checkpoint (unit-level)
916    // -----------------------------------------------------------------------
917
/// Full replay over a chain that contains a checkpoint annotation.
///
/// Chain under test: Create → Snapshot → Checkpoint → Describe.
///
/// The view is first materialized from the Create + Snapshot prefix; that
/// view is then used to build the checkpoint op that is spliced into the
/// chain. Replaying the whole chain must keep the epoch and patch set from
/// before the checkpoint and pick up the description written after it.
#[test]
fn materialize_from_ops_with_checkpoint_in_chain() {
    let ps = test_patch_set('a');

    let mut ops = vec![
        (
            test_oid('1'),
            make_op(
                "ws-1",
                OpPayload::Create {
                    epoch: test_epoch('a'),
                },
            ),
        ),
        (
            test_oid('2'),
            make_op(
                "ws-1",
                OpPayload::Snapshot {
                    patch_set_oid: test_oid('d'),
                },
            ),
        ),
    ];

    // Replaying the prefix gives the view at the snapshot.
    let at_snapshot =
        materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(ps.clone())).unwrap();

    assert_eq!(at_snapshot.epoch, Some(test_epoch('a')));
    assert_eq!(at_snapshot.patch_set, Some(ps.clone()));
    assert_eq!(at_snapshot.op_count, 2);

    // Checkpoint the snapshot view (the snapshot op is both trigger and
    // parent), then add one more op after the checkpoint.
    let snapshot_oid = test_oid('2');
    ops.push((
        test_oid('3'),
        create_checkpoint_op(&at_snapshot, &snapshot_oid, &snapshot_oid),
    ));
    ops.push((
        test_oid('4'),
        make_op(
            "ws-1",
            OpPayload::Describe {
                message: "after checkpoint".into(),
            },
        ),
    ));

    // Full replay straight through the checkpoint annotation.
    let view = materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(ps.clone())).unwrap();

    assert_eq!(view.epoch, Some(test_epoch('a')));
    assert_eq!(view.patch_set, Some(ps));
    assert_eq!(view.description, Some("after checkpoint".into()));
}
951
952    // -----------------------------------------------------------------------
953    // Replay from checkpoint equivalence (property test)
954    // -----------------------------------------------------------------------
955
/// Restoring a checkpoint taken after the first two ops and replaying the
/// third on top must agree with replaying all three ops from scratch.
#[test]
fn checkpoint_restore_equals_full_replay() {
    let ps = test_patch_set('a');

    // Create → Snapshot → Describe, keyed by OIDs '1', '2', '3'.
    let ops: Vec<_> = vec![
        (
            '1',
            OpPayload::Create {
                epoch: test_epoch('a'),
            },
        ),
        (
            '2',
            OpPayload::Snapshot {
                patch_set_oid: test_oid('d'),
            },
        ),
        (
            '3',
            OpPayload::Describe {
                message: "checkpoint test".into(),
            },
        ),
    ]
    .into_iter()
    .map(|(oid_char, payload)| (test_oid(oid_char), make_op("ws-1", payload)))
    .collect();

    // Reference result: replay everything.
    let full_view =
        materialize_from_ops(test_ws("ws-1"), &ops, mock_reader(ps.clone())).unwrap();

    // The checkpoint covers the first two ops; the rest is replayed on top.
    let (checkpointed, replayed_after) = ops.split_at(2);

    let view_at_checkpoint =
        materialize_from_ops(test_ws("ws-1"), checkpointed, mock_reader(ps.clone())).unwrap();
    let mut restored = CheckpointView::from_view(&view_at_checkpoint)
        .to_view(2)
        .unwrap();

    for (oid, op) in replayed_after {
        replay_single_op(&mut restored, oid, op, &mock_reader(ps.clone())).unwrap();
    }

    // Both routes must land on the same state.
    assert_eq!(restored.epoch, full_view.epoch);
    assert_eq!(restored.patch_set, full_view.patch_set);
    assert_eq!(restored.description, full_view.description);
    assert_eq!(restored.is_destroyed, full_view.is_destroyed);
    assert_eq!(restored.op_count, full_view.op_count);
}
1012
1013    // -----------------------------------------------------------------------
1014    // Compaction determinism
1015    // -----------------------------------------------------------------------
1016
/// Two views built from identical inputs yield equal checkpoint views.
///
/// This is a logical test only: compaction is not run, it just relies on
/// the checkpoint data being the same for the same view.
#[test]
fn compaction_produces_same_view() {
    let (first_view, second_view) = (make_view("ws-1", 'a', 100), make_view("ws-1", 'a', 100));

    let first_cp = CheckpointView::from_view(&first_view);
    let second_cp = CheckpointView::from_view(&second_view);

    assert_eq!(
        first_cp, second_cp,
        "deterministic: same view → same checkpoint"
    );
}
1029
1030    // -----------------------------------------------------------------------
1031    // Error cases
1032    // -----------------------------------------------------------------------
1033
/// `to_view` rejects a checkpoint whose epoch string is not a valid OID.
#[test]
fn checkpoint_view_invalid_epoch() {
    let malformed = CheckpointView {
        workspace_id: "ws-1".to_owned(),
        epoch: Some("not-a-valid-oid".into()),
        patch_set: None,
        patch_set_oid: None,
        description: None,
        annotations: BTreeMap::new(),
        is_destroyed: false,
    };

    assert!(malformed.to_view(0).is_err());
}
1049
/// `to_view` rejects a checkpoint whose workspace id is the empty string.
#[test]
fn checkpoint_view_invalid_workspace_id() {
    let nameless = CheckpointView {
        workspace_id: String::new(),
        epoch: None,
        patch_set: None,
        patch_set_oid: None,
        description: None,
        annotations: BTreeMap::new(),
        is_destroyed: false,
    };

    assert!(nameless.to_view(0).is_err());
}
1065
1066    // -----------------------------------------------------------------------
1067    // Error display
1068    // -----------------------------------------------------------------------
1069
/// The `NoCheckpoint` message names the workspace and says "no checkpoint".
#[test]
fn error_display_no_checkpoint() {
    let rendered = CheckpointError::NoCheckpoint {
        workspace_id: test_ws("agent-1"),
    }
    .to_string();

    for expected in ["agent-1", "no checkpoint"] {
        assert!(rendered.contains(expected));
    }
}
1079
/// The `InvalidData` message says "invalid data" and includes the detail.
#[test]
fn error_display_invalid_data() {
    let rendered = CheckpointError::InvalidData {
        detail: "bad epoch".into(),
    }
    .to_string();

    for expected in ["invalid data", "bad epoch"] {
        assert!(rendered.contains(expected));
    }
}
1089
1090    // -----------------------------------------------------------------------
1091    // Integration: checkpoint + compact in-memory
1092    // -----------------------------------------------------------------------
1093
/// With an interval of 3, op counts 1 through 10 trigger a checkpoint at
/// 3, 6, and 9 only.
#[test]
fn checkpoint_interval_logic_over_sequence() {
    let interval = 3;

    // Walk op counts 1..=10 and record the ones that trigger a checkpoint.
    let mut triggered: Vec<usize> = Vec::new();
    for op_count in 1..=10 {
        if should_checkpoint(op_count, interval) {
            triggered.push(op_count);
        }
    }

    assert_eq!(triggered, vec![3, 6, 9]);
}
1105
/// Checks the `should_checkpoint` guard at and below the interval boundary.
///
/// `maybe_write_checkpoint` itself cannot be called without a real
/// repository, so only the guard is exercised here: an op count of 50 with
/// an interval of 100 must not trigger a checkpoint, an op count of 100 must.
#[test]
fn maybe_write_checkpoint_respects_interval() {
    assert!(!should_checkpoint(50, 100));
    assert!(should_checkpoint(100, 100));
}
1119
1120    // -----------------------------------------------------------------------
1121    // Full integration with git (requires tempdir + git init)
1122    // -----------------------------------------------------------------------
1123
1124    fn setup_repo() -> (tempfile::TempDir, std::path::PathBuf) {
1125        use std::process::Command;
1126
1127        let dir = tempfile::TempDir::new().unwrap();
1128        let root = dir.path().to_path_buf();
1129
1130        Command::new("git")
1131            .args(["init"])
1132            .current_dir(&root)
1133            .output()
1134            .unwrap();
1135        Command::new("git")
1136            .args(["config", "user.name", "Test"])
1137            .current_dir(&root)
1138            .output()
1139            .unwrap();
1140        Command::new("git")
1141            .args(["config", "user.email", "test@test.com"])
1142            .current_dir(&root)
1143            .output()
1144            .unwrap();
1145        Command::new("git")
1146            .args(["config", "commit.gpgsign", "false"])
1147            .current_dir(&root)
1148            .output()
1149            .unwrap();
1150
1151        std::fs::write(root.join("README.md"), "# Test\n").unwrap();
1152        Command::new("git")
1153            .args(["add", "README.md"])
1154            .current_dir(&root)
1155            .output()
1156            .unwrap();
1157        Command::new("git")
1158            .args(["commit", "-m", "initial"])
1159            .current_dir(&root)
1160            .output()
1161            .unwrap();
1162
1163        (dir, root)
1164    }
1165
/// End-to-end: write three ops, checkpoint, write two more, then compact.
///
/// Compaction is expected to shrink the chain from 6 ops to 4, and replaying
/// the compacted chain must still give the latest description and the
/// original epoch.
#[test]
fn integration_write_checkpoint_and_compact() {
    let (_dir, root) = setup_repo();
    let ws_id = test_ws("agent-1");

    // Builds a Describe op on top of `parent` for this workspace.
    let describe = |parent, timestamp: &str, message: &str| Operation {
        parent_ids: vec![parent],
        workspace_id: ws_id.clone(),
        timestamp: timestamp.into(),
        payload: OpPayload::Describe {
            message: message.into(),
        },
    };

    // Three ops before the checkpoint: Create, Describe, Describe.
    let create = Operation {
        parent_ids: vec![],
        workspace_id: ws_id.clone(),
        timestamp: "2026-02-19T12:00:00Z".into(),
        payload: OpPayload::Create {
            epoch: test_epoch('a'),
        },
    };
    let create_oid = append_operation(&root, &ws_id, &create, None).unwrap();

    let second = describe(create_oid.clone(), "2026-02-19T12:01:00Z", "step 2");
    let second_oid = append_operation(&root, &ws_id, &second, Some(&create_oid)).unwrap();

    let third = describe(second_oid.clone(), "2026-02-19T12:02:00Z", "step 3");
    let third_oid = append_operation(&root, &ws_id, &third, Some(&second_oid)).unwrap();

    // Materialize the view the checkpoint will capture.
    let ps = test_patch_set('a');
    let pre_checkpoint = vec![
        (create_oid, create),
        (second_oid, second),
        (third_oid.clone(), third),
    ];
    let view =
        materialize_from_ops(ws_id.clone(), &pre_checkpoint, mock_reader(ps.clone())).unwrap();
    assert_eq!(view.op_count, 3);

    // Interval of 3, so op 3 must produce a checkpoint.
    let cp_oid = maybe_write_checkpoint(&root, &ws_id, &view, &third_oid, &third_oid, 3)
        .unwrap()
        .expect("should write checkpoint at op 3");

    // Two more ops after the checkpoint.
    let fourth = describe(
        cp_oid.clone(),
        "2026-02-19T12:03:00Z",
        "step 4 after checkpoint",
    );
    let fourth_oid = append_operation(&root, &ws_id, &fourth, Some(&cp_oid)).unwrap();

    let fifth = describe(fourth_oid.clone(), "2026-02-19T12:04:00Z", "step 5");
    let _fifth_oid = append_operation(&root, &ws_id, &fifth, Some(&fourth_oid)).unwrap();

    // Before: create + 2 describes + checkpoint + 2 describes.
    // After: synthetic create + checkpoint + 2 describes.
    let outcome = compact(&root, &ws_id).unwrap();
    assert_eq!(outcome.ops_before, 6);
    assert_eq!(outcome.ops_after, 4);

    // Walk the compacted chain without a stop predicate.
    let no_stop: Option<&dyn Fn(&Operation) -> bool> = None;
    let mut chain = walk_chain(&root, &ws_id, None, no_stop).unwrap();
    assert_eq!(chain.len(), 4);

    // Reverse the walk result before replaying it.
    chain.reverse();
    let compacted_view = materialize_from_ops(ws_id, &chain, mock_reader(ps)).unwrap();

    assert_eq!(compacted_view.description, Some("step 5".into()));
    assert_eq!(compacted_view.epoch, Some(test_epoch('a')));
}
1255
/// End-to-end: `materialize_from_checkpoint` on a chain with a checkpoint
/// followed by one more op returns that op's description and the original
/// epoch.
#[test]
fn integration_materialize_from_checkpoint() {
    let (_dir, root) = setup_repo();
    let ws_id = test_ws("agent-1");

    // Builds a Describe op on top of `parent` for this workspace.
    let describe = |parent, timestamp: &str, message: &str| Operation {
        parent_ids: vec![parent],
        workspace_id: ws_id.clone(),
        timestamp: timestamp.into(),
        payload: OpPayload::Describe {
            message: message.into(),
        },
    };

    // Chain: Create → Describe → Describe (checkpoint) → Describe
    let create = Operation {
        parent_ids: vec![],
        workspace_id: ws_id.clone(),
        timestamp: "2026-02-19T12:00:00Z".into(),
        payload: OpPayload::Create {
            epoch: test_epoch('a'),
        },
    };
    let create_oid = append_operation(&root, &ws_id, &create, None).unwrap();

    let second = describe(create_oid.clone(), "2026-02-19T12:01:00Z", "step 2");
    let second_oid = append_operation(&root, &ws_id, &second, Some(&create_oid)).unwrap();

    let third = describe(second_oid.clone(), "2026-02-19T12:02:00Z", "step 3");
    let third_oid = append_operation(&root, &ws_id, &third, Some(&second_oid)).unwrap();

    // Materialize the view at step 3 and checkpoint it.
    let ps = test_patch_set('a');
    let pre_checkpoint = vec![
        (create_oid, create),
        (second_oid, second),
        (third_oid.clone(), third),
    ];
    let view =
        materialize_from_ops(ws_id.clone(), &pre_checkpoint, mock_reader(ps.clone())).unwrap();

    let cp_oid = maybe_write_checkpoint(&root, &ws_id, &view, &third_oid, &third_oid, 3)
        .unwrap()
        .expect("checkpoint should be written");

    // One op after the checkpoint.
    let fourth = describe(
        cp_oid.clone(),
        "2026-02-19T12:03:00Z",
        "step 4 after checkpoint",
    );
    let _fourth_oid = append_operation(&root, &ws_id, &fourth, Some(&cp_oid)).unwrap();

    let restored = materialize_from_checkpoint(&root, &ws_id, mock_reader(ps)).unwrap();

    // The post-checkpoint description must be visible.
    assert_eq!(
        restored.description,
        Some("step 4 after checkpoint".into())
    );
    assert_eq!(restored.epoch, Some(test_epoch('a')));
}
1319
/// Without any checkpoint in the chain, `materialize_from_checkpoint` still
/// yields a view covering both ops.
#[test]
fn integration_no_checkpoint_falls_back_to_full_replay() {
    let (_dir, root) = setup_repo();
    let ws_id = test_ws("agent-1");

    // Two-op chain with no checkpoint: Create → Describe.
    let create = Operation {
        parent_ids: vec![],
        workspace_id: ws_id.clone(),
        timestamp: "2026-02-19T12:00:00Z".into(),
        payload: OpPayload::Create {
            epoch: test_epoch('a'),
        },
    };
    let create_oid = append_operation(&root, &ws_id, &create, None).unwrap();

    let describe = Operation {
        parent_ids: vec![create_oid.clone()],
        workspace_id: ws_id.clone(),
        timestamp: "2026-02-19T12:01:00Z".into(),
        payload: OpPayload::Describe {
            message: "no checkpoint here".into(),
        },
    };
    let _describe_oid = append_operation(&root, &ws_id, &describe, Some(&create_oid)).unwrap();

    let reader = mock_reader(test_patch_set('a'));
    let replayed = materialize_from_checkpoint(&root, &ws_id, reader).unwrap();

    assert_eq!(replayed.description, Some("no checkpoint here".into()));
    assert_eq!(replayed.op_count, 2);
}
1351
/// Compacting a chain that has no checkpoint reports
/// `CheckpointError::NoCheckpoint`.
#[test]
fn integration_compact_without_checkpoint_fails() {
    let (_dir, root) = setup_repo();
    let ws_id = test_ws("agent-1");

    // The chain holds a single Create op and nothing else.
    let create = Operation {
        parent_ids: vec![],
        workspace_id: ws_id.clone(),
        timestamp: "2026-02-19T12:00:00Z".into(),
        payload: OpPayload::Create {
            epoch: test_epoch('a'),
        },
    };
    let _create_oid = append_operation(&root, &ws_id, &create, None).unwrap();

    match compact(&root, &ws_id) {
        Err(CheckpointError::NoCheckpoint { .. }) => {}
        _ => panic!("compact without checkpoint should fail"),
    }
}
1373}