#![allow(clippy::disallowed_types)]
use std::collections::HashMap;
use laminar_storage::checkpoint_manifest::{CheckpointManifest, SinkCommitStatus};
use laminar_storage::checkpoint_store::CheckpointStore;
use laminar_storage::ValidationResult;
use tracing::{debug, error, info, warn};
use crate::checkpoint_coordinator::{
connector_to_source_checkpoint, RegisteredSink, RegisteredSource,
};
use crate::error::DbError;
/// Outcome of a successful checkpoint recovery pass.
///
/// Bundles the manifest that was restored from with per-component restore
/// counters and any non-fatal errors collected along the way (in lenient
/// mode these are recorded instead of aborting recovery).
#[derive(Debug)]
pub struct RecoveredState {
    // The checkpoint manifest the system was restored from.
    pub manifest: CheckpointManifest,
    // Number of replayable sources whose offsets were restored.
    pub sources_restored: usize,
    // Number of table sources whose offsets were restored.
    pub tables_restored: usize,
    // Number of exactly-once sinks whose uncommitted epoch was rolled back.
    pub sinks_rolled_back: usize,
    // Restore errors keyed by source name (also carries the synthetic
    // "__sidecar__" entry when sidecar state resolution failed).
    pub source_errors: HashMap<String, String>,
    // Restore errors keyed by sink name.
    pub sink_errors: HashMap<String, String>,
}
impl RecoveredState {
    /// Epoch of the checkpoint that was recovered.
    #[must_use]
    pub fn epoch(&self) -> u64 {
        self.manifest.epoch
    }

    /// Event-time watermark captured by the checkpoint, if any.
    #[must_use]
    pub fn watermark(&self) -> Option<i64> {
        self.manifest.watermark
    }

    /// Whether any source or sink reported a restore error.
    #[must_use]
    pub fn has_errors(&self) -> bool {
        !(self.source_errors.is_empty() && self.sink_errors.is_empty())
    }

    /// Per-operator checkpointed states, keyed by operator name.
    #[must_use]
    pub fn operator_states(
        &self,
    ) -> &HashMap<String, laminar_storage::checkpoint_manifest::OperatorCheckpoint> {
        &self.manifest.operator_states
    }

    /// Global WAL position recorded at checkpoint time.
    #[must_use]
    pub fn wal_position(&self) -> u64 {
        self.manifest.wal_position
    }

    /// WAL positions recorded per core at checkpoint time.
    #[must_use]
    pub fn per_core_wal_positions(&self) -> &[u64] {
        &self.manifest.per_core_wal_positions
    }

    /// Path of the table-store snapshot, when one was taken.
    #[must_use]
    pub fn table_store_checkpoint_path(&self) -> Option<&str> {
        self.manifest.table_store_checkpoint_path.as_deref()
    }

    /// Whether the (unaligned) checkpoint captured any in-flight records.
    #[must_use]
    pub fn has_inflight_data(&self) -> bool {
        !self.manifest.inflight_data.is_empty()
    }

    /// In-flight records captured per operator, for replay after restore.
    #[must_use]
    pub fn inflight_data(
        &self,
    ) -> &HashMap<String, Vec<laminar_storage::checkpoint_manifest::InFlightRecord>> {
        &self.manifest.inflight_data
    }
}
/// Drives checkpoint recovery against a [`CheckpointStore`].
///
/// In strict mode (the default) a checkpoint whose restore produced any
/// source/sink error is rejected and an older checkpoint is tried; in
/// lenient mode the errors are recorded in the result instead.
pub struct RecoveryManager<'a> {
    // Store the manifests and sidecar state are loaded from.
    store: &'a dyn CheckpointStore,
    // When true, any restore error rejects the checkpoint (see `check_strict`).
    strict: bool,
}
impl<'a> RecoveryManager<'a> {
/// Creates a strict recovery manager: any restore error causes the
/// checkpoint to be rejected in favor of an older one.
#[must_use]
pub fn new(store: &'a dyn CheckpointStore) -> Self {
    Self { store, strict: true }
}

/// Creates a lenient recovery manager: restore errors are recorded in the
/// [`RecoveredState`] but do not reject the checkpoint.
#[must_use]
pub fn lenient(store: &'a dyn CheckpointStore) -> Self {
    Self { store, strict: false }
}
/// Attempts to recover the pipeline from the most recent usable checkpoint.
///
/// Strategy:
/// 1. Try the latest checkpoint; reject it if it is corrupt, if any sink
///    commit is still `Pending` (its source offsets may be ahead of
///    uncommitted output), or if a strict-mode restore reports errors.
/// 2. Otherwise walk older checkpoints newest-first, applying the same
///    checks and skipping failures.
/// 3. Return `Ok(None)` (start fresh) when no checkpoint qualifies.
///
/// # Errors
/// Fails only when listing checkpoints for the fallback scan fails;
/// per-checkpoint load/validation errors are logged and skipped.
pub(crate) async fn recover(
    &self,
    sources: &[RegisteredSource],
    sinks: &[RegisteredSink],
    table_sources: &[RegisteredSource],
) -> Result<Option<RecoveredState>, DbError> {
    match self.store.load_latest() {
        Ok(Some(manifest)) => {
            if self.is_checkpoint_corrupt(&manifest) {
                warn!(
                    checkpoint_id = manifest.checkpoint_id,
                    "[LDB-6010] latest checkpoint corrupt, trying fallback"
                );
            } else if Self::has_pending_sinks(&manifest) {
                warn!(
                    checkpoint_id = manifest.checkpoint_id,
                    epoch = manifest.epoch,
                    "[LDB-6015] checkpoint has uncommitted sinks — source offsets \
                    may be past uncommitted data, falling back to previous checkpoint"
                );
            } else {
                let state = self
                    .restore_from(manifest, sources, sinks, table_sources)
                    .await;
                // Strict mode: any restore error disqualifies this checkpoint.
                if let Err(e) = self.check_strict(&state) {
                    warn!(
                        checkpoint_id = state.manifest.checkpoint_id,
                        error = %e,
                        "latest checkpoint restore had strict errors, trying fallback"
                    );
                } else {
                    return Ok(Some(state));
                }
            }
        }
        Ok(None) => {
            // No checkpoint has ever been written: clean start.
            info!("no checkpoint found, starting fresh");
            return Ok(None);
        }
        Err(e) => {
            warn!(error = %e, "latest checkpoint load failed, trying fallback");
        }
    }
    // Fallback path: scan every known checkpoint, newest to oldest.
    let checkpoints = self.store.list().map_err(|e| {
        DbError::Checkpoint(format!("failed to list checkpoints for fallback: {e}"))
    })?;
    if checkpoints.is_empty() {
        warn!("no checkpoints available for fallback, starting fresh");
        return Ok(None);
    }
    for &(checkpoint_id, _epoch) in checkpoints.iter().rev() {
        match self.store.load_by_id(checkpoint_id) {
            Ok(Some(manifest)) => {
                if self.is_checkpoint_corrupt(&manifest) {
                    warn!(
                        checkpoint_id,
                        "[LDB-6010] fallback checkpoint corrupt, skipping"
                    );
                    continue;
                }
                if Self::has_pending_sinks(&manifest) {
                    warn!(
                        checkpoint_id,
                        "[LDB-6015] fallback checkpoint has uncommitted sinks, skipping"
                    );
                    continue;
                }
                info!(checkpoint_id, "recovering from fallback checkpoint");
                let state = self
                    .restore_from(manifest, sources, sinks, table_sources)
                    .await;
                if let Err(e) = self.check_strict(&state) {
                    warn!(
                        checkpoint_id,
                        error = %e,
                        "fallback checkpoint restore had strict errors, trying next"
                    );
                    continue;
                }
                return Ok(Some(state));
            }
            Ok(None) => {
                debug!(checkpoint_id, "fallback checkpoint not found, skipping");
            }
            Err(e) => {
                warn!(
                    checkpoint_id,
                    error = %e,
                    "fallback checkpoint load failed, trying next"
                );
            }
        }
    }
    // Every candidate was corrupt, pending, or unreadable.
    warn!("all checkpoints failed to load, starting fresh");
    Ok(None)
}
fn resolve_external_states(&self, manifest: &mut CheckpointManifest) -> bool {
let external_ops: Vec<String> = manifest
.operator_states
.iter()
.filter(|(_, op)| op.external)
.map(|(name, _)| name.clone())
.collect();
if external_ops.is_empty() {
return true;
}
let state_data = match self.store.load_state_data(manifest.checkpoint_id) {
Ok(Some(data)) => data,
Ok(None) => {
error!(
checkpoint_id = manifest.checkpoint_id,
operators = ?external_ops,
"[LDB-6010] sidecar state.bin missing — external operator states \
cannot be resolved; operators will start with empty state"
);
for name in &external_ops {
if let Some(op) = manifest.operator_states.get_mut(name) {
*op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
}
}
return false;
}
Err(e) => {
error!(
checkpoint_id = manifest.checkpoint_id,
error = %e,
operators = ?external_ops,
"[LDB-6010] failed to load sidecar state.bin — external operator states \
cannot be resolved; operators will start with empty state"
);
for name in &external_ops {
if let Some(op) = manifest.operator_states.get_mut(name) {
*op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
}
}
return false;
}
};
let mut all_resolved = true;
for (name, op) in &mut manifest.operator_states {
if op.external {
#[allow(clippy::cast_possible_truncation)] let start = op.external_offset as usize;
#[allow(clippy::cast_possible_truncation)]
let end = start + op.external_length as usize;
if end <= state_data.len() {
let external_offset = op.external_offset;
let external_length = op.external_length;
let data = &state_data[start..end];
*op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(data);
debug!(
operator = %name,
offset = external_offset,
length = external_length,
"resolved external operator state from sidecar"
);
} else {
error!(
operator = %name,
offset = start,
length = op.external_length,
sidecar_len = state_data.len(),
"[LDB-6010] sidecar too small for external operator state — \
operator will start with empty state"
);
*op = laminar_storage::checkpoint_manifest::OperatorCheckpoint::inline(&[]);
all_resolved = false;
}
}
}
all_resolved
}
/// Restores runtime components from a single manifest.
///
/// Steps, in order: resolve sidecar state, validate the manifest (warnings
/// only), diff registered sources/sinks against the checkpointed names,
/// restore replayable source and table-source offsets, and roll back
/// uncommitted epochs on exactly-once sinks. Restore errors are collected
/// into the returned [`RecoveredState`] instead of aborting — the caller
/// decides (via `check_strict`) whether they are fatal.
#[allow(clippy::too_many_lines)]
async fn restore_from(
    &self,
    mut manifest: CheckpointManifest,
    sources: &[RegisteredSource],
    sinks: &[RegisteredSink],
    table_sources: &[RegisteredSource],
) -> RecoveredState {
    // Pull spilled operator state back inline; failure is recorded below
    // and, in strict mode, causes the caller to reject this checkpoint.
    let sidecar_ok = self.resolve_external_states(&mut manifest);
    if !sidecar_ok && self.strict {
        warn!(
            checkpoint_id = manifest.checkpoint_id,
            "[LDB-6010] sidecar resolution failed in strict mode — \
            checkpoint will be rejected"
        );
    }
    // Manifest validation issues are logged but never fatal here.
    let validation_errors = manifest.validate();
    if !validation_errors.is_empty() {
        for err in &validation_errors {
            warn!(
                checkpoint_id = manifest.checkpoint_id,
                error = %err,
                "manifest validation warning"
            );
        }
    }
    // Warn about topology drift: sources added/removed since the checkpoint.
    if !manifest.source_names.is_empty() {
        let mut current_sources: Vec<&str> = sources.iter().map(|s| s.name.as_str()).collect();
        current_sources.sort_unstable();
        let checkpoint_sources: Vec<&str> =
            manifest.source_names.iter().map(String::as_str).collect();
        let added: Vec<&&str> = current_sources
            .iter()
            .filter(|n| !checkpoint_sources.contains(n))
            .collect();
        let removed: Vec<&&str> = checkpoint_sources
            .iter()
            .filter(|n| !current_sources.contains(n))
            .collect();
        if !added.is_empty() {
            warn!(
                sources = ?added,
                "new sources added since checkpoint — no saved offsets"
            );
        }
        if !removed.is_empty() {
            warn!(
                sources = ?removed,
                "sources removed since checkpoint — orphaned offsets"
            );
        }
    }
    // Same drift check for sinks.
    if !manifest.sink_names.is_empty() {
        let mut current_sinks: Vec<&str> = sinks.iter().map(|s| s.name.as_str()).collect();
        current_sinks.sort_unstable();
        let checkpoint_sinks: Vec<&str> =
            manifest.sink_names.iter().map(String::as_str).collect();
        let added: Vec<&&str> = current_sinks
            .iter()
            .filter(|n| !checkpoint_sinks.contains(n))
            .collect();
        let removed: Vec<&&str> = checkpoint_sinks
            .iter()
            .filter(|n| !current_sinks.contains(n))
            .collect();
        if !added.is_empty() {
            warn!(
                sinks = ?added,
                "new sinks added since checkpoint — no saved epoch"
            );
        }
        if !removed.is_empty() {
            warn!(
                sinks = ?removed,
                "sinks removed since checkpoint — orphaned epochs"
            );
        }
    }
    info!(
        checkpoint_id = manifest.checkpoint_id,
        epoch = manifest.epoch,
        validation_warnings = validation_errors.len(),
        "recovering from checkpoint"
    );
    let mut result = RecoveredState {
        manifest: manifest.clone(),
        sources_restored: 0,
        tables_restored: 0,
        sinks_rolled_back: 0,
        source_errors: HashMap::new(),
        sink_errors: HashMap::new(),
    };
    // Synthetic error entry so strict mode rejects a half-resolved sidecar.
    if !sidecar_ok {
        result.source_errors.insert(
            "__sidecar__".into(),
            "[LDB-6010] sidecar state.bin missing or truncated — \
            operator state cannot be fully restored"
                .into(),
        );
    }
    // Restore offsets for replayable sources; non-replayable sources run
    // at-most-once semantics and are intentionally skipped.
    for source in sources {
        if !source.supports_replay {
            info!(
                source = %source.name,
                "skipping restore for non-replayable source (at-most-once)"
            );
            continue;
        }
        if let Some(cp) = manifest.source_offsets.get(&source.name) {
            let source_cp = connector_to_source_checkpoint(cp);
            let mut connector = source.connector.lock().await;
            match connector.restore(&source_cp).await {
                Ok(()) => {
                    result.sources_restored += 1;
                    debug!(source = %source.name, epoch = cp.epoch, "source restored");
                }
                Err(e) => {
                    let msg = format!("source restore failed: {e}");
                    warn!(source = %source.name, error = %e, "source restore failed");
                    result.source_errors.insert(source.name.clone(), msg);
                }
            }
        }
    }
    // Restore table-source offsets; errors share the source_errors map.
    for table_source in table_sources {
        if let Some(cp) = manifest.table_offsets.get(&table_source.name) {
            let source_cp = connector_to_source_checkpoint(cp);
            let mut connector = table_source.connector.lock().await;
            match connector.restore(&source_cp).await {
                Ok(()) => {
                    result.tables_restored += 1;
                    debug!(table = %table_source.name, epoch = cp.epoch, "table source restored");
                }
                Err(e) => {
                    let msg = format!("table source restore failed: {e}");
                    warn!(table = %table_source.name, error = %e, "table source restore failed");
                    result.source_errors.insert(table_source.name.clone(), msg);
                }
            }
        }
    }
    // Roll back the checkpoint epoch on exactly-once sinks that had not
    // committed it yet; already-committed sinks must not be rolled back.
    for sink in sinks {
        if sink.exactly_once {
            let already_committed = manifest
                .sink_commit_statuses
                .get(&sink.name)
                .is_some_and(|s| matches!(s, SinkCommitStatus::Committed));
            if already_committed {
                debug!(
                    sink = %sink.name,
                    epoch = manifest.epoch,
                    "sink already committed, skipping rollback"
                );
                continue;
            }
            sink.handle.rollback_epoch(manifest.epoch).await;
            result.sinks_rolled_back += 1;
            debug!(sink = %sink.name, epoch = manifest.epoch, "sink rolled back");
        }
    }
    // Unaligned checkpoints carry in-flight records the engine will replay.
    if !manifest.inflight_data.is_empty() {
        let total_records: usize = manifest.inflight_data.values().map(Vec::len).sum();
        info!(
            operators = manifest.inflight_data.len(),
            total_records, "unaligned checkpoint: inflight data present for replay"
        );
    }
    info!(
        checkpoint_id = manifest.checkpoint_id,
        epoch = manifest.epoch,
        sources_restored = result.sources_restored,
        tables_restored = result.tables_restored,
        sinks_rolled_back = result.sinks_rolled_back,
        errors = result.source_errors.len() + result.sink_errors.len(),
        has_inflight_data = !manifest.inflight_data.is_empty(),
        "recovery complete"
    );
    result
}
/// Checks checkpoint integrity via the store's validator.
///
/// Checkpoints with neither a state checksum nor operator states carry
/// nothing worth validating and are trusted as-is. A validation I/O error
/// is conservatively treated as corruption.
fn is_checkpoint_corrupt(&self, manifest: &CheckpointManifest) -> bool {
    let nothing_to_validate =
        manifest.state_checksum.is_none() && manifest.operator_states.is_empty();
    if nothing_to_validate {
        return false;
    }
    match self.store.validate_checkpoint(manifest.checkpoint_id) {
        Ok(ValidationResult { valid: true, .. }) => false,
        Ok(ValidationResult { ref issues, .. }) => {
            error!(
                checkpoint_id = manifest.checkpoint_id,
                issues = ?issues,
                "[LDB-6010] checkpoint integrity check failed"
            );
            true
        }
        Err(e) => {
            error!(
                checkpoint_id = manifest.checkpoint_id,
                error = %e,
                "[LDB-6010] checkpoint validation I/O error — \
                treating as corrupt for safety"
            );
            true
        }
    }
}
/// True when at least one sink recorded in the manifest is still awaiting
/// its epoch commit — such a checkpoint must not be recovered from.
fn has_pending_sinks(manifest: &CheckpointManifest) -> bool {
    let statuses = manifest.sink_commit_statuses.values();
    statuses
        .into_iter()
        .any(|status| matches!(status, SinkCommitStatus::Pending))
}
/// In strict mode, converts accumulated restore errors into a hard
/// failure; lenient managers and error-free states always pass.
///
/// # Errors
/// Returns `DbError::Checkpoint` listing every source and sink error when
/// strict mode is on and the state has any errors.
fn check_strict(&self, state: &RecoveredState) -> Result<(), DbError> {
    if !(self.strict && state.has_errors()) {
        return Ok(());
    }
    // Format source errors first, then sink errors, into one summary.
    let msgs: Vec<String> = state
        .source_errors
        .iter()
        .map(|(k, v)| format!("source '{k}': {v}"))
        .chain(
            state
                .sink_errors
                .iter()
                .map(|(k, v)| format!("sink '{k}': {v}")),
        )
        .collect();
    Err(DbError::Checkpoint(format!(
        "strict recovery failed — {} restore error(s): {}",
        msgs.len(),
        msgs.join("; ")
    )))
}
/// Loads the newest checkpoint manifest, if one exists.
///
/// # Errors
/// Returns `DbError::Checkpoint` when the underlying store fails.
pub fn load_latest(&self) -> Result<Option<CheckpointManifest>, DbError> {
    match self.store.load_latest() {
        Ok(manifest) => Ok(manifest),
        Err(e) => Err(DbError::Checkpoint(format!(
            "failed to load checkpoint: {e}"
        ))),
    }
}

/// Loads a specific checkpoint manifest by id, if present.
///
/// # Errors
/// Returns `DbError::Checkpoint` when the underlying store fails.
pub fn load_by_id(&self, checkpoint_id: u64) -> Result<Option<CheckpointManifest>, DbError> {
    match self.store.load_by_id(checkpoint_id) {
        Ok(manifest) => Ok(manifest),
        Err(e) => Err(DbError::Checkpoint(format!(
            "failed to load checkpoint {checkpoint_id}: {e}"
        ))),
    }
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use laminar_storage::checkpoint_manifest::OperatorCheckpoint;
    use laminar_storage::checkpoint_store::FileSystemCheckpointStore;

    // Filesystem-backed store retaining up to 3 checkpoints.
    fn make_store(dir: &std::path::Path) -> FileSystemCheckpointStore {
        FileSystemCheckpointStore::new(dir, 3)
    }

    // Empty store → recover() reports "start fresh" (None).
    #[tokio::test]
    async fn test_recover_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(result.is_none());
    }

    // A bare manifest recovers cleanly with all counters at zero.
    #[tokio::test]
    async fn test_recover_empty_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let manifest = CheckpointManifest::new(1, 5);
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        assert_eq!(result.epoch(), 5);
        assert_eq!(result.sources_restored, 0);
        assert_eq!(result.tables_restored, 0);
        assert_eq!(result.sinks_rolled_back, 0);
        assert!(!result.has_errors());
    }

    // Watermark round-trips through save/recover.
    #[tokio::test]
    async fn test_recover_with_watermark() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 3);
        manifest.watermark = Some(42_000);
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        assert_eq!(result.watermark(), Some(42_000));
    }

    // Inline operator states are preserved and decodable after recovery.
    #[tokio::test]
    async fn test_recover_with_operator_states() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 7);
        manifest
            .operator_states
            .insert("0".to_string(), OperatorCheckpoint::inline(b"window-state"));
        manifest
            .operator_states
            .insert("3".to_string(), OperatorCheckpoint::inline(b"filter-state"));
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        assert_eq!(result.operator_states().len(), 2);
        let op0 = result.operator_states().get("0").unwrap();
        assert_eq!(op0.decode_inline().unwrap(), b"window-state");
    }

    // Global and per-core WAL positions round-trip.
    #[tokio::test]
    async fn test_recover_wal_positions() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 2);
        manifest.wal_position = 4096;
        manifest.per_core_wal_positions = vec![100, 200, 300];
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        assert_eq!(result.wal_position(), 4096);
        assert_eq!(result.per_core_wal_positions(), &[100, 200, 300]);
    }

    // Table-store snapshot path round-trips.
    #[tokio::test]
    async fn test_recover_table_store_path() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest.table_store_checkpoint_path = Some("/data/rocksdb_cp_001".into());
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        assert_eq!(
            result.table_store_checkpoint_path(),
            Some("/data/rocksdb_cp_001")
        );
    }

    // load_latest on an empty store yields None (not an error).
    #[test]
    fn test_load_latest_no_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mgr = RecoveryManager::new(&store);
        assert!(mgr.load_latest().unwrap().is_none());
    }

    // load_by_id fetches the right manifest; unknown ids yield None.
    #[test]
    fn test_load_by_id() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        store.save(&CheckpointManifest::new(1, 1)).unwrap();
        store.save(&CheckpointManifest::new(2, 2)).unwrap();
        let mgr = RecoveryManager::new(&store);
        let m = mgr.load_by_id(1).unwrap().unwrap();
        assert_eq!(m.checkpoint_id, 1);
        let m2 = mgr.load_by_id(2).unwrap().unwrap();
        assert_eq!(m2.checkpoint_id, 2);
        assert!(mgr.load_by_id(999).unwrap().is_none());
    }

    // Corrupting the latest manifest on disk forces fallback to checkpoint 1.
    #[tokio::test]
    async fn test_recover_fallback_to_previous_checkpoint() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);
        let mut m1 = CheckpointManifest::new(1, 10);
        m1.watermark = Some(1000);
        store.save(&m1).unwrap();
        let mut m2 = CheckpointManifest::new(2, 20);
        m2.watermark = Some(2000);
        store.save(&m2).unwrap();
        // Corrupt checkpoint 2's manifest directly on disk.
        let latest_manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000002")
            .join("manifest.json");
        std::fs::write(&latest_manifest_path, "not valid json!!!").unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        let recovered = result.expect("should recover from fallback checkpoint");
        assert_eq!(recovered.manifest.checkpoint_id, 1);
        assert_eq!(recovered.epoch(), 10);
        assert_eq!(recovered.watermark(), Some(1000));
    }

    // When every checkpoint is corrupt, recovery starts fresh instead of failing.
    #[tokio::test]
    async fn test_recover_all_checkpoints_corrupt_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);
        store.save(&CheckpointManifest::new(1, 5)).unwrap();
        let manifest_path = dir
            .path()
            .join("checkpoints")
            .join("checkpoint_000001")
            .join("manifest.json");
        std::fs::write(&manifest_path, "corrupt").unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(result.is_none());
    }

    // Healthy latest checkpoint wins; no fallback scan should be needed.
    #[tokio::test]
    async fn test_recover_latest_ok_no_fallback_needed() {
        let dir = tempfile::tempdir().unwrap();
        let store = FileSystemCheckpointStore::new(dir.path(), 10);
        store.save(&CheckpointManifest::new(1, 10)).unwrap();
        store.save(&CheckpointManifest::new(2, 20)).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        assert_eq!(result.manifest.checkpoint_id, 2);
        assert_eq!(result.epoch(), 20);
    }

    // External state stored in the sidecar is resolved back to inline bytes.
    #[tokio::test]
    async fn test_recover_with_sidecar_state() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 5);
        let large_data = vec![0xAB; 2048];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 2048));
        store.save_state_data(1, &large_data).unwrap();
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        let op = result.operator_states().get("big-op").unwrap();
        assert!(!op.external, "external state should be resolved to inline");
        assert_eq!(op.decode_inline().unwrap(), large_data);
    }

    // Inline and external operator states can coexist in one manifest.
    #[tokio::test]
    async fn test_recover_mixed_inline_and_external() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 3);
        manifest
            .operator_states
            .insert("small-op".into(), OperatorCheckpoint::inline(b"tiny"));
        let large_data = vec![0xCD; 4096];
        manifest
            .operator_states
            .insert("big-op".into(), OperatorCheckpoint::external(0, 4096));
        store.save_state_data(1, &large_data).unwrap();
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        let small = result.operator_states().get("small-op").unwrap();
        assert_eq!(small.decode_inline().unwrap(), b"tiny");
        let big = result.operator_states().get("big-op").unwrap();
        assert_eq!(big.decode_inline().unwrap(), large_data);
    }

    // In-flight records from an unaligned checkpoint survive recovery.
    #[tokio::test]
    async fn test_recover_with_inflight_data() {
        use laminar_storage::checkpoint_manifest::InFlightRecord;
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 5);
        let record = InFlightRecord {
            input_id: 0,
            data_b64: "aW5mbGlnaHQtZXZlbnQ=".into(),
        };
        manifest
            .inflight_data
            .insert("join-op".into(), vec![record]);
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        assert!(result.has_inflight_data());
        let inflight = result.inflight_data();
        assert_eq!(inflight.len(), 1);
        let records = inflight.get("join-op").unwrap();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].input_id, 0);
        assert_eq!(records[0].data_b64, "aW5mbGlnaHQtZXZlbnQ=");
    }

    // Lenient mode: missing sidecar replaces external state with empty inline.
    #[tokio::test]
    async fn test_recover_missing_sidecar_graceful() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::lenient(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap().unwrap();
        let op = result.operator_states().get("orphan").unwrap();
        assert!(
            !op.external,
            "unresolved external state replaced with inline empty"
        );
        assert!(
            op.state_b64.as_ref().is_none_or(String::is_empty),
            "replaced state should be empty"
        );
    }

    // has_errors reflects the presence of source/sink error entries.
    #[tokio::test]
    async fn test_recovered_state_has_errors() {
        let state = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::new(),
            sink_errors: HashMap::new(),
        };
        assert!(!state.has_errors());
        let state_with_errors = RecoveredState {
            manifest: CheckpointManifest::new(1, 1),
            sources_restored: 0,
            tables_restored: 0,
            sinks_rolled_back: 0,
            source_errors: HashMap::from([("source1".into(), "failed".into())]),
            sink_errors: HashMap::new(),
        };
        assert!(state_with_errors.has_errors());
    }

    // Strict mode: missing sidecar is fatal and the checkpoint is rejected.
    #[tokio::test]
    async fn test_recover_missing_sidecar_strict_rejects() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut manifest = CheckpointManifest::new(1, 1);
        manifest
            .operator_states
            .insert("orphan".into(), OperatorCheckpoint::external(0, 100));
        store.save(&manifest).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "strict mode should reject checkpoint with missing sidecar"
        );
    }

    // A checkpoint with Pending sink commits must be skipped in favor of an
    // older fully-committed one.
    #[tokio::test]
    async fn test_recover_skips_pending_sinks_falls_back() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut m1 = CheckpointManifest::new(1, 1);
        m1.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Committed);
        store.save(&m1).unwrap();
        let mut m2 = CheckpointManifest::new(2, 2);
        m2.sink_commit_statuses
            .insert("delta_sink".into(), SinkCommitStatus::Pending);
        store.save(&m2).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        let state = result.expect("should recover from epoch 1 fallback");
        assert_eq!(
            state.epoch(),
            1,
            "recovery must skip checkpoint with Pending sinks"
        );
    }

    // If every checkpoint has pending sinks, recovery starts fresh.
    #[tokio::test]
    async fn test_recover_all_pending_starts_fresh() {
        let dir = tempfile::tempdir().unwrap();
        let store = make_store(dir.path());
        let mut m = CheckpointManifest::new(1, 1);
        m.sink_commit_statuses
            .insert("sink".into(), SinkCommitStatus::Pending);
        store.save(&m).unwrap();
        let mgr = RecoveryManager::new(&store);
        let result = mgr.recover(&[], &[], &[]).await.unwrap();
        assert!(
            result.is_none(),
            "should start fresh when all checkpoints have pending sinks"
        );
    }
}