1use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::PropertyManager;
13use uni_store::QueryContext;
14use uni_store::runtime::l0_manager::L0Manager;
15use uni_store::runtime::writer::Writer;
16use uni_store::storage::manager::StorageManager;
17use uni_xervo::runtime::ModelRuntime;
18
19use crate::query::expr_eval::eval_binary_op;
20use crate::types::QueryWarning;
21
22use super::procedure::ProcedureRegistry;
23
24#[derive(Debug)]
26pub(crate) enum Accumulator {
27 Count(i64),
28 Sum(f64),
29 Min(Option<Value>),
30 Max(Option<Value>),
31 Avg { sum: f64, count: i64 },
32 Collect(Vec<Value>),
33 CountDistinct(HashSet<Value>),
34 PercentileDisc { values: Vec<f64>, percentile: f64 },
35 PercentileCont { values: Vec<f64>, percentile: f64 },
36}
37
38fn numeric_to_value(val: f64) -> Value {
40 if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
41 Value::Int(val as i64)
42 } else {
43 Value::Float(val)
44 }
45}
46
47fn distinct_key(val: &Value) -> Value {
52 match val {
53 Value::Float(f)
54 if f.is_finite()
55 && f.fract() == 0.0
56 && *f >= i64::MIN as f64
57 && *f <= i64::MAX as f64 =>
58 {
59 Value::Int(*f as i64)
60 }
61 other => other.clone(),
62 }
63}
64
65fn cypher_type_rank(val: &Value) -> u8 {
74 match val {
75 Value::Null => 0,
76 Value::List(_) => 1,
77 Value::String(_) => 2,
78 Value::Bool(_) => 3,
79 Value::Int(_) | Value::Float(_) => 4,
80 _ => 5,
81 }
82}
83
84fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
86 use std::cmp::Ordering;
87 let ra = cypher_type_rank(a);
88 let rb = cypher_type_rank(b);
89 if ra != rb {
90 return ra.cmp(&rb);
91 }
92 match (a, b) {
93 (Value::Int(l), Value::Int(r)) => l.cmp(r),
94 (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
95 (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
96 (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
97 (Value::String(l), Value::String(r)) => l.cmp(r),
98 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
99 _ => Ordering::Equal,
100 }
101}
102
103impl Accumulator {
104 pub(crate) fn new(op: &str, distinct: bool) -> Self {
105 Self::new_with_percentile(op, distinct, 0.0)
106 }
107
108 pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
109 let op_upper = op.to_uppercase();
110 match op_upper.as_str() {
111 "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
112 "COUNT" => Accumulator::Count(0),
113 "SUM" => Accumulator::Sum(0.0),
114 "MIN" => Accumulator::Min(None),
115 "MAX" => Accumulator::Max(None),
116 "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
117 "COLLECT" => Accumulator::Collect(Vec::new()),
118 "PERCENTILEDISC" => Accumulator::PercentileDisc {
119 values: Vec::new(),
120 percentile,
121 },
122 "PERCENTILECONT" => Accumulator::PercentileCont {
123 values: Vec::new(),
124 percentile,
125 },
126 _ => Accumulator::Count(0),
127 }
128 }
129
130 pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
131 match self {
132 Accumulator::Count(c) => {
133 if is_wildcard || !val.is_null() {
134 *c += 1;
135 }
136 }
137 Accumulator::Sum(s) => {
138 if let Some(n) = val.as_f64() {
139 *s += n;
140 }
141 }
142 Accumulator::Min(current) => {
143 if !val.is_null() {
144 *current = Some(match current.take() {
145 None => val.clone(),
146 Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
147 Some(cur) => cur,
148 });
149 }
150 }
151 Accumulator::Max(current) => {
152 if !val.is_null() {
153 *current = Some(match current.take() {
154 None => val.clone(),
155 Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
156 Some(cur) => cur,
157 });
158 }
159 }
160 Accumulator::Avg { sum, count } => {
161 if let Some(n) = val.as_f64() {
162 *sum += n;
163 *count += 1;
164 }
165 }
166 Accumulator::Collect(v) => {
167 if !val.is_null() {
168 v.push(val.clone());
169 }
170 }
171 Accumulator::CountDistinct(s) => {
172 if !val.is_null() {
173 s.insert(distinct_key(val));
174 }
175 }
176 Accumulator::PercentileDisc { values, .. }
177 | Accumulator::PercentileCont { values, .. } => {
178 if let Some(n) = val.as_f64() {
179 values.push(n);
180 }
181 }
182 }
183 }
184
185 pub(crate) fn finish(&self) -> Value {
186 match self {
187 Accumulator::Count(c) => Value::Int(*c),
188 Accumulator::Sum(s) => numeric_to_value(*s),
189 Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
190 Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
191 Accumulator::Avg { sum, count } => {
192 if *count > 0 {
193 Value::Float(*sum / (*count as f64))
194 } else {
195 Value::Null
196 }
197 }
198 Accumulator::Collect(v) => Value::List(v.clone()),
199 Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
200 Accumulator::PercentileDisc { values, percentile } => {
201 if values.is_empty() {
202 return Value::Null;
203 }
204 let mut sorted = values.clone();
205 sorted.sort_by(|a, b| a.total_cmp(b));
206 let n = sorted.len();
207 let idx = (percentile * (n as f64 - 1.0)).round() as usize;
208 numeric_to_value(sorted[idx.min(n - 1)])
209 }
210 Accumulator::PercentileCont { values, percentile } => {
211 if values.is_empty() {
212 return Value::Null;
213 }
214 let mut sorted = values.clone();
215 sorted.sort_by(|a, b| a.total_cmp(b));
216 let n = sorted.len();
217 if n == 1 {
218 return Value::Float(sorted[0]);
219 }
220 let pos = percentile * (n as f64 - 1.0);
221 let lower = (pos.floor() as usize).min(n - 1);
222 let upper = (pos.ceil() as usize).min(n - 1);
223 if lower == upper {
224 Value::Float(sorted[lower])
225 } else {
226 let frac = pos - lower as f64;
227 Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
228 }
229 }
230 }
231 }
232}
233
/// Key type of `Executor::gen_expr_cache`, which maps each key to a parsed `Expr`.
///
/// NOTE(review): what the two strings identify is not visible in this file — presumably
/// an owning label/type name and a property name for a generated expression; confirm
/// against the code that populates the cache.
pub(crate) type GenExprCacheKey = (String, String);
236
/// Query executor: holds the storage handles, registries and per-query settings used to
/// run plans.
///
/// `Clone` shares every handle and copies the configuration, but gives the clone its own
/// empty `warnings` buffer.
// Field order is also drop order; keep it stable.
pub struct Executor {
    /// Base storage; `effective_storage` falls back to it when the read snapshot carries
    /// no pinned storage.
    pub(crate) storage: Arc<StorageManager>,
    /// Optional writer. When set, `get_context` builds contexts from the writer's
    /// `l0_manager` (unless a read snapshot is set).
    pub(crate) writer: Option<Arc<Writer>>,
    /// L0 manager used by `get_context` only when no `writer` is set.
    pub(crate) l0_manager: Option<Arc<L0Manager>>,
    /// Algorithm registry (`new` installs a fresh `AlgorithmRegistry::new()`).
    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
    /// Set via `set_use_transaction`; NOTE(review): readers of this flag live outside this file.
    pub(crate) use_transaction: bool,
    /// Sandbox consulted by `validate_path`.
    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
    /// Engine configuration; `get_context` derives each query deadline from `query_timeout`.
    pub(crate) config: uni_common::config::UniConfig,
    /// Cache of parsed expressions keyed by `GenExprCacheKey`; shared between clones.
    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
    /// Procedure registry; `new` installs one wired to the default host plugin registry.
    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Optional model runtime, set via `set_xervo_runtime`.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Warnings collected during execution, drained by `take_warnings`. Not shared with
    /// clones: `Clone` allocates a fresh buffer.
    pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
    /// Transaction-local L0 buffer that `get_context` passes to
    /// `QueryContext::new_with_pending` when a writer is present.
    pub(crate) transaction_l0_override:
        Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Set via `set_id_reservoir`; NOTE(review): consumers live outside this file.
    pub(crate) id_reservoir: Option<Arc<uni_store::runtime::TxIdReservoir>>,
    /// User-registered functions, set via `set_custom_functions`.
    pub(crate) custom_function_registry:
        Option<Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>>,
    /// Installed into every `QueryContext` built by `get_context`.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Set via `set_prop_manager`; NOTE(review): consumers live outside this file.
    pub(crate) prop_manager_arc: Option<Arc<PropertyManager>>,
    /// Set via `set_df_session_template`; NOTE(review): presumably a DataFusion session
    /// used as a template for per-query sessions — confirm against its users.
    pub(crate) df_session_template: Option<Arc<datafusion::execution::context::SessionContext>>,
    /// Optional read snapshot; consulted by `effective_storage` and `get_context`.
    pub(crate) read_snapshot: Option<uni_store::runtime::SnapshotView>,
}
305
306impl std::fmt::Debug for Executor {
307 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308 f.debug_struct("Executor")
309 .field("use_transaction", &self.use_transaction)
310 .field("has_writer", &self.writer.is_some())
311 .field("has_l0_manager", &self.l0_manager.is_some())
312 .field("has_xervo_runtime", &self.xervo_runtime.is_some())
313 .finish_non_exhaustive()
314 }
315}
316
317impl Clone for Executor {
318 fn clone(&self) -> Self {
323 Self {
324 storage: self.storage.clone(),
325 writer: self.writer.clone(),
326 l0_manager: self.l0_manager.clone(),
327 algo_registry: self.algo_registry.clone(),
328 use_transaction: self.use_transaction,
329 file_sandbox: self.file_sandbox.clone(),
330 config: self.config.clone(),
331 gen_expr_cache: self.gen_expr_cache.clone(),
332 procedure_registry: self.procedure_registry.clone(),
333 xervo_runtime: self.xervo_runtime.clone(),
334 warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
336 transaction_l0_override: self.transaction_l0_override.clone(),
337 id_reservoir: self.id_reservoir.clone(),
338 custom_function_registry: self.custom_function_registry.clone(),
339 cancellation_token: self.cancellation_token.clone(),
340 prop_manager_arc: self.prop_manager_arc.clone(),
341 df_session_template: self.df_session_template.clone(),
342 read_snapshot: self.read_snapshot.clone(),
343 }
344 }
345}
346
347impl Executor {
348 pub fn new(storage: Arc<StorageManager>) -> Self {
350 let proc_registry = Arc::new(ProcedureRegistry::new());
355 proc_registry.set_plugin_registry(crate::procedures_plugin::default_host_plugin_registry());
356 Self {
357 storage,
358 writer: None,
359 l0_manager: None,
360 algo_registry: Arc::new(AlgorithmRegistry::new()),
361 use_transaction: false,
362 file_sandbox: uni_common::config::FileSandboxConfig::default(),
363 config: uni_common::config::UniConfig::default(),
364 gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
365 procedure_registry: Some(proc_registry),
366 xervo_runtime: None,
367 warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
368 transaction_l0_override: None,
369 id_reservoir: None,
370 custom_function_registry: None,
371 cancellation_token: None,
372 prop_manager_arc: None,
373 df_session_template: None,
374 read_snapshot: None,
375 }
376 }
377
378 pub fn set_prop_manager(&mut self, pm: Arc<PropertyManager>) {
384 self.prop_manager_arc = Some(pm);
385 }
386
387 pub fn set_df_session_template(
394 &mut self,
395 tmpl: Arc<datafusion::execution::context::SessionContext>,
396 ) {
397 self.df_session_template = Some(tmpl);
398 }
399
400 pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<Writer>) -> Self {
402 let mut executor = Self::new(storage);
403 executor.writer = Some(writer);
404 executor
405 }
406
407 pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
409 self.procedure_registry = Some(registry);
410 }
411
412 pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
414 self.xervo_runtime = runtime;
415 }
416
417 pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
420 self.file_sandbox = sandbox;
421 }
422
423 pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
425 self.config = config;
426 }
427
428 pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
430 self.file_sandbox
431 .validate_path(path)
432 .map_err(|e| anyhow!("Path validation failed: {}", e))
433 }
434
435 pub fn set_writer(&mut self, writer: Arc<Writer>) {
437 self.writer = Some(writer);
438 }
439
440 pub fn take_warnings(&self) -> Vec<QueryWarning> {
442 self.warnings
443 .lock()
444 .map(|mut w| std::mem::take(&mut *w))
445 .unwrap_or_default()
446 }
447
448 pub fn set_use_transaction(&mut self, use_transaction: bool) {
450 self.use_transaction = use_transaction;
451 }
452
453 pub fn set_transaction_l0(
456 &mut self,
457 l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
458 ) {
459 self.transaction_l0_override = Some(l0);
460 }
461
462 pub fn set_read_snapshot(&mut self, snapshot: Option<uni_store::runtime::SnapshotView>) {
468 self.read_snapshot = snapshot;
469 }
470
471 pub(crate) fn effective_storage(&self) -> Arc<StorageManager> {
477 self.read_snapshot
478 .as_ref()
479 .and_then(|s| s.pinned_storage.clone())
480 .unwrap_or_else(|| self.storage.clone())
481 }
482
483 pub fn set_id_reservoir(&mut self, r: Arc<uni_store::runtime::TxIdReservoir>) {
487 self.id_reservoir = Some(r);
488 }
489
490 pub fn set_custom_functions(
492 &mut self,
493 registry: Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>,
494 ) {
495 self.custom_function_registry = Some(registry);
496 }
497
498 pub fn set_cancellation_token(&mut self, token: tokio_util::sync::CancellationToken) {
500 self.cancellation_token = Some(token);
501 }
502
503 pub(crate) async fn get_context(&self) -> Option<QueryContext> {
508 if let Some(writer) = &self.writer {
509 let tx_l0 = self.transaction_l0_override.clone();
511 let mut ctx = match &self.read_snapshot {
516 Some(snap) => {
517 QueryContext::new_with_pending(snap.main.clone(), tx_l0, snap.extra.clone())
518 }
519 None => QueryContext::new_with_pending(
520 writer.l0_manager.get_current(),
521 tx_l0,
522 writer.l0_manager.get_pending_flush(),
523 ),
524 };
525 ctx.set_deadline(Instant::now() + self.config.query_timeout);
526 if let Some(ref token) = self.cancellation_token {
527 ctx.set_cancellation_token(token.clone());
528 }
529 Some(ctx)
530 } else {
531 self.l0_manager.as_ref().map(|m| {
532 let mut ctx = QueryContext::new(m.get_current());
533 ctx.set_deadline(Instant::now() + self.config.query_timeout);
534 if let Some(ref token) = self.cancellation_token {
535 ctx.set_cancellation_token(token.clone());
536 }
537 ctx
538 })
539 }
540 }
541
542 pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
550 use std::cmp::Ordering;
551
552 let temporal_a = Self::extract_temporal_value(a);
553 let temporal_b = Self::extract_temporal_value(b);
554
555 if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
556 return Self::compare_temporal(ta, tb);
557 }
558
559 if matches!(
562 (a, b),
563 (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
564 ) && let Some(ord) = Self::try_eval_ordering(a, b)
565 {
566 return ord;
567 }
568 if let (Value::String(_), Some(tb)) = (a, temporal_b)
569 && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
570 {
571 return ord;
572 }
573 if let (Some(ta), Value::String(_)) = (temporal_a, b)
574 && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
575 {
576 return ord;
577 }
578
579 let ra = Self::order_by_type_rank(a);
580 let rb = Self::order_by_type_rank(b);
581 if ra != rb {
582 return ra.cmp(&rb);
583 }
584
585 match (a, b) {
586 (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
587 (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
588 (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
589 (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
590 (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
591 (Value::String(l), Value::String(r)) => {
592 Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
594 }
595 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
596 (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
597 (Value::Int(l), Value::Int(r)) => l.cmp(r),
598 (Value::Float(l), Value::Float(r)) => {
599 if l.is_nan() && r.is_nan() {
600 Ordering::Equal
601 } else if l.is_nan() {
602 Ordering::Greater
603 } else if r.is_nan() {
604 Ordering::Less
605 } else {
606 l.partial_cmp(r).unwrap_or(Ordering::Equal)
607 }
608 }
609 (Value::Int(l), Value::Float(r)) => {
610 if r.is_nan() {
611 Ordering::Less
612 } else {
613 (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
614 }
615 }
616 (Value::Float(l), Value::Int(r)) => {
617 if l.is_nan() {
618 Ordering::Greater
619 } else {
620 l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
621 }
622 }
623 (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
624 (Value::Vector(l), Value::Vector(r)) => {
625 for (lv, rv) in l.iter().zip(r.iter()) {
626 let ord = lv.total_cmp(rv);
627 if ord != Ordering::Equal {
628 return ord;
629 }
630 }
631 l.len().cmp(&r.len())
632 }
633 _ => Ordering::Equal,
634 }
635 }
636
637 fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
638 use std::cmp::Ordering;
639 if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
640 Some(Ordering::Less)
641 } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
642 Some(Ordering::Greater)
643 } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
644 Some(Ordering::Equal)
645 } else {
646 None
647 }
648 }
649
650 fn order_by_type_rank(v: &Value) -> u8 {
653 match v {
654 Value::Map(map) => Self::map_order_rank(map),
655 Value::Node(_) => 1,
656 Value::Edge(_) => 2,
657 Value::List(_) => 3,
658 Value::Path(_) => 4,
659 Value::String(_) => 5,
660 Value::Bool(_) => 6,
661 Value::Temporal(_) => 7,
662 Value::Int(_) => 8,
663 Value::Float(f) if f.is_nan() => 9,
664 Value::Float(_) => 8,
665 Value::Null => 10,
666 Value::Bytes(_) | Value::Vector(_) => 11,
667 _ => 11,
668 }
669 }
670
671 fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
672 if Self::map_as_temporal(map).is_some() {
673 7
674 } else if map.contains_key("nodes")
675 && (map.contains_key("relationships") || map.contains_key("edges"))
676 {
677 4
678 } else if map.contains_key("_eid")
679 || map.contains_key("_src")
680 || map.contains_key("_dst")
681 || map.contains_key("_type")
682 || map.contains_key("_type_name")
683 {
684 2
685 } else if map.contains_key("_vid")
686 || map.contains_key("_labels")
687 || map.contains_key("_label")
688 {
689 1
690 } else {
691 0
692 }
693 }
694
695 fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
696 crate::query::expr_eval::temporal_from_value(value)
697 }
698
699 fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
700 crate::query::expr_eval::temporal_from_map_wrapper(map)
701 }
702
703 fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
704 left.iter()
705 .zip(right.iter())
706 .map(|(l, r)| Self::compare_values(l, r))
707 .find(|o| o.is_ne())
708 .unwrap_or_else(|| left.len().cmp(&right.len()))
709 }
710
711 fn compare_maps(
712 left: &HashMap<String, Value>,
713 right: &HashMap<String, Value>,
714 ) -> std::cmp::Ordering {
715 let mut l_pairs: Vec<_> = left.iter().collect();
716 let mut r_pairs: Vec<_> = right.iter().collect();
717 l_pairs.sort_by_key(|(k, _)| *k);
718 r_pairs.sort_by_key(|(k, _)| *k);
719
720 l_pairs
721 .iter()
722 .zip(r_pairs.iter())
723 .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
724 .find(|o| o.is_ne())
725 .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
726 }
727
728 fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
729 let mut l_labels = left.labels.clone();
730 let mut r_labels = right.labels.clone();
731 l_labels.sort();
732 r_labels.sort();
733
734 l_labels
735 .cmp(&r_labels)
736 .then_with(|| left.vid.cmp(&right.vid))
737 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
738 }
739
740 fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
741 left.edge_type
742 .cmp(&right.edge_type)
743 .then_with(|| left.src.cmp(&right.src))
744 .then_with(|| left.dst.cmp(&right.dst))
745 .then_with(|| left.eid.cmp(&right.eid))
746 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
747 }
748
749 fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
750 left.nodes
751 .iter()
752 .zip(right.nodes.iter())
753 .map(|(l, r)| Self::compare_nodes(l, r))
754 .find(|o| o.is_ne())
755 .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
756 .then_with(|| {
757 left.edges
758 .iter()
759 .zip(right.edges.iter())
760 .map(|(l, r)| Self::compare_edges(l, r))
761 .find(|o| o.is_ne())
762 .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
763 })
764 }
765
766 fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
767 match (left, right) {
768 (
769 TemporalValue::Date {
770 days_since_epoch: l,
771 },
772 TemporalValue::Date {
773 days_since_epoch: r,
774 },
775 ) => l.cmp(r),
776 (
777 TemporalValue::LocalTime {
778 nanos_since_midnight: l,
779 },
780 TemporalValue::LocalTime {
781 nanos_since_midnight: r,
782 },
783 ) => l.cmp(r),
784 (
785 TemporalValue::Time {
786 nanos_since_midnight: lm,
787 offset_seconds: lo,
788 },
789 TemporalValue::Time {
790 nanos_since_midnight: rm,
791 offset_seconds: ro,
792 },
793 ) => {
794 let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
795 let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
796 l_utc.cmp(&r_utc)
797 }
798 (
799 TemporalValue::LocalDateTime {
800 nanos_since_epoch: l,
801 },
802 TemporalValue::LocalDateTime {
803 nanos_since_epoch: r,
804 },
805 ) => l.cmp(r),
806 (
807 TemporalValue::DateTime {
808 nanos_since_epoch: l,
809 ..
810 },
811 TemporalValue::DateTime {
812 nanos_since_epoch: r,
813 ..
814 },
815 ) => l.cmp(r),
816 (
817 TemporalValue::Duration {
818 months: lm,
819 days: ld,
820 nanos: ln,
821 },
822 TemporalValue::Duration {
823 months: rm,
824 days: rd,
825 nanos: rn,
826 },
827 ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
828 _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
829 }
830 }
831
832 fn temporal_variant_rank(v: &TemporalValue) -> u8 {
833 match v {
834 TemporalValue::Date { .. } => 0,
835 TemporalValue::LocalTime { .. } => 1,
836 TemporalValue::Time { .. } => 2,
837 TemporalValue::LocalDateTime { .. } => 3,
838 TemporalValue::DateTime { .. } => 4,
839 TemporalValue::Duration { .. } => 5,
840 TemporalValue::Btic { .. } => 6,
841 }
842 }
843}
844
/// Result of `Executor::profile`: the explain output plus runtime statistics for one
/// execution of the plan.
// Field order determines serialized field order; keep it stable.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileOutput {
    /// Output of `QueryPlanner::explain_logical_plan` for the profiled plan.
    pub explain: crate::query::planner::ExplainOutput,
    /// Runtime statistics. For DataFusion plans there is one entry per physical operator,
    /// children before parents. For DDL/admin plans there is a single
    /// "DDL/Admin Execution" entry.
    pub runtime_stats: Vec<OperatorStats>,
    /// Wall-clock milliseconds spent executing the plan and converting the results. The
    /// clock starts after the explain step.
    pub total_time_ms: u64,
    /// Peak memory in bytes; `Executor::profile` currently always reports 0.
    pub peak_memory_bytes: usize,
}
860
/// Runtime statistics for one operator in a profiled execution.
// Field order determines serialized field order; keep it stable.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperatorStats {
    /// Operator name: the DataFusion `ExecutionPlan::name`, or "DDL/Admin Execution".
    pub operator: String,
    /// Rows produced: the DataFusion `output_rows` metric (0 if unavailable), or the
    /// result row count for DDL/admin execution.
    pub actual_rows: usize,
    /// Milliseconds: the DataFusion `elapsed_compute` metric (0.0 if unavailable), or the
    /// elapsed wall-clock time for DDL/admin execution.
    pub time_ms: f64,
    /// Memory in bytes; always 0 in the statistics produced in this file.
    pub memory_bytes: usize,
    /// Index hits, if tracked; always `None` in the statistics produced in this file.
    pub index_hits: Option<usize>,
    /// Index misses, if tracked; always `None` in the statistics produced in this file.
    pub index_misses: Option<usize>,
}
877
878fn collect_plan_metrics(
884 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
885) -> Vec<OperatorStats> {
886 let mut stats = Vec::new();
887 collect_plan_metrics_inner(plan, &mut stats);
888 stats
889}
890
891fn collect_plan_metrics_inner(
892 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
893 out: &mut Vec<OperatorStats>,
894) {
895 for child in plan.children() {
897 collect_plan_metrics_inner(child, out);
898 }
899
900 let operator = plan.name().to_string();
901
902 let (actual_rows, time_ms) = match plan.metrics() {
903 Some(metrics) => {
904 let rows = metrics.output_rows().unwrap_or(0);
905 let nanos = metrics.elapsed_compute().unwrap_or(0);
907 let ms = nanos as f64 / 1_000_000.0;
908 (rows, ms)
909 }
910 None => (0, 0.0),
911 };
912
913 out.push(OperatorStats {
914 operator,
915 actual_rows,
916 time_ms,
917 memory_bytes: 0,
918 index_hits: None,
919 index_misses: None,
920 });
921}
922
923impl Executor {
924 pub async fn profile(
927 &self,
928 plan: crate::query::planner::LogicalPlan,
929 params: &HashMap<String, Value>,
930 ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
931 let planner =
933 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
934 let explain_output = planner.explain_logical_plan(&plan)?;
935
936 let start = Instant::now();
937
938 let prop_manager = self.create_prop_manager();
939
940 let (results, stats) = if Self::is_ddl_or_admin(&plan) {
943 let results = self
944 .execute_subplan(plan, &prop_manager, params, None)
945 .await?;
946 let elapsed = start.elapsed();
947 let stats = vec![OperatorStats {
948 operator: "DDL/Admin Execution".to_string(),
949 actual_rows: results.len(),
950 time_ms: elapsed.as_secs_f64() * 1000.0,
951 memory_bytes: 0,
952 index_hits: None,
953 index_misses: None,
954 }];
955 (results, stats)
956 } else {
957 let (batches, execution_plan) = self
958 .execute_datafusion_with_plan(plan, &prop_manager, params)
959 .await?;
960 let results = self.record_batches_to_rows(batches)?;
961 let stats = collect_plan_metrics(&execution_plan);
962 (results, stats)
963 };
964
965 let total_time = start.elapsed();
966
967 Ok((
968 results,
969 ProfileOutput {
970 explain: explain_output,
971 runtime_stats: stats,
972 total_time_ms: total_time.as_millis() as u64,
973 peak_memory_bytes: 0,
974 },
975 ))
976 }
977
978 fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
979 uni_store::runtime::property_manager::PropertyManager::new(
980 self.storage.clone(),
981 self.storage.schema_manager_arc(),
982 1000,
983 )
984 }
985}
986
#[cfg(test)]
mod tests {
    use super::*;

    /// Feeds `inputs` into a fresh accumulator for `op` and returns its final value.
    fn aggregate(op: &str, distinct: bool, is_wildcard: bool, inputs: &[Value]) -> Value {
        let mut acc = Accumulator::new(op, distinct);
        inputs.iter().for_each(|v| acc.update(v, is_wildcard));
        acc.finish()
    }

    /// Shorthand for a string value.
    fn s(text: &str) -> Value {
        Value::String(text.into())
    }

    // ---- Accumulator ----

    #[test]
    fn test_accumulator_count_basic() {
        // Nulls are not counted without a wildcard.
        let got = aggregate("COUNT", false, false, &[Value::Int(1), Value::Null, Value::Int(2)]);
        assert_eq!(got, Value::Int(2));
    }

    #[test]
    fn test_accumulator_count_wildcard() {
        // count(*) counts every row, nulls included.
        let got = aggregate("COUNT", false, true, &[Value::Int(1), Value::Null, Value::Int(2)]);
        assert_eq!(got, Value::Int(3));
    }

    #[test]
    fn test_accumulator_sum() {
        let got = aggregate(
            "SUM",
            false,
            false,
            &[Value::Int(10), Value::Float(2.5), Value::Null],
        );
        assert_eq!(got, Value::Float(12.5));
    }

    #[test]
    fn test_accumulator_avg() {
        let got = aggregate(
            "AVG",
            false,
            false,
            &[Value::Int(10), Value::Int(20), Value::Int(30)],
        );
        assert_eq!(got, Value::Float(20.0));
    }

    #[test]
    fn test_accumulator_avg_empty() {
        assert_eq!(aggregate("AVG", false, false, &[]), Value::Null);
    }

    #[test]
    fn test_accumulator_min_max() {
        let inputs = [Value::Int(3), Value::Int(1), Value::Int(2)];
        assert_eq!(aggregate("MIN", false, false, &inputs), Value::Int(1));
        assert_eq!(aggregate("MAX", false, false, &inputs), Value::Int(3));
    }

    #[test]
    fn test_accumulator_collect() {
        // Nulls are skipped; order is preserved.
        let got = aggregate("COLLECT", false, false, &[s("a"), Value::Null, s("b")]);
        assert_eq!(got, Value::List(vec![s("a"), s("b")]));
    }

    #[test]
    fn test_accumulator_count_distinct() {
        let got = aggregate("COUNT", true, false, &[s("a"), s("b"), s("a"), Value::Null]);
        assert_eq!(got, Value::Int(2));
    }

    #[test]
    fn test_accumulator_percentile_empty() {
        let acc = Accumulator::new_with_percentile("PERCENTILEDISC", false, 0.5);
        assert_eq!(acc.finish(), Value::Null);
    }

    // ---- Executor::compare_values ----

    #[test]
    fn test_compare_values_int_ordering() {
        use std::cmp::Ordering;
        let cases = [
            (1, 2, Ordering::Less),
            (5, 5, Ordering::Equal),
            (9, 3, Ordering::Greater),
        ];
        for (l, r, expected) in cases {
            assert_eq!(
                Executor::compare_values(&Value::Int(l), &Value::Int(r)),
                expected
            );
        }
    }

    #[test]
    fn test_compare_values_null_last() {
        // Null sorts after non-null values and equal to itself.
        assert!(Executor::compare_values(&Value::Int(1), &Value::Null).is_lt());
        assert!(Executor::compare_values(&Value::Null, &Value::Int(1)).is_gt());
        assert!(Executor::compare_values(&Value::Null, &Value::Null).is_eq());
    }

    #[test]
    fn test_compare_values_cross_type_rank() {
        // Type rank: strings < booleans < numbers.
        assert!(Executor::compare_values(&s("z"), &Value::Bool(false)).is_lt());
        assert!(Executor::compare_values(&Value::Bool(true), &Value::Int(1)).is_lt());
    }

    #[test]
    fn test_compare_values_lists() {
        let lower = Value::List(vec![Value::Int(1), Value::Int(2)]);
        let higher = Value::List(vec![Value::Int(1), Value::Int(3)]);
        assert!(Executor::compare_values(&lower, &higher).is_lt());
    }

    // ---- count(DISTINCT) normalisation ----

    #[test]
    fn test_count_distinct_type_identity() {
        let count_distinct = |inputs: &[Value]| aggregate("COUNT", true, false, inputs);

        assert_eq!(
            count_distinct(&[Value::Int(1), Value::Float(1.0)]),
            Value::Int(1),
            "1 and 1.0 count once"
        );
        assert_eq!(
            count_distinct(&[Value::Int(1), s("1")]),
            Value::Int(2),
            "1 and '1' are distinct"
        );
        assert_eq!(
            count_distinct(&[Value::Int(1), Value::Bool(true)]),
            Value::Int(2),
            "1 and true are distinct"
        );
        assert_eq!(
            count_distinct(&[
                Value::Float(1.5),
                Value::Float(1.5),
                Value::Float(f64::NAN),
                Value::Float(f64::NAN),
                Value::Float(0.0),
                Value::Float(-0.0),
            ]),
            Value::Int(3),
            "1.5, NaN, 0 -> 3 buckets"
        );
    }
}