uni_query/query/df_graph/
locy_derive.rs1use std::collections::HashMap;
14
15use uni_common::Properties;
16use uni_cypher::ast::Query;
17use uni_cypher::locy_ast::{DeriveClause, DeriveCommand, DerivePattern, RuleOutput};
18use uni_locy::result::DerivedEdge;
19use uni_locy::{CompiledProgram, FactRow, LocyError, LocyStats};
20
21use super::locy_ast_builder::build_derive_create;
22use super::locy_eval::eval_expr;
23use super::locy_traits::LocyExecutionContext;
24
25pub struct CollectedDeriveOutput {
27 pub queries: Vec<Query>,
28 pub vertices: HashMap<String, Vec<Properties>>,
29 pub edges: Vec<DerivedEdge>,
30 pub affected: usize,
31}
32
33pub async fn derive_command(
39 dc: &DeriveCommand,
40 program: &CompiledProgram,
41 ctx: &dyn LocyExecutionContext,
42 stats: &mut LocyStats,
43) -> Result<usize, LocyError> {
44 let collected = collect_derive_facts_inner(dc, program, ctx).await?;
45 for query in collected.queries {
46 ctx.execute_mutation(query, HashMap::new()).await?;
47 stats.mutations_executed += 1;
48 }
49 Ok(collected.affected)
50}
51
52pub async fn collect_derive_facts(
57 dc: &DeriveCommand,
58 program: &CompiledProgram,
59 ctx: &dyn LocyExecutionContext,
60) -> Result<CollectedDeriveOutput, LocyError> {
61 collect_derive_facts_inner(dc, program, ctx).await
62}
63
64async fn collect_derive_facts_inner(
66 dc: &DeriveCommand,
67 program: &CompiledProgram,
68 ctx: &dyn LocyExecutionContext,
69) -> Result<CollectedDeriveOutput, LocyError> {
70 let rule_name = dc.rule_name.to_string();
71 let rule = program
72 .rule_catalog
73 .get(&rule_name)
74 .ok_or_else(|| LocyError::EvaluationError {
75 message: format!("rule '{}' not found for DERIVE command", rule_name),
76 })?;
77
78 let facts = ctx.lookup_derived_enriched(&rule_name).await?;
79
80 let filtered: Vec<_> = if let Some(where_expr) = &dc.where_expr {
82 facts
83 .into_iter()
84 .filter(|row| {
85 eval_expr(where_expr, row)
86 .map(|v| v.as_bool().unwrap_or(false))
87 .unwrap_or(false)
88 })
89 .collect()
90 } else {
91 facts
92 };
93
94 let mut all_queries = Vec::new();
95 let mut all_vertices: HashMap<String, Vec<Properties>> = HashMap::new();
96 let mut all_edges = Vec::new();
97 let mut affected = 0;
98
99 for clause in &rule.clauses {
100 if let RuleOutput::Derive(derive_clause) = &clause.output {
101 for row in &filtered {
102 let queries = build_derive_create(derive_clause, row)?;
103 affected += queries.len();
104
105 extract_vertex_edge_data(derive_clause, row, &mut all_vertices, &mut all_edges);
107
108 all_queries.extend(queries);
109 }
110 }
111 }
112
113 Ok(CollectedDeriveOutput {
114 queries: all_queries,
115 vertices: all_vertices,
116 edges: all_edges,
117 affected,
118 })
119}
120
121fn extract_vertex_edge_data(
123 derive_clause: &DeriveClause,
124 row: &FactRow,
125 vertices: &mut HashMap<String, Vec<Properties>>,
126 edges: &mut Vec<DerivedEdge>,
127) {
128 match derive_clause {
129 DeriveClause::Patterns(patterns) => {
130 for pattern in patterns {
131 extract_from_pattern(pattern, row, vertices, edges);
132 }
133 }
134 DeriveClause::Merge(a, b) => {
135 let source_props = node_properties_from_binding(a, row);
137 let target_props = node_properties_from_binding(b, row);
138 edges.push(DerivedEdge {
139 edge_type: "MERGED_WITH".to_string(),
140 source_label: node_label_from_binding(a, row),
141 source_properties: source_props,
142 target_label: node_label_from_binding(b, row),
143 target_properties: target_props,
144 edge_properties: Properties::new(),
145 });
146 }
147 }
148}
149
150fn extract_from_pattern(
152 pattern: &DerivePattern,
153 row: &FactRow,
154 vertices: &mut HashMap<String, Vec<Properties>>,
155 edges: &mut Vec<DerivedEdge>,
156) {
157 let source = &pattern.source;
158 let target = &pattern.target;
159 let edge = &pattern.edge;
160
161 let source_label = source
162 .labels
163 .first()
164 .cloned()
165 .unwrap_or_else(|| node_label_from_binding(&source.variable, row));
166 let target_label = target
167 .labels
168 .first()
169 .cloned()
170 .unwrap_or_else(|| node_label_from_binding(&target.variable, row));
171
172 let source_props = node_properties_from_binding(&source.variable, row);
173 let target_props = node_properties_from_binding(&target.variable, row);
174
175 if source.is_new {
176 vertices
177 .entry(source_label.clone())
178 .or_default()
179 .push(source_props.clone());
180 }
181 if target.is_new {
182 vertices
183 .entry(target_label.clone())
184 .or_default()
185 .push(target_props.clone());
186 }
187
188 let edge_props = edge
189 .properties
190 .as_ref()
191 .and_then(|expr| eval_map_expr(expr, row))
192 .unwrap_or_default();
193
194 edges.push(DerivedEdge {
195 edge_type: edge.edge_type.clone(),
196 source_label,
197 source_properties: source_props,
198 target_label,
199 target_properties: target_props,
200 edge_properties: edge_props,
201 });
202}
203
204fn node_properties_from_binding(var: &str, row: &FactRow) -> Properties {
206 use uni_common::Value;
207 match row.get(var) {
208 Some(Value::Node(node)) => node.properties.clone(),
209 Some(Value::Map(map)) => map.clone(),
210 _ => Properties::new(),
211 }
212}
213
214fn node_label_from_binding(var: &str, row: &FactRow) -> String {
216 use uni_common::Value;
217 match row.get(var) {
218 Some(Value::Node(node)) => node.labels.first().cloned().unwrap_or_default(),
219 _ => String::new(),
220 }
221}
222
223fn eval_map_expr(expr: &uni_cypher::ast::Expr, row: &FactRow) -> Option<Properties> {
225 use uni_common::Value;
226 match eval_expr(expr, row) {
227 Ok(Value::Map(m)) => Some(m),
228 _ => None,
229 }
230}