#![allow(clippy::disallowed_types)]
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use object_store::{GetOptions, ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload};
use sha2::{Digest, Sha256};
use tracing::warn;
use crate::checkpoint::checkpoint_manifest::CheckpointManifest;
/// Flushes the data and metadata of the file at `path` to stable storage.
///
/// The file is reopened with write access before calling `sync_all`.
async fn sync_file(path: &Path) -> Result<(), std::io::Error> {
    let mut options = tokio::fs::OpenOptions::new();
    options.write(true);
    let handle = options.open(path).await?;
    handle.sync_all().await
}
/// Fsyncs the directory at `path` so that a preceding rename inside it is durable.
///
/// Only performed on Unix. Elsewhere this is a no-op that always returns `Ok(())`,
/// because the branch below is compiled but never taken.
#[allow(clippy::unnecessary_wraps, clippy::unused_async)]
async fn sync_dir(path: &Path) -> Result<(), std::io::Error> {
    if cfg!(unix) {
        tokio::fs::File::open(path).await?.sync_all().await?;
    }
    Ok(())
}
/// Errors returned by [`CheckpointStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum CheckpointStoreError {
/// An I/O operation failed (the filesystem backend's `tokio::fs` calls surface here).
#[error("checkpoint I/O error: {0}")]
Io(#[from] std::io::Error),
/// JSON (de)serialization of a manifest or pointer failed.
#[error("checkpoint serialization error: {0}")]
Serde(#[from] serde_json::Error),
/// The requested checkpoint does not exist. NOTE(review): not constructed in this
/// section; the stores here report a missing checkpoint as `Ok(None)` instead.
#[error("checkpoint {0} not found")]
NotFound(u64),
/// The backing `object_store` returned an error.
#[error("object store error: {0}")]
ObjectStore(#[from] object_store::Error),
}
/// A single finding produced while validating a checkpoint.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ValidationIssue {
    /// Non-fatal finding; the checkpoint may still be used.
    ManifestWarning(String),
    /// Fatal finding; the checkpoint must not be restored from.
    IntegrityFailure(String),
}

impl ValidationIssue {
    /// Returns `true` only for [`ValidationIssue::IntegrityFailure`].
    #[must_use]
    pub fn is_fatal(&self) -> bool {
        match self {
            Self::IntegrityFailure(_) => true,
            Self::ManifestWarning(_) => false,
        }
    }

    /// The human-readable description carried by either variant.
    #[must_use]
    pub fn message(&self) -> &str {
        let (Self::ManifestWarning(text) | Self::IntegrityFailure(text)) = self;
        text
    }
}

impl std::fmt::Display for ValidationIssue {
    /// Writes just the message; width/alignment flags of the outer format are ignored.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.message())
    }
}
/// Outcome of [`CheckpointStore::validate_checkpoint`] for a single checkpoint.
#[derive(Debug, Clone)]
pub struct ValidationResult {
/// Id of the checkpoint that was validated.
pub checkpoint_id: u64,
/// `true` when none of `issues` is fatal (warnings alone keep a checkpoint valid).
pub valid: bool,
/// Every finding, fatal or not, in the order it was discovered.
pub issues: Vec<ValidationIssue>,
}
/// Summary of a [`CheckpointStore::recover_latest_validated`] scan.
#[derive(Debug, Clone)]
pub struct RecoveryReport {
/// Newest checkpoint that passed validation, or `None` if none did.
pub chosen_id: Option<u64>,
/// `(checkpoint_id, reason)` for each rejected checkpoint; `reason` joins the
/// checkpoint's issue messages with `"; "`.
pub skipped: Vec<(u64, String)>,
/// Number of candidate ids listed at the start of the scan (not necessarily the
/// number actually validated, since the scan stops at the first valid one).
pub examined: usize,
/// Wall-clock time the scan took.
pub elapsed: std::time::Duration,
}
/// Extracts the numeric checkpoint id from an object path such as
/// `.../manifest-000042.json` (with `prefix = "manifest-"`, `suffix = ".json"`).
///
/// The first `/`-separated segment that carries both `prefix` and `suffix` decides the
/// result. If its middle part is not a valid `u64`, a warning is logged and `None` is
/// returned without looking at later segments. Paths with no such segment yield `None`.
fn parse_checkpoint_id_from_path(path: &str, prefix: &str, suffix: &str) -> Option<u64> {
    let digits = path
        .split('/')
        .find_map(|segment| segment.strip_prefix(prefix)?.strip_suffix(suffix))?;
    match digits.parse::<u64>() {
        Ok(id) => Some(id),
        Err(_) => {
            warn!(
                path,
                prefix, suffix, "malformed checkpoint id in object path — skipped"
            );
            None
        }
    }
}
/// Lower-case hex SHA-256 digest of `data`.
fn sha256_hex(data: &[u8]) -> String {
    let digest = Sha256::digest(data);
    format!("{digest:x}")
}
/// Lower-case hex SHA-256 digest of the concatenation of `chunks`.
///
/// Equal to `sha256_hex` over the same bytes laid out contiguously.
fn sha256_hex_chunks(chunks: &[bytes::Bytes]) -> String {
    let digest = chunks
        .iter()
        .fold(Sha256::new(), |hasher, chunk| hasher.chain_update(chunk))
        .finalize();
    format!("{digest:x}")
}
/// Checksum over both inline operator state and sidecar bytes.
///
/// Computes the hex string from `sha256_hex_inline_states(states)`. The result is
/// SHA-256 over that string's ASCII bytes followed by every sidecar chunk in order,
/// rendered as lower-case hex.
fn sha256_hex_mixed<'a, I>(
    states: &std::collections::HashMap<
        String,
        crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
    >,
    sidecar_chunks: I,
) -> String
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let inline_digest = sha256_hex_inline_states(states);
    let digest = sidecar_chunks
        .into_iter()
        .fold(
            Sha256::new_with_prefix(inline_digest.as_bytes()),
            |hasher, chunk| hasher.chain_update(chunk),
        )
        .finalize();
    format!("{digest:x}")
}
/// Deterministic checksum of all non-external (inline) operator states.
///
/// Operators are visited in ascending name order. External operators are skipped. For
/// each inline operator the hash absorbs `name`, a `0` byte, the base64 state text (or
/// nothing when absent), and another `0` byte. Returns lower-case hex.
fn sha256_hex_inline_states(
    states: &std::collections::HashMap<
        String,
        crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
    >,
) -> String {
    let mut inline_ops: Vec<_> = states.iter().filter(|(_, op)| !op.external).collect();
    // HashMap keys are unique, so an unstable sort is still fully deterministic.
    inline_ops.sort_unstable_by_key(|&(name, _)| name);

    let mut hasher = Sha256::new();
    for (name, op) in inline_ops {
        hasher.update(name.as_bytes());
        hasher.update([0u8]);
        if let Some(encoded) = &op.state_b64 {
            hasher.update(encoded.as_bytes());
        }
        hasher.update([0u8]);
    }
    let digest = hasher.finalize();
    format!("{digest:x}")
}
/// Storage backend for checkpoint manifests and their optional `state.bin` sidecar.
///
/// Implementors supply the storage primitives (`save`, `load_*`, `list`, `prune`,
/// `save_state_data`, `load_state_data`). Integrity validation, recovery and checksum
/// stamping are provided as default methods built on top of them.
#[async_trait]
pub trait CheckpointStore: Send + Sync {
    /// Vnode count passed to `CheckpointManifest::validate` when checking manifests.
    fn vnode_count(&self) -> u16 {
        crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT
    }

    /// Persists `manifest`.
    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError>;

    /// Loads the checkpoint the store currently designates as latest, if any.
    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError>;

    /// Loads the manifest of checkpoint `id`; `Ok(None)` when it does not exist.
    async fn load_by_id(&self, id: u64)
        -> Result<Option<CheckpointManifest>, CheckpointStoreError>;

    /// Lists stored checkpoints as `(checkpoint_id, epoch)` pairs.
    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError>;

    /// Lists stored checkpoint ids. The default projects the ids out of [`Self::list`].
    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
        Ok(self.list().await?.iter().map(|(id, _)| *id).collect())
    }

    /// Removes the oldest checkpoints beyond `keep_count`; returns how many were removed.
    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError>;

    /// Rewrites an existing manifest. The default delegates to [`Self::save`].
    async fn update_manifest(
        &self,
        manifest: &CheckpointManifest,
    ) -> Result<(), CheckpointStoreError> {
        self.save(manifest).await
    }

    /// Persists the sidecar for checkpoint `id` as the concatenation of `chunks`.
    async fn save_state_data(
        &self,
        id: u64,
        chunks: &[bytes::Bytes],
    ) -> Result<(), CheckpointStoreError>;

    /// Loads the sidecar for checkpoint `id`; `Ok(None)` when it does not exist.
    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError>;

    /// Checks checkpoint `id` for integrity without restoring it.
    ///
    /// Fatal issues ([`ValidationIssue::IntegrityFailure`]) are:
    /// - a missing or unparseable manifest;
    /// - a sidecar that `state_checksum` requires but that is absent;
    /// - a `state_checksum` mismatch;
    /// - a zero `epoch` or `checkpoint_id`.
    ///
    /// Findings from `CheckpointManifest::validate` are recorded as non-fatal warnings.
    ///
    /// The checksum is recomputed the same way [`Self::save_with_state`] stamps it.
    /// - All operators inline: the inline-only hash, or, when a sidecar was saved
    ///   alongside, the mixed inline + sidecar hash.
    /// - Some inline, some external: the mixed hash.
    /// - No inline operators: the hash of the sidecar bytes.
    ///
    /// # Errors
    /// Storage errors are propagated, except manifest deserialization errors, which
    /// become an `IntegrityFailure` in the result.
    async fn validate_checkpoint(&self, id: u64) -> Result<ValidationResult, CheckpointStoreError> {
        let mut issues = Vec::new();
        let manifest = match self.load_by_id(id).await {
            Ok(Some(m)) => m,
            Ok(None) => {
                return Ok(ValidationResult {
                    checkpoint_id: id,
                    valid: false,
                    issues: vec![ValidationIssue::IntegrityFailure(format!(
                        "manifest not found for checkpoint {id}"
                    ))],
                });
            }
            Err(CheckpointStoreError::Serde(e)) => {
                return Ok(ValidationResult {
                    checkpoint_id: id,
                    valid: false,
                    issues: vec![ValidationIssue::IntegrityFailure(format!(
                        "corrupt manifest: {e}"
                    ))],
                });
            }
            Err(e) => return Err(e),
        };
        for err in manifest.validate(self.vnode_count()) {
            issues.push(ValidationIssue::ManifestWarning(format!(
                "manifest validation: {err}"
            )));
        }
        if let Some(expected) = &manifest.state_checksum {
            let states = &manifest.operator_states;
            let any_inline = states.values().any(|o| !o.external);
            let any_external = states.values().any(|o| o.external);
            let actual = if any_inline && !any_external {
                // All operators are inline. `save_with_state` stamps the inline-only hash
                // when no sidecar was written, but the mixed (inline + sidecar) hash when
                // one was. Try the cheap inline hash first, and consult the sidecar only
                // on a mismatch, so both layouts validate.
                let inline = sha256_hex_inline_states(states);
                if inline == *expected {
                    inline
                } else {
                    match self.load_state_data(id).await? {
                        Some(data) => {
                            sha256_hex_mixed(states, std::iter::once(data.as_slice()))
                        }
                        None => inline,
                    }
                }
            } else {
                // External operators (or no inline ones at all) mean the checksum covers
                // the sidecar, so it must be present.
                match (any_inline, self.load_state_data(id).await?) {
                    (true, Some(data)) => {
                        sha256_hex_mixed(states, std::iter::once(data.as_slice()))
                    }
                    (false, Some(data)) => sha256_hex(&data),
                    (_, None) => {
                        issues.push(ValidationIssue::IntegrityFailure(
                            "state.bin referenced by checksum but not found".into(),
                        ));
                        String::new()
                    }
                }
            };
            if !actual.is_empty() && actual != *expected {
                let label = if any_inline && any_external {
                    "mixed state checksum mismatch"
                } else if any_inline {
                    "inline state checksum mismatch"
                } else {
                    "state.bin checksum mismatch"
                };
                issues.push(ValidationIssue::IntegrityFailure(format!(
                    "{label}: expected {expected}, got {actual}"
                )));
            }
        }
        if manifest.epoch == 0 || manifest.checkpoint_id == 0 {
            issues.push(ValidationIssue::IntegrityFailure(
                "epoch or checkpoint_id is 0 — likely corrupted".into(),
            ));
        }
        let valid = issues.iter().all(|i| !i.is_fatal());
        Ok(ValidationResult {
            checkpoint_id: id,
            valid,
            issues,
        })
    }

    /// Finds the newest checkpoint that passes [`Self::validate_checkpoint`].
    ///
    /// Ids from [`Self::list_ids`] are visited in reverse order. This is newest-first,
    /// assuming `list_ids` returns ascending ids, as both stores in this file do. Each
    /// rejected checkpoint is logged and recorded in `skipped`.
    ///
    /// # Errors
    /// Storage errors from listing or from validating any candidate abort the scan.
    async fn recover_latest_validated(&self) -> Result<RecoveryReport, CheckpointStoreError> {
        let start = std::time::Instant::now();
        let mut skipped = Vec::new();
        let mut ids = self.list_ids().await?;
        ids.reverse();
        let examined = ids.len();
        for id in &ids {
            let result = self.validate_checkpoint(*id).await?;
            if result.valid {
                return Ok(RecoveryReport {
                    chosen_id: Some(*id),
                    skipped,
                    examined,
                    elapsed: start.elapsed(),
                });
            }
            let reason = result
                .issues
                .iter()
                .map(ToString::to_string)
                .collect::<Vec<_>>()
                .join("; ");
            warn!(
                checkpoint_id = id,
                reason = %reason,
                "skipping invalid checkpoint"
            );
            skipped.push((*id, reason));
        }
        Ok(RecoveryReport {
            chosen_id: None,
            skipped,
            examined,
            elapsed: start.elapsed(),
        })
    }

    /// Removes leftovers of interrupted saves and returns how many were removed.
    /// The default does nothing and returns `0`.
    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
        Ok(0)
    }

    /// Saves `manifest` together with an optional sidecar.
    ///
    /// With `state_data`, `state_checksum` is (re)stamped via `stamp_checksum`, and the
    /// sidecar is written before the manifest. A persisted manifest therefore never
    /// references a sidecar that failed to write.
    ///
    /// Without `state_data`, a manifest whose operator states are all inline (and
    /// non-empty) and that carries no checksum yet gets the inline-only checksum.
    async fn save_with_state(
        &self,
        manifest: &CheckpointManifest,
        state_data: Option<&[bytes::Bytes]>,
    ) -> Result<(), CheckpointStoreError> {
        let mut manifest = manifest.clone();
        if let Some(chunks) = state_data {
            manifest.state_checksum = Some(stamp_checksum(&manifest.operator_states, Some(chunks)));
            self.save_state_data(manifest.checkpoint_id, chunks).await?;
        } else if !manifest.operator_states.is_empty()
            && manifest.operator_states.values().all(|o| !o.external)
            && manifest.state_checksum.is_none()
        {
            manifest.state_checksum = Some(sha256_hex_inline_states(&manifest.operator_states));
        }
        self.save(&manifest).await
    }
}
/// Computes the `state_checksum` to store for a manifest saved with optional sidecar data.
///
/// If no operator state is inline (including when `states` is empty), the checksum covers
/// only the sidecar bytes. Otherwise it is the mixed inline + sidecar checksum. A missing
/// sidecar is treated as zero chunks.
fn stamp_checksum(
    states: &std::collections::HashMap<
        String,
        crate::checkpoint::checkpoint_manifest::OperatorCheckpoint,
    >,
    chunks: Option<&[bytes::Bytes]>,
) -> String {
    let sidecar: &[bytes::Bytes] = chunks.unwrap_or(&[]);
    if states.values().all(|op| op.external) {
        sha256_hex_chunks(sidecar)
    } else {
        sha256_hex_mixed(states, sidecar.iter().map(|chunk| &**chunk))
    }
}
/// Checkpoint store rooted at a local directory.
///
/// Layout under `base_dir` (`NNNNNN` = id zero-padded to at least six digits):
/// - `checkpoints/checkpoint_NNNNNN/manifest.json`: the manifest
/// - `checkpoints/checkpoint_NNNNNN/state.bin`: optional state sidecar
/// - `checkpoints/latest.txt`: directory name of the most recently saved checkpoint
pub struct FileSystemCheckpointStore {
    /// Root directory; everything lives under `<base_dir>/checkpoints`.
    base_dir: PathBuf,
    /// Checkpoints kept after each save; `0` disables automatic pruning.
    max_retained: usize,
    /// Vnode count that manifests are validated against.
    vnode_count: u16,
}
impl FileSystemCheckpointStore {
    /// Creates a store under `base_dir` that keeps at most `max_retained` checkpoints
    /// (`0` = unlimited) and uses the default vnode count.
    #[must_use]
    pub fn new(base_dir: impl Into<PathBuf>, max_retained: usize) -> Self {
        let vnode_count = crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT;
        Self {
            base_dir: base_dir.into(),
            max_retained,
            vnode_count,
        }
    }

    /// Overrides the vnode count used for manifest validation.
    #[must_use]
    pub fn with_vnode_count(self, vnode_count: u16) -> Self {
        Self {
            vnode_count,
            ..self
        }
    }

    /// `<base_dir>/checkpoints`
    fn checkpoints_dir(&self) -> PathBuf {
        let mut dir = self.base_dir.clone();
        dir.push("checkpoints");
        dir
    }

    /// `<base_dir>/checkpoints/checkpoint_NNNNNN`
    fn checkpoint_dir(&self, id: u64) -> PathBuf {
        let mut dir = self.checkpoints_dir();
        dir.push(format!("checkpoint_{id:06}"));
        dir
    }

    /// Path of the manifest for checkpoint `id`.
    fn manifest_path(&self, id: u64) -> PathBuf {
        let mut path = self.checkpoint_dir(id);
        path.push("manifest.json");
        path
    }

    /// Path of the state sidecar for checkpoint `id`.
    fn state_path(&self, id: u64) -> PathBuf {
        let mut path = self.checkpoint_dir(id);
        path.push("state.bin");
        path
    }

    /// Path of the `latest.txt` pointer file.
    fn latest_path(&self) -> PathBuf {
        let mut path = self.checkpoints_dir();
        path.push("latest.txt");
        path
    }

    /// Parses `checkpoint_<digits>` into its id; any other name yields `None`.
    fn parse_checkpoint_id(name: &str) -> Option<u64> {
        let digits = name.strip_prefix("checkpoint_")?;
        digits.parse().ok()
    }

    /// Ids of every `checkpoint_*` subdirectory, ascending. This includes directories
    /// that hold no manifest. A missing `checkpoints` directory yields an empty list.
    async fn sorted_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
        let mut entries = match tokio::fs::read_dir(self.checkpoints_dir()).await {
            Ok(listing) => listing,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(err) => return Err(err.into()),
        };
        let mut found = Vec::new();
        while let Some(entry) = entries.next_entry().await? {
            if !entry.file_type().await?.is_dir() {
                continue;
            }
            let name = entry.file_name();
            if let Some(id) = name.to_str().and_then(Self::parse_checkpoint_id) {
                found.push(id);
            }
        }
        found.sort_unstable();
        Ok(found)
    }
}
impl FileSystemCheckpointStore {
    /// Subdirectories of `checkpoints/` that contain `state.bin` but no `manifest.json`,
    /// i.e. a sidecar whose manifest was never written. A missing `checkpoints`
    /// directory yields an empty list.
    async fn find_orphan_dirs(&self) -> Result<Vec<PathBuf>, CheckpointStoreError> {
        let mut entries = match tokio::fs::read_dir(self.checkpoints_dir()).await {
            Ok(listing) => listing,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(err) => return Err(err.into()),
        };
        let mut orphans = Vec::new();
        while let Some(entry) = entries.next_entry().await? {
            if !entry.file_type().await?.is_dir() {
                continue;
            }
            let candidate = entry.path();
            let manifest_present = tokio::fs::metadata(candidate.join("manifest.json"))
                .await
                .is_ok();
            if manifest_present {
                continue;
            }
            let state_present = tokio::fs::metadata(candidate.join("state.bin"))
                .await
                .is_ok();
            if state_present {
                orphans.push(candidate);
            }
        }
        Ok(orphans)
    }
}
// `save_with_state` is intentionally not overridden: the trait default does exactly
// what this backend needs (sidecar first, then manifest).
#[async_trait]
impl CheckpointStore for FileSystemCheckpointStore {
    fn vnode_count(&self) -> u16 {
        self.vnode_count
    }

    /// Durably writes the manifest, then repoints `latest.txt` at it, then prunes.
    ///
    /// Both files use the same crash-safe protocol: write a temp file, fsync it, rename
    /// it over the target, then fsync the parent directory. On failure the temp file is
    /// removed (best effort) and the error is returned. Pruning only runs when
    /// `max_retained > 0`. A prune failure is logged rather than returned, because the
    /// checkpoint itself is already persisted by then.
    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
        let cp_dir = self.checkpoint_dir(manifest.checkpoint_id);
        tokio::fs::create_dir_all(&cp_dir).await?;
        let manifest_path = self.manifest_path(manifest.checkpoint_id);
        let json = serde_json::to_string_pretty(manifest)?;
        let tmp_path = manifest_path.with_extension("json.tmp");
        let write_res = async {
            tokio::fs::write(&tmp_path, &json).await?;
            sync_file(&tmp_path).await?;
            tokio::fs::rename(&tmp_path, &manifest_path).await?;
            sync_dir(&cp_dir).await
        }
        .await;
        if let Err(e) = write_res {
            let _ = tokio::fs::remove_file(&tmp_path).await;
            return Err(e.into());
        }

        let latest = self.latest_path();
        let latest_dir = latest.parent().unwrap_or(Path::new(".")).to_path_buf();
        tokio::fs::create_dir_all(&latest_dir).await?;
        let latest_content = format!("checkpoint_{:06}", manifest.checkpoint_id);
        let tmp_latest = latest.with_extension("txt.tmp");
        // Same protocol as the manifest, including temp-file cleanup, so a failed
        // pointer update does not leave `latest.txt.tmp` behind.
        let latest_res = async {
            tokio::fs::write(&tmp_latest, &latest_content).await?;
            sync_file(&tmp_latest).await?;
            tokio::fs::rename(&tmp_latest, &latest).await?;
            sync_dir(&latest_dir).await
        }
        .await;
        if let Err(e) = latest_res {
            let _ = tokio::fs::remove_file(&tmp_latest).await;
            return Err(e.into());
        }

        if self.max_retained > 0 {
            if let Err(e) = self.prune(self.max_retained).await {
                tracing::warn!(
                    max_retained = self.max_retained,
                    error = %e,
                    "[LDB-6009] Checkpoint prune failed — old checkpoints may accumulate on disk"
                );
            }
        }
        Ok(())
    }

    /// Follows `latest.txt` to its checkpoint.
    ///
    /// Returns `Ok(None)` when the pointer file is missing, empty or unparseable. Also
    /// returns `Ok(None)` when the pointer names a checkpoint whose manifest is gone.
    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
        let latest = self.latest_path();
        let content = match tokio::fs::read_to_string(&latest).await {
            Ok(c) => c,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        let dir_name = content.trim();
        if dir_name.is_empty() {
            return Ok(None);
        }
        match Self::parse_checkpoint_id(dir_name) {
            Some(id) => self.load_by_id(id).await,
            None => Ok(None),
        }
    }

    /// Reads and parses `manifest.json` for `id` (`Ok(None)` if it does not exist).
    ///
    /// Manifest validation findings do not fail the load. Only their count and the first
    /// finding are logged.
    async fn load_by_id(
        &self,
        id: u64,
    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
        let path = self.manifest_path(id);
        let json = match tokio::fs::read_to_string(&path).await {
            Ok(s) => s,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(e) => return Err(e.into()),
        };
        let manifest: CheckpointManifest = serde_json::from_str(&json)?;
        let errors = manifest.validate(self.vnode_count());
        if !errors.is_empty() {
            tracing::warn!(
                checkpoint_id = id,
                error_count = errors.len(),
                first_error = %errors[0],
                "loaded checkpoint manifest has validation warnings"
            );
        }
        Ok(Some(manifest))
    }

    /// Ids of all `checkpoint_*` directories, ascending. This includes directories
    /// without a manifest.
    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
        self.sorted_checkpoint_ids().await
    }

    /// `(checkpoint_id, epoch)` for every directory whose manifest loads successfully.
    /// Missing or unreadable manifests are silently omitted.
    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
        let ids = self.sorted_checkpoint_ids().await?;
        let mut result = Vec::with_capacity(ids.len());
        for id in ids {
            if let Ok(Some(manifest)) = self.load_by_id(id).await {
                result.push((manifest.checkpoint_id, manifest.epoch));
            }
        }
        Ok(result)
    }

    /// Deletes the oldest checkpoint directories so that at most `keep_count` remain.
    ///
    /// Returns the number actually deleted. Deletion failures are not returned, so one
    /// stuck directory cannot block the rest. The first non-`NotFound` failure per call
    /// is logged.
    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
        let ids = self.sorted_checkpoint_ids().await?;
        if ids.len() <= keep_count {
            return Ok(0);
        }
        let to_remove = ids.len() - keep_count;
        let mut removed = 0;
        let mut logged_error = false;
        for &id in &ids[..to_remove] {
            let dir = self.checkpoint_dir(id);
            match tokio::fs::remove_dir_all(&dir).await {
                Ok(()) => removed += 1,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                Err(e) => {
                    // Log once per call: prune runs after every save and a persistently
                    // undeletable directory would otherwise flood the log.
                    if !logged_error {
                        tracing::warn!(
                            checkpoint_id = id,
                            path = %dir.display(),
                            error = %e,
                            "[LDB-6027] checkpoint prune: failed to remove checkpoint \
                             directory — old checkpoints may accumulate on disk"
                        );
                        logged_error = true;
                    }
                }
            }
        }
        Ok(removed)
    }

    /// Durably writes `chunks` back-to-back as `state.bin`.
    ///
    /// The data goes to a temp file, which is fsynced, renamed into place, and followed
    /// by a directory fsync. On failure the temp file is removed (best effort) so no
    /// `state.bin.tmp` is left behind.
    async fn save_state_data(
        &self,
        id: u64,
        chunks: &[bytes::Bytes],
    ) -> Result<(), CheckpointStoreError> {
        use tokio::io::AsyncWriteExt;
        let cp_dir = self.checkpoint_dir(id);
        tokio::fs::create_dir_all(&cp_dir).await?;
        let path = self.state_path(id);
        let tmp = path.with_extension("bin.tmp");
        let write_res = async {
            let mut file = tokio::fs::File::create(&tmp).await?;
            for chunk in chunks {
                file.write_all(chunk).await?;
            }
            file.sync_all().await?;
            // Close the handle before renaming (required on some platforms).
            drop(file);
            tokio::fs::rename(&tmp, &path).await?;
            sync_dir(&cp_dir).await
        }
        .await;
        if let Err(e) = write_res {
            let _ = tokio::fs::remove_file(&tmp).await;
            return Err(e.into());
        }
        Ok(())
    }

    /// Deletes checkpoint directories that hold `state.bin` but no `manifest.json`.
    ///
    /// Returns the number actually deleted; failures are logged and skipped.
    /// NOTE(review): `save_with_state` writes the sidecar before the manifest, so an
    /// in-flight save looks like an orphan here. Presumably this should only run while
    /// no save is in progress; confirm with callers.
    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
        let orphans = self.find_orphan_dirs().await?;
        let mut cleaned = 0;
        for dir in &orphans {
            match tokio::fs::remove_dir_all(dir).await {
                Ok(()) => {
                    tracing::info!(
                        path = %dir.display(),
                        "cleaned up orphaned checkpoint directory"
                    );
                    cleaned += 1;
                }
                Err(e) => {
                    tracing::warn!(
                        path = %dir.display(),
                        error = %e,
                        "failed to remove orphaned checkpoint directory"
                    );
                }
            }
        }
        Ok(cleaned)
    }

    /// Reads `state.bin` for `id`; `Ok(None)` when it does not exist.
    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
        let path = self.state_path(id);
        match tokio::fs::read(&path).await {
            Ok(data) => Ok(Some(data)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e.into()),
        }
    }
}
/// JSON body of the `manifests/latest.json` pointer object used by
/// `ObjectStoreCheckpointStore`.
#[derive(serde::Serialize, serde::Deserialize)]
struct LatestPointer {
/// Id of the checkpoint the pointer designates as latest.
checkpoint_id: u64,
}
pub struct ObjectStoreCheckpointStore {
store: Arc<dyn ObjectStore>,
prefix: String,
max_retained: usize,
vnode_count: u16,
}
impl ObjectStoreCheckpointStore {
#[must_use]
pub fn new(store: Arc<dyn ObjectStore>, prefix: String, max_retained: usize) -> Self {
Self {
store,
prefix,
max_retained,
vnode_count: crate::checkpoint::checkpoint_manifest::DEFAULT_VNODE_COUNT,
}
}
#[must_use]
pub fn with_vnode_count(mut self, vnode_count: u16) -> Self {
self.vnode_count = vnode_count;
self
}
fn manifest_path(&self, id: u64) -> object_store::path::Path {
object_store::path::Path::from(format!("{}manifests/manifest-{id:06}.json", self.prefix))
}
fn latest_pointer_path(&self) -> object_store::path::Path {
object_store::path::Path::from(format!("{}manifests/latest.json", self.prefix))
}
fn state_path(&self, id: u64) -> object_store::path::Path {
object_store::path::Path::from(format!("{}checkpoints/state-{id:06}.bin", self.prefix))
}
async fn put_with_retry(
&self,
path: &object_store::path::Path,
payload: PutPayload,
opts: &PutOptions,
) -> Result<(), CheckpointStoreError> {
const BACKOFFS_MS: &[u64] = &[100, 500, 2000];
let mut attempt = 0usize;
loop {
let result = self
.store
.put_opts(path, payload.clone(), opts.clone())
.await;
match result {
Ok(_) => return Ok(()),
Err(object_store::Error::Generic { .. }) if attempt < BACKOFFS_MS.len() => {
let delay = std::time::Duration::from_millis(BACKOFFS_MS[attempt]);
tracing::warn!(
path = %path,
attempt = attempt + 1,
delay_ms = delay.as_millis(),
"transient put error, retrying"
);
tokio::time::sleep(delay).await;
attempt += 1;
}
Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
}
}
}
async fn get_bytes(
&self,
path: &object_store::path::Path,
) -> Result<Option<bytes::Bytes>, CheckpointStoreError> {
match self.store.get_opts(path, GetOptions::default()).await {
Ok(get_result) => {
let data = get_result.bytes().await?;
Ok(Some(data))
}
Err(object_store::Error::NotFound { .. }) => Ok(None),
Err(e) => Err(CheckpointStoreError::ObjectStore(e)),
}
}
async fn load_manifest_at(
&self,
path: &object_store::path::Path,
) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
match self.get_bytes(path).await? {
Some(data) => {
let manifest: CheckpointManifest = serde_json::from_slice(&data)?;
Ok(Some(manifest))
}
None => Ok(None),
}
}
async fn list_checkpoint_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
use futures::TryStreamExt;
let mut ids = std::collections::BTreeSet::new();
let manifests_prefix = object_store::path::Path::from(format!("{}manifests/", self.prefix));
let entries: Vec<_> = self
.store
.list(Some(&manifests_prefix))
.try_collect()
.await?;
for entry in &entries {
if let Some(id) =
parse_checkpoint_id_from_path(entry.location.as_ref(), "manifest-", ".json")
{
ids.insert(id);
}
}
Ok(ids.into_iter().collect())
}
}
#[async_trait]
impl CheckpointStore for ObjectStoreCheckpointStore {
    fn vnode_count(&self) -> u16 {
        self.vnode_count
    }

    /// Writes the manifest, advances `latest.json`, then prunes.
    ///
    /// The manifest is written create-only. If one already exists for this id it is left
    /// untouched and a warning is logged. Backends that do not implement conditional puts
    /// fall back to an unconditional put.
    ///
    /// The pointer is only advanced when it does not already name a newer checkpoint.
    /// NOTE(review): that read-then-write is not atomic, so it narrows rather than
    /// eliminates races between concurrent writers.
    ///
    /// Pruning only runs when `max_retained > 0`; prune failures are logged, not returned.
    async fn save(&self, manifest: &CheckpointManifest) -> Result<(), CheckpointStoreError> {
        let json = serde_json::to_string_pretty(manifest)?;
        let path = self.manifest_path(manifest.checkpoint_id);
        let json_bytes = bytes::Bytes::from(json);
        let create_opts = PutOptions {
            mode: PutMode::Create,
            ..PutOptions::default()
        };
        let result = self
            .store
            .put_opts(
                &path,
                PutPayload::from_bytes(json_bytes.clone()),
                create_opts,
            )
            .await;
        match result {
            Ok(_) => {}
            Err(object_store::Error::AlreadyExists { .. }) => {
                tracing::warn!(
                    checkpoint_id = manifest.checkpoint_id,
                    "[LDB-6010] Manifest already exists — skipping write"
                );
            }
            Err(object_store::Error::NotImplemented { .. }) => {
                self.store
                    .put_opts(
                        &path,
                        PutPayload::from_bytes(json_bytes),
                        PutOptions::default(),
                    )
                    .await?;
            }
            Err(e) => return Err(CheckpointStoreError::ObjectStore(e)),
        }
        let latest = self.latest_pointer_path();
        if let Some(current) = self.get_bytes(&latest).await? {
            // An unparseable pointer is treated as absent and overwritten below.
            if let Ok(existing) = serde_json::from_slice::<LatestPointer>(&current) {
                if existing.checkpoint_id > manifest.checkpoint_id {
                    tracing::warn!(
                        current = existing.checkpoint_id,
                        ours = manifest.checkpoint_id,
                        "[LDB-6010] latest.json already points at a newer checkpoint — \
                         skipping pointer update (possible split-brain or delayed writer)"
                    );
                    return Ok(());
                }
            }
        }
        let pointer = serde_json::to_string(&LatestPointer {
            checkpoint_id: manifest.checkpoint_id,
        })?;
        self.put_with_retry(
            &latest,
            PutPayload::from_bytes(bytes::Bytes::from(pointer)),
            &PutOptions::default(),
        )
        .await?;
        if self.max_retained > 0 {
            if let Err(e) = self.prune(self.max_retained).await {
                tracing::warn!(
                    max_retained = self.max_retained,
                    error = %e,
                    "[LDB-6009] Object store checkpoint prune failed"
                );
            }
        }
        Ok(())
    }

    /// Unconditionally overwrites the manifest for `manifest.checkpoint_id`.
    ///
    /// Uses the same transient-error retry as the other overwrites in this backend;
    /// retrying is safe because the write is an idempotent full replacement. The
    /// `latest.json` pointer is not touched.
    async fn update_manifest(
        &self,
        manifest: &CheckpointManifest,
    ) -> Result<(), CheckpointStoreError> {
        let json = serde_json::to_string_pretty(manifest)?;
        let path = self.manifest_path(manifest.checkpoint_id);
        let payload = PutPayload::from_bytes(bytes::Bytes::from(json));
        self.put_with_retry(&path, payload, &PutOptions::default())
            .await
    }

    /// Follows `latest.json` to its manifest.
    ///
    /// Returns `Ok(None)` when there is no pointer, or when the pointed-to manifest is
    /// missing. A pointer that fails to parse is a `Serde` error.
    async fn load_latest(&self) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
        if let Some(data) = self.get_bytes(&self.latest_pointer_path()).await? {
            let pointer: LatestPointer = serde_json::from_slice(&data)?;
            return self.load_by_id(pointer.checkpoint_id).await;
        }
        Ok(None)
    }

    /// Fetches and parses the manifest for `id`; `Ok(None)` when it does not exist.
    async fn load_by_id(
        &self,
        id: u64,
    ) -> Result<Option<CheckpointManifest>, CheckpointStoreError> {
        self.load_manifest_at(&self.manifest_path(id)).await
    }

    /// Ids of all stored manifests, ascending.
    async fn list_ids(&self) -> Result<Vec<u64>, CheckpointStoreError> {
        self.list_checkpoint_ids().await
    }

    /// `(checkpoint_id, epoch)` for every manifest that loads successfully.
    /// Unreadable manifests are silently omitted.
    async fn list(&self) -> Result<Vec<(u64, u64)>, CheckpointStoreError> {
        let ids = self.list_checkpoint_ids().await?;
        let mut result = Vec::with_capacity(ids.len());
        for id in ids {
            if let Ok(Some(manifest)) = self.load_by_id(id).await {
                result.push((manifest.checkpoint_id, manifest.epoch));
            }
        }
        Ok(result)
    }

    /// Deletes the manifest and sidecar of the oldest checkpoints beyond `keep_count`.
    ///
    /// A checkpoint counts as removed once its manifest is gone, whether this call
    /// deleted it or it was already absent. Delete failures are not returned; only the
    /// first non-`NotFound` failure per call is logged.
    async fn prune(&self, keep_count: usize) -> Result<usize, CheckpointStoreError> {
        let ids = self.list_checkpoint_ids().await?;
        if ids.len() <= keep_count {
            return Ok(0);
        }
        let to_remove = ids.len() - keep_count;
        let mut removed = 0;
        let mut logged_error = false;
        for &id in &ids[..to_remove] {
            let manifest = self.manifest_path(id);
            let state = self.state_path(id);
            let manifest_res = self.store.delete(&manifest).await;
            let state_res = self.store.delete(&state).await;
            let manifest_ok = matches!(
                manifest_res,
                Ok(()) | Err(object_store::Error::NotFound { .. })
            );
            if manifest_ok {
                removed += 1;
            }
            for err in [manifest_res, state_res]
                .into_iter()
                .filter_map(Result::err)
            {
                if matches!(err, object_store::Error::NotFound { .. }) {
                    continue;
                }
                if !logged_error {
                    tracing::warn!(
                        checkpoint_id = id,
                        error = %err,
                        "[LDB-6027] checkpoint prune: delete failed — \
                         retained objects may accumulate"
                    );
                    logged_error = true;
                }
            }
        }
        Ok(removed)
    }

    /// Uploads the concatenation of `chunks` as the sidecar for `id`, retrying
    /// transient errors.
    async fn save_state_data(
        &self,
        id: u64,
        chunks: &[bytes::Bytes],
    ) -> Result<(), CheckpointStoreError> {
        let path = self.state_path(id);
        let payload: PutPayload = chunks.iter().cloned().collect();
        self.put_with_retry(&path, payload, &PutOptions::default())
            .await
    }

    /// Downloads the sidecar for `id`; `Ok(None)` when it does not exist.
    async fn load_state_data(&self, id: u64) -> Result<Option<Vec<u8>>, CheckpointStoreError> {
        Ok(self
            .get_bytes(&self.state_path(id))
            .await?
            .map(|d| d.to_vec()))
    }

    /// Deletes `state-*.bin` objects whose checkpoint id has no manifest.
    ///
    /// Returns the number of objects actually deleted, matching the filesystem backend.
    /// Failures are logged and not counted.
    /// NOTE(review): `save_with_state` uploads the sidecar before the manifest, so an
    /// in-flight save looks like an orphan here. Presumably this should only run while
    /// no save is in progress; confirm with callers.
    async fn cleanup_orphans(&self) -> Result<usize, CheckpointStoreError> {
        use futures::{StreamExt, TryStreamExt};
        let manifest_ids: std::collections::BTreeSet<u64> =
            self.list_checkpoint_ids().await?.into_iter().collect();
        let state_prefix = object_store::path::Path::from(format!("{}checkpoints/", self.prefix));
        let entries: Vec<_> = self.store.list(Some(&state_prefix)).try_collect().await?;
        let mut orphan_paths = Vec::new();
        for entry in &entries {
            if let Some(id) =
                parse_checkpoint_id_from_path(entry.location.as_ref(), "state-", ".bin")
            {
                if !manifest_ids.contains(&id) {
                    orphan_paths.push(entry.location.clone());
                }
            }
        }
        let mut cleaned = 0;
        if !orphan_paths.is_empty() {
            let stream = futures::stream::iter(orphan_paths.into_iter().map(Ok)).boxed();
            let mut results = self.store.delete_stream(stream);
            while let Some(result) = results.next().await {
                match result {
                    Ok(_) => {
                        cleaned += 1;
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "failed to delete orphan state file");
                    }
                }
            }
        }
        Ok(cleaned)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::checkpoint::checkpoint_manifest::{ConnectorCheckpoint, OperatorCheckpoint};
#[allow(clippy::disallowed_types)] use std::collections::HashMap;
fn make_store(dir: &Path) -> FileSystemCheckpointStore {
FileSystemCheckpointStore::new(dir, 3)
}
fn make_manifest(id: u64, epoch: u64) -> CheckpointManifest {
CheckpointManifest::new(id, epoch)
}
#[tokio::test]
async fn test_save_and_load_latest() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
let m = make_manifest(1, 1);
store.save(&m).await.unwrap();
let loaded = store.load_latest().await.unwrap().unwrap();
assert_eq!(loaded.checkpoint_id, 1);
assert_eq!(loaded.epoch, 1);
}
#[tokio::test]
async fn test_load_latest_returns_none_when_empty() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
assert!(store.load_latest().await.unwrap().is_none());
}
#[tokio::test]
async fn test_load_latest_returns_most_recent() {
let dir = tempfile::tempdir().unwrap();
let store = FileSystemCheckpointStore::new(dir.path(), 10);
for i in 1..=5 {
store.save(&make_manifest(i, i)).await.unwrap();
}
let latest = store.load_latest().await.unwrap().unwrap();
assert_eq!(latest.checkpoint_id, 5);
assert_eq!(latest.epoch, 5);
}
#[tokio::test]
async fn test_load_by_id() {
let dir = tempfile::tempdir().unwrap();
let store = FileSystemCheckpointStore::new(dir.path(), 10);
store.save(&make_manifest(1, 10)).await.unwrap();
store.save(&make_manifest(2, 20)).await.unwrap();
let m = store.load_by_id(1).await.unwrap().unwrap();
assert_eq!(m.epoch, 10);
let m = store.load_by_id(2).await.unwrap().unwrap();
assert_eq!(m.epoch, 20);
assert!(store.load_by_id(99).await.unwrap().is_none());
}
#[tokio::test]
async fn test_list() {
let dir = tempfile::tempdir().unwrap();
let store = FileSystemCheckpointStore::new(dir.path(), 10);
store.save(&make_manifest(1, 10)).await.unwrap();
store.save(&make_manifest(3, 30)).await.unwrap();
store.save(&make_manifest(2, 20)).await.unwrap();
let list = store.list().await.unwrap();
assert_eq!(list, vec![(1, 10), (2, 20), (3, 30)]);
}
#[tokio::test]
async fn test_prune_keeps_max() {
let dir = tempfile::tempdir().unwrap();
let store = FileSystemCheckpointStore::new(dir.path(), 10);
for i in 1..=5 {
store.save(&make_manifest(i, i)).await.unwrap();
}
let removed = store.prune(2).await.unwrap();
assert_eq!(removed, 3);
let list = store.list().await.unwrap();
assert_eq!(list.len(), 2);
assert_eq!(list[0].0, 4);
assert_eq!(list[1].0, 5);
}
#[tokio::test]
async fn test_auto_prune_on_save() {
let dir = tempfile::tempdir().unwrap();
let store = FileSystemCheckpointStore::new(dir.path(), 2);
for i in 1..=5 {
store.save(&make_manifest(i, i)).await.unwrap();
}
let list = store.list().await.unwrap();
assert_eq!(list.len(), 2);
assert_eq!(list[0].0, 4);
assert_eq!(list[1].0, 5);
}
#[tokio::test]
async fn test_save_and_load_state_data() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
store.save(&make_manifest(1, 1)).await.unwrap();
let data = b"large operator state binary blob";
store
.save_state_data(1, &[bytes::Bytes::from_static(data)])
.await
.unwrap();
let loaded = store.load_state_data(1).await.unwrap().unwrap();
assert_eq!(loaded, data);
}
#[tokio::test]
async fn test_load_state_data_returns_none() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
assert!(store.load_state_data(99).await.unwrap().is_none());
}
#[tokio::test]
async fn test_full_manifest_round_trip() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
let mut m = make_manifest(1, 5);
m.source_offsets.insert(
"kafka-src".into(),
ConnectorCheckpoint::with_offsets(
5,
HashMap::from([("0".into(), "1000".into()), ("1".into(), "2000".into())]),
),
);
m.sink_epochs.insert("pg-sink".into(), 4);
m.table_offsets.insert(
"instruments".into(),
ConnectorCheckpoint::with_offsets(5, HashMap::from([("lsn".into(), "0/AB".into())])),
);
m.operator_states
.insert("window".into(), OperatorCheckpoint::inline(b"data"));
m.watermark = Some(999_000);
store.save(&m).await.unwrap();
let loaded = store.load_latest().await.unwrap().unwrap();
assert_eq!(loaded.checkpoint_id, 1);
assert_eq!(loaded.epoch, 5);
assert_eq!(loaded.watermark, Some(999_000));
let src = loaded.source_offsets.get("kafka-src").unwrap();
assert_eq!(src.offsets.get("0"), Some(&"1000".into()));
assert_eq!(loaded.sink_epochs.get("pg-sink"), Some(&4));
let tbl = loaded.table_offsets.get("instruments").unwrap();
assert_eq!(tbl.offsets.get("lsn"), Some(&"0/AB".into()));
let op = loaded.operator_states.get("window").unwrap();
assert_eq!(op.decode_inline().unwrap(), b"data");
}
#[tokio::test]
async fn test_empty_latest_txt() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
let cp_dir = dir.path().join("checkpoints");
std::fs::create_dir_all(&cp_dir).unwrap();
std::fs::write(cp_dir.join("latest.txt"), "").unwrap();
assert!(store.load_latest().await.unwrap().is_none());
}
#[tokio::test]
async fn test_latest_points_to_missing_checkpoint() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
let cp_dir = dir.path().join("checkpoints");
std::fs::create_dir_all(&cp_dir).unwrap();
std::fs::write(cp_dir.join("latest.txt"), "checkpoint_000099").unwrap();
assert!(store.load_latest().await.unwrap().is_none());
}
#[tokio::test]
async fn test_prune_no_op_when_under_limit() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
store.save(&make_manifest(1, 1)).await.unwrap();
let removed = store.prune(5).await.unwrap();
assert_eq!(removed, 0);
}
#[tokio::test]
async fn test_save_with_state_writes_sidecar_before_manifest() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
let m = make_manifest(1, 1);
let state = b"large-operator-state-blob";
store
.save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
.await
.unwrap();
let loaded = store.load_latest().await.unwrap().unwrap();
assert_eq!(loaded.checkpoint_id, 1);
let loaded_state = store.load_state_data(1).await.unwrap().unwrap();
assert_eq!(loaded_state, state);
}
#[tokio::test]
async fn test_save_with_state_none_is_same_as_save() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
let m = make_manifest(1, 1);
store.save_with_state(&m, None).await.unwrap();
let loaded = store.load_latest().await.unwrap().unwrap();
assert_eq!(loaded.checkpoint_id, 1);
assert!(store.load_state_data(1).await.unwrap().is_none());
}
#[tokio::test]
async fn test_orphaned_state_without_manifest_is_ignored() {
let dir = tempfile::tempdir().unwrap();
let store = make_store(dir.path());
store
.save_state_data(1, &[bytes::Bytes::from_static(b"orphaned")])
.await
.unwrap();
assert!(store.load_latest().await.unwrap().is_none());
assert!(store.list().await.unwrap().is_empty());
}
fn make_obj_store() -> ObjectStoreCheckpointStore {
let store = Arc::new(object_store::memory::InMemory::new());
ObjectStoreCheckpointStore::new(store, String::new(), 3)
}
#[tokio::test]
async fn test_obj_save_and_load_latest() {
let store = make_obj_store();
let m = make_manifest(1, 1);
store.save(&m).await.unwrap();
let loaded = store.load_latest().await.unwrap().unwrap();
assert_eq!(loaded.checkpoint_id, 1);
assert_eq!(loaded.epoch, 1);
}
#[tokio::test]
async fn test_obj_load_latest_returns_none_when_empty() {
let store = make_obj_store();
assert!(store.load_latest().await.unwrap().is_none());
}
/// Each saved checkpoint is retrievable by its id; unknown ids yield `None`.
#[tokio::test]
async fn test_obj_load_by_id() {
    let store = ObjectStoreCheckpointStore::new(
        Arc::new(object_store::memory::InMemory::new()),
        String::new(),
        10,
    );
    for (id, epoch) in [(1, 10), (2, 20)] {
        store.save(&make_manifest(id, epoch)).await.unwrap();
    }
    for (id, epoch) in [(1, 10), (2, 20)] {
        let loaded = store.load_by_id(id).await.unwrap().unwrap();
        assert_eq!(loaded.epoch, epoch);
    }
    assert!(store.load_by_id(99).await.unwrap().is_none());
}
/// `list` returns `(id, epoch)` pairs in ascending id order regardless of the
/// order in which checkpoints were saved.
#[tokio::test]
async fn test_obj_list() {
    let backend = Arc::new(object_store::memory::InMemory::new());
    let store = ObjectStoreCheckpointStore::new(backend, String::new(), 10);
    // Deliberately saved out of order.
    for (id, epoch) in [(1, 10), (3, 30), (2, 20)] {
        store.save(&make_manifest(id, epoch)).await.unwrap();
    }
    let listed = store.list().await.unwrap();
    assert_eq!(listed, vec![(1, 10), (2, 20), (3, 30)]);
}
/// Explicit `prune(keep)` removes all but the `keep` newest checkpoints and
/// reports how many it removed.
#[tokio::test]
async fn test_obj_prune() {
    let store = ObjectStoreCheckpointStore::new(
        Arc::new(object_store::memory::InMemory::new()),
        String::new(),
        10,
    );
    for id in 1..=5 {
        store.save(&make_manifest(id, id)).await.unwrap();
    }
    // Keeping two of five means ids 1..=3 are dropped.
    assert_eq!(store.prune(2).await.unwrap(), 3);
    let kept: Vec<_> = store
        .list()
        .await
        .unwrap()
        .into_iter()
        .map(|(id, _)| id)
        .collect();
    assert_eq!(kept, vec![4, 5]);
}
/// With a retention limit of 2, saving five checkpoints leaves only the two
/// newest, with no explicit `prune` call.
#[tokio::test]
async fn test_obj_auto_prune_on_save() {
    let retain = 2;
    let store = ObjectStoreCheckpointStore::new(
        Arc::new(object_store::memory::InMemory::new()),
        String::new(),
        retain,
    );
    for id in 1..=5 {
        store.save(&make_manifest(id, id)).await.unwrap();
    }
    let kept: Vec<_> = store
        .list()
        .await
        .unwrap()
        .into_iter()
        .map(|(id, _)| id)
        .collect();
    assert_eq!(kept, vec![4, 5]);
}
/// A state blob saved for an existing checkpoint reads back byte-for-byte.
#[tokio::test]
async fn test_obj_save_and_load_state_data() {
    let store = make_obj_store();
    store.save(&make_manifest(1, 1)).await.unwrap();
    let payload = b"large operator state binary blob";
    let chunks = [bytes::Bytes::from_static(payload)];
    store.save_state_data(1, &chunks).await.unwrap();
    let roundtrip = store.load_state_data(1).await.unwrap().unwrap();
    assert_eq!(roundtrip, payload);
}
/// Loading state for a checkpoint that was never written yields `None`.
#[tokio::test]
async fn test_obj_load_state_data_returns_none() {
    let store = make_obj_store();
    let missing = store.load_state_data(99).await.unwrap();
    assert!(missing.is_none());
}
#[tokio::test]
async fn test_obj_with_prefix() {
let inner = Arc::new(object_store::memory::InMemory::new());
let store = ObjectStoreCheckpointStore::new(inner, "nodes/abc123/".to_string(), 10);
store.save(&make_manifest(1, 42)).await.unwrap();
let loaded = store.load_latest().await.unwrap().unwrap();
assert_eq!(loaded.checkpoint_id, 1);
assert_eq!(loaded.epoch, 42);
}
/// With an empty prefix, saving checkpoint 1 produces both
/// `manifests/manifest-000001.json` and `manifests/latest.json` in the backend.
#[tokio::test]
async fn test_obj_layout_paths() {
    let inner = Arc::new(object_store::memory::InMemory::new());
    let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
    store.save(&make_manifest(1, 10)).await.unwrap();
    let expected = [
        ("manifests/manifest-000001.json", "manifest path should exist"),
        ("manifests/latest.json", "latest.json should exist"),
    ];
    for (key, why) in expected {
        let fetched = inner
            .get_opts(&object_store::path::Path::from(key), GetOptions::default())
            .await;
        assert!(fetched.is_ok(), "{}", why);
    }
}
/// Saving the very same manifest twice must not fail. Per the test name, the
/// second write hits an existing object. Checkpoint 1 stays loadable with its
/// original epoch.
#[tokio::test]
async fn test_obj_conditional_put_idempotent() {
    let store = ObjectStoreCheckpointStore::new(
        Arc::new(object_store::memory::InMemory::new()),
        String::new(),
        10,
    );
    let manifest = make_manifest(1, 10);
    for _ in 0..2 {
        store.save(&manifest).await.unwrap();
    }
    let latest = store.load_latest().await.unwrap().unwrap();
    assert_eq!((latest.checkpoint_id, latest.epoch), (1, 10));
}
/// `update_manifest` replaces an already-saved manifest in place: a sink
/// status saved as `Pending` reads back as `Committed` after the update.
#[tokio::test]
async fn test_obj_update_manifest_overwrites() {
    use crate::checkpoint::checkpoint_manifest::SinkCommitStatus;
    let store = make_obj_store();
    let sink = "pg-sink";
    let mut manifest = make_manifest(1, 10);

    manifest
        .sink_commit_statuses
        .insert(sink.into(), SinkCommitStatus::Pending);
    store.save(&manifest).await.unwrap();
    let before = store.load_by_id(1).await.unwrap().unwrap();
    assert_eq!(
        before.sink_commit_statuses.get(sink),
        Some(&SinkCommitStatus::Pending)
    );

    manifest
        .sink_commit_statuses
        .insert(sink.into(), SinkCommitStatus::Committed);
    store.update_manifest(&manifest).await.unwrap();
    let after = store.load_by_id(1).await.unwrap().unwrap();
    assert_eq!(
        after.sink_commit_statuses.get(sink),
        Some(&SinkCommitStatus::Committed)
    );
}
/// `save` must keep its create-only (conditional put) semantics even though a
/// separate overwriting `update_manifest` exists. Re-saving checkpoint 1 with
/// different contents must NOT clobber the stored manifest; `update_manifest`
/// with the same contents must.
#[tokio::test]
async fn test_obj_save_still_uses_conditional_put() {
    let store = make_obj_store();
    let m = make_manifest(1, 10);
    store.save(&m).await.unwrap();
    store.save(&m).await.unwrap();

    let mut m2 = make_manifest(1, 10);
    m2.watermark = Some(42);
    // Whether the conflicting create surfaces as an error or is tolerated is
    // not what this test pins down; only the absence of an overwrite is.
    let _ = store.save(&m2).await;
    let loaded = store.load_by_id(1).await.unwrap().unwrap();
    assert_eq!(
        loaded.watermark, m.watermark,
        "save must not overwrite an existing manifest"
    );

    store.update_manifest(&m2).await.unwrap();
    let loaded = store.load_by_id(1).await.unwrap().unwrap();
    assert_eq!(loaded.watermark, Some(42));
}
/// Filesystem counterpart of the object-store overwrite test: after
/// `update_manifest`, the sink status reads back as `Committed`.
#[tokio::test]
async fn test_fs_update_manifest_overwrites() {
    use crate::checkpoint::checkpoint_manifest::SinkCommitStatus;
    let tmp = tempfile::tempdir().unwrap();
    let store = make_store(tmp.path());
    let sink = "sink-a";
    let mut manifest = make_manifest(1, 10);
    manifest
        .sink_commit_statuses
        .insert(sink.into(), SinkCommitStatus::Pending);
    store.save(&manifest).await.unwrap();
    manifest
        .sink_commit_statuses
        .insert(sink.into(), SinkCommitStatus::Committed);
    store.update_manifest(&manifest).await.unwrap();
    let reloaded = store.load_by_id(1).await.unwrap().unwrap();
    assert_eq!(
        reloaded.sink_commit_statuses.get(sink),
        Some(&SinkCommitStatus::Committed)
    );
}
/// With an empty prefix, state for checkpoint 1 is stored at
/// `checkpoints/state-000001.bin`.
#[tokio::test]
async fn test_obj_state_paths() {
    let inner = Arc::new(object_store::memory::InMemory::new());
    let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
    store.save(&make_manifest(1, 1)).await.unwrap();
    let blob = bytes::Bytes::from_static(b"state-blob");
    store.save_state_data(1, &[blob]).await.unwrap();
    let location = object_store::path::Path::from("checkpoints/state-000001.bin");
    let fetched = inner.get_opts(&location, GetOptions::default()).await;
    assert!(fetched.is_ok(), "state path should exist");
}
/// `manifests/latest.json` deserializes as a `LatestPointer` naming the most
/// recently saved checkpoint id.
#[tokio::test]
async fn test_obj_latest_json_format() {
    let inner = Arc::new(object_store::memory::InMemory::new());
    let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
    store.save(&make_manifest(5, 50)).await.unwrap();
    let location = object_store::path::Path::from("manifests/latest.json");
    let response = inner
        .get_opts(&location, GetOptions::default())
        .await
        .unwrap();
    let raw = response.bytes().await.unwrap();
    let pointer = serde_json::from_slice::<super::LatestPointer>(&raw).unwrap();
    assert_eq!(pointer.checkpoint_id, 5);
}
/// Saving an older checkpoint after a newer one must not move the latest
/// pointer backwards.
#[tokio::test]
async fn test_obj_latest_monotonic_guard_skips_regression() {
    let store = ObjectStoreCheckpointStore::new(
        Arc::new(object_store::memory::InMemory::new()),
        String::new(),
        10,
    );
    // Newer id first, then an older one.
    for id in [10, 5] {
        store.save(&make_manifest(id, id)).await.unwrap();
    }
    let latest = store.load_latest().await.unwrap().unwrap();
    assert_eq!(
        latest.checkpoint_id, 10,
        "latest pointer should not regress to an older id"
    );
}
/// A freshly saved, well-formed checkpoint validates with zero issues.
#[tokio::test]
async fn test_validate_checkpoint_valid() {
    let tmp = tempfile::tempdir().unwrap();
    let store = make_store(tmp.path());
    store.save(&make_manifest(1, 1)).await.unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(report.valid, "valid checkpoint: {:?}", report.issues);
    assert!(report.issues.is_empty());
}
/// A manifest with epoch 0 fails validation, and at least one issue message
/// mentions the epoch.
#[tokio::test]
async fn test_validate_checkpoint_epoch_zero_invalid() {
    let tmp = tempfile::tempdir().unwrap();
    let store = make_store(tmp.path());
    store.save(&make_manifest(1, 0)).await.unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(!report.valid, "epoch=0 should be invalid");
    let mentions_epoch = report
        .issues
        .iter()
        .any(|issue| issue.message().contains("epoch"));
    assert!(mentions_epoch, "should mention epoch: {:?}", report.issues);
}
/// Validating a checkpoint id that does not exist returns an invalid result
/// (not an `Err`), and the first issue says "not found".
#[tokio::test]
async fn test_validate_checkpoint_missing_manifest() {
    let tmp = tempfile::tempdir().unwrap();
    let store = make_store(tmp.path());
    let report = store.validate_checkpoint(99).await.unwrap();
    assert!(!report.valid);
    let first = &report.issues[0];
    assert!(first.message().contains("not found"));
}
/// An unparseable `manifest.json` is reported as a "corrupt manifest" issue
/// rather than surfacing as an `Err`.
#[tokio::test]
async fn test_validate_checkpoint_corrupt_manifest() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(tmp.path(), 10);
    // Hand-craft checkpoint 1 with garbage in place of JSON.
    let checkpoint_dir = tmp.path().join("checkpoints/checkpoint_000001");
    std::fs::create_dir_all(&checkpoint_dir).unwrap();
    std::fs::write(checkpoint_dir.join("manifest.json"), "not valid json").unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(!report.valid);
    let first = &report.issues[0];
    assert!(
        first.message().contains("corrupt manifest"),
        "expected corrupt manifest issue: {:?}",
        report.issues
    );
}
/// State written through `save_with_state` passes checksum validation.
#[tokio::test]
async fn test_validate_checkpoint_state_checksum_ok() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(tmp.path(), 10);
    let blob = bytes::Bytes::from_static(b"important operator state");
    store
        .save_with_state(&make_manifest(1, 1), Some(&[blob]))
        .await
        .unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(report.valid, "checksum should match: {:?}", report.issues);
}
/// Tampering with `state.bin` after save makes validation fail with a
/// "checksum mismatch" issue.
#[tokio::test]
async fn test_validate_checkpoint_state_checksum_mismatch() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(tmp.path(), 10);
    let blob = bytes::Bytes::from_static(b"original state");
    store
        .save_with_state(&make_manifest(1, 1), Some(&[blob]))
        .await
        .unwrap();
    // Overwrite the blob so it no longer matches the recorded checksum.
    let state_file = tmp.path().join("checkpoints/checkpoint_000001/state.bin");
    std::fs::write(&state_file, b"corrupted data!!").unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(!report.valid, "corrupted state should be invalid");
    let flagged = report
        .issues
        .iter()
        .any(|issue| issue.message().contains("checksum mismatch"));
    assert!(flagged, "should report checksum mismatch: {:?}", report.issues);
}
/// If a checkpoint was saved with state but `state.bin` later disappears,
/// validation fails with a "not found" issue.
#[tokio::test]
async fn test_validate_checkpoint_state_missing_when_expected() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(tmp.path(), 10);
    let blob = bytes::Bytes::from_static(b"state");
    store
        .save_with_state(&make_manifest(1, 1), Some(&[blob]))
        .await
        .unwrap();
    let state_file = tmp.path().join("checkpoints/checkpoint_000001/state.bin");
    std::fs::remove_file(&state_file).unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(!report.valid);
    let flagged = report
        .issues
        .iter()
        .any(|issue| issue.message().contains("not found"));
    assert!(flagged, "should report missing state: {:?}", report.issues);
}
/// Recovery walks back past a corrupt newest checkpoint to the newest valid
/// one. It records the corrupt id as skipped and counts both as examined.
#[tokio::test]
async fn test_recover_latest_validated_skips_corrupt() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(tmp.path(), 10);
    for (id, epoch) in [(1, 10), (2, 20)] {
        store.save(&make_manifest(id, epoch)).await.unwrap();
    }
    // Corrupt checkpoint 2 so recovery must fall back to checkpoint 1.
    let newest = tmp
        .path()
        .join("checkpoints/checkpoint_000002/manifest.json");
    std::fs::write(newest, "<<<corrupt>>>").unwrap();
    let report = store.recover_latest_validated().await.unwrap();
    assert_eq!(report.chosen_id, Some(1));
    let skipped_ids: Vec<_> = report.skipped.iter().map(|(id, _)| *id).collect();
    assert_eq!(skipped_ids, vec![2]);
    assert_eq!(report.examined, 2);
}
/// An empty store recovers to "no checkpoint" having examined nothing.
#[tokio::test]
async fn test_recover_latest_validated_fresh_start() {
    let tmp = tempfile::tempdir().unwrap();
    let report = make_store(tmp.path())
        .recover_latest_validated()
        .await
        .unwrap();
    assert!(report.chosen_id.is_none());
    assert_eq!(report.examined, 0);
}
/// When every checkpoint on disk is corrupt, recovery falls back to a fresh
/// start (no chosen id) instead of erroring.
///
/// The corrupt checkpoint must also be reported as examined and skipped.
/// Without that check, a store that never noticed the checkpoint at all
/// would pass this test too.
#[tokio::test]
async fn test_recover_latest_validated_all_corrupt_is_fresh_start() {
    let dir = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(dir.path(), 10);
    store.save(&make_manifest(1, 1)).await.unwrap();
    let cp_manifest = dir
        .path()
        .join("checkpoints/checkpoint_000001/manifest.json");
    std::fs::write(cp_manifest, "corrupt").unwrap();
    let report = store.recover_latest_validated().await.unwrap();
    assert!(report.chosen_id.is_none());
    assert_eq!(report.examined, 1);
    assert_eq!(report.skipped.len(), 1);
    assert_eq!(report.skipped[0].0, 1);
}
/// A checkpoint directory that holds `state.bin` but no manifest is an
/// orphan. `cleanup_orphans` deletes it and leaves manifested checkpoints
/// untouched.
#[tokio::test]
async fn test_cleanup_orphans_removes_stateless_dirs() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(tmp.path(), 10);
    let orphan = tmp.path().join("checkpoints/checkpoint_000099");
    std::fs::create_dir_all(&orphan).unwrap();
    std::fs::write(orphan.join("state.bin"), b"orphaned").unwrap();
    store.save(&make_manifest(1, 1)).await.unwrap();
    assert_eq!(store.cleanup_orphans().await.unwrap(), 1);
    assert!(!orphan.exists());
    assert!(store.load_by_id(1).await.unwrap().is_some());
}
/// With no orphans present, `cleanup_orphans` removes nothing.
#[tokio::test]
async fn test_cleanup_orphans_noop_when_clean() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(tmp.path(), 10);
    store.save(&make_manifest(1, 1)).await.unwrap();
    let removed = store.cleanup_orphans().await.unwrap();
    assert_eq!(removed, 0);
}
/// `save_with_state` records the SHA-256 (hex) of the state payload in the
/// persisted manifest's `state_checksum`.
#[tokio::test]
async fn test_save_with_state_writes_checksum() {
    let dir = tempfile::tempdir().unwrap();
    let store = FileSystemCheckpointStore::new(dir.path(), 10);
    let state = b"state-data-for-checksum";
    let m = make_manifest(1, 1);
    store
        .save_with_state(&m, Some(&[bytes::Bytes::from_static(state)]))
        .await
        .unwrap();
    let loaded = store.load_latest().await.unwrap().unwrap();
    // Destructure once instead of `is_some()` followed by `unwrap()`.
    let Some(checksum) = loaded.state_checksum else {
        panic!("state_checksum should be set");
    };
    assert_eq!(checksum, sha256_hex(state));
}
/// A manifest JSON that lacks `state_checksum` (the older format, per the
/// test name) still deserializes, and the field comes back as `None`.
///
/// This is pure synchronous serde work with no `.await`. A plain `#[test]`
/// therefore avoids building a Tokio runtime for nothing.
#[test]
fn test_state_checksum_backward_compat() {
    let json = r#"{
"version": 1,
"checkpoint_id": 1,
"epoch": 1,
"timestamp_ms": 1000
}"#;
    let m: CheckpointManifest = serde_json::from_str(json).unwrap();
    assert!(m.state_checksum.is_none());
}
/// A freshly saved checkpoint in the object store validates cleanly.
#[tokio::test]
async fn test_obj_validate_checkpoint_valid() {
    let store = make_obj_store();
    store.save(&make_manifest(1, 1)).await.unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(report.valid, "valid checkpoint: {:?}", report.issues);
}
/// Validating an absent checkpoint in the object store yields an invalid
/// result rather than an `Err`.
#[tokio::test]
async fn test_obj_validate_checkpoint_missing() {
    let report = make_obj_store().validate_checkpoint(99).await.unwrap();
    assert!(!report.valid);
}
/// State saved via `save_with_state` in the object store passes checksum
/// validation.
#[tokio::test]
async fn test_obj_validate_state_checksum() {
    let backend = Arc::new(object_store::memory::InMemory::new());
    let store = ObjectStoreCheckpointStore::new(backend, String::new(), 10);
    let blob = bytes::Bytes::from_static(b"obj-store-state-data");
    store
        .save_with_state(&make_manifest(1, 1), Some(&[blob]))
        .await
        .unwrap();
    let report = store.validate_checkpoint(1).await.unwrap();
    assert!(report.valid, "checksum should match: {:?}", report.issues);
}
/// With only healthy checkpoints, recovery picks the newest and skips none.
#[tokio::test]
async fn test_obj_recover_latest_validated() {
    let backend = Arc::new(object_store::memory::InMemory::new());
    let store = ObjectStoreCheckpointStore::new(backend, String::new(), 10);
    for (id, epoch) in [(1, 10), (2, 20)] {
        store.save(&make_manifest(id, epoch)).await.unwrap();
    }
    let report = store.recover_latest_validated().await.unwrap();
    assert_eq!(report.chosen_id, Some(2));
    assert!(report.skipped.is_empty());
}
#[tokio::test]
async fn test_obj_cleanup_orphans() {
let inner = Arc::new(object_store::memory::InMemory::new());
let store = ObjectStoreCheckpointStore::new(inner.clone(), String::new(), 10);
let state = b"state-with-manifest";
store
.save_with_state(
&make_manifest(1, 1),
Some(&[bytes::Bytes::from_static(state)]),
)
.await
.unwrap();
let orphan_path = object_store::path::Path::from("checkpoints/state-000099.bin");
inner
.put_opts(
&orphan_path,
PutPayload::from_bytes(bytes::Bytes::from_static(b"orphan")),
PutOptions::default(),
)
.await
.unwrap();
let cleaned = store.cleanup_orphans().await.unwrap();
assert_eq!(cleaned, 1);
let real_state = store.load_state_data(1).await.unwrap();
assert!(real_state.is_some());
}
}