1use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalType, TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::QueryContext;
13use uni_store::runtime::l0_manager::L0Manager;
14use uni_store::runtime::writer::Writer;
15use uni_store::storage::manager::StorageManager;
16use uni_xervo::runtime::ModelRuntime;
17
18use crate::query::datetime::{classify_temporal, eval_datetime_function};
19use crate::query::expr_eval::eval_binary_op;
20
21use super::procedure::ProcedureRegistry;
22
23#[derive(Debug)]
24pub(crate) enum Accumulator {
25 Count(i64),
26 Sum(f64),
27 Min(Option<Value>),
28 Max(Option<Value>),
29 Avg { sum: f64, count: i64 },
30 Collect(Vec<Value>),
31 CountDistinct(HashSet<String>),
32 PercentileDisc { values: Vec<f64>, percentile: f64 },
33 PercentileCont { values: Vec<f64>, percentile: f64 },
34}
35
36fn numeric_to_value(val: f64) -> Value {
38 if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
39 Value::Int(val as i64)
40 } else {
41 Value::Float(val)
42 }
43}
44
45fn cypher_type_rank(val: &Value) -> u8 {
47 match val {
48 Value::Null => 0,
49 Value::List(_) => 1,
50 Value::String(_) => 2,
51 Value::Bool(_) => 3,
52 Value::Int(_) | Value::Float(_) => 4,
53 _ => 5,
54 }
55}
56
57fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
59 use std::cmp::Ordering;
60 let ra = cypher_type_rank(a);
61 let rb = cypher_type_rank(b);
62 if ra != rb {
63 return ra.cmp(&rb);
64 }
65 match (a, b) {
66 (Value::Int(l), Value::Int(r)) => l.cmp(r),
67 (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
68 (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
69 (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
70 (Value::String(l), Value::String(r)) => l.cmp(r),
71 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
72 _ => Ordering::Equal,
73 }
74}
75
76impl Accumulator {
77 pub(crate) fn new(op: &str, distinct: bool) -> Self {
78 Self::new_with_percentile(op, distinct, 0.0)
79 }
80
81 pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
82 let op_upper = op.to_uppercase();
83 match op_upper.as_str() {
84 "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
85 "COUNT" => Accumulator::Count(0),
86 "SUM" => Accumulator::Sum(0.0),
87 "MIN" => Accumulator::Min(None),
88 "MAX" => Accumulator::Max(None),
89 "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
90 "COLLECT" => Accumulator::Collect(Vec::new()),
91 "PERCENTILEDISC" => Accumulator::PercentileDisc {
92 values: Vec::new(),
93 percentile,
94 },
95 "PERCENTILECONT" => Accumulator::PercentileCont {
96 values: Vec::new(),
97 percentile,
98 },
99 _ => Accumulator::Count(0),
100 }
101 }
102
103 pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
104 match self {
105 Accumulator::Count(c) => {
106 if is_wildcard || !val.is_null() {
107 *c += 1;
108 }
109 }
110 Accumulator::Sum(s) => {
111 if let Some(n) = val.as_f64() {
112 *s += n;
113 }
114 }
115 Accumulator::Min(current) => {
116 if !val.is_null() {
117 *current = Some(match current.take() {
118 None => val.clone(),
119 Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
120 Some(cur) => cur,
121 });
122 }
123 }
124 Accumulator::Max(current) => {
125 if !val.is_null() {
126 *current = Some(match current.take() {
127 None => val.clone(),
128 Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
129 Some(cur) => cur,
130 });
131 }
132 }
133 Accumulator::Avg { sum, count } => {
134 if let Some(n) = val.as_f64() {
135 *sum += n;
136 *count += 1;
137 }
138 }
139 Accumulator::Collect(v) => {
140 if !val.is_null() {
141 v.push(val.clone());
142 }
143 }
144 Accumulator::CountDistinct(s) => {
145 if !val.is_null() {
146 s.insert(val.to_string());
147 }
148 }
149 Accumulator::PercentileDisc { values, .. }
150 | Accumulator::PercentileCont { values, .. } => {
151 if let Some(n) = val.as_f64() {
152 values.push(n);
153 }
154 }
155 }
156 }
157
158 pub(crate) fn finish(&self) -> Value {
159 match self {
160 Accumulator::Count(c) => Value::Int(*c),
161 Accumulator::Sum(s) => numeric_to_value(*s),
162 Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
163 Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
164 Accumulator::Avg { sum, count } => {
165 if *count > 0 {
166 Value::Float(*sum / (*count as f64))
167 } else {
168 Value::Null
169 }
170 }
171 Accumulator::Collect(v) => Value::List(v.clone()),
172 Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
173 Accumulator::PercentileDisc { values, percentile } => {
174 if values.is_empty() {
175 return Value::Null;
176 }
177 let mut sorted = values.clone();
178 sorted.sort_by(|a, b| a.total_cmp(b));
179 let n = sorted.len();
180 let idx = (percentile * (n as f64 - 1.0)).round() as usize;
181 numeric_to_value(sorted[idx.min(n - 1)])
182 }
183 Accumulator::PercentileCont { values, percentile } => {
184 if values.is_empty() {
185 return Value::Null;
186 }
187 let mut sorted = values.clone();
188 sorted.sort_by(|a, b| a.total_cmp(b));
189 let n = sorted.len();
190 if n == 1 {
191 return Value::Float(sorted[0]);
192 }
193 let pos = percentile * (n as f64 - 1.0);
194 let lower = (pos.floor() as usize).min(n - 1);
195 let upper = (pos.ceil() as usize).min(n - 1);
196 if lower == upper {
197 Value::Float(sorted[lower])
198 } else {
199 let frac = pos - lower as f64;
200 Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
201 }
202 }
203 }
204 }
205}
206
/// Key type of `Executor::gen_expr_cache`: an ordered pair of strings.
///
/// NOTE(review): what each component identifies is not visible in this part
/// of the file — confirm against the code that populates the cache.
pub(crate) type GenExprCacheKey = (String, String);
209
210#[derive(Clone)]
211pub struct Executor {
212 pub(crate) storage: Arc<StorageManager>,
213 pub(crate) writer: Option<Arc<RwLock<Writer>>>,
214 pub(crate) l0_manager: Option<Arc<L0Manager>>,
215 pub(crate) algo_registry: Arc<AlgorithmRegistry>,
216 pub(crate) use_transaction: bool,
217 pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
219 pub(crate) config: uni_common::config::UniConfig,
220 pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
222 pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
224 pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
226}
227
228impl Executor {
229 pub fn new(storage: Arc<StorageManager>) -> Self {
230 Self {
231 storage,
232 writer: None,
233 l0_manager: None,
234 algo_registry: Arc::new(AlgorithmRegistry::new()),
235 use_transaction: false,
236 file_sandbox: uni_common::config::FileSandboxConfig::default(),
237 config: uni_common::config::UniConfig::default(),
238 gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
239 procedure_registry: None,
240 xervo_runtime: None,
241 }
242 }
243
244 pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<RwLock<Writer>>) -> Self {
245 let mut executor = Self::new(storage);
246 executor.writer = Some(writer);
247 executor
248 }
249
250 pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
252 self.procedure_registry = Some(registry);
253 }
254
255 pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
256 self.xervo_runtime = runtime;
257 }
258
259 pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
262 self.file_sandbox = sandbox;
263 }
264
265 pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
266 self.config = config;
267 }
268
269 pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
271 self.file_sandbox
272 .validate_path(path)
273 .map_err(|e| anyhow!("Path validation failed: {}", e))
274 }
275
276 pub fn set_writer(&mut self, writer: Arc<RwLock<Writer>>) {
277 self.writer = Some(writer);
278 }
279
280 pub fn set_use_transaction(&mut self, use_transaction: bool) {
281 self.use_transaction = use_transaction;
282 }
283
284 pub(crate) async fn get_context(&self) -> Option<QueryContext> {
285 if let Some(writer_lock) = &self.writer {
286 let writer = writer_lock.read().await;
287 let mut ctx = QueryContext::new_with_pending(
289 writer.l0_manager.get_current(),
290 writer.transaction_l0.clone(),
291 writer.l0_manager.get_pending_flush(),
292 );
293 ctx.set_deadline(Instant::now() + self.config.query_timeout);
294 Some(ctx)
295 } else {
296 self.l0_manager.as_ref().map(|m| {
297 let mut ctx = QueryContext::new(m.get_current());
298 ctx.set_deadline(Instant::now() + self.config.query_timeout);
299 ctx
300 })
301 }
302 }
303
304 pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
305 use std::cmp::Ordering;
306
307 let temporal_a = Self::extract_temporal_value(a);
308 let temporal_b = Self::extract_temporal_value(b);
309
310 if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
311 return Self::compare_temporal(ta, tb);
312 }
313
314 if matches!(
317 (a, b),
318 (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
319 ) && let Some(ord) = Self::try_eval_ordering(a, b)
320 {
321 return ord;
322 }
323 if let (Value::String(_), Some(tb)) = (a, temporal_b)
324 && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
325 {
326 return ord;
327 }
328 if let (Some(ta), Value::String(_)) = (temporal_a, b)
329 && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
330 {
331 return ord;
332 }
333
334 let ra = Self::order_by_type_rank(a);
335 let rb = Self::order_by_type_rank(b);
336 if ra != rb {
337 return ra.cmp(&rb);
338 }
339
340 match (a, b) {
341 (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
342 (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
343 (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
344 (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
345 (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
346 (Value::String(l), Value::String(r)) => {
347 Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
349 }
350 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
351 (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
352 (Value::Int(l), Value::Int(r)) => l.cmp(r),
353 (Value::Float(l), Value::Float(r)) => {
354 if l.is_nan() && r.is_nan() {
355 Ordering::Equal
356 } else if l.is_nan() {
357 Ordering::Greater
358 } else if r.is_nan() {
359 Ordering::Less
360 } else {
361 l.partial_cmp(r).unwrap_or(Ordering::Equal)
362 }
363 }
364 (Value::Int(l), Value::Float(r)) => {
365 if r.is_nan() {
366 Ordering::Less
367 } else {
368 (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
369 }
370 }
371 (Value::Float(l), Value::Int(r)) => {
372 if l.is_nan() {
373 Ordering::Greater
374 } else {
375 l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
376 }
377 }
378 (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
379 (Value::Vector(l), Value::Vector(r)) => {
380 for (lv, rv) in l.iter().zip(r.iter()) {
381 let ord = lv.total_cmp(rv);
382 if ord != Ordering::Equal {
383 return ord;
384 }
385 }
386 l.len().cmp(&r.len())
387 }
388 _ => Ordering::Equal,
389 }
390 }
391
392 fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
393 use std::cmp::Ordering;
394 if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
395 Some(Ordering::Less)
396 } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
397 Some(Ordering::Greater)
398 } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
399 Some(Ordering::Equal)
400 } else {
401 None
402 }
403 }
404
405 fn order_by_type_rank(v: &Value) -> u8 {
408 match v {
409 Value::Map(map) => Self::map_order_rank(map),
410 Value::Node(_) => 1,
411 Value::Edge(_) => 2,
412 Value::List(_) => 3,
413 Value::Path(_) => 4,
414 Value::String(_) => 5,
415 Value::Bool(_) => 6,
416 Value::Temporal(_) => 7,
417 Value::Int(_) => 8,
418 Value::Float(f) if f.is_nan() => 9,
419 Value::Float(_) => 8,
420 Value::Null => 10,
421 Value::Bytes(_) | Value::Vector(_) => 11,
422 _ => 11,
423 }
424 }
425
426 fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
427 if Self::map_as_temporal(map).is_some() {
428 7
429 } else if map.contains_key("nodes")
430 && (map.contains_key("relationships") || map.contains_key("edges"))
431 {
432 4
433 } else if map.contains_key("_eid")
434 || map.contains_key("_src")
435 || map.contains_key("_dst")
436 || map.contains_key("_type")
437 || map.contains_key("_type_name")
438 {
439 2
440 } else if map.contains_key("_vid")
441 || map.contains_key("_labels")
442 || map.contains_key("_label")
443 {
444 1
445 } else {
446 0
447 }
448 }
449
450 fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
451 match value {
452 Value::Temporal(t) => Some(t.clone()),
453 Value::Map(map) => Self::map_as_temporal(map),
454 Value::String(s) => Self::string_as_temporal(s),
455 _ => None,
456 }
457 }
458
459 fn string_as_temporal(s: &str) -> Option<TemporalValue> {
460 let fn_name = match classify_temporal(s)? {
461 TemporalType::Date => "DATE",
462 TemporalType::LocalTime => "LOCALTIME",
463 TemporalType::Time => "TIME",
464 TemporalType::LocalDateTime => "LOCALDATETIME",
465 TemporalType::DateTime => "DATETIME",
466 TemporalType::Duration => "DURATION",
467 };
468 match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
469 Value::Temporal(tv) => Some(tv),
470 _ => None,
471 }
472 }
473
474 fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
475 if map.len() != 1 {
476 return None;
477 }
478
479 let as_i32 = |v: &Value| v.as_i64().and_then(|n| i32::try_from(n).ok());
480 let as_i64 = |v: &Value| v.as_i64();
481
482 if let Some(Value::Map(inner)) = map.get("Date") {
483 let days = inner.get("days_since_epoch").and_then(as_i32)?;
484 return Some(TemporalValue::Date {
485 days_since_epoch: days,
486 });
487 }
488 if let Some(Value::Map(inner)) = map.get("LocalTime") {
489 let nanos = inner.get("nanos_since_midnight").and_then(as_i64)?;
490 return Some(TemporalValue::LocalTime {
491 nanos_since_midnight: nanos,
492 });
493 }
494 if let Some(Value::Map(inner)) = map.get("Time") {
495 let nanos = inner.get("nanos_since_midnight").and_then(as_i64)?;
496 let offset = inner.get("offset_seconds").and_then(as_i32)?;
497 return Some(TemporalValue::Time {
498 nanos_since_midnight: nanos,
499 offset_seconds: offset,
500 });
501 }
502 if let Some(Value::Map(inner)) = map.get("LocalDateTime") {
503 let nanos = inner.get("nanos_since_epoch").and_then(as_i64)?;
504 return Some(TemporalValue::LocalDateTime {
505 nanos_since_epoch: nanos,
506 });
507 }
508 if let Some(Value::Map(inner)) = map.get("DateTime") {
509 let nanos = inner.get("nanos_since_epoch").and_then(as_i64)?;
510 let offset = inner.get("offset_seconds").and_then(as_i32)?;
511 let timezone_name = match inner.get("timezone_name") {
512 Some(Value::String(s)) => Some(s.clone()),
513 _ => None,
514 };
515 return Some(TemporalValue::DateTime {
516 nanos_since_epoch: nanos,
517 offset_seconds: offset,
518 timezone_name,
519 });
520 }
521 if let Some(Value::Map(inner)) = map.get("Duration") {
522 let months = inner.get("months").and_then(as_i64)?;
523 let days = inner.get("days").and_then(as_i64)?;
524 let nanos = inner.get("nanos").and_then(as_i64)?;
525 return Some(TemporalValue::Duration {
526 months,
527 days,
528 nanos,
529 });
530 }
531 None
532 }
533
534 fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
535 left.iter()
536 .zip(right.iter())
537 .map(|(l, r)| Self::compare_values(l, r))
538 .find(|o| o.is_ne())
539 .unwrap_or_else(|| left.len().cmp(&right.len()))
540 }
541
542 fn compare_maps(
543 left: &HashMap<String, Value>,
544 right: &HashMap<String, Value>,
545 ) -> std::cmp::Ordering {
546 let mut l_pairs: Vec<_> = left.iter().collect();
547 let mut r_pairs: Vec<_> = right.iter().collect();
548 l_pairs.sort_by_key(|(k, _)| *k);
549 r_pairs.sort_by_key(|(k, _)| *k);
550
551 l_pairs
552 .iter()
553 .zip(r_pairs.iter())
554 .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
555 .find(|o| o.is_ne())
556 .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
557 }
558
559 fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
560 let mut l_labels = left.labels.clone();
561 let mut r_labels = right.labels.clone();
562 l_labels.sort();
563 r_labels.sort();
564
565 l_labels
566 .cmp(&r_labels)
567 .then_with(|| left.vid.cmp(&right.vid))
568 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
569 }
570
571 fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
572 left.edge_type
573 .cmp(&right.edge_type)
574 .then_with(|| left.src.cmp(&right.src))
575 .then_with(|| left.dst.cmp(&right.dst))
576 .then_with(|| left.eid.cmp(&right.eid))
577 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
578 }
579
580 fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
581 left.nodes
582 .iter()
583 .zip(right.nodes.iter())
584 .map(|(l, r)| Self::compare_nodes(l, r))
585 .find(|o| o.is_ne())
586 .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
587 .then_with(|| {
588 left.edges
589 .iter()
590 .zip(right.edges.iter())
591 .map(|(l, r)| Self::compare_edges(l, r))
592 .find(|o| o.is_ne())
593 .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
594 })
595 }
596
597 fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
598 match (left, right) {
599 (
600 TemporalValue::Date {
601 days_since_epoch: l,
602 },
603 TemporalValue::Date {
604 days_since_epoch: r,
605 },
606 ) => l.cmp(r),
607 (
608 TemporalValue::LocalTime {
609 nanos_since_midnight: l,
610 },
611 TemporalValue::LocalTime {
612 nanos_since_midnight: r,
613 },
614 ) => l.cmp(r),
615 (
616 TemporalValue::Time {
617 nanos_since_midnight: lm,
618 offset_seconds: lo,
619 },
620 TemporalValue::Time {
621 nanos_since_midnight: rm,
622 offset_seconds: ro,
623 },
624 ) => {
625 let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
626 let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
627 l_utc.cmp(&r_utc)
628 }
629 (
630 TemporalValue::LocalDateTime {
631 nanos_since_epoch: l,
632 },
633 TemporalValue::LocalDateTime {
634 nanos_since_epoch: r,
635 },
636 ) => l.cmp(r),
637 (
638 TemporalValue::DateTime {
639 nanos_since_epoch: l,
640 ..
641 },
642 TemporalValue::DateTime {
643 nanos_since_epoch: r,
644 ..
645 },
646 ) => l.cmp(r),
647 (
648 TemporalValue::Duration {
649 months: lm,
650 days: ld,
651 nanos: ln,
652 },
653 TemporalValue::Duration {
654 months: rm,
655 days: rd,
656 nanos: rn,
657 },
658 ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
659 _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
660 }
661 }
662
663 fn temporal_variant_rank(v: &TemporalValue) -> u8 {
664 match v {
665 TemporalValue::Date { .. } => 0,
666 TemporalValue::LocalTime { .. } => 1,
667 TemporalValue::Time { .. } => 2,
668 TemporalValue::LocalDateTime { .. } => 3,
669 TemporalValue::DateTime { .. } => 4,
670 TemporalValue::Duration { .. } => 5,
671 }
672 }
673}
674
675#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
676pub struct ProfileOutput {
677 pub explain: crate::query::planner::ExplainOutput,
678 pub runtime_stats: Vec<OperatorStats>,
679 pub total_time_ms: u64,
680 pub peak_memory_bytes: usize,
681}
682
683#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
684pub struct OperatorStats {
685 pub operator: String,
686 pub actual_rows: usize,
687 pub time_ms: f64,
688 pub memory_bytes: usize,
689 pub index_hits: Option<usize>,
690 pub index_misses: Option<usize>,
691}
692
693impl Executor {
694 pub async fn profile(
699 &self,
700 plan: crate::query::planner::LogicalPlan,
701 params: &HashMap<String, Value>,
702 ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
703 let planner =
705 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
706 let explain_output = planner.explain_logical_plan(&plan)?;
707
708 let start = Instant::now();
709
710 let prop_manager = self.create_prop_manager();
712 let results = self.execute(plan.clone(), &prop_manager, params).await?;
713
714 let total_time = start.elapsed();
715
716 let stats = vec![OperatorStats {
718 operator: "DataFusion Execution".to_string(),
719 actual_rows: results.len(),
720 time_ms: total_time.as_secs_f64() * 1000.0,
721 memory_bytes: 0,
722 index_hits: None,
723 index_misses: None,
724 }];
725
726 Ok((
727 results,
728 ProfileOutput {
729 explain: explain_output,
730 runtime_stats: stats,
731 total_time_ms: total_time.as_millis() as u64,
732 peak_memory_bytes: 0,
733 },
734 ))
735 }
736
737 fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
738 uni_store::runtime::property_manager::PropertyManager::new(
739 self.storage.clone(),
740 self.storage.schema_manager_arc(),
741 1000,
742 )
743 }
744}