Skip to main content

tower_http/on_early_drop/
future.rs

1//! Response future for [`OnEarlyDropService`].
2//!
3//! [`OnEarlyDropService`]: super::OnEarlyDropService
4
5use crate::on_early_drop::body::OnEarlyDropBody;
6use crate::on_early_drop::guard::OnEarlyDropGuard;
7use crate::on_early_drop::traits::{OnBodyDrop, OnDropCallback};
8use http::Response;
9use pin_project_lite::pin_project;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14pin_project! {
15    /// Response future for [`OnEarlyDropService`].
16    ///
17    /// [`OnEarlyDropService`]: super::OnEarlyDropService
18    pub struct OnEarlyDropFuture<F, OBD, ReqB, FC, BC>
19    where
20        OBD: OnBodyDrop<ReqB, Callback = BC>,
21        FC: OnDropCallback,
22        BC: OnDropCallback,
23    {
24        #[pin]
25        inner: F,
26        // `Some` while the inner future is pending.
27        future_guard: Option<OnEarlyDropGuard<FC>>,
28        // `Some` between call-time and response-ready time.
29        intermediate: Option<OBD::Intermediate>,
30        // Retained for `make_at_response`.
31        on_body_drop: Option<OBD>,
32        // `fn(ReqB)` keeps Send/Sync and other auto-traits independent of ReqB.
33        _phantom: std::marker::PhantomData<fn(ReqB)>,
34    }
35}
36
37impl<F, OBD, ReqB, FC, BC> OnEarlyDropFuture<F, OBD, ReqB, FC, BC>
38where
39    OBD: OnBodyDrop<ReqB, Callback = BC>,
40    FC: OnDropCallback,
41    BC: OnDropCallback,
42{
43    pub(crate) fn new(
44        inner: F,
45        future_callback: FC,
46        on_body_drop: OBD,
47        intermediate: OBD::Intermediate,
48    ) -> Self {
49        Self {
50            inner,
51            future_guard: Some(OnEarlyDropGuard::new(future_callback)),
52            intermediate: Some(intermediate),
53            on_body_drop: Some(on_body_drop),
54            _phantom: std::marker::PhantomData,
55        }
56    }
57}
58
59impl<F, OBD, B, E, ReqB, FC, BC> Future for OnEarlyDropFuture<F, OBD, ReqB, FC, BC>
60where
61    F: Future<Output = Result<Response<B>, E>>,
62    OBD: OnBodyDrop<ReqB, Callback = BC>,
63    FC: OnDropCallback,
64    BC: OnDropCallback,
65    B: http_body::Body,
66{
67    type Output = Result<Response<OnEarlyDropBody<B, BC>>, E>;
68
69    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
70        let this = self.project();
71
72        let result = match this.inner.poll(cx) {
73            Poll::Ready(result) => result,
74            Poll::Pending => return Poll::Pending,
75        };
76
77        // Inner resolved: suppress the future-drop guard for both Ok and Err.
78        if let Some(guard) = this.future_guard.as_mut() {
79            guard.completed();
80        }
81        *this.future_guard = None;
82
83        match result {
84            Ok(response) => {
85                let (parts, body) = response.into_parts();
86                let intermediate = this
87                    .intermediate
88                    .take()
89                    .expect("intermediate already consumed; OnEarlyDropFuture polled after Ready");
90                let mut hook = this
91                    .on_body_drop
92                    .take()
93                    .expect("on_body_drop already consumed; OnEarlyDropFuture polled after Ready");
94                let callback = hook.make_at_response(intermediate, &parts);
95                let wrapped_body = OnEarlyDropBody::new(body, callback);
96                Poll::Ready(Ok(Response::from_parts(parts, wrapped_body)))
97            }
98            Err(err) => Poll::Ready(Err(err)),
99        }
100    }
101}