#![cfg(feature = "failpoints")]
mod helpers;
use fail::FailScenario;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint;
use helpers::{MUTATION_QUERIES, mixed_params, mutate_main};
/// Baseline schema source for the schema-apply tests: a single `Person` node
/// type declaring `name: String @key`.
const SCHEMA_V1: &str = "node Person { name: String @key }\n";

/// The text of `SCHEMA_V1` followed by one extra node type, `Company`.
/// Written one declaration per line; the resulting string is a single
/// contiguous `&'static str`.
const SCHEMA_V2_ADDED_TYPE: &str = concat!(
    "node Person { name: String @key }\n",
    "node Company { name: String @key }\n",
);
/// Checks that the `branch_create.after_manifest_branch_create` failpoint
/// surfaces as an error from `Omnigraph::branch_create`.
///
/// A fresh repo is initialised in a temporary directory, the failpoint is
/// configured with the `"return"` action, and creating the `feature` branch
/// must fail with an error whose text names the injected failpoint.
#[tokio::test]
async fn branch_create_failpoint_triggers() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap();
    let mut db = Omnigraph::init(uri, helpers::TEST_SCHEMA).await.unwrap();

    // The guard is held until the end of the test so the failpoint stays
    // configured while `branch_create` runs.
    let _failpoint = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return");
    let err = db.branch_create("feature").await.unwrap_err();

    // Include the actual error in the failure output, as the other tests in
    // this file do; a bare `assert!` would only print the expression.
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: branch_create.after_manifest_branch_create"),
        "unexpected error: {err}"
    );
}
/// Checks that the `graph_publish.before_commit_append` failpoint surfaces as
/// an error from a mutation on the main branch.
///
/// With the failpoint configured with the `"return"` action, running the
/// `insert_person` mutation through `mutate_main` must fail with an error
/// whose text names the injected failpoint.
#[tokio::test]
async fn graph_publish_failpoint_triggers_before_commit_append() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    // The guard is held until the end of the test so the failpoint stays
    // configured for the whole mutation.
    let _failpoint = ScopedFailPoint::new("graph_publish.before_commit_append", "return");
    let err = mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .unwrap_err();

    // Include the actual error in the failure output, as the other tests in
    // this file do; a bare `assert!` would only print the expression.
    assert!(
        err.to_string()
            .contains("injected failpoint triggered: graph_publish.before_commit_append"),
        "unexpected error: {err}"
    );
}
/// A schema apply interrupted at `schema_apply.after_staging_write` must leave
/// the repo on the old schema once it is reopened.
///
/// The failpoint stands in for a crash (per the test name). After the failed
/// apply the handle is dropped, the repo is reopened, and the test expects
/// `schema_source()` to still equal `SCHEMA_V1` with no staging files left in
/// the repo directory.
#[tokio::test]
async fn schema_apply_recovers_pre_commit_crash() {
    const EXPECTED_ERROR: &str = "injected failpoint triggered: schema_apply.after_staging_write";

    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let repo_uri = repo_dir.path().to_str().unwrap().to_string();

    // Inner scope: the writer handle and the failpoint guard are both dropped
    // before the repo is reopened below.
    {
        let mut writer = Omnigraph::init(&repo_uri, SCHEMA_V1).await.unwrap();
        let _guard = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
        let err = writer.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let rendered = err.to_string();
        assert!(rendered.contains(EXPECTED_ERROR), "got: {}", err);
    }

    let reopened = Omnigraph::open(&repo_uri).await.unwrap();
    assert_eq!(reopened.schema_source(), SCHEMA_V1);
    assert_no_staging_files(repo_dir.path());
}
/// A schema apply interrupted at `schema_apply.after_manifest_commit` must
/// show the new schema once the repo is reopened.
///
/// The failpoint stands in for a crash (per the test name). Although
/// `apply_schema` returns an error, the test expects the reopened repo to
/// report `SCHEMA_V2_ADDED_TYPE` as its schema source and to have no staging
/// files left in the repo directory.
#[tokio::test]
async fn schema_apply_recovers_post_commit_crash() {
    const EXPECTED_ERROR: &str =
        "injected failpoint triggered: schema_apply.after_manifest_commit";

    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let repo_uri = repo_dir.path().to_str().unwrap().to_string();

    // Inner scope: the writer handle and the failpoint guard are both dropped
    // before the repo is reopened below.
    {
        let mut writer = Omnigraph::init(&repo_uri, SCHEMA_V1).await.unwrap();
        let _guard = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
        let err = writer.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let rendered = err.to_string();
        assert!(rendered.contains(EXPECTED_ERROR), "got: {}", err);
    }

    let reopened = Omnigraph::open(&repo_uri).await.unwrap();
    assert_eq!(reopened.schema_source(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(repo_dir.path());
}
/// Reopening a repo that has leftover `.staging` copies next to already
/// committed schema files must keep the committed schema and remove the
/// leftovers.
///
/// After a successful apply of `SCHEMA_V2_ADDED_TYPE`, the test copies
/// `_schema.ir.json` and `__schema_state.json` to `.staging` siblings by hand.
/// Presumably this mimics a crash partway through the staging-to-final rename
/// sequence (per the test name). The reopened repo must report the V2 schema
/// and contain no staging files.
#[tokio::test]
async fn schema_apply_recovers_partial_rename() {
    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let repo_uri = repo_dir.path().to_str().unwrap().to_string();

    // Apply the new schema successfully, then drop the handle.
    {
        let mut writer = Omnigraph::init(&repo_uri, SCHEMA_V1).await.unwrap();
        writer.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap();
    }

    // Fabricate the leftovers: (committed file, staging copy) pairs, copied in
    // this order.
    let root = repo_dir.path();
    for (committed, staged) in [
        ("_schema.ir.json", "_schema.ir.json.staging"),
        ("__schema_state.json", "__schema_state.json.staging"),
    ] {
        std::fs::copy(root.join(committed), root.join(staged)).unwrap();
    }

    let reopened = Omnigraph::open(&repo_uri).await.unwrap();
    assert_eq!(reopened.schema_source(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(root);
}
/// Pins the residual left behind when a mutation is interrupted at
/// `mutation.post_finalize_pre_publisher`.
///
/// 1. With the failpoint configured, an `insert_person` mutation must fail
///    with the injected-failpoint error.
/// 2. After the failpoint guard is dropped, a second `insert_person` on the
///    same handle must fail with `OmniError::Manifest` whose details are
///    `ExpectedVersionMismatch` on table key `node:Person`, with
///    `actual > expected`.
///
/// NOTE(review): the test name mentions the next writer "recovering", but the
/// assertions only pin that the next write fails with the version mismatch —
/// confirm the name still matches the intended contract.
#[tokio::test]
async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers() {
    use omnigraph::error::{ManifestConflictDetails, OmniError};

    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(repo_dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    // Step 1: the failpoint-interrupted write. The guard is scoped to this
    // block so it is dropped before the second write.
    {
        let _guard = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let err = mutate_main(
            &mut db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
        .unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("injected failpoint triggered: mutation.post_finalize_pre_publisher"),
            "unexpected error: {err}"
        );
    }

    // Step 2: a follow-up write to the same table, with no failpoint guard
    // held by this test.
    let err = mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
    )
    .await
    .unwrap_err();

    // It must be a manifest error...
    let OmniError::Manifest(conflict) = err else {
        panic!("expected Manifest error, got {err:?}");
    };

    // ...carrying the version-mismatch details. `table_key` is bound by
    // reference so `conflict.details` stays usable in the panic message.
    let Some(ManifestConflictDetails::ExpectedVersionMismatch {
        ref table_key,
        expected,
        actual,
    }) = conflict.details
    else {
        panic!(
            "expected ExpectedVersionMismatch (the documented residual), got {:?}",
            conflict.details
        );
    };

    assert_eq!(
        table_key, "node:Person",
        "drift should be on the table the failed finalize touched"
    );
    assert!(
        actual > expected,
        "Lance HEAD on the drifted table should be ahead of manifest pinned: actual={actual} expected={expected}",
    );
}
/// After an `insert_person` mutation is interrupted at
/// `mutation.post_finalize_pre_publisher`, a write of a different type must
/// still succeed.
///
/// The failed mutation's error is discarded; the test only requires that it
/// is an error. A single `Company` record is then appended through
/// `load_jsonl` and must be accepted.
#[tokio::test]
async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
    use omnigraph::loader::{LoadMode, load_jsonl};

    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(repo_dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    // Interrupted Person write. The guard is scoped to this block so it is
    // dropped before the Company load.
    {
        let _guard = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let _ = mutate_main(
            &mut db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
        .expect_err("synthetic failpoint must fire");
    }

    // Append one Company row through the JSONL loader.
    let company_row = r#"{"type": "Company", "data": {"name": "Acme"}}"#;
    load_jsonl(&mut db, company_row, LoadMode::Append)
        .await
        .expect("Company write on a non-drifted table should succeed");
}
/// A schema apply interrupted at `ensure_indices.post_stage_pre_commit_btree`
/// must not prevent later writes to a type that already existed.
///
/// The schema is extended with a new `Project` node type; the apply must fail
/// with an error naming the failpoint. An `insert_person` mutation on the
/// same handle must then succeed.
#[tokio::test]
async fn ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable() {
    let _scenario = FailScenario::setup();
    let repo_dir = tempfile::tempdir().unwrap();
    let repo_uri = repo_dir.path().to_str().unwrap().to_string();
    let mut db = Omnigraph::init(&repo_uri, helpers::TEST_SCHEMA).await.unwrap();

    // TEST_SCHEMA plus one additional node type.
    let extended_schema = format!("{}\nnode Project {{ name: String @key }}\n", helpers::TEST_SCHEMA);

    // Failed schema apply. The guard is scoped to this block so it is dropped
    // before the follow-up mutation.
    {
        let _guard = ScopedFailPoint::new("ensure_indices.post_stage_pre_commit_btree", "return");
        let err = db.apply_schema(&extended_schema).await.unwrap_err();
        let rendered = err.to_string();
        assert!(
            rendered.contains("ensure_indices.post_stage_pre_commit_btree"),
            "schema apply should fail with the synthetic failpoint error, got: {err}"
        );
    }

    // A write to the pre-existing Person type must still go through.
    mutate_main(
        &mut db,
        helpers::MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .expect("Person mutation must succeed after the failed schema apply — existing tables are not drifted");
}
/// Panics if any of the known schema staging files exists directly under
/// `repo`.
///
/// The names are checked in a fixed order and the first one found is reported
/// in the panic message. Returns normally when none of them exist.
fn assert_no_staging_files(repo: &std::path::Path) {
    const STAGING_NAMES: [&str; 3] = [
        "_schema.pg.staging",
        "_schema.ir.json.staging",
        "__schema_state.json.staging",
    ];

    // Lazily build each candidate path and stop at the first one present.
    let leftover = STAGING_NAMES
        .iter()
        .map(|name| repo.join(name))
        .find(|candidate| candidate.exists());

    if let Some(path) = leftover {
        panic!(
            "staging file {} still exists after recovery",
            path.display()
        );
    }
}