#![cfg(feature = "failpoints")]
mod helpers;
use fail::FailScenario;
use futures::FutureExt;
use omnigraph::db::Omnigraph;
use omnigraph::error::{ManifestErrorKind, OmniError};
use omnigraph::failpoints::ScopedFailPoint;
use omnigraph::loader::LoadMode;
use helpers::recovery::{
FollowUpMutation, RecoveryExpectation, TableExpectation, assert_post_recovery_invariants,
branch_head_commit_id, single_sidecar_operation_id,
};
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main, version_main};
/// Baseline schema for the schema-apply tests: one `Person` node type keyed by `name`.
const SCHEMA_V1: &str = "node Person { name: String @key }\n";
/// `SCHEMA_V1` plus one additional node type, `Company`, also keyed by `name`.
const SCHEMA_V2_ADDED_TYPE: &str = concat!(
    "node Person { name: String @key }\n",
    "node Company { name: String @key }\n",
);
/// Builds the URI of the node table for `type_name` beneath the graph `root`.
///
/// The table directory is named after the 64-bit FNV-1a hash of the type name's bytes,
/// rendered as 16 zero-padded lowercase hex digits under `<root>/nodes/`. Trailing `/`
/// characters on `root` are stripped before joining.
fn node_table_uri(root: &str, type_name: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x100_0000_01b3;
    // FNV-1a: XOR the byte in first, then multiply by the prime (wrapping on overflow).
    let digest = type_name
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME));
    let base = root.trim_end_matches('/');
    format!("{base}/nodes/{digest:016x}")
}
/// Smoke test for the `branch_create.after_manifest_branch_create` failpoint: with the
/// failpoint set to `return`, `branch_create` must fail and the error text must name the
/// injected failpoint.
#[tokio::test]
async fn branch_create_failpoint_triggers() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let db = Omnigraph::init(uri, helpers::TEST_SCHEMA).await.unwrap();
    // Bound to a named guard (not `_`) so it lives until the end of the test.
    let _failpoint = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return");
    let err = db.branch_create("feature").await.unwrap_err();
    // Include the actual error in the failure output, matching the other failpoint tests.
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: branch_create.after_manifest_branch_create"),
        "unexpected error: {err}"
    );
}
/// A `branch_delete` whose table-cleanup step is made to fail must still succeed, leave the
/// Person table's `feature` fork behind, and have a later `cleanup` remove that leftover
/// fork. Afterwards the same branch name must be creatable and writable again.
#[tokio::test]
async fn branch_delete_partial_failure_converges_via_cleanup() {
    /// Opens the Lance dataset at `table_uri` and reports whether it lists a branch `branch`.
    async fn table_has_branch(table_uri: &str, branch: &str) -> bool {
        let dataset = lance::Dataset::open(table_uri).await.unwrap();
        let branches = dataset.list_branches().await.unwrap();
        branches.contains_key(branch)
    }

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();
    let mut main = helpers::init_and_load(&dir).await;

    // Create `feature` and write one row to it through a separate handle, then close it.
    main.branch_create("feature").await.unwrap();
    let mut first_writer = Omnigraph::open(&graph_uri).await.unwrap();
    helpers::mutate_branch(
        &mut first_writer,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .unwrap();
    drop(first_writer);

    let person_table = node_table_uri(&graph_uri, "Person");
    assert!(
        table_has_branch(&person_table, "feature").await,
        "precondition: the owned table fork exists before delete"
    );

    // Delete the branch while the table-cleanup step is forced to fail.
    {
        let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return");
        main.branch_delete("feature").await.expect(
            "branch_delete is best-effort after the manifest flip: a cleanup-step \
             failure must not fail the call",
        );
    }

    // The branch is gone from the graph's branch list, but the table fork is still there.
    let listed = main.branch_list().await.unwrap();
    assert_eq!(listed, vec!["main".to_string()]);
    assert!(
        table_has_branch(&person_table, "feature").await,
        "failed eager reclaim should leave the orphan for cleanup to reconcile"
    );

    // A cleanup pass (failpoint no longer armed) removes the leftover fork.
    let policy = omnigraph::db::CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    };
    main.cleanup(policy).await.unwrap();
    assert!(
        !table_has_branch(&person_table, "feature").await,
        "cleanup should reconcile the orphaned fork away"
    );

    // The name is reusable: recreate the branch and write to it again.
    main.branch_create("feature").await.unwrap();
    let mut second_writer = Omnigraph::open(&graph_uri).await.unwrap();
    helpers::mutate_branch(
        &mut second_writer,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 41)]),
    )
    .await
    .unwrap();
}
/// After a `branch_delete` whose table cleanup was forced to fail, recreating the same
/// branch and writing to it must succeed without running `cleanup` first, and the recreated
/// branch must contain main's rows plus only the new row (the row written to the deleted
/// branch must not reappear).
#[tokio::test]
async fn recreate_over_orphaned_fork_self_heals_without_cleanup() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();
    let mut main = helpers::init_and_load(&dir).await;

    // First incarnation of `feature`: insert Eve through a separate handle, then close it.
    main.branch_create("feature").await.unwrap();
    let mut old_feature = Omnigraph::open(&graph_uri).await.unwrap();
    helpers::mutate_branch(
        &mut old_feature,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .unwrap();
    drop(old_feature);

    // Delete the branch with the table-cleanup step forced to fail.
    {
        let _fp = ScopedFailPoint::new("branch_delete.before_table_cleanup", "return");
        main.branch_delete("feature").await.unwrap();
    }

    // Second incarnation: recreate straight away (no cleanup in between) and insert Frank.
    main.branch_create("feature").await.unwrap();
    let mut feature2 = Omnigraph::open(&graph_uri).await.unwrap();
    let frank_insert = helpers::mutate_branch(
        &mut feature2,
        "feature",
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 41)]),
    )
    .await;
    frank_insert.expect("recreate-over-orphan write must self-heal, not require cleanup");

    // The new branch must hold exactly one row more than main.
    let main_people = helpers::count_rows(&main, "node:Person").await;
    let feature_people = helpers::count_rows_branch(&feature2, "feature", "node:Person").await;
    assert_eq!(
        feature_people,
        main_people + 1,
        "self-healed feature must fork fresh from main (+Frank only); \
         main={main_people}, feature={feature_people} (main+2 ⇒ Eve resurrected)"
    );
}
/// With the `classify.fresh_read` failpoint armed, a load into `feature` that meets a
/// pre-existing Person-table fork must fail with a manifest `Conflict` that has no
/// `details`, must describe the unavailable authority read in its message, and must leave
/// the fork in place. Once the failpoint is released, the same load must succeed.
#[tokio::test]
async fn recreate_over_orphaned_fork_reports_indeterminate_authority_read() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let db = helpers::init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
let person_uri = node_table_uri(&uri, "Person");
// Create a `feature` branch directly on the Person table at its current version,
// bypassing the database API.
{
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();
}
let row = r#"{"type":"Person","data":{"name":"Grace","age":37}}"#;
// Everything in this scope runs with the failpoint armed.
{
let _fp = ScopedFailPoint::new("classify.fresh_read", "return");
let err = db
.load_as("feature", None, row, LoadMode::Merge, None)
.await
.expect_err("indeterminate authority read must fail retryably");
// The error must be a manifest conflict without version-mismatch details.
match &err {
OmniError::Manifest(manifest) => {
assert_eq!(manifest.kind, ManifestErrorKind::Conflict);
assert!(
manifest.details.is_none(),
"indeterminate authority read is not an expected-version mismatch: {manifest:?}"
);
}
other => panic!("expected manifest conflict, got {other:?}"),
}
let message = err.to_string();
assert!(
message.contains("could not verify")
&& message.contains("fresh manifest authority was unavailable")
&& message.contains("refresh and retry"),
"error should name the unavailable authority read, got: {message}"
);
assert!(
!message.contains("expected manifest table version"),
"indeterminate authority must not be reported as a version mismatch: {message}"
);
// The failed load must not have removed the table fork.
let ds = lance::Dataset::open(&person_uri).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"ambiguous orphan status must leave the fork for a later retry"
);
}
// Retry with the failpoint out of scope: the load is expected to go through.
db.load_as("feature", None, row, LoadMode::Merge, None)
.await
.expect("when fresh authority is available, the orphan is reclaimed and write converges");
}
/// A GC failure injected once (`cleanup.table_gc` = `1*return`) must be reported in exactly
/// one stats entry instead of aborting `cleanup`, and must not prevent the stray `ghost`
/// branch on the Person table from being removed.
#[tokio::test]
async fn cleanup_isolates_single_table_failure() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();
    let mut db = helpers::init_and_load(&dir).await;
    let person_table = node_table_uri(&graph_uri, "Person");

    // Plant a `ghost` branch directly on the Person table; no `branch_create` backs it.
    {
        let mut table = lance::Dataset::open(&person_table).await.unwrap();
        let head = table.version().version;
        table.create_branch("ghost", head, None).await.unwrap();
    }

    // The guard stays alive until the end of the test.
    let _fp = ScopedFailPoint::new("cleanup.table_gc", "1*return");
    let policy = omnigraph::db::CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    };
    let stats = db
        .cleanup(policy)
        .await
        .expect("a single table's GC failure must not abort cleanup");

    let errored = stats.iter().filter(|entry| entry.error.is_some()).count();
    assert_eq!(
        errored, 1,
        "exactly one table's GC failure should be surfaced in stats, got {errored}"
    );
    assert!(
        stats.len() >= 4,
        "every node+edge table should still appear in the stats"
    );

    let table = lance::Dataset::open(&person_table).await.unwrap();
    let ghost_present = table.list_branches().await.unwrap().contains_key("ghost");
    assert!(
        !ghost_present,
        "reconcile should reclaim the orphan despite the GC failure"
    );
}
/// A reconcile failure injected once (`cleanup.reconcile_fork` = `1*return`) must not abort
/// `cleanup`; the stray `ghost` branch survives that pass and is removed by the next
/// `cleanup` that runs without the failpoint.
#[tokio::test]
async fn cleanup_isolates_reconcile_failure() {
    /// Opens the Lance dataset at `table_uri` and reports whether it lists a `ghost` branch.
    async fn ghost_exists(table_uri: &str) -> bool {
        let dataset = lance::Dataset::open(table_uri).await.unwrap();
        dataset.list_branches().await.unwrap().contains_key("ghost")
    }

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();
    let mut db = helpers::init_and_load(&dir).await;
    let person_table = node_table_uri(&graph_uri, "Person");

    // Plant a `ghost` branch directly on the Person table at its current version.
    {
        let mut table = lance::Dataset::open(&person_table).await.unwrap();
        let head = table.version().version;
        table.create_branch("ghost", head, None).await.unwrap();
    }

    // First pass: reconcile is forced to fail once, yet cleanup as a whole must succeed.
    {
        let _fp = ScopedFailPoint::new("cleanup.reconcile_fork", "1*return");
        let first_pass = db
            .cleanup(omnigraph::db::CleanupPolicyOptions {
                keep_versions: Some(1),
                older_than: None,
            })
            .await;
        first_pass.expect("a reconcile force-delete failure must not abort cleanup");
    }
    assert!(
        ghost_exists(&person_table).await,
        "the orphan whose reclaim was injected-to-fail should remain"
    );

    // Second pass: no failpoint, so the leftover branch is removed.
    db.cleanup(omnigraph::db::CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    })
    .await
    .unwrap();
    assert!(
        !ghost_exists(&person_table).await,
        "the second cleanup should reconcile the orphan"
    );
}
/// When `branch_delete` runs with the `branch_delete.before_commit_graph_reclaim` failpoint
/// armed, the `feature` branch remains on the `_graph_commits.lance` dataset; a subsequent
/// `cleanup` must remove it.
#[tokio::test]
async fn cleanup_reclaims_orphaned_commit_graph_branch() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();
    let mut db = helpers::init_and_load(&dir).await;

    // Create the branch, then delete it with the commit-graph reclaim step forced to fail.
    db.branch_create("feature").await.unwrap();
    {
        let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return");
        db.branch_delete("feature").await.unwrap();
    }

    let commits_uri = format!("{}/_graph_commits.lance", graph_uri.trim_end_matches('/'));
    let before = lance::Dataset::open(&commits_uri).await.unwrap();
    let orphaned = before.list_branches().await.unwrap().contains_key("feature");
    assert!(
        orphaned,
        "precondition: the commit-graph branch should be orphaned after the failed reclaim"
    );
    drop(before);

    let policy = omnigraph::db::CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    };
    db.cleanup(policy).await.unwrap();

    let after = lance::Dataset::open(&commits_uri).await.unwrap();
    let still_there = after.list_branches().await.unwrap().contains_key("feature");
    assert!(
        !still_there,
        "cleanup should reclaim the orphaned commit-graph branch"
    );
}
/// While the `classify.fresh_read` failpoint is armed, `cleanup` must succeed but must not
/// delete the forged `feature` fork on the Person table; a later `cleanup` without the
/// failpoint must remove it.
#[tokio::test]
async fn reconcile_skips_fork_when_fresh_recheck_is_unavailable_then_converges() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = helpers::init_and_load(&dir).await;
db.branch_create("feature").await.unwrap();
let person_uri = node_table_uri(&uri, "Person");
// Forge a `feature` branch directly on the Person table at its current version and
// confirm it is listed.
{
let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("feature", base, None).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"precondition: forged orphan fork present"
);
}
// First cleanup runs with the failpoint armed: the fork must survive.
{
let _fp = ScopedFailPoint::new("classify.fresh_read", "return");
db.cleanup(omnigraph::db::CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
let ds = lance::Dataset::open(&person_uri).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("feature"),
"reconcile must NOT delete a fork whose fresh re-check is inconclusive"
);
}
// Second cleanup runs after the failpoint guard has been dropped: the fork must go.
db.cleanup(omnigraph::db::CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
{
let ds = lance::Dataset::open(&person_uri).await.unwrap();
assert!(
!ds.list_branches().await.unwrap().contains_key("feature"),
"next cleanup (fresh read available) must reclaim the confirmed orphan"
);
}
}
/// After a `branch_delete` that ran with the `branch_delete.before_commit_graph_reclaim`
/// failpoint armed, `branch_create` for the same name must still succeed and the branch
/// must show up in `branch_list` again.
#[tokio::test]
async fn branch_create_recreates_over_commit_graph_zombie() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap();
    let db = Omnigraph::init(graph_uri, helpers::TEST_SCHEMA)
        .await
        .unwrap();

    // Create, then delete with the commit-graph reclaim step forced to fail.
    db.branch_create("feature").await.unwrap();
    {
        let _fp = ScopedFailPoint::new("branch_delete.before_commit_graph_reclaim", "return");
        db.branch_delete("feature").await.unwrap();
    }
    let after_delete = db.branch_list().await.unwrap();
    assert_eq!(after_delete, vec!["main".to_string()]);

    // Recreating the branch must work despite whatever the failed reclaim left behind.
    db.branch_create("feature")
        .await
        .expect("branch_create should heal the zombie commit-graph branch and succeed");
    let after_recreate = db.branch_list().await.unwrap();
    assert!(after_recreate.contains(&"feature".to_string()));
}
/// When `branch_create` fails at the `branch_create.after_manifest_branch_create`
/// failpoint, the branch must not remain visible in `branch_list`.
#[tokio::test]
async fn branch_create_rolls_back_manifest_on_commit_graph_failure() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();
    // Arm the failpoint only for the duration of the failing call.
    let err = {
        let _fp = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return");
        db.branch_create("feature").await.unwrap_err()
    };
    // Make sure the call failed at the injected point; otherwise the rollback check below
    // would pass trivially for a failure that never reached the manifest.
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: branch_create.after_manifest_branch_create"),
        "branch_create must fail because of the injected failpoint, got error: {err}"
    );
    assert!(
        !db.branch_list()
            .await
            .unwrap()
            .contains(&"feature".to_string()),
        "branch_create must roll back the manifest branch when the derived \
         commit-graph branch fails, got error: {err}"
    );
}
// Handshake flags for `fork_collision_with_live_concurrent_fork_is_retryable`.
// Flipped to `true` by the first caller of the `fork.before_classify` callback.
static FORK_A_AT_POINT: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
// Set to `true` by the test body to let that parked first caller continue.
static FORK_RELEASE_A: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
/// Two writers race to write to `feature`. Writer A is parked inside the
/// `fork.before_classify` failpoint callback while writer B completes its write; once A is
/// released it must fail with an error that reads as a retryable stale-view conflict and
/// does not mention `cleanup`.
#[tokio::test(flavor = "multi_thread")]
async fn fork_collision_with_live_concurrent_fork_is_retryable() {
use std::sync::atomic::Ordering::SeqCst;
let _scenario = FailScenario::setup();
// The flags are process-wide statics, so reset them before use.
FORK_A_AT_POINT.store(false, SeqCst);
FORK_RELEASE_A.store(false, SeqCst);
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let main = helpers::init_and_load(&dir).await;
main.branch_create("feature").await.unwrap();
// Only the first caller to reach the failpoint wins the compare-exchange and is parked;
// it polls the release flag every 5 ms for at most 2000 iterations. Later callers pass
// straight through.
fail::cfg_callback("fork.before_classify", || {
if FORK_A_AT_POINT
.compare_exchange(false, true, SeqCst, SeqCst)
.is_ok()
{
for _ in 0..2000 {
if FORK_RELEASE_A.load(SeqCst) {
break;
}
std::thread::sleep(std::time::Duration::from_millis(5));
}
}
})
.unwrap();
// Writer A runs on a spawned task and returns the mutation result to the test.
let uri_a = uri.clone();
let writer_a = tokio::spawn(async move {
let mut a = Omnigraph::open(&uri_a).await.unwrap();
helpers::mutate_branch(
&mut a,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
});
// Wait (up to 600 polls of 5 ms) for writer A to reach the failpoint.
for _ in 0..600 {
if FORK_A_AT_POINT.load(SeqCst) {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(5)).await;
}
assert!(
FORK_A_AT_POINT.load(SeqCst),
"writer A never reached the fork point"
);
// Writer B performs its write to `feature` while A is still parked.
let mut b = Omnigraph::open(&uri).await.unwrap();
helpers::mutate_branch(
&mut b,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Frank")], &[("$age", 41)]),
)
.await
.unwrap();
// Release A and collect its (expected) failure.
FORK_RELEASE_A.store(true, SeqCst);
let err = writer_a
.await
.unwrap()
.expect_err("A's stale-snapshot fork should be a retryable conflict");
fail::remove("fork.before_classify");
let msg = err.to_string();
assert!(
!msg.contains("cleanup"),
"a live concurrent fork must not be misclassified as an orphan, got: {msg}"
);
assert!(
msg.contains("refresh and retry") || msg.contains("expected manifest table version"),
"expected a retryable stale-view error, got: {msg}"
);
}
/// Smoke test for the `graph_publish.before_commit_append` failpoint: with the failpoint
/// set to `return`, a mutation on main must fail and the error text must name the injected
/// failpoint.
#[tokio::test(flavor = "multi_thread")]
async fn graph_publish_failpoint_triggers_before_commit_append() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();
    // Bound to a named guard (not `_`) so it lives until the end of the test.
    let _failpoint = ScopedFailPoint::new("graph_publish.before_commit_append", "return");
    let err = mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .unwrap_err();
    // Include the actual error in the failure output, matching the other failpoint tests.
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: graph_publish.before_commit_append"),
        "unexpected error: {err}"
    );
}
/// An `apply_schema` that fails at `schema_apply.after_staging_write` must be completed
/// when the graph is reopened: the live schema is the new one, no staging files remain, and
/// the newly added `Company` table is queryable (with zero rows).
#[tokio::test]
async fn schema_apply_pre_commit_crash_rolls_forward_via_sidecar() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();

    // First handle: the apply fails at the injected point, then the handle is dropped.
    {
        let crashed = Omnigraph::init(&graph_uri, SCHEMA_V1).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
        let err = crashed.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: schema_apply.after_staging_write"),
            "got: {}",
            err
        );
    }

    // Second handle: opening the graph again must leave it on the new schema.
    let reopened = Omnigraph::open(&graph_uri).await.unwrap();
    let live_schema = reopened.schema_source();
    assert_eq!(
        live_schema.as_str(),
        SCHEMA_V2_ADDED_TYPE,
        "live schema must reflect the rolled-forward apply (Company added)"
    );
    assert_no_staging_files(dir.path());
    assert_eq!(
        helpers::count_rows(&reopened, "node:Company").await,
        0,
        "node:Company must have a manifest entry post-recovery"
    );
}
/// An `apply_schema` that fails at `schema_apply.after_manifest_commit` must still leave
/// the graph on the new schema, with no staging files, once it is reopened.
#[tokio::test]
async fn schema_apply_recovers_post_commit_crash() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();

    // First handle: the apply fails at the injected point, then the handle is dropped.
    {
        let crashed = Omnigraph::init(&graph_uri, SCHEMA_V1).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
        let err = crashed.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: schema_apply.after_manifest_commit"),
            "got: {}",
            err
        );
    }

    // Second handle: the reopened graph must expose the new schema.
    let reopened = Omnigraph::open(&graph_uri).await.unwrap();
    let live_schema = reopened.schema_source();
    assert_eq!(live_schema.as_str(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(dir.path());
}
/// Leftover `.staging` copies of `_schema.ir.json` and `__schema_state.json` must not
/// survive a reopen: the graph still reports the applied schema and the staging files are
/// gone.
#[tokio::test]
async fn schema_apply_recovers_partial_rename() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();

    // Apply the new schema successfully, then close the handle.
    {
        let db = Omnigraph::init(&graph_uri, SCHEMA_V1).await.unwrap();
        db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap();
    }

    // Recreate staging files by copying each live file to its `.staging` sibling,
    // in the same order as listed here.
    let root = dir.path();
    for (live, staged) in [
        ("_schema.ir.json", "_schema.ir.json.staging"),
        ("__schema_state.json", "__schema_state.json.staging"),
    ] {
        std::fs::copy(root.join(live), root.join(staged)).unwrap();
    }

    let reopened = Omnigraph::open(&graph_uri).await.unwrap();
    let live_schema = reopened.schema_source();
    assert_eq!(live_schema.as_str(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(root);
}
/// A mutation on main that fails at `mutation.post_finalize_pre_publisher` must leave
/// exactly one entry in `__recovery`; reopening the graph must make the inserted row
/// visible, the shared post-recovery invariants must hold, and a follow-up insert must
/// land normally.
#[tokio::test]
async fn recovery_rolls_forward_after_finalize_publisher_failure() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Assigned inside the failing scope, used after the handle has been dropped.
let operation_id;
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
// Count the directory entries under `__recovery` (unreadable entries are skipped).
let recovery_dir = dir.path().join("__recovery");
let sidecars: Vec<_> = std::fs::read_dir(&recovery_dir)
.unwrap()
.filter_map(|e| e.ok())
.collect();
assert_eq!(
sidecars.len(),
1,
"exactly one sidecar should persist after the finalize failure"
);
operation_id = single_sidecar_operation_id(dir.path());
}
// Reopen with a fresh handle: the failed insert must now be visible.
let db = Omnigraph::open(&uri).await.unwrap();
let person_count = helpers::count_rows(&db, "node:Person").await;
assert_eq!(
person_count, 1,
"exactly one person (Eve) must be visible after roll-forward"
);
// The handle is dropped before the invariant helper runs against the directory.
drop(db);
// The helper is also given a follow-up insert (Frank) for `node:Person` on main.
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![TableExpectation::main("node:Person").follow_up_mutation(
FollowUpMutation::new(
"main",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
),
)],
},
)
.await
.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
let person_count = helpers::count_rows(&db, "node:Person").await;
assert_eq!(
person_count, 2,
"Frank's insert must land normally after recovery"
);
}
/// A `remove_person` mutation is paused at `mutation.delete_node_pre_primary_delete`
/// while a concurrent `set_age` update lands on main. Once released, the delete must fail
/// with a stale-view / version-mismatch error, the Person table's Lance version must have
/// advanced past the version pinned before the delete, and after reopening the graph the
/// Person and Knows row counts must be 4 and 3.
#[tokio::test]
async fn inline_delete_conflict_writes_sidecar_before_rejecting() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let db = helpers::init_and_load(&dir).await;
// Record the Person table version that main's snapshot pins before anything runs.
let pre_snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap();
let pre_person_pin = pre_snapshot.entry("node:Person").unwrap().table_version;
let person_uri = node_table_uri(&uri, "Person");
{
let _pause_delete =
ScopedFailPoint::new("mutation.delete_node_pre_primary_delete", "pause");
let delete_params = helpers::params(&[("$name", "Alice")]);
// The delete future is created here but only makes progress when polled below.
let delete = db.mutate("main", MUTATION_QUERIES, "remove_person", &delete_params);
tokio::pin!(delete);
let mut concurrent_update_succeeded = false;
// Up to 50 rounds: poll the delete once (it must not complete), then try the
// concurrent update on a separate handle; stop at the first success.
for _ in 0..50 {
if delete.as_mut().now_or_never().is_some() {
panic!("delete mutation completed before primary-delete failpoint was released");
}
// NOTE(review): this handle is opened with `open_read_only` yet used for a mutation —
// presumably that constructor differs from `open` in what it does at open time rather
// than in forbidding writes; confirm against `Omnigraph::open_read_only`.
let mut concurrent = Omnigraph::open_read_only(&uri).await.unwrap();
if mutate_main(
&mut concurrent,
MUTATION_QUERIES,
"set_age",
&mixed_params(&[("$name", "Bob")], &[("$age", 26)]),
)
.await
.is_ok()
{
concurrent_update_succeeded = true;
break;
}
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
}
assert!(
concurrent_update_succeeded,
"concurrent update must land while delete is paused"
);
// Release the paused failpoint and drive the delete to completion.
fail::remove("mutation.delete_node_pre_primary_delete");
let err = delete.await.unwrap_err();
assert!(
err.to_string().contains("stale view of 'node:Person'")
|| err.to_string().contains("ExpectedVersionMismatch")
|| err.to_string().contains("expected version mismatch"),
"unexpected error: {err}",
);
}
// Read the Person table's current version straight from Lance.
let person_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
assert!(
person_head > pre_person_pin,
"primary inline delete must have advanced node:Person before rejecting"
);
// Reopen and check the row counts seen through a fresh handle.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
4,
"manifest-conflicted delete must not remove net Person rows after recovery"
);
assert_eq!(
helpers::count_rows(&db, "edge:Knows").await,
3,
"manifest-conflicted delete must not remove net Knows rows after recovery"
);
}
/// An append load into `feature` that fails at `mutation.post_finalize_pre_publisher` must
/// become visible on `feature` (and only there) after the graph is reopened; the shared
/// post-recovery invariants must hold and a follow-up insert on `feature` must succeed
/// without changing main's row count.
#[tokio::test]
async fn recovery_rolls_forward_load_on_feature_branch() {
use omnigraph::loader::LoadMode;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Captured inside the failing scope and checked after recovery.
let operation_id;
let main_person_pin;
let feature_parent_commit_id;
{
let db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
db.branch_create("feature").await.unwrap();
// One successful insert on `feature` before the failing load.
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "BeforeLoad")], &[("$age", 40)]),
)
.await
.unwrap();
// Person table version pinned by main's snapshot at this point.
main_person_pin = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.expect("main must have Person")
.table_version;
// Head commit of `feature` before the failing load.
feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = db
.load(
"feature",
r#"{"type":"Person","data":{"name":"FeatureLoad","age":41}}
"#,
LoadMode::Append,
)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
operation_id = single_sidecar_operation_id(dir.path());
}
// Reopen: `feature` has BeforeLoad + FeatureLoad, main has no Person rows.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
2,
"feature branch load row must be visible after recovery"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
0,
"feature branch load recovery must not publish the row to main"
);
drop(db);
// The helper receives the captured main pin, the pre-load parent commit, and a
// follow-up insert (AfterLoad) targeting `feature`.
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::branch("node:Person", "feature")
.expected_main_manifest_pin(main_person_pin)
.expected_recovery_parent_commit_id(feature_parent_commit_id)
.follow_up_mutation(FollowUpMutation::new(
"feature",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "AfterLoad")], &[("$age", 42)]),
)),
],
},
)
.await
.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
3,
"follow-up feature mutation must succeed after load recovery"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
0,
"follow-up feature mutation must not move main"
);
}
/// An overwrite load on main that fails at `mutation.post_finalize_pre_publisher` must be
/// visible after the graph is reopened (exactly the one overwritten row); the shared
/// post-recovery invariants must hold and a follow-up insert must succeed.
#[tokio::test]
async fn recovery_rolls_forward_load_overwrite() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Captured inside the failing scope and checked after recovery.
let operation_id;
let parent_commit_id;
{
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
// Seed main with the shared test data before the failing overwrite.
load_jsonl(&mut db, helpers::TEST_DATA, LoadMode::Overwrite)
.await
.unwrap();
// Head commit of main before the failing load.
parent_commit_id = branch_head_commit_id(dir.path(), "main").await.unwrap();
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = db
.load(
"main",
r#"{"type":"Person","data":{"name":"OverwriteLoad","age":41}}
"#,
LoadMode::Overwrite,
)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
operation_id = single_sidecar_operation_id(dir.path());
}
// Reopen: only the single overwrite row is expected in Person.
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"overwrite row must be visible after recovery rolls the load forward"
);
drop(db);
// The helper receives the pre-load parent commit and a follow-up insert on main.
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::main("node:Person")
.expected_recovery_parent_commit_id(parent_commit_id)
.follow_up_mutation(FollowUpMutation::new(
"main",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "AfterOverwriteLoad")], &[("$age", 42)]),
)),
],
},
)
.await
.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
2,
"follow-up mutation must succeed after overwrite load recovery"
);
}
/// `ensure_indices_on("feature")` is made to fail at
/// `ensure_indices.post_phase_b_pre_manifest_commit` after the `id_idx` index has been
/// dropped from the feature head of the Person table. After reopening, `feature` must keep
/// its two rows, main must keep its one row, the shared post-recovery invariants must hold,
/// and a follow-up insert on `feature` must succeed without changing main.
#[tokio::test]
async fn recovery_rolls_forward_ensure_indices_on_feature_branch() {
use lance::index::DatasetIndexExt;
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::TableStore;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// Captured during setup and checked after recovery.
let operation_id;
let feature_parent_commit_id;
let main_person_pin;
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
// One row on main, loaded before the branch is created.
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
// One additional row written on `feature`.
db.mutate(
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "BeforeEnsure")], &[("$age", 42)]),
)
.await
.unwrap();
// Person table version pinned by main's snapshot at this point.
main_person_pin = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.expect("main must have Person")
.table_version;
// Open the feature head of the Person table directly and drop its `id_idx` index,
// remembering the dataset version that results.
let person_uri = node_table_uri(&uri, "Person");
let store = TableStore::new(&uri);
let mut ds = store
.open_dataset_head(&person_uri, Some("feature"))
.await
.unwrap();
ds.drop_index("id_idx").await.unwrap();
let dropped_index_head = ds.version().version;
// Test-only hook; the assertion below checks that afterwards the feature snapshot pins
// the dropped-index version.
db.failpoint_publish_table_head_without_index_rebuild_for_test(
"feature",
"node:Person",
Some("feature"),
)
.await
.unwrap();
let feature_snapshot = db
.snapshot_of(omnigraph::db::ReadTarget::branch("feature"))
.await
.unwrap();
assert_eq!(
feature_snapshot
.entry("node:Person")
.expect("feature must have Person")
.table_version,
dropped_index_head,
"test setup must publish the dropped-index table head before ensure_indices runs",
);
// Head commit of `feature` before the failing `ensure_indices_on` call.
feature_parent_commit_id = branch_head_commit_id(dir.path(), "feature").await.unwrap();
{
let _failpoint =
ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");
let err = db.ensure_indices_on("feature").await.unwrap_err();
assert!(
err.to_string().contains(
"injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit"
),
"unexpected error: {err}"
);
operation_id = single_sidecar_operation_id(dir.path());
}
// Drop the original handle and reopen before checking row counts.
drop(db);
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
2,
"feature should see inherited alice plus recovered branch-local row"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"ensure_indices branch recovery must not move main"
);
drop(db);
// The helper receives the captured main pin, the pre-failure parent commit, and a
// follow-up insert (AfterEnsure) targeting `feature`.
assert_post_recovery_invariants(
dir.path(),
&operation_id,
RecoveryExpectation::RolledForward {
tables: vec![
TableExpectation::branch("node:Person", "feature")
.expected_main_manifest_pin(main_person_pin)
.expected_recovery_parent_commit_id(feature_parent_commit_id)
.follow_up_mutation(FollowUpMutation::new(
"feature",
MUTATION_QUERIES,
"insert_person",
mixed_params(&[("$name", "AfterEnsure")], &[("$age", 44)]),
)),
],
},
)
.await
.unwrap();
let db = Omnigraph::open(&uri).await.unwrap();
assert_eq!(
helpers::count_rows_branch(&db, "feature", "node:Person").await,
3,
"follow-up feature mutation must succeed after ensure_indices recovery"
);
assert_eq!(
helpers::count_rows(&db, "node:Person").await,
1,
"follow-up feature mutation must not move main"
);
}
/// After a mutation fails at `mutation.post_finalize_pre_publisher`, calling `refresh` on
/// the same handle must clear the `__recovery` directory, make the failed insert visible,
/// and leave the handle able to accept another insert.
#[tokio::test]
async fn refresh_runs_roll_forward_recovery_in_process() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap().to_string();
    let recovery_dir = dir.path().join("__recovery");
    let mut db = Omnigraph::init(&graph_uri, helpers::TEST_SCHEMA).await.unwrap();

    // Failing insert: exactly one entry must be left under `__recovery`.
    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let outcome = mutate_main(
            &mut db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await;
        let err = outcome.unwrap_err();
        assert!(
            err.to_string()
                .contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        let sidecar_count = std::fs::read_dir(&recovery_dir).unwrap().count();
        assert_eq!(
            sidecar_count, 1,
            "exactly one sidecar must persist after the finalize failure"
        );
    }

    // Refresh on the same handle; afterwards the directory is either gone or empty.
    db.refresh().await.expect("refresh must succeed");
    if recovery_dir.exists() {
        let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
            .unwrap()
            .flatten()
            .collect();
        assert!(
            remaining.is_empty(),
            "sidecar must be deleted by refresh-time roll-forward; remaining: {:?}",
            remaining,
        );
    }
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        1,
        "Eve must be visible after refresh-time roll-forward"
    );

    // A second insert on the same handle must go through.
    mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
    )
    .await
    .expect("Person insert must succeed after refresh-time recovery");
    let total_people = helpers::count_rows(&db, "node:Person").await;
    assert_eq!(total_people, 2);
}
/// After a merge load fails at `mutation.post_finalize_pre_publisher`, a second merge load
/// on the same handle (no reopen, no explicit refresh) must succeed; afterwards both loads'
/// rows are counted and the `__recovery` directory is empty or absent.
#[tokio::test]
async fn load_after_finalize_publisher_failure_heals_without_reopen() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
// First load (2 Person, 1 Company, 1 WorksAt edge) fails at the injected point and must
// leave exactly one entry under `__recovery`.
{
let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
let err = load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Person","data":{"name":"Bob","age":25}}
{"type":"Company","data":{"name":"Acme"}}
{"edge":"WorksAt","from":"Alice","to":"Acme"}
"#,
LoadMode::Merge,
)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
"unexpected error: {err}"
);
let recovery_dir = dir.path().join("__recovery");
assert_eq!(
std::fs::read_dir(&recovery_dir).unwrap().count(),
1,
"exactly one sidecar must persist after the finalize failure"
);
}
// Second load (1 Person, 1 Company) on the same handle, failpoint no longer armed.
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"Carol","age":41}}
{"type":"Company","data":{"name":"Globex"}}
"#,
LoadMode::Merge,
)
.await
.expect(
"a follow-up load on the same handle must heal sidecar-covered \
drift in-process instead of demanding repair/restart",
);
// Totals include the rows from the first (failed) load as well as the second.
assert_eq!(helpers::count_rows(&db, "node:Person").await, 3);
assert_eq!(helpers::count_rows(&db, "node:Company").await, 2);
assert_eq!(helpers::count_rows(&db, "edge:WorksAt").await, 1);
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
assert_eq!(
std::fs::read_dir(&recovery_dir).unwrap().count(),
0,
"sidecar must be consumed by the in-process roll-forward"
);
}
}
/// A load that fails at `recovery.sidecar_write` must leave no entry under `__recovery`,
/// must not change the Person table's Lance version, must leave main's pinned Person
/// version equal to that Lance version, and must not block an identical retry.
#[tokio::test]
async fn sidecar_write_failure_aborts_load_with_no_head_advance() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
let person_uri = node_table_uri(&uri, "Person");
// Person table version read straight from Lance before the failing load.
let pre_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
{
let _failpoint = ScopedFailPoint::new("recovery.sidecar_write", "return");
let err = load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Company","data":{"name":"Acme"}}
"#,
LoadMode::Merge,
)
.await
.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: recovery.sidecar_write"),
"unexpected error: {err}"
);
}
// The recovery directory may not exist at all; if it does, it must be empty.
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
assert_eq!(
std::fs::read_dir(&recovery_dir).unwrap().count(),
0,
"a Phase A put failure must not leave a sidecar"
);
}
// Same read after the failure: the version must be unchanged.
let post_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
assert_eq!(
pre_head, post_head,
"a Phase A put failure must abort before any Lance HEAD advance"
);
// The version pinned by main's snapshot must match what Lance reports.
let manifest_pin = db
.snapshot_of(omnigraph::db::ReadTarget::branch("main"))
.await
.unwrap()
.entry("node:Person")
.unwrap()
.table_version;
assert_eq!(manifest_pin, post_head, "no drift after a Phase A abort");
assert_eq!(helpers::count_rows(&db, "node:Person").await, 0);
// Retry the same payload on the same handle with the failpoint out of scope.
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Company","data":{"name":"Acme"}}
"#,
LoadMode::Merge,
)
.await
.expect("a transient sidecar put failure must not wedge later writes");
assert_eq!(helpers::count_rows(&db, "node:Person").await, 1);
assert_eq!(helpers::count_rows(&db, "node:Company").await, 1);
}
/// S3 variant of the same-handle heal.
///
/// The test returns early (after printing a skip notice) when
/// `helpers::s3_test_graph_uri` yields `None`. Otherwise it fails one load at
/// `mutation.post_finalize_pre_publisher`, runs a second load on the same handle,
/// and checks the row counts both on that handle and after a fresh `open`.
#[tokio::test]
async fn s3_load_recovers_after_publisher_failure_without_reopen() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let uri = match helpers::s3_test_graph_uri("failpoints") {
        Some(uri) => uri,
        None => {
            eprintln!(
                "skipping s3_load_recovers_after_publisher_failure_without_reopen: \
                 OMNIGRAPH_S3_TEST_BUCKET is not set"
            );
            return;
        }
    };

    let _scenario = FailScenario::setup();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let first_batch = r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Company","data":{"name":"Acme"}}
"#;
        let err = load_jsonl(&mut db, first_batch, LoadMode::Merge)
            .await
            .err()
            .expect("finalize failpoint must fail the load");
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
    }

    let second_batch = r#"{"type":"Person","data":{"name":"Bob","age":25}}
"#;
    load_jsonl(&mut db, second_batch, LoadMode::Merge)
        .await
        .expect("the same-handle heal must converge on an S3-backed graph");
    assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
    assert_eq!(helpers::count_rows(&db, "node:Company").await, 1);

    // A fresh handle must observe the same Person count.
    drop(db);
    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
}
/// Recovery must converge after a fault at `recovery.record_audit`.
///
/// Sequence: (1) a load fails at `mutation.post_finalize_pre_publisher`; (2) the
/// next load fails at `recovery.record_audit` and the sidecar must still be on disk;
/// (3) a third load without faults succeeds. Afterwards no sidecar remains, two
/// Person rows are visible, and the `_graph_commit_recoveries.lance` dataset holds
/// exactly one row.
#[tokio::test]
async fn record_audit_failure_after_roll_forward_converges_on_next_write() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let recovery_dir = dir.path().join("__recovery");
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    // (1) Leave a sidecar behind.
    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let alice = r#"{"type":"Person","data":{"name":"Alice","age":30}}
"#;
        load_jsonl(&mut db, alice, LoadMode::Merge)
            .await
            .err()
            .expect("finalize failpoint must fail the load");
    }

    // (2) The audit write fails during the next write.
    {
        let _failpoint = ScopedFailPoint::new("recovery.record_audit", "return");
        let bob = r#"{"type":"Person","data":{"name":"Bob","age":25}}
"#;
        let err = load_jsonl(&mut db, bob, LoadMode::Merge)
            .await
            .err()
            .expect("an audit write failure mid-heal must fail the write");
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: recovery.record_audit"),
            "unexpected error: {err}"
        );
        assert_eq!(
            sidecar_count(),
            1,
            "the sidecar must survive an audit write failure so the retry can record it"
        );
    }

    // (3) No faults active any more.
    let carol = r#"{"type":"Person","data":{"name":"Carol","age":41}}
"#;
    load_jsonl(&mut db, carol, LoadMode::Merge)
        .await
        .expect("recovery must converge once the audit fault clears");

    if recovery_dir.exists() {
        assert_eq!(sidecar_count(), 0);
    }
    assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);

    let audit_uri = format!(
        "{}/_graph_commit_recoveries.lance",
        uri.trim_end_matches('/')
    );
    let audit_ds = lance::Dataset::open(&audit_uri)
        .await
        .expect("audit dataset exists after the retried recovery");
    let audit_rows = audit_ds.count_rows(None).await.unwrap();
    assert_eq!(audit_rows, 1, "exactly one recovery audit row");
}
/// A fault at `recovery.sidecar_list` must surface as an error, not be swallowed.
///
/// With one sidecar pending, both a write on the existing handle and a fresh
/// `Omnigraph::open` must fail with the injected error while the fault is active.
/// Once the fault guard is dropped, `open` must succeed, the sidecar must be gone,
/// and one Person row must be visible.
#[tokio::test]
async fn sidecar_list_failure_fails_write_and_open_loudly_then_clears() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let recovery_dir = dir.path().join("__recovery");
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    // Produce the pending sidecar.
    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let alice = r#"{"type":"Person","data":{"name":"Alice","age":30}}
"#;
        let err = load_jsonl(&mut db, alice, LoadMode::Merge)
            .await
            .unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 1);
    }

    // The list fault stays active until it is dropped explicitly below.
    let list_fault = ScopedFailPoint::new("recovery.sidecar_list", "return");

    let bob = r#"{"type":"Person","data":{"name":"Bob","age":25}}
"#;
    let err = load_jsonl(&mut db, bob, LoadMode::Merge)
        .await
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: recovery.sidecar_list"),
        "the write-entry heal must surface a list failure loudly; got: {err}"
    );

    drop(db);
    let err = Omnigraph::open(&uri)
        .await
        .err()
        .expect("open must fail while the sidecar list fault is active");
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: recovery.sidecar_list"),
        "the open-time sweep must surface a list failure loudly; got: {err}"
    );

    // Clear the fault and reopen.
    drop(list_fault);
    let db = Omnigraph::open(&uri).await.unwrap();
    if recovery_dir.exists() {
        let leftover = std::fs::read_dir(&recovery_dir).unwrap().count();
        assert_eq!(
            leftover, 0,
            "open after the fault clears must recover the sidecar"
        );
    }
    assert_eq!(helpers::count_rows(&db, "node:Person").await, 1);
}
/// A fault at `recovery.sidecar_delete` must not fail an otherwise successful load.
///
/// With the fault active the load succeeds, its row is visible, and one stale
/// sidecar stays on disk. A second load without the fault must succeed, leave no
/// sidecar, and bring the Person count to two.
#[tokio::test]
async fn sidecar_delete_failure_keeps_write_success_and_next_write_heals() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let recovery_dir = dir.path().join("__recovery");
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    {
        let _failpoint = ScopedFailPoint::new("recovery.sidecar_delete", "return");
        let alice = r#"{"type":"Person","data":{"name":"Alice","age":30}}
"#;
        load_jsonl(&mut db, alice, LoadMode::Merge)
            .await
            .expect("a Phase D delete failure must not fail a write that already published");
        assert_eq!(helpers::count_rows(&db, "node:Person").await, 1);
        assert_eq!(
            sidecar_count(),
            1,
            "the swallowed delete leaves a stale sidecar behind"
        );
    }

    let bob = r#"{"type":"Person","data":{"name":"Bob","age":25}}
"#;
    load_jsonl(&mut db, bob, LoadMode::Merge)
        .await
        .expect("a stale sidecar from a failed Phase D delete must not block later writes");

    if recovery_dir.exists() {
        assert_eq!(
            sidecar_count(),
            0,
            "the stale sidecar must be consumed by the next write's heal"
        );
    }
    assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
}
#[tokio::test]
async fn sidecar_write_failure_aborts_branch_merge_with_no_head_advance() {
use omnigraph::loader::{LoadMode, load_jsonl};
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
load_jsonl(
&mut db,
r#"{"type":"Person","data":{"name":"Alice","age":30}}
"#,
LoadMode::Append,
)
.await
.unwrap();
db.branch_create("feature").await.unwrap();
helpers::mutate_branch(
&mut db,
"feature",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
)
.await
.unwrap();
helpers::mutate_main(
&mut db,
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Mallory")], &[("$age", 35)]),
)
.await
.unwrap();
let person_uri = node_table_uri(&uri, "Person");
let pre_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
{
let _failpoint = ScopedFailPoint::new("recovery.sidecar_write", "return");
let err = db.branch_merge("feature", "main").await.unwrap_err();
assert!(
err.to_string()
.contains("injected failpoint triggered: recovery.sidecar_write"),
"unexpected error: {err}"
);
}
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
assert_eq!(
std::fs::read_dir(&recovery_dir).unwrap().count(),
0,
"a Phase A put failure must not leave a sidecar"
);
}
let post_head = lance::Dataset::open(&person_uri)
.await
.unwrap()
.version()
.version;
assert_eq!(
pre_head, post_head,
"a Phase A put failure must abort the merge before any target \
Lance HEAD advance"
);
assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
db.branch_merge("feature", "main")
.await
.expect("a transient sidecar put failure must not wedge the merge");
assert_eq!(helpers::count_rows(&db, "node:Person").await, 3);
}
/// Same-handle heal for mutations.
///
/// An `insert_person` mutation fails at `mutation.post_finalize_pre_publisher` and
/// leaves exactly one sidecar. A second `insert_person` on the same handle must
/// succeed, after which two Person rows are visible and no sidecar remains.
#[tokio::test]
async fn mutation_after_finalize_publisher_failure_heals_without_reopen() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let recovery_dir = dir.path().join("__recovery");
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let eve = mixed_params(&[("$name", "Eve")], &[("$age", 22)]);
        let err = mutate_main(&mut db, MUTATION_QUERIES, "insert_person", &eve)
            .await
            .unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        assert_eq!(
            sidecar_count(),
            1,
            "exactly one sidecar must persist after the finalize failure"
        );
    }

    let frank = mixed_params(&[("$name", "Frank")], &[("$age", 33)]);
    mutate_main(&mut db, MUTATION_QUERIES, "insert_person", &frank)
        .await
        .expect(
            "a follow-up mutation on the same handle must heal sidecar-covered \
             drift in-process instead of demanding repair/restart",
        );

    assert_eq!(helpers::count_rows(&db, "node:Person").await, 2);
    if recovery_dir.exists() {
        assert_eq!(
            sidecar_count(),
            0,
            "sidecar must be consumed by the in-process roll-forward"
        );
    }
}
/// Same-handle heal triggered by `apply_schema`.
///
/// A load fails at `mutation.post_finalize_pre_publisher`, leaving one sidecar.
/// Applying a schema that adds a `Tag` node type on the same handle must then
/// succeed. The failed load's rows must be visible (one Person, one Company), the
/// new `Tag` table must be empty, and no sidecar may remain.
#[tokio::test]
async fn schema_apply_after_finalize_publisher_failure_heals_without_reopen() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let recovery_dir = dir.path().join("__recovery");
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let batch = r#"{"type":"Person","data":{"name":"Alice","age":30}}
{"type":"Company","data":{"name":"Acme"}}
"#;
        let err = load_jsonl(&mut db, batch, LoadMode::Merge)
            .await
            .unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        assert_eq!(sidecar_count(), 1);
    }

    // The desired schema is the test schema plus one extra node type.
    let desired = format!("{}\nnode Tag {{ name: String @key }}\n", helpers::TEST_SCHEMA);
    db.apply_schema(&desired).await.expect(
        "schema apply on the same handle must heal sidecar-covered \
         drift in-process instead of failing until restart",
    );

    assert_eq!(helpers::count_rows(&db, "node:Person").await, 1);
    assert_eq!(helpers::count_rows(&db, "node:Company").await, 1);
    assert_eq!(helpers::count_rows(&db, "node:Tag").await, 0);

    if recovery_dir.exists() {
        assert_eq!(
            sidecar_count(),
            0,
            "no sidecar may remain after heal + successful schema apply"
        );
    }
}
/// Same-handle heal triggered by `branch_merge`.
///
/// Main holds Alice and `feature` additionally holds Eve. A load of Bob into main
/// fails at `mutation.post_finalize_pre_publisher`, leaving one sidecar. Merging
/// `feature` into main on the same handle must succeed, leave no sidecar, and
/// result in three Person rows.
#[tokio::test]
async fn branch_merge_after_finalize_publisher_failure_heals_without_reopen() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let recovery_dir = dir.path().join("__recovery");
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    let seed = r#"{"type":"Person","data":{"name":"Alice","age":30}}
"#;
    load_jsonl(&mut db, seed, LoadMode::Append).await.unwrap();

    db.branch_create("feature").await.unwrap();
    let eve = mixed_params(&[("$name", "Eve")], &[("$age", 22)]);
    helpers::mutate_branch(&mut db, "feature", MUTATION_QUERIES, "insert_person", &eve)
        .await
        .unwrap();

    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let bob = r#"{"type":"Person","data":{"name":"Bob","age":25}}
"#;
        let err = load_jsonl(&mut db, bob, LoadMode::Merge)
            .await
            .unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
        assert_eq!(sidecar_count(), 1);
    }

    db.branch_merge("feature", "main").await.expect(
        "branch merge on the same handle must heal sidecar-covered \
         drift in-process instead of failing or folding it silently",
    );

    if recovery_dir.exists() {
        assert_eq!(
            sidecar_count(),
            0,
            "the load's sidecar must be consumed by the entry heal, not left behind"
        );
    }
    assert_eq!(helpers::count_rows(&db, "node:Person").await, 3);
}
/// Discarding a sidecar for a deleted branch must be idempotent across a
/// `recovery.sidecar_delete` fault.
///
/// The test hand-writes a sidecar that names the `feature` branch, deletes that
/// branch, and then loads into main twice: once with the delete fault active (the
/// load must fail and the sidecar must stay) and once without (the load must
/// succeed and the sidecar must disappear). Exactly one `OrphanedBranchDiscarded`
/// audit kind must be reported afterwards.
#[tokio::test]
async fn orphaned_branch_discard_is_idempotent_across_delete_failure() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    load_jsonl(
        &mut db,
        "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n",
        LoadMode::Merge,
    )
    .await
    .unwrap();

    db.branch_create("feature").await.unwrap();
    let eve = mixed_params(&[("$name", "Eve")], &[("$age", 22)]);
    helpers::mutate_branch(&mut db, "feature", MUTATION_QUERIES, "insert_person", &eve)
        .await
        .unwrap();

    // Plant a synthetic sidecar that references the soon-to-be-deleted branch.
    let person_uri = node_table_uri(&uri, "Person");
    let sidecar_json = format!(
        r#"{{
"schema_version": 1,
"operation_id": "01H000000000000000000000ID",
"started_at": "0",
"branch": "feature",
"actor_id": null,
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{person_uri}",
"expected_version": 999,
"post_commit_pin": 1000,
"table_branch": "feature"
}}
]
}}"#
    );
    let recovery_dir = dir.path().join("__recovery");
    std::fs::create_dir_all(&recovery_dir).unwrap();
    let sidecar_path = recovery_dir.join("01H000000000000000000000ID.json");
    std::fs::write(sidecar_path, &sidecar_json).unwrap();
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();

    db.branch_delete("feature").await.unwrap();

    // Both load attempts use the same single row.
    let bob_row = "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n";

    {
        let _failpoint = ScopedFailPoint::new("recovery.sidecar_delete", "return");
        let err = load_jsonl(&mut db, bob_row, LoadMode::Merge)
            .await
            .err()
            .expect("a sidecar-delete fault mid-discard must fail the write");
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: recovery.sidecar_delete"),
            "unexpected error: {err}"
        );
        assert_eq!(sidecar_count(), 1);
    }

    load_jsonl(&mut db, bob_row, LoadMode::Merge)
        .await
        .expect("the retry must complete the orphan discard and the write");
    assert_eq!(sidecar_count(), 0);

    let audit_kinds = helpers::recovery::recovery_audit_kinds(dir.path()).await;
    let orphan_rows = audit_kinds
        .into_iter()
        .filter(|kind| kind == "OrphanedBranchDiscarded")
        .count();
    assert_eq!(
        orphan_rows, 1,
        "exactly one OrphanedBranchDiscarded audit row despite the delete-fault retry"
    );
}
/// When the sidecar listing fails, the drift error must name both recovery routes.
///
/// The test advances the Person table's Lance version behind the manifest's back
/// (an inline delete with the never-true predicate `1 = 2`), plants a sidecar for
/// that table, and arms `recovery.sidecar_list` with `1*off->1*return`. The next
/// load must fail with a message containing "could not classify the drift",
/// "omnigraph repair" and "reopen the graph read-write".
#[tokio::test]
async fn drift_guard_names_both_paths_when_sidecar_list_fails() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    load_jsonl(
        &mut db,
        "{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n",
        LoadMode::Append,
    )
    .await
    .unwrap();

    // Resolve the Person table location and its pinned version from the manifest.
    let snapshot = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
        .await
        .unwrap();
    let person_entry = snapshot.entry("node:Person").unwrap();
    let person_uri = format!("{}/{}", uri.trim_end_matches('/'), person_entry.table_path);
    let manifest_pin = person_entry.table_version;

    // Create drift directly through Lance.
    let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
    helpers::lance_delete_inline(&mut ds, "1 = 2").await;
    let head_after_drift = ds.version().version;

    let sidecar_json = format!(
        r#"{{
"schema_version": 1,
"operation_id": "01H0000000000000000000LSTF",
"started_at": "0",
"branch": null,
"actor_id": null,
"writer_kind": "Mutation",
"tables": [
{{
"table_key":"node:Person",
"table_path":"{}",
"expected_version":{},
"post_commit_pin":{}
}}
]
}}"#,
        person_uri,
        manifest_pin - 1,
        head_after_drift,
    );
    let recovery_dir = dir.path().join("__recovery");
    std::fs::create_dir_all(&recovery_dir).unwrap();
    let sidecar_path = recovery_dir.join("01H0000000000000000000LSTF.json");
    std::fs::write(sidecar_path, &sidecar_json).unwrap();

    // NOTE(review): `1*off->1*return` presumably lets the first listing through and
    // fails the second one - confirm against the `fail` crate's action syntax.
    let _failpoint = ScopedFailPoint::new("recovery.sidecar_list", "1*off->1*return");
    let err = load_jsonl(
        &mut db,
        "{\"type\":\"Person\",\"data\":{\"name\":\"bob\",\"age\":25}}\n",
        LoadMode::Merge,
    )
    .await
    .err()
    .expect("drift must still fail the write");

    let msg = err.to_string();
    let names_both_paths = msg.contains("could not classify the drift")
        && msg.contains("omnigraph repair")
        && msg.contains("reopen the graph read-write");
    assert!(
        names_both_paths,
        "an unclassifiable drift must name BOTH recovery paths, not \
         confidently route to repair; got: {msg}"
    );
}
/// Discarding a sidecar for a deleted branch must converge across a
/// `recovery.orphan_discard_audit_append` fault.
///
/// A hand-written sidecar names the `feature` branch, which is then deleted. The
/// first load into main runs with the fault active: it must fail, the sidecar must
/// stay, and no `OrphanedBranchDiscarded` audit kind may be reported. The retry
/// without the fault must succeed, remove the sidecar, and leave exactly one such
/// audit kind.
#[tokio::test]
async fn orphaned_branch_discard_converges_across_audit_append_failure() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    /// Counts the `OrphanedBranchDiscarded` entries reported for the graph at `graph`.
    async fn orphan_discard_rows(graph: &std::path::Path) -> usize {
        helpers::recovery::recovery_audit_kinds(graph)
            .await
            .into_iter()
            .filter(|kind| kind == "OrphanedBranchDiscarded")
            .count()
    }

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    load_jsonl(
        &mut db,
        "{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n",
        LoadMode::Merge,
    )
    .await
    .unwrap();

    db.branch_create("feature").await.unwrap();
    let eve = mixed_params(&[("$name", "Eve")], &[("$age", 22)]);
    helpers::mutate_branch(&mut db, "feature", MUTATION_QUERIES, "insert_person", &eve)
        .await
        .unwrap();

    // Plant a synthetic sidecar that references the soon-to-be-deleted branch.
    let person_uri = node_table_uri(&uri, "Person");
    let sidecar_json = format!(
        r#"{{
"schema_version": 1,
"operation_id": "01H000000000000000000000AF",
"started_at": "0",
"branch": "feature",
"actor_id": null,
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{person_uri}",
"expected_version": 999,
"post_commit_pin": 1000,
"table_branch": "feature"
}}
]
}}"#
    );
    let recovery_dir = dir.path().join("__recovery");
    std::fs::create_dir_all(&recovery_dir).unwrap();
    let sidecar_path = recovery_dir.join("01H000000000000000000000AF.json");
    std::fs::write(sidecar_path, &sidecar_json).unwrap();
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();

    db.branch_delete("feature").await.unwrap();

    // Both load attempts use the same single row.
    let bob_row = "{\"type\":\"Person\",\"data\":{\"name\":\"Bob\",\"age\":25}}\n";

    {
        let _failpoint = ScopedFailPoint::new("recovery.orphan_discard_audit_append", "return");
        let err = load_jsonl(&mut db, bob_row, LoadMode::Merge)
            .await
            .err()
            .expect("an audit-append fault mid-discard must fail the write");
        let rendered = err.to_string();
        assert!(
            rendered
                .contains("injected failpoint triggered: recovery.orphan_discard_audit_append"),
            "unexpected error: {err}"
        );
        assert_eq!(
            sidecar_count(),
            1,
            "the sidecar must survive an audit-append fault so the discard is retried"
        );
        let orphan_rows = orphan_discard_rows(dir.path()).await;
        assert_eq!(orphan_rows, 0, "no audit row landed before the fault");
    }

    load_jsonl(&mut db, bob_row, LoadMode::Merge)
        .await
        .expect("the retry must complete the orphan discard and the write");
    assert_eq!(sidecar_count(), 0);

    let orphan_rows = orphan_discard_rows(dir.path()).await;
    assert_eq!(
        orphan_rows, 1,
        "exactly one OrphanedBranchDiscarded audit row despite the audit-fault retry"
    );
}
/// After a schema apply fails at `schema_apply.after_staging_write`, a load on the
/// same handle must accept rows of a type that only exists in the new schema.
///
/// The failed apply must leave one sidecar. The follow-up load of a `Tag` row must
/// succeed, one Tag row must be visible, and no sidecar may remain.
#[tokio::test]
async fn load_after_schema_apply_phase_b_failure_uses_recovered_catalog() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let recovery_dir = dir.path().join("__recovery");
    let sidecar_count = || std::fs::read_dir(&recovery_dir).unwrap().count();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    load_jsonl(
        &mut db,
        "{\"type\":\"Person\",\"data\":{\"name\":\"alice\",\"age\":30}}\n",
        LoadMode::Append,
    )
    .await
    .unwrap();

    // Target schema: Person gains `city`, and a new `Tag` node type appears.
    let v2_schema = r#"node Person {
name: String @key
age: I32?
city: String?
}
node Company {
name: String @key
}
node Tag {
label: String @key
}
edge Knows: Person -> Person {
since: Date?
}
edge WorksAt: Person -> Company
"#;

    {
        let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
        let err = db.apply_schema(v2_schema).await.unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: schema_apply.after_staging_write"),
            "unexpected error: {err}"
        );
        assert_eq!(sidecar_count(), 1);
    }

    let tag_row = "{\"type\":\"Tag\",\"data\":{\"label\":\"t1\"}}\n";
    load_jsonl(&mut db, tag_row, LoadMode::Merge).await.expect(
        "after the heal rolls the schema apply forward, the same handle \
         must accept rows of the recovered schema's types",
    );

    assert_eq!(helpers::count_rows(&db, "node:Tag").await, 1);
    if recovery_dir.exists() {
        assert_eq!(sidecar_count(), 0);
    }
}
/// A write that runs concurrently with an in-flight schema apply must not break
/// that apply.
///
/// The schema apply is held at the `schema_apply.after_staging_write` failpoint
/// (action `pause`) while a load is started on another task. After the failpoint
/// guard is dropped, the apply must return `Ok`, the new `Tag` table must be empty,
/// and `__recovery` (if present) must hold no sidecar. The load's own result is
/// deliberately not asserted; only its completion within 30 seconds is required.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn heal_does_not_promote_live_schema_apply_staging() {
use omnigraph::loader::LoadMode;
use std::sync::Arc;
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
// The handle is shared between the apply task and the load task.
let db = Arc::new(Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap());
// Kept in a named binding so it can be released explicitly further down.
let failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "pause");
let apply_db = Arc::clone(&db);
let desired = format!("{}\nnode Tag {{ name: String @key }}\n", helpers::TEST_SCHEMA);
let apply = tokio::spawn(async move { apply_db.apply_schema(&desired).await });
// Poll (up to 500 x 10 ms) for the staging file as the signal that the apply has
// reached the paused window.
let staging_pg = dir.path().join("_schema.pg.staging");
for _ in 0..500 {
if staging_pg.exists() {
break;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
assert!(staging_pg.exists(), "schema apply never reached the paused window");
// Start a concurrent load while the apply is still paused.
let load_db = Arc::clone(&db);
let load = tokio::spawn(async move {
load_db
.load_as(
"main",
None,
"{\"type\":\"Person\",\"data\":{\"name\":\"Alice\",\"age\":30}}\n",
LoadMode::Merge,
None,
)
.await
});
// Give the load task time to start before the apply is released.
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// NOTE(review): dropping the guard presumably removes the failpoint and lets the
// paused apply continue - confirm against ScopedFailPoint's Drop impl.
drop(failpoint);
let apply_result = apply.await.unwrap();
// Only completion is checked here: the timeout and the join error are unwrapped,
// the load's own Result is discarded.
let _ = tokio::time::timeout(std::time::Duration::from_secs(30), load)
.await
.expect("load must complete once the apply releases its guards")
.unwrap();
apply_result.expect(
"a concurrent write's heal must not promote the live schema \
apply's staging files out from under it",
);
assert_eq!(helpers::count_rows(&db, "node:Tag").await, 0);
let recovery_dir = dir.path().join("__recovery");
if recovery_dir.exists() {
assert_eq!(std::fs::read_dir(&recovery_dir).unwrap().count(), 0);
}
}
/// `refresh` must leave a rollback-eligible sidecar alone; the next `open` handles it.
///
/// The test advances the Person table's Lance version by one behind the manifest's
/// back and plants a sidecar whose `expected_version` is one below the manifest pin.
/// It then checks, in order:
/// 1. `refresh` succeeds, the sidecar is still present, and the Lance version is
///    unchanged;
/// 2. a mutation on the same handle fails with a message containing
///    "a pending recovery sidecar requires rollback";
/// 3. after dropping the handle and calling `Omnigraph::open`, no sidecar remains,
///    the Lance version has advanced, and the manifest's `table_version` equals it.
#[tokio::test]
async fn refresh_defers_rollback_eligible_sidecar_to_next_open() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    let seed = r#"{"type":"Person","data":{"name":"alice","age":30}}
"#;
    load_jsonl(&mut db, seed, LoadMode::Append).await.unwrap();

    // Resolve the Person table location and its pinned version from the manifest.
    let snapshot = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
        .await
        .unwrap();
    let person_entry = snapshot.entry("node:Person").unwrap();
    let person_uri = format!("{}/{}", uri.trim_end_matches('/'), person_entry.table_path);
    let manifest_pin = person_entry.table_version;

    // Create drift directly through Lance: exactly one version past the pin.
    let mut ds = lance::Dataset::open(&person_uri).await.unwrap();
    helpers::lance_delete_inline(&mut ds, "1 = 2").await;
    let head_after_drift = ds.version().version;
    assert_eq!(head_after_drift, manifest_pin + 1);

    let sidecar_json = format!(
        r#"{{
"schema_version": 1,
"operation_id": "01H0000000000000000000RBCK",
"started_at": "0",
"branch": null,
"actor_id": "act-rollback",
"writer_kind": "Mutation",
"tables": [
{{
"table_key":"node:Person",
"table_path":"{}",
"expected_version":{},
"post_commit_pin":{}
}}
]
}}"#,
        person_uri,
        manifest_pin - 1,
        head_after_drift,
    );
    let recovery_dir = dir.path().join("__recovery");
    std::fs::create_dir_all(&recovery_dir).unwrap();
    let sidecar_path = recovery_dir.join("01H0000000000000000000RBCK.json");
    std::fs::write(sidecar_path, &sidecar_json).unwrap();

    // 1. Refresh must not touch the sidecar or the table.
    let before_refresh = lance::Dataset::open(&person_uri).await.unwrap();
    let pre_head = before_refresh.version().version;
    drop(before_refresh);

    db.refresh()
        .await
        .expect("refresh must succeed (deferring rollback)");
    assert_eq!(
        std::fs::read_dir(&recovery_dir).unwrap().count(),
        1,
        "rollback-eligible sidecar must be deferred to next ReadWrite open",
    );

    let after_refresh = lance::Dataset::open(&person_uri).await.unwrap();
    let post_head = after_refresh.version().version;
    drop(after_refresh);
    assert_eq!(
        pre_head, post_head,
        "refresh-time recovery must NOT call Dataset::restore on Person; \
         pre_head={pre_head}, post_head={post_head}",
    );

    // 2. A write on the same handle is rejected with the rollback hint.
    let grace = mixed_params(&[("$name", "Grace")], &[("$age", 50)]);
    let err = mutate_main(&mut db, MUTATION_QUERIES, "insert_person", &grace)
        .await
        .unwrap_err();
    assert!(
        err.to_string()
            .contains("a pending recovery sidecar requires rollback"),
        "drift guard must point at a read-write reopen for sidecar-covered \
         rollback-eligible drift; got: {err}"
    );

    // 3. A fresh open processes the deferred sidecar.
    drop(db);
    let db = Omnigraph::open(&uri).await.unwrap();
    let remaining = recovery_dir
        .exists()
        .then(|| std::fs::read_dir(&recovery_dir).unwrap().count())
        .unwrap_or(0);
    assert_eq!(
        remaining, 0,
        "full sweep at next open must process the deferred sidecar",
    );

    let after_open = lance::Dataset::open(&person_uri).await.unwrap();
    let final_head = after_open.version().version;
    drop(after_open);
    assert!(
        final_head > post_head,
        "full sweep must run Dataset::restore (head advances); \
         post_head={post_head}, final_head={final_head}",
    );

    let entry_version = db
        .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
        .await
        .unwrap()
        .entry("node:Person")
        .unwrap()
        .table_version;
    assert_eq!(
        entry_version, final_head,
        "full-sweep roll-back must publish so manifest pin ({entry_version}) == Lance HEAD ({final_head})",
    );
}
/// After a Person mutation fails at `mutation.post_finalize_pre_publisher`, a load
/// that only writes a Company row on the same handle must still succeed.
#[tokio::test]
async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let graph_uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(graph_uri, helpers::TEST_SCHEMA)
        .await
        .unwrap();

    {
        let _failpoint = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let eve = mixed_params(&[("$name", "Eve")], &[("$age", 22)]);
        // Only the fact that the mutation failed matters; the error is discarded.
        let _ = mutate_main(&mut db, MUTATION_QUERIES, "insert_person", &eve)
            .await
            .expect_err("synthetic failpoint must fire");
    }

    let company_row = r#"{"type": "Company", "data": {"name": "Acme"}}"#;
    load_jsonl(&mut db, company_row, LoadMode::Append)
        .await
        .expect("Company write on a non-drifted table should succeed");
}
#[tokio::test]
async fn ensure_indices_stage_btree_failure_leaves_existing_tables_writable() {
let _scenario = FailScenario::setup();
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
mutate_main(
&mut db,
helpers::MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "Alice")], &[("$age", 30)]),
)
.await
.expect("seed Person");
let indexed_schema = helpers::TEST_SCHEMA.replace("age: I32?", "age: I32? @index");
db.apply_schema(&indexed_schema)
.await
.expect("adding an @index is metadata-only and succeeds");
{
let _failpoint =
ScopedFailPoint::new("ensure_indices.post_stage_pre_commit_btree", "return");
let err = db.ensure_indices().await.unwrap_err();
assert!(
err.to_string()
.contains("ensure_indices.post_stage_pre_commit_btree"),
"ensure_indices should fail with the synthetic failpoint error, got: {err}"
);
}
use omnigraph::loader::{LoadMode, load_jsonl};
load_jsonl(
&mut db,
r#"{"type": "Company", "data": {"name": "Acme"}}"#,
LoadMode::Append,
)
.await
.expect("Company write on a table untouched by the failed ensure_indices should succeed");
}
/// Panics if any schema staging file is still present directly under `graph`.
///
/// The names are checked in a fixed order (`_schema.pg.staging`,
/// `_schema.ir.json.staging`, `__schema_state.json.staging`); the panic message
/// reports the first one found.
fn assert_no_staging_files(graph: &std::path::Path) {
    const STAGING_NAMES: [&str; 3] = [
        "_schema.pg.staging",
        "_schema.ir.json.staging",
        "__schema_state.json.staging",
    ];

    let leftover = STAGING_NAMES
        .iter()
        .map(|name| graph.join(name))
        .find(|candidate| candidate.exists());

    if let Some(path) = leftover {
        panic!(
            "staging file {} still exists after recovery",
            path.display()
        );
    }
}
/// A schema apply that fails at `schema_apply.before_staging_write` must be rolled
/// back by the next `open`.
///
/// After the reopen the main version must be greater than before the failure, the
/// seeded Person row must still be readable, the shared post-recovery invariants
/// must hold for a `RolledBack` outcome on `node:Person`, and `_schema.pg` must not
/// contain the new `city` field or the new `Tag` type.
#[tokio::test]
async fn schema_apply_without_schema_staging_rolls_back_on_next_open() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_owned();

    // Seed one Person row, then release the handle.
    {
        let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        let seed = r#"{"type":"Person","data":{"name":"alice","age":30}}
"#;
        load_jsonl(&mut db, seed, LoadMode::Append).await.unwrap();
    }

    let pre_failure_version = {
        let db = Omnigraph::open(&uri).await.unwrap();
        version_main(&db).await.unwrap()
    };

    // Fail the apply and remember which operation the leftover sidecar belongs to.
    let operation_id = {
        let db = Omnigraph::open(&uri).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.before_staging_write", "return");
        let v2_schema = r#"node Person {
name: String @key
age: I32?
city: String?
}
node Company {
name: String @key
}
node Tag {
label: String @key
}
edge Knows: Person -> Person {
since: Date?
}
edge WorksAt: Person -> Company
"#;
        let err = db.apply_schema(v2_schema).await.unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: schema_apply.before_staging_write"),
            "unexpected error: {err}"
        );
        single_sidecar_operation_id(dir.path())
    };

    let db = Omnigraph::open(&uri).await.unwrap();
    let recovered_version = version_main(&db).await.unwrap();
    assert!(
        recovered_version > pre_failure_version,
        "roll-back publishes the restored (old-schema) version, advancing the manifest; \
         pre={pre_failure_version}",
    );
    assert_eq!(
        helpers::count_rows(&db, "node:Person").await,
        1,
        "old-schema data must remain readable after rollback"
    );
    drop(db);

    let expectation = RecoveryExpectation::RolledBack {
        tables: vec![TableExpectation::main("node:Person")],
    };
    assert_post_recovery_invariants(dir.path(), &operation_id, expectation)
        .await
        .unwrap();

    // The live schema file must still describe the old schema.
    let live_schema = std::fs::read_to_string(dir.path().join("_schema.pg")).unwrap();
    assert!(
        !live_schema.contains("city: String?"),
        "_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}",
    );
    assert!(
        !live_schema.contains("node Tag"),
        "_schema.pg must keep the OLD schema when staging files never existed; got:\n{live_schema}",
    );
}
/// Schema-apply crash test for the `schema_apply.after_staging_write` failpoint.
///
/// Seeds one `Person` row, makes `apply_schema` fail at the failpoint, checks
/// that exactly one entry is left under `__recovery`, then reopens the graph
/// and asserts the roll-forward outcome: the main version advanced, the shared
/// post-recovery invariants hold, `_schema.pg` contains the new schema text,
/// and the added `node:Tag` table can be counted (0 rows).
#[tokio::test]
async fn schema_apply_phase_b_failure_recovered_on_next_open() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    // Schema handed to `apply_schema`; the final assertions look for
    // `city: String?` and `node Tag` from this text inside `_schema.pg`.
    let v2_schema = r#"node Person {
name: String @key
age: I32?
city: String?
}
node Company {
name: String @key
}
node Tag {
label: String @key
}
edge Knows: Person -> Person {
since: Date?
}
edge WorksAt: Person -> Company
"#;

    // Seed a single row; the handle goes out of scope before the next open.
    {
        let mut seeded = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        load_jsonl(
            &mut seeded,
            r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
            LoadMode::Append,
        )
        .await
        .unwrap();
    }

    let pre_failure_version = {
        let reader = Omnigraph::open(&uri).await.unwrap();
        version_main(&reader).await.unwrap()
    };

    // Trigger the failure while the failpoint guard is alive and capture the
    // id of the single sidecar it leaves behind.
    let operation_id = {
        let writer = Omnigraph::open(&uri).await.unwrap();
        let _failpoint = ScopedFailPoint::new("schema_apply.after_staging_write", "return");

        let err = writer.apply_schema(v2_schema).await.unwrap_err();
        let expected = "injected failpoint triggered: schema_apply.after_staging_write";
        assert!(err.to_string().contains(expected), "unexpected error: {err}");

        let sidecar_total = std::fs::read_dir(dir.path().join("__recovery"))
            .unwrap()
            .flatten()
            .count();
        assert_eq!(
            sidecar_total, 1,
            "exactly one sidecar must persist after schema_apply failure"
        );

        single_sidecar_operation_id(dir.path())
    };

    // Reopen and compare the main version against the pre-failure one.
    {
        let reopened = Omnigraph::open(&uri).await.unwrap();
        let post_recovery_version = version_main(&reopened).await.unwrap();
        assert!(
            post_recovery_version > pre_failure_version,
            "manifest version must advance post-recovery; pre={pre_failure_version}, \
             post={post_recovery_version}",
        );
    }

    let expectation = RecoveryExpectation::RolledForward {
        tables: vec![TableExpectation::main("node:Person")],
    };
    assert_post_recovery_invariants(dir.path(), &operation_id, expectation)
        .await
        .unwrap();

    // The on-disk schema file must now carry the new definitions.
    let schema_path = dir.path().join("_schema.pg");
    let live_schema = std::fs::read_to_string(schema_path).unwrap();
    assert!(
        live_schema.contains("city: String?"),
        "_schema.pg must reflect the NEW schema (city column added); got:\n{live_schema}",
    );
    assert!(
        live_schema.contains("node Tag"),
        "_schema.pg must reflect the NEW schema (Tag type added); got:\n{live_schema}",
    );

    let verifier = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(
        helpers::count_rows(&verifier, "node:Tag").await,
        0,
        "node:Tag must have a manifest entry (with 0 rows) post-recovery; \
         a panic here means recovery failed to register the added table"
    );
}
/// Optimize crash test for `optimize.post_phase_b_pre_manifest_commit`.
///
/// Inserts four `Person` rows one mutation at a time, makes `optimize` fail at
/// the failpoint, checks that exactly one entry is left under `__recovery`,
/// then reopens and asserts the roll-forward outcome. It finishes by applying
/// a schema that adds `nickname`, which must succeed.
#[tokio::test]
async fn optimize_phase_b_failure_recovered_on_next_open() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    // One mutation per person.
    {
        let seeded = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        let people = [("alice", 30), ("bob", 31), ("carol", 32), ("dave", 33)];
        for (person, years) in people {
            let params = mixed_params(&[("$name", person)], &[("$age", years)]);
            seeded
                .mutate("main", MUTATION_QUERIES, "insert_person", &params)
                .await
                .unwrap();
        }
    }

    let pre_failure_version = {
        let reader = Omnigraph::open(&uri).await.unwrap();
        version_main(&reader).await.unwrap()
    };

    // Fail the optimize call and remember the sidecar's operation id.
    let operation_id = {
        let handle = Omnigraph::open(&uri).await.unwrap();
        let _failpoint =
            ScopedFailPoint::new("optimize.post_phase_b_pre_manifest_commit", "return");

        let err = handle.optimize().await.unwrap_err();
        let expected = "injected failpoint triggered: optimize.post_phase_b_pre_manifest_commit";
        assert!(err.to_string().contains(expected), "unexpected error: {err}");

        let sidecar_total = std::fs::read_dir(dir.path().join("__recovery"))
            .unwrap()
            .flatten()
            .count();
        assert_eq!(
            sidecar_total, 1,
            "exactly one Optimize sidecar must persist after optimize failure"
        );

        single_sidecar_operation_id(dir.path())
    };

    {
        let reopened = Omnigraph::open(&uri).await.unwrap();
        let post_recovery_version = version_main(&reopened).await.unwrap();
        assert!(
            post_recovery_version > pre_failure_version,
            "manifest version must advance post-recovery (compaction rolled forward); \
             pre={pre_failure_version}, post={post_recovery_version}",
        );
    }

    let expectation = RecoveryExpectation::RolledForward {
        tables: vec![TableExpectation::main("node:Person")],
    };
    assert_post_recovery_invariants(dir.path(), &operation_id, expectation)
        .await
        .unwrap();

    // Follow-up: a schema apply on the recovered graph must go through.
    let desired = helpers::TEST_SCHEMA.replace(
        " age: I32?\n}",
        " age: I32?\n nickname: String?\n}",
    );
    let follow_up = Omnigraph::open(&uri).await.unwrap();
    follow_up
        .apply_schema(&desired)
        .await
        .expect("schema apply after optimize recovery must succeed");
}
/// Branch-merge crash test (`feature` into `main`) for the
/// `branch_merge.post_phase_b_pre_manifest_commit` failpoint.
///
/// After the injected failure leaves exactly one entry in `__recovery`, the
/// graph is reopened and the test asserts that the sidecar directory is empty
/// (if it still exists), that `_graph_commit_recoveries.lance` exists, that
/// the main version advanced, and that `_graph_commits.lance` holds at least
/// one row with a non-null `merged_parent_commit_id`.
#[tokio::test]
async fn branch_merge_phase_b_failure_recovered_on_next_open() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let recovery_dir = dir.path().join("__recovery");

    // Seed main, create `feature`, then write to both branches.
    {
        let mut seeded = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        load_jsonl(
            &mut seeded,
            r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
            LoadMode::Append,
        )
        .await
        .unwrap();

        seeded.branch_create("feature").await.unwrap();

        let bob = mixed_params(&[("$name", "Bob")], &[("$age", 40)]);
        seeded
            .mutate("feature", MUTATION_QUERIES, "insert_person", &bob)
            .await
            .unwrap();

        let carol = mixed_params(&[("$name", "Carol")], &[("$age", 50)]);
        mutate_main(&mut seeded, MUTATION_QUERIES, "insert_person", &carol)
            .await
            .unwrap();
    }

    let pre_failure_version = {
        let reader = Omnigraph::open(&uri).await.unwrap();
        version_main(&reader).await.unwrap()
    };

    // Fail the merge at the failpoint and confirm one sidecar is left.
    {
        let handle = Omnigraph::open(&uri).await.unwrap();
        let _failpoint =
            ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");

        let err = handle.branch_merge("feature", "main").await.unwrap_err();
        let expected =
            "injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit";
        assert!(err.to_string().contains(expected), "unexpected error: {err}");

        let sidecar_total = std::fs::read_dir(&recovery_dir).unwrap().flatten().count();
        assert_eq!(
            sidecar_total, 1,
            "exactly one sidecar must persist after branch_merge failure"
        );
    }

    // This handle stays open for all remaining checks.
    let db = Omnigraph::open(&uri).await.unwrap();

    if recovery_dir.exists() {
        let remaining: Vec<_> = std::fs::read_dir(&recovery_dir)
            .unwrap()
            .flatten()
            .collect();
        assert!(
            remaining.is_empty(),
            "sidecar must be deleted; remaining: {:?}",
            remaining,
        );
    }

    assert!(
        dir.path().join("_graph_commit_recoveries.lance").exists(),
        "_graph_commit_recoveries.lance must exist after branch_merge recovery"
    );

    let post_recovery_version = version_main(&db).await.unwrap();
    assert!(
        post_recovery_version > pre_failure_version,
        "manifest version must advance post-recovery; pre={pre_failure_version}, \
         post={post_recovery_version}",
    );

    // Scan the commit dataset for any row tagged with a merge parent.
    {
        use arrow_array::{Array, StringArray};
        use futures::TryStreamExt;

        let commits_path = dir.path().join("_graph_commits.lance");
        let dataset = lance::Dataset::open(commits_path.to_str().unwrap())
            .await
            .unwrap();
        let stream = dataset.scan().try_into_stream().await.unwrap();
        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap();

        let mut found_recovery_merge = false;
        for batch in &batches {
            // Every batch is inspected, so a missing or mistyped column still
            // panics even after a match was already found.
            let merged = batch
                .column_by_name("merged_parent_commit_id")
                .expect("merged_parent_commit_id column present")
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("merged_parent_commit_id is Utf8");
            found_recovery_merge |= (0..merged.len()).any(|row| !merged.is_null(row));
        }

        assert!(
            found_recovery_merge,
            "recovered branch_merge must record `merged_parent_commit_id` so future \
             merges detect already-up-to-date — no merge-parent-tagged commit found",
        );
    }

    drop(db);
}
/// Branch-merge crash test where the merge target is `target_branch`, not
/// `main`.
///
/// Records main's `node:Person` table version and the head commit of
/// `target_branch` before the injected failure, then reopens the graph and
/// hands both values to `assert_post_recovery_invariants` as part of a
/// `RolledForward` expectation for `node:Person` on `target_branch`.
#[tokio::test]
async fn branch_merge_phase_b_failure_recovered_on_non_main_target() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    // Seed main, then create each branch and insert one person into it.
    {
        let mut seeded = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        load_jsonl(
            &mut seeded,
            r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
            LoadMode::Append,
        )
        .await
        .unwrap();

        let branch_rows = [("target_branch", "Bob", 40), ("source_branch", "Carol", 50)];
        for (branch, person, years) in branch_rows {
            seeded.branch_create(branch).await.unwrap();
            let params = mixed_params(&[("$name", person)], &[("$age", years)]);
            seeded
                .mutate(branch, MUTATION_QUERIES, "insert_person", &params)
                .await
                .unwrap();
        }
    }

    // Main's Person table version as seen before the failed merge.
    let main_person_pin = {
        let reader = Omnigraph::open(&uri).await.unwrap();
        let snapshot = reader
            .snapshot_of(omnigraph::db::ReadTarget::branch("main"))
            .await
            .unwrap();
        snapshot
            .entry("node:Person")
            .expect("main must have Person")
            .table_version
    };

    let target_parent_commit_id = branch_head_commit_id(dir.path(), "target_branch")
        .await
        .unwrap();

    // Fail the merge and capture the operation id of the sidecar left behind.
    let operation_id = {
        let handle = Omnigraph::open(&uri).await.unwrap();
        let _failpoint =
            ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");

        let err = handle
            .branch_merge("source_branch", "target_branch")
            .await
            .unwrap_err();
        let expected =
            "injected failpoint triggered: branch_merge.post_phase_b_pre_manifest_commit";
        assert!(err.to_string().contains(expected), "unexpected error: {err}");

        // Raw entry count: unreadable entries are counted as well.
        let sidecar_count = std::fs::read_dir(dir.path().join("__recovery"))
            .unwrap()
            .count();
        assert_eq!(
            sidecar_count, 1,
            "exactly one sidecar must persist after non-main branch_merge failure"
        );

        single_sidecar_operation_id(dir.path())
    };

    // Open once and release the handle immediately, before the invariants run.
    drop(Omnigraph::open(&uri).await.unwrap());

    let person_on_target = TableExpectation::branch("node:Person", "target_branch")
        .expected_main_manifest_pin(main_person_pin)
        .expected_recovery_parent_commit_id(target_parent_commit_id);
    let expectation = RecoveryExpectation::RolledForward {
        tables: vec![person_on_target],
    };
    assert_post_recovery_invariants(dir.path(), &operation_id, expectation)
        .await
        .unwrap();
}
/// Inspects the sidecar JSON written by a failed `source_branch` ->
/// `target_branch` merge.
///
/// The sidecar's `tables` array must be non-empty, and every pin in it must
/// carry a string `table_branch` equal to `"target_branch"`.
#[tokio::test]
async fn branch_merge_sidecar_pins_table_branch_to_active_branch() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    // Seed main, then create each branch and insert one person into it.
    {
        let mut seeded = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        load_jsonl(
            &mut seeded,
            r#"{"type":"Person","data":{"name":"alice","age":30}}
"#,
            LoadMode::Append,
        )
        .await
        .unwrap();

        seeded.branch_create("target_branch").await.unwrap();
        let bob = mixed_params(&[("$name", "Bob")], &[("$age", 40)]);
        seeded
            .mutate("target_branch", MUTATION_QUERIES, "insert_person", &bob)
            .await
            .unwrap();

        seeded.branch_create("source_branch").await.unwrap();
        let carol = mixed_params(&[("$name", "Carol")], &[("$age", 50)]);
        seeded
            .mutate("source_branch", MUTATION_QUERIES, "insert_person", &carol)
            .await
            .unwrap();
    }

    // Only the fact that the merge fails matters here; the error is discarded.
    {
        let handle = Omnigraph::open(&uri).await.unwrap();
        let _failpoint =
            ScopedFailPoint::new("branch_merge.post_phase_b_pre_manifest_commit", "return");
        let _ = handle
            .branch_merge("source_branch", "target_branch")
            .await
            .expect_err("failpoint must fire");
    }

    // Load `<operation_id>.json` from the recovery directory.
    let operation_id = single_sidecar_operation_id(dir.path());
    let mut sidecar_path = dir.path().join("__recovery");
    sidecar_path.push(format!("{operation_id}.json"));
    let raw = std::fs::read_to_string(&sidecar_path).unwrap();
    let sidecar: serde_json::Value = serde_json::from_str(&raw).unwrap();

    let tables = sidecar
        .get("tables")
        .and_then(|value| value.as_array())
        .expect("sidecar tables must be an array");
    assert!(
        !tables.is_empty(),
        "sidecar must pin at least one RewriteMerged table — both branches mutated Person"
    );

    for pin in tables {
        let table_branch = match pin.get("table_branch").and_then(|value| value.as_str()) {
            Some(branch) => branch,
            None => panic!(
                "sidecar pin must record table_branch as the merge target (active_branch); \
                 got pin {pin:?}"
            ),
        };
        assert_eq!(
            table_branch, "target_branch",
            "sidecar pin must record `table_branch` as the merge target branch (where \
             commits actually land via publish_rewritten_merge_table → open_for_mutation), \
             NOT entry.table_branch from the target snapshot. See merge.rs filter_map and \
             the rationale comment at table_ops.rs:115-120. Got pin: {pin:?}"
        );
    }
}
/// `ensure_indices` failing at
/// `ensure_indices.post_phase_b_pre_manifest_commit` must leave nothing in
/// `__recovery`.
///
/// The directory is checked right after the failure and again after a reopen.
/// The test also asserts that `_graph_commit_recoveries.lance` was never
/// created.
#[tokio::test]
async fn ensure_indices_phase_b_failure_does_not_leak_sidecar_when_no_work_needed() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();
    let recovery_dir = dir.path().join("__recovery");

    // Lists the entries of `__recovery`; a missing directory counts as empty.
    let recovery_entries = || -> Vec<std::fs::DirEntry> {
        if !recovery_dir.exists() {
            return Vec::new();
        }
        std::fs::read_dir(&recovery_dir).unwrap().flatten().collect()
    };

    {
        let mut seeded = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();
        load_jsonl(
            &mut seeded,
            r#"{"type":"Person","data":{"name":"alice","age":30}}
{"type":"Person","data":{"name":"bob","age":25}}
"#,
            LoadMode::Append,
        )
        .await
        .unwrap();
    }

    // Fail `ensure_indices` and look at the recovery directory straight away.
    {
        let handle = Omnigraph::open(&uri).await.unwrap();
        let _failpoint =
            ScopedFailPoint::new("ensure_indices.post_phase_b_pre_manifest_commit", "return");

        let err = handle.ensure_indices().await.unwrap_err();
        let expected =
            "injected failpoint triggered: ensure_indices.post_phase_b_pre_manifest_commit";
        assert!(err.to_string().contains(expected), "unexpected error: {err}");

        let sidecars = recovery_entries();
        assert!(
            sidecars.is_empty(),
            "steady-state ensure_indices must not leave a sidecar; got {:?}",
            sidecars,
        );
    }

    // Reopen and keep the handle alive for the remaining checks.
    let _db = Omnigraph::open(&uri).await.unwrap();

    let remaining = recovery_entries();
    assert!(
        remaining.is_empty(),
        "sidecar must remain deleted; remaining: {:?}",
        remaining,
    );

    let audit_dir = dir.path().join("_graph_commit_recoveries.lance");
    assert!(
        !audit_dir.exists(),
        "_graph_commit_recoveries.lance must NOT exist when no sidecar was processed"
    );
}
/// `Omnigraph::init` failing at `init.after_schema_pg_written` must return the
/// injected error and leave no `_schema.pg` in the target directory.
#[tokio::test]
async fn init_failpoint_after_schema_pg_written_cleans_up_schema_file() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let _failpoint = ScopedFailPoint::new("init.after_schema_pg_written", "return");

    let outcome = Omnigraph::init(uri, helpers::TEST_SCHEMA).await;
    let err = match outcome {
        Err(injected) => injected,
        Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
    };

    let expected = "injected failpoint triggered: init.after_schema_pg_written";
    assert!(err.to_string().contains(expected), "got: {err}");

    let schema_pg = dir.path().join("_schema.pg");
    assert!(
        !schema_pg.exists(),
        "_schema.pg must be cleaned up after init failure"
    );
}
/// `Omnigraph::init` failing at `init.after_schema_contract_written` must
/// return the injected error and leave none of `_schema.pg`,
/// `_schema.ir.json`, or `__schema_state.json` behind.
#[tokio::test]
async fn init_failpoint_after_schema_contract_written_cleans_up_all_schema_files() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let _failpoint = ScopedFailPoint::new("init.after_schema_contract_written", "return");

    let outcome = Omnigraph::init(uri, helpers::TEST_SCHEMA).await;
    let err = match outcome {
        Err(injected) => injected,
        Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
    };

    let expected = "injected failpoint triggered: init.after_schema_contract_written";
    assert!(err.to_string().contains(expected), "got: {err}");

    // Checked in this order; the first leftover file fails the test.
    let must_be_gone = [
        ("_schema.pg", "_schema.pg must be cleaned up"),
        ("_schema.ir.json", "_schema.ir.json must be cleaned up"),
        ("__schema_state.json", "__schema_state.json must be cleaned up"),
    ];
    for (file_name, complaint) in must_be_gone {
        assert!(!dir.path().join(file_name).exists(), "{complaint}");
    }
}
/// `Omnigraph::init` failing at `init.after_coordinator_init` must return the
/// injected error and leave none of `_schema.pg`, `_schema.ir.json`, or
/// `__schema_state.json` behind.
#[tokio::test]
async fn init_failpoint_after_coordinator_init_cleans_up_schema_files() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let _failpoint = ScopedFailPoint::new("init.after_coordinator_init", "return");

    let outcome = Omnigraph::init(uri, helpers::TEST_SCHEMA).await;
    let err = match outcome {
        Err(injected) => injected,
        Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
    };

    let expected = "injected failpoint triggered: init.after_coordinator_init";
    assert!(err.to_string().contains(expected), "got: {err}");

    // Checked in this order; the first leftover file fails the test.
    let must_be_gone = [
        (
            "_schema.pg",
            "_schema.pg must be cleaned up after late-phase init failure",
        ),
        (
            "_schema.ir.json",
            "_schema.ir.json must be cleaned up after late-phase init failure",
        ),
        (
            "__schema_state.json",
            "__schema_state.json must be cleaned up after late-phase init failure",
        ),
    ];
    for (file_name, complaint) in must_be_gone {
        assert!(!dir.path().join(file_name).exists(), "{complaint}");
    }
}
/// The error returned by a failed `Omnigraph::init` must mention the
/// failpoint name `init.after_schema_pg_written`.
#[tokio::test]
async fn init_failpoint_returns_original_error_not_cleanup_error() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let _failpoint = ScopedFailPoint::new("init.after_schema_pg_written", "return");

    let outcome = Omnigraph::init(uri, helpers::TEST_SCHEMA).await;
    let msg = match outcome {
        Err(injected) => injected.to_string(),
        Ok(_) => panic!("expected Omnigraph::init to fail at the configured failpoint"),
    };

    let surfaces_failpoint = msg.contains("init.after_schema_pg_written");
    assert!(
        surfaces_failpoint,
        "init error must surface the failpoint cause, got: {msg}"
    );
}