1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team
//! Goal-directed QUERY evaluation via SLG resolution.
//!
//! Ported from `uni-locy/src/orchestrator/query.rs`. Uses `DerivedFactSource`
//! instead of `CypherExecutor`.
use std::collections::HashMap;
use std::time::Instant;
use uni_common::Value;
use uni_cypher::ast::{CypherLiteral, Expr, ReturnItem};
use uni_cypher::locy_ast::GoalQuery;
use uni_locy::{CompiledProgram, FactRow, LocyConfig, LocyError, LocyStats};
use super::locy_delta::RowStore;
use super::locy_eval::{eval_expr, value_cmp};
use super::locy_slg::{SLGResolver, extract_goal_bindings};
use super::locy_traits::DerivedFactSource;
/// Entry point for goal-directed QUERY evaluation.
///
/// Uses SLG resolution for all rules (recursive and non-recursive).
/// SLG is goal-directed: it only computes facts relevant to the WHERE constraints.
pub async fn evaluate_query(
query: &GoalQuery,
program: &CompiledProgram,
fact_source: &dyn DerivedFactSource,
config: &LocyConfig,
derived_store: &mut RowStore,
stats: &mut LocyStats,
start: Instant,
) -> Result<Vec<FactRow>, LocyError> {
let rule_name = query.rule_name.to_string();
let rule =
program
.rule_catalog
.get(&rule_name)
.ok_or_else(|| LocyError::QueryResolutionError {
message: format!("rule '{}' not found", rule_name),
})?;
let key_columns: Vec<String> = rule
.yield_schema
.iter()
.filter(|c| c.is_key)
.map(|c| c.name.clone())
.collect();
// Extract goal bindings from WHERE for goal-directed resolution
let goal_bindings = match &query.where_expr {
Some(expr) => extract_goal_bindings(expr, &key_columns),
None => std::collections::HashMap::new(),
};
// For FOLD rules (MNOR/MPROD/SUM), the SLG resolver does not apply
// post-fixpoint aggregation and would return raw pre-FOLD match rows.
// Use pre-computed facts from derived_store (which ran the full native
// fixpoint including FOLD aggregation and VID→Node enrichment).
let is_fold_rule = rule.clauses.iter().any(|c| !c.fold.is_empty());
if is_fold_rule && derived_store.contains_key(&rule_name) {
let rows = derived_store[&rule_name].rows.clone();
return apply_return_clause(rows, &query.return_clause, &config.params);
}
// Use a fresh store rather than the pre-computed orch_store.
// The native fixpoint stores node columns as VIDs (UInt64), not full node objects,
// so orch_store rows would fail property-based WHERE/RETURN evaluation (a.name etc.).
// SLG re-evaluation executes actual Cypher queries which return full node objects.
//
// However, FOLD rules (MNOR/MPROD/SUM) require fixpoint aggregation that the SLG
// resolver cannot perform. Seed the fresh store with pre-computed FOLD rule data
// so that downstream rules using IS NOT on FOLD rules can find their derived facts.
let mut fresh_store = RowStore::new();
for (name, relation) in derived_store.iter() {
if let Some(r) = program.rule_catalog.get(name)
&& r.clauses.iter().any(|c| !c.fold.is_empty())
{
fresh_store.insert(name.clone(), relation.clone());
}
}
let mut resolver = SLGResolver::new(program, fact_source, config, &mut fresh_store, start);
let results = resolver.resolve_goal(&rule_name, &goal_bindings).await?;
// Merge SLG stats
stats.queries_executed += resolver.stats.queries_executed;
stats.mutations_executed += resolver.stats.mutations_executed;
// Apply WHERE filter (SLG may return superset if goal bindings are partial).
// Params are injected into each row so $name references resolve correctly.
let filtered: Vec<FactRow> = if let Some(where_expr) = &query.where_expr {
results
.into_iter()
.filter(|row| {
let merged = merge_params(row, &config.params);
eval_expr(where_expr, &merged)
.map(|v| v.as_bool().unwrap_or(false))
.unwrap_or(false)
})
.collect()
} else {
results
};
// Apply RETURN clause if present
apply_return_clause(filtered, &query.return_clause, &config.params)
}
/// Apply a RETURN clause (projection, ordering, skip, limit) to results.
pub(super) fn apply_return_clause(
rows: Vec<FactRow>,
return_clause: &Option<uni_cypher::ast::ReturnClause>,
params: &HashMap<String, Value>,
) -> Result<Vec<FactRow>, LocyError> {
let rc = match return_clause {
Some(rc) => rc,
None => return Ok(rows),
};
// Project columns. Params are merged into each row so $name references
// in RETURN expressions (e.g. RETURN $agent_id AS id) resolve correctly.
let mut projected: Vec<FactRow> = rows
.into_iter()
.map(|row| {
let merged = merge_params(&row, params);
let mut new_row = FactRow::new();
for item in &rc.items {
match item {
ReturnItem::All => return Ok(row.clone()),
ReturnItem::Expr { expr, alias, .. } => {
let value = eval_expr(expr, &merged)?;
let name = alias.clone().unwrap_or_else(|| return_item_name(expr));
new_row.insert(name, value);
}
}
}
Ok(new_row)
})
.collect::<Result<Vec<_>, LocyError>>()?;
// Distinct
if rc.distinct {
let mut seen = std::collections::HashSet::new();
projected.retain(|row| {
let key = format!("{row:?}");
seen.insert(key)
});
}
// Order by
if let Some(sort_items) = &rc.order_by {
projected.sort_by(|a, b| {
for item in sort_items {
let va = eval_expr(&item.expr, a).unwrap_or(uni_common::Value::Null);
let vb = eval_expr(&item.expr, b).unwrap_or(uni_common::Value::Null);
let cmp = if item.ascending {
value_cmp(&va, &vb)
} else {
value_cmp(&vb, &va)
};
if cmp != std::cmp::Ordering::Equal {
return cmp;
}
}
std::cmp::Ordering::Equal
});
}
// Skip
if let Some(Expr::Literal(CypherLiteral::Integer(n))) = &rc.skip {
let n = *n as usize;
if n < projected.len() {
projected = projected[n..].to_vec();
} else {
projected.clear();
}
}
// Limit
if let Some(Expr::Literal(CypherLiteral::Integer(n))) = &rc.limit {
projected.truncate(*n as usize);
}
Ok(projected)
}
/// Build an evaluation scope for `row` in which `$name` references
/// (`Expr::Parameter(name)`) resolve during in-memory expression evaluation.
///
/// The result contains every entry of `params` and of `row`. When a key is
/// present in both, the row's value wins, so parameters only fill gaps.
pub(super) fn merge_params(row: &FactRow, params: &HashMap<String, Value>) -> FactRow {
    // Collecting into a map keeps the last value seen for each key. Chaining
    // the row after the params therefore lets row columns shadow parameters.
    params
        .iter()
        .chain(row.iter())
        .map(|(name, value)| (name.clone(), value.clone()))
        .collect()
}
/// Column name for an unaliased RETURN expression.
///
/// Mirrors OpenCypher naming:
/// - a bare variable keeps its name (`RETURN p` → `"p"`);
/// - a property access joins its parts with a dot (`RETURN a.name` →
///   `"a.name"`, applied recursively to the base);
/// - anything else falls back to the expression's `Debug` form.
fn return_item_name(expr: &Expr) -> String {
    if let Expr::Variable(name) = expr {
        name.clone()
    } else if let Expr::Property(owner, field) = expr {
        format!("{}.{field}", return_item_name(owner))
    } else {
        format!("{expr:?}")
    }
}