xitca_postgres/query/
stream.rs

1use core::{
2    future::Future,
3    marker::PhantomData,
4    ops::Range,
5    pin::Pin,
6    task::{Context, Poll, ready},
7};
8
9use std::sync::Arc;
10
11use fallible_iterator::FallibleIterator;
12use postgres_protocol::message::backend;
13
14use crate::{
15    column::Column,
16    driver::codec::Response,
17    error::Error,
18    iter::AsyncLendingIterator,
19    prepare::Prepare,
20    row::{Row, RowOwned, RowSimple, RowSimpleOwned, marker},
21    types::Type,
22};
23
24#[derive(Debug)]
25pub struct GenericRowStream<C, M> {
26    pub(crate) res: Response,
27    pub(crate) ranges: Vec<Range<usize>>,
28    pub(crate) col: C,
29    pub(crate) _marker: PhantomData<M>,
30}
31
32impl<C, M> GenericRowStream<C, M>
33where
34    C: AsRef<[Column]>,
35{
36    pub(crate) fn new(res: Response, col: C) -> Self {
37        Self {
38            res,
39            ranges: Vec::with_capacity(col.as_ref().len()),
40            col,
41            _marker: PhantomData,
42        }
43    }
44}
45
46/// A stream of table rows.
47pub type RowStream<'a> = GenericRowStream<&'a [Column], marker::Typed>;
48
49impl AsyncLendingIterator for RowStream<'_> {
50    type Ok<'i>
51        = Row<'i>
52    where
53        Self: 'i;
54    type Err = Error;
55
56    #[inline]
57    fn try_next(&mut self) -> impl Future<Output = Result<Option<Self::Ok<'_>>, Self::Err>> + Send {
58        try_next(&mut self.res, self.col, &mut self.ranges)
59    }
60}
61
62async fn try_next<'r>(
63    res: &mut Response,
64    col: &'r [Column],
65    ranges: &'r mut Vec<Range<usize>>,
66) -> Result<Option<Row<'r>>, Error> {
67    loop {
68        match res.recv().await? {
69            backend::Message::DataRow(body) => return Row::try_new(col, body, ranges).map(Some),
70            backend::Message::BindComplete
71            | backend::Message::EmptyQueryResponse
72            | backend::Message::CommandComplete(_)
73            | backend::Message::PortalSuspended => {}
74            backend::Message::ReadyForQuery(_) => return Ok(None),
75            _ => return Err(Error::unexpected()),
76        }
77    }
78}
79
80/// [`RowStream`] with static lifetime
81///
82/// # Usage
83/// due to Rust's GAT limitation [`AsyncLendingIterator`] only works well type that have static lifetime.
84/// actively converting a [`RowStream`] to [`RowStreamOwned`] will opens up convenient high level APIs at some additional
85/// cost (More memory allocation)
86///
87/// # Examples
88/// ```
89/// # use xitca_postgres::{iter::{AsyncLendingIterator, AsyncLendingIteratorExt}, Client, Error, Execute, RowStreamOwned, Statement};
90/// # async fn collect(cli: Client) -> Result<(), Error> {
91/// // prepare statement and query for some users from database.
92/// let stmt = Statement::named("SELECT * FROM users", &[]).execute(&cli).await?;
93/// let mut stream = stmt.query(&cli).await?;
94///
95/// // assuming users contain name column where it can be parsed to string.
96/// // then collecting all user name to a collection
97/// let mut strings = Vec::new();
98/// while let Some(row) = stream.try_next().await? {
99///     let name = row.get::<String>("name");
100///     strings.push(name);
101/// }
102///
103/// // the same operation with owned row stream can be simplified a bit:
104/// let stream = stmt.query(&cli).await?;
105/// // use extended api on top of AsyncIterator to collect user names to collection
106/// let strings_2: Vec<String> = RowStreamOwned::from(stream).map_ok(|row| row.get("name")).try_collect().await?;
107/// assert_eq!(strings, strings_2);
108///
109/// // there is also an owned version of statement query that can produce owned row stream directly with zero copy.
110/// // it's slightly cheaper than the from conversion showed above.
111/// let strings_3: Vec<String> = stmt.bind::<[i8; 0]>([]).into_owned().query(&cli).await?.map_ok(|row| row.get("name")).try_collect().await?;
112/// assert_eq!(strings, strings_3);
113/// # Ok(())
114/// # }
115/// ```
116pub type RowStreamOwned = GenericRowStream<Arc<[Column]>, marker::Typed>;
117
118impl From<RowStream<'_>> for RowStreamOwned {
119    fn from(stream: RowStream<'_>) -> Self {
120        Self {
121            res: stream.res,
122            col: Arc::from(stream.col),
123            ranges: stream.ranges,
124            _marker: PhantomData,
125        }
126    }
127}
128
129impl AsyncLendingIterator for RowStreamOwned {
130    type Ok<'i>
131        = Row<'i>
132    where
133        Self: 'i;
134    type Err = Error;
135
136    #[inline]
137    fn try_next(&mut self) -> impl Future<Output = Result<Option<Self::Ok<'_>>, Self::Err>> + Send {
138        try_next(&mut self.res, &self.col, &mut self.ranges)
139    }
140}
141
142impl IntoIterator for RowStream<'_> {
143    type Item = Result<RowOwned, Error>;
144    type IntoIter = RowStreamOwned;
145
146    fn into_iter(self) -> Self::IntoIter {
147        RowStreamOwned::from(self)
148    }
149}
150
151impl Iterator for RowStreamOwned {
152    type Item = Result<RowOwned, Error>;
153
154    fn next(&mut self) -> Option<Self::Item> {
155        loop {
156            match self.res.blocking_recv() {
157                Ok(msg) => match msg {
158                    backend::Message::DataRow(body) => {
159                        return Some(RowOwned::try_new_owned(&self.col, body));
160                    }
161                    backend::Message::BindComplete
162                    | backend::Message::EmptyQueryResponse
163                    | backend::Message::CommandComplete(_)
164                    | backend::Message::PortalSuspended => {}
165                    backend::Message::ReadyForQuery(_) => return None,
166                    _ => return Some(Err(Error::unexpected())),
167                },
168                Err(e) => return Some(Err(e)),
169            }
170        }
171    }
172}
173
174/// A stream of simple query results.
175pub type RowSimpleStream = GenericRowStream<Vec<Column>, marker::NoTyped>;
176
177impl AsyncLendingIterator for RowSimpleStream {
178    type Ok<'i>
179        = RowSimple<'i>
180    where
181        Self: 'i;
182    type Err = Error;
183
184    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
185        loop {
186            match self.res.recv().await? {
187                backend::Message::RowDescription(body) => {
188                    self.col = body
189                        .fields()
190                        // text type is used to match RowSimple::try_get's implementation
191                        // where column's pg type is always assumed as Option<&str>.
192                        // (no runtime pg type check so this does not really matter. it's
193                        // better to keep the type consistent though)
194                        .map(|f| Ok(Column::new(f.name(), Type::TEXT)))
195                        .collect::<Vec<_>>()?;
196                }
197                backend::Message::DataRow(body) => {
198                    return RowSimple::try_new(&self.col, body, &mut self.ranges).map(Some);
199                }
200                backend::Message::CommandComplete(_) | backend::Message::EmptyQueryResponse => {}
201                backend::Message::ReadyForQuery(_) => return Ok(None),
202                _ => return Err(Error::unexpected()),
203            }
204        }
205    }
206}
207
208/// [`RowSimpleStreamOwned`] with static lifetime
209pub type RowSimpleStreamOwned = GenericRowStream<Arc<[Column]>, marker::NoTyped>;
210
211impl From<RowSimpleStream> for RowSimpleStreamOwned {
212    fn from(stream: RowSimpleStream) -> Self {
213        Self {
214            res: stream.res,
215            col: stream.col.into(),
216            ranges: stream.ranges,
217            _marker: PhantomData,
218        }
219    }
220}
221
222impl IntoIterator for RowSimpleStream {
223    type IntoIter = RowSimpleStreamOwned;
224    type Item = Result<RowSimpleOwned, Error>;
225
226    fn into_iter(self) -> Self::IntoIter {
227        RowSimpleStreamOwned::from(self)
228    }
229}
230
231impl Iterator for RowSimpleStreamOwned {
232    type Item = Result<RowSimpleOwned, Error>;
233
234    fn next(&mut self) -> Option<Self::Item> {
235        loop {
236            match self.res.blocking_recv() {
237                Ok(msg) => match msg {
238                    backend::Message::RowDescription(body) => match body
239                        .fields()
240                        .map(|f| Ok(Column::new(f.name(), Type::TEXT)))
241                        .collect::<Vec<_>>()
242                    {
243                        Ok(col) => self.col = col.into(),
244                        Err(e) => return Some(Err(Error::from(e))),
245                    },
246                    backend::Message::DataRow(body) => {
247                        return Some(RowSimpleOwned::try_new_owned(&self.col, body));
248                    }
249                    backend::Message::CommandComplete(_)
250                    | backend::Message::EmptyQueryResponse
251                    | backend::Message::ReadyForQuery(_) => return None,
252                    _ => return Some(Err(Error::unexpected())),
253                },
254                Err(e) => return Some(Err(e)),
255            }
256        }
257    }
258}
259
260/// a stream of table rows where column type looked up and row parsing are bundled together
261pub struct RowStreamGuarded<'a, C> {
262    pub(crate) res: Response,
263    pub(crate) col: Vec<Column>,
264    pub(crate) ranges: Vec<Range<usize>>,
265    pub(crate) cli: &'a C,
266}
267
268impl<'a, C> RowStreamGuarded<'a, C> {
269    pub(crate) fn new(res: Response, cli: &'a C) -> Self {
270        Self {
271            res,
272            col: Vec::new(),
273            ranges: Vec::new(),
274            cli,
275        }
276    }
277}
278
279impl<C> AsyncLendingIterator for RowStreamGuarded<'_, C>
280where
281    C: Prepare + Sync,
282{
283    type Ok<'i>
284        = Row<'i>
285    where
286        Self: 'i;
287    type Err = Error;
288
289    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
290        loop {
291            match self.res.recv().await? {
292                backend::Message::RowDescription(body) => {
293                    let mut it = body.fields();
294                    while let Some(field) = it.next()? {
295                        let ty = self.cli._get_type(field.type_oid()).await?;
296                        self.col.push(Column::new(field.name(), ty));
297                    }
298                }
299                backend::Message::DataRow(body) => return Row::try_new(&self.col, body, &mut self.ranges).map(Some),
300                backend::Message::ParseComplete
301                | backend::Message::BindComplete
302                | backend::Message::ParameterDescription(_)
303                | backend::Message::EmptyQueryResponse
304                | backend::Message::CommandComplete(_)
305                | backend::Message::PortalSuspended
306                | backend::Message::NoData => {}
307                backend::Message::ReadyForQuery(_) => return Ok(None),
308                _ => return Err(Error::unexpected()),
309            }
310        }
311    }
312}
313
314pub struct RowStreamGuardedOwned<'a, C> {
315    res: Response,
316    col: Arc<[Column]>,
317    cli: &'a C,
318}
319
320impl<'a, C> From<RowStreamGuarded<'a, C>> for RowStreamGuardedOwned<'a, C> {
321    fn from(stream: RowStreamGuarded<'a, C>) -> Self {
322        Self {
323            res: stream.res,
324            col: stream.col.into(),
325            cli: stream.cli,
326        }
327    }
328}
329
330impl<'a, C> IntoIterator for RowStreamGuarded<'a, C>
331where
332    C: Prepare,
333{
334    type Item = Result<RowOwned, Error>;
335    type IntoIter = RowStreamGuardedOwned<'a, C>;
336
337    fn into_iter(self) -> Self::IntoIter {
338        RowStreamGuardedOwned::from(self)
339    }
340}
341
342impl<C> Iterator for RowStreamGuardedOwned<'_, C>
343where
344    C: Prepare,
345{
346    type Item = Result<RowOwned, Error>;
347
348    fn next(&mut self) -> Option<Self::Item> {
349        loop {
350            match self.res.blocking_recv() {
351                Ok(msg) => match msg {
352                    backend::Message::RowDescription(body) => {
353                        match body
354                            .fields()
355                            .map_err(Error::from)
356                            .map(|f| {
357                                let ty = self.cli._get_type_blocking(f.type_oid())?;
358                                Ok(Column::new(f.name(), ty))
359                            })
360                            .collect::<Vec<_>>()
361                        {
362                            Ok(col) => self.col = col.into(),
363                            Err(e) => return Some(Err(e)),
364                        }
365                    }
366                    backend::Message::DataRow(body) => {
367                        return Some(RowOwned::try_new_owned(&self.col, body));
368                    }
369                    backend::Message::ParseComplete
370                    | backend::Message::BindComplete
371                    | backend::Message::ParameterDescription(_)
372                    | backend::Message::EmptyQueryResponse
373                    | backend::Message::CommandComplete(_)
374                    | backend::Message::PortalSuspended => {}
375                    backend::Message::NoData | backend::Message::ReadyForQuery(_) => return None,
376                    _ => return Some(Err(Error::unexpected())),
377                },
378                Err(e) => return Some(Err(e)),
379            }
380        }
381    }
382}
383
384/// collect how many hows has been modified in async
385#[must_use = "futures do nothing unless you `.await` or poll them"]
386pub struct RowAffected {
387    res: Response,
388    rows_affected: u64,
389}
390
391impl Future for RowAffected {
392    type Output = Result<u64, Error>;
393
394    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
395        let this = self.get_mut();
396        ready!(this.res.poll_try_into_ready(&mut this.rows_affected, cx))?;
397        Poll::Ready(Ok(this.rows_affected))
398    }
399}
400
401impl RowAffected {
402    pub(crate) fn wait(self) -> Result<u64, Error> {
403        self.res.try_into_row_affected_blocking()
404    }
405}
406
407impl<C, M> From<GenericRowStream<C, M>> for RowAffected {
408    fn from(stream: GenericRowStream<C, M>) -> Self {
409        Self {
410            res: stream.res,
411            rows_affected: 0,
412        }
413    }
414}
415
416impl<C> From<RowStreamGuarded<'_, C>> for RowAffected {
417    fn from(stream: RowStreamGuarded<'_, C>) -> Self {
418        Self {
419            res: stream.res,
420            rows_affected: 0,
421        }
422    }
423}