nautilus_connector/row_stream.rs
1//! Shared row stream type for all database backends.
2//!
3//! All three backends (PostgreSQL, MySQL, SQLite) use the single [`RowStream`] type
4//! rather than three separate structs. Per-backend type aliases are provided to keep
5//! the public API stable and code at call sites readable.
6
7use crate::error::Result;
8use crate::Row;
9use futures::future::BoxFuture;
10use futures::stream::Stream;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use tokio::sync::mpsc;
14
15/// A type-erased async stream of [`Row`] values.
16///
17/// Current connector implementations eagerly fetch database results and then
18/// expose those rows through this stream interface. The inner stream is
19/// heap-allocated and pinned, which is why implementing `Unpin` is safe here.
20pub struct RowStream<'conn> {
21 inner: Pin<Box<dyn Stream<Item = Result<Row>> + Send + 'conn>>,
22}
23
24impl<'conn> RowStream<'conn> {
25 /// Create a new `RowStream` wrapping a boxed async stream.
26 pub(crate) fn new_from_stream(
27 stream: Pin<Box<dyn Stream<Item = Result<Row>> + Send + 'conn>>,
28 ) -> Self {
29 Self { inner: stream }
30 }
31
32 /// Adapt a buffered rows future into the shared stream API.
33 pub(crate) fn from_rows_future(future: BoxFuture<'conn, Result<Vec<Row>>>) -> Self {
34 let stream = async_stream::stream! {
35 match future.await {
36 Ok(rows) => {
37 for row in rows {
38 yield Ok(row);
39 }
40 }
41 Err(error) => yield Err(error),
42 }
43 };
44
45 Self::new_from_stream(Box::pin(stream))
46 }
47
48 /// Adapt an mpsc receiver into the shared stream API.
49 ///
50 /// Used by the streaming connector path: a background worker task owns the
51 /// database connection, drives the underlying sqlx stream, and forwards
52 /// rows through `rx`.
53 pub(crate) fn from_receiver(rx: mpsc::Receiver<Result<Row>>) -> Self {
54 let stream = async_stream::stream! {
55 let mut rx = rx;
56 while let Some(item) = rx.recv().await {
57 yield item;
58 }
59 };
60
61 Self::new_from_stream(Box::pin(stream))
62 }
63}
64
65/// Delegates `poll_next` to the inner boxed stream.
66impl Stream for RowStream<'_> {
67 type Item = Result<Row>;
68
69 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
70 self.inner.as_mut().poll_next(cx)
71 }
72}
73
74/// `RowStream` is `Unpin` because the inner stream is heap-allocated and pinned.
75impl<'conn> Unpin for RowStream<'conn> {}