1use futures::StreamExt;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::Instant;
8use uni_common::{Result, UniConfig, UniError};
9use uni_query::{
10 ExplainOutput, LogicalPlan, ProfileOutput, QueryCursor, QueryMetrics, QueryResult,
11 ResultNormalizer, Row, Value as ApiValue,
12};
13
14fn normalize_error_message(raw: &str, cypher: &str) -> String {
19 let mut normalized = raw.to_string();
20 let cypher_upper = cypher.to_uppercase();
21 let cypher_lower = cypher.to_lowercase();
22
23 if raw.contains("Error during planning: UDF") && raw.contains("is not registered") {
24 normalized = format!("SyntaxError: UnknownFunction - {}", raw);
25 } else if raw.contains("_cypher_in(): second argument must be a list") {
26 normalized = format!("TypeError: InvalidArgumentType - {}", raw);
27 } else if raw.contains("InvalidNumberOfArguments: Procedure") && raw.contains("got 0") {
28 if cypher_upper.contains("YIELD") {
29 normalized = format!("SyntaxError: InvalidArgumentPassingMode - {}", raw);
30 } else {
31 normalized = format!("ParameterMissing: MissingParameter - {}", raw);
32 }
33 } else if raw.contains("Function count not implemented or is aggregate")
34 || raw.contains("Physical plan does not support logical expression AggregateFunction")
35 || raw.contains("Expected aggregate function, got: ListComprehension")
36 {
37 normalized = format!("SyntaxError: InvalidAggregation - {}", raw);
38 } else if raw.contains("Expected aggregate function, got: BinaryOp") {
39 normalized = format!("SyntaxError: AmbiguousAggregationExpression - {}", raw);
40 } else if raw.contains("Schema error: No field named \"me.age\". Valid fields are \"count(you.age)\".")
41 {
42 normalized = format!("SyntaxError: UndefinedVariable - {}", raw);
43 } else if raw.contains(
44 "Schema error: No field named \"me.age\". Valid fields are \"me.age + you.age\", \"count(*)\".",
45 ) {
46 normalized = format!("SyntaxError: AmbiguousAggregationExpression - {}", raw);
47 } else if raw.contains("MERGE edge must have a type")
48 || raw.contains("MERGE does not support multiple edge types")
49 {
50 normalized = format!("SyntaxError: NoSingleRelationshipType - {}", raw);
51 } else if raw.contains("MERGE node must have a label") {
52 if cypher.contains("$param") {
53 normalized = format!("SyntaxError: InvalidParameterUse - {}", raw);
54 } else if cypher.contains('*') && cypher.contains("-[:") {
55 normalized = format!("SyntaxError: CreatingVarLength - {}", raw);
56 } else if cypher_lower.contains("on create set x.")
57 || cypher_lower.contains("on match set x.")
58 {
59 normalized = format!("SyntaxError: UndefinedVariable - {}", raw);
60 }
61 }
62
63 normalized
64}
65
66pub(crate) fn into_parse_error(e: impl std::fmt::Display) -> UniError {
68 UniError::Parse {
69 message: e.to_string(),
70 position: None,
71 line: None,
72 column: None,
73 context: None,
74 }
75}
76
77pub(crate) fn into_query_error(e: impl std::fmt::Display, cypher: &str) -> UniError {
82 let msg = normalize_error_message(&e.to_string(), cypher);
83 if msg.starts_with("SyntaxError:") {
86 UniError::Parse {
87 message: msg,
88 position: None,
89 line: None,
90 column: None,
91 context: Some(cypher.to_string()),
92 }
93 } else {
94 UniError::Query {
95 message: msg,
96 query: Some(cypher.to_string()),
97 }
98 }
99}
100
101fn into_execution_error(e: impl std::fmt::Display, cypher: &str) -> UniError {
106 let msg = normalize_error_message(&e.to_string(), cypher);
107 if msg.contains("Query cancelled") {
108 UniError::Cancelled
109 } else if msg.contains("Query timed out") {
110 UniError::Query {
111 message: "Query timed out".to_string(),
112 query: Some(cypher.to_string()),
113 }
114 } else if msg.contains("Query exceeded memory limit") {
115 UniError::Query {
116 message: msg,
117 query: Some(cypher.to_string()),
118 }
119 } else if msg.contains("TypeError:") {
120 UniError::Type {
121 expected: msg,
122 actual: String::new(),
123 }
124 } else if msg.starts_with("ConstraintVerificationFailed:") {
125 UniError::Constraint { message: msg }
126 } else {
127 UniError::Query {
128 message: msg,
129 query: Some(cypher.to_string()),
130 }
131 }
132}
133
134fn extract_projection_order(plan: &LogicalPlan) -> Option<Vec<String>> {
137 match plan {
138 LogicalPlan::Project { projections, .. } => Some(
139 projections
140 .iter()
141 .map(|(expr, alias)| alias.clone().unwrap_or_else(|| expr.to_string_repr()))
142 .collect(),
143 ),
144 LogicalPlan::Aggregate {
145 group_by,
146 aggregates,
147 ..
148 } => {
149 let mut names: Vec<String> = group_by.iter().map(|e| e.to_string_repr()).collect();
150 names.extend(aggregates.iter().map(|e| e.to_string_repr()));
151 Some(names)
152 }
153 LogicalPlan::Limit { input, .. }
154 | LogicalPlan::Sort { input, .. }
155 | LogicalPlan::Filter { input, .. } => extract_projection_order(input),
156 _ => None,
157 }
158}
159
160impl crate::api::UniInner {
161 pub(crate) async fn get_mutation_count(&self) -> usize {
164 match self.writer.as_ref() {
165 Some(w) => {
166 let writer = w.read().await;
167 writer.l0_manager.get_current().read().mutation_count
168 }
169 None => 0,
170 }
171 }
172
173 #[allow(dead_code)] pub(crate) async fn get_mutation_stats(&self) -> uni_store::runtime::l0::MutationStats {
177 match self.writer.as_ref() {
178 Some(w) => {
179 let writer = w.read().await;
180 writer
181 .l0_manager
182 .get_current()
183 .read()
184 .mutation_stats
185 .clone()
186 }
187 None => uni_store::runtime::l0::MutationStats::default(),
188 }
189 }
190
191 pub(crate) async fn explain_internal(&self, cypher: &str) -> Result<ExplainOutput> {
193 let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
194
195 let planner = uni_query::QueryPlanner::new(self.schema.schema().clone());
196 planner
197 .explain_plan(ast)
198 .map_err(|e| into_query_error(e, cypher))
199 }
200
201 pub(crate) async fn profile_internal(
203 &self,
204 cypher: &str,
205 params: HashMap<String, ApiValue>,
206 ) -> Result<(QueryResult, ProfileOutput)> {
207 let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
208
209 let planner = uni_query::QueryPlanner::new(self.schema.schema().clone());
210 let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
211
212 let mut executor = uni_query::Executor::new(self.storage.clone());
213 executor.set_config(self.config.clone());
214 executor.set_xervo_runtime(self.xervo_runtime.clone());
215 executor.set_procedure_registry(self.procedure_registry.clone());
216 if let Ok(reg) = self.custom_functions.read()
217 && !reg.is_empty()
218 {
219 executor.set_custom_functions(Arc::new(reg.clone()));
220 }
221 if let Some(w) = &self.writer {
222 executor.set_writer(w.clone());
223 }
224
225 let projection_order = extract_projection_order(&logical_plan);
227
228 let (results, profile_output) = executor
229 .profile(logical_plan, ¶ms)
230 .await
231 .map_err(|e| into_execution_error(e, cypher))?;
232
233 let columns = if results.is_empty() {
235 Arc::new(vec![])
236 } else if let Some(order) = projection_order {
237 Arc::new(order)
238 } else {
239 let mut cols: Vec<String> = results[0].keys().cloned().collect();
240 cols.sort();
241 Arc::new(cols)
242 };
243
244 let rows = results
245 .into_iter()
246 .map(|map| {
247 let mut values = Vec::with_capacity(columns.len());
248 for col in columns.iter() {
249 let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
250 let normalized =
252 ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
253 values.push(normalized);
254 }
255 Row::new(columns.clone(), values)
256 })
257 .collect();
258
259 Ok((
260 QueryResult::new(columns, rows, Vec::new(), Default::default()),
261 profile_output,
262 ))
263 }
264
265 pub(crate) async fn execute_cursor_internal_with_config(
266 &self,
267 cypher: &str,
268 params: HashMap<String, ApiValue>,
269 config: UniConfig,
270 ) -> Result<QueryCursor> {
271 let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
272
273 let planner =
274 uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
275 let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
276
277 let mut executor = uni_query::Executor::new(self.storage.clone());
278 executor.set_config(config.clone());
279 executor.set_xervo_runtime(self.xervo_runtime.clone());
280 executor.set_procedure_registry(self.procedure_registry.clone());
281 if let Ok(reg) = self.custom_functions.read()
282 && !reg.is_empty()
283 {
284 executor.set_custom_functions(Arc::new(reg.clone()));
285 }
286 if let Some(w) = &self.writer {
287 executor.set_writer(w.clone());
288 }
289
290 let projection_order = extract_projection_order(&logical_plan);
291 let projection_order_for_rows = projection_order.clone();
292 let cypher_for_error = cypher.to_string();
293 let batch_size = config.batch_size;
294
295 let stream = executor.execute_stream(logical_plan, self.properties.clone(), params);
296
297 let row_stream = stream
299 .map(move |batch_res| {
300 let results = batch_res.map_err(|e| {
301 let msg = normalize_error_message(&e.to_string(), &cypher_for_error);
302 if msg.contains("TypeError:") {
303 UniError::Type {
304 expected: msg,
305 actual: String::new(),
306 }
307 } else if msg.starts_with("ConstraintVerificationFailed:") {
308 UniError::Constraint { message: msg }
309 } else {
310 UniError::Query {
311 message: msg,
312 query: Some(cypher_for_error.clone()),
313 }
314 }
315 })?;
316
317 if results.is_empty() {
318 return Ok(vec![]);
319 }
320
321 let columns = if let Some(order) = &projection_order_for_rows {
323 Arc::new(order.clone())
324 } else {
325 let mut cols: Vec<String> = results[0].keys().cloned().collect();
326 cols.sort();
327 Arc::new(cols)
328 };
329
330 let rows = results
331 .into_iter()
332 .map(|map| {
333 let mut values = Vec::with_capacity(columns.len());
334 for col in columns.iter() {
335 let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
336 values.push(value);
337 }
338 Row::new(columns.clone(), values)
339 })
340 .collect::<Vec<Row>>();
341
342 Ok(rows)
343 })
344 .flat_map(
346 move |batch_res: std::result::Result<Vec<Row>, UniError>| match batch_res {
347 Ok(rows) if batch_size > 0 => {
348 let chunks: Vec<_> =
349 rows.chunks(batch_size).map(|c| Ok(c.to_vec())).collect();
350 futures::stream::iter(chunks).boxed()
351 }
352 other => futures::stream::iter(vec![other]).boxed(),
353 },
354 );
355
356 let columns = if let Some(order) = projection_order {
358 Arc::new(order)
359 } else {
360 Arc::new(vec![])
361 };
362
363 Ok(QueryCursor::new(columns, Box::pin(row_stream)))
364 }
365
366 pub(crate) async fn execute_internal(
367 &self,
368 cypher: &str,
369 params: HashMap<String, ApiValue>,
370 ) -> Result<QueryResult> {
371 self.execute_internal_with_config(cypher, params, self.config.clone())
372 .await
373 }
374
375 pub(crate) async fn execute_internal_with_tx_l0(
379 &self,
380 cypher: &str,
381 params: HashMap<String, ApiValue>,
382 tx_l0: std::sync::Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
383 ) -> Result<QueryResult> {
384 let total_start = Instant::now();
385
386 let parse_start = Instant::now();
387 let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
388 let parse_time = parse_start.elapsed();
389
390 let (ast, tt_spec) = match ast {
391 uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
392 other => (other, None),
393 };
394
395 if tt_spec.is_some() {
396 return Err(UniError::Query {
397 message: "Time-travel queries are not supported within transactions".to_string(),
398 query: Some(cypher.to_string()),
399 });
400 }
401
402 let plan_start = Instant::now();
403 let planner =
404 uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
405 let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
406 let plan_time = plan_start.elapsed();
407
408 let mut executor = uni_query::Executor::new(self.storage.clone());
409 executor.set_config(self.config.clone());
410 executor.set_xervo_runtime(self.xervo_runtime.clone());
411 executor.set_procedure_registry(self.procedure_registry.clone());
412 if let Ok(reg) = self.custom_functions.read()
413 && !reg.is_empty()
414 {
415 executor.set_custom_functions(Arc::new(reg.clone()));
416 }
417 if let Some(w) = &self.writer {
418 executor.set_writer(w.clone());
419 }
420 executor.set_transaction_l0(tx_l0);
421
422 let projection_order = extract_projection_order(&logical_plan);
423
424 let exec_start = Instant::now();
425 let results = executor
426 .execute(logical_plan, &self.properties, ¶ms)
427 .await
428 .map_err(|e| into_execution_error(e, cypher))?;
429 let exec_time = exec_start.elapsed();
430
431 let columns = if results.is_empty() {
432 Arc::new(vec![])
433 } else if let Some(order) = projection_order {
434 Arc::new(order)
435 } else {
436 let mut cols: Vec<String> = results[0].keys().cloned().collect();
437 cols.sort();
438 Arc::new(cols)
439 };
440
441 let rows: Vec<Row> = results
442 .into_iter()
443 .map(|map| {
444 let mut values = Vec::with_capacity(columns.len());
445 for col in columns.iter() {
446 let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
447 let normalized =
448 ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
449 values.push(normalized);
450 }
451 Row::new(columns.clone(), values)
452 })
453 .collect();
454
455 let metrics = QueryMetrics {
456 parse_time,
457 plan_time,
458 exec_time,
459 total_time: total_start.elapsed(),
460 rows_returned: rows.len(),
461 ..Default::default()
462 };
463
464 Ok(QueryResult::new(
465 columns,
466 rows,
467 executor.take_warnings(),
468 metrics,
469 ))
470 }
471
472 pub(crate) async fn execute_cursor_internal_with_tx_l0(
477 &self,
478 cypher: &str,
479 params: HashMap<String, ApiValue>,
480 tx_l0: std::sync::Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
481 ) -> Result<QueryCursor> {
482 let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
483
484 let (ast, tt_spec) = match ast {
485 uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
486 other => (other, None),
487 };
488
489 if tt_spec.is_some() {
490 return Err(UniError::Query {
491 message: "Time-travel queries are not supported within transactions".to_string(),
492 query: Some(cypher.to_string()),
493 });
494 }
495
496 let planner =
497 uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
498 let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
499
500 let mut executor = uni_query::Executor::new(self.storage.clone());
501 executor.set_config(self.config.clone());
502 executor.set_xervo_runtime(self.xervo_runtime.clone());
503 executor.set_procedure_registry(self.procedure_registry.clone());
504 if let Ok(reg) = self.custom_functions.read()
505 && !reg.is_empty()
506 {
507 executor.set_custom_functions(Arc::new(reg.clone()));
508 }
509 if let Some(w) = &self.writer {
510 executor.set_writer(w.clone());
511 }
512 executor.set_transaction_l0(tx_l0);
513
514 let projection_order = extract_projection_order(&logical_plan);
515 let projection_order_for_rows = projection_order.clone();
516 let cypher_for_error = cypher.to_string();
517 let batch_size = self.config.batch_size;
518
519 let stream = executor.execute_stream(logical_plan, self.properties.clone(), params);
520
521 let row_stream = stream
522 .map(move |batch_res| {
523 let results = batch_res.map_err(|e| {
524 let msg = normalize_error_message(&e.to_string(), &cypher_for_error);
525 if msg.contains("TypeError:") {
526 UniError::Type {
527 expected: msg,
528 actual: String::new(),
529 }
530 } else if msg.starts_with("ConstraintVerificationFailed:") {
531 UniError::Constraint { message: msg }
532 } else {
533 UniError::Query {
534 message: msg,
535 query: Some(cypher_for_error.clone()),
536 }
537 }
538 })?;
539
540 if results.is_empty() {
541 return Ok(vec![]);
542 }
543
544 let columns = if let Some(order) = &projection_order_for_rows {
545 Arc::new(order.clone())
546 } else {
547 let mut cols: Vec<String> = results[0].keys().cloned().collect();
548 cols.sort();
549 Arc::new(cols)
550 };
551
552 let rows = results
553 .into_iter()
554 .map(|map| {
555 let mut values = Vec::with_capacity(columns.len());
556 for col in columns.iter() {
557 let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
558 values.push(value);
559 }
560 Row::new(columns.clone(), values)
561 })
562 .collect::<Vec<Row>>();
563
564 Ok(rows)
565 })
566 .flat_map(
567 move |batch_res: std::result::Result<Vec<Row>, UniError>| match batch_res {
568 Ok(rows) if batch_size > 0 => {
569 let chunks: Vec<_> =
570 rows.chunks(batch_size).map(|c| Ok(c.to_vec())).collect();
571 futures::stream::iter(chunks).boxed()
572 }
573 other => futures::stream::iter(vec![other]).boxed(),
574 },
575 );
576
577 let columns = if let Some(order) = projection_order {
578 Arc::new(order)
579 } else {
580 Arc::new(vec![])
581 };
582
583 Ok(QueryCursor::new(columns, Box::pin(row_stream)))
584 }
585
586 pub(crate) async fn execute_internal_with_config(
587 &self,
588 cypher: &str,
589 params: HashMap<String, ApiValue>,
590 config: UniConfig,
591 ) -> Result<QueryResult> {
592 let total_start = Instant::now();
593
594 let parse_start = Instant::now();
596 let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
597 let parse_time = parse_start.elapsed();
598
599 let (ast, tt_spec) = match ast {
600 uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
601 other => (other, None),
602 };
603
604 if let Some(spec) = tt_spec {
605 uni_query::validate_read_only(&ast).map_err(|msg| into_query_error(msg, cypher))?;
606 let snapshot_id = self.resolve_time_travel(&spec).await?;
608 let pinned = self.at_snapshot(&snapshot_id).await?;
609 return pinned
610 .execute_ast_internal(ast, cypher, params, config)
611 .await;
612 }
613
614 let mut result = self
615 .execute_ast_internal(ast, cypher, params, config)
616 .await?;
617 result.update_parse_timing(parse_time, total_start.elapsed());
618 Ok(result)
619 }
620
621 pub(crate) async fn execute_internal_with_config_and_token(
623 &self,
624 cypher: &str,
625 params: HashMap<String, ApiValue>,
626 config: UniConfig,
627 cancellation_token: Option<tokio_util::sync::CancellationToken>,
628 ) -> Result<QueryResult> {
629 let total_start = Instant::now();
630
631 let parse_start = Instant::now();
632 let ast = uni_cypher::parse(cypher).map_err(into_parse_error)?;
633 let parse_time = parse_start.elapsed();
634
635 let (ast, tt_spec) = match ast {
636 uni_cypher::ast::Query::TimeTravel { query, spec } => (*query, Some(spec)),
637 other => (other, None),
638 };
639
640 if let Some(spec) = tt_spec {
641 uni_query::validate_read_only(&ast).map_err(|msg| into_query_error(msg, cypher))?;
642 let snapshot_id = self.resolve_time_travel(&spec).await?;
643 let pinned = self.at_snapshot(&snapshot_id).await?;
644 return pinned
645 .execute_ast_internal(ast, cypher, params, config)
646 .await;
647 }
648
649 let planner =
650 uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
651 let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
652
653 let mut result = self
654 .execute_plan_internal(logical_plan, cypher, params, config, cancellation_token)
655 .await?;
656 result.update_parse_timing(parse_time, total_start.elapsed());
657 Ok(result)
658 }
659
660 pub(crate) async fn execute_ast_internal_with_tx_l0(
662 &self,
663 ast: uni_query::CypherQuery,
664 cypher: &str,
665 params: HashMap<String, ApiValue>,
666 config: UniConfig,
667 tx_l0: std::sync::Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
668 ) -> Result<QueryResult> {
669 let total_start = Instant::now();
670
671 let plan_start = Instant::now();
672 let planner =
673 uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
674 let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
675 let plan_time = plan_start.elapsed();
676
677 let mut executor = uni_query::Executor::new(self.storage.clone());
678 executor.set_config(config.clone());
679 executor.set_xervo_runtime(self.xervo_runtime.clone());
680 executor.set_procedure_registry(self.procedure_registry.clone());
681 if let Ok(reg) = self.custom_functions.read()
682 && !reg.is_empty()
683 {
684 executor.set_custom_functions(Arc::new(reg.clone()));
685 }
686 if let Some(w) = &self.writer {
687 executor.set_writer(w.clone());
688 }
689 executor.set_transaction_l0(tx_l0);
690
691 let projection_order = extract_projection_order(&logical_plan);
692
693 let exec_start = Instant::now();
694 let results = executor
695 .execute(logical_plan, &self.properties, ¶ms)
696 .await
697 .map_err(|e| into_execution_error(e, cypher))?;
698 let exec_time = exec_start.elapsed();
699
700 let columns = if results.is_empty() {
701 Arc::new(vec![])
702 } else if let Some(order) = projection_order {
703 Arc::new(order)
704 } else {
705 let mut cols: Vec<String> = results[0].keys().cloned().collect();
706 cols.sort();
707 Arc::new(cols)
708 };
709
710 let rows = results
711 .into_iter()
712 .map(|map| {
713 let mut values = Vec::with_capacity(columns.len());
714 for col in columns.iter() {
715 let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
716 let normalized =
717 ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
718 values.push(normalized);
719 }
720 Row::new(columns.clone(), values)
721 })
722 .collect::<Vec<Row>>();
723
724 let metrics = QueryMetrics {
725 parse_time: std::time::Duration::ZERO,
726 plan_time,
727 exec_time,
728 total_time: total_start.elapsed(),
729 rows_returned: rows.len(),
730 ..Default::default()
731 };
732
733 Ok(QueryResult::new(
734 columns,
735 rows,
736 executor.take_warnings(),
737 metrics,
738 ))
739 }
740
741 pub(crate) async fn execute_ast_internal(
746 &self,
747 ast: uni_query::CypherQuery,
748 cypher: &str,
749 params: HashMap<String, ApiValue>,
750 config: UniConfig,
751 ) -> Result<QueryResult> {
752 let total_start = Instant::now();
753 let deadline = total_start + config.query_timeout;
754
755 let plan_start = Instant::now();
756 let planner =
757 uni_query::QueryPlanner::new(self.schema.schema().clone()).with_params(params.clone());
758 let logical_plan = planner.plan(ast).map_err(|e| into_query_error(e, cypher))?;
759 let plan_time = plan_start.elapsed();
760
761 let mut executor = uni_query::Executor::new(self.storage.clone());
762 executor.set_config(config.clone());
763 executor.set_xervo_runtime(self.xervo_runtime.clone());
764 executor.set_procedure_registry(self.procedure_registry.clone());
765 if let Ok(reg) = self.custom_functions.read()
766 && !reg.is_empty()
767 {
768 executor.set_custom_functions(Arc::new(reg.clone()));
769 }
770 if let Some(w) = &self.writer {
771 executor.set_writer(w.clone());
772 }
773
774 let projection_order = extract_projection_order(&logical_plan);
775
776 let exec_start = Instant::now();
777 let timeout_duration = config.query_timeout;
778 let results = tokio::time::timeout(
779 timeout_duration,
780 executor.execute(logical_plan, &self.properties, ¶ms),
781 )
782 .await
783 .map_err(|_| UniError::Query {
784 message: "Query timed out".to_string(),
785 query: Some(cypher.to_string()),
786 })?
787 .map_err(|e| into_execution_error(e, cypher))?;
788 let exec_time = exec_start.elapsed();
789
790 if Instant::now() > deadline {
793 return Err(UniError::Query {
794 message: "Query timed out".to_string(),
795 query: Some(cypher.to_string()),
796 });
797 }
798
799 let max_mem = config.max_query_memory;
801 if max_mem > 0 {
802 let estimated_bytes: usize = results
803 .iter()
804 .map(|row| {
805 row.values()
806 .map(|v| std::mem::size_of_val(v) + 64)
807 .sum::<usize>()
808 })
809 .sum();
810 if estimated_bytes > max_mem {
811 return Err(UniError::Query {
812 message: format!(
813 "Query exceeded memory limit ({} bytes > {} byte limit)",
814 estimated_bytes, max_mem
815 ),
816 query: Some(cypher.to_string()),
817 });
818 }
819 }
820
821 let columns = if results.is_empty() {
822 Arc::new(vec![])
823 } else if let Some(order) = projection_order {
824 Arc::new(order)
825 } else {
826 let mut cols: Vec<String> = results[0].keys().cloned().collect();
827 cols.sort();
828 Arc::new(cols)
829 };
830
831 let rows = results
832 .into_iter()
833 .map(|map| {
834 let mut values = Vec::with_capacity(columns.len());
835 for col in columns.iter() {
836 let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
837 let normalized =
838 ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
839 values.push(normalized);
840 }
841 Row::new(columns.clone(), values)
842 })
843 .collect::<Vec<Row>>();
844
845 let metrics = QueryMetrics {
846 parse_time: std::time::Duration::ZERO,
847 plan_time,
848 exec_time,
849 total_time: total_start.elapsed(),
850 rows_returned: rows.len(),
851 ..Default::default()
852 };
853
854 Ok(QueryResult::new(
855 columns,
856 rows,
857 executor.take_warnings(),
858 metrics,
859 ))
860 }
861
862 async fn resolve_time_travel(&self, spec: &uni_query::TimeTravelSpec) -> Result<String> {
864 match spec {
865 uni_query::TimeTravelSpec::Version(id) => Ok(id.clone()),
866 uni_query::TimeTravelSpec::Timestamp(ts_str) => {
867 let ts = chrono::DateTime::parse_from_rfc3339(ts_str)
868 .map_err(|e| {
869 into_parse_error(format!("Invalid timestamp '{}': {}", ts_str, e))
870 })?
871 .with_timezone(&chrono::Utc);
872 self.resolve_time_travel_timestamp(ts).await
873 }
874 }
875 }
876
877 pub(crate) async fn resolve_time_travel_timestamp(
880 &self,
881 ts: chrono::DateTime<chrono::Utc>,
882 ) -> Result<String> {
883 let manifest = self
884 .storage
885 .snapshot_manager()
886 .find_snapshot_at_time(ts)
887 .await
888 .map_err(UniError::Internal)?
889 .ok_or_else(|| UniError::Query {
890 message: format!("No snapshot found at or before {}", ts),
891 query: None,
892 })?;
893 Ok(manifest.snapshot_id)
894 }
895
896 pub(crate) async fn execute_plan_internal(
901 &self,
902 plan: uni_query::LogicalPlan,
903 cypher: &str,
904 params: HashMap<String, ApiValue>,
905 config: UniConfig,
906 cancellation_token: Option<tokio_util::sync::CancellationToken>,
907 ) -> Result<QueryResult> {
908 let total_start = Instant::now();
909
910 let mut executor = uni_query::Executor::new(self.storage.clone());
911 executor.set_config(config.clone());
912 executor.set_xervo_runtime(self.xervo_runtime.clone());
913 executor.set_procedure_registry(self.procedure_registry.clone());
914 if let Ok(reg) = self.custom_functions.read()
915 && !reg.is_empty()
916 {
917 executor.set_custom_functions(Arc::new(reg.clone()));
918 }
919 if let Some(w) = &self.writer {
920 executor.set_writer(w.clone());
921 }
922 if let Some(token) = cancellation_token {
923 executor.set_cancellation_token(token);
924 }
925
926 let projection_order = extract_projection_order(&plan);
927
928 let exec_start = Instant::now();
929 let deadline = exec_start + config.query_timeout;
930 let timeout_duration = config.query_timeout;
931 let results = tokio::time::timeout(
932 timeout_duration,
933 executor.execute(plan, &self.properties, ¶ms),
934 )
935 .await
936 .map_err(|_| UniError::Query {
937 message: "Query timed out".to_string(),
938 query: Some(cypher.to_string()),
939 })?
940 .map_err(|e| into_execution_error(e, cypher))?;
941 let exec_time = exec_start.elapsed();
942
943 if Instant::now() > deadline {
944 return Err(UniError::Query {
945 message: "Query timed out".to_string(),
946 query: Some(cypher.to_string()),
947 });
948 }
949
950 let max_mem = config.max_query_memory;
951 if max_mem > 0 {
952 let estimated_bytes: usize = results
953 .iter()
954 .map(|row| {
955 row.values()
956 .map(|v| std::mem::size_of_val(v) + 64)
957 .sum::<usize>()
958 })
959 .sum();
960 if estimated_bytes > max_mem {
961 return Err(UniError::Query {
962 message: format!(
963 "Query exceeded memory limit ({} bytes > {} byte limit)",
964 estimated_bytes, max_mem
965 ),
966 query: Some(cypher.to_string()),
967 });
968 }
969 }
970
971 let columns = if results.is_empty() {
972 Arc::new(vec![])
973 } else if let Some(order) = projection_order {
974 Arc::new(order)
975 } else {
976 let mut cols: Vec<String> = results[0].keys().cloned().collect();
977 cols.sort();
978 Arc::new(cols)
979 };
980
981 let rows: Vec<Row> = results
982 .into_iter()
983 .map(|map| {
984 let mut values = Vec::with_capacity(columns.len());
985 for col in columns.iter() {
986 let value = map.get(col).cloned().unwrap_or(ApiValue::Null);
987 let normalized =
988 ResultNormalizer::normalize_value(value).unwrap_or(ApiValue::Null);
989 values.push(normalized);
990 }
991 Row::new(columns.clone(), values)
992 })
993 .collect();
994
995 let metrics = QueryMetrics {
996 parse_time: std::time::Duration::ZERO,
997 plan_time: std::time::Duration::ZERO,
998 exec_time,
999 total_time: total_start.elapsed(),
1000 rows_returned: rows.len(),
1001 ..Default::default()
1002 };
1003
1004 Ok(QueryResult::new(
1005 columns,
1006 rows,
1007 executor.take_warnings(),
1008 metrics,
1009 ))
1010 }
1011}