//! omnigraph-engine 0.4.1 — failpoint-injection integration tests for the
//! runtime engine of the Omnigraph graph database. See the crate
//! documentation for details on the failpoint infrastructure.
#![cfg(feature = "failpoints")]

mod helpers;

use fail::FailScenario;
use omnigraph::db::Omnigraph;
use omnigraph::failpoints::ScopedFailPoint;

use helpers::{MUTATION_QUERIES, mixed_params, mutate_main};

const SCHEMA_V1: &str = "node Person { name: String @key }\n";
const SCHEMA_V2_ADDED_TYPE: &str =
    "node Person { name: String @key }\nnode Company { name: String @key }\n";

/// A failpoint armed after the manifest branch-create step must abort
/// `branch_create` and surface the injected-failpoint message to the caller.
#[tokio::test]
async fn branch_create_failpoint_triggers() {
    let _scenario = FailScenario::setup();
    let tmp = tempfile::tempdir().unwrap();
    let repo_uri = tmp.path().to_str().unwrap();
    let mut db = Omnigraph::init(repo_uri, helpers::TEST_SCHEMA).await.unwrap();
    let _fp = ScopedFailPoint::new("branch_create.after_manifest_branch_create", "return");

    let err = db.branch_create("feature").await.unwrap_err();
    let msg = err.to_string();
    assert!(
        msg.contains("injected failpoint triggered: branch_create.after_manifest_branch_create")
    );
}

/// A failpoint on `graph_publish.before_commit_append` must abort the
/// mutation, and the caller sees the injected-failpoint error.
#[tokio::test]
async fn graph_publish_failpoint_triggers_before_commit_append() {
    let _scenario = FailScenario::setup();
    let tmp = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(tmp.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();
    let _fp = ScopedFailPoint::new("graph_publish.before_commit_append", "return");

    let params = mixed_params(&[("$name", "Eve")], &[("$age", 22)]);
    let err = mutate_main(&mut db, MUTATION_QUERIES, "insert_person", &params)
        .await
        .unwrap_err();
    let msg = err.to_string();
    assert!(msg.contains("injected failpoint triggered: graph_publish.before_commit_append"));
}

// Atomic schema apply: schema apply writes staging files first, then commits
// the manifest, then renames staging → final. Tests below inject crashes at
// the two boundaries and assert that reopening the repo yields a consistent
// state.

/// Crash injected after the staging files are written but before the
/// manifest commit: reopening the repo must keep the original schema and
/// sweep the orphaned staging files away.
#[tokio::test]
async fn schema_apply_recovers_pre_commit_crash() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    {
        let mut db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
        let _fp = ScopedFailPoint::new("schema_apply.after_staging_write", "return");
        let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("injected failpoint triggered: schema_apply.after_staging_write"),
            "got: {}",
            err
        );
    }

    // Reopen — the manifest commit never happened, so the recovery sweep
    // must delete the staging files and keep the original schema.
    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(db.schema_source(), SCHEMA_V1);
    assert_no_staging_files(dir.path());
}

/// Crash injected after the manifest commit but before the staging →
/// final renames: reopening the repo must complete the apply, so the
/// live schema is v2 and no staging files remain.
#[tokio::test]
async fn schema_apply_recovers_post_commit_crash() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    {
        let mut db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
        let _fp = ScopedFailPoint::new("schema_apply.after_manifest_commit", "return");
        let err = db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("injected failpoint triggered: schema_apply.after_manifest_commit"),
            "got: {}",
            err
        );
    }

    // Reopen — the manifest is already at the new version, so the recovery
    // sweep finishes the rename and the live schema matches v2.
    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(db.schema_source(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(dir.path());
}

/// Construct a partial-rename state by hand: the apply completed (live
/// schema is v2), but the IR and state files are duplicated back under
/// their `.staging` names, as if those two renames never finished.
/// Recovery on reopen should see that the live source matches the staged
/// state's hash and complete the remaining renames.
#[tokio::test]
async fn schema_apply_recovers_partial_rename() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    {
        let mut db = Omnigraph::init(&uri, SCHEMA_V1).await.unwrap();
        db.apply_schema(SCHEMA_V2_ADDED_TYPE).await.unwrap();
    }

    // Simulate the incomplete renames: copy the live ir/state files back
    // to their staging names.
    for (live, staged) in [
        ("_schema.ir.json", "_schema.ir.json.staging"),
        ("__schema_state.json", "__schema_state.json.staging"),
    ] {
        std::fs::copy(dir.path().join(live), dir.path().join(staged)).unwrap();
    }

    // Reopen — recovery completes the rename (the final files already hold
    // identical content) and removes the staging files.
    let db = Omnigraph::open(&uri).await.unwrap();
    assert_eq!(db.schema_source(), SCHEMA_V2_ADDED_TYPE);
    assert_no_staging_files(dir.path());
}

/// Pin the documented "finalize → publisher residual" of the
/// staged-write commit path.
///
/// `MutationStaging::finalize` runs `commit_staged` per touched table
/// sequentially before the publisher commits the manifest. Lance has no
/// multi-dataset atomic commit primitive, so a failure between the
/// per-table staged commits and the manifest commit leaves Lance HEAD
/// advanced on the touched tables with no manifest update — and the
/// next mutation surfaces `ExpectedVersionMismatch` on those tables.
///
/// This isn't a code bug we can fix without an upstream Lance change;
/// it's the documented residual (see `docs/runs.md` "Finalize →
/// publisher residual"). The test pins the behavior so future code
/// changes catch any silent regression: if someone widens the residual
/// (e.g. failing earlier in finalize without rolling back), this test
/// will surface a different error than `ExpectedVersionMismatch`. If
/// someone narrows the residual (e.g. lance ships multi-dataset commit
/// and we plumb it), this test will start passing the next mutation
/// — and someone has to update the assertion + the docs.
#[tokio::test]
async fn finalize_publisher_residual_drifts_lance_head_until_next_writer_recovers() {
    // Error types needed to destructure the residual precisely.
    use omnigraph::error::{ManifestConflictDetails, OmniError};

    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    // Failpoint scoped to this inner block so only the FIRST mutation is
    // synthetically failed; the ScopedFailPoint is dropped at the brace.
    {
        let _failpoint =
            ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");

        // First mutation: finalize succeeds (commit_staged advances Lance
        // HEAD on node:Person), then the failpoint kicks before the
        // publisher's manifest commit. The caller sees the synthetic
        // error.
        let err = mutate_main(
            &mut db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
        )
        .await
        .unwrap_err();
        assert!(
            err.to_string().contains(
                "injected failpoint triggered: mutation.post_finalize_pre_publisher"
            ),
            "unexpected error: {err}"
        );
    }
    // Failpoint dropped — subsequent calls are not synthetic-failed.

    // Next mutation against the same table surfaces the documented
    // residual: Lance HEAD on node:Person advanced (commit_staged ran),
    // manifest didn't, so the publisher CAS at next-mutation time
    // surfaces ExpectedVersionMismatch.
    let err = mutate_main(
        &mut db,
        MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Frank")], &[("$age", 33)]),
    )
    .await
    .unwrap_err();
    // Pin the exact error variant: anything other than Manifest /
    // ExpectedVersionMismatch means the residual's shape changed and the
    // docs (docs/runs.md) must be revisited alongside this test.
    let OmniError::Manifest(manifest_err) = err else {
        panic!("expected Manifest error, got {err:?}");
    };
    let Some(ManifestConflictDetails::ExpectedVersionMismatch {
        ref table_key,
        expected,
        actual,
    }) = manifest_err.details
    else {
        panic!(
            "expected ExpectedVersionMismatch (the documented residual), got {:?}",
            manifest_err.details
        );
    };
    assert_eq!(
        table_key, "node:Person",
        "drift should be on the table the failed finalize touched"
    );
    // Direction matters: Lance HEAD moved forward while the manifest pin
    // stayed put, so actual must be strictly greater than expected.
    assert!(
        actual > expected,
        "Lance HEAD on the drifted table should be ahead of manifest pinned: actual={actual} expected={expected}",
    );
}

/// Companion to the test above — a finalize→publisher failure on one
/// table must leave OTHER tables untouched: writes to non-drifted tables
/// proceed normally, so the drift stays contained to the table the
/// failed finalize touched.
#[tokio::test]
async fn finalize_publisher_residual_does_not_drift_untouched_tables() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let mut db = Omnigraph::init(dir.path().to_str().unwrap(), helpers::TEST_SCHEMA)
        .await
        .unwrap();

    // Synthetically fail one Person mutation; the failpoint is dropped at
    // the closing brace so later calls run normally.
    {
        let _fp = ScopedFailPoint::new("mutation.post_finalize_pre_publisher", "return");
        let params = mixed_params(&[("$name", "Eve")], &[("$age", 22)]);
        let _ = mutate_main(&mut db, MUTATION_QUERIES, "insert_person", &params)
            .await
            .expect_err("synthetic failpoint must fire");
    }

    // node:Person drifted; node:Company did not — a Company write must
    // still go through.
    use omnigraph::loader::{LoadMode, load_jsonl};
    load_jsonl(
        &mut db,
        r#"{"type": "Company", "data": {"name": "Acme"}}"#,
        LoadMode::Append,
    )
    .await
    .expect("Company write on a non-drifted table should succeed");
}

/// MR-793 Phase 4 acceptance bar — proves that a Phase A failure in
/// the staged-index path (`stage_create_btree_index` succeeded;
/// `commit_staged` not yet called) leaves NO Lance-HEAD drift on the
/// existing tables. Subsequent operations against those tables succeed
/// without `ExpectedVersionMismatch`.
///
/// Path: `apply_schema(v1 → v2)` adds a new node type. The
/// `added_tables` loop in `schema_apply` creates the empty dataset and
/// then calls `build_indices_on_dataset_for_catalog` →
/// `stage_and_commit_btree(..., &["id"])`. The failpoint fires
/// between `stage_create_btree_index` and `commit_staged`, so the
/// staged segments are written under `_indices/<uuid>/` but Lance HEAD
/// on the new dataset is unchanged at v=1. The schema-apply lock
/// branch is released by `apply_schema`'s outer match. Existing
/// tables (e.g. `node:Person`) are completely untouched by the new
/// node's added_tables iteration — they're outside the failed apply
/// path entirely — and we assert that mutations against them continue
/// to work.
///
/// The orphan empty dataset from the failed apply is acceptable
/// residual: it's unreferenced by `__manifest` and will be reclaimed
/// by `cleanup_old_versions` (or removed when a future apply at the
/// same target path resolves the rename).
#[tokio::test]
async fn ensure_indices_phase_a_btree_failure_leaves_existing_tables_writable() {
    let _scenario = FailScenario::setup();
    let dir = tempfile::tempdir().unwrap();
    let uri = dir.path().to_str().unwrap().to_string();

    // Init with TEST_SCHEMA which declares Person + Knows. Indices on
    // those tables get built during init.
    let mut db = Omnigraph::init(&uri, helpers::TEST_SCHEMA).await.unwrap();

    // Apply a schema that adds a new node type. The added_tables loop
    // will hit the failpoint between stage and commit on the new
    // node:Project table's btree-on-id build. (TEST_SCHEMA already
    // has Person + Company + Knows + WorksAt — pick a name that isn't
    // already declared.)
    let extended_schema = format!("{}\nnode Project {{ name: String @key }}\n", helpers::TEST_SCHEMA);

    {
        let _failpoint = ScopedFailPoint::new(
            "ensure_indices.post_stage_pre_commit_btree",
            "return",
        );
        let err = db.apply_schema(&extended_schema).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("ensure_indices.post_stage_pre_commit_btree"),
            "schema apply should fail with the synthetic failpoint error, got: {err}"
        );
    }

    // Existing tables stayed at their pre-apply versions; subsequent
    // mutations against them succeed (no Lance-HEAD drift).
    mutate_main(
        &mut db,
        helpers::MUTATION_QUERIES,
        "insert_person",
        &mixed_params(&[("$name", "Eve")], &[("$age", 22)]),
    )
    .await
    .expect("Person mutation must succeed after the failed schema apply — existing tables are not drifted");
}

/// Panics if any of the schema staging files (`_schema.pg.staging`,
/// `_schema.ir.json.staging`, `__schema_state.json.staging`) still exist
/// under `repo` — i.e. recovery failed to sweep them.
fn assert_no_staging_files(repo: &std::path::Path) {
    let staging_names = [
        "_schema.pg.staging",
        "_schema.ir.json.staging",
        "__schema_state.json.staging",
    ];
    for name in staging_names {
        let candidate = repo.join(name);
        assert!(
            !candidate.exists(),
            "staging file {} still exists after recovery",
            candidate.display()
        );
    }
}