Skip to main content

uni_query/query/df_graph/
locy_derive.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! DERIVE command execution via `LocyExecutionContext`.
5//!
6//! Extracted from `uni-locy/src/orchestrator/mod.rs::derive_command`.
7//! Uses `LocyExecutionContext` for fact lookup and mutation execution.
8//!
9//! Supports two modes:
10//! - **execute mode** (`derive_command`): immediately applies mutations via `ctx.execute_mutation()`
11//! - **collect mode** (`collect_derive_facts`): collects Cypher ASTs + vertex/edge data for deferred application
12
13use std::collections::HashMap;
14
15use uni_common::Properties;
16use uni_cypher::ast::Query;
17use uni_cypher::locy_ast::{DeriveClause, DeriveCommand, DerivePattern, RuleOutput};
18use uni_locy::result::DerivedEdge;
19use uni_locy::{CompiledProgram, FactRow, LocyError, LocyStats};
20
21use super::locy_ast_builder::build_derive_create;
22use super::locy_eval::eval_expr;
23use super::locy_traits::LocyExecutionContext;
24
25/// Output of `collect_derive_facts()` — collected but not yet executed.
26pub struct CollectedDeriveOutput {
27    pub queries: Vec<Query>,
28    pub vertices: HashMap<String, Vec<Properties>>,
29    pub edges: Vec<DerivedEdge>,
30    pub affected: usize,
31}
32
33/// Execute a top-level DERIVE command (auto-apply mode).
34///
35/// Looks up facts from the native store via `ctx.lookup_derived()`, applies optional
36/// WHERE filtering, and for each matching fact executes the DERIVE mutation via
37/// `ctx.execute_mutation()`.
38pub async fn derive_command(
39    dc: &DeriveCommand,
40    program: &CompiledProgram,
41    ctx: &dyn LocyExecutionContext,
42    stats: &mut LocyStats,
43) -> Result<usize, LocyError> {
44    let collected = collect_derive_facts_inner(dc, program, ctx).await?;
45    for query in collected.queries {
46        ctx.execute_mutation(query, HashMap::new()).await?;
47        stats.mutations_executed += 1;
48    }
49    Ok(collected.affected)
50}
51
52/// Collect derived facts without executing mutations (collect mode).
53///
54/// Returns the Cypher ASTs, vertex data, and edge data for deferred
55/// application via `tx.apply()`.
56pub async fn collect_derive_facts(
57    dc: &DeriveCommand,
58    program: &CompiledProgram,
59    ctx: &dyn LocyExecutionContext,
60) -> Result<CollectedDeriveOutput, LocyError> {
61    collect_derive_facts_inner(dc, program, ctx).await
62}
63
64/// Shared implementation for both execute and collect modes.
65async fn collect_derive_facts_inner(
66    dc: &DeriveCommand,
67    program: &CompiledProgram,
68    ctx: &dyn LocyExecutionContext,
69) -> Result<CollectedDeriveOutput, LocyError> {
70    let rule_name = dc.rule_name.to_string();
71    let rule = program
72        .rule_catalog
73        .get(&rule_name)
74        .ok_or_else(|| LocyError::EvaluationError {
75            message: format!("rule '{}' not found for DERIVE command", rule_name),
76        })?;
77
78    let facts = ctx.lookup_derived_enriched(&rule_name).await?;
79
80    // Apply optional WHERE filter
81    let filtered: Vec<_> = if let Some(where_expr) = &dc.where_expr {
82        facts
83            .into_iter()
84            .filter(|row| {
85                eval_expr(where_expr, row)
86                    .map(|v| v.as_bool().unwrap_or(false))
87                    .unwrap_or(false)
88            })
89            .collect()
90    } else {
91        facts
92    };
93
94    let mut all_queries = Vec::new();
95    let mut all_vertices: HashMap<String, Vec<Properties>> = HashMap::new();
96    let mut all_edges = Vec::new();
97    let mut affected = 0;
98
99    for clause in &rule.clauses {
100        if let RuleOutput::Derive(derive_clause) = &clause.output {
101            for row in &filtered {
102                let queries = build_derive_create(derive_clause, row)?;
103                affected += queries.len();
104
105                // Extract vertex/edge data for inspection
106                extract_vertex_edge_data(derive_clause, row, &mut all_vertices, &mut all_edges);
107
108                all_queries.extend(queries);
109            }
110        }
111    }
112
113    Ok(CollectedDeriveOutput {
114        queries: all_queries,
115        vertices: all_vertices,
116        edges: all_edges,
117        affected,
118    })
119}
120
121/// Extract vertex and edge inspection data from a DeriveClause + bindings row.
122fn extract_vertex_edge_data(
123    derive_clause: &DeriveClause,
124    row: &FactRow,
125    vertices: &mut HashMap<String, Vec<Properties>>,
126    edges: &mut Vec<DerivedEdge>,
127) {
128    match derive_clause {
129        DeriveClause::Patterns(patterns) => {
130            for pattern in patterns {
131                extract_from_pattern(pattern, row, vertices, edges);
132            }
133        }
134        DeriveClause::Merge(a, b) => {
135            // MERGE produces an edge between two existing nodes, no new vertices
136            let source_props = node_properties_from_binding(a, row);
137            let target_props = node_properties_from_binding(b, row);
138            edges.push(DerivedEdge {
139                edge_type: "MERGED_WITH".to_string(),
140                source_label: node_label_from_binding(a, row),
141                source_properties: source_props,
142                target_label: node_label_from_binding(b, row),
143                target_properties: target_props,
144                edge_properties: Properties::new(),
145            });
146        }
147    }
148}
149
150/// Extract vertex/edge data from a single DerivePattern.
151fn extract_from_pattern(
152    pattern: &DerivePattern,
153    row: &FactRow,
154    vertices: &mut HashMap<String, Vec<Properties>>,
155    edges: &mut Vec<DerivedEdge>,
156) {
157    let source = &pattern.source;
158    let target = &pattern.target;
159    let edge = &pattern.edge;
160
161    let source_label = source
162        .labels
163        .first()
164        .cloned()
165        .unwrap_or_else(|| node_label_from_binding(&source.variable, row));
166    let target_label = target
167        .labels
168        .first()
169        .cloned()
170        .unwrap_or_else(|| node_label_from_binding(&target.variable, row));
171
172    let source_props = node_properties_from_binding(&source.variable, row);
173    let target_props = node_properties_from_binding(&target.variable, row);
174
175    if source.is_new {
176        vertices
177            .entry(source_label.clone())
178            .or_default()
179            .push(source_props.clone());
180    }
181    if target.is_new {
182        vertices
183            .entry(target_label.clone())
184            .or_default()
185            .push(target_props.clone());
186    }
187
188    let edge_props = edge
189        .properties
190        .as_ref()
191        .and_then(|expr| eval_map_expr(expr, row))
192        .unwrap_or_default();
193
194    edges.push(DerivedEdge {
195        edge_type: edge.edge_type.clone(),
196        source_label,
197        source_properties: source_props,
198        target_label,
199        target_properties: target_props,
200        edge_properties: edge_props,
201    });
202}
203
204/// Extract properties from a binding row for a node variable.
205fn node_properties_from_binding(var: &str, row: &FactRow) -> Properties {
206    use uni_common::Value;
207    match row.get(var) {
208        Some(Value::Node(node)) => node.properties.clone(),
209        Some(Value::Map(map)) => map.clone(),
210        _ => Properties::new(),
211    }
212}
213
214/// Extract the label from a binding row for a node variable.
215fn node_label_from_binding(var: &str, row: &FactRow) -> String {
216    use uni_common::Value;
217    match row.get(var) {
218        Some(Value::Node(node)) => node.labels.first().cloned().unwrap_or_default(),
219        _ => String::new(),
220    }
221}
222
223/// Try to evaluate a map expression to Properties.
224fn eval_map_expr(expr: &uni_cypher::ast::Expr, row: &FactRow) -> Option<Properties> {
225    use uni_common::Value;
226    match eval_expr(expr, row) {
227        Ok(Value::Map(m)) => Some(m),
228        _ => None,
229    }
230}