xitca_postgres/
iter.rs

1use core::future::Future;
2
3/// async streaming iterator with borrowed Item from Self.
4pub trait AsyncLendingIterator {
5    type Ok<'i>
6    where
7        Self: 'i;
8    type Err;
9
10    fn try_next(&mut self) -> impl Future<Output = Result<Option<Self::Ok<'_>>, Self::Err>> + Send;
11
12    #[inline]
13    fn size_hint(&self) -> (usize, Option<usize>) {
14        (0, None)
15    }
16}
17
18impl<I> AsyncLendingIteratorExt for I where I: AsyncLendingIterator {}
19
20pub trait AsyncLendingIteratorExt: AsyncLendingIterator {
21    fn map_ok<F, O>(self, func: F) -> MapOk<Self, F>
22    where
23        F: Fn(Self::Ok<'_>) -> O,
24        Self: Sized,
25    {
26        MapOk { iter: self, func }
27    }
28
29    fn map_err<F, O>(self, func: F) -> MapErr<Self, F>
30    where
31        F: Fn(Self::Err) -> O,
32        Self: Sized,
33    {
34        MapErr { iter: self, func }
35    }
36
37    fn try_map<F, T, E>(self, func: F) -> Map<Self, F>
38    where
39        F: Fn(Result<Self::Ok<'_>, Self::Err>) -> Result<T, E>,
40        Self: Sized,
41    {
42        Map { iter: self, func }
43    }
44
45    fn try_collect<T>(self) -> impl Future<Output = Result<T, Self::Err>> + Send
46    where
47        T: Default + for<'i> Extend<Self::Ok<'i>> + Send,
48        Self: Send + Sized,
49    {
50        async {
51            let mut collection = T::default();
52            self.try_collect_into(&mut collection).await?;
53            Ok(collection)
54        }
55    }
56
57    fn try_collect_into<T>(mut self, collection: &mut T) -> impl Future<Output = Result<&mut T, Self::Err>> + Send
58    where
59        T: for<'i> Extend<Self::Ok<'i>> + Send,
60        Self: Send + Sized,
61    {
62        async move {
63            while let Some(item) = self.try_next().await? {
64                collection.extend(Some(item));
65            }
66            Ok(collection)
67        }
68    }
69}
70
71pub struct MapOk<I, F> {
72    iter: I,
73    func: F,
74}
75
76impl<I, F, O> AsyncLendingIterator for MapOk<I, F>
77where
78    I: AsyncLendingIterator + Send,
79    F: Fn(I::Ok<'_>) -> O + Send,
80    O: Send,
81{
82    type Ok<'i>
83        = O
84    where
85        Self: 'i;
86    type Err = I::Err;
87
88    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
89        let next = self.iter.try_next().await?;
90        Ok(next.map(|item| (self.func)(item)))
91    }
92
93    #[inline]
94    fn size_hint(&self) -> (usize, Option<usize>) {
95        self.iter.size_hint()
96    }
97}
98
99pub struct MapErr<I, F> {
100    iter: I,
101    func: F,
102}
103
104impl<I, F, O> AsyncLendingIterator for MapErr<I, F>
105where
106    I: AsyncLendingIterator + Send,
107    F: Fn(I::Err) -> O + Send,
108    O: Send,
109{
110    type Ok<'i>
111        = I::Ok<'i>
112    where
113        Self: 'i;
114    type Err = O;
115
116    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
117        self.iter.try_next().await.map_err(&self.func)
118    }
119
120    #[inline]
121    fn size_hint(&self) -> (usize, Option<usize>) {
122        self.iter.size_hint()
123    }
124}
125
126pub struct Map<I, F> {
127    iter: I,
128    func: F,
129}
130
131impl<I, F, T, E> AsyncLendingIterator for Map<I, F>
132where
133    I: AsyncLendingIterator + Send,
134    F: Fn(Result<I::Ok<'_>, I::Err>) -> Result<T, E> + Send,
135    T: Send,
136    E: Send,
137{
138    type Ok<'i>
139        = T
140    where
141        Self: 'i;
142    type Err = E;
143
144    async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
145        match self.iter.try_next().await {
146            Ok(Some(t)) => (self.func)(Ok(t)).map(Some),
147            Ok(None) => Ok(None),
148            Err(e) => (self.func)(Err(e)).map(Some),
149        }
150    }
151
152    #[inline]
153    fn size_hint(&self) -> (usize, Option<usize>) {
154        self.iter.size_hint()
155    }
156}
157
158async fn _map_ok_err_try_collect(stream: crate::RowStreamOwned) -> Result<Vec<String>, crate::error::Error> {
159    stream
160        .map_ok(|row| row.get(0))
161        .map_err(|e| dbg!(e))
162        .try_collect::<Vec<_>>()
163        .await
164}
165
166async fn _connect_quictry_collect(stream: crate::RowStreamOwned) -> Result<Vec<String>, crate::error::Error> {
167    stream.try_map(|row| row?.try_get(0)).try_collect::<Vec<_>>().await
168}
169
170async fn _try_collect_into(stream: crate::RowStreamOwned) {
171    stream
172        .try_map(|row| row?.try_get::<i32>(0))
173        .try_collect_into(&mut Vec::new())
174        .await
175        .unwrap()
176        .iter()
177        .next();
178}