Skip to main content

hitbox_reqwest/
upstream.rs

1//! Upstream wrapper for reqwest-middleware's Next type.
2//!
3//! This module provides [`ReqwestUpstream`] which bridges the gap between
4//! hitbox's [`Upstream`] trait and reqwest-middleware's [`Next`] type.
5//!
6//! # Overview
7//!
8//! When the cache middleware needs to fetch data from the actual HTTP endpoint
9//! (on cache miss or stale), it uses [`ReqwestUpstream`] to:
10//!
11//! 1. Convert [`CacheableHttpRequest`] back to [`reqwest::Request`]
12//! 2. Call the next middleware in the chain via [`Next::run`]
13//! 3. Convert [`reqwest::Response`] to [`CacheableHttpResponse`]
14//!
15//! [`Upstream`]: hitbox_core::Upstream
16//! [`Next`]: reqwest_middleware::Next
17//! [`CacheableHttpRequest`]: hitbox_http::CacheableHttpRequest
18//! [`CacheableHttpResponse`]: hitbox_http::CacheableHttpResponse
19
20use std::future::Future;
21use std::pin::Pin;
22
23use bytes::Bytes;
24use hitbox_core::Upstream;
25use hitbox_http::{BufferedBody, CacheableHttpRequest, CacheableHttpResponse};
26use http::Extensions;
27use reqwest_middleware::{Next, Result};
28
29/// Upstream wrapper that bridges reqwest-middleware's [`Next`] to hitbox's [`Upstream`] trait.
30///
31/// This adapter allows the hitbox cache FSM to call the remaining middleware
32/// chain when it needs to fetch fresh data from upstream.
33///
34/// # Type Parameter
35///
36/// The lifetime `'a` comes from [`Next<'a>`], representing the middleware
37/// chain's lifetime. This is why [`DisabledOffload`] is used in the middleware -
38/// we cannot spawn background tasks with non-`'static` lifetimes.
39///
40/// [`Next`]: reqwest_middleware::Next
41/// [`DisabledOffload`]: hitbox_core::DisabledOffload
42pub struct ReqwestUpstream<'a> {
43    next: Next<'a>,
44    extensions: Extensions,
45}
46
47impl<'a> ReqwestUpstream<'a> {
48    /// Creates a new upstream wrapper. Typically called internally by
49    /// [`CacheMiddleware`](crate::CacheMiddleware).
50    pub fn new(next: Next<'a>, extensions: Extensions) -> Self {
51        Self { next, extensions }
52    }
53}
54
55/// Implementation of [`Upstream`] for reqwest-middleware integration.
56///
57/// This allows the hitbox cache FSM to treat the remaining middleware chain
58/// as an upstream service that can be called on cache misses.
59///
60/// [`Upstream`]: hitbox_core::Upstream
61impl<'a> Upstream<CacheableHttpRequest<reqwest::Body>> for ReqwestUpstream<'a> {
62    type Response = Result<CacheableHttpResponse<reqwest::Body>>;
63    type Future = Pin<Box<dyn Future<Output = Self::Response> + Send + 'a>>;
64
65    fn call(&mut self, req: CacheableHttpRequest<reqwest::Body>) -> Self::Future {
66        let next = self.next.clone();
67        let mut extensions = std::mem::take(&mut self.extensions);
68
69        Box::pin(async move {
70            // Convert CacheableHttpRequest back to reqwest::Request
71            let http_request = req.into_request();
72            let (parts, buffered_body) = http_request.into_parts();
73
74            // Convert BufferedBody back to reqwest::Body
75            let body = buffered_body_to_reqwest(buffered_body);
76
77            // Reconstruct http::Request and convert to reqwest::Request
78            let http_request = http::Request::from_parts(parts, body);
79            let reqwest_request: reqwest::Request = http_request
80                .try_into()
81                .map_err(|e: reqwest::Error| reqwest_middleware::Error::Reqwest(e))?;
82
83            // Call the next middleware
84            let response = next.run(reqwest_request, &mut extensions).await?;
85
86            // Convert reqwest::Response to CacheableHttpResponse
87            let http_response: http::Response<reqwest::Body> = response.into();
88            let (parts, body) = http_response.into_parts();
89            let buffered_body = BufferedBody::Passthrough(body);
90            let http_response = http::Response::from_parts(parts, buffered_body);
91
92            Ok(CacheableHttpResponse::from_response(http_response))
93        })
94    }
95}
96
97/// Converts a [`BufferedBody`] to [`reqwest::Body`].
98///
99/// # Performance
100///
101/// This conversion is cheap for most cases:
102///
103/// - **Passthrough**: Unwraps the inner body with zero overhead
104/// - **Complete**: Creates a body from the buffered bytes
105/// - **Partial**: Wraps a [`PartialBufferedBody`] which implements [`http_body::Body`],
106///   yielding the buffered prefix first, then the remaining stream
107///
108/// [`BufferedBody`]: hitbox_http::BufferedBody
109/// [`PartialBufferedBody`]: hitbox_http::PartialBufferedBody
110pub fn buffered_body_to_reqwest(buffered: BufferedBody<reqwest::Body>) -> reqwest::Body {
111    match buffered {
112        BufferedBody::Passthrough(body) => body,
113        BufferedBody::Complete(Some(bytes)) => reqwest::Body::from(bytes),
114        BufferedBody::Complete(None) => reqwest::Body::from(Bytes::new()),
115        BufferedBody::Partial(partial) => {
116            // PartialBufferedBody implements HttpBody, handling:
117            // - prefix bytes (yielded first)
118            // - remaining stream OR error
119            reqwest::Body::wrap(partial)
120        }
121    }
122}