Skip to main content

uni_db/api/
impl_locy.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Locy engine integration: wires the Locy compiler and native execution engine to the real database.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use arrow_array::RecordBatch;
11use async_trait::async_trait;
12use uni_common::{Result, UniError, Value};
13use uni_cypher::ast::{Expr, Pattern, Query};
14use uni_cypher::locy_ast::RuleOutput;
15use uni_locy::types::CompiledCommand;
16use uni_locy::{
17    CommandResult, CompiledProgram, DerivedFactSet, FactRow, LocyCompileError, LocyConfig,
18    LocyError, LocyStats, RuntimeWarning, compile,
19};
20use uni_query::{QueryMetrics, QueryPlanner};
21
22use crate::api::locy_result::LocyResult;
23use uni_query::query::df_graph::locy_ast_builder::build_match_return_query;
24use uni_query::query::df_graph::locy_delta::{RowRelation, RowStore, extract_cypher_conditions};
25use uni_query::query::df_graph::locy_derive::CollectedDeriveOutput;
26use uni_query::query::df_graph::locy_eval::record_batches_to_locy_rows;
27use uni_query::query::df_graph::locy_explain::ProvenanceStore;
28use uni_query::query::df_graph::{DerivedFactSource, LocyExecutionContext};
29
30/// Session-level registry for pre-compiled Locy rules.
31///
32/// Rules registered here are automatically merged into subsequent `evaluate()`
33/// calls, eliminating the need to redeclare rules across multiple evaluations
34/// (e.g., baseline, EXPLAIN, ASSUME, ABDUCE in notebooks).
35#[derive(Debug, Default, Clone)]
36pub struct LocyRuleRegistry {
37    /// Compiled rules indexed by rule name.
38    pub rules: HashMap<String, uni_locy::types::CompiledRule>,
39    /// Strata from registered programs, for execution ordering.
40    pub strata: Vec<uni_locy::types::Stratum>,
41    /// Source program texts, stored for recompilation on rule removal.
42    pub sources: Vec<String>,
43}
44
45/// Compile and register rules into an existing rule registry.
46///
47/// Shared logic used by `Uni::register_rules()` and `Session::register_rules()`.
48pub(crate) fn register_rules_on_registry(
49    registry_lock: &std::sync::RwLock<LocyRuleRegistry>,
50    program: &str,
51) -> Result<()> {
52    let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
53    let registry = registry_lock.read().unwrap();
54    let compiled = if registry.rules.is_empty() {
55        drop(registry);
56        compile(&ast).map_err(map_compile_error)?
57    } else {
58        let external_names: Vec<String> = registry.rules.keys().cloned().collect();
59        drop(registry);
60        uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)?
61    };
62    let mut registry = registry_lock.write().unwrap();
63    for (name, rule) in compiled.rule_catalog {
64        registry.rules.insert(name, rule);
65    }
66    let base_id = registry.strata.len();
67    for mut stratum in compiled.strata {
68        stratum.id += base_id;
69        stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
70        registry.strata.push(stratum);
71    }
72    registry.sources.push(program.to_string());
73    Ok(())
74}
75
76/// Evaluate a Locy program against the database with a specific rule registry.
77///
78/// This is the core evaluation path used by Session and Transaction.
79pub(crate) async fn evaluate_with_db_and_config(
80    db: &crate::api::UniInner,
81    program: &str,
82    config: &LocyConfig,
83    rule_registry: &std::sync::RwLock<LocyRuleRegistry>,
84) -> Result<LocyResult> {
85    // Compile with the given registry
86    let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
87    let external_names: Option<Vec<String>> = {
88        let registry = rule_registry.read().unwrap();
89        if registry.rules.is_empty() {
90            None
91        } else {
92            Some(registry.rules.keys().cloned().collect())
93        }
94    };
95    let mut compiled = if let Some(names) = external_names {
96        uni_locy::compile_with_external_rules(&ast, &names).map_err(map_compile_error)?
97    } else {
98        compile(&ast).map_err(map_compile_error)?
99    };
100
101    // Merge registered rules
102    {
103        let registry = rule_registry.read().unwrap();
104        if !registry.rules.is_empty() {
105            for (name, rule) in &registry.rules {
106                compiled
107                    .rule_catalog
108                    .entry(name.clone())
109                    .or_insert_with(|| rule.clone());
110            }
111            let base_id = registry.strata.len();
112            for stratum in &mut compiled.strata {
113                stratum.id += base_id;
114                stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
115            }
116            let mut merged_strata = registry.strata.clone();
117            merged_strata.append(&mut compiled.strata);
118            compiled.strata = merged_strata;
119        }
120    }
121
122    // Create a LocyEngine directly from &UniInner.
123    // Session-level: collect DERIVE output for deferred materialization.
124    // Always create an ephemeral locy_l0 for the evaluation scope — this provides:
125    // - DERIVE visibility: trailing Cypher sees DERIVE mutations
126    // - ASSUME/ABDUCE isolation: fork/restore from this buffer
127    let locy_l0 = if let Some(ref writer) = db.writer {
128        let w = writer.read().await;
129        Some(w.create_transaction_l0())
130    } else {
131        None // Read-only DB: degrade gracefully
132    };
133    let engine = LocyEngine {
134        db,
135        tx_l0_override: locy_l0.clone(),
136        locy_l0,
137        collect_derive: true,
138    };
139    engine.evaluate_compiled_with_config(compiled, config).await
140}
141
142/// Engine for evaluating Locy programs against a real database.
143pub struct LocyEngine<'a> {
144    pub(crate) db: &'a crate::api::UniInner,
145    /// When set, the engine routes reads/writes through this private L0 buffer
146    /// (commit-time serialization for transactions).
147    pub(crate) tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
148    /// Ephemeral L0 buffer for Locy evaluation scope.
149    /// Session path: ephemeral per-locy() buffer (DERIVE writes here, discarded on return).
150    /// Transaction path: same as tx_l0 (DERIVE auto-applies).
151    /// ASSUME/ABDUCE fork from here via fork_l0/restore_l0.
152    pub(crate) locy_l0: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
153    /// When true, DERIVE commands collect ASTs + data instead of executing.
154    /// Session-level evaluation sets this to true; transaction-level sets false.
155    pub(crate) collect_derive: bool,
156}
157
158impl crate::api::Uni {
159    /// Create a Locy evaluation engine bound to this database (internal).
160    ///
161    /// All external access goes through `Session::locy()` / `Session::locy_with()`.
162    #[allow(dead_code)]
163    pub(crate) fn locy(&self) -> LocyEngine<'_> {
164        LocyEngine {
165            db: &self.inner,
166            tx_l0_override: None,
167            locy_l0: None,
168            collect_derive: true,
169        }
170    }
171}
172
173impl<'a> LocyEngine<'a> {
174    /// Parse and compile a Locy program without executing it.
175    ///
176    /// If the session's rule registry contains pre-compiled rules, their names
177    /// are passed to the compiler so that IS-ref and QUERY references to
178    /// registered rules are accepted during validation.
179    pub fn compile_only(&self, program: &str) -> Result<CompiledProgram> {
180        let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
181        let registry = self.db.locy_rule_registry.read().unwrap();
182        if registry.rules.is_empty() {
183            drop(registry);
184            compile(&ast).map_err(map_compile_error)
185        } else {
186            let external_names: Vec<String> = registry.rules.keys().cloned().collect();
187            drop(registry);
188            uni_locy::compile_with_external_rules(&ast, &external_names).map_err(map_compile_error)
189        }
190    }
191
192    /// Compile and register a Locy program's rules for reuse.
193    ///
194    /// Rules registered here persist within the database session and are
195    /// automatically merged into subsequent `evaluate()` calls, so notebooks
196    /// can define rules once and run QUERY, EXPLAIN, ASSUME, ABDUCE without
197    /// redeclaring the full rule set each time.
198    pub fn register(&self, program: &str) -> Result<()> {
199        let compiled = self.compile_only(program)?;
200        let mut registry = self.db.locy_rule_registry.write().unwrap();
201        for (name, rule) in compiled.rule_catalog {
202            registry.rules.insert(name, rule);
203        }
204        // Merge strata, assigning new IDs to avoid collisions.
205        let base_id = registry.strata.len();
206        for mut stratum in compiled.strata {
207            stratum.id += base_id;
208            stratum.depends_on = stratum.depends_on.iter().map(|d| base_id + d).collect();
209            registry.strata.push(stratum);
210        }
211        Ok(())
212    }
213
214    /// Clear all registered Locy rules from the session.
215    pub fn clear_registry(&self) {
216        let mut registry = self.db.locy_rule_registry.write().unwrap();
217        registry.rules.clear();
218        registry.strata.clear();
219    }
220
221    /// Parse, compile, and evaluate a Locy program with default config.
222    pub async fn evaluate(&self, program: &str) -> Result<LocyResult> {
223        self.evaluate_with_config(program, &LocyConfig::default())
224            .await
225    }
226
227    /// Start building a Locy evaluation with fluent parameter binding.
228    ///
229    /// Mirrors `db.query_with(cypher).param(…).fetch_all()` for Cypher.
230    ///
231    /// # Examples
232    ///
233    /// ```no_run
234    /// # use uni_db::Uni;
235    /// # async fn example(db: &Uni) -> uni_db::Result<()> {
236    /// let result = db.session()
237    ///     .locy_with("CREATE RULE ep AS MATCH (e:Episode) WHERE e.agent_id = $aid YIELD KEY e")
238    ///     .param("aid", "agent-123")
239    ///     .run()
240    ///     .await?;
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub fn evaluate_with(&self, program: &str) -> crate::api::locy_builder::InnerLocyBuilder<'_> {
245        crate::api::locy_builder::InnerLocyBuilder::new(self.db, program)
246    }
247
248    /// Convenience wrapper for EXPLAIN RULE commands.
249    pub async fn explain(&self, program: &str) -> Result<LocyResult> {
250        self.evaluate(program).await
251    }
252
253    /// Parse, compile, and evaluate a Locy program with custom config.
254    ///
255    /// If rules were previously registered via `register()`, they are
256    /// automatically merged into the compiled program before execution.
257    pub async fn evaluate_with_config(
258        &self,
259        program: &str,
260        config: &LocyConfig,
261    ) -> Result<LocyResult> {
262        let mut compiled = self.compile_only(program)?;
263
264        // Merge registered rules into the compiled program.
265        {
266            let registry = self.db.locy_rule_registry.read().unwrap();
267            if !registry.rules.is_empty() {
268                for (name, rule) in &registry.rules {
269                    compiled
270                        .rule_catalog
271                        .entry(name.clone())
272                        .or_insert_with(|| rule.clone());
273                }
274                let base_id = registry.strata.len();
275                for stratum in &mut compiled.strata {
276                    stratum.id += base_id;
277                    stratum.depends_on = stratum.depends_on.iter().map(|d| d + base_id).collect();
278                }
279                let mut merged_strata = registry.strata.clone();
280                merged_strata.append(&mut compiled.strata);
281                compiled.strata = merged_strata;
282            }
283        }
284
285        self.evaluate_compiled_with_config(compiled, config).await
286    }
287
288    /// Evaluate an already-compiled Locy program with custom config.
289    ///
290    /// This is the core execution path: it takes a `CompiledProgram` (with any
291    /// registry merges already applied) and runs it through planning, execution,
292    /// and command dispatch.
293    pub async fn evaluate_compiled_with_config(
294        &self,
295        compiled: CompiledProgram,
296        config: &LocyConfig,
297    ) -> Result<LocyResult> {
298        let start = Instant::now();
299
300        // Capture current version for staleness detection in DerivedFactSet
301        let evaluated_at_version = if self.collect_derive {
302            if let Some(ref w) = self.db.writer {
303                w.read()
304                    .await
305                    .l0_manager
306                    .get_current()
307                    .read()
308                    .current_version
309            } else {
310                0
311            }
312        } else {
313            0
314        };
315
316        // 1. Build logical plan
317        let schema = self.db.schema.schema();
318        let query_planner = uni_query::QueryPlanner::new(schema);
319        let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
320        let logical = plan_builder
321            .build_program_plan(
322                &compiled,
323                config.max_iterations,
324                config.timeout,
325                config.max_derived_bytes,
326                config.deterministic_best_by,
327                config.strict_probability_domain,
328                config.probability_epsilon,
329                config.exact_probability,
330                config.max_bdd_variables,
331                config.top_k_proofs,
332            )
333            .map_err(|e| UniError::Query {
334                message: format!("LocyPlanBuildError: {e}"),
335                query: None,
336            })?;
337
338        // 2. Create executor + physical planner
339        let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
340        df_executor.set_config(self.db.config.clone());
341        if let Some(ref w) = self.db.writer {
342            df_executor.set_writer(w.clone());
343        }
344        df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
345        df_executor.set_procedure_registry(self.db.procedure_registry.clone());
346        if let Ok(reg) = self.db.custom_functions.read()
347            && !reg.is_empty()
348        {
349            df_executor.set_custom_functions(std::sync::Arc::new(reg.clone()));
350        }
351
352        let (session_ctx, planner, _prop_mgr) = df_executor
353            .create_datafusion_planner(&self.db.properties, &config.params)
354            .await
355            .map_err(map_native_df_error)?;
356
357        // 3. Physical plan
358        let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
359
360        // 4. Create tracker for EXPLAIN commands or shared-proof detection
361        let has_explain = compiled
362            .commands
363            .iter()
364            .any(|c| matches!(c, CompiledCommand::ExplainRule(_)));
365        let has_prob_fold = compiled.strata.iter().any(|s| {
366            s.rules.iter().any(|r| {
367                r.clauses.iter().any(|c| {
368                    c.fold.iter().any(|f| {
369                        if let uni_cypher::ast::Expr::FunctionCall { name, .. } = &f.aggregate {
370                            matches!(name.to_uppercase().as_str(), "MNOR" | "MPROD")
371                        } else {
372                            false
373                        }
374                    })
375                })
376            })
377        });
378        let needs_tracker = has_explain || has_prob_fold;
379        let tracker: Option<Arc<uni_query::query::df_graph::ProvenanceStore>> = if needs_tracker {
380            Some(Arc::new(uni_query::query::df_graph::ProvenanceStore::new()))
381        } else {
382            None
383        };
384
385        let (
386            derived_store_slot,
387            iteration_counts_slot,
388            peak_memory_slot,
389            warnings_slot,
390            approximate_slot,
391            command_results_slot,
392            timeout_flag,
393        ) = if let Some(program_exec) = exec_plan
394            .as_any()
395            .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>(
396        ) {
397            if let Some(ref t) = tracker {
398                program_exec.set_derivation_tracker(Arc::clone(t));
399            }
400            (
401                program_exec.derived_store_slot(),
402                program_exec.iteration_counts_slot(),
403                program_exec.peak_memory_slot(),
404                program_exec.warnings_slot(),
405                program_exec.approximate_slot(),
406                program_exec.command_results_slot(),
407                program_exec.timeout_flag(),
408            )
409        } else {
410            (
411                Arc::new(std::sync::RwLock::new(None)),
412                Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
413                Arc::new(std::sync::RwLock::new(0usize)),
414                Arc::new(std::sync::RwLock::new(Vec::new())),
415                Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
416                Arc::new(std::sync::RwLock::new(Vec::new())),
417                Arc::new(std::sync::atomic::AtomicBool::new(false)),
418            )
419        };
420
421        // 5. Execute strata
422        let _stats_batches = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
423            .await
424            .map_err(map_native_df_error)?;
425
426        // 6. Extract native DerivedStore
427        let native_store = derived_store_slot
428            .write()
429            .unwrap()
430            .take()
431            .unwrap_or_default();
432
433        // 7. Convert native DerivedStore → row-based RowStore for SLG/EXPLAIN
434        let mut orch_store = native_store_to_row_store(&native_store, &compiled);
435
436        // 7b. Enrich VID integers → full Node objects so SLG/QUERY can access
437        //     node properties (d.name etc.) and IS-ref joins work correctly
438        //     across FOLD-rule boundaries.
439        {
440            let orch_rows: HashMap<String, Vec<FactRow>> = orch_store
441                .iter()
442                .map(|(k, v)| (k.clone(), v.rows.clone()))
443                .collect();
444            let enriched_rows = enrich_vids_with_nodes(
445                self.db,
446                &native_store,
447                orch_rows,
448                planner.graph_ctx(),
449                planner.session_ctx(),
450            )
451            .await;
452            for (name, rows) in enriched_rows {
453                if let Some(rel) = orch_store.get_mut(&name) {
454                    rel.rows = rows;
455                }
456            }
457        }
458
459        // 8. Dispatch commands via native trait interfaces
460        let native_ctx = NativeExecutionAdapter::new(
461            self.db,
462            &native_store,
463            &compiled,
464            planner.graph_ctx().clone(),
465            planner.session_ctx().clone(),
466            config.params.clone(),
467            self.tx_l0_override.clone(),
468        );
469        // Propagate locy_l0 to the adapter for DERIVE/ASSUME/ABDUCE scoping.
470        *native_ctx.locy_l0.lock().unwrap() = self.locy_l0.clone();
471        let mut locy_stats = LocyStats {
472            total_iterations: iteration_counts_slot
473                .read()
474                .map(|c| c.values().sum::<usize>())
475                .unwrap_or(0),
476            peak_memory_bytes: peak_memory_slot.read().map(|v| *v).unwrap_or(0),
477            ..LocyStats::default()
478        };
479        let approx_for_explain = approximate_slot
480            .read()
481            .map(|a| a.clone())
482            .unwrap_or_default();
483        // Collect inline results (QUERY, Cypher) already executed by run_program()
484        let inline_map: HashMap<usize, CommandResult> =
485            command_results_slot.write().unwrap().drain(..).collect();
486
487        let mut command_results = Vec::new();
488        let mut collected_derives: Vec<CollectedDeriveOutput> = Vec::new();
489        let timed_out_early = timeout_flag.load(std::sync::atomic::Ordering::Relaxed);
490        // Skip command dispatch when evaluation timed out — the partial derived
491        // store may be incomplete and SLG/QUERY would hit the expired timeout.
492        if !timed_out_early {
493            for (cmd_idx, cmd) in compiled.commands.iter().enumerate() {
494                if let Some(result) = inline_map.get(&cmd_idx) {
495                    // Already executed inline by run_program
496                    command_results.push(result.clone());
497                    continue;
498                }
499                let result = dispatch_native_command(
500                    cmd,
501                    &compiled,
502                    &native_ctx,
503                    config,
504                    &mut orch_store,
505                    &mut locy_stats,
506                    tracker.clone(),
507                    start,
508                    &approx_for_explain,
509                    self.collect_derive,
510                    &mut collected_derives,
511                )
512                .await
513                .map_err(map_runtime_error)?;
514                command_results.push(result);
515            }
516        }
517
518        let evaluation_time = start.elapsed();
519
520        // 9. Build derived map, enrich VID columns with full nodes
521        let mut base_derived: HashMap<String, Vec<FactRow>> = native_store
522            .rule_names()
523            .filter_map(|name| {
524                native_store
525                    .get(name)
526                    .map(|batches| (name.to_string(), record_batches_to_locy_rows(batches)))
527            })
528            .collect();
529
530        // Stamp _approximate on facts in rules that had BDD fallback groups.
531        let approximate_groups = approximate_slot
532            .read()
533            .map(|a| a.clone())
534            .unwrap_or_default();
535        for (rule_name, groups) in &approximate_groups {
536            if !groups.is_empty()
537                && let Some(rows) = base_derived.get_mut(rule_name)
538            {
539                for row in rows.iter_mut() {
540                    row.insert("_approximate".to_string(), Value::Bool(true));
541                }
542            }
543        }
544
545        let enriched_derived = enrich_vids_with_nodes(
546            self.db,
547            &native_store,
548            base_derived,
549            planner.graph_ctx(),
550            planner.session_ctx(),
551        )
552        .await;
553
554        // 10. Build DerivedFactSet from collected derives (session path only)
555        let derived_fact_set = if !collected_derives.is_empty() {
556            let mut all_vertices = HashMap::new();
557            let mut all_edges = Vec::new();
558            let mut all_queries = Vec::new();
559            for output in collected_derives {
560                for (label, verts) in output.vertices {
561                    all_vertices
562                        .entry(label)
563                        .or_insert_with(Vec::new)
564                        .extend(verts);
565                }
566                all_edges.extend(output.edges);
567                all_queries.extend(output.queries);
568            }
569            Some(DerivedFactSet {
570                vertices: all_vertices,
571                edges: all_edges,
572                stats: locy_stats.clone(),
573                evaluated_at_version,
574                mutation_queries: all_queries,
575            })
576        } else {
577            None
578        };
579
580        // 11. Build final LocyResult
581        let warnings = warnings_slot.read().map(|w| w.clone()).unwrap_or_default();
582        let timed_out = timeout_flag.load(std::sync::atomic::Ordering::Relaxed);
583        Ok(build_locy_result(
584            enriched_derived,
585            command_results,
586            &compiled,
587            evaluation_time,
588            locy_stats,
589            warnings,
590            approximate_groups,
591            derived_fact_set,
592            timed_out,
593        ))
594    }
595
596    /// Run only the fixpoint strata (no commands) via the native DataFusion path.
597    ///
598    /// Used by `re_evaluate_strata()` so that savepoint-scoped mutations from
599    /// ASSUME/ABDUCE hypothetical states are visible — the `Executor` is configured
600    /// with `self.db.writer`, which holds the active transaction handle.
601    async fn run_strata_native(
602        &self,
603        compiled: &CompiledProgram,
604        config: &LocyConfig,
605    ) -> Result<uni_query::query::df_graph::DerivedStore> {
606        let schema = self.db.schema.schema();
607        let query_planner = uni_query::QueryPlanner::new(schema);
608        let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
609        let logical = plan_builder
610            .build_program_plan(
611                compiled,
612                config.max_iterations,
613                config.timeout,
614                config.max_derived_bytes,
615                config.deterministic_best_by,
616                config.strict_probability_domain,
617                config.probability_epsilon,
618                config.exact_probability,
619                config.max_bdd_variables,
620                config.top_k_proofs,
621            )
622            .map_err(|e| UniError::Query {
623                message: format!("LocyPlanBuildError: {e}"),
624                query: None,
625            })?;
626
627        let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
628        df_executor.set_config(self.db.config.clone());
629        if let Some(ref w) = self.db.writer {
630            df_executor.set_writer(w.clone());
631        }
632        // Pass the tx_l0_override so the fixpoint planner sees uncommitted mutations
633        // (ASSUME/ABDUCE hypothetical state, session DERIVE mutations, etc.)
634        if let Some(ref l0) = self.tx_l0_override {
635            df_executor.set_transaction_l0(l0.clone());
636        }
637        df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
638        df_executor.set_procedure_registry(self.db.procedure_registry.clone());
639
640        let (session_ctx, planner, _) = df_executor
641            .create_datafusion_planner(&self.db.properties, &config.params)
642            .await
643            .map_err(map_native_df_error)?;
644        let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
645
646        let derived_store_slot = if let Some(program_exec) =
647            exec_plan
648                .as_any()
649                .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>()
650        {
651            program_exec.derived_store_slot()
652        } else {
653            Arc::new(std::sync::RwLock::new(None))
654        };
655
656        let _ = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
657            .await
658            .map_err(map_native_df_error)?;
659        Ok(derived_store_slot
660            .write()
661            .unwrap()
662            .take()
663            .unwrap_or_default())
664    }
665}
666
667// ── NativeExecutionAdapter — implements DerivedFactSource + LocyExecutionContext ─
668
669struct NativeExecutionAdapter<'a> {
670    db: &'a crate::api::UniInner,
671    native_store: &'a uni_query::query::df_graph::DerivedStore,
672    compiled: &'a CompiledProgram,
673    /// Execution contexts from the fixpoint planner for columnar query execution.
674    graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
675    session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
676    /// Query parameters threaded from LocyConfig; passed to execute_subplan so
677    /// that $param references in rule MATCH WHERE clauses are resolved.
678    params: HashMap<String, Value>,
679    /// Private transaction L0 override for commit-time serialization.
680    tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
681    /// Locy-scoped L0 buffer. DERIVE mutations go here. ASSUME/ABDUCE fork from here.
682    /// Protected by std::sync::Mutex for interior mutability (fork/restore swap the Arc).
683    locy_l0: std::sync::Mutex<Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>>,
684    /// Stack of saved L0 states for nested fork/restore (ASSUME inside ASSUME).
685    l0_save_stack:
686        std::sync::Mutex<Vec<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>>,
687}
688
689impl<'a> NativeExecutionAdapter<'a> {
690    fn new(
691        db: &'a crate::api::UniInner,
692        native_store: &'a uni_query::query::df_graph::DerivedStore,
693        compiled: &'a CompiledProgram,
694        graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
695        session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
696        params: HashMap<String, Value>,
697        tx_l0_override: Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
698    ) -> Self {
699        Self {
700            db,
701            native_store,
702            compiled,
703            graph_ctx,
704            session_ctx,
705            params,
706            tx_l0_override,
707            locy_l0: std::sync::Mutex::new(None),
708            l0_save_stack: std::sync::Mutex::new(Vec::new()),
709        }
710    }
711
712    /// Execute a Query AST via execute_subplan, reusing the fixpoint contexts.
713    async fn execute_query_ast(
714        &self,
715        ast: Query,
716    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
717        let schema = self.db.schema.schema();
718        let logical_plan =
719            QueryPlanner::new(schema)
720                .plan(ast)
721                .map_err(|e| LocyError::ExecutorError {
722                    message: e.to_string(),
723                })?;
724        uni_query::query::df_graph::common::execute_subplan(
725            &logical_plan,
726            &self.params,
727            &HashMap::new(),
728            &self.graph_ctx,
729            &self.session_ctx,
730            &self.db.storage,
731            &self.db.schema.schema(),
732        )
733        .await
734        .map_err(|e| LocyError::ExecutorError {
735            message: e.to_string(),
736        })
737    }
738}
739
740#[async_trait]
741impl DerivedFactSource for NativeExecutionAdapter<'_> {
742    fn lookup_derived(&self, rule_name: &str) -> std::result::Result<Vec<FactRow>, LocyError> {
743        let batches = self
744            .native_store
745            .get(rule_name)
746            .map(|v| v.as_slice())
747            .unwrap_or(&[]);
748        Ok(record_batches_to_locy_rows(batches))
749    }
750
751    fn lookup_derived_batches(
752        &self,
753        rule_name: &str,
754    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
755        Ok(self
756            .native_store
757            .get(rule_name)
758            .map(|v| v.to_vec())
759            .unwrap_or_default())
760    }
761
762    async fn execute_pattern(
763        &self,
764        pattern: &Pattern,
765        where_conditions: &[Expr],
766    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
767        let query = build_match_return_query(pattern, where_conditions);
768        let schema = self.db.schema.schema();
769        let logical_plan =
770            QueryPlanner::new(schema)
771                .plan(query)
772                .map_err(|e| LocyError::ExecutorError {
773                    message: e.to_string(),
774                })?;
775
776        // When a locy_l0 or transaction L0 is active, the stored graph_ctx may not
777        // include the local L0 buffer. Rebuild a temporary context that includes it
778        // so pattern queries see the uncommitted/hypothetical state.
779        let tx_l0_for_ctx = self
780            .locy_l0
781            .lock()
782            .unwrap()
783            .clone()
784            .or_else(|| self.tx_l0_override.clone());
785        let transaction_ctx: Option<Arc<uni_query::query::df_graph::GraphExecutionContext>> =
786            if let Some(tx_l0) = tx_l0_for_ctx {
787                if let Some(writer_arc) = &self.db.writer {
788                    if let Ok(writer) = writer_arc.try_read() {
789                        let l0_ctx = uni_query::query::df_graph::L0Context {
790                            current_l0: Some(writer.l0_manager.get_current()),
791                            transaction_l0: Some(tx_l0),
792                            pending_flush_l0s: writer.l0_manager.get_pending_flush(),
793                        };
794                        Some(Arc::new(
795                            uni_query::query::df_graph::GraphExecutionContext::with_l0_context(
796                                self.db.storage.clone(),
797                                l0_ctx,
798                                self.graph_ctx.property_manager().clone(),
799                            ),
800                        ))
801                    } else {
802                        None
803                    }
804                } else {
805                    None
806                }
807            } else {
808                None
809            };
810
811        let effective_ctx = transaction_ctx.as_ref().unwrap_or(&self.graph_ctx);
812
813        // Use the fixpoint planner's execution contexts directly via execute_subplan.
814        uni_query::query::df_graph::common::execute_subplan(
815            &logical_plan,
816            &self.params,
817            &HashMap::new(),
818            effective_ctx,
819            &self.session_ctx,
820            &self.db.storage,
821            &self.db.schema.schema(),
822        )
823        .await
824        .map_err(|e| LocyError::ExecutorError {
825            message: e.to_string(),
826        })
827    }
828}
829
830#[async_trait]
831impl LocyExecutionContext for NativeExecutionAdapter<'_> {
832    async fn lookup_derived_enriched(
833        &self,
834        rule_name: &str,
835    ) -> std::result::Result<Vec<FactRow>, LocyError> {
836        use arrow_schema::DataType;
837
838        if let Some(rule) = self.compiled.rule_catalog.get(rule_name) {
839            let is_derive_rule = rule
840                .clauses
841                .iter()
842                .all(|c| matches!(c.output, RuleOutput::Derive(_)));
843            if is_derive_rule {
844                let mut all_rows = Vec::new();
845                for clause in &rule.clauses {
846                    let cypher_conds = extract_cypher_conditions(&clause.where_conditions);
847                    let raw_batches = self
848                        .execute_pattern(&clause.match_pattern, &cypher_conds)
849                        .await?;
850                    all_rows.extend(record_batches_to_locy_rows(&raw_batches));
851                }
852                return Ok(all_rows);
853            }
854        }
855
856        let batches = self
857            .native_store
858            .get(rule_name)
859            .map(|v| v.as_slice())
860            .unwrap_or(&[]);
861        let rows = record_batches_to_locy_rows(batches);
862
863        let vid_columns: HashSet<String> = batches
864            .first()
865            .map(|batch| {
866                batch
867                    .schema()
868                    .fields()
869                    .iter()
870                    .filter(|f| *f.data_type() == DataType::UInt64)
871                    .map(|f| f.name().clone())
872                    .collect()
873            })
874            .unwrap_or_default();
875
876        if vid_columns.is_empty() {
877            return Ok(rows);
878        }
879
880        let unique_vids: HashSet<i64> = rows
881            .iter()
882            .flat_map(|row| {
883                vid_columns.iter().filter_map(|col| {
884                    if let Some(Value::Int(vid)) = row.get(col) {
885                        Some(*vid)
886                    } else {
887                        None
888                    }
889                })
890            })
891            .collect();
892
893        if unique_vids.is_empty() {
894            return Ok(rows);
895        }
896
897        let vids_literal = unique_vids
898            .iter()
899            .map(|v| v.to_string())
900            .collect::<Vec<_>>()
901            .join(", ");
902        let query_str =
903            format!("MATCH (n) WHERE id(n) IN [{vids_literal}] RETURN id(n) AS _vid, n");
904        let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
905        if let Ok(ast) = uni_cypher::parse(&query_str)
906            && let Ok(batches) = self.execute_query_ast(ast).await
907        {
908            for row in record_batches_to_locy_rows(&batches) {
909                if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
910                    vid_to_node.insert(*vid, node.clone());
911                }
912            }
913        }
914
915        Ok(rows
916            .into_iter()
917            .map(|row| {
918                row.into_iter()
919                    .map(|(k, v)| {
920                        if vid_columns.contains(&k)
921                            && let Value::Int(vid) = &v
922                        {
923                            let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
924                            return (k, new_v);
925                        }
926                        (k, v)
927                    })
928                    .collect()
929            })
930            .collect())
931    }
932
933    async fn execute_cypher_read(
934        &self,
935        ast: Query,
936    ) -> std::result::Result<Vec<FactRow>, LocyError> {
937        // Route through locy_l0 so trailing Cypher sees DERIVE/ASSUME mutations.
938        // locy_l0 is the "active" L0 for this evaluation scope.
939        let active_l0 = self.locy_l0.lock().unwrap().clone();
940        let result = if let Some(ref l0) = active_l0 {
941            self.db
942                .execute_ast_internal_with_tx_l0(
943                    ast,
944                    "<locy>",
945                    HashMap::new(),
946                    self.db.config.clone(),
947                    l0.clone(),
948                )
949                .await
950        } else if let Some(ref tx_l0) = self.tx_l0_override {
951            self.db
952                .execute_ast_internal_with_tx_l0(
953                    ast,
954                    "<locy>",
955                    HashMap::new(),
956                    self.db.config.clone(),
957                    tx_l0.clone(),
958                )
959                .await
960        } else {
961            self.db
962                .execute_ast_internal(ast, "<locy>", HashMap::new(), self.db.config.clone())
963                .await
964        }
965        .map_err(|e| LocyError::ExecutorError {
966            message: e.to_string(),
967        })?;
968        Ok(result
969            .into_rows()
970            .into_iter()
971            .map(|row| {
972                let cols: Vec<String> = row.columns().to_vec();
973                cols.into_iter().zip(row.into_values()).collect()
974            })
975            .collect())
976    }
977
978    async fn execute_mutation(
979        &self,
980        ast: Query,
981        params: HashMap<String, Value>,
982    ) -> std::result::Result<usize, LocyError> {
983        // Route through locy_l0 for all mutations within this evaluation scope.
984        let active_l0 = self.locy_l0.lock().unwrap().clone();
985        if let Some(ref l0) = active_l0 {
986            let before = l0.read().mutation_count;
987            self.db
988                .execute_ast_internal_with_tx_l0(
989                    ast,
990                    "<locy>",
991                    params,
992                    self.db.config.clone(),
993                    l0.clone(),
994                )
995                .await
996                .map_err(|e| LocyError::ExecutorError {
997                    message: e.to_string(),
998                })?;
999            let after = l0.read().mutation_count;
1000            return Ok(after.saturating_sub(before));
1001        }
1002        if let Some(ref tx_l0) = self.tx_l0_override {
1003            let before = tx_l0.read().mutation_count;
1004            self.db
1005                .execute_ast_internal_with_tx_l0(
1006                    ast,
1007                    "<locy>",
1008                    params,
1009                    self.db.config.clone(),
1010                    tx_l0.clone(),
1011                )
1012                .await
1013                .map_err(|e| LocyError::ExecutorError {
1014                    message: e.to_string(),
1015                })?;
1016            let after = tx_l0.read().mutation_count;
1017            return Ok(after.saturating_sub(before));
1018        }
1019        // Standard path: mutations go through writer's global L0
1020        let before = self.db.get_mutation_count().await;
1021        self.db
1022            .execute_ast_internal(ast, "<locy>", params, self.db.config.clone())
1023            .await
1024            .map_err(|e| LocyError::ExecutorError {
1025                message: e.to_string(),
1026            })?;
1027        let after = self.db.get_mutation_count().await;
1028        Ok(after.saturating_sub(before))
1029    }
1030
1031    async fn fork_l0(&self) -> std::result::Result<(), LocyError> {
1032        let mut guard = self.locy_l0.lock().unwrap();
1033        let current = guard.as_ref().ok_or_else(|| LocyError::SavepointFailed {
1034            message: "no active Locy L0 to fork".into(),
1035        })?;
1036        // Clone the current L0 buffer (deep copy — forked WAL is None)
1037        let cloned = Arc::new(parking_lot::RwLock::new(current.read().clone()));
1038        // Save the original, replace with the clone for hypothetical mutations
1039        let previous = guard.replace(cloned).unwrap();
1040        self.l0_save_stack.lock().unwrap().push(previous);
1041        Ok(())
1042    }
1043
1044    async fn restore_l0(&self) -> std::result::Result<(), LocyError> {
1045        let saved =
1046            self.l0_save_stack
1047                .lock()
1048                .unwrap()
1049                .pop()
1050                .ok_or_else(|| LocyError::SavepointFailed {
1051                    message: "no saved L0 to restore".into(),
1052                })?;
1053        let mut guard = self.locy_l0.lock().unwrap();
1054        *guard = Some(saved);
1055        Ok(())
1056    }
1057
1058    async fn re_evaluate_strata(
1059        &self,
1060        program: &CompiledProgram,
1061        config: &LocyConfig,
1062    ) -> std::result::Result<RowStore, LocyError> {
1063        let strata_only = CompiledProgram {
1064            strata: program.strata.clone(),
1065            rule_catalog: program.rule_catalog.clone(),
1066            warnings: vec![],
1067            commands: vec![],
1068        };
1069        // Pass the current locy_l0 so re-evaluation sees hypothetical state.
1070        let locy_l0 = self.locy_l0.lock().unwrap().clone();
1071        let engine = LocyEngine {
1072            db: self.db,
1073            tx_l0_override: locy_l0.clone(),
1074            locy_l0,
1075            collect_derive: false,
1076        };
1077        let native_store = engine
1078            .run_strata_native(&strata_only, config)
1079            .await
1080            .map_err(|e| LocyError::ExecutorError {
1081                message: e.to_string(),
1082            })?;
1083        let mut store = native_store_to_row_store(&native_store, program);
1084
1085        // Enrich VID integers → full Node objects so SLG/QUERY inside
1086        // ASSUME/ABDUCE can access node properties and IS-ref joins work.
1087        let store_rows: HashMap<String, Vec<FactRow>> = store
1088            .iter()
1089            .map(|(k, v)| (k.clone(), v.rows.clone()))
1090            .collect();
1091        let enriched = enrich_vids_with_nodes(
1092            self.db,
1093            &native_store,
1094            store_rows,
1095            &self.graph_ctx,
1096            &self.session_ctx,
1097        )
1098        .await;
1099        for (name, rows) in enriched {
1100            if let Some(rel) = store.get_mut(&name) {
1101                rel.rows = rows;
1102            }
1103        }
1104
1105        Ok(store)
1106    }
1107}
1108
1109// ── Native command dispatch ───────────────────────────────────────────────────
1110
1111#[allow(clippy::too_many_arguments)]
1112fn dispatch_native_command<'a>(
1113    cmd: &'a CompiledCommand,
1114    program: &'a CompiledProgram,
1115    ctx: &'a NativeExecutionAdapter<'a>,
1116    config: &'a LocyConfig,
1117    orch_store: &'a mut RowStore,
1118    stats: &'a mut LocyStats,
1119    tracker: Option<Arc<ProvenanceStore>>,
1120    start: Instant,
1121    approximate_groups: &'a HashMap<String, Vec<String>>,
1122    collect_derive: bool,
1123    collected_derives: &'a mut Vec<CollectedDeriveOutput>,
1124) -> std::pin::Pin<
1125    Box<
1126        dyn std::future::Future<Output = std::result::Result<CommandResult, LocyError>> + Send + 'a,
1127    >,
1128> {
1129    Box::pin(async move {
1130        match cmd {
1131            CompiledCommand::GoalQuery(gq) => {
1132                let rows = uni_query::query::df_graph::locy_query::evaluate_query(
1133                    gq, program, ctx, config, orch_store, stats, start,
1134                )
1135                .await?;
1136                Ok(CommandResult::Query(rows))
1137            }
1138            CompiledCommand::ExplainRule(eq) => {
1139                let node = uni_query::query::df_graph::locy_explain::explain_rule(
1140                    eq,
1141                    program,
1142                    ctx,
1143                    config,
1144                    orch_store,
1145                    stats,
1146                    tracker.as_deref(),
1147                    Some(approximate_groups),
1148                )
1149                .await?;
1150                Ok(CommandResult::Explain(node))
1151            }
1152            CompiledCommand::Assume(ca) => {
1153                let rows = uni_query::query::df_graph::locy_assume::evaluate_assume(
1154                    ca, program, ctx, config, stats,
1155                )
1156                .await?;
1157                Ok(CommandResult::Assume(rows))
1158            }
1159            CompiledCommand::Abduce(aq) => {
1160                let result = uni_query::query::df_graph::locy_abduce::evaluate_abduce(
1161                    aq,
1162                    program,
1163                    ctx,
1164                    config,
1165                    orch_store,
1166                    stats,
1167                    tracker.as_deref(),
1168                )
1169                .await?;
1170                Ok(CommandResult::Abduce(result))
1171            }
1172            CompiledCommand::DeriveCommand(dc) => {
1173                if collect_derive {
1174                    // Session path: collect ASTs + data for deferred materialization.
1175                    let output = uni_query::query::df_graph::locy_derive::collect_derive_facts(
1176                        dc, program, ctx,
1177                    )
1178                    .await?;
1179                    let affected = output.affected;
1180
1181                    // Replay mutations to the ephemeral L0 so that subsequent
1182                    // trailing Cypher commands can read the derived edges.
1183                    // Guard: skip when no L0 exists (read-only DB).
1184                    // Replay mutations to the ephemeral L0 so that subsequent
1185                    // trailing Cypher commands can read the derived edges.
1186                    // Guard: skip when no L0 exists (read-only DB).
1187                    if ctx.tx_l0_override.is_some() {
1188                        for query in &output.queries {
1189                            ctx.execute_mutation(query.clone(), HashMap::new()).await?;
1190                        }
1191                    }
1192
1193                    collected_derives.push(output);
1194                    Ok(CommandResult::Derive { affected })
1195                } else {
1196                    // Transaction path: auto-apply mutations
1197                    let affected = uni_query::query::df_graph::locy_derive::derive_command(
1198                        dc, program, ctx, stats,
1199                    )
1200                    .await?;
1201                    Ok(CommandResult::Derive { affected })
1202                }
1203            }
1204            CompiledCommand::Cypher(q) => {
1205                let rows = ctx.execute_cypher_read(q.clone()).await?;
1206                stats.queries_executed += 1;
1207                Ok(CommandResult::Cypher(rows))
1208            }
1209        }
1210    })
1211}
1212
1213// ── Helpers ───────────────────────────────────────────────────────────────────
1214
1215async fn enrich_vids_with_nodes(
1216    db: &crate::api::UniInner,
1217    native_store: &uni_query::query::df_graph::DerivedStore,
1218    derived: HashMap<String, Vec<FactRow>>,
1219    graph_ctx: &Arc<uni_query::query::df_graph::GraphExecutionContext>,
1220    session_ctx: &Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
1221) -> HashMap<String, Vec<FactRow>> {
1222    use arrow_schema::DataType;
1223    let mut enriched = HashMap::new();
1224
1225    for (name, rows) in derived {
1226        let vid_columns: HashSet<String> = native_store
1227            .get(&name)
1228            .and_then(|batches| batches.first())
1229            .map(|batch| {
1230                batch
1231                    .schema()
1232                    .fields()
1233                    .iter()
1234                    .filter(|f| *f.data_type() == DataType::UInt64)
1235                    .map(|f| f.name().clone())
1236                    .collect()
1237            })
1238            .unwrap_or_default();
1239
1240        if vid_columns.is_empty() {
1241            enriched.insert(name, rows);
1242            continue;
1243        }
1244
1245        let unique_vids: HashSet<i64> = rows
1246            .iter()
1247            .flat_map(|row| {
1248                vid_columns.iter().filter_map(|col| {
1249                    if let Some(Value::Int(vid)) = row.get(col) {
1250                        Some(*vid)
1251                    } else {
1252                        None
1253                    }
1254                })
1255            })
1256            .collect();
1257
1258        if unique_vids.is_empty() {
1259            enriched.insert(name, rows);
1260            continue;
1261        }
1262
1263        let vids_literal = unique_vids
1264            .iter()
1265            .map(|v| v.to_string())
1266            .collect::<Vec<_>>()
1267            .join(", ");
1268        let query_str = format!(
1269            "MATCH (n) WHERE id(n) IN [{}] RETURN id(n) AS _vid, n",
1270            vids_literal
1271        );
1272        let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
1273        if let Ok(ast) = uni_cypher::parse(&query_str) {
1274            let schema = db.schema.schema();
1275            if let Ok(logical_plan) = uni_query::QueryPlanner::new(schema).plan(ast)
1276                && let Ok(batches) = uni_query::query::df_graph::common::execute_subplan(
1277                    &logical_plan,
1278                    &HashMap::new(),
1279                    &HashMap::new(),
1280                    graph_ctx,
1281                    session_ctx,
1282                    &db.storage,
1283                    &db.schema.schema(),
1284                )
1285                .await
1286            {
1287                for row in record_batches_to_locy_rows(&batches) {
1288                    if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
1289                        vid_to_node.insert(*vid, node.clone());
1290                    }
1291                }
1292            }
1293        }
1294
1295        let enriched_rows: Vec<FactRow> = rows
1296            .into_iter()
1297            .map(|row| {
1298                row.into_iter()
1299                    .map(|(k, v)| {
1300                        if vid_columns.contains(&k)
1301                            && let Value::Int(vid) = &v
1302                        {
1303                            let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
1304                            return (k, new_v);
1305                        }
1306                        (k, v)
1307                    })
1308                    .collect()
1309            })
1310            .collect();
1311        enriched.insert(name, enriched_rows);
1312    }
1313
1314    enriched
1315}
1316
1317#[allow(clippy::too_many_arguments)]
1318fn build_locy_result(
1319    derived: HashMap<String, Vec<FactRow>>,
1320    command_results: Vec<CommandResult>,
1321    compiled: &CompiledProgram,
1322    evaluation_time: Duration,
1323    mut orchestrator_stats: LocyStats,
1324    warnings: Vec<RuntimeWarning>,
1325    approximate_groups: HashMap<String, Vec<String>>,
1326    derived_fact_set: Option<DerivedFactSet>,
1327    timed_out: bool,
1328) -> LocyResult {
1329    let total_facts: usize = derived.values().map(|v| v.len()).sum();
1330    orchestrator_stats.strata_evaluated = compiled.strata.len();
1331    orchestrator_stats.derived_nodes = total_facts;
1332    orchestrator_stats.evaluation_time = evaluation_time;
1333
1334    let inner = uni_locy::LocyResult {
1335        derived,
1336        stats: orchestrator_stats,
1337        command_results,
1338        warnings,
1339        approximate_groups,
1340        derived_fact_set,
1341        timed_out,
1342    };
1343    let metrics = QueryMetrics {
1344        total_time: evaluation_time,
1345        exec_time: evaluation_time,
1346        rows_returned: total_facts,
1347        ..Default::default()
1348    };
1349    LocyResult::new(inner, metrics)
1350}
1351
1352fn native_store_to_row_store(
1353    native: &uni_query::query::df_graph::DerivedStore,
1354    compiled: &CompiledProgram,
1355) -> RowStore {
1356    let mut result = RowStore::new();
1357    for name in native.rule_names() {
1358        if let Some(batches) = native.get(name) {
1359            let rows = record_batches_to_locy_rows(batches);
1360            let rule = compiled.rule_catalog.get(name);
1361            let columns: Vec<String> = rule
1362                .map(|r| r.yield_schema.iter().map(|yc| yc.name.clone()).collect())
1363                .unwrap_or_else(|| {
1364                    rows.first()
1365                        .map(|r| r.keys().cloned().collect())
1366                        .unwrap_or_default()
1367                });
1368            result.insert(name.to_string(), RowRelation::new(columns, rows));
1369        }
1370    }
1371    result
1372}
1373
1374// ── Error mapping ──────────────────────────────────────────────────────────
1375
1376fn map_parse_error(e: uni_cypher::ParseError) -> UniError {
1377    UniError::Parse {
1378        message: format!("LocyParseError: {e}"),
1379        position: None,
1380        line: None,
1381        column: None,
1382        context: None,
1383    }
1384}
1385
1386fn map_compile_error(e: LocyCompileError) -> UniError {
1387    UniError::Query {
1388        message: format!("LocyCompileError: {e}"),
1389        query: None,
1390    }
1391}
1392
1393fn map_runtime_error(e: LocyError) -> UniError {
1394    match e {
1395        LocyError::SavepointFailed { ref message } => UniError::Transaction {
1396            message: format!("LocyRuntimeError: {message}"),
1397        },
1398        other => UniError::Query {
1399            message: format!("LocyRuntimeError: {other}"),
1400            query: None,
1401        },
1402    }
1403}
1404
1405fn map_native_df_error(e: impl std::fmt::Display) -> UniError {
1406    UniError::Query {
1407        message: format!("LocyRuntimeError: {e}"),
1408        query: None,
1409    }
1410}