Skip to main content

uni_db/api/
impl_locy.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Locy engine integration: wires the Locy compiler and native execution engine to the real database.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use arrow_array::RecordBatch;
11use async_trait::async_trait;
12use uni_common::{Result, UniError, Value};
13use uni_cypher::ast::{Expr, Pattern, Query};
14use uni_cypher::locy_ast::RuleOutput;
15use uni_locy::types::CompiledCommand;
16use uni_locy::{
17    CommandResult, CompiledProgram, LocyCompileError, LocyConfig, LocyError, LocyResult, LocyStats,
18    Row, RuntimeWarning, SavepointId, compile,
19};
20use uni_query::QueryPlanner;
21use uni_query::query::df_graph::locy_ast_builder::build_match_return_query;
22use uni_query::query::df_graph::locy_delta::{RowRelation, RowStore, extract_cypher_conditions};
23use uni_query::query::df_graph::locy_eval::record_batches_to_locy_rows;
24use uni_query::query::df_graph::locy_explain::ProvenanceStore;
25use uni_query::query::df_graph::{DerivedFactSource, LocyExecutionContext};
26
27use crate::api::Uni;
28
/// Session-level registry for pre-compiled Locy rules.
///
/// Rules registered here are automatically merged into subsequent `evaluate()`
/// calls, eliminating the need to redeclare rules across multiple evaluations
/// (e.g., baseline, EXPLAIN, ASSUME, ABDUCE in notebooks).
///
/// Within this file the registry is filled by `LocyEngine::register()` and
/// emptied by `LocyEngine::clear_registry()`.
#[derive(Debug, Default, Clone)]
pub struct LocyRuleRegistry {
    /// Compiled rules indexed by rule name. Registering a rule under a name
    /// that is already present replaces the earlier entry.
    pub rules: HashMap<String, uni_locy::types::CompiledRule>,
    /// Strata from registered programs, for execution ordering. `register()`
    /// appends each program's strata with their ids and dependencies offset by
    /// the number of strata already stored.
    pub strata: Vec<uni_locy::types::Stratum>,
}
41
/// Engine for evaluating Locy programs against a real database.
///
/// The engine holds nothing but a borrow of the database; obtain one with
/// `Uni::locy()`.
pub struct LocyEngine<'a> {
    /// Database whose schema, storage, writer and rule registry the engine uses.
    db: &'a Uni,
}
46
impl Uni {
    /// Create a Locy evaluation engine bound to this database.
    ///
    /// The returned engine only borrows `self`. Rules registered through it
    /// are stored in the database's rule registry, not in the engine, so they
    /// remain available to engines created by later calls.
    pub fn locy(&self) -> LocyEngine<'_> {
        LocyEngine { db: self }
    }
}
53
54impl<'a> LocyEngine<'a> {
55    /// Parse and compile a Locy program without executing it.
56    ///
57    /// If the session's rule registry contains pre-compiled rules, their names
58    /// are passed to the compiler so that IS-ref and QUERY references to
59    /// registered rules are accepted during validation.
60    pub fn compile_only(&self, program: &str) -> Result<CompiledProgram> {
61        let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
62        let registry = self.db.locy_rule_registry.read().unwrap();
63        if registry.rules.is_empty() {
64            drop(registry);
65            compile(&ast).map_err(map_compile_error)
66        } else {
67            let external_names: Vec<String> = registry.rules.keys().cloned().collect();
68            drop(registry);
69            uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)
70        }
71    }
72
73    /// Compile and register a Locy program's rules for reuse.
74    ///
75    /// Rules registered here persist within the database session and are
76    /// automatically merged into subsequent `evaluate()` calls, so notebooks
77    /// can define rules once and run QUERY, EXPLAIN, ASSUME, ABDUCE without
78    /// redeclaring the full rule set each time.
79    pub fn register(&self, program: &str) -> Result<()> {
80        let compiled = self.compile_only(program)?;
81        let mut registry = self.db.locy_rule_registry.write().unwrap();
82        for (name, rule) in compiled.rule_catalog {
83            registry.rules.insert(name, rule);
84        }
85        // Merge strata, assigning new IDs to avoid collisions.
86        let base_id = registry.strata.len();
87        for mut stratum in compiled.strata {
88            let old_id = stratum.id;
89            stratum.id = base_id + old_id;
90            stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
91            registry.strata.push(stratum);
92        }
93        Ok(())
94    }
95
96    /// Clear all registered Locy rules from the session.
97    pub fn clear_registry(&self) {
98        let mut registry = self.db.locy_rule_registry.write().unwrap();
99        registry.rules.clear();
100        registry.strata.clear();
101    }
102
103    /// Parse, compile, and evaluate a Locy program with default config.
104    pub async fn evaluate(&self, program: &str) -> Result<LocyResult> {
105        self.evaluate_with_config(program, &LocyConfig::default())
106            .await
107    }
108
109    /// Start building a Locy evaluation with fluent parameter binding.
110    ///
111    /// Mirrors `db.query_with(cypher).param(…).fetch_all()` for Cypher.
112    ///
113    /// # Examples
114    ///
115    /// ```no_run
116    /// # use uni_db::Uni;
117    /// # async fn example(db: &Uni) -> uni_db::Result<()> {
118    /// let result = db.locy()
119    ///     .evaluate_with("CREATE RULE ep AS MATCH (e:Episode) WHERE e.agent_id = $aid YIELD KEY e")
120    ///     .param("aid", "agent-123")
121    ///     .run()
122    ///     .await?;
123    /// # Ok(())
124    /// # }
125    /// ```
126    pub fn evaluate_with(&self, program: &str) -> crate::api::locy_builder::LocyBuilder<'_> {
127        crate::api::locy_builder::LocyBuilder::new(self.db, program)
128    }
129
130    /// Convenience wrapper for EXPLAIN RULE commands.
131    pub async fn explain(&self, program: &str) -> Result<LocyResult> {
132        self.evaluate(program).await
133    }
134
    /// Parse, compile, and evaluate a Locy program with custom config.
    ///
    /// If rules were previously registered via `register()`, they are
    /// automatically merged into the compiled program before execution.
    ///
    /// The pipeline is: compile → merge registry → build logical plan →
    /// physical plan → run the fixpoint strata → dispatch each command in
    /// program order → enrich derived rows → assemble the `LocyResult`.
    ///
    /// # Errors
    ///
    /// * parse / compile errors from `compile_only()`;
    /// * `UniError::Query` with a `LocyPlanBuildError: …` message when the
    ///   logical plan cannot be built;
    /// * planner and execution errors converted by `map_native_df_error`;
    /// * command errors converted by `map_runtime_error` (the first failing
    ///   command aborts the evaluation).
    ///
    /// # Panics
    ///
    /// Panics if `unwrap()` on the registry lock or on the derived-store slot
    /// lock fails (presumably lock poisoning — confirm against the lock type).
    pub async fn evaluate_with_config(
        &self,
        program: &str,
        config: &LocyConfig,
    ) -> Result<LocyResult> {
        let mut compiled = self.compile_only(program)?;

        // Merge registered rules into the compiled program.
        // The read guard lives only inside this block, so it is released
        // before the first `.await` below.
        {
            let registry = self.db.locy_rule_registry.read().unwrap();
            if !registry.rules.is_empty() {
                // `or_insert_with` keeps the program's own definition when a
                // rule name exists both in the program and in the registry.
                for (name, rule) in &registry.rules {
                    compiled
                        .rule_catalog
                        .entry(name.clone())
                        .or_insert_with(|| rule.clone());
                }
                // Registered strata come first; the program's strata are
                // renumbered to follow them by offsetting ids and dependencies
                // by the number of registered strata.
                let base_id = registry.strata.len();
                for stratum in &mut compiled.strata {
                    stratum.id += base_id;
                    stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
                }
                let mut merged_strata = registry.strata.clone();
                merged_strata.append(&mut compiled.strata);
                compiled.strata = merged_strata;
            }
        }
        // The timer starts after compilation and registry merging, so those
        // steps are not part of the reported evaluation time.
        let start = Instant::now();

        // 1. Build logical plan
        let schema = self.db.schema.schema();
        let query_planner = uni_query::QueryPlanner::new(schema);
        let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
        let logical = plan_builder
            .build_program_plan(
                &compiled,
                config.max_iterations,
                config.timeout,
                config.max_derived_bytes,
                config.deterministic_best_by,
                config.strict_probability_domain,
                config.probability_epsilon,
                config.exact_probability,
                config.max_bdd_variables,
                config.top_k_proofs,
            )
            .map_err(|e| UniError::Query {
                message: format!("LocyPlanBuildError: {e}"),
                query: None,
            })?;

        // 2. Create executor + physical planner
        let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
        df_executor.set_config(self.db.config.clone());
        // A writer is only attached when the database has one.
        if let Some(ref w) = self.db.writer {
            df_executor.set_writer(w.clone());
        }
        df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
        df_executor.set_procedure_registry(self.db.procedure_registry.clone());

        // The caller's query parameters are handed to the planner here.
        let (session_ctx, planner, _prop_mgr) = df_executor
            .create_datafusion_planner(&self.db.properties, &config.params)
            .await
            .map_err(map_native_df_error)?;

        // 3. Physical plan
        let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;

        // 4. Create tracker for EXPLAIN commands or shared-proof detection
        let has_explain = compiled
            .commands
            .iter()
            .any(|c| matches!(c, CompiledCommand::ExplainRule(_)));
        // True when any clause folds with an MNOR or MPROD aggregate; the
        // function name is compared case-insensitively.
        let has_prob_fold = compiled.strata.iter().any(|s| {
            s.rules.iter().any(|r| {
                r.clauses.iter().any(|c| {
                    c.fold.iter().any(|f| {
                        if let uni_cypher::ast::Expr::FunctionCall { name, .. } = &f.aggregate {
                            matches!(name.to_uppercase().as_str(), "MNOR" | "MPROD")
                        } else {
                            false
                        }
                    })
                })
            })
        });
        let needs_tracker = has_explain || has_prob_fold;
        let tracker: Option<Arc<uni_query::query::df_graph::ProvenanceStore>> = if needs_tracker {
            Some(Arc::new(uni_query::query::df_graph::ProvenanceStore::new()))
        } else {
            None
        };

        // Grab the shared output slots from the program exec node before the
        // plan is consumed by execution. When the plan root is not a
        // `LocyProgramExec`, empty placeholder slots are used so the later
        // steps see no derived facts, no iterations, no warnings.
        let (
            derived_store_slot,
            iteration_counts_slot,
            peak_memory_slot,
            warnings_slot,
            approximate_slot,
        ) = if let Some(program_exec) = exec_plan
            .as_any()
            .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>(
        ) {
            if let Some(ref t) = tracker {
                program_exec.set_derivation_tracker(Arc::clone(t));
            }
            (
                program_exec.derived_store_slot(),
                program_exec.iteration_counts_slot(),
                program_exec.peak_memory_slot(),
                program_exec.warnings_slot(),
                program_exec.approximate_slot(),
            )
        } else {
            (
                Arc::new(std::sync::RwLock::new(None)),
                Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
                Arc::new(std::sync::RwLock::new(0usize)),
                Arc::new(std::sync::RwLock::new(Vec::new())),
                Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
            )
        };

        // 5. Execute strata
        // The returned batches are not used; the results are read from the
        // slots captured above.
        let _stats_batches = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
            .await
            .map_err(map_native_df_error)?;

        // 6. Extract native DerivedStore
        // `take()` moves the store out of the slot; an empty slot yields the
        // default store.
        let native_store = derived_store_slot
            .write()
            .unwrap()
            .take()
            .unwrap_or_default();

        // 7. Convert native DerivedStore → row-based RowStore for SLG/EXPLAIN
        let mut orch_store = native_store_to_row_store(&native_store, &compiled);

        // 8. Dispatch commands via native trait interfaces
        let native_ctx = NativeExecutionAdapter::new(
            self.db,
            &native_store,
            &compiled,
            planner.graph_ctx().clone(),
            planner.session_ctx().clone(),
            config.params.clone(),
        );
        // A failed lock read falls back to zero rather than failing the
        // evaluation.
        let mut locy_stats = LocyStats {
            total_iterations: iteration_counts_slot
                .read()
                .map(|c| c.values().sum::<usize>())
                .unwrap_or(0),
            peak_memory_bytes: peak_memory_slot.read().map(|v| *v).unwrap_or(0),
            ..LocyStats::default()
        };
        let approx_for_explain = approximate_slot
            .read()
            .map(|a| a.clone())
            .unwrap_or_default();
        // Commands run sequentially in program order and share `orch_store`
        // and `locy_stats`; the first error stops the loop.
        let mut command_results = Vec::new();
        for cmd in &compiled.commands {
            let result = dispatch_native_command(
                cmd,
                &compiled,
                &native_ctx,
                config,
                &mut orch_store,
                &mut locy_stats,
                tracker.clone(),
                start,
                &approx_for_explain,
            )
            .await
            .map_err(map_runtime_error)?;
            command_results.push(result);
        }

        // Measured before the enrichment step below, which is therefore not
        // included in the reported time.
        let evaluation_time = start.elapsed();

        // 9. Build derived map, enrich VID columns with full nodes
        let mut base_derived: HashMap<String, Vec<Row>> = native_store
            .rule_names()
            .filter_map(|name| {
                native_store
                    .get(name)
                    .map(|batches| (name.to_string(), record_batches_to_locy_rows(batches)))
            })
            .collect();

        // Stamp _approximate on facts in rules that had BDD fallback groups.
        // Every row of such a rule is stamped, not only the rows belonging to
        // the listed groups.
        let approximate_groups = approximate_slot
            .read()
            .map(|a| a.clone())
            .unwrap_or_default();
        for (rule_name, groups) in &approximate_groups {
            if !groups.is_empty()
                && let Some(rows) = base_derived.get_mut(rule_name)
            {
                for row in rows.iter_mut() {
                    row.insert("_approximate".to_string(), Value::Bool(true));
                }
            }
        }

        let enriched_derived = enrich_vids_with_nodes(
            self.db,
            &native_store,
            base_derived,
            planner.graph_ctx(),
            planner.session_ctx(),
        )
        .await;

        // 10. Build final LocyResult
        let warnings = warnings_slot.read().map(|w| w.clone()).unwrap_or_default();
        Ok(build_locy_result(
            enriched_derived,
            command_results,
            &compiled,
            evaluation_time,
            locy_stats,
            warnings,
            approximate_groups,
        ))
    }
364
365    /// Run only the fixpoint strata (no commands) via the native DataFusion path.
366    ///
367    /// Used by `re_evaluate_strata()` so that savepoint-scoped mutations from
368    /// ASSUME/ABDUCE hypothetical states are visible — the `Executor` is configured
369    /// with `self.db.writer`, which holds the active transaction handle.
370    async fn run_strata_native(
371        &self,
372        compiled: &CompiledProgram,
373        config: &LocyConfig,
374    ) -> Result<uni_query::query::df_graph::DerivedStore> {
375        let schema = self.db.schema.schema();
376        let query_planner = uni_query::QueryPlanner::new(schema);
377        let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
378        let logical = plan_builder
379            .build_program_plan(
380                compiled,
381                config.max_iterations,
382                config.timeout,
383                config.max_derived_bytes,
384                config.deterministic_best_by,
385                config.strict_probability_domain,
386                config.probability_epsilon,
387                config.exact_probability,
388                config.max_bdd_variables,
389                config.top_k_proofs,
390            )
391            .map_err(|e| UniError::Query {
392                message: format!("LocyPlanBuildError: {e}"),
393                query: None,
394            })?;
395
396        let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
397        df_executor.set_config(self.db.config.clone());
398        if let Some(ref w) = self.db.writer {
399            df_executor.set_writer(w.clone());
400        }
401        df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
402        df_executor.set_procedure_registry(self.db.procedure_registry.clone());
403
404        let (session_ctx, planner, _) = df_executor
405            .create_datafusion_planner(&self.db.properties, &HashMap::new())
406            .await
407            .map_err(map_native_df_error)?;
408        let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
409
410        let derived_store_slot = if let Some(program_exec) =
411            exec_plan
412                .as_any()
413                .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>()
414        {
415            program_exec.derived_store_slot()
416        } else {
417            Arc::new(std::sync::RwLock::new(None))
418        };
419
420        let _ = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
421            .await
422            .map_err(map_native_df_error)?;
423        Ok(derived_store_slot
424            .write()
425            .unwrap()
426            .take()
427            .unwrap_or_default())
428    }
429}
430
431// ── NativeExecutionAdapter — implements DerivedFactSource + LocyExecutionContext ─
432
/// Adapter that exposes the database and the native fixpoint results to the
/// Locy command evaluators by implementing `DerivedFactSource` and
/// `LocyExecutionContext`.
struct NativeExecutionAdapter<'a> {
    /// Database used for schema lookup, storage access, Cypher execution and
    /// savepoints.
    db: &'a Uni,
    /// Derived facts produced by the fixpoint run, looked up by rule name.
    native_store: &'a uni_query::query::df_graph::DerivedStore,
    /// Compiled program; its rule catalog is consulted when enriching derived
    /// facts.
    compiled: &'a CompiledProgram,
    /// Execution contexts from the fixpoint planner for columnar query execution.
    graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
    session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
    /// Query parameters threaded from LocyConfig; passed to execute_subplan so
    /// that $param references in rule MATCH WHERE clauses are resolved.
    params: HashMap<String, Value>,
}
444
445impl<'a> NativeExecutionAdapter<'a> {
    /// Bundle the borrowed database, derived-fact store and compiled program
    /// with the shared execution contexts and the query parameters.
    ///
    /// Does nothing beyond storing its arguments in the corresponding fields.
    fn new(
        db: &'a Uni,
        native_store: &'a uni_query::query::df_graph::DerivedStore,
        compiled: &'a CompiledProgram,
        graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
        session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
        params: HashMap<String, Value>,
    ) -> Self {
        Self {
            db,
            native_store,
            compiled,
            graph_ctx,
            session_ctx,
            params,
        }
    }
463
464    /// Execute a Query AST via execute_subplan, reusing the fixpoint contexts.
465    async fn execute_query_ast(
466        &self,
467        ast: Query,
468    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
469        let schema = self.db.schema.schema();
470        let logical_plan =
471            QueryPlanner::new(schema)
472                .plan(ast)
473                .map_err(|e| LocyError::ExecutorError {
474                    message: e.to_string(),
475                })?;
476        uni_query::query::df_graph::common::execute_subplan(
477            &logical_plan,
478            &self.params,
479            &HashMap::new(),
480            &self.graph_ctx,
481            &self.session_ctx,
482            &self.db.storage,
483            &self.db.schema.schema(),
484        )
485        .await
486        .map_err(|e| LocyError::ExecutorError {
487            message: e.to_string(),
488        })
489    }
490}
491
492#[async_trait(?Send)]
493impl DerivedFactSource for NativeExecutionAdapter<'_> {
494    fn lookup_derived(&self, rule_name: &str) -> std::result::Result<Vec<Row>, LocyError> {
495        let batches = self
496            .native_store
497            .get(rule_name)
498            .map(|v| v.as_slice())
499            .unwrap_or(&[]);
500        Ok(record_batches_to_locy_rows(batches))
501    }
502
503    fn lookup_derived_batches(
504        &self,
505        rule_name: &str,
506    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
507        Ok(self
508            .native_store
509            .get(rule_name)
510            .map(|v| v.to_vec())
511            .unwrap_or_default())
512    }
513
514    async fn execute_pattern(
515        &self,
516        pattern: &Pattern,
517        where_conditions: &[Expr],
518    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
519        let query = build_match_return_query(pattern, where_conditions);
520        let schema = self.db.schema.schema();
521        let logical_plan =
522            QueryPlanner::new(schema)
523                .plan(query)
524                .map_err(|e| LocyError::ExecutorError {
525                    message: e.to_string(),
526                })?;
527
528        // When a savepoint transaction is active (e.g., inside ASSUME/ABDUCE mutations),
529        // the stored graph_ctx was built before the savepoint mutations and its L0Context
530        // does not include the transaction-local L0 buffer. Rebuild a temporary context
531        // that includes transaction_l0 so pattern queries see the hypothetical state.
532        let transaction_ctx: Option<Arc<uni_query::query::df_graph::GraphExecutionContext>> =
533            if let Some(writer_arc) = &self.db.writer {
534                if let Ok(writer) = writer_arc.try_read() {
535                    if writer.transaction_l0.is_some() {
536                        let l0_ctx = uni_query::query::df_graph::L0Context {
537                            current_l0: Some(writer.l0_manager.get_current()),
538                            transaction_l0: writer.transaction_l0.clone(),
539                            pending_flush_l0s: writer.l0_manager.get_pending_flush(),
540                        };
541                        Some(Arc::new(
542                            uni_query::query::df_graph::GraphExecutionContext::with_l0_context(
543                                self.db.storage.clone(),
544                                l0_ctx,
545                                self.graph_ctx.property_manager().clone(),
546                            ),
547                        ))
548                    } else {
549                        None
550                    }
551                } else {
552                    None
553                }
554            } else {
555                None
556            };
557
558        let effective_ctx = transaction_ctx.as_ref().unwrap_or(&self.graph_ctx);
559
560        // Use the fixpoint planner's execution contexts directly via execute_subplan.
561        uni_query::query::df_graph::common::execute_subplan(
562            &logical_plan,
563            &self.params,
564            &HashMap::new(),
565            effective_ctx,
566            &self.session_ctx,
567            &self.db.storage,
568            &self.db.schema.schema(),
569        )
570        .await
571        .map_err(|e| LocyError::ExecutorError {
572            message: e.to_string(),
573        })
574    }
575}
576
577#[async_trait(?Send)]
578impl LocyExecutionContext for NativeExecutionAdapter<'_> {
579    async fn lookup_derived_enriched(
580        &self,
581        rule_name: &str,
582    ) -> std::result::Result<Vec<Row>, LocyError> {
583        use arrow_schema::DataType;
584
585        if let Some(rule) = self.compiled.rule_catalog.get(rule_name) {
586            let is_derive_rule = rule
587                .clauses
588                .iter()
589                .all(|c| matches!(c.output, RuleOutput::Derive(_)));
590            if is_derive_rule {
591                let mut all_rows = Vec::new();
592                for clause in &rule.clauses {
593                    let cypher_conds = extract_cypher_conditions(&clause.where_conditions);
594                    let raw_batches = self
595                        .execute_pattern(&clause.match_pattern, &cypher_conds)
596                        .await?;
597                    all_rows.extend(record_batches_to_locy_rows(&raw_batches));
598                }
599                return Ok(all_rows);
600            }
601        }
602
603        let batches = self
604            .native_store
605            .get(rule_name)
606            .map(|v| v.as_slice())
607            .unwrap_or(&[]);
608        let rows = record_batches_to_locy_rows(batches);
609
610        let vid_columns: HashSet<String> = batches
611            .first()
612            .map(|batch| {
613                batch
614                    .schema()
615                    .fields()
616                    .iter()
617                    .filter(|f| *f.data_type() == DataType::UInt64)
618                    .map(|f| f.name().clone())
619                    .collect()
620            })
621            .unwrap_or_default();
622
623        if vid_columns.is_empty() {
624            return Ok(rows);
625        }
626
627        let unique_vids: HashSet<i64> = rows
628            .iter()
629            .flat_map(|row| {
630                vid_columns.iter().filter_map(|col| {
631                    if let Some(Value::Int(vid)) = row.get(col) {
632                        Some(*vid)
633                    } else {
634                        None
635                    }
636                })
637            })
638            .collect();
639
640        if unique_vids.is_empty() {
641            return Ok(rows);
642        }
643
644        let vids_literal = unique_vids
645            .iter()
646            .map(|v| v.to_string())
647            .collect::<Vec<_>>()
648            .join(", ");
649        let query_str =
650            format!("MATCH (n) WHERE id(n) IN [{vids_literal}] RETURN id(n) AS _vid, n");
651        let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
652        if let Ok(ast) = uni_cypher::parse(&query_str)
653            && let Ok(batches) = self.execute_query_ast(ast).await
654        {
655            for row in record_batches_to_locy_rows(&batches) {
656                if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
657                    vid_to_node.insert(*vid, node.clone());
658                }
659            }
660        }
661
662        Ok(rows
663            .into_iter()
664            .map(|row| {
665                row.into_iter()
666                    .map(|(k, v)| {
667                        if vid_columns.contains(&k)
668                            && let Value::Int(vid) = &v
669                        {
670                            let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
671                            return (k, new_v);
672                        }
673                        (k, v)
674                    })
675                    .collect()
676            })
677            .collect())
678    }
679
680    async fn execute_cypher_read(&self, ast: Query) -> std::result::Result<Vec<Row>, LocyError> {
681        // Must use execute_ast_internal (fresh SessionContext) so that savepoint
682        // mutations applied during ASSUME/ABDUCE body dispatch are visible.
683        let result = self
684            .db
685            .execute_ast_internal(ast, "<locy>", HashMap::new(), self.db.config.clone())
686            .await
687            .map_err(|e| LocyError::ExecutorError {
688                message: e.to_string(),
689            })?;
690        Ok(result
691            .rows
692            .into_iter()
693            .map(|row| {
694                row.columns
695                    .iter()
696                    .zip(row.values)
697                    .map(|(col, val)| (col.clone(), val))
698                    .collect()
699            })
700            .collect())
701    }
702
703    async fn execute_mutation(
704        &self,
705        ast: Query,
706        params: HashMap<String, Value>,
707    ) -> std::result::Result<usize, LocyError> {
708        let before = self.db.get_mutation_count().await;
709        self.db
710            .execute_ast_internal(ast, "<locy>", params, self.db.config.clone())
711            .await
712            .map_err(|e| LocyError::ExecutorError {
713                message: e.to_string(),
714            })?;
715        let after = self.db.get_mutation_count().await;
716        Ok(after.saturating_sub(before))
717    }
718
719    async fn begin_savepoint(&self) -> std::result::Result<SavepointId, LocyError> {
720        let writer = self
721            .db
722            .writer
723            .as_ref()
724            .ok_or_else(|| LocyError::SavepointFailed {
725                message: "database is read-only".to_string(),
726            })?;
727        let mut w = writer.write().await;
728        w.begin_transaction()
729            .map_err(|e| LocyError::SavepointFailed {
730                message: e.to_string(),
731            })?;
732        Ok(SavepointId(0))
733    }
734
735    async fn rollback_savepoint(&self, _id: SavepointId) -> std::result::Result<(), LocyError> {
736        let writer = self
737            .db
738            .writer
739            .as_ref()
740            .ok_or_else(|| LocyError::SavepointFailed {
741                message: "database is read-only".to_string(),
742            })?;
743        let mut w = writer.write().await;
744        w.rollback_transaction()
745            .map_err(|e| LocyError::SavepointFailed {
746                message: e.to_string(),
747            })?;
748        Ok(())
749    }
750
751    async fn re_evaluate_strata(
752        &self,
753        program: &CompiledProgram,
754        config: &LocyConfig,
755    ) -> std::result::Result<RowStore, LocyError> {
756        let strata_only = CompiledProgram {
757            strata: program.strata.clone(),
758            rule_catalog: program.rule_catalog.clone(),
759            warnings: vec![],
760            commands: vec![],
761        };
762        let engine = self.db.locy();
763        let native_store = engine
764            .run_strata_native(&strata_only, config)
765            .await
766            .map_err(|e| LocyError::ExecutorError {
767                message: e.to_string(),
768            })?;
769        Ok(native_store_to_row_store(&native_store, program))
770    }
771}
772
773// ── Native command dispatch ───────────────────────────────────────────────────
774
775#[allow(clippy::too_many_arguments)]
776fn dispatch_native_command<'a>(
777    cmd: &'a CompiledCommand,
778    program: &'a CompiledProgram,
779    ctx: &'a NativeExecutionAdapter<'a>,
780    config: &'a LocyConfig,
781    orch_store: &'a mut RowStore,
782    stats: &'a mut LocyStats,
783    tracker: Option<Arc<ProvenanceStore>>,
784    start: Instant,
785    approximate_groups: &'a HashMap<String, Vec<String>>,
786) -> std::pin::Pin<
787    Box<dyn std::future::Future<Output = std::result::Result<CommandResult, LocyError>> + 'a>,
788> {
789    Box::pin(async move {
790        match cmd {
791            CompiledCommand::GoalQuery(gq) => {
792                let rows = uni_query::query::df_graph::locy_query::evaluate_query(
793                    gq, program, ctx, config, orch_store, stats, start,
794                )
795                .await?;
796                Ok(CommandResult::Query(rows))
797            }
798            CompiledCommand::ExplainRule(eq) => {
799                let node = uni_query::query::df_graph::locy_explain::explain_rule(
800                    eq,
801                    program,
802                    ctx,
803                    config,
804                    orch_store,
805                    stats,
806                    tracker.as_deref(),
807                    Some(approximate_groups),
808                )
809                .await?;
810                Ok(CommandResult::Explain(node))
811            }
812            CompiledCommand::Assume(ca) => {
813                let rows = uni_query::query::df_graph::locy_assume::evaluate_assume(
814                    ca, program, ctx, config, stats,
815                )
816                .await?;
817                Ok(CommandResult::Assume(rows))
818            }
819            CompiledCommand::Abduce(aq) => {
820                let result = uni_query::query::df_graph::locy_abduce::evaluate_abduce(
821                    aq,
822                    program,
823                    ctx,
824                    config,
825                    orch_store,
826                    stats,
827                    tracker.as_deref(),
828                )
829                .await?;
830                Ok(CommandResult::Abduce(result))
831            }
832            CompiledCommand::DeriveCommand(dc) => {
833                let affected = uni_query::query::df_graph::locy_derive::derive_command(
834                    dc, program, ctx, stats,
835                )
836                .await?;
837                Ok(CommandResult::Derive { affected })
838            }
839            CompiledCommand::Cypher(q) => {
840                let rows = ctx.execute_cypher_read(q.clone()).await?;
841                stats.queries_executed += 1;
842                Ok(CommandResult::Cypher(rows))
843            }
844        }
845    })
846}
847
848// ── Helpers ───────────────────────────────────────────────────────────────────
849
850async fn enrich_vids_with_nodes(
851    db: &Uni,
852    native_store: &uni_query::query::df_graph::DerivedStore,
853    derived: HashMap<String, Vec<Row>>,
854    graph_ctx: &Arc<uni_query::query::df_graph::GraphExecutionContext>,
855    session_ctx: &Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
856) -> HashMap<String, Vec<Row>> {
857    use arrow_schema::DataType;
858    let mut enriched = HashMap::new();
859
860    for (name, rows) in derived {
861        let vid_columns: HashSet<String> = native_store
862            .get(&name)
863            .and_then(|batches| batches.first())
864            .map(|batch| {
865                batch
866                    .schema()
867                    .fields()
868                    .iter()
869                    .filter(|f| *f.data_type() == DataType::UInt64)
870                    .map(|f| f.name().clone())
871                    .collect()
872            })
873            .unwrap_or_default();
874
875        if vid_columns.is_empty() {
876            enriched.insert(name, rows);
877            continue;
878        }
879
880        let unique_vids: HashSet<i64> = rows
881            .iter()
882            .flat_map(|row| {
883                vid_columns.iter().filter_map(|col| {
884                    if let Some(Value::Int(vid)) = row.get(col) {
885                        Some(*vid)
886                    } else {
887                        None
888                    }
889                })
890            })
891            .collect();
892
893        if unique_vids.is_empty() {
894            enriched.insert(name, rows);
895            continue;
896        }
897
898        let vids_literal = unique_vids
899            .iter()
900            .map(|v| v.to_string())
901            .collect::<Vec<_>>()
902            .join(", ");
903        let query_str = format!(
904            "MATCH (n) WHERE id(n) IN [{}] RETURN id(n) AS _vid, n",
905            vids_literal
906        );
907        let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
908        if let Ok(ast) = uni_cypher::parse(&query_str) {
909            let schema = db.schema.schema();
910            if let Ok(logical_plan) = uni_query::QueryPlanner::new(schema).plan(ast)
911                && let Ok(batches) = uni_query::query::df_graph::common::execute_subplan(
912                    &logical_plan,
913                    &HashMap::new(),
914                    &HashMap::new(),
915                    graph_ctx,
916                    session_ctx,
917                    &db.storage,
918                    &db.schema.schema(),
919                )
920                .await
921            {
922                for row in record_batches_to_locy_rows(&batches) {
923                    if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
924                        vid_to_node.insert(*vid, node.clone());
925                    }
926                }
927            }
928        }
929
930        let enriched_rows: Vec<Row> = rows
931            .into_iter()
932            .map(|row| {
933                row.into_iter()
934                    .map(|(k, v)| {
935                        if vid_columns.contains(&k)
936                            && let Value::Int(vid) = &v
937                        {
938                            let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
939                            return (k, new_v);
940                        }
941                        (k, v)
942                    })
943                    .collect()
944            })
945            .collect();
946        enriched.insert(name, enriched_rows);
947    }
948
949    enriched
950}
951
952fn build_locy_result(
953    derived: HashMap<String, Vec<Row>>,
954    command_results: Vec<CommandResult>,
955    compiled: &CompiledProgram,
956    evaluation_time: Duration,
957    mut orchestrator_stats: LocyStats,
958    warnings: Vec<RuntimeWarning>,
959    approximate_groups: HashMap<String, Vec<String>>,
960) -> LocyResult {
961    let total_facts: usize = derived.values().map(|v| v.len()).sum();
962    orchestrator_stats.strata_evaluated = compiled.strata.len();
963    orchestrator_stats.derived_nodes = total_facts;
964    orchestrator_stats.evaluation_time = evaluation_time;
965
966    LocyResult {
967        derived,
968        stats: orchestrator_stats,
969        command_results,
970        warnings,
971        approximate_groups,
972    }
973}
974
975fn native_store_to_row_store(
976    native: &uni_query::query::df_graph::DerivedStore,
977    compiled: &CompiledProgram,
978) -> RowStore {
979    let mut result = RowStore::new();
980    for name in native.rule_names() {
981        if let Some(batches) = native.get(name) {
982            let rows = record_batches_to_locy_rows(batches);
983            let rule = compiled.rule_catalog.get(name);
984            let columns: Vec<String> = rule
985                .map(|r| r.yield_schema.iter().map(|yc| yc.name.clone()).collect())
986                .unwrap_or_else(|| {
987                    rows.first()
988                        .map(|r| r.keys().cloned().collect())
989                        .unwrap_or_default()
990                });
991            result.insert(name.to_string(), RowRelation::new(columns, rows));
992        }
993    }
994    result
995}
996
997// ── Error mapping ──────────────────────────────────────────────────────────
998
999fn map_parse_error(e: uni_cypher::ParseError) -> UniError {
1000    UniError::Parse {
1001        message: format!("LocyParseError: {e}"),
1002        position: None,
1003        line: None,
1004        column: None,
1005        context: None,
1006    }
1007}
1008
1009fn map_compile_error(e: LocyCompileError) -> UniError {
1010    UniError::Query {
1011        message: format!("LocyCompileError: {e}"),
1012        query: None,
1013    }
1014}
1015
1016fn map_runtime_error(e: LocyError) -> UniError {
1017    match e {
1018        LocyError::SavepointFailed { ref message } => UniError::Transaction {
1019            message: format!("LocyRuntimeError: {message}"),
1020        },
1021        other => UniError::Query {
1022            message: format!("LocyRuntimeError: {other}"),
1023            query: None,
1024        },
1025    }
1026}
1027
1028fn map_native_df_error(e: impl std::fmt::Display) -> UniError {
1029    UniError::Query {
1030        message: format!("LocyRuntimeError: {e}"),
1031        query: None,
1032    }
1033}