#![recursion_limit = "512"]
mod helpers;
use helpers::cost::{
IoCounts, assert_flat, assert_grows, local_graph, measure, measure_insert, measure_insert_as,
measure_with_staged,
};
use helpers::{MUTATION_QUERIES, commit_many, commit_many_as, init_and_load, mixed_params};
#[tokio::test]
async fn internal_table_scans_are_flat_in_history() {
const ACTOR: &str = "act-cost-gate";
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 100] {
if d > current {
commit_many_as(&mut db, (d - current) as usize, ACTOR).await;
current = d;
}
db.optimize().await.unwrap();
let io = measure_insert_as(&mut db, &format!("lock_{d}"), ACTOR).await;
current += 1; eprintln!(
"depth~{d}: data={} __manifest={} _graph_commits+actors={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads
);
curve.push((d, io));
}
assert_flat(&curve, |c| c.manifest_reads, 4, "__manifest scan");
assert_flat(&curve, |c| c.commit_graph_reads, 4, "_graph_commits + _graph_commit_actors scan");
}
#[tokio::test]
async fn data_table_reads_split_into_flat_opener_and_growing_scan() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let mut curve: Vec<(u64, IoCounts)> = Vec::new();
let mut current = 0u64;
for d in [10u64, 100] {
if d > current {
commit_many(&mut db, (d - current) as usize).await;
current = d;
}
let io = measure_insert(&mut db, &format!("split_{d}")).await;
current += 1;
eprintln!(
"depth~{d}: opener={} scan={} data_total={}",
io.data_opener_reads, io.data_scan_reads, io.data_reads
);
curve.push((d, io));
}
assert!(
curve[0].1.data_opener_reads > 0,
"opener reads must be > 0 — the classifier missed version-resolution reads, \
so a flat opener assertion would be vacuous"
);
assert_flat(&curve, |c| c.data_opener_reads, 4, "local data-table opener");
assert_grows(&curve, |c| c.data_scan_reads, 20, "local data-table scan");
}
#[tokio::test]
async fn single_insert_data_write_is_bounded() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 5).await;
let io = measure_insert(&mut db, "w").await;
eprintln!("single insert: data_writes={}", io.data_writes);
assert!(io.data_writes <= 4, "data-table write_iops should be a small constant, got {}", io.data_writes);
}
#[tokio::test]
async fn write_op_count_ceiling_at_shallow_depth() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
commit_many(&mut db, 5).await;
let io = measure_insert(&mut db, "ceil").await;
eprintln!(
"depth~5: data={} __manifest={} _graph_commits={} total_reads={}",
io.data_reads, io.manifest_reads, io.commit_graph_reads, io.total_reads()
);
const CEILING: u64 = 80;
assert!(
io.total_reads() <= CEILING,
"per-write read ops {} exceeded ceiling {CEILING} — a new round-trip was added",
io.total_reads()
);
}
#[tokio::test]
async fn keyed_insert_routes_through_merge_insert_only() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let (res, _io, staged) = measure_with_staged(db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "fit")], &[("$age", 30)]),
))
.await;
res.unwrap();
assert_eq!(staged.stage_merge_insert, 1, "keyed Person insert stages one merge-insert");
assert_eq!(staged.stage_append, 0, "keyed insert must not stage_append");
assert_eq!(staged.create_vector_index, 0, "no inline vector-index build on a plain insert");
}
#[tokio::test]
async fn write_validates_schema_contract_once() {
use omnigraph::instrumentation::CountingStorageAdapter;
use omnigraph::storage::storage_for_uri;
let dir = tempfile::tempdir().unwrap();
let _ = init_and_load(&dir).await;
let uri = dir.path().to_str().unwrap();
let (adapter, counts) = CountingStorageAdapter::new(storage_for_uri(uri).unwrap());
let db = omnigraph::db::Omnigraph::open_with_storage(uri, adapter)
.await
.unwrap();
let before_read_text = counts.read_text();
let before_exists = counts.exists();
db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "schema_once")], &[("$age", 30)]),
)
.await
.unwrap();
let read_text_delta = counts.read_text() - before_read_text;
let exists_delta = counts.exists() - before_exists;
eprintln!("schema-contract reads on one write: read_text={read_text_delta} exists={exists_delta}");
assert_eq!(
read_text_delta, 3,
"a write must validate the schema contract once (3 reads), not N times",
);
assert_eq!(
exists_delta, 2,
"a write must probe contract-file existence once (2 probes), not N times",
);
}
#[tokio::test]
async fn keyed_insert_opens_table_at_most_once() {
let dir = tempfile::tempdir().unwrap();
let mut db = local_graph(&dir).await;
let io = {
let (res, io) = measure(db.mutate(
"main",
MUTATION_QUERIES,
"insert_person",
&mixed_params(&[("$name", "opens")], &[("$age", 30)]),
))
.await;
res.unwrap();
io
};
eprintln!(
"data_open_count={} internal_open_count={} for a single-table keyed insert",
io.data_open_count, io.internal_open_count
);
assert!(
io.data_open_count <= 1,
"a keyed single-table write must open its data table at most once, got {}",
io.data_open_count,
);
}