Skip to main content

bsql_core/
stream.rs

//! True PG-level streaming query results.
//!
//! [`QueryStream`] uses the extended query protocol's `Execute(max_rows=N)`
//! to fetch rows in chunks from PostgreSQL. Only one chunk is in memory at a
//! time — the arena is reset between chunks.
//!
//! The connection is held for the lifetime of the stream. If the stream is
//! dropped after all rows have been consumed, the connection returns to the
//! pool. If it is dropped mid-iteration, the connection is discarded instead
//! (not returned to the pool) because the portal may still be open on the
//! server.
11
12use std::sync::Arc;
13
14use bsql_driver_postgres::arena::release_arena;
15use bsql_driver_postgres::{Arena, ColumnDesc, QueryResult};
16
/// Maximum number of rows requested per `Execute` message while streaming.
///
/// Every round trip asks PostgreSQL for at most this many rows. A chunk is
/// parsed into the arena, decoded into owned values, and then the arena is
/// recycled, so this value bounds how many rows are buffered at once. 64 is a
/// compromise between the number of network round trips and per-chunk memory.
const STREAM_CHUNK_SIZE: i32 = 64;
23
24/// A stream of rows backed by true PG-level chunked fetching.
25///
26/// Created by [`Pool::query_stream`](crate::pool::Pool::query_stream).
27///
28/// The `PoolGuard` is held until the stream is fully consumed or dropped.
29/// Rows are fetched in chunks of 64 via `Execute(max_rows=64)`.
30///
31/// # Usage
32///
33/// Use [`advance()`](QueryStream::advance) + [`next_row()`](QueryStream::next_row)
34/// for row-by-row async iteration:
35///
36/// ```rust,ignore
37/// let mut stream = pool.query_stream(sql, hash, &[]).await?;
38/// while stream.advance().await? {
39///     let row = stream.next_row().unwrap();
40///     let id: i32 = row.get_i32(0).unwrap();
41///     // decode before next advance() — row borrows from arena
42/// }
43/// ```
44pub struct QueryStream {
45    /// Held to keep the connection alive while streaming.
46    guard: Option<bsql_driver_postgres::PoolGuard>,
47    arena: Option<Arena>,
48    /// Current chunk's row metadata.
49    current_result: Option<QueryResult>,
50    /// Position within the current chunk.
51    position: usize,
52    /// Column descriptors (shared across all chunks via Arc).
53    /// Passed by reference to `QueryResult::from_parts` to avoid Arc
54    /// refcount increments per chunk.
55    columns: Arc<[ColumnDesc]>,
56    /// Whether all rows have been consumed from the server.
57    finished: bool,
58    /// Whether we need to send Execute+Sync before reading the next chunk.
59    /// True after the first chunk (since query_streaming_start already sent
60    /// the first Execute).
61    needs_execute: bool,
62}
63
64impl QueryStream {
65    /// Create a new `QueryStream`.
66    ///
67    /// `first_result` is the first chunk of rows (from the initial Execute).
68    /// `finished` is true if the first chunk was the only chunk (CommandComplete
69    /// received).
70    pub(crate) fn new(
71        guard: bsql_driver_postgres::PoolGuard,
72        arena: Arena,
73        first_result: QueryResult,
74        columns: Arc<[ColumnDesc]>,
75        finished: bool,
76    ) -> Self {
77        Self {
78            guard: Some(guard),
79            arena: Some(arena),
80            current_result: Some(first_result),
81            position: 0,
82            columns,
83            finished,
84            needs_execute: !finished, // if not finished, next call needs Execute+Sync
85        }
86    }
87
88    /// Get the next row from the current in-memory chunk.
89    ///
90    /// Returns `None` when the current chunk is exhausted. Call
91    /// [`fetch_next_chunk()`](QueryStream::fetch_next_chunk) to load more rows
92    /// from the server, or use [`try_next()`](QueryStream::try_next) which
93    /// handles chunk management automatically.
94    ///
95    /// Rows borrow from the arena, which is reset between chunks. Each row
96    /// must be fully decoded (into owned types) before calling `next_row()`
97    /// again. The generated code already does this — it decodes into owned
98    /// struct fields.
99    pub fn next_row(&mut self) -> Option<bsql_driver_postgres::Row<'_>> {
100        // Check if current chunk has more rows
101        if let Some(ref result) = self.current_result {
102            if self.position < result.len() {
103                let arena = self.arena.as_ref()?;
104                let row = result.row(self.position, arena);
105                self.position += 1;
106                return Some(row);
107            }
108        }
109
110        // Current chunk exhausted — cannot fetch more synchronously.
111        // The async fetch is done via `fetch_next_chunk()`.
112        None
113    }
114
115    /// Ensure the current chunk has rows available for `next_row()`.
116    ///
117    /// If the current chunk is exhausted but more rows exist on the server,
118    /// fetches the next chunk. Returns `true` if rows are available (call
119    /// `next_row()` next), `false` if all rows have been consumed.
120    ///
121    /// This is the async complement to `next_row()`. Together they form
122    /// the primary iteration pattern:
123    ///
124    /// ```rust,ignore
125    /// while stream.advance().await? {
126    ///     let row = stream.next_row().unwrap();
127    ///     let id: i32 = row.get_i32(0).unwrap();
128    ///     // decode before next advance() — row borrows from arena
129    /// }
130    /// ```
131    pub async fn advance(&mut self) -> Result<bool, crate::error::BsqlError> {
132        // Fast path: current chunk still has rows
133        if let Some(ref result) = self.current_result {
134            if self.position < result.len() {
135                return Ok(true);
136            }
137        }
138
139        // Current chunk exhausted
140        if self.finished {
141            return Ok(false);
142        }
143
144        // Fetch the next chunk
145        self.fetch_next_chunk().await?;
146
147        // Check if the new chunk has rows
148        if let Some(ref result) = self.current_result {
149            if self.position < result.len() {
150                return Ok(true);
151            }
152        }
153
154        Ok(false)
155    }
156
157    /// Whether more rows might be available (either in the current chunk or
158    /// from the server).
159    pub fn has_more(&self) -> bool {
160        if let Some(ref result) = self.current_result {
161            if self.position < result.len() {
162                return true;
163            }
164        }
165        !self.finished
166    }
167
168    /// Fetch the next chunk from the server asynchronously.
169    ///
170    /// Returns `true` if a new chunk was fetched (call `next_row()` to iterate
171    /// it). Returns `false` if all rows have been consumed.
172    ///
173    /// The arena is reset before fetching the new chunk, invalidating any
174    /// previous `Row` references. The generated code always decodes rows into
175    /// owned fields before calling this.
176    pub async fn fetch_next_chunk(&mut self) -> Result<bool, crate::error::BsqlError> {
177        if self.finished {
178            return Ok(false);
179        }
180
181        let guard = self.guard.as_mut().ok_or_else(|| {
182            crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
183                "stream guard already taken".into(),
184            ))
185        })?;
186
187        let arena = self.arena.as_mut().ok_or_else(|| {
188            crate::error::BsqlError::from(bsql_driver_postgres::DriverError::Pool(
189                "stream arena already taken".into(),
190            ))
191        })?;
192
193        // Reset arena for the new chunk
194        arena.reset();
195
196        // Send Execute+Sync if needed (2nd+ chunks)
197        if self.needs_execute {
198            guard
199                .streaming_send_execute(STREAM_CHUNK_SIZE)
200                .await
201                .map_err(crate::error::BsqlError::from_driver_query)?;
202        }
203
204        let num_cols = self.columns.len();
205
206        // Reclaim the Vec from the previous chunk result to reuse its allocation.
207        let mut col_offsets = match self.current_result.as_mut() {
208            Some(result) => {
209                let mut v = result.take_col_offsets();
210                v.clear();
211                v
212            }
213            None => Vec::with_capacity(num_cols * STREAM_CHUNK_SIZE as usize),
214        };
215
216        let more = guard
217            .streaming_next_chunk(arena, &mut col_offsets)
218            .await
219            .map_err(crate::error::BsqlError::from_driver_query)?;
220
221        if !more {
222            self.finished = true;
223        }
224        self.needs_execute = more; // if more rows, next call needs Execute+Sync
225
226        if col_offsets.is_empty() && !more {
227            self.current_result = None;
228            self.position = 0;
229            return Ok(false);
230        }
231
232        // Pass Arc::clone of columns. The Arc is shared across all chunks —
233        // this is a single refcount increment per chunk, not per row.
234        self.current_result = Some(QueryResult::from_parts(
235            col_offsets,
236            num_cols,
237            Arc::clone(&self.columns),
238            0,
239        ));
240        self.position = 0;
241
242        Ok(true)
243    }
244
245    /// Number of remaining rows in the current chunk.
246    pub fn remaining(&self) -> usize {
247        match self.current_result {
248            Some(ref result) => result.len().saturating_sub(self.position),
249            None => 0,
250        }
251    }
252
253    /// Column descriptors for the result set.
254    pub fn columns(&self) -> &[ColumnDesc] {
255        &self.columns
256    }
257}
258
259impl Drop for QueryStream {
260    fn drop(&mut self) {
261        if let Some(arena) = self.arena.take() {
262            release_arena(arena);
263        }
264        // If the stream was not fully consumed, the connection is in an
265        // indeterminate protocol state (portal open, no ReadyForQuery sent).
266        // We cannot send Close+Sync in Drop (requires async I/O), so we
267        // mark the guard for discard to prevent it from being returned to
268        // the pool. The TCP disconnect causes PG to clean up the portal.
269        if !self.finished {
270            if let Some(mut guard) = self.guard.take() {
271                guard.mark_discard();
272                drop(guard);
273            }
274        }
275    }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use bsql_driver_postgres::arena::acquire_arena;
    use bsql_driver_postgres::{ColumnDesc, QueryResult};

    /// Build a QueryResult with `num_rows` rows and the given columns.
    /// Each cell is NULL (offset=0, len=-1) which is fine for structural tests.
    fn make_result(num_rows: usize, columns: &Arc<[ColumnDesc]>) -> QueryResult {
        let num_cols = columns.len();
        let col_offsets = vec![(0usize, -1i32); num_rows * num_cols];
        QueryResult::from_parts(col_offsets, num_cols, Arc::clone(columns), 0)
    }

    /// Build `n` int4 column descriptors named `col0..col{n-1}`.
    fn sample_columns(n: usize) -> Arc<[ColumnDesc]> {
        (0..n)
            .map(|i| ColumnDesc {
                name: format!("col{i}").into(),
                type_oid: 23,
                type_size: 4,
                table_oid: 0,
                column_id: 0,
            })
            .collect::<Vec<_>>()
            .into()
    }

    /// Build a QueryStream without a real PoolGuard.
    /// guard=None means fetch_next_chunk will fail, but structural methods work.
    fn make_stream(num_rows: usize, num_cols: usize, finished: bool) -> QueryStream {
        let columns = sample_columns(num_cols);
        let result = make_result(num_rows, &columns);
        let arena = acquire_arena();
        QueryStream {
            guard: None,
            arena: Some(arena),
            current_result: Some(result),
            position: 0,
            columns,
            finished,
            needs_execute: !finished,
        }
    }

    // --- next_row returns rows from buffer ---

    #[test]
    fn next_row_returns_rows() {
        let mut stream = make_stream(3, 2, true);
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_some());
    }

    #[test]
    fn next_row_returns_none_when_exhausted() {
        let mut stream = make_stream(2, 1, true);
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_none());
    }

    #[test]
    fn next_row_returns_none_for_empty_result() {
        let mut stream = make_stream(0, 1, true);
        assert!(stream.next_row().is_none());
    }

    #[test]
    fn next_row_stays_none_after_exhaustion() {
        // Repeated calls past the end must not advance the position.
        let mut stream = make_stream(1, 1, true);
        assert!(stream.next_row().is_some());
        assert!(stream.next_row().is_none());
        assert!(stream.next_row().is_none());
        assert_eq!(stream.remaining(), 0);
    }

    // --- has_more ---

    #[test]
    fn has_more_true_when_rows_in_buffer() {
        let stream = make_stream(2, 1, true);
        assert!(stream.has_more());
    }

    #[test]
    fn has_more_false_when_exhausted_and_finished() {
        let mut stream = make_stream(1, 1, true);
        let _ = stream.next_row();
        assert!(!stream.has_more());
    }

    #[test]
    fn has_more_true_when_exhausted_but_not_finished() {
        let mut stream = make_stream(1, 1, false);
        let _ = stream.next_row();
        // Buffer exhausted but server may have more
        assert!(stream.has_more());
    }

    // --- remaining ---

    #[test]
    fn remaining_full_buffer() {
        let stream = make_stream(5, 2, true);
        assert_eq!(stream.remaining(), 5);
    }

    #[test]
    fn remaining_after_consuming() {
        let mut stream = make_stream(3, 1, true);
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 2);
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 1);
        let _ = stream.next_row();
        assert_eq!(stream.remaining(), 0);
    }

    #[test]
    fn remaining_empty_result() {
        let stream = make_stream(0, 1, true);
        assert_eq!(stream.remaining(), 0);
    }

    // --- columns ---

    #[test]
    fn columns_returns_descriptors() {
        let stream = make_stream(1, 3, true);
        let cols = stream.columns();
        assert_eq!(cols.len(), 3);
        assert_eq!(&*cols[0].name, "col0");
        assert_eq!(&*cols[1].name, "col1");
        assert_eq!(&*cols[2].name, "col2");
    }

    // --- finished flag ---

    #[test]
    fn unfinished_stream_has_more_even_with_empty_buffer() {
        // No rows buffered, but the server has not signalled completion.
        let stream = make_stream(0, 1, false);
        assert_eq!(stream.remaining(), 0);
        assert!(stream.has_more());
    }

    // --- fetch_next_chunk requires guard ---

    #[tokio::test]
    async fn fetch_next_chunk_without_guard_errors() {
        let mut stream = make_stream(0, 1, false);
        let result = stream.fetch_next_chunk().await;
        assert!(result.is_err(), "should error without guard");
    }

    #[tokio::test]
    async fn fetch_next_chunk_when_finished_returns_false() {
        let mut stream = make_stream(0, 1, true);
        let result = stream.fetch_next_chunk().await.unwrap();
        assert!(!result, "finished stream should return false");
    }

    // --- advance ---

    #[tokio::test]
    async fn advance_returns_true_when_rows_available() {
        let mut stream = make_stream(2, 1, true);
        let has = stream.advance().await.unwrap();
        assert!(has);
    }

    #[tokio::test]
    async fn advance_returns_false_when_finished_and_exhausted() {
        let mut stream = make_stream(1, 1, true);
        let _ = stream.next_row(); // consume the one row
        let has = stream.advance().await.unwrap();
        assert!(!has);
    }

    #[tokio::test]
    async fn advance_does_not_consume_rows() {
        let mut stream = make_stream(2, 1, true);
        assert!(stream.advance().await.unwrap());
        assert!(stream.advance().await.unwrap());
        assert_eq!(stream.remaining(), 2);
    }

    #[tokio::test]
    async fn advance_and_next_row_yield_every_buffered_row() {
        let mut stream = make_stream(3, 1, true);
        let mut seen = 0;
        while stream.advance().await.unwrap() {
            assert!(stream.next_row().is_some());
            seen += 1;
        }
        assert_eq!(seen, 3);
        assert!(!stream.has_more());
    }

    // --- Drop releases arena ---

    #[test]
    fn drop_releases_arena() {
        // Smoke test: dropping a finished stream hands its arena back without
        // panicking, and the arena pool remains usable afterwards.
        let stream = make_stream(3, 2, true);
        drop(stream);
        let arena = acquire_arena();
        bsql_driver_postgres::arena::release_arena(arena);
    }

    #[test]
    fn drop_unfinished_stream_without_guard_does_not_panic() {
        // Exercises the `!finished` drop path when no guard is held.
        let stream = make_stream(2, 1, false);
        drop(stream);
    }

    // --- fetch_next_chunk without arena errors ---

    #[tokio::test]
    async fn fetch_next_chunk_without_arena_errors() {
        let columns = sample_columns(1);
        let result = make_result(0, &columns);
        let mut stream = QueryStream {
            guard: None,
            arena: None, // no arena
            current_result: Some(result),
            position: 0,
            columns,
            finished: false,
            needs_execute: false,
        };
        let res = stream.fetch_next_chunk().await;
        assert!(res.is_err(), "should error without arena");
    }

    // --- advance when not finished but fetch fails ---

    #[tokio::test]
    async fn advance_fetch_fails_propagates_error() {
        // Stream with 0 rows, not finished, no guard -> advance triggers fetch -> error
        let mut stream = make_stream(0, 1, false);
        let res = stream.advance().await;
        assert!(res.is_err(), "advance should propagate fetch error");
    }

    // --- remaining on None result ---

    #[test]
    fn remaining_with_none_result() {
        let columns = sample_columns(1);
        let arena = acquire_arena();
        let stream = QueryStream {
            guard: None,
            arena: Some(arena),
            current_result: None,
            position: 0,
            columns,
            finished: true,
            needs_execute: false,
        };
        assert_eq!(stream.remaining(), 0);
    }

    // --- has_more with None result and finished ---

    #[test]
    fn has_more_with_none_result_finished() {
        let columns = sample_columns(1);
        let arena = acquire_arena();
        let stream = QueryStream {
            guard: None,
            arena: Some(arena),
            current_result: None,
            position: 0,
            columns,
            finished: true,
            needs_execute: false,
        };
        assert!(!stream.has_more());
    }

    // --- columns returns correct count ---

    #[test]
    fn columns_zero_columns() {
        let stream = make_stream(0, 0, true);
        assert_eq!(stream.columns().len(), 0);
    }
}