1use core::{
2 future::Future,
3 marker::PhantomData,
4 ops::Range,
5 pin::Pin,
6 task::{Context, Poll, ready},
7};
8
9use std::sync::Arc;
10
11use fallible_iterator::FallibleIterator;
12use postgres_protocol::message::backend;
13
14use crate::{
15 column::Column,
16 driver::codec::Response,
17 error::Error,
18 iter::AsyncLendingIterator,
19 prepare::Prepare,
20 row::{Row, RowOwned, RowSimple, RowSimpleOwned, marker},
21 types::Type,
22};
23
24#[derive(Debug)]
25pub struct GenericRowStream<C, M> {
26 pub(crate) res: Response,
27 pub(crate) ranges: Vec<Range<usize>>,
28 pub(crate) col: C,
29 pub(crate) _marker: PhantomData<M>,
30}
31
32impl<C, M> GenericRowStream<C, M>
33where
34 C: AsRef<[Column]>,
35{
36 pub(crate) fn new(res: Response, col: C) -> Self {
37 Self {
38 res,
39 ranges: Vec::with_capacity(col.as_ref().len()),
40 col,
41 _marker: PhantomData,
42 }
43 }
44}
45
46pub type RowStream<'a> = GenericRowStream<&'a [Column], marker::Typed>;
48
49impl AsyncLendingIterator for RowStream<'_> {
50 type Ok<'i>
51 = Row<'i>
52 where
53 Self: 'i;
54 type Err = Error;
55
56 #[inline]
57 fn try_next(&mut self) -> impl Future<Output = Result<Option<Self::Ok<'_>>, Self::Err>> + Send {
58 try_next(&mut self.res, self.col, &mut self.ranges)
59 }
60}
61
62async fn try_next<'r>(
63 res: &mut Response,
64 col: &'r [Column],
65 ranges: &'r mut Vec<Range<usize>>,
66) -> Result<Option<Row<'r>>, Error> {
67 loop {
68 match res.recv().await? {
69 backend::Message::DataRow(body) => return Row::try_new(col, body, ranges).map(Some),
70 backend::Message::BindComplete
71 | backend::Message::EmptyQueryResponse
72 | backend::Message::CommandComplete(_)
73 | backend::Message::PortalSuspended => {}
74 backend::Message::ReadyForQuery(_) => return Ok(None),
75 _ => return Err(Error::unexpected()),
76 }
77 }
78}
79
80pub type RowStreamOwned = GenericRowStream<Arc<[Column]>, marker::Typed>;
117
118impl From<RowStream<'_>> for RowStreamOwned {
119 fn from(stream: RowStream<'_>) -> Self {
120 Self {
121 res: stream.res,
122 col: Arc::from(stream.col),
123 ranges: stream.ranges,
124 _marker: PhantomData,
125 }
126 }
127}
128
129impl AsyncLendingIterator for RowStreamOwned {
130 type Ok<'i>
131 = Row<'i>
132 where
133 Self: 'i;
134 type Err = Error;
135
136 #[inline]
137 fn try_next(&mut self) -> impl Future<Output = Result<Option<Self::Ok<'_>>, Self::Err>> + Send {
138 try_next(&mut self.res, &self.col, &mut self.ranges)
139 }
140}
141
142impl IntoIterator for RowStream<'_> {
143 type Item = Result<RowOwned, Error>;
144 type IntoIter = RowStreamOwned;
145
146 fn into_iter(self) -> Self::IntoIter {
147 RowStreamOwned::from(self)
148 }
149}
150
151impl Iterator for RowStreamOwned {
152 type Item = Result<RowOwned, Error>;
153
154 fn next(&mut self) -> Option<Self::Item> {
155 loop {
156 match self.res.blocking_recv() {
157 Ok(msg) => match msg {
158 backend::Message::DataRow(body) => {
159 return Some(RowOwned::try_new_owned(&self.col, body));
160 }
161 backend::Message::BindComplete
162 | backend::Message::EmptyQueryResponse
163 | backend::Message::CommandComplete(_)
164 | backend::Message::PortalSuspended => {}
165 backend::Message::ReadyForQuery(_) => return None,
166 _ => return Some(Err(Error::unexpected())),
167 },
168 Err(e) => return Some(Err(e)),
169 }
170 }
171 }
172}
173
174pub type RowSimpleStream = GenericRowStream<Vec<Column>, marker::NoTyped>;
176
177impl AsyncLendingIterator for RowSimpleStream {
178 type Ok<'i>
179 = RowSimple<'i>
180 where
181 Self: 'i;
182 type Err = Error;
183
184 async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
185 loop {
186 match self.res.recv().await? {
187 backend::Message::RowDescription(body) => {
188 self.col = body
189 .fields()
190 .map(|f| Ok(Column::new(f.name(), Type::TEXT)))
195 .collect::<Vec<_>>()?;
196 }
197 backend::Message::DataRow(body) => {
198 return RowSimple::try_new(&self.col, body, &mut self.ranges).map(Some);
199 }
200 backend::Message::CommandComplete(_) | backend::Message::EmptyQueryResponse => {}
201 backend::Message::ReadyForQuery(_) => return Ok(None),
202 _ => return Err(Error::unexpected()),
203 }
204 }
205 }
206}
207
208pub type RowSimpleStreamOwned = GenericRowStream<Arc<[Column]>, marker::NoTyped>;
210
211impl From<RowSimpleStream> for RowSimpleStreamOwned {
212 fn from(stream: RowSimpleStream) -> Self {
213 Self {
214 res: stream.res,
215 col: stream.col.into(),
216 ranges: stream.ranges,
217 _marker: PhantomData,
218 }
219 }
220}
221
222impl IntoIterator for RowSimpleStream {
223 type IntoIter = RowSimpleStreamOwned;
224 type Item = Result<RowSimpleOwned, Error>;
225
226 fn into_iter(self) -> Self::IntoIter {
227 RowSimpleStreamOwned::from(self)
228 }
229}
230
231impl Iterator for RowSimpleStreamOwned {
232 type Item = Result<RowSimpleOwned, Error>;
233
234 fn next(&mut self) -> Option<Self::Item> {
235 loop {
236 match self.res.blocking_recv() {
237 Ok(msg) => match msg {
238 backend::Message::RowDescription(body) => match body
239 .fields()
240 .map(|f| Ok(Column::new(f.name(), Type::TEXT)))
241 .collect::<Vec<_>>()
242 {
243 Ok(col) => self.col = col.into(),
244 Err(e) => return Some(Err(Error::from(e))),
245 },
246 backend::Message::DataRow(body) => {
247 return Some(RowSimpleOwned::try_new_owned(&self.col, body));
248 }
249 backend::Message::CommandComplete(_)
250 | backend::Message::EmptyQueryResponse
251 | backend::Message::ReadyForQuery(_) => return None,
252 _ => return Some(Err(Error::unexpected())),
253 },
254 Err(e) => return Some(Err(e)),
255 }
256 }
257 }
258}
259
260pub struct RowStreamGuarded<'a, C> {
262 pub(crate) res: Response,
263 pub(crate) col: Vec<Column>,
264 pub(crate) ranges: Vec<Range<usize>>,
265 pub(crate) cli: &'a C,
266}
267
268impl<'a, C> RowStreamGuarded<'a, C> {
269 pub(crate) fn new(res: Response, cli: &'a C) -> Self {
270 Self {
271 res,
272 col: Vec::new(),
273 ranges: Vec::new(),
274 cli,
275 }
276 }
277}
278
279impl<C> AsyncLendingIterator for RowStreamGuarded<'_, C>
280where
281 C: Prepare + Sync,
282{
283 type Ok<'i>
284 = Row<'i>
285 where
286 Self: 'i;
287 type Err = Error;
288
289 async fn try_next(&mut self) -> Result<Option<Self::Ok<'_>>, Self::Err> {
290 loop {
291 match self.res.recv().await? {
292 backend::Message::RowDescription(body) => {
293 let mut it = body.fields();
294 while let Some(field) = it.next()? {
295 let ty = self.cli._get_type(field.type_oid()).await?;
296 self.col.push(Column::new(field.name(), ty));
297 }
298 }
299 backend::Message::DataRow(body) => return Row::try_new(&self.col, body, &mut self.ranges).map(Some),
300 backend::Message::ParseComplete
301 | backend::Message::BindComplete
302 | backend::Message::ParameterDescription(_)
303 | backend::Message::EmptyQueryResponse
304 | backend::Message::CommandComplete(_)
305 | backend::Message::PortalSuspended
306 | backend::Message::NoData => {}
307 backend::Message::ReadyForQuery(_) => return Ok(None),
308 _ => return Err(Error::unexpected()),
309 }
310 }
311 }
312}
313
314pub struct RowStreamGuardedOwned<'a, C> {
315 res: Response,
316 col: Arc<[Column]>,
317 cli: &'a C,
318}
319
320impl<'a, C> From<RowStreamGuarded<'a, C>> for RowStreamGuardedOwned<'a, C> {
321 fn from(stream: RowStreamGuarded<'a, C>) -> Self {
322 Self {
323 res: stream.res,
324 col: stream.col.into(),
325 cli: stream.cli,
326 }
327 }
328}
329
330impl<'a, C> IntoIterator for RowStreamGuarded<'a, C>
331where
332 C: Prepare,
333{
334 type Item = Result<RowOwned, Error>;
335 type IntoIter = RowStreamGuardedOwned<'a, C>;
336
337 fn into_iter(self) -> Self::IntoIter {
338 RowStreamGuardedOwned::from(self)
339 }
340}
341
342impl<C> Iterator for RowStreamGuardedOwned<'_, C>
343where
344 C: Prepare,
345{
346 type Item = Result<RowOwned, Error>;
347
348 fn next(&mut self) -> Option<Self::Item> {
349 loop {
350 match self.res.blocking_recv() {
351 Ok(msg) => match msg {
352 backend::Message::RowDescription(body) => {
353 match body
354 .fields()
355 .map_err(Error::from)
356 .map(|f| {
357 let ty = self.cli._get_type_blocking(f.type_oid())?;
358 Ok(Column::new(f.name(), ty))
359 })
360 .collect::<Vec<_>>()
361 {
362 Ok(col) => self.col = col.into(),
363 Err(e) => return Some(Err(e)),
364 }
365 }
366 backend::Message::DataRow(body) => {
367 return Some(RowOwned::try_new_owned(&self.col, body));
368 }
369 backend::Message::ParseComplete
370 | backend::Message::BindComplete
371 | backend::Message::ParameterDescription(_)
372 | backend::Message::EmptyQueryResponse
373 | backend::Message::CommandComplete(_)
374 | backend::Message::PortalSuspended => {}
375 backend::Message::NoData | backend::Message::ReadyForQuery(_) => return None,
376 _ => return Some(Err(Error::unexpected())),
377 },
378 Err(e) => return Some(Err(e)),
379 }
380 }
381 }
382}
383
384#[must_use = "futures do nothing unless you `.await` or poll them"]
386pub struct RowAffected {
387 res: Response,
388 rows_affected: u64,
389}
390
391impl Future for RowAffected {
392 type Output = Result<u64, Error>;
393
394 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
395 let this = self.get_mut();
396 ready!(this.res.poll_try_into_ready(&mut this.rows_affected, cx))?;
397 Poll::Ready(Ok(this.rows_affected))
398 }
399}
400
401impl RowAffected {
402 pub(crate) fn wait(self) -> Result<u64, Error> {
403 self.res.try_into_row_affected_blocking()
404 }
405}
406
407impl<C, M> From<GenericRowStream<C, M>> for RowAffected {
408 fn from(stream: GenericRowStream<C, M>) -> Self {
409 Self {
410 res: stream.res,
411 rows_affected: 0,
412 }
413 }
414}
415
416impl<C> From<RowStreamGuarded<'_, C>> for RowAffected {
417 fn from(stream: RowStreamGuarded<'_, C>) -> Self {
418 Self {
419 res: stream.res,
420 rows_affected: 0,
421 }
422 }
423}