Skip to main content

lora_executor/
pull.rs

1//! Pull-based row pipeline.
2//!
3//! [`RowSource`] is the fallible row cursor; [`PullExecutor::open_compiled`]
4//! and [`MutablePullExecutor::open_compiled`] return a `Box<dyn RowSource + 'a>`
5//! representing a streaming query plan execution.
6//!
7//! ## Architecture
8//!
//! The following operators have dedicated [`RowSource`]
//! implementations that pull from their upstream one row at a
//! time:
//!
//! * [`ArgumentSource`]
//! * [`NodeScanSource`]
//! * [`NodeByLabelScanSource`]
//! * [`NodeByPropertyScanSource`]
//! * [`ExpandSource`] (single-hop)
//! * [`VariableLengthExpandSource`]
//! * [`FilterSource`]
//! * [`ProjectionSource`]
//! * [`DistinctSource`]
//! * [`UnwindSource`]
//! * [`LimitSource`]
//! * [`SortSource`] (buffers internally, yields lazily)
//! * [`HashAggregationSource`] (buffers internally, yields lazily)
//! * [`OptionalMatchSource`] (streams outer input, buffers inner once)
//! * [`PathBuildSource`]
27//!
28//! Blocking internals such as sort, aggregation, and shortest-path
29//! filtering still allocate where the Cypher semantics require a
30//! complete input set. Deduping operators keep only their seen-key
31//! state and stream rows as soon as a new key appears.
32//!
33//! Hydration happens once at the top of the pipeline — operator
34//! sources yield raw rows so intermediate evaluations work on
35//! storage-borrowed values, and the topmost [`HydratingSource`]
36//! converts node / relationship references to their full hydrated
37//! map form before the row leaves the cursor.
38
39use std::collections::{BTreeMap, BTreeSet};
40use std::mem::ManuallyDrop;
41use std::sync::Arc;
42
43use lora_analyzer::symbols::VarId;
44use lora_analyzer::{ResolvedExpr, ResolvedProjection};
45use lora_ast::{Direction, RangeLiteral};
46use lora_compiler::physical::{
47    ExpandExec, FilterExec, HashAggregationExec, LimitExec, NodeByLabelScanExec,
48    NodeByPropertyScanExec, NodeScanExec, OptionalMatchExec, PathBuildExec, PhysicalNodeId,
49    PhysicalOp, PhysicalPlan, ProjectionExec, SortExec, UnwindExec,
50};
51use lora_compiler::CompiledQuery;
52use lora_store::{GraphStorage, GraphStorageMut, NodeId};
53
54use crate::errors::{value_kind, ExecResult, ExecutorError};
55use crate::eval::{eval_expr, take_eval_error, EvalContext};
56use crate::executor::{
57    build_path_value, compute_aggregate_expr, hydrate_node_record, hydrate_relationship_record,
58    indexed_node_property_candidates, label_group_candidates_prefiltered,
59    node_matches_label_groups, node_matches_property_filter, resolve_range,
60    scan_node_ids_for_label_groups, value_matches_property_value, ExecutionContext, Executor,
61    GroupValueKey, MutableExecutionContext, MutableExecutor,
62};
63use crate::value::{LoraValue, Row};
64
65/// Fallible pull-based row cursor.
66///
67/// Each call to [`RowSource::next_row`] returns the next row,
68/// `Ok(None)` when the cursor is exhausted, or an error if execution
69/// fails. The cursor stays in a valid state after an error — callers
70/// may drop it without observing additional side effects.
71pub trait RowSource {
72    /// Pull the next row.
73    fn next_row(&mut self) -> ExecResult<Option<Row>>;
74}
75
76/// Drain a row source into a `Vec<Row>`, propagating the first error.
77pub fn drain<S: RowSource + ?Sized>(source: &mut S) -> ExecResult<Vec<Row>> {
78    let mut out = Vec::new();
79    while let Some(row) = source.next_row()? {
80        out.push(row);
81    }
82    Ok(out)
83}
84
85// ---------------------------------------------------------------------------
86// Shared streaming context
87// ---------------------------------------------------------------------------
88
89/// Storage + bound parameters shared by every operator source in a
90/// pull pipeline. `Clone` is one pointer-copy plus an `Arc::clone`
91/// (params), so passing it by value down the build tree is
92/// effectively free, while consolidating "the two pieces every
93/// expression-evaluating source needs" into one field.
94#[derive(Clone)]
95pub(crate) struct StreamCtx<'a, S: GraphStorage> {
96    pub storage: &'a S,
97    pub params: Arc<BTreeMap<String, LoraValue>>,
98}
99
100impl<'a, S: GraphStorage> StreamCtx<'a, S> {
101    fn new(storage: &'a S, params: Arc<BTreeMap<String, LoraValue>>) -> Self {
102        Self { storage, params }
103    }
104
105    /// Build a borrowing [`EvalContext`] for use inside an
106    /// operator's `next_row` method. Cheap — two pointer reads.
107    fn eval_ctx<'b>(&'b self) -> EvalContext<'b, S> {
108        EvalContext {
109            storage: self.storage,
110            params: &self.params,
111        }
112    }
113}
114
115// ---------------------------------------------------------------------------
116// Buffered fallback
117// ---------------------------------------------------------------------------
118
119/// Buffered cursor backed by a pre-computed `Vec<Row>`. Used both as
120/// a simple "rows already collected" adapter and as the leaf fallback
121/// for operators whose internals still require full materialization.
122pub struct BufferedRowSource {
123    iter: std::vec::IntoIter<Row>,
124}
125
126impl BufferedRowSource {
127    pub fn new(rows: Vec<Row>) -> Self {
128        Self {
129            iter: rows.into_iter(),
130        }
131    }
132}
133
134impl RowSource for BufferedRowSource {
135    fn next_row(&mut self) -> ExecResult<Option<Row>> {
136        Ok(self.iter.next())
137    }
138}
139
140// ---------------------------------------------------------------------------
141// Per-operator streaming sources
142// ---------------------------------------------------------------------------
143
144/// Yields a single empty row exactly once. The bottom of every plan
145/// chain that doesn't start with an explicit input.
146pub struct ArgumentSource {
147    yielded: bool,
148}
149
150impl ArgumentSource {
151    pub fn new() -> Self {
152        Self { yielded: false }
153    }
154}
155
156impl Default for ArgumentSource {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
162impl RowSource for ArgumentSource {
163    fn next_row(&mut self) -> ExecResult<Option<Row>> {
164        if self.yielded {
165            Ok(None)
166        } else {
167            self.yielded = true;
168            Ok(Some(Row::new()))
169        }
170    }
171}
172
173/// Streams `(input × node_ids)`. For each upstream row, emits one
174/// row per node id with `var` bound. If `var` is already bound in
175/// the incoming row, the input row passes through iff that node
176/// still exists (or fails on a non-node binding).
177pub struct NodeScanSource<'a, S: GraphStorage> {
178    upstream: Box<dyn RowSource + 'a>,
179    storage: &'a S,
180    var: VarId,
181    /// The currently-active input row. `None` means the next call
182    /// must pull a fresh row from upstream.
183    cur_row: Option<Row>,
184    /// All node ids the next call should traverse for the current
185    /// input row.
186    cur_ids: Vec<NodeId>,
187    /// Position into `cur_ids`.
188    cur_idx: usize,
189    /// Already emitted the current row when `var` was already bound.
190    cur_emitted: bool,
191}
192
193impl<'a, S: GraphStorage> NodeScanSource<'a, S> {
194    fn new(upstream: Box<dyn RowSource + 'a>, storage: &'a S, var: VarId) -> Self {
195        Self {
196            upstream,
197            storage,
198            var,
199            cur_row: None,
200            cur_ids: Vec::new(),
201            cur_idx: 0,
202            cur_emitted: false,
203        }
204    }
205}
206
207impl<'a, S: GraphStorage> RowSource for NodeScanSource<'a, S> {
208    fn next_row(&mut self) -> ExecResult<Option<Row>> {
209        loop {
210            if self.cur_row.is_none() {
211                match self.upstream.next_row()? {
212                    Some(row) => {
213                        self.cur_row = Some(row);
214                        self.cur_ids.clear();
215                        self.cur_idx = 0;
216                        self.cur_emitted = false;
217                    }
218                    None => return Ok(None),
219                }
220            }
221
222            let row_ref = self.cur_row.as_ref().unwrap();
223
224            // Already-bound case: emit the input row once iff the
225            // bound node still exists; otherwise drop it.
226            if let Some(existing) = row_ref.get(self.var) {
227                if self.cur_emitted {
228                    self.cur_row = None;
229                    continue;
230                }
231                self.cur_emitted = true;
232                match existing {
233                    LoraValue::Node(id) => {
234                        if self.storage.has_node(*id) {
235                            let row = self.cur_row.take().unwrap();
236                            self.cur_emitted = false;
237                            return Ok(Some(row));
238                        }
239                        self.cur_row = None;
240                        continue;
241                    }
242                    other => {
243                        return Err(ExecutorError::ExpectedNodeForExpand {
244                            var: format!("{:?}", self.var),
245                            found: value_kind(other),
246                        });
247                    }
248                }
249            }
250
251            // Unbound case: lazily snapshot all node ids for this
252            // input row, then yield one row per id.
253            if self.cur_idx == 0 && self.cur_ids.is_empty() {
254                self.cur_ids = self.storage.all_node_ids();
255            }
256            if self.cur_idx >= self.cur_ids.len() {
257                self.cur_row = None;
258                self.cur_ids.clear();
259                continue;
260            }
261            let id = self.cur_ids[self.cur_idx];
262            self.cur_idx += 1;
263            let mut new_row = row_ref.clone();
264            new_row.insert(self.var, LoraValue::Node(id));
265            return Ok(Some(new_row));
266        }
267    }
268}
269
270/// Streams `(input × matching-label nodes)`. Same shape as
271/// [`NodeScanSource`] but the candidate ids are produced by
272/// [`scan_node_ids_for_label_groups`] and each candidate is
273/// re-checked under [`node_matches_label_groups`].
274pub struct NodeByLabelScanSource<'a, S: GraphStorage> {
275    upstream: Box<dyn RowSource + 'a>,
276    storage: &'a S,
277    var: VarId,
278    labels: &'a [Vec<String>],
279    cur_row: Option<Row>,
280    cur_ids: Vec<NodeId>,
281    cur_idx: usize,
282    cur_emitted: bool,
283}
284
285impl<'a, S: GraphStorage> NodeByLabelScanSource<'a, S> {
286    fn new(
287        upstream: Box<dyn RowSource + 'a>,
288        storage: &'a S,
289        var: VarId,
290        labels: &'a [Vec<String>],
291    ) -> Self {
292        Self {
293            upstream,
294            storage,
295            var,
296            labels,
297            cur_row: None,
298            cur_ids: Vec::new(),
299            cur_idx: 0,
300            cur_emitted: false,
301        }
302    }
303}
304
305impl<'a, S: GraphStorage> RowSource for NodeByLabelScanSource<'a, S> {
306    fn next_row(&mut self) -> ExecResult<Option<Row>> {
307        loop {
308            if self.cur_row.is_none() {
309                match self.upstream.next_row()? {
310                    Some(row) => {
311                        self.cur_row = Some(row);
312                        self.cur_ids.clear();
313                        self.cur_idx = 0;
314                        self.cur_emitted = false;
315                    }
316                    None => return Ok(None),
317                }
318            }
319
320            let row_ref = self.cur_row.as_ref().unwrap();
321
322            if let Some(existing) = row_ref.get(self.var) {
323                if self.cur_emitted {
324                    self.cur_row = None;
325                    continue;
326                }
327                self.cur_emitted = true;
328                match existing {
329                    LoraValue::Node(id) => {
330                        let labels_ok = self
331                            .storage
332                            .with_node(*id, |n| node_matches_label_groups(&n.labels, self.labels))
333                            .unwrap_or(false);
334                        if labels_ok {
335                            let row = self.cur_row.take().unwrap();
336                            self.cur_emitted = false;
337                            return Ok(Some(row));
338                        }
339                        self.cur_row = None;
340                        continue;
341                    }
342                    other => {
343                        return Err(ExecutorError::ExpectedNodeForExpand {
344                            var: format!("{:?}", self.var),
345                            found: value_kind(other),
346                        });
347                    }
348                }
349            }
350
351            if self.cur_idx == 0 && self.cur_ids.is_empty() {
352                self.cur_ids = scan_node_ids_for_label_groups(self.storage, self.labels);
353            }
354
355            // Skip non-matching ids cheaply.
356            let candidates_prefiltered = label_group_candidates_prefiltered(self.labels);
357            while self.cur_idx < self.cur_ids.len() {
358                let id = self.cur_ids[self.cur_idx];
359                self.cur_idx += 1;
360                if !candidates_prefiltered {
361                    let labels_ok = self
362                        .storage
363                        .with_node(id, |n| node_matches_label_groups(&n.labels, self.labels))
364                        .unwrap_or(false);
365                    if !labels_ok {
366                        continue;
367                    }
368                }
369                let mut new_row = row_ref.clone();
370                new_row.insert(self.var, LoraValue::Node(id));
371                return Ok(Some(new_row));
372            }
373
374            self.cur_row = None;
375            self.cur_ids.clear();
376        }
377    }
378}
379
380/// Streams `(input × indexed-property nodes)`. The property index supplies
381/// candidate ids and each candidate is re-checked under the full label and
382/// property equality semantics.
383pub struct NodeByPropertyScanSource<'a, S: GraphStorage> {
384    upstream: Box<dyn RowSource + 'a>,
385    ctx: StreamCtx<'a, S>,
386    var: VarId,
387    labels: &'a [Vec<String>],
388    key: &'a str,
389    value: &'a ResolvedExpr,
390    cur_row: Option<Row>,
391    cur_expected: Option<LoraValue>,
392    cur_ids: Vec<NodeId>,
393    cur_idx: usize,
394    cur_emitted: bool,
395    cur_prefiltered: bool,
396}
397
398impl<'a, S: GraphStorage> NodeByPropertyScanSource<'a, S> {
399    fn new(
400        upstream: Box<dyn RowSource + 'a>,
401        ctx: StreamCtx<'a, S>,
402        var: VarId,
403        labels: &'a [Vec<String>],
404        key: &'a str,
405        value: &'a ResolvedExpr,
406    ) -> Self {
407        Self {
408            upstream,
409            ctx,
410            var,
411            labels,
412            key,
413            value,
414            cur_row: None,
415            cur_expected: None,
416            cur_ids: Vec::new(),
417            cur_idx: 0,
418            cur_emitted: false,
419            cur_prefiltered: false,
420        }
421    }
422}
423
424impl<'a, S: GraphStorage> RowSource for NodeByPropertyScanSource<'a, S> {
425    fn next_row(&mut self) -> ExecResult<Option<Row>> {
426        loop {
427            if self.cur_row.is_none() {
428                match self.upstream.next_row()? {
429                    Some(row) => {
430                        let expected = eval_expr(self.value, &row, &self.ctx.eval_ctx());
431                        let candidates = indexed_node_property_candidates(
432                            self.ctx.storage,
433                            self.labels,
434                            self.key,
435                            &expected,
436                        );
437                        self.cur_ids = candidates.ids;
438                        self.cur_prefiltered = candidates.prefiltered;
439                        self.cur_row = Some(row);
440                        self.cur_expected = Some(expected);
441                        self.cur_idx = 0;
442                        self.cur_emitted = false;
443                    }
444                    None => return Ok(None),
445                }
446            }
447
448            let row_ref = self.cur_row.as_ref().unwrap();
449            let expected = self.cur_expected.as_ref().unwrap();
450
451            if let Some(existing) = row_ref.get(self.var) {
452                if self.cur_emitted {
453                    self.cur_row = None;
454                    self.cur_expected = None;
455                    self.cur_ids.clear();
456                    continue;
457                }
458                self.cur_emitted = true;
459                match existing {
460                    LoraValue::Node(id) => {
461                        if node_matches_property_filter(
462                            self.ctx.storage,
463                            *id,
464                            self.labels,
465                            self.key,
466                            expected,
467                        ) {
468                            let row = self.cur_row.take().unwrap();
469                            self.cur_expected = None;
470                            self.cur_ids.clear();
471                            self.cur_emitted = false;
472                            return Ok(Some(row));
473                        }
474                        self.cur_row = None;
475                        self.cur_expected = None;
476                        self.cur_ids.clear();
477                        continue;
478                    }
479                    other => {
480                        return Err(ExecutorError::ExpectedNodeForExpand {
481                            var: format!("{:?}", self.var),
482                            found: value_kind(other),
483                        });
484                    }
485                }
486            }
487
488            while self.cur_idx < self.cur_ids.len() {
489                let id = self.cur_ids[self.cur_idx];
490                self.cur_idx += 1;
491                if !self.cur_prefiltered
492                    && !node_matches_property_filter(
493                        self.ctx.storage,
494                        id,
495                        self.labels,
496                        self.key,
497                        expected,
498                    )
499                {
500                    continue;
501                }
502                let mut new_row = row_ref.clone();
503                new_row.insert(self.var, LoraValue::Node(id));
504                return Ok(Some(new_row));
505            }
506
507            self.cur_row = None;
508            self.cur_expected = None;
509            self.cur_ids.clear();
510        }
511    }
512}
513
514/// Single-hop expansion. For each input row, walks edges from `src`
515/// through the configured `direction` and `types` and emits one row
516/// per matching `(rel, dst)` pair, optionally filtering by relationship
517/// properties.
518pub struct ExpandSource<'a, S: GraphStorage> {
519    upstream: Box<dyn RowSource + 'a>,
520    ctx: StreamCtx<'a, S>,
521    src: VarId,
522    rel: Option<VarId>,
523    dst: VarId,
524    types: &'a [String],
525    direction: Direction,
526    rel_properties: Option<&'a ResolvedExpr>,
527    cur_row: Option<Row>,
528    cur_edges: Vec<(u64, NodeId)>,
529    cur_idx: usize,
530}
531
532impl<'a, S: GraphStorage> ExpandSource<'a, S> {
533    #[allow(clippy::too_many_arguments)]
534    fn new(
535        upstream: Box<dyn RowSource + 'a>,
536        ctx: StreamCtx<'a, S>,
537        src: VarId,
538        rel: Option<VarId>,
539        dst: VarId,
540        types: &'a [String],
541        direction: Direction,
542        rel_properties: Option<&'a ResolvedExpr>,
543    ) -> Self {
544        Self {
545            upstream,
546            ctx,
547            src,
548            rel,
549            dst,
550            types,
551            direction,
552            rel_properties,
553            cur_row: None,
554            cur_edges: Vec::new(),
555            cur_idx: 0,
556        }
557    }
558}
559
560impl<'a, S: GraphStorage> RowSource for ExpandSource<'a, S> {
561    fn next_row(&mut self) -> ExecResult<Option<Row>> {
562        loop {
563            if self.cur_row.is_none() {
564                match self.upstream.next_row()? {
565                    Some(row) => {
566                        // Resolve src now so we can `continue` out of
567                        // rows that don't bind it.
568                        let src_id = match row.get(self.src) {
569                            Some(LoraValue::Node(id)) => *id,
570                            Some(other) => {
571                                return Err(ExecutorError::ExpectedNodeForExpand {
572                                    var: format!("{:?}", self.src),
573                                    found: value_kind(other),
574                                });
575                            }
576                            None => continue,
577                        };
578                        self.cur_edges =
579                            self.ctx
580                                .storage
581                                .expand_ids(src_id, self.direction, self.types);
582                        self.cur_idx = 0;
583                        self.cur_row = Some(row);
584                    }
585                    None => return Ok(None),
586                }
587            }
588
589            let row_ref = self.cur_row.as_ref().unwrap();
590
591            while self.cur_idx < self.cur_edges.len() {
592                let (rel_id, dst_id) = self.cur_edges[self.cur_idx];
593                self.cur_idx += 1;
594
595                // Relationship-property prefilter.
596                if let Some(expr) = self.rel_properties {
597                    let actual = self
598                        .ctx
599                        .storage
600                        .with_relationship(rel_id, |rel| rel.properties.clone());
601                    let matches = match actual {
602                        Some(props) => {
603                            let eval_ctx = self.ctx.eval_ctx();
604                            let expected = eval_expr(expr, row_ref, &eval_ctx);
605                            let LoraValue::Map(map) = expected else {
606                                return Err(ExecutorError::ExpectedPropertyMap {
607                                    found: value_kind(&expected),
608                                });
609                            };
610                            map.iter().all(|(k, v)| {
611                                props
612                                    .get(k)
613                                    .map(|actual| value_matches_property_value(v, actual))
614                                    .unwrap_or(false)
615                            })
616                        }
617                        None => false,
618                    };
619                    if !matches {
620                        continue;
621                    }
622                }
623
624                // Existing-binding cross-checks for dst and rel.
625                if let Some(existing_dst) = row_ref.get(self.dst) {
626                    match existing_dst {
627                        LoraValue::Node(id) if *id == dst_id => {}
628                        LoraValue::Node(_) => continue,
629                        other => {
630                            return Err(ExecutorError::ExpectedNodeForExpand {
631                                var: format!("{:?}", self.dst),
632                                found: value_kind(other),
633                            });
634                        }
635                    }
636                }
637                if let Some(rel_var) = self.rel {
638                    if let Some(existing_rel) = row_ref.get(rel_var) {
639                        match existing_rel {
640                            LoraValue::Relationship(id) if *id == rel_id => {}
641                            LoraValue::Relationship(_) => continue,
642                            other => {
643                                return Err(ExecutorError::ExpectedRelationshipForExpand {
644                                    var: format!("{:?}", rel_var),
645                                    found: value_kind(other),
646                                });
647                            }
648                        }
649                    }
650                }
651
652                let mut new_row = row_ref.clone();
653                if !new_row.contains_key(self.dst) {
654                    new_row.insert(self.dst, LoraValue::Node(dst_id));
655                }
656                if let Some(rel_var) = self.rel {
657                    if !new_row.contains_key(rel_var) {
658                        new_row.insert(rel_var, LoraValue::Relationship(rel_id));
659                    }
660                }
661                return Ok(Some(new_row));
662            }
663
664            // Edges for the current input row exhausted.
665            self.cur_row = None;
666            self.cur_edges.clear();
667            self.cur_idx = 0;
668        }
669    }
670}
671
672/// Variable-length expansion streams its upstream and walks one input row's BFS
673/// frontier incrementally, yielding each matching path as it is discovered.
674pub struct VariableLengthExpandSource<'a, S: GraphStorage> {
675    upstream: Box<dyn RowSource + 'a>,
676    ctx: StreamCtx<'a, S>,
677    src: VarId,
678    rel: Option<VarId>,
679    dst: VarId,
680    types: &'a [String],
681    direction: Direction,
682    min_hops: u64,
683    max_hops: u64,
684    cur_row: Option<Row>,
685    pending_zero_hop: bool,
686    frontier: Vec<(NodeId, Vec<u64>)>,
687    frontier_idx: usize,
688    next_frontier: Vec<(NodeId, Vec<u64>)>,
689    depth: u64,
690    cur_path_node: Option<NodeId>,
691    cur_path_rels: Vec<u64>,
692    cur_edges: Vec<(u64, NodeId)>,
693    cur_edge_idx: usize,
694}
695
696impl<'a, S: GraphStorage> VariableLengthExpandSource<'a, S> {
697    #[allow(clippy::too_many_arguments)]
698    fn new(
699        upstream: Box<dyn RowSource + 'a>,
700        ctx: StreamCtx<'a, S>,
701        src: VarId,
702        rel: Option<VarId>,
703        dst: VarId,
704        types: &'a [String],
705        direction: Direction,
706        range: &'a RangeLiteral,
707    ) -> Self {
708        let (min_hops, max_hops) = resolve_range(range);
709        Self {
710            upstream,
711            ctx,
712            src,
713            rel,
714            dst,
715            types,
716            direction,
717            min_hops,
718            max_hops,
719            cur_row: None,
720            pending_zero_hop: false,
721            frontier: Vec::new(),
722            frontier_idx: 0,
723            next_frontier: Vec::new(),
724            depth: 1,
725            cur_path_node: None,
726            cur_path_rels: Vec::new(),
727            cur_edges: Vec::new(),
728            cur_edge_idx: 0,
729        }
730    }
731
732    fn start_row(&mut self, row: Row, src_id: NodeId) {
733        self.cur_row = Some(row);
734        self.pending_zero_hop = self.min_hops == 0;
735        self.frontier.clear();
736        self.frontier.push((src_id, Vec::new()));
737        self.frontier_idx = 0;
738        self.next_frontier.clear();
739        self.depth = 1;
740        self.cur_path_node = None;
741        self.cur_path_rels.clear();
742        self.cur_edges.clear();
743        self.cur_edge_idx = 0;
744    }
745
746    fn clear_current_row(&mut self) {
747        self.cur_row = None;
748        self.pending_zero_hop = false;
749        self.frontier.clear();
750        self.frontier_idx = 0;
751        self.next_frontier.clear();
752        self.depth = 1;
753        self.cur_path_node = None;
754        self.cur_path_rels.clear();
755        self.cur_edges.clear();
756        self.cur_edge_idx = 0;
757    }
758
759    fn row_for_path(&self, dst_node_id: NodeId, rel_ids: &[u64]) -> Row {
760        let mut new_row = self
761            .cur_row
762            .as_ref()
763            .expect("cur_row is set while yielding variable-length results")
764            .clone();
765        new_row.insert(self.dst, LoraValue::Node(dst_node_id));
766
767        if let Some(rel_var) = self.rel {
768            let rels = rel_ids
769                .iter()
770                .copied()
771                .map(LoraValue::Relationship)
772                .collect();
773            new_row.insert(rel_var, LoraValue::List(rels));
774        }
775
776        new_row
777    }
778
779    fn advance_frontier(&mut self) -> bool {
780        if self.next_frontier.is_empty() || self.depth >= self.max_hops {
781            return false;
782        }
783
784        std::mem::swap(&mut self.frontier, &mut self.next_frontier);
785        self.next_frontier.clear();
786        self.frontier_idx = 0;
787        self.depth += 1;
788        true
789    }
790}
791
792impl<'a, S: GraphStorage> RowSource for VariableLengthExpandSource<'a, S> {
793    fn next_row(&mut self) -> ExecResult<Option<Row>> {
794        loop {
795            if self.cur_row.is_none() {
796                match self.upstream.next_row()? {
797                    Some(row) => {
798                        let src_id = match row.get(self.src) {
799                            Some(LoraValue::Node(id)) => *id,
800                            Some(other) => {
801                                return Err(ExecutorError::ExpectedNodeForExpand {
802                                    var: format!("{:?}", self.src),
803                                    found: value_kind(other),
804                                });
805                            }
806                            None => continue,
807                        };
808                        self.start_row(row, src_id);
809                    }
810                    None => return Ok(None),
811                }
812            }
813
814            if self.pending_zero_hop {
815                self.pending_zero_hop = false;
816                if let Some(LoraValue::Node(src_id)) = self
817                    .cur_row
818                    .as_ref()
819                    .expect("cur_row is initialized before zero-hop yield")
820                    .get(self.src)
821                {
822                    return Ok(Some(self.row_for_path(*src_id, &[])));
823                }
824            }
825
826            loop {
827                if self.depth > self.max_hops {
828                    self.clear_current_row();
829                    break;
830                }
831
832                if self.cur_path_node.is_none() {
833                    if self.frontier_idx >= self.frontier.len() {
834                        if self.advance_frontier() {
835                            continue;
836                        }
837                        self.clear_current_row();
838                        break;
839                    }
840
841                    let (node_id, rels) = &self.frontier[self.frontier_idx];
842                    self.frontier_idx += 1;
843                    self.cur_path_node = Some(*node_id);
844                    self.cur_path_rels.clone_from(rels);
845                    self.cur_edges =
846                        self.ctx
847                            .storage
848                            .expand_ids(*node_id, self.direction, self.types);
849                    self.cur_edge_idx = 0;
850                }
851
852                while self.cur_edge_idx < self.cur_edges.len() {
853                    let (rel_id, neighbor_id) = self.cur_edges[self.cur_edge_idx];
854                    self.cur_edge_idx += 1;
855
856                    if self.cur_path_rels.contains(&rel_id) {
857                        continue;
858                    }
859
860                    let mut rel_ids = Vec::with_capacity(self.cur_path_rels.len() + 1);
861                    rel_ids.extend_from_slice(&self.cur_path_rels);
862                    rel_ids.push(rel_id);
863
864                    if self.depth < self.max_hops {
865                        self.next_frontier.push((neighbor_id, rel_ids.clone()));
866                    }
867
868                    if self.depth >= self.min_hops {
869                        return Ok(Some(self.row_for_path(neighbor_id, &rel_ids)));
870                    }
871                }
872
873                self.cur_path_node = None;
874                self.cur_path_rels.clear();
875                self.cur_edges.clear();
876                self.cur_edge_idx = 0;
877            }
878        }
879    }
880}
881
882/// Pulls upstream rows until one matches `predicate`, then yields it.
883pub struct FilterSource<'a, S: GraphStorage> {
884    upstream: Box<dyn RowSource + 'a>,
885    ctx: StreamCtx<'a, S>,
886    predicate: &'a ResolvedExpr,
887}
888
889impl<'a, S: GraphStorage> FilterSource<'a, S> {
890    fn new(
891        upstream: Box<dyn RowSource + 'a>,
892        ctx: StreamCtx<'a, S>,
893        predicate: &'a ResolvedExpr,
894    ) -> Self {
895        Self {
896            upstream,
897            ctx,
898            predicate,
899        }
900    }
901}
902
903impl<'a, S: GraphStorage> RowSource for FilterSource<'a, S> {
904    fn next_row(&mut self) -> ExecResult<Option<Row>> {
905        loop {
906            match self.upstream.next_row()? {
907                Some(row) => {
908                    let eval_ctx = self.ctx.eval_ctx();
909                    if eval_expr(self.predicate, &row, &eval_ctx).is_truthy() {
910                        return Ok(Some(row));
911                    }
912                }
913                None => return Ok(None),
914            }
915        }
916    }
917}
918
919/// Pulls one upstream row, projects each item, returns a single row
920/// per upstream row. `DISTINCT` projection wraps this source in
921/// [`DistinctSource`], which keeps a seen-key set and yields lazily.
922pub struct ProjectionSource<'a, S: GraphStorage> {
923    upstream: Box<dyn RowSource + 'a>,
924    ctx: StreamCtx<'a, S>,
925    items: &'a [ResolvedProjection],
926    include_existing: bool,
927}
928
929impl<'a, S: GraphStorage> ProjectionSource<'a, S> {
930    fn new(
931        upstream: Box<dyn RowSource + 'a>,
932        ctx: StreamCtx<'a, S>,
933        items: &'a [ResolvedProjection],
934        include_existing: bool,
935    ) -> Self {
936        Self {
937            upstream,
938            ctx,
939            items,
940            include_existing,
941        }
942    }
943}
944
945impl<'a, S: GraphStorage> RowSource for ProjectionSource<'a, S> {
946    fn next_row(&mut self) -> ExecResult<Option<Row>> {
947        match self.upstream.next_row()? {
948            None => Ok(None),
949            Some(row) => {
950                let eval_ctx = self.ctx.eval_ctx();
951                if self.include_existing {
952                    let mut projected = row;
953                    for item in self.items {
954                        let value = eval_expr(&item.expr, &projected, &eval_ctx);
955                        if let Some(err) = take_eval_error() {
956                            return Err(ExecutorError::RuntimeError(err));
957                        }
958                        projected.insert_named(item.output, item.name.clone(), value);
959                    }
960                    Ok(Some(projected))
961                } else {
962                    let mut projected = Row::new();
963                    for item in self.items {
964                        let value = eval_expr(&item.expr, &row, &eval_ctx);
965                        if let Some(err) = take_eval_error() {
966                            return Err(ExecutorError::RuntimeError(err));
967                        }
968                        projected.insert_named(item.output, item.name.clone(), value);
969                    }
970                    Ok(Some(projected))
971                }
972            }
973        }
974    }
975}
976
977/// Per upstream row, evaluates the unwind expression and emits one
978/// row per element of the resulting list. Null inputs are dropped;
979/// scalar inputs are emitted once.
980pub struct UnwindSource<'a, S: GraphStorage> {
981    upstream: Box<dyn RowSource + 'a>,
982    ctx: StreamCtx<'a, S>,
983    expr: &'a ResolvedExpr,
984    alias: VarId,
985    cur_row: Option<Row>,
986    cur_values: Vec<LoraValue>,
987    cur_idx: usize,
988}
989
990impl<'a, S: GraphStorage> UnwindSource<'a, S> {
991    fn new(
992        upstream: Box<dyn RowSource + 'a>,
993        ctx: StreamCtx<'a, S>,
994        expr: &'a ResolvedExpr,
995        alias: VarId,
996    ) -> Self {
997        Self {
998            upstream,
999            ctx,
1000            expr,
1001            alias,
1002            cur_row: None,
1003            cur_values: Vec::new(),
1004            cur_idx: 0,
1005        }
1006    }
1007}
1008
1009impl<'a, S: GraphStorage> RowSource for UnwindSource<'a, S> {
1010    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1011        loop {
1012            if self.cur_idx < self.cur_values.len() {
1013                let value = self.cur_values[self.cur_idx].clone();
1014                self.cur_idx += 1;
1015                let mut new_row = self
1016                    .cur_row
1017                    .as_ref()
1018                    .expect("cur_values is non-empty implies cur_row is set")
1019                    .clone();
1020                new_row.insert(self.alias, value);
1021                return Ok(Some(new_row));
1022            }
1023
1024            self.cur_row = None;
1025            self.cur_values.clear();
1026            self.cur_idx = 0;
1027
1028            match self.upstream.next_row()? {
1029                None => return Ok(None),
1030                Some(row) => {
1031                    let eval_ctx = self.ctx.eval_ctx();
1032                    let value = eval_expr(self.expr, &row, &eval_ctx);
1033                    match value {
1034                        LoraValue::List(values) => {
1035                            self.cur_row = Some(row);
1036                            self.cur_values = values;
1037                            self.cur_idx = 0;
1038                            // loop around
1039                        }
1040                        LoraValue::Null => {
1041                            // Drop this input row entirely.
1042                        }
1043                        scalar => {
1044                            // Emit one row with the scalar bound.
1045                            let mut new_row = row;
1046                            new_row.insert(self.alias, scalar);
1047                            return Ok(Some(new_row));
1048                        }
1049                    }
1050                }
1051            }
1052        }
1053    }
1054}
1055
1056/// Skip the first `skip` rows, emit at most `limit` rows from
1057/// upstream, then return `None` regardless of whether upstream is
1058/// exhausted (avoids paying for a partially consumed upstream).
1059pub struct LimitSource<'a> {
1060    upstream: Box<dyn RowSource + 'a>,
1061    skip: usize,
1062    limit: Option<usize>,
1063    skipped: usize,
1064    emitted: usize,
1065}
1066
1067impl<'a> LimitSource<'a> {
1068    fn new(upstream: Box<dyn RowSource + 'a>, skip: usize, limit: Option<usize>) -> Self {
1069        Self {
1070            upstream,
1071            skip,
1072            limit,
1073            skipped: 0,
1074            emitted: 0,
1075        }
1076    }
1077}
1078
1079impl<'a> RowSource for LimitSource<'a> {
1080    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1081        // Drain skip first.
1082        while self.skipped < self.skip {
1083            match self.upstream.next_row()? {
1084                Some(_) => self.skipped += 1,
1085                None => return Ok(None),
1086            }
1087        }
1088        if let Some(lim) = self.limit {
1089            if self.emitted >= lim {
1090                return Ok(None);
1091            }
1092        }
1093        match self.upstream.next_row()? {
1094            Some(row) => {
1095                self.emitted += 1;
1096                Ok(Some(row))
1097            }
1098            None => Ok(None),
1099        }
1100    }
1101}
1102
1103/// Lazy-buffered Sort source. On the first call to `next_row`,
1104/// drains the entire upstream into a `Vec`, sorts it by the plan's
1105/// sort items, then yields one row at a time on subsequent calls.
1106///
1107/// Memory is O(N) in the number of input rows — Sort can't avoid
1108/// that. The win is that everything *above* a `SortSource` (typically
1109/// a write op like CREATE / SET) streams: the auto-commit pipeline
1110/// pulls one sorted row, applies the per-row write, and emits,
1111/// instead of materializing both Sort's output and the write op's
1112/// output.
1113pub struct SortSource<'a, S: GraphStorage> {
1114    state: SortState<'a, S>,
1115}
1116
1117enum SortState<'a, S: GraphStorage> {
1118    Pending {
1119        upstream: Box<dyn RowSource + 'a>,
1120        ctx: StreamCtx<'a, S>,
1121        items: &'a [lora_analyzer::ResolvedSortItem],
1122    },
1123    Yielding(std::vec::IntoIter<Row>),
1124}
1125
1126impl<'a, S: GraphStorage> SortSource<'a, S> {
1127    fn new(
1128        upstream: Box<dyn RowSource + 'a>,
1129        ctx: StreamCtx<'a, S>,
1130        items: &'a [lora_analyzer::ResolvedSortItem],
1131    ) -> Self {
1132        Self {
1133            state: SortState::Pending {
1134                upstream,
1135                ctx,
1136                items,
1137            },
1138        }
1139    }
1140
1141    /// Drain upstream into a vector and sort it by the plan's
1142    /// sort items. Called from `next_row` on the first invocation.
1143    fn materialize(
1144        upstream: &mut Box<dyn RowSource + 'a>,
1145        ctx: &StreamCtx<'a, S>,
1146        items: &[lora_analyzer::ResolvedSortItem],
1147    ) -> ExecResult<Vec<Row>> {
1148        let mut rows = drain(upstream.as_mut())?;
1149        let eval_ctx = ctx.eval_ctx();
1150        rows.sort_by(|a, b| {
1151            for item in items {
1152                let ord = crate::executor::compare_sort_item(item, a, b, &eval_ctx);
1153                if ord != std::cmp::Ordering::Equal {
1154                    return ord;
1155                }
1156            }
1157            std::cmp::Ordering::Equal
1158        });
1159        Ok(rows)
1160    }
1161}
1162
1163impl<'a, S: GraphStorage> RowSource for SortSource<'a, S> {
1164    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1165        loop {
1166            match &mut self.state {
1167                SortState::Pending {
1168                    upstream,
1169                    ctx,
1170                    items,
1171                } => {
1172                    let rows = Self::materialize(upstream, ctx, items)?;
1173                    self.state = SortState::Yielding(rows.into_iter());
1174                    // fall through to the Yielding match on the next iteration.
1175                }
1176                SortState::Yielding(it) => return Ok(it.next()),
1177            }
1178        }
1179    }
1180}
1181
1182/// Streaming DISTINCT source. Backs `Projection { distinct: true }`.
1183/// It keeps only the seen key set, then yields each first-seen row as
1184/// soon as upstream produces it.
1185pub struct DistinctSource<'a> {
1186    upstream: Box<dyn RowSource + 'a>,
1187    seen: BTreeSet<Vec<GroupValueKey>>,
1188}
1189
1190impl<'a> DistinctSource<'a> {
1191    fn new(upstream: Box<dyn RowSource + 'a>) -> Self {
1192        Self {
1193            upstream,
1194            seen: BTreeSet::new(),
1195        }
1196    }
1197}
1198
1199impl<'a> RowSource for DistinctSource<'a> {
1200    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1201        while let Some(row) = self.upstream.next_row()? {
1202            let key = row
1203                .iter()
1204                .map(|(_, val)| GroupValueKey::from_value(val))
1205                .collect();
1206            if self.seen.insert(key) {
1207                return Ok(Some(row));
1208            }
1209        }
1210        Ok(None)
1211    }
1212}
1213
1214/// Lazy-buffered aggregation source. Aggregation must observe every
1215/// input row before it can emit the first group, so this source drains
1216/// upstream on first pull, builds grouped rows, then yields them one
1217/// at a time to downstream consumers.
1218pub struct HashAggregationSource<'a, S: GraphStorage> {
1219    state: HashAggregationState<'a, S>,
1220}
1221
1222enum HashAggregationState<'a, S: GraphStorage> {
1223    Pending {
1224        upstream: Box<dyn RowSource + 'a>,
1225        ctx: StreamCtx<'a, S>,
1226        group_by: &'a [ResolvedProjection],
1227        aggregates: &'a [ResolvedProjection],
1228    },
1229    Yielding(std::vec::IntoIter<Row>),
1230}
1231
1232impl<'a, S: GraphStorage> HashAggregationSource<'a, S> {
1233    fn new(
1234        upstream: Box<dyn RowSource + 'a>,
1235        ctx: StreamCtx<'a, S>,
1236        group_by: &'a [ResolvedProjection],
1237        aggregates: &'a [ResolvedProjection],
1238    ) -> Self {
1239        Self {
1240            state: HashAggregationState::Pending {
1241                upstream,
1242                ctx,
1243                group_by,
1244                aggregates,
1245            },
1246        }
1247    }
1248
1249    fn materialize(
1250        upstream: &mut Box<dyn RowSource + 'a>,
1251        ctx: &StreamCtx<'a, S>,
1252        group_by: &[ResolvedProjection],
1253        aggregates: &[ResolvedProjection],
1254    ) -> ExecResult<Vec<Row>> {
1255        let input_rows = drain(upstream.as_mut())?;
1256        let eval_ctx = ctx.eval_ctx();
1257        let mut groups: BTreeMap<Vec<GroupValueKey>, Vec<Row>> = BTreeMap::new();
1258
1259        if group_by.is_empty() {
1260            groups.insert(Vec::new(), input_rows);
1261        } else {
1262            for row in input_rows {
1263                let key = group_by
1264                    .iter()
1265                    .map(|proj| GroupValueKey::from_value(&eval_expr(&proj.expr, &row, &eval_ctx)))
1266                    .collect::<Vec<_>>();
1267                groups.entry(key).or_default().push(row);
1268            }
1269        }
1270
1271        let mut out = Vec::new();
1272        for rows in groups.into_values() {
1273            let mut result = Row::new();
1274            if let Some(first) = rows.first() {
1275                for proj in group_by {
1276                    let value = hydrate_value(eval_expr(&proj.expr, first, &eval_ctx), ctx.storage);
1277                    result.insert_named(proj.output, proj.name.clone(), value);
1278                }
1279            }
1280            for proj in aggregates {
1281                let value = compute_aggregate_expr(&proj.expr, &rows, &eval_ctx);
1282                result.insert_named(proj.output, proj.name.clone(), value);
1283            }
1284            out.push(result);
1285        }
1286
1287        Ok(out)
1288    }
1289}
1290
1291impl<'a, S: GraphStorage> RowSource for HashAggregationSource<'a, S> {
1292    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1293        loop {
1294            match &mut self.state {
1295                HashAggregationState::Pending {
1296                    upstream,
1297                    ctx,
1298                    group_by,
1299                    aggregates,
1300                } => {
1301                    let rows = Self::materialize(upstream, ctx, group_by, aggregates)?;
1302                    self.state = HashAggregationState::Yielding(rows.into_iter());
1303                }
1304                HashAggregationState::Yielding(it) => return Ok(it.next()),
1305            }
1306        }
1307    }
1308}
1309
1310/// Streaming outer OPTIONAL MATCH source. The optional inner plan is
1311/// independent of each incoming row in the current physical plan, so
1312/// it is materialized once, then matched against each outer row as
1313/// the outer cursor advances.
1314pub struct OptionalMatchSource<'a, S: GraphStorage> {
1315    upstream: Box<dyn RowSource + 'a>,
1316    ctx: StreamCtx<'a, S>,
1317    plan: &'a PhysicalPlan,
1318    inner: PhysicalNodeId,
1319    new_vars: &'a [VarId],
1320    inner_rows: Option<Vec<Row>>,
1321    cur_input: Option<Row>,
1322    cur_inner_idx: usize,
1323    cur_matched: bool,
1324}
1325
1326impl<'a, S: GraphStorage> OptionalMatchSource<'a, S> {
1327    fn new(
1328        upstream: Box<dyn RowSource + 'a>,
1329        ctx: StreamCtx<'a, S>,
1330        plan: &'a PhysicalPlan,
1331        inner: PhysicalNodeId,
1332        new_vars: &'a [VarId],
1333    ) -> Self {
1334        Self {
1335            upstream,
1336            ctx,
1337            plan,
1338            inner,
1339            new_vars,
1340            inner_rows: None,
1341            cur_input: None,
1342            cur_inner_idx: 0,
1343            cur_matched: false,
1344        }
1345    }
1346
1347    fn ensure_inner_rows(&mut self) -> ExecResult<()> {
1348        if self.inner_rows.is_none() {
1349            let mut inner = build_streaming(
1350                self.plan,
1351                self.inner,
1352                self.ctx.storage,
1353                self.ctx.params.clone(),
1354            )?;
1355            self.inner_rows = Some(drain(inner.as_mut())?);
1356        }
1357        Ok(())
1358    }
1359}
1360
1361impl<'a, S: GraphStorage> RowSource for OptionalMatchSource<'a, S> {
1362    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1363        self.ensure_inner_rows()?;
1364        loop {
1365            if self.cur_input.is_none() {
1366                match self.upstream.next_row()? {
1367                    Some(input_row) => {
1368                        self.cur_input = Some(input_row);
1369                        self.cur_inner_idx = 0;
1370                        self.cur_matched = false;
1371                    }
1372                    None => return Ok(None),
1373                }
1374            }
1375
1376            let inner_rows = self
1377                .inner_rows
1378                .as_ref()
1379                .expect("ensure_inner_rows initializes inner_rows");
1380            let input_row = self
1381                .cur_input
1382                .as_ref()
1383                .expect("cur_input is initialized above");
1384
1385            while self.cur_inner_idx < inner_rows.len() {
1386                let inner_row = &inner_rows[self.cur_inner_idx];
1387                self.cur_inner_idx += 1;
1388
1389                let compatible = input_row
1390                    .iter()
1391                    .all(|(var, val)| match inner_row.get(*var) {
1392                        Some(inner_val) => inner_val == val,
1393                        None => true,
1394                    });
1395                if !compatible {
1396                    continue;
1397                }
1398
1399                let mut merged = input_row.clone();
1400                for (var, name, val) in inner_row.iter_named() {
1401                    if !merged.contains_key(*var) {
1402                        merged.insert_named(*var, name.into_owned(), val.clone());
1403                    }
1404                }
1405                self.cur_matched = true;
1406                return Ok(Some(merged));
1407            }
1408
1409            let mut input_row = self
1410                .cur_input
1411                .take()
1412                .expect("cur_input is initialized while finishing optional row");
1413            if !self.cur_matched {
1414                for &var_id in self.new_vars {
1415                    if !input_row.contains_key(var_id) {
1416                        input_row.insert(var_id, LoraValue::Null);
1417                    }
1418                }
1419                return Ok(Some(input_row));
1420            }
1421        }
1422    }
1423}
1424
1425/// Path-building source. Ordinary path construction is one-in/one-out.
1426/// Shortest-path filtering still has to compare the complete path set,
1427/// so that mode drains internally before yielding.
1428pub struct PathBuildSource<'a, S: GraphStorage> {
1429    state: PathBuildState<'a, S>,
1430}
1431
1432enum PathBuildState<'a, S: GraphStorage> {
1433    Streaming {
1434        upstream: Box<dyn RowSource + 'a>,
1435        ctx: StreamCtx<'a, S>,
1436        output: VarId,
1437        node_vars: &'a [VarId],
1438        rel_vars: &'a [VarId],
1439    },
1440    PendingShortest {
1441        upstream: Box<dyn RowSource + 'a>,
1442        ctx: StreamCtx<'a, S>,
1443        output: VarId,
1444        node_vars: &'a [VarId],
1445        rel_vars: &'a [VarId],
1446        all: bool,
1447    },
1448    Yielding(std::vec::IntoIter<Row>),
1449}
1450
1451impl<'a, S: GraphStorage> PathBuildSource<'a, S> {
1452    fn new(
1453        upstream: Box<dyn RowSource + 'a>,
1454        ctx: StreamCtx<'a, S>,
1455        output: VarId,
1456        node_vars: &'a [VarId],
1457        rel_vars: &'a [VarId],
1458        shortest_path_all: Option<bool>,
1459    ) -> Self {
1460        let state = match shortest_path_all {
1461            Some(all) => PathBuildState::PendingShortest {
1462                upstream,
1463                ctx,
1464                output,
1465                node_vars,
1466                rel_vars,
1467                all,
1468            },
1469            None => PathBuildState::Streaming {
1470                upstream,
1471                ctx,
1472                output,
1473                node_vars,
1474                rel_vars,
1475            },
1476        };
1477        Self { state }
1478    }
1479
1480    fn attach_path(
1481        mut row: Row,
1482        ctx: &StreamCtx<'a, S>,
1483        output: VarId,
1484        node_vars: &[VarId],
1485        rel_vars: &[VarId],
1486    ) -> Row {
1487        let path = build_path_value(&row, node_vars, rel_vars, ctx.storage);
1488        row.insert(output, path);
1489        row
1490    }
1491
1492    fn shortest_path_rows(
1493        upstream: &mut Box<dyn RowSource + 'a>,
1494        ctx: &StreamCtx<'a, S>,
1495        output: VarId,
1496        node_vars: &[VarId],
1497        rel_vars: &[VarId],
1498        all: bool,
1499    ) -> ExecResult<Vec<Row>> {
1500        let mut best_len: Option<usize> = None;
1501        let mut best_rows = Vec::new();
1502
1503        while let Some(row) = upstream.next_row()? {
1504            let row = Self::attach_path(row, ctx, output, node_vars, rel_vars);
1505            let path_len = match row.get(output) {
1506                Some(LoraValue::Path(path)) => path.rels.len(),
1507                _ => usize::MAX,
1508            };
1509
1510            match best_len {
1511                None => {
1512                    best_len = Some(path_len);
1513                    best_rows.push(row);
1514                }
1515                Some(current) if path_len < current => {
1516                    best_len = Some(path_len);
1517                    best_rows.clear();
1518                    best_rows.push(row);
1519                }
1520                Some(current) if path_len == current && all => best_rows.push(row),
1521                _ => {}
1522            }
1523        }
1524
1525        Ok(best_rows)
1526    }
1527}
1528
1529impl<'a, S: GraphStorage> RowSource for PathBuildSource<'a, S> {
1530    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1531        loop {
1532            match &mut self.state {
1533                PathBuildState::Streaming {
1534                    upstream,
1535                    ctx,
1536                    output,
1537                    node_vars,
1538                    rel_vars,
1539                } => {
1540                    return Ok(upstream
1541                        .next_row()?
1542                        .map(|row| Self::attach_path(row, ctx, *output, node_vars, rel_vars)));
1543                }
1544                PathBuildState::PendingShortest {
1545                    upstream,
1546                    ctx,
1547                    output,
1548                    node_vars,
1549                    rel_vars,
1550                    all,
1551                } => {
1552                    let rows = Self::shortest_path_rows(
1553                        upstream, ctx, *output, node_vars, rel_vars, *all,
1554                    )?;
1555                    self.state = PathBuildState::Yielding(rows.into_iter());
1556                }
1557                PathBuildState::Yielding(it) => return Ok(it.next()),
1558            }
1559        }
1560    }
1561}
1562
1563/// Streaming UNION source. Pulls each branch in sequence. `UNION ALL`
1564/// passes rows through directly; plain `UNION` keeps a seen-key set and
1565/// yields the first row for each unique named column/value key.
1566///
1567/// Replaces the buffered fallback that previously sat in
1568/// `PullExecutor::open_compiled` for any UNION-bearing plan. The
1569/// consumer side is now streaming, so a write op on top of a UNION read
1570/// can stream its writes as the union yields.
1571pub struct UnionSource<'a> {
1572    branches: Vec<Box<dyn RowSource + 'a>>,
1573    branch_idx: usize,
1574    needs_dedup: bool,
1575    seen: BTreeSet<Vec<(String, GroupValueKey)>>,
1576}
1577
1578impl<'a> UnionSource<'a> {
1579    fn new(branches: Vec<Box<dyn RowSource + 'a>>, needs_dedup: bool) -> Self {
1580        Self {
1581            branches,
1582            branch_idx: 0,
1583            needs_dedup,
1584            seen: BTreeSet::new(),
1585        }
1586    }
1587}
1588
1589impl<'a> RowSource for UnionSource<'a> {
1590    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1591        while self.branch_idx < self.branches.len() {
1592            match self.branches[self.branch_idx].next_row()? {
1593                Some(row) => {
1594                    if self.needs_dedup {
1595                        let key = row
1596                            .iter_named()
1597                            .map(|(_, name, val)| {
1598                                (name.into_owned(), GroupValueKey::from_value(val))
1599                            })
1600                            .collect();
1601                        if !self.seen.insert(key) {
1602                            continue;
1603                        }
1604                    }
1605                    return Ok(Some(row));
1606                }
1607                None => {
1608                    self.branch_idx += 1;
1609                }
1610            }
1611        }
1612        Ok(None)
1613    }
1614}
1615
1616/// Build a streaming `RowSource` for an entire compiled query,
1617/// handling both the no-UNION and UNION cases. Replaces the
1618/// "UNION-bearing → BufferedRowSource" fallback that previously
1619/// sat in `PullExecutor::open_compiled`.
1620///
1621/// For non-UNION plans this is a thin wrapper around
1622/// [`build_streaming`] + [`HydratingSource`]. For UNION plans, we
1623/// build a streaming chain per branch (each ending in its own
1624/// `HydratingSource` so its node / relationship references are
1625/// resolved against the same view of storage), then combine them
1626/// through [`UnionSource`].
1627pub(crate) fn compiled_to_streaming<'a, S: GraphStorage + 'a>(
1628    compiled: &'a CompiledQuery,
1629    storage: &'a S,
1630    params: BTreeMap<String, LoraValue>,
1631) -> ExecResult<Box<dyn RowSource + 'a>> {
1632    let params = Arc::new(params);
1633
1634    if compiled.unions.is_empty() {
1635        let plan = &compiled.physical;
1636        let inner = build_streaming(plan, plan.root, storage, params)?;
1637        return Ok(Box::new(HydratingSource::new(inner, storage)));
1638    }
1639
1640    let mut branches: Vec<Box<dyn RowSource + 'a>> = Vec::with_capacity(compiled.unions.len() + 1);
1641
1642    let head_inner = build_streaming(
1643        &compiled.physical,
1644        compiled.physical.root,
1645        storage,
1646        params.clone(),
1647    )?;
1648    branches.push(Box::new(HydratingSource::new(head_inner, storage)));
1649
1650    let mut needs_dedup = false;
1651    for branch in &compiled.unions {
1652        let inner = build_streaming(
1653            &branch.physical,
1654            branch.physical.root,
1655            storage,
1656            params.clone(),
1657        )?;
1658        branches.push(Box::new(HydratingSource::new(inner, storage)));
1659        if !branch.all {
1660            needs_dedup = true;
1661        }
1662    }
1663
1664    Ok(Box::new(UnionSource::new(branches, needs_dedup)))
1665}
1666
1667/// Top-of-pipeline hydration. Replaces node / relationship id
1668/// references in each emitted row with their full hydrated map form,
1669/// matching the buffered executor's post-execution hydration step.
1670pub struct HydratingSource<'a, S: GraphStorage> {
1671    upstream: Box<dyn RowSource + 'a>,
1672    storage: &'a S,
1673}
1674
1675impl<'a, S: GraphStorage> HydratingSource<'a, S> {
1676    fn new(upstream: Box<dyn RowSource + 'a>, storage: &'a S) -> Self {
1677        Self { upstream, storage }
1678    }
1679}
1680
1681impl<'a, S: GraphStorage> RowSource for HydratingSource<'a, S> {
1682    fn next_row(&mut self) -> ExecResult<Option<Row>> {
1683        match self.upstream.next_row()? {
1684            None => Ok(None),
1685            Some(row) => {
1686                let mut out = Row::new();
1687                for (var, name, value) in row.into_iter_named() {
1688                    out.insert_named(var, name, hydrate_value(value, self.storage));
1689                }
1690                Ok(Some(out))
1691            }
1692        }
1693    }
1694}
1695
1696pub(crate) fn hydrate_value<S: GraphStorage>(value: LoraValue, storage: &S) -> LoraValue {
1697    match value {
1698        LoraValue::Node(id) => storage
1699            .with_node(id, hydrate_node_record)
1700            .unwrap_or(LoraValue::Null),
1701        LoraValue::Relationship(id) => storage
1702            .with_relationship(id, hydrate_relationship_record)
1703            .unwrap_or(LoraValue::Null),
1704        LoraValue::List(values) => LoraValue::List(
1705            values
1706                .into_iter()
1707                .map(|v| hydrate_value(v, storage))
1708                .collect(),
1709        ),
1710        LoraValue::Map(map) => LoraValue::Map(
1711            map.into_iter()
1712                .map(|(k, v)| (k, hydrate_value(v, storage)))
1713                .collect(),
1714        ),
1715        other => other,
1716    }
1717}
1718
1719// ---------------------------------------------------------------------------
1720// Plan walker
1721// ---------------------------------------------------------------------------
1722
1723/// True iff this op has a per-operator streaming source. Operators
1724/// that aren't on this list fall back to a single materialized
1725/// [`Executor::execute_subtree`] call wrapped as a [`BufferedRowSource`].
1726pub(crate) fn is_streaming_op(op: &PhysicalOp) -> bool {
1727    match op {
1728        PhysicalOp::Argument(_)
1729        | PhysicalOp::NodeScan(_)
1730        | PhysicalOp::NodeByLabelScan(_)
1731        | PhysicalOp::NodeByPropertyScan(_)
1732        | PhysicalOp::Filter(_)
1733        | PhysicalOp::Unwind(_)
1734        | PhysicalOp::Limit(_)
1735        // Sort is internally O(N) but exposed as a `RowSource`:
1736        // it drains its input on the first pull, sorts in place,
1737        // then yields lazily. This lets a write op (CREATE / SET /
1738        // DELETE) above an ORDER BY stream its writes one row at
1739        // a time instead of forcing the whole subtree to
1740        // materialize before the first write.
1741        | PhysicalOp::Sort(_)
1742        | PhysicalOp::HashAggregation(_)
1743        | PhysicalOp::OptionalMatch(_)
1744        | PhysicalOp::PathBuild(_)
1745        // Projection (both `DISTINCT` and non-`DISTINCT`). The
1746        // `DISTINCT` form drains + dedups internally and yields
1747        // lazily via `DistinctSource`.
1748        | PhysicalOp::Projection(_) => true,
1749        // Single-hop expands are fully per-edge. Variable-length expands still
1750        // allocate the current source row's BFS result, then yield lazily.
1751        PhysicalOp::Expand(_) => true,
1752        _ => false,
1753    }
1754}
1755
1756/// If `node_id` is a streamable write operator
1757/// (Create / Set / Delete / Remove / Merge), return its input
1758/// `PhysicalNodeId`. Used by [`MutablePullExecutor::open_compiled`]
1759/// to detect plans that can be driven by [`StreamingWriteCursor`].
1760pub(crate) fn write_op_input(
1761    plan: &PhysicalPlan,
1762    node_id: PhysicalNodeId,
1763) -> Option<PhysicalNodeId> {
1764    match &plan.nodes[node_id] {
1765        PhysicalOp::Create(o) => Some(o.input),
1766        PhysicalOp::Set(o) => Some(o.input),
1767        PhysicalOp::Delete(o) => Some(o.input),
1768        PhysicalOp::Remove(o) => Some(o.input),
1769        PhysicalOp::Merge(o) => Some(o.input),
1770        _ => None,
1771    }
1772}
1773
1774/// True if every operator in the subtree rooted at `node_id` is
1775/// covered by [`is_streaming_op`] (and therefore by
1776/// [`build_streaming`] without falling back to buffered execution).
1777///
1778/// Used by the mutable executor to decide whether write operators
1779/// can pull their input row-by-row instead of materializing it.
1780pub(crate) fn subtree_is_fully_streaming(plan: &PhysicalPlan, node_id: PhysicalNodeId) -> bool {
1781    let op = &plan.nodes[node_id];
1782    if !is_streaming_op(op) {
1783        return false;
1784    }
1785    let child = match op {
1786        PhysicalOp::Argument(_) => return true,
1787        PhysicalOp::NodeScan(o) => o.input,
1788        PhysicalOp::NodeByLabelScan(o) => o.input,
1789        PhysicalOp::NodeByPropertyScan(o) => o.input,
1790        PhysicalOp::Filter(o) => Some(o.input),
1791        PhysicalOp::Unwind(o) => Some(o.input),
1792        PhysicalOp::Limit(o) => Some(o.input),
1793        PhysicalOp::Expand(o) => Some(o.input),
1794        PhysicalOp::Projection(o) => Some(o.input),
1795        PhysicalOp::Sort(o) => Some(o.input),
1796        PhysicalOp::HashAggregation(o) => Some(o.input),
1797        PhysicalOp::OptionalMatch(o) => Some(o.input),
1798        PhysicalOp::PathBuild(o) => Some(o.input),
1799        // Already filtered by is_streaming_op above.
1800        _ => return false,
1801    };
1802    match child {
1803        None => true,
1804        Some(c) => subtree_is_fully_streaming(plan, c),
1805    }
1806}
1807
/// Build the streaming [`RowSource`] tree for the subtree rooted at
/// `node_id`.
///
/// Operators accepted by [`is_streaming_op`] get their dedicated source.
/// Their input is built recursively through this function, or through
/// [`open_input`] for scans whose input is optional. Any other operator
/// hands its whole subtree to [`build_buffered_subtree`], which executes
/// it immediately and returns the rows as a buffered source.
///
/// Each source receives its own clone of the `params` `Arc`.
///
/// # Errors
///
/// Returns the first error raised while building an upstream source or
/// while executing a buffered fallback subtree.
pub(crate) fn build_streaming<'a, S: GraphStorage + 'a>(
    plan: &'a PhysicalPlan,
    node_id: PhysicalNodeId,
    storage: &'a S,
    params: Arc<BTreeMap<String, LoraValue>>,
) -> ExecResult<Box<dyn RowSource + 'a>> {
    let op = &plan.nodes[node_id];

    // No dedicated source: execute the whole subtree eagerly.
    if !is_streaming_op(op) {
        return build_buffered_subtree(plan, node_id, storage, &params);
    }

    match op {
        // Seed source; no upstream.
        PhysicalOp::Argument(_) => Ok(Box::new(ArgumentSource::new())),

        PhysicalOp::NodeScan(NodeScanExec { input, var }) => {
            let upstream = open_input(plan, *input, storage, params.clone())?;
            Ok(Box::new(NodeScanSource::new(upstream, storage, *var)))
        }

        PhysicalOp::NodeByLabelScan(NodeByLabelScanExec { input, var, labels }) => {
            let upstream = open_input(plan, *input, storage, params.clone())?;
            Ok(Box::new(NodeByLabelScanSource::new(
                upstream, storage, *var, labels,
            )))
        }

        PhysicalOp::NodeByPropertyScan(NodeByPropertyScanExec {
            input,
            var,
            labels,
            key,
            value,
        }) => {
            let upstream = open_input(plan, *input, storage, params.clone())?;
            // Needs an evaluation context: `value` is an expression.
            let ctx = StreamCtx::new(storage, params);
            Ok(Box::new(NodeByPropertyScanSource::new(
                upstream, ctx, *var, labels, key, value,
            )))
        }

        PhysicalOp::Expand(ExpandExec {
            input,
            src,
            rel,
            dst,
            types,
            direction,
            rel_properties,
            range,
        }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            match range.as_ref() {
                // Variable-length pattern: a range is present.
                // NOTE(review): `rel_properties` is only forwarded to the
                // single-hop `ExpandSource` below; confirm that relationship
                // property maps on variable-length patterns are enforced
                // elsewhere.
                Some(range) => Ok(Box::new(VariableLengthExpandSource::new(
                    upstream, ctx, *src, *rel, *dst, types, *direction, range,
                ))),
                // Single-hop pattern.
                None => Ok(Box::new(ExpandSource::new(
                    upstream,
                    ctx,
                    *src,
                    *rel,
                    *dst,
                    types,
                    *direction,
                    rel_properties.as_ref(),
                ))),
            }
        }

        PhysicalOp::Filter(FilterExec { input, predicate }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            Ok(Box::new(FilterSource::new(upstream, ctx, predicate)))
        }

        PhysicalOp::Projection(ProjectionExec {
            input,
            distinct,
            items,
            include_existing,
        }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            let proj: Box<dyn RowSource + 'a> = Box::new(ProjectionSource::new(
                upstream,
                ctx,
                items,
                *include_existing,
            ));
            // DISTINCT is layered on top of the plain projection.
            if *distinct {
                Ok(Box::new(DistinctSource::new(proj)))
            } else {
                Ok(proj)
            }
        }

        PhysicalOp::Unwind(UnwindExec { input, expr, alias }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            Ok(Box::new(UnwindSource::new(upstream, ctx, expr, *alias)))
        }

        PhysicalOp::Limit(LimitExec { input, skip, limit }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            // Skip / limit expressions are evaluated against an
            // empty row (matching the buffered executor semantics).
            // They are evaluated once, here, at build time. A result that
            // is not an integer (`as_i64` gives `None`) means "no skip" /
            // "no limit"; negative values are clamped to 0.
            let ctx = StreamCtx::new(storage, params);
            let eval_ctx = ctx.eval_ctx();
            let scratch = Row::new();
            let skip_n = skip
                .as_ref()
                .and_then(|e| eval_expr(e, &scratch, &eval_ctx).as_i64())
                .unwrap_or(0)
                .max(0) as usize;
            let limit_n = limit
                .as_ref()
                .and_then(|e| eval_expr(e, &scratch, &eval_ctx).as_i64())
                .map(|n| n.max(0) as usize);
            Ok(Box::new(LimitSource::new(upstream, skip_n, limit_n)))
        }

        PhysicalOp::Sort(SortExec { input, items }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            Ok(Box::new(SortSource::new(upstream, ctx, items)))
        }

        PhysicalOp::HashAggregation(HashAggregationExec {
            input,
            group_by,
            aggregates,
        }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            Ok(Box::new(HashAggregationSource::new(
                upstream, ctx, group_by, aggregates,
            )))
        }

        PhysicalOp::OptionalMatch(OptionalMatchExec {
            input,
            inner,
            new_vars,
        }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            // The inner subtree is passed as a node id together with the
            // plan; the source itself decides how to run it.
            Ok(Box::new(OptionalMatchSource::new(
                upstream, ctx, plan, *inner, new_vars,
            )))
        }

        PhysicalOp::PathBuild(PathBuildExec {
            input,
            output,
            node_vars,
            rel_vars,
            shortest_path_all,
        }) => {
            let upstream = build_streaming(plan, *input, storage, params.clone())?;
            let ctx = StreamCtx::new(storage, params);
            Ok(Box::new(PathBuildSource::new(
                upstream,
                ctx,
                *output,
                node_vars,
                rel_vars,
                *shortest_path_all,
            )))
        }

        // Already filtered out by `is_streaming_op`.
        _ => unreachable!("non-streaming op reached streaming branch: {op:?}"),
    }
}
1983
1984/// Open an upstream input source. `Option<PhysicalNodeId>` parents
1985/// (NodeScan / NodeByLabelScan) treat `None` as "start from a single
1986/// empty row".
1987fn open_input<'a, S: GraphStorage + 'a>(
1988    plan: &'a PhysicalPlan,
1989    input: Option<PhysicalNodeId>,
1990    storage: &'a S,
1991    params: Arc<BTreeMap<String, LoraValue>>,
1992) -> ExecResult<Box<dyn RowSource + 'a>> {
1993    match input {
1994        Some(input) => build_streaming(plan, input, storage, params),
1995        None => Ok(Box::new(ArgumentSource::new())),
1996    }
1997}
1998
1999/// Materialized fallback: drain the subtree through the existing
2000/// `Executor` and present the result as a [`BufferedRowSource`]. This
2001/// remains the leaf path for operators that have no cursor-shaped
2002/// source yet (most notably variable-length expansion inside a larger
2003/// streaming tree) and for write operators in the read-only pull
2004/// executor.
2005fn build_buffered_subtree<'a, S: GraphStorage + 'a>(
2006    plan: &'a PhysicalPlan,
2007    node_id: PhysicalNodeId,
2008    storage: &'a S,
2009    params: &Arc<BTreeMap<String, LoraValue>>,
2010) -> ExecResult<Box<dyn RowSource + 'a>> {
2011    // The `Executor` consumes its `ExecutionContext` so we must
2012    // clone the params map for the fallback. In practice this is
2013    // small (typically empty or a handful of named parameters).
2014    let executor = Executor::new(ExecutionContext {
2015        storage,
2016        params: (**params).clone(),
2017    });
2018    let rows = executor.execute_subtree(plan, node_id)?;
2019    Ok(Box::new(BufferedRowSource::new(rows)))
2020}
2021
2022// ---------------------------------------------------------------------------
2023// Public entry points
2024// ---------------------------------------------------------------------------
2025
2026/// Pull-based read-only executor.
2027pub struct PullExecutor<'a, S: GraphStorage> {
2028    storage: &'a S,
2029    params: BTreeMap<String, LoraValue>,
2030}
2031
2032impl<'a, S: GraphStorage> PullExecutor<'a, S> {
2033    pub fn new(storage: &'a S, params: BTreeMap<String, LoraValue>) -> Self {
2034        Self { storage, params }
2035    }
2036
2037    /// Open a streaming cursor for a compiled query.
2038    ///
2039    /// Both no-UNION and UNION-bearing plans go through
2040    /// [`compiled_to_streaming`]: UNION drains its branches via
2041    /// [`UnionSource`] (memory unchanged from the previous buffered
2042    /// path; UNION is inherently O(N) before dedup), but the
2043    /// consumer side is now streaming so any downstream pipeline
2044    /// composes uniformly.
2045    pub fn open_compiled(self, compiled: &'a CompiledQuery) -> ExecResult<Box<dyn RowSource + 'a>>
2046    where
2047        S: 'a,
2048    {
2049        let _ = take_eval_error();
2050        compiled_to_streaming(compiled, self.storage, self.params)
2051    }
2052}
2053
2054/// Pull-based read-write executor. Wraps the existing
2055/// [`MutableExecutor`] under the same row-cursor API. Mutations are
2056/// applied during `open_compiled`; the returned cursor yields the
2057/// resulting rows lazily.
2058pub struct MutablePullExecutor<'a, S: GraphStorageMut> {
2059    storage: &'a mut S,
2060    params: BTreeMap<String, LoraValue>,
2061}
2062
2063impl<'a, S: GraphStorageMut + GraphStorage> MutablePullExecutor<'a, S> {
2064    pub fn new(storage: &'a mut S, params: BTreeMap<String, LoraValue>) -> Self {
2065        Self { storage, params }
2066    }
2067
2068    /// Open a cursor for a compiled write query.
2069    ///
2070    /// Fast path: when a branch root is one of `Create` / `Set` /
2071    /// `Delete` / `Remove` / `Merge` and its input subtree is fully
2072    /// streamable, returns a [`StreamingWriteCursor`] that pulls input
2073    /// row-by-row and applies the per-row write through
2074    /// [`MutableExecutor::apply_write_op`]. `UNION ALL` plans stream
2075    /// one branch at a time. Plain `UNION` drains branches first so
2076    /// rows can be deduplicated by name.
2077    ///
2078    /// Fallback: a branch that is not streamable materializes through
2079    /// [`MutableExecutor::execute_rows`] and wraps the result in a
2080    /// [`BufferedRowSource`].
2081    pub fn open_compiled(self, compiled: &'a CompiledQuery) -> ExecResult<Box<dyn RowSource + 'a>>
2082    where
2083        S: 'a,
2084    {
2085        if compiled.unions.is_empty() {
2086            return open_mutable_plan_cursor(self.storage, &compiled.physical, self.params);
2087        }
2088
2089        MutableUnionSource::open(self.storage, compiled, self.params)
2090            .map(|source| Box::new(source) as Box<dyn RowSource + 'a>)
2091    }
2092}
2093
2094fn open_mutable_plan_cursor<'a, S: GraphStorageMut + GraphStorage + 'a>(
2095    storage: &'a mut S,
2096    plan: &'a PhysicalPlan,
2097    params: BTreeMap<String, LoraValue>,
2098) -> ExecResult<Box<dyn RowSource + 'a>> {
2099    if let Some(input) = write_op_input(plan, plan.root) {
2100        if subtree_is_fully_streaming(plan, input) {
2101            return StreamingWriteCursor::open(storage, plan, plan.root, params)
2102                .map(|c| Box::new(c) as Box<dyn RowSource + 'a>);
2103        }
2104    }
2105
2106    let mut executor = MutableExecutor::new(MutableExecutionContext { storage, params });
2107    let rows = executor.execute_rows(plan)?;
2108    Ok(Box::new(BufferedRowSource::new(rows)))
2109}
2110
2111/// Mutable UNION cursor. `UNION ALL` streams one branch at a time
2112/// against the same staged graph. Plain `UNION` streams branch-by-branch
2113/// while retaining only a seen-key set for deduplication.
2114pub struct MutableUnionSource<'a, S: GraphStorageMut + GraphStorage + 'a> {
2115    storage_ptr: *mut S,
2116    compiled: &'a CompiledQuery,
2117    params: BTreeMap<String, LoraValue>,
2118    branch_idx: usize,
2119    current: Option<Box<dyn RowSource + 'a>>,
2120    needs_dedup: bool,
2121    seen: BTreeSet<Vec<(String, GroupValueKey)>>,
2122    _phantom: std::marker::PhantomData<&'a mut S>,
2123}
2124
2125impl<'a, S: GraphStorageMut + GraphStorage + 'a> MutableUnionSource<'a, S> {
2126    fn open(
2127        storage: &'a mut S,
2128        compiled: &'a CompiledQuery,
2129        params: BTreeMap<String, LoraValue>,
2130    ) -> ExecResult<Self> {
2131        let needs_dedup = compiled.unions.iter().any(|branch| !branch.all);
2132        Ok(Self {
2133            storage_ptr: storage as *mut S,
2134            compiled,
2135            params,
2136            branch_idx: 0,
2137            current: None,
2138            needs_dedup,
2139            seen: BTreeSet::new(),
2140            _phantom: std::marker::PhantomData,
2141        })
2142    }
2143
2144    fn branch_count(&self) -> usize {
2145        self.compiled.unions.len() + 1
2146    }
2147
2148    fn branch_plan(&self, idx: usize) -> &'a PhysicalPlan {
2149        if idx == 0 {
2150            &self.compiled.physical
2151        } else {
2152            &self.compiled.unions[idx - 1].physical
2153        }
2154    }
2155
2156    fn open_branch(&mut self, idx: usize) -> ExecResult<Box<dyn RowSource + 'a>> {
2157        let plan = self.branch_plan(idx);
2158        // SAFETY: MutableUnionSource keeps at most one branch cursor
2159        // alive at a time. `current` is dropped before advancing to
2160        // the next branch, so each mutable reborrow is temporally
2161        // disjoint.
2162        let storage = unsafe { &mut *self.storage_ptr };
2163        open_mutable_plan_cursor(storage, plan, self.params.clone())
2164    }
2165}
2166
2167impl<'a, S: GraphStorageMut + GraphStorage + 'a> RowSource for MutableUnionSource<'a, S> {
2168    fn next_row(&mut self) -> ExecResult<Option<Row>> {
2169        loop {
2170            if self.branch_idx >= self.branch_count() {
2171                return Ok(None);
2172            }
2173
2174            if self.current.is_none() {
2175                self.current = Some(self.open_branch(self.branch_idx)?);
2176            }
2177
2178            match self
2179                .current
2180                .as_mut()
2181                .expect("current branch initialized above")
2182                .next_row()?
2183            {
2184                Some(row) => {
2185                    if self.needs_dedup {
2186                        let key = row
2187                            .iter_named()
2188                            .map(|(_, name, val)| {
2189                                (name.into_owned(), GroupValueKey::from_value(val))
2190                            })
2191                            .collect();
2192                        if !self.seen.insert(key) {
2193                            continue;
2194                        }
2195                    }
2196                    return Ok(Some(row));
2197                }
2198                None => {
2199                    self.current.take();
2200                    self.branch_idx += 1;
2201                }
2202            }
2203        }
2204    }
2205}
2206
/// Streaming write cursor for plans whose root is one of
/// `Create` / `Set` / `Delete` / `Remove` / `Merge` and whose input
/// subtree is fully streamable.
///
/// Each `next_row` call pulls one input row from `upstream`, applies the
/// write operator to it through a fresh [`MutableExecutor`], and returns
/// the hydrated row. Writes therefore happen lazily, one row per pull;
/// rows that are never pulled are never written.
///
/// # Layout invariant
///
/// The cursor owns a raw alias `*mut S` of the original `&'a mut S`.
/// Its `upstream` was constructed using a `&'a S` reborrow derived
/// from `storage_ptr` via unsafe lifetime extension. This is sound
/// because the existing read-side `RowSource` impls (see
/// `NodeScanSource::cur_ids`, `ExpandSource::cur_edges`, etc.)
/// materialize their iteration state into owned `Vec`s at
/// construction or first call, so no live `&S` borrow into storage
/// persists across `next_row` calls. Read-only access happens
/// transiently inside each `upstream.next_row` call; mutable access
/// happens between calls inside [`MutableExecutor::apply_write_op`].
/// The borrows never overlap in time.
///
/// NOTE(review): the sources built by `build_streaming` receive this
/// `&'a S` (e.g. `NodeScanSource::new(upstream, storage, ..)`, and
/// `StreamCtx::new(storage, ..)`) and presumably keep it for later pulls.
/// That shared reference therefore stays live while `next_row` creates and
/// uses a `&mut S` from the same pointer. Rust's aliasing model (as checked
/// by Miri's Stacked / Tree Borrows) may treat this as undefined behaviour
/// even though the accesses never overlap in time. Worth confirming with
/// `cargo miri test`.
///
/// NOTE(review): because writes are applied between pulls, an upstream
/// operator that re-reads storage per incoming row may observe rows this
/// cursor wrote earlier. Confirm this matches the buffered
/// `MutableExecutor` semantics.
///
/// # Drop order
///
/// `upstream` must drop before any caller may regain `&mut S` access
/// to the underlying storage. The explicit `Drop` impl enforces
/// that order — `ManuallyDrop` lets us force the sequence.
pub struct StreamingWriteCursor<'a, S: GraphStorageMut + GraphStorage + 'a> {
    /// SAFETY: borrows from `*storage_ptr`. Must drop first.
    upstream: ManuallyDrop<Box<dyn RowSource + 'a>>,
    /// Raw alias of the `&'a mut S` handed in at construction. Used
    /// as `&S` (via `&*ptr`) by `upstream` and as `&mut S` (via
    /// `&mut *ptr`) inside this cursor's `next_row`.
    storage_ptr: *mut S,
    /// Physical plan — kept alive for the per-row op borrow.
    plan: &'a PhysicalPlan,
    /// Index into `plan.nodes` of the write operator.
    /// We re-fetch the op per call so this struct doesn't need to
    /// be parameterized by the specific op type.
    write_op_node: PhysicalNodeId,
    /// Parameters; cloned per row into a fresh `MutableExecutor`.
    /// In typical bulk-write workloads this is empty or tiny.
    params: BTreeMap<String, LoraValue>,
    /// Ties the cursor to the exclusive `'a` borrow of the storage that
    /// `storage_ptr` aliases.
    _phantom: std::marker::PhantomData<&'a mut S>,
}

impl<'a, S: GraphStorageMut + GraphStorage + 'a> StreamingWriteCursor<'a, S> {
    /// Build a cursor. Caller must already have verified that
    /// `plan.nodes[write_op_node]` is a streamable write op via
    /// [`write_op_input`] and [`subtree_is_fully_streaming`].
    ///
    /// Only the upstream read pipeline is built here; no write is applied
    /// until the first `next_row` call. `subtree_is_fully_streaming` is not
    /// re-checked. A non-streaming input would still be built, through
    /// `build_streaming`'s buffered fallback.
    ///
    /// # Errors
    ///
    /// Returns `ExecutorError::RuntimeError` if `write_op_node` is not a
    /// write operator. Also returns any error from building the upstream
    /// pipeline.
    pub(crate) fn open(
        storage: &'a mut S,
        plan: &'a PhysicalPlan,
        write_op_node: PhysicalNodeId,
        params: BTreeMap<String, LoraValue>,
    ) -> ExecResult<Self> {
        let input = match write_op_input(plan, write_op_node) {
            Some(i) => i,
            None => {
                return Err(crate::errors::ExecutorError::RuntimeError(format!(
                    "StreamingWriteCursor::open called with non-write node {write_op_node:?}"
                )));
            }
        };
        // From here on the caller's `&mut S` is only reached through this
        // raw pointer.
        let storage_ptr: *mut S = storage as *mut S;

        // SAFETY: see struct-level comment.
        let storage_ref: &'a S = unsafe { &*storage_ptr };
        let upstream = build_streaming(plan, input, storage_ref, Arc::new(params.clone()))?;

        Ok(Self {
            upstream: ManuallyDrop::new(upstream),
            storage_ptr,
            plan,
            write_op_node,
            params,
            _phantom: std::marker::PhantomData,
        })
    }
}
2283
impl<'a, S: GraphStorageMut + GraphStorage + 'a> RowSource for StreamingWriteCursor<'a, S> {
    /// Pull one input row, apply the write operator to it, and return the
    /// hydrated result row. Returns `Ok(None)` once upstream is exhausted.
    /// An error from upstream or from the write is returned as-is.
    fn next_row(&mut self) -> ExecResult<Option<Row>> {
        let mut row = match self.upstream.next_row()? {
            Some(r) => r,
            None => return Ok(None),
        };

        // SAFETY: upstream's `next_row` has returned, so its
        // dormant `&S` borrow is not in active use right now. We
        // reborrow `&mut S` for the per-row write and drop the
        // borrow before the next pull.
        // NOTE(review): "dormant" is not a notion the aliasing model
        // recognises for a `&S` that will be used again; see the
        // struct-level note.
        let storage_mut: &mut S = unsafe { &mut *self.storage_ptr };
        // A fresh executor per row; `params` is cloned each time.
        let mut exec = MutableExecutor::new(MutableExecutionContext {
            storage: storage_mut,
            params: self.params.clone(),
        });
        // Re-fetch the write op from the plan on every row.
        let op = &self.plan.nodes[self.write_op_node];
        exec.apply_write_op(op, &mut row)?;
        let row = exec.hydrate_row(row);
        Ok(Some(row))
    }
}
2306
impl<'a, S: GraphStorageMut + GraphStorage + 'a> Drop for StreamingWriteCursor<'a, S> {
    /// Drops `upstream`, the field that borrows from `*storage_ptr`,
    /// before the remaining fields.
    fn drop(&mut self) {
        // SAFETY: drop `upstream` first to release its borrow into
        // `*storage_ptr`. Subsequent fields drop via the normal
        // field-drop sequence and don't touch storage.
        // `ManuallyDrop::drop` is called exactly once, here, and
        // `upstream` is never accessed afterwards.
        unsafe {
            ManuallyDrop::drop(&mut self.upstream);
        }
    }
}
2317
2318/// Drain a freshly opened cursor into a `Vec<Row>`. Convenience for
2319/// callers that want the streaming entry point but a buffered result.
2320pub fn collect_compiled<'a, S: GraphStorage + 'a>(
2321    storage: &'a S,
2322    params: BTreeMap<String, LoraValue>,
2323    compiled: &'a CompiledQuery,
2324) -> ExecResult<Vec<Row>> {
2325    let mut cursor = PullExecutor::new(storage, params).open_compiled(compiled)?;
2326    drain(cursor.as_mut())
2327}
2328
2329// ---------------------------------------------------------------------------
2330// Stream classification
2331// ---------------------------------------------------------------------------
2332
/// How a compiled query has to be run when it is streamed.
///
/// The database layer uses this to decide whether `db.stream` needs a
/// hidden staged transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamShape {
    /// Neither the plan nor any of its UNION branches contains a mutating
    /// operator, so the query can be streamed against the live store.
    ReadOnly,
    /// At least one Create / Merge / Delete / Set / Remove operator is
    /// present. The host should run the query against a staged graph and
    /// publish only once the cursor has been exhausted.
    Mutating,
}

impl StreamShape {
    /// Returns `true` exactly for [`StreamShape::Mutating`].
    pub fn is_mutating(self) -> bool {
        self == StreamShape::Mutating
    }
}
2351
2352fn plan_is_mutating(plan: &PhysicalPlan) -> bool {
2353    plan.nodes.iter().any(|op| {
2354        matches!(
2355            op,
2356            PhysicalOp::Create(_)
2357                | PhysicalOp::Merge(_)
2358                | PhysicalOp::Delete(_)
2359                | PhysicalOp::Set(_)
2360                | PhysicalOp::Remove(_)
2361        )
2362    })
2363}
2364
2365/// Classify a compiled query for streaming. Treats any UNION branch
2366/// the same as the head: a single mutating op anywhere across the
2367/// compiled query promotes the whole query to `Mutating`.
2368pub fn classify_stream(compiled: &CompiledQuery) -> StreamShape {
2369    if plan_is_mutating(&compiled.physical)
2370        || compiled
2371            .unions
2372            .iter()
2373            .any(|b| plan_is_mutating(&b.physical))
2374    {
2375        StreamShape::Mutating
2376    } else {
2377        StreamShape::ReadOnly
2378    }
2379}
2380
2381// ---------------------------------------------------------------------------
2382// Plan-derived result columns
2383// ---------------------------------------------------------------------------
2384
2385/// Result column names derived from the compiled plan.
2386///
2387/// Walks the plan from `root` looking for the topmost projection-shaped
2388/// node (Projection, HashAggregation). Other operators that wrap a
2389/// projection (Limit, Sort, PathBuild, OptionalMatch, Filter, Unwind,
2390/// Create/Merge/Set/Delete/Remove) defer to their input. Returns an
2391/// empty `Vec` for plans that have no named output (e.g. a bare
2392/// scan-only plan), preserving the previous "infer from first row"
2393/// behaviour for those cases.
2394pub fn plan_result_columns(plan: &PhysicalPlan) -> Vec<String> {
2395    plan_columns_at(plan, plan.root).unwrap_or_default()
2396}
2397
2398fn plan_columns_at(plan: &PhysicalPlan, node: PhysicalNodeId) -> Option<Vec<String>> {
2399    match &plan.nodes[node] {
2400        PhysicalOp::Projection(p) => Some(p.items.iter().map(|i| i.name.clone()).collect()),
2401        PhysicalOp::HashAggregation(p) => Some(
2402            p.group_by
2403                .iter()
2404                .chain(p.aggregates.iter())
2405                .map(|i| i.name.clone())
2406                .collect(),
2407        ),
2408        PhysicalOp::Limit(p) => plan_columns_at(plan, p.input),
2409        PhysicalOp::Sort(p) => plan_columns_at(plan, p.input),
2410        PhysicalOp::PathBuild(p) => plan_columns_at(plan, p.input),
2411        PhysicalOp::OptionalMatch(p) => plan_columns_at(plan, p.input),
2412        PhysicalOp::Filter(p) => plan_columns_at(plan, p.input),
2413        PhysicalOp::Unwind(p) => plan_columns_at(plan, p.input),
2414        PhysicalOp::Create(p) => plan_columns_at(plan, p.input),
2415        PhysicalOp::Merge(p) => plan_columns_at(plan, p.input),
2416        PhysicalOp::Delete(p) => plan_columns_at(plan, p.input),
2417        PhysicalOp::Set(p) => plan_columns_at(plan, p.input),
2418        PhysicalOp::Remove(p) => plan_columns_at(plan, p.input),
2419        PhysicalOp::Argument(_)
2420        | PhysicalOp::NodeScan(_)
2421        | PhysicalOp::NodeByLabelScan(_)
2422        | PhysicalOp::NodeByPropertyScan(_)
2423        | PhysicalOp::Expand(_) => None,
2424    }
2425}
2426
2427/// Result column names for a compiled query (head plan; UNION branches
2428/// must produce the same shape so the head's columns are authoritative).
2429pub fn compiled_result_columns(compiled: &CompiledQuery) -> Vec<String> {
2430    plan_result_columns(&compiled.physical)
2431}
2432
#[cfg(test)]
mod tests {
    use super::*;
    use lora_analyzer::symbols::VarId;
    use lora_ast::Span;
    use lora_compiler::physical::{ArgumentExec, ExpandExec, NodeByLabelScanExec, PhysicalPlan};
    use lora_store::{GraphStorageMut, InMemoryGraph};

    /// A variable-length expand must be covered by a streaming source (no
    /// buffered fallback) and must yield every path within its hop range.
    #[test]
    fn variable_length_expand_has_streaming_source() {
        // Chain a -[:R]-> b -[:R]-> c over three `:N` nodes.
        let mut graph = InMemoryGraph::new();
        let a = graph.create_node(vec!["N".into()], BTreeMap::new());
        let b = graph.create_node(vec!["N".into()], BTreeMap::new());
        let c = graph.create_node(vec!["N".into()], BTreeMap::new());
        graph
            .create_relationship(a.id, b.id, "R", BTreeMap::new())
            .unwrap();
        graph
            .create_relationship(b.id, c.id, "R", BTreeMap::new())
            .unwrap();

        let src = VarId(0);
        let rel = VarId(1);
        let dst = VarId(2);
        // Plan: Argument (0) -> NodeByLabelScan(:N) (1)
        //       -> Expand (src)-[rel:R*1..2]->(dst) (2, root).
        let plan = PhysicalPlan {
            root: 2,
            nodes: vec![
                PhysicalOp::Argument(ArgumentExec),
                PhysicalOp::NodeByLabelScan(NodeByLabelScanExec {
                    input: Some(0),
                    var: src,
                    labels: vec![vec!["N".into()]],
                }),
                PhysicalOp::Expand(ExpandExec {
                    input: 1,
                    src,
                    rel: Some(rel),
                    dst,
                    types: vec!["R".into()],
                    direction: Direction::Right,
                    rel_properties: None,
                    range: Some(RangeLiteral {
                        start: Some(1),
                        end: Some(2),
                        span: Span::default(),
                    }),
                }),
            ],
        };

        // Every node is covered by `is_streaming_op`, so `build_streaming`
        // never hands any part of this tree to `build_buffered_subtree`.
        assert!(subtree_is_fully_streaming(&plan, plan.root));

        let mut source =
            build_streaming(&plan, plan.root, &graph, Arc::new(BTreeMap::new())).unwrap();
        let rows = drain(source.as_mut()).unwrap();
        // A variable-length `rel` binding is a list of relationships; its
        // length is the hop count of that path.
        let mut rel_lengths = rows
            .iter()
            .map(|row| match row.get(rel).unwrap() {
                LoraValue::List(rels) => rels.len(),
                other => panic!("expected relationship list, got {other:?}"),
            })
            .collect::<Vec<_>>();
        rel_lengths.sort_unstable();

        // Outgoing paths of 1..=2 hops: a->b (1), a->b->c (2), b->c (1);
        // c has no outgoing :R.
        assert_eq!(rows.len(), 3);
        assert_eq!(rel_lengths, vec![1, 1, 2]);
    }
}