1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::Catalog;
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18 Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
/// Cache of out-degrees keyed by node slot.
///
/// Populated by `DegreeCache::build` and queried through
/// `DegreeCache::out_degree`.
#[derive(Debug, Default)]
pub struct DegreeCache {
    /// Out-degree per slot. Slots that never received an edge have no entry
    /// and are reported as degree `0` by `out_degree`.
    inner: HashMap<u64, u32>,
}
51
52impl DegreeCache {
53 pub fn out_degree(&self, slot: u64) -> u32 {
57 self.inner.get(&slot).copied().unwrap_or(0)
58 }
59
60 fn increment(&mut self, slot: u64) {
62 *self.inner.entry(slot).or_insert(0) += 1;
63 }
64
65 fn build(csrs: &HashMap<u32, CsrForward>, delta: &[DeltaRecord]) -> Self {
70 let mut cache = DegreeCache::default();
71
72 for csr in csrs.values() {
75 for slot in 0..csr.n_nodes() {
76 let deg = csr.neighbors(slot).len() as u32;
77 if deg > 0 {
78 *cache.inner.entry(slot).or_insert(0) += deg;
79 }
80 }
81 }
82
83 for rec in delta {
86 let src_slot = rec.src.0 & 0xFFFF_FFFF;
87 cache.increment(src_slot);
88 }
89
90 cache
91 }
92}
93
/// Outcome of resolving a relationship type to a relationship table
/// (see `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// No relationship type was given (empty string): consider every table.
    All,
    /// The catalog returned this relationship table id.
    Found(u32),
    /// A relationship type was given but the catalog lookup yielded no table.
    NotFound,
}
109
/// Query execution engine: owns the stores, catalog and caches that the
/// `execute*` and `scan_*` methods operate on.
pub struct Engine {
    /// Node storage used for property reads, high-water-mark lookups and node creation.
    pub store: NodeStore,
    /// Catalog used to resolve labels and relationship tables.
    pub catalog: Catalog,
    /// Forward CSR adjacency, keyed by relationship table id.
    pub csrs: HashMap<u32, CsrForward>,
    /// Database root directory; edge stores, fulltext indexes and the `wal`
    /// directory are opened relative to it.
    pub db_root: std::path::PathBuf,
    /// Query parameters supplied through `with_params`.
    pub params: HashMap<String, Value>,
    /// Property index, built by CREATE INDEX / CREATE CONSTRAINT and consulted
    /// for unique-constraint checks. Wrapped in a `RefCell` for interior mutability.
    pub prop_index: std::cell::RefCell<PropertyIndex>,
    /// Text index. NOTE(review): not used by the methods visible in this part
    /// of the file; confirm its role against the rest of the engine.
    pub text_index: std::cell::RefCell<TextIndex>,
    /// Optional point in time after which `check_deadline` reports `QueryTimeout`.
    pub deadline: Option<std::time::Instant>,
    /// Out-degree cache built once in `Engine::new`.
    pub degree_cache: DegreeCache,
    /// Registered unique constraints as `(label_id, col_id)` pairs.
    pub unique_constraints: HashSet<(u32, u32)>,
}
158
159impl Engine {
160 pub fn new(
166 store: NodeStore,
167 catalog: Catalog,
168 csrs: HashMap<u32, CsrForward>,
169 db_root: &Path,
170 ) -> Self {
171 let delta_all: Vec<DeltaRecord> = {
186 let ids = catalog.list_rel_table_ids();
187 if ids.is_empty() {
188 EdgeStore::open(db_root, RelTableId(0))
189 .and_then(|s| s.read_delta())
190 .unwrap_or_default()
191 } else {
192 ids.into_iter()
193 .flat_map(|(id, _, _, _)| {
194 EdgeStore::open(db_root, RelTableId(id as u32))
195 .and_then(|s| s.read_delta())
196 .unwrap_or_default()
197 })
198 .collect()
199 }
200 };
201 let degree_cache = DegreeCache::build(&csrs, &delta_all);
202
203 Engine {
204 store,
205 catalog,
206 csrs,
207 db_root: db_root.to_path_buf(),
208 params: HashMap::new(),
209 prop_index: std::cell::RefCell::new(PropertyIndex::new()),
210 text_index: std::cell::RefCell::new(TextIndex::new()),
211 deadline: None,
212 degree_cache,
213 unique_constraints: HashSet::new(),
214 }
215 }
216
217 pub fn with_single_csr(
223 store: NodeStore,
224 catalog: Catalog,
225 csr: CsrForward,
226 db_root: &Path,
227 ) -> Self {
228 let mut csrs = HashMap::new();
229 csrs.insert(0u32, csr);
230 Self::new(store, catalog, csrs, db_root)
231 }
232
233 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
238 self.params = params;
239 self
240 }
241
242 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
247 self.deadline = Some(deadline);
248 self
249 }
250
251 #[inline]
257 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
258 if let Some(dl) = self.deadline {
259 if std::time::Instant::now() >= dl {
260 return Err(sparrowdb_common::Error::QueryTimeout);
261 }
262 }
263 Ok(())
264 }
265
266 fn resolve_rel_table_id(
275 &self,
276 src_label_id: u32,
277 dst_label_id: u32,
278 rel_type: &str,
279 ) -> RelTableLookup {
280 if rel_type.is_empty() {
281 return RelTableLookup::All;
282 }
283 match self
284 .catalog
285 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
286 .ok()
287 .flatten()
288 {
289 Some(id) => RelTableLookup::Found(id as u32),
290 None => RelTableLookup::NotFound,
291 }
292 }
293
294 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
299 EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
300 .and_then(|s| s.read_delta())
301 .unwrap_or_default()
302 }
303
304 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
308 let ids = self.catalog.list_rel_table_ids();
309 if ids.is_empty() {
310 return EdgeStore::open(&self.db_root, RelTableId(0))
312 .and_then(|s| s.read_delta())
313 .unwrap_or_default();
314 }
315 ids.into_iter()
316 .flat_map(|(id, _, _, _)| {
317 EdgeStore::open(&self.db_root, RelTableId(id as u32))
318 .and_then(|s| s.read_delta())
319 .unwrap_or_default()
320 })
321 .collect()
322 }
323
324 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
326 self.csrs
327 .get(&rel_table_id)
328 .map(|csr| csr.neighbors(src_slot).to_vec())
329 .unwrap_or_default()
330 }
331
332 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
334 let mut out: Vec<u64> = Vec::new();
335 for csr in self.csrs.values() {
336 out.extend_from_slice(csr.neighbors(src_slot));
337 }
338 out
339 }
340
341 pub fn top_k_by_degree(&self, label_id: u32, k: usize) -> Result<Vec<(u64, u32)>> {
350 if k == 0 {
351 return Ok(vec![]);
352 }
353 let hwm = self.store.hwm_for_label(label_id)?;
354 if hwm == 0 {
355 return Ok(vec![]);
356 }
357
358 let mut pairs: Vec<(u64, u32)> = (0..hwm)
359 .map(|slot| (slot, self.degree_cache.out_degree(slot)))
360 .collect();
361
362 pairs.sort_unstable_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
364 pairs.truncate(k);
365 Ok(pairs)
366 }
367
368 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
373 let stmt = {
374 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
375 parse(cypher)?
376 };
377
378 let bound = {
379 let _bind_span = info_span!("sparrowdb.bind").entered();
380 bind(stmt, &self.catalog)?
381 };
382
383 {
384 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
385 self.execute_bound(bound.inner)
386 }
387 }
388
/// Executes an already-constructed [`Statement`] by handing it straight to
/// `execute_bound`; no parsing or binding is performed here.
pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
    self.execute_bound(stmt)
}
396
397 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
398 match stmt {
399 Statement::Match(m) => self.execute_match(&m),
400 Statement::MatchWith(mw) => self.execute_match_with(&mw),
401 Statement::Unwind(u) => self.execute_unwind(&u),
402 Statement::Create(c) => self.execute_create(&c),
403 Statement::Merge(_)
407 | Statement::MatchMergeRel(_)
408 | Statement::MatchMutate(_)
409 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
410 "mutation statements must be executed via execute_mutation".into(),
411 )),
412 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
413 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
414 Statement::Union(u) => self.execute_union(u),
415 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
416 Statement::Call(c) => self.execute_call(&c),
417 Statement::Pipeline(p) => self.execute_pipeline(&p),
418 Statement::CreateIndex { label, property } => {
419 self.execute_create_index(&label, &property)
420 }
421 Statement::CreateConstraint { label, property } => {
422 self.execute_create_constraint(&label, &property)
423 }
424 }
425 }
426
427 fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
434 match c.procedure.as_str() {
435 "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
436 "db.schema" => self.call_db_schema(c),
437 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
438 "unknown procedure: {other}"
439 ))),
440 }
441 }
442
443 fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
452 if c.args.len() != 2 {
454 return Err(sparrowdb_common::Error::InvalidArgument(
455 "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
456 .into(),
457 ));
458 }
459
460 let index_name = eval_expr_to_string(&c.args[0])?;
462 let query = eval_expr_to_string(&c.args[1])?;
464
465 let index = FulltextIndex::open(&self.db_root, &index_name)?;
468 let node_ids = index.search(&query);
469
470 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
473 vec!["node".to_owned()]
474 } else {
475 c.yield_columns.clone()
476 };
477
478 if let Some(bad_col) = yield_cols.iter().find(|c| c.as_str() != "node") {
480 return Err(sparrowdb_common::Error::InvalidArgument(format!(
481 "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
482 )));
483 }
484
485 let mut rows: Vec<Vec<Value>> = Vec::new();
487 for raw_id in node_ids {
488 let node_id = sparrowdb_common::NodeId(raw_id);
489 let row: Vec<Value> = yield_cols.iter().map(|_| Value::NodeRef(node_id)).collect();
490 rows.push(row);
491 }
492
493 let (columns, rows) = if let Some(ref ret) = c.return_clause {
495 self.project_call_return(ret, &yield_cols, rows)?
496 } else {
497 (yield_cols, rows)
498 };
499
500 Ok(QueryResult { columns, rows })
501 }
502
503 fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
514 if !c.args.is_empty() {
515 return Err(sparrowdb_common::Error::InvalidArgument(
516 "db.schema requires exactly 0 arguments".into(),
517 ));
518 }
519 let columns = vec![
520 "type".to_owned(),
521 "name".to_owned(),
522 "properties".to_owned(),
523 ];
524
525 let wal_dir = self.db_root.join("wal");
527 let schema = WalReplayer::scan_schema(&wal_dir)?;
528
529 let mut rows: Vec<Vec<Value>> = Vec::new();
530
531 let labels = self.catalog.list_labels()?;
533 for (label_id, label_name) in &labels {
534 let mut prop_names: Vec<String> = schema
535 .node_props
536 .get(&(*label_id as u32))
537 .map(|s| s.iter().cloned().collect())
538 .unwrap_or_default();
539 prop_names.sort();
540 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
541 rows.push(vec![
542 Value::String("node".to_owned()),
543 Value::String(label_name.clone()),
544 props_value,
545 ]);
546 }
547
548 let rel_tables = self.catalog.list_rel_tables()?;
550 let mut seen_rel_types: std::collections::HashSet<String> =
552 std::collections::HashSet::new();
553 for (_, _, rel_type) in &rel_tables {
554 if seen_rel_types.insert(rel_type.clone()) {
555 let mut prop_names: Vec<String> = schema
556 .rel_props
557 .get(rel_type)
558 .map(|s| s.iter().cloned().collect())
559 .unwrap_or_default();
560 prop_names.sort();
561 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
562 rows.push(vec![
563 Value::String("relationship".to_owned()),
564 Value::String(rel_type.clone()),
565 props_value,
566 ]);
567 }
568 }
569
570 Ok(QueryResult { columns, rows })
571 }
572
573 fn project_call_return(
583 &self,
584 ret: &sparrowdb_cypher::ast::ReturnClause,
585 yield_cols: &[String],
586 rows: Vec<Vec<Value>>,
587 ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
588 let out_cols: Vec<String> = ret
590 .items
591 .iter()
592 .map(|item| {
593 item.alias
594 .clone()
595 .unwrap_or_else(|| expr_to_col_name(&item.expr))
596 })
597 .collect();
598
599 let mut out_rows = Vec::new();
600 for row in rows {
601 let env: HashMap<String, Value> = yield_cols
603 .iter()
604 .zip(row.iter())
605 .map(|(k, v)| (k.clone(), v.clone()))
606 .collect();
607
608 let projected: Vec<Value> = ret
609 .items
610 .iter()
611 .map(|item| eval_call_expr(&item.expr, &env, &self.store))
612 .collect();
613 out_rows.push(projected);
614 }
615 Ok((out_cols, out_rows))
616 }
617
618 pub fn is_mutation(stmt: &Statement) -> bool {
623 match stmt {
624 Statement::Merge(_)
625 | Statement::MatchMergeRel(_)
626 | Statement::MatchMutate(_)
627 | Statement::MatchCreate(_) => true,
628 Statement::Create(_) => true,
632 _ => false,
633 }
634 }
635
636 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
642 if mm.match_patterns.is_empty() {
643 return Ok(vec![]);
644 }
645
646 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
650 return Err(sparrowdb_common::Error::InvalidArgument(
651 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
652 .into(),
653 ));
654 }
655
656 let pat = &mm.match_patterns[0];
657 if pat.nodes.is_empty() {
658 return Ok(vec![]);
659 }
660 let node_pat = &pat.nodes[0];
661 let label = node_pat.labels.first().cloned().unwrap_or_default();
662
663 let label_id = match self.catalog.get_label(&label)? {
664 Some(id) => id as u32,
665 None => return Ok(vec![]),
667 };
668
669 let hwm = self.store.hwm_for_label(label_id)?;
670
671 let filter_col_ids: Vec<u32> = node_pat
673 .props
674 .iter()
675 .map(|pe| prop_name_to_col_id(&pe.key))
676 .collect();
677
678 let mut all_col_ids: Vec<u32> = filter_col_ids;
680 if let Some(ref where_expr) = mm.where_clause {
681 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
682 }
683
684 let var_name = node_pat.var.as_str();
685 let mut matching_ids = Vec::new();
686
687 for slot in 0..hwm {
688 let node_id = NodeId(((label_id as u64) << 32) | slot);
689
690 if self.is_node_tombstoned(node_id) {
693 continue;
694 }
695
696 let props = read_node_props(&self.store, node_id, &all_col_ids)?;
697
698 if !matches_prop_filter_static(
699 &props,
700 &node_pat.props,
701 &self.dollar_params(),
702 &self.store,
703 ) {
704 continue;
705 }
706
707 if let Some(ref where_expr) = mm.where_clause {
708 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
709 row_vals.extend(self.dollar_params());
710 if !self.eval_where_graph(where_expr, &row_vals) {
711 continue;
712 }
713 }
714
715 matching_ids.push(node_id);
716 }
717
718 Ok(matching_ids)
719 }
720
/// Returns a reference to the mutation carried by a MATCH…SET/DELETE statement.
pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
    &mm.mutation
}
726
727 fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
736 match self.store.get_node_raw(node_id, &[0u32]) {
737 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
738 Err(sparrowdb_common::Error::NotFound) => false,
739 Err(e) => {
740 tracing::warn!(
741 node_id = node_id.0,
742 error = ?e,
743 "tombstone check failed; treating node as not tombstoned"
744 );
745 false
746 }
747 }
748 }
749
750 fn node_matches_prop_filter(
757 &self,
758 node_id: NodeId,
759 filter_col_ids: &[u32],
760 props: &[sparrowdb_cypher::ast::PropEntry],
761 ) -> bool {
762 if props.is_empty() {
763 return true;
764 }
765 match self.store.get_node_raw(node_id, filter_col_ids) {
766 Ok(raw_props) => {
767 matches_prop_filter_static(&raw_props, props, &self.dollar_params(), &self.store)
768 }
769 Err(_) => false,
770 }
771 }
772
773 pub fn scan_match_create(
781 &self,
782 mc: &MatchCreateStatement,
783 ) -> Result<HashMap<String, Vec<NodeId>>> {
784 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
785
786 for pat in &mc.match_patterns {
787 for node_pat in &pat.nodes {
788 if node_pat.var.is_empty() {
789 continue;
790 }
791 if var_candidates.contains_key(&node_pat.var) {
793 continue;
794 }
795
796 let label = node_pat.labels.first().cloned().unwrap_or_default();
797 let label_id: u32 = match self.catalog.get_label(&label)? {
798 Some(id) => id as u32,
799 None => {
800 var_candidates.insert(node_pat.var.clone(), vec![]);
802 continue;
803 }
804 };
805
806 let hwm = self.store.hwm_for_label(label_id)?;
807
808 let filter_col_ids: Vec<u32> = node_pat
810 .props
811 .iter()
812 .map(|p| prop_name_to_col_id(&p.key))
813 .collect();
814
815 let mut matching_ids: Vec<NodeId> = Vec::new();
816 for slot in 0..hwm {
817 let node_id = NodeId(((label_id as u64) << 32) | slot);
818
819 match self.store.get_node_raw(node_id, &[0u32]) {
822 Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
823 continue;
824 }
825 Ok(_) | Err(_) => {}
826 }
827
828 if !node_pat.props.is_empty() {
830 match self.store.get_node_raw(node_id, &filter_col_ids) {
831 Ok(props) => {
832 if !matches_prop_filter_static(
833 &props,
834 &node_pat.props,
835 &self.dollar_params(),
836 &self.store,
837 ) {
838 continue;
839 }
840 }
841 Err(_) => continue,
844 }
845 }
846
847 matching_ids.push(node_id);
848 }
849
850 var_candidates.insert(node_pat.var.clone(), matching_ids);
851 }
852 }
853
854 Ok(var_candidates)
855 }
856
/// Evaluates the MATCH patterns of a MATCH…CREATE statement into binding
/// rows, each mapping a variable name to the node bound to it.
///
/// Two pattern shapes are handled:
/// * node-only patterns (no relationships): every named node is scanned
///   and the candidates are cross-joined into the accumulated rows;
/// * a single outgoing relationship between two nodes: source nodes are
///   scanned, neighbors are gathered from the CSR and the delta records,
///   and the resulting rows are joined with the accumulated rows on any
///   shared variables.
///
/// Returns an empty list as soon as any pattern has no match.
///
/// # Errors
/// `Unimplemented` for non-outgoing relationships and for any other
/// pattern shape; `QueryTimeout` from the deadline check inside the
/// relationship scan; catalog and store errors are propagated.
pub fn scan_match_create_rows(
    &self,
    mc: &MatchCreateStatement,
) -> Result<Vec<HashMap<String, NodeId>>> {
    // Start with one empty row so the first pattern's join has something
    // to extend.
    let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];

    for pat in &mc.match_patterns {
        if pat.rels.is_empty() {
            // ── Node-only pattern ────────────────────────────────────────
            let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();

            for node_pat in &pat.nodes {
                // Anonymous nodes bind nothing and are not scanned.
                if node_pat.var.is_empty() {
                    continue;
                }

                // No label on the pattern: scan every label in the catalog.
                // Otherwise scan only the first label; an unknown label
                // means the whole MATCH has no rows.
                let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
                    self.catalog
                        .list_labels()?
                        .into_iter()
                        .map(|(id, _)| id as u32)
                        .collect()
                } else {
                    let label = node_pat.labels.first().cloned().unwrap_or_default();
                    match self.catalog.get_label(&label)? {
                        Some(id) => vec![id as u32],
                        None => {
                            return Ok(vec![]);
                        }
                    }
                };

                let filter_col_ids: Vec<u32> = node_pat
                    .props
                    .iter()
                    .map(|p| prop_name_to_col_id(&p.key))
                    .collect();

                let mut matching_ids: Vec<NodeId> = Vec::new();
                for label_id in scan_label_ids {
                    let hwm = self.store.hwm_for_label(label_id)?;
                    for slot in 0..hwm {
                        // Label id in the high 32 bits, slot in the low 32 bits.
                        let node_id = NodeId(((label_id as u64) << 32) | slot);

                        if self.is_node_tombstoned(node_id) {
                            continue;
                        }
                        if !self.node_matches_prop_filter(
                            node_id,
                            &filter_col_ids,
                            &node_pat.props,
                        ) {
                            continue;
                        }

                        matching_ids.push(node_id);
                    }
                }

                // A variable with no candidates makes the whole MATCH empty.
                if matching_ids.is_empty() {
                    return Ok(vec![]);
                }

                per_var.push((node_pat.var.clone(), matching_ids));
            }

            // Cross-join each variable's candidates into the accumulated rows.
            // NOTE(review): `insert` overwrites a binding of the same name from
            // an earlier pattern rather than joining on it — confirm intended.
            for (var, candidates) in per_var {
                let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
                for row in &accumulated {
                    for &node_id in &candidates {
                        let mut new_row = row.clone();
                        new_row.insert(var.clone(), node_id);
                        next.push(new_row);
                    }
                }
                accumulated = next;
            }
        } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
            // ── Single-hop pattern: (src)-[rel]->(dst) ───────────────────
            let src_node_pat = &pat.nodes[0];
            let dst_node_pat = &pat.nodes[1];
            let rel_pat = &pat.rels[0];

            // Only outgoing edges are supported here.
            if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
                return Err(sparrowdb_common::Error::Unimplemented);
            }

            let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
            let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();

            // An unknown label on either side means no rows at all.
            let src_label_id: u32 = match self.catalog.get_label(&src_label)? {
                Some(id) => id as u32,
                None => return Ok(vec![]),
            };
            let dst_label_id: u32 = match self.catalog.get_label(&dst_label)? {
                Some(id) => id as u32,
                None => return Ok(vec![]),
            };

            let src_filter_cols: Vec<u32> = src_node_pat
                .props
                .iter()
                .map(|p| prop_name_to_col_id(&p.key))
                .collect();
            let dst_filter_cols: Vec<u32> = dst_node_pat
                .props
                .iter()
                .map(|p| prop_name_to_col_id(&p.key))
                .collect();

            let rel_lookup =
                self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
            if matches!(rel_lookup, RelTableLookup::NotFound) {
                return Ok(vec![]);
            }

            // Adjacency (src slot → dst slots) from the delta records whose
            // source node carries the source label. Destination labels are
            // not checked here; only the low 32 bits (the slot) are kept.
            let delta_adj: HashMap<u64, Vec<u64>> = {
                let records: Vec<DeltaRecord> = match rel_lookup {
                    RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
                    _ => self.read_delta_all(),
                };
                let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
                for r in records {
                    let s = r.src.0;
                    let s_label = (s >> 32) as u32;
                    if s_label == src_label_id {
                        let s_slot = s & 0xFFFF_FFFF;
                        adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
                    }
                }
                adj
            };

            let hwm_src = self.store.hwm_for_label(src_label_id)?;

            let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();

            for src_slot in 0..hwm_src {
                // Abort the scan once the query deadline has passed.
                self.check_deadline()?;

                let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);

                if self.is_node_tombstoned(src_node) {
                    continue;
                }
                if !self.node_matches_prop_filter(
                    src_node,
                    &src_filter_cols,
                    &src_node_pat.props,
                ) {
                    continue;
                }

                // Neighbors from the CSR (one table, or all tables when no
                // relationship type was given) plus those from the delta.
                let csr_neighbors_vec: Vec<u64> = match rel_lookup {
                    RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
                    _ => self.csr_neighbors_all(src_slot),
                };
                let empty: Vec<u64> = Vec::new();
                let delta_neighbors: &[u64] =
                    delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());

                // `seen` de-duplicates destination slots per source node.
                let mut seen: HashSet<u64> = HashSet::new();
                for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
                    if !seen.insert(dst_slot) {
                        continue;
                    }
                    // Neighbor slots are interpreted under the destination label.
                    let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);

                    if self.is_node_tombstoned(dst_node) {
                        continue;
                    }
                    if !self.node_matches_prop_filter(
                        dst_node,
                        &dst_filter_cols,
                        &dst_node_pat.props,
                    ) {
                        continue;
                    }

                    let mut row: HashMap<String, NodeId> = HashMap::new();

                    // Same variable on both ends (e.g. `(a)-[:R]->(a)`):
                    // only self-loops qualify.
                    if !src_node_pat.var.is_empty()
                        && !dst_node_pat.var.is_empty()
                        && src_node_pat.var == dst_node_pat.var
                    {
                        if src_node != dst_node {
                            continue;
                        }
                        row.insert(src_node_pat.var.clone(), src_node);
                    } else {
                        if !src_node_pat.var.is_empty() {
                            row.insert(src_node_pat.var.clone(), src_node);
                        }
                        if !dst_node_pat.var.is_empty() {
                            row.insert(dst_node_pat.var.clone(), dst_node);
                        }
                    }
                    pattern_rows.push(row);
                }
            }

            if pattern_rows.is_empty() {
                return Ok(vec![]);
            }

            // Join with the accumulated rows: a pattern row is compatible
            // when every variable it shares with the accumulated row is
            // bound to the same node.
            let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
            for acc_row in &accumulated {
                'outer: for pat_row in &pattern_rows {
                    for (k, v) in pat_row {
                        if let Some(existing) = acc_row.get(k) {
                            if existing != v {
                                continue 'outer;
                            }
                        }
                    }
                    let mut new_row = acc_row.clone();
                    new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
                    next.push(new_row);
                }
            }
            accumulated = next;
        } else {
            // Multi-hop or otherwise unsupported pattern shape.
            return Err(sparrowdb_common::Error::Unimplemented);
        }
    }

    Ok(accumulated)
}
1133
1134 pub fn scan_match_merge_rel_rows(
1138 &self,
1139 mm: &MatchMergeRelStatement,
1140 ) -> Result<Vec<HashMap<String, NodeId>>> {
1141 let proxy = MatchCreateStatement {
1144 match_patterns: mm.match_patterns.clone(),
1145 match_props: vec![],
1146 create: CreateStatement {
1147 nodes: vec![],
1148 edges: vec![],
1149 },
1150 };
1151 self.scan_match_create_rows(&proxy)
1152 }
1153
1154 fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1157 use crate::operators::{Operator, UnwindOperator};
1158
1159 let values = eval_list_expr(&u.expr, &self.params)?;
1161
1162 let column_names = extract_return_column_names(&u.return_clause.items);
1164
1165 if values.is_empty() {
1166 return Ok(QueryResult::empty(column_names));
1167 }
1168
1169 let mut op = UnwindOperator::new(u.alias.clone(), values);
1170 let chunks = op.collect_all()?;
1171
1172 let mut rows: Vec<Vec<Value>> = Vec::new();
1179 for chunk in &chunks {
1180 for group in &chunk.groups {
1181 let n = group.len();
1182 for row_idx in 0..n {
1183 let row = u
1184 .return_clause
1185 .items
1186 .iter()
1187 .map(|item| {
1188 let is_alias = match &item.expr {
1191 Expr::Var(name) => name == &u.alias,
1192 _ => false,
1193 };
1194 if is_alias {
1195 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1196 } else {
1197 Value::Null
1200 }
1201 })
1202 .collect();
1203 rows.push(row);
1204 }
1205 }
1206 }
1207
1208 Ok(QueryResult {
1209 columns: column_names,
1210 rows,
1211 })
1212 }
1213
1214 fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
1224 for node in &create.nodes {
1225 let label = node.labels.first().cloned().unwrap_or_default();
1227
1228 if is_reserved_label(&label) {
1230 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1231 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
1232 )));
1233 }
1234
1235 let label_id: u32 = match self.catalog.get_label(&label)? {
1236 Some(id) => id as u32,
1237 None => self.catalog.create_label(&label)? as u32,
1238 };
1239
1240 let empty_bindings: HashMap<String, Value> = HashMap::new();
1244 let props: Vec<(u32, StoreValue)> = node
1245 .props
1246 .iter()
1247 .map(|entry| {
1248 let col_id = prop_name_to_col_id(&entry.key);
1249 let val = eval_expr(&entry.value, &empty_bindings);
1250 let store_val = value_to_store_value(val);
1251 (col_id, store_val)
1252 })
1253 .collect();
1254
1255 for &(con_label, con_col) in self.unique_constraints.clone().iter() {
1257 if con_label != label_id { continue; }
1258 if let Some((_, sv)) = props.iter().find(|&&(c, _)| c == con_col) {
1259 let prop_idx = self.prop_index.borrow();
1260 if prop_idx.is_indexed(con_label, con_col) {
1261 let existing = prop_idx.lookup(con_label, con_col, sv.to_u64());
1262 if !existing.is_empty() {
1263 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1264 "unique constraint violation: label \"{label}\" already has a node with the given property value"
1265 )));
1266 }
1267 }
1268 }
1269 }
1270 self.store.create_node(label_id, &props)?;
1271 let constrained: Vec<u32> = props.iter().map(|&(c, _)| c)
1273 .filter(|&c| self.unique_constraints.contains(&(label_id, c))).collect();
1274 for col in constrained {
1275 let _ = self.prop_index.borrow_mut().build_for(&self.store, label_id, col);
1276 }
1277 }
1278 Ok(QueryResult::empty(vec![]))
1279 }
1280
1281 fn execute_create_constraint(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1282 let label_id: u32 = match self.catalog.get_label(label)? {
1283 Some(id) => id as u32,
1284 None => self.catalog.create_label(label)? as u32,
1285 };
1286 let col_id = col_id_of(property);
1287 self.unique_constraints.insert((label_id, col_id));
1288 self.prop_index.borrow_mut().build_for(&self.store, label_id, col_id)?;
1289 Ok(QueryResult::empty(vec![]))
1290 }
1291
1292 fn execute_create_index(&mut self, label: &str, property: &str) -> Result<QueryResult> {
1293 let label_id: u32 = match self.catalog.get_label(label)? {
1294 Some(id) => id as u32,
1295 None => return Ok(QueryResult::empty(vec![])),
1296 };
1297 let col_id = col_id_of(property);
1298 self.prop_index
1299 .borrow_mut()
1300 .build_for(&self.store, label_id, col_id)?;
1301 Ok(QueryResult::empty(vec![]))
1302 }
1303
1304 fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1313 let left_result = self.execute_bound(*u.left)?;
1314 let right_result = self.execute_bound(*u.right)?;
1315
1316 if !left_result.columns.is_empty()
1318 && !right_result.columns.is_empty()
1319 && left_result.columns.len() != right_result.columns.len()
1320 {
1321 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1322 "UNION: left side has {} columns, right side has {}",
1323 left_result.columns.len(),
1324 right_result.columns.len()
1325 )));
1326 }
1327
1328 let columns = if !left_result.columns.is_empty() {
1329 left_result.columns.clone()
1330 } else {
1331 right_result.columns.clone()
1332 };
1333
1334 let mut rows = left_result.rows;
1335 rows.extend(right_result.rows);
1336
1337 if !u.all {
1338 deduplicate_rows(&mut rows);
1339 }
1340
1341 Ok(QueryResult { columns, rows })
1342 }
1343
/// Executes `MATCH … WITH … [WHERE …] RETURN … [ORDER BY] [SKIP] [LIMIT]`.
///
/// Steps, in order:
/// 1. collect the rows matched by the MATCH patterns;
/// 2. project them through the WITH items (aggregating when any item is
///    an aggregate expression) and apply the WITH-level WHERE;
/// 3. sort by ORDER BY, then apply SKIP and LIMIT;
/// 4. evaluate the RETURN items and, for DISTINCT, drop duplicate rows.
fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
    // Step 1: rows produced by the MATCH part.
    let intermediate = self.collect_match_rows_for_with(
        &m.match_patterns,
        m.match_where.as_ref(),
        &m.with_clause,
    )?;

    // Any aggregate among the WITH items switches to the grouping path.
    let has_agg = m
        .with_clause
        .items
        .iter()
        .any(|item| is_aggregate_expr(&item.expr));

    let projected: Vec<HashMap<String, Value>> = if has_agg {
        // Aggregating path: group first, then filter the grouped rows.
        let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
        agg_rows
            .into_iter()
            .filter(|with_vals| {
                if let Some(ref where_expr) = m.with_clause.where_clause {
                    // The WHERE sees the WITH values plus the `$` parameters.
                    let mut with_vals_p = with_vals.clone();
                    with_vals_p.extend(self.dollar_params());
                    self.eval_where_graph(where_expr, &with_vals_p)
                } else {
                    true
                }
            })
            .map(|mut with_vals| {
                // Parameters stay visible to ORDER BY and RETURN.
                with_vals.extend(self.dollar_params());
                with_vals
            })
            .collect()
    } else {
        // Plain projection path: one output row per matched row.
        let mut projected: Vec<HashMap<String, Value>> = Vec::new();
        for row_vals in &intermediate {
            let mut with_vals: HashMap<String, Value> = HashMap::new();
            for item in &m.with_clause.items {
                let val = self.eval_expr_graph(&item.expr, row_vals);
                with_vals.insert(item.alias.clone(), val);
                // When the item is a bare variable, carry its node identity
                // over to the alias: a `NodeRef` value replaces the evaluated
                // value, and the `<name>.__node_id__` entry is re-keyed.
                if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
                    if let Some(node_ref) = row_vals.get(src_var) {
                        if matches!(node_ref, Value::NodeRef(_)) {
                            with_vals.insert(item.alias.clone(), node_ref.clone());
                            with_vals.insert(
                                format!("{}.__node_id__", item.alias),
                                node_ref.clone(),
                            );
                        }
                    }
                    let nid_key = format!("{src_var}.__node_id__");
                    if let Some(node_ref) = row_vals.get(&nid_key) {
                        with_vals
                            .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
                    }
                }
            }
            if let Some(ref where_expr) = m.with_clause.where_clause {
                // The WHERE sees the WITH values plus the `$` parameters.
                let mut with_vals_p = with_vals.clone();
                with_vals_p.extend(self.dollar_params());
                if !self.eval_where_graph(where_expr, &with_vals_p) {
                    continue;
                }
            }
            // Parameters stay visible to ORDER BY and RETURN.
            with_vals.extend(self.dollar_params());
            projected.push(with_vals);
        }
        projected
    };

    let column_names = extract_return_column_names(&m.return_clause.items);

    // Step 3: ORDER BY compares key by key; the first non-equal key decides.
    let mut ordered_projected = projected;
    if !m.order_by.is_empty() {
        ordered_projected.sort_by(|a, b| {
            for (expr, dir) in &m.order_by {
                let val_a = eval_expr(expr, a);
                let val_b = eval_expr(expr, b);
                let cmp = compare_values(&val_a, &val_b);
                let cmp = if *dir == SortDir::Desc {
                    cmp.reverse()
                } else {
                    cmp
                };
                if cmp != std::cmp::Ordering::Equal {
                    return cmp;
                }
            }
            std::cmp::Ordering::Equal
        });
    }

    if let Some(skip) = m.skip {
        // Clamp so that skipping past the end just empties the result.
        let skip = (skip as usize).min(ordered_projected.len());
        ordered_projected.drain(0..skip);
    }
    if let Some(lim) = m.limit {
        ordered_projected.truncate(lim as usize);
    }

    // Step 4: evaluate the RETURN items against each surviving row.
    let mut rows: Vec<Vec<Value>> = ordered_projected
        .iter()
        .map(|with_vals| {
            m.return_clause
                .items
                .iter()
                .map(|item| self.eval_expr_graph(&item.expr, with_vals))
                .collect()
        })
        .collect();

    if m.distinct {
        deduplicate_rows(&mut rows);
    }

    Ok(QueryResult {
        columns: column_names,
        rows,
    })
}
1488
1489 fn aggregate_with_items(
1494 &self,
1495 rows: &[HashMap<String, Value>],
1496 items: &[sparrowdb_cypher::ast::WithItem],
1497 ) -> Vec<HashMap<String, Value>> {
1498 let key_indices: Vec<usize> = items
1500 .iter()
1501 .enumerate()
1502 .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1503 .map(|(i, _)| i)
1504 .collect();
1505 let agg_indices: Vec<usize> = items
1506 .iter()
1507 .enumerate()
1508 .filter(|(_, item)| is_aggregate_expr(&item.expr))
1509 .map(|(i, _)| i)
1510 .collect();
1511
1512 let mut group_keys: Vec<Vec<Value>> = Vec::new();
1514 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); for row_vals in rows {
1517 let key: Vec<Value> = key_indices
1518 .iter()
1519 .map(|&i| eval_expr(&items[i].expr, row_vals))
1520 .collect();
1521 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1522 pos
1523 } else {
1524 group_keys.push(key);
1525 group_accum.push(vec![vec![]; agg_indices.len()]);
1526 group_keys.len() - 1
1527 };
1528 for (ai, &ri) in agg_indices.iter().enumerate() {
1529 match &items[ri].expr {
1530 sparrowdb_cypher::ast::Expr::CountStar => {
1531 group_accum[group_idx][ai].push(Value::Int64(1));
1532 }
1533 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1534 if name.to_lowercase() == "collect" =>
1535 {
1536 let val = if !args.is_empty() {
1537 eval_expr(&args[0], row_vals)
1538 } else {
1539 Value::Null
1540 };
1541 if !matches!(val, Value::Null) {
1542 group_accum[group_idx][ai].push(val);
1543 }
1544 }
1545 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1546 if matches!(
1547 name.to_lowercase().as_str(),
1548 "count" | "sum" | "avg" | "min" | "max"
1549 ) =>
1550 {
1551 let val = if !args.is_empty() {
1552 eval_expr(&args[0], row_vals)
1553 } else {
1554 Value::Null
1555 };
1556 if !matches!(val, Value::Null) {
1557 group_accum[group_idx][ai].push(val);
1558 }
1559 }
1560 _ => {}
1561 }
1562 }
1563 }
1564
1565 if rows.is_empty() && key_indices.is_empty() {
1568 let mut out_row: HashMap<String, Value> = HashMap::new();
1569 for &ri in &agg_indices {
1570 let val = match &items[ri].expr {
1571 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1572 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1573 if name.to_lowercase() == "collect" =>
1574 {
1575 Value::List(vec![])
1576 }
1577 _ => Value::Int64(0),
1578 };
1579 out_row.insert(items[ri].alias.clone(), val);
1580 }
1581 return vec![out_row];
1582 }
1583
1584 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1586 for (gi, key_vals) in group_keys.iter().enumerate() {
1587 let mut out_row: HashMap<String, Value> = HashMap::new();
1588 for (ki, &ri) in key_indices.iter().enumerate() {
1590 out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
1591 }
1592 for (ai, &ri) in agg_indices.iter().enumerate() {
1594 let accum = &group_accum[gi][ai];
1595 let val = match &items[ri].expr {
1596 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
1597 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1598 if name.to_lowercase() == "collect" =>
1599 {
1600 Value::List(accum.clone())
1601 }
1602 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1603 if name.to_lowercase() == "count" =>
1604 {
1605 Value::Int64(accum.len() as i64)
1606 }
1607 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1608 if name.to_lowercase() == "sum" =>
1609 {
1610 let sum: i64 = accum
1611 .iter()
1612 .filter_map(|v| {
1613 if let Value::Int64(n) = v {
1614 Some(*n)
1615 } else {
1616 None
1617 }
1618 })
1619 .sum();
1620 Value::Int64(sum)
1621 }
1622 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1623 if name.to_lowercase() == "min" =>
1624 {
1625 accum
1626 .iter()
1627 .min_by(|a, b| compare_values(a, b))
1628 .cloned()
1629 .unwrap_or(Value::Null)
1630 }
1631 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1632 if name.to_lowercase() == "max" =>
1633 {
1634 accum
1635 .iter()
1636 .max_by(|a, b| compare_values(a, b))
1637 .cloned()
1638 .unwrap_or(Value::Null)
1639 }
1640 _ => Value::Null,
1641 };
1642 out_row.insert(items[ri].alias.clone(), val);
1643 }
1644 result.push(out_row);
1645 }
1646 result
1647 }
1648
1649 fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
1654 let mut current_rows: Vec<HashMap<String, Value>> =
1656 if let Some((expr, alias)) = &p.leading_unwind {
1657 let values = eval_list_expr(expr, &self.params)?;
1659 values
1660 .into_iter()
1661 .map(|v| {
1662 let mut m = HashMap::new();
1663 m.insert(alias.clone(), v);
1664 m
1665 })
1666 .collect()
1667 } else if let Some(ref patterns) = p.leading_match {
1668 self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
1673 } else {
1674 vec![HashMap::new()]
1675 };
1676
1677 for stage in &p.stages {
1679 match stage {
1680 PipelineStage::With {
1681 clause,
1682 order_by,
1683 skip,
1684 limit,
1685 } => {
1686 if !order_by.is_empty() {
1690 current_rows.sort_by(|a, b| {
1691 for (expr, dir) in order_by {
1692 let va = eval_expr(expr, a);
1693 let vb = eval_expr(expr, b);
1694 let cmp = compare_values(&va, &vb);
1695 let cmp = if *dir == SortDir::Desc {
1696 cmp.reverse()
1697 } else {
1698 cmp
1699 };
1700 if cmp != std::cmp::Ordering::Equal {
1701 return cmp;
1702 }
1703 }
1704 std::cmp::Ordering::Equal
1705 });
1706 }
1707 if let Some(s) = skip {
1708 let s = (*s as usize).min(current_rows.len());
1709 current_rows.drain(0..s);
1710 }
1711 if let Some(l) = limit {
1712 current_rows.truncate(*l as usize);
1713 }
1714
1715 let has_agg = clause
1717 .items
1718 .iter()
1719 .any(|item| is_aggregate_expr(&item.expr));
1720 let next_rows: Vec<HashMap<String, Value>> = if has_agg {
1721 let agg_rows = self.aggregate_with_items(¤t_rows, &clause.items);
1722 agg_rows
1723 .into_iter()
1724 .filter(|with_vals| {
1725 if let Some(ref where_expr) = clause.where_clause {
1726 let mut wv = with_vals.clone();
1727 wv.extend(self.dollar_params());
1728 self.eval_where_graph(where_expr, &wv)
1729 } else {
1730 true
1731 }
1732 })
1733 .map(|mut with_vals| {
1734 with_vals.extend(self.dollar_params());
1735 with_vals
1736 })
1737 .collect()
1738 } else {
1739 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1740 for row_vals in ¤t_rows {
1741 let mut with_vals: HashMap<String, Value> = HashMap::new();
1742 for item in &clause.items {
1743 let val = self.eval_expr_graph(&item.expr, row_vals);
1744 with_vals.insert(item.alias.clone(), val);
1745 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1747 if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
1748 with_vals.insert(item.alias.clone(), nr.clone());
1749 with_vals.insert(
1750 format!("{}.__node_id__", item.alias),
1751 nr.clone(),
1752 );
1753 }
1754 let nid_key = format!("{src_var}.__node_id__");
1755 if let Some(nr) = row_vals.get(&nid_key) {
1756 with_vals.insert(
1757 format!("{}.__node_id__", item.alias),
1758 nr.clone(),
1759 );
1760 }
1761 }
1762 }
1763 if let Some(ref where_expr) = clause.where_clause {
1764 let mut wv = with_vals.clone();
1765 wv.extend(self.dollar_params());
1766 if !self.eval_where_graph(where_expr, &wv) {
1767 continue;
1768 }
1769 }
1770 with_vals.extend(self.dollar_params());
1771 next_rows.push(with_vals);
1772 }
1773 next_rows
1774 };
1775 current_rows = next_rows;
1776 }
1777 PipelineStage::Match {
1778 patterns,
1779 where_clause,
1780 } => {
1781 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1784 for binding in ¤t_rows {
1785 let new_rows = self.execute_pipeline_match_stage(
1786 patterns,
1787 where_clause.as_ref(),
1788 binding,
1789 )?;
1790 next_rows.extend(new_rows);
1791 }
1792 current_rows = next_rows;
1793 }
1794 PipelineStage::Unwind { alias, new_alias } => {
1795 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1797 for row_vals in ¤t_rows {
1798 let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
1799 let items = match list_val {
1800 Value::List(v) => v,
1801 other => vec![other],
1802 };
1803 for item in items {
1804 let mut new_row = row_vals.clone();
1805 new_row.insert(new_alias.clone(), item);
1806 next_rows.push(new_row);
1807 }
1808 }
1809 current_rows = next_rows;
1810 }
1811 }
1812 }
1813
1814 let column_names = extract_return_column_names(&p.return_clause.items);
1816
1817 if !p.return_order_by.is_empty() {
1819 current_rows.sort_by(|a, b| {
1820 for (expr, dir) in &p.return_order_by {
1821 let va = eval_expr(expr, a);
1822 let vb = eval_expr(expr, b);
1823 let cmp = compare_values(&va, &vb);
1824 let cmp = if *dir == SortDir::Desc {
1825 cmp.reverse()
1826 } else {
1827 cmp
1828 };
1829 if cmp != std::cmp::Ordering::Equal {
1830 return cmp;
1831 }
1832 }
1833 std::cmp::Ordering::Equal
1834 });
1835 }
1836
1837 if let Some(skip) = p.return_skip {
1838 let skip = (skip as usize).min(current_rows.len());
1839 current_rows.drain(0..skip);
1840 }
1841 if let Some(lim) = p.return_limit {
1842 current_rows.truncate(lim as usize);
1843 }
1844
1845 let mut rows: Vec<Vec<Value>> = current_rows
1846 .iter()
1847 .map(|row_vals| {
1848 p.return_clause
1849 .items
1850 .iter()
1851 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
1852 .collect()
1853 })
1854 .collect();
1855
1856 if p.distinct {
1857 deduplicate_rows(&mut rows);
1858 }
1859
1860 Ok(QueryResult {
1861 columns: column_names,
1862 rows,
1863 })
1864 }
1865
1866 fn collect_pipeline_match_rows(
1872 &self,
1873 patterns: &[PathPattern],
1874 where_clause: Option<&Expr>,
1875 ) -> Result<Vec<HashMap<String, Value>>> {
1876 if patterns.is_empty() {
1877 return Ok(vec![HashMap::new()]);
1878 }
1879
1880 let pat = &patterns[0];
1882 let node = &pat.nodes[0];
1883 let var_name = node.var.as_str();
1884 let label = node.labels.first().cloned().unwrap_or_default();
1885
1886 let label_id = match self.catalog.get_label(&label)? {
1887 Some(id) => id as u32,
1888 None => return Ok(vec![]),
1889 };
1890 let hwm = self.store.hwm_for_label(label_id)?;
1891 let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1892
1893 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1894 for slot in 0..hwm {
1895 let node_id = NodeId(((label_id as u64) << 32) | slot);
1896 if self.is_node_tombstoned(node_id) {
1897 continue;
1898 }
1899 let props = match self.store.get_node_raw(node_id, &col_ids) {
1900 Ok(p) => p,
1901 Err(_) => continue,
1902 };
1903 if !self.matches_prop_filter(&props, &node.props) {
1904 continue;
1905 }
1906 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1907 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1909 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1910
1911 if let Some(wexpr) = where_clause {
1912 let mut row_vals_p = row_vals.clone();
1913 row_vals_p.extend(self.dollar_params());
1914 if !self.eval_where_graph(wexpr, &row_vals_p) {
1915 continue;
1916 }
1917 }
1918 result.push(row_vals);
1919 }
1920 Ok(result)
1921 }
1922
1923 fn execute_pipeline_match_stage(
1932 &self,
1933 patterns: &[PathPattern],
1934 where_clause: Option<&Expr>,
1935 binding: &HashMap<String, Value>,
1936 ) -> Result<Vec<HashMap<String, Value>>> {
1937 if patterns.is_empty() {
1938 return Ok(vec![binding.clone()]);
1939 }
1940
1941 let pat = &patterns[0];
1942
1943 if !pat.rels.is_empty() {
1945 return self.execute_pipeline_match_hop(pat, where_clause, binding);
1948 }
1949
1950 let node = &pat.nodes[0];
1951 let var_name = node.var.as_str();
1952 let label = node.labels.first().cloned().unwrap_or_default();
1953
1954 let label_id = match self.catalog.get_label(&label)? {
1955 Some(id) => id as u32,
1956 None => return Ok(vec![]),
1957 };
1958 let hwm = self.store.hwm_for_label(label_id)?;
1959 let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1960
1961 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1962 let params = self.dollar_params();
1963 for slot in 0..hwm {
1964 let node_id = NodeId(((label_id as u64) << 32) | slot);
1965 if self.is_node_tombstoned(node_id) {
1966 continue;
1967 }
1968 let props = match self.store.get_node_raw(node_id, &col_ids) {
1969 Ok(p) => p,
1970 Err(_) => continue,
1971 };
1972
1973 if !self.matches_prop_filter_with_binding(&props, &node.props, binding, ¶ms) {
1975 continue;
1976 }
1977
1978 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1979 row_vals.extend(binding.clone());
1981 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1982 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1983
1984 if let Some(wexpr) = where_clause {
1985 let mut row_vals_p = row_vals.clone();
1986 row_vals_p.extend(params.clone());
1987 if !self.eval_where_graph(wexpr, &row_vals_p) {
1988 continue;
1989 }
1990 }
1991 result.push(row_vals);
1992 }
1993 Ok(result)
1994 }
1995
1996 fn execute_pipeline_match_hop(
2001 &self,
2002 pat: &sparrowdb_cypher::ast::PathPattern,
2003 where_clause: Option<&Expr>,
2004 binding: &HashMap<String, Value>,
2005 ) -> Result<Vec<HashMap<String, Value>>> {
2006 if pat.nodes.len() < 2 || pat.rels.is_empty() {
2007 return Ok(vec![]);
2008 }
2009 let src_pat = &pat.nodes[0];
2010 let dst_pat = &pat.nodes[1];
2011 let rel_pat = &pat.rels[0];
2012
2013 let src_label = src_pat.labels.first().cloned().unwrap_or_default();
2014 let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
2015
2016 let src_label_id = match self.catalog.get_label(&src_label)? {
2017 Some(id) => id as u32,
2018 None => return Ok(vec![]),
2019 };
2020 let dst_label_id = match self.catalog.get_label(&dst_label)? {
2021 Some(id) => id as u32,
2022 None => return Ok(vec![]),
2023 };
2024
2025 let src_col_ids: Vec<u32> = self
2026 .store
2027 .col_ids_for_label(src_label_id)
2028 .unwrap_or_default();
2029 let dst_col_ids: Vec<u32> = self
2030 .store
2031 .col_ids_for_label(dst_label_id)
2032 .unwrap_or_default();
2033 let params = self.dollar_params();
2034
2035 let src_candidates: Vec<NodeId> = {
2037 let bound_src = binding
2039 .get(&src_pat.var)
2040 .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
2041 if let Some(Value::NodeRef(nid)) = bound_src {
2042 vec![*nid]
2043 } else {
2044 let hwm = self.store.hwm_for_label(src_label_id)?;
2045 let mut cands = Vec::new();
2046 for slot in 0..hwm {
2047 let node_id = NodeId(((src_label_id as u64) << 32) | slot);
2048 if self.is_node_tombstoned(node_id) {
2049 continue;
2050 }
2051 if let Ok(props) = self.store.get_node_raw(node_id, &src_col_ids) {
2052 if self.matches_prop_filter_with_binding(
2053 &props,
2054 &src_pat.props,
2055 binding,
2056 ¶ms,
2057 ) {
2058 cands.push(node_id);
2059 }
2060 }
2061 }
2062 cands
2063 }
2064 };
2065
2066 let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2067
2068 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2069 for src_id in src_candidates {
2070 let src_slot = src_id.0 & 0xFFFF_FFFF;
2071 let dst_slots: Vec<u64> = match &rel_table_id {
2072 RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
2073 RelTableLookup::NotFound => continue,
2074 RelTableLookup::All => self.csr_neighbors_all(src_slot),
2075 };
2076 let delta_slots: Vec<u64> = self
2078 .read_delta_all()
2079 .into_iter()
2080 .filter(|r| {
2081 let r_src_label = (r.src.0 >> 32) as u32;
2082 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2083 r_src_label == src_label_id && r_src_slot == src_slot
2084 })
2085 .map(|r| r.dst.0 & 0xFFFF_FFFF)
2086 .collect();
2087 let all_slots: std::collections::HashSet<u64> =
2088 dst_slots.into_iter().chain(delta_slots).collect();
2089
2090 for dst_slot in all_slots {
2091 let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2092 if self.is_node_tombstoned(dst_id) {
2093 continue;
2094 }
2095 if let Ok(dst_props) = self.store.get_node_raw(dst_id, &dst_col_ids) {
2096 if !self.matches_prop_filter_with_binding(
2097 &dst_props,
2098 &dst_pat.props,
2099 binding,
2100 ¶ms,
2101 ) {
2102 continue;
2103 }
2104 let src_props = self
2105 .store
2106 .get_node_raw(src_id, &src_col_ids)
2107 .unwrap_or_default();
2108 let mut row_vals =
2109 build_row_vals(&src_props, &src_pat.var, &src_col_ids, &self.store);
2110 row_vals.extend(build_row_vals(
2111 &dst_props,
2112 &dst_pat.var,
2113 &dst_col_ids,
2114 &self.store,
2115 ));
2116 row_vals.extend(binding.clone());
2118 row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
2119 row_vals.insert(
2120 format!("{}.__node_id__", src_pat.var),
2121 Value::NodeRef(src_id),
2122 );
2123 row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
2124 row_vals.insert(
2125 format!("{}.__node_id__", dst_pat.var),
2126 Value::NodeRef(dst_id),
2127 );
2128
2129 if let Some(wexpr) = where_clause {
2130 let mut row_vals_p = row_vals.clone();
2131 row_vals_p.extend(params.clone());
2132 if !self.eval_where_graph(wexpr, &row_vals_p) {
2133 continue;
2134 }
2135 }
2136 result.push(row_vals);
2137 }
2138 }
2139 }
2140 Ok(result)
2141 }
2142
2143 fn matches_prop_filter_with_binding(
2149 &self,
2150 props: &[(u32, u64)],
2151 filters: &[sparrowdb_cypher::ast::PropEntry],
2152 binding: &HashMap<String, Value>,
2153 params: &HashMap<String, Value>,
2154 ) -> bool {
2155 for f in filters {
2156 let col_id = prop_name_to_col_id(&f.key);
2157 let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
2158
2159 let filter_val = match &f.value {
2161 sparrowdb_cypher::ast::Expr::Var(v) => {
2162 binding.get(v).cloned().unwrap_or(Value::Null)
2164 }
2165 other => eval_expr(other, params),
2166 };
2167
2168 let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.store));
2169 let matches = match (stored_val, &filter_val) {
2170 (Some(Value::String(a)), Value::String(b)) => &a == b,
2171 (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2172 (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2173 (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2174 (None, Value::Null) => true,
2175 _ => false,
2176 };
2177 if !matches {
2178 return false;
2179 }
2180 }
2181 true
2182 }
2183
2184 fn collect_match_rows_for_with(
2193 &self,
2194 patterns: &[PathPattern],
2195 where_clause: Option<&Expr>,
2196 with_clause: &WithClause,
2197 ) -> Result<Vec<HashMap<String, Value>>> {
2198 if patterns.is_empty() || patterns[0].rels.is_empty() {
2199 let pat = &patterns[0];
2200 let node = &pat.nodes[0];
2201 let var_name = node.var.as_str();
2202 let label = node.labels.first().cloned().unwrap_or_default();
2203 let label_id = self
2204 .catalog
2205 .get_label(&label)?
2206 .ok_or(sparrowdb_common::Error::NotFound)?;
2207 let label_id_u32 = label_id as u32;
2208 let hwm = self.store.hwm_for_label(label_id_u32)?;
2209
2210 let mut all_col_ids: Vec<u32> = Vec::new();
2212 if let Some(wexpr) = &where_clause {
2213 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2214 }
2215 for item in &with_clause.items {
2216 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2217 }
2218 for p in &node.props {
2219 let col_id = prop_name_to_col_id(&p.key);
2220 if !all_col_ids.contains(&col_id) {
2221 all_col_ids.push(col_id);
2222 }
2223 }
2224
2225 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2226 for slot in 0..hwm {
2227 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2228 if self.is_node_tombstoned(node_id) {
2231 continue;
2232 }
2233 let props = read_node_props(&self.store, node_id, &all_col_ids)?;
2234 if !self.matches_prop_filter(&props, &node.props) {
2235 continue;
2236 }
2237 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2238 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2241 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2242 if let Some(wexpr) = &where_clause {
2243 let mut row_vals_p = row_vals.clone();
2244 row_vals_p.extend(self.dollar_params());
2245 if !self.eval_where_graph(wexpr, &row_vals_p) {
2246 continue;
2247 }
2248 }
2249 result.push(row_vals);
2250 }
2251 Ok(result)
2252 } else {
2253 Err(sparrowdb_common::Error::Unimplemented)
2254 }
2255 }
2256
2257 fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2258 if m.pattern.is_empty() {
2259 let column_names = extract_return_column_names(&m.return_clause.items);
2261 let empty_vals: HashMap<String, Value> = HashMap::new();
2262 let row: Vec<Value> = m
2263 .return_clause
2264 .items
2265 .iter()
2266 .map(|item| eval_expr(&item.expr, &empty_vals))
2267 .collect();
2268 return Ok(QueryResult {
2269 columns: column_names,
2270 rows: vec![row],
2271 });
2272 }
2273
2274 let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2276 let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2277 let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2279 let is_var_len = m.pattern.len() == 1
2281 && m.pattern[0].rels.len() == 1
2282 && m.pattern[0].rels[0].min_hops.is_some();
2283
2284 let column_names = extract_return_column_names(&m.return_clause.items);
2285
2286 let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2289
2290 if is_var_len {
2291 self.execute_variable_length(m, &column_names)
2292 } else if is_two_hop {
2293 self.execute_two_hop(m, &column_names)
2294 } else if is_one_hop {
2295 self.execute_one_hop(m, &column_names)
2296 } else if is_n_hop {
2297 self.execute_n_hop(m, &column_names)
2298 } else if is_multi_pattern {
2299 self.execute_multi_pattern_scan(m, &column_names)
2300 } else if m.pattern[0].rels.is_empty() {
2301 self.execute_scan(m, &column_names)
2302 } else {
2303 self.execute_scan(m, &column_names)
2305 }
2306 }
2307
2308 fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
2315 use sparrowdb_common::Error;
2316
2317 let match_stmt = MatchStatement {
2319 pattern: om.pattern.clone(),
2320 where_clause: om.where_clause.clone(),
2321 return_clause: om.return_clause.clone(),
2322 order_by: om.order_by.clone(),
2323 skip: om.skip,
2324 limit: om.limit,
2325 distinct: om.distinct,
2326 };
2327
2328 let column_names = extract_return_column_names(&om.return_clause.items);
2329
2330 let result = self.execute_match(&match_stmt);
2331
2332 match result {
2333 Ok(qr) if !qr.rows.is_empty() => Ok(qr),
2334 Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
2336 let null_row = vec![Value::Null; column_names.len()];
2337 Ok(QueryResult {
2338 columns: column_names,
2339 rows: vec![null_row],
2340 })
2341 }
2342 Err(e) => Err(e),
2343 }
2344 }
2345
2346 fn execute_match_optional_match(
2354 &self,
2355 mom: &MatchOptionalMatchStatement,
2356 ) -> Result<QueryResult> {
2357 let column_names = extract_return_column_names(&mom.return_clause.items);
2358
2359 let lead_return_items: Vec<ReturnItem> = mom
2362 .return_clause
2363 .items
2364 .iter()
2365 .filter(|item| {
2366 let lead_vars: Vec<&str> = mom
2368 .match_patterns
2369 .iter()
2370 .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
2371 .collect();
2372 match &item.expr {
2373 Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
2374 Expr::Var(v) => lead_vars.contains(&v.as_str()),
2375 _ => false,
2376 }
2377 })
2378 .cloned()
2379 .collect();
2380
2381 let lead_col_names = extract_return_column_names(&lead_return_items);
2384
2385 if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
2387 let null_row = vec![Value::Null; column_names.len()];
2388 return Ok(QueryResult {
2389 columns: column_names,
2390 rows: vec![null_row],
2391 });
2392 }
2393 let lead_node_pat = &mom.match_patterns[0].nodes[0];
2394 let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
2395 let lead_label_id = match self.catalog.get_label(&lead_label)? {
2396 Some(id) => id as u32,
2397 None => {
2398 return Ok(QueryResult {
2400 columns: column_names,
2401 rows: vec![],
2402 });
2403 }
2404 };
2405
2406 let lead_all_col_ids: Vec<u32> = {
2408 let mut ids = collect_col_ids_from_columns(&lead_col_names);
2409 if let Some(ref wexpr) = mom.match_where {
2410 collect_col_ids_from_expr(wexpr, &mut ids);
2411 }
2412 for p in &lead_node_pat.props {
2413 let col_id = prop_name_to_col_id(&p.key);
2414 if !ids.contains(&col_id) {
2415 ids.push(col_id);
2416 }
2417 }
2418 ids
2419 };
2420
2421 let lead_hwm = self.store.hwm_for_label(lead_label_id)?;
2422 let lead_var = lead_node_pat.var.as_str();
2423
2424 let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
2426 for slot in 0..lead_hwm {
2427 let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
2428 if self.is_node_tombstoned(node_id) {
2431 continue;
2432 }
2433 let props = read_node_props(&self.store, node_id, &lead_all_col_ids)?;
2434 if !self.matches_prop_filter(&props, &lead_node_pat.props) {
2435 continue;
2436 }
2437 if let Some(ref wexpr) = mom.match_where {
2438 let mut row_vals = build_row_vals(&props, lead_var, &lead_all_col_ids, &self.store);
2439 row_vals.extend(self.dollar_params());
2440 if !self.eval_where_graph(wexpr, &row_vals) {
2441 continue;
2442 }
2443 }
2444 lead_rows.push((slot, props));
2445 }
2446
2447 let opt_patterns = &mom.optional_patterns;
2451
2452 let opt_vars: Vec<String> = opt_patterns
2454 .iter()
2455 .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
2456 .filter(|v| !v.is_empty())
2457 .collect();
2458
2459 let mut result_rows: Vec<Vec<Value>> = Vec::new();
2460
2461 for (lead_slot, lead_props) in &lead_rows {
2462 let lead_row_vals =
2463 build_row_vals(lead_props, lead_var, &lead_all_col_ids, &self.store);
2464
2465 let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
2470 && opt_patterns[0].rels.len() == 1
2471 && opt_patterns[0].nodes.len() == 2
2472 {
2473 let opt_pat = &opt_patterns[0];
2474 let opt_src_pat = &opt_pat.nodes[0];
2475 let opt_dst_pat = &opt_pat.nodes[1];
2476 let opt_rel_pat = &opt_pat.rels[0];
2477
2478 let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
2480 let opt_dst_label_id: Option<u32> = match self.catalog.get_label(&opt_dst_label) {
2481 Ok(Some(id)) => Some(id as u32),
2482 _ => None,
2483 };
2484
2485 self.optional_one_hop_sub_rows(
2486 *lead_slot,
2487 lead_label_id,
2488 opt_dst_label_id,
2489 opt_src_pat,
2490 opt_dst_pat,
2491 opt_rel_pat,
2492 &opt_vars,
2493 &column_names,
2494 )
2495 .unwrap_or_default()
2496 } else {
2497 vec![]
2499 };
2500
2501 if opt_sub_rows.is_empty() {
2502 let row: Vec<Value> = mom
2504 .return_clause
2505 .items
2506 .iter()
2507 .map(|item| {
2508 let v = eval_expr(&item.expr, &lead_row_vals);
2509 if v == Value::Null {
2510 match &item.expr {
2513 Expr::PropAccess { var, .. } | Expr::Var(var) => {
2514 if opt_vars.contains(var) {
2515 Value::Null
2516 } else {
2517 eval_expr(&item.expr, &lead_row_vals)
2518 }
2519 }
2520 _ => eval_expr(&item.expr, &lead_row_vals),
2521 }
2522 } else {
2523 v
2524 }
2525 })
2526 .collect();
2527 result_rows.push(row);
2528 } else {
2529 for opt_row_vals in opt_sub_rows {
2531 let mut combined = lead_row_vals.clone();
2532 combined.extend(opt_row_vals);
2533 let row: Vec<Value> = mom
2534 .return_clause
2535 .items
2536 .iter()
2537 .map(|item| eval_expr(&item.expr, &combined))
2538 .collect();
2539 result_rows.push(row);
2540 }
2541 }
2542 }
2543
2544 if mom.distinct {
2545 deduplicate_rows(&mut result_rows);
2546 }
2547 if let Some(skip) = mom.skip {
2548 let skip = (skip as usize).min(result_rows.len());
2549 result_rows.drain(0..skip);
2550 }
2551 if let Some(lim) = mom.limit {
2552 result_rows.truncate(lim as usize);
2553 }
2554
2555 Ok(QueryResult {
2556 columns: column_names,
2557 rows: result_rows,
2558 })
2559 }
2560
2561 #[allow(clippy::too_many_arguments)]
2564 fn optional_one_hop_sub_rows(
2565 &self,
2566 src_slot: u64,
2567 src_label_id: u32,
2568 dst_label_id: Option<u32>,
2569 _src_pat: &sparrowdb_cypher::ast::NodePattern,
2570 dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
2571 rel_pat: &sparrowdb_cypher::ast::RelPattern,
2572 opt_vars: &[String],
2573 column_names: &[String],
2574 ) -> Result<Vec<HashMap<String, Value>>> {
2575 let dst_label_id = match dst_label_id {
2576 Some(id) => id,
2577 None => return Ok(vec![]),
2578 };
2579
2580 let dst_var = dst_node_pat.var.as_str();
2581 let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
2582 let _ = opt_vars;
2583
2584 let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2586
2587 if matches!(rel_lookup, RelTableLookup::NotFound) {
2589 return Ok(vec![]);
2590 }
2591
2592 let delta_neighbors: Vec<u64> = {
2593 let records: Vec<DeltaRecord> = match rel_lookup {
2594 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
2595 _ => self.read_delta_all(),
2596 };
2597 records
2598 .into_iter()
2599 .filter(|r| {
2600 let r_src_label = (r.src.0 >> 32) as u32;
2601 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2602 r_src_label == src_label_id && r_src_slot == src_slot
2603 })
2604 .map(|r| r.dst.0 & 0xFFFF_FFFF)
2605 .collect()
2606 };
2607
2608 let csr_neighbors = match rel_lookup {
2609 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
2610 _ => self.csr_neighbors_all(src_slot),
2611 };
2612 let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
2613
2614 let mut seen: HashSet<u64> = HashSet::new();
2615 let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
2616
2617 for dst_slot in all_neighbors {
2618 if !seen.insert(dst_slot) {
2619 continue;
2620 }
2621 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2622 let dst_props = read_node_props(&self.store, dst_node, &col_ids_dst)?;
2623 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
2624 continue;
2625 }
2626 let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.store);
2627 sub_rows.push(row_vals);
2628 }
2629
2630 Ok(sub_rows)
2631 }
2632
2633 fn execute_multi_pattern_scan(
2642 &self,
2643 m: &MatchStatement,
2644 column_names: &[String],
2645 ) -> Result<QueryResult> {
2646 let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); for pat in &m.pattern {
2650 if pat.nodes.is_empty() {
2651 continue;
2652 }
2653 let node = &pat.nodes[0];
2654 if node.var.is_empty() {
2655 continue;
2656 }
2657 let label = node.labels.first().cloned().unwrap_or_default();
2658 let label_id = match self.catalog.get_label(&label)? {
2659 Some(id) => id as u32,
2660 None => return Ok(QueryResult::empty(column_names.to_vec())),
2661 };
2662 let filter_col_ids: Vec<u32> = node
2663 .props
2664 .iter()
2665 .map(|p| prop_name_to_col_id(&p.key))
2666 .collect();
2667 let params = self.dollar_params();
2668 let hwm = self.store.hwm_for_label(label_id)?;
2669 let mut candidates: Vec<NodeId> = Vec::new();
2670 for slot in 0..hwm {
2671 let node_id = NodeId(((label_id as u64) << 32) | slot);
2672 if self.is_node_tombstoned(node_id) {
2673 continue;
2674 }
2675 if filter_col_ids.is_empty() {
2676 candidates.push(node_id);
2677 } else if let Ok(raw_props) = self.store.get_node_raw(node_id, &filter_col_ids) {
2678 if matches_prop_filter_static(&raw_props, &node.props, ¶ms, &self.store) {
2679 candidates.push(node_id);
2680 }
2681 }
2682 }
2683 if candidates.is_empty() {
2684 return Ok(QueryResult::empty(column_names.to_vec()));
2685 }
2686 per_var.push((node.var.clone(), label_id, candidates));
2687 }
2688
2689 let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
2691 for (var, _label_id, candidates) in &per_var {
2692 let mut next: Vec<HashMap<String, Value>> = Vec::new();
2693 for base_row in &accumulated {
2694 for &node_id in candidates {
2695 let mut row = base_row.clone();
2696 row.insert(var.clone(), Value::NodeRef(node_id));
2698 row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
2699 let label_id = (node_id.0 >> 32) as u32;
2701 let label_col_ids = self.store.col_ids_for_label(label_id).unwrap_or_default();
2702 let nullable = self
2703 .store
2704 .get_node_raw_nullable(node_id, &label_col_ids)
2705 .unwrap_or_default();
2706 for &(col_id, opt_raw) in &nullable {
2707 if let Some(raw) = opt_raw {
2708 row.insert(
2709 format!("{var}.col_{col_id}"),
2710 decode_raw_val(raw, &self.store),
2711 );
2712 }
2713 }
2714 next.push(row);
2715 }
2716 }
2717 accumulated = next;
2718 }
2719
2720 if let Some(ref where_expr) = m.where_clause {
2722 accumulated.retain(|row| self.eval_where_graph(where_expr, row));
2723 }
2724
2725 let dollar_params = self.dollar_params();
2727 if !dollar_params.is_empty() {
2728 for row in &mut accumulated {
2729 row.extend(dollar_params.clone());
2730 }
2731 }
2732
2733 let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
2734
2735 apply_order_by(&mut rows, m, column_names);
2737 if let Some(skip) = m.skip {
2738 let skip = (skip as usize).min(rows.len());
2739 rows.drain(0..skip);
2740 }
2741 if let Some(limit) = m.limit {
2742 rows.truncate(limit as usize);
2743 }
2744
2745 Ok(QueryResult {
2746 columns: column_names.to_vec(),
2747 rows,
2748 })
2749 }
2750
2751 fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
2752 let pat = &m.pattern[0];
2753 let node = &pat.nodes[0];
2754
2755 if node.labels.is_empty() {
2758 return self.execute_scan_all_labels(m, column_names);
2759 }
2760
2761 let label = node.labels.first().cloned().unwrap_or_default();
2762 let label_id = match self.catalog.get_label(&label)? {
2764 Some(id) => id as u32,
2765 None => {
2766 return Ok(QueryResult {
2767 columns: column_names.to_vec(),
2768 rows: vec![],
2769 })
2770 }
2771 };
2772 let label_id_u32 = label_id;
2773
2774 let hwm = self.store.hwm_for_label(label_id_u32)?;
2775 tracing::debug!(label = %label, hwm = hwm, "node scan start");
2776
2777 let col_ids = collect_col_ids_from_columns(column_names);
2780 let mut all_col_ids: Vec<u32> = col_ids.clone();
2781 if let Some(ref where_expr) = m.where_clause {
2783 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2784 }
2785 for p in &node.props {
2787 let col_id = prop_name_to_col_id(&p.key);
2788 if !all_col_ids.contains(&col_id) {
2789 all_col_ids.push(col_id);
2790 }
2791 }
2792
2793 let use_agg = has_aggregate_in_return(&m.return_clause.items);
2794 let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2800 if use_eval_path {
2801 for item in &m.return_clause.items {
2806 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2807 }
2808 }
2809
2810 let bare_vars = bare_var_names_in_return(&m.return_clause.items);
2813 let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
2814 self.store.col_ids_for_label(label_id_u32)?
2815 } else {
2816 vec![]
2817 };
2818
2819 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2820 let mut rows: Vec<Vec<Value>> = Vec::new();
2821
2822 for p in &node.props {
2827 let col_id = sparrowdb_common::col_id_of(&p.key);
2828 let _ = self
2830 .prop_index
2831 .borrow_mut()
2832 .build_for(&self.store, label_id_u32, col_id);
2833 }
2834
2835 let index_candidate_slots: Option<Vec<u32>> = {
2840 let prop_index_ref = self.prop_index.borrow();
2841 try_index_lookup_for_props(&node.props, label_id_u32, &prop_index_ref)
2842 };
2843
2844 if index_candidate_slots.is_none() {
2852 if let Some(wexpr) = m.where_clause.as_ref() {
2853 for prop_name in where_clause_eq_prop_names(wexpr, node.var.as_str()) {
2854 let col_id = sparrowdb_common::col_id_of(prop_name);
2855 let _ =
2856 self.prop_index
2857 .borrow_mut()
2858 .build_for(&self.store, label_id_u32, col_id);
2859 }
2860 }
2861 }
2862 let where_eq_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
2863 let prop_index_ref = self.prop_index.borrow();
2864 m.where_clause.as_ref().and_then(|wexpr| {
2865 try_where_eq_index_lookup(wexpr, node.var.as_str(), label_id_u32, &prop_index_ref)
2866 })
2867 } else {
2868 None
2869 };
2870
2871 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
2877 if let Some(wexpr) = m.where_clause.as_ref() {
2878 for prop_name in where_clause_range_prop_names(wexpr, node.var.as_str()) {
2879 let col_id = sparrowdb_common::col_id_of(prop_name);
2880 let _ =
2881 self.prop_index
2882 .borrow_mut()
2883 .build_for(&self.store, label_id_u32, col_id);
2884 }
2885 }
2886 }
2887 let where_range_candidate_slots: Option<Vec<u32>> =
2888 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
2889 let prop_index_ref = self.prop_index.borrow();
2890 m.where_clause.as_ref().and_then(|wexpr| {
2891 try_where_range_index_lookup(
2892 wexpr,
2893 node.var.as_str(),
2894 label_id_u32,
2895 &prop_index_ref,
2896 )
2897 })
2898 } else {
2899 None
2900 };
2901
2902 if index_candidate_slots.is_none()
2913 && where_eq_candidate_slots.is_none()
2914 && where_range_candidate_slots.is_none()
2915 {
2916 if let Some(wexpr) = m.where_clause.as_ref() {
2917 for prop_name in where_clause_text_prop_names(wexpr, node.var.as_str()) {
2918 let col_id = sparrowdb_common::col_id_of(prop_name);
2919 self.text_index
2920 .borrow_mut()
2921 .build_for(&self.store, label_id_u32, col_id);
2922 }
2923 }
2924 }
2925 let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none()
2926 && where_eq_candidate_slots.is_none()
2927 && where_range_candidate_slots.is_none()
2928 {
2929 m.where_clause.as_ref().and_then(|wexpr| {
2930 let text_index_ref = self.text_index.borrow();
2931 try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &text_index_ref)
2932 })
2933 } else {
2934 None
2935 };
2936
2937 let slot_iter: Box<dyn Iterator<Item = u64>> =
2941 if let Some(ref slots) = index_candidate_slots {
2942 tracing::debug!(
2943 label = %label,
2944 candidates = slots.len(),
2945 "SPA-249: property index fast path"
2946 );
2947 Box::new(slots.iter().map(|&s| s as u64))
2948 } else if let Some(ref slots) = where_eq_candidate_slots {
2949 tracing::debug!(
2950 label = %label,
2951 candidates = slots.len(),
2952 "SPA-249 Phase 1b: WHERE equality index fast path"
2953 );
2954 Box::new(slots.iter().map(|&s| s as u64))
2955 } else if let Some(ref slots) = where_range_candidate_slots {
2956 tracing::debug!(
2957 label = %label,
2958 candidates = slots.len(),
2959 "SPA-249 Phase 2: WHERE range index fast path"
2960 );
2961 Box::new(slots.iter().map(|&s| s as u64))
2962 } else if let Some(ref slots) = text_candidate_slots {
2963 tracing::debug!(
2964 label = %label,
2965 candidates = slots.len(),
2966 "SPA-251: text index fast path"
2967 );
2968 Box::new(slots.iter().map(|&s| s as u64))
2969 } else {
2970 Box::new(0..hwm)
2971 };
2972
2973 for slot in slot_iter {
2974 self.check_deadline()?;
2976
2977 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2978 if slot < 1024 || slot % 10_000 == 0 {
2979 tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
2980 }
2981
2982 if self.is_node_tombstoned(node_id) {
2990 continue;
2991 }
2992
2993 let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2998 let props: Vec<(u32, u64)> = nullable_props
2999 .iter()
3000 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3001 .collect();
3002
3003 if !self.matches_prop_filter(&props, &node.props) {
3005 continue;
3006 }
3007
3008 let var_name = node.var.as_str();
3010 if let Some(ref where_expr) = m.where_clause {
3011 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
3012 if !var_name.is_empty() && !label.is_empty() {
3014 row_vals.insert(
3015 format!("{}.__labels__", var_name),
3016 Value::List(vec![Value::String(label.clone())]),
3017 );
3018 }
3019 if !var_name.is_empty() {
3021 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3022 }
3023 row_vals.extend(self.dollar_params());
3025 if !self.eval_where_graph(where_expr, &row_vals) {
3026 continue;
3027 }
3028 }
3029
3030 if use_eval_path {
3031 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
3033 if !var_name.is_empty() && !label.is_empty() {
3035 row_vals.insert(
3036 format!("{}.__labels__", var_name),
3037 Value::List(vec![Value::String(label.clone())]),
3038 );
3039 }
3040 if !var_name.is_empty() {
3041 if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
3045 let all_nullable = self
3046 .store
3047 .get_node_raw_nullable(node_id, &all_label_col_ids)?;
3048 let all_props: Vec<(u32, u64)> = all_nullable
3049 .iter()
3050 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3051 .collect();
3052 row_vals.insert(
3053 var_name.to_string(),
3054 build_node_map(&all_props, &self.store),
3055 );
3056 } else {
3057 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3058 }
3059 row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
3062 }
3063 raw_rows.push(row_vals);
3064 } else {
3065 let row = project_row(
3067 &props,
3068 column_names,
3069 &all_col_ids,
3070 var_name,
3071 &label,
3072 &self.store,
3073 );
3074 rows.push(row);
3075 }
3076 }
3077
3078 if use_eval_path {
3079 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3080 } else {
3081 if m.distinct {
3082 deduplicate_rows(&mut rows);
3083 }
3084
3085 apply_order_by(&mut rows, m, column_names);
3087
3088 if let Some(skip) = m.skip {
3090 let skip = (skip as usize).min(rows.len());
3091 rows.drain(0..skip);
3092 }
3093
3094 if let Some(lim) = m.limit {
3096 rows.truncate(lim as usize);
3097 }
3098 }
3099
3100 tracing::debug!(rows = rows.len(), "node scan complete");
3101 Ok(QueryResult {
3102 columns: column_names.to_vec(),
3103 rows,
3104 })
3105 }
3106
3107 fn execute_scan_all_labels(
3116 &self,
3117 m: &MatchStatement,
3118 column_names: &[String],
3119 ) -> Result<QueryResult> {
3120 let all_labels = self.catalog.list_labels()?;
3121 tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
3122
3123 let pat = &m.pattern[0];
3124 let node = &pat.nodes[0];
3125 let var_name = node.var.as_str();
3126
3127 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
3129 if let Some(ref where_expr) = m.where_clause {
3130 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
3131 }
3132 for p in &node.props {
3133 let col_id = prop_name_to_col_id(&p.key);
3134 if !all_col_ids.contains(&col_id) {
3135 all_col_ids.push(col_id);
3136 }
3137 }
3138
3139 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3140 let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
3142 if use_eval_path_all {
3143 for item in &m.return_clause.items {
3144 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
3145 }
3146 }
3147
3148 let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
3150
3151 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3152 let mut rows: Vec<Vec<Value>> = Vec::new();
3153
3154 for (label_id, label_name) in &all_labels {
3155 let label_id_u32 = *label_id as u32;
3156 let hwm = self.store.hwm_for_label(label_id_u32)?;
3157 tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
3158
3159 let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
3161 self.store.col_ids_for_label(label_id_u32)?
3162 } else {
3163 vec![]
3164 };
3165
3166 for slot in 0..hwm {
3167 self.check_deadline()?;
3169
3170 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
3171
3172 if self.is_node_tombstoned(node_id) {
3176 continue;
3177 }
3178
3179 let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
3180 let props: Vec<(u32, u64)> = nullable_props
3181 .iter()
3182 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3183 .collect();
3184
3185 if !self.matches_prop_filter(&props, &node.props) {
3187 continue;
3188 }
3189
3190 if let Some(ref where_expr) = m.where_clause {
3192 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
3193 if !var_name.is_empty() {
3194 row_vals.insert(
3195 format!("{}.__labels__", var_name),
3196 Value::List(vec![Value::String(label_name.clone())]),
3197 );
3198 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3199 }
3200 row_vals.extend(self.dollar_params());
3201 if !self.eval_where_graph(where_expr, &row_vals) {
3202 continue;
3203 }
3204 }
3205
3206 if use_eval_path_all {
3207 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
3208 if !var_name.is_empty() {
3209 row_vals.insert(
3210 format!("{}.__labels__", var_name),
3211 Value::List(vec![Value::String(label_name.clone())]),
3212 );
3213 if bare_vars_all.contains(&var_name.to_string())
3215 && !all_label_col_ids_here.is_empty()
3216 {
3217 let all_nullable = self
3218 .store
3219 .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
3220 let all_props: Vec<(u32, u64)> = all_nullable
3221 .iter()
3222 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
3223 .collect();
3224 row_vals.insert(
3225 var_name.to_string(),
3226 build_node_map(&all_props, &self.store),
3227 );
3228 } else {
3229 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
3230 }
3231 row_vals
3232 .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
3233 }
3234 raw_rows.push(row_vals);
3235 } else {
3236 let row = project_row(
3237 &props,
3238 column_names,
3239 &all_col_ids,
3240 var_name,
3241 label_name,
3242 &self.store,
3243 );
3244 rows.push(row);
3245 }
3246 }
3247 }
3248
3249 if use_eval_path_all {
3250 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3251 }
3252
3253 if m.distinct {
3256 deduplicate_rows(&mut rows);
3257 }
3258 apply_order_by(&mut rows, m, column_names);
3259 if let Some(skip) = m.skip {
3260 let skip = (skip as usize).min(rows.len());
3261 rows.drain(0..skip);
3262 }
3263 if let Some(lim) = m.limit {
3264 rows.truncate(lim as usize);
3265 }
3266
3267 tracing::debug!(rows = rows.len(), "label-less full scan complete");
3268 Ok(QueryResult {
3269 columns: column_names.to_vec(),
3270 rows,
3271 })
3272 }
3273
3274 fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3277 let pat = &m.pattern[0];
3278 let src_node_pat = &pat.nodes[0];
3279 let dst_node_pat = &pat.nodes[1];
3280 let rel_pat = &pat.rels[0];
3281
3282 let dir = &rel_pat.dir;
3283 use sparrowdb_cypher::ast::EdgeDir;
3289
3290 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3291 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
3292 let src_label_id_opt: Option<u32> = if src_label.is_empty() {
3294 None
3295 } else {
3296 self.catalog.get_label(&src_label)?.map(|id| id as u32)
3297 };
3298 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
3299 None
3300 } else {
3301 self.catalog.get_label(&dst_label)?.map(|id| id as u32)
3302 };
3303
3304 let all_rel_tables = self.catalog.list_rel_tables_with_ids();
3316 let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
3317 .into_iter()
3318 .filter(|(_, sid, did, rt)| {
3319 let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
3320 let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
3321 let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
3322 type_ok && src_ok && dst_ok
3323 })
3324 .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
3325 .collect();
3326
3327 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3328 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3329 let mut rows: Vec<Vec<Value>> = Vec::new();
3330 let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
3333
3334 let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
3336 self.catalog.list_labels().unwrap_or_default()
3337 } else {
3338 vec![]
3339 };
3340
3341 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3343 &rel_tables_to_scan
3344 {
3345 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3346 let effective_src_label_id = *tbl_src_label_id;
3347 let effective_dst_label_id = *tbl_dst_label_id;
3348
3349 let effective_rel_type: &str = tbl_rel_type.as_str();
3352
3353 let effective_src_label: &str = if src_label.is_empty() {
3355 label_id_to_name
3356 .iter()
3357 .find(|(id, _)| *id as u32 == effective_src_label_id)
3358 .map(|(_, name)| name.as_str())
3359 .unwrap_or("")
3360 } else {
3361 src_label.as_str()
3362 };
3363 let effective_dst_label: &str = if dst_label.is_empty() {
3364 label_id_to_name
3365 .iter()
3366 .find(|(id, _)| *id as u32 == effective_dst_label_id)
3367 .map(|(_, name)| name.as_str())
3368 .unwrap_or("")
3369 } else {
3370 dst_label.as_str()
3371 };
3372
3373 let hwm_src = match self.store.hwm_for_label(effective_src_label_id) {
3374 Ok(h) => h,
3375 Err(_) => continue,
3376 };
3377 tracing::debug!(
3378 src_label = %effective_src_label,
3379 dst_label = %effective_dst_label,
3380 rel_type = %effective_rel_type,
3381 hwm_src = hwm_src,
3382 "one-hop traversal start"
3383 );
3384
3385 let mut col_ids_src =
3386 collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
3387 let mut col_ids_dst =
3388 collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
3389 if use_agg {
3390 for item in &m.return_clause.items {
3391 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3392 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3393 }
3394 }
3395 if let Some(ref where_expr) = m.where_clause {
3397 collect_col_ids_from_expr(where_expr, &mut col_ids_src);
3398 collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
3399 }
3400
3401 let delta_records_all = {
3404 let edge_store = EdgeStore::open(&self.db_root, storage_rel_id);
3405 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
3406 };
3407
3408 for src_slot in 0..hwm_src {
3410 self.check_deadline()?;
3412
3413 let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
3414 let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3415 let all_needed: Vec<u32> = {
3416 let mut v = col_ids_src.clone();
3417 for p in &src_node_pat.props {
3418 let col_id = prop_name_to_col_id(&p.key);
3419 if !v.contains(&col_id) {
3420 v.push(col_id);
3421 }
3422 }
3423 v
3424 };
3425 self.store.get_node_raw(src_node, &all_needed)?
3426 } else {
3427 vec![]
3428 };
3429
3430 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3432 continue;
3433 }
3434
3435 let delta_neighbors: Vec<u64> = delta_records_all
3438 .iter()
3439 .filter(|r| {
3440 let r_src_label = (r.src.0 >> 32) as u32;
3441 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3442 r_src_label == effective_src_label_id && r_src_slot == src_slot
3443 })
3444 .map(|r| r.dst.0 & 0xFFFF_FFFF)
3445 .collect();
3446
3447 let csr_neighbors: &[u64] = self
3451 .csrs
3452 .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
3453 .map(|c| c.neighbors(src_slot))
3454 .unwrap_or(&[]);
3455 let all_neighbors: Vec<u64> = csr_neighbors
3456 .iter()
3457 .copied()
3458 .chain(delta_neighbors.into_iter())
3459 .collect();
3460 let mut seen_neighbors: HashSet<u64> = HashSet::new();
3461 for &dst_slot in &all_neighbors {
3462 if !seen_neighbors.insert(dst_slot) {
3463 continue;
3464 }
3465 if *dir == EdgeDir::Both {
3468 seen_undirected.insert((src_slot, dst_slot));
3469 }
3470 let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
3471 let dst_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3472 let all_needed: Vec<u32> = {
3473 let mut v = col_ids_dst.clone();
3474 for p in &dst_node_pat.props {
3475 let col_id = prop_name_to_col_id(&p.key);
3476 if !v.contains(&col_id) {
3477 v.push(col_id);
3478 }
3479 }
3480 v
3481 };
3482 self.store.get_node_raw(dst_node, &all_needed)?
3483 } else {
3484 vec![]
3485 };
3486
3487 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3489 continue;
3490 }
3491
3492 if *dir == EdgeDir::Both {
3495 seen_undirected.insert((src_slot, dst_slot));
3496 }
3497
3498 if let Some(ref where_expr) = m.where_clause {
3500 let mut row_vals = build_row_vals(
3501 &src_props,
3502 &src_node_pat.var,
3503 &col_ids_src,
3504 &self.store,
3505 );
3506 row_vals.extend(build_row_vals(
3507 &dst_props,
3508 &dst_node_pat.var,
3509 &col_ids_dst,
3510 &self.store,
3511 ));
3512 if !rel_pat.var.is_empty() {
3514 row_vals.insert(
3515 format!("{}.__type__", rel_pat.var),
3516 Value::String(effective_rel_type.to_string()),
3517 );
3518 }
3519 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3521 row_vals.insert(
3522 format!("{}.__labels__", src_node_pat.var),
3523 Value::List(vec![Value::String(effective_src_label.to_string())]),
3524 );
3525 }
3526 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3527 row_vals.insert(
3528 format!("{}.__labels__", dst_node_pat.var),
3529 Value::List(vec![Value::String(effective_dst_label.to_string())]),
3530 );
3531 }
3532 row_vals.extend(self.dollar_params());
3533 if !self.eval_where_graph(where_expr, &row_vals) {
3534 continue;
3535 }
3536 }
3537
3538 if use_agg {
3539 let mut row_vals = build_row_vals(
3540 &src_props,
3541 &src_node_pat.var,
3542 &col_ids_src,
3543 &self.store,
3544 );
3545 row_vals.extend(build_row_vals(
3546 &dst_props,
3547 &dst_node_pat.var,
3548 &col_ids_dst,
3549 &self.store,
3550 ));
3551 if !rel_pat.var.is_empty() {
3553 row_vals.insert(
3554 format!("{}.__type__", rel_pat.var),
3555 Value::String(effective_rel_type.to_string()),
3556 );
3557 }
3558 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3559 row_vals.insert(
3560 format!("{}.__labels__", src_node_pat.var),
3561 Value::List(vec![Value::String(effective_src_label.to_string())]),
3562 );
3563 }
3564 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3565 row_vals.insert(
3566 format!("{}.__labels__", dst_node_pat.var),
3567 Value::List(vec![Value::String(effective_dst_label.to_string())]),
3568 );
3569 }
3570 if !src_node_pat.var.is_empty() {
3571 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
3572 }
3573 if !dst_node_pat.var.is_empty() {
3574 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
3575 }
3576 if !rel_pat.var.is_empty() {
3579 let edge_id = sparrowdb_common::EdgeId(
3585 (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
3586 );
3587 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3588 }
3589 raw_rows.push(row_vals);
3590 } else {
3591 let rel_var_type = if !rel_pat.var.is_empty() {
3596 Some((rel_pat.var.as_str(), effective_rel_type))
3597 } else {
3598 None
3599 };
3600 let src_label_meta =
3601 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3602 Some((src_node_pat.var.as_str(), effective_src_label))
3603 } else {
3604 None
3605 };
3606 let dst_label_meta =
3607 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3608 Some((dst_node_pat.var.as_str(), effective_dst_label))
3609 } else {
3610 None
3611 };
3612 let row = project_hop_row(
3613 &src_props,
3614 &dst_props,
3615 column_names,
3616 &src_node_pat.var,
3617 &dst_node_pat.var,
3618 rel_var_type,
3619 src_label_meta,
3620 dst_label_meta,
3621 &self.store,
3622 );
3623 rows.push(row);
3624 }
3625 }
3626 }
3627 }
3628
3629 if *dir == EdgeDir::Both {
3634 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3635 &rel_tables_to_scan
3636 {
3637 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3638 let bwd_scan_label_id = *tbl_dst_label_id;
3640 let bwd_dst_label_id = *tbl_src_label_id;
3641 let effective_rel_type: &str = tbl_rel_type.as_str();
3642
3643 let effective_src_label: &str = if src_label.is_empty() {
3644 label_id_to_name
3645 .iter()
3646 .find(|(id, _)| *id as u32 == bwd_scan_label_id)
3647 .map(|(_, name)| name.as_str())
3648 .unwrap_or("")
3649 } else {
3650 src_label.as_str()
3651 };
3652 let effective_dst_label: &str = if dst_label.is_empty() {
3653 label_id_to_name
3654 .iter()
3655 .find(|(id, _)| *id as u32 == bwd_dst_label_id)
3656 .map(|(_, name)| name.as_str())
3657 .unwrap_or("")
3658 } else {
3659 dst_label.as_str()
3660 };
3661
3662 let hwm_bwd = match self.store.hwm_for_label(bwd_scan_label_id) {
3663 Ok(h) => h,
3664 Err(_) => continue,
3665 };
3666
3667 let mut col_ids_src =
3668 collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
3669 let mut col_ids_dst =
3670 collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
3671 if use_agg {
3672 for item in &m.return_clause.items {
3673 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3674 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3675 }
3676 }
3677
3678 let delta_records_bwd = EdgeStore::open(&self.db_root, storage_rel_id)
3681 .and_then(|s| s.read_delta())
3682 .unwrap_or_default();
3683
3684 let csr_bwd: Option<CsrBackward> = EdgeStore::open(&self.db_root, storage_rel_id)
3689 .and_then(|s| s.open_bwd())
3690 .ok();
3691
3692 for b_slot in 0..hwm_bwd {
3694 let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
3695 let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3696 let all_needed: Vec<u32> = {
3697 let mut v = col_ids_src.clone();
3698 for p in &src_node_pat.props {
3699 let col_id = prop_name_to_col_id(&p.key);
3700 if !v.contains(&col_id) {
3701 v.push(col_id);
3702 }
3703 }
3704 v
3705 };
3706 self.store.get_node_raw(b_node, &all_needed)?
3707 } else {
3708 vec![]
3709 };
3710 if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
3715 continue;
3716 }
3717
3718 let delta_predecessors: Vec<u64> = delta_records_bwd
3721 .iter()
3722 .filter(|r| {
3723 let r_dst_label = (r.dst.0 >> 32) as u32;
3724 let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
3725 r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
3726 })
3727 .map(|r| r.src.0 & 0xFFFF_FFFF)
3728 .collect();
3729
3730 let csr_predecessors: &[u64] = csr_bwd
3736 .as_ref()
3737 .map(|c| c.predecessors(b_slot))
3738 .unwrap_or(&[]);
3739 let all_predecessors: Vec<u64> = csr_predecessors
3740 .iter()
3741 .copied()
3742 .chain(delta_predecessors.into_iter())
3743 .collect();
3744
3745 let mut seen_preds: HashSet<u64> = HashSet::new();
3746 for a_slot in all_predecessors {
3747 if !seen_preds.insert(a_slot) {
3748 continue;
3749 }
3750 if seen_undirected.contains(&(b_slot, a_slot)) {
3760 continue;
3761 }
3762
3763 let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
3764 let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3765 let all_needed: Vec<u32> = {
3766 let mut v = col_ids_dst.clone();
3767 for p in &dst_node_pat.props {
3768 let col_id = prop_name_to_col_id(&p.key);
3769 if !v.contains(&col_id) {
3770 v.push(col_id);
3771 }
3772 }
3773 v
3774 };
3775 self.store.get_node_raw(a_node, &all_needed)?
3776 } else {
3777 vec![]
3778 };
3779
3780 if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
3781 continue;
3782 }
3783
3784 if let Some(ref where_expr) = m.where_clause {
3786 let mut row_vals = build_row_vals(
3787 &b_props,
3788 &src_node_pat.var,
3789 &col_ids_src,
3790 &self.store,
3791 );
3792 row_vals.extend(build_row_vals(
3793 &a_props,
3794 &dst_node_pat.var,
3795 &col_ids_dst,
3796 &self.store,
3797 ));
3798 if !rel_pat.var.is_empty() {
3799 row_vals.insert(
3800 format!("{}.__type__", rel_pat.var),
3801 Value::String(effective_rel_type.to_string()),
3802 );
3803 }
3804 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3805 row_vals.insert(
3806 format!("{}.__labels__", src_node_pat.var),
3807 Value::List(vec![Value::String(
3808 effective_src_label.to_string(),
3809 )]),
3810 );
3811 }
3812 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3813 row_vals.insert(
3814 format!("{}.__labels__", dst_node_pat.var),
3815 Value::List(vec![Value::String(
3816 effective_dst_label.to_string(),
3817 )]),
3818 );
3819 }
3820 row_vals.extend(self.dollar_params());
3821 if !self.eval_where_graph(where_expr, &row_vals) {
3822 continue;
3823 }
3824 }
3825
3826 if use_agg {
3827 let mut row_vals = build_row_vals(
3828 &b_props,
3829 &src_node_pat.var,
3830 &col_ids_src,
3831 &self.store,
3832 );
3833 row_vals.extend(build_row_vals(
3834 &a_props,
3835 &dst_node_pat.var,
3836 &col_ids_dst,
3837 &self.store,
3838 ));
3839 if !rel_pat.var.is_empty() {
3840 row_vals.insert(
3841 format!("{}.__type__", rel_pat.var),
3842 Value::String(effective_rel_type.to_string()),
3843 );
3844 }
3845 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3846 row_vals.insert(
3847 format!("{}.__labels__", src_node_pat.var),
3848 Value::List(vec![Value::String(
3849 effective_src_label.to_string(),
3850 )]),
3851 );
3852 }
3853 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3854 row_vals.insert(
3855 format!("{}.__labels__", dst_node_pat.var),
3856 Value::List(vec![Value::String(
3857 effective_dst_label.to_string(),
3858 )]),
3859 );
3860 }
3861 if !src_node_pat.var.is_empty() {
3862 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
3863 }
3864 if !dst_node_pat.var.is_empty() {
3865 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
3866 }
3867 if !rel_pat.var.is_empty() {
3870 let edge_id = sparrowdb_common::EdgeId(
3871 (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
3872 );
3873 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3874 }
3875 raw_rows.push(row_vals);
3876 } else {
3877 let rel_var_type = if !rel_pat.var.is_empty() {
3878 Some((rel_pat.var.as_str(), effective_rel_type))
3879 } else {
3880 None
3881 };
3882 let src_label_meta = if !src_node_pat.var.is_empty()
3883 && !effective_src_label.is_empty()
3884 {
3885 Some((src_node_pat.var.as_str(), effective_src_label))
3886 } else {
3887 None
3888 };
3889 let dst_label_meta = if !dst_node_pat.var.is_empty()
3890 && !effective_dst_label.is_empty()
3891 {
3892 Some((dst_node_pat.var.as_str(), effective_dst_label))
3893 } else {
3894 None
3895 };
3896 let row = project_hop_row(
3897 &b_props,
3898 &a_props,
3899 column_names,
3900 &src_node_pat.var,
3901 &dst_node_pat.var,
3902 rel_var_type,
3903 src_label_meta,
3904 dst_label_meta,
3905 &self.store,
3906 );
3907 rows.push(row);
3908 }
3909 }
3910 }
3911 }
3912 }
3913
3914 if use_agg {
3915 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3916 } else {
3917 if m.distinct {
3919 deduplicate_rows(&mut rows);
3920 }
3921
3922 apply_order_by(&mut rows, m, column_names);
3924
3925 if let Some(skip) = m.skip {
3927 let skip = (skip as usize).min(rows.len());
3928 rows.drain(0..skip);
3929 }
3930
3931 if let Some(lim) = m.limit {
3933 rows.truncate(lim as usize);
3934 }
3935 }
3936
3937 tracing::debug!(rows = rows.len(), "one-hop traversal complete");
3938 Ok(QueryResult {
3939 columns: column_names.to_vec(),
3940 rows,
3941 })
3942 }
3943
3944 fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3947 use crate::join::AspJoin;
3948
3949 let pat = &m.pattern[0];
3950 let src_node_pat = &pat.nodes[0];
3951 let fof_node_pat = &pat.nodes[2];
3953
3954 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3955 let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
3956 let src_label_id = self
3957 .catalog
3958 .get_label(&src_label)?
3959 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3960 let fof_label_id = self
3961 .catalog
3962 .get_label(&fof_label)?
3963 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3964
3965 let hwm_src = self.store.hwm_for_label(src_label_id)?;
3966 tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
3967
3968 let col_ids_fof = {
3972 let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
3973 for p in &fof_node_pat.props {
3974 let col_id = prop_name_to_col_id(&p.key);
3975 if !ids.contains(&col_id) {
3976 ids.push(col_id);
3977 }
3978 }
3979 if let Some(ref where_expr) = m.where_clause {
3980 collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
3981 }
3982 ids
3983 };
3984
3985 let col_ids_src_where: Vec<u32> = {
3990 let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
3991 if let Some(ref where_expr) = m.where_clause {
3992 collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
3993 }
3994 ids
3995 };
3996
3997 let delta_adj: HashMap<u64, Vec<u64>> = {
4003 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
4004 for r in self.read_delta_all() {
4005 let r_src_label = (r.src.0 >> 32) as u32;
4006 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4007 if r_src_label == src_label_id {
4008 adj.entry(r_src_slot)
4009 .or_default()
4010 .push(r.dst.0 & 0xFFFF_FFFF);
4011 }
4012 }
4013 adj
4014 };
4015
4016 let merged_csr = {
4021 let max_nodes = self.csrs.values().map(|c| c.n_nodes()).max().unwrap_or(0);
4022 let mut edges: Vec<(u64, u64)> = Vec::new();
4023 for csr in self.csrs.values() {
4024 for src in 0..csr.n_nodes() {
4025 for &dst in csr.neighbors(src) {
4026 edges.push((src, dst));
4027 }
4028 }
4029 }
4030 edges.sort_unstable();
4032 edges.dedup();
4033 CsrForward::build(max_nodes, &edges)
4034 };
4035 let join = AspJoin::new(&merged_csr);
4036 let mut rows = Vec::new();
4037
4038 for src_slot in 0..hwm_src {
4040 self.check_deadline()?;
4042
4043 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4044 let src_needed: Vec<u32> = {
4045 let mut v = vec![];
4046 for p in &src_node_pat.props {
4047 let col_id = prop_name_to_col_id(&p.key);
4048 if !v.contains(&col_id) {
4049 v.push(col_id);
4050 }
4051 }
4052 for &col_id in &col_ids_src_where {
4053 if !v.contains(&col_id) {
4054 v.push(col_id);
4055 }
4056 }
4057 v
4058 };
4059
4060 let src_props = read_node_props(&self.store, src_node, &src_needed)?;
4061
4062 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4064 continue;
4065 }
4066
4067 let mut fof_slots = join.two_hop(src_slot)?;
4069
4070 let first_hop_delta = delta_adj
4073 .get(&src_slot)
4074 .map(|v| v.as_slice())
4075 .unwrap_or(&[]);
4076 if !first_hop_delta.is_empty() {
4077 let mut delta_fof: HashSet<u64> = HashSet::new();
4078 for &mid_slot in first_hop_delta {
4079 for &fof in merged_csr.neighbors(mid_slot) {
4081 delta_fof.insert(fof);
4082 }
4083 if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
4085 for &fof in mid_neighbors {
4086 delta_fof.insert(fof);
4087 }
4088 }
4089 }
4090 fof_slots.extend(delta_fof);
4091 let unique: HashSet<u64> = fof_slots.into_iter().collect();
4093 fof_slots = unique.into_iter().collect();
4094 fof_slots.sort_unstable();
4095 }
4096
4097 for fof_slot in fof_slots {
4098 let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
4099 let fof_props = read_node_props(&self.store, fof_node, &col_ids_fof)?;
4100
4101 if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
4103 continue;
4104 }
4105
4106 if let Some(ref where_expr) = m.where_clause {
4108 let mut row_vals = build_row_vals(
4109 &src_props,
4110 &src_node_pat.var,
4111 &col_ids_src_where,
4112 &self.store,
4113 );
4114 row_vals.extend(build_row_vals(
4115 &fof_props,
4116 &fof_node_pat.var,
4117 &col_ids_fof,
4118 &self.store,
4119 ));
4120 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4122 row_vals.insert(
4123 format!("{}.__labels__", src_node_pat.var),
4124 Value::List(vec![Value::String(src_label.clone())]),
4125 );
4126 }
4127 if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
4128 row_vals.insert(
4129 format!("{}.__labels__", fof_node_pat.var),
4130 Value::List(vec![Value::String(fof_label.clone())]),
4131 );
4132 }
4133 if !pat.rels[0].var.is_empty() {
4135 row_vals.insert(
4136 format!("{}.__type__", pat.rels[0].var),
4137 Value::String(pat.rels[0].rel_type.clone()),
4138 );
4139 }
4140 if !pat.rels[1].var.is_empty() {
4141 row_vals.insert(
4142 format!("{}.__type__", pat.rels[1].var),
4143 Value::String(pat.rels[1].rel_type.clone()),
4144 );
4145 }
4146 row_vals.extend(self.dollar_params());
4147 if !self.eval_where_graph(where_expr, &row_vals) {
4148 continue;
4149 }
4150 }
4151
4152 let row = project_fof_row(
4153 &src_props,
4154 &fof_props,
4155 column_names,
4156 &src_node_pat.var,
4157 &self.store,
4158 );
4159 rows.push(row);
4160 }
4161 }
4162
4163 if m.distinct {
4165 deduplicate_rows(&mut rows);
4166 }
4167
4168 apply_order_by(&mut rows, m, column_names);
4170
4171 if let Some(skip) = m.skip {
4173 let skip = (skip as usize).min(rows.len());
4174 rows.drain(0..skip);
4175 }
4176
4177 if let Some(lim) = m.limit {
4179 rows.truncate(lim as usize);
4180 }
4181
4182 tracing::debug!(rows = rows.len(), "two-hop traversal complete");
4183 Ok(QueryResult {
4184 columns: column_names.to_vec(),
4185 rows,
4186 })
4187 }
4188
4189 fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
4204 let pat = &m.pattern[0];
4205 let n_nodes = pat.nodes.len();
4206 let n_rels = pat.rels.len();
4207
4208 if n_nodes != n_rels + 1 {
4210 return Err(sparrowdb_common::Error::Unimplemented);
4211 }
4212
4213 let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
4216 .map(|i| {
4217 let node_pat = &pat.nodes[i];
4218 let var = &node_pat.var;
4219 let mut ids = if var.is_empty() {
4220 vec![]
4221 } else {
4222 collect_col_ids_for_var(var, column_names, 0)
4223 };
4224 if let Some(ref where_expr) = m.where_clause {
4226 if !var.is_empty() {
4227 collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
4228 }
4229 }
4230 for p in &node_pat.props {
4232 let col_id = prop_name_to_col_id(&p.key);
4233 if !ids.contains(&col_id) {
4234 ids.push(col_id);
4235 }
4236 }
4237 if ids.is_empty() {
4239 ids.push(0);
4240 }
4241 ids
4242 })
4243 .collect();
4244
4245 let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
4247 .map(|i| {
4248 let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
4249 if label.is_empty() {
4250 None
4251 } else {
4252 self.catalog
4253 .get_label(&label)
4254 .ok()
4255 .flatten()
4256 .map(|id| id as u32)
4257 }
4258 })
4259 .collect();
4260
4261 let src_label_id = match label_ids_per_node[0] {
4263 Some(id) => id,
4264 None => return Err(sparrowdb_common::Error::Unimplemented),
4265 };
4266 let hwm_src = self.store.hwm_for_label(src_label_id)?;
4267
4268 let delta_all = self.read_delta_all();
4270
4271 let mut rows: Vec<Vec<Value>> = Vec::new();
4272
4273 for src_slot in 0..hwm_src {
4274 self.check_deadline()?;
4276
4277 let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);
4278
4279 if self.is_node_tombstoned(src_node_id) {
4281 continue;
4282 }
4283
4284 let src_props = read_node_props(&self.store, src_node_id, &col_ids_per_node[0])?;
4285
4286 if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
4288 continue;
4289 }
4290
4291 let mut row_vals: HashMap<String, Value> = HashMap::new();
4293 if !pat.nodes[0].var.is_empty() {
4294 for &(col_id, raw) in &src_props {
4295 let key = format!("{}.col_{col_id}", pat.nodes[0].var);
4296 row_vals.insert(key, decode_raw_val(raw, &self.store));
4297 }
4298 }
4299
4300 let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];
4304
4305 for hop_idx in 0..n_rels {
4306 let next_node_pat = &pat.nodes[hop_idx + 1];
4307 let next_label_id_opt = label_ids_per_node[hop_idx + 1];
4308 let next_col_ids = &col_ids_per_node[hop_idx + 1];
4309 let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);
4310
4311 let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();
4312
4313 for (cur_slot, cur_vals) in frontier {
4314 let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
4316 let delta_nb: Vec<u64> = delta_all
4317 .iter()
4318 .filter(|r| {
4319 let r_src_label = (r.src.0 >> 32) as u32;
4320 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4321 r_src_label == cur_label_id && r_src_slot == cur_slot
4322 })
4323 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4324 .collect();
4325
4326 let mut seen: HashSet<u64> = HashSet::new();
4327 let all_nb: Vec<u64> = csr_nb
4328 .into_iter()
4329 .chain(delta_nb)
4330 .filter(|&nb| seen.insert(nb))
4331 .collect();
4332
4333 for next_slot in all_nb {
4334 let next_node_id = if let Some(lbl_id) = next_label_id_opt {
4335 NodeId(((lbl_id as u64) << 32) | next_slot)
4336 } else {
4337 NodeId(next_slot)
4338 };
4339
4340 let next_props = read_node_props(&self.store, next_node_id, next_col_ids)?;
4341
4342 if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
4344 continue;
4345 }
4346
4347 let mut new_vals = cur_vals.clone();
4350 if !next_node_pat.var.is_empty() {
4351 for &(col_id, raw) in &next_props {
4352 let key = format!("{}.col_{col_id}", next_node_pat.var);
4353 new_vals.insert(key, decode_raw_val(raw, &self.store));
4354 }
4355 }
4356
4357 next_frontier.push((next_slot, new_vals));
4358 }
4359 }
4360
4361 frontier = next_frontier;
4362 }
4363
4364 for (_final_slot, path_vals) in frontier {
4366 if let Some(ref where_expr) = m.where_clause {
4368 let mut eval_vals = path_vals.clone();
4369 eval_vals.extend(self.dollar_params());
4370 if !self.eval_where_graph(where_expr, &eval_vals) {
4371 continue;
4372 }
4373 }
4374
4375 let row: Vec<Value> = column_names
4378 .iter()
4379 .map(|col_name| {
4380 if let Some((var, prop)) = col_name.split_once('.') {
4381 let key = format!("{var}.col_{}", col_id_of(prop));
4382 path_vals.get(&key).cloned().unwrap_or(Value::Null)
4383 } else {
4384 Value::Null
4385 }
4386 })
4387 .collect();
4388
4389 rows.push(row);
4390 }
4391 }
4392
4393 if m.distinct {
4395 deduplicate_rows(&mut rows);
4396 }
4397
4398 apply_order_by(&mut rows, m, column_names);
4400
4401 if let Some(skip) = m.skip {
4403 let skip = (skip as usize).min(rows.len());
4404 rows.drain(0..skip);
4405 }
4406
4407 if let Some(lim) = m.limit {
4409 rows.truncate(lim as usize);
4410 }
4411
4412 tracing::debug!(
4413 rows = rows.len(),
4414 n_rels = n_rels,
4415 "n-hop traversal complete"
4416 );
4417 Ok(QueryResult {
4418 columns: column_names.to_vec(),
4419 rows,
4420 })
4421 }
4422
4423 fn get_node_neighbors_labeled(
4438 &self,
4439 src_slot: u64,
4440 src_label_id: u32,
4441 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4442 node_label: &std::collections::HashSet<(u64, u32)>,
4443 all_label_ids: &[u32],
4444 out: &mut std::collections::HashSet<(u64, u32)>,
4445 ) {
4446 out.clear();
4447
4448 let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
4451
4452 for r in delta_all.iter().filter(|r| {
4455 let r_src_label = (r.src.0 >> 32) as u32;
4456 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4457 r_src_label == src_label_id && r_src_slot == src_slot
4458 }) {
4459 let dst_slot = r.dst.0 & 0xFFFF_FFFF;
4460 let dst_label = (r.dst.0 >> 32) as u32;
4461 out.insert((dst_slot, dst_label));
4462 }
4463
4464 'csr: for dst_slot in csr_slots {
4468 for &lid in all_label_ids {
4470 if out.contains(&(dst_slot, lid)) {
4471 continue 'csr; }
4473 }
4474 let mut found = false;
4477 for &lid in all_label_ids {
4478 if node_label.contains(&(dst_slot, lid)) {
4479 out.insert((dst_slot, lid));
4480 found = true;
4481 break;
4482 }
4483 }
4484 if !found {
4485 out.insert((dst_slot, src_label_id));
4489 }
4490 }
4491 }
4492
4493 #[allow(clippy::too_many_arguments)]
4514 fn execute_variable_hops(
4515 &self,
4516 src_slot: u64,
4517 src_label_id: u32,
4518 min_hops: u32,
4519 max_hops: u32,
4520 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4521 node_label: &std::collections::HashSet<(u64, u32)>,
4522 all_label_ids: &[u32],
4523 neighbors_buf: &mut std::collections::HashSet<(u64, u32)>,
4524 ) -> Vec<(u64, u32)> {
4525 const PATH_RESULT_CAP: usize = 100_000;
4528 const SAFETY_CAP: u32 = 10;
4529 let max_hops = max_hops.min(SAFETY_CAP);
4530
4531 let mut results: Vec<(u64, u32)> = Vec::new();
4532
4533 if min_hops == 0 {
4535 results.push((src_slot, src_label_id));
4536 if max_hops == 0 {
4537 return results;
4538 }
4539 }
4540
4541 type Frame = (u64, u32, u32, Vec<(u64, u32)>);
4556
4557 let mut path_visited: std::collections::HashSet<(u64, u32)> =
4559 std::collections::HashSet::new();
4560 path_visited.insert((src_slot, src_label_id));
4561
4562 self.get_node_neighbors_labeled(
4564 src_slot,
4565 src_label_id,
4566 delta_all,
4567 node_label,
4568 all_label_ids,
4569 neighbors_buf,
4570 );
4571 let src_nbrs: Vec<(u64, u32)> = neighbors_buf.iter().copied().collect();
4572
4573 let mut stack: Vec<Frame> = vec![(src_slot, src_label_id, 1, src_nbrs)];
4575
4576 while let Some(frame) = stack.last_mut() {
4577 let (_, _, depth, ref mut nbrs) = *frame;
4578
4579 match nbrs.pop() {
4580 None => {
4581 let (popped_slot, popped_label, popped_depth, _) = stack.pop().unwrap();
4583 if popped_depth > 1 {
4586 path_visited.remove(&(popped_slot, popped_label));
4587 }
4588 }
4589 Some((nb_slot, nb_label)) => {
4590 if path_visited.contains(&(nb_slot, nb_label)) {
4592 continue;
4593 }
4594
4595 if depth >= min_hops {
4597 results.push((nb_slot, nb_label));
4598 if results.len() >= PATH_RESULT_CAP {
4599 eprintln!(
4600 "sparrowdb: variable-length path result cap ({PATH_RESULT_CAP}) \
4601 hit; truncating results. Consider a tighter *M..N bound."
4602 );
4603 return results;
4604 }
4605 }
4606
4607 if depth < max_hops {
4609 path_visited.insert((nb_slot, nb_label));
4610 self.get_node_neighbors_labeled(
4611 nb_slot,
4612 nb_label,
4613 delta_all,
4614 node_label,
4615 all_label_ids,
4616 neighbors_buf,
4617 );
4618 let next_nbrs: Vec<(u64, u32)> = neighbors_buf.iter().copied().collect();
4619 stack.push((nb_slot, nb_label, depth + 1, next_nbrs));
4620 }
4621 }
4622 }
4623 }
4624
4625 results
4626 }
4627
4628 fn get_node_neighbors_by_slot(
4630 &self,
4631 src_slot: u64,
4632 src_label_id: u32,
4633 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4634 ) -> Vec<u64> {
4635 let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
4636 let delta_neighbors: Vec<u64> = delta_all
4637 .iter()
4638 .filter(|r| {
4639 let r_src_label = (r.src.0 >> 32) as u32;
4640 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4641 r_src_label == src_label_id && r_src_slot == src_slot
4642 })
4643 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4644 .collect();
4645 let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
4646 all.extend(delta_neighbors);
4647 all.into_iter().collect()
4648 }
4649
4650 fn execute_variable_length(
4652 &self,
4653 m: &MatchStatement,
4654 column_names: &[String],
4655 ) -> Result<QueryResult> {
4656 let pat = &m.pattern[0];
4657 let src_node_pat = &pat.nodes[0];
4658 let dst_node_pat = &pat.nodes[1];
4659 let rel_pat = &pat.rels[0];
4660
4661 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
4662 return Err(sparrowdb_common::Error::Unimplemented);
4663 }
4664
4665 let min_hops = rel_pat.min_hops.unwrap_or(1);
4666 let max_hops = rel_pat.max_hops.unwrap_or(10); let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4669 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
4670
4671 let src_label_id = self
4672 .catalog
4673 .get_label(&src_label)?
4674 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4675 let dst_label_id: Option<u32> = if dst_label.is_empty() {
4677 None
4678 } else {
4679 Some(
4680 self.catalog
4681 .get_label(&dst_label)?
4682 .ok_or(sparrowdb_common::Error::NotFound)? as u32,
4683 )
4684 };
4685
4686 let hwm_src = self.store.hwm_for_label(src_label_id)?;
4687
4688 let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
4689 let col_ids_dst =
4690 collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
4691
4692 let dst_all_col_ids: Vec<u32> = {
4695 let mut v = col_ids_dst.clone();
4696 for p in &dst_node_pat.props {
4697 let col_id = prop_name_to_col_id(&p.key);
4698 if !v.contains(&col_id) {
4699 v.push(col_id);
4700 }
4701 }
4702 if let Some(ref where_expr) = m.where_clause {
4703 collect_col_ids_from_expr(where_expr, &mut v);
4704 }
4705 v
4706 };
4707
4708 let mut rows: Vec<Vec<Value>> = Vec::new();
4709 let labels_by_id: std::collections::HashMap<u16, String> = self
4718 .catalog
4719 .list_labels()
4720 .unwrap_or_default()
4721 .into_iter()
4722 .collect();
4723
4724 let delta_all = self.read_delta_all();
4729 let mut node_label: std::collections::HashSet<(u64, u32)> =
4730 std::collections::HashSet::new();
4731 for r in &delta_all {
4732 let src_s = r.src.0 & 0xFFFF_FFFF;
4733 let src_l = (r.src.0 >> 32) as u32;
4734 node_label.insert((src_s, src_l));
4735 let dst_s = r.dst.0 & 0xFFFF_FFFF;
4736 let dst_l = (r.dst.0 >> 32) as u32;
4737 node_label.insert((dst_s, dst_l));
4738 }
4739 let mut all_label_ids: Vec<u32> = node_label.iter().map(|&(_, l)| l).collect();
4740 all_label_ids.sort_unstable();
4741 all_label_ids.dedup();
4742
4743 let mut neighbors_buf: std::collections::HashSet<(u64, u32)> =
4745 std::collections::HashSet::new();
4746
4747 for src_slot in 0..hwm_src {
4748 self.check_deadline()?;
4750
4751 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4752
4753 let src_all_col_ids: Vec<u32> = {
4755 let mut v = col_ids_src.clone();
4756 for p in &src_node_pat.props {
4757 let col_id = prop_name_to_col_id(&p.key);
4758 if !v.contains(&col_id) {
4759 v.push(col_id);
4760 }
4761 }
4762 if let Some(ref where_expr) = m.where_clause {
4763 collect_col_ids_from_expr(where_expr, &mut v);
4764 }
4765 v
4766 };
4767 let src_props = read_node_props(&self.store, src_node, &src_all_col_ids)?;
4768
4769 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4770 continue;
4771 }
4772
4773 let dst_nodes = self.execute_variable_hops(
4777 src_slot,
4778 src_label_id,
4779 min_hops,
4780 max_hops,
4781 &delta_all,
4782 &node_label,
4783 &all_label_ids,
4784 &mut neighbors_buf,
4785 );
4786
4787 for (dst_slot, actual_label_id) in dst_nodes {
4788 if let Some(required_label) = dst_label_id {
4791 if actual_label_id != required_label {
4792 continue;
4793 }
4794 }
4795
4796 let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
4799
4800 let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
4801 let dst_props = read_node_props(&self.store, dst_node, &dst_all_col_ids)?;
4806
4807 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4808 continue;
4809 }
4810
4811 let resolved_dst_label_name: String = if !dst_label.is_empty() {
4815 dst_label.clone()
4816 } else {
4817 labels_by_id
4818 .get(&(actual_label_id as u16))
4819 .cloned()
4820 .unwrap_or_default()
4821 };
4822
4823 if let Some(ref where_expr) = m.where_clause {
4825 let mut row_vals =
4826 build_row_vals(&src_props, &src_node_pat.var, &col_ids_src, &self.store);
4827 row_vals.extend(build_row_vals(
4828 &dst_props,
4829 &dst_node_pat.var,
4830 &col_ids_dst,
4831 &self.store,
4832 ));
4833 if !rel_pat.var.is_empty() {
4835 row_vals.insert(
4836 format!("{}.__type__", rel_pat.var),
4837 Value::String(rel_pat.rel_type.clone()),
4838 );
4839 }
4840 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4842 row_vals.insert(
4843 format!("{}.__labels__", src_node_pat.var),
4844 Value::List(vec![Value::String(src_label.clone())]),
4845 );
4846 }
4847 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
4850 row_vals.insert(
4851 format!("{}.__labels__", dst_node_pat.var),
4852 Value::List(vec![Value::String(resolved_dst_label_name.clone())]),
4853 );
4854 }
4855 row_vals.extend(self.dollar_params());
4856 if !self.eval_where_graph(where_expr, &row_vals) {
4857 continue;
4858 }
4859 }
4860
4861 let rel_var_type = if !rel_pat.var.is_empty() {
4862 Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
4863 } else {
4864 None
4865 };
4866 let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4867 Some((src_node_pat.var.as_str(), src_label.as_str()))
4868 } else {
4869 None
4870 };
4871 let dst_label_meta =
4872 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
4873 Some((dst_node_pat.var.as_str(), resolved_dst_label_name.as_str()))
4874 } else {
4875 None
4876 };
4877 let row = project_hop_row(
4878 &src_props,
4879 &dst_props,
4880 column_names,
4881 &src_node_pat.var,
4882 &dst_node_pat.var,
4883 rel_var_type,
4884 src_label_meta,
4885 dst_label_meta,
4886 &self.store,
4887 );
4888 rows.push(row);
4889 }
4890 }
4891
4892 if m.distinct {
4894 deduplicate_rows(&mut rows);
4895 }
4896
4897 apply_order_by(&mut rows, m, column_names);
4899
4900 if let Some(skip) = m.skip {
4902 let skip = (skip as usize).min(rows.len());
4903 rows.drain(0..skip);
4904 }
4905
4906 if let Some(lim) = m.limit {
4908 rows.truncate(lim as usize);
4909 }
4910
4911 tracing::debug!(
4912 rows = rows.len(),
4913 min_hops,
4914 max_hops,
4915 "variable-length traversal complete"
4916 );
4917 Ok(QueryResult {
4918 columns: column_names.to_vec(),
4919 rows,
4920 })
4921 }
4922
4923 fn matches_prop_filter(
4926 &self,
4927 props: &[(u32, u64)],
4928 filters: &[sparrowdb_cypher::ast::PropEntry],
4929 ) -> bool {
4930 matches_prop_filter_static(props, filters, &self.dollar_params(), &self.store)
4931 }
4932
4933 fn dollar_params(&self) -> HashMap<String, Value> {
4939 self.params
4940 .iter()
4941 .map(|(k, v)| (format!("${k}"), v.clone()))
4942 .collect()
4943 }
4944
4945 fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
4949 match expr {
4950 Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
4951 Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
4952 Expr::CaseWhen {
4953 branches,
4954 else_expr,
4955 } => {
4956 for (cond, then_val) in branches {
4957 if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
4958 return self.eval_expr_graph(then_val, vals);
4959 }
4960 }
4961 else_expr
4962 .as_ref()
4963 .map(|e| self.eval_expr_graph(e, vals))
4964 .unwrap_or(Value::Null)
4965 }
4966 Expr::And(l, r) => {
4967 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4968 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
4969 _ => Value::Null,
4970 }
4971 }
4972 Expr::Or(l, r) => {
4973 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4974 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
4975 _ => Value::Null,
4976 }
4977 }
4978 Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
4979 Value::Bool(b) => Value::Bool(!b),
4980 _ => Value::Null,
4981 },
4982 Expr::PropAccess { var, prop } => {
4985 let normal = eval_expr(expr, vals);
4987 if !matches!(normal, Value::Null) {
4988 return normal;
4989 }
4990 if let Some(Value::NodeRef(node_id)) = vals
4992 .get(var.as_str())
4993 .or_else(|| vals.get(&format!("{var}.__node_id__")))
4994 {
4995 let col_id = prop_name_to_col_id(prop);
4996 if let Ok(props) = self.store.get_node_raw(*node_id, &[col_id]) {
4997 if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
4998 return decode_raw_val(raw, &self.store);
4999 }
5000 }
5001 }
5002 Value::Null
5003 }
5004 _ => eval_expr(expr, vals),
5005 }
5006 }
5007
5008 fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
5010 match self.eval_expr_graph(expr, vals) {
5011 Value::Bool(b) => b,
5012 _ => eval_where(expr, vals),
5013 }
5014 }
5015
5016 fn eval_exists_subquery(
5018 &self,
5019 ep: &sparrowdb_cypher::ast::ExistsPattern,
5020 vals: &HashMap<String, Value>,
5021 ) -> bool {
5022 let path = &ep.path;
5023 if path.nodes.len() < 2 || path.rels.is_empty() {
5024 return false;
5025 }
5026 let src_pat = &path.nodes[0];
5027 let dst_pat = &path.nodes[1];
5028 let rel_pat = &path.rels[0];
5029
5030 let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
5031 Some(id) => id,
5032 None => return false,
5033 };
5034 let src_slot = src_node_id.0 & 0xFFFF_FFFF;
5035 let src_label_id = (src_node_id.0 >> 32) as u32;
5036
5037 let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
5038 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
5039 None
5040 } else {
5041 self.catalog
5042 .get_label(dst_label)
5043 .ok()
5044 .flatten()
5045 .map(|id| id as u32)
5046 };
5047
5048 let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
5049 self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
5050 } else {
5051 RelTableLookup::All
5052 };
5053
5054 let csr_nb: Vec<u64> = match rel_lookup {
5055 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
5056 RelTableLookup::NotFound => return false,
5057 RelTableLookup::All => self.csr_neighbors_all(src_slot),
5058 };
5059 let delta_nb: Vec<u64> = self
5060 .read_delta_all()
5061 .into_iter()
5062 .filter(|r| {
5063 let r_src_label = (r.src.0 >> 32) as u32;
5064 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
5065 if r_src_label != src_label_id || r_src_slot != src_slot {
5066 return false;
5067 }
5068 if let Some(dst_lid) = dst_label_id_opt {
5072 let r_dst_label = (r.dst.0 >> 32) as u32;
5073 r_dst_label == dst_lid
5074 } else {
5075 true
5076 }
5077 })
5078 .map(|r| r.dst.0 & 0xFFFF_FFFF)
5079 .collect();
5080
5081 let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
5082
5083 for dst_slot in all_nb {
5084 if let Some(did) = dst_label_id_opt {
5085 let probe_id = NodeId(((did as u64) << 32) | dst_slot);
5086 if self.store.get_node_raw(probe_id, &[]).is_err() {
5087 continue;
5088 }
5089 if !dst_pat.props.is_empty() {
5090 let col_ids: Vec<u32> = dst_pat
5091 .props
5092 .iter()
5093 .map(|p| prop_name_to_col_id(&p.key))
5094 .collect();
5095 match self.store.get_node_raw(probe_id, &col_ids) {
5096 Ok(props) => {
5097 let params = self.dollar_params();
5098 if !matches_prop_filter_static(
5099 &props,
5100 &dst_pat.props,
5101 ¶ms,
5102 &self.store,
5103 ) {
5104 continue;
5105 }
5106 }
5107 Err(_) => continue,
5108 }
5109 }
5110 }
5111 return true;
5112 }
5113 false
5114 }
5115
5116 fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
5118 let id_key = format!("{var}.__node_id__");
5119 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
5120 return Some(*nid);
5121 }
5122 if let Some(Value::NodeRef(nid)) = vals.get(var) {
5123 return Some(*nid);
5124 }
5125 None
5126 }
5127
5128 fn eval_shortest_path_expr(
5130 &self,
5131 sp: &sparrowdb_cypher::ast::ShortestPathExpr,
5132 vals: &HashMap<String, Value>,
5133 ) -> Value {
5134 let (src_label_id, src_slot) =
5139 if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
5140 let label_id = (nid.0 >> 32) as u32;
5141 let slot = nid.0 & 0xFFFF_FFFF;
5142 (label_id, slot)
5143 } else {
5144 let label_id = match self.catalog.get_label(&sp.src_label) {
5146 Ok(Some(id)) => id as u32,
5147 _ => return Value::Null,
5148 };
5149 match self.find_node_by_props(label_id, &sp.src_props) {
5150 Some(slot) => (label_id, slot),
5151 None => return Value::Null,
5152 }
5153 };
5154
5155 let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
5156 nid.0 & 0xFFFF_FFFF
5157 } else {
5158 let dst_label_id = match self.catalog.get_label(&sp.dst_label) {
5159 Ok(Some(id)) => id as u32,
5160 _ => return Value::Null,
5161 };
5162 match self.find_node_by_props(dst_label_id, &sp.dst_props) {
5163 Some(slot) => slot,
5164 None => return Value::Null,
5165 }
5166 };
5167
5168 match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
5169 Some(hops) => Value::Int64(hops as i64),
5170 None => Value::Null,
5171 }
5172 }
5173
5174 fn find_node_by_props(
5176 &self,
5177 label_id: u32,
5178 props: &[sparrowdb_cypher::ast::PropEntry],
5179 ) -> Option<u64> {
5180 if props.is_empty() {
5181 return None;
5182 }
5183 let hwm = self.store.hwm_for_label(label_id).ok()?;
5184 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
5185 let params = self.dollar_params();
5186 for slot in 0..hwm {
5187 let node_id = NodeId(((label_id as u64) << 32) | slot);
5188 if let Ok(raw_props) = self.store.get_node_raw(node_id, &col_ids) {
5189 if matches_prop_filter_static(&raw_props, props, ¶ms, &self.store) {
5190 return Some(slot);
5191 }
5192 }
5193 }
5194 None
5195 }
5196
5197 fn bfs_shortest_path(
5206 &self,
5207 src_slot: u64,
5208 src_label_id: u32,
5209 dst_slot: u64,
5210 max_hops: u32,
5211 ) -> Option<u32> {
5212 if src_slot == dst_slot {
5213 return Some(0);
5214 }
5215 let delta_all = self.read_delta_all();
5217 let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
5218 visited.insert(src_slot);
5219 let mut frontier: Vec<u64> = vec![src_slot];
5220
5221 for depth in 1..=max_hops {
5222 let mut next_frontier: Vec<u64> = Vec::new();
5223 for &node_slot in &frontier {
5224 let neighbors =
5225 self.get_node_neighbors_by_slot(node_slot, src_label_id, &delta_all);
5226 for nb in neighbors {
5227 if nb == dst_slot {
5228 return Some(depth);
5229 }
5230 if visited.insert(nb) {
5231 next_frontier.push(nb);
5232 }
5233 }
5234 }
5235 if next_frontier.is_empty() {
5236 break;
5237 }
5238 frontier = next_frontier;
5239 }
5240 None
5241 }
5242
5243 fn aggregate_rows_graph(
5246 &self,
5247 rows: &[HashMap<String, Value>],
5248 return_items: &[ReturnItem],
5249 ) -> Vec<Vec<Value>> {
5250 let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
5252 if !needs_graph {
5253 return aggregate_rows(rows, return_items);
5254 }
5255 rows.iter()
5257 .map(|row_vals| {
5258 return_items
5259 .iter()
5260 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
5261 .collect()
5262 })
5263 .collect()
5264 }
5265}
5266
5267fn matches_prop_filter_static(
5270 props: &[(u32, u64)],
5271 filters: &[sparrowdb_cypher::ast::PropEntry],
5272 params: &HashMap<String, Value>,
5273 store: &NodeStore,
5274) -> bool {
5275 for f in filters {
5276 let col_id = prop_name_to_col_id(&f.key);
5277 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
5278
5279 let filter_val = eval_expr(&f.value, params);
5282 let matches = match filter_val {
5283 Value::Int64(n) => {
5284 stored_val == Some(StoreValue::Int64(n).to_u64())
5287 }
5288 Value::Bool(b) => {
5289 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
5292 stored_val == Some(expected)
5293 }
5294 Value::String(s) => {
5295 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
5298 }
5299 Value::Float64(f) => {
5300 stored_val.is_some_and(|raw| {
5303 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
5304 })
5305 }
5306 Value::Null => true, _ => false,
5308 };
5309 if !matches {
5310 return false;
5311 }
5312 }
5313 true
5314}
5315
5316fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
5325 match expr {
5326 Expr::List(elems) => {
5327 let mut values = Vec::with_capacity(elems.len());
5328 for elem in elems {
5329 values.push(eval_scalar_expr(elem));
5330 }
5331 Ok(values)
5332 }
5333 Expr::Literal(Literal::Param(name)) => {
5334 match params.get(name) {
5336 Some(Value::List(items)) => Ok(items.clone()),
5337 Some(other) => {
5338 Ok(vec![other.clone()])
5341 }
5342 None => {
5343 Ok(vec![])
5345 }
5346 }
5347 }
5348 Expr::FnCall { name, args } => {
5349 let name_lc = name.to_lowercase();
5352 if name_lc == "range" {
5353 let empty_vals: std::collections::HashMap<String, Value> =
5354 std::collections::HashMap::new();
5355 let evaluated: Vec<Value> =
5356 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
5357 let start = match evaluated.first() {
5359 Some(Value::Int64(n)) => *n,
5360 _ => {
5361 return Err(sparrowdb_common::Error::InvalidArgument(
5362 "range() expects integer arguments".into(),
5363 ))
5364 }
5365 };
5366 let end = match evaluated.get(1) {
5367 Some(Value::Int64(n)) => *n,
5368 _ => {
5369 return Err(sparrowdb_common::Error::InvalidArgument(
5370 "range() expects at least 2 integer arguments".into(),
5371 ))
5372 }
5373 };
5374 let step: i64 = match evaluated.get(2) {
5375 Some(Value::Int64(n)) => *n,
5376 None => 1,
5377 _ => 1,
5378 };
5379 if step == 0 {
5380 return Err(sparrowdb_common::Error::InvalidArgument(
5381 "range(): step must not be zero".into(),
5382 ));
5383 }
5384 let mut values = Vec::new();
5385 if step > 0 {
5386 let mut i = start;
5387 while i <= end {
5388 values.push(Value::Int64(i));
5389 i += step;
5390 }
5391 } else {
5392 let mut i = start;
5393 while i >= end {
5394 values.push(Value::Int64(i));
5395 i += step;
5396 }
5397 }
5398 Ok(values)
5399 } else {
5400 Err(sparrowdb_common::Error::InvalidArgument(format!(
5402 "UNWIND: function '{name}' does not return a list"
5403 )))
5404 }
5405 }
5406 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
5407 "UNWIND expression is not a list: {:?}",
5408 other
5409 ))),
5410 }
5411}
5412
5413fn eval_scalar_expr(expr: &Expr) -> Value {
5415 match expr {
5416 Expr::Literal(lit) => match lit {
5417 Literal::Int(n) => Value::Int64(*n),
5418 Literal::Float(f) => Value::Float64(*f),
5419 Literal::Bool(b) => Value::Bool(*b),
5420 Literal::String(s) => Value::String(s.clone()),
5421 Literal::Null => Value::Null,
5422 Literal::Param(_) => Value::Null,
5423 },
5424 _ => Value::Null,
5425 }
5426}
5427
5428fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
5429 items
5430 .iter()
5431 .map(|item| match &item.alias {
5432 Some(alias) => alias.clone(),
5433 None => match &item.expr {
5434 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5435 Expr::Var(v) => v.clone(),
5436 Expr::CountStar => "count(*)".to_string(),
5437 Expr::FnCall { name, args } => {
5438 let arg_str = args
5439 .first()
5440 .map(|a| match a {
5441 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5442 Expr::Var(v) => v.clone(),
5443 _ => "*".to_string(),
5444 })
5445 .unwrap_or_else(|| "*".to_string());
5446 format!("{}({})", name.to_lowercase(), arg_str)
5447 }
5448 _ => "?".to_string(),
5449 },
5450 })
5451 .collect()
5452}
5453
5454fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
5461 match expr {
5462 Expr::PropAccess { var, prop } => {
5463 if var == target_var {
5464 let col_id = prop_name_to_col_id(prop);
5465 if !out.contains(&col_id) {
5466 out.push(col_id);
5467 }
5468 }
5469 }
5470 Expr::BinOp { left, right, .. } => {
5471 collect_col_ids_from_expr_for_var(left, target_var, out);
5472 collect_col_ids_from_expr_for_var(right, target_var, out);
5473 }
5474 Expr::And(l, r) | Expr::Or(l, r) => {
5475 collect_col_ids_from_expr_for_var(l, target_var, out);
5476 collect_col_ids_from_expr_for_var(r, target_var, out);
5477 }
5478 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5479 collect_col_ids_from_expr_for_var(inner, target_var, out);
5480 }
5481 Expr::InList { expr, list, .. } => {
5482 collect_col_ids_from_expr_for_var(expr, target_var, out);
5483 for item in list {
5484 collect_col_ids_from_expr_for_var(item, target_var, out);
5485 }
5486 }
5487 Expr::FnCall { args, .. } | Expr::List(args) => {
5488 for arg in args {
5489 collect_col_ids_from_expr_for_var(arg, target_var, out);
5490 }
5491 }
5492 Expr::ListPredicate {
5493 list_expr,
5494 predicate,
5495 ..
5496 } => {
5497 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
5498 collect_col_ids_from_expr_for_var(predicate, target_var, out);
5499 }
5500 Expr::CaseWhen {
5502 branches,
5503 else_expr,
5504 } => {
5505 for (cond, then_val) in branches {
5506 collect_col_ids_from_expr_for_var(cond, target_var, out);
5507 collect_col_ids_from_expr_for_var(then_val, target_var, out);
5508 }
5509 if let Some(e) = else_expr {
5510 collect_col_ids_from_expr_for_var(e, target_var, out);
5511 }
5512 }
5513 _ => {}
5514 }
5515}
5516
5517fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
5522 match expr {
5523 Expr::PropAccess { prop, .. } => {
5524 let col_id = prop_name_to_col_id(prop);
5525 if !out.contains(&col_id) {
5526 out.push(col_id);
5527 }
5528 }
5529 Expr::BinOp { left, right, .. } => {
5530 collect_col_ids_from_expr(left, out);
5531 collect_col_ids_from_expr(right, out);
5532 }
5533 Expr::And(l, r) | Expr::Or(l, r) => {
5534 collect_col_ids_from_expr(l, out);
5535 collect_col_ids_from_expr(r, out);
5536 }
5537 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
5538 Expr::InList { expr, list, .. } => {
5539 collect_col_ids_from_expr(expr, out);
5540 for item in list {
5541 collect_col_ids_from_expr(item, out);
5542 }
5543 }
5544 Expr::FnCall { args, .. } => {
5546 for arg in args {
5547 collect_col_ids_from_expr(arg, out);
5548 }
5549 }
5550 Expr::ListPredicate {
5551 list_expr,
5552 predicate,
5553 ..
5554 } => {
5555 collect_col_ids_from_expr(list_expr, out);
5556 collect_col_ids_from_expr(predicate, out);
5557 }
5558 Expr::List(items) => {
5560 for item in items {
5561 collect_col_ids_from_expr(item, out);
5562 }
5563 }
5564 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5565 collect_col_ids_from_expr(inner, out);
5566 }
5567 Expr::CaseWhen {
5569 branches,
5570 else_expr,
5571 } => {
5572 for (cond, then_val) in branches {
5573 collect_col_ids_from_expr(cond, out);
5574 collect_col_ids_from_expr(then_val, out);
5575 }
5576 if let Some(e) = else_expr {
5577 collect_col_ids_from_expr(e, out);
5578 }
5579 }
5580 _ => {}
5581 }
5582}
5583
5584#[allow(dead_code)]
5589fn literal_to_store_value(lit: &Literal) -> StoreValue {
5590 match lit {
5591 Literal::Int(n) => StoreValue::Int64(*n),
5592 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
5593 Literal::Float(f) => StoreValue::Float(*f),
5594 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
5595 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
5596 }
5597}
5598
5599fn value_to_store_value(val: Value) -> StoreValue {
5604 match val {
5605 Value::Int64(n) => StoreValue::Int64(n),
5606 Value::Float64(f) => StoreValue::Float(f),
5607 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
5608 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
5609 Value::Null => StoreValue::Int64(0),
5610 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
5611 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
5612 Value::List(_) => StoreValue::Int64(0),
5613 Value::Map(_) => StoreValue::Int64(0),
5614 }
5615}
5616
5617fn string_to_raw_u64(s: &str) -> u64 {
5623 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5624}
5625
5626fn try_index_lookup_for_props(
5637 props: &[sparrowdb_cypher::ast::PropEntry],
5638 label_id: u32,
5639 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5640) -> Option<Vec<u32>> {
5641 if props.len() != 1 {
5643 return None;
5644 }
5645 let filter = &props[0];
5646
5647 let raw_value: u64 = match &filter.value {
5649 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5650 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5651 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5652 }
5653 _ => return None,
5656 };
5657
5658 let col_id = prop_name_to_col_id(&filter.key);
5659 if !prop_index.is_indexed(label_id, col_id) {
5660 return None;
5661 }
5662 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5663}
5664
5665fn try_text_index_lookup(
5678 expr: &Expr,
5679 node_var: &str,
5680 label_id: u32,
5681 text_index: &TextIndex,
5682) -> Option<Vec<u32>> {
5683 let (left, op, right) = match expr {
5684 Expr::BinOp { left, op, right }
5685 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
5686 {
5687 (left.as_ref(), op, right.as_ref())
5688 }
5689 _ => return None,
5690 };
5691
5692 let prop_name = match left {
5694 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
5695 _ => return None,
5696 };
5697
5698 let pattern = match right {
5700 Expr::Literal(Literal::String(s)) => s.as_str(),
5701 _ => return None,
5702 };
5703
5704 let col_id = prop_name_to_col_id(prop_name);
5705 if !text_index.is_indexed(label_id, col_id) {
5706 return None;
5707 }
5708
5709 let slots = match op {
5710 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
5711 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
5712 _ => return None,
5713 };
5714
5715 Some(slots)
5716}
5717
5718fn where_clause_text_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
5726 let left = match expr {
5727 Expr::BinOp {
5728 left,
5729 op: BinOpKind::Contains | BinOpKind::StartsWith,
5730 right: _,
5731 } => left.as_ref(),
5732 _ => return vec![],
5733 };
5734 if let Expr::PropAccess { var, prop } = left {
5735 if var.as_str() == node_var {
5736 return vec![prop.as_str()];
5737 }
5738 }
5739 vec![]
5740}
5741
5742fn where_clause_eq_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
5748 let (left, right) = match expr {
5749 Expr::BinOp {
5750 left,
5751 op: BinOpKind::Eq,
5752 right,
5753 } => (left.as_ref(), right.as_ref()),
5754 _ => return vec![],
5755 };
5756 if let Expr::PropAccess { var, prop } = left {
5757 if var.as_str() == node_var {
5758 return vec![prop.as_str()];
5759 }
5760 }
5761 if let Expr::PropAccess { var, prop } = right {
5762 if var.as_str() == node_var {
5763 return vec![prop.as_str()];
5764 }
5765 }
5766 vec![]
5767}
5768
5769fn where_clause_range_prop_names<'a>(expr: &'a Expr, node_var: &str) -> Vec<&'a str> {
5775 let is_range_op = |op: &BinOpKind| {
5776 matches!(
5777 op,
5778 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
5779 )
5780 };
5781
5782 if let Expr::BinOp { left, op, right } = expr {
5784 if is_range_op(op) {
5785 if let Expr::PropAccess { var, prop } = left.as_ref() {
5786 if var.as_str() == node_var {
5787 return vec![prop.as_str()];
5788 }
5789 }
5790 if let Expr::PropAccess { var, prop } = right.as_ref() {
5791 if var.as_str() == node_var {
5792 return vec![prop.as_str()];
5793 }
5794 }
5795 return vec![];
5796 }
5797 }
5798
5799 if let Expr::BinOp {
5801 left,
5802 op: BinOpKind::And,
5803 right,
5804 } = expr
5805 {
5806 let mut names: Vec<&'a str> = where_clause_range_prop_names(left, node_var);
5807 names.extend(where_clause_range_prop_names(right, node_var));
5808 return names;
5809 }
5810
5811 vec![]
5812}
5813
5814fn try_where_eq_index_lookup(
5825 expr: &Expr,
5826 node_var: &str,
5827 label_id: u32,
5828 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5829) -> Option<Vec<u32>> {
5830 let (left, op, right) = match expr {
5831 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
5832 (left.as_ref(), op, right.as_ref())
5833 }
5834 _ => return None,
5835 };
5836 let _ = op;
5837
5838 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
5840 if var.as_str() == node_var {
5841 (prop.as_str(), right)
5842 } else {
5843 return None;
5844 }
5845 } else if let Expr::PropAccess { var, prop } = right {
5846 if var.as_str() == node_var {
5847 (prop.as_str(), left)
5848 } else {
5849 return None;
5850 }
5851 } else {
5852 return None;
5853 };
5854
5855 let raw_value: u64 = match lit {
5856 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5857 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5858 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5859 }
5860 _ => return None,
5861 };
5862
5863 let col_id = prop_name_to_col_id(prop_name);
5864 if !prop_index.is_indexed(label_id, col_id) {
5865 return None;
5866 }
5867 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5868}
5869
5870fn try_where_range_index_lookup(
5881 expr: &Expr,
5882 node_var: &str,
5883 label_id: u32,
5884 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5885) -> Option<Vec<u32>> {
5886 use sparrowdb_storage::property_index::sort_key;
5887
5888 fn encode_int(n: i64) -> u64 {
5890 StoreValue::Int64(n).to_u64()
5891 }
5892
5893 #[allow(clippy::type_complexity)]
5896 fn extract_single_bound<'a>(
5897 expr: &'a Expr,
5898 node_var: &'a str,
5899 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
5900 let (left, op, right) = match expr {
5901 Expr::BinOp { left, op, right }
5902 if matches!(
5903 op,
5904 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
5905 ) =>
5906 {
5907 (left.as_ref(), op, right.as_ref())
5908 }
5909 _ => return None,
5910 };
5911
5912 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
5914 if var.as_str() != node_var {
5915 return None;
5916 }
5917 let sk = sort_key(encode_int(*n));
5918 let prop_name = prop.as_str();
5919 return match op {
5920 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
5921 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
5922 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
5923 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
5924 _ => None,
5925 };
5926 }
5927
5928 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
5930 if var.as_str() != node_var {
5931 return None;
5932 }
5933 let sk = sort_key(encode_int(*n));
5934 let prop_name = prop.as_str();
5935 return match op {
5937 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
5938 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
5939 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
5940 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
5941 _ => None,
5942 };
5943 }
5944
5945 None
5946 }
5947
5948 if let Expr::BinOp {
5951 left,
5952 op: BinOpKind::And,
5953 right,
5954 } = expr
5955 {
5956 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
5957 extract_single_bound(left, node_var),
5958 extract_single_bound(right, node_var),
5959 ) {
5960 if lp == rp {
5961 let col_id = prop_name_to_col_id(lp);
5962 if !prop_index.is_indexed(label_id, col_id) {
5963 return None;
5964 }
5965 let lo: Option<(u64, bool)> = match (llo, rlo) {
5971 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
5972 (Some(a), None) | (None, Some(a)) => Some(a),
5973 (None, None) => None,
5974 };
5975 let hi: Option<(u64, bool)> = match (lhi, rhi) {
5976 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
5977 (Some(a), None) | (None, Some(a)) => Some(a),
5978 (None, None) => None,
5979 };
5980 if lo.is_none() && hi.is_none() {
5982 return None;
5983 }
5984 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
5985 }
5986 }
5987 }
5988
5989 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
5991 let col_id = prop_name_to_col_id(prop_name);
5992 if !prop_index.is_indexed(label_id, col_id) {
5993 return None;
5994 }
5995 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
5996 }
5997
5998 None
5999}
6000
/// Maps a property name to its column id by delegating to
/// `sparrowdb_common::col_id_of`.
fn prop_name_to_col_id(name: &str) -> u32 {
    col_id_of(name)
}
6024
6025fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
6026 let mut ids = Vec::new();
6027 for name in column_names {
6028 let prop = name.split('.').next_back().unwrap_or(name.as_str());
6030 let col_id = prop_name_to_col_id(prop);
6031 if !ids.contains(&col_id) {
6032 ids.push(col_id);
6033 }
6034 }
6035 ids
6036}
6037
6038fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
6044 let mut ids = Vec::new();
6045 for name in column_names {
6046 if let Some((v, prop)) = name.split_once('.') {
6048 if v == var {
6049 let col_id = prop_name_to_col_id(prop);
6050 if !ids.contains(&col_id) {
6051 ids.push(col_id);
6052 }
6053 }
6054 } else {
6055 let col_id = prop_name_to_col_id(name.as_str());
6057 if !ids.contains(&col_id) {
6058 ids.push(col_id);
6059 }
6060 }
6061 }
6062 if ids.is_empty() {
6063 ids.push(0);
6065 }
6066 ids
6067}
6068
6069fn read_node_props(
6081 store: &NodeStore,
6082 node_id: NodeId,
6083 col_ids: &[u32],
6084) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
6085 if col_ids.is_empty() {
6086 return Ok(vec![]);
6087 }
6088 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
6089 Ok(nullable
6090 .into_iter()
6091 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
6092 .collect())
6093}
6094
6095fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
6102 match store.decode_raw_value(raw) {
6103 StoreValue::Int64(n) => Value::Int64(n),
6104 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
6105 StoreValue::Float(f) => Value::Float64(f),
6106 }
6107}
6108
6109fn build_row_vals(
6110 props: &[(u32, u64)],
6111 var_name: &str,
6112 _col_ids: &[u32],
6113 store: &NodeStore,
6114) -> HashMap<String, Value> {
6115 let mut map = HashMap::new();
6116 for &(col_id, raw) in props {
6117 let key = format!("{var_name}.col_{col_id}");
6118 map.insert(key, decode_raw_val(raw, store));
6119 }
6120 map
6121}
6122
/// Returns `true` when `label` starts with the `__SO_` prefix
/// (case-sensitive).
#[inline]
fn is_reserved_label(label: &str) -> bool {
    const RESERVED_PREFIX: &str = "__SO_";
    label.strip_prefix(RESERVED_PREFIX).is_some()
}
6132
6133fn values_equal(a: &Value, b: &Value) -> bool {
6141 match (a, b) {
6142 (Value::Int64(x), Value::Int64(y)) => x == y,
6144 (Value::String(x), Value::String(y)) => x == y,
6150 (Value::Bool(x), Value::Bool(y)) => x == y,
6151 (Value::Float64(x), Value::Float64(y)) => x == y,
6152 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
6156 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
6157 (Value::Null, Value::Null) => true,
6159 _ => false,
6160 }
6161}
6162
6163fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
6164 match expr {
6165 Expr::BinOp { left, op, right } => {
6166 let lv = eval_expr(left, vals);
6167 let rv = eval_expr(right, vals);
6168 match op {
6169 BinOpKind::Eq => values_equal(&lv, &rv),
6170 BinOpKind::Neq => !values_equal(&lv, &rv),
6171 BinOpKind::Contains => lv.contains(&rv),
6172 BinOpKind::StartsWith => {
6173 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
6174 }
6175 BinOpKind::EndsWith => {
6176 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
6177 }
6178 BinOpKind::Lt => match (&lv, &rv) {
6179 (Value::Int64(a), Value::Int64(b)) => a < b,
6180 _ => false,
6181 },
6182 BinOpKind::Le => match (&lv, &rv) {
6183 (Value::Int64(a), Value::Int64(b)) => a <= b,
6184 _ => false,
6185 },
6186 BinOpKind::Gt => match (&lv, &rv) {
6187 (Value::Int64(a), Value::Int64(b)) => a > b,
6188 _ => false,
6189 },
6190 BinOpKind::Ge => match (&lv, &rv) {
6191 (Value::Int64(a), Value::Int64(b)) => a >= b,
6192 _ => false,
6193 },
6194 _ => false,
6195 }
6196 }
6197 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
6198 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
6199 Expr::Not(inner) => !eval_where(inner, vals),
6200 Expr::Literal(Literal::Bool(b)) => *b,
6201 Expr::Literal(_) => false,
6202 Expr::InList {
6203 expr,
6204 list,
6205 negated,
6206 } => {
6207 let lv = eval_expr(expr, vals);
6208 let matched = list
6209 .iter()
6210 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
6211 if *negated {
6212 !matched
6213 } else {
6214 matched
6215 }
6216 }
6217 Expr::ListPredicate { .. } => {
6218 match eval_expr(expr, vals) {
6220 Value::Bool(b) => b,
6221 _ => false,
6222 }
6223 }
6224 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
6225 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
6226 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
6228 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
6231 false
6232 }
6233 _ => false, }
6235}
6236
6237fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
6238 match expr {
6239 Expr::PropAccess { var, prop } => {
6240 let key = format!("{var}.{prop}");
6242 if let Some(v) = vals.get(&key) {
6243 return v.clone();
6244 }
6245 let col_id = prop_name_to_col_id(prop);
6249 let fallback_key = format!("{var}.col_{col_id}");
6250 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
6251 }
6252 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
6253 Expr::Literal(lit) => match lit {
6254 Literal::Int(n) => Value::Int64(*n),
6255 Literal::Float(f) => Value::Float64(*f),
6256 Literal::Bool(b) => Value::Bool(*b),
6257 Literal::String(s) => Value::String(s.clone()),
6258 Literal::Param(p) => {
6259 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
6262 }
6263 Literal::Null => Value::Null,
6264 },
6265 Expr::FnCall { name, args } => {
6266 let name_lc = name.to_lowercase();
6270 if name_lc == "type" {
6271 if let Some(Expr::Var(var_name)) = args.first() {
6272 let meta_key = format!("{}.__type__", var_name);
6273 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
6274 }
6275 }
6276 if name_lc == "labels" {
6277 if let Some(Expr::Var(var_name)) = args.first() {
6278 let meta_key = format!("{}.__labels__", var_name);
6279 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
6280 }
6281 }
6282 if name_lc == "id" {
6285 if let Some(Expr::Var(var_name)) = args.first() {
6286 let id_key = format!("{}.__node_id__", var_name);
6288 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
6289 return Value::Int64(nid.0 as i64);
6290 }
6291 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
6293 return Value::Int64(nid.0 as i64);
6294 }
6295 return Value::Null;
6296 }
6297 }
6298 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
6300 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
6301 }
6302 Expr::BinOp { left, op, right } => {
6303 let lv = eval_expr(left, vals);
6305 let rv = eval_expr(right, vals);
6306 match op {
6307 BinOpKind::Eq => Value::Bool(lv == rv),
6308 BinOpKind::Neq => Value::Bool(lv != rv),
6309 BinOpKind::Lt => match (&lv, &rv) {
6310 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
6311 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
6312 _ => Value::Null,
6313 },
6314 BinOpKind::Le => match (&lv, &rv) {
6315 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
6316 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
6317 _ => Value::Null,
6318 },
6319 BinOpKind::Gt => match (&lv, &rv) {
6320 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
6321 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
6322 _ => Value::Null,
6323 },
6324 BinOpKind::Ge => match (&lv, &rv) {
6325 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
6326 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
6327 _ => Value::Null,
6328 },
6329 BinOpKind::Contains => match (&lv, &rv) {
6330 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
6331 _ => Value::Null,
6332 },
6333 BinOpKind::StartsWith => match (&lv, &rv) {
6334 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
6335 _ => Value::Null,
6336 },
6337 BinOpKind::EndsWith => match (&lv, &rv) {
6338 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
6339 _ => Value::Null,
6340 },
6341 BinOpKind::And => match (&lv, &rv) {
6342 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
6343 _ => Value::Null,
6344 },
6345 BinOpKind::Or => match (&lv, &rv) {
6346 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
6347 _ => Value::Null,
6348 },
6349 BinOpKind::Add => match (&lv, &rv) {
6350 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
6351 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
6352 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
6353 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
6354 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
6355 _ => Value::Null,
6356 },
6357 BinOpKind::Sub => match (&lv, &rv) {
6358 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
6359 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
6360 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
6361 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
6362 _ => Value::Null,
6363 },
6364 BinOpKind::Mul => match (&lv, &rv) {
6365 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
6366 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
6367 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
6368 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
6369 _ => Value::Null,
6370 },
6371 BinOpKind::Div => match (&lv, &rv) {
6372 (Value::Int64(a), Value::Int64(b)) => {
6373 if *b == 0 {
6374 Value::Null
6375 } else {
6376 Value::Int64(a / b)
6377 }
6378 }
6379 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
6380 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
6381 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
6382 _ => Value::Null,
6383 },
6384 BinOpKind::Mod => match (&lv, &rv) {
6385 (Value::Int64(a), Value::Int64(b)) => {
6386 if *b == 0 {
6387 Value::Null
6388 } else {
6389 Value::Int64(a % b)
6390 }
6391 }
6392 _ => Value::Null,
6393 },
6394 }
6395 }
6396 Expr::Not(inner) => match eval_expr(inner, vals) {
6397 Value::Bool(b) => Value::Bool(!b),
6398 _ => Value::Null,
6399 },
6400 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
6401 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
6402 _ => Value::Null,
6403 },
6404 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
6405 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
6406 _ => Value::Null,
6407 },
6408 Expr::InList {
6409 expr,
6410 list,
6411 negated,
6412 } => {
6413 let lv = eval_expr(expr, vals);
6414 let matched = list
6415 .iter()
6416 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
6417 Value::Bool(if *negated { !matched } else { matched })
6418 }
6419 Expr::List(items) => {
6420 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
6421 Value::List(evaluated)
6422 }
6423 Expr::ListPredicate {
6424 kind,
6425 variable,
6426 list_expr,
6427 predicate,
6428 } => {
6429 let list_val = eval_expr(list_expr, vals);
6430 let items = match list_val {
6431 Value::List(v) => v,
6432 _ => return Value::Null,
6433 };
6434 let mut satisfied_count = 0usize;
6435 let mut scope = vals.clone();
6438 for item in &items {
6439 scope.insert(variable.clone(), item.clone());
6440 let result = eval_expr(predicate, &scope);
6441 if result == Value::Bool(true) {
6442 satisfied_count += 1;
6443 }
6444 }
6445 let result = match kind {
6446 ListPredicateKind::Any => satisfied_count > 0,
6447 ListPredicateKind::All => satisfied_count == items.len(),
6448 ListPredicateKind::None => satisfied_count == 0,
6449 ListPredicateKind::Single => satisfied_count == 1,
6450 };
6451 Value::Bool(result)
6452 }
6453 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
6454 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
6455 Expr::CaseWhen {
6457 branches,
6458 else_expr,
6459 } => {
6460 for (cond, then_val) in branches {
6461 if let Value::Bool(true) = eval_expr(cond, vals) {
6462 return eval_expr(then_val, vals);
6463 }
6464 }
6465 else_expr
6466 .as_ref()
6467 .map(|e| eval_expr(e, vals))
6468 .unwrap_or(Value::Null)
6469 }
6470 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
6472 Value::Null
6473 }
6474 }
6475}
6476
6477fn project_row(
6478 props: &[(u32, u64)],
6479 column_names: &[String],
6480 _col_ids: &[u32],
6481 var_name: &str,
6483 node_label: &str,
6485 store: &NodeStore,
6486) -> Vec<Value> {
6487 column_names
6488 .iter()
6489 .map(|col_name| {
6490 if let Some(inner) = col_name
6492 .strip_prefix("labels(")
6493 .and_then(|s| s.strip_suffix(')'))
6494 {
6495 if inner == var_name && !node_label.is_empty() {
6496 return Value::List(vec![Value::String(node_label.to_string())]);
6497 }
6498 return Value::Null;
6499 }
6500 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
6501 let col_id = prop_name_to_col_id(prop);
6502 props
6503 .iter()
6504 .find(|(c, _)| *c == col_id)
6505 .map(|(_, v)| decode_raw_val(*v, store))
6506 .unwrap_or(Value::Null)
6507 })
6508 .collect()
6509}
6510
6511#[allow(clippy::too_many_arguments)]
6512fn project_hop_row(
6513 src_props: &[(u32, u64)],
6514 dst_props: &[(u32, u64)],
6515 column_names: &[String],
6516 src_var: &str,
6517 _dst_var: &str,
6518 rel_var_type: Option<(&str, &str)>,
6520 src_label_meta: Option<(&str, &str)>,
6522 dst_label_meta: Option<(&str, &str)>,
6524 store: &NodeStore,
6525) -> Vec<Value> {
6526 column_names
6527 .iter()
6528 .map(|col_name| {
6529 if let Some(inner) = col_name
6531 .strip_prefix("type(")
6532 .and_then(|s| s.strip_suffix(')'))
6533 {
6534 if let Some((rel_var, rel_type)) = rel_var_type {
6536 if inner == rel_var {
6537 return Value::String(rel_type.to_string());
6538 }
6539 }
6540 return Value::Null;
6541 }
6542 if let Some(inner) = col_name
6544 .strip_prefix("labels(")
6545 .and_then(|s| s.strip_suffix(')'))
6546 {
6547 if let Some((meta_var, label)) = src_label_meta {
6548 if inner == meta_var {
6549 return Value::List(vec![Value::String(label.to_string())]);
6550 }
6551 }
6552 if let Some((meta_var, label)) = dst_label_meta {
6553 if inner == meta_var {
6554 return Value::List(vec![Value::String(label.to_string())]);
6555 }
6556 }
6557 return Value::Null;
6558 }
6559 if let Some((v, prop)) = col_name.split_once('.') {
6560 let col_id = prop_name_to_col_id(prop);
6561 let props = if v == src_var { src_props } else { dst_props };
6562 props
6563 .iter()
6564 .find(|(c, _)| *c == col_id)
6565 .map(|(_, val)| decode_raw_val(*val, store))
6566 .unwrap_or(Value::Null)
6567 } else {
6568 Value::Null
6569 }
6570 })
6571 .collect()
6572}
6573
6574fn project_fof_row(
6581 src_props: &[(u32, u64)],
6582 fof_props: &[(u32, u64)],
6583 column_names: &[String],
6584 src_var: &str,
6585 store: &NodeStore,
6586) -> Vec<Value> {
6587 column_names
6588 .iter()
6589 .map(|col_name| {
6590 if let Some((var, prop)) = col_name.split_once('.') {
6591 let col_id = prop_name_to_col_id(prop);
6592 let props = if !src_var.is_empty() && var == src_var {
6593 src_props
6594 } else {
6595 fof_props
6596 };
6597 props
6598 .iter()
6599 .find(|(c, _)| *c == col_id)
6600 .map(|(_, v)| decode_raw_val(*v, store))
6601 .unwrap_or(Value::Null)
6602 } else {
6603 Value::Null
6604 }
6605 })
6606 .collect()
6607}
6608
6609fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
6610 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
6613 for row in rows.drain(..) {
6614 if !unique.iter().any(|existing| existing == &row) {
6615 unique.push(row);
6616 }
6617 }
6618 *rows = unique;
6619}
6620
6621fn sort_spill_threshold() -> usize {
6623 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
6624 .ok()
6625 .and_then(|v| v.parse().ok())
6626 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
6627}
6628
6629fn make_sort_key(
6631 row: &[Value],
6632 order_by: &[(Expr, SortDir)],
6633 column_names: &[String],
6634) -> Vec<crate::sort_spill::SortKeyVal> {
6635 use crate::sort_spill::{OrdValue, SortKeyVal};
6636 order_by
6637 .iter()
6638 .map(|(expr, dir)| {
6639 let col_idx = match expr {
6640 Expr::PropAccess { var, prop } => {
6641 let key = format!("{var}.{prop}");
6642 column_names.iter().position(|c| c == &key)
6643 }
6644 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6645 _ => None,
6646 };
6647 let val = col_idx
6648 .and_then(|i| row.get(i))
6649 .map(OrdValue::from_value)
6650 .unwrap_or(OrdValue::Null);
6651 match dir {
6652 SortDir::Asc => SortKeyVal::Asc(val),
6653 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
6654 }
6655 })
6656 .collect()
6657}
6658
6659fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
6660 if m.order_by.is_empty() {
6661 return;
6662 }
6663
6664 let threshold = sort_spill_threshold();
6665
6666 if rows.len() <= threshold {
6667 rows.sort_by(|a, b| {
6668 for (expr, dir) in &m.order_by {
6669 let col_idx = match expr {
6670 Expr::PropAccess { var, prop } => {
6671 let key = format!("{var}.{prop}");
6672 column_names.iter().position(|c| c == &key)
6673 }
6674 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6675 _ => None,
6676 };
6677 if let Some(idx) = col_idx {
6678 if idx < a.len() && idx < b.len() {
6679 let cmp = compare_values(&a[idx], &b[idx]);
6680 let cmp = if *dir == SortDir::Desc {
6681 cmp.reverse()
6682 } else {
6683 cmp
6684 };
6685 if cmp != std::cmp::Ordering::Equal {
6686 return cmp;
6687 }
6688 }
6689 }
6690 }
6691 std::cmp::Ordering::Equal
6692 });
6693 } else {
6694 use crate::sort_spill::{SortableRow, SpillingSorter};
6695 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
6696 for row in rows.drain(..) {
6697 let key = make_sort_key(&row, &m.order_by, column_names);
6698 if sorter.push(SortableRow { key, data: row }).is_err() {
6699 return;
6700 }
6701 }
6702 if let Ok(iter) = sorter.finish() {
6703 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
6704 }
6705 }
6706}
6707
6708fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
6709 match (a, b) {
6710 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
6711 (Value::Float64(x), Value::Float64(y)) => {
6712 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
6713 }
6714 (Value::String(x), Value::String(y)) => x.cmp(y),
6715 _ => std::cmp::Ordering::Equal,
6716 }
6717}
6718
6719fn is_aggregate_expr(expr: &Expr) -> bool {
6723 match expr {
6724 Expr::CountStar => true,
6725 Expr::FnCall { name, .. } => matches!(
6726 name.to_lowercase().as_str(),
6727 "count" | "sum" | "avg" | "min" | "max" | "collect"
6728 ),
6729 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6731 _ => false,
6732 }
6733}
6734
6735fn expr_has_collect(expr: &Expr) -> bool {
6737 match expr {
6738 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
6739 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6740 _ => false,
6741 }
6742}
6743
6744fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
6750 match expr {
6751 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
6752 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
6753 _ => Value::Null,
6754 }
6755}
6756
6757fn evaluate_aggregate_expr(
6763 expr: &Expr,
6764 accumulated_list: &Value,
6765 outer_vals: &HashMap<String, Value>,
6766) -> Value {
6767 match expr {
6768 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
6769 Expr::ListPredicate {
6770 kind,
6771 variable,
6772 predicate,
6773 ..
6774 } => {
6775 let items = match accumulated_list {
6776 Value::List(v) => v,
6777 _ => return Value::Null,
6778 };
6779 let mut satisfied_count = 0usize;
6780 for item in items {
6781 let mut scope = outer_vals.clone();
6782 scope.insert(variable.clone(), item.clone());
6783 let result = eval_expr(predicate, &scope);
6784 if result == Value::Bool(true) {
6785 satisfied_count += 1;
6786 }
6787 }
6788 let result = match kind {
6789 ListPredicateKind::Any => satisfied_count > 0,
6790 ListPredicateKind::All => satisfied_count == items.len(),
6791 ListPredicateKind::None => satisfied_count == 0,
6792 ListPredicateKind::Single => satisfied_count == 1,
6793 };
6794 Value::Bool(result)
6795 }
6796 _ => Value::Null,
6797 }
6798}
6799
6800fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
6802 items.iter().any(|item| is_aggregate_expr(&item.expr))
6803}
6804
6805fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
6816 items.iter().any(|item| {
6817 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
6818 || matches!(&item.expr, Expr::Var(_))
6819 || expr_needs_graph(&item.expr)
6820 || expr_needs_eval_path(&item.expr)
6821 })
6822}
6823
6824fn expr_needs_eval_path(expr: &Expr) -> bool {
6836 match expr {
6837 Expr::FnCall { name, args } => {
6838 let name_lc = name.to_lowercase();
6839 if matches!(
6841 name_lc.as_str(),
6842 "count" | "sum" | "avg" | "min" | "max" | "collect"
6843 ) {
6844 return false;
6845 }
6846 let _ = args; true
6852 }
6853 Expr::BinOp { left, right, .. } => {
6855 expr_needs_eval_path(left) || expr_needs_eval_path(right)
6856 }
6857 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
6858 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6859 expr_needs_eval_path(inner)
6860 }
6861 _ => false,
6862 }
6863}
6864
6865fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
6870 items
6871 .iter()
6872 .filter_map(|item| {
6873 if let Expr::Var(v) = &item.expr {
6874 Some(v.clone())
6875 } else {
6876 None
6877 }
6878 })
6879 .collect()
6880}
6881
6882fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
6887 let entries: Vec<(String, Value)> = props
6888 .iter()
6889 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
6890 .collect();
6891 Value::Map(entries)
6892}
6893
/// How a `RETURN` item participates in aggregation.
///
/// Fieldless, so it is `Copy` and `Eq` in addition to `Clone`/`PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AggKind {
    /// Not an aggregate: the item's value is part of the grouping key.
    Key,
    /// `count(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate evaluated over a `collect(...)`.
    Collect,
}
6907
6908fn agg_kind(expr: &Expr) -> AggKind {
6909 match expr {
6910 Expr::CountStar => AggKind::CountStar,
6911 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
6912 "count" => AggKind::Count,
6913 "sum" => AggKind::Sum,
6914 "avg" => AggKind::Avg,
6915 "min" => AggKind::Min,
6916 "max" => AggKind::Max,
6917 "collect" => AggKind::Collect,
6918 _ => AggKind::Key,
6919 },
6920 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
6922 _ => AggKind::Key,
6923 }
6924}
6925
6926fn expr_needs_graph(expr: &Expr) -> bool {
6935 match expr {
6936 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
6937 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
6938 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
6939 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
6940 _ => false,
6941 }
6942}
6943
6944fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
6945 let kinds: Vec<AggKind> = return_items
6947 .iter()
6948 .map(|item| agg_kind(&item.expr))
6949 .collect();
6950
6951 let key_indices: Vec<usize> = kinds
6952 .iter()
6953 .enumerate()
6954 .filter(|(_, k)| **k == AggKind::Key)
6955 .map(|(i, _)| i)
6956 .collect();
6957
6958 let agg_indices: Vec<usize> = kinds
6959 .iter()
6960 .enumerate()
6961 .filter(|(_, k)| **k != AggKind::Key)
6962 .map(|(i, _)| i)
6963 .collect();
6964
6965 if agg_indices.is_empty() {
6967 return rows
6968 .iter()
6969 .map(|row_vals| {
6970 return_items
6971 .iter()
6972 .map(|item| eval_expr(&item.expr, row_vals))
6973 .collect()
6974 })
6975 .collect();
6976 }
6977
6978 let mut group_keys: Vec<Vec<Value>> = Vec::new();
6980 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
6982
6983 for row_vals in rows {
6984 let key: Vec<Value> = key_indices
6985 .iter()
6986 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
6987 .collect();
6988
6989 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
6990 pos
6991 } else {
6992 group_keys.push(key);
6993 group_accum.push(vec![vec![]; agg_indices.len()]);
6994 group_keys.len() - 1
6995 };
6996
6997 for (ai, &ri) in agg_indices.iter().enumerate() {
6998 match &kinds[ri] {
6999 AggKind::CountStar => {
7000 group_accum[group_idx][ai].push(Value::Int64(1));
7002 }
7003 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
7004 let arg_val = match &return_items[ri].expr {
7005 Expr::FnCall { args, .. } if !args.is_empty() => {
7006 eval_expr(&args[0], row_vals)
7007 }
7008 _ => Value::Null,
7009 };
7010 if !matches!(arg_val, Value::Null) {
7012 group_accum[group_idx][ai].push(arg_val);
7013 }
7014 }
7015 AggKind::Collect => {
7016 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
7019 if !matches!(arg_val, Value::Null) {
7021 group_accum[group_idx][ai].push(arg_val);
7022 }
7023 }
7024 AggKind::Key => unreachable!(),
7025 }
7026 }
7027 }
7028
7029 if group_keys.is_empty() && key_indices.is_empty() {
7031 let empty_vals: HashMap<String, Value> = HashMap::new();
7032 let row: Vec<Value> = return_items
7033 .iter()
7034 .zip(kinds.iter())
7035 .map(|(item, k)| match k {
7036 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
7037 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
7038 AggKind::Collect => {
7039 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
7040 }
7041 AggKind::Key => Value::Null,
7042 })
7043 .collect();
7044 return vec![row];
7045 }
7046
7047 if group_keys.is_empty() {
7049 return vec![];
7050 }
7051
7052 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
7054 for (gi, key_vals) in group_keys.into_iter().enumerate() {
7055 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
7056 let mut ki = 0usize;
7057 let mut ai = 0usize;
7058 let outer_vals: HashMap<String, Value> = key_indices
7060 .iter()
7061 .enumerate()
7062 .map(|(pos, &i)| {
7063 let name = return_items[i]
7064 .alias
7065 .clone()
7066 .unwrap_or_else(|| format!("_k{i}"));
7067 (name, key_vals[pos].clone())
7068 })
7069 .collect();
7070 for col_idx in 0..return_items.len() {
7071 if kinds[col_idx] == AggKind::Key {
7072 output_row.push(key_vals[ki].clone());
7073 ki += 1;
7074 } else {
7075 let accumulated = Value::List(group_accum[gi][ai].clone());
7076 let result = if kinds[col_idx] == AggKind::Collect {
7077 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
7078 } else {
7079 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
7080 };
7081 output_row.push(result);
7082 ai += 1;
7083 }
7084 }
7085 out.push(output_row);
7086 }
7087 out
7088}
7089
7090fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
7092 match kind {
7093 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
7094 AggKind::Sum => {
7095 let mut sum_i: i64 = 0;
7096 let mut sum_f: f64 = 0.0;
7097 let mut is_float = false;
7098 for v in vals {
7099 match v {
7100 Value::Int64(n) => sum_i += n,
7101 Value::Float64(f) => {
7102 is_float = true;
7103 sum_f += f;
7104 }
7105 _ => {}
7106 }
7107 }
7108 if is_float {
7109 Value::Float64(sum_f + sum_i as f64)
7110 } else {
7111 Value::Int64(sum_i)
7112 }
7113 }
7114 AggKind::Avg => {
7115 if vals.is_empty() {
7116 return Value::Null;
7117 }
7118 let mut sum: f64 = 0.0;
7119 let mut count: i64 = 0;
7120 for v in vals {
7121 match v {
7122 Value::Int64(n) => {
7123 sum += *n as f64;
7124 count += 1;
7125 }
7126 Value::Float64(f) => {
7127 sum += f;
7128 count += 1;
7129 }
7130 _ => {}
7131 }
7132 }
7133 if count == 0 {
7134 Value::Null
7135 } else {
7136 Value::Float64(sum / count as f64)
7137 }
7138 }
7139 AggKind::Min => vals
7140 .iter()
7141 .fold(None::<Value>, |acc, v| match (acc, v) {
7142 (None, v) => Some(v.clone()),
7143 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
7144 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
7145 (Some(Value::String(a)), Value::String(b)) => {
7146 Some(Value::String(if a <= *b { a } else { b.clone() }))
7147 }
7148 (Some(a), _) => Some(a),
7149 })
7150 .unwrap_or(Value::Null),
7151 AggKind::Max => vals
7152 .iter()
7153 .fold(None::<Value>, |acc, v| match (acc, v) {
7154 (None, v) => Some(v.clone()),
7155 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
7156 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
7157 (Some(Value::String(a)), Value::String(b)) => {
7158 Some(Value::String(if a >= *b { a } else { b.clone() }))
7159 }
7160 (Some(a), _) => Some(a),
7161 })
7162 .unwrap_or(Value::Null),
7163 AggKind::Collect => Value::List(vals.to_vec()),
7164 AggKind::Key => Value::Null,
7165 }
7166}
7167
7168fn eval_expr_to_string(expr: &Expr) -> Result<String> {
7175 match expr {
7176 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
7177 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
7178 "parameter ${p} requires runtime binding; pass a literal string instead"
7179 ))),
7180 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
7181 "procedure argument must be a string literal, got: {other:?}"
7182 ))),
7183 }
7184}
7185
7186fn expr_to_col_name(expr: &Expr) -> String {
7189 match expr {
7190 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
7191 Expr::Var(v) => v.clone(),
7192 _ => "value".to_owned(),
7193 }
7194}
7195
7196fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
7202 match expr {
7203 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
7204 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
7205 Some(Value::NodeRef(node_id)) => {
7206 let col_id = prop_name_to_col_id(prop);
7207 read_node_props(store, *node_id, &[col_id])
7208 .ok()
7209 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
7210 .map(|(_, raw)| decode_raw_val(raw, store))
7211 .unwrap_or(Value::Null)
7212 }
7213 Some(other) => other.clone(),
7214 None => Value::Null,
7215 },
7216 Expr::Literal(lit) => match lit {
7217 Literal::Int(n) => Value::Int64(*n),
7218 Literal::Float(f) => Value::Float64(*f),
7219 Literal::Bool(b) => Value::Bool(*b),
7220 Literal::String(s) => Value::String(s.clone()),
7221 _ => Value::Null,
7222 },
7223 _ => Value::Null,
7224 }
7225}