Skip to main content

uni_query/query/executor/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::PropertyManager;
13use uni_store::QueryContext;
14use uni_store::runtime::l0_manager::L0Manager;
15use uni_store::runtime::writer::Writer;
16use uni_store::storage::manager::StorageManager;
17use uni_xervo::runtime::ModelRuntime;
18
19use crate::query::expr_eval::eval_binary_op;
20use crate::types::QueryWarning;
21
22use super::procedure::ProcedureRegistry;
23
24/// Mutable accumulator for Cypher aggregate functions (COUNT, SUM, AVG, ...).
25#[derive(Debug)]
26pub(crate) enum Accumulator {
27    Count(i64),
28    Sum(f64),
29    Min(Option<Value>),
30    Max(Option<Value>),
31    Avg { sum: f64, count: i64 },
32    Collect(Vec<Value>),
33    CountDistinct(HashSet<String>),
34    PercentileDisc { values: Vec<f64>, percentile: f64 },
35    PercentileCont { values: Vec<f64>, percentile: f64 },
36}
37
38/// Convert f64 to Value, preserving integer representation when possible.
39fn numeric_to_value(val: f64) -> Value {
40    if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
41        Value::Int(val as i64)
42    } else {
43        Value::Float(val)
44    }
45}
46
47/// Cross-type ordering rank for Cypher min/max (lower rank = smaller).
48///
49/// NOTE: This is deliberately a *different* ordering from
50/// [`Executor::compare_values`] (used for `ORDER BY`). The two systems must
51/// not be merged: `min`/`max` follow the openCypher aggregation ordering
52/// (Null < List < String < Boolean < Number), whereas `ORDER BY` follows the
53/// openCypher comparability/ordering rules for sort, which rank the type
54/// families differently. Unifying them would silently break one or the other.
55fn cypher_type_rank(val: &Value) -> u8 {
56    match val {
57        Value::Null => 0,
58        Value::List(_) => 1,
59        Value::String(_) => 2,
60        Value::Bool(_) => 3,
61        Value::Int(_) | Value::Float(_) => 4,
62        _ => 5,
63    }
64}
65
66/// Compare two Cypher values for min/max with cross-type ordering.
67fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
68    use std::cmp::Ordering;
69    let ra = cypher_type_rank(a);
70    let rb = cypher_type_rank(b);
71    if ra != rb {
72        return ra.cmp(&rb);
73    }
74    match (a, b) {
75        (Value::Int(l), Value::Int(r)) => l.cmp(r),
76        (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
77        (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
78        (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
79        (Value::String(l), Value::String(r)) => l.cmp(r),
80        (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
81        _ => Ordering::Equal,
82    }
83}
84
85impl Accumulator {
86    pub(crate) fn new(op: &str, distinct: bool) -> Self {
87        Self::new_with_percentile(op, distinct, 0.0)
88    }
89
90    pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
91        let op_upper = op.to_uppercase();
92        match op_upper.as_str() {
93            "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
94            "COUNT" => Accumulator::Count(0),
95            "SUM" => Accumulator::Sum(0.0),
96            "MIN" => Accumulator::Min(None),
97            "MAX" => Accumulator::Max(None),
98            "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
99            "COLLECT" => Accumulator::Collect(Vec::new()),
100            "PERCENTILEDISC" => Accumulator::PercentileDisc {
101                values: Vec::new(),
102                percentile,
103            },
104            "PERCENTILECONT" => Accumulator::PercentileCont {
105                values: Vec::new(),
106                percentile,
107            },
108            _ => Accumulator::Count(0),
109        }
110    }
111
112    pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
113        match self {
114            Accumulator::Count(c) => {
115                if is_wildcard || !val.is_null() {
116                    *c += 1;
117                }
118            }
119            Accumulator::Sum(s) => {
120                if let Some(n) = val.as_f64() {
121                    *s += n;
122                }
123            }
124            Accumulator::Min(current) => {
125                if !val.is_null() {
126                    *current = Some(match current.take() {
127                        None => val.clone(),
128                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
129                        Some(cur) => cur,
130                    });
131                }
132            }
133            Accumulator::Max(current) => {
134                if !val.is_null() {
135                    *current = Some(match current.take() {
136                        None => val.clone(),
137                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
138                        Some(cur) => cur,
139                    });
140                }
141            }
142            Accumulator::Avg { sum, count } => {
143                if let Some(n) = val.as_f64() {
144                    *sum += n;
145                    *count += 1;
146                }
147            }
148            Accumulator::Collect(v) => {
149                if !val.is_null() {
150                    v.push(val.clone());
151                }
152            }
153            Accumulator::CountDistinct(s) => {
154                if !val.is_null() {
155                    s.insert(val.to_string());
156                }
157            }
158            Accumulator::PercentileDisc { values, .. }
159            | Accumulator::PercentileCont { values, .. } => {
160                if let Some(n) = val.as_f64() {
161                    values.push(n);
162                }
163            }
164        }
165    }
166
167    pub(crate) fn finish(&self) -> Value {
168        match self {
169            Accumulator::Count(c) => Value::Int(*c),
170            Accumulator::Sum(s) => numeric_to_value(*s),
171            Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
172            Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
173            Accumulator::Avg { sum, count } => {
174                if *count > 0 {
175                    Value::Float(*sum / (*count as f64))
176                } else {
177                    Value::Null
178                }
179            }
180            Accumulator::Collect(v) => Value::List(v.clone()),
181            Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
182            Accumulator::PercentileDisc { values, percentile } => {
183                if values.is_empty() {
184                    return Value::Null;
185                }
186                let mut sorted = values.clone();
187                sorted.sort_by(|a, b| a.total_cmp(b));
188                let n = sorted.len();
189                let idx = (percentile * (n as f64 - 1.0)).round() as usize;
190                numeric_to_value(sorted[idx.min(n - 1)])
191            }
192            Accumulator::PercentileCont { values, percentile } => {
193                if values.is_empty() {
194                    return Value::Null;
195                }
196                let mut sorted = values.clone();
197                sorted.sort_by(|a, b| a.total_cmp(b));
198                let n = sorted.len();
199                if n == 1 {
200                    return Value::Float(sorted[0]);
201                }
202                let pos = percentile * (n as f64 - 1.0);
203                let lower = (pos.floor() as usize).min(n - 1);
204                let upper = (pos.ceil() as usize).min(n - 1);
205                if lower == upper {
206                    Value::Float(sorted[lower])
207                } else {
208                    let frac = pos - lower as f64;
209                    Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
210                }
211            }
212        }
213    }
214}
215
/// Key into the parsed generation-expression cache, as the pair
/// `(label name, property name)`.
pub(crate) type GenExprCacheKey = (String, String);
218
/// Query executor: runs logical plans against a Uni storage backend.
///
/// `Executor` bridges the logical query plan produced by [`crate::query::planner::QueryPlanner`]
/// with the underlying `StorageManager`. It handles both read-only sessions and
/// write-enabled sessions (via `Writer` and `L0Manager`).
///
/// # Cloning
///
/// `Executor` is cheap to clone: nearly all shared state is held behind `Arc`s,
/// so a clone mostly bumps reference counts. The exceptions are `config` and
/// `file_sandbox`, which are cloned by value. Clone it freely to share across
/// tasks.
///
/// **Note on `Clone` semantics**: a derived `Clone` would alias the
/// `warnings` `Arc<Mutex<…>>`, making warnings collected on the clone
/// bleed into the original. A clone is intended to be a "fresh executor
/// borrowing shared state", so `Clone` is implemented manually and installs a
/// fresh `warnings` accumulator. All other Arc-shared state is intentionally
/// shared (caches, registries, runtimes).
// M-PUBLIC-DEBUG: `Debug` is implemented manually because Writer/ModelRuntime
// do not implement Debug.
// NOTE: field declaration order is also drop order; do not reorder casually.
pub struct Executor {
    /// Storage manager backing this executor.
    pub(crate) storage: Arc<StorageManager>,
    /// Writer for write-enabled executors; `None` for read-only ones. When set,
    /// `get_context` builds read contexts from the writer's L0 manager.
    pub(crate) writer: Option<Arc<Writer>>,
    /// Standalone L0 manager; `get_context` consults it only when no `writer`
    /// is attached.
    pub(crate) l0_manager: Option<Arc<L0Manager>>,
    /// Registry of graph algorithms available to this executor.
    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
    /// Whether query execution should operate within a transaction context.
    pub(crate) use_transaction: bool,
    /// File sandbox configuration for BACKUP/COPY/EXPORT commands.
    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
    /// Runtime configuration. Its `query_timeout` sets the deadline on every
    /// `QueryContext` built by `get_context`.
    pub(crate) config: uni_common::config::UniConfig,
    /// Cache for parsed generation expressions, so they are not re-parsed on every row.
    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
    /// External procedure registry for test/user-defined procedures.
    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Uni-Xervo runtime used by vector auto-embedding paths.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Warnings collected during the last execution (query-scoped; each clone
    /// gets its own buffer).
    pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
    /// Private transaction L0 buffer for query context and mutations.
    /// Used by Transaction to route reads and writes through a private L0 buffer
    /// without requiring the writer lock at transaction-creation time.
    pub(crate) transaction_l0_override:
        Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Per-transaction VID/EID reservoir. When set, CREATE/MERGE paths pull
    /// IDs from this batched cache instead of `Writer::next_vid()` /
    /// `Writer::next_eid()`, amortizing the `IdAllocator`'s global mutex.
    pub(crate) id_reservoir: Option<Arc<uni_store::runtime::TxIdReservoir>>,
    /// User-defined custom scalar function registry.
    pub(crate) custom_function_registry:
        Option<Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>>,
    /// Cooperative cancellation token. Passed to `QueryContext` and
    /// `GraphExecutionContext` so in-flight operators can detect cancellation.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Shared `PropertyManager` to reuse inside `create_datafusion_planner`
    /// instead of allocating a fresh one (with empty LRU caches) per query.
    /// When `None`, the planner falls back to constructing one from the
    /// `&PropertyManager` parameter, which preserves existing behavior for any
    /// caller that does not set this.
    pub(crate) prop_manager_arc: Option<Arc<PropertyManager>>,
    /// Pre-built DataFusion `SessionContext` template with all Cypher UDFs
    /// already registered. When set AND no custom UDFs are installed,
    /// `create_datafusion_planner` clones this Arc (O(1)) instead of building
    /// a fresh `SessionContext` and re-registering UDFs every query
    /// (~140 µs/query saved on the InMemory backend).
    pub(crate) df_session_template: Option<Arc<datafusion::execution::context::SessionContext>>,
    /// Pinned L0 snapshot for snapshot-isolated reads (Component C1). When set,
    /// `get_context` builds the read context from this frozen view instead of the
    /// live L0, so a transaction's reads are isolated from concurrent commits.
    /// `None` (the default, when no snapshot has been pinned) means live reads.
    pub(crate) read_snapshot: Option<uni_store::runtime::SnapshotView>,
}
287
288impl std::fmt::Debug for Executor {
289    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
290        f.debug_struct("Executor")
291            .field("use_transaction", &self.use_transaction)
292            .field("has_writer", &self.writer.is_some())
293            .field("has_l0_manager", &self.l0_manager.is_some())
294            .field("has_xervo_runtime", &self.xervo_runtime.is_some())
295            .finish_non_exhaustive()
296    }
297}
298
299impl Clone for Executor {
300    /// Clone an `Executor`, sharing all Arc-backed state but installing a
301    /// fresh `warnings` accumulator. Warnings are query-scoped; aliasing
302    /// the Mutex across clones would let one query's warnings spill into
303    /// another.
304    fn clone(&self) -> Self {
305        Self {
306            storage: self.storage.clone(),
307            writer: self.writer.clone(),
308            l0_manager: self.l0_manager.clone(),
309            algo_registry: self.algo_registry.clone(),
310            use_transaction: self.use_transaction,
311            file_sandbox: self.file_sandbox.clone(),
312            config: self.config.clone(),
313            gen_expr_cache: self.gen_expr_cache.clone(),
314            procedure_registry: self.procedure_registry.clone(),
315            xervo_runtime: self.xervo_runtime.clone(),
316            // Fresh warnings per clone — see struct doc.
317            warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
318            transaction_l0_override: self.transaction_l0_override.clone(),
319            id_reservoir: self.id_reservoir.clone(),
320            custom_function_registry: self.custom_function_registry.clone(),
321            cancellation_token: self.cancellation_token.clone(),
322            prop_manager_arc: self.prop_manager_arc.clone(),
323            df_session_template: self.df_session_template.clone(),
324            read_snapshot: self.read_snapshot.clone(),
325        }
326    }
327}
328
329impl Executor {
330    /// Create a read-only executor backed by the given storage manager.
331    pub fn new(storage: Arc<StorageManager>) -> Self {
332        // M4: auto-wire the host plugin registry so low-level test
333        // setups that bypass `Uni::build` still see `uni.schema.*` and
334        // `uni.algo.*` procedures (which no longer have hardcoded
335        // dispatch arms in `procedure_call.rs`).
336        let proc_registry = Arc::new(ProcedureRegistry::new());
337        proc_registry.set_plugin_registry(crate::procedures_plugin::default_host_plugin_registry());
338        Self {
339            storage,
340            writer: None,
341            l0_manager: None,
342            algo_registry: Arc::new(AlgorithmRegistry::new()),
343            use_transaction: false,
344            file_sandbox: uni_common::config::FileSandboxConfig::default(),
345            config: uni_common::config::UniConfig::default(),
346            gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
347            procedure_registry: Some(proc_registry),
348            xervo_runtime: None,
349            warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
350            transaction_l0_override: None,
351            id_reservoir: None,
352            custom_function_registry: None,
353            cancellation_token: None,
354            prop_manager_arc: None,
355            df_session_template: None,
356            read_snapshot: None,
357        }
358    }
359
360    /// Install a shared `PropertyManager` so the planner can reuse it
361    /// instead of constructing a fresh one per query.
362    ///
363    /// Reusing the caller's `Arc<PropertyManager>` skips the LRU cache
364    /// allocation cost (~80 µs/query in the InMemory backend).
365    pub fn set_prop_manager(&mut self, pm: Arc<PropertyManager>) {
366        self.prop_manager_arc = Some(pm);
367    }
368
369    /// Install a pre-built `SessionContext` template with Cypher UDFs already
370    /// registered, so the planner can clone it (O(1)) instead of rebuilding.
371    ///
372    /// When custom UDFs are also installed, the template is bypassed and a
373    /// fresh `SessionContext` is built for that query (preserves UDF
374    /// isolation across executors).
375    pub fn set_df_session_template(
376        &mut self,
377        tmpl: Arc<datafusion::execution::context::SessionContext>,
378    ) {
379        self.df_session_template = Some(tmpl);
380    }
381
382    /// Create a write-enabled executor with an attached `Writer`.
383    pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<Writer>) -> Self {
384        let mut executor = Self::new(storage);
385        executor.writer = Some(writer);
386        executor
387    }
388
389    /// Attach an external procedure registry for user-defined procedures.
390    pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
391        self.procedure_registry = Some(registry);
392    }
393
394    /// Attach or detach the Uni-Xervo model runtime for vector auto-embedding.
395    pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
396        self.xervo_runtime = runtime;
397    }
398
399    /// Set the file sandbox configuration for BACKUP/COPY/EXPORT commands.
400    /// MUST be called with sandboxed config in server mode.
401    pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
402        self.file_sandbox = sandbox;
403    }
404
405    /// Apply a runtime configuration to this executor.
406    pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
407        self.config = config;
408    }
409
410    /// Validate a file path against the sandbox configuration.
411    pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
412        self.file_sandbox
413            .validate_path(path)
414            .map_err(|e| anyhow!("Path validation failed: {}", e))
415    }
416
417    /// Attach a `Writer` after construction, enabling write operations.
418    pub fn set_writer(&mut self, writer: Arc<Writer>) {
419        self.writer = Some(writer);
420    }
421
422    /// Take all collected warnings from the last execution, leaving the collector empty.
423    pub fn take_warnings(&self) -> Vec<QueryWarning> {
424        self.warnings
425            .lock()
426            .map(|mut w| std::mem::take(&mut *w))
427            .unwrap_or_default()
428    }
429
430    /// Configure whether query execution should operate within a transaction context.
431    pub fn set_use_transaction(&mut self, use_transaction: bool) {
432        self.use_transaction = use_transaction;
433    }
434
435    /// Set a private transaction L0 buffer for both read visibility (QueryContext)
436    /// and mutation routing.
437    pub fn set_transaction_l0(
438        &mut self,
439        l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
440    ) {
441        self.transaction_l0_override = Some(l0);
442    }
443
444    /// Attach a pinned L0 snapshot for snapshot-isolated reads (Component C1).
445    ///
446    /// When `Some`, `get_context` reads from the frozen view instead of
447    /// the live L0. A `None` is a no-op (live reads), so threading it is always
448    /// safe — callers that do not pin a snapshot leave it `None`.
449    pub fn set_read_snapshot(&mut self, snapshot: Option<uni_store::runtime::SnapshotView>) {
450        self.read_snapshot = snapshot;
451    }
452
453    /// Attach a per-transaction VID/EID reservoir. When set, CREATE/MERGE
454    /// paths in `execute_create_pattern` pull IDs from the reservoir's
455    /// pre-reserved cache, amortizing the global `IdAllocator` mutex.
456    pub fn set_id_reservoir(&mut self, r: Arc<uni_store::runtime::TxIdReservoir>) {
457        self.id_reservoir = Some(r);
458    }
459
460    /// Attach a custom scalar function registry for user-defined functions.
461    pub fn set_custom_functions(
462        &mut self,
463        registry: Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>,
464    ) {
465        self.custom_function_registry = Some(registry);
466    }
467
468    /// Set a cooperative cancellation token for in-flight query cancellation.
469    pub fn set_cancellation_token(&mut self, token: tokio_util::sync::CancellationToken) {
470        self.cancellation_token = Some(token);
471    }
472
473    /// Build a `QueryContext` from the current writer or standalone L0 manager.
474    /// When `transaction_l0_override` is set, it is used as the transaction L0 —
475    /// this is how private-per-transaction L0 buffers become visible to reads
476    /// without requiring the writer lock at tx creation.
477    pub(crate) async fn get_context(&self) -> Option<QueryContext> {
478        if let Some(writer) = &self.writer {
479            // Prefer the override (private tx L0) over the writer's slot
480            let tx_l0 = self.transaction_l0_override.clone();
481            // Component C1: when a snapshot is pinned, read from the frozen
482            // generation (`main` + `extra`) so concurrent commits are invisible;
483            // `transaction_l0` stays live for read-your-writes. `None` (the
484            // default, when no snapshot is pinned) ⇒ live reads.
485            let mut ctx = match &self.read_snapshot {
486                Some(snap) => {
487                    QueryContext::new_with_pending(snap.main.clone(), tx_l0, snap.extra.clone())
488                }
489                None => QueryContext::new_with_pending(
490                    writer.l0_manager.get_current(),
491                    tx_l0,
492                    writer.l0_manager.get_pending_flush(),
493                ),
494            };
495            ctx.set_deadline(Instant::now() + self.config.query_timeout);
496            if let Some(ref token) = self.cancellation_token {
497                ctx.set_cancellation_token(token.clone());
498            }
499            Some(ctx)
500        } else {
501            self.l0_manager.as_ref().map(|m| {
502                let mut ctx = QueryContext::new(m.get_current());
503                ctx.set_deadline(Instant::now() + self.config.query_timeout);
504                if let Some(ref token) = self.cancellation_token {
505                    ctx.set_cancellation_token(token.clone());
506                }
507                ctx
508            })
509        }
510    }
511
    /// Total ordering for Cypher ORDER BY, including cross-type comparisons.
    ///
    /// The comparison proceeds in stages:
    /// 1. If both operands yield a temporal via `extract_temporal_value`,
    ///    compare them as temporals.
    /// 2. For string/temporal mixes, try the expression evaluator's `<`/`>`/`=`
    ///    (see `try_eval_ordering`) so temporal strings compare with temporal
    ///    semantics.
    /// 3. Otherwise order by `order_by_type_rank`. Same-rank values use
    ///    per-type rules.
    ///
    /// NOTE: This uses a different cross-type rank from
    /// [`cypher_cross_type_cmp`]/[`cypher_type_rank`] (the `min`/`max`
    /// aggregation ordering). The divergence is intentional: sort ordering and
    /// aggregation ordering follow distinct openCypher rules, so the two must
    /// stay separate.
    pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
        use std::cmp::Ordering;

        let temporal_a = Self::extract_temporal_value(a);
        let temporal_b = Self::extract_temporal_value(b);

        // Both sides have a temporal interpretation: compare as temporals.
        if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
            return Self::compare_temporal(ta, tb);
        }

        // Temporal strings (e.g. "1984-10-11T...") and Value::Temporal should
        // compare using Cypher temporal semantics when compatible.
        if matches!(
            (a, b),
            (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
        ) && let Some(ord) = Self::try_eval_ordering(a, b)
        {
            return ord;
        }
        // A string against a non-`Temporal` value that still has a temporal
        // interpretation: evaluate against its `Value::Temporal` form.
        // (`temporal_b` / `temporal_a` are moved here; they are not used later.)
        if let (Value::String(_), Some(tb)) = (a, temporal_b)
            && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
        {
            return ord;
        }
        if let (Some(ta), Value::String(_)) = (temporal_a, b)
            && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
        {
            return ord;
        }

        // Different type families: order by family rank alone.
        let ra = Self::order_by_type_rank(a);
        let rb = Self::order_by_type_rank(b);
        if ra != rb {
            return ra.cmp(&rb);
        }

        // Same rank: apply the per-type rule.
        match (a, b) {
            (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
            (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
            (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
            (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
            (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
            (Value::String(l), Value::String(r)) => {
                // Use eval_binary_op on the original references to avoid cloning.
                // Fall back to plain lexicographic order when it gives no answer.
                Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
            }
            (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
            (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
            (Value::Int(l), Value::Int(r)) => l.cmp(r),
            // NaN floats get their own rank (9) in `order_by_type_rank`, so the
            // NaN checks below only matter when both sides are NaN. They are
            // kept defensively.
            (Value::Float(l), Value::Float(r)) => {
                if l.is_nan() && r.is_nan() {
                    Ordering::Equal
                } else if l.is_nan() {
                    Ordering::Greater
                } else if r.is_nan() {
                    Ordering::Less
                } else {
                    l.partial_cmp(r).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Int(l), Value::Float(r)) => {
                if r.is_nan() {
                    Ordering::Less
                } else {
                    (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Float(l), Value::Int(r)) => {
                if l.is_nan() {
                    Ordering::Greater
                } else {
                    l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
                }
            }
            (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
            // Vectors: element-wise `total_cmp`, then shorter-first.
            (Value::Vector(l), Value::Vector(r)) => {
                for (lv, rv) in l.iter().zip(r.iter()) {
                    let ord = lv.total_cmp(rv);
                    if ord != Ordering::Equal {
                        return ord;
                    }
                }
                l.len().cmp(&r.len())
            }
            // Same-rank pairs without a specific rule (e.g. a node-like `Map`
            // against a `Node`) compare as equal.
            _ => Ordering::Equal,
        }
    }
606
607    fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
608        use std::cmp::Ordering;
609        if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
610            Some(Ordering::Less)
611        } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
612            Some(Ordering::Greater)
613        } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
614            Some(Ordering::Equal)
615        } else {
616            None
617        }
618    }
619
620    /// Cypher ORDER BY total precedence:
621    /// MAP < NODE < RELATIONSHIP < LIST < PATH < STRING < BOOLEAN < TEMPORAL < NUMBER < NaN < NULL
622    fn order_by_type_rank(v: &Value) -> u8 {
623        match v {
624            Value::Map(map) => Self::map_order_rank(map),
625            Value::Node(_) => 1,
626            Value::Edge(_) => 2,
627            Value::List(_) => 3,
628            Value::Path(_) => 4,
629            Value::String(_) => 5,
630            Value::Bool(_) => 6,
631            Value::Temporal(_) => 7,
632            Value::Int(_) => 8,
633            Value::Float(f) if f.is_nan() => 9,
634            Value::Float(_) => 8,
635            Value::Null => 10,
636            Value::Bytes(_) | Value::Vector(_) => 11,
637            _ => 11,
638        }
639    }
640
641    fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
642        if Self::map_as_temporal(map).is_some() {
643            7
644        } else if map.contains_key("nodes")
645            && (map.contains_key("relationships") || map.contains_key("edges"))
646        {
647            4
648        } else if map.contains_key("_eid")
649            || map.contains_key("_src")
650            || map.contains_key("_dst")
651            || map.contains_key("_type")
652            || map.contains_key("_type_name")
653        {
654            2
655        } else if map.contains_key("_vid")
656            || map.contains_key("_labels")
657            || map.contains_key("_label")
658        {
659            1
660        } else {
661            0
662        }
663    }
664
665    fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
666        crate::query::expr_eval::temporal_from_value(value)
667    }
668
669    fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
670        crate::query::expr_eval::temporal_from_map_wrapper(map)
671    }
672
673    fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
674        left.iter()
675            .zip(right.iter())
676            .map(|(l, r)| Self::compare_values(l, r))
677            .find(|o| o.is_ne())
678            .unwrap_or_else(|| left.len().cmp(&right.len()))
679    }
680
681    fn compare_maps(
682        left: &HashMap<String, Value>,
683        right: &HashMap<String, Value>,
684    ) -> std::cmp::Ordering {
685        let mut l_pairs: Vec<_> = left.iter().collect();
686        let mut r_pairs: Vec<_> = right.iter().collect();
687        l_pairs.sort_by_key(|(k, _)| *k);
688        r_pairs.sort_by_key(|(k, _)| *k);
689
690        l_pairs
691            .iter()
692            .zip(r_pairs.iter())
693            .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
694            .find(|o| o.is_ne())
695            .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
696    }
697
698    fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
699        let mut l_labels = left.labels.clone();
700        let mut r_labels = right.labels.clone();
701        l_labels.sort();
702        r_labels.sort();
703
704        l_labels
705            .cmp(&r_labels)
706            .then_with(|| left.vid.cmp(&right.vid))
707            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
708    }
709
710    fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
711        left.edge_type
712            .cmp(&right.edge_type)
713            .then_with(|| left.src.cmp(&right.src))
714            .then_with(|| left.dst.cmp(&right.dst))
715            .then_with(|| left.eid.cmp(&right.eid))
716            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
717    }
718
719    fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
720        left.nodes
721            .iter()
722            .zip(right.nodes.iter())
723            .map(|(l, r)| Self::compare_nodes(l, r))
724            .find(|o| o.is_ne())
725            .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
726            .then_with(|| {
727                left.edges
728                    .iter()
729                    .zip(right.edges.iter())
730                    .map(|(l, r)| Self::compare_edges(l, r))
731                    .find(|o| o.is_ne())
732                    .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
733            })
734    }
735
736    fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
737        match (left, right) {
738            (
739                TemporalValue::Date {
740                    days_since_epoch: l,
741                },
742                TemporalValue::Date {
743                    days_since_epoch: r,
744                },
745            ) => l.cmp(r),
746            (
747                TemporalValue::LocalTime {
748                    nanos_since_midnight: l,
749                },
750                TemporalValue::LocalTime {
751                    nanos_since_midnight: r,
752                },
753            ) => l.cmp(r),
754            (
755                TemporalValue::Time {
756                    nanos_since_midnight: lm,
757                    offset_seconds: lo,
758                },
759                TemporalValue::Time {
760                    nanos_since_midnight: rm,
761                    offset_seconds: ro,
762                },
763            ) => {
764                let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
765                let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
766                l_utc.cmp(&r_utc)
767            }
768            (
769                TemporalValue::LocalDateTime {
770                    nanos_since_epoch: l,
771                },
772                TemporalValue::LocalDateTime {
773                    nanos_since_epoch: r,
774                },
775            ) => l.cmp(r),
776            (
777                TemporalValue::DateTime {
778                    nanos_since_epoch: l,
779                    ..
780                },
781                TemporalValue::DateTime {
782                    nanos_since_epoch: r,
783                    ..
784                },
785            ) => l.cmp(r),
786            (
787                TemporalValue::Duration {
788                    months: lm,
789                    days: ld,
790                    nanos: ln,
791                },
792                TemporalValue::Duration {
793                    months: rm,
794                    days: rd,
795                    nanos: rn,
796                },
797            ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
798            _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
799        }
800    }
801
802    fn temporal_variant_rank(v: &TemporalValue) -> u8 {
803        match v {
804            TemporalValue::Date { .. } => 0,
805            TemporalValue::LocalTime { .. } => 1,
806            TemporalValue::Time { .. } => 2,
807            TemporalValue::LocalDateTime { .. } => 3,
808            TemporalValue::DateTime { .. } => 4,
809            TemporalValue::Duration { .. } => 5,
810            TemporalValue::Btic { .. } => 6,
811        }
812    }
813}
814
/// Combined output of a `PROFILE` query execution.
///
/// Contains both the logical plan explanation and per-operator runtime
/// statistics collected during execution.
///
/// Fields are serialized in declaration order by the serde derive, so
/// reordering them changes the serialized form.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileOutput {
    /// Logical plan explanation with index usage and cost estimates.
    pub explain: crate::query::planner::ExplainOutput,
    /// Per-operator timing and memory statistics.
    ///
    /// As produced by `Executor::profile`, there are two shapes:
    /// - DataFusion-executed plans yield one entry per physical operator, in
    ///   post-order (children before parents).
    /// - DDL/admin plans yield a single aggregate entry.
    pub runtime_stats: Vec<OperatorStats>,
    /// Wall-clock time for the entire execution in milliseconds.
    ///
    /// `Executor::profile` starts this clock after the plan has been
    /// explained. It therefore covers execution plus row conversion, but not
    /// explain generation.
    pub total_time_ms: u64,
    /// Peak memory used during execution in bytes.
    ///
    /// NOTE: not tracked yet. `Executor::profile` always sets this to 0.
    pub peak_memory_bytes: usize,
}
830
/// Runtime statistics for a single logical plan operator.
///
/// For DataFusion-executed queries these entries describe physical
/// operators: one per node of the executed `ExecutionPlan` tree.
///
/// Fields are serialized in declaration order by the serde derive, so
/// reordering them changes the serialized form.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperatorStats {
    /// Human-readable operator name (e.g., `"GraphScan"`, `"Filter"`).
    ///
    /// For DataFusion plans this is `ExecutionPlan::name()`. For DDL/admin
    /// plans it is the fixed string `"DDL/Admin Execution"`.
    pub operator: String,
    /// Number of rows produced by this operator.
    ///
    /// This is 0 when the operator reported no metrics.
    pub actual_rows: usize,
    /// Wall-clock time spent in this operator in milliseconds.
    ///
    /// For DataFusion plans this is derived from the `elapsed_compute`
    /// metric. NOTE(review): DataFusion aggregates that metric across
    /// partitions, so it may exceed true wall-clock time for parallel
    /// operators; confirm.
    pub time_ms: f64,
    /// Memory allocated by this operator in bytes.
    ///
    /// NOTE: not tracked yet. The profiling code in this module always sets
    /// this to 0.
    pub memory_bytes: usize,
    /// Number of index cache hits (if applicable).
    ///
    /// The profiling code in this module currently always leaves this `None`.
    pub index_hits: Option<usize>,
    /// Number of index cache misses (if applicable).
    ///
    /// The profiling code in this module currently always leaves this `None`.
    pub index_misses: Option<usize>,
}
847
848/// Walk a DataFusion physical plan tree (post-order DFS) and collect
849/// per-operator metrics recorded by `BaselineMetrics` during execution.
850///
851/// Children are visited before parents so the resulting `Vec` flows from
852/// data-producers (leaf scans) up to consumers (projections, filters).
853fn collect_plan_metrics(
854    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
855) -> Vec<OperatorStats> {
856    let mut stats = Vec::new();
857    collect_plan_metrics_inner(plan, &mut stats);
858    stats
859}
860
861fn collect_plan_metrics_inner(
862    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
863    out: &mut Vec<OperatorStats>,
864) {
865    // Recurse into children first (post-order)
866    for child in plan.children() {
867        collect_plan_metrics_inner(child, out);
868    }
869
870    let operator = plan.name().to_string();
871
872    let (actual_rows, time_ms) = match plan.metrics() {
873        Some(metrics) => {
874            let rows = metrics.output_rows().unwrap_or(0);
875            // elapsed_compute() returns nanoseconds
876            let nanos = metrics.elapsed_compute().unwrap_or(0);
877            let ms = nanos as f64 / 1_000_000.0;
878            (rows, ms)
879        }
880        None => (0, 0.0),
881    };
882
883    out.push(OperatorStats {
884        operator,
885        actual_rows,
886        time_ms,
887        memory_bytes: 0,
888        index_hits: None,
889        index_misses: None,
890    });
891}
892
893impl Executor {
894    /// Profiles query execution and returns results with per-operator timing
895    /// statistics extracted from the DataFusion physical plan tree.
896    pub async fn profile(
897        &self,
898        plan: crate::query::planner::LogicalPlan,
899        params: &HashMap<String, Value>,
900    ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
901        // Generate ExplainOutput first
902        let planner =
903            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
904        let explain_output = planner.explain_logical_plan(&plan)?;
905
906        let start = Instant::now();
907
908        let prop_manager = self.create_prop_manager();
909
910        // DDL/admin queries don't flow through DataFusion — fall back to
911        // single aggregate stat.
912        let (results, stats) = if Self::is_ddl_or_admin(&plan) {
913            let results = self
914                .execute_subplan(plan, &prop_manager, params, None)
915                .await?;
916            let elapsed = start.elapsed();
917            let stats = vec![OperatorStats {
918                operator: "DDL/Admin Execution".to_string(),
919                actual_rows: results.len(),
920                time_ms: elapsed.as_secs_f64() * 1000.0,
921                memory_bytes: 0,
922                index_hits: None,
923                index_misses: None,
924            }];
925            (results, stats)
926        } else {
927            let (batches, execution_plan) = self
928                .execute_datafusion_with_plan(plan, &prop_manager, params)
929                .await?;
930            let results = self.record_batches_to_rows(batches)?;
931            let stats = collect_plan_metrics(&execution_plan);
932            (results, stats)
933        };
934
935        let total_time = start.elapsed();
936
937        Ok((
938            results,
939            ProfileOutput {
940                explain: explain_output,
941                runtime_stats: stats,
942                total_time_ms: total_time.as_millis() as u64,
943                peak_memory_bytes: 0,
944            },
945        ))
946    }
947
948    fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
949        uni_store::runtime::property_manager::PropertyManager::new(
950            self.storage.clone(),
951            self.storage.schema_manager_arc(),
952            1000,
953        )
954    }
955}
956
#[cfg(test)]
mod tests {
    use super::*;

    // ── Accumulator tests ────────────────────────────────────────────

    #[test]
    fn test_accumulator_count_basic() {
        let mut acc = Accumulator::new("COUNT", false);
        acc.update(&Value::Int(1), false);
        acc.update(&Value::Null, false); // null skipped
        acc.update(&Value::Int(2), false);
        assert_eq!(acc.finish(), Value::Int(2));
    }

    #[test]
    fn test_accumulator_count_wildcard() {
        let mut acc = Accumulator::new("COUNT", false);
        acc.update(&Value::Int(1), true);
        acc.update(&Value::Null, true); // wildcard counts nulls
        acc.update(&Value::Int(2), true);
        assert_eq!(acc.finish(), Value::Int(3));
    }

    #[test]
    fn test_accumulator_sum() {
        let mut acc = Accumulator::new("SUM", false);
        acc.update(&Value::Int(10), false);
        acc.update(&Value::Float(2.5), false);
        acc.update(&Value::Null, false); // null skipped
        assert_eq!(acc.finish(), Value::Float(12.5));
    }

    #[test]
    fn test_accumulator_avg() {
        let mut acc = Accumulator::new("AVG", false);
        acc.update(&Value::Int(10), false);
        acc.update(&Value::Int(20), false);
        acc.update(&Value::Int(30), false);
        assert_eq!(acc.finish(), Value::Float(20.0));
    }

    #[test]
    fn test_accumulator_avg_empty() {
        let acc = Accumulator::new("AVG", false);
        assert_eq!(acc.finish(), Value::Null);
    }

    #[test]
    fn test_accumulator_min_max() {
        let mut min_acc = Accumulator::new("MIN", false);
        let mut max_acc = Accumulator::new("MAX", false);
        for v in &[Value::Int(3), Value::Int(1), Value::Int(2)] {
            min_acc.update(v, false);
            max_acc.update(v, false);
        }
        assert_eq!(min_acc.finish(), Value::Int(1));
        assert_eq!(max_acc.finish(), Value::Int(3));
    }

    #[test]
    fn test_accumulator_collect() {
        let mut acc = Accumulator::new("COLLECT", false);
        acc.update(&Value::String("a".into()), false);
        acc.update(&Value::Null, false); // null skipped
        acc.update(&Value::String("b".into()), false);
        assert_eq!(
            acc.finish(),
            Value::List(vec![Value::String("a".into()), Value::String("b".into()),])
        );
    }

    #[test]
    fn test_accumulator_count_distinct() {
        let mut acc = Accumulator::new("COUNT", true);
        acc.update(&Value::String("a".into()), false);
        acc.update(&Value::String("b".into()), false);
        acc.update(&Value::String("a".into()), false); // duplicate
        acc.update(&Value::Null, false); // null skipped
        assert_eq!(acc.finish(), Value::Int(2));
    }

    #[test]
    fn test_accumulator_percentile_empty() {
        let acc = Accumulator::new_with_percentile("PERCENTILEDISC", false, 0.5);
        assert_eq!(acc.finish(), Value::Null);
    }

    // ── compare_values tests ─────────────────────────────────────────

    #[test]
    fn test_compare_values_int_ordering() {
        assert!(Executor::compare_values(&Value::Int(1), &Value::Int(2)).is_lt());
        assert!(Executor::compare_values(&Value::Int(5), &Value::Int(5)).is_eq());
        assert!(Executor::compare_values(&Value::Int(9), &Value::Int(3)).is_gt());
    }

    #[test]
    fn test_compare_values_null_last() {
        // Null should sort after everything
        assert!(Executor::compare_values(&Value::Int(1), &Value::Null).is_lt());
        assert!(Executor::compare_values(&Value::Null, &Value::Int(1)).is_gt());
        assert!(Executor::compare_values(&Value::Null, &Value::Null).is_eq());
    }

    #[test]
    fn test_compare_values_cross_type_rank() {
        // String should sort before Bool which sorts before Int
        assert!(Executor::compare_values(&Value::String("z".into()), &Value::Bool(false)).is_lt());
        assert!(Executor::compare_values(&Value::Bool(true), &Value::Int(1)).is_lt());
    }

    #[test]
    fn test_compare_values_lists() {
        let l1 = Value::List(vec![Value::Int(1), Value::Int(2)]);
        let l2 = Value::List(vec![Value::Int(1), Value::Int(3)]);
        assert!(Executor::compare_values(&l1, &l2).is_lt());
    }

    // ── ordering helper tests ────────────────────────────────────────

    /// Builds a property map of integer values from `(key, value)` pairs.
    fn int_map(pairs: &[(&str, i64)]) -> HashMap<String, Value> {
        pairs
            .iter()
            .map(|&(k, v)| (k.to_string(), Value::Int(v)))
            .collect()
    }

    #[test]
    fn test_compare_lists_prefix_sorts_first() {
        let short = [Value::Int(1)];
        let long = [Value::Int(1), Value::Int(2)];
        let empty: [Value; 0] = [];
        assert!(Executor::compare_lists(&short, &long).is_lt());
        assert!(Executor::compare_lists(&long, &short).is_gt());
        assert!(Executor::compare_lists(&empty, &short).is_lt());
        assert!(Executor::compare_lists(&empty, &empty).is_eq());
    }

    #[test]
    fn test_compare_maps_sorted_keys_then_values() {
        // The first differing (sorted) key decides before any value is looked at.
        assert!(Executor::compare_maps(&int_map(&[("a", 9)]), &int_map(&[("b", 1)])).is_lt());
        // Same keys: values decide.
        assert!(Executor::compare_maps(&int_map(&[("a", 1)]), &int_map(&[("a", 2)])).is_lt());
        // A key-sorted prefix sorts first.
        assert!(
            Executor::compare_maps(&int_map(&[("a", 1)]), &int_map(&[("b", 2), ("a", 1)]))
                .is_lt()
        );
        // Insertion order is irrelevant.
        assert!(
            Executor::compare_maps(
                &int_map(&[("a", 1), ("b", 2)]),
                &int_map(&[("b", 2), ("a", 1)])
            )
            .is_eq()
        );
    }

    #[test]
    fn test_compare_temporal_time_normalizes_offset() {
        // 10:00+02:00 is 08:00 UTC: earlier than 09:00+00:00, equal to 08:00+00:00.
        let ten_plus_two = TemporalValue::Time {
            nanos_since_midnight: 36_000_000_000_000,
            offset_seconds: 7_200,
        };
        let nine_utc = TemporalValue::Time {
            nanos_since_midnight: 32_400_000_000_000,
            offset_seconds: 0,
        };
        let eight_utc = TemporalValue::Time {
            nanos_since_midnight: 28_800_000_000_000,
            offset_seconds: 0,
        };
        assert!(Executor::compare_temporal(&ten_plus_two, &nine_utc).is_lt());
        assert!(Executor::compare_temporal(&ten_plus_two, &eight_utc).is_eq());
    }

    #[test]
    fn test_compare_temporal_duration_is_lexicographic() {
        let one_month = TemporalValue::Duration {
            months: 1,
            days: 0,
            nanos: 0,
        };
        let forty_days = TemporalValue::Duration {
            months: 0,
            days: 40,
            nanos: 0,
        };
        assert!(Executor::compare_temporal(&one_month, &forty_days).is_gt());
    }

    #[test]
    fn test_compare_temporal_cross_variant_uses_rank() {
        let date = TemporalValue::Date {
            days_since_epoch: 100,
        };
        let duration = TemporalValue::Duration {
            months: 0,
            days: 0,
            nanos: 0,
        };
        assert!(Executor::compare_temporal(&date, &duration).is_lt());
        assert!(Executor::compare_temporal(&duration, &date).is_gt());
    }
}