1use crate::error::Error;
13use crate::query::ast::{
14 BinaryOperator, Clause, Direction, ExistsExpression, Expression, FunctionCall,
15 ListComprehension, Literal, PathElement, Pattern, PropertyAccess, PropertyMap,
16 RelationshipDirection, RelationshipPattern,
17};
18use crate::query::planner::{
19 AggregateFunction, AggregateNode, DistinctNode, ExpandNode, ExpandVarLengthNode, FilterNode,
20 FtsCandidateScanNode, LeftOuterJoinNode, LimitNode, NestedLoopJoinNode, PhysicalPlan,
21 ProjectNode, ScanNode, SingleRowNode, SkipNode, SortNode, UnwindNode, VectorTopKScanNode,
22};
23use crate::{Database, QueryCriteria, Triple};
24use std::collections::{HashMap, HashSet, VecDeque};
25
26#[derive(Debug, Clone)]
27pub enum Value {
28 String(String),
29 Float(f64),
30 Boolean(bool),
31 Null,
32 Vector(Vec<f32>),
33 Node(u64),
34 Relationship(Triple),
35}
36
37impl PartialEq for Value {
38 fn eq(&self, other: &Self) -> bool {
39 match (self, other) {
40 (Value::String(a), Value::String(b)) => a == b,
41 (Value::Float(a), Value::Float(b)) => a == b,
42 (Value::Boolean(a), Value::Boolean(b)) => a == b,
43 (Value::Null, Value::Null) => true,
44 (Value::Vector(a), Value::Vector(b)) => a == b,
45 (Value::Node(a), Value::Node(b)) => a == b,
46 (Value::Relationship(a), Value::Relationship(b)) => a == b,
47 _ => false,
48 }
49 }
50}
51
/// A single row flowing through the executor: a set of named bindings.
#[derive(Debug, Clone)]
pub struct Record {
    /// Bound values, keyed by the name (alias) they are bound to.
    pub values: HashMap<String, Value>,
}
56
57impl Record {
58 pub fn new() -> Self {
59 Self {
60 values: HashMap::new(),
61 }
62 }
63
64 pub fn get(&self, key: &str) -> Option<&Value> {
65 self.values.get(key)
66 }
67
68 pub fn insert(&mut self, key: String, value: Value) {
69 self.values.insert(key, value);
70 }
71
72 pub fn merge(&mut self, other: &Record) {
73 for (k, v) in &other.values {
74 self.values.insert(k.clone(), v.clone());
75 }
76 }
77}
78
79fn record_distinct_key(record: &Record) -> String {
80 let mut keys: Vec<&String> = record.values.keys().collect();
81 keys.sort();
82 let mut out = String::new();
83 for key in keys {
84 if let Some(value) = record.values.get(key) {
85 out.push_str(key);
86 out.push('=');
87 out.push_str(&format!("{:?};", value));
88 }
89 }
90 out
91}
92
93impl Default for Record {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
99fn try_merge_records(mut left: Record, right: Record) -> Option<Record> {
100 for (k, v) in right.values {
101 if let Some(existing) = left.values.get(&k) {
102 if existing != &v {
103 return None;
104 }
105 } else {
106 left.values.insert(k, v);
107 }
108 }
109 Some(left)
110}
111
/// Borrowed execution context handed to [`ExecutionPlan`] operators.
///
/// Both fields are references, so the context cannot outlive the database or
/// the parameter map it points at.
pub struct ExecutionContext<'a> {
    /// Database the query runs against.
    pub db: &'a Database,
    /// Query parameters, keyed by parameter name.
    pub params: &'a HashMap<String, Value>,
}
116
/// Reference-counted execution context.
///
/// Holds the database and the parameters behind `Arc`s so the context carries
/// no borrowed lifetime.
pub struct ArcExecutionContext {
    /// Shared handle to the database the query runs against.
    pub db: std::sync::Arc<Database>,
    /// Shared query parameters, keyed by parameter name.
    pub params: std::sync::Arc<HashMap<String, Value>>,
}
123
124impl ArcExecutionContext {
125 pub fn new(db: std::sync::Arc<Database>, params: HashMap<String, Value>) -> Self {
126 Self {
127 db,
128 params: std::sync::Arc::new(params),
129 }
130 }
131}
132
/// Execution context that owns its parameters and refers to the database
/// through a raw pointer.
///
/// The pointer carries no lifetime, so nothing in the type system keeps the
/// database alive; see [`OwnedExecutionContext::db`] for the resulting
/// obligations.
pub struct OwnedExecutionContext {
    /// Raw pointer to the database; dereferenced by `db()`.
    pub db_ptr: *const Database,
    /// Query parameters, keyed by parameter name.
    pub params: HashMap<String, Value>,
}
139
impl OwnedExecutionContext {
    /// Reborrows `db_ptr` as a shared reference to the database.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that `db_ptr` is non-null, properly aligned
    /// and points to a `Database` that stays alive, and is not mutated through
    /// another path, for as long as the returned reference is in use.
    pub unsafe fn db(&self) -> &Database {
        // SAFETY: validity of `db_ptr` is the caller's obligation, as stated
        // in this function's safety contract.
        unsafe { &*self.db_ptr }
    }
}
149
/// Cardinality estimate for a node scan.
#[derive(Debug)]
pub struct ScanStats {
    /// Estimated number of nodes the scan yields (never below 1 when produced
    /// by `estimate_scan_cardinality`).
    pub estimated_cardinality: usize,
    /// Whether the estimate was computed for a label-restricted scan.
    pub has_labels: bool,
}
156
157impl ScanStats {
158 pub fn estimate_scan_cardinality(db: &Database, labels: &[String]) -> Self {
160 if labels.is_empty() {
161 let mut unique_nodes = std::collections::HashSet::new();
164 let sample_criteria = crate::QueryCriteria::default();
165 let sample_count = 100; for triple in db.query(sample_criteria).take(sample_count) {
168 unique_nodes.insert(triple.subject_id);
169 unique_nodes.insert(triple.object_id);
170 }
171
172 let estimated_nodes = if unique_nodes.len() < sample_count / 2 {
174 unique_nodes.len() } else {
176 unique_nodes.len() * 2 };
178
179 Self {
180 estimated_cardinality: estimated_nodes.max(1),
181 has_labels: false,
182 }
183 } else {
184 let mut total_labeled_nodes = 0;
186
187 if let Ok(Some(type_id)) = db.resolve_id("type") {
189 for label in labels {
190 if let Ok(Some(label_id)) = db.resolve_id(label) {
191 let criteria = QueryCriteria {
193 subject_id: None,
194 predicate_id: Some(type_id),
195 object_id: Some(label_id),
196 };
197
198 let labeled_count = db.query(criteria).count();
199 if total_labeled_nodes == 0 {
200 total_labeled_nodes = labeled_count;
201 } else {
202 total_labeled_nodes = (total_labeled_nodes * labeled_count / 10).max(1);
204 }
205 }
206 }
207 }
208
209 Self {
210 estimated_cardinality: total_labeled_nodes.max(1),
211 has_labels: true,
212 }
213 }
214 }
215}
216
/// An executable query operator.
pub trait ExecutionPlan {
    /// Runs the operator against `ctx`.
    ///
    /// Returns an iterator of result rows; each row may itself be an error.
    /// The iterator may borrow from both the operator and the context for
    /// the lifetime `'a`.
    fn execute<'a>(
        &'a self,
        ctx: &'a ExecutionContext<'a>,
    ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error>;

    /// Returns an estimate of how many rows `execute` will produce.
    fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize;
}
230
231impl ExecutionPlan for PhysicalPlan {
232 fn execute<'a>(
233 &'a self,
234 ctx: &'a ExecutionContext<'a>,
235 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
236 match self {
237 PhysicalPlan::SingleRow(node) => node.execute(ctx),
238 PhysicalPlan::Scan(node) => node.execute(ctx),
239 PhysicalPlan::FtsCandidateScan(node) => node.execute(ctx),
240 PhysicalPlan::VectorTopKScan(node) => node.execute(ctx),
241 PhysicalPlan::Filter(node) => node.execute(ctx),
242 PhysicalPlan::Project(node) => node.execute(ctx),
243 PhysicalPlan::Limit(node) => node.execute(ctx),
244 PhysicalPlan::Skip(node) => node.execute(ctx),
245 PhysicalPlan::Sort(node) => node.execute(ctx),
246 PhysicalPlan::Distinct(node) => node.execute(ctx),
247 PhysicalPlan::Aggregate(node) => node.execute(ctx),
248 PhysicalPlan::NestedLoopJoin(node) => node.execute(ctx),
249 PhysicalPlan::LeftOuterJoin(node) => node.execute(ctx),
250 PhysicalPlan::Expand(node) => node.execute(ctx),
251 PhysicalPlan::ExpandVarLength(node) => node.execute(ctx),
252 PhysicalPlan::Unwind(node) => node.execute(ctx),
253 _ => Err(Error::Other("Unsupported physical plan type".to_string())),
254 }
255 }
256
257 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
258 match self {
259 PhysicalPlan::SingleRow(node) => node.estimate_cardinality(ctx),
260 PhysicalPlan::Scan(node) => node.estimate_cardinality(ctx),
261 PhysicalPlan::FtsCandidateScan(node) => node.estimate_cardinality(ctx),
262 PhysicalPlan::VectorTopKScan(node) => node.estimate_cardinality(ctx),
263 PhysicalPlan::Filter(node) => node.estimate_cardinality(ctx),
264 PhysicalPlan::Project(node) => node.estimate_cardinality(ctx),
265 PhysicalPlan::Limit(node) => node.estimate_cardinality(ctx),
266 PhysicalPlan::Skip(node) => node.estimate_cardinality(ctx),
267 PhysicalPlan::Sort(node) => node.estimate_cardinality(ctx),
268 PhysicalPlan::Distinct(node) => node.estimate_cardinality(ctx),
269 PhysicalPlan::Aggregate(node) => node.estimate_cardinality(ctx),
270 PhysicalPlan::NestedLoopJoin(node) => node.estimate_cardinality(ctx),
271 PhysicalPlan::LeftOuterJoin(node) => node.estimate_cardinality(ctx),
272 PhysicalPlan::Expand(node) => node.estimate_cardinality(ctx),
273 PhysicalPlan::ExpandVarLength(node) => node.estimate_cardinality(ctx),
274 PhysicalPlan::Unwind(node) => node.estimate_cardinality(ctx),
275 _ => 1,
276 }
277 }
278}
279
280use std::sync::Arc;
281
282impl PhysicalPlan {
283 pub fn execute_streaming(
289 self,
290 ctx: Arc<ArcExecutionContext>,
291 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'static>, Error> {
292 match self {
293 PhysicalPlan::SingleRow(_) => Ok(Box::new(std::iter::once(Ok(Record::new())))),
294 PhysicalPlan::Scan(node) => {
295 let alias = node.alias;
296 let labels = node.labels;
297 let db = Arc::clone(&ctx.db);
298
299 if labels.is_empty() {
300 Ok(Box::new(scan_all_nodes_streaming(db, alias)))
301 } else {
302 Ok(Box::new(scan_labeled_nodes_streaming(db, labels, alias)))
303 }
304 }
305 PhysicalPlan::FtsCandidateScan(node) => {
306 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
307 {
308 let alias = node.alias;
309 let labels = node.labels;
310 let property = node.property;
311 let query_expr = node.query;
312 let db = Arc::clone(&ctx.db);
313
314 let query = match resolve_query_string_streaming(&query_expr, &ctx) {
315 Some(q) => q,
316 None => {
317 return Ok(Box::new(std::iter::empty())
318 as Box<dyn Iterator<Item = Result<Record, Error>> + 'static>);
319 }
320 };
321
322 let Some(scores) = db.fts_scores_for_query(property.as_str(), query.as_str())
323 else {
324 return Ok(Box::new(std::iter::empty())
325 as Box<dyn Iterator<Item = Result<Record, Error>> + 'static>);
326 };
327
328 let candidate_ids: Vec<u64> = scores.keys().copied().collect();
329 let type_and_labels = resolve_label_ids_streaming(&db, &labels);
330
331 return Ok(Box::new(candidate_ids.into_iter().filter_map(
332 move |node_id| {
333 if let Some((type_id, label_ids)) = type_and_labels.as_ref()
334 && !node_has_labels_streaming(&db, node_id, *type_id, label_ids)
335 {
336 return None;
337 }
338
339 let mut record = Record::new();
340 record.insert(alias.clone(), Value::Node(node_id));
341 Some(Ok(record))
342 },
343 )));
344 }
345
346 #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
347 {
348 let _ = node;
349 Ok(Box::new(std::iter::empty())
350 as Box<
351 dyn Iterator<Item = Result<Record, Error>> + 'static,
352 >)
353 }
354 }
355 PhysicalPlan::VectorTopKScan(node) => {
356 let exec_ctx = ExecutionContext {
359 db: ctx.db.as_ref(),
360 params: ctx.params.as_ref(),
361 };
362 let iter = node.execute(&exec_ctx)?;
363 let mut records = Vec::new();
364 for item in iter {
365 records.push(item?);
366 }
367 Ok(Box::new(records.into_iter().map(Ok)))
368 }
369 PhysicalPlan::Filter(node) => {
370 let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
371 let predicate = node.predicate;
372 let ctx_clone = Arc::clone(&ctx);
373 Ok(Box::new(input_iter.filter(move |result| {
374 match result {
375 Ok(record) => evaluate_expression_streaming(&predicate, record, &ctx_clone),
376 Err(_) => true, }
378 })))
379 }
380 PhysicalPlan::Project(node) => {
381 let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
382 let projections = node.projections;
383 let ctx_clone = Arc::clone(&ctx);
384 Ok(Box::new(input_iter.map(move |result| {
385 result.map(|record| {
386 let mut new_record = Record::new();
387 for (expr, alias) in &projections {
388 let value =
389 evaluate_expression_value_streaming(expr, &record, &ctx_clone);
390 new_record.insert(alias.clone(), value);
391 }
392 new_record
393 })
394 })))
395 }
396 PhysicalPlan::Limit(node) => {
397 let limit = usize::try_from(node.limit).unwrap_or(usize::MAX);
398 let input_iter = node.input.execute_streaming(ctx)?;
399 Ok(Box::new(input_iter.take(limit)))
400 }
401 PhysicalPlan::Skip(node) => {
402 let skip = usize::try_from(node.skip).unwrap_or(0);
403 let input_iter = node.input.execute_streaming(ctx)?;
404 Ok(Box::new(input_iter.skip(skip)))
405 }
406 PhysicalPlan::Sort(node) => {
407 let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
408 let order_by = node.order_by;
409 let ctx_clone = Arc::clone(&ctx);
410 let mut records: Vec<Record> = input_iter.filter_map(|r| r.ok()).collect();
412 records.sort_by(|a, b| {
413 for (expr, direction) in &order_by {
414 let val_a = evaluate_expression_value_streaming(expr, a, &ctx_clone);
415 let val_b = evaluate_expression_value_streaming(expr, b, &ctx_clone);
416 let cmp = compare_values_for_sort(&val_a, &val_b, direction);
417 if cmp != std::cmp::Ordering::Equal {
418 return cmp;
419 }
420 }
421 std::cmp::Ordering::Equal
422 });
423 Ok(Box::new(records.into_iter().map(Ok)))
424 }
425 PhysicalPlan::Distinct(node) => {
426 let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
427 let mut seen: HashSet<String> = HashSet::new();
428 Ok(Box::new(input_iter.filter_map(
429 move |result| match result {
430 Ok(record) => {
431 let key = record_distinct_key(&record);
432 if seen.insert(key) {
433 Some(Ok(record))
434 } else {
435 None
436 }
437 }
438 Err(err) => Some(Err(err)),
439 },
440 )))
441 }
442 PhysicalPlan::Expand(node) => {
443 let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
444 Ok(Box::new(StreamingExpandIterator::new(
445 input_iter,
446 node.start_node_alias,
447 node.rel_alias,
448 node.end_node_alias,
449 node.direction,
450 node.rel_type,
451 ctx,
452 )))
453 }
454 PhysicalPlan::Unwind(node) => {
455 let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
456 let expression = node.expression;
457 let alias = node.alias;
458 let ctx_clone = Arc::clone(&ctx);
459 Ok(Box::new(input_iter.flat_map(move |result| {
460 match result {
461 Ok(record) => {
462 match unwind_values_streaming(&expression, &record, &ctx_clone) {
463 Ok(values) => values
464 .into_iter()
465 .map(|value| {
466 let mut new_record = record.clone();
467 new_record.insert(alias.clone(), value);
468 Ok(new_record)
469 })
470 .collect::<Vec<_>>(),
471 Err(err) => vec![Err(err)],
472 }
473 }
474 Err(err) => vec![Err(err)],
475 }
476 })))
477 }
478 PhysicalPlan::ExpandVarLength(node) => {
479 let input_iter = node.input.execute_streaming(Arc::clone(&ctx))?;
480 Ok(Box::new(StreamingExpandVarLengthIterator {
481 input_iter,
482 start_node_alias: node.start_node_alias,
483 end_node_alias: node.end_node_alias,
484 direction: node.direction,
485 rel_type: node.rel_type,
486 min_hops: node.min_hops,
487 max_hops: node.max_hops,
488 ctx,
489 current_record: None,
490 current_expansions: None,
491 }))
492 }
493 PhysicalPlan::NestedLoopJoin(node) => {
494 let left_iter = node.left.execute_streaming(Arc::clone(&ctx))?;
495 let right_plan = *node.right;
496 let predicate = node.predicate;
497 Ok(Box::new(StreamingNestedLoopJoin::new(
498 left_iter, right_plan, predicate, ctx,
499 )))
500 }
501 PhysicalPlan::LeftOuterJoin(node) => {
502 let left_iter = node.left.execute_streaming(Arc::clone(&ctx))?;
503 let right_plan = *node.right;
504 Ok(Box::new(StreamingLeftOuterJoin::new(
505 left_iter,
506 right_plan,
507 node.right_aliases,
508 ctx,
509 )))
510 }
511 _ => Err(Error::Other(
512 "Unsupported physical plan type for streaming".to_string(),
513 )),
514 }
515 }
516}
517
518fn scan_all_nodes_streaming(
524 db: Arc<Database>,
525 alias: String,
526) -> impl Iterator<Item = Result<Record, Error>> + Send + 'static {
527 let mut unique_nodes = HashSet::new();
528 let subject_criteria = crate::QueryCriteria::default();
529
530 for triple in db.query(subject_criteria).take(10000) {
531 unique_nodes.insert(triple.subject_id);
532 unique_nodes.insert(triple.object_id);
533 }
534
535 unique_nodes.into_iter().map(move |node_id| {
536 let mut record = Record::new();
537 record.insert(alias.clone(), Value::Node(node_id));
538 Ok(record)
539 })
540}
541
542fn scan_labeled_nodes_streaming(
544 db: Arc<Database>,
545 labels: Vec<String>,
546 alias: String,
547) -> impl Iterator<Item = Result<Record, Error>> + Send + 'static {
548 let type_id = match db.resolve_id("type") {
549 Ok(Some(id)) => Some(id),
550 _ => None,
551 };
552
553 let node_ids: Vec<u64> = if let Some(type_id) = type_id {
554 let mut nodes = HashSet::new();
555 for label in &labels {
556 if let Ok(Some(label_id)) = db.resolve_id(label) {
557 let criteria = crate::QueryCriteria {
558 subject_id: None,
559 predicate_id: Some(type_id),
560 object_id: Some(label_id),
561 };
562 for triple in db.query(criteria) {
563 nodes.insert(triple.subject_id);
564 }
565 }
566 }
567 nodes.into_iter().collect()
568 } else {
569 Vec::new()
570 };
571
572 node_ids.into_iter().map(move |node_id| {
573 let mut record = Record::new();
574 record.insert(alias.clone(), Value::Node(node_id));
575 Ok(record)
576 })
577}
578
/// Streaming nested-loop join: for every left row, the right plan is executed
/// again and each right row is combined with the left one.
struct StreamingNestedLoopJoin {
    /// Outer (left) input.
    left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
    /// Inner (right) plan; cloned and executed once per left row.
    right_plan: PhysicalPlan,
    /// Optional join predicate evaluated on each merged row.
    predicate: Option<Expression>,
    /// Shared execution context.
    ctx: Arc<ArcExecutionContext>,
    /// Left row currently being joined.
    current_left: Option<Record>,
    /// Right-side iterator for `current_left`.
    current_right: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'static>>,
}
588
589impl StreamingNestedLoopJoin {
590 fn new(
591 left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
592 right_plan: PhysicalPlan,
593 predicate: Option<Expression>,
594 ctx: Arc<ArcExecutionContext>,
595 ) -> Self {
596 Self {
597 left_iter,
598 right_plan,
599 predicate,
600 ctx,
601 current_left: None,
602 current_right: None,
603 }
604 }
605}
606
607impl Iterator for StreamingNestedLoopJoin {
608 type Item = Result<Record, Error>;
609
610 fn next(&mut self) -> Option<Self::Item> {
611 loop {
612 if let Some(ref mut right_iter) = self.current_right
614 && let Some(right_result) = right_iter.next()
615 {
616 match right_result {
617 Ok(right_record) => {
618 let left_record = self.current_left.as_ref().unwrap();
619 let mut merged = left_record.clone();
620 for (k, v) in right_record.values {
621 merged.insert(k, v);
622 }
623
624 if let Some(ref pred) = self.predicate
626 && !evaluate_expression_streaming(pred, &merged, &self.ctx)
627 {
628 continue;
629 }
630 return Some(Ok(merged));
631 }
632 Err(e) => return Some(Err(e)),
633 }
634 }
635
636 match self.left_iter.next()? {
638 Ok(left_record) => {
639 self.current_left = Some(left_record);
640 match self
642 .right_plan
643 .clone()
644 .execute_streaming(Arc::clone(&self.ctx))
645 {
646 Ok(right_iter) => {
647 self.current_right = Some(right_iter);
648 }
649 Err(e) => return Some(Err(e)),
650 }
651 }
652 Err(e) => return Some(Err(e)),
653 }
654 }
655 }
656}
657
/// Streaming left outer join: every left row is emitted at least once, with
/// the right-side aliases set to `Null` when no right row is compatible.
struct StreamingLeftOuterJoin {
    /// Outer (left) input.
    left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
    /// Inner (right) plan; cloned and executed once per left row.
    right_plan: PhysicalPlan,
    /// Aliases filled with `Null` when a left row has no match.
    right_aliases: Vec<String>,
    /// Shared execution context.
    ctx: Arc<ArcExecutionContext>,
    /// Left row currently being joined.
    current_left: Option<Record>,
    /// Right-side iterator for `current_left`.
    current_right: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'static>>,
    /// Whether `current_left` has produced at least one merged row.
    matched_current_left: bool,
    /// Whether the null-padded row for `current_left` has been emitted.
    emitted_null_current_left: bool,
}
671
672impl StreamingLeftOuterJoin {
673 fn new(
674 left_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
675 right_plan: PhysicalPlan,
676 right_aliases: Vec<String>,
677 ctx: Arc<ArcExecutionContext>,
678 ) -> Self {
679 Self {
680 left_iter,
681 right_plan,
682 right_aliases,
683 ctx,
684 current_left: None,
685 current_right: None,
686 matched_current_left: false,
687 emitted_null_current_left: false,
688 }
689 }
690
691 fn emit_null_row(&mut self, mut left_record: Record) -> Record {
692 for alias in &self.right_aliases {
693 left_record
694 .values
695 .entry(alias.clone())
696 .or_insert(Value::Null);
697 }
698 left_record
699 }
700}
701
702impl Iterator for StreamingLeftOuterJoin {
703 type Item = Result<Record, Error>;
704
705 fn next(&mut self) -> Option<Self::Item> {
706 loop {
707 if let Some(ref mut right_iter) = self.current_right {
708 if let Some(right_result) = right_iter.next() {
709 match right_result {
710 Ok(right_record) => {
711 let left_record = self.current_left.as_ref().unwrap().clone();
712 if let Some(merged) = try_merge_records(left_record, right_record) {
713 self.matched_current_left = true;
714 return Some(Ok(merged));
715 }
716 continue;
717 }
718 Err(e) => return Some(Err(e)),
719 }
720 }
721
722 self.current_right = None;
724 if !self.matched_current_left && !self.emitted_null_current_left {
725 self.emitted_null_current_left = true;
726 let left_record = self.current_left.take().unwrap();
727 return Some(Ok(self.emit_null_row(left_record)));
728 }
729 self.current_left = None;
730 self.matched_current_left = false;
731 self.emitted_null_current_left = false;
732 continue;
733 }
734
735 match self.left_iter.next()? {
737 Ok(left_record) => {
738 self.current_left = Some(left_record);
739 self.matched_current_left = false;
740 self.emitted_null_current_left = false;
741 match self
742 .right_plan
743 .clone()
744 .execute_streaming(Arc::clone(&self.ctx))
745 {
746 Ok(right_iter) => self.current_right = Some(right_iter),
747 Err(e) => return Some(Err(e)),
748 }
749 }
750 Err(e) => return Some(Err(e)),
751 }
752 }
753 }
754}
755
/// Streaming single-hop expansion: for each input row, follows the
/// relationships of the node bound to `start_node_alias` and emits one row
/// per relationship found.
struct StreamingExpandIterator {
    /// Upstream rows to expand.
    input_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
    /// Alias holding the node to expand from.
    start_node_alias: String,
    /// Alias the traversed relationship is bound to.
    rel_alias: String,
    /// Alias the node at the other end is bound to.
    end_node_alias: String,
    /// Direction in which relationships are followed.
    direction: RelationshipDirection,
    /// Optional relationship type restricting the expansion.
    rel_type: Option<String>,
    /// Shared execution context.
    ctx: Arc<ArcExecutionContext>,
    /// Input row currently being expanded.
    current_record: Option<Record>,
    /// Remaining `(relationship, other end node id)` pairs for `current_record`.
    current_expansions: Option<std::vec::IntoIter<(crate::Triple, u64)>>,
}
768
769impl StreamingExpandIterator {
770 fn new(
771 input_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
772 start_node_alias: String,
773 rel_alias: String,
774 end_node_alias: String,
775 direction: RelationshipDirection,
776 rel_type: Option<String>,
777 ctx: Arc<ArcExecutionContext>,
778 ) -> Self {
779 Self {
780 input_iter,
781 start_node_alias,
782 rel_alias,
783 end_node_alias,
784 direction,
785 rel_type,
786 ctx,
787 current_record: None,
788 current_expansions: None,
789 }
790 }
791
792 fn get_expansions(&self, node_id: u64) -> Vec<(crate::Triple, u64)> {
793 let db = &self.ctx.db;
794 let mut results = Vec::new();
795
796 let rel_type_id = self
797 .rel_type
798 .as_ref()
799 .and_then(|t| db.resolve_id(t).ok().flatten());
800
801 match self.direction {
802 RelationshipDirection::LeftToRight | RelationshipDirection::Undirected => {
803 let criteria = crate::QueryCriteria {
804 subject_id: Some(node_id),
805 predicate_id: rel_type_id,
806 object_id: None,
807 };
808 for triple in db.query(criteria) {
809 results.push((triple, triple.object_id));
810 }
811 }
812 _ => {}
813 }
814
815 if self.direction == RelationshipDirection::RightToLeft {
816 let criteria = crate::QueryCriteria {
817 subject_id: None,
818 predicate_id: rel_type_id,
819 object_id: Some(node_id),
820 };
821 for triple in db.query(criteria) {
822 results.push((triple, triple.subject_id));
823 }
824 }
825
826 results
827 }
828}
829
830impl Iterator for StreamingExpandIterator {
831 type Item = Result<Record, Error>;
832
833 fn next(&mut self) -> Option<Self::Item> {
834 loop {
835 if let Some(ref mut expansions) = self.current_expansions
837 && let Some((triple, end_node_id)) = expansions.next()
838 {
839 let record = self.current_record.as_ref().unwrap();
840 let mut new_record = record.clone();
841 new_record.insert(self.rel_alias.clone(), Value::Relationship(triple));
842 new_record.insert(self.end_node_alias.clone(), Value::Node(end_node_id));
843 return Some(Ok(new_record));
844 }
845
846 match self.input_iter.next()? {
848 Ok(record) => {
849 if let Some(Value::Node(node_id)) = record.values.get(&self.start_node_alias) {
850 let expansions = self.get_expansions(*node_id);
851 self.current_record = Some(record);
852 self.current_expansions = Some(expansions.into_iter());
853 }
854 }
855 Err(e) => return Some(Err(e)),
856 }
857 }
858 }
859}
860
861fn evaluate_expression_streaming(
863 expr: &Expression,
864 record: &Record,
865 ctx: &ArcExecutionContext,
866) -> bool {
867 match evaluate_expression_value_streaming(expr, record, ctx) {
868 Value::Boolean(b) => b,
869 _ => false,
870 }
871}
872
873fn evaluate_expression_value_streaming(
875 expr: &Expression,
876 record: &Record,
877 ctx: &ArcExecutionContext,
878) -> Value {
879 match expr {
880 Expression::Literal(l) => match l {
881 Literal::String(s) => Value::String(s.clone()),
882 Literal::Float(f) => Value::Float(*f),
883 Literal::Integer(i) => Value::Float(*i as f64),
884 Literal::Boolean(b) => Value::Boolean(*b),
885 Literal::Null => Value::Null,
886 },
887 Expression::Variable(name) => record.get(name).cloned().unwrap_or(Value::Null),
888 Expression::Parameter(name) => ctx.params.get(name).cloned().unwrap_or(Value::Null),
889 Expression::PropertyAccess(pa) => {
890 if let Some(Value::Node(node_id)) = record.get(&pa.variable)
891 && let Ok(Some(binary)) = ctx.db.get_node_property_binary(*node_id)
892 && let Ok(props) = crate::storage::property::deserialize_properties(&binary)
893 && let Some(value) = props.get(&pa.property)
894 {
895 return match value {
896 serde_json::Value::String(s) => Value::String(s.clone()),
897 serde_json::Value::Number(n) => Value::Float(n.as_f64().unwrap_or(0.0)),
898 serde_json::Value::Bool(b) => Value::Boolean(*b),
899 serde_json::Value::Null => Value::Null,
900 serde_json::Value::Array(items) => {
901 let mut out = Vec::with_capacity(items.len());
902 for item in items {
903 let Some(n) = item.as_f64() else {
904 return Value::String(
905 serde_json::Value::Array(items.clone()).to_string(),
906 );
907 };
908 out.push(n as f32);
909 }
910 Value::Vector(out)
911 }
912 _ => Value::Null,
913 };
914 }
915 Value::Null
916 }
917 Expression::Binary(b) => {
918 let left = evaluate_expression_value_streaming(&b.left, record, ctx);
919 let right = evaluate_expression_value_streaming(&b.right, record, ctx);
920
921 match b.operator {
922 BinaryOperator::Equal => Value::Boolean(left == right),
923 BinaryOperator::NotEqual => Value::Boolean(left != right),
924 BinaryOperator::And => match (left, right) {
925 (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l && r),
926 _ => Value::Null,
927 },
928 BinaryOperator::Or => match (left, right) {
929 (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l || r),
930 _ => Value::Null,
931 },
932 BinaryOperator::LessThan => match (left, right) {
933 (Value::Float(l), Value::Float(r)) => Value::Boolean(l < r),
934 _ => Value::Null,
935 },
936 BinaryOperator::LessThanOrEqual => match (left, right) {
937 (Value::Float(l), Value::Float(r)) => Value::Boolean(l <= r),
938 _ => Value::Null,
939 },
940 BinaryOperator::GreaterThan => match (left, right) {
941 (Value::Float(l), Value::Float(r)) => Value::Boolean(l > r),
942 _ => Value::Null,
943 },
944 BinaryOperator::GreaterThanOrEqual => match (left, right) {
945 (Value::Float(l), Value::Float(r)) => Value::Boolean(l >= r),
946 _ => Value::Null,
947 },
948 BinaryOperator::Add => match (left, right) {
949 (Value::Float(l), Value::Float(r)) => Value::Float(l + r),
950 (Value::String(l), Value::String(r)) => Value::String(format!("{}{}", l, r)),
951 _ => Value::Null,
952 },
953 BinaryOperator::Subtract => match (left, right) {
954 (Value::Float(l), Value::Float(r)) => Value::Float(l - r),
955 _ => Value::Null,
956 },
957 BinaryOperator::Multiply => match (left, right) {
958 (Value::Float(l), Value::Float(r)) => Value::Float(l * r),
959 _ => Value::Null,
960 },
961 BinaryOperator::Divide => match (left, right) {
962 (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l / r),
963 _ => Value::Null,
964 },
965 BinaryOperator::Modulo => match (left, right) {
966 (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l % r),
967 _ => Value::Null,
968 },
969 BinaryOperator::In => value_in_list(&left, &right),
970 BinaryOperator::NotIn => match value_in_list(&left, &right) {
971 Value::Boolean(b) => Value::Boolean(!b),
972 other => other,
973 },
974 BinaryOperator::StartsWith => match (left, right) {
975 (Value::String(l), Value::String(r)) => Value::Boolean(l.starts_with(&r)),
976 _ => Value::Null,
977 },
978 BinaryOperator::EndsWith => match (left, right) {
979 (Value::String(l), Value::String(r)) => Value::Boolean(l.ends_with(&r)),
980 _ => Value::Null,
981 },
982 BinaryOperator::Contains => match (left, right) {
983 (Value::String(l), Value::String(r)) => Value::Boolean(l.contains(&r)),
984 _ => Value::Null,
985 },
986 _ => Value::Null,
987 }
988 }
989 Expression::Unary(u) => {
990 let arg = evaluate_expression_value_streaming(&u.argument, record, ctx);
991 match u.operator {
992 crate::query::ast::UnaryOperator::Not => match arg {
993 Value::Boolean(b) => Value::Boolean(!b),
994 _ => Value::Null,
995 },
996 crate::query::ast::UnaryOperator::Negate => match arg {
997 Value::Float(f) => Value::Float(-f),
998 _ => Value::Null,
999 },
1000 }
1001 }
1002 Expression::Case(case_expr) => {
1003 for alt in &case_expr.alternatives {
1004 if evaluate_expression_streaming(&alt.when, record, ctx) {
1005 return evaluate_expression_value_streaming(&alt.then, record, ctx);
1006 }
1007 }
1008 match &case_expr.else_expression {
1009 Some(expr) => evaluate_expression_value_streaming(expr, record, ctx),
1010 None => Value::Null,
1011 }
1012 }
1013 Expression::Exists(exists_expr) => {
1014 Value::Boolean(evaluate_exists_streaming(exists_expr.as_ref(), record, ctx))
1015 }
1016 Expression::List(elements) => list_literal_value_streaming(elements, record, ctx),
1017 Expression::ListComprehension(comp) => {
1018 list_comprehension_value_streaming(comp.as_ref(), record, ctx)
1019 }
1020 Expression::FunctionCall(func) => match func.name.to_lowercase().as_str() {
1021 "vec_similarity" => {
1022 let Some(left) = func
1023 .arguments
1024 .first()
1025 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1026 else {
1027 return Value::Null;
1028 };
1029 let Some(right) = func
1030 .arguments
1031 .get(1)
1032 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1033 else {
1034 return Value::Null;
1035 };
1036 let Some(left_vec) = value_to_vector(&left) else {
1037 return Value::Null;
1038 };
1039 let Some(right_vec) = value_to_vector(&right) else {
1040 return Value::Null;
1041 };
1042 let Some(sim) = cosine_similarity(&left_vec, &right_vec) else {
1043 return Value::Null;
1044 };
1045 Value::Float(sim as f64)
1046 }
1047 "txt_score" => {
1048 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1049 {
1050 let Some(Expression::PropertyAccess(pa)) = func.arguments.first() else {
1051 return Value::Null;
1052 };
1053 let Some(Value::Node(node_id)) = record.get(&pa.variable) else {
1054 return Value::Null;
1055 };
1056 let Some(query_expr) = func.arguments.get(1) else {
1057 return Value::Null;
1058 };
1059 let Value::String(query) =
1060 evaluate_expression_value_streaming(query_expr, record, ctx)
1061 else {
1062 return Value::Null;
1063 };
1064 Value::Float(ctx.db.fts_txt_score(*node_id, &pa.property, &query))
1065 }
1066 #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
1067 {
1068 Value::Float(0.0)
1069 }
1070 }
1071 "id" => match func
1072 .arguments
1073 .first()
1074 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1075 {
1076 Some(Value::Node(id)) => Value::Float(id as f64),
1077 _ => Value::Null,
1078 },
1079 "type" => match func
1080 .arguments
1081 .first()
1082 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1083 {
1084 Some(Value::Relationship(triple)) => relationship_type_value(&ctx.db, &triple),
1085 _ => Value::Null,
1086 },
1087 "labels" => match func
1088 .arguments
1089 .first()
1090 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1091 {
1092 Some(Value::Node(id)) => node_labels_value(&ctx.db, id),
1093 _ => Value::Null,
1094 },
1095 "keys" => match func
1096 .arguments
1097 .first()
1098 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1099 {
1100 Some(Value::Node(id)) => node_property_keys_value(&ctx.db, id),
1101 Some(Value::Relationship(triple)) => edge_property_keys_value(&ctx.db, &triple),
1102 _ => Value::Null,
1103 },
1104 "size" => match func
1105 .arguments
1106 .first()
1107 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1108 {
1109 Some(Value::String(s)) => Value::Float(s.len() as f64),
1110 _ => Value::Null,
1111 },
1112 "toupper" => match func
1113 .arguments
1114 .first()
1115 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1116 {
1117 Some(Value::String(s)) => Value::String(s.to_uppercase()),
1118 _ => Value::Null,
1119 },
1120 "tolower" => match func
1121 .arguments
1122 .first()
1123 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1124 {
1125 Some(Value::String(s)) => Value::String(s.to_lowercase()),
1126 _ => Value::Null,
1127 },
1128 "trim" => match func
1129 .arguments
1130 .first()
1131 .map(|arg| evaluate_expression_value_streaming(arg, record, ctx))
1132 {
1133 Some(Value::String(s)) => Value::String(s.trim().to_string()),
1134 _ => Value::Null,
1135 },
1136 "coalesce" => {
1137 for arg in &func.arguments {
1138 let v = evaluate_expression_value_streaming(arg, record, ctx);
1139 if !matches!(v, Value::Null) {
1140 return v;
1141 }
1142 }
1143 Value::Null
1144 }
1145 _ => Value::Null,
1146 },
1147 _ => Value::Null,
1148 }
1149}
1150
1151fn list_literal_value_streaming(
1152 elements: &[Expression],
1153 record: &Record,
1154 ctx: &ArcExecutionContext,
1155) -> Value {
1156 let json = serde_json::Value::Array(
1157 elements
1158 .iter()
1159 .map(|e| executor_value_to_json(&evaluate_expression_value_streaming(e, record, ctx)))
1160 .collect(),
1161 );
1162 Value::String(json.to_string())
1163}
1164
1165fn list_comprehension_value_streaming(
1166 comp: &ListComprehension,
1167 record: &Record,
1168 ctx: &ArcExecutionContext,
1169) -> Value {
1170 let Some(items) = evaluate_list_source_streaming(&comp.list, record, ctx) else {
1171 return Value::Null;
1172 };
1173
1174 let mut out = Vec::new();
1175 for item in items {
1176 let mut scoped = record.clone();
1177 scoped.insert(comp.variable.clone(), item);
1178
1179 if let Some(where_expr) = &comp.where_expression
1180 && !evaluate_expression_streaming(where_expr, &scoped, ctx)
1181 {
1182 continue;
1183 }
1184
1185 let mapped = match &comp.map_expression {
1186 Some(expr) => evaluate_expression_value_streaming(expr, &scoped, ctx),
1187 None => scoped.get(&comp.variable).cloned().unwrap_or(Value::Null),
1188 };
1189 out.push(executor_value_to_json(&mapped));
1190 }
1191
1192 Value::String(serde_json::Value::Array(out).to_string())
1193}
1194
1195fn evaluate_list_source_streaming(
1196 expr: &Expression,
1197 record: &Record,
1198 ctx: &ArcExecutionContext,
1199) -> Option<Vec<Value>> {
1200 match expr {
1201 Expression::List(elements) => Some(
1202 elements
1203 .iter()
1204 .map(|e| evaluate_expression_value_streaming(e, record, ctx))
1205 .collect(),
1206 ),
1207 _ => match evaluate_expression_value_streaming(expr, record, ctx) {
1208 Value::String(s) => parse_executor_list_string(&s),
1209 Value::Vector(v) => Some(v.iter().map(|f| Value::Float(*f as f64)).collect()),
1210 _ => None,
1211 },
1212 }
1213}
1214
1215fn executor_value_to_json(value: &Value) -> serde_json::Value {
1216 match value {
1217 Value::Null => serde_json::Value::Null,
1218 Value::Boolean(b) => serde_json::Value::Bool(*b),
1219 Value::Float(f) => serde_json::Number::from_f64(*f)
1220 .map(serde_json::Value::Number)
1221 .unwrap_or(serde_json::Value::Null),
1222 Value::String(s) => serde_json::Value::String(s.clone()),
1223 Value::Vector(v) => serde_json::Value::Array(
1224 v.iter()
1225 .map(|f| {
1226 serde_json::Number::from_f64(*f as f64)
1227 .map(serde_json::Value::Number)
1228 .unwrap_or(serde_json::Value::Null)
1229 })
1230 .collect(),
1231 ),
1232 Value::Node(id) => serde_json::Value::Number(serde_json::Number::from(*id)),
1233 Value::Relationship(triple) => serde_json::Value::String(format!("{triple:?}")),
1234 }
1235}
1236
1237fn json_to_executor_value(value: &serde_json::Value) -> Value {
1238 match value {
1239 serde_json::Value::Null => Value::Null,
1240 serde_json::Value::Bool(b) => Value::Boolean(*b),
1241 serde_json::Value::Number(n) => Value::Float(n.as_f64().unwrap_or(0.0)),
1242 serde_json::Value::String(s) => Value::String(s.clone()),
1243 _ => Value::Null,
1244 }
1245}
1246
1247fn parse_executor_list_string(input: &str) -> Option<Vec<Value>> {
1248 let json = serde_json::from_str::<serde_json::Value>(input).ok()?;
1249 let serde_json::Value::Array(items) = json else {
1250 return None;
1251 };
1252 Some(items.iter().map(json_to_executor_value).collect())
1253}
1254
1255fn parse_executor_vector_string(input: &str) -> Option<Vec<f32>> {
1256 let json = serde_json::from_str::<serde_json::Value>(input).ok()?;
1257 let serde_json::Value::Array(items) = json else {
1258 return None;
1259 };
1260 let mut out = Vec::with_capacity(items.len());
1261 for item in items {
1262 out.push(item.as_f64()? as f32);
1263 }
1264 Some(out)
1265}
1266
1267fn value_to_vector(value: &Value) -> Option<Vec<f32>> {
1268 match value {
1269 Value::Vector(v) => Some(v.clone()),
1270 Value::String(s) => parse_executor_vector_string(s),
1271 _ => None,
1272 }
1273}
1274
/// Computes the cosine similarity of `a` and `b`.
///
/// Returns `None` when the slices are empty, differ in length, or when the
/// product of their norms is zero (so the similarity is undefined).
fn cosine_similarity(a: &[f32], b: &[f32]) -> Option<f32> {
    if a.is_empty() || a.len() != b.len() {
        return None;
    }

    // Single pass accumulating the dot product and both squared norms.
    let (dot, squared_a, squared_b) = a.iter().zip(b).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, squared_a, squared_b), (&x, &y)| {
            (dot + x * y, squared_a + x * x, squared_b + y * y)
        },
    );

    let denom = squared_a.sqrt() * squared_b.sqrt();
    if denom == 0.0 {
        None
    } else {
        Some(dot / denom)
    }
}
1293
1294fn value_in_list(needle: &Value, haystack: &Value) -> Value {
1295 match haystack {
1296 Value::String(input) => {
1297 let Some(items) = parse_executor_list_string(input) else {
1298 return Value::Null;
1299 };
1300 Value::Boolean(items.iter().any(|v| v == needle))
1301 }
1302 Value::Vector(items) => match needle {
1303 Value::Float(f) => Value::Boolean(items.iter().any(|v| (*v as f64) == *f)),
1304 _ => Value::Null,
1305 },
1306 _ => Value::Null,
1307 }
1308}
1309
1310fn unwind_value_to_list(value: Value) -> Result<Vec<Value>, Error> {
1311 match value {
1312 Value::Null => Ok(Vec::new()),
1313 Value::String(input) => parse_executor_list_string(&input)
1314 .ok_or_else(|| Error::Other("UNWIND expects a list expression".to_string())),
1315 Value::Vector(items) => Ok(items.into_iter().map(|f| Value::Float(f as f64)).collect()),
1316 _ => Err(Error::Other("UNWIND expects a list expression".to_string())),
1317 }
1318}
1319
1320fn unwind_values_streaming(
1321 expr: &Expression,
1322 record: &Record,
1323 ctx: &ArcExecutionContext,
1324) -> Result<Vec<Value>, Error> {
1325 match expr {
1326 Expression::List(elements) => Ok(elements
1327 .iter()
1328 .map(|e| evaluate_expression_value_streaming(e, record, ctx))
1329 .collect()),
1330 _ => unwind_value_to_list(evaluate_expression_value_streaming(expr, record, ctx)),
1331 }
1332}
1333
1334fn unwind_values(
1335 expr: &Expression,
1336 record: &Record,
1337 ctx: &ExecutionContext,
1338) -> Result<Vec<Value>, Error> {
1339 match expr {
1340 Expression::List(elements) => Ok(elements
1341 .iter()
1342 .map(|e| evaluate_expression_value(e, record, ctx))
1343 .collect()),
1344 _ => unwind_value_to_list(evaluate_expression_value(expr, record, ctx)),
1345 }
1346}
1347
1348fn evaluate_exists_streaming(
1349 exists_expr: &ExistsExpression,
1350 record: &Record,
1351 ctx: &ArcExecutionContext,
1352) -> bool {
1353 match exists_expr {
1354 ExistsExpression::Pattern(pattern) => {
1355 exists_match_pattern_streaming(pattern, None, record, ctx)
1356 }
1357 ExistsExpression::Subquery(query) => {
1358 let (pattern, where_expr) = match extract_exists_match_query(query) {
1359 Some(v) => v,
1360 None => return false,
1361 };
1362 exists_match_pattern_streaming(pattern, where_expr, record, ctx)
1363 }
1364 }
1365}
1366
1367fn exists_match_pattern_streaming(
1368 pattern: &Pattern,
1369 where_expr: Option<&Expression>,
1370 outer_record: &Record,
1371 ctx: &ArcExecutionContext,
1372) -> bool {
1373 let Some(PathElement::Node(start_node)) = pattern.elements.first() else {
1374 return false;
1375 };
1376
1377 if let Some(var) = &start_node.variable
1378 && let Some(Value::Node(start_id)) = outer_record.get(var)
1379 {
1380 if !node_satisfies_streaming(*start_id, start_node, outer_record, ctx) {
1381 return false;
1382 }
1383 return exists_path_from_node_streaming(
1384 pattern,
1385 0,
1386 *start_id,
1387 outer_record,
1388 where_expr,
1389 ctx,
1390 );
1391 }
1392
1393 exists_uncorrelated_match_streaming(pattern, where_expr, ctx)
1395}
1396
1397fn exists_uncorrelated_match_streaming(
1398 pattern: &Pattern,
1399 where_expr: Option<&Expression>,
1400 ctx: &ArcExecutionContext,
1401) -> bool {
1402 use crate::query::ast::{MatchClause, Query, ReturnClause, ReturnItem, WhereClause};
1403 use crate::query::planner::QueryPlanner;
1404
1405 let mut clauses: Vec<Clause> = Vec::new();
1406 clauses.push(Clause::Match(MatchClause {
1407 optional: false,
1408 pattern: pattern.clone(),
1409 }));
1410 if let Some(expr) = where_expr.cloned() {
1411 clauses.push(Clause::Where(WhereClause { expression: expr }));
1412 }
1413 clauses.push(Clause::Return(ReturnClause {
1414 distinct: false,
1415 items: vec![ReturnItem {
1416 expression: Expression::Literal(Literal::Integer(1)),
1417 alias: Some("_exists".to_string()),
1418 }],
1419 order_by: None,
1420 limit: Some(1),
1421 skip: None,
1422 }));
1423
1424 let planner = QueryPlanner::new();
1425 let plan = match planner.plan(Query { clauses }) {
1426 Ok(plan) => plan,
1427 Err(_) => return false,
1428 };
1429
1430 let exec_ctx = ExecutionContext {
1431 db: ctx.db.as_ref(),
1432 params: ctx.params.as_ref(),
1433 };
1434 match plan.execute(&exec_ctx) {
1435 Ok(mut iter) => iter.next().is_some(),
1436 Err(_) => false,
1437 }
1438}
1439
1440fn exists_path_from_node_streaming(
1441 pattern: &Pattern,
1442 node_index: usize,
1443 current_node_id: u64,
1444 bindings: &Record,
1445 where_expr: Option<&Expression>,
1446 ctx: &ArcExecutionContext,
1447) -> bool {
1448 let next_rel_index = node_index + 1;
1449 let next_node_index = node_index + 2;
1450
1451 if next_node_index >= pattern.elements.len() {
1452 return where_expr.is_none_or(|expr| evaluate_expression_streaming(expr, bindings, ctx));
1453 }
1454
1455 let PathElement::Relationship(rel) = &pattern.elements[next_rel_index] else {
1456 return false;
1457 };
1458 let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
1459 return false;
1460 };
1461
1462 if rel.variable_length.is_some() {
1463 return exists_var_length_step_streaming(
1464 pattern,
1465 next_node_index,
1466 current_node_id,
1467 rel,
1468 bindings,
1469 where_expr,
1470 ctx,
1471 );
1472 }
1473
1474 for (triple, end_id) in iter_matching_edges(ctx.db.as_ref(), current_node_id, rel) {
1475 let mut new_record = bindings.clone();
1476
1477 if let Some(rel_var) = &rel.variable {
1478 match new_record.get(rel_var) {
1479 Some(Value::Relationship(existing)) if existing == &triple => {}
1480 Some(_) => continue,
1481 None => new_record.insert(rel_var.clone(), Value::Relationship(triple)),
1482 }
1483 }
1484
1485 if let Some(props) = &rel.properties
1486 && !edge_satisfies_streaming(&triple, props, &new_record, ctx)
1487 {
1488 continue;
1489 }
1490
1491 if !node_satisfies_streaming(end_id, next_node, &new_record, ctx) {
1492 continue;
1493 }
1494
1495 if let Some(node_var) = &next_node.variable {
1496 match new_record.get(node_var) {
1497 Some(Value::Node(existing)) if *existing == end_id => {}
1498 Some(_) => continue,
1499 None => new_record.insert(node_var.clone(), Value::Node(end_id)),
1500 }
1501 }
1502
1503 if exists_path_from_node_streaming(
1504 pattern,
1505 next_node_index,
1506 end_id,
1507 &new_record,
1508 where_expr,
1509 ctx,
1510 ) {
1511 return true;
1512 }
1513 }
1514
1515 false
1516}
1517
1518fn exists_var_length_step_streaming(
1519 pattern: &Pattern,
1520 next_node_index: usize,
1521 current_node_id: u64,
1522 rel: &RelationshipPattern,
1523 bindings: &Record,
1524 where_expr: Option<&Expression>,
1525 ctx: &ArcExecutionContext,
1526) -> bool {
1527 let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
1528 return false;
1529 };
1530 if rel.variable.is_some() || rel.properties.is_some() {
1531 return false;
1532 }
1533 let Some(var_len) = &rel.variable_length else {
1534 return false;
1535 };
1536 let min_hops = var_len.min.unwrap_or(1);
1537 let Some(max_hops) = var_len.max else {
1538 return false;
1539 };
1540
1541 if rel.types.len() > 1 {
1542 return false;
1543 }
1544
1545 let rel_predicate_id = rel
1546 .types
1547 .first()
1548 .and_then(|t| ctx.db.resolve_id(t).ok().flatten());
1549
1550 let reachable = find_reachable_nodes(
1551 ctx.db.as_ref(),
1552 current_node_id,
1553 rel.direction.clone(),
1554 rel_predicate_id,
1555 min_hops,
1556 max_hops,
1557 );
1558
1559 for end_id in reachable {
1560 let mut new_record = bindings.clone();
1561
1562 if !node_satisfies_streaming(end_id, next_node, &new_record, ctx) {
1563 continue;
1564 }
1565
1566 if let Some(node_var) = &next_node.variable {
1567 match new_record.get(node_var) {
1568 Some(Value::Node(existing)) if *existing == end_id => {}
1569 Some(_) => continue,
1570 None => new_record.insert(node_var.clone(), Value::Node(end_id)),
1571 }
1572 }
1573
1574 if exists_path_from_node_streaming(
1575 pattern,
1576 next_node_index,
1577 end_id,
1578 &new_record,
1579 where_expr,
1580 ctx,
1581 ) {
1582 return true;
1583 }
1584 }
1585
1586 false
1587}
1588
1589fn node_satisfies_streaming(
1590 node_id: u64,
1591 node: &crate::query::ast::NodePattern,
1592 bindings: &Record,
1593 ctx: &ArcExecutionContext,
1594) -> bool {
1595 if let Some(var) = &node.variable
1596 && let Some(Value::Node(bound)) = bindings.get(var)
1597 && *bound != node_id
1598 {
1599 return false;
1600 }
1601
1602 if !node.labels.is_empty() {
1603 let Some(type_id) = ctx.db.resolve_id("type").ok().flatten() else {
1604 return false;
1605 };
1606 for label in &node.labels {
1607 let Some(label_id) = ctx.db.resolve_id(label).ok().flatten() else {
1608 return false;
1609 };
1610 let criteria = QueryCriteria {
1611 subject_id: Some(node_id),
1612 predicate_id: Some(type_id),
1613 object_id: Some(label_id),
1614 };
1615 if ctx.db.query(criteria).next().is_none() {
1616 return false;
1617 }
1618 }
1619 }
1620
1621 if let Some(props) = &node.properties
1622 && !node_properties_match_streaming(node_id, props, bindings, ctx)
1623 {
1624 return false;
1625 }
1626
1627 true
1628}
1629
1630fn node_properties_match_streaming(
1631 node_id: u64,
1632 props: &PropertyMap,
1633 bindings: &Record,
1634 ctx: &ArcExecutionContext,
1635) -> bool {
1636 if props.properties.is_empty() {
1637 return true;
1638 }
1639 let Ok(Some(binary)) = ctx.db.get_node_property_binary(node_id) else {
1640 return false;
1641 };
1642 let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
1643 return false;
1644 };
1645
1646 for pair in &props.properties {
1647 let expected = evaluate_expression_value_streaming(&pair.value, bindings, ctx);
1648 let Some(actual) = stored.get(&pair.key) else {
1649 return false;
1650 };
1651 if !json_value_matches_executor_value(actual, &expected) {
1652 return false;
1653 }
1654 }
1655
1656 true
1657}
1658
1659fn edge_satisfies_streaming(
1660 triple: &Triple,
1661 props: &PropertyMap,
1662 bindings: &Record,
1663 ctx: &ArcExecutionContext,
1664) -> bool {
1665 if props.properties.is_empty() {
1666 return true;
1667 }
1668 let Ok(Some(binary)) =
1669 ctx.db
1670 .get_edge_property_binary(triple.subject_id, triple.predicate_id, triple.object_id)
1671 else {
1672 return false;
1673 };
1674 let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
1675 return false;
1676 };
1677
1678 for pair in &props.properties {
1679 let expected = evaluate_expression_value_streaming(&pair.value, bindings, ctx);
1680 let Some(actual) = stored.get(&pair.key) else {
1681 return false;
1682 };
1683 if !json_value_matches_executor_value(actual, &expected) {
1684 return false;
1685 }
1686 }
1687
1688 true
1689}
1690
1691fn json_value_matches_executor_value(actual: &serde_json::Value, expected: &Value) -> bool {
1692 match expected {
1693 Value::Null => actual.is_null(),
1694 Value::String(s) => actual.as_str() == Some(s.as_str()),
1695 Value::Boolean(b) => actual.as_bool() == Some(*b),
1696 Value::Float(f) => actual.as_f64().map(|v| v == *f).unwrap_or(false),
1697 _ => false,
1698 }
1699}
1700
1701fn iter_matching_edges(
1702 db: &Database,
1703 node_id: u64,
1704 rel: &RelationshipPattern,
1705) -> Vec<(Triple, u64)> {
1706 let predicate_ids: Vec<u64> = rel
1707 .types
1708 .iter()
1709 .filter_map(|t| db.resolve_id(t).ok().flatten())
1710 .collect();
1711
1712 let predicate_ids: Vec<Option<u64>> = if predicate_ids.is_empty() && !rel.types.is_empty() {
1713 return Vec::new();
1715 } else if predicate_ids.is_empty() {
1716 vec![None]
1717 } else {
1718 predicate_ids.into_iter().map(Some).collect()
1719 };
1720
1721 let mut out = Vec::new();
1722
1723 for pred in predicate_ids {
1724 match rel.direction {
1725 RelationshipDirection::LeftToRight => {
1726 let criteria = QueryCriteria {
1727 subject_id: Some(node_id),
1728 predicate_id: pred,
1729 object_id: None,
1730 };
1731 out.extend(db.query(criteria).map(|t| (t, t.object_id)));
1732 }
1733 RelationshipDirection::RightToLeft => {
1734 let criteria = QueryCriteria {
1735 subject_id: None,
1736 predicate_id: pred,
1737 object_id: Some(node_id),
1738 };
1739 out.extend(db.query(criteria).map(|t| (t, t.subject_id)));
1740 }
1741 RelationshipDirection::Undirected => {
1742 let out_criteria = QueryCriteria {
1743 subject_id: Some(node_id),
1744 predicate_id: pred,
1745 object_id: None,
1746 };
1747 out.extend(db.query(out_criteria).map(|t| (t, t.object_id)));
1748
1749 let in_criteria = QueryCriteria {
1750 subject_id: None,
1751 predicate_id: pred,
1752 object_id: Some(node_id),
1753 };
1754 out.extend(db.query(in_criteria).map(|t| (t, t.subject_id)));
1755 }
1756 }
1757 }
1758
1759 out
1760}
1761
1762fn extract_exists_match_query(
1763 query: &crate::query::ast::Query,
1764) -> Option<(&Pattern, Option<&Expression>)> {
1765 use crate::query::ast::{Clause, WhereClause};
1766
1767 let mut match_pattern: Option<&Pattern> = None;
1768 let mut where_expr: Option<&Expression> = None;
1769
1770 for clause in &query.clauses {
1771 match clause {
1772 Clause::Match(m) => match_pattern = Some(&m.pattern),
1773 Clause::Where(WhereClause { expression }) => where_expr = Some(expression),
1774 Clause::Return(_) => {}
1775 _ => return None,
1776 }
1777 }
1778
1779 match_pattern.map(|p| (p, where_expr))
1780}
1781fn json_array_string(mut values: Vec<String>) -> Value {
1782 values.sort();
1783 let json =
1784 serde_json::Value::Array(values.into_iter().map(serde_json::Value::String).collect());
1785 Value::String(json.to_string())
1786}
1787
1788fn relationship_type_value(db: &Database, triple: &Triple) -> Value {
1789 match db.resolve_str(triple.predicate_id).ok().flatten() {
1790 Some(s) => Value::String(s),
1791 None => Value::Null,
1792 }
1793}
1794
1795fn node_labels_value(db: &Database, node_id: u64) -> Value {
1796 let Some(type_id) = db.resolve_id("type").ok().flatten() else {
1797 return Value::String("[]".to_string());
1798 };
1799
1800 let criteria = QueryCriteria {
1801 subject_id: Some(node_id),
1802 predicate_id: Some(type_id),
1803 object_id: None,
1804 };
1805
1806 let labels: Vec<String> = db
1807 .query(criteria)
1808 .filter_map(|t| db.resolve_str(t.object_id).ok().flatten())
1809 .collect();
1810
1811 json_array_string(labels)
1812}
1813
1814fn node_property_keys_value(db: &Database, node_id: u64) -> Value {
1815 let keys: Vec<String> = db
1816 .get_node_property_binary(node_id)
1817 .ok()
1818 .flatten()
1819 .and_then(|binary| crate::storage::property::deserialize_properties(&binary).ok())
1820 .map(|props| props.keys().cloned().collect())
1821 .unwrap_or_default();
1822
1823 json_array_string(keys)
1824}
1825
1826fn edge_property_keys_value(db: &Database, triple: &Triple) -> Value {
1827 let keys: Vec<String> = db
1828 .get_edge_property_binary(triple.subject_id, triple.predicate_id, triple.object_id)
1829 .ok()
1830 .flatten()
1831 .and_then(|binary| crate::storage::property::deserialize_properties(&binary).ok())
1832 .map(|props| props.keys().cloned().collect())
1833 .unwrap_or_default();
1834
1835 json_array_string(keys)
1836}
1837
1838impl ExecutionPlan for SingleRowNode {
1843 fn execute<'a>(
1844 &'a self,
1845 _ctx: &'a ExecutionContext<'a>,
1846 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1847 Ok(Box::new(std::iter::once(Ok(Record::new()))))
1848 }
1849
1850 fn estimate_cardinality(&self, _ctx: &ExecutionContext) -> usize {
1851 1
1852 }
1853}
1854
1855impl ExecutionPlan for DistinctNode {
1856 fn execute<'a>(
1857 &'a self,
1858 ctx: &'a ExecutionContext<'a>,
1859 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1860 let input_iter = self.input.execute(ctx)?;
1861 let mut seen: HashSet<String> = HashSet::new();
1862 Ok(Box::new(input_iter.filter_map(
1863 move |result| match result {
1864 Ok(record) => {
1865 let key = record_distinct_key(&record);
1866 if seen.insert(key) {
1867 Some(Ok(record))
1868 } else {
1869 None
1870 }
1871 }
1872 Err(err) => Some(Err(err)),
1873 },
1874 )))
1875 }
1876
1877 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1878 self.input.estimate_cardinality(ctx)
1879 }
1880}
1881
1882impl ExecutionPlan for UnwindNode {
1883 fn execute<'a>(
1884 &'a self,
1885 ctx: &'a ExecutionContext<'a>,
1886 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1887 let input_iter = self.input.execute(ctx)?;
1888 let expression = self.expression.clone();
1889 let alias = self.alias.clone();
1890 Ok(Box::new(input_iter.flat_map(move |result| {
1891 match result {
1892 Ok(record) => match unwind_values(&expression, &record, ctx) {
1893 Ok(values) => values
1894 .into_iter()
1895 .map(|value| {
1896 let mut new_record = record.clone();
1897 new_record.insert(alias.clone(), value);
1898 Ok(new_record)
1899 })
1900 .collect::<Vec<_>>(),
1901 Err(err) => vec![Err(err)],
1902 },
1903 Err(err) => vec![Err(err)],
1904 }
1905 })))
1906 }
1907
1908 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1909 self.input.estimate_cardinality(ctx)
1910 }
1911}
1912
1913impl ExecutionPlan for ScanNode {
1914 fn execute<'a>(
1915 &'a self,
1916 ctx: &'a ExecutionContext<'a>,
1917 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1918 let alias = self.alias.clone();
1919
1920 if self.labels.is_empty() {
1921 Ok(Box::new(scan_all_nodes_optimized(ctx.db, alias)))
1923 } else {
1924 Ok(Box::new(scan_labeled_nodes_optimized(
1926 ctx.db,
1927 &self.labels,
1928 alias,
1929 )))
1930 }
1931 }
1932
1933 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1934 ScanStats::estimate_scan_cardinality(ctx.db, &self.labels).estimated_cardinality
1935 }
1936}
1937
1938impl ExecutionPlan for FtsCandidateScanNode {
1939 fn execute<'a>(
1940 &'a self,
1941 ctx: &'a ExecutionContext<'a>,
1942 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1943 let Some(query) = resolve_query_string(&self.query, ctx) else {
1944 return Ok(Box::new(std::iter::empty()));
1945 };
1946 if query.is_empty() || self.property.is_empty() {
1947 return Ok(Box::new(std::iter::empty()));
1948 }
1949
1950 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
1951 let Some(scores) = ctx
1952 .db
1953 .fts_scores_for_query(self.property.as_str(), query.as_str())
1954 else {
1955 return Ok(Box::new(std::iter::empty()));
1956 };
1957 #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
1958 let scores: std::sync::Arc<std::collections::HashMap<u64, f32>> =
1959 std::sync::Arc::new(std::collections::HashMap::new());
1960
1961 let alias = self.alias.clone();
1962 let db = ctx.db;
1963 let type_and_labels = resolve_label_ids(db, &self.labels);
1964 let candidate_ids: Vec<u64> = scores.keys().copied().collect();
1965
1966 Ok(Box::new(candidate_ids.into_iter().filter_map(
1967 move |node_id| {
1968 if let Some((type_id, label_ids)) = type_and_labels.as_ref()
1969 && !node_has_labels(db, node_id, *type_id, label_ids)
1970 {
1971 return None;
1972 }
1973
1974 let mut record = Record::new();
1975 record.insert(alias.clone(), Value::Node(node_id));
1976 Some(Ok(record))
1977 },
1978 )))
1979 }
1980
1981 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
1982 let scan_est = ScanStats::estimate_scan_cardinality(ctx.db, &self.labels)
1983 .estimated_cardinality
1984 .max(1);
1985 scan_est.min(10_000)
1986 }
1987}
1988
1989impl ExecutionPlan for VectorTopKScanNode {
1990 fn execute<'a>(
1991 &'a self,
1992 ctx: &'a ExecutionContext<'a>,
1993 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1994 fn fallback<'a>(
1995 node: &VectorTopKScanNode,
1996 ctx: &'a ExecutionContext<'a>,
1997 limit: usize,
1998 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
1999 let alias = node.alias.clone();
2000 let property = node.property.clone();
2001 let query_expr = node.query.clone();
2002
2003 let sort_expr = Expression::FunctionCall(FunctionCall {
2004 name: "vec_similarity".to_string(),
2005 arguments: vec![
2006 Expression::PropertyAccess(PropertyAccess {
2007 variable: alias.clone(),
2008 property,
2009 }),
2010 query_expr,
2011 ],
2012 });
2013
2014 let mut records = Vec::new();
2015 if node.labels.is_empty() {
2016 for item in scan_all_nodes_optimized(ctx.db, alias.clone()) {
2017 records.push(item?);
2018 }
2019 } else {
2020 let labels = node.labels.clone();
2021 for item in scan_labeled_nodes_optimized(ctx.db, &labels, alias.clone()) {
2022 records.push(item?);
2023 }
2024 }
2025 records.sort_by(|a, b| {
2026 let val_a = evaluate_expression_value(&sort_expr, a, ctx);
2027 let val_b = evaluate_expression_value(&sort_expr, b, ctx);
2028 compare_values_for_sort(&val_a, &val_b, &Direction::Descending)
2029 });
2030 records.truncate(limit);
2031 Ok(Box::new(records.into_iter().map(Ok)))
2032 }
2033
2034 let limit = usize::try_from(self.limit).unwrap_or(usize::MAX);
2035 if limit == 0 || self.property.is_empty() {
2036 return Ok(Box::new(std::iter::empty()));
2037 }
2038
2039 if !self.labels.is_empty() {
2041 return fallback(self, ctx, limit);
2042 }
2043
2044 #[cfg(all(feature = "vector", not(target_arch = "wasm32")))]
2045 {
2046 let query_value = evaluate_expression_value(&self.query, &Record::new(), ctx);
2047 let Some(query_vec) = value_to_vector(&query_value) else {
2048 return fallback(self, ctx, limit);
2049 };
2050
2051 let Some(config) = ctx.db.vector_index_config() else {
2052 return fallback(self, ctx, limit);
2053 };
2054
2055 let metric = config.metric.to_lowercase();
2056 if config.property != self.property
2057 || query_vec.len() != config.dim
2058 || !(metric == "cosine" || metric == "cos")
2059 {
2060 return fallback(self, ctx, limit);
2061 }
2062
2063 let hits = match ctx.db.vector_search(&query_vec, limit) {
2064 Ok(h) => h,
2065 Err(_) => return fallback(self, ctx, limit),
2066 };
2067
2068 let alias = self.alias.clone();
2071 let property_expr = Expression::PropertyAccess(PropertyAccess {
2072 variable: alias.clone(),
2073 property: self.property.clone(),
2074 });
2075 let mut scored: Vec<(u64, Option<f32>)> = Vec::with_capacity(hits.len());
2076 for (node_id, _) in hits {
2077 let mut record = Record::new();
2078 record.insert(alias.clone(), Value::Node(node_id));
2079
2080 let value = evaluate_expression_value(&property_expr, &record, ctx);
2081 let score = value_to_vector(&value).and_then(|v| cosine_similarity(&v, &query_vec));
2082 scored.push((node_id, score));
2083 }
2084
2085 scored.sort_by(|(id_a, s_a), (id_b, s_b)| match (s_a, s_b) {
2086 (Some(a), Some(b)) => b
2087 .partial_cmp(a)
2088 .unwrap_or(std::cmp::Ordering::Equal)
2089 .then_with(|| id_a.cmp(id_b)),
2090 (Some(_), None) => std::cmp::Ordering::Less,
2091 (None, Some(_)) => std::cmp::Ordering::Greater,
2092 (None, None) => id_a.cmp(id_b),
2093 });
2094
2095 let candidate_ids: Vec<u64> = scored.into_iter().map(|(id, _)| id).collect();
2096 return Ok(Box::new(candidate_ids.into_iter().map(move |node_id| {
2097 let mut record = Record::new();
2098 record.insert(alias.clone(), Value::Node(node_id));
2099 Ok(record)
2100 })));
2101 }
2102 #[cfg(not(all(feature = "vector", not(target_arch = "wasm32"))))]
2103 {
2104 fallback(self, ctx, limit)
2105 }
2106 }
2107
2108 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2109 let scan_est = ScanStats::estimate_scan_cardinality(ctx.db, &self.labels)
2110 .estimated_cardinality
2111 .max(1);
2112 scan_est.min(self.limit as usize)
2113 }
2114}
2115
2116fn scan_all_nodes_optimized(
2118 db: &Database,
2119 alias: String,
2120) -> impl Iterator<Item = Result<Record, Error>> + '_ {
2121 let mut unique_nodes = HashSet::new();
2126
2127 let subject_criteria = QueryCriteria::default();
2129 for triple in db.query(subject_criteria).take(10000) {
2130 unique_nodes.insert(triple.subject_id);
2132 unique_nodes.insert(triple.object_id);
2133 }
2134
2135 unique_nodes.into_iter().map(move |node_id| {
2136 let mut record = Record::new();
2137 record.insert(alias.clone(), Value::Node(node_id));
2138 Ok(record)
2139 })
2140}
2141
2142fn scan_labeled_nodes_optimized<'a>(
2144 db: &'a Database,
2145 labels: &'a [String],
2146 alias: String,
2147) -> impl Iterator<Item = Result<Record, Error>> + 'a {
2148 let type_id = match db.resolve_id("type") {
2150 Ok(Some(id)) => id,
2151 _ => {
2152 return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Record, Error>>>;
2153 }
2154 };
2155
2156 let labels = labels.to_vec(); Box::new(std::iter::once(()).flat_map(move |_| {
2159 let mut label_node_sets: Vec<HashSet<u64>> = Vec::new();
2160
2161 for label in &labels {
2163 if let Ok(Some(label_id)) = db.resolve_id(label) {
2164 let criteria = QueryCriteria {
2165 subject_id: None,
2166 predicate_id: Some(type_id),
2167 object_id: Some(label_id),
2168 };
2169
2170 let nodes: HashSet<u64> =
2171 db.query(criteria).map(|triple| triple.subject_id).collect();
2172 label_node_sets.push(nodes);
2173 } else {
2174 label_node_sets.push(HashSet::new());
2176 }
2177 }
2178
2179 let final_nodes = if label_node_sets.is_empty() {
2181 HashSet::new()
2182 } else {
2183 label_node_sets
2184 .into_iter()
2185 .reduce(|acc, set| acc.intersection(&set).cloned().collect())
2186 .unwrap_or_default()
2187 };
2188
2189 let alias_clone = alias.clone();
2190 final_nodes.into_iter().map(move |node_id| {
2191 let mut record = Record::new();
2192 record.insert(alias_clone.clone(), Value::Node(node_id));
2193 Ok(record)
2194 })
2195 }))
2196}
2197
2198fn resolve_query_string(expr: &Expression, ctx: &ExecutionContext) -> Option<String> {
2199 match expr {
2200 Expression::Literal(Literal::String(s)) => Some(s.clone()),
2201 Expression::Parameter(name) => match ctx.params.get(name) {
2202 Some(Value::String(s)) => Some(s.clone()),
2203 _ => None,
2204 },
2205 _ => None,
2206 }
2207}
2208
2209fn resolve_label_ids(db: &Database, labels: &[String]) -> Option<(u64, Vec<u64>)> {
2210 if labels.is_empty() {
2211 return None;
2212 }
2213 let type_id = db.resolve_id("type").ok().flatten()?;
2214 let mut label_ids = Vec::with_capacity(labels.len());
2215 for label in labels {
2216 label_ids.push(db.resolve_id(label).ok().flatten()?);
2217 }
2218 Some((type_id, label_ids))
2219}
2220
2221fn node_has_labels(db: &Database, node_id: u64, type_id: u64, label_ids: &[u64]) -> bool {
2222 for label_id in label_ids {
2223 let criteria = QueryCriteria {
2224 subject_id: Some(node_id),
2225 predicate_id: Some(type_id),
2226 object_id: Some(*label_id),
2227 };
2228 if db.query(criteria).next().is_none() {
2229 return false;
2230 }
2231 }
2232 true
2233}
2234
/// Streaming-context counterpart of `resolve_query_string`: resolves a
/// string literal or a string-valued parameter; anything else is `None`.
#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
fn resolve_query_string_streaming(expr: &Expression, ctx: &ArcExecutionContext) -> Option<String> {
    if let Expression::Literal(Literal::String(text)) = expr {
        return Some(text.clone());
    }

    let Expression::Parameter(name) = expr else {
        return None;
    };
    if let Some(Value::String(text)) = ctx.params.get(name) {
        Some(text.clone())
    } else {
        None
    }
}
2246
/// Streaming-context counterpart of `resolve_label_ids`.
///
/// Returns `None` when `labels` is empty, and also when the `"type"`
/// predicate or any single label cannot be resolved.
#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
fn resolve_label_ids_streaming(db: &Arc<Database>, labels: &[String]) -> Option<(u64, Vec<u64>)> {
    if labels.is_empty() {
        return None;
    }

    let type_id = db.resolve_id("type").ok().flatten()?;
    // Collecting into `Option` stops at the first label that fails to resolve.
    let label_ids = labels
        .iter()
        .map(|label| db.resolve_id(label).ok().flatten())
        .collect::<Option<Vec<u64>>>()?;

    Some((type_id, label_ids))
}
2259
/// Checks that `node_id` carries every label in `label_ids`.
///
/// A label counts as present when the query for the triple
/// `(node_id, type_id, label_id)` yields at least one result. With no labels to
/// check the answer is `true`.
#[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
fn node_has_labels_streaming(
    db: &Arc<Database>,
    node_id: u64,
    type_id: u64,
    label_ids: &[u64],
) -> bool {
    let any_label_missing = label_ids.iter().copied().any(|label_id| {
        let mut matches = db.query(QueryCriteria {
            subject_id: Some(node_id),
            predicate_id: Some(type_id),
            object_id: Some(label_id),
        });
        matches.next().is_none()
    });

    !any_label_missing
}
2279
2280impl ExecutionPlan for NestedLoopJoinNode {
2285 fn execute<'a>(
2286 &'a self,
2287 ctx: &'a ExecutionContext<'a>,
2288 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2289 let left_card = self.left.estimate_cardinality(ctx);
2291 let right_card = self.right.estimate_cardinality(ctx);
2292
2293 if left_card <= right_card {
2294 Ok(Box::new(IndexNestedLoopJoinIter::new(
2296 self.left.execute(ctx)?,
2297 &self.right,
2298 ctx,
2299 )))
2300 } else {
2301 Ok(Box::new(IndexNestedLoopJoinIter::new(
2303 self.right.execute(ctx)?,
2304 &self.left,
2305 ctx,
2306 )))
2307 }
2308 }
2309
2310 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2311 let left_card = self.left.estimate_cardinality(ctx);
2314 let right_card = self.right.estimate_cardinality(ctx);
2315 (left_card * right_card / 10).max(1)
2316 }
2317}
2318
2319struct IndexNestedLoopJoinIter<'a> {
2321 outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2322 inner_plan: &'a PhysicalPlan,
2323 ctx: &'a ExecutionContext<'a>,
2324 current_outer: Option<Record>,
2325 current_inner: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>>,
2326}
2327
2328impl<'a> IndexNestedLoopJoinIter<'a> {
2329 fn new(
2330 outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2331 inner_plan: &'a PhysicalPlan,
2332 ctx: &'a ExecutionContext<'a>,
2333 ) -> Self {
2334 Self {
2335 outer_iter,
2336 inner_plan,
2337 ctx,
2338 current_outer: None,
2339 current_inner: None,
2340 }
2341 }
2342}
2343
2344impl<'a> Iterator for IndexNestedLoopJoinIter<'a> {
2345 type Item = Result<Record, Error>;
2346
2347 fn next(&mut self) -> Option<Self::Item> {
2348 loop {
2349 if let Some(ref mut inner_iter) = self.current_inner {
2351 if let Some(inner_result) = inner_iter.next() {
2352 match inner_result {
2353 Ok(inner_record) => {
2354 if let Some(ref outer_record) = self.current_outer {
2355 let mut joined = outer_record.clone();
2356 joined.merge(&inner_record);
2357 return Some(Ok(joined));
2358 }
2359 }
2360 Err(e) => return Some(Err(e)),
2361 }
2362 } else {
2363 self.current_inner = None;
2365 self.current_outer = None;
2366 }
2367 }
2368
2369 match self.outer_iter.next() {
2371 Some(Ok(outer_record)) => {
2372 match self.inner_plan.execute(self.ctx) {
2374 Ok(inner_iter) => {
2375 self.current_outer = Some(outer_record);
2376 self.current_inner = Some(inner_iter);
2377 }
2379 Err(e) => return Some(Err(e)),
2380 }
2381 }
2382 Some(Err(e)) => return Some(Err(e)),
2383 None => return None, }
2385 }
2386 }
2387}
2388
2389impl ExecutionPlan for LeftOuterJoinNode {
2390 fn execute<'a>(
2391 &'a self,
2392 ctx: &'a ExecutionContext<'a>,
2393 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2394 Ok(Box::new(LeftOuterJoinIter::new(
2395 self.left.execute(ctx)?,
2396 &self.right,
2397 &self.right_aliases,
2398 ctx,
2399 )))
2400 }
2401
2402 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2403 let left_card = self.left.estimate_cardinality(ctx);
2404 let right_card = self.right.estimate_cardinality(ctx);
2405 (left_card * right_card / 10).max(left_card).max(1)
2406 }
2407}
2408
2409struct LeftOuterJoinIter<'a> {
2410 outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2411 inner_plan: &'a PhysicalPlan,
2412 right_aliases: &'a [String],
2413 ctx: &'a ExecutionContext<'a>,
2414 current_outer: Option<Record>,
2415 current_inner: Option<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>>,
2416 matched_current_outer: bool,
2417 emitted_null_current_outer: bool,
2418}
2419
2420impl<'a> LeftOuterJoinIter<'a> {
2421 fn new(
2422 outer_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'a>,
2423 inner_plan: &'a PhysicalPlan,
2424 right_aliases: &'a [String],
2425 ctx: &'a ExecutionContext<'a>,
2426 ) -> Self {
2427 Self {
2428 outer_iter,
2429 inner_plan,
2430 right_aliases,
2431 ctx,
2432 current_outer: None,
2433 current_inner: None,
2434 matched_current_outer: false,
2435 emitted_null_current_outer: false,
2436 }
2437 }
2438
2439 fn emit_null_row(&self, mut outer: Record) -> Record {
2440 for alias in self.right_aliases {
2441 outer.values.entry(alias.clone()).or_insert(Value::Null);
2442 }
2443 outer
2444 }
2445}
2446
2447impl<'a> Iterator for LeftOuterJoinIter<'a> {
2448 type Item = Result<Record, Error>;
2449
2450 fn next(&mut self) -> Option<Self::Item> {
2451 loop {
2452 if let Some(ref mut inner_iter) = self.current_inner {
2453 if let Some(inner_result) = inner_iter.next() {
2454 match inner_result {
2455 Ok(inner_record) => {
2456 let outer_record = self.current_outer.as_ref().unwrap().clone();
2457 if let Some(joined) = try_merge_records(outer_record, inner_record) {
2458 self.matched_current_outer = true;
2459 return Some(Ok(joined));
2460 }
2461 continue;
2462 }
2463 Err(e) => return Some(Err(e)),
2464 }
2465 }
2466
2467 self.current_inner = None;
2469 if !self.matched_current_outer && !self.emitted_null_current_outer {
2470 self.emitted_null_current_outer = true;
2471 let outer_record = self.current_outer.take().unwrap();
2472 return Some(Ok(self.emit_null_row(outer_record)));
2473 }
2474 self.current_outer = None;
2475 self.matched_current_outer = false;
2476 self.emitted_null_current_outer = false;
2477 continue;
2478 }
2479
2480 match self.outer_iter.next()? {
2481 Ok(outer_record) => match self.inner_plan.execute(self.ctx) {
2482 Ok(inner_iter) => {
2483 self.current_outer = Some(outer_record);
2484 self.current_inner = Some(inner_iter);
2485 self.matched_current_outer = false;
2486 self.emitted_null_current_outer = false;
2487 }
2488 Err(e) => return Some(Err(e)),
2489 },
2490 Err(e) => return Some(Err(e)),
2491 }
2492 }
2493 }
2494}
2495
2496impl ExecutionPlan for FilterNode {
2501 fn execute<'a>(
2502 &'a self,
2503 ctx: &'a ExecutionContext<'a>,
2504 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2505 let input_iter = self.input.execute(ctx)?;
2506 let predicate = self.predicate.clone();
2507
2508 Ok(Box::new(input_iter.filter_map(move |res| match res {
2509 Ok(record) => {
2510 if evaluate_expression(&predicate, &record, ctx) {
2511 Some(Ok(record))
2512 } else {
2513 None
2514 }
2515 }
2516 Err(e) => Some(Err(e)),
2517 })))
2518 }
2519
2520 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2521 (self.input.estimate_cardinality(ctx) / 10).max(1)
2523 }
2524}
2525
2526impl ExecutionPlan for ProjectNode {
2527 fn execute<'a>(
2528 &'a self,
2529 ctx: &'a ExecutionContext<'a>,
2530 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2531 let input_iter = self.input.execute(ctx)?;
2532 let projections = self.projections.clone();
2533
2534 Ok(Box::new(input_iter.map(move |res| match res {
2535 Ok(record) => {
2536 let mut new_record = Record::new();
2537 for (expr, alias) in &projections {
2538 let val = evaluate_expression_value(expr, &record, ctx);
2539 new_record.insert(alias.clone(), val);
2540 }
2541 Ok(new_record)
2542 }
2543 Err(e) => Err(e),
2544 })))
2545 }
2546
2547 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2548 self.input.estimate_cardinality(ctx)
2550 }
2551}
2552
2553struct StreamingExpandVarLengthIterator {
2555 input_iter: Box<dyn Iterator<Item = Result<Record, Error>> + 'static>,
2556 start_node_alias: String,
2557 end_node_alias: String,
2558 direction: RelationshipDirection,
2559 rel_type: Option<String>,
2560 min_hops: u32,
2561 max_hops: u32,
2562 ctx: Arc<ArcExecutionContext>,
2563 current_record: Option<Record>,
2564 current_expansions: Option<std::vec::IntoIter<u64>>,
2565}
2566
2567impl StreamingExpandVarLengthIterator {
2568 fn compute_expansions(&self, start_id: u64) -> Vec<u64> {
2569 let rel_predicate_id: Option<u64> = if let Some(ref rel_type) = self.rel_type {
2570 self.ctx.db.resolve_id(rel_type).ok().flatten()
2571 } else {
2572 None
2573 };
2574
2575 find_reachable_nodes(
2576 &self.ctx.db,
2577 start_id,
2578 self.direction.clone(),
2579 rel_predicate_id,
2580 self.min_hops,
2581 self.max_hops,
2582 )
2583 }
2584}
2585
2586impl Iterator for StreamingExpandVarLengthIterator {
2587 type Item = Result<Record, Error>;
2588
2589 fn next(&mut self) -> Option<Self::Item> {
2590 loop {
2591 if let Some(ref mut expansions) = self.current_expansions
2592 && let Some(end_id) = expansions.next()
2593 {
2594 let mut record = self.current_record.as_ref().unwrap().clone();
2595 record.insert(self.end_node_alias.clone(), Value::Node(end_id));
2596 return Some(Ok(record));
2597 }
2598
2599 self.current_record = None;
2600 self.current_expansions = None;
2601
2602 match self.input_iter.next()? {
2603 Ok(record) => {
2604 let Some(Value::Node(start_id)) = record.values.get(&self.start_node_alias)
2605 else {
2606 continue;
2607 };
2608 let expansions = self.compute_expansions(*start_id);
2609 self.current_record = Some(record);
2610 self.current_expansions = Some(expansions.into_iter());
2611 }
2612 Err(e) => return Some(Err(e)),
2613 }
2614 }
2615 }
2616}
2617
2618impl ExecutionPlan for ExpandNode {
2619 fn execute<'a>(
2620 &'a self,
2621 ctx: &'a ExecutionContext<'a>,
2622 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2623 let input_iter = self.input.execute(ctx)?;
2624 let start_node_alias = self.start_node_alias.clone();
2625 let rel_alias = self.rel_alias.clone();
2626 let end_node_alias = self.end_node_alias.clone();
2627 let direction = self.direction.clone();
2628 let db = ctx.db;
2629
2630 let rel_predicate_id: Option<u64> = if let Some(ref rel_type) = self.rel_type {
2632 db.resolve_id(rel_type).ok().flatten()
2633 } else {
2634 None
2635 };
2636
2637 Ok(Box::new(input_iter.flat_map(
2638 move |res| -> Box<dyn Iterator<Item = Result<Record, Error>>> {
2639 match res {
2640 Ok(record) => {
2641 let start_val = record.get(&start_node_alias);
2642 let start_id = match start_val {
2643 Some(Value::Node(id)) => *id,
2644 _ => return Box::new(std::iter::empty()),
2645 };
2646
2647 let criteria = match direction {
2648 RelationshipDirection::LeftToRight => QueryCriteria {
2649 subject_id: Some(start_id),
2650 predicate_id: rel_predicate_id,
2651 object_id: None,
2652 },
2653 RelationshipDirection::RightToLeft => QueryCriteria {
2654 subject_id: None,
2655 predicate_id: rel_predicate_id,
2656 object_id: Some(start_id),
2657 },
2658 RelationshipDirection::Undirected => QueryCriteria {
2659 subject_id: Some(start_id),
2660 predicate_id: rel_predicate_id,
2661 object_id: None,
2662 },
2663 };
2664
2665 let triples = db.query(criteria);
2666 let rel_alias = rel_alias.clone();
2667 let end_node_alias = end_node_alias.clone();
2668 let record = record.clone();
2669 let direction = direction.clone();
2670
2671 Box::new(triples.map(move |triple| {
2672 let mut new_record = record.clone();
2673 new_record.insert(rel_alias.clone(), Value::Relationship(triple));
2674
2675 let end_id = if direction == RelationshipDirection::RightToLeft {
2676 triple.subject_id
2677 } else {
2678 triple.object_id
2679 };
2680 new_record.insert(end_node_alias.clone(), Value::Node(end_id));
2681
2682 Ok(new_record)
2683 }))
2684 }
2685 Err(e) => Box::new(std::iter::once(Err(e))),
2686 }
2687 },
2688 )))
2689 }
2690
2691 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2692 self.input.estimate_cardinality(ctx) * 3
2695 }
2696}
2697
2698impl ExecutionPlan for ExpandVarLengthNode {
2699 fn execute<'a>(
2700 &'a self,
2701 ctx: &'a ExecutionContext<'a>,
2702 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2703 let input_iter = self.input.execute(ctx)?;
2704 let start_node_alias = self.start_node_alias.clone();
2705 let end_node_alias = self.end_node_alias.clone();
2706 let direction = self.direction.clone();
2707 let db = ctx.db;
2708
2709 let rel_predicate_id: Option<u64> = if let Some(ref rel_type) = self.rel_type {
2710 db.resolve_id(rel_type).ok().flatten()
2711 } else {
2712 None
2713 };
2714
2715 let min_hops = self.min_hops;
2716 let max_hops = self.max_hops;
2717
2718 Ok(Box::new(input_iter.flat_map(
2719 move |res| -> Box<dyn Iterator<Item = Result<Record, Error>> + 'a> {
2720 let record = match res {
2721 Ok(record) => record,
2722 Err(e) => return Box::new(std::iter::once(Err(e))),
2723 };
2724
2725 let Some(Value::Node(start_id)) = record.get(&start_node_alias) else {
2726 return Box::new(std::iter::empty());
2727 };
2728
2729 let expansions = find_reachable_nodes(
2730 db,
2731 *start_id,
2732 direction.clone(),
2733 rel_predicate_id,
2734 min_hops,
2735 max_hops,
2736 );
2737
2738 let end_node_alias = end_node_alias.clone();
2739 Box::new(expansions.into_iter().map(move |end_id| {
2740 let mut new_record = record.clone();
2741 new_record.insert(end_node_alias.clone(), Value::Node(end_id));
2742 Ok(new_record)
2743 }))
2744 },
2745 )))
2746 }
2747
2748 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2749 let hops = usize::try_from(self.max_hops).unwrap_or(1).max(1);
2751 self.input
2752 .estimate_cardinality(ctx)
2753 .saturating_mul(3usize.saturating_pow(hops as u32))
2754 .max(1)
2755 }
2756}
2757
2758fn find_reachable_nodes(
2759 db: &Database,
2760 start_id: u64,
2761 direction: RelationshipDirection,
2762 rel_predicate_id: Option<u64>,
2763 min_hops: u32,
2764 max_hops: u32,
2765) -> Vec<u64> {
2766 let mut results = Vec::new();
2767
2768 if min_hops == 0 {
2769 results.push(start_id);
2770 }
2771
2772 let mut queue: VecDeque<(u64, u32)> = VecDeque::new();
2773 let mut visited: HashSet<(u64, u32)> = HashSet::new();
2774 queue.push_back((start_id, 0));
2775 visited.insert((start_id, 0));
2776
2777 while let Some((node_id, depth)) = queue.pop_front() {
2778 if depth >= max_hops {
2779 continue;
2780 }
2781
2782 let next_depth = depth + 1;
2783
2784 let mut neighbors = Vec::new();
2785 match direction {
2786 RelationshipDirection::LeftToRight => {
2787 let criteria = QueryCriteria {
2788 subject_id: Some(node_id),
2789 predicate_id: rel_predicate_id,
2790 object_id: None,
2791 };
2792 neighbors.extend(db.query(criteria).map(|t| t.object_id));
2793 }
2794 RelationshipDirection::RightToLeft => {
2795 let criteria = QueryCriteria {
2796 subject_id: None,
2797 predicate_id: rel_predicate_id,
2798 object_id: Some(node_id),
2799 };
2800 neighbors.extend(db.query(criteria).map(|t| t.subject_id));
2801 }
2802 RelationshipDirection::Undirected => {
2803 let out = QueryCriteria {
2804 subject_id: Some(node_id),
2805 predicate_id: rel_predicate_id,
2806 object_id: None,
2807 };
2808 neighbors.extend(db.query(out).map(|t| t.object_id));
2809
2810 let inc = QueryCriteria {
2811 subject_id: None,
2812 predicate_id: rel_predicate_id,
2813 object_id: Some(node_id),
2814 };
2815 neighbors.extend(db.query(inc).map(|t| t.subject_id));
2816 }
2817 }
2818
2819 for neighbor in neighbors {
2820 if visited.insert((neighbor, next_depth)) {
2821 if next_depth >= min_hops {
2822 results.push(neighbor);
2823 }
2824 queue.push_back((neighbor, next_depth));
2825 }
2826 }
2827 }
2828
2829 results
2830}
2831
2832impl ExecutionPlan for LimitNode {
2837 fn execute<'a>(
2838 &'a self,
2839 ctx: &'a ExecutionContext<'a>,
2840 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2841 let limit = usize::try_from(self.limit).unwrap_or(usize::MAX);
2842 let inner = self.input.execute(ctx)?;
2843
2844 struct LimitIter<I> {
2845 inner: I,
2846 remaining: usize,
2847 }
2848
2849 impl<I> Iterator for LimitIter<I>
2850 where
2851 I: Iterator<Item = Result<Record, Error>>,
2852 {
2853 type Item = Result<Record, Error>;
2854
2855 fn next(&mut self) -> Option<Self::Item> {
2856 if self.remaining == 0 {
2857 return None;
2858 }
2859 match self.inner.next()? {
2860 Ok(v) => {
2861 self.remaining -= 1;
2862 Some(Ok(v))
2863 }
2864 Err(e) => Some(Err(e)),
2865 }
2866 }
2867 }
2868
2869 Ok(Box::new(LimitIter {
2870 inner,
2871 remaining: limit,
2872 }))
2873 }
2874
2875 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2876 let inner = self.input.estimate_cardinality(ctx);
2877 inner.min(self.limit as usize)
2878 }
2879}
2880
2881impl ExecutionPlan for SkipNode {
2886 fn execute<'a>(
2887 &'a self,
2888 ctx: &'a ExecutionContext<'a>,
2889 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2890 let skip = usize::try_from(self.skip).unwrap_or(0);
2891 let inner = self.input.execute(ctx)?;
2892 Ok(Box::new(inner.skip(skip)))
2893 }
2894
2895 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2896 let inner = self.input.estimate_cardinality(ctx);
2897 inner.saturating_sub(self.skip as usize)
2898 }
2899}
2900
2901impl ExecutionPlan for SortNode {
2906 fn execute<'a>(
2907 &'a self,
2908 ctx: &'a ExecutionContext<'a>,
2909 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2910 let mut records: Vec<Record> = self.input.execute(ctx)?.filter_map(|r| r.ok()).collect();
2912 let order_by = &self.order_by;
2913
2914 records.sort_by(|a, b| {
2915 for (expr, direction) in order_by {
2916 let val_a = evaluate_expression_value(expr, a, ctx);
2917 let val_b = evaluate_expression_value(expr, b, ctx);
2918 let cmp = compare_values_for_sort(&val_a, &val_b, direction);
2919 if cmp != std::cmp::Ordering::Equal {
2920 return cmp;
2921 }
2922 }
2923 std::cmp::Ordering::Equal
2924 });
2925
2926 Ok(Box::new(records.into_iter().map(Ok)))
2927 }
2928
2929 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
2930 self.input.estimate_cardinality(ctx)
2931 }
2932}
2933
2934impl ExecutionPlan for AggregateNode {
2939 fn execute<'a>(
2940 &'a self,
2941 ctx: &'a ExecutionContext<'a>,
2942 ) -> Result<Box<dyn Iterator<Item = Result<Record, Error>> + 'a>, Error> {
2943 let records: Vec<Record> = self.input.execute(ctx)?.filter_map(|r| r.ok()).collect();
2945
2946 if self.group_by.is_empty() {
2948 let mut result = Record::new();
2949
2950 for (agg_func, alias) in &self.aggregations {
2951 let value = compute_aggregate(agg_func, &records, ctx);
2952 result.insert(alias.clone(), value);
2953 }
2954
2955 return Ok(Box::new(std::iter::once(Ok(result))));
2956 }
2957
2958 let mut groups: HashMap<String, Vec<Record>> = HashMap::new();
2961
2962 for record in records {
2963 let key = self
2965 .group_by
2966 .iter()
2967 .map(|expr| format!("{:?}", evaluate_expression_value(expr, &record, ctx)))
2968 .collect::<Vec<_>>()
2969 .join("|");
2970 groups.entry(key).or_default().push(record);
2971 }
2972
2973 let results: Vec<Record> = groups
2974 .into_values()
2975 .map(|group_records| {
2976 let mut result = Record::new();
2977
2978 if let Some(first) = group_records.first() {
2980 for expr in &self.group_by {
2981 if let Expression::Variable(name) = expr
2982 && let Some(val) = first.get(name)
2983 {
2984 result.insert(name.clone(), val.clone());
2985 }
2986 }
2987 }
2988
2989 for (agg_func, alias) in &self.aggregations {
2991 let value = compute_aggregate(agg_func, &group_records, ctx);
2992 result.insert(alias.clone(), value);
2993 }
2994
2995 result
2996 })
2997 .collect();
2998
2999 Ok(Box::new(results.into_iter().map(Ok)))
3000 }
3001
3002 fn estimate_cardinality(&self, ctx: &ExecutionContext) -> usize {
3003 if self.group_by.is_empty() {
3004 1 } else {
3006 (self.input.estimate_cardinality(ctx) / 10).max(1)
3008 }
3009 }
3010}
3011
3012fn compute_aggregate(
3014 func: &AggregateFunction,
3015 records: &[Record],
3016 ctx: &ExecutionContext,
3017) -> Value {
3018 match func {
3019 AggregateFunction::Count(expr) => {
3020 let count = if let Some(e) = expr {
3021 records
3022 .iter()
3023 .filter(|r| !matches!(evaluate_expression_value(e, r, ctx), Value::Null))
3024 .count()
3025 } else {
3026 records.len() };
3028 Value::Float(count as f64)
3029 }
3030 AggregateFunction::Sum(expr) => {
3031 let sum: f64 = records
3032 .iter()
3033 .filter_map(|r| {
3034 if let Value::Float(f) = evaluate_expression_value(expr, r, ctx) {
3035 Some(f)
3036 } else {
3037 None
3038 }
3039 })
3040 .sum();
3041 Value::Float(sum)
3042 }
3043 AggregateFunction::Avg(expr) => {
3044 let values: Vec<f64> = records
3045 .iter()
3046 .filter_map(|r| {
3047 if let Value::Float(f) = evaluate_expression_value(expr, r, ctx) {
3048 Some(f)
3049 } else {
3050 None
3051 }
3052 })
3053 .collect();
3054 if values.is_empty() {
3055 Value::Null
3056 } else {
3057 Value::Float(values.iter().sum::<f64>() / values.len() as f64)
3058 }
3059 }
3060 AggregateFunction::Min(expr) => records
3061 .iter()
3062 .map(|r| evaluate_expression_value(expr, r, ctx))
3063 .filter(|v| !matches!(v, Value::Null))
3064 .min_by(compare_values)
3065 .unwrap_or(Value::Null),
3066 AggregateFunction::Max(expr) => records
3067 .iter()
3068 .map(|r| evaluate_expression_value(expr, r, ctx))
3069 .filter(|v| !matches!(v, Value::Null))
3070 .max_by(compare_values)
3071 .unwrap_or(Value::Null),
3072 AggregateFunction::Collect(expr) => {
3073 let values: Vec<String> = records
3075 .iter()
3076 .map(|r| format!("{:?}", evaluate_expression_value(expr, r, ctx)))
3077 .collect();
3078 Value::String(format!("[{}]", values.join(", ")))
3079 }
3080 }
3081}
3082
3083fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
3085 match (a, b) {
3086 (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal),
3087 (Value::String(x), Value::String(y)) => x.cmp(y),
3088 (Value::Boolean(x), Value::Boolean(y)) => x.cmp(y),
3089 (Value::Node(x), Value::Node(y)) => x.cmp(y),
3090 (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
3091 (Value::Null, _) => std::cmp::Ordering::Greater, (_, Value::Null) => std::cmp::Ordering::Less,
3093 _ => std::cmp::Ordering::Equal,
3094 }
3095}
3096
3097fn compare_values_for_sort(a: &Value, b: &Value, direction: &Direction) -> std::cmp::Ordering {
3098 match (a, b) {
3100 (Value::Null, Value::Null) => std::cmp::Ordering::Equal,
3101 (Value::Null, _) => std::cmp::Ordering::Greater,
3102 (_, Value::Null) => std::cmp::Ordering::Less,
3103 _ => {
3104 let cmp = compare_values(a, b);
3105 match direction {
3106 Direction::Ascending => cmp,
3107 Direction::Descending => cmp.reverse(),
3108 }
3109 }
3110 }
3111}
3112
3113fn evaluate_expression(expr: &Expression, record: &Record, ctx: &ExecutionContext) -> bool {
3118 match evaluate_expression_value(expr, record, ctx) {
3119 Value::Boolean(b) => b,
3120 _ => false,
3121 }
3122}
3123
3124pub fn evaluate_expression_value(
3125 expr: &Expression,
3126 record: &Record,
3127 ctx: &ExecutionContext,
3128) -> Value {
3129 match expr {
3130 Expression::Literal(l) => match l {
3131 Literal::String(s) => Value::String(s.clone()),
3132 Literal::Float(f) => Value::Float(*f),
3133 Literal::Integer(i) => Value::Float(*i as f64),
3134 Literal::Boolean(b) => Value::Boolean(*b),
3135 Literal::Null => Value::Null,
3136 },
3137 Expression::Variable(name) => record.get(name).cloned().unwrap_or(Value::Null),
3138 Expression::Parameter(name) => ctx.params.get(name).cloned().unwrap_or(Value::Null),
3139 Expression::PropertyAccess(pa) => {
3140 if let Some(Value::Node(node_id)) = record.get(&pa.variable)
3141 && let Ok(Some(binary)) = ctx.db.get_node_property_binary(*node_id)
3142 && let Ok(props) = crate::storage::property::deserialize_properties(&binary)
3143 && let Some(value) = props.get(&pa.property)
3144 {
3145 return match value {
3146 serde_json::Value::String(s) => Value::String(s.clone()),
3147 serde_json::Value::Number(n) => Value::Float(n.as_f64().unwrap_or(0.0)),
3148 serde_json::Value::Bool(b) => Value::Boolean(*b),
3149 serde_json::Value::Null => Value::Null,
3150 serde_json::Value::Array(items) => {
3151 let mut out = Vec::with_capacity(items.len());
3152 for item in items {
3153 let Some(n) = item.as_f64() else {
3154 return Value::String(
3155 serde_json::Value::Array(items.clone()).to_string(),
3156 );
3157 };
3158 out.push(n as f32);
3159 }
3160 Value::Vector(out)
3161 }
3162 _ => Value::Null,
3163 };
3164 }
3165 Value::Null
3166 }
3167 Expression::Binary(b) => {
3168 let left = evaluate_expression_value(&b.left, record, ctx);
3169 let right = evaluate_expression_value(&b.right, record, ctx);
3170
3171 match b.operator {
3172 BinaryOperator::Equal => Value::Boolean(left == right),
3173 BinaryOperator::NotEqual => Value::Boolean(left != right),
3174 BinaryOperator::And => match (left, right) {
3175 (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l && r),
3176 _ => Value::Null,
3177 },
3178 BinaryOperator::Or => match (left, right) {
3179 (Value::Boolean(l), Value::Boolean(r)) => Value::Boolean(l || r),
3180 _ => Value::Null,
3181 },
3182 BinaryOperator::LessThan => match (left, right) {
3183 (Value::Float(l), Value::Float(r)) => Value::Boolean(l < r),
3184 _ => Value::Null,
3185 },
3186 BinaryOperator::LessThanOrEqual => match (left, right) {
3187 (Value::Float(l), Value::Float(r)) => Value::Boolean(l <= r),
3188 _ => Value::Null,
3189 },
3190 BinaryOperator::GreaterThan => match (left, right) {
3191 (Value::Float(l), Value::Float(r)) => Value::Boolean(l > r),
3192 _ => Value::Null,
3193 },
3194 BinaryOperator::GreaterThanOrEqual => match (left, right) {
3195 (Value::Float(l), Value::Float(r)) => Value::Boolean(l >= r),
3196 _ => Value::Null,
3197 },
3198 BinaryOperator::Add => match (left, right) {
3199 (Value::Float(l), Value::Float(r)) => Value::Float(l + r),
3200 (Value::String(l), Value::String(r)) => Value::String(format!("{}{}", l, r)),
3201 _ => Value::Null,
3202 },
3203 BinaryOperator::Subtract => match (left, right) {
3204 (Value::Float(l), Value::Float(r)) => Value::Float(l - r),
3205 _ => Value::Null,
3206 },
3207 BinaryOperator::Multiply => match (left, right) {
3208 (Value::Float(l), Value::Float(r)) => Value::Float(l * r),
3209 _ => Value::Null,
3210 },
3211 BinaryOperator::Divide => match (left, right) {
3212 (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l / r),
3213 _ => Value::Null,
3214 },
3215 BinaryOperator::Modulo => match (left, right) {
3216 (Value::Float(l), Value::Float(r)) if r != 0.0 => Value::Float(l % r),
3217 _ => Value::Null,
3218 },
3219 BinaryOperator::In => value_in_list(&left, &right),
3220 BinaryOperator::NotIn => match value_in_list(&left, &right) {
3221 Value::Boolean(b) => Value::Boolean(!b),
3222 other => other,
3223 },
3224 BinaryOperator::StartsWith => match (left, right) {
3225 (Value::String(l), Value::String(r)) => Value::Boolean(l.starts_with(&r)),
3226 _ => Value::Null,
3227 },
3228 BinaryOperator::EndsWith => match (left, right) {
3229 (Value::String(l), Value::String(r)) => Value::Boolean(l.ends_with(&r)),
3230 _ => Value::Null,
3231 },
3232 BinaryOperator::Contains => match (left, right) {
3233 (Value::String(l), Value::String(r)) => Value::Boolean(l.contains(&r)),
3234 _ => Value::Null,
3235 },
3236 _ => Value::Null,
3237 }
3238 }
3239 Expression::Unary(u) => {
3240 let arg = evaluate_expression_value(&u.argument, record, ctx);
3241 match u.operator {
3242 crate::query::ast::UnaryOperator::Not => match arg {
3243 Value::Boolean(b) => Value::Boolean(!b),
3244 _ => Value::Null,
3245 },
3246 crate::query::ast::UnaryOperator::Negate => match arg {
3247 Value::Float(f) => Value::Float(-f),
3248 _ => Value::Null,
3249 },
3250 }
3251 }
3252 Expression::Case(case_expr) => {
3253 for alt in &case_expr.alternatives {
3254 if evaluate_expression(&alt.when, record, ctx) {
3255 return evaluate_expression_value(&alt.then, record, ctx);
3256 }
3257 }
3258 match &case_expr.else_expression {
3259 Some(expr) => evaluate_expression_value(expr, record, ctx),
3260 None => Value::Null,
3261 }
3262 }
3263 Expression::Exists(exists_expr) => {
3264 Value::Boolean(evaluate_exists(exists_expr.as_ref(), record, ctx))
3265 }
3266 Expression::List(elements) => list_literal_value(elements, record, ctx),
3267 Expression::ListComprehension(comp) => list_comprehension_value(comp.as_ref(), record, ctx),
3268 Expression::FunctionCall(func) => match func.name.to_lowercase().as_str() {
3269 "vec_similarity" => {
3270 let Some(left) = func
3271 .arguments
3272 .first()
3273 .map(|arg| evaluate_expression_value(arg, record, ctx))
3274 else {
3275 return Value::Null;
3276 };
3277 let Some(right) = func
3278 .arguments
3279 .get(1)
3280 .map(|arg| evaluate_expression_value(arg, record, ctx))
3281 else {
3282 return Value::Null;
3283 };
3284 let Some(left_vec) = value_to_vector(&left) else {
3285 return Value::Null;
3286 };
3287 let Some(right_vec) = value_to_vector(&right) else {
3288 return Value::Null;
3289 };
3290 let Some(sim) = cosine_similarity(&left_vec, &right_vec) else {
3291 return Value::Null;
3292 };
3293 Value::Float(sim as f64)
3294 }
3295 "txt_score" => {
3296 #[cfg(all(feature = "fts", not(target_arch = "wasm32")))]
3297 {
3298 let Some(Expression::PropertyAccess(pa)) = func.arguments.first() else {
3299 return Value::Null;
3300 };
3301 let Some(Value::Node(node_id)) = record.get(&pa.variable) else {
3302 return Value::Null;
3303 };
3304 let Some(query_expr) = func.arguments.get(1) else {
3305 return Value::Null;
3306 };
3307 let Value::String(query) = evaluate_expression_value(query_expr, record, ctx)
3308 else {
3309 return Value::Null;
3310 };
3311 Value::Float(ctx.db.fts_txt_score(*node_id, &pa.property, &query))
3312 }
3313 #[cfg(not(all(feature = "fts", not(target_arch = "wasm32"))))]
3314 {
3315 Value::Float(0.0)
3316 }
3317 }
3318 "id" => match func
3319 .arguments
3320 .first()
3321 .map(|arg| evaluate_expression_value(arg, record, ctx))
3322 {
3323 Some(Value::Node(id)) => Value::Float(id as f64),
3324 _ => Value::Null,
3325 },
3326 "type" => match func
3327 .arguments
3328 .first()
3329 .map(|arg| evaluate_expression_value(arg, record, ctx))
3330 {
3331 Some(Value::Relationship(triple)) => relationship_type_value(ctx.db, &triple),
3332 _ => Value::Null,
3333 },
3334 "labels" => match func
3335 .arguments
3336 .first()
3337 .map(|arg| evaluate_expression_value(arg, record, ctx))
3338 {
3339 Some(Value::Node(id)) => node_labels_value(ctx.db, id),
3340 _ => Value::Null,
3341 },
3342 "keys" => match func
3343 .arguments
3344 .first()
3345 .map(|arg| evaluate_expression_value(arg, record, ctx))
3346 {
3347 Some(Value::Node(id)) => node_property_keys_value(ctx.db, id),
3348 Some(Value::Relationship(triple)) => edge_property_keys_value(ctx.db, &triple),
3349 _ => Value::Null,
3350 },
3351 "size" => match func
3352 .arguments
3353 .first()
3354 .map(|arg| evaluate_expression_value(arg, record, ctx))
3355 {
3356 Some(Value::String(s)) => Value::Float(s.len() as f64),
3357 _ => Value::Null,
3358 },
3359 "toupper" => match func
3360 .arguments
3361 .first()
3362 .map(|arg| evaluate_expression_value(arg, record, ctx))
3363 {
3364 Some(Value::String(s)) => Value::String(s.to_uppercase()),
3365 _ => Value::Null,
3366 },
3367 "tolower" => match func
3368 .arguments
3369 .first()
3370 .map(|arg| evaluate_expression_value(arg, record, ctx))
3371 {
3372 Some(Value::String(s)) => Value::String(s.to_lowercase()),
3373 _ => Value::Null,
3374 },
3375 "trim" => match func
3376 .arguments
3377 .first()
3378 .map(|arg| evaluate_expression_value(arg, record, ctx))
3379 {
3380 Some(Value::String(s)) => Value::String(s.trim().to_string()),
3381 _ => Value::Null,
3382 },
3383 "coalesce" => {
3384 for arg in &func.arguments {
3385 let v = evaluate_expression_value(arg, record, ctx);
3386 if !matches!(v, Value::Null) {
3387 return v;
3388 }
3389 }
3390 Value::Null
3391 }
3392 _ => Value::Null,
3393 },
3394 _ => Value::Null,
3395 }
3396}
3397
3398fn list_literal_value(elements: &[Expression], record: &Record, ctx: &ExecutionContext) -> Value {
3399 let json = serde_json::Value::Array(
3400 elements
3401 .iter()
3402 .map(|e| executor_value_to_json(&evaluate_expression_value(e, record, ctx)))
3403 .collect(),
3404 );
3405 Value::String(json.to_string())
3406}
3407
3408fn list_comprehension_value(
3409 comp: &ListComprehension,
3410 record: &Record,
3411 ctx: &ExecutionContext,
3412) -> Value {
3413 let Some(items) = evaluate_list_source(&comp.list, record, ctx) else {
3414 return Value::Null;
3415 };
3416
3417 let mut out = Vec::new();
3418 for item in items {
3419 let mut scoped = record.clone();
3420 scoped.insert(comp.variable.clone(), item);
3421
3422 if let Some(where_expr) = &comp.where_expression
3423 && !evaluate_expression(where_expr, &scoped, ctx)
3424 {
3425 continue;
3426 }
3427
3428 let mapped = match &comp.map_expression {
3429 Some(expr) => evaluate_expression_value(expr, &scoped, ctx),
3430 None => scoped.get(&comp.variable).cloned().unwrap_or(Value::Null),
3431 };
3432 out.push(executor_value_to_json(&mapped));
3433 }
3434
3435 Value::String(serde_json::Value::Array(out).to_string())
3436}
3437
3438fn evaluate_list_source(
3439 expr: &Expression,
3440 record: &Record,
3441 ctx: &ExecutionContext,
3442) -> Option<Vec<Value>> {
3443 match expr {
3444 Expression::List(elements) => Some(
3445 elements
3446 .iter()
3447 .map(|e| evaluate_expression_value(e, record, ctx))
3448 .collect(),
3449 ),
3450 _ => match evaluate_expression_value(expr, record, ctx) {
3451 Value::String(s) => parse_executor_list_string(&s),
3452 _ => None,
3453 },
3454 }
3455}
3456fn evaluate_exists(
3457 exists_expr: &ExistsExpression,
3458 record: &Record,
3459 ctx: &ExecutionContext,
3460) -> bool {
3461 match exists_expr {
3462 ExistsExpression::Pattern(pattern) => exists_match_pattern(pattern, None, record, ctx),
3463 ExistsExpression::Subquery(query) => {
3464 let (pattern, where_expr) = match extract_exists_match_query(query) {
3465 Some(v) => v,
3466 None => return false,
3467 };
3468 exists_match_pattern(pattern, where_expr, record, ctx)
3469 }
3470 }
3471}
3472
3473fn exists_match_pattern(
3474 pattern: &Pattern,
3475 where_expr: Option<&Expression>,
3476 outer_record: &Record,
3477 ctx: &ExecutionContext,
3478) -> bool {
3479 let Some(PathElement::Node(start_node)) = pattern.elements.first() else {
3480 return false;
3481 };
3482
3483 if let Some(var) = &start_node.variable
3484 && let Some(Value::Node(start_id)) = outer_record.get(var)
3485 {
3486 if !node_satisfies(*start_id, start_node, outer_record, ctx) {
3487 return false;
3488 }
3489 return exists_path_from_node(pattern, 0, *start_id, outer_record, where_expr, ctx);
3490 }
3491
3492 exists_uncorrelated_match(pattern, where_expr, ctx)
3493}
3494
3495fn exists_uncorrelated_match(
3496 pattern: &Pattern,
3497 where_expr: Option<&Expression>,
3498 ctx: &ExecutionContext,
3499) -> bool {
3500 use crate::query::ast::{MatchClause, Query, ReturnClause, ReturnItem, WhereClause};
3501 use crate::query::planner::QueryPlanner;
3502
3503 let mut clauses: Vec<Clause> = Vec::new();
3504 clauses.push(Clause::Match(MatchClause {
3505 optional: false,
3506 pattern: pattern.clone(),
3507 }));
3508 if let Some(expr) = where_expr.cloned() {
3509 clauses.push(Clause::Where(WhereClause { expression: expr }));
3510 }
3511 clauses.push(Clause::Return(ReturnClause {
3512 distinct: false,
3513 items: vec![ReturnItem {
3514 expression: Expression::Literal(Literal::Integer(1)),
3515 alias: Some("_exists".to_string()),
3516 }],
3517 order_by: None,
3518 limit: Some(1),
3519 skip: None,
3520 }));
3521
3522 let planner = QueryPlanner::new();
3523 let plan = match planner.plan(Query { clauses }) {
3524 Ok(plan) => plan,
3525 Err(_) => return false,
3526 };
3527
3528 match plan.execute(ctx) {
3529 Ok(mut iter) => iter.next().is_some(),
3530 Err(_) => false,
3531 }
3532}
3533
3534fn exists_path_from_node(
3535 pattern: &Pattern,
3536 node_index: usize,
3537 current_node_id: u64,
3538 bindings: &Record,
3539 where_expr: Option<&Expression>,
3540 ctx: &ExecutionContext,
3541) -> bool {
3542 let next_rel_index = node_index + 1;
3543 let next_node_index = node_index + 2;
3544
3545 if next_node_index >= pattern.elements.len() {
3546 return where_expr.is_none_or(|expr| evaluate_expression(expr, bindings, ctx));
3547 }
3548
3549 let PathElement::Relationship(rel) = &pattern.elements[next_rel_index] else {
3550 return false;
3551 };
3552 let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
3553 return false;
3554 };
3555
3556 if rel.variable_length.is_some() {
3557 return exists_var_length_step(
3558 pattern,
3559 next_node_index,
3560 current_node_id,
3561 rel,
3562 bindings,
3563 where_expr,
3564 ctx,
3565 );
3566 }
3567
3568 for (triple, end_id) in iter_matching_edges(ctx.db, current_node_id, rel) {
3569 let mut new_record = bindings.clone();
3570
3571 if let Some(rel_var) = &rel.variable {
3572 match new_record.get(rel_var) {
3573 Some(Value::Relationship(existing)) if existing == &triple => {}
3574 Some(_) => continue,
3575 None => new_record.insert(rel_var.clone(), Value::Relationship(triple)),
3576 }
3577 }
3578
3579 if let Some(props) = &rel.properties
3580 && !edge_satisfies(&triple, props, &new_record, ctx)
3581 {
3582 continue;
3583 }
3584
3585 if !node_satisfies(end_id, next_node, &new_record, ctx) {
3586 continue;
3587 }
3588
3589 if let Some(node_var) = &next_node.variable {
3590 match new_record.get(node_var) {
3591 Some(Value::Node(existing)) if *existing == end_id => {}
3592 Some(_) => continue,
3593 None => new_record.insert(node_var.clone(), Value::Node(end_id)),
3594 }
3595 }
3596
3597 if exists_path_from_node(
3598 pattern,
3599 next_node_index,
3600 end_id,
3601 &new_record,
3602 where_expr,
3603 ctx,
3604 ) {
3605 return true;
3606 }
3607 }
3608
3609 false
3610}
3611
3612fn exists_var_length_step(
3613 pattern: &Pattern,
3614 next_node_index: usize,
3615 current_node_id: u64,
3616 rel: &RelationshipPattern,
3617 bindings: &Record,
3618 where_expr: Option<&Expression>,
3619 ctx: &ExecutionContext,
3620) -> bool {
3621 let PathElement::Node(next_node) = &pattern.elements[next_node_index] else {
3622 return false;
3623 };
3624 if rel.variable.is_some() || rel.properties.is_some() {
3625 return false;
3626 }
3627 let Some(var_len) = &rel.variable_length else {
3628 return false;
3629 };
3630 let min_hops = var_len.min.unwrap_or(1);
3631 let Some(max_hops) = var_len.max else {
3632 return false;
3633 };
3634
3635 if rel.types.len() > 1 {
3636 return false;
3637 }
3638
3639 let rel_predicate_id = rel
3640 .types
3641 .first()
3642 .and_then(|t| ctx.db.resolve_id(t).ok().flatten());
3643
3644 let reachable = find_reachable_nodes(
3645 ctx.db,
3646 current_node_id,
3647 rel.direction.clone(),
3648 rel_predicate_id,
3649 min_hops,
3650 max_hops,
3651 );
3652
3653 for end_id in reachable {
3654 let mut new_record = bindings.clone();
3655
3656 if !node_satisfies(end_id, next_node, &new_record, ctx) {
3657 continue;
3658 }
3659
3660 if let Some(node_var) = &next_node.variable {
3661 match new_record.get(node_var) {
3662 Some(Value::Node(existing)) if *existing == end_id => {}
3663 Some(_) => continue,
3664 None => new_record.insert(node_var.clone(), Value::Node(end_id)),
3665 }
3666 }
3667
3668 if exists_path_from_node(
3669 pattern,
3670 next_node_index,
3671 end_id,
3672 &new_record,
3673 where_expr,
3674 ctx,
3675 ) {
3676 return true;
3677 }
3678 }
3679
3680 false
3681}
3682
3683fn node_satisfies(
3684 node_id: u64,
3685 node: &crate::query::ast::NodePattern,
3686 bindings: &Record,
3687 ctx: &ExecutionContext,
3688) -> bool {
3689 if let Some(var) = &node.variable
3690 && let Some(Value::Node(bound)) = bindings.get(var)
3691 && *bound != node_id
3692 {
3693 return false;
3694 }
3695
3696 if !node.labels.is_empty() {
3697 let Some(type_id) = ctx.db.resolve_id("type").ok().flatten() else {
3698 return false;
3699 };
3700 for label in &node.labels {
3701 let Some(label_id) = ctx.db.resolve_id(label).ok().flatten() else {
3702 return false;
3703 };
3704 let criteria = QueryCriteria {
3705 subject_id: Some(node_id),
3706 predicate_id: Some(type_id),
3707 object_id: Some(label_id),
3708 };
3709 if ctx.db.query(criteria).next().is_none() {
3710 return false;
3711 }
3712 }
3713 }
3714
3715 if let Some(props) = &node.properties
3716 && !node_properties_match(node_id, props, bindings, ctx)
3717 {
3718 return false;
3719 }
3720
3721 true
3722}
3723
3724fn node_properties_match(
3725 node_id: u64,
3726 props: &PropertyMap,
3727 bindings: &Record,
3728 ctx: &ExecutionContext,
3729) -> bool {
3730 if props.properties.is_empty() {
3731 return true;
3732 }
3733 let Ok(Some(binary)) = ctx.db.get_node_property_binary(node_id) else {
3734 return false;
3735 };
3736 let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
3737 return false;
3738 };
3739
3740 for pair in &props.properties {
3741 let expected = evaluate_expression_value(&pair.value, bindings, ctx);
3742 let Some(actual) = stored.get(&pair.key) else {
3743 return false;
3744 };
3745 if !json_value_matches_executor_value(actual, &expected) {
3746 return false;
3747 }
3748 }
3749
3750 true
3751}
3752
3753fn edge_satisfies(
3754 triple: &Triple,
3755 props: &PropertyMap,
3756 bindings: &Record,
3757 ctx: &ExecutionContext,
3758) -> bool {
3759 if props.properties.is_empty() {
3760 return true;
3761 }
3762 let Ok(Some(binary)) =
3763 ctx.db
3764 .get_edge_property_binary(triple.subject_id, triple.predicate_id, triple.object_id)
3765 else {
3766 return false;
3767 };
3768 let Ok(stored) = crate::storage::property::deserialize_properties(&binary) else {
3769 return false;
3770 };
3771
3772 for pair in &props.properties {
3773 let expected = evaluate_expression_value(&pair.value, bindings, ctx);
3774 let Some(actual) = stored.get(&pair.key) else {
3775 return false;
3776 };
3777 if !json_value_matches_executor_value(actual, &expected) {
3778 return false;
3779 }
3780 }
3781
3782 true
3783}
3784
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Database;
    use tempfile::tempdir;

    /// A scan without label filters over a small graph yields at least three
    /// rows, none of which is an error.
    #[test]
    fn test_optimized_scan_empty_labels() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.nervus");
        let mut db = Database::open(crate::Options::new(&db_path)).unwrap();

        for (subject, predicate, object) in
            [("alice", "knows", "bob"), ("bob", "knows", "charlie")]
        {
            db.add_fact(crate::Fact::new(subject, predicate, object))
                .unwrap();
        }

        let params = HashMap::new();
        let ctx = ExecutionContext {
            db: &db,
            params: &params,
        };

        let scan = ScanNode {
            alias: String::from("n"),
            labels: Vec::new(),
        };
        let rows: Vec<_> = scan.execute(&ctx).unwrap().collect();

        assert!(rows.len() >= 3);
        assert!(rows.iter().all(Result::is_ok));
    }

    /// The cardinality estimate of a label-restricted scan is positive and
    /// never exceeds the estimate of an unrestricted scan.
    #[test]
    fn test_cardinality_estimation() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.nervus");
        let mut db = Database::open(crate::Options::new(&db_path)).unwrap();

        for (subject, label) in [("alice", "Person"), ("bob", "Person"), ("charlie", "Robot")] {
            db.add_fact(crate::Fact::new(subject, "type", label))
                .unwrap();
        }

        let params = HashMap::new();
        let ctx = ExecutionContext {
            db: &db,
            params: &params,
        };

        let unrestricted = ScanNode {
            alias: String::from("n"),
            labels: Vec::new(),
        };
        let total = unrestricted.estimate_cardinality(&ctx);
        assert!(total > 0);

        let persons_only = ScanNode {
            alias: String::from("p"),
            labels: vec![String::from("Person")],
        };
        let persons = persons_only.estimate_cardinality(&ctx);
        assert!(persons > 0);

        println!("card_all = {}, card_person = {}", total, persons);
        assert!(persons <= total);
    }
}