use std::path::PathBuf;
use std::time::{SystemTime, UNIX_EPOCH};
use lora_database::{
Database, ExecuteOptions, LoraValue, QueryResult, ResultFormat, TransactionMode, WalConfig,
};
use serde_json::Value as JsonValue;
/// Builds execution options that request results in the `Rows` format.
///
/// Always returns `Some`, so the value can be passed straight to any call
/// that takes an optional `ExecuteOptions`.
fn rows_options() -> Option<ExecuteOptions> {
    let options = ExecuteOptions {
        format: ResultFormat::Rows,
    };
    Some(options)
}
/// Serializes a query result to JSON and returns the elements of its
/// top-level `"rows"` array.
///
/// Returns an empty vector when the serialized value has no `"rows"` entry or
/// that entry is not an array. Panics if serialization fails.
fn rows_json(result: QueryResult) -> Vec<JsonValue> {
    let serialized = serde_json::to_value(result).unwrap();
    match serialized.get("rows") {
        Some(JsonValue::Array(items)) => items.clone(),
        _ => Vec::new(),
    }
}
/// Extracts one column from each row as a JSON value.
///
/// Every row is serialized to JSON and looked up by `column`; a row without
/// that key contributes `JsonValue::Null`. Panics if a row fails to serialize.
fn row_values(rows: Vec<lora_database::Row>, column: &str) -> Vec<JsonValue> {
    let mut values = Vec::with_capacity(rows.len());
    for row in rows {
        let serialized = serde_json::to_value(row).unwrap();
        let cell = match serialized.get(column) {
            Some(value) => value.clone(),
            None => JsonValue::Null,
        };
        values.push(cell);
    }
    values
}
/// A uniquely named path under the system temp directory, used as a WAL
/// location by the tests in this file.
///
/// Constructing the value only computes the path; nothing is created on disk
/// here. Whatever exists at the path is removed recursively (best effort) when
/// the value is dropped.
struct TempWalDir {
    path: PathBuf,
}

impl TempWalDir {
    /// Returns a handle whose path is
    /// `<temp dir>/lora-database-<name>-<nanos>-<pid>-<seq>`.
    ///
    /// The wall-clock timestamp alone is not a reliable nonce: two calls can
    /// observe the same value on platforms with a coarse clock, and separate
    /// test processes can start at the same instant. The process id and a
    /// per-process counter make the path unique regardless of clock resolution.
    fn new(name: &str) -> Self {
        static SEQUENCE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

        // A clock set before the epoch must not abort the test; the pid and
        // sequence number already guarantee uniqueness.
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|elapsed| elapsed.as_nanos())
            .unwrap_or(0);
        let pid = std::process::id();
        let seq = SEQUENCE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let mut path = std::env::temp_dir();
        path.push(format!("lora-database-{name}-{nanos}-{pid}-{seq}"));
        Self { path }
    }
}

impl Drop for TempWalDir {
    /// Removes the directory tree at `path`, ignoring any error (for example
    /// when the directory was never created).
    fn drop(&mut self) {
        let _ = std::fs::remove_dir_all(&self.path);
    }
}
/// A read stream reports the projected column names, yields the matching rows
/// in the requested order, and returns `None` once drained.
#[test]
fn database_stream_returns_rows_and_columns() {
    let db = Database::in_memory();
    let seed = "CREATE (:Person {name:'Ada'}), (:Person {name:'Grace'})";
    db.execute(seed, rows_options()).unwrap();

    let mut stream = db
        .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
        .unwrap();
    assert_eq!(stream.columns(), &["name".to_string()]);

    // `by_ref` keeps the stream usable so exhaustion can be checked afterwards.
    let names = row_values(stream.by_ref().collect(), "name");
    let expected = vec![JsonValue::from("Ada"), JsonValue::from("Grace")];
    assert_eq!(names, expected);
    assert_eq!(stream.next(), None);
}
/// A zero-duration timeout makes the query fail with a
/// "query deadline exceeded" error.
#[test]
fn execute_with_timeout_cancels_before_work_continues() {
    let db = Database::in_memory();
    db.execute(
        "UNWIND range(1,100) AS i CREATE (:T {i: i})",
        rows_options(),
    )
    .unwrap();

    let outcome = db.execute_with_timeout(
        "MATCH (t:T) RETURN t.i AS i",
        rows_options(),
        std::time::Duration::ZERO,
    );
    let err = outcome.unwrap_err();
    let rendered = format!("{err:#}");
    assert!(
        rendered.contains("query deadline exceeded"),
        "expected query timeout, got: {err:#}"
    );
}
/// A node created inside a read-write transaction is visible to the database
/// after the transaction commits.
#[test]
fn transaction_commit_publishes_staged_changes() {
    let db = Database::in_memory();

    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    tx.execute("CREATE (:Person {name:'Ada'})", rows_options())
        .unwrap();
    tx.commit().unwrap();

    let result = db
        .execute("MATCH (p:Person) RETURN p.name AS name", rows_options())
        .unwrap();
    let rows = rows_json(result);
    assert_eq!(rows[0]["name"], JsonValue::from("Ada"));
}
/// A node created inside a read-write transaction is not visible to the
/// database after the transaction is rolled back.
#[test]
fn transaction_rollback_discards_staged_changes() {
    let db = Database::in_memory();

    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    tx.execute("CREATE (:Person {name:'Ada'})", rows_options())
        .unwrap();
    tx.rollback().unwrap();

    let result = db
        .execute("MATCH (p:Person) RETURN p.name AS name", rows_options())
        .unwrap();
    let rows = rows_json(result);
    assert!(rows.is_empty());
}
/// A `CREATE` inside a read-only transaction fails with an error whose text
/// contains "read-only mode (CREATE".
#[test]
fn read_only_transaction_rejects_writes() {
    let db = Database::in_memory();
    let mut tx = db.begin_transaction(TransactionMode::ReadOnly).unwrap();

    let rejected = tx.execute("CREATE (:Person {name:'Ada'})", rows_options());
    let err = rejected.unwrap_err().to_string();
    assert!(err.contains("read-only mode (CREATE"));

    tx.rollback().unwrap();
}
/// A stream opened on a transaction sees nodes created earlier in that same
/// transaction, before any commit.
#[test]
fn transaction_stream_reads_staged_state() {
    let db = Database::in_memory();
    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    let seed = "CREATE (:Person {name:'Ada'}), (:Person {name:'Grace'})";
    tx.execute(seed, rows_options()).unwrap();

    let staged: Vec<lora_database::Row> = tx
        .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
        .unwrap()
        .collect();
    let names = row_values(staged, "name");
    assert_eq!(
        names,
        vec![JsonValue::from("Ada"), JsonValue::from("Grace")]
    );

    tx.rollback().unwrap();
}
/// After reopening a WAL-backed database, only the node from the committed
/// transaction is present; the rolled-back one is absent.
#[test]
fn wal_replays_committed_transaction_but_not_rolled_back_transaction() {
    let dir = TempWalDir::new("tx-wal");
    let wal_config = || WalConfig::enabled(dir.path.clone());

    // First session: one rolled-back and one committed transaction.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();

        let mut discarded = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
        discarded
            .execute("CREATE (:Person {name:'Ada'})", rows_options())
            .unwrap();
        discarded.rollback().unwrap();

        let mut kept = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
        kept.execute("CREATE (:Person {name:'Grace'})", rows_options())
            .unwrap();
        kept.commit().unwrap();
    }

    // Second session: reopen from the same directory and inspect the state.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();
        let replayed: Vec<lora_database::Row> = db
            .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
            .unwrap()
            .collect();
        let names = row_values(replayed, "name");
        assert_eq!(names, vec![JsonValue::from("Grace")]);
    }
}
/// A parameterized `CREATE` inside a transaction stores the bound value, which
/// is readable after commit.
#[test]
fn transaction_with_params() {
    let db = Database::in_memory();
    let params = std::collections::BTreeMap::from([(
        "name".to_string(),
        LoraValue::String("Ada".to_string()),
    )]);

    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    tx.execute_with_params("CREATE (:Person {name:$name})", rows_options(), params)
        .unwrap();
    tx.commit().unwrap();

    let result = db
        .execute("MATCH (p:Person) RETURN p.name AS name", rows_options())
        .unwrap();
    let rows = rows_json(result);
    assert_eq!(rows[0]["name"], JsonValue::from("Ada"));
}
/// `next_row` returns the two matching rows one call at a time and then keeps
/// returning `None` on repeated calls.
#[test]
fn streaming_simple_match_yields_rows_one_at_a_time() {
    let db = Database::in_memory();
    let seed = "CREATE (:Person {name:'Ada'}), (:Person {name:'Grace'})";
    db.execute(seed, rows_options()).unwrap();

    let mut stream = db
        .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
        .unwrap();
    assert_eq!(stream.columns(), &["name".to_string()]);

    // Two rows were created, so exactly two pulls produce a row.
    for _ in 0..2 {
        let row = stream.next_row().unwrap();
        assert!(row.is_some());
    }
    // Pulling past the end is repeatable and keeps reporting exhaustion.
    for _ in 0..2 {
        assert!(stream.next_row().unwrap().is_none());
    }
}
/// A stream over a query that matches nothing still reports both projected
/// column names and yields no rows.
#[test]
fn empty_streaming_result_still_reports_columns() {
    let db = Database::in_memory();
    let stream = db
        .stream("MATCH (p:Missing) RETURN p.name AS name, p.age AS age")
        .unwrap();

    let expected_columns = ["name".to_string(), "age".to_string()];
    assert_eq!(
        stream.columns(),
        &expected_columns,
        "plan-derived columns must survive empty results"
    );

    let rows: Vec<_> = stream.collect();
    assert!(rows.is_empty());
}
/// A stream over a query with `ORDER BY ... DESC LIMIT 2` returns the two
/// oldest people in descending age order.
#[test]
fn streaming_with_blocking_operators_still_works() {
    let db = Database::in_memory();
    let seed = "CREATE (:Person {name:'Ada', age:30}), (:Person {name:'Grace', age:42}), (:Person {name:'Linus', age:25})";
    db.execute(seed, rows_options()).unwrap();

    let stream = db
        .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY p.age DESC LIMIT 2")
        .unwrap();
    assert_eq!(stream.columns(), &["name".to_string()]);

    let top_two: Vec<lora_database::Row> = stream.collect();
    let names = row_values(top_two, "name");
    assert_eq!(
        names,
        vec![JsonValue::from("Grace"), JsonValue::from("Ada")]
    );
}
/// After a read stream is dropped without being consumed, a subsequent query
/// on the same database still succeeds.
#[test]
fn dropping_read_stream_releases_resources() {
    let db = Database::in_memory();
    db.execute("CREATE (:N {v:1}), (:N {v:2})", rows_options())
        .unwrap();

    // Open the stream and drop it immediately, without pulling any row.
    drop(db.stream("MATCH (n:N) RETURN n.v").unwrap());

    let followup = db.execute("MATCH (n:N) RETURN count(n) AS c", rows_options());
    assert!(followup.is_ok());
}
/// Each mutating clause (`CREATE`, `MERGE`, `SET`, `REMOVE`, `DELETE`) must be
/// rejected inside a read-only transaction with an error mentioning
/// "read-only mode".
#[test]
fn read_only_transaction_rejects_create_merge_set_delete_remove() {
    let db = Database::in_memory();
    db.execute("CREATE (:N {v:1})", rows_options()).unwrap();

    let write_queries = [
        "CREATE (:Person)",
        "MERGE (:Person {name:'Ada'})",
        "MATCH (n:N) SET n.v = 2",
        "MATCH (n:N) REMOVE n.v",
        "MATCH (n:N) DELETE n",
    ];
    for query in write_queries {
        // A fresh transaction per query keeps the cases independent.
        let mut tx = db.begin_transaction(TransactionMode::ReadOnly).unwrap();

        // The failure message is only formatted when the query unexpectedly
        // succeeds, rather than eagerly on every iteration.
        let msg = match tx.execute(query, rows_options()) {
            Ok(_) => panic!("{query} should be rejected in ReadOnly tx"),
            Err(err) => err.to_string(),
        };
        assert!(
            msg.contains("read-only mode"),
            "unexpected error for `{query}`: {msg}"
        );

        tx.rollback().unwrap();
    }
}
/// Opens a stream on a transaction, drops it, and checks that the commit then
/// succeeds.
///
/// NOTE(review): despite the name, this test never attempts a commit while the
/// cursor is still alive; it only covers the "cursor already dropped" path.
#[test]
fn transaction_cannot_commit_with_active_cursor() {
    let db = Database::in_memory();
    db.execute("CREATE (:N {v:1}), (:N {v:2})", rows_options())
        .unwrap();

    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    let cursor = tx.stream("MATCH (n:N) RETURN n.v").unwrap();
    drop(cursor);

    tx.commit().unwrap();
}
/// While a transaction stream is alive, a second statement on the same
/// transaction is rejected with an error mentioning "cursor"; once the stream
/// is dropped the transaction commits.
///
/// NOTE(review): the operation shown to be blocked here is `execute`, not
/// `commit` as the test name suggests.
#[test]
fn transaction_cursor_active_blocks_commit() {
    let db = Database::in_memory();
    db.execute("CREATE (:N {v:1}), (:N {v:2})", rows_options())
        .unwrap();

    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    let open_cursor = tx.stream("MATCH (n:N) RETURN n.v").unwrap();

    let err = tx
        .execute("CREATE (:Marker)", rows_options())
        .expect_err("a second statement must be rejected while a cursor is active");
    assert!(err.to_string().contains("cursor"));

    drop(open_cursor);
    tx.commit().unwrap();
}
/// A failing statement inside a transaction leaves earlier staged work intact:
/// the previously created node is still readable and the transaction commits.
#[test]
fn failed_statement_rolls_back_only_that_statement() {
    let db = Database::in_memory();
    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    tx.execute("CREATE (:Person {name:'Ada'})", rows_options())
        .unwrap();

    let outcome = tx.execute("MATCH (n:Person) SET n = 42", rows_options());
    assert!(outcome.is_err(), "the statement must fail");

    let rows = rows_json(
        tx.execute("MATCH (p:Person) RETURN p.name AS name", rows_options())
            .unwrap(),
    );
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0]["name"], JsonValue::from("Ada"));

    tx.commit().unwrap();
}
/// Dropping an unconsumed write stream inside a transaction discards only that
/// statement's node; the node created earlier in the transaction remains.
#[test]
fn dropped_stream_in_tx_rolls_back_only_that_statement() {
    let db = Database::in_memory();
    let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
    tx.execute("CREATE (:Person {name:'Ada'})", rows_options())
        .unwrap();

    // Open a mutating stream and drop it without pulling a single row.
    drop(
        tx.stream("CREATE (:Person {name:'DroppedAndRolledBack'}) RETURN 1 AS one")
            .unwrap(),
    );

    let remaining = tx
        .execute(
            "MATCH (p:Person) RETURN p.name AS name ORDER BY name",
            rows_options(),
        )
        .unwrap();
    let rows = rows_json(remaining);
    assert_eq!(
        rows.len(),
        1,
        "only Ada should remain after the dropped statement was rolled back"
    );
    assert_eq!(rows[0]["name"], JsonValue::from("Ada"));

    tx.commit().unwrap();
}
/// A statement that fails in the middle of a committed transaction must leave
/// no trace after WAL replay: reopening the database shows exactly the nodes
/// created by the statements before and after it.
#[test]
fn wal_replay_excludes_failed_statement_inside_committed_transaction() {
    let dir = TempWalDir::new("tx-wal-failed-stmt");

    // First session: successful, failing, successful statement, then commit.
    {
        let db = Database::open_with_wal(WalConfig::enabled(dir.path.clone())).unwrap();
        let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
        tx.execute("CREATE (:Person {name:'Ada'})", rows_options())
            .unwrap();

        // The scenario only makes sense if this statement really fails, so the
        // outcome is asserted rather than discarded.
        let failed = tx.execute("MATCH (n:Person) SET n = 42", rows_options());
        assert!(failed.is_err(), "the statement must fail");

        tx.execute("CREATE (:Person {name:'Grace'})", rows_options())
            .unwrap();
        tx.commit().unwrap();
    }

    // Second session: reopen from the same directory and inspect the state.
    {
        let db = Database::open_with_wal(WalConfig::enabled(dir.path.clone())).unwrap();
        let stream = db
            .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
            .unwrap();
        let values = row_values(stream.collect(), "name");
        assert_eq!(
            values,
            vec![
                JsonValue::String("Ada".to_string()),
                JsonValue::String("Grace".to_string())
            ]
        );
    }
}
/// A write stream dropped unconsumed inside a committed transaction leaves no
/// node after WAL replay; the surrounding statements' nodes are present.
#[test]
fn wal_replay_excludes_dropped_stream_inside_committed_transaction() {
    let dir = TempWalDir::new("tx-wal-dropped-stream");
    let wal_config = || WalConfig::enabled(dir.path.clone());

    // First session: create, dropped write stream, create, commit.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();
        let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
        tx.execute("CREATE (:Person {name:'Ada'})", rows_options())
            .unwrap();
        drop(
            tx.stream("CREATE (:Person {name:'Dropped'}) RETURN 1 AS one")
                .unwrap(),
        );
        tx.execute("CREATE (:Person {name:'Grace'})", rows_options())
            .unwrap();
        tx.commit().unwrap();
    }

    // Second session: reopen from the same directory and inspect the state.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();
        let replayed: Vec<lora_database::Row> = db
            .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
            .unwrap()
            .collect();
        let names = row_values(replayed, "name");
        assert_eq!(
            names,
            vec![JsonValue::from("Ada"), JsonValue::from("Grace")]
        );
    }
}
/// Fully consuming an auto-commit write stream publishes its changes: the
/// created node is visible to a later query.
#[test]
fn auto_commit_write_stream_commits_on_full_exhaustion() {
    let db = Database::in_memory();

    let produced: Vec<_> = db
        .stream("CREATE (:Person {name:'Ada'}) RETURN 1 AS one")
        .unwrap()
        .collect();
    assert_eq!(produced.len(), 1);

    let result = db
        .execute("MATCH (p:Person) RETURN p.name AS name", rows_options())
        .unwrap();
    let rows = rows_json(result);
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0]["name"], JsonValue::from("Ada"));
}
/// Dropping an auto-commit write stream before pulling any row publishes
/// nothing: no `Person` node is visible afterwards.
#[test]
fn auto_commit_write_stream_rolls_back_on_drop() {
    let db = Database::in_memory();

    drop(
        db.stream("CREATE (:Person {name:'Ada'}) RETURN 1 AS one")
            .unwrap(),
    );

    let result = db
        .execute("MATCH (p:Person) RETURN p.name AS name", rows_options())
        .unwrap();
    let rows = rows_json(result);
    assert!(
        rows.is_empty(),
        "auto-commit write stream dropped pre-exhaustion must not publish staged changes"
    );
}
/// Pulling only the first row of a three-row auto-commit write stream and then
/// dropping it leaves zero `N` nodes in the database.
#[test]
fn auto_commit_write_stream_rolls_back_on_partial_consumption() {
    let db = Database::in_memory();

    let mut stream = db
        .stream("UNWIND [1,2,3] AS x CREATE (:N {value:x}) RETURN x")
        .unwrap();
    // Consume one row only, then abandon the stream.
    drop(stream.next());
    drop(stream);

    let result = db
        .execute("MATCH (n:N) RETURN count(n) AS c", rows_options())
        .unwrap();
    let rows = rows_json(result);
    assert_eq!(rows[0]["c"], JsonValue::Number(0.into()));
}
/// With a WAL-backed database, a dropped auto-commit write stream leaves
/// nothing to replay while a fully consumed one does: after reopening, only
/// the `Committed` node exists.
#[test]
fn auto_commit_write_stream_with_wal_only_writes_on_exhaustion() {
    let dir = TempWalDir::new("auto-commit-write-stream-wal");
    let wal_config = || WalConfig::enabled(dir.path.clone());

    // First session: one abandoned stream, one exhausted stream.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();
        drop(
            db.stream("CREATE (:Person {name:'Dropped'}) RETURN 1 AS one")
                .unwrap(),
        );
        let exhausted = db
            .stream("CREATE (:Person {name:'Committed'}) RETURN 1 AS one")
            .unwrap();
        let _: Vec<_> = exhausted.collect();
    }

    // Second session: reopen from the same directory and inspect the state.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();
        let stream = db
            .stream("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
            .unwrap();
        let replayed: Vec<_> = stream.collect();
        let names = row_values(replayed, "name");
        assert_eq!(names, vec![JsonValue::from("Committed")]);
    }
}
/// A read stream over 5,000 nodes yields a first row, and after the stream is
/// dropped the database still reports all 5,000 nodes.
#[test]
fn db_stream_read_only_uses_live_pull_cursor() {
    let db = Database::in_memory();
    (0..5_000).for_each(|i| {
        let create = format!("CREATE (:N {{v:{i}}})");
        db.execute(&create, rows_options()).unwrap();
    });

    let mut stream = db.stream("MATCH (n:N) RETURN n.v").unwrap();
    assert_eq!(stream.columns(), &["v".to_string()]);
    let head = stream.next_row().unwrap();
    assert!(head.is_some(), "live stream must produce a row");
    drop(stream);

    let result = db
        .execute("MATCH (n:N) RETURN count(n) AS c", rows_options())
        .unwrap();
    let count = rows_json(result);
    assert_eq!(count[0]["c"], JsonValue::Number(5000.into()));
}
/// A read stream over two nodes reports the `v` column and yields two rows.
///
/// NOTE(review): the test name refers to staging cost, but only the column
/// name and the row count are asserted here.
#[test]
fn auto_commit_read_stream_does_not_pay_staging_cost() {
    let db = Database::in_memory();
    db.execute("CREATE (:N {v:1}), (:N {v:2})", rows_options())
        .unwrap();

    let stream = db.stream("MATCH (n:N) RETURN n.v").unwrap();
    assert_eq!(stream.columns(), &["v".to_string()]);

    let rows: Vec<_> = stream.collect();
    assert_eq!(rows.len(), 2);
}
mod pull_shape {
use lora_compiler::Compiler;
use lora_database::{parse_query, Database, InMemoryGraph};
use lora_executor::{drain, PullExecutor, RowSource};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
fn compile(store: &lora_store::InMemoryGraph, query: &str) -> lora_compiler::CompiledQuery {
let document = parse_query(query).unwrap();
let resolved = {
let mut analyzer = lora_analyzer::Analyzer::new(store);
analyzer.analyze(&document).unwrap()
};
Compiler::compile(&resolved)
}
fn open<'a>(
store: &'a InMemoryGraph,
compiled: &'a lora_compiler::CompiledQuery,
) -> Box<dyn RowSource + 'a> {
PullExecutor::new(store, BTreeMap::new())
.open_compiled(compiled)
.unwrap()
}
#[test]
fn match_stream_yields_first_row_before_consuming_full_input() {
let db = Database::in_memory();
for i in 0..1000 {
db.execute(
&format!("CREATE (:N {{v:{i}}})"),
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
}
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) RETURN n.v AS v");
let mut cursor = open(&store, &compiled);
let first = cursor.next_row().unwrap();
assert!(first.is_some());
}
#[test]
fn filter_pulls_only_until_predicate_match() {
let db = Database::in_memory();
db.execute(
"CREATE (:N {v:1}), (:N {v:2}), (:N {v:3}), (:N {v:4})",
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) WHERE n.v = 3 RETURN n.v AS v");
let rows = drain(open(&store, &compiled).as_mut()).unwrap();
assert_eq!(rows.len(), 1);
}
#[test]
fn property_equality_filter_lowers_to_indexed_scan() {
let db = Database::in_memory();
db.execute(
"CREATE (:N {v:1}), (:N {v:2})",
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) WHERE n.v = 2 RETURN n.v AS v");
let indexed = compiled.physical.nodes.iter().any(|op| {
matches!(
op,
lora_compiler::PhysicalOp::NodeByPropertyScan(scan)
if scan.key == "v"
&& scan.labels == vec![vec!["N".to_string()]]
)
});
assert!(indexed, "expected property equality to use indexed scan");
}
#[test]
fn indexed_scan_preserves_numeric_cross_type_equality() {
let db = Database::in_memory();
db.execute(
"CREATE (:N {v:1}), (:N {v:1.0}), (:N {v:2})",
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) WHERE n.v = 1.0 RETURN n.v AS v");
let rows = drain(open(&store, &compiled).as_mut()).unwrap();
assert_eq!(rows.len(), 2);
}
#[test]
fn unwind_yields_each_element_lazily() {
let db = Database::in_memory();
let store = db.store().read().unwrap();
let compiled = compile(&store, "UNWIND [10, 20, 30] AS x RETURN x");
let mut cursor = open(&store, &compiled);
let r1 = cursor.next_row().unwrap().unwrap();
let r2 = cursor.next_row().unwrap().unwrap();
let r3 = cursor.next_row().unwrap().unwrap();
assert!(cursor.next_row().unwrap().is_none());
let _ = (r1, r2, r3);
}
#[test]
fn limit_stops_pulling_after_emitting_n() {
let db = Database::in_memory();
for i in 0..100 {
db.execute(
&format!("CREATE (:N {{v:{i}}})"),
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
}
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) RETURN n.v AS v LIMIT 5");
let rows = drain(open(&store, &compiled).as_mut()).unwrap();
assert_eq!(rows.len(), 5);
}
#[test]
fn projection_pulls_one_in_one_out() {
let db = Database::in_memory();
db.execute(
"CREATE (:N {v:1}), (:N {v:2}), (:N {v:3})",
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) RETURN n.v + 10 AS v");
let rows = drain(open(&store, &compiled).as_mut()).unwrap();
assert_eq!(rows.len(), 3);
}
#[test]
fn expand_streams_per_input_row() {
let db = Database::in_memory();
db.execute(
"CREATE (a:Person {name:'Ada'})-[:KNOWS]->(b:Person {name:'Bob'}),
(a)-[:KNOWS]->(:Person {name:'Carol'}),
(b)-[:KNOWS]->(:Person {name:'Dave'})",
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
let store = db.store().read().unwrap();
let compiled = compile(
&store,
"MATCH (p:Person)-[:KNOWS]->(other) RETURN other.name AS name ORDER BY name",
);
let rows = drain(open(&store, &compiled).as_mut()).unwrap();
assert_eq!(rows.len(), 3);
}
#[test]
fn blocking_operator_still_produces_correct_results() {
let db = Database::in_memory();
db.execute(
"CREATE (:N {v:5}), (:N {v:1}), (:N {v:3}), (:N {v:2}), (:N {v:4})",
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) RETURN n.v AS v ORDER BY n.v");
let rows = drain(open(&store, &compiled).as_mut()).unwrap();
let values: Vec<_> = rows
.into_iter()
.map(|r| serde_json::to_value(&r).unwrap())
.map(|v| v.get("v").and_then(|x| x.as_i64()).unwrap_or(-1))
.collect();
assert_eq!(values, vec![1, 2, 3, 4, 5]);
}
#[test]
fn aggregation_buffers_internally_and_yields_correctly() {
let db = Database::in_memory();
db.execute(
"CREATE (:N {v:1}), (:N {v:2}), (:N {v:3})",
Some(lora_database::ExecuteOptions {
format: lora_database::ResultFormat::Rows,
}),
)
.unwrap();
let store = db.store().read().unwrap();
let compiled = compile(&store, "MATCH (n:N) RETURN sum(n.v) AS total");
let rows = drain(open(&store, &compiled).as_mut()).unwrap();
assert_eq!(rows.len(), 1);
}
#[test]
fn classify_stream_recognises_writes() {
use lora_executor::{classify_stream, StreamShape};
let db = Database::in_memory();
let store = db.store().read().unwrap();
let read = compile(&store, "MATCH (n) RETURN n");
let write = compile(&store, "CREATE (:Foo) RETURN 1 AS one");
assert_eq!(classify_stream(&read), StreamShape::ReadOnly);
assert_eq!(classify_stream(&write), StreamShape::Mutating);
assert!(classify_stream(&write).is_mutating());
}
#[allow(dead_code)]
fn _unused() {
let _ = Arc::new(Mutex::new(()));
}
}
/// After an auto-commit `CREATE` followed by a committed read-only
/// transaction, reopening the WAL-backed database shows exactly the one
/// created node.
#[test]
fn read_only_transaction_does_not_appear_in_wal_replay() {
    let dir = TempWalDir::new("tx-wal-readonly");
    let wal_config = || WalConfig::enabled(dir.path.clone());

    // First session: one write, then a read-only transaction that commits.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();
        db.execute("CREATE (:Person {name:'Ada'})", rows_options())
            .unwrap();

        let mut reader = db.begin_transaction(TransactionMode::ReadOnly).unwrap();
        let _ = reader
            .execute("MATCH (p:Person) RETURN p.name", rows_options())
            .unwrap();
        reader.commit().unwrap();
    }

    // Second session: reopen from the same directory and inspect the state.
    {
        let db = Database::open_with_wal(wal_config()).unwrap();
        let replayed: Vec<lora_database::Row> = db
            .stream("MATCH (p:Person) RETURN p.name AS name")
            .unwrap()
            .collect();
        let names = row_values(replayed, "name");
        assert_eq!(names, vec![JsonValue::from("Ada")]);
    }
}
/// Tests that exercise the database from several threads, or with a store
/// guard / cursor held, to check which operations may proceed concurrently.
mod concurrency {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;
use lora_database::{Database, TransactionMode};
use serde_json::Value as JsonValue;
use super::{row_values, rows_options};
/// While this thread holds a read guard on the store, a worker thread must be
/// able to finish a read-only query and report its row count within 100 ms.
#[test]
fn read_only_queries_can_run_while_read_guard_is_held() {
let db = Arc::new(Database::in_memory());
db.execute("CREATE (:T {i:1}), (:T {i:2})", rows_options())
.unwrap();
// Held for the whole time the worker runs; released only after the assertion.
let read_guard = db.store().read().unwrap();
let (tx, rx) = mpsc::channel();
let worker = {
let db = db.clone();
thread::spawn(move || {
let rows = db
.execute_rows("MATCH (t:T) RETURN t.i AS i ORDER BY i")
.unwrap();
tx.send(rows.len()).unwrap();
})
};
// A timeout here means the worker did not complete while the guard was held.
assert_eq!(
rx.recv_timeout(Duration::from_millis(100)).unwrap(),
2,
"read-only query should share the store read lock"
);
drop(read_guard);
worker.join().unwrap();
}
/// Same as above, but the worker runs its query inside a read-only transaction
/// and commits it before reporting the row count.
#[test]
fn read_only_transactions_can_run_while_read_guard_is_held() {
let db = Arc::new(Database::in_memory());
db.execute("CREATE (:T {i:1}), (:T {i:2})", rows_options())
.unwrap();
// Held for the whole time the worker runs; released only after the assertion.
let read_guard = db.store().read().unwrap();
let (tx, rx) = mpsc::channel();
let worker = {
let db = db.clone();
thread::spawn(move || {
let mut tx_handle = db.begin_transaction(TransactionMode::ReadOnly).unwrap();
let rows = tx_handle
.execute_rows("MATCH (t:T) RETURN t.i AS i ORDER BY i")
.unwrap();
tx_handle.commit().unwrap();
tx.send(rows.len()).unwrap();
})
};
// A timeout here means the worker did not complete while the guard was held.
assert_eq!(
rx.recv_timeout(Duration::from_millis(100)).unwrap(),
2,
"read-only transaction should share the store read lock"
);
drop(read_guard);
worker.join().unwrap();
}
/// Two read-write transactions started from different threads must not overlap.
///
/// `owner` is a hand-rolled ownership marker (0 = free, 1 = thread A,
/// 2 = thread B). Each thread claims it right after `begin_transaction`
/// returns and releases it just before committing; a failed compare-exchange
/// means the other thread was inside its transaction at the same time.
#[test]
fn concurrent_readwrite_transactions_serialize() {
let db = Arc::new(Database::in_memory());
let owner = Arc::new(AtomicUsize::new(0));
// Set by A once it has begun its transaction, so B is only spawned afterwards.
let a_holds = Arc::new(AtomicUsize::new(0));
let a = {
let db = db.clone();
let owner = owner.clone();
let a_holds = a_holds.clone();
thread::spawn(move || {
let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
assert!(
owner
.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
.is_ok(),
"another tx held the store write lock when A entered"
);
a_holds.store(1, Ordering::SeqCst);
// Keep the transaction open long enough for B to attempt to begin its own.
thread::sleep(Duration::from_millis(40));
tx.execute("CREATE (:T {who:'A'})", rows_options()).unwrap();
// The marker is released before the commit, not after it.
assert!(
owner
.compare_exchange(1, 0, Ordering::SeqCst, Ordering::SeqCst)
.is_ok(),
"owner state corrupted before A's commit"
);
tx.commit().unwrap();
})
};
// Spin until A is inside its transaction.
while a_holds.load(Ordering::SeqCst) == 0 {
thread::yield_now();
}
let b = {
let db = db.clone();
let owner = owner.clone();
thread::spawn(move || {
let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
assert!(
owner
.compare_exchange(0, 2, Ordering::SeqCst, Ordering::SeqCst)
.is_ok(),
"B entered while A still held the store write lock"
);
tx.execute("CREATE (:T {who:'B'})", rows_options()).unwrap();
assert!(
owner
.compare_exchange(2, 0, Ordering::SeqCst, Ordering::SeqCst)
.is_ok(),
"owner state corrupted before B's commit"
);
tx.commit().unwrap();
})
};
a.join().unwrap();
b.join().unwrap();
// Both transactions committed, so both nodes must be present.
let rows = db.execute_rows("MATCH (t:T) RETURN t.who AS who").unwrap();
let mut values = row_values(rows, "who");
// The query has no ORDER BY, so sort before comparing.
values.sort_by_key(|v| v.as_str().map(str::to_owned));
assert_eq!(
values,
vec![
JsonValue::String("A".to_string()),
JsonValue::String("B".to_string()),
]
);
}
/// While a partially consumed read stream is alive, a writer thread must not
/// get past `begin_transaction`; once the stream is dropped the writer
/// finishes and its node is counted.
#[test]
fn live_read_stream_blocks_concurrent_writer() {
let db = Arc::new(Database::in_memory());
db.execute("UNWIND range(1,10) AS i CREATE (:T {i: i})", rows_options())
.unwrap();
let mut stream = db.stream("MATCH (t:T) RETURN t.i AS i").unwrap();
// Pull one row so the stream is demonstrably open and in use.
assert!(stream.next_row().unwrap().is_some());
// `started` flips when the writer thread runs; `entered` flips only after
// its `begin_transaction` call has returned.
let started = Arc::new(AtomicUsize::new(0));
let entered = Arc::new(AtomicUsize::new(0));
let writer = {
let db = db.clone();
let started = started.clone();
let entered = entered.clone();
thread::spawn(move || {
started.store(1, Ordering::SeqCst);
let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
entered.store(1, Ordering::SeqCst);
tx.execute("CREATE (:T {i:99})", rows_options()).unwrap();
tx.commit().unwrap();
})
};
while started.load(Ordering::SeqCst) == 0 {
thread::yield_now();
}
// Give the writer a window in which it would enter if it were not blocked.
thread::sleep(Duration::from_millis(20));
assert_eq!(
entered.load(Ordering::SeqCst),
0,
"writer entered tx while a Live read stream still held the store read lock"
);
drop(stream);
writer.join().unwrap();
// 10 seeded nodes plus the writer's one.
assert_eq!(db.node_count(), 11);
}
/// While a transaction stream is open, both `execute` and `stream` on the same
/// transaction fail with an error mentioning "streaming cursor"; after the
/// stream is dropped the transaction accepts statements again and commits.
#[test]
fn active_tx_cursor_blocks_new_statement_or_stream() {
let db = Database::in_memory();
db.execute("CREATE (:T {n:1}), (:T {n:2})", rows_options())
.unwrap();
let mut tx = db.begin_transaction(TransactionMode::ReadWrite).unwrap();
let mut stream = tx.stream("MATCH (t:T) RETURN t.n AS n").unwrap();
assert!(stream.next_row().unwrap().is_some());
let exec_err = tx
.execute("MATCH (t:T) RETURN count(t)", rows_options())
.unwrap_err();
assert!(
format!("{exec_err:#}").contains("streaming cursor"),
"expected streaming-cursor error, got: {exec_err:#}"
);
let stream_err = tx.stream("MATCH (t:T) RETURN t.n AS n").unwrap_err();
assert!(
format!("{stream_err:#}").contains("streaming cursor"),
"expected streaming-cursor error, got: {stream_err:#}"
);
// Releasing the cursor makes the transaction usable again.
drop(stream);
let _ = tx
.execute("MATCH (t:T) RETURN count(t)", rows_options())
.unwrap();
tx.commit().unwrap();
}
}
mod streaming_writes {
use lora_database::Database;
use serde_json::Value as JsonValue;
use super::{rows_json, rows_options};
/// `UNWIND range(1, n) ... CREATE` produces `n` nodes whose `i` values sum to
/// `n * (n + 1) / 2`.
#[test]
fn unwind_create_streams_input_correctly() {
    let db = Database::in_memory();
    let n: i64 = 5_000;
    let create = format!("UNWIND range(1, {n}) AS i CREATE (:T {{i: i}})");
    db.execute(&create, rows_options()).unwrap();
    assert_eq!(db.node_count(), n as usize);

    let rows = rows_json(
        db.execute(
            "MATCH (t:T) RETURN sum(t.i) AS s, count(t) AS c",
            rows_options(),
        )
        .unwrap(),
    );
    assert_eq!(rows.len(), 1);

    let field = |key: &str| rows[0].get(key).and_then(JsonValue::as_i64).unwrap();
    let (s, c) = (field("s"), field("c"));
    assert_eq!(c, n);
    assert_eq!(s, n * (n + 1) / 2);
}
/// A `MATCH ... WHERE s.i > 100 CREATE` over 200 source nodes creates exactly
/// 100 `Dst` nodes.
#[test]
fn match_filter_create_streams_input_correctly() {
    let db = Database::in_memory();
    let statements = [
        "UNWIND range(1, 200) AS i CREATE (:Src {i: i})",
        "MATCH (s:Src) WHERE s.i > 100 CREATE (:Dst {origin: s.i})",
    ];
    for statement in statements {
        db.execute(statement, rows_options()).unwrap();
    }

    let rows = rows_json(
        db.execute("MATCH (d:Dst) RETURN count(d) AS c", rows_options())
            .unwrap(),
    );
    let created = rows[0].get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(created, 100);
}
/// A `CREATE` fed by `ORDER BY s.i DESC LIMIT 10` over 50 source nodes creates
/// `Top` nodes whose origins sum to 41 + ... + 50.
#[test]
fn create_after_sort_with_limit_streams_correctly() {
    let db = Database::in_memory();
    let statements = [
        "UNWIND range(1, 50) AS i CREATE (:Src {i: i})",
        "MATCH (s:Src) WITH s ORDER BY s.i DESC LIMIT 10 CREATE (:Top {origin: s.i})",
    ];
    for statement in statements {
        db.execute(statement, rows_options()).unwrap();
    }

    let rows = rows_json(
        db.execute("MATCH (t:Top) RETURN sum(t.origin) AS s", rows_options())
            .unwrap(),
    );
    let total = rows[0].get("s").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(total, (41..=50).sum::<i64>());
}
/// Counting the rows of an auto-commit `UNWIND ... CREATE` stream (which has
/// no `RETURN`) gives `n`, and the database then holds `n` nodes.
#[test]
fn auto_commit_stream_streamable_create_round_trip() {
    let db = Database::in_memory();
    let n = 250usize;
    let query = format!("UNWIND range(1, {n}) AS i CREATE (:T {{i: i}})");

    let yielded = db.stream(&query).unwrap().count();

    assert_eq!(yielded, n);
    assert_eq!(db.node_count(), n);
}
/// The first row of a 1,000-row auto-commit `CREATE` stream carries `i = 1`,
/// and dropping the stream afterwards leaves the database empty.
#[test]
fn auto_commit_stream_writes_lazily_then_rolls_back_on_drop() {
    let db = Database::in_memory();
    let mut stream = db
        .stream("UNWIND range(1, 1000) AS i CREATE (n:T {i: i}) RETURN n.i AS i")
        .unwrap();

    let head = stream.next_row().unwrap().expect("at least one row");
    let head_json = serde_json::to_value(head).unwrap();
    let i = head_json.get("i").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(i, 1);

    drop(stream);
    assert_eq!(db.node_count(), 0);
}
/// Fully consuming a 100-row auto-commit `CREATE` stream yields `i` values
/// 1 through 100 in order and leaves 100 nodes in the database.
#[test]
fn auto_commit_stream_commits_on_full_exhaustion() {
    let db = Database::in_memory();
    let stream = db
        .stream("UNWIND range(1, 100) AS i CREATE (n:T {i: i}) RETURN n.i AS i")
        .unwrap();

    // Rows that fail to serialize or lack an integer `i` are skipped, which
    // would surface as a mismatch in the comparison below.
    let seen: Vec<i64> = stream
        .filter_map(|row| {
            let serialized = serde_json::to_value(row).ok()?;
            serialized.get("i").and_then(JsonValue::as_i64)
        })
        .collect();

    assert_eq!(seen, (1..=100).collect::<Vec<_>>());
    assert_eq!(db.node_count(), 100);
}
/// A 10,000-row auto-commit `CREATE ... RETURN` stream yields 10,000 rows and
/// leaves 10,000 nodes in the database.
#[test]
fn auto_commit_stream_large_input_completes() {
    let db = Database::in_memory();
    let n = 10_000usize;
    let query = format!("UNWIND range(1, {n}) AS i CREATE (n:T {{i: i}}) RETURN n.i AS i");

    let yielded = db.stream(&query).unwrap().count();

    assert_eq!(yielded, n);
    assert_eq!(db.node_count(), n);
}
/// A `SET` over the even-numbered half of `n` nodes marks exactly `n / 2` of
/// them.
#[test]
fn set_with_streamable_input_updates_all_rows() {
    let db = Database::in_memory();
    let n: i64 = 1_000;
    let seed = format!("UNWIND range(1, {n}) AS i CREATE (:T {{i: i}})");
    db.execute(&seed, rows_options()).unwrap();
    db.execute(
        "MATCH (t:T) WHERE t.i % 2 = 0 SET t.even = true",
        rows_options(),
    )
    .unwrap();

    let rows = rows_json(
        db.execute(
            "MATCH (t:T) WHERE t.even = true RETURN count(t) AS c",
            rows_options(),
        )
        .unwrap(),
    );
    let marked = rows[0].get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(marked, n / 2);
}
/// Deleting every node with `i > 100` out of 500 leaves 100 nodes.
#[test]
fn delete_with_streamable_input_removes_all_targets() {
    let db = Database::in_memory();
    let statements = [
        "UNWIND range(1, 500) AS i CREATE (:T {i: i})",
        "MATCH (t:T) WHERE t.i > 100 DELETE t",
    ];
    for statement in statements {
        db.execute(statement, rows_options()).unwrap();
    }
    assert_eq!(db.node_count(), 100);
}
/// A `REMOVE` of a property over 200 nodes runs to completion and leaves the
/// node count at 200. The removed property itself is not inspected here.
#[test]
fn remove_with_streamable_input_completes() {
    let db = Database::in_memory();
    let statements = [
        "UNWIND range(1, 200) AS i CREATE (:T {i: i, scratch: 'x'})",
        "MATCH (t:T) REMOVE t.scratch",
    ];
    for statement in statements {
        db.execute(statement, rows_options()).unwrap();
    }
    assert_eq!(db.node_count(), 200);
}
/// Pulling one row from an auto-commit `SET` stream and then dropping it must
/// not publish any property write.
///
/// The node count alone cannot detect a leaked `SET` (it never changes the
/// number of nodes), so the test also checks that no node ended up with
/// `tagged = true`.
#[test]
fn auto_commit_set_stream_writes_lazily() {
    let db = Database::in_memory();
    db.execute(
        "UNWIND range(1, 100) AS i CREATE (:T {i: i})",
        rows_options(),
    )
    .unwrap();

    let mut stream = db
        .stream("MATCH (t:T) SET t.tagged = true RETURN t.i AS i")
        .unwrap();
    assert!(stream.next_row().unwrap().is_some());
    drop(stream);

    assert_eq!(db.node_count(), 100);

    // The abandoned stream's staged property writes must have been discarded.
    let rows = rows_json(
        db.execute(
            "MATCH (t:T) WHERE t.tagged = true RETURN count(t) AS c",
            rows_options(),
        )
        .unwrap(),
    );
    let tagged = rows[0].get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(
        tagged, 0,
        "a SET stream dropped before exhaustion must not publish property writes"
    );
}
/// Fully consuming an auto-commit `SET` stream over 100 nodes yields 100 rows
/// and leaves all 100 nodes with `marked = true`.
#[test]
fn auto_commit_set_stream_commits_on_exhaustion() {
    let db = Database::in_memory();
    db.execute(
        "UNWIND range(1, 100) AS i CREATE (:T {i: i, marked: false})",
        rows_options(),
    )
    .unwrap();

    let yielded = db
        .stream("MATCH (t:T) SET t.marked = true RETURN t.i AS i")
        .unwrap()
        .count();
    assert_eq!(yielded, 100);

    let rows = rows_json(
        db.execute(
            "MATCH (t:T) WHERE t.marked = true RETURN count(t) AS c",
            rows_options(),
        )
        .unwrap(),
    );
    let marked = rows[0].get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(marked, 100);
}
/// Pulling one row from an auto-commit `DELETE` stream and then dropping it
/// leaves all 100 nodes in place.
#[test]
fn auto_commit_delete_stream_rolls_back_on_drop() {
    let db = Database::in_memory();
    db.execute(
        "UNWIND range(1, 100) AS i CREATE (:T {i: i})",
        rows_options(),
    )
    .unwrap();

    let mut stream = db
        .stream("MATCH (t:T) WHERE t.i > 50 DELETE t RETURN t.i AS i")
        .unwrap();
    let head = stream.next_row().unwrap();
    assert!(head.is_some());
    drop(stream);

    assert_eq!(db.node_count(), 100);
}
/// Drains an auto-commit `DELETE` stream that targets the 50 `:T` nodes with
/// `i > 50`, expecting 50 emitted rows and 50 nodes left in the database.
#[test]
fn auto_commit_delete_stream_commits_on_exhaustion() {
    let db = Database::in_memory();
    let seed = "UNWIND range(1, 100) AS i CREATE (:T {i: i})";
    db.execute(seed, rows_options()).unwrap();

    // Consume the iterator to its end and count the rows it produced.
    let emitted = db
        .stream("MATCH (t:T) WHERE t.i > 50 DELETE t RETURN t.i AS i")
        .unwrap()
        .count();
    assert_eq!(emitted, 50);

    let surviving = db.node_count();
    assert_eq!(surviving, 50);
}
/// Starts an auto-commit `MERGE` stream over `range(1, 100)` against 50
/// pre-existing `:T` nodes, reads a single row, drops the stream, and expects
/// the node count to remain at 50.
#[test]
fn auto_commit_merge_stream_writes_lazily() {
    let db = Database::in_memory();
    let seed = "UNWIND range(1, 50) AS i CREATE (:T {i: i})";
    db.execute(seed, rows_options()).unwrap();

    let mut partial = db
        .stream("UNWIND range(1, 100) AS i MERGE (t:T {i: i}) RETURN t.i AS i")
        .unwrap();
    let got_row = partial.next_row().unwrap().is_some();
    assert!(got_row);
    // Abandon the stream before it is exhausted.
    drop(partial);

    let node_total = db.node_count();
    assert_eq!(node_total, 50);
}
/// Drains an auto-commit `MERGE` stream over `range(1, 100)` against 50
/// pre-existing `:T` nodes, expecting 100 integer `i` values from the stream
/// and 100 nodes in the database afterwards.
#[test]
fn auto_commit_merge_stream_commits_on_exhaustion() {
    let db = Database::in_memory();
    let seed = "UNWIND range(1, 50) AS i CREATE (:T {i: i})";
    db.execute(seed, rows_options()).unwrap();

    // Rows that fail to serialise or lack an integer `i` are skipped, so the
    // length check below also guards against malformed rows.
    let merged: Vec<i64> = db
        .stream("UNWIND range(1, 100) AS i MERGE (t:T {i: i}) RETURN t.i AS i")
        .unwrap()
        .map(serde_json::to_value)
        .filter_map(Result::ok)
        .filter_map(|value| value.get("i").and_then(JsonValue::as_i64))
        .collect();

    assert_eq!(merged.len(), 100);
    assert_eq!(db.node_count(), 100);
}
/// Streams a sorted `MATCH … CREATE` over 100 `:Src` nodes, reads one row,
/// drops the stream, and expects the node count to equal the value captured
/// before the stream was opened.
#[test]
fn auto_commit_sort_then_create_rolls_back_on_drop() {
    let db = Database::in_memory();
    let seed = "UNWIND range(1, 100) AS i CREATE (:Src {i: i})";
    db.execute(seed, rows_options()).unwrap();

    // Baseline taken before the mutating stream is opened.
    let baseline = db.node_count();

    let query = "MATCH (s:Src) WITH s ORDER BY s.i DESC \
                 CREATE (:Top {origin: s.i}) RETURN s.i AS i";
    let mut partial = db.stream(query).unwrap();
    let got_row = partial.next_row().unwrap().is_some();
    assert!(got_row);
    // Abandon the stream before it is exhausted.
    drop(partial);

    assert_eq!(db.node_count(), baseline);
}
/// Streams a `WITH DISTINCT … CREATE` over 100 `:Src` nodes whose `kind` is
/// `i % 5`, expecting five emitted rows and five `:Tag` nodes afterwards.
#[test]
fn auto_commit_distinct_then_create_streams() {
    let db = Database::in_memory();
    let seed = "UNWIND range(1, 100) AS i CREATE (:Src {kind: i % 5})";
    db.execute(seed, rows_options()).unwrap();

    let query = "MATCH (s:Src) WITH DISTINCT s.kind AS k \
                 CREATE (:Tag {kind: k}) RETURN k";
    // Consume the iterator to its end and count the rows it produced.
    let emitted = db.stream(query).unwrap().count();
    assert_eq!(emitted, 5);

    let tally = db
        .execute("MATCH (t:Tag) RETURN count(t) AS c", rows_options())
        .unwrap();
    let rows = rows_json(tally);
    let tag_total = rows[0].get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(tag_total, 5);
}
/// Streams a read-only `UNION ALL` over `:T` (1..=5) and `:U` (10..=12) and
/// expects the sorted output to contain every value from both branches.
#[test]
fn union_all_read_stream_yields_both_branches() {
    let db = Database::in_memory();
    let seeds = [
        "UNWIND range(1, 5) AS i CREATE (:T {i: i})",
        "UNWIND range(10, 12) AS i CREATE (:U {i: i})",
    ];
    for seed in seeds {
        db.execute(seed, rows_options()).unwrap();
    }

    let query = "MATCH (t:T) RETURN t.i AS x \
                 UNION ALL \
                 MATCH (u:U) RETURN u.i AS x";
    let mut values: Vec<i64> = db
        .stream(query)
        .unwrap()
        .filter_map(|row| {
            let value = serde_json::to_value(row).ok()?;
            value.get("x").and_then(JsonValue::as_i64)
        })
        .collect();

    // The comparison is order-insensitive, so sort before asserting.
    values.sort_unstable();
    assert_eq!(values, vec![1, 2, 3, 4, 5, 10, 11, 12]);
}
/// Streams a `UNION` over `:T` (1..=5) and `:U` (3..=7) and expects the
/// overlapping values 3, 4 and 5 to appear only once in the sorted output.
#[test]
fn union_dedup_collapses_overlap() {
    let db = Database::in_memory();
    let seeds = [
        "UNWIND range(1, 5) AS i CREATE (:T {i: i})",
        "UNWIND range(3, 7) AS i CREATE (:U {i: i})",
    ];
    for seed in seeds {
        db.execute(seed, rows_options()).unwrap();
    }

    let query = "MATCH (t:T) RETURN t.i AS x \
                 UNION \
                 MATCH (u:U) RETURN u.i AS x";
    let mut values: Vec<i64> = db
        .stream(query)
        .unwrap()
        .filter_map(|row| {
            let value = serde_json::to_value(row).ok()?;
            value.get("x").and_then(JsonValue::as_i64)
        })
        .collect();

    // The comparison is order-insensitive, so sort before asserting.
    values.sort_unstable();
    assert_eq!(values, vec![1, 2, 3, 4, 5, 6, 7]);
}
/// Opens a `UNION ALL` stream whose two branches each `CREATE` a `:UA` node,
/// reads one row, drops the stream, and expects the database to stay empty.
#[test]
fn auto_commit_mutating_union_all_rolls_back_on_drop() {
    let db = Database::in_memory();
    let query = "CREATE (:UA {i: 1}) RETURN 1 AS i \
                 UNION ALL \
                 CREATE (:UA {i: 2}) RETURN 2 AS i";

    let mut partial = db.stream(query).unwrap();
    let got_row = partial.next_row().unwrap().is_some();
    assert!(got_row);
    // Abandon the stream before it is exhausted.
    drop(partial);

    let node_total = db.node_count();
    assert_eq!(node_total, 0);
}
/// Drains a `UNION ALL` stream whose two branches each `CREATE` a `:UA` node,
/// expecting the rows `1` and `2` and two nodes in the database afterwards.
#[test]
fn auto_commit_mutating_union_all_commits_on_exhaustion() {
    let db = Database::in_memory();
    let query = "CREATE (:UA {i: 1}) RETURN 1 AS i \
                 UNION ALL \
                 CREATE (:UA {i: 2}) RETURN 2 AS i";

    let mut emitted: Vec<i64> = db
        .stream(query)
        .unwrap()
        .map(serde_json::to_value)
        .filter_map(Result::ok)
        .filter_map(|value| value.get("i").and_then(JsonValue::as_i64))
        .collect();

    // The comparison is order-insensitive, so sort before asserting.
    emitted.sort_unstable();
    assert_eq!(emitted, vec![1, 2]);
    assert_eq!(db.node_count(), 2);
}
/// Drains a `UNION` stream whose two branches each `CREATE` a `:UB` node and
/// return the same row, expecting one deduplicated row but two created nodes.
#[test]
fn auto_commit_mutating_union_dedups_rows_but_commits_all_branch_writes() {
    let db = Database::in_memory();
    let query = "CREATE (:UB {i: 1}) RETURN 1 AS i \
                 UNION \
                 CREATE (:UB {i: 1}) RETURN 1 AS i";

    let emitted: Vec<i64> = db
        .stream(query)
        .unwrap()
        .filter_map(|row| {
            let value = serde_json::to_value(row).ok()?;
            value.get("i").and_then(JsonValue::as_i64)
        })
        .collect();

    // One row survives deduplication ...
    assert_eq!(emitted, vec![1]);
    // ... while both branches are expected to have written their node.
    assert_eq!(db.node_count(), 2);
}
/// Drains a sorted `MATCH … CREATE` stream over 100 `:Src` nodes and then
/// checks that 100 `:Top` nodes exist whose `origin` values sum to the same
/// total as 1..=100.
#[test]
fn auto_commit_sort_then_create_commits_on_exhaustion() {
    let db = Database::in_memory();
    let seed = "UNWIND range(1, 100) AS i CREATE (:Src {i: i})";
    db.execute(seed, rows_options()).unwrap();

    let query = "MATCH (s:Src) WITH s ORDER BY s.i DESC \
                 CREATE (:Top {origin: s.i}) RETURN s.i AS i";
    // Consume the iterator to its end and count the rows it produced.
    let emitted = db.stream(query).unwrap().count();
    assert_eq!(emitted, 100);

    let verify = "MATCH (t:Top) RETURN count(t) AS c, sum(t.origin) AS s";
    let rows = rows_json(db.execute(verify, rows_options()).unwrap());
    let summary = &rows[0];

    let top_total = summary.get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(top_total, 100);

    let origin_sum = summary.get("s").and_then(JsonValue::as_i64).unwrap();
    let expected_sum: i64 = (1..=100).sum();
    assert_eq!(origin_sum, expected_sum);
}
/// Reads one row from an auto-commit stream that removes the `:Tagged` label
/// from 50 nodes, drops the stream, and expects all 50 nodes to still carry
/// the label.
#[test]
fn auto_commit_remove_stream_rolls_back_on_drop() {
    let db = Database::in_memory();
    let seed = "UNWIND range(1, 50) AS i CREATE (:T:Tagged {i: i})";
    db.execute(seed, rows_options()).unwrap();

    let mut partial = db
        .stream("MATCH (t:Tagged) REMOVE t:Tagged RETURN t.i AS i")
        .unwrap();
    let got_row = partial.next_row().unwrap().is_some();
    assert!(got_row);
    // Abandon the stream before it is exhausted.
    drop(partial);

    let verify = "MATCH (t:Tagged) RETURN count(t) AS c";
    let rows = rows_json(db.execute(verify, rows_options()).unwrap());
    let still_tagged = rows[0].get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(still_tagged, 50);
}
/// Seeds 50 `:T` nodes, runs `MERGE` over `range(1, 100)` through `execute`,
/// and expects exactly 100 `:T` nodes afterwards.
#[test]
fn merge_with_streamable_input_creates_or_binds() {
    let db = Database::in_memory();
    let statements = [
        "UNWIND range(1, 50) AS i CREATE (:T {i: i})",
        "UNWIND range(1, 100) AS i MERGE (:T {i: i})",
    ];
    // Seed first, then merge; either failing aborts the test.
    for statement in statements {
        db.execute(statement, rows_options()).unwrap();
    }

    let verify = "MATCH (t:T) RETURN count(t) AS c";
    let rows = rows_json(db.execute(verify, rows_options()).unwrap());
    let node_total = rows[0].get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(node_total, 100);
}
/// Builds a three-node `:V` chain, creates a `:Reach` node for each `:V`
/// matched by a `*1..2` variable-length pattern starting at `i = 1`, and
/// expects two `:Reach` nodes whose `i` values sum to 5.
#[test]
fn varlen_match_create_streams_input_correctly() {
    let db = Database::in_memory();
    let statements = [
        "CREATE (:V {i: 1})-[:R]->(:V {i: 2})-[:R]->(:V {i: 3})",
        "MATCH (:V {i: 1})-[:R*1..2]->(v:V) CREATE (:Reach {i: v.i})",
    ];
    // Build the chain, then run the mutating match; either failing aborts the test.
    for statement in statements {
        db.execute(statement, rows_options()).unwrap();
    }

    let verify = "MATCH (r:Reach) RETURN count(r) AS c, sum(r.i) AS s";
    let rows = rows_json(db.execute(verify, rows_options()).unwrap());
    let summary = &rows[0];

    let reach_total = summary.get("c").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(reach_total, 2);

    let reach_sum = summary.get("s").and_then(JsonValue::as_i64).unwrap();
    assert_eq!(reach_sum, 5);
}
}