Skip to main content

nervusdb_core/query/
executor.rs

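//! Query Executor v2 - Index-Aware Execution Engine
//!
//! Addresses the performance problems of the v1 executor:
//! - v1: ScanNode builds the node set with a full table scan -> O(N)
//! - v2: index-aware scans + Index Nested Loop Join -> O(log N)
//!
//! Key improvements:
//! 1. NodeScan uses the Hexastore index instead of a full table scan
//! 2. Index Nested Loop Join avoids memory blow-up
//! 3. Basic statistics support join-order optimization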
11
12use crate::error::Error;
13use crate::query::ast::{
14    BinaryOperator, Clause, Direction, ExistsExpression, Expression, FunctionCall,
15    ListComprehension, Literal, PathElement, Pattern, PropertyAccess, PropertyMap,
16    RelationshipDirection, RelationshipPattern,
17};
18use crate::query::planner::{
19    AggregateFunction, AggregateNode, DistinctNode, ExpandNode, ExpandVarLengthNode, FilterNode,
20    FtsCandidateScanNode, LeftOuterJoinNode, LimitNode, NestedLoopJoinNode, PhysicalPlan,
21    ProjectNode, ScanNode, SingleRowNode, SkipNode, SortNode, UnwindNode, VectorTopKScanNode,
22};
23use crate::{Database, QueryCriteria, Triple};
24use std::collections::{HashMap, HashSet, VecDeque};
25
26#[derive(Debug, Clone)]
27pub enum Value {
28    String(String),
29    Float(f64),
30    Boolean(bool),
31    Null,
32    Vector(Vec<f32>),
33    Node(u64),
34    Relationship(Triple),
35}
36
37impl PartialEq for Value {
38    fn eq(&self, other: &Self) -> bool {
39        match (self, other) {
40            (Value::String(a), Value::String(b)) => a == b,
41            (Value::Float(a), Value::Float(b)) => a == b,
42            (Value::Boolean(a), Value::Boolean(b)) => a == b,
43            (Value::Null, Value::Null) => true,
44            (Value::Vector(a), Value::Vector(b)) => a == b,
45            (Value::Node(a), Value::Node(b)) => a == b,
46            (Value::Relationship(a), Value::Relationship(b)) => a == b,
47            _ => false,
48        }
49    }
50}
51
52#[derive(Debug, Clone)]
53pub struct Record {
54    pub values: HashMap<String, Value>,
55}
56
57impl Record {
58    pub fn new() -> Self {
59        Self {
60            values: HashMap::new(),
61        }
62    }
63
64    pub fn get(&self, key: &str) -> Option<&Value> {
65        self.values.get(key)
66    }
67
68    pub fn insert(&mut self, key: String, value: Value) {
69        self.values.insert(key, value);
70    }
71
72    pub fn merge(&mut self, other: &Record) {
73        for (k, v) in &other.values {
74            self.values.insert(k.clone(), v.clone());
75        }
76    }
77}
78
79fn record_distinct_key(record: &Record) -> String {
80    let mut keys: Vec<&String> = record.values.keys().collect();
81    keys.sort();
82    let mut out = String::new();
83    for key in keys {
84        if let Some(value) = record.values.get(key) {
85            out.push_str(key);
86            out.push('=');
87            out.push_str(&format!("{:?};", value));
88        }
89    }
90    out
91}
92
93impl Default for Record {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99fn try_merge_records(mut left: Record, right: Record) -> Option<Record> {
100    for (k, v) in right.values {
101        if let Some(existing) = left.values.get(&k) {
102            if existing != &v {
103                return None;
104            }
105        } else {
106            left.values.insert(k, v);
107        }
108    }
109    Some(left)
110}
111
112pub struct ExecutionContext<'a> {
113    pub db: &'a Database,
114    pub params: &'a HashMap<String, Value>,
115}
116
117/// Arc-based execution context for true streaming across FFI boundary
118/// This allows iterators to hold a reference to the database without lifetime issues
119pub struct ArcExecutionContext {
120    pub db: std::sync::Arc<Database>,
121    pub params: std::sync::Arc<HashMap<String, Value>>,
122}
123
124impl ArcExecutionContext {
125    pub fn new(db: std::sync::Arc<Database>, params: HashMap<String, Value>) -> Self {
126        Self {
127            db,
128            params: std::sync::Arc::new(params),
129        }
130    }
131}
132
133/// Owned execution context for FFI - uses raw pointer to avoid lifetime issues
134/// SAFETY: The caller must ensure db_ptr remains valid for the lifetime of this context
135pub struct OwnedExecutionContext {
136    pub db_ptr: *const Database,
137    pub params: HashMap<String, Value>,
138}
139
140impl OwnedExecutionContext {
141    /// Returns a reference to the database.
142    ///
143    /// # Safety
144    /// The caller must ensure `db_ptr` is valid and points to a live `Database` instance.
145    pub unsafe fn db(&self) -> &Database {
146        unsafe { &*self.db_ptr }
147    }
148}
149
/// Statistics describing a node scan.
#[derive(Debug)]
pub struct ScanStats {
    /// Estimated number of nodes the scan produces.
    pub estimated_cardinality: usize,
    /// Whether the scan was restricted by at least one label.
    pub has_labels: bool,
}
156
157impl ScanStats {
158    /// 计算 ScanNode 的预估基数
159    pub fn estimate_scan_cardinality(db: &Database, labels: &[String]) -> Self {
160        if labels.is_empty() {
161            // 无标签过滤:需要获取所有唯一节点
162            // 使用实际采样来估算:扫描少量数据并估算唯一节点数
163            let mut unique_nodes = std::collections::HashSet::new();
164            let sample_criteria = crate::QueryCriteria::default();
165            let sample_count = 100; // 采样前100个三元组
166
167            for triple in db.query(sample_criteria).take(sample_count) {
168                unique_nodes.insert(triple.subject_id);
169                unique_nodes.insert(triple.object_id);
170            }
171
172            // 如果采样了全部数据,返回精确值;否则按比例估算
173            let estimated_nodes = if unique_nodes.len() < sample_count / 2 {
174                unique_nodes.len() // 数据较少,返回精确值
175            } else {
176                unique_nodes.len() * 2 // 估算还有更多节点
177            };
178
179            Self {
180                estimated_cardinality: estimated_nodes.max(1),
181                has_labels: false,
182            }
183        } else {
184            // 有标签过滤:估算有该标签的节点数
185            let mut total_labeled_nodes = 0;
186
187            // 解析 "type" 谓词 ID
188            if let Ok(Some(type_id)) = db.resolve_id("type") {
189                for label in labels {
190                    if let Ok(Some(label_id)) = db.resolve_id(label) {
191                        // 计算 (?, type, label) 的三元组数量
192                        let criteria = QueryCriteria {
193                            subject_id: None,
194                            predicate_id: Some(type_id),
195                            object_id: Some(label_id),
196                        };
197
198                        let labeled_count = db.query(criteria).count();
199                        if total_labeled_nodes == 0 {
200                            total_labeled_nodes = labeled_count;
201                        } else {
202                            // 多标签:取交集(假设独立性,实际会更小)
203                            total_labeled_nodes = (total_labeled_nodes * labeled_count / 10).max(1);
204                        }
205                    }
206                }
207            }
208
209            Self {
210                estimated_cardinality: total_labeled_nodes.max(1),
211                has_labels: true,
212            }
213        }
214    }
215}
216
217// ============================================================================
218// Enhanced Execution Plans
219// ============================================================================
220
221pub trait ExecutionPlan {
222    fn execute<'a>(
223        &'a self,
224        ctx: &'a ExecutionContext<'a>,
225    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error>;
226
227    /// 估算该操作的输出基数
228    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize;
229}
230
231impl ExecutionPlan for PhysicalPlan {
232    fn execute<'a>(
233        &'a self,
234        ctx: &'a ExecutionContext<'a>,
235    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
236        match self {
237            PhysicalPlan::SingleRow(node) => node.execute(ctx),
238            PhysicalPlan::Scan(node) => node.execute(ctx),
239            PhysicalPlan::FtsCandidateScan(node) => node.execute(ctx),
240            PhysicalPlan::VectorTopKScan(node) => node.execute(ctx),
241            PhysicalPlan::Filter(node) => node.execute(ctx),
242            PhysicalPlan::Project(node) => node.execute(ctx),
243            PhysicalPlan::Limit(node) => node.execute(ctx),
244            PhysicalPlan::Skip(node) => node.execute(ctx),
245            PhysicalPlan::Sort(node) => node.execute(ctx),
246            PhysicalPlan::Distinct(node) => node.execute(ctx),
247            PhysicalPlan::Aggregate(node) => node.execute(ctx),
248            PhysicalPlan::NestedLoopJoin(node) => node.execute(ctx),
249            PhysicalPlan::LeftOuterJoin(node) => node.execute(ctx),
250            PhysicalPlan::Expand(node) => node.execute(ctx),
251            PhysicalPlan::ExpandVarLength(node) => node.execute(ctx),
252            PhysicalPlan::Unwind(node) => node.execute(ctx),
253            _ => Err(Error::Other("Unsupported physical plan type".to_string())),
254        }
255    }
256
257    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
258        match self {
259            PhysicalPlan::SingleRow(node) => node.estimate_cardinality(ctx),
260            PhysicalPlan::Scan(node) => node.estimate_cardinality(ctx),
261            PhysicalPlan::FtsCandidateScan(node) => node.estimate_cardinality(ctx),
262            PhysicalPlan::VectorTopKScan(node) => node.estimate_cardinality(ctx),
263            PhysicalPlan::Filter(node) => node.estimate_cardinality(ctx),
264            PhysicalPlan::Project(node) => node.estimate_cardinality(ctx),
265            PhysicalPlan::Limit(node) => node.estimate_cardinality(ctx),
266            PhysicalPlan::Skip(node) => node.estimate_cardinality(ctx),
267            PhysicalPlan::Sort(node) => node.estimate_cardinality(ctx),
268            PhysicalPlan::Distinct(node) => node.estimate_cardinality(ctx),
269            PhysicalPlan::Aggregate(node) => node.estimate_cardinality(ctx),
270            PhysicalPlan::NestedLoopJoin(node) => node.estimate_cardinality(ctx),
271            PhysicalPlan::LeftOuterJoin(node) => node.estimate_cardinality(ctx),
272            PhysicalPlan::Expand(node) => node.estimate_cardinality(ctx),
273            PhysicalPlan::ExpandVarLength(node) => node.estimate_cardinality(ctx),
274            PhysicalPlan::Unwind(node) => node.estimate_cardinality(ctx),
275            _ => 1,
276        }
277    }
278}
279
280use std::sync::Arc;
281
282impl PhysicalPlan {
283    /// Execute the plan with Arc-based context for true streaming across FFI boundary.
284    /// Returns a 'static iterator that owns its database reference.
285    ///
286    /// Note: The iterator is NOT Send because Database contains non-Send fields.
287    /// This is fine for FFI use where calls are typically single-threaded.
288    pub fn execute_streaming(
289        self,
290        ctx: Arc<ArcExecutionContext>,
291    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'static>, Error> {
292        match self {
293            PhysicalPlan::SingleRow(_) => Ok(Box::new(std::iter::once(Ok(Record::new())))),
294            PhysicalPlan::Scan(node) => {
295                let alias = node.alias;
296                let labels = node.labels;
297                let db = Arc::clone(&ctx.db);
298
299                if labels.is_empty() {
300                    Ok(Box::new(scan_all_nodes_streaming(db, alias)))
301                } else {
302                    Ok(Box::new(scan_labeled_nodes_streaming(db, labels, alias)))
303                }
304            }
305            PhysicalPlan::FtsCandidateScan(node) => {
306                #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
307                {
308                    let alias = node.alias;
309                    let labels = node.labels;
310                    let property = node.property;
311                    let query_expr = node.query;
312                    let db = Arc::clone(&ctx.db);
313
314                    let query = match resolve_query_string_streaming(&query_expr, &ctx) {
315                        Some(q) => q,
316                        None => {
317                            return Ok(Box::new(std::iter::empty())
318                                as Box<dyn Iterator<Item = Result<Record, Error>> + 'static>);
319                        }
320                    };
321
322                    let Some(scores) = db.fts_scores_for_query(property.as_str(), query.as_str())
323                    else {
324                        return Ok(Box::new(std::iter::empty())
325                            as Box<dyn Iterator<Item = Result<Record, Error>> + 'static>);
326                    };
327
328                    let candidate_ids: Vec<u64> = scores.keys().copied().collect();
329                    let type_and_labels = resolve_label_ids_streaming(&db, &labels);
330
331                    return Ok(Box::new(candidate_ids.into_iter().filter_map(
332                        move |node_id| {
333                            if let Some((type_id, label_ids)) = type_and_labels.as_ref()
334                                && !node_has_labels_streaming(&db, node_id, *type_id, label_ids)
335                            {
336                                return None;
337                            }
338
339                            let mut record = Record::new();
340                            record.insert(alias.clone(), Value::Node(node_id));
341                            Some(Ok(record))
342                        },
343                    )));
344                }
345
346                #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
347                {
348                    let _ = node;
349                    Ok(Box::new(std::iter::empty())
350                        as Box<
351                            dyn Iterator<Item = Result<Record, Error>> + 'static,
352                        >)
353                }
354            }
355            PhysicalPlan::VectorTopKScan(node) => {
356                // Delegate to the non-streaming implementation and materialize the limited result
357                // set, then hand out a 'static iterator for FFI.
358                let exec_ctx = ExecutionContext {
359                    db: ctx.db.as_ref(),
360                    params: ctx.params.as_ref(),
361                };
362                let iter = node.execute(&exec_ctx)?;
363                let mut records = Vec::new();
364                for item in iter {
365                    records.push(item?);
366                }
367                Ok(Box::new(records.into_iter().map(Ok)))
368            }
369            PhysicalPlan::Filter(node) => {
370                let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
371                let predicate = node.predicate;
372                let ctx_clone = Arc::clone(&ctx);
373                Ok(Box::new(input_iter.filter(move |result| {
374                    match result {
375                        Ok(record) => evaluate_expression_streaming(&predicate, record, &ctx_clone),
376                        Err(_) => true, // Pass through errors
377                    }
378                })))
379            }
380            PhysicalPlan::Project(node) => {
381                let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
382                let projections = node.projections;
383                let ctx_clone = Arc::clone(&ctx);
384                Ok(Box::new(input_iter.map(move |result| {
385                    result.map(|record| {
386                        let mut new_record = Record::new();
387                        for (expr, alias) in &projections {
388                            let value =
389                                evaluate_expression_value_streaming(expr, &record, &ctx_clone);
390                            new_record.insert(alias.clone(), value);
391                        }
392                        new_record
393                    })
394                })))
395            }
396            PhysicalPlan::Limit(node) => {
397                let limit = usize::try_from(node.limit).unwrap_or(usize::MAX);
398                let input_iter = node.input.execute_streaming(ctx)?;
399                Ok(Box::new(input_iter.take(limit)))
400            }
401            PhysicalPlan::Skip(node) => {
402                let skip = usize::try_from(node.skip).unwrap_or(0);
403                let input_iter = node.input.execute_streaming(ctx)?;
404                Ok(Box::new(input_iter.skip(skip)))
405            }
406            PhysicalPlan::Sort(node) => {
407                let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
408                let order_by = node.order_by;
409                let ctx_clone = Arc::clone(&ctx);
410                // Sort requires materialization - collect all records, sort, then iterate
411                let mut records: Vec<Record> = input_iter.filter_map(|r| r.ok()).collect();
412                records.sort_by(|a, b| {
413                    for (expr, direction) in &order_by {
414                        let val_a = evaluate_expression_value_streaming(expr, a, &ctx_clone);
415                        let val_b = evaluate_expression_value_streaming(expr, b, &ctx_clone);
416                        let cmp = compare_values_for_sort(&val_a, &val_b, direction);
417                        if cmp != std::cmp::Ordering::Equal {
418                            return cmp;
419                        }
420                    }
421                    std::cmp::Ordering::Equal
422                });
423                Ok(Box::new(records.into_iter().map(Ok)))
424            }
425            PhysicalPlan::Distinct(node) => {
426                let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
427                let mut seen: HashSet<String> = HashSet::new();
428                Ok(Box::new(input_iter.filter_map(
429                    move |result| match result {
430                        Ok(record) => {
431                            let key = record_distinct_key(&record);
432                            if seen.insert(key) {
433                                Some(Ok(record))
434                            } else {
435                                None
436                            }
437                        }
438                        Err(err) => Some(Err(err)),
439                    },
440                )))
441            }
442            PhysicalPlan::Expand(node) => {
443                let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
444                Ok(Box::new(StreamingExpandIterator::new(
445                    input_iter,
446                    node.start_node_alias,
447                    node.rel_alias,
448                    node.end_node_alias,
449                    node.direction,
450                    node.rel_type,
451                    ctx,
452                )))
453            }
454            PhysicalPlan::Unwind(node) => {
455                let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
456                let expression = node.expression;
457                let alias = node.alias;
458                let ctx_clone = Arc::clone(&ctx);
459                Ok(Box::new(input_iter.flat_map(move |result| {
460                    match result {
461                        Ok(record) => {
462                            match unwind_values_streaming(&expression, &record, &ctx_clone) {
463                                Ok(values) => values
464                                    .into_iter()
465                                    .map(|value| {
466                                        let mut new_record = record.clone();
467                                        new_record.insert(alias.clone(), value);
468                                        Ok(new_record)
469                                    })
470                                    .collect::<Vec<_>>(),
471                                Err(err) => vec![Err(err)],
472                            }
473                        }
474                        Err(err) => vec![Err(err)],
475                    }
476                })))
477            }
478            PhysicalPlan::ExpandVarLength(node) => {
479                let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
480                Ok(Box::new(StreamingExpandVarLengthIterator {
481                    input_iter,
482                    start_node_alias: node.start_node_alias,
483                    end_node_alias: node.end_node_alias,
484                    direction: node.direction,
485                    rel_type: node.rel_type,
486                    min_hops: node.min_hops,
487                    max_hops: node.max_hops,
488                    ctx,
489                    current_record: None,
490                    current_expansions: None,
491                }))
492            }
493            PhysicalPlan::NestedLoopJoin(node) => {
494                let left_iter = node.left.execute_streaming(Arc::clone(&ctx))?;
495                let right_plan = *node.right;
496                let predicate = node.predicate;
497                Ok(Box::new(StreamingNestedLoopJoin::new(
498                    left_iter, right_plan, predicate, ctx,
499                )))
500            }
501            PhysicalPlan::LeftOuterJoin(node) => {
502                let left_iter = node.left.execute_streaming(Arc::clone(&ctx))?;
503                let right_plan = *node.right;
504                Ok(Box::new(StreamingLeftOuterJoin::new(
505                    left_iter,
506                    right_plan,
507                    node.right_aliases,
508                    ctx,
509                )))
510            }
511            _ => Err(Error::Other(
512                "Unsupported physical plan type for streaming".to_string(),
513            )),
514        }
515    }
516}
517
518// ============================================================================
519// Streaming Iterator Implementations
520// ============================================================================
521
522/// Streaming version of scan_all_nodes_optimized
523fn scan_all_nodes_streaming(
524    db: Arc<Database>,
525    alias: String,
526) -> impl Iterator<Item = Result<Record, Error>> + Send + 'static {
527    let mut unique_nodes = HashSet::new();
528    let subject_criteria = crate::QueryCriteria::default();
529
530    for triple in db.query(subject_criteria).take(10000) {
531        unique_nodes.insert(triple.subject_id);
532        unique_nodes.insert(triple.object_id);
533    }
534
535    unique_nodes.into_iter().map(move |node_id| {
536        let mut record = Record::new();
537        record.insert(alias.clone(), Value::Node(node_id));
538        Ok(record)
539    })
540}
541
542/// Streaming version of scan_labeled_nodes_optimized
543fn scan_labeled_nodes_streaming(
544    db: Arc<Database>,
545    labels: Vec<String>,
546    alias: String,
547) -> impl Iterator<Item = Result<Record, Error>> + Send + 'static {
548    let type_id = match db.resolve_id("type") {
549        Ok(Some(id)) => Some(id),
550        _ => None,
551    };
552
553    let node_ids: Vec<u64> = if let Some(type_id) = type_id {
554        let mut nodes = HashSet::new();
555        for label in &labels {
556            if let Ok(Some(label_id)) = db.resolve_id(label) {
557                let criteria = crate::QueryCriteria {
558                    subject_id: None,
559                    predicate_id: Some(type_id),
560                    object_id: Some(label_id),
561                };
562                for triple in db.query(criteria) {
563                    nodes.insert(triple.subject_id);
564                }
565            }
566        }
567        nodes.into_iter().collect()
568    } else {
569        Vec::new()
570    };
571
572    node_ids.into_iter().map(move |node_id| {
573        let mut record = Record::new();
574        record.insert(alias.clone(), Value::Node(node_id));
575        Ok(record)
576    })
577}
578
579/// Streaming nested loop join iterator
580struct StreamingNestedLoopJoin {
581    left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
582    right_plan: PhysicalPlan,
583    predicate: Option<Expression>,
584    ctx: Arc<ArcExecutionContext>,
585    current_left: Option<Record>,
586    current_right: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'static>>,
587}
588
589impl StreamingNestedLoopJoin {
590    fn new(
591        left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
592        right_plan: PhysicalPlan,
593        predicate: Option<Expression>,
594        ctx: Arc<ArcExecutionContext>,
595    ) -> Self {
596        Self {
597            left_iter,
598            right_plan,
599            predicate,
600            ctx,
601            current_left: None,
602            current_right: None,
603        }
604    }
605}
606
607impl Iterator for StreamingNestedLoopJoin {
608    type Item = Result<Record, Error>;
609
610    fn next(&mut self) -> Option<Self::Item> {
611        loop {
612            // Try to get next from current right iterator
613            if let Some(ref mut right_iter) = self.current_right
614                && let Some(right_result) = right_iter.next()
615            {
616                match right_result {
617                    Ok(right_record) => {
618                        let left_record = self.current_left.as_ref().unwrap();
619                        let mut merged = left_record.clone();
620                        for (k, v) in right_record.values {
621                            merged.insert(k, v);
622                        }
623
624                        // Apply predicate if any
625                        if let Some(ref pred) = self.predicate
626                            && !evaluate_expression_streaming(pred, &merged, &self.ctx)
627                        {
628                            continue;
629                        }
630                        return Some(Ok(merged));
631                    }
632                    Err(e) => return Some(Err(e)),
633                }
634            }
635
636            // Get next left record
637            match self.left_iter.next()? {
638                Ok(left_record) => {
639                    self.current_left = Some(left_record);
640                    // Clone the right plan and execute it
641                    match self
642                        .right_plan
643                        .clone()
644                        .execute_streaming(Arc::clone(&self.ctx))
645                    {
646                        Ok(right_iter) => {
647                            self.current_right = Some(right_iter);
648                        }
649                        Err(e) => return Some(Err(e)),
650                    }
651                }
652                Err(e) => return Some(Err(e)),
653            }
654        }
655    }
656}
657
658// SAFETY: StreamingNestedLoopJoin is not Send - FFI calls are single-threaded
659
660/// Streaming left outer join iterator (for OPTIONAL MATCH)
661struct StreamingLeftOuterJoin {
662    left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
663    right_plan: PhysicalPlan,
664    right_aliases: Vec<String>,
665    ctx: Arc<ArcExecutionContext>,
666    current_left: Option<Record>,
667    current_right: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'static>>,
668    matched_current_left: bool,
669    emitted_null_current_left: bool,
670}
671
672impl StreamingLeftOuterJoin {
673    fn new(
674        left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
675        right_plan: PhysicalPlan,
676        right_aliases: Vec<String>,
677        ctx: Arc<ArcExecutionContext>,
678    ) -> Self {
679        Self {
680            left_iter,
681            right_plan,
682            right_aliases,
683            ctx,
684            current_left: None,
685            current_right: None,
686            matched_current_left: false,
687            emitted_null_current_left: false,
688        }
689    }
690
691    fn emit_null_row(&mut self, mut left_record: Record) -> Record {
692        for alias in &self.right_aliases {
693            left_record
694                .values
695                .entry(alias.clone())
696                .or_insert(Value::Null);
697        }
698        left_record
699    }
700}
701
702impl Iterator for StreamingLeftOuterJoin {
703    type Item = Result<Record, Error>;
704
705    fn next(&mut self) -> Option<Self::Item> {
706        loop {
707            if let Some(ref mut right_iter) = self.current_right {
708                if let Some(right_result) = right_iter.next() {
709                    match right_result {
710                        Ok(right_record) => {
711                            let left_record = self.current_left.as_ref().unwrap().clone();
712                            if let Some(merged) = try_merge_records(left_record, right_record) {
713                                self.matched_current_left = true;
714                                return Some(Ok(merged));
715                            }
716                            continue;
717                        }
718                        Err(e) => return Some(Err(e)),
719                    }
720                }
721
722                // Right exhausted; maybe emit NULL row, then advance.
723                self.current_right = None;
724                if !self.matched_current_left && !self.emitted_null_current_left {
725                    self.emitted_null_current_left = true;
726                    let left_record = self.current_left.take().unwrap();
727                    return Some(Ok(self.emit_null_row(left_record)));
728                }
729                self.current_left = None;
730                self.matched_current_left = false;
731                self.emitted_null_current_left = false;
732                continue;
733            }
734
735            // Load next left record.
736            match self.left_iter.next()? {
737                Ok(left_record) => {
738                    self.current_left = Some(left_record);
739                    self.matched_current_left = false;
740                    self.emitted_null_current_left = false;
741                    match self
742                        .right_plan
743                        .clone()
744                        .execute_streaming(Arc::clone(&self.ctx))
745                    {
746                        Ok(right_iter) => self.current_right = Some(right_iter),
747                        Err(e) => return Some(Err(e)),
748                    }
749                }
750                Err(e) => return Some(Err(e)),
751            }
752        }
753    }
754}
755
756/// Streaming expand iterator
757struct StreamingExpandIterator {
758    input_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
759    start_node_alias: String,
760    rel_alias: String,
761    end_node_alias: String,
762    direction: RelationshipDirection,
763    rel_type: Option<String>,
764    ctx: Arc<ArcExecutionContext>,
765    current_record: Option<Record>,
766    current_expansions: Option<std::vec::IntoIter<(crate::Triple, u64)>>,
767}
768
769impl StreamingExpandIterator {
770    fn new(
771        input_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
772        start_node_alias: String,
773        rel_alias: String,
774        end_node_alias: String,
775        direction: RelationshipDirection,
776        rel_type: Option<String>,
777        ctx: Arc<ArcExecutionContext>,
778    ) -> Self {
779        Self {
780            input_iter,
781            start_node_alias,
782            rel_alias,
783            end_node_alias,
784            direction,
785            rel_type,
786            ctx,
787            current_record: None,
788            current_expansions: None,
789        }
790    }
791
792    fn get_expansions(&self, node_id: u64) -> Vec<(crate::Triple, u64)> {
793        let db = &self.ctx.db;
794        let mut results = Vec::new();
795
796        let rel_type_id = self
797            .rel_type
798            .as_ref()
799            .and_then(|t| db.resolve_id(t).ok().flatten());
800
801        match self.direction {
802            RelationshipDirection::LeftToRight | RelationshipDirection::Undirected => {
803                let criteria = crate::QueryCriteria {
804                    subject_id: Some(node_id),
805                    predicate_id: rel_type_id,
806                    object_id: None,
807                };
808                for triple in db.query(criteria) {
809                    results.push((triple, triple.object_id));
810                }
811            }
812            _ => {}
813        }
814
815        if self.direction == RelationshipDirection::RightToLeft {
816            let criteria = crate::QueryCriteria {
817                subject_id: None,
818                predicate_id: rel_type_id,
819                object_id: Some(node_id),
820            };
821            for triple in db.query(criteria) {
822                results.push((triple, triple.subject_id));
823            }
824        }
825
826        results
827    }
828}
829
830impl Iterator for StreamingExpandIterator {
831    type Item = Result<Record, Error>;
832
833    fn next(&mut self) -> Option<Self::Item> {
834        loop {
835            // Try to get next expansion from current record
836            if let Some(ref mut expansions) = self.current_expansions
837                && let Some((triple, end_node_id)) = expansions.next()
838            {
839                let record = self.current_record.as_ref().unwrap();
840                let mut new_record = record.clone();
841                new_record.insert(self.rel_alias.clone(), Value::Relationship(triple));
842                new_record.insert(self.end_node_alias.clone(), Value::Node(end_node_id));
843                return Some(Ok(new_record));
844            }
845
846            // Get next input record
847            match self.input_iter.next()? {
848                Ok(record) => {
849                    if let Some(Value::Node(node_id)) = record.values.get(&self.start_node_alias) {
850                        let expansions = self.get_expansions(*node_id);
851                        self.current_record = Some(record);
852                        self.current_expansions = Some(expansions.into_iter());
853                    }
854                }
855                Err(e) => return Some(Err(e)),
856            }
857        }
858    }
859}
860
861/// Streaming version of evaluate_expression
862fn evaluate_expression_streaming(
863    expr: &Expression,
864    record: &Record,
865    ctx: &ArcExecutionContext,
866) -> bool {
867    match evaluate_expression_value_streaming(expr, record, ctx) {
868        Value::Boolean(b) => b,
869        _ => false,
870    }
871}
872
873/// Streaming version of evaluate_expression_value
874fn evaluate_expression_value_streaming(
875    expr: &Expression,
876    record: &Record,
877    ctx: &ArcExecutionContext,
878) -> Value {
879    match expr {
880        Expression::Literal(l) => match l {
881            Literal::String(s) => Value::String(s.clone()),
882            Literal::Float(f) => Value::Float(*f),
883            Literal::Integer(i) => Value::Float(*i as f64),
884            Literal::Boolean(b) => Value::Boolean(*b),
885            Literal::Null => Value::Null,
886        },
887        Expression::Variable(name) => record.get(name).cloned().unwrap_or(Value::Null),
888        Expression::Parameter(name) => ctx.params.get(name).cloned().unwrap_or(Value::Null),
889        Expression::PropertyAccess(pa) => {
890            if let Some(Value::Node(node_id)) = record.get(&pa.variable)
891                && let Ok(Some(binary)) = ctx.db.get_node_property_binary(*node_id)
892                && let Ok(props) = crate::storage::property::deserialize_properties(&binary)
893                && let Some(value) = props.get(&pa.property)
894            {
895                return match value {
896                    serde_json::Value::String(s) => Value::String(s.clone()),
897                    serde_json::Value::Number(n) => Value::Float(n.as_f64().unwrap_or(0.0)),
898                    serde_json::Value::Bool(b) => Value::Boolean(*b),
899                    serde_json::Value::Null => Value::Null,
900                    serde_json::Value::Array(items) => {
901                        let mut out = Vec::with_capacity(items.len());
902                        for item in items {
903                            let Some(n) = item.as_f64() else {
904                                return Value::String(
905                                    serde_json::Value::Array(items.clone()).to_string(),
906                                );
907                            };
908                            out.push(n as f32);
909                        }
910                        Value::Vector(out)
911                    }
912                    _ => Value::Null,
913                };
914            }
915            Value::Null
916        }
917        Expression::Binary(b) => {
918            let left = evaluate_expression_value_streaming(&b.left, record, ctx);
919            let right = evaluate_expression_value_streaming(&b.right, record, ctx);
920
921            match b.operator {
922                BinaryOperator::Equal => Value::Boolean(left == right),
923                BinaryOperator::NotEqual => Value::Boolean(left != right),
924                BinaryOperator::And => match (left, right) {
925                    (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l && r),
926                    _ => Value::Null,
927                },
928                BinaryOperator::Or => match (left, right) {
929                    (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l || r),
930                    _ => Value::Null,
931                },
932                BinaryOperator::LessThan => match (left, right) {
933                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l < r),
934                    _ => Value::Null,
935                },
936                BinaryOperator::LessThanOrEqual => match (left, right) {
937                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l <= r),
938                    _ => Value::Null,
939                },
940                BinaryOperator::GreaterThan => match (left, right) {
941                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l > r),
942                    _ => Value::Null,
943                },
944                BinaryOperator::GreaterThanOrEqual => match (left, right) {
945                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l >= r),
946                    _ => Value::Null,
947                },
948                BinaryOperator::Add => match (left, right) {
949                    (Value::Float(l), Value::Float(r)) => Value::Float(l + r),
950                    (Value::String(l), Value::String(r)) => Value::String(format!("{}{}", l, r)),
951                    _ => Value::Null,
952                },
953                BinaryOperator::Subtract => match (left, right) {
954                    (Value::Float(l), Value::Float(r)) => Value::Float(l - r),
955                    _ => Value::Null,
956                },
957                BinaryOperator::Multiply => match (left, right) {
958                    (Value::Float(l), Value::Float(r)) => Value::Float(l * r),
959                    _ => Value::Null,
960                },
961                BinaryOperator::Divide => match (left, right) {
962                    (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l / r),
963                    _ => Value::Null,
964                },
965                BinaryOperator::Modulo => match (left, right) {
966                    (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l % r),
967                    _ => Value::Null,
968                },
969                BinaryOperator::In => value_in_list(&left, &right),
970                BinaryOperator::NotIn => match value_in_list(&left, &right) {
971                    Value::Boolean(b) => Value::Boolean(!b),
972                    other => other,
973                },
974                BinaryOperator::StartsWith => match (left, right) {
975                    (Value::String(l), Value::String(r)) => Value::Boolean(l.starts_with(&r)),
976                    _ => Value::Null,
977                },
978                BinaryOperator::EndsWith => match (left, right) {
979                    (Value::String(l), Value::String(r)) => Value::Boolean(l.ends_with(&r)),
980                    _ => Value::Null,
981                },
982                BinaryOperator::Contains => match (left, right) {
983                    (Value::String(l), Value::String(r)) => Value::Boolean(l.contains(&r)),
984                    _ => Value::Null,
985                },
986                _ => Value::Null,
987            }
988        }
989        Expression::Unary(u) => {
990            let arg = evaluate_expression_value_streaming(&u.argument, record, ctx);
991            match u.operator {
992                crate::query::ast::UnaryOperator::Not => match arg {
993                    Value::Boolean(b) => Value::Boolean(!b),
994                    _ => Value::Null,
995                },
996                crate::query::ast::UnaryOperator::Negate => match arg {
997                    Value::Float(f) => Value::Float(-f),
998                    _ => Value::Null,
999                },
1000            }
1001        }
1002        Expression::Case(case_expr) => {
1003            for alt in &case_expr.alternatives {
1004                if evaluate_expression_streaming(&alt.when, record, ctx) {
1005                    return evaluate_expression_value_streaming(&alt.then, record, ctx);
1006                }
1007            }
1008            match &case_expr.else_expression {
1009                Some(expr) => evaluate_expression_value_streaming(expr, record, ctx),
1010                None => Value::Null,
1011            }
1012        }
1013        Expression::Exists(exists_expr) => {
1014            Value::Boolean(evaluate_exists_streaming(exists_expr.as_ref(), record, ctx))
1015        }
1016        Expression::List(elements) => list_literal_value_streaming(elements, record, ctx),
1017        Expression::ListComprehension(comp) => {
1018            list_comprehension_value_streaming(comp.as_ref(), record, ctx)
1019        }
1020        Expression::FunctionCall(func) => match func.name.to_lowercase().as_str() {
1021            "vec_similarity" => {
1022                let Some(left) = func
1023                    .arguments
1024                    .first()
1025                    .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1026                else {
1027                    return Value::Null;
1028                };
1029                let Some(right) = func
1030                    .arguments
1031                    .get(1)
1032                    .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1033                else {
1034                    return Value::Null;
1035                };
1036                let Some(left_vec) = value_to_vector(&left) else {
1037                    return Value::Null;
1038                };
1039                let Some(right_vec) = value_to_vector(&right) else {
1040                    return Value::Null;
1041                };
1042                let Some(sim) = cosine_similarity(&left_vec, &right_vec) else {
1043                    return Value::Null;
1044                };
1045                Value::Float(sim as f64)
1046            }
1047            "txt_score" => {
1048                #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1049                {
1050                    let Some(Expression::PropertyAccess(pa)) = func.arguments.first() else {
1051                        return Value::Null;
1052                    };
1053                    let Some(Value::Node(node_id)) = record.get(&pa.variable) else {
1054                        return Value::Null;
1055                    };
1056                    let Some(query_expr) = func.arguments.get(1) else {
1057                        return Value::Null;
1058                    };
1059                    let Value::String(query) =
1060                        evaluate_expression_value_streaming(query_expr, record, ctx)
1061                    else {
1062                        return Value::Null;
1063                    };
1064                    Value::Float(ctx.db.fts_txt_score(*node_id, &pa.property, &query))
1065                }
1066                #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
1067                {
1068                    Value::Float(0.0)
1069                }
1070            }
1071            "id" => match func
1072                .arguments
1073                .first()
1074                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1075            {
1076                Some(Value::Node(id)) => Value::Float(id as f64),
1077                _ => Value::Null,
1078            },
1079            "type" => match func
1080                .arguments
1081                .first()
1082                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1083            {
1084                Some(Value::Relationship(triple)) => relationship_type_value(&ctx.db, &triple),
1085                _ => Value::Null,
1086            },
1087            "labels" => match func
1088                .arguments
1089                .first()
1090                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1091            {
1092                Some(Value::Node(id)) => node_labels_value(&ctx.db, id),
1093                _ => Value::Null,
1094            },
1095            "keys" => match func
1096                .arguments
1097                .first()
1098                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1099            {
1100                Some(Value::Node(id)) => node_property_keys_value(&ctx.db, id),
1101                Some(Value::Relationship(triple)) => edge_property_keys_value(&ctx.db, &triple),
1102                _ => Value::Null,
1103            },
1104            "size" => match func
1105                .arguments
1106                .first()
1107                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1108            {
1109                Some(Value::String(s)) => Value::Float(s.len() as f64),
1110                _ => Value::Null,
1111            },
1112            "toupper" => match func
1113                .arguments
1114                .first()
1115                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1116            {
1117                Some(Value::String(s)) => Value::String(s.to_uppercase()),
1118                _ => Value::Null,
1119            },
1120            "tolower" => match func
1121                .arguments
1122                .first()
1123                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1124            {
1125                Some(Value::String(s)) => Value::String(s.to_lowercase()),
1126                _ => Value::Null,
1127            },
1128            "trim" => match func
1129                .arguments
1130                .first()
1131                .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1132            {
1133                Some(Value::String(s)) => Value::String(s.trim().to_string()),
1134                _ => Value::Null,
1135            },
1136            "coalesce" => {
1137                for arg in &func.arguments {
1138                    let v = evaluate_expression_value_streaming(arg, record, ctx);
1139                    if !matches!(v, Value::Null) {
1140                        return v;
1141                    }
1142                }
1143                Value::Null
1144            }
1145            _ => Value::Null,
1146        },
1147        _ => Value::Null,
1148    }
1149}
1150
1151fn list_literal_value_streaming(
1152    elements: &[Expression],
1153    record: &Record,
1154    ctx: &ArcExecutionContext,
1155) -> Value {
1156    let json = serde_json::Value::Array(
1157        elements
1158            .iter()
1159            .map(|e| executor_value_to_json(&evaluate_expression_value_streaming(e, record, ctx)))
1160            .collect(),
1161    );
1162    Value::String(json.to_string())
1163}
1164
1165fn list_comprehension_value_streaming(
1166    comp: &ListComprehension,
1167    record: &Record,
1168    ctx: &ArcExecutionContext,
1169) -> Value {
1170    let Some(items) = evaluate_list_source_streaming(&comp.list, record, ctx) else {
1171        return Value::Null;
1172    };
1173
1174    let mut out = Vec::new();
1175    for item in items {
1176        let mut scoped = record.clone();
1177        scoped.insert(comp.variable.clone(), item);
1178
1179        if let Some(where_expr) = &comp.where_expression
1180            && !evaluate_expression_streaming(where_expr, &scoped, ctx)
1181        {
1182            continue;
1183        }
1184
1185        let mapped = match &comp.map_expression {
1186            Some(expr) => evaluate_expression_value_streaming(expr, &scoped, ctx),
1187            None => scoped.get(&comp.variable).cloned().unwrap_or(Value::Null),
1188        };
1189        out.push(executor_value_to_json(&mapped));
1190    }
1191
1192    Value::String(serde_json::Value::Array(out).to_string())
1193}
1194
1195fn evaluate_list_source_streaming(
1196    expr: &Expression,
1197    record: &Record,
1198    ctx: &ArcExecutionContext,
1199) -> Option<Vec<Value>> {
1200    match expr {
1201        Expression::List(elements) => Some(
1202            elements
1203                .iter()
1204                .map(|e| evaluate_expression_value_streaming(e, record, ctx))
1205                .collect(),
1206        ),
1207        _ => match evaluate_expression_value_streaming(expr, record, ctx) {
1208            Value::String(s) => parse_executor_list_string(&s),
1209            Value::Vector(v) => Some(v.iter().map(|f| Value::Float(*f as f64)).collect()),
1210            _ => None,
1211        },
1212    }
1213}
1214
1215fn executor_value_to_json(value: &Value) -> serde_json::Value {
1216    match value {
1217        Value::Null => serde_json::Value::Null,
1218        Value::Boolean(b) => serde_json::Value::Bool(*b),
1219        Value::Float(f) => serde_json::Number::from_f64(*f)
1220            .map(serde_json::Value::Number)
1221            .unwrap_or(serde_json::Value::Null),
1222        Value::String(s) => serde_json::Value::String(s.clone()),
1223        Value::Vector(v) => serde_json::Value::Array(
1224            v.iter()
1225                .map(|f| {
1226                    serde_json::Number::from_f64(*f as f64)
1227                        .map(serde_json::Value::Number)
1228                        .unwrap_or(serde_json::Value::Null)
1229                })
1230                .collect(),
1231        ),
1232        Value::Node(id) => serde_json::Value::Number(serde_json::Number::from(*id)),
1233        Value::Relationship(triple) => serde_json::Value::String(format!("{triple:?}")),
1234    }
1235}
1236
1237fn json_to_executor_value(value: &serde_json::Value) -> Value {
1238    match value {
1239        serde_json::Value::Null => Value::Null,
1240        serde_json::Value::Bool(b) => Value::Boolean(*b),
1241        serde_json::Value::Number(n) => Value::Float(n.as_f64().unwrap_or(0.0)),
1242        serde_json::Value::String(s) => Value::String(s.clone()),
1243        _ => Value::Null,
1244    }
1245}
1246
1247fn parse_executor_list_string(input: &str) -> Option<Vec<Value>> {
1248    let json = serde_json::from_str::<serde_json::Value>(input).ok()?;
1249    let serde_json::Value::Array(items) = json else {
1250        return None;
1251    };
1252    Some(items.iter().map(json_to_executor_value).collect())
1253}
1254
1255fn parse_executor_vector_string(input: &str) -> Option<Vec<f32>> {
1256    let json = serde_json::from_str::<serde_json::Value>(input).ok()?;
1257    let serde_json::Value::Array(items) = json else {
1258        return None;
1259    };
1260    let mut out = Vec::with_capacity(items.len());
1261    for item in items {
1262        out.push(item.as_f64()? as f32);
1263    }
1264    Some(out)
1265}
1266
1267fn value_to_vector(value: &Value) -> Option<Vec<f32>> {
1268    match value {
1269        Value::Vector(v) => Some(v.clone()),
1270        Value::String(s) => parse_executor_vector_string(s),
1271        _ => None,
1272    }
1273}
1274
/// Computes the cosine similarity of two equally sized vectors.
///
/// Returns `None` when the vectors are empty, differ in length, either has
/// zero magnitude, or the result is not a finite number (for example when an
/// input contains NaN or an infinity).
///
/// Sums are accumulated in `f64` so that very large or very small `f32`
/// components neither overflow to infinity nor underflow to zero when
/// squared, and the result is clamped to `[-1.0, 1.0]` to absorb rounding.
fn cosine_similarity(a: &[f32], b: &[f32]) -> Option<f32> {
    if a.is_empty() || a.len() != b.len() {
        return None;
    }

    let (dot, norm_a, norm_b) = a.iter().zip(b).fold(
        (0.0f64, 0.0f64, 0.0f64),
        |(dot, norm_a, norm_b), (&x, &y)| {
            let (x, y) = (f64::from(x), f64::from(y));
            (dot + x * y, norm_a + x * x, norm_b + y * y)
        },
    );

    let denom = norm_a.sqrt() * norm_b.sqrt();
    if denom == 0.0 {
        return None;
    }

    let similarity = dot / denom;
    if !similarity.is_finite() {
        return None;
    }
    Some(similarity.clamp(-1.0, 1.0) as f32)
}
1293
1294fn value_in_list(needle: &Value, haystack: &Value) -> Value {
1295    match haystack {
1296        Value::String(input) => {
1297            let Some(items) = parse_executor_list_string(input) else {
1298                return Value::Null;
1299            };
1300            Value::Boolean(items.iter().any(|v| v == needle))
1301        }
1302        Value::Vector(items) => match needle {
1303            Value::Float(f) => Value::Boolean(items.iter().any(|v| (*v as f64) == *f)),
1304            _ => Value::Null,
1305        },
1306        _ => Value::Null,
1307    }
1308}
1309
1310fn unwind_value_to_list(value: Value) -> Result<Vec<Value>, Error> {
1311    match value {
1312        Value::Null => Ok(Vec::new()),
1313        Value::String(input) => parse_executor_list_string(&input)
1314            .ok_or_else(|| Error::Other("UNWIND expects a list expression".to_string())),
1315        Value::Vector(items) => Ok(items.into_iter().map(|f| Value::Float(f as f64)).collect()),
1316        _ => Err(Error::Other("UNWIND expects a list expression".to_string())),
1317    }
1318}
1319
1320fn unwind_values_streaming(
1321    expr: &Expression,
1322    record: &Record,
1323    ctx: &ArcExecutionContext,
1324) -> Result<Vec<Value>, Error> {
1325    match expr {
1326        Expression::List(elements) => Ok(elements
1327            .iter()
1328            .map(|e| evaluate_expression_value_streaming(e, record, ctx))
1329            .collect()),
1330        _ => unwind_value_to_list(evaluate_expression_value_streaming(expr, record, ctx)),
1331    }
1332}
1333
1334fn unwind_values(
1335    expr: &Expression,
1336    record: &Record,
1337    ctx: &ExecutionContext,
1338) -> Result<Vec<Value>, Error> {
1339    match expr {
1340        Expression::List(elements) => Ok(elements
1341            .iter()
1342            .map(|e| evaluate_expression_value(e, record, ctx))
1343            .collect()),
1344        _ => unwind_value_to_list(evaluate_expression_value(expr, record, ctx)),
1345    }
1346}
1347
1348fn evaluate_exists_streaming(
1349    exists_expr: &ExistsExpression,
1350    record: &Record,
1351    ctx: &ArcExecutionContext,
1352) -> bool {
1353    match exists_expr {
1354        ExistsExpression::Pattern(pattern) => {
1355            exists_match_pattern_streaming(pattern, None, record, ctx)
1356        }
1357        ExistsExpression::Subquery(query) => {
1358            let (pattern, where_expr) = match extract_exists_match_query(query) {
1359                Some(v) => v,
1360                None => return false,
1361            };
1362            exists_match_pattern_streaming(pattern, where_expr, record, ctx)
1363        }
1364    }
1365}
1366
1367fn exists_match_pattern_streaming(
1368    pattern: &Pattern,
1369    where_expr: Option<&Expression>,
1370    outer_record: &Record,
1371    ctx: &ArcExecutionContext,
1372) -> bool {
1373    let Some(PathElement::Node(start_node)) = pattern.elements.first() else {
1374        return false;
1375    };
1376
1377    if let Some(var) = &start_node.variable
1378        && let Some(Value::Node(start_id)) = outer_record.get(var)
1379    {
1380        if !node_satisfies_streaming(*start_id, start_node, outer_record, ctx) {
1381            return false;
1382        }
1383        return exists_path_from_node_streaming(
1384            pattern,
1385            0,
1386            *start_id,
1387            outer_record,
1388            where_expr,
1389            ctx,
1390        );
1391    }
1392
1393    // Fallback: evaluate as an uncorrelated MATCH query and check if any result exists.
1394    exists_uncorrelated_match_streaming(pattern, where_expr, ctx)
1395}
1396
1397fn exists_uncorrelated_match_streaming(
1398    pattern: &Pattern,
1399    where_expr: Option<&Expression>,
1400    ctx: &ArcExecutionContext,
1401) -> bool {
1402    use crate::query::ast::{MatchClause, Query, ReturnClause, ReturnItem, WhereClause};
1403    use crate::query::planner::QueryPlanner;
1404
1405    let mut clauses: Vec<Clause> = Vec::new();
1406    clauses.push(Clause::Match(MatchClause {
1407        optional: false,
1408        pattern: pattern.clone(),
1409    }));
1410    if let Some(expr) = where_expr.cloned() {
1411        clauses.push(Clause::Where(WhereClause { expression: expr }));
1412    }
1413    clauses.push(Clause::Return(ReturnClause {
1414        distinct: false,
1415        items: vec![ReturnItem {
1416            expression: Expression::Literal(Literal::Integer(1)),
1417            alias: Some("_exists".to_string()),
1418        }],
1419        order_by: None,
1420        limit: Some(1),
1421        skip: None,
1422    }));
1423
1424    let planner = QueryPlanner::new();
1425    let plan = match planner.plan(Query { clauses }) {
1426        Ok(plan) => plan,
1427        Err(_) => return false,
1428    };
1429
1430    let exec_ctx = ExecutionContext {
1431        db: ctx.db.as_ref(),
1432        params: ctx.params.as_ref(),
1433    };
1434    match plan.execute(&exec_ctx) {
1435        Ok(mut iter) => iter.next().is_some(),
1436        Err(_) => false,
1437    }
1438}
1439
1440fn exists_path_from_node_streaming(
1441    pattern: &Pattern,
1442    node_index: usize,
1443    current_node_id: u64,
1444    bindings: &Record,
1445    where_expr: Option<&Expression>,
1446    ctx: &ArcExecutionContext,
1447) -> bool {
1448    let next_rel_index = node_index + 1;
1449    let next_node_index = node_index + 2;
1450
1451    if next_node_index >= pattern.elements.len() {
1452        return where_expr.is_none_or(|expr| evaluate_expression_streaming(expr, bindings, ctx));
1453    }
1454
1455    let PathElement::Relationship(rel) = &pattern.elements[next_rel_index] else {
1456        return false;
1457    };
1458    let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
1459        return false;
1460    };
1461
1462    if rel.variable_length.is_some() {
1463        return exists_var_length_step_streaming(
1464            pattern,
1465            next_node_index,
1466            current_node_id,
1467            rel,
1468            bindings,
1469            where_expr,
1470            ctx,
1471        );
1472    }
1473
1474    for (triple, end_id) in iter_matching_edges(ctx.db.as_ref(), current_node_id, rel) {
1475        let mut new_record = bindings.clone();
1476
1477        if let Some(rel_var) = &rel.variable {
1478            match new_record.get(rel_var) {
1479                Some(Value::Relationship(existing)) if existing == &triple => {}
1480                Some(_) => continue,
1481                None => new_record.insert(rel_var.clone(), Value::Relationship(triple)),
1482            }
1483        }
1484
1485        if let Some(props) = &rel.properties
1486            && !edge_satisfies_streaming(&triple, props, &new_record, ctx)
1487        {
1488            continue;
1489        }
1490
1491        if !node_satisfies_streaming(end_id, next_node, &new_record, ctx) {
1492            continue;
1493        }
1494
1495        if let Some(node_var) = &next_node.variable {
1496            match new_record.get(node_var) {
1497                Some(Value::Node(existing)) if *existing == end_id => {}
1498                Some(_) => continue,
1499                None => new_record.insert(node_var.clone(), Value::Node(end_id)),
1500            }
1501        }
1502
1503        if exists_path_from_node_streaming(
1504            pattern,
1505            next_node_index,
1506            end_id,
1507            &new_record,
1508            where_expr,
1509            ctx,
1510        ) {
1511            return true;
1512        }
1513    }
1514
1515    false
1516}
1517
1518fn exists_var_length_step_streaming(
1519    pattern: &Pattern,
1520    next_node_index: usize,
1521    current_node_id: u64,
1522    rel: &RelationshipPattern,
1523    bindings: &Record,
1524    where_expr: Option<&Expression>,
1525    ctx: &ArcExecutionContext,
1526) -> bool {
1527    let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
1528        return false;
1529    };
1530    if rel.variable.is_some() || rel.properties.is_some() {
1531        return false;
1532    }
1533    let Some(var_len) = &rel.variable_length else {
1534        return false;
1535    };
1536    let min_hops = var_len.min.unwrap_or(1);
1537    let Some(max_hops) = var_len.max else {
1538        return false;
1539    };
1540
1541    if rel.types.len() > 1 {
1542        return false;
1543    }
1544
1545    let rel_predicate_id = rel
1546        .types
1547        .first()
1548        .and_then(|t| ctx.db.resolve_id(t).ok().flatten());
1549
1550    let reachable = find_reachable_nodes(
1551        ctx.db.as_ref(),
1552        current_node_id,
1553        rel.direction.clone(),
1554        rel_predicate_id,
1555        min_hops,
1556        max_hops,
1557    );
1558
1559    for end_id in reachable {
1560        let mut new_record = bindings.clone();
1561
1562        if !node_satisfies_streaming(end_id, next_node, &new_record, ctx) {
1563            continue;
1564        }
1565
1566        if let Some(node_var) = &next_node.variable {
1567            match new_record.get(node_var) {
1568                Some(Value::Node(existing)) if *existing == end_id => {}
1569                Some(_) => continue,
1570                None => new_record.insert(node_var.clone(), Value::Node(end_id)),
1571            }
1572        }
1573
1574        if exists_path_from_node_streaming(
1575            pattern,
1576            next_node_index,
1577            end_id,
1578            &new_record,
1579            where_expr,
1580            ctx,
1581        ) {
1582            return true;
1583        }
1584    }
1585
1586    false
1587}
1588
1589fn node_satisfies_streaming(
1590    node_id: u64,
1591    node: &crate::query::ast::NodePattern,
1592    bindings: &Record,
1593    ctx: &ArcExecutionContext,
1594) -> bool {
1595    if let Some(var) = &node.variable
1596        && let Some(Value::Node(bound)) = bindings.get(var)
1597        && *bound != node_id
1598    {
1599        return false;
1600    }
1601
1602    if !node.labels.is_empty() {
1603        let Some(type_id) = ctx.db.resolve_id("type").ok().flatten() else {
1604            return false;
1605        };
1606        for label in &node.labels {
1607            let Some(label_id) = ctx.db.resolve_id(label).ok().flatten() else {
1608                return false;
1609            };
1610            let criteria = QueryCriteria {
1611                subject_id: Some(node_id),
1612                predicate_id: Some(type_id),
1613                object_id: Some(label_id),
1614            };
1615            if ctx.db.query(criteria).next().is_none() {
1616                return false;
1617            }
1618        }
1619    }
1620
1621    if let Some(props) = &node.properties
1622        && !node_properties_match_streaming(node_id, props, bindings, ctx)
1623    {
1624        return false;
1625    }
1626
1627    true
1628}
1629
1630fn node_properties_match_streaming(
1631    node_id: u64,
1632    props: &PropertyMap,
1633    bindings: &Record,
1634    ctx: &ArcExecutionContext,
1635) -> bool {
1636    if props.properties.is_empty() {
1637        return true;
1638    }
1639    let Ok(Some(binary)) = ctx.db.get_node_property_binary(node_id) else {
1640        return false;
1641    };
1642    let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
1643        return false;
1644    };
1645
1646    for pair in &props.properties {
1647        let expected = evaluate_expression_value_streaming(&pair.value, bindings, ctx);
1648        let Some(actual) = stored.get(&pair.key) else {
1649            return false;
1650        };
1651        if !json_value_matches_executor_value(actual, &expected) {
1652            return false;
1653        }
1654    }
1655
1656    true
1657}
1658
1659fn edge_satisfies_streaming(
1660    triple: &Triple,
1661    props: &PropertyMap,
1662    bindings: &Record,
1663    ctx: &ArcExecutionContext,
1664) -> bool {
1665    if props.properties.is_empty() {
1666        return true;
1667    }
1668    let Ok(Some(binary)) =
1669        ctx.db
1670            .get_edge_property_binary(triple.subject_id, triple.predicate_id, triple.object_id)
1671    else {
1672        return false;
1673    };
1674    let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
1675        return false;
1676    };
1677
1678    for pair in &props.properties {
1679        let expected = evaluate_expression_value_streaming(&pair.value, bindings, ctx);
1680        let Some(actual) = stored.get(&pair.key) else {
1681            return false;
1682        };
1683        if !json_value_matches_executor_value(actual, &expected) {
1684            return false;
1685        }
1686    }
1687
1688    true
1689}
1690
1691fn json_value_matches_executor_value(actual: &serde_json::Value, expected: &Value) -> bool {
1692    match expected {
1693        Value::Null => actual.is_null(),
1694        Value::String(s) => actual.as_str() == Some(s.as_str()),
1695        Value::Boolean(b) => actual.as_bool() == Some(*b),
1696        Value::Float(f) => actual.as_f64().map(|v| v == *f).unwrap_or(false),
1697        _ => false,
1698    }
1699}
1700
1701fn iter_matching_edges(
1702    db: &Database,
1703    node_id: u64,
1704    rel: &RelationshipPattern,
1705) -> Vec<(Triple, u64)> {
1706    let predicate_ids: Vec<u64> = rel
1707        .types
1708        .iter()
1709        .filter_map(|t| db.resolve_id(t).ok().flatten())
1710        .collect();
1711
1712    let predicate_ids: Vec<Option<u64>> = if predicate_ids.is_empty() && !rel.types.is_empty() {
1713        // Types were specified but none resolved -> no matches.
1714        return Vec::new();
1715    } else if predicate_ids.is_empty() {
1716        vec![None]
1717    } else {
1718        predicate_ids.into_iter().map(Some).collect()
1719    };
1720
1721    let mut out = Vec::new();
1722
1723    for pred in predicate_ids {
1724        match rel.direction {
1725            RelationshipDirection::LeftToRight => {
1726                let criteria = QueryCriteria {
1727                    subject_id: Some(node_id),
1728                    predicate_id: pred,
1729                    object_id: None,
1730                };
1731                out.extend(db.query(criteria).map(|t| (t, t.object_id)));
1732            }
1733            RelationshipDirection::RightToLeft => {
1734                let criteria = QueryCriteria {
1735                    subject_id: None,
1736                    predicate_id: pred,
1737                    object_id: Some(node_id),
1738                };
1739                out.extend(db.query(criteria).map(|t| (t, t.subject_id)));
1740            }
1741            RelationshipDirection::Undirected => {
1742                let out_criteria = QueryCriteria {
1743                    subject_id: Some(node_id),
1744                    predicate_id: pred,
1745                    object_id: None,
1746                };
1747                out.extend(db.query(out_criteria).map(|t| (t, t.object_id)));
1748
1749                let in_criteria = QueryCriteria {
1750                    subject_id: None,
1751                    predicate_id: pred,
1752                    object_id: Some(node_id),
1753                };
1754                out.extend(db.query(in_criteria).map(|t| (t, t.subject_id)));
1755            }
1756        }
1757    }
1758
1759    out
1760}
1761
1762fn extract_exists_match_query(
1763    query: &crate::query::ast::Query,
1764) -> Option<(&Pattern, Option<&Expression>)> {
1765    use crate::query::ast::{Clause, WhereClause};
1766
1767    let mut match_pattern: Option<&Pattern> = None;
1768    let mut where_expr: Option<&Expression> = None;
1769
1770    for clause in &query.clauses {
1771        match clause {
1772            Clause::Match(m) => match_pattern = Some(&m.pattern),
1773            Clause::Where(WhereClause { expression }) => where_expr = Some(expression),
1774            Clause::Return(_) => {}
1775            _ => return None,
1776        }
1777    }
1778
1779    match_pattern.map(|p| (p, where_expr))
1780}
1781fn json_array_string(mut values: Vec<String>) -> Value {
1782    values.sort();
1783    let json =
1784        serde_json::Value::Array(values.into_iter().map(serde_json::Value::String).collect());
1785    Value::String(json.to_string())
1786}
1787
1788fn relationship_type_value(db: &Database, triple: &Triple) -> Value {
1789    match db.resolve_str(triple.predicate_id).ok().flatten() {
1790        Some(s) => Value::String(s),
1791        None => Value::Null,
1792    }
1793}
1794
1795fn node_labels_value(db: &Database, node_id: u64) -> Value {
1796    let Some(type_id) = db.resolve_id("type").ok().flatten() else {
1797        return Value::String("[]".to_string());
1798    };
1799
1800    let criteria = QueryCriteria {
1801        subject_id: Some(node_id),
1802        predicate_id: Some(type_id),
1803        object_id: None,
1804    };
1805
1806    let labels: Vec<String> = db
1807        .query(criteria)
1808        .filter_map(|t| db.resolve_str(t.object_id).ok().flatten())
1809        .collect();
1810
1811    json_array_string(labels)
1812}
1813
1814fn node_property_keys_value(db: &Database, node_id: u64) -> Value {
1815    let keys: Vec<String> = db
1816        .get_node_property_binary(node_id)
1817        .ok()
1818        .flatten()
1819        .and_then(|binary| crate::storage::property::deserialize_properties(&binary).ok())
1820        .map(|props| props.keys().cloned().collect())
1821        .unwrap_or_default();
1822
1823    json_array_string(keys)
1824}
1825
1826fn edge_property_keys_value(db: &Database, triple: &Triple) -> Value {
1827    let keys: Vec<String> = db
1828        .get_edge_property_binary(triple.subject_id, triple.predicate_id, triple.object_id)
1829        .ok()
1830        .flatten()
1831        .and_then(|binary| crate::storage::property::deserialize_properties(&binary).ok())
1832        .map(|props| props.keys().cloned().collect())
1833        .unwrap_or_default();
1834
1835    json_array_string(keys)
1836}
1837
1838// ============================================================================
1839// Optimized ScanNode Implementation
1840// ============================================================================
1841
1842impl ExecutionPlan for SingleRowNode {
1843    fn execute<'a>(
1844        &'a self,
1845        _ctx: &'a ExecutionContext<'a>,
1846    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1847        Ok(Box::new(std::iter::once(Ok(Record::new()))))
1848    }
1849
1850    fn estimate_cardinality(&self, _ctx: &ExecutionContext) -> usize {
1851        1
1852    }
1853}
1854
1855impl ExecutionPlan for DistinctNode {
1856    fn execute<'a>(
1857        &'a self,
1858        ctx: &'a ExecutionContext<'a>,
1859    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1860        let input_iter = self.input.execute(ctx)?;
1861        let mut seen: HashSet<String> = HashSet::new();
1862        Ok(Box::new(input_iter.filter_map(
1863            move |result| match result {
1864                Ok(record) => {
1865                    let key = record_distinct_key(&record);
1866                    if seen.insert(key) {
1867                        Some(Ok(record))
1868                    } else {
1869                        None
1870                    }
1871                }
1872                Err(err) => Some(Err(err)),
1873            },
1874        )))
1875    }
1876
1877    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1878        self.input.estimate_cardinality(ctx)
1879    }
1880}
1881
1882impl ExecutionPlan for UnwindNode {
1883    fn execute<'a>(
1884        &'a self,
1885        ctx: &'a ExecutionContext<'a>,
1886    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1887        let input_iter = self.input.execute(ctx)?;
1888        let expression = self.expression.clone();
1889        let alias = self.alias.clone();
1890        Ok(Box::new(input_iter.flat_map(move |result| {
1891            match result {
1892                Ok(record) => match unwind_values(&expression, &record, ctx) {
1893                    Ok(values) => values
1894                        .into_iter()
1895                        .map(|value| {
1896                            let mut new_record = record.clone();
1897                            new_record.insert(alias.clone(), value);
1898                            Ok(new_record)
1899                        })
1900                        .collect::<Vec<_>>(),
1901                    Err(err) => vec![Err(err)],
1902                },
1903                Err(err) => vec![Err(err)],
1904            }
1905        })))
1906    }
1907
1908    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1909        self.input.estimate_cardinality(ctx)
1910    }
1911}
1912
1913impl ExecutionPlan for ScanNode {
1914    fn execute<'a>(
1915        &'a self,
1916        ctx: &'a ExecutionContext<'a>,
1917    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1918        let alias = self.alias.clone();
1919
1920        if self.labels.is_empty() {
1921            // 无标签扫描:使用优化的节点发现算法
1922            Ok(Box::new(scan_all_nodes_optimized(ctx.db, alias)))
1923        } else {
1924            // 标签扫描:利用 (?, type, Label) 索引
1925            Ok(Box::new(scan_labeled_nodes_optimized(
1926                ctx.db,
1927                &self.labels,
1928                alias,
1929            )))
1930        }
1931    }
1932
1933    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1934        ScanStats::estimate_scan_cardinality(ctx.db, &self.labels).estimated_cardinality
1935    }
1936}
1937
1938impl ExecutionPlan for FtsCandidateScanNode {
1939    fn execute<'a>(
1940        &'a self,
1941        ctx: &'a ExecutionContext<'a>,
1942    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1943        let Some(query) = resolve_query_string(&self.query, ctx) else {
1944            return Ok(Box::new(std::iter::empty()));
1945        };
1946        if query.is_empty() || self.property.is_empty() {
1947            return Ok(Box::new(std::iter::empty()));
1948        }
1949
1950        #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1951        let Some(scores) = ctx
1952            .db
1953            .fts_scores_for_query(self.property.as_str(), query.as_str())
1954        else {
1955            return Ok(Box::new(std::iter::empty()));
1956        };
1957        #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
1958        let scores: std::sync::Arc<std::collections::HashMap<u64, f32>> =
1959            std::sync::Arc::new(std::collections::HashMap::new());
1960
1961        let alias = self.alias.clone();
1962        let db = ctx.db;
1963        let type_and_labels = resolve_label_ids(db, &self.labels);
1964        let candidate_ids: Vec<u64> = scores.keys().copied().collect();
1965
1966        Ok(Box::new(candidate_ids.into_iter().filter_map(
1967            move |node_id| {
1968                if let Some((type_id, label_ids)) = type_and_labels.as_ref()
1969                    && !node_has_labels(db, node_id, *type_id, label_ids)
1970                {
1971                    return None;
1972                }
1973
1974                let mut record = Record::new();
1975                record.insert(alias.clone(), Value::Node(node_id));
1976                Some(Ok(record))
1977            },
1978        )))
1979    }
1980
1981    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1982        let scan_est = ScanStats::estimate_scan_cardinality(ctx.db, &self.labels)
1983            .estimated_cardinality
1984            .max(1);
1985        scan_est.min(10_000)
1986    }
1987}
1988
1989impl ExecutionPlan for VectorTopKScanNode {
1990    fn execute<'a>(
1991        &'a self,
1992        ctx: &'a ExecutionContext<'a>,
1993    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1994        fn fallback<'a>(
1995            node: &VectorTopKScanNode,
1996            ctx: &'a ExecutionContext<'a>,
1997            limit: usize,
1998        ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1999            let alias = node.alias.clone();
2000            let property = node.property.clone();
2001            let query_expr = node.query.clone();
2002
2003            let sort_expr = Expression::FunctionCall(FunctionCall {
2004                name: "vec_similarity".to_string(),
2005                arguments: vec![
2006                    Expression::PropertyAccess(PropertyAccess {
2007                        variable: alias.clone(),
2008                        property,
2009                    }),
2010                    query_expr,
2011                ],
2012            });
2013
2014            let mut records = Vec::new();
2015            if node.labels.is_empty() {
2016                for item in scan_all_nodes_optimized(ctx.db, alias.clone()) {
2017                    records.push(item?);
2018                }
2019            } else {
2020                let labels = node.labels.clone();
2021                for item in scan_labeled_nodes_optimized(ctx.db, &labels, alias.clone()) {
2022                    records.push(item?);
2023                }
2024            }
2025            records.sort_by(|a, b| {
2026                let val_a = evaluate_expression_value(&sort_expr, a, ctx);
2027                let val_b = evaluate_expression_value(&sort_expr, b, ctx);
2028                compare_values_for_sort(&val_a, &val_b, &Direction::Descending)
2029            });
2030            records.truncate(limit);
2031            Ok(Box::new(records.into_iter().map(Ok)))
2032        }
2033
2034        let limit = usize::try_from(self.limit).unwrap_or(usize::MAX);
2035        if limit == 0 || self.property.is_empty() {
2036            return Ok(Box::new(std::iter::empty()));
2037        }
2038
2039        // Only optimize global Top-K for now. Label filtering stays on the exact path.
2040        if !self.labels.is_empty() {
2041            return fallback(self, ctx, limit);
2042        }
2043
2044        #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
2045        {
2046            let query_value = evaluate_expression_value(&self.query, &Record::new(), ctx);
2047            let Some(query_vec) = value_to_vector(&query_value) else {
2048                return fallback(self, ctx, limit);
2049            };
2050
2051            let Some(config) = ctx.db.vector_index_config() else {
2052                return fallback(self, ctx, limit);
2053            };
2054
2055            let metric = config.metric.to_lowercase();
2056            if config.property != self.property
2057                || query_vec.len() != config.dim
2058                || !(metric == "cosine" || metric == "cos")
2059            {
2060                return fallback(self, ctx, limit);
2061            }
2062
2063            let hits = match ctx.db.vector_search(&query_vec, limit) {
2064                Ok(h) => h,
2065                Err(_) => return fallback(self, ctx, limit),
2066            };
2067
2068            // Re-score candidates using the exact `vec_similarity` semantics to keep ordering
2069            // consistent with the executor.
2070            let alias = self.alias.clone();
2071            let property_expr = Expression::PropertyAccess(PropertyAccess {
2072                variable: alias.clone(),
2073                property: self.property.clone(),
2074            });
2075            let mut scored: Vec<(u64, Option<f32>)> = Vec::with_capacity(hits.len());
2076            for (node_id, _) in hits {
2077                let mut record = Record::new();
2078                record.insert(alias.clone(), Value::Node(node_id));
2079
2080                let value = evaluate_expression_value(&property_expr, &record, ctx);
2081                let score = value_to_vector(&value).and_then(|v| cosine_similarity(&v, &query_vec));
2082                scored.push((node_id, score));
2083            }
2084
2085            scored.sort_by(|(id_a, s_a), (id_b, s_b)| match (s_a, s_b) {
2086                (Some(a), Some(b)) => b
2087                    .partial_cmp(a)
2088                    .unwrap_or(std::cmp::Ordering::Equal)
2089                    .then_with(|| id_a.cmp(id_b)),
2090                (Some(_), None) => std::cmp::Ordering::Less,
2091                (None, Some(_)) => std::cmp::Ordering::Greater,
2092                (None, None) => id_a.cmp(id_b),
2093            });
2094
2095            let candidate_ids: Vec<u64> = scored.into_iter().map(|(id, _)| id).collect();
2096            return Ok(Box::new(candidate_ids.into_iter().map(move |node_id| {
2097                let mut record = Record::new();
2098                record.insert(alias.clone(), Value::Node(node_id));
2099                Ok(record)
2100            })));
2101        }
2102        #[cfg(not(all(feature = "vector", not(target_arch = "wasm32"))))]
2103        {
2104            fallback(self, ctx, limit)
2105        }
2106    }
2107
2108    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2109        let scan_est = ScanStats::estimate_scan_cardinality(ctx.db, &self.labels)
2110            .estimated_cardinality
2111            .max(1);
2112        scan_est.min(self.limit as usize)
2113    }
2114}
2115
2116/// 优化的全节点扫描 - 避免全表扫描
2117fn scan_all_nodes_optimized(
2118    db: &Database,
2119    alias: String,
2120) -> impl Iterator<Item = Result<Record, Error>> + '_ {
2121    // 策略:使用 SPO 索引扫描,提取唯一的 subject 节点
2122    // 然后使用 OSP 索引扫描,提取唯一的 object 节点
2123    // 合并去重
2124
2125    let mut unique_nodes = HashSet::new();
2126
2127    // 扫描所有三元组的 subjects
2128    let subject_criteria = QueryCriteria::default();
2129    for triple in db.query(subject_criteria).take(10000) {
2130        // 限制扫描量
2131        unique_nodes.insert(triple.subject_id);
2132        unique_nodes.insert(triple.object_id);
2133    }
2134
2135    unique_nodes.into_iter().map(move |node_id| {
2136        let mut record = Record::new();
2137        record.insert(alias.clone(), Value::Node(node_id));
2138        Ok(record)
2139    })
2140}
2141
2142/// 优化的标签节点扫描 - 使用类型索引
2143fn scan_labeled_nodes_optimized<'a>(
2144    db: &'a Database,
2145    labels: &'a [String],
2146    alias: String,
2147) -> impl Iterator<Item = Result<Record, Error>> + 'a {
2148    // 解析 "type" 谓词 ID
2149    let type_id = match db.resolve_id("type") {
2150        Ok(Some(id)) => id,
2151        _ => {
2152            return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Record, Error>>>;
2153        }
2154    };
2155
2156    let labels = labels.to_vec(); // Clone for move
2157
2158    Box::new(std::iter::once(()).flat_map(move |_| {
2159        let mut label_node_sets: Vec<HashSet<u64>> = Vec::new();
2160
2161        // 为每个标签收集节点
2162        for label in &labels {
2163            if let Ok(Some(label_id)) = db.resolve_id(label) {
2164                let criteria = QueryCriteria {
2165                    subject_id: None,
2166                    predicate_id: Some(type_id),
2167                    object_id: Some(label_id),
2168                };
2169
2170                let nodes: HashSet<u64> =
2171                    db.query(criteria).map(|triple| triple.subject_id).collect();
2172                label_node_sets.push(nodes);
2173            } else {
2174                // 标签不存在,返回空集合
2175                label_node_sets.push(HashSet::new());
2176            }
2177        }
2178
2179        // 计算标签交集(节点必须有所有指定标签)
2180        let final_nodes = if label_node_sets.is_empty() {
2181            HashSet::new()
2182        } else {
2183            label_node_sets
2184                .into_iter()
2185                .reduce(|acc, set| acc.intersection(&set).cloned().collect())
2186                .unwrap_or_default()
2187        };
2188
2189        let alias_clone = alias.clone();
2190        final_nodes.into_iter().map(move |node_id| {
2191            let mut record = Record::new();
2192            record.insert(alias_clone.clone(), Value::Node(node_id));
2193            Ok(record)
2194        })
2195    }))
2196}
2197
2198fn resolve_query_string(expr: &Expression, ctx: &ExecutionContext) -> Option<String> {
2199    match expr {
2200        Expression::Literal(Literal::String(s)) => Some(s.clone()),
2201        Expression::Parameter(name) => match ctx.params.get(name) {
2202            Some(Value::String(s)) => Some(s.clone()),
2203            _ => None,
2204        },
2205        _ => None,
2206    }
2207}
2208
2209fn resolve_label_ids(db: &Database, labels: &[String]) -> Option<(u64, Vec<u64>)> {
2210    if labels.is_empty() {
2211        return None;
2212    }
2213    let type_id = db.resolve_id("type").ok().flatten()?;
2214    let mut label_ids = Vec::with_capacity(labels.len());
2215    for label in labels {
2216        label_ids.push(db.resolve_id(label).ok().flatten()?);
2217    }
2218    Some((type_id, label_ids))
2219}
2220
2221fn node_has_labels(db: &Database, node_id: u64, type_id: u64, label_ids: &[u64]) -> bool {
2222    for label_id in label_ids {
2223        let criteria = QueryCriteria {
2224            subject_id: Some(node_id),
2225            predicate_id: Some(type_id),
2226            object_id: Some(*label_id),
2227        };
2228        if db.query(criteria).next().is_none() {
2229            return false;
2230        }
2231    }
2232    true
2233}
2234
/// Streaming-context variant of query string resolution.
///
/// Accepts a string literal or a parameter bound to a `Value::String`;
/// every other expression gives `None`.
#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
fn resolve_query_string_streaming(expr: &Expression, ctx: &ArcExecutionContext) -> Option<String> {
    if let Expression::Literal(Literal::String(text)) = expr {
        return Some(text.clone());
    }
    if let Expression::Parameter(name) = expr {
        return match ctx.params.get(name) {
            Some(Value::String(text)) => Some(text.clone()),
            _ => None,
        };
    }
    None
}
2246
/// `Arc<Database>` variant of `resolve_label_ids`.
///
/// Returns `None` when `labels` is empty, and also when the `"type"`
/// predicate or any label cannot be resolved. It delegates to
/// `resolve_label_ids` so the two resolution paths cannot drift apart.
#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
fn resolve_label_ids_streaming(db: &Arc<Database>, labels: &[String]) -> Option<(u64, Vec<u64>)> {
    resolve_label_ids(db.as_ref(), labels)
}
2259
/// `Arc<Database>` variant of `node_has_labels`.
///
/// Returns `true` when a `(node_id, type_id, label_id)` triple exists for
/// every id in `label_ids`. It delegates to `node_has_labels` so both
/// variants share one implementation of the check.
#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
fn node_has_labels_streaming(
    db: &Arc<Database>,
    node_id: u64,
    type_id: u64,
    label_ids: &[u64],
) -> bool {
    node_has_labels(db.as_ref(), node_id, type_id, label_ids)
}
2279
2280// ============================================================================
2281// Index Nested Loop Join Implementation
2282// ============================================================================
2283
2284impl ExecutionPlan for NestedLoopJoinNode {
2285    fn execute<'a>(
2286        &'a self,
2287        ctx: &'a ExecutionContext<'a>,
2288    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2289        // 选择更优的 Join 顺序
2290        let left_card = self.left.estimate_cardinality(ctx);
2291        let right_card = self.right.estimate_cardinality(ctx);
2292
2293        if left_card <= right_card {
2294            // 左侧较小,使用 left -> right 顺序
2295            Ok(Box::new(IndexNestedLoopJoinIter::new(
2296                self.left.execute(ctx)?,
2297                &self.right,
2298                ctx,
2299            )))
2300        } else {
2301            // 右侧较小,交换顺序
2302            Ok(Box::new(IndexNestedLoopJoinIter::new(
2303                self.right.execute(ctx)?,
2304                &self.left,
2305                ctx,
2306            )))
2307        }
2308    }
2309
2310    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2311        // Join 基数 = 左侧基数 * 右侧基数 * 选择性
2312        // 假设选择性为 0.1(保守估计)
2313        let left_card = self.left.estimate_cardinality(ctx);
2314        let right_card = self.right.estimate_cardinality(ctx);
2315        (left_card * right_card / 10).max(1)
2316    }
2317}
2318
2319/// Index Nested Loop Join 迭代器 - 避免内存爆炸
2320struct IndexNestedLoopJoinIter<'a> {
2321    outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2322    inner_plan: &'a PhysicalPlan,
2323    ctx: &'a ExecutionContext<'a>,
2324    current_outer: Option<Record>,
2325    current_inner: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>>,
2326}
2327
2328impl<'a> IndexNestedLoopJoinIter<'a> {
2329    fn new(
2330        outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2331        inner_plan: &'a PhysicalPlan,
2332        ctx: &'a ExecutionContext<'a>,
2333    ) -> Self {
2334        Self {
2335            outer_iter,
2336            inner_plan,
2337            ctx,
2338            current_outer: None,
2339            current_inner: None,
2340        }
2341    }
2342}
2343
2344impl<'a> Iterator for IndexNestedLoopJoinIter<'a> {
2345    type Item = Result<Record, Error>;
2346
2347    fn next(&mut self) -> Option<Self::Item> {
2348        loop {
2349            // 如果有当前内层迭代器,尝试获取下一个内层记录
2350            if let Some(ref mut inner_iter) = self.current_inner {
2351                if let Some(inner_result) = inner_iter.next() {
2352                    match inner_result {
2353                        Ok(inner_record) => {
2354                            if let Some(ref outer_record) = self.current_outer {
2355                                let mut joined = outer_record.clone();
2356                                joined.merge(&inner_record);
2357                                return Some(Ok(joined));
2358                            }
2359                        }
2360                        Err(e) => return Some(Err(e)),
2361                    }
2362                } else {
2363                    // 内层迭代器耗尽,清除状态
2364                    self.current_inner = None;
2365                    self.current_outer = None;
2366                }
2367            }
2368
2369            // 获取下一个外层记录
2370            match self.outer_iter.next() {
2371                Some(Ok(outer_record)) => {
2372                    // 为该外层记录创建新的内层迭代器
2373                    match self.inner_plan.execute(self.ctx) {
2374                        Ok(inner_iter) => {
2375                            self.current_outer = Some(outer_record);
2376                            self.current_inner = Some(inner_iter);
2377                            // 继续循环以获取第一个 join 结果
2378                        }
2379                        Err(e) => return Some(Err(e)),
2380                    }
2381                }
2382                Some(Err(e)) => return Some(Err(e)),
2383                None => return None, // 外层迭代器耗尽
2384            }
2385        }
2386    }
2387}
2388
2389impl ExecutionPlan for LeftOuterJoinNode {
2390    fn execute<'a>(
2391        &'a self,
2392        ctx: &'a ExecutionContext<'a>,
2393    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2394        Ok(Box::new(LeftOuterJoinIter::new(
2395            self.left.execute(ctx)?,
2396            &self.right,
2397            &self.right_aliases,
2398            ctx,
2399        )))
2400    }
2401
2402    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2403        let left_card = self.left.estimate_cardinality(ctx);
2404        let right_card = self.right.estimate_cardinality(ctx);
2405        (left_card * right_card / 10).max(left_card).max(1)
2406    }
2407}
2408
2409struct LeftOuterJoinIter<'a> {
2410    outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2411    inner_plan: &'a PhysicalPlan,
2412    right_aliases: &'a [String],
2413    ctx: &'a ExecutionContext<'a>,
2414    current_outer: Option<Record>,
2415    current_inner: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>>,
2416    matched_current_outer: bool,
2417    emitted_null_current_outer: bool,
2418}
2419
2420impl<'a> LeftOuterJoinIter<'a> {
2421    fn new(
2422        outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2423        inner_plan: &'a PhysicalPlan,
2424        right_aliases: &'a [String],
2425        ctx: &'a ExecutionContext<'a>,
2426    ) -> Self {
2427        Self {
2428            outer_iter,
2429            inner_plan,
2430            right_aliases,
2431            ctx,
2432            current_outer: None,
2433            current_inner: None,
2434            matched_current_outer: false,
2435            emitted_null_current_outer: false,
2436        }
2437    }
2438
2439    fn emit_null_row(&self, mut outer: Record) -> Record {
2440        for alias in self.right_aliases {
2441            outer.values.entry(alias.clone()).or_insert(Value::Null);
2442        }
2443        outer
2444    }
2445}
2446
2447impl<'a> Iterator for LeftOuterJoinIter<'a> {
2448    type Item = Result<Record, Error>;
2449
2450    fn next(&mut self) -> Option<Self::Item> {
2451        loop {
2452            if let Some(ref mut inner_iter) = self.current_inner {
2453                if let Some(inner_result) = inner_iter.next() {
2454                    match inner_result {
2455                        Ok(inner_record) => {
2456                            let outer_record = self.current_outer.as_ref().unwrap().clone();
2457                            if let Some(joined) = try_merge_records(outer_record, inner_record) {
2458                                self.matched_current_outer = true;
2459                                return Some(Ok(joined));
2460                            }
2461                            continue;
2462                        }
2463                        Err(e) => return Some(Err(e)),
2464                    }
2465                }
2466
2467                // Inner exhausted; maybe emit NULL row, then advance.
2468                self.current_inner = None;
2469                if !self.matched_current_outer && !self.emitted_null_current_outer {
2470                    self.emitted_null_current_outer = true;
2471                    let outer_record = self.current_outer.take().unwrap();
2472                    return Some(Ok(self.emit_null_row(outer_record)));
2473                }
2474                self.current_outer = None;
2475                self.matched_current_outer = false;
2476                self.emitted_null_current_outer = false;
2477                continue;
2478            }
2479
2480            match self.outer_iter.next()? {
2481                Ok(outer_record) => match self.inner_plan.execute(self.ctx) {
2482                    Ok(inner_iter) => {
2483                        self.current_outer = Some(outer_record);
2484                        self.current_inner = Some(inner_iter);
2485                        self.matched_current_outer = false;
2486                        self.emitted_null_current_outer = false;
2487                    }
2488                    Err(e) => return Some(Err(e)),
2489                },
2490                Err(e) => return Some(Err(e)),
2491            }
2492        }
2493    }
2494}
2495
2496// ============================================================================
2497// Other ExecutionPlan Implementations
2498// ============================================================================
2499
2500impl ExecutionPlan for FilterNode {
2501    fn execute<'a>(
2502        &'a self,
2503        ctx: &'a ExecutionContext<'a>,
2504    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2505        let input_iter = self.input.execute(ctx)?;
2506        let predicate = self.predicate.clone();
2507
2508        Ok(Box::new(input_iter.filter_map(move |res| match res {
2509            Ok(record) => {
2510                if evaluate_expression(&predicate, &record, ctx) {
2511                    Some(Ok(record))
2512                } else {
2513                    None
2514                }
2515            }
2516            Err(e) => Some(Err(e)),
2517        })))
2518    }
2519
2520    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2521        // 过滤选择性假设为 0.1
2522        (self.input.estimate_cardinality(ctx) / 10).max(1)
2523    }
2524}
2525
2526impl ExecutionPlan for ProjectNode {
2527    fn execute<'a>(
2528        &'a self,
2529        ctx: &'a ExecutionContext<'a>,
2530    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2531        let input_iter = self.input.execute(ctx)?;
2532        let projections = self.projections.clone();
2533
2534        Ok(Box::new(input_iter.map(move |res| match res {
2535            Ok(record) => {
2536                let mut new_record = Record::new();
2537                for (expr, alias) in &projections {
2538                    let val = evaluate_expression_value(expr, &record, ctx);
2539                    new_record.insert(alias.clone(), val);
2540                }
2541                Ok(new_record)
2542            }
2543            Err(e) => Err(e),
2544        })))
2545    }
2546
2547    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2548        // 投影不改变基数
2549        self.input.estimate_cardinality(ctx)
2550    }
2551}
2552
2553/// Streaming variable-length expand iterator
2554struct StreamingExpandVarLengthIterator {
2555    input_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
2556    start_node_alias: String,
2557    end_node_alias: String,
2558    direction: RelationshipDirection,
2559    rel_type: Option<String>,
2560    min_hops: u32,
2561    max_hops: u32,
2562    ctx: Arc<ArcExecutionContext>,
2563    current_record: Option<Record>,
2564    current_expansions: Option<std::vec::IntoIter<u64>>,
2565}
2566
2567impl StreamingExpandVarLengthIterator {
2568    fn compute_expansions(&self, start_id: u64) -> Vec<u64> {
2569        let rel_predicate_id: Option<u64> = if let Some(ref rel_type) = self.rel_type {
2570            self.ctx.db.resolve_id(rel_type).ok().flatten()
2571        } else {
2572            None
2573        };
2574
2575        find_reachable_nodes(
2576            &self.ctx.db,
2577            start_id,
2578            self.direction.clone(),
2579            rel_predicate_id,
2580            self.min_hops,
2581            self.max_hops,
2582        )
2583    }
2584}
2585
2586impl Iterator for StreamingExpandVarLengthIterator {
2587    type Item = Result<Record, Error>;
2588
2589    fn next(&mut self) -> Option<Self::Item> {
2590        loop {
2591            if let Some(ref mut expansions) = self.current_expansions
2592                && let Some(end_id) = expansions.next()
2593            {
2594                let mut record = self.current_record.as_ref().unwrap().clone();
2595                record.insert(self.end_node_alias.clone(), Value::Node(end_id));
2596                return Some(Ok(record));
2597            }
2598
2599            self.current_record = None;
2600            self.current_expansions = None;
2601
2602            match self.input_iter.next()? {
2603                Ok(record) => {
2604                    let Some(Value::Node(start_id)) = record.values.get(&self.start_node_alias)
2605                    else {
2606                        continue;
2607                    };
2608                    let expansions = self.compute_expansions(*start_id);
2609                    self.current_record = Some(record);
2610                    self.current_expansions = Some(expansions.into_iter());
2611                }
2612                Err(e) => return Some(Err(e)),
2613            }
2614        }
2615    }
2616}
2617
2618impl ExecutionPlan for ExpandNode {
2619    fn execute<'a>(
2620        &'a self,
2621        ctx: &'a ExecutionContext<'a>,
2622    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2623        let input_iter = self.input.execute(ctx)?;
2624        let start_node_alias = self.start_node_alias.clone();
2625        let rel_alias = self.rel_alias.clone();
2626        let end_node_alias = self.end_node_alias.clone();
2627        let direction = self.direction.clone();
2628        let db = ctx.db;
2629
2630        // 解析关系类型
2631        let rel_predicate_id: Option<u64> = if let Some(ref rel_type) = self.rel_type {
2632            db.resolve_id(rel_type).ok().flatten()
2633        } else {
2634            None
2635        };
2636
2637        Ok(Box::new(input_iter.flat_map(
2638            move |res| -> Box<dyn Iterator<Item = Result<Record, Error>>> {
2639                match res {
2640                    Ok(record) => {
2641                        let start_val = record.get(&start_node_alias);
2642                        let start_id = match start_val {
2643                            Some(Value::Node(id)) => *id,
2644                            _ => return Box::new(std::iter::empty()),
2645                        };
2646
2647                        let criteria = match direction {
2648                            RelationshipDirection::LeftToRight => QueryCriteria {
2649                                subject_id: Some(start_id),
2650                                predicate_id: rel_predicate_id,
2651                                object_id: None,
2652                            },
2653                            RelationshipDirection::RightToLeft => QueryCriteria {
2654                                subject_id: None,
2655                                predicate_id: rel_predicate_id,
2656                                object_id: Some(start_id),
2657                            },
2658                            RelationshipDirection::Undirected => QueryCriteria {
2659                                subject_id: Some(start_id),
2660                                predicate_id: rel_predicate_id,
2661                                object_id: None,
2662                            },
2663                        };
2664
2665                        let triples = db.query(criteria);
2666                        let rel_alias = rel_alias.clone();
2667                        let end_node_alias = end_node_alias.clone();
2668                        let record = record.clone();
2669                        let direction = direction.clone();
2670
2671                        Box::new(triples.map(move |triple| {
2672                            let mut new_record = record.clone();
2673                            new_record.insert(rel_alias.clone(), Value::Relationship(triple));
2674
2675                            let end_id = if direction == RelationshipDirection::RightToLeft {
2676                                triple.subject_id
2677                            } else {
2678                                triple.object_id
2679                            };
2680                            new_record.insert(end_node_alias.clone(), Value::Node(end_id));
2681
2682                            Ok(new_record)
2683                        }))
2684                    }
2685                    Err(e) => Box::new(std::iter::once(Err(e))),
2686                }
2687            },
2688        )))
2689    }
2690
2691    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2692        // Expand 基数 = 输入基数 * 平均出度
2693        // 假设平均出度为 3
2694        self.input.estimate_cardinality(ctx) * 3
2695    }
2696}
2697
2698impl ExecutionPlan for ExpandVarLengthNode {
2699    fn execute<'a>(
2700        &'a self,
2701        ctx: &'a ExecutionContext<'a>,
2702    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2703        let input_iter = self.input.execute(ctx)?;
2704        let start_node_alias = self.start_node_alias.clone();
2705        let end_node_alias = self.end_node_alias.clone();
2706        let direction = self.direction.clone();
2707        let db = ctx.db;
2708
2709        let rel_predicate_id: Option<u64> = if let Some(ref rel_type) = self.rel_type {
2710            db.resolve_id(rel_type).ok().flatten()
2711        } else {
2712            None
2713        };
2714
2715        let min_hops = self.min_hops;
2716        let max_hops = self.max_hops;
2717
2718        Ok(Box::new(input_iter.flat_map(
2719            move |res| -> Box<dyn Iterator<Item = Result<Record, Error>> + 'a> {
2720                let record = match res {
2721                    Ok(record) => record,
2722                    Err(e) => return Box::new(std::iter::once(Err(e))),
2723                };
2724
2725                let Some(Value::Node(start_id)) = record.get(&start_node_alias) else {
2726                    return Box::new(std::iter::empty());
2727                };
2728
2729                let expansions = find_reachable_nodes(
2730                    db,
2731                    *start_id,
2732                    direction.clone(),
2733                    rel_predicate_id,
2734                    min_hops,
2735                    max_hops,
2736                );
2737
2738                let end_node_alias = end_node_alias.clone();
2739                Box::new(expansions.into_iter().map(move |end_id| {
2740                    let mut new_record = record.clone();
2741                    new_record.insert(end_node_alias.clone(), Value::Node(end_id));
2742                    Ok(new_record)
2743                }))
2744            },
2745        )))
2746    }
2747
2748    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2749        // Rough estimate: average branching factor 3 per hop.
2750        let hops = usize::try_from(self.max_hops).unwrap_or(1).max(1);
2751        self.input
2752            .estimate_cardinality(ctx)
2753            .saturating_mul(3usize.saturating_pow(hops as u32))
2754            .max(1)
2755    }
2756}
2757
2758fn find_reachable_nodes(
2759    db: &Database,
2760    start_id: u64,
2761    direction: RelationshipDirection,
2762    rel_predicate_id: Option<u64>,
2763    min_hops: u32,
2764    max_hops: u32,
2765) -> Vec<u64> {
2766    let mut results = Vec::new();
2767
2768    if min_hops == 0 {
2769        results.push(start_id);
2770    }
2771
2772    let mut queue: VecDeque<(u64, u32)> = VecDeque::new();
2773    let mut visited: HashSet<(u64, u32)> = HashSet::new();
2774    queue.push_back((start_id, 0));
2775    visited.insert((start_id, 0));
2776
2777    while let Some((node_id, depth)) = queue.pop_front() {
2778        if depth >= max_hops {
2779            continue;
2780        }
2781
2782        let next_depth = depth + 1;
2783
2784        let mut neighbors = Vec::new();
2785        match direction {
2786            RelationshipDirection::LeftToRight => {
2787                let criteria = QueryCriteria {
2788                    subject_id: Some(node_id),
2789                    predicate_id: rel_predicate_id,
2790                    object_id: None,
2791                };
2792                neighbors.extend(db.query(criteria).map(|t| t.object_id));
2793            }
2794            RelationshipDirection::RightToLeft => {
2795                let criteria = QueryCriteria {
2796                    subject_id: None,
2797                    predicate_id: rel_predicate_id,
2798                    object_id: Some(node_id),
2799                };
2800                neighbors.extend(db.query(criteria).map(|t| t.subject_id));
2801            }
2802            RelationshipDirection::Undirected => {
2803                let out = QueryCriteria {
2804                    subject_id: Some(node_id),
2805                    predicate_id: rel_predicate_id,
2806                    object_id: None,
2807                };
2808                neighbors.extend(db.query(out).map(|t| t.object_id));
2809
2810                let inc = QueryCriteria {
2811                    subject_id: None,
2812                    predicate_id: rel_predicate_id,
2813                    object_id: Some(node_id),
2814                };
2815                neighbors.extend(db.query(inc).map(|t| t.subject_id));
2816            }
2817        }
2818
2819        for neighbor in neighbors {
2820            if visited.insert((neighbor, next_depth)) {
2821                if next_depth >= min_hops {
2822                    results.push(neighbor);
2823                }
2824                queue.push_back((neighbor, next_depth));
2825            }
2826        }
2827    }
2828
2829    results
2830}
2831
2832// ============================================================================
2833// Limit
2834// ============================================================================
2835
2836impl ExecutionPlan for LimitNode {
2837    fn execute<'a>(
2838        &'a self,
2839        ctx: &'a ExecutionContext<'a>,
2840    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2841        let limit = usize::try_from(self.limit).unwrap_or(usize::MAX);
2842        let inner = self.input.execute(ctx)?;
2843
2844        struct LimitIter<I> {
2845            inner: I,
2846            remaining: usize,
2847        }
2848
2849        impl<I> Iterator for LimitIter<I>
2850        where
2851            I: Iterator<Item = Result<Record, Error>>,
2852        {
2853            type Item = Result<Record, Error>;
2854
2855            fn next(&mut self) -> Option<Self::Item> {
2856                if self.remaining == 0 {
2857                    return None;
2858                }
2859                match self.inner.next()? {
2860                    Ok(v) => {
2861                        self.remaining -= 1;
2862                        Some(Ok(v))
2863                    }
2864                    Err(e) => Some(Err(e)),
2865                }
2866            }
2867        }
2868
2869        Ok(Box::new(LimitIter {
2870            inner,
2871            remaining: limit,
2872        }))
2873    }
2874
2875    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2876        let inner = self.input.estimate_cardinality(ctx);
2877        inner.min(self.limit as usize)
2878    }
2879}
2880
2881// ============================================================================
2882// Skip
2883// ============================================================================
2884
2885impl ExecutionPlan for SkipNode {
2886    fn execute<'a>(
2887        &'a self,
2888        ctx: &'a ExecutionContext<'a>,
2889    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2890        let skip = usize::try_from(self.skip).unwrap_or(0);
2891        let inner = self.input.execute(ctx)?;
2892        Ok(Box::new(inner.skip(skip)))
2893    }
2894
2895    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2896        let inner = self.input.estimate_cardinality(ctx);
2897        inner.saturating_sub(self.skip as usize)
2898    }
2899}
2900
2901// ============================================================================
2902// Sort
2903// ============================================================================
2904
2905impl ExecutionPlan for SortNode {
2906    fn execute<'a>(
2907        &'a self,
2908        ctx: &'a ExecutionContext<'a>,
2909    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2910        // Sort requires materialization
2911        let mut records: Vec<Record> = self.input.execute(ctx)?.filter_map(|r| r.ok()).collect();
2912        let order_by = &self.order_by;
2913
2914        records.sort_by(|a, b| {
2915            for (expr, direction) in order_by {
2916                let val_a = evaluate_expression_value(expr, a, ctx);
2917                let val_b = evaluate_expression_value(expr, b, ctx);
2918                let cmp = compare_values_for_sort(&val_a, &val_b, direction);
2919                if cmp != std::cmp::Ordering::Equal {
2920                    return cmp;
2921                }
2922            }
2923            std::cmp::Ordering::Equal
2924        });
2925
2926        Ok(Box::new(records.into_iter().map(Ok)))
2927    }
2928
2929    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2930        self.input.estimate_cardinality(ctx)
2931    }
2932}
2933
2934// ============================================================================
2935// Aggregate
2936// ============================================================================
2937
2938impl ExecutionPlan for AggregateNode {
2939    fn execute<'a>(
2940        &'a self,
2941        ctx: &'a ExecutionContext<'a>,
2942    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2943        // Collect all input records
2944        let records: Vec<Record> = self.input.execute(ctx)?.filter_map(|r| r.ok()).collect();
2945
2946        // Simple case: no GROUP BY, aggregate all records into one result
2947        if self.group_by.is_empty() {
2948            let mut result = Record::new();
2949
2950            for (agg_func, alias) in &self.aggregations {
2951                let value = compute_aggregate(agg_func, &records, ctx);
2952                result.insert(alias.clone(), value);
2953            }
2954
2955            return Ok(Box::new(std::iter::once(Ok(result))));
2956        }
2957
2958        // GROUP BY case: group records and aggregate each group
2959        // For simplicity, we'll implement basic grouping
2960        let mut groups: HashMap<String, Vec<Record>> = HashMap::new();
2961
2962        for record in records {
2963            // Create group key from group_by expressions
2964            let key = self
2965                .group_by
2966                .iter()
2967                .map(|expr| format!("{:?}", evaluate_expression_value(expr, &record, ctx)))
2968                .collect::<Vec<_>>()
2969                .join("|");
2970            groups.entry(key).or_default().push(record);
2971        }
2972
2973        let results: Vec<Record> = groups
2974            .into_values()
2975            .map(|group_records| {
2976                let mut result = Record::new();
2977
2978                // Add group by values from first record
2979                if let Some(first) = group_records.first() {
2980                    for expr in &self.group_by {
2981                        if let Expression::Variable(name) = expr
2982                            && let Some(val) = first.get(name)
2983                        {
2984                            result.insert(name.clone(), val.clone());
2985                        }
2986                    }
2987                }
2988
2989                // Compute aggregates
2990                for (agg_func, alias) in &self.aggregations {
2991                    let value = compute_aggregate(agg_func, &group_records, ctx);
2992                    result.insert(alias.clone(), value);
2993                }
2994
2995                result
2996            })
2997            .collect();
2998
2999        Ok(Box::new(results.into_iter().map(Ok)))
3000    }
3001
3002    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
3003        if self.group_by.is_empty() {
3004            1 // Single aggregate result
3005        } else {
3006            // Estimate: assume 10% of input are unique groups
3007            (self.input.estimate_cardinality(ctx) / 10).max(1)
3008        }
3009    }
3010}
3011
3012/// Compute aggregate function over a set of records
3013fn compute_aggregate(
3014    func: &AggregateFunction,
3015    records: &[Record],
3016    ctx: &ExecutionContext,
3017) -> Value {
3018    match func {
3019        AggregateFunction::Count(expr) => {
3020            let count = if let Some(e) = expr {
3021                records
3022                    .iter()
3023                    .filter(|r| !matches!(evaluate_expression_value(e, r, ctx), Value::Null))
3024                    .count()
3025            } else {
3026                records.len() // count(*)
3027            };
3028            Value::Float(count as f64)
3029        }
3030        AggregateFunction::Sum(expr) => {
3031            let sum: f64 = records
3032                .iter()
3033                .filter_map(|r| {
3034                    if let Value::Float(f) = evaluate_expression_value(expr, r, ctx) {
3035                        Some(f)
3036                    } else {
3037                        None
3038                    }
3039                })
3040                .sum();
3041            Value::Float(sum)
3042        }
3043        AggregateFunction::Avg(expr) => {
3044            let values: Vec<f64> = records
3045                .iter()
3046                .filter_map(|r| {
3047                    if let Value::Float(f) = evaluate_expression_value(expr, r, ctx) {
3048                        Some(f)
3049                    } else {
3050                        None
3051                    }
3052                })
3053                .collect();
3054            if values.is_empty() {
3055                Value::Null
3056            } else {
3057                Value::Float(values.iter().sum::<f64>() / values.len() as f64)
3058            }
3059        }
3060        AggregateFunction::Min(expr) => records
3061            .iter()
3062            .map(|r| evaluate_expression_value(expr, r, ctx))
3063            .filter(|v| !matches!(v, Value::Null))
3064            .min_by(compare_values)
3065            .unwrap_or(Value::Null),
3066        AggregateFunction::Max(expr) => records
3067            .iter()
3068            .map(|r| evaluate_expression_value(expr, r, ctx))
3069            .filter(|v| !matches!(v, Value::Null))
3070            .max_by(compare_values)
3071            .unwrap_or(Value::Null),
3072        AggregateFunction::Collect(expr) => {
3073            // Collect returns a list - for now we'll represent as a string
3074            let values: Vec<String> = records
3075                .iter()
3076                .map(|r| format!("{:?}", evaluate_expression_value(expr, r, ctx)))
3077                .collect();
3078            Value::String(format!("[{}]", values.join(", ")))
3079        }
3080    }
3081}
3082
3083/// Compare two Values for ordering
3084fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
3085    match (a, b) {
3086        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal),
3087        (Value::String(x), Value::String(y)) => x.cmp(y),
3088        (Value::Boolean(x), Value::Boolean(y)) => x.cmp(y),
3089        (Value::Node(x), Value::Node(y)) => x.cmp(y),
3090        (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
3091        (Value::Null, _) => std::cmp::Ordering::Greater, // NULL sorts last (ASC)
3092        (_, Value::Null) => std::cmp::Ordering::Less,
3093        _ => std::cmp::Ordering::Equal,
3094    }
3095}
3096
3097fn compare_values_for_sort(a: &Value, b: &Value, direction: &Direction) -> std::cmp::Ordering {
3098    // Keep NULLs last for both ASC and DESC.
3099    match (a, b) {
3100        (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
3101        (Value::Null, _) => std::cmp::Ordering::Greater,
3102        (_, Value::Null) => std::cmp::Ordering::Less,
3103        _ => {
3104            let cmp = compare_values(a, b);
3105            match direction {
3106                Direction::Ascending => cmp,
3107                Direction::Descending => cmp.reverse(),
3108            }
3109        }
3110    }
3111}
3112
3113// ============================================================================
3114// Expression Evaluation (从原 executor.rs 复制)
3115// ============================================================================
3116
3117fn evaluate_expression(expr: &Expression, record: &Record, ctx: &ExecutionContext) -> bool {
3118    match evaluate_expression_value(expr, record, ctx) {
3119        Value::Boolean(b) => b,
3120        _ => false,
3121    }
3122}
3123
3124pub fn evaluate_expression_value(
3125    expr: &Expression,
3126    record: &Record,
3127    ctx: &ExecutionContext,
3128) -> Value {
3129    match expr {
3130        Expression::Literal(l) => match l {
3131            Literal::String(s) => Value::String(s.clone()),
3132            Literal::Float(f) => Value::Float(*f),
3133            Literal::Integer(i) => Value::Float(*i as f64),
3134            Literal::Boolean(b) => Value::Boolean(*b),
3135            Literal::Null => Value::Null,
3136        },
3137        Expression::Variable(name) => record.get(name).cloned().unwrap_or(Value::Null),
3138        Expression::Parameter(name) => ctx.params.get(name).cloned().unwrap_or(Value::Null),
3139        Expression::PropertyAccess(pa) => {
3140            if let Some(Value::Node(node_id)) = record.get(&pa.variable)
3141                && let Ok(Some(binary)) = ctx.db.get_node_property_binary(*node_id)
3142                && let Ok(props) = crate::storage::property::deserialize_properties(&binary)
3143                && let Some(value) = props.get(&pa.property)
3144            {
3145                return match value {
3146                    serde_json::Value::String(s) => Value::String(s.clone()),
3147                    serde_json::Value::Number(n) => Value::Float(n.as_f64().unwrap_or(0.0)),
3148                    serde_json::Value::Bool(b) => Value::Boolean(*b),
3149                    serde_json::Value::Null => Value::Null,
3150                    serde_json::Value::Array(items) => {
3151                        let mut out = Vec::with_capacity(items.len());
3152                        for item in items {
3153                            let Some(n) = item.as_f64() else {
3154                                return Value::String(
3155                                    serde_json::Value::Array(items.clone()).to_string(),
3156                                );
3157                            };
3158                            out.push(n as f32);
3159                        }
3160                        Value::Vector(out)
3161                    }
3162                    _ => Value::Null,
3163                };
3164            }
3165            Value::Null
3166        }
3167        Expression::Binary(b) => {
3168            let left = evaluate_expression_value(&b.left, record, ctx);
3169            let right = evaluate_expression_value(&b.right, record, ctx);
3170
3171            match b.operator {
3172                BinaryOperator::Equal => Value::Boolean(left == right),
3173                BinaryOperator::NotEqual => Value::Boolean(left != right),
3174                BinaryOperator::And => match (left, right) {
3175                    (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l && r),
3176                    _ => Value::Null,
3177                },
3178                BinaryOperator::Or => match (left, right) {
3179                    (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l || r),
3180                    _ => Value::Null,
3181                },
3182                BinaryOperator::LessThan => match (left, right) {
3183                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l < r),
3184                    _ => Value::Null,
3185                },
3186                BinaryOperator::LessThanOrEqual => match (left, right) {
3187                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l <= r),
3188                    _ => Value::Null,
3189                },
3190                BinaryOperator::GreaterThan => match (left, right) {
3191                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l > r),
3192                    _ => Value::Null,
3193                },
3194                BinaryOperator::GreaterThanOrEqual => match (left, right) {
3195                    (Value::Float(l), Value::Float(r)) => Value::Boolean(l >= r),
3196                    _ => Value::Null,
3197                },
3198                BinaryOperator::Add => match (left, right) {
3199                    (Value::Float(l), Value::Float(r)) => Value::Float(l + r),
3200                    (Value::String(l), Value::String(r)) => Value::String(format!("{}{}", l, r)),
3201                    _ => Value::Null,
3202                },
3203                BinaryOperator::Subtract => match (left, right) {
3204                    (Value::Float(l), Value::Float(r)) => Value::Float(l - r),
3205                    _ => Value::Null,
3206                },
3207                BinaryOperator::Multiply => match (left, right) {
3208                    (Value::Float(l), Value::Float(r)) => Value::Float(l * r),
3209                    _ => Value::Null,
3210                },
3211                BinaryOperator::Divide => match (left, right) {
3212                    (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l / r),
3213                    _ => Value::Null,
3214                },
3215                BinaryOperator::Modulo => match (left, right) {
3216                    (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l % r),
3217                    _ => Value::Null,
3218                },
3219                BinaryOperator::In => value_in_list(&left, &right),
3220                BinaryOperator::NotIn => match value_in_list(&left, &right) {
3221                    Value::Boolean(b) => Value::Boolean(!b),
3222                    other => other,
3223                },
3224                BinaryOperator::StartsWith => match (left, right) {
3225                    (Value::String(l), Value::String(r)) => Value::Boolean(l.starts_with(&r)),
3226                    _ => Value::Null,
3227                },
3228                BinaryOperator::EndsWith => match (left, right) {
3229                    (Value::String(l), Value::String(r)) => Value::Boolean(l.ends_with(&r)),
3230                    _ => Value::Null,
3231                },
3232                BinaryOperator::Contains => match (left, right) {
3233                    (Value::String(l), Value::String(r)) => Value::Boolean(l.contains(&r)),
3234                    _ => Value::Null,
3235                },
3236                _ => Value::Null,
3237            }
3238        }
3239        Expression::Unary(u) => {
3240            let arg = evaluate_expression_value(&u.argument, record, ctx);
3241            match u.operator {
3242                crate::query::ast::UnaryOperator::Not => match arg {
3243                    Value::Boolean(b) => Value::Boolean(!b),
3244                    _ => Value::Null,
3245                },
3246                crate::query::ast::UnaryOperator::Negate => match arg {
3247                    Value::Float(f) => Value::Float(-f),
3248                    _ => Value::Null,
3249                },
3250            }
3251        }
3252        Expression::Case(case_expr) => {
3253            for alt in &case_expr.alternatives {
3254                if evaluate_expression(&alt.when, record, ctx) {
3255                    return evaluate_expression_value(&alt.then, record, ctx);
3256                }
3257            }
3258            match &case_expr.else_expression {
3259                Some(expr) => evaluate_expression_value(expr, record, ctx),
3260                None => Value::Null,
3261            }
3262        }
3263        Expression::Exists(exists_expr) => {
3264            Value::Boolean(evaluate_exists(exists_expr.as_ref(), record, ctx))
3265        }
3266        Expression::List(elements) => list_literal_value(elements, record, ctx),
3267        Expression::ListComprehension(comp) => list_comprehension_value(comp.as_ref(), record, ctx),
3268        Expression::FunctionCall(func) => match func.name.to_lowercase().as_str() {
3269            "vec_similarity" => {
3270                let Some(left) = func
3271                    .arguments
3272                    .first()
3273                    .map(|arg| evaluate_expression_value(arg, record, ctx))
3274                else {
3275                    return Value::Null;
3276                };
3277                let Some(right) = func
3278                    .arguments
3279                    .get(1)
3280                    .map(|arg| evaluate_expression_value(arg, record, ctx))
3281                else {
3282                    return Value::Null;
3283                };
3284                let Some(left_vec) = value_to_vector(&left) else {
3285                    return Value::Null;
3286                };
3287                let Some(right_vec) = value_to_vector(&right) else {
3288                    return Value::Null;
3289                };
3290                let Some(sim) = cosine_similarity(&left_vec, &right_vec) else {
3291                    return Value::Null;
3292                };
3293                Value::Float(sim as f64)
3294            }
3295            "txt_score" => {
3296                #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
3297                {
3298                    let Some(Expression::PropertyAccess(pa)) = func.arguments.first() else {
3299                        return Value::Null;
3300                    };
3301                    let Some(Value::Node(node_id)) = record.get(&pa.variable) else {
3302                        return Value::Null;
3303                    };
3304                    let Some(query_expr) = func.arguments.get(1) else {
3305                        return Value::Null;
3306                    };
3307                    let Value::String(query) = evaluate_expression_value(query_expr, record, ctx)
3308                    else {
3309                        return Value::Null;
3310                    };
3311                    Value::Float(ctx.db.fts_txt_score(*node_id, &pa.property, &query))
3312                }
3313                #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
3314                {
3315                    Value::Float(0.0)
3316                }
3317            }
3318            "id" => match func
3319                .arguments
3320                .first()
3321                .map(|arg| evaluate_expression_value(arg, record, ctx))
3322            {
3323                Some(Value::Node(id)) => Value::Float(id as f64),
3324                _ => Value::Null,
3325            },
3326            "type" => match func
3327                .arguments
3328                .first()
3329                .map(|arg| evaluate_expression_value(arg, record, ctx))
3330            {
3331                Some(Value::Relationship(triple)) => relationship_type_value(ctx.db, &triple),
3332                _ => Value::Null,
3333            },
3334            "labels" => match func
3335                .arguments
3336                .first()
3337                .map(|arg| evaluate_expression_value(arg, record, ctx))
3338            {
3339                Some(Value::Node(id)) => node_labels_value(ctx.db, id),
3340                _ => Value::Null,
3341            },
3342            "keys" => match func
3343                .arguments
3344                .first()
3345                .map(|arg| evaluate_expression_value(arg, record, ctx))
3346            {
3347                Some(Value::Node(id)) => node_property_keys_value(ctx.db, id),
3348                Some(Value::Relationship(triple)) => edge_property_keys_value(ctx.db, &triple),
3349                _ => Value::Null,
3350            },
3351            "size" => match func
3352                .arguments
3353                .first()
3354                .map(|arg| evaluate_expression_value(arg, record, ctx))
3355            {
3356                Some(Value::String(s)) => Value::Float(s.len() as f64),
3357                _ => Value::Null,
3358            },
3359            "toupper" => match func
3360                .arguments
3361                .first()
3362                .map(|arg| evaluate_expression_value(arg, record, ctx))
3363            {
3364                Some(Value::String(s)) => Value::String(s.to_uppercase()),
3365                _ => Value::Null,
3366            },
3367            "tolower" => match func
3368                .arguments
3369                .first()
3370                .map(|arg| evaluate_expression_value(arg, record, ctx))
3371            {
3372                Some(Value::String(s)) => Value::String(s.to_lowercase()),
3373                _ => Value::Null,
3374            },
3375            "trim" => match func
3376                .arguments
3377                .first()
3378                .map(|arg| evaluate_expression_value(arg, record, ctx))
3379            {
3380                Some(Value::String(s)) => Value::String(s.trim().to_string()),
3381                _ => Value::Null,
3382            },
3383            "coalesce" => {
3384                for arg in &func.arguments {
3385                    let v = evaluate_expression_value(arg, record, ctx);
3386                    if !matches!(v, Value::Null) {
3387                        return v;
3388                    }
3389                }
3390                Value::Null
3391            }
3392            _ => Value::Null,
3393        },
3394        _ => Value::Null,
3395    }
3396}
3397
3398fn list_literal_value(elements: &[Expression], record: &Record, ctx: &ExecutionContext) -> Value {
3399    let json = serde_json::Value::Array(
3400        elements
3401            .iter()
3402            .map(|e| executor_value_to_json(&evaluate_expression_value(e, record, ctx)))
3403            .collect(),
3404    );
3405    Value::String(json.to_string())
3406}
3407
3408fn list_comprehension_value(
3409    comp: &ListComprehension,
3410    record: &Record,
3411    ctx: &ExecutionContext,
3412) -> Value {
3413    let Some(items) = evaluate_list_source(&comp.list, record, ctx) else {
3414        return Value::Null;
3415    };
3416
3417    let mut out = Vec::new();
3418    for item in items {
3419        let mut scoped = record.clone();
3420        scoped.insert(comp.variable.clone(), item);
3421
3422        if let Some(where_expr) = &comp.where_expression
3423            && !evaluate_expression(where_expr, &scoped, ctx)
3424        {
3425            continue;
3426        }
3427
3428        let mapped = match &comp.map_expression {
3429            Some(expr) => evaluate_expression_value(expr, &scoped, ctx),
3430            None => scoped.get(&comp.variable).cloned().unwrap_or(Value::Null),
3431        };
3432        out.push(executor_value_to_json(&mapped));
3433    }
3434
3435    Value::String(serde_json::Value::Array(out).to_string())
3436}
3437
3438fn evaluate_list_source(
3439    expr: &Expression,
3440    record: &Record,
3441    ctx: &ExecutionContext,
3442) -> Option<Vec<Value>> {
3443    match expr {
3444        Expression::List(elements) => Some(
3445            elements
3446                .iter()
3447                .map(|e| evaluate_expression_value(e, record, ctx))
3448                .collect(),
3449        ),
3450        _ => match evaluate_expression_value(expr, record, ctx) {
3451            Value::String(s) => parse_executor_list_string(&s),
3452            _ => None,
3453        },
3454    }
3455}
3456fn evaluate_exists(
3457    exists_expr: &ExistsExpression,
3458    record: &Record,
3459    ctx: &ExecutionContext,
3460) -> bool {
3461    match exists_expr {
3462        ExistsExpression::Pattern(pattern) => exists_match_pattern(pattern, None, record, ctx),
3463        ExistsExpression::Subquery(query) => {
3464            let (pattern, where_expr) = match extract_exists_match_query(query) {
3465                Some(v) => v,
3466                None => return false,
3467            };
3468            exists_match_pattern(pattern, where_expr, record, ctx)
3469        }
3470    }
3471}
3472
3473fn exists_match_pattern(
3474    pattern: &Pattern,
3475    where_expr: Option<&Expression>,
3476    outer_record: &Record,
3477    ctx: &ExecutionContext,
3478) -> bool {
3479    let Some(PathElement::Node(start_node)) = pattern.elements.first() else {
3480        return false;
3481    };
3482
3483    if let Some(var) = &start_node.variable
3484        && let Some(Value::Node(start_id)) = outer_record.get(var)
3485    {
3486        if !node_satisfies(*start_id, start_node, outer_record, ctx) {
3487            return false;
3488        }
3489        return exists_path_from_node(pattern, 0, *start_id, outer_record, where_expr, ctx);
3490    }
3491
3492    exists_uncorrelated_match(pattern, where_expr, ctx)
3493}
3494
3495fn exists_uncorrelated_match(
3496    pattern: &Pattern,
3497    where_expr: Option<&Expression>,
3498    ctx: &ExecutionContext,
3499) -> bool {
3500    use crate::query::ast::{MatchClause, Query, ReturnClause, ReturnItem, WhereClause};
3501    use crate::query::planner::QueryPlanner;
3502
3503    let mut clauses: Vec<Clause> = Vec::new();
3504    clauses.push(Clause::Match(MatchClause {
3505        optional: false,
3506        pattern: pattern.clone(),
3507    }));
3508    if let Some(expr) = where_expr.cloned() {
3509        clauses.push(Clause::Where(WhereClause { expression: expr }));
3510    }
3511    clauses.push(Clause::Return(ReturnClause {
3512        distinct: false,
3513        items: vec![ReturnItem {
3514            expression: Expression::Literal(Literal::Integer(1)),
3515            alias: Some("_exists".to_string()),
3516        }],
3517        order_by: None,
3518        limit: Some(1),
3519        skip: None,
3520    }));
3521
3522    let planner = QueryPlanner::new();
3523    let plan = match planner.plan(Query { clauses }) {
3524        Ok(plan) => plan,
3525        Err(_) => return false,
3526    };
3527
3528    match plan.execute(ctx) {
3529        Ok(mut iter) => iter.next().is_some(),
3530        Err(_) => false,
3531    }
3532}
3533
3534fn exists_path_from_node(
3535    pattern: &Pattern,
3536    node_index: usize,
3537    current_node_id: u64,
3538    bindings: &Record,
3539    where_expr: Option<&Expression>,
3540    ctx: &ExecutionContext,
3541) -> bool {
3542    let next_rel_index = node_index + 1;
3543    let next_node_index = node_index + 2;
3544
3545    if next_node_index >= pattern.elements.len() {
3546        return where_expr.is_none_or(|expr| evaluate_expression(expr, bindings, ctx));
3547    }
3548
3549    let PathElement::Relationship(rel) = &pattern.elements[next_rel_index] else {
3550        return false;
3551    };
3552    let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
3553        return false;
3554    };
3555
3556    if rel.variable_length.is_some() {
3557        return exists_var_length_step(
3558            pattern,
3559            next_node_index,
3560            current_node_id,
3561            rel,
3562            bindings,
3563            where_expr,
3564            ctx,
3565        );
3566    }
3567
3568    for (triple, end_id) in iter_matching_edges(ctx.db, current_node_id, rel) {
3569        let mut new_record = bindings.clone();
3570
3571        if let Some(rel_var) = &rel.variable {
3572            match new_record.get(rel_var) {
3573                Some(Value::Relationship(existing)) if existing == &triple => {}
3574                Some(_) => continue,
3575                None => new_record.insert(rel_var.clone(), Value::Relationship(triple)),
3576            }
3577        }
3578
3579        if let Some(props) = &rel.properties
3580            && !edge_satisfies(&triple, props, &new_record, ctx)
3581        {
3582            continue;
3583        }
3584
3585        if !node_satisfies(end_id, next_node, &new_record, ctx) {
3586            continue;
3587        }
3588
3589        if let Some(node_var) = &next_node.variable {
3590            match new_record.get(node_var) {
3591                Some(Value::Node(existing)) if *existing == end_id => {}
3592                Some(_) => continue,
3593                None => new_record.insert(node_var.clone(), Value::Node(end_id)),
3594            }
3595        }
3596
3597        if exists_path_from_node(
3598            pattern,
3599            next_node_index,
3600            end_id,
3601            &new_record,
3602            where_expr,
3603            ctx,
3604        ) {
3605            return true;
3606        }
3607    }
3608
3609    false
3610}
3611
3612fn exists_var_length_step(
3613    pattern: &Pattern,
3614    next_node_index: usize,
3615    current_node_id: u64,
3616    rel: &RelationshipPattern,
3617    bindings: &Record,
3618    where_expr: Option<&Expression>,
3619    ctx: &ExecutionContext,
3620) -> bool {
3621    let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
3622        return false;
3623    };
3624    if rel.variable.is_some() || rel.properties.is_some() {
3625        return false;
3626    }
3627    let Some(var_len) = &rel.variable_length else {
3628        return false;
3629    };
3630    let min_hops = var_len.min.unwrap_or(1);
3631    let Some(max_hops) = var_len.max else {
3632        return false;
3633    };
3634
3635    if rel.types.len() > 1 {
3636        return false;
3637    }
3638
3639    let rel_predicate_id = rel
3640        .types
3641        .first()
3642        .and_then(|t| ctx.db.resolve_id(t).ok().flatten());
3643
3644    let reachable = find_reachable_nodes(
3645        ctx.db,
3646        current_node_id,
3647        rel.direction.clone(),
3648        rel_predicate_id,
3649        min_hops,
3650        max_hops,
3651    );
3652
3653    for end_id in reachable {
3654        let mut new_record = bindings.clone();
3655
3656        if !node_satisfies(end_id, next_node, &new_record, ctx) {
3657            continue;
3658        }
3659
3660        if let Some(node_var) = &next_node.variable {
3661            match new_record.get(node_var) {
3662                Some(Value::Node(existing)) if *existing == end_id => {}
3663                Some(_) => continue,
3664                None => new_record.insert(node_var.clone(), Value::Node(end_id)),
3665            }
3666        }
3667
3668        if exists_path_from_node(
3669            pattern,
3670            next_node_index,
3671            end_id,
3672            &new_record,
3673            where_expr,
3674            ctx,
3675        ) {
3676            return true;
3677        }
3678    }
3679
3680    false
3681}
3682
3683fn node_satisfies(
3684    node_id: u64,
3685    node: &crate::query::ast::NodePattern,
3686    bindings: &Record,
3687    ctx: &ExecutionContext,
3688) -> bool {
3689    if let Some(var) = &node.variable
3690        && let Some(Value::Node(bound)) = bindings.get(var)
3691        && *bound != node_id
3692    {
3693        return false;
3694    }
3695
3696    if !node.labels.is_empty() {
3697        let Some(type_id) = ctx.db.resolve_id("type").ok().flatten() else {
3698            return false;
3699        };
3700        for label in &node.labels {
3701            let Some(label_id) = ctx.db.resolve_id(label).ok().flatten() else {
3702                return false;
3703            };
3704            let criteria = QueryCriteria {
3705                subject_id: Some(node_id),
3706                predicate_id: Some(type_id),
3707                object_id: Some(label_id),
3708            };
3709            if ctx.db.query(criteria).next().is_none() {
3710                return false;
3711            }
3712        }
3713    }
3714
3715    if let Some(props) = &node.properties
3716        && !node_properties_match(node_id, props, bindings, ctx)
3717    {
3718        return false;
3719    }
3720
3721    true
3722}
3723
3724fn node_properties_match(
3725    node_id: u64,
3726    props: &PropertyMap,
3727    bindings: &Record,
3728    ctx: &ExecutionContext,
3729) -> bool {
3730    if props.properties.is_empty() {
3731        return true;
3732    }
3733    let Ok(Some(binary)) = ctx.db.get_node_property_binary(node_id) else {
3734        return false;
3735    };
3736    let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
3737        return false;
3738    };
3739
3740    for pair in &props.properties {
3741        let expected = evaluate_expression_value(&pair.value, bindings, ctx);
3742        let Some(actual) = stored.get(&pair.key) else {
3743            return false;
3744        };
3745        if !json_value_matches_executor_value(actual, &expected) {
3746            return false;
3747        }
3748    }
3749
3750    true
3751}
3752
3753fn edge_satisfies(
3754    triple: &Triple,
3755    props: &PropertyMap,
3756    bindings: &Record,
3757    ctx: &ExecutionContext,
3758) -> bool {
3759    if props.properties.is_empty() {
3760        return true;
3761    }
3762    let Ok(Some(binary)) =
3763        ctx.db
3764            .get_edge_property_binary(triple.subject_id, triple.predicate_id, triple.object_id)
3765    else {
3766        return false;
3767    };
3768    let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
3769        return false;
3770    };
3771
3772    for pair in &props.properties {
3773        let expected = evaluate_expression_value(&pair.value, bindings, ctx);
3774        let Some(actual) = stored.get(&pair.key) else {
3775            return false;
3776        };
3777        if !json_value_matches_executor_value(actual, &expected) {
3778            return false;
3779        }
3780    }
3781
3782    true
3783}
3784
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Database;
    use tempfile::tempdir;

    /// A scan without labels should return at least the three distinct nodes
    /// that appear in the stored facts.
    #[test]
    fn test_optimized_scan_empty_labels() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.nervus");
        let mut db = Database::open(crate::Options::new(&db_path)).unwrap();

        // Create test data.
        let facts = [("alice", "knows", "bob"), ("bob", "knows", "charlie")];
        for (subject, predicate, object) in facts {
            db.add_fact(crate::Fact::new(subject, predicate, object))
                .unwrap();
        }

        let params = HashMap::new();
        let ctx = ExecutionContext {
            db: &db,
            params: &params,
        };

        let scan = ScanNode {
            alias: "n".to_string(),
            labels: vec![],
        };

        let rows: Vec<_> = scan.execute(&ctx).unwrap().collect();

        // Every unique node should be found: alice, bob, charlie.
        assert!(rows.len() >= 3);
        assert!(rows.iter().all(|row| row.is_ok()));
    }

    /// The cardinality estimate of a labelled scan should be positive and
    /// must not exceed the estimate of an unlabelled scan.
    #[test]
    fn test_cardinality_estimation() {
        let tmp = tempdir().unwrap();
        let db_path = tmp.path().join("test.nervus");
        let mut db = Database::open(crate::Options::new(&db_path)).unwrap();

        // Attach labels.
        let labelled = [("alice", "Person"), ("bob", "Person"), ("charlie", "Robot")];
        for (node, label) in labelled {
            db.add_fact(crate::Fact::new(node, "type", label)).unwrap();
        }

        let params = HashMap::new();
        let ctx = ExecutionContext {
            db: &db,
            params: &params,
        };

        // Cardinality estimate for a scan without labels.
        let card_all = ScanNode {
            alias: "n".to_string(),
            labels: vec![],
        }
        .estimate_cardinality(&ctx);
        assert!(card_all > 0);

        // Cardinality estimate for a scan restricted to one label.
        let card_person = ScanNode {
            alias: "p".to_string(),
            labels: vec!["Person".to_string()],
        }
        .estimate_cardinality(&ctx);
        assert!(card_person > 0);

        // The labelled estimate should be <= the unlabelled estimate.
        println!("card_all = {}, card_person = {}", card_all, card_person);
        assert!(card_person <= card_all);
    }
}