1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use arrow_array::RecordBatch;
11use async_trait::async_trait;
12use uni_common::{Result, UniError, Value};
13use uni_cypher::ast::{Expr, Pattern, Query};
14use uni_cypher::locy_ast::RuleOutput;
15use uni_locy::types::CompiledCommand;
16use uni_locy::{
17 CommandResult, CompiledProgram, DerivedFactSet, FactRow, LocyCompileError, LocyConfig,
18 LocyError, LocyStats, RuntimeWarning, compile,
19};
20use uni_query::{QueryMetrics, QueryPlanner};
21
22use crate::api::locy_result::LocyResult;
23use uni_query::query::df_graph::locy_ast_builder::build_match_return_query;
24use uni_query::query::df_graph::locy_delta::{RowRelation, RowStore, extract_cypher_conditions};
25use uni_query::query::df_graph::locy_derive::CollectedDeriveOutput;
26use uni_query::query::df_graph::locy_eval::record_batches_to_locy_rows;
27use uni_query::query::df_graph::locy_explain::ProvenanceStore;
28use uni_query::query::df_graph::{DerivedFactSource, LocyExecutionContext};
29
/// In-memory registry of Locy rules registered on a database.
///
/// Names of registered rules are handed to the compiler as external rules so later
/// programs can reference them, and their compiled rules and strata are merged into
/// programs at evaluation time.
#[derive(Debug, Default, Clone)]
pub struct LocyRuleRegistry {
    /// Compiled rules keyed by rule name; registering a rule with an existing name
    /// replaces the earlier entry.
    pub rules: HashMap<String, uni_locy::types::CompiledRule>,
    /// Strata of all registered programs in registration order. Each program's
    /// stratum ids and `depends_on` entries are offset by the number of strata that
    /// were already registered, keeping ids unique across programs.
    pub strata: Vec<uni_locy::types::Stratum>,
    /// Source text of registered programs (appended by `register_rules_on_registry`).
    pub sources: Vec<String>,
}
44
45pub(crate) fn register_rules_on_registry(
49 registry_lock: &std::sync::RwLock<LocyRuleRegistry>,
50 program: &str,
51) -> Result<()> {
52 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
53 let registry = registry_lock.read().unwrap();
54 let compiled = if registry.rules.is_empty() {
55 drop(registry);
56 compile(&ast).map_err(map_compile_error)?
57 } else {
58 let external_names: Vec<String> = registry.rules.keys().cloned().collect();
59 drop(registry);
60 uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)?
61 };
62 let mut registry = registry_lock.write().unwrap();
63 for (name, rule) in compiled.rule_catalog {
64 registry.rules.insert(name, rule);
65 }
66 let base_id = registry.strata.len();
67 for mut stratum in compiled.strata {
68 stratum.id += base_id;
69 stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
70 registry.strata.push(stratum);
71 }
72 registry.sources.push(program.to_string());
73 Ok(())
74}
75
76pub(crate) async fn evaluate_with_db_and_config(
80 db: &crate::api::UniInner,
81 program: &str,
82 config: &LocyConfig,
83 rule_registry: &std::sync::RwLock<LocyRuleRegistry>,
84) -> Result<LocyResult> {
85 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
87 let external_names: Option<Vec<String>> = {
88 let registry = rule_registry.read().unwrap();
89 if registry.rules.is_empty() {
90 None
91 } else {
92 Some(registry.rules.keys().cloned().collect())
93 }
94 };
95 let mut compiled = if let Some(names) = external_names {
96 uni_locy::compile_with_external_rules(&ast, &names).map_err(map_compile_error)?
97 } else {
98 compile(&ast).map_err(map_compile_error)?
99 };
100
101 {
103 let registry = rule_registry.read().unwrap();
104 if !registry.rules.is_empty() {
105 for (name, rule) in ®istry.rules {
106 compiled
107 .rule_catalog
108 .entry(name.clone())
109 .or_insert_with(|| rule.clone());
110 }
111 let base_id = registry.strata.len();
112 for stratum in &mut compiled.strata {
113 stratum.id += base_id;
114 stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
115 }
116 let mut merged_strata = registry.strata.clone();
117 merged_strata.append(&mut compiled.strata);
118 compiled.strata = merged_strata;
119 }
120 }
121
122 let locy_l0 = if let Some(ref writer) = db.writer {
128 let w = writer.read().await;
129 Some(w.create_transaction_l0())
130 } else {
131 None };
133 let engine = LocyEngine {
134 db,
135 tx_l0_override: locy_l0.clone(),
136 locy_l0,
137 collect_derive: true,
138 };
139 engine.evaluate_compiled_with_config(compiled, config).await
140}
141
/// Compiles and evaluates Locy programs against a database.
pub struct LocyEngine<'a> {
    /// Database used for planning, execution and the rule registry.
    pub(crate) db: &'a crate::api::UniInner,
    /// Optional transaction L0 buffer. During native strata re-runs it is installed on
    /// the executor. The execution adapter also uses it as a fallback L0 for Cypher
    /// reads, mutations and pattern matches, and DERIVE output is applied through it
    /// when it is set.
    pub(crate) tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Initial value of the execution adapter's Locy L0 slot. That slot is saved and
    /// replaced by `fork_l0`, reinstated by `restore_l0`, and takes precedence over
    /// `tx_l0_override`.
    pub(crate) locy_l0: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// When true, DERIVE commands collect their facts into the result's
    /// `DerivedFactSet` and the writer's current L0 version is recorded. When false,
    /// DERIVE runs through `derive_command`.
    pub(crate) collect_derive: bool,
}
157
158impl crate::api::Uni {
159 #[allow(dead_code)]
163 pub(crate) fn locy(&self) -> LocyEngine<'_> {
164 LocyEngine {
165 db: &self.inner,
166 tx_l0_override: None,
167 locy_l0: None,
168 collect_derive: true,
169 }
170 }
171}
172
173impl<'a> LocyEngine<'a> {
174 pub fn compile_only(&self, program: &str) -> Result<CompiledProgram> {
180 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
181 let registry = self.db.locy_rule_registry.read().unwrap();
182 if registry.rules.is_empty() {
183 drop(registry);
184 compile(&ast).map_err(map_compile_error)
185 } else {
186 let external_names: Vec<String> = registry.rules.keys().cloned().collect();
187 drop(registry);
188 uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)
189 }
190 }
191
192 pub fn register(&self, program: &str) -> Result<()> {
199 let compiled = self.compile_only(program)?;
200 let mut registry = self.db.locy_rule_registry.write().unwrap();
201 for (name, rule) in compiled.rule_catalog {
202 registry.rules.insert(name, rule);
203 }
204 let base_id = registry.strata.len();
206 for mut stratum in compiled.strata {
207 stratum.id += base_id;
208 stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
209 registry.strata.push(stratum);
210 }
211 Ok(())
212 }
213
214 pub fn clear_registry(&self) {
216 let mut registry = self.db.locy_rule_registry.write().unwrap();
217 registry.rules.clear();
218 registry.strata.clear();
219 }
220
221 pub async fn evaluate(&self, program: &str) -> Result<LocyResult> {
223 self.evaluate_with_config(program, &LocyConfig::default())
224 .await
225 }
226
227 pub fn evaluate_with(&self, program: &str) -> crate::api::locy_builder::InnerLocyBuilder<'_> {
245 crate::api::locy_builder::InnerLocyBuilder::new(self.db, program)
246 }
247
248 pub async fn explain(&self, program: &str) -> Result<LocyResult> {
250 self.evaluate(program).await
251 }
252
253 pub async fn evaluate_with_config(
258 &self,
259 program: &str,
260 config: &LocyConfig,
261 ) -> Result<LocyResult> {
262 let mut compiled = self.compile_only(program)?;
263
264 {
266 let registry = self.db.locy_rule_registry.read().unwrap();
267 if !registry.rules.is_empty() {
268 for (name, rule) in ®istry.rules {
269 compiled
270 .rule_catalog
271 .entry(name.clone())
272 .or_insert_with(|| rule.clone());
273 }
274 let base_id = registry.strata.len();
275 for stratum in &mut compiled.strata {
276 stratum.id += base_id;
277 stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
278 }
279 let mut merged_strata = registry.strata.clone();
280 merged_strata.append(&mut compiled.strata);
281 compiled.strata = merged_strata;
282 }
283 }
284
285 self.evaluate_compiled_with_config(compiled, config).await
286 }
287
288 pub async fn evaluate_compiled_with_config(
294 &self,
295 compiled: CompiledProgram,
296 config: &LocyConfig,
297 ) -> Result<LocyResult> {
298 let start = Instant::now();
299
300 let evaluated_at_version = if self.collect_derive {
302 if let Some(ref w) = self.db.writer {
303 w.read()
304 .await
305 .l0_manager
306 .get_current()
307 .read()
308 .current_version
309 } else {
310 0
311 }
312 } else {
313 0
314 };
315
316 let schema = self.db.schema.schema();
318 let query_planner = uni_query::QueryPlanner::new(schema);
319 let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
320 let logical = plan_builder
321 .build_program_plan(
322 &compiled,
323 config.max_iterations,
324 config.timeout,
325 config.max_derived_bytes,
326 config.deterministic_best_by,
327 config.strict_probability_domain,
328 config.probability_epsilon,
329 config.exact_probability,
330 config.max_bdd_variables,
331 config.top_k_proofs,
332 )
333 .map_err(|e| UniError::Query {
334 message: format!("LocyPlanBuildError: {e}"),
335 query: None,
336 })?;
337
338 let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
340 df_executor.set_config(self.db.config.clone());
341 if let Some(ref w) = self.db.writer {
342 df_executor.set_writer(w.clone());
343 }
344 df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
345 df_executor.set_procedure_registry(self.db.procedure_registry.clone());
346 if let Ok(reg) = self.db.custom_functions.read()
347 && !reg.is_empty()
348 {
349 df_executor.set_custom_functions(std::sync::Arc::new(reg.clone()));
350 }
351
352 let (session_ctx, planner, _prop_mgr) = df_executor
353 .create_datafusion_planner(&self.db.properties, &config.params)
354 .await
355 .map_err(map_native_df_error)?;
356
357 let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
359
360 let has_explain = compiled
362 .commands
363 .iter()
364 .any(|c| matches!(c, CompiledCommand::ExplainRule(_)));
365 let has_prob_fold = compiled.strata.iter().any(|s| {
366 s.rules.iter().any(|r| {
367 r.clauses.iter().any(|c| {
368 c.fold.iter().any(|f| {
369 if let uni_cypher::ast::Expr::FunctionCall { name, .. } = &f.aggregate {
370 matches!(name.to_uppercase().as_str(), "MNOR" | "MPROD")
371 } else {
372 false
373 }
374 })
375 })
376 })
377 });
378 let needs_tracker = has_explain || has_prob_fold;
379 let tracker: Option<Arc<uni_query::query::df_graph::ProvenanceStore>> = if needs_tracker {
380 Some(Arc::new(uni_query::query::df_graph::ProvenanceStore::new()))
381 } else {
382 None
383 };
384
385 let (
386 derived_store_slot,
387 iteration_counts_slot,
388 peak_memory_slot,
389 warnings_slot,
390 approximate_slot,
391 command_results_slot,
392 timeout_flag,
393 ) = if let Some(program_exec) = exec_plan
394 .as_any()
395 .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>(
396 ) {
397 if let Some(ref t) = tracker {
398 program_exec.set_derivation_tracker(Arc::clone(t));
399 }
400 (
401 program_exec.derived_store_slot(),
402 program_exec.iteration_counts_slot(),
403 program_exec.peak_memory_slot(),
404 program_exec.warnings_slot(),
405 program_exec.approximate_slot(),
406 program_exec.command_results_slot(),
407 program_exec.timeout_flag(),
408 )
409 } else {
410 (
411 Arc::new(std::sync::RwLock::new(None)),
412 Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
413 Arc::new(std::sync::RwLock::new(0usize)),
414 Arc::new(std::sync::RwLock::new(Vec::new())),
415 Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
416 Arc::new(std::sync::RwLock::new(Vec::new())),
417 Arc::new(std::sync::atomic::AtomicBool::new(false)),
418 )
419 };
420
421 let _stats_batches = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
423 .await
424 .map_err(map_native_df_error)?;
425
426 let native_store = derived_store_slot
428 .write()
429 .unwrap()
430 .take()
431 .unwrap_or_default();
432
433 let mut orch_store = native_store_to_row_store(&native_store, &compiled);
435
436 {
440 let orch_rows: HashMap<String, Vec<FactRow>> = orch_store
441 .iter()
442 .map(|(k, v)| (k.clone(), v.rows.clone()))
443 .collect();
444 let enriched_rows = enrich_vids_with_nodes(
445 self.db,
446 &native_store,
447 orch_rows,
448 planner.graph_ctx(),
449 planner.session_ctx(),
450 )
451 .await;
452 for (name, rows) in enriched_rows {
453 if let Some(rel) = orch_store.get_mut(&name) {
454 rel.rows = rows;
455 }
456 }
457 }
458
459 let native_ctx = NativeExecutionAdapter::new(
461 self.db,
462 &native_store,
463 &compiled,
464 planner.graph_ctx().clone(),
465 planner.session_ctx().clone(),
466 config.params.clone(),
467 self.tx_l0_override.clone(),
468 );
469 *native_ctx.locy_l0.lock().unwrap() = self.locy_l0.clone();
471 let mut locy_stats = LocyStats {
472 total_iterations: iteration_counts_slot
473 .read()
474 .map(|c| c.values().sum::<usize>())
475 .unwrap_or(0),
476 peak_memory_bytes: peak_memory_slot.read().map(|v| *v).unwrap_or(0),
477 ..LocyStats::default()
478 };
479 let approx_for_explain = approximate_slot
480 .read()
481 .map(|a| a.clone())
482 .unwrap_or_default();
483 let inline_map: HashMap<usize, CommandResult> =
485 command_results_slot.write().unwrap().drain(..).collect();
486
487 let mut command_results = Vec::new();
488 let mut collected_derives: Vec<CollectedDeriveOutput> = Vec::new();
489 let timed_out_early = timeout_flag.load(std::sync::atomic::Ordering::Relaxed);
490 if !timed_out_early {
493 for (cmd_idx, cmd) in compiled.commands.iter().enumerate() {
494 if let Some(result) = inline_map.get(&cmd_idx) {
495 command_results.push(result.clone());
497 continue;
498 }
499 let result = dispatch_native_command(
500 cmd,
501 &compiled,
502 &native_ctx,
503 config,
504 &mut orch_store,
505 &mut locy_stats,
506 tracker.clone(),
507 start,
508 &approx_for_explain,
509 self.collect_derive,
510 &mut collected_derives,
511 )
512 .await
513 .map_err(map_runtime_error)?;
514 command_results.push(result);
515 }
516 }
517
518 let evaluation_time = start.elapsed();
519
520 let mut base_derived: HashMap<String, Vec<FactRow>> = native_store
522 .rule_names()
523 .filter_map(|name| {
524 native_store
525 .get(name)
526 .map(|batches| (name.to_string(), record_batches_to_locy_rows(batches)))
527 })
528 .collect();
529
530 let approximate_groups = approximate_slot
532 .read()
533 .map(|a| a.clone())
534 .unwrap_or_default();
535 for (rule_name, groups) in &approximate_groups {
536 if !groups.is_empty()
537 && let Some(rows) = base_derived.get_mut(rule_name)
538 {
539 for row in rows.iter_mut() {
540 row.insert("_approximate".to_string(), Value::Bool(true));
541 }
542 }
543 }
544
545 let enriched_derived = enrich_vids_with_nodes(
546 self.db,
547 &native_store,
548 base_derived,
549 planner.graph_ctx(),
550 planner.session_ctx(),
551 )
552 .await;
553
554 let derived_fact_set = if !collected_derives.is_empty() {
556 let mut all_vertices = HashMap::new();
557 let mut all_edges = Vec::new();
558 let mut all_queries = Vec::new();
559 for output in collected_derives {
560 for (label, verts) in output.vertices {
561 all_vertices
562 .entry(label)
563 .or_insert_with(Vec::new)
564 .extend(verts);
565 }
566 all_edges.extend(output.edges);
567 all_queries.extend(output.queries);
568 }
569 Some(DerivedFactSet {
570 vertices: all_vertices,
571 edges: all_edges,
572 stats: locy_stats.clone(),
573 evaluated_at_version,
574 mutation_queries: all_queries,
575 })
576 } else {
577 None
578 };
579
580 let warnings = warnings_slot.read().map(|w| w.clone()).unwrap_or_default();
582 let timed_out = timeout_flag.load(std::sync::atomic::Ordering::Relaxed);
583 Ok(build_locy_result(
584 enriched_derived,
585 command_results,
586 &compiled,
587 evaluation_time,
588 locy_stats,
589 warnings,
590 approximate_groups,
591 derived_fact_set,
592 timed_out,
593 ))
594 }
595
596 async fn run_strata_native(
602 &self,
603 compiled: &CompiledProgram,
604 config: &LocyConfig,
605 ) -> Result<uni_query::query::df_graph::DerivedStore> {
606 let schema = self.db.schema.schema();
607 let query_planner = uni_query::QueryPlanner::new(schema);
608 let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
609 let logical = plan_builder
610 .build_program_plan(
611 compiled,
612 config.max_iterations,
613 config.timeout,
614 config.max_derived_bytes,
615 config.deterministic_best_by,
616 config.strict_probability_domain,
617 config.probability_epsilon,
618 config.exact_probability,
619 config.max_bdd_variables,
620 config.top_k_proofs,
621 )
622 .map_err(|e| UniError::Query {
623 message: format!("LocyPlanBuildError: {e}"),
624 query: None,
625 })?;
626
627 let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
628 df_executor.set_config(self.db.config.clone());
629 if let Some(ref w) = self.db.writer {
630 df_executor.set_writer(w.clone());
631 }
632 if let Some(ref l0) = self.tx_l0_override {
635 df_executor.set_transaction_l0(l0.clone());
636 }
637 df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
638 df_executor.set_procedure_registry(self.db.procedure_registry.clone());
639
640 let (session_ctx, planner, _) = df_executor
641 .create_datafusion_planner(&self.db.properties, &config.params)
642 .await
643 .map_err(map_native_df_error)?;
644 let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
645
646 let derived_store_slot = if let Some(program_exec) =
647 exec_plan
648 .as_any()
649 .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>()
650 {
651 program_exec.derived_store_slot()
652 } else {
653 Arc::new(std::sync::RwLock::new(None))
654 };
655
656 let _ = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
657 .await
658 .map_err(map_native_df_error)?;
659 Ok(derived_store_slot
660 .write()
661 .unwrap()
662 .take()
663 .unwrap_or_default())
664 }
665}
666
/// Adapter that implements `DerivedFactSource` and `LocyExecutionContext` on top of
/// the native (DataFusion) strata results and the database's Cypher execution.
struct NativeExecutionAdapter<'a> {
    /// Database used for planning, Cypher execution and storage access.
    db: &'a crate::api::UniInner,
    /// Facts produced by the native strata run, keyed by rule name.
    native_store: &'a uni_query::query::df_graph::DerivedStore,
    /// Program being evaluated; consulted to detect DERIVE-only rules.
    compiled: &'a CompiledProgram,
    /// Graph execution context from the planner used for the strata run.
    graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
    /// DataFusion session shared with the strata run.
    session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
    /// Query parameters passed to `execute_subplan` calls.
    params: HashMap<String, Value>,
    /// Fallback transaction L0 used when no Locy L0 is active.
    tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Currently active Locy L0; replaced by `fork_l0` and reinstated by `restore_l0`.
    locy_l0: std::sync::Mutex<Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>>,
    /// L0 buffers saved by `fork_l0` (pushed) and reinstated by `restore_l0` (popped).
    l0_save_stack:
        std::sync::Mutex<Vec<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>>,
}
688
689impl<'a> NativeExecutionAdapter<'a> {
690 fn new(
691 db: &'a crate::api::UniInner,
692 native_store: &'a uni_query::query::df_graph::DerivedStore,
693 compiled: &'a CompiledProgram,
694 graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
695 session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
696 params: HashMap<String, Value>,
697 tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
698 ) -> Self {
699 Self {
700 db,
701 native_store,
702 compiled,
703 graph_ctx,
704 session_ctx,
705 params,
706 tx_l0_override,
707 locy_l0: std::sync::Mutex::new(None),
708 l0_save_stack: std::sync::Mutex::new(Vec::new()),
709 }
710 }
711
712 async fn execute_query_ast(
714 &self,
715 ast: Query,
716 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
717 let schema = self.db.schema.schema();
718 let logical_plan =
719 QueryPlanner::new(schema)
720 .plan(ast)
721 .map_err(|e| LocyError::ExecutorError {
722 message: e.to_string(),
723 })?;
724 uni_query::query::df_graph::common::execute_subplan(
725 &logical_plan,
726 &self.params,
727 &HashMap::new(),
728 &self.graph_ctx,
729 &self.session_ctx,
730 &self.db.storage,
731 &self.db.schema.schema(),
732 )
733 .await
734 .map_err(|e| LocyError::ExecutorError {
735 message: e.to_string(),
736 })
737 }
738}
739
740#[async_trait]
741impl DerivedFactSource for NativeExecutionAdapter<'_> {
742 fn lookup_derived(&self, rule_name: &str) -> std::result::Result<Vec<FactRow>, LocyError> {
743 let batches = self
744 .native_store
745 .get(rule_name)
746 .map(|v| v.as_slice())
747 .unwrap_or(&[]);
748 Ok(record_batches_to_locy_rows(batches))
749 }
750
751 fn lookup_derived_batches(
752 &self,
753 rule_name: &str,
754 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
755 Ok(self
756 .native_store
757 .get(rule_name)
758 .map(|v| v.to_vec())
759 .unwrap_or_default())
760 }
761
762 async fn execute_pattern(
763 &self,
764 pattern: &Pattern,
765 where_conditions: &[Expr],
766 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
767 let query = build_match_return_query(pattern, where_conditions);
768 let schema = self.db.schema.schema();
769 let logical_plan =
770 QueryPlanner::new(schema)
771 .plan(query)
772 .map_err(|e| LocyError::ExecutorError {
773 message: e.to_string(),
774 })?;
775
776 let tx_l0_for_ctx = self
780 .locy_l0
781 .lock()
782 .unwrap()
783 .clone()
784 .or_else(|| self.tx_l0_override.clone());
785 let transaction_ctx: Option<Arc<uni_query::query::df_graph::GraphExecutionContext>> =
786 if let Some(tx_l0) = tx_l0_for_ctx {
787 if let Some(writer_arc) = &self.db.writer {
788 if let Ok(writer) = writer_arc.try_read() {
789 let l0_ctx = uni_query::query::df_graph::L0Context {
790 current_l0: Some(writer.l0_manager.get_current()),
791 transaction_l0: Some(tx_l0),
792 pending_flush_l0s: writer.l0_manager.get_pending_flush(),
793 };
794 Some(Arc::new(
795 uni_query::query::df_graph::GraphExecutionContext::with_l0_context(
796 self.db.storage.clone(),
797 l0_ctx,
798 self.graph_ctx.property_manager().clone(),
799 ),
800 ))
801 } else {
802 None
803 }
804 } else {
805 None
806 }
807 } else {
808 None
809 };
810
811 let effective_ctx = transaction_ctx.as_ref().unwrap_or(&self.graph_ctx);
812
813 uni_query::query::df_graph::common::execute_subplan(
815 &logical_plan,
816 &self.params,
817 &HashMap::new(),
818 effective_ctx,
819 &self.session_ctx,
820 &self.db.storage,
821 &self.db.schema.schema(),
822 )
823 .await
824 .map_err(|e| LocyError::ExecutorError {
825 message: e.to_string(),
826 })
827 }
828}
829
#[async_trait]
impl LocyExecutionContext for NativeExecutionAdapter<'_> {
    /// Returns facts for `rule_name` with vertex ids replaced by node values.
    ///
    /// If every clause of the rule is a DERIVE output (vacuously true for a rule with
    /// no clauses), each clause's MATCH pattern is re-run with its
    /// Cypher-expressible WHERE conditions. The raw match rows are returned
    /// unenriched.
    ///
    /// Otherwise the stored facts are read. `UInt64` columns in the first stored
    /// batch are treated as vertex ids and resolved in one
    /// `MATCH (n) WHERE id(n) IN [...]` query. Resolution is best-effort: parse or
    /// execution failures and unknown ids leave the original value in place.
    async fn lookup_derived_enriched(
        &self,
        rule_name: &str,
    ) -> std::result::Result<Vec<FactRow>, LocyError> {
        use arrow_schema::DataType;

        if let Some(rule) = self.compiled.rule_catalog.get(rule_name) {
            let is_derive_rule = rule
                .clauses
                .iter()
                .all(|c| matches!(c.output, RuleOutput::Derive(_)));
            if is_derive_rule {
                let mut all_rows = Vec::new();
                for clause in &rule.clauses {
                    let cypher_conds = extract_cypher_conditions(&clause.where_conditions);
                    let raw_batches = self
                        .execute_pattern(&clause.match_pattern, &cypher_conds)
                        .await?;
                    all_rows.extend(record_batches_to_locy_rows(&raw_batches));
                }
                return Ok(all_rows);
            }
        }

        let batches = self
            .native_store
            .get(rule_name)
            .map(|v| v.as_slice())
            .unwrap_or(&[]);
        let rows = record_batches_to_locy_rows(batches);

        // Columns typed UInt64 in the first batch are treated as vertex-id columns.
        let vid_columns: HashSet<String> = batches
            .first()
            .map(|batch| {
                batch
                    .schema()
                    .fields()
                    .iter()
                    .filter(|f| *f.data_type() == DataType::UInt64)
                    .map(|f| f.name().clone())
                    .collect()
            })
            .unwrap_or_default();

        if vid_columns.is_empty() {
            return Ok(rows);
        }

        let unique_vids: HashSet<i64> = rows
            .iter()
            .flat_map(|row| {
                vid_columns.iter().filter_map(|col| {
                    if let Some(Value::Int(vid)) = row.get(col) {
                        Some(*vid)
                    } else {
                        None
                    }
                })
            })
            .collect();

        if unique_vids.is_empty() {
            return Ok(rows);
        }

        // Fetch all referenced nodes with a single query.
        let vids_literal = unique_vids
            .iter()
            .map(|v| v.to_string())
            .collect::<Vec<_>>()
            .join(", ");
        let query_str =
            format!("MATCH (n) WHERE id(n) IN [{vids_literal}] RETURN id(n) AS _vid, n");
        let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
        if let Ok(ast) = uni_cypher::parse(&query_str)
            && let Ok(batches) = self.execute_query_ast(ast).await
        {
            for row in record_batches_to_locy_rows(&batches) {
                if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
                    vid_to_node.insert(*vid, node.clone());
                }
            }
        }

        // Swap resolved ids for node values; unresolved ids stay as integers.
        Ok(rows
            .into_iter()
            .map(|row| {
                row.into_iter()
                    .map(|(k, v)| {
                        if vid_columns.contains(&k)
                            && let Value::Int(vid) = &v
                        {
                            let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
                            return (k, new_v);
                        }
                        (k, v)
                    })
                    .collect()
            })
            .collect())
    }

    /// Executes a Cypher read and returns its rows as column-name → value maps.
    ///
    /// The active Locy L0 is preferred. Otherwise the transaction-L0 override is
    /// used, and otherwise the query runs without a transaction L0. The query is
    /// run with an empty parameter map.
    async fn execute_cypher_read(
        &self,
        ast: Query,
    ) -> std::result::Result<Vec<FactRow>, LocyError> {
        // Clone the Arc out so the mutex guard is released before any await.
        let active_l0 = self.locy_l0.lock().unwrap().clone();
        let result = if let Some(ref l0) = active_l0 {
            self.db
                .execute_ast_internal_with_tx_l0(
                    ast,
                    "<locy>",
                    HashMap::new(),
                    self.db.config.clone(),
                    l0.clone(),
                )
                .await
        } else if let Some(ref tx_l0) = self.tx_l0_override {
            self.db
                .execute_ast_internal_with_tx_l0(
                    ast,
                    "<locy>",
                    HashMap::new(),
                    self.db.config.clone(),
                    tx_l0.clone(),
                )
                .await
        } else {
            self.db
                .execute_ast_internal(ast, "<locy>", HashMap::new(), self.db.config.clone())
                .await
        }
        .map_err(|e| LocyError::ExecutorError {
            message: e.to_string(),
        })?;
        Ok(result
            .into_rows()
            .into_iter()
            .map(|row| {
                let cols: Vec<String> = row.columns().to_vec();
                cols.into_iter().zip(row.into_values()).collect()
            })
            .collect())
    }

    /// Executes a Cypher mutation and returns the number of mutations it recorded.
    ///
    /// The count is `mutation_count` after minus before, using saturating
    /// subtraction. It is read from the L0 the mutation was routed to: the active
    /// Locy L0, else the transaction-L0 override. With neither, it is the
    /// database-wide mutation count.
    async fn execute_mutation(
        &self,
        ast: Query,
        params: HashMap<String, Value>,
    ) -> std::result::Result<usize, LocyError> {
        // Clone the Arc out so the mutex guard is released before any await.
        let active_l0 = self.locy_l0.lock().unwrap().clone();
        if let Some(ref l0) = active_l0 {
            let before = l0.read().mutation_count;
            self.db
                .execute_ast_internal_with_tx_l0(
                    ast,
                    "<locy>",
                    params,
                    self.db.config.clone(),
                    l0.clone(),
                )
                .await
                .map_err(|e| LocyError::ExecutorError {
                    message: e.to_string(),
                })?;
            let after = l0.read().mutation_count;
            return Ok(after.saturating_sub(before));
        }
        if let Some(ref tx_l0) = self.tx_l0_override {
            let before = tx_l0.read().mutation_count;
            self.db
                .execute_ast_internal_with_tx_l0(
                    ast,
                    "<locy>",
                    params,
                    self.db.config.clone(),
                    tx_l0.clone(),
                )
                .await
                .map_err(|e| LocyError::ExecutorError {
                    message: e.to_string(),
                })?;
            let after = tx_l0.read().mutation_count;
            return Ok(after.saturating_sub(before));
        }
        let before = self.db.get_mutation_count().await;
        self.db
            .execute_ast_internal(ast, "<locy>", params, self.db.config.clone())
            .await
            .map_err(|e| LocyError::ExecutorError {
                message: e.to_string(),
            })?;
        let after = self.db.get_mutation_count().await;
        Ok(after.saturating_sub(before))
    }

    /// Saves the active Locy L0 and activates a deep copy of it.
    ///
    /// The previous buffer is pushed onto the save stack for `restore_l0`. Fails with
    /// `SavepointFailed` when no Locy L0 is active.
    async fn fork_l0(&self) -> std::result::Result<(), LocyError> {
        // Lock order: `locy_l0` is held while `l0_save_stack` is locked below.
        let mut guard = self.locy_l0.lock().unwrap();
        let current = guard.as_ref().ok_or_else(|| LocyError::SavepointFailed {
            message: "no active Locy L0 to fork".into(),
        })?;
        let cloned = Arc::new(parking_lot::RwLock::new(current.read().clone()));
        // `guard` was checked to be Some above, so this unwrap cannot fail.
        let previous = guard.replace(cloned).unwrap();
        self.l0_save_stack.lock().unwrap().push(previous);
        Ok(())
    }

    /// Reactivates the most recently saved Locy L0 and discards the forked one.
    ///
    /// Fails with `SavepointFailed` when the save stack is empty.
    async fn restore_l0(&self) -> std::result::Result<(), LocyError> {
        let saved =
            self.l0_save_stack
                .lock()
                .unwrap()
                .pop()
                .ok_or_else(|| LocyError::SavepointFailed {
                    message: "no saved L0 to restore".into(),
                })?;
        let mut guard = self.locy_l0.lock().unwrap();
        *guard = Some(saved);
        Ok(())
    }

    /// Re-runs the strata of `program` (commands excluded) and returns the result as
    /// an enriched `RowStore`.
    ///
    /// A fresh engine is used with the active Locy L0 as both its Locy L0 and its
    /// transaction-L0 override, so the re-run reads through that L0. DERIVE
    /// collection is disabled for this engine.
    async fn re_evaluate_strata(
        &self,
        program: &CompiledProgram,
        config: &LocyConfig,
    ) -> std::result::Result<RowStore, LocyError> {
        let strata_only = CompiledProgram {
            strata: program.strata.clone(),
            rule_catalog: program.rule_catalog.clone(),
            warnings: vec![],
            commands: vec![],
        };
        let locy_l0 = self.locy_l0.lock().unwrap().clone();
        let engine = LocyEngine {
            db: self.db,
            tx_l0_override: locy_l0.clone(),
            locy_l0,
            collect_derive: false,
        };
        let native_store = engine
            .run_strata_native(&strata_only, config)
            .await
            .map_err(|e| LocyError::ExecutorError {
                message: e.to_string(),
            })?;
        let mut store = native_store_to_row_store(&native_store, program);

        // Replace vertex ids with node values using the adapter's base contexts.
        let store_rows: HashMap<String, Vec<FactRow>> = store
            .iter()
            .map(|(k, v)| (k.clone(), v.rows.clone()))
            .collect();
        let enriched = enrich_vids_with_nodes(
            self.db,
            &native_store,
            store_rows,
            &self.graph_ctx,
            &self.session_ctx,
        )
        .await;
        for (name, rows) in enriched {
            if let Some(rel) = store.get_mut(&name) {
                rel.rows = rows;
            }
        }

        Ok(store)
    }
}
1108
1109#[allow(clippy::too_many_arguments)]
1112fn dispatch_native_command<'a>(
1113 cmd: &'a CompiledCommand,
1114 program: &'a CompiledProgram,
1115 ctx: &'a NativeExecutionAdapter<'a>,
1116 config: &'a LocyConfig,
1117 orch_store: &'a mut RowStore,
1118 stats: &'a mut LocyStats,
1119 tracker: Option<Arc<ProvenanceStore>>,
1120 start: Instant,
1121 approximate_groups: &'a HashMap<String, Vec<String>>,
1122 collect_derive: bool,
1123 collected_derives: &'a mut Vec<CollectedDeriveOutput>,
1124) -> std::pin::Pin<
1125 Box<
1126 dyn std::future::Future<Output = std::result::Result<CommandResult, LocyError>> + Send + 'a,
1127 >,
1128> {
1129 Box::pin(async move {
1130 match cmd {
1131 CompiledCommand::GoalQuery(gq) => {
1132 let rows = uni_query::query::df_graph::locy_query::evaluate_query(
1133 gq, program, ctx, config, orch_store, stats, start,
1134 )
1135 .await?;
1136 Ok(CommandResult::Query(rows))
1137 }
1138 CompiledCommand::ExplainRule(eq) => {
1139 let node = uni_query::query::df_graph::locy_explain::explain_rule(
1140 eq,
1141 program,
1142 ctx,
1143 config,
1144 orch_store,
1145 stats,
1146 tracker.as_deref(),
1147 Some(approximate_groups),
1148 )
1149 .await?;
1150 Ok(CommandResult::Explain(node))
1151 }
1152 CompiledCommand::Assume(ca) => {
1153 let rows = uni_query::query::df_graph::locy_assume::evaluate_assume(
1154 ca, program, ctx, config, stats,
1155 )
1156 .await?;
1157 Ok(CommandResult::Assume(rows))
1158 }
1159 CompiledCommand::Abduce(aq) => {
1160 let result = uni_query::query::df_graph::locy_abduce::evaluate_abduce(
1161 aq,
1162 program,
1163 ctx,
1164 config,
1165 orch_store,
1166 stats,
1167 tracker.as_deref(),
1168 )
1169 .await?;
1170 Ok(CommandResult::Abduce(result))
1171 }
1172 CompiledCommand::DeriveCommand(dc) => {
1173 if collect_derive {
1174 let output = uni_query::query::df_graph::locy_derive::collect_derive_facts(
1176 dc, program, ctx,
1177 )
1178 .await?;
1179 let affected = output.affected;
1180
1181 if ctx.tx_l0_override.is_some() {
1188 for query in &output.queries {
1189 ctx.execute_mutation(query.clone(), HashMap::new()).await?;
1190 }
1191 }
1192
1193 collected_derives.push(output);
1194 Ok(CommandResult::Derive { affected })
1195 } else {
1196 let affected = uni_query::query::df_graph::locy_derive::derive_command(
1198 dc, program, ctx, stats,
1199 )
1200 .await?;
1201 Ok(CommandResult::Derive { affected })
1202 }
1203 }
1204 CompiledCommand::Cypher(q) => {
1205 let rows = ctx.execute_cypher_read(q.clone()).await?;
1206 stats.queries_executed += 1;
1207 Ok(CommandResult::Cypher(rows))
1208 }
1209 }
1210 })
1211}
1212
1213async fn enrich_vids_with_nodes(
1216 db: &crate::api::UniInner,
1217 native_store: &uni_query::query::df_graph::DerivedStore,
1218 derived: HashMap<String, Vec<FactRow>>,
1219 graph_ctx: &Arc<uni_query::query::df_graph::GraphExecutionContext>,
1220 session_ctx: &Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
1221) -> HashMap<String, Vec<FactRow>> {
1222 use arrow_schema::DataType;
1223 let mut enriched = HashMap::new();
1224
1225 for (name, rows) in derived {
1226 let vid_columns: HashSet<String> = native_store
1227 .get(&name)
1228 .and_then(|batches| batches.first())
1229 .map(|batch| {
1230 batch
1231 .schema()
1232 .fields()
1233 .iter()
1234 .filter(|f| *f.data_type() == DataType::UInt64)
1235 .map(|f| f.name().clone())
1236 .collect()
1237 })
1238 .unwrap_or_default();
1239
1240 if vid_columns.is_empty() {
1241 enriched.insert(name, rows);
1242 continue;
1243 }
1244
1245 let unique_vids: HashSet<i64> = rows
1246 .iter()
1247 .flat_map(|row| {
1248 vid_columns.iter().filter_map(|col| {
1249 if let Some(Value::Int(vid)) = row.get(col) {
1250 Some(*vid)
1251 } else {
1252 None
1253 }
1254 })
1255 })
1256 .collect();
1257
1258 if unique_vids.is_empty() {
1259 enriched.insert(name, rows);
1260 continue;
1261 }
1262
1263 let vids_literal = unique_vids
1264 .iter()
1265 .map(|v| v.to_string())
1266 .collect::<Vec<_>>()
1267 .join(", ");
1268 let query_str = format!(
1269 "MATCH (n) WHERE id(n) IN [{}] RETURN id(n) AS _vid, n",
1270 vids_literal
1271 );
1272 let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
1273 if let Ok(ast) = uni_cypher::parse(&query_str) {
1274 let schema = db.schema.schema();
1275 if let Ok(logical_plan) = uni_query::QueryPlanner::new(schema).plan(ast)
1276 && let Ok(batches) = uni_query::query::df_graph::common::execute_subplan(
1277 &logical_plan,
1278 &HashMap::new(),
1279 &HashMap::new(),
1280 graph_ctx,
1281 session_ctx,
1282 &db.storage,
1283 &db.schema.schema(),
1284 )
1285 .await
1286 {
1287 for row in record_batches_to_locy_rows(&batches) {
1288 if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
1289 vid_to_node.insert(*vid, node.clone());
1290 }
1291 }
1292 }
1293 }
1294
1295 let enriched_rows: Vec<FactRow> = rows
1296 .into_iter()
1297 .map(|row| {
1298 row.into_iter()
1299 .map(|(k, v)| {
1300 if vid_columns.contains(&k)
1301 && let Value::Int(vid) = &v
1302 {
1303 let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
1304 return (k, new_v);
1305 }
1306 (k, v)
1307 })
1308 .collect()
1309 })
1310 .collect();
1311 enriched.insert(name, enriched_rows);
1312 }
1313
1314 enriched
1315}
1316
1317#[allow(clippy::too_many_arguments)]
1318fn build_locy_result(
1319 derived: HashMap<String, Vec<FactRow>>,
1320 command_results: Vec<CommandResult>,
1321 compiled: &CompiledProgram,
1322 evaluation_time: Duration,
1323 mut orchestrator_stats: LocyStats,
1324 warnings: Vec<RuntimeWarning>,
1325 approximate_groups: HashMap<String, Vec<String>>,
1326 derived_fact_set: Option<DerivedFactSet>,
1327 timed_out: bool,
1328) -> LocyResult {
1329 let total_facts: usize = derived.values().map(|v| v.len()).sum();
1330 orchestrator_stats.strata_evaluated = compiled.strata.len();
1331 orchestrator_stats.derived_nodes = total_facts;
1332 orchestrator_stats.evaluation_time = evaluation_time;
1333
1334 let inner = uni_locy::LocyResult {
1335 derived,
1336 stats: orchestrator_stats,
1337 command_results,
1338 warnings,
1339 approximate_groups,
1340 derived_fact_set,
1341 timed_out,
1342 };
1343 let metrics = QueryMetrics {
1344 total_time: evaluation_time,
1345 exec_time: evaluation_time,
1346 rows_returned: total_facts,
1347 ..Default::default()
1348 };
1349 LocyResult::new(inner, metrics)
1350}
1351
1352fn native_store_to_row_store(
1353 native: &uni_query::query::df_graph::DerivedStore,
1354 compiled: &CompiledProgram,
1355) -> RowStore {
1356 let mut result = RowStore::new();
1357 for name in native.rule_names() {
1358 if let Some(batches) = native.get(name) {
1359 let rows = record_batches_to_locy_rows(batches);
1360 let rule = compiled.rule_catalog.get(name);
1361 let columns: Vec<String> = rule
1362 .map(|r| r.yield_schema.iter().map(|yc| yc.name.clone()).collect())
1363 .unwrap_or_else(|| {
1364 rows.first()
1365 .map(|r| r.keys().cloned().collect())
1366 .unwrap_or_default()
1367 });
1368 result.insert(name.to_string(), RowRelation::new(columns, rows));
1369 }
1370 }
1371 result
1372}
1373
1374fn map_parse_error(e: uni_cypher::ParseError) -> UniError {
1377 UniError::Parse {
1378 message: format!("LocyParseError: {e}"),
1379 position: None,
1380 line: None,
1381 column: None,
1382 context: None,
1383 }
1384}
1385
1386fn map_compile_error(e: LocyCompileError) -> UniError {
1387 UniError::Query {
1388 message: format!("LocyCompileError: {e}"),
1389 query: None,
1390 }
1391}
1392
1393fn map_runtime_error(e: LocyError) -> UniError {
1394 match e {
1395 LocyError::SavepointFailed { ref message } => UniError::Transaction {
1396 message: format!("LocyRuntimeError: {message}"),
1397 },
1398 other => UniError::Query {
1399 message: format!("LocyRuntimeError: {other}"),
1400 query: None,
1401 },
1402 }
1403}
1404
1405fn map_native_df_error(e: impl std::fmt::Display) -> UniError {
1406 UniError::Query {
1407 message: format!("LocyRuntimeError: {e}"),
1408 query: None,
1409 }
1410}