use std::{
path::PathBuf,
sync::atomic::{AtomicU64, Ordering},
};
use super::{maintenance::CheckpointStop, *};
use crate::{
ElementId, EqualityIndex, IndexDefinition, Key, LabelId, PropertyFamily, PropertyKeyId,
PropertySubject, PropertyType,
};
/// Per-process sequence number that keeps the store paths handed out by
/// [`temp_store`] distinct from one another.
static NEXT_PATH: AtomicU64 = AtomicU64::new(0);

/// Builds a fresh store path inside the system temp directory.
///
/// The leaf name combines `name`, the current process id and the next value of
/// [`NEXT_PATH`]. Whatever already sits at that path is removed on a best-effort
/// basis; the directory itself is not created here.
fn temp_store(name: &str) -> PathBuf {
    let sequence = NEXT_PATH.fetch_add(1, Ordering::Relaxed);
    let pid = std::process::id();
    let mut location = std::env::temp_dir();
    location.push(format!("oxgraph-db-cp-{name}-{pid}-{sequence}"));
    // The removal result is ignored on purpose: the path usually does not exist.
    let _ = std::fs::remove_dir_all(&location);
    location
}
/// Number of elements `populate_large_store` creates (release builds only).
#[cfg(not(debug_assertions))]
const OPEN_LATENCY_ELEMENTS: usize = 100_000;
/// Number of relations `populate_large_store` creates; each relation receives
/// two incidences.
#[cfg(not(debug_assertions))]
const OPEN_LATENCY_RELATIONS: usize = 320_000;
/// Number of timed iterations the open-latency helpers average over.
#[cfg(not(debug_assertions))]
const OPEN_LATENCY_RUNS: u32 = 5;
/// Fills `database` with the synthetic graph used by the open-latency measurement.
///
/// Checkpointing is switched to manual, then a single write registers an
/// integer `rank` element key, an integer `weight` relation key and a `party`
/// role, creates `OPEN_LATENCY_ELEMENTS` ranked elements and
/// `OPEN_LATENCY_RELATIONS` weighted relations (each joined to two neighbouring
/// elements), and finally the database is compacted.
///
/// # Panics
///
/// Panics if the write or the compaction fails.
#[cfg(not(debug_assertions))]
fn populate_large_store(database: &mut Db) {
    database.set_checkpoint_policy(CheckpointPolicy::Manual);

    let populated = database.write(|writer| {
        let rank_key = writer.register_property_key(
            "rank",
            PropertyFamily::Element,
            PropertyType::Integer,
        )?;
        let weight_key = writer.register_property_key(
            "weight",
            PropertyFamily::Relation,
            PropertyType::Integer,
        )?;
        let party = writer.register_role("party")?;

        let mut nodes = Vec::with_capacity(OPEN_LATENCY_ELEMENTS);
        for ordinal in 0..OPEN_LATENCY_ELEMENTS {
            let node = writer.create_element()?;
            // Ranks cycle through 0..997 so many elements share a value.
            let rank_value = i64::try_from(ordinal % 997).unwrap_or(0);
            writer.set_property(
                PropertySubject::Element(node),
                rank_key,
                PropertyValue::Integer(rank_value),
            )?;
            nodes.push(node);
        }

        for ordinal in 0..OPEN_LATENCY_RELATIONS {
            let edge = writer.create_relation()?;
            // Weights cycle through 0..503.
            let weight_value = i64::try_from(ordinal % 503).unwrap_or(0);
            writer.set_property(
                PropertySubject::Relation(edge),
                weight_key,
                PropertyValue::Integer(weight_value),
            )?;
            // Each relation joins an element to its successor, wrapping around.
            let source = nodes[ordinal % OPEN_LATENCY_ELEMENTS];
            let target = nodes[(ordinal + 1) % OPEN_LATENCY_ELEMENTS];
            writer.create_incidence(edge, source, party)?;
            writer.create_incidence(edge, target, party)?;
        }

        Ok(())
    });
    populated.expect("populate");

    database.compact().expect("compact");
}
/// Returns the mean wall-clock time, in milliseconds, of `Db::open(path)` over
/// `OPEN_LATENCY_RUNS` runs.
///
/// Only the open call itself is timed; dropping the opened handle happens
/// after the clock is read.
///
/// # Panics
///
/// Panics if any open fails.
#[cfg(not(debug_assertions))]
fn mean_open_ms(path: &std::path::Path) -> f64 {
    let total: std::time::Duration = (0..OPEN_LATENCY_RUNS)
        .map(|_run| {
            let timer = std::time::Instant::now();
            let handle = Db::open(path).expect("timed open");
            let elapsed = timer.elapsed();
            drop(handle);
            elapsed
        })
        .sum();
    let total_ms = total.as_secs_f64() * 1000.0;
    total_ms / f64::from(OPEN_LATENCY_RUNS)
}
/// Returns the mean time, in milliseconds, of `BaseRecords::from_view` on the
/// store's current base file over `OPEN_LATENCY_RUNS` runs.
///
/// The base file is located through the superblock and reopened on every run;
/// opening it is outside the timed region, as is dropping the results.
///
/// # Panics
///
/// Panics if the superblock cannot be read, the base cannot be opened, or
/// `from_view` fails.
#[cfg(not(debug_assertions))]
fn mean_old_from_view_ms(path: &std::path::Path) -> f64 {
    let superblock = crate::wal::read_superblock(path).expect("superblock");
    let generation = superblock.base_generation.get();
    let base_path = path.join(base_file(generation));

    let total: std::time::Duration = (0..OPEN_LATENCY_RUNS)
        .map(|_run| {
            let base = crate::backing::Base::open(&base_path, false).expect("base open");
            let timer = std::time::Instant::now();
            let records =
                crate::overlay::BaseRecords::from_view(base.get()).expect("old from_view");
            let elapsed = timer.elapsed();
            // Release the records before the base they were built from.
            drop(records);
            drop(base);
            elapsed
        })
        .sum();
    let total_ms = total.as_secs_f64() * 1000.0;
    total_ms / f64::from(OPEN_LATENCY_RUNS)
}
/// Manual measurement: builds a large store, then prints the mean latency of a
/// full `Db::open` next to the mean latency of the older `from_view` path.
///
/// Nothing is asserted beyond the operations succeeding; the numbers are only
/// printed.
#[test]
#[ignore = "manual perf measurement; run explicitly with --release --ignored --nocapture"]
#[cfg(not(debug_assertions))]
fn open_latency_large_base() {
    let path = temp_store("open-latency");
    {
        let mut database = Db::create(&path).expect("create");
        populate_large_store(&mut database);
    }

    // Bound to a named variable, so this handle stays alive until the test ends.
    let _warm = Db::open(&path).expect("warm open");

    let after_ms = mean_open_ms(&path);
    let before_ms = mean_old_from_view_ms(&path);

    let incidence_count = OPEN_LATENCY_RELATIONS * 2;
    let property_count = OPEN_LATENCY_ELEMENTS + OPEN_LATENCY_RELATIONS;
    println!(
        "open_latency_large_base: {OPEN_LATENCY_ELEMENTS} elements, \
         {OPEN_LATENCY_RELATIONS} relations, {} incidences, {} properties",
        incidence_count, property_count,
    );
    println!("  BEFORE open work (decode + from_records rebuild): {before_ms:.1} ms / open");
    println!("  AFTER  full Db::open (decode + BORROWED index):   {after_ms:.1} ms / open");

    // Best-effort cleanup.
    let _ = std::fs::remove_dir_all(&path);
}
/// Upserting an existing key reuses its element id, upserting a new key mints
/// a fresh one, and `retain` removes every element whose key is not listed.
#[test]
fn reconcile_upserts_reuse_or_mint_and_retain_prunes_the_complement() {
    let path = temp_store("reconcile");
    let mut database = Db::create(&path).expect("create");

    // Transaction 1: register the text key and an equality index over it.
    let index_id = {
        let mut schema_tx = database.begin_write().expect("begin write");
        let stable_key = schema_tx
            .register_property_key("stable_key", PropertyFamily::Element, PropertyType::Text)
            .expect("key");
        let definition = IndexDefinition::PropertyEquality { key: stable_key };
        let defined = schema_tx
            .define_index("element_stable_key_eq", definition)
            .expect("index");
        schema_tx.commit().expect("commit schema");
        defined
    };
    let eq = EqualityIndex::<crate::Text>::from_id(index_id);

    // Transaction 2: first sighting of "a" and "b".
    let (a1, b1) = {
        let mut first_tx = database.begin_write().expect("begin write");
        let minted_a = first_tx.upsert_element(eq, "a").expect("upsert a");
        let minted_b = first_tx.upsert_element(eq, "b").expect("upsert b");
        first_tx.commit().expect("commit");
        (minted_a, minted_b)
    };

    // Transaction 3: "a" again, a new "c", then keep only those two keys.
    let (a2, c1) = {
        let mut second_tx = database.begin_write().expect("begin write");
        let reused_a = second_tx.upsert_element(eq, "a").expect("re-upsert a");
        let minted_c = second_tx.upsert_element(eq, "c").expect("upsert c");
        second_tx.retain(eq, &["a", "c"]).expect("retain");
        second_tx.commit().expect("commit");
        (reused_a, minted_c)
    };

    assert_eq!(a1, a2, "an unchanged identity reuses its element id");
    assert_ne!(c1, a1);
    assert_ne!(c1, b1);

    let read = database.reader();
    assert!(read.contains_element(a1), "kept a");
    assert!(read.contains_element(c1), "kept c");
    assert!(!read.contains_element(b1), "retain tombstoned b");

    let resolved_a = read
        .element_by_key(eq, "a")
        .expect("lookup a")
        .map(|element| element.id);
    assert_eq!(resolved_a, Some(a1));

    let b_is_gone = read.element_by_key(eq, "b").expect("lookup b").is_none();
    assert!(b_is_gone, "b is not resolvable after the prune");

    let _ = std::fs::remove_dir_all(&path);
}
/// `Db::write` commits when the closure returns `Ok`, reports `Empty` when
/// nothing was staged, and leaves the element count unchanged when the closure
/// returns `Err`.
#[test]
fn write_closure_commits_on_ok_rolls_back_on_err_and_reports_outcome() {
    // Reads the current element count through the closure-based read API.
    let element_count = |db: &Db| {
        db.read(|read| Ok(read.element_count()))
            .expect("count")
    };

    let path = temp_store("write-closure");
    let mut database = Db::create(&path).expect("create");

    // A successful closure commits and hands its value back with the outcome.
    let (id, outcome) = database
        .write(|writer| {
            let created = writer.create_element()?;
            Ok(created)
        })
        .expect("write");
    assert!(matches!(outcome, CommitOutcome::Committed(_)));
    database
        .read(|read| {
            assert!(read.contains_element(id));
            Ok(())
        })
        .expect("read");

    // A closure that stages nothing reports an empty commit.
    let ((), outcome) = database.write(|_writer| Ok(())).expect("empty write");
    assert_eq!(outcome, CommitOutcome::Empty);

    // A failing closure must not change the element count.
    let before = element_count(&database);
    let result = database.write(|writer| {
        writer.create_element()?;
        Err::<(), DbError>(DbError::Query(crate::error::QueryError::Empty))
    });
    assert!(result.is_err());
    let after = element_count(&database);
    assert_eq!(before, after, "the failed write staged nothing durable");

    let _ = std::fs::remove_dir_all(&path);
}
/// Setting a property to the value it already holds yields an `Empty` commit;
/// setting it to a different value commits and the new value is readable.
#[test]
fn re_setting_an_unchanged_property_value_is_a_no_op_commit() {
    let path = temp_store("set-noop");
    let mut database = Db::create(&path).expect("create");
    let schema = Schema::new().key::<crate::Text>("name", PropertyFamily::Element);

    // Seed one element named "alpha".
    let (id, _seed_outcome) = database
        .write(|writer| {
            let bound = writer.apply_schema(&schema)?;
            let name_key = bound.key::<crate::Text>("name")?;
            let element = writer.create_element()?;
            writer.set(element, name_key, "alpha")?;
            Ok(element)
        })
        .expect("first write");

    // Writing the identical value again must stage nothing.
    let ((), same_value_outcome) = database
        .write(|writer| {
            let bound = writer.apply_schema(&schema)?;
            let name_key = bound.key::<crate::Text>("name")?;
            writer.set(id, name_key, "alpha")?;
            Ok(())
        })
        .expect("idempotent set");
    assert_eq!(
        same_value_outcome,
        CommitOutcome::Empty,
        "re-setting the same property value must log no mutation"
    );

    // Writing a different value is a real commit.
    let ((), changed_outcome) = database
        .write(|writer| {
            let bound = writer.apply_schema(&schema)?;
            let name_key = bound.key::<crate::Text>("name")?;
            writer.set(id, name_key, "beta")?;
            Ok(())
        })
        .expect("changed set");
    assert!(matches!(changed_outcome, CommitOutcome::Committed(_)));

    // Read the stored value back through a handle bound outside any write.
    let bound = database.bind(&schema).expect("bind");
    let name = bound.key::<crate::Text>("name").expect("name key");
    let value = database
        .read(|read| {
            let stored = read
                .element(id)
                .and_then(|element| element.properties().get::<crate::Text, String>(name));
            Ok(stored)
        })
        .expect("read");
    assert_eq!(value.as_deref(), Some("beta"));

    let _ = std::fs::remove_dir_all(&path);
}
/// Re-applying an already applied schema is an `Empty` commit, and binding the
/// schema on a reopened store yields typed handles that reject a mismatched
/// value type.
#[test]
fn schema_apply_is_idempotent_and_bind_resolves_typed_handles() {
    let path = temp_store("schema");
    let mut database = Db::create(&path).expect("create");
    let schema = Schema::new()
        .label("function")
        .key::<crate::Text>("name", PropertyFamily::Element)
        .equality_index("name_eq", "name");

    // Apply the schema and create two keyed elements in the same write.
    let ((alpha, beta), _first_outcome) = database
        .write(|writer| {
            let bound = writer.apply_schema(&schema)?;
            let by_name = bound.equality_index::<crate::Text>("name_eq")?;
            let function_label = bound.label("function")?;
            let first = writer.upsert_element(by_name, "alpha")?;
            writer.add_label(first, function_label)?;
            let second = writer.upsert_element(by_name, "beta")?;
            Ok((first, second))
        })
        .expect("apply + write");
    assert_ne!(alpha, beta);

    // Applying the same schema a second time changes nothing.
    let (_bound, reapply_outcome) = database
        .write(|writer| writer.apply_schema(&schema))
        .expect("re-apply");
    assert_eq!(
        reapply_outcome,
        CommitOutcome::Empty,
        "re-applying a schema registers nothing new"
    );

    // Bind against a separately opened handle on the same path.
    let reopened = Db::open(&path).expect("open");
    let bound = reopened.bind(&schema).expect("bind");
    let name_eq = bound
        .equality_index::<crate::Text>("name_eq")
        .expect("typed index");
    let wrong_type_request = bound.equality_index::<crate::Int>("name_eq");
    assert!(
        wrong_type_request.is_err(),
        "a wrong-value-type handle request is a SchemaConflict"
    );

    let lookup = reopened
        .read(|read| read.element_by_key(name_eq, "alpha"))
        .expect("read");
    let found = lookup.expect("alpha present");
    assert_eq!(found.id, alpha);

    let _ = std::fs::remove_dir_all(&path);
}
/// Snapshot of the query-visible state captured by `read_logical`, compared
/// with `==` before and after a checkpoint or reopen.
#[derive(Debug, Eq, PartialEq)]
struct LogicalState {
    /// Every element id reported by the reader.
    elements: Vec<ElementId>,
    /// Subjects returned by the equality lookup of the fixture's `rank` key
    /// against the integer 500.
    rank_eq_500: Vec<PropertySubject>,
    /// Elements carrying the fixture's `Person` label.
    person_members: Vec<ElementId>,
}
/// Schema ids registered by `build_fixture`, needed to query the fixture data.
struct Fixture {
    /// Id of the integer `rank` element property key.
    rank: PropertyKeyId,
    /// Id of the `Person` label.
    person: LabelId,
}
/// Commits the shared test fixture into `database` and returns its schema ids.
///
/// One write transaction registers an integer `rank` element key and a
/// `Person` label, then creates eight elements with ranks 0, 100, ..., 700.
/// Elements at even positions also receive the `Person` label.
///
/// # Panics
///
/// Panics if any step of the transaction fails.
fn build_fixture(database: &mut Db) -> Fixture {
    let mut writer = database.begin_write().expect("begin write");

    let rank = writer
        .register_property_key("rank", PropertyFamily::Element, PropertyType::Integer)
        .expect("rank key");
    let person = writer.register_label("Person").expect("person label");

    for position in 0..8u64 {
        let element = writer.create_element().expect("element");
        let rank_value = i64::try_from(position).expect("index") * 100;
        writer
            .set(element, Key::<crate::Int>::from_id(rank), rank_value)
            .expect("set rank");

        let is_even = position % 2 == 0;
        if is_even {
            writer.add_label(element, person).expect("add label");
        }
    }

    writer.commit().expect("commit fixture");
    Fixture { rank, person }
}
/// Captures the fixture-relevant logical state of `database`: all element ids,
/// the subjects whose `rank` equals 500, and the members of the `Person` label.
///
/// # Panics
///
/// Panics if the rank lookup fails.
fn read_logical(database: &Db, fixture: &Fixture) -> LogicalState {
    let reader = database.reader();
    let rank_500 = PropertyValue::Integer(500);
    LogicalState {
        elements: reader.element_ids(),
        rank_eq_500: reader
            .lookup_property_equal(fixture.rank, &rank_500)
            .expect("rank lookup"),
        person_members: reader
            .snapshot
            .view()
            .elements_with_label(fixture.person),
    }
}
/// Asserts that the next element minted in `database` gets the id one past the
/// largest existing element id (or one past zero when there are no elements).
///
/// The probe writer is dropped without committing.
///
/// # Panics
///
/// Panics if the writer cannot be started, the element cannot be created, or
/// the minted id differs from the expected one.
fn assert_no_id_reuse_across_fold(database: &mut Db) {
    let existing_ids = database.reader().element_ids();
    let highest = existing_ids
        .into_iter()
        .map(|id| id.get())
        .max()
        .unwrap_or(0);
    let expected = ElementId::new(highest + 1);

    let mut probe = database.begin_write().expect("watermark probe writer");
    let minted = probe.create_element().expect("watermark probe element");
    assert_eq!(
        minted, expected,
        "the next minted id must be one past the max existing id (watermark \
         survived the fold; ids are never reused)",
    );
    drop(probe);
}
/// For every `CheckpointStop` stage: run `checkpoint_inner` with that stop,
/// drop the handle, reopen, and require the identical logical state, no id
/// reuse, and the base generation expected for that stage. A second reopen
/// must see the same state again.
#[test]
fn checkpoint_crash_matrix_recovers_exact_state() {
    let stops = [
        CheckpointStop::BeforeSuperblock,
        CheckpointStop::BeforeRotate,
        CheckpointStop::Complete,
    ];

    for stop in stops {
        let path = temp_store(&format!("crash-{stop:?}"));
        let mut database = Db::create(&path).expect("create");
        let fixture = build_fixture(&mut database);
        let before = read_logical(&database, &fixture);
        let before_generation = database.base_generation;

        // NOTE(review): presumably `checkpoint_inner` halts the checkpoint at
        // `stop`; dropping the handle afterwards stands in for the crash.
        database
            .checkpoint_inner(stop)
            .expect("checkpoint stop returns ok");
        drop(database);

        let mut recovered = Db::open(&path).expect("reopen after crash");
        let recovered_state = read_logical(&recovered, &fixture);
        assert_eq!(
            recovered_state, before,
            "crash at {stop:?} must recover the exact logical state",
        );
        assert_no_id_reuse_across_fold(&mut recovered);

        // The generation only advances once the new superblock has landed.
        let (expected_generation, reason) = match stop {
            CheckpointStop::BeforeSuperblock => (
                before_generation,
                "old superblock stays authoritative before the new one lands",
            ),
            CheckpointStop::BeforeRotate | CheckpointStop::Complete => (
                before_generation + 1,
                "the new superblock names the folded generation",
            ),
        };
        assert_eq!(recovered.base_generation, expected_generation, "{reason}");

        let reopened = Db::open(&path).expect("second reopen");
        assert_eq!(read_logical(&reopened, &fixture), before);
        drop(reopened);

        let _ = std::fs::remove_dir_all(&path);
    }
}
/// Under the `Manual` policy repeated commits never advance the live
/// generation; under `SizeRatio { factor: 1 }` they eventually do, while the
/// fixture's query results, the id watermark and the configured policy are
/// preserved.
#[test]
fn auto_checkpoint_policy_folds_when_log_outgrows_base() {
    /// Commits `commits` transactions that each create a single element.
    fn commit_single_elements(database: &mut Db, commits: usize) {
        for _ in 0..commits {
            let mut writer = database.begin_write().expect("writer");
            writer.create_element().expect("element");
            writer.commit().expect("commit");
        }
    }

    // Manual policy: the generation must stay at zero.
    let manual_path = temp_store("auto-manual");
    {
        let mut manual = Db::create(&manual_path).expect("create manual");
        manual.set_checkpoint_policy(CheckpointPolicy::Manual);
        let _fixture = build_fixture(&mut manual);
        commit_single_elements(&mut manual, 200);
        assert_eq!(
            manual.live_generation(),
            CheckpointGeneration::new(0),
            "manual policy must never auto-fold",
        );
    }
    let _ = std::fs::remove_dir_all(&manual_path);

    // Size-ratio policy: the generation must advance.
    let auto_path = temp_store("auto-ratio");
    let mut auto = Db::create(&auto_path).expect("create auto");
    auto.set_checkpoint_policy(CheckpointPolicy::SizeRatio { factor: 1 });
    let fixture = build_fixture(&mut auto);
    let before = read_logical(&auto, &fixture);
    commit_single_elements(&mut auto, 400);

    let folded = auto.live_generation() > CheckpointGeneration::new(0);
    assert!(
        folded,
        "size-ratio policy must auto-fold once the log outgrows the base",
    );

    // Only the fixture queries are compared: the element list grew by design.
    let after = read_logical(&auto, &fixture);
    assert_eq!(after.rank_eq_500, before.rank_eq_500);
    assert_eq!(after.person_members, before.person_members);
    assert_no_id_reuse_across_fold(&mut auto);
    assert_eq!(
        auto.checkpoint_policy(),
        CheckpointPolicy::SizeRatio { factor: 1 },
        "the auto-fold reopen must preserve the configured policy",
    );

    let status = auto.stats();
    assert_eq!(status.live_generation, auto.live_generation());
    assert!(status.base_byte_size > 0, "live base has bytes");

    drop(auto);
    let _ = std::fs::remove_dir_all(&auto_path);
}