Skip to main content

uni_query/query/executor/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::QueryContext;
13use uni_store::runtime::l0_manager::L0Manager;
14use uni_store::runtime::writer::Writer;
15use uni_store::storage::manager::StorageManager;
16use uni_xervo::runtime::ModelRuntime;
17
18use crate::query::expr_eval::eval_binary_op;
19use crate::types::QueryWarning;
20
21use super::procedure::ProcedureRegistry;
22
/// Mutable accumulator for Cypher aggregate functions (COUNT, SUM, AVG, ...).
///
/// Each variant holds the running state of one aggregate: `update` folds an
/// input value into it and `finish` turns it into the result value.
#[derive(Debug)]
pub(crate) enum Accumulator {
    /// Number of inputs counted so far.
    Count(i64),
    /// Running total, accumulated as `f64`.
    Sum(f64),
    /// Smallest non-null input seen so far; `None` until one arrives.
    Min(Option<Value>),
    /// Largest non-null input seen so far; `None` until one arrives.
    Max(Option<Value>),
    /// Running total and number of numeric inputs, from which the mean is derived.
    Avg { sum: f64, count: i64 },
    /// Every non-null input, in arrival order.
    Collect(Vec<Value>),
    /// String renderings (`to_string`) of the non-null inputs; the set size is the result.
    CountDistinct(HashSet<String>),
    /// Numeric inputs plus the requested percentile (`PERCENTILEDISC`).
    PercentileDisc { values: Vec<f64>, percentile: f64 },
    /// Numeric inputs plus the requested percentile (`PERCENTILECONT`).
    PercentileCont { values: Vec<f64>, percentile: f64 },
}
36
37/// Convert f64 to Value, preserving integer representation when possible.
38fn numeric_to_value(val: f64) -> Value {
39    if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
40        Value::Int(val as i64)
41    } else {
42        Value::Float(val)
43    }
44}
45
46/// Cross-type ordering rank for Cypher min/max (lower rank = smaller).
47fn cypher_type_rank(val: &Value) -> u8 {
48    match val {
49        Value::Null => 0,
50        Value::List(_) => 1,
51        Value::String(_) => 2,
52        Value::Bool(_) => 3,
53        Value::Int(_) | Value::Float(_) => 4,
54        _ => 5,
55    }
56}
57
58/// Compare two Cypher values for min/max with cross-type ordering.
59fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
60    use std::cmp::Ordering;
61    let ra = cypher_type_rank(a);
62    let rb = cypher_type_rank(b);
63    if ra != rb {
64        return ra.cmp(&rb);
65    }
66    match (a, b) {
67        (Value::Int(l), Value::Int(r)) => l.cmp(r),
68        (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
69        (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
70        (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
71        (Value::String(l), Value::String(r)) => l.cmp(r),
72        (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
73        _ => Ordering::Equal,
74    }
75}
76
77impl Accumulator {
    /// Create an accumulator for the aggregate named `op`.
    ///
    /// Shorthand for [`Self::new_with_percentile`] with a percentile of `0.0`.
    pub(crate) fn new(op: &str, distinct: bool) -> Self {
        Self::new_with_percentile(op, distinct, 0.0)
    }
81
82    pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
83        let op_upper = op.to_uppercase();
84        match op_upper.as_str() {
85            "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
86            "COUNT" => Accumulator::Count(0),
87            "SUM" => Accumulator::Sum(0.0),
88            "MIN" => Accumulator::Min(None),
89            "MAX" => Accumulator::Max(None),
90            "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
91            "COLLECT" => Accumulator::Collect(Vec::new()),
92            "PERCENTILEDISC" => Accumulator::PercentileDisc {
93                values: Vec::new(),
94                percentile,
95            },
96            "PERCENTILECONT" => Accumulator::PercentileCont {
97                values: Vec::new(),
98                percentile,
99            },
100            _ => Accumulator::Count(0),
101        }
102    }
103
104    pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
105        match self {
106            Accumulator::Count(c) => {
107                if is_wildcard || !val.is_null() {
108                    *c += 1;
109                }
110            }
111            Accumulator::Sum(s) => {
112                if let Some(n) = val.as_f64() {
113                    *s += n;
114                }
115            }
116            Accumulator::Min(current) => {
117                if !val.is_null() {
118                    *current = Some(match current.take() {
119                        None => val.clone(),
120                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
121                        Some(cur) => cur,
122                    });
123                }
124            }
125            Accumulator::Max(current) => {
126                if !val.is_null() {
127                    *current = Some(match current.take() {
128                        None => val.clone(),
129                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
130                        Some(cur) => cur,
131                    });
132                }
133            }
134            Accumulator::Avg { sum, count } => {
135                if let Some(n) = val.as_f64() {
136                    *sum += n;
137                    *count += 1;
138                }
139            }
140            Accumulator::Collect(v) => {
141                if !val.is_null() {
142                    v.push(val.clone());
143                }
144            }
145            Accumulator::CountDistinct(s) => {
146                if !val.is_null() {
147                    s.insert(val.to_string());
148                }
149            }
150            Accumulator::PercentileDisc { values, .. }
151            | Accumulator::PercentileCont { values, .. } => {
152                if let Some(n) = val.as_f64() {
153                    values.push(n);
154                }
155            }
156        }
157    }
158
159    pub(crate) fn finish(&self) -> Value {
160        match self {
161            Accumulator::Count(c) => Value::Int(*c),
162            Accumulator::Sum(s) => numeric_to_value(*s),
163            Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
164            Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
165            Accumulator::Avg { sum, count } => {
166                if *count > 0 {
167                    Value::Float(*sum / (*count as f64))
168                } else {
169                    Value::Null
170                }
171            }
172            Accumulator::Collect(v) => Value::List(v.clone()),
173            Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
174            Accumulator::PercentileDisc { values, percentile } => {
175                if values.is_empty() {
176                    return Value::Null;
177                }
178                let mut sorted = values.clone();
179                sorted.sort_by(|a, b| a.total_cmp(b));
180                let n = sorted.len();
181                let idx = (percentile * (n as f64 - 1.0)).round() as usize;
182                numeric_to_value(sorted[idx.min(n - 1)])
183            }
184            Accumulator::PercentileCont { values, percentile } => {
185                if values.is_empty() {
186                    return Value::Null;
187                }
188                let mut sorted = values.clone();
189                sorted.sort_by(|a, b| a.total_cmp(b));
190                let n = sorted.len();
191                if n == 1 {
192                    return Value::Float(sorted[0]);
193                }
194                let pos = percentile * (n as f64 - 1.0);
195                let lower = (pos.floor() as usize).min(n - 1);
196                let upper = (pos.ceil() as usize).min(n - 1);
197                if lower == upper {
198                    Value::Float(sorted[lower])
199                } else {
200                    let frac = pos - lower as f64;
201                    Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
202                }
203            }
204        }
205    }
206}
207
/// Cache key for parsed generation expressions: `(label_name, property_name)`.
///
/// This is the key type of the `gen_expr_cache` map held by `Executor`.
pub(crate) type GenExprCacheKey = (String, String);
210
/// Query executor: runs logical plans against a Uni storage backend.
///
/// `Executor` bridges the logical query plan produced by [`crate::query::planner::QueryPlanner`]
/// with the underlying `StorageManager`. It handles both read-only sessions and
/// write-enabled sessions (via `Writer` and `L0Manager`).
///
/// # Cloning
///
/// `Executor` is cheaply cloneable — all expensive state is held behind `Arc`s.
/// Clone it freely to share across tasks.
// M-PUBLIC-DEBUG: Manual impl because Writer/ModelRuntime do not implement Debug.
#[derive(Clone)]
pub struct Executor {
    /// Storage backend handle; the only value a caller must supply to `Executor::new`.
    pub(crate) storage: Arc<StorageManager>,
    /// Attached writer, if any. `None` on executors built with `Executor::new`;
    /// when present, `get_context` builds the query context from its L0 manager.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    /// Standalone L0 manager that `get_context` uses when no writer is attached.
    pub(crate) l0_manager: Option<Arc<L0Manager>>,
    /// Algorithm registry; `Executor::new` installs a freshly constructed one.
    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
    /// Transaction-mode flag, changed through `set_use_transaction`.
    pub(crate) use_transaction: bool,
    /// File sandbox configuration for BACKUP/COPY/EXPORT commands
    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
    /// Runtime configuration; its `query_timeout` determines the deadline that
    /// `get_context` sets on each `QueryContext`.
    pub(crate) config: uni_common::config::UniConfig,
    /// Cache for parsed generation expressions to avoid re-parsing on every row
    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
    /// External procedure registry for test/user-defined procedures.
    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Uni-Xervo runtime used by vector auto-embedding paths.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Warnings collected during the last execution.
    /// Shared between clones (the collector sits behind an `Arc`) and drained
    /// by `take_warnings`.
    pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
    /// Private transaction L0 buffer for query context and mutations.
    /// Used by Transaction to route reads and writes through a private L0 buffer
    /// without requiring the writer lock at transaction-creation time.
    pub(crate) transaction_l0_override:
        Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// User-defined custom scalar function registry.
    pub(crate) custom_function_registry:
        Option<Arc<super::custom_functions::CustomFunctionRegistry>>,
    /// Cooperative cancellation token. Passed to `QueryContext` and
    /// `GraphExecutionContext` so in-flight operators can detect cancellation.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
}
252
253impl std::fmt::Debug for Executor {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        f.debug_struct("Executor")
256            .field("use_transaction", &self.use_transaction)
257            .field("has_writer", &self.writer.is_some())
258            .field("has_l0_manager", &self.l0_manager.is_some())
259            .field("has_xervo_runtime", &self.xervo_runtime.is_some())
260            .finish_non_exhaustive()
261    }
262}
263
264impl Executor {
265    /// Create a read-only executor backed by the given storage manager.
266    pub fn new(storage: Arc<StorageManager>) -> Self {
267        Self {
268            storage,
269            writer: None,
270            l0_manager: None,
271            algo_registry: Arc::new(AlgorithmRegistry::new()),
272            use_transaction: false,
273            file_sandbox: uni_common::config::FileSandboxConfig::default(),
274            config: uni_common::config::UniConfig::default(),
275            gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
276            procedure_registry: None,
277            xervo_runtime: None,
278            warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
279            transaction_l0_override: None,
280            custom_function_registry: None,
281            cancellation_token: None,
282        }
283    }
284
285    /// Create a write-enabled executor with an attached `Writer`.
286    pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<RwLock<Writer>>) -> Self {
287        let mut executor = Self::new(storage);
288        executor.writer = Some(writer);
289        executor
290    }
291
    /// Attach an external procedure registry for user-defined procedures.
    ///
    /// Overwrites any registry attached earlier.
    pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
        self.procedure_registry = Some(registry);
    }
296
    /// Attach or detach the Uni-Xervo model runtime for vector auto-embedding.
    ///
    /// Pass `Some(runtime)` to attach and `None` to detach.
    pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
        self.xervo_runtime = runtime;
    }
301
    /// Set the file sandbox configuration for BACKUP/COPY/EXPORT commands.
    /// MUST be called with sandboxed config in server mode.
    ///
    /// Until this is called the executor uses `FileSandboxConfig::default()`;
    /// `validate_path` checks paths against whatever configuration is current.
    pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
        self.file_sandbox = sandbox;
    }
307
    /// Apply a runtime configuration to this executor.
    ///
    /// Replaces the whole configuration; the new `query_timeout` applies to
    /// query contexts built after this call.
    pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
        self.config = config;
    }
312
313    /// Validate a file path against the sandbox configuration.
314    pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
315        self.file_sandbox
316            .validate_path(path)
317            .map_err(|e| anyhow!("Path validation failed: {}", e))
318    }
319
    /// Attach a `Writer` after construction, enabling write operations.
    ///
    /// Overwrites any writer attached earlier.
    pub fn set_writer(&mut self, writer: Arc<RwLock<Writer>>) {
        self.writer = Some(writer);
    }
324
325    /// Take all collected warnings from the last execution, leaving the collector empty.
326    pub fn take_warnings(&self) -> Vec<QueryWarning> {
327        self.warnings
328            .lock()
329            .map(|mut w| std::mem::take(&mut *w))
330            .unwrap_or_default()
331    }
332
    /// Configure whether query execution should operate within a transaction context.
    ///
    /// Only stores the flag; executors start with it set to `false`.
    pub fn set_use_transaction(&mut self, use_transaction: bool) {
        self.use_transaction = use_transaction;
    }
337
    /// Set a private transaction L0 buffer for both read visibility (QueryContext)
    /// and mutation routing.
    ///
    /// The buffer is stored as the transaction L0 override, which `get_context`
    /// hands to the query context when a writer is attached.
    pub fn set_transaction_l0(
        &mut self,
        l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
    ) {
        self.transaction_l0_override = Some(l0);
    }
346
    /// Attach a custom scalar function registry for user-defined functions.
    ///
    /// Overwrites any registry attached earlier.
    pub fn set_custom_functions(
        &mut self,
        registry: Arc<super::custom_functions::CustomFunctionRegistry>,
    ) {
        self.custom_function_registry = Some(registry);
    }
354
    /// Set a cooperative cancellation token for in-flight query cancellation.
    ///
    /// `get_context` copies the token into every `QueryContext` it builds
    /// after this call.
    pub fn set_cancellation_token(&mut self, token: tokio_util::sync::CancellationToken) {
        self.cancellation_token = Some(token);
    }
359
360    /// Build a `QueryContext` from the current writer or standalone L0 manager.
361    /// When `transaction_l0_override` is set, it is used as the transaction L0 —
362    /// this is how private-per-transaction L0 buffers become visible to reads
363    /// without requiring the writer lock at tx creation.
364    pub(crate) async fn get_context(&self) -> Option<QueryContext> {
365        if let Some(writer_lock) = &self.writer {
366            let writer = writer_lock.read().await;
367            // Prefer the override (private tx L0) over the writer's slot
368            let tx_l0 = self.transaction_l0_override.clone();
369            let mut ctx = QueryContext::new_with_pending(
370                writer.l0_manager.get_current(),
371                tx_l0,
372                writer.l0_manager.get_pending_flush(),
373            );
374            ctx.set_deadline(Instant::now() + self.config.query_timeout);
375            if let Some(ref token) = self.cancellation_token {
376                ctx.set_cancellation_token(token.clone());
377            }
378            Some(ctx)
379        } else {
380            self.l0_manager.as_ref().map(|m| {
381                let mut ctx = QueryContext::new(m.get_current());
382                ctx.set_deadline(Instant::now() + self.config.query_timeout);
383                if let Some(ref token) = self.cancellation_token {
384                    ctx.set_cancellation_token(token.clone());
385                }
386                ctx
387            })
388        }
389    }
390
    /// Total ordering for Cypher ORDER BY, including cross-type comparisons.
    ///
    /// Resolution order:
    /// 1. When both sides yield a temporal through `extract_temporal_value`,
    ///    they are compared with `compare_temporal`.
    /// 2. For a string paired with a temporal, the result of
    ///    `try_eval_ordering` is used whenever it produces one.
    /// 3. Otherwise values are ordered by `order_by_type_rank`; ties are
    ///    broken by a per-type comparison, and pairs without a dedicated arm
    ///    compare as equal.
    pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
        use std::cmp::Ordering;

        let temporal_a = Self::extract_temporal_value(a);
        let temporal_b = Self::extract_temporal_value(b);

        // Both sides are temporal: no further type dispatch is needed.
        if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
            return Self::compare_temporal(ta, tb);
        }

        // Temporal strings (e.g. "1984-10-11T...") and Value::Temporal should
        // compare using Cypher temporal semantics when compatible.
        if matches!(
            (a, b),
            (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
        ) && let Some(ord) = Self::try_eval_ordering(a, b)
        {
            return ord;
        }
        // Same idea when the temporal side was extracted from some other
        // representation: re-wrap it as `Value::Temporal` before evaluating.
        if let (Value::String(_), Some(tb)) = (a, temporal_b)
            && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
        {
            return ord;
        }
        if let (Some(ta), Value::String(_)) = (temporal_a, b)
            && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
        {
            return ord;
        }

        // Different type ranks decide the order on their own.
        let ra = Self::order_by_type_rank(a);
        let rb = Self::order_by_type_rank(b);
        if ra != rb {
            return ra.cmp(&rb);
        }

        // Same rank: compare by value.
        match (a, b) {
            (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
            (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
            (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
            (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
            (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
            (Value::String(l), Value::String(r)) => {
                // Use eval_binary_op on the original references to avoid cloning.
                // Plain string comparison is the fallback when it yields nothing.
                Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
            }
            (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
            (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
            (Value::Int(l), Value::Int(r)) => l.cmp(r),
            (Value::Float(l), Value::Float(r)) => {
                // NaN sorts after every other float and equal to itself.
                if l.is_nan() && r.is_nan() {
                    Ordering::Equal
                } else if l.is_nan() {
                    Ordering::Greater
                } else if r.is_nan() {
                    Ordering::Less
                } else {
                    l.partial_cmp(r).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Int(l), Value::Float(r)) => {
                if r.is_nan() {
                    Ordering::Less
                } else {
                    // Mixed numeric pairs are compared as floats.
                    (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Float(l), Value::Int(r)) => {
                if l.is_nan() {
                    Ordering::Greater
                } else {
                    l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
            (Value::Vector(l), Value::Vector(r)) => {
                // Element-wise comparison; when one vector is a prefix of the
                // other, the shorter one sorts first.
                for (lv, rv) in l.iter().zip(r.iter()) {
                    let ord = lv.total_cmp(rv);
                    if ord != Ordering::Equal {
                        return ord;
                    }
                }
                l.len().cmp(&r.len())
            }
            _ => Ordering::Equal,
        }
    }
479
480    fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
481        use std::cmp::Ordering;
482        if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
483            Some(Ordering::Less)
484        } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
485            Some(Ordering::Greater)
486        } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
487            Some(Ordering::Equal)
488        } else {
489            None
490        }
491    }
492
493    /// Cypher ORDER BY total precedence:
494    /// MAP < NODE < RELATIONSHIP < LIST < PATH < STRING < BOOLEAN < TEMPORAL < NUMBER < NaN < NULL
495    fn order_by_type_rank(v: &Value) -> u8 {
496        match v {
497            Value::Map(map) => Self::map_order_rank(map),
498            Value::Node(_) => 1,
499            Value::Edge(_) => 2,
500            Value::List(_) => 3,
501            Value::Path(_) => 4,
502            Value::String(_) => 5,
503            Value::Bool(_) => 6,
504            Value::Temporal(_) => 7,
505            Value::Int(_) => 8,
506            Value::Float(f) if f.is_nan() => 9,
507            Value::Float(_) => 8,
508            Value::Null => 10,
509            Value::Bytes(_) | Value::Vector(_) => 11,
510            _ => 11,
511        }
512    }
513
514    fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
515        if Self::map_as_temporal(map).is_some() {
516            7
517        } else if map.contains_key("nodes")
518            && (map.contains_key("relationships") || map.contains_key("edges"))
519        {
520            4
521        } else if map.contains_key("_eid")
522            || map.contains_key("_src")
523            || map.contains_key("_dst")
524            || map.contains_key("_type")
525            || map.contains_key("_type_name")
526        {
527            2
528        } else if map.contains_key("_vid")
529            || map.contains_key("_labels")
530            || map.contains_key("_label")
531        {
532            1
533        } else {
534            0
535        }
536    }
537
    /// Try to obtain a temporal from `value`.
    ///
    /// Thin wrapper around `crate::query::expr_eval::temporal_from_value`;
    /// `None` means that helper found no temporal in `value`.
    fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
        crate::query::expr_eval::temporal_from_value(value)
    }
541
    /// Try to interpret `map` as a wrapped temporal value.
    ///
    /// Thin wrapper around `crate::query::expr_eval::temporal_from_map_wrapper`;
    /// `None` means that helper did not recognise the map as a temporal.
    fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
        crate::query::expr_eval::temporal_from_map_wrapper(map)
    }
545
546    fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
547        left.iter()
548            .zip(right.iter())
549            .map(|(l, r)| Self::compare_values(l, r))
550            .find(|o| o.is_ne())
551            .unwrap_or_else(|| left.len().cmp(&right.len()))
552    }
553
554    fn compare_maps(
555        left: &HashMap<String, Value>,
556        right: &HashMap<String, Value>,
557    ) -> std::cmp::Ordering {
558        let mut l_pairs: Vec<_> = left.iter().collect();
559        let mut r_pairs: Vec<_> = right.iter().collect();
560        l_pairs.sort_by_key(|(k, _)| *k);
561        r_pairs.sort_by_key(|(k, _)| *k);
562
563        l_pairs
564            .iter()
565            .zip(r_pairs.iter())
566            .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
567            .find(|o| o.is_ne())
568            .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
569    }
570
571    fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
572        let mut l_labels = left.labels.clone();
573        let mut r_labels = right.labels.clone();
574        l_labels.sort();
575        r_labels.sort();
576
577        l_labels
578            .cmp(&r_labels)
579            .then_with(|| left.vid.cmp(&right.vid))
580            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
581    }
582
583    fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
584        left.edge_type
585            .cmp(&right.edge_type)
586            .then_with(|| left.src.cmp(&right.src))
587            .then_with(|| left.dst.cmp(&right.dst))
588            .then_with(|| left.eid.cmp(&right.eid))
589            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
590    }
591
592    fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
593        left.nodes
594            .iter()
595            .zip(right.nodes.iter())
596            .map(|(l, r)| Self::compare_nodes(l, r))
597            .find(|o| o.is_ne())
598            .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
599            .then_with(|| {
600                left.edges
601                    .iter()
602                    .zip(right.edges.iter())
603                    .map(|(l, r)| Self::compare_edges(l, r))
604                    .find(|o| o.is_ne())
605                    .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
606            })
607    }
608
609    fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
610        match (left, right) {
611            (
612                TemporalValue::Date {
613                    days_since_epoch: l,
614                },
615                TemporalValue::Date {
616                    days_since_epoch: r,
617                },
618            ) => l.cmp(r),
619            (
620                TemporalValue::LocalTime {
621                    nanos_since_midnight: l,
622                },
623                TemporalValue::LocalTime {
624                    nanos_since_midnight: r,
625                },
626            ) => l.cmp(r),
627            (
628                TemporalValue::Time {
629                    nanos_since_midnight: lm,
630                    offset_seconds: lo,
631                },
632                TemporalValue::Time {
633                    nanos_since_midnight: rm,
634                    offset_seconds: ro,
635                },
636            ) => {
637                let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
638                let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
639                l_utc.cmp(&r_utc)
640            }
641            (
642                TemporalValue::LocalDateTime {
643                    nanos_since_epoch: l,
644                },
645                TemporalValue::LocalDateTime {
646                    nanos_since_epoch: r,
647                },
648            ) => l.cmp(r),
649            (
650                TemporalValue::DateTime {
651                    nanos_since_epoch: l,
652                    ..
653                },
654                TemporalValue::DateTime {
655                    nanos_since_epoch: r,
656                    ..
657                },
658            ) => l.cmp(r),
659            (
660                TemporalValue::Duration {
661                    months: lm,
662                    days: ld,
663                    nanos: ln,
664                },
665                TemporalValue::Duration {
666                    months: rm,
667                    days: rd,
668                    nanos: rn,
669                },
670            ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
671            _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
672        }
673    }
674
    /// Rank used by `compare_temporal` to order temporal values of different
    /// variants: Date < LocalTime < Time < LocalDateTime < DateTime < Duration.
    fn temporal_variant_rank(v: &TemporalValue) -> u8 {
        match v {
            TemporalValue::Date { .. } => 0,
            TemporalValue::LocalTime { .. } => 1,
            TemporalValue::Time { .. } => 2,
            TemporalValue::LocalDateTime { .. } => 3,
            TemporalValue::DateTime { .. } => 4,
            TemporalValue::Duration { .. } => 5,
        }
    }
685}
686
/// Combined output of a `PROFILE` query execution.
///
/// Contains both the logical plan explanation and per-operator runtime
/// statistics collected during execution.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileOutput {
    /// Logical plan explanation with index usage and cost estimates.
    pub explain: crate::query::planner::ExplainOutput,
    /// Per-operator timing and memory statistics.
    pub runtime_stats: Vec<OperatorStats>,
    /// Wall-clock time for the entire execution in milliseconds.
    pub total_time_ms: u64,
    /// Peak memory used during execution in bytes.
    ///
    /// `Executor::profile` currently always reports `0` here.
    pub peak_memory_bytes: usize,
}
702
/// Runtime statistics for a single logical plan operator.
///
/// The producers in this file fill `memory_bytes` with `0` and leave both
/// index counters as `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperatorStats {
    /// Human-readable operator name (e.g., `"GraphScan"`, `"Filter"`).
    pub operator: String,
    /// Number of rows produced by this operator.
    pub actual_rows: usize,
    /// Wall-clock time spent in this operator in milliseconds.
    pub time_ms: f64,
    /// Memory allocated by this operator in bytes.
    pub memory_bytes: usize,
    /// Number of index cache hits (if applicable).
    pub index_hits: Option<usize>,
    /// Number of index cache misses (if applicable).
    pub index_misses: Option<usize>,
}
719
/// Walk a DataFusion physical plan tree (post-order DFS) and collect
/// per-operator metrics recorded by `BaselineMetrics` during execution.
///
/// Children are visited before parents so the resulting `Vec` flows from
/// data-producers (leaf scans) up to consumers (projections, filters).
///
/// The result holds exactly one entry per plan node, including nodes that
/// expose no metrics (those report zero rows and zero time).
fn collect_plan_metrics(
    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
) -> Vec<OperatorStats> {
    let mut stats = Vec::new();
    collect_plan_metrics_inner(plan, &mut stats);
    stats
}
732
733fn collect_plan_metrics_inner(
734    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
735    out: &mut Vec<OperatorStats>,
736) {
737    // Recurse into children first (post-order)
738    for child in plan.children() {
739        collect_plan_metrics_inner(child, out);
740    }
741
742    let operator = plan.name().to_string();
743
744    let (actual_rows, time_ms) = match plan.metrics() {
745        Some(metrics) => {
746            let rows = metrics.output_rows().unwrap_or(0);
747            // elapsed_compute() returns nanoseconds
748            let nanos = metrics.elapsed_compute().unwrap_or(0);
749            let ms = nanos as f64 / 1_000_000.0;
750            (rows, ms)
751        }
752        None => (0, 0.0),
753    };
754
755    out.push(OperatorStats {
756        operator,
757        actual_rows,
758        time_ms,
759        memory_bytes: 0,
760        index_hits: None,
761        index_misses: None,
762    });
763}
764
765impl Executor {
766    /// Profiles query execution and returns results with per-operator timing
767    /// statistics extracted from the DataFusion physical plan tree.
768    pub async fn profile(
769        &self,
770        plan: crate::query::planner::LogicalPlan,
771        params: &HashMap<String, Value>,
772    ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
773        // Generate ExplainOutput first
774        let planner =
775            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
776        let explain_output = planner.explain_logical_plan(&plan)?;
777
778        let start = Instant::now();
779
780        let prop_manager = self.create_prop_manager();
781
782        // DDL/admin queries don't flow through DataFusion — fall back to
783        // single aggregate stat.
784        let (results, stats) = if Self::is_ddl_or_admin(&plan) {
785            let results = self
786                .execute_subplan(plan, &prop_manager, params, None)
787                .await?;
788            let elapsed = start.elapsed();
789            let stats = vec![OperatorStats {
790                operator: "DDL/Admin Execution".to_string(),
791                actual_rows: results.len(),
792                time_ms: elapsed.as_secs_f64() * 1000.0,
793                memory_bytes: 0,
794                index_hits: None,
795                index_misses: None,
796            }];
797            (results, stats)
798        } else {
799            let (batches, execution_plan) = self
800                .execute_datafusion_with_plan(plan, &prop_manager, params)
801                .await?;
802            let results = self.record_batches_to_rows(batches)?;
803            let stats = collect_plan_metrics(&execution_plan);
804            (results, stats)
805        };
806
807        let total_time = start.elapsed();
808
809        Ok((
810            results,
811            ProfileOutput {
812                explain: explain_output,
813                runtime_stats: stats,
814                total_time_ms: total_time.as_millis() as u64,
815                peak_memory_bytes: 0,
816            },
817        ))
818    }
819
    /// Build a `PropertyManager` over this executor's storage and its schema manager.
    ///
    /// A fresh manager is constructed on every call.
    fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
        uni_store::runtime::property_manager::PropertyManager::new(
            self.storage.clone(),
            self.storage.schema_manager_arc(),
            // NOTE(review): presumably a cache capacity — confirm against
            // `PropertyManager::new`.
            1000,
        )
    }
827}