Skip to main content

diesel_async/run_query_dsl/
utils.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use diesel::QueryResult;
6use futures_core::{ready, TryFuture, TryStream};
7use futures_util::{TryFutureExt, TryStreamExt};
8use pin_project_lite::pin_project;
9
10// We use a custom future implementation here to erase some lifetimes
11// that otherwise need to be specified explicitly
12//
13// Specifying these lifetimes results in the compiler not being
14// able to look through the generic code and emit
15// lifetime erros for pipelined queries. See
16// https://github.com/weiznich/diesel_async/issues/249 for more context
17pin_project! {
18    #[repr(transparent)]
19    pub struct MapOk<F: TryFutureExt, T> {
20        #[pin]
21        future: futures_util::future::MapOk<F, fn(F::Ok) -> T>,
22    }
23}
24
25impl<F, T> Future for MapOk<F, T>
26where
27    F: TryFuture,
28    futures_util::future::MapOk<F, fn(F::Ok) -> T>: Future<Output = Result<T, F::Error>>,
29{
30    type Output = Result<T, F::Error>;
31
32    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
33        self.project().future.poll(cx)
34    }
35}
36
37impl<Fut: TryFutureExt, T> MapOk<Fut, T> {
38    pub(crate) fn new(future: Fut, f: fn(Fut::Ok) -> T) -> Self {
39        Self {
40            future: future.map_ok(f),
41        }
42    }
43}
44
45// similar to `MapOk` above this mainly exists to hide the lifetime
46pin_project! {
47    #[repr(transparent)]
48    pub struct AndThen<F1: TryFuture, F2> {
49        #[pin]
50        future: futures_util::future::AndThen<F1, F2, fn(F1::Ok) -> F2>,
51    }
52}
53
54impl<Fut1, Fut2> AndThen<Fut1, Fut2>
55where
56    Fut1: TryFuture,
57    Fut2: TryFuture<Error = Fut1::Error>,
58{
59    pub(crate) fn new(fut1: Fut1, f: fn(Fut1::Ok) -> Fut2) -> AndThen<Fut1, Fut2> {
60        Self {
61            future: fut1.and_then(f),
62        }
63    }
64}
65
66impl<F1, F2> Future for AndThen<F1, F2>
67where
68    F1: TryFuture,
69    F2: TryFuture<Error = F1::Error>,
70{
71    type Output = Result<F2::Ok, F2::Error>;
72
73    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74        self.project().future.poll(cx)
75    }
76}
77
78/// Converts a stream into a future, only yielding the first element.
79/// Based on [`futures_util::stream::StreamFuture`].
80pub struct LoadNext<St> {
81    stream: Option<St>,
82}
83
84impl<St> LoadNext<St> {
85    pub(crate) fn new(stream: St) -> Self {
86        Self {
87            stream: Some(stream),
88        }
89    }
90}
91
92impl<St> Future for LoadNext<St>
93where
94    St: TryStream<Error = diesel::result::Error> + Unpin,
95{
96    type Output = QueryResult<St::Ok>;
97
98    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99        let first = {
100            let s = self.stream.as_mut().expect("polling LoadNext twice");
101            ready!(s.try_poll_next_unpin(cx))
102        };
103        self.stream = None;
104        match first {
105            Some(first) => Poll::Ready(first),
106            None => Poll::Ready(Err(diesel::result::Error::NotFound)),
107        }
108    }
109}