Skip to main content

laminar_storage/
checkpoint_store.rs

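//! Checkpoint persistence via the [`CheckpointStore`] trait.
//!
//! Provides a filesystem-backed implementation ([`FileSystemCheckpointStore`])
//! that writes manifests as atomic JSON files with a `latest.txt` pointer
//! for crash-safe recovery, and an object-store-backed implementation
//! ([`ObjectStoreCheckpointStore`]) that writes manifests under `manifests/`
//! with a `latest.json` pointer.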
6//!
7//! ## Disk Layout
8//!
9//! ```text
10//! {base_dir}/checkpoints/
11//!   checkpoint_000001/
12//!     manifest.json     # CheckpointManifest as pretty-printed JSON
13//!     state.bin         # Optional: large operator state sidecar
14//!   checkpoint_000002/
15//!     manifest.json
16//!   latest.txt          # "checkpoint_000002" — pointer to latest good checkpoint
17//! ```
18
19use std::path::{Path, PathBuf};
20use std::sync::Arc;
21
22use object_store::{GetOptions, ObjectStore, PutMode, PutOptions, PutPayload};
23use sha2::{Digest, Sha256};
24use tracing::warn;
25
26use crate::checkpoint_manifest::CheckpointManifest;
27
/// Flushes a file's data and metadata to stable storage.
///
/// The handle is opened writable (never created or truncated) because
/// Windows' `FlushFileBuffers` rejects read-only handles.
fn sync_file(path: &Path) -> Result<(), std::io::Error> {
    std::fs::OpenOptions::new()
        .write(true)
        .open(path)
        .and_then(|file| file.sync_all())
}
34
/// Flushes a directory's entries so that prior renames/creations inside it
/// survive a crash.
///
/// Only Unix supports opening and syncing a directory handle; on other
/// platforms this is a no-op that always succeeds.
fn sync_dir(path: &Path) -> Result<(), std::io::Error> {
    if cfg!(unix) {
        let dir_handle = std::fs::File::open(path)?;
        dir_handle.sync_all()?;
    }
    Ok(())
}
52
53/// Errors from checkpoint store operations.
54#[derive(Debug, thiserror::Error)]
55pub enum CheckpointStoreError {
56    /// I/O error during checkpoint persistence.
57    #[error("checkpoint I/O error: {0}")]
58    Io(#[from] std::io::Error),
59
60    /// JSON serialization/deserialization error.
61    #[error("checkpoint serialization error: {0}")]
62    Serde(#[from] serde_json::Error),
63
64    /// Checkpoint not found.
65    #[error("checkpoint {0} not found")]
66    NotFound(u64),
67
68    /// Object store error.
69    #[error("object store error: {0}")]
70    ObjectStore(#[from] object_store::Error),
71}
72
73// ---------------------------------------------------------------------------
74// Checkpoint validation types
75// ---------------------------------------------------------------------------
76
/// Result of validating a single checkpoint.
///
/// `issues` may be non-empty even when `valid` is `true`: the default
/// [`CheckpointStore::validate_checkpoint`] treats manifest-consistency
/// findings as advisory and only rejects checkpoints for hard failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValidationResult {
    /// Checkpoint ID that was validated.
    pub checkpoint_id: u64,
    /// Whether the checkpoint is usable for recovery.
    pub valid: bool,
    /// Human-readable findings (both advisory and fatal).
    pub issues: Vec<String>,
}
87
/// Report from a crash-safe recovery walk.
///
/// Captures which checkpoints were tried, which were skipped (and why),
/// and which was ultimately chosen for recovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveryReport {
    /// The checkpoint selected for recovery (`None` means fresh start).
    pub chosen_id: Option<u64>,
    /// Checkpoints that were examined and rejected, as `(id, reason)`, in the
    /// order they were examined.
    pub skipped: Vec<(u64, String)>,
    /// Total number of candidate checkpoints found.
    pub examined: usize,
    /// Wall-clock time spent on the recovery walk.
    pub elapsed: std::time::Duration,
}
103
104/// Compute SHA-256 hex digest of data.
105fn sha256_hex(data: &[u8]) -> String {
106    let mut hasher = Sha256::new();
107    hasher.update(data);
108    format!("{:x}", hasher.finalize())
109}
110
111/// Trait for checkpoint persistence backends.
112///
113/// Implementations must guarantee atomic manifest writes (readers never see
114/// a partial manifest). The `latest.txt` pointer is updated only after the
115/// manifest is fully written and synced.
116pub trait CheckpointStore: Send + Sync {
117    /// Persists a checkpoint manifest atomically.
118    ///
119    /// The implementation writes to a temporary file and renames on success
120    /// to prevent partial writes from being visible.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
125    fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError>;
126
127    /// Loads the most recent checkpoint manifest.
128    ///
129    /// Returns `Ok(None)` if no checkpoint exists yet.
130    ///
131    /// # Errors
132    ///
133    /// Returns [`CheckpointStoreError`] on I/O or deserialization failure.
134    fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
135
136    /// Loads a specific checkpoint manifest by ID.
137    ///
138    /// # Errors
139    ///
140    /// Returns [`CheckpointStoreError::NotFound`] if the checkpoint does not exist.
141    fn load_by_id(&self, id: u64) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
142
143    /// Lists all available checkpoints as `(checkpoint_id, epoch)` pairs.
144    ///
145    /// Results are sorted by checkpoint ID ascending.
146    ///
147    /// # Errors
148    ///
149    /// Returns [`CheckpointStoreError`] on I/O failure.
150    fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError>;
151
152    /// Lists all checkpoint IDs without loading manifests.
153    ///
154    /// This is used by crash recovery to enumerate candidates including
155    /// those with corrupt manifests. Results are sorted ascending.
156    ///
157    /// # Errors
158    ///
159    /// Returns [`CheckpointStoreError`] on I/O failure.
160    fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
161        // Default implementation: extract IDs from list().
162        Ok(self.list()?.iter().map(|(id, _)| *id).collect())
163    }
164
165    /// Prunes old checkpoints, keeping at most `keep_count` recent ones.
166    ///
167    /// Returns the number of checkpoints removed.
168    ///
169    /// # Errors
170    ///
171    /// Returns [`CheckpointStoreError`] on I/O failure.
172    fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError>;
173
174    /// Overwrites an existing checkpoint manifest.
175    ///
176    /// Used by Step 6b of the checkpoint protocol to update sink commit
177    /// statuses after the initial `save()`. Unlike `save()`, this does NOT
178    /// use conditional PUT — it unconditionally overwrites the manifest.
179    ///
180    /// The default implementation delegates to `save()`, which is correct
181    /// for backends where `save()` is idempotent (e.g., filesystem with
182    /// write-to-temp + rename).
183    ///
184    /// # Errors
185    ///
186    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
187    fn update_manifest(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
188        self.save(manifest)
189    }
190
191    /// Writes large operator state data to a sidecar file for a given checkpoint.
192    ///
193    /// # Errors
194    ///
195    /// Returns [`CheckpointStoreError`] on I/O failure.
196    fn save_state_data(&self, id: u64, data: &[u8]) -> Result<(), CheckpointStoreError>;
197
198    /// Loads large operator state data from a sidecar file.
199    ///
200    /// Returns `Ok(None)` if no sidecar file exists.
201    ///
202    /// # Errors
203    ///
204    /// Returns [`CheckpointStoreError`] on I/O failure.
205    fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError>;
206
207    /// Validate a specific checkpoint's integrity.
208    ///
209    /// Checks that the manifest is parseable and, if a `state_checksum` is
210    /// present, verifies the sidecar data matches.
211    ///
212    /// # Errors
213    ///
214    /// Returns [`CheckpointStoreError`] on I/O failure.
215    fn validate_checkpoint(&self, id: u64) -> Result<ValidationResult, CheckpointStoreError> {
216        let mut issues = Vec::new();
217
218        // Load manifest — corrupt JSON is a validation failure, not an I/O error.
219        let manifest = match self.load_by_id(id) {
220            Ok(Some(m)) => m,
221            Ok(None) => {
222                return Ok(ValidationResult {
223                    checkpoint_id: id,
224                    valid: false,
225                    issues: vec![format!("manifest not found for checkpoint {id}")],
226                });
227            }
228            Err(CheckpointStoreError::Serde(e)) => {
229                return Ok(ValidationResult {
230                    checkpoint_id: id,
231                    valid: false,
232                    issues: vec![format!("corrupt manifest: {e}")],
233                });
234            }
235            Err(e) => return Err(e),
236        };
237
238        // Basic manifest validation
239        for err in manifest.validate() {
240            issues.push(format!("manifest validation: {err}"));
241        }
242
243        // Verify state sidecar checksum
244        if let Some(expected) = &manifest.state_checksum {
245            match self.load_state_data(id)? {
246                Some(data) => {
247                    let actual = sha256_hex(&data);
248                    if actual != *expected {
249                        issues.push(format!(
250                            "state.bin checksum mismatch: expected {expected}, got {actual}"
251                        ));
252                    }
253                }
254                None => {
255                    issues.push("state.bin referenced by checksum but not found".into());
256                }
257            }
258        }
259
260        // epoch=0 or checkpoint_id=0 indicates a corrupted or nonsensical
261        // manifest — reject as invalid regardless of other issues.
262        if manifest.epoch == 0 || manifest.checkpoint_id == 0 {
263            issues.push("epoch or checkpoint_id is 0 — likely corrupted".into());
264            return Ok(ValidationResult {
265                checkpoint_id: id,
266                valid: false,
267                issues,
268            });
269        }
270
271        let valid =
272            issues.is_empty() || issues.iter().all(|i| i.starts_with("manifest validation:"));
273        Ok(ValidationResult {
274            checkpoint_id: id,
275            valid,
276            issues,
277        })
278    }
279
280    /// Walk backward from latest to find the first valid checkpoint.
281    ///
282    /// Returns a [`RecoveryReport`] describing the walk. If no valid
283    /// checkpoint is found, `chosen_id` is `None` (fresh start).
284    ///
285    /// # Errors
286    ///
287    /// Returns [`CheckpointStoreError`] on I/O failure.
288    fn recover_latest_validated(&self) -> Result<RecoveryReport, CheckpointStoreError> {
289        let start = std::time::Instant::now();
290        let mut skipped = Vec::new();
291
292        // Get all checkpoint IDs sorted descending (newest first).
293        // Uses list_ids() instead of list() so corrupt manifests are still
294        // enumerated (list() silently skips them).
295        let mut ids = self.list_ids()?;
296        ids.sort_unstable();
297        ids.reverse();
298
299        let examined = ids.len();
300
301        for id in &ids {
302            let result = self.validate_checkpoint(*id)?;
303            if result.valid {
304                return Ok(RecoveryReport {
305                    chosen_id: Some(*id),
306                    skipped,
307                    examined,
308                    elapsed: start.elapsed(),
309                });
310            }
311            let reason = result.issues.join("; ");
312            warn!(
313                checkpoint_id = id,
314                reason = %reason,
315                "skipping invalid checkpoint"
316            );
317            skipped.push((*id, reason));
318        }
319
320        Ok(RecoveryReport {
321            chosen_id: None,
322            skipped,
323            examined,
324            elapsed: start.elapsed(),
325        })
326    }
327
328    /// Delete orphaned state files that have no matching manifest.
329    ///
330    /// Returns the number of orphans cleaned up.
331    ///
332    /// # Errors
333    ///
334    /// Returns [`CheckpointStoreError`] on I/O failure.
335    fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
336        // Default: no-op. Overridden by implementations that can detect orphans.
337        Ok(0)
338    }
339
340    /// Atomically saves a checkpoint manifest with optional sidecar state data.
341    ///
342    /// When `state_data` is provided, the sidecar (`state.bin`) is written and
343    /// fsynced **before** the manifest. This ensures that if the sidecar write
344    /// fails, the manifest is never persisted and `latest.txt` still points to
345    /// the previous valid checkpoint.
346    ///
347    /// Orphaned `state.bin` files (written but no manifest) are harmless and
348    /// cleaned up by [`prune()`](Self::prune).
349    ///
350    /// # Errors
351    ///
352    /// Returns [`CheckpointStoreError`] on I/O or serialization failure.
353    fn save_with_state(
354        &self,
355        manifest: &CheckpointManifest,
356        state_data: Option<&[u8]>,
357    ) -> Result<(), CheckpointStoreError> {
358        let mut manifest = manifest.clone();
359        if let Some(data) = state_data {
360            // Compute checksum from in-memory bytes before writing. This is safe
361            // because: (1) save_state_data writes to a temp file then renames
362            // atomically, so the on-disk bytes match the in-memory data exactly;
363            // (2) if the sidecar write fails, save() is never called, so the
364            // manifest with the checksum is never persisted.
365            manifest.state_checksum = Some(sha256_hex(data));
366            self.save_state_data(manifest.checkpoint_id, data)?;
367        }
368        self.save(&manifest)
369    }
370}
371
/// Filesystem-backed checkpoint store.
///
/// Writes checkpoint manifests as JSON files with atomic rename semantics.
/// A `latest.txt` pointer file (rather than a symlink, for Windows
/// compatibility) tracks the most recent checkpoint.
#[derive(Debug)]
pub struct FileSystemCheckpointStore {
    /// Parent directory; checkpoints live under `{base_dir}/checkpoints/`.
    base_dir: PathBuf,
    /// Auto-prune target applied after each save; `0` disables auto-pruning.
    max_retained: usize,
}
381
382impl FileSystemCheckpointStore {
383    /// Creates a new filesystem checkpoint store.
384    ///
385    /// The `base_dir` is the parent directory; checkpoints are stored under
386    /// `{base_dir}/checkpoints/`. The directory is created lazily on first save.
387    #[must_use]
388    pub fn new(base_dir: impl Into<PathBuf>, max_retained: usize) -> Self {
389        Self {
390            base_dir: base_dir.into(),
391            max_retained,
392        }
393    }
394
395    /// Returns the checkpoints directory path.
396    fn checkpoints_dir(&self) -> PathBuf {
397        self.base_dir.join("checkpoints")
398    }
399
400    /// Returns the directory path for a specific checkpoint.
401    fn checkpoint_dir(&self, id: u64) -> PathBuf {
402        self.checkpoints_dir().join(format!("checkpoint_{id:06}"))
403    }
404
405    /// Returns the manifest file path for a specific checkpoint.
406    fn manifest_path(&self, id: u64) -> PathBuf {
407        self.checkpoint_dir(id).join("manifest.json")
408    }
409
410    /// Returns the state sidecar file path for a specific checkpoint.
411    fn state_path(&self, id: u64) -> PathBuf {
412        self.checkpoint_dir(id).join("state.bin")
413    }
414
415    /// Returns the latest.txt pointer path.
416    fn latest_path(&self) -> PathBuf {
417        self.checkpoints_dir().join("latest.txt")
418    }
419
420    /// Parses a checkpoint ID from a directory name like `checkpoint_000042`.
421    fn parse_checkpoint_id(name: &str) -> Option<u64> {
422        name.strip_prefix("checkpoint_")
423            .and_then(|s| s.parse().ok())
424    }
425
426    /// Collects and sorts all checkpoint directory entries.
427    fn sorted_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
428        let dir = self.checkpoints_dir();
429        if !dir.exists() {
430            return Ok(Vec::new());
431        }
432
433        let mut ids: Vec<u64> = std::fs::read_dir(&dir)?
434            .filter_map(Result::ok)
435            .filter(|e| e.path().is_dir())
436            .filter_map(|e| e.file_name().to_str().and_then(Self::parse_checkpoint_id))
437            .collect();
438
439        ids.sort_unstable();
440        Ok(ids)
441    }
442}
443
444impl FileSystemCheckpointStore {
445    /// Find checkpoint directories that have state.bin but no manifest.json
446    /// (orphaned from a crash after sidecar write but before manifest commit).
447    fn find_orphan_dirs(&self) -> Result<Vec<PathBuf>, CheckpointStoreError> {
448        let dir = self.checkpoints_dir();
449        if !dir.exists() {
450            return Ok(Vec::new());
451        }
452
453        let mut orphans = Vec::new();
454        for entry in std::fs::read_dir(&dir)? {
455            let entry = entry?;
456            let path = entry.path();
457            if !path.is_dir() {
458                continue;
459            }
460            let has_state = path.join("state.bin").exists();
461            let has_manifest = path.join("manifest.json").exists();
462            if has_state && !has_manifest {
463                orphans.push(path);
464            }
465        }
466        Ok(orphans)
467    }
468}
469
470impl CheckpointStore for FileSystemCheckpointStore {
471    fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
472        let cp_dir = self.checkpoint_dir(manifest.checkpoint_id);
473        std::fs::create_dir_all(&cp_dir)?;
474
475        let manifest_path = self.manifest_path(manifest.checkpoint_id);
476        let json = serde_json::to_string_pretty(manifest)?;
477
478        // Write to a temp file, fsync, then rename for atomic durability
479        let tmp_path = manifest_path.with_extension("json.tmp");
480        std::fs::write(&tmp_path, &json)?;
481        sync_file(&tmp_path)?;
482        std::fs::rename(&tmp_path, &manifest_path)?;
483        sync_dir(&cp_dir)?;
484
485        // Update latest.txt pointer — only after manifest is durable
486        let latest = self.latest_path();
487        let latest_dir = latest.parent().unwrap_or(Path::new("."));
488        std::fs::create_dir_all(latest_dir)?;
489        let latest_content = format!("checkpoint_{:06}", manifest.checkpoint_id);
490        let tmp_latest = latest.with_extension("txt.tmp");
491        std::fs::write(&tmp_latest, &latest_content)?;
492        sync_file(&tmp_latest)?;
493        std::fs::rename(&tmp_latest, &latest)?;
494        sync_dir(latest_dir)?;
495
496        // Auto-prune if configured
497        if self.max_retained > 0 {
498            if let Err(e) = self.prune(self.max_retained) {
499                tracing::warn!(
500                    max_retained = self.max_retained,
501                    error = %e,
502                    "[LDB-6009] Checkpoint prune failed — old checkpoints may accumulate on disk"
503                );
504            }
505        }
506
507        Ok(())
508    }
509
510    fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
511        let latest = self.latest_path();
512        if !latest.exists() {
513            return Ok(None);
514        }
515
516        let content = std::fs::read_to_string(&latest)?;
517        let dir_name = content.trim();
518        if dir_name.is_empty() {
519            return Ok(None);
520        }
521
522        let id = Self::parse_checkpoint_id(dir_name);
523        match id {
524            Some(id) => self.load_by_id(id),
525            None => Ok(None),
526        }
527    }
528
529    fn load_by_id(&self, id: u64) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
530        let path = self.manifest_path(id);
531        if !path.exists() {
532            return Ok(None);
533        }
534
535        let json = std::fs::read_to_string(&path)?;
536        let manifest: CheckpointManifest = serde_json::from_str(&json)?;
537
538        // Validate manifest consistency on load
539        let errors = manifest.validate();
540        if !errors.is_empty() {
541            tracing::warn!(
542                checkpoint_id = id,
543                error_count = errors.len(),
544                first_error = %errors[0],
545                "loaded checkpoint manifest has validation warnings"
546            );
547        }
548
549        Ok(Some(manifest))
550    }
551
552    fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
553        self.sorted_checkpoint_ids()
554    }
555
556    fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
557        let ids = self.sorted_checkpoint_ids()?;
558        let mut result = Vec::with_capacity(ids.len());
559
560        for id in ids {
561            // Skip missing/corrupt manifests — list() is best-effort.
562            if let Ok(Some(manifest)) = self.load_by_id(id) {
563                result.push((manifest.checkpoint_id, manifest.epoch));
564            }
565        }
566
567        Ok(result)
568    }
569
570    fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
571        let ids = self.sorted_checkpoint_ids()?;
572        if ids.len() <= keep_count {
573            return Ok(0);
574        }
575
576        let to_remove = ids.len() - keep_count;
577        let mut removed = 0;
578
579        for &id in &ids[..to_remove] {
580            let dir = self.checkpoint_dir(id);
581            if std::fs::remove_dir_all(&dir).is_ok() {
582                removed += 1;
583            }
584        }
585
586        Ok(removed)
587    }
588
589    fn save_state_data(&self, id: u64, data: &[u8]) -> Result<(), CheckpointStoreError> {
590        let cp_dir = self.checkpoint_dir(id);
591        std::fs::create_dir_all(&cp_dir)?;
592
593        let path = self.state_path(id);
594        let tmp = path.with_extension("bin.tmp");
595        std::fs::write(&tmp, data)?;
596        sync_file(&tmp)?;
597        std::fs::rename(&tmp, &path)?;
598        sync_dir(&cp_dir)?;
599
600        Ok(())
601    }
602
603    fn save_with_state(
604        &self,
605        manifest: &CheckpointManifest,
606        state_data: Option<&[u8]>,
607    ) -> Result<(), CheckpointStoreError> {
608        let mut manifest = manifest.clone();
609        // Write sidecar FIRST — if this fails, manifest is never written
610        // and latest.txt still points to the previous valid checkpoint.
611        if let Some(data) = state_data {
612            manifest.state_checksum = Some(sha256_hex(data));
613            self.save_state_data(manifest.checkpoint_id, data)?;
614        }
615        self.save(&manifest)
616    }
617
618    fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
619        let orphans = self.find_orphan_dirs()?;
620        let mut cleaned = 0;
621        for dir in &orphans {
622            if std::fs::remove_dir_all(dir).is_ok() {
623                tracing::info!(
624                    path = %dir.display(),
625                    "cleaned up orphaned checkpoint directory"
626                );
627                cleaned += 1;
628            }
629        }
630        Ok(cleaned)
631    }
632
633    fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
634        let path = self.state_path(id);
635        if !path.exists() {
636            return Ok(None);
637        }
638        let data = std::fs::read(&path)?;
639        Ok(Some(data))
640    }
641}
642
643// ---------------------------------------------------------------------------
644// ObjectStoreCheckpointStore — sync wrapper around any ObjectStore backend
645// ---------------------------------------------------------------------------
646
647/// JSON pointer stored in `manifests/latest.json`.
648#[derive(serde::Serialize, serde::Deserialize)]
649struct LatestPointer {
650    checkpoint_id: u64,
651}
652
653/// Object-store-backed checkpoint store with hierarchical layout.
654///
655/// Bridges the sync [`CheckpointStore`] trait to the async [`ObjectStore`] API
656/// via a dedicated single-threaded Tokio runtime. Checkpoints run infrequently
657/// (every ~10s) and are not on the hot path.
658///
659/// ## Object Layout (v2 — hierarchical)
660///
661/// ```text
662/// {prefix}/
663///   manifests/
664///     manifest-000001.json    # Checkpoint manifest (JSON)
665///     manifest-000002.json
666///     latest.json             # {"checkpoint_id": 2}
667///   checkpoints/
668///     state-000001.bin        # Optional sidecar state
669///     state-000002.bin
670/// ```
671///
672/// ## Legacy Layout (v1 — flat, read-only)
673///
674/// ```text
675/// {prefix}/checkpoints/
676///   checkpoint_000001/
677///     manifest.json
678///     state.bin
679///   latest.txt                # "checkpoint_000002"
680/// ```
681///
682/// Reads check v2 paths first, then fall back to v1 for backward compatibility.
683/// Writes always use v2 layout. Manifest writes use [`PutMode::Create`] for
684/// split-brain prevention (conditional PUT).
685pub struct ObjectStoreCheckpointStore {
686    store: Arc<dyn ObjectStore>,
687    prefix: String,
688    max_retained: usize,
689    /// Dedicated single-threaded runtime for async→sync bridging.
690    /// Separate from the application runtime to avoid `block_on` reentrancy.
691    rt: tokio::runtime::Runtime,
692}
693
694impl ObjectStoreCheckpointStore {
695    /// Create a new object-store-backed checkpoint store.
696    ///
697    /// `prefix` is prepended to all object paths (e.g., `"nodes/abc123/"`).
698    /// It should end with `/` or be empty.
699    ///
700    /// # Errors
701    ///
702    /// Returns `std::io::Error` if the internal Tokio runtime cannot be created.
703    pub fn new(
704        store: Arc<dyn ObjectStore>,
705        prefix: String,
706        max_retained: usize,
707    ) -> std::io::Result<Self> {
708        let rt = tokio::runtime::Builder::new_current_thread()
709            .enable_all()
710            .build()?;
711        Ok(Self {
712            store,
713            prefix,
714            max_retained,
715            rt,
716        })
717    }
718
719    // ── v2 (hierarchical) paths ──
720
721    fn manifest_path(&self, id: u64) -> object_store::path::Path {
722        object_store::path::Path::from(format!("{}manifests/manifest-{id:06}.json", self.prefix))
723    }
724
725    fn latest_pointer_path(&self) -> object_store::path::Path {
726        object_store::path::Path::from(format!("{}manifests/latest.json", self.prefix))
727    }
728
729    fn state_path(&self, id: u64) -> object_store::path::Path {
730        object_store::path::Path::from(format!("{}checkpoints/state-{id:06}.bin", self.prefix))
731    }
732
733    // ── v1 (legacy) paths — for backward-compat reads only ──
734
735    fn legacy_manifest_path(&self, id: u64) -> object_store::path::Path {
736        object_store::path::Path::from(format!(
737            "{}checkpoints/checkpoint_{id:06}/manifest.json",
738            self.prefix
739        ))
740    }
741
742    fn legacy_state_path(&self, id: u64) -> object_store::path::Path {
743        object_store::path::Path::from(format!(
744            "{}checkpoints/checkpoint_{id:06}/state.bin",
745            self.prefix
746        ))
747    }
748
749    fn legacy_latest_path(&self) -> object_store::path::Path {
750        object_store::path::Path::from(format!("{}checkpoints/latest.txt", self.prefix))
751    }
752
753    // ── Helpers ──
754
755    /// GET an object, returning `Ok(None)` for `NotFound`.
756    fn get_bytes(
757        &self,
758        path: &object_store::path::Path,
759    ) -> Result<Option<bytes::Bytes>, CheckpointStoreError> {
760        let result = self
761            .rt
762            .block_on(async { self.store.get_opts(path, GetOptions::default()).await });
763
764        match result {
765            Ok(get_result) => {
766                let data = self.rt.block_on(async { get_result.bytes().await })?;
767                Ok(Some(data))
768            }
769            Err(object_store::Error::NotFound { .. }) => Ok(None),
770            Err(e) => Err(CheckpointStoreError::ObjectStore(e)),
771        }
772    }
773
774    /// Load a manifest from a specific path, returning `Ok(None)` for `NotFound`.
775    fn load_manifest_at(
776        &self,
777        path: &object_store::path::Path,
778    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
779        match self.get_bytes(path)? {
780            Some(data) => {
781                let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
782                Ok(Some(manifest))
783            }
784            None => Ok(None),
785        }
786    }
787
788    /// List checkpoint IDs by scanning both v2 and v1 layouts.
789    fn list_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
790        let mut ids = std::collections::BTreeSet::new();
791
792        // v2 layout: manifests/manifest-NNNNNN.json
793        let manifests_prefix = object_store::path::Path::from(format!("{}manifests/", self.prefix));
794        let entries: Vec<_> = self.rt.block_on(async {
795            use futures::TryStreamExt;
796            self.store
797                .list(Some(&manifests_prefix))
798                .try_collect::<Vec<_>>()
799                .await
800        })?;
801        for entry in &entries {
802            let path_str = entry.location.as_ref();
803            for segment in path_str.split('/') {
804                if let Some(rest) = segment.strip_prefix("manifest-") {
805                    if let Some(id_str) = rest.strip_suffix(".json") {
806                        if let Ok(id) = id_str.parse::<u64>() {
807                            ids.insert(id);
808                        }
809                    }
810                }
811            }
812        }
813
814        // v1 layout: checkpoints/checkpoint_NNNNNN/manifest.json
815        let checkpoints_prefix =
816            object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
817        let entries: Vec<_> = self.rt.block_on(async {
818            use futures::TryStreamExt;
819            self.store
820                .list(Some(&checkpoints_prefix))
821                .try_collect::<Vec<_>>()
822                .await
823        })?;
824        for entry in &entries {
825            let path_str = entry.location.as_ref();
826            if !path_str.ends_with("manifest.json") {
827                continue;
828            }
829            for segment in path_str.split('/') {
830                if let Some(id_str) = segment.strip_prefix("checkpoint_") {
831                    if let Ok(id) = id_str.parse::<u64>() {
832                        ids.insert(id);
833                    }
834                }
835            }
836        }
837
838        Ok(ids.into_iter().collect())
839    }
840}
841
842impl CheckpointStore for ObjectStoreCheckpointStore {
843    fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
844        let json = serde_json::to_string_pretty(manifest)?;
845        let path = self.manifest_path(manifest.checkpoint_id);
846        let json_bytes = bytes::Bytes::from(json);
847
848        // Conditional PUT — prevents duplicate manifest writes (split-brain safety).
849        let create_opts = PutOptions {
850            mode: PutMode::Create,
851            ..PutOptions::default()
852        };
853        let result = self.rt.block_on(async {
854            self.store
855                .put_opts(
856                    &path,
857                    PutPayload::from_bytes(json_bytes.clone()),
858                    create_opts,
859                )
860                .await
861        });
862
863        match result {
864            Ok(_) => {}
865            Err(object_store::Error::AlreadyExists { .. }) => {
866                tracing::warn!(
867                    checkpoint_id = manifest.checkpoint_id,
868                    "[LDB-6010] Manifest already exists — skipping write"
869                );
870            }
871            Err(object_store::Error::NotImplemented { .. }) => {
872                // Backend doesn't support conditional PUT — fall back to overwrite.
873                self.rt.block_on(async {
874                    self.store
875                        .put_opts(
876                            &path,
877                            PutPayload::from_bytes(json_bytes),
878                            PutOptions::default(),
879                        )
880                        .await
881                })?;
882            }
883            Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
884        }
885
886        // Update latest.json pointer (always overwrite).
887        let latest = self.latest_pointer_path();
888        let pointer = serde_json::to_string(&LatestPointer {
889            checkpoint_id: manifest.checkpoint_id,
890        })?;
891        let payload = PutPayload::from_bytes(bytes::Bytes::from(pointer));
892        self.rt.block_on(async {
893            self.store
894                .put_opts(&latest, payload, PutOptions::default())
895                .await
896        })?;
897
898        // Auto-prune
899        if self.max_retained > 0 {
900            if let Err(e) = self.prune(self.max_retained) {
901                tracing::warn!(
902                    max_retained = self.max_retained,
903                    error = %e,
904                    "[LDB-6009] Object store checkpoint prune failed"
905                );
906            }
907        }
908
909        Ok(())
910    }
911
912    fn update_manifest(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
913        let json = serde_json::to_string_pretty(manifest)?;
914        let path = self.manifest_path(manifest.checkpoint_id);
915        let payload = PutPayload::from_bytes(bytes::Bytes::from(json));
916
917        // Unconditional PUT — overwrites the existing manifest.
918        self.rt.block_on(async {
919            self.store
920                .put_opts(&path, payload, PutOptions::default())
921                .await
922        })?;
923
924        Ok(())
925    }
926
927    fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
928        // Try v2 layout: manifests/latest.json
929        if let Some(data) = self.get_bytes(&self.latest_pointer_path())? {
930            let pointer: LatestPointer = serde_json::from_slice(&data)?;
931            return self.load_by_id(pointer.checkpoint_id);
932        }
933
934        // Fall back to v1 layout: checkpoints/latest.txt
935        if let Some(data) = self.get_bytes(&self.legacy_latest_path())? {
936            let content = String::from_utf8_lossy(&data);
937            let dir_name = content.trim();
938            if dir_name.is_empty() {
939                return Ok(None);
940            }
941            let id = dir_name
942                .strip_prefix("checkpoint_")
943                .and_then(|s| s.parse::<u64>().ok());
944            return match id {
945                Some(id) => self.load_by_id(id),
946                None => Ok(None),
947            };
948        }
949
950        Ok(None)
951    }
952
953    fn load_by_id(&self, id: u64) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
954        // Try v2 layout first
955        if let Some(m) = self.load_manifest_at(&self.manifest_path(id))? {
956            return Ok(Some(m));
957        }
958        // Fall back to v1 layout
959        self.load_manifest_at(&self.legacy_manifest_path(id))
960    }
961
962    fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
963        self.list_checkpoint_ids()
964    }
965
966    fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
967        let ids = self.list_checkpoint_ids()?;
968        let mut result = Vec::with_capacity(ids.len());
969
970        for id in ids {
971            if let Ok(Some(manifest)) = self.load_by_id(id) {
972                result.push((manifest.checkpoint_id, manifest.epoch));
973            }
974        }
975
976        Ok(result)
977    }
978
979    fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
980        let ids = self.list_checkpoint_ids()?;
981        if ids.len() <= keep_count {
982            return Ok(0);
983        }
984
985        let to_remove = ids.len() - keep_count;
986        let mut removed = 0;
987
988        for &id in &ids[..to_remove] {
989            // Delete from both v2 and v1 layouts (ignore NotFound).
990            let paths = vec![
991                Ok(self.manifest_path(id)),
992                Ok(self.state_path(id)),
993                Ok(self.legacy_manifest_path(id)),
994                Ok(self.legacy_state_path(id)),
995            ];
996
997            self.rt.block_on(async {
998                use futures::StreamExt;
999                let stream = futures::stream::iter(paths).boxed();
1000                let mut results = self.store.delete_stream(stream);
1001                while let Some(_result) = results.next().await {
1002                    // Ignore individual delete errors (file may not exist)
1003                }
1004            });
1005            removed += 1;
1006        }
1007
1008        Ok(removed)
1009    }
1010
1011    fn save_state_data(&self, id: u64, data: &[u8]) -> Result<(), CheckpointStoreError> {
1012        let path = self.state_path(id);
1013        let payload = PutPayload::from_bytes(bytes::Bytes::copy_from_slice(data));
1014        self.rt.block_on(async {
1015            self.store
1016                .put_opts(&path, payload, PutOptions::default())
1017                .await
1018        })?;
1019        Ok(())
1020    }
1021
1022    fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
1023        // Try v2 layout first
1024        if let Some(data) = self.get_bytes(&self.state_path(id))? {
1025            return Ok(Some(data.to_vec()));
1026        }
1027        // Fall back to v1 layout
1028        Ok(self
1029            .get_bytes(&self.legacy_state_path(id))?
1030            .map(|d| d.to_vec()))
1031    }
1032
1033    fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
1034        // Collect state IDs that have a state.bin but no matching manifest.
1035        let manifest_ids: std::collections::BTreeSet<u64> =
1036            self.list_checkpoint_ids()?.into_iter().collect();
1037
1038        // List state files in v2 layout: checkpoints/state-NNNNNN.bin
1039        let state_prefix = object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
1040        let entries: Vec<_> = self.rt.block_on(async {
1041            use futures::TryStreamExt;
1042            self.store
1043                .list(Some(&state_prefix))
1044                .try_collect::<Vec<_>>()
1045                .await
1046        })?;
1047
1048        let mut orphan_paths = Vec::new();
1049        for entry in &entries {
1050            let path_str = entry.location.as_ref();
1051            for segment in path_str.split('/') {
1052                if let Some(rest) = segment.strip_prefix("state-") {
1053                    if let Some(id_str) = rest.strip_suffix(".bin") {
1054                        if let Ok(id) = id_str.parse::<u64>() {
1055                            if !manifest_ids.contains(&id) {
1056                                orphan_paths.push(entry.location.clone());
1057                            }
1058                        }
1059                    }
1060                }
1061            }
1062        }
1063
1064        let count = orphan_paths.len();
1065        if !orphan_paths.is_empty() {
1066            self.rt.block_on(async {
1067                use futures::StreamExt;
1068                let stream = futures::stream::iter(orphan_paths.into_iter().map(Ok)).boxed();
1069                let mut results = self.store.delete_stream(stream);
1070                while let Some(result) = results.next().await {
1071                    if let Err(e) = result {
1072                        tracing::warn!(error = %e, "failed to delete orphan state file");
1073                    }
1074                }
1075            });
1076        }
1077
1078        Ok(count)
1079    }
1080}
1081
1082#[cfg(test)]
1083mod tests {
1084    use super::*;
1085    use crate::checkpoint_manifest::{ConnectorCheckpoint, OperatorCheckpoint};
1086    #[allow(clippy::disallowed_types)] // cold path: checkpoint store
1087    use std::collections::HashMap;
1088
1089    fn make_store(dir: &Path) -> FileSystemCheckpointStore {
1090        FileSystemCheckpointStore::new(dir, 3)
1091    }
1092
1093    fn make_manifest(id: u64, epoch: u64) -> CheckpointManifest {
1094        CheckpointManifest::new(id, epoch)
1095    }
1096
1097    #[test]
1098    fn test_save_and_load_latest() {
1099        let dir = tempfile::tempdir().unwrap();
1100        let store = make_store(dir.path());
1101
1102        let m = make_manifest(1, 1);
1103        store.save(&m).unwrap();
1104
1105        let loaded = store.load_latest().unwrap().unwrap();
1106        assert_eq!(loaded.checkpoint_id, 1);
1107        assert_eq!(loaded.epoch, 1);
1108    }
1109
1110    #[test]
1111    fn test_load_latest_returns_none_when_empty() {
1112        let dir = tempfile::tempdir().unwrap();
1113        let store = make_store(dir.path());
1114        assert!(store.load_latest().unwrap().is_none());
1115    }
1116
1117    #[test]
1118    fn test_load_latest_returns_most_recent() {
1119        let dir = tempfile::tempdir().unwrap();
1120        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1121
1122        for i in 1..=5 {
1123            store.save(&make_manifest(i, i)).unwrap();
1124        }
1125
1126        let latest = store.load_latest().unwrap().unwrap();
1127        assert_eq!(latest.checkpoint_id, 5);
1128        assert_eq!(latest.epoch, 5);
1129    }
1130
1131    #[test]
1132    fn test_load_by_id() {
1133        let dir = tempfile::tempdir().unwrap();
1134        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1135
1136        store.save(&make_manifest(1, 10)).unwrap();
1137        store.save(&make_manifest(2, 20)).unwrap();
1138
1139        let m = store.load_by_id(1).unwrap().unwrap();
1140        assert_eq!(m.epoch, 10);
1141
1142        let m = store.load_by_id(2).unwrap().unwrap();
1143        assert_eq!(m.epoch, 20);
1144
1145        assert!(store.load_by_id(99).unwrap().is_none());
1146    }
1147
1148    #[test]
1149    fn test_list() {
1150        let dir = tempfile::tempdir().unwrap();
1151        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1152
1153        store.save(&make_manifest(1, 10)).unwrap();
1154        store.save(&make_manifest(3, 30)).unwrap();
1155        store.save(&make_manifest(2, 20)).unwrap();
1156
1157        let list = store.list().unwrap();
1158        assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1159    }
1160
1161    #[test]
1162    fn test_prune_keeps_max() {
1163        let dir = tempfile::tempdir().unwrap();
1164        let store = FileSystemCheckpointStore::new(dir.path(), 10); // no auto-prune
1165
1166        for i in 1..=5 {
1167            store.save(&make_manifest(i, i)).unwrap();
1168        }
1169
1170        let removed = store.prune(2).unwrap();
1171        assert_eq!(removed, 3);
1172
1173        let list = store.list().unwrap();
1174        assert_eq!(list.len(), 2);
1175        assert_eq!(list[0].0, 4);
1176        assert_eq!(list[1].0, 5);
1177    }
1178
1179    #[test]
1180    fn test_auto_prune_on_save() {
1181        let dir = tempfile::tempdir().unwrap();
1182        let store = FileSystemCheckpointStore::new(dir.path(), 2);
1183
1184        for i in 1..=5 {
1185            store.save(&make_manifest(i, i)).unwrap();
1186        }
1187
1188        let list = store.list().unwrap();
1189        assert_eq!(list.len(), 2);
1190        // Should keep the two most recent
1191        assert_eq!(list[0].0, 4);
1192        assert_eq!(list[1].0, 5);
1193    }
1194
1195    #[test]
1196    fn test_save_and_load_state_data() {
1197        let dir = tempfile::tempdir().unwrap();
1198        let store = make_store(dir.path());
1199
1200        store.save(&make_manifest(1, 1)).unwrap();
1201
1202        let data = b"large operator state binary blob";
1203        store.save_state_data(1, data).unwrap();
1204
1205        let loaded = store.load_state_data(1).unwrap().unwrap();
1206        assert_eq!(loaded, data);
1207    }
1208
1209    #[test]
1210    fn test_load_state_data_returns_none() {
1211        let dir = tempfile::tempdir().unwrap();
1212        let store = make_store(dir.path());
1213        assert!(store.load_state_data(99).unwrap().is_none());
1214    }
1215
1216    #[test]
1217    fn test_full_manifest_round_trip() {
1218        let dir = tempfile::tempdir().unwrap();
1219        let store = make_store(dir.path());
1220
1221        let mut m = make_manifest(1, 5);
1222        m.source_offsets.insert(
1223            "kafka-src".into(),
1224            ConnectorCheckpoint::with_offsets(
1225                5,
1226                HashMap::from([("0".into(), "1000".into()), ("1".into(), "2000".into())]),
1227            ),
1228        );
1229        m.sink_epochs.insert("pg-sink".into(), 4);
1230        m.table_offsets.insert(
1231            "instruments".into(),
1232            ConnectorCheckpoint::with_offsets(5, HashMap::from([("lsn".into(), "0/AB".into())])),
1233        );
1234        m.operator_states
1235            .insert("window".into(), OperatorCheckpoint::inline(b"data"));
1236        m.watermark = Some(999_000);
1237        m.wal_position = 4096;
1238        m.per_core_wal_positions = vec![100, 200];
1239
1240        store.save(&m).unwrap();
1241
1242        let loaded = store.load_latest().unwrap().unwrap();
1243        assert_eq!(loaded.checkpoint_id, 1);
1244        assert_eq!(loaded.epoch, 5);
1245        assert_eq!(loaded.watermark, Some(999_000));
1246        assert_eq!(loaded.wal_position, 4096);
1247        assert_eq!(loaded.per_core_wal_positions, vec![100, 200]);
1248
1249        let src = loaded.source_offsets.get("kafka-src").unwrap();
1250        assert_eq!(src.offsets.get("0"), Some(&"1000".into()));
1251
1252        assert_eq!(loaded.sink_epochs.get("pg-sink"), Some(&4));
1253
1254        let tbl = loaded.table_offsets.get("instruments").unwrap();
1255        assert_eq!(tbl.offsets.get("lsn"), Some(&"0/AB".into()));
1256
1257        let op = loaded.operator_states.get("window").unwrap();
1258        assert_eq!(op.decode_inline().unwrap(), b"data");
1259    }
1260
1261    #[test]
1262    fn test_empty_latest_txt() {
1263        let dir = tempfile::tempdir().unwrap();
1264        let store = make_store(dir.path());
1265
1266        let cp_dir = dir.path().join("checkpoints");
1267        std::fs::create_dir_all(&cp_dir).unwrap();
1268        std::fs::write(cp_dir.join("latest.txt"), "").unwrap();
1269
1270        assert!(store.load_latest().unwrap().is_none());
1271    }
1272
1273    #[test]
1274    fn test_latest_points_to_missing_checkpoint() {
1275        let dir = tempfile::tempdir().unwrap();
1276        let store = make_store(dir.path());
1277
1278        let cp_dir = dir.path().join("checkpoints");
1279        std::fs::create_dir_all(&cp_dir).unwrap();
1280        std::fs::write(cp_dir.join("latest.txt"), "checkpoint_000099").unwrap();
1281
1282        assert!(store.load_latest().unwrap().is_none());
1283    }
1284
1285    #[test]
1286    fn test_prune_no_op_when_under_limit() {
1287        let dir = tempfile::tempdir().unwrap();
1288        let store = make_store(dir.path());
1289
1290        store.save(&make_manifest(1, 1)).unwrap();
1291        let removed = store.prune(5).unwrap();
1292        assert_eq!(removed, 0);
1293    }
1294
1295    #[test]
1296    fn test_save_with_state_writes_sidecar_before_manifest() {
1297        let dir = tempfile::tempdir().unwrap();
1298        let store = make_store(dir.path());
1299
1300        let m = make_manifest(1, 1);
1301        let state = b"large-operator-state-blob";
1302        store.save_with_state(&m, Some(state)).unwrap();
1303
1304        // Both manifest and state should be present.
1305        let loaded = store.load_latest().unwrap().unwrap();
1306        assert_eq!(loaded.checkpoint_id, 1);
1307
1308        let loaded_state = store.load_state_data(1).unwrap().unwrap();
1309        assert_eq!(loaded_state, state);
1310    }
1311
1312    #[test]
1313    fn test_save_with_state_none_is_same_as_save() {
1314        let dir = tempfile::tempdir().unwrap();
1315        let store = make_store(dir.path());
1316
1317        let m = make_manifest(1, 1);
1318        store.save_with_state(&m, None).unwrap();
1319
1320        let loaded = store.load_latest().unwrap().unwrap();
1321        assert_eq!(loaded.checkpoint_id, 1);
1322        assert!(store.load_state_data(1).unwrap().is_none());
1323    }
1324
1325    #[test]
1326    fn test_orphaned_state_without_manifest_is_ignored() {
1327        let dir = tempfile::tempdir().unwrap();
1328        let store = make_store(dir.path());
1329
1330        // Write only sidecar state, no manifest (simulates crash after
1331        // state write but before manifest write).
1332        store.save_state_data(1, b"orphaned").unwrap();
1333
1334        // load_latest should return None — the orphan is not visible.
1335        assert!(store.load_latest().unwrap().is_none());
1336
1337        // list should not include the orphan (no manifest.json).
1338        assert!(store.list().unwrap().is_empty());
1339    }
1340
1341    // -----------------------------------------------------------------------
1342    // ObjectStoreCheckpointStore tests (using InMemory backend)
1343    // -----------------------------------------------------------------------
1344
1345    fn make_obj_store() -> ObjectStoreCheckpointStore {
1346        let store = Arc::new(object_store::memory::InMemory::new());
1347        ObjectStoreCheckpointStore::new(store, String::new(), 3).unwrap()
1348    }
1349
1350    fn make_obj_store_shared(
1351        store: Arc<object_store::memory::InMemory>,
1352    ) -> ObjectStoreCheckpointStore {
1353        ObjectStoreCheckpointStore::new(store, String::new(), 10).unwrap()
1354    }
1355
1356    /// Write a manifest to the legacy (v1) layout for backward-compat testing.
1357    fn write_legacy_manifest(
1358        store: &Arc<object_store::memory::InMemory>,
1359        prefix: &str,
1360        manifest: &CheckpointManifest,
1361    ) {
1362        let rt = tokio::runtime::Builder::new_current_thread()
1363            .enable_all()
1364            .build()
1365            .unwrap();
1366        let json = serde_json::to_string_pretty(manifest).unwrap();
1367
1368        let path = object_store::path::Path::from(format!(
1369            "{}checkpoints/checkpoint_{:06}/manifest.json",
1370            prefix, manifest.checkpoint_id
1371        ));
1372        rt.block_on(async {
1373            store
1374                .put_opts(
1375                    &path,
1376                    PutPayload::from_bytes(bytes::Bytes::from(json)),
1377                    PutOptions::default(),
1378                )
1379                .await
1380                .unwrap();
1381        });
1382
1383        let latest = object_store::path::Path::from(format!("{prefix}checkpoints/latest.txt"));
1384        let content = format!("checkpoint_{:06}", manifest.checkpoint_id);
1385        rt.block_on(async {
1386            store
1387                .put_opts(
1388                    &latest,
1389                    PutPayload::from_bytes(bytes::Bytes::from(content)),
1390                    PutOptions::default(),
1391                )
1392                .await
1393                .unwrap();
1394        });
1395    }
1396
1397    #[test]
1398    fn test_obj_save_and_load_latest() {
1399        let store = make_obj_store();
1400        let m = make_manifest(1, 1);
1401        store.save(&m).unwrap();
1402
1403        let loaded = store.load_latest().unwrap().unwrap();
1404        assert_eq!(loaded.checkpoint_id, 1);
1405        assert_eq!(loaded.epoch, 1);
1406    }
1407
1408    #[test]
1409    fn test_obj_load_latest_returns_none_when_empty() {
1410        let store = make_obj_store();
1411        assert!(store.load_latest().unwrap().is_none());
1412    }
1413
1414    #[test]
1415    fn test_obj_load_by_id() {
1416        let store = ObjectStoreCheckpointStore::new(
1417            Arc::new(object_store::memory::InMemory::new()),
1418            String::new(),
1419            10,
1420        )
1421        .unwrap();
1422
1423        store.save(&make_manifest(1, 10)).unwrap();
1424        store.save(&make_manifest(2, 20)).unwrap();
1425
1426        let m = store.load_by_id(1).unwrap().unwrap();
1427        assert_eq!(m.epoch, 10);
1428        let m = store.load_by_id(2).unwrap().unwrap();
1429        assert_eq!(m.epoch, 20);
1430        assert!(store.load_by_id(99).unwrap().is_none());
1431    }
1432
1433    #[test]
1434    fn test_obj_list() {
1435        let store = ObjectStoreCheckpointStore::new(
1436            Arc::new(object_store::memory::InMemory::new()),
1437            String::new(),
1438            10,
1439        )
1440        .unwrap();
1441
1442        store.save(&make_manifest(1, 10)).unwrap();
1443        store.save(&make_manifest(3, 30)).unwrap();
1444        store.save(&make_manifest(2, 20)).unwrap();
1445
1446        let list = store.list().unwrap();
1447        assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1448    }
1449
1450    #[test]
1451    fn test_obj_prune() {
1452        let store = ObjectStoreCheckpointStore::new(
1453            Arc::new(object_store::memory::InMemory::new()),
1454            String::new(),
1455            10,
1456        )
1457        .unwrap();
1458
1459        for i in 1..=5 {
1460            store.save(&make_manifest(i, i)).unwrap();
1461        }
1462
1463        let removed = store.prune(2).unwrap();
1464        assert_eq!(removed, 3);
1465
1466        let list = store.list().unwrap();
1467        assert_eq!(list.len(), 2);
1468        assert_eq!(list[0].0, 4);
1469        assert_eq!(list[1].0, 5);
1470    }
1471
1472    #[test]
1473    fn test_obj_auto_prune_on_save() {
1474        let store = ObjectStoreCheckpointStore::new(
1475            Arc::new(object_store::memory::InMemory::new()),
1476            String::new(),
1477            2,
1478        )
1479        .unwrap();
1480
1481        for i in 1..=5 {
1482            store.save(&make_manifest(i, i)).unwrap();
1483        }
1484
1485        let list = store.list().unwrap();
1486        assert_eq!(list.len(), 2);
1487        assert_eq!(list[0].0, 4);
1488        assert_eq!(list[1].0, 5);
1489    }
1490
1491    #[test]
1492    fn test_obj_save_and_load_state_data() {
1493        let store = make_obj_store();
1494        store.save(&make_manifest(1, 1)).unwrap();
1495
1496        let data = b"large operator state binary blob";
1497        store.save_state_data(1, data).unwrap();
1498
1499        let loaded = store.load_state_data(1).unwrap().unwrap();
1500        assert_eq!(loaded, data);
1501    }
1502
1503    #[test]
1504    fn test_obj_load_state_data_returns_none() {
1505        let store = make_obj_store();
1506        assert!(store.load_state_data(99).unwrap().is_none());
1507    }
1508
1509    #[test]
1510    fn test_obj_with_prefix() {
1511        let inner = Arc::new(object_store::memory::InMemory::new());
1512        let store =
1513            ObjectStoreCheckpointStore::new(inner, "nodes/abc123/".to_string(), 10).unwrap();
1514
1515        store.save(&make_manifest(1, 42)).unwrap();
1516        let loaded = store.load_latest().unwrap().unwrap();
1517        assert_eq!(loaded.checkpoint_id, 1);
1518        assert_eq!(loaded.epoch, 42);
1519    }
1520
1521    // -----------------------------------------------------------------------
1522    // v2 layout verification + backward compatibility tests
1523    // -----------------------------------------------------------------------
1524
1525    #[test]
1526    fn test_obj_v2_layout_paths() {
1527        let inner = Arc::new(object_store::memory::InMemory::new());
1528        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
1529
1530        store.save(&make_manifest(1, 10)).unwrap();
1531
1532        let rt = tokio::runtime::Builder::new_current_thread()
1533            .enable_all()
1534            .build()
1535            .unwrap();
1536
1537        // Manifest should be at v2 path
1538        let result = rt.block_on(async {
1539            inner
1540                .get_opts(
1541                    &object_store::path::Path::from("manifests/manifest-000001.json"),
1542                    GetOptions::default(),
1543                )
1544                .await
1545        });
1546        assert!(result.is_ok(), "v2 manifest path should exist");
1547
1548        // latest.json should exist
1549        let result = rt.block_on(async {
1550            inner
1551                .get_opts(
1552                    &object_store::path::Path::from("manifests/latest.json"),
1553                    GetOptions::default(),
1554                )
1555                .await
1556        });
1557        assert!(result.is_ok(), "v2 latest.json should exist");
1558
1559        // v1 path should NOT exist
1560        let result = rt.block_on(async {
1561            inner
1562                .get_opts(
1563                    &object_store::path::Path::from("checkpoints/checkpoint_000001/manifest.json"),
1564                    GetOptions::default(),
1565                )
1566                .await
1567        });
1568        assert!(result.is_err(), "v1 manifest path should NOT exist");
1569    }
1570
1571    #[test]
1572    fn test_obj_v1_backward_compat_load() {
1573        let inner = Arc::new(object_store::memory::InMemory::new());
1574        write_legacy_manifest(&inner, "", &make_manifest(1, 42));
1575
1576        let store = make_obj_store_shared(inner);
1577
1578        let loaded = store.load_latest().unwrap().unwrap();
1579        assert_eq!(loaded.checkpoint_id, 1);
1580        assert_eq!(loaded.epoch, 42);
1581
1582        let loaded = store.load_by_id(1).unwrap().unwrap();
1583        assert_eq!(loaded.epoch, 42);
1584    }
1585
1586    #[test]
1587    fn test_obj_v1_backward_compat_list() {
1588        let inner = Arc::new(object_store::memory::InMemory::new());
1589        write_legacy_manifest(&inner, "", &make_manifest(1, 10));
1590        write_legacy_manifest(&inner, "", &make_manifest(2, 20));
1591
1592        let store = make_obj_store_shared(inner);
1593        let list = store.list().unwrap();
1594        assert_eq!(list, vec![(1, 10), (2, 20)]);
1595    }
1596
1597    #[test]
1598    fn test_obj_mixed_layout_list() {
1599        let inner = Arc::new(object_store::memory::InMemory::new());
1600        // Checkpoint 1 in v1 layout
1601        write_legacy_manifest(&inner, "", &make_manifest(1, 10));
1602        // Checkpoint 2 in v2 layout
1603        let store = make_obj_store_shared(inner);
1604        store.save(&make_manifest(2, 20)).unwrap();
1605
1606        let list = store.list().unwrap();
1607        assert_eq!(list, vec![(1, 10), (2, 20)]);
1608    }
1609
1610    #[test]
1611    fn test_obj_conditional_put_idempotent() {
1612        let store = ObjectStoreCheckpointStore::new(
1613            Arc::new(object_store::memory::InMemory::new()),
1614            String::new(),
1615            10,
1616        )
1617        .unwrap();
1618
1619        let m = make_manifest(1, 10);
1620        store.save(&m).unwrap();
1621
1622        // Second save with same ID should succeed (logs warning, skips write)
1623        store.save(&m).unwrap();
1624
1625        let loaded = store.load_latest().unwrap().unwrap();
1626        assert_eq!(loaded.checkpoint_id, 1);
1627        assert_eq!(loaded.epoch, 10);
1628    }
1629
1630    #[test]
1631    fn test_obj_update_manifest_overwrites() {
1632        use crate::checkpoint_manifest::SinkCommitStatus;
1633
1634        let store = make_obj_store();
1635
1636        // Step 5: initial save with Pending sink status
1637        let mut m = make_manifest(1, 10);
1638        m.sink_commit_statuses
1639            .insert("pg-sink".into(), SinkCommitStatus::Pending);
1640        store.save(&m).unwrap();
1641
1642        // Verify Pending persisted
1643        let loaded = store.load_by_id(1).unwrap().unwrap();
1644        assert_eq!(
1645            loaded.sink_commit_statuses.get("pg-sink"),
1646            Some(&SinkCommitStatus::Pending)
1647        );
1648
1649        // Step 6b: update manifest with Committed status
1650        m.sink_commit_statuses
1651            .insert("pg-sink".into(), SinkCommitStatus::Committed);
1652        store.update_manifest(&m).unwrap();
1653
1654        // Verify Committed persisted (the bug: save() would skip this)
1655        let loaded = store.load_by_id(1).unwrap().unwrap();
1656        assert_eq!(
1657            loaded.sink_commit_statuses.get("pg-sink"),
1658            Some(&SinkCommitStatus::Committed)
1659        );
1660    }
1661
1662    #[test]
1663    fn test_obj_save_still_uses_conditional_put() {
1664        let store = make_obj_store();
1665
1666        let m = make_manifest(1, 10);
1667        store.save(&m).unwrap();
1668
1669        // Second save() with same ID skips (conditional PUT)
1670        // but does not error
1671        store.save(&m).unwrap();
1672
1673        // update_manifest() with same ID overwrites
1674        let mut m2 = make_manifest(1, 10);
1675        m2.watermark = Some(42);
1676        store.update_manifest(&m2).unwrap();
1677
1678        let loaded = store.load_by_id(1).unwrap().unwrap();
1679        assert_eq!(loaded.watermark, Some(42));
1680    }
1681
1682    #[test]
1683    fn test_fs_update_manifest_overwrites() {
1684        use crate::checkpoint_manifest::SinkCommitStatus;
1685
1686        let dir = tempfile::tempdir().unwrap();
1687        let store = make_store(dir.path());
1688
1689        let mut m = make_manifest(1, 10);
1690        m.sink_commit_statuses
1691            .insert("sink-a".into(), SinkCommitStatus::Pending);
1692        store.save(&m).unwrap();
1693
1694        m.sink_commit_statuses
1695            .insert("sink-a".into(), SinkCommitStatus::Committed);
1696        store.update_manifest(&m).unwrap();
1697
1698        let loaded = store.load_by_id(1).unwrap().unwrap();
1699        assert_eq!(
1700            loaded.sink_commit_statuses.get("sink-a"),
1701            Some(&SinkCommitStatus::Committed)
1702        );
1703    }
1704
1705    #[test]
1706    fn test_obj_v1_state_backward_compat() {
1707        let inner = Arc::new(object_store::memory::InMemory::new());
1708        let rt = tokio::runtime::Builder::new_current_thread()
1709            .enable_all()
1710            .build()
1711            .unwrap();
1712
1713        // Write state to v1 path
1714        let path = object_store::path::Path::from("checkpoints/checkpoint_000001/state.bin");
1715        let data = b"legacy-state-blob";
1716        rt.block_on(async {
1717            inner
1718                .put_opts(
1719                    &path,
1720                    PutPayload::from_bytes(bytes::Bytes::from_static(data)),
1721                    PutOptions::default(),
1722                )
1723                .await
1724                .unwrap();
1725        });
1726
1727        let store = make_obj_store_shared(inner);
1728        let loaded = store.load_state_data(1).unwrap().unwrap();
1729        assert_eq!(loaded, data);
1730    }
1731
1732    #[test]
1733    fn test_obj_v2_state_paths() {
1734        let inner = Arc::new(object_store::memory::InMemory::new());
1735        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
1736
1737        store.save(&make_manifest(1, 1)).unwrap();
1738        store.save_state_data(1, b"v2-state").unwrap();
1739
1740        let rt = tokio::runtime::Builder::new_current_thread()
1741            .enable_all()
1742            .build()
1743            .unwrap();
1744
1745        // State should be at v2 path
1746        let result = rt.block_on(async {
1747            inner
1748                .get_opts(
1749                    &object_store::path::Path::from("checkpoints/state-000001.bin"),
1750                    GetOptions::default(),
1751                )
1752                .await
1753        });
1754        assert!(result.is_ok(), "v2 state path should exist");
1755
1756        // v1 state path should NOT exist
1757        let result = rt.block_on(async {
1758            inner
1759                .get_opts(
1760                    &object_store::path::Path::from("checkpoints/checkpoint_000001/state.bin"),
1761                    GetOptions::default(),
1762                )
1763                .await
1764        });
1765        assert!(result.is_err(), "v1 state path should NOT exist");
1766    }
1767
1768    #[test]
1769    fn test_obj_prune_cleans_both_layouts() {
1770        let inner = Arc::new(object_store::memory::InMemory::new());
1771        // Checkpoint 1 in v1 layout
1772        write_legacy_manifest(&inner, "", &make_manifest(1, 10));
1773        // Checkpoints 2-4 in v2 layout
1774        let store = ObjectStoreCheckpointStore::new(inner, String::new(), 10).unwrap();
1775        store.save(&make_manifest(2, 20)).unwrap();
1776        store.save(&make_manifest(3, 30)).unwrap();
1777        store.save(&make_manifest(4, 40)).unwrap();
1778
1779        let removed = store.prune(2).unwrap();
1780        assert_eq!(removed, 2);
1781
1782        let list = store.list().unwrap();
1783        assert_eq!(list, vec![(3, 30), (4, 40)]);
1784    }
1785
1786    #[test]
1787    fn test_obj_latest_json_format() {
1788        let inner = Arc::new(object_store::memory::InMemory::new());
1789        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
1790
1791        store.save(&make_manifest(5, 50)).unwrap();
1792
1793        let rt = tokio::runtime::Builder::new_current_thread()
1794            .enable_all()
1795            .build()
1796            .unwrap();
1797        let data = rt.block_on(async {
1798            inner
1799                .get_opts(
1800                    &object_store::path::Path::from("manifests/latest.json"),
1801                    GetOptions::default(),
1802                )
1803                .await
1804                .unwrap()
1805                .bytes()
1806                .await
1807                .unwrap()
1808        });
1809
1810        let pointer: super::LatestPointer = serde_json::from_slice(&data).unwrap();
1811        assert_eq!(pointer.checkpoint_id, 5);
1812    }
1813
1814    #[test]
1815    fn test_validate_checkpoint_valid() {
1816        let dir = tempfile::tempdir().unwrap();
1817        let store = make_store(dir.path());
1818
1819        let m = make_manifest(1, 1);
1820        store.save(&m).unwrap();
1821
1822        let result = store.validate_checkpoint(1).unwrap();
1823        assert!(result.valid, "valid checkpoint: {:?}", result.issues);
1824        assert!(result.issues.is_empty());
1825    }
1826
1827    #[test]
1828    fn test_validate_checkpoint_epoch_zero_invalid() {
1829        let dir = tempfile::tempdir().unwrap();
1830        let store = make_store(dir.path());
1831
1832        // Manually save a manifest with epoch=0 (bypassing normal creation)
1833        let m = make_manifest(1, 0);
1834        store.save(&m).unwrap();
1835
1836        let result = store.validate_checkpoint(1).unwrap();
1837        assert!(!result.valid, "epoch=0 should be invalid");
1838        assert!(
1839            result.issues.iter().any(|i| i.contains("epoch")),
1840            "should mention epoch: {:?}",
1841            result.issues
1842        );
1843    }
1844
1845    #[test]
1846    fn test_validate_checkpoint_missing_manifest() {
1847        let dir = tempfile::tempdir().unwrap();
1848        let store = make_store(dir.path());
1849
1850        let result = store.validate_checkpoint(99).unwrap();
1851        assert!(!result.valid);
1852        assert!(result.issues[0].contains("not found"));
1853    }
1854
1855    #[test]
1856    fn test_validate_checkpoint_corrupt_manifest() {
1857        let dir = tempfile::tempdir().unwrap();
1858        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1859
1860        // Create a checkpoint dir with corrupt manifest JSON.
1861        let cp_dir = dir.path().join("checkpoints/checkpoint_000001");
1862        std::fs::create_dir_all(&cp_dir).unwrap();
1863        std::fs::write(cp_dir.join("manifest.json"), "not valid json").unwrap();
1864
1865        // Corrupt manifest is a validation failure, not an I/O error.
1866        let result = store.validate_checkpoint(1).unwrap();
1867        assert!(!result.valid);
1868        assert!(
1869            result.issues[0].contains("corrupt manifest"),
1870            "expected corrupt manifest issue: {:?}",
1871            result.issues
1872        );
1873    }
1874
1875    #[test]
1876    fn test_validate_checkpoint_state_checksum_ok() {
1877        let dir = tempfile::tempdir().unwrap();
1878        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1879
1880        let state = b"important operator state";
1881        let m = make_manifest(1, 1);
1882        store.save_with_state(&m, Some(state)).unwrap();
1883
1884        let result = store.validate_checkpoint(1).unwrap();
1885        assert!(result.valid, "checksum should match: {:?}", result.issues);
1886    }
1887
1888    #[test]
1889    fn test_validate_checkpoint_state_checksum_mismatch() {
1890        let dir = tempfile::tempdir().unwrap();
1891        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1892
1893        // Save with state to get a checksum.
1894        let state = b"original state";
1895        let m = make_manifest(1, 1);
1896        store.save_with_state(&m, Some(state)).unwrap();
1897
1898        // Now corrupt the state.bin on disk.
1899        let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1900        std::fs::write(&state_path, b"corrupted data!!").unwrap();
1901
1902        let result = store.validate_checkpoint(1).unwrap();
1903        assert!(!result.valid, "corrupted state should be invalid");
1904        assert!(
1905            result
1906                .issues
1907                .iter()
1908                .any(|i| i.contains("checksum mismatch")),
1909            "should report checksum mismatch: {:?}",
1910            result.issues
1911        );
1912    }
1913
1914    #[test]
1915    fn test_validate_checkpoint_state_missing_when_expected() {
1916        let dir = tempfile::tempdir().unwrap();
1917        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1918
1919        // Save with state.
1920        let m = make_manifest(1, 1);
1921        store.save_with_state(&m, Some(b"state")).unwrap();
1922
1923        // Delete the state.bin file to simulate partial crash.
1924        let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1925        std::fs::remove_file(&state_path).unwrap();
1926
1927        let result = store.validate_checkpoint(1).unwrap();
1928        assert!(!result.valid);
1929        assert!(
1930            result.issues.iter().any(|i| i.contains("not found")),
1931            "should report missing state: {:?}",
1932            result.issues
1933        );
1934    }
1935
1936    #[test]
1937    fn test_recover_latest_validated_skips_corrupt() {
1938        let dir = tempfile::tempdir().unwrap();
1939        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1940
1941        // Save two checkpoints.
1942        store.save(&make_manifest(1, 10)).unwrap();
1943        store.save(&make_manifest(2, 20)).unwrap();
1944
1945        // Corrupt the latest checkpoint's manifest.
1946        let cp2_manifest = dir
1947            .path()
1948            .join("checkpoints/checkpoint_000002/manifest.json");
1949        std::fs::write(cp2_manifest, "<<<corrupt>>>").unwrap();
1950
1951        // Recovery should skip checkpoint 2 and pick checkpoint 1.
1952        let report = store.recover_latest_validated().unwrap();
1953        assert_eq!(report.chosen_id, Some(1));
1954        assert_eq!(report.skipped.len(), 1);
1955        assert_eq!(report.skipped[0].0, 2);
1956        assert_eq!(report.examined, 2);
1957    }
1958
1959    #[test]
1960    fn test_recover_latest_validated_fresh_start() {
1961        let dir = tempfile::tempdir().unwrap();
1962        let store = make_store(dir.path());
1963
1964        let report = store.recover_latest_validated().unwrap();
1965        assert!(report.chosen_id.is_none());
1966        assert_eq!(report.examined, 0);
1967    }
1968
1969    #[test]
1970    fn test_recover_latest_validated_all_corrupt_is_fresh_start() {
1971        let dir = tempfile::tempdir().unwrap();
1972        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1973
1974        // Save a checkpoint, then corrupt it.
1975        store.save(&make_manifest(1, 1)).unwrap();
1976        let cp_manifest = dir
1977            .path()
1978            .join("checkpoints/checkpoint_000001/manifest.json");
1979        std::fs::write(cp_manifest, "corrupt").unwrap();
1980
1981        // The corrupt manifest will cause load_by_id (via list()) to fail,
1982        // so it may not appear in the list at all. Either way, recovery
1983        // should not select it.
1984        let report = store.recover_latest_validated().unwrap();
1985        assert!(report.chosen_id.is_none());
1986    }
1987
1988    #[test]
1989    fn test_cleanup_orphans_removes_stateless_dirs() {
1990        let dir = tempfile::tempdir().unwrap();
1991        let store = FileSystemCheckpointStore::new(dir.path(), 10);
1992
1993        // Create an orphan: state.bin exists but no manifest.json.
1994        let orphan_dir = dir.path().join("checkpoints/checkpoint_000099");
1995        std::fs::create_dir_all(&orphan_dir).unwrap();
1996        std::fs::write(orphan_dir.join("state.bin"), b"orphaned").unwrap();
1997
1998        // Normal checkpoint (has manifest).
1999        store.save(&make_manifest(1, 1)).unwrap();
2000
2001        let cleaned = store.cleanup_orphans().unwrap();
2002        assert_eq!(cleaned, 1);
2003
2004        // Orphan dir should be gone.
2005        assert!(!orphan_dir.exists());
2006        // Normal checkpoint should still be there.
2007        assert!(store.load_by_id(1).unwrap().is_some());
2008    }
2009
2010    #[test]
2011    fn test_cleanup_orphans_noop_when_clean() {
2012        let dir = tempfile::tempdir().unwrap();
2013        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2014
2015        store.save(&make_manifest(1, 1)).unwrap();
2016        let cleaned = store.cleanup_orphans().unwrap();
2017        assert_eq!(cleaned, 0);
2018    }
2019
2020    #[test]
2021    fn test_save_with_state_writes_checksum() {
2022        let dir = tempfile::tempdir().unwrap();
2023        let store = FileSystemCheckpointStore::new(dir.path(), 10);
2024
2025        let state = b"state-data-for-checksum";
2026        let m = make_manifest(1, 1);
2027        store.save_with_state(&m, Some(state)).unwrap();
2028
2029        let loaded = store.load_latest().unwrap().unwrap();
2030        assert!(
2031            loaded.state_checksum.is_some(),
2032            "state_checksum should be set"
2033        );
2034        let expected = sha256_hex(state);
2035        assert_eq!(loaded.state_checksum.unwrap(), expected);
2036    }
2037
2038    #[test]
2039    fn test_state_checksum_backward_compat() {
2040        // Older manifests without state_checksum should deserialize fine.
2041        let json = r#"{
2042            "version": 1,
2043            "checkpoint_id": 1,
2044            "epoch": 1,
2045            "timestamp_ms": 1000
2046        }"#;
2047        let m: CheckpointManifest = serde_json::from_str(json).unwrap();
2048        assert!(m.state_checksum.is_none());
2049    }
2050
    // ObjectStoreCheckpointStore variants of the validation, recovery, and orphan-cleanup tests
2052
2053    #[test]
2054    fn test_obj_validate_checkpoint_valid() {
2055        let store = make_obj_store();
2056        store.save(&make_manifest(1, 1)).unwrap();
2057
2058        let result = store.validate_checkpoint(1).unwrap();
2059        assert!(result.valid, "valid checkpoint: {:?}", result.issues);
2060    }
2061
2062    #[test]
2063    fn test_obj_validate_checkpoint_missing() {
2064        let store = make_obj_store();
2065        let result = store.validate_checkpoint(99).unwrap();
2066        assert!(!result.valid);
2067    }
2068
2069    #[test]
2070    fn test_obj_validate_state_checksum() {
2071        let store = ObjectStoreCheckpointStore::new(
2072            Arc::new(object_store::memory::InMemory::new()),
2073            String::new(),
2074            10,
2075        )
2076        .unwrap();
2077
2078        let state = b"obj-store-state-data";
2079        let m = make_manifest(1, 1);
2080        store.save_with_state(&m, Some(state)).unwrap();
2081
2082        let result = store.validate_checkpoint(1).unwrap();
2083        assert!(result.valid, "checksum should match: {:?}", result.issues);
2084    }
2085
2086    #[test]
2087    fn test_obj_recover_latest_validated() {
2088        let store = ObjectStoreCheckpointStore::new(
2089            Arc::new(object_store::memory::InMemory::new()),
2090            String::new(),
2091            10,
2092        )
2093        .unwrap();
2094
2095        store.save(&make_manifest(1, 10)).unwrap();
2096        store.save(&make_manifest(2, 20)).unwrap();
2097
2098        let report = store.recover_latest_validated().unwrap();
2099        assert_eq!(report.chosen_id, Some(2));
2100        assert!(report.skipped.is_empty());
2101    }
2102
2103    #[test]
2104    fn test_obj_cleanup_orphans() {
2105        let inner = Arc::new(object_store::memory::InMemory::new());
2106        let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
2107
2108        // Save a checkpoint (creates manifest + state).
2109        let state = b"state-with-manifest";
2110        store
2111            .save_with_state(&make_manifest(1, 1), Some(state))
2112            .unwrap();
2113
2114        // Write an orphan state file (no manifest).
2115        let rt = tokio::runtime::Builder::new_current_thread()
2116            .enable_all()
2117            .build()
2118            .unwrap();
2119        let orphan_path = object_store::path::Path::from("checkpoints/state-000099.bin");
2120        rt.block_on(async {
2121            inner
2122                .put_opts(
2123                    &orphan_path,
2124                    PutPayload::from_bytes(bytes::Bytes::from_static(b"orphan")),
2125                    PutOptions::default(),
2126                )
2127                .await
2128                .unwrap();
2129        });
2130
2131        let cleaned = store.cleanup_orphans().unwrap();
2132        assert_eq!(cleaned, 1);
2133
2134        // Verify orphan is gone but real state is intact.
2135        let real_state = store.load_state_data(1).unwrap();
2136        assert!(real_state.is_some());
2137    }
2138}