use crate::error::Result;
use crate::Row;
use futures::future::BoxFuture;
use futures::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
pub struct RowStream<'conn> {
inner: Pin<Box<dyn Stream<Item = Result<Row>> + Send + 'conn>>,
}
impl<'conn> RowStream<'conn> {
pub(crate) fn new_from_stream(
stream: Pin<Box<dyn Stream<Item = Result<Row>> + Send + 'conn>>,
) -> Self {
Self { inner: stream }
}
pub(crate) fn from_rows_future(future: BoxFuture<'conn, Result<Vec<Row>>>) -> Self {
let stream = async_stream::stream! {
match future.await {
Ok(rows) => {
for row in rows {
yield Ok(row);
}
}
Err(error) => yield Err(error),
}
};
Self::new_from_stream(Box::pin(stream))
}
pub(crate) fn from_receiver(rx: mpsc::Receiver<Result<Row>>) -> Self {
let stream = async_stream::stream! {
let mut rx = rx;
while let Some(item) = rx.recv().await {
yield item;
}
};
Self::new_from_stream(Box::pin(stream))
}
}
impl Stream for RowStream<'_> {
type Item = Result<Row>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.inner.as_mut().poll_next(cx)
}
}
impl<'conn> Unpin for RowStream<'conn> {}