Skip to main content

hitbox_tower/
service.rs

1//! Tower service implementation for HTTP caching.
2//!
3//! This module provides [`CacheService`](crate::service::CacheService), the Tower
4//! `Service` that performs the actual caching logic. Users typically don't construct
5//! this directly — it's created by the [`Cache`](crate::Cache) layer.
6
7use hitbox::concurrency::ConcurrencyManager;
8use hitbox::config::CacheConfig;
9use hitbox_core::{DisabledOffload, Offload};
10use std::sync::Arc;
11
12use hitbox::{backend::CacheBackend, fsm::CacheFuture};
13use hitbox_http::{BufferedBody, CacheableHttpRequest, CacheableHttpResponse};
14use http::header::HeaderName;
15use http::{Request, Response};
16use hyper::body::Body as HttpBody;
17use tower::Service;
18
19use crate::future::CacheServiceFuture;
20use crate::upstream::TowerUpstream;
21
22/// Tower [`Service`] that wraps an upstream service with caching.
23///
24/// `CacheService` intercepts HTTP requests, checks the cache, and either
25/// returns cached responses or forwards requests to the upstream service.
26/// It adds a cache status header (`HIT`/`MISS`/`STALE`) to every response.
27///
28/// # When You'll Encounter This
29///
30/// You typically don't create this directly. It's produced when you apply
31/// a [`Cache`] layer to a service via [`tower::ServiceBuilder`].
32///
33/// # Type Parameters
34///
35/// * `S` - The upstream Tower service being wrapped
36/// * `B` - Cache backend (e.g., [`MokaBackend`])
37/// * `C` - Configuration with predicates, extractors, and policy
38/// * `CM` - Concurrency manager for dogpile prevention
39/// * `O` - Offload strategy for background revalidation
40///
41/// [`Cache`]: crate::Cache
42/// [`Service`]: tower::Service
43/// [`MokaBackend`]: hitbox_moka::MokaBackend
44pub struct CacheService<S, B, C, CM, O = DisabledOffload> {
45    upstream: S,
46    backend: Arc<B>,
47    configuration: C,
48    offload: O,
49    concurrency_manager: CM,
50    cache_status_header: HeaderName,
51}
52
53impl<S, B, C, CM, O> CacheService<S, B, C, CM, O> {
54    /// Creates a new cache service wrapping the given upstream.
55    ///
56    /// Prefer using [`Cache::builder()`] and [`tower::ServiceBuilder`] instead
57    /// of constructing this directly.
58    ///
59    /// [`Cache::builder()`]: crate::Cache::builder
60    pub fn new(
61        upstream: S,
62        backend: Arc<B>,
63        configuration: C,
64        offload: O,
65        concurrency_manager: CM,
66        cache_status_header: HeaderName,
67    ) -> Self {
68        CacheService {
69            upstream,
70            backend,
71            configuration,
72            offload,
73            concurrency_manager,
74            cache_status_header,
75        }
76    }
77}
78
79impl<S, B, C, CM, O> Clone for CacheService<S, B, C, CM, O>
80where
81    S: Clone,
82    B: Clone,
83    C: Clone,
84    CM: Clone,
85    O: Clone,
86{
87    fn clone(&self) -> Self {
88        Self {
89            upstream: self.upstream.clone(),
90            backend: self.backend.clone(),
91            configuration: self.configuration.clone(),
92            offload: self.offload.clone(),
93            concurrency_manager: self.concurrency_manager.clone(),
94            cache_status_header: self.cache_status_header.clone(),
95        }
96    }
97}
98
99impl<S, B, C, CM, O, ReqBody, ResBody> Service<Request<ReqBody>> for CacheService<S, B, C, CM, O>
100where
101    S: Service<Request<BufferedBody<ReqBody>>, Response = Response<ResBody>>
102        + Clone
103        + Send
104        + 'static,
105    B: CacheBackend + Clone + Send + Sync + 'static,
106    S::Future: Send,
107    C: CacheConfig<CacheableHttpRequest<ReqBody>, CacheableHttpResponse<ResBody>>,
108    CM: ConcurrencyManager<Result<CacheableHttpResponse<ResBody>, S::Error>> + Clone + 'static,
109    O: Offload<'static> + Clone,
110    ReqBody: HttpBody + Send + 'static,
111    ReqBody::Error: Send,
112    ResBody: HttpBody + Send + 'static,
113    ResBody::Error: Send,
114    ResBody::Data: Send,
115    S::Error: Send,
116{
117    type Response = Response<BufferedBody<ResBody>>;
118    type Error = S::Error;
119    type Future = CacheServiceFuture<
120        CacheFuture<
121            'static,
122            B,
123            CacheableHttpRequest<ReqBody>,
124            Result<CacheableHttpResponse<ResBody>, S::Error>,
125            TowerUpstream<S, ReqBody, ResBody>,
126            C::RequestPredicate,
127            C::ResponsePredicate,
128            C::Extractor,
129            CM,
130            O,
131        >,
132        ResBody,
133        S::Error,
134    >;
135
136    fn poll_ready(
137        &mut self,
138        cx: &mut std::task::Context<'_>,
139    ) -> std::task::Poll<Result<(), Self::Error>> {
140        self.upstream.poll_ready(cx)
141    }
142
143    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
144        let configuration = &self.configuration;
145
146        // Convert incoming Request<ReqBody> to CacheableHttpRequest<ReqBody>
147        let (parts, body) = req.into_parts();
148        let buffered_request = Request::from_parts(parts, BufferedBody::Passthrough(body));
149        let cacheable_req = CacheableHttpRequest::from_request(buffered_request);
150
151        // Create upstream adapter that handles Tower service calls
152        let upstream = TowerUpstream::new(self.upstream.clone());
153
154        // Create CacheFuture with cacheable types only
155        let cache_future = CacheFuture::new(
156            self.backend.clone(),
157            cacheable_req,
158            upstream,
159            configuration.request_predicates(),
160            configuration.response_predicates(),
161            configuration.extractors(),
162            Arc::new(configuration.policy().clone()),
163            self.offload.clone(),
164            self.concurrency_manager.clone(),
165        );
166
167        // Wrap in CacheServiceFuture to add cache headers
168        CacheServiceFuture::new(cache_future, self.cache_status_header.clone())
169    }
170}