1use anyhow::{Result, anyhow};
5use std::collections::{HashMap, HashSet};
6use std::sync::Arc;
7use std::time::Instant;
8use tokio::sync::RwLock;
9use uni_algo::algo::AlgorithmRegistry;
10use uni_common::{TemporalValue, Value};
11use uni_cypher::ast::{BinaryOp, Expr};
12use uni_store::QueryContext;
13use uni_store::runtime::l0_manager::L0Manager;
14use uni_store::runtime::writer::Writer;
15use uni_store::storage::manager::StorageManager;
16use uni_xervo::runtime::ModelRuntime;
17
18use crate::query::expr_eval::eval_binary_op;
19use crate::types::QueryWarning;
20
21use super::procedure::ProcedureRegistry;
22
/// Running state for one aggregate function over a group of rows.
///
/// Values are fed in through `update`; `finish` turns the state into the
/// final result value.
#[derive(Debug)]
pub(crate) enum Accumulator {
    /// Row count; `update` skips nulls unless the call is a wildcard count.
    Count(i64),
    /// Running sum of every input that converts via `as_f64`.
    Sum(f64),
    /// Smallest non-null value seen so far, if any.
    Min(Option<Value>),
    /// Largest non-null value seen so far, if any.
    Max(Option<Value>),
    /// Running sum and count of the inputs that convert via `as_f64`.
    Avg { sum: f64, count: i64 },
    /// Every non-null input, in arrival order.
    Collect(Vec<Value>),
    /// String renderings (`to_string`) of the distinct non-null inputs.
    CountDistinct(HashSet<String>),
    /// Numeric inputs plus the requested percentile (discrete variant).
    PercentileDisc { values: Vec<f64>, percentile: f64 },
    /// Numeric inputs plus the requested percentile (interpolating variant).
    PercentileCont { values: Vec<f64>, percentile: f64 },
}
36
37fn numeric_to_value(val: f64) -> Value {
39 if val.fract() == 0.0 && val >= i64::MIN as f64 && val <= i64::MAX as f64 {
40 Value::Int(val as i64)
41 } else {
42 Value::Float(val)
43 }
44}
45
46fn cypher_type_rank(val: &Value) -> u8 {
48 match val {
49 Value::Null => 0,
50 Value::List(_) => 1,
51 Value::String(_) => 2,
52 Value::Bool(_) => 3,
53 Value::Int(_) | Value::Float(_) => 4,
54 _ => 5,
55 }
56}
57
58fn cypher_cross_type_cmp(a: &Value, b: &Value) -> std::cmp::Ordering {
60 use std::cmp::Ordering;
61 let ra = cypher_type_rank(a);
62 let rb = cypher_type_rank(b);
63 if ra != rb {
64 return ra.cmp(&rb);
65 }
66 match (a, b) {
67 (Value::Int(l), Value::Int(r)) => l.cmp(r),
68 (Value::Float(l), Value::Float(r)) => l.partial_cmp(r).unwrap_or(Ordering::Equal),
69 (Value::Int(l), Value::Float(r)) => (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal),
70 (Value::Float(l), Value::Int(r)) => l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal),
71 (Value::String(l), Value::String(r)) => l.cmp(r),
72 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
73 _ => Ordering::Equal,
74 }
75}
76
77impl Accumulator {
78 pub(crate) fn new(op: &str, distinct: bool) -> Self {
79 Self::new_with_percentile(op, distinct, 0.0)
80 }
81
82 pub(crate) fn new_with_percentile(op: &str, distinct: bool, percentile: f64) -> Self {
83 let op_upper = op.to_uppercase();
84 match op_upper.as_str() {
85 "COUNT" if distinct => Accumulator::CountDistinct(HashSet::new()),
86 "COUNT" => Accumulator::Count(0),
87 "SUM" => Accumulator::Sum(0.0),
88 "MIN" => Accumulator::Min(None),
89 "MAX" => Accumulator::Max(None),
90 "AVG" => Accumulator::Avg { sum: 0.0, count: 0 },
91 "COLLECT" => Accumulator::Collect(Vec::new()),
92 "PERCENTILEDISC" => Accumulator::PercentileDisc {
93 values: Vec::new(),
94 percentile,
95 },
96 "PERCENTILECONT" => Accumulator::PercentileCont {
97 values: Vec::new(),
98 percentile,
99 },
100 _ => Accumulator::Count(0),
101 }
102 }
103
104 pub(crate) fn update(&mut self, val: &Value, is_wildcard: bool) {
105 match self {
106 Accumulator::Count(c) => {
107 if is_wildcard || !val.is_null() {
108 *c += 1;
109 }
110 }
111 Accumulator::Sum(s) => {
112 if let Some(n) = val.as_f64() {
113 *s += n;
114 }
115 }
116 Accumulator::Min(current) => {
117 if !val.is_null() {
118 *current = Some(match current.take() {
119 None => val.clone(),
120 Some(cur) if cypher_cross_type_cmp(val, &cur).is_lt() => val.clone(),
121 Some(cur) => cur,
122 });
123 }
124 }
125 Accumulator::Max(current) => {
126 if !val.is_null() {
127 *current = Some(match current.take() {
128 None => val.clone(),
129 Some(cur) if cypher_cross_type_cmp(val, &cur).is_gt() => val.clone(),
130 Some(cur) => cur,
131 });
132 }
133 }
134 Accumulator::Avg { sum, count } => {
135 if let Some(n) = val.as_f64() {
136 *sum += n;
137 *count += 1;
138 }
139 }
140 Accumulator::Collect(v) => {
141 if !val.is_null() {
142 v.push(val.clone());
143 }
144 }
145 Accumulator::CountDistinct(s) => {
146 if !val.is_null() {
147 s.insert(val.to_string());
148 }
149 }
150 Accumulator::PercentileDisc { values, .. }
151 | Accumulator::PercentileCont { values, .. } => {
152 if let Some(n) = val.as_f64() {
153 values.push(n);
154 }
155 }
156 }
157 }
158
159 pub(crate) fn finish(&self) -> Value {
160 match self {
161 Accumulator::Count(c) => Value::Int(*c),
162 Accumulator::Sum(s) => numeric_to_value(*s),
163 Accumulator::Min(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
164 Accumulator::Max(opt) => opt.as_ref().cloned().unwrap_or(Value::Null),
165 Accumulator::Avg { sum, count } => {
166 if *count > 0 {
167 Value::Float(*sum / (*count as f64))
168 } else {
169 Value::Null
170 }
171 }
172 Accumulator::Collect(v) => Value::List(v.clone()),
173 Accumulator::CountDistinct(s) => Value::Int(s.len() as i64),
174 Accumulator::PercentileDisc { values, percentile } => {
175 if values.is_empty() {
176 return Value::Null;
177 }
178 let mut sorted = values.clone();
179 sorted.sort_by(|a, b| a.total_cmp(b));
180 let n = sorted.len();
181 let idx = (percentile * (n as f64 - 1.0)).round() as usize;
182 numeric_to_value(sorted[idx.min(n - 1)])
183 }
184 Accumulator::PercentileCont { values, percentile } => {
185 if values.is_empty() {
186 return Value::Null;
187 }
188 let mut sorted = values.clone();
189 sorted.sort_by(|a, b| a.total_cmp(b));
190 let n = sorted.len();
191 if n == 1 {
192 return Value::Float(sorted[0]);
193 }
194 let pos = percentile * (n as f64 - 1.0);
195 let lower = (pos.floor() as usize).min(n - 1);
196 let upper = (pos.ceil() as usize).min(n - 1);
197 if lower == upper {
198 Value::Float(sorted[lower])
199 } else {
200 let frac = pos - lower as f64;
201 Value::Float(sorted[lower] + frac * (sorted[upper] - sorted[lower]))
202 }
203 }
204 }
205 }
206}
207
/// Key type of `Executor::gen_expr_cache`: a pair of strings.
///
/// NOTE(review): what each component stands for is not visible here —
/// confirm against the code that populates the cache.
pub(crate) type GenExprCacheKey = (String, String);
210
/// Query executor holding the shared handles needed to run a plan.
///
/// Clones share the `Arc`-backed state, including the expression cache and
/// the warning buffer.
#[derive(Clone)]
pub struct Executor {
    /// Storage backend; also the source of the schema used by `profile`.
    pub(crate) storage: Arc<StorageManager>,
    /// Optional writer. When set, `get_context` builds the query context from it.
    pub(crate) writer: Option<Arc<RwLock<Writer>>>,
    /// L0 manager used by `get_context` when no writer is set.
    pub(crate) l0_manager: Option<Arc<L0Manager>>,
    /// Registry of algorithms available to queries.
    pub(crate) algo_registry: Arc<AlgorithmRegistry>,
    /// Flag set through `set_use_transaction`.
    /// NOTE(review): its consumers are not visible in this part of the file.
    pub(crate) use_transaction: bool,
    /// Sandbox configuration consulted by `validate_path`.
    pub(crate) file_sandbox: uni_common::config::FileSandboxConfig,
    /// Executor configuration; `query_timeout` determines the query deadline.
    pub(crate) config: uni_common::config::UniConfig,
    /// Shared cache of expressions keyed by `GenExprCacheKey`.
    pub(crate) gen_expr_cache: Arc<RwLock<HashMap<GenExprCacheKey, Expr>>>,
    /// Optional procedure registry, installed via `set_procedure_registry`.
    pub(crate) procedure_registry: Option<Arc<ProcedureRegistry>>,
    /// Optional model runtime, installed via `set_xervo_runtime`.
    pub(crate) xervo_runtime: Option<Arc<ModelRuntime>>,
    /// Buffer of query warnings; drained by `take_warnings`.
    pub(crate) warnings: Arc<std::sync::Mutex<Vec<QueryWarning>>>,
}
241
242impl std::fmt::Debug for Executor {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 f.debug_struct("Executor")
245 .field("use_transaction", &self.use_transaction)
246 .field("has_writer", &self.writer.is_some())
247 .field("has_l0_manager", &self.l0_manager.is_some())
248 .field("has_xervo_runtime", &self.xervo_runtime.is_some())
249 .finish_non_exhaustive()
250 }
251}
252
253impl Executor {
254 pub fn new(storage: Arc<StorageManager>) -> Self {
256 Self {
257 storage,
258 writer: None,
259 l0_manager: None,
260 algo_registry: Arc::new(AlgorithmRegistry::new()),
261 use_transaction: false,
262 file_sandbox: uni_common::config::FileSandboxConfig::default(),
263 config: uni_common::config::UniConfig::default(),
264 gen_expr_cache: Arc::new(RwLock::new(HashMap::new())),
265 procedure_registry: None,
266 xervo_runtime: None,
267 warnings: Arc::new(std::sync::Mutex::new(Vec::new())),
268 }
269 }
270
271 pub fn new_with_writer(storage: Arc<StorageManager>, writer: Arc<RwLock<Writer>>) -> Self {
273 let mut executor = Self::new(storage);
274 executor.writer = Some(writer);
275 executor
276 }
277
    /// Installs the procedure registry, replacing any previous one.
    pub fn set_procedure_registry(&mut self, registry: Arc<ProcedureRegistry>) {
        self.procedure_registry = Some(registry);
    }
282
    /// Sets the model runtime; passing `None` clears it.
    pub fn set_xervo_runtime(&mut self, runtime: Option<Arc<ModelRuntime>>) {
        self.xervo_runtime = runtime;
    }
287
    /// Replaces the sandbox configuration that `validate_path` checks against.
    pub fn set_file_sandbox(&mut self, sandbox: uni_common::config::FileSandboxConfig) {
        self.file_sandbox = sandbox;
    }
293
    /// Replaces the executor configuration (including `query_timeout`).
    pub fn set_config(&mut self, config: uni_common::config::UniConfig) {
        self.config = config;
    }
298
299 pub(crate) fn validate_path(&self, path: &str) -> Result<std::path::PathBuf> {
301 self.file_sandbox
302 .validate_path(path)
303 .map_err(|e| anyhow!("Path validation failed: {}", e))
304 }
305
    /// Attaches a writer, replacing any previous one.
    pub fn set_writer(&mut self, writer: Arc<RwLock<Writer>>) {
        self.writer = Some(writer);
    }
310
311 pub fn take_warnings(&self) -> Vec<QueryWarning> {
313 self.warnings
314 .lock()
315 .map(|mut w| std::mem::take(&mut *w))
316 .unwrap_or_default()
317 }
318
    /// Sets the `use_transaction` flag.
    pub fn set_use_transaction(&mut self, use_transaction: bool) {
        self.use_transaction = use_transaction;
    }
323
324 pub(crate) async fn get_context(&self) -> Option<QueryContext> {
326 if let Some(writer_lock) = &self.writer {
327 let writer = writer_lock.read().await;
328 let mut ctx = QueryContext::new_with_pending(
330 writer.l0_manager.get_current(),
331 writer.transaction_l0.clone(),
332 writer.l0_manager.get_pending_flush(),
333 );
334 ctx.set_deadline(Instant::now() + self.config.query_timeout);
335 Some(ctx)
336 } else {
337 self.l0_manager.as_ref().map(|m| {
338 let mut ctx = QueryContext::new(m.get_current());
339 ctx.set_deadline(Instant::now() + self.config.query_timeout);
340 ctx
341 })
342 }
343 }
344
345 pub(crate) fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
347 use std::cmp::Ordering;
348
349 let temporal_a = Self::extract_temporal_value(a);
350 let temporal_b = Self::extract_temporal_value(b);
351
352 if let (Some(ta), Some(tb)) = (&temporal_a, &temporal_b) {
353 return Self::compare_temporal(ta, tb);
354 }
355
356 if matches!(
359 (a, b),
360 (Value::String(_), Value::Temporal(_)) | (Value::Temporal(_), Value::String(_))
361 ) && let Some(ord) = Self::try_eval_ordering(a, b)
362 {
363 return ord;
364 }
365 if let (Value::String(_), Some(tb)) = (a, temporal_b)
366 && let Some(ord) = Self::try_eval_ordering(a, &Value::Temporal(tb))
367 {
368 return ord;
369 }
370 if let (Some(ta), Value::String(_)) = (temporal_a, b)
371 && let Some(ord) = Self::try_eval_ordering(&Value::Temporal(ta), b)
372 {
373 return ord;
374 }
375
376 let ra = Self::order_by_type_rank(a);
377 let rb = Self::order_by_type_rank(b);
378 if ra != rb {
379 return ra.cmp(&rb);
380 }
381
382 match (a, b) {
383 (Value::Map(l), Value::Map(r)) => Self::compare_maps(l, r),
384 (Value::Node(l), Value::Node(r)) => Self::compare_nodes(l, r),
385 (Value::Edge(l), Value::Edge(r)) => Self::compare_edges(l, r),
386 (Value::List(l), Value::List(r)) => Self::compare_lists(l, r),
387 (Value::Path(l), Value::Path(r)) => Self::compare_paths(l, r),
388 (Value::String(l), Value::String(r)) => {
389 Self::try_eval_ordering(a, b).unwrap_or_else(|| l.cmp(r))
391 }
392 (Value::Bool(l), Value::Bool(r)) => l.cmp(r),
393 (Value::Temporal(l), Value::Temporal(r)) => Self::compare_temporal(l, r),
394 (Value::Int(l), Value::Int(r)) => l.cmp(r),
395 (Value::Float(l), Value::Float(r)) => {
396 if l.is_nan() && r.is_nan() {
397 Ordering::Equal
398 } else if l.is_nan() {
399 Ordering::Greater
400 } else if r.is_nan() {
401 Ordering::Less
402 } else {
403 l.partial_cmp(r).unwrap_or(Ordering::Equal)
404 }
405 }
406 (Value::Int(l), Value::Float(r)) => {
407 if r.is_nan() {
408 Ordering::Less
409 } else {
410 (*l as f64).partial_cmp(r).unwrap_or(Ordering::Equal)
411 }
412 }
413 (Value::Float(l), Value::Int(r)) => {
414 if l.is_nan() {
415 Ordering::Greater
416 } else {
417 l.partial_cmp(&(*r as f64)).unwrap_or(Ordering::Equal)
418 }
419 }
420 (Value::Bytes(l), Value::Bytes(r)) => l.cmp(r),
421 (Value::Vector(l), Value::Vector(r)) => {
422 for (lv, rv) in l.iter().zip(r.iter()) {
423 let ord = lv.total_cmp(rv);
424 if ord != Ordering::Equal {
425 return ord;
426 }
427 }
428 l.len().cmp(&r.len())
429 }
430 _ => Ordering::Equal,
431 }
432 }
433
434 fn try_eval_ordering(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
435 use std::cmp::Ordering;
436 if matches!(eval_binary_op(a, &BinaryOp::Lt, b), Ok(Value::Bool(true))) {
437 Some(Ordering::Less)
438 } else if matches!(eval_binary_op(a, &BinaryOp::Gt, b), Ok(Value::Bool(true))) {
439 Some(Ordering::Greater)
440 } else if matches!(eval_binary_op(a, &BinaryOp::Eq, b), Ok(Value::Bool(true))) {
441 Some(Ordering::Equal)
442 } else {
443 None
444 }
445 }
446
447 fn order_by_type_rank(v: &Value) -> u8 {
450 match v {
451 Value::Map(map) => Self::map_order_rank(map),
452 Value::Node(_) => 1,
453 Value::Edge(_) => 2,
454 Value::List(_) => 3,
455 Value::Path(_) => 4,
456 Value::String(_) => 5,
457 Value::Bool(_) => 6,
458 Value::Temporal(_) => 7,
459 Value::Int(_) => 8,
460 Value::Float(f) if f.is_nan() => 9,
461 Value::Float(_) => 8,
462 Value::Null => 10,
463 Value::Bytes(_) | Value::Vector(_) => 11,
464 _ => 11,
465 }
466 }
467
468 fn map_order_rank(map: &HashMap<String, Value>) -> u8 {
469 if Self::map_as_temporal(map).is_some() {
470 7
471 } else if map.contains_key("nodes")
472 && (map.contains_key("relationships") || map.contains_key("edges"))
473 {
474 4
475 } else if map.contains_key("_eid")
476 || map.contains_key("_src")
477 || map.contains_key("_dst")
478 || map.contains_key("_type")
479 || map.contains_key("_type_name")
480 {
481 2
482 } else if map.contains_key("_vid")
483 || map.contains_key("_labels")
484 || map.contains_key("_label")
485 {
486 1
487 } else {
488 0
489 }
490 }
491
    /// Returns the temporal value carried by `value`, if any; delegates to
    /// `expr_eval::temporal_from_value`.
    fn extract_temporal_value(value: &Value) -> Option<TemporalValue> {
        crate::query::expr_eval::temporal_from_value(value)
    }
495
    /// Returns the temporal value encoded by `map`, if any; delegates to
    /// `expr_eval::temporal_from_map_wrapper`.
    fn map_as_temporal(map: &HashMap<String, Value>) -> Option<TemporalValue> {
        crate::query::expr_eval::temporal_from_map_wrapper(map)
    }
499
500 fn compare_lists(left: &[Value], right: &[Value]) -> std::cmp::Ordering {
501 left.iter()
502 .zip(right.iter())
503 .map(|(l, r)| Self::compare_values(l, r))
504 .find(|o| o.is_ne())
505 .unwrap_or_else(|| left.len().cmp(&right.len()))
506 }
507
508 fn compare_maps(
509 left: &HashMap<String, Value>,
510 right: &HashMap<String, Value>,
511 ) -> std::cmp::Ordering {
512 let mut l_pairs: Vec<_> = left.iter().collect();
513 let mut r_pairs: Vec<_> = right.iter().collect();
514 l_pairs.sort_by_key(|(k, _)| *k);
515 r_pairs.sort_by_key(|(k, _)| *k);
516
517 l_pairs
518 .iter()
519 .zip(r_pairs.iter())
520 .map(|((lk, lv), (rk, rv))| lk.cmp(rk).then_with(|| Self::compare_values(lv, rv)))
521 .find(|o| o.is_ne())
522 .unwrap_or_else(|| l_pairs.len().cmp(&r_pairs.len()))
523 }
524
525 fn compare_nodes(left: &uni_common::Node, right: &uni_common::Node) -> std::cmp::Ordering {
526 let mut l_labels = left.labels.clone();
527 let mut r_labels = right.labels.clone();
528 l_labels.sort();
529 r_labels.sort();
530
531 l_labels
532 .cmp(&r_labels)
533 .then_with(|| left.vid.cmp(&right.vid))
534 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
535 }
536
537 fn compare_edges(left: &uni_common::Edge, right: &uni_common::Edge) -> std::cmp::Ordering {
538 left.edge_type
539 .cmp(&right.edge_type)
540 .then_with(|| left.src.cmp(&right.src))
541 .then_with(|| left.dst.cmp(&right.dst))
542 .then_with(|| left.eid.cmp(&right.eid))
543 .then_with(|| Self::compare_maps(&left.properties, &right.properties))
544 }
545
546 fn compare_paths(left: &uni_common::Path, right: &uni_common::Path) -> std::cmp::Ordering {
547 left.nodes
548 .iter()
549 .zip(right.nodes.iter())
550 .map(|(l, r)| Self::compare_nodes(l, r))
551 .find(|o| o.is_ne())
552 .unwrap_or_else(|| left.nodes.len().cmp(&right.nodes.len()))
553 .then_with(|| {
554 left.edges
555 .iter()
556 .zip(right.edges.iter())
557 .map(|(l, r)| Self::compare_edges(l, r))
558 .find(|o| o.is_ne())
559 .unwrap_or_else(|| left.edges.len().cmp(&right.edges.len()))
560 })
561 }
562
563 fn compare_temporal(left: &TemporalValue, right: &TemporalValue) -> std::cmp::Ordering {
564 match (left, right) {
565 (
566 TemporalValue::Date {
567 days_since_epoch: l,
568 },
569 TemporalValue::Date {
570 days_since_epoch: r,
571 },
572 ) => l.cmp(r),
573 (
574 TemporalValue::LocalTime {
575 nanos_since_midnight: l,
576 },
577 TemporalValue::LocalTime {
578 nanos_since_midnight: r,
579 },
580 ) => l.cmp(r),
581 (
582 TemporalValue::Time {
583 nanos_since_midnight: lm,
584 offset_seconds: lo,
585 },
586 TemporalValue::Time {
587 nanos_since_midnight: rm,
588 offset_seconds: ro,
589 },
590 ) => {
591 let l_utc = *lm as i128 - (*lo as i128) * 1_000_000_000;
592 let r_utc = *rm as i128 - (*ro as i128) * 1_000_000_000;
593 l_utc.cmp(&r_utc)
594 }
595 (
596 TemporalValue::LocalDateTime {
597 nanos_since_epoch: l,
598 },
599 TemporalValue::LocalDateTime {
600 nanos_since_epoch: r,
601 },
602 ) => l.cmp(r),
603 (
604 TemporalValue::DateTime {
605 nanos_since_epoch: l,
606 ..
607 },
608 TemporalValue::DateTime {
609 nanos_since_epoch: r,
610 ..
611 },
612 ) => l.cmp(r),
613 (
614 TemporalValue::Duration {
615 months: lm,
616 days: ld,
617 nanos: ln,
618 },
619 TemporalValue::Duration {
620 months: rm,
621 days: rd,
622 nanos: rn,
623 },
624 ) => (*lm, *ld, *ln).cmp(&(*rm, *rd, *rn)),
625 _ => Self::temporal_variant_rank(left).cmp(&Self::temporal_variant_rank(right)),
626 }
627 }
628
    /// Returns the rank used to order temporal values of different variants;
    /// lower ranks sort first.
    fn temporal_variant_rank(v: &TemporalValue) -> u8 {
        match v {
            TemporalValue::Date { .. } => 0,
            TemporalValue::LocalTime { .. } => 1,
            TemporalValue::Time { .. } => 2,
            TemporalValue::LocalDateTime { .. } => 3,
            TemporalValue::DateTime { .. } => 4,
            TemporalValue::Duration { .. } => 5,
        }
    }
639}
640
/// Result of profiling a query: the plan explanation plus measurements.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ProfileOutput {
    /// Explanation of the logical plan that was executed.
    pub explain: crate::query::planner::ExplainOutput,
    /// Per-operator measurements.
    pub runtime_stats: Vec<OperatorStats>,
    /// Total execution time in whole milliseconds.
    pub total_time_ms: u64,
    /// Peak memory in bytes.
    pub peak_memory_bytes: usize,
}
656
/// Measurements for a single operator of an executed plan.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct OperatorStats {
    /// Operator name.
    pub operator: String,
    /// Number of rows the operator produced.
    pub actual_rows: usize,
    /// Time spent, in milliseconds.
    pub time_ms: f64,
    /// Memory used, in bytes.
    pub memory_bytes: usize,
    /// Index hits, when tracked.
    pub index_hits: Option<usize>,
    /// Index misses, when tracked.
    pub index_misses: Option<usize>,
}
673
674impl Executor {
675 pub async fn profile(
680 &self,
681 plan: crate::query::planner::LogicalPlan,
682 params: &HashMap<String, Value>,
683 ) -> Result<(Vec<HashMap<String, Value>>, ProfileOutput)> {
684 let planner =
686 crate::query::planner::QueryPlanner::new(self.storage.schema_manager().schema());
687 let explain_output = planner.explain_logical_plan(&plan)?;
688
689 let start = Instant::now();
690
691 let prop_manager = self.create_prop_manager();
693 let results = self.execute(plan.clone(), &prop_manager, params).await?;
694
695 let total_time = start.elapsed();
696
697 let stats = vec![OperatorStats {
699 operator: "DataFusion Execution".to_string(),
700 actual_rows: results.len(),
701 time_ms: total_time.as_secs_f64() * 1000.0,
702 memory_bytes: 0,
703 index_hits: None,
704 index_misses: None,
705 }];
706
707 Ok((
708 results,
709 ProfileOutput {
710 explain: explain_output,
711 runtime_stats: stats,
712 total_time_ms: total_time.as_millis() as u64,
713 peak_memory_bytes: 0,
714 },
715 ))
716 }
717
718 fn create_prop_manager(&self) -> uni_store::runtime::property_manager::PropertyManager {
719 uni_store::runtime::property_manager::PropertyManager::new(
720 self.storage.clone(),
721 self.storage.schema_manager_arc(),
722 1000,
723 )
724 }
725}