Skip to main content

uni_query/query/executor/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalType, TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::QueryContext;
13use uni_store::runtime::l0_manager::L0Manager;
14use uni_store::runtime::writer::Writer;
15use uni_store::storage::manager::StorageManager;
16use uni_xervo::runtime::ModelRuntime;
17
18use crate::query::datetime::{classify_temporal, eval_datetime_function};
19use crate::query::expr_eval::eval_binary_op;
20
21use super::procedure::ProcedureRegistry;
22
23#[derive(Debug)]
24pub(crate) enum Accumulator {
25    Count(i64),
26    Sum(f64),
27    Min(Option<Value>),
28    Max(Option<Value>),
29    Avg { sum: f64, count: i64 },
30    Collect(Vec<Value>),
31    CountDistinct(HashSet<String>),
32    PercentileDisc { values: Vec<f64>, percentile: f64 },
33    PercentileCont { values: Vec<f64>, percentile: f64 },
34}
35
36/// Convert f64 to Value, preserving integer representation when possible.
37fn numeric_to_value(val: f64) -> Value {
38    if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
39        Value::Int(val as i64)
40    } else {
41        Value::Float(val)
42    }
43}
44
45/// Cross-type ordering rank for Cypher min/max (lower rank = smaller).
46fn cypher_type_rank(val: &Value) -> u8 {
47    match val {
48        Value::Null => 0,
49        Value::List(_) => 1,
50        Value::String(_) => 2,
51        Value::Bool(_) => 3,
52        Value::Int(_) | Value::Float(_) => 4,
53        _ => 5,
54    }
55}
56
57/// Compare two Cypher values for min/max with cross-type ordering.
58fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
59    use std::cmp::Ordering;
60    let ra = cypher_type_rank(a);
61    let rb = cypher_type_rank(b);
62    if ra != rb {
63        return ra.cmp(&rb);
64    }
65    match (a, b) {
66        (Value::Int(l), Value::Int(r)) => l.cmp(r),
67        (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
68        (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
69        (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
70        (Value::String(l), Value::String(r)) => l.cmp(r),
71        (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
72        _ => Ordering::Equal,
73    }
74}
75
76impl Accumulator {
77    pub(crate) fn new(op: &str, distinct: bool) -> Self {
78        Self::new_with_percentile(op, distinct, 0.0)
79    }
80
81    pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
82        let op_upper = op.to_uppercase();
83        match op_upper.as_str() {
84            "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
85            "COUNT" => Accumulator::Count(0),
86            "SUM" => Accumulator::Sum(0.0),
87            "MIN" => Accumulator::Min(None),
88            "MAX" => Accumulator::Max(None),
89            "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
90            "COLLECT" => Accumulator::Collect(Vec::new()),
91            "PERCENTILEDISC" => Accumulator::PercentileDisc {
92                values: Vec::new(),
93                percentile,
94            },
95            "PERCENTILECONT" => Accumulator::PercentileCont {
96                values: Vec::new(),
97                percentile,
98            },
99            _ => Accumulator::Count(0),
100        }
101    }
102
103    pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
104        match self {
105            Accumulator::Count(c) => {
106                if is_wildcard || !val.is_null() {
107                    *c += 1;
108                }
109            }
110            Accumulator::Sum(s) => {
111                if let Some(n) = val.as_f64() {
112                    *s += n;
113                }
114            }
115            Accumulator::Min(current) => {
116                if !val.is_null() {
117                    *current = Some(match current.take() {
118                        None => val.clone(),
119                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
120                        Some(cur) => cur,
121                    });
122                }
123            }
124            Accumulator::Max(current) => {
125                if !val.is_null() {
126                    *current = Some(match current.take() {
127                        None => val.clone(),
128                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
129                        Some(cur) => cur,
130                    });
131                }
132            }
133            Accumulator::Avg { sum, count } => {
134                if let Some(n) = val.as_f64() {
135                    *sum += n;
136                    *count += 1;
137                }
138            }
139            Accumulator::Collect(v) => {
140                if !val.is_null() {
141                    v.push(val.clone());
142                }
143            }
144            Accumulator::CountDistinct(s) => {
145                if !val.is_null() {
146                    s.insert(val.to_string());
147                }
148            }
149            Accumulator::PercentileDisc { values, .. }
150            | Accumulator::PercentileCont { values, .. } => {
151                if let Some(n) = val.as_f64() {
152                    values.push(n);
153                }
154            }
155        }
156    }
157
158    pub(crate) fn finish(&self) -> Value {
159        match self {
160            Accumulator::Count(c) => Value::Int(*c),
161            Accumulator::Sum(s) => numeric_to_value(*s),
162            Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
163            Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
164            Accumulator::Avg { sum, count } => {
165                if *count > 0 {
166                    Value::Float(*sum / (*count as f64))
167                } else {
168                    Value::Null
169                }
170            }
171            Accumulator::Collect(v) => Value::List(v.clone()),
172            Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
173            Accumulator::PercentileDisc { values, percentile } => {
174                if values.is_empty() {
175                    return Value::Null;
176                }
177                let mut sorted = values.clone();
178                sorted.sort_by(|a, b| a.total_cmp(b));
179                let n = sorted.len();
180                let idx = (percentile * (n as f64 - 1.0)).round() as usize;
181                numeric_to_value(sorted[idx.min(n - 1)])
182            }
183            Accumulator::PercentileCont { values, percentile } => {
184                if values.is_empty() {
185                    return Value::Null;
186                }
187                let mut sorted = values.clone();
188                sorted.sort_by(|a, b| a.total_cmp(b));
189                let n = sorted.len();
190                if n == 1 {
191                    return Value::Float(sorted[0]);
192                }
193                let pos = percentile * (n as f64 - 1.0);
194                let lower = (pos.floor() as usize).min(n - 1);
195                let upper = (pos.ceil() as usize).min(n - 1);
196                if lower == upper {
197                    Value::Float(sorted[lower])
198                } else {
199                    let frac = pos - lower as f64;
200                    Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
201                }
202            }
203        }
204    }
205}
206
/// Key under which a parsed generation expression is cached, as the pair
/// `(label_name, property_name)`.
pub(crate) type GenExprCacheKey = (String, String);
209
210#[derive(Clone)]
211pub struct Executor {
212    pub(crate) storage: Arc<StorageManager>,
213    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
214    pub(crate) l0_manager: Option<Arc<L0Manager>>,
215    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
216    pub(crate) use_transaction: bool,
217    /// File sandbox configuration for BACKUP/COPY/EXPORT commands
218    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
219    pub(crate) config: uni_common::config::UniConfig,
220    /// Cache for parsed generation expressions to avoid re-parsing on every row
221    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
222    /// External procedure registry for test/user-defined procedures.
223    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
224    /// Uni-Xervo runtime used by vector auto-embedding paths.
225    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
226}
227
228impl Executor {
229    pub fn new(storage: Arc<StorageManager>) -> Self {
230        Self {
231            storage,
232            writer: None,
233            l0_manager: None,
234            algo_registry: Arc::new(AlgorithmRegistry::new()),
235            use_transaction: false,
236            file_sandbox: uni_common::config::FileSandboxConfig::default(),
237            config: uni_common::config::UniConfig::default(),
238            gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
239            procedure_registry: None,
240            xervo_runtime: None,
241        }
242    }
243
244    pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<RwLock<Writer>>) -> Self {
245        let mut executor = Self::new(storage);
246        executor.writer = Some(writer);
247        executor
248    }
249
250    /// Sets the external procedure registry for user-defined procedures.
251    pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
252        self.procedure_registry = Some(registry);
253    }
254
255    pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
256        self.xervo_runtime = runtime;
257    }
258
259    /// Set the file sandbox configuration for BACKUP/COPY/EXPORT commands.
260    /// MUST be called with sandboxed config in server mode.
261    pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
262        self.file_sandbox = sandbox;
263    }
264
265    pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
266        self.config = config;
267    }
268
269    /// Validate a file path against the sandbox configuration.
270    pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
271        self.file_sandbox
272            .validate_path(path)
273            .map_err(|e| anyhow!("Path validation failed: {}", e))
274    }
275
276    pub fn set_writer(&mut self, writer: Arc<RwLock<Writer>>) {
277        self.writer = Some(writer);
278    }
279
280    pub fn set_use_transaction(&mut self, use_transaction: bool) {
281        self.use_transaction = use_transaction;
282    }
283
284    pub(crate) async fn get_context(&self) -> Option<QueryContext> {
285        if let Some(writer_lock) = &self.writer {
286            let writer = writer_lock.read().await;
287            // Include pending_flush L0s so data being flushed remains visible
288            let mut ctx = QueryContext::new_with_pending(
289                writer.l0_manager.get_current(),
290                writer.transaction_l0.clone(),
291                writer.l0_manager.get_pending_flush(),
292            );
293            ctx.set_deadline(Instant::now() + self.config.query_timeout);
294            Some(ctx)
295        } else {
296            self.l0_manager.as_ref().map(|m| {
297                let mut ctx = QueryContext::new(m.get_current());
298                ctx.set_deadline(Instant::now() + self.config.query_timeout);
299                ctx
300            })
301        }
302    }
303
304    pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
305        use std::cmp::Ordering;
306
307        let temporal_a = Self::extract_temporal_value(a);
308        let temporal_b = Self::extract_temporal_value(b);
309
310        if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
311            return Self::compare_temporal(ta, tb);
312        }
313
314        // Temporal strings (e.g. "1984-10-11T...") and Value::Temporal should
315        // compare using Cypher temporal semantics when compatible.
316        if matches!(
317            (a, b),
318            (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
319        ) && let Some(ord) = Self::try_eval_ordering(a, b)
320        {
321            return ord;
322        }
323        if let (Value::String(_), Some(tb)) = (a, temporal_b)
324            && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
325        {
326            return ord;
327        }
328        if let (Some(ta), Value::String(_)) = (temporal_a, b)
329            && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
330        {
331            return ord;
332        }
333
334        let ra = Self::order_by_type_rank(a);
335        let rb = Self::order_by_type_rank(b);
336        if ra != rb {
337            return ra.cmp(&rb);
338        }
339
340        match (a, b) {
341            (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
342            (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
343            (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
344            (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
345            (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
346            (Value::String(l), Value::String(r)) => {
347                // Use eval_binary_op on the original references to avoid cloning.
348                Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
349            }
350            (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
351            (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
352            (Value::Int(l), Value::Int(r)) => l.cmp(r),
353            (Value::Float(l), Value::Float(r)) => {
354                if l.is_nan() && r.is_nan() {
355                    Ordering::Equal
356                } else if l.is_nan() {
357                    Ordering::Greater
358                } else if r.is_nan() {
359                    Ordering::Less
360                } else {
361                    l.partial_cmp(r).unwrap_or(Ordering::Equal)
362                }
363            }
364            (Value::Int(l), Value::Float(r)) => {
365                if r.is_nan() {
366                    Ordering::Less
367                } else {
368                    (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
369                }
370            }
371            (Value::Float(l), Value::Int(r)) => {
372                if l.is_nan() {
373                    Ordering::Greater
374                } else {
375                    l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
376                }
377            }
378            (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
379            (Value::Vector(l), Value::Vector(r)) => {
380                for (lv, rv) in l.iter().zip(r.iter()) {
381                    let ord = lv.total_cmp(rv);
382                    if ord != Ordering::Equal {
383                        return ord;
384                    }
385                }
386                l.len().cmp(&r.len())
387            }
388            _ => Ordering::Equal,
389        }
390    }
391
392    fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
393        use std::cmp::Ordering;
394        if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
395            Some(Ordering::Less)
396        } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
397            Some(Ordering::Greater)
398        } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
399            Some(Ordering::Equal)
400        } else {
401            None
402        }
403    }
404
405    /// Cypher ORDER BY total precedence:
406    /// MAP < NODE < RELATIONSHIP < LIST < PATH < STRING < BOOLEAN < TEMPORAL < NUMBER < NaN < NULL
407    fn order_by_type_rank(v: &Value) -> u8 {
408        match v {
409            Value::Map(map) => Self::map_order_rank(map),
410            Value::Node(_) => 1,
411            Value::Edge(_) => 2,
412            Value::List(_) => 3,
413            Value::Path(_) => 4,
414            Value::String(_) => 5,
415            Value::Bool(_) => 6,
416            Value::Temporal(_) => 7,
417            Value::Int(_) => 8,
418            Value::Float(f) if f.is_nan() => 9,
419            Value::Float(_) => 8,
420            Value::Null => 10,
421            Value::Bytes(_) | Value::Vector(_) => 11,
422            _ => 11,
423        }
424    }
425
426    fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
427        if Self::map_as_temporal(map).is_some() {
428            7
429        } else if map.contains_key("nodes")
430            && (map.contains_key("relationships") || map.contains_key("edges"))
431        {
432            4
433        } else if map.contains_key("_eid")
434            || map.contains_key("_src")
435            || map.contains_key("_dst")
436            || map.contains_key("_type")
437            || map.contains_key("_type_name")
438        {
439            2
440        } else if map.contains_key("_vid")
441            || map.contains_key("_labels")
442            || map.contains_key("_label")
443        {
444            1
445        } else {
446            0
447        }
448    }
449
450    fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
451        match value {
452            Value::Temporal(t) => Some(t.clone()),
453            Value::Map(map) => Self::map_as_temporal(map),
454            Value::String(s) => Self::string_as_temporal(s),
455            _ => None,
456        }
457    }
458
459    fn string_as_temporal(s: &str) -> Option<TemporalValue> {
460        let fn_name = match classify_temporal(s)? {
461            TemporalType::Date => "DATE",
462            TemporalType::LocalTime => "LOCALTIME",
463            TemporalType::Time => "TIME",
464            TemporalType::LocalDateTime => "LOCALDATETIME",
465            TemporalType::DateTime => "DATETIME",
466            TemporalType::Duration => "DURATION",
467        };
468        match eval_datetime_function(fn_name, &[Value::String(s.to_string())]).ok()? {
469            Value::Temporal(tv) => Some(tv),
470            _ => None,
471        }
472    }
473
474    fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
475        if map.len() != 1 {
476            return None;
477        }
478
479        let as_i32 = |v: &Value| v.as_i64().and_then(|n| i32::try_from(n).ok());
480        let as_i64 = |v: &Value| v.as_i64();
481
482        if let Some(Value::Map(inner)) = map.get("Date") {
483            let days = inner.get("days_since_epoch").and_then(as_i32)?;
484            return Some(TemporalValue::Date {
485                days_since_epoch: days,
486            });
487        }
488        if let Some(Value::Map(inner)) = map.get("LocalTime") {
489            let nanos = inner.get("nanos_since_midnight").and_then(as_i64)?;
490            return Some(TemporalValue::LocalTime {
491                nanos_since_midnight: nanos,
492            });
493        }
494        if let Some(Value::Map(inner)) = map.get("Time") {
495            let nanos = inner.get("nanos_since_midnight").and_then(as_i64)?;
496            let offset = inner.get("offset_seconds").and_then(as_i32)?;
497            return Some(TemporalValue::Time {
498                nanos_since_midnight: nanos,
499                offset_seconds: offset,
500            });
501        }
502        if let Some(Value::Map(inner)) = map.get("LocalDateTime") {
503            let nanos = inner.get("nanos_since_epoch").and_then(as_i64)?;
504            return Some(TemporalValue::LocalDateTime {
505                nanos_since_epoch: nanos,
506            });
507        }
508        if let Some(Value::Map(inner)) = map.get("DateTime") {
509            let nanos = inner.get("nanos_since_epoch").and_then(as_i64)?;
510            let offset = inner.get("offset_seconds").and_then(as_i32)?;
511            let timezone_name = match inner.get("timezone_name") {
512                Some(Value::String(s)) => Some(s.clone()),
513                _ => None,
514            };
515            return Some(TemporalValue::DateTime {
516                nanos_since_epoch: nanos,
517                offset_seconds: offset,
518                timezone_name,
519            });
520        }
521        if let Some(Value::Map(inner)) = map.get("Duration") {
522            let months = inner.get("months").and_then(as_i64)?;
523            let days = inner.get("days").and_then(as_i64)?;
524            let nanos = inner.get("nanos").and_then(as_i64)?;
525            return Some(TemporalValue::Duration {
526                months,
527                days,
528                nanos,
529            });
530        }
531        None
532    }
533
534    fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
535        left.iter()
536            .zip(right.iter())
537            .map(|(l, r)| Self::compare_values(l, r))
538            .find(|o| o.is_ne())
539            .unwrap_or_else(|| left.len().cmp(&right.len()))
540    }
541
542    fn compare_maps(
543        left: &HashMap<String, Value>,
544        right: &HashMap<String, Value>,
545    ) -> std::cmp::Ordering {
546        let mut l_pairs: Vec<_> = left.iter().collect();
547        let mut r_pairs: Vec<_> = right.iter().collect();
548        l_pairs.sort_by_key(|(k, _)| *k);
549        r_pairs.sort_by_key(|(k, _)| *k);
550
551        l_pairs
552            .iter()
553            .zip(r_pairs.iter())
554            .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
555            .find(|o| o.is_ne())
556            .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
557    }
558
559    fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
560        let mut l_labels = left.labels.clone();
561        let mut r_labels = right.labels.clone();
562        l_labels.sort();
563        r_labels.sort();
564
565        l_labels
566            .cmp(&r_labels)
567            .then_with(|| left.vid.cmp(&right.vid))
568            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
569    }
570
571    fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
572        left.edge_type
573            .cmp(&right.edge_type)
574            .then_with(|| left.src.cmp(&right.src))
575            .then_with(|| left.dst.cmp(&right.dst))
576            .then_with(|| left.eid.cmp(&right.eid))
577            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
578    }
579
580    fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
581        left.nodes
582            .iter()
583            .zip(right.nodes.iter())
584            .map(|(l, r)| Self::compare_nodes(l, r))
585            .find(|o| o.is_ne())
586            .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
587            .then_with(|| {
588                left.edges
589                    .iter()
590                    .zip(right.edges.iter())
591                    .map(|(l, r)| Self::compare_edges(l, r))
592                    .find(|o| o.is_ne())
593                    .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
594            })
595    }
596
597    fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
598        match (left, right) {
599            (
600                TemporalValue::Date {
601                    days_since_epoch: l,
602                },
603                TemporalValue::Date {
604                    days_since_epoch: r,
605                },
606            ) => l.cmp(r),
607            (
608                TemporalValue::LocalTime {
609                    nanos_since_midnight: l,
610                },
611                TemporalValue::LocalTime {
612                    nanos_since_midnight: r,
613                },
614            ) => l.cmp(r),
615            (
616                TemporalValue::Time {
617                    nanos_since_midnight: lm,
618                    offset_seconds: lo,
619                },
620                TemporalValue::Time {
621                    nanos_since_midnight: rm,
622                    offset_seconds: ro,
623                },
624            ) => {
625                let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
626                let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
627                l_utc.cmp(&r_utc)
628            }
629            (
630                TemporalValue::LocalDateTime {
631                    nanos_since_epoch: l,
632                },
633                TemporalValue::LocalDateTime {
634                    nanos_since_epoch: r,
635                },
636            ) => l.cmp(r),
637            (
638                TemporalValue::DateTime {
639                    nanos_since_epoch: l,
640                    ..
641                },
642                TemporalValue::DateTime {
643                    nanos_since_epoch: r,
644                    ..
645                },
646            ) => l.cmp(r),
647            (
648                TemporalValue::Duration {
649                    months: lm,
650                    days: ld,
651                    nanos: ln,
652                },
653                TemporalValue::Duration {
654                    months: rm,
655                    days: rd,
656                    nanos: rn,
657                },
658            ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
659            _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
660        }
661    }
662
663    fn temporal_variant_rank(v: &TemporalValue) -> u8 {
664        match v {
665            TemporalValue::Date { .. } => 0,
666            TemporalValue::LocalTime { .. } => 1,
667            TemporalValue::Time { .. } => 2,
668            TemporalValue::LocalDateTime { .. } => 3,
669            TemporalValue::DateTime { .. } => 4,
670            TemporalValue::Duration { .. } => 5,
671        }
672    }
673}
674
675#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
676pub struct ProfileOutput {
677    pub explain: crate::query::planner::ExplainOutput,
678    pub runtime_stats: Vec<OperatorStats>,
679    pub total_time_ms: u64,
680    pub peak_memory_bytes: usize,
681}
682
683#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
684pub struct OperatorStats {
685    pub operator: String,
686    pub actual_rows: usize,
687    pub time_ms: f64,
688    pub memory_bytes: usize,
689    pub index_hits: Option<usize>,
690    pub index_misses: Option<usize>,
691}
692
693impl Executor {
694    /// Profiles query execution and returns results with timing statistics.
695    ///
696    /// Uses the DataFusion-based executor for query execution. Granular operator
697    /// profiling will be added in a future release.
698    pub async fn profile(
699        &self,
700        plan: crate::query::planner::LogicalPlan,
701        params: &HashMap<String, Value>,
702    ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
703        // Generate ExplainOutput first
704        let planner =
705            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
706        let explain_output = planner.explain_logical_plan(&plan)?;
707
708        let start = Instant::now();
709
710        // Execute using the standard execute path (DataFusion-based)
711        let prop_manager = self.create_prop_manager();
712        let results = self.execute(plan.clone(), &prop_manager, params).await?;
713
714        let total_time = start.elapsed();
715
716        // Return aggregate stats (granular operator profiling to be added later)
717        let stats = vec![OperatorStats {
718            operator: "DataFusion Execution".to_string(),
719            actual_rows: results.len(),
720            time_ms: total_time.as_secs_f64() * 1000.0,
721            memory_bytes: 0,
722            index_hits: None,
723            index_misses: None,
724        }];
725
726        Ok((
727            results,
728            ProfileOutput {
729                explain: explain_output,
730                runtime_stats: stats,
731                total_time_ms: total_time.as_millis() as u64,
732                peak_memory_bytes: 0,
733            },
734        ))
735    }
736
737    fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
738        uni_store::runtime::property_manager::PropertyManager::new(
739            self.storage.clone(),
740            self.storage.schema_manager_arc(),
741            1000,
742        )
743    }
744}