use crate::graph::types::{Fact, TxId, VALID_TIME_FOREVER};
/// Sentinel `valid_from` meaning "not specified by the caller": when a batch is stamped for
/// commit, asserted facts still carrying this value get `valid_from` set to the batch's `tx_id`.
pub(crate) const VALID_FROM_USE_TX_TIME: i64 = i64::MIN;
use crate::graph::FactStorage;
use crate::graph::types::Value;
use crate::query::datalog::evaluator::DEFAULT_MAX_DERIVED_FACTS;
use crate::query::datalog::evaluator::DEFAULT_MAX_RESULTS;
use crate::query::datalog::executor::DatalogExecutor;
use crate::query::datalog::executor::QueryResult;
use crate::query::datalog::functions::{
AggImpl, AggregateDesc, FunctionRegistry, PredicateDesc, UdfFinaliseFn, UdfOps, UdfStepFn,
};
use crate::query::datalog::parser::parse_datalog_command;
use crate::query::datalog::rules::RuleRegistry;
use crate::query::datalog::types::{AttributeSpec, DatalogCommand, Transaction};
use crate::storage::backend::MemoryBackend;
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::backend::file::FileBackend;
use crate::storage::persistent_facts::PersistentFactStorage;
#[cfg(not(target_arch = "wasm32"))]
use crate::wal::WalWriter;
use anyhow::{Result, bail};
use std::any::Any;
#[cfg(not(target_arch = "wasm32"))]
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
thread_local! {
    /// Per-thread marker that is `true` while this thread holds an open `WriteTransaction`.
    ///
    /// Same-thread calls that would need the (non-re-entrant) write lock consult it so they
    /// can fail fast with an error instead of waiting on a lock this thread already owns.
    static WRITE_TX_ACTIVE: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
}

/// Record whether the current thread has an open `WriteTransaction`.
fn set_write_tx_active(val: bool) {
    WRITE_TX_ACTIVE.with(|active| active.set(val));
}

/// Report whether the current thread has an open `WriteTransaction`.
fn is_write_tx_active() -> bool {
    WRITE_TX_ACTIVE.with(std::cell::Cell::get)
}
/// Tunables applied when opening a database.
///
/// Start from `OpenOptions::new()` / `Default`, adjust via the builder methods or the public
/// fields, then open with `open_memory()` or (native targets) `path(..).open()`.
#[derive(Debug, Clone)]
pub struct OpenOptions {
/// A file-backed write triggers an automatic checkpoint once at least this many WAL entries
/// have accumulated since the last checkpoint. `usize::MAX` also skips the checkpoint that is
/// otherwise attempted when the database's shared state is dropped.
pub wal_checkpoint_threshold: usize,
/// Page-cache size passed to `PersistentFactStorage::new` (units as defined there —
/// presumably pages).
pub page_cache_size: usize,
/// Derived-fact limit passed to `DatalogExecutor::set_limits`.
pub max_derived_facts: usize,
/// Result limit passed to `DatalogExecutor::set_limits`.
pub max_results: usize,
}
impl Default for OpenOptions {
    /// Checkpoint after 1000 WAL entries, a page-cache size of 256, and the evaluator's
    /// default derived-fact and result limits.
    fn default() -> Self {
        Self {
            page_cache_size: 256,
            wal_checkpoint_threshold: 1000,
            max_results: DEFAULT_MAX_RESULTS,
            max_derived_facts: DEFAULT_MAX_DERIVED_FACTS,
        }
    }
}
impl OpenOptions {
    /// Create options populated with the defaults; equivalent to `OpenOptions::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set how many WAL entries may accumulate before a write triggers an automatic checkpoint.
    ///
    /// A file-backed write checkpoints once the WAL holds at least `n` entries. Passing
    /// `usize::MAX` effectively disables automatic checkpoints, including the one otherwise
    /// attempted when the database's shared state is dropped.
    pub fn wal_checkpoint_threshold(mut self, n: usize) -> Self {
        self.wal_checkpoint_threshold = n;
        self
    }

    /// Set the page-cache size handed to the persistent fact storage.
    pub fn page_cache_size(mut self, size: usize) -> Self {
        self.page_cache_size = size;
        self
    }

    /// Set the derived-fact limit passed to `DatalogExecutor::set_limits`.
    pub fn max_derived_facts(mut self, n: usize) -> Self {
        self.max_derived_facts = n;
        self
    }

    /// Set the result limit passed to `DatalogExecutor::set_limits`.
    pub fn max_results(mut self, n: usize) -> Self {
        self.max_results = n;
        self
    }

    /// Attach a database file path; call `.open()` on the result to open the database.
    #[cfg(not(target_arch = "wasm32"))]
    pub fn path(self, path: impl AsRef<Path>) -> OpenOptionsWithPath {
        OpenOptionsWithPath {
            opts: self,
            path: path.as_ref().to_path_buf(),
        }
    }

    /// Open a fresh in-memory database with these options.
    pub fn open_memory(self) -> Result<Minigraf> {
        Minigraf::in_memory_with_options(self)
    }
}
/// `OpenOptions` paired with a database file path, produced by `OpenOptions::path`.
#[cfg(not(target_arch = "wasm32"))]
pub struct OpenOptionsWithPath {
/// Options to open with.
opts: OpenOptions,
/// Path of the main database file.
path: PathBuf,
}
#[cfg(not(target_arch = "wasm32"))]
impl OpenOptionsWithPath {
    /// Open the file-backed database at the stored path using the stored options.
    pub fn open(self) -> Result<Minigraf> {
        let Self { opts, path } = self;
        Minigraf::open_with_options(path, opts)
    }
}
/// Durable-write state, owned by the database write lock.
enum WriteContext {
/// In-memory database: writes are neither logged nor persisted.
Memory,
/// File-backed database.
#[cfg(not(target_arch = "wasm32"))]
File {
/// Persistent storage for the main database file.
pfs: PersistentFactStorage<FileBackend>,
/// Open WAL writer, or `None` when no WAL file is open (it is created lazily on the next
/// write, and dropped and deleted on checkpoint).
wal: Option<WalWriter>,
/// Path of the main database file; the WAL sits next to it with a `.wal` suffix.
db_path: PathBuf,
/// WAL entries written since the last checkpoint (seeded from WAL replay on open).
wal_entry_count: usize,
},
}
/// State shared by every clone of a `Minigraf` handle.
struct Inner {
/// Fact store queried by readers; for file-backed databases it is a clone of the store held
/// by the `PersistentFactStorage` inside `write_lock`.
fact_storage: FactStorage,
/// Registered Datalog rules, shared with every executor.
rules: Arc<RwLock<RuleRegistry>>,
/// Built-in plus user-registered aggregates and predicates.
functions: Arc<RwLock<FunctionRegistry>>,
/// Serialises writers and owns the durable write state (WAL, persistent storage).
write_lock: Mutex<WriteContext>,
/// Options the database was opened with.
options: OpenOptions,
}
impl Drop for Inner {
    /// Best-effort checkpoint when the shared state is torn down (the last `Minigraf` clone
    /// goes away). Skipped entirely when `wal_checkpoint_threshold` is `usize::MAX`.
    fn drop(&mut self) {
        let checkpoint_on_close = self.options.wal_checkpoint_threshold != usize::MAX;
        if !checkpoint_on_close {
            return;
        }
        // A poisoned lock is skipped and checkpoint errors are discarded: `drop` cannot
        // report failures.
        if let Ok(mut write_ctx) = self.write_lock.lock() {
            let _ = Minigraf::do_checkpoint(&self.fact_storage, &mut write_ctx);
        }
    }
}
/// A handle to a Minigraf database.
///
/// Cloning is cheap: all clones share one `Arc<Inner>`. When the last clone is dropped the
/// shared state attempts a best-effort checkpoint (unless `wal_checkpoint_threshold` is
/// `usize::MAX`).
#[derive(Clone)]
pub struct Minigraf {
inner: Arc<Inner>,
}
impl Minigraf {
/// Open the file-backed database at `path` with default options.
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
    OpenOptions::new().path(path).open()
}
#[cfg(not(target_arch = "wasm32"))]
pub fn open_with_options(path: impl AsRef<Path>, opts: OpenOptions) -> Result<Self> {
let db_path = path.as_ref().to_path_buf();
let backend = FileBackend::open(&db_path)?;
let pfs = PersistentFactStorage::new(backend, opts.page_cache_size)?;
let fact_storage = pfs.storage().clone();
let wal_path = Self::wal_path_for(&db_path);
let wal_entry_count = Self::replay_wal(&wal_path, &fact_storage, &pfs)?;
let wal = if wal_path.exists() {
Some(WalWriter::open_or_create(&wal_path)?)
} else {
None
};
let ctx = WriteContext::File {
pfs,
wal,
db_path,
wal_entry_count,
};
Ok(Minigraf {
inner: Arc::new(Inner {
fact_storage,
rules: Arc::new(RwLock::new(RuleRegistry::new())),
functions: Arc::new(RwLock::new(FunctionRegistry::with_builtins())),
write_lock: Mutex::new(ctx),
options: opts,
}),
})
}
/// Create a fresh in-memory database with default options.
pub fn in_memory() -> Result<Self> {
    OpenOptions::new().open_memory()
}
pub fn in_memory_with_options(opts: OpenOptions) -> Result<Self> {
let backend = MemoryBackend::new();
let pfs = PersistentFactStorage::new(backend, opts.page_cache_size)?;
let fact_storage = pfs.storage().clone();
drop(pfs);
Ok(Minigraf {
inner: Arc::new(Inner {
fact_storage,
rules: Arc::new(RwLock::new(RuleRegistry::new())),
functions: Arc::new(RwLock::new(FunctionRegistry::with_builtins())),
write_lock: Mutex::new(WriteContext::Memory),
options: opts,
}),
})
}
/// Replay WAL entries newer than the last checkpoint into `fact_storage`.
///
/// Returns the number of entries applied (0 when no WAL file exists). Entries whose
/// `tx_count` is at or below `pfs.last_checkpointed_tx_count()` are skipped. Afterwards the
/// store's tx counter is restored via `restore_tx_counter`.
#[cfg(not(target_arch = "wasm32"))]
fn replay_wal(
    wal_path: &Path,
    fact_storage: &FactStorage,
    pfs: &PersistentFactStorage<FileBackend>,
) -> Result<usize> {
    if !wal_path.exists() {
        return Ok(0);
    }
    let mut reader = crate::wal::WalReader::open(wal_path)?;
    let entries = reader.read_entries()?;
    let checkpoint_floor = pfs.last_checkpointed_tx_count();

    let mut applied = 0usize;
    for entry in entries.iter().filter(|e| e.tx_count > checkpoint_floor) {
        for fact in &entry.facts {
            let _ = fact_storage.load_fact(fact.clone())?;
        }
        applied += 1;
    }
    fact_storage.restore_tx_counter()?;
    Ok(applied)
}
/// Parse and run a single Datalog command outside any explicit transaction.
///
/// `(transact ...)` / `(retract ...)` are stamped with a fresh `tx_count` and `tx_id`,
/// appended to the WAL (file-backed databases), loaded into the fact store, and may trigger
/// an automatic checkpoint. `(rule ...)` registrations (serialised with other writes by the
/// write lock) and queries run through a `DatalogExecutor` configured with this database's
/// `max_derived_facts` / `max_results` limits.
///
/// # Errors
///
/// Fails if this thread already has an open `WriteTransaction`, if parsing fails, if the
/// write lock is poisoned, or if materialising, logging, loading or checkpointing fails.
pub fn execute(&self, input: &str) -> Result<QueryResult> {
    if is_write_tx_active() {
        bail!(
            "a WriteTransaction is already in progress on this thread; use tx.execute() instead"
        );
    }
    let cmd = parse_datalog_command(input).map_err(|e| anyhow::anyhow!("{}", e))?;

    // Every executor built here honours the configured limits, matching the executors
    // used inside `WriteTransaction`.
    let new_executor = || {
        let mut executor = DatalogExecutor::new_with_rules_and_functions(
            self.inner.fact_storage.clone(),
            self.inner.rules.clone(),
            self.inner.functions.clone(),
        );
        executor.set_limits(
            self.inner.options.max_derived_facts,
            self.inner.options.max_results,
        );
        executor
    };

    let is_write = matches!(
        cmd,
        DatalogCommand::Transact(_) | DatalogCommand::Retract(_) | DatalogCommand::Rule(_)
    );
    if !is_write {
        return new_executor().execute(cmd);
    }

    let mut ctx = self.inner.write_lock.lock().map_err(|_| {
        anyhow::anyhow!("write lock is poisoned; database may be in an inconsistent state")
    })?;
    let (facts, is_retract) = match &cmd {
        DatalogCommand::Transact(tx) => (Minigraf::materialize_transaction(tx)?, false),
        DatalogCommand::Retract(tx) => (Minigraf::materialize_retraction(tx)?, true),
        DatalogCommand::Rule(_) => return new_executor().execute(cmd),
        _ => unreachable!("is_write guarantees Transact, Retract, or Rule"),
    };

    let tx_count = self.inner.fact_storage.allocate_tx_count();
    let tx_id = crate::graph::types::tx_id_now();
    let stamped: Vec<Fact> = facts
        .into_iter()
        .map(|mut f| {
            f.tx_id = tx_id;
            f.tx_count = tx_count;
            if f.asserted && f.valid_from == VALID_FROM_USE_TX_TIME {
                f.valid_from = tx_id as i64;
            }
            f
        })
        .collect();

    // Log first, then make visible in memory.
    let should_checkpoint = WriteTransaction::wal_write_stamped_batch(
        &mut ctx,
        &self.inner.options,
        tx_count,
        &stamped,
    )?;
    // The WAL already has its copy, so move each fact into the store instead of cloning it.
    for fact in stamped {
        let _ = self.inner.fact_storage.load_fact(fact)?;
    }
    if should_checkpoint {
        Minigraf::do_checkpoint(&self.inner.fact_storage, &mut ctx)?;
    }

    Ok(if is_retract {
        QueryResult::Retracted(tx_id)
    } else {
        QueryResult::Transacted(tx_id)
    })
}
/// Start an explicit write transaction, waiting until the database write lock is free.
///
/// The returned `WriteTransaction` holds the write lock until it is committed, rolled back or
/// dropped. Writes staged through it are buffered and only loaded into the shared fact store
/// on `commit`.
///
/// # Errors
///
/// Fails if this thread already has an open `WriteTransaction` or if the write lock is
/// poisoned.
pub fn begin_write(&self) -> Result<WriteTransaction<'_>> {
    if is_write_tx_active() {
        bail!(
            "a WriteTransaction is already in progress on this thread; use tx.execute() instead"
        );
    }
    let guard = match self.inner.write_lock.lock() {
        Ok(guard) => guard,
        Err(_) => bail!("write lock is poisoned; database may be in an inconsistent state"),
    };
    // Flag only once the lock is actually held, so a failed lock never leaves it set.
    set_write_tx_active(true);

    let next_pending_tx_count = self.inner.fact_storage.current_tx_count() + 1;
    let next_pending_tx_id = crate::graph::types::tx_id_now();
    Ok(WriteTransaction {
        guard,
        inner: &self.inner,
        pending_facts: Vec::new(),
        next_pending_tx_count,
        next_pending_tx_id,
        committed: false,
    })
}
pub fn checkpoint(&self) -> Result<()> {
let mut ctx = self.inner.write_lock.lock().map_err(|_| {
anyhow::anyhow!("write lock is poisoned; database may be in an inconsistent state")
})?;
Self::do_checkpoint(&self.inner.fact_storage, &mut ctx)
}
/// Persist the write context and retire its WAL; the caller must hold the write lock.
///
/// For file-backed databases, saves the persistent storage, drops the WAL writer, deletes the
/// WAL file if present, and resets the WAL entry counter. In-memory databases have nothing to
/// flush.
fn do_checkpoint(_fact_storage: &FactStorage, ctx: &mut WriteContext) -> Result<()> {
    match ctx {
        WriteContext::Memory => Ok(()),
        #[cfg(not(target_arch = "wasm32"))]
        WriteContext::File {
            pfs,
            wal,
            db_path,
            wal_entry_count,
        } => {
            // Force the storage to be treated as dirty so `save` writes it out.
            pfs.force_dirty();
            pfs.save()?;
            // Main file is saved: drop the WAL writer before removing its file.
            *wal = None;
            let wal_path = Self::wal_path_for(db_path);
            if wal_path.exists() {
                WalWriter::delete_file(&wal_path)?;
            }
            *wal_entry_count = 0;
            Ok(())
        }
    }
}
/// Parse `query_str` and prepare it for repeated execution.
///
/// # Errors
///
/// Fails on a parse error, or if the command is a `transact`, `retract` or `rule` rather than
/// a `(query ...)`.
pub fn prepare(
    &self,
    query_str: &str,
) -> Result<crate::query::datalog::prepared::PreparedQuery> {
    use crate::query::datalog::prepared::prepare_query;
    match parse_datalog_command(query_str).map_err(|e| anyhow::anyhow!("{}", e))? {
        DatalogCommand::Query(query) => prepare_query(
            query,
            self.inner.fact_storage.clone(),
            self.inner.rules.clone(),
            self.inner.functions.clone(),
        ),
        DatalogCommand::Transact(_) => {
            anyhow::bail!("only (query ...) commands can be prepared; got transact")
        }
        DatalogCommand::Retract(_) => {
            anyhow::bail!("only (query ...) commands can be prepared; got retract")
        }
        DatalogCommand::Rule(_) => {
            anyhow::bail!("only (query ...) commands can be prepared; got rule")
        }
    }
}
/// Create a `Repl` that borrows this database handle.
pub fn repl(&self) -> crate::repl::Repl<'_> {
crate::repl::Repl::new(self)
}
/// Path of the WAL file belonging to `db_path`: the same file name with `.wal` appended
/// (`dir/test.graph` → `dir/test.graph.wal`). When `db_path` has no file name (e.g. it ends
/// in `..`), `db.graph.wal` is placed under it instead.
#[cfg(not(target_arch = "wasm32"))]
fn wal_path_for(db_path: &Path) -> PathBuf {
    let wal_name = match db_path.file_name() {
        Some(base) => {
            let mut name = base.to_os_string();
            name.push(".wal");
            name
        }
        None => std::ffi::OsString::from("db.graph.wal"),
    };
    db_path.with_file_name(wal_name)
}
/// Convert a parsed `(transact ...)` command into unstamped assertion facts.
///
/// Per-pattern valid-time bounds take precedence over the transaction-level ones. A missing
/// `valid_from` becomes `VALID_FROM_USE_TX_TIME` (resolved to the batch's tx id when it is
/// stamped); a missing `valid_to` becomes `VALID_TIME_FOREVER`. `tx_id` and `tx_count` are
/// left at 0 for the caller to stamp.
///
/// # Errors
///
/// Fails on the first pattern whose entity or value cannot be converted, whose attribute is
/// not a keyword, or which names a pseudo-attribute.
pub(crate) fn materialize_transaction(tx: &Transaction) -> Result<Vec<Fact>> {
    use crate::query::datalog::matcher::{edn_to_entity_id, edn_to_value};
    use crate::query::datalog::types::EdnValue;
    tx.facts
        .iter()
        .map(|pattern| -> Result<Fact> {
            let entity = edn_to_entity_id(&pattern.entity)
                .map_err(|e| anyhow::anyhow!("invalid entity: {}", e))?;
            let attr = match &pattern.attribute {
                AttributeSpec::Real(EdnValue::Keyword(k)) => k.clone(),
                AttributeSpec::Real(_) => anyhow::bail!("attribute must be a keyword"),
                AttributeSpec::Pseudo(_) => anyhow::bail!("cannot transact a pseudo-attribute"),
            };
            let value = edn_to_value(&pattern.value)
                .map_err(|e| anyhow::anyhow!("invalid value: {}", e))?;
            let valid_from = pattern
                .valid_from
                .or(tx.valid_from)
                .unwrap_or(VALID_FROM_USE_TX_TIME);
            let valid_to = pattern
                .valid_to
                .or(tx.valid_to)
                .unwrap_or(VALID_TIME_FOREVER);
            Ok(Fact::with_valid_time(
                entity, attr, value, 0, 0, valid_from, valid_to,
            ))
        })
        .collect()
}
/// Convert a parsed `(retract ...)` command into unstamped retraction facts.
///
/// Valid-time bounds on the command are not consulted. `tx_count` is set to 0 and the
/// trailing `0` given to `Fact::retract` is presumably a placeholder tx id; the caller stamps
/// both before the facts are logged or loaded.
///
/// # Errors
///
/// Fails on the first pattern whose entity or value cannot be converted, whose attribute is
/// not a keyword, or which names a pseudo-attribute.
pub(crate) fn materialize_retraction(tx: &Transaction) -> Result<Vec<Fact>> {
    use crate::query::datalog::matcher::{edn_to_entity_id, edn_to_value};
    use crate::query::datalog::types::EdnValue;
    let mut facts = Vec::with_capacity(tx.facts.len());
    for pattern in &tx.facts {
        let entity = edn_to_entity_id(&pattern.entity)
            .map_err(|e| anyhow::anyhow!("invalid entity: {}", e))?;
        let attr = match &pattern.attribute {
            AttributeSpec::Real(EdnValue::Keyword(k)) => k.clone(),
            AttributeSpec::Real(_) => anyhow::bail!("attribute must be a keyword"),
            // Name the operation the user actually ran.
            AttributeSpec::Pseudo(_) => anyhow::bail!("cannot retract a pseudo-attribute"),
        };
        let value = edn_to_value(&pattern.value)
            .map_err(|e| anyhow::anyhow!("invalid value: {}", e))?;
        let mut fact = Fact::retract(entity, attr, value, 0);
        fact.tx_count = 0;
        facts.push(fact);
    }
    Ok(facts)
}
/// Register a user-defined aggregate under `name`.
///
/// `init` creates a fresh accumulator, `step` folds each value into it, and `finalise` turns
/// the accumulator plus a `usize` count into the result. The accumulator is type-erased to
/// `Box<dyn Any + Send>` for storage and downcast back to `Acc` on every call; a mismatch
/// panics with "UDF accumulator type mismatch".
///
/// # Errors
///
/// Fails if the function-registry lock is poisoned or if the registry rejects the
/// registration.
pub fn register_aggregate<Acc>(
    &self,
    name: &str,
    init: impl Fn() -> Acc + Send + Sync + 'static,
    step: impl Fn(&mut Acc, &Value) + Send + Sync + 'static,
    finalise: impl Fn(&Acc, usize) -> Value + Send + Sync + 'static,
) -> Result<()>
where
    Acc: Any + Send + 'static,
{
    let erased_init: Arc<dyn Fn() -> Box<dyn Any + Send> + Send + Sync> =
        Arc::new(move || -> Box<dyn Any + Send> { Box::new(init()) });
    let erased_step: UdfStepFn = Arc::new(move |state, value| {
        let typed = state
            .downcast_mut::<Acc>()
            .expect("UDF accumulator type mismatch");
        step(typed, value);
    });
    let erased_finalise: UdfFinaliseFn = Arc::new(move |state, count| {
        let typed = state
            .downcast_ref::<Acc>()
            .expect("UDF accumulator type mismatch");
        finalise(typed, count)
    });

    let ops = UdfOps {
        init: erased_init,
        step: erased_step,
        finalise: erased_finalise,
    };
    let desc = AggregateDesc {
        impl_: AggImpl::Udf(ops),
        is_builtin: false,
    };
    let registry = &self.inner.functions;
    registry
        .write()
        .map_err(|e| anyhow::anyhow!("function registry lock poisoned: {}", e))?
        .register_aggregate_desc(name.to_string(), desc)
}
/// Register a user-defined predicate `f` under `name`.
///
/// # Errors
///
/// Fails if the function-registry lock is poisoned or if the registry rejects the
/// registration.
pub fn register_predicate(
    &self,
    name: &str,
    f: impl Fn(&Value) -> bool + Send + Sync + 'static,
) -> Result<()> {
    let predicate = PredicateDesc {
        is_builtin: false,
        f: Arc::new(f),
    };
    let registry = &self.inner.functions;
    registry
        .write()
        .map_err(|e| anyhow::anyhow!("function registry lock poisoned: {}", e))?
        .register_predicate_desc(name.to_owned(), predicate)
}
}
/// An explicit, buffered write transaction obtained from `Minigraf::begin_write`.
///
/// Holds the database write lock for its whole lifetime. Transacts and retracts are buffered
/// in memory and only applied by `commit`; `rollback` or dropping without committing discards
/// them.
pub struct WriteTransaction<'a> {
/// Write-lock guard; held until the transaction is committed, rolled back or dropped.
guard: MutexGuard<'a, WriteContext>,
/// Shared database state.
inner: &'a Inner,
/// Staged facts, carrying provisional tx metadata so in-transaction reads can see them.
pending_facts: Vec<Fact>,
/// Provisional `tx_count` for the next staged batch.
next_pending_tx_count: u64,
/// Lower bound for the next staged batch's provisional `tx_id` (keeps them strictly increasing).
next_pending_tx_id: TxId,
/// Set once `commit` or `rollback` has finished so `Drop` does not roll back again.
committed: bool,
}
impl<'a> WriteTransaction<'a> {
/// Run a Datalog command inside this transaction.
///
/// Transacts and retracts are staged (returning `QueryResult::Ok`) and applied on `commit`.
/// Queries see committed facts plus everything staged so far; rules are registered
/// immediately.
pub fn execute(&mut self, input: &str) -> Result<QueryResult> {
    let command = parse_datalog_command(input).map_err(|e| anyhow::anyhow!("{}", e))?;
    let staged = match command {
        DatalogCommand::Transact(tx) => Minigraf::materialize_transaction(&tx)?,
        DatalogCommand::Retract(tx) => Minigraf::materialize_retraction(&tx)?,
        query @ DatalogCommand::Query(_) => return self.execute_read_command(query),
        DatalogCommand::Rule(rule) => return self.execute_rule_command(rule),
    };
    self.stage_pending_facts(staged);
    Ok(QueryResult::Ok)
}
/// Evaluate a query, overlaying staged facts on the committed store when any are pending.
fn execute_read_command(&self, cmd: DatalogCommand) -> Result<QueryResult> {
    let (max_derived, max_results) = (
        self.inner.options.max_derived_facts,
        self.inner.options.max_results,
    );
    if !self.pending_facts.is_empty() {
        // Overlay path: committed + staged facts, with the newest staged tx id passed to the
        // executor as its read-time floor.
        let mut overlay = DatalogExecutor::new_from_facts_with_rules_and_functions(
            self.merged_query_facts()?,
            self.pending_read_now_floor(),
            self.inner.rules.clone(),
            self.inner.functions.clone(),
        );
        overlay.set_limits(max_derived, max_results);
        return overlay.execute(cmd);
    }
    // Nothing staged: query the committed store directly.
    let mut committed_only = DatalogExecutor::new_with_rules_and_functions(
        self.inner.fact_storage.clone(),
        self.inner.rules.clone(),
        self.inner.functions.clone(),
    );
    committed_only.set_limits(max_derived, max_results);
    committed_only.execute(cmd)
}
fn execute_rule_command(
&self,
rule: crate::query::datalog::types::Rule,
) -> Result<QueryResult> {
let mut executor = DatalogExecutor::new_with_rules_and_functions(
self.inner.fact_storage.clone(),
self.inner.rules.clone(),
self.inner.functions.clone(),
);
executor.set_limits(
self.inner.options.max_derived_facts,
self.inner.options.max_results,
);
executor.execute(DatalogCommand::Rule(rule))
}
/// Buffer one batch of facts, stamping them with provisional tx metadata.
///
/// The provisional `tx_id` is the current time, but never below the previous batch's id + 1,
/// so batches staged in quick succession still get distinct, increasing ids. Each batch also
/// consumes the next provisional `tx_count`.
fn stage_pending_facts(&mut self, facts: Vec<Fact>) {
    let batch_tx_id = crate::graph::types::tx_id_now().max(self.next_pending_tx_id);
    let batch_tx_count = self.next_pending_tx_count;
    for mut fact in facts {
        fact.tx_id = batch_tx_id;
        fact.tx_count = batch_tx_count;
        self.pending_facts.push(fact);
    }
    self.next_pending_tx_id = batch_tx_id.saturating_add(1);
    self.next_pending_tx_count += 1;
}
/// Apply every staged write as a single batch and release the write lock.
///
/// All staged facts are re-stamped with one freshly allocated `tx_count`/`tx_id` (their
/// provisional values are overwritten), appended to the WAL, then loaded into the fact store;
/// an automatic checkpoint may follow.
///
/// # Errors
///
/// Fails if writing the WAL, loading a fact, or checkpointing fails. The transaction is then
/// dropped, which clears this thread's write-transaction flag.
pub fn commit(mut self) -> Result<()> {
    let mut batch = std::mem::take(&mut self.pending_facts);
    if !batch.is_empty() {
        let tx_count = self.inner.fact_storage.allocate_tx_count();
        let tx_id = crate::graph::types::tx_id_now();
        for fact in batch.iter_mut() {
            fact.tx_id = tx_id;
            fact.tx_count = tx_count;
            if fact.asserted && fact.valid_from == VALID_FROM_USE_TX_TIME {
                fact.valid_from = tx_id as i64;
            }
        }
        // Appended to the WAL before it is loaded into memory.
        let checkpoint_due = Self::wal_write_stamped_batch(
            &mut self.guard,
            &self.inner.options,
            tx_count,
            &batch,
        )?;
        for fact in batch {
            let _ = self.inner.fact_storage.load_fact(fact)?;
        }
        if checkpoint_due {
            Minigraf::do_checkpoint(&self.inner.fact_storage, &mut self.guard)?;
        }
    }
    self.committed = true;
    set_write_tx_active(false);
    Ok(())
}
/// Append an already-stamped batch to the WAL and report whether a checkpoint is now due.
///
/// In-memory databases log nothing and never request a checkpoint. For file-backed databases
/// the WAL writer is created on demand, the storage is marked dirty, and the WAL entry counter
/// is compared against `opts.wal_checkpoint_threshold`.
// On wasm32 only the `Memory` variant exists, so `opts`, `tx_count` and `facts` go unused there.
#[allow(unused_variables)]
fn wal_write_stamped_batch(
    ctx: &mut WriteContext,
    opts: &OpenOptions,
    tx_count: u64,
    facts: &[Fact],
) -> Result<bool> {
    match ctx {
        WriteContext::Memory => Ok(false),
        #[cfg(not(target_arch = "wasm32"))]
        WriteContext::File {
            pfs,
            wal,
            db_path,
            wal_entry_count,
        } => {
            // No WAL open (none at open time, or retired by a checkpoint): create it now.
            if wal.is_none() {
                let fresh = WalWriter::open_or_create(&Minigraf::wal_path_for(db_path))?;
                *wal = Some(fresh);
            }
            wal.as_mut()
                .expect("WAL not initialized")
                .append_entry(tx_count, facts)?;
            pfs.mark_dirty();
            *wal_entry_count += 1;
            let checkpoint_due = *wal_entry_count >= opts.wal_checkpoint_threshold;
            Ok(checkpoint_due)
        }
    }
}
/// Discard every staged write and release the write lock.
pub fn rollback(mut self) {
    drop(std::mem::take(&mut self.pending_facts));
    // Mark finished so `Drop` does not roll back a second time.
    self.committed = true;
    set_write_tx_active(false);
}
/// Snapshot of all committed facts followed by copies of the staged ones.
///
/// Staged assertions still carrying `VALID_FROM_USE_TX_TIME` get their provisional `tx_id` as
/// `valid_from`, mirroring what `commit` does with the real tx id.
fn merged_query_facts(&self) -> Result<Arc<[Fact]>> {
    let committed = self.inner.fact_storage.get_all_facts()?;
    let pending = self.pending_facts.iter().map(|staged| {
        let mut fact = staged.clone();
        if fact.asserted && fact.valid_from == VALID_FROM_USE_TX_TIME {
            fact.valid_from = fact.tx_id as i64;
        }
        fact
    });
    let merged: Vec<Fact> = committed.into_iter().chain(pending).collect();
    Ok(Arc::from(merged))
}
/// Largest provisional `tx_id` among staged facts (as `i64`), or `None` if nothing is staged.
fn pending_read_now_floor(&self) -> Option<i64> {
    let mut floor: Option<i64> = None;
    for fact in &self.pending_facts {
        let candidate = fact.tx_id as i64;
        floor = Some(floor.map_or(candidate, |current| current.max(candidate)));
    }
    floor
}
}
impl Drop for WriteTransaction<'_> {
    /// Implicit rollback: discard staged writes and clear this thread's flag unless `commit`
    /// or `rollback` already finished. The write lock is released when `guard` drops.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        self.pending_facts.clear();
        set_write_tx_active(false);
    }
}
// Native-only unit tests for `Minigraf` and `WriteTransaction`.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
use super::*;
// A REPL can be constructed from an in-memory handle.
#[test]
fn repl_constructed_from_db() {
let db = Minigraf::in_memory().unwrap();
let _repl = db.repl();
}
// Two autocommit transacts on an in-memory database yield two asserted facts.
#[test]
fn test_in_memory_no_wal_file() {
let db = Minigraf::in_memory().unwrap();
db.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
db.execute(r#"(transact [[:alice :person/age 30]])"#)
.unwrap();
let facts = db.inner.fact_storage.get_asserted_facts().unwrap();
assert_eq!(facts.len(), 2, "expected 2 facts after 2 transacts");
}
// Facts staged in a WriteTransaction reach the fact store once committed.
#[test]
fn test_begin_write_commit_facts_visible() {
let db = Minigraf::in_memory().unwrap();
{
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
tx.execute(r#"(transact [[:alice :person/age 30]])"#)
.unwrap();
tx.commit().unwrap();
}
let facts = db.inner.fact_storage.get_asserted_facts().unwrap();
assert_eq!(facts.len(), 2, "committed facts must be visible");
}
// An explicit rollback discards staged facts.
#[test]
fn test_begin_write_rollback_no_facts_visible() {
let db = Minigraf::in_memory().unwrap();
{
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
tx.rollback();
}
let facts = db.inner.fact_storage.get_asserted_facts().unwrap();
assert_eq!(facts.len(), 0, "rolled-back facts must not be visible");
}
// Dropping an uncommitted transaction behaves like a rollback.
#[test]
fn test_drop_without_commit_is_rollback() {
let db = Minigraf::in_memory().unwrap();
{
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
}
let facts = db.inner.fact_storage.get_asserted_facts().unwrap();
assert_eq!(facts.len(), 0, "dropped transaction must act as rollback");
}
// Queries inside a transaction see that transaction's staged facts.
#[test]
fn test_write_transaction_read_your_own_writes() {
let db = Minigraf::in_memory().unwrap();
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
let result = tx
.execute(r#"(query [:find ?name :where [?e :person/name ?name]])"#)
.unwrap();
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "buffered fact must be visible in query");
}
_ => panic!("expected QueryResults"),
}
tx.commit().unwrap();
}
// A staged retraction followed by a staged assertion leaves only the new value visible.
#[test]
fn test_write_transaction_query_with_pending_retraction_and_assertion() {
let db = Minigraf::in_memory().unwrap();
db.execute(r#"(transact [[:alice :person/name "Alice"] [:alice :person/age 30]])"#)
.unwrap();
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(retract [[:alice :person/age 30]])"#)
.unwrap();
tx.execute(r#"(transact [[:alice :person/age 31]])"#)
.unwrap();
let result = tx
.execute(r#"(query [:find ?age :where [:alice :person/age ?age]])"#)
.unwrap();
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "should see only one visible age");
assert_eq!(results[0][0], Value::Integer(31));
}
_ => panic!("expected QueryResults"),
}
}
// A staged retraction on its own hides the matching committed fact.
#[test]
fn test_write_transaction_pending_retraction_only_hides_committed_fact() {
let db = Minigraf::in_memory().unwrap();
db.execute(r#"(transact [[:alice :person/age 30]])"#)
.unwrap();
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(retract [[:alice :person/age 30]])"#)
.unwrap();
let result = tx
.execute(r#"(query [:find ?age :where [:alice :person/age ?age]])"#)
.unwrap();
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(
results.len(),
0,
"retracted committed fact must not be visible"
);
}
_ => panic!("expected QueryResults"),
}
}
// Recursive rule evaluation inside a transaction follows a staged edge.
#[test]
fn test_write_transaction_rule_query_with_pending_write() {
let db = Minigraf::in_memory().unwrap();
db.execute("(rule [(reachable ?x ?y) [?x :edge ?y]])")
.unwrap();
db.execute("(rule [(reachable ?x ?y) [?x :edge ?z] (reachable ?z ?y)])")
.unwrap();
db.execute("(transact [[:a :edge :b]])").unwrap();
let mut tx = db.begin_write().unwrap();
tx.execute("(transact [[:b :edge :c]])").unwrap();
let result = tx
.execute("(query [:find ?y :where (reachable :a ?y)])")
.unwrap();
match result {
QueryResult::QueryResults { results, .. } => {
assert!(
results
.iter()
.any(|row| row[0] == Value::Keyword(":c".to_string())),
"rule query should see pending edge"
);
}
_ => panic!("expected QueryResults"),
}
}
// Provisional tx metadata for staged facts is identical across reads separated by a sleep.
// `:as-of` the first staged tx id sees only the first staged value. A committed fact with a
// later tx id stays hidden while reads use the pending-fact floor.
#[test]
fn test_write_transaction_pending_metadata_is_stable_across_reads() {
use chrono::{SecondsFormat, Utc};
use std::time::Duration;
let db = Minigraf::in_memory().unwrap();
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(transact [[:alice :person/age 30]])"#)
.unwrap();
let first = tx
.execute(
r#"(query [:find ?tx ?tc ?vf :any-valid-time :where [:alice :person/age ?age] [:alice :db/tx-id ?tx] [:alice :db/tx-count ?tc] [:alice :db/valid-from ?vf]])"#,
)
.unwrap();
std::thread::sleep(Duration::from_millis(3));
let second = tx
.execute(
r#"(query [:find ?tx ?tc ?vf :any-valid-time :where [:alice :person/age ?age] [:alice :db/tx-id ?tx] [:alice :db/tx-count ?tc] [:alice :db/valid-from ?vf]])"#,
)
.unwrap();
let (first_row, tx_id) = match &first {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1);
let row = results[0].clone();
let tx_id = match row[0] {
Value::Integer(i) => i,
_ => panic!("expected tx id integer"),
};
assert_eq!(row[1], Value::Integer(1));
assert_eq!(row[2], Value::Integer(tx_id));
(row, tx_id)
}
_ => panic!("expected QueryResults"),
};
match second {
QueryResult::QueryResults {
results: second_results,
..
} => {
assert_eq!(second_results.len(), 1);
assert_eq!(first_row, second_results[0]);
}
_ => panic!("expected QueryResults"),
}
tx.execute(r#"(transact [[:alice :person/age 31]])"#)
.unwrap();
let future_tx_id = tx_id as u64 + 60_000;
let future_fact = Fact::with_valid_time(
uuid::Uuid::new_v4(),
":future/marker".to_string(),
Value::Integer(99),
future_tx_id,
1,
future_tx_id as i64,
VALID_TIME_FOREVER,
);
db.inner.fact_storage.load_fact(future_fact).unwrap();
let as_of_timestamp = chrono::DateTime::<Utc>::from_timestamp_millis(tx_id)
.unwrap()
.to_rfc3339_opts(SecondsFormat::Millis, true);
let as_of_query = format!(
r#"(query [:find ?age :as-of "{}" :where [:alice :person/age ?age]])"#,
as_of_timestamp
);
let as_of_result = tx.execute(&as_of_query).unwrap();
match as_of_result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1);
assert_eq!(results[0][0], Value::Integer(30));
}
_ => panic!("expected QueryResults"),
}
let future_query = tx
.execute(r#"(query [:find ?v :where [?e :future/marker ?v]])"#)
.unwrap();
match future_query {
QueryResult::QueryResults { results, .. } => {
assert_eq!(
results.len(),
0,
"future committed facts must not become visible from pending-only floor"
);
}
_ => panic!("expected QueryResults"),
}
}
// With a transaction open on this thread, an autocommit write is rejected instead of blocking.
#[test]
fn test_same_thread_reentrant_write_returns_error() {
let db = Minigraf::in_memory().unwrap();
let _tx = db.begin_write().unwrap();
let err = db
.execute(r#"(transact [[:bob :person/name "Bob"]])"#)
.unwrap_err();
assert!(
err.to_string()
.contains("WriteTransaction is already in progress"),
"expected reentrant-write error, got: {}",
err
);
}
// After commit, the same thread can begin another transaction.
#[test]
fn test_thread_local_flag_cleared_after_commit() {
let db = Minigraf::in_memory().unwrap();
{
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
tx.commit().unwrap();
}
let result = db.begin_write();
assert!(
result.is_ok(),
"begin_write must succeed after commit clears the flag"
);
result.unwrap().rollback();
}
// After rollback, the same thread can begin another transaction.
#[test]
fn test_thread_local_flag_cleared_after_rollback() {
let db = Minigraf::in_memory().unwrap();
{
let tx = db.begin_write().unwrap();
tx.rollback();
}
let result = db.begin_write();
assert!(
result.is_ok(),
"begin_write must succeed after rollback clears the flag"
);
result.unwrap().rollback();
}
// After an uncommitted drop, the same thread can begin another transaction.
#[test]
fn test_thread_local_flag_cleared_after_drop() {
let db = Minigraf::in_memory().unwrap();
{
let mut tx = db.begin_write().unwrap();
tx.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
}
let result = db.begin_write();
assert!(
result.is_ok(),
"begin_write must succeed after drop clears the flag"
);
result.unwrap().rollback();
}
// Checkpointing an in-memory database leaves its facts intact.
#[test]
fn test_in_memory_checkpoint_is_noop() {
let db = Minigraf::in_memory().unwrap();
db.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
db.checkpoint().unwrap();
let facts = db.inner.fact_storage.get_asserted_facts().unwrap();
assert_eq!(facts.len(), 1);
}
// Options passed to `open_with_options` are stored on the handle.
#[test]
fn test_open_with_options_custom_threshold() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.graph");
let opts = OpenOptions {
wal_checkpoint_threshold: 5,
page_cache_size: 256,
max_derived_facts: 100_000,
max_results: 1_000_000,
};
let db = Minigraf::open_with_options(&path, opts).unwrap();
assert_eq!(db.inner.options.wal_checkpoint_threshold, 5);
}
// A directory placed at the WAL path makes the commit's WAL write fail. The failed commit
// must leave only the previously committed fact visible. Unix-only.
#[test]
#[cfg(unix)] fn test_failed_commit_leaves_database_unchanged() {
fn count_results(result: QueryResult) -> usize {
match result {
QueryResult::QueryResults { results, .. } => results.len(),
_ => 0,
}
}
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("test.graph");
let wal_path = {
let mut p = db_path.as_os_str().to_owned();
p.push(".wal");
std::path::PathBuf::from(p)
};
let db = Minigraf::open(&db_path).unwrap();
db.execute("(transact [[:alice :name \"Alice\"]])").unwrap();
db.checkpoint().unwrap();
assert!(!wal_path.exists(), "WAL must be gone after checkpoint");
std::fs::create_dir(&wal_path).unwrap();
let mut tx = db.begin_write().unwrap();
tx.execute("(transact [[:bob :name \"Bob\"]])").unwrap();
let result = tx.commit();
std::fs::remove_dir(&wal_path).unwrap();
assert!(
result.is_err(),
"commit should fail when WAL path is a directory"
);
let n = count_results(
db.execute("(query [:find ?name :where [?e :name ?name]])")
.unwrap(),
);
assert_eq!(
n, 1,
"only Alice should be visible; Bob's failed commit must be rolled back"
);
}
// A write creates the WAL, checkpoint deletes it, and the data survives reopening.
#[test]
fn test_file_backed_checkpoint() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.graph");
let wal_path = dir.path().join("test.graph.wal");
let db = Minigraf::open(&path).unwrap();
db.execute(r#"(transact [[:alice :person/name "Alice"]])"#)
.unwrap();
assert!(wal_path.exists(), "WAL must exist after a write");
db.checkpoint().unwrap();
assert!(!wal_path.exists(), "WAL must be deleted after checkpoint");
let db2 = Minigraf::open(&path).unwrap();
let facts = db2.inner.fact_storage.get_asserted_facts().unwrap();
assert_eq!(facts.len(), 1, "facts must survive checkpoint");
}
// A non-keyword attribute is rejected when materialising a transact.
#[test]
fn test_materialize_transaction_non_keyword_real_attr_error() {
use crate::query::datalog::types::EdnValue;
use crate::query::datalog::types::{Pattern, Transaction};
let tx = Transaction {
facts: vec![Pattern::new(
EdnValue::Keyword(":alice".to_string()),
EdnValue::Integer(42), EdnValue::Integer(0),
)],
valid_from: None,
valid_to: None,
};
let r = Minigraf::materialize_transaction(&tx);
assert!(
r.is_err(),
"materialize_transaction with non-keyword Real attr must fail"
);
}
// A non-keyword attribute is rejected when materialising a retract.
#[test]
fn test_materialize_retraction_non_keyword_real_attr_error() {
use crate::query::datalog::types::EdnValue;
use crate::query::datalog::types::{Pattern, Transaction};
let tx = Transaction {
facts: vec![Pattern::new(
EdnValue::Keyword(":alice".to_string()),
EdnValue::String("not-a-keyword".to_string()), EdnValue::Integer(0),
)],
valid_from: None,
valid_to: None,
};
let r = Minigraf::materialize_retraction(&tx);
assert!(
r.is_err(),
"materialize_retraction with non-keyword Real attr must fail"
);
}
// A pseudo-attribute cannot be transacted.
#[test]
fn test_materialize_transaction_pseudo_attr_error() {
use crate::query::datalog::types::EdnValue;
use crate::query::datalog::types::{Pattern, PseudoAttr, Transaction};
let tx = Transaction {
facts: vec![Pattern::pseudo(
EdnValue::Keyword(":alice".to_string()),
PseudoAttr::ValidFrom,
EdnValue::Integer(0),
)],
valid_from: None,
valid_to: None,
};
let r = Minigraf::materialize_transaction(&tx);
assert!(
r.is_err(),
"materialize_transaction with pseudo-attr must fail"
);
}
// A pseudo-attribute cannot be retracted.
#[test]
fn test_materialize_retraction_pseudo_attr_error() {
use crate::query::datalog::types::EdnValue;
use crate::query::datalog::types::{Pattern, PseudoAttr, Transaction};
let tx = Transaction {
facts: vec![Pattern::pseudo(
EdnValue::Keyword(":alice".to_string()),
PseudoAttr::TxCount,
EdnValue::Integer(0),
)],
valid_from: None,
valid_to: None,
};
let r = Minigraf::materialize_retraction(&tx);
assert!(
r.is_err(),
"materialize_retraction with pseudo-attr must fail"
);
}
// Despite its name, this does not induce a lock failure. It checks that the thread-local flag
// is set during each of several back-to-back transactions and cleared after each one.
#[test]
fn test_begin_write_flag_not_leaked_on_lock_failure() {
let db = Minigraf::in_memory().unwrap();
{
let _tx = db.begin_write().unwrap();
assert!(
is_write_tx_active(),
"flag should be set during active transaction"
);
}
assert!(
!is_write_tx_active(),
"flag should be cleared after transaction ends"
);
{
let _tx = db.begin_write().unwrap();
}
assert!(
!is_write_tx_active(),
"flag should be cleared after second transaction"
);
{
let _tx = db.begin_write().unwrap();
}
assert!(
!is_write_tx_active(),
"flag should be cleared after third transaction"
);
}
// A low `max_derived_facts` makes a recursive rule query fail; a high one lets it succeed.
#[test]
fn test_max_derived_facts_limit_enforced() {
let low_opts = OpenOptions::default()
.max_derived_facts(5)
.max_results(1_000_000);
let db_low = Minigraf::in_memory_with_options(low_opts).unwrap();
db_low.execute("(transact [[:a :edge :b] [:b :edge :c] [:c :edge :d] [:d :edge :e] [:e :edge :f]])").unwrap();
db_low
.execute(r#"(rule [(reachable ?x ?y) [?x :edge ?y]])"#)
.unwrap();
db_low
.execute(r#"(rule [(reachable ?x ?y) [?x :edge ?z] (reachable ?z ?y)])"#)
.unwrap();
let result = db_low.execute("(query [:find ?to :where (reachable :a ?to)])");
assert!(
result.is_err(),
"Query should fail with max_derived_facts limit"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("derived") || err_msg.contains("limit"),
"Error should mention derived facts limit, got: {}",
err_msg
);
let high_opts = OpenOptions::default()
.max_derived_facts(100_000)
.max_results(1_000_000);
let db_high = Minigraf::in_memory_with_options(high_opts).unwrap();
db_high.execute("(transact [[:a :edge :b] [:b :edge :c] [:c :edge :d] [:d :edge :e] [:e :edge :f]])").unwrap();
db_high
.execute(r#"(rule [(reachable ?x ?y) [?x :edge ?y]])"#)
.unwrap();
db_high
.execute(r#"(rule [(reachable ?x ?y) [?x :edge ?z] (reachable ?z ?y)])"#)
.unwrap();
let result = db_high.execute("(query [:find ?to :where (reachable :a ?to)])");
assert!(result.is_ok(), "Query should succeed with higher limit");
}
}
#[cfg(all(target_os = "wasi", test))]
mod wasi_tests {
    use crate::db::Minigraf;
    use crate::query::datalog::executor::QueryResult;

    /// Smoke test: under WASI an in-memory database can transact a fact and query it back.
    #[test]
    fn in_memory_smoke() {
        let db = Minigraf::in_memory().expect("open in-memory db");
        db.execute("(transact [[:e1 :name \"hello\"]])")
            .expect("transact");
        let outcome = db
            .execute("(query [:find ?e :where [?e :name _]])")
            .expect("query");
        let rows = match outcome {
            QueryResult::QueryResults { results, .. } => results,
            _ => panic!("expected QueryResults"),
        };
        assert!(!rows.is_empty());
    }
}