Skip to main content

grafeo_engine/query/executor/
stream.rs

//! Lazy, cursor-based query result streams.
//!
//! Today `Session::execute` drains the entire operator pipeline into a
//! `QueryResult { rows: Vec<Vec<Value>> }` before returning. For large result
//! sets this either exhausts memory or forces the caller to wait until the
//! final row has been produced before they can see the first one.
//!
//! `ResultStream` exposes the pipeline lazily: the consumer pulls one
//! `DataChunk` (up to 2048 rows) at a time from the root operator. Dropping
//! the stream releases the operator tree and decrements the owning session's
//! `active_streams` counter.
//!
//! # Stability: Experimental
//!
//! This module is new in 0.5.40. Signatures may change before being promoted
//! to Beta. Use from embedded callers that want first-row latency or bounded
//! memory; use `Session::execute` when you want a fully materialized result.
18
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::time::Instant;
21
22use grafeo_common::types::{LogicalType, Value};
23use grafeo_common::utils::error::{Error, QueryError, Result};
24use grafeo_core::execution::DataChunk;
25use grafeo_core::execution::operators::Operator;
26
27use crate::database::QueryResult;
28
/// Scope guard that keeps a session's active-stream counter in step with the
/// lifetime of one stream.
///
/// Building the guard adds one to the counter; dropping it subtracts one
/// again. The counter prevents `commit()` / `rollback()` from racing with
/// in-flight streams that still hold references to the session's read
/// snapshot.
pub(crate) struct StreamGuard<'s> {
    counter: &'s AtomicUsize,
}

impl<'s> StreamGuard<'s> {
    /// Registers one more live stream on `counter` and returns the guard that
    /// unregisters it when dropped.
    pub(crate) fn new(counter: &'s AtomicUsize) -> Self {
        let guard = Self { counter };
        guard.counter.fetch_add(1, Ordering::AcqRel);
        guard
    }
}

impl Drop for StreamGuard<'_> {
    fn drop(&mut self) {
        // Mirror image of the increment performed in `new`.
        let _previous = self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}
49
50/// Lazy, chunk-based result stream bound to a session's lifetime.
51///
52/// Created by [`Session::execute_streaming`](crate::Session::execute_streaming).
53/// Iterate via [`next_chunk`](Self::next_chunk) for chunk granularity or
54/// [`into_row_iter`](Self::into_row_iter) for a row iterator.
55///
56/// # Stability: Experimental
57pub struct ResultStream<'session> {
58    operator: Box<dyn Operator>,
59    columns: Vec<String>,
60    column_types: Vec<LogicalType>,
61    deadline: Option<Instant>,
62    exhausted: bool,
63    _guard: StreamGuard<'session>,
64}
65
66impl<'s> ResultStream<'s> {
67    pub(crate) fn new(
68        operator: Box<dyn Operator>,
69        columns: Vec<String>,
70        deadline: Option<Instant>,
71        guard: StreamGuard<'s>,
72    ) -> Self {
73        let len = columns.len();
74        Self {
75            operator,
76            columns,
77            column_types: vec![LogicalType::Any; len],
78            deadline,
79            exhausted: false,
80            _guard: guard,
81        }
82    }
83
84    /// Column names in the order they appear in each row.
85    #[must_use]
86    pub fn columns(&self) -> &[String] {
87        &self.columns
88    }
89
90    /// Column types. Initially `Any`; refined after the first non-empty chunk.
91    #[must_use]
92    pub fn column_types(&self) -> &[LogicalType] {
93        &self.column_types
94    }
95
96    /// Pulls the next chunk from the pipeline.
97    ///
98    /// Returns `Ok(None)` when the stream is exhausted.
99    ///
100    /// # Errors
101    ///
102    /// Propagates operator errors and returns `Error::Query(QueryError::Timeout)`
103    /// if the session's query deadline has passed.
104    pub fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
105        if self.exhausted {
106            return Ok(None);
107        }
108        check_deadline(self.deadline)?;
109        match self.operator.next() {
110            Ok(Some(chunk)) => {
111                refine_column_types(&chunk, &mut self.column_types);
112                Ok(Some(chunk))
113            }
114            Ok(None) => {
115                self.exhausted = true;
116                Ok(None)
117            }
118            Err(err) => Err(super::convert_operator_error(err)),
119        }
120    }
121
122    /// Converts to a row-level iterator that buffers one chunk internally.
123    #[must_use]
124    pub fn into_row_iter(self) -> RowIterator<'s> {
125        RowIterator {
126            stream: self,
127            current: None,
128            cursor: 0,
129        }
130    }
131
132    /// Drains the stream into a fully materialized [`QueryResult`].
133    ///
134    /// Useful as an escape hatch when a caller requested streaming but then
135    /// decides to collect everything (e.g., `stream.collect()?` in tests).
136    ///
137    /// # Errors
138    ///
139    /// Propagates operator errors and deadline timeouts.
140    pub fn collect(mut self) -> Result<QueryResult> {
141        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
142        while let Some(chunk) = self.next_chunk()? {
143            append_chunk(&chunk, &mut result);
144        }
145        result.column_types = self.column_types;
146        Ok(result)
147    }
148}
149
150/// Row-level iterator adapter over a [`ResultStream`].
151///
152/// # Stability: Experimental
153pub struct RowIterator<'s> {
154    stream: ResultStream<'s>,
155    // Indices are materialized once per chunk; re-collecting on every `next`
156    // would turn per-chunk iteration from O(n) into O(n^2).
157    current: Option<(DataChunk, Vec<usize>)>,
158    cursor: usize,
159}
160
161impl RowIterator<'_> {
162    /// Column names from the source stream.
163    #[must_use]
164    pub fn columns(&self) -> &[String] {
165        self.stream.columns()
166    }
167}
168
169impl Iterator for RowIterator<'_> {
170    type Item = Result<Vec<Value>>;
171
172    fn next(&mut self) -> Option<Self::Item> {
173        loop {
174            if let Some((chunk, indices)) = &self.current {
175                if self.cursor < indices.len() {
176                    let row_idx = indices[self.cursor];
177                    self.cursor += 1;
178                    return Some(Ok(extract_row(chunk, row_idx)));
179                }
180                self.current = None;
181                self.cursor = 0;
182            }
183            match self.stream.next_chunk() {
184                Ok(Some(chunk)) => {
185                    if chunk.row_count() == 0 {
186                        continue;
187                    }
188                    let indices: Vec<usize> = chunk.selected_indices().collect();
189                    self.current = Some((chunk, indices));
190                    self.cursor = 0;
191                }
192                Ok(None) => return None,
193                Err(err) => return Some(Err(err)),
194            }
195        }
196    }
197}
198
199/// Binding-friendly result stream with no lifetime parameter.
200///
201/// Used by language bindings (Python, Node.js, WASM) where Rust lifetimes
202/// cannot be expressed at the FFI boundary. The operator tree is `'static`
203/// because operators hold `Arc<dyn GraphStoreSearch>` rather than borrows; the
204/// stores remain alive as long as the stream does.
205///
206/// Callers that need to tie the stream's lifetime to something else (e.g.
207/// a wrapping `Arc<RwLock<GrafeoDB>>` in a binding) should carry that
208/// keepalive in their own wrapper alongside the stream.
209///
210/// # Stability: Experimental
211pub struct OwnedResultStream {
212    operator: Box<dyn Operator>,
213    columns: Vec<String>,
214    column_types: Vec<LogicalType>,
215    deadline: Option<Instant>,
216    exhausted: bool,
217}
218
219impl std::fmt::Debug for OwnedResultStream {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("OwnedResultStream")
222            .field("columns", &self.columns)
223            .field("column_types", &self.column_types)
224            .field("deadline", &self.deadline)
225            .field("exhausted", &self.exhausted)
226            .finish_non_exhaustive()
227    }
228}
229
230impl OwnedResultStream {
231    pub(crate) fn new(
232        operator: Box<dyn Operator>,
233        columns: Vec<String>,
234        deadline: Option<Instant>,
235    ) -> Self {
236        let len = columns.len();
237        Self {
238            operator,
239            columns,
240            column_types: vec![LogicalType::Any; len],
241            deadline,
242            exhausted: false,
243        }
244    }
245
246    /// Column names in the order they appear in each row.
247    #[must_use]
248    pub fn columns(&self) -> &[String] {
249        &self.columns
250    }
251
252    /// Column types. Initially `Any`; refined after the first non-empty chunk.
253    #[must_use]
254    pub fn column_types(&self) -> &[LogicalType] {
255        &self.column_types
256    }
257
258    /// Pulls the next chunk. See [`ResultStream::next_chunk`].
259    ///
260    /// # Errors
261    ///
262    /// Propagates operator errors and deadline timeouts.
263    pub fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
264        if self.exhausted {
265            return Ok(None);
266        }
267        check_deadline(self.deadline)?;
268        match self.operator.next() {
269            Ok(Some(chunk)) => {
270                refine_column_types(&chunk, &mut self.column_types);
271                Ok(Some(chunk))
272            }
273            Ok(None) => {
274                self.exhausted = true;
275                Ok(None)
276            }
277            Err(err) => Err(super::convert_operator_error(err)),
278        }
279    }
280
281    /// Converts to a row iterator that buffers one chunk internally.
282    #[must_use]
283    pub fn into_row_iter(self) -> OwnedRowIterator {
284        OwnedRowIterator {
285            stream: self,
286            current: None,
287            cursor: 0,
288        }
289    }
290
291    /// Drains into a [`QueryResult`].
292    ///
293    /// # Errors
294    ///
295    /// Propagates operator errors and deadline timeouts.
296    pub fn collect(mut self) -> Result<QueryResult> {
297        let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
298        while let Some(chunk) = self.next_chunk()? {
299            append_chunk(&chunk, &mut result);
300        }
301        result.column_types = self.column_types;
302        Ok(result)
303    }
304}
305
306/// Row-level iterator over an [`OwnedResultStream`].
307///
308/// # Stability: Experimental
309pub struct OwnedRowIterator {
310    stream: OwnedResultStream,
311    current: Option<(DataChunk, Vec<usize>)>,
312    cursor: usize,
313}
314
315impl OwnedRowIterator {
316    /// Column names from the source stream.
317    #[must_use]
318    pub fn columns(&self) -> &[String] {
319        self.stream.columns()
320    }
321}
322
323impl Iterator for OwnedRowIterator {
324    type Item = Result<Vec<Value>>;
325
326    fn next(&mut self) -> Option<Self::Item> {
327        loop {
328            if let Some((chunk, indices)) = &self.current {
329                if self.cursor < indices.len() {
330                    let row_idx = indices[self.cursor];
331                    self.cursor += 1;
332                    return Some(Ok(extract_row(chunk, row_idx)));
333                }
334                self.current = None;
335                self.cursor = 0;
336            }
337            match self.stream.next_chunk() {
338                Ok(Some(chunk)) => {
339                    if chunk.row_count() == 0 {
340                        continue;
341                    }
342                    let indices: Vec<usize> = chunk.selected_indices().collect();
343                    self.current = Some((chunk, indices));
344                    self.cursor = 0;
345                }
346                Ok(None) => return None,
347                Err(err) => return Some(Err(err)),
348            }
349        }
350    }
351}
352
// ---------------------------------------------------------------------------
// Internal helpers
// ---------------------------------------------------------------------------
356
357fn check_deadline(deadline: Option<Instant>) -> Result<()> {
358    #[cfg(not(target_arch = "wasm32"))]
359    if let Some(d) = deadline
360        && Instant::now() >= d
361    {
362        return Err(Error::Query(QueryError::timeout()));
363    }
364    #[cfg(target_arch = "wasm32")]
365    let _ = deadline;
366    Ok(())
367}
368
369fn refine_column_types(chunk: &DataChunk, types: &mut Vec<LogicalType>) {
370    let col_count = chunk.column_count();
371    if col_count == 0 {
372        return;
373    }
374    if types.len() != col_count {
375        types.resize(col_count, LogicalType::Any);
376    }
377    for (col_idx, slot) in types.iter_mut().enumerate().take(col_count) {
378        if matches!(slot, LogicalType::Any)
379            && let Some(col) = chunk.column(col_idx)
380        {
381            *slot = col.data_type().clone();
382        }
383    }
384}
385
386fn extract_row(chunk: &DataChunk, row_idx: usize) -> Vec<Value> {
387    let col_count = chunk.column_count();
388    let mut row = Vec::with_capacity(col_count);
389    for col_idx in 0..col_count {
390        let value = chunk
391            .column(col_idx)
392            .and_then(|col| col.get_value(row_idx))
393            .unwrap_or(Value::Null);
394        row.push(value);
395    }
396    row
397}
398
399fn append_chunk(chunk: &DataChunk, result: &mut QueryResult) {
400    for row_idx in chunk.selected_indices() {
401        result.rows.push(extract_row(chunk, row_idx));
402    }
403}