use std::collections::HashMap;
use lance::Dataset;
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::db::commit_graph::CommitGraph;
use crate::db::graph_coordinator::GraphCoordinator;
use crate::db::recovery_audit::{
RecoveryAudit, RecoveryAuditRecord, RecoveryKind, TableOutcome, now_micros,
};
use crate::db::schema_state::SchemaStateRecovery;
use crate::error::{OmniError, Result};
use crate::storage::StorageAdapter;
use super::Snapshot;
use super::publisher::{GraphNamespacePublisher, ManifestBatchPublisher};
use super::{ManifestChange, SubTableUpdate, TableRegistration, TableTombstone};
/// Actor id stamped on commit-graph commits and audit records created by
/// the recovery pass itself (see `record_audit`).
pub(crate) const RECOVERY_ACTOR: &str = "omnigraph:recovery";
/// Directory under the graph root where in-flight operation sidecars live.
pub(crate) const RECOVERY_DIR_NAME: &str = "__recovery";
/// Sidecar JSON schema version this binary writes and understands; parsing
/// refuses any other value (see `parse_sidecar`).
pub(crate) const SIDECAR_SCHEMA_VERSION: u32 = 1;
/// How aggressively a recovery pass may act on discovered sidecars.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum RecoveryMode {
    /// May roll forward, roll back, and fail hard on invariant violations.
    Full,
    /// May only roll forward (or clean up stale sidecars); rollback-eligible
    /// and violating sidecars are deferred to the next ReadWrite open
    /// (see `process_sidecar`).
    RollForwardOnly,
}
/// The writer that created a sidecar; serialized in PascalCase in the JSON.
///
/// `Mutation` and `Load` are treated as strict single-commit writers by
/// `classify_table`; the remaining kinds are "loose" writers whose tables may
/// legitimately advance by multiple Lance commits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) enum SidecarKind {
    Mutation,
    Load,
    SchemaApply,
    BranchMerge,
    EnsureIndices,
}
/// Per-table version pin captured by the writer before committing.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SidecarTablePin {
    /// Manifest key of the table (e.g. "node:Person" in the tests).
    pub table_key: String,
    /// URI/path used to open the Lance dataset directly.
    pub table_path: String,
    /// Version the manifest pinned before the operation began.
    pub expected_version: u64,
    /// Version the table should sit at after the operation's Lance commit.
    pub post_commit_pin: u64,
    /// Branch the table was committed on; `None` (or "main") means no
    /// branch checkout is performed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub table_branch: Option<String>,
}
/// A table the operation intended to newly register in the manifest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SidecarTableRegistration {
    /// Manifest key for the new table.
    pub table_key: String,
    /// Dataset path relative to the graph root; roll-forward joins it onto
    /// `root_uri` (see `roll_forward_all`), unlike `SidecarTablePin::table_path`.
    pub table_path: String,
    /// Branch the table lives on; `None` (or "main") means no checkout.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub table_branch: Option<String>,
}
/// A table the operation intended to tombstone (remove) from the manifest.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SidecarTombstone {
    /// Manifest key of the table being removed.
    pub table_key: String,
    /// Version associated with the tombstone; roll-forward expects the entry
    /// at `tombstone_version - 1` (saturating) — see `roll_forward_all`.
    pub tombstone_version: u64,
}
/// Durable record of an in-flight multi-phase write, persisted before the
/// per-table Lance commits (Phase B) so that a crash before the manifest
/// publish (Phase C) can be detected and repaired on the next open.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RecoverySidecar {
    /// Must equal `SIDECAR_SCHEMA_VERSION` for this binary to interpret it.
    pub schema_version: u32,
    /// ULID identifying the operation; also the sidecar's file name stem.
    pub operation_id: String,
    /// Microseconds since the epoch as a decimal string (see `new_sidecar`).
    pub started_at: String,
    /// Target branch of the write; `None` means the default branch.
    pub branch: Option<String>,
    /// Actor that initiated the original write, if known.
    pub actor_id: Option<String>,
    /// Which writer produced this sidecar; drives classification strictness.
    pub writer_kind: SidecarKind,
    /// Version pins for every table the operation intended to commit.
    pub tables: Vec<SidecarTablePin>,
    /// For `BranchMerge`: the merge source commit id, used to rebuild a merge
    /// commit when rolling forward (see `record_audit`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub merge_source_commit_id: Option<String>,
    /// Tables the operation intended to newly register in the manifest.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub additional_registrations: Vec<SidecarTableRegistration>,
    /// Tables the operation intended to tombstone in the manifest.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tombstones: Vec<SidecarTombstone>,
}
/// In-memory handle returned by `write_sidecar`, remembering where the
/// sidecar was stored so the happy path can delete it after Phase C.
#[derive(Debug, Clone)]
pub(crate) struct RecoverySidecarHandle {
    pub(crate) operation_id: String,
    /// Exact URI the sidecar JSON was written to.
    pub(crate) sidecar_uri: String,
}
/// Raised when a sidecar on disk declares a `schema_version` this binary does
/// not understand; recovery refuses to interpret it (see `parse_sidecar`).
#[derive(Debug)]
pub(crate) struct SidecarSchemaError {
    pub sidecar_uri: String,
    /// Version the sidecar declared.
    pub found_version: u32,
    /// Version this binary supports (`SIDECAR_SCHEMA_VERSION`).
    pub supported_version: u32,
}
impl std::fmt::Display for SidecarSchemaError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"recovery sidecar at '{}' declares schema_version={}, but this \
binary supports only schema_version={}; refusing to interpret \
— upgrade omnigraph or remove the sidecar with operator review",
self.sidecar_uri, self.found_version, self.supported_version,
)
}
}
impl std::error::Error for SidecarSchemaError {}
impl From<SidecarSchemaError> for OmniError {
fn from(err: SidecarSchemaError) -> Self {
OmniError::manifest_internal(err.to_string())
}
}
/// How a single pinned table's live Lance head relates to the version the
/// manifest snapshot currently pins (computed by `classify_table`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TableClassification {
    /// Head equals the manifest pin: the table's commit never landed.
    NoMovement,
    /// Head advanced exactly as the sidecar recorded (strict writers), or
    /// from the expected base (loose writers): Phase B completed.
    RolledPastExpected,
    /// Head is pin+1 but does not match the sidecar's recorded commit.
    UnexpectedAtP1,
    /// Head advanced further than the sidecar can account for.
    UnexpectedMultistep,
    /// Head is *behind* the manifest pin — should be impossible; carries the
    /// observed head for diagnostics.
    InvariantViolation { observed: u64 },
}
/// Aggregate recovery decision for one sidecar (computed by `decide`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SidecarDecision {
    /// All tables committed as recorded: re-publish the manifest (Phase C).
    RollForward,
    /// Mixed or unexpected movement: restore tables to their manifest pins.
    RollBack,
    /// At least one invariant violation: take no action without an operator.
    Abort,
}
/// URI of the recovery directory under `root_uri`.
pub(crate) fn recovery_dir_uri(root_uri: &str) -> String {
    // Normalize away any trailing slash so we never emit "//__recovery".
    format!("{}/{}", root_uri.trim_end_matches('/'), RECOVERY_DIR_NAME)
}
/// Canonical URI of one sidecar: one JSON document per in-flight operation,
/// keyed by its operation id.
pub(crate) fn sidecar_uri(root_uri: &str, operation_id: &str) -> String {
    format!("{}/{}.json", recovery_dir_uri(root_uri), operation_id)
}
/// Serialize `sidecar` as pretty JSON and persist it at its canonical URI,
/// returning a handle the caller uses to delete it after Phase C succeeds.
///
/// # Errors
/// Serialization failures are wrapped as manifest-internal errors; storage
/// failures bubble up from the adapter.
pub(crate) async fn write_sidecar(
    root_uri: &str,
    storage: &dyn StorageAdapter,
    sidecar: &RecoverySidecar,
) -> Result<RecoverySidecarHandle> {
    // Writers must only persist the schema version this binary emits.
    debug_assert_eq!(sidecar.schema_version, SIDECAR_SCHEMA_VERSION);
    let json = match serde_json::to_string_pretty(sidecar) {
        Ok(body) => body,
        Err(err) => {
            return Err(OmniError::manifest_internal(format!(
                "failed to serialize recovery sidecar: {}",
                err
            )))
        }
    };
    let sidecar_uri = sidecar_uri(root_uri, &sidecar.operation_id);
    storage.write_text(&sidecar_uri, &json).await?;
    Ok(RecoverySidecarHandle {
        operation_id: sidecar.operation_id.clone(),
        sidecar_uri,
    })
}
/// Remove the sidecar a handle points at; called after Phase C lands.
pub(crate) async fn delete_sidecar(
    handle: &RecoverySidecarHandle,
    storage: &dyn StorageAdapter,
) -> Result<()> {
    // The handle remembers the exact URI it was written to.
    let uri = handle.sidecar_uri.as_str();
    storage.delete(uri).await
}
/// Load every sidecar under the recovery directory, in sorted-URI order so
/// recovery processes them deterministically. Non-`.json` entries (editor or
/// OS droppings) are ignored.
pub(crate) async fn list_sidecars(
    root_uri: &str,
    storage: &dyn StorageAdapter,
) -> Result<Vec<RecoverySidecar>> {
    let dir = recovery_dir_uri(root_uri);
    let mut uris = storage.list_dir(&dir).await?;
    uris.sort();
    let mut sidecars = Vec::with_capacity(uris.len());
    for uri in uris.into_iter().filter(|u| u.ends_with(".json")) {
        let body = storage.read_text(&uri).await?;
        sidecars.push(parse_sidecar(&uri, &body)?);
    }
    Ok(sidecars)
}
/// Decode one sidecar body, gating on `schema_version` first so an unknown
/// version is refused with `SidecarSchemaError` instead of being misread.
pub(crate) fn parse_sidecar(sidecar_uri: &str, body: &str) -> Result<RecoverySidecar> {
    /// Minimal projection used to check the version gate before full decode.
    #[derive(Deserialize)]
    struct Peek {
        schema_version: u32,
    }
    let Peek { schema_version } = serde_json::from_str(body).map_err(|err| {
        OmniError::manifest_internal(format!(
            "recovery sidecar at '{}' is not valid JSON: {}",
            sidecar_uri, err
        ))
    })?;
    if schema_version != SIDECAR_SCHEMA_VERSION {
        let refusal = SidecarSchemaError {
            sidecar_uri: sidecar_uri.to_string(),
            found_version: schema_version,
            supported_version: SIDECAR_SCHEMA_VERSION,
        };
        return Err(refusal.into());
    }
    serde_json::from_str(body).map_err(|err| {
        OmniError::manifest_internal(format!(
            "recovery sidecar at '{}' failed to deserialize: {}",
            sidecar_uri, err
        ))
    })
}
/// Classify one pinned table by comparing its live Lance head against the
/// version the manifest pins, with strictness chosen by the writer kind.
pub(crate) fn classify_table(
    pin: &SidecarTablePin,
    lance_head: u64,
    manifest_pinned: u64,
    kind: SidecarKind,
) -> TableClassification {
    use TableClassification::*;
    // A head behind the manifest pin should be impossible; surface it loudly.
    if lance_head < manifest_pinned {
        return InvariantViolation {
            observed: lance_head,
        };
    }
    if lance_head == manifest_pinned {
        return NoMovement;
    }
    // From here on the head is strictly ahead of the manifest pin.
    let head_is_pin_plus_one = lance_head == manifest_pinned + 1;
    match kind {
        // Strict single-commit writers: the head may only have advanced by
        // exactly one commit, and it must be the commit this sidecar recorded.
        SidecarKind::Mutation | SidecarKind::Load => {
            if !head_is_pin_plus_one {
                UnexpectedMultistep
            } else if pin.expected_version == manifest_pinned && pin.post_commit_pin == lance_head {
                RolledPastExpected
            } else {
                UnexpectedAtP1
            }
        }
        // Loose writers may land several commits; any advance from the
        // expected base version is accepted as a completed Phase B.
        SidecarKind::SchemaApply | SidecarKind::BranchMerge | SidecarKind::EnsureIndices => {
            if pin.expected_version == manifest_pinned {
                RolledPastExpected
            } else if head_is_pin_plus_one {
                UnexpectedAtP1
            } else {
                UnexpectedMultistep
            }
        }
    }
}
pub(crate) fn decide(classifications: &[TableClassification]) -> SidecarDecision {
use SidecarDecision::*;
use TableClassification::*;
if classifications
.iter()
.any(|c| matches!(c, InvariantViolation { .. }))
{
return Abort;
}
if classifications
.iter()
.any(|c| matches!(c, NoMovement | UnexpectedAtP1 | UnexpectedMultistep))
{
return RollBack;
}
RollForward
}
/// Restore a Lance table so its head reproduces the state of
/// `target_version`.
///
/// The restore *appends* a new commit; history is never truncated (the unit
/// tests assert the head advances by exactly one).
///
/// `branch` is checked out first when present and not "main".
///
/// # Errors
/// Open/checkout/restore failures are wrapped as `OmniError::Lance`.
pub(crate) async fn restore_table_to_version(
    table_path: &str,
    branch: Option<&str>,
    target_version: u64,
) -> Result<()> {
    let head = Dataset::open(table_path)
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    // "main" needs no checkout; any other named branch does.
    let head = match branch {
        Some(b) if b != "main" => head
            .checkout_branch(b)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?,
        _ => head,
    };
    let mut to_restore = head
        .checkout_version(target_version)
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    to_restore
        .restore()
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    Ok(())
}
/// Crash-recovery entry point: list every sidecar under `root_uri` and
/// process each one against a fresh snapshot of its target branch.
///
/// * `coordinator` — coordinator for the default branch; refreshed at the end
///   so the caller observes any manifest versions recovery published.
/// * `mode` — `Full` may roll back and abort; `RollForwardOnly` defers those
///   cases to the next ReadWrite open.
/// * `schema_state_recovery` — whether schema staging files were promoted in
///   this pass; gates SchemaApply roll-forward (see `process_sidecar`).
pub(crate) async fn recover_manifest_drift(
    root_uri: &str,
    storage: std::sync::Arc<dyn StorageAdapter>,
    coordinator: &mut GraphCoordinator,
    mode: RecoveryMode,
    schema_state_recovery: SchemaStateRecovery,
) -> Result<()> {
    let sidecars = list_sidecars(root_uri, storage.as_ref()).await?;
    if sidecars.is_empty() {
        return Ok(());
    }
    for sidecar in sidecars {
        // Classify against the branch the writer targeted, not the default
        // branch: manifest pins are per-branch.
        let branch_snapshot = match sidecar.branch.as_deref() {
            Some(b) => {
                let mut branch_coord =
                    GraphCoordinator::open_branch(root_uri, b, std::sync::Arc::clone(&storage))
                        .await?;
                branch_coord.refresh().await?;
                branch_coord.snapshot()
            }
            None => {
                coordinator.refresh().await?;
                coordinator.snapshot()
            }
        };
        process_sidecar(
            root_uri,
            storage.as_ref(),
            &branch_snapshot,
            &sidecar,
            mode,
            schema_state_recovery,
        )
        .await?;
    }
    // Pick up whatever manifest versions recovery just published.
    coordinator.refresh().await?;
    Ok(())
}
/// Inspect one sidecar against the manifest snapshot of its branch and apply
/// the recovery decision (roll forward, roll back, defer, or abort).
///
/// Phase vocabulary: Phase B is the per-table Lance commit; Phase C is the
/// manifest publish. A sidecar still on disk means Phase C (or the sidecar
/// cleanup) was not observed to complete.
///
/// # Errors
/// An `Abort` decision under `RecoveryMode::Full` returns a manifest-internal
/// error requiring operator review; any storage/Lance/publish failure bubbles
/// up via `?`.
async fn process_sidecar(
    root_uri: &str,
    storage: &dyn StorageAdapter,
    snapshot: &Snapshot,
    sidecar: &RecoverySidecar,
    mode: RecoveryMode,
    schema_state_recovery: SchemaStateRecovery,
) -> Result<()> {
    // Classify each pinned table by comparing the live Lance head against the
    // version the manifest snapshot pins (0 when the table is unregistered).
    let mut states = Vec::with_capacity(sidecar.tables.len());
    for pin in &sidecar.tables {
        let lance_head = open_lance_head(&pin.table_path, pin.table_branch.as_deref()).await?;
        let manifest_pinned = snapshot
            .entry(&pin.table_key)
            .map(|e| e.table_version)
            .unwrap_or(0);
        states.push(ClassifiedTable {
            classification: classify_table(pin, lance_head, manifest_pinned, sidecar.writer_kind),
            manifest_pinned,
            lance_head,
        });
    }
    let classifications = states
        .iter()
        .map(|state| state.classification)
        .collect::<Vec<_>>();
    match decide(&classifications) {
        SidecarDecision::Abort => match mode {
            // Full recovery refuses to touch tables in an impossible state.
            RecoveryMode::Full => Err(OmniError::manifest_internal(format!(
                "recovery sidecar '{}' has invariant violation; refusing to act \
— operator review required (sidecar at '{}', classifications: {:?})",
                sidecar.operation_id,
                sidecar_uri(root_uri, &sidecar.operation_id),
                classifications,
            ))),
            // Roll-forward-only passes cannot act; leave the sidecar for the
            // next ReadWrite open to surface the violation.
            RecoveryMode::RollForwardOnly => {
                warn!(
                    operation_id = sidecar.operation_id.as_str(),
                    writer_kind = ?sidecar.writer_kind,
                    "recovery: deferring sidecar with invariant violation to next ReadWrite open"
                );
                Ok(())
            }
        },
        SidecarDecision::RollBack => {
            // Special case: no table moved, yet the manifest pins already sit
            // past the sidecar's expected versions. A previous pass finished
            // Phase C and only the sidecar cleanup was lost — record a
            // RolledForward audit and remove the stale sidecar.
            let all_no_movement = states
                .iter()
                .all(|s| matches!(s.classification, TableClassification::NoMovement));
            let any_pin_advanced = sidecar
                .tables
                .iter()
                .zip(states.iter())
                .any(|(pin, state)| state.manifest_pinned > pin.expected_version);
            if all_no_movement && any_pin_advanced {
                if matches!(mode, RecoveryMode::RollForwardOnly) {
                    warn!(
                        operation_id = sidecar.operation_id.as_str(),
                        writer_kind = ?sidecar.writer_kind,
                        "recovery: cleaning up stale sidecar from a prior successful \
roll-forward (audit-recovery, in-process refresh)"
                    );
                } else {
                    warn!(
                        operation_id = sidecar.operation_id.as_str(),
                        writer_kind = ?sidecar.writer_kind,
                        "recovery: cleaning up stale sidecar from a prior successful \
roll-forward (manifest already advanced; recording RolledForward audit)"
                    );
                }
                return record_audit_recovery_rollforward(
                    root_uri, storage, snapshot, sidecar, &states,
                )
                .await;
            }
            // Rollback mutates tables; defer unless this is a Full pass.
            if matches!(mode, RecoveryMode::RollForwardOnly) {
                warn!(
                    operation_id = sidecar.operation_id.as_str(),
                    writer_kind = ?sidecar.writer_kind,
                    "recovery: deferring rollback-eligible sidecar to next ReadWrite open"
                );
                return Ok(());
            }
            warn!(
                operation_id = sidecar.operation_id.as_str(),
                writer_kind = ?sidecar.writer_kind,
                "recovery: rolling back sidecar (mixed or unexpected state)"
            );
            roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await
        }
        SidecarDecision::RollForward => {
            // SchemaApply may only roll forward once the staged schema files
            // were promoted in this same recovery pass; otherwise the manifest
            // would point at tables whose schema promotion never happened.
            if matches!(sidecar.writer_kind, SidecarKind::SchemaApply)
                && !schema_state_recovery.completed_schema_apply_sidecar_rename()
            {
                return match mode {
                    RecoveryMode::Full => {
                        warn!(
                            operation_id = sidecar.operation_id.as_str(),
                            "recovery: rolling back SchemaApply sidecar because schema staging \
files were not promoted in this recovery pass"
                        );
                        roll_back_sidecar(root_uri, storage, snapshot, sidecar, &states).await
                    }
                    RecoveryMode::RollForwardOnly => {
                        warn!(
                            operation_id = sidecar.operation_id.as_str(),
                            "recovery: deferring SchemaApply sidecar because schema staging files \
were not promoted in this recovery pass"
                        );
                        Ok(())
                    }
                };
            }
            warn!(
                operation_id = sidecar.operation_id.as_str(),
                writer_kind = ?sidecar.writer_kind,
                "recovery: rolling forward sidecar (Phase B completed; \
Phase C did not land)"
            );
            let (new_manifest_version, published_versions) =
                roll_forward_all(root_uri, sidecar, snapshot).await?;
            // Audit outcomes: expected version -> version actually published
            // (falling back to the recorded post-commit pin).
            let mut outcomes: Vec<TableOutcome> = sidecar
                .tables
                .iter()
                .map(|pin| TableOutcome {
                    table_key: pin.table_key.clone(),
                    from_version: pin.expected_version,
                    to_version: published_versions
                        .get(&pin.table_key)
                        .copied()
                        .unwrap_or(pin.post_commit_pin),
                })
                .collect();
            for reg in &sidecar.additional_registrations {
                outcomes.push(TableOutcome {
                    table_key: reg.table_key.clone(),
                    from_version: 0,
                    // BUGFIX: `&reg.table_key` had been mangled into the `®`
                    // glyph (an HTML "&reg;" entity rendered during a bad
                    // round-trip), which does not compile. Borrow restored.
                    to_version: published_versions.get(&reg.table_key).copied().unwrap_or(0),
                });
            }
            record_audit(
                root_uri,
                sidecar,
                new_manifest_version,
                RecoveryKind::RolledForward,
                outcomes,
            )
            .await?;
            delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
            Ok(())
        }
    }
}
/// A pinned table's classification plus the raw versions that produced it.
#[derive(Debug, Clone, Copy)]
struct ClassifiedTable {
    /// Relationship between `lance_head` and `manifest_pinned`.
    classification: TableClassification,
    /// Version the manifest snapshot pins (0 when unregistered).
    manifest_pinned: u64,
    /// Live head version of the Lance dataset.
    lance_head: u64,
}
/// Roll a sidecar back: restore every table whose head moved to its
/// manifest-pinned version, write a RolledBack audit record, then delete the
/// sidecar.
///
/// `NoMovement` tables are left alone and get no outcome entry; restores
/// append a new commit rather than truncating history.
async fn roll_back_sidecar(
    root_uri: &str,
    storage: &dyn StorageAdapter,
    snapshot: &Snapshot,
    sidecar: &RecoverySidecar,
    states: &[ClassifiedTable],
) -> Result<()> {
    let mut outcomes = Vec::with_capacity(sidecar.tables.len());
    for (pin, state) in sidecar.tables.iter().zip(states.iter()) {
        // Only heads that actually advanced need restoring.
        if matches!(
            state.classification,
            TableClassification::RolledPastExpected
                | TableClassification::UnexpectedAtP1
                | TableClassification::UnexpectedMultistep
        ) {
            restore_table_to_version(
                &pin.table_path,
                pin.table_branch.as_deref(),
                state.manifest_pinned,
            )
            .await?;
            outcomes.push(TableOutcome {
                table_key: pin.table_key.clone(),
                from_version: state.lance_head,
                to_version: state.manifest_pinned,
            });
        }
    }
    record_audit(
        root_uri,
        sidecar,
        snapshot.version(),
        RecoveryKind::RolledBack,
        outcomes,
    )
    .await?;
    delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await?;
    Ok(())
}
/// Clean up a stale sidecar whose roll-forward already completed in a prior
/// pass: the tables are untouched; we only record a RolledForward audit
/// (expected version -> manifest-pinned version) and delete the sidecar.
async fn record_audit_recovery_rollforward(
    root_uri: &str,
    storage: &dyn StorageAdapter,
    snapshot: &Snapshot,
    sidecar: &RecoverySidecar,
    states: &[ClassifiedTable],
) -> Result<()> {
    let mut outcomes = Vec::with_capacity(sidecar.tables.len());
    for (pin, state) in sidecar.tables.iter().zip(states) {
        outcomes.push(TableOutcome {
            table_key: pin.table_key.clone(),
            from_version: pin.expected_version,
            to_version: state.manifest_pinned,
        });
    }
    record_audit(
        root_uri,
        sidecar,
        snapshot.version(),
        RecoveryKind::RolledForward,
        outcomes,
    )
    .await?;
    delete_sidecar_by_operation_id(root_uri, storage, &sidecar.operation_id).await
}
/// Re-publish the manifest changes the crashed writer intended (Phase C),
/// pinning each table at its current Lance head.
///
/// Returns the new manifest version and a map of `table_key` -> published
/// Lance version for audit reporting.
///
/// # Errors
/// Lance open/checkout/count failures are wrapped as `OmniError::Lance`;
/// publish failures bubble up from the publisher.
async fn roll_forward_all(
    root_uri: &str,
    sidecar: &RecoverySidecar,
    snapshot: &Snapshot,
) -> Result<(u64, HashMap<String, u64>)> {
    let total_changes =
        sidecar.tables.len() + sidecar.additional_registrations.len() + sidecar.tombstones.len();
    let mut updates: Vec<ManifestChange> = Vec::with_capacity(total_changes);
    // Optimistic-concurrency expectations handed to the publisher.
    let mut expected: HashMap<String, u64> = HashMap::with_capacity(total_changes);
    let mut published_versions: HashMap<String, u64> =
        HashMap::with_capacity(sidecar.tables.len() + sidecar.additional_registrations.len());
    // Pinned tables: pin each one at its current head.
    for pin in &sidecar.tables {
        let head_ds = Dataset::open(&pin.table_path)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let head_ds = match pin.table_branch.as_deref() {
            Some(b) if b != "main" => head_ds
                .checkout_branch(b)
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?,
            _ => head_ds,
        };
        let head_version = head_ds.version().version;
        let row_count = head_ds
            .count_rows(None)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))? as u64;
        let table_relative_path = super::table_path_for_table_key(&pin.table_key)?;
        let version_metadata = super::metadata::TableVersionMetadata::from_dataset(
            root_uri,
            &table_relative_path,
            &head_ds,
        )?;
        updates.push(ManifestChange::Update(SubTableUpdate {
            table_key: pin.table_key.clone(),
            table_version: head_version,
            table_branch: pin.table_branch.clone(),
            row_count,
            version_metadata,
        }));
        expected.insert(pin.table_key.clone(), pin.expected_version);
        published_versions.insert(pin.table_key.clone(), head_version);
    }
    // Tables the writer intended to newly register.
    // BUGFIX: three `&reg.` borrows in this loop had been mangled into the
    // `®` glyph (HTML "&reg;" entity damage), which does not compile; the
    // borrows are restored. The former double `snapshot.entry` lookup
    // (is_some + if-let) is also collapsed into a single `if let`.
    for reg in &sidecar.additional_registrations {
        if let Some(entry) = snapshot.entry(&reg.table_key) {
            // Already registered (e.g. by a prior partial roll-forward):
            // report the manifest's pinned version and skip re-registration.
            published_versions.insert(reg.table_key.clone(), entry.table_version);
            continue;
        }
        // Registrations carry root-relative paths, unlike pinned tables.
        let dataset_uri = format!("{}/{}", root_uri.trim_end_matches('/'), reg.table_path);
        let head_ds = Dataset::open(&dataset_uri)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))?;
        let head_ds = match reg.table_branch.as_deref() {
            Some(b) if b != "main" => head_ds
                .checkout_branch(b)
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?,
            _ => head_ds,
        };
        let head_version = head_ds.version().version;
        let row_count = head_ds
            .count_rows(None)
            .await
            .map_err(|e| OmniError::Lance(e.to_string()))? as u64;
        let version_metadata = super::metadata::TableVersionMetadata::from_dataset(
            root_uri,
            &reg.table_path,
            &head_ds,
        )?;
        updates.push(ManifestChange::RegisterTable(TableRegistration {
            table_key: reg.table_key.clone(),
            table_path: reg.table_path.clone(),
        }));
        updates.push(ManifestChange::Update(SubTableUpdate {
            table_key: reg.table_key.clone(),
            table_version: head_version,
            table_branch: reg.table_branch.clone(),
            row_count,
            version_metadata,
        }));
        // A fresh registration has no prior manifest entry to expect.
        expected.insert(reg.table_key.clone(), 0);
        published_versions.insert(reg.table_key.clone(), head_version);
    }
    // Tombstones: only re-issue ones whose table is still in the manifest.
    for tomb in &sidecar.tombstones {
        if snapshot.entry(&tomb.table_key).is_none() {
            continue;
        }
        updates.push(ManifestChange::Tombstone(TableTombstone {
            table_key: tomb.table_key.clone(),
            tombstone_version: tomb.tombstone_version,
        }));
        expected.insert(
            tomb.table_key.clone(),
            tomb.tombstone_version.saturating_sub(1),
        );
    }
    let publisher = GraphNamespacePublisher::new(root_uri, sidecar.branch.as_deref());
    let new_dataset = publisher.publish(&updates, &expected).await?;
    Ok((new_dataset.version().version, published_versions))
}
/// Append a commit-graph commit for the recovery action and persist a
/// `RecoveryAuditRecord` describing the per-table outcomes.
///
/// A rolled-forward BranchMerge sidecar with a known source commit becomes a
/// merge commit (parent = current branch head); every other combination
/// becomes a plain commit. All commits are attributed to `RECOVERY_ACTOR`.
async fn record_audit(
    root_uri: &str,
    sidecar: &RecoverySidecar,
    manifest_version: u64,
    kind: RecoveryKind,
    outcomes: Vec<TableOutcome>,
) -> Result<()> {
    let target_branch = sidecar.branch.as_deref();
    let mut graph = match target_branch {
        Some(branch) => CommitGraph::open_at_branch(root_uri, branch).await?,
        None => CommitGraph::open(root_uri).await?,
    };
    let graph_commit_id = match (
        sidecar.writer_kind,
        sidecar.merge_source_commit_id.as_deref(),
        kind,
    ) {
        (SidecarKind::BranchMerge, Some(source_id), RecoveryKind::RolledForward) => {
            // NOTE(review): an absent head yields an empty-string parent id —
            // presumably the graph treats that as "no parent"; confirm.
            let parent_commit_id = graph.head_commit_id().await?.unwrap_or_default();
            graph
                .append_merge_commit(
                    target_branch,
                    manifest_version,
                    &parent_commit_id,
                    source_id,
                    Some(RECOVERY_ACTOR),
                )
                .await?
        }
        _ => {
            graph
                .append_commit(target_branch, manifest_version, Some(RECOVERY_ACTOR))
                .await?
        }
    };
    let mut audit = RecoveryAudit::open(root_uri).await?;
    audit
        .append(RecoveryAuditRecord {
            graph_commit_id,
            recovery_kind: kind,
            recovery_for_actor: sidecar.actor_id.clone(),
            operation_id: sidecar.operation_id.clone(),
            // Debug rendering of the kind keeps the audit schema string-typed.
            sidecar_writer_kind: format!("{:?}", sidecar.writer_kind),
            per_table_outcomes: outcomes,
            created_at: now_micros()?,
        })
        .await?;
    Ok(())
}
/// Whether any pending sidecar was written by a SchemaApply operation; used
/// to decide if schema staging promotion work may still be outstanding.
pub(crate) async fn has_schema_apply_sidecar(
    root_uri: &str,
    storage: &dyn StorageAdapter,
) -> Result<bool> {
    for sidecar in list_sidecars(root_uri, storage).await? {
        if sidecar.writer_kind == SidecarKind::SchemaApply {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Open a Lance dataset (following `branch` when named and not "main") and
/// return its head version number.
async fn open_lance_head(table_path: &str, branch: Option<&str>) -> Result<u64> {
    let mut dataset = Dataset::open(table_path)
        .await
        .map_err(|e| OmniError::Lance(e.to_string()))?;
    if let Some(b) = branch {
        if b != "main" {
            dataset = dataset
                .checkout_branch(b)
                .await
                .map_err(|e| OmniError::Lance(e.to_string()))?;
        }
    }
    Ok(dataset.version().version)
}
/// Delete a sidecar by operation id, deriving its canonical URI so callers
/// need not thread a `RecoverySidecarHandle` through recovery.
async fn delete_sidecar_by_operation_id(
    root_uri: &str,
    storage: &dyn StorageAdapter,
    operation_id: &str,
) -> Result<()> {
    let uri = sidecar_uri(root_uri, operation_id);
    storage.delete(&uri).await
}
/// Construct a fresh sidecar at the current schema version with a new ULID
/// operation id; merge/registration/tombstone extras start empty and are
/// filled in by callers that need them.
pub(crate) fn new_sidecar(
    writer_kind: SidecarKind,
    branch: Option<String>,
    actor_id: Option<String>,
    tables: Vec<SidecarTablePin>,
) -> RecoverySidecar {
    use std::time::{SystemTime, UNIX_EPOCH};
    // Microseconds since the epoch as a decimal string; a pre-epoch clock
    // degrades to "0" rather than failing sidecar creation.
    let started_at = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_micros().to_string())
        .unwrap_or_else(|_| "0".to_string());
    RecoverySidecar {
        schema_version: SIDECAR_SCHEMA_VERSION,
        operation_id: ulid::Ulid::new().to_string(),
        started_at,
        branch,
        actor_id,
        writer_kind,
        tables,
        merge_source_commit_id: None,
        additional_registrations: Vec::new(),
        tombstones: Vec::new(),
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests: sidecar JSON round-trip and schema gating, the
    //! classification/decision tables, Lance restore semantics, and sidecar
    //! storage round-trips on local disk.
    use super::*;
    use crate::storage::LocalStorageAdapter;
    use crate::table_store::TableStore;
    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;
    // Fixture schema: id (required string), age (nullable int32).
    fn person_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Utf8, false),
            Field::new("age", DataType::Int32, true),
        ]))
    }
    // Build a RecordBatch from (id, age) tuples against `person_schema`.
    fn person_batch(rows: &[(&str, Option<i32>)]) -> RecordBatch {
        let ids: Vec<&str> = rows.iter().map(|(id, _)| *id).collect();
        let ages: Vec<Option<i32>> = rows.iter().map(|(_, age)| *age).collect();
        RecordBatch::try_new(
            person_schema(),
            vec![
                Arc::new(StringArray::from(ids)),
                Arc::new(Int32Array::from(ages)),
            ],
        )
        .unwrap()
    }
    // Shorthand for a branchless pin.
    fn make_pin(table_key: &str, table_path: &str, expected: u64, post: u64) -> SidecarTablePin {
        SidecarTablePin {
            table_key: table_key.to_string(),
            table_path: table_path.to_string(),
            expected_version: expected,
            post_commit_pin: post,
            table_branch: None,
        }
    }
    #[test]
    fn sidecar_round_trips_through_json() {
        let original = new_sidecar(
            SidecarKind::Mutation,
            Some("main".to_string()),
            Some("act-alice".to_string()),
            vec![make_pin("node:Person", "file:///tmp/people.lance", 5, 6)],
        );
        let json = serde_json::to_string(&original).unwrap();
        let parsed = parse_sidecar("file:///tmp/__recovery/x.json", &json).unwrap();
        assert_eq!(parsed.schema_version, SIDECAR_SCHEMA_VERSION);
        assert_eq!(parsed.operation_id, original.operation_id);
        assert_eq!(parsed.writer_kind, SidecarKind::Mutation);
        assert_eq!(parsed.branch.as_deref(), Some("main"));
        assert_eq!(parsed.actor_id.as_deref(), Some("act-alice"));
        assert_eq!(parsed.tables.len(), 1);
        assert_eq!(parsed.tables[0].table_key, "node:Person");
    }
    #[test]
    fn parse_sidecar_refuses_unknown_schema_version() {
        // Raw body kept byte-exact: only schema_version matters here.
        let body = r#"{
"schema_version": 99,
"operation_id": "01H000000000000000000000XX",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "Mutation",
"tables": []
}"#;
        let err = parse_sidecar("file:///tmp/__recovery/x.json", body).unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("schema_version=99") && msg.contains("supports only schema_version=1"),
            "expected SidecarSchemaError mentioning the version mismatch, got: {}",
            msg,
        );
    }
    // --- classify_table: strict (Mutation/Load) cases ---
    #[test]
    fn classify_no_movement_when_head_equals_pinned() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 5, 5, SidecarKind::Mutation),
            TableClassification::NoMovement,
        );
    }
    #[test]
    fn classify_rolled_past_expected_when_sidecar_matches_strict() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 6, 5, SidecarKind::Mutation),
            TableClassification::RolledPastExpected,
        );
    }
    #[test]
    fn classify_unexpected_at_p1_when_sidecar_does_not_match_strict() {
        let pin = make_pin("node:Person", "irrelevant", 5, 7);
        assert_eq!(
            classify_table(&pin, 6, 5, SidecarKind::Mutation),
            TableClassification::UnexpectedAtP1,
        );
    }
    #[test]
    fn classify_unexpected_multistep_when_head_jumped_more_than_one_strict() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 8, 5, SidecarKind::Mutation),
            TableClassification::UnexpectedMultistep,
        );
    }
    #[test]
    fn classify_invariant_violation_when_head_below_pinned() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 3, 5, SidecarKind::Mutation),
            TableClassification::InvariantViolation { observed: 3 },
        );
    }
    // --- classify_table: loose (SchemaApply/EnsureIndices/BranchMerge) cases ---
    #[test]
    fn classify_loose_match_accepts_multi_commit_drift_for_schema_apply() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 8, 5, SidecarKind::SchemaApply),
            TableClassification::RolledPastExpected,
        );
    }
    #[test]
    fn classify_loose_match_accepts_multi_commit_drift_for_ensure_indices() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 9, 5, SidecarKind::EnsureIndices),
            TableClassification::RolledPastExpected,
        );
    }
    #[test]
    fn classify_loose_match_no_movement_unchanged() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 5, 5, SidecarKind::SchemaApply),
            TableClassification::NoMovement,
        );
    }
    #[test]
    fn classify_loose_match_invariant_violation_unchanged() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 3, 5, SidecarKind::SchemaApply),
            TableClassification::InvariantViolation { observed: 3 },
        );
    }
    #[test]
    fn classify_loose_match_accepts_multi_commit_drift_for_branch_merge() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 8, 5, SidecarKind::BranchMerge),
            TableClassification::RolledPastExpected,
        );
    }
    #[test]
    fn classify_loose_match_branch_merge_no_movement_unchanged() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 5, 5, SidecarKind::BranchMerge),
            TableClassification::NoMovement,
        );
    }
    #[test]
    fn classify_loose_match_branch_merge_invariant_violation_unchanged() {
        let pin = make_pin("node:Person", "irrelevant", 5, 6);
        assert_eq!(
            classify_table(&pin, 3, 5, SidecarKind::BranchMerge),
            TableClassification::InvariantViolation { observed: 3 },
        );
    }
    // --- decide: precedence over mixed classifications ---
    #[test]
    fn decide_roll_forward_when_all_classifications_match() {
        let cls = vec![
            TableClassification::RolledPastExpected,
            TableClassification::RolledPastExpected,
        ];
        assert_eq!(decide(&cls), SidecarDecision::RollForward);
    }
    #[test]
    fn decide_roll_back_on_mid_phase_b_crash_mix() {
        let cls = vec![
            TableClassification::RolledPastExpected,
            TableClassification::NoMovement,
        ];
        assert_eq!(decide(&cls), SidecarDecision::RollBack);
    }
    #[test]
    fn decide_roll_back_on_unexpected_at_p1() {
        let cls = vec![
            TableClassification::RolledPastExpected,
            TableClassification::UnexpectedAtP1,
        ];
        assert_eq!(decide(&cls), SidecarDecision::RollBack);
    }
    #[test]
    fn decide_abort_on_invariant_violation() {
        let cls = vec![
            TableClassification::RolledPastExpected,
            TableClassification::InvariantViolation { observed: 1 },
        ];
        assert_eq!(decide(&cls), SidecarDecision::Abort);
    }
    #[test]
    fn decide_roll_forward_on_empty_slice() {
        assert_eq!(decide(&[]), SidecarDecision::RollForward);
    }
    // --- restore semantics: restore appends, never truncates ---
    #[tokio::test]
    async fn restore_table_to_version_appends_one_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
        let store = TableStore::new(dir.path().to_str().unwrap());
        let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
            .await
            .unwrap();
        store
            .append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
            .await
            .unwrap();
        store
            .append_batch(&uri, &mut ds, person_batch(&[("carol", Some(40))]))
            .await
            .unwrap();
        let head_before = ds.version().version;
        assert_eq!(head_before, 3);
        restore_table_to_version(&uri, None, 1).await.unwrap();
        let post = Dataset::open(&uri).await.unwrap();
        // Restore appends one commit; it never rewinds history.
        assert_eq!(post.version().version, head_before + 1);
        let scanner = post.scan();
        let batches: Vec<RecordBatch> =
            futures::TryStreamExt::try_collect(scanner.try_into_stream().await.unwrap())
                .await
                .unwrap();
        let total: usize = batches.iter().map(|b| b.num_rows()).sum();
        // Back to the single-row state of version 1.
        assert_eq!(total, 1);
    }
    #[tokio::test]
    async fn restore_table_to_version_always_appends_a_commit() {
        let dir = tempfile::tempdir().unwrap();
        let uri = format!("{}/people.lance", dir.path().to_str().unwrap());
        let store = TableStore::new(dir.path().to_str().unwrap());
        let mut ds = TableStore::write_dataset(&uri, person_batch(&[("alice", Some(30))]))
            .await
            .unwrap();
        store
            .append_batch(&uri, &mut ds, person_batch(&[("bob", Some(25))]))
            .await
            .unwrap();
        restore_table_to_version(&uri, None, 1).await.unwrap();
        let mid = Dataset::open(&uri).await.unwrap().version().version;
        assert_eq!(mid, 3);
        // Restoring to the same target again must still append.
        restore_table_to_version(&uri, None, 1).await.unwrap();
        let post = Dataset::open(&uri).await.unwrap().version().version;
        assert_eq!(
            post,
            mid + 1,
            "restore must always append a commit (no fragment-set short-circuit)"
        );
    }
    // --- sidecar storage round-trips on local disk ---
    #[tokio::test]
    async fn list_sidecars_returns_empty_when_dir_missing() {
        let dir = tempfile::tempdir().unwrap();
        let storage = LocalStorageAdapter::default();
        let result = list_sidecars(dir.path().to_str().unwrap(), &storage)
            .await
            .unwrap();
        assert!(result.is_empty());
    }
    #[tokio::test]
    async fn write_then_list_then_delete_round_trip() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap();
        let storage = LocalStorageAdapter::default();
        let root = dir.path().to_str().unwrap();
        let sidecar = new_sidecar(
            SidecarKind::Mutation,
            Some("main".to_string()),
            Some("act-alice".to_string()),
            vec![make_pin("node:Person", "file:///tmp/x.lance", 5, 6)],
        );
        let handle = write_sidecar(root, &storage, &sidecar).await.unwrap();
        assert_eq!(handle.operation_id, sidecar.operation_id);
        let listed = list_sidecars(root, &storage).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].operation_id, sidecar.operation_id);
        delete_sidecar(&handle, &storage).await.unwrap();
        let after = list_sidecars(root, &storage).await.unwrap();
        assert!(after.is_empty());
    }
    #[tokio::test]
    async fn list_sidecars_skips_non_json_files() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap();
        std::fs::write(
            dir.path().join(RECOVERY_DIR_NAME).join(".DS_Store"),
            "noise",
        )
        .unwrap();
        let storage = LocalStorageAdapter::default();
        let result = list_sidecars(dir.path().to_str().unwrap(), &storage)
            .await
            .unwrap();
        assert!(result.is_empty());
    }
    #[tokio::test]
    async fn list_sidecars_returns_deterministic_order() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::create_dir(dir.path().join(RECOVERY_DIR_NAME)).unwrap();
        let storage = LocalStorageAdapter::default();
        let root = dir.path().to_str().unwrap();
        // Written out of order; listing must come back sorted.
        let ids = [
            "01H000000000000000000000ZZ",
            "01H000000000000000000000MM",
            "01H000000000000000000000AA",
        ];
        for id in &ids {
            let sc = new_sidecar(
                SidecarKind::Mutation,
                None,
                None,
                vec![make_pin("node:Person", "/dev/null/x.lance", 1, 2)],
            );
            let mut sc = sc;
            sc.operation_id = id.to_string();
            write_sidecar(root, &storage, &sc).await.unwrap();
        }
        let listed = list_sidecars(root, &storage).await.unwrap();
        let listed_ids: Vec<&str> = listed.iter().map(|s| s.operation_id.as_str()).collect();
        let mut sorted_ids = listed_ids.clone();
        sorted_ids.sort();
        assert_eq!(
            listed_ids, sorted_ids,
            "list_sidecars must return sidecars in deterministic (sorted) order; got {:?}",
            listed_ids,
        );
    }
}