1use std::path::{Path, PathBuf};
20use std::sync::Arc;
21
22use object_store::{GetOptions, ObjectStore, PutMode, PutOptions, PutPayload};
23use sha2::{Digest, Sha256};
24use tracing::warn;
25
26use crate::checkpoint_manifest::CheckpointManifest;
27
/// Flushes the data and metadata of the existing file at `path` to stable storage.
///
/// The file is opened with write access (without creating it), so a missing
/// file is reported as an error. A writable handle is used because some
/// platforms (e.g. Windows' `FlushFileBuffers`) refuse to flush a read-only handle.
fn sync_file(path: &Path) -> Result<(), std::io::Error> {
    std::fs::OpenOptions::new()
        .write(true)
        .open(path)
        .and_then(|handle| handle.sync_all())
}
34
/// Makes directory-entry changes in `path` (such as a preceding `rename`) durable.
///
/// On Unix the directory is opened and `fsync`ed. On every other platform this
/// is a no-op that always returns `Ok(())`.
fn sync_dir(path: &Path) -> Result<(), std::io::Error> {
    if cfg!(unix) {
        std::fs::File::open(path)?.sync_all()?;
    }
    Ok(())
}
52
/// Errors returned by [`CheckpointStore`] implementations.
#[derive(Debug, thiserror::Error)]
pub enum CheckpointStoreError {
    /// Filesystem or other `std::io` failure.
    #[error("checkpoint I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// JSON (de)serialization of a manifest or pointer failed.
    #[error("checkpoint serialization error: {0}")]
    Serde(#[from] serde_json::Error),

    /// No checkpoint exists with the given ID.
    #[error("checkpoint {0} not found")]
    NotFound(u64),

    /// Error reported by the underlying `object_store` backend.
    #[error("object store error: {0}")]
    ObjectStore(#[from] object_store::Error),
}
72
/// Outcome of [`CheckpointStore::validate_checkpoint`].
#[derive(Debug, Clone)]
pub struct ValidationResult {
    /// ID of the checkpoint that was examined.
    pub checkpoint_id: u64,
    /// Whether the checkpoint is usable for recovery. In the default
    /// implementation, issues prefixed with `"manifest validation:"` are
    /// advisory and do not by themselves make this `false`.
    pub valid: bool,
    /// Human-readable description of every problem found (may be non-empty
    /// even when `valid` is `true`).
    pub issues: Vec<String>,
}
87
/// Summary produced by [`CheckpointStore::recover_latest_validated`].
#[derive(Debug, Clone)]
pub struct RecoveryReport {
    /// Newest checkpoint that passed validation, or `None` if none did.
    pub chosen_id: Option<u64>,
    /// Checkpoints rejected before one was chosen, paired with their issues
    /// joined by `"; "`.
    pub skipped: Vec<(u64, String)>,
    /// Number of checkpoint IDs listed for the scan (all listed IDs, not only
    /// the ones actually validated).
    pub examined: usize,
    /// Wall-clock time spent in the recovery scan.
    pub elapsed: std::time::Duration,
}
103
104fn sha256_hex(data: &[u8]) -> String {
106 let mut hasher = Sha256::new();
107 hasher.update(data);
108 format!("{:x}", hasher.finalize())
109}
110
111pub trait CheckpointStore: Send + Sync {
117 fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError>;
126
127 fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
135
136 fn load_by_id(&self, id: u64) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;
142
143 fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError>;
151
152 fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
161 Ok(self.list()?.iter().map(|(id, _)| *id).collect())
163 }
164
165 fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError>;
173
174 fn update_manifest(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
188 self.save(manifest)
189 }
190
191 fn save_state_data(&self, id: u64, data: &[u8]) -> Result<(), CheckpointStoreError>;
197
198 fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError>;
206
207 fn validate_checkpoint(&self, id: u64) -> Result<ValidationResult, CheckpointStoreError> {
216 let mut issues = Vec::new();
217
218 let manifest = match self.load_by_id(id) {
220 Ok(Some(m)) => m,
221 Ok(None) => {
222 return Ok(ValidationResult {
223 checkpoint_id: id,
224 valid: false,
225 issues: vec![format!("manifest not found for checkpoint {id}")],
226 });
227 }
228 Err(CheckpointStoreError::Serde(e)) => {
229 return Ok(ValidationResult {
230 checkpoint_id: id,
231 valid: false,
232 issues: vec![format!("corrupt manifest: {e}")],
233 });
234 }
235 Err(e) => return Err(e),
236 };
237
238 for err in manifest.validate() {
240 issues.push(format!("manifest validation: {err}"));
241 }
242
243 if let Some(expected) = &manifest.state_checksum {
245 match self.load_state_data(id)? {
246 Some(data) => {
247 let actual = sha256_hex(&data);
248 if actual != *expected {
249 issues.push(format!(
250 "state.bin checksum mismatch: expected {expected}, got {actual}"
251 ));
252 }
253 }
254 None => {
255 issues.push("state.bin referenced by checksum but not found".into());
256 }
257 }
258 }
259
260 if manifest.epoch == 0 || manifest.checkpoint_id == 0 {
263 issues.push("epoch or checkpoint_id is 0 — likely corrupted".into());
264 return Ok(ValidationResult {
265 checkpoint_id: id,
266 valid: false,
267 issues,
268 });
269 }
270
271 let valid =
272 issues.is_empty() || issues.iter().all(|i| i.starts_with("manifest validation:"));
273 Ok(ValidationResult {
274 checkpoint_id: id,
275 valid,
276 issues,
277 })
278 }
279
280 fn recover_latest_validated(&self) -> Result<RecoveryReport, CheckpointStoreError> {
289 let start = std::time::Instant::now();
290 let mut skipped = Vec::new();
291
292 let mut ids = self.list_ids()?;
296 ids.sort_unstable();
297 ids.reverse();
298
299 let examined = ids.len();
300
301 for id in &ids {
302 let result = self.validate_checkpoint(*id)?;
303 if result.valid {
304 return Ok(RecoveryReport {
305 chosen_id: Some(*id),
306 skipped,
307 examined,
308 elapsed: start.elapsed(),
309 });
310 }
311 let reason = result.issues.join("; ");
312 warn!(
313 checkpoint_id = id,
314 reason = %reason,
315 "skipping invalid checkpoint"
316 );
317 skipped.push((*id, reason));
318 }
319
320 Ok(RecoveryReport {
321 chosen_id: None,
322 skipped,
323 examined,
324 elapsed: start.elapsed(),
325 })
326 }
327
328 fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
336 Ok(0)
338 }
339
340 fn save_with_state(
354 &self,
355 manifest: &CheckpointManifest,
356 state_data: Option<&[u8]>,
357 ) -> Result<(), CheckpointStoreError> {
358 let mut manifest = manifest.clone();
359 if let Some(data) = state_data {
360 manifest.state_checksum = Some(sha256_hex(data));
366 self.save_state_data(manifest.checkpoint_id, data)?;
367 }
368 self.save(&manifest)
369 }
370}
371
/// Checkpoint store backed by the local filesystem.
///
/// Layout under `base_dir`:
/// - `checkpoints/checkpoint_<id:06>/manifest.json`: the manifest;
/// - `checkpoints/checkpoint_<id:06>/state.bin`: optional state sidecar;
/// - `checkpoints/latest.txt`: names the newest checkpoint directory.
pub struct FileSystemCheckpointStore {
    /// Root directory; checkpoints live under `base_dir/checkpoints`.
    base_dir: PathBuf,
    /// Number of checkpoints kept after each `save`; `0` disables auto-pruning.
    max_retained: usize,
}
381
382impl FileSystemCheckpointStore {
383 #[must_use]
388 pub fn new(base_dir: impl Into<PathBuf>, max_retained: usize) -> Self {
389 Self {
390 base_dir: base_dir.into(),
391 max_retained,
392 }
393 }
394
395 fn checkpoints_dir(&self) -> PathBuf {
397 self.base_dir.join("checkpoints")
398 }
399
400 fn checkpoint_dir(&self, id: u64) -> PathBuf {
402 self.checkpoints_dir().join(format!("checkpoint_{id:06}"))
403 }
404
405 fn manifest_path(&self, id: u64) -> PathBuf {
407 self.checkpoint_dir(id).join("manifest.json")
408 }
409
410 fn state_path(&self, id: u64) -> PathBuf {
412 self.checkpoint_dir(id).join("state.bin")
413 }
414
415 fn latest_path(&self) -> PathBuf {
417 self.checkpoints_dir().join("latest.txt")
418 }
419
420 fn parse_checkpoint_id(name: &str) -> Option<u64> {
422 name.strip_prefix("checkpoint_")
423 .and_then(|s| s.parse().ok())
424 }
425
426 fn sorted_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
428 let dir = self.checkpoints_dir();
429 if !dir.exists() {
430 return Ok(Vec::new());
431 }
432
433 let mut ids: Vec<u64> = std::fs::read_dir(&dir)?
434 .filter_map(Result::ok)
435 .filter(|e| e.path().is_dir())
436 .filter_map(|e| e.file_name().to_str().and_then(Self::parse_checkpoint_id))
437 .collect();
438
439 ids.sort_unstable();
440 Ok(ids)
441 }
442}
443
444impl FileSystemCheckpointStore {
445 fn find_orphan_dirs(&self) -> Result<Vec<PathBuf>, CheckpointStoreError> {
448 let dir = self.checkpoints_dir();
449 if !dir.exists() {
450 return Ok(Vec::new());
451 }
452
453 let mut orphans = Vec::new();
454 for entry in std::fs::read_dir(&dir)? {
455 let entry = entry?;
456 let path = entry.path();
457 if !path.is_dir() {
458 continue;
459 }
460 let has_state = path.join("state.bin").exists();
461 let has_manifest = path.join("manifest.json").exists();
462 if has_state && !has_manifest {
463 orphans.push(path);
464 }
465 }
466 Ok(orphans)
467 }
468}
469
470impl CheckpointStore for FileSystemCheckpointStore {
471 fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
472 let cp_dir = self.checkpoint_dir(manifest.checkpoint_id);
473 std::fs::create_dir_all(&cp_dir)?;
474
475 let manifest_path = self.manifest_path(manifest.checkpoint_id);
476 let json = serde_json::to_string_pretty(manifest)?;
477
478 let tmp_path = manifest_path.with_extension("json.tmp");
480 std::fs::write(&tmp_path, &json)?;
481 sync_file(&tmp_path)?;
482 std::fs::rename(&tmp_path, &manifest_path)?;
483 sync_dir(&cp_dir)?;
484
485 let latest = self.latest_path();
487 let latest_dir = latest.parent().unwrap_or(Path::new("."));
488 std::fs::create_dir_all(latest_dir)?;
489 let latest_content = format!("checkpoint_{:06}", manifest.checkpoint_id);
490 let tmp_latest = latest.with_extension("txt.tmp");
491 std::fs::write(&tmp_latest, &latest_content)?;
492 sync_file(&tmp_latest)?;
493 std::fs::rename(&tmp_latest, &latest)?;
494 sync_dir(latest_dir)?;
495
496 if self.max_retained > 0 {
498 if let Err(e) = self.prune(self.max_retained) {
499 tracing::warn!(
500 max_retained = self.max_retained,
501 error = %e,
502 "[LDB-6009] Checkpoint prune failed — old checkpoints may accumulate on disk"
503 );
504 }
505 }
506
507 Ok(())
508 }
509
510 fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
511 let latest = self.latest_path();
512 if !latest.exists() {
513 return Ok(None);
514 }
515
516 let content = std::fs::read_to_string(&latest)?;
517 let dir_name = content.trim();
518 if dir_name.is_empty() {
519 return Ok(None);
520 }
521
522 let id = Self::parse_checkpoint_id(dir_name);
523 match id {
524 Some(id) => self.load_by_id(id),
525 None => Ok(None),
526 }
527 }
528
529 fn load_by_id(&self, id: u64) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
530 let path = self.manifest_path(id);
531 if !path.exists() {
532 return Ok(None);
533 }
534
535 let json = std::fs::read_to_string(&path)?;
536 let manifest: CheckpointManifest = serde_json::from_str(&json)?;
537
538 let errors = manifest.validate();
540 if !errors.is_empty() {
541 tracing::warn!(
542 checkpoint_id = id,
543 error_count = errors.len(),
544 first_error = %errors[0],
545 "loaded checkpoint manifest has validation warnings"
546 );
547 }
548
549 Ok(Some(manifest))
550 }
551
552 fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
553 self.sorted_checkpoint_ids()
554 }
555
556 fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
557 let ids = self.sorted_checkpoint_ids()?;
558 let mut result = Vec::with_capacity(ids.len());
559
560 for id in ids {
561 if let Ok(Some(manifest)) = self.load_by_id(id) {
563 result.push((manifest.checkpoint_id, manifest.epoch));
564 }
565 }
566
567 Ok(result)
568 }
569
570 fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
571 let ids = self.sorted_checkpoint_ids()?;
572 if ids.len() <= keep_count {
573 return Ok(0);
574 }
575
576 let to_remove = ids.len() - keep_count;
577 let mut removed = 0;
578
579 for &id in &ids[..to_remove] {
580 let dir = self.checkpoint_dir(id);
581 if std::fs::remove_dir_all(&dir).is_ok() {
582 removed += 1;
583 }
584 }
585
586 Ok(removed)
587 }
588
589 fn save_state_data(&self, id: u64, data: &[u8]) -> Result<(), CheckpointStoreError> {
590 let cp_dir = self.checkpoint_dir(id);
591 std::fs::create_dir_all(&cp_dir)?;
592
593 let path = self.state_path(id);
594 let tmp = path.with_extension("bin.tmp");
595 std::fs::write(&tmp, data)?;
596 sync_file(&tmp)?;
597 std::fs::rename(&tmp, &path)?;
598 sync_dir(&cp_dir)?;
599
600 Ok(())
601 }
602
603 fn save_with_state(
604 &self,
605 manifest: &CheckpointManifest,
606 state_data: Option<&[u8]>,
607 ) -> Result<(), CheckpointStoreError> {
608 let mut manifest = manifest.clone();
609 if let Some(data) = state_data {
612 manifest.state_checksum = Some(sha256_hex(data));
613 self.save_state_data(manifest.checkpoint_id, data)?;
614 }
615 self.save(&manifest)
616 }
617
618 fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
619 let orphans = self.find_orphan_dirs()?;
620 let mut cleaned = 0;
621 for dir in &orphans {
622 if std::fs::remove_dir_all(dir).is_ok() {
623 tracing::info!(
624 path = %dir.display(),
625 "cleaned up orphaned checkpoint directory"
626 );
627 cleaned += 1;
628 }
629 }
630 Ok(cleaned)
631 }
632
633 fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
634 let path = self.state_path(id);
635 if !path.exists() {
636 return Ok(None);
637 }
638 let data = std::fs::read(&path)?;
639 Ok(Some(data))
640 }
641}
642
/// JSON body of the object-store `manifests/latest.json` pointer, e.g.
/// `{"checkpoint_id":5}`.
#[derive(serde::Serialize, serde::Deserialize)]
struct LatestPointer {
    /// ID of the most recently saved checkpoint.
    checkpoint_id: u64,
}
652
/// Checkpoint store backed by any [`ObjectStore`] implementation.
///
/// Current key layout (all keys prefixed with `prefix`):
/// - `manifests/manifest-<id:06>.json`
/// - `manifests/latest.json`
/// - `checkpoints/state-<id:06>.bin`
///
/// The legacy `checkpoints/checkpoint_<id:06>/{manifest.json,state.bin}` and
/// `checkpoints/latest.txt` keys are still read.
pub struct ObjectStoreCheckpointStore {
    /// Backing object store.
    store: Arc<dyn ObjectStore>,
    /// Prepended verbatim to every key (no separator is inserted).
    prefix: String,
    /// Number of checkpoints kept after each `save`; `0` disables auto-pruning.
    max_retained: usize,
    /// Current-thread Tokio runtime used to `block_on` the async object-store
    /// API from these synchronous methods.
    rt: tokio::runtime::Runtime,
}
693
694impl ObjectStoreCheckpointStore {
695 pub fn new(
704 store: Arc<dyn ObjectStore>,
705 prefix: String,
706 max_retained: usize,
707 ) -> std::io::Result<Self> {
708 let rt = tokio::runtime::Builder::new_current_thread()
709 .enable_all()
710 .build()?;
711 Ok(Self {
712 store,
713 prefix,
714 max_retained,
715 rt,
716 })
717 }
718
719 fn manifest_path(&self, id: u64) -> object_store::path::Path {
722 object_store::path::Path::from(format!("{}manifests/manifest-{id:06}.json", self.prefix))
723 }
724
725 fn latest_pointer_path(&self) -> object_store::path::Path {
726 object_store::path::Path::from(format!("{}manifests/latest.json", self.prefix))
727 }
728
729 fn state_path(&self, id: u64) -> object_store::path::Path {
730 object_store::path::Path::from(format!("{}checkpoints/state-{id:06}.bin", self.prefix))
731 }
732
733 fn legacy_manifest_path(&self, id: u64) -> object_store::path::Path {
736 object_store::path::Path::from(format!(
737 "{}checkpoints/checkpoint_{id:06}/manifest.json",
738 self.prefix
739 ))
740 }
741
742 fn legacy_state_path(&self, id: u64) -> object_store::path::Path {
743 object_store::path::Path::from(format!(
744 "{}checkpoints/checkpoint_{id:06}/state.bin",
745 self.prefix
746 ))
747 }
748
749 fn legacy_latest_path(&self) -> object_store::path::Path {
750 object_store::path::Path::from(format!("{}checkpoints/latest.txt", self.prefix))
751 }
752
753 fn get_bytes(
757 &self,
758 path: &object_store::path::Path,
759 ) -> Result<Option<bytes::Bytes>, CheckpointStoreError> {
760 let result = self
761 .rt
762 .block_on(async { self.store.get_opts(path, GetOptions::default()).await });
763
764 match result {
765 Ok(get_result) => {
766 let data = self.rt.block_on(async { get_result.bytes().await })?;
767 Ok(Some(data))
768 }
769 Err(object_store::Error::NotFound { .. }) => Ok(None),
770 Err(e) => Err(CheckpointStoreError::ObjectStore(e)),
771 }
772 }
773
774 fn load_manifest_at(
776 &self,
777 path: &object_store::path::Path,
778 ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
779 match self.get_bytes(path)? {
780 Some(data) => {
781 let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
782 Ok(Some(manifest))
783 }
784 None => Ok(None),
785 }
786 }
787
788 fn list_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
790 let mut ids = std::collections::BTreeSet::new();
791
792 let manifests_prefix = object_store::path::Path::from(format!("{}manifests/", self.prefix));
794 let entries: Vec<_> = self.rt.block_on(async {
795 use futures::TryStreamExt;
796 self.store
797 .list(Some(&manifests_prefix))
798 .try_collect::<Vec<_>>()
799 .await
800 })?;
801 for entry in &entries {
802 let path_str = entry.location.as_ref();
803 for segment in path_str.split('/') {
804 if let Some(rest) = segment.strip_prefix("manifest-") {
805 if let Some(id_str) = rest.strip_suffix(".json") {
806 if let Ok(id) = id_str.parse::<u64>() {
807 ids.insert(id);
808 }
809 }
810 }
811 }
812 }
813
814 let checkpoints_prefix =
816 object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
817 let entries: Vec<_> = self.rt.block_on(async {
818 use futures::TryStreamExt;
819 self.store
820 .list(Some(&checkpoints_prefix))
821 .try_collect::<Vec<_>>()
822 .await
823 })?;
824 for entry in &entries {
825 let path_str = entry.location.as_ref();
826 if !path_str.ends_with("manifest.json") {
827 continue;
828 }
829 for segment in path_str.split('/') {
830 if let Some(id_str) = segment.strip_prefix("checkpoint_") {
831 if let Ok(id) = id_str.parse::<u64>() {
832 ids.insert(id);
833 }
834 }
835 }
836 }
837
838 Ok(ids.into_iter().collect())
839 }
840}
841
842impl CheckpointStore for ObjectStoreCheckpointStore {
843 fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
844 let json = serde_json::to_string_pretty(manifest)?;
845 let path = self.manifest_path(manifest.checkpoint_id);
846 let json_bytes = bytes::Bytes::from(json);
847
848 let create_opts = PutOptions {
850 mode: PutMode::Create,
851 ..PutOptions::default()
852 };
853 let result = self.rt.block_on(async {
854 self.store
855 .put_opts(
856 &path,
857 PutPayload::from_bytes(json_bytes.clone()),
858 create_opts,
859 )
860 .await
861 });
862
863 match result {
864 Ok(_) => {}
865 Err(object_store::Error::AlreadyExists { .. }) => {
866 tracing::warn!(
867 checkpoint_id = manifest.checkpoint_id,
868 "[LDB-6010] Manifest already exists — skipping write"
869 );
870 }
871 Err(object_store::Error::NotImplemented { .. }) => {
872 self.rt.block_on(async {
874 self.store
875 .put_opts(
876 &path,
877 PutPayload::from_bytes(json_bytes),
878 PutOptions::default(),
879 )
880 .await
881 })?;
882 }
883 Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
884 }
885
886 let latest = self.latest_pointer_path();
888 let pointer = serde_json::to_string(&LatestPointer {
889 checkpoint_id: manifest.checkpoint_id,
890 })?;
891 let payload = PutPayload::from_bytes(bytes::Bytes::from(pointer));
892 self.rt.block_on(async {
893 self.store
894 .put_opts(&latest, payload, PutOptions::default())
895 .await
896 })?;
897
898 if self.max_retained > 0 {
900 if let Err(e) = self.prune(self.max_retained) {
901 tracing::warn!(
902 max_retained = self.max_retained,
903 error = %e,
904 "[LDB-6009] Object store checkpoint prune failed"
905 );
906 }
907 }
908
909 Ok(())
910 }
911
912 fn update_manifest(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
913 let json = serde_json::to_string_pretty(manifest)?;
914 let path = self.manifest_path(manifest.checkpoint_id);
915 let payload = PutPayload::from_bytes(bytes::Bytes::from(json));
916
917 self.rt.block_on(async {
919 self.store
920 .put_opts(&path, payload, PutOptions::default())
921 .await
922 })?;
923
924 Ok(())
925 }
926
927 fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
928 if let Some(data) = self.get_bytes(&self.latest_pointer_path())? {
930 let pointer: LatestPointer = serde_json::from_slice(&data)?;
931 return self.load_by_id(pointer.checkpoint_id);
932 }
933
934 if let Some(data) = self.get_bytes(&self.legacy_latest_path())? {
936 let content = String::from_utf8_lossy(&data);
937 let dir_name = content.trim();
938 if dir_name.is_empty() {
939 return Ok(None);
940 }
941 let id = dir_name
942 .strip_prefix("checkpoint_")
943 .and_then(|s| s.parse::<u64>().ok());
944 return match id {
945 Some(id) => self.load_by_id(id),
946 None => Ok(None),
947 };
948 }
949
950 Ok(None)
951 }
952
953 fn load_by_id(&self, id: u64) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
954 if let Some(m) = self.load_manifest_at(&self.manifest_path(id))? {
956 return Ok(Some(m));
957 }
958 self.load_manifest_at(&self.legacy_manifest_path(id))
960 }
961
962 fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
963 self.list_checkpoint_ids()
964 }
965
966 fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
967 let ids = self.list_checkpoint_ids()?;
968 let mut result = Vec::with_capacity(ids.len());
969
970 for id in ids {
971 if let Ok(Some(manifest)) = self.load_by_id(id) {
972 result.push((manifest.checkpoint_id, manifest.epoch));
973 }
974 }
975
976 Ok(result)
977 }
978
979 fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
980 let ids = self.list_checkpoint_ids()?;
981 if ids.len() <= keep_count {
982 return Ok(0);
983 }
984
985 let to_remove = ids.len() - keep_count;
986 let mut removed = 0;
987
988 for &id in &ids[..to_remove] {
989 let paths = vec![
991 Ok(self.manifest_path(id)),
992 Ok(self.state_path(id)),
993 Ok(self.legacy_manifest_path(id)),
994 Ok(self.legacy_state_path(id)),
995 ];
996
997 self.rt.block_on(async {
998 use futures::StreamExt;
999 let stream = futures::stream::iter(paths).boxed();
1000 let mut results = self.store.delete_stream(stream);
1001 while let Some(_result) = results.next().await {
1002 }
1004 });
1005 removed += 1;
1006 }
1007
1008 Ok(removed)
1009 }
1010
1011 fn save_state_data(&self, id: u64, data: &[u8]) -> Result<(), CheckpointStoreError> {
1012 let path = self.state_path(id);
1013 let payload = PutPayload::from_bytes(bytes::Bytes::copy_from_slice(data));
1014 self.rt.block_on(async {
1015 self.store
1016 .put_opts(&path, payload, PutOptions::default())
1017 .await
1018 })?;
1019 Ok(())
1020 }
1021
1022 fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
1023 if let Some(data) = self.get_bytes(&self.state_path(id))? {
1025 return Ok(Some(data.to_vec()));
1026 }
1027 Ok(self
1029 .get_bytes(&self.legacy_state_path(id))?
1030 .map(|d| d.to_vec()))
1031 }
1032
1033 fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
1034 let manifest_ids: std::collections::BTreeSet<u64> =
1036 self.list_checkpoint_ids()?.into_iter().collect();
1037
1038 let state_prefix = object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
1040 let entries: Vec<_> = self.rt.block_on(async {
1041 use futures::TryStreamExt;
1042 self.store
1043 .list(Some(&state_prefix))
1044 .try_collect::<Vec<_>>()
1045 .await
1046 })?;
1047
1048 let mut orphan_paths = Vec::new();
1049 for entry in &entries {
1050 let path_str = entry.location.as_ref();
1051 for segment in path_str.split('/') {
1052 if let Some(rest) = segment.strip_prefix("state-") {
1053 if let Some(id_str) = rest.strip_suffix(".bin") {
1054 if let Ok(id) = id_str.parse::<u64>() {
1055 if !manifest_ids.contains(&id) {
1056 orphan_paths.push(entry.location.clone());
1057 }
1058 }
1059 }
1060 }
1061 }
1062 }
1063
1064 let count = orphan_paths.len();
1065 if !orphan_paths.is_empty() {
1066 self.rt.block_on(async {
1067 use futures::StreamExt;
1068 let stream = futures::stream::iter(orphan_paths.into_iter().map(Ok)).boxed();
1069 let mut results = self.store.delete_stream(stream);
1070 while let Some(result) = results.next().await {
1071 if let Err(e) = result {
1072 tracing::warn!(error = %e, "failed to delete orphan state file");
1073 }
1074 }
1075 });
1076 }
1077
1078 Ok(count)
1079 }
1080}
1081
1082#[cfg(test)]
1083mod tests {
1084 use super::*;
1085 use crate::checkpoint_manifest::{ConnectorCheckpoint, OperatorCheckpoint};
1086 #[allow(clippy::disallowed_types)] use std::collections::HashMap;
1088
1089 fn make_store(dir: &Path) -> FileSystemCheckpointStore {
1090 FileSystemCheckpointStore::new(dir, 3)
1091 }
1092
1093 fn make_manifest(id: u64, epoch: u64) -> CheckpointManifest {
1094 CheckpointManifest::new(id, epoch)
1095 }
1096
1097 #[test]
1098 fn test_save_and_load_latest() {
1099 let dir = tempfile::tempdir().unwrap();
1100 let store = make_store(dir.path());
1101
1102 let m = make_manifest(1, 1);
1103 store.save(&m).unwrap();
1104
1105 let loaded = store.load_latest().unwrap().unwrap();
1106 assert_eq!(loaded.checkpoint_id, 1);
1107 assert_eq!(loaded.epoch, 1);
1108 }
1109
1110 #[test]
1111 fn test_load_latest_returns_none_when_empty() {
1112 let dir = tempfile::tempdir().unwrap();
1113 let store = make_store(dir.path());
1114 assert!(store.load_latest().unwrap().is_none());
1115 }
1116
1117 #[test]
1118 fn test_load_latest_returns_most_recent() {
1119 let dir = tempfile::tempdir().unwrap();
1120 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1121
1122 for i in 1..=5 {
1123 store.save(&make_manifest(i, i)).unwrap();
1124 }
1125
1126 let latest = store.load_latest().unwrap().unwrap();
1127 assert_eq!(latest.checkpoint_id, 5);
1128 assert_eq!(latest.epoch, 5);
1129 }
1130
1131 #[test]
1132 fn test_load_by_id() {
1133 let dir = tempfile::tempdir().unwrap();
1134 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1135
1136 store.save(&make_manifest(1, 10)).unwrap();
1137 store.save(&make_manifest(2, 20)).unwrap();
1138
1139 let m = store.load_by_id(1).unwrap().unwrap();
1140 assert_eq!(m.epoch, 10);
1141
1142 let m = store.load_by_id(2).unwrap().unwrap();
1143 assert_eq!(m.epoch, 20);
1144
1145 assert!(store.load_by_id(99).unwrap().is_none());
1146 }
1147
1148 #[test]
1149 fn test_list() {
1150 let dir = tempfile::tempdir().unwrap();
1151 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1152
1153 store.save(&make_manifest(1, 10)).unwrap();
1154 store.save(&make_manifest(3, 30)).unwrap();
1155 store.save(&make_manifest(2, 20)).unwrap();
1156
1157 let list = store.list().unwrap();
1158 assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1159 }
1160
1161 #[test]
1162 fn test_prune_keeps_max() {
1163 let dir = tempfile::tempdir().unwrap();
1164 let store = FileSystemCheckpointStore::new(dir.path(), 10); for i in 1..=5 {
1167 store.save(&make_manifest(i, i)).unwrap();
1168 }
1169
1170 let removed = store.prune(2).unwrap();
1171 assert_eq!(removed, 3);
1172
1173 let list = store.list().unwrap();
1174 assert_eq!(list.len(), 2);
1175 assert_eq!(list[0].0, 4);
1176 assert_eq!(list[1].0, 5);
1177 }
1178
1179 #[test]
1180 fn test_auto_prune_on_save() {
1181 let dir = tempfile::tempdir().unwrap();
1182 let store = FileSystemCheckpointStore::new(dir.path(), 2);
1183
1184 for i in 1..=5 {
1185 store.save(&make_manifest(i, i)).unwrap();
1186 }
1187
1188 let list = store.list().unwrap();
1189 assert_eq!(list.len(), 2);
1190 assert_eq!(list[0].0, 4);
1192 assert_eq!(list[1].0, 5);
1193 }
1194
1195 #[test]
1196 fn test_save_and_load_state_data() {
1197 let dir = tempfile::tempdir().unwrap();
1198 let store = make_store(dir.path());
1199
1200 store.save(&make_manifest(1, 1)).unwrap();
1201
1202 let data = b"large operator state binary blob";
1203 store.save_state_data(1, data).unwrap();
1204
1205 let loaded = store.load_state_data(1).unwrap().unwrap();
1206 assert_eq!(loaded, data);
1207 }
1208
1209 #[test]
1210 fn test_load_state_data_returns_none() {
1211 let dir = tempfile::tempdir().unwrap();
1212 let store = make_store(dir.path());
1213 assert!(store.load_state_data(99).unwrap().is_none());
1214 }
1215
1216 #[test]
1217 fn test_full_manifest_round_trip() {
1218 let dir = tempfile::tempdir().unwrap();
1219 let store = make_store(dir.path());
1220
1221 let mut m = make_manifest(1, 5);
1222 m.source_offsets.insert(
1223 "kafka-src".into(),
1224 ConnectorCheckpoint::with_offsets(
1225 5,
1226 HashMap::from([("0".into(), "1000".into()), ("1".into(), "2000".into())]),
1227 ),
1228 );
1229 m.sink_epochs.insert("pg-sink".into(), 4);
1230 m.table_offsets.insert(
1231 "instruments".into(),
1232 ConnectorCheckpoint::with_offsets(5, HashMap::from([("lsn".into(), "0/AB".into())])),
1233 );
1234 m.operator_states
1235 .insert("window".into(), OperatorCheckpoint::inline(b"data"));
1236 m.watermark = Some(999_000);
1237 m.wal_position = 4096;
1238 m.per_core_wal_positions = vec![100, 200];
1239
1240 store.save(&m).unwrap();
1241
1242 let loaded = store.load_latest().unwrap().unwrap();
1243 assert_eq!(loaded.checkpoint_id, 1);
1244 assert_eq!(loaded.epoch, 5);
1245 assert_eq!(loaded.watermark, Some(999_000));
1246 assert_eq!(loaded.wal_position, 4096);
1247 assert_eq!(loaded.per_core_wal_positions, vec![100, 200]);
1248
1249 let src = loaded.source_offsets.get("kafka-src").unwrap();
1250 assert_eq!(src.offsets.get("0"), Some(&"1000".into()));
1251
1252 assert_eq!(loaded.sink_epochs.get("pg-sink"), Some(&4));
1253
1254 let tbl = loaded.table_offsets.get("instruments").unwrap();
1255 assert_eq!(tbl.offsets.get("lsn"), Some(&"0/AB".into()));
1256
1257 let op = loaded.operator_states.get("window").unwrap();
1258 assert_eq!(op.decode_inline().unwrap(), b"data");
1259 }
1260
1261 #[test]
1262 fn test_empty_latest_txt() {
1263 let dir = tempfile::tempdir().unwrap();
1264 let store = make_store(dir.path());
1265
1266 let cp_dir = dir.path().join("checkpoints");
1267 std::fs::create_dir_all(&cp_dir).unwrap();
1268 std::fs::write(cp_dir.join("latest.txt"), "").unwrap();
1269
1270 assert!(store.load_latest().unwrap().is_none());
1271 }
1272
1273 #[test]
1274 fn test_latest_points_to_missing_checkpoint() {
1275 let dir = tempfile::tempdir().unwrap();
1276 let store = make_store(dir.path());
1277
1278 let cp_dir = dir.path().join("checkpoints");
1279 std::fs::create_dir_all(&cp_dir).unwrap();
1280 std::fs::write(cp_dir.join("latest.txt"), "checkpoint_000099").unwrap();
1281
1282 assert!(store.load_latest().unwrap().is_none());
1283 }
1284
1285 #[test]
1286 fn test_prune_no_op_when_under_limit() {
1287 let dir = tempfile::tempdir().unwrap();
1288 let store = make_store(dir.path());
1289
1290 store.save(&make_manifest(1, 1)).unwrap();
1291 let removed = store.prune(5).unwrap();
1292 assert_eq!(removed, 0);
1293 }
1294
1295 #[test]
1296 fn test_save_with_state_writes_sidecar_before_manifest() {
1297 let dir = tempfile::tempdir().unwrap();
1298 let store = make_store(dir.path());
1299
1300 let m = make_manifest(1, 1);
1301 let state = b"large-operator-state-blob";
1302 store.save_with_state(&m, Some(state)).unwrap();
1303
1304 let loaded = store.load_latest().unwrap().unwrap();
1306 assert_eq!(loaded.checkpoint_id, 1);
1307
1308 let loaded_state = store.load_state_data(1).unwrap().unwrap();
1309 assert_eq!(loaded_state, state);
1310 }
1311
1312 #[test]
1313 fn test_save_with_state_none_is_same_as_save() {
1314 let dir = tempfile::tempdir().unwrap();
1315 let store = make_store(dir.path());
1316
1317 let m = make_manifest(1, 1);
1318 store.save_with_state(&m, None).unwrap();
1319
1320 let loaded = store.load_latest().unwrap().unwrap();
1321 assert_eq!(loaded.checkpoint_id, 1);
1322 assert!(store.load_state_data(1).unwrap().is_none());
1323 }
1324
1325 #[test]
1326 fn test_orphaned_state_without_manifest_is_ignored() {
1327 let dir = tempfile::tempdir().unwrap();
1328 let store = make_store(dir.path());
1329
1330 store.save_state_data(1, b"orphaned").unwrap();
1333
1334 assert!(store.load_latest().unwrap().is_none());
1336
1337 assert!(store.list().unwrap().is_empty());
1339 }
1340
1341 fn make_obj_store() -> ObjectStoreCheckpointStore {
1346 let store = Arc::new(object_store::memory::InMemory::new());
1347 ObjectStoreCheckpointStore::new(store, String::new(), 3).unwrap()
1348 }
1349
1350 fn make_obj_store_shared(
1351 store: Arc<object_store::memory::InMemory>,
1352 ) -> ObjectStoreCheckpointStore {
1353 ObjectStoreCheckpointStore::new(store, String::new(), 10).unwrap()
1354 }
1355
1356 fn write_legacy_manifest(
1358 store: &Arc<object_store::memory::InMemory>,
1359 prefix: &str,
1360 manifest: &CheckpointManifest,
1361 ) {
1362 let rt = tokio::runtime::Builder::new_current_thread()
1363 .enable_all()
1364 .build()
1365 .unwrap();
1366 let json = serde_json::to_string_pretty(manifest).unwrap();
1367
1368 let path = object_store::path::Path::from(format!(
1369 "{}checkpoints/checkpoint_{:06}/manifest.json",
1370 prefix, manifest.checkpoint_id
1371 ));
1372 rt.block_on(async {
1373 store
1374 .put_opts(
1375 &path,
1376 PutPayload::from_bytes(bytes::Bytes::from(json)),
1377 PutOptions::default(),
1378 )
1379 .await
1380 .unwrap();
1381 });
1382
1383 let latest = object_store::path::Path::from(format!("{prefix}checkpoints/latest.txt"));
1384 let content = format!("checkpoint_{:06}", manifest.checkpoint_id);
1385 rt.block_on(async {
1386 store
1387 .put_opts(
1388 &latest,
1389 PutPayload::from_bytes(bytes::Bytes::from(content)),
1390 PutOptions::default(),
1391 )
1392 .await
1393 .unwrap();
1394 });
1395 }
1396
1397 #[test]
1398 fn test_obj_save_and_load_latest() {
1399 let store = make_obj_store();
1400 let m = make_manifest(1, 1);
1401 store.save(&m).unwrap();
1402
1403 let loaded = store.load_latest().unwrap().unwrap();
1404 assert_eq!(loaded.checkpoint_id, 1);
1405 assert_eq!(loaded.epoch, 1);
1406 }
1407
1408 #[test]
1409 fn test_obj_load_latest_returns_none_when_empty() {
1410 let store = make_obj_store();
1411 assert!(store.load_latest().unwrap().is_none());
1412 }
1413
1414 #[test]
1415 fn test_obj_load_by_id() {
1416 let store = ObjectStoreCheckpointStore::new(
1417 Arc::new(object_store::memory::InMemory::new()),
1418 String::new(),
1419 10,
1420 )
1421 .unwrap();
1422
1423 store.save(&make_manifest(1, 10)).unwrap();
1424 store.save(&make_manifest(2, 20)).unwrap();
1425
1426 let m = store.load_by_id(1).unwrap().unwrap();
1427 assert_eq!(m.epoch, 10);
1428 let m = store.load_by_id(2).unwrap().unwrap();
1429 assert_eq!(m.epoch, 20);
1430 assert!(store.load_by_id(99).unwrap().is_none());
1431 }
1432
1433 #[test]
1434 fn test_obj_list() {
1435 let store = ObjectStoreCheckpointStore::new(
1436 Arc::new(object_store::memory::InMemory::new()),
1437 String::new(),
1438 10,
1439 )
1440 .unwrap();
1441
1442 store.save(&make_manifest(1, 10)).unwrap();
1443 store.save(&make_manifest(3, 30)).unwrap();
1444 store.save(&make_manifest(2, 20)).unwrap();
1445
1446 let list = store.list().unwrap();
1447 assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
1448 }
1449
1450 #[test]
1451 fn test_obj_prune() {
1452 let store = ObjectStoreCheckpointStore::new(
1453 Arc::new(object_store::memory::InMemory::new()),
1454 String::new(),
1455 10,
1456 )
1457 .unwrap();
1458
1459 for i in 1..=5 {
1460 store.save(&make_manifest(i, i)).unwrap();
1461 }
1462
1463 let removed = store.prune(2).unwrap();
1464 assert_eq!(removed, 3);
1465
1466 let list = store.list().unwrap();
1467 assert_eq!(list.len(), 2);
1468 assert_eq!(list[0].0, 4);
1469 assert_eq!(list[1].0, 5);
1470 }
1471
1472 #[test]
1473 fn test_obj_auto_prune_on_save() {
1474 let store = ObjectStoreCheckpointStore::new(
1475 Arc::new(object_store::memory::InMemory::new()),
1476 String::new(),
1477 2,
1478 )
1479 .unwrap();
1480
1481 for i in 1..=5 {
1482 store.save(&make_manifest(i, i)).unwrap();
1483 }
1484
1485 let list = store.list().unwrap();
1486 assert_eq!(list.len(), 2);
1487 assert_eq!(list[0].0, 4);
1488 assert_eq!(list[1].0, 5);
1489 }
1490
1491 #[test]
1492 fn test_obj_save_and_load_state_data() {
1493 let store = make_obj_store();
1494 store.save(&make_manifest(1, 1)).unwrap();
1495
1496 let data = b"large operator state binary blob";
1497 store.save_state_data(1, data).unwrap();
1498
1499 let loaded = store.load_state_data(1).unwrap().unwrap();
1500 assert_eq!(loaded, data);
1501 }
1502
1503 #[test]
1504 fn test_obj_load_state_data_returns_none() {
1505 let store = make_obj_store();
1506 assert!(store.load_state_data(99).unwrap().is_none());
1507 }
1508
1509 #[test]
1510 fn test_obj_with_prefix() {
1511 let inner = Arc::new(object_store::memory::InMemory::new());
1512 let store =
1513 ObjectStoreCheckpointStore::new(inner, "nodes/abc123/".to_string(), 10).unwrap();
1514
1515 store.save(&make_manifest(1, 42)).unwrap();
1516 let loaded = store.load_latest().unwrap().unwrap();
1517 assert_eq!(loaded.checkpoint_id, 1);
1518 assert_eq!(loaded.epoch, 42);
1519 }
1520
1521 #[test]
1526 fn test_obj_v2_layout_paths() {
1527 let inner = Arc::new(object_store::memory::InMemory::new());
1528 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
1529
1530 store.save(&make_manifest(1, 10)).unwrap();
1531
1532 let rt = tokio::runtime::Builder::new_current_thread()
1533 .enable_all()
1534 .build()
1535 .unwrap();
1536
1537 let result = rt.block_on(async {
1539 inner
1540 .get_opts(
1541 &object_store::path::Path::from("manifests/manifest-000001.json"),
1542 GetOptions::default(),
1543 )
1544 .await
1545 });
1546 assert!(result.is_ok(), "v2 manifest path should exist");
1547
1548 let result = rt.block_on(async {
1550 inner
1551 .get_opts(
1552 &object_store::path::Path::from("manifests/latest.json"),
1553 GetOptions::default(),
1554 )
1555 .await
1556 });
1557 assert!(result.is_ok(), "v2 latest.json should exist");
1558
1559 let result = rt.block_on(async {
1561 inner
1562 .get_opts(
1563 &object_store::path::Path::from("checkpoints/checkpoint_000001/manifest.json"),
1564 GetOptions::default(),
1565 )
1566 .await
1567 });
1568 assert!(result.is_err(), "v1 manifest path should NOT exist");
1569 }
1570
1571 #[test]
1572 fn test_obj_v1_backward_compat_load() {
1573 let inner = Arc::new(object_store::memory::InMemory::new());
1574 write_legacy_manifest(&inner, "", &make_manifest(1, 42));
1575
1576 let store = make_obj_store_shared(inner);
1577
1578 let loaded = store.load_latest().unwrap().unwrap();
1579 assert_eq!(loaded.checkpoint_id, 1);
1580 assert_eq!(loaded.epoch, 42);
1581
1582 let loaded = store.load_by_id(1).unwrap().unwrap();
1583 assert_eq!(loaded.epoch, 42);
1584 }
1585
1586 #[test]
1587 fn test_obj_v1_backward_compat_list() {
1588 let inner = Arc::new(object_store::memory::InMemory::new());
1589 write_legacy_manifest(&inner, "", &make_manifest(1, 10));
1590 write_legacy_manifest(&inner, "", &make_manifest(2, 20));
1591
1592 let store = make_obj_store_shared(inner);
1593 let list = store.list().unwrap();
1594 assert_eq!(list, vec![(1, 10), (2, 20)]);
1595 }
1596
1597 #[test]
1598 fn test_obj_mixed_layout_list() {
1599 let inner = Arc::new(object_store::memory::InMemory::new());
1600 write_legacy_manifest(&inner, "", &make_manifest(1, 10));
1602 let store = make_obj_store_shared(inner);
1604 store.save(&make_manifest(2, 20)).unwrap();
1605
1606 let list = store.list().unwrap();
1607 assert_eq!(list, vec![(1, 10), (2, 20)]);
1608 }
1609
1610 #[test]
1611 fn test_obj_conditional_put_idempotent() {
1612 let store = ObjectStoreCheckpointStore::new(
1613 Arc::new(object_store::memory::InMemory::new()),
1614 String::new(),
1615 10,
1616 )
1617 .unwrap();
1618
1619 let m = make_manifest(1, 10);
1620 store.save(&m).unwrap();
1621
1622 store.save(&m).unwrap();
1624
1625 let loaded = store.load_latest().unwrap().unwrap();
1626 assert_eq!(loaded.checkpoint_id, 1);
1627 assert_eq!(loaded.epoch, 10);
1628 }
1629
1630 #[test]
1631 fn test_obj_update_manifest_overwrites() {
1632 use crate::checkpoint_manifest::SinkCommitStatus;
1633
1634 let store = make_obj_store();
1635
1636 let mut m = make_manifest(1, 10);
1638 m.sink_commit_statuses
1639 .insert("pg-sink".into(), SinkCommitStatus::Pending);
1640 store.save(&m).unwrap();
1641
1642 let loaded = store.load_by_id(1).unwrap().unwrap();
1644 assert_eq!(
1645 loaded.sink_commit_statuses.get("pg-sink"),
1646 Some(&SinkCommitStatus::Pending)
1647 );
1648
1649 m.sink_commit_statuses
1651 .insert("pg-sink".into(), SinkCommitStatus::Committed);
1652 store.update_manifest(&m).unwrap();
1653
1654 let loaded = store.load_by_id(1).unwrap().unwrap();
1656 assert_eq!(
1657 loaded.sink_commit_statuses.get("pg-sink"),
1658 Some(&SinkCommitStatus::Committed)
1659 );
1660 }
1661
1662 #[test]
1663 fn test_obj_save_still_uses_conditional_put() {
1664 let store = make_obj_store();
1665
1666 let m = make_manifest(1, 10);
1667 store.save(&m).unwrap();
1668
1669 store.save(&m).unwrap();
1672
1673 let mut m2 = make_manifest(1, 10);
1675 m2.watermark = Some(42);
1676 store.update_manifest(&m2).unwrap();
1677
1678 let loaded = store.load_by_id(1).unwrap().unwrap();
1679 assert_eq!(loaded.watermark, Some(42));
1680 }
1681
1682 #[test]
1683 fn test_fs_update_manifest_overwrites() {
1684 use crate::checkpoint_manifest::SinkCommitStatus;
1685
1686 let dir = tempfile::tempdir().unwrap();
1687 let store = make_store(dir.path());
1688
1689 let mut m = make_manifest(1, 10);
1690 m.sink_commit_statuses
1691 .insert("sink-a".into(), SinkCommitStatus::Pending);
1692 store.save(&m).unwrap();
1693
1694 m.sink_commit_statuses
1695 .insert("sink-a".into(), SinkCommitStatus::Committed);
1696 store.update_manifest(&m).unwrap();
1697
1698 let loaded = store.load_by_id(1).unwrap().unwrap();
1699 assert_eq!(
1700 loaded.sink_commit_statuses.get("sink-a"),
1701 Some(&SinkCommitStatus::Committed)
1702 );
1703 }
1704
1705 #[test]
1706 fn test_obj_v1_state_backward_compat() {
1707 let inner = Arc::new(object_store::memory::InMemory::new());
1708 let rt = tokio::runtime::Builder::new_current_thread()
1709 .enable_all()
1710 .build()
1711 .unwrap();
1712
1713 let path = object_store::path::Path::from("checkpoints/checkpoint_000001/state.bin");
1715 let data = b"legacy-state-blob";
1716 rt.block_on(async {
1717 inner
1718 .put_opts(
1719 &path,
1720 PutPayload::from_bytes(bytes::Bytes::from_static(data)),
1721 PutOptions::default(),
1722 )
1723 .await
1724 .unwrap();
1725 });
1726
1727 let store = make_obj_store_shared(inner);
1728 let loaded = store.load_state_data(1).unwrap().unwrap();
1729 assert_eq!(loaded, data);
1730 }
1731
1732 #[test]
1733 fn test_obj_v2_state_paths() {
1734 let inner = Arc::new(object_store::memory::InMemory::new());
1735 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
1736
1737 store.save(&make_manifest(1, 1)).unwrap();
1738 store.save_state_data(1, b"v2-state").unwrap();
1739
1740 let rt = tokio::runtime::Builder::new_current_thread()
1741 .enable_all()
1742 .build()
1743 .unwrap();
1744
1745 let result = rt.block_on(async {
1747 inner
1748 .get_opts(
1749 &object_store::path::Path::from("checkpoints/state-000001.bin"),
1750 GetOptions::default(),
1751 )
1752 .await
1753 });
1754 assert!(result.is_ok(), "v2 state path should exist");
1755
1756 let result = rt.block_on(async {
1758 inner
1759 .get_opts(
1760 &object_store::path::Path::from("checkpoints/checkpoint_000001/state.bin"),
1761 GetOptions::default(),
1762 )
1763 .await
1764 });
1765 assert!(result.is_err(), "v1 state path should NOT exist");
1766 }
1767
1768 #[test]
1769 fn test_obj_prune_cleans_both_layouts() {
1770 let inner = Arc::new(object_store::memory::InMemory::new());
1771 write_legacy_manifest(&inner, "", &make_manifest(1, 10));
1773 let store = ObjectStoreCheckpointStore::new(inner, String::new(), 10).unwrap();
1775 store.save(&make_manifest(2, 20)).unwrap();
1776 store.save(&make_manifest(3, 30)).unwrap();
1777 store.save(&make_manifest(4, 40)).unwrap();
1778
1779 let removed = store.prune(2).unwrap();
1780 assert_eq!(removed, 2);
1781
1782 let list = store.list().unwrap();
1783 assert_eq!(list, vec![(3, 30), (4, 40)]);
1784 }
1785
1786 #[test]
1787 fn test_obj_latest_json_format() {
1788 let inner = Arc::new(object_store::memory::InMemory::new());
1789 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
1790
1791 store.save(&make_manifest(5, 50)).unwrap();
1792
1793 let rt = tokio::runtime::Builder::new_current_thread()
1794 .enable_all()
1795 .build()
1796 .unwrap();
1797 let data = rt.block_on(async {
1798 inner
1799 .get_opts(
1800 &object_store::path::Path::from("manifests/latest.json"),
1801 GetOptions::default(),
1802 )
1803 .await
1804 .unwrap()
1805 .bytes()
1806 .await
1807 .unwrap()
1808 });
1809
1810 let pointer: super::LatestPointer = serde_json::from_slice(&data).unwrap();
1811 assert_eq!(pointer.checkpoint_id, 5);
1812 }
1813
1814 #[test]
1815 fn test_validate_checkpoint_valid() {
1816 let dir = tempfile::tempdir().unwrap();
1817 let store = make_store(dir.path());
1818
1819 let m = make_manifest(1, 1);
1820 store.save(&m).unwrap();
1821
1822 let result = store.validate_checkpoint(1).unwrap();
1823 assert!(result.valid, "valid checkpoint: {:?}", result.issues);
1824 assert!(result.issues.is_empty());
1825 }
1826
1827 #[test]
1828 fn test_validate_checkpoint_epoch_zero_invalid() {
1829 let dir = tempfile::tempdir().unwrap();
1830 let store = make_store(dir.path());
1831
1832 let m = make_manifest(1, 0);
1834 store.save(&m).unwrap();
1835
1836 let result = store.validate_checkpoint(1).unwrap();
1837 assert!(!result.valid, "epoch=0 should be invalid");
1838 assert!(
1839 result.issues.iter().any(|i| i.contains("epoch")),
1840 "should mention epoch: {:?}",
1841 result.issues
1842 );
1843 }
1844
1845 #[test]
1846 fn test_validate_checkpoint_missing_manifest() {
1847 let dir = tempfile::tempdir().unwrap();
1848 let store = make_store(dir.path());
1849
1850 let result = store.validate_checkpoint(99).unwrap();
1851 assert!(!result.valid);
1852 assert!(result.issues[0].contains("not found"));
1853 }
1854
1855 #[test]
1856 fn test_validate_checkpoint_corrupt_manifest() {
1857 let dir = tempfile::tempdir().unwrap();
1858 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1859
1860 let cp_dir = dir.path().join("checkpoints/checkpoint_000001");
1862 std::fs::create_dir_all(&cp_dir).unwrap();
1863 std::fs::write(cp_dir.join("manifest.json"), "not valid json").unwrap();
1864
1865 let result = store.validate_checkpoint(1).unwrap();
1867 assert!(!result.valid);
1868 assert!(
1869 result.issues[0].contains("corrupt manifest"),
1870 "expected corrupt manifest issue: {:?}",
1871 result.issues
1872 );
1873 }
1874
1875 #[test]
1876 fn test_validate_checkpoint_state_checksum_ok() {
1877 let dir = tempfile::tempdir().unwrap();
1878 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1879
1880 let state = b"important operator state";
1881 let m = make_manifest(1, 1);
1882 store.save_with_state(&m, Some(state)).unwrap();
1883
1884 let result = store.validate_checkpoint(1).unwrap();
1885 assert!(result.valid, "checksum should match: {:?}", result.issues);
1886 }
1887
1888 #[test]
1889 fn test_validate_checkpoint_state_checksum_mismatch() {
1890 let dir = tempfile::tempdir().unwrap();
1891 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1892
1893 let state = b"original state";
1895 let m = make_manifest(1, 1);
1896 store.save_with_state(&m, Some(state)).unwrap();
1897
1898 let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1900 std::fs::write(&state_path, b"corrupted data!!").unwrap();
1901
1902 let result = store.validate_checkpoint(1).unwrap();
1903 assert!(!result.valid, "corrupted state should be invalid");
1904 assert!(
1905 result
1906 .issues
1907 .iter()
1908 .any(|i| i.contains("checksum mismatch")),
1909 "should report checksum mismatch: {:?}",
1910 result.issues
1911 );
1912 }
1913
1914 #[test]
1915 fn test_validate_checkpoint_state_missing_when_expected() {
1916 let dir = tempfile::tempdir().unwrap();
1917 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1918
1919 let m = make_manifest(1, 1);
1921 store.save_with_state(&m, Some(b"state")).unwrap();
1922
1923 let state_path = dir.path().join("checkpoints/checkpoint_000001/state.bin");
1925 std::fs::remove_file(&state_path).unwrap();
1926
1927 let result = store.validate_checkpoint(1).unwrap();
1928 assert!(!result.valid);
1929 assert!(
1930 result.issues.iter().any(|i| i.contains("not found")),
1931 "should report missing state: {:?}",
1932 result.issues
1933 );
1934 }
1935
1936 #[test]
1937 fn test_recover_latest_validated_skips_corrupt() {
1938 let dir = tempfile::tempdir().unwrap();
1939 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1940
1941 store.save(&make_manifest(1, 10)).unwrap();
1943 store.save(&make_manifest(2, 20)).unwrap();
1944
1945 let cp2_manifest = dir
1947 .path()
1948 .join("checkpoints/checkpoint_000002/manifest.json");
1949 std::fs::write(cp2_manifest, "<<<corrupt>>>").unwrap();
1950
1951 let report = store.recover_latest_validated().unwrap();
1953 assert_eq!(report.chosen_id, Some(1));
1954 assert_eq!(report.skipped.len(), 1);
1955 assert_eq!(report.skipped[0].0, 2);
1956 assert_eq!(report.examined, 2);
1957 }
1958
1959 #[test]
1960 fn test_recover_latest_validated_fresh_start() {
1961 let dir = tempfile::tempdir().unwrap();
1962 let store = make_store(dir.path());
1963
1964 let report = store.recover_latest_validated().unwrap();
1965 assert!(report.chosen_id.is_none());
1966 assert_eq!(report.examined, 0);
1967 }
1968
1969 #[test]
1970 fn test_recover_latest_validated_all_corrupt_is_fresh_start() {
1971 let dir = tempfile::tempdir().unwrap();
1972 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1973
1974 store.save(&make_manifest(1, 1)).unwrap();
1976 let cp_manifest = dir
1977 .path()
1978 .join("checkpoints/checkpoint_000001/manifest.json");
1979 std::fs::write(cp_manifest, "corrupt").unwrap();
1980
1981 let report = store.recover_latest_validated().unwrap();
1985 assert!(report.chosen_id.is_none());
1986 }
1987
1988 #[test]
1989 fn test_cleanup_orphans_removes_stateless_dirs() {
1990 let dir = tempfile::tempdir().unwrap();
1991 let store = FileSystemCheckpointStore::new(dir.path(), 10);
1992
1993 let orphan_dir = dir.path().join("checkpoints/checkpoint_000099");
1995 std::fs::create_dir_all(&orphan_dir).unwrap();
1996 std::fs::write(orphan_dir.join("state.bin"), b"orphaned").unwrap();
1997
1998 store.save(&make_manifest(1, 1)).unwrap();
2000
2001 let cleaned = store.cleanup_orphans().unwrap();
2002 assert_eq!(cleaned, 1);
2003
2004 assert!(!orphan_dir.exists());
2006 assert!(store.load_by_id(1).unwrap().is_some());
2008 }
2009
2010 #[test]
2011 fn test_cleanup_orphans_noop_when_clean() {
2012 let dir = tempfile::tempdir().unwrap();
2013 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2014
2015 store.save(&make_manifest(1, 1)).unwrap();
2016 let cleaned = store.cleanup_orphans().unwrap();
2017 assert_eq!(cleaned, 0);
2018 }
2019
2020 #[test]
2021 fn test_save_with_state_writes_checksum() {
2022 let dir = tempfile::tempdir().unwrap();
2023 let store = FileSystemCheckpointStore::new(dir.path(), 10);
2024
2025 let state = b"state-data-for-checksum";
2026 let m = make_manifest(1, 1);
2027 store.save_with_state(&m, Some(state)).unwrap();
2028
2029 let loaded = store.load_latest().unwrap().unwrap();
2030 assert!(
2031 loaded.state_checksum.is_some(),
2032 "state_checksum should be set"
2033 );
2034 let expected = sha256_hex(state);
2035 assert_eq!(loaded.state_checksum.unwrap(), expected);
2036 }
2037
2038 #[test]
2039 fn test_state_checksum_backward_compat() {
2040 let json = r#"{
2042 "version": 1,
2043 "checkpoint_id": 1,
2044 "epoch": 1,
2045 "timestamp_ms": 1000
2046 }"#;
2047 let m: CheckpointManifest = serde_json::from_str(json).unwrap();
2048 assert!(m.state_checksum.is_none());
2049 }
2050
2051 #[test]
2054 fn test_obj_validate_checkpoint_valid() {
2055 let store = make_obj_store();
2056 store.save(&make_manifest(1, 1)).unwrap();
2057
2058 let result = store.validate_checkpoint(1).unwrap();
2059 assert!(result.valid, "valid checkpoint: {:?}", result.issues);
2060 }
2061
2062 #[test]
2063 fn test_obj_validate_checkpoint_missing() {
2064 let store = make_obj_store();
2065 let result = store.validate_checkpoint(99).unwrap();
2066 assert!(!result.valid);
2067 }
2068
2069 #[test]
2070 fn test_obj_validate_state_checksum() {
2071 let store = ObjectStoreCheckpointStore::new(
2072 Arc::new(object_store::memory::InMemory::new()),
2073 String::new(),
2074 10,
2075 )
2076 .unwrap();
2077
2078 let state = b"obj-store-state-data";
2079 let m = make_manifest(1, 1);
2080 store.save_with_state(&m, Some(state)).unwrap();
2081
2082 let result = store.validate_checkpoint(1).unwrap();
2083 assert!(result.valid, "checksum should match: {:?}", result.issues);
2084 }
2085
2086 #[test]
2087 fn test_obj_recover_latest_validated() {
2088 let store = ObjectStoreCheckpointStore::new(
2089 Arc::new(object_store::memory::InMemory::new()),
2090 String::new(),
2091 10,
2092 )
2093 .unwrap();
2094
2095 store.save(&make_manifest(1, 10)).unwrap();
2096 store.save(&make_manifest(2, 20)).unwrap();
2097
2098 let report = store.recover_latest_validated().unwrap();
2099 assert_eq!(report.chosen_id, Some(2));
2100 assert!(report.skipped.is_empty());
2101 }
2102
2103 #[test]
2104 fn test_obj_cleanup_orphans() {
2105 let inner = Arc::new(object_store::memory::InMemory::new());
2106 let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10).unwrap();
2107
2108 let state = b"state-with-manifest";
2110 store
2111 .save_with_state(&make_manifest(1, 1), Some(state))
2112 .unwrap();
2113
2114 let rt = tokio::runtime::Builder::new_current_thread()
2116 .enable_all()
2117 .build()
2118 .unwrap();
2119 let orphan_path = object_store::path::Path::from("checkpoints/state-000099.bin");
2120 rt.block_on(async {
2121 inner
2122 .put_opts(
2123 &orphan_path,
2124 PutPayload::from_bytes(bytes::Bytes::from_static(b"orphan")),
2125 PutOptions::default(),
2126 )
2127 .await
2128 .unwrap();
2129 });
2130
2131 let cleaned = store.cleanup_orphans().unwrap();
2132 assert_eq!(cleaned, 1);
2133
2134 let real_state = store.load_state_data(1).unwrap();
2136 assert!(real_state.is_some());
2137 }
2138}