Skip to main content

uni_query/query/executor/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::QueryContext;
13use uni_store::runtime::l0_manager::L0Manager;
14use uni_store::runtime::writer::Writer;
15use uni_store::storage::manager::StorageManager;
16use uni_xervo::runtime::ModelRuntime;
17
18use crate::query::expr_eval::eval_binary_op;
19use crate::types::QueryWarning;
20
21use super::procedure::ProcedureRegistry;
22
23/// Mutable accumulator for Cypher aggregate functions (COUNT, SUM, AVG, ...).
24#[derive(Debug)]
25pub(crate) enum Accumulator {
26    Count(i64),
27    Sum(f64),
28    Min(Option<Value>),
29    Max(Option<Value>),
30    Avg { sum: f64, count: i64 },
31    Collect(Vec<Value>),
32    CountDistinct(HashSet<String>),
33    PercentileDisc { values: Vec<f64>, percentile: f64 },
34    PercentileCont { values: Vec<f64>, percentile: f64 },
35}
36
37/// Convert f64 to Value, preserving integer representation when possible.
38fn numeric_to_value(val: f64) -> Value {
39    if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
40        Value::Int(val as i64)
41    } else {
42        Value::Float(val)
43    }
44}
45
46/// Cross-type ordering rank for Cypher min/max (lower rank = smaller).
47fn cypher_type_rank(val: &Value) -> u8 {
48    match val {
49        Value::Null => 0,
50        Value::List(_) => 1,
51        Value::String(_) => 2,
52        Value::Bool(_) => 3,
53        Value::Int(_) | Value::Float(_) => 4,
54        _ => 5,
55    }
56}
57
58/// Compare two Cypher values for min/max with cross-type ordering.
59fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
60    use std::cmp::Ordering;
61    let ra = cypher_type_rank(a);
62    let rb = cypher_type_rank(b);
63    if ra != rb {
64        return ra.cmp(&rb);
65    }
66    match (a, b) {
67        (Value::Int(l), Value::Int(r)) => l.cmp(r),
68        (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
69        (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
70        (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
71        (Value::String(l), Value::String(r)) => l.cmp(r),
72        (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
73        _ => Ordering::Equal,
74    }
75}
76
77impl Accumulator {
78    pub(crate) fn new(op: &str, distinct: bool) -> Self {
79        Self::new_with_percentile(op, distinct, 0.0)
80    }
81
82    pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
83        let op_upper = op.to_uppercase();
84        match op_upper.as_str() {
85            "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
86            "COUNT" => Accumulator::Count(0),
87            "SUM" => Accumulator::Sum(0.0),
88            "MIN" => Accumulator::Min(None),
89            "MAX" => Accumulator::Max(None),
90            "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
91            "COLLECT" => Accumulator::Collect(Vec::new()),
92            "PERCENTILEDISC" => Accumulator::PercentileDisc {
93                values: Vec::new(),
94                percentile,
95            },
96            "PERCENTILECONT" => Accumulator::PercentileCont {
97                values: Vec::new(),
98                percentile,
99            },
100            _ => Accumulator::Count(0),
101        }
102    }
103
104    pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
105        match self {
106            Accumulator::Count(c) => {
107                if is_wildcard || !val.is_null() {
108                    *c += 1;
109                }
110            }
111            Accumulator::Sum(s) => {
112                if let Some(n) = val.as_f64() {
113                    *s += n;
114                }
115            }
116            Accumulator::Min(current) => {
117                if !val.is_null() {
118                    *current = Some(match current.take() {
119                        None => val.clone(),
120                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
121                        Some(cur) => cur,
122                    });
123                }
124            }
125            Accumulator::Max(current) => {
126                if !val.is_null() {
127                    *current = Some(match current.take() {
128                        None => val.clone(),
129                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
130                        Some(cur) => cur,
131                    });
132                }
133            }
134            Accumulator::Avg { sum, count } => {
135                if let Some(n) = val.as_f64() {
136                    *sum += n;
137                    *count += 1;
138                }
139            }
140            Accumulator::Collect(v) => {
141                if !val.is_null() {
142                    v.push(val.clone());
143                }
144            }
145            Accumulator::CountDistinct(s) => {
146                if !val.is_null() {
147                    s.insert(val.to_string());
148                }
149            }
150            Accumulator::PercentileDisc { values, .. }
151            | Accumulator::PercentileCont { values, .. } => {
152                if let Some(n) = val.as_f64() {
153                    values.push(n);
154                }
155            }
156        }
157    }
158
159    pub(crate) fn finish(&self) -> Value {
160        match self {
161            Accumulator::Count(c) => Value::Int(*c),
162            Accumulator::Sum(s) => numeric_to_value(*s),
163            Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
164            Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
165            Accumulator::Avg { sum, count } => {
166                if *count > 0 {
167                    Value::Float(*sum / (*count as f64))
168                } else {
169                    Value::Null
170                }
171            }
172            Accumulator::Collect(v) => Value::List(v.clone()),
173            Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
174            Accumulator::PercentileDisc { values, percentile } => {
175                if values.is_empty() {
176                    return Value::Null;
177                }
178                let mut sorted = values.clone();
179                sorted.sort_by(|a, b| a.total_cmp(b));
180                let n = sorted.len();
181                let idx = (percentile * (n as f64 - 1.0)).round() as usize;
182                numeric_to_value(sorted[idx.min(n - 1)])
183            }
184            Accumulator::PercentileCont { values, percentile } => {
185                if values.is_empty() {
186                    return Value::Null;
187                }
188                let mut sorted = values.clone();
189                sorted.sort_by(|a, b| a.total_cmp(b));
190                let n = sorted.len();
191                if n == 1 {
192                    return Value::Float(sorted[0]);
193                }
194                let pos = percentile * (n as f64 - 1.0);
195                let lower = (pos.floor() as usize).min(n - 1);
196                let upper = (pos.ceil() as usize).min(n - 1);
197                if lower == upper {
198                    Value::Float(sorted[lower])
199                } else {
200                    let frac = pos - lower as f64;
201                    Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
202                }
203            }
204        }
205    }
206}
207
/// Key of the parsed generation-expression cache, as `(label_name, property_name)`.
pub(crate) type GenExprCacheKey = (String, String);
210
211/// Query executor: runs logical plans against a Uni storage backend.
212///
213/// `Executor` bridges the logical query plan produced by [`crate::query::planner::QueryPlanner`]
214/// with the underlying `StorageManager`. It handles both read-only sessions and
215/// write-enabled sessions (via `Writer` and `L0Manager`).
216///
217/// # Cloning
218///
219/// `Executor` is cheaply cloneable — all expensive state is held behind `Arc`s.
220/// Clone it freely to share across tasks.
221// M-PUBLIC-DEBUG: Manual impl because Writer/ModelRuntime do not implement Debug.
222#[derive(Clone)]
223pub struct Executor {
224    pub(crate) storage: Arc<StorageManager>,
225    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
226    pub(crate) l0_manager: Option<Arc<L0Manager>>,
227    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
228    pub(crate) use_transaction: bool,
229    /// File sandbox configuration for BACKUP/COPY/EXPORT commands
230    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
231    pub(crate) config: uni_common::config::UniConfig,
232    /// Cache for parsed generation expressions to avoid re-parsing on every row
233    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
234    /// External procedure registry for test/user-defined procedures.
235    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
236    /// Uni-Xervo runtime used by vector auto-embedding paths.
237    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
238    /// Warnings collected during the last execution.
239    pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
240}
241
242impl std::fmt::Debug for Executor {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        f.debug_struct("Executor")
245            .field("use_transaction", &self.use_transaction)
246            .field("has_writer", &self.writer.is_some())
247            .field("has_l0_manager", &self.l0_manager.is_some())
248            .field("has_xervo_runtime", &self.xervo_runtime.is_some())
249            .finish_non_exhaustive()
250    }
251}
252
253impl Executor {
254    /// Create a read-only executor backed by the given storage manager.
255    pub fn new(storage: Arc<StorageManager>) -> Self {
256        Self {
257            storage,
258            writer: None,
259            l0_manager: None,
260            algo_registry: Arc::new(AlgorithmRegistry::new()),
261            use_transaction: false,
262            file_sandbox: uni_common::config::FileSandboxConfig::default(),
263            config: uni_common::config::UniConfig::default(),
264            gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
265            procedure_registry: None,
266            xervo_runtime: None,
267            warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
268        }
269    }
270
271    /// Create a write-enabled executor with an attached `Writer`.
272    pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<RwLock<Writer>>) -> Self {
273        let mut executor = Self::new(storage);
274        executor.writer = Some(writer);
275        executor
276    }
277
278    /// Attach an external procedure registry for user-defined procedures.
279    pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
280        self.procedure_registry = Some(registry);
281    }
282
283    /// Attach or detach the Uni-Xervo model runtime for vector auto-embedding.
284    pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
285        self.xervo_runtime = runtime;
286    }
287
288    /// Set the file sandbox configuration for BACKUP/COPY/EXPORT commands.
289    /// MUST be called with sandboxed config in server mode.
290    pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
291        self.file_sandbox = sandbox;
292    }
293
294    /// Apply a runtime configuration to this executor.
295    pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
296        self.config = config;
297    }
298
299    /// Validate a file path against the sandbox configuration.
300    pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
301        self.file_sandbox
302            .validate_path(path)
303            .map_err(|e| anyhow!("Path validation failed: {}", e))
304    }
305
306    /// Attach a `Writer` after construction, enabling write operations.
307    pub fn set_writer(&mut self, writer: Arc<RwLock<Writer>>) {
308        self.writer = Some(writer);
309    }
310
311    /// Take all collected warnings from the last execution, leaving the collector empty.
312    pub fn take_warnings(&self) -> Vec<QueryWarning> {
313        self.warnings
314            .lock()
315            .map(|mut w| std::mem::take(&mut *w))
316            .unwrap_or_default()
317    }
318
319    /// Configure whether query execution should operate within a transaction context.
320    pub fn set_use_transaction(&mut self, use_transaction: bool) {
321        self.use_transaction = use_transaction;
322    }
323
324    /// Build a `QueryContext` from the current writer or standalone L0 manager.
325    pub(crate) async fn get_context(&self) -> Option<QueryContext> {
326        if let Some(writer_lock) = &self.writer {
327            let writer = writer_lock.read().await;
328            // Include pending_flush L0s so data being flushed remains visible
329            let mut ctx = QueryContext::new_with_pending(
330                writer.l0_manager.get_current(),
331                writer.transaction_l0.clone(),
332                writer.l0_manager.get_pending_flush(),
333            );
334            ctx.set_deadline(Instant::now() + self.config.query_timeout);
335            Some(ctx)
336        } else {
337            self.l0_manager.as_ref().map(|m| {
338                let mut ctx = QueryContext::new(m.get_current());
339                ctx.set_deadline(Instant::now() + self.config.query_timeout);
340                ctx
341            })
342        }
343    }
344
345    /// Total ordering for Cypher ORDER BY, including cross-type comparisons.
346    pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
347        use std::cmp::Ordering;
348
349        let temporal_a = Self::extract_temporal_value(a);
350        let temporal_b = Self::extract_temporal_value(b);
351
352        if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
353            return Self::compare_temporal(ta, tb);
354        }
355
356        // Temporal strings (e.g. "1984-10-11T...") and Value::Temporal should
357        // compare using Cypher temporal semantics when compatible.
358        if matches!(
359            (a, b),
360            (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
361        ) && let Some(ord) = Self::try_eval_ordering(a, b)
362        {
363            return ord;
364        }
365        if let (Value::String(_), Some(tb)) = (a, temporal_b)
366            && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
367        {
368            return ord;
369        }
370        if let (Some(ta), Value::String(_)) = (temporal_a, b)
371            && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
372        {
373            return ord;
374        }
375
376        let ra = Self::order_by_type_rank(a);
377        let rb = Self::order_by_type_rank(b);
378        if ra != rb {
379            return ra.cmp(&rb);
380        }
381
382        match (a, b) {
383            (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
384            (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
385            (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
386            (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
387            (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
388            (Value::String(l), Value::String(r)) => {
389                // Use eval_binary_op on the original references to avoid cloning.
390                Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
391            }
392            (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
393            (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
394            (Value::Int(l), Value::Int(r)) => l.cmp(r),
395            (Value::Float(l), Value::Float(r)) => {
396                if l.is_nan() && r.is_nan() {
397                    Ordering::Equal
398                } else if l.is_nan() {
399                    Ordering::Greater
400                } else if r.is_nan() {
401                    Ordering::Less
402                } else {
403                    l.partial_cmp(r).unwrap_or(Ordering::Equal)
404                }
405            }
406            (Value::Int(l), Value::Float(r)) => {
407                if r.is_nan() {
408                    Ordering::Less
409                } else {
410                    (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
411                }
412            }
413            (Value::Float(l), Value::Int(r)) => {
414                if l.is_nan() {
415                    Ordering::Greater
416                } else {
417                    l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
418                }
419            }
420            (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
421            (Value::Vector(l), Value::Vector(r)) => {
422                for (lv, rv) in l.iter().zip(r.iter()) {
423                    let ord = lv.total_cmp(rv);
424                    if ord != Ordering::Equal {
425                        return ord;
426                    }
427                }
428                l.len().cmp(&r.len())
429            }
430            _ => Ordering::Equal,
431        }
432    }
433
434    fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
435        use std::cmp::Ordering;
436        if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
437            Some(Ordering::Less)
438        } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
439            Some(Ordering::Greater)
440        } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
441            Some(Ordering::Equal)
442        } else {
443            None
444        }
445    }
446
447    /// Cypher ORDER BY total precedence:
448    /// MAP < NODE < RELATIONSHIP < LIST < PATH < STRING < BOOLEAN < TEMPORAL < NUMBER < NaN < NULL
449    fn order_by_type_rank(v: &Value) -> u8 {
450        match v {
451            Value::Map(map) => Self::map_order_rank(map),
452            Value::Node(_) => 1,
453            Value::Edge(_) => 2,
454            Value::List(_) => 3,
455            Value::Path(_) => 4,
456            Value::String(_) => 5,
457            Value::Bool(_) => 6,
458            Value::Temporal(_) => 7,
459            Value::Int(_) => 8,
460            Value::Float(f) if f.is_nan() => 9,
461            Value::Float(_) => 8,
462            Value::Null => 10,
463            Value::Bytes(_) | Value::Vector(_) => 11,
464            _ => 11,
465        }
466    }
467
468    fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
469        if Self::map_as_temporal(map).is_some() {
470            7
471        } else if map.contains_key("nodes")
472            && (map.contains_key("relationships") || map.contains_key("edges"))
473        {
474            4
475        } else if map.contains_key("_eid")
476            || map.contains_key("_src")
477            || map.contains_key("_dst")
478            || map.contains_key("_type")
479            || map.contains_key("_type_name")
480        {
481            2
482        } else if map.contains_key("_vid")
483            || map.contains_key("_labels")
484            || map.contains_key("_label")
485        {
486            1
487        } else {
488            0
489        }
490    }
491
492    fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
493        crate::query::expr_eval::temporal_from_value(value)
494    }
495
496    fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
497        crate::query::expr_eval::temporal_from_map_wrapper(map)
498    }
499
500    fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
501        left.iter()
502            .zip(right.iter())
503            .map(|(l, r)| Self::compare_values(l, r))
504            .find(|o| o.is_ne())
505            .unwrap_or_else(|| left.len().cmp(&right.len()))
506    }
507
508    fn compare_maps(
509        left: &HashMap<String, Value>,
510        right: &HashMap<String, Value>,
511    ) -> std::cmp::Ordering {
512        let mut l_pairs: Vec<_> = left.iter().collect();
513        let mut r_pairs: Vec<_> = right.iter().collect();
514        l_pairs.sort_by_key(|(k, _)| *k);
515        r_pairs.sort_by_key(|(k, _)| *k);
516
517        l_pairs
518            .iter()
519            .zip(r_pairs.iter())
520            .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
521            .find(|o| o.is_ne())
522            .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
523    }
524
525    fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
526        let mut l_labels = left.labels.clone();
527        let mut r_labels = right.labels.clone();
528        l_labels.sort();
529        r_labels.sort();
530
531        l_labels
532            .cmp(&r_labels)
533            .then_with(|| left.vid.cmp(&right.vid))
534            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
535    }
536
537    fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
538        left.edge_type
539            .cmp(&right.edge_type)
540            .then_with(|| left.src.cmp(&right.src))
541            .then_with(|| left.dst.cmp(&right.dst))
542            .then_with(|| left.eid.cmp(&right.eid))
543            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
544    }
545
546    fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
547        left.nodes
548            .iter()
549            .zip(right.nodes.iter())
550            .map(|(l, r)| Self::compare_nodes(l, r))
551            .find(|o| o.is_ne())
552            .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
553            .then_with(|| {
554                left.edges
555                    .iter()
556                    .zip(right.edges.iter())
557                    .map(|(l, r)| Self::compare_edges(l, r))
558                    .find(|o| o.is_ne())
559                    .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
560            })
561    }
562
563    fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
564        match (left, right) {
565            (
566                TemporalValue::Date {
567                    days_since_epoch: l,
568                },
569                TemporalValue::Date {
570                    days_since_epoch: r,
571                },
572            ) => l.cmp(r),
573            (
574                TemporalValue::LocalTime {
575                    nanos_since_midnight: l,
576                },
577                TemporalValue::LocalTime {
578                    nanos_since_midnight: r,
579                },
580            ) => l.cmp(r),
581            (
582                TemporalValue::Time {
583                    nanos_since_midnight: lm,
584                    offset_seconds: lo,
585                },
586                TemporalValue::Time {
587                    nanos_since_midnight: rm,
588                    offset_seconds: ro,
589                },
590            ) => {
591                let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
592                let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
593                l_utc.cmp(&r_utc)
594            }
595            (
596                TemporalValue::LocalDateTime {
597                    nanos_since_epoch: l,
598                },
599                TemporalValue::LocalDateTime {
600                    nanos_since_epoch: r,
601                },
602            ) => l.cmp(r),
603            (
604                TemporalValue::DateTime {
605                    nanos_since_epoch: l,
606                    ..
607                },
608                TemporalValue::DateTime {
609                    nanos_since_epoch: r,
610                    ..
611                },
612            ) => l.cmp(r),
613            (
614                TemporalValue::Duration {
615                    months: lm,
616                    days: ld,
617                    nanos: ln,
618                },
619                TemporalValue::Duration {
620                    months: rm,
621                    days: rd,
622                    nanos: rn,
623                },
624            ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
625            _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
626        }
627    }
628
629    fn temporal_variant_rank(v: &TemporalValue) -> u8 {
630        match v {
631            TemporalValue::Date { .. } => 0,
632            TemporalValue::LocalTime { .. } => 1,
633            TemporalValue::Time { .. } => 2,
634            TemporalValue::LocalDateTime { .. } => 3,
635            TemporalValue::DateTime { .. } => 4,
636            TemporalValue::Duration { .. } => 5,
637        }
638    }
639}
640
641/// Combined output of a `PROFILE` query execution.
642///
643/// Contains both the logical plan explanation and per-operator runtime
644/// statistics collected during execution.
645#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
646pub struct ProfileOutput {
647    /// Logical plan explanation with index usage and cost estimates.
648    pub explain: crate::query::planner::ExplainOutput,
649    /// Per-operator timing and memory statistics.
650    pub runtime_stats: Vec<OperatorStats>,
651    /// Wall-clock time for the entire execution in milliseconds.
652    pub total_time_ms: u64,
653    /// Peak memory used during execution in bytes.
654    pub peak_memory_bytes: usize,
655}
656
657/// Runtime statistics for a single logical plan operator.
658#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
659pub struct OperatorStats {
660    /// Human-readable operator name (e.g., `"GraphScan"`, `"Filter"`).
661    pub operator: String,
662    /// Number of rows produced by this operator.
663    pub actual_rows: usize,
664    /// Wall-clock time spent in this operator in milliseconds.
665    pub time_ms: f64,
666    /// Memory allocated by this operator in bytes.
667    pub memory_bytes: usize,
668    /// Number of index cache hits (if applicable).
669    pub index_hits: Option<usize>,
670    /// Number of index cache misses (if applicable).
671    pub index_misses: Option<usize>,
672}
673
674impl Executor {
675    /// Profiles query execution and returns results with timing statistics.
676    ///
677    /// Uses the DataFusion-based executor for query execution. Granular operator
678    /// profiling will be added in a future release.
679    pub async fn profile(
680        &self,
681        plan: crate::query::planner::LogicalPlan,
682        params: &HashMap<String, Value>,
683    ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
684        // Generate ExplainOutput first
685        let planner =
686            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
687        let explain_output = planner.explain_logical_plan(&plan)?;
688
689        let start = Instant::now();
690
691        // Execute using the standard execute path (DataFusion-based)
692        let prop_manager = self.create_prop_manager();
693        let results = self.execute(plan.clone(), &prop_manager, params).await?;
694
695        let total_time = start.elapsed();
696
697        // Return aggregate stats (granular operator profiling to be added later)
698        let stats = vec![OperatorStats {
699            operator: "DataFusion Execution".to_string(),
700            actual_rows: results.len(),
701            time_ms: total_time.as_secs_f64() * 1000.0,
702            memory_bytes: 0,
703            index_hits: None,
704            index_misses: None,
705        }];
706
707        Ok((
708            results,
709            ProfileOutput {
710                explain: explain_output,
711                runtime_stats: stats,
712                total_time_ms: total_time.as_millis() as u64,
713                peak_memory_bytes: 0,
714            },
715        ))
716    }
717
718    fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
719        uni_store::runtime::property_manager::PropertyManager::new(
720            self.storage.clone(),
721            self.storage.schema_manager_arc(),
722            1000,
723        )
724    }
725}