Skip to main content

uni_query/query/executor/
core.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::PropertyManager;
13use uni_store::QueryContext;
14use uni_store::runtime::l0_manager::L0Manager;
15use uni_store::runtime::writer::Writer;
16use uni_store::storage::manager::StorageManager;
17use uni_xervo::runtime::ModelRuntime;
18
19use crate::query::expr_eval::eval_binary_op;
20use crate::types::QueryWarning;
21
22use super::procedure::ProcedureRegistry;
23
/// Mutable accumulator for Cypher aggregate functions (COUNT, SUM, AVG, ...).
///
/// Created with `Accumulator::new` / `Accumulator::new_with_percentile`, fed
/// one value per input row through `Accumulator::update`, and turned into the
/// aggregate's result with `Accumulator::finish`.
#[derive(Debug)]
pub(crate) enum Accumulator {
    /// `COUNT`: number of non-null values, or of all rows for the wildcard form.
    Count(i64),
    /// `SUM`: running total of inputs that convert via `Value::as_f64`.
    Sum(f64),
    /// `MIN`: smallest non-null value so far (min/max cross-type ordering);
    /// `None` until the first non-null value arrives.
    Min(Option<Value>),
    /// `MAX`: largest non-null value so far (min/max cross-type ordering);
    /// `None` until the first non-null value arrives.
    Max(Option<Value>),
    /// `AVG`: running sum and count of inputs that convert via `Value::as_f64`.
    Avg { sum: f64, count: i64 },
    /// `COLLECT`: every non-null value, in arrival order.
    Collect(Vec<Value>),
    /// `COUNT(DISTINCT …)`: set of canonicalized non-null values.
    CountDistinct(HashSet<Value>),
    /// `percentileDisc`: numeric inputs plus the requested percentile.
    PercentileDisc { values: Vec<f64>, percentile: f64 },
    /// `percentileCont`: numeric inputs plus the requested percentile.
    PercentileCont { values: Vec<f64>, percentile: f64 },
}
37
38/// Convert f64 to Value, preserving integer representation when possible.
39fn numeric_to_value(val: f64) -> Value {
40    if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
41        Value::Int(val as i64)
42    } else {
43        Value::Float(val)
44    }
45}
46
47/// Canonical key for `COUNT(DISTINCT …)`: integral finite floats collapse to
48/// `Int` so `1` and `1.0` count once (matching `cypher_eq`'s numeric
49/// coercion); every other value keeps its type. The accumulator previously
50/// stringified values, which also collapsed `1` with `'1'` — a wrong answer.
51fn distinct_key(val: &Value) -> Value {
52    match val {
53        Value::Float(f)
54            if f.is_finite()
55                && f.fract() == 0.0
56                && *f >= i64::MIN as f64
57                && *f <= i64::MAX as f64 =>
58        {
59            Value::Int(*f as i64)
60        }
61        other => other.clone(),
62    }
63}
64
65/// Cross-type ordering rank for Cypher min/max (lower rank = smaller).
66///
67/// NOTE: This is deliberately a *different* ordering from
68/// [`Executor::compare_values`] (used for `ORDER BY`). The two systems must
69/// not be merged: `min`/`max` follow the openCypher aggregation ordering
70/// (Null < List < String < Boolean < Number), whereas `ORDER BY` follows the
71/// openCypher comparability/ordering rules for sort, which rank the type
72/// families differently. Unifying them would silently break one or the other.
73fn cypher_type_rank(val: &Value) -> u8 {
74    match val {
75        Value::Null => 0,
76        Value::List(_) => 1,
77        Value::String(_) => 2,
78        Value::Bool(_) => 3,
79        Value::Int(_) | Value::Float(_) => 4,
80        _ => 5,
81    }
82}
83
84/// Compare two Cypher values for min/max with cross-type ordering.
85fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
86    use std::cmp::Ordering;
87    let ra = cypher_type_rank(a);
88    let rb = cypher_type_rank(b);
89    if ra != rb {
90        return ra.cmp(&rb);
91    }
92    match (a, b) {
93        (Value::Int(l), Value::Int(r)) => l.cmp(r),
94        (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
95        (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
96        (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
97        (Value::String(l), Value::String(r)) => l.cmp(r),
98        (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
99        _ => Ordering::Equal,
100    }
101}
102
103impl Accumulator {
104    pub(crate) fn new(op: &str, distinct: bool) -> Self {
105        Self::new_with_percentile(op, distinct, 0.0)
106    }
107
108    pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
109        let op_upper = op.to_uppercase();
110        match op_upper.as_str() {
111            "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
112            "COUNT" => Accumulator::Count(0),
113            "SUM" => Accumulator::Sum(0.0),
114            "MIN" => Accumulator::Min(None),
115            "MAX" => Accumulator::Max(None),
116            "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
117            "COLLECT" => Accumulator::Collect(Vec::new()),
118            "PERCENTILEDISC" => Accumulator::PercentileDisc {
119                values: Vec::new(),
120                percentile,
121            },
122            "PERCENTILECONT" => Accumulator::PercentileCont {
123                values: Vec::new(),
124                percentile,
125            },
126            _ => Accumulator::Count(0),
127        }
128    }
129
130    pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
131        match self {
132            Accumulator::Count(c) => {
133                if is_wildcard || !val.is_null() {
134                    *c += 1;
135                }
136            }
137            Accumulator::Sum(s) => {
138                if let Some(n) = val.as_f64() {
139                    *s += n;
140                }
141            }
142            Accumulator::Min(current) => {
143                if !val.is_null() {
144                    *current = Some(match current.take() {
145                        None => val.clone(),
146                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
147                        Some(cur) => cur,
148                    });
149                }
150            }
151            Accumulator::Max(current) => {
152                if !val.is_null() {
153                    *current = Some(match current.take() {
154                        None => val.clone(),
155                        Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
156                        Some(cur) => cur,
157                    });
158                }
159            }
160            Accumulator::Avg { sum, count } => {
161                if let Some(n) = val.as_f64() {
162                    *sum += n;
163                    *count += 1;
164                }
165            }
166            Accumulator::Collect(v) => {
167                if !val.is_null() {
168                    v.push(val.clone());
169                }
170            }
171            Accumulator::CountDistinct(s) => {
172                if !val.is_null() {
173                    s.insert(distinct_key(val));
174                }
175            }
176            Accumulator::PercentileDisc { values, .. }
177            | Accumulator::PercentileCont { values, .. } => {
178                if let Some(n) = val.as_f64() {
179                    values.push(n);
180                }
181            }
182        }
183    }
184
185    pub(crate) fn finish(&self) -> Value {
186        match self {
187            Accumulator::Count(c) => Value::Int(*c),
188            Accumulator::Sum(s) => numeric_to_value(*s),
189            Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
190            Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
191            Accumulator::Avg { sum, count } => {
192                if *count > 0 {
193                    Value::Float(*sum / (*count as f64))
194                } else {
195                    Value::Null
196                }
197            }
198            Accumulator::Collect(v) => Value::List(v.clone()),
199            Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
200            Accumulator::PercentileDisc { values, percentile } => {
201                if values.is_empty() {
202                    return Value::Null;
203                }
204                let mut sorted = values.clone();
205                sorted.sort_by(|a, b| a.total_cmp(b));
206                let n = sorted.len();
207                let idx = (percentile * (n as f64 - 1.0)).round() as usize;
208                numeric_to_value(sorted[idx.min(n - 1)])
209            }
210            Accumulator::PercentileCont { values, percentile } => {
211                if values.is_empty() {
212                    return Value::Null;
213                }
214                let mut sorted = values.clone();
215                sorted.sort_by(|a, b| a.total_cmp(b));
216                let n = sorted.len();
217                if n == 1 {
218                    return Value::Float(sorted[0]);
219                }
220                let pos = percentile * (n as f64 - 1.0);
221                let lower = (pos.floor() as usize).min(n - 1);
222                let upper = (pos.ceil() as usize).min(n - 1);
223                if lower == upper {
224                    Value::Float(sorted[lower])
225                } else {
226                    let frac = pos - lower as f64;
227                    Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
228                }
229            }
230        }
231    }
232}
233
/// Cache key for parsed generation expressions: `(label_name, property_name)`.
///
/// Keys `Executor::gen_expr_cache`, which maps each pair to its parsed `Expr`.
pub(crate) type GenExprCacheKey = (String, String);
236
/// Query executor: runs logical plans against a Uni storage backend.
///
/// `Executor` bridges the logical query plan produced by [`crate::query::planner::QueryPlanner`]
/// with the underlying `StorageManager`. It handles both read-only sessions and
/// write-enabled sessions (via `Writer` and `L0Manager`).
///
/// # Cloning
///
/// `Executor` is cheap to clone. Shared state (storage, registries, caches,
/// runtimes) lives behind `Arc`s; only `config` and `file_sandbox` are cloned
/// by value. Clone it freely to share across tasks.
///
/// **Note on `Clone` semantics**: the derived `Clone` would alias the
/// `warnings` `Arc<Mutex<…>>`, making warnings collected on the clone
/// bleed into the original. A clone is intended to be a "fresh executor
/// borrowing shared state," so the manual impl below installs a fresh
/// `warnings` accumulator. All other Arc-shared state is intentionally
/// shared (caches, registries, runtimes).
// M-PUBLIC-DEBUG: Manual impl because Writer/ModelRuntime do not implement Debug.
pub struct Executor {
    /// Live storage manager. Reads go through `effective_storage`, which
    /// substitutes a snapshot-pinned manager when the read snapshot carries one.
    pub(crate) storage: Arc<StorageManager>,
    /// Writer for mutations. When present, `get_context` builds read contexts
    /// from the writer's L0 manager.
    pub(crate) writer: Option<Arc<Writer>>,
    /// Standalone L0 manager; `get_context` consults it only when no `writer`
    /// is attached.
    pub(crate) l0_manager: Option<Arc<L0Manager>>,
    /// Algorithm registry; `new` installs a default-constructed one.
    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
    /// Whether execution should run in a transaction context (see
    /// `set_use_transaction`).
    pub(crate) use_transaction: bool,
    /// File sandbox configuration for BACKUP/COPY/EXPORT commands
    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
    /// Runtime configuration; `get_context` derives each query's deadline from
    /// its `query_timeout`.
    pub(crate) config: uni_common::config::UniConfig,
    /// Cache for parsed generation expressions to avoid re-parsing on every row
    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
    /// External procedure registry for test/user-defined procedures.
    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Uni-Xervo runtime used by vector auto-embedding paths.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Warnings collected during the last execution.
    pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
    /// Private transaction L0 buffer for query context and mutations.
    /// Used by Transaction to route reads and writes through a private L0 buffer
    /// without requiring the writer lock at transaction-creation time.
    pub(crate) transaction_l0_override:
        Option<Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>>,
    /// Per-transaction VID/EID reservoir. When set, CREATE/MERGE paths pull
    /// IDs from this batched cache instead of `Writer::next_vid()` /
    /// `Writer::next_eid()`, amortizing the `IdAllocator`'s global mutex.
    pub(crate) id_reservoir: Option<Arc<uni_store::runtime::TxIdReservoir>>,
    /// User-defined custom scalar function registry.
    pub(crate) custom_function_registry:
        Option<Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>>,
    /// Cooperative cancellation token. Passed to `QueryContext` and
    /// `GraphExecutionContext` so in-flight operators can detect cancellation.
    pub(crate) cancellation_token: Option<tokio_util::sync::CancellationToken>,
    /// Shared `PropertyManager` to reuse inside `create_datafusion_planner`
    /// instead of allocating a fresh one (with empty LRU caches) per query.
    /// When `None`, the planner falls back to constructing from the
    /// `&PropertyManager` parameter — preserving existing behavior for any
    /// caller that does not set this.
    pub(crate) prop_manager_arc: Option<Arc<PropertyManager>>,
    /// Pre-built DataFusion `SessionContext` template with all Cypher UDFs
    /// already registered. When set AND no custom UDFs are installed,
    /// `create_datafusion_planner` clones this Arc (O(1)) instead of building
    /// a fresh `SessionContext` and re-registering UDFs every query
    /// (~140 µs/query saved on the InMemory backend).
    pub(crate) df_session_template: Option<Arc<datafusion::execution::context::SessionContext>>,
    /// Pinned L0 snapshot for snapshot-isolated reads (Component C1). When set,
    /// `get_context` builds the read context from this frozen view instead of the
    /// live L0, so a transaction's reads are isolated from concurrent commits.
    /// `None` (the default, when no snapshot has been pinned) means live reads.
    pub(crate) read_snapshot: Option<uni_store::runtime::SnapshotView>,
}
300    /// `get_context` builds the read context from this frozen view instead of the
301    /// live L0, so a transaction's reads are isolated from concurrent commits.
302    /// `None` (the default, when no snapshot has been pinned) means live reads.
303    pub(crate) read_snapshot: Option<uni_store::runtime::SnapshotView>,
304}
305
306impl std::fmt::Debug for Executor {
307    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
308        f.debug_struct("Executor")
309            .field("use_transaction", &self.use_transaction)
310            .field("has_writer", &self.writer.is_some())
311            .field("has_l0_manager", &self.l0_manager.is_some())
312            .field("has_xervo_runtime", &self.xervo_runtime.is_some())
313            .finish_non_exhaustive()
314    }
315}
316
317impl Clone for Executor {
318    /// Clone an `Executor`, sharing all Arc-backed state but installing a
319    /// fresh `warnings` accumulator. Warnings are query-scoped; aliasing
320    /// the Mutex across clones would let one query's warnings spill into
321    /// another.
322    fn clone(&self) -> Self {
323        Self {
324            storage: self.storage.clone(),
325            writer: self.writer.clone(),
326            l0_manager: self.l0_manager.clone(),
327            algo_registry: self.algo_registry.clone(),
328            use_transaction: self.use_transaction,
329            file_sandbox: self.file_sandbox.clone(),
330            config: self.config.clone(),
331            gen_expr_cache: self.gen_expr_cache.clone(),
332            procedure_registry: self.procedure_registry.clone(),
333            xervo_runtime: self.xervo_runtime.clone(),
334            // Fresh warnings per clone — see struct doc.
335            warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
336            transaction_l0_override: self.transaction_l0_override.clone(),
337            id_reservoir: self.id_reservoir.clone(),
338            custom_function_registry: self.custom_function_registry.clone(),
339            cancellation_token: self.cancellation_token.clone(),
340            prop_manager_arc: self.prop_manager_arc.clone(),
341            df_session_template: self.df_session_template.clone(),
342            read_snapshot: self.read_snapshot.clone(),
343        }
344    }
345}
346
347impl Executor {
348    /// Create a read-only executor backed by the given storage manager.
349    pub fn new(storage: Arc<StorageManager>) -> Self {
350        // M4: auto-wire the host plugin registry so low-level test
351        // setups that bypass `Uni::build` still see `uni.schema.*` and
352        // `uni.algo.*` procedures (which no longer have hardcoded
353        // dispatch arms in `procedure_call.rs`).
354        let proc_registry = Arc::new(ProcedureRegistry::new());
355        proc_registry.set_plugin_registry(crate::procedures_plugin::default_host_plugin_registry());
356        Self {
357            storage,
358            writer: None,
359            l0_manager: None,
360            algo_registry: Arc::new(AlgorithmRegistry::new()),
361            use_transaction: false,
362            file_sandbox: uni_common::config::FileSandboxConfig::default(),
363            config: uni_common::config::UniConfig::default(),
364            gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
365            procedure_registry: Some(proc_registry),
366            xervo_runtime: None,
367            warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
368            transaction_l0_override: None,
369            id_reservoir: None,
370            custom_function_registry: None,
371            cancellation_token: None,
372            prop_manager_arc: None,
373            df_session_template: None,
374            read_snapshot: None,
375        }
376    }
377
378    /// Install a shared `PropertyManager` so the planner can reuse it
379    /// instead of constructing a fresh one per query.
380    ///
381    /// Reusing the caller's `Arc<PropertyManager>` skips the LRU cache
382    /// allocation cost (~80 µs/query in the InMemory backend).
383    pub fn set_prop_manager(&mut self, pm: Arc<PropertyManager>) {
384        self.prop_manager_arc = Some(pm);
385    }
386
387    /// Install a pre-built `SessionContext` template with Cypher UDFs already
388    /// registered, so the planner can clone it (O(1)) instead of rebuilding.
389    ///
390    /// When custom UDFs are also installed, the template is bypassed and a
391    /// fresh `SessionContext` is built for that query (preserves UDF
392    /// isolation across executors).
393    pub fn set_df_session_template(
394        &mut self,
395        tmpl: Arc<datafusion::execution::context::SessionContext>,
396    ) {
397        self.df_session_template = Some(tmpl);
398    }
399
400    /// Create a write-enabled executor with an attached `Writer`.
401    pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<Writer>) -> Self {
402        let mut executor = Self::new(storage);
403        executor.writer = Some(writer);
404        executor
405    }
406
407    /// Attach an external procedure registry for user-defined procedures.
408    pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
409        self.procedure_registry = Some(registry);
410    }
411
412    /// Attach or detach the Uni-Xervo model runtime for vector auto-embedding.
413    pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
414        self.xervo_runtime = runtime;
415    }
416
417    /// Set the file sandbox configuration for BACKUP/COPY/EXPORT commands.
418    /// MUST be called with sandboxed config in server mode.
419    pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
420        self.file_sandbox = sandbox;
421    }
422
423    /// Apply a runtime configuration to this executor.
424    pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
425        self.config = config;
426    }
427
428    /// Validate a file path against the sandbox configuration.
429    pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
430        self.file_sandbox
431            .validate_path(path)
432            .map_err(|e| anyhow!("Path validation failed: {}", e))
433    }
434
435    /// Attach a `Writer` after construction, enabling write operations.
436    pub fn set_writer(&mut self, writer: Arc<Writer>) {
437        self.writer = Some(writer);
438    }
439
440    /// Take all collected warnings from the last execution, leaving the collector empty.
441    pub fn take_warnings(&self) -> Vec<QueryWarning> {
442        self.warnings
443            .lock()
444            .map(|mut w| std::mem::take(&mut *w))
445            .unwrap_or_default()
446    }
447
448    /// Configure whether query execution should operate within a transaction context.
449    pub fn set_use_transaction(&mut self, use_transaction: bool) {
450        self.use_transaction = use_transaction;
451    }
452
453    /// Set a private transaction L0 buffer for both read visibility (QueryContext)
454    /// and mutation routing.
455    pub fn set_transaction_l0(
456        &mut self,
457        l0: Arc<parking_lot::RwLock<uni_store::runtime::l0::L0Buffer>>,
458    ) {
459        self.transaction_l0_override = Some(l0);
460    }
461
462    /// Attach a pinned L0 snapshot for snapshot-isolated reads (Component C1).
463    ///
464    /// When `Some`, `get_context` reads from the frozen view instead of
465    /// the live L0. A `None` is a no-op (live reads), so threading it is always
466    /// safe — callers that do not pin a snapshot leave it `None`.
467    pub fn set_read_snapshot(&mut self, snapshot: Option<uni_store::runtime::SnapshotView>) {
468        self.read_snapshot = snapshot;
469    }
470
471    /// The storage manager queries should read through: the transaction's
472    /// version-pinned manager when the read snapshot carries one (C2 — L1
473    /// scans then filter to `_version <= started_at_version`, so a flush
474    /// completing mid-transaction cannot leak post-snapshot rows), else the
475    /// live manager.
476    pub(crate) fn effective_storage(&self) -> Arc<StorageManager> {
477        self.read_snapshot
478            .as_ref()
479            .and_then(|s| s.pinned_storage.clone())
480            .unwrap_or_else(|| self.storage.clone())
481    }
482
483    /// Attach a per-transaction VID/EID reservoir. When set, CREATE/MERGE
484    /// paths in `execute_create_pattern` pull IDs from the reservoir's
485    /// pre-reserved cache, amortizing the global `IdAllocator` mutex.
486    pub fn set_id_reservoir(&mut self, r: Arc<uni_store::runtime::TxIdReservoir>) {
487        self.id_reservoir = Some(r);
488    }
489
490    /// Attach a custom scalar function registry for user-defined functions.
491    pub fn set_custom_functions(
492        &mut self,
493        registry: Arc<uni_query_functions::custom_functions::CustomFunctionRegistry>,
494    ) {
495        self.custom_function_registry = Some(registry);
496    }
497
498    /// Set a cooperative cancellation token for in-flight query cancellation.
499    pub fn set_cancellation_token(&mut self, token: tokio_util::sync::CancellationToken) {
500        self.cancellation_token = Some(token);
501    }
502
503    /// Build a `QueryContext` from the current writer or standalone L0 manager.
504    /// When `transaction_l0_override` is set, it is used as the transaction L0 —
505    /// this is how private-per-transaction L0 buffers become visible to reads
506    /// without requiring the writer lock at tx creation.
507    pub(crate) async fn get_context(&self) -> Option<QueryContext> {
508        if let Some(writer) = &self.writer {
509            // Prefer the override (private tx L0) over the writer's slot
510            let tx_l0 = self.transaction_l0_override.clone();
511            // Component C1: when a snapshot is pinned, read from the frozen
512            // generation (`main` + `extra`) so concurrent commits are invisible;
513            // `transaction_l0` stays live for read-your-writes. `None` (the
514            // default, when no snapshot is pinned) ⇒ live reads.
515            let mut ctx = match &self.read_snapshot {
516                Some(snap) => {
517                    QueryContext::new_with_pending(snap.main.clone(), tx_l0, snap.extra.clone())
518                }
519                None => QueryContext::new_with_pending(
520                    writer.l0_manager.get_current(),
521                    tx_l0,
522                    writer.l0_manager.get_pending_flush(),
523                ),
524            };
525            ctx.set_deadline(Instant::now() + self.config.query_timeout);
526            if let Some(ref token) = self.cancellation_token {
527                ctx.set_cancellation_token(token.clone());
528            }
529            Some(ctx)
530        } else {
531            self.l0_manager.as_ref().map(|m| {
532                let mut ctx = QueryContext::new(m.get_current());
533                ctx.set_deadline(Instant::now() + self.config.query_timeout);
534                if let Some(ref token) = self.cancellation_token {
535                    ctx.set_cancellation_token(token.clone());
536                }
537                ctx
538            })
539        }
540    }
541
542    /// Total ordering for Cypher ORDER BY, including cross-type comparisons.
543    ///
544    /// NOTE: This uses a different cross-type rank from
545    /// [`cypher_cross_type_cmp`]/[`cypher_type_rank`] (the `min`/`max`
546    /// aggregation ordering). The divergence is intentional — sort ordering
547    /// and aggregation ordering follow distinct openCypher rules — so the two
548    /// must stay separate.
549    pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
550        use std::cmp::Ordering;
551
552        let temporal_a = Self::extract_temporal_value(a);
553        let temporal_b = Self::extract_temporal_value(b);
554
555        if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
556            return Self::compare_temporal(ta, tb);
557        }
558
559        // Temporal strings (e.g. "1984-10-11T...") and Value::Temporal should
560        // compare using Cypher temporal semantics when compatible.
561        if matches!(
562            (a, b),
563            (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
564        ) && let Some(ord) = Self::try_eval_ordering(a, b)
565        {
566            return ord;
567        }
568        if let (Value::String(_), Some(tb)) = (a, temporal_b)
569            && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
570        {
571            return ord;
572        }
573        if let (Some(ta), Value::String(_)) = (temporal_a, b)
574            && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
575        {
576            return ord;
577        }
578
579        let ra = Self::order_by_type_rank(a);
580        let rb = Self::order_by_type_rank(b);
581        if ra != rb {
582            return ra.cmp(&rb);
583        }
584
585        match (a, b) {
586            (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
587            (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
588            (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
589            (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
590            (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
591            (Value::String(l), Value::String(r)) => {
592                // Use eval_binary_op on the original references to avoid cloning.
593                Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
594            }
595            (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
596            (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
597            (Value::Int(l), Value::Int(r)) => l.cmp(r),
598            (Value::Float(l), Value::Float(r)) => {
599                if l.is_nan() && r.is_nan() {
600                    Ordering::Equal
601                } else if l.is_nan() {
602                    Ordering::Greater
603                } else if r.is_nan() {
604                    Ordering::Less
605                } else {
606                    l.partial_cmp(r).unwrap_or(Ordering::Equal)
607                }
608            }
609            (Value::Int(l), Value::Float(r)) => {
610                if r.is_nan() {
611                    Ordering::Less
612                } else {
613                    (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
614                }
615            }
616            (Value::Float(l), Value::Int(r)) => {
617                if l.is_nan() {
618                    Ordering::Greater
619                } else {
620                    l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
621                }
622            }
623            (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
624            (Value::Vector(l), Value::Vector(r)) => {
625                for (lv, rv) in l.iter().zip(r.iter()) {
626                    let ord = lv.total_cmp(rv);
627                    if ord != Ordering::Equal {
628                        return ord;
629                    }
630                }
631                l.len().cmp(&r.len())
632            }
633            _ => Ordering::Equal,
634        }
635    }
636
637    fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
638        use std::cmp::Ordering;
639        if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
640            Some(Ordering::Less)
641        } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
642            Some(Ordering::Greater)
643        } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
644            Some(Ordering::Equal)
645        } else {
646            None
647        }
648    }
649
650    /// Cypher ORDER BY total precedence:
651    /// MAP < NODE < RELATIONSHIP < LIST < PATH < STRING < BOOLEAN < TEMPORAL < NUMBER < NaN < NULL
652    fn order_by_type_rank(v: &Value) -> u8 {
653        match v {
654            Value::Map(map) => Self::map_order_rank(map),
655            Value::Node(_) => 1,
656            Value::Edge(_) => 2,
657            Value::List(_) => 3,
658            Value::Path(_) => 4,
659            Value::String(_) => 5,
660            Value::Bool(_) => 6,
661            Value::Temporal(_) => 7,
662            Value::Int(_) => 8,
663            Value::Float(f) if f.is_nan() => 9,
664            Value::Float(_) => 8,
665            Value::Null => 10,
666            Value::Bytes(_) | Value::Vector(_) => 11,
667            _ => 11,
668        }
669    }
670
671    fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
672        if Self::map_as_temporal(map).is_some() {
673            7
674        } else if map.contains_key("nodes")
675            && (map.contains_key("relationships") || map.contains_key("edges"))
676        {
677            4
678        } else if map.contains_key("_eid")
679            || map.contains_key("_src")
680            || map.contains_key("_dst")
681            || map.contains_key("_type")
682            || map.contains_key("_type_name")
683        {
684            2
685        } else if map.contains_key("_vid")
686            || map.contains_key("_labels")
687            || map.contains_key("_label")
688        {
689            1
690        } else {
691            0
692        }
693    }
694
695    fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
696        crate::query::expr_eval::temporal_from_value(value)
697    }
698
699    fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
700        crate::query::expr_eval::temporal_from_map_wrapper(map)
701    }
702
703    fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
704        left.iter()
705            .zip(right.iter())
706            .map(|(l, r)| Self::compare_values(l, r))
707            .find(|o| o.is_ne())
708            .unwrap_or_else(|| left.len().cmp(&right.len()))
709    }
710
711    fn compare_maps(
712        left: &HashMap<String, Value>,
713        right: &HashMap<String, Value>,
714    ) -> std::cmp::Ordering {
715        let mut l_pairs: Vec<_> = left.iter().collect();
716        let mut r_pairs: Vec<_> = right.iter().collect();
717        l_pairs.sort_by_key(|(k, _)| *k);
718        r_pairs.sort_by_key(|(k, _)| *k);
719
720        l_pairs
721            .iter()
722            .zip(r_pairs.iter())
723            .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
724            .find(|o| o.is_ne())
725            .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
726    }
727
728    fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
729        let mut l_labels = left.labels.clone();
730        let mut r_labels = right.labels.clone();
731        l_labels.sort();
732        r_labels.sort();
733
734        l_labels
735            .cmp(&r_labels)
736            .then_with(|| left.vid.cmp(&right.vid))
737            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
738    }
739
740    fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
741        left.edge_type
742            .cmp(&right.edge_type)
743            .then_with(|| left.src.cmp(&right.src))
744            .then_with(|| left.dst.cmp(&right.dst))
745            .then_with(|| left.eid.cmp(&right.eid))
746            .then_with(|| Self::compare_maps(&left.properties, &right.properties))
747    }
748
749    fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
750        left.nodes
751            .iter()
752            .zip(right.nodes.iter())
753            .map(|(l, r)| Self::compare_nodes(l, r))
754            .find(|o| o.is_ne())
755            .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
756            .then_with(|| {
757                left.edges
758                    .iter()
759                    .zip(right.edges.iter())
760                    .map(|(l, r)| Self::compare_edges(l, r))
761                    .find(|o| o.is_ne())
762                    .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
763            })
764    }
765
766    fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
767        match (left, right) {
768            (
769                TemporalValue::Date {
770                    days_since_epoch: l,
771                },
772                TemporalValue::Date {
773                    days_since_epoch: r,
774                },
775            ) => l.cmp(r),
776            (
777                TemporalValue::LocalTime {
778                    nanos_since_midnight: l,
779                },
780                TemporalValue::LocalTime {
781                    nanos_since_midnight: r,
782                },
783            ) => l.cmp(r),
784            (
785                TemporalValue::Time {
786                    nanos_since_midnight: lm,
787                    offset_seconds: lo,
788                },
789                TemporalValue::Time {
790                    nanos_since_midnight: rm,
791                    offset_seconds: ro,
792                },
793            ) => {
794                let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
795                let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
796                l_utc.cmp(&r_utc)
797            }
798            (
799                TemporalValue::LocalDateTime {
800                    nanos_since_epoch: l,
801                },
802                TemporalValue::LocalDateTime {
803                    nanos_since_epoch: r,
804                },
805            ) => l.cmp(r),
806            (
807                TemporalValue::DateTime {
808                    nanos_since_epoch: l,
809                    ..
810                },
811                TemporalValue::DateTime {
812                    nanos_since_epoch: r,
813                    ..
814                },
815            ) => l.cmp(r),
816            (
817                TemporalValue::Duration {
818                    months: lm,
819                    days: ld,
820                    nanos: ln,
821                },
822                TemporalValue::Duration {
823                    months: rm,
824                    days: rd,
825                    nanos: rn,
826                },
827            ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
828            _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
829        }
830    }
831
832    fn temporal_variant_rank(v: &TemporalValue) -> u8 {
833        match v {
834            TemporalValue::Date { .. } => 0,
835            TemporalValue::LocalTime { .. } => 1,
836            TemporalValue::Time { .. } => 2,
837            TemporalValue::LocalDateTime { .. } => 3,
838            TemporalValue::DateTime { .. } => 4,
839            TemporalValue::Duration { .. } => 5,
840            TemporalValue::Btic { .. } => 6,
841        }
842    }
843}
844
/// Combined output of a `PROFILE` query execution.
///
/// Contains both the logical plan explanation and per-operator runtime
/// statistics collected during execution.
///
/// Field order also determines the serialized field order (serde derive), so
/// reordering fields changes the serialized output.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileOutput {
    /// Logical plan explanation with index usage and cost estimates.
    pub explain: crate::query::planner::ExplainOutput,
    /// Per-operator timing statistics.
    ///
    /// Filled by `Executor::profile`: one entry per DataFusion physical
    /// operator in post-order, or a single aggregate entry for DDL/admin plans.
    pub runtime_stats: Vec<OperatorStats>,
    /// Elapsed time for the execution in milliseconds.
    ///
    /// `Executor::profile` starts this clock after plan explanation, so
    /// explanation time is not included.
    pub total_time_ms: u64,
    /// Peak memory used during execution in bytes.
    ///
    /// NOTE: `Executor::profile` currently always sets this to 0 (memory is
    /// not tracked yet).
    pub peak_memory_bytes: usize,
}
860
861/// Runtime statistics for a single logical plan operator.
862#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
863pub struct OperatorStats {
864    /// Human-readable operator name (e.g., `"GraphScan"`, `"Filter"`).
865    pub operator: String,
866    /// Number of rows produced by this operator.
867    pub actual_rows: usize,
868    /// Wall-clock time spent in this operator in milliseconds.
869    pub time_ms: f64,
870    /// Memory allocated by this operator in bytes.
871    pub memory_bytes: usize,
872    /// Number of index cache hits (if applicable).
873    pub index_hits: Option<usize>,
874    /// Number of index cache misses (if applicable).
875    pub index_misses: Option<usize>,
876}
877
878/// Walk a DataFusion physical plan tree (post-order DFS) and collect
879/// per-operator metrics recorded by `BaselineMetrics` during execution.
880///
881/// Children are visited before parents so the resulting `Vec` flows from
882/// data-producers (leaf scans) up to consumers (projections, filters).
883fn collect_plan_metrics(
884    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
885) -> Vec<OperatorStats> {
886    let mut stats = Vec::new();
887    collect_plan_metrics_inner(plan, &mut stats);
888    stats
889}
890
891fn collect_plan_metrics_inner(
892    plan: &Arc<dyn datafusion::physical_plan::ExecutionPlan>,
893    out: &mut Vec<OperatorStats>,
894) {
895    // Recurse into children first (post-order)
896    for child in plan.children() {
897        collect_plan_metrics_inner(child, out);
898    }
899
900    let operator = plan.name().to_string();
901
902    let (actual_rows, time_ms) = match plan.metrics() {
903        Some(metrics) => {
904            let rows = metrics.output_rows().unwrap_or(0);
905            // elapsed_compute() returns nanoseconds
906            let nanos = metrics.elapsed_compute().unwrap_or(0);
907            let ms = nanos as f64 / 1_000_000.0;
908            (rows, ms)
909        }
910        None => (0, 0.0),
911    };
912
913    out.push(OperatorStats {
914        operator,
915        actual_rows,
916        time_ms,
917        memory_bytes: 0,
918        index_hits: None,
919        index_misses: None,
920    });
921}
922
923impl Executor {
924    /// Profiles query execution and returns results with per-operator timing
925    /// statistics extracted from the DataFusion physical plan tree.
926    pub async fn profile(
927        &self,
928        plan: crate::query::planner::LogicalPlan,
929        params: &HashMap<String, Value>,
930    ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
931        // Generate ExplainOutput first
932        let planner =
933            crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
934        let explain_output = planner.explain_logical_plan(&plan)?;
935
936        let start = Instant::now();
937
938        let prop_manager = self.create_prop_manager();
939
940        // DDL/admin queries don't flow through DataFusion — fall back to
941        // single aggregate stat.
942        let (results, stats) = if Self::is_ddl_or_admin(&plan) {
943            let results = self
944                .execute_subplan(plan, &prop_manager, params, None)
945                .await?;
946            let elapsed = start.elapsed();
947            let stats = vec![OperatorStats {
948                operator: "DDL/Admin Execution".to_string(),
949                actual_rows: results.len(),
950                time_ms: elapsed.as_secs_f64() * 1000.0,
951                memory_bytes: 0,
952                index_hits: None,
953                index_misses: None,
954            }];
955            (results, stats)
956        } else {
957            let (batches, execution_plan) = self
958                .execute_datafusion_with_plan(plan, &prop_manager, params)
959                .await?;
960            let results = self.record_batches_to_rows(batches)?;
961            let stats = collect_plan_metrics(&execution_plan);
962            (results, stats)
963        };
964
965        let total_time = start.elapsed();
966
967        Ok((
968            results,
969            ProfileOutput {
970                explain: explain_output,
971                runtime_stats: stats,
972                total_time_ms: total_time.as_millis() as u64,
973                peak_memory_bytes: 0,
974            },
975        ))
976    }
977
978    fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
979        uni_store::runtime::property_manager::PropertyManager::new(
980            self.storage.clone(),
981            self.storage.schema_manager_arc(),
982            1000,
983        )
984    }
985}
986
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// Builds an accumulator for `func`, feeds it `values` in order, and
    /// returns the finished aggregate.
    fn aggregate(func: &str, distinct: bool, wildcard: bool, values: &[Value]) -> Value {
        let mut acc = Accumulator::new(func, distinct);
        for value in values {
            acc.update(value, wildcard);
        }
        acc.finish()
    }

    /// Shorthand for a string `Value`.
    fn text(s: &str) -> Value {
        Value::String(s.into())
    }

    /// Shorthand for `Executor::compare_values`.
    fn order(a: &Value, b: &Value) -> Ordering {
        Executor::compare_values(a, b)
    }

    // ── Accumulator tests ────────────────────────────────────────────

    #[test]
    fn test_accumulator_count_basic() {
        // Plain COUNT skips the null.
        let counted = aggregate(
            "COUNT",
            false,
            false,
            &[Value::Int(1), Value::Null, Value::Int(2)],
        );
        assert_eq!(counted, Value::Int(2));
    }

    #[test]
    fn test_accumulator_count_wildcard() {
        // COUNT(*) counts every row, nulls included.
        let counted = aggregate(
            "COUNT",
            false,
            true,
            &[Value::Int(1), Value::Null, Value::Int(2)],
        );
        assert_eq!(counted, Value::Int(3));
    }

    #[test]
    fn test_accumulator_sum() {
        let total = aggregate(
            "SUM",
            false,
            false,
            &[Value::Int(10), Value::Float(2.5), Value::Null],
        );
        assert_eq!(total, Value::Float(12.5));
    }

    #[test]
    fn test_accumulator_avg() {
        let mean = aggregate(
            "AVG",
            false,
            false,
            &[Value::Int(10), Value::Int(20), Value::Int(30)],
        );
        assert_eq!(mean, Value::Float(20.0));
    }

    #[test]
    fn test_accumulator_avg_empty() {
        let untouched = Accumulator::new("AVG", false);
        assert_eq!(untouched.finish(), Value::Null);
    }

    #[test]
    fn test_accumulator_min_max() {
        let inputs = [Value::Int(3), Value::Int(1), Value::Int(2)];
        assert_eq!(aggregate("MIN", false, false, &inputs), Value::Int(1));
        assert_eq!(aggregate("MAX", false, false, &inputs), Value::Int(3));
    }

    #[test]
    fn test_accumulator_collect() {
        // COLLECT drops nulls and keeps insertion order.
        let collected = aggregate("COLLECT", false, false, &[text("a"), Value::Null, text("b")]);
        assert_eq!(collected, Value::List(vec![text("a"), text("b")]));
    }

    #[test]
    fn test_accumulator_count_distinct() {
        let distinct = aggregate(
            "COUNT",
            true,
            false,
            &[text("a"), text("b"), text("a"), Value::Null],
        );
        assert_eq!(distinct, Value::Int(2));
    }

    #[test]
    fn test_accumulator_percentile_empty() {
        let untouched = Accumulator::new_with_percentile("PERCENTILEDISC", false, 0.5);
        assert_eq!(untouched.finish(), Value::Null);
    }

    // ── compare_values tests ─────────────────────────────────────────

    #[test]
    fn test_compare_values_int_ordering() {
        assert_eq!(order(&Value::Int(1), &Value::Int(2)), Ordering::Less);
        assert_eq!(order(&Value::Int(5), &Value::Int(5)), Ordering::Equal);
        assert_eq!(order(&Value::Int(9), &Value::Int(3)), Ordering::Greater);
    }

    #[test]
    fn test_compare_values_null_last() {
        // Null sorts after every other value.
        assert_eq!(order(&Value::Int(1), &Value::Null), Ordering::Less);
        assert_eq!(order(&Value::Null, &Value::Int(1)), Ordering::Greater);
        assert_eq!(order(&Value::Null, &Value::Null), Ordering::Equal);
    }

    #[test]
    fn test_compare_values_cross_type_rank() {
        // String < Bool < Int across types.
        assert_eq!(order(&text("z"), &Value::Bool(false)), Ordering::Less);
        assert_eq!(order(&Value::Bool(true), &Value::Int(1)), Ordering::Less);
    }

    #[test]
    fn test_compare_values_lists() {
        let smaller = Value::List(vec![Value::Int(1), Value::Int(2)]);
        let larger = Value::List(vec![Value::Int(1), Value::Int(3)]);
        assert_eq!(order(&smaller, &larger), Ordering::Less);
    }

    /// COUNT(DISTINCT) must coerce numerically (1 vs 1.0 count once, matching
    /// `cypher_eq`) while keeping cross-type identity (1 vs '1' vs true count
    /// separately — the old stringifying accumulator collapsed 1 with '1').
    #[test]
    fn test_count_distinct_type_identity() {
        let count_distinct = |values: &[Value]| aggregate("COUNT", true, false, values);

        assert_eq!(
            count_distinct(&[Value::Int(1), Value::Float(1.0)]),
            Value::Int(1),
            "1 and 1.0 count once"
        );
        assert_eq!(
            count_distinct(&[Value::Int(1), Value::String("1".to_string())]),
            Value::Int(2),
            "1 and '1' are distinct"
        );
        assert_eq!(
            count_distinct(&[Value::Int(1), Value::Bool(true)]),
            Value::Int(2),
            "1 and true are distinct"
        );

        // Non-integral and non-finite floats keep float identity; NaN and
        // signed zero follow Value's normalized Hash/Eq (one bucket each).
        assert_eq!(
            count_distinct(&[
                Value::Float(1.5),
                Value::Float(1.5),
                Value::Float(f64::NAN),
                Value::Float(f64::NAN),
                Value::Float(0.0),
                Value::Float(-0.0),
            ]),
            Value::Int(3),
            "1.5, NaN, 0 -> 3 buckets"
        );
    }
}