tokio_postgres_extractor/stream.rs
1//! Extension traits for working with [`RowStream`]s.
2//!
3//! Using these extension traits, [`RowStream`]s can be turned into [`Stream`]s producing
4//! [`Extract`][crate::Extract]able types.
5//!
6//! # Examples
7//!
8//! ```
9//! # use futures_util::TryStreamExt;
10//! # use tokio_postgres::{Error, Row, RowStream};
11//! # use tokio_postgres_extractor::{Columns, Extract};
12//! # use tokio_postgres_extractor::stream::RowStreamExtractExt;
13//! #[derive(Columns, Extract)]
14//! struct User {
15//! id: i32,
16//! name: String,
17//! }
18//!
19//! async fn extract_users(i: RowStream) -> Result<Vec<User>, Error> {
20//! i.extract().try_collect().await
21//! }
22//! ```
23
24use {
25 crate::{stream::sealed::Sealed, ExtractOwned},
26 futures_core::Stream,
27 pin_project::pin_project,
28 std::{
29 pin::Pin,
30 task::{Context, Poll},
31 },
32 tokio_postgres::{Error, RowStream},
33};
34
35#[cfg(test)]
36mod tests;
37
38/// A [`Stream`] producing `T`s from a [`RowStream`].
39///
40/// Construct it using [`RowStreamExtractExt::extract`].
41///
42/// # Panics
43///
44/// The stream panics if [`Extract::extract`][crate::Extract::extract] panics.
45#[pin_project]
46pub struct ExtractStream<T>
47where
48 T: ExtractOwned,
49{
50 /// The underlying stream.
51 ///
52 /// This field is public for easier access.
53 #[pin]
54 pub stream: RowStream,
55 columns: Option<T::Columns>,
56}
57
58impl<T> Stream for ExtractStream<T>
59where
60 T: ExtractOwned,
61{
62 type Item = Result<T, Error>;
63
64 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
65 let slf = self.project();
66 slf.stream
67 .poll_next(cx)
68 .map_ok(|row| T::extract(slf.columns, &row))
69 }
70}
71
72/// A [`Stream`] producing `T`s from a [`Pin<&mut RowStream>`][RowStream].
73///
74/// Construct it using [`RowStreamExtractExt::extract_mut`].
75///
76/// # Panics
77///
78/// The stream panics if [`Extract::extract`][crate::Extract::extract] panics.
79pub struct ExtractStreamMut<'a, T>
80where
81 T: ExtractOwned,
82{
83 /// The underlying stream.
84 ///
85 /// This field is public for easier access.
86 pub stream: Pin<&'a mut RowStream>,
87 columns: Option<T::Columns>,
88}
89
90impl<'a, T> Stream for ExtractStreamMut<'a, T>
91where
92 T: ExtractOwned,
93{
94 type Item = Result<T, Error>;
95
96 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
97 let slf = self.get_mut();
98 slf.stream
99 .as_mut()
100 .poll_next(cx)
101 .map_ok(|row| T::extract(&mut slf.columns, &row))
102 }
103}
104
105/// Extension trait for extracting from a [`RowStream`].
106pub trait RowStreamExtractExt: Sealed {
107 /// Turns the [`RowStream`] into a [`Stream`] over `T`.
108 ///
109 /// # Examples
110 ///
111 /// ```
112 /// # use futures_util::TryStreamExt;
113 /// # use tokio_postgres::{Error, Row, RowStream};
114 /// # use tokio_postgres_extractor::{Columns, Extract};
115 /// # use tokio_postgres_extractor::stream::RowStreamExtractExt;
116 /// #[derive(Columns, Extract)]
117 /// struct User {
118 /// id: i32,
119 /// name: String,
120 /// }
121 ///
122 /// async fn extract_users(i: RowStream) -> Result<Vec<User>, Error> {
123 /// i.extract().try_collect().await
124 /// }
125 /// ```
126 fn extract<T: ExtractOwned>(self) -> ExtractStream<T>;
127
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// # use std::pin::Pin;
133 /// # use futures_util::TryStreamExt;
134 /// # use tokio_postgres::{Error, Row, RowStream};
135 /// # use tokio_postgres_extractor::{Columns, Extract};
136 /// # use tokio_postgres_extractor::stream::RowStreamExtractExt;
137 /// #[derive(Columns, Extract)]
138 /// struct User {
139 /// id: i32,
140 /// name: String,
141 /// }
142 ///
143 /// async fn extract_users(i: Pin<&mut RowStream>) -> Result<Vec<User>, Error> {
144 /// i.extract_mut().try_collect().await
145 /// }
146 /// ```
147 fn extract_mut<T: ExtractOwned>(self: Pin<&mut Self>) -> ExtractStreamMut<'_, T>;
148}
149
150impl RowStreamExtractExt for RowStream {
151 fn extract<T: ExtractOwned>(self) -> ExtractStream<T> {
152 ExtractStream {
153 stream: self,
154 columns: None,
155 }
156 }
157
158 fn extract_mut<T: ExtractOwned>(self: Pin<&mut Self>) -> ExtractStreamMut<'_, T> {
159 ExtractStreamMut {
160 stream: self,
161 columns: None,
162 }
163 }
164}
165
166impl Sealed for RowStream {}
167
168mod sealed {
169 pub trait Sealed {}
170}