1use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::QueryContext;
13use uni_store::runtime::l0_manager::L0Manager;
14use uni_store::runtime::writer::Writer;
15use uni_store::storage::manager::StorageManager;
16use uni_xervo::runtime::ModelRuntime;
17
18use crate::query::expr_eval::eval_binary_op;
19use crate::types::QueryWarning;
20
21use super::procedure::ProcedureRegistry;
22
23#[derive(Debug)]
25pub(crate) enum Accumulator {
26 Count(i64),
27 Sum(f64),
28 Min(Option<Value>),
29 Max(Option<Value>),
30 Avg { sum: f64, count: i64 },
31 Collect(Vec<Value>),
32 CountDistinct(HashSet<String>),
33 PercentileDisc { values: Vec<f64>, percentile: f64 },
34 PercentileCont { values: Vec<f64>, percentile: f64 },
35}
36
37fn numeric_to_value(val: f64) -> Value {
39 if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
40 Value::Int(val as i64)
41 } else {
42 Value::Float(val)
43 }
44}
45
46fn cypher_type_rank(val: &Value) -> u8 {
48 match val {
49 Value::Null => 0,
50 Value::List(_) => 1,
51 Value::String(_) => 2,
52 Value::Bool(_) => 3,
53 Value::Int(_) | Value::Float(_) => 4,
54 _ => 5,
55 }
56}
57
58fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
60 use std::cmp::Ordering;
61 let ra = cypher_type_rank(a);
62 let rb = cypher_type_rank(b);
63 if ra != rb {
64 return ra.cmp(&rb);
65 }
66 match (a, b) {
67 (Value::Int(l), Value::Int(r)) => l.cmp(r),
68 (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
69 (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
70 (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
71 (Value::String(l), Value::String(r)) => l.cmp(r),
72 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
73 _ => Ordering::Equal,
74 }
75}
76
77impl Accumulator {
78 pub(crate) fn new(op: &str, distinct: bool) -> Self {
79 Self::new_with_percentile(op, distinct, 0.0)
80 }
81
82 pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
83 let op_upper = op.to_uppercase();
84 match op_upper.as_str() {
85 "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
86 "COUNT" => Accumulator::Count(0),
87 "SUM" => Accumulator::Sum(0.0),
88 "MIN" => Accumulator::Min(None),
89 "MAX" => Accumulator::Max(None),
90 "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
91 "COLLECT" => Accumulator::Collect(Vec::new()),
92 "PERCENTILEDISC" => Accumulator::PercentileDisc {
93 values: Vec::new(),
94 percentile,
95 },
96 "PERCENTILECONT" => Accumulator::PercentileCont {
97 values: Vec::new(),
98 percentile,
99 },
100 _ => Accumulator::Count(0),
101 }
102 }
103
104 pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
105 match self {
106 Accumulator::Count(c) => {
107 if is_wildcard || !val.is_null() {
108 *c += 1;
109 }
110 }
111 Accumulator::Sum(s) => {
112 if let Some(n) = val.as_f64() {
113 *s += n;
114 }
115 }
116 Accumulator::Min(current) => {
117 if !val.is_null() {
118 *current = Some(match current.take() {
119 None => val.clone(),
120 Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
121 Some(cur) => cur,
122 });
123 }
124 }
125 Accumulator::Max(current) => {
126 if !val.is_null() {
127 *current = Some(match current.take() {
128 None => val.clone(),
129 Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
130 Some(cur) => cur,
131 });
132 }
133 }
134 Accumulator::Avg { sum, count } => {
135 if let Some(n) = val.as_f64() {
136 *sum += n;
137 *count += 1;
138 }
139 }
140 Accumulator::Collect(v) => {
141 if !val.is_null() {
142 v.push(val.clone());
143 }
144 }
145 Accumulator::CountDistinct(s) => {
146 if !val.is_null() {
147 s.insert(val.to_string());
148 }
149 }
150 Accumulator::PercentileDisc { values, .. }
151 | Accumulator::PercentileCont { values, .. } => {
152 if let Some(n) = val.as_f64() {
153 values.push(n);
154 }
155 }
156 }
157 }
158
159 pub(crate) fn finish(&self) -> Value {
160 match self {
161 Accumulator::Count(c) => Value::Int(*c),
162 Accumulator::Sum(s) => numeric_to_value(*s),
163 Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
164 Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
165 Accumulator::Avg { sum, count } => {
166 if *count > 0 {
167 Value::Float(*sum / (*count as f64))
168 } else {
169 Value::Null
170 }
171 }
172 Accumulator::Collect(v) => Value::List(v.clone()),
173 Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
174 Accumulator::PercentileDisc { values, percentile } => {
175 if values.is_empty() {
176 return Value::Null;
177 }
178 let mut sorted = values.clone();
179 sorted.sort_by(|a, b| a.total_cmp(b));
180 let n = sorted.len();
181 let idx = (percentile * (n as f64 - 1.0)).round() as usize;
182 numeric_to_value(sorted[idx.min(n - 1)])
183 }
184 Accumulator::PercentileCont { values, percentile } => {
185 if values.is_empty() {
186 return Value::Null;
187 }
188 let mut sorted = values.clone();
189 sorted.sort_by(|a, b| a.total_cmp(b));
190 let n = sorted.len();
191 if n == 1 {
192 return Value::Float(sorted[0]);
193 }
194 let pos = percentile * (n as f64 - 1.0);
195 let lower = (pos.floor() as usize).min(n - 1);
196 let upper = (pos.ceil() as usize).min(n - 1);
197 if lower == upper {
198 Value::Float(sorted[lower])
199 } else {
200 let frac = pos - lower as f64;
201 Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
202 }
203 }
204 }
205 }
206}
207
208pub(crate) type GenExprCacheKey = (String, String);
210
211#[derive(Clone)]
223pub struct Executor {
224 pub(crate) storage: Arc<StorageManager>,
225 pub(crate) writer: Option<Arc<RwLock<Writer>>>,
226 pub(crate) l0_manager: Option<Arc<L0Manager>>,
227 pub(crate) algo_registry: Arc<AlgorithmRegistry>,
228 pub(crate) use_transaction: bool,
229 pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
231 pub(crate) config: uni_common::config::UniConfig,
232 pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
234 pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
236 pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
238 pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
240 pub(crate) transaction_l0_override:
244 Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
245 pub(crate) custom_function_registry:
247 Option<Arc<super::custom_functions::CustomFunctionRegistry>>,
248 pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
251}
252
253impl std::fmt::Debug for Executor {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 f.debug_struct("Executor")
256 .field("use_transaction", &self.use_transaction)
257 .field("has_writer", &self.writer.is_some())
258 .field("has_l0_manager", &self.l0_manager.is_some())
259 .field("has_xervo_runtime", &self.xervo_runtime.is_some())
260 .finish_non_exhaustive()
261 }
262}
263
264impl Executor {
265 pub fn new(storage: Arc<StorageManager>) -> Self {
267 Self {
268 storage,
269 writer: None,
270 l0_manager: None,
271 algo_registry: Arc::new(AlgorithmRegistry::new()),
272 use_transaction: false,
273 file_sandbox: uni_common::config::FileSandboxConfig::default(),
274 config: uni_common::config::UniConfig::default(),
275 gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
276 procedure_registry: None,
277 xervo_runtime: None,
278 warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
279 transaction_l0_override: None,
280 custom_function_registry: None,
281 cancellation_token: None,
282 }
283 }
284
285 pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<RwLock<Writer>>) -> Self {
287 let mut executor = Self::new(storage);
288 executor.writer = Some(writer);
289 executor
290 }
291
292 pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
294 self.procedure_registry = Some(registry);
295 }
296
297 pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
299 self.xervo_runtime = runtime;
300 }
301
302 pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
305 self.file_sandbox = sandbox;
306 }
307
308 pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
310 self.config = config;
311 }
312
313 pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
315 self.file_sandbox
316 .validate_path(path)
317 .map_err(|e| anyhow!("Path validation failed: {}", e))
318 }
319
320 pub fn set_writer(&mut self, writer: Arc<RwLock<Writer>>) {
322 self.writer = Some(writer);
323 }
324
325 pub fn take_warnings(&self) -> Vec<QueryWarning> {
327 self.warnings
328 .lock()
329 .map(|mut w| std::mem::take(&mut *w))
330 .unwrap_or_default()
331 }
332
333 pub fn set_use_transaction(&mut self, use_transaction: bool) {
335 self.use_transaction = use_transaction;
336 }
337
338 pub fn set_transaction_l0(
341 &mut self,
342 l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
343 ) {
344 self.transaction_l0_override = Some(l0);
345 }
346
347 pub fn set_custom_functions(
349 &mut self,
350 registry: Arc<super::custom_functions::CustomFunctionRegistry>,
351 ) {
352 self.custom_function_registry = Some(registry);
353 }
354
355 pub fn set_cancellation_token(&mut self, token: tokio_util::sync::CancellationToken) {
357 self.cancellation_token = Some(token);
358 }
359
360 pub(crate) async fn get_context(&self) -> Option<QueryContext> {
365 if let Some(writer_lock) = &self.writer {
366 let writer = writer_lock.read().await;
367 let tx_l0 = self.transaction_l0_override.clone();
369 let mut ctx = QueryContext::new_with_pending(
370 writer.l0_manager.get_current(),
371 tx_l0,
372 writer.l0_manager.get_pending_flush(),
373 );
374 ctx.set_deadline(Instant::now() + self.config.query_timeout);
375 if let Some(ref token) = self.cancellation_token {
376 ctx.set_cancellation_token(token.clone());
377 }
378 Some(ctx)
379 } else {
380 self.l0_manager.as_ref().map(|m| {
381 let mut ctx = QueryContext::new(m.get_current());
382 ctx.set_deadline(Instant::now() + self.config.query_timeout);
383 if let Some(ref token) = self.cancellation_token {
384 ctx.set_cancellation_token(token.clone());
385 }
386 ctx
387 })
388 }
389 }
390
391 pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
393 use std::cmp::Ordering;
394
395 let temporal_a = Self::extract_temporal_value(a);
396 let temporal_b = Self::extract_temporal_value(b);
397
398 if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
399 return Self::compare_temporal(ta, tb);
400 }
401
402 if matches!(
405 (a, b),
406 (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
407 ) && let Some(ord) = Self::try_eval_ordering(a, b)
408 {
409 return ord;
410 }
411 if let (Value::String(_), Some(tb)) = (a, temporal_b)
412 && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
413 {
414 return ord;
415 }
416 if let (Some(ta), Value::String(_)) = (temporal_a, b)
417 && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
418 {
419 return ord;
420 }
421
422 let ra = Self::order_by_type_rank(a);
423 let rb = Self::order_by_type_rank(b);
424 if ra != rb {
425 return ra.cmp(&rb);
426 }
427
428 match (a, b) {
429 (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
430 (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
431 (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
432 (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
433 (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
434 (Value::String(l), Value::String(r)) => {
435 Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
437 }
438 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
439 (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
440 (Value::Int(l), Value::Int(r)) => l.cmp(r),
441 (Value::Float(l), Value::Float(r)) => {
442 if l.is_nan() && r.is_nan() {
443 Ordering::Equal
444 } else if l.is_nan() {
445 Ordering::Greater
446 } else if r.is_nan() {
447 Ordering::Less
448 } else {
449 l.partial_cmp(r).unwrap_or(Ordering::Equal)
450 }
451 }
452 (Value::Int(l), Value::Float(r)) => {
453 if r.is_nan() {
454 Ordering::Less
455 } else {
456 (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
457 }
458 }
459 (Value::Float(l), Value::Int(r)) => {
460 if l.is_nan() {
461 Ordering::Greater
462 } else {
463 l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
464 }
465 }
466 (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
467 (Value::Vector(l), Value::Vector(r)) => {
468 for (lv, rv) in l.iter().zip(r.iter()) {
469 let ord = lv.total_cmp(rv);
470 if ord != Ordering::Equal {
471 return ord;
472 }
473 }
474 l.len().cmp(&r.len())
475 }
476 _ => Ordering::Equal,
477 }
478 }
479
480 fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
481 use std::cmp::Ordering;
482 if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
483 Some(Ordering::Less)
484 } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
485 Some(Ordering::Greater)
486 } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
487 Some(Ordering::Equal)
488 } else {
489 None
490 }
491 }
492
493 fn order_by_type_rank(v: &Value) -> u8 {
496 match v {
497 Value::Map(map) => Self::map_order_rank(map),
498 Value::Node(_) => 1,
499 Value::Edge(_) => 2,
500 Value::List(_) => 3,
501 Value::Path(_) => 4,
502 Value::String(_) => 5,
503 Value::Bool(_) => 6,
504 Value::Temporal(_) => 7,
505 Value::Int(_) => 8,
506 Value::Float(f) if f.is_nan() => 9,
507 Value::Float(_) => 8,
508 Value::Null => 10,
509 Value::Bytes(_) | Value::Vector(_) => 11,
510 _ => 11,
511 }
512 }
513
514 fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
515 if Self::map_as_temporal(map).is_some() {
516 7
517 } else if map.contains_key("nodes")
518 && (map.contains_key("relationships") || map.contains_key("edges"))
519 {
520 4
521 } else if map.contains_key("_eid")
522 || map.contains_key("_src")
523 || map.contains_key("_dst")
524 || map.contains_key("_type")
525 || map.contains_key("_type_name")
526 {
527 2
528 } else if map.contains_key("_vid")
529 || map.contains_key("_labels")
530 || map.contains_key("_label")
531 {
532 1
533 } else {
534 0
535 }
536 }
537
538 fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
539 crate::query::expr_eval::temporal_from_value(value)
540 }
541
542 fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
543 crate::query::expr_eval::temporal_from_map_wrapper(map)
544 }
545
546 fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
547 left.iter()
548 .zip(right.iter())
549 .map(|(l, r)| Self::compare_values(l, r))
550 .find(|o| o.is_ne())
551 .unwrap_or_else(|| left.len().cmp(&right.len()))
552 }
553
554 fn compare_maps(
555 left: &HashMap<String, Value>,
556 right: &HashMap<String, Value>,
557 ) -> std::cmp::Ordering {
558 let mut l_pairs: Vec<_> = left.iter().collect();
559 let mut r_pairs: Vec<_> = right.iter().collect();
560 l_pairs.sort_by_key(|(k, _)| *k);
561 r_pairs.sort_by_key(|(k, _)| *k);
562
563 l_pairs
564 .iter()
565 .zip(r_pairs.iter())
566 .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
567 .find(|o| o.is_ne())
568 .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
569 }
570
571 fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
572 let mut l_labels = left.labels.clone();
573 let mut r_labels = right.labels.clone();
574 l_labels.sort();
575 r_labels.sort();
576
577 l_labels
578 .cmp(&r_labels)
579 .then_with(|| left.vid.cmp(&right.vid))
580 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
581 }
582
583 fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
584 left.edge_type
585 .cmp(&right.edge_type)
586 .then_with(|| left.src.cmp(&right.src))
587 .then_with(|| left.dst.cmp(&right.dst))
588 .then_with(|| left.eid.cmp(&right.eid))
589 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
590 }
591
592 fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
593 left.nodes
594 .iter()
595 .zip(right.nodes.iter())
596 .map(|(l, r)| Self::compare_nodes(l, r))
597 .find(|o| o.is_ne())
598 .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
599 .then_with(|| {
600 left.edges
601 .iter()
602 .zip(right.edges.iter())
603 .map(|(l, r)| Self::compare_edges(l, r))
604 .find(|o| o.is_ne())
605 .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
606 })
607 }
608
609 fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
610 match (left, right) {
611 (
612 TemporalValue::Date {
613 days_since_epoch: l,
614 },
615 TemporalValue::Date {
616 days_since_epoch: r,
617 },
618 ) => l.cmp(r),
619 (
620 TemporalValue::LocalTime {
621 nanos_since_midnight: l,
622 },
623 TemporalValue::LocalTime {
624 nanos_since_midnight: r,
625 },
626 ) => l.cmp(r),
627 (
628 TemporalValue::Time {
629 nanos_since_midnight: lm,
630 offset_seconds: lo,
631 },
632 TemporalValue::Time {
633 nanos_since_midnight: rm,
634 offset_seconds: ro,
635 },
636 ) => {
637 let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
638 let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
639 l_utc.cmp(&r_utc)
640 }
641 (
642 TemporalValue::LocalDateTime {
643 nanos_since_epoch: l,
644 },
645 TemporalValue::LocalDateTime {
646 nanos_since_epoch: r,
647 },
648 ) => l.cmp(r),
649 (
650 TemporalValue::DateTime {
651 nanos_since_epoch: l,
652 ..
653 },
654 TemporalValue::DateTime {
655 nanos_since_epoch: r,
656 ..
657 },
658 ) => l.cmp(r),
659 (
660 TemporalValue::Duration {
661 months: lm,
662 days: ld,
663 nanos: ln,
664 },
665 TemporalValue::Duration {
666 months: rm,
667 days: rd,
668 nanos: rn,
669 },
670 ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
671 _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
672 }
673 }
674
675 fn temporal_variant_rank(v: &TemporalValue) -> u8 {
676 match v {
677 TemporalValue::Date { .. } => 0,
678 TemporalValue::LocalTime { .. } => 1,
679 TemporalValue::Time { .. } => 2,
680 TemporalValue::LocalDateTime { .. } => 3,
681 TemporalValue::DateTime { .. } => 4,
682 TemporalValue::Duration { .. } => 5,
683 }
684 }
685}
686
687#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
692pub struct ProfileOutput {
693 pub explain: crate::query::planner::ExplainOutput,
695 pub runtime_stats: Vec<OperatorStats>,
697 pub total_time_ms: u64,
699 pub peak_memory_bytes: usize,
701}
702
703#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
705pub struct OperatorStats {
706 pub operator: String,
708 pub actual_rows: usize,
710 pub time_ms: f64,
712 pub memory_bytes: usize,
714 pub index_hits: Option<usize>,
716 pub index_misses: Option<usize>,
718}
719
720fn collect_plan_metrics(
726 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
727) -> Vec<OperatorStats> {
728 let mut stats = Vec::new();
729 collect_plan_metrics_inner(plan, &mut stats);
730 stats
731}
732
733fn collect_plan_metrics_inner(
734 plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
735 out: &mut Vec<OperatorStats>,
736) {
737 for child in plan.children() {
739 collect_plan_metrics_inner(child, out);
740 }
741
742 let operator = plan.name().to_string();
743
744 let (actual_rows, time_ms) = match plan.metrics() {
745 Some(metrics) => {
746 let rows = metrics.output_rows().unwrap_or(0);
747 let nanos = metrics.elapsed_compute().unwrap_or(0);
749 let ms = nanos as f64 / 1_000_000.0;
750 (rows, ms)
751 }
752 None => (0, 0.0),
753 };
754
755 out.push(OperatorStats {
756 operator,
757 actual_rows,
758 time_ms,
759 memory_bytes: 0,
760 index_hits: None,
761 index_misses: None,
762 });
763}
764
765impl Executor {
766 pub async fn profile(
769 &self,
770 plan: crate::query::planner::LogicalPlan,
771 params: &HashMap<String, Value>,
772 ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
773 let planner =
775 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
776 let explain_output = planner.explain_logical_plan(&plan)?;
777
778 let start = Instant::now();
779
780 let prop_manager = self.create_prop_manager();
781
782 let (results, stats) = if Self::is_ddl_or_admin(&plan) {
785 let results = self
786 .execute_subplan(plan, &prop_manager, params, None)
787 .await?;
788 let elapsed = start.elapsed();
789 let stats = vec![OperatorStats {
790 operator: "DDL/Admin Execution".to_string(),
791 actual_rows: results.len(),
792 time_ms: elapsed.as_secs_f64() * 1000.0,
793 memory_bytes: 0,
794 index_hits: None,
795 index_misses: None,
796 }];
797 (results, stats)
798 } else {
799 let (batches, execution_plan) = self
800 .execute_datafusion_with_plan(plan, &prop_manager, params)
801 .await?;
802 let results = self.record_batches_to_rows(batches)?;
803 let stats = collect_plan_metrics(&execution_plan);
804 (results, stats)
805 };
806
807 let total_time = start.elapsed();
808
809 Ok((
810 results,
811 ProfileOutput {
812 explain: explain_output,
813 runtime_stats: stats,
814 total_time_ms: total_time.as_millis() as u64,
815 peak_memory_bytes: 0,
816 },
817 ))
818 }
819
820 fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
821 uni_store::runtime::property_manager::PropertyManager::new(
822 self.storage.clone(),
823 self.storage.schema_manager_arc(),
824 1000,
825 )
826 }
827}