hitbox_reqwest/upstream.rs
1//! Upstream wrapper for reqwest-middleware's Next type.
2//!
3//! This module provides [`ReqwestUpstream`] which bridges the gap between
4//! hitbox's [`Upstream`] trait and reqwest-middleware's [`Next`] type.
5//!
6//! # Overview
7//!
8//! When the cache middleware needs to fetch data from the actual HTTP endpoint
9//! (on cache miss or stale), it uses [`ReqwestUpstream`] to:
10//!
11//! 1. Convert [`CacheableHttpRequest`] back to [`reqwest::Request`]
12//! 2. Call the next middleware in the chain via [`Next::run`]
13//! 3. Convert [`reqwest::Response`] to [`CacheableHttpResponse`]
14//!
15//! [`Upstream`]: hitbox_core::Upstream
16//! [`Next`]: reqwest_middleware::Next
17//! [`CacheableHttpRequest`]: hitbox_http::CacheableHttpRequest
18//! [`CacheableHttpResponse`]: hitbox_http::CacheableHttpResponse
19
20use std::future::Future;
21use std::pin::Pin;
22
23use bytes::Bytes;
24use hitbox_core::Upstream;
25use hitbox_http::{BufferedBody, CacheableHttpRequest, CacheableHttpResponse};
26use http::Extensions;
27use reqwest_middleware::{Next, Result};
28
29/// Upstream wrapper that bridges reqwest-middleware's [`Next`] to hitbox's [`Upstream`] trait.
30///
31/// This adapter allows the hitbox cache FSM to call the remaining middleware
32/// chain when it needs to fetch fresh data from upstream.
33///
34/// # Type Parameter
35///
36/// The lifetime `'a` comes from [`Next<'a>`], representing the middleware
37/// chain's lifetime. This is why [`DisabledOffload`] is used in the middleware -
38/// we cannot spawn background tasks with non-`'static` lifetimes.
39///
40/// [`Next`]: reqwest_middleware::Next
41/// [`DisabledOffload`]: hitbox_core::DisabledOffload
42pub struct ReqwestUpstream<'a> {
43 next: Next<'a>,
44 extensions: Extensions,
45}
46
47impl<'a> ReqwestUpstream<'a> {
48 /// Creates a new upstream wrapper. Typically called internally by
49 /// [`CacheMiddleware`](crate::CacheMiddleware).
50 pub fn new(next: Next<'a>, extensions: Extensions) -> Self {
51 Self { next, extensions }
52 }
53}
54
55/// Implementation of [`Upstream`] for reqwest-middleware integration.
56///
57/// This allows the hitbox cache FSM to treat the remaining middleware chain
58/// as an upstream service that can be called on cache misses.
59///
60/// [`Upstream`]: hitbox_core::Upstream
61impl<'a> Upstream<CacheableHttpRequest<reqwest::Body>> for ReqwestUpstream<'a> {
62 type Response = Result<CacheableHttpResponse<reqwest::Body>>;
63 type Future = Pin<Box<dyn Future<Output = Self::Response> + Send + 'a>>;
64
65 fn call(&mut self, req: CacheableHttpRequest<reqwest::Body>) -> Self::Future {
66 let next = self.next.clone();
67 let mut extensions = std::mem::take(&mut self.extensions);
68
69 Box::pin(async move {
70 // Convert CacheableHttpRequest back to reqwest::Request
71 let http_request = req.into_request();
72 let (parts, buffered_body) = http_request.into_parts();
73
74 // Convert BufferedBody back to reqwest::Body
75 let body = buffered_body_to_reqwest(buffered_body);
76
77 // Reconstruct http::Request and convert to reqwest::Request
78 let http_request = http::Request::from_parts(parts, body);
79 let reqwest_request: reqwest::Request = http_request
80 .try_into()
81 .map_err(|e: reqwest::Error| reqwest_middleware::Error::Reqwest(e))?;
82
83 // Call the next middleware
84 let response = next.run(reqwest_request, &mut extensions).await?;
85
86 // Convert reqwest::Response to CacheableHttpResponse
87 let http_response: http::Response<reqwest::Body> = response.into();
88 let (parts, body) = http_response.into_parts();
89 let buffered_body = BufferedBody::Passthrough(body);
90 let http_response = http::Response::from_parts(parts, buffered_body);
91
92 Ok(CacheableHttpResponse::from_response(http_response))
93 })
94 }
95}
96
97/// Converts a [`BufferedBody`] to [`reqwest::Body`].
98///
99/// # Performance
100///
101/// This conversion is cheap for most cases:
102///
103/// - **Passthrough**: Unwraps the inner body with zero overhead
104/// - **Complete**: Creates a body from the buffered bytes
105/// - **Partial**: Wraps a [`PartialBufferedBody`] which implements [`http_body::Body`],
106/// yielding the buffered prefix first, then the remaining stream
107///
108/// [`BufferedBody`]: hitbox_http::BufferedBody
109/// [`PartialBufferedBody`]: hitbox_http::PartialBufferedBody
110pub fn buffered_body_to_reqwest(buffered: BufferedBody<reqwest::Body>) -> reqwest::Body {
111 match buffered {
112 BufferedBody::Passthrough(body) => body,
113 BufferedBody::Complete(Some(bytes)) => reqwest::Body::from(bytes),
114 BufferedBody::Complete(None) => reqwest::Body::from(Bytes::new()),
115 BufferedBody::Partial(partial) => {
116 // PartialBufferedBody implements HttpBody, handling:
117 // - prefix bytes (yielded first)
118 // - remaining stream OR error
119 reqwest::Body::wrap(partial)
120 }
121 }
122}