1use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9use futures_core::ready;
10use pin_project_lite::pin_project;
11
12use super::{error::Closed, message};
13
14pin_project! {
15 #[derive(Debug)]
17 pub struct ResponseFuture<T> {
18 #[pin]
19 state: ResponseState<T>,
20 }
21}
22
23pin_project! {
24 #[project = ResponseStateProj]
25 #[derive(Debug)]
26 enum ResponseState<T> {
27 Failed {
28 error: Option<crate::BoxError>,
29 },
30 Rx {
31 #[pin]
32 rx: message::Rx<T>,
33 },
34 Poll {
35 #[pin]
36 fut: T,
37 },
38 }
39}
40
41impl<T> ResponseFuture<T> {
42 pub(crate) fn new(rx: message::Rx<T>) -> Self {
43 ResponseFuture {
44 state: ResponseState::Rx { rx },
45 }
46 }
47
48 pub(crate) fn failed(err: crate::BoxError) -> Self {
49 ResponseFuture {
50 state: ResponseState::Failed { error: Some(err) },
51 }
52 }
53}
54
55impl<F, T, E> Future for ResponseFuture<F>
56where
57 F: Future<Output = Result<T, E>>,
58 E: Into<crate::BoxError>,
59{
60 type Output = Result<T, crate::BoxError>;
61
62 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63 let mut this = self.project();
64
65 loop {
73 match this.state.as_mut().project() {
74 ResponseStateProj::Failed { error } => {
75 return Poll::Ready(Err(error.take().expect("polled after error")));
76 }
77 ResponseStateProj::Rx { rx } => match ready!(rx.poll(cx)) {
78 Ok(Ok(fut)) => this.state.set(ResponseState::Poll { fut }),
79 Ok(Err(e)) => return Poll::Ready(Err(e.into())),
80 Err(_) => return Poll::Ready(Err(Closed::new().into())),
81 },
82 ResponseStateProj::Poll { fut } => return fut.poll(cx).map_err(Into::into),
83 }
84 }
85 }
86}