Skip to main content

bsql_core/
stream.rs

1//! True PG-level streaming query results.
2//!
3//! [`QueryStream`] uses the extended query protocol's `Execute(max_rows=N)`
4//! to fetch rows in chunks from PostgreSQL. Only one chunk is in memory at a
5//! time — the arena is reset between chunks.
6//!
//! The connection is held for the lifetime of the stream. When a fully
//! consumed stream is dropped, the connection returns to the pool. If the
//! stream is dropped mid-iteration, the connection is discarded (not
//! returned to the pool) because the portal may still be open on the server.
11
12use std::sync::Arc;
13
14use bsql_driver_postgres::arena::release_arena;
15use bsql_driver_postgres::{Arena, ColumnDesc, QueryResult};
16
/// Row limit sent with each follow-up `Execute` while streaming.
///
/// Sixty-four rows per round-trip keeps the number of network round-trips
/// low without letting a single chunk grow large: every chunk is parsed into
/// the arena, decoded into owned values by the caller, and then the arena is
/// recycled for the next chunk.
const STREAM_CHUNK_SIZE: i32 = 64;
23
24/// A stream of rows backed by true PG-level chunked fetching.
25///
26/// Created by [`Pool::query_stream`](crate::pool::Pool::query_stream).
27///
28/// The `PoolGuard` is held until the stream is fully consumed or dropped.
29/// Rows are fetched in chunks of 64 via `Execute(max_rows=64)`.
30///
31/// # Usage
32///
33/// Use [`advance()`](QueryStream::advance) + [`next_row()`](QueryStream::next_row)
34/// for row-by-row iteration:
35///
36/// ```rust,ignore
37/// let mut stream = pool.query_stream(sql, hash, &[])?;
38/// while stream.advance()? {
39///     let row = stream.next_row().unwrap();
40///     let id: i32 = row.get_i32(0).unwrap();
41///     // decode before next advance() — row borrows from arena
42/// }
43/// ```
44pub struct QueryStream {
45    /// Held to keep the connection alive while streaming.
46    guard: Option<bsql_driver_postgres::PoolGuard>,
47    arena: Option<Arena>,
48    /// Current chunk's row metadata.
49    current_result: Option<QueryResult>,
50    /// Position within the current chunk.
51    position: usize,
52    /// Column descriptors (shared across all chunks via Arc).
53    /// Passed by reference to `QueryResult::from_parts` to avoid Arc
54    /// refcount increments per chunk.
55    columns: Arc<[ColumnDesc]>,
56    /// Whether all rows have been consumed from the server.
57    finished: bool,
58    /// Whether we need to send Execute+Sync before reading the next chunk.
59    /// True after the first chunk (since query_streaming_start already sent
60    /// the first Execute).
61    needs_execute: bool,
62}
63
64impl QueryStream {
65    /// Create a new `QueryStream`.
66    ///
67    /// `first_result` is the first chunk of rows (from the initial Execute).
68    /// `finished` is true if the first chunk was the only chunk (CommandComplete
69    /// received).
70    pub(crate) fn new(
71        guard: bsql_driver_postgres::PoolGuard,
72        arena: Arena,
73        first_result: QueryResult,
74        columns: Arc<[ColumnDesc]>,
75        finished: bool,
76    ) -> Self {
77        Self {
78            guard: Some(guard),
79            arena: Some(arena),
80            current_result: Some(first_result),
81            position: 0,
82            columns,
83            finished,
84            needs_execute: !finished, // if not finished, next call needs Execute+Sync
85        }
86    }
87
88    /// Get the next row from the current in-memory chunk.
89    ///
90    /// Returns `None` when the current chunk is exhausted. Call
91    /// [`fetch_next_chunk()`](QueryStream::fetch_next_chunk) to load more rows
92    /// from the server, or use [`advance()`](QueryStream::advance) which
93    /// handles chunk management automatically.
94    ///
95    /// Rows borrow from the arena, which is reset between chunks. Each row
96    /// must be fully decoded (into owned types) before calling `next_row()`
97    /// again. The generated code already does this — it decodes into owned
98    /// struct fields.
99    pub fn next_row(&mut self) -> Option<bsql_driver_postgres::Row<'_>> {
100        // Check if current chunk has more rows
101        if let Some(ref result) = self.current_result {
102            if self.position < result.len() {
103                let arena = self.arena.as_ref()?;
104                let row = result.row(self.position, arena);
105                self.position += 1;
106                return Some(row);
107            }
108        }
109
110        // Current chunk exhausted — cannot fetch more synchronously here.
111        // Use `fetch_next_chunk()` to load the next chunk.
112        None
113    }
114
115    /// Ensure the current chunk has rows available for `next_row()`.
116    ///
117    /// If the current chunk is exhausted but more rows exist on the server,
118    /// fetches the next chunk. Returns `true` if rows are available (call
119    /// `next_row()` next), `false` if all rows have been consumed.
120    ///
121    /// This is the complement to `next_row()`. Together they form
122    /// the primary iteration pattern:
123    ///
124    /// ```rust,ignore
125    /// while stream.advance()? {
126    ///     let row = stream.next_row().unwrap();
127    ///     let id: i32 = row.get_i32(0).unwrap();
128    ///     // decode before next advance() — row borrows from arena
129    /// }
130    /// ```
131    pub async fn advance(&mut self) -> Result<bool, crate::error::BsqlError> {
132        // Fast path: current chunk still has rows
133        if let Some(ref result) = self.current_result {
134            if self.position < result.len() {
135                return Ok(true);
136            }
137        }
138
139        // Current chunk exhausted
140        if self.finished {
141            return Ok(false);
142        }
143
144        // Fetch the next chunk
145        self.fetch_next_chunk().await?;
146
147        // Check if the new chunk has rows
148        if let Some(ref result) = self.current_result {
149            if self.position < result.len() {
150                return Ok(true);
151            }
152        }
153
154        Ok(false)
155    }
156
157    /// Whether more rows might be available (either in the current chunk or
158    /// from the server).
159    pub fn has_more(&self) -> bool {
160        if let Some(ref result) = self.current_result {
161            if self.position < result.len() {
162                return true;
163            }
164        }
165        !self.finished
166    }
167
168    /// Fetch the next chunk from the server.
169    ///
170    /// Returns `true` if a new chunk was fetched (call `next_row()` to iterate
171    /// it). Returns `false` if all rows have been consumed.
172    ///
173    /// The arena is reset before fetching the new chunk, invalidating any
174    /// previous `Row` references. The generated code always decodes rows into
175    /// owned fields before calling this.
176    pub async fn fetch_next_chunk(&mut self) -> Result<bool, crate::error::BsqlError> {
177        if self.finished {
178            return Ok(false);
179        }
180
181        let guard = self.guard.as_mut().ok_or_else(|| {
182            crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
183                "stream guard already taken".into(),
184            ))
185        })?;
186
187        let arena = self.arena.as_mut().ok_or_else(|| {
188            crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
189                "stream arena already taken".into(),
190            ))
191        })?;
192
193        // Reset arena for the new chunk
194        arena.reset();
195
196        // Send Execute+Sync if needed (2nd+ chunks)
197        if self.needs_execute {
198            guard
199                .streaming_send_execute(STREAM_CHUNK_SIZE)
200                .map_err(crate::error::BsqlError::from_driver_query)?;
201        }
202
203        let num_cols = self.columns.len();
204
205        // Reclaim the Vec from the previous chunk result to reuse its allocation.
206        let mut col_offsets = match self.current_result.as_mut() {
207            Some(result) => {
208                let mut v = result.take_col_offsets();
209                v.clear();
210                v
211            }
212            None => Vec::with_capacity(num_cols * STREAM_CHUNK_SIZE as usize),
213        };
214
215        let more = guard
216            .streaming_next_chunk(arena, &mut col_offsets)
217            .map_err(crate::error::BsqlError::from_driver_query)?;
218
219        if !more {
220            self.finished = true;
221        }
222        self.needs_execute = more; // if more rows, next call needs Execute+Sync
223
224        if col_offsets.is_empty() && !more {
225            self.current_result = None;
226            self.position = 0;
227            return Ok(false);
228        }
229
230        // Pass Arc::clone of columns. The Arc is shared across all chunks —
231        // this is a single refcount increment per chunk, not per row.
232        self.current_result = Some(QueryResult::from_parts(
233            col_offsets,
234            num_cols,
235            Arc::clone(&self.columns),
236            0,
237        ));
238        self.position = 0;
239
240        Ok(true)
241    }
242
243    /// Number of remaining rows in the current chunk.
244    pub fn remaining(&self) -> usize {
245        match self.current_result {
246            Some(ref result) => result.len().saturating_sub(self.position),
247            None => 0,
248        }
249    }
250
251    /// Column descriptors for the result set.
252    pub fn columns(&self) -> &[ColumnDesc] {
253        &self.columns
254    }
255}
256
257impl Drop for QueryStream {
258    fn drop(&mut self) {
259        if let Some(arena) = self.arena.take() {
260            release_arena(arena);
261        }
262        // If the stream was not fully consumed, the connection is in an
263        // indeterminate protocol state (portal open, no ReadyForQuery sent).
264        // We cannot send Close+Sync in Drop (requires I/O), so we
265        // mark the guard for discard to prevent it from being returned to
266        // the pool. The TCP disconnect causes PG to clean up the portal.
267        if !self.finished {
268            if let Some(mut guard) = self.guard.take() {
269                guard.mark_discard();
270                drop(guard);
271            }
272        }
273    }
274}
275
#[cfg(test)]
mod tests {
    // Structural tests for `QueryStream`. No real `PoolGuard` is available
    // here, so every stream has `guard: None`: anything that needs server I/O
    // is only exercised through its error/early-return paths.
    use super::*;
    use bsql_driver_postgres::arena::acquire_arena;
    use bsql_driver_postgres::{ColumnDesc, QueryResult};

    /// Build a QueryResult with `num_rows` rows and the given columns.
    /// Each cell is NULL (offset=0, len=-1) which is fine for structural tests.
    fn make_result(num_rows: usize, columns: &Arc<[ColumnDesc]>) -> QueryResult {
        let num_cols = columns.len();
        let col_offsets = vec![(0usize, -1i32); num_rows * num_cols];
        QueryResult::from_parts(col_offsets, num_cols, Arc::clone(columns), 0)
    }

    fn sample_columns(n: usize) -> Arc<[ColumnDesc]> {
        (0..n)
            .map(|i| ColumnDesc {
                name: format!("col{i}").into(),
                type_oid: 23,
                type_size: 4,
                table_oid: 0,
                column_id: 0,
            })
            .collect::<Vec<_>>()
            .into()
    }

    /// Build a QueryStream without a real PoolGuard.
    /// guard=None means fetch_next_chunk will fail, but structural methods work.
    fn make_stream(num_rows: usize, num_cols: usize, finished: bool) -> QueryStream {
        let columns = sample_columns(num_cols);
        let result = make_result(num_rows, &columns);
        let arena = acquire_arena();
        QueryStream {
            guard: None,
            arena: Some(arena),
            current_result: Some(result),
            position: 0,
            columns,
            finished,
            needs_execute: !finished,
        }
    }

    // --- next_row returns rows from buffer ---

    #[test]
    fn next_row_returns_rows() {
        let mut stream = make_stream(3, 2, true);
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_some());
    }

    #[test]
    fn next_row_returns_none_when_exhausted() {
        let mut stream = make_stream(2, 1, true);
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_none());
    }

    #[test]
    fn next_row_returns_none_for_empty_result() {
        let mut stream = make_stream(0, 1, true);
        assert!(stream.next_row().is_none());
    }

    #[test]
    fn next_row_without_arena_returns_none_and_keeps_position() {
        let columns = sample_columns(1);
        let result = make_result(2, &columns);
        let mut stream = QueryStream {
            guard: None,
            arena: None,
            current_result: Some(result),
            position: 0,
            columns,
            finished: true,
            needs_execute: false,
        };
        assert!(stream.next_row().is_none());
        assert_eq!(
            stream.remaining(),
            2,
            "a next_row that cannot produce a row must not consume one"
        );
    }

    #[test]
    fn position_past_end_is_treated_as_exhausted() {
        let mut stream = make_stream(2, 1, true);
        stream.position = 5;
        assert_eq!(stream.remaining(), 0);
        assert!(stream.next_row().is_none());
        assert!(!stream.has_more());
    }

    // --- has_more ---

    #[test]
    fn has_more_true_when_rows_in_buffer() {
        let stream = make_stream(2, 1, true);
        assert!(stream.has_more());
    }

    #[test]
    fn has_more_false_when_exhausted_and_finished() {
        let mut stream = make_stream(1, 1, true);
        let _ = stream.next_row();
        assert!(!stream.has_more());
    }

    #[test]
    fn has_more_true_when_exhausted_but_not_finished() {
        let mut stream = make_stream(1, 1, false);
        let _ = stream.next_row();
        // Buffer exhausted but server may have more
        assert!(stream.has_more());
    }

    // --- remaining ---

    #[test]
    fn remaining_full_buffer() {
        let stream = make_stream(5, 2, true);
        assert_eq!(stream.remaining(), 5);
    }

    #[test]
    fn remaining_after_consuming() {
        let mut stream = make_stream(3, 1, true);
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 2);
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 1);
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 0);
    }

    #[test]
    fn remaining_empty_result() {
        let stream = make_stream(0, 1, true);
        assert_eq!(stream.remaining(), 0);
    }

    // --- columns ---

    #[test]
    fn columns_returns_descriptors() {
        let stream = make_stream(1, 3, true);
        let cols = stream.columns();
        assert_eq!(cols.len(), 3);
        assert_eq!(&*cols[0].name, "col0");
        assert_eq!(&*cols[1].name, "col1");
        assert_eq!(&*cols[2].name, "col2");
    }

    // --- finished flag ---

    #[test]
    fn finished_stream_has_more_false_after_drain() {
        let mut stream = make_stream(1, 1, true);
        let _ = stream.next_row();
        assert!(!stream.has_more());
    }

    // --- fetch_next_chunk requires guard ---

    #[tokio::test]
    async fn fetch_next_chunk_without_guard_errors() {
        let mut stream = make_stream(0, 1, false);
        let result = stream.fetch_next_chunk().await;
        assert!(result.is_err(), "should error without guard");
    }

    #[tokio::test]
    async fn fetch_next_chunk_when_finished_returns_false() {
        let mut stream = make_stream(0, 1, true);
        let result = stream.fetch_next_chunk().await.unwrap();
        assert!(!result, "finished stream should return false");
    }

    #[tokio::test]
    async fn fetch_next_chunk_validation_error_keeps_buffered_rows() {
        // The guard check fails before the arena or current chunk is touched.
        let mut stream = make_stream(2, 1, false);
        assert!(stream.fetch_next_chunk().await.is_err());
        assert_eq!(stream.remaining(), 2);
        assert!(stream.next_row().is_some());
    }

    // --- advance ---

    #[tokio::test]
    async fn advance_returns_true_when_rows_available() {
        let mut stream = make_stream(2, 1, true);
        let has = stream.advance().await.unwrap();
        assert!(has);
    }

    #[tokio::test]
    async fn advance_returns_false_when_finished_and_exhausted() {
        let mut stream = make_stream(1, 1, true);
        let _ = stream.next_row(); // consume the one row
        let has = stream.advance().await.unwrap();
        assert!(!has);
    }

    #[tokio::test]
    async fn advance_returns_false_when_finished_without_result() {
        let columns = sample_columns(1);
        let mut stream = QueryStream {
            guard: None,
            arena: Some(acquire_arena()),
            current_result: None,
            position: 0,
            columns,
            finished: true,
            needs_execute: false,
        };
        assert!(!stream.advance().await.unwrap());
    }

    // --- Drop releases arena ---

    #[test]
    fn drop_releases_arena() {
        let stream = make_stream(3, 2, true);
        drop(stream);
        // If arena was released back to pool, acquire should succeed
        let arena = acquire_arena();
        bsql_driver_postgres::arena::release_arena(arena);
    }

    // --- fetch_next_chunk without arena errors ---

    #[tokio::test]
    async fn fetch_next_chunk_without_arena_errors() {
        let columns = sample_columns(1);
        let result = make_result(0, &columns);
        let mut stream = QueryStream {
            guard: None,
            arena: None, // no arena
            current_result: Some(result),
            position: 0,
            columns,
            finished: false,
            needs_execute: false,
        };
        let res = stream.fetch_next_chunk().await;
        assert!(res.is_err(), "should error without arena");
    }

    // --- advance when not finished but fetch fails ---

    #[tokio::test]
    async fn advance_fetch_fails_propagates_error() {
        // Stream with 0 rows, not finished, no guard -> advance triggers fetch -> error
        let mut stream = make_stream(0, 1, false);
        let res = stream.advance().await;
        assert!(res.is_err(), "advance should propagate fetch error");
    }

    // --- remaining on None result ---

    #[test]
    fn remaining_with_none_result() {
        let columns = sample_columns(1);
        let arena = acquire_arena();
        let stream = QueryStream {
            guard: None,
            arena: Some(arena),
            current_result: None,
            position: 0,
            columns,
            finished: true,
            needs_execute: false,
        };
        assert_eq!(stream.remaining(), 0);
    }

    // --- has_more with None result ---

    #[test]
    fn has_more_with_none_result_finished() {
        let columns = sample_columns(1);
        let arena = acquire_arena();
        let stream = QueryStream {
            guard: None,
            arena: Some(arena),
            current_result: None,
            position: 0,
            columns,
            finished: true,
            needs_execute: false,
        };
        assert!(!stream.has_more());
    }

    #[test]
    fn has_more_with_none_result_not_finished() {
        let columns = sample_columns(1);
        let stream = QueryStream {
            guard: None,
            arena: Some(acquire_arena()),
            current_result: None,
            position: 0,
            columns,
            finished: false,
            needs_execute: true,
        };
        assert!(stream.has_more(), "server may still have rows");
    }

    // --- columns returns correct count ---

    #[test]
    fn columns_zero_columns() {
        let stream = make_stream(0, 0, true);
        assert_eq!(stream.columns().len(), 0);
    }

    // --- Creation patterns ---

    #[test]
    fn make_stream_finished_true_needs_execute_false() {
        let stream = make_stream(3, 2, true);
        assert!(
            !stream.needs_execute,
            "finished stream should not need execute"
        );
        assert!(stream.finished);
    }

    #[test]
    fn make_stream_not_finished_needs_execute_true() {
        let stream = make_stream(3, 2, false);
        assert!(
            stream.needs_execute,
            "unfinished stream should need execute"
        );
        assert!(!stream.finished);
    }

    #[test]
    fn make_stream_zero_rows_zero_cols_remaining_zero() {
        let stream = make_stream(0, 0, true);
        assert_eq!(stream.remaining(), 0);
        assert!(!stream.has_more());
    }

    // --- remaining/has_more interaction after partial consumption ---

    #[test]
    fn remaining_and_has_more_consistency() {
        let mut stream = make_stream(3, 1, true);
        // Before consuming: remaining=3, has_more=true
        assert_eq!(stream.remaining(), 3);
        assert!(stream.has_more());
        // Consume 1: remaining=2, has_more=true
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 2);
        assert!(stream.has_more());
        // Consume all: remaining=0, has_more=false (finished=true)
        let _ = stream.next_row();
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 0);
        assert!(!stream.has_more());
    }

    #[test]
    fn remaining_and_has_more_when_not_finished() {
        let mut stream = make_stream(1, 1, false);
        // Before consuming: remaining=1, has_more=true
        assert_eq!(stream.remaining(), 1);
        assert!(stream.has_more());
        // After consuming last row: remaining=0, but has_more=true (server may have more)
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 0);
        assert!(
            stream.has_more(),
            "unfinished stream should report has_more even with empty buffer"
        );
    }

    // --- Drop behavior ---

    #[test]
    fn drop_finished_stream_does_not_panic() {
        let mut stream = make_stream(1, 1, true);
        let _ = stream.next_row();
        // Stream is finished and consumed, drop should be clean
        drop(stream);
    }

    #[test]
    fn drop_unfinished_stream_does_not_panic() {
        // Unfinished stream (guard=None so mark_discard is not called)
        let stream = make_stream(5, 2, false);
        drop(stream);
    }

    // --- advance with rows already available ---

    #[tokio::test]
    async fn advance_does_not_consume_row() {
        let mut stream = make_stream(2, 1, true);
        assert!(stream.advance().await.unwrap());
        // advance should not advance position, just check availability
        assert_eq!(stream.remaining(), 2);
        // next_row consumes
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 1);
    }
}
618        // next_row consumes
619        let _ = stream.next_row();
620        assert_eq!(stream.remaining(), 1);
621    }
622}