hitbox_tower/upstream.rs
1//! Upstream adapter for bridging Tower services to Hitbox.
2//!
3//! This module provides [`TowerUpstream`](crate::upstream::TowerUpstream), an adapter
4//! that implements Hitbox's [`Upstream`] trait for Tower services. This allows the
5//! caching layer to call the wrapped service on cache misses.
6//!
7//! Users typically don't interact with this module directly — it's used internally
8//! by [`CacheService`](crate::service::CacheService).
9//!
10//! [`Upstream`]: hitbox_core::Upstream
11
12use std::future::Future;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
17use futures::ready;
18use hitbox_core::Upstream;
19use hitbox_http::{BufferedBody, CacheableHttpRequest, CacheableHttpResponse};
20use http::{Request, Response};
21use hyper::body::Body as HttpBody;
22use pin_project::pin_project;
23use tower::Service;
24
25/// Future returned by [`TowerUpstream::call`].
26///
27/// Wraps the underlying service future and converts the response from
28/// `Response<ResBody>` to [`CacheableHttpResponse<ResBody>`].
29///
30/// # When You'll Encounter This
31///
32/// You typically don't create this directly. It appears as the `Future` type
33/// in [`TowerUpstream`]'s [`Upstream`] implementation.
34///
35/// [`Upstream`]: hitbox_core::Upstream
36/// [`CacheableHttpResponse<ResBody>`]: hitbox_http::CacheableHttpResponse
37#[pin_project]
38pub struct TowerUpstreamFuture<F, ResBody, E> {
39 #[pin]
40 inner: F,
41 _phantom: PhantomData<(ResBody, E)>,
42}
43
44impl<F, ResBody, E> TowerUpstreamFuture<F, ResBody, E> {
45 /// Creates a new future wrapping the service's response future.
46 pub fn new(inner: F) -> Self {
47 Self {
48 inner,
49 _phantom: PhantomData,
50 }
51 }
52}
53
54impl<F, ResBody, E> Future for TowerUpstreamFuture<F, ResBody, E>
55where
56 F: Future<Output = Result<Response<ResBody>, E>>,
57 ResBody: HttpBody,
58{
59 type Output = Result<CacheableHttpResponse<ResBody>, E>;
60
61 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62 let this = self.project();
63 match ready!(this.inner.poll(cx)) {
64 Ok(response) => {
65 let (parts, body) = response.into_parts();
66 let buffered = Response::from_parts(parts, BufferedBody::Passthrough(body));
67 Poll::Ready(Ok(CacheableHttpResponse::from_response(buffered)))
68 }
69 Err(e) => Poll::Ready(Err(e)),
70 }
71 }
72}
73
/// Adapter that implements Hitbox's [`Upstream`] trait for Tower services.
///
/// `TowerUpstream` bridges the gap between Tower's [`Service`] trait and Hitbox's
/// [`Upstream`] trait. It converts [`CacheableHttpRequest`] to `http::Request`
/// for the service call, and wraps responses in [`CacheableHttpResponse`].
///
/// # When You'll Encounter This
///
/// You typically don't create this directly. It's used internally by
/// [`CacheService`](crate::service::CacheService) to call the upstream service
/// on cache misses.
///
/// # Type Parameters
///
/// * `S` - The Tower service being adapted
/// * `ReqBody` - Request body type
/// * `ResBody` - Response body type
///
/// # Auto Traits
///
/// Only the service `S` is stored, so whether the adapter is `Send`/`Sync`
/// is determined by `S` alone and not by the body type parameters.
///
/// [`Upstream`]: hitbox_core::Upstream
/// [`Service`]: tower::Service
/// [`CacheableHttpRequest`]: hitbox_http::CacheableHttpRequest
/// [`CacheableHttpResponse`]: hitbox_http::CacheableHttpResponse
pub struct TowerUpstream<S, ReqBody, ResBody> {
    // The wrapped Tower service that is invoked for every upstream call.
    service: S,
    // The body types are never stored here; they only select which `Upstream`
    // impl applies. `fn() -> T` keeps them covariant without making the
    // adapter's `Send`/`Sync` depend on them (a plain `PhantomData<(A, B)>`
    // would claim ownership of both).
    _phantom: PhantomData<fn() -> (ReqBody, ResBody)>,
}
100
101impl<S, ReqBody, ResBody> TowerUpstream<S, ReqBody, ResBody> {
102 /// Creates a new upstream adapter wrapping the given service.
103 pub fn new(service: S) -> Self {
104 Self {
105 service,
106 _phantom: PhantomData,
107 }
108 }
109}
110
111impl<S, ReqBody, ResBody> Upstream<CacheableHttpRequest<ReqBody>>
112 for TowerUpstream<S, ReqBody, ResBody>
113where
114 S: Service<Request<BufferedBody<ReqBody>>, Response = Response<ResBody>>
115 + Clone
116 + Send
117 + 'static,
118 S::Future: Send,
119 S::Error: Send,
120 ReqBody: HttpBody + Send + 'static,
121 ReqBody::Error: Send,
122 ResBody: HttpBody + Send + 'static,
123{
124 type Response = Result<CacheableHttpResponse<ResBody>, S::Error>;
125 type Future = TowerUpstreamFuture<S::Future, ResBody, S::Error>;
126
127 fn call(&mut self, req: CacheableHttpRequest<ReqBody>) -> Self::Future {
128 let http_req = req.into_request();
129 let inner = self.service.call(http_req);
130 TowerUpstreamFuture::new(inner)
131 }
132}