1use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::PropertyManager;
13use uni_store::QueryContext;
14use uni_store::runtime::l0_manager::L0Manager;
15use uni_store::runtime::writer::Writer;
16use uni_store::storage::manager::StorageManager;
17use uni_xervo::runtime::ModelRuntime;
18
19use crate::query::expr_eval::eval_binary_op;
20use crate::types::QueryWarning;
21
22use super::procedure::ProcedureRegistry;
23
/// Running state for one aggregate function.
///
/// Values are fed in through `update` and the final result is produced by
/// `finish`. Each variant stores exactly the partial state its aggregate needs.
#[derive(Debug)]
pub(crate) enum Accumulator {
    /// Number of rows counted so far.
    Count(i64),
    /// Running total of every input that converts through `as_f64`.
    Sum(f64),
    /// Smallest non-null value seen so far; `None` until one arrives.
    Min(Option<Value>),
    /// Largest non-null value seen so far; `None` until one arrives.
    Max(Option<Value>),
    /// Running total and number of inputs that converted through `as_f64`.
    Avg { sum: f64, count: i64 },
    /// Every non-null input, in arrival order.
    Collect(Vec<Value>),
    /// `to_string()` renderings of the non-null inputs; the set size is the result.
    CountDistinct(HashSet<String>),
    /// Numeric inputs plus the percentile requested for the discrete percentile.
    PercentileDisc { values: Vec<f64>, percentile: f64 },
    /// Numeric inputs plus the percentile requested for the interpolated percentile.
    PercentileCont { values: Vec<f64>, percentile: f64 },
}
37
38fn numeric_to_value(val: f64) -> Value {
40 if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
41 Value::Int(val as i64)
42 } else {
43 Value::Float(val)
44 }
45}
46
47fn cypher_type_rank(val: &Value) -> u8 {
56 match val {
57 Value::Null => 0,
58 Value::List(_) => 1,
59 Value::String(_) => 2,
60 Value::Bool(_) => 3,
61 Value::Int(_) | Value::Float(_) => 4,
62 _ => 5,
63 }
64}
65
66fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
68 use std::cmp::Ordering;
69 let ra = cypher_type_rank(a);
70 let rb = cypher_type_rank(b);
71 if ra != rb {
72 return ra.cmp(&rb);
73 }
74 match (a, b) {
75 (Value::Int(l), Value::Int(r)) => l.cmp(r),
76 (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
77 (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
78 (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
79 (Value::String(l), Value::String(r)) => l.cmp(r),
80 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
81 _ => Ordering::Equal,
82 }
83}
84
85impl Accumulator {
    /// Creates the accumulator for aggregate `op`, with the percentile fixed at `0.0`.
    ///
    /// Delegates to `new_with_percentile`; the percentile is only stored by the
    /// percentile variants.
    pub(crate) fn new(op: &str, distinct: bool) -> Self {
        Self::new_with_percentile(op, distinct, 0.0)
    }
89
90 pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
91 let op_upper = op.to_uppercase();
92 match op_upper.as_str() {
93 "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
94 "COUNT" => Accumulator::Count(0),
95 "SUM" => Accumulator::Sum(0.0),
96 "MIN" => Accumulator::Min(None),
97 "MAX" => Accumulator::Max(None),
98 "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
99 "COLLECT" => Accumulator::Collect(Vec::new()),
100 "PERCENTILEDISC" => Accumulator::PercentileDisc {
101 values: Vec::new(),
102 percentile,
103 },
104 "PERCENTILECONT" => Accumulator::PercentileCont {
105 values: Vec::new(),
106 percentile,
107 },
108 _ => Accumulator::Count(0),
109 }
110 }
111
112 pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
113 match self {
114 Accumulator::Count(c) => {
115 if is_wildcard || !val.is_null() {
116 *c += 1;
117 }
118 }
119 Accumulator::Sum(s) => {
120 if let Some(n) = val.as_f64() {
121 *s += n;
122 }
123 }
124 Accumulator::Min(current) => {
125 if !val.is_null() {
126 *current = Some(match current.take() {
127 None => val.clone(),
128 Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
129 Some(cur) => cur,
130 });
131 }
132 }
133 Accumulator::Max(current) => {
134 if !val.is_null() {
135 *current = Some(match current.take() {
136 None => val.clone(),
137 Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
138 Some(cur) => cur,
139 });
140 }
141 }
142 Accumulator::Avg { sum, count } => {
143 if let Some(n) = val.as_f64() {
144 *sum += n;
145 *count += 1;
146 }
147 }
148 Accumulator::Collect(v) => {
149 if !val.is_null() {
150 v.push(val.clone());
151 }
152 }
153 Accumulator::CountDistinct(s) => {
154 if !val.is_null() {
155 s.insert(val.to_string());
156 }
157 }
158 Accumulator::PercentileDisc { values, .. }
159 | Accumulator::PercentileCont { values, .. } => {
160 if let Some(n) = val.as_f64() {
161 values.push(n);
162 }
163 }
164 }
165 }
166
167 pub(crate) fn finish(&self) -> Value {
168 match self {
169 Accumulator::Count(c) => Value::Int(*c),
170 Accumulator::Sum(s) => numeric_to_value(*s),
171 Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
172 Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
173 Accumulator::Avg { sum, count } => {
174 if *count > 0 {
175 Value::Float(*sum / (*count as f64))
176 } else {
177 Value::Null
178 }
179 }
180 Accumulator::Collect(v) => Value::List(v.clone()),
181 Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
182 Accumulator::PercentileDisc { values, percentile } => {
183 if values.is_empty() {
184 return Value::Null;
185 }
186 let mut sorted = values.clone();
187 sorted.sort_by(|a, b| a.total_cmp(b));
188 let n = sorted.len();
189 let idx = (percentile * (n as f64 - 1.0)).round() as usize;
190 numeric_to_value(sorted[idx.min(n - 1)])
191 }
192 Accumulator::PercentileCont { values, percentile } => {
193 if values.is_empty() {
194 return Value::Null;
195 }
196 let mut sorted = values.clone();
197 sorted.sort_by(|a, b| a.total_cmp(b));
198 let n = sorted.len();
199 if n == 1 {
200 return Value::Float(sorted[0]);
201 }
202 let pos = percentile * (n as f64 - 1.0);
203 let lower = (pos.floor() as usize).min(n - 1);
204 let upper = (pos.ceil() as usize).min(n - 1);
205 if lower == upper {
206 Value::Float(sorted[lower])
207 } else {
208 let frac = pos - lower as f64;
209 Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
210 }
211 }
212 }
213 }
214}
215
/// Key type of `Executor::gen_expr_cache`: a pair of strings.
///
/// NOTE(review): what the two components identify is not visible in this
/// file; confirm against the code that fills the cache.
pub(crate) type GenExprCacheKey = (String, String);
218
/// Query executor: bundles the storage handle, optional write-path handles,
/// registries and per-query settings needed to run a plan.
///
/// Cloning shares every handle except `warnings`, which starts out empty in
/// the clone (see the `Clone` impl).
pub struct Executor {
    /// Storage manager; also supplies the schema used by `profile`.
    pub(crate) storage: Arc<StorageManager>,
    /// Optional writer. When set, `get_context` builds the query context from
    /// the writer's L0 manager (or from `read_snapshot` if one is set).
    pub(crate) writer: Option<Arc<Writer>>,
    /// Optional L0 manager, used by `get_context` only when no writer is set.
    pub(crate) l0_manager: Option<Arc<L0Manager>>,
    /// Algorithm registry; `Executor::new` creates it with `AlgorithmRegistry::new()`.
    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
    /// Flag set through `set_use_transaction`; its consumers are outside this file section.
    pub(crate) use_transaction: bool,
    /// Sandbox configuration consulted by `validate_path`.
    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
    /// General configuration; `query_timeout` is read when building a query context.
    pub(crate) config: uni_common::config::UniConfig,
    /// Shared cache of expressions keyed by `GenExprCacheKey`.
    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
    /// Procedure registry; `Executor::new` installs one wired to the default host plugin registry.
    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Optional model runtime, set through `set_xervo_runtime`.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Warnings collected so far; drained by `take_warnings`.
    pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
    /// Optional transaction-local L0 buffer, passed to the query context when a writer is set.
    pub(crate) transaction_l0_override:
        Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Optional ID reservoir, set through `set_id_reservoir`.
    pub(crate) id_reservoir: Option<Arc<uni_store::runtime::TxIdReservoir>>,
    /// Optional registry of custom functions, set through `set_custom_functions`.
    pub(crate) custom_function_registry:
        Option<Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>>,
    /// Optional cancellation token, copied into every query context that is built.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Optional shared property manager, set through `set_prop_manager`.
    pub(crate) prop_manager_arc: Option<Arc<PropertyManager>>,
    /// Optional DataFusion session context template, set through `set_df_session_template`.
    pub(crate) df_session_template: Option<Arc<datafusion::execution::context::SessionContext>>,
    /// Optional read snapshot; when set (and a writer exists) `get_context`
    /// reads `main` and `extra` from it instead of the live L0 manager.
    pub(crate) read_snapshot: Option<uni_store::runtime::SnapshotView>,
}
287
288impl std::fmt::Debug for Executor {
289 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
290 f.debug_struct("Executor")
291 .field("use_transaction", &self.use_transaction)
292 .field("has_writer", &self.writer.is_some())
293 .field("has_l0_manager", &self.l0_manager.is_some())
294 .field("has_xervo_runtime", &self.xervo_runtime.is_some())
295 .finish_non_exhaustive()
296 }
297}
298
299impl Clone for Executor {
300 fn clone(&self) -> Self {
305 Self {
306 storage: self.storage.clone(),
307 writer: self.writer.clone(),
308 l0_manager: self.l0_manager.clone(),
309 algo_registry: self.algo_registry.clone(),
310 use_transaction: self.use_transaction,
311 file_sandbox: self.file_sandbox.clone(),
312 config: self.config.clone(),
313 gen_expr_cache: self.gen_expr_cache.clone(),
314 procedure_registry: self.procedure_registry.clone(),
315 xervo_runtime: self.xervo_runtime.clone(),
316 warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
318 transaction_l0_override: self.transaction_l0_override.clone(),
319 id_reservoir: self.id_reservoir.clone(),
320 custom_function_registry: self.custom_function_registry.clone(),
321 cancellation_token: self.cancellation_token.clone(),
322 prop_manager_arc: self.prop_manager_arc.clone(),
323 df_session_template: self.df_session_template.clone(),
324 read_snapshot: self.read_snapshot.clone(),
325 }
326 }
327}
328
329impl Executor {
330 pub fn new(storage: Arc<StorageManager>) -> Self {
332 let proc_registry = Arc::new(ProcedureRegistry::new());
337 proc_registry.set_plugin_registry(crate::procedures_plugin::default_host_plugin_registry());
338 Self {
339 storage,
340 writer: None,
341 l0_manager: None,
342 algo_registry: Arc::new(AlgorithmRegistry::new()),
343 use_transaction: false,
344 file_sandbox: uni_common::config::FileSandboxConfig::default(),
345 config: uni_common::config::UniConfig::default(),
346 gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
347 procedure_registry: Some(proc_registry),
348 xervo_runtime: None,
349 warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
350 transaction_l0_override: None,
351 id_reservoir: None,
352 custom_function_registry: None,
353 cancellation_token: None,
354 prop_manager_arc: None,
355 df_session_template: None,
356 read_snapshot: None,
357 }
358 }
359
    /// Stores the shared property manager, replacing any previously set one.
    pub fn set_prop_manager(&mut self, pm: Arc<PropertyManager>) {
        self.prop_manager_arc = Some(pm);
    }
368
    /// Stores the DataFusion session context template, replacing any previous one.
    pub fn set_df_session_template(
        &mut self,
        tmpl: Arc<datafusion::execution::context::SessionContext>,
    ) {
        self.df_session_template = Some(tmpl);
    }
381
382 pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<Writer>) -> Self {
384 let mut executor = Self::new(storage);
385 executor.writer = Some(writer);
386 executor
387 }
388
    /// Replaces the procedure registry (including the default one installed by `new`).
    pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
        self.procedure_registry = Some(registry);
    }
393
    /// Sets or clears the model runtime; passing `None` removes it.
    pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
        self.xervo_runtime = runtime;
    }
398
    /// Replaces the file sandbox configuration used by `validate_path`.
    pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
        self.file_sandbox = sandbox;
    }
404
    /// Replaces the executor configuration (e.g. the `query_timeout` used for deadlines).
    pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
        self.config = config;
    }
409
410 pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
412 self.file_sandbox
413 .validate_path(path)
414 .map_err(|e| anyhow!("Path validation failed: {}", e))
415 }
416
    /// Attaches a writer, replacing any previously set one.
    pub fn set_writer(&mut self, writer: Arc<Writer>) {
        self.writer = Some(writer);
    }
421
422 pub fn take_warnings(&self) -> Vec<QueryWarning> {
424 self.warnings
425 .lock()
426 .map(|mut w| std::mem::take(&mut *w))
427 .unwrap_or_default()
428 }
429
    /// Sets the `use_transaction` flag.
    pub fn set_use_transaction(&mut self, use_transaction: bool) {
        self.use_transaction = use_transaction;
    }
434
    /// Installs a transaction-local L0 buffer; `get_context` passes it to the
    /// query context when a writer is set.
    pub fn set_transaction_l0(
        &mut self,
        l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
    ) {
        self.transaction_l0_override = Some(l0);
    }
443
    /// Sets or clears the read snapshot used by `get_context`; `None` removes it.
    pub fn set_read_snapshot(&mut self, snapshot: Option<uni_store::runtime::SnapshotView>) {
        self.read_snapshot = snapshot;
    }
452
    /// Stores the transaction ID reservoir, replacing any previously set one.
    pub fn set_id_reservoir(&mut self, r: Arc<uni_store::runtime::TxIdReservoir>) {
        self.id_reservoir = Some(r);
    }
459
    /// Stores the custom function registry, replacing any previously set one.
    pub fn set_custom_functions(
        &mut self,
        registry: Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>,
    ) {
        self.custom_function_registry = Some(registry);
    }
467
    /// Stores the cancellation token that `get_context` copies into each query context.
    pub fn set_cancellation_token(&mut self, token: tokio_util::sync::CancellationToken) {
        self.cancellation_token = Some(token);
    }
472
473 pub(crate) async fn get_context(&self) -> Option<QueryContext> {
478 if let Some(writer) = &self.writer {
479 let tx_l0 = self.transaction_l0_override.clone();
481 let mut ctx = match &self.read_snapshot {
486 Some(snap) => {
487 QueryContext::new_with_pending(snap.main.clone(), tx_l0, snap.extra.clone())
488 }
489 None => QueryContext::new_with_pending(
490 writer.l0_manager.get_current(),
491 tx_l0,
492 writer.l0_manager.get_pending_flush(),
493 ),
494 };
495 ctx.set_deadline(Instant::now() + self.config.query_timeout);
496 if let Some(ref token) = self.cancellation_token {
497 ctx.set_cancellation_token(token.clone());
498 }
499 Some(ctx)
500 } else {
501 self.l0_manager.as_ref().map(|m| {
502 let mut ctx = QueryContext::new(m.get_current());
503 ctx.set_deadline(Instant::now() + self.config.query_timeout);
504 if let Some(ref token) = self.cancellation_token {
505 ctx.set_cancellation_token(token.clone());
506 }
507 ctx
508 })
509 }
510 }
511
    /// Orders two values for sorting.
    ///
    /// Resolution order:
    /// 1. If both sides yield a temporal value, they are compared temporally.
    /// 2. String-versus-temporal pairs are handed to the expression evaluator's
    ///    `<`, `>`, `=` operators; the result is used when one of them holds.
    /// 3. Otherwise values are ordered by `order_by_type_rank`, and values of
    ///    equal rank are compared by the type-specific rules below.
    ///
    /// Pairs of equal rank that match no arm compare as `Equal`.
    pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
        use std::cmp::Ordering;

        let temporal_a = Self::extract_temporal_value(a);
        let temporal_b = Self::extract_temporal_value(b);

        // Both sides are temporal: compare them directly.
        if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
            return Self::compare_temporal(ta, tb);
        }

        // String against a native temporal value: use the evaluator's
        // comparison if it gives a definite answer.
        if matches!(
            (a, b),
            (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
        ) && let Some(ord) = Self::try_eval_ordering(a, b)
        {
            return ord;
        }
        // String against anything a temporal value could be extracted from:
        // retry with the extracted temporal wrapped as `Value::Temporal`.
        if let (Value::String(_), Some(tb)) = (a, temporal_b)
            && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
        {
            return ord;
        }
        // Mirror image of the case above (temporal on the left, string on the right).
        if let (Some(ta), Value::String(_)) = (temporal_a, b)
            && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
        {
            return ord;
        }

        // Different type ranks decide the order on their own.
        let ra = Self::order_by_type_rank(a);
        let rb = Self::order_by_type_rank(b);
        if ra != rb {
            return ra.cmp(&rb);
        }

        match (a, b) {
            (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
            (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
            (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
            (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
            (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
            (Value::String(l), Value::String(r)) => {
                // Prefer the evaluator's ordering; fall back to plain string order.
                Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
            }
            (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
            (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
            (Value::Int(l), Value::Int(r)) => l.cmp(r),
            (Value::Float(l), Value::Float(r)) => {
                // NaN sorts after every other float and equal to itself.
                // NOTE(review): `order_by_type_rank` already gives NaN its own
                // rank, so the mixed NaN/non-NaN branches look unreachable here.
                if l.is_nan() && r.is_nan() {
                    Ordering::Equal
                } else if l.is_nan() {
                    Ordering::Greater
                } else if r.is_nan() {
                    Ordering::Less
                } else {
                    l.partial_cmp(r).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Int(l), Value::Float(r)) => {
                if r.is_nan() {
                    Ordering::Less
                } else {
                    // Compared as f64; very large ints may lose precision.
                    (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Float(l), Value::Int(r)) => {
                if l.is_nan() {
                    Ordering::Greater
                } else {
                    l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
            (Value::Vector(l), Value::Vector(r)) => {
                // Element-wise with `total_cmp`; the shorter vector sorts first on a tie.
                for (lv, rv) in l.iter().zip(r.iter()) {
                    let ord = lv.total_cmp(rv);
                    if ord != Ordering::Equal {
                        return ord;
                    }
                }
                l.len().cmp(&r.len())
            }
            _ => Ordering::Equal,
        }
    }
606
607 fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
608 use std::cmp::Ordering;
609 if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
610 Some(Ordering::Less)
611 } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
612 Some(Ordering::Greater)
613 } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
614 Some(Ordering::Equal)
615 } else {
616 None
617 }
618 }
619
620 fn order_by_type_rank(v: &Value) -> u8 {
623 match v {
624 Value::Map(map) => Self::map_order_rank(map),
625 Value::Node(_) => 1,
626 Value::Edge(_) => 2,
627 Value::List(_) => 3,
628 Value::Path(_) => 4,
629 Value::String(_) => 5,
630 Value::Bool(_) => 6,
631 Value::Temporal(_) => 7,
632 Value::Int(_) => 8,
633 Value::Float(f) if f.is_nan() => 9,
634 Value::Float(_) => 8,
635 Value::Null => 10,
636 Value::Bytes(_) | Value::Vector(_) => 11,
637 _ => 11,
638 }
639 }
640
641 fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
642 if Self::map_as_temporal(map).is_some() {
643 7
644 } else if map.contains_key("nodes")
645 && (map.contains_key("relationships") || map.contains_key("edges"))
646 {
647 4
648 } else if map.contains_key("_eid")
649 || map.contains_key("_src")
650 || map.contains_key("_dst")
651 || map.contains_key("_type")
652 || map.contains_key("_type_name")
653 {
654 2
655 } else if map.contains_key("_vid")
656 || map.contains_key("_labels")
657 || map.contains_key("_label")
658 {
659 1
660 } else {
661 0
662 }
663 }
664
    /// Returns the temporal value carried by `value`, if any.
    ///
    /// Thin wrapper around `expr_eval::temporal_from_value`.
    fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
        crate::query::expr_eval::temporal_from_value(value)
    }
668
    /// Returns the temporal value a map wraps, if the map is such a wrapper.
    ///
    /// Thin wrapper around `expr_eval::temporal_from_map_wrapper`.
    fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
        crate::query::expr_eval::temporal_from_map_wrapper(map)
    }
672
673 fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
674 left.iter()
675 .zip(right.iter())
676 .map(|(l, r)| Self::compare_values(l, r))
677 .find(|o| o.is_ne())
678 .unwrap_or_else(|| left.len().cmp(&right.len()))
679 }
680
681 fn compare_maps(
682 left: &HashMap<String, Value>,
683 right: &HashMap<String, Value>,
684 ) -> std::cmp::Ordering {
685 let mut l_pairs: Vec<_> = left.iter().collect();
686 let mut r_pairs: Vec<_> = right.iter().collect();
687 l_pairs.sort_by_key(|(k, _)| *k);
688 r_pairs.sort_by_key(|(k, _)| *k);
689
690 l_pairs
691 .iter()
692 .zip(r_pairs.iter())
693 .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
694 .find(|o| o.is_ne())
695 .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
696 }
697
698 fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
699 let mut l_labels = left.labels.clone();
700 let mut r_labels = right.labels.clone();
701 l_labels.sort();
702 r_labels.sort();
703
704 l_labels
705 .cmp(&r_labels)
706 .then_with(|| left.vid.cmp(&right.vid))
707 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
708 }
709
710 fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
711 left.edge_type
712 .cmp(&right.edge_type)
713 .then_with(|| left.src.cmp(&right.src))
714 .then_with(|| left.dst.cmp(&right.dst))
715 .then_with(|| left.eid.cmp(&right.eid))
716 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
717 }
718
719 fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
720 left.nodes
721 .iter()
722 .zip(right.nodes.iter())
723 .map(|(l, r)| Self::compare_nodes(l, r))
724 .find(|o| o.is_ne())
725 .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
726 .then_with(|| {
727 left.edges
728 .iter()
729 .zip(right.edges.iter())
730 .map(|(l, r)| Self::compare_edges(l, r))
731 .find(|o| o.is_ne())
732 .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
733 })
734 }
735
736 fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
737 match (left, right) {
738 (
739 TemporalValue::Date {
740 days_since_epoch: l,
741 },
742 TemporalValue::Date {
743 days_since_epoch: r,
744 },
745 ) => l.cmp(r),
746 (
747 TemporalValue::LocalTime {
748 nanos_since_midnight: l,
749 },
750 TemporalValue::LocalTime {
751 nanos_since_midnight: r,
752 },
753 ) => l.cmp(r),
754 (
755 TemporalValue::Time {
756 nanos_since_midnight: lm,
757 offset_seconds: lo,
758 },
759 TemporalValue::Time {
760 nanos_since_midnight: rm,
761 offset_seconds: ro,
762 },
763 ) => {
764 let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
765 let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
766 l_utc.cmp(&r_utc)
767 }
768 (
769 TemporalValue::LocalDateTime {
770 nanos_since_epoch: l,
771 },
772 TemporalValue::LocalDateTime {
773 nanos_since_epoch: r,
774 },
775 ) => l.cmp(r),
776 (
777 TemporalValue::DateTime {
778 nanos_since_epoch: l,
779 ..
780 },
781 TemporalValue::DateTime {
782 nanos_since_epoch: r,
783 ..
784 },
785 ) => l.cmp(r),
786 (
787 TemporalValue::Duration {
788 months: lm,
789 days: ld,
790 nanos: ln,
791 },
792 TemporalValue::Duration {
793 months: rm,
794 days: rd,
795 nanos: rn,
796 },
797 ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
798 _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
799 }
800 }
801
    /// Fixed rank per temporal variant, used by `compare_temporal` to order
    /// values whose variants differ (and `Btic` pairs, which have no
    /// field-level comparison there).
    fn temporal_variant_rank(v: &TemporalValue) -> u8 {
        match v {
            TemporalValue::Date { .. } => 0,
            TemporalValue::LocalTime { .. } => 1,
            TemporalValue::Time { .. } => 2,
            TemporalValue::LocalDateTime { .. } => 3,
            TemporalValue::DateTime { .. } => 4,
            TemporalValue::Duration { .. } => 5,
            TemporalValue::Btic { .. } => 6,
        }
    }
813}
814
/// Result of profiling a query: the plan explanation plus measured statistics.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileOutput {
    /// Explanation of the logical plan, produced before execution.
    pub explain: crate::query::planner::ExplainOutput,
    /// One entry per executed operator (or a single entry for DDL/admin plans).
    pub runtime_stats: Vec<OperatorStats>,
    /// Wall-clock time of the execution, in whole milliseconds.
    pub total_time_ms: u64,
    /// Peak memory in bytes; `Executor::profile` currently always reports 0.
    pub peak_memory_bytes: usize,
}
830
/// Runtime statistics for a single operator of an executed plan.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperatorStats {
    /// Operator name.
    pub operator: String,
    /// Number of rows the operator produced.
    pub actual_rows: usize,
    /// Time attributed to the operator, in milliseconds.
    pub time_ms: f64,
    /// Memory used, in bytes; the collectors in this file always fill in 0.
    pub memory_bytes: usize,
    /// Index hits, when known; the collectors in this file always fill in `None`.
    pub index_hits: Option<usize>,
    /// Index misses, when known; the collectors in this file always fill in `None`.
    pub index_misses: Option<usize>,
}
847
/// Collects per-operator statistics for a whole physical plan.
///
/// Entries are in the order produced by `collect_plan_metrics_inner`, i.e.
/// each operator's children come before the operator itself.
fn collect_plan_metrics(
    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
) -> Vec<OperatorStats> {
    let mut stats = Vec::new();
    collect_plan_metrics_inner(plan, &mut stats);
    stats
}
860
861fn collect_plan_metrics_inner(
862 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
863 out: &mut Vec<OperatorStats>,
864) {
865 for child in plan.children() {
867 collect_plan_metrics_inner(child, out);
868 }
869
870 let operator = plan.name().to_string();
871
872 let (actual_rows, time_ms) = match plan.metrics() {
873 Some(metrics) => {
874 let rows = metrics.output_rows().unwrap_or(0);
875 let nanos = metrics.elapsed_compute().unwrap_or(0);
877 let ms = nanos as f64 / 1_000_000.0;
878 (rows, ms)
879 }
880 None => (0, 0.0),
881 };
882
883 out.push(OperatorStats {
884 operator,
885 actual_rows,
886 time_ms,
887 memory_bytes: 0,
888 index_hits: None,
889 index_misses: None,
890 });
891}
892
893impl Executor {
894 pub async fn profile(
897 &self,
898 plan: crate::query::planner::LogicalPlan,
899 params: &HashMap<String, Value>,
900 ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
901 let planner =
903 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
904 let explain_output = planner.explain_logical_plan(&plan)?;
905
906 let start = Instant::now();
907
908 let prop_manager = self.create_prop_manager();
909
910 let (results, stats) = if Self::is_ddl_or_admin(&plan) {
913 let results = self
914 .execute_subplan(plan, &prop_manager, params, None)
915 .await?;
916 let elapsed = start.elapsed();
917 let stats = vec![OperatorStats {
918 operator: "DDL/Admin Execution".to_string(),
919 actual_rows: results.len(),
920 time_ms: elapsed.as_secs_f64() * 1000.0,
921 memory_bytes: 0,
922 index_hits: None,
923 index_misses: None,
924 }];
925 (results, stats)
926 } else {
927 let (batches, execution_plan) = self
928 .execute_datafusion_with_plan(plan, &prop_manager, params)
929 .await?;
930 let results = self.record_batches_to_rows(batches)?;
931 let stats = collect_plan_metrics(&execution_plan);
932 (results, stats)
933 };
934
935 let total_time = start.elapsed();
936
937 Ok((
938 results,
939 ProfileOutput {
940 explain: explain_output,
941 runtime_stats: stats,
942 total_time_ms: total_time.as_millis() as u64,
943 peak_memory_bytes: 0,
944 },
945 ))
946 }
947
948 fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
949 uni_store::runtime::property_manager::PropertyManager::new(
950 self.storage.clone(),
951 self.storage.schema_manager_arc(),
952 1000,
953 )
954 }
955}
956
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// Runs aggregate `op` over `inputs` and returns the finished value.
    fn aggregate(op: &str, distinct: bool, wildcard: bool, inputs: &[Value]) -> Value {
        let mut acc = Accumulator::new(op, distinct);
        for value in inputs {
            acc.update(value, wildcard);
        }
        acc.finish()
    }

    /// Shorthand for a string value.
    fn text(s: &str) -> Value {
        Value::String(s.into())
    }

    #[test]
    fn test_accumulator_count_basic() {
        // The null input is skipped by a non-wildcard COUNT.
        let inputs = [Value::Int(1), Value::Null, Value::Int(2)];
        assert_eq!(aggregate("COUNT", false, false, &inputs), Value::Int(2));
    }

    #[test]
    fn test_accumulator_count_wildcard() {
        // A wildcard COUNT counts every row, nulls included.
        let inputs = [Value::Int(1), Value::Null, Value::Int(2)];
        assert_eq!(aggregate("COUNT", false, true, &inputs), Value::Int(3));
    }

    #[test]
    fn test_accumulator_sum() {
        // The null input contributes nothing to the total.
        let inputs = [Value::Int(10), Value::Float(2.5), Value::Null];
        assert_eq!(aggregate("SUM", false, false, &inputs), Value::Float(12.5));
    }

    #[test]
    fn test_accumulator_avg() {
        let inputs = [Value::Int(10), Value::Int(20), Value::Int(30)];
        assert_eq!(aggregate("AVG", false, false, &inputs), Value::Float(20.0));
    }

    #[test]
    fn test_accumulator_avg_empty() {
        assert_eq!(aggregate("AVG", false, false, &[]), Value::Null);
    }

    #[test]
    fn test_accumulator_min_max() {
        let inputs = [Value::Int(3), Value::Int(1), Value::Int(2)];
        assert_eq!(aggregate("MIN", false, false, &inputs), Value::Int(1));
        assert_eq!(aggregate("MAX", false, false, &inputs), Value::Int(3));
    }

    #[test]
    fn test_accumulator_collect() {
        // Nulls are dropped; order of the remaining values is preserved.
        let inputs = [text("a"), Value::Null, text("b")];
        let expected = Value::List(vec![text("a"), text("b")]);
        assert_eq!(aggregate("COLLECT", false, false, &inputs), expected);
    }

    #[test]
    fn test_accumulator_count_distinct() {
        // The repeated "a" and the null do not add to the distinct count.
        let inputs = [text("a"), text("b"), text("a"), Value::Null];
        assert_eq!(aggregate("COUNT", true, false, &inputs), Value::Int(2));
    }

    #[test]
    fn test_accumulator_percentile_empty() {
        let acc = Accumulator::new_with_percentile("PERCENTILEDISC", false, 0.5);
        assert_eq!(acc.finish(), Value::Null);
    }

    #[test]
    fn test_compare_values_int_ordering() {
        let cmp = |l: i64, r: i64| Executor::compare_values(&Value::Int(l), &Value::Int(r));
        assert_eq!(cmp(1, 2), Ordering::Less);
        assert_eq!(cmp(5, 5), Ordering::Equal);
        assert_eq!(cmp(9, 3), Ordering::Greater);
    }

    #[test]
    fn test_compare_values_null_last() {
        let one = Value::Int(1);
        assert_eq!(Executor::compare_values(&one, &Value::Null), Ordering::Less);
        assert_eq!(Executor::compare_values(&Value::Null, &one), Ordering::Greater);
        assert_eq!(
            Executor::compare_values(&Value::Null, &Value::Null),
            Ordering::Equal
        );
    }

    #[test]
    fn test_compare_values_cross_type_rank() {
        // Strings rank before booleans, booleans before numbers.
        assert_eq!(
            Executor::compare_values(&text("z"), &Value::Bool(false)),
            Ordering::Less
        );
        assert_eq!(
            Executor::compare_values(&Value::Bool(true), &Value::Int(1)),
            Ordering::Less
        );
    }

    #[test]
    fn test_compare_values_lists() {
        let smaller = Value::List(vec![Value::Int(1), Value::Int(2)]);
        let larger = Value::List(vec![Value::Int(1), Value::Int(3)]);
        assert_eq!(Executor::compare_values(&smaller, &larger), Ordering::Less);
    }
}