1use std::collections::{HashMap, HashSet};
7use std::path::Path;
8
9use tracing::info_span;
10
11use sparrowdb_catalog::catalog::Catalog;
12use sparrowdb_common::{col_id_of, NodeId, Result};
13use sparrowdb_cypher::ast::{
14 BinOpKind, CallStatement, CreateStatement, Expr, ListPredicateKind, Literal,
15 MatchCreateStatement, MatchMergeRelStatement, MatchMutateStatement,
16 MatchOptionalMatchStatement, MatchStatement, MatchWithStatement, Mutation,
17 OptionalMatchStatement, PathPattern, PipelineStage, PipelineStatement, ReturnItem, SortDir,
18 Statement, UnionStatement, UnwindStatement, WithClause,
19};
20use sparrowdb_cypher::{bind, parse};
21use sparrowdb_storage::csr::{CsrBackward, CsrForward};
22use sparrowdb_storage::edge_store::{DeltaRecord, EdgeStore, RelTableId};
23use sparrowdb_storage::fulltext_index::FulltextIndex;
24use sparrowdb_storage::node_store::{NodeStore, Value as StoreValue};
25use sparrowdb_storage::property_index::PropertyIndex;
26use sparrowdb_storage::text_index::TextIndex;
27use sparrowdb_storage::wal::WalReplayer;
28
29use crate::types::{QueryResult, Value};
30
/// Outcome of resolving a relationship type to a catalog rel table
/// (see `Engine::resolve_rel_table_id`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RelTableLookup {
    /// The pattern named no relationship type: every rel table applies.
    All,
    /// The catalog returned this rel table id for (src label, dst label, type).
    Found(u32),
    /// A type was named but no matching rel table is known.
    NotFound,
}
46
/// Query execution engine bound to one database root directory.
///
/// Owns the node store and catalog, the per-rel-table CSR adjacency, the
/// indexes built in `Engine::new`, the current query parameters and an
/// optional execution deadline.
pub struct Engine {
    /// Node storage; scanned with `hwm_for_label`/`get_node_raw` and written
    /// with `create_node`.
    pub store: NodeStore,
    /// Label and relationship-table catalog.
    pub catalog: Catalog,
    /// Forward CSR adjacency keyed by rel table id (looked up in `csr_neighbors`).
    pub csrs: HashMap<u32, CsrForward>,
    /// Database directory; edge deltas, full-text indexes and the `wal`
    /// directory are opened relative to it.
    pub db_root: std::path::PathBuf,
    /// Query parameters installed by `with_params`.
    pub params: HashMap<String, Value>,
    /// Property index built from the store at construction time (empty if the
    /// label listing failed).
    pub prop_index: PropertyIndex,
    /// Text index built from the store at construction time (empty if the
    /// label listing failed).
    pub text_index: TextIndex,
    /// When set, `check_deadline` reports `QueryTimeout` once this instant has passed.
    pub deadline: Option<std::time::Instant>,
}
78
79impl Engine {
80 pub fn new(
86 store: NodeStore,
87 catalog: Catalog,
88 csrs: HashMap<u32, CsrForward>,
89 db_root: &Path,
90 ) -> Self {
91 let (prop_index, text_index) = match catalog.list_labels() {
94 Ok(labels) => {
95 let labels_u32: Vec<(u32, String)> = labels
97 .into_iter()
98 .map(|(id, name)| (id as u32, name))
99 .collect();
100 let pi = PropertyIndex::build(&store, &labels_u32);
101 let ti = TextIndex::build(&store, &labels_u32);
103 (pi, ti)
104 }
105 Err(e) => {
106 tracing::warn!(error = ?e, "SPA-249/SPA-251: index build failed; disabled");
107 (PropertyIndex::new(), TextIndex::new())
108 }
109 };
110 Engine {
111 store,
112 catalog,
113 csrs,
114 db_root: db_root.to_path_buf(),
115 params: HashMap::new(),
116 prop_index,
117 text_index,
118 deadline: None,
119 }
120 }
121
122 pub fn with_single_csr(
128 store: NodeStore,
129 catalog: Catalog,
130 csr: CsrForward,
131 db_root: &Path,
132 ) -> Self {
133 let mut csrs = HashMap::new();
134 csrs.insert(0u32, csr);
135 Self::new(store, catalog, csrs, db_root)
136 }
137
138 pub fn with_params(mut self, params: HashMap<String, Value>) -> Self {
143 self.params = params;
144 self
145 }
146
147 pub fn with_deadline(mut self, deadline: std::time::Instant) -> Self {
152 self.deadline = Some(deadline);
153 self
154 }
155
156 #[inline]
162 fn check_deadline(&self) -> sparrowdb_common::Result<()> {
163 if let Some(dl) = self.deadline {
164 if std::time::Instant::now() >= dl {
165 return Err(sparrowdb_common::Error::QueryTimeout);
166 }
167 }
168 Ok(())
169 }
170
171 fn resolve_rel_table_id(
180 &self,
181 src_label_id: u32,
182 dst_label_id: u32,
183 rel_type: &str,
184 ) -> RelTableLookup {
185 if rel_type.is_empty() {
186 return RelTableLookup::All;
187 }
188 match self
189 .catalog
190 .get_rel_table(src_label_id as u16, dst_label_id as u16, rel_type)
191 .ok()
192 .flatten()
193 {
194 Some(id) => RelTableLookup::Found(id as u32),
195 None => RelTableLookup::NotFound,
196 }
197 }
198
199 fn read_delta_for(&self, rel_table_id: u32) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
204 EdgeStore::open(&self.db_root, RelTableId(rel_table_id))
205 .and_then(|s| s.read_delta())
206 .unwrap_or_default()
207 }
208
209 fn read_delta_all(&self) -> Vec<sparrowdb_storage::edge_store::DeltaRecord> {
213 let ids = self.catalog.list_rel_table_ids();
214 if ids.is_empty() {
215 return EdgeStore::open(&self.db_root, RelTableId(0))
217 .and_then(|s| s.read_delta())
218 .unwrap_or_default();
219 }
220 ids.into_iter()
221 .flat_map(|(id, _, _, _)| {
222 EdgeStore::open(&self.db_root, RelTableId(id as u32))
223 .and_then(|s| s.read_delta())
224 .unwrap_or_default()
225 })
226 .collect()
227 }
228
229 fn csr_neighbors(&self, rel_table_id: u32, src_slot: u64) -> Vec<u64> {
231 self.csrs
232 .get(&rel_table_id)
233 .map(|csr| csr.neighbors(src_slot).to_vec())
234 .unwrap_or_default()
235 }
236
237 fn csr_neighbors_all(&self, src_slot: u64) -> Vec<u64> {
239 let mut out: Vec<u64> = Vec::new();
240 for csr in self.csrs.values() {
241 out.extend_from_slice(csr.neighbors(src_slot));
242 }
243 out
244 }
245
246 pub fn execute(&mut self, cypher: &str) -> Result<QueryResult> {
251 let stmt = {
252 let _parse_span = info_span!("sparrowdb.parse", cypher = cypher).entered();
253 parse(cypher)?
254 };
255
256 let bound = {
257 let _bind_span = info_span!("sparrowdb.bind").entered();
258 bind(stmt, &self.catalog)?
259 };
260
261 {
262 let _plan_span = info_span!("sparrowdb.plan_execute").entered();
263 self.execute_bound(bound.inner)
264 }
265 }
266
267 pub fn execute_statement(&mut self, stmt: Statement) -> Result<QueryResult> {
272 self.execute_bound(stmt)
273 }
274
275 fn execute_bound(&mut self, stmt: Statement) -> Result<QueryResult> {
276 match stmt {
277 Statement::Match(m) => self.execute_match(&m),
278 Statement::MatchWith(mw) => self.execute_match_with(&mw),
279 Statement::Unwind(u) => self.execute_unwind(&u),
280 Statement::Create(c) => self.execute_create(&c),
281 Statement::Merge(_)
285 | Statement::MatchMergeRel(_)
286 | Statement::MatchMutate(_)
287 | Statement::MatchCreate(_) => Err(sparrowdb_common::Error::InvalidArgument(
288 "mutation statements must be executed via execute_mutation".into(),
289 )),
290 Statement::OptionalMatch(om) => self.execute_optional_match(&om),
291 Statement::MatchOptionalMatch(mom) => self.execute_match_optional_match(&mom),
292 Statement::Union(u) => self.execute_union(u),
293 Statement::Checkpoint | Statement::Optimize => Ok(QueryResult::empty(vec![])),
294 Statement::Call(c) => self.execute_call(&c),
295 Statement::Pipeline(p) => self.execute_pipeline(&p),
296 }
297 }
298
299 fn execute_call(&self, c: &CallStatement) -> Result<QueryResult> {
306 match c.procedure.as_str() {
307 "db.index.fulltext.queryNodes" => self.call_fulltext_query_nodes(c),
308 "db.schema" => self.call_db_schema(c),
309 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
310 "unknown procedure: {other}"
311 ))),
312 }
313 }
314
315 fn call_fulltext_query_nodes(&self, c: &CallStatement) -> Result<QueryResult> {
324 if c.args.len() != 2 {
326 return Err(sparrowdb_common::Error::InvalidArgument(
327 "db.index.fulltext.queryNodes requires exactly 2 arguments: (indexName, query)"
328 .into(),
329 ));
330 }
331
332 let index_name = eval_expr_to_string(&c.args[0])?;
334 let query = eval_expr_to_string(&c.args[1])?;
336
337 let index = FulltextIndex::open(&self.db_root, &index_name)?;
340 let node_ids = index.search(&query);
341
342 let yield_cols: Vec<String> = if c.yield_columns.is_empty() {
345 vec!["node".to_owned()]
346 } else {
347 c.yield_columns.clone()
348 };
349
350 if let Some(bad_col) = yield_cols.iter().find(|c| c.as_str() != "node") {
352 return Err(sparrowdb_common::Error::InvalidArgument(format!(
353 "unsupported YIELD column for db.index.fulltext.queryNodes: {bad_col}"
354 )));
355 }
356
357 let mut rows: Vec<Vec<Value>> = Vec::new();
359 for raw_id in node_ids {
360 let node_id = sparrowdb_common::NodeId(raw_id);
361 let row: Vec<Value> = yield_cols.iter().map(|_| Value::NodeRef(node_id)).collect();
362 rows.push(row);
363 }
364
365 let (columns, rows) = if let Some(ref ret) = c.return_clause {
367 self.project_call_return(ret, &yield_cols, rows)?
368 } else {
369 (yield_cols, rows)
370 };
371
372 Ok(QueryResult { columns, rows })
373 }
374
375 fn call_db_schema(&self, c: &CallStatement) -> Result<QueryResult> {
386 if !c.args.is_empty() {
387 return Err(sparrowdb_common::Error::InvalidArgument(
388 "db.schema requires exactly 0 arguments".into(),
389 ));
390 }
391 let columns = vec![
392 "type".to_owned(),
393 "name".to_owned(),
394 "properties".to_owned(),
395 ];
396
397 let wal_dir = self.db_root.join("wal");
399 let schema = WalReplayer::scan_schema(&wal_dir)?;
400
401 let mut rows: Vec<Vec<Value>> = Vec::new();
402
403 let labels = self.catalog.list_labels()?;
405 for (label_id, label_name) in &labels {
406 let mut prop_names: Vec<String> = schema
407 .node_props
408 .get(&(*label_id as u32))
409 .map(|s| s.iter().cloned().collect())
410 .unwrap_or_default();
411 prop_names.sort();
412 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
413 rows.push(vec![
414 Value::String("node".to_owned()),
415 Value::String(label_name.clone()),
416 props_value,
417 ]);
418 }
419
420 let rel_tables = self.catalog.list_rel_tables()?;
422 let mut seen_rel_types: std::collections::HashSet<String> =
424 std::collections::HashSet::new();
425 for (_, _, rel_type) in &rel_tables {
426 if seen_rel_types.insert(rel_type.clone()) {
427 let mut prop_names: Vec<String> = schema
428 .rel_props
429 .get(rel_type)
430 .map(|s| s.iter().cloned().collect())
431 .unwrap_or_default();
432 prop_names.sort();
433 let props_value = Value::List(prop_names.into_iter().map(Value::String).collect());
434 rows.push(vec![
435 Value::String("relationship".to_owned()),
436 Value::String(rel_type.clone()),
437 props_value,
438 ]);
439 }
440 }
441
442 Ok(QueryResult { columns, rows })
443 }
444
445 fn project_call_return(
455 &self,
456 ret: &sparrowdb_cypher::ast::ReturnClause,
457 yield_cols: &[String],
458 rows: Vec<Vec<Value>>,
459 ) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
460 let out_cols: Vec<String> = ret
462 .items
463 .iter()
464 .map(|item| {
465 item.alias
466 .clone()
467 .unwrap_or_else(|| expr_to_col_name(&item.expr))
468 })
469 .collect();
470
471 let mut out_rows = Vec::new();
472 for row in rows {
473 let env: HashMap<String, Value> = yield_cols
475 .iter()
476 .zip(row.iter())
477 .map(|(k, v)| (k.clone(), v.clone()))
478 .collect();
479
480 let projected: Vec<Value> = ret
481 .items
482 .iter()
483 .map(|item| eval_call_expr(&item.expr, &env, &self.store))
484 .collect();
485 out_rows.push(projected);
486 }
487 Ok((out_cols, out_rows))
488 }
489
490 pub fn is_mutation(stmt: &Statement) -> bool {
495 match stmt {
496 Statement::Merge(_)
497 | Statement::MatchMergeRel(_)
498 | Statement::MatchMutate(_)
499 | Statement::MatchCreate(_) => true,
500 Statement::Create(_) => true,
504 _ => false,
505 }
506 }
507
508 pub fn scan_match_mutate(&self, mm: &MatchMutateStatement) -> Result<Vec<NodeId>> {
514 if mm.match_patterns.is_empty() {
515 return Ok(vec![]);
516 }
517
518 if mm.match_patterns.len() != 1 || !mm.match_patterns[0].rels.is_empty() {
522 return Err(sparrowdb_common::Error::InvalidArgument(
523 "MATCH...SET/DELETE currently supports only single-node patterns (no relationships)"
524 .into(),
525 ));
526 }
527
528 let pat = &mm.match_patterns[0];
529 if pat.nodes.is_empty() {
530 return Ok(vec![]);
531 }
532 let node_pat = &pat.nodes[0];
533 let label = node_pat.labels.first().cloned().unwrap_or_default();
534
535 let label_id = match self.catalog.get_label(&label)? {
536 Some(id) => id as u32,
537 None => return Ok(vec![]),
539 };
540
541 let hwm = self.store.hwm_for_label(label_id)?;
542
543 let filter_col_ids: Vec<u32> = node_pat
545 .props
546 .iter()
547 .map(|pe| prop_name_to_col_id(&pe.key))
548 .collect();
549
550 let mut all_col_ids: Vec<u32> = filter_col_ids;
552 if let Some(ref where_expr) = mm.where_clause {
553 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
554 }
555
556 let var_name = node_pat.var.as_str();
557 let mut matching_ids = Vec::new();
558
559 for slot in 0..hwm {
560 let node_id = NodeId(((label_id as u64) << 32) | slot);
561
562 if self.is_node_tombstoned(node_id) {
565 continue;
566 }
567
568 let props = read_node_props(&self.store, node_id, &all_col_ids)?;
569
570 if !matches_prop_filter_static(
571 &props,
572 &node_pat.props,
573 &self.dollar_params(),
574 &self.store,
575 ) {
576 continue;
577 }
578
579 if let Some(ref where_expr) = mm.where_clause {
580 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
581 row_vals.extend(self.dollar_params());
582 if !self.eval_where_graph(where_expr, &row_vals) {
583 continue;
584 }
585 }
586
587 matching_ids.push(node_id);
588 }
589
590 Ok(matching_ids)
591 }
592
593 pub fn mutation_from_match_mutate(mm: &MatchMutateStatement) -> &Mutation {
596 &mm.mutation
597 }
598
599 fn is_node_tombstoned(&self, node_id: NodeId) -> bool {
608 match self.store.get_node_raw(node_id, &[0u32]) {
609 Ok(col0) => col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX),
610 Err(sparrowdb_common::Error::NotFound) => false,
611 Err(e) => {
612 tracing::warn!(
613 node_id = node_id.0,
614 error = ?e,
615 "tombstone check failed; treating node as not tombstoned"
616 );
617 false
618 }
619 }
620 }
621
622 fn node_matches_prop_filter(
629 &self,
630 node_id: NodeId,
631 filter_col_ids: &[u32],
632 props: &[sparrowdb_cypher::ast::PropEntry],
633 ) -> bool {
634 if props.is_empty() {
635 return true;
636 }
637 match self.store.get_node_raw(node_id, filter_col_ids) {
638 Ok(raw_props) => {
639 matches_prop_filter_static(&raw_props, props, &self.dollar_params(), &self.store)
640 }
641 Err(_) => false,
642 }
643 }
644
645 pub fn scan_match_create(
653 &self,
654 mc: &MatchCreateStatement,
655 ) -> Result<HashMap<String, Vec<NodeId>>> {
656 let mut var_candidates: HashMap<String, Vec<NodeId>> = HashMap::new();
657
658 for pat in &mc.match_patterns {
659 for node_pat in &pat.nodes {
660 if node_pat.var.is_empty() {
661 continue;
662 }
663 if var_candidates.contains_key(&node_pat.var) {
665 continue;
666 }
667
668 let label = node_pat.labels.first().cloned().unwrap_or_default();
669 let label_id: u32 = match self.catalog.get_label(&label)? {
670 Some(id) => id as u32,
671 None => {
672 var_candidates.insert(node_pat.var.clone(), vec![]);
674 continue;
675 }
676 };
677
678 let hwm = self.store.hwm_for_label(label_id)?;
679
680 let filter_col_ids: Vec<u32> = node_pat
682 .props
683 .iter()
684 .map(|p| prop_name_to_col_id(&p.key))
685 .collect();
686
687 let mut matching_ids: Vec<NodeId> = Vec::new();
688 for slot in 0..hwm {
689 let node_id = NodeId(((label_id as u64) << 32) | slot);
690
691 match self.store.get_node_raw(node_id, &[0u32]) {
694 Ok(col0) if col0.iter().any(|&(c, v)| c == 0 && v == u64::MAX) => {
695 continue;
696 }
697 Ok(_) | Err(_) => {}
698 }
699
700 if !node_pat.props.is_empty() {
702 match self.store.get_node_raw(node_id, &filter_col_ids) {
703 Ok(props) => {
704 if !matches_prop_filter_static(
705 &props,
706 &node_pat.props,
707 &self.dollar_params(),
708 &self.store,
709 ) {
710 continue;
711 }
712 }
713 Err(_) => continue,
716 }
717 }
718
719 matching_ids.push(node_id);
720 }
721
722 var_candidates.insert(node_pat.var.clone(), matching_ids);
723 }
724 }
725
726 Ok(var_candidates)
727 }
728
729 pub fn scan_match_create_rows(
751 &self,
752 mc: &MatchCreateStatement,
753 ) -> Result<Vec<HashMap<String, NodeId>>> {
754 let mut accumulated: Vec<HashMap<String, NodeId>> = vec![HashMap::new()];
756
757 for pat in &mc.match_patterns {
758 if pat.rels.is_empty() {
759 let mut per_var: Vec<(String, Vec<NodeId>)> = Vec::new();
764
765 for node_pat in &pat.nodes {
766 if node_pat.var.is_empty() {
767 continue;
768 }
769
770 let scan_label_ids: Vec<u32> = if node_pat.labels.is_empty() {
774 self.catalog
775 .list_labels()?
776 .into_iter()
777 .map(|(id, _)| id as u32)
778 .collect()
779 } else {
780 let label = node_pat.labels.first().cloned().unwrap_or_default();
781 match self.catalog.get_label(&label)? {
782 Some(id) => vec![id as u32],
783 None => {
784 return Ok(vec![]);
786 }
787 }
788 };
789
790 let filter_col_ids: Vec<u32> = node_pat
791 .props
792 .iter()
793 .map(|p| prop_name_to_col_id(&p.key))
794 .collect();
795
796 let mut matching_ids: Vec<NodeId> = Vec::new();
797 for label_id in scan_label_ids {
798 let hwm = self.store.hwm_for_label(label_id)?;
799 for slot in 0..hwm {
800 let node_id = NodeId(((label_id as u64) << 32) | slot);
801
802 if self.is_node_tombstoned(node_id) {
803 continue;
804 }
805 if !self.node_matches_prop_filter(
806 node_id,
807 &filter_col_ids,
808 &node_pat.props,
809 ) {
810 continue;
811 }
812
813 matching_ids.push(node_id);
814 }
815 }
816
817 if matching_ids.is_empty() {
818 return Ok(vec![]);
820 }
821
822 per_var.push((node_pat.var.clone(), matching_ids));
823 }
824
825 for (var, candidates) in per_var {
829 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
830 for row in &accumulated {
831 for &node_id in &candidates {
832 let mut new_row = row.clone();
833 new_row.insert(var.clone(), node_id);
834 next.push(new_row);
835 }
836 }
837 accumulated = next;
838 }
839 } else if pat.rels.len() == 1 && pat.nodes.len() == 2 {
840 let src_node_pat = &pat.nodes[0];
843 let dst_node_pat = &pat.nodes[1];
844 let rel_pat = &pat.rels[0];
845
846 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
848 return Err(sparrowdb_common::Error::Unimplemented);
849 }
850
851 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
852 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
853
854 let src_label_id: u32 = match self.catalog.get_label(&src_label)? {
855 Some(id) => id as u32,
856 None => return Ok(vec![]),
857 };
858 let dst_label_id: u32 = match self.catalog.get_label(&dst_label)? {
859 Some(id) => id as u32,
860 None => return Ok(vec![]),
861 };
862
863 let src_filter_cols: Vec<u32> = src_node_pat
864 .props
865 .iter()
866 .map(|p| prop_name_to_col_id(&p.key))
867 .collect();
868 let dst_filter_cols: Vec<u32> = dst_node_pat
869 .props
870 .iter()
871 .map(|p| prop_name_to_col_id(&p.key))
872 .collect();
873
874 let rel_lookup =
876 self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
877 if matches!(rel_lookup, RelTableLookup::NotFound) {
878 return Ok(vec![]);
879 }
880
881 let delta_adj: HashMap<u64, Vec<u64>> = {
884 let records: Vec<DeltaRecord> = match rel_lookup {
885 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
886 _ => self.read_delta_all(),
887 };
888 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
889 for r in records {
890 let s = r.src.0;
891 let s_label = (s >> 32) as u32;
892 if s_label == src_label_id {
893 let s_slot = s & 0xFFFF_FFFF;
894 adj.entry(s_slot).or_default().push(r.dst.0 & 0xFFFF_FFFF);
895 }
896 }
897 adj
898 };
899
900 let hwm_src = self.store.hwm_for_label(src_label_id)?;
901
902 let mut pattern_rows: Vec<HashMap<String, NodeId>> = Vec::new();
904
905 for src_slot in 0..hwm_src {
906 self.check_deadline()?;
908
909 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
910
911 if self.is_node_tombstoned(src_node) {
912 continue;
913 }
914 if !self.node_matches_prop_filter(
915 src_node,
916 &src_filter_cols,
917 &src_node_pat.props,
918 ) {
919 continue;
920 }
921
922 let csr_neighbors_vec: Vec<u64> = match rel_lookup {
924 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
925 _ => self.csr_neighbors_all(src_slot),
926 };
927 let empty: Vec<u64> = Vec::new();
928 let delta_neighbors: &[u64] =
929 delta_adj.get(&src_slot).map_or(&empty, |v| v.as_slice());
930
931 let mut seen: HashSet<u64> = HashSet::new();
932 for &dst_slot in csr_neighbors_vec.iter().chain(delta_neighbors.iter()) {
933 if !seen.insert(dst_slot) {
934 continue;
935 }
936 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
937
938 if self.is_node_tombstoned(dst_node) {
939 continue;
940 }
941 if !self.node_matches_prop_filter(
942 dst_node,
943 &dst_filter_cols,
944 &dst_node_pat.props,
945 ) {
946 continue;
947 }
948
949 let mut row: HashMap<String, NodeId> = HashMap::new();
950
951 if !src_node_pat.var.is_empty()
954 && !dst_node_pat.var.is_empty()
955 && src_node_pat.var == dst_node_pat.var
956 {
957 if src_node != dst_node {
958 continue;
959 }
960 row.insert(src_node_pat.var.clone(), src_node);
961 } else {
962 if !src_node_pat.var.is_empty() {
963 row.insert(src_node_pat.var.clone(), src_node);
964 }
965 if !dst_node_pat.var.is_empty() {
966 row.insert(dst_node_pat.var.clone(), dst_node);
967 }
968 }
969 pattern_rows.push(row);
970 }
971 }
972
973 if pattern_rows.is_empty() {
974 return Ok(vec![]);
975 }
976
977 let mut next: Vec<HashMap<String, NodeId>> = Vec::new();
981 for acc_row in &accumulated {
982 'outer: for pat_row in &pattern_rows {
983 for (k, v) in pat_row {
985 if let Some(existing) = acc_row.get(k) {
986 if existing != v {
987 continue 'outer;
988 }
989 }
990 }
991 let mut new_row = acc_row.clone();
992 new_row.extend(pat_row.iter().map(|(k, v)| (k.clone(), *v)));
993 next.push(new_row);
994 }
995 }
996 accumulated = next;
997 } else {
998 return Err(sparrowdb_common::Error::Unimplemented);
1000 }
1001 }
1002
1003 Ok(accumulated)
1004 }
1005
1006 pub fn scan_match_merge_rel_rows(
1010 &self,
1011 mm: &MatchMergeRelStatement,
1012 ) -> Result<Vec<HashMap<String, NodeId>>> {
1013 let proxy = MatchCreateStatement {
1016 match_patterns: mm.match_patterns.clone(),
1017 match_props: vec![],
1018 create: CreateStatement {
1019 nodes: vec![],
1020 edges: vec![],
1021 },
1022 };
1023 self.scan_match_create_rows(&proxy)
1024 }
1025
1026 fn execute_unwind(&self, u: &UnwindStatement) -> Result<QueryResult> {
1029 use crate::operators::{Operator, UnwindOperator};
1030
1031 let values = eval_list_expr(&u.expr, &self.params)?;
1033
1034 let column_names = extract_return_column_names(&u.return_clause.items);
1036
1037 if values.is_empty() {
1038 return Ok(QueryResult::empty(column_names));
1039 }
1040
1041 let mut op = UnwindOperator::new(u.alias.clone(), values);
1042 let chunks = op.collect_all()?;
1043
1044 let mut rows: Vec<Vec<Value>> = Vec::new();
1051 for chunk in &chunks {
1052 for group in &chunk.groups {
1053 let n = group.len();
1054 for row_idx in 0..n {
1055 let row = u
1056 .return_clause
1057 .items
1058 .iter()
1059 .map(|item| {
1060 let is_alias = match &item.expr {
1063 Expr::Var(name) => name == &u.alias,
1064 _ => false,
1065 };
1066 if is_alias {
1067 group.get_value(&u.alias, row_idx).unwrap_or(Value::Null)
1068 } else {
1069 Value::Null
1072 }
1073 })
1074 .collect();
1075 rows.push(row);
1076 }
1077 }
1078 }
1079
1080 Ok(QueryResult {
1081 columns: column_names,
1082 rows,
1083 })
1084 }
1085
1086 fn execute_create(&mut self, create: &CreateStatement) -> Result<QueryResult> {
1096 for node in &create.nodes {
1097 let label = node.labels.first().cloned().unwrap_or_default();
1099
1100 if is_reserved_label(&label) {
1102 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1103 "invalid argument: label \"{label}\" is reserved — the __SO_ prefix is for internal use only"
1104 )));
1105 }
1106
1107 let label_id: u32 = match self.catalog.get_label(&label)? {
1108 Some(id) => id as u32,
1109 None => self.catalog.create_label(&label)? as u32,
1110 };
1111
1112 let empty_bindings: HashMap<String, Value> = HashMap::new();
1116 let props: Vec<(u32, StoreValue)> = node
1117 .props
1118 .iter()
1119 .map(|entry| {
1120 let col_id = prop_name_to_col_id(&entry.key);
1121 let val = eval_expr(&entry.value, &empty_bindings);
1122 let store_val = value_to_store_value(val);
1123 (col_id, store_val)
1124 })
1125 .collect();
1126
1127 self.store.create_node(label_id, &props)?;
1128 }
1129 Ok(QueryResult::empty(vec![]))
1130 }
1131
1132 fn execute_union(&mut self, u: UnionStatement) -> Result<QueryResult> {
1141 let left_result = self.execute_bound(*u.left)?;
1142 let right_result = self.execute_bound(*u.right)?;
1143
1144 if !left_result.columns.is_empty()
1146 && !right_result.columns.is_empty()
1147 && left_result.columns.len() != right_result.columns.len()
1148 {
1149 return Err(sparrowdb_common::Error::InvalidArgument(format!(
1150 "UNION: left side has {} columns, right side has {}",
1151 left_result.columns.len(),
1152 right_result.columns.len()
1153 )));
1154 }
1155
1156 let columns = if !left_result.columns.is_empty() {
1157 left_result.columns.clone()
1158 } else {
1159 right_result.columns.clone()
1160 };
1161
1162 let mut rows = left_result.rows;
1163 rows.extend(right_result.rows);
1164
1165 if !u.all {
1166 deduplicate_rows(&mut rows);
1167 }
1168
1169 Ok(QueryResult { columns, rows })
1170 }
1171
1172 fn execute_match_with(&self, m: &MatchWithStatement) -> Result<QueryResult> {
1181 let intermediate = self.collect_match_rows_for_with(
1183 &m.match_patterns,
1184 m.match_where.as_ref(),
1185 &m.with_clause,
1186 )?;
1187
1188 let has_agg = m
1192 .with_clause
1193 .items
1194 .iter()
1195 .any(|item| is_aggregate_expr(&item.expr));
1196
1197 let projected: Vec<HashMap<String, Value>> = if has_agg {
1198 let agg_rows = self.aggregate_with_items(&intermediate, &m.with_clause.items);
1200 agg_rows
1202 .into_iter()
1203 .filter(|with_vals| {
1204 if let Some(ref where_expr) = m.with_clause.where_clause {
1205 let mut with_vals_p = with_vals.clone();
1206 with_vals_p.extend(self.dollar_params());
1207 self.eval_where_graph(where_expr, &with_vals_p)
1208 } else {
1209 true
1210 }
1211 })
1212 .map(|mut with_vals| {
1213 with_vals.extend(self.dollar_params());
1214 with_vals
1215 })
1216 .collect()
1217 } else {
1218 let mut projected: Vec<HashMap<String, Value>> = Vec::new();
1220 for row_vals in &intermediate {
1221 let mut with_vals: HashMap<String, Value> = HashMap::new();
1222 for item in &m.with_clause.items {
1223 let val = self.eval_expr_graph(&item.expr, row_vals);
1224 with_vals.insert(item.alias.clone(), val);
1225 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1229 if let Some(node_ref) = row_vals.get(src_var) {
1230 if matches!(node_ref, Value::NodeRef(_)) {
1231 with_vals.insert(item.alias.clone(), node_ref.clone());
1232 with_vals.insert(
1233 format!("{}.__node_id__", item.alias),
1234 node_ref.clone(),
1235 );
1236 }
1237 }
1238 let nid_key = format!("{src_var}.__node_id__");
1240 if let Some(node_ref) = row_vals.get(&nid_key) {
1241 with_vals
1242 .insert(format!("{}.__node_id__", item.alias), node_ref.clone());
1243 }
1244 }
1245 }
1246 if let Some(ref where_expr) = m.with_clause.where_clause {
1247 let mut with_vals_p = with_vals.clone();
1248 with_vals_p.extend(self.dollar_params());
1249 if !self.eval_where_graph(where_expr, &with_vals_p) {
1250 continue;
1251 }
1252 }
1253 with_vals.extend(self.dollar_params());
1256 projected.push(with_vals);
1257 }
1258 projected
1259 };
1260
1261 let column_names = extract_return_column_names(&m.return_clause.items);
1263
1264 let mut ordered_projected = projected;
1268 if !m.order_by.is_empty() {
1269 ordered_projected.sort_by(|a, b| {
1270 for (expr, dir) in &m.order_by {
1271 let val_a = eval_expr(expr, a);
1272 let val_b = eval_expr(expr, b);
1273 let cmp = compare_values(&val_a, &val_b);
1274 let cmp = if *dir == SortDir::Desc {
1275 cmp.reverse()
1276 } else {
1277 cmp
1278 };
1279 if cmp != std::cmp::Ordering::Equal {
1280 return cmp;
1281 }
1282 }
1283 std::cmp::Ordering::Equal
1284 });
1285 }
1286
1287 if let Some(skip) = m.skip {
1289 let skip = (skip as usize).min(ordered_projected.len());
1290 ordered_projected.drain(0..skip);
1291 }
1292 if let Some(lim) = m.limit {
1293 ordered_projected.truncate(lim as usize);
1294 }
1295
1296 let mut rows: Vec<Vec<Value>> = ordered_projected
1297 .iter()
1298 .map(|with_vals| {
1299 m.return_clause
1300 .items
1301 .iter()
1302 .map(|item| self.eval_expr_graph(&item.expr, with_vals))
1303 .collect()
1304 })
1305 .collect();
1306
1307 if m.distinct {
1308 deduplicate_rows(&mut rows);
1309 }
1310
1311 Ok(QueryResult {
1312 columns: column_names,
1313 rows,
1314 })
1315 }
1316
1317 fn aggregate_with_items(
1322 &self,
1323 rows: &[HashMap<String, Value>],
1324 items: &[sparrowdb_cypher::ast::WithItem],
1325 ) -> Vec<HashMap<String, Value>> {
1326 let key_indices: Vec<usize> = items
1328 .iter()
1329 .enumerate()
1330 .filter(|(_, item)| !is_aggregate_expr(&item.expr))
1331 .map(|(i, _)| i)
1332 .collect();
1333 let agg_indices: Vec<usize> = items
1334 .iter()
1335 .enumerate()
1336 .filter(|(_, item)| is_aggregate_expr(&item.expr))
1337 .map(|(i, _)| i)
1338 .collect();
1339
1340 let mut group_keys: Vec<Vec<Value>> = Vec::new();
1342 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new(); for row_vals in rows {
1345 let key: Vec<Value> = key_indices
1346 .iter()
1347 .map(|&i| eval_expr(&items[i].expr, row_vals))
1348 .collect();
1349 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
1350 pos
1351 } else {
1352 group_keys.push(key);
1353 group_accum.push(vec![vec![]; agg_indices.len()]);
1354 group_keys.len() - 1
1355 };
1356 for (ai, &ri) in agg_indices.iter().enumerate() {
1357 match &items[ri].expr {
1358 sparrowdb_cypher::ast::Expr::CountStar => {
1359 group_accum[group_idx][ai].push(Value::Int64(1));
1360 }
1361 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1362 if name.to_lowercase() == "collect" =>
1363 {
1364 let val = if !args.is_empty() {
1365 eval_expr(&args[0], row_vals)
1366 } else {
1367 Value::Null
1368 };
1369 if !matches!(val, Value::Null) {
1370 group_accum[group_idx][ai].push(val);
1371 }
1372 }
1373 sparrowdb_cypher::ast::Expr::FnCall { name, args }
1374 if matches!(
1375 name.to_lowercase().as_str(),
1376 "count" | "sum" | "avg" | "min" | "max"
1377 ) =>
1378 {
1379 let val = if !args.is_empty() {
1380 eval_expr(&args[0], row_vals)
1381 } else {
1382 Value::Null
1383 };
1384 if !matches!(val, Value::Null) {
1385 group_accum[group_idx][ai].push(val);
1386 }
1387 }
1388 _ => {}
1389 }
1390 }
1391 }
1392
1393 if rows.is_empty() && key_indices.is_empty() {
1396 let mut out_row: HashMap<String, Value> = HashMap::new();
1397 for &ri in &agg_indices {
1398 let val = match &items[ri].expr {
1399 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(0),
1400 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1401 if name.to_lowercase() == "collect" =>
1402 {
1403 Value::List(vec![])
1404 }
1405 _ => Value::Int64(0),
1406 };
1407 out_row.insert(items[ri].alias.clone(), val);
1408 }
1409 return vec![out_row];
1410 }
1411
1412 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1414 for (gi, key_vals) in group_keys.iter().enumerate() {
1415 let mut out_row: HashMap<String, Value> = HashMap::new();
1416 for (ki, &ri) in key_indices.iter().enumerate() {
1418 out_row.insert(items[ri].alias.clone(), key_vals[ki].clone());
1419 }
1420 for (ai, &ri) in agg_indices.iter().enumerate() {
1422 let accum = &group_accum[gi][ai];
1423 let val = match &items[ri].expr {
1424 sparrowdb_cypher::ast::Expr::CountStar => Value::Int64(accum.len() as i64),
1425 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1426 if name.to_lowercase() == "collect" =>
1427 {
1428 Value::List(accum.clone())
1429 }
1430 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1431 if name.to_lowercase() == "count" =>
1432 {
1433 Value::Int64(accum.len() as i64)
1434 }
1435 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1436 if name.to_lowercase() == "sum" =>
1437 {
1438 let sum: i64 = accum
1439 .iter()
1440 .filter_map(|v| {
1441 if let Value::Int64(n) = v {
1442 Some(*n)
1443 } else {
1444 None
1445 }
1446 })
1447 .sum();
1448 Value::Int64(sum)
1449 }
1450 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1451 if name.to_lowercase() == "min" =>
1452 {
1453 accum
1454 .iter()
1455 .min_by(|a, b| compare_values(a, b))
1456 .cloned()
1457 .unwrap_or(Value::Null)
1458 }
1459 sparrowdb_cypher::ast::Expr::FnCall { name, .. }
1460 if name.to_lowercase() == "max" =>
1461 {
1462 accum
1463 .iter()
1464 .max_by(|a, b| compare_values(a, b))
1465 .cloned()
1466 .unwrap_or(Value::Null)
1467 }
1468 _ => Value::Null,
1469 };
1470 out_row.insert(items[ri].alias.clone(), val);
1471 }
1472 result.push(out_row);
1473 }
1474 result
1475 }
1476
1477 fn execute_pipeline(&self, p: &PipelineStatement) -> Result<QueryResult> {
1482 let mut current_rows: Vec<HashMap<String, Value>> =
1484 if let Some((expr, alias)) = &p.leading_unwind {
1485 let values = eval_list_expr(expr, &self.params)?;
1487 values
1488 .into_iter()
1489 .map(|v| {
1490 let mut m = HashMap::new();
1491 m.insert(alias.clone(), v);
1492 m
1493 })
1494 .collect()
1495 } else if let Some(ref patterns) = p.leading_match {
1496 self.collect_pipeline_match_rows(patterns, p.leading_where.as_ref())?
1501 } else {
1502 vec![HashMap::new()]
1503 };
1504
1505 for stage in &p.stages {
1507 match stage {
1508 PipelineStage::With {
1509 clause,
1510 order_by,
1511 skip,
1512 limit,
1513 } => {
1514 if !order_by.is_empty() {
1518 current_rows.sort_by(|a, b| {
1519 for (expr, dir) in order_by {
1520 let va = eval_expr(expr, a);
1521 let vb = eval_expr(expr, b);
1522 let cmp = compare_values(&va, &vb);
1523 let cmp = if *dir == SortDir::Desc {
1524 cmp.reverse()
1525 } else {
1526 cmp
1527 };
1528 if cmp != std::cmp::Ordering::Equal {
1529 return cmp;
1530 }
1531 }
1532 std::cmp::Ordering::Equal
1533 });
1534 }
1535 if let Some(s) = skip {
1536 let s = (*s as usize).min(current_rows.len());
1537 current_rows.drain(0..s);
1538 }
1539 if let Some(l) = limit {
1540 current_rows.truncate(*l as usize);
1541 }
1542
1543 let has_agg = clause
1545 .items
1546 .iter()
1547 .any(|item| is_aggregate_expr(&item.expr));
1548 let next_rows: Vec<HashMap<String, Value>> = if has_agg {
1549 let agg_rows = self.aggregate_with_items(¤t_rows, &clause.items);
1550 agg_rows
1551 .into_iter()
1552 .filter(|with_vals| {
1553 if let Some(ref where_expr) = clause.where_clause {
1554 let mut wv = with_vals.clone();
1555 wv.extend(self.dollar_params());
1556 self.eval_where_graph(where_expr, &wv)
1557 } else {
1558 true
1559 }
1560 })
1561 .map(|mut with_vals| {
1562 with_vals.extend(self.dollar_params());
1563 with_vals
1564 })
1565 .collect()
1566 } else {
1567 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1568 for row_vals in ¤t_rows {
1569 let mut with_vals: HashMap<String, Value> = HashMap::new();
1570 for item in &clause.items {
1571 let val = self.eval_expr_graph(&item.expr, row_vals);
1572 with_vals.insert(item.alias.clone(), val);
1573 if let sparrowdb_cypher::ast::Expr::Var(ref src_var) = item.expr {
1575 if let Some(nr @ Value::NodeRef(_)) = row_vals.get(src_var) {
1576 with_vals.insert(item.alias.clone(), nr.clone());
1577 with_vals.insert(
1578 format!("{}.__node_id__", item.alias),
1579 nr.clone(),
1580 );
1581 }
1582 let nid_key = format!("{src_var}.__node_id__");
1583 if let Some(nr) = row_vals.get(&nid_key) {
1584 with_vals.insert(
1585 format!("{}.__node_id__", item.alias),
1586 nr.clone(),
1587 );
1588 }
1589 }
1590 }
1591 if let Some(ref where_expr) = clause.where_clause {
1592 let mut wv = with_vals.clone();
1593 wv.extend(self.dollar_params());
1594 if !self.eval_where_graph(where_expr, &wv) {
1595 continue;
1596 }
1597 }
1598 with_vals.extend(self.dollar_params());
1599 next_rows.push(with_vals);
1600 }
1601 next_rows
1602 };
1603 current_rows = next_rows;
1604 }
1605 PipelineStage::Match {
1606 patterns,
1607 where_clause,
1608 } => {
1609 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1612 for binding in ¤t_rows {
1613 let new_rows = self.execute_pipeline_match_stage(
1614 patterns,
1615 where_clause.as_ref(),
1616 binding,
1617 )?;
1618 next_rows.extend(new_rows);
1619 }
1620 current_rows = next_rows;
1621 }
1622 PipelineStage::Unwind { alias, new_alias } => {
1623 let mut next_rows: Vec<HashMap<String, Value>> = Vec::new();
1625 for row_vals in ¤t_rows {
1626 let list_val = row_vals.get(alias.as_str()).cloned().unwrap_or(Value::Null);
1627 let items = match list_val {
1628 Value::List(v) => v,
1629 other => vec![other],
1630 };
1631 for item in items {
1632 let mut new_row = row_vals.clone();
1633 new_row.insert(new_alias.clone(), item);
1634 next_rows.push(new_row);
1635 }
1636 }
1637 current_rows = next_rows;
1638 }
1639 }
1640 }
1641
1642 let column_names = extract_return_column_names(&p.return_clause.items);
1644
1645 if !p.return_order_by.is_empty() {
1647 current_rows.sort_by(|a, b| {
1648 for (expr, dir) in &p.return_order_by {
1649 let va = eval_expr(expr, a);
1650 let vb = eval_expr(expr, b);
1651 let cmp = compare_values(&va, &vb);
1652 let cmp = if *dir == SortDir::Desc {
1653 cmp.reverse()
1654 } else {
1655 cmp
1656 };
1657 if cmp != std::cmp::Ordering::Equal {
1658 return cmp;
1659 }
1660 }
1661 std::cmp::Ordering::Equal
1662 });
1663 }
1664
1665 if let Some(skip) = p.return_skip {
1666 let skip = (skip as usize).min(current_rows.len());
1667 current_rows.drain(0..skip);
1668 }
1669 if let Some(lim) = p.return_limit {
1670 current_rows.truncate(lim as usize);
1671 }
1672
1673 let mut rows: Vec<Vec<Value>> = current_rows
1674 .iter()
1675 .map(|row_vals| {
1676 p.return_clause
1677 .items
1678 .iter()
1679 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
1680 .collect()
1681 })
1682 .collect();
1683
1684 if p.distinct {
1685 deduplicate_rows(&mut rows);
1686 }
1687
1688 Ok(QueryResult {
1689 columns: column_names,
1690 rows,
1691 })
1692 }
1693
1694 fn collect_pipeline_match_rows(
1700 &self,
1701 patterns: &[PathPattern],
1702 where_clause: Option<&Expr>,
1703 ) -> Result<Vec<HashMap<String, Value>>> {
1704 if patterns.is_empty() {
1705 return Ok(vec![HashMap::new()]);
1706 }
1707
1708 let pat = &patterns[0];
1710 let node = &pat.nodes[0];
1711 let var_name = node.var.as_str();
1712 let label = node.labels.first().cloned().unwrap_or_default();
1713
1714 let label_id = match self.catalog.get_label(&label)? {
1715 Some(id) => id as u32,
1716 None => return Ok(vec![]),
1717 };
1718 let hwm = self.store.hwm_for_label(label_id)?;
1719 let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1720
1721 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1722 for slot in 0..hwm {
1723 let node_id = NodeId(((label_id as u64) << 32) | slot);
1724 if self.is_node_tombstoned(node_id) {
1725 continue;
1726 }
1727 let props = match self.store.get_node_raw(node_id, &col_ids) {
1728 Ok(p) => p,
1729 Err(_) => continue,
1730 };
1731 if !self.matches_prop_filter(&props, &node.props) {
1732 continue;
1733 }
1734 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1735 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1737 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1738
1739 if let Some(wexpr) = where_clause {
1740 let mut row_vals_p = row_vals.clone();
1741 row_vals_p.extend(self.dollar_params());
1742 if !self.eval_where_graph(wexpr, &row_vals_p) {
1743 continue;
1744 }
1745 }
1746 result.push(row_vals);
1747 }
1748 Ok(result)
1749 }
1750
1751 fn execute_pipeline_match_stage(
1760 &self,
1761 patterns: &[PathPattern],
1762 where_clause: Option<&Expr>,
1763 binding: &HashMap<String, Value>,
1764 ) -> Result<Vec<HashMap<String, Value>>> {
1765 if patterns.is_empty() {
1766 return Ok(vec![binding.clone()]);
1767 }
1768
1769 let pat = &patterns[0];
1770
1771 if !pat.rels.is_empty() {
1773 return self.execute_pipeline_match_hop(pat, where_clause, binding);
1776 }
1777
1778 let node = &pat.nodes[0];
1779 let var_name = node.var.as_str();
1780 let label = node.labels.first().cloned().unwrap_or_default();
1781
1782 let label_id = match self.catalog.get_label(&label)? {
1783 Some(id) => id as u32,
1784 None => return Ok(vec![]),
1785 };
1786 let hwm = self.store.hwm_for_label(label_id)?;
1787 let col_ids: Vec<u32> = self.store.col_ids_for_label(label_id).unwrap_or_default();
1788
1789 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1790 let params = self.dollar_params();
1791 for slot in 0..hwm {
1792 let node_id = NodeId(((label_id as u64) << 32) | slot);
1793 if self.is_node_tombstoned(node_id) {
1794 continue;
1795 }
1796 let props = match self.store.get_node_raw(node_id, &col_ids) {
1797 Ok(p) => p,
1798 Err(_) => continue,
1799 };
1800
1801 if !self.matches_prop_filter_with_binding(&props, &node.props, binding, ¶ms) {
1803 continue;
1804 }
1805
1806 let mut row_vals = build_row_vals(&props, var_name, &col_ids, &self.store);
1807 row_vals.extend(binding.clone());
1809 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
1810 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
1811
1812 if let Some(wexpr) = where_clause {
1813 let mut row_vals_p = row_vals.clone();
1814 row_vals_p.extend(params.clone());
1815 if !self.eval_where_graph(wexpr, &row_vals_p) {
1816 continue;
1817 }
1818 }
1819 result.push(row_vals);
1820 }
1821 Ok(result)
1822 }
1823
1824 fn execute_pipeline_match_hop(
1829 &self,
1830 pat: &sparrowdb_cypher::ast::PathPattern,
1831 where_clause: Option<&Expr>,
1832 binding: &HashMap<String, Value>,
1833 ) -> Result<Vec<HashMap<String, Value>>> {
1834 if pat.nodes.len() < 2 || pat.rels.is_empty() {
1835 return Ok(vec![]);
1836 }
1837 let src_pat = &pat.nodes[0];
1838 let dst_pat = &pat.nodes[1];
1839 let rel_pat = &pat.rels[0];
1840
1841 let src_label = src_pat.labels.first().cloned().unwrap_or_default();
1842 let dst_label = dst_pat.labels.first().cloned().unwrap_or_default();
1843
1844 let src_label_id = match self.catalog.get_label(&src_label)? {
1845 Some(id) => id as u32,
1846 None => return Ok(vec![]),
1847 };
1848 let dst_label_id = match self.catalog.get_label(&dst_label)? {
1849 Some(id) => id as u32,
1850 None => return Ok(vec![]),
1851 };
1852
1853 let src_col_ids: Vec<u32> = self
1854 .store
1855 .col_ids_for_label(src_label_id)
1856 .unwrap_or_default();
1857 let dst_col_ids: Vec<u32> = self
1858 .store
1859 .col_ids_for_label(dst_label_id)
1860 .unwrap_or_default();
1861 let params = self.dollar_params();
1862
1863 let src_candidates: Vec<NodeId> = {
1865 let bound_src = binding
1867 .get(&src_pat.var)
1868 .or_else(|| binding.get(&format!("{}.__node_id__", src_pat.var)));
1869 if let Some(Value::NodeRef(nid)) = bound_src {
1870 vec![*nid]
1871 } else {
1872 let hwm = self.store.hwm_for_label(src_label_id)?;
1873 let mut cands = Vec::new();
1874 for slot in 0..hwm {
1875 let node_id = NodeId(((src_label_id as u64) << 32) | slot);
1876 if self.is_node_tombstoned(node_id) {
1877 continue;
1878 }
1879 if let Ok(props) = self.store.get_node_raw(node_id, &src_col_ids) {
1880 if self.matches_prop_filter_with_binding(
1881 &props,
1882 &src_pat.props,
1883 binding,
1884 ¶ms,
1885 ) {
1886 cands.push(node_id);
1887 }
1888 }
1889 }
1890 cands
1891 }
1892 };
1893
1894 let rel_table_id = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
1895
1896 let mut result: Vec<HashMap<String, Value>> = Vec::new();
1897 for src_id in src_candidates {
1898 let src_slot = src_id.0 & 0xFFFF_FFFF;
1899 let dst_slots: Vec<u64> = match &rel_table_id {
1900 RelTableLookup::Found(rtid) => self.csr_neighbors(*rtid, src_slot),
1901 RelTableLookup::NotFound => continue,
1902 RelTableLookup::All => self.csr_neighbors_all(src_slot),
1903 };
1904 let delta_slots: Vec<u64> = self
1906 .read_delta_all()
1907 .into_iter()
1908 .filter(|r| {
1909 let r_src_label = (r.src.0 >> 32) as u32;
1910 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
1911 r_src_label == src_label_id && r_src_slot == src_slot
1912 })
1913 .map(|r| r.dst.0 & 0xFFFF_FFFF)
1914 .collect();
1915 let all_slots: std::collections::HashSet<u64> =
1916 dst_slots.into_iter().chain(delta_slots).collect();
1917
1918 for dst_slot in all_slots {
1919 let dst_id = NodeId(((dst_label_id as u64) << 32) | dst_slot);
1920 if self.is_node_tombstoned(dst_id) {
1921 continue;
1922 }
1923 if let Ok(dst_props) = self.store.get_node_raw(dst_id, &dst_col_ids) {
1924 if !self.matches_prop_filter_with_binding(
1925 &dst_props,
1926 &dst_pat.props,
1927 binding,
1928 ¶ms,
1929 ) {
1930 continue;
1931 }
1932 let src_props = self
1933 .store
1934 .get_node_raw(src_id, &src_col_ids)
1935 .unwrap_or_default();
1936 let mut row_vals =
1937 build_row_vals(&src_props, &src_pat.var, &src_col_ids, &self.store);
1938 row_vals.extend(build_row_vals(
1939 &dst_props,
1940 &dst_pat.var,
1941 &dst_col_ids,
1942 &self.store,
1943 ));
1944 row_vals.extend(binding.clone());
1946 row_vals.insert(src_pat.var.clone(), Value::NodeRef(src_id));
1947 row_vals.insert(
1948 format!("{}.__node_id__", src_pat.var),
1949 Value::NodeRef(src_id),
1950 );
1951 row_vals.insert(dst_pat.var.clone(), Value::NodeRef(dst_id));
1952 row_vals.insert(
1953 format!("{}.__node_id__", dst_pat.var),
1954 Value::NodeRef(dst_id),
1955 );
1956
1957 if let Some(wexpr) = where_clause {
1958 let mut row_vals_p = row_vals.clone();
1959 row_vals_p.extend(params.clone());
1960 if !self.eval_where_graph(wexpr, &row_vals_p) {
1961 continue;
1962 }
1963 }
1964 result.push(row_vals);
1965 }
1966 }
1967 }
1968 Ok(result)
1969 }
1970
1971 fn matches_prop_filter_with_binding(
1977 &self,
1978 props: &[(u32, u64)],
1979 filters: &[sparrowdb_cypher::ast::PropEntry],
1980 binding: &HashMap<String, Value>,
1981 params: &HashMap<String, Value>,
1982 ) -> bool {
1983 for f in filters {
1984 let col_id = prop_name_to_col_id(&f.key);
1985 let stored_raw = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
1986
1987 let filter_val = match &f.value {
1989 sparrowdb_cypher::ast::Expr::Var(v) => {
1990 binding.get(v).cloned().unwrap_or(Value::Null)
1992 }
1993 other => eval_expr(other, params),
1994 };
1995
1996 let stored_val = stored_raw.map(|raw| decode_raw_val(raw, &self.store));
1997 let matches = match (stored_val, &filter_val) {
1998 (Some(Value::String(a)), Value::String(b)) => &a == b,
1999 (Some(Value::Int64(a)), Value::Int64(b)) => a == *b,
2000 (Some(Value::Bool(a)), Value::Bool(b)) => a == *b,
2001 (Some(Value::Float64(a)), Value::Float64(b)) => a == *b,
2002 (None, Value::Null) => true,
2003 _ => false,
2004 };
2005 if !matches {
2006 return false;
2007 }
2008 }
2009 true
2010 }
2011
2012 fn collect_match_rows_for_with(
2021 &self,
2022 patterns: &[PathPattern],
2023 where_clause: Option<&Expr>,
2024 with_clause: &WithClause,
2025 ) -> Result<Vec<HashMap<String, Value>>> {
2026 if patterns.is_empty() || patterns[0].rels.is_empty() {
2027 let pat = &patterns[0];
2028 let node = &pat.nodes[0];
2029 let var_name = node.var.as_str();
2030 let label = node.labels.first().cloned().unwrap_or_default();
2031 let label_id = self
2032 .catalog
2033 .get_label(&label)?
2034 .ok_or(sparrowdb_common::Error::NotFound)?;
2035 let label_id_u32 = label_id as u32;
2036 let hwm = self.store.hwm_for_label(label_id_u32)?;
2037
2038 let mut all_col_ids: Vec<u32> = Vec::new();
2040 if let Some(wexpr) = &where_clause {
2041 collect_col_ids_from_expr(wexpr, &mut all_col_ids);
2042 }
2043 for item in &with_clause.items {
2044 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2045 }
2046 for p in &node.props {
2047 let col_id = prop_name_to_col_id(&p.key);
2048 if !all_col_ids.contains(&col_id) {
2049 all_col_ids.push(col_id);
2050 }
2051 }
2052
2053 let mut result: Vec<HashMap<String, Value>> = Vec::new();
2054 for slot in 0..hwm {
2055 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2056 if self.is_node_tombstoned(node_id) {
2059 continue;
2060 }
2061 let props = read_node_props(&self.store, node_id, &all_col_ids)?;
2062 if !self.matches_prop_filter(&props, &node.props) {
2063 continue;
2064 }
2065 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2066 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2069 row_vals.insert(format!("{var_name}.__node_id__"), Value::NodeRef(node_id));
2070 if let Some(wexpr) = &where_clause {
2071 let mut row_vals_p = row_vals.clone();
2072 row_vals_p.extend(self.dollar_params());
2073 if !self.eval_where_graph(wexpr, &row_vals_p) {
2074 continue;
2075 }
2076 }
2077 result.push(row_vals);
2078 }
2079 Ok(result)
2080 } else {
2081 Err(sparrowdb_common::Error::Unimplemented)
2082 }
2083 }
2084
2085 fn execute_match(&self, m: &MatchStatement) -> Result<QueryResult> {
2086 if m.pattern.is_empty() {
2087 let column_names = extract_return_column_names(&m.return_clause.items);
2089 let empty_vals: HashMap<String, Value> = HashMap::new();
2090 let row: Vec<Value> = m
2091 .return_clause
2092 .items
2093 .iter()
2094 .map(|item| eval_expr(&item.expr, &empty_vals))
2095 .collect();
2096 return Ok(QueryResult {
2097 columns: column_names,
2098 rows: vec![row],
2099 });
2100 }
2101
2102 let is_two_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 2;
2104 let is_one_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() == 1;
2105 let is_n_hop = m.pattern.len() == 1 && m.pattern[0].rels.len() >= 3;
2107 let is_var_len = m.pattern.len() == 1
2109 && m.pattern[0].rels.len() == 1
2110 && m.pattern[0].rels[0].min_hops.is_some();
2111
2112 let column_names = extract_return_column_names(&m.return_clause.items);
2113
2114 let is_multi_pattern = m.pattern.len() > 1 && m.pattern.iter().all(|p| p.rels.is_empty());
2117
2118 if is_var_len {
2119 self.execute_variable_length(m, &column_names)
2120 } else if is_two_hop {
2121 self.execute_two_hop(m, &column_names)
2122 } else if is_one_hop {
2123 self.execute_one_hop(m, &column_names)
2124 } else if is_n_hop {
2125 self.execute_n_hop(m, &column_names)
2126 } else if is_multi_pattern {
2127 self.execute_multi_pattern_scan(m, &column_names)
2128 } else if m.pattern[0].rels.is_empty() {
2129 self.execute_scan(m, &column_names)
2130 } else {
2131 self.execute_scan(m, &column_names)
2133 }
2134 }
2135
2136 fn execute_optional_match(&self, om: &OptionalMatchStatement) -> Result<QueryResult> {
2143 use sparrowdb_common::Error;
2144
2145 let match_stmt = MatchStatement {
2147 pattern: om.pattern.clone(),
2148 where_clause: om.where_clause.clone(),
2149 return_clause: om.return_clause.clone(),
2150 order_by: om.order_by.clone(),
2151 skip: om.skip,
2152 limit: om.limit,
2153 distinct: om.distinct,
2154 };
2155
2156 let column_names = extract_return_column_names(&om.return_clause.items);
2157
2158 let result = self.execute_match(&match_stmt);
2159
2160 match result {
2161 Ok(qr) if !qr.rows.is_empty() => Ok(qr),
2162 Ok(_) | Err(Error::NotFound) | Err(Error::InvalidArgument(_)) => {
2164 let null_row = vec![Value::Null; column_names.len()];
2165 Ok(QueryResult {
2166 columns: column_names,
2167 rows: vec![null_row],
2168 })
2169 }
2170 Err(e) => Err(e),
2171 }
2172 }
2173
2174 fn execute_match_optional_match(
2182 &self,
2183 mom: &MatchOptionalMatchStatement,
2184 ) -> Result<QueryResult> {
2185 let column_names = extract_return_column_names(&mom.return_clause.items);
2186
2187 let lead_return_items: Vec<ReturnItem> = mom
2190 .return_clause
2191 .items
2192 .iter()
2193 .filter(|item| {
2194 let lead_vars: Vec<&str> = mom
2196 .match_patterns
2197 .iter()
2198 .flat_map(|p| p.nodes.iter().map(|n| n.var.as_str()))
2199 .collect();
2200 match &item.expr {
2201 Expr::PropAccess { var, .. } => lead_vars.contains(&var.as_str()),
2202 Expr::Var(v) => lead_vars.contains(&v.as_str()),
2203 _ => false,
2204 }
2205 })
2206 .cloned()
2207 .collect();
2208
2209 let lead_col_names = extract_return_column_names(&lead_return_items);
2212
2213 if mom.match_patterns.is_empty() || mom.match_patterns[0].nodes.is_empty() {
2215 let null_row = vec![Value::Null; column_names.len()];
2216 return Ok(QueryResult {
2217 columns: column_names,
2218 rows: vec![null_row],
2219 });
2220 }
2221 let lead_node_pat = &mom.match_patterns[0].nodes[0];
2222 let lead_label = lead_node_pat.labels.first().cloned().unwrap_or_default();
2223 let lead_label_id = match self.catalog.get_label(&lead_label)? {
2224 Some(id) => id as u32,
2225 None => {
2226 return Ok(QueryResult {
2228 columns: column_names,
2229 rows: vec![],
2230 });
2231 }
2232 };
2233
2234 let lead_all_col_ids: Vec<u32> = {
2236 let mut ids = collect_col_ids_from_columns(&lead_col_names);
2237 if let Some(ref wexpr) = mom.match_where {
2238 collect_col_ids_from_expr(wexpr, &mut ids);
2239 }
2240 for p in &lead_node_pat.props {
2241 let col_id = prop_name_to_col_id(&p.key);
2242 if !ids.contains(&col_id) {
2243 ids.push(col_id);
2244 }
2245 }
2246 ids
2247 };
2248
2249 let lead_hwm = self.store.hwm_for_label(lead_label_id)?;
2250 let lead_var = lead_node_pat.var.as_str();
2251
2252 let mut lead_rows: Vec<(u64, Vec<(u32, u64)>)> = Vec::new();
2254 for slot in 0..lead_hwm {
2255 let node_id = NodeId(((lead_label_id as u64) << 32) | slot);
2256 if self.is_node_tombstoned(node_id) {
2259 continue;
2260 }
2261 let props = read_node_props(&self.store, node_id, &lead_all_col_ids)?;
2262 if !self.matches_prop_filter(&props, &lead_node_pat.props) {
2263 continue;
2264 }
2265 if let Some(ref wexpr) = mom.match_where {
2266 let mut row_vals = build_row_vals(&props, lead_var, &lead_all_col_ids, &self.store);
2267 row_vals.extend(self.dollar_params());
2268 if !self.eval_where_graph(wexpr, &row_vals) {
2269 continue;
2270 }
2271 }
2272 lead_rows.push((slot, props));
2273 }
2274
2275 let opt_patterns = &mom.optional_patterns;
2279
2280 let opt_vars: Vec<String> = opt_patterns
2282 .iter()
2283 .flat_map(|p| p.nodes.iter().map(|n| n.var.clone()))
2284 .filter(|v| !v.is_empty())
2285 .collect();
2286
2287 let mut result_rows: Vec<Vec<Value>> = Vec::new();
2288
2289 for (lead_slot, lead_props) in &lead_rows {
2290 let lead_row_vals =
2291 build_row_vals(lead_props, lead_var, &lead_all_col_ids, &self.store);
2292
2293 let opt_sub_rows: Vec<HashMap<String, Value>> = if opt_patterns.len() == 1
2298 && opt_patterns[0].rels.len() == 1
2299 && opt_patterns[0].nodes.len() == 2
2300 {
2301 let opt_pat = &opt_patterns[0];
2302 let opt_src_pat = &opt_pat.nodes[0];
2303 let opt_dst_pat = &opt_pat.nodes[1];
2304 let opt_rel_pat = &opt_pat.rels[0];
2305
2306 let opt_dst_label = opt_dst_pat.labels.first().cloned().unwrap_or_default();
2308 let opt_dst_label_id: Option<u32> = match self.catalog.get_label(&opt_dst_label) {
2309 Ok(Some(id)) => Some(id as u32),
2310 _ => None,
2311 };
2312
2313 self.optional_one_hop_sub_rows(
2314 *lead_slot,
2315 lead_label_id,
2316 opt_dst_label_id,
2317 opt_src_pat,
2318 opt_dst_pat,
2319 opt_rel_pat,
2320 &opt_vars,
2321 &column_names,
2322 )
2323 .unwrap_or_default()
2324 } else {
2325 vec![]
2327 };
2328
2329 if opt_sub_rows.is_empty() {
2330 let row: Vec<Value> = mom
2332 .return_clause
2333 .items
2334 .iter()
2335 .map(|item| {
2336 let v = eval_expr(&item.expr, &lead_row_vals);
2337 if v == Value::Null {
2338 match &item.expr {
2341 Expr::PropAccess { var, .. } | Expr::Var(var) => {
2342 if opt_vars.contains(var) {
2343 Value::Null
2344 } else {
2345 eval_expr(&item.expr, &lead_row_vals)
2346 }
2347 }
2348 _ => eval_expr(&item.expr, &lead_row_vals),
2349 }
2350 } else {
2351 v
2352 }
2353 })
2354 .collect();
2355 result_rows.push(row);
2356 } else {
2357 for opt_row_vals in opt_sub_rows {
2359 let mut combined = lead_row_vals.clone();
2360 combined.extend(opt_row_vals);
2361 let row: Vec<Value> = mom
2362 .return_clause
2363 .items
2364 .iter()
2365 .map(|item| eval_expr(&item.expr, &combined))
2366 .collect();
2367 result_rows.push(row);
2368 }
2369 }
2370 }
2371
2372 if mom.distinct {
2373 deduplicate_rows(&mut result_rows);
2374 }
2375 if let Some(skip) = mom.skip {
2376 let skip = (skip as usize).min(result_rows.len());
2377 result_rows.drain(0..skip);
2378 }
2379 if let Some(lim) = mom.limit {
2380 result_rows.truncate(lim as usize);
2381 }
2382
2383 Ok(QueryResult {
2384 columns: column_names,
2385 rows: result_rows,
2386 })
2387 }
2388
2389 #[allow(clippy::too_many_arguments)]
2392 fn optional_one_hop_sub_rows(
2393 &self,
2394 src_slot: u64,
2395 src_label_id: u32,
2396 dst_label_id: Option<u32>,
2397 _src_pat: &sparrowdb_cypher::ast::NodePattern,
2398 dst_node_pat: &sparrowdb_cypher::ast::NodePattern,
2399 rel_pat: &sparrowdb_cypher::ast::RelPattern,
2400 opt_vars: &[String],
2401 column_names: &[String],
2402 ) -> Result<Vec<HashMap<String, Value>>> {
2403 let dst_label_id = match dst_label_id {
2404 Some(id) => id,
2405 None => return Ok(vec![]),
2406 };
2407
2408 let dst_var = dst_node_pat.var.as_str();
2409 let col_ids_dst = collect_col_ids_for_var(dst_var, column_names, dst_label_id);
2410 let _ = opt_vars;
2411
2412 let rel_lookup = self.resolve_rel_table_id(src_label_id, dst_label_id, &rel_pat.rel_type);
2414
2415 if matches!(rel_lookup, RelTableLookup::NotFound) {
2417 return Ok(vec![]);
2418 }
2419
2420 let delta_neighbors: Vec<u64> = {
2421 let records: Vec<DeltaRecord> = match rel_lookup {
2422 RelTableLookup::Found(rtid) => self.read_delta_for(rtid),
2423 _ => self.read_delta_all(),
2424 };
2425 records
2426 .into_iter()
2427 .filter(|r| {
2428 let r_src_label = (r.src.0 >> 32) as u32;
2429 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
2430 r_src_label == src_label_id && r_src_slot == src_slot
2431 })
2432 .map(|r| r.dst.0 & 0xFFFF_FFFF)
2433 .collect()
2434 };
2435
2436 let csr_neighbors = match rel_lookup {
2437 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
2438 _ => self.csr_neighbors_all(src_slot),
2439 };
2440 let all_neighbors: Vec<u64> = csr_neighbors.into_iter().chain(delta_neighbors).collect();
2441
2442 let mut seen: HashSet<u64> = HashSet::new();
2443 let mut sub_rows: Vec<HashMap<String, Value>> = Vec::new();
2444
2445 for dst_slot in all_neighbors {
2446 if !seen.insert(dst_slot) {
2447 continue;
2448 }
2449 let dst_node = NodeId(((dst_label_id as u64) << 32) | dst_slot);
2450 let dst_props = read_node_props(&self.store, dst_node, &col_ids_dst)?;
2451 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
2452 continue;
2453 }
2454 let row_vals = build_row_vals(&dst_props, dst_var, &col_ids_dst, &self.store);
2455 sub_rows.push(row_vals);
2456 }
2457
2458 Ok(sub_rows)
2459 }
2460
2461 fn execute_multi_pattern_scan(
2470 &self,
2471 m: &MatchStatement,
2472 column_names: &[String],
2473 ) -> Result<QueryResult> {
2474 let mut per_var: Vec<(String, u32, Vec<NodeId>)> = Vec::new(); for pat in &m.pattern {
2478 if pat.nodes.is_empty() {
2479 continue;
2480 }
2481 let node = &pat.nodes[0];
2482 if node.var.is_empty() {
2483 continue;
2484 }
2485 let label = node.labels.first().cloned().unwrap_or_default();
2486 let label_id = match self.catalog.get_label(&label)? {
2487 Some(id) => id as u32,
2488 None => return Ok(QueryResult::empty(column_names.to_vec())),
2489 };
2490 let filter_col_ids: Vec<u32> = node
2491 .props
2492 .iter()
2493 .map(|p| prop_name_to_col_id(&p.key))
2494 .collect();
2495 let params = self.dollar_params();
2496 let hwm = self.store.hwm_for_label(label_id)?;
2497 let mut candidates: Vec<NodeId> = Vec::new();
2498 for slot in 0..hwm {
2499 let node_id = NodeId(((label_id as u64) << 32) | slot);
2500 if self.is_node_tombstoned(node_id) {
2501 continue;
2502 }
2503 if filter_col_ids.is_empty() {
2504 candidates.push(node_id);
2505 } else if let Ok(raw_props) = self.store.get_node_raw(node_id, &filter_col_ids) {
2506 if matches_prop_filter_static(&raw_props, &node.props, ¶ms, &self.store) {
2507 candidates.push(node_id);
2508 }
2509 }
2510 }
2511 if candidates.is_empty() {
2512 return Ok(QueryResult::empty(column_names.to_vec()));
2513 }
2514 per_var.push((node.var.clone(), label_id, candidates));
2515 }
2516
2517 let mut accumulated: Vec<HashMap<String, Value>> = vec![HashMap::new()];
2519 for (var, _label_id, candidates) in &per_var {
2520 let mut next: Vec<HashMap<String, Value>> = Vec::new();
2521 for base_row in &accumulated {
2522 for &node_id in candidates {
2523 let mut row = base_row.clone();
2524 row.insert(var.clone(), Value::NodeRef(node_id));
2526 row.insert(format!("{var}.__node_id__"), Value::NodeRef(node_id));
2527 let label_id = (node_id.0 >> 32) as u32;
2529 let label_col_ids = self.store.col_ids_for_label(label_id).unwrap_or_default();
2530 let nullable = self
2531 .store
2532 .get_node_raw_nullable(node_id, &label_col_ids)
2533 .unwrap_or_default();
2534 for &(col_id, opt_raw) in &nullable {
2535 if let Some(raw) = opt_raw {
2536 row.insert(
2537 format!("{var}.col_{col_id}"),
2538 decode_raw_val(raw, &self.store),
2539 );
2540 }
2541 }
2542 next.push(row);
2543 }
2544 }
2545 accumulated = next;
2546 }
2547
2548 if let Some(ref where_expr) = m.where_clause {
2550 accumulated.retain(|row| self.eval_where_graph(where_expr, row));
2551 }
2552
2553 let dollar_params = self.dollar_params();
2555 if !dollar_params.is_empty() {
2556 for row in &mut accumulated {
2557 row.extend(dollar_params.clone());
2558 }
2559 }
2560
2561 let mut rows = self.aggregate_rows_graph(&accumulated, &m.return_clause.items);
2562
2563 apply_order_by(&mut rows, m, column_names);
2565 if let Some(skip) = m.skip {
2566 let skip = (skip as usize).min(rows.len());
2567 rows.drain(0..skip);
2568 }
2569 if let Some(limit) = m.limit {
2570 rows.truncate(limit as usize);
2571 }
2572
2573 Ok(QueryResult {
2574 columns: column_names.to_vec(),
2575 rows,
2576 })
2577 }
2578
2579 fn execute_scan(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
2580 let pat = &m.pattern[0];
2581 let node = &pat.nodes[0];
2582
2583 if node.labels.is_empty() {
2586 return self.execute_scan_all_labels(m, column_names);
2587 }
2588
2589 let label = node.labels.first().cloned().unwrap_or_default();
2590 let label_id = match self.catalog.get_label(&label)? {
2592 Some(id) => id as u32,
2593 None => {
2594 return Ok(QueryResult {
2595 columns: column_names.to_vec(),
2596 rows: vec![],
2597 })
2598 }
2599 };
2600 let label_id_u32 = label_id;
2601
2602 let hwm = self.store.hwm_for_label(label_id_u32)?;
2603 tracing::debug!(label = %label, hwm = hwm, "node scan start");
2604
2605 let col_ids = collect_col_ids_from_columns(column_names);
2608 let mut all_col_ids: Vec<u32> = col_ids.clone();
2609 if let Some(ref where_expr) = m.where_clause {
2611 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2612 }
2613 for p in &node.props {
2615 let col_id = prop_name_to_col_id(&p.key);
2616 if !all_col_ids.contains(&col_id) {
2617 all_col_ids.push(col_id);
2618 }
2619 }
2620
2621 let use_agg = has_aggregate_in_return(&m.return_clause.items);
2622 let use_eval_path = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2628 if use_eval_path {
2629 for item in &m.return_clause.items {
2634 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2635 }
2636 }
2637
2638 let bare_vars = bare_var_names_in_return(&m.return_clause.items);
2641 let all_label_col_ids: Vec<u32> = if !bare_vars.is_empty() {
2642 self.store.col_ids_for_label(label_id_u32)?
2643 } else {
2644 vec![]
2645 };
2646
2647 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2648 let mut rows: Vec<Vec<Value>> = Vec::new();
2649
2650 let index_candidate_slots: Option<Vec<u32>> =
2655 try_index_lookup_for_props(&node.props, label_id_u32, &self.prop_index);
2656
2657 let text_candidate_slots: Option<Vec<u32>> = if index_candidate_slots.is_none() {
2663 m.where_clause.as_ref().and_then(|wexpr| {
2664 try_text_index_lookup(wexpr, node.var.as_str(), label_id_u32, &self.text_index)
2665 })
2666 } else {
2667 None
2668 };
2669
2670 let slot_iter: Box<dyn Iterator<Item = u64>> =
2674 if let Some(ref slots) = index_candidate_slots {
2675 tracing::debug!(
2676 label = %label,
2677 candidates = slots.len(),
2678 "SPA-249: property index fast path"
2679 );
2680 Box::new(slots.iter().map(|&s| s as u64))
2681 } else if let Some(ref slots) = text_candidate_slots {
2682 tracing::debug!(
2683 label = %label,
2684 candidates = slots.len(),
2685 "SPA-251: text index fast path"
2686 );
2687 Box::new(slots.iter().map(|&s| s as u64))
2688 } else {
2689 Box::new(0..hwm)
2690 };
2691
2692 for slot in slot_iter {
2693 self.check_deadline()?;
2695
2696 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2697 if slot < 1024 || slot % 10_000 == 0 {
2698 tracing::trace!(slot = slot, node_id = node_id.0, "scan emit");
2699 }
2700
2701 if self.is_node_tombstoned(node_id) {
2709 continue;
2710 }
2711
2712 let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2717 let props: Vec<(u32, u64)> = nullable_props
2718 .iter()
2719 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2720 .collect();
2721
2722 if !self.matches_prop_filter(&props, &node.props) {
2724 continue;
2725 }
2726
2727 let var_name = node.var.as_str();
2729 if let Some(ref where_expr) = m.where_clause {
2730 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2731 if !var_name.is_empty() && !label.is_empty() {
2733 row_vals.insert(
2734 format!("{}.__labels__", var_name),
2735 Value::List(vec![Value::String(label.clone())]),
2736 );
2737 }
2738 if !var_name.is_empty() {
2740 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2741 }
2742 row_vals.extend(self.dollar_params());
2744 if !self.eval_where_graph(where_expr, &row_vals) {
2745 continue;
2746 }
2747 }
2748
2749 if use_eval_path {
2750 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2752 if !var_name.is_empty() && !label.is_empty() {
2754 row_vals.insert(
2755 format!("{}.__labels__", var_name),
2756 Value::List(vec![Value::String(label.clone())]),
2757 );
2758 }
2759 if !var_name.is_empty() {
2760 if bare_vars.contains(&var_name.to_string()) && !all_label_col_ids.is_empty() {
2764 let all_nullable = self
2765 .store
2766 .get_node_raw_nullable(node_id, &all_label_col_ids)?;
2767 let all_props: Vec<(u32, u64)> = all_nullable
2768 .iter()
2769 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2770 .collect();
2771 row_vals.insert(
2772 var_name.to_string(),
2773 build_node_map(&all_props, &self.store),
2774 );
2775 } else {
2776 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2777 }
2778 row_vals.insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2781 }
2782 raw_rows.push(row_vals);
2783 } else {
2784 let row = project_row(
2786 &props,
2787 column_names,
2788 &all_col_ids,
2789 var_name,
2790 &label,
2791 &self.store,
2792 );
2793 rows.push(row);
2794 }
2795 }
2796
2797 if use_eval_path {
2798 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
2799 } else {
2800 if m.distinct {
2801 deduplicate_rows(&mut rows);
2802 }
2803
2804 apply_order_by(&mut rows, m, column_names);
2806
2807 if let Some(skip) = m.skip {
2809 let skip = (skip as usize).min(rows.len());
2810 rows.drain(0..skip);
2811 }
2812
2813 if let Some(lim) = m.limit {
2815 rows.truncate(lim as usize);
2816 }
2817 }
2818
2819 tracing::debug!(rows = rows.len(), "node scan complete");
2820 Ok(QueryResult {
2821 columns: column_names.to_vec(),
2822 rows,
2823 })
2824 }
2825
2826 fn execute_scan_all_labels(
2835 &self,
2836 m: &MatchStatement,
2837 column_names: &[String],
2838 ) -> Result<QueryResult> {
2839 let all_labels = self.catalog.list_labels()?;
2840 tracing::debug!(label_count = all_labels.len(), "label-less full scan start");
2841
2842 let pat = &m.pattern[0];
2843 let node = &pat.nodes[0];
2844 let var_name = node.var.as_str();
2845
2846 let mut all_col_ids: Vec<u32> = collect_col_ids_from_columns(column_names);
2848 if let Some(ref where_expr) = m.where_clause {
2849 collect_col_ids_from_expr(where_expr, &mut all_col_ids);
2850 }
2851 for p in &node.props {
2852 let col_id = prop_name_to_col_id(&p.key);
2853 if !all_col_ids.contains(&col_id) {
2854 all_col_ids.push(col_id);
2855 }
2856 }
2857
2858 let use_agg = has_aggregate_in_return(&m.return_clause.items);
2859 let use_eval_path_all = use_agg || needs_node_ref_in_return(&m.return_clause.items);
2861 if use_eval_path_all {
2862 for item in &m.return_clause.items {
2863 collect_col_ids_from_expr(&item.expr, &mut all_col_ids);
2864 }
2865 }
2866
2867 let bare_vars_all = bare_var_names_in_return(&m.return_clause.items);
2869
2870 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
2871 let mut rows: Vec<Vec<Value>> = Vec::new();
2872
2873 for (label_id, label_name) in &all_labels {
2874 let label_id_u32 = *label_id as u32;
2875 let hwm = self.store.hwm_for_label(label_id_u32)?;
2876 tracing::debug!(label = %label_name, hwm = hwm, "label-less scan: label slot");
2877
2878 let all_label_col_ids_here: Vec<u32> = if !bare_vars_all.is_empty() {
2880 self.store.col_ids_for_label(label_id_u32)?
2881 } else {
2882 vec![]
2883 };
2884
2885 for slot in 0..hwm {
2886 self.check_deadline()?;
2888
2889 let node_id = NodeId(((label_id_u32 as u64) << 32) | slot);
2890
2891 if self.is_node_tombstoned(node_id) {
2895 continue;
2896 }
2897
2898 let nullable_props = self.store.get_node_raw_nullable(node_id, &all_col_ids)?;
2899 let props: Vec<(u32, u64)> = nullable_props
2900 .iter()
2901 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2902 .collect();
2903
2904 if !self.matches_prop_filter(&props, &node.props) {
2906 continue;
2907 }
2908
2909 if let Some(ref where_expr) = m.where_clause {
2911 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2912 if !var_name.is_empty() {
2913 row_vals.insert(
2914 format!("{}.__labels__", var_name),
2915 Value::List(vec![Value::String(label_name.clone())]),
2916 );
2917 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2918 }
2919 row_vals.extend(self.dollar_params());
2920 if !self.eval_where_graph(where_expr, &row_vals) {
2921 continue;
2922 }
2923 }
2924
2925 if use_eval_path_all {
2926 let mut row_vals = build_row_vals(&props, var_name, &all_col_ids, &self.store);
2927 if !var_name.is_empty() {
2928 row_vals.insert(
2929 format!("{}.__labels__", var_name),
2930 Value::List(vec![Value::String(label_name.clone())]),
2931 );
2932 if bare_vars_all.contains(&var_name.to_string())
2934 && !all_label_col_ids_here.is_empty()
2935 {
2936 let all_nullable = self
2937 .store
2938 .get_node_raw_nullable(node_id, &all_label_col_ids_here)?;
2939 let all_props: Vec<(u32, u64)> = all_nullable
2940 .iter()
2941 .filter_map(|&(col_id, opt)| opt.map(|v| (col_id, v)))
2942 .collect();
2943 row_vals.insert(
2944 var_name.to_string(),
2945 build_node_map(&all_props, &self.store),
2946 );
2947 } else {
2948 row_vals.insert(var_name.to_string(), Value::NodeRef(node_id));
2949 }
2950 row_vals
2951 .insert(format!("{}.__node_id__", var_name), Value::NodeRef(node_id));
2952 }
2953 raw_rows.push(row_vals);
2954 } else {
2955 let row = project_row(
2956 &props,
2957 column_names,
2958 &all_col_ids,
2959 var_name,
2960 label_name,
2961 &self.store,
2962 );
2963 rows.push(row);
2964 }
2965 }
2966 }
2967
2968 if use_eval_path_all {
2969 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
2970 }
2971
2972 if m.distinct {
2975 deduplicate_rows(&mut rows);
2976 }
2977 apply_order_by(&mut rows, m, column_names);
2978 if let Some(skip) = m.skip {
2979 let skip = (skip as usize).min(rows.len());
2980 rows.drain(0..skip);
2981 }
2982 if let Some(lim) = m.limit {
2983 rows.truncate(lim as usize);
2984 }
2985
2986 tracing::debug!(rows = rows.len(), "label-less full scan complete");
2987 Ok(QueryResult {
2988 columns: column_names.to_vec(),
2989 rows,
2990 })
2991 }
2992
2993 fn execute_one_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
2996 let pat = &m.pattern[0];
2997 let src_node_pat = &pat.nodes[0];
2998 let dst_node_pat = &pat.nodes[1];
2999 let rel_pat = &pat.rels[0];
3000
3001 let dir = &rel_pat.dir;
3002 use sparrowdb_cypher::ast::EdgeDir;
3008
3009 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3010 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
3011 let src_label_id_opt: Option<u32> = if src_label.is_empty() {
3013 None
3014 } else {
3015 self.catalog.get_label(&src_label)?.map(|id| id as u32)
3016 };
3017 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
3018 None
3019 } else {
3020 self.catalog.get_label(&dst_label)?.map(|id| id as u32)
3021 };
3022
3023 let all_rel_tables = self.catalog.list_rel_tables_with_ids();
3035 let rel_tables_to_scan: Vec<(u64, u32, u32, String)> = all_rel_tables
3036 .into_iter()
3037 .filter(|(_, sid, did, rt)| {
3038 let type_ok = rel_pat.rel_type.is_empty() || rt == &rel_pat.rel_type;
3039 let src_ok = src_label_id_opt.map(|id| id == *sid as u32).unwrap_or(true);
3040 let dst_ok = dst_label_id_opt.map(|id| id == *did as u32).unwrap_or(true);
3041 type_ok && src_ok && dst_ok
3042 })
3043 .map(|(catalog_id, sid, did, rt)| (catalog_id, sid as u32, did as u32, rt))
3044 .collect();
3045
3046 let use_agg = has_aggregate_in_return(&m.return_clause.items);
3047 let mut raw_rows: Vec<HashMap<String, Value>> = Vec::new();
3048 let mut rows: Vec<Vec<Value>> = Vec::new();
3049 let mut seen_undirected: HashSet<(u64, u64)> = HashSet::new();
3052
3053 let label_id_to_name: Vec<(u16, String)> = if src_label.is_empty() || dst_label.is_empty() {
3055 self.catalog.list_labels().unwrap_or_default()
3056 } else {
3057 vec![]
3058 };
3059
3060 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3062 &rel_tables_to_scan
3063 {
3064 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3065 let effective_src_label_id = *tbl_src_label_id;
3066 let effective_dst_label_id = *tbl_dst_label_id;
3067
3068 let effective_rel_type: &str = tbl_rel_type.as_str();
3071
3072 let effective_src_label: &str = if src_label.is_empty() {
3074 label_id_to_name
3075 .iter()
3076 .find(|(id, _)| *id as u32 == effective_src_label_id)
3077 .map(|(_, name)| name.as_str())
3078 .unwrap_or("")
3079 } else {
3080 src_label.as_str()
3081 };
3082 let effective_dst_label: &str = if dst_label.is_empty() {
3083 label_id_to_name
3084 .iter()
3085 .find(|(id, _)| *id as u32 == effective_dst_label_id)
3086 .map(|(_, name)| name.as_str())
3087 .unwrap_or("")
3088 } else {
3089 dst_label.as_str()
3090 };
3091
3092 let hwm_src = match self.store.hwm_for_label(effective_src_label_id) {
3093 Ok(h) => h,
3094 Err(_) => continue,
3095 };
3096 tracing::debug!(
3097 src_label = %effective_src_label,
3098 dst_label = %effective_dst_label,
3099 rel_type = %effective_rel_type,
3100 hwm_src = hwm_src,
3101 "one-hop traversal start"
3102 );
3103
3104 let mut col_ids_src =
3105 collect_col_ids_for_var(&src_node_pat.var, column_names, effective_src_label_id);
3106 let mut col_ids_dst =
3107 collect_col_ids_for_var(&dst_node_pat.var, column_names, effective_dst_label_id);
3108 if use_agg {
3109 for item in &m.return_clause.items {
3110 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3111 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3112 }
3113 }
3114 if let Some(ref where_expr) = m.where_clause {
3116 collect_col_ids_from_expr(where_expr, &mut col_ids_src);
3117 collect_col_ids_from_expr(where_expr, &mut col_ids_dst);
3118 }
3119
3120 let delta_records_all = {
3123 let edge_store = EdgeStore::open(&self.db_root, storage_rel_id);
3124 edge_store.and_then(|s| s.read_delta()).unwrap_or_default()
3125 };
3126
3127 for src_slot in 0..hwm_src {
3129 self.check_deadline()?;
3131
3132 let src_node = NodeId(((effective_src_label_id as u64) << 32) | src_slot);
3133 let src_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3134 let all_needed: Vec<u32> = {
3135 let mut v = col_ids_src.clone();
3136 for p in &src_node_pat.props {
3137 let col_id = prop_name_to_col_id(&p.key);
3138 if !v.contains(&col_id) {
3139 v.push(col_id);
3140 }
3141 }
3142 v
3143 };
3144 self.store.get_node_raw(src_node, &all_needed)?
3145 } else {
3146 vec![]
3147 };
3148
3149 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3151 continue;
3152 }
3153
3154 let delta_neighbors: Vec<u64> = delta_records_all
3157 .iter()
3158 .filter(|r| {
3159 let r_src_label = (r.src.0 >> 32) as u32;
3160 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3161 r_src_label == effective_src_label_id && r_src_slot == src_slot
3162 })
3163 .map(|r| r.dst.0 & 0xFFFF_FFFF)
3164 .collect();
3165
3166 let csr_neighbors: &[u64] = self
3170 .csrs
3171 .get(&u32::try_from(*catalog_rel_id).expect("rel_table_id overflowed u32"))
3172 .map(|c| c.neighbors(src_slot))
3173 .unwrap_or(&[]);
3174 let all_neighbors: Vec<u64> = csr_neighbors
3175 .iter()
3176 .copied()
3177 .chain(delta_neighbors.into_iter())
3178 .collect();
3179 let mut seen_neighbors: HashSet<u64> = HashSet::new();
3180 for &dst_slot in &all_neighbors {
3181 if !seen_neighbors.insert(dst_slot) {
3182 continue;
3183 }
3184 if *dir == EdgeDir::Both {
3187 seen_undirected.insert((src_slot, dst_slot));
3188 }
3189 let dst_node = NodeId(((effective_dst_label_id as u64) << 32) | dst_slot);
3190 let dst_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3191 let all_needed: Vec<u32> = {
3192 let mut v = col_ids_dst.clone();
3193 for p in &dst_node_pat.props {
3194 let col_id = prop_name_to_col_id(&p.key);
3195 if !v.contains(&col_id) {
3196 v.push(col_id);
3197 }
3198 }
3199 v
3200 };
3201 self.store.get_node_raw(dst_node, &all_needed)?
3202 } else {
3203 vec![]
3204 };
3205
3206 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
3208 continue;
3209 }
3210
3211 if *dir == EdgeDir::Both {
3214 seen_undirected.insert((src_slot, dst_slot));
3215 }
3216
3217 if let Some(ref where_expr) = m.where_clause {
3219 let mut row_vals = build_row_vals(
3220 &src_props,
3221 &src_node_pat.var,
3222 &col_ids_src,
3223 &self.store,
3224 );
3225 row_vals.extend(build_row_vals(
3226 &dst_props,
3227 &dst_node_pat.var,
3228 &col_ids_dst,
3229 &self.store,
3230 ));
3231 if !rel_pat.var.is_empty() {
3233 row_vals.insert(
3234 format!("{}.__type__", rel_pat.var),
3235 Value::String(effective_rel_type.to_string()),
3236 );
3237 }
3238 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3240 row_vals.insert(
3241 format!("{}.__labels__", src_node_pat.var),
3242 Value::List(vec![Value::String(effective_src_label.to_string())]),
3243 );
3244 }
3245 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3246 row_vals.insert(
3247 format!("{}.__labels__", dst_node_pat.var),
3248 Value::List(vec![Value::String(effective_dst_label.to_string())]),
3249 );
3250 }
3251 row_vals.extend(self.dollar_params());
3252 if !self.eval_where_graph(where_expr, &row_vals) {
3253 continue;
3254 }
3255 }
3256
3257 if use_agg {
3258 let mut row_vals = build_row_vals(
3259 &src_props,
3260 &src_node_pat.var,
3261 &col_ids_src,
3262 &self.store,
3263 );
3264 row_vals.extend(build_row_vals(
3265 &dst_props,
3266 &dst_node_pat.var,
3267 &col_ids_dst,
3268 &self.store,
3269 ));
3270 if !rel_pat.var.is_empty() {
3272 row_vals.insert(
3273 format!("{}.__type__", rel_pat.var),
3274 Value::String(effective_rel_type.to_string()),
3275 );
3276 }
3277 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3278 row_vals.insert(
3279 format!("{}.__labels__", src_node_pat.var),
3280 Value::List(vec![Value::String(effective_src_label.to_string())]),
3281 );
3282 }
3283 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3284 row_vals.insert(
3285 format!("{}.__labels__", dst_node_pat.var),
3286 Value::List(vec![Value::String(effective_dst_label.to_string())]),
3287 );
3288 }
3289 if !src_node_pat.var.is_empty() {
3290 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(src_node));
3291 }
3292 if !dst_node_pat.var.is_empty() {
3293 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(dst_node));
3294 }
3295 if !rel_pat.var.is_empty() {
3298 let edge_id = sparrowdb_common::EdgeId(
3304 (*catalog_rel_id << 32) | (src_slot ^ dst_slot) & 0xFFFF_FFFF,
3305 );
3306 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3307 }
3308 raw_rows.push(row_vals);
3309 } else {
3310 let rel_var_type = if !rel_pat.var.is_empty() {
3315 Some((rel_pat.var.as_str(), effective_rel_type))
3316 } else {
3317 None
3318 };
3319 let src_label_meta =
3320 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3321 Some((src_node_pat.var.as_str(), effective_src_label))
3322 } else {
3323 None
3324 };
3325 let dst_label_meta =
3326 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3327 Some((dst_node_pat.var.as_str(), effective_dst_label))
3328 } else {
3329 None
3330 };
3331 let row = project_hop_row(
3332 &src_props,
3333 &dst_props,
3334 column_names,
3335 &src_node_pat.var,
3336 &dst_node_pat.var,
3337 rel_var_type,
3338 src_label_meta,
3339 dst_label_meta,
3340 &self.store,
3341 );
3342 rows.push(row);
3343 }
3344 }
3345 }
3346 }
3347
3348 if *dir == EdgeDir::Both {
3353 for (catalog_rel_id, tbl_src_label_id, tbl_dst_label_id, tbl_rel_type) in
3354 &rel_tables_to_scan
3355 {
3356 let storage_rel_id = RelTableId(*catalog_rel_id as u32);
3357 let bwd_scan_label_id = *tbl_dst_label_id;
3359 let bwd_dst_label_id = *tbl_src_label_id;
3360 let effective_rel_type: &str = tbl_rel_type.as_str();
3361
3362 let effective_src_label: &str = if src_label.is_empty() {
3363 label_id_to_name
3364 .iter()
3365 .find(|(id, _)| *id as u32 == bwd_scan_label_id)
3366 .map(|(_, name)| name.as_str())
3367 .unwrap_or("")
3368 } else {
3369 src_label.as_str()
3370 };
3371 let effective_dst_label: &str = if dst_label.is_empty() {
3372 label_id_to_name
3373 .iter()
3374 .find(|(id, _)| *id as u32 == bwd_dst_label_id)
3375 .map(|(_, name)| name.as_str())
3376 .unwrap_or("")
3377 } else {
3378 dst_label.as_str()
3379 };
3380
3381 let hwm_bwd = match self.store.hwm_for_label(bwd_scan_label_id) {
3382 Ok(h) => h,
3383 Err(_) => continue,
3384 };
3385
3386 let mut col_ids_src =
3387 collect_col_ids_for_var(&src_node_pat.var, column_names, bwd_scan_label_id);
3388 let mut col_ids_dst =
3389 collect_col_ids_for_var(&dst_node_pat.var, column_names, bwd_dst_label_id);
3390 if use_agg {
3391 for item in &m.return_clause.items {
3392 collect_col_ids_from_expr(&item.expr, &mut col_ids_src);
3393 collect_col_ids_from_expr(&item.expr, &mut col_ids_dst);
3394 }
3395 }
3396
3397 let delta_records_bwd = EdgeStore::open(&self.db_root, storage_rel_id)
3400 .and_then(|s| s.read_delta())
3401 .unwrap_or_default();
3402
3403 let csr_bwd: Option<CsrBackward> = EdgeStore::open(&self.db_root, storage_rel_id)
3408 .and_then(|s| s.open_bwd())
3409 .ok();
3410
3411 for b_slot in 0..hwm_bwd {
3413 let b_node = NodeId(((bwd_scan_label_id as u64) << 32) | b_slot);
3414 let b_props = if !col_ids_src.is_empty() || !src_node_pat.props.is_empty() {
3415 let all_needed: Vec<u32> = {
3416 let mut v = col_ids_src.clone();
3417 for p in &src_node_pat.props {
3418 let col_id = prop_name_to_col_id(&p.key);
3419 if !v.contains(&col_id) {
3420 v.push(col_id);
3421 }
3422 }
3423 v
3424 };
3425 self.store.get_node_raw(b_node, &all_needed)?
3426 } else {
3427 vec![]
3428 };
3429 if !self.matches_prop_filter(&b_props, &src_node_pat.props) {
3434 continue;
3435 }
3436
3437 let delta_predecessors: Vec<u64> = delta_records_bwd
3440 .iter()
3441 .filter(|r| {
3442 let r_dst_label = (r.dst.0 >> 32) as u32;
3443 let r_dst_slot = r.dst.0 & 0xFFFF_FFFF;
3444 r_dst_label == bwd_scan_label_id && r_dst_slot == b_slot
3445 })
3446 .map(|r| r.src.0 & 0xFFFF_FFFF)
3447 .collect();
3448
3449 let csr_predecessors: &[u64] = csr_bwd
3455 .as_ref()
3456 .map(|c| c.predecessors(b_slot))
3457 .unwrap_or(&[]);
3458 let all_predecessors: Vec<u64> = csr_predecessors
3459 .iter()
3460 .copied()
3461 .chain(delta_predecessors.into_iter())
3462 .collect();
3463
3464 let mut seen_preds: HashSet<u64> = HashSet::new();
3465 for a_slot in all_predecessors {
3466 if !seen_preds.insert(a_slot) {
3467 continue;
3468 }
3469 if seen_undirected.contains(&(b_slot, a_slot)) {
3479 continue;
3480 }
3481
3482 let a_node = NodeId(((bwd_dst_label_id as u64) << 32) | a_slot);
3483 let a_props = if !col_ids_dst.is_empty() || !dst_node_pat.props.is_empty() {
3484 let all_needed: Vec<u32> = {
3485 let mut v = col_ids_dst.clone();
3486 for p in &dst_node_pat.props {
3487 let col_id = prop_name_to_col_id(&p.key);
3488 if !v.contains(&col_id) {
3489 v.push(col_id);
3490 }
3491 }
3492 v
3493 };
3494 self.store.get_node_raw(a_node, &all_needed)?
3495 } else {
3496 vec![]
3497 };
3498
3499 if !self.matches_prop_filter(&a_props, &dst_node_pat.props) {
3500 continue;
3501 }
3502
3503 if let Some(ref where_expr) = m.where_clause {
3505 let mut row_vals = build_row_vals(
3506 &b_props,
3507 &src_node_pat.var,
3508 &col_ids_src,
3509 &self.store,
3510 );
3511 row_vals.extend(build_row_vals(
3512 &a_props,
3513 &dst_node_pat.var,
3514 &col_ids_dst,
3515 &self.store,
3516 ));
3517 if !rel_pat.var.is_empty() {
3518 row_vals.insert(
3519 format!("{}.__type__", rel_pat.var),
3520 Value::String(effective_rel_type.to_string()),
3521 );
3522 }
3523 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3524 row_vals.insert(
3525 format!("{}.__labels__", src_node_pat.var),
3526 Value::List(vec![Value::String(
3527 effective_src_label.to_string(),
3528 )]),
3529 );
3530 }
3531 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3532 row_vals.insert(
3533 format!("{}.__labels__", dst_node_pat.var),
3534 Value::List(vec![Value::String(
3535 effective_dst_label.to_string(),
3536 )]),
3537 );
3538 }
3539 row_vals.extend(self.dollar_params());
3540 if !self.eval_where_graph(where_expr, &row_vals) {
3541 continue;
3542 }
3543 }
3544
3545 if use_agg {
3546 let mut row_vals = build_row_vals(
3547 &b_props,
3548 &src_node_pat.var,
3549 &col_ids_src,
3550 &self.store,
3551 );
3552 row_vals.extend(build_row_vals(
3553 &a_props,
3554 &dst_node_pat.var,
3555 &col_ids_dst,
3556 &self.store,
3557 ));
3558 if !rel_pat.var.is_empty() {
3559 row_vals.insert(
3560 format!("{}.__type__", rel_pat.var),
3561 Value::String(effective_rel_type.to_string()),
3562 );
3563 }
3564 if !src_node_pat.var.is_empty() && !effective_src_label.is_empty() {
3565 row_vals.insert(
3566 format!("{}.__labels__", src_node_pat.var),
3567 Value::List(vec![Value::String(
3568 effective_src_label.to_string(),
3569 )]),
3570 );
3571 }
3572 if !dst_node_pat.var.is_empty() && !effective_dst_label.is_empty() {
3573 row_vals.insert(
3574 format!("{}.__labels__", dst_node_pat.var),
3575 Value::List(vec![Value::String(
3576 effective_dst_label.to_string(),
3577 )]),
3578 );
3579 }
3580 if !src_node_pat.var.is_empty() {
3581 row_vals.insert(src_node_pat.var.clone(), Value::NodeRef(b_node));
3582 }
3583 if !dst_node_pat.var.is_empty() {
3584 row_vals.insert(dst_node_pat.var.clone(), Value::NodeRef(a_node));
3585 }
3586 if !rel_pat.var.is_empty() {
3589 let edge_id = sparrowdb_common::EdgeId(
3590 (*catalog_rel_id << 32) | (b_slot ^ a_slot) & 0xFFFF_FFFF,
3591 );
3592 row_vals.insert(rel_pat.var.clone(), Value::EdgeRef(edge_id));
3593 }
3594 raw_rows.push(row_vals);
3595 } else {
3596 let rel_var_type = if !rel_pat.var.is_empty() {
3597 Some((rel_pat.var.as_str(), effective_rel_type))
3598 } else {
3599 None
3600 };
3601 let src_label_meta = if !src_node_pat.var.is_empty()
3602 && !effective_src_label.is_empty()
3603 {
3604 Some((src_node_pat.var.as_str(), effective_src_label))
3605 } else {
3606 None
3607 };
3608 let dst_label_meta = if !dst_node_pat.var.is_empty()
3609 && !effective_dst_label.is_empty()
3610 {
3611 Some((dst_node_pat.var.as_str(), effective_dst_label))
3612 } else {
3613 None
3614 };
3615 let row = project_hop_row(
3616 &b_props,
3617 &a_props,
3618 column_names,
3619 &src_node_pat.var,
3620 &dst_node_pat.var,
3621 rel_var_type,
3622 src_label_meta,
3623 dst_label_meta,
3624 &self.store,
3625 );
3626 rows.push(row);
3627 }
3628 }
3629 }
3630 }
3631 }
3632
3633 if use_agg {
3634 rows = self.aggregate_rows_graph(&raw_rows, &m.return_clause.items);
3635 } else {
3636 if m.distinct {
3638 deduplicate_rows(&mut rows);
3639 }
3640
3641 apply_order_by(&mut rows, m, column_names);
3643
3644 if let Some(skip) = m.skip {
3646 let skip = (skip as usize).min(rows.len());
3647 rows.drain(0..skip);
3648 }
3649
3650 if let Some(lim) = m.limit {
3652 rows.truncate(lim as usize);
3653 }
3654 }
3655
3656 tracing::debug!(rows = rows.len(), "one-hop traversal complete");
3657 Ok(QueryResult {
3658 columns: column_names.to_vec(),
3659 rows,
3660 })
3661 }
3662
3663 fn execute_two_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3666 use crate::join::AspJoin;
3667
3668 let pat = &m.pattern[0];
3669 let src_node_pat = &pat.nodes[0];
3670 let fof_node_pat = &pat.nodes[2];
3672
3673 let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
3674 let fof_label = fof_node_pat.labels.first().cloned().unwrap_or_default();
3675 let src_label_id = self
3676 .catalog
3677 .get_label(&src_label)?
3678 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3679 let fof_label_id = self
3680 .catalog
3681 .get_label(&fof_label)?
3682 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
3683
3684 let hwm_src = self.store.hwm_for_label(src_label_id)?;
3685 tracing::debug!(src_label = %src_label, fof_label = %fof_label, hwm_src = hwm_src, "two-hop traversal start");
3686
3687 let col_ids_fof = {
3691 let mut ids = collect_col_ids_for_var(&fof_node_pat.var, column_names, fof_label_id);
3692 for p in &fof_node_pat.props {
3693 let col_id = prop_name_to_col_id(&p.key);
3694 if !ids.contains(&col_id) {
3695 ids.push(col_id);
3696 }
3697 }
3698 if let Some(ref where_expr) = m.where_clause {
3699 collect_col_ids_from_expr_for_var(where_expr, &fof_node_pat.var, &mut ids);
3700 }
3701 ids
3702 };
3703
3704 let col_ids_src_where: Vec<u32> = {
3709 let mut ids = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
3710 if let Some(ref where_expr) = m.where_clause {
3711 collect_col_ids_from_expr_for_var(where_expr, &src_node_pat.var, &mut ids);
3712 }
3713 ids
3714 };
3715
3716 let delta_adj: HashMap<u64, Vec<u64>> = {
3722 let mut adj: HashMap<u64, Vec<u64>> = HashMap::new();
3723 for r in self.read_delta_all() {
3724 let r_src_label = (r.src.0 >> 32) as u32;
3725 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
3726 if r_src_label == src_label_id {
3727 adj.entry(r_src_slot)
3728 .or_default()
3729 .push(r.dst.0 & 0xFFFF_FFFF);
3730 }
3731 }
3732 adj
3733 };
3734
3735 let merged_csr = {
3740 let max_nodes = self.csrs.values().map(|c| c.n_nodes()).max().unwrap_or(0);
3741 let mut edges: Vec<(u64, u64)> = Vec::new();
3742 for csr in self.csrs.values() {
3743 for src in 0..csr.n_nodes() {
3744 for &dst in csr.neighbors(src) {
3745 edges.push((src, dst));
3746 }
3747 }
3748 }
3749 edges.sort_unstable();
3751 edges.dedup();
3752 CsrForward::build(max_nodes, &edges)
3753 };
3754 let join = AspJoin::new(&merged_csr);
3755 let mut rows = Vec::new();
3756
3757 for src_slot in 0..hwm_src {
3759 self.check_deadline()?;
3761
3762 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
3763 let src_needed: Vec<u32> = {
3764 let mut v = vec![];
3765 for p in &src_node_pat.props {
3766 let col_id = prop_name_to_col_id(&p.key);
3767 if !v.contains(&col_id) {
3768 v.push(col_id);
3769 }
3770 }
3771 for &col_id in &col_ids_src_where {
3772 if !v.contains(&col_id) {
3773 v.push(col_id);
3774 }
3775 }
3776 v
3777 };
3778
3779 let src_props = read_node_props(&self.store, src_node, &src_needed)?;
3780
3781 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
3783 continue;
3784 }
3785
3786 let mut fof_slots = join.two_hop(src_slot)?;
3788
3789 let first_hop_delta = delta_adj
3792 .get(&src_slot)
3793 .map(|v| v.as_slice())
3794 .unwrap_or(&[]);
3795 if !first_hop_delta.is_empty() {
3796 let mut delta_fof: HashSet<u64> = HashSet::new();
3797 for &mid_slot in first_hop_delta {
3798 for &fof in merged_csr.neighbors(mid_slot) {
3800 delta_fof.insert(fof);
3801 }
3802 if let Some(mid_neighbors) = delta_adj.get(&mid_slot) {
3804 for &fof in mid_neighbors {
3805 delta_fof.insert(fof);
3806 }
3807 }
3808 }
3809 fof_slots.extend(delta_fof);
3810 let unique: HashSet<u64> = fof_slots.into_iter().collect();
3812 fof_slots = unique.into_iter().collect();
3813 fof_slots.sort_unstable();
3814 }
3815
3816 for fof_slot in fof_slots {
3817 let fof_node = NodeId(((fof_label_id as u64) << 32) | fof_slot);
3818 let fof_props = read_node_props(&self.store, fof_node, &col_ids_fof)?;
3819
3820 if !self.matches_prop_filter(&fof_props, &fof_node_pat.props) {
3822 continue;
3823 }
3824
3825 if let Some(ref where_expr) = m.where_clause {
3827 let mut row_vals = build_row_vals(
3828 &src_props,
3829 &src_node_pat.var,
3830 &col_ids_src_where,
3831 &self.store,
3832 );
3833 row_vals.extend(build_row_vals(
3834 &fof_props,
3835 &fof_node_pat.var,
3836 &col_ids_fof,
3837 &self.store,
3838 ));
3839 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
3841 row_vals.insert(
3842 format!("{}.__labels__", src_node_pat.var),
3843 Value::List(vec![Value::String(src_label.clone())]),
3844 );
3845 }
3846 if !fof_node_pat.var.is_empty() && !fof_label.is_empty() {
3847 row_vals.insert(
3848 format!("{}.__labels__", fof_node_pat.var),
3849 Value::List(vec![Value::String(fof_label.clone())]),
3850 );
3851 }
3852 if !pat.rels[0].var.is_empty() {
3854 row_vals.insert(
3855 format!("{}.__type__", pat.rels[0].var),
3856 Value::String(pat.rels[0].rel_type.clone()),
3857 );
3858 }
3859 if !pat.rels[1].var.is_empty() {
3860 row_vals.insert(
3861 format!("{}.__type__", pat.rels[1].var),
3862 Value::String(pat.rels[1].rel_type.clone()),
3863 );
3864 }
3865 row_vals.extend(self.dollar_params());
3866 if !self.eval_where_graph(where_expr, &row_vals) {
3867 continue;
3868 }
3869 }
3870
3871 let row = project_fof_row(
3872 &src_props,
3873 &fof_props,
3874 column_names,
3875 &src_node_pat.var,
3876 &self.store,
3877 );
3878 rows.push(row);
3879 }
3880 }
3881
3882 if m.distinct {
3884 deduplicate_rows(&mut rows);
3885 }
3886
3887 apply_order_by(&mut rows, m, column_names);
3889
3890 if let Some(skip) = m.skip {
3892 let skip = (skip as usize).min(rows.len());
3893 rows.drain(0..skip);
3894 }
3895
3896 if let Some(lim) = m.limit {
3898 rows.truncate(lim as usize);
3899 }
3900
3901 tracing::debug!(rows = rows.len(), "two-hop traversal complete");
3902 Ok(QueryResult {
3903 columns: column_names.to_vec(),
3904 rows,
3905 })
3906 }
3907
3908 fn execute_n_hop(&self, m: &MatchStatement, column_names: &[String]) -> Result<QueryResult> {
3923 let pat = &m.pattern[0];
3924 let n_nodes = pat.nodes.len();
3925 let n_rels = pat.rels.len();
3926
3927 if n_nodes != n_rels + 1 {
3929 return Err(sparrowdb_common::Error::Unimplemented);
3930 }
3931
3932 let col_ids_per_node: Vec<Vec<u32>> = (0..n_nodes)
3935 .map(|i| {
3936 let node_pat = &pat.nodes[i];
3937 let var = &node_pat.var;
3938 let mut ids = if var.is_empty() {
3939 vec![]
3940 } else {
3941 collect_col_ids_for_var(var, column_names, 0)
3942 };
3943 if let Some(ref where_expr) = m.where_clause {
3945 if !var.is_empty() {
3946 collect_col_ids_from_expr_for_var(where_expr, var, &mut ids);
3947 }
3948 }
3949 for p in &node_pat.props {
3951 let col_id = prop_name_to_col_id(&p.key);
3952 if !ids.contains(&col_id) {
3953 ids.push(col_id);
3954 }
3955 }
3956 if ids.is_empty() {
3958 ids.push(0);
3959 }
3960 ids
3961 })
3962 .collect();
3963
3964 let label_ids_per_node: Vec<Option<u32>> = (0..n_nodes)
3966 .map(|i| {
3967 let label = pat.nodes[i].labels.first().cloned().unwrap_or_default();
3968 if label.is_empty() {
3969 None
3970 } else {
3971 self.catalog
3972 .get_label(&label)
3973 .ok()
3974 .flatten()
3975 .map(|id| id as u32)
3976 }
3977 })
3978 .collect();
3979
3980 let src_label_id = match label_ids_per_node[0] {
3982 Some(id) => id,
3983 None => return Err(sparrowdb_common::Error::Unimplemented),
3984 };
3985 let hwm_src = self.store.hwm_for_label(src_label_id)?;
3986
3987 let delta_all = self.read_delta_all();
3989
3990 let mut rows: Vec<Vec<Value>> = Vec::new();
3991
3992 for src_slot in 0..hwm_src {
3993 self.check_deadline()?;
3995
3996 let src_node_id = NodeId(((src_label_id as u64) << 32) | src_slot);
3997
3998 if self.is_node_tombstoned(src_node_id) {
4000 continue;
4001 }
4002
4003 let src_props = read_node_props(&self.store, src_node_id, &col_ids_per_node[0])?;
4004
4005 if !self.matches_prop_filter(&src_props, &pat.nodes[0].props) {
4007 continue;
4008 }
4009
4010 let mut row_vals: HashMap<String, Value> = HashMap::new();
4012 if !pat.nodes[0].var.is_empty() {
4013 for &(col_id, raw) in &src_props {
4014 let key = format!("{}.col_{col_id}", pat.nodes[0].var);
4015 row_vals.insert(key, decode_raw_val(raw, &self.store));
4016 }
4017 }
4018
4019 let mut frontier: Vec<(u64, HashMap<String, Value>)> = vec![(src_slot, row_vals)];
4023
4024 for hop_idx in 0..n_rels {
4025 let next_node_pat = &pat.nodes[hop_idx + 1];
4026 let next_label_id_opt = label_ids_per_node[hop_idx + 1];
4027 let next_col_ids = &col_ids_per_node[hop_idx + 1];
4028 let cur_label_id = label_ids_per_node[hop_idx].unwrap_or(src_label_id);
4029
4030 let mut next_frontier: Vec<(u64, HashMap<String, Value>)> = Vec::new();
4031
4032 for (cur_slot, cur_vals) in frontier {
4033 let csr_nb: Vec<u64> = self.csr_neighbors_all(cur_slot);
4035 let delta_nb: Vec<u64> = delta_all
4036 .iter()
4037 .filter(|r| {
4038 let r_src_label = (r.src.0 >> 32) as u32;
4039 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4040 r_src_label == cur_label_id && r_src_slot == cur_slot
4041 })
4042 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4043 .collect();
4044
4045 let mut seen: HashSet<u64> = HashSet::new();
4046 let all_nb: Vec<u64> = csr_nb
4047 .into_iter()
4048 .chain(delta_nb)
4049 .filter(|&nb| seen.insert(nb))
4050 .collect();
4051
4052 for next_slot in all_nb {
4053 let next_node_id = if let Some(lbl_id) = next_label_id_opt {
4054 NodeId(((lbl_id as u64) << 32) | next_slot)
4055 } else {
4056 NodeId(next_slot)
4057 };
4058
4059 let next_props = read_node_props(&self.store, next_node_id, next_col_ids)?;
4060
4061 if !self.matches_prop_filter(&next_props, &next_node_pat.props) {
4063 continue;
4064 }
4065
4066 let mut new_vals = cur_vals.clone();
4069 if !next_node_pat.var.is_empty() {
4070 for &(col_id, raw) in &next_props {
4071 let key = format!("{}.col_{col_id}", next_node_pat.var);
4072 new_vals.insert(key, decode_raw_val(raw, &self.store));
4073 }
4074 }
4075
4076 next_frontier.push((next_slot, new_vals));
4077 }
4078 }
4079
4080 frontier = next_frontier;
4081 }
4082
4083 for (_final_slot, path_vals) in frontier {
4085 if let Some(ref where_expr) = m.where_clause {
4087 let mut eval_vals = path_vals.clone();
4088 eval_vals.extend(self.dollar_params());
4089 if !self.eval_where_graph(where_expr, &eval_vals) {
4090 continue;
4091 }
4092 }
4093
4094 let row: Vec<Value> = column_names
4097 .iter()
4098 .map(|col_name| {
4099 if let Some((var, prop)) = col_name.split_once('.') {
4100 let key = format!("{var}.col_{}", col_id_of(prop));
4101 path_vals.get(&key).cloned().unwrap_or(Value::Null)
4102 } else {
4103 Value::Null
4104 }
4105 })
4106 .collect();
4107
4108 rows.push(row);
4109 }
4110 }
4111
4112 if m.distinct {
4114 deduplicate_rows(&mut rows);
4115 }
4116
4117 apply_order_by(&mut rows, m, column_names);
4119
4120 if let Some(skip) = m.skip {
4122 let skip = (skip as usize).min(rows.len());
4123 rows.drain(0..skip);
4124 }
4125
4126 if let Some(lim) = m.limit {
4128 rows.truncate(lim as usize);
4129 }
4130
4131 tracing::debug!(
4132 rows = rows.len(),
4133 n_rels = n_rels,
4134 "n-hop traversal complete"
4135 );
4136 Ok(QueryResult {
4137 columns: column_names.to_vec(),
4138 rows,
4139 })
4140 }
4141
4142 fn get_node_neighbors_labeled(
4157 &self,
4158 src_slot: u64,
4159 src_label_id: u32,
4160 delta_all: &[sparrowdb_storage::edge_store::DeltaRecord],
4161 node_label: &std::collections::HashMap<(u64, u32), ()>,
4162 all_label_ids: &[u32],
4163 ) -> Vec<(u64, u32)> {
4164 let csr_slots: Vec<u64> = self.csr_neighbors_all(src_slot);
4167
4168 let mut out: std::collections::HashMap<(u64, u32), ()> = std::collections::HashMap::new();
4172
4173 for r in delta_all.iter().filter(|r| {
4175 let r_src_label = (r.src.0 >> 32) as u32;
4176 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4177 r_src_label == src_label_id && r_src_slot == src_slot
4178 }) {
4179 let dst_slot = r.dst.0 & 0xFFFF_FFFF;
4180 let dst_label = (r.dst.0 >> 32) as u32;
4181 out.insert((dst_slot, dst_label), ());
4182 }
4183
4184 'csr: for dst_slot in csr_slots {
4188 for &lid in all_label_ids {
4190 if out.contains_key(&(dst_slot, lid)) {
4191 continue 'csr; }
4193 }
4194 let mut found = false;
4197 for &lid in all_label_ids {
4198 if node_label.contains_key(&(dst_slot, lid)) {
4199 out.insert((dst_slot, lid), ());
4200 found = true;
4201 break;
4202 }
4203 }
4204 if !found {
4205 out.insert((dst_slot, src_label_id), ());
4209 }
4210 }
4211
4212 out.into_keys().collect()
4213 }
4214
4215 fn execute_variable_hops(
4228 &self,
4229 src_slot: u64,
4230 src_label_id: u32,
4231 min_hops: u32,
4232 max_hops: u32,
4233 ) -> Vec<(u64, u32)> {
4234 const SAFETY_CAP: u32 = 10;
4235 let max_hops = max_hops.min(SAFETY_CAP);
4236
4237 let delta_all = self.read_delta_all();
4241 let mut node_label: std::collections::HashMap<(u64, u32), ()> =
4242 std::collections::HashMap::new();
4243 for r in &delta_all {
4244 let src_s = r.src.0 & 0xFFFF_FFFF;
4245 let src_l = (r.src.0 >> 32) as u32;
4246 node_label.insert((src_s, src_l), ());
4247 let dst_s = r.dst.0 & 0xFFFF_FFFF;
4248 let dst_l = (r.dst.0 >> 32) as u32;
4249 node_label.insert((dst_s, dst_l), ());
4250 }
4251 let mut all_label_ids: Vec<u32> = node_label.keys().map(|&(_, l)| l).collect();
4252 all_label_ids.sort_unstable();
4253 all_label_ids.dedup();
4254
4255 let mut visited: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4261 visited.insert((src_slot, src_label_id));
4262
4263 let mut results: std::collections::HashSet<(u64, u32)> = std::collections::HashSet::new();
4265 if min_hops == 0 {
4266 results.insert((src_slot, src_label_id));
4267 }
4268
4269 let mut frontier: Vec<(u64, u32)> = vec![(src_slot, src_label_id)];
4272
4273 for depth in 1..=max_hops {
4274 let mut next_frontier: Vec<(u64, u32)> = Vec::new();
4275 for &(node_slot, node_label_id) in &frontier {
4276 let neighbors = self.get_node_neighbors_labeled(
4277 node_slot,
4278 node_label_id,
4279 &delta_all,
4280 &node_label,
4281 &all_label_ids,
4282 );
4283 for (nb_slot, nb_label) in neighbors {
4284 if visited.insert((nb_slot, nb_label)) {
4287 next_frontier.push((nb_slot, nb_label));
4288 if depth >= min_hops {
4289 results.insert((nb_slot, nb_label));
4290 }
4291 }
4292 }
4293 }
4294 if next_frontier.is_empty() {
4295 break;
4296 }
4297 frontier = next_frontier;
4298 }
4299
4300 results.into_iter().collect()
4301 }
4302
4303 fn get_node_neighbors_by_slot(&self, src_slot: u64, src_label_id: u32) -> Vec<u64> {
4305 let csr_neighbors: Vec<u64> = self.csr_neighbors_all(src_slot);
4306 let delta_neighbors: Vec<u64> = self
4307 .read_delta_all()
4308 .into_iter()
4309 .filter(|r| {
4310 let r_src_label = (r.src.0 >> 32) as u32;
4311 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4312 r_src_label == src_label_id && r_src_slot == src_slot
4313 })
4314 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4315 .collect();
4316 let mut all: std::collections::HashSet<u64> = csr_neighbors.into_iter().collect();
4317 all.extend(delta_neighbors);
4318 all.into_iter().collect()
4319 }
4320
4321 fn execute_variable_length(
4323 &self,
4324 m: &MatchStatement,
4325 column_names: &[String],
4326 ) -> Result<QueryResult> {
4327 let pat = &m.pattern[0];
4328 let src_node_pat = &pat.nodes[0];
4329 let dst_node_pat = &pat.nodes[1];
4330 let rel_pat = &pat.rels[0];
4331
4332 if rel_pat.dir != sparrowdb_cypher::ast::EdgeDir::Outgoing {
4333 return Err(sparrowdb_common::Error::Unimplemented);
4334 }
4335
4336 let min_hops = rel_pat.min_hops.unwrap_or(1);
4337 let max_hops = rel_pat.max_hops.unwrap_or(10); let src_label = src_node_pat.labels.first().cloned().unwrap_or_default();
4340 let dst_label = dst_node_pat.labels.first().cloned().unwrap_or_default();
4341
4342 let src_label_id = self
4343 .catalog
4344 .get_label(&src_label)?
4345 .ok_or(sparrowdb_common::Error::NotFound)? as u32;
4346 let dst_label_id: Option<u32> = if dst_label.is_empty() {
4348 None
4349 } else {
4350 Some(
4351 self.catalog
4352 .get_label(&dst_label)?
4353 .ok_or(sparrowdb_common::Error::NotFound)? as u32,
4354 )
4355 };
4356
4357 let hwm_src = self.store.hwm_for_label(src_label_id)?;
4358
4359 let col_ids_src = collect_col_ids_for_var(&src_node_pat.var, column_names, src_label_id);
4360 let col_ids_dst =
4361 collect_col_ids_for_var(&dst_node_pat.var, column_names, dst_label_id.unwrap_or(0));
4362
4363 let dst_all_col_ids: Vec<u32> = {
4366 let mut v = col_ids_dst.clone();
4367 for p in &dst_node_pat.props {
4368 let col_id = prop_name_to_col_id(&p.key);
4369 if !v.contains(&col_id) {
4370 v.push(col_id);
4371 }
4372 }
4373 if let Some(ref where_expr) = m.where_clause {
4374 collect_col_ids_from_expr(where_expr, &mut v);
4375 }
4376 v
4377 };
4378
4379 let mut rows: Vec<Vec<Value>> = Vec::new();
4380 let mut seen_pairs: std::collections::HashSet<(u64, u64, u32)> =
4384 std::collections::HashSet::new();
4385
4386 for src_slot in 0..hwm_src {
4387 self.check_deadline()?;
4389
4390 let src_node = NodeId(((src_label_id as u64) << 32) | src_slot);
4391
4392 let src_all_col_ids: Vec<u32> = {
4394 let mut v = col_ids_src.clone();
4395 for p in &src_node_pat.props {
4396 let col_id = prop_name_to_col_id(&p.key);
4397 if !v.contains(&col_id) {
4398 v.push(col_id);
4399 }
4400 }
4401 if let Some(ref where_expr) = m.where_clause {
4402 collect_col_ids_from_expr(where_expr, &mut v);
4403 }
4404 v
4405 };
4406 let src_props = read_node_props(&self.store, src_node, &src_all_col_ids)?;
4407
4408 if !self.matches_prop_filter(&src_props, &src_node_pat.props) {
4409 continue;
4410 }
4411
4412 let dst_nodes = self.execute_variable_hops(src_slot, src_label_id, min_hops, max_hops);
4414
4415 for (dst_slot, actual_label_id) in dst_nodes {
4416 if let Some(required_label) = dst_label_id {
4419 if actual_label_id != required_label {
4420 continue;
4421 }
4422 }
4423
4424 let resolved_dst_label_id = dst_label_id.unwrap_or(actual_label_id);
4427
4428 if !seen_pairs.insert((src_slot, dst_slot, actual_label_id)) {
4429 continue;
4430 }
4431
4432 let dst_node = NodeId(((resolved_dst_label_id as u64) << 32) | dst_slot);
4433 let dst_props = read_node_props(&self.store, dst_node, &dst_all_col_ids)?;
4438
4439 if !self.matches_prop_filter(&dst_props, &dst_node_pat.props) {
4440 continue;
4441 }
4442
4443 if let Some(ref where_expr) = m.where_clause {
4445 let mut row_vals =
4446 build_row_vals(&src_props, &src_node_pat.var, &col_ids_src, &self.store);
4447 row_vals.extend(build_row_vals(
4448 &dst_props,
4449 &dst_node_pat.var,
4450 &col_ids_dst,
4451 &self.store,
4452 ));
4453 if !rel_pat.var.is_empty() {
4455 row_vals.insert(
4456 format!("{}.__type__", rel_pat.var),
4457 Value::String(rel_pat.rel_type.clone()),
4458 );
4459 }
4460 if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4462 row_vals.insert(
4463 format!("{}.__labels__", src_node_pat.var),
4464 Value::List(vec![Value::String(src_label.clone())]),
4465 );
4466 }
4467 if !dst_node_pat.var.is_empty() && !dst_label.is_empty() {
4468 row_vals.insert(
4469 format!("{}.__labels__", dst_node_pat.var),
4470 Value::List(vec![Value::String(dst_label.clone())]),
4471 );
4472 }
4473 row_vals.extend(self.dollar_params());
4474 if !self.eval_where_graph(where_expr, &row_vals) {
4475 continue;
4476 }
4477 }
4478
4479 let rel_var_type = if !rel_pat.var.is_empty() {
4480 Some((rel_pat.var.as_str(), rel_pat.rel_type.as_str()))
4481 } else {
4482 None
4483 };
4484 let src_label_meta = if !src_node_pat.var.is_empty() && !src_label.is_empty() {
4485 Some((src_node_pat.var.as_str(), src_label.as_str()))
4486 } else {
4487 None
4488 };
4489 let dst_label_meta = if !dst_node_pat.var.is_empty() && !dst_label.is_empty() {
4490 Some((dst_node_pat.var.as_str(), dst_label.as_str()))
4491 } else {
4492 None
4493 };
4494 let row = project_hop_row(
4495 &src_props,
4496 &dst_props,
4497 column_names,
4498 &src_node_pat.var,
4499 &dst_node_pat.var,
4500 rel_var_type,
4501 src_label_meta,
4502 dst_label_meta,
4503 &self.store,
4504 );
4505 rows.push(row);
4506 }
4507 }
4508
4509 if m.distinct {
4511 deduplicate_rows(&mut rows);
4512 }
4513
4514 apply_order_by(&mut rows, m, column_names);
4516
4517 if let Some(skip) = m.skip {
4519 let skip = (skip as usize).min(rows.len());
4520 rows.drain(0..skip);
4521 }
4522
4523 if let Some(lim) = m.limit {
4525 rows.truncate(lim as usize);
4526 }
4527
4528 tracing::debug!(
4529 rows = rows.len(),
4530 min_hops,
4531 max_hops,
4532 "variable-length traversal complete"
4533 );
4534 Ok(QueryResult {
4535 columns: column_names.to_vec(),
4536 rows,
4537 })
4538 }
4539
4540 fn matches_prop_filter(
4543 &self,
4544 props: &[(u32, u64)],
4545 filters: &[sparrowdb_cypher::ast::PropEntry],
4546 ) -> bool {
4547 matches_prop_filter_static(props, filters, &self.dollar_params(), &self.store)
4548 }
4549
4550 fn dollar_params(&self) -> HashMap<String, Value> {
4556 self.params
4557 .iter()
4558 .map(|(k, v)| (format!("${k}"), v.clone()))
4559 .collect()
4560 }
4561
4562 fn eval_expr_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> Value {
4566 match expr {
4567 Expr::ExistsSubquery(ep) => Value::Bool(self.eval_exists_subquery(ep, vals)),
4568 Expr::ShortestPath(sp) => self.eval_shortest_path_expr(sp, vals),
4569 Expr::CaseWhen {
4570 branches,
4571 else_expr,
4572 } => {
4573 for (cond, then_val) in branches {
4574 if let Value::Bool(true) = self.eval_expr_graph(cond, vals) {
4575 return self.eval_expr_graph(then_val, vals);
4576 }
4577 }
4578 else_expr
4579 .as_ref()
4580 .map(|e| self.eval_expr_graph(e, vals))
4581 .unwrap_or(Value::Null)
4582 }
4583 Expr::And(l, r) => {
4584 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4585 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
4586 _ => Value::Null,
4587 }
4588 }
4589 Expr::Or(l, r) => {
4590 match (self.eval_expr_graph(l, vals), self.eval_expr_graph(r, vals)) {
4591 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
4592 _ => Value::Null,
4593 }
4594 }
4595 Expr::Not(inner) => match self.eval_expr_graph(inner, vals) {
4596 Value::Bool(b) => Value::Bool(!b),
4597 _ => Value::Null,
4598 },
4599 Expr::PropAccess { var, prop } => {
4602 let normal = eval_expr(expr, vals);
4604 if !matches!(normal, Value::Null) {
4605 return normal;
4606 }
4607 if let Some(Value::NodeRef(node_id)) = vals
4609 .get(var.as_str())
4610 .or_else(|| vals.get(&format!("{var}.__node_id__")))
4611 {
4612 let col_id = prop_name_to_col_id(prop);
4613 if let Ok(props) = self.store.get_node_raw(*node_id, &[col_id]) {
4614 if let Some(&(_, raw)) = props.iter().find(|(c, _)| *c == col_id) {
4615 return decode_raw_val(raw, &self.store);
4616 }
4617 }
4618 }
4619 Value::Null
4620 }
4621 _ => eval_expr(expr, vals),
4622 }
4623 }
4624
4625 fn eval_where_graph(&self, expr: &Expr, vals: &HashMap<String, Value>) -> bool {
4627 match self.eval_expr_graph(expr, vals) {
4628 Value::Bool(b) => b,
4629 _ => eval_where(expr, vals),
4630 }
4631 }
4632
4633 fn eval_exists_subquery(
4635 &self,
4636 ep: &sparrowdb_cypher::ast::ExistsPattern,
4637 vals: &HashMap<String, Value>,
4638 ) -> bool {
4639 let path = &ep.path;
4640 if path.nodes.len() < 2 || path.rels.is_empty() {
4641 return false;
4642 }
4643 let src_pat = &path.nodes[0];
4644 let dst_pat = &path.nodes[1];
4645 let rel_pat = &path.rels[0];
4646
4647 let src_node_id = match self.resolve_node_id_from_var(&src_pat.var, vals) {
4648 Some(id) => id,
4649 None => return false,
4650 };
4651 let src_slot = src_node_id.0 & 0xFFFF_FFFF;
4652 let src_label_id = (src_node_id.0 >> 32) as u32;
4653
4654 let dst_label = dst_pat.labels.first().map(String::as_str).unwrap_or("");
4655 let dst_label_id_opt: Option<u32> = if dst_label.is_empty() {
4656 None
4657 } else {
4658 self.catalog
4659 .get_label(dst_label)
4660 .ok()
4661 .flatten()
4662 .map(|id| id as u32)
4663 };
4664
4665 let rel_lookup = if let Some(dst_lid) = dst_label_id_opt {
4666 self.resolve_rel_table_id(src_label_id, dst_lid, &rel_pat.rel_type)
4667 } else {
4668 RelTableLookup::All
4669 };
4670
4671 let csr_nb: Vec<u64> = match rel_lookup {
4672 RelTableLookup::Found(rtid) => self.csr_neighbors(rtid, src_slot),
4673 RelTableLookup::NotFound => return false,
4674 RelTableLookup::All => self.csr_neighbors_all(src_slot),
4675 };
4676 let delta_nb: Vec<u64> = self
4677 .read_delta_all()
4678 .into_iter()
4679 .filter(|r| {
4680 let r_src_label = (r.src.0 >> 32) as u32;
4681 let r_src_slot = r.src.0 & 0xFFFF_FFFF;
4682 if r_src_label != src_label_id || r_src_slot != src_slot {
4683 return false;
4684 }
4685 if let Some(dst_lid) = dst_label_id_opt {
4689 let r_dst_label = (r.dst.0 >> 32) as u32;
4690 r_dst_label == dst_lid
4691 } else {
4692 true
4693 }
4694 })
4695 .map(|r| r.dst.0 & 0xFFFF_FFFF)
4696 .collect();
4697
4698 let all_nb: std::collections::HashSet<u64> = csr_nb.into_iter().chain(delta_nb).collect();
4699
4700 for dst_slot in all_nb {
4701 if let Some(did) = dst_label_id_opt {
4702 let probe_id = NodeId(((did as u64) << 32) | dst_slot);
4703 if self.store.get_node_raw(probe_id, &[]).is_err() {
4704 continue;
4705 }
4706 if !dst_pat.props.is_empty() {
4707 let col_ids: Vec<u32> = dst_pat
4708 .props
4709 .iter()
4710 .map(|p| prop_name_to_col_id(&p.key))
4711 .collect();
4712 match self.store.get_node_raw(probe_id, &col_ids) {
4713 Ok(props) => {
4714 let params = self.dollar_params();
4715 if !matches_prop_filter_static(
4716 &props,
4717 &dst_pat.props,
4718 ¶ms,
4719 &self.store,
4720 ) {
4721 continue;
4722 }
4723 }
4724 Err(_) => continue,
4725 }
4726 }
4727 }
4728 return true;
4729 }
4730 false
4731 }
4732
4733 fn resolve_node_id_from_var(&self, var: &str, vals: &HashMap<String, Value>) -> Option<NodeId> {
4735 let id_key = format!("{var}.__node_id__");
4736 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
4737 return Some(*nid);
4738 }
4739 if let Some(Value::NodeRef(nid)) = vals.get(var) {
4740 return Some(*nid);
4741 }
4742 None
4743 }
4744
4745 fn eval_shortest_path_expr(
4747 &self,
4748 sp: &sparrowdb_cypher::ast::ShortestPathExpr,
4749 vals: &HashMap<String, Value>,
4750 ) -> Value {
4751 let (src_label_id, src_slot) =
4756 if let Some(nid) = self.resolve_node_id_from_var(&sp.src_var, vals) {
4757 let label_id = (nid.0 >> 32) as u32;
4758 let slot = nid.0 & 0xFFFF_FFFF;
4759 (label_id, slot)
4760 } else {
4761 let label_id = match self.catalog.get_label(&sp.src_label) {
4763 Ok(Some(id)) => id as u32,
4764 _ => return Value::Null,
4765 };
4766 match self.find_node_by_props(label_id, &sp.src_props) {
4767 Some(slot) => (label_id, slot),
4768 None => return Value::Null,
4769 }
4770 };
4771
4772 let dst_slot = if let Some(nid) = self.resolve_node_id_from_var(&sp.dst_var, vals) {
4773 nid.0 & 0xFFFF_FFFF
4774 } else {
4775 let dst_label_id = match self.catalog.get_label(&sp.dst_label) {
4776 Ok(Some(id)) => id as u32,
4777 _ => return Value::Null,
4778 };
4779 match self.find_node_by_props(dst_label_id, &sp.dst_props) {
4780 Some(slot) => slot,
4781 None => return Value::Null,
4782 }
4783 };
4784
4785 match self.bfs_shortest_path(src_slot, src_label_id, dst_slot, 10) {
4786 Some(hops) => Value::Int64(hops as i64),
4787 None => Value::Null,
4788 }
4789 }
4790
4791 fn find_node_by_props(
4793 &self,
4794 label_id: u32,
4795 props: &[sparrowdb_cypher::ast::PropEntry],
4796 ) -> Option<u64> {
4797 if props.is_empty() {
4798 return None;
4799 }
4800 let hwm = self.store.hwm_for_label(label_id).ok()?;
4801 let col_ids: Vec<u32> = props.iter().map(|p| prop_name_to_col_id(&p.key)).collect();
4802 let params = self.dollar_params();
4803 for slot in 0..hwm {
4804 let node_id = NodeId(((label_id as u64) << 32) | slot);
4805 if let Ok(raw_props) = self.store.get_node_raw(node_id, &col_ids) {
4806 if matches_prop_filter_static(&raw_props, props, ¶ms, &self.store) {
4807 return Some(slot);
4808 }
4809 }
4810 }
4811 None
4812 }
4813
4814 fn bfs_shortest_path(
4823 &self,
4824 src_slot: u64,
4825 src_label_id: u32,
4826 dst_slot: u64,
4827 max_hops: u32,
4828 ) -> Option<u32> {
4829 if src_slot == dst_slot {
4830 return Some(0);
4831 }
4832 let mut visited: std::collections::HashSet<u64> = std::collections::HashSet::new();
4833 visited.insert(src_slot);
4834 let mut frontier: Vec<u64> = vec![src_slot];
4835
4836 for depth in 1..=max_hops {
4837 let mut next_frontier: Vec<u64> = Vec::new();
4838 for &node_slot in &frontier {
4839 let neighbors = self.get_node_neighbors_by_slot(node_slot, src_label_id);
4840 for nb in neighbors {
4841 if nb == dst_slot {
4842 return Some(depth);
4843 }
4844 if visited.insert(nb) {
4845 next_frontier.push(nb);
4846 }
4847 }
4848 }
4849 if next_frontier.is_empty() {
4850 break;
4851 }
4852 frontier = next_frontier;
4853 }
4854 None
4855 }
4856
4857 fn aggregate_rows_graph(
4860 &self,
4861 rows: &[HashMap<String, Value>],
4862 return_items: &[ReturnItem],
4863 ) -> Vec<Vec<Value>> {
4864 let needs_graph = return_items.iter().any(|item| expr_needs_graph(&item.expr));
4866 if !needs_graph {
4867 return aggregate_rows(rows, return_items);
4868 }
4869 rows.iter()
4871 .map(|row_vals| {
4872 return_items
4873 .iter()
4874 .map(|item| self.eval_expr_graph(&item.expr, row_vals))
4875 .collect()
4876 })
4877 .collect()
4878 }
4879}
4880
4881fn matches_prop_filter_static(
4884 props: &[(u32, u64)],
4885 filters: &[sparrowdb_cypher::ast::PropEntry],
4886 params: &HashMap<String, Value>,
4887 store: &NodeStore,
4888) -> bool {
4889 for f in filters {
4890 let col_id = prop_name_to_col_id(&f.key);
4891 let stored_val = props.iter().find(|(c, _)| *c == col_id).map(|(_, v)| *v);
4892
4893 let filter_val = eval_expr(&f.value, params);
4896 let matches = match filter_val {
4897 Value::Int64(n) => {
4898 stored_val == Some(StoreValue::Int64(n).to_u64())
4901 }
4902 Value::Bool(b) => {
4903 let expected = StoreValue::Int64(if b { 1 } else { 0 }).to_u64();
4906 stored_val == Some(expected)
4907 }
4908 Value::String(s) => {
4909 stored_val.is_some_and(|raw| store.raw_str_matches(raw, &s))
4912 }
4913 Value::Float64(f) => {
4914 stored_val.is_some_and(|raw| {
4917 matches!(store.decode_raw_value(raw), StoreValue::Float(stored_f) if stored_f == f)
4918 })
4919 }
4920 Value::Null => true, _ => false,
4922 };
4923 if !matches {
4924 return false;
4925 }
4926 }
4927 true
4928}
4929
4930fn eval_list_expr(expr: &Expr, params: &HashMap<String, Value>) -> Result<Vec<Value>> {
4939 match expr {
4940 Expr::List(elems) => {
4941 let mut values = Vec::with_capacity(elems.len());
4942 for elem in elems {
4943 values.push(eval_scalar_expr(elem));
4944 }
4945 Ok(values)
4946 }
4947 Expr::Literal(Literal::Param(name)) => {
4948 match params.get(name) {
4950 Some(Value::List(items)) => Ok(items.clone()),
4951 Some(other) => {
4952 Ok(vec![other.clone()])
4955 }
4956 None => {
4957 Ok(vec![])
4959 }
4960 }
4961 }
4962 Expr::FnCall { name, args } => {
4963 let name_lc = name.to_lowercase();
4966 if name_lc == "range" {
4967 let empty_vals: std::collections::HashMap<String, Value> =
4968 std::collections::HashMap::new();
4969 let evaluated: Vec<Value> =
4970 args.iter().map(|a| eval_expr(a, &empty_vals)).collect();
4971 let start = match evaluated.first() {
4973 Some(Value::Int64(n)) => *n,
4974 _ => {
4975 return Err(sparrowdb_common::Error::InvalidArgument(
4976 "range() expects integer arguments".into(),
4977 ))
4978 }
4979 };
4980 let end = match evaluated.get(1) {
4981 Some(Value::Int64(n)) => *n,
4982 _ => {
4983 return Err(sparrowdb_common::Error::InvalidArgument(
4984 "range() expects at least 2 integer arguments".into(),
4985 ))
4986 }
4987 };
4988 let step: i64 = match evaluated.get(2) {
4989 Some(Value::Int64(n)) => *n,
4990 None => 1,
4991 _ => 1,
4992 };
4993 if step == 0 {
4994 return Err(sparrowdb_common::Error::InvalidArgument(
4995 "range(): step must not be zero".into(),
4996 ));
4997 }
4998 let mut values = Vec::new();
4999 if step > 0 {
5000 let mut i = start;
5001 while i <= end {
5002 values.push(Value::Int64(i));
5003 i += step;
5004 }
5005 } else {
5006 let mut i = start;
5007 while i >= end {
5008 values.push(Value::Int64(i));
5009 i += step;
5010 }
5011 }
5012 Ok(values)
5013 } else {
5014 Err(sparrowdb_common::Error::InvalidArgument(format!(
5016 "UNWIND: function '{name}' does not return a list"
5017 )))
5018 }
5019 }
5020 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
5021 "UNWIND expression is not a list: {:?}",
5022 other
5023 ))),
5024 }
5025}
5026
5027fn eval_scalar_expr(expr: &Expr) -> Value {
5029 match expr {
5030 Expr::Literal(lit) => match lit {
5031 Literal::Int(n) => Value::Int64(*n),
5032 Literal::Float(f) => Value::Float64(*f),
5033 Literal::Bool(b) => Value::Bool(*b),
5034 Literal::String(s) => Value::String(s.clone()),
5035 Literal::Null => Value::Null,
5036 Literal::Param(_) => Value::Null,
5037 },
5038 _ => Value::Null,
5039 }
5040}
5041
5042fn extract_return_column_names(items: &[ReturnItem]) -> Vec<String> {
5043 items
5044 .iter()
5045 .map(|item| match &item.alias {
5046 Some(alias) => alias.clone(),
5047 None => match &item.expr {
5048 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5049 Expr::Var(v) => v.clone(),
5050 Expr::CountStar => "count(*)".to_string(),
5051 Expr::FnCall { name, args } => {
5052 let arg_str = args
5053 .first()
5054 .map(|a| match a {
5055 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
5056 Expr::Var(v) => v.clone(),
5057 _ => "*".to_string(),
5058 })
5059 .unwrap_or_else(|| "*".to_string());
5060 format!("{}({})", name.to_lowercase(), arg_str)
5061 }
5062 _ => "?".to_string(),
5063 },
5064 })
5065 .collect()
5066}
5067
5068fn collect_col_ids_from_expr_for_var(expr: &Expr, target_var: &str, out: &mut Vec<u32>) {
5075 match expr {
5076 Expr::PropAccess { var, prop } => {
5077 if var == target_var {
5078 let col_id = prop_name_to_col_id(prop);
5079 if !out.contains(&col_id) {
5080 out.push(col_id);
5081 }
5082 }
5083 }
5084 Expr::BinOp { left, right, .. } => {
5085 collect_col_ids_from_expr_for_var(left, target_var, out);
5086 collect_col_ids_from_expr_for_var(right, target_var, out);
5087 }
5088 Expr::And(l, r) | Expr::Or(l, r) => {
5089 collect_col_ids_from_expr_for_var(l, target_var, out);
5090 collect_col_ids_from_expr_for_var(r, target_var, out);
5091 }
5092 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5093 collect_col_ids_from_expr_for_var(inner, target_var, out);
5094 }
5095 Expr::InList { expr, list, .. } => {
5096 collect_col_ids_from_expr_for_var(expr, target_var, out);
5097 for item in list {
5098 collect_col_ids_from_expr_for_var(item, target_var, out);
5099 }
5100 }
5101 Expr::FnCall { args, .. } | Expr::List(args) => {
5102 for arg in args {
5103 collect_col_ids_from_expr_for_var(arg, target_var, out);
5104 }
5105 }
5106 Expr::ListPredicate {
5107 list_expr,
5108 predicate,
5109 ..
5110 } => {
5111 collect_col_ids_from_expr_for_var(list_expr, target_var, out);
5112 collect_col_ids_from_expr_for_var(predicate, target_var, out);
5113 }
5114 Expr::CaseWhen {
5116 branches,
5117 else_expr,
5118 } => {
5119 for (cond, then_val) in branches {
5120 collect_col_ids_from_expr_for_var(cond, target_var, out);
5121 collect_col_ids_from_expr_for_var(then_val, target_var, out);
5122 }
5123 if let Some(e) = else_expr {
5124 collect_col_ids_from_expr_for_var(e, target_var, out);
5125 }
5126 }
5127 _ => {}
5128 }
5129}
5130
5131fn collect_col_ids_from_expr(expr: &Expr, out: &mut Vec<u32>) {
5136 match expr {
5137 Expr::PropAccess { prop, .. } => {
5138 let col_id = prop_name_to_col_id(prop);
5139 if !out.contains(&col_id) {
5140 out.push(col_id);
5141 }
5142 }
5143 Expr::BinOp { left, right, .. } => {
5144 collect_col_ids_from_expr(left, out);
5145 collect_col_ids_from_expr(right, out);
5146 }
5147 Expr::And(l, r) | Expr::Or(l, r) => {
5148 collect_col_ids_from_expr(l, out);
5149 collect_col_ids_from_expr(r, out);
5150 }
5151 Expr::Not(inner) => collect_col_ids_from_expr(inner, out),
5152 Expr::InList { expr, list, .. } => {
5153 collect_col_ids_from_expr(expr, out);
5154 for item in list {
5155 collect_col_ids_from_expr(item, out);
5156 }
5157 }
5158 Expr::FnCall { args, .. } => {
5160 for arg in args {
5161 collect_col_ids_from_expr(arg, out);
5162 }
5163 }
5164 Expr::ListPredicate {
5165 list_expr,
5166 predicate,
5167 ..
5168 } => {
5169 collect_col_ids_from_expr(list_expr, out);
5170 collect_col_ids_from_expr(predicate, out);
5171 }
5172 Expr::List(items) => {
5174 for item in items {
5175 collect_col_ids_from_expr(item, out);
5176 }
5177 }
5178 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
5179 collect_col_ids_from_expr(inner, out);
5180 }
5181 Expr::CaseWhen {
5183 branches,
5184 else_expr,
5185 } => {
5186 for (cond, then_val) in branches {
5187 collect_col_ids_from_expr(cond, out);
5188 collect_col_ids_from_expr(then_val, out);
5189 }
5190 if let Some(e) = else_expr {
5191 collect_col_ids_from_expr(e, out);
5192 }
5193 }
5194 _ => {}
5195 }
5196}
5197
5198#[allow(dead_code)]
5203fn literal_to_store_value(lit: &Literal) -> StoreValue {
5204 match lit {
5205 Literal::Int(n) => StoreValue::Int64(*n),
5206 Literal::String(s) => StoreValue::Bytes(s.as_bytes().to_vec()),
5207 Literal::Float(f) => StoreValue::Float(*f),
5208 Literal::Bool(b) => StoreValue::Int64(if *b { 1 } else { 0 }),
5209 Literal::Null | Literal::Param(_) => StoreValue::Int64(0),
5210 }
5211}
5212
5213fn value_to_store_value(val: Value) -> StoreValue {
5218 match val {
5219 Value::Int64(n) => StoreValue::Int64(n),
5220 Value::Float64(f) => StoreValue::Float(f),
5221 Value::Bool(b) => StoreValue::Int64(if b { 1 } else { 0 }),
5222 Value::String(s) => StoreValue::Bytes(s.into_bytes()),
5223 Value::Null => StoreValue::Int64(0),
5224 Value::NodeRef(id) => StoreValue::Int64(id.0 as i64),
5225 Value::EdgeRef(id) => StoreValue::Int64(id.0 as i64),
5226 Value::List(_) => StoreValue::Int64(0),
5227 Value::Map(_) => StoreValue::Int64(0),
5228 }
5229}
5230
5231fn string_to_raw_u64(s: &str) -> u64 {
5237 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5238}
5239
5240fn try_index_lookup_for_props(
5251 props: &[sparrowdb_cypher::ast::PropEntry],
5252 label_id: u32,
5253 prop_index: &sparrowdb_storage::property_index::PropertyIndex,
5254) -> Option<Vec<u32>> {
5255 if props.len() != 1 {
5257 return None;
5258 }
5259 let filter = &props[0];
5260
5261 let raw_value: u64 = match &filter.value {
5263 Expr::Literal(Literal::Int(n)) => StoreValue::Int64(*n).to_u64(),
5264 Expr::Literal(Literal::String(s)) if s.len() <= 7 => {
5265 StoreValue::Bytes(s.as_bytes().to_vec()).to_u64()
5266 }
5267 _ => return None,
5270 };
5271
5272 let col_id = prop_name_to_col_id(&filter.key);
5273 if !prop_index.is_indexed(label_id, col_id) {
5274 return None;
5275 }
5276 Some(prop_index.lookup(label_id, col_id, raw_value).to_vec())
5277}
5278
5279fn try_text_index_lookup(
5292 expr: &Expr,
5293 node_var: &str,
5294 label_id: u32,
5295 text_index: &TextIndex,
5296) -> Option<Vec<u32>> {
5297 let (left, op, right) = match expr {
5298 Expr::BinOp { left, op, right }
5299 if matches!(op, BinOpKind::Contains | BinOpKind::StartsWith) =>
5300 {
5301 (left.as_ref(), op, right.as_ref())
5302 }
5303 _ => return None,
5304 };
5305
5306 let prop_name = match left {
5308 Expr::PropAccess { var, prop } if var.as_str() == node_var => prop.as_str(),
5309 _ => return None,
5310 };
5311
5312 let pattern = match right {
5314 Expr::Literal(Literal::String(s)) => s.as_str(),
5315 _ => return None,
5316 };
5317
5318 let col_id = prop_name_to_col_id(prop_name);
5319 if !text_index.is_indexed(label_id, col_id) {
5320 return None;
5321 }
5322
5323 let slots = match op {
5324 BinOpKind::Contains => text_index.lookup_contains(label_id, col_id, pattern),
5325 BinOpKind::StartsWith => text_index.lookup_starts_with(label_id, col_id, pattern),
5326 _ => return None,
5327 };
5328
5329 Some(slots)
5330}
5331
5332fn prop_name_to_col_id(name: &str) -> u32 {
5353 col_id_of(name)
5354}
5355
5356fn collect_col_ids_from_columns(column_names: &[String]) -> Vec<u32> {
5357 let mut ids = Vec::new();
5358 for name in column_names {
5359 let prop = name.split('.').next_back().unwrap_or(name.as_str());
5361 let col_id = prop_name_to_col_id(prop);
5362 if !ids.contains(&col_id) {
5363 ids.push(col_id);
5364 }
5365 }
5366 ids
5367}
5368
5369fn collect_col_ids_for_var(var: &str, column_names: &[String], _label_id: u32) -> Vec<u32> {
5370 let mut ids = Vec::new();
5371 for name in column_names {
5372 if let Some((v, prop)) = name.split_once('.') {
5374 if v == var {
5375 let col_id = prop_name_to_col_id(prop);
5376 if !ids.contains(&col_id) {
5377 ids.push(col_id);
5378 }
5379 }
5380 } else {
5381 let col_id = prop_name_to_col_id(name.as_str());
5383 if !ids.contains(&col_id) {
5384 ids.push(col_id);
5385 }
5386 }
5387 }
5388 if ids.is_empty() {
5389 ids.push(0);
5391 }
5392 ids
5393}
5394
5395fn read_node_props(
5407 store: &NodeStore,
5408 node_id: NodeId,
5409 col_ids: &[u32],
5410) -> sparrowdb_common::Result<Vec<(u32, u64)>> {
5411 if col_ids.is_empty() {
5412 return Ok(vec![]);
5413 }
5414 let nullable = store.get_node_raw_nullable(node_id, col_ids)?;
5415 Ok(nullable
5416 .into_iter()
5417 .filter_map(|(col_id, opt): (u32, Option<u64>)| opt.map(|v| (col_id, v)))
5418 .collect())
5419}
5420
5421fn decode_raw_val(raw: u64, store: &NodeStore) -> Value {
5428 match store.decode_raw_value(raw) {
5429 StoreValue::Int64(n) => Value::Int64(n),
5430 StoreValue::Bytes(b) => Value::String(String::from_utf8_lossy(&b).into_owned()),
5431 StoreValue::Float(f) => Value::Float64(f),
5432 }
5433}
5434
5435fn build_row_vals(
5436 props: &[(u32, u64)],
5437 var_name: &str,
5438 _col_ids: &[u32],
5439 store: &NodeStore,
5440) -> HashMap<String, Value> {
5441 let mut map = HashMap::new();
5442 for &(col_id, raw) in props {
5443 let key = format!("{var_name}.col_{col_id}");
5444 map.insert(key, decode_raw_val(raw, store));
5445 }
5446 map
5447}
5448
/// Returns `true` for labels in the `__SO_` namespace, which this executor
/// treats as reserved.
#[inline]
fn is_reserved_label(label: &str) -> bool {
    label.strip_prefix("__SO_").is_some()
}
5458
5459fn values_equal(a: &Value, b: &Value) -> bool {
5467 match (a, b) {
5468 (Value::Int64(x), Value::Int64(y)) => x == y,
5470 (Value::String(x), Value::String(y)) => x == y,
5476 (Value::Bool(x), Value::Bool(y)) => x == y,
5477 (Value::Float64(x), Value::Float64(y)) => x == y,
5478 (Value::Int64(raw), Value::String(s)) => *raw as u64 == string_to_raw_u64(s),
5482 (Value::String(s), Value::Int64(raw)) => string_to_raw_u64(s) == *raw as u64,
5483 (Value::Null, Value::Null) => true,
5485 _ => false,
5486 }
5487}
5488
5489fn eval_where(expr: &Expr, vals: &HashMap<String, Value>) -> bool {
5490 match expr {
5491 Expr::BinOp { left, op, right } => {
5492 let lv = eval_expr(left, vals);
5493 let rv = eval_expr(right, vals);
5494 match op {
5495 BinOpKind::Eq => values_equal(&lv, &rv),
5496 BinOpKind::Neq => !values_equal(&lv, &rv),
5497 BinOpKind::Contains => lv.contains(&rv),
5498 BinOpKind::StartsWith => {
5499 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.starts_with(r.as_str()))
5500 }
5501 BinOpKind::EndsWith => {
5502 matches!((&lv, &rv), (Value::String(l), Value::String(r)) if l.ends_with(r.as_str()))
5503 }
5504 BinOpKind::Lt => match (&lv, &rv) {
5505 (Value::Int64(a), Value::Int64(b)) => a < b,
5506 _ => false,
5507 },
5508 BinOpKind::Le => match (&lv, &rv) {
5509 (Value::Int64(a), Value::Int64(b)) => a <= b,
5510 _ => false,
5511 },
5512 BinOpKind::Gt => match (&lv, &rv) {
5513 (Value::Int64(a), Value::Int64(b)) => a > b,
5514 _ => false,
5515 },
5516 BinOpKind::Ge => match (&lv, &rv) {
5517 (Value::Int64(a), Value::Int64(b)) => a >= b,
5518 _ => false,
5519 },
5520 _ => false,
5521 }
5522 }
5523 Expr::And(l, r) => eval_where(l, vals) && eval_where(r, vals),
5524 Expr::Or(l, r) => eval_where(l, vals) || eval_where(r, vals),
5525 Expr::Not(inner) => !eval_where(inner, vals),
5526 Expr::Literal(Literal::Bool(b)) => *b,
5527 Expr::Literal(_) => false,
5528 Expr::InList {
5529 expr,
5530 list,
5531 negated,
5532 } => {
5533 let lv = eval_expr(expr, vals);
5534 let matched = list
5535 .iter()
5536 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
5537 if *negated {
5538 !matched
5539 } else {
5540 matched
5541 }
5542 }
5543 Expr::ListPredicate { .. } => {
5544 match eval_expr(expr, vals) {
5546 Value::Bool(b) => b,
5547 _ => false,
5548 }
5549 }
5550 Expr::IsNull(inner) => matches!(eval_expr(inner, vals), Value::Null),
5551 Expr::IsNotNull(inner) => !matches!(eval_expr(inner, vals), Value::Null),
5552 Expr::CaseWhen { .. } => matches!(eval_expr(expr, vals), Value::Bool(true)),
5554 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
5557 false
5558 }
5559 _ => false, }
5561}
5562
5563fn eval_expr(expr: &Expr, vals: &HashMap<String, Value>) -> Value {
5564 match expr {
5565 Expr::PropAccess { var, prop } => {
5566 let key = format!("{var}.{prop}");
5568 if let Some(v) = vals.get(&key) {
5569 return v.clone();
5570 }
5571 let col_id = prop_name_to_col_id(prop);
5575 let fallback_key = format!("{var}.col_{col_id}");
5576 vals.get(&fallback_key).cloned().unwrap_or(Value::Null)
5577 }
5578 Expr::Var(v) => vals.get(v.as_str()).cloned().unwrap_or(Value::Null),
5579 Expr::Literal(lit) => match lit {
5580 Literal::Int(n) => Value::Int64(*n),
5581 Literal::Float(f) => Value::Float64(*f),
5582 Literal::Bool(b) => Value::Bool(*b),
5583 Literal::String(s) => Value::String(s.clone()),
5584 Literal::Param(p) => {
5585 vals.get(&format!("${p}")).cloned().unwrap_or(Value::Null)
5588 }
5589 Literal::Null => Value::Null,
5590 },
5591 Expr::FnCall { name, args } => {
5592 let name_lc = name.to_lowercase();
5596 if name_lc == "type" {
5597 if let Some(Expr::Var(var_name)) = args.first() {
5598 let meta_key = format!("{}.__type__", var_name);
5599 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5600 }
5601 }
5602 if name_lc == "labels" {
5603 if let Some(Expr::Var(var_name)) = args.first() {
5604 let meta_key = format!("{}.__labels__", var_name);
5605 return vals.get(&meta_key).cloned().unwrap_or(Value::Null);
5606 }
5607 }
5608 if name_lc == "id" {
5611 if let Some(Expr::Var(var_name)) = args.first() {
5612 let id_key = format!("{}.__node_id__", var_name);
5614 if let Some(Value::NodeRef(nid)) = vals.get(&id_key) {
5615 return Value::Int64(nid.0 as i64);
5616 }
5617 if let Some(Value::NodeRef(nid)) = vals.get(var_name.as_str()) {
5619 return Value::Int64(nid.0 as i64);
5620 }
5621 return Value::Null;
5622 }
5623 }
5624 let evaluated: Vec<Value> = args.iter().map(|a| eval_expr(a, vals)).collect();
5626 crate::functions::dispatch_function(name, evaluated).unwrap_or(Value::Null)
5627 }
5628 Expr::BinOp { left, op, right } => {
5629 let lv = eval_expr(left, vals);
5631 let rv = eval_expr(right, vals);
5632 match op {
5633 BinOpKind::Eq => Value::Bool(lv == rv),
5634 BinOpKind::Neq => Value::Bool(lv != rv),
5635 BinOpKind::Lt => match (&lv, &rv) {
5636 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a < b),
5637 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a < b),
5638 _ => Value::Null,
5639 },
5640 BinOpKind::Le => match (&lv, &rv) {
5641 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a <= b),
5642 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a <= b),
5643 _ => Value::Null,
5644 },
5645 BinOpKind::Gt => match (&lv, &rv) {
5646 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a > b),
5647 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a > b),
5648 _ => Value::Null,
5649 },
5650 BinOpKind::Ge => match (&lv, &rv) {
5651 (Value::Int64(a), Value::Int64(b)) => Value::Bool(a >= b),
5652 (Value::Float64(a), Value::Float64(b)) => Value::Bool(a >= b),
5653 _ => Value::Null,
5654 },
5655 BinOpKind::Contains => match (&lv, &rv) {
5656 (Value::String(l), Value::String(r)) => Value::Bool(l.contains(r.as_str())),
5657 _ => Value::Null,
5658 },
5659 BinOpKind::StartsWith => match (&lv, &rv) {
5660 (Value::String(l), Value::String(r)) => Value::Bool(l.starts_with(r.as_str())),
5661 _ => Value::Null,
5662 },
5663 BinOpKind::EndsWith => match (&lv, &rv) {
5664 (Value::String(l), Value::String(r)) => Value::Bool(l.ends_with(r.as_str())),
5665 _ => Value::Null,
5666 },
5667 BinOpKind::And => match (&lv, &rv) {
5668 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a && *b),
5669 _ => Value::Null,
5670 },
5671 BinOpKind::Or => match (&lv, &rv) {
5672 (Value::Bool(a), Value::Bool(b)) => Value::Bool(*a || *b),
5673 _ => Value::Null,
5674 },
5675 BinOpKind::Add => match (&lv, &rv) {
5676 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a + b),
5677 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a + b),
5678 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 + b),
5679 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a + *b as f64),
5680 (Value::String(a), Value::String(b)) => Value::String(format!("{a}{b}")),
5681 _ => Value::Null,
5682 },
5683 BinOpKind::Sub => match (&lv, &rv) {
5684 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a - b),
5685 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a - b),
5686 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 - b),
5687 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a - *b as f64),
5688 _ => Value::Null,
5689 },
5690 BinOpKind::Mul => match (&lv, &rv) {
5691 (Value::Int64(a), Value::Int64(b)) => Value::Int64(a * b),
5692 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a * b),
5693 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 * b),
5694 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a * *b as f64),
5695 _ => Value::Null,
5696 },
5697 BinOpKind::Div => match (&lv, &rv) {
5698 (Value::Int64(a), Value::Int64(b)) => {
5699 if *b == 0 {
5700 Value::Null
5701 } else {
5702 Value::Int64(a / b)
5703 }
5704 }
5705 (Value::Float64(a), Value::Float64(b)) => Value::Float64(a / b),
5706 (Value::Int64(a), Value::Float64(b)) => Value::Float64(*a as f64 / b),
5707 (Value::Float64(a), Value::Int64(b)) => Value::Float64(a / *b as f64),
5708 _ => Value::Null,
5709 },
5710 BinOpKind::Mod => match (&lv, &rv) {
5711 (Value::Int64(a), Value::Int64(b)) => {
5712 if *b == 0 {
5713 Value::Null
5714 } else {
5715 Value::Int64(a % b)
5716 }
5717 }
5718 _ => Value::Null,
5719 },
5720 }
5721 }
5722 Expr::Not(inner) => match eval_expr(inner, vals) {
5723 Value::Bool(b) => Value::Bool(!b),
5724 _ => Value::Null,
5725 },
5726 Expr::And(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
5727 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a && b),
5728 _ => Value::Null,
5729 },
5730 Expr::Or(l, r) => match (eval_expr(l, vals), eval_expr(r, vals)) {
5731 (Value::Bool(a), Value::Bool(b)) => Value::Bool(a || b),
5732 _ => Value::Null,
5733 },
5734 Expr::InList {
5735 expr,
5736 list,
5737 negated,
5738 } => {
5739 let lv = eval_expr(expr, vals);
5740 let matched = list
5741 .iter()
5742 .any(|item| values_equal(&lv, &eval_expr(item, vals)));
5743 Value::Bool(if *negated { !matched } else { matched })
5744 }
5745 Expr::List(items) => {
5746 let evaluated: Vec<Value> = items.iter().map(|e| eval_expr(e, vals)).collect();
5747 Value::List(evaluated)
5748 }
5749 Expr::ListPredicate {
5750 kind,
5751 variable,
5752 list_expr,
5753 predicate,
5754 } => {
5755 let list_val = eval_expr(list_expr, vals);
5756 let items = match list_val {
5757 Value::List(v) => v,
5758 _ => return Value::Null,
5759 };
5760 let mut satisfied_count = 0usize;
5761 let mut scope = vals.clone();
5764 for item in &items {
5765 scope.insert(variable.clone(), item.clone());
5766 let result = eval_expr(predicate, &scope);
5767 if result == Value::Bool(true) {
5768 satisfied_count += 1;
5769 }
5770 }
5771 let result = match kind {
5772 ListPredicateKind::Any => satisfied_count > 0,
5773 ListPredicateKind::All => satisfied_count == items.len(),
5774 ListPredicateKind::None => satisfied_count == 0,
5775 ListPredicateKind::Single => satisfied_count == 1,
5776 };
5777 Value::Bool(result)
5778 }
5779 Expr::IsNull(inner) => Value::Bool(matches!(eval_expr(inner, vals), Value::Null)),
5780 Expr::IsNotNull(inner) => Value::Bool(!matches!(eval_expr(inner, vals), Value::Null)),
5781 Expr::CaseWhen {
5783 branches,
5784 else_expr,
5785 } => {
5786 for (cond, then_val) in branches {
5787 if let Value::Bool(true) = eval_expr(cond, vals) {
5788 return eval_expr(then_val, vals);
5789 }
5790 }
5791 else_expr
5792 .as_ref()
5793 .map(|e| eval_expr(e, vals))
5794 .unwrap_or(Value::Null)
5795 }
5796 Expr::ExistsSubquery(_) | Expr::ShortestPath(_) | Expr::NotExists(_) | Expr::CountStar => {
5798 Value::Null
5799 }
5800 }
5801}
5802
5803fn project_row(
5804 props: &[(u32, u64)],
5805 column_names: &[String],
5806 _col_ids: &[u32],
5807 var_name: &str,
5809 node_label: &str,
5811 store: &NodeStore,
5812) -> Vec<Value> {
5813 column_names
5814 .iter()
5815 .map(|col_name| {
5816 if let Some(inner) = col_name
5818 .strip_prefix("labels(")
5819 .and_then(|s| s.strip_suffix(')'))
5820 {
5821 if inner == var_name && !node_label.is_empty() {
5822 return Value::List(vec![Value::String(node_label.to_string())]);
5823 }
5824 return Value::Null;
5825 }
5826 let prop = col_name.split('.').next_back().unwrap_or(col_name.as_str());
5827 let col_id = prop_name_to_col_id(prop);
5828 props
5829 .iter()
5830 .find(|(c, _)| *c == col_id)
5831 .map(|(_, v)| decode_raw_val(*v, store))
5832 .unwrap_or(Value::Null)
5833 })
5834 .collect()
5835}
5836
5837#[allow(clippy::too_many_arguments)]
5838fn project_hop_row(
5839 src_props: &[(u32, u64)],
5840 dst_props: &[(u32, u64)],
5841 column_names: &[String],
5842 src_var: &str,
5843 _dst_var: &str,
5844 rel_var_type: Option<(&str, &str)>,
5846 src_label_meta: Option<(&str, &str)>,
5848 dst_label_meta: Option<(&str, &str)>,
5850 store: &NodeStore,
5851) -> Vec<Value> {
5852 column_names
5853 .iter()
5854 .map(|col_name| {
5855 if let Some(inner) = col_name
5857 .strip_prefix("type(")
5858 .and_then(|s| s.strip_suffix(')'))
5859 {
5860 if let Some((rel_var, rel_type)) = rel_var_type {
5862 if inner == rel_var {
5863 return Value::String(rel_type.to_string());
5864 }
5865 }
5866 return Value::Null;
5867 }
5868 if let Some(inner) = col_name
5870 .strip_prefix("labels(")
5871 .and_then(|s| s.strip_suffix(')'))
5872 {
5873 if let Some((meta_var, label)) = src_label_meta {
5874 if inner == meta_var {
5875 return Value::List(vec![Value::String(label.to_string())]);
5876 }
5877 }
5878 if let Some((meta_var, label)) = dst_label_meta {
5879 if inner == meta_var {
5880 return Value::List(vec![Value::String(label.to_string())]);
5881 }
5882 }
5883 return Value::Null;
5884 }
5885 if let Some((v, prop)) = col_name.split_once('.') {
5886 let col_id = prop_name_to_col_id(prop);
5887 let props = if v == src_var { src_props } else { dst_props };
5888 props
5889 .iter()
5890 .find(|(c, _)| *c == col_id)
5891 .map(|(_, val)| decode_raw_val(*val, store))
5892 .unwrap_or(Value::Null)
5893 } else {
5894 Value::Null
5895 }
5896 })
5897 .collect()
5898}
5899
5900fn project_fof_row(
5907 src_props: &[(u32, u64)],
5908 fof_props: &[(u32, u64)],
5909 column_names: &[String],
5910 src_var: &str,
5911 store: &NodeStore,
5912) -> Vec<Value> {
5913 column_names
5914 .iter()
5915 .map(|col_name| {
5916 if let Some((var, prop)) = col_name.split_once('.') {
5917 let col_id = prop_name_to_col_id(prop);
5918 let props = if !src_var.is_empty() && var == src_var {
5919 src_props
5920 } else {
5921 fof_props
5922 };
5923 props
5924 .iter()
5925 .find(|(c, _)| *c == col_id)
5926 .map(|(_, v)| decode_raw_val(*v, store))
5927 .unwrap_or(Value::Null)
5928 } else {
5929 Value::Null
5930 }
5931 })
5932 .collect()
5933}
5934
5935fn deduplicate_rows(rows: &mut Vec<Vec<Value>>) {
5936 let mut unique: Vec<Vec<Value>> = Vec::with_capacity(rows.len());
5939 for row in rows.drain(..) {
5940 if !unique.iter().any(|existing| existing == &row) {
5941 unique.push(row);
5942 }
5943 }
5944 *rows = unique;
5945}
5946
5947fn sort_spill_threshold() -> usize {
5949 std::env::var("SPARROWDB_SORT_SPILL_ROWS")
5950 .ok()
5951 .and_then(|v| v.parse().ok())
5952 .unwrap_or(crate::sort_spill::DEFAULT_ROW_THRESHOLD)
5953}
5954
5955fn make_sort_key(
5957 row: &[Value],
5958 order_by: &[(Expr, SortDir)],
5959 column_names: &[String],
5960) -> Vec<crate::sort_spill::SortKeyVal> {
5961 use crate::sort_spill::{OrdValue, SortKeyVal};
5962 order_by
5963 .iter()
5964 .map(|(expr, dir)| {
5965 let col_idx = match expr {
5966 Expr::PropAccess { var, prop } => {
5967 let key = format!("{var}.{prop}");
5968 column_names.iter().position(|c| c == &key)
5969 }
5970 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
5971 _ => None,
5972 };
5973 let val = col_idx
5974 .and_then(|i| row.get(i))
5975 .map(OrdValue::from_value)
5976 .unwrap_or(OrdValue::Null);
5977 match dir {
5978 SortDir::Asc => SortKeyVal::Asc(val),
5979 SortDir::Desc => SortKeyVal::Desc(std::cmp::Reverse(val)),
5980 }
5981 })
5982 .collect()
5983}
5984
5985fn apply_order_by(rows: &mut Vec<Vec<Value>>, m: &MatchStatement, column_names: &[String]) {
5986 if m.order_by.is_empty() {
5987 return;
5988 }
5989
5990 let threshold = sort_spill_threshold();
5991
5992 if rows.len() <= threshold {
5993 rows.sort_by(|a, b| {
5994 for (expr, dir) in &m.order_by {
5995 let col_idx = match expr {
5996 Expr::PropAccess { var, prop } => {
5997 let key = format!("{var}.{prop}");
5998 column_names.iter().position(|c| c == &key)
5999 }
6000 Expr::Var(v) => column_names.iter().position(|c| c == v.as_str()),
6001 _ => None,
6002 };
6003 if let Some(idx) = col_idx {
6004 if idx < a.len() && idx < b.len() {
6005 let cmp = compare_values(&a[idx], &b[idx]);
6006 let cmp = if *dir == SortDir::Desc {
6007 cmp.reverse()
6008 } else {
6009 cmp
6010 };
6011 if cmp != std::cmp::Ordering::Equal {
6012 return cmp;
6013 }
6014 }
6015 }
6016 }
6017 std::cmp::Ordering::Equal
6018 });
6019 } else {
6020 use crate::sort_spill::{SortableRow, SpillingSorter};
6021 let mut sorter: SpillingSorter<SortableRow> = SpillingSorter::new();
6022 for row in rows.drain(..) {
6023 let key = make_sort_key(&row, &m.order_by, column_names);
6024 if sorter.push(SortableRow { key, data: row }).is_err() {
6025 return;
6026 }
6027 }
6028 if let Ok(iter) = sorter.finish() {
6029 *rows = iter.map(|sr| sr.data).collect::<Vec<_>>();
6030 }
6031 }
6032}
6033
6034fn compare_values(a: &Value, b: &Value) -> std::cmp::Ordering {
6035 match (a, b) {
6036 (Value::Int64(x), Value::Int64(y)) => x.cmp(y),
6037 (Value::Float64(x), Value::Float64(y)) => {
6038 x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Equal)
6039 }
6040 (Value::String(x), Value::String(y)) => x.cmp(y),
6041 _ => std::cmp::Ordering::Equal,
6042 }
6043}
6044
6045fn is_aggregate_expr(expr: &Expr) -> bool {
6049 match expr {
6050 Expr::CountStar => true,
6051 Expr::FnCall { name, .. } => matches!(
6052 name.to_lowercase().as_str(),
6053 "count" | "sum" | "avg" | "min" | "max" | "collect"
6054 ),
6055 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6057 _ => false,
6058 }
6059}
6060
6061fn expr_has_collect(expr: &Expr) -> bool {
6063 match expr {
6064 Expr::FnCall { name, .. } => name.to_lowercase() == "collect",
6065 Expr::ListPredicate { list_expr, .. } => expr_has_collect(list_expr),
6066 _ => false,
6067 }
6068}
6069
6070fn extract_collect_arg(expr: &Expr, row_vals: &HashMap<String, Value>) -> Value {
6076 match expr {
6077 Expr::FnCall { args, .. } if !args.is_empty() => eval_expr(&args[0], row_vals),
6078 Expr::ListPredicate { list_expr, .. } => extract_collect_arg(list_expr, row_vals),
6079 _ => Value::Null,
6080 }
6081}
6082
6083fn evaluate_aggregate_expr(
6089 expr: &Expr,
6090 accumulated_list: &Value,
6091 outer_vals: &HashMap<String, Value>,
6092) -> Value {
6093 match expr {
6094 Expr::FnCall { name, .. } if name.to_lowercase() == "collect" => accumulated_list.clone(),
6095 Expr::ListPredicate {
6096 kind,
6097 variable,
6098 predicate,
6099 ..
6100 } => {
6101 let items = match accumulated_list {
6102 Value::List(v) => v,
6103 _ => return Value::Null,
6104 };
6105 let mut satisfied_count = 0usize;
6106 for item in items {
6107 let mut scope = outer_vals.clone();
6108 scope.insert(variable.clone(), item.clone());
6109 let result = eval_expr(predicate, &scope);
6110 if result == Value::Bool(true) {
6111 satisfied_count += 1;
6112 }
6113 }
6114 let result = match kind {
6115 ListPredicateKind::Any => satisfied_count > 0,
6116 ListPredicateKind::All => satisfied_count == items.len(),
6117 ListPredicateKind::None => satisfied_count == 0,
6118 ListPredicateKind::Single => satisfied_count == 1,
6119 };
6120 Value::Bool(result)
6121 }
6122 _ => Value::Null,
6123 }
6124}
6125
6126fn has_aggregate_in_return(items: &[ReturnItem]) -> bool {
6128 items.iter().any(|item| is_aggregate_expr(&item.expr))
6129}
6130
6131fn needs_node_ref_in_return(items: &[ReturnItem]) -> bool {
6142 items.iter().any(|item| {
6143 matches!(&item.expr, Expr::FnCall { name, .. } if name.to_lowercase() == "id")
6144 || matches!(&item.expr, Expr::Var(_))
6145 || expr_needs_graph(&item.expr)
6146 || expr_needs_eval_path(&item.expr)
6147 })
6148}
6149
6150fn expr_needs_eval_path(expr: &Expr) -> bool {
6162 match expr {
6163 Expr::FnCall { name, args } => {
6164 let name_lc = name.to_lowercase();
6165 if matches!(
6167 name_lc.as_str(),
6168 "count" | "sum" | "avg" | "min" | "max" | "collect"
6169 ) {
6170 return false;
6171 }
6172 let _ = args; true
6178 }
6179 Expr::BinOp { left, right, .. } => {
6181 expr_needs_eval_path(left) || expr_needs_eval_path(right)
6182 }
6183 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_eval_path(l) || expr_needs_eval_path(r),
6184 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
6185 expr_needs_eval_path(inner)
6186 }
6187 _ => false,
6188 }
6189}
6190
6191fn bare_var_names_in_return(items: &[ReturnItem]) -> Vec<String> {
6196 items
6197 .iter()
6198 .filter_map(|item| {
6199 if let Expr::Var(v) = &item.expr {
6200 Some(v.clone())
6201 } else {
6202 None
6203 }
6204 })
6205 .collect()
6206}
6207
6208fn build_node_map(props: &[(u32, u64)], store: &NodeStore) -> Value {
6213 let entries: Vec<(String, Value)> = props
6214 .iter()
6215 .map(|&(col_id, raw)| (format!("col_{col_id}"), decode_raw_val(raw, store)))
6216 .collect();
6217 Value::Map(entries)
6218}
6219
/// How a single `RETURN` item takes part in aggregation.
///
/// Items are classified by `agg_kind` and consumed by `aggregate_rows` and
/// `finalize_aggregate`.
#[derive(Debug, Clone, PartialEq)]
enum AggKind {
    /// Not an aggregate; the item's value is part of the grouping key.
    Key,
    /// `count(*)`: one entry is accumulated per row in the group.
    CountStar,
    /// `count(expr)`: counts rows whose argument is non-null.
    Count,
    /// `sum(expr)` over non-null arguments.
    Sum,
    /// `avg(expr)` over non-null numeric arguments, returned as a float.
    Avg,
    /// `min(expr)` over non-null arguments.
    Min,
    /// `max(expr)` over non-null arguments.
    Max,
    /// `collect(expr)` of non-null arguments. Also used for list predicates
    /// whose list expression is a `collect(...)`.
    Collect,
}
6233
6234fn agg_kind(expr: &Expr) -> AggKind {
6235 match expr {
6236 Expr::CountStar => AggKind::CountStar,
6237 Expr::FnCall { name, .. } => match name.to_lowercase().as_str() {
6238 "count" => AggKind::Count,
6239 "sum" => AggKind::Sum,
6240 "avg" => AggKind::Avg,
6241 "min" => AggKind::Min,
6242 "max" => AggKind::Max,
6243 "collect" => AggKind::Collect,
6244 _ => AggKind::Key,
6245 },
6246 Expr::ListPredicate { list_expr, .. } if expr_has_collect(list_expr) => AggKind::Collect,
6248 _ => AggKind::Key,
6249 }
6250}
6251
6252fn expr_needs_graph(expr: &Expr) -> bool {
6261 match expr {
6262 Expr::ShortestPath(_) | Expr::ExistsSubquery(_) | Expr::CaseWhen { .. } => true,
6263 Expr::And(l, r) | Expr::Or(l, r) => expr_needs_graph(l) || expr_needs_graph(r),
6264 Expr::Not(inner) | Expr::IsNull(inner) | Expr::IsNotNull(inner) => expr_needs_graph(inner),
6265 Expr::BinOp { left, right, .. } => expr_needs_graph(left) || expr_needs_graph(right),
6266 _ => false,
6267 }
6268}
6269
6270fn aggregate_rows(rows: &[HashMap<String, Value>], return_items: &[ReturnItem]) -> Vec<Vec<Value>> {
6271 let kinds: Vec<AggKind> = return_items
6273 .iter()
6274 .map(|item| agg_kind(&item.expr))
6275 .collect();
6276
6277 let key_indices: Vec<usize> = kinds
6278 .iter()
6279 .enumerate()
6280 .filter(|(_, k)| **k == AggKind::Key)
6281 .map(|(i, _)| i)
6282 .collect();
6283
6284 let agg_indices: Vec<usize> = kinds
6285 .iter()
6286 .enumerate()
6287 .filter(|(_, k)| **k != AggKind::Key)
6288 .map(|(i, _)| i)
6289 .collect();
6290
6291 if agg_indices.is_empty() {
6293 return rows
6294 .iter()
6295 .map(|row_vals| {
6296 return_items
6297 .iter()
6298 .map(|item| eval_expr(&item.expr, row_vals))
6299 .collect()
6300 })
6301 .collect();
6302 }
6303
6304 let mut group_keys: Vec<Vec<Value>> = Vec::new();
6306 let mut group_accum: Vec<Vec<Vec<Value>>> = Vec::new();
6308
6309 for row_vals in rows {
6310 let key: Vec<Value> = key_indices
6311 .iter()
6312 .map(|&i| eval_expr(&return_items[i].expr, row_vals))
6313 .collect();
6314
6315 let group_idx = if let Some(pos) = group_keys.iter().position(|k| k == &key) {
6316 pos
6317 } else {
6318 group_keys.push(key);
6319 group_accum.push(vec![vec![]; agg_indices.len()]);
6320 group_keys.len() - 1
6321 };
6322
6323 for (ai, &ri) in agg_indices.iter().enumerate() {
6324 match &kinds[ri] {
6325 AggKind::CountStar => {
6326 group_accum[group_idx][ai].push(Value::Int64(1));
6328 }
6329 AggKind::Count | AggKind::Sum | AggKind::Avg | AggKind::Min | AggKind::Max => {
6330 let arg_val = match &return_items[ri].expr {
6331 Expr::FnCall { args, .. } if !args.is_empty() => {
6332 eval_expr(&args[0], row_vals)
6333 }
6334 _ => Value::Null,
6335 };
6336 if !matches!(arg_val, Value::Null) {
6338 group_accum[group_idx][ai].push(arg_val);
6339 }
6340 }
6341 AggKind::Collect => {
6342 let arg_val = extract_collect_arg(&return_items[ri].expr, row_vals);
6345 if !matches!(arg_val, Value::Null) {
6347 group_accum[group_idx][ai].push(arg_val);
6348 }
6349 }
6350 AggKind::Key => unreachable!(),
6351 }
6352 }
6353 }
6354
6355 if group_keys.is_empty() && key_indices.is_empty() {
6357 let empty_vals: HashMap<String, Value> = HashMap::new();
6358 let row: Vec<Value> = return_items
6359 .iter()
6360 .zip(kinds.iter())
6361 .map(|(item, k)| match k {
6362 AggKind::CountStar | AggKind::Count | AggKind::Sum => Value::Int64(0),
6363 AggKind::Avg | AggKind::Min | AggKind::Max => Value::Null,
6364 AggKind::Collect => {
6365 evaluate_aggregate_expr(&item.expr, &Value::List(vec![]), &empty_vals)
6366 }
6367 AggKind::Key => Value::Null,
6368 })
6369 .collect();
6370 return vec![row];
6371 }
6372
6373 if group_keys.is_empty() {
6375 return vec![];
6376 }
6377
6378 let mut out: Vec<Vec<Value>> = Vec::with_capacity(group_keys.len());
6380 for (gi, key_vals) in group_keys.into_iter().enumerate() {
6381 let mut output_row: Vec<Value> = Vec::with_capacity(return_items.len());
6382 let mut ki = 0usize;
6383 let mut ai = 0usize;
6384 let outer_vals: HashMap<String, Value> = key_indices
6386 .iter()
6387 .enumerate()
6388 .map(|(pos, &i)| {
6389 let name = return_items[i]
6390 .alias
6391 .clone()
6392 .unwrap_or_else(|| format!("_k{i}"));
6393 (name, key_vals[pos].clone())
6394 })
6395 .collect();
6396 for col_idx in 0..return_items.len() {
6397 if kinds[col_idx] == AggKind::Key {
6398 output_row.push(key_vals[ki].clone());
6399 ki += 1;
6400 } else {
6401 let accumulated = Value::List(group_accum[gi][ai].clone());
6402 let result = if kinds[col_idx] == AggKind::Collect {
6403 evaluate_aggregate_expr(&return_items[col_idx].expr, &accumulated, &outer_vals)
6404 } else {
6405 finalize_aggregate(&kinds[col_idx], &group_accum[gi][ai])
6406 };
6407 output_row.push(result);
6408 ai += 1;
6409 }
6410 }
6411 out.push(output_row);
6412 }
6413 out
6414}
6415
6416fn finalize_aggregate(kind: &AggKind, vals: &[Value]) -> Value {
6418 match kind {
6419 AggKind::CountStar | AggKind::Count => Value::Int64(vals.len() as i64),
6420 AggKind::Sum => {
6421 let mut sum_i: i64 = 0;
6422 let mut sum_f: f64 = 0.0;
6423 let mut is_float = false;
6424 for v in vals {
6425 match v {
6426 Value::Int64(n) => sum_i += n,
6427 Value::Float64(f) => {
6428 is_float = true;
6429 sum_f += f;
6430 }
6431 _ => {}
6432 }
6433 }
6434 if is_float {
6435 Value::Float64(sum_f + sum_i as f64)
6436 } else {
6437 Value::Int64(sum_i)
6438 }
6439 }
6440 AggKind::Avg => {
6441 if vals.is_empty() {
6442 return Value::Null;
6443 }
6444 let mut sum: f64 = 0.0;
6445 let mut count: i64 = 0;
6446 for v in vals {
6447 match v {
6448 Value::Int64(n) => {
6449 sum += *n as f64;
6450 count += 1;
6451 }
6452 Value::Float64(f) => {
6453 sum += f;
6454 count += 1;
6455 }
6456 _ => {}
6457 }
6458 }
6459 if count == 0 {
6460 Value::Null
6461 } else {
6462 Value::Float64(sum / count as f64)
6463 }
6464 }
6465 AggKind::Min => vals
6466 .iter()
6467 .fold(None::<Value>, |acc, v| match (acc, v) {
6468 (None, v) => Some(v.clone()),
6469 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.min(*b))),
6470 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.min(*b))),
6471 (Some(Value::String(a)), Value::String(b)) => {
6472 Some(Value::String(if a <= *b { a } else { b.clone() }))
6473 }
6474 (Some(a), _) => Some(a),
6475 })
6476 .unwrap_or(Value::Null),
6477 AggKind::Max => vals
6478 .iter()
6479 .fold(None::<Value>, |acc, v| match (acc, v) {
6480 (None, v) => Some(v.clone()),
6481 (Some(Value::Int64(a)), Value::Int64(b)) => Some(Value::Int64(a.max(*b))),
6482 (Some(Value::Float64(a)), Value::Float64(b)) => Some(Value::Float64(a.max(*b))),
6483 (Some(Value::String(a)), Value::String(b)) => {
6484 Some(Value::String(if a >= *b { a } else { b.clone() }))
6485 }
6486 (Some(a), _) => Some(a),
6487 })
6488 .unwrap_or(Value::Null),
6489 AggKind::Collect => Value::List(vals.to_vec()),
6490 AggKind::Key => Value::Null,
6491 }
6492}
6493
6494fn eval_expr_to_string(expr: &Expr) -> Result<String> {
6501 match expr {
6502 Expr::Literal(Literal::String(s)) => Ok(s.clone()),
6503 Expr::Literal(Literal::Param(p)) => Err(sparrowdb_common::Error::InvalidArgument(format!(
6504 "parameter ${p} requires runtime binding; pass a literal string instead"
6505 ))),
6506 other => Err(sparrowdb_common::Error::InvalidArgument(format!(
6507 "procedure argument must be a string literal, got: {other:?}"
6508 ))),
6509 }
6510}
6511
6512fn expr_to_col_name(expr: &Expr) -> String {
6515 match expr {
6516 Expr::PropAccess { var, prop } => format!("{var}.{prop}"),
6517 Expr::Var(v) => v.clone(),
6518 _ => "value".to_owned(),
6519 }
6520}
6521
6522fn eval_call_expr(expr: &Expr, env: &HashMap<String, Value>, store: &NodeStore) -> Value {
6528 match expr {
6529 Expr::Var(v) => env.get(v.as_str()).cloned().unwrap_or(Value::Null),
6530 Expr::PropAccess { var, prop } => match env.get(var.as_str()) {
6531 Some(Value::NodeRef(node_id)) => {
6532 let col_id = prop_name_to_col_id(prop);
6533 read_node_props(store, *node_id, &[col_id])
6534 .ok()
6535 .and_then(|pairs| pairs.into_iter().find(|(c, _)| *c == col_id))
6536 .map(|(_, raw)| decode_raw_val(raw, store))
6537 .unwrap_or(Value::Null)
6538 }
6539 Some(other) => other.clone(),
6540 None => Value::Null,
6541 },
6542 Expr::Literal(lit) => match lit {
6543 Literal::Int(n) => Value::Int64(*n),
6544 Literal::Float(f) => Value::Float64(*f),
6545 Literal::Bool(b) => Value::Bool(*b),
6546 Literal::String(s) => Value::String(s.clone()),
6547 _ => Value::Null,
6548 },
6549 _ => Value::Null,
6550 }
6551}