Skip to main content

bsql_core/
stream.rs

1//! Streaming query results.
2//!
3//! [`QueryStream`] wraps a `RowStream` from tokio-postgres alongside the
4//! `PoolConnection` that produced it, keeping the connection alive for the
5//! lifetime of the stream. When the stream is dropped, the connection returns
6//! to the pool.
7//!
8//! Users consume the stream via `futures_core::Stream` (or `tokio_stream`).
9//! The generated `fetch_stream` method wraps this with typed row decoding.
10
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use futures_core::Stream;
15use tokio_postgres::RowStream;
16
17use crate::error::{BsqlError, BsqlResult};
18use crate::pool::PoolConnection;
19
20/// A stream of `tokio_postgres::Row` values that keeps its connection alive.
21///
22/// Created by [`Pool::query_stream`](crate::pool::Pool::query_stream).
23/// Implements `Stream<Item = BsqlResult<tokio_postgres::Row>>`.
24///
25/// The `PoolConnection` is held until the stream is fully consumed or dropped,
26/// at which point it returns to the pool.
27pub struct QueryStream {
28    /// Held to keep the connection alive while streaming. Drops after `stream`.
29    _conn: PoolConnection,
30    /// The underlying row stream. Boxed because `RowStream` is `!Unpin`.
31    stream: Pin<Box<RowStream>>,
32}
33
34impl QueryStream {
35    /// Create a new `QueryStream` from a pool connection and a raw row stream.
36    ///
37    /// The connection must be the same one that produced the `RowStream`.
38    pub(crate) fn new(conn: PoolConnection, stream: RowStream) -> Self {
39        Self {
40            _conn: conn,
41            stream: Box::pin(stream),
42        }
43    }
44}
45
46impl Stream for QueryStream {
47    type Item = BsqlResult<tokio_postgres::Row>;
48
49    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        self.stream
51            .as_mut()
52            .poll_next(cx)
53            .map(|opt| opt.map(|r| r.map_err(BsqlError::from)))
54    }
55
56    fn size_hint(&self) -> (usize, Option<usize>) {
57        // RowStream does not know total count upfront
58        (0, None)
59    }
60}
61
62#[cfg(test)]
63mod tests {
64    use super::*;
65
66    fn _assert_send<T: Send>() {}
67
68    #[test]
69    fn query_stream_is_send() {
70        _assert_send::<QueryStream>();
71    }
72}