1use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use arrow_array::RecordBatch;
11use async_trait::async_trait;
12use uni_common::{Result, UniError, Value};
13use uni_cypher::ast::{Expr, Pattern, Query};
14use uni_cypher::locy_ast::RuleOutput;
15use uni_locy::types::CompiledCommand;
16use uni_locy::{
17 CommandResult, CompiledProgram, LocyCompileError, LocyConfig, LocyError, LocyResult, LocyStats,
18 Row, SavepointId, compile,
19};
20use uni_query::QueryPlanner;
21use uni_query::query::df_graph::locy_ast_builder::build_match_return_query;
22use uni_query::query::df_graph::locy_delta::{RowRelation, RowStore, extract_cypher_conditions};
23use uni_query::query::df_graph::locy_eval::record_batches_to_locy_rows;
24use uni_query::query::df_graph::locy_explain::DerivationTracker;
25use uni_query::query::df_graph::{DerivedFactSource, LocyExecutionContext};
26
27use crate::api::Uni;
28
29pub struct LocyEngine<'a> {
31 db: &'a Uni,
32}
33
34impl Uni {
35 pub fn locy(&self) -> LocyEngine<'_> {
37 LocyEngine { db: self }
38 }
39}
40
41impl<'a> LocyEngine<'a> {
42 pub fn compile_only(&self, program: &str) -> Result<CompiledProgram> {
44 let ast = uni_cypher::parse_locy(program).map_err(map_parse_error)?;
45 compile(&ast).map_err(map_compile_error)
46 }
47
48 pub async fn evaluate(&self, program: &str) -> Result<LocyResult> {
50 self.evaluate_with_config(program, &LocyConfig::default())
51 .await
52 }
53
54 pub async fn explain(&self, program: &str) -> Result<LocyResult> {
56 self.evaluate(program).await
57 }
58
59 pub async fn evaluate_with_config(
61 &self,
62 program: &str,
63 config: &LocyConfig,
64 ) -> Result<LocyResult> {
65 let compiled = self.compile_only(program)?;
66 let start = Instant::now();
67
68 let schema = self.db.schema.schema();
70 let query_planner = uni_query::QueryPlanner::new(schema);
71 let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
72 let logical = plan_builder
73 .build_program_plan(
74 &compiled,
75 config.max_iterations,
76 config.timeout,
77 config.max_derived_bytes,
78 config.deterministic_best_by,
79 )
80 .map_err(|e| UniError::Query {
81 message: format!("LocyPlanBuildError: {e}"),
82 query: None,
83 })?;
84
85 let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
87 df_executor.set_config(self.db.config.clone());
88 if let Some(ref w) = self.db.writer {
89 df_executor.set_writer(w.clone());
90 }
91 df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
92 df_executor.set_procedure_registry(self.db.procedure_registry.clone());
93
94 let (session_ctx, planner, _prop_mgr) = df_executor
95 .create_datafusion_planner(&self.db.properties, &HashMap::new())
96 .await
97 .map_err(map_native_df_error)?;
98
99 let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
101
102 let has_explain = compiled
104 .commands
105 .iter()
106 .any(|c| matches!(c, CompiledCommand::ExplainRule(_)));
107 let tracker: Option<Arc<uni_query::query::df_graph::DerivationTracker>> = if has_explain {
108 Some(Arc::new(
109 uni_query::query::df_graph::DerivationTracker::new(),
110 ))
111 } else {
112 None
113 };
114
115 let (derived_store_slot, iteration_counts_slot, peak_memory_slot) =
116 if let Some(program_exec) = exec_plan
117 .as_any()
118 .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>(
119 ) {
120 if let Some(ref t) = tracker {
121 program_exec.set_derivation_tracker(Arc::clone(t));
122 }
123 (
124 program_exec.derived_store_slot(),
125 program_exec.iteration_counts_slot(),
126 program_exec.peak_memory_slot(),
127 )
128 } else {
129 (
130 Arc::new(std::sync::RwLock::new(None)),
131 Arc::new(std::sync::RwLock::new(std::collections::HashMap::new())),
132 Arc::new(std::sync::RwLock::new(0usize)),
133 )
134 };
135
136 let _stats_batches = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
138 .await
139 .map_err(map_native_df_error)?;
140
141 let native_store = derived_store_slot
143 .write()
144 .unwrap()
145 .take()
146 .unwrap_or_default();
147
148 let mut orch_store = native_store_to_row_store(&native_store, &compiled);
150
151 let native_ctx = NativeExecutionAdapter::new(
153 self.db,
154 &native_store,
155 &compiled,
156 planner.graph_ctx().clone(),
157 planner.session_ctx().clone(),
158 );
159 let mut locy_stats = LocyStats {
160 total_iterations: iteration_counts_slot
161 .read()
162 .map(|c| c.values().sum::<usize>())
163 .unwrap_or(0),
164 peak_memory_bytes: peak_memory_slot.read().map(|v| *v).unwrap_or(0),
165 ..LocyStats::default()
166 };
167 let mut command_results = Vec::new();
168 for cmd in &compiled.commands {
169 let result = dispatch_native_command(
170 cmd,
171 &compiled,
172 &native_ctx,
173 config,
174 &mut orch_store,
175 &mut locy_stats,
176 tracker.clone(),
177 start,
178 )
179 .await
180 .map_err(map_runtime_error)?;
181 command_results.push(result);
182 }
183
184 let evaluation_time = start.elapsed();
185
186 let base_derived: HashMap<String, Vec<Row>> = native_store
188 .rule_names()
189 .filter_map(|name| {
190 native_store
191 .get(name)
192 .map(|batches| (name.to_string(), record_batches_to_locy_rows(batches)))
193 })
194 .collect();
195 let enriched_derived = enrich_vids_with_nodes(
196 self.db,
197 &native_store,
198 base_derived,
199 planner.graph_ctx(),
200 planner.session_ctx(),
201 )
202 .await;
203
204 Ok(build_locy_result(
206 enriched_derived,
207 command_results,
208 &compiled,
209 evaluation_time,
210 locy_stats,
211 ))
212 }
213
214 async fn run_strata_native(
220 &self,
221 compiled: &CompiledProgram,
222 config: &LocyConfig,
223 ) -> Result<uni_query::query::df_graph::DerivedStore> {
224 let schema = self.db.schema.schema();
225 let query_planner = uni_query::QueryPlanner::new(schema);
226 let plan_builder = uni_query::query::locy_planner::LocyPlanBuilder::new(&query_planner);
227 let logical = plan_builder
228 .build_program_plan(
229 compiled,
230 config.max_iterations,
231 config.timeout,
232 config.max_derived_bytes,
233 config.deterministic_best_by,
234 )
235 .map_err(|e| UniError::Query {
236 message: format!("LocyPlanBuildError: {e}"),
237 query: None,
238 })?;
239
240 let mut df_executor = uni_query::Executor::new(self.db.storage.clone());
241 df_executor.set_config(self.db.config.clone());
242 if let Some(ref w) = self.db.writer {
243 df_executor.set_writer(w.clone());
244 }
245 df_executor.set_xervo_runtime(self.db.xervo_runtime.clone());
246 df_executor.set_procedure_registry(self.db.procedure_registry.clone());
247
248 let (session_ctx, planner, _) = df_executor
249 .create_datafusion_planner(&self.db.properties, &HashMap::new())
250 .await
251 .map_err(map_native_df_error)?;
252 let exec_plan = planner.plan(&logical).map_err(map_native_df_error)?;
253
254 let derived_store_slot = if let Some(program_exec) =
255 exec_plan
256 .as_any()
257 .downcast_ref::<uni_query::query::df_graph::LocyProgramExec>()
258 {
259 program_exec.derived_store_slot()
260 } else {
261 Arc::new(std::sync::RwLock::new(None))
262 };
263
264 let _ = uni_query::Executor::collect_batches(&session_ctx, exec_plan)
265 .await
266 .map_err(map_native_df_error)?;
267 Ok(derived_store_slot
268 .write()
269 .unwrap()
270 .take()
271 .unwrap_or_default())
272 }
273}
274
275struct NativeExecutionAdapter<'a> {
278 db: &'a Uni,
279 native_store: &'a uni_query::query::df_graph::DerivedStore,
280 compiled: &'a CompiledProgram,
281 graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
283 session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
284}
285
286impl<'a> NativeExecutionAdapter<'a> {
287 fn new(
288 db: &'a Uni,
289 native_store: &'a uni_query::query::df_graph::DerivedStore,
290 compiled: &'a CompiledProgram,
291 graph_ctx: Arc<uni_query::query::df_graph::GraphExecutionContext>,
292 session_ctx: Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
293 ) -> Self {
294 Self {
295 db,
296 native_store,
297 compiled,
298 graph_ctx,
299 session_ctx,
300 }
301 }
302
303 async fn execute_query_ast(
305 &self,
306 ast: Query,
307 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
308 let schema = self.db.schema.schema();
309 let logical_plan =
310 QueryPlanner::new(schema)
311 .plan(ast)
312 .map_err(|e| LocyError::ExecutorError {
313 message: e.to_string(),
314 })?;
315 uni_query::query::df_graph::common::execute_subplan(
316 &logical_plan,
317 &HashMap::new(),
318 &HashMap::new(),
319 &self.graph_ctx,
320 &self.session_ctx,
321 &self.db.storage,
322 &self.db.schema.schema(),
323 )
324 .await
325 .map_err(|e| LocyError::ExecutorError {
326 message: e.to_string(),
327 })
328 }
329}
330
331#[async_trait(?Send)]
332impl DerivedFactSource for NativeExecutionAdapter<'_> {
333 fn lookup_derived(&self, rule_name: &str) -> std::result::Result<Vec<Row>, LocyError> {
334 let batches = self
335 .native_store
336 .get(rule_name)
337 .map(|v| v.as_slice())
338 .unwrap_or(&[]);
339 Ok(record_batches_to_locy_rows(batches))
340 }
341
342 fn lookup_derived_batches(
343 &self,
344 rule_name: &str,
345 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
346 Ok(self
347 .native_store
348 .get(rule_name)
349 .map(|v| v.to_vec())
350 .unwrap_or_default())
351 }
352
353 async fn execute_pattern(
354 &self,
355 pattern: &Pattern,
356 where_conditions: &[Expr],
357 ) -> std::result::Result<Vec<RecordBatch>, LocyError> {
358 let query = build_match_return_query(pattern, where_conditions);
359 let schema = self.db.schema.schema();
360 let logical_plan =
361 QueryPlanner::new(schema)
362 .plan(query)
363 .map_err(|e| LocyError::ExecutorError {
364 message: e.to_string(),
365 })?;
366
367 uni_query::query::df_graph::common::execute_subplan(
369 &logical_plan,
370 &HashMap::new(),
371 &HashMap::new(),
372 &self.graph_ctx,
373 &self.session_ctx,
374 &self.db.storage,
375 &self.db.schema.schema(),
376 )
377 .await
378 .map_err(|e| LocyError::ExecutorError {
379 message: e.to_string(),
380 })
381 }
382}
383
384#[async_trait(?Send)]
385impl LocyExecutionContext for NativeExecutionAdapter<'_> {
386 async fn lookup_derived_enriched(
387 &self,
388 rule_name: &str,
389 ) -> std::result::Result<Vec<Row>, LocyError> {
390 use arrow_schema::DataType;
391
392 if let Some(rule) = self.compiled.rule_catalog.get(rule_name) {
393 let is_derive_rule = rule
394 .clauses
395 .iter()
396 .all(|c| matches!(c.output, RuleOutput::Derive(_)));
397 if is_derive_rule {
398 let mut all_rows = Vec::new();
399 for clause in &rule.clauses {
400 let cypher_conds = extract_cypher_conditions(&clause.where_conditions);
401 let raw_batches = self
402 .execute_pattern(&clause.match_pattern, &cypher_conds)
403 .await?;
404 all_rows.extend(record_batches_to_locy_rows(&raw_batches));
405 }
406 return Ok(all_rows);
407 }
408 }
409
410 let batches = self
411 .native_store
412 .get(rule_name)
413 .map(|v| v.as_slice())
414 .unwrap_or(&[]);
415 let rows = record_batches_to_locy_rows(batches);
416
417 let vid_columns: HashSet<String> = batches
418 .first()
419 .map(|batch| {
420 batch
421 .schema()
422 .fields()
423 .iter()
424 .filter(|f| *f.data_type() == DataType::UInt64)
425 .map(|f| f.name().clone())
426 .collect()
427 })
428 .unwrap_or_default();
429
430 if vid_columns.is_empty() {
431 return Ok(rows);
432 }
433
434 let unique_vids: HashSet<i64> = rows
435 .iter()
436 .flat_map(|row| {
437 vid_columns.iter().filter_map(|col| {
438 if let Some(Value::Int(vid)) = row.get(col) {
439 Some(*vid)
440 } else {
441 None
442 }
443 })
444 })
445 .collect();
446
447 if unique_vids.is_empty() {
448 return Ok(rows);
449 }
450
451 let vids_literal = unique_vids
452 .iter()
453 .map(|v| v.to_string())
454 .collect::<Vec<_>>()
455 .join(", ");
456 let query_str =
457 format!("MATCH (n) WHERE id(n) IN [{vids_literal}] RETURN id(n) AS _vid, n");
458 let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
459 if let Ok(ast) = uni_cypher::parse(&query_str)
460 && let Ok(batches) = self.execute_query_ast(ast).await
461 {
462 for row in record_batches_to_locy_rows(&batches) {
463 if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
464 vid_to_node.insert(*vid, node.clone());
465 }
466 }
467 }
468
469 Ok(rows
470 .into_iter()
471 .map(|row| {
472 row.into_iter()
473 .map(|(k, v)| {
474 if vid_columns.contains(&k)
475 && let Value::Int(vid) = &v
476 {
477 let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
478 return (k, new_v);
479 }
480 (k, v)
481 })
482 .collect()
483 })
484 .collect())
485 }
486
487 async fn execute_cypher_read(&self, ast: Query) -> std::result::Result<Vec<Row>, LocyError> {
488 let result = self
491 .db
492 .execute_ast_internal(ast, "<locy>", HashMap::new(), self.db.config.clone())
493 .await
494 .map_err(|e| LocyError::ExecutorError {
495 message: e.to_string(),
496 })?;
497 Ok(result
498 .rows
499 .into_iter()
500 .map(|row| {
501 row.columns
502 .iter()
503 .zip(row.values)
504 .map(|(col, val)| (col.clone(), val))
505 .collect()
506 })
507 .collect())
508 }
509
510 async fn execute_mutation(
511 &self,
512 ast: Query,
513 params: HashMap<String, Value>,
514 ) -> std::result::Result<usize, LocyError> {
515 let before = self.db.get_mutation_count().await;
516 self.db
517 .execute_ast_internal(ast, "<locy>", params, self.db.config.clone())
518 .await
519 .map_err(|e| LocyError::ExecutorError {
520 message: e.to_string(),
521 })?;
522 let after = self.db.get_mutation_count().await;
523 Ok(after.saturating_sub(before))
524 }
525
526 async fn begin_savepoint(&self) -> std::result::Result<SavepointId, LocyError> {
527 let writer = self
528 .db
529 .writer
530 .as_ref()
531 .ok_or_else(|| LocyError::SavepointFailed {
532 message: "database is read-only".to_string(),
533 })?;
534 let mut w = writer.write().await;
535 w.begin_transaction()
536 .map_err(|e| LocyError::SavepointFailed {
537 message: e.to_string(),
538 })?;
539 Ok(SavepointId(0))
540 }
541
542 async fn rollback_savepoint(&self, _id: SavepointId) -> std::result::Result<(), LocyError> {
543 let writer = self
544 .db
545 .writer
546 .as_ref()
547 .ok_or_else(|| LocyError::SavepointFailed {
548 message: "database is read-only".to_string(),
549 })?;
550 let mut w = writer.write().await;
551 w.rollback_transaction()
552 .map_err(|e| LocyError::SavepointFailed {
553 message: e.to_string(),
554 })?;
555 Ok(())
556 }
557
558 async fn re_evaluate_strata(
559 &self,
560 program: &CompiledProgram,
561 config: &LocyConfig,
562 ) -> std::result::Result<RowStore, LocyError> {
563 let strata_only = CompiledProgram {
564 strata: program.strata.clone(),
565 rule_catalog: program.rule_catalog.clone(),
566 warnings: vec![],
567 commands: vec![],
568 };
569 let engine = self.db.locy();
570 let native_store = engine
571 .run_strata_native(&strata_only, config)
572 .await
573 .map_err(|e| LocyError::ExecutorError {
574 message: e.to_string(),
575 })?;
576 Ok(native_store_to_row_store(&native_store, program))
577 }
578}
579
580#[allow(clippy::too_many_arguments)]
583fn dispatch_native_command<'a>(
584 cmd: &'a CompiledCommand,
585 program: &'a CompiledProgram,
586 ctx: &'a NativeExecutionAdapter<'a>,
587 config: &'a LocyConfig,
588 orch_store: &'a mut RowStore,
589 stats: &'a mut LocyStats,
590 tracker: Option<Arc<DerivationTracker>>,
591 start: Instant,
592) -> std::pin::Pin<
593 Box<dyn std::future::Future<Output = std::result::Result<CommandResult, LocyError>> + 'a>,
594> {
595 Box::pin(async move {
596 match cmd {
597 CompiledCommand::GoalQuery(gq) => {
598 let rows = uni_query::query::df_graph::locy_query::evaluate_query(
599 gq, program, ctx, config, orch_store, stats, start,
600 )
601 .await?;
602 Ok(CommandResult::Query(rows))
603 }
604 CompiledCommand::ExplainRule(eq) => {
605 let node = uni_query::query::df_graph::locy_explain::explain_rule(
606 eq,
607 program,
608 ctx,
609 config,
610 orch_store,
611 stats,
612 tracker.as_deref(),
613 )
614 .await?;
615 Ok(CommandResult::Explain(node))
616 }
617 CompiledCommand::Assume(ca) => {
618 let rows = uni_query::query::df_graph::locy_assume::evaluate_assume(
619 ca, program, ctx, config, stats,
620 )
621 .await?;
622 Ok(CommandResult::Assume(rows))
623 }
624 CompiledCommand::Abduce(aq) => {
625 let result = uni_query::query::df_graph::locy_abduce::evaluate_abduce(
626 aq,
627 program,
628 ctx,
629 config,
630 orch_store,
631 stats,
632 tracker.as_deref(),
633 )
634 .await?;
635 Ok(CommandResult::Abduce(result))
636 }
637 CompiledCommand::DeriveCommand(dc) => {
638 let affected = uni_query::query::df_graph::locy_derive::derive_command(
639 dc, program, ctx, stats,
640 )
641 .await?;
642 Ok(CommandResult::Derive { affected })
643 }
644 CompiledCommand::Cypher(q) => {
645 let rows = ctx.execute_cypher_read(q.clone()).await?;
646 stats.queries_executed += 1;
647 Ok(CommandResult::Cypher(rows))
648 }
649 }
650 })
651}
652
653async fn enrich_vids_with_nodes(
656 db: &Uni,
657 native_store: &uni_query::query::df_graph::DerivedStore,
658 derived: HashMap<String, Vec<Row>>,
659 graph_ctx: &Arc<uni_query::query::df_graph::GraphExecutionContext>,
660 session_ctx: &Arc<parking_lot::RwLock<datafusion::prelude::SessionContext>>,
661) -> HashMap<String, Vec<Row>> {
662 use arrow_schema::DataType;
663 let mut enriched = HashMap::new();
664
665 for (name, rows) in derived {
666 let vid_columns: HashSet<String> = native_store
667 .get(&name)
668 .and_then(|batches| batches.first())
669 .map(|batch| {
670 batch
671 .schema()
672 .fields()
673 .iter()
674 .filter(|f| *f.data_type() == DataType::UInt64)
675 .map(|f| f.name().clone())
676 .collect()
677 })
678 .unwrap_or_default();
679
680 if vid_columns.is_empty() {
681 enriched.insert(name, rows);
682 continue;
683 }
684
685 let unique_vids: HashSet<i64> = rows
686 .iter()
687 .flat_map(|row| {
688 vid_columns.iter().filter_map(|col| {
689 if let Some(Value::Int(vid)) = row.get(col) {
690 Some(*vid)
691 } else {
692 None
693 }
694 })
695 })
696 .collect();
697
698 if unique_vids.is_empty() {
699 enriched.insert(name, rows);
700 continue;
701 }
702
703 let vids_literal = unique_vids
704 .iter()
705 .map(|v| v.to_string())
706 .collect::<Vec<_>>()
707 .join(", ");
708 let query_str = format!(
709 "MATCH (n) WHERE id(n) IN [{}] RETURN id(n) AS _vid, n",
710 vids_literal
711 );
712 let mut vid_to_node: HashMap<i64, Value> = HashMap::new();
713 if let Ok(ast) = uni_cypher::parse(&query_str) {
714 let schema = db.schema.schema();
715 if let Ok(logical_plan) = uni_query::QueryPlanner::new(schema).plan(ast)
716 && let Ok(batches) = uni_query::query::df_graph::common::execute_subplan(
717 &logical_plan,
718 &HashMap::new(),
719 &HashMap::new(),
720 graph_ctx,
721 session_ctx,
722 &db.storage,
723 &db.schema.schema(),
724 )
725 .await
726 {
727 for row in record_batches_to_locy_rows(&batches) {
728 if let (Some(Value::Int(vid)), Some(node)) = (row.get("_vid"), row.get("n")) {
729 vid_to_node.insert(*vid, node.clone());
730 }
731 }
732 }
733 }
734
735 let enriched_rows: Vec<Row> = rows
736 .into_iter()
737 .map(|row| {
738 row.into_iter()
739 .map(|(k, v)| {
740 if vid_columns.contains(&k)
741 && let Value::Int(vid) = &v
742 {
743 let new_v = vid_to_node.get(vid).cloned().unwrap_or(v);
744 return (k, new_v);
745 }
746 (k, v)
747 })
748 .collect()
749 })
750 .collect();
751 enriched.insert(name, enriched_rows);
752 }
753
754 enriched
755}
756
757fn build_locy_result(
758 derived: HashMap<String, Vec<Row>>,
759 command_results: Vec<CommandResult>,
760 compiled: &CompiledProgram,
761 evaluation_time: Duration,
762 mut orchestrator_stats: LocyStats,
763) -> LocyResult {
764 let total_facts: usize = derived.values().map(|v| v.len()).sum();
765 orchestrator_stats.strata_evaluated = compiled.strata.len();
766 orchestrator_stats.derived_nodes = total_facts;
767 orchestrator_stats.evaluation_time = evaluation_time;
768
769 LocyResult {
770 derived,
771 stats: orchestrator_stats,
772 command_results,
773 }
774}
775
776fn native_store_to_row_store(
777 native: &uni_query::query::df_graph::DerivedStore,
778 compiled: &CompiledProgram,
779) -> RowStore {
780 let mut result = RowStore::new();
781 for name in native.rule_names() {
782 if let Some(batches) = native.get(name) {
783 let rows = record_batches_to_locy_rows(batches);
784 let rule = compiled.rule_catalog.get(name);
785 let columns: Vec<String> = rule
786 .map(|r| r.yield_schema.iter().map(|yc| yc.name.clone()).collect())
787 .unwrap_or_else(|| {
788 rows.first()
789 .map(|r| r.keys().cloned().collect())
790 .unwrap_or_default()
791 });
792 result.insert(name.to_string(), RowRelation::new(columns, rows));
793 }
794 }
795 result
796}
797
798fn map_parse_error(e: uni_cypher::ParseError) -> UniError {
801 UniError::Parse {
802 message: format!("LocyParseError: {e}"),
803 position: None,
804 line: None,
805 column: None,
806 context: None,
807 }
808}
809
810fn map_compile_error(e: LocyCompileError) -> UniError {
811 UniError::Query {
812 message: format!("LocyCompileError: {e}"),
813 query: None,
814 }
815}
816
817fn map_runtime_error(e: LocyError) -> UniError {
818 match e {
819 LocyError::SavepointFailed { ref message } => UniError::Transaction {
820 message: format!("LocyRuntimeError: {message}"),
821 },
822 other => UniError::Query {
823 message: format!("LocyRuntimeError: {other}"),
824 query: None,
825 },
826 }
827}
828
829fn map_native_df_error(e: impl std::fmt::Display) -> UniError {
830 UniError::Query {
831 message: format!("LocyRuntimeError: {e}"),
832 query: None,
833 }
834}