1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use arrow_array::RecordBatch;
11use async_trait::async_trait;
12use uni_common::{Result, UniError, Value};
13use uni_cypher::ast::{Expr, Pattern, Query};
14use uni_cypher::locy_ast::RuleOutput;
15use uni_locy::types::CompiledCommand;
16use uni_locy::{
17 CommandResult, CompiledProgram, LocyCompileError, LocyConfig, LocyError, LocyResult, LocyStats,
18 Row, RuntimeWarning, SavepointId, compile,
19};
20use uni_query::QueryPlanner;
21use uni_query::query::df_graph::locy_ast_builder::build_match_return_query;
22use uni_query::query::df_graph::locy_delta::{RowRelation, RowStore, extract_cypher_conditions};
23use uni_query::query::df_graph::locy_eval::record_batches_to_locy_rows;
24use uni_query::query::df_graph::locy_explain::ProvenanceStore;
25use uni_query::query::df_graph::{DerivedFactSource, LocyExecutionContext};
26
27use crate::api::Uni;
28
/// Rules and strata accumulated by `LocyEngine::register`.
///
/// `LocyEngine::compile_only` hands the registered rule names to the compiler as
/// external rules, and `LocyEngine::evaluate_with_config` merges both fields into
/// the program it is about to run.
// NOTE(review): presumably this is the type stored behind `Uni::locy_rule_registry`;
// that field's declaration is not visible here — confirm.
#[derive(Debug, Default, Clone)]
pub struct LocyRuleRegistry {
    /// Compiled rules keyed by rule name.
    pub rules: HashMap<String, uni_locy::types::CompiledRule>,
    /// Registered strata; `register` renumbers incoming ids so they continue after
    /// the strata that are already stored.
    pub strata: Vec<uni_locy::types::Stratum>,
}
41
/// Borrowed handle used to compile, register, and evaluate Locy programs against
/// a [`Uni`] database.
pub struct LocyEngine<'a> {
    /// Database whose schema, storage, and rule registry the engine methods use.
    db: &'a Uni,
}
46
impl Uni {
    /// Returns a [`LocyEngine`] that borrows this database.
    pub fn locy(&self) -> LocyEngine<'_> {
        LocyEngine { db: self }
    }
}
53
54impl<'a> LocyEngine<'a> {
55 pub fn compile_only(&self, program: &str) -> Result<CompiledProgram> {
61 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
62 let registry = self.db.locy_rule_registry.read().unwrap();
63 if registry.rules.is_empty() {
64 drop(registry);
65 compile(&ast).map_err(map_compile_error)
66 } else {
67 let external_names: Vec<String> = registry.rules.keys().cloned().collect();
68 drop(registry);
69 uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)
70 }
71 }
72
73 pub fn register(&self, program: &str) -> Result<()> {
80 let compiled = self.compile_only(program)?;
81 let mut registry = self.db.locy_rule_registry.write().unwrap();
82 for (name, rule) in compiled.rule_catalog {
83 registry.rules.insert(name, rule);
84 }
85 let base_id = registry.strata.len();
87 for mut stratum in compiled.strata {
88 let old_id = stratum.id;
89 stratum.id = base_id + old_id;
90 stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
91 registry.strata.push(stratum);
92 }
93 Ok(())
94 }
95
96 pub fn clear_registry(&self) {
98 let mut registry = self.db.locy_rule_registry.write().unwrap();
99 registry.rules.clear();
100 registry.strata.clear();
101 }
102
103 pub async fn evaluate(&self, program: &str) -> Result<LocyResult> {
105 self.evaluate_with_config(program, &LocyConfig::default())
106 .await
107 }
108
109 pub fn evaluate_with(&self, program: &str) -> crate::api::locy_builder::LocyBuilder<'_> {
127 crate::api::locy_builder::LocyBuilder::new(self.db, program)
128 }
129
130 pub async fn explain(&self, program: &str) -> Result<LocyResult> {
132 self.evaluate(program).await
133 }
134
135 pub async fn evaluate_with_config(
140 &self,
141 program: &str,
142 config: &LocyConfig,
143 ) -> Result<LocyResult> {
144 let mut compiled = self.compile_only(program)?;
145
146 {
148 let registry = self.db.locy_rule_registry.read().unwrap();
149 if !registry.rules.is_empty() {
150 for (name, rule) in ®istry.rules {
151 compiled
152 .rule_catalog
153 .entry(name.clone())
154 .or_insert_with(|| rule.clone());
155 }
156 let base_id = registry.strata.len();
157 for stratum in &mut compiled.strata {
158 stratum.id += base_id;
159 stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
160 }
161 let mut merged_strata = registry.strata.clone();
162 merged_strata.append(&mut compiled.strata);
163 compiled.strata = merged_strata;
164 }
165 }
166 let start = Instant::now();
167
168 let schema = self.db.schema.schema();
170 let query_planner = uni_query::QueryPlanner::new(schema);
171 let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
172 let logical = plan_builder
173 .build_program_plan(
174 &compiled,
175 config.max_iterations,
176 config.timeout,
177 config.max_derived_bytes,
178 config.deterministic_best_by,
179 config.strict_probability_domain,
180 config.probability_epsilon,
181 config.exact_probability,
182 config.max_bdd_variables,
183 config.top_k_proofs,
184 )
185 .map_err(|e| UniError::Query {
186 message: format!("LocyPlanBuildError: {e}"),
187 query: None,
188 })?;
189
190 let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
192 df_executor.set_config(self.db.config.clone());
193 if let Some(ref w) = self.db.writer {
194 df_executor.set_writer(w.clone());
195 }
196 df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
197 df_executor.set_procedure_registry(self.db.procedure_registry.clone());
198
199 let (session_ctx, planner, _prop_mgr) = df_executor
200 .create_datafusion_planner(&self.db.properties, &config.params)
201 .await
202 .map_err(map_native_df_error)?;
203
204 let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
206
207 let has_explain = compiled
209 .commands
210 .iter()
211 .any(|c| matches!(c, CompiledCommand::ExplainRule(_)));
212 let has_prob_fold = compiled.strata.iter().any(|s| {
213 s.rules.iter().any(|r| {
214 r.clauses.iter().any(|c| {
215 c.fold.iter().any(|f| {
216 if let uni_cypher::ast::Expr::FunctionCall { name, .. } = &f.aggregate {
217 matches!(name.to_uppercase().as_str(), "MNOR" | "MPROD")
218 } else {
219 false
220 }
221 })
222 })
223 })
224 });
225 let needs_tracker = has_explain || has_prob_fold;
226 let tracker: Option<Arc<uni_query::query::df_graph::ProvenanceStore>> = if needs_tracker {
227 Some(Arc::new(uni_query::query::df_graph::ProvenanceStore::new()))
228 } else {
229 None
230 };
231
232 let (
233 derived_store_slot,
234 iteration_counts_slot,
235 peak_memory_slot,
236 warnings_slot,
237 approximate_slot,
238 ) = if let Some(program_exec) = exec_plan
239 .as_any()
240 .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>(
241 ) {
242 if let Some(ref t) = tracker {
243 program_exec.set_derivation_tracker(Arc::clone(t));
244 }
245 (
246 program_exec.derived_store_slot(),
247 program_exec.iteration_counts_slot(),
248 program_exec.peak_memory_slot(),
249 program_exec.warnings_slot(),
250 program_exec.approximate_slot(),
251 )
252 } else {
253 (
254 Arc::new(std::sync::RwLock::new(None)),
255 Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
256 Arc::new(std::sync::RwLock::new(0usize)),
257 Arc::new(std::sync::RwLock::new(Vec::new())),
258 Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
259 )
260 };
261
262 let _stats_batches = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
264 .await
265 .map_err(map_native_df_error)?;
266
267 let native_store = derived_store_slot
269 .write()
270 .unwrap()
271 .take()
272 .unwrap_or_default();
273
274 let mut orch_store = native_store_to_row_store(&native_store, &compiled);
276
277 let native_ctx = NativeExecutionAdapter::new(
279 self.db,
280 &native_store,
281 &compiled,
282 planner.graph_ctx().clone(),
283 planner.session_ctx().clone(),
284 config.params.clone(),
285 );
286 let mut locy_stats = LocyStats {
287 total_iterations: iteration_counts_slot
288 .read()
289 .map(|c| c.values().sum::<usize>())
290 .unwrap_or(0),
291 peak_memory_bytes: peak_memory_slot.read().map(|v| *v).unwrap_or(0),
292 ..LocyStats::default()
293 };
294 let approx_for_explain = approximate_slot
295 .read()
296 .map(|a| a.clone())
297 .unwrap_or_default();
298 let mut command_results = Vec::new();
299 for cmd in &compiled.commands {
300 let result = dispatch_native_command(
301 cmd,
302 &compiled,
303 &native_ctx,
304 config,
305 &mut orch_store,
306 &mut locy_stats,
307 tracker.clone(),
308 start,
309 &approx_for_explain,
310 )
311 .await
312 .map_err(map_runtime_error)?;
313 command_results.push(result);
314 }
315
316 let evaluation_time = start.elapsed();
317
318 let mut base_derived: HashMap<String, Vec<Row>> = native_store
320 .rule_names()
321 .filter_map(|name| {
322 native_store
323 .get(name)
324 .map(|batches| (name.to_string(), record_batches_to_locy_rows(batches)))
325 })
326 .collect();
327
328 let approximate_groups = approximate_slot
330 .read()
331 .map(|a| a.clone())
332 .unwrap_or_default();
333 for (rule_name, groups) in &approximate_groups {
334 if !groups.is_empty()
335 && let Some(rows) = base_derived.get_mut(rule_name)
336 {
337 for row in rows.iter_mut() {
338 row.insert("_approximate".to_string(), Value::Bool(true));
339 }
340 }
341 }
342
343 let enriched_derived = enrich_vids_with_nodes(
344 self.db,
345 &native_store,
346 base_derived,
347 planner.graph_ctx(),
348 planner.session_ctx(),
349 )
350 .await;
351
352 let warnings = warnings_slot.read().map(|w| w.clone()).unwrap_or_default();
354 Ok(build_locy_result(
355 enriched_derived,
356 command_results,
357 &compiled,
358 evaluation_time,
359 locy_stats,
360 warnings,
361 approximate_groups,
362 ))
363 }
364
365 async fn run_strata_native(
371 &self,
372 compiled: &CompiledProgram,
373 config: &LocyConfig,
374 ) -> Result<uni_query::query::df_graph::DerivedStore> {
375 let schema = self.db.schema.schema();
376 let query_planner = uni_query::QueryPlanner::new(schema);
377 let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
378 let logical = plan_builder
379 .build_program_plan(
380 compiled,
381 config.max_iterations,
382 config.timeout,
383 config.max_derived_bytes,
384 config.deterministic_best_by,
385 config.strict_probability_domain,
386 config.probability_epsilon,
387 config.exact_probability,
388 config.max_bdd_variables,
389 config.top_k_proofs,
390 )
391 .map_err(|e| UniError::Query {
392 message: format!("LocyPlanBuildError: {e}"),
393 query: None,
394 })?;
395
396 let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
397 df_executor.set_config(self.db.config.clone());
398 if let Some(ref w) = self.db.writer {
399 df_executor.set_writer(w.clone());
400 }
401 df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
402 df_executor.set_procedure_registry(self.db.procedure_registry.clone());
403
404 let (session_ctx, planner, _) = df_executor
405 .create_datafusion_planner(&self.db.properties, &HashMap::new())
406 .await
407 .map_err(map_native_df_error)?;
408 let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
409
410 let derived_store_slot = if let Some(program_exec) =
411 exec_plan
412 .as_any()
413 .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>()
414 {
415 program_exec.derived_store_slot()
416 } else {
417 Arc::new(std::sync::RwLock::new(None))
418 };
419
420 let _ = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
421 .await
422 .map_err(map_native_df_error)?;
423 Ok(derived_store_slot
424 .write()
425 .unwrap()
426 .take()
427 .unwrap_or_default())
428 }
429}
430
/// Adapter that serves derived facts from a finished native run and executes
/// follow-up queries for command dispatch.
///
/// It implements `DerivedFactSource` and `LocyExecutionContext` in this file.
struct NativeExecutionAdapter<'a> {
    /// Database used for schema, storage, writer access, and query execution.
    db: &'a Uni,
    /// Derived store produced by the native run; lookups read from it.
    native_store: &'a uni_query::query::df_graph::DerivedStore,
    /// Compiled program whose rule catalog is consulted during enriched lookups.
    compiled: &'a CompiledProgram,
    /// Graph execution context passed to sub-plan execution.
    graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
    /// Shared DataFusion session context passed to sub-plan execution.
    session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
    /// Query parameters passed to sub-plan execution.
    params: HashMap<String, Value>,
}
444
445impl<'a> NativeExecutionAdapter<'a> {
446 fn new(
447 db: &'a Uni,
448 native_store: &'a uni_query::query::df_graph::DerivedStore,
449 compiled: &'a CompiledProgram,
450 graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
451 session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
452 params: HashMap<String, Value>,
453 ) -> Self {
454 Self {
455 db,
456 native_store,
457 compiled,
458 graph_ctx,
459 session_ctx,
460 params,
461 }
462 }
463
464 async fn execute_query_ast(
466 &self,
467 ast: Query,
468 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
469 let schema = self.db.schema.schema();
470 let logical_plan =
471 QueryPlanner::new(schema)
472 .plan(ast)
473 .map_err(|e| LocyError::ExecutorError {
474 message: e.to_string(),
475 })?;
476 uni_query::query::df_graph::common::execute_subplan(
477 &logical_plan,
478 &self.params,
479 &HashMap::new(),
480 &self.graph_ctx,
481 &self.session_ctx,
482 &self.db.storage,
483 &self.db.schema.schema(),
484 )
485 .await
486 .map_err(|e| LocyError::ExecutorError {
487 message: e.to_string(),
488 })
489 }
490}
491
492#[async_trait(?Send)]
493impl DerivedFactSource for NativeExecutionAdapter<'_> {
494 fn lookup_derived(&self, rule_name: &str) -> std::result::Result<Vec<Row>, LocyError> {
495 let batches = self
496 .native_store
497 .get(rule_name)
498 .map(|v| v.as_slice())
499 .unwrap_or(&[]);
500 Ok(record_batches_to_locy_rows(batches))
501 }
502
503 fn lookup_derived_batches(
504 &self,
505 rule_name: &str,
506 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
507 Ok(self
508 .native_store
509 .get(rule_name)
510 .map(|v| v.to_vec())
511 .unwrap_or_default())
512 }
513
514 async fn execute_pattern(
515 &self,
516 pattern: &Pattern,
517 where_conditions: &[Expr],
518 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
519 let query = build_match_return_query(pattern, where_conditions);
520 let schema = self.db.schema.schema();
521 let logical_plan =
522 QueryPlanner::new(schema)
523 .plan(query)
524 .map_err(|e| LocyError::ExecutorError {
525 message: e.to_string(),
526 })?;
527
528 let transaction_ctx: Option<Arc<uni_query::query::df_graph::GraphExecutionContext>> =
533 if let Some(writer_arc) = &self.db.writer {
534 if let Ok(writer) = writer_arc.try_read() {
535 if writer.transaction_l0.is_some() {
536 let l0_ctx = uni_query::query::df_graph::L0Context {
537 current_l0: Some(writer.l0_manager.get_current()),
538 transaction_l0: writer.transaction_l0.clone(),
539 pending_flush_l0s: writer.l0_manager.get_pending_flush(),
540 };
541 Some(Arc::new(
542 uni_query::query::df_graph::GraphExecutionContext::with_l0_context(
543 self.db.storage.clone(),
544 l0_ctx,
545 self.graph_ctx.property_manager().clone(),
546 ),
547 ))
548 } else {
549 None
550 }
551 } else {
552 None
553 }
554 } else {
555 None
556 };
557
558 let effective_ctx = transaction_ctx.as_ref().unwrap_or(&self.graph_ctx);
559
560 uni_query::query::df_graph::common::execute_subplan(
562 &logical_plan,
563 &self.params,
564 &HashMap::new(),
565 effective_ctx,
566 &self.session_ctx,
567 &self.db.storage,
568 &self.db.schema.schema(),
569 )
570 .await
571 .map_err(|e| LocyError::ExecutorError {
572 message: e.to_string(),
573 })
574 }
575}
576
577#[async_trait(?Send)]
578impl LocyExecutionContext for NativeExecutionAdapter<'_> {
579 async fn lookup_derived_enriched(
580 &self,
581 rule_name: &str,
582 ) -> std::result::Result<Vec<Row>, LocyError> {
583 use arrow_schema::DataType;
584
585 if let Some(rule) = self.compiled.rule_catalog.get(rule_name) {
586 let is_derive_rule = rule
587 .clauses
588 .iter()
589 .all(|c| matches!(c.output, RuleOutput::Derive(_)));
590 if is_derive_rule {
591 let mut all_rows = Vec::new();
592 for clause in &rule.clauses {
593 let cypher_conds = extract_cypher_conditions(&clause.where_conditions);
594 let raw_batches = self
595 .execute_pattern(&clause.match_pattern, &cypher_conds)
596 .await?;
597 all_rows.extend(record_batches_to_locy_rows(&raw_batches));
598 }
599 return Ok(all_rows);
600 }
601 }
602
603 let batches = self
604 .native_store
605 .get(rule_name)
606 .map(|v| v.as_slice())
607 .unwrap_or(&[]);
608 let rows = record_batches_to_locy_rows(batches);
609
610 let vid_columns: HashSet<String> = batches
611 .first()
612 .map(|batch| {
613 batch
614 .schema()
615 .fields()
616 .iter()
617 .filter(|f| *f.data_type() == DataType::UInt64)
618 .map(|f| f.name().clone())
619 .collect()
620 })
621 .unwrap_or_default();
622
623 if vid_columns.is_empty() {
624 return Ok(rows);
625 }
626
627 let unique_vids: HashSet<i64> = rows
628 .iter()
629 .flat_map(|row| {
630 vid_columns.iter().filter_map(|col| {
631 if let Some(Value::Int(vid)) = row.get(col) {
632 Some(*vid)
633 } else {
634 None
635 }
636 })
637 })
638 .collect();
639
640 if unique_vids.is_empty() {
641 return Ok(rows);
642 }
643
644 let vids_literal = unique_vids
645 .iter()
646 .map(|v| v.to_string())
647 .collect::<Vec<_>>()
648 .join(", ");
649 let query_str =
650 format!("MATCH (n) WHERE id(n) IN [{vids_literal}] RETURN id(n) AS _vid, n");
651 let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
652 if let Ok(ast) = uni_cypher::parse(&query_str)
653 && let Ok(batches) = self.execute_query_ast(ast).await
654 {
655 for row in record_batches_to_locy_rows(&batches) {
656 if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
657 vid_to_node.insert(*vid, node.clone());
658 }
659 }
660 }
661
662 Ok(rows
663 .into_iter()
664 .map(|row| {
665 row.into_iter()
666 .map(|(k, v)| {
667 if vid_columns.contains(&k)
668 && let Value::Int(vid) = &v
669 {
670 let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
671 return (k, new_v);
672 }
673 (k, v)
674 })
675 .collect()
676 })
677 .collect())
678 }
679
680 async fn execute_cypher_read(&self, ast: Query) -> std::result::Result<Vec<Row>, LocyError> {
681 let result = self
684 .db
685 .execute_ast_internal(ast, "<locy>", HashMap::new(), self.db.config.clone())
686 .await
687 .map_err(|e| LocyError::ExecutorError {
688 message: e.to_string(),
689 })?;
690 Ok(result
691 .rows
692 .into_iter()
693 .map(|row| {
694 row.columns
695 .iter()
696 .zip(row.values)
697 .map(|(col, val)| (col.clone(), val))
698 .collect()
699 })
700 .collect())
701 }
702
703 async fn execute_mutation(
704 &self,
705 ast: Query,
706 params: HashMap<String, Value>,
707 ) -> std::result::Result<usize, LocyError> {
708 let before = self.db.get_mutation_count().await;
709 self.db
710 .execute_ast_internal(ast, "<locy>", params, self.db.config.clone())
711 .await
712 .map_err(|e| LocyError::ExecutorError {
713 message: e.to_string(),
714 })?;
715 let after = self.db.get_mutation_count().await;
716 Ok(after.saturating_sub(before))
717 }
718
719 async fn begin_savepoint(&self) -> std::result::Result<SavepointId, LocyError> {
720 let writer = self
721 .db
722 .writer
723 .as_ref()
724 .ok_or_else(|| LocyError::SavepointFailed {
725 message: "database is read-only".to_string(),
726 })?;
727 let mut w = writer.write().await;
728 w.begin_transaction()
729 .map_err(|e| LocyError::SavepointFailed {
730 message: e.to_string(),
731 })?;
732 Ok(SavepointId(0))
733 }
734
735 async fn rollback_savepoint(&self, _id: SavepointId) -> std::result::Result<(), LocyError> {
736 let writer = self
737 .db
738 .writer
739 .as_ref()
740 .ok_or_else(|| LocyError::SavepointFailed {
741 message: "database is read-only".to_string(),
742 })?;
743 let mut w = writer.write().await;
744 w.rollback_transaction()
745 .map_err(|e| LocyError::SavepointFailed {
746 message: e.to_string(),
747 })?;
748 Ok(())
749 }
750
751 async fn re_evaluate_strata(
752 &self,
753 program: &CompiledProgram,
754 config: &LocyConfig,
755 ) -> std::result::Result<RowStore, LocyError> {
756 let strata_only = CompiledProgram {
757 strata: program.strata.clone(),
758 rule_catalog: program.rule_catalog.clone(),
759 warnings: vec![],
760 commands: vec![],
761 };
762 let engine = self.db.locy();
763 let native_store = engine
764 .run_strata_native(&strata_only, config)
765 .await
766 .map_err(|e| LocyError::ExecutorError {
767 message: e.to_string(),
768 })?;
769 Ok(native_store_to_row_store(&native_store, program))
770 }
771}
772
773#[allow(clippy::too_many_arguments)]
776fn dispatch_native_command<'a>(
777 cmd: &'a CompiledCommand,
778 program: &'a CompiledProgram,
779 ctx: &'a NativeExecutionAdapter<'a>,
780 config: &'a LocyConfig,
781 orch_store: &'a mut RowStore,
782 stats: &'a mut LocyStats,
783 tracker: Option<Arc<ProvenanceStore>>,
784 start: Instant,
785 approximate_groups: &'a HashMap<String, Vec<String>>,
786) -> std::pin::Pin<
787 Box<dyn std::future::Future<Output = std::result::Result<CommandResult, LocyError>> + 'a>,
788> {
789 Box::pin(async move {
790 match cmd {
791 CompiledCommand::GoalQuery(gq) => {
792 let rows = uni_query::query::df_graph::locy_query::evaluate_query(
793 gq, program, ctx, config, orch_store, stats, start,
794 )
795 .await?;
796 Ok(CommandResult::Query(rows))
797 }
798 CompiledCommand::ExplainRule(eq) => {
799 let node = uni_query::query::df_graph::locy_explain::explain_rule(
800 eq,
801 program,
802 ctx,
803 config,
804 orch_store,
805 stats,
806 tracker.as_deref(),
807 Some(approximate_groups),
808 )
809 .await?;
810 Ok(CommandResult::Explain(node))
811 }
812 CompiledCommand::Assume(ca) => {
813 let rows = uni_query::query::df_graph::locy_assume::evaluate_assume(
814 ca, program, ctx, config, stats,
815 )
816 .await?;
817 Ok(CommandResult::Assume(rows))
818 }
819 CompiledCommand::Abduce(aq) => {
820 let result = uni_query::query::df_graph::locy_abduce::evaluate_abduce(
821 aq,
822 program,
823 ctx,
824 config,
825 orch_store,
826 stats,
827 tracker.as_deref(),
828 )
829 .await?;
830 Ok(CommandResult::Abduce(result))
831 }
832 CompiledCommand::DeriveCommand(dc) => {
833 let affected = uni_query::query::df_graph::locy_derive::derive_command(
834 dc, program, ctx, stats,
835 )
836 .await?;
837 Ok(CommandResult::Derive { affected })
838 }
839 CompiledCommand::Cypher(q) => {
840 let rows = ctx.execute_cypher_read(q.clone()).await?;
841 stats.queries_executed += 1;
842 Ok(CommandResult::Cypher(rows))
843 }
844 }
845 })
846}
847
848async fn enrich_vids_with_nodes(
851 db: &Uni,
852 native_store: &uni_query::query::df_graph::DerivedStore,
853 derived: HashMap<String, Vec<Row>>,
854 graph_ctx: &Arc<uni_query::query::df_graph::GraphExecutionContext>,
855 session_ctx: &Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
856) -> HashMap<String, Vec<Row>> {
857 use arrow_schema::DataType;
858 let mut enriched = HashMap::new();
859
860 for (name, rows) in derived {
861 let vid_columns: HashSet<String> = native_store
862 .get(&name)
863 .and_then(|batches| batches.first())
864 .map(|batch| {
865 batch
866 .schema()
867 .fields()
868 .iter()
869 .filter(|f| *f.data_type() == DataType::UInt64)
870 .map(|f| f.name().clone())
871 .collect()
872 })
873 .unwrap_or_default();
874
875 if vid_columns.is_empty() {
876 enriched.insert(name, rows);
877 continue;
878 }
879
880 let unique_vids: HashSet<i64> = rows
881 .iter()
882 .flat_map(|row| {
883 vid_columns.iter().filter_map(|col| {
884 if let Some(Value::Int(vid)) = row.get(col) {
885 Some(*vid)
886 } else {
887 None
888 }
889 })
890 })
891 .collect();
892
893 if unique_vids.is_empty() {
894 enriched.insert(name, rows);
895 continue;
896 }
897
898 let vids_literal = unique_vids
899 .iter()
900 .map(|v| v.to_string())
901 .collect::<Vec<_>>()
902 .join(", ");
903 let query_str = format!(
904 "MATCH (n) WHERE id(n) IN [{}] RETURN id(n) AS _vid, n",
905 vids_literal
906 );
907 let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
908 if let Ok(ast) = uni_cypher::parse(&query_str) {
909 let schema = db.schema.schema();
910 if let Ok(logical_plan) = uni_query::QueryPlanner::new(schema).plan(ast)
911 && let Ok(batches) = uni_query::query::df_graph::common::execute_subplan(
912 &logical_plan,
913 &HashMap::new(),
914 &HashMap::new(),
915 graph_ctx,
916 session_ctx,
917 &db.storage,
918 &db.schema.schema(),
919 )
920 .await
921 {
922 for row in record_batches_to_locy_rows(&batches) {
923 if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
924 vid_to_node.insert(*vid, node.clone());
925 }
926 }
927 }
928 }
929
930 let enriched_rows: Vec<Row> = rows
931 .into_iter()
932 .map(|row| {
933 row.into_iter()
934 .map(|(k, v)| {
935 if vid_columns.contains(&k)
936 && let Value::Int(vid) = &v
937 {
938 let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
939 return (k, new_v);
940 }
941 (k, v)
942 })
943 .collect()
944 })
945 .collect();
946 enriched.insert(name, enriched_rows);
947 }
948
949 enriched
950}
951
952fn build_locy_result(
953 derived: HashMap<String, Vec<Row>>,
954 command_results: Vec<CommandResult>,
955 compiled: &CompiledProgram,
956 evaluation_time: Duration,
957 mut orchestrator_stats: LocyStats,
958 warnings: Vec<RuntimeWarning>,
959 approximate_groups: HashMap<String, Vec<String>>,
960) -> LocyResult {
961 let total_facts: usize = derived.values().map(|v| v.len()).sum();
962 orchestrator_stats.strata_evaluated = compiled.strata.len();
963 orchestrator_stats.derived_nodes = total_facts;
964 orchestrator_stats.evaluation_time = evaluation_time;
965
966 LocyResult {
967 derived,
968 stats: orchestrator_stats,
969 command_results,
970 warnings,
971 approximate_groups,
972 }
973}
974
975fn native_store_to_row_store(
976 native: &uni_query::query::df_graph::DerivedStore,
977 compiled: &CompiledProgram,
978) -> RowStore {
979 let mut result = RowStore::new();
980 for name in native.rule_names() {
981 if let Some(batches) = native.get(name) {
982 let rows = record_batches_to_locy_rows(batches);
983 let rule = compiled.rule_catalog.get(name);
984 let columns: Vec<String> = rule
985 .map(|r| r.yield_schema.iter().map(|yc| yc.name.clone()).collect())
986 .unwrap_or_else(|| {
987 rows.first()
988 .map(|r| r.keys().cloned().collect())
989 .unwrap_or_default()
990 });
991 result.insert(name.to_string(), RowRelation::new(columns, rows));
992 }
993 }
994 result
995}
996
997fn map_parse_error(e: uni_cypher::ParseError) -> UniError {
1000 UniError::Parse {
1001 message: format!("LocyParseError: {e}"),
1002 position: None,
1003 line: None,
1004 column: None,
1005 context: None,
1006 }
1007}
1008
1009fn map_compile_error(e: LocyCompileError) -> UniError {
1010 UniError::Query {
1011 message: format!("LocyCompileError: {e}"),
1012 query: None,
1013 }
1014}
1015
1016fn map_runtime_error(e: LocyError) -> UniError {
1017 match e {
1018 LocyError::SavepointFailed { ref message } => UniError::Transaction {
1019 message: format!("LocyRuntimeError: {message}"),
1020 },
1021 other => UniError::Query {
1022 message: format!("LocyRuntimeError: {other}"),
1023 query: None,
1024 },
1025 }
1026}
1027
1028fn map_native_df_error(e: impl std::fmt::Display) -> UniError {
1029 UniError::Query {
1030 message: format!("LocyRuntimeError: {e}"),
1031 query: None,
1032 }
1033}