Skip to main content

laminar_storage/checkpoint/
mod.rs

//! Checkpoint infrastructure for state persistence and recovery.
//!
//! - `layout`: Object-store checkpoint layout with UUID v7 identifiers
//! - `checkpointer`: Async checkpoint persistence via object stores
//! - `recovery`: Distributed recovery manager
//! - `source_offsets`: Typed source position tracking
6
7/// Async checkpoint persistence via object stores.
8pub mod checkpointer;
9/// Object-store checkpoint layout with UUID v7 identifiers.
10pub mod layout;
11/// Distributed recovery manager.
12pub mod recovery;
13/// Typed source position tracking for checkpoint recovery.
14pub mod source_offsets;
15
16#[allow(clippy::disallowed_types)] // cold path: checkpoint recovery
17use std::collections::HashMap;
18use std::path::PathBuf;
19
20use anyhow::{Context, Result};
21
22use crate::wal::WalPosition;
23
24/// Checkpoint metadata stored alongside checkpoint data.
25#[derive(Debug)]
26pub struct CheckpointMetadata {
27    /// Unique checkpoint ID (monotonically increasing).
28    pub id: u64,
29
30    /// Unix timestamp when checkpoint was created.
31    pub timestamp: u64,
32
33    /// WAL position at time of checkpoint.
34    pub wal_position: WalPosition,
35
36    /// Source offsets for exactly-once semantics.
37    pub source_offsets: HashMap<String, u64>,
38
39    /// Size of the state snapshot in bytes.
40    pub state_size: u64,
41
42    /// Current watermark at checkpoint time (for recovery).
43    pub watermark: Option<i64>,
44}
45
46/// A completed checkpoint on disk.
47#[derive(Debug)]
48pub struct Checkpoint {
49    /// Checkpoint metadata.
50    pub metadata: CheckpointMetadata,
51
52    /// Path to checkpoint directory.
53    pub path: PathBuf,
54}
55
56impl Checkpoint {
57    /// Path to the metadata file.
58    #[must_use]
59    pub fn metadata_path(&self) -> PathBuf {
60        self.path.join("metadata.rkyv")
61    }
62
63    /// Path to the state snapshot file.
64    #[must_use]
65    pub fn state_path(&self) -> PathBuf {
66        self.path.join("state.rkyv")
67    }
68
69    /// Path to the source offsets file.
70    #[must_use]
71    pub fn offsets_path(&self) -> PathBuf {
72        self.path.join("offsets.json")
73    }
74
75    /// Load the state snapshot from disk.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if the state file cannot be read.
80    pub fn load_state(&self) -> Result<Vec<u8>> {
81        std::fs::read(self.state_path()).context("Failed to read state snapshot")
82    }
83
84    /// Load source offsets from disk.
85    ///
86    /// # Errors
87    ///
88    /// Returns an error if the offsets file cannot be read or parsed.
89    pub fn load_offsets(&self) -> Result<HashMap<String, u64>> {
90        let path = self.offsets_path();
91        if path.exists() {
92            let data = std::fs::read_to_string(&path).context("Failed to read source offsets")?;
93            serde_json::from_str(&data).context("Failed to parse source offsets")
94        } else {
95            Ok(HashMap::new())
96        }
97    }
98}