Skip to main content

laminar_storage/checkpoint/
layout.rs

//! Object-store checkpoint layout for distributed checkpoints.
//!
//! Defines the directory structure, manifest format, and path conventions
//! for checkpoints stored in object stores (S3, GCS, Azure Blob, local FS).
//!
//! ## Directory Layout
//!
//! ```text
//! checkpoints/
//! ├── _latest                          # Pointer to latest manifest
//! ├── {checkpoint_id}/
//! │   ├── manifest.json                # CheckpointManifestV2
//! │   ├── operators/
//! │   │   └── {operator_name}/
//! │   │       ├── partition-{id}.snap  # Full state snapshot
//! │   │       └── partition-{id}.delta # Incremental delta
//! │   └── offsets/
//! │       └── {source_name}.json       # Source offset data
//! └── {checkpoint_id}/
//!     └── ...
//! ```
//!
//! ## Checkpoint IDs
//!
//! `CheckpointId` wraps a UUID v7, which is time-sortable. This means that
//! lexicographic sorting of checkpoint directories matches chronological
//! ordering (at the millisecond resolution of the embedded timestamp), so
//! there is no need to parse timestamps or sequence numbers.
28
29#[allow(clippy::disallowed_types)] // cold path: checkpoint recovery
30use std::collections::HashMap;
31use std::fmt;
32
33use serde::{Deserialize, Serialize};
34use uuid::Uuid;
35
/// A time-sortable checkpoint identifier based on UUID v7.
///
/// UUID v7 embeds a Unix timestamp in the most significant bits, so
/// lexicographic ordering of the hex representation matches chronological
/// order. This simplifies object-store listing and garbage collection.
///
/// The derived `PartialOrd`/`Ord` compare the wrapped [`Uuid`] directly.
/// Note that [`CheckpointId::from_uuid`] wraps whatever UUID it is given
/// without checking its version, so the time-ordering property only holds
/// for IDs that really are UUID v7 (for example those from
/// [`CheckpointId::now`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct CheckpointId(Uuid);
43
44impl CheckpointId {
45    /// Generate a new checkpoint ID with the current timestamp.
46    #[must_use]
47    pub fn now() -> Self {
48        Self(Uuid::now_v7())
49    }
50
51    /// Create a checkpoint ID from an existing UUID.
52    #[must_use]
53    pub const fn from_uuid(uuid: Uuid) -> Self {
54        Self(uuid)
55    }
56
57    /// Get the underlying UUID.
58    #[must_use]
59    pub const fn as_uuid(&self) -> Uuid {
60        self.0
61    }
62
63    /// Return the hyphenated string representation.
64    #[must_use]
65    pub fn to_string_id(&self) -> String {
66        self.0.to_string()
67    }
68}
69
70impl fmt::Display for CheckpointId {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        write!(f, "{}", self.0)
73    }
74}
75
76/// Deterministic path generator for checkpoint artifacts in object stores.
77///
78/// All paths are relative to a configurable `base_prefix`. This struct
79/// is stateless — it only computes paths, never performs I/O.
80#[derive(Debug, Clone)]
81pub struct CheckpointPaths {
82    /// Base prefix in the object store (e.g., `"checkpoints/"`)
83    pub(crate) base_prefix: String,
84}
85
86impl CheckpointPaths {
87    /// Create a new path generator with the given base prefix.
88    ///
89    /// The prefix should end with `/` (one is appended if missing).
90    #[must_use]
91    pub fn new(base_prefix: &str) -> Self {
92        let base_prefix = if base_prefix.ends_with('/') {
93            base_prefix.to_string()
94        } else {
95            format!("{base_prefix}/")
96        };
97        Self { base_prefix }
98    }
99
100    /// Path to the `_latest` pointer file.
101    #[must_use]
102    pub fn latest_pointer(&self) -> String {
103        format!("{}_{}", self.base_prefix, "latest")
104    }
105
106    /// Root directory for a specific checkpoint.
107    #[must_use]
108    pub fn checkpoint_dir(&self, id: &CheckpointId) -> String {
109        format!("{}{}/", self.base_prefix, id)
110    }
111
112    /// Path to the manifest file for a checkpoint.
113    #[must_use]
114    pub fn manifest(&self, id: &CheckpointId) -> String {
115        format!("{}{}manifest.json", self.base_prefix, id)
116    }
117
118    /// Path to a full state snapshot for an operator partition.
119    #[must_use]
120    pub fn snapshot(&self, id: &CheckpointId, operator: &str, partition: u32) -> String {
121        format!(
122            "{}{}operators/{}/partition-{partition}.snap",
123            self.base_prefix, id, operator
124        )
125    }
126
127    /// Path to an incremental delta for an operator partition.
128    #[must_use]
129    pub fn delta(&self, id: &CheckpointId, operator: &str, partition: u32) -> String {
130        format!(
131            "{}{}operators/{}/partition-{partition}.delta",
132            self.base_prefix, id, operator
133        )
134    }
135
136    /// Path to a source offset file.
137    #[must_use]
138    pub fn source_offset(&self, id: &CheckpointId, source_name: &str) -> String {
139        format!("{}{}offsets/{source_name}.json", self.base_prefix, id)
140    }
141}
142
143impl Default for CheckpointPaths {
144    fn default() -> Self {
145        Self::new("checkpoints/")
146    }
147}
148
/// V2 checkpoint manifest for distributed object-store checkpoints.
///
/// This extends the file-system [`CheckpointManifest`](super::super::CheckpointManifest)
/// with per-operator partition entries, incremental delta support, and
/// source offset tracking suitable for multi-partition recovery.
///
/// When deserializing, only `version`, `checkpoint_id`, `epoch`, and
/// `timestamp_ms` are required. Every field marked `#[serde(default)]`
/// falls back to its `Default` value (empty map, `None`, or `0`) when it is
/// absent from the input.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CheckpointManifestV2 {
    /// Manifest format version (always 2 for this type).
    ///
    /// [`CheckpointManifestV2::new`] writes `2`; the derived deserializer
    /// accepts whatever value is present without validating it.
    pub version: u32,
    /// Unique checkpoint identifier (UUID v7).
    pub checkpoint_id: CheckpointId,
    /// Monotonically increasing epoch number.
    pub epoch: u64,
    /// Timestamp when checkpoint was created (millis since Unix epoch).
    pub timestamp_ms: u64,

    /// Per-operator snapshot entries, keyed by operator name.
    /// Defaults to an empty map when missing from the serialized form.
    #[serde(default)]
    pub operators: HashMap<String, OperatorSnapshotEntry>,

    /// Per-source offset entries, keyed by source name.
    /// Defaults to an empty map when missing from the serialized form.
    #[serde(default)]
    pub source_offsets: HashMap<String, SourceOffsetEntry>,

    /// Parent checkpoint ID for incremental checkpoints.
    /// `None` when the checkpoint has no parent or the field is missing.
    #[serde(default)]
    pub parent_id: Option<CheckpointId>,

    /// Global watermark at checkpoint time.
    /// `None` when no watermark was recorded or the field is missing.
    #[serde(default)]
    pub watermark: Option<i64>,

    /// Total size of all checkpoint artifacts in bytes.
    /// Defaults to `0` when missing; this type does not compute it.
    #[serde(default)]
    pub total_size_bytes: u64,
}
185
186impl CheckpointManifestV2 {
187    /// Create a new V2 manifest.
188    #[must_use]
189    #[allow(clippy::cast_possible_truncation)]
190    pub fn new(checkpoint_id: CheckpointId, epoch: u64) -> Self {
191        let timestamp_ms = std::time::SystemTime::now()
192            .duration_since(std::time::UNIX_EPOCH)
193            .unwrap_or_default()
194            .as_millis() as u64;
195
196        Self {
197            version: 2,
198            checkpoint_id,
199            epoch,
200            timestamp_ms,
201            operators: HashMap::new(),
202            source_offsets: HashMap::new(),
203            parent_id: None,
204            watermark: None,
205            total_size_bytes: 0,
206        }
207    }
208}
209
/// Per-operator snapshot metadata in a V2 manifest.
///
/// Stored as the value type of `CheckpointManifestV2::operators`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OperatorSnapshotEntry {
    /// Per-partition snapshot/delta entries.
    /// Required when deserializing (no serde default).
    pub partitions: Vec<PartitionSnapshotEntry>,
    /// Total bytes across all partitions.
    /// Defaults to `0` when missing; this type does not derive it from
    /// `partitions`, so the writer is responsible for keeping it in sync.
    #[serde(default)]
    pub total_bytes: u64,
}
219
/// Per-partition snapshot or delta entry.
///
/// Describes one artifact (either a full snapshot or an incremental delta)
/// for a single partition of an operator.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartitionSnapshotEntry {
    /// Partition index.
    pub partition_id: u32,
    /// Whether this is a full snapshot or an incremental delta
    /// (`true` = delta, `false` = full snapshot).
    pub is_delta: bool,
    /// Object-store path to the artifact (relative to checkpoint dir).
    pub path: String,
    /// Size in bytes.
    pub size_bytes: u64,
    /// SHA-256 hex digest for integrity verification.
    /// `None` when no digest was recorded or the field is missing.
    #[serde(default)]
    pub sha256: Option<String>,
}
235
/// Per-source offset entry for exactly-once recovery.
///
/// Stored as the value type of `CheckpointManifestV2::source_offsets`.
/// All three fields are required when deserializing (none has a serde
/// default).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SourceOffsetEntry {
    /// Source type (e.g., "kafka", "postgres-cdc", "file").
    pub source_type: String,
    /// Source-specific offset data (key-value pairs).
    /// Keys and values are opaque strings as far as this type is concerned.
    pub offsets: HashMap<String, String>,
    /// Epoch this offset entry belongs to.
    pub epoch: u64,
}
246
#[cfg(test)]
mod tests {
    use super::*;

    /// IDs minted one after another must sort in creation order, both as
    /// values and in their string form.
    #[test]
    fn test_checkpoint_id_time_sortable() {
        let earlier = CheckpointId::now();
        // The v7 timestamp has millisecond precision, so step past one tick.
        let tick = std::time::Duration::from_millis(2);
        std::thread::sleep(tick);
        let later = CheckpointId::now();

        assert!(earlier < later, "UUID v7 should be time-sortable");

        let (earlier_text, later_text) = (earlier.to_string_id(), later.to_string_id());
        assert!(earlier_text < later_text);
    }

    /// `Display` must produce the 36-character, four-hyphen UUID form.
    #[test]
    fn test_checkpoint_id_display() {
        let rendered = CheckpointId::now().to_string();
        // 8-4-4-4-12 grouping: 32 hex digits plus 4 separators.
        let hyphens = rendered.matches('-').count();

        assert_eq!(rendered.len(), 36);
        assert_eq!(hyphens, 4);
    }

    /// Every path helper must end with the expected artifact name.
    #[test]
    fn test_checkpoint_paths() {
        let id = CheckpointId::now();
        let id_text = id.to_string();
        let paths = CheckpointPaths::new("s3://my-bucket/checkpoints");

        assert!(paths.latest_pointer().ends_with("_latest"));

        let manifest_path = paths.manifest(&id);
        assert!(manifest_path.ends_with("manifest.json"));
        assert!(manifest_path.contains(id_text.as_str()));

        let snapshot_path = paths.snapshot(&id, "window-agg", 3);
        assert!(snapshot_path.contains("operators/window-agg/"));
        assert!(snapshot_path.ends_with("partition-3.snap"));

        let delta_path = paths.delta(&id, "window-agg", 3);
        assert!(delta_path.ends_with("partition-3.delta"));

        let offset_path = paths.source_offset(&id, "kafka-trades");
        assert!(offset_path.ends_with("kafka-trades.json"));
    }

    /// A prefix with and without a trailing `/` must yield identical paths.
    #[test]
    fn test_checkpoint_paths_trailing_slash() {
        let id = CheckpointId::now();
        let with_slash = CheckpointPaths::new("prefix/");
        let without_slash = CheckpointPaths::new("prefix");

        assert_eq!(with_slash.manifest(&id), without_slash.manifest(&id));
    }

    /// A fully populated manifest must survive a JSON serialize/deserialize
    /// round trip.
    #[test]
    fn test_manifest_v2_json_round_trip() {
        let id = CheckpointId::now();

        let full_partition = PartitionSnapshotEntry {
            partition_id: 0,
            is_delta: false,
            path: String::from("operators/window-agg/partition-0.snap"),
            size_bytes: 1024,
            sha256: Some(String::from("abcd1234")),
        };
        let delta_partition = PartitionSnapshotEntry {
            partition_id: 1,
            is_delta: true,
            path: String::from("operators/window-agg/partition-1.delta"),
            size_bytes: 256,
            sha256: None,
        };
        let window_agg = OperatorSnapshotEntry {
            partitions: vec![full_partition, delta_partition],
            total_bytes: 1280,
        };

        let kafka_offsets: HashMap<String, String> =
            [("partition-0", "1234"), ("partition-1", "5678")]
                .iter()
                .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                .collect();
        let kafka_trades = SourceOffsetEntry {
            source_type: String::from("kafka"),
            offsets: kafka_offsets,
            epoch: 10,
        };

        let mut manifest = CheckpointManifestV2::new(id, 10);
        manifest.watermark = Some(5000);
        manifest.parent_id = Some(CheckpointId::now());
        manifest
            .operators
            .insert(String::from("window-agg"), window_agg);
        manifest
            .source_offsets
            .insert(String::from("kafka-trades"), kafka_trades);

        let json = serde_json::to_string_pretty(&manifest).unwrap();
        let restored: CheckpointManifestV2 = serde_json::from_str(&json).unwrap();

        // Top-level scalar fields.
        assert_eq!(restored.version, 2);
        assert_eq!(restored.checkpoint_id, id);
        assert_eq!(restored.epoch, 10);
        assert_eq!(restored.watermark, Some(5000));
        assert!(restored.parent_id.is_some());

        // Operator entry.
        let op = restored.operators.get("window-agg").unwrap();
        assert_eq!(op.partitions.len(), 2);
        assert_eq!(op.total_bytes, 1280);

        // Source offset entry.
        let src = restored.source_offsets.get("kafka-trades").unwrap();
        assert_eq!(src.source_type, "kafka");
        assert_eq!(
            src.offsets.get("partition-0").map(String::as_str),
            Some("1234")
        );
    }

    /// A manifest containing only the required fields must deserialize, with
    /// every optional field taking its default.
    #[test]
    fn test_manifest_v2_backward_compat_missing_fields() {
        let id = CheckpointId::now();
        let json = format!(
            r#"{{
                "version": 2,
                "checkpoint_id": "{id}",
                "epoch": 1,
                "timestamp_ms": 1000
            }}"#
        );

        let manifest = serde_json::from_str::<CheckpointManifestV2>(&json).unwrap();

        assert_eq!(manifest.version, 2);
        assert!(manifest.operators.is_empty());
        assert!(manifest.source_offsets.is_empty());
        assert!(manifest.parent_id.is_none());
        assert!(manifest.watermark.is_none());
    }
}