tokio_postgres_extractor/
stream.rs

1//! Extension traits for working with [`RowStream`]s.
2//!
3//! Using these extension traits, [`RowStream`]s can be turned into [`Stream`]s producing
4//! [`Extract`][crate::Extract]able types.
5//!
6//! # Examples
7//!
8//! ```
9//! # use futures_util::TryStreamExt;
10//! # use tokio_postgres::{Error, Row, RowStream};
11//! # use tokio_postgres_extractor::{Columns, Extract};
12//! # use tokio_postgres_extractor::stream::RowStreamExtractExt;
13//! #[derive(Columns, Extract)]
14//! struct User {
15//!     id: i32,
16//!     name: String,
17//! }
18//!
19//! async fn extract_users(i: RowStream) -> Result<Vec<User>, Error> {
20//!     i.extract().try_collect().await
21//! }
22//! ```
23
24use {
25    crate::{stream::sealed::Sealed, ExtractOwned},
26    futures_core::Stream,
27    pin_project::pin_project,
28    std::{
29        pin::Pin,
30        task::{Context, Poll},
31    },
32    tokio_postgres::{Error, RowStream},
33};
34
35#[cfg(test)]
36mod tests;
37
38/// A [`Stream`] producing `T`s from a [`RowStream`].
39///
40/// Construct it using [`RowStreamExtractExt::extract`].
41///
42/// # Panics
43///
44/// The stream panics if [`Extract::extract`][crate::Extract::extract] panics.
45#[pin_project]
46pub struct ExtractStream<T>
47where
48    T: ExtractOwned,
49{
50    /// The underlying stream.
51    ///
52    /// This field is public for easier access.
53    #[pin]
54    pub stream: RowStream,
55    columns: Option<T::Columns>,
56}
57
58impl<T> Stream for ExtractStream<T>
59where
60    T: ExtractOwned,
61{
62    type Item = Result<T, Error>;
63
64    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65        let slf = self.project();
66        slf.stream
67            .poll_next(cx)
68            .map_ok(|row| T::extract(slf.columns, &row))
69    }
70}
71
72/// A [`Stream`] producing `T`s from a [`Pin<&mut RowStream>`][RowStream].
73///
74/// Construct it using [`RowStreamExtractExt::extract_mut`].
75///
76/// # Panics
77///
78/// The stream panics if [`Extract::extract`][crate::Extract::extract] panics.
79pub struct ExtractStreamMut<'a, T>
80where
81    T: ExtractOwned,
82{
83    /// The underlying stream.
84    ///
85    /// This field is public for easier access.
86    pub stream: Pin<&'a mut RowStream>,
87    columns: Option<T::Columns>,
88}
89
90impl<'a, T> Stream for ExtractStreamMut<'a, T>
91where
92    T: ExtractOwned,
93{
94    type Item = Result<T, Error>;
95
96    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
97        let slf = self.get_mut();
98        slf.stream
99            .as_mut()
100            .poll_next(cx)
101            .map_ok(|row| T::extract(&mut slf.columns, &row))
102    }
103}
104
105/// Extension trait for extracting from a [`RowStream`].
106pub trait RowStreamExtractExt: Sealed {
107    /// Turns the [`RowStream`] into a [`Stream`] over `T`.
108    ///
109    /// # Examples
110    ///
111    /// ```
112    /// # use futures_util::TryStreamExt;
113    /// # use tokio_postgres::{Error, Row, RowStream};
114    /// # use tokio_postgres_extractor::{Columns, Extract};
115    /// # use tokio_postgres_extractor::stream::RowStreamExtractExt;
116    /// #[derive(Columns, Extract)]
117    /// struct User {
118    ///     id: i32,
119    ///     name: String,
120    /// }
121    ///
122    /// async fn extract_users(i: RowStream) -> Result<Vec<User>, Error> {
123    ///     i.extract().try_collect().await
124    /// }
125    /// ```
126    fn extract<T: ExtractOwned>(self) -> ExtractStream<T>;
127
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// # use std::pin::Pin;
133    /// # use futures_util::TryStreamExt;
134    /// # use tokio_postgres::{Error, Row, RowStream};
135    /// # use tokio_postgres_extractor::{Columns, Extract};
136    /// # use tokio_postgres_extractor::stream::RowStreamExtractExt;
137    /// #[derive(Columns, Extract)]
138    /// struct User {
139    ///     id: i32,
140    ///     name: String,
141    /// }
142    ///
143    /// async fn extract_users(i: Pin<&mut RowStream>) -> Result<Vec<User>, Error> {
144    ///     i.extract_mut().try_collect().await
145    /// }
146    /// ```
147    fn extract_mut<T: ExtractOwned>(self: Pin<&mut Self>) -> ExtractStreamMut<'_, T>;
148}
149
150impl RowStreamExtractExt for RowStream {
151    fn extract<T: ExtractOwned>(self) -> ExtractStream<T> {
152        ExtractStream {
153            stream: self,
154            columns: None,
155        }
156    }
157
158    fn extract_mut<T: ExtractOwned>(self: Pin<&mut Self>) -> ExtractStreamMut<'_, T> {
159        ExtractStreamMut {
160            stream: self,
161            columns: None,
162        }
163    }
164}
165
166impl Sealed for RowStream {}
167
168mod sealed {
169    pub trait Sealed {}
170}