use {
crate::{stream::sealed::Sealed, ExtractOwned},
futures_core::Stream,
pin_project::pin_project,
std::{
pin::Pin,
task::{Context, Poll},
},
tokio_postgres::{Error, RowStream},
};
#[cfg(test)]
mod tests;
#[pin_project]
pub struct ExtractStream<T>
where
T: ExtractOwned,
{
#[pin]
pub stream: RowStream,
columns: Option<T::Columns>,
}
impl<T> Stream for ExtractStream<T>
where
T: ExtractOwned,
{
type Item = Result<T, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let slf = self.project();
slf.stream
.poll_next(cx)
.map_ok(|row| T::extract(slf.columns, &row))
}
}
pub struct ExtractStreamMut<'a, T>
where
T: ExtractOwned,
{
pub stream: Pin<&'a mut RowStream>,
columns: Option<T::Columns>,
}
impl<'a, T> Stream for ExtractStreamMut<'a, T>
where
T: ExtractOwned,
{
type Item = Result<T, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let slf = self.get_mut();
slf.stream
.as_mut()
.poll_next(cx)
.map_ok(|row| T::extract(&mut slf.columns, &row))
}
}
pub trait RowStreamExtractExt: Sealed {
fn extract<T: ExtractOwned>(self) -> ExtractStream<T>;
fn extract_mut<T: ExtractOwned>(self: Pin<&mut Self>) -> ExtractStreamMut<'_, T>;
}
impl RowStreamExtractExt for RowStream {
fn extract<T: ExtractOwned>(self) -> ExtractStream<T> {
ExtractStream {
stream: self,
columns: None,
}
}
fn extract_mut<T: ExtractOwned>(self: Pin<&mut Self>) -> ExtractStreamMut<'_, T> {
ExtractStreamMut {
stream: self,
columns: None,
}
}
}
impl Sealed for RowStream {}
mod sealed {
pub trait Sealed {}
}