1use std::collections::{BTreeMap, BTreeSet};
40use std::mem::ManuallyDrop;
41use std::sync::Arc;
42
43use lora_analyzer::symbols::VarId;
44use lora_analyzer::{ResolvedExpr, ResolvedProjection};
45use lora_ast::{Direction, RangeLiteral};
46use lora_compiler::physical::{
47 ExpandExec, FilterExec, HashAggregationExec, LimitExec, NodeByLabelScanExec,
48 NodeByPropertyScanExec, NodeScanExec, OptionalMatchExec, PathBuildExec, PhysicalNodeId,
49 PhysicalOp, PhysicalPlan, ProjectionExec, SortExec, UnwindExec,
50};
51use lora_compiler::CompiledQuery;
52use lora_store::{GraphStorage, GraphStorageMut, NodeId};
53
54use crate::errors::{value_kind, ExecResult, ExecutorError};
55use crate::eval::{eval_expr, take_eval_error, EvalContext};
56use crate::executor::{
57 build_path_value, compute_aggregate_expr, hydrate_node_record, hydrate_relationship_record,
58 indexed_node_property_candidates, label_group_candidates_prefiltered,
59 node_matches_label_groups, node_matches_property_filter, resolve_range,
60 scan_node_ids_for_label_groups, value_matches_property_value, ExecutionContext, Executor,
61 GroupValueKey, MutableExecutionContext, MutableExecutor,
62};
63use crate::value::{LoraValue, Row};
64
65pub trait RowSource {
72 fn next_row(&mut self) -> ExecResult<Option<Row>>;
74}
75
76pub fn drain<S: RowSource + ?Sized>(source: &mut S) -> ExecResult<Vec<Row>> {
78 let mut out = Vec::new();
79 while let Some(row) = source.next_row()? {
80 out.push(row);
81 }
82 Ok(out)
83}
84
85#[derive(Clone)]
95pub(crate) struct StreamCtx<'a, S: GraphStorage> {
96 pub storage: &'a S,
97 pub params: Arc<BTreeMap<String, LoraValue>>,
98}
99
100impl<'a, S: GraphStorage> StreamCtx<'a, S> {
101 fn new(storage: &'a S, params: Arc<BTreeMap<String, LoraValue>>) -> Self {
102 Self { storage, params }
103 }
104
105 fn eval_ctx<'b>(&'b self) -> EvalContext<'b, S> {
108 EvalContext {
109 storage: self.storage,
110 params: &self.params,
111 }
112 }
113}
114
115pub struct BufferedRowSource {
123 iter: std::vec::IntoIter<Row>,
124}
125
126impl BufferedRowSource {
127 pub fn new(rows: Vec<Row>) -> Self {
128 Self {
129 iter: rows.into_iter(),
130 }
131 }
132}
133
134impl RowSource for BufferedRowSource {
135 fn next_row(&mut self) -> ExecResult<Option<Row>> {
136 Ok(self.iter.next())
137 }
138}
139
140pub struct ArgumentSource {
147 yielded: bool,
148}
149
150impl ArgumentSource {
151 pub fn new() -> Self {
152 Self { yielded: false }
153 }
154}
155
156impl Default for ArgumentSource {
157 fn default() -> Self {
158 Self::new()
159 }
160}
161
162impl RowSource for ArgumentSource {
163 fn next_row(&mut self) -> ExecResult<Option<Row>> {
164 if self.yielded {
165 Ok(None)
166 } else {
167 self.yielded = true;
168 Ok(Some(Row::new()))
169 }
170 }
171}
172
/// Scans all nodes in the graph for each upstream row.
///
/// For each upstream row:
/// * If `var` is unbound, one output row is emitted per node id returned by
///   `storage.all_node_ids()`, with `var` bound to that node.
/// * If `var` is already bound to a node, the row passes through unchanged,
///   but only if `storage.has_node` still reports the node.
/// * If `var` is bound to a non-node value, `ExpectedNodeForExpand` is returned.
pub struct NodeScanSource<'a, S: GraphStorage> {
    upstream: Box<dyn RowSource + 'a>,
    storage: &'a S,
    var: VarId,
    // Upstream row currently being extended; `None` means "pull the next one".
    cur_row: Option<Row>,
    // Node ids for the current row, fetched lazily and cleared per row.
    cur_ids: Vec<NodeId>,
    // Index of the next id in `cur_ids` to emit.
    cur_idx: usize,
    // Set once a row with `var` already bound has been examined. If it is still
    // set when the same row is seen again (only possible after the error return
    // below), the row is discarded.
    cur_emitted: bool,
}

impl<'a, S: GraphStorage> NodeScanSource<'a, S> {
    fn new(upstream: Box<dyn RowSource + 'a>, storage: &'a S, var: VarId) -> Self {
        Self {
            upstream,
            storage,
            var,
            cur_row: None,
            cur_ids: Vec::new(),
            cur_idx: 0,
            cur_emitted: false,
        }
    }
}

impl<'a, S: GraphStorage> RowSource for NodeScanSource<'a, S> {
    fn next_row(&mut self) -> ExecResult<Option<Row>> {
        loop {
            // Start a fresh upstream row and reset the per-row state.
            if self.cur_row.is_none() {
                match self.upstream.next_row()? {
                    Some(row) => {
                        self.cur_row = Some(row);
                        self.cur_ids.clear();
                        self.cur_idx = 0;
                        self.cur_emitted = false;
                    }
                    None => return Ok(None),
                }
            }

            let row_ref = self.cur_row.as_ref().unwrap();

            // `var` already bound: act as an existence check instead of a scan.
            if let Some(existing) = row_ref.get(self.var) {
                if self.cur_emitted {
                    self.cur_row = None;
                    continue;
                }
                self.cur_emitted = true;
                match existing {
                    LoraValue::Node(id) => {
                        if self.storage.has_node(*id) {
                            let row = self.cur_row.take().unwrap();
                            self.cur_emitted = false;
                            return Ok(Some(row));
                        }
                        // Bound node no longer exists: drop this row.
                        self.cur_row = None;
                        continue;
                    }
                    other => {
                        return Err(ExecutorError::ExpectedNodeForExpand {
                            var: format!("{:?}", self.var),
                            found: value_kind(other),
                        });
                    }
                }
            }

            // `var` unbound: fetch all node ids once for this upstream row.
            if self.cur_idx == 0 && self.cur_ids.is_empty() {
                self.cur_ids = self.storage.all_node_ids();
            }
            if self.cur_idx >= self.cur_ids.len() {
                // Every node has been paired with this row; move to the next one.
                self.cur_row = None;
                self.cur_ids.clear();
                continue;
            }
            let id = self.cur_ids[self.cur_idx];
            self.cur_idx += 1;
            let mut new_row = row_ref.clone();
            new_row.insert(self.var, LoraValue::Node(id));
            return Ok(Some(new_row));
        }
    }
}
269
/// Scans the nodes that satisfy `labels` for each upstream row.
///
/// `labels` is tested with `node_matches_label_groups`. Candidate ids come from
/// `scan_node_ids_for_label_groups`. When `label_group_candidates_prefiltered`
/// reports `false`, each candidate is re-checked before it is emitted.
///
/// If `var` is already bound to a node, the row passes through only when that
/// node satisfies `labels`. A non-node binding yields `ExpectedNodeForExpand`.
pub struct NodeByLabelScanSource<'a, S: GraphStorage> {
    upstream: Box<dyn RowSource + 'a>,
    storage: &'a S,
    var: VarId,
    labels: &'a [Vec<String>],
    // Upstream row currently being extended; `None` means "pull the next one".
    cur_row: Option<Row>,
    // Candidate node ids for the current row, fetched lazily.
    cur_ids: Vec<NodeId>,
    // Index of the next candidate to examine.
    cur_idx: usize,
    // Re-check guard for rows with `var` already bound (set before examining the
    // binding, cleared when the row is returned).
    cur_emitted: bool,
}

impl<'a, S: GraphStorage> NodeByLabelScanSource<'a, S> {
    fn new(
        upstream: Box<dyn RowSource + 'a>,
        storage: &'a S,
        var: VarId,
        labels: &'a [Vec<String>],
    ) -> Self {
        Self {
            upstream,
            storage,
            var,
            labels,
            cur_row: None,
            cur_ids: Vec::new(),
            cur_idx: 0,
            cur_emitted: false,
        }
    }
}

impl<'a, S: GraphStorage> RowSource for NodeByLabelScanSource<'a, S> {
    fn next_row(&mut self) -> ExecResult<Option<Row>> {
        loop {
            // Start a fresh upstream row and reset the per-row state.
            if self.cur_row.is_none() {
                match self.upstream.next_row()? {
                    Some(row) => {
                        self.cur_row = Some(row);
                        self.cur_ids.clear();
                        self.cur_idx = 0;
                        self.cur_emitted = false;
                    }
                    None => return Ok(None),
                }
            }

            let row_ref = self.cur_row.as_ref().unwrap();

            // `var` already bound: only check that node's labels.
            if let Some(existing) = row_ref.get(self.var) {
                if self.cur_emitted {
                    self.cur_row = None;
                    continue;
                }
                self.cur_emitted = true;
                match existing {
                    LoraValue::Node(id) => {
                        // A missing node (`with_node` -> None) counts as a mismatch.
                        let labels_ok = self
                            .storage
                            .with_node(*id, |n| node_matches_label_groups(&n.labels, self.labels))
                            .unwrap_or(false);
                        if labels_ok {
                            let row = self.cur_row.take().unwrap();
                            self.cur_emitted = false;
                            return Ok(Some(row));
                        }
                        self.cur_row = None;
                        continue;
                    }
                    other => {
                        return Err(ExecutorError::ExpectedNodeForExpand {
                            var: format!("{:?}", self.var),
                            found: value_kind(other),
                        });
                    }
                }
            }

            // `var` unbound: fetch the candidate ids once for this upstream row.
            if self.cur_idx == 0 && self.cur_ids.is_empty() {
                self.cur_ids = scan_node_ids_for_label_groups(self.storage, self.labels);
            }

            // When the candidates are not guaranteed to match, verify each one.
            let candidates_prefiltered = label_group_candidates_prefiltered(self.labels);
            while self.cur_idx < self.cur_ids.len() {
                let id = self.cur_ids[self.cur_idx];
                self.cur_idx += 1;
                if !candidates_prefiltered {
                    let labels_ok = self
                        .storage
                        .with_node(id, |n| node_matches_label_groups(&n.labels, self.labels))
                        .unwrap_or(false);
                    if !labels_ok {
                        continue;
                    }
                }
                let mut new_row = row_ref.clone();
                new_row.insert(self.var, LoraValue::Node(id));
                return Ok(Some(new_row));
            }

            // Candidates exhausted for this row; move to the next upstream row.
            self.cur_row = None;
            self.cur_ids.clear();
        }
    }
}
379
/// Scans nodes whose property `key` matches `value`, optionally restricted by
/// `labels`, for each upstream row.
///
/// `value` is evaluated once per upstream row. Candidate ids come from
/// `indexed_node_property_candidates`. When its `prefiltered` flag is `false`,
/// each candidate is re-checked with `node_matches_property_filter`.
///
/// If `var` is already bound to a node, the row passes through only when that
/// node passes `node_matches_property_filter`. A non-node binding yields
/// `ExpectedNodeForExpand`.
pub struct NodeByPropertyScanSource<'a, S: GraphStorage> {
    upstream: Box<dyn RowSource + 'a>,
    ctx: StreamCtx<'a, S>,
    var: VarId,
    labels: &'a [Vec<String>],
    key: &'a str,
    value: &'a ResolvedExpr,
    // Upstream row currently being extended; `None` means "pull the next one".
    cur_row: Option<Row>,
    // `value` evaluated against `cur_row`.
    cur_expected: Option<LoraValue>,
    // Candidate node ids for `cur_row` and the index of the next one.
    cur_ids: Vec<NodeId>,
    cur_idx: usize,
    // Re-check guard for rows with `var` already bound.
    cur_emitted: bool,
    // Whether `cur_ids` is already known to satisfy the filter.
    cur_prefiltered: bool,
}

impl<'a, S: GraphStorage> NodeByPropertyScanSource<'a, S> {
    fn new(
        upstream: Box<dyn RowSource + 'a>,
        ctx: StreamCtx<'a, S>,
        var: VarId,
        labels: &'a [Vec<String>],
        key: &'a str,
        value: &'a ResolvedExpr,
    ) -> Self {
        Self {
            upstream,
            ctx,
            var,
            labels,
            key,
            value,
            cur_row: None,
            cur_expected: None,
            cur_ids: Vec::new(),
            cur_idx: 0,
            cur_emitted: false,
            cur_prefiltered: false,
        }
    }
}

impl<'a, S: GraphStorage> RowSource for NodeByPropertyScanSource<'a, S> {
    fn next_row(&mut self) -> ExecResult<Option<Row>> {
        loop {
            // New upstream row: evaluate the expected value and fetch candidates.
            // Candidates are fetched even if `var` turns out to be bound.
            if self.cur_row.is_none() {
                match self.upstream.next_row()? {
                    Some(row) => {
                        let expected = eval_expr(self.value, &row, &self.ctx.eval_ctx());
                        let candidates = indexed_node_property_candidates(
                            self.ctx.storage,
                            self.labels,
                            self.key,
                            &expected,
                        );
                        self.cur_ids = candidates.ids;
                        self.cur_prefiltered = candidates.prefiltered;
                        self.cur_row = Some(row);
                        self.cur_expected = Some(expected);
                        self.cur_idx = 0;
                        self.cur_emitted = false;
                    }
                    None => return Ok(None),
                }
            }

            let row_ref = self.cur_row.as_ref().unwrap();
            let expected = self.cur_expected.as_ref().unwrap();

            // `var` already bound: test only that node against the filter.
            if let Some(existing) = row_ref.get(self.var) {
                if self.cur_emitted {
                    self.cur_row = None;
                    self.cur_expected = None;
                    self.cur_ids.clear();
                    continue;
                }
                self.cur_emitted = true;
                match existing {
                    LoraValue::Node(id) => {
                        if node_matches_property_filter(
                            self.ctx.storage,
                            *id,
                            self.labels,
                            self.key,
                            expected,
                        ) {
                            let row = self.cur_row.take().unwrap();
                            self.cur_expected = None;
                            self.cur_ids.clear();
                            self.cur_emitted = false;
                            return Ok(Some(row));
                        }
                        self.cur_row = None;
                        self.cur_expected = None;
                        self.cur_ids.clear();
                        continue;
                    }
                    other => {
                        return Err(ExecutorError::ExpectedNodeForExpand {
                            var: format!("{:?}", self.var),
                            found: value_kind(other),
                        });
                    }
                }
            }

            // `var` unbound: emit one row per candidate that passes the filter
            // (the check is skipped when the candidates are prefiltered).
            while self.cur_idx < self.cur_ids.len() {
                let id = self.cur_ids[self.cur_idx];
                self.cur_idx += 1;
                if !self.cur_prefiltered
                    && !node_matches_property_filter(
                        self.ctx.storage,
                        id,
                        self.labels,
                        self.key,
                        expected,
                    )
                {
                    continue;
                }
                let mut new_row = row_ref.clone();
                new_row.insert(self.var, LoraValue::Node(id));
                return Ok(Some(new_row));
            }

            // Candidates exhausted; move to the next upstream row.
            self.cur_row = None;
            self.cur_expected = None;
            self.cur_ids.clear();
        }
    }
}
513
514pub struct ExpandSource<'a, S: GraphStorage> {
519 upstream: Box<dyn RowSource + 'a>,
520 ctx: StreamCtx<'a, S>,
521 src: VarId,
522 rel: Option<VarId>,
523 dst: VarId,
524 types: &'a [String],
525 direction: Direction,
526 rel_properties: Option<&'a ResolvedExpr>,
527 cur_row: Option<Row>,
528 cur_edges: Vec<(u64, NodeId)>,
529 cur_idx: usize,
530}
531
532impl<'a, S: GraphStorage> ExpandSource<'a, S> {
533 #[allow(clippy::too_many_arguments)]
534 fn new(
535 upstream: Box<dyn RowSource + 'a>,
536 ctx: StreamCtx<'a, S>,
537 src: VarId,
538 rel: Option<VarId>,
539 dst: VarId,
540 types: &'a [String],
541 direction: Direction,
542 rel_properties: Option<&'a ResolvedExpr>,
543 ) -> Self {
544 Self {
545 upstream,
546 ctx,
547 src,
548 rel,
549 dst,
550 types,
551 direction,
552 rel_properties,
553 cur_row: None,
554 cur_edges: Vec::new(),
555 cur_idx: 0,
556 }
557 }
558}
559
560impl<'a, S: GraphStorage> RowSource for ExpandSource<'a, S> {
561 fn next_row(&mut self) -> ExecResult<Option<Row>> {
562 loop {
563 if self.cur_row.is_none() {
564 match self.upstream.next_row()? {
565 Some(row) => {
566 let src_id = match row.get(self.src) {
569 Some(LoraValue::Node(id)) => *id,
570 Some(other) => {
571 return Err(ExecutorError::ExpectedNodeForExpand {
572 var: format!("{:?}", self.src),
573 found: value_kind(other),
574 });
575 }
576 None => continue,
577 };
578 self.cur_edges =
579 self.ctx
580 .storage
581 .expand_ids(src_id, self.direction, self.types);
582 self.cur_idx = 0;
583 self.cur_row = Some(row);
584 }
585 None => return Ok(None),
586 }
587 }
588
589 let row_ref = self.cur_row.as_ref().unwrap();
590
591 while self.cur_idx < self.cur_edges.len() {
592 let (rel_id, dst_id) = self.cur_edges[self.cur_idx];
593 self.cur_idx += 1;
594
595 if let Some(expr) = self.rel_properties {
597 let actual = self
598 .ctx
599 .storage
600 .with_relationship(rel_id, |rel| rel.properties.clone());
601 let matches = match actual {
602 Some(props) => {
603 let eval_ctx = self.ctx.eval_ctx();
604 let expected = eval_expr(expr, row_ref, &eval_ctx);
605 let LoraValue::Map(map) = expected else {
606 return Err(ExecutorError::ExpectedPropertyMap {
607 found: value_kind(&expected),
608 });
609 };
610 map.iter().all(|(k, v)| {
611 props
612 .get(k)
613 .map(|actual| value_matches_property_value(v, actual))
614 .unwrap_or(false)
615 })
616 }
617 None => false,
618 };
619 if !matches {
620 continue;
621 }
622 }
623
624 if let Some(existing_dst) = row_ref.get(self.dst) {
626 match existing_dst {
627 LoraValue::Node(id) if *id == dst_id => {}
628 LoraValue::Node(_) => continue,
629 other => {
630 return Err(ExecutorError::ExpectedNodeForExpand {
631 var: format!("{:?}", self.dst),
632 found: value_kind(other),
633 });
634 }
635 }
636 }
637 if let Some(rel_var) = self.rel {
638 if let Some(existing_rel) = row_ref.get(rel_var) {
639 match existing_rel {
640 LoraValue::Relationship(id) if *id == rel_id => {}
641 LoraValue::Relationship(_) => continue,
642 other => {
643 return Err(ExecutorError::ExpectedRelationshipForExpand {
644 var: format!("{:?}", rel_var),
645 found: value_kind(other),
646 });
647 }
648 }
649 }
650 }
651
652 let mut new_row = row_ref.clone();
653 if !new_row.contains_key(self.dst) {
654 new_row.insert(self.dst, LoraValue::Node(dst_id));
655 }
656 if let Some(rel_var) = self.rel {
657 if !new_row.contains_key(rel_var) {
658 new_row.insert(rel_var, LoraValue::Relationship(rel_id));
659 }
660 }
661 return Ok(Some(new_row));
662 }
663
664 self.cur_row = None;
666 self.cur_edges.clear();
667 self.cur_idx = 0;
668 }
669 }
670}
671
672pub struct VariableLengthExpandSource<'a, S: GraphStorage> {
675 upstream: Box<dyn RowSource + 'a>,
676 ctx: StreamCtx<'a, S>,
677 src: VarId,
678 rel: Option<VarId>,
679 dst: VarId,
680 types: &'a [String],
681 direction: Direction,
682 min_hops: u64,
683 max_hops: u64,
684 cur_row: Option<Row>,
685 pending_zero_hop: bool,
686 frontier: Vec<(NodeId, Vec<u64>)>,
687 frontier_idx: usize,
688 next_frontier: Vec<(NodeId, Vec<u64>)>,
689 depth: u64,
690 cur_path_node: Option<NodeId>,
691 cur_path_rels: Vec<u64>,
692 cur_edges: Vec<(u64, NodeId)>,
693 cur_edge_idx: usize,
694}
695
696impl<'a, S: GraphStorage> VariableLengthExpandSource<'a, S> {
697 #[allow(clippy::too_many_arguments)]
698 fn new(
699 upstream: Box<dyn RowSource + 'a>,
700 ctx: StreamCtx<'a, S>,
701 src: VarId,
702 rel: Option<VarId>,
703 dst: VarId,
704 types: &'a [String],
705 direction: Direction,
706 range: &'a RangeLiteral,
707 ) -> Self {
708 let (min_hops, max_hops) = resolve_range(range);
709 Self {
710 upstream,
711 ctx,
712 src,
713 rel,
714 dst,
715 types,
716 direction,
717 min_hops,
718 max_hops,
719 cur_row: None,
720 pending_zero_hop: false,
721 frontier: Vec::new(),
722 frontier_idx: 0,
723 next_frontier: Vec::new(),
724 depth: 1,
725 cur_path_node: None,
726 cur_path_rels: Vec::new(),
727 cur_edges: Vec::new(),
728 cur_edge_idx: 0,
729 }
730 }
731
732 fn start_row(&mut self, row: Row, src_id: NodeId) {
733 self.cur_row = Some(row);
734 self.pending_zero_hop = self.min_hops == 0;
735 self.frontier.clear();
736 self.frontier.push((src_id, Vec::new()));
737 self.frontier_idx = 0;
738 self.next_frontier.clear();
739 self.depth = 1;
740 self.cur_path_node = None;
741 self.cur_path_rels.clear();
742 self.cur_edges.clear();
743 self.cur_edge_idx = 0;
744 }
745
746 fn clear_current_row(&mut self) {
747 self.cur_row = None;
748 self.pending_zero_hop = false;
749 self.frontier.clear();
750 self.frontier_idx = 0;
751 self.next_frontier.clear();
752 self.depth = 1;
753 self.cur_path_node = None;
754 self.cur_path_rels.clear();
755 self.cur_edges.clear();
756 self.cur_edge_idx = 0;
757 }
758
759 fn row_for_path(&self, dst_node_id: NodeId, rel_ids: &[u64]) -> Row {
760 let mut new_row = self
761 .cur_row
762 .as_ref()
763 .expect("cur_row is set while yielding variable-length results")
764 .clone();
765 new_row.insert(self.dst, LoraValue::Node(dst_node_id));
766
767 if let Some(rel_var) = self.rel {
768 let rels = rel_ids
769 .iter()
770 .copied()
771 .map(LoraValue::Relationship)
772 .collect();
773 new_row.insert(rel_var, LoraValue::List(rels));
774 }
775
776 new_row
777 }
778
779 fn advance_frontier(&mut self) -> bool {
780 if self.next_frontier.is_empty() || self.depth >= self.max_hops {
781 return false;
782 }
783
784 std::mem::swap(&mut self.frontier, &mut self.next_frontier);
785 self.next_frontier.clear();
786 self.frontier_idx = 0;
787 self.depth += 1;
788 true
789 }
790}
791
792impl<'a, S: GraphStorage> RowSource for VariableLengthExpandSource<'a, S> {
793 fn next_row(&mut self) -> ExecResult<Option<Row>> {
794 loop {
795 if self.cur_row.is_none() {
796 match self.upstream.next_row()? {
797 Some(row) => {
798 let src_id = match row.get(self.src) {
799 Some(LoraValue::Node(id)) => *id,
800 Some(other) => {
801 return Err(ExecutorError::ExpectedNodeForExpand {
802 var: format!("{:?}", self.src),
803 found: value_kind(other),
804 });
805 }
806 None => continue,
807 };
808 self.start_row(row, src_id);
809 }
810 None => return Ok(None),
811 }
812 }
813
814 if self.pending_zero_hop {
815 self.pending_zero_hop = false;
816 if let Some(LoraValue::Node(src_id)) = self
817 .cur_row
818 .as_ref()
819 .expect("cur_row is initialized before zero-hop yield")
820 .get(self.src)
821 {
822 return Ok(Some(self.row_for_path(*src_id, &[])));
823 }
824 }
825
826 loop {
827 if self.depth > self.max_hops {
828 self.clear_current_row();
829 break;
830 }
831
832 if self.cur_path_node.is_none() {
833 if self.frontier_idx >= self.frontier.len() {
834 if self.advance_frontier() {
835 continue;
836 }
837 self.clear_current_row();
838 break;
839 }
840
841 let (node_id, rels) = &self.frontier[self.frontier_idx];
842 self.frontier_idx += 1;
843 self.cur_path_node = Some(*node_id);
844 self.cur_path_rels.clone_from(rels);
845 self.cur_edges =
846 self.ctx
847 .storage
848 .expand_ids(*node_id, self.direction, self.types);
849 self.cur_edge_idx = 0;
850 }
851
852 while self.cur_edge_idx < self.cur_edges.len() {
853 let (rel_id, neighbor_id) = self.cur_edges[self.cur_edge_idx];
854 self.cur_edge_idx += 1;
855
856 if self.cur_path_rels.contains(&rel_id) {
857 continue;
858 }
859
860 let mut rel_ids = Vec::with_capacity(self.cur_path_rels.len() + 1);
861 rel_ids.extend_from_slice(&self.cur_path_rels);
862 rel_ids.push(rel_id);
863
864 if self.depth < self.max_hops {
865 self.next_frontier.push((neighbor_id, rel_ids.clone()));
866 }
867
868 if self.depth >= self.min_hops {
869 return Ok(Some(self.row_for_path(neighbor_id, &rel_ids)));
870 }
871 }
872
873 self.cur_path_node = None;
874 self.cur_path_rels.clear();
875 self.cur_edges.clear();
876 self.cur_edge_idx = 0;
877 }
878 }
879 }
880}
881
882pub struct FilterSource<'a, S: GraphStorage> {
884 upstream: Box<dyn RowSource + 'a>,
885 ctx: StreamCtx<'a, S>,
886 predicate: &'a ResolvedExpr,
887}
888
889impl<'a, S: GraphStorage> FilterSource<'a, S> {
890 fn new(
891 upstream: Box<dyn RowSource + 'a>,
892 ctx: StreamCtx<'a, S>,
893 predicate: &'a ResolvedExpr,
894 ) -> Self {
895 Self {
896 upstream,
897 ctx,
898 predicate,
899 }
900 }
901}
902
903impl<'a, S: GraphStorage> RowSource for FilterSource<'a, S> {
904 fn next_row(&mut self) -> ExecResult<Option<Row>> {
905 loop {
906 match self.upstream.next_row()? {
907 Some(row) => {
908 let eval_ctx = self.ctx.eval_ctx();
909 if eval_expr(self.predicate, &row, &eval_ctx).is_truthy() {
910 return Ok(Some(row));
911 }
912 }
913 None => return Ok(None),
914 }
915 }
916 }
917}
918
919pub struct ProjectionSource<'a, S: GraphStorage> {
923 upstream: Box<dyn RowSource + 'a>,
924 ctx: StreamCtx<'a, S>,
925 items: &'a [ResolvedProjection],
926 include_existing: bool,
927}
928
929impl<'a, S: GraphStorage> ProjectionSource<'a, S> {
930 fn new(
931 upstream: Box<dyn RowSource + 'a>,
932 ctx: StreamCtx<'a, S>,
933 items: &'a [ResolvedProjection],
934 include_existing: bool,
935 ) -> Self {
936 Self {
937 upstream,
938 ctx,
939 items,
940 include_existing,
941 }
942 }
943}
944
945impl<'a, S: GraphStorage> RowSource for ProjectionSource<'a, S> {
946 fn next_row(&mut self) -> ExecResult<Option<Row>> {
947 match self.upstream.next_row()? {
948 None => Ok(None),
949 Some(row) => {
950 let eval_ctx = self.ctx.eval_ctx();
951 if self.include_existing {
952 let mut projected = row;
953 for item in self.items {
954 let value = eval_expr(&item.expr, &projected, &eval_ctx);
955 if let Some(err) = take_eval_error() {
956 return Err(ExecutorError::RuntimeError(err));
957 }
958 projected.insert_named(item.output, item.name.clone(), value);
959 }
960 Ok(Some(projected))
961 } else {
962 let mut projected = Row::new();
963 for item in self.items {
964 let value = eval_expr(&item.expr, &row, &eval_ctx);
965 if let Some(err) = take_eval_error() {
966 return Err(ExecutorError::RuntimeError(err));
967 }
968 projected.insert_named(item.output, item.name.clone(), value);
969 }
970 Ok(Some(projected))
971 }
972 }
973 }
974 }
975}
976
977pub struct UnwindSource<'a, S: GraphStorage> {
981 upstream: Box<dyn RowSource + 'a>,
982 ctx: StreamCtx<'a, S>,
983 expr: &'a ResolvedExpr,
984 alias: VarId,
985 cur_row: Option<Row>,
986 cur_values: Vec<LoraValue>,
987 cur_idx: usize,
988}
989
990impl<'a, S: GraphStorage> UnwindSource<'a, S> {
991 fn new(
992 upstream: Box<dyn RowSource + 'a>,
993 ctx: StreamCtx<'a, S>,
994 expr: &'a ResolvedExpr,
995 alias: VarId,
996 ) -> Self {
997 Self {
998 upstream,
999 ctx,
1000 expr,
1001 alias,
1002 cur_row: None,
1003 cur_values: Vec::new(),
1004 cur_idx: 0,
1005 }
1006 }
1007}
1008
1009impl<'a, S: GraphStorage> RowSource for UnwindSource<'a, S> {
1010 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1011 loop {
1012 if self.cur_idx < self.cur_values.len() {
1013 let value = self.cur_values[self.cur_idx].clone();
1014 self.cur_idx += 1;
1015 let mut new_row = self
1016 .cur_row
1017 .as_ref()
1018 .expect("cur_values is non-empty implies cur_row is set")
1019 .clone();
1020 new_row.insert(self.alias, value);
1021 return Ok(Some(new_row));
1022 }
1023
1024 self.cur_row = None;
1025 self.cur_values.clear();
1026 self.cur_idx = 0;
1027
1028 match self.upstream.next_row()? {
1029 None => return Ok(None),
1030 Some(row) => {
1031 let eval_ctx = self.ctx.eval_ctx();
1032 let value = eval_expr(self.expr, &row, &eval_ctx);
1033 match value {
1034 LoraValue::List(values) => {
1035 self.cur_row = Some(row);
1036 self.cur_values = values;
1037 self.cur_idx = 0;
1038 }
1040 LoraValue::Null => {
1041 }
1043 scalar => {
1044 let mut new_row = row;
1046 new_row.insert(self.alias, scalar);
1047 return Ok(Some(new_row));
1048 }
1049 }
1050 }
1051 }
1052 }
1053 }
1054}
1055
1056pub struct LimitSource<'a> {
1060 upstream: Box<dyn RowSource + 'a>,
1061 skip: usize,
1062 limit: Option<usize>,
1063 skipped: usize,
1064 emitted: usize,
1065}
1066
1067impl<'a> LimitSource<'a> {
1068 fn new(upstream: Box<dyn RowSource + 'a>, skip: usize, limit: Option<usize>) -> Self {
1069 Self {
1070 upstream,
1071 skip,
1072 limit,
1073 skipped: 0,
1074 emitted: 0,
1075 }
1076 }
1077}
1078
1079impl<'a> RowSource for LimitSource<'a> {
1080 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1081 while self.skipped < self.skip {
1083 match self.upstream.next_row()? {
1084 Some(_) => self.skipped += 1,
1085 None => return Ok(None),
1086 }
1087 }
1088 if let Some(lim) = self.limit {
1089 if self.emitted >= lim {
1090 return Ok(None);
1091 }
1092 }
1093 match self.upstream.next_row()? {
1094 Some(row) => {
1095 self.emitted += 1;
1096 Ok(Some(row))
1097 }
1098 None => Ok(None),
1099 }
1100 }
1101}
1102
1103pub struct SortSource<'a, S: GraphStorage> {
1114 state: SortState<'a, S>,
1115}
1116
1117enum SortState<'a, S: GraphStorage> {
1118 Pending {
1119 upstream: Box<dyn RowSource + 'a>,
1120 ctx: StreamCtx<'a, S>,
1121 items: &'a [lora_analyzer::ResolvedSortItem],
1122 },
1123 Yielding(std::vec::IntoIter<Row>),
1124}
1125
1126impl<'a, S: GraphStorage> SortSource<'a, S> {
1127 fn new(
1128 upstream: Box<dyn RowSource + 'a>,
1129 ctx: StreamCtx<'a, S>,
1130 items: &'a [lora_analyzer::ResolvedSortItem],
1131 ) -> Self {
1132 Self {
1133 state: SortState::Pending {
1134 upstream,
1135 ctx,
1136 items,
1137 },
1138 }
1139 }
1140
1141 fn materialize(
1144 upstream: &mut Box<dyn RowSource + 'a>,
1145 ctx: &StreamCtx<'a, S>,
1146 items: &[lora_analyzer::ResolvedSortItem],
1147 ) -> ExecResult<Vec<Row>> {
1148 let mut rows = drain(upstream.as_mut())?;
1149 let eval_ctx = ctx.eval_ctx();
1150 rows.sort_by(|a, b| {
1151 for item in items {
1152 let ord = crate::executor::compare_sort_item(item, a, b, &eval_ctx);
1153 if ord != std::cmp::Ordering::Equal {
1154 return ord;
1155 }
1156 }
1157 std::cmp::Ordering::Equal
1158 });
1159 Ok(rows)
1160 }
1161}
1162
1163impl<'a, S: GraphStorage> RowSource for SortSource<'a, S> {
1164 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1165 loop {
1166 match &mut self.state {
1167 SortState::Pending {
1168 upstream,
1169 ctx,
1170 items,
1171 } => {
1172 let rows = Self::materialize(upstream, ctx, items)?;
1173 self.state = SortState::Yielding(rows.into_iter());
1174 }
1176 SortState::Yielding(it) => return Ok(it.next()),
1177 }
1178 }
1179 }
1180}
1181
1182pub struct DistinctSource<'a> {
1186 upstream: Box<dyn RowSource + 'a>,
1187 seen: BTreeSet<Vec<GroupValueKey>>,
1188}
1189
1190impl<'a> DistinctSource<'a> {
1191 fn new(upstream: Box<dyn RowSource + 'a>) -> Self {
1192 Self {
1193 upstream,
1194 seen: BTreeSet::new(),
1195 }
1196 }
1197}
1198
1199impl<'a> RowSource for DistinctSource<'a> {
1200 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1201 while let Some(row) = self.upstream.next_row()? {
1202 let key = row
1203 .iter()
1204 .map(|(_, val)| GroupValueKey::from_value(val))
1205 .collect();
1206 if self.seen.insert(key) {
1207 return Ok(Some(row));
1208 }
1209 }
1210 Ok(None)
1211 }
1212}
1213
1214pub struct HashAggregationSource<'a, S: GraphStorage> {
1219 state: HashAggregationState<'a, S>,
1220}
1221
1222enum HashAggregationState<'a, S: GraphStorage> {
1223 Pending {
1224 upstream: Box<dyn RowSource + 'a>,
1225 ctx: StreamCtx<'a, S>,
1226 group_by: &'a [ResolvedProjection],
1227 aggregates: &'a [ResolvedProjection],
1228 },
1229 Yielding(std::vec::IntoIter<Row>),
1230}
1231
1232impl<'a, S: GraphStorage> HashAggregationSource<'a, S> {
1233 fn new(
1234 upstream: Box<dyn RowSource + 'a>,
1235 ctx: StreamCtx<'a, S>,
1236 group_by: &'a [ResolvedProjection],
1237 aggregates: &'a [ResolvedProjection],
1238 ) -> Self {
1239 Self {
1240 state: HashAggregationState::Pending {
1241 upstream,
1242 ctx,
1243 group_by,
1244 aggregates,
1245 },
1246 }
1247 }
1248
1249 fn materialize(
1250 upstream: &mut Box<dyn RowSource + 'a>,
1251 ctx: &StreamCtx<'a, S>,
1252 group_by: &[ResolvedProjection],
1253 aggregates: &[ResolvedProjection],
1254 ) -> ExecResult<Vec<Row>> {
1255 let input_rows = drain(upstream.as_mut())?;
1256 let eval_ctx = ctx.eval_ctx();
1257 let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1258
1259 if group_by.is_empty() {
1260 groups.insert(Vec::new(), input_rows);
1261 } else {
1262 for row in input_rows {
1263 let key = group_by
1264 .iter()
1265 .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1266 .collect::<Vec<_>>();
1267 groups.entry(key).or_default().push(row);
1268 }
1269 }
1270
1271 let mut out = Vec::new();
1272 for rows in groups.into_values() {
1273 let mut result = Row::new();
1274 if let Some(first) = rows.first() {
1275 for proj in group_by {
1276 let value = hydrate_value(eval_expr(&proj.expr, first, &eval_ctx), ctx.storage);
1277 result.insert_named(proj.output, proj.name.clone(), value);
1278 }
1279 }
1280 for proj in aggregates {
1281 let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1282 result.insert_named(proj.output, proj.name.clone(), value);
1283 }
1284 out.push(result);
1285 }
1286
1287 Ok(out)
1288 }
1289}
1290
1291impl<'a, S: GraphStorage> RowSource for HashAggregationSource<'a, S> {
1292 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1293 loop {
1294 match &mut self.state {
1295 HashAggregationState::Pending {
1296 upstream,
1297 ctx,
1298 group_by,
1299 aggregates,
1300 } => {
1301 let rows = Self::materialize(upstream, ctx, group_by, aggregates)?;
1302 self.state = HashAggregationState::Yielding(rows.into_iter());
1303 }
1304 HashAggregationState::Yielding(it) => return Ok(it.next()),
1305 }
1306 }
1307 }
1308}
1309
/// Left-outer join between the upstream rows and the rows of the `inner` plan.
///
/// On the first pull, the inner plan is built with `build_streaming` and fully
/// drained. It receives only the plan, storage and parameters, not any upstream
/// row. Then, for each upstream row:
/// * every inner row that agrees on all of the upstream row's bound variables is
///   merged in (variables missing from the inner row count as agreeing), and the
///   result is emitted;
/// * if no inner row matched, the upstream row is emitted once, with each of
///   `new_vars` not already bound set to `Null`.
///
/// Matching is a nested-loop scan over the buffered inner rows.
pub struct OptionalMatchSource<'a, S: GraphStorage> {
    upstream: Box<dyn RowSource + 'a>,
    ctx: StreamCtx<'a, S>,
    plan: &'a PhysicalPlan,
    inner: PhysicalNodeId,
    new_vars: &'a [VarId],
    // All rows of the inner plan; `None` until the first call.
    inner_rows: Option<Vec<Row>>,
    // Upstream row being joined, the next inner row to try, and whether any
    // inner row has matched it so far.
    cur_input: Option<Row>,
    cur_inner_idx: usize,
    cur_matched: bool,
}

impl<'a, S: GraphStorage> OptionalMatchSource<'a, S> {
    fn new(
        upstream: Box<dyn RowSource + 'a>,
        ctx: StreamCtx<'a, S>,
        plan: &'a PhysicalPlan,
        inner: PhysicalNodeId,
        new_vars: &'a [VarId],
    ) -> Self {
        Self {
            upstream,
            ctx,
            plan,
            inner,
            new_vars,
            inner_rows: None,
            cur_input: None,
            cur_inner_idx: 0,
            cur_matched: false,
        }
    }

    /// Build and drain the inner plan once; later calls are no-ops.
    fn ensure_inner_rows(&mut self) -> ExecResult<()> {
        if self.inner_rows.is_none() {
            let mut inner = build_streaming(
                self.plan,
                self.inner,
                self.ctx.storage,
                self.ctx.params.clone(),
            )?;
            self.inner_rows = Some(drain(inner.as_mut())?);
        }
        Ok(())
    }
}

impl<'a, S: GraphStorage> RowSource for OptionalMatchSource<'a, S> {
    fn next_row(&mut self) -> ExecResult<Option<Row>> {
        // Runs on the first call even if upstream turns out to be empty.
        self.ensure_inner_rows()?;
        loop {
            if self.cur_input.is_none() {
                match self.upstream.next_row()? {
                    Some(input_row) => {
                        self.cur_input = Some(input_row);
                        self.cur_inner_idx = 0;
                        self.cur_matched = false;
                    }
                    None => return Ok(None),
                }
            }

            let inner_rows = self
                .inner_rows
                .as_ref()
                .expect("ensure_inner_rows initializes inner_rows");
            let input_row = self
                .cur_input
                .as_ref()
                .expect("cur_input is initialized above");

            while self.cur_inner_idx < inner_rows.len() {
                let inner_row = &inner_rows[self.cur_inner_idx];
                self.cur_inner_idx += 1;

                // Every variable bound in the input must either be absent from
                // the inner row or hold an equal value there.
                let compatible = input_row
                    .iter()
                    .all(|(var, val)| match inner_row.get(*var) {
                        Some(inner_val) => inner_val == val,
                        None => true,
                    });
                if !compatible {
                    continue;
                }

                // Input bindings win; only add inner variables not yet present.
                let mut merged = input_row.clone();
                for (var, name, val) in inner_row.iter_named() {
                    if !merged.contains_key(*var) {
                        merged.insert_named(*var, name.into_owned(), val.clone());
                    }
                }
                self.cur_matched = true;
                return Ok(Some(merged));
            }

            // Inner rows exhausted for this input: emit the null-padded row if
            // nothing matched, otherwise just move on to the next input.
            let mut input_row = self
                .cur_input
                .take()
                .expect("cur_input is initialized while finishing optional row");
            if !self.cur_matched {
                for &var_id in self.new_vars {
                    if !input_row.contains_key(var_id) {
                        input_row.insert(var_id, LoraValue::Null);
                    }
                }
                return Ok(Some(input_row));
            }
        }
    }
}
1424
1425pub struct PathBuildSource<'a, S: GraphStorage> {
1429 state: PathBuildState<'a, S>,
1430}
1431
1432enum PathBuildState<'a, S: GraphStorage> {
1433 Streaming {
1434 upstream: Box<dyn RowSource + 'a>,
1435 ctx: StreamCtx<'a, S>,
1436 output: VarId,
1437 node_vars: &'a [VarId],
1438 rel_vars: &'a [VarId],
1439 },
1440 PendingShortest {
1441 upstream: Box<dyn RowSource + 'a>,
1442 ctx: StreamCtx<'a, S>,
1443 output: VarId,
1444 node_vars: &'a [VarId],
1445 rel_vars: &'a [VarId],
1446 all: bool,
1447 },
1448 Yielding(std::vec::IntoIter<Row>),
1449}
1450
1451impl<'a, S: GraphStorage> PathBuildSource<'a, S> {
1452 fn new(
1453 upstream: Box<dyn RowSource + 'a>,
1454 ctx: StreamCtx<'a, S>,
1455 output: VarId,
1456 node_vars: &'a [VarId],
1457 rel_vars: &'a [VarId],
1458 shortest_path_all: Option<bool>,
1459 ) -> Self {
1460 let state = match shortest_path_all {
1461 Some(all) => PathBuildState::PendingShortest {
1462 upstream,
1463 ctx,
1464 output,
1465 node_vars,
1466 rel_vars,
1467 all,
1468 },
1469 None => PathBuildState::Streaming {
1470 upstream,
1471 ctx,
1472 output,
1473 node_vars,
1474 rel_vars,
1475 },
1476 };
1477 Self { state }
1478 }
1479
1480 fn attach_path(
1481 mut row: Row,
1482 ctx: &StreamCtx<'a, S>,
1483 output: VarId,
1484 node_vars: &[VarId],
1485 rel_vars: &[VarId],
1486 ) -> Row {
1487 let path = build_path_value(&row, node_vars, rel_vars, ctx.storage);
1488 row.insert(output, path);
1489 row
1490 }
1491
1492 fn shortest_path_rows(
1493 upstream: &mut Box<dyn RowSource + 'a>,
1494 ctx: &StreamCtx<'a, S>,
1495 output: VarId,
1496 node_vars: &[VarId],
1497 rel_vars: &[VarId],
1498 all: bool,
1499 ) -> ExecResult<Vec<Row>> {
1500 let mut best_len: Option<usize> = None;
1501 let mut best_rows = Vec::new();
1502
1503 while let Some(row) = upstream.next_row()? {
1504 let row = Self::attach_path(row, ctx, output, node_vars, rel_vars);
1505 let path_len = match row.get(output) {
1506 Some(LoraValue::Path(path)) => path.rels.len(),
1507 _ => usize::MAX,
1508 };
1509
1510 match best_len {
1511 None => {
1512 best_len = Some(path_len);
1513 best_rows.push(row);
1514 }
1515 Some(current) if path_len < current => {
1516 best_len = Some(path_len);
1517 best_rows.clear();
1518 best_rows.push(row);
1519 }
1520 Some(current) if path_len == current && all => best_rows.push(row),
1521 _ => {}
1522 }
1523 }
1524
1525 Ok(best_rows)
1526 }
1527}
1528
1529impl<'a, S: GraphStorage> RowSource for PathBuildSource<'a, S> {
1530 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1531 loop {
1532 match &mut self.state {
1533 PathBuildState::Streaming {
1534 upstream,
1535 ctx,
1536 output,
1537 node_vars,
1538 rel_vars,
1539 } => {
1540 return Ok(upstream
1541 .next_row()?
1542 .map(|row| Self::attach_path(row, ctx, *output, node_vars, rel_vars)));
1543 }
1544 PathBuildState::PendingShortest {
1545 upstream,
1546 ctx,
1547 output,
1548 node_vars,
1549 rel_vars,
1550 all,
1551 } => {
1552 let rows = Self::shortest_path_rows(
1553 upstream, ctx, *output, node_vars, rel_vars, *all,
1554 )?;
1555 self.state = PathBuildState::Yielding(rows.into_iter());
1556 }
1557 PathBuildState::Yielding(it) => return Ok(it.next()),
1558 }
1559 }
1560 }
1561}
1562
1563pub struct UnionSource<'a> {
1572 branches: Vec<Box<dyn RowSource + 'a>>,
1573 branch_idx: usize,
1574 needs_dedup: bool,
1575 seen: BTreeSet<Vec<(String, GroupValueKey)>>,
1576}
1577
1578impl<'a> UnionSource<'a> {
1579 fn new(branches: Vec<Box<dyn RowSource + 'a>>, needs_dedup: bool) -> Self {
1580 Self {
1581 branches,
1582 branch_idx: 0,
1583 needs_dedup,
1584 seen: BTreeSet::new(),
1585 }
1586 }
1587}
1588
1589impl<'a> RowSource for UnionSource<'a> {
1590 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1591 while self.branch_idx < self.branches.len() {
1592 match self.branches[self.branch_idx].next_row()? {
1593 Some(row) => {
1594 if self.needs_dedup {
1595 let key = row
1596 .iter_named()
1597 .map(|(_, name, val)| {
1598 (name.into_owned(), GroupValueKey::from_value(val))
1599 })
1600 .collect();
1601 if !self.seen.insert(key) {
1602 continue;
1603 }
1604 }
1605 return Ok(Some(row));
1606 }
1607 None => {
1608 self.branch_idx += 1;
1609 }
1610 }
1611 }
1612 Ok(None)
1613 }
1614}
1615
1616pub(crate) fn compiled_to_streaming<'a, S: GraphStorage + 'a>(
1628 compiled: &'a CompiledQuery,
1629 storage: &'a S,
1630 params: BTreeMap<String, LoraValue>,
1631) -> ExecResult<Box<dyn RowSource + 'a>> {
1632 let params = Arc::new(params);
1633
1634 if compiled.unions.is_empty() {
1635 let plan = &compiled.physical;
1636 let inner = build_streaming(plan, plan.root, storage, params)?;
1637 return Ok(Box::new(HydratingSource::new(inner, storage)));
1638 }
1639
1640 let mut branches: Vec<Box<dyn RowSource + 'a>> = Vec::with_capacity(compiled.unions.len() + 1);
1641
1642 let head_inner = build_streaming(
1643 &compiled.physical,
1644 compiled.physical.root,
1645 storage,
1646 params.clone(),
1647 )?;
1648 branches.push(Box::new(HydratingSource::new(head_inner, storage)));
1649
1650 let mut needs_dedup = false;
1651 for branch in &compiled.unions {
1652 let inner = build_streaming(
1653 &branch.physical,
1654 branch.physical.root,
1655 storage,
1656 params.clone(),
1657 )?;
1658 branches.push(Box::new(HydratingSource::new(inner, storage)));
1659 if !branch.all {
1660 needs_dedup = true;
1661 }
1662 }
1663
1664 Ok(Box::new(UnionSource::new(branches, needs_dedup)))
1665}
1666
1667pub struct HydratingSource<'a, S: GraphStorage> {
1671 upstream: Box<dyn RowSource + 'a>,
1672 storage: &'a S,
1673}
1674
1675impl<'a, S: GraphStorage> HydratingSource<'a, S> {
1676 fn new(upstream: Box<dyn RowSource + 'a>, storage: &'a S) -> Self {
1677 Self { upstream, storage }
1678 }
1679}
1680
1681impl<'a, S: GraphStorage> RowSource for HydratingSource<'a, S> {
1682 fn next_row(&mut self) -> ExecResult<Option<Row>> {
1683 match self.upstream.next_row()? {
1684 None => Ok(None),
1685 Some(row) => {
1686 let mut out = Row::new();
1687 for (var, name, value) in row.into_iter_named() {
1688 out.insert_named(var, name, hydrate_value(value, self.storage));
1689 }
1690 Ok(Some(out))
1691 }
1692 }
1693 }
1694}
1695
1696pub(crate) fn hydrate_value<S: GraphStorage>(value: LoraValue, storage: &S) -> LoraValue {
1697 match value {
1698 LoraValue::Node(id) => storage
1699 .with_node(id, hydrate_node_record)
1700 .unwrap_or(LoraValue::Null),
1701 LoraValue::Relationship(id) => storage
1702 .with_relationship(id, hydrate_relationship_record)
1703 .unwrap_or(LoraValue::Null),
1704 LoraValue::List(values) => LoraValue::List(
1705 values
1706 .into_iter()
1707 .map(|v| hydrate_value(v, storage))
1708 .collect(),
1709 ),
1710 LoraValue::Map(map) => LoraValue::Map(
1711 map.into_iter()
1712 .map(|(k, v)| (k, hydrate_value(v, storage)))
1713 .collect(),
1714 ),
1715 other => other,
1716 }
1717}
1718
1719pub(crate) fn is_streaming_op(op: &PhysicalOp) -> bool {
1727 match op {
1728 PhysicalOp::Argument(_)
1729 | PhysicalOp::NodeScan(_)
1730 | PhysicalOp::NodeByLabelScan(_)
1731 | PhysicalOp::NodeByPropertyScan(_)
1732 | PhysicalOp::Filter(_)
1733 | PhysicalOp::Unwind(_)
1734 | PhysicalOp::Limit(_)
1735 | PhysicalOp::Sort(_)
1742 | PhysicalOp::HashAggregation(_)
1743 | PhysicalOp::OptionalMatch(_)
1744 | PhysicalOp::PathBuild(_)
1745 | PhysicalOp::Projection(_) => true,
1749 PhysicalOp::Expand(_) => true,
1752 _ => false,
1753 }
1754}
1755
1756pub(crate) fn write_op_input(
1761 plan: &PhysicalPlan,
1762 node_id: PhysicalNodeId,
1763) -> Option<PhysicalNodeId> {
1764 match &plan.nodes[node_id] {
1765 PhysicalOp::Create(o) => Some(o.input),
1766 PhysicalOp::Set(o) => Some(o.input),
1767 PhysicalOp::Delete(o) => Some(o.input),
1768 PhysicalOp::Remove(o) => Some(o.input),
1769 PhysicalOp::Merge(o) => Some(o.input),
1770 _ => None,
1771 }
1772}
1773
1774pub(crate) fn subtree_is_fully_streaming(plan: &PhysicalPlan, node_id: PhysicalNodeId) -> bool {
1781 let op = &plan.nodes[node_id];
1782 if !is_streaming_op(op) {
1783 return false;
1784 }
1785 let child = match op {
1786 PhysicalOp::Argument(_) => return true,
1787 PhysicalOp::NodeScan(o) => o.input,
1788 PhysicalOp::NodeByLabelScan(o) => o.input,
1789 PhysicalOp::NodeByPropertyScan(o) => o.input,
1790 PhysicalOp::Filter(o) => Some(o.input),
1791 PhysicalOp::Unwind(o) => Some(o.input),
1792 PhysicalOp::Limit(o) => Some(o.input),
1793 PhysicalOp::Expand(o) => Some(o.input),
1794 PhysicalOp::Projection(o) => Some(o.input),
1795 PhysicalOp::Sort(o) => Some(o.input),
1796 PhysicalOp::HashAggregation(o) => Some(o.input),
1797 PhysicalOp::OptionalMatch(o) => Some(o.input),
1798 PhysicalOp::PathBuild(o) => Some(o.input),
1799 _ => return false,
1801 };
1802 match child {
1803 None => true,
1804 Some(c) => subtree_is_fully_streaming(plan, c),
1805 }
1806}
1807
1808pub(crate) fn build_streaming<'a, S: GraphStorage + 'a>(
1809 plan: &'a PhysicalPlan,
1810 node_id: PhysicalNodeId,
1811 storage: &'a S,
1812 params: Arc<BTreeMap<String, LoraValue>>,
1813) -> ExecResult<Box<dyn RowSource + 'a>> {
1814 let op = &plan.nodes[node_id];
1815
1816 if !is_streaming_op(op) {
1817 return build_buffered_subtree(plan, node_id, storage, ¶ms);
1818 }
1819
1820 match op {
1821 PhysicalOp::Argument(_) => Ok(Box::new(ArgumentSource::new())),
1822
1823 PhysicalOp::NodeScan(NodeScanExec { input, var }) => {
1824 let upstream = open_input(plan, *input, storage, params.clone())?;
1825 Ok(Box::new(NodeScanSource::new(upstream, storage, *var)))
1826 }
1827
1828 PhysicalOp::NodeByLabelScan(NodeByLabelScanExec { input, var, labels }) => {
1829 let upstream = open_input(plan, *input, storage, params.clone())?;
1830 Ok(Box::new(NodeByLabelScanSource::new(
1831 upstream, storage, *var, labels,
1832 )))
1833 }
1834
1835 PhysicalOp::NodeByPropertyScan(NodeByPropertyScanExec {
1836 input,
1837 var,
1838 labels,
1839 key,
1840 value,
1841 }) => {
1842 let upstream = open_input(plan, *input, storage, params.clone())?;
1843 let ctx = StreamCtx::new(storage, params);
1844 Ok(Box::new(NodeByPropertyScanSource::new(
1845 upstream, ctx, *var, labels, key, value,
1846 )))
1847 }
1848
1849 PhysicalOp::Expand(ExpandExec {
1850 input,
1851 src,
1852 rel,
1853 dst,
1854 types,
1855 direction,
1856 rel_properties,
1857 range,
1858 }) => {
1859 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1860 let ctx = StreamCtx::new(storage, params);
1861 match range.as_ref() {
1862 Some(range) => Ok(Box::new(VariableLengthExpandSource::new(
1863 upstream, ctx, *src, *rel, *dst, types, *direction, range,
1864 ))),
1865 None => Ok(Box::new(ExpandSource::new(
1866 upstream,
1867 ctx,
1868 *src,
1869 *rel,
1870 *dst,
1871 types,
1872 *direction,
1873 rel_properties.as_ref(),
1874 ))),
1875 }
1876 }
1877
1878 PhysicalOp::Filter(FilterExec { input, predicate }) => {
1879 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1880 let ctx = StreamCtx::new(storage, params);
1881 Ok(Box::new(FilterSource::new(upstream, ctx, predicate)))
1882 }
1883
1884 PhysicalOp::Projection(ProjectionExec {
1885 input,
1886 distinct,
1887 items,
1888 include_existing,
1889 }) => {
1890 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1891 let ctx = StreamCtx::new(storage, params);
1892 let proj: Box<dyn RowSource + 'a> = Box::new(ProjectionSource::new(
1893 upstream,
1894 ctx,
1895 items,
1896 *include_existing,
1897 ));
1898 if *distinct {
1899 Ok(Box::new(DistinctSource::new(proj)))
1900 } else {
1901 Ok(proj)
1902 }
1903 }
1904
1905 PhysicalOp::Unwind(UnwindExec { input, expr, alias }) => {
1906 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1907 let ctx = StreamCtx::new(storage, params);
1908 Ok(Box::new(UnwindSource::new(upstream, ctx, expr, *alias)))
1909 }
1910
1911 PhysicalOp::Limit(LimitExec { input, skip, limit }) => {
1912 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1913 let ctx = StreamCtx::new(storage, params);
1916 let eval_ctx = ctx.eval_ctx();
1917 let scratch = Row::new();
1918 let skip_n = skip
1919 .as_ref()
1920 .and_then(|e| eval_expr(e, &scratch, &eval_ctx).as_i64())
1921 .unwrap_or(0)
1922 .max(0) as usize;
1923 let limit_n = limit
1924 .as_ref()
1925 .and_then(|e| eval_expr(e, &scratch, &eval_ctx).as_i64())
1926 .map(|n| n.max(0) as usize);
1927 Ok(Box::new(LimitSource::new(upstream, skip_n, limit_n)))
1928 }
1929
1930 PhysicalOp::Sort(SortExec { input, items }) => {
1931 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1932 let ctx = StreamCtx::new(storage, params);
1933 Ok(Box::new(SortSource::new(upstream, ctx, items)))
1934 }
1935
1936 PhysicalOp::HashAggregation(HashAggregationExec {
1937 input,
1938 group_by,
1939 aggregates,
1940 }) => {
1941 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1942 let ctx = StreamCtx::new(storage, params);
1943 Ok(Box::new(HashAggregationSource::new(
1944 upstream, ctx, group_by, aggregates,
1945 )))
1946 }
1947
1948 PhysicalOp::OptionalMatch(OptionalMatchExec {
1949 input,
1950 inner,
1951 new_vars,
1952 }) => {
1953 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1954 let ctx = StreamCtx::new(storage, params);
1955 Ok(Box::new(OptionalMatchSource::new(
1956 upstream, ctx, plan, *inner, new_vars,
1957 )))
1958 }
1959
1960 PhysicalOp::PathBuild(PathBuildExec {
1961 input,
1962 output,
1963 node_vars,
1964 rel_vars,
1965 shortest_path_all,
1966 }) => {
1967 let upstream = build_streaming(plan, *input, storage, params.clone())?;
1968 let ctx = StreamCtx::new(storage, params);
1969 Ok(Box::new(PathBuildSource::new(
1970 upstream,
1971 ctx,
1972 *output,
1973 node_vars,
1974 rel_vars,
1975 *shortest_path_all,
1976 )))
1977 }
1978
1979 _ => unreachable!("non-streaming op reached streaming branch: {op:?}"),
1981 }
1982}
1983
1984fn open_input<'a, S: GraphStorage + 'a>(
1988 plan: &'a PhysicalPlan,
1989 input: Option<PhysicalNodeId>,
1990 storage: &'a S,
1991 params: Arc<BTreeMap<String, LoraValue>>,
1992) -> ExecResult<Box<dyn RowSource + 'a>> {
1993 match input {
1994 Some(input) => build_streaming(plan, input, storage, params),
1995 None => Ok(Box::new(ArgumentSource::new())),
1996 }
1997}
1998
1999fn build_buffered_subtree<'a, S: GraphStorage + 'a>(
2006 plan: &'a PhysicalPlan,
2007 node_id: PhysicalNodeId,
2008 storage: &'a S,
2009 params: &Arc<BTreeMap<String, LoraValue>>,
2010) -> ExecResult<Box<dyn RowSource + 'a>> {
2011 let executor = Executor::new(ExecutionContext {
2015 storage,
2016 params: (**params).clone(),
2017 });
2018 let rows = executor.execute_subtree(plan, node_id)?;
2019 Ok(Box::new(BufferedRowSource::new(rows)))
2020}
2021
2022pub struct PullExecutor<'a, S: GraphStorage> {
2028 storage: &'a S,
2029 params: BTreeMap<String, LoraValue>,
2030}
2031
2032impl<'a, S: GraphStorage> PullExecutor<'a, S> {
2033 pub fn new(storage: &'a S, params: BTreeMap<String, LoraValue>) -> Self {
2034 Self { storage, params }
2035 }
2036
2037 pub fn open_compiled(self, compiled: &'a CompiledQuery) -> ExecResult<Box<dyn RowSource + 'a>>
2046 where
2047 S: 'a,
2048 {
2049 let _ = take_eval_error();
2050 compiled_to_streaming(compiled, self.storage, self.params)
2051 }
2052}
2053
2054pub struct MutablePullExecutor<'a, S: GraphStorageMut> {
2059 storage: &'a mut S,
2060 params: BTreeMap<String, LoraValue>,
2061}
2062
2063impl<'a, S: GraphStorageMut + GraphStorage> MutablePullExecutor<'a, S> {
2064 pub fn new(storage: &'a mut S, params: BTreeMap<String, LoraValue>) -> Self {
2065 Self { storage, params }
2066 }
2067
2068 pub fn open_compiled(self, compiled: &'a CompiledQuery) -> ExecResult<Box<dyn RowSource + 'a>>
2082 where
2083 S: 'a,
2084 {
2085 if compiled.unions.is_empty() {
2086 return open_mutable_plan_cursor(self.storage, &compiled.physical, self.params);
2087 }
2088
2089 MutableUnionSource::open(self.storage, compiled, self.params)
2090 .map(|source| Box::new(source) as Box<dyn RowSource + 'a>)
2091 }
2092}
2093
2094fn open_mutable_plan_cursor<'a, S: GraphStorageMut + GraphStorage + 'a>(
2095 storage: &'a mut S,
2096 plan: &'a PhysicalPlan,
2097 params: BTreeMap<String, LoraValue>,
2098) -> ExecResult<Box<dyn RowSource + 'a>> {
2099 if let Some(input) = write_op_input(plan, plan.root) {
2100 if subtree_is_fully_streaming(plan, input) {
2101 return StreamingWriteCursor::open(storage, plan, plan.root, params)
2102 .map(|c| Box::new(c) as Box<dyn RowSource + 'a>);
2103 }
2104 }
2105
2106 let mut executor = MutableExecutor::new(MutableExecutionContext { storage, params });
2107 let rows = executor.execute_rows(plan)?;
2108 Ok(Box::new(BufferedRowSource::new(rows)))
2109}
2110
/// Chains the branches of a (possibly mutating) `UNION` query.
///
/// Branches are opened lazily and strictly one after another: branch `i + 1` is only
/// opened once branch `i` is exhausted, so its writes happen after branch `i`'s rows
/// have all been produced. When any branch is a plain `UNION`, rows are deduplicated
/// across all branches by their (column name, value key) sequence.
pub struct MutableUnionSource<'a, S: GraphStorageMut + GraphStorage + 'a> {
    /// Raw pointer to the storage borrowed mutably for `'a` in `open`; each branch
    /// cursor reborrows through it (see `open_branch`).
    storage_ptr: *mut S,
    compiled: &'a CompiledQuery,
    params: BTreeMap<String, LoraValue>,
    /// Index of the branch currently being drained (0 = `compiled.physical`).
    branch_idx: usize,
    /// Cursor of the branch at `branch_idx`, if it has been opened.
    current: Option<Box<dyn RowSource + 'a>>,
    needs_dedup: bool,
    /// Dedup keys of rows already emitted (only populated when `needs_dedup`).
    seen: BTreeSet<Vec<(String, GroupValueKey)>>,
    /// Marks this source as logically holding the `&'a mut S` passed to `open`.
    _phantom: std::marker::PhantomData<&'a mut S>,
}

impl<'a, S: GraphStorageMut + GraphStorage + 'a> MutableUnionSource<'a, S> {
    /// Captures the storage and query; no branch is opened until the first `next_row`.
    fn open(
        storage: &'a mut S,
        compiled: &'a CompiledQuery,
        params: BTreeMap<String, LoraValue>,
    ) -> ExecResult<Self> {
        let needs_dedup = compiled.unions.iter().any(|branch| !branch.all);
        Ok(Self {
            storage_ptr: storage as *mut S,
            compiled,
            params,
            branch_idx: 0,
            current: None,
            needs_dedup,
            seen: BTreeSet::new(),
            _phantom: std::marker::PhantomData,
        })
    }

    /// Head plan plus one per `UNION` branch.
    fn branch_count(&self) -> usize {
        self.compiled.unions.len() + 1
    }

    /// Plan of branch `idx`: index 0 is the head plan, `idx` >= 1 maps to `unions[idx - 1]`.
    fn branch_plan(&self, idx: usize) -> &'a PhysicalPlan {
        if idx == 0 {
            &self.compiled.physical
        } else {
            &self.compiled.unions[idx - 1].physical
        }
    }

    /// Opens the cursor for branch `idx`.
    fn open_branch(&mut self, idx: usize) -> ExecResult<Box<dyn RowSource + 'a>> {
        let plan = self.branch_plan(idx);
        // SAFETY: `storage_ptr` comes from the `&'a mut S` given to `open`, which this
        // source holds for `'a`. `next_row` only calls this when `self.current` is `None`
        // and drops the previous cursor (`self.current.take()`) before advancing, so at
        // most one `&mut S` derived from this pointer is live at a time.
        // NOTE(review): the returned cursor is typed `'a` although it is only valid until
        // the next branch is opened; only `self.current` may retain it.
        let storage = unsafe { &mut *self.storage_ptr };
        open_mutable_plan_cursor(storage, plan, self.params.clone())
    }
}

impl<'a, S: GraphStorageMut + GraphStorage + 'a> RowSource for MutableUnionSource<'a, S> {
    fn next_row(&mut self) -> ExecResult<Option<Row>> {
        loop {
            if self.branch_idx >= self.branch_count() {
                return Ok(None);
            }

            // Lazily open the current branch; this is where its write operators start.
            if self.current.is_none() {
                self.current = Some(self.open_branch(self.branch_idx)?);
            }

            match self
                .current
                .as_mut()
                .expect("current branch initialized above")
                .next_row()?
            {
                Some(row) => {
                    if self.needs_dedup {
                        let key = row
                            .iter_named()
                            .map(|(_, name, val)| {
                                (name.into_owned(), GroupValueKey::from_value(val))
                            })
                            .collect();
                        // Already emitted by this or an earlier branch: skip it.
                        if !self.seen.insert(key) {
                            continue;
                        }
                    }
                    return Ok(Some(row));
                }
                None => {
                    // Drop the exhausted cursor (releasing its storage reborrow) before
                    // the next branch is opened.
                    self.current.take();
                    self.branch_idx += 1;
                }
            }
        }
    }
}
2206
/// Applies a write operator row by row on top of a streaming read pipeline.
///
/// Each call to `next_row` pulls one input row, applies the write operator at
/// `write_op_node` to it with a short-lived `MutableExecutor`, and returns the
/// hydrated result row.
pub struct StreamingWriteCursor<'a, S: GraphStorageMut + GraphStorage + 'a> {
    /// Read pipeline feeding the write operator; dropped explicitly in `Drop`.
    upstream: ManuallyDrop<Box<dyn RowSource + 'a>>,
    /// Raw pointer to the storage borrowed mutably for `'a` in `open`.
    storage_ptr: *mut S,
    plan: &'a PhysicalPlan,
    /// Node id of the write operator (the plan root when opened by
    /// `open_mutable_plan_cursor`).
    write_op_node: PhysicalNodeId,
    params: BTreeMap<String, LoraValue>,
    /// Marks this cursor as logically holding the `&'a mut S` passed to `open`.
    _phantom: std::marker::PhantomData<&'a mut S>,
}

impl<'a, S: GraphStorageMut + GraphStorage + 'a> StreamingWriteCursor<'a, S> {
    /// Builds the streaming read pipeline for the input of `write_op_node`.
    ///
    /// # Errors
    /// Returns `ExecutorError::RuntimeError` if `write_op_node` is not a write operator,
    /// and propagates errors from `build_streaming`.
    pub(crate) fn open(
        storage: &'a mut S,
        plan: &'a PhysicalPlan,
        write_op_node: PhysicalNodeId,
        params: BTreeMap<String, LoraValue>,
    ) -> ExecResult<Self> {
        let input = match write_op_input(plan, write_op_node) {
            Some(i) => i,
            None => {
                return Err(crate::errors::ExecutorError::RuntimeError(format!(
                    "StreamingWriteCursor::open called with non-write node {write_op_node:?}"
                )));
            }
        };
        let storage_ptr: *mut S = storage as *mut S;

        // SAFETY/NOTE(review): `storage_ref` (captured by `upstream`) stays live for the
        // cursor's lifetime, while `next_row` materialises a `&mut S` from the same
        // pointer for every row. Rust requires a live `&mut` to be unique, so this
        // aliasing must be justified by the storage implementation, or reads should go
        // through the raw pointer / interior mutability instead. Verify with Miri.
        let storage_ref: &'a S = unsafe { &*storage_ptr };
        let upstream = build_streaming(plan, input, storage_ref, Arc::new(params.clone()))?;

        Ok(Self {
            upstream: ManuallyDrop::new(upstream),
            storage_ptr,
            plan,
            write_op_node,
            params,
            _phantom: std::marker::PhantomData,
        })
    }
}

impl<'a, S: GraphStorageMut + GraphStorage + 'a> RowSource for StreamingWriteCursor<'a, S> {
    fn next_row(&mut self) -> ExecResult<Option<Row>> {
        // Pull the next input row before touching storage mutably.
        let mut row = match self.upstream.next_row()? {
            Some(r) => r,
            None => return Ok(None),
        };

        // SAFETY: `storage_ptr` comes from the `&'a mut S` given to `open`; this `&mut S`
        // lives only until the end of this call. See the NOTE(review) in `open` about the
        // shared reference held by `upstream`.
        let storage_mut: &mut S = unsafe { &mut *self.storage_ptr };
        // A fresh executor per row; params are cloned because the context owns them.
        let mut exec = MutableExecutor::new(MutableExecutionContext {
            storage: storage_mut,
            params: self.params.clone(),
        });
        let op = &self.plan.nodes[self.write_op_node];
        exec.apply_write_op(op, &mut row)?;
        let row = exec.hydrate_row(row);
        Ok(Some(row))
    }
}

impl<'a, S: GraphStorageMut + GraphStorage + 'a> Drop for StreamingWriteCursor<'a, S> {
    fn drop(&mut self) {
        // SAFETY: `upstream` is dropped exactly once, here, and never accessed again.
        unsafe {
            ManuallyDrop::drop(&mut self.upstream);
        }
    }
}
2317
2318pub fn collect_compiled<'a, S: GraphStorage + 'a>(
2321 storage: &'a S,
2322 params: BTreeMap<String, LoraValue>,
2323 compiled: &'a CompiledQuery,
2324) -> ExecResult<Vec<Row>> {
2325 let mut cursor = PullExecutor::new(storage, params).open_compiled(compiled)?;
2326 drain(cursor.as_mut())
2327}
2328
/// Whether a compiled query can modify the graph, as determined by `classify_stream`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamShape {
    /// No branch of the query contains a write operator.
    ReadOnly,
    /// At least one branch contains `CREATE`, `MERGE`, `DELETE`, `SET` or `REMOVE`.
    Mutating,
}

impl StreamShape {
    /// Returns `true` for [`StreamShape::Mutating`].
    pub fn is_mutating(self) -> bool {
        match self {
            StreamShape::ReadOnly => false,
            StreamShape::Mutating => true,
        }
    }
}
2351
2352fn plan_is_mutating(plan: &PhysicalPlan) -> bool {
2353 plan.nodes.iter().any(|op| {
2354 matches!(
2355 op,
2356 PhysicalOp::Create(_)
2357 | PhysicalOp::Merge(_)
2358 | PhysicalOp::Delete(_)
2359 | PhysicalOp::Set(_)
2360 | PhysicalOp::Remove(_)
2361 )
2362 })
2363}
2364
2365pub fn classify_stream(compiled: &CompiledQuery) -> StreamShape {
2369 if plan_is_mutating(&compiled.physical)
2370 || compiled
2371 .unions
2372 .iter()
2373 .any(|b| plan_is_mutating(&b.physical))
2374 {
2375 StreamShape::Mutating
2376 } else {
2377 StreamShape::ReadOnly
2378 }
2379}
2380
2381pub fn plan_result_columns(plan: &PhysicalPlan) -> Vec<String> {
2395 plan_columns_at(plan, plan.root).unwrap_or_default()
2396}
2397
2398fn plan_columns_at(plan: &PhysicalPlan, node: PhysicalNodeId) -> Option<Vec<String>> {
2399 match &plan.nodes[node] {
2400 PhysicalOp::Projection(p) => Some(p.items.iter().map(|i| i.name.clone()).collect()),
2401 PhysicalOp::HashAggregation(p) => Some(
2402 p.group_by
2403 .iter()
2404 .chain(p.aggregates.iter())
2405 .map(|i| i.name.clone())
2406 .collect(),
2407 ),
2408 PhysicalOp::Limit(p) => plan_columns_at(plan, p.input),
2409 PhysicalOp::Sort(p) => plan_columns_at(plan, p.input),
2410 PhysicalOp::PathBuild(p) => plan_columns_at(plan, p.input),
2411 PhysicalOp::OptionalMatch(p) => plan_columns_at(plan, p.input),
2412 PhysicalOp::Filter(p) => plan_columns_at(plan, p.input),
2413 PhysicalOp::Unwind(p) => plan_columns_at(plan, p.input),
2414 PhysicalOp::Create(p) => plan_columns_at(plan, p.input),
2415 PhysicalOp::Merge(p) => plan_columns_at(plan, p.input),
2416 PhysicalOp::Delete(p) => plan_columns_at(plan, p.input),
2417 PhysicalOp::Set(p) => plan_columns_at(plan, p.input),
2418 PhysicalOp::Remove(p) => plan_columns_at(plan, p.input),
2419 PhysicalOp::Argument(_)
2420 | PhysicalOp::NodeScan(_)
2421 | PhysicalOp::NodeByLabelScan(_)
2422 | PhysicalOp::NodeByPropertyScan(_)
2423 | PhysicalOp::Expand(_) => None,
2424 }
2425}
2426
2427pub fn compiled_result_columns(compiled: &CompiledQuery) -> Vec<String> {
2430 plan_result_columns(&compiled.physical)
2431}
2432
#[cfg(test)]
mod tests {
    use super::*;
    use lora_analyzer::symbols::VarId;
    use lora_ast::Span;
    use lora_compiler::physical::{ArgumentExec, ExpandExec, NodeByLabelScanExec, PhysicalPlan};
    use lora_store::{GraphStorageMut, InMemoryGraph};

    /// Builds the chain `(first)-[:R]->(second)-[:R]->(third)` of `:N` nodes.
    fn three_node_chain() -> InMemoryGraph {
        let mut graph = InMemoryGraph::new();
        let first = graph.create_node(vec!["N".into()], BTreeMap::new());
        let second = graph.create_node(vec!["N".into()], BTreeMap::new());
        let third = graph.create_node(vec!["N".into()], BTreeMap::new());
        graph
            .create_relationship(first.id, second.id, "R", BTreeMap::new())
            .unwrap();
        graph
            .create_relationship(second.id, third.id, "R", BTreeMap::new())
            .unwrap();
        graph
    }

    #[test]
    fn variable_length_expand_has_streaming_source() {
        let graph = three_node_chain();
        let (src, rel, dst) = (VarId(0), VarId(1), VarId(2));

        let label_scan = PhysicalOp::NodeByLabelScan(NodeByLabelScanExec {
            input: Some(0),
            var: src,
            labels: vec![vec!["N".into()]],
        });
        let var_length_expand = PhysicalOp::Expand(ExpandExec {
            input: 1,
            src,
            rel: Some(rel),
            dst,
            types: vec!["R".into()],
            direction: Direction::Right,
            rel_properties: None,
            range: Some(RangeLiteral {
                start: Some(1),
                end: Some(2),
                span: Span::default(),
            }),
        });
        let plan = PhysicalPlan {
            root: 2,
            nodes: vec![
                PhysicalOp::Argument(ArgumentExec),
                label_scan,
                var_length_expand,
            ],
        };

        assert!(subtree_is_fully_streaming(&plan, plan.root));

        let mut source =
            build_streaming(&plan, plan.root, &graph, Arc::new(BTreeMap::new())).unwrap();
        let rows = drain(source.as_mut()).unwrap();

        let mut rel_lengths: Vec<usize> = rows
            .iter()
            .map(|row| match row.get(rel).unwrap() {
                LoraValue::List(rels) => rels.len(),
                other => panic!("expected relationship list, got {other:?}"),
            })
            .collect();
        rel_lengths.sort_unstable();

        assert_eq!(rows.len(), 3);
        assert_eq!(rel_lengths, vec![1, 1, 2]);
    }
}