tower_http/on_early_drop/
future.rs1use crate::on_early_drop::body::OnEarlyDropBody;
6use crate::on_early_drop::guard::OnEarlyDropGuard;
7use crate::on_early_drop::traits::{OnBodyDrop, OnDropCallback};
8use http::Response;
9use pin_project_lite::pin_project;
10use std::future::Future;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14pin_project! {
15 pub struct OnEarlyDropFuture<F, OBD, ReqB, FC, BC>
19 where
20 OBD: OnBodyDrop<ReqB, Callback = BC>,
21 FC: OnDropCallback,
22 BC: OnDropCallback,
23 {
24 #[pin]
25 inner: F,
26 future_guard: Option<OnEarlyDropGuard<FC>>,
28 intermediate: Option<OBD::Intermediate>,
30 on_body_drop: Option<OBD>,
32 _phantom: std::marker::PhantomData<fn(ReqB)>,
34 }
35}
36
37impl<F, OBD, ReqB, FC, BC> OnEarlyDropFuture<F, OBD, ReqB, FC, BC>
38where
39 OBD: OnBodyDrop<ReqB, Callback = BC>,
40 FC: OnDropCallback,
41 BC: OnDropCallback,
42{
43 pub(crate) fn new(
44 inner: F,
45 future_callback: FC,
46 on_body_drop: OBD,
47 intermediate: OBD::Intermediate,
48 ) -> Self {
49 Self {
50 inner,
51 future_guard: Some(OnEarlyDropGuard::new(future_callback)),
52 intermediate: Some(intermediate),
53 on_body_drop: Some(on_body_drop),
54 _phantom: std::marker::PhantomData,
55 }
56 }
57}
58
59impl<F, OBD, B, E, ReqB, FC, BC> Future for OnEarlyDropFuture<F, OBD, ReqB, FC, BC>
60where
61 F: Future<Output = Result<Response<B>, E>>,
62 OBD: OnBodyDrop<ReqB, Callback = BC>,
63 FC: OnDropCallback,
64 BC: OnDropCallback,
65 B: http_body::Body,
66{
67 type Output = Result<Response<OnEarlyDropBody<B, BC>>, E>;
68
69 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
70 let this = self.project();
71
72 let result = match this.inner.poll(cx) {
73 Poll::Ready(result) => result,
74 Poll::Pending => return Poll::Pending,
75 };
76
77 if let Some(guard) = this.future_guard.as_mut() {
79 guard.completed();
80 }
81 *this.future_guard = None;
82
83 match result {
84 Ok(response) => {
85 let (parts, body) = response.into_parts();
86 let intermediate = this
87 .intermediate
88 .take()
89 .expect("intermediate already consumed; OnEarlyDropFuture polled after Ready");
90 let mut hook = this
91 .on_body_drop
92 .take()
93 .expect("on_body_drop already consumed; OnEarlyDropFuture polled after Ready");
94 let callback = hook.make_at_response(intermediate, &parts);
95 let wrapped_body = OnEarlyDropBody::new(body, callback);
96 Poll::Ready(Ok(Response::from_parts(parts, wrapped_body)))
97 }
98 Err(err) => Poll::Ready(Err(err)),
99 }
100 }
101}