diesel_async/run_query_dsl/
utils.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use diesel::QueryResult;
6use futures_core::{ready, TryFuture, TryStream};
7use futures_util::{TryFutureExt, TryStreamExt};
8use pin_project_lite::pin_project;
9
10pin_project! {
18 #[repr(transparent)]
19 pub struct MapOk<F: TryFutureExt, T> {
20 #[pin]
21 future: futures_util::future::MapOk<F, fn(F::Ok) -> T>,
22 }
23}
24
25impl<F, T> Future for MapOk<F, T>
26where
27 F: TryFuture,
28 futures_util::future::MapOk<F, fn(F::Ok) -> T>: Future<Output = Result<T, F::Error>>,
29{
30 type Output = Result<T, F::Error>;
31
32 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
33 self.project().future.poll(cx)
34 }
35}
36
37impl<Fut: TryFutureExt, T> MapOk<Fut, T> {
38 pub(crate) fn new(future: Fut, f: fn(Fut::Ok) -> T) -> Self {
39 Self {
40 future: future.map_ok(f),
41 }
42 }
43}
44
45pin_project! {
47 #[repr(transparent)]
48 pub struct AndThen<F1: TryFuture, F2> {
49 #[pin]
50 future: futures_util::future::AndThen<F1, F2, fn(F1::Ok) -> F2>,
51 }
52}
53
54impl<Fut1, Fut2> AndThen<Fut1, Fut2>
55where
56 Fut1: TryFuture,
57 Fut2: TryFuture<Error = Fut1::Error>,
58{
59 pub(crate) fn new(fut1: Fut1, f: fn(Fut1::Ok) -> Fut2) -> AndThen<Fut1, Fut2> {
60 Self {
61 future: fut1.and_then(f),
62 }
63 }
64}
65
66impl<F1, F2> Future for AndThen<F1, F2>
67where
68 F1: TryFuture,
69 F2: TryFuture<Error = F1::Error>,
70{
71 type Output = Result<F2::Ok, F2::Error>;
72
73 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74 self.project().future.poll(cx)
75 }
76}
77
78pub struct LoadNext<St> {
81 stream: Option<St>,
82}
83
84impl<St> LoadNext<St> {
85 pub(crate) fn new(stream: St) -> Self {
86 Self {
87 stream: Some(stream),
88 }
89 }
90}
91
92impl<St> Future for LoadNext<St>
93where
94 St: TryStream<Error = diesel::result::Error> + Unpin,
95{
96 type Output = QueryResult<St::Ok>;
97
98 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99 let first = {
100 let s = self.stream.as_mut().expect("polling LoadNext twice");
101 ready!(s.try_poll_next_unpin(cx))
102 };
103 self.stream = None;
104 match first {
105 Some(first) => Poll::Ready(first),
106 None => Poll::Ready(Err(diesel::result::Error::NotFound)),
107 }
108 }
109}