#![recursion_limit = "512"]
mod helpers;
use helpers::cost::{
IoCounts, assert_flat, assert_grows, cost_harness, last_manifest_reads, local_graph, measure,
measure_insert, measure_insert_as, measure_with_staged,
};
use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixed_params};
/// Cost gate: `__manifest` reads per measured insert must stay flat as the
/// commit history grows from ~10 to ~100, when `optimize()` runs before each
/// measured insert.
///
/// The whole scenario runs inside `cost_harness`, and every commit plus the
/// measured insert is attributed to the same actor id.
#[tokio::test]
async fn internal_table_scans_are_flat_in_history() {
    const ACTOR: &str = "act-cost-gate";
    cost_harness(async {
        let tmp = tempfile::tempdir().unwrap();
        let mut graph = local_graph(&tmp).await;
        let mut samples: Vec<(u64, IoCounts)> = Vec::new();
        let mut depth_so_far = 0u64;
        for d in [10u64, 100] {
            // Top the history up only when the target depth is ahead of what
            // has already been committed.
            let missing = d.saturating_sub(depth_so_far);
            if missing > 0 {
                commit_many_as(&mut graph, missing as usize, ACTOR).await;
                depth_so_far = d;
            }
            // Presumably compaction (the sibling test is the "without
            // compaction" variant and omits this call) — TODO confirm.
            graph.optimize().await.unwrap();
            let label = format!("lock_{d}");
            let io = measure_insert_as(&mut graph, &label, ACTOR).await;
            // Account for the measured insert in the running depth.
            depth_so_far += 1;
            eprintln!(
                "depth~{d}: data={} __manifest={}",
                io.data_reads, io.manifest_reads
            );
            samples.push((d, io));
        }
        // NOTE(review): `4` looks like the allowed spread between samples —
        // confirm against `assert_flat`.
        assert_flat(&samples, |io| io.manifest_reads, 4, "__manifest scan");
    })
    .await;
}
/// Counterpart to the flat-history gate: with no `optimize()` call between
/// commits, `__manifest` reads per measured insert are expected to grow as the
/// history deepens from ~10 to ~100.
///
/// Runs inside `cost_harness`; all writes are attributed to one actor id.
#[tokio::test]
async fn internal_table_scans_grow_without_compaction() {
    const ACTOR: &str = "act-cost-gate-served";
    cost_harness(async {
        let tmp = tempfile::tempdir().unwrap();
        let mut graph = local_graph(&tmp).await;
        let mut samples: Vec<(u64, IoCounts)> = Vec::new();
        let mut depth_so_far = 0u64;
        for d in [10u64, 100] {
            // Commit just enough to reach the target depth; skip if already there.
            let missing = d.saturating_sub(depth_so_far);
            if missing > 0 {
                commit_many_as(&mut graph, missing as usize, ACTOR).await;
                depth_so_far = d;
            }
            let label = format!("served_{d}");
            let io = measure_insert_as(&mut graph, &label, ACTOR).await;
            // Account for the measured insert in the running depth.
            depth_so_far += 1;
            eprintln!(
                "depth~{d} (uncompacted): data={} __manifest={}",
                io.data_reads, io.manifest_reads
            );
            samples.push((d, io));
        }
        // NOTE(review): `20` looks like the minimum required growth — confirm
        // against `assert_grows`.
        assert_grows(
            &samples,
            |io| io.manifest_reads,
            20,
            "__manifest scan (uncompacted/served)",
        );
    })
    .await;
}
/// Checks that data-table reads are classified into two buckets with
/// different growth: `data_opener_reads` stays flat across history depths
/// ~10 and ~100, while `data_scan_reads` grows.
///
/// A guard assertion first requires the opener bucket to be non-zero, so the
/// flatness check cannot pass vacuously.
#[tokio::test]
async fn data_table_reads_split_into_flat_opener_and_growing_scan() {
    let tmp = tempfile::tempdir().unwrap();
    let mut graph = local_graph(&tmp).await;
    let mut samples: Vec<(u64, IoCounts)> = Vec::new();
    let mut depth_so_far = 0u64;
    for d in [10u64, 100] {
        // Extend the history up to the target depth when it is still short.
        if depth_so_far < d {
            let missing = (d - depth_so_far) as usize;
            commit_many(&mut graph, missing).await;
            depth_so_far = d;
        }
        let label = format!("split_{d}");
        let io = measure_insert(&mut graph, &label).await;
        // Account for the measured insert in the running depth.
        depth_so_far += 1;
        eprintln!(
            "depth~{d}: opener={} scan={} data_total={}",
            io.data_opener_reads, io.data_scan_reads, io.data_reads
        );
        samples.push((d, io));
    }

    // Guard: a zero opener count at the shallow depth would make the
    // flatness assertion below meaningless.
    let shallow = &samples[0].1;
    assert!(
        shallow.data_opener_reads > 0,
        "opener reads must be > 0 — the classifier missed version-resolution reads, \
         so a flat opener assertion would be vacuous"
    );
    assert_flat(&samples, |io| io.data_opener_reads, 4, "local data-table opener");
    assert_grows(&samples, |io| io.data_scan_reads, 20, "local data-table scan");
}
/// After five prior commits, a single measured insert must perform at most
/// four data-table writes.
#[tokio::test]
async fn single_insert_data_write_is_bounded() {
    let tmp = tempfile::tempdir().unwrap();
    let mut graph = local_graph(&tmp).await;
    commit_many(&mut graph, 5).await;

    let measured = measure_insert(&mut graph, "w").await;
    let writes = measured.data_writes;
    eprintln!("single insert: data_writes={}", writes);
    assert!(
        writes <= 4,
        "data-table write_iops should be a small constant, got {}",
        writes
    );
}
/// Absolute ceilings on the read cost of one insert at a shallow history
/// depth (five prior commits), measured inside `cost_harness`:
///
/// * `__manifest` reads must not exceed `MANIFEST_CEILING`;
/// * total reads must not exceed `CEILING`.
///
/// On a manifest-ceiling failure, the output of `last_manifest_reads()` is
/// included in the panic message.
#[tokio::test]
async fn write_op_count_ceiling_at_shallow_depth() {
    cost_harness(async {
        const MANIFEST_CEILING: u64 = 24;
        const CEILING: u64 = 80;

        let tmp = tempfile::tempdir().unwrap();
        let mut graph = local_graph(&tmp).await;
        commit_many(&mut graph, 5).await;

        let io = measure_insert(&mut graph, "ceil").await;
        let manifest_reads = io.manifest_reads;
        let total_reads = io.total_reads();
        eprintln!(
            "depth~5: data={} __manifest={} total_reads={}",
            io.data_reads, manifest_reads, total_reads
        );

        // `last_manifest_reads()` is a message argument, so it is only
        // evaluated when the assertion fails.
        assert!(
            manifest_reads <= MANIFEST_CEILING,
            "per-write __manifest reads {} exceeded ceiling {MANIFEST_CEILING} — a publish-path \
             scan was re-added (RFC-013 P2 folds them into one). Reads: {:#?}",
            manifest_reads,
            last_manifest_reads(),
        );
        assert!(
            total_reads <= CEILING,
            "per-write read ops {} exceeded ceiling {CEILING} — a new round-trip was added",
            total_reads
        );
    })
    .await;
}
/// Runs one `insert_person` mutation on `main` and checks the staged-operation
/// counters reported by `measure_with_staged`: exactly one merge-insert, no
/// append, and no vector-index creation.
#[tokio::test]
async fn keyed_insert_routes_through_merge_insert_only() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = local_graph(&tmp).await;

    let params = mixed_params(&[("$name", "fit")], &[("$age", 30)]);
    let mutation = db.mutate("main", MUTATION_QUERIES, "insert_person", &params);
    let (outcome, _io, staged) = measure_with_staged(mutation).await;
    // The mutation itself must succeed before the counters mean anything.
    outcome.unwrap();

    assert_eq!(
        staged.stage_merge_insert, 1,
        "keyed Person insert stages one merge-insert"
    );
    assert_eq!(
        staged.stage_append, 0,
        "keyed insert must not stage_append"
    );
    assert_eq!(
        staged.create_vector_index, 0,
        "no inline vector-index build on a plain insert"
    );
}
/// Opens the graph through a `CountingStorageAdapter` and checks that one
/// `insert_person` mutation performs exactly 3 `read_text` calls and 2
/// `exists` probes on the storage layer.
///
/// Counters are sampled just before and just after the mutation, so work done
/// while opening the graph is excluded from the deltas.
#[tokio::test]
async fn write_validates_schema_contract_once() {
    use omnigraph::{instrumentation::CountingStorageAdapter, storage::storage_for_uri};

    let dir = tempfile::tempdir().unwrap();
    // Initialise and load the graph on disk; the returned value is discarded
    // right away and the graph is reopened below with a counting adapter.
    drop(init_and_load(&dir).await);

    let uri = dir.path().to_str().unwrap();
    let backing = storage_for_uri(uri).unwrap();
    let (adapter, counts) = CountingStorageAdapter::new(backing);
    let db = omnigraph::db::Omnigraph::open_with_storage(uri, adapter)
        .await
        .unwrap();

    // Baseline snapshot taken after open, before the write under test.
    let (read_text_before, exists_before) = (counts.read_text(), counts.exists());

    let params = mixed_params(&[("$name", "schema_once")], &[("$age", 30)]);
    db.mutate("main", MUTATION_QUERIES, "insert_person", &params)
        .await
        .unwrap();

    let read_text_delta = counts.read_text() - read_text_before;
    let exists_delta = counts.exists() - exists_before;
    eprintln!("schema-contract reads on one write: read_text={read_text_delta} exists={exists_delta}");

    assert_eq!(
        read_text_delta, 3,
        "a write must validate the schema contract once (3 reads), not N times",
    );
    assert_eq!(
        exists_delta, 2,
        "a write must probe contract-file existence once (2 probes), not N times",
    );
}
/// Measures one `insert_person` mutation on `main` and requires that the
/// reported `data_open_count` is at most 1. The `internal_open_count` is
/// logged but not asserted on.
#[tokio::test]
async fn keyed_insert_opens_table_at_most_once() {
    let tmp = tempfile::tempdir().unwrap();
    let mut db = local_graph(&tmp).await;

    let params = mixed_params(&[("$name", "opens")], &[("$age", 30)]);
    let (outcome, io) =
        measure(db.mutate("main", MUTATION_QUERIES, "insert_person", &params)).await;
    // The write must succeed for the open counts to be meaningful.
    outcome.unwrap();

    eprintln!(
        "data_open_count={} internal_open_count={} for a single-table keyed insert",
        io.data_open_count, io.internal_open_count
    );
    assert!(
        io.data_open_count <= 1,
        "a keyed single-table write must open its data table at most once, got {}",
        io.data_open_count,
    );
}
/// Compares `__manifest` reads for the same scenario (three commits, then one
/// measured insert) measured twice: once outside `cost_harness` ("fresh") and
/// once inside it ("ground truth").
///
/// The in-harness count must be strictly greater than the fresh count. Per
/// the assertion message, the difference is attributed to the
/// warm-coordinator probe, which would otherwise escape the tracker.
#[tokio::test]
async fn manifest_reads_capture_warm_probe() {
    // Baseline measured without the harness, on its own temporary graph.
    let fresh = {
        let tmp = tempfile::tempdir().unwrap();
        let mut graph = local_graph(&tmp).await;
        commit_many(&mut graph, 3).await;
        let baseline = measure_insert(&mut graph, "fresh").await;
        eprintln!("fresh-only warm write: __manifest={}", baseline.manifest_reads);
        baseline.manifest_reads
    };

    // `async move` carries the baseline count into the harness future.
    cost_harness(async move {
        let tmp = tempfile::tempdir().unwrap();
        let mut graph = local_graph(&tmp).await;
        commit_many(&mut graph, 3).await;
        let observed = measure_insert(&mut graph, "ground_truth").await;
        eprintln!("ground-truth warm write: __manifest={}", observed.manifest_reads);
        assert!(
            observed.manifest_reads > fresh,
            "ground-truth __manifest reads {} must exceed fresh-only {fresh} by the \
             warm-coordinator probe's RPCs — else the warm-handle probe is escaping the \
             tracker (the blind spot this guards). Reads: {:#?}",
            observed.manifest_reads,
            last_manifest_reads(),
        );
    })
    .await;
}