mod helpers;
use std::time::Duration;
use lance::Dataset;
use lance::dataset::optimize::{CompactionOptions, compact_files};
use omnigraph::db::{
CleanupPolicyOptions, Omnigraph, ReadTarget, RepairAction, RepairClassification, RepairOptions,
SkipReason,
};
use omnigraph::loader::{LoadMode, load_jsonl};
use omnigraph::table_store::{IndexCoverage, TableStore};
use helpers::{
MUTATION_QUERIES, TEST_DATA, TEST_SCHEMA, count_rows, init_and_load, mixed_params, mutate_main,
snapshot_main,
};
/// Builds the URI of the node table for `type_name` underneath `root`.
///
/// The table directory name is the 64-bit FNV-1a hash of the type name's
/// bytes, rendered as 16 zero-padded lowercase hex digits. Trailing `/`
/// characters on `root` are stripped before the `/nodes/` segment is appended.
///
/// NOTE(review): presumably mirrors the store's own table-naming scheme — confirm.
fn node_table_uri(root: &str, type_name: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x100_0000_01b3;

    // FNV-1a: xor the byte in first, then multiply by the prime (wrapping).
    let digest = type_name.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    let base = root.trim_end_matches('/');
    format!("{base}/nodes/{digest:016x}")
}
/// Reads the `node:Person` entry from the `main` branch snapshot and opens the
/// table directly with Lance.
///
/// Returns `(manifest table_version, Lance HEAD version, full table URI)`.
/// Panics (via `unwrap`) if the snapshot, the entry, or the dataset is missing.
async fn person_manifest_and_head(db: &Omnigraph, root: &str) -> (u64, u64, String) {
    let snapshot = db.snapshot_of(ReadTarget::branch("main")).await.unwrap();
    let person_entry = snapshot.entry("node:Person").unwrap();

    let base = root.trim_end_matches('/');
    let table_uri = format!("{base}/{}", person_entry.table_path);

    // Open without pinning a version so we observe whatever Lance considers HEAD.
    let dataset = Dataset::open(&table_uri).await.unwrap();
    let head_version = dataset.version().version;

    (person_entry.table_version, head_version, table_uri)
}
/// Inserts four extra Person rows on `main`, one `insert_person` mutation each.
///
/// NOTE(review): presumably every mutation appends its own fragment, giving a
/// later compaction something to rewrite — confirm against the write path.
async fn add_person_fragments(db: &mut Omnigraph) {
    let newcomers: [(&str, i64); 4] = [("Eve", 40), ("Frank", 41), ("Grace", 42), ("Heidi", 43)];
    for (name, age) in newcomers {
        mutate_main(
            db,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", name)], &[("$age", age)]),
        )
        .await
        .expect("insert");
    }
}
/// Creates manifest/HEAD drift on the Person table by compacting it with raw
/// Lance, bypassing Omnigraph.
///
/// Adds extra Person rows first, then runs `compact_files` directly on the
/// dataset. Returns `(manifest version, Lance HEAD version after compaction,
/// full table URI)` and asserts that HEAD moved past the manifest and that the
/// compaction reported fragment changes.
async fn forge_person_compaction_drift(db: &mut Omnigraph, root: &str) -> (u64, u64, String) {
    add_person_fragments(db).await;
    let (pinned_version, _, table_uri) = person_manifest_and_head(db, root).await;

    let mut dataset = Dataset::open(&table_uri).await.unwrap();
    let compaction = compact_files(&mut dataset, CompactionOptions::default(), None)
        .await
        .expect("raw Lance compaction");
    let head_version = dataset.version().version;

    assert!(
        head_version > pinned_version,
        "raw Lance compaction should advance HEAD beyond manifest"
    );
    let rewrote_fragments = compaction.fragments_removed > 0 || compaction.fragments_added > 0;
    assert!(
        rewrote_fragments,
        "test precondition: raw compaction should rewrite fragments"
    );

    (pinned_version, head_version, table_uri)
}
/// Creates manifest/HEAD drift on the Person table by deleting the `Alice` row
/// with raw Lance, bypassing Omnigraph.
///
/// Returns `(manifest version, Lance HEAD version after the delete, full table
/// URI)`. Asserts exactly one row was deleted and that HEAD moved past the
/// manifest.
async fn forge_person_delete_drift(db: &Omnigraph, root: &str) -> (u64, u64, String) {
    let (pinned_version, _, table_uri) = person_manifest_and_head(db, root).await;

    let mut dataset = Dataset::open(&table_uri).await.unwrap();
    let outcome = dataset
        .delete("name = 'Alice'")
        .await
        .expect("raw Lance delete");
    assert_eq!(outcome.num_deleted_rows, 1, "fixture should delete Alice");

    // The post-delete version is read from the dataset handle returned by the delete.
    let head_version = outcome.new_dataset.version().version;
    assert!(
        head_version > pinned_version,
        "raw Lance delete should advance HEAD beyond manifest"
    );

    (pinned_version, head_version, table_uri)
}
/// `optimize` on a freshly initialised graph reports four stat entries and no
/// fragment changes in any of them.
///
/// NOTE(review): the expected count of 4 presumably equals the number of
/// tables declared by `TEST_SCHEMA` — confirm.
#[tokio::test]
async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() {
    let tmp = tempfile::tempdir().unwrap();
    let graph = Omnigraph::init(tmp.path().to_str().unwrap(), TEST_SCHEMA)
        .await
        .unwrap();

    let report = graph.optimize().await.unwrap();

    assert_eq!(report.len(), 4);
    report.iter().for_each(|stat| {
        assert_eq!(
            stat.fragments_removed, 0,
            "{} should not remove",
            stat.table_key
        );
        assert_eq!(stat.fragments_added, 0, "{} should not add", stat.table_key);
    });
}
/// Running `optimize` a second time right after the first must be a no-op for
/// every table: no fragments removed or added, and nothing committed.
#[tokio::test]
async fn optimize_after_load_then_again_is_idempotent() {
    let tmp = tempfile::tempdir().unwrap();
    let graph = init_and_load(&tmp).await;

    // The first pass does the real work; only the second pass is inspected.
    let _initial = graph.optimize().await.unwrap();
    let rerun = graph.optimize().await.unwrap();

    for stat in rerun.iter() {
        let table = &stat.table_key;
        assert_eq!(
            stat.fragments_removed, 0,
            "{} re-optimize should be no-op",
            table
        );
        assert_eq!(
            stat.fragments_added, 0,
            "{} re-optimize should be no-op",
            table
        );
        assert!(
            !stat.committed,
            "{} re-optimize should not commit a new version",
            table
        );
    }
}
/// After two separate merge loads the Doc table has unindexed fragments;
/// `optimize` must leave no unindexed fragments and report the `rank` column
/// as fully `Indexed`.
#[tokio::test]
async fn optimize_reindexes_fragments_appended_after_index_build() {
    const SCHEMA: &str = r#"
node Doc {
slug: String @key
rank: I32 @index
}
"#;
    let tmp = tempfile::tempdir().unwrap();
    let mut graph = Omnigraph::init(tmp.path().to_str().unwrap(), SCHEMA)
        .await
        .unwrap();

    // Two independent loads of two rows each.
    let batches = [
        concat!(
            "{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"rank\":1}}\n",
            "{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"rank\":2}}"
        ),
        concat!(
            "{\"type\":\"Doc\",\"data\":{\"slug\":\"d3\",\"rank\":3}}\n",
            "{\"type\":\"Doc\",\"data\":{\"slug\":\"d4\",\"rank\":4}}"
        ),
    ];
    for batch in batches {
        load_jsonl(&mut graph, batch, LoadMode::Merge)
            .await
            .unwrap();
    }

    // Precondition, checked in its own scope so the handles drop before optimize.
    {
        let before = snapshot_main(&graph).await.unwrap();
        let docs = before.open("node:Doc").await.unwrap();
        let has_gap = TableStore::has_unindexed_fragments(&docs).await.unwrap();
        assert!(
            has_gap,
            "appended fragment should be unindexed before optimize"
        );
    }

    graph.optimize().await.unwrap();

    let after = snapshot_main(&graph).await.unwrap();
    let docs = after.open("node:Doc").await.unwrap();
    let still_has_gap = TableStore::has_unindexed_fragments(&docs).await.unwrap();
    assert!(
        !still_has_gap,
        "optimize must extend index coverage to all fragments"
    );
    let rank_coverage = TableStore::key_column_index_coverage(&docs, "rank")
        .await
        .unwrap();
    assert_eq!(
        rank_coverage,
        IndexCoverage::Indexed,
        "rank BTREE must cover all fragments after optimize"
    );
}
/// `optimize` on a graph containing a table with a `Blob` column must succeed,
/// report that table as skipped with `BlobColumnsUnsupportedByLance` (no commit,
/// no fragment changes), and must not mark the non-blob table as skipped.
#[tokio::test]
async fn optimize_skips_blob_table_and_reports_skip() {
    let tmp = tempfile::tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();
    let schema = concat!(
        "node Doc {\n slug: String @key\n content: Blob\n}\n",
        "node Tag {\n slug: String @key\n}\n"
    );
    let mut graph = Omnigraph::init(uri, schema).await.unwrap();

    // Several separate loads per table, applied in this exact order.
    let loads = [
        (
            "{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"content\":\"base64:aGVsbG8x\"}}\n{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"content\":\"base64:aGVsbG8y\"}}",
            LoadMode::Overwrite,
        ),
        (
            "{\"type\":\"Doc\",\"data\":{\"slug\":\"d3\",\"content\":\"base64:aGVsbG8z\"}}",
            LoadMode::Merge,
        ),
        (
            "{\"type\":\"Doc\",\"data\":{\"slug\":\"d4\",\"content\":\"base64:aGVsbG80\"}}",
            LoadMode::Merge,
        ),
        (
            "{\"type\":\"Tag\",\"data\":{\"slug\":\"t1\"}}\n{\"type\":\"Tag\",\"data\":{\"slug\":\"t2\"}}",
            LoadMode::Merge,
        ),
        (
            "{\"type\":\"Tag\",\"data\":{\"slug\":\"t3\"}}",
            LoadMode::Merge,
        ),
    ];
    for (payload, mode) in loads {
        load_jsonl(&mut graph, payload, mode).await.unwrap();
    }

    let report = graph
        .optimize()
        .await
        .expect("optimize must not crash on a graph with a Blob table");

    let doc_stat = report
        .iter()
        .find(|stat| stat.table_key == "node:Doc")
        .expect("Doc stat present");
    let tag_stat = report
        .iter()
        .find(|stat| stat.table_key == "node:Tag")
        .expect("Tag stat present");

    assert_eq!(
        doc_stat.skipped,
        Some(SkipReason::BlobColumnsUnsupportedByLance),
        "blob table must be reported as skipped",
    );
    assert!(!doc_stat.committed, "skipped blob table is not compacted");
    assert_eq!(doc_stat.fragments_removed, 0);
    assert_eq!(doc_stat.fragments_added, 0);
    assert_eq!(
        tag_stat.skipped, None,
        "non-blob table must not be skipped"
    );
}
/// After `optimize` compacts the Person table, the manifest's pinned version
/// must equal the Lance HEAD, the row count must be unchanged, and an additive
/// schema apply must succeed.
#[tokio::test]
async fn optimize_publishes_compaction_to_manifest_so_schema_apply_succeeds() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut graph = init_and_load(&tmp).await;

    // Four single-row inserts on top of the fixture data.
    let newcomers: [(&str, i64); 4] = [("Eve", 40), ("Frank", 41), ("Grace", 42), ("Heidi", 43)];
    for (name, age) in newcomers {
        mutate_main(
            &mut graph,
            MUTATION_QUERIES,
            "insert_person",
            &mixed_params(&[("$name", name)], &[("$age", age)]),
        )
        .await
        .expect("insert");
    }

    let report = graph.optimize().await.unwrap();
    let person_stat = report
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person stat present");
    assert!(
        person_stat.committed,
        "Person is multi-fragment, so optimize must have compacted it"
    );

    // Compare what the manifest pins with what Lance itself reports as HEAD.
    let snapshot = graph.snapshot_of(ReadTarget::branch("main")).await.unwrap();
    let person_entry = snapshot.entry("node:Person").unwrap();
    let manifest_version = person_entry.table_version;
    let table_uri = format!("{root}/{}", person_entry.table_path);
    let lance_head = Dataset::open(&table_uri).await.unwrap().version().version;
    assert_eq!(
        manifest_version, lance_head,
        "after optimize, manifest table_version ({manifest_version}) must equal Lance HEAD ({lance_head})",
    );
    assert_eq!(count_rows(&graph, "node:Person").await, 8);

    // Additive change: one new nullable property after `age`.
    let with_nickname = TEST_SCHEMA.replace(
        " age: I32?\n}",
        " age: I32?\n nickname: String?\n}",
    );
    let outcome = graph
        .apply_schema(&with_nickname)
        .await
        .expect("additive schema apply after optimize must succeed");
    assert!(outcome.applied, "schema apply should report applied=true");
}
/// When the Person table already has HEAD ahead of the manifest, `optimize`
/// must skip it with `DriftNeedsRepair`, report both versions, and leave the
/// manifest and the Lance HEAD exactly where they were.
#[tokio::test]
async fn optimize_skips_preexisting_manifest_head_drift() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut graph = init_and_load(&tmp).await;
    let (pinned_before, head_before, _) = forge_person_compaction_drift(&mut graph, &root).await;

    let report = graph.optimize().await.unwrap();
    let person_stat = report
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person stat present");

    assert_eq!(person_stat.skipped, Some(SkipReason::DriftNeedsRepair));
    assert!(!person_stat.committed);
    assert_eq!(person_stat.manifest_version, Some(pinned_before));
    assert_eq!(person_stat.lance_head_version, Some(head_before));

    // Neither side of the drift may have moved.
    let (pinned_after, head_after, _) = person_manifest_and_head(&graph, &root).await;
    assert_eq!(
        pinned_after, pinned_before,
        "optimize must not publish uncovered drift"
    );
    assert_eq!(
        head_after, head_before,
        "optimize must not move drifted HEAD"
    );
}
/// `repair` with `confirm: false` must only report: the compaction drift is
/// classified `VerifiedMaintenance` with action `Preview`, no manifest version
/// is published, and both manifest and HEAD stay unchanged.
#[tokio::test]
async fn repair_preview_reports_verified_maintenance_drift_without_healing() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut graph = init_and_load(&tmp).await;
    let (pinned_before, head_before, _) = forge_person_compaction_drift(&mut graph, &root).await;

    let preview_only = RepairOptions {
        confirm: false,
        force: false,
    };
    let report = graph.repair(preview_only).await.unwrap();
    assert_eq!(report.manifest_version, None);

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(
        person_stat.classification,
        RepairClassification::VerifiedMaintenance
    );
    assert_eq!(person_stat.action, RepairAction::Preview);
    assert_eq!(person_stat.manifest_version, pinned_before);
    assert_eq!(person_stat.lance_head_version, head_before);

    let only_maintenance_ops = person_stat
        .operations
        .iter()
        .all(|op| op == "ReserveFragments" || op == "Rewrite");
    assert!(
        only_maintenance_ops,
        "maintenance drift should only include Lance maintenance operations: {:?}",
        person_stat.operations
    );

    // A preview must not touch either version.
    let (pinned_after, head_after, _) = person_manifest_and_head(&graph, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(head_after, head_before);
}
/// `repair` with `confirm: true` must heal compaction drift: a manifest
/// version is published, the Person table is reported `Healed`, the manifest
/// catches up to the unchanged Lance HEAD, and a later schema apply succeeds.
#[tokio::test]
async fn repair_confirm_heals_verified_maintenance_drift() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut graph = init_and_load(&tmp).await;
    let (_, head_before, _) = forge_person_compaction_drift(&mut graph, &root).await;

    let confirmed = RepairOptions {
        confirm: true,
        force: false,
    };
    let report = graph.repair(confirmed).await.unwrap();
    assert!(
        report.manifest_version.is_some(),
        "confirmed repair should publish one manifest commit"
    );

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(
        person_stat.classification,
        RepairClassification::VerifiedMaintenance
    );
    assert_eq!(person_stat.action, RepairAction::Healed);

    // The manifest now pins the drifted HEAD; HEAD itself has not moved.
    let (pinned_after, head_after, _) = person_manifest_and_head(&graph, &root).await;
    assert_eq!(pinned_after, head_before);
    assert_eq!(head_after, head_before);

    let with_nickname = TEST_SCHEMA.replace(
        " age: I32?\n}",
        " age: I32?\n nickname: String?\n}",
    );
    let outcome = graph
        .apply_schema(&with_nickname)
        .await
        .expect("strict schema apply should succeed after repair");
    assert!(outcome.applied);
}
/// A raw Lance delete is classified `Suspicious`; a confirmed but unforced
/// `repair` must refuse it, publish nothing, leave both versions untouched,
/// and reads must still return the pre-delete row count.
#[tokio::test]
async fn repair_refuses_raw_delete_without_force() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let graph = init_and_load(&tmp).await;
    let (pinned_before, head_before, _) = forge_person_delete_drift(&graph, &root).await;

    let unforced = RepairOptions {
        confirm: true,
        force: false,
    };
    let report = graph.repair(unforced).await.unwrap();
    assert_eq!(report.manifest_version, None);

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(person_stat.classification, RepairClassification::Suspicious);
    assert_eq!(person_stat.action, RepairAction::Refused);

    let reports_delete = person_stat.operations.iter().any(|op| op == "Delete");
    assert!(
        reports_delete,
        "raw Lance delete should be reported as a suspicious operation: {:?}",
        person_stat.operations
    );

    let (pinned_after, head_after, _) = person_manifest_and_head(&graph, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(head_after, head_before);

    let visible_people = count_rows(&graph, "node:Person").await;
    assert_eq!(
        visible_people, 4,
        "manifest-pinned reads should still see the pre-delete version"
    );
}
/// With `force: true`, `repair` must publish a `Suspicious` raw delete: the
/// action is `Forced`, the manifest catches up to the unchanged HEAD, and reads
/// see the post-delete row count.
#[tokio::test]
async fn repair_force_heals_suspicious_drift() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let graph = init_and_load(&tmp).await;
    let (_, head_before, _) = forge_person_delete_drift(&graph, &root).await;

    let forced = RepairOptions {
        confirm: true,
        force: true,
    };
    let report = graph.repair(forced).await.unwrap();

    let person_stat = report
        .tables
        .iter()
        .find(|stat| stat.table_key == "node:Person")
        .expect("Person repair stat present");
    assert_eq!(person_stat.classification, RepairClassification::Suspicious);
    assert_eq!(person_stat.action, RepairAction::Forced);

    let (pinned_after, head_after, _) = person_manifest_and_head(&graph, &root).await;
    assert_eq!(pinned_after, head_before);
    assert_eq!(head_after, head_before);

    let visible_people = count_rows(&graph, "node:Person").await;
    assert_eq!(
        visible_people, 3,
        "forced repair publishes the raw delete's HEAD"
    );
}
/// A merge load onto a table with unrepaired drift must fail with an error
/// that mentions `omnigraph repair`, and must leave manifest and HEAD unchanged.
#[tokio::test]
async fn non_strict_load_refuses_uncovered_drift_before_folding_it() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut graph = init_and_load(&tmp).await;
    let (pinned_before, head_before, _) = forge_person_compaction_drift(&mut graph, &root).await;

    let ivan = "{\"type\":\"Person\",\"data\":{\"name\":\"Ivan\",\"age\":44}}";
    let err = load_jsonl(&mut graph, ivan, LoadMode::Merge)
        .await
        .expect_err("merge load must not silently fold uncovered drift");

    let points_at_repair = err.to_string().contains("omnigraph repair");
    assert!(
        points_at_repair,
        "error should point at explicit repair; got: {err}"
    );

    let (pinned_after, head_after, _) = person_manifest_and_head(&graph, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(head_after, head_before);
}
/// A delete-only mutation against a drifted table must fail with an error
/// containing `expected`, must not move manifest or HEAD, and must leave all
/// eight rows visible.
#[tokio::test]
async fn delete_only_mutation_refuses_uncovered_drift_before_inline_commit() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp
        .path()
        .to_str()
        .unwrap()
        .trim_end_matches('/')
        .to_owned();
    let mut graph = init_and_load(&tmp).await;
    let (pinned_before, head_before, _) = forge_person_compaction_drift(&mut graph, &root).await;

    let err = mutate_main(
        &mut graph,
        MUTATION_QUERIES,
        "remove_person",
        &mixed_params(&[("$name", "Alice")], &[]),
    )
    .await
    .expect_err("strict delete must reject uncovered drift before delete_where");

    let is_stale_version_error = err.to_string().contains("expected");
    assert!(
        is_stale_version_error,
        "delete should fail as a strict stale-version write; got: {err}"
    );

    let (pinned_after, head_after, _) = person_manifest_and_head(&graph, &root).await;
    assert_eq!(pinned_after, pinned_before);
    assert_eq!(
        head_after, head_before,
        "delete_where must not run after the strict drift guard fails"
    );

    let visible_people = count_rows(&graph, "node:Person").await;
    assert_eq!(
        visible_people, 8,
        "manifest-pinned reads should still see all rows present before the failed delete"
    );
}
/// With a hand-written sidecar file present under `__recovery`, `optimize`
/// must return an error whose message mentions "recovery" (case-insensitive).
#[tokio::test]
async fn optimize_defers_when_recovery_sidecar_is_pending() {
    let tmp = tempfile::tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();
    let graph = init_and_load(&tmp).await;

    // Sidecar describing a pending mutation on the Person table.
    let person_path = node_table_uri(uri, "Person");
    let sidecar_json = format!(
        r#"{{
"schema_version": 1,
"operation_id": "01H000000000000000000DEFR",
"started_at": "0",
"branch": null,
"actor_id": "act-test",
"writer_kind": "Mutation",
"tables": [
{{
"table_key": "node:Person",
"table_path": "{}",
"expected_version": 1,
"post_commit_pin": 2
}}
]
}}"#,
        person_path
    );

    let recovery_dir = tmp.path().join("__recovery");
    std::fs::create_dir_all(&recovery_dir).unwrap();
    let sidecar_file = recovery_dir.join("01H000000000000000000DEFR.json");
    std::fs::write(sidecar_file, sidecar_json).unwrap();

    let err = graph
        .optimize()
        .await
        .expect_err("optimize must defer (error) while a recovery sidecar is pending");

    let mentions_recovery = err.to_string().to_lowercase().contains("recovery");
    assert!(
        mentions_recovery,
        "optimize defer error should mention recovery; got: {err}",
    );
}
/// `cleanup` called with default (empty) policy options must fail, and the
/// error text must name both `keep_versions` and `older_than`.
#[tokio::test]
async fn cleanup_without_any_policy_option_errors() {
    let tmp = tempfile::tempdir().unwrap();
    let mut graph = init_and_load(&tmp).await;

    let no_policy = CleanupPolicyOptions::default();
    let err = graph
        .cleanup(no_policy)
        .await
        .expect_err("cleanup with no policy options must error");

    let msg = err.to_string();
    let names_both_fields = msg.contains("keep_versions") && msg.contains("older_than");
    assert!(
        names_both_fields,
        "error should name the two policy fields, got: {msg}"
    );
}
/// `cleanup` with `keep_versions: Some(1)` must leave the Person row count
/// exactly as it was before the cleanup.
#[tokio::test]
async fn cleanup_keep_one_preserves_head_and_table_remains_readable() {
    let tmp = tempfile::tempdir().unwrap();
    let mut graph = init_and_load(&tmp).await;

    let people_before = count_rows(&graph, "node:Person").await;
    assert!(
        people_before > 0,
        "fixture should seed Person rows for this test to be meaningful"
    );

    let keep_latest_only = CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    };
    let _outcome = graph.cleanup(keep_latest_only).await.unwrap();

    let people_after = count_rows(&graph, "node:Person").await;
    assert_eq!(people_after, people_before);
}
/// `cleanup` with `older_than` set to a zero duration must succeed, and a
/// merge load of the fixture data afterwards must still succeed.
#[tokio::test]
async fn cleanup_older_than_zero_preserves_head() {
    let tmp = tempfile::tempdir().unwrap();
    let mut graph = init_and_load(&tmp).await;

    let everything_older_than_now = CleanupPolicyOptions {
        keep_versions: None,
        older_than: Some(Duration::from_secs(0)),
    };
    let _outcome = graph.cleanup(everything_older_than_now).await.unwrap();

    // The graph must remain writable after the cleanup.
    load_jsonl(&mut graph, TEST_DATA, LoadMode::Merge)
        .await
        .unwrap();
}
/// Running `cleanup` followed by `optimize` must keep Person and Company row
/// counts intact, and re-merging the fixture data afterwards must succeed
/// without changing the Person count.
#[tokio::test]
async fn cleanup_then_optimize_preserves_rows_and_table_remains_writable() {
    let tmp = tempfile::tempdir().unwrap();
    let mut graph = init_and_load(&tmp).await;

    let people_before = count_rows(&graph, "node:Person").await;
    let companies_before = count_rows(&graph, "node:Company").await;
    let fixture_seeded = people_before > 0 && companies_before > 0;
    assert!(
        fixture_seeded,
        "fixture should seed both Person and Company rows"
    );

    let keep_latest_only = CleanupPolicyOptions {
        keep_versions: Some(1),
        older_than: None,
    };
    graph.cleanup(keep_latest_only).await.unwrap();
    graph.optimize().await.unwrap();

    assert_eq!(count_rows(&graph, "node:Person").await, people_before);
    assert_eq!(count_rows(&graph, "node:Company").await, companies_before);

    // Merging the same fixture again must work and must not add Person rows.
    load_jsonl(&mut graph, TEST_DATA, LoadMode::Merge)
        .await
        .unwrap();
    assert_eq!(count_rows(&graph, "node:Person").await, people_before);
}
/// A Lance branch `ghost` created directly on the Person dataset must be
/// removed by `cleanup`, the Person row count on main must be unchanged, and a
/// second `cleanup` must also succeed.
#[tokio::test]
async fn cleanup_reconciles_orphaned_branch_forks() {
    let tmp = tempfile::tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap().to_owned();
    let mut graph = init_and_load(&tmp).await;

    let people_before = count_rows(&graph, "node:Person").await;
    assert!(people_before > 0, "fixture should seed Person rows");

    let person_uri = node_table_uri(&uri, "Person");

    // Stage the fork with raw Lance; scoped so the handle drops before cleanup.
    {
        let mut dataset = Dataset::open(&person_uri).await.unwrap();
        let base_version = dataset.version().version;
        dataset
            .create_branch("ghost", base_version, None)
            .await
            .unwrap();
        let branches = dataset.list_branches().await.unwrap();
        assert!(
            branches.contains_key("ghost"),
            "precondition: orphaned fork staged"
        );
    }

    graph
        .cleanup(CleanupPolicyOptions {
            keep_versions: Some(1),
            older_than: None,
        })
        .await
        .unwrap();

    {
        let dataset = Dataset::open(&person_uri).await.unwrap();
        let branches = dataset.list_branches().await.unwrap();
        assert!(
            !branches.contains_key("ghost"),
            "cleanup should reconcile the orphaned 'ghost' fork away"
        );
    }

    let people_after = count_rows(&graph, "node:Person").await;
    assert_eq!(
        people_after, people_before,
        "cleanup must not disturb main while reconciling orphans"
    );

    // Running cleanup again with the fork already gone must not fail.
    graph
        .cleanup(CleanupPolicyOptions {
            keep_versions: Some(1),
            older_than: None,
        })
        .await
        .unwrap();
}
/// On a live `feature` branch that only loaded Company data, a raw Lance
/// `feature` branch forged on the Person dataset must be removed by `cleanup`,
/// while the Company dataset's `feature` branch must survive. Row counts on
/// main must not change.
#[tokio::test]
async fn cleanup_reconciles_live_branch_orphan_fork_but_keeps_legitimate_fork() {
    let tmp = tempfile::tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap().to_owned();
    let mut graph = init_and_load(&tmp).await;

    graph.branch_create("feature").await.unwrap();
    graph
        .load_as(
            "feature",
            None,
            r#"{"type":"Company","data":{"name":"Acme"}}"#,
            LoadMode::Merge,
            None,
        )
        .await
        .unwrap();

    let person_uri = node_table_uri(&uri, "Person");
    let company_uri = node_table_uri(&uri, "Company");

    // Forge a Person fork named after the live branch, using raw Lance.
    {
        let mut dataset = Dataset::open(&person_uri).await.unwrap();
        let base_version = dataset.version().version;
        dataset
            .create_branch("feature", base_version, None)
            .await
            .unwrap();
        let branches = dataset.list_branches().await.unwrap();
        assert!(
            branches.contains_key("feature"),
            "precondition: forged orphan Person fork present on the live branch"
        );
    }

    let main_people = count_rows(&graph, "node:Person").await;
    let main_companies = count_rows(&graph, "node:Company").await;

    graph
        .cleanup(CleanupPolicyOptions {
            keep_versions: Some(1),
            older_than: None,
        })
        .await
        .unwrap();

    {
        let dataset = Dataset::open(&person_uri).await.unwrap();
        let branches = dataset.list_branches().await.unwrap();
        assert!(
            !branches.contains_key("feature"),
            "cleanup must reclaim the manifest-unreferenced Person fork on the live branch"
        );
    }
    {
        let dataset = Dataset::open(&company_uri).await.unwrap();
        let branches = dataset.list_branches().await.unwrap();
        assert!(
            branches.contains_key("feature"),
            "cleanup must NOT reclaim a legitimately-forked table on a live branch"
        );
    }

    assert_eq!(count_rows(&graph, "node:Person").await, main_people);
    assert_eq!(count_rows(&graph, "node:Company").await, main_companies);
}
/// `ensure_indices` must succeed when the indexed, nullable vector column has
/// no values at all (both loaded rows omit `embedding`).
#[tokio::test]
async fn index_build_tolerates_null_vector_rows() {
    let tmp = tempfile::tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();
    let schema = concat!(
        "node Doc {\n",
        " slug: String @key\n",
        " n: I64 @index\n",
        " embedding: Vector(8)? @index\n",
        "}\n"
    );
    let mut graph = Omnigraph::init(uri, schema).await.unwrap();

    let rows_without_embeddings = concat!(
        "{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"n\":1}}\n",
        "{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"n\":2}}"
    );
    load_jsonl(&mut graph, rows_without_embeddings, LoadMode::Merge)
        .await
        .expect("load rows with null embeddings");

    graph
        .ensure_indices()
        .await
        .expect("ensure_indices must not abort when a vector column has no trainable vectors yet");
}
/// After a schema apply that only adds `@index` to `rank`, coverage is
/// `Degraded`; `optimize` must bring it to `Indexed`.
#[tokio::test]
async fn optimize_materializes_index_declared_but_unbuilt() {
    let tmp = tempfile::tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();

    let unindexed_schema = "node Doc {\n slug: String @key\n rank: I32\n}\n";
    let mut graph = Omnigraph::init(uri, unindexed_schema).await.unwrap();

    let rows = concat!(
        "{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"rank\":1}}\n",
        "{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"rank\":2}}"
    );
    load_jsonl(&mut graph, rows, LoadMode::Merge).await.unwrap();

    let indexed_schema = "node Doc {\n slug: String @key\n rank: I32 @index\n}\n";
    graph
        .apply_schema(indexed_schema)
        .await
        .expect("index-only apply");

    // Precondition, scoped so the snapshot handles drop before optimize runs.
    {
        let before = snapshot_main(&graph).await.unwrap();
        let docs = before.open("node:Doc").await.unwrap();
        let coverage = TableStore::key_column_index_coverage(&docs, "rank")
            .await
            .unwrap();
        assert!(
            matches!(coverage, IndexCoverage::Degraded { .. }),
            "rank must be unindexed after the deferred apply"
        );
    }

    graph.optimize().await.unwrap();

    let after = snapshot_main(&graph).await.unwrap();
    let docs = after.open("node:Doc").await.unwrap();
    let coverage = TableStore::key_column_index_coverage(&docs, "rank")
        .await
        .unwrap();
    assert_eq!(
        coverage,
        IndexCoverage::Indexed,
        "optimize must build the declared-but-unbuilt rank index"
    );
}
/// Renaming `Doc` to `Item` via `@rename_from` keeps both rows but leaves the
/// `rank` index coverage `Degraded`; `optimize` must bring it to `Indexed`.
#[tokio::test]
async fn optimize_materializes_index_after_type_rename() {
    let tmp = tempfile::tempdir().unwrap();
    let uri = tmp.path().to_str().unwrap();

    let original_schema = "node Doc {\n slug: String @key\n rank: I32 @index\n}\n";
    let mut graph = Omnigraph::init(uri, original_schema).await.unwrap();

    let rows = concat!(
        "{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"rank\":1}}\n",
        "{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"rank\":2}}"
    );
    load_jsonl(&mut graph, rows, LoadMode::Merge).await.unwrap();

    let renamed_schema =
        "node Item @rename_from(\"Doc\") {\n slug: String @key\n rank: I32 @index\n}\n";
    let outcome = graph
        .apply_schema(renamed_schema)
        .await
        .expect("rename apply");
    assert!(outcome.applied);

    let items = count_rows(&graph, "node:Item").await;
    assert_eq!(items, 2, "rename must preserve rows");

    // Precondition, scoped so the snapshot handles drop before optimize runs.
    {
        let before = snapshot_main(&graph).await.unwrap();
        let table = before.open("node:Item").await.unwrap();
        let coverage = TableStore::key_column_index_coverage(&table, "rank")
            .await
            .unwrap();
        assert!(
            matches!(coverage, IndexCoverage::Degraded { .. }),
            "rank must be unindexed immediately after the rename"
        );
    }

    graph.optimize().await.unwrap();

    let after = snapshot_main(&graph).await.unwrap();
    let table = after.open("node:Item").await.unwrap();
    let coverage = TableStore::key_column_index_coverage(&table, "rank")
        .await
        .unwrap();
    assert_eq!(
        coverage,
        IndexCoverage::Indexed,
        "optimize must build the renamed table's deferred rank index"
    );
}