Skip to main content

nautilus_connector/
row_stream.rs

1//! Shared row stream type for all database backends.
2//!
3//! All three backends (PostgreSQL, MySQL, SQLite) use the single [`RowStream`] type
4//! rather than three separate structs. Per-backend type aliases are provided to keep
5//! the public API stable and code at call sites readable.
6
7use crate::error::Result;
8use crate::Row;
9use futures::future::BoxFuture;
10use futures::stream::Stream;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use tokio::sync::mpsc;
14
15/// A type-erased async stream of [`Row`] values.
16///
17/// Current connector implementations eagerly fetch database results and then
18/// expose those rows through this stream interface. The inner stream is
19/// heap-allocated and pinned, which is why implementing `Unpin` is safe here.
20pub struct RowStream<'conn> {
21    inner: Pin<Box<dyn Stream<Item = Result<Row>> + Send + 'conn>>,
22}
23
24impl<'conn> RowStream<'conn> {
25    /// Create a new `RowStream` wrapping a boxed async stream.
26    pub(crate) fn new_from_stream(
27        stream: Pin<Box<dyn Stream<Item = Result<Row>> + Send + 'conn>>,
28    ) -> Self {
29        Self { inner: stream }
30    }
31
32    /// Adapt a buffered rows future into the shared stream API.
33    pub(crate) fn from_rows_future(future: BoxFuture<'conn, Result<Vec<Row>>>) -> Self {
34        let stream = async_stream::stream! {
35            match future.await {
36                Ok(rows) => {
37                    for row in rows {
38                        yield Ok(row);
39                    }
40                }
41                Err(error) => yield Err(error),
42            }
43        };
44
45        Self::new_from_stream(Box::pin(stream))
46    }
47
48    /// Adapt an mpsc receiver into the shared stream API.
49    ///
50    /// Used by the streaming connector path: a background worker task owns the
51    /// database connection, drives the underlying sqlx stream, and forwards
52    /// rows through `rx`.
53    pub(crate) fn from_receiver(rx: mpsc::Receiver<Result<Row>>) -> Self {
54        let stream = async_stream::stream! {
55            let mut rx = rx;
56            while let Some(item) = rx.recv().await {
57                yield item;
58            }
59        };
60
61        Self::new_from_stream(Box::pin(stream))
62    }
63}
64
65/// Delegates `poll_next` to the inner boxed stream.
66impl Stream for RowStream<'_> {
67    type Item = Result<Row>;
68
69    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70        self.inner.as_mut().poll_next(cx)
71    }
72}
73
74/// `RowStream` is `Unpin` because the inner stream is heap-allocated and pinned.
75impl<'conn> Unpin for RowStream<'conn> {}