mod helpers;
use std::time::Duration;
use lance::Dataset;
use omnigraph::db::{CleanupPolicyOptions, Omnigraph, SkipReason};
use omnigraph::loader::{LoadMode, load_jsonl};
use helpers::{TEST_DATA, TEST_SCHEMA, count_rows, init_and_load};
fn node_table_uri(root: &str, type_name: &str) -> String {
let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
for &b in type_name.as_bytes() {
hash ^= b as u64;
hash = hash.wrapping_mul(0x100_0000_01b3);
}
format!("{}/nodes/{hash:016x}", root.trim_end_matches('/'))
}
#[tokio::test]
async fn optimize_on_empty_graph_returns_stats_per_table_with_no_changes() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let mut db = Omnigraph::init(uri, TEST_SCHEMA).await.unwrap();
let stats = db.optimize().await.unwrap();
assert_eq!(stats.len(), 4);
for s in &stats {
assert_eq!(s.fragments_removed, 0, "{} should not remove", s.table_key);
assert_eq!(s.fragments_added, 0, "{} should not add", s.table_key);
}
}
#[tokio::test]
async fn optimize_after_load_then_again_is_idempotent() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let _first = db.optimize().await.unwrap();
let second = db.optimize().await.unwrap();
for s in &second {
assert_eq!(
s.fragments_removed, 0,
"{} re-optimize should be no-op",
s.table_key
);
assert_eq!(
s.fragments_added, 0,
"{} re-optimize should be no-op",
s.table_key
);
assert!(
!s.committed,
"{} re-optimize should not commit a new version",
s.table_key
);
}
}
#[tokio::test]
async fn optimize_skips_blob_table_and_reports_skip() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap();
let schema = "\
node Doc {\n slug: String @key\n content: Blob\n}\n\
node Tag {\n slug: String @key\n}\n";
let mut db = Omnigraph::init(uri, schema).await.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d1\",\"content\":\"base64:aGVsbG8x\"}}\n{\"type\":\"Doc\",\"data\":{\"slug\":\"d2\",\"content\":\"base64:aGVsbG8y\"}}",
LoadMode::Overwrite,
)
.await
.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d3\",\"content\":\"base64:aGVsbG8z\"}}",
LoadMode::Merge,
)
.await
.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Doc\",\"data\":{\"slug\":\"d4\",\"content\":\"base64:aGVsbG80\"}}",
LoadMode::Merge,
)
.await
.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Tag\",\"data\":{\"slug\":\"t1\"}}\n{\"type\":\"Tag\",\"data\":{\"slug\":\"t2\"}}",
LoadMode::Merge,
)
.await
.unwrap();
load_jsonl(
&mut db,
"{\"type\":\"Tag\",\"data\":{\"slug\":\"t3\"}}",
LoadMode::Merge,
)
.await
.unwrap();
let stats = db
.optimize()
.await
.expect("optimize must not crash on a graph with a Blob table");
let doc = stats
.iter()
.find(|s| s.table_key == "node:Doc")
.expect("Doc stat present");
let tag = stats
.iter()
.find(|s| s.table_key == "node:Tag")
.expect("Tag stat present");
assert_eq!(
doc.skipped,
Some(SkipReason::BlobColumnsUnsupportedByLance),
"blob table must be reported as skipped",
);
assert!(!doc.committed, "skipped blob table is not compacted");
assert_eq!(doc.fragments_removed, 0);
assert_eq!(doc.fragments_added, 0);
assert_eq!(tag.skipped, None, "non-blob table must not be skipped");
}
#[tokio::test]
async fn cleanup_without_any_policy_option_errors() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let err = db
.cleanup(CleanupPolicyOptions::default())
.await
.expect_err("cleanup with no policy options must error");
let msg = format!("{}", err);
assert!(
msg.contains("keep_versions") && msg.contains("older_than"),
"error should name the two policy fields, got: {msg}"
);
}
#[tokio::test]
async fn cleanup_keep_one_preserves_head_and_table_remains_readable() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let people_before = count_rows(&db, "node:Person").await;
assert!(
people_before > 0,
"fixture should seed Person rows for this test to be meaningful"
);
let _stats = db
.cleanup(CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, people_before);
}
#[tokio::test]
async fn cleanup_older_than_zero_preserves_head() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let _stats = db
.cleanup(CleanupPolicyOptions {
keep_versions: None,
older_than: Some(Duration::from_secs(0)),
})
.await
.unwrap();
load_jsonl(&mut db, TEST_DATA, LoadMode::Merge)
.await
.unwrap();
}
#[tokio::test]
async fn cleanup_then_optimize_preserves_rows_and_table_remains_writable() {
let dir = tempfile::tempdir().unwrap();
let mut db = init_and_load(&dir).await;
let people_before = count_rows(&db, "node:Person").await;
let companies_before = count_rows(&db, "node:Company").await;
assert!(
people_before > 0 && companies_before > 0,
"fixture should seed both Person and Company rows"
);
db.cleanup(CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
db.optimize().await.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, people_before);
assert_eq!(count_rows(&db, "node:Company").await, companies_before);
load_jsonl(&mut db, TEST_DATA, LoadMode::Merge)
.await
.unwrap();
assert_eq!(count_rows(&db, "node:Person").await, people_before);
}
#[tokio::test]
async fn cleanup_reconciles_orphaned_branch_forks() {
let dir = tempfile::tempdir().unwrap();
let uri = dir.path().to_str().unwrap().to_string();
let mut db = init_and_load(&dir).await;
let people_before = count_rows(&db, "node:Person").await;
assert!(people_before > 0, "fixture should seed Person rows");
let person_uri = node_table_uri(&uri, "Person");
{
let mut ds = Dataset::open(&person_uri).await.unwrap();
let base = ds.version().version;
ds.create_branch("ghost", base, None).await.unwrap();
assert!(
ds.list_branches().await.unwrap().contains_key("ghost"),
"precondition: orphaned fork staged"
);
}
db.cleanup(CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
{
let ds = Dataset::open(&person_uri).await.unwrap();
assert!(
!ds.list_branches().await.unwrap().contains_key("ghost"),
"cleanup should reconcile the orphaned 'ghost' fork away"
);
}
assert_eq!(
count_rows(&db, "node:Person").await,
people_before,
"cleanup must not disturb main while reconciling orphans"
);
db.cleanup(CleanupPolicyOptions {
keep_versions: Some(1),
older_than: None,
})
.await
.unwrap();
}