diesel_async/run_query_dsl/
utils.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use diesel::QueryResult;
6use futures_core::{ready, TryFuture, TryStream};
7use futures_util::{TryFutureExt, TryStreamExt};
8
9#[repr(transparent)]
17pub struct MapOk<F: TryFutureExt, T> {
18 future: futures_util::future::MapOk<F, fn(F::Ok) -> T>,
19}
20
21impl<F, T> Future for MapOk<F, T>
22where
23 F: TryFuture,
24 futures_util::future::MapOk<F, fn(F::Ok) -> T>: Future<Output = Result<T, F::Error>>,
25{
26 type Output = Result<T, F::Error>;
27
28 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
29 unsafe {
30 self.map_unchecked_mut(|s| &mut s.future)
33 }
34 .poll(cx)
35 }
36}
37
38impl<Fut: TryFutureExt, T> MapOk<Fut, T> {
39 pub(crate) fn new(future: Fut, f: fn(Fut::Ok) -> T) -> Self {
40 Self {
41 future: future.map_ok(f),
42 }
43 }
44}
45
46#[repr(transparent)]
48pub struct AndThen<F1: TryFuture, F2> {
49 future: futures_util::future::AndThen<F1, F2, fn(F1::Ok) -> F2>,
50}
51
52impl<Fut1, Fut2> AndThen<Fut1, Fut2>
53where
54 Fut1: TryFuture,
55 Fut2: TryFuture<Error = Fut1::Error>,
56{
57 pub(crate) fn new(fut1: Fut1, f: fn(Fut1::Ok) -> Fut2) -> AndThen<Fut1, Fut2> {
58 Self {
59 future: fut1.and_then(f),
60 }
61 }
62}
63
64impl<F1, F2> Future for AndThen<F1, F2>
65where
66 F1: TryFuture,
67 F2: TryFuture<Error = F1::Error>,
68{
69 type Output = Result<F2::Ok, F2::Error>;
70
71 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72 unsafe {
73 self.map_unchecked_mut(|s| &mut s.future)
76 }
77 .poll(cx)
78 }
79}
80
/// Future that holds a stream until its `Future` implementation has
/// produced a result.
pub struct LoadNext<St> {
    // `Some` while the stream is still held; the `Future` impl for this type
    // resets it to `None` once it has a result.
    stream: Option<St>,
}

impl<St> LoadNext<St> {
    /// Takes ownership of `stream` and stores it for later polling.
    pub(crate) fn new(stream: St) -> Self {
        let stream = Some(stream);
        LoadNext { stream }
    }
}
94
95impl<St> Future for LoadNext<St>
96where
97 St: TryStream<Error = diesel::result::Error> + Unpin,
98{
99 type Output = QueryResult<St::Ok>;
100
101 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102 let first = {
103 let s = self.stream.as_mut().expect("polling LoadNext twice");
104 ready!(s.try_poll_next_unpin(cx))
105 };
106 self.stream = None;
107 match first {
108 Some(first) => Poll::Ready(first),
109 None => Poll::Ready(Err(diesel::result::Error::NotFound)),
110 }
111 }
112}