xitca_postgres/
iter.rs

1use core::future::Future;
2
/// Asynchronous, fallible streaming iterator whose items may borrow from the iterator itself.
///
/// Each call to [`try_next`](AsyncLendingIterator::try_next) takes `&mut self`, and the
/// yielded [`Ok`](AsyncLendingIterator::Ok) value is allowed to hold a borrow tied to that
/// call, which is why the item type is a generic associated type parameterized by a lifetime.
pub trait AsyncLendingIterator {
    /// Item type yielded on success; it may borrow from `self` for the lifetime `'i`.
    type Ok<'i>
    where
        Self: 'i;

    /// Error type reported when advancing the iterator fails.
    type Err;

    /// Asynchronously produces the next item.
    ///
    /// The returned future must be `Send`. The collectors in this file stop on `Ok(None)`
    /// and propagate any `Err`.
    fn try_next(&mut self) -> impl Future<Output = Result<Option<Self::Ok<'_>>, Self::Err>> + Send;

    /// Returns a `(lower, upper)` pair in the same shape as `Iterator::size_hint`.
    ///
    /// The default implementation reports `(0, None)`.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        // Nothing is known about the remaining length unless an implementor overrides this.
        let lower = 0;
        let upper = None;
        (lower, upper)
    }
}
17
18pub trait AsyncLendingIteratorExt: AsyncLendingIterator {
19    fn map_ok<F, O>(self, func: F) -> MapOk<Self, F>
20    where
21        F: Fn(Self::Ok<'_>) -> O,
22        Self: Sized,
23    {
24        MapOk { iter: self, func }
25    }
26
27    #[inline]
28    fn try_collect<T>(self) -> impl Future<Output = Result<T, Self::Err>> + Send
29    where
30        T: Default + for<'i> Extend<Self::Ok<'i>> + Send,
31        Self: Send + Sized,
32    {
33        self.try_collect_into(T::default())
34    }
35
36    fn try_collect_into<T>(mut self, mut collection: T) -> impl Future<Output = Result<T, Self::Err>> + Send
37    where
38        T: for<'i> Extend<Self::Ok<'i>> + Send,
39        Self: Send + Sized,
40    {
41        async move {
42            while let Some(item) = self.try_next().await? {
43                collection.extend([item]);
44            }
45            Ok(collection)
46        }
47    }
48}
49
/// Adapter pairing an inner iterator with a closure to apply to its successful items.
///
/// The struct only stores its two parts; constructing it performs no work, so a value
/// that is created and then dropped never touches the inner iterator.
#[must_use = "async iterator adapters are lazy and do nothing unless polled"]
pub struct MapOk<I, F> {
    /// The wrapped iterator that items are pulled from.
    iter: I,
    /// The closure stored next to the iterator.
    func: F,
}
54
55impl<I, F, O> AsyncLendingIterator for MapOk<I, F>
56where
57    I: AsyncLendingIterator + Send,
58    F: Fn(I::Ok<'_>) -> O + Send,
59    O: Send,
60{
61    type Ok<'i>
62        = O
63    where
64        Self: 'i;
65    type Err = I::Err;
66
67    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
68        let next = self.iter.try_next().await?;
69        Ok(next.map(|item| (self.func)(item)))
70    }
71
72    #[inline]
73    fn size_hint(&self) -> (usize, Option<usize>) {
74        self.iter.size_hint()
75    }
76}
77
78impl<I> AsyncLendingIteratorExt for I where I: AsyncLendingIterator {}
79
80async fn _try_collect_test(stream: crate::RowStreamOwned) -> Result<Vec<String>, crate::error::Error> {
81    stream.map_ok(|row| row.get(0)).try_collect::<Vec<_>>().await
82}