diesel_async/run_query_dsl/
utils.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use diesel::QueryResult;
6use futures_core::{ready, TryFuture, TryStream};
7use futures_util::{TryFutureExt, TryStreamExt};
8
9// We use a custom future implementation here to erase some lifetimes
10// that otherwise need to be specified explicitly
11//
12// Specifying these lifetimes results in the compiler not beeing
13// able to look through the generic code and emit
14// lifetime erros for pipelined queries. See
15// https://github.com/weiznich/diesel_async/issues/249 for more context
16#[repr(transparent)]
17pub struct MapOk<F: TryFutureExt, T> {
18    future: futures_util::future::MapOk<F, fn(F::Ok) -> T>,
19}
20
21impl<F, T> Future for MapOk<F, T>
22where
23    F: TryFuture,
24    futures_util::future::MapOk<F, fn(F::Ok) -> T>: Future<Output = Result<T, F::Error>>,
25{
26    type Output = Result<T, F::Error>;
27
28    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
29        unsafe {
30            // SAFETY: This projects pinning to the only inner field, so it
31            // should be safe
32            self.map_unchecked_mut(|s| &mut s.future)
33        }
34        .poll(cx)
35    }
36}
37
38impl<Fut: TryFutureExt, T> MapOk<Fut, T> {
39    pub(crate) fn new(future: Fut, f: fn(Fut::Ok) -> T) -> Self {
40        Self {
41            future: future.map_ok(f),
42        }
43    }
44}
45
46// similar to `MapOk` above this mainly exists to hide the lifetime
47#[repr(transparent)]
48pub struct AndThen<F1: TryFuture, F2> {
49    future: futures_util::future::AndThen<F1, F2, fn(F1::Ok) -> F2>,
50}
51
52impl<Fut1, Fut2> AndThen<Fut1, Fut2>
53where
54    Fut1: TryFuture,
55    Fut2: TryFuture<Error = Fut1::Error>,
56{
57    pub(crate) fn new(fut1: Fut1, f: fn(Fut1::Ok) -> Fut2) -> AndThen<Fut1, Fut2> {
58        Self {
59            future: fut1.and_then(f),
60        }
61    }
62}
63
64impl<F1, F2> Future for AndThen<F1, F2>
65where
66    F1: TryFuture,
67    F2: TryFuture<Error = F1::Error>,
68{
69    type Output = Result<F2::Ok, F2::Error>;
70
71    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
72        unsafe {
73            // SAFETY: This projects pinning to the only inner field, so it
74            // should be safe
75            self.map_unchecked_mut(|s| &mut s.future)
76        }
77        .poll(cx)
78    }
79}
80
81/// Converts a stream into a future, only yielding the first element.
82/// Based on [`futures_util::stream::StreamFuture`].
83pub struct LoadNext<St> {
84    stream: Option<St>,
85}
86
87impl<St> LoadNext<St> {
88    pub(crate) fn new(stream: St) -> Self {
89        Self {
90            stream: Some(stream),
91        }
92    }
93}
94
95impl<St> Future for LoadNext<St>
96where
97    St: TryStream<Error = diesel::result::Error> + Unpin,
98{
99    type Output = QueryResult<St::Ok>;
100
101    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102        let first = {
103            let s = self.stream.as_mut().expect("polling LoadNext twice");
104            ready!(s.try_poll_next_unpin(cx))
105        };
106        self.stream = None;
107        match first {
108            Some(first) => Poll::Ready(first),
109            None => Poll::Ready(Err(diesel::result::Error::NotFound)),
110        }
111    }
112}