Skip to main content

uni_db/api/
impl_query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use futures::StreamExt;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8use uni_common::{Result, UniConfig, UniError};
9use uni_query::{
10    ExplainOutput, LogicalPlan, ProfileOutput, QueryCursor, QueryMetrics, QueryResult,
11    ResultNormalizer, Row, Value as ApiValue,
12};
13
/// Normalize backend/planner error text into canonical Cypher/TCK codes.
///
/// This keeps behavioral semantics unchanged while making error classification
/// stable across planner backends.
///
/// `raw` is the backend error text; `cypher` is the original query string,
/// consulted only by the branches that need query context (YIELD detection,
/// parameter usage, MERGE patterns). Case-folded copies of the query are built
/// lazily inside those branches instead of unconditionally up front.
fn normalize_error_message(raw: &str, cypher: &str) -> String {
    if raw.contains("Error during planning: UDF") && raw.contains("is not registered") {
        return format!("SyntaxError: UnknownFunction - {}", raw);
    }
    if raw.contains("_cypher_in(): second argument must be a list") {
        return format!("TypeError: InvalidArgumentType - {}", raw);
    }
    if raw.contains("InvalidNumberOfArguments: Procedure") && raw.contains("got 0") {
        // A zero-argument call combined with YIELD is an argument-passing-mode
        // problem; without YIELD the caller simply omitted a required parameter.
        return if cypher.to_uppercase().contains("YIELD") {
            format!("SyntaxError: InvalidArgumentPassingMode - {}", raw)
        } else {
            format!("ParameterMissing: MissingParameter - {}", raw)
        };
    }
    if raw.contains("Function count not implemented or is aggregate")
        || raw.contains("Physical plan does not support logical expression AggregateFunction")
        || raw.contains("Expected aggregate function, got: ListComprehension")
    {
        return format!("SyntaxError: InvalidAggregation - {}", raw);
    }
    if raw.contains("Expected aggregate function, got: BinaryOp") {
        return format!("SyntaxError: AmbiguousAggregationExpression - {}", raw);
    }
    if raw.contains("Schema error: No field named \"me.age\". Valid fields are \"count(you.age)\".")
    {
        return format!("SyntaxError: UndefinedVariable - {}", raw);
    }
    if raw.contains(
        "Schema error: No field named \"me.age\". Valid fields are \"me.age + you.age\", \"count(*)\".",
    ) {
        return format!("SyntaxError: AmbiguousAggregationExpression - {}", raw);
    }
    if raw.contains("MERGE edge must have a type")
        || raw.contains("MERGE does not support multiple edge types")
    {
        return format!("SyntaxError: NoSingleRelationshipType - {}", raw);
    }
    if raw.contains("MERGE node must have a label") {
        if cypher.contains("$param") {
            return format!("SyntaxError: InvalidParameterUse - {}", raw);
        }
        if cypher.contains('*') && cypher.contains("-[:") {
            return format!("SyntaxError: CreatingVarLength - {}", raw);
        }
        let cypher_lower = cypher.to_lowercase();
        if cypher_lower.contains("on create set x.") || cypher_lower.contains("on match set x.") {
            return format!("SyntaxError: UndefinedVariable - {}", raw);
        }
        // Label-less MERGE with none of the above query shapes: pass through.
    }
    // No pattern matched: return the backend text unchanged.
    raw.to_string()
}
65
66/// Convert a parse error into `UniError::Parse`.
67pub(crate) fn into_parse_error(e: impl std::fmt::Display) -> UniError {
68    UniError::Parse {
69        message: e.to_string(),
70        position: None,
71        line: None,
72        column: None,
73        context: None,
74    }
75}
76
77/// Convert a planner/compile-time error into the appropriate `UniError` type.
78///
79/// Errors starting with "SyntaxError:" are treated as parse/syntax errors.
80/// All other errors are query/semantic errors (CompileTime).
81pub(crate) fn into_query_error(e: impl std::fmt::Display, cypher: &str) -> UniError {
82    let msg = normalize_error_message(&e.to_string(), cypher);
83    // Errors containing "SyntaxError:" prefix should be treated as syntax errors
84    // This covers validation errors like VariableTypeConflict, UndefinedVariable, etc.
85    if msg.starts_with("SyntaxError:") {
86        UniError::Parse {
87            message: msg,
88            position: None,
89            line: None,
90            column: None,
91            context: Some(cypher.to_string()),
92        }
93    } else {
94        UniError::Query {
95            message: msg,
96            query: Some(cypher.to_string()),
97        }
98    }
99}
100
101/// Convert an executor/runtime error into the appropriate `UniError` type.
102/// TypeError messages from UDF execution become `UniError::Type` (Runtime phase).
103/// ConstraintVerificationFailed messages become `UniError::Constraint` (Runtime phase).
104/// All other executor errors remain `UniError::Query`.
105fn into_execution_error(e: impl std::fmt::Display, cypher: &str) -> UniError {
106    let msg = normalize_error_message(&e.to_string(), cypher);
107    if msg.contains("Query cancelled") {
108        UniError::Cancelled
109    } else if msg.contains("Query timed out") {
110        UniError::Query {
111            message: "Query timed out".to_string(),
112            query: Some(cypher.to_string()),
113        }
114    } else if msg.contains("Query exceeded memory limit") {
115        UniError::Query {
116            message: msg,
117            query: Some(cypher.to_string()),
118        }
119    } else if msg.contains("TypeError:") {
120        UniError::Type {
121            expected: msg,
122            actual: String::new(),
123        }
124    } else if msg.starts_with("ConstraintVerificationFailed:") {
125        UniError::Constraint { message: msg }
126    } else {
127        UniError::Query {
128            message: msg,
129            query: Some(cypher.to_string()),
130        }
131    }
132}
133
134/// Extract projection column names from a LogicalPlan, preserving query order.
135/// Returns None if the plan doesn't have projections at the top level.
136fn extract_projection_order(plan: &LogicalPlan) -> Option<Vec<String>> {
137    match plan {
138        LogicalPlan::Project { projections, .. } => Some(
139            projections
140                .iter()
141                .map(|(expr, alias)| alias.clone().unwrap_or_else(|| expr.to_string_repr()))
142                .collect(),
143        ),
144        LogicalPlan::Aggregate {
145            group_by,
146            aggregates,
147            ..
148        } => {
149            let mut names: Vec<String> = group_by.iter().map(|e| e.to_string_repr()).collect();
150            names.extend(aggregates.iter().map(|e| e.to_string_repr()));
151            Some(names)
152        }
153        LogicalPlan::Limit { input, .. }
154        | LogicalPlan::Sort { input, .. }
155        | LogicalPlan::Filter { input, .. } => extract_projection_order(input),
156        _ => None,
157    }
158}
159
160impl crate::api::UniInner {
161    /// Get the current L0Buffer mutation count (cumulative mutations since last flush).
162    /// Used to compute affected_rows for mutation queries that return no result rows.
163    pub(crate) async fn get_mutation_count(&self) -> usize {
164        match self.writer.as_ref() {
165            Some(w) => {
166                let writer = w.read().await;
167                writer.l0_manager.get_current().read().mutation_count
168            }
169            None => 0,
170        }
171    }
172
173    /// Get the current L0Buffer mutation stats snapshot.
174    /// Used together with `get_mutation_count` to compute per-type affected counters.
175    #[allow(dead_code)] // Reserved for future per-type affected_rows reporting
176    pub(crate) async fn get_mutation_stats(&self) -> uni_store::runtime::l0::MutationStats {
177        match self.writer.as_ref() {
178            Some(w) => {
179                let writer = w.read().await;
180                writer
181                    .l0_manager
182                    .get_current()
183                    .read()
184                    .mutation_stats
185                    .clone()
186            }
187            None => uni_store::runtime::l0::MutationStats::default(),
188        }
189    }
190
191    /// Explain a Cypher query plan without executing it.
192    pub(crate) async fn explain_internal(&self, cypher: &str) -> Result<ExplainOutput> {
193        let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
194
195        let planner = uni_query::QueryPlanner::new(self.schema.schema().clone());
196        planner
197            .explain_plan(ast)
198            .map_err(|e| into_query_error(e, cypher))
199    }
200
    /// Profile a Cypher query execution.
    ///
    /// Parses and plans `cypher`, then runs it through the executor's
    /// profiling entry point, returning the materialized `QueryResult`
    /// together with the collected `ProfileOutput`. Column order follows the
    /// query's projection list when it can be recovered from the plan;
    /// otherwise the first row's keys are used, sorted for determinism.
    pub(crate) async fn profile_internal(
        &self,
        cypher: &str,
        params: HashMap<String, ApiValue>,
    ) -> Result<(QueryResult, ProfileOutput)> {
        let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;

        let planner = uni_query::QueryPlanner::new(self.schema.schema().clone());
        let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;

        // Wire up the executor with the same runtime hooks used by the
        // regular execution paths: config, UDF runtime, procedure registry,
        // custom functions (only when some are registered), and the writer
        // when this instance is writable.
        let mut executor = uni_query::Executor::new(self.storage.clone());
        executor.set_config(self.config.clone());
        executor.set_xervo_runtime(self.xervo_runtime.clone());
        executor.set_procedure_registry(self.procedure_registry.clone());
        if let Ok(reg) = self.custom_functions.read()
            && !reg.is_empty()
        {
            executor.set_custom_functions(Arc::new(reg.clone()));
        }
        if let Some(w) = &self.writer {
            executor.set_writer(w.clone());
        }

        // Extract projection order
        let projection_order = extract_projection_order(&logical_plan);

        let (results, profile_output) = executor
            .profile(logical_plan, &params)
            .await
            .map_err(|e| into_execution_error(e, cypher))?;

        // Convert results to QueryResult
        let columns = if results.is_empty() {
            Arc::new(vec![])
        } else if let Some(order) = projection_order {
            Arc::new(order)
        } else {
            // No projection info in the plan: fall back to the first row's
            // keys, sorted so column order is stable across runs.
            let mut cols: Vec<String> = results[0].keys().cloned().collect();
            cols.sort();
            Arc::new(cols)
        };

        let rows = results
            .into_iter()
            .map(|map| {
                let mut values = Vec::with_capacity(columns.len());
                for col in columns.iter() {
                    // A column missing from a row materializes as Null.
                    let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
                    // Normalize to ensure proper Node/Edge/Path types
                    let normalized =
                        ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
                    values.push(normalized);
                }
                Row::new(columns.clone(), values)
            })
            .collect();

        // Metrics are defaulted here; profiling detail lives in ProfileOutput.
        Ok((
            QueryResult::new(columns, rows, Vec::new(), Default::default()),
            profile_output,
        ))
    }
264
    /// Execute a Cypher query and return a streaming cursor over its rows.
    ///
    /// Rows are produced lazily from the executor's result stream and
    /// re-chunked into `config.batch_size`-sized batches (a batch_size of 0
    /// disables re-chunking). Column order comes from the plan's projection
    /// list when available; otherwise each batch derives columns from its
    /// first row's keys, sorted.
    ///
    /// NOTE(review): when no projection order can be extracted, the cursor's
    /// up-front column list is empty even though batches carry per-batch
    /// columns — presumably callers read columns from rows in that case;
    /// confirm against QueryCursor usage.
    pub(crate) async fn execute_cursor_internal_with_config(
        &self,
        cypher: &str,
        params: HashMap<String, ApiValue>,
        config: UniConfig,
    ) -> Result<QueryCursor> {
        let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;

        let planner =
            uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
        let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;

        // Same executor wiring as the non-streaming paths.
        let mut executor = uni_query::Executor::new(self.storage.clone());
        executor.set_config(config.clone());
        executor.set_xervo_runtime(self.xervo_runtime.clone());
        executor.set_procedure_registry(self.procedure_registry.clone());
        if let Ok(reg) = self.custom_functions.read()
            && !reg.is_empty()
        {
            executor.set_custom_functions(Arc::new(reg.clone()));
        }
        if let Some(w) = &self.writer {
            executor.set_writer(w.clone());
        }

        // Owned copies moved into the stream closures below.
        let projection_order = extract_projection_order(&logical_plan);
        let projection_order_for_rows = projection_order.clone();
        let cypher_for_error = cypher.to_string();
        let batch_size = config.batch_size;

        let stream = executor.execute_stream(logical_plan, self.properties.clone(), params);

        // Convert raw hash-map batches to Row batches, chunked by batch_size.
        let row_stream = stream
            .map(move |batch_res| {
                // NOTE(review): this inline classifier mirrors
                // into_execution_error but skips its cancellation/timeout
                // checks — confirm intentional for the streaming path.
                let results = batch_res.map_err(|e| {
                    let msg = normalize_error_message(&e.to_string(), &cypher_for_error);
                    if msg.contains("TypeError:") {
                        UniError::Type {
                            expected: msg,
                            actual: String::new(),
                        }
                    } else if msg.starts_with("ConstraintVerificationFailed:") {
                        UniError::Constraint { message: msg }
                    } else {
                        UniError::Query {
                            message: msg,
                            query: Some(cypher_for_error.clone()),
                        }
                    }
                })?;

                if results.is_empty() {
                    return Ok(vec![]);
                }

                // Determine columns for this batch
                let columns = if let Some(order) = &projection_order_for_rows {
                    Arc::new(order.clone())
                } else {
                    // Fallback: first row's keys, sorted for stability.
                    let mut cols: Vec<String> = results[0].keys().cloned().collect();
                    cols.sort();
                    Arc::new(cols)
                };

                let rows = results
                    .into_iter()
                    .map(|map| {
                        let mut values = Vec::with_capacity(columns.len());
                        for col in columns.iter() {
                            // Missing columns materialize as Null.
                            let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
                            values.push(value);
                        }
                        Row::new(columns.clone(), values)
                    })
                    .collect::<Vec<Row>>();

                Ok(rows)
            })
            // Re-chunk into batch_size-sized pieces
            .flat_map(
                move |batch_res: std::result::Result<Vec<Row>, UniError>| match batch_res {
                    Ok(rows) if batch_size > 0 => {
                        let chunks: Vec<_> =
                            rows.chunks(batch_size).map(|c| Ok(c.to_vec())).collect();
                        futures::stream::iter(chunks).boxed()
                    }
                    // Errors (and batch_size == 0) pass through unchunked.
                    other => futures::stream::iter(vec![other]).boxed(),
                },
            );

        // We need columns ahead of time for QueryCursor if possible.
        let columns = if let Some(order) = projection_order {
            Arc::new(order)
        } else {
            Arc::new(vec![])
        };

        Ok(QueryCursor::new(columns, Box::pin(row_stream)))
    }
365
366    pub(crate) async fn execute_internal(
367        &self,
368        cypher: &str,
369        params: HashMap<String, ApiValue>,
370    ) -> Result<QueryResult> {
371        self.execute_internal_with_config(cypher, params, self.config.clone())
372            .await
373    }
374
    /// Execute a Cypher query with a private transaction L0 buffer.
    /// The tx_l0 is installed on the executor so both reads and mutations
    /// are routed through the caller's private L0 (commit-time serialization).
    ///
    /// Time-travel queries are rejected inside transactions. Returns a fully
    /// materialized `QueryResult` with parse/plan/exec timings populated.
    pub(crate) async fn execute_internal_with_tx_l0(
        &self,
        cypher: &str,
        params: HashMap<String, ApiValue>,
        tx_l0: std::sync::Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
    ) -> Result<QueryResult> {
        let total_start = Instant::now();

        let parse_start = Instant::now();
        let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
        let parse_time = parse_start.elapsed();

        // Peel off a time-travel clause so it can be rejected explicitly.
        let (ast, tt_spec) = match ast {
            uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
            other => (other, None),
        };

        if tt_spec.is_some() {
            return Err(UniError::Query {
                message: "Time-travel queries are not supported within transactions".to_string(),
                query: Some(cypher.to_string()),
            });
        }

        let plan_start = Instant::now();
        let planner =
            uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
        let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
        let plan_time = plan_start.elapsed();

        // Standard executor wiring, plus the caller's private L0 buffer so
        // reads observe the transaction's uncommitted writes.
        let mut executor = uni_query::Executor::new(self.storage.clone());
        executor.set_config(self.config.clone());
        executor.set_xervo_runtime(self.xervo_runtime.clone());
        executor.set_procedure_registry(self.procedure_registry.clone());
        if let Ok(reg) = self.custom_functions.read()
            && !reg.is_empty()
        {
            executor.set_custom_functions(Arc::new(reg.clone()));
        }
        if let Some(w) = &self.writer {
            executor.set_writer(w.clone());
        }
        executor.set_transaction_l0(tx_l0);

        let projection_order = extract_projection_order(&logical_plan);

        let exec_start = Instant::now();
        let results = executor
            .execute(logical_plan, &self.properties, &params)
            .await
            .map_err(|e| into_execution_error(e, cypher))?;
        let exec_time = exec_start.elapsed();

        // Column order: projection list when known, else sorted first-row keys.
        let columns = if results.is_empty() {
            Arc::new(vec![])
        } else if let Some(order) = projection_order {
            Arc::new(order)
        } else {
            let mut cols: Vec<String> = results[0].keys().cloned().collect();
            cols.sort();
            Arc::new(cols)
        };

        let rows: Vec<Row> = results
            .into_iter()
            .map(|map| {
                let mut values = Vec::with_capacity(columns.len());
                for col in columns.iter() {
                    // Missing columns materialize as Null; values are
                    // normalized into proper Node/Edge/Path types.
                    let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
                    let normalized =
                        ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
                    values.push(normalized);
                }
                Row::new(columns.clone(), values)
            })
            .collect();

        let metrics = QueryMetrics {
            parse_time,
            plan_time,
            exec_time,
            total_time: total_start.elapsed(),
            rows_returned: rows.len(),
            ..Default::default()
        };

        Ok(QueryResult::new(
            columns,
            rows,
            executor.take_warnings(),
            metrics,
        ))
    }
471
    /// Execute a cursor query with a private transaction L0 buffer.
    ///
    /// Mirrors `execute_cursor_internal_with_config` but installs the
    /// caller's private L0 so reads see uncommitted transaction writes.
    /// Time-travel queries are rejected inside transactions. Batching uses
    /// `self.config.batch_size` (0 disables re-chunking).
    pub(crate) async fn execute_cursor_internal_with_tx_l0(
        &self,
        cypher: &str,
        params: HashMap<String, ApiValue>,
        tx_l0: std::sync::Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
    ) -> Result<QueryCursor> {
        let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;

        // Peel off a time-travel clause so it can be rejected explicitly.
        let (ast, tt_spec) = match ast {
            uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
            other => (other, None),
        };

        if tt_spec.is_some() {
            return Err(UniError::Query {
                message: "Time-travel queries are not supported within transactions".to_string(),
                query: Some(cypher.to_string()),
            });
        }

        let planner =
            uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
        let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;

        // Standard executor wiring plus the transaction's private L0.
        let mut executor = uni_query::Executor::new(self.storage.clone());
        executor.set_config(self.config.clone());
        executor.set_xervo_runtime(self.xervo_runtime.clone());
        executor.set_procedure_registry(self.procedure_registry.clone());
        if let Ok(reg) = self.custom_functions.read()
            && !reg.is_empty()
        {
            executor.set_custom_functions(Arc::new(reg.clone()));
        }
        if let Some(w) = &self.writer {
            executor.set_writer(w.clone());
        }
        executor.set_transaction_l0(tx_l0);

        // Owned copies moved into the stream closures below.
        let projection_order = extract_projection_order(&logical_plan);
        let projection_order_for_rows = projection_order.clone();
        let cypher_for_error = cypher.to_string();
        let batch_size = self.config.batch_size;

        let stream = executor.execute_stream(logical_plan, self.properties.clone(), params);

        let row_stream = stream
            .map(move |batch_res| {
                // NOTE(review): this inline classifier mirrors
                // into_execution_error but skips its cancellation/timeout
                // checks — confirm intentional for the streaming path.
                let results = batch_res.map_err(|e| {
                    let msg = normalize_error_message(&e.to_string(), &cypher_for_error);
                    if msg.contains("TypeError:") {
                        UniError::Type {
                            expected: msg,
                            actual: String::new(),
                        }
                    } else if msg.starts_with("ConstraintVerificationFailed:") {
                        UniError::Constraint { message: msg }
                    } else {
                        UniError::Query {
                            message: msg,
                            query: Some(cypher_for_error.clone()),
                        }
                    }
                })?;

                if results.is_empty() {
                    return Ok(vec![]);
                }

                // Per-batch columns: projection order when known, else the
                // first row's keys, sorted for stability.
                let columns = if let Some(order) = &projection_order_for_rows {
                    Arc::new(order.clone())
                } else {
                    let mut cols: Vec<String> = results[0].keys().cloned().collect();
                    cols.sort();
                    Arc::new(cols)
                };

                let rows = results
                    .into_iter()
                    .map(|map| {
                        let mut values = Vec::with_capacity(columns.len());
                        for col in columns.iter() {
                            // Missing columns materialize as Null.
                            let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
                            values.push(value);
                        }
                        Row::new(columns.clone(), values)
                    })
                    .collect::<Vec<Row>>();

                Ok(rows)
            })
            // Re-chunk into batch_size-sized pieces; errors pass through.
            .flat_map(
                move |batch_res: std::result::Result<Vec<Row>, UniError>| match batch_res {
                    Ok(rows) if batch_size > 0 => {
                        let chunks: Vec<_> =
                            rows.chunks(batch_size).map(|c| Ok(c.to_vec())).collect();
                        futures::stream::iter(chunks).boxed()
                    }
                    other => futures::stream::iter(vec![other]).boxed(),
                },
            );

        // Up-front column list for the cursor (empty when unknown).
        let columns = if let Some(order) = projection_order {
            Arc::new(order)
        } else {
            Arc::new(vec![])
        };

        Ok(QueryCursor::new(columns, Box::pin(row_stream)))
    }
585
586    pub(crate) async fn execute_internal_with_config(
587        &self,
588        cypher: &str,
589        params: HashMap<String, ApiValue>,
590        config: UniConfig,
591    ) -> Result<QueryResult> {
592        let total_start = Instant::now();
593
594        // Single parse: extract time-travel clause if present
595        let parse_start = Instant::now();
596        let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
597        let parse_time = parse_start.elapsed();
598
599        let (ast, tt_spec) = match ast {
600            uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
601            other => (other, None),
602        };
603
604        if let Some(spec) = tt_spec {
605            uni_query::validate_read_only(&ast).map_err(|msg| into_query_error(msg, cypher))?;
606            // Resolve to snapshot and execute on pinned instance
607            let snapshot_id = self.resolve_time_travel(&spec).await?;
608            let pinned = self.at_snapshot(&snapshot_id).await?;
609            return pinned
610                .execute_ast_internal(ast, cypher, params, config)
611                .await;
612        }
613
614        let mut result = self
615            .execute_ast_internal(ast, cypher, params, config)
616            .await?;
617        result.update_parse_timing(parse_time, total_start.elapsed());
618        Ok(result)
619    }
620
    /// Like `execute_internal_with_config` but also accepts a cancellation token.
    ///
    /// The token is forwarded to `execute_plan_internal` for the regular
    /// execution path.
    ///
    /// NOTE(review): the time-travel path calls `execute_ast_internal`
    /// without the token, so such queries cannot be cancelled through it —
    /// confirm this is intentional.
    pub(crate) async fn execute_internal_with_config_and_token(
        &self,
        cypher: &str,
        params: HashMap<String, ApiValue>,
        config: UniConfig,
        cancellation_token: Option<tokio_util::sync::CancellationToken>,
    ) -> Result<QueryResult> {
        let total_start = Instant::now();

        let parse_start = Instant::now();
        let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
        let parse_time = parse_start.elapsed();

        // Split off a time-travel clause if present.
        let (ast, tt_spec) = match ast {
            uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
            other => (other, None),
        };

        if let Some(spec) = tt_spec {
            // Time-travel queries must be read-only; run on a snapshot-pinned
            // instance.
            uni_query::validate_read_only(&ast).map_err(|msg| into_query_error(msg, cypher))?;
            let snapshot_id = self.resolve_time_travel(&spec).await?;
            let pinned = self.at_snapshot(&snapshot_id).await?;
            return pinned
                .execute_ast_internal(ast, cypher, params, config)
                .await;
        }

        let planner =
            uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
        let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;

        // Plan-level execution path carries the cancellation token.
        let mut result = self
            .execute_plan_internal(logical_plan, cypher, params, config, cancellation_token)
            .await?;
        result.update_parse_timing(parse_time, total_start.elapsed());
        Ok(result)
    }
659
    /// Execute a pre-parsed Cypher AST with a private transaction L0 override.
    ///
    /// Like `execute_internal_with_tx_l0` but skips parsing (the caller
    /// supplies the AST); `cypher` is used only for error reporting.
    /// `parse_time` in the returned metrics is therefore zero.
    pub(crate) async fn execute_ast_internal_with_tx_l0(
        &self,
        ast: uni_query::CypherQuery,
        cypher: &str,
        params: HashMap<String, ApiValue>,
        config: UniConfig,
        tx_l0: std::sync::Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
    ) -> Result<QueryResult> {
        let total_start = Instant::now();

        let plan_start = Instant::now();
        let planner =
            uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
        let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
        let plan_time = plan_start.elapsed();

        // Standard executor wiring plus the transaction's private L0 buffer.
        let mut executor = uni_query::Executor::new(self.storage.clone());
        executor.set_config(config.clone());
        executor.set_xervo_runtime(self.xervo_runtime.clone());
        executor.set_procedure_registry(self.procedure_registry.clone());
        if let Ok(reg) = self.custom_functions.read()
            && !reg.is_empty()
        {
            executor.set_custom_functions(Arc::new(reg.clone()));
        }
        if let Some(w) = &self.writer {
            executor.set_writer(w.clone());
        }
        executor.set_transaction_l0(tx_l0);

        let projection_order = extract_projection_order(&logical_plan);

        let exec_start = Instant::now();
        let results = executor
            .execute(logical_plan, &self.properties, &params)
            .await
            .map_err(|e| into_execution_error(e, cypher))?;
        let exec_time = exec_start.elapsed();

        // Column order: projection list when known, else sorted first-row keys.
        let columns = if results.is_empty() {
            Arc::new(vec![])
        } else if let Some(order) = projection_order {
            Arc::new(order)
        } else {
            let mut cols: Vec<String> = results[0].keys().cloned().collect();
            cols.sort();
            Arc::new(cols)
        };

        let rows = results
            .into_iter()
            .map(|map| {
                let mut values = Vec::with_capacity(columns.len());
                for col in columns.iter() {
                    // Missing columns materialize as Null; values are
                    // normalized into proper Node/Edge/Path types.
                    let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
                    let normalized =
                        ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
                    values.push(normalized);
                }
                Row::new(columns.clone(), values)
            })
            .collect::<Vec<Row>>();

        let metrics = QueryMetrics {
            // No parse happened here — the AST was supplied pre-parsed.
            parse_time: std::time::Duration::ZERO,
            plan_time,
            exec_time,
            total_time: total_start.elapsed(),
            rows_returned: rows.len(),
            ..Default::default()
        };

        Ok(QueryResult::new(
            columns,
            rows,
            executor.take_warnings(),
            metrics,
        ))
    }
740
741    /// Execute a pre-parsed Cypher AST through the planner and executor.
742    ///
743    /// The `cypher` parameter is the original query string, used only for
744    /// error messages.
745    pub(crate) async fn execute_ast_internal(
746        &self,
747        ast: uni_query::CypherQuery,
748        cypher: &str,
749        params: HashMap<String, ApiValue>,
750        config: UniConfig,
751    ) -> Result<QueryResult> {
752        let total_start = Instant::now();
753        let deadline = total_start + config.query_timeout;
754
755        let plan_start = Instant::now();
756        let planner =
757            uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
758        let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
759        let plan_time = plan_start.elapsed();
760
761        let mut executor = uni_query::Executor::new(self.storage.clone());
762        executor.set_config(config.clone());
763        executor.set_xervo_runtime(self.xervo_runtime.clone());
764        executor.set_procedure_registry(self.procedure_registry.clone());
765        if let Ok(reg) = self.custom_functions.read()
766            && !reg.is_empty()
767        {
768            executor.set_custom_functions(Arc::new(reg.clone()));
769        }
770        if let Some(w) = &self.writer {
771            executor.set_writer(w.clone());
772        }
773
774        let projection_order = extract_projection_order(&logical_plan);
775
776        let exec_start = Instant::now();
777        let timeout_duration = config.query_timeout;
778        let results = tokio::time::timeout(
779            timeout_duration,
780            executor.execute(logical_plan, &self.properties, &params),
781        )
782        .await
783        .map_err(|_| UniError::Query {
784            message: "Query timed out".to_string(),
785            query: Some(cypher.to_string()),
786        })?
787        .map_err(|e| into_execution_error(e, cypher))?;
788        let exec_time = exec_start.elapsed();
789
790        // Instant-based deadline check for sub-millisecond timeouts that
791        // tokio::time::timeout cannot catch due to timer wheel resolution.
792        if Instant::now() > deadline {
793            return Err(UniError::Query {
794                message: "Query timed out".to_string(),
795                query: Some(cypher.to_string()),
796            });
797        }
798
799        // Enforce per-query memory limit on the result set.
800        let max_mem = config.max_query_memory;
801        if max_mem > 0 {
802            let estimated_bytes: usize = results
803                .iter()
804                .map(|row| {
805                    row.values()
806                        .map(|v| std::mem::size_of_val(v) + 64)
807                        .sum::<usize>()
808                })
809                .sum();
810            if estimated_bytes > max_mem {
811                return Err(UniError::Query {
812                    message: format!(
813                        "Query exceeded memory limit ({} bytes > {} byte limit)",
814                        estimated_bytes, max_mem
815                    ),
816                    query: Some(cypher.to_string()),
817                });
818            }
819        }
820
821        let columns = if results.is_empty() {
822            Arc::new(vec![])
823        } else if let Some(order) = projection_order {
824            Arc::new(order)
825        } else {
826            let mut cols: Vec<String> = results[0].keys().cloned().collect();
827            cols.sort();
828            Arc::new(cols)
829        };
830
831        let rows = results
832            .into_iter()
833            .map(|map| {
834                let mut values = Vec::with_capacity(columns.len());
835                for col in columns.iter() {
836                    let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
837                    let normalized =
838                        ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
839                    values.push(normalized);
840                }
841                Row::new(columns.clone(), values)
842            })
843            .collect::<Vec<Row>>();
844
845        let metrics = QueryMetrics {
846            parse_time: std::time::Duration::ZERO,
847            plan_time,
848            exec_time,
849            total_time: total_start.elapsed(),
850            rows_returned: rows.len(),
851            ..Default::default()
852        };
853
854        Ok(QueryResult::new(
855            columns,
856            rows,
857            executor.take_warnings(),
858            metrics,
859        ))
860    }
861
862    /// Resolve a time-travel spec to a snapshot ID.
863    async fn resolve_time_travel(&self, spec: &uni_query::TimeTravelSpec) -> Result<String> {
864        match spec {
865            uni_query::TimeTravelSpec::Version(id) => Ok(id.clone()),
866            uni_query::TimeTravelSpec::Timestamp(ts_str) => {
867                let ts = chrono::DateTime::parse_from_rfc3339(ts_str)
868                    .map_err(|e| {
869                        into_parse_error(format!("Invalid timestamp '{}': {}", ts_str, e))
870                    })?
871                    .with_timezone(&chrono::Utc);
872                self.resolve_time_travel_timestamp(ts).await
873            }
874        }
875    }
876
877    /// Resolve a `chrono::DateTime<Utc>` to the snapshot ID of the closest
878    /// snapshot at or before that timestamp.
879    pub(crate) async fn resolve_time_travel_timestamp(
880        &self,
881        ts: chrono::DateTime<chrono::Utc>,
882    ) -> Result<String> {
883        let manifest = self
884            .storage
885            .snapshot_manager()
886            .find_snapshot_at_time(ts)
887            .await
888            .map_err(UniError::Internal)?
889            .ok_or_else(|| UniError::Query {
890                message: format!("No snapshot found at or before {}", ts),
891                query: None,
892            })?;
893        Ok(manifest.snapshot_id)
894    }
895
896    /// Execute a pre-built logical plan, skipping parse and plan phases.
897    ///
898    /// Used by the plan cache and prepared statements to re-execute
899    /// previously planned queries.
900    pub(crate) async fn execute_plan_internal(
901        &self,
902        plan: uni_query::LogicalPlan,
903        cypher: &str,
904        params: HashMap<String, ApiValue>,
905        config: UniConfig,
906        cancellation_token: Option<tokio_util::sync::CancellationToken>,
907    ) -> Result<QueryResult> {
908        let total_start = Instant::now();
909
910        let mut executor = uni_query::Executor::new(self.storage.clone());
911        executor.set_config(config.clone());
912        executor.set_xervo_runtime(self.xervo_runtime.clone());
913        executor.set_procedure_registry(self.procedure_registry.clone());
914        if let Ok(reg) = self.custom_functions.read()
915            && !reg.is_empty()
916        {
917            executor.set_custom_functions(Arc::new(reg.clone()));
918        }
919        if let Some(w) = &self.writer {
920            executor.set_writer(w.clone());
921        }
922        if let Some(token) = cancellation_token {
923            executor.set_cancellation_token(token);
924        }
925
926        let projection_order = extract_projection_order(&plan);
927
928        let exec_start = Instant::now();
929        let deadline = exec_start + config.query_timeout;
930        let timeout_duration = config.query_timeout;
931        let results = tokio::time::timeout(
932            timeout_duration,
933            executor.execute(plan, &self.properties, &params),
934        )
935        .await
936        .map_err(|_| UniError::Query {
937            message: "Query timed out".to_string(),
938            query: Some(cypher.to_string()),
939        })?
940        .map_err(|e| into_execution_error(e, cypher))?;
941        let exec_time = exec_start.elapsed();
942
943        if Instant::now() > deadline {
944            return Err(UniError::Query {
945                message: "Query timed out".to_string(),
946                query: Some(cypher.to_string()),
947            });
948        }
949
950        let max_mem = config.max_query_memory;
951        if max_mem > 0 {
952            let estimated_bytes: usize = results
953                .iter()
954                .map(|row| {
955                    row.values()
956                        .map(|v| std::mem::size_of_val(v) + 64)
957                        .sum::<usize>()
958                })
959                .sum();
960            if estimated_bytes > max_mem {
961                return Err(UniError::Query {
962                    message: format!(
963                        "Query exceeded memory limit ({} bytes > {} byte limit)",
964                        estimated_bytes, max_mem
965                    ),
966                    query: Some(cypher.to_string()),
967                });
968            }
969        }
970
971        let columns = if results.is_empty() {
972            Arc::new(vec![])
973        } else if let Some(order) = projection_order {
974            Arc::new(order)
975        } else {
976            let mut cols: Vec<String> = results[0].keys().cloned().collect();
977            cols.sort();
978            Arc::new(cols)
979        };
980
981        let rows: Vec<Row> = results
982            .into_iter()
983            .map(|map| {
984                let mut values = Vec::with_capacity(columns.len());
985                for col in columns.iter() {
986                    let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
987                    let normalized =
988                        ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
989                    values.push(normalized);
990                }
991                Row::new(columns.clone(), values)
992            })
993            .collect();
994
995        let metrics = QueryMetrics {
996            parse_time: std::time::Duration::ZERO,
997            plan_time: std::time::Duration::ZERO,
998            exec_time,
999            total_time: total_start.elapsed(),
1000            rows_returned: rows.len(),
1001            ..Default::default()
1002        };
1003
1004        Ok(QueryResult::new(
1005            columns,
1006            rows,
1007            executor.take_warnings(),
1008            metrics,
1009        ))
1010    }
1011}