use super::evaluator::{StratifiedEvaluator, evaluate_not_join};
use super::functions::{AggImpl, FunctionRegistry, apply_builtin_aggregate, value_cmp};
use super::matcher::{PatternMatcher, edn_to_entity_id, edn_to_value};
use super::optimizer;
use super::rules::RuleRegistry;
use super::types::{
AsOf, AttributeSpec, BinOp, DatalogCommand, DatalogQuery, EdnValue, Expr, FindSpec, Order,
Pattern, Rule, Transaction, UnaryOp, ValidAt, WhereClause, WindowFunc,
};
use crate::graph::FactStorage;
use crate::graph::types::{Fact, TransactOptions, TxId, Value, tx_id_now};
use anyhow::{Result, anyhow};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
/// Reports whether `query` references a per-fact temporal pseudo-attribute
/// (one whose `is_per_fact()` is true) anywhere in its `:where` clauses,
/// descending into `not`, `not-join`, `or` and `or-join` bodies.
fn query_uses_per_fact_pseudo_attr(query: &DatalogQuery) -> bool {
    fn mentions_per_fact(clauses: &[WhereClause]) -> bool {
        for clause in clauses {
            let found = match clause {
                WhereClause::Pattern(pattern) => match &pattern.attribute {
                    AttributeSpec::Pseudo(pseudo) => pseudo.is_per_fact(),
                    _ => false,
                },
                WhereClause::Not(body) => mentions_per_fact(body),
                WhereClause::NotJoin { clauses: body, .. } => mentions_per_fact(body),
                WhereClause::Or(branches) => {
                    branches.iter().any(|branch| mentions_per_fact(branch))
                }
                WhereClause::OrJoin { branches, .. } => {
                    branches.iter().any(|branch| mentions_per_fact(branch))
                }
                _ => false,
            };
            if found {
                return true;
            }
        }
        false
    }
    mentions_per_fact(&query.where_clauses)
}
/// Outcome of executing a single `DatalogCommand` through `DatalogExecutor::execute`.
#[derive(Debug, Clone, PartialEq)]
pub enum QueryResult {
/// A `transact` command was committed under this transaction id.
Transacted(TxId),
/// A `retract` command was committed under this transaction id.
Retracted(TxId),
/// Rows produced by a query. `vars` holds the display name of each find
/// column; the executor only emits rows that have a value for every column.
QueryResults {
vars: Vec<String>,
results: Vec<Vec<Value>>,
},
/// A command with no payload succeeded (returned after registering a rule).
Ok,
}
/// Executes parsed Datalog commands (transact, retract, query, rule) against a
/// `FactStorage`, or — for reads — against a pre-supplied fact snapshot.
pub struct DatalogExecutor {
/// Store that transact/retract write to; queries read it when `facts_override` is `None`.
storage: FactStorage,
/// When set, queries read these facts instead of `storage`.
facts_override: Option<Arc<[Fact]>>,
/// Lower bound applied to the wall-clock read time computed by `read_now`.
read_now_floor: Option<i64>,
/// Rule registry, possibly shared with other executors; `execute_rule` writes into it.
rules: Arc<RwLock<RuleRegistry>>,
/// Function registry consulted for predicates, aggregates and window functions.
functions: Arc<RwLock<FunctionRegistry>>,
/// Index snapshot handed to `optimizer::plan` for non-rule queries.
indexes: Arc<crate::storage::index::Indexes>,
/// Derived-fact cap passed to `StratifiedEvaluator` on the rule path.
max_derived_facts: usize,
/// Result cap passed to `StratifiedEvaluator`; the non-rule query path does not read it.
max_results: usize,
}
impl DatalogExecutor {
#[allow(dead_code)]
pub fn new(storage: FactStorage) -> Self {
DatalogExecutor {
storage,
facts_override: None,
read_now_floor: None,
rules: Arc::new(RwLock::new(RuleRegistry::new())),
functions: Arc::new(RwLock::new(FunctionRegistry::with_builtins())),
indexes: Arc::new(crate::storage::index::Indexes::new()),
max_derived_facts: crate::query::datalog::evaluator::DEFAULT_MAX_DERIVED_FACTS,
max_results: crate::query::datalog::evaluator::DEFAULT_MAX_RESULTS,
}
}
pub fn new_with_rules_and_functions(
storage: FactStorage,
rules: Arc<RwLock<RuleRegistry>>,
functions: Arc<RwLock<FunctionRegistry>>,
) -> Self {
DatalogExecutor {
storage,
facts_override: None,
read_now_floor: None,
rules,
functions,
indexes: Arc::new(crate::storage::index::Indexes::new()),
max_derived_facts: crate::query::datalog::evaluator::DEFAULT_MAX_DERIVED_FACTS,
max_results: crate::query::datalog::evaluator::DEFAULT_MAX_RESULTS,
}
}
pub(crate) fn new_from_facts_with_rules_and_functions(
facts: Arc<[Fact]>,
pending_read_now_floor: Option<i64>,
rules: Arc<RwLock<RuleRegistry>>,
functions: Arc<RwLock<FunctionRegistry>>,
) -> Self {
DatalogExecutor {
storage: FactStorage::new(),
facts_override: Some(facts),
read_now_floor: pending_read_now_floor,
rules,
functions,
indexes: Arc::new(crate::storage::index::Indexes::new()),
max_derived_facts: crate::query::datalog::evaluator::DEFAULT_MAX_DERIVED_FACTS,
max_results: crate::query::datalog::evaluator::DEFAULT_MAX_RESULTS,
}
}
#[allow(dead_code)]
pub fn new_with_rules(storage: FactStorage, rules: Arc<RwLock<RuleRegistry>>) -> Self {
Self::new_with_rules_and_functions(
storage,
rules,
Arc::new(RwLock::new(FunctionRegistry::with_builtins())),
)
}
#[allow(dead_code)]
pub fn new_with_limits(
storage: FactStorage,
rules: Arc<RwLock<RuleRegistry>>,
functions: Arc<RwLock<FunctionRegistry>>,
max_derived_facts: usize,
max_results: usize,
) -> Self {
let indexes = storage.pending_indexes_snapshot();
DatalogExecutor {
storage,
facts_override: None,
read_now_floor: None,
rules,
functions,
indexes: Arc::new(indexes),
max_derived_facts,
max_results,
}
}
pub fn set_limits(&mut self, max_derived_facts: usize, max_results: usize) {
self.max_derived_facts = max_derived_facts;
self.max_results = max_results;
}
fn read_now(&self) -> i64 {
let now = tx_id_now() as i64;
self.read_now_floor.map_or(now, |floor| now.max(floor))
}
pub fn execute(&self, command: DatalogCommand) -> Result<QueryResult> {
match command {
DatalogCommand::Transact(tx) => self.execute_transact(tx),
DatalogCommand::Retract(tx) => self.execute_retract(tx),
DatalogCommand::Query(query) => self.execute_query(query),
DatalogCommand::Rule(rule) => self.execute_rule(rule),
}
}
fn execute_transact(&self, tx: Transaction) -> Result<QueryResult> {
let tx_opts = if tx.valid_from.is_some() || tx.valid_to.is_some() {
Some(TransactOptions::new(tx.valid_from, tx.valid_to))
} else {
None
};
let mut fact_tuples = Vec::new();
for pattern in tx.facts {
let entity_id =
edn_to_entity_id(&pattern.entity).map_err(|e| anyhow!("Invalid entity: {}", e))?;
let attribute = match &pattern.attribute {
AttributeSpec::Real(EdnValue::Keyword(k)) => k.clone(),
AttributeSpec::Real(_) => return Err(anyhow!("Attribute must be a keyword")),
AttributeSpec::Pseudo(_) => {
return Err(anyhow!("Cannot transact a pseudo-attribute"));
}
};
let value =
edn_to_value(&pattern.value).map_err(|e| anyhow!("Invalid value: {}", e))?;
let per_fact_opts = if pattern.valid_from.is_some() || pattern.valid_to.is_some() {
Some(TransactOptions::new(pattern.valid_from, pattern.valid_to))
} else {
None
};
fact_tuples.push((entity_id, attribute, value, per_fact_opts));
}
let tx_id = self
.storage
.transact_batch(fact_tuples, tx_opts)
.map_err(|e| anyhow!("Transaction failed: {}", e))?;
Ok(QueryResult::Transacted(tx_id))
}
fn execute_retract(&self, tx: Transaction) -> Result<QueryResult> {
let mut fact_tuples = Vec::new();
for pattern in tx.facts {
let entity_id =
edn_to_entity_id(&pattern.entity).map_err(|e| anyhow!("Invalid entity: {}", e))?;
let attribute = match &pattern.attribute {
AttributeSpec::Real(EdnValue::Keyword(k)) => k.clone(),
AttributeSpec::Real(_) => return Err(anyhow!("Attribute must be a keyword")),
AttributeSpec::Pseudo(_) => {
return Err(anyhow!("Cannot transact a pseudo-attribute"));
}
};
let value =
edn_to_value(&pattern.value).map_err(|e| anyhow!("Invalid value: {}", e))?;
fact_tuples.push((entity_id, attribute, value));
}
let tx_id = self
.storage
.retract(fact_tuples)
.map_err(|e| anyhow!("Retraction failed: {}", e))?;
Ok(QueryResult::Retracted(tx_id))
}
fn filter_facts_for_query(&self, query: &DatalogQuery) -> Result<Arc<[Fact]>> {
let now = self.read_now();
let source_facts: Vec<Fact> = match (&self.facts_override, query.as_of.as_ref()) {
(Some(facts), Some(as_of)) => {
crate::graph::storage::filter_facts_as_of(facts.iter().cloned().collect(), as_of)
}
(Some(facts), None) => facts.iter().cloned().collect(),
(None, Some(as_of)) => self.storage.get_facts_as_of(as_of)?,
(None, None) => self.storage.get_all_facts()?,
};
let tx_filtered = source_facts;
let asserted = crate::graph::storage::net_asserted_facts(tx_filtered);
let valid_filtered: Vec<Fact> = match &query.valid_at {
Some(ValidAt::Timestamp(t)) => asserted
.into_iter()
.filter(|f| f.valid_from <= *t && *t < f.valid_to)
.collect(),
Some(ValidAt::AnyValidTime) => asserted,
Some(ValidAt::Slot(_)) => {
panic!("internal: unsubstituted :valid-at bind slot reached the executor");
}
None => asserted
.into_iter()
.filter(|f| f.valid_from <= now && now < f.valid_to)
.collect(),
};
Ok(Arc::from(valid_filtered))
}
fn execute_query(&self, query: DatalogQuery) -> Result<QueryResult> {
if query.uses_rules() {
return self.execute_query_with_rules(query);
}
if !query.has_binding_mechanism() {
return Err(anyhow!(
"query has no :where clause, rules, or aggregates — nothing binds the variables. \
Add a :where clause (e.g., [:find ?e ?a ?v :where [?e ?a ?v]]) or use an aggregate."
));
}
let now = self.read_now();
let valid_at_value = match &query.valid_at {
Some(ValidAt::Timestamp(t)) => Value::Integer(*t),
Some(ValidAt::AnyValidTime) => Value::Null,
Some(ValidAt::Slot(_)) => {
panic!("internal: unsubstituted :valid-at bind slot reached the executor");
}
None => Value::Integer(now),
};
if query_uses_per_fact_pseudo_attr(&query)
&& !matches!(query.valid_at, Some(ValidAt::AnyValidTime))
{
return Err(anyhow!(
"temporal pseudo-attributes :db/valid-from, :db/valid-to, :db/tx-count, and \
:db/tx-id require :any-valid-time; add :any-valid-time to your query"
));
}
let filtered_facts = self.filter_facts_for_query(&query)?;
let matcher = PatternMatcher::from_slice_with_valid_at(
filtered_facts.clone(),
valid_at_value.clone(),
);
let patterns = query.get_patterns();
let planned_patterns = optimizer::plan(patterns, &self.indexes);
let bindings = matcher.match_patterns_with_hints(&planned_patterns);
let registry = self.functions.read().unwrap();
let rules_guard = self.rules.read().unwrap();
let bindings = apply_or_clauses(
&query.where_clauses,
bindings,
filtered_facts.clone(),
&rules_guard,
query.as_of.clone(),
query.valid_at.clone(),
®istry,
)?;
drop(rules_guard);
let not_clauses: Vec<&Vec<WhereClause>> = query
.where_clauses
.iter()
.filter_map(|c| match c {
WhereClause::Not(inner) => Some(inner),
_ => None,
})
.collect();
let not_join_clauses: Vec<(Vec<String>, Vec<WhereClause>)> = query
.where_clauses
.iter()
.filter_map(|c| match c {
WhereClause::NotJoin { join_vars, clauses } => {
Some((join_vars.clone(), clauses.clone()))
}
_ => None,
})
.collect();
let not_filtered: Vec<_> = if not_clauses.is_empty() && not_join_clauses.is_empty() {
bindings
} else {
bindings
.into_iter()
.filter(|binding| {
for not_body in ¬_clauses {
if not_body_matches(
not_body,
binding,
filtered_facts.clone(),
valid_at_value.clone(),
®istry,
) {
return false;
}
}
for (join_vars, nj_clauses) in ¬_join_clauses {
if evaluate_not_join(
join_vars,
nj_clauses,
binding,
filtered_facts.clone(),
&self.functions.read().unwrap(),
) {
return false;
}
}
true
})
.collect()
};
let filtered_bindings = apply_expr_clauses(not_filtered, &query.where_clauses, ®istry)?;
let results =
apply_post_processing(filtered_bindings, &query.find, &query.with_vars, ®istry)?;
Ok(QueryResult::QueryResults {
vars: query.find.iter().map(|s| s.display_name()).collect(),
results,
})
}
fn execute_query_with_rules(&self, query: DatalogQuery) -> Result<QueryResult> {
let all_rule_invocations = query.get_rule_invocations();
let predicates: Vec<String> = all_rule_invocations
.iter()
.map(|(pred, _)| pred.clone())
.collect();
let now = self.read_now();
let valid_at_value = match &query.valid_at {
Some(ValidAt::Timestamp(t)) => Value::Integer(*t),
Some(ValidAt::AnyValidTime) => Value::Null,
Some(ValidAt::Slot(_)) => {
panic!("internal: unsubstituted :valid-at bind slot reached the executor");
}
None => Value::Integer(now),
};
if query_uses_per_fact_pseudo_attr(&query)
&& !matches!(query.valid_at, Some(ValidAt::AnyValidTime))
{
return Err(anyhow!(
"temporal pseudo-attributes :db/valid-from, :db/valid-to, :db/tx-count, and \
:db/tx-id require :any-valid-time; add :any-valid-time to your query"
));
}
let filtered_facts = self.filter_facts_for_query(&query)?;
let filtered_storage = FactStorage::new();
for fact in filtered_facts.iter().cloned() {
filtered_storage.load_fact(fact)?;
}
let evaluator = StratifiedEvaluator::new(
filtered_storage,
self.rules.clone(),
self.functions.clone(),
1000, self.max_derived_facts,
self.max_results,
);
let derived_storage = evaluator.evaluate(&predicates)?;
let mut all_patterns = query.get_patterns();
for (predicate, args) in query.get_top_level_rule_invocations() {
let pattern = match args.len() {
1 => {
Pattern::new(
args[0].clone(),
EdnValue::Keyword(format!(":{}", predicate)),
EdnValue::Symbol("?_rule_value".to_string()),
)
}
2 => {
Pattern::new(
args[0].clone(),
EdnValue::Keyword(format!(":{}", predicate)),
args[1].clone(),
)
}
n => {
return Err(anyhow!(
"Rule invocation '{}' must have 1 or 2 arguments, got {}",
predicate,
n
));
}
};
all_patterns.push(pattern);
}
let derived_facts: Arc<[Fact]> =
Arc::from(derived_storage.get_asserted_facts().unwrap_or_default());
let matcher =
PatternMatcher::from_slice_with_valid_at(derived_facts.clone(), valid_at_value.clone());
let bindings = matcher.match_patterns(&all_patterns);
let registry = self.functions.read().unwrap();
let rules_guard = self.rules.read().unwrap();
let bindings = apply_or_clauses(
&query.where_clauses,
bindings,
derived_facts.clone(),
&rules_guard,
query.as_of.clone(),
query.valid_at.clone(),
®istry,
)?;
drop(rules_guard);
let not_clauses: Vec<&Vec<WhereClause>> = query
.where_clauses
.iter()
.filter_map(|c| match c {
WhereClause::Not(inner) => Some(inner),
_ => None,
})
.collect();
let not_join_clauses: Vec<(Vec<String>, Vec<WhereClause>)> = query
.where_clauses
.iter()
.filter_map(|c| match c {
WhereClause::NotJoin { join_vars, clauses } => {
Some((join_vars.clone(), clauses.clone()))
}
_ => None,
})
.collect();
let not_filtered: Vec<_> = if not_clauses.is_empty() && not_join_clauses.is_empty() {
bindings
} else {
bindings
.into_iter()
.filter(|binding| {
for not_body in ¬_clauses {
let substituted: Vec<Pattern> = not_body
.iter()
.filter_map(|c| match c {
WhereClause::Pattern(p) => {
Some(crate::query::datalog::evaluator::substitute_pattern(
p, binding,
))
}
WhereClause::RuleInvocation { predicate, args } => {
let resolved_args: Vec<EdnValue> = args
.iter()
.map(|a| match a {
EdnValue::Symbol(s) if s.starts_with('?') => {
binding
.get(s)
.map(|v| match v {
Value::Keyword(k) => {
EdnValue::Keyword(k.clone())
}
Value::String(s) => {
EdnValue::String(s.clone())
}
Value::Integer(i) => EdnValue::Integer(*i),
Value::Float(f) => EdnValue::Float(*f),
Value::Boolean(b) => EdnValue::Boolean(*b),
Value::Ref(u) => EdnValue::Uuid(*u),
Value::Null => EdnValue::Nil,
})
.unwrap_or_else(|| a.clone())
}
other => other.clone(),
})
.collect();
let pattern = match resolved_args.len() {
1 => Pattern::new(
resolved_args[0].clone(),
EdnValue::Keyword(format!(":{}", predicate)),
EdnValue::Symbol("?_rule_value".to_string()),
),
2 => Pattern::new(
resolved_args[0].clone(),
EdnValue::Keyword(format!(":{}", predicate)),
resolved_args[1].clone(),
),
_ => return None,
};
Some(crate::query::datalog::evaluator::substitute_pattern(
&pattern, binding,
))
}
_ => None,
})
.collect();
let m = PatternMatcher::from_slice_with_valid_at(
derived_facts.clone(),
valid_at_value.clone(),
);
let mut not_bindings: Vec<Binding> = if substituted.is_empty() {
vec![binding.clone()]
} else {
m.match_patterns(&substituted)
.into_iter()
.map(|mut nb| {
for (k, v) in binding {
nb.entry(k.clone()).or_insert_with(|| v.clone());
}
nb
})
.collect()
};
not_bindings = apply_expr_clauses(not_bindings, not_body, ®istry)
.unwrap_or_default();
if !not_bindings.is_empty() {
return false; }
}
for (join_vars, nj_clauses) in ¬_join_clauses {
if evaluate_not_join(
join_vars,
nj_clauses,
binding,
derived_facts.clone(),
&self.functions.read().unwrap(),
) {
return false;
}
}
true
})
.collect()
};
let filtered_bindings = apply_expr_clauses(not_filtered, &query.where_clauses, ®istry)?;
let results =
apply_post_processing(filtered_bindings, &query.find, &query.with_vars, ®istry)?;
Ok(QueryResult::QueryResults {
vars: query.find.iter().map(|s| s.display_name()).collect(),
results,
})
}
fn execute_rule(&self, rule: Rule) -> Result<QueryResult> {
let predicate = self.extract_predicate(&rule)?;
self.rules.write().unwrap().register_rule(predicate, rule)?;
Ok(QueryResult::Ok)
}
fn extract_predicate(&self, rule: &Rule) -> Result<String> {
if rule.head.is_empty() {
return Err(anyhow!("Rule head cannot be empty"));
}
match &rule.head[0] {
EdnValue::Symbol(s) => Ok(s.clone()),
_ => Err(anyhow!(
"Rule head must start with a symbol (predicate name)"
)),
}
}
#[allow(dead_code)]
pub fn storage(&self) -> &FactStorage {
&self.storage
}
#[cfg(test)]
pub fn rules(&self) -> Arc<RwLock<RuleRegistry>> {
self.rules.clone()
}
}
/// Returns `true` when the `not` body matches for the `outer` binding, i.e.
/// the binding should be excluded.
///
/// Only `Pattern` clauses are matched (after substituting `outer`); matched
/// rows inherit any outer variables they did not bind. Expression clauses in
/// the body then filter the candidates. An empty pattern list means the body
/// is just its expressions evaluated against `outer`. Errors from the
/// expression clauses are treated as "no match".
fn not_body_matches(
    not_body: &[WhereClause],
    outer: &Binding,
    storage: Arc<[Fact]>,
    valid_at: Value,
    registry: &FunctionRegistry,
) -> bool {
    use crate::query::datalog::evaluator::substitute_pattern;
    let grounded: Vec<_> = not_body
        .iter()
        .filter_map(|clause| {
            if let WhereClause::Pattern(pattern) = clause {
                Some(substitute_pattern(pattern, outer))
            } else {
                None
            }
        })
        .collect();
    let candidates: Vec<Binding> = if grounded.is_empty() {
        vec![outer.clone()]
    } else {
        let matcher = crate::query::datalog::matcher::PatternMatcher::from_slice_with_valid_at(
            storage, valid_at,
        );
        let mut found = matcher.match_patterns(&grounded);
        for row in found.iter_mut() {
            for (name, value) in outer {
                row.entry(name.clone()).or_insert_with(|| value.clone());
            }
        }
        found
    };
    let survivors = apply_expr_clauses(candidates, not_body, registry).unwrap_or_default();
    !survivors.is_empty()
}
/// Projects each binding onto the variables named by `find_specs`, in order.
///
/// A binding that lacks any of those variables contributes no row.
fn extract_variables(
    bindings: Vec<std::collections::HashMap<String, Value>>,
    find_specs: &[FindSpec],
) -> Vec<Vec<Value>> {
    bindings
        .into_iter()
        .filter_map(|binding| {
            find_specs
                .iter()
                .map(|spec| binding.get(spec.var()).cloned())
                .collect::<Option<Vec<Value>>>()
        })
        .collect()
}
/// One solution row: variable name (as written in the query, e.g. `?name`)
/// to bound value. Post-processing also stores synthetic `__agg_{i}` and
/// `__win_{i}` columns here, keyed by find-spec position.
type Binding = std::collections::HashMap<String, Value>;
/// Turns matched bindings into result rows.
///
/// Plain variable finds are projected directly. With aggregates, bindings are
/// first grouped and aggregated; with window functions, window columns are
/// then computed over the (possibly aggregated) rows before projection.
fn apply_post_processing(
    bindings: Vec<Binding>,
    find_specs: &[FindSpec],
    with_vars: &[String],
    registry: &FunctionRegistry,
) -> Result<Vec<Vec<Value>>> {
    let (has_aggregates, has_windows) =
        find_specs
            .iter()
            .fold((false, false), |(aggregate_seen, window_seen), spec| {
                (
                    aggregate_seen || matches!(spec, FindSpec::Aggregate { .. }),
                    window_seen || matches!(spec, FindSpec::Window(_)),
                )
            });
    if !(has_aggregates || has_windows) {
        return Ok(extract_variables(bindings, find_specs));
    }
    let mut rows = bindings;
    if has_aggregates {
        rows = compute_aggregation(rows, find_specs, with_vars, registry)?;
    }
    if has_windows {
        apply_window_functions(&mut rows, find_specs, registry)?;
    }
    Ok(project_find_specs(&rows, find_specs))
}
/// Groups `bindings` by the find variables and evaluates each aggregate find
/// spec per group.
///
/// Output bindings carry the find-variable values plus one `__agg_{i}` entry
/// per aggregate spec at position `i`. Groups are emitted in `BTreeMap` key order.
///
/// Special cases:
/// - No input bindings: if there are no grouping variables and every spec is a
///   `count`/`count-distinct` aggregate, a single row of zeros is returned;
///   otherwise no rows.
/// - Nulls (and unbound aggregate variables) are excluded before aggregating.
/// - An aggregate error whose message contains "no non-null values in group"
///   drops that group; any other error is returned.
///
/// # Errors
/// Unknown aggregate names and non-skippable aggregate failures.
fn compute_aggregation(
bindings: Vec<Binding>,
find_specs: &[FindSpec],
with_vars: &[String],
registry: &FunctionRegistry,
) -> Result<Vec<Binding>> {
let has_grouping_vars = find_specs
.iter()
.any(|s| matches!(s, FindSpec::Variable(_)));
if bindings.is_empty() {
// Ungrouped counts over nothing are 0, not "no row".
let all_count = !has_grouping_vars
&& find_specs.iter().all(|s| {
matches!(s, FindSpec::Aggregate { func, .. }
if func == "count" || func == "count-distinct")
});
if all_count {
let mut b = Binding::new();
for (i, _) in find_specs.iter().enumerate() {
b.insert(format!("__agg_{}", i), Value::Integer(0));
}
return Ok(vec![b]);
}
return Ok(vec![]);
}
let has_windows = find_specs.iter().any(|s| matches!(s, FindSpec::Window(_)));
let mut group_var_names: Vec<&str> = find_specs
.iter()
.filter_map(|s| match s {
FindSpec::Variable(v) => Some(v.as_str()),
_ => None,
})
.collect();
// `:with` variables join the grouping key only when there are no window specs.
// NOTE(review): this makes each distinct `:with` combination its own output row;
// confirm that is the intended `:with` semantics.
if !has_windows {
group_var_names.extend(with_vars.iter().map(|s| s.as_str()));
}
use std::collections::BTreeMap;
let mut groups: BTreeMap<Vec<Value>, Vec<Binding>> = BTreeMap::new();
for b in bindings {
// Unbound grouping variables group under Null.
let key: Vec<Value> = group_var_names
.iter()
.map(|v| b.get(*v).cloned().unwrap_or(Value::Null))
.collect();
groups.entry(key).or_default().push(b);
}
// Position of each find variable within the group key; `:with` variables
// sit after these and are not copied into the output binding.
let mut group_key_idx: std::collections::HashMap<&str, usize> =
std::collections::HashMap::new();
{
let mut var_pos = 0usize;
for spec in find_specs {
if let FindSpec::Variable(v) = spec {
group_key_idx.insert(v.as_str(), var_pos);
var_pos += 1;
}
}
}
let mut results: Vec<Binding> = Vec::new();
for (key, group_bindings) in groups.iter() {
let mut binding = Binding::new();
let mut skip = false;
for (v, &idx) in &group_key_idx {
binding.insert((*v).to_string(), key[idx].clone());
}
for (i, spec) in find_specs.iter().enumerate() {
if let FindSpec::Aggregate { func, var } = spec {
let non_null: Vec<&Value> = group_bindings
.iter()
.filter_map(|b| b.get(var.as_str()))
.filter(|v| !matches!(v, Value::Null))
.collect();
let agg_val: anyhow::Result<Value> = match registry.get(func.as_str()) {
Some(desc) if desc.is_builtin => {
apply_builtin_aggregate(func, &non_null)
}
Some(desc) => {
// User-defined aggregate: fold with init/step/finalise; empty group yields Null.
if let AggImpl::Udf(ops) = &desc.impl_ {
if non_null.is_empty() {
Ok(Value::Null)
} else {
let mut acc = (ops.init)();
for v in &non_null {
(ops.step)(&mut acc, v);
}
Ok((ops.finalise)(&acc, non_null.len()))
}
} else {
apply_builtin_aggregate(func, &non_null)
}
}
None => Err(anyhow::anyhow!("unknown aggregate function: '{}'", func)),
};
match agg_val {
Ok(v) => {
binding.insert(format!("__agg_{}", i), v);
}
Err(e) => {
// Matched by message text; this depends on the exact wording of
// the aggregate implementation's error.
let msg = e.to_string();
if msg.contains("no non-null values in group") {
skip = true;
break;
}
return Err(e);
}
}
}
}
if !skip {
results.push(binding);
}
}
Ok(results)
}
/// Computes every window find spec in place, storing the value for spec `i`
/// under `__win_{i}` in each binding.
///
/// Rows are partitioned by the `partition_by` variable (unbound or absent
/// partitioning puts rows under `None`). Within a partition, rows are ordered
/// by `order_by` (unbound → Null) using `value_cmp`, reversed for `Desc`. The
/// sort is stable, so ties keep their input order.
///
/// - `RowNumber`: 1, 2, 3, ... in sorted order.
/// - `Rank`: competition ranking — tied order keys share a rank and the next
///   distinct key takes its row number (1, 1, 3).
/// - Anything else is looked up in `registry` and evaluated as a running
///   (cumulative) aggregate, finalising after each row. Unlike
///   `compute_aggregation`, Nulls are passed to `step` and counted.
///
/// # Errors
/// An unregistered window function. The lookup happens per partition, so it
/// is only reported when there is at least one row.
fn apply_window_functions(
bindings: &mut [Binding],
find_specs: &[FindSpec],
registry: &FunctionRegistry,
) -> Result<()> {
for (i, spec) in find_specs.iter().enumerate() {
let FindSpec::Window(ws) = spec else {
continue;
};
let key = format!("__win_{}", i);
let mut partitions: HashMap<Option<Value>, Vec<usize>> = HashMap::new();
for (row_idx, binding) in bindings.iter().enumerate() {
let part_key = ws
.partition_by
.as_ref()
.and_then(|pv| binding.get(pv))
.cloned();
partitions.entry(part_key).or_default().push(row_idx);
}
for row_indices in partitions.values_mut() {
let mut keyed: Vec<(Value, usize)> = row_indices
.iter()
.map(|&i| {
let k = bindings[i]
.get(&ws.order_by)
.cloned()
.unwrap_or(Value::Null);
(k, i)
})
.collect();
keyed.sort_by(|(a, _), (b, _)| {
let cmp = value_cmp(a, b);
match ws.order {
Order::Asc => cmp,
Order::Desc => cmp.reverse(),
}
});
// Rewrite the partition's row indices into sorted order so the final
// zip below pairs each computed value with the right row.
for (dest, (_, src)) in row_indices.iter_mut().zip(keyed.iter()) {
*dest = *src;
}
let window_values: Vec<Value> = match ws.func {
WindowFunc::RowNumber => (1..=keyed.len())
.map(|pos| Value::Integer(pos as i64))
.collect(),
WindowFunc::Rank => {
let mut values = Vec::with_capacity(keyed.len());
let mut rank = 1i64;
let mut prev: Option<&Value> = None;
for (row_num, (key, _)) in (1i64..).zip(keyed.iter()) {
if prev != Some(key) {
rank = row_num;
prev = Some(key);
}
values.push(Value::Integer(rank));
}
values
}
_ => {
let func_name = ws.func_name();
let desc = registry.get(&func_name).ok_or_else(|| {
anyhow::anyhow!(
"unknown window function '{}' — register it with register_aggregate() before querying",
func_name
)
})?;
let mut values = Vec::with_capacity(keyed.len());
match &desc.impl_ {
AggImpl::Builtin(ops) => {
let mut acc = (ops.init)();
for (_, row_idx) in keyed.iter() {
let val = ws
.var
.as_ref()
.and_then(|v| bindings[*row_idx].get(v))
.unwrap_or(&Value::Null);
(ops.step)(&mut acc, val);
values.push((ops.finalise)(&acc));
}
}
AggImpl::Udf(ops) => {
let mut acc = (ops.init)();
let mut row_count = 0usize;
for (_, row_idx) in keyed.iter() {
let val = ws
.var
.as_ref()
.and_then(|v| bindings[*row_idx].get(v))
.unwrap_or(&Value::Null);
(ops.step)(&mut acc, val);
row_count += 1;
values.push((ops.finalise)(&acc, row_count));
}
}
}
values
}
};
for (&row_idx, window_val) in row_indices.iter().zip(window_values) {
bindings[row_idx].insert(key.clone(), window_val);
}
}
}
Ok(())
}
/// Builds output rows after aggregation/window processing.
///
/// Variables are read by name; aggregate and window columns are read from the
/// synthetic `__agg_{i}` / `__win_{i}` keys for their position `i`. A binding
/// missing any column is dropped.
fn project_find_specs(bindings: &[Binding], find_specs: &[FindSpec]) -> Vec<Vec<Value>> {
    fn column(binding: &Binding, position: usize, spec: &FindSpec) -> Option<Value> {
        let synthetic_key = match spec {
            FindSpec::Variable(name) => return binding.get(name).cloned(),
            FindSpec::Aggregate { .. } => format!("__agg_{}", position),
            FindSpec::Window(_) => format!("__win_{}", position),
        };
        binding.get(&synthetic_key).cloned()
    }
    bindings
        .iter()
        .filter_map(|binding| {
            find_specs
                .iter()
                .enumerate()
                .map(|(position, spec)| column(binding, position, spec))
                .collect::<Option<Vec<Value>>>()
        })
        .collect()
}
/// Evaluates one branch of an `or` / `or-join` against the incoming bindings.
///
/// The branch's patterns (rule invocations are converted via
/// `rule_invocation_to_pattern`) are matched seeded with `incoming`. Nested
/// `or`/`or-join`, then `not`/`not-join`, then expression clauses are applied.
/// An empty intermediate result short-circuits to `Ok(vec![])`.
///
/// # Errors
/// A rule invocation that cannot be turned into a pattern, or a failure from a
/// nested clause (e.g. an unknown predicate).
///
/// # Panics
/// Panics if `valid_at` is an unsubstituted bind slot.
pub(crate) fn evaluate_branch(
    branch: &[WhereClause],
    incoming: Vec<Binding>,
    storage: Arc<[Fact]>,
    rules: &crate::query::datalog::rules::RuleRegistry,
    as_of: Option<AsOf>,
    valid_at: Option<ValidAt>,
    registry: &FunctionRegistry,
) -> anyhow::Result<Vec<Binding>> {
    use crate::query::datalog::evaluator::rule_invocation_to_pattern;
    use crate::query::datalog::matcher::PatternMatcher;
    if incoming.is_empty() {
        return Ok(vec![]);
    }
    let branch_valid_at_value = match &valid_at {
        Some(ValidAt::Timestamp(t)) => Value::Integer(*t),
        Some(ValidAt::AnyValidTime) => Value::Null,
        Some(ValidAt::Slot(_)) => {
            panic!("internal: unsubstituted :valid-at bind slot reached the executor");
        }
        None => Value::Integer(tx_id_now() as i64),
    };
    let mut patterns: Vec<Pattern> = Vec::new();
    for clause in branch {
        match clause {
            WhereClause::Pattern(p) => patterns.push(p.clone()),
            WhereClause::RuleInvocation { predicate, args } => {
                // A malformed invocation is an error, not a skipped clause:
                // skipping it would make the branch match more than written.
                let pattern = rule_invocation_to_pattern(predicate, args).map_err(|e| {
                    anyhow::anyhow!("invalid rule invocation '{}' in or-branch: {}", predicate, e)
                })?;
                patterns.push(pattern);
            }
            _ => {}
        }
    }
    let bindings = if patterns.is_empty() {
        incoming
    } else {
        PatternMatcher::from_slice_with_valid_at(
            Arc::clone(&storage),
            branch_valid_at_value.clone(),
        )
        .match_patterns_seeded(&patterns, incoming)
    };
    if bindings.is_empty() {
        return Ok(vec![]);
    }
    let bindings = apply_or_clauses(
        branch,
        bindings,
        Arc::clone(&storage),
        rules,
        as_of,
        valid_at,
        registry,
    )?;
    if bindings.is_empty() {
        return Ok(vec![]);
    }
    let not_clauses: Vec<&Vec<WhereClause>> = branch
        .iter()
        .filter_map(|c| match c {
            WhereClause::Not(inner) => Some(inner),
            _ => None,
        })
        .collect();
    let not_join_clauses: Vec<(&Vec<String>, &Vec<WhereClause>)> = branch
        .iter()
        .filter_map(|c| match c {
            WhereClause::NotJoin { join_vars, clauses } => Some((join_vars, clauses)),
            _ => None,
        })
        .collect();
    let bindings = if not_clauses.is_empty() && not_join_clauses.is_empty() {
        bindings
    } else {
        bindings
            .into_iter()
            .filter(|binding| {
                // `not` bodies first, then `not-join`; stop at the first match.
                let negated = not_clauses.iter().any(|not_body| {
                    not_body_matches(
                        not_body,
                        binding,
                        Arc::clone(&storage),
                        branch_valid_at_value.clone(),
                        registry,
                    )
                }) || not_join_clauses.iter().any(|&(join_vars, nj_clauses)| {
                    evaluate_not_join(join_vars, nj_clauses, binding, Arc::clone(&storage), registry)
                });
                !negated
            })
            .collect()
    };
    apply_expr_clauses(bindings, branch, registry)
}
pub(crate) fn apply_or_clauses(
clauses: &[WhereClause],
mut bindings: Vec<Binding>,
storage: Arc<[Fact]>,
rules: &crate::query::datalog::rules::RuleRegistry,
as_of: Option<AsOf>,
valid_at: Option<ValidAt>,
registry: &FunctionRegistry,
) -> anyhow::Result<Vec<Binding>> {
for clause in clauses {
match clause {
WhereClause::Or(branches) => {
let mut seen: std::collections::HashSet<Vec<(String, crate::graph::types::Value)>> =
std::collections::HashSet::new();
let mut result: Vec<Binding> = Vec::new();
for branch in branches {
let branch_result = evaluate_branch(
branch,
bindings.clone(),
storage.clone(),
rules,
as_of.clone(),
valid_at.clone(),
registry,
)?;
for b in branch_result {
let mut key: Vec<_> =
b.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
key.sort_unstable();
if seen.insert(key) {
result.push(b);
}
}
}
bindings = result;
}
WhereClause::OrJoin {
join_vars,
branches,
} => {
let outer_keys: std::collections::HashSet<String> =
bindings.iter().flat_map(|b| b.keys().cloned()).collect();
let mut seen: std::collections::HashSet<Vec<(String, crate::graph::types::Value)>> =
std::collections::HashSet::new();
let mut result: Vec<Binding> = Vec::new();
for branch in branches {
let branch_result = evaluate_branch(
branch,
bindings.clone(),
storage.clone(),
rules,
as_of.clone(),
valid_at.clone(),
registry,
)?;
for mut b in branch_result {
if !join_vars.iter().all(|v| b.contains_key(v)) {
continue;
}
b.retain(|k, _| outer_keys.contains(k));
let key: Vec<_> = b.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
if seen.insert(key) {
result.push(b);
}
}
}
bindings = result;
}
_ => {} }
}
Ok(bindings)
}
/// Truthiness used by filter expressions: `true`, non-zero integers and
/// non-zero floats are truthy; every other value is falsy.
pub(crate) fn is_truthy(v: &Value) -> bool {
    match *v {
        Value::Boolean(flag) => flag,
        Value::Integer(n) => n != 0,
        Value::Float(x) => x != 0.0,
        _ => false,
    }
}
/// Widens two numeric values (integer or float) to `f64`.
///
/// Returns `Err(())` if either operand is non-numeric.
fn to_float_pair(l: &Value, r: &Value) -> Result<(f64, f64), ()> {
    fn as_f64(value: &Value) -> Option<f64> {
        match value {
            Value::Integer(i) => Some(*i as f64),
            Value::Float(f) => Some(*f),
            _ => None,
        }
    }
    let lhs = as_f64(l).ok_or(())?;
    let rhs = as_f64(r).ok_or(())?;
    Ok((lhs, rhs))
}
/// Evaluates a binary operator on two already-evaluated operands.
///
/// - `=`/`!=` compare any values structurally.
/// - Ordering operators compare two integers exactly; otherwise both operands
///   are widened to `f64` (NaN makes every comparison false).
/// - Integer arithmetic wraps on overflow; float arithmetic is used as soon
///   as either operand is a float.
/// - String operators require string operands; `Matches` uses its pre-compiled regex.
///
/// Returns `Err(())` for unsupported operand types, division by zero, or a
/// NaN float result; callers treat `Err` as "the expression does not hold".
fn eval_binop(op: &BinOp, l: Value, r: Value) -> Result<Value, ()> {
    /// Applies an ordering operator to two comparable operands.
    fn ordered<T: PartialOrd>(op: &BinOp, a: T, b: T) -> bool {
        match op {
            BinOp::Lt => a < b,
            BinOp::Gt => a > b,
            BinOp::Lte => a <= b,
            BinOp::Gte => a >= b,
            _ => unreachable!("ordered() is only called with comparison operators"),
        }
    }

    match op {
        BinOp::Eq => Ok(Value::Boolean(l == r)),
        BinOp::Neq => Ok(Value::Boolean(l != r)),
        BinOp::Lt | BinOp::Gt | BinOp::Lte | BinOp::Gte => match (&l, &r) {
            // Compare integers exactly: routing them through f64 loses
            // precision above 2^53 and can make distinct values compare equal.
            (Value::Integer(a), Value::Integer(b)) => Ok(Value::Boolean(ordered(op, a, b))),
            _ => {
                let (lf, rf) = to_float_pair(&l, &r)?;
                Ok(Value::Boolean(ordered(op, lf, rf)))
            }
        },
        BinOp::Add | BinOp::Sub | BinOp::Mul | BinOp::Div => match (&l, &r) {
            (Value::Integer(a), Value::Integer(b)) => {
                let (a, b) = (*a, *b);
                match op {
                    BinOp::Add => Ok(Value::Integer(a.wrapping_add(b))),
                    BinOp::Sub => Ok(Value::Integer(a.wrapping_sub(b))),
                    BinOp::Mul => Ok(Value::Integer(a.wrapping_mul(b))),
                    BinOp::Div if b == 0 => Err(()),
                    // Plain `/` panics on `i64::MIN / -1`; wrap like the other ops.
                    BinOp::Div => Ok(Value::Integer(a.wrapping_div(b))),
                    _ => unreachable!(),
                }
            }
            _ => {
                let (lf, rf) = to_float_pair(&l, &r)?;
                let result = match op {
                    BinOp::Add => lf + rf,
                    BinOp::Sub => lf - rf,
                    BinOp::Mul => lf * rf,
                    BinOp::Div if rf == 0.0 => return Err(()),
                    BinOp::Div => lf / rf,
                    _ => unreachable!(),
                };
                // Reject NaN uniformly (e.g. inf/inf, NaN operands) for every operator.
                if result.is_nan() {
                    Err(())
                } else {
                    Ok(Value::Float(result))
                }
            }
        },
        BinOp::StartsWith => match (l, r) {
            (Value::String(s), Value::String(prefix)) => {
                Ok(Value::Boolean(s.starts_with(prefix.as_str())))
            }
            _ => Err(()),
        },
        BinOp::EndsWith => match (l, r) {
            (Value::String(s), Value::String(suffix)) => {
                Ok(Value::Boolean(s.ends_with(suffix.as_str())))
            }
            _ => Err(()),
        },
        BinOp::Contains => match (l, r) {
            (Value::String(s), Value::String(needle)) => {
                Ok(Value::Boolean(s.contains(needle.as_str())))
            }
            _ => Err(()),
        },
        BinOp::Matches { regex: re, .. } => match (l, r) {
            (Value::String(s), Value::String(_)) => Ok(Value::Boolean(re.is_match(&s))),
            _ => Err(()),
        },
    }
}
/// Evaluates an expression tree against one binding.
///
/// Type predicates (`string?`, `integer?`, ...) and user-defined predicates
/// yield booleans. `Err(())` means an unbound variable, an unknown predicate
/// (or no registry), or a failing binary operation.
///
/// # Panics
/// Panics on an unsubstituted bind slot.
pub(crate) fn eval_expr(
    expr: &Expr,
    binding: &std::collections::HashMap<String, Value>,
    registry: Option<&FunctionRegistry>,
) -> Result<Value, ()> {
    Ok(match expr {
        Expr::Var(name) => return binding.get(name).cloned().ok_or(()),
        Expr::Lit(literal) => literal.clone(),
        Expr::UnaryOp(op, operand) => {
            let value = eval_expr(operand, binding, registry)?;
            let holds = match op {
                UnaryOp::StringQ => matches!(value, Value::String(_)),
                UnaryOp::IntegerQ => matches!(value, Value::Integer(_)),
                UnaryOp::FloatQ => matches!(value, Value::Float(_)),
                UnaryOp::BooleanQ => matches!(value, Value::Boolean(_)),
                UnaryOp::NilQ => matches!(value, Value::Null),
                UnaryOp::Udf(name) => {
                    let predicate = registry
                        .and_then(|funcs| funcs.get_predicate(name))
                        .ok_or(())?;
                    (predicate.f)(&value)
                }
            };
            Value::Boolean(holds)
        }
        Expr::BinOp(op, lhs, rhs) => {
            let left = eval_expr(lhs, binding, registry)?;
            let right = eval_expr(rhs, binding, registry)?;
            return eval_binop(op, left, right);
        }
        Expr::Slot(_) => panic!("internal: unsubstituted bind slot reached eval_expr"),
    })
}
pub(crate) fn apply_expr_clauses(
mut bindings: Vec<Binding>,
where_clauses: &[WhereClause],
registry: &FunctionRegistry,
) -> anyhow::Result<Vec<Binding>> {
for clause in where_clauses {
if let WhereClause::Expr {
expr: Expr::UnaryOp(UnaryOp::Udf(name), _),
..
} = clause
&& registry.get_predicate(name).is_none()
{
anyhow::bail!("unknown predicate: '{}'", name);
}
}
for clause in where_clauses {
if let WhereClause::Expr { expr, binding: out } = clause {
bindings = bindings
.into_iter()
.filter_map(|mut b| match eval_expr(expr, &b, Some(registry)) {
Ok(v) => {
if let Some(var) = out {
b.insert(var.clone(), v);
Some(b)
} else if is_truthy(&v) {
Some(b)
} else {
None
}
}
Err(()) => None,
})
.collect();
}
}
Ok(bindings)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::query::datalog::parser::parse_datalog_command;
use crate::query::datalog::types::WhereClause;
use uuid::Uuid;
/// Transacting two keyword-entity facts yields a positive tx id and stores both facts.
#[test]
fn test_execute_transact() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let command = parse_datalog_command(
        r#"(transact [[:alice :person/name "Alice"]
[:alice :person/age 30]])"#,
    )
    .unwrap();
    let QueryResult::Transacted(tx) = exec.execute(command).unwrap() else {
        panic!("Expected Transacted result");
    };
    assert!(tx > 0);
    let stored = exec.storage().get_asserted_facts().unwrap();
    assert_eq!(stored.len(), 2);
}
/// A single-pattern query returns the one stored name.
#[test]
fn test_execute_simple_query() {
    let store = FactStorage::new();
    let exec = DatalogExecutor::new(store.clone());
    let person = Uuid::new_v4();
    let facts = vec![
        (
            person,
            ":person/name".to_string(),
            Value::String("Alice".to_string()),
        ),
        (person, ":person/age".to_string(), Value::Integer(30)),
    ];
    store.transact(facts, None).unwrap();
    let query =
        parse_datalog_command(r#"(query [:find ?name :where [?e :person/name ?name]])"#).unwrap();
    let QueryResult::QueryResults { vars, results } = exec.execute(query).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(vars, vec!["?name"]);
    assert_eq!(results.len(), 1);
    assert_eq!(results[0][0], Value::String("Alice".to_string()));
}
/// Two patterns joined on `?e` produce one row carrying both name and age.
#[test]
fn test_execute_multi_pattern_query() {
    let store = FactStorage::new();
    let exec = DatalogExecutor::new(store.clone());
    let person = Uuid::new_v4();
    let facts = vec![
        (
            person,
            ":person/name".to_string(),
            Value::String("Alice".to_string()),
        ),
        (person, ":person/age".to_string(), Value::Integer(30)),
    ];
    store.transact(facts, None).unwrap();
    let query = parse_datalog_command(
        r#"(query [:find ?name ?age
:where [?e :person/name ?name]
[?e :person/age ?age]])"#,
    )
    .unwrap();
    let QueryResult::QueryResults { vars, results } = exec.execute(query).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(vars, vec!["?name", "?age"]);
    assert_eq!(results.len(), 1);
    let row = &results[0];
    assert_eq!(row[0], Value::String("Alice".to_string()));
    assert_eq!(row[1], Value::Integer(30));
}
/// Querying an empty store still reports the find variables, with zero rows.
#[test]
fn test_execute_query_no_results() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let query =
        parse_datalog_command(r#"(query [:find ?name :where [?e :person/name ?name]])"#).unwrap();
    let QueryResult::QueryResults { vars, results } = exec.execute(query).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(vars, vec!["?name"]);
    assert!(results.is_empty());
}
/// Retracting an asserted value leaves no current value for that attribute.
#[test]
fn test_execute_retract() {
    let store = FactStorage::new();
    let exec = DatalogExecutor::new(store.clone());
    let subject = Uuid::new_v4();
    let age_attr = ":person/age".to_string();
    store
        .transact(vec![(subject, age_attr.clone(), Value::Integer(30))], None)
        .unwrap();
    assert_eq!(
        store.get_current_value(&subject, &age_attr).unwrap(),
        Some(Value::Integer(30))
    );
    // Presumably gives the retraction a later transaction timestamp than the assertion.
    std::thread::sleep(std::time::Duration::from_millis(2));
    let retract_src = format!(r#"(retract [[#uuid "{}" :person/age 30]])"#, subject);
    let cmd = parse_datalog_command(&retract_src).unwrap();
    let QueryResult::Retracted(tx) = exec.execute(cmd).unwrap() else {
        panic!("Expected Retracted result");
    };
    assert!(tx > 0);
    assert_eq!(store.get_current_value(&subject, &age_attr).unwrap(), None);
}
/// Facts transacted against a keyword entity (`:alice`) can be joined back by a query.
#[test]
fn test_transact_with_keyword_entity() {
    let exec = DatalogExecutor::new(FactStorage::new().clone());
    let tx_cmd = parse_datalog_command(
        r#"(transact [[:alice :person/name "Alice"]
[:alice :person/age 30]])"#,
    )
    .unwrap();
    let QueryResult::Transacted(_) = exec.execute(tx_cmd).unwrap() else {
        panic!("Expected Transacted result");
    };
    let query = parse_datalog_command(
        r#"(query [:find ?name ?age
:where [?e :person/name ?name]
[?e :person/age ?age]])"#,
    )
    .unwrap();
    let QueryResult::QueryResults { results, .. } = exec.execute(query).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(results.len(), 1);
    let row = &results[0];
    assert_eq!(row[0], Value::String("Alice".to_string()));
    assert_eq!(row[1], Value::Integer(30));
}
/// Executing a `rule` command returns `Ok` and registers one clause for the predicate.
#[test]
fn test_register_rule() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let rule_cmd =
        parse_datalog_command(r#"(rule [(reachable ?x ?y) [?x :connected ?y]])"#).unwrap();
    assert_eq!(exec.execute(rule_cmd).unwrap(), QueryResult::Ok);
    let shared = exec.rules();
    let registered = shared.read().unwrap().get_rules("reachable");
    assert_eq!(registered.len(), 1);
}
/// Two rule bodies for the same head predicate are both kept.
#[test]
fn test_register_multiple_rules_same_predicate() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let sources = [
        r#"(rule [(reachable ?x ?y) [?x :connected ?y]])"#,
        r#"(rule [(reachable ?x ?y) [?x :connected ?z] (reachable ?z ?y)])"#,
    ];
    for src in sources {
        exec.execute(parse_datalog_command(src).unwrap()).unwrap();
    }
    let shared = exec.rules();
    let registered = shared.read().unwrap().get_rules("reachable");
    assert_eq!(registered.len(), 2);
}
/// Rules with distinct head predicates are tracked as separate predicates.
#[test]
fn test_register_rules_different_predicates() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let first =
        parse_datalog_command(r#"(rule [(reachable ?x ?y) [?x :connected ?y]])"#).unwrap();
    exec.execute(first).unwrap();
    let second = parse_datalog_command(r#"(rule [(ancestor ?a ?d) [?a :parent ?d]])"#).unwrap();
    exec.execute(second).unwrap();
    let shared = exec.rules();
    let guard = shared.read().unwrap();
    for name in ["reachable", "ancestor"] {
        assert!(guard.has_rule(name));
    }
    assert_eq!(guard.predicate_count(), 2);
}
/// A non-recursive rule invoked with a bound first argument returns every direct neighbour.
#[test]
fn test_query_with_rule_invocation() {
    let store = FactStorage::new();
    let exec = DatalogExecutor::new(store.clone());
    let (a, b, c) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
    store
        .transact(
            vec![
                (a, ":connected".to_string(), Value::Ref(b)),
                (a, ":connected".to_string(), Value::Ref(c)),
            ],
            None,
        )
        .unwrap();
    let rule =
        parse_datalog_command(r#"(rule [(reachable ?x ?y) [?x :connected ?y]])"#).unwrap();
    exec.execute(rule).unwrap();
    let src = format!(
        r#"(query [:find ?to :where (reachable #uuid "{}" ?to)])"#,
        a
    );
    let cmd = parse_datalog_command(&src).unwrap();
    let QueryResult::QueryResults { vars, results } = exec.execute(cmd).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(vars, vec!["?to"]);
    assert_eq!(results.len(), 2);
    let mut targets: Vec<Uuid> = Vec::with_capacity(results.len());
    for row in &results {
        match &row[0] {
            Value::Ref(id) => targets.push(*id),
            _ => panic!("Expected Ref value"),
        }
    }
    assert!(targets.contains(&b));
    assert!(targets.contains(&c));
}
/// A rule invocation followed by a plain pattern keeps only neighbours that have a name.
#[test]
fn test_query_mixed_pattern_and_rule() {
    let store = FactStorage::new();
    let exec = DatalogExecutor::new(store.clone());
    let (a, b, c) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
    let facts = vec![
        (a, ":connected".to_string(), Value::Ref(b)),
        (a, ":connected".to_string(), Value::Ref(c)),
        (
            b,
            ":person/name".to_string(),
            Value::String("Bob".to_string()),
        ),
    ];
    store.transact(facts, None).unwrap();
    let rule =
        parse_datalog_command(r#"(rule [(reachable ?x ?y) [?x :connected ?y]])"#).unwrap();
    exec.execute(rule).unwrap();
    let src = format!(
        r#"(query [:find ?name :where (reachable #uuid "{}" ?to) [?to :person/name ?name]])"#,
        a
    );
    let cmd = parse_datalog_command(&src).unwrap();
    let QueryResult::QueryResults { vars, results } = exec.execute(cmd).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(vars, vec!["?name"]);
    assert_eq!(results.len(), 1);
    assert_eq!(results[0][0], Value::String("Bob".to_string()));
}
/// Base plus recursive `reachable` rules compute the transitive closure a -> b -> c.
#[test]
fn test_query_with_recursive_transitive_closure() {
    let store = FactStorage::new();
    let exec = DatalogExecutor::new(store.clone());
    let (a, b, c) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
    store
        .transact(
            vec![
                (a, ":connected".to_string(), Value::Ref(b)),
                (b, ":connected".to_string(), Value::Ref(c)),
            ],
            None,
        )
        .unwrap();
    let rule_sources = [
        r#"(rule [(reachable ?x ?y) [?x :connected ?y]])"#,
        r#"(rule [(reachable ?x ?y) [?x :connected ?z] (reachable ?z ?y)])"#,
    ];
    for src in rule_sources {
        exec.execute(parse_datalog_command(src).unwrap()).unwrap();
    }
    let src = format!(
        r#"(query [:find ?to :where (reachable #uuid "{}" ?to)])"#,
        a
    );
    let cmd = parse_datalog_command(&src).unwrap();
    let QueryResult::QueryResults { vars, results } = exec.execute(cmd).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(vars, vec!["?to"]);
    assert_eq!(results.len(), 2);
    let mut targets: Vec<Uuid> = Vec::with_capacity(results.len());
    for row in &results {
        match &row[0] {
            Value::Ref(id) => targets.push(*id),
            _ => panic!("Expected Ref value"),
        }
    }
    assert!(targets.contains(&b));
    assert!(targets.contains(&c));
}
/// With no valid-time clause, the open-ended name fact is visible while the fact
/// valid only between 1000 and 2000 is not.
#[test]
fn test_default_query_filters_to_currently_valid() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let alice = Uuid::new_v4();
    let record = |attr: &str, value: EdnValue, valid_from: Option<i64>, valid_to: Option<i64>| {
        exec.execute(DatalogCommand::Transact(Transaction {
            facts: vec![Pattern::new(
                EdnValue::Uuid(alice),
                EdnValue::Keyword(attr.to_string()),
                value,
            )],
            valid_from,
            valid_to,
        }))
        .unwrap();
    };
    record(
        ":person/name",
        EdnValue::String("Alice".to_string()),
        None,
        None,
    );
    record(
        ":employment/status",
        EdnValue::Keyword(":active".to_string()),
        Some(1000_i64),
        Some(2000_i64),
    );
    let query = DatalogQuery::new(
        vec![FindSpec::Variable("?attr".to_string())],
        vec![WhereClause::Pattern(Pattern::new(
            EdnValue::Uuid(alice),
            EdnValue::Symbol("?attr".to_string()),
            EdnValue::Symbol("?v".to_string()),
        ))],
    );
    let QueryResult::QueryResults { results: rows, .. } =
        exec.execute(DatalogCommand::Query(query)).unwrap()
    else {
        panic!("expected query results");
    };
    assert_eq!(rows.len(), 1);
}
/// `AsOf::Counter(1)` shows only the facts from the first transaction.
#[test]
fn test_as_of_counter_shows_past_state() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let alice = Uuid::new_v4();
    let record = |attr: &str, value: EdnValue| {
        exec.execute(DatalogCommand::Transact(Transaction {
            facts: vec![Pattern::new(
                EdnValue::Uuid(alice),
                EdnValue::Keyword(attr.to_string()),
                value,
            )],
            valid_from: None,
            valid_to: None,
        }))
        .unwrap();
    };
    record(":person/name", EdnValue::String("Alice".to_string()));
    record(":person/age", EdnValue::Integer(30));
    let query = DatalogQuery {
        find: vec![FindSpec::Variable("?attr".to_string())],
        where_clauses: vec![WhereClause::Pattern(Pattern::new(
            EdnValue::Uuid(alice),
            EdnValue::Symbol("?attr".to_string()),
            EdnValue::Symbol("?v".to_string()),
        ))],
        as_of: Some(AsOf::Counter(1)),
        valid_at: Some(ValidAt::AnyValidTime),
        with_vars: Vec::new(),
    };
    let QueryResult::QueryResults { results: rows, .. } =
        exec.execute(DatalogCommand::Query(query)).unwrap()
    else {
        panic!("expected query results");
    };
    assert_eq!(rows.len(), 1);
}
/// `ValidAt::AnyValidTime` disables valid-time filtering, so both facts are returned.
#[test]
fn test_valid_at_any_valid_time_shows_all() {
    let exec = DatalogExecutor::new(FactStorage::new());
    let alice = Uuid::new_v4();
    let record = |attr: &str, value: EdnValue, valid_from: Option<i64>, valid_to: Option<i64>| {
        exec.execute(DatalogCommand::Transact(Transaction {
            facts: vec![Pattern::new(
                EdnValue::Uuid(alice),
                EdnValue::Keyword(attr.to_string()),
                value,
            )],
            valid_from,
            valid_to,
        }))
        .unwrap();
    };
    record(
        ":person/name",
        EdnValue::String("Alice".to_string()),
        None,
        None,
    );
    record(
        ":employment/status",
        EdnValue::Keyword(":active".to_string()),
        Some(1000_i64),
        Some(2000_i64),
    );
    let query = DatalogQuery {
        find: vec![FindSpec::Variable("?attr".to_string())],
        where_clauses: vec![WhereClause::Pattern(Pattern::new(
            EdnValue::Uuid(alice),
            EdnValue::Symbol("?attr".to_string()),
            EdnValue::Symbol("?v".to_string()),
        ))],
        as_of: None,
        valid_at: Some(ValidAt::AnyValidTime),
        with_vars: Vec::new(),
    };
    let QueryResult::QueryResults { results: rows, .. } =
        exec.execute(DatalogCommand::Query(query)).unwrap()
    else {
        panic!("expected query results");
    };
    assert_eq!(rows.len(), 2);
}
/// A recursive rule followed by a plain pattern finds the named node two hops away.
#[test]
fn test_query_recursive_with_mixed_patterns() {
    let store = FactStorage::new();
    let exec = DatalogExecutor::new(store.clone());
    let (a, b, c) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
    let facts = vec![
        (a, ":connected".to_string(), Value::Ref(b)),
        (b, ":connected".to_string(), Value::Ref(c)),
        (
            c,
            ":person/name".to_string(),
            Value::String("Charlie".to_string()),
        ),
    ];
    store.transact(facts, None).unwrap();
    let rule_sources = [
        r#"(rule [(reachable ?x ?y) [?x :connected ?y]])"#,
        r#"(rule [(reachable ?x ?y) [?x :connected ?z] (reachable ?z ?y)])"#,
    ];
    for src in rule_sources {
        exec.execute(parse_datalog_command(src).unwrap()).unwrap();
    }
    let src = format!(
        r#"(query [:find ?name :where (reachable #uuid "{}" ?to) [?to :person/name ?name]])"#,
        a
    );
    let cmd = parse_datalog_command(&src).unwrap();
    let QueryResult::QueryResults { vars, results } = exec.execute(cmd).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(vars, vec!["?name"]);
    assert_eq!(results.len(), 1);
    assert_eq!(results[0][0], Value::String("Charlie".to_string()));
}
/// A `not` clause whose variables are all bound acts as a pure filter (drops alice).
#[test]
fn test_execute_query_not_as_pure_filter() {
    let store = FactStorage::new();
    let alice = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
    let bob = Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap();
    store
        .transact(
            vec![
                (alice, ":applied".to_string(), Value::Boolean(true)),
                (alice, ":rejected".to_string(), Value::Boolean(true)),
            ],
            None,
        )
        .unwrap();
    store
        .transact(
            vec![(bob, ":applied".to_string(), Value::Boolean(true))],
            None,
        )
        .unwrap();
    let flag = |attr: &str| {
        Pattern::new(
            EdnValue::Symbol("?e".to_string()),
            EdnValue::Keyword(attr.to_string()),
            EdnValue::Boolean(true),
        )
    };
    let query = DatalogQuery::new(
        vec![FindSpec::Variable("?e".to_string())],
        vec![
            WhereClause::Pattern(flag(":applied")),
            WhereClause::Not(vec![WhereClause::Pattern(flag(":rejected"))]),
        ],
    );
    let exec = DatalogExecutor::new(store);
    let QueryResult::QueryResults { results, .. } =
        exec.execute(DatalogCommand::Query(query)).unwrap()
    else {
        panic!("Expected QueryResults");
    };
    assert_eq!(results.len(), 1, "only bob should pass (alice is rejected)");
}
/// A `not` clause applied after a rule invocation removes the blocked target `c`.
#[test]
fn test_execute_query_with_rules_not_in_query_body() {
    let store = FactStorage::new();
    let a = Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
    let b = Uuid::parse_str("00000000-0000-0000-0000-000000000002").unwrap();
    let c = Uuid::parse_str("00000000-0000-0000-0000-000000000003").unwrap();
    store
        .transact(
            vec![
                (a, ":connected".to_string(), Value::Ref(b)),
                (a, ":connected".to_string(), Value::Ref(c)),
                (c, ":blocked".to_string(), Value::Boolean(true)),
            ],
            None,
        )
        .unwrap();
    let sym = |name: &str| EdnValue::Symbol(name.to_string());
    let reachable = Rule {
        head: vec![sym("reachable"), sym("?from"), sym("?to")],
        body: vec![WhereClause::Pattern(Pattern::new(
            sym("?from"),
            EdnValue::Keyword(":connected".to_string()),
            sym("?to"),
        ))],
    };
    let rules = Arc::new(RwLock::new(RuleRegistry::new()));
    rules
        .write()
        .unwrap()
        .register_rule("reachable".to_string(), reachable)
        .unwrap();
    let query = DatalogQuery::new(
        vec![FindSpec::Variable("?x".to_string())],
        vec![
            WhereClause::RuleInvocation {
                predicate: "reachable".to_string(),
                args: vec![sym("?_a"), sym("?x")],
            },
            WhereClause::Not(vec![WhereClause::Pattern(Pattern::new(
                sym("?x"),
                EdnValue::Keyword(":blocked".to_string()),
                EdnValue::Boolean(true),
            ))]),
        ],
    );
    let exec = DatalogExecutor::new_with_rules(store, rules);
    let QueryResult::QueryResults { results, .. } =
        exec.execute(DatalogCommand::Query(query)).unwrap()
    else {
        panic!("Expected QueryResults");
    };
    assert_eq!(
        results.len(),
        1,
        "c should be excluded (blocked), only b passes"
    );
}
/// `not-join` on `?x` excludes submitters that depend on a blocked entity (alice).
#[test]
fn test_execute_query_not_join_basic() {
    let store = FactStorage::new();
    let (alice, bob, dep1) = (Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4());
    let facts = vec![
        (alice, ":submitted".to_string(), Value::Boolean(true)),
        (alice, ":has-dep".to_string(), Value::Ref(dep1)),
        (dep1, ":blocked".to_string(), Value::Boolean(true)),
        (bob, ":submitted".to_string(), Value::Boolean(true)),
    ];
    store.transact(facts, None).unwrap();
    let sym = |name: &str| EdnValue::Symbol(name.to_string());
    let kw = |name: &str| EdnValue::Keyword(name.to_string());
    let blocked_dependency = vec![
        WhereClause::Pattern(Pattern::new(sym("?x"), kw(":has-dep"), sym("?d"))),
        WhereClause::Pattern(Pattern::new(
            sym("?d"),
            kw(":blocked"),
            EdnValue::Boolean(true),
        )),
    ];
    let query = DatalogQuery::new(
        vec![FindSpec::Variable("?x".to_string())],
        vec![
            WhereClause::Pattern(Pattern::new(
                sym("?x"),
                kw(":submitted"),
                EdnValue::Boolean(true),
            )),
            WhereClause::NotJoin {
                join_vars: vec!["?x".to_string()],
                clauses: blocked_dependency,
            },
        ],
    );
    let exec = DatalogExecutor::new(store);
    let QueryResult::QueryResults { results, .. } =
        exec.execute(DatalogCommand::Query(query)).unwrap()
    else {
        panic!("Expected QueryResults");
    };
    assert_eq!(results.len(), 1, "only bob should be returned");
}
/// `not-join` after a rule invocation drops `a`, whose dependency is blocked, and keeps `b`.
#[test]
fn test_execute_query_with_rules_not_join_in_query_body() {
    let store = FactStorage::new();
    let (root, a, b, dep1) = (
        Uuid::new_v4(),
        Uuid::new_v4(),
        Uuid::new_v4(),
        Uuid::new_v4(),
    );
    let facts = vec![
        (root, ":edge".to_string(), Value::Ref(a)),
        (root, ":edge".to_string(), Value::Ref(b)),
        (a, ":has-dep".to_string(), Value::Ref(dep1)),
        (dep1, ":blocked".to_string(), Value::Boolean(true)),
    ];
    store.transact(facts, None).unwrap();
    let sym = |name: &str| EdnValue::Symbol(name.to_string());
    let kw = |name: &str| EdnValue::Keyword(name.to_string());
    let reachable = Rule {
        head: vec![sym("reachable"), sym("?x"), sym("?y")],
        body: vec![WhereClause::Pattern(Pattern::new(
            sym("?x"),
            kw(":edge"),
            sym("?y"),
        ))],
    };
    let rules = Arc::new(RwLock::new(RuleRegistry::new()));
    rules
        .write()
        .unwrap()
        .register_rule("reachable".to_string(), reachable)
        .unwrap();
    let blocked_dependency = vec![
        WhereClause::Pattern(Pattern::new(sym("?y"), kw(":has-dep"), sym("?d"))),
        WhereClause::Pattern(Pattern::new(
            sym("?d"),
            kw(":blocked"),
            EdnValue::Boolean(true),
        )),
    ];
    let query = DatalogQuery::new(
        vec![FindSpec::Variable("?y".to_string())],
        vec![
            WhereClause::RuleInvocation {
                predicate: "reachable".to_string(),
                args: vec![EdnValue::Uuid(root), sym("?y")],
            },
            WhereClause::NotJoin {
                join_vars: vec!["?y".to_string()],
                clauses: blocked_dependency,
            },
        ],
    );
    let exec = DatalogExecutor::new_with_rules(store, rules);
    let QueryResult::QueryResults { results, .. } =
        exec.execute(DatalogCommand::Query(query)).unwrap()
    else {
        panic!("Expected QueryResults");
    };
    assert_eq!(results.len(), 1, "only b should pass");
}
/// Sanity check: a plain single-pattern query returns both named entities.
#[test]
fn test_optimizer_does_not_change_query_results() {
    let store = FactStorage::new();
    let (alice, bob) = (Uuid::new_v4(), Uuid::new_v4());
    let facts = vec![
        (
            alice,
            ":name".to_string(),
            Value::String("Alice".to_string()),
        ),
        (alice, ":friend".to_string(), Value::Ref(bob)),
        (bob, ":name".to_string(), Value::String("Bob".to_string())),
    ];
    store.transact(facts, None).unwrap();
    let exec = DatalogExecutor::new(store);
    let cmd = parse_datalog_command("(query [:find ?name :where [?e :name ?name]])").unwrap();
    let QueryResult::QueryResults { results, .. } = exec.execute(cmd).unwrap() else {
        panic!("Expected QueryResults");
    };
    assert_eq!(results.len(), 2, "Alice and Bob both have names");
}
/// Builds a variable-to-value row from `(name, value)` pairs; later duplicates win.
fn binding(pairs: &[(&str, Value)]) -> std::collections::HashMap<String, Value> {
    let mut row = std::collections::HashMap::with_capacity(pairs.len());
    for (name, value) in pairs {
        row.insert(name.to_string(), value.clone());
    }
    row
}
/// `count` with no grouping variables yields a single row with the row count.
#[test]
fn test_apply_aggregation_count_basic() {
    let rows: Vec<_> = (1..=3)
        .map(|n| binding(&[("?e", Value::Integer(n))]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "count".to_string(),
        var: "?e".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out.len(), 1);
    assert_eq!(out[0][0], Value::Integer(3));
}
/// `count` grouped by `?dept` yields one row per department.
#[test]
fn test_apply_aggregation_count_with_grouping() {
    let rows: Vec<_> = [("eng", 1), ("eng", 2), ("hr", 3)]
        .into_iter()
        .map(|(dept, e)| {
            binding(&[
                ("?dept", Value::String(dept.to_string())),
                ("?e", Value::Integer(e)),
            ])
        })
        .collect();
    let specs = vec![
        FindSpec::Variable("?dept".to_string()),
        FindSpec::Aggregate {
            func: "count".to_string(),
            var: "?e".to_string(),
        },
    ];
    let registry = FunctionRegistry::with_builtins();
    let mut out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    // Group order is not guaranteed, so sort by department name first.
    out.sort_by_cached_key(|row| match &row[0] {
        Value::String(s) => s.clone(),
        _ => String::new(),
    });
    assert_eq!(out.len(), 2);
    assert_eq!(
        out[0],
        vec![Value::String("eng".to_string()), Value::Integer(2)]
    );
    assert_eq!(
        out[1],
        vec![Value::String("hr".to_string()), Value::Integer(1)]
    );
}
/// `count-distinct` counts each distinct value once.
#[test]
fn test_apply_aggregation_count_distinct() {
    // The value 1 appears twice on purpose.
    let rows: Vec<_> = [1, 1, 2]
        .into_iter()
        .map(|n| binding(&[("?v", Value::Integer(n))]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "count-distinct".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::Integer(2));
}
/// `count` over no rows and no grouping variables still yields one row with 0.
#[test]
fn test_apply_aggregation_count_empty_no_grouping_vars() {
    let specs = vec![FindSpec::Aggregate {
        func: "count".to_string(),
        var: "?e".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(Vec::new(), &specs, &[], &registry).unwrap();
    assert_eq!(out.len(), 1, "should return one row with 0");
    assert_eq!(out[0][0], Value::Integer(0));
}
/// `count` over no rows with a grouping variable yields no rows at all.
#[test]
fn test_apply_aggregation_count_empty_with_grouping_var() {
    let count_e = FindSpec::Aggregate {
        func: "count".to_string(),
        var: "?e".to_string(),
    };
    let specs = vec![FindSpec::Variable("?dept".to_string()), count_e];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(Vec::new(), &specs, &[], &registry).unwrap();
    assert_eq!(out.len(), 0, "should return empty set");
}
/// `sum` over integers stays an integer.
#[test]
fn test_apply_aggregation_sum_integers() {
    let rows: Vec<_> = [10, 20, 30]
        .into_iter()
        .map(|n| binding(&[("?v", Value::Integer(n))]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "sum".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::Integer(60));
}
/// `sum` over a mix of integer and float widens the result to float.
#[test]
fn test_apply_aggregation_sum_widens_to_float() {
    let rows: Vec<_> = [Value::Integer(10), Value::Float(0.5)]
        .into_iter()
        .map(|v| binding(&[("?v", v)]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "sum".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::Float(10.5));
}
/// `sum-distinct` adds each distinct value once (5 + 10).
#[test]
fn test_apply_aggregation_sum_distinct_deduplicates() {
    // The value 5 appears twice on purpose.
    let rows: Vec<_> = [5, 5, 10]
        .into_iter()
        .map(|n| binding(&[("?v", Value::Integer(n))]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "sum-distinct".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::Integer(15));
}
/// `sum` over a string value is rejected with an error.
#[test]
fn test_apply_aggregation_sum_type_error() {
    let rows = vec![binding(&[("?v", Value::String("bad".to_string()))])];
    let specs = vec![FindSpec::Aggregate {
        func: "sum".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let outcome = apply_post_processing(rows, &specs, &[], &registry);
    assert!(outcome.is_err(), "sum of string should fail");
}
/// `min` over unordered integers picks the smallest.
#[test]
fn test_apply_aggregation_min_integers() {
    let rows: Vec<_> = [30, 10, 20]
        .into_iter()
        .map(|n| binding(&[("?v", Value::Integer(n))]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "min".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::Integer(10));
}
/// `max` over strings uses lexicographic order.
#[test]
fn test_apply_aggregation_max_strings() {
    let rows: Vec<_> = ["apple", "zebra", "mango"]
        .into_iter()
        .map(|s| binding(&[("?v", Value::String(s.to_string()))]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "max".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::String("zebra".to_string()));
}
/// `min` over a boolean value is rejected with an error.
#[test]
fn test_apply_aggregation_min_type_error_boolean() {
    let rows = vec![binding(&[("?v", Value::Boolean(true))])];
    let specs = vec![FindSpec::Aggregate {
        func: "min".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let outcome = apply_post_processing(rows, &specs, &[], &registry);
    assert!(outcome.is_err(), "min of boolean should fail");
}
/// `min` refuses to compare an integer with a float.
#[test]
fn test_apply_aggregation_min_mixed_int_float_error() {
    let rows: Vec<_> = [Value::Integer(1), Value::Float(2.0)]
        .into_iter()
        .map(|v| binding(&[("?v", v)]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "min".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let outcome = apply_post_processing(rows, &specs, &[], &registry);
    assert!(outcome.is_err(), "min of mixed Integer/Float should fail");
}
/// `sum` ignores `Null` values.
#[test]
fn test_apply_aggregation_skips_nulls_in_sum() {
    let rows: Vec<_> = [Value::Integer(10), Value::Null, Value::Integer(20)]
        .into_iter()
        .map(|v| binding(&[("?v", v)]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "sum".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::Integer(30));
}
/// `count` does not count `Null` values.
#[test]
fn test_apply_aggregation_skips_nulls_in_count() {
    let rows: Vec<_> = [Value::Integer(1), Value::Null, Value::Integer(2)]
        .into_iter()
        .map(|v| binding(&[("?v", v)]))
        .collect();
    let specs = vec![FindSpec::Aggregate {
        func: "count".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(rows, &specs, &[], &registry).unwrap();
    assert_eq!(out[0][0], Value::Integer(2));
}
/// Unlike `count`, `sum` over no rows yields no rows.
#[test]
fn test_apply_aggregation_sum_empty_bindings() {
    let specs = vec![FindSpec::Aggregate {
        func: "sum".to_string(),
        var: "?v".to_string(),
    }];
    let registry = FunctionRegistry::with_builtins();
    let out = apply_post_processing(Vec::new(), &specs, &[], &registry).unwrap();
    assert_eq!(out.len(), 0, "sum on empty should return empty set");
}
/// Without `:with`, two rows for the same department are summed together. Adding
/// `?e` as a `:with` variable keeps them apart.
#[test]
fn test_apply_aggregation_with_var_grouping() {
    let rows: Vec<_> = (1..=2)
        .map(|e| {
            binding(&[
                ("?dept", Value::String("eng".to_string())),
                ("?salary", Value::Integer(50)),
                ("?e", Value::Integer(e)),
            ])
        })
        .collect();
    let specs = vec![
        FindSpec::Variable("?dept".to_string()),
        FindSpec::Aggregate {
            func: "sum".to_string(),
            var: "?salary".to_string(),
        },
    ];
    let registry = FunctionRegistry::with_builtins();

    let merged = apply_post_processing(rows.clone(), &specs, &[], &registry).unwrap();
    assert_eq!(merged.len(), 1);
    assert_eq!(merged[0][1], Value::Integer(100));

    let split = apply_post_processing(rows, &specs, &["?e".to_string()], &registry).unwrap();
    assert_eq!(split.len(), 2);
    assert_eq!(split[0][1], Value::Integer(50));
}
/// A fact that was asserted and then retracted is absent from the filtered slice,
/// leaving only the still-asserted age fact.
#[test]
fn test_filter_facts_for_query_returns_net_asserted_slice() {
    let store = FactStorage::new();
    let alice = Uuid::new_v4();
    let name_fact = || {
        (
            alice,
            ":person/name".to_string(),
            Value::String("Alice".to_string()),
        )
    };
    store.transact(vec![name_fact()], None).unwrap();
    store.retract(vec![name_fact()]).unwrap();
    store
        .transact(
            vec![(alice, ":person/age".to_string(), Value::Integer(30))],
            None,
        )
        .unwrap();
    let exec = DatalogExecutor::new(store);
    let everything = DatalogQuery {
        find: vec![],
        where_clauses: vec![],
        as_of: None,
        valid_at: Some(ValidAt::AnyValidTime),
        with_vars: vec![],
    };
    let visible = exec.filter_facts_for_query(&everything).unwrap();
    assert_eq!(visible.len(), 1, "expected exactly 1 net-asserted fact");
    assert_eq!(visible[0].attribute, ":person/age");
}
/// `ValidAt::Timestamp` keeps only facts whose valid-time window covers the timestamp.
#[test]
fn test_filter_facts_for_query_valid_time_filter() {
    let store = FactStorage::new();
    let alice = Uuid::new_v4();
    store
        .transact(
            vec![(
                alice,
                ":employment/status".to_string(),
                Value::String("active".to_string()),
            )],
            Some(TransactOptions::new(Some(1000_i64), Some(2000_i64))),
        )
        .unwrap();
    store
        .transact(
            vec![(
                alice,
                ":person/name".to_string(),
                Value::String("Alice".to_string()),
            )],
            Some(TransactOptions::new(Some(0_i64), None)),
        )
        .unwrap();
    let exec = DatalogExecutor::new(store);
    let at = |ts: i64| DatalogQuery {
        find: vec![],
        where_clauses: vec![],
        as_of: None,
        valid_at: Some(ValidAt::Timestamp(ts)),
        with_vars: vec![],
    };

    let inside = exec.filter_facts_for_query(&at(1500_i64)).unwrap();
    assert_eq!(inside.len(), 2, "both facts visible at t=1500");

    let outside = exec.filter_facts_for_query(&at(3000_i64)).unwrap();
    assert_eq!(
        outside.len(),
        1,
        "only open-ended fact visible at t=3000"
    );
    assert_eq!(outside[0].attribute, ":person/name");
}
}
#[cfg(test)]
mod expr_eval_tests {
use super::*;
use crate::graph::types::Value;
use crate::query::datalog::parser::parse_datalog_command;
use crate::query::datalog::types::{BinOp, Expr, UnaryOp, WhereClause};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
/// Shorthand for building an evaluation environment from `(var, value)` pairs.
fn b(pairs: &[(&str, Value)]) -> HashMap<String, Value> {
    let mut env = HashMap::new();
    for (var, value) in pairs {
        env.insert(var.to_string(), value.clone());
    }
    env
}
/// A literal evaluates to itself.
#[test]
fn test_eval_lit() {
    let env = HashMap::new();
    let got = eval_expr(&Expr::Lit(Value::Integer(42)), &env, None);
    assert_eq!(got, Ok(Value::Integer(42)));
}
/// A bound variable evaluates to its bound value.
#[test]
fn test_eval_var_bound() {
    let env = b(&[("?x", Value::Integer(10))]);
    let got = eval_expr(&Expr::Var("?x".to_string()), &env, None);
    assert_eq!(got, Ok(Value::Integer(10)));
}
/// An unbound variable is an evaluation error.
#[test]
fn test_eval_var_unbound_is_err() {
    let env = HashMap::new();
    let got = eval_expr(&Expr::Var("?x".to_string()), &env, None);
    assert_eq!(got, Err(()));
}
/// `?v < 100` holds for `?v = 50`.
#[test]
fn test_eval_lt_true() {
    let lhs = Box::new(Expr::Var("?v".to_string()));
    let rhs = Box::new(Expr::Lit(Value::Integer(100)));
    let less_than = Expr::BinOp(BinOp::Lt, lhs, rhs);
    let env = b(&[("?v", Value::Integer(50))]);
    assert_eq!(eval_expr(&less_than, &env, None), Ok(Value::Boolean(true)));
}
#[test]
fn test_eval_lt_false() {
let e = Expr::BinOp(
BinOp::Lt,
Box::new(Expr::Var("?v".to_string())),
Box::new(Expr::Lit(Value::Integer(100))),
);
let binding = b(&[("?v", Value::Integer(150))]);
assert_eq!(eval_expr(&e, &binding, None), Ok(Value::Boolean(false)));
}
#[test]
fn test_eval_add_integers() {
let e = Expr::BinOp(
BinOp::Add,
Box::new(Expr::Var("?a".to_string())),
Box::new(Expr::Var("?b".to_string())),
);
let binding = b(&[("?a", Value::Integer(3)), ("?b", Value::Integer(4))]);
assert_eq!(eval_expr(&e, &binding, None), Ok(Value::Integer(7)));
}
#[test]
fn test_eval_add_int_float_promotes() {
let e = Expr::BinOp(
BinOp::Add,
Box::new(Expr::Lit(Value::Integer(1))),
Box::new(Expr::Lit(Value::Float(1.5))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Ok(Value::Float(2.5)));
}
#[test]
fn test_eval_div_integer_truncates() {
let e = Expr::BinOp(
BinOp::Div,
Box::new(Expr::Lit(Value::Integer(5))),
Box::new(Expr::Lit(Value::Integer(2))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Ok(Value::Integer(2)));
}
#[test]
fn test_eval_div_by_zero_is_err() {
let e = Expr::BinOp(
BinOp::Div,
Box::new(Expr::Lit(Value::Integer(5))),
Box::new(Expr::Lit(Value::Integer(0))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Err(()));
}
#[test]
fn test_eval_eq_strings() {
let e = Expr::BinOp(
BinOp::Eq,
Box::new(Expr::Lit(Value::String("Alice".to_string()))),
Box::new(Expr::Lit(Value::String("Alice".to_string()))),
);
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(true))
);
}
#[test]
fn test_eval_eq_int_float_false() {
let e = Expr::BinOp(
BinOp::Eq,
Box::new(Expr::Lit(Value::Integer(1))),
Box::new(Expr::Lit(Value::Float(1.0))),
);
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(false))
);
}
#[test]
fn test_eval_type_mismatch_comparison_is_err() {
let e = Expr::BinOp(
BinOp::Lt,
Box::new(Expr::Lit(Value::String("hello".to_string()))),
Box::new(Expr::Lit(Value::Integer(100))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Err(()));
}
#[test]
fn test_eval_string_q_true() {
let e = Expr::UnaryOp(
UnaryOp::StringQ,
Box::new(Expr::Lit(Value::String("hi".to_string()))),
);
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(true))
);
}
#[test]
fn test_eval_string_q_false() {
let e = Expr::UnaryOp(UnaryOp::StringQ, Box::new(Expr::Lit(Value::Integer(1))));
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(false))
);
}
#[test]
fn test_eval_starts_with_true() {
let e = Expr::BinOp(
BinOp::StartsWith,
Box::new(Expr::Lit(Value::String("foobar".to_string()))),
Box::new(Expr::Lit(Value::String("foo".to_string()))),
);
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(true))
);
}
#[test]
fn test_eval_ends_with_true() {
let e = Expr::BinOp(
BinOp::EndsWith,
Box::new(Expr::Lit(Value::String("foobar".to_string()))),
Box::new(Expr::Lit(Value::String("bar".to_string()))),
);
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(true))
);
}
#[test]
fn test_eval_contains_true() {
let e = Expr::BinOp(
BinOp::Contains,
Box::new(Expr::Lit(Value::String("engineer at co".to_string()))),
Box::new(Expr::Lit(Value::String("engineer".to_string()))),
);
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(true))
);
}
#[test]
fn test_eval_matches_true() {
let re = regex_lite::Regex::new("^[^@]+@[^@]+$").unwrap();
let e = Expr::BinOp(
BinOp::Matches {
regex: re,
pattern: "^[^@]+@[^@]+$".to_string(),
},
Box::new(Expr::Lit(Value::String("test@example.com".to_string()))),
Box::new(Expr::Lit(Value::String("^[^@]+@[^@]+$".to_string()))),
);
assert_eq!(
eval_expr(&e, &HashMap::new(), None),
Ok(Value::Boolean(true))
);
}
#[test]
fn test_is_truthy() {
assert!(is_truthy(&Value::Boolean(true)));
assert!(!is_truthy(&Value::Boolean(false)));
assert!(is_truthy(&Value::Integer(1)));
assert!(!is_truthy(&Value::Integer(0)));
assert!(is_truthy(&Value::Float(0.1)));
assert!(!is_truthy(&Value::Float(0.0)));
assert!(!is_truthy(&Value::Null));
assert!(!is_truthy(&Value::String("hi".to_string())));
}
#[test]
fn test_apply_expr_filter_keeps_truthy() {
use crate::query::datalog::types::WhereClause;
let expr = Expr::BinOp(
BinOp::Lt,
Box::new(Expr::Var("?v".to_string())),
Box::new(Expr::Lit(Value::Integer(100))),
);
let clauses = vec![WhereClause::Expr {
expr,
binding: None,
}];
let bindings = vec![
b(&[("?v", Value::Integer(50))]),
b(&[("?v", Value::Integer(150))]),
];
let result =
apply_expr_clauses(bindings, &clauses, &FunctionRegistry::with_builtins()).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].get("?v"), Some(&Value::Integer(50)));
}
#[test]
fn test_apply_expr_binding_extends_row() {
use crate::query::datalog::types::WhereClause;
let expr = Expr::BinOp(
BinOp::Add,
Box::new(Expr::Var("?a".to_string())),
Box::new(Expr::Var("?b".to_string())),
);
let clauses = vec![WhereClause::Expr {
expr,
binding: Some("?sum".to_string()),
}];
let bindings = vec![b(&[("?a", Value::Integer(3)), ("?b", Value::Integer(4))])];
let result =
apply_expr_clauses(bindings, &clauses, &FunctionRegistry::with_builtins()).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].get("?sum"), Some(&Value::Integer(7)));
}
#[test]
fn test_apply_expr_type_mismatch_drops_row() {
use crate::query::datalog::types::WhereClause;
let expr = Expr::BinOp(
BinOp::Lt,
Box::new(Expr::Var("?v".to_string())),
Box::new(Expr::Lit(Value::Integer(100))),
);
let clauses = vec![WhereClause::Expr {
expr,
binding: None,
}];
let bindings = vec![b(&[("?v", Value::String("hello".to_string()))])];
let result =
apply_expr_clauses(bindings, &clauses, &FunctionRegistry::with_builtins()).unwrap();
assert_eq!(result.len(), 0);
}
#[test]
fn test_execute_expr_filter_lt() {
use crate::graph::storage::FactStorage;
use crate::query::datalog::rules::RuleRegistry;
use std::sync::{Arc, RwLock};
let storage = FactStorage::new();
let rules = Arc::new(RwLock::new(RuleRegistry::new()));
let executor = DatalogExecutor::new_with_rules(storage.clone(), rules);
executor
.execute(
crate::query::datalog::parser::parse_datalog_command(
"(transact [[:item1 :item/price 50] [:item2 :item/price 150]])",
)
.unwrap(),
)
.unwrap();
let result = executor.execute(
crate::query::datalog::parser::parse_datalog_command(
"(query [:find ?e :where [?e :item/price ?p] [(< ?p 100)]])",
)
.unwrap(),
);
assert!(result.is_ok(), "expr filter query failed");
match result.unwrap() {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "expected exactly one result");
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_apply_or_clauses_union_from_two_branches() {
use uuid::Uuid;
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
let e2 = Uuid::new_v4();
storage
.transact(
vec![
(e1, ":color".to_string(), Value::Keyword(":red".to_string())),
(
e2,
":color".to_string(),
Value::Keyword(":blue".to_string()),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage.clone());
let cmd = crate::query::datalog::parser::parse_datalog_command(
r#"(query [:find ?e
:where (or [?e :color :red] [?e :color :blue])])"#,
)
.unwrap();
let result = executor.execute(cmd).unwrap();
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 2, "both entities should match via or");
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_apply_or_clauses_deduplication() {
use uuid::Uuid;
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
storage
.transact(
vec![
(e1, ":color".to_string(), Value::Keyword(":red".to_string())),
(
e1,
":shape".to_string(),
Value::Keyword(":circle".to_string()),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage.clone());
let cmd = crate::query::datalog::parser::parse_datalog_command(
r#"(query [:find ?e
:where (or [?e :color :red] [?e :shape :circle])])"#,
)
.unwrap();
let result = executor.execute(cmd).unwrap();
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(
results.len(),
1,
"one entity matched by both branches → deduplicated"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn execute_transact_non_keyword_attribute_error() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = DatalogCommand::Transact(Transaction {
facts: vec![Pattern::new(
EdnValue::Keyword(":e".to_string()),
EdnValue::String("not-a-keyword".to_string()),
EdnValue::String("value".to_string()),
)],
valid_from: None,
valid_to: None,
});
let r = executor.execute(cmd);
assert!(r.is_err(), "non-keyword attribute in transact must fail");
}
#[test]
fn execute_retract_non_keyword_attribute_error() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = DatalogCommand::Retract(Transaction {
facts: vec![Pattern::new(
EdnValue::Keyword(":e".to_string()),
EdnValue::Integer(42),
EdnValue::String("value".to_string()),
)],
valid_from: None,
valid_to: None,
});
let r = executor.execute(cmd);
assert!(r.is_err(), "non-keyword attribute in retract must fail");
}
#[test]
fn execute_transact_pseudo_attr_error() {
use crate::query::datalog::types::{PseudoAttr, Transaction};
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = DatalogCommand::Transact(Transaction {
facts: vec![Pattern::pseudo(
EdnValue::Keyword(":e".to_string()),
PseudoAttr::ValidFrom,
EdnValue::Integer(0),
)],
valid_from: None,
valid_to: None,
});
let r = executor.execute(cmd);
assert!(r.is_err(), "transacting a pseudo-attribute must fail");
}
#[test]
fn execute_retract_pseudo_attr_error() {
use crate::query::datalog::types::{PseudoAttr, Transaction};
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = DatalogCommand::Retract(Transaction {
facts: vec![Pattern::pseudo(
EdnValue::Keyword(":e".to_string()),
PseudoAttr::TxCount,
EdnValue::Integer(0),
)],
valid_from: None,
valid_to: None,
});
let r = executor.execute(cmd);
assert!(r.is_err(), "retracting a pseudo-attribute must fail");
}
#[test]
fn execute_rule_empty_head_error() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = DatalogCommand::Rule(Rule {
head: vec![],
body: vec![WhereClause::Pattern(Pattern::new(
EdnValue::Symbol("?x".to_string()),
EdnValue::Keyword(":a".to_string()),
EdnValue::Symbol("?v".to_string()),
))],
});
let r = executor.execute(cmd);
assert!(r.is_err(), "rule with empty head must fail");
}
#[test]
fn execute_rule_non_symbol_head_error() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = DatalogCommand::Rule(Rule {
head: vec![EdnValue::Integer(99)], body: vec![WhereClause::Pattern(Pattern::new(
EdnValue::Symbol("?x".to_string()),
EdnValue::Keyword(":a".to_string()),
EdnValue::Symbol("?v".to_string()),
))],
});
let r = executor.execute(cmd);
assert!(r.is_err(), "rule head starting with non-symbol must fail");
}
#[test]
fn test_eval_float_div_by_zero_is_err() {
let e = Expr::BinOp(
BinOp::Div,
Box::new(Expr::Lit(Value::Float(5.0))),
Box::new(Expr::Lit(Value::Float(0.0))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Err(()));
}
#[test]
fn test_eval_float_div_succeeds() {
let e = Expr::BinOp(
BinOp::Div,
Box::new(Expr::Lit(Value::Float(6.0))),
Box::new(Expr::Lit(Value::Float(2.0))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Ok(Value::Float(3.0)));
}
#[test]
fn test_eval_float_sub() {
let e = Expr::BinOp(
BinOp::Sub,
Box::new(Expr::Lit(Value::Float(5.0))),
Box::new(Expr::Lit(Value::Float(2.0))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Ok(Value::Float(3.0)));
}
#[test]
fn test_eval_float_mul() {
let e = Expr::BinOp(
BinOp::Mul,
Box::new(Expr::Lit(Value::Float(3.0))),
Box::new(Expr::Lit(Value::Float(4.0))),
);
assert_eq!(eval_expr(&e, &HashMap::new(), None), Ok(Value::Float(12.0)));
}
#[test]
fn test_agg_count_empty_bindings_returns_zero() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command("(query [:find (count ?x) :where [?x :no-such-attr _]])")
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "should return one row with count 0");
assert_eq!(results[0][0], crate::graph::types::Value::Integer(0));
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_agg_sum_empty_no_grouping_returns_zero() {
let storage = FactStorage::new();
storage
.transact(
vec![(
Uuid::new_v4(),
":item/price".to_string(),
crate::graph::types::Value::Integer(50),
)],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command("(query [:find (sum ?v) :where [?x :no-such-attr ?v]])")
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 0, "empty bindings with sum returns no rows");
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_agg_sum_distinct_float_values() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
let e2 = Uuid::new_v4();
storage
.transact(
vec![
(
e1,
":item/weight".to_string(),
crate::graph::types::Value::Float(1.5),
),
(
e2,
":item/weight".to_string(),
crate::graph::types::Value::Float(1.5),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd =
parse_datalog_command("(query [:find (sum-distinct ?w) :where [?e :item/weight ?w]])")
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "expected one result row");
assert_eq!(
results[0][0],
crate::graph::types::Value::Float(1.5),
"sum-distinct of [1.5, 1.5] should be 1.5"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_agg_min_max_on_all_null_group_skips_row() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
storage
.transact(
vec![(
e1,
":item/score".to_string(),
crate::graph::types::Value::Null,
)],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command("(query [:find (min ?s) :where [?e :item/score ?s]])")
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(
results.len(),
0,
"min on all-null group should produce 0 rows"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_agg_min_on_strings() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
let e2 = Uuid::new_v4();
storage
.transact(
vec![
(
e1,
":item/name".to_string(),
crate::graph::types::Value::String("banana".to_string()),
),
(
e2,
":item/name".to_string(),
crate::graph::types::Value::String("apple".to_string()),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command("(query [:find (min ?n) :where [?e :item/name ?n]])")
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "expected one result");
assert_eq!(
results[0][0],
crate::graph::types::Value::String("apple".to_string()),
"min of strings should return lexicographically smallest"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_agg_max_on_floats() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
let e2 = Uuid::new_v4();
storage
.transact(
vec![
(
e1,
":item/score".to_string(),
crate::graph::types::Value::Float(3.5),
),
(
e2,
":item/score".to_string(),
crate::graph::types::Value::Float(2.5),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command("(query [:find (max ?s) :where [?e :item/score ?s]])")
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "expected one result");
assert_eq!(
results[0][0],
crate::graph::types::Value::Float(3.5),
"max of floats should return largest"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_evaluate_branch_with_timestamp_valid_at() {
use crate::query::datalog::types::ValidAt;
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
storage
.transact(
vec![(
e1,
":tag".to_string(),
crate::graph::types::Value::Integer(1),
)],
None,
)
.unwrap();
let facts: Arc<[crate::graph::types::Fact]> =
Arc::from(storage.get_asserted_facts().unwrap().as_slice());
let rules = crate::query::datalog::rules::RuleRegistry::new();
let branch = vec![WhereClause::Pattern(Pattern::new(
EdnValue::Symbol("?e".to_string()),
EdnValue::Keyword(":tag".to_string()),
EdnValue::Symbol("?v".to_string()),
))];
let mut initial = std::collections::HashMap::new();
initial.insert("?seed".to_string(), crate::graph::types::Value::Integer(0));
let ts_result = evaluate_branch(
&branch,
vec![initial.clone()],
facts.clone(),
&rules,
None,
Some(ValidAt::Timestamp(crate::graph::types::tx_id_now() as i64)),
&FunctionRegistry::with_builtins(),
)
.unwrap();
assert_eq!(ts_result.len(), 1, "timestamp valid_at should match");
let any_result = evaluate_branch(
&branch,
vec![initial],
facts,
&rules,
None,
Some(ValidAt::AnyValidTime),
&FunctionRegistry::with_builtins(),
)
.unwrap();
assert_eq!(any_result.len(), 1, "any_valid_time should match");
}
#[test]
fn test_execute_query_with_rules_valid_at_timestamp() {
use crate::query::datalog::parser::parse_datalog_command;
use crate::query::datalog::types::ValidAt;
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let rule_cmd = parse_datalog_command(r#"(rule [(tagged ?e) [?e :item/tag ?v]])"#)
.expect("rule parse failed");
executor.execute(rule_cmd).expect("rule register failed");
executor
.execute(
parse_datalog_command(r#"(transact [[:item1 :item/tag "x"]])"#)
.expect("transact parse failed"),
)
.expect("transact failed");
let q_ts = crate::query::datalog::types::DatalogQuery {
find: vec![crate::query::datalog::types::FindSpec::Variable(
"?e".to_string(),
)],
where_clauses: vec![],
as_of: None,
valid_at: Some(ValidAt::Timestamp(946684800000)), with_vars: vec![],
};
let r_ts = executor.execute_query_with_rules(q_ts);
assert!(
r_ts.is_ok(),
"execute_query_with_rules with Timestamp must not error"
);
let q_any = crate::query::datalog::types::DatalogQuery {
find: vec![crate::query::datalog::types::FindSpec::Variable(
"?e".to_string(),
)],
where_clauses: vec![],
as_of: None,
valid_at: Some(ValidAt::AnyValidTime),
with_vars: vec![],
};
let r_any = executor.execute_query_with_rules(q_any);
assert!(
r_any.is_ok(),
"execute_query_with_rules with AnyValidTime must not error"
);
let err_cmd = parse_datalog_command(
"(query [:find ?e ?vf :where (tagged ?e) [?e :db/valid-from ?vf]])",
)
.expect("err query parse failed");
let err_result = executor.execute(err_cmd);
assert!(
err_result.is_err(),
"per-fact pseudo-attr without :any-valid-time in rules query must fail"
);
}
#[test]
fn test_evaluate_branch_empty_incoming_returns_empty() {
let storage = FactStorage::new();
storage
.transact(
vec![(
Uuid::new_v4(),
":a".to_string(),
crate::graph::types::Value::Integer(1),
)],
None,
)
.unwrap();
let facts: Arc<[crate::graph::types::Fact]> =
Arc::from(storage.get_asserted_facts().unwrap().as_slice());
let rules = crate::query::datalog::rules::RuleRegistry::new();
let branch = vec![WhereClause::Pattern(Pattern::new(
EdnValue::Symbol("?x".to_string()),
EdnValue::Keyword(":a".to_string()),
EdnValue::Symbol("?v".to_string()),
))];
let result = evaluate_branch(
&branch,
vec![],
facts,
&rules,
None,
None,
&FunctionRegistry::with_builtins(),
)
.unwrap();
assert_eq!(result.len(), 0, "empty incoming should return empty");
}
#[test]
fn test_evaluate_branch_no_match_patterns_empty_bindings_returns_empty() {
let storage = FactStorage::new();
let facts: Arc<[crate::graph::types::Fact]> =
Arc::from(storage.get_asserted_facts().unwrap().as_slice());
let rules = crate::query::datalog::rules::RuleRegistry::new();
let branch = vec![WhereClause::Pattern(Pattern::new(
EdnValue::Symbol("?x".to_string()),
EdnValue::Keyword(":no-such-attr".to_string()),
EdnValue::Symbol("?v".to_string()),
))];
let mut initial = std::collections::HashMap::new();
initial.insert("?init".to_string(), crate::graph::types::Value::Integer(1));
let result = evaluate_branch(
&branch,
vec![initial],
facts,
&rules,
None,
None,
&FunctionRegistry::with_builtins(),
)
.unwrap();
assert_eq!(
result.len(),
0,
"no matching facts should return empty bindings"
);
}
#[test]
fn test_evaluate_branch_not_filter_excludes_matching() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
let e2 = Uuid::new_v4();
storage
.transact(
vec![
(
e1,
":color".to_string(),
crate::graph::types::Value::Keyword(":red".to_string()),
),
(
e2,
":color".to_string(),
crate::graph::types::Value::Keyword(":blue".to_string()),
),
(
e1,
":flagged".to_string(),
crate::graph::types::Value::Boolean(true),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
"(query [:find ?e :where [?e :color ?c] (not-join [?e] [?e :flagged ?fv])])",
)
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(results.len(), 1, "only e2 (non-flagged) should match");
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_or_join_deduplication() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
storage
.transact(
vec![
(
e1,
":color".to_string(),
crate::graph::types::Value::Keyword(":red".to_string()),
),
(
e1,
":shape".to_string(),
crate::graph::types::Value::Keyword(":circle".to_string()),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
"(query [:find ?e :where [?e :color ?c] (or-join [?e] [?e :color ?c2] [?e :shape ?s])])",
)
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(
results.len(),
1,
"e1 should appear once despite two matching branches"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_transact_with_tx_level_valid_time() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
r#"(transact {:valid-from "2020-01-01T00:00:00Z" :valid-to "2025-01-01T00:00:00Z"} [[:alice :person/name "Alice"]])"#,
)
.expect("parse with tx-level valid-time should succeed");
let result = executor.execute(cmd);
assert!(
result.is_ok(),
"transact with tx-level valid-time should succeed"
);
}
#[test]
fn test_transact_with_per_fact_valid_time() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
r#"(transact [[:alice :person/name "Alice" {:valid-from "2020-01-01T00:00:00Z"}]])"#,
)
.expect("parse with per-fact valid-time should succeed");
let result = executor.execute(cmd);
assert!(
result.is_ok(),
"transact with per-fact valid-time should succeed"
);
}
#[test]
fn test_transact_with_valid_to_only_at_tx_level() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
r#"(transact {:valid-to "2025-01-01T00:00:00Z"} [[:alice :person/name "Alice"]])"#,
)
.expect("parse with tx-level valid-to only should succeed");
let result = executor.execute(cmd);
assert!(
result.is_ok(),
"transact with valid-to only at tx level should succeed"
);
}
#[test]
fn test_transact_with_valid_to_only_per_fact() {
let storage = FactStorage::new();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
r#"(transact [[:alice :person/name "Alice" {:valid-to "2025-01-01T00:00:00Z"}]])"#,
)
.expect("parse with per-fact valid-to only should succeed");
let result = executor.execute(cmd);
assert!(
result.is_ok(),
"transact with valid-to only per fact should succeed"
);
}
#[test]
fn test_evaluate_branch_empty_patterns_passes_incoming_through() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
storage
.transact(
vec![(
e1,
":a".to_string(),
crate::graph::types::Value::Integer(10),
)],
None,
)
.unwrap();
let facts: Arc<[crate::graph::types::Fact]> =
Arc::from(storage.get_asserted_facts().unwrap().as_slice());
let rules = crate::query::datalog::rules::RuleRegistry::new();
let branch = vec![WhereClause::Expr {
expr: crate::query::datalog::types::Expr::Lit(crate::graph::types::Value::Boolean(
true,
)),
binding: None,
}];
let mut initial = std::collections::HashMap::new();
initial.insert("?x".to_string(), crate::graph::types::Value::Integer(42));
let result = evaluate_branch(
&branch,
vec![initial],
facts,
&rules,
None,
None,
&FunctionRegistry::with_builtins(),
)
.unwrap();
assert_eq!(
result.len(),
1,
"expr-only branch should pass binding through"
);
}
#[test]
fn test_evaluate_branch_or_clause_produces_empty_bindings() {
let storage = FactStorage::new();
let facts: Arc<[crate::graph::types::Fact]> =
Arc::from(storage.get_asserted_facts().unwrap().as_slice());
let rules = crate::query::datalog::rules::RuleRegistry::new();
let or_branch = vec![WhereClause::Pattern(Pattern::new(
EdnValue::Symbol("?x".to_string()),
EdnValue::Keyword(":no-attr".to_string()),
EdnValue::Symbol("?v".to_string()),
))];
let branch = vec![WhereClause::Or(vec![or_branch])];
let mut initial = std::collections::HashMap::new();
initial.insert("?seed".to_string(), crate::graph::types::Value::Integer(1));
let result = evaluate_branch(
&branch,
vec![initial],
facts,
&rules,
None,
None,
&FunctionRegistry::with_builtins(),
)
.unwrap();
assert_eq!(
result.len(),
0,
"or clause with no matches should yield empty"
);
}
#[test]
fn test_evaluate_branch_not_join_excludes_matching() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
let e2 = Uuid::new_v4();
storage
.transact(
vec![
(
e1,
":status".to_string(),
crate::graph::types::Value::Keyword(":active".to_string()),
),
(
e2,
":status".to_string(),
crate::graph::types::Value::Keyword(":inactive".to_string()),
),
(
e2,
":blocked".to_string(),
crate::graph::types::Value::Boolean(true),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
"(query [:find ?e :where [?e :status ?s] (not-join [?e] [?e :blocked ?b])])",
)
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(
results.len(),
1,
"only non-blocked entity should be returned"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn test_not_body_expr_only_filters_binding() {
let storage = FactStorage::new();
let e1 = Uuid::new_v4();
let e2 = Uuid::new_v4();
storage
.transact(
vec![
(
e1,
":item/price".to_string(),
crate::graph::types::Value::Integer(200),
),
(
e2,
":item/price".to_string(),
crate::graph::types::Value::Integer(50),
),
],
None,
)
.unwrap();
let executor = DatalogExecutor::new(storage);
let cmd = parse_datalog_command(
"(query [:find ?e :where [?e :item/price ?p] (not [(> ?p 100)])])",
)
.expect("parse failed");
let result = executor.execute(cmd).expect("query failed");
match result {
QueryResult::QueryResults { results, .. } => {
assert_eq!(
results.len(),
1,
"only the item with price 50 should survive"
);
}
_ => panic!("expected QueryResults"),
}
}
#[test]
fn window_sum_resets_per_partition() {
use super::super::functions::FunctionRegistry;
use super::super::types::{Order, WindowFunc, WindowSpec};
let mut bindings: Vec<std::collections::HashMap<String, Value>> = vec![
[
("dept".into(), Value::String("A".into())),
("salary".into(), Value::Integer(10)),
]
.into_iter()
.collect(),
[
("dept".into(), Value::String("A".into())),
("salary".into(), Value::Integer(20)),
]
.into_iter()
.collect(),
[
("dept".into(), Value::String("B".into())),
("salary".into(), Value::Integer(100)),
]
.into_iter()
.collect(),
];
let find_specs = vec![
FindSpec::Variable("dept".into()),
FindSpec::Variable("salary".into()),
FindSpec::Window(WindowSpec {
func: WindowFunc::Sum,
var: Some("salary".into()),
partition_by: Some("dept".into()),
order_by: "salary".into(),
order: Order::Asc,
}),
];
let registry = FunctionRegistry::with_builtins();
apply_window_functions(&mut bindings, &find_specs, ®istry).expect("window");
let row_a10 = bindings
.iter()
.find(|b| b.get("salary") == Some(&Value::Integer(10)))
.unwrap();
assert_eq!(row_a10.get("__win_2"), Some(&Value::Integer(10)));
let row_a20 = bindings
.iter()
.find(|b| b.get("salary") == Some(&Value::Integer(20)))
.unwrap();
assert_eq!(row_a20.get("__win_2"), Some(&Value::Integer(30)));
let row_b100 = bindings
.iter()
.find(|b| b.get("salary") == Some(&Value::Integer(100)))
.unwrap();
assert_eq!(row_b100.get("__win_2"), Some(&Value::Integer(100)));
}
}