use std::io::Write;
use std::path::{Path, PathBuf};
use lora_database::{
resolve_database_path, Database, DatabaseName, DatabaseOpenOptions, ExecuteOptions,
ResultFormat,
};
use lora_store::{MutationEvent, Properties, PropertyValue};
use lora_wal::{Lsn, SyncMode, Wal, WalConfig};
struct TmpDir {
path: PathBuf,
}
impl TmpDir {
fn new(tag: &str) -> Self {
let mut path = std::env::temp_dir();
path.push(format!(
"lora-db-wal-{}-{}-{}",
tag,
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
std::fs::create_dir_all(&path).unwrap();
Self { path }
}
fn path(&self) -> &Path {
&self.path
}
}
impl Drop for TmpDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.path);
}
}
fn copy_dir_all(from: &Path, to: &Path) {
std::fs::create_dir_all(to).unwrap();
for entry in std::fs::read_dir(from).unwrap() {
let entry = entry.unwrap();
let from_path = entry.path();
let to_path = to.join(entry.file_name());
if from_path.is_dir() {
copy_dir_all(&from_path, &to_path);
} else {
std::fs::copy(&from_path, &to_path).unwrap();
}
}
}
fn write_archive_without_manifest(path: &Path) {
let file = std::fs::File::create(path).unwrap();
let mut zip = zip::ZipWriter::new(file);
let options = zip::write::FileOptions::default()
.compression_method(zip::CompressionMethod::Stored)
.unix_permissions(0o644);
zip.start_file("wal/0000000001.wal", options).unwrap();
zip.write_all(b"not-a-real-wal").unwrap();
zip.finish().unwrap();
}
fn rows() -> Option<ExecuteOptions> {
Some(ExecuteOptions {
format: ResultFormat::Rows,
})
}
fn enabled(dir: &Path) -> WalConfig {
WalConfig::Enabled {
dir: dir.to_path_buf(),
sync_mode: SyncMode::PerCommit,
segment_target_bytes: 8 * 1024 * 1024,
}
}
fn group_enabled(dir: &Path) -> WalConfig {
WalConfig::Enabled {
dir: dir.to_path_buf(),
sync_mode: SyncMode::Group {
interval_ms: 60_000,
},
segment_target_bytes: 8 * 1024 * 1024,
}
}
#[test]
fn disabled_config_behaves_like_in_memory() {
let db = Database::open_with_wal(WalConfig::Disabled).unwrap();
db.execute("CREATE (:User {id: 1})", rows()).unwrap();
assert_eq!(db.node_count(), 1);
assert!(db.wal().is_none());
}
#[test]
fn database_name_validation_accepts_only_portable_names() {
for valid in [
"app",
"app.loradb",
"tenant_01",
"tenant+01",
"a-b",
"A123",
"./database-dir/application",
"database_dir/app.loradb",
] {
assert!(
DatabaseName::parse(valid).is_ok(),
"{valid} should be valid"
);
}
for invalid in [
"",
".",
"..",
"../x",
"/absolute/app",
"x//y",
"a-b.c",
"app.txt",
"has space",
"ümlaut",
] {
assert!(
DatabaseName::parse(invalid).is_err(),
"{invalid:?} should be invalid"
);
}
}
#[test]
fn named_database_resolves_to_lora_root_under_database_dir() {
let dir = TmpDir::new("named-path");
let path = resolve_database_path("app_01", dir.path()).unwrap();
assert_eq!(path, dir.path().join("app_01.loradb"));
let path = resolve_database_path("./tenant-a/application", dir.path()).unwrap();
assert_eq!(path, dir.path().join("tenant-a").join("application.loradb"));
let path = resolve_database_path("tenant_b/app.loradb", dir.path()).unwrap();
assert_eq!(path, dir.path().join("tenant_b").join("app.loradb"));
}
#[test]
fn named_database_persists_under_lora_root() {
let dir = TmpDir::new("named-recover");
{
let db = Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
db.execute("CREATE (:User {id: 1})", rows()).unwrap();
}
assert!(
dir.path().join("app.loradb").is_file(),
"named databases should persist as a portable .loradb archive file"
);
assert!(
!dir.path().join("app.loradb.wal").exists(),
"clean shutdown should remove the durable sidecar after archiving"
);
let bytes = std::fs::read(dir.path().join("app.loradb")).unwrap();
assert_eq!(&bytes[..4], b"PK\x03\x04");
let file = std::fs::File::open(dir.path().join("app.loradb")).unwrap();
let mut zip = zip::ZipArchive::new(file).unwrap();
assert!(zip.by_name("manifest.json").is_ok());
assert!(zip.by_name("wal/0000000001.wal").is_ok());
let db = Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
assert_eq!(db.node_count(), 1);
}
#[test]
fn named_database_recovers_from_durable_sidecar_when_archive_lags() {
let dir = TmpDir::new("named-sidecar-recover");
let archive_path = dir.path().join("app.loradb");
let sidecar_path = dir.path().join("app.loradb.wal");
let saved_sidecar = dir.path().join("saved-sidecar");
{
let db = Database::open_named(
"app",
DatabaseOpenOptions {
sync_mode: SyncMode::PerCommit,
..DatabaseOpenOptions::default().with_database_dir(dir.path())
},
)
.unwrap();
db.execute("CREATE (:N {id: 1})", rows()).unwrap();
}
let stale_archive = std::fs::read(&archive_path).unwrap();
{
let db = Database::open_named(
"app",
DatabaseOpenOptions {
sync_mode: SyncMode::PerCommit,
..DatabaseOpenOptions::default().with_database_dir(dir.path())
},
)
.unwrap();
db.execute("CREATE (:N {id: 2})", rows()).unwrap();
copy_dir_all(&sidecar_path, &saved_sidecar);
}
std::fs::write(&archive_path, stale_archive).unwrap();
copy_dir_all(&saved_sidecar, &sidecar_path);
{
let db = Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
assert_eq!(db.node_count(), 2);
}
assert!(
!sidecar_path.exists(),
"clean recovery shutdown should archive and remove the sidecar"
);
}
#[test]
fn named_database_rejects_invalid_archive_without_publishing_partial_sidecar() {
let dir = TmpDir::new("named-invalid-archive");
let archive_path = dir.path().join("app.loradb");
let sidecar_path = dir.path().join("app.loradb.wal");
write_archive_without_manifest(&archive_path);
let err = match Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
) {
Ok(_) => panic!("invalid archive should fail to open"),
Err(err) => err,
};
assert!(
err.to_string().contains("manifest"),
"unexpected error: {err}"
);
assert!(
!sidecar_path.exists(),
"failed archive extraction must not leave a partial sidecar"
);
}
#[test]
fn named_database_rejects_concurrent_archive_open() {
let dir = TmpDir::new("named-concurrent-open");
let first = Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
let err = match Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
) {
Ok(_) => panic!("second archive open should fail"),
Err(err) => err,
};
assert!(
err.to_string().contains("already open"),
"unexpected error: {err}"
);
drop(first);
let reopened = Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
assert_eq!(reopened.node_count(), 0);
}
#[test]
fn named_database_recovers_write_burst_from_zip_archive() {
let dir = TmpDir::new("named-burst");
{
let db = Database::open_named(
"burst",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
for i in 0..250 {
db.execute(&format!("CREATE (:Burst {{id: {i}}})"), rows())
.unwrap();
}
assert_eq!(db.node_count(), 250);
}
let archive_path = dir.path().join("burst.loradb");
assert!(archive_path.is_file());
let file = std::fs::File::open(&archive_path).unwrap();
let mut zip = zip::ZipArchive::new(file).unwrap();
assert!(zip.by_name("manifest.json").is_ok());
assert!(zip.by_name("wal/0000000001.wal").is_ok());
let db = Database::open_named(
"burst",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
assert_eq!(db.node_count(), 250);
let result = db
.execute("MATCH (n:Burst) RETURN n.id AS id ORDER BY id", rows())
.unwrap();
let json = serde_json::to_value(&result).unwrap();
let row_array = json["rows"].as_array().expect("rows array");
assert_eq!(row_array.first().unwrap()["id"], serde_json::json!(0));
assert_eq!(row_array.last().unwrap()["id"], serde_json::json!(249));
}
#[test]
fn named_database_final_archive_flush_captures_group_buffer() {
let dir = TmpDir::new("named-group-final-flush");
{
let db = Database::open_named(
"app",
DatabaseOpenOptions {
database_dir: dir.path().to_path_buf(),
sync_mode: SyncMode::Group {
interval_ms: 60_000,
},
..DatabaseOpenOptions::default()
},
)
.unwrap();
db.execute(
"CREATE (:Person {name: 'Ada'})-[:KNOWS]->(:Person {name: 'Grace'})",
rows(),
)
.unwrap();
std::thread::sleep(std::time::Duration::from_millis(1_200));
}
let db = Database::open_named(
"app",
DatabaseOpenOptions::default().with_database_dir(dir.path()),
)
.unwrap();
assert_eq!(db.node_count(), 2);
assert_eq!(db.relationship_count(), 1);
}
#[test]
fn fresh_open_then_crash_recover_replays_committed_writes() {
let dir = TmpDir::new("recover");
{
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
db.execute("CREATE (:User {id: 1, name: 'alice'})", rows())
.unwrap();
db.execute("CREATE (:User {id: 2, name: 'bob'})", rows())
.unwrap();
}
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
assert_eq!(db.node_count(), 2);
let result = db
.execute("MATCH (u:User) RETURN u.id AS id ORDER BY id", rows())
.unwrap();
let json = serde_json::to_value(&result).unwrap();
let row_array = json["rows"].as_array().expect("rows array");
assert_eq!(row_array.len(), 2);
assert_eq!(row_array[0]["id"], serde_json::json!(1));
assert_eq!(row_array[1]["id"], serde_json::json!(2));
}
#[test]
fn read_only_queries_dont_block_recovery() {
let dir = TmpDir::new("read-only");
{
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
db.execute("CREATE (:Tag {v: 1})", rows()).unwrap();
for _ in 0..5 {
db.execute("MATCH (t:Tag) RETURN t", rows()).unwrap();
}
db.execute("CREATE (:Tag {v: 2})", rows()).unwrap();
}
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
assert_eq!(db.node_count(), 2);
}
fn wal_bytes(dir: &Path) -> u64 {
let mut total = 0u64;
for entry in std::fs::read_dir(dir).unwrap().flatten() {
let p = entry.path();
if p.extension().and_then(|s| s.to_str()) == Some("wal") {
total += std::fs::metadata(&p).unwrap().len();
}
}
total
}
#[test]
fn read_only_queries_do_not_grow_wal_or_advance_lsn() {
let dir = TmpDir::new("ro-no-grow");
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
db.execute("CREATE (:Tag {v: 1})", rows()).unwrap();
let bytes_before = wal_bytes(dir.path());
let lsn_before = db.wal().unwrap().wal().next_lsn();
for _ in 0..200 {
db.execute("MATCH (t:Tag) RETURN t", rows()).unwrap();
}
let bytes_after = wal_bytes(dir.path());
let lsn_after = db.wal().unwrap().wal().next_lsn();
assert_eq!(
bytes_before,
bytes_after,
"200 read-only queries grew the WAL by {} bytes",
bytes_after.saturating_sub(bytes_before)
);
assert_eq!(
lsn_before, lsn_after,
"200 read-only queries advanced next_lsn from {} to {}",
lsn_before, lsn_after
);
}
#[test]
fn aborted_query_does_not_persist_partial_mutation() {
let dir = TmpDir::new("aborted");
{
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
db.execute("CREATE (:User {id: 1})", rows()).unwrap();
let bad = db.execute("MATCH (u:User) CREATE (u)-[:KNOWS]->(missing)", rows());
let _ = bad;
}
let recovered = Database::open_with_wal(enabled(dir.path())).unwrap();
let count = recovered.node_count();
drop(recovered);
let again = Database::open_with_wal(enabled(dir.path())).unwrap();
assert_eq!(again.node_count(), count);
}
#[test]
fn failed_mutating_query_poisons_live_wal_handle_until_restart() {
let dir = TmpDir::new("abort-poisons-live");
{
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
let err = db
.execute("CREATE (a)-[:R]->(b) WITH a DELETE a", rows())
.unwrap_err();
assert!(
err.to_string().contains("WAL poisoned"),
"expected the failed mutating query to poison the live handle, got {err}"
);
let next = db.execute("RETURN 1 AS ok", rows()).unwrap_err();
assert!(
next.to_string().contains("WAL arm failed"),
"expected future queries on the live handle to fail, got {next}"
);
}
let recovered = Database::open_with_wal(enabled(dir.path())).unwrap();
assert_eq!(
recovered.node_count(),
0,
"recovery should discard the aborted create/delete transaction"
);
}
#[test]
fn replay_preserves_ids_after_aborted_create_gap() {
let dir = TmpDir::new("id-gap");
{
let (wal, replay) =
Wal::open(dir.path(), SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
assert!(replay.is_empty());
let aborted = wal.begin().unwrap();
wal.append(
aborted,
&MutationEvent::CreateNode {
id: 0,
labels: vec!["Discarded".into()],
properties: Properties::new(),
},
)
.unwrap();
wal.abort(aborted).unwrap();
wal.flush().unwrap();
let create = wal.begin().unwrap();
wal.append(
create,
&MutationEvent::CreateNode {
id: 1,
labels: vec!["Kept".into()],
properties: Properties::new(),
},
)
.unwrap();
wal.commit(create).unwrap();
wal.flush().unwrap();
let set_name = wal.begin().unwrap();
wal.append(
set_name,
&MutationEvent::SetNodeProperty {
node_id: 1,
key: "name".into(),
value: PropertyValue::String("survivor".into()),
},
)
.unwrap();
wal.commit(set_name).unwrap();
wal.flush().unwrap();
}
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
assert_eq!(db.node_count(), 1);
let result = db
.execute(
"MATCH (n:Kept {name: 'survivor'}) RETURN n.name AS name",
rows(),
)
.unwrap();
let json = serde_json::to_value(&result).unwrap();
let row_array = json["rows"].as_array().expect("rows array");
assert_eq!(row_array.len(), 1);
assert_eq!(row_array[0]["name"], serde_json::json!("survivor"));
}
#[test]
fn replay_rejects_relationship_with_missing_endpoint() {
let dir = TmpDir::new("missing-endpoint");
{
let (wal, replay) =
Wal::open(dir.path(), SyncMode::PerCommit, 8 * 1024 * 1024, Lsn::ZERO).unwrap();
assert!(replay.is_empty());
let tx = wal.begin().unwrap();
wal.append(
tx,
&MutationEvent::CreateRelationship {
id: 0,
src: 10,
dst: 11,
rel_type: "BROKEN".into(),
properties: Properties::new(),
},
)
.unwrap();
wal.commit(tx).unwrap();
wal.flush().unwrap();
}
let err = match Database::open_with_wal(enabled(dir.path())) {
Ok(_) => panic!("recovery should reject the malformed relationship"),
Err(err) => err,
};
assert!(
err.to_string().contains("missing source node 10"),
"unexpected recovery error: {err}"
);
}
#[test]
fn checkpoint_truncates_segments_and_recovery_uses_snapshot() {
let wal_dir = TmpDir::new("ckpt-wal");
let snap_dir = TmpDir::new("ckpt-snap");
let snap_path = snap_dir.path().join("snapshot.bin");
let db = Database::open_with_wal(enabled(wal_dir.path())).unwrap();
for i in 0..10 {
db.execute(&format!("CREATE (:N {{i: {}}})", i), rows())
.unwrap();
}
let meta = db.checkpoint_to(&snap_path).unwrap();
assert_eq!(meta.node_count, 10);
assert!(
meta.wal_lsn.is_some(),
"checkpoint must stamp a wal_lsn into the snapshot header"
);
db.execute("CREATE (:N {i: 100})", rows()).unwrap();
db.execute("CREATE (:N {i: 101})", rows()).unwrap();
drop(db);
let recovered = Database::recover(&snap_path, enabled(wal_dir.path())).unwrap();
assert_eq!(recovered.node_count(), 12);
}
#[test]
fn group_mode_checkpoint_uses_fsynced_fence() {
let wal_dir = TmpDir::new("group-ckpt-wal");
let snap_dir = TmpDir::new("group-ckpt-snap");
let snap_path = snap_dir.path().join("snapshot.bin");
{
let db = Database::open_with_wal(group_enabled(wal_dir.path())).unwrap();
db.execute("CREATE (:N {i: 1})", rows()).unwrap();
let meta = db.checkpoint_to(&snap_path).unwrap();
assert!(
meta.wal_lsn.unwrap_or_default() > 0,
"checkpoint should stamp a non-zero WAL fence"
);
}
let recovered = Database::recover(&snap_path, group_enabled(wal_dir.path())).unwrap();
assert_eq!(recovered.node_count(), 1);
}
#[test]
fn recover_with_missing_snapshot_falls_back_to_wal_only() {
let wal_dir = TmpDir::new("missing-snap-wal");
let snap_dir = TmpDir::new("missing-snap-snap");
let absent = snap_dir.path().join("does-not-exist.bin");
{
let db = Database::open_with_wal(enabled(wal_dir.path())).unwrap();
db.execute("CREATE (:Z {v: 1})", rows()).unwrap();
db.execute("CREATE (:Z {v: 2})", rows()).unwrap();
}
let db = Database::recover(&absent, enabled(wal_dir.path())).unwrap();
assert_eq!(db.node_count(), 2);
}
#[test]
fn recover_with_disabled_wal_only_loads_snapshot() {
let snap_dir = TmpDir::new("disabled-recover");
let snap_path = snap_dir.path().join("seed.bin");
{
let db = Database::in_memory();
db.execute("CREATE (:Seed {x: 1})", rows()).unwrap();
db.execute("CREATE (:Seed {x: 2})", rows()).unwrap();
db.save_snapshot_to(&snap_path).unwrap();
}
let db = Database::recover(&snap_path, WalConfig::Disabled).unwrap();
assert_eq!(db.node_count(), 2);
assert!(db.wal().is_none());
}
#[test]
fn clear_brackets_through_wal() {
let dir = TmpDir::new("clear");
{
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
db.execute("CREATE (:A {v: 1})", rows()).unwrap();
db.execute("CREATE (:A {v: 2})", rows()).unwrap();
db.clear();
assert_eq!(db.node_count(), 0);
db.execute("CREATE (:B {v: 3})", rows()).unwrap();
}
let db = Database::open_with_wal(enabled(dir.path())).unwrap();
assert_eq!(db.node_count(), 1);
}
#[test]
fn checkpoint_requires_wal() {
let snap_dir = TmpDir::new("no-wal-ckpt");
let snap_path = snap_dir.path().join("snap.bin");
let db = Database::in_memory();
let err = db.checkpoint_to(&snap_path).unwrap_err();
assert!(err.to_string().contains("WAL"));
}