bsql_core/stream.rs
1//! Streaming query results.
2//!
3//! [`QueryStream`] wraps a `RowStream` from tokio-postgres alongside the
4//! `PoolConnection` that produced it, keeping the connection alive for the
5//! lifetime of the stream. When the stream is dropped, the connection returns
6//! to the pool.
7//!
8//! Users consume the stream via `futures_core::Stream` (or `tokio_stream`).
9//! The generated `fetch_stream` method wraps this with typed row decoding.
10
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14use futures_core::Stream;
15use tokio_postgres::RowStream;
16
17use crate::error::{BsqlError, BsqlResult};
18use crate::pool::PoolConnection;
19
20/// A stream of `tokio_postgres::Row` values that keeps its connection alive.
21///
22/// Created by [`Pool::query_stream`](crate::pool::Pool::query_stream).
23/// Implements `Stream<Item = BsqlResult<tokio_postgres::Row>>`.
24///
25/// The `PoolConnection` is held until the stream is fully consumed or dropped,
26/// at which point it returns to the pool.
27pub struct QueryStream {
28 /// Held to keep the connection alive while streaming. Drops after `stream`.
29 _conn: PoolConnection,
30 /// The underlying row stream. Boxed because `RowStream` is `!Unpin`.
31 stream: Pin<Box<RowStream>>,
32}
33
34impl QueryStream {
35 /// Create a new `QueryStream` from a pool connection and a raw row stream.
36 ///
37 /// The connection must be the same one that produced the `RowStream`.
38 pub(crate) fn new(conn: PoolConnection, stream: RowStream) -> Self {
39 Self {
40 _conn: conn,
41 stream: Box::pin(stream),
42 }
43 }
44}
45
46impl Stream for QueryStream {
47 type Item = BsqlResult<tokio_postgres::Row>;
48
49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 self.stream
51 .as_mut()
52 .poll_next(cx)
53 .map(|opt| opt.map(|r| r.map_err(BsqlError::from)))
54 }
55
56 fn size_hint(&self) -> (usize, Option<usize>) {
57 // RowStream does not know total count upfront
58 (0, None)
59 }
60}
61
#[cfg(test)]
mod tests {
    use super::QueryStream;

    /// Compile-time probe: can only be instantiated for `Send` types.
    fn require_send<T>()
    where
        T: Send,
    {
    }

    /// Fails to compile if `QueryStream` ever stops being `Send`.
    #[test]
    fn query_stream_is_send() {
        require_send::<QueryStream>();
    }
}