Skip to main content

uni_db/api/
impl_locy.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Locy engine integration: wires the Locy compiler and native execution engine to the real database.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use arrow_array::RecordBatch;
11use async_trait::async_trait;
12use uni_common::{Result, UniError, Value};
13use uni_cypher::ast::{Expr, Pattern, Query};
14use uni_cypher::locy_ast::RuleOutput;
15use uni_locy::types::CompiledCommand;
16use uni_locy::{
17    CommandResult, CompiledProgram, LocyCompileError, LocyConfig, LocyError, LocyResult, LocyStats,
18    Row, SavepointId, compile,
19};
20use uni_query::QueryPlanner;
21use uni_query::query::df_graph::locy_ast_builder::build_match_return_query;
22use uni_query::query::df_graph::locy_delta::{RowRelation, RowStore, extract_cypher_conditions};
23use uni_query::query::df_graph::locy_eval::record_batches_to_locy_rows;
24use uni_query::query::df_graph::locy_explain::DerivationTracker;
25use uni_query::query::df_graph::{DerivedFactSource, LocyExecutionContext};
26
27use crate::api::Uni;
28
/// Engine for evaluating Locy programs against a real database.
///
/// The engine only borrows the database handle; create one with [`Uni::locy`].
pub struct LocyEngine<'a> {
    /// Database that programs are planned and executed against.
    db: &'a Uni,
}
33
34impl Uni {
35    /// Create a Locy evaluation engine bound to this database.
36    pub fn locy(&self) -> LocyEngine<'_> {
37        LocyEngine { db: self }
38    }
39}
40
41impl<'a> LocyEngine<'a> {
42    /// Parse and compile a Locy program without executing it.
43    pub fn compile_only(&self, program: &str) -> Result<CompiledProgram> {
44        let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
45        compile(&ast).map_err(map_compile_error)
46    }
47
48    /// Parse, compile, and evaluate a Locy program with default config.
49    pub async fn evaluate(&self, program: &str) -> Result<LocyResult> {
50        self.evaluate_with_config(program, &LocyConfig::default())
51            .await
52    }
53
54    /// Convenience wrapper for EXPLAIN RULE commands.
55    pub async fn explain(&self, program: &str) -> Result<LocyResult> {
56        self.evaluate(program).await
57    }
58
59    /// Parse, compile, and evaluate a Locy program with custom config.
60    pub async fn evaluate_with_config(
61        &self,
62        program: &str,
63        config: &LocyConfig,
64    ) -> Result<LocyResult> {
65        let compiled = self.compile_only(program)?;
66        let start = Instant::now();
67
68        // 1. Build logical plan
69        let schema = self.db.schema.schema();
70        let query_planner = uni_query::QueryPlanner::new(schema);
71        let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
72        let logical = plan_builder
73            .build_program_plan(
74                &compiled,
75                config.max_iterations,
76                config.timeout,
77                config.max_derived_bytes,
78                config.deterministic_best_by,
79            )
80            .map_err(|e| UniError::Query {
81                message: format!("LocyPlanBuildError: {e}"),
82                query: None,
83            })?;
84
85        // 2. Create executor + physical planner
86        let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
87        df_executor.set_config(self.db.config.clone());
88        if let Some(ref w) = self.db.writer {
89            df_executor.set_writer(w.clone());
90        }
91        df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
92        df_executor.set_procedure_registry(self.db.procedure_registry.clone());
93
94        let (session_ctx, planner, _prop_mgr) = df_executor
95            .create_datafusion_planner(&self.db.properties, &HashMap::new())
96            .await
97            .map_err(map_native_df_error)?;
98
99        // 3. Physical plan
100        let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
101
102        // 4. Create tracker for EXPLAIN commands
103        let has_explain = compiled
104            .commands
105            .iter()
106            .any(|c| matches!(c, CompiledCommand::ExplainRule(_)));
107        let tracker: Option<Arc<uni_query::query::df_graph::DerivationTracker>> = if has_explain {
108            Some(Arc::new(
109                uni_query::query::df_graph::DerivationTracker::new(),
110            ))
111        } else {
112            None
113        };
114
115        let (derived_store_slot, iteration_counts_slot, peak_memory_slot) =
116            if let Some(program_exec) = exec_plan
117                .as_any()
118                .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>(
119            ) {
120                if let Some(ref t) = tracker {
121                    program_exec.set_derivation_tracker(Arc::clone(t));
122                }
123                (
124                    program_exec.derived_store_slot(),
125                    program_exec.iteration_counts_slot(),
126                    program_exec.peak_memory_slot(),
127                )
128            } else {
129                (
130                    Arc::new(std::sync::RwLock::new(None)),
131                    Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
132                    Arc::new(std::sync::RwLock::new(0usize)),
133                )
134            };
135
136        // 5. Execute strata
137        let _stats_batches = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
138            .await
139            .map_err(map_native_df_error)?;
140
141        // 6. Extract native DerivedStore
142        let native_store = derived_store_slot
143            .write()
144            .unwrap()
145            .take()
146            .unwrap_or_default();
147
148        // 7. Convert native DerivedStore → row-based RowStore for SLG/EXPLAIN
149        let mut orch_store = native_store_to_row_store(&native_store, &compiled);
150
151        // 8. Dispatch commands via native trait interfaces
152        let native_ctx = NativeExecutionAdapter::new(
153            self.db,
154            &native_store,
155            &compiled,
156            planner.graph_ctx().clone(),
157            planner.session_ctx().clone(),
158        );
159        let mut locy_stats = LocyStats {
160            total_iterations: iteration_counts_slot
161                .read()
162                .map(|c| c.values().sum::<usize>())
163                .unwrap_or(0),
164            peak_memory_bytes: peak_memory_slot.read().map(|v| *v).unwrap_or(0),
165            ..LocyStats::default()
166        };
167        let mut command_results = Vec::new();
168        for cmd in &compiled.commands {
169            let result = dispatch_native_command(
170                cmd,
171                &compiled,
172                &native_ctx,
173                config,
174                &mut orch_store,
175                &mut locy_stats,
176                tracker.clone(),
177                start,
178            )
179            .await
180            .map_err(map_runtime_error)?;
181            command_results.push(result);
182        }
183
184        let evaluation_time = start.elapsed();
185
186        // 9. Build derived map, enrich VID columns with full nodes
187        let base_derived: HashMap<String, Vec<Row>> = native_store
188            .rule_names()
189            .filter_map(|name| {
190                native_store
191                    .get(name)
192                    .map(|batches| (name.to_string(), record_batches_to_locy_rows(batches)))
193            })
194            .collect();
195        let enriched_derived = enrich_vids_with_nodes(
196            self.db,
197            &native_store,
198            base_derived,
199            planner.graph_ctx(),
200            planner.session_ctx(),
201        )
202        .await;
203
204        // 10. Build final LocyResult
205        Ok(build_locy_result(
206            enriched_derived,
207            command_results,
208            &compiled,
209            evaluation_time,
210            locy_stats,
211        ))
212    }
213
214    /// Run only the fixpoint strata (no commands) via the native DataFusion path.
215    ///
216    /// Used by `re_evaluate_strata()` so that savepoint-scoped mutations from
217    /// ASSUME/ABDUCE hypothetical states are visible — the `Executor` is configured
218    /// with `self.db.writer`, which holds the active transaction handle.
219    async fn run_strata_native(
220        &self,
221        compiled: &CompiledProgram,
222        config: &LocyConfig,
223    ) -> Result<uni_query::query::df_graph::DerivedStore> {
224        let schema = self.db.schema.schema();
225        let query_planner = uni_query::QueryPlanner::new(schema);
226        let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
227        let logical = plan_builder
228            .build_program_plan(
229                compiled,
230                config.max_iterations,
231                config.timeout,
232                config.max_derived_bytes,
233                config.deterministic_best_by,
234            )
235            .map_err(|e| UniError::Query {
236                message: format!("LocyPlanBuildError: {e}"),
237                query: None,
238            })?;
239
240        let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
241        df_executor.set_config(self.db.config.clone());
242        if let Some(ref w) = self.db.writer {
243            df_executor.set_writer(w.clone());
244        }
245        df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
246        df_executor.set_procedure_registry(self.db.procedure_registry.clone());
247
248        let (session_ctx, planner, _) = df_executor
249            .create_datafusion_planner(&self.db.properties, &HashMap::new())
250            .await
251            .map_err(map_native_df_error)?;
252        let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
253
254        let derived_store_slot = if let Some(program_exec) =
255            exec_plan
256                .as_any()
257                .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>()
258        {
259            program_exec.derived_store_slot()
260        } else {
261            Arc::new(std::sync::RwLock::new(None))
262        };
263
264        let _ = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
265            .await
266            .map_err(map_native_df_error)?;
267        Ok(derived_store_slot
268            .write()
269            .unwrap()
270            .take()
271            .unwrap_or_default())
272    }
273}
274
275// ── NativeExecutionAdapter — implements DerivedFactSource + LocyExecutionContext ─
276
/// Adapter that implements [`DerivedFactSource`] and [`LocyExecutionContext`]
/// on top of a database handle and the derived store from a native evaluation.
struct NativeExecutionAdapter<'a> {
    /// Database handle; supplies schema, storage, the optional writer, and AST execution.
    db: &'a Uni,
    /// Derived facts looked up by rule name.
    native_store: &'a uni_query::query::df_graph::DerivedStore,
    /// Compiled program whose rule catalog is consulted when enriching derived facts.
    compiled: &'a CompiledProgram,
    /// Execution contexts from the fixpoint planner for columnar query execution.
    graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
    /// DataFusion session context passed alongside `graph_ctx` to `execute_subplan`.
    session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
}
285
286impl<'a> NativeExecutionAdapter<'a> {
287    fn new(
288        db: &'a Uni,
289        native_store: &'a uni_query::query::df_graph::DerivedStore,
290        compiled: &'a CompiledProgram,
291        graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
292        session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
293    ) -> Self {
294        Self {
295            db,
296            native_store,
297            compiled,
298            graph_ctx,
299            session_ctx,
300        }
301    }
302
303    /// Execute a Query AST via execute_subplan, reusing the fixpoint contexts.
304    async fn execute_query_ast(
305        &self,
306        ast: Query,
307    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
308        let schema = self.db.schema.schema();
309        let logical_plan =
310            QueryPlanner::new(schema)
311                .plan(ast)
312                .map_err(|e| LocyError::ExecutorError {
313                    message: e.to_string(),
314                })?;
315        uni_query::query::df_graph::common::execute_subplan(
316            &logical_plan,
317            &HashMap::new(),
318            &HashMap::new(),
319            &self.graph_ctx,
320            &self.session_ctx,
321            &self.db.storage,
322            &self.db.schema.schema(),
323        )
324        .await
325        .map_err(|e| LocyError::ExecutorError {
326            message: e.to_string(),
327        })
328    }
329}
330
331#[async_trait(?Send)]
332impl DerivedFactSource for NativeExecutionAdapter<'_> {
333    fn lookup_derived(&self, rule_name: &str) -> std::result::Result<Vec<Row>, LocyError> {
334        let batches = self
335            .native_store
336            .get(rule_name)
337            .map(|v| v.as_slice())
338            .unwrap_or(&[]);
339        Ok(record_batches_to_locy_rows(batches))
340    }
341
342    fn lookup_derived_batches(
343        &self,
344        rule_name: &str,
345    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
346        Ok(self
347            .native_store
348            .get(rule_name)
349            .map(|v| v.to_vec())
350            .unwrap_or_default())
351    }
352
353    async fn execute_pattern(
354        &self,
355        pattern: &Pattern,
356        where_conditions: &[Expr],
357    ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
358        let query = build_match_return_query(pattern, where_conditions);
359        let schema = self.db.schema.schema();
360        let logical_plan =
361            QueryPlanner::new(schema)
362                .plan(query)
363                .map_err(|e| LocyError::ExecutorError {
364                    message: e.to_string(),
365                })?;
366
367        // Use the fixpoint planner's execution contexts directly via execute_subplan.
368        uni_query::query::df_graph::common::execute_subplan(
369            &logical_plan,
370            &HashMap::new(),
371            &HashMap::new(),
372            &self.graph_ctx,
373            &self.session_ctx,
374            &self.db.storage,
375            &self.db.schema.schema(),
376        )
377        .await
378        .map_err(|e| LocyError::ExecutorError {
379            message: e.to_string(),
380        })
381    }
382}
383
384#[async_trait(?Send)]
385impl LocyExecutionContext for NativeExecutionAdapter<'_> {
386    async fn lookup_derived_enriched(
387        &self,
388        rule_name: &str,
389    ) -> std::result::Result<Vec<Row>, LocyError> {
390        use arrow_schema::DataType;
391
392        if let Some(rule) = self.compiled.rule_catalog.get(rule_name) {
393            let is_derive_rule = rule
394                .clauses
395                .iter()
396                .all(|c| matches!(c.output, RuleOutput::Derive(_)));
397            if is_derive_rule {
398                let mut all_rows = Vec::new();
399                for clause in &rule.clauses {
400                    let cypher_conds = extract_cypher_conditions(&clause.where_conditions);
401                    let raw_batches = self
402                        .execute_pattern(&clause.match_pattern, &cypher_conds)
403                        .await?;
404                    all_rows.extend(record_batches_to_locy_rows(&raw_batches));
405                }
406                return Ok(all_rows);
407            }
408        }
409
410        let batches = self
411            .native_store
412            .get(rule_name)
413            .map(|v| v.as_slice())
414            .unwrap_or(&[]);
415        let rows = record_batches_to_locy_rows(batches);
416
417        let vid_columns: HashSet<String> = batches
418            .first()
419            .map(|batch| {
420                batch
421                    .schema()
422                    .fields()
423                    .iter()
424                    .filter(|f| *f.data_type() == DataType::UInt64)
425                    .map(|f| f.name().clone())
426                    .collect()
427            })
428            .unwrap_or_default();
429
430        if vid_columns.is_empty() {
431            return Ok(rows);
432        }
433
434        let unique_vids: HashSet<i64> = rows
435            .iter()
436            .flat_map(|row| {
437                vid_columns.iter().filter_map(|col| {
438                    if let Some(Value::Int(vid)) = row.get(col) {
439                        Some(*vid)
440                    } else {
441                        None
442                    }
443                })
444            })
445            .collect();
446
447        if unique_vids.is_empty() {
448            return Ok(rows);
449        }
450
451        let vids_literal = unique_vids
452            .iter()
453            .map(|v| v.to_string())
454            .collect::<Vec<_>>()
455            .join(", ");
456        let query_str =
457            format!("MATCH (n) WHERE id(n) IN [{vids_literal}] RETURN id(n) AS _vid, n");
458        let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
459        if let Ok(ast) = uni_cypher::parse(&query_str)
460            && let Ok(batches) = self.execute_query_ast(ast).await
461        {
462            for row in record_batches_to_locy_rows(&batches) {
463                if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
464                    vid_to_node.insert(*vid, node.clone());
465                }
466            }
467        }
468
469        Ok(rows
470            .into_iter()
471            .map(|row| {
472                row.into_iter()
473                    .map(|(k, v)| {
474                        if vid_columns.contains(&k)
475                            && let Value::Int(vid) = &v
476                        {
477                            let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
478                            return (k, new_v);
479                        }
480                        (k, v)
481                    })
482                    .collect()
483            })
484            .collect())
485    }
486
487    async fn execute_cypher_read(&self, ast: Query) -> std::result::Result<Vec<Row>, LocyError> {
488        // Must use execute_ast_internal (fresh SessionContext) so that savepoint
489        // mutations applied during ASSUME/ABDUCE body dispatch are visible.
490        let result = self
491            .db
492            .execute_ast_internal(ast, "<locy>", HashMap::new(), self.db.config.clone())
493            .await
494            .map_err(|e| LocyError::ExecutorError {
495                message: e.to_string(),
496            })?;
497        Ok(result
498            .rows
499            .into_iter()
500            .map(|row| {
501                row.columns
502                    .iter()
503                    .zip(row.values)
504                    .map(|(col, val)| (col.clone(), val))
505                    .collect()
506            })
507            .collect())
508    }
509
510    async fn execute_mutation(
511        &self,
512        ast: Query,
513        params: HashMap<String, Value>,
514    ) -> std::result::Result<usize, LocyError> {
515        let before = self.db.get_mutation_count().await;
516        self.db
517            .execute_ast_internal(ast, "<locy>", params, self.db.config.clone())
518            .await
519            .map_err(|e| LocyError::ExecutorError {
520                message: e.to_string(),
521            })?;
522        let after = self.db.get_mutation_count().await;
523        Ok(after.saturating_sub(before))
524    }
525
526    async fn begin_savepoint(&self) -> std::result::Result<SavepointId, LocyError> {
527        let writer = self
528            .db
529            .writer
530            .as_ref()
531            .ok_or_else(|| LocyError::SavepointFailed {
532                message: "database is read-only".to_string(),
533            })?;
534        let mut w = writer.write().await;
535        w.begin_transaction()
536            .map_err(|e| LocyError::SavepointFailed {
537                message: e.to_string(),
538            })?;
539        Ok(SavepointId(0))
540    }
541
542    async fn rollback_savepoint(&self, _id: SavepointId) -> std::result::Result<(), LocyError> {
543        let writer = self
544            .db
545            .writer
546            .as_ref()
547            .ok_or_else(|| LocyError::SavepointFailed {
548                message: "database is read-only".to_string(),
549            })?;
550        let mut w = writer.write().await;
551        w.rollback_transaction()
552            .map_err(|e| LocyError::SavepointFailed {
553                message: e.to_string(),
554            })?;
555        Ok(())
556    }
557
558    async fn re_evaluate_strata(
559        &self,
560        program: &CompiledProgram,
561        config: &LocyConfig,
562    ) -> std::result::Result<RowStore, LocyError> {
563        let strata_only = CompiledProgram {
564            strata: program.strata.clone(),
565            rule_catalog: program.rule_catalog.clone(),
566            warnings: vec![],
567            commands: vec![],
568        };
569        let engine = self.db.locy();
570        let native_store = engine
571            .run_strata_native(&strata_only, config)
572            .await
573            .map_err(|e| LocyError::ExecutorError {
574                message: e.to_string(),
575            })?;
576        Ok(native_store_to_row_store(&native_store, program))
577    }
578}
579
580// ── Native command dispatch ───────────────────────────────────────────────────
581
582#[allow(clippy::too_many_arguments)]
583fn dispatch_native_command<'a>(
584    cmd: &'a CompiledCommand,
585    program: &'a CompiledProgram,
586    ctx: &'a NativeExecutionAdapter<'a>,
587    config: &'a LocyConfig,
588    orch_store: &'a mut RowStore,
589    stats: &'a mut LocyStats,
590    tracker: Option<Arc<DerivationTracker>>,
591    start: Instant,
592) -> std::pin::Pin<
593    Box<dyn std::future::Future<Output = std::result::Result<CommandResult, LocyError>> + 'a>,
594> {
595    Box::pin(async move {
596        match cmd {
597            CompiledCommand::GoalQuery(gq) => {
598                let rows = uni_query::query::df_graph::locy_query::evaluate_query(
599                    gq, program, ctx, config, orch_store, stats, start,
600                )
601                .await?;
602                Ok(CommandResult::Query(rows))
603            }
604            CompiledCommand::ExplainRule(eq) => {
605                let node = uni_query::query::df_graph::locy_explain::explain_rule(
606                    eq,
607                    program,
608                    ctx,
609                    config,
610                    orch_store,
611                    stats,
612                    tracker.as_deref(),
613                )
614                .await?;
615                Ok(CommandResult::Explain(node))
616            }
617            CompiledCommand::Assume(ca) => {
618                let rows = uni_query::query::df_graph::locy_assume::evaluate_assume(
619                    ca, program, ctx, config, stats,
620                )
621                .await?;
622                Ok(CommandResult::Assume(rows))
623            }
624            CompiledCommand::Abduce(aq) => {
625                let result = uni_query::query::df_graph::locy_abduce::evaluate_abduce(
626                    aq,
627                    program,
628                    ctx,
629                    config,
630                    orch_store,
631                    stats,
632                    tracker.as_deref(),
633                )
634                .await?;
635                Ok(CommandResult::Abduce(result))
636            }
637            CompiledCommand::DeriveCommand(dc) => {
638                let affected = uni_query::query::df_graph::locy_derive::derive_command(
639                    dc, program, ctx, stats,
640                )
641                .await?;
642                Ok(CommandResult::Derive { affected })
643            }
644            CompiledCommand::Cypher(q) => {
645                let rows = ctx.execute_cypher_read(q.clone()).await?;
646                stats.queries_executed += 1;
647                Ok(CommandResult::Cypher(rows))
648            }
649        }
650    })
651}
652
653// ── Helpers ───────────────────────────────────────────────────────────────────
654
655async fn enrich_vids_with_nodes(
656    db: &Uni,
657    native_store: &uni_query::query::df_graph::DerivedStore,
658    derived: HashMap<String, Vec<Row>>,
659    graph_ctx: &Arc<uni_query::query::df_graph::GraphExecutionContext>,
660    session_ctx: &Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
661) -> HashMap<String, Vec<Row>> {
662    use arrow_schema::DataType;
663    let mut enriched = HashMap::new();
664
665    for (name, rows) in derived {
666        let vid_columns: HashSet<String> = native_store
667            .get(&name)
668            .and_then(|batches| batches.first())
669            .map(|batch| {
670                batch
671                    .schema()
672                    .fields()
673                    .iter()
674                    .filter(|f| *f.data_type() == DataType::UInt64)
675                    .map(|f| f.name().clone())
676                    .collect()
677            })
678            .unwrap_or_default();
679
680        if vid_columns.is_empty() {
681            enriched.insert(name, rows);
682            continue;
683        }
684
685        let unique_vids: HashSet<i64> = rows
686            .iter()
687            .flat_map(|row| {
688                vid_columns.iter().filter_map(|col| {
689                    if let Some(Value::Int(vid)) = row.get(col) {
690                        Some(*vid)
691                    } else {
692                        None
693                    }
694                })
695            })
696            .collect();
697
698        if unique_vids.is_empty() {
699            enriched.insert(name, rows);
700            continue;
701        }
702
703        let vids_literal = unique_vids
704            .iter()
705            .map(|v| v.to_string())
706            .collect::<Vec<_>>()
707            .join(", ");
708        let query_str = format!(
709            "MATCH (n) WHERE id(n) IN [{}] RETURN id(n) AS _vid, n",
710            vids_literal
711        );
712        let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
713        if let Ok(ast) = uni_cypher::parse(&query_str) {
714            let schema = db.schema.schema();
715            if let Ok(logical_plan) = uni_query::QueryPlanner::new(schema).plan(ast)
716                && let Ok(batches) = uni_query::query::df_graph::common::execute_subplan(
717                    &logical_plan,
718                    &HashMap::new(),
719                    &HashMap::new(),
720                    graph_ctx,
721                    session_ctx,
722                    &db.storage,
723                    &db.schema.schema(),
724                )
725                .await
726            {
727                for row in record_batches_to_locy_rows(&batches) {
728                    if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
729                        vid_to_node.insert(*vid, node.clone());
730                    }
731                }
732            }
733        }
734
735        let enriched_rows: Vec<Row> = rows
736            .into_iter()
737            .map(|row| {
738                row.into_iter()
739                    .map(|(k, v)| {
740                        if vid_columns.contains(&k)
741                            && let Value::Int(vid) = &v
742                        {
743                            let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
744                            return (k, new_v);
745                        }
746                        (k, v)
747                    })
748                    .collect()
749            })
750            .collect();
751        enriched.insert(name, enriched_rows);
752    }
753
754    enriched
755}
756
757fn build_locy_result(
758    derived: HashMap<String, Vec<Row>>,
759    command_results: Vec<CommandResult>,
760    compiled: &CompiledProgram,
761    evaluation_time: Duration,
762    mut orchestrator_stats: LocyStats,
763) -> LocyResult {
764    let total_facts: usize = derived.values().map(|v| v.len()).sum();
765    orchestrator_stats.strata_evaluated = compiled.strata.len();
766    orchestrator_stats.derived_nodes = total_facts;
767    orchestrator_stats.evaluation_time = evaluation_time;
768
769    LocyResult {
770        derived,
771        stats: orchestrator_stats,
772        command_results,
773    }
774}
775
776fn native_store_to_row_store(
777    native: &uni_query::query::df_graph::DerivedStore,
778    compiled: &CompiledProgram,
779) -> RowStore {
780    let mut result = RowStore::new();
781    for name in native.rule_names() {
782        if let Some(batches) = native.get(name) {
783            let rows = record_batches_to_locy_rows(batches);
784            let rule = compiled.rule_catalog.get(name);
785            let columns: Vec<String> = rule
786                .map(|r| r.yield_schema.iter().map(|yc| yc.name.clone()).collect())
787                .unwrap_or_else(|| {
788                    rows.first()
789                        .map(|r| r.keys().cloned().collect())
790                        .unwrap_or_default()
791                });
792            result.insert(name.to_string(), RowRelation::new(columns, rows));
793        }
794    }
795    result
796}
797
798// ── Error mapping ──────────────────────────────────────────────────────────
799
800fn map_parse_error(e: uni_cypher::ParseError) -> UniError {
801    UniError::Parse {
802        message: format!("LocyParseError: {e}"),
803        position: None,
804        line: None,
805        column: None,
806        context: None,
807    }
808}
809
810fn map_compile_error(e: LocyCompileError) -> UniError {
811    UniError::Query {
812        message: format!("LocyCompileError: {e}"),
813        query: None,
814    }
815}
816
817fn map_runtime_error(e: LocyError) -> UniError {
818    match e {
819        LocyError::SavepointFailed { ref message } => UniError::Transaction {
820            message: format!("LocyRuntimeError: {message}"),
821        },
822        other => UniError::Query {
823            message: format!("LocyRuntimeError: {other}"),
824            query: None,
825        },
826    }
827}
828
829fn map_native_df_error(e: impl std::fmt::Display) -> UniError {
830    UniError::Query {
831        message: format!("LocyRuntimeError: {e}"),
832        query: None,
833    }
834}