Skip to main content

http_cache_tower/
lib.rs

1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     // Create cache manager with disk storage
28//!     let cache_manager = CACacheManager::new("./cache".into(), true);
29//!     
30//!     // Create cache layer
31//!     let cache_layer = HttpCacheLayer::new(cache_manager);
32//!     
33//!     // Build service with caching
34//!     let service = ServiceBuilder::new()
35//!         .layer(cache_layer)
36//!         .service_fn(handler);
37//!     
38//!     // Use the service
39//!     let request = Request::builder()
40//!         .uri("http://example.com")
41//!         .body(Full::new(Bytes::new()))
42//!         .unwrap();
43//!     let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//!     mode: CacheMode::Default,
61//!     manager: cache_manager,
62//!     options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust,ignore
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//!
78//! # #[tokio::main]
79//! # async fn main() {
80//! // Create streaming cache setup
81//! let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
82//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
83//!
84//! // Use with your service
85//! // let service = streaming_layer.layer(your_service);
86//! # }
87//! ```
88//!
89//! ## Cache Modes
90//!
91//! Different cache modes provide different behaviors:
92//!
93//! - `CacheMode::Default`: Follow HTTP caching rules strictly
94//! - `CacheMode::NoStore`: Never cache responses
95//! - `CacheMode::NoCache`: Always revalidate with the origin server
96//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
97//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
98//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
99//!
100//! ## Cache Invalidation
101//!
//! The middleware automatically invalidates cached entries when a request with an
//! unsafe HTTP method receives a successful (2xx) or redirection (3xx) response:
103//!
104//! ```text
105//! These methods will invalidate any cached GET response for the same URI:
106//! - PUT /api/users/123    -> invalidates GET /api/users/123
107//! - POST /api/users/123   -> invalidates GET /api/users/123  
108//! - DELETE /api/users/123 -> invalidates GET /api/users/123
109//! - PATCH /api/users/123  -> invalidates GET /api/users/123
110//! ```
111//!
112//! ## Integration with Other Tower Layers
113//!
114//! The cache layer works with other Tower middleware:
115//!
116//! ```rust,no_run
117//! use tower::ServiceBuilder;
118//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
119//! use tower::service_fn;
120//! use tower::ServiceExt;
121//! use http::{Request, Response};
122//! use http_body_util::Full;
123//! use bytes::Bytes;
124//! use std::convert::Infallible;
125//!
126//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
127//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
128//! }
129//!
130//! #[tokio::main]
131//! async fn main() {
132//!     let cache_manager = CACacheManager::new("./cache".into(), true);
133//!     let cache_layer = HttpCacheLayer::new(cache_manager);
134//!
135//!     let service = ServiceBuilder::new()
136//!         // .layer(TraceLayer::new_for_http())  // Logging (requires tower-http)
137//!         // .layer(CompressionLayer::new())     // Compression (requires tower-http)
138//!         .layer(cache_layer)                    // Caching
139//!         .service_fn(handler);
140//!     
141//!     // Use the service
142//!     let request = Request::builder()
143//!         .uri("http://example.com")
144//!         .body(Full::new(Bytes::new()))
145//!         .unwrap();
146//!     let response = service.oneshot(request).await.unwrap();
147//! }
148//! ```
149
150use bytes::Bytes;
151use http::{
152    header::CACHE_CONTROL, request, HeaderValue, Method, Request, Response,
153};
154use http_body::Body;
155use http_body_util::BodyExt;
156
157#[cfg(feature = "manager-cacache")]
158pub use http_cache::CACacheManager;
159
160#[cfg(feature = "rate-limiting")]
161pub use http_cache::rate_limiting::{
162    CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
163};
164#[cfg(feature = "streaming")]
165use http_cache::StreamingError;
166use http_cache::{
167    url_parse, BoxError, CacheManager, CacheMode, CacheOptions, HitOrMiss,
168    HttpCache, HttpCacheOptions, HttpResponse, Middleware, Url, XCACHE,
169    XCACHELOOKUP,
170};
171#[cfg(feature = "streaming")]
172use http_cache::{HttpStreamingCache, StreamingCacheManager};
173use http_cache_semantics::CachePolicy;
174use std::{
175    pin::Pin,
176    sync::Arc,
177    task::{Context, Poll},
178    time::SystemTime,
179};
180use tower::{Layer, Service, ServiceExt};
181
182// Re-export unified error types from http-cache core
183pub use http_cache::HttpCacheError;
184
185#[cfg(feature = "streaming")]
186/// Type alias for tower streaming errors, using the unified streaming error system
187pub type TowerStreamingError = http_cache::ClientStreamingError;
188
189/// Helper functions for error conversions
190trait HttpCacheErrorExt<T> {
191    fn cache_err(self) -> Result<T, HttpCacheError>;
192}
193
194impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
195where
196    E: ToString,
197{
198    fn cache_err(self) -> Result<T, HttpCacheError> {
199        self.map_err(|e| HttpCacheError::cache(e.to_string()))
200    }
201}
202
203/// Helper function to add cache status headers to a response
204fn add_cache_status_headers<B>(
205    mut response: Response<HttpCacheBody<B>>,
206    hit_or_miss: &str,
207    cache_lookup: &str,
208) -> Response<HttpCacheBody<B>> {
209    let headers = response.headers_mut();
210    if let Ok(hv) = HeaderValue::from_str(hit_or_miss) {
211        headers.insert(XCACHE, hv);
212    }
213    if let Ok(hv) = HeaderValue::from_str(cache_lookup) {
214        headers.insert(XCACHELOOKUP, hv);
215    }
216    response
217}
218
219/// Middleware adapter that bridges Tower services to the `http_cache::Middleware`
220/// trait, allowing `HttpCache::run` to drive the full cache flow (mode dispatch,
221/// conditional revalidation, 5xx handling, warning headers, etc.) instead of
222/// reimplementing it inline.
223struct TowerMiddleware<S, ReqBody> {
224    parts: request::Parts,
225    body: Option<ReqBody>,
226    service: Option<S>,
227}
228
229impl<S, ReqBody, ResBody> Middleware for TowerMiddleware<S, ReqBody>
230where
231    S: Service<Request<ReqBody>, Response = Response<ResBody>>
232        + Clone
233        + Send
234        + 'static,
235    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
236    S::Future: Send + 'static,
237    ReqBody: Body + Send + 'static,
238    ReqBody::Data: Send,
239    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
240    ResBody: Body + Send + 'static,
241    ResBody::Data: Send,
242    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
243{
244    fn is_method_get_head(&self) -> bool {
245        self.parts.method == Method::GET || self.parts.method == Method::HEAD
246    }
247
248    fn policy(
249        &self,
250        response: &HttpResponse,
251    ) -> http_cache::Result<CachePolicy> {
252        Ok(CachePolicy::new(&self.parts, &response.parts()?))
253    }
254
255    fn policy_with_options(
256        &self,
257        response: &HttpResponse,
258        options: CacheOptions,
259    ) -> http_cache::Result<CachePolicy> {
260        Ok(CachePolicy::new_options(
261            &self.parts,
262            &response.parts()?,
263            SystemTime::now(),
264            options,
265        ))
266    }
267
268    fn update_headers(
269        &mut self,
270        parts: &request::Parts,
271    ) -> http_cache::Result<()> {
272        for (name, value) in parts.headers.iter() {
273            self.parts.headers.insert(name.clone(), value.clone());
274        }
275        Ok(())
276    }
277
278    fn force_no_cache(&mut self) -> http_cache::Result<()> {
279        self.parts
280            .headers
281            .insert(CACHE_CONTROL, HeaderValue::from_static("no-cache"));
282        Ok(())
283    }
284
285    fn parts(&self) -> http_cache::Result<request::Parts> {
286        Ok(self.parts.clone())
287    }
288
289    fn url(&self) -> http_cache::Result<Url> {
290        url_parse(self.parts.uri.to_string().as_str())
291    }
292
293    fn method(&self) -> http_cache::Result<String> {
294        Ok(self.parts.method.as_ref().to_string())
295    }
296
297    async fn remote_fetch(&mut self) -> http_cache::Result<HttpResponse> {
298        let body = self
299            .body
300            .take()
301            .ok_or_else(|| BoxError::from("request body already consumed"))?;
302        let service = self
303            .service
304            .take()
305            .ok_or_else(|| BoxError::from("inner service already consumed"))?;
306
307        let request = Request::from_parts(self.parts.clone(), body);
308        let response = service.oneshot(request).await.map_err(|e| {
309            let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
310            boxed
311        })?;
312
313        let (res_parts, res_body) = response.into_parts();
314        let collected = BodyExt::collect(res_body).await.map_err(|e| {
315            let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
316            boxed
317        })?;
318        let body_bytes = collected.to_bytes().to_vec();
319
320        let url = url_parse(self.parts.uri.to_string().as_str())?;
321        let headers = (&res_parts.headers).into();
322        let status = res_parts.status.as_u16();
323        let version = res_parts.version.try_into()?;
324
325        Ok(HttpResponse {
326            body: body_bytes,
327            headers,
328            status,
329            url,
330            version,
331            metadata: None,
332        })
333    }
334}
335
336/// Convert an [`HttpResponse`] from the cache core into a Tower
337/// `Response<HttpCacheBody<B>>`.
338fn http_response_to_tower_response<B>(
339    http_response: HttpResponse,
340) -> Result<Response<HttpCacheBody<B>>, HttpCacheError> {
341    let mut response = HttpCacheOptions::http_response_to_response(
342        &http_response,
343        HttpCacheBody::Buffered(http_response.body.clone()),
344    )
345    .map_err(HttpCacheError::other)?;
346
347    // Preserve metadata in response extensions
348    if let Some(metadata) = http_response.metadata {
349        response
350            .extensions_mut()
351            .insert(http_cache::HttpCacheMetadata::from(metadata));
352    }
353
354    Ok(response)
355}
356
/// Streaming counterpart of the cache status header helper: sets `XCACHE` to
/// `hit_or_miss` and `XCACHELOOKUP` to `cache_lookup` on any response body
/// type. Strings that are not valid header values are skipped.
#[cfg(feature = "streaming")]
fn add_cache_status_headers_streaming<B>(
    mut response: Response<B>,
    hit_or_miss: &str,
    cache_lookup: &str,
) -> Response<B> {
    for (name, raw) in [(XCACHE, hit_or_miss), (XCACHELOOKUP, cache_lookup)] {
        if let Ok(value) = HeaderValue::from_str(raw) {
            response.headers_mut().insert(name, value);
        }
    }
    response
}
372
373/// HTTP cache layer for Tower services.
374///
375/// This layer implements HTTP caching according to RFC 7234, automatically caching
376/// GET and HEAD responses based on their cache-control headers and invalidating
377/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
378///
379/// # Example
380///
381/// ```rust
382/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
383/// use tower::ServiceBuilder;
384/// use tower::service_fn;
385/// use http::{Request, Response};
386/// use http_body_util::Full;
387/// use bytes::Bytes;
388/// use std::convert::Infallible;
389///
390/// # #[tokio::main]
391/// # async fn main() {
392/// let cache_manager = CACacheManager::new("./cache".into(), true);
393/// let cache_layer = HttpCacheLayer::new(cache_manager);
394///
395/// // Use with ServiceBuilder
396/// let service = ServiceBuilder::new()
397///     .layer(cache_layer)
398///     .service_fn(|_req: Request<Full<Bytes>>| async {
399///         Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
400///     });
401/// # }
402/// ```
403#[derive(Clone)]
404pub struct HttpCacheLayer<CM>
405where
406    CM: CacheManager,
407{
408    cache: Arc<HttpCache<CM>>,
409}
410
411impl<CM> HttpCacheLayer<CM>
412where
413    CM: CacheManager,
414{
415    /// Create a new HTTP cache layer with default configuration.
416    ///
417    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
418    ///
419    /// # Arguments
420    ///
421    /// * `cache_manager` - The cache manager to use for storing responses
422    ///
423    /// # Example
424    ///
425    /// ```rust
426    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
427    ///
428    /// # #[tokio::main]
429    /// # async fn main() {
430    /// let cache_manager = CACacheManager::new("./cache".into(), true);
431    /// let layer = HttpCacheLayer::new(cache_manager);
432    /// # }
433    /// ```
434    pub fn new(cache_manager: CM) -> Self {
435        Self {
436            cache: Arc::new(HttpCache {
437                mode: CacheMode::Default,
438                manager: cache_manager,
439                options: HttpCacheOptions::default(),
440            }),
441        }
442    }
443
444    /// Create a new HTTP cache layer with custom options.
445    ///
446    /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
447    /// through [`HttpCacheOptions`].
448    ///
449    /// # Arguments
450    ///
451    /// * `cache_manager` - The cache manager to use for storing responses
452    /// * `options` - Custom cache options
453    ///
454    /// # Example
455    ///
456    /// ```rust
457    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
458    /// use http_cache::HttpCacheOptions;
459    ///
460    /// # #[tokio::main]
461    /// # async fn main() {
462    /// let cache_manager = CACacheManager::new("./cache".into(), true);
463    ///
464    /// let options = HttpCacheOptions {
465    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
466    ///         format!("custom:{}:{}", req.method, req.uri)
467    ///     })),
468    ///     ..Default::default()
469    /// };
470    ///
471    /// let layer = HttpCacheLayer::with_options(cache_manager, options);
472    /// # }
473    /// ```
474    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
475        Self {
476            cache: Arc::new(HttpCache {
477                mode: CacheMode::Default,
478                manager: cache_manager,
479                options,
480            }),
481        }
482    }
483
484    /// Create a new HTTP cache layer with a pre-configured cache.
485    ///
486    /// This method gives you full control over the cache configuration,
487    /// including the cache mode.
488    ///
489    /// # Arguments
490    ///
491    /// * `cache` - A fully configured HttpCache instance
492    ///
493    /// # Example
494    ///
495    /// ```rust
496    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
497    /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
498    ///
499    /// # #[tokio::main]
500    /// # async fn main() {
501    /// let cache_manager = CACacheManager::new("./cache".into(), true);
502    ///
503    /// let cache = HttpCache {
504    ///     mode: CacheMode::ForceCache,
505    ///     manager: cache_manager,
506    ///     options: HttpCacheOptions::default(),
507    /// };
508    ///
509    /// let layer = HttpCacheLayer::with_cache(cache);
510    /// # }
511    /// ```
512    pub fn with_cache(cache: HttpCache<CM>) -> Self {
513        Self { cache: Arc::new(cache) }
514    }
515}
516
/// Tower [`Layer`] that adds HTTP caching with streaming response support.
///
/// It offers the same caching functionality as [`HttpCacheLayer`] but handles
/// streaming responses, so large responses need not be buffered entirely in
/// memory.
///
/// # Example
///
/// ```rust
/// use http_cache_tower::HttpCacheStreamingLayer;
/// use http_cache::StreamingManager;
/// use tower::ServiceBuilder;
/// use tower::service_fn;
/// use http::{Request, Response};
/// use http_body_util::Full;
/// use bytes::Bytes;
/// use std::convert::Infallible;
///
/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
///     Ok(Response::new(Full::new(Bytes::from("Hello"))))
/// }
///
/// # #[tokio::main]
/// # async fn main() {
/// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
///
/// // Use with ServiceBuilder
/// let service = ServiceBuilder::new()
///     .layer(streaming_layer)
///     .service_fn(handler);
/// # }
/// ```
#[cfg(feature = "streaming")]
#[derive(Clone)]
pub struct HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    /// Shared streaming cache configuration handed to every wrapped service.
    cache: Arc<HttpStreamingCache<CM>>,
}
558
#[cfg(feature = "streaming")]
impl<CM> HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    /// Builds a streaming layer with the default configuration:
    /// [`CacheMode::Default`] and default [`HttpCacheOptions`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::StreamingManager;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
    /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
    /// # }
    /// ```
    pub fn new(cache_manager: CM) -> Self {
        Self::with_options(cache_manager, HttpCacheOptions::default())
    }

    /// Builds a streaming layer that uses [`CacheMode::Default`] together
    /// with caller-supplied [`HttpCacheOptions`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    /// * `options` - Cache options to use instead of the defaults
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
    ///
    /// let options = HttpCacheOptions {
    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
    ///         format!("stream:{}:{}", req.method, req.uri)
    ///     })),
    ///     ..Default::default()
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
    /// # }
    /// ```
    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
        Self::with_cache(HttpStreamingCache {
            mode: CacheMode::Default,
            manager: cache_manager,
            options,
        })
    }

    /// Builds a streaming layer from an already configured
    /// [`HttpStreamingCache`], giving the caller full control over its
    /// configuration.
    ///
    /// # Arguments
    ///
    /// * `cache` - A fully configured `HttpStreamingCache` instance
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
    ///
    /// let cache = HttpStreamingCache {
    ///     mode: CacheMode::ForceCache,
    ///     manager: streaming_manager,
    ///     options: HttpCacheOptions::default(),
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
    /// # }
    /// ```
    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
        let cache = Arc::new(cache);
        Self { cache }
    }
}
664
665impl<S, CM> Layer<S> for HttpCacheLayer<CM>
666where
667    CM: CacheManager,
668{
669    type Service = HttpCacheService<S, CM>;
670
671    fn layer(&self, inner: S) -> Self::Service {
672        HttpCacheService { inner, cache: self.cache.clone() }
673    }
674}
675
#[cfg(feature = "streaming")]
impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    type Service = HttpCacheStreamingService<S, CM>;

    /// Wraps `inner` in an [`HttpCacheStreamingService`] that shares this
    /// layer's streaming cache.
    fn layer(&self, inner: S) -> Self::Service {
        let cache = Arc::clone(&self.cache);
        HttpCacheStreamingService { inner, cache }
    }
}
687
688/// HTTP cache service for Tower/Hyper
689pub struct HttpCacheService<S, CM>
690where
691    CM: CacheManager,
692{
693    inner: S,
694    cache: Arc<HttpCache<CM>>,
695}
696
697impl<S, CM> Clone for HttpCacheService<S, CM>
698where
699    S: Clone,
700    CM: CacheManager,
701{
702    fn clone(&self) -> Self {
703        Self { inner: self.inner.clone(), cache: self.cache.clone() }
704    }
705}
706
/// Streaming HTTP caching service for Tower/Hyper, produced by
/// [`HttpCacheStreamingLayer`].
#[cfg(feature = "streaming")]
pub struct HttpCacheStreamingService<S, CM>
where
    CM: StreamingCacheManager,
{
    /// The wrapped service that performs the actual request.
    inner: S,
    /// Streaming cache configuration shared with the layer and other clones.
    cache: Arc<HttpStreamingCache<CM>>,
}
716
#[cfg(feature = "streaming")]
impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
where
    S: Clone,
    CM: StreamingCacheManager,
{
    /// Clones the inner service and bumps the reference count on the shared
    /// streaming cache; implemented by hand so `CM` itself need not be
    /// `Clone`.
    fn clone(&self) -> Self {
        let Self { inner, cache } = self;
        Self { inner: inner.clone(), cache: Arc::clone(cache) }
    }
}
727
728impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
729    for HttpCacheService<S, CM>
730where
731    S: Service<Request<ReqBody>, Response = Response<ResBody>>
732        + Clone
733        + Send
734        + 'static,
735    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
736    S::Future: Send + 'static,
737    ReqBody: Body + Send + 'static,
738    ReqBody::Data: Send,
739    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
740    ResBody: Body + Send + 'static,
741    ResBody::Data: Send,
742    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
743    CM: CacheManager,
744{
745    type Response = Response<HttpCacheBody<ResBody>>;
746    type Error = HttpCacheError;
747    type Future = Pin<
748        Box<
749            dyn std::future::Future<
750                    Output = Result<Self::Response, Self::Error>,
751                > + Send,
752        >,
753    >;
754
755    fn poll_ready(
756        &mut self,
757        cx: &mut Context<'_>,
758    ) -> Poll<Result<(), Self::Error>> {
759        self.inner.poll_ready(cx).map_err(|e| HttpCacheError::http(e.into()))
760    }
761
762    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
763        let cache = self.cache.clone();
764        let (parts, body) = req.into_parts();
765        let inner_service = self.inner.clone();
766
767        Box::pin(async move {
768            let middleware = TowerMiddleware {
769                parts: parts.clone(),
770                body: Some(body),
771                service: Some(inner_service),
772            };
773
774            let can_cache = cache.can_cache_request(&middleware).cache_err()?;
775
776            if can_cache {
777                // Delegate the full cache orchestration (mode dispatch,
778                // conditional revalidation, 304/5xx handling, warning
779                // headers, rate limiting, cache busting) to the core.
780                let res = cache.run(middleware).await.cache_err()?;
781                http_response_to_tower_response(res)
782            } else {
783                // Not cacheable -- forward directly, then invalidate on
784                // success (RFC 7234 Section 4.4).
785                let parts_for_invalidation = middleware.parts().cache_err()?;
786
787                // Reconstruct the request from the middleware's parts.
788                let body = middleware.body.ok_or_else(|| {
789                    HttpCacheError::cache(
790                        "request body already consumed".to_string(),
791                    )
792                })?;
793                let service = middleware.service.ok_or_else(|| {
794                    HttpCacheError::cache(
795                        "inner service already consumed".to_string(),
796                    )
797                })?;
798                let req = Request::from_parts(parts, body);
799
800                let response = service.oneshot(req).await.map_err(|e| {
801                    let boxed: Box<dyn std::error::Error + Send + Sync> =
802                        e.into();
803                    HttpCacheError::http(boxed)
804                })?;
805
806                // Only invalidate for unsafe methods after successful response (RFC 7234 s4.4)
807                if !parts_for_invalidation.method.is_safe()
808                    && (response.status().is_success()
809                        || response.status().is_redirection())
810                {
811                    cache
812                        .run_no_cache_from_parts(&parts_for_invalidation)
813                        .await
814                        .cache_err()?;
815                }
816
817                let mut response = response.map(HttpCacheBody::Original);
818
819                if cache.options.cache_status_headers {
820                    response = add_cache_status_headers(
821                        response,
822                        HitOrMiss::MISS.to_string().as_ref(),
823                        HitOrMiss::MISS.to_string().as_ref(),
824                    );
825                }
826
827                Ok(response)
828            }
829        })
830    }
831}
832
833// Hyper service implementation for HttpCacheService
834impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
835    for HttpCacheService<S, CM>
836where
837    S: Service<
838            Request<hyper::body::Incoming>,
839            Response = Response<http_body_util::Full<Bytes>>,
840        > + Clone
841        + Send
842        + 'static,
843    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
844    S::Future: Send + 'static,
845    CM: CacheManager,
846{
847    type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
848    type Error = HttpCacheError;
849    type Future = Pin<
850        Box<
851            dyn std::future::Future<
852                    Output = Result<Self::Response, Self::Error>,
853                > + Send,
854        >,
855    >;
856
857    fn call(&self, req: Request<hyper::body::Incoming>) -> Self::Future {
858        // Delegate to the Tower Service impl, which takes &mut self
859        let mut service_clone = self.clone();
860        Box::pin(
861            async move { tower::Service::call(&mut service_clone, req).await },
862        )
863    }
864}
865
866#[cfg(feature = "streaming")]
867impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
868    for HttpCacheStreamingService<S, CM>
869where
870    S: Service<Request<ReqBody>, Response = Response<ResBody>>
871        + Clone
872        + Send
873        + 'static,
874    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
875    S::Future: Send + 'static,
876    ReqBody: Body + Send + 'static,
877    ReqBody::Data: Send,
878    ReqBody::Error: Into<StreamingError>,
879    ResBody: Body + Send + 'static,
880    ResBody::Data: Send,
881    ResBody::Error: Into<StreamingError>,
882    CM: StreamingCacheManager,
883    <CM::Body as http_body::Body>::Data: Send,
884    <CM::Body as http_body::Body>::Error:
885        Into<StreamingError> + Send + Sync + 'static,
886{
887    type Response = Response<CM::Body>;
888    type Error = HttpCacheError;
889    type Future = Pin<
890        Box<
891            dyn std::future::Future<
892                    Output = Result<Self::Response, Self::Error>,
893                > + Send,
894        >,
895    >;
896
897    fn poll_ready(
898        &mut self,
899        cx: &mut Context<'_>,
900    ) -> Poll<Result<(), Self::Error>> {
901        self.inner.poll_ready(cx).map_err(|e| HttpCacheError::http(e.into()))
902    }
903
904    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
905        let cache = self.cache.clone();
906        let (parts, body) = req.into_parts();
907        let inner_service = self.inner.clone();
908
909        Box::pin(async move {
910            // Check whether this request is cacheable.  Non-cacheable
911            // requests (e.g. POST/PUT/DELETE) are forwarded directly and
912            // only trigger cache invalidation on success.
913            let can_cache =
914                cache.can_cache_request(&parts, None).cache_err()?;
915
916            if !can_cache {
917                // Forward the request without cache orchestration.
918                let req = Request::from_parts(parts.clone(), body);
919                let response =
920                    inner_service.oneshot(req).await.map_err(|e| {
921                        let boxed: Box<dyn std::error::Error + Send + Sync> =
922                            e.into();
923                        HttpCacheError::http(boxed)
924                    })?;
925
926                // Only invalidate for unsafe methods after successful response (RFC 7234 s4.4)
927                if !parts.method.is_safe()
928                    && (response.status().is_success()
929                        || response.status().is_redirection())
930                {
931                    cache.run_no_cache(&parts).await.cache_err()?;
932                }
933
934                let mut converted =
935                    cache.manager.convert_body(response).await.cache_err()?;
936
937                if cache.options.cache_status_headers {
938                    converted = add_cache_status_headers_streaming(
939                        converted, "MISS", "MISS",
940                    );
941                }
942
943                return Ok(converted);
944            }
945
946            // Delegate the full cache orchestration (analyse, lookup,
947            // conditional revalidation, 304/200/5xx handling, rate
948            // limiting, warning headers, cache busting) to the core
949            // library.
950            //
951            // The closure is `FnOnce` and called at most once.
952            // We move `body` and `inner_service` directly into the
953            // closure.
954            let result = cache
955                .run(&parts, None, |fetch_req| {
956                    let parts_ref = parts.clone();
957                    async move {
958                        let request_parts = match fetch_req {
959                            http_cache::FetchRequest::Fresh => parts_ref,
960                            http_cache::FetchRequest::FreshNoCache => {
961                                let mut p = parts_ref;
962                                p.headers.insert(
963                                    CACHE_CONTROL,
964                                    HeaderValue::from_static("no-cache"),
965                                );
966                                p
967                            }
968                            http_cache::FetchRequest::Conditional(
969                                cond_parts,
970                            ) => *cond_parts,
971                        };
972
973                        let req = Request::from_parts(request_parts, body);
974
975                        inner_service.oneshot(req).await.map_err(|e| {
976                            let boxed: Box<
977                                dyn std::error::Error + Send + Sync,
978                            > = e.into();
979                            boxed
980                        })
981                    }
982                })
983                .await
984                .cache_err()?;
985
986            Ok(result)
987        })
988    }
989}
990
/// Response body used for cached responses.
///
/// The body is either a byte buffer or the original body `B`.
pub enum HttpCacheBody<B> {
    /// Body bytes taken from the cache and held in a buffer.
    Buffered(Vec<u8>),
    /// The original body, used as the fallback.
    Original(B),
}
998
999impl<B> Body for HttpCacheBody<B>
1000where
1001    B: Body + Unpin,
1002    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1003    B::Data: Into<bytes::Bytes>,
1004{
1005    type Data = bytes::Bytes;
1006    type Error = Box<dyn std::error::Error + Send + Sync>;
1007
1008    fn poll_frame(
1009        mut self: Pin<&mut Self>,
1010        cx: &mut Context<'_>,
1011    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1012        match &mut *self {
1013            HttpCacheBody::Buffered(bytes) => {
1014                if bytes.is_empty() {
1015                    Poll::Ready(None)
1016                } else {
1017                    let data = std::mem::take(bytes);
1018                    Poll::Ready(Some(Ok(http_body::Frame::data(
1019                        bytes::Bytes::from(data),
1020                    ))))
1021                }
1022            }
1023            HttpCacheBody::Original(body) => {
1024                Pin::new(body).poll_frame(cx).map(|opt| {
1025                    opt.map(|res| {
1026                        res.map(|frame| frame.map_data(Into::into))
1027                            .map_err(Into::into)
1028                    })
1029                })
1030            }
1031        }
1032    }
1033
1034    fn is_end_stream(&self) -> bool {
1035        match self {
1036            HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1037            HttpCacheBody::Original(body) => body.is_end_stream(),
1038        }
1039    }
1040
1041    fn size_hint(&self) -> http_body::SizeHint {
1042        match self {
1043            HttpCacheBody::Buffered(bytes) => {
1044                let len = bytes.len() as u64;
1045                http_body::SizeHint::with_exact(len)
1046            }
1047            HttpCacheBody::Original(body) => body.size_hint(),
1048        }
1049    }
1050}
1051
1052#[cfg(test)]
1053mod test;