1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::Catalog;
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18 Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
/// Outcome of resolving a relationship type to a relationship-table id
/// (produced by `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy)]
enum RelTableLookup {
    /// The pattern named no relationship type (empty string), so the lookup
    /// is not restricted to a single table.
    All,
    /// The catalog returned a table for the requested combination; carries
    /// that table's id.
    Found(u32),
    /// A relationship type was given but no table could be resolved for it.
    NotFound,
}
46
/// Query engine state: storage handle, catalog, adjacency structures,
/// indexes and per-query settings.
pub struct Engine {
    /// Node storage that queries read from and `CREATE` writes to.
    pub store: NodeStore,
    /// Catalog used to resolve labels and relationship tables.
    pub catalog: Catalog,
    /// Forward CSR adjacency structures, keyed by relationship-table id.
    pub csrs: HashMap<u32, CsrForward>,
    /// Database root directory; edge stores, fulltext indexes and the `wal`
    /// directory are opened relative to it.
    pub db_root: std::path::PathBuf,
    /// Query parameters; empty until replaced via `with_params`.
    pub params: HashMap<String, Value>,
    /// Property index built in `Engine::new` (empty if listing labels failed).
    pub prop_index: PropertyIndex,
    /// Text index built in `Engine::new` (empty if listing labels failed).
    pub text_index: TextIndex,
    /// Optional instant at or after which `check_deadline` reports
    /// `QueryTimeout`; `None` means no deadline.
    pub deadline: Option<std::time::Instant>,
}
78
79impl Engine {
80 pub fn new(
86 store: NodeStore,
87 catalog: Catalog,
88 csrs: HashMap<u32, CsrForward>,
89 db_root: &Path,
90 ) -> Self {
91 let (prop_index, text_index) = match catalog.list_labels() {
94 Ok(labels) => {
95 let labels_u32: Vec<(u32, String)> = labels
97 .into_iter()
98 .map(|(id, name)| (id as u32, name))
99 .collect();
100 let pi = PropertyIndex::build(&store, &labels_u32);
101 let ti = TextIndex::build(&store, &labels_u32);
103 (pi, ti)
104 }
105 Err(e) => {
106 tracing::warn!(error = ?e, "SPA-249/SPA-251: index build failed; disabled");
107 (PropertyIndex::new(), TextIndex::new())
108 }
109 };
110 Engine {
111 store,
112 catalog,
113 csrs,
114 db_root: db_root.to_path_buf(),
115 params: HashMap::new(),
116 prop_index,
117 text_index,
118 deadline: None,
119 }
120 }
121
122 pub fn with_single_csr(
128 store: NodeStore,
129 catalog: Catalog,
130 csr: CsrForward,
131 db_root: &Path,
132 ) -> Self {
133 let mut csrs = HashMap::new();
134 csrs.insert(0u32, csr);
135 Self::new(store, catalog, csrs, db_root)
136 }
137
138 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
143 self.params = params;
144 self
145 }
146
147 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
152 self.deadline = Some(deadline);
153 self
154 }
155
156 #[inline]
162 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
163 if let Some(dl) = self.deadline {
164 if std::time::Instant::now() >= dl {
165 return Err(sparrowdb_common::Error::QueryTimeout);
166 }
167 }
168 Ok(())
169 }
170
171 fn resolve_rel_table_id(
180 &self,
181 src_label_id: u32,
182 dst_label_id: u32,
183 rel_type: &str,
184 ) -> RelTableLookup {
185 if rel_type.is_empty() {
186 return RelTableLookup::All;
187 }
188 match self
189 .catalog
190 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
191 .ok()
192 .flatten()
193 {
194 Some(id) => RelTableLookup::Found(id as u32),
195 None => RelTableLookup::NotFound,
196 }
197 }
198
199 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
204 EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
205 .and_then(|s| s.read_delta())
206 .unwrap_or_default()
207 }
208
209 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
213 let ids = self.catalog.list_rel_table_ids();
214 if ids.is_empty() {
215 return EdgeStore::open(&self.db_root, RelTableId(0))
217 .and_then(|s| s.read_delta())
218 .unwrap_or_default();
219 }
220 ids.into_iter()
221 .flat_map(|(id, _, _, _)| {
222 EdgeStore::open(&self.db_root, RelTableId(id as u32))
223 .and_then(|s| s.read_delta())
224 .unwrap_or_default()
225 })
226 .collect()
227 }
228
229 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
231 self.csrs
232 .get(&rel_table_id)
233 .map(|csr| csr.neighbors(src_slot).to_vec())
234 .unwrap_or_default()
235 }
236
237 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
239 let mut out: Vec<u64> = Vec::new();
240 for csr in self.csrs.values() {
241 out.extend_from_slice(csr.neighbors(src_slot));
242 }
243 out
244 }
245
246 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
251 let stmt = {
252 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
253 parse(cypher)?
254 };
255
256 let bound = {
257 let _bind_span = info_span!("sparrowdb.bind").entered();
258 bind(stmt, &self.catalog)?
259 };
260
261 {
262 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
263 self.execute_bound(bound.inner)
264 }
265 }
266
267 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
272 self.execute_bound(stmt)
273 }
274
275 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
276 match stmt {
277 Statement::Match(m) => self.execute_match(&m),
278 Statement::MatchWith(mw) => self.execute_match_with(&mw),
279 Statement::Unwind(u) => self.execute_unwind(&u),
280 Statement::Create(c) => self.execute_create(&c),
281 Statement::Merge(_)
285 | Statement::MatchMergeRel(_)
286 | Statement::MatchMutate(_)
287 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
288 "mutation statements must be executed via execute_mutation".into(),
289 )),
290 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
291 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
292 Statement::Union(u) => self.execute_union(u),
293 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
294 Statement::Call(c) => self.execute_call(&c),
295 Statement::Pipeline(p) => self.execute_pipeline(&p),
296 }
297 }
298
299 fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
306 match c.procedure.as_str() {
307 "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
308 "db.schema" => self.call_db_schema(c),
309 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
310 "unknown procedure: {other}"
311 ))),
312 }
313 }
314
315 fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
324 if c.args.len() != 2 {
326 return Err(sparrowdb_common::Error::InvalidArgument(
327 "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
328 .into(),
329 ));
330 }
331
332 let index_name = eval_expr_to_string(&c.args[0])?;
334 let query = eval_expr_to_string(&c.args[1])?;
336
337 let index = FulltextIndex::open(&self.db_root, &index_name)?;
340 let node_ids = index.search(&query);
341
342 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
345 vec!["node".to_owned()]
346 } else {
347 c.yield_columns.clone()
348 };
349
350 if let Some(bad_col) = yield_cols.iter().find(|c| c.as_str() != "node") {
352 return Err(sparrowdb_common::Error::InvalidArgument(format!(
353 "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
354 )));
355 }
356
357 let mut rows: Vec<Vec<Value>> = Vec::new();
359 for raw_id in node_ids {
360 let node_id = sparrowdb_common::NodeId(raw_id);
361 let row: Vec<Value> = yield_cols.iter().map(|_| Value::NodeRef(node_id)).collect();
362 rows.push(row);
363 }
364
365 let (columns, rows) = if let Some(ref ret) = c.return_clause {
367 self.project_call_return(ret, &yield_cols, rows)?
368 } else {
369 (yield_cols, rows)
370 };
371
372 Ok(QueryResult { columns, rows })
373 }
374
375 fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
386 if !c.args.is_empty() {
387 return Err(sparrowdb_common::Error::InvalidArgument(
388 "db.schema requires exactly 0 arguments".into(),
389 ));
390 }
391 let columns = vec![
392 "type".to_owned(),
393 "name".to_owned(),
394 "properties".to_owned(),
395 ];
396
397 let wal_dir = self.db_root.join("wal");
399 let schema = WalReplayer::scan_schema(&wal_dir)?;
400
401 let mut rows: Vec<Vec<Value>> = Vec::new();
402
403 let labels = self.catalog.list_labels()?;
405 for (label_id, label_name) in &labels {
406 let mut prop_names: Vec<String> = schema
407 .node_props
408 .get(&(*label_id as u32))
409 .map(|s| s.iter().cloned().collect())
410 .unwrap_or_default();
411 prop_names.sort();
412 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
413 rows.push(vec![
414 Value::String("node".to_owned()),
415 Value::String(label_name.clone()),
416 props_value,
417 ]);
418 }
419
420 let rel_tables = self.catalog.list_rel_tables()?;
422 let mut seen_rel_types: std::collections::HashSet<String> =
424 std::collections::HashSet::new();
425 for (_, _, rel_type) in &rel_tables {
426 if seen_rel_types.insert(rel_type.clone()) {
427 let mut prop_names: Vec<String> = schema
428 .rel_props
429 .get(rel_type)
430 .map(|s| s.iter().cloned().collect())
431 .unwrap_or_default();
432 prop_names.sort();
433 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
434 rows.push(vec![
435 Value::String("relationship".to_owned()),
436 Value::String(rel_type.clone()),
437 props_value,
438 ]);
439 }
440 }
441
442 Ok(QueryResult { columns, rows })
443 }
444
445 fn project_call_return(
455 &self,
456 ret: &sparrowdb_cypher::ast::ReturnClause,
457 yield_cols: &[String],
458 rows: Vec<Vec<Value>>,
459 ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
460 let out_cols: Vec<String> = ret
462 .items
463 .iter()
464 .map(|item| {
465 item.alias
466 .clone()
467 .unwrap_or_else(|| expr_to_col_name(&item.expr))
468 })
469 .collect();
470
471 let mut out_rows = Vec::new();
472 for row in rows {
473 let env: HashMap<String, Value> = yield_cols
475 .iter()
476 .zip(row.iter())
477 .map(|(k, v)| (k.clone(), v.clone()))
478 .collect();
479
480 let projected: Vec<Value> = ret
481 .items
482 .iter()
483 .map(|item| eval_call_expr(&item.expr, &env, &self.store))
484 .collect();
485 out_rows.push(projected);
486 }
487 Ok((out_cols, out_rows))
488 }
489
490 pub fn is_mutation(stmt: &Statement) -> bool {
495 match stmt {
496 Statement::Merge(_)
497 | Statement::MatchMergeRel(_)
498 | Statement::MatchMutate(_)
499 | Statement::MatchCreate(_) => true,
500 Statement::Create(_) => true,
504 _ => false,
505 }
506 }
507
508 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
514 if mm.match_patterns.is_empty() {
515 return Ok(vec![]);
516 }
517
518 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
522 return Err(sparrowdb_common::Error::InvalidArgument(
523 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
524 .into(),
525 ));
526 }
527
528 let pat = &mm.match_patterns[0];
529 if pat.nodes.is_empty() {
530 return Ok(vec![]);
531 }
532 let node_pat = &pat.nodes[0];
533 let label = node_pat.labels.first().cloned().unwrap_or_default();
534
535 let label_id = match self.catalog.get_label(&label)? {
536 Some(id) => id as u32,
537 None => return Ok(vec![]),
539 };
540
541 let hwm = self.store.hwm_for_label(label_id)?;
542
543 let filter_col_ids: Vec<u32> = node_pat
545 .props
546 .iter()
547 .map(|pe| prop_name_to_col_id(&pe.key))
548 .collect();
549
550 let mut all_col_ids: Vec<u32> = filter_col_ids;
552 if let Some(ref where_expr) = mm.where_clause {
553 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
554 }
555
556 let var_name = node_pat.var.as_str();
557 let mut matching_ids = Vec::new();
558
559 for slot in 0..hwm {
560 let node_id = NodeId(((label_id as u64) << 32) | slot);
561
562 if self.is_node_tombstoned(node_id) {
565 continue;
566 }
567
568 let props = read_node_props(&self.store, node_id, &all_col_ids)?;
569
570 if !matches_prop_filter_static(
571 &props,
572 &node_pat.props,
573 &self.dollar_params(),
574 &self.store,
575 ) {
576 continue;
577 }
578
579 if let Some(ref where_expr) = mm.where_clause {
580 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
581 row_vals.extend(self.dollar_params());
582 if !self.eval_where_graph(where_expr, &row_vals) {
583 continue;
584 }
585 }
586
587 matching_ids.push(node_id);
588 }
589
590 Ok(matching_ids)
591 }
592
593 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
596 &mm.mutation
597 }
598
599 fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
608 match self.store.get_node_raw(node_id, &[0u32]) {
609 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
610 Err(sparrowdb_common::Error::NotFound) => false,
611 Err(e) => {
612 tracing::warn!(
613 node_id = node_id.0,
614 error = ?e,
615 "tombstone check failed; treating node as not tombstoned"
616 );
617 false
618 }
619 }
620 }
621
622 fn node_matches_prop_filter(
629 &self,
630 node_id: NodeId,
631 filter_col_ids: &[u32],
632 props: &[sparrowdb_cypher::ast::PropEntry],
633 ) -> bool {
634 if props.is_empty() {
635 return true;
636 }
637 match self.store.get_node_raw(node_id, filter_col_ids) {
638 Ok(raw_props) => {
639 matches_prop_filter_static(&raw_props, props, &self.dollar_params(), &self.store)
640 }
641 Err(_) => false,
642 }
643 }
644
645 pub fn scan_match_create(
653 &self,
654 mc: &MatchCreateStatement,
655 ) -> Result<HashMap<String, Vec<NodeId>>> {
656 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
657
658 for pat in &mc.match_patterns {
659 for node_pat in &pat.nodes {
660 if node_pat.var.is_empty() {
661 continue;
662 }
663 if var_candidates.contains_key(&node_pat.var) {
665 continue;
666 }
667
668 let label = node_pat.labels.first().cloned().unwrap_or_default();
669 let label_id: u32 = match self.catalog.get_label(&label)? {
670 Some(id) => id as u32,
671 None => {
672 var_candidates.insert(node_pat.var.clone(), vec![]);
674 continue;
675 }
676 };
677
678 let hwm = self.store.hwm_for_label(label_id)?;
679
680 let filter_col_ids: Vec<u32> = node_pat
682 .props
683 .iter()
684 .map(|p| prop_name_to_col_id(&p.key))
685 .collect();
686
687 let mut matching_ids: Vec<NodeId> = Vec::new();
688 for slot in 0..hwm {
689 let node_id = NodeId(((label_id as u64) << 32) | slot);
690
691 match self.store.get_node_raw(node_id, &[0u32]) {
694 Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
695 continue;
696 }
697 Ok(_) | Err(_) => {}
698 }
699
700 if !node_pat.props.is_empty() {
702 match self.store.get_node_raw(node_id, &filter_col_ids) {
703 Ok(props) => {
704 if !matches_prop_filter_static(
705 &props,
706 &node_pat.props,
707 &self.dollar_params(),
708 &self.store,
709 ) {
710 continue;
711 }
712 }
713 Err(_) => continue,
716 }
717 }
718
719 matching_ids.push(node_id);
720 }
721
722 var_candidates.insert(node_pat.var.clone(), matching_ids);
723 }
724 }
725
726 Ok(var_candidates)
727 }
728
729 pub fn scan_match_create_rows(
751 &self,
752 mc: &MatchCreateStatement,
753 ) -> Result<Vec<HashMap<String, NodeId>>> {
754 let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
756
757 for pat in &mc.match_patterns {
758 if pat.rels.is_empty() {
759 let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
764
765 for node_pat in &pat.nodes {
766 if node_pat.var.is_empty() {
767 continue;
768 }
769
770 let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
774 self.catalog
775 .list_labels()?
776 .into_iter()
777 .map(|(id, _)| id as u32)
778 .collect()
779 } else {
780 let label = node_pat.labels.first().cloned().unwrap_or_default();
781 match self.catalog.get_label(&label)? {
782 Some(id) => vec![id as u32],
783 None => {
784 return Ok(vec![]);
786 }
787 }
788 };
789
790 let filter_col_ids: Vec<u32> = node_pat
791 .props
792 .iter()
793 .map(|p| prop_name_to_col_id(&p.key))
794 .collect();
795
796 let mut matching_ids: Vec<NodeId> = Vec::new();
797 for label_id in scan_label_ids {
798 let hwm = self.store.hwm_for_label(label_id)?;
799 for slot in 0..hwm {
800 let node_id = NodeId(((label_id as u64) << 32) | slot);
801
802 if self.is_node_tombstoned(node_id) {
803 continue;
804 }
805 if !self.node_matches_prop_filter(
806 node_id,
807 &filter_col_ids,
808 &node_pat.props,
809 ) {
810 continue;
811 }
812
813 matching_ids.push(node_id);
814 }
815 }
816
817 if matching_ids.is_empty() {
818 return Ok(vec![]);
820 }
821
822 per_var.push((node_pat.var.clone(), matching_ids));
823 }
824
825 for (var, candidates) in per_var {
829 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
830 for row in &accumulated {
831 for &node_id in &candidates {
832 let mut new_row = row.clone();
833 new_row.insert(var.clone(), node_id);
834 next.push(new_row);
835 }
836 }
837 accumulated = next;
838 }
839 } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
840 let src_node_pat = &pat.nodes[0];
843 let dst_node_pat = &pat.nodes[1];
844 let rel_pat = &pat.rels[0];
845
846 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
848 return Err(sparrowdb_common::Error::Unimplemented);
849 }
850
851 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
852 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
853
854 let src_label_id: u32 = match self.catalog.get_label(&src_label)? {
855 Some(id) => id as u32,
856 None => return Ok(vec![]),
857 };
858 let dst_label_id: u32 = match self.catalog.get_label(&dst_label)? {
859 Some(id) => id as u32,
860 None => return Ok(vec![]),
861 };
862
863 let src_filter_cols: Vec<u32> = src_node_pat
864 .props
865 .iter()
866 .map(|p| prop_name_to_col_id(&p.key))
867 .collect();
868 let dst_filter_cols: Vec<u32> = dst_node_pat
869 .props
870 .iter()
871 .map(|p| prop_name_to_col_id(&p.key))
872 .collect();
873
874 let rel_lookup =
876 self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
877 if matches!(rel_lookup, RelTableLookup::NotFound) {
878 return Ok(vec![]);
879 }
880
881 let delta_adj: HashMap<u64, Vec<u64>> = {
884 let records: Vec<DeltaRecord> = match rel_lookup {
885 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
886 _ => self.read_delta_all(),
887 };
888 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
889 for r in records {
890 let s = r.src.0;
891 let s_label = (s >> 32) as u32;
892 if s_label == src_label_id {
893 let s_slot = s & 0xFFFF_FFFF;
894 adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
895 }
896 }
897 adj
898 };
899
900 let hwm_src = self.store.hwm_for_label(src_label_id)?;
901
902 let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
904
905 for src_slot in 0..hwm_src {
906 self.check_deadline()?;
908
909 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
910
911 if self.is_node_tombstoned(src_node) {
912 continue;
913 }
914 if !self.node_matches_prop_filter(
915 src_node,
916 &src_filter_cols,
917 &src_node_pat.props,
918 ) {
919 continue;
920 }
921
922 let csr_neighbors_vec: Vec<u64> = match rel_lookup {
924 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
925 _ => self.csr_neighbors_all(src_slot),
926 };
927 let empty: Vec<u64> = Vec::new();
928 let delta_neighbors: &[u64] =
929 delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
930
931 let mut seen: HashSet<u64> = HashSet::new();
932 for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
933 if !seen.insert(dst_slot) {
934 continue;
935 }
936 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
937
938 if self.is_node_tombstoned(dst_node) {
939 continue;
940 }
941 if !self.node_matches_prop_filter(
942 dst_node,
943 &dst_filter_cols,
944 &dst_node_pat.props,
945 ) {
946 continue;
947 }
948
949 let mut row: HashMap<String, NodeId> = HashMap::new();
950
951 if !src_node_pat.var.is_empty()
954 && !dst_node_pat.var.is_empty()
955 && src_node_pat.var == dst_node_pat.var
956 {
957 if src_node != dst_node {
958 continue;
959 }
960 row.insert(src_node_pat.var.clone(), src_node);
961 } else {
962 if !src_node_pat.var.is_empty() {
963 row.insert(src_node_pat.var.clone(), src_node);
964 }
965 if !dst_node_pat.var.is_empty() {
966 row.insert(dst_node_pat.var.clone(), dst_node);
967 }
968 }
969 pattern_rows.push(row);
970 }
971 }
972
973 if pattern_rows.is_empty() {
974 return Ok(vec![]);
975 }
976
977 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
981 for acc_row in &accumulated {
982 'outer: for pat_row in &pattern_rows {
983 for (k, v) in pat_row {
985 if let Some(existing) = acc_row.get(k) {
986 if existing != v {
987 continue 'outer;
988 }
989 }
990 }
991 let mut new_row = acc_row.clone();
992 new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
993 next.push(new_row);
994 }
995 }
996 accumulated = next;
997 } else {
998 return Err(sparrowdb_common::Error::Unimplemented);
1000 }
1001 }
1002
1003 Ok(accumulated)
1004 }
1005
1006 pub fn scan_match_merge_rel_rows(
1010 &self,
1011 mm: &MatchMergeRelStatement,
1012 ) -> Result<Vec<HashMap<String, NodeId>>> {
1013 let proxy = MatchCreateStatement {
1016 match_patterns: mm.match_patterns.clone(),
1017 match_props: vec![],
1018 create: CreateStatement {
1019 nodes: vec![],
1020 edges: vec![],
1021 },
1022 };
1023 self.scan_match_create_rows(&proxy)
1024 }
1025
1026 fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1029 use crate::operators::{Operator, UnwindOperator};
1030
1031 let values = eval_list_expr(&u.expr, &self.params)?;
1033
1034 let column_names = extract_return_column_names(&u.return_clause.items);
1036
1037 if values.is_empty() {
1038 return Ok(QueryResult::empty(column_names));
1039 }
1040
1041 let mut op = UnwindOperator::new(u.alias.clone(), values);
1042 let chunks = op.collect_all()?;
1043
1044 let mut rows: Vec<Vec<Value>> = Vec::new();
1051 for chunk in &chunks {
1052 for group in &chunk.groups {
1053 let n = group.len();
1054 for row_idx in 0..n {
1055 let row = u
1056 .return_clause
1057 .items
1058 .iter()
1059 .map(|item| {
1060 let is_alias = match &item.expr {
1063 Expr::Var(name) => name == &u.alias,
1064 _ => false,
1065 };
1066 if is_alias {
1067 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1068 } else {
1069 Value::Null
1072 }
1073 })
1074 .collect();
1075 rows.push(row);
1076 }
1077 }
1078 }
1079
1080 Ok(QueryResult {
1081 columns: column_names,
1082 rows,
1083 })
1084 }
1085
1086 fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
1096 for node in &create.nodes {
1097 let label = node.labels.first().cloned().unwrap_or_default();
1099
1100 if is_reserved_label(&label) {
1102 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1103 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
1104 )));
1105 }
1106
1107 let label_id: u32 = match self.catalog.get_label(&label)? {
1108 Some(id) => id as u32,
1109 None => self.catalog.create_label(&label)? as u32,
1110 };
1111
1112 let empty_bindings: HashMap<String, Value> = HashMap::new();
1116 let props: Vec<(u32, StoreValue)> = node
1117 .props
1118 .iter()
1119 .map(|entry| {
1120 let col_id = prop_name_to_col_id(&entry.key);
1121 let val = eval_expr(&entry.value, &empty_bindings);
1122 let store_val = value_to_store_value(val);
1123 (col_id, store_val)
1124 })
1125 .collect();
1126
1127 self.store.create_node(label_id, &props)?;
1128 }
1129 Ok(QueryResult::empty(vec![]))
1130 }
1131
1132 fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1141 let left_result = self.execute_bound(*u.left)?;
1142 let right_result = self.execute_bound(*u.right)?;
1143
1144 if !left_result.columns.is_empty()
1146 && !right_result.columns.is_empty()
1147 && left_result.columns.len() != right_result.columns.len()
1148 {
1149 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1150 "UNION: left side has {} columns, right side has {}",
1151 left_result.columns.len(),
1152 right_result.columns.len()
1153 )));
1154 }
1155
1156 let columns = if !left_result.columns.is_empty() {
1157 left_result.columns.clone()
1158 } else {
1159 right_result.columns.clone()
1160 };
1161
1162 let mut rows = left_result.rows;
1163 rows.extend(right_result.rows);
1164
1165 if !u.all {
1166 deduplicate_rows(&mut rows);
1167 }
1168
1169 Ok(QueryResult { columns, rows })
1170 }
1171
1172 fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
1181 let intermediate = self.collect_match_rows_for_with(
1183 &m.match_patterns,
1184 m.match_where.as_ref(),
1185 &m.with_clause,
1186 )?;
1187
1188 let has_agg = m
1192 .with_clause
1193 .items
1194 .iter()
1195 .any(|item| is_aggregate_expr(&item.expr));
1196
1197 let projected: Vec<HashMap<String, Value>> = if has_agg {
1198 let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
1200 agg_rows
1202 .into_iter()
1203 .filter(|with_vals| {
1204 if let Some(ref where_expr) = m.with_clause.where_clause {
1205 let mut with_vals_p = with_vals.clone();
1206 with_vals_p.extend(self.dollar_params());
1207 self.eval_where_graph(where_expr, &with_vals_p)
1208 } else {
1209 true
1210 }
1211 })
1212 .map(|mut with_vals| {
1213 with_vals.extend(self.dollar_params());
1214 with_vals
1215 })
1216 .collect()
1217 } else {
1218 let mut projected: Vec<HashMap<String, Value>> = Vec::new();
1220 for row_vals in &intermediate {
1221 let mut with_vals: HashMap<String, Value> = HashMap::new();
1222 for item in &m.with_clause.items {
1223 let val = self.eval_expr_graph(&item.expr, row_vals);
1224 with_vals.insert(item.alias.clone(), val);
1225 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1229 if let Some(node_ref) = row_vals.get(src_var) {
1230 if matches!(node_ref, Value::NodeRef(_)) {
1231 with_vals.insert(item.alias.clone(), node_ref.clone());
1232 with_vals.insert(
1233 format!("{}.__node_id__", item.alias),
1234 node_ref.clone(),
1235 );
1236 }
1237 }
1238 let nid_key = format!("{src_var}.__node_id__");
1240 if let Some(node_ref) = row_vals.get(&nid_key) {
1241 with_vals
1242 .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
1243 }
1244 }
1245 }
1246 if let Some(ref where_expr) = m.with_clause.where_clause {
1247 let mut with_vals_p = with_vals.clone();
1248 with_vals_p.extend(self.dollar_params());
1249 if !self.eval_where_graph(where_expr, &with_vals_p) {
1250 continue;
1251 }
1252 }
1253 with_vals.extend(self.dollar_params());
1256 projected.push(with_vals);
1257 }
1258 projected
1259 };
1260
1261 let column_names = extract_return_column_names(&m.return_clause.items);
1263
1264 let mut ordered_projected = projected;
1268 if !m.order_by.is_empty() {
1269 ordered_projected.sort_by(|a, b| {
1270 for (expr, dir) in &m.order_by {
1271 let val_a = eval_expr(expr, a);
1272 let val_b = eval_expr(expr, b);
1273 let cmp = compare_values(&val_a, &val_b);
1274 let cmp = if *dir == SortDir::Desc {
1275 cmp.reverse()
1276 } else {
1277 cmp
1278 };
1279 if cmp != std::cmp::Ordering::Equal {
1280 return cmp;
1281 }
1282 }
1283 std::cmp::Ordering::Equal
1284 });
1285 }
1286
1287 if let Some(skip) = m.skip {
1289 let skip = (skip as usize).min(ordered_projected.len());
1290 ordered_projected.drain(0..skip);
1291 }
1292 if let Some(lim) = m.limit {
1293 ordered_projected.truncate(lim as usize);
1294 }
1295
1296 let mut rows: Vec<Vec<Value>> = ordered_projected
1297 .iter()
1298 .map(|with_vals| {
1299 m.return_clause
1300 .items
1301 .iter()
1302 .map(|item| self.eval_expr_graph(&item.expr, with_vals))
1303 .collect()
1304 })
1305 .collect();
1306
1307 if m.distinct {
1308 deduplicate_rows(&mut rows);
1309 }
1310
1311 Ok(QueryResult {
1312 columns: column_names,
1313 rows,
1314 })
1315 }
1316
1317 fn aggregate_with_items(
1322 &self,
1323 rows: &[HashMap<String, Value>],
1324 items: &[sparrowdb_cypher::ast::WithItem],
1325 ) -> Vec<HashMap<String, Value>> {
1326 let key_indices: Vec<usize> = items
1328 .iter()
1329 .enumerate()
1330 .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1331 .map(|(i, _)| i)
1332 .collect();
1333 let agg_indices: Vec<usize> = items
1334 .iter()
1335 .enumerate()
1336 .filter(|(_, item)| is_aggregate_expr(&item.expr))
1337 .map(|(i, _)| i)
1338 .collect();
1339
1340 let mut group_keys: Vec<Vec<Value>> = Vec::new();
1342 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); for row_vals in rows {
1345 let key: Vec<Value> = key_indices
1346 .iter()
1347 .map(|&i| eval_expr(&items[i].expr, row_vals))
1348 .collect();
1349 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1350 pos
1351 } else {
1352 group_keys.push(key);
1353 group_accum.push(vec![vec![]; agg_indices.len()]);
1354 group_keys.len() - 1
1355 };
1356 for (ai, &ri) in agg_indices.iter().enumerate() {
1357 match &items[ri].expr {
1358 sparrowdb_cypher::ast::Expr::CountStar => {
1359 group_accum[group_idx][ai].push(Value::Int64(1));
1360 }
1361 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1362 if name.to_lowercase() == "collect" =>
1363 {
1364 let val = if !args.is_empty() {
1365 eval_expr(&args[0], row_vals)
1366 } else {
1367 Value::Null
1368 };
1369 if !matches!(val, Value::Null) {
1370 group_accum[group_idx][ai].push(val);
1371 }
1372 }
1373 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1374 if matches!(
1375 name.to_lowercase().as_str(),
1376 "count" | "sum" | "avg" | "min" | "max"
1377 ) =>
1378 {
1379 let val = if !args.is_empty() {
1380 eval_expr(&args[0], row_vals)
1381 } else {
1382 Value::Null
1383 };
1384 if !matches!(val, Value::Null) {
1385 group_accum[group_idx][ai].push(val);
1386 }
1387 }
1388 _ => {}
1389 }
1390 }
1391 }
1392
1393 if rows.is_empty() && key_indices.is_empty() {
1396 let mut out_row: HashMap<String, Value> = HashMap::new();
1397 for &ri in &agg_indices {
1398 let val = match &items[ri].expr {
1399 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1400 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1401 if name.to_lowercase() == "collect" =>
1402 {
1403 Value::List(vec![])
1404 }
1405 _ => Value::Int64(0),
1406 };
1407 out_row.insert(items[ri].alias.clone(), val);
1408 }
1409 return vec![out_row];
1410 }
1411
1412 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1414 for (gi, key_vals) in group_keys.iter().enumerate() {
1415 let mut out_row: HashMap<String, Value> = HashMap::new();
1416 for (ki, &ri) in key_indices.iter().enumerate() {
1418 out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
1419 }
1420 for (ai, &ri) in agg_indices.iter().enumerate() {
1422 let accum = &group_accum[gi][ai];
1423 let val = match &items[ri].expr {
1424 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
1425 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1426 if name.to_lowercase() == "collect" =>
1427 {
1428 Value::List(accum.clone())
1429 }
1430 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1431 if name.to_lowercase() == "count" =>
1432 {
1433 Value::Int64(accum.len() as i64)
1434 }
1435 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1436 if name.to_lowercase() == "sum" =>
1437 {
1438 let sum: i64 = accum
1439 .iter()
1440 .filter_map(|v| {
1441 if let Value::Int64(n) = v {
1442 Some(*n)
1443 } else {
1444 None
1445 }
1446 })
1447 .sum();
1448 Value::Int64(sum)
1449 }
1450 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1451 if name.to_lowercase() == "min" =>
1452 {
1453 accum
1454 .iter()
1455 .min_by(|a, b| compare_values(a, b))
1456 .cloned()
1457 .unwrap_or(Value::Null)
1458 }
1459 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1460 if name.to_lowercase() == "max" =>
1461 {
1462 accum
1463 .iter()
1464 .max_by(|a, b| compare_values(a, b))
1465 .cloned()
1466 .unwrap_or(Value::Null)
1467 }
1468 _ => Value::Null,
1469 };
1470 out_row.insert(items[ri].alias.clone(), val);
1471 }
1472 result.push(out_row);
1473 }
1474 result
1475 }
1476
1477 fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
1482 let mut current_rows: Vec<HashMap<String, Value>> =
1484 if let Some((expr, alias)) = &p.leading_unwind {
1485 let values = eval_list_expr(expr, &self.params)?;
1487 values
1488 .into_iter()
1489 .map(|v| {
1490 let mut m = HashMap::new();
1491 m.insert(alias.clone(), v);
1492 m
1493 })
1494 .collect()
1495 } else if let Some(ref patterns) = p.leading_match {
1496 self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
1501 } else {
1502 vec![HashMap::new()]
1503 };
1504
1505 for stage in &p.stages {
1507 match stage {
1508 PipelineStage::With {
1509 clause,
1510 order_by,
1511 skip,
1512 limit,
1513 } => {
1514 if !order_by.is_empty() {
1518 current_rows.sort_by(|a, b| {
1519 for (expr, dir) in order_by {
1520 let va = eval_expr(expr, a);
1521 let vb = eval_expr(expr, b);
1522 let cmp = compare_values(&va, &vb);
1523 let cmp = if *dir == SortDir::Desc {
1524 cmp.reverse()
1525 } else {
1526 cmp
1527 };
1528 if cmp != std::cmp::Ordering::Equal {
1529 return cmp;
1530 }
1531 }
1532 std::cmp::Ordering::Equal
1533 });
1534 }
1535 if let Some(s) = skip {
1536 let s = (*s as usize).min(current_rows.len());
1537 current_rows.drain(0..s);
1538 }
1539 if let Some(l) = limit {
1540 current_rows.truncate(*l as usize);
1541 }
1542
1543 let has_agg = clause
1545 .items
1546 .iter()
1547 .any(|item| is_aggregate_expr(&item.expr));
1548 let next_rows: Vec<HashMap<String, Value>> = if has_agg {
1549 let agg_rows = self.aggregate_with_items(¤t_rows, &clause.items);
1550 agg_rows
1551 .into_iter()
1552 .filter(|with_vals| {
1553 if let Some(ref where_expr) = clause.where_clause {
1554 let mut wv = with_vals.clone();
1555 wv.extend(self.dollar_params());
1556 self.eval_where_graph(where_expr, &wv)
1557 } else {
1558 true
1559 }
1560 })
1561 .map(|mut with_vals| {
1562 with_vals.extend(self.dollar_params());
1563 with_vals
1564 })
1565 .collect()
1566 } else {
1567 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1568 for row_vals in ¤t_rows {
1569 let mut with_vals: HashMap<String, Value> = HashMap::new();
1570 for item in &clause.items {
1571 let val = self.eval_expr_graph(&item.expr, row_vals);
1572 with_vals.insert(item.alias.clone(), val);
1573 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1575 if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
1576 with_vals.insert(item.alias.clone(), nr.clone());
1577 with_vals.insert(
1578 format!("{}.__node_id__", item.alias),
1579 nr.clone(),
1580 );
1581 }
1582 let nid_key = format!("{src_var}.__node_id__");
1583 if let Some(nr) = row_vals.get(&nid_key) {
1584 with_vals.insert(
1585 format!("{}.__node_id__", item.alias),
1586 nr.clone(),
1587 );
1588 }
1589 }
1590 }
1591 if let Some(ref where_expr) = clause.where_clause {
1592 let mut wv = with_vals.clone();
1593 wv.extend(self.dollar_params());
1594 if !self.eval_where_graph(where_expr, &wv) {
1595 continue;
1596 }
1597 }
1598 with_vals.extend(self.dollar_params());
1599 next_rows.push(with_vals);
1600 }
1601 next_rows
1602 };
1603 current_rows = next_rows;
1604 }
1605 PipelineStage::Match {
1606 patterns,
1607 where_clause,
1608 } => {
1609 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1612 for binding in ¤t_rows {
1613 let new_rows = self.execute_pipeline_match_stage(
1614 patterns,
1615 where_clause.as_ref(),
1616 binding,
1617 )?;
1618 next_rows.extend(new_rows);
1619 }
1620 current_rows = next_rows;
1621 }
1622 PipelineStage::Unwind { alias, new_alias } => {
1623 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1625 for row_vals in ¤t_rows {
1626 let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
1627 let items = match list_val {
1628 Value::List(v) => v,
1629 other => vec![other],
1630 };
1631 for item in items {
1632 let mut new_row = row_vals.clone();
1633 new_row.insert(new_alias.clone(), item);
1634 next_rows.push(new_row);
1635 }
1636 }
1637 current_rows = next_rows;
1638 }
1639 }
1640 }
1641
1642 let column_names = extract_return_column_names(&p.return_clause.items);
1644
1645 if !p.return_order_by.is_empty() {
1647 current_rows.sort_by(|a, b| {
1648 for (expr, dir) in &p.return_order_by {
1649 let va = eval_expr(expr, a);
1650 let vb = eval_expr(expr, b);
1651 let cmp = compare_values(&va, &vb);
1652 let cmp = if *dir == SortDir::Desc {
1653 cmp.reverse()
1654 } else {
1655 cmp
1656 };
1657 if cmp != std::cmp::Ordering::Equal {
1658 return cmp;
1659 }
1660 }
1661 std::cmp::Ordering::Equal
1662 });
1663 }
1664
1665 if let Some(skip) = p.return_skip {
1666 let skip = (skip as usize).min(current_rows.len());
1667 current_rows.drain(0..skip);
1668 }
1669 if let Some(lim) = p.return_limit {
1670 current_rows.truncate(lim as usize);
1671 }
1672
1673 let mut rows: Vec<Vec<Value>> = current_rows
1674 .iter()
1675 .map(|row_vals| {
1676 p.return_clause
1677 .items
1678 .iter()
1679 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
1680 .collect()
1681 })
1682 .collect();
1683
1684 if p.distinct {
1685 deduplicate_rows(&mut rows);
1686 }
1687
1688 Ok(QueryResult {
1689 columns: column_names,
1690 rows,
1691 })
1692 }
1693
1694 fn collect_pipeline_match_rows(
1700 &self,
1701 patterns: &[PathPattern],
1702 where_clause: Option<&Expr>,
1703 ) -> Result<Vec<HashMap<String, Value>>> {
1704 if patterns.is_empty() {
1705 return Ok(vec![HashMap::new()]);
1706 }
1707
1708 let pat = &patterns[0];
1710 let node = &pat.nodes[0];
1711 let var_name = node.var.as_str();
1712 let label = node.labels.first().cloned().unwrap_or_default();
1713
1714 let label_id = match self.catalog.get_label(&label)? {
1715 Some(id) => id as u32,
1716 None => return Ok(vec![]),
1717 };
1718 let hwm = self.store.hwm_for_label(label_id)?;
1719 let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1720
1721 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1722 for slot in 0..hwm {
1723 let node_id = NodeId(((label_id as u64) << 32) | slot);
1724 if self.is_node_tombstoned(node_id) {
1725 continue;
1726 }
1727 let props = match self.store.get_node_raw(node_id, &col_ids) {
1728 Ok(p) => p,
1729 Err(_) => continue,
1730 };
1731 if !self.matches_prop_filter(&props, &node.props) {
1732 continue;
1733 }
1734 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1735 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1737 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1738
1739 if let Some(wexpr) = where_clause {
1740 let mut row_vals_p = row_vals.clone();
1741 row_vals_p.extend(self.dollar_params());
1742 if !self.eval_where_graph(wexpr, &row_vals_p) {
1743 continue;
1744 }
1745 }
1746 result.push(row_vals);
1747 }
1748 Ok(result)
1749 }
1750
1751 fn execute_pipeline_match_stage(
1760 &self,
1761 patterns: &[PathPattern],
1762 where_clause: Option<&Expr>,
1763 binding: &HashMap<String, Value>,
1764 ) -> Result<Vec<HashMap<String, Value>>> {
1765 if patterns.is_empty() {
1766 return Ok(vec![binding.clone()]);
1767 }
1768
1769 let pat = &patterns[0];
1770
1771 if !pat.rels.is_empty() {
1773 return self.execute_pipeline_match_hop(pat, where_clause, binding);
1776 }
1777
1778 let node = &pat.nodes[0];
1779 let var_name = node.var.as_str();
1780 let label = node.labels.first().cloned().unwrap_or_default();
1781
1782 let label_id = match self.catalog.get_label(&label)? {
1783 Some(id) => id as u32,
1784 None => return Ok(vec![]),
1785 };
1786 let hwm = self.store.hwm_for_label(label_id)?;
1787 let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1788
1789 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1790 let params = self.dollar_params();
1791 for slot in 0..hwm {
1792 let node_id = NodeId(((label_id as u64) << 32) | slot);
1793 if self.is_node_tombstoned(node_id) {
1794 continue;
1795 }
1796 let props = match self.store.get_node_raw(node_id, &col_ids) {
1797 Ok(p) => p,
1798 Err(_) => continue,
1799 };
1800
1801 if !self.matches_prop_filter_with_binding(&props, &node.props, binding, ¶ms) {
1803 continue;
1804 }
1805
1806 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1807 row_vals.extend(binding.clone());
1809 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1810 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1811
1812 if let Some(wexpr) = where_clause {
1813 let mut row_vals_p = row_vals.clone();
1814 row_vals_p.extend(params.clone());
1815 if !self.eval_where_graph(wexpr, &row_vals_p) {
1816 continue;
1817 }
1818 }
1819 result.push(row_vals);
1820 }
1821 Ok(result)
1822 }
1823
1824 fn execute_pipeline_match_hop(
1829 &self,
1830 pat: &sparrowdb_cypher::ast::PathPattern,
1831 where_clause: Option<&Expr>,
1832 binding: &HashMap<String, Value>,
1833 ) -> Result<Vec<HashMap<String, Value>>> {
1834 if pat.nodes.len() < 2 || pat.rels.is_empty() {
1835 return Ok(vec![]);
1836 }
1837 let src_pat = &pat.nodes[0];
1838 let dst_pat = &pat.nodes[1];
1839 let rel_pat = &pat.rels[0];
1840
1841 let src_label = src_pat.labels.first().cloned().unwrap_or_default();
1842 let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
1843
1844 let src_label_id = match self.catalog.get_label(&src_label)? {
1845 Some(id) => id as u32,
1846 None => return Ok(vec![]),
1847 };
1848 let dst_label_id = match self.catalog.get_label(&dst_label)? {
1849 Some(id) => id as u32,
1850 None => return Ok(vec![]),
1851 };
1852
1853 let src_col_ids: Vec<u32> = self
1854 .store
1855 .col_ids_for_label(src_label_id)
1856 .unwrap_or_default();
1857 let dst_col_ids: Vec<u32> = self
1858 .store
1859 .col_ids_for_label(dst_label_id)
1860 .unwrap_or_default();
1861 let params = self.dollar_params();
1862
1863 let src_candidates: Vec<NodeId> = {
1865 let bound_src = binding
1867 .get(&src_pat.var)
1868 .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
1869 if let Some(Value::NodeRef(nid)) = bound_src {
1870 vec![*nid]
1871 } else {
1872 let hwm = self.store.hwm_for_label(src_label_id)?;
1873 let mut cands = Vec::new();
1874 for slot in 0..hwm {
1875 let node_id = NodeId(((src_label_id as u64) << 32) | slot);
1876 if self.is_node_tombstoned(node_id) {
1877 continue;
1878 }
1879 if let Ok(props) = self.store.get_node_raw(node_id, &src_col_ids) {
1880 if self.matches_prop_filter_with_binding(
1881 &props,
1882 &src_pat.props,
1883 binding,
1884 ¶ms,
1885 ) {
1886 cands.push(node_id);
1887 }
1888 }
1889 }
1890 cands
1891 }
1892 };
1893
1894 let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
1895
1896 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1897 for src_id in src_candidates {
1898 let src_slot = src_id.0 & 0xFFFF_FFFF;
1899 let dst_slots: Vec<u64> = match &rel_table_id {
1900 RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
1901 RelTableLookup::NotFound => continue,
1902 RelTableLookup::All => self.csr_neighbors_all(src_slot),
1903 };
1904 let delta_slots: Vec<u64> = self
1906 .read_delta_all()
1907 .into_iter()
1908 .filter(|r| {
1909 let r_src_label = (r.src.0 >> 32) as u32;
1910 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
1911 r_src_label == src_label_id && r_src_slot == src_slot
1912 })
1913 .map(|r| r.dst.0 & 0xFFFF_FFFF)
1914 .collect();
1915 let all_slots: std::collections::HashSet<u64> =
1916 dst_slots.into_iter().chain(delta_slots).collect();
1917
1918 for dst_slot in all_slots {
1919 let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1920 if self.is_node_tombstoned(dst_id) {
1921 continue;
1922 }
1923 if let Ok(dst_props) = self.store.get_node_raw(dst_id, &dst_col_ids) {
1924 if !self.matches_prop_filter_with_binding(
1925 &dst_props,
1926 &dst_pat.props,
1927 binding,
1928 ¶ms,
1929 ) {
1930 continue;
1931 }
1932 let src_props = self
1933 .store
1934 .get_node_raw(src_id, &src_col_ids)
1935 .unwrap_or_default();
1936 let mut row_vals =
1937 build_row_vals(&src_props, &src_pat.var, &src_col_ids, &self.store);
1938 row_vals.extend(build_row_vals(
1939 &dst_props,
1940 &dst_pat.var,
1941 &dst_col_ids,
1942 &self.store,
1943 ));
1944 row_vals.extend(binding.clone());
1946 row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
1947 row_vals.insert(
1948 format!("{}.__node_id__", src_pat.var),
1949 Value::NodeRef(src_id),
1950 );
1951 row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
1952 row_vals.insert(
1953 format!("{}.__node_id__", dst_pat.var),
1954 Value::NodeRef(dst_id),
1955 );
1956
1957 if let Some(wexpr) = where_clause {
1958 let mut row_vals_p = row_vals.clone();
1959 row_vals_p.extend(params.clone());
1960 if !self.eval_where_graph(wexpr, &row_vals_p) {
1961 continue;
1962 }
1963 }
1964 result.push(row_vals);
1965 }
1966 }
1967 }
1968 Ok(result)
1969 }
1970
1971 fn matches_prop_filter_with_binding(
1977 &self,
1978 props: &[(u32, u64)],
1979 filters: &[sparrowdb_cypher::ast::PropEntry],
1980 binding: &HashMap<String, Value>,
1981 params: &HashMap<String, Value>,
1982 ) -> bool {
1983 for f in filters {
1984 let col_id = prop_name_to_col_id(&f.key);
1985 let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
1986
1987 let filter_val = match &f.value {
1989 sparrowdb_cypher::ast::Expr::Var(v) => {
1990 binding.get(v).cloned().unwrap_or(Value::Null)
1992 }
1993 other => eval_expr(other, params),
1994 };
1995
1996 let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.store));
1997 let matches = match (stored_val, &filter_val) {
1998 (Some(Value::String(a)), Value::String(b)) => &a == b,
1999 (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2000 (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2001 (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2002 (None, Value::Null) => true,
2003 _ => false,
2004 };
2005 if !matches {
2006 return false;
2007 }
2008 }
2009 true
2010 }
2011
2012 fn collect_match_rows_for_with(
2021 &self,
2022 patterns: &[PathPattern],
2023 where_clause: Option<&Expr>,
2024 with_clause: &WithClause,
2025 ) -> Result<Vec<HashMap<String, Value>>> {
2026 if patterns.is_empty() || patterns[0].rels.is_empty() {
2027 let pat = &patterns[0];
2028 let node = &pat.nodes[0];
2029 let var_name = node.var.as_str();
2030 let label = node.labels.first().cloned().unwrap_or_default();
2031 let label_id = self
2032 .catalog
2033 .get_label(&label)?
2034 .ok_or(sparrowdb_common::Error::NotFound)?;
2035 let label_id_u32 = label_id as u32;
2036 let hwm = self.store.hwm_for_label(label_id_u32)?;
2037
2038 let mut all_col_ids: Vec<u32> = Vec::new();
2040 if let Some(wexpr) = &where_clause {
2041 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2042 }
2043 for item in &with_clause.items {
2044 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2045 }
2046 for p in &node.props {
2047 let col_id = prop_name_to_col_id(&p.key);
2048 if !all_col_ids.contains(&col_id) {
2049 all_col_ids.push(col_id);
2050 }
2051 }
2052
2053 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2054 for slot in 0..hwm {
2055 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2056 if self.is_node_tombstoned(node_id) {
2059 continue;
2060 }
2061 let props = read_node_props(&self.store, node_id, &all_col_ids)?;
2062 if !self.matches_prop_filter(&props, &node.props) {
2063 continue;
2064 }
2065 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2066 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2069 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2070 if let Some(wexpr) = &where_clause {
2071 let mut row_vals_p = row_vals.clone();
2072 row_vals_p.extend(self.dollar_params());
2073 if !self.eval_where_graph(wexpr, &row_vals_p) {
2074 continue;
2075 }
2076 }
2077 result.push(row_vals);
2078 }
2079 Ok(result)
2080 } else {
2081 Err(sparrowdb_common::Error::Unimplemented)
2082 }
2083 }
2084
2085 fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2086 if m.pattern.is_empty() {
2087 let column_names = extract_return_column_names(&m.return_clause.items);
2089 let empty_vals: HashMap<String, Value> = HashMap::new();
2090 let row: Vec<Value> = m
2091 .return_clause
2092 .items
2093 .iter()
2094 .map(|item| eval_expr(&item.expr, &empty_vals))
2095 .collect();
2096 return Ok(QueryResult {
2097 columns: column_names,
2098 rows: vec![row],
2099 });
2100 }
2101
2102 let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2104 let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2105 let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2107 let is_var_len = m.pattern.len() == 1
2109 && m.pattern[0].rels.len() == 1
2110 && m.pattern[0].rels[0].min_hops.is_some();
2111
2112 let column_names = extract_return_column_names(&m.return_clause.items);
2113
2114 let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2117
2118 if is_var_len {
2119 self.execute_variable_length(m, &column_names)
2120 } else if is_two_hop {
2121 self.execute_two_hop(m, &column_names)
2122 } else if is_one_hop {
2123 self.execute_one_hop(m, &column_names)
2124 } else if is_n_hop {
2125 self.execute_n_hop(m, &column_names)
2126 } else if is_multi_pattern {
2127 self.execute_multi_pattern_scan(m, &column_names)
2128 } else if m.pattern[0].rels.is_empty() {
2129 self.execute_scan(m, &column_names)
2130 } else {
2131 self.execute_scan(m, &column_names)
2133 }
2134 }
2135
2136 fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
2143 use sparrowdb_common::Error;
2144
2145 let match_stmt = MatchStatement {
2147 pattern: om.pattern.clone(),
2148 where_clause: om.where_clause.clone(),
2149 return_clause: om.return_clause.clone(),
2150 order_by: om.order_by.clone(),
2151 skip: om.skip,
2152 limit: om.limit,
2153 distinct: om.distinct,
2154 };
2155
2156 let column_names = extract_return_column_names(&om.return_clause.items);
2157
2158 let result = self.execute_match(&match_stmt);
2159
2160 match result {
2161 Ok(qr) if !qr.rows.is_empty() => Ok(qr),
2162 Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
2164 let null_row = vec![Value::Null; column_names.len()];
2165 Ok(QueryResult {
2166 columns: column_names,
2167 rows: vec![null_row],
2168 })
2169 }
2170 Err(e) => Err(e),
2171 }
2172 }
2173
2174 fn execute_match_optional_match(
2182 &self,
2183 mom: &MatchOptionalMatchStatement,
2184 ) -> Result<QueryResult> {
2185 let column_names = extract_return_column_names(&mom.return_clause.items);
2186
2187 let lead_return_items: Vec<ReturnItem> = mom
2190 .return_clause
2191 .items
2192 .iter()
2193 .filter(|item| {
2194 let lead_vars: Vec<&str> = mom
2196 .match_patterns
2197 .iter()
2198 .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
2199 .collect();
2200 match &item.expr {
2201 Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
2202 Expr::Var(v) => lead_vars.contains(&v.as_str()),
2203 _ => false,
2204 }
2205 })
2206 .cloned()
2207 .collect();
2208
2209 let lead_col_names = extract_return_column_names(&lead_return_items);
2212
2213 if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
2215 let null_row = vec![Value::Null; column_names.len()];
2216 return Ok(QueryResult {
2217 columns: column_names,
2218 rows: vec![null_row],
2219 });
2220 }
2221 let lead_node_pat = &mom.match_patterns[0].nodes[0];
2222 let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
2223 let lead_label_id = match self.catalog.get_label(&lead_label)? {
2224 Some(id) => id as u32,
2225 None => {
2226 return Ok(QueryResult {
2228 columns: column_names,
2229 rows: vec![],
2230 });
2231 }
2232 };
2233
2234 let lead_all_col_ids: Vec<u32> = {
2236 let mut ids = collect_col_ids_from_columns(&lead_col_names);
2237 if let Some(ref wexpr) = mom.match_where {
2238 collect_col_ids_from_expr(wexpr, &mut ids);
2239 }
2240 for p in &lead_node_pat.props {
2241 let col_id = prop_name_to_col_id(&p.key);
2242 if !ids.contains(&col_id) {
2243 ids.push(col_id);
2244 }
2245 }
2246 ids
2247 };
2248
2249 let lead_hwm = self.store.hwm_for_label(lead_label_id)?;
2250 let lead_var = lead_node_pat.var.as_str();
2251
2252 let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
2254 for slot in 0..lead_hwm {
2255 let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
2256 if self.is_node_tombstoned(node_id) {
2259 continue;
2260 }
2261 let props = read_node_props(&self.store, node_id, &lead_all_col_ids)?;
2262 if !self.matches_prop_filter(&props, &lead_node_pat.props) {
2263 continue;
2264 }
2265 if let Some(ref wexpr) = mom.match_where {
2266 let mut row_vals = build_row_vals(&props, lead_var, &lead_all_col_ids, &self.store);
2267 row_vals.extend(self.dollar_params());
2268 if !self.eval_where_graph(wexpr, &row_vals) {
2269 continue;
2270 }
2271 }
2272 lead_rows.push((slot, props));
2273 }
2274
2275 let opt_patterns = &mom.optional_patterns;
2279
2280 let opt_vars: Vec<String> = opt_patterns
2282 .iter()
2283 .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
2284 .filter(|v| !v.is_empty())
2285 .collect();
2286
2287 let mut result_rows: Vec<Vec<Value>> = Vec::new();
2288
2289 for (lead_slot, lead_props) in &lead_rows {
2290 let lead_row_vals =
2291 build_row_vals(lead_props, lead_var, &lead_all_col_ids, &self.store);
2292
2293 let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
2298 && opt_patterns[0].rels.len() == 1
2299 && opt_patterns[0].nodes.len() == 2
2300 {
2301 let opt_pat = &opt_patterns[0];
2302 let opt_src_pat = &opt_pat.nodes[0];
2303 let opt_dst_pat = &opt_pat.nodes[1];
2304 let opt_rel_pat = &opt_pat.rels[0];
2305
2306 let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
2308 let opt_dst_label_id: Option<u32> = match self.catalog.get_label(&opt_dst_label) {
2309 Ok(Some(id)) => Some(id as u32),
2310 _ => None,
2311 };
2312
2313 self.optional_one_hop_sub_rows(
2314 *lead_slot,
2315 lead_label_id,
2316 opt_dst_label_id,
2317 opt_src_pat,
2318 opt_dst_pat,
2319 opt_rel_pat,
2320 &opt_vars,
2321 &column_names,
2322 )
2323 .unwrap_or_default()
2324 } else {
2325 vec![]
2327 };
2328
2329 if opt_sub_rows.is_empty() {
2330 let row: Vec<Value> = mom
2332 .return_clause
2333 .items
2334 .iter()
2335 .map(|item| {
2336 let v = eval_expr(&item.expr, &lead_row_vals);
2337 if v == Value::Null {
2338 match &item.expr {
2341 Expr::PropAccess { var, .. } | Expr::Var(var) => {
2342 if opt_vars.contains(var) {
2343 Value::Null
2344 } else {
2345 eval_expr(&item.expr, &lead_row_vals)
2346 }
2347 }
2348 _ => eval_expr(&item.expr, &lead_row_vals),
2349 }
2350 } else {
2351 v
2352 }
2353 })
2354 .collect();
2355 result_rows.push(row);
2356 } else {
2357 for opt_row_vals in opt_sub_rows {
2359 let mut combined = lead_row_vals.clone();
2360 combined.extend(opt_row_vals);
2361 let row: Vec<Value> = mom
2362 .return_clause
2363 .items
2364 .iter()
2365 .map(|item| eval_expr(&item.expr, &combined))
2366 .collect();
2367 result_rows.push(row);
2368 }
2369 }
2370 }
2371
2372 if mom.distinct {
2373 deduplicate_rows(&mut result_rows);
2374 }
2375 if let Some(skip) = mom.skip {
2376 let skip = (skip as usize).min(result_rows.len());
2377 result_rows.drain(0..skip);
2378 }
2379 if let Some(lim) = mom.limit {
2380 result_rows.truncate(lim as usize);
2381 }
2382
2383 Ok(QueryResult {
2384 columns: column_names,
2385 rows: result_rows,
2386 })
2387 }
2388
2389 #[allow(clippy::too_many_arguments)]
2392 fn optional_one_hop_sub_rows(
2393 &self,
2394 src_slot: u64,
2395 src_label_id: u32,
2396 dst_label_id: Option<u32>,
2397 _src_pat: &sparrowdb_cypher::ast::NodePattern,
2398 dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
2399 rel_pat: &sparrowdb_cypher::ast::RelPattern,
2400 opt_vars: &[String],
2401 column_names: &[String],
2402 ) -> Result<Vec<HashMap<String, Value>>> {
2403 let dst_label_id = match dst_label_id {
2404 Some(id) => id,
2405 None => return Ok(vec![]),
2406 };
2407
2408 let dst_var = dst_node_pat.var.as_str();
2409 let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
2410 let _ = opt_vars;
2411
2412 let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2414
2415 if matches!(rel_lookup, RelTableLookup::NotFound) {
2417 return Ok(vec![]);
2418 }
2419
2420 let delta_neighbors: Vec<u64> = {
2421 let records: Vec<DeltaRecord> = match rel_lookup {
2422 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
2423 _ => self.read_delta_all(),
2424 };
2425 records
2426 .into_iter()
2427 .filter(|r| {
2428 let r_src_label = (r.src.0 >> 32) as u32;
2429 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2430 r_src_label == src_label_id && r_src_slot == src_slot
2431 })
2432 .map(|r| r.dst.0 & 0xFFFF_FFFF)
2433 .collect()
2434 };
2435
2436 let csr_neighbors = match rel_lookup {
2437 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
2438 _ => self.csr_neighbors_all(src_slot),
2439 };
2440 let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
2441
2442 let mut seen: HashSet<u64> = HashSet::new();
2443 let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
2444
2445 for dst_slot in all_neighbors {
2446 if !seen.insert(dst_slot) {
2447 continue;
2448 }
2449 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2450 let dst_props = read_node_props(&self.store, dst_node, &col_ids_dst)?;
2451 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
2452 continue;
2453 }
2454 let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.store);
2455 sub_rows.push(row_vals);
2456 }
2457
2458 Ok(sub_rows)
2459 }
2460
2461 fn execute_multi_pattern_scan(
2470 &self,
2471 m: &MatchStatement,
2472 column_names: &[String],
2473 ) -> Result<QueryResult> {
2474 let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); for pat in &m.pattern {
2478 if pat.nodes.is_empty() {
2479 continue;
2480 }
2481 let node = &pat.nodes[0];
2482 if node.var.is_empty() {
2483 continue;
2484 }
2485 let label = node.labels.first().cloned().unwrap_or_default();
2486 let label_id = match self.catalog.get_label(&label)? {
2487 Some(id) => id as u32,
2488 None => return Ok(QueryResult::empty(column_names.to_vec())),
2489 };
2490 let filter_col_ids: Vec<u32> = node
2491 .props
2492 .iter()
2493 .map(|p| prop_name_to_col_id(&p.key))
2494 .collect();
2495 let params = self.dollar_params();
2496 let hwm = self.store.hwm_for_label(label_id)?;
2497 let mut candidates: Vec<NodeId> = Vec::new();
2498 for slot in 0..hwm {
2499 let node_id = NodeId(((label_id as u64) << 32) | slot);
2500 if self.is_node_tombstoned(node_id) {
2501 continue;
2502 }
2503 if filter_col_ids.is_empty() {
2504 candidates.push(node_id);
2505 } else if let Ok(raw_props) = self.store.get_node_raw(node_id, &filter_col_ids) {
2506 if matches_prop_filter_static(&raw_props, &node.props, ¶ms, &self.store) {
2507 candidates.push(node_id);
2508 }
2509 }
2510 }
2511 if candidates.is_empty() {
2512 return Ok(QueryResult::empty(column_names.to_vec()));
2513 }
2514 per_var.push((node.var.clone(), label_id, candidates));
2515 }
2516
2517 let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
2519 for (var, _label_id, candidates) in &per_var {
2520 let mut next: Vec<HashMap<String, Value>> = Vec::new();
2521 for base_row in &accumulated {
2522 for &node_id in candidates {
2523 let mut row = base_row.clone();
2524 row.insert(var.clone(), Value::NodeRef(node_id));
2526 row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
2527 let label_id = (node_id.0 >> 32) as u32;
2529 let label_col_ids = self.store.col_ids_for_label(label_id).unwrap_or_default();
2530 let nullable = self
2531 .store
2532 .get_node_raw_nullable(node_id, &label_col_ids)
2533 .unwrap_or_default();
2534 for &(col_id, opt_raw) in &nullable {
2535 if let Some(raw) = opt_raw {
2536 row.insert(
2537 format!("{var}.col_{col_id}"),
2538 decode_raw_val(raw, &self.store),
2539 );
2540 }
2541 }
2542 next.push(row);
2543 }
2544 }
2545 accumulated = next;
2546 }
2547
2548 if let Some(ref where_expr) = m.where_clause {
2550 accumulated.retain(|row| self.eval_where_graph(where_expr, row));
2551 }
2552
2553 let dollar_params = self.dollar_params();
2555 if !dollar_params.is_empty() {
2556 for row in &mut accumulated {
2557 row.extend(dollar_params.clone());
2558 }
2559 }
2560
2561 let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
2562
2563 apply_order_by(&mut rows, m, column_names);
2565 if let Some(skip) = m.skip {
2566 let skip = (skip as usize).min(rows.len());
2567 rows.drain(0..skip);
2568 }
2569 if let Some(limit) = m.limit {
2570 rows.truncate(limit as usize);
2571 }
2572
2573 Ok(QueryResult {
2574 columns: column_names.to_vec(),
2575 rows,
2576 })
2577 }
2578
2579 fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
2580 let pat = &m.pattern[0];
2581 let node = &pat.nodes[0];
2582
2583 if node.labels.is_empty() {
2586 return self.execute_scan_all_labels(m, column_names);
2587 }
2588
2589 let label = node.labels.first().cloned().unwrap_or_default();
2590 let label_id = match self.catalog.get_label(&label)? {
2592 Some(id) => id as u32,
2593 None => {
2594 return Ok(QueryResult {
2595 columns: column_names.to_vec(),
2596 rows: vec![],
2597 })
2598 }
2599 };
2600 let label_id_u32 = label_id;
2601
2602 let hwm = self.store.hwm_for_label(label_id_u32)?;
2603 tracing::debug!(label = %label, hwm = hwm, "node scan start");
2604
2605 let col_ids = collect_col_ids_from_columns(column_names);
2608 let mut all_col_ids: Vec<u32> = col_ids.clone();
2609 if let Some(ref where_expr) = m.where_clause {
2611 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2612 }
2613 for p in &node.props {
2615 let col_id = prop_name_to_col_id(&p.key);
2616 if !all_col_ids.contains(&col_id) {
2617 all_col_ids.push(col_id);
2618 }
2619 }
2620
2621 let use_agg = has_aggregate_in_return(&m.return_clause.items);
2622 let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2628 if use_eval_path {
2629 for item in &m.return_clause.items {
2634 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2635 }
2636 }
2637
2638 let bare_vars = bare_var_names_in_return(&m.return_clause.items);
2641 let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
2642 self.store.col_ids_for_label(label_id_u32)?
2643 } else {
2644 vec![]
2645 };
2646
2647 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2648 let mut rows: Vec<Vec<Value>> = Vec::new();
2649
2650 let index_candidate_slots: Option<Vec<u32>> =
2655 try_index_lookup_for_props(&node.props, label_id_u32, &self.prop_index);
2656
2657 let where_eq_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
2662 m.where_clause.as_ref().and_then(|wexpr| {
2663 try_where_eq_index_lookup(wexpr, node.var.as_str(), label_id_u32, &self.prop_index)
2664 })
2665 } else {
2666 None
2667 };
2668
2669 let where_range_candidate_slots: Option<Vec<u32>> =
2673 if index_candidate_slots.is_none() && where_eq_candidate_slots.is_none() {
2674 m.where_clause.as_ref().and_then(|wexpr| {
2675 try_where_range_index_lookup(
2676 wexpr,
2677 node.var.as_str(),
2678 label_id_u32,
2679 &self.prop_index,
2680 )
2681 })
2682 } else {
2683 None
2684 };
2685
2686 let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none()
2692 && where_eq_candidate_slots.is_none()
2693 && where_range_candidate_slots.is_none()
2694 {
2695 m.where_clause.as_ref().and_then(|wexpr| {
2696 try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &self.text_index)
2697 })
2698 } else {
2699 None
2700 };
2701
2702 let slot_iter: Box<dyn Iterator<Item = u64>> =
2706 if let Some(ref slots) = index_candidate_slots {
2707 tracing::debug!(
2708 label = %label,
2709 candidates = slots.len(),
2710 "SPA-249: property index fast path"
2711 );
2712 Box::new(slots.iter().map(|&s| s as u64))
2713 } else if let Some(ref slots) = where_eq_candidate_slots {
2714 tracing::debug!(
2715 label = %label,
2716 candidates = slots.len(),
2717 "SPA-249 Phase 1b: WHERE equality index fast path"
2718 );
2719 Box::new(slots.iter().map(|&s| s as u64))
2720 } else if let Some(ref slots) = where_range_candidate_slots {
2721 tracing::debug!(
2722 label = %label,
2723 candidates = slots.len(),
2724 "SPA-249 Phase 2: WHERE range index fast path"
2725 );
2726 Box::new(slots.iter().map(|&s| s as u64))
2727 } else if let Some(ref slots) = text_candidate_slots {
2728 tracing::debug!(
2729 label = %label,
2730 candidates = slots.len(),
2731 "SPA-251: text index fast path"
2732 );
2733 Box::new(slots.iter().map(|&s| s as u64))
2734 } else {
2735 Box::new(0..hwm)
2736 };
2737
2738 for slot in slot_iter {
2739 self.check_deadline()?;
2741
2742 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2743 if slot < 1024 || slot % 10_000 == 0 {
2744 tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
2745 }
2746
2747 if self.is_node_tombstoned(node_id) {
2755 continue;
2756 }
2757
2758 let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2763 let props: Vec<(u32, u64)> = nullable_props
2764 .iter()
2765 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2766 .collect();
2767
2768 if !self.matches_prop_filter(&props, &node.props) {
2770 continue;
2771 }
2772
2773 let var_name = node.var.as_str();
2775 if let Some(ref where_expr) = m.where_clause {
2776 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2777 if !var_name.is_empty() && !label.is_empty() {
2779 row_vals.insert(
2780 format!("{}.__labels__", var_name),
2781 Value::List(vec![Value::String(label.clone())]),
2782 );
2783 }
2784 if !var_name.is_empty() {
2786 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2787 }
2788 row_vals.extend(self.dollar_params());
2790 if !self.eval_where_graph(where_expr, &row_vals) {
2791 continue;
2792 }
2793 }
2794
2795 if use_eval_path {
2796 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2798 if !var_name.is_empty() && !label.is_empty() {
2800 row_vals.insert(
2801 format!("{}.__labels__", var_name),
2802 Value::List(vec![Value::String(label.clone())]),
2803 );
2804 }
2805 if !var_name.is_empty() {
2806 if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
2810 let all_nullable = self
2811 .store
2812 .get_node_raw_nullable(node_id, &all_label_col_ids)?;
2813 let all_props: Vec<(u32, u64)> = all_nullable
2814 .iter()
2815 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2816 .collect();
2817 row_vals.insert(
2818 var_name.to_string(),
2819 build_node_map(&all_props, &self.store),
2820 );
2821 } else {
2822 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2823 }
2824 row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2827 }
2828 raw_rows.push(row_vals);
2829 } else {
2830 let row = project_row(
2832 &props,
2833 column_names,
2834 &all_col_ids,
2835 var_name,
2836 &label,
2837 &self.store,
2838 );
2839 rows.push(row);
2840 }
2841 }
2842
2843 if use_eval_path {
2844 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
2845 } else {
2846 if m.distinct {
2847 deduplicate_rows(&mut rows);
2848 }
2849
2850 apply_order_by(&mut rows, m, column_names);
2852
2853 if let Some(skip) = m.skip {
2855 let skip = (skip as usize).min(rows.len());
2856 rows.drain(0..skip);
2857 }
2858
2859 if let Some(lim) = m.limit {
2861 rows.truncate(lim as usize);
2862 }
2863 }
2864
2865 tracing::debug!(rows = rows.len(), "node scan complete");
2866 Ok(QueryResult {
2867 columns: column_names.to_vec(),
2868 rows,
2869 })
2870 }
2871
2872 fn execute_scan_all_labels(
2881 &self,
2882 m: &MatchStatement,
2883 column_names: &[String],
2884 ) -> Result<QueryResult> {
2885 let all_labels = self.catalog.list_labels()?;
2886 tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
2887
2888 let pat = &m.pattern[0];
2889 let node = &pat.nodes[0];
2890 let var_name = node.var.as_str();
2891
2892 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
2894 if let Some(ref where_expr) = m.where_clause {
2895 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2896 }
2897 for p in &node.props {
2898 let col_id = prop_name_to_col_id(&p.key);
2899 if !all_col_ids.contains(&col_id) {
2900 all_col_ids.push(col_id);
2901 }
2902 }
2903
2904 let use_agg = has_aggregate_in_return(&m.return_clause.items);
2905 let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2907 if use_eval_path_all {
2908 for item in &m.return_clause.items {
2909 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2910 }
2911 }
2912
2913 let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
2915
2916 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2917 let mut rows: Vec<Vec<Value>> = Vec::new();
2918
2919 for (label_id, label_name) in &all_labels {
2920 let label_id_u32 = *label_id as u32;
2921 let hwm = self.store.hwm_for_label(label_id_u32)?;
2922 tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
2923
2924 let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
2926 self.store.col_ids_for_label(label_id_u32)?
2927 } else {
2928 vec![]
2929 };
2930
2931 for slot in 0..hwm {
2932 self.check_deadline()?;
2934
2935 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2936
2937 if self.is_node_tombstoned(node_id) {
2941 continue;
2942 }
2943
2944 let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2945 let props: Vec<(u32, u64)> = nullable_props
2946 .iter()
2947 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2948 .collect();
2949
2950 if !self.matches_prop_filter(&props, &node.props) {
2952 continue;
2953 }
2954
2955 if let Some(ref where_expr) = m.where_clause {
2957 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2958 if !var_name.is_empty() {
2959 row_vals.insert(
2960 format!("{}.__labels__", var_name),
2961 Value::List(vec![Value::String(label_name.clone())]),
2962 );
2963 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2964 }
2965 row_vals.extend(self.dollar_params());
2966 if !self.eval_where_graph(where_expr, &row_vals) {
2967 continue;
2968 }
2969 }
2970
2971 if use_eval_path_all {
2972 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2973 if !var_name.is_empty() {
2974 row_vals.insert(
2975 format!("{}.__labels__", var_name),
2976 Value::List(vec![Value::String(label_name.clone())]),
2977 );
2978 if bare_vars_all.contains(&var_name.to_string())
2980 && !all_label_col_ids_here.is_empty()
2981 {
2982 let all_nullable = self
2983 .store
2984 .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
2985 let all_props: Vec<(u32, u64)> = all_nullable
2986 .iter()
2987 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2988 .collect();
2989 row_vals.insert(
2990 var_name.to_string(),
2991 build_node_map(&all_props, &self.store),
2992 );
2993 } else {
2994 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2995 }
2996 row_vals
2997 .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2998 }
2999 raw_rows.push(row_vals);
3000 } else {
3001 let row = project_row(
3002 &props,
3003 column_names,
3004 &all_col_ids,
3005 var_name,
3006 label_name,
3007 &self.store,
3008 );
3009 rows.push(row);
3010 }
3011 }
3012 }
3013
3014 if use_eval_path_all {
3015 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3016 }
3017
3018 if m.distinct {
3021 deduplicate_rows(&mut rows);
3022 }
3023 apply_order_by(&mut rows, m, column_names);
3024 if let Some(skip) = m.skip {
3025 let skip = (skip as usize).min(rows.len());
3026 rows.drain(0..skip);
3027 }
3028 if let Some(lim) = m.limit {
3029 rows.truncate(lim as usize);
3030 }
3031
3032 tracing::debug!(rows = rows.len(), "label-less full scan complete");
3033 Ok(QueryResult {
3034 columns: column_names.to_vec(),
3035 rows,
3036 })
3037 }
3038
3039 fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3042 let pat = &m.pattern[0];
3043 let src_node_pat = &pat.nodes[0];
3044 let dst_node_pat = &pat.nodes[1];
3045 let rel_pat = &pat.rels[0];
3046
3047 let dir = &rel_pat.dir;
3048 use sparrowdb_cypher::ast::EdgeDir;
3054
3055 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3056 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
3057 let src_label_id_opt: Option<u32> = if src_label.is_empty() {
3059 None
3060 } else {
3061 self.catalog.get_label(&src_label)?.map(|id| id as u32)
3062 };
3063 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
3064 None
3065 } else {
3066 self.catalog.get_label(&dst_label)?.map(|id| id as u32)
3067 };
3068
3069 let all_rel_tables = self.catalog.list_rel_tables_with_ids();
3081 let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
3082 .into_iter()
3083 .filter(|(_, sid, did, rt)| {
3084 let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
3085 let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
3086 let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
3087 type_ok && src_ok && dst_ok
3088 })
3089 .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
3090 .collect();
3091
3092 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3093 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3094 let mut rows: Vec<Vec<Value>> = Vec::new();
3095 let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
3098
3099 let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
3101 self.catalog.list_labels().unwrap_or_default()
3102 } else {
3103 vec![]
3104 };
3105
3106 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3108 &rel_tables_to_scan
3109 {
3110 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3111 let effective_src_label_id = *tbl_src_label_id;
3112 let effective_dst_label_id = *tbl_dst_label_id;
3113
3114 let effective_rel_type: &str = tbl_rel_type.as_str();
3117
3118 let effective_src_label: &str = if src_label.is_empty() {
3120 label_id_to_name
3121 .iter()
3122 .find(|(id, _)| *id as u32 == effective_src_label_id)
3123 .map(|(_, name)| name.as_str())
3124 .unwrap_or("")
3125 } else {
3126 src_label.as_str()
3127 };
3128 let effective_dst_label: &str = if dst_label.is_empty() {
3129 label_id_to_name
3130 .iter()
3131 .find(|(id, _)| *id as u32 == effective_dst_label_id)
3132 .map(|(_, name)| name.as_str())
3133 .unwrap_or("")
3134 } else {
3135 dst_label.as_str()
3136 };
3137
3138 let hwm_src = match self.store.hwm_for_label(effective_src_label_id) {
3139 Ok(h) => h,
3140 Err(_) => continue,
3141 };
3142 tracing::debug!(
3143 src_label = %effective_src_label,
3144 dst_label = %effective_dst_label,
3145 rel_type = %effective_rel_type,
3146 hwm_src = hwm_src,
3147 "one-hop traversal start"
3148 );
3149
3150 let mut col_ids_src =
3151 collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
3152 let mut col_ids_dst =
3153 collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
3154 if use_agg {
3155 for item in &m.return_clause.items {
3156 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3157 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3158 }
3159 }
3160 if let Some(ref where_expr) = m.where_clause {
3162 collect_col_ids_from_expr(where_expr, &mut col_ids_src);
3163 collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
3164 }
3165
3166 let delta_records_all = {
3169 let edge_store = EdgeStore::open(&self.db_root, storage_rel_id);
3170 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
3171 };
3172
3173 for src_slot in 0..hwm_src {
3175 self.check_deadline()?;
3177
3178 let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
3179 let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3180 let all_needed: Vec<u32> = {
3181 let mut v = col_ids_src.clone();
3182 for p in &src_node_pat.props {
3183 let col_id = prop_name_to_col_id(&p.key);
3184 if !v.contains(&col_id) {
3185 v.push(col_id);
3186 }
3187 }
3188 v
3189 };
3190 self.store.get_node_raw(src_node, &all_needed)?
3191 } else {
3192 vec![]
3193 };
3194
3195 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3197 continue;
3198 }
3199
3200 let delta_neighbors: Vec<u64> = delta_records_all
3203 .iter()
3204 .filter(|r| {
3205 let r_src_label = (r.src.0 >> 32) as u32;
3206 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3207 r_src_label == effective_src_label_id && r_src_slot == src_slot
3208 })
3209 .map(|r| r.dst.0 & 0xFFFF_FFFF)
3210 .collect();
3211
3212 let csr_neighbors: &[u64] = self
3216 .csrs
3217 .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
3218 .map(|c| c.neighbors(src_slot))
3219 .unwrap_or(&[]);
3220 let all_neighbors: Vec<u64> = csr_neighbors
3221 .iter()
3222 .copied()
3223 .chain(delta_neighbors.into_iter())
3224 .collect();
3225 let mut seen_neighbors: HashSet<u64> = HashSet::new();
3226 for &dst_slot in &all_neighbors {
3227 if !seen_neighbors.insert(dst_slot) {
3228 continue;
3229 }
3230 if *dir == EdgeDir::Both {
3233 seen_undirected.insert((src_slot, dst_slot));
3234 }
3235 let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
3236 let dst_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3237 let all_needed: Vec<u32> = {
3238 let mut v = col_ids_dst.clone();
3239 for p in &dst_node_pat.props {
3240 let col_id = prop_name_to_col_id(&p.key);
3241 if !v.contains(&col_id) {
3242 v.push(col_id);
3243 }
3244 }
3245 v
3246 };
3247 self.store.get_node_raw(dst_node, &all_needed)?
3248 } else {
3249 vec![]
3250 };
3251
3252 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3254 continue;
3255 }
3256
3257 if *dir == EdgeDir::Both {
3260 seen_undirected.insert((src_slot, dst_slot));
3261 }
3262
3263 if let Some(ref where_expr) = m.where_clause {
3265 let mut row_vals = build_row_vals(
3266 &src_props,
3267 &src_node_pat.var,
3268 &col_ids_src,
3269 &self.store,
3270 );
3271 row_vals.extend(build_row_vals(
3272 &dst_props,
3273 &dst_node_pat.var,
3274 &col_ids_dst,
3275 &self.store,
3276 ));
3277 if !rel_pat.var.is_empty() {
3279 row_vals.insert(
3280 format!("{}.__type__", rel_pat.var),
3281 Value::String(effective_rel_type.to_string()),
3282 );
3283 }
3284 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3286 row_vals.insert(
3287 format!("{}.__labels__", src_node_pat.var),
3288 Value::List(vec![Value::String(effective_src_label.to_string())]),
3289 );
3290 }
3291 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3292 row_vals.insert(
3293 format!("{}.__labels__", dst_node_pat.var),
3294 Value::List(vec![Value::String(effective_dst_label.to_string())]),
3295 );
3296 }
3297 row_vals.extend(self.dollar_params());
3298 if !self.eval_where_graph(where_expr, &row_vals) {
3299 continue;
3300 }
3301 }
3302
3303 if use_agg {
3304 let mut row_vals = build_row_vals(
3305 &src_props,
3306 &src_node_pat.var,
3307 &col_ids_src,
3308 &self.store,
3309 );
3310 row_vals.extend(build_row_vals(
3311 &dst_props,
3312 &dst_node_pat.var,
3313 &col_ids_dst,
3314 &self.store,
3315 ));
3316 if !rel_pat.var.is_empty() {
3318 row_vals.insert(
3319 format!("{}.__type__", rel_pat.var),
3320 Value::String(effective_rel_type.to_string()),
3321 );
3322 }
3323 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3324 row_vals.insert(
3325 format!("{}.__labels__", src_node_pat.var),
3326 Value::List(vec![Value::String(effective_src_label.to_string())]),
3327 );
3328 }
3329 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3330 row_vals.insert(
3331 format!("{}.__labels__", dst_node_pat.var),
3332 Value::List(vec![Value::String(effective_dst_label.to_string())]),
3333 );
3334 }
3335 if !src_node_pat.var.is_empty() {
3336 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
3337 }
3338 if !dst_node_pat.var.is_empty() {
3339 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
3340 }
3341 if !rel_pat.var.is_empty() {
3344 let edge_id = sparrowdb_common::EdgeId(
3350 (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
3351 );
3352 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3353 }
3354 raw_rows.push(row_vals);
3355 } else {
3356 let rel_var_type = if !rel_pat.var.is_empty() {
3361 Some((rel_pat.var.as_str(), effective_rel_type))
3362 } else {
3363 None
3364 };
3365 let src_label_meta =
3366 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3367 Some((src_node_pat.var.as_str(), effective_src_label))
3368 } else {
3369 None
3370 };
3371 let dst_label_meta =
3372 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3373 Some((dst_node_pat.var.as_str(), effective_dst_label))
3374 } else {
3375 None
3376 };
3377 let row = project_hop_row(
3378 &src_props,
3379 &dst_props,
3380 column_names,
3381 &src_node_pat.var,
3382 &dst_node_pat.var,
3383 rel_var_type,
3384 src_label_meta,
3385 dst_label_meta,
3386 &self.store,
3387 );
3388 rows.push(row);
3389 }
3390 }
3391 }
3392 }
3393
3394 if *dir == EdgeDir::Both {
3399 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3400 &rel_tables_to_scan
3401 {
3402 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3403 let bwd_scan_label_id = *tbl_dst_label_id;
3405 let bwd_dst_label_id = *tbl_src_label_id;
3406 let effective_rel_type: &str = tbl_rel_type.as_str();
3407
3408 let effective_src_label: &str = if src_label.is_empty() {
3409 label_id_to_name
3410 .iter()
3411 .find(|(id, _)| *id as u32 == bwd_scan_label_id)
3412 .map(|(_, name)| name.as_str())
3413 .unwrap_or("")
3414 } else {
3415 src_label.as_str()
3416 };
3417 let effective_dst_label: &str = if dst_label.is_empty() {
3418 label_id_to_name
3419 .iter()
3420 .find(|(id, _)| *id as u32 == bwd_dst_label_id)
3421 .map(|(_, name)| name.as_str())
3422 .unwrap_or("")
3423 } else {
3424 dst_label.as_str()
3425 };
3426
3427 let hwm_bwd = match self.store.hwm_for_label(bwd_scan_label_id) {
3428 Ok(h) => h,
3429 Err(_) => continue,
3430 };
3431
3432 let mut col_ids_src =
3433 collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
3434 let mut col_ids_dst =
3435 collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
3436 if use_agg {
3437 for item in &m.return_clause.items {
3438 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3439 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3440 }
3441 }
3442
3443 let delta_records_bwd = EdgeStore::open(&self.db_root, storage_rel_id)
3446 .and_then(|s| s.read_delta())
3447 .unwrap_or_default();
3448
3449 let csr_bwd: Option<CsrBackward> = EdgeStore::open(&self.db_root, storage_rel_id)
3454 .and_then(|s| s.open_bwd())
3455 .ok();
3456
3457 for b_slot in 0..hwm_bwd {
3459 let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
3460 let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3461 let all_needed: Vec<u32> = {
3462 let mut v = col_ids_src.clone();
3463 for p in &src_node_pat.props {
3464 let col_id = prop_name_to_col_id(&p.key);
3465 if !v.contains(&col_id) {
3466 v.push(col_id);
3467 }
3468 }
3469 v
3470 };
3471 self.store.get_node_raw(b_node, &all_needed)?
3472 } else {
3473 vec![]
3474 };
3475 if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
3480 continue;
3481 }
3482
3483 let delta_predecessors: Vec<u64> = delta_records_bwd
3486 .iter()
3487 .filter(|r| {
3488 let r_dst_label = (r.dst.0 >> 32) as u32;
3489 let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
3490 r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
3491 })
3492 .map(|r| r.src.0 & 0xFFFF_FFFF)
3493 .collect();
3494
3495 let csr_predecessors: &[u64] = csr_bwd
3501 .as_ref()
3502 .map(|c| c.predecessors(b_slot))
3503 .unwrap_or(&[]);
3504 let all_predecessors: Vec<u64> = csr_predecessors
3505 .iter()
3506 .copied()
3507 .chain(delta_predecessors.into_iter())
3508 .collect();
3509
3510 let mut seen_preds: HashSet<u64> = HashSet::new();
3511 for a_slot in all_predecessors {
3512 if !seen_preds.insert(a_slot) {
3513 continue;
3514 }
3515 if seen_undirected.contains(&(b_slot, a_slot)) {
3525 continue;
3526 }
3527
3528 let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
3529 let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3530 let all_needed: Vec<u32> = {
3531 let mut v = col_ids_dst.clone();
3532 for p in &dst_node_pat.props {
3533 let col_id = prop_name_to_col_id(&p.key);
3534 if !v.contains(&col_id) {
3535 v.push(col_id);
3536 }
3537 }
3538 v
3539 };
3540 self.store.get_node_raw(a_node, &all_needed)?
3541 } else {
3542 vec![]
3543 };
3544
3545 if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
3546 continue;
3547 }
3548
3549 if let Some(ref where_expr) = m.where_clause {
3551 let mut row_vals = build_row_vals(
3552 &b_props,
3553 &src_node_pat.var,
3554 &col_ids_src,
3555 &self.store,
3556 );
3557 row_vals.extend(build_row_vals(
3558 &a_props,
3559 &dst_node_pat.var,
3560 &col_ids_dst,
3561 &self.store,
3562 ));
3563 if !rel_pat.var.is_empty() {
3564 row_vals.insert(
3565 format!("{}.__type__", rel_pat.var),
3566 Value::String(effective_rel_type.to_string()),
3567 );
3568 }
3569 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3570 row_vals.insert(
3571 format!("{}.__labels__", src_node_pat.var),
3572 Value::List(vec![Value::String(
3573 effective_src_label.to_string(),
3574 )]),
3575 );
3576 }
3577 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3578 row_vals.insert(
3579 format!("{}.__labels__", dst_node_pat.var),
3580 Value::List(vec![Value::String(
3581 effective_dst_label.to_string(),
3582 )]),
3583 );
3584 }
3585 row_vals.extend(self.dollar_params());
3586 if !self.eval_where_graph(where_expr, &row_vals) {
3587 continue;
3588 }
3589 }
3590
3591 if use_agg {
3592 let mut row_vals = build_row_vals(
3593 &b_props,
3594 &src_node_pat.var,
3595 &col_ids_src,
3596 &self.store,
3597 );
3598 row_vals.extend(build_row_vals(
3599 &a_props,
3600 &dst_node_pat.var,
3601 &col_ids_dst,
3602 &self.store,
3603 ));
3604 if !rel_pat.var.is_empty() {
3605 row_vals.insert(
3606 format!("{}.__type__", rel_pat.var),
3607 Value::String(effective_rel_type.to_string()),
3608 );
3609 }
3610 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3611 row_vals.insert(
3612 format!("{}.__labels__", src_node_pat.var),
3613 Value::List(vec![Value::String(
3614 effective_src_label.to_string(),
3615 )]),
3616 );
3617 }
3618 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3619 row_vals.insert(
3620 format!("{}.__labels__", dst_node_pat.var),
3621 Value::List(vec![Value::String(
3622 effective_dst_label.to_string(),
3623 )]),
3624 );
3625 }
3626 if !src_node_pat.var.is_empty() {
3627 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
3628 }
3629 if !dst_node_pat.var.is_empty() {
3630 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
3631 }
3632 if !rel_pat.var.is_empty() {
3635 let edge_id = sparrowdb_common::EdgeId(
3636 (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
3637 );
3638 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3639 }
3640 raw_rows.push(row_vals);
3641 } else {
3642 let rel_var_type = if !rel_pat.var.is_empty() {
3643 Some((rel_pat.var.as_str(), effective_rel_type))
3644 } else {
3645 None
3646 };
3647 let src_label_meta = if !src_node_pat.var.is_empty()
3648 && !effective_src_label.is_empty()
3649 {
3650 Some((src_node_pat.var.as_str(), effective_src_label))
3651 } else {
3652 None
3653 };
3654 let dst_label_meta = if !dst_node_pat.var.is_empty()
3655 && !effective_dst_label.is_empty()
3656 {
3657 Some((dst_node_pat.var.as_str(), effective_dst_label))
3658 } else {
3659 None
3660 };
3661 let row = project_hop_row(
3662 &b_props,
3663 &a_props,
3664 column_names,
3665 &src_node_pat.var,
3666 &dst_node_pat.var,
3667 rel_var_type,
3668 src_label_meta,
3669 dst_label_meta,
3670 &self.store,
3671 );
3672 rows.push(row);
3673 }
3674 }
3675 }
3676 }
3677 }
3678
3679 if use_agg {
3680 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3681 } else {
3682 if m.distinct {
3684 deduplicate_rows(&mut rows);
3685 }
3686
3687 apply_order_by(&mut rows, m, column_names);
3689
3690 if let Some(skip) = m.skip {
3692 let skip = (skip as usize).min(rows.len());
3693 rows.drain(0..skip);
3694 }
3695
3696 if let Some(lim) = m.limit {
3698 rows.truncate(lim as usize);
3699 }
3700 }
3701
3702 tracing::debug!(rows = rows.len(), "one-hop traversal complete");
3703 Ok(QueryResult {
3704 columns: column_names.to_vec(),
3705 rows,
3706 })
3707 }
3708
3709 fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3712 use crate::join::AspJoin;
3713
3714 let pat = &m.pattern[0];
3715 let src_node_pat = &pat.nodes[0];
3716 let fof_node_pat = &pat.nodes[2];
3718
3719 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3720 let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
3721 let src_label_id = self
3722 .catalog
3723 .get_label(&src_label)?
3724 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3725 let fof_label_id = self
3726 .catalog
3727 .get_label(&fof_label)?
3728 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3729
3730 let hwm_src = self.store.hwm_for_label(src_label_id)?;
3731 tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
3732
3733 let col_ids_fof = {
3737 let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
3738 for p in &fof_node_pat.props {
3739 let col_id = prop_name_to_col_id(&p.key);
3740 if !ids.contains(&col_id) {
3741 ids.push(col_id);
3742 }
3743 }
3744 if let Some(ref where_expr) = m.where_clause {
3745 collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
3746 }
3747 ids
3748 };
3749
3750 let col_ids_src_where: Vec<u32> = {
3755 let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
3756 if let Some(ref where_expr) = m.where_clause {
3757 collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
3758 }
3759 ids
3760 };
3761
3762 let delta_adj: HashMap<u64, Vec<u64>> = {
3768 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
3769 for r in self.read_delta_all() {
3770 let r_src_label = (r.src.0 >> 32) as u32;
3771 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3772 if r_src_label == src_label_id {
3773 adj.entry(r_src_slot)
3774 .or_default()
3775 .push(r.dst.0 & 0xFFFF_FFFF);
3776 }
3777 }
3778 adj
3779 };
3780
3781 let merged_csr = {
3786 let max_nodes = self.csrs.values().map(|c| c.n_nodes()).max().unwrap_or(0);
3787 let mut edges: Vec<(u64, u64)> = Vec::new();
3788 for csr in self.csrs.values() {
3789 for src in 0..csr.n_nodes() {
3790 for &dst in csr.neighbors(src) {
3791 edges.push((src, dst));
3792 }
3793 }
3794 }
3795 edges.sort_unstable();
3797 edges.dedup();
3798 CsrForward::build(max_nodes, &edges)
3799 };
3800 let join = AspJoin::new(&merged_csr);
3801 let mut rows = Vec::new();
3802
3803 for src_slot in 0..hwm_src {
3805 self.check_deadline()?;
3807
3808 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
3809 let src_needed: Vec<u32> = {
3810 let mut v = vec![];
3811 for p in &src_node_pat.props {
3812 let col_id = prop_name_to_col_id(&p.key);
3813 if !v.contains(&col_id) {
3814 v.push(col_id);
3815 }
3816 }
3817 for &col_id in &col_ids_src_where {
3818 if !v.contains(&col_id) {
3819 v.push(col_id);
3820 }
3821 }
3822 v
3823 };
3824
3825 let src_props = read_node_props(&self.store, src_node, &src_needed)?;
3826
3827 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3829 continue;
3830 }
3831
3832 let mut fof_slots = join.two_hop(src_slot)?;
3834
3835 let first_hop_delta = delta_adj
3838 .get(&src_slot)
3839 .map(|v| v.as_slice())
3840 .unwrap_or(&[]);
3841 if !first_hop_delta.is_empty() {
3842 let mut delta_fof: HashSet<u64> = HashSet::new();
3843 for &mid_slot in first_hop_delta {
3844 for &fof in merged_csr.neighbors(mid_slot) {
3846 delta_fof.insert(fof);
3847 }
3848 if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
3850 for &fof in mid_neighbors {
3851 delta_fof.insert(fof);
3852 }
3853 }
3854 }
3855 fof_slots.extend(delta_fof);
3856 let unique: HashSet<u64> = fof_slots.into_iter().collect();
3858 fof_slots = unique.into_iter().collect();
3859 fof_slots.sort_unstable();
3860 }
3861
3862 for fof_slot in fof_slots {
3863 let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
3864 let fof_props = read_node_props(&self.store, fof_node, &col_ids_fof)?;
3865
3866 if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
3868 continue;
3869 }
3870
3871 if let Some(ref where_expr) = m.where_clause {
3873 let mut row_vals = build_row_vals(
3874 &src_props,
3875 &src_node_pat.var,
3876 &col_ids_src_where,
3877 &self.store,
3878 );
3879 row_vals.extend(build_row_vals(
3880 &fof_props,
3881 &fof_node_pat.var,
3882 &col_ids_fof,
3883 &self.store,
3884 ));
3885 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
3887 row_vals.insert(
3888 format!("{}.__labels__", src_node_pat.var),
3889 Value::List(vec![Value::String(src_label.clone())]),
3890 );
3891 }
3892 if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
3893 row_vals.insert(
3894 format!("{}.__labels__", fof_node_pat.var),
3895 Value::List(vec![Value::String(fof_label.clone())]),
3896 );
3897 }
3898 if !pat.rels[0].var.is_empty() {
3900 row_vals.insert(
3901 format!("{}.__type__", pat.rels[0].var),
3902 Value::String(pat.rels[0].rel_type.clone()),
3903 );
3904 }
3905 if !pat.rels[1].var.is_empty() {
3906 row_vals.insert(
3907 format!("{}.__type__", pat.rels[1].var),
3908 Value::String(pat.rels[1].rel_type.clone()),
3909 );
3910 }
3911 row_vals.extend(self.dollar_params());
3912 if !self.eval_where_graph(where_expr, &row_vals) {
3913 continue;
3914 }
3915 }
3916
3917 let row = project_fof_row(
3918 &src_props,
3919 &fof_props,
3920 column_names,
3921 &src_node_pat.var,
3922 &self.store,
3923 );
3924 rows.push(row);
3925 }
3926 }
3927
3928 if m.distinct {
3930 deduplicate_rows(&mut rows);
3931 }
3932
3933 apply_order_by(&mut rows, m, column_names);
3935
3936 if let Some(skip) = m.skip {
3938 let skip = (skip as usize).min(rows.len());
3939 rows.drain(0..skip);
3940 }
3941
3942 if let Some(lim) = m.limit {
3944 rows.truncate(lim as usize);
3945 }
3946
3947 tracing::debug!(rows = rows.len(), "two-hop traversal complete");
3948 Ok(QueryResult {
3949 columns: column_names.to_vec(),
3950 rows,
3951 })
3952 }
3953
3954 fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3969 let pat = &m.pattern[0];
3970 let n_nodes = pat.nodes.len();
3971 let n_rels = pat.rels.len();
3972
3973 if n_nodes != n_rels + 1 {
3975 return Err(sparrowdb_common::Error::Unimplemented);
3976 }
3977
3978 let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
3981 .map(|i| {
3982 let node_pat = &pat.nodes[i];
3983 let var = &node_pat.var;
3984 let mut ids = if var.is_empty() {
3985 vec![]
3986 } else {
3987 collect_col_ids_for_var(var, column_names, 0)
3988 };
3989 if let Some(ref where_expr) = m.where_clause {
3991 if !var.is_empty() {
3992 collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
3993 }
3994 }
3995 for p in &node_pat.props {
3997 let col_id = prop_name_to_col_id(&p.key);
3998 if !ids.contains(&col_id) {
3999 ids.push(col_id);
4000 }
4001 }
4002 if ids.is_empty() {
4004 ids.push(0);
4005 }
4006 ids
4007 })
4008 .collect();
4009
4010 let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
4012 .map(|i| {
4013 let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
4014 if label.is_empty() {
4015 None
4016 } else {
4017 self.catalog
4018 .get_label(&label)
4019 .ok()
4020 .flatten()
4021 .map(|id| id as u32)
4022 }
4023 })
4024 .collect();
4025
4026 let src_label_id = match label_ids_per_node[0] {
4028 Some(id) => id,
4029 None => return Err(sparrowdb_common::Error::Unimplemented),
4030 };
4031 let hwm_src = self.store.hwm_for_label(src_label_id)?;
4032
4033 let delta_all = self.read_delta_all();
4035
4036 let mut rows: Vec<Vec<Value>> = Vec::new();
4037
4038 for src_slot in 0..hwm_src {
4039 self.check_deadline()?;
4041
4042 let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);
4043
4044 if self.is_node_tombstoned(src_node_id) {
4046 continue;
4047 }
4048
4049 let src_props = read_node_props(&self.store, src_node_id, &col_ids_per_node[0])?;
4050
4051 if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
4053 continue;
4054 }
4055
4056 let mut row_vals: HashMap<String, Value> = HashMap::new();
4058 if !pat.nodes[0].var.is_empty() {
4059 for &(col_id, raw) in &src_props {
4060 let key = format!("{}.col_{col_id}", pat.nodes[0].var);
4061 row_vals.insert(key, decode_raw_val(raw, &self.store));
4062 }
4063 }
4064
4065 let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];
4069
4070 for hop_idx in 0..n_rels {
4071 let next_node_pat = &pat.nodes[hop_idx + 1];
4072 let next_label_id_opt = label_ids_per_node[hop_idx + 1];
4073 let next_col_ids = &col_ids_per_node[hop_idx + 1];
4074 let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);
4075
4076 let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();
4077
4078 for (cur_slot, cur_vals) in frontier {
4079 let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
4081 let delta_nb: Vec<u64> = delta_all
4082 .iter()
4083 .filter(|r| {
4084 let r_src_label = (r.src.0 >> 32) as u32;
4085 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4086 r_src_label == cur_label_id && r_src_slot == cur_slot
4087 })
4088 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4089 .collect();
4090
4091 let mut seen: HashSet<u64> = HashSet::new();
4092 let all_nb: Vec<u64> = csr_nb
4093 .into_iter()
4094 .chain(delta_nb)
4095 .filter(|&nb| seen.insert(nb))
4096 .collect();
4097
4098 for next_slot in all_nb {
4099 let next_node_id = if let Some(lbl_id) = next_label_id_opt {
4100 NodeId(((lbl_id as u64) << 32) | next_slot)
4101 } else {
4102 NodeId(next_slot)
4103 };
4104
4105 let next_props = read_node_props(&self.store, next_node_id, next_col_ids)?;
4106
4107 if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
4109 continue;
4110 }
4111
4112 let mut new_vals = cur_vals.clone();
4115 if !next_node_pat.var.is_empty() {
4116 for &(col_id, raw) in &next_props {
4117 let key = format!("{}.col_{col_id}", next_node_pat.var);
4118 new_vals.insert(key, decode_raw_val(raw, &self.store));
4119 }
4120 }
4121
4122 next_frontier.push((next_slot, new_vals));
4123 }
4124 }
4125
4126 frontier = next_frontier;
4127 }
4128
4129 for (_final_slot, path_vals) in frontier {
4131 if let Some(ref where_expr) = m.where_clause {
4133 let mut eval_vals = path_vals.clone();
4134 eval_vals.extend(self.dollar_params());
4135 if !self.eval_where_graph(where_expr, &eval_vals) {
4136 continue;
4137 }
4138 }
4139
4140 let row: Vec<Value> = column_names
4143 .iter()
4144 .map(|col_name| {
4145 if let Some((var, prop)) = col_name.split_once('.') {
4146 let key = format!("{var}.col_{}", col_id_of(prop));
4147 path_vals.get(&key).cloned().unwrap_or(Value::Null)
4148 } else {
4149 Value::Null
4150 }
4151 })
4152 .collect();
4153
4154 rows.push(row);
4155 }
4156 }
4157
4158 if m.distinct {
4160 deduplicate_rows(&mut rows);
4161 }
4162
4163 apply_order_by(&mut rows, m, column_names);
4165
4166 if let Some(skip) = m.skip {
4168 let skip = (skip as usize).min(rows.len());
4169 rows.drain(0..skip);
4170 }
4171
4172 if let Some(lim) = m.limit {
4174 rows.truncate(lim as usize);
4175 }
4176
4177 tracing::debug!(
4178 rows = rows.len(),
4179 n_rels = n_rels,
4180 "n-hop traversal complete"
4181 );
4182 Ok(QueryResult {
4183 columns: column_names.to_vec(),
4184 rows,
4185 })
4186 }
4187
4188 fn get_node_neighbors_labeled(
4203 &self,
4204 src_slot: u64,
4205 src_label_id: u32,
4206 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4207 node_label: &std::collections::HashMap<(u64, u32), ()>,
4208 all_label_ids: &[u32],
4209 ) -> Vec<(u64, u32)> {
4210 let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
4213
4214 let mut out: std::collections::HashMap<(u64, u32), ()> = std::collections::HashMap::new();
4218
4219 for r in delta_all.iter().filter(|r| {
4221 let r_src_label = (r.src.0 >> 32) as u32;
4222 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4223 r_src_label == src_label_id && r_src_slot == src_slot
4224 }) {
4225 let dst_slot = r.dst.0 & 0xFFFF_FFFF;
4226 let dst_label = (r.dst.0 >> 32) as u32;
4227 out.insert((dst_slot, dst_label), ());
4228 }
4229
4230 'csr: for dst_slot in csr_slots {
4234 for &lid in all_label_ids {
4236 if out.contains_key(&(dst_slot, lid)) {
4237 continue 'csr; }
4239 }
4240 let mut found = false;
4243 for &lid in all_label_ids {
4244 if node_label.contains_key(&(dst_slot, lid)) {
4245 out.insert((dst_slot, lid), ());
4246 found = true;
4247 break;
4248 }
4249 }
4250 if !found {
4251 out.insert((dst_slot, src_label_id), ());
4255 }
4256 }
4257
4258 out.into_keys().collect()
4259 }
4260
4261 fn execute_variable_hops(
4274 &self,
4275 src_slot: u64,
4276 src_label_id: u32,
4277 min_hops: u32,
4278 max_hops: u32,
4279 ) -> Vec<(u64, u32)> {
4280 const SAFETY_CAP: u32 = 10;
4281 let max_hops = max_hops.min(SAFETY_CAP);
4282
4283 let delta_all = self.read_delta_all();
4287 let mut node_label: std::collections::HashMap<(u64, u32), ()> =
4288 std::collections::HashMap::new();
4289 for r in &delta_all {
4290 let src_s = r.src.0 & 0xFFFF_FFFF;
4291 let src_l = (r.src.0 >> 32) as u32;
4292 node_label.insert((src_s, src_l), ());
4293 let dst_s = r.dst.0 & 0xFFFF_FFFF;
4294 let dst_l = (r.dst.0 >> 32) as u32;
4295 node_label.insert((dst_s, dst_l), ());
4296 }
4297 let mut all_label_ids: Vec<u32> = node_label.keys().map(|&(_, l)| l).collect();
4298 all_label_ids.sort_unstable();
4299 all_label_ids.dedup();
4300
4301 let mut visited: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4307 visited.insert((src_slot, src_label_id));
4308
4309 let mut results: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4311 if min_hops == 0 {
4312 results.insert((src_slot, src_label_id));
4313 }
4314
4315 let mut frontier: Vec<(u64, u32)> = vec![(src_slot, src_label_id)];
4318
4319 for depth in 1..=max_hops {
4320 let mut next_frontier: Vec<(u64, u32)> = Vec::new();
4321 for &(node_slot, node_label_id) in &frontier {
4322 let neighbors = self.get_node_neighbors_labeled(
4323 node_slot,
4324 node_label_id,
4325 &delta_all,
4326 &node_label,
4327 &all_label_ids,
4328 );
4329 for (nb_slot, nb_label) in neighbors {
4330 if visited.insert((nb_slot, nb_label)) {
4333 next_frontier.push((nb_slot, nb_label));
4334 if depth >= min_hops {
4335 results.insert((nb_slot, nb_label));
4336 }
4337 }
4338 }
4339 }
4340 if next_frontier.is_empty() {
4341 break;
4342 }
4343 frontier = next_frontier;
4344 }
4345
4346 results.into_iter().collect()
4347 }
4348
4349 fn get_node_neighbors_by_slot(
4351 &self,
4352 src_slot: u64,
4353 src_label_id: u32,
4354 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4355 ) -> Vec<u64> {
4356 let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
4357 let delta_neighbors: Vec<u64> = delta_all
4358 .iter()
4359 .filter(|r| {
4360 let r_src_label = (r.src.0 >> 32) as u32;
4361 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4362 r_src_label == src_label_id && r_src_slot == src_slot
4363 })
4364 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4365 .collect();
4366 let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
4367 all.extend(delta_neighbors);
4368 all.into_iter().collect()
4369 }
4370
4371 fn execute_variable_length(
4373 &self,
4374 m: &MatchStatement,
4375 column_names: &[String],
4376 ) -> Result<QueryResult> {
4377 let pat = &m.pattern[0];
4378 let src_node_pat = &pat.nodes[0];
4379 let dst_node_pat = &pat.nodes[1];
4380 let rel_pat = &pat.rels[0];
4381
4382 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
4383 return Err(sparrowdb_common::Error::Unimplemented);
4384 }
4385
4386 let min_hops = rel_pat.min_hops.unwrap_or(1);
4387 let max_hops = rel_pat.max_hops.unwrap_or(10); let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4390 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
4391
4392 let src_label_id = self
4393 .catalog
4394 .get_label(&src_label)?
4395 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4396 let dst_label_id: Option<u32> = if dst_label.is_empty() {
4398 None
4399 } else {
4400 Some(
4401 self.catalog
4402 .get_label(&dst_label)?
4403 .ok_or(sparrowdb_common::Error::NotFound)? as u32,
4404 )
4405 };
4406
4407 let hwm_src = self.store.hwm_for_label(src_label_id)?;
4408
4409 let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
4410 let col_ids_dst =
4411 collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
4412
4413 let dst_all_col_ids: Vec<u32> = {
4416 let mut v = col_ids_dst.clone();
4417 for p in &dst_node_pat.props {
4418 let col_id = prop_name_to_col_id(&p.key);
4419 if !v.contains(&col_id) {
4420 v.push(col_id);
4421 }
4422 }
4423 if let Some(ref where_expr) = m.where_clause {
4424 collect_col_ids_from_expr(where_expr, &mut v);
4425 }
4426 v
4427 };
4428
4429 let mut rows: Vec<Vec<Value>> = Vec::new();
4430 let mut seen_pairs: std::collections::HashSet<(u64, u64, u32)> =
4434 std::collections::HashSet::new();
4435
4436 let labels_by_id: std::collections::HashMap<u16, String> = self
4439 .catalog
4440 .list_labels()
4441 .unwrap_or_default()
4442 .into_iter()
4443 .collect();
4444
4445 for src_slot in 0..hwm_src {
4446 self.check_deadline()?;
4448
4449 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4450
4451 let src_all_col_ids: Vec<u32> = {
4453 let mut v = col_ids_src.clone();
4454 for p in &src_node_pat.props {
4455 let col_id = prop_name_to_col_id(&p.key);
4456 if !v.contains(&col_id) {
4457 v.push(col_id);
4458 }
4459 }
4460 if let Some(ref where_expr) = m.where_clause {
4461 collect_col_ids_from_expr(where_expr, &mut v);
4462 }
4463 v
4464 };
4465 let src_props = read_node_props(&self.store, src_node, &src_all_col_ids)?;
4466
4467 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4468 continue;
4469 }
4470
4471 let dst_nodes = self.execute_variable_hops(src_slot, src_label_id, min_hops, max_hops);
4473
4474 for (dst_slot, actual_label_id) in dst_nodes {
4475 if let Some(required_label) = dst_label_id {
4478 if actual_label_id != required_label {
4479 continue;
4480 }
4481 }
4482
4483 let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
4486
4487 if !seen_pairs.insert((src_slot, dst_slot, actual_label_id)) {
4488 continue;
4489 }
4490
4491 let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
4492 let dst_props = read_node_props(&self.store, dst_node, &dst_all_col_ids)?;
4497
4498 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4499 continue;
4500 }
4501
4502 let resolved_dst_label_name: String = if !dst_label.is_empty() {
4506 dst_label.clone()
4507 } else {
4508 labels_by_id
4509 .get(&(actual_label_id as u16))
4510 .cloned()
4511 .unwrap_or_default()
4512 };
4513
4514 if let Some(ref where_expr) = m.where_clause {
4516 let mut row_vals =
4517 build_row_vals(&src_props, &src_node_pat.var, &col_ids_src, &self.store);
4518 row_vals.extend(build_row_vals(
4519 &dst_props,
4520 &dst_node_pat.var,
4521 &col_ids_dst,
4522 &self.store,
4523 ));
4524 if !rel_pat.var.is_empty() {
4526 row_vals.insert(
4527 format!("{}.__type__", rel_pat.var),
4528 Value::String(rel_pat.rel_type.clone()),
4529 );
4530 }
4531 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4533 row_vals.insert(
4534 format!("{}.__labels__", src_node_pat.var),
4535 Value::List(vec![Value::String(src_label.clone())]),
4536 );
4537 }
4538 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
4541 row_vals.insert(
4542 format!("{}.__labels__", dst_node_pat.var),
4543 Value::List(vec![Value::String(resolved_dst_label_name.clone())]),
4544 );
4545 }
4546 row_vals.extend(self.dollar_params());
4547 if !self.eval_where_graph(where_expr, &row_vals) {
4548 continue;
4549 }
4550 }
4551
4552 let rel_var_type = if !rel_pat.var.is_empty() {
4553 Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
4554 } else {
4555 None
4556 };
4557 let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4558 Some((src_node_pat.var.as_str(), src_label.as_str()))
4559 } else {
4560 None
4561 };
4562 let dst_label_meta =
4563 if !dst_node_pat.var.is_empty() && !resolved_dst_label_name.is_empty() {
4564 Some((dst_node_pat.var.as_str(), resolved_dst_label_name.as_str()))
4565 } else {
4566 None
4567 };
4568 let row = project_hop_row(
4569 &src_props,
4570 &dst_props,
4571 column_names,
4572 &src_node_pat.var,
4573 &dst_node_pat.var,
4574 rel_var_type,
4575 src_label_meta,
4576 dst_label_meta,
4577 &self.store,
4578 );
4579 rows.push(row);
4580 }
4581 }
4582
4583 if m.distinct {
4585 deduplicate_rows(&mut rows);
4586 }
4587
4588 apply_order_by(&mut rows, m, column_names);
4590
4591 if let Some(skip) = m.skip {
4593 let skip = (skip as usize).min(rows.len());
4594 rows.drain(0..skip);
4595 }
4596
4597 if let Some(lim) = m.limit {
4599 rows.truncate(lim as usize);
4600 }
4601
4602 tracing::debug!(
4603 rows = rows.len(),
4604 min_hops,
4605 max_hops,
4606 "variable-length traversal complete"
4607 );
4608 Ok(QueryResult {
4609 columns: column_names.to_vec(),
4610 rows,
4611 })
4612 }
4613
4614 fn matches_prop_filter(
4617 &self,
4618 props: &[(u32, u64)],
4619 filters: &[sparrowdb_cypher::ast::PropEntry],
4620 ) -> bool {
4621 matches_prop_filter_static(props, filters, &self.dollar_params(), &self.store)
4622 }
4623
4624 fn dollar_params(&self) -> HashMap<String, Value> {
4630 self.params
4631 .iter()
4632 .map(|(k, v)| (format!("${k}"), v.clone()))
4633 .collect()
4634 }
4635
4636 fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
4640 match expr {
4641 Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
4642 Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
4643 Expr::CaseWhen {
4644 branches,
4645 else_expr,
4646 } => {
4647 for (cond, then_val) in branches {
4648 if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
4649 return self.eval_expr_graph(then_val, vals);
4650 }
4651 }
4652 else_expr
4653 .as_ref()
4654 .map(|e| self.eval_expr_graph(e, vals))
4655 .unwrap_or(Value::Null)
4656 }
4657 Expr::And(l, r) => {
4658 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4659 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
4660 _ => Value::Null,
4661 }
4662 }
4663 Expr::Or(l, r) => {
4664 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4665 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
4666 _ => Value::Null,
4667 }
4668 }
4669 Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
4670 Value::Bool(b) => Value::Bool(!b),
4671 _ => Value::Null,
4672 },
4673 Expr::PropAccess { var, prop } => {
4676 let normal = eval_expr(expr, vals);
4678 if !matches!(normal, Value::Null) {
4679 return normal;
4680 }
4681 if let Some(Value::NodeRef(node_id)) = vals
4683 .get(var.as_str())
4684 .or_else(|| vals.get(&format!("{var}.__node_id__")))
4685 {
4686 let col_id = prop_name_to_col_id(prop);
4687 if let Ok(props) = self.store.get_node_raw(*node_id, &[col_id]) {
4688 if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
4689 return decode_raw_val(raw, &self.store);
4690 }
4691 }
4692 }
4693 Value::Null
4694 }
4695 _ => eval_expr(expr, vals),
4696 }
4697 }
4698
4699 fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
4701 match self.eval_expr_graph(expr, vals) {
4702 Value::Bool(b) => b,
4703 _ => eval_where(expr, vals),
4704 }
4705 }
4706
4707 fn eval_exists_subquery(
4709 &self,
4710 ep: &sparrowdb_cypher::ast::ExistsPattern,
4711 vals: &HashMap<String, Value>,
4712 ) -> bool {
4713 let path = &ep.path;
4714 if path.nodes.len() < 2 || path.rels.is_empty() {
4715 return false;
4716 }
4717 let src_pat = &path.nodes[0];
4718 let dst_pat = &path.nodes[1];
4719 let rel_pat = &path.rels[0];
4720
4721 let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
4722 Some(id) => id,
4723 None => return false,
4724 };
4725 let src_slot = src_node_id.0 & 0xFFFF_FFFF;
4726 let src_label_id = (src_node_id.0 >> 32) as u32;
4727
4728 let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
4729 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
4730 None
4731 } else {
4732 self.catalog
4733 .get_label(dst_label)
4734 .ok()
4735 .flatten()
4736 .map(|id| id as u32)
4737 };
4738
4739 let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
4740 self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
4741 } else {
4742 RelTableLookup::All
4743 };
4744
4745 let csr_nb: Vec<u64> = match rel_lookup {
4746 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
4747 RelTableLookup::NotFound => return false,
4748 RelTableLookup::All => self.csr_neighbors_all(src_slot),
4749 };
4750 let delta_nb: Vec<u64> = self
4751 .read_delta_all()
4752 .into_iter()
4753 .filter(|r| {
4754 let r_src_label = (r.src.0 >> 32) as u32;
4755 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4756 if r_src_label != src_label_id || r_src_slot != src_slot {
4757 return false;
4758 }
4759 if let Some(dst_lid) = dst_label_id_opt {
4763 let r_dst_label = (r.dst.0 >> 32) as u32;
4764 r_dst_label == dst_lid
4765 } else {
4766 true
4767 }
4768 })
4769 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4770 .collect();
4771
4772 let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
4773
4774 for dst_slot in all_nb {
4775 if let Some(did) = dst_label_id_opt {
4776 let probe_id = NodeId(((did as u64) << 32) | dst_slot);
4777 if self.store.get_node_raw(probe_id, &[]).is_err() {
4778 continue;
4779 }
4780 if !dst_pat.props.is_empty() {
4781 let col_ids: Vec<u32> = dst_pat
4782 .props
4783 .iter()
4784 .map(|p| prop_name_to_col_id(&p.key))
4785 .collect();
4786 match self.store.get_node_raw(probe_id, &col_ids) {
4787 Ok(props) => {
4788 let params = self.dollar_params();
4789 if !matches_prop_filter_static(
4790 &props,
4791 &dst_pat.props,
4792 ¶ms,
4793 &self.store,
4794 ) {
4795 continue;
4796 }
4797 }
4798 Err(_) => continue,
4799 }
4800 }
4801 }
4802 return true;
4803 }
4804 false
4805 }
4806
4807 fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
4809 let id_key = format!("{var}.__node_id__");
4810 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
4811 return Some(*nid);
4812 }
4813 if let Some(Value::NodeRef(nid)) = vals.get(var) {
4814 return Some(*nid);
4815 }
4816 None
4817 }
4818
4819 fn eval_shortest_path_expr(
4821 &self,
4822 sp: &sparrowdb_cypher::ast::ShortestPathExpr,
4823 vals: &HashMap<String, Value>,
4824 ) -> Value {
4825 let (src_label_id, src_slot) =
4830 if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
4831 let label_id = (nid.0 >> 32) as u32;
4832 let slot = nid.0 & 0xFFFF_FFFF;
4833 (label_id, slot)
4834 } else {
4835 let label_id = match self.catalog.get_label(&sp.src_label) {
4837 Ok(Some(id)) => id as u32,
4838 _ => return Value::Null,
4839 };
4840 match self.find_node_by_props(label_id, &sp.src_props) {
4841 Some(slot) => (label_id, slot),
4842 None => return Value::Null,
4843 }
4844 };
4845
4846 let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
4847 nid.0 & 0xFFFF_FFFF
4848 } else {
4849 let dst_label_id = match self.catalog.get_label(&sp.dst_label) {
4850 Ok(Some(id)) => id as u32,
4851 _ => return Value::Null,
4852 };
4853 match self.find_node_by_props(dst_label_id, &sp.dst_props) {
4854 Some(slot) => slot,
4855 None => return Value::Null,
4856 }
4857 };
4858
4859 match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
4860 Some(hops) => Value::Int64(hops as i64),
4861 None => Value::Null,
4862 }
4863 }
4864
4865 fn find_node_by_props(
4867 &self,
4868 label_id: u32,
4869 props: &[sparrowdb_cypher::ast::PropEntry],
4870 ) -> Option<u64> {
4871 if props.is_empty() {
4872 return None;
4873 }
4874 let hwm = self.store.hwm_for_label(label_id).ok()?;
4875 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
4876 let params = self.dollar_params();
4877 for slot in 0..hwm {
4878 let node_id = NodeId(((label_id as u64) << 32) | slot);
4879 if let Ok(raw_props) = self.store.get_node_raw(node_id, &col_ids) {
4880 if matches_prop_filter_static(&raw_props, props, ¶ms, &self.store) {
4881 return Some(slot);
4882 }
4883 }
4884 }
4885 None
4886 }
4887
4888 fn bfs_shortest_path(
4897 &self,
4898 src_slot: u64,
4899 src_label_id: u32,
4900 dst_slot: u64,
4901 max_hops: u32,
4902 ) -> Option<u32> {
4903 if src_slot == dst_slot {
4904 return Some(0);
4905 }
4906 let delta_all = self.read_delta_all();
4908 let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
4909 visited.insert(src_slot);
4910 let mut frontier: Vec<u64> = vec![src_slot];
4911
4912 for depth in 1..=max_hops {
4913 let mut next_frontier: Vec<u64> = Vec::new();
4914 for &node_slot in &frontier {
4915 let neighbors =
4916 self.get_node_neighbors_by_slot(node_slot, src_label_id, &delta_all);
4917 for nb in neighbors {
4918 if nb == dst_slot {
4919 return Some(depth);
4920 }
4921 if visited.insert(nb) {
4922 next_frontier.push(nb);
4923 }
4924 }
4925 }
4926 if next_frontier.is_empty() {
4927 break;
4928 }
4929 frontier = next_frontier;
4930 }
4931 None
4932 }
4933
4934 fn aggregate_rows_graph(
4937 &self,
4938 rows: &[HashMap<String, Value>],
4939 return_items: &[ReturnItem],
4940 ) -> Vec<Vec<Value>> {
4941 let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
4943 if !needs_graph {
4944 return aggregate_rows(rows, return_items);
4945 }
4946 rows.iter()
4948 .map(|row_vals| {
4949 return_items
4950 .iter()
4951 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
4952 .collect()
4953 })
4954 .collect()
4955 }
4956}
4957
4958fn matches_prop_filter_static(
4961 props: &[(u32, u64)],
4962 filters: &[sparrowdb_cypher::ast::PropEntry],
4963 params: &HashMap<String, Value>,
4964 store: &NodeStore,
4965) -> bool {
4966 for f in filters {
4967 let col_id = prop_name_to_col_id(&f.key);
4968 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
4969
4970 let filter_val = eval_expr(&f.value, params);
4973 let matches = match filter_val {
4974 Value::Int64(n) => {
4975 stored_val == Some(StoreValue::Int64(n).to_u64())
4978 }
4979 Value::Bool(b) => {
4980 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
4983 stored_val == Some(expected)
4984 }
4985 Value::String(s) => {
4986 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
4989 }
4990 Value::Float64(f) => {
4991 stored_val.is_some_and(|raw| {
4994 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
4995 })
4996 }
4997 Value::Null => true, _ => false,
4999 };
5000 if !matches {
5001 return false;
5002 }
5003 }
5004 true
5005}
5006
5007fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
5016 match expr {
5017 Expr::List(elems) => {
5018 let mut values = Vec::with_capacity(elems.len());
5019 for elem in elems {
5020 values.push(eval_scalar_expr(elem));
5021 }
5022 Ok(values)
5023 }
5024 Expr::Literal(Literal::Param(name)) => {
5025 match params.get(name) {
5027 Some(Value::List(items)) => Ok(items.clone()),
5028 Some(other) => {
5029 Ok(vec![other.clone()])
5032 }
5033 None => {
5034 Ok(vec![])
5036 }
5037 }
5038 }
5039 Expr::FnCall { name, args } => {
5040 let name_lc = name.to_lowercase();
5043 if name_lc == "range" {
5044 let empty_vals: std::collections::HashMap<String, Value> =
5045 std::collections::HashMap::new();
5046 let evaluated: Vec<Value> =
5047 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
5048 let start = match evaluated.first() {
5050 Some(Value::Int64(n)) => *n,
5051 _ => {
5052 return Err(sparrowdb_common::Error::InvalidArgument(
5053 "range() expects integer arguments".into(),
5054 ))
5055 }
5056 };
5057 let end = match evaluated.get(1) {
5058 Some(Value::Int64(n)) => *n,
5059 _ => {
5060 return Err(sparrowdb_common::Error::InvalidArgument(
5061 "range() expects at least 2 integer arguments".into(),
5062 ))
5063 }
5064 };
5065 let step: i64 = match evaluated.get(2) {
5066 Some(Value::Int64(n)) => *n,
5067 None => 1,
5068 _ => 1,
5069 };
5070 if step == 0 {
5071 return Err(sparrowdb_common::Error::InvalidArgument(
5072 "range(): step must not be zero".into(),
5073 ));
5074 }
5075 let mut values = Vec::new();
5076 if step > 0 {
5077 let mut i = start;
5078 while i <= end {
5079 values.push(Value::Int64(i));
5080 i += step;
5081 }
5082 } else {
5083 let mut i = start;
5084 while i >= end {
5085 values.push(Value::Int64(i));
5086 i += step;
5087 }
5088 }
5089 Ok(values)
5090 } else {
5091 Err(sparrowdb_common::Error::InvalidArgument(format!(
5093 "UNWIND: function '{name}' does not return a list"
5094 )))
5095 }
5096 }
5097 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
5098 "UNWIND expression is not a list: {:?}",
5099 other
5100 ))),
5101 }
5102}
5103
5104fn eval_scalar_expr(expr: &Expr) -> Value {
5106 match expr {
5107 Expr::Literal(lit) => match lit {
5108 Literal::Int(n) => Value::Int64(*n),
5109 Literal::Float(f) => Value::Float64(*f),
5110 Literal::Bool(b) => Value::Bool(*b),
5111 Literal::String(s) => Value::String(s.clone()),
5112 Literal::Null => Value::Null,
5113 Literal::Param(_) => Value::Null,
5114 },
5115 _ => Value::Null,
5116 }
5117}
5118
5119fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
5120 items
5121 .iter()
5122 .map(|item| match &item.alias {
5123 Some(alias) => alias.clone(),
5124 None => match &item.expr {
5125 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5126 Expr::Var(v) => v.clone(),
5127 Expr::CountStar => "count(*)".to_string(),
5128 Expr::FnCall { name, args } => {
5129 let arg_str = args
5130 .first()
5131 .map(|a| match a {
5132 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5133 Expr::Var(v) => v.clone(),
5134 _ => "*".to_string(),
5135 })
5136 .unwrap_or_else(|| "*".to_string());
5137 format!("{}({})", name.to_lowercase(), arg_str)
5138 }
5139 _ => "?".to_string(),
5140 },
5141 })
5142 .collect()
5143}
5144
5145fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
5152 match expr {
5153 Expr::PropAccess { var, prop } => {
5154 if var == target_var {
5155 let col_id = prop_name_to_col_id(prop);
5156 if !out.contains(&col_id) {
5157 out.push(col_id);
5158 }
5159 }
5160 }
5161 Expr::BinOp { left, right, .. } => {
5162 collect_col_ids_from_expr_for_var(left, target_var, out);
5163 collect_col_ids_from_expr_for_var(right, target_var, out);
5164 }
5165 Expr::And(l, r) | Expr::Or(l, r) => {
5166 collect_col_ids_from_expr_for_var(l, target_var, out);
5167 collect_col_ids_from_expr_for_var(r, target_var, out);
5168 }
5169 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5170 collect_col_ids_from_expr_for_var(inner, target_var, out);
5171 }
5172 Expr::InList { expr, list, .. } => {
5173 collect_col_ids_from_expr_for_var(expr, target_var, out);
5174 for item in list {
5175 collect_col_ids_from_expr_for_var(item, target_var, out);
5176 }
5177 }
5178 Expr::FnCall { args, .. } | Expr::List(args) => {
5179 for arg in args {
5180 collect_col_ids_from_expr_for_var(arg, target_var, out);
5181 }
5182 }
5183 Expr::ListPredicate {
5184 list_expr,
5185 predicate,
5186 ..
5187 } => {
5188 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
5189 collect_col_ids_from_expr_for_var(predicate, target_var, out);
5190 }
5191 Expr::CaseWhen {
5193 branches,
5194 else_expr,
5195 } => {
5196 for (cond, then_val) in branches {
5197 collect_col_ids_from_expr_for_var(cond, target_var, out);
5198 collect_col_ids_from_expr_for_var(then_val, target_var, out);
5199 }
5200 if let Some(e) = else_expr {
5201 collect_col_ids_from_expr_for_var(e, target_var, out);
5202 }
5203 }
5204 _ => {}
5205 }
5206}
5207
5208fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
5213 match expr {
5214 Expr::PropAccess { prop, .. } => {
5215 let col_id = prop_name_to_col_id(prop);
5216 if !out.contains(&col_id) {
5217 out.push(col_id);
5218 }
5219 }
5220 Expr::BinOp { left, right, .. } => {
5221 collect_col_ids_from_expr(left, out);
5222 collect_col_ids_from_expr(right, out);
5223 }
5224 Expr::And(l, r) | Expr::Or(l, r) => {
5225 collect_col_ids_from_expr(l, out);
5226 collect_col_ids_from_expr(r, out);
5227 }
5228 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
5229 Expr::InList { expr, list, .. } => {
5230 collect_col_ids_from_expr(expr, out);
5231 for item in list {
5232 collect_col_ids_from_expr(item, out);
5233 }
5234 }
5235 Expr::FnCall { args, .. } => {
5237 for arg in args {
5238 collect_col_ids_from_expr(arg, out);
5239 }
5240 }
5241 Expr::ListPredicate {
5242 list_expr,
5243 predicate,
5244 ..
5245 } => {
5246 collect_col_ids_from_expr(list_expr, out);
5247 collect_col_ids_from_expr(predicate, out);
5248 }
5249 Expr::List(items) => {
5251 for item in items {
5252 collect_col_ids_from_expr(item, out);
5253 }
5254 }
5255 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5256 collect_col_ids_from_expr(inner, out);
5257 }
5258 Expr::CaseWhen {
5260 branches,
5261 else_expr,
5262 } => {
5263 for (cond, then_val) in branches {
5264 collect_col_ids_from_expr(cond, out);
5265 collect_col_ids_from_expr(then_val, out);
5266 }
5267 if let Some(e) = else_expr {
5268 collect_col_ids_from_expr(e, out);
5269 }
5270 }
5271 _ => {}
5272 }
5273}
5274
5275#[allow(dead_code)]
5280fn literal_to_store_value(lit: &Literal) -> StoreValue {
5281 match lit {
5282 Literal::Int(n) => StoreValue::Int64(*n),
5283 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
5284 Literal::Float(f) => StoreValue::Float(*f),
5285 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
5286 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
5287 }
5288}
5289
5290fn value_to_store_value(val: Value) -> StoreValue {
5295 match val {
5296 Value::Int64(n) => StoreValue::Int64(n),
5297 Value::Float64(f) => StoreValue::Float(f),
5298 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
5299 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
5300 Value::Null => StoreValue::Int64(0),
5301 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
5302 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
5303 Value::List(_) => StoreValue::Int64(0),
5304 Value::Map(_) => StoreValue::Int64(0),
5305 }
5306}
5307
5308fn string_to_raw_u64(s: &str) -> u64 {
5314 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5315}
5316
5317fn try_index_lookup_for_props(
5328 props: &[sparrowdb_cypher::ast::PropEntry],
5329 label_id: u32,
5330 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5331) -> Option<Vec<u32>> {
5332 if props.len() != 1 {
5334 return None;
5335 }
5336 let filter = &props[0];
5337
5338 let raw_value: u64 = match &filter.value {
5340 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5341 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5342 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5343 }
5344 _ => return None,
5347 };
5348
5349 let col_id = prop_name_to_col_id(&filter.key);
5350 if !prop_index.is_indexed(label_id, col_id) {
5351 return None;
5352 }
5353 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5354}
5355
5356fn try_text_index_lookup(
5369 expr: &Expr,
5370 node_var: &str,
5371 label_id: u32,
5372 text_index: &TextIndex,
5373) -> Option<Vec<u32>> {
5374 let (left, op, right) = match expr {
5375 Expr::BinOp { left, op, right }
5376 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
5377 {
5378 (left.as_ref(), op, right.as_ref())
5379 }
5380 _ => return None,
5381 };
5382
5383 let prop_name = match left {
5385 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
5386 _ => return None,
5387 };
5388
5389 let pattern = match right {
5391 Expr::Literal(Literal::String(s)) => s.as_str(),
5392 _ => return None,
5393 };
5394
5395 let col_id = prop_name_to_col_id(prop_name);
5396 if !text_index.is_indexed(label_id, col_id) {
5397 return None;
5398 }
5399
5400 let slots = match op {
5401 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
5402 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
5403 _ => return None,
5404 };
5405
5406 Some(slots)
5407}
5408
5409fn try_where_eq_index_lookup(
5420 expr: &Expr,
5421 node_var: &str,
5422 label_id: u32,
5423 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5424) -> Option<Vec<u32>> {
5425 let (left, op, right) = match expr {
5426 Expr::BinOp { left, op, right } if matches!(op, BinOpKind::Eq) => {
5427 (left.as_ref(), op, right.as_ref())
5428 }
5429 _ => return None,
5430 };
5431 let _ = op;
5432
5433 let (prop_name, lit) = if let Expr::PropAccess { var, prop } = left {
5435 if var.as_str() == node_var {
5436 (prop.as_str(), right)
5437 } else {
5438 return None;
5439 }
5440 } else if let Expr::PropAccess { var, prop } = right {
5441 if var.as_str() == node_var {
5442 (prop.as_str(), left)
5443 } else {
5444 return None;
5445 }
5446 } else {
5447 return None;
5448 };
5449
5450 let raw_value: u64 = match lit {
5451 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5452 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5453 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5454 }
5455 _ => return None,
5456 };
5457
5458 let col_id = prop_name_to_col_id(prop_name);
5459 if !prop_index.is_indexed(label_id, col_id) {
5460 return None;
5461 }
5462 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5463}
5464
5465fn try_where_range_index_lookup(
5476 expr: &Expr,
5477 node_var: &str,
5478 label_id: u32,
5479 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5480) -> Option<Vec<u32>> {
5481 use sparrowdb_storage::property_index::sort_key;
5482
5483 fn encode_int(n: i64) -> u64 {
5485 StoreValue::Int64(n).to_u64()
5486 }
5487
5488 #[allow(clippy::type_complexity)]
5491 fn extract_single_bound<'a>(
5492 expr: &'a Expr,
5493 node_var: &'a str,
5494 ) -> Option<(&'a str, Option<(u64, bool)>, Option<(u64, bool)>)> {
5495 let (left, op, right) = match expr {
5496 Expr::BinOp { left, op, right }
5497 if matches!(
5498 op,
5499 BinOpKind::Gt | BinOpKind::Ge | BinOpKind::Lt | BinOpKind::Le
5500 ) =>
5501 {
5502 (left.as_ref(), op, right.as_ref())
5503 }
5504 _ => return None,
5505 };
5506
5507 if let (Expr::PropAccess { var, prop }, Expr::Literal(Literal::Int(n))) = (left, right) {
5509 if var.as_str() != node_var {
5510 return None;
5511 }
5512 let sk = sort_key(encode_int(*n));
5513 let prop_name = prop.as_str();
5514 return match op {
5515 BinOpKind::Gt => Some((prop_name, Some((sk, false)), None)),
5516 BinOpKind::Ge => Some((prop_name, Some((sk, true)), None)),
5517 BinOpKind::Lt => Some((prop_name, None, Some((sk, false)))),
5518 BinOpKind::Le => Some((prop_name, None, Some((sk, true)))),
5519 _ => None,
5520 };
5521 }
5522
5523 if let (Expr::Literal(Literal::Int(n)), Expr::PropAccess { var, prop }) = (left, right) {
5525 if var.as_str() != node_var {
5526 return None;
5527 }
5528 let sk = sort_key(encode_int(*n));
5529 let prop_name = prop.as_str();
5530 return match op {
5532 BinOpKind::Gt => Some((prop_name, None, Some((sk, false)))),
5533 BinOpKind::Ge => Some((prop_name, None, Some((sk, true)))),
5534 BinOpKind::Lt => Some((prop_name, Some((sk, false)), None)),
5535 BinOpKind::Le => Some((prop_name, Some((sk, true)), None)),
5536 _ => None,
5537 };
5538 }
5539
5540 None
5541 }
5542
5543 if let Expr::BinOp {
5546 left,
5547 op: BinOpKind::And,
5548 right,
5549 } = expr
5550 {
5551 if let (Some((lp, llo, lhi)), Some((rp, rlo, rhi))) = (
5552 extract_single_bound(left, node_var),
5553 extract_single_bound(right, node_var),
5554 ) {
5555 if lp == rp {
5556 let col_id = prop_name_to_col_id(lp);
5557 if !prop_index.is_indexed(label_id, col_id) {
5558 return None;
5559 }
5560 let lo: Option<(u64, bool)> = match (llo, rlo) {
5566 (Some(a), Some(b)) => Some(std::cmp::max(a, b)),
5567 (Some(a), None) | (None, Some(a)) => Some(a),
5568 (None, None) => None,
5569 };
5570 let hi: Option<(u64, bool)> = match (lhi, rhi) {
5571 (Some(a), Some(b)) => Some(std::cmp::min(a, b)),
5572 (Some(a), None) | (None, Some(a)) => Some(a),
5573 (None, None) => None,
5574 };
5575 if lo.is_none() && hi.is_none() {
5577 return None;
5578 }
5579 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
5580 }
5581 }
5582 }
5583
5584 if let Some((prop_name, lo, hi)) = extract_single_bound(expr, node_var) {
5586 let col_id = prop_name_to_col_id(prop_name);
5587 if !prop_index.is_indexed(label_id, col_id) {
5588 return None;
5589 }
5590 return Some(prop_index.lookup_range(label_id, col_id, lo, hi));
5591 }
5592
5593 None
5594}
5595
/// Maps a property name to its column id by delegating to `sparrowdb_common::col_id_of`.
///
/// Kept as a single local entry point so every lookup in this file derives column
/// ids the same way.
fn prop_name_to_col_id(name: &str) -> u32 {
    col_id_of(name)
}
5619
5620fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
5621 let mut ids = Vec::new();
5622 for name in column_names {
5623 let prop = name.split('.').next_back().unwrap_or(name.as_str());
5625 let col_id = prop_name_to_col_id(prop);
5626 if !ids.contains(&col_id) {
5627 ids.push(col_id);
5628 }
5629 }
5630 ids
5631}
5632
5633fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
5639 let mut ids = Vec::new();
5640 for name in column_names {
5641 if let Some((v, prop)) = name.split_once('.') {
5643 if v == var {
5644 let col_id = prop_name_to_col_id(prop);
5645 if !ids.contains(&col_id) {
5646 ids.push(col_id);
5647 }
5648 }
5649 } else {
5650 let col_id = prop_name_to_col_id(name.as_str());
5652 if !ids.contains(&col_id) {
5653 ids.push(col_id);
5654 }
5655 }
5656 }
5657 if ids.is_empty() {
5658 ids.push(0);
5660 }
5661 ids
5662}
5663
5664fn read_node_props(
5676 store: &NodeStore,
5677 node_id: NodeId,
5678 col_ids: &[u32],
5679) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
5680 if col_ids.is_empty() {
5681 return Ok(vec![]);
5682 }
5683 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
5684 Ok(nullable
5685 .into_iter()
5686 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
5687 .collect())
5688}
5689
5690fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
5697 match store.decode_raw_value(raw) {
5698 StoreValue::Int64(n) => Value::Int64(n),
5699 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
5700 StoreValue::Float(f) => Value::Float64(f),
5701 }
5702}
5703
5704fn build_row_vals(
5705 props: &[(u32, u64)],
5706 var_name: &str,
5707 _col_ids: &[u32],
5708 store: &NodeStore,
5709) -> HashMap<String, Value> {
5710 let mut map = HashMap::new();
5711 for &(col_id, raw) in props {
5712 let key = format!("{var_name}.col_{col_id}");
5713 map.insert(key, decode_raw_val(raw, store));
5714 }
5715 map
5716}
5717
/// Returns `true` when `label` begins with the reserved `__SO_` prefix.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
5727
5728fn values_equal(a: &Value, b: &Value) -> bool {
5736 match (a, b) {
5737 (Value::Int64(x), Value::Int64(y)) => x == y,
5739 (Value::String(x), Value::String(y)) => x == y,
5745 (Value::Bool(x), Value::Bool(y)) => x == y,
5746 (Value::Float64(x), Value::Float64(y)) => x == y,
5747 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
5751 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
5752 (Value::Null, Value::Null) => true,
5754 _ => false,
5755 }
5756}
5757
5758fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
5759 match expr {
5760 Expr::BinOp { left, op, right } => {
5761 let lv = eval_expr(left, vals);
5762 let rv = eval_expr(right, vals);
5763 match op {
5764 BinOpKind::Eq => values_equal(&lv, &rv),
5765 BinOpKind::Neq => !values_equal(&lv, &rv),
5766 BinOpKind::Contains => lv.contains(&rv),
5767 BinOpKind::StartsWith => {
5768 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
5769 }
5770 BinOpKind::EndsWith => {
5771 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
5772 }
5773 BinOpKind::Lt => match (&lv, &rv) {
5774 (Value::Int64(a), Value::Int64(b)) => a < b,
5775 _ => false,
5776 },
5777 BinOpKind::Le => match (&lv, &rv) {
5778 (Value::Int64(a), Value::Int64(b)) => a <= b,
5779 _ => false,
5780 },
5781 BinOpKind::Gt => match (&lv, &rv) {
5782 (Value::Int64(a), Value::Int64(b)) => a > b,
5783 _ => false,
5784 },
5785 BinOpKind::Ge => match (&lv, &rv) {
5786 (Value::Int64(a), Value::Int64(b)) => a >= b,
5787 _ => false,
5788 },
5789 _ => false,
5790 }
5791 }
5792 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
5793 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
5794 Expr::Not(inner) => !eval_where(inner, vals),
5795 Expr::Literal(Literal::Bool(b)) => *b,
5796 Expr::Literal(_) => false,
5797 Expr::InList {
5798 expr,
5799 list,
5800 negated,
5801 } => {
5802 let lv = eval_expr(expr, vals);
5803 let matched = list
5804 .iter()
5805 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
5806 if *negated {
5807 !matched
5808 } else {
5809 matched
5810 }
5811 }
5812 Expr::ListPredicate { .. } => {
5813 match eval_expr(expr, vals) {
5815 Value::Bool(b) => b,
5816 _ => false,
5817 }
5818 }
5819 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
5820 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
5821 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
5823 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
5826 false
5827 }
5828 _ => false, }
5830}
5831
5832fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
5833 match expr {
5834 Expr::PropAccess { var, prop } => {
5835 let key = format!("{var}.{prop}");
5837 if let Some(v) = vals.get(&key) {
5838 return v.clone();
5839 }
5840 let col_id = prop_name_to_col_id(prop);
5844 let fallback_key = format!("{var}.col_{col_id}");
5845 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
5846 }
5847 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
5848 Expr::Literal(lit) => match lit {
5849 Literal::Int(n) => Value::Int64(*n),
5850 Literal::Float(f) => Value::Float64(*f),
5851 Literal::Bool(b) => Value::Bool(*b),
5852 Literal::String(s) => Value::String(s.clone()),
5853 Literal::Param(p) => {
5854 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
5857 }
5858 Literal::Null => Value::Null,
5859 },
5860 Expr::FnCall { name, args } => {
5861 let name_lc = name.to_lowercase();
5865 if name_lc == "type" {
5866 if let Some(Expr::Var(var_name)) = args.first() {
5867 let meta_key = format!("{}.__type__", var_name);
5868 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5869 }
5870 }
5871 if name_lc == "labels" {
5872 if let Some(Expr::Var(var_name)) = args.first() {
5873 let meta_key = format!("{}.__labels__", var_name);
5874 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5875 }
5876 }
5877 if name_lc == "id" {
5880 if let Some(Expr::Var(var_name)) = args.first() {
5881 let id_key = format!("{}.__node_id__", var_name);
5883 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
5884 return Value::Int64(nid.0 as i64);
5885 }
5886 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
5888 return Value::Int64(nid.0 as i64);
5889 }
5890 return Value::Null;
5891 }
5892 }
5893 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
5895 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
5896 }
5897 Expr::BinOp { left, op, right } => {
5898 let lv = eval_expr(left, vals);
5900 let rv = eval_expr(right, vals);
5901 match op {
5902 BinOpKind::Eq => Value::Bool(lv == rv),
5903 BinOpKind::Neq => Value::Bool(lv != rv),
5904 BinOpKind::Lt => match (&lv, &rv) {
5905 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
5906 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
5907 _ => Value::Null,
5908 },
5909 BinOpKind::Le => match (&lv, &rv) {
5910 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
5911 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
5912 _ => Value::Null,
5913 },
5914 BinOpKind::Gt => match (&lv, &rv) {
5915 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
5916 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
5917 _ => Value::Null,
5918 },
5919 BinOpKind::Ge => match (&lv, &rv) {
5920 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
5921 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
5922 _ => Value::Null,
5923 },
5924 BinOpKind::Contains => match (&lv, &rv) {
5925 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
5926 _ => Value::Null,
5927 },
5928 BinOpKind::StartsWith => match (&lv, &rv) {
5929 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
5930 _ => Value::Null,
5931 },
5932 BinOpKind::EndsWith => match (&lv, &rv) {
5933 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
5934 _ => Value::Null,
5935 },
5936 BinOpKind::And => match (&lv, &rv) {
5937 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
5938 _ => Value::Null,
5939 },
5940 BinOpKind::Or => match (&lv, &rv) {
5941 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
5942 _ => Value::Null,
5943 },
5944 BinOpKind::Add => match (&lv, &rv) {
5945 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
5946 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
5947 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
5948 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
5949 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
5950 _ => Value::Null,
5951 },
5952 BinOpKind::Sub => match (&lv, &rv) {
5953 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
5954 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
5955 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
5956 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
5957 _ => Value::Null,
5958 },
5959 BinOpKind::Mul => match (&lv, &rv) {
5960 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
5961 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
5962 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
5963 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
5964 _ => Value::Null,
5965 },
5966 BinOpKind::Div => match (&lv, &rv) {
5967 (Value::Int64(a), Value::Int64(b)) => {
5968 if *b == 0 {
5969 Value::Null
5970 } else {
5971 Value::Int64(a / b)
5972 }
5973 }
5974 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
5975 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
5976 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
5977 _ => Value::Null,
5978 },
5979 BinOpKind::Mod => match (&lv, &rv) {
5980 (Value::Int64(a), Value::Int64(b)) => {
5981 if *b == 0 {
5982 Value::Null
5983 } else {
5984 Value::Int64(a % b)
5985 }
5986 }
5987 _ => Value::Null,
5988 },
5989 }
5990 }
5991 Expr::Not(inner) => match eval_expr(inner, vals) {
5992 Value::Bool(b) => Value::Bool(!b),
5993 _ => Value::Null,
5994 },
5995 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
5996 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
5997 _ => Value::Null,
5998 },
5999 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
6000 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
6001 _ => Value::Null,
6002 },
6003 Expr::InList {
6004 expr,
6005 list,
6006 negated,
6007 } => {
6008 let lv = eval_expr(expr, vals);
6009 let matched = list
6010 .iter()
6011 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
6012 Value::Bool(if *negated { !matched } else { matched })
6013 }
6014 Expr::List(items) => {
6015 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
6016 Value::List(evaluated)
6017 }
6018 Expr::ListPredicate {
6019 kind,
6020 variable,
6021 list_expr,
6022 predicate,
6023 } => {
6024 let list_val = eval_expr(list_expr, vals);
6025 let items = match list_val {
6026 Value::List(v) => v,
6027 _ => return Value::Null,
6028 };
6029 let mut satisfied_count = 0usize;
6030 let mut scope = vals.clone();
6033 for item in &items {
6034 scope.insert(variable.clone(), item.clone());
6035 let result = eval_expr(predicate, &scope);
6036 if result == Value::Bool(true) {
6037 satisfied_count += 1;
6038 }
6039 }
6040 let result = match kind {
6041 ListPredicateKind::Any => satisfied_count > 0,
6042 ListPredicateKind::All => satisfied_count == items.len(),
6043 ListPredicateKind::None => satisfied_count == 0,
6044 ListPredicateKind::Single => satisfied_count == 1,
6045 };
6046 Value::Bool(result)
6047 }
6048 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
6049 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
6050 Expr::CaseWhen {
6052 branches,
6053 else_expr,
6054 } => {
6055 for (cond, then_val) in branches {
6056 if let Value::Bool(true) = eval_expr(cond, vals) {
6057 return eval_expr(then_val, vals);
6058 }
6059 }
6060 else_expr
6061 .as_ref()
6062 .map(|e| eval_expr(e, vals))
6063 .unwrap_or(Value::Null)
6064 }
6065 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
6067 Value::Null
6068 }
6069 }
6070}
6071
6072fn project_row(
6073 props: &[(u32, u64)],
6074 column_names: &[String],
6075 _col_ids: &[u32],
6076 var_name: &str,
6078 node_label: &str,
6080 store: &NodeStore,
6081) -> Vec<Value> {
6082 column_names
6083 .iter()
6084 .map(|col_name| {
6085 if let Some(inner) = col_name
6087 .strip_prefix("labels(")
6088 .and_then(|s| s.strip_suffix(')'))
6089 {
6090 if inner == var_name && !node_label.is_empty() {
6091 return Value::List(vec![Value::String(node_label.to_string())]);
6092 }
6093 return Value::Null;
6094 }
6095 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
6096 let col_id = prop_name_to_col_id(prop);
6097 props
6098 .iter()
6099 .find(|(c, _)| *c == col_id)
6100 .map(|(_, v)| decode_raw_val(*v, store))
6101 .unwrap_or(Value::Null)
6102 })
6103 .collect()
6104}
6105
6106#[allow(clippy::too_many_arguments)]
6107fn project_hop_row(
6108 src_props: &[(u32, u64)],
6109 dst_props: &[(u32, u64)],
6110 column_names: &[String],
6111 src_var: &str,
6112 _dst_var: &str,
6113 rel_var_type: Option<(&str, &str)>,
6115 src_label_meta: Option<(&str, &str)>,
6117 dst_label_meta: Option<(&str, &str)>,
6119 store: &NodeStore,
6120) -> Vec<Value> {
6121 column_names
6122 .iter()
6123 .map(|col_name| {
6124 if let Some(inner) = col_name
6126 .strip_prefix("type(")
6127 .and_then(|s| s.strip_suffix(')'))
6128 {
6129 if let Some((rel_var, rel_type)) = rel_var_type {
6131 if inner == rel_var {
6132 return Value::String(rel_type.to_string());
6133 }
6134 }
6135 return Value::Null;
6136 }
6137 if let Some(inner) = col_name
6139 .strip_prefix("labels(")
6140 .and_then(|s| s.strip_suffix(')'))
6141 {
6142 if let Some((meta_var, label)) = src_label_meta {
6143 if inner == meta_var {
6144 return Value::List(vec![Value::String(label.to_string())]);
6145 }
6146 }
6147 if let Some((meta_var, label)) = dst_label_meta {
6148 if inner == meta_var {
6149 return Value::List(vec![Value::String(label.to_string())]);
6150 }
6151 }
6152 return Value::Null;
6153 }
6154 if let Some((v, prop)) = col_name.split_once('.') {
6155 let col_id = prop_name_to_col_id(prop);
6156 let props = if v == src_var { src_props } else { dst_props };
6157 props
6158 .iter()
6159 .find(|(c, _)| *c == col_id)
6160 .map(|(_, val)| decode_raw_val(*val, store))
6161 .unwrap_or(Value::Null)
6162 } else {
6163 Value::Null
6164 }
6165 })
6166 .collect()
6167}
6168
6169fn project_fof_row(
6176 src_props: &[(u32, u64)],
6177 fof_props: &[(u32, u64)],
6178 column_names: &[String],
6179 src_var: &str,
6180 store: &NodeStore,
6181) -> Vec<Value> {
6182 column_names
6183 .iter()
6184 .map(|col_name| {
6185 if let Some((var, prop)) = col_name.split_once('.') {
6186 let col_id = prop_name_to_col_id(prop);
6187 let props = if !src_var.is_empty() && var == src_var {
6188 src_props
6189 } else {
6190 fof_props
6191 };
6192 props
6193 .iter()
6194 .find(|(c, _)| *c == col_id)
6195 .map(|(_, v)| decode_raw_val(*v, store))
6196 .unwrap_or(Value::Null)
6197 } else {
6198 Value::Null
6199 }
6200 })
6201 .collect()
6202}
6203
6204fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
6205 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
6208 for row in rows.drain(..) {
6209 if !unique.iter().any(|existing| existing == &row) {
6210 unique.push(row);
6211 }
6212 }
6213 *rows = unique;
6214}
6215
6216fn sort_spill_threshold() -> usize {
6218 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
6219 .ok()
6220 .and_then(|v| v.parse().ok())
6221 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
6222}
6223
6224fn make_sort_key(
6226 row: &[Value],
6227 order_by: &[(Expr, SortDir)],
6228 column_names: &[String],
6229) -> Vec<crate::sort_spill::SortKeyVal> {
6230 use crate::sort_spill::{OrdValue, SortKeyVal};
6231 order_by
6232 .iter()
6233 .map(|(expr, dir)| {
6234 let col_idx = match expr {
6235 Expr::PropAccess { var, prop } => {
6236 let key = format!("{var}.{prop}");
6237 column_names.iter().position(|c| c == &key)
6238 }
6239 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6240 _ => None,
6241 };
6242 let val = col_idx
6243 .and_then(|i| row.get(i))
6244 .map(OrdValue::from_value)
6245 .unwrap_or(OrdValue::Null);
6246 match dir {
6247 SortDir::Asc => SortKeyVal::Asc(val),
6248 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
6249 }
6250 })
6251 .collect()
6252}
6253
6254fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
6255 if m.order_by.is_empty() {
6256 return;
6257 }
6258
6259 let threshold = sort_spill_threshold();
6260
6261 if rows.len() <= threshold {
6262 rows.sort_by(|a, b| {
6263 for (expr, dir) in &m.order_by {
6264 let col_idx = match expr {
6265 Expr::PropAccess { var, prop } => {
6266 let key = format!("{var}.{prop}");
6267 column_names.iter().position(|c| c == &key)
6268 }
6269 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6270 _ => None,
6271 };
6272 if let Some(idx) = col_idx {
6273 if idx < a.len() && idx < b.len() {
6274 let cmp = compare_values(&a[idx], &b[idx]);
6275 let cmp = if *dir == SortDir::Desc {
6276 cmp.reverse()
6277 } else {
6278 cmp
6279 };
6280 if cmp != std::cmp::Ordering::Equal {
6281 return cmp;
6282 }
6283 }
6284 }
6285 }
6286 std::cmp::Ordering::Equal
6287 });
6288 } else {
6289 use crate::sort_spill::{SortableRow, SpillingSorter};
6290 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
6291 for row in rows.drain(..) {
6292 let key = make_sort_key(&row, &m.order_by, column_names);
6293 if sorter.push(SortableRow { key, data: row }).is_err() {
6294 return;
6295 }
6296 }
6297 if let Ok(iter) = sorter.finish() {
6298 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
6299 }
6300 }
6301}
6302
6303fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
6304 match (a, b) {
6305 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
6306 (Value::Float64(x), Value::Float64(y)) => {
6307 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
6308 }
6309 (Value::String(x), Value::String(y)) => x.cmp(y),
6310 _ => std::cmp::Ordering::Equal,
6311 }
6312}
6313
6314fn is_aggregate_expr(expr: &Expr) -> bool {
6318 match expr {
6319 Expr::CountStar => true,
6320 Expr::FnCall { name, .. } => matches!(
6321 name.to_lowercase().as_str(),
6322 "count" | "sum" | "avg" | "min" | "max" | "collect"
6323 ),
6324 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6326 _ => false,
6327 }
6328}
6329
6330fn expr_has_collect(expr: &Expr) -> bool {
6332 match expr {
6333 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
6334 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6335 _ => false,
6336 }
6337}
6338
6339fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
6345 match expr {
6346 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
6347 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
6348 _ => Value::Null,
6349 }
6350}
6351
6352fn evaluate_aggregate_expr(
6358 expr: &Expr,
6359 accumulated_list: &Value,
6360 outer_vals: &HashMap<String, Value>,
6361) -> Value {
6362 match expr {
6363 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
6364 Expr::ListPredicate {
6365 kind,
6366 variable,
6367 predicate,
6368 ..
6369 } => {
6370 let items = match accumulated_list {
6371 Value::List(v) => v,
6372 _ => return Value::Null,
6373 };
6374 let mut satisfied_count = 0usize;
6375 for item in items {
6376 let mut scope = outer_vals.clone();
6377 scope.insert(variable.clone(), item.clone());
6378 let result = eval_expr(predicate, &scope);
6379 if result == Value::Bool(true) {
6380 satisfied_count += 1;
6381 }
6382 }
6383 let result = match kind {
6384 ListPredicateKind::Any => satisfied_count > 0,
6385 ListPredicateKind::All => satisfied_count == items.len(),
6386 ListPredicateKind::None => satisfied_count == 0,
6387 ListPredicateKind::Single => satisfied_count == 1,
6388 };
6389 Value::Bool(result)
6390 }
6391 _ => Value::Null,
6392 }
6393}
6394
6395fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
6397 items.iter().any(|item| is_aggregate_expr(&item.expr))
6398}
6399
6400fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
6411 items.iter().any(|item| {
6412 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
6413 || matches!(&item.expr, Expr::Var(_))
6414 || expr_needs_graph(&item.expr)
6415 || expr_needs_eval_path(&item.expr)
6416 })
6417}
6418
6419fn expr_needs_eval_path(expr: &Expr) -> bool {
6431 match expr {
6432 Expr::FnCall { name, args } => {
6433 let name_lc = name.to_lowercase();
6434 if matches!(
6436 name_lc.as_str(),
6437 "count" | "sum" | "avg" | "min" | "max" | "collect"
6438 ) {
6439 return false;
6440 }
6441 let _ = args; true
6447 }
6448 Expr::BinOp { left, right, .. } => {
6450 expr_needs_eval_path(left) || expr_needs_eval_path(right)
6451 }
6452 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
6453 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6454 expr_needs_eval_path(inner)
6455 }
6456 _ => false,
6457 }
6458}
6459
6460fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
6465 items
6466 .iter()
6467 .filter_map(|item| {
6468 if let Expr::Var(v) = &item.expr {
6469 Some(v.clone())
6470 } else {
6471 None
6472 }
6473 })
6474 .collect()
6475}
6476
6477fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
6482 let entries: Vec<(String, Value)> = props
6483 .iter()
6484 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
6485 .collect();
6486 Value::Map(entries)
6487}
6488
/// Classification of a `RETURN` item for aggregation, as assigned by `agg_kind`.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Not an aggregate: the item is part of the grouping key.
    Key,
    /// `count(*)`.
    CountStar,
    /// `count(expr)`.
    Count,
    /// `sum(expr)`.
    Sum,
    /// `avg(expr)`.
    Avg,
    /// `min(expr)`.
    Min,
    /// `max(expr)`.
    Max,
    /// `collect(expr)`, or a list predicate over a `collect(..)`.
    Collect,
}
6502
6503fn agg_kind(expr: &Expr) -> AggKind {
6504 match expr {
6505 Expr::CountStar => AggKind::CountStar,
6506 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
6507 "count" => AggKind::Count,
6508 "sum" => AggKind::Sum,
6509 "avg" => AggKind::Avg,
6510 "min" => AggKind::Min,
6511 "max" => AggKind::Max,
6512 "collect" => AggKind::Collect,
6513 _ => AggKind::Key,
6514 },
6515 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
6517 _ => AggKind::Key,
6518 }
6519}
6520
6521fn expr_needs_graph(expr: &Expr) -> bool {
6530 match expr {
6531 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
6532 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
6533 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
6534 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
6535 _ => false,
6536 }
6537}
6538
6539fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
6540 let kinds: Vec<AggKind> = return_items
6542 .iter()
6543 .map(|item| agg_kind(&item.expr))
6544 .collect();
6545
6546 let key_indices: Vec<usize> = kinds
6547 .iter()
6548 .enumerate()
6549 .filter(|(_, k)| **k == AggKind::Key)
6550 .map(|(i, _)| i)
6551 .collect();
6552
6553 let agg_indices: Vec<usize> = kinds
6554 .iter()
6555 .enumerate()
6556 .filter(|(_, k)| **k != AggKind::Key)
6557 .map(|(i, _)| i)
6558 .collect();
6559
6560 if agg_indices.is_empty() {
6562 return rows
6563 .iter()
6564 .map(|row_vals| {
6565 return_items
6566 .iter()
6567 .map(|item| eval_expr(&item.expr, row_vals))
6568 .collect()
6569 })
6570 .collect();
6571 }
6572
6573 let mut group_keys: Vec<Vec<Value>> = Vec::new();
6575 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
6577
6578 for row_vals in rows {
6579 let key: Vec<Value> = key_indices
6580 .iter()
6581 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
6582 .collect();
6583
6584 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
6585 pos
6586 } else {
6587 group_keys.push(key);
6588 group_accum.push(vec![vec![]; agg_indices.len()]);
6589 group_keys.len() - 1
6590 };
6591
6592 for (ai, &ri) in agg_indices.iter().enumerate() {
6593 match &kinds[ri] {
6594 AggKind::CountStar => {
6595 group_accum[group_idx][ai].push(Value::Int64(1));
6597 }
6598 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
6599 let arg_val = match &return_items[ri].expr {
6600 Expr::FnCall { args, .. } if !args.is_empty() => {
6601 eval_expr(&args[0], row_vals)
6602 }
6603 _ => Value::Null,
6604 };
6605 if !matches!(arg_val, Value::Null) {
6607 group_accum[group_idx][ai].push(arg_val);
6608 }
6609 }
6610 AggKind::Collect => {
6611 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
6614 if !matches!(arg_val, Value::Null) {
6616 group_accum[group_idx][ai].push(arg_val);
6617 }
6618 }
6619 AggKind::Key => unreachable!(),
6620 }
6621 }
6622 }
6623
6624 if group_keys.is_empty() && key_indices.is_empty() {
6626 let empty_vals: HashMap<String, Value> = HashMap::new();
6627 let row: Vec<Value> = return_items
6628 .iter()
6629 .zip(kinds.iter())
6630 .map(|(item, k)| match k {
6631 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
6632 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
6633 AggKind::Collect => {
6634 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
6635 }
6636 AggKind::Key => Value::Null,
6637 })
6638 .collect();
6639 return vec![row];
6640 }
6641
6642 if group_keys.is_empty() {
6644 return vec![];
6645 }
6646
6647 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
6649 for (gi, key_vals) in group_keys.into_iter().enumerate() {
6650 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
6651 let mut ki = 0usize;
6652 let mut ai = 0usize;
6653 let outer_vals: HashMap<String, Value> = key_indices
6655 .iter()
6656 .enumerate()
6657 .map(|(pos, &i)| {
6658 let name = return_items[i]
6659 .alias
6660 .clone()
6661 .unwrap_or_else(|| format!("_k{i}"));
6662 (name, key_vals[pos].clone())
6663 })
6664 .collect();
6665 for col_idx in 0..return_items.len() {
6666 if kinds[col_idx] == AggKind::Key {
6667 output_row.push(key_vals[ki].clone());
6668 ki += 1;
6669 } else {
6670 let accumulated = Value::List(group_accum[gi][ai].clone());
6671 let result = if kinds[col_idx] == AggKind::Collect {
6672 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
6673 } else {
6674 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
6675 };
6676 output_row.push(result);
6677 ai += 1;
6678 }
6679 }
6680 out.push(output_row);
6681 }
6682 out
6683}
6684
6685fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
6687 match kind {
6688 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
6689 AggKind::Sum => {
6690 let mut sum_i: i64 = 0;
6691 let mut sum_f: f64 = 0.0;
6692 let mut is_float = false;
6693 for v in vals {
6694 match v {
6695 Value::Int64(n) => sum_i += n,
6696 Value::Float64(f) => {
6697 is_float = true;
6698 sum_f += f;
6699 }
6700 _ => {}
6701 }
6702 }
6703 if is_float {
6704 Value::Float64(sum_f + sum_i as f64)
6705 } else {
6706 Value::Int64(sum_i)
6707 }
6708 }
6709 AggKind::Avg => {
6710 if vals.is_empty() {
6711 return Value::Null;
6712 }
6713 let mut sum: f64 = 0.0;
6714 let mut count: i64 = 0;
6715 for v in vals {
6716 match v {
6717 Value::Int64(n) => {
6718 sum += *n as f64;
6719 count += 1;
6720 }
6721 Value::Float64(f) => {
6722 sum += f;
6723 count += 1;
6724 }
6725 _ => {}
6726 }
6727 }
6728 if count == 0 {
6729 Value::Null
6730 } else {
6731 Value::Float64(sum / count as f64)
6732 }
6733 }
6734 AggKind::Min => vals
6735 .iter()
6736 .fold(None::<Value>, |acc, v| match (acc, v) {
6737 (None, v) => Some(v.clone()),
6738 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
6739 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
6740 (Some(Value::String(a)), Value::String(b)) => {
6741 Some(Value::String(if a <= *b { a } else { b.clone() }))
6742 }
6743 (Some(a), _) => Some(a),
6744 })
6745 .unwrap_or(Value::Null),
6746 AggKind::Max => vals
6747 .iter()
6748 .fold(None::<Value>, |acc, v| match (acc, v) {
6749 (None, v) => Some(v.clone()),
6750 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
6751 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
6752 (Some(Value::String(a)), Value::String(b)) => {
6753 Some(Value::String(if a >= *b { a } else { b.clone() }))
6754 }
6755 (Some(a), _) => Some(a),
6756 })
6757 .unwrap_or(Value::Null),
6758 AggKind::Collect => Value::List(vals.to_vec()),
6759 AggKind::Key => Value::Null,
6760 }
6761}
6762
6763fn eval_expr_to_string(expr: &Expr) -> Result<String> {
6770 match expr {
6771 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
6772 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
6773 "parameter ${p} requires runtime binding; pass a literal string instead"
6774 ))),
6775 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
6776 "procedure argument must be a string literal, got: {other:?}"
6777 ))),
6778 }
6779}
6780
6781fn expr_to_col_name(expr: &Expr) -> String {
6784 match expr {
6785 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6786 Expr::Var(v) => v.clone(),
6787 _ => "value".to_owned(),
6788 }
6789}
6790
6791fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
6797 match expr {
6798 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
6799 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
6800 Some(Value::NodeRef(node_id)) => {
6801 let col_id = prop_name_to_col_id(prop);
6802 read_node_props(store, *node_id, &[col_id])
6803 .ok()
6804 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
6805 .map(|(_, raw)| decode_raw_val(raw, store))
6806 .unwrap_or(Value::Null)
6807 }
6808 Some(other) => other.clone(),
6809 None => Value::Null,
6810 },
6811 Expr::Literal(lit) => match lit {
6812 Literal::Int(n) => Value::Int64(*n),
6813 Literal::Float(f) => Value::Float64(*f),
6814 Literal::Bool(b) => Value::Bool(*b),
6815 Literal::String(s) => Value::String(s.clone()),
6816 _ => Value::Null,
6817 },
6818 _ => Value::Null,
6819 }
6820}