Skip to main content

tower_http/on_early_drop/
body.rs

1//! Response body wrapper for [`OnEarlyDropService`].
2//!
3//! [`OnEarlyDropService`]: super::OnEarlyDropService
4
5use crate::on_early_drop::guard::OnEarlyDropGuard;
6use crate::on_early_drop::traits::OnDropCallback;
7use http_body::{Body, Frame};
8use pin_project_lite::pin_project;
9use std::{
10    pin::Pin,
11    task::{ready, Context, Poll},
12};
13
14pin_project! {
15    /// Response body for [`OnEarlyDropService`]. Fires its callback if
16    /// dropped before reaching end-of-stream.
17    ///
18    /// Bodies that already report [`Body::is_end_stream`] at construction
19    /// (HEAD requests, 204 responses, etc.) never fire.
20    ///
21    /// [`OnEarlyDropService`]: super::OnEarlyDropService
22    pub struct OnEarlyDropBody<B, Callback>
23    where
24        Callback: OnDropCallback,
25    {
26        #[pin]
27        inner: B,
28        guard: OnEarlyDropGuard<Callback>,
29    }
30}
31
32impl<B, Callback> OnEarlyDropBody<B, Callback>
33where
34    Callback: OnDropCallback,
35{
36    /// Wrap `body` with a drop callback.
37    pub(crate) fn new(body: B, callback: Callback) -> Self
38    where
39        B: Body,
40    {
41        let mut guard = OnEarlyDropGuard::new(callback);
42        if body.is_end_stream() {
43            guard.completed();
44        }
45        Self { inner: body, guard }
46    }
47}
48
49impl<B, Callback> Body for OnEarlyDropBody<B, Callback>
50where
51    B: Body,
52    Callback: OnDropCallback,
53{
54    type Data = B::Data;
55    type Error = B::Error;
56
57    fn poll_frame(
58        self: Pin<&mut Self>,
59        cx: &mut Context<'_>,
60    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
61        let this = self.project();
62        let result = ready!(this.inner.poll_frame(cx));
63        // End-of-stream (Ready(None)) or body-level error (Ready(Some(Err)))
64        // both mean the body will not yield more frames. Suppress the guard
65        // in either case; service-level errors are out of scope for this
66        // middleware.
67        if matches!(result, None | Some(Err(_))) {
68            this.guard.completed();
69        }
70        Poll::Ready(result)
71    }
72
73    fn is_end_stream(&self) -> bool {
74        self.inner.is_end_stream()
75    }
76
77    fn size_hint(&self) -> http_body::SizeHint {
78        self.inner.size_hint()
79    }
80}