Skip to main content

hitbox_tower/
upstream.rs

1//! Upstream adapter for bridging Tower services to Hitbox.
2//!
3//! This module provides [`TowerUpstream`](crate::upstream::TowerUpstream), an adapter
4//! that implements Hitbox's [`Upstream`] trait for Tower services. This allows the
5//! caching layer to call the wrapped service on cache misses.
6//!
7//! Users typically don't interact with this module directly — it's used internally
8//! by [`CacheService`](crate::service::CacheService).
9//!
10//! [`Upstream`]: hitbox_core::Upstream
11
12use std::future::Future;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use futures::ready;
18use hitbox_core::Upstream;
19use hitbox_http::{BufferedBody, CacheableHttpRequest, CacheableHttpResponse};
20use http::{Request, Response};
21use hyper::body::Body as HttpBody;
22use pin_project::pin_project;
23use tower::Service;
24
25/// Future returned by [`TowerUpstream::call`].
26///
27/// Wraps the underlying service future and converts the response from
28/// `Response<ResBody>` to [`CacheableHttpResponse<ResBody>`].
29///
30/// # When You'll Encounter This
31///
32/// You typically don't create this directly. It appears as the `Future` type
33/// in [`TowerUpstream`]'s [`Upstream`] implementation.
34///
35/// [`Upstream`]: hitbox_core::Upstream
36/// [`CacheableHttpResponse<ResBody>`]: hitbox_http::CacheableHttpResponse
37#[pin_project]
38pub struct TowerUpstreamFuture<F, ResBody, E> {
39    #[pin]
40    inner: F,
41    _phantom: PhantomData<(ResBody, E)>,
42}
43
44impl<F, ResBody, E> TowerUpstreamFuture<F, ResBody, E> {
45    /// Creates a new future wrapping the service's response future.
46    pub fn new(inner: F) -> Self {
47        Self {
48            inner,
49            _phantom: PhantomData,
50        }
51    }
52}
53
54impl<F, ResBody, E> Future for TowerUpstreamFuture<F, ResBody, E>
55where
56    F: Future<Output = Result<Response<ResBody>, E>>,
57    ResBody: HttpBody,
58{
59    type Output = Result<CacheableHttpResponse<ResBody>, E>;
60
61    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62        let this = self.project();
63        match ready!(this.inner.poll(cx)) {
64            Ok(response) => {
65                let (parts, body) = response.into_parts();
66                let buffered = Response::from_parts(parts, BufferedBody::Passthrough(body));
67                Poll::Ready(Ok(CacheableHttpResponse::from_response(buffered)))
68            }
69            Err(e) => Poll::Ready(Err(e)),
70        }
71    }
72}
73
74/// Adapter that implements Hitbox's [`Upstream`] trait for Tower services.
75///
76/// `TowerUpstream` bridges the gap between Tower's [`Service`] trait and Hitbox's
77/// [`Upstream`] trait. It converts [`CacheableHttpRequest`] to `http::Request`
78/// for the service call, and wraps responses in [`CacheableHttpResponse`].
79///
80/// # When You'll Encounter This
81///
82/// You typically don't create this directly. It's used internally by
83/// [`CacheService`](crate::service::CacheService) to call the upstream service
84/// on cache misses.
85///
86/// # Type Parameters
87///
88/// * `S` - The Tower service being adapted
89/// * `ReqBody` - Request body type
90/// * `ResBody` - Response body type
91///
92/// [`Upstream`]: hitbox_core::Upstream
93/// [`Service`]: tower::Service
94/// [`CacheableHttpRequest`]: hitbox_http::CacheableHttpRequest
95/// [`CacheableHttpResponse`]: hitbox_http::CacheableHttpResponse
96pub struct TowerUpstream<S, ReqBody, ResBody> {
97    service: S,
98    _phantom: PhantomData<(ReqBody, ResBody)>,
99}
100
101impl<S, ReqBody, ResBody> TowerUpstream<S, ReqBody, ResBody> {
102    /// Creates a new upstream adapter wrapping the given service.
103    pub fn new(service: S) -> Self {
104        Self {
105            service,
106            _phantom: PhantomData,
107        }
108    }
109}
110
111impl<S, ReqBody, ResBody> Upstream<CacheableHttpRequest<ReqBody>>
112    for TowerUpstream<S, ReqBody, ResBody>
113where
114    S: Service<Request<BufferedBody<ReqBody>>, Response = Response<ResBody>>
115        + Clone
116        + Send
117        + 'static,
118    S::Future: Send,
119    S::Error: Send,
120    ReqBody: HttpBody + Send + 'static,
121    ReqBody::Error: Send,
122    ResBody: HttpBody + Send + 'static,
123{
124    type Response = Result<CacheableHttpResponse<ResBody>, S::Error>;
125    type Future = TowerUpstreamFuture<S::Future, ResBody, S::Error>;
126
127    fn call(&mut self, req: CacheableHttpRequest<ReqBody>) -> Self::Future {
128        let http_req = req.into_request();
129        let inner = self.service.call(http_req);
130        TowerUpstreamFuture::new(inner)
131    }
132}