1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use arrow_array::RecordBatch;
11use async_trait::async_trait;
12use uni_common::{Result, UniError, Value};
13use uni_cypher::ast::{Expr, Pattern, Query};
14use uni_cypher::locy_ast::RuleOutput;
15use uni_locy::types::CompiledCommand;
16use uni_locy::{
17 CommandResult, CompiledProgram, DerivedFactSet, FactRow, LocyCompileError, LocyConfig,
18 LocyError, LocyStats, RuntimeWarning, compile,
19};
20use uni_query::{QueryMetrics, QueryPlanner};
21
22use crate::api::locy_result::LocyResult;
23use uni_query::query::df_graph::locy_ast_builder::build_match_return_query;
24use uni_query::query::df_graph::locy_delta::{RowRelation, RowStore, extract_cypher_conditions};
25use uni_query::query::df_graph::locy_derive::CollectedDeriveOutput;
26use uni_query::query::df_graph::locy_eval::record_batches_to_locy_rows;
27use uni_query::query::df_graph::locy_explain::ProvenanceStore;
28use uni_query::query::df_graph::{DerivedFactSource, LocyExecutionContext};
29
/// Registry of compiled Locy rules that persists across evaluations.
///
/// Registered rule names are handed to the compiler as external rules, and the
/// registered rules and strata are merged into each newly compiled program
/// before it is evaluated.
#[derive(Debug, Default, Clone)]
pub struct LocyRuleRegistry {
    /// Compiled rules keyed by rule name; a later registration with the same
    /// name replaces the earlier entry.
    pub rules: HashMap<String, uni_locy::types::CompiledRule>,
    /// Strata of every registered program. At registration time each stratum's
    /// `id` and `depends_on` entries are offset by the number of strata that
    /// were already present.
    pub strata: Vec<uni_locy::types::Stratum>,
    /// Source text of the programs registered through
    /// `register_rules_on_registry`, in registration order.
    pub sources: Vec<String>,
}
44
45pub(crate) fn register_rules_on_registry(
49 registry_lock: &std::sync::RwLock<LocyRuleRegistry>,
50 program: &str,
51) -> Result<()> {
52 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
53 let registry = registry_lock.read().unwrap();
54 let compiled = if registry.rules.is_empty() {
55 drop(registry);
56 compile(&ast).map_err(map_compile_error)?
57 } else {
58 let external_names: Vec<String> = registry.rules.keys().cloned().collect();
59 drop(registry);
60 uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)?
61 };
62 let mut registry = registry_lock.write().unwrap();
63 for (name, rule) in compiled.rule_catalog {
64 registry.rules.insert(name, rule);
65 }
66 let base_id = registry.strata.len();
67 for mut stratum in compiled.strata {
68 stratum.id += base_id;
69 stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
70 registry.strata.push(stratum);
71 }
72 registry.sources.push(program.to_string());
73 Ok(())
74}
75
76pub(crate) async fn evaluate_with_db_and_config(
80 db: &crate::api::UniInner,
81 program: &str,
82 config: &LocyConfig,
83 rule_registry: &std::sync::RwLock<LocyRuleRegistry>,
84) -> Result<LocyResult> {
85 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
87 let external_names: Option<Vec<String>> = {
88 let registry = rule_registry.read().unwrap();
89 if registry.rules.is_empty() {
90 None
91 } else {
92 Some(registry.rules.keys().cloned().collect())
93 }
94 };
95 let mut compiled = if let Some(names) = external_names {
96 uni_locy::compile_with_external_rules(&ast, &names).map_err(map_compile_error)?
97 } else {
98 compile(&ast).map_err(map_compile_error)?
99 };
100
101 {
103 let registry = rule_registry.read().unwrap();
104 if !registry.rules.is_empty() {
105 for (name, rule) in ®istry.rules {
106 compiled
107 .rule_catalog
108 .entry(name.clone())
109 .or_insert_with(|| rule.clone());
110 }
111 let base_id = registry.strata.len();
112 for stratum in &mut compiled.strata {
113 stratum.id += base_id;
114 stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
115 }
116 let mut merged_strata = registry.strata.clone();
117 merged_strata.append(&mut compiled.strata);
118 compiled.strata = merged_strata;
119 }
120 }
121
122 let locy_l0 = if let Some(ref writer) = db.writer {
128 let w = writer.read().await;
129 Some(w.create_transaction_l0())
130 } else {
131 None };
133 let engine = LocyEngine {
134 db,
135 tx_l0_override: locy_l0.clone(),
136 locy_l0,
137 collect_derive: true,
138 };
139 engine.evaluate_compiled_with_config(compiled, config).await
140}
141
/// Entry point for compiling, registering and evaluating Locy programs
/// against a database instance.
pub struct LocyEngine<'a> {
    /// Database the engine operates on.
    pub(crate) db: &'a crate::api::UniInner,
    /// Optional transaction L0 buffer. When set, strata re-evaluation installs
    /// it on the executor, and reads/mutations fall back to it when no
    /// `locy_l0` is active.
    pub(crate) tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Optional L0 buffer copied into the execution adapter's `locy_l0` slot;
    /// the adapter prefers it over `tx_l0_override`.
    pub(crate) locy_l0: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// When `true`, DERIVE commands collect their output (reported through the
    /// result's derived fact set); when `false`, they go through
    /// `derive_command` instead.
    pub(crate) collect_derive: bool,
}
157
158impl crate::api::Uni {
159 #[allow(dead_code)]
163 pub(crate) fn locy(&self) -> LocyEngine<'_> {
164 LocyEngine {
165 db: &self.inner,
166 tx_l0_override: None,
167 locy_l0: None,
168 collect_derive: true,
169 }
170 }
171}
172
173impl<'a> LocyEngine<'a> {
174 pub fn compile_only(&self, program: &str) -> Result<CompiledProgram> {
180 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
181 let registry = self.db.locy_rule_registry.read().unwrap();
182 if registry.rules.is_empty() {
183 drop(registry);
184 compile(&ast).map_err(map_compile_error)
185 } else {
186 let external_names: Vec<String> = registry.rules.keys().cloned().collect();
187 drop(registry);
188 uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)
189 }
190 }
191
192 pub fn register(&self, program: &str) -> Result<()> {
199 let compiled = self.compile_only(program)?;
200 let mut registry = self.db.locy_rule_registry.write().unwrap();
201 for (name, rule) in compiled.rule_catalog {
202 registry.rules.insert(name, rule);
203 }
204 let base_id = registry.strata.len();
206 for mut stratum in compiled.strata {
207 stratum.id += base_id;
208 stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
209 registry.strata.push(stratum);
210 }
211 Ok(())
212 }
213
214 pub fn clear_registry(&self) {
216 let mut registry = self.db.locy_rule_registry.write().unwrap();
217 registry.rules.clear();
218 registry.strata.clear();
219 }
220
221 pub async fn evaluate(&self, program: &str) -> Result<LocyResult> {
223 self.evaluate_with_config(program, &LocyConfig::default())
224 .await
225 }
226
227 pub fn evaluate_with(&self, program: &str) -> crate::api::locy_builder::InnerLocyBuilder<'_> {
245 crate::api::locy_builder::InnerLocyBuilder::new(self.db, program)
246 }
247
248 pub async fn explain(&self, program: &str) -> Result<LocyResult> {
250 self.evaluate(program).await
251 }
252
253 pub async fn evaluate_with_config(
258 &self,
259 program: &str,
260 config: &LocyConfig,
261 ) -> Result<LocyResult> {
262 let mut compiled = self.compile_only(program)?;
263
264 {
266 let registry = self.db.locy_rule_registry.read().unwrap();
267 if !registry.rules.is_empty() {
268 for (name, rule) in ®istry.rules {
269 compiled
270 .rule_catalog
271 .entry(name.clone())
272 .or_insert_with(|| rule.clone());
273 }
274 let base_id = registry.strata.len();
275 for stratum in &mut compiled.strata {
276 stratum.id += base_id;
277 stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
278 }
279 let mut merged_strata = registry.strata.clone();
280 merged_strata.append(&mut compiled.strata);
281 compiled.strata = merged_strata;
282 }
283 }
284
285 self.evaluate_compiled_with_config(compiled, config).await
286 }
287
    /// Evaluates an already compiled program.
    ///
    /// Steps, in order:
    /// 1. build the logical program plan and turn it into an execution plan;
    /// 2. wire up the provenance tracker and output slots, then run the plan;
    /// 3. convert the derived store into a row store and replace VID values
    ///    with nodes;
    /// 4. run every command that has no inline result yet;
    /// 5. assemble the final [`LocyResult`], including the collected DERIVE
    ///    output if any.
    ///
    /// # Errors
    /// Plan-building, planning and execution failures are returned as
    /// `UniError::Query`; command failures go through `map_runtime_error`.
    ///
    /// # Panics
    /// Panics if the derived-store or command-results slot lock is poisoned.
    pub async fn evaluate_compiled_with_config(
        &self,
        compiled: CompiledProgram,
        config: &LocyConfig,
    ) -> Result<LocyResult> {
        let start = Instant::now();

        // The L0 version is only recorded when DERIVE output is collected and
        // a writer exists; otherwise it is reported as 0.
        let evaluated_at_version = if self.collect_derive {
            if let Some(ref w) = self.db.writer {
                w.read()
                    .await
                    .l0_manager
                    .get_current()
                    .read()
                    .current_version
            } else {
                0
            }
        } else {
            0
        };

        let schema = self.db.schema.schema();
        let query_planner = uni_query::QueryPlanner::new(schema);
        let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
        let logical = plan_builder
            .build_program_plan(
                &compiled,
                config.max_iterations,
                config.timeout,
                config.max_derived_bytes,
                config.deterministic_best_by,
                config.strict_probability_domain,
                config.probability_epsilon,
                config.exact_probability,
                config.max_bdd_variables,
                config.top_k_proofs,
            )
            .map_err(|e| UniError::Query {
                message: format!("LocyPlanBuildError: {e}"),
                query: None,
            })?;

        // Configure the executor with the database's writer, runtime,
        // procedures and (when any are registered) custom functions.
        let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
        df_executor.set_config(self.db.config.clone());
        if let Some(ref w) = self.db.writer {
            df_executor.set_writer(w.clone());
        }
        df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
        df_executor.set_procedure_registry(self.db.procedure_registry.clone());
        if let Ok(reg) = self.db.custom_functions.read()
            && !reg.is_empty()
        {
            df_executor.set_custom_functions(std::sync::Arc::new(reg.clone()));
        }

        let (session_ctx, planner, _prop_mgr) = df_executor
            .create_datafusion_planner(&self.db.properties, &config.params)
            .await
            .map_err(map_native_df_error)?;

        let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;

        // A provenance tracker is allocated only when the program contains an
        // EXPLAIN RULE command or a fold whose aggregate function is MNOR or
        // MPROD (compared case-insensitively).
        let has_explain = compiled
            .commands
            .iter()
            .any(|c| matches!(c, CompiledCommand::ExplainRule(_)));
        let has_prob_fold = compiled.strata.iter().any(|s| {
            s.rules.iter().any(|r| {
                r.clauses.iter().any(|c| {
                    c.fold.iter().any(|f| {
                        if let uni_cypher::ast::Expr::FunctionCall { name, .. } = &f.aggregate {
                            matches!(name.to_uppercase().as_str(), "MNOR" | "MPROD")
                        } else {
                            false
                        }
                    })
                })
            })
        });
        let needs_tracker = has_explain || has_prob_fold;
        let tracker: Option<Arc<uni_query::query::df_graph::ProvenanceStore>> = if needs_tracker {
            Some(Arc::new(uni_query::query::df_graph::ProvenanceStore::new()))
        } else {
            None
        };

        // Grab the shared output slots (and attach the tracker) BEFORE the
        // plan runs. If the plan root is not a `LocyProgramExec`, fall back to
        // empty slots so the rest of the function sees empty results.
        let (
            derived_store_slot,
            iteration_counts_slot,
            peak_memory_slot,
            warnings_slot,
            approximate_slot,
            command_results_slot,
        ) = if let Some(program_exec) = exec_plan
            .as_any()
            .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>(
        ) {
            if let Some(ref t) = tracker {
                program_exec.set_derivation_tracker(Arc::clone(t));
            }
            (
                program_exec.derived_store_slot(),
                program_exec.iteration_counts_slot(),
                program_exec.peak_memory_slot(),
                program_exec.warnings_slot(),
                program_exec.approximate_slot(),
                program_exec.command_results_slot(),
            )
        } else {
            (
                Arc::new(std::sync::RwLock::new(None)),
                Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
                Arc::new(std::sync::RwLock::new(0usize)),
                Arc::new(std::sync::RwLock::new(Vec::new())),
                Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
                Arc::new(std::sync::RwLock::new(Vec::new())),
            )
        };

        // Run the plan. The returned batches are not used; results are read
        // from the slots captured above.
        let _stats_batches = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
            .await
            .map_err(map_native_df_error)?;

        // Take ownership of the derived store, leaving `None` in the slot.
        let native_store = derived_store_slot
            .write()
            .unwrap()
            .take()
            .unwrap_or_default();

        let mut orch_store = native_store_to_row_store(&native_store, &compiled);

        {
            // Replace VID values in the row store with the matching nodes.
            let orch_rows: HashMap<String, Vec<FactRow>> = orch_store
                .iter()
                .map(|(k, v)| (k.clone(), v.rows.clone()))
                .collect();
            let enriched_rows = enrich_vids_with_nodes(
                self.db,
                &native_store,
                orch_rows,
                planner.graph_ctx(),
                planner.session_ctx(),
            )
            .await;
            for (name, rows) in enriched_rows {
                if let Some(rel) = orch_store.get_mut(&name) {
                    rel.rows = rows;
                }
            }
        }

        let native_ctx = NativeExecutionAdapter::new(
            self.db,
            &native_store,
            &compiled,
            planner.graph_ctx().clone(),
            planner.session_ctx().clone(),
            config.params.clone(),
            self.tx_l0_override.clone(),
        );
        // The adapter starts with an empty `locy_l0` slot; hand it ours.
        *native_ctx.locy_l0.lock().unwrap() = self.locy_l0.clone();
        // Slot reads fall back to zero/empty if a slot lock is poisoned.
        let mut locy_stats = LocyStats {
            total_iterations: iteration_counts_slot
                .read()
                .map(|c| c.values().sum::<usize>())
                .unwrap_or(0),
            peak_memory_bytes: peak_memory_slot.read().map(|v| *v).unwrap_or(0),
            ..LocyStats::default()
        };
        let approx_for_explain = approximate_slot
            .read()
            .map(|a| a.clone())
            .unwrap_or_default();
        // Command results found in the slot after execution, keyed by command
        // index (presumably produced inline by the plan — confirm in
        // `LocyProgramExec`).
        let inline_map: HashMap<usize, CommandResult> =
            command_results_slot.write().unwrap().drain(..).collect();

        let mut command_results = Vec::new();
        let mut collected_derives: Vec<CollectedDeriveOutput> = Vec::new();
        for (cmd_idx, cmd) in compiled.commands.iter().enumerate() {
            // Commands that already have an inline result are not dispatched again.
            if let Some(result) = inline_map.get(&cmd_idx) {
                command_results.push(result.clone());
                continue;
            }
            let result = dispatch_native_command(
                cmd,
                &compiled,
                &native_ctx,
                config,
                &mut orch_store,
                &mut locy_stats,
                tracker.clone(),
                start,
                &approx_for_explain,
                self.collect_derive,
                &mut collected_derives,
            )
            .await
            .map_err(map_runtime_error)?;
            command_results.push(result);
        }

        // Measured before the final conversion and enrichment below.
        let evaluation_time = start.elapsed();

        let mut base_derived: HashMap<String, Vec<FactRow>> = native_store
            .rule_names()
            .filter_map(|name| {
                native_store
                    .get(name)
                    .map(|batches| (name.to_string(), record_batches_to_locy_rows(batches)))
            })
            .collect();

        // Every row of a rule that has at least one approximate group is
        // flagged with `_approximate = true`.
        let approximate_groups = approximate_slot
            .read()
            .map(|a| a.clone())
            .unwrap_or_default();
        for (rule_name, groups) in &approximate_groups {
            if !groups.is_empty()
                && let Some(rows) = base_derived.get_mut(rule_name)
            {
                for row in rows.iter_mut() {
                    row.insert("_approximate".to_string(), Value::Bool(true));
                }
            }
        }

        let enriched_derived = enrich_vids_with_nodes(
            self.db,
            &native_store,
            base_derived,
            planner.graph_ctx(),
            planner.session_ctx(),
        )
        .await;

        // Merge the output of all DERIVE commands into a single fact set;
        // `None` when no DERIVE output was collected.
        let derived_fact_set = if !collected_derives.is_empty() {
            let mut all_vertices = HashMap::new();
            let mut all_edges = Vec::new();
            let mut all_queries = Vec::new();
            for output in collected_derives {
                for (label, verts) in output.vertices {
                    all_vertices
                        .entry(label)
                        .or_insert_with(Vec::new)
                        .extend(verts);
                }
                all_edges.extend(output.edges);
                all_queries.extend(output.queries);
            }
            Some(DerivedFactSet {
                vertices: all_vertices,
                edges: all_edges,
                stats: locy_stats.clone(),
                evaluated_at_version,
                mutation_queries: all_queries,
            })
        } else {
            None
        };

        let warnings = warnings_slot.read().map(|w| w.clone()).unwrap_or_default();
        Ok(build_locy_result(
            enriched_derived,
            command_results,
            &compiled,
            evaluation_time,
            locy_stats,
            warnings,
            approximate_groups,
            derived_fact_set,
        ))
    }
585
586 async fn run_strata_native(
592 &self,
593 compiled: &CompiledProgram,
594 config: &LocyConfig,
595 ) -> Result<uni_query::query::df_graph::DerivedStore> {
596 let schema = self.db.schema.schema();
597 let query_planner = uni_query::QueryPlanner::new(schema);
598 let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
599 let logical = plan_builder
600 .build_program_plan(
601 compiled,
602 config.max_iterations,
603 config.timeout,
604 config.max_derived_bytes,
605 config.deterministic_best_by,
606 config.strict_probability_domain,
607 config.probability_epsilon,
608 config.exact_probability,
609 config.max_bdd_variables,
610 config.top_k_proofs,
611 )
612 .map_err(|e| UniError::Query {
613 message: format!("LocyPlanBuildError: {e}"),
614 query: None,
615 })?;
616
617 let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
618 df_executor.set_config(self.db.config.clone());
619 if let Some(ref w) = self.db.writer {
620 df_executor.set_writer(w.clone());
621 }
622 if let Some(ref l0) = self.tx_l0_override {
625 df_executor.set_transaction_l0(l0.clone());
626 }
627 df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
628 df_executor.set_procedure_registry(self.db.procedure_registry.clone());
629
630 let (session_ctx, planner, _) = df_executor
631 .create_datafusion_planner(&self.db.properties, &config.params)
632 .await
633 .map_err(map_native_df_error)?;
634 let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
635
636 let derived_store_slot = if let Some(program_exec) =
637 exec_plan
638 .as_any()
639 .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>()
640 {
641 program_exec.derived_store_slot()
642 } else {
643 Arc::new(std::sync::RwLock::new(None))
644 };
645
646 let _ = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
647 .await
648 .map_err(map_native_df_error)?;
649 Ok(derived_store_slot
650 .write()
651 .unwrap()
652 .take()
653 .unwrap_or_default())
654 }
655}
656
/// Adapter that lets the Locy command evaluators (query, explain, assume,
/// abduce, derive) read derived facts and run Cypher against the database.
struct NativeExecutionAdapter<'a> {
    /// Database used for planning and executing queries.
    db: &'a crate::api::UniInner,
    /// Derived facts produced by the native plan, looked up by rule name.
    native_store: &'a uni_query::query::df_graph::DerivedStore,
    /// Compiled program; its rule catalog is consulted for rule lookups.
    compiled: &'a CompiledProgram,
    /// Default graph execution context for sub-plans.
    graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
    /// DataFusion session context shared with sub-plans.
    session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
    /// Query parameters passed to sub-plan execution.
    params: HashMap<String, Value>,
    /// Transaction L0 used when `locy_l0` holds no buffer.
    tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Currently active L0 buffer; replaced by `fork_l0` and `restore_l0`.
    locy_l0: std::sync::Mutex<Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>>,
    /// Buffers saved by `fork_l0`, popped by `restore_l0` (last in, first out).
    l0_save_stack:
        std::sync::Mutex<Vec<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>>,
}
678
679impl<'a> NativeExecutionAdapter<'a> {
680 fn new(
681 db: &'a crate::api::UniInner,
682 native_store: &'a uni_query::query::df_graph::DerivedStore,
683 compiled: &'a CompiledProgram,
684 graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
685 session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
686 params: HashMap<String, Value>,
687 tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
688 ) -> Self {
689 Self {
690 db,
691 native_store,
692 compiled,
693 graph_ctx,
694 session_ctx,
695 params,
696 tx_l0_override,
697 locy_l0: std::sync::Mutex::new(None),
698 l0_save_stack: std::sync::Mutex::new(Vec::new()),
699 }
700 }
701
702 async fn execute_query_ast(
704 &self,
705 ast: Query,
706 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
707 let schema = self.db.schema.schema();
708 let logical_plan =
709 QueryPlanner::new(schema)
710 .plan(ast)
711 .map_err(|e| LocyError::ExecutorError {
712 message: e.to_string(),
713 })?;
714 uni_query::query::df_graph::common::execute_subplan(
715 &logical_plan,
716 &self.params,
717 &HashMap::new(),
718 &self.graph_ctx,
719 &self.session_ctx,
720 &self.db.storage,
721 &self.db.schema.schema(),
722 )
723 .await
724 .map_err(|e| LocyError::ExecutorError {
725 message: e.to_string(),
726 })
727 }
728}
729
730#[async_trait(?Send)]
731impl DerivedFactSource for NativeExecutionAdapter<'_> {
732 fn lookup_derived(&self, rule_name: &str) -> std::result::Result<Vec<FactRow>, LocyError> {
733 let batches = self
734 .native_store
735 .get(rule_name)
736 .map(|v| v.as_slice())
737 .unwrap_or(&[]);
738 Ok(record_batches_to_locy_rows(batches))
739 }
740
741 fn lookup_derived_batches(
742 &self,
743 rule_name: &str,
744 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
745 Ok(self
746 .native_store
747 .get(rule_name)
748 .map(|v| v.to_vec())
749 .unwrap_or_default())
750 }
751
752 async fn execute_pattern(
753 &self,
754 pattern: &Pattern,
755 where_conditions: &[Expr],
756 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
757 let query = build_match_return_query(pattern, where_conditions);
758 let schema = self.db.schema.schema();
759 let logical_plan =
760 QueryPlanner::new(schema)
761 .plan(query)
762 .map_err(|e| LocyError::ExecutorError {
763 message: e.to_string(),
764 })?;
765
766 let tx_l0_for_ctx = self
770 .locy_l0
771 .lock()
772 .unwrap()
773 .clone()
774 .or_else(|| self.tx_l0_override.clone());
775 let transaction_ctx: Option<Arc<uni_query::query::df_graph::GraphExecutionContext>> =
776 if let Some(tx_l0) = tx_l0_for_ctx {
777 if let Some(writer_arc) = &self.db.writer {
778 if let Ok(writer) = writer_arc.try_read() {
779 let l0_ctx = uni_query::query::df_graph::L0Context {
780 current_l0: Some(writer.l0_manager.get_current()),
781 transaction_l0: Some(tx_l0),
782 pending_flush_l0s: writer.l0_manager.get_pending_flush(),
783 };
784 Some(Arc::new(
785 uni_query::query::df_graph::GraphExecutionContext::with_l0_context(
786 self.db.storage.clone(),
787 l0_ctx,
788 self.graph_ctx.property_manager().clone(),
789 ),
790 ))
791 } else {
792 None
793 }
794 } else {
795 None
796 }
797 } else {
798 None
799 };
800
801 let effective_ctx = transaction_ctx.as_ref().unwrap_or(&self.graph_ctx);
802
803 uni_query::query::df_graph::common::execute_subplan(
805 &logical_plan,
806 &self.params,
807 &HashMap::new(),
808 effective_ctx,
809 &self.session_ctx,
810 &self.db.storage,
811 &self.db.schema.schema(),
812 )
813 .await
814 .map_err(|e| LocyError::ExecutorError {
815 message: e.to_string(),
816 })
817 }
818}
819
820#[async_trait(?Send)]
821impl LocyExecutionContext for NativeExecutionAdapter<'_> {
822 async fn lookup_derived_enriched(
823 &self,
824 rule_name: &str,
825 ) -> std::result::Result<Vec<FactRow>, LocyError> {
826 use arrow_schema::DataType;
827
828 if let Some(rule) = self.compiled.rule_catalog.get(rule_name) {
829 let is_derive_rule = rule
830 .clauses
831 .iter()
832 .all(|c| matches!(c.output, RuleOutput::Derive(_)));
833 if is_derive_rule {
834 let mut all_rows = Vec::new();
835 for clause in &rule.clauses {
836 let cypher_conds = extract_cypher_conditions(&clause.where_conditions);
837 let raw_batches = self
838 .execute_pattern(&clause.match_pattern, &cypher_conds)
839 .await?;
840 all_rows.extend(record_batches_to_locy_rows(&raw_batches));
841 }
842 return Ok(all_rows);
843 }
844 }
845
846 let batches = self
847 .native_store
848 .get(rule_name)
849 .map(|v| v.as_slice())
850 .unwrap_or(&[]);
851 let rows = record_batches_to_locy_rows(batches);
852
853 let vid_columns: HashSet<String> = batches
854 .first()
855 .map(|batch| {
856 batch
857 .schema()
858 .fields()
859 .iter()
860 .filter(|f| *f.data_type() == DataType::UInt64)
861 .map(|f| f.name().clone())
862 .collect()
863 })
864 .unwrap_or_default();
865
866 if vid_columns.is_empty() {
867 return Ok(rows);
868 }
869
870 let unique_vids: HashSet<i64> = rows
871 .iter()
872 .flat_map(|row| {
873 vid_columns.iter().filter_map(|col| {
874 if let Some(Value::Int(vid)) = row.get(col) {
875 Some(*vid)
876 } else {
877 None
878 }
879 })
880 })
881 .collect();
882
883 if unique_vids.is_empty() {
884 return Ok(rows);
885 }
886
887 let vids_literal = unique_vids
888 .iter()
889 .map(|v| v.to_string())
890 .collect::<Vec<_>>()
891 .join(", ");
892 let query_str =
893 format!("MATCH (n) WHERE id(n) IN [{vids_literal}] RETURN id(n) AS _vid, n");
894 let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
895 if let Ok(ast) = uni_cypher::parse(&query_str)
896 && let Ok(batches) = self.execute_query_ast(ast).await
897 {
898 for row in record_batches_to_locy_rows(&batches) {
899 if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
900 vid_to_node.insert(*vid, node.clone());
901 }
902 }
903 }
904
905 Ok(rows
906 .into_iter()
907 .map(|row| {
908 row.into_iter()
909 .map(|(k, v)| {
910 if vid_columns.contains(&k)
911 && let Value::Int(vid) = &v
912 {
913 let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
914 return (k, new_v);
915 }
916 (k, v)
917 })
918 .collect()
919 })
920 .collect())
921 }
922
923 async fn execute_cypher_read(
924 &self,
925 ast: Query,
926 ) -> std::result::Result<Vec<FactRow>, LocyError> {
927 let active_l0 = self.locy_l0.lock().unwrap().clone();
930 let result = if let Some(ref l0) = active_l0 {
931 self.db
932 .execute_ast_internal_with_tx_l0(
933 ast,
934 "<locy>",
935 HashMap::new(),
936 self.db.config.clone(),
937 l0.clone(),
938 )
939 .await
940 } else if let Some(ref tx_l0) = self.tx_l0_override {
941 self.db
942 .execute_ast_internal_with_tx_l0(
943 ast,
944 "<locy>",
945 HashMap::new(),
946 self.db.config.clone(),
947 tx_l0.clone(),
948 )
949 .await
950 } else {
951 self.db
952 .execute_ast_internal(ast, "<locy>", HashMap::new(), self.db.config.clone())
953 .await
954 }
955 .map_err(|e| LocyError::ExecutorError {
956 message: e.to_string(),
957 })?;
958 Ok(result
959 .into_rows()
960 .into_iter()
961 .map(|row| {
962 let cols: Vec<String> = row.columns().to_vec();
963 cols.into_iter().zip(row.into_values()).collect()
964 })
965 .collect())
966 }
967
968 async fn execute_mutation(
969 &self,
970 ast: Query,
971 params: HashMap<String, Value>,
972 ) -> std::result::Result<usize, LocyError> {
973 let active_l0 = self.locy_l0.lock().unwrap().clone();
975 if let Some(ref l0) = active_l0 {
976 let before = l0.read().mutation_count;
977 self.db
978 .execute_ast_internal_with_tx_l0(
979 ast,
980 "<locy>",
981 params,
982 self.db.config.clone(),
983 l0.clone(),
984 )
985 .await
986 .map_err(|e| LocyError::ExecutorError {
987 message: e.to_string(),
988 })?;
989 let after = l0.read().mutation_count;
990 return Ok(after.saturating_sub(before));
991 }
992 if let Some(ref tx_l0) = self.tx_l0_override {
993 let before = tx_l0.read().mutation_count;
994 self.db
995 .execute_ast_internal_with_tx_l0(
996 ast,
997 "<locy>",
998 params,
999 self.db.config.clone(),
1000 tx_l0.clone(),
1001 )
1002 .await
1003 .map_err(|e| LocyError::ExecutorError {
1004 message: e.to_string(),
1005 })?;
1006 let after = tx_l0.read().mutation_count;
1007 return Ok(after.saturating_sub(before));
1008 }
1009 let before = self.db.get_mutation_count().await;
1011 self.db
1012 .execute_ast_internal(ast, "<locy>", params, self.db.config.clone())
1013 .await
1014 .map_err(|e| LocyError::ExecutorError {
1015 message: e.to_string(),
1016 })?;
1017 let after = self.db.get_mutation_count().await;
1018 Ok(after.saturating_sub(before))
1019 }
1020
1021 async fn fork_l0(&self) -> std::result::Result<(), LocyError> {
1022 let mut guard = self.locy_l0.lock().unwrap();
1023 let current = guard.as_ref().ok_or_else(|| LocyError::SavepointFailed {
1024 message: "no active Locy L0 to fork".into(),
1025 })?;
1026 let cloned = Arc::new(parking_lot::RwLock::new(current.read().clone()));
1028 let previous = guard.replace(cloned).unwrap();
1030 self.l0_save_stack.lock().unwrap().push(previous);
1031 Ok(())
1032 }
1033
1034 async fn restore_l0(&self) -> std::result::Result<(), LocyError> {
1035 let saved =
1036 self.l0_save_stack
1037 .lock()
1038 .unwrap()
1039 .pop()
1040 .ok_or_else(|| LocyError::SavepointFailed {
1041 message: "no saved L0 to restore".into(),
1042 })?;
1043 let mut guard = self.locy_l0.lock().unwrap();
1044 *guard = Some(saved);
1045 Ok(())
1046 }
1047
1048 async fn re_evaluate_strata(
1049 &self,
1050 program: &CompiledProgram,
1051 config: &LocyConfig,
1052 ) -> std::result::Result<RowStore, LocyError> {
1053 let strata_only = CompiledProgram {
1054 strata: program.strata.clone(),
1055 rule_catalog: program.rule_catalog.clone(),
1056 warnings: vec![],
1057 commands: vec![],
1058 };
1059 let locy_l0 = self.locy_l0.lock().unwrap().clone();
1061 let engine = LocyEngine {
1062 db: self.db,
1063 tx_l0_override: locy_l0.clone(),
1064 locy_l0,
1065 collect_derive: false,
1066 };
1067 let native_store = engine
1068 .run_strata_native(&strata_only, config)
1069 .await
1070 .map_err(|e| LocyError::ExecutorError {
1071 message: e.to_string(),
1072 })?;
1073 let mut store = native_store_to_row_store(&native_store, program);
1074
1075 let store_rows: HashMap<String, Vec<FactRow>> = store
1078 .iter()
1079 .map(|(k, v)| (k.clone(), v.rows.clone()))
1080 .collect();
1081 let enriched = enrich_vids_with_nodes(
1082 self.db,
1083 &native_store,
1084 store_rows,
1085 &self.graph_ctx,
1086 &self.session_ctx,
1087 )
1088 .await;
1089 for (name, rows) in enriched {
1090 if let Some(rel) = store.get_mut(&name) {
1091 rel.rows = rows;
1092 }
1093 }
1094
1095 Ok(store)
1096 }
1097}
1098
1099#[allow(clippy::too_many_arguments)]
1102fn dispatch_native_command<'a>(
1103 cmd: &'a CompiledCommand,
1104 program: &'a CompiledProgram,
1105 ctx: &'a NativeExecutionAdapter<'a>,
1106 config: &'a LocyConfig,
1107 orch_store: &'a mut RowStore,
1108 stats: &'a mut LocyStats,
1109 tracker: Option<Arc<ProvenanceStore>>,
1110 start: Instant,
1111 approximate_groups: &'a HashMap<String, Vec<String>>,
1112 collect_derive: bool,
1113 collected_derives: &'a mut Vec<CollectedDeriveOutput>,
1114) -> std::pin::Pin<
1115 Box<dyn std::future::Future<Output = std::result::Result<CommandResult, LocyError>> + 'a>,
1116> {
1117 Box::pin(async move {
1118 match cmd {
1119 CompiledCommand::GoalQuery(gq) => {
1120 let rows = uni_query::query::df_graph::locy_query::evaluate_query(
1121 gq, program, ctx, config, orch_store, stats, start,
1122 )
1123 .await?;
1124 Ok(CommandResult::Query(rows))
1125 }
1126 CompiledCommand::ExplainRule(eq) => {
1127 let node = uni_query::query::df_graph::locy_explain::explain_rule(
1128 eq,
1129 program,
1130 ctx,
1131 config,
1132 orch_store,
1133 stats,
1134 tracker.as_deref(),
1135 Some(approximate_groups),
1136 )
1137 .await?;
1138 Ok(CommandResult::Explain(node))
1139 }
1140 CompiledCommand::Assume(ca) => {
1141 let rows = uni_query::query::df_graph::locy_assume::evaluate_assume(
1142 ca, program, ctx, config, stats,
1143 )
1144 .await?;
1145 Ok(CommandResult::Assume(rows))
1146 }
1147 CompiledCommand::Abduce(aq) => {
1148 let result = uni_query::query::df_graph::locy_abduce::evaluate_abduce(
1149 aq,
1150 program,
1151 ctx,
1152 config,
1153 orch_store,
1154 stats,
1155 tracker.as_deref(),
1156 )
1157 .await?;
1158 Ok(CommandResult::Abduce(result))
1159 }
1160 CompiledCommand::DeriveCommand(dc) => {
1161 if collect_derive {
1162 let output = uni_query::query::df_graph::locy_derive::collect_derive_facts(
1164 dc, program, ctx,
1165 )
1166 .await?;
1167 let affected = output.affected;
1168
1169 if ctx.tx_l0_override.is_some() {
1176 for query in &output.queries {
1177 ctx.execute_mutation(query.clone(), HashMap::new()).await?;
1178 }
1179 }
1180
1181 collected_derives.push(output);
1182 Ok(CommandResult::Derive { affected })
1183 } else {
1184 let affected = uni_query::query::df_graph::locy_derive::derive_command(
1186 dc, program, ctx, stats,
1187 )
1188 .await?;
1189 Ok(CommandResult::Derive { affected })
1190 }
1191 }
1192 CompiledCommand::Cypher(q) => {
1193 let rows = ctx.execute_cypher_read(q.clone()).await?;
1194 stats.queries_executed += 1;
1195 Ok(CommandResult::Cypher(rows))
1196 }
1197 }
1198 })
1199}
1200
1201async fn enrich_vids_with_nodes(
1204 db: &crate::api::UniInner,
1205 native_store: &uni_query::query::df_graph::DerivedStore,
1206 derived: HashMap<String, Vec<FactRow>>,
1207 graph_ctx: &Arc<uni_query::query::df_graph::GraphExecutionContext>,
1208 session_ctx: &Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
1209) -> HashMap<String, Vec<FactRow>> {
1210 use arrow_schema::DataType;
1211 let mut enriched = HashMap::new();
1212
1213 for (name, rows) in derived {
1214 let vid_columns: HashSet<String> = native_store
1215 .get(&name)
1216 .and_then(|batches| batches.first())
1217 .map(|batch| {
1218 batch
1219 .schema()
1220 .fields()
1221 .iter()
1222 .filter(|f| *f.data_type() == DataType::UInt64)
1223 .map(|f| f.name().clone())
1224 .collect()
1225 })
1226 .unwrap_or_default();
1227
1228 if vid_columns.is_empty() {
1229 enriched.insert(name, rows);
1230 continue;
1231 }
1232
1233 let unique_vids: HashSet<i64> = rows
1234 .iter()
1235 .flat_map(|row| {
1236 vid_columns.iter().filter_map(|col| {
1237 if let Some(Value::Int(vid)) = row.get(col) {
1238 Some(*vid)
1239 } else {
1240 None
1241 }
1242 })
1243 })
1244 .collect();
1245
1246 if unique_vids.is_empty() {
1247 enriched.insert(name, rows);
1248 continue;
1249 }
1250
1251 let vids_literal = unique_vids
1252 .iter()
1253 .map(|v| v.to_string())
1254 .collect::<Vec<_>>()
1255 .join(", ");
1256 let query_str = format!(
1257 "MATCH (n) WHERE id(n) IN [{}] RETURN id(n) AS _vid, n",
1258 vids_literal
1259 );
1260 let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
1261 if let Ok(ast) = uni_cypher::parse(&query_str) {
1262 let schema = db.schema.schema();
1263 if let Ok(logical_plan) = uni_query::QueryPlanner::new(schema).plan(ast)
1264 && let Ok(batches) = uni_query::query::df_graph::common::execute_subplan(
1265 &logical_plan,
1266 &HashMap::new(),
1267 &HashMap::new(),
1268 graph_ctx,
1269 session_ctx,
1270 &db.storage,
1271 &db.schema.schema(),
1272 )
1273 .await
1274 {
1275 for row in record_batches_to_locy_rows(&batches) {
1276 if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
1277 vid_to_node.insert(*vid, node.clone());
1278 }
1279 }
1280 }
1281 }
1282
1283 let enriched_rows: Vec<FactRow> = rows
1284 .into_iter()
1285 .map(|row| {
1286 row.into_iter()
1287 .map(|(k, v)| {
1288 if vid_columns.contains(&k)
1289 && let Value::Int(vid) = &v
1290 {
1291 let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
1292 return (k, new_v);
1293 }
1294 (k, v)
1295 })
1296 .collect()
1297 })
1298 .collect();
1299 enriched.insert(name, enriched_rows);
1300 }
1301
1302 enriched
1303}
1304
1305#[allow(clippy::too_many_arguments)]
1306fn build_locy_result(
1307 derived: HashMap<String, Vec<FactRow>>,
1308 command_results: Vec<CommandResult>,
1309 compiled: &CompiledProgram,
1310 evaluation_time: Duration,
1311 mut orchestrator_stats: LocyStats,
1312 warnings: Vec<RuntimeWarning>,
1313 approximate_groups: HashMap<String, Vec<String>>,
1314 derived_fact_set: Option<DerivedFactSet>,
1315) -> LocyResult {
1316 let total_facts: usize = derived.values().map(|v| v.len()).sum();
1317 orchestrator_stats.strata_evaluated = compiled.strata.len();
1318 orchestrator_stats.derived_nodes = total_facts;
1319 orchestrator_stats.evaluation_time = evaluation_time;
1320
1321 let inner = uni_locy::LocyResult {
1322 derived,
1323 stats: orchestrator_stats,
1324 command_results,
1325 warnings,
1326 approximate_groups,
1327 derived_fact_set,
1328 };
1329 let metrics = QueryMetrics {
1330 total_time: evaluation_time,
1331 exec_time: evaluation_time,
1332 rows_returned: total_facts,
1333 ..Default::default()
1334 };
1335 LocyResult::new(inner, metrics)
1336}
1337
1338fn native_store_to_row_store(
1339 native: &uni_query::query::df_graph::DerivedStore,
1340 compiled: &CompiledProgram,
1341) -> RowStore {
1342 let mut result = RowStore::new();
1343 for name in native.rule_names() {
1344 if let Some(batches) = native.get(name) {
1345 let rows = record_batches_to_locy_rows(batches);
1346 let rule = compiled.rule_catalog.get(name);
1347 let columns: Vec<String> = rule
1348 .map(|r| r.yield_schema.iter().map(|yc| yc.name.clone()).collect())
1349 .unwrap_or_else(|| {
1350 rows.first()
1351 .map(|r| r.keys().cloned().collect())
1352 .unwrap_or_default()
1353 });
1354 result.insert(name.to_string(), RowRelation::new(columns, rows));
1355 }
1356 }
1357 result
1358}
1359
1360fn map_parse_error(e: uni_cypher::ParseError) -> UniError {
1363 UniError::Parse {
1364 message: format!("LocyParseError: {e}"),
1365 position: None,
1366 line: None,
1367 column: None,
1368 context: None,
1369 }
1370}
1371
1372fn map_compile_error(e: LocyCompileError) -> UniError {
1373 UniError::Query {
1374 message: format!("LocyCompileError: {e}"),
1375 query: None,
1376 }
1377}
1378
1379fn map_runtime_error(e: LocyError) -> UniError {
1380 match e {
1381 LocyError::SavepointFailed { ref message } => UniError::Transaction {
1382 message: format!("LocyRuntimeError: {message}"),
1383 },
1384 other => UniError::Query {
1385 message: format!("LocyRuntimeError: {other}"),
1386 query: None,
1387 },
1388 }
1389}
1390
1391fn map_native_df_error(e: impl std::fmt::Display) -> UniError {
1392 UniError::Query {
1393 message: format!("LocyRuntimeError: {e}"),
1394 query: None,
1395 }
1396}