Skip to main content

http_cache_tower/
lib.rs

1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     // Create cache manager with disk storage
28//!     let cache_manager = CACacheManager::new("./cache".into(), true);
29//!     
30//!     // Create cache layer
31//!     let cache_layer = HttpCacheLayer::new(cache_manager);
32//!     
33//!     // Build service with caching
34//!     let service = ServiceBuilder::new()
35//!         .layer(cache_layer)
36//!         .service_fn(handler);
37//!     
38//!     // Use the service
39//!     let request = Request::builder()
40//!         .uri("http://example.com")
41//!         .body(Full::new(Bytes::new()))
42//!         .unwrap();
43//!     let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//!     mode: CacheMode::Default,
61//!     manager: cache_manager,
62//!     options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//!
78//! # #[tokio::main]
79//! # async fn main() {
80//! // Create streaming cache setup
81//! let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
82//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
83//!
84//! // Use with your service
85//! // let service = streaming_layer.layer(your_service);
86//! # }
87//! ```
88//!
89//! ## Cache Modes
90//!
91//! Different cache modes provide different behaviors:
92//!
93//! - `CacheMode::Default`: Follow HTTP caching rules strictly
94//! - `CacheMode::NoStore`: Never cache responses
95//! - `CacheMode::NoCache`: Always revalidate with the origin server
96//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
97//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
98//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
99//!
100//! ## Cache Invalidation
101//!
102//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
103//!
104//! ```text
105//! These methods will invalidate any cached GET response for the same URI:
106//! - PUT /api/users/123    -> invalidates GET /api/users/123
107//! - POST /api/users/123   -> invalidates GET /api/users/123  
108//! - DELETE /api/users/123 -> invalidates GET /api/users/123
109//! - PATCH /api/users/123  -> invalidates GET /api/users/123
110//! ```
111//!
112//! ## Integration with Other Tower Layers
113//!
114//! The cache layer works with other Tower middleware:
115//!
116//! ```rust,no_run
117//! use tower::ServiceBuilder;
118//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
119//! use tower::service_fn;
120//! use tower::ServiceExt;
121//! use http::{Request, Response};
122//! use http_body_util::Full;
123//! use bytes::Bytes;
124//! use std::convert::Infallible;
125//!
126//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
127//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
128//! }
129//!
130//! #[tokio::main]
131//! async fn main() {
132//!     let cache_manager = CACacheManager::new("./cache".into(), true);
133//!     let cache_layer = HttpCacheLayer::new(cache_manager);
134//!
135//!     let service = ServiceBuilder::new()
136//!         // .layer(TraceLayer::new_for_http())  // Logging (requires tower-http)
137//!         // .layer(CompressionLayer::new())     // Compression (requires tower-http)
138//!         .layer(cache_layer)                    // Caching
139//!         .service_fn(handler);
140//!     
141//!     // Use the service
142//!     let request = Request::builder()
143//!         .uri("http://example.com")
144//!         .body(Full::new(Bytes::new()))
145//!         .unwrap();
146//!     let response = service.oneshot(request).await.unwrap();
147//! }
148//! ```
149
150use bytes::Bytes;
151use http::{HeaderValue, Request, Response};
152use http_body::Body;
153use http_body_util::BodyExt;
154
155#[cfg(feature = "manager-cacache")]
156pub use http_cache::CACacheManager;
157
158#[cfg(feature = "rate-limiting")]
159pub use http_cache::rate_limiting::{
160    CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
161};
162#[cfg(feature = "streaming")]
163use http_cache::StreamingError;
164use http_cache::{
165    CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
166};
167#[cfg(feature = "streaming")]
168use http_cache::{
169    HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
170};
171use std::{
172    pin::Pin,
173    sync::Arc,
174    task::{Context, Poll},
175};
176use tower::{Layer, Service, ServiceExt};
177
178// Re-export unified error types from http-cache core
179pub use http_cache::HttpCacheError;
180
181#[cfg(feature = "streaming")]
182/// Type alias for tower streaming errors, using the unified streaming error system
183pub type TowerStreamingError = http_cache::ClientStreamingError;
184
185/// Helper functions for error conversions
186trait HttpCacheErrorExt<T> {
187    fn cache_err(self) -> Result<T, HttpCacheError>;
188}
189
190impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
191where
192    E: ToString,
193{
194    fn cache_err(self) -> Result<T, HttpCacheError> {
195        self.map_err(|e| HttpCacheError::cache(e.to_string()))
196    }
197}
198
199/// Helper function to collect a body into bytes
200async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
201where
202    B: Body,
203{
204    let collected = BodyExt::collect(body).await?;
205    Ok(collected.to_bytes().to_vec())
206}
207
208/// Helper function to add cache status headers to a response
209fn add_cache_status_headers<B>(
210    mut response: Response<HttpCacheBody<B>>,
211    hit_or_miss: &str,
212    cache_lookup: &str,
213) -> Response<HttpCacheBody<B>> {
214    let headers = response.headers_mut();
215    headers.insert(
216        http_cache::XCACHE,
217        HeaderValue::from_str(hit_or_miss).unwrap(),
218    );
219    headers.insert(
220        http_cache::XCACHELOOKUP,
221        HeaderValue::from_str(cache_lookup).unwrap(),
222    );
223    response
224}
225
226#[cfg(feature = "streaming")]
227fn add_cache_status_headers_streaming<B>(
228    mut response: Response<B>,
229    hit_or_miss: &str,
230    cache_lookup: &str,
231) -> Response<B> {
232    let headers = response.headers_mut();
233    headers.insert(
234        http_cache::XCACHE,
235        HeaderValue::from_str(hit_or_miss).unwrap(),
236    );
237    headers.insert(
238        http_cache::XCACHELOOKUP,
239        HeaderValue::from_str(cache_lookup).unwrap(),
240    );
241    response
242}
243
244/// HTTP cache layer for Tower services.
245///
246/// This layer implements HTTP caching according to RFC 7234, automatically caching
247/// GET and HEAD responses based on their cache-control headers and invalidating
248/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
249///
250/// # Example
251///
252/// ```rust
253/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
254/// use tower::ServiceBuilder;
255/// use tower::service_fn;
256/// use http::{Request, Response};
257/// use http_body_util::Full;
258/// use bytes::Bytes;
259/// use std::convert::Infallible;
260///
261/// # #[tokio::main]
262/// # async fn main() {
263/// let cache_manager = CACacheManager::new("./cache".into(), true);
264/// let cache_layer = HttpCacheLayer::new(cache_manager);
265///
266/// // Use with ServiceBuilder
267/// let service = ServiceBuilder::new()
268///     .layer(cache_layer)
269///     .service_fn(|_req: Request<Full<Bytes>>| async {
270///         Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
271///     });
272/// # }
273/// ```
274#[derive(Clone)]
275pub struct HttpCacheLayer<CM>
276where
277    CM: CacheManager,
278{
279    cache: Arc<HttpCache<CM>>,
280}
281
282impl<CM> HttpCacheLayer<CM>
283where
284    CM: CacheManager,
285{
286    /// Create a new HTTP cache layer with default configuration.
287    ///
288    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
289    ///
290    /// # Arguments
291    ///
292    /// * `cache_manager` - The cache manager to use for storing responses
293    ///
294    /// # Example
295    ///
296    /// ```rust
297    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
298    ///
299    /// # #[tokio::main]
300    /// # async fn main() {
301    /// let cache_manager = CACacheManager::new("./cache".into(), true);
302    /// let layer = HttpCacheLayer::new(cache_manager);
303    /// # }
304    /// ```
305    pub fn new(cache_manager: CM) -> Self {
306        Self {
307            cache: Arc::new(HttpCache {
308                mode: CacheMode::Default,
309                manager: cache_manager,
310                options: HttpCacheOptions::default(),
311            }),
312        }
313    }
314
315    /// Create a new HTTP cache layer with custom options.
316    ///
317    /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
318    /// through [`HttpCacheOptions`].
319    ///
320    /// # Arguments
321    ///
322    /// * `cache_manager` - The cache manager to use for storing responses
323    /// * `options` - Custom cache options
324    ///
325    /// # Example
326    ///
327    /// ```rust
328    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
329    /// use http_cache::HttpCacheOptions;
330    ///
331    /// # #[tokio::main]
332    /// # async fn main() {
333    /// let cache_manager = CACacheManager::new("./cache".into(), true);
334    ///
335    /// let options = HttpCacheOptions {
336    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
337    ///         format!("custom:{}:{}", req.method, req.uri)
338    ///     })),
339    ///     ..Default::default()
340    /// };
341    ///
342    /// let layer = HttpCacheLayer::with_options(cache_manager, options);
343    /// # }
344    /// ```
345    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
346        Self {
347            cache: Arc::new(HttpCache {
348                mode: CacheMode::Default,
349                manager: cache_manager,
350                options,
351            }),
352        }
353    }
354
355    /// Create a new HTTP cache layer with a pre-configured cache.
356    ///
357    /// This method gives you full control over the cache configuration,
358    /// including the cache mode.
359    ///
360    /// # Arguments
361    ///
362    /// * `cache` - A fully configured HttpCache instance
363    ///
364    /// # Example
365    ///
366    /// ```rust
367    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
368    /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
369    ///
370    /// # #[tokio::main]
371    /// # async fn main() {
372    /// let cache_manager = CACacheManager::new("./cache".into(), true);
373    ///
374    /// let cache = HttpCache {
375    ///     mode: CacheMode::ForceCache,
376    ///     manager: cache_manager,
377    ///     options: HttpCacheOptions::default(),
378    /// };
379    ///
380    /// let layer = HttpCacheLayer::with_cache(cache);
381    /// # }
382    /// ```
383    pub fn with_cache(cache: HttpCache<CM>) -> Self {
384        Self { cache: Arc::new(cache) }
385    }
386}
387
388/// HTTP cache layer with streaming support for Tower services.
389///
390/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
391/// but handles streaming responses. It can work with large
392/// responses without buffering them entirely in memory.
393///
394/// # Example
395///
396/// ```rust
397/// use http_cache_tower::HttpCacheStreamingLayer;
398/// use http_cache::StreamingManager;
399/// use tower::ServiceBuilder;
400/// use tower::service_fn;
401/// use http::{Request, Response};
402/// use http_body_util::Full;
403/// use bytes::Bytes;
404/// use std::convert::Infallible;
405///
406/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
407///     Ok(Response::new(Full::new(Bytes::from("Hello"))))
408/// }
409///
410/// # #[tokio::main]
411/// # async fn main() {
412/// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
413/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
414///
415/// // Use with ServiceBuilder
416/// let service = ServiceBuilder::new()
417///     .layer(streaming_layer)
418///     .service_fn(handler);
419/// # }
420/// ```
421#[cfg(feature = "streaming")]
422#[derive(Clone)]
423pub struct HttpCacheStreamingLayer<CM>
424where
425    CM: StreamingCacheManager,
426{
427    cache: Arc<HttpStreamingCache<CM>>,
428}
429
430#[cfg(feature = "streaming")]
431impl<CM> HttpCacheStreamingLayer<CM>
432where
433    CM: StreamingCacheManager,
434{
435    /// Create a new HTTP cache streaming layer with default configuration.
436    ///
437    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
438    ///
439    /// # Arguments
440    ///
441    /// * `cache_manager` - The streaming cache manager to use
442    ///
443    /// # Example
444    ///
445    /// ```rust
446    /// use http_cache_tower::HttpCacheStreamingLayer;
447    /// use http_cache::StreamingManager;
448    ///
449    /// # #[tokio::main]
450    /// # async fn main() {
451    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
452    /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
453    /// # }
454    /// ```
455    pub fn new(cache_manager: CM) -> Self {
456        Self {
457            cache: Arc::new(HttpStreamingCache {
458                mode: CacheMode::Default,
459                manager: cache_manager,
460                options: HttpCacheOptions::default(),
461            }),
462        }
463    }
464
465    /// Create a new HTTP cache streaming layer with custom options.
466    ///
467    /// Uses [`CacheMode::Default`] but allows customizing cache behavior.
468    ///
469    /// # Arguments
470    ///
471    /// * `cache_manager` - The streaming cache manager to use
472    /// * `options` - Custom cache options
473    ///
474    /// # Example
475    ///
476    /// ```rust
477    /// use http_cache_tower::HttpCacheStreamingLayer;
478    /// use http_cache::{StreamingManager, HttpCacheOptions};
479    ///
480    /// # #[tokio::main]
481    /// # async fn main() {
482    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
483    ///
484    /// let options = HttpCacheOptions {
485    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
486    ///         format!("stream:{}:{}", req.method, req.uri)
487    ///     })),
488    ///     ..Default::default()
489    /// };
490    ///
491    /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
492    /// # }
493    /// ```
494    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
495        Self {
496            cache: Arc::new(HttpStreamingCache {
497                mode: CacheMode::Default,
498                manager: cache_manager,
499                options,
500            }),
501        }
502    }
503
504    /// Create a new HTTP cache streaming layer with a pre-configured cache.
505    ///
506    /// This method gives you full control over the streaming cache configuration.
507    ///
508    /// # Arguments
509    ///
510    /// * `cache` - A fully configured HttpStreamingCache instance
511    ///
512    /// # Example
513    ///
514    /// ```rust
515    /// use http_cache_tower::HttpCacheStreamingLayer;
516    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
517    ///
518    /// # #[tokio::main]
519    /// # async fn main() {
520    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
521    ///
522    /// let cache = HttpStreamingCache {
523    ///     mode: CacheMode::ForceCache,
524    ///     manager: streaming_manager,
525    ///     options: HttpCacheOptions::default(),
526    /// };
527    ///
528    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
529    /// # }
530    /// ```
531    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
532        Self { cache: Arc::new(cache) }
533    }
534}
535
536impl<S, CM> Layer<S> for HttpCacheLayer<CM>
537where
538    CM: CacheManager,
539{
540    type Service = HttpCacheService<S, CM>;
541
542    fn layer(&self, inner: S) -> Self::Service {
543        HttpCacheService { inner, cache: self.cache.clone() }
544    }
545}
546
547#[cfg(feature = "streaming")]
548impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
549where
550    CM: StreamingCacheManager,
551{
552    type Service = HttpCacheStreamingService<S, CM>;
553
554    fn layer(&self, inner: S) -> Self::Service {
555        HttpCacheStreamingService { inner, cache: self.cache.clone() }
556    }
557}
558
559/// HTTP cache service for Tower/Hyper
560pub struct HttpCacheService<S, CM>
561where
562    CM: CacheManager,
563{
564    inner: S,
565    cache: Arc<HttpCache<CM>>,
566}
567
568impl<S, CM> Clone for HttpCacheService<S, CM>
569where
570    S: Clone,
571    CM: CacheManager,
572{
573    fn clone(&self) -> Self {
574        Self { inner: self.inner.clone(), cache: self.cache.clone() }
575    }
576}
577
578/// HTTP cache streaming service for Tower/Hyper
579#[cfg(feature = "streaming")]
580pub struct HttpCacheStreamingService<S, CM>
581where
582    CM: StreamingCacheManager,
583{
584    inner: S,
585    cache: Arc<HttpStreamingCache<CM>>,
586}
587
588#[cfg(feature = "streaming")]
589impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
590where
591    S: Clone,
592    CM: StreamingCacheManager,
593{
594    fn clone(&self) -> Self {
595        Self { inner: self.inner.clone(), cache: self.cache.clone() }
596    }
597}
598
599impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
600    for HttpCacheService<S, CM>
601where
602    S: Service<Request<ReqBody>, Response = Response<ResBody>>
603        + Clone
604        + Send
605        + 'static,
606    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
607    S::Future: Send + 'static,
608    ReqBody: Body + Send + 'static,
609    ReqBody::Data: Send,
610    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
611    ResBody: Body + Send + 'static,
612    ResBody::Data: Send,
613    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
614    CM: CacheManager,
615{
616    type Response = Response<HttpCacheBody<ResBody>>;
617    type Error = HttpCacheError;
618    type Future = Pin<
619        Box<
620            dyn std::future::Future<
621                    Output = Result<Self::Response, Self::Error>,
622                > + Send,
623        >,
624    >;
625
626    fn poll_ready(
627        &mut self,
628        cx: &mut Context<'_>,
629    ) -> Poll<Result<(), Self::Error>> {
630        self.inner.poll_ready(cx).map_err(|_e| {
631            HttpCacheError::http(Box::new(std::io::Error::other(
632                "service error".to_string(),
633            )))
634        })
635    }
636
637    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
638        let cache = self.cache.clone();
639        let (parts, body) = req.into_parts();
640        let inner_service = self.inner.clone();
641
642        Box::pin(async move {
643            use http_cache_semantics::BeforeRequest;
644
645            // Use the core library's cache interface for analysis
646            let analysis = cache.analyze_request(&parts, None).cache_err()?;
647
648            // If not cacheable, execute request FIRST, then invalidate on success (RFC 7234 Section 4.4)
649            if !analysis.should_cache {
650                let req = Request::from_parts(parts, body);
651                let response =
652                    inner_service.oneshot(req).await.map_err(|_e| {
653                        HttpCacheError::http(Box::new(std::io::Error::other(
654                            "service error".to_string(),
655                        )))
656                    })?;
657
658                // Only invalidate AFTER successful response (RFC 7234 Section 4.4)
659                if response.status().is_success()
660                    || response.status().is_redirection()
661                {
662                    for key in &analysis.cache_bust_keys {
663                        let _ = cache.manager.delete(key).await;
664                    }
665                    // Invalidate both GET and HEAD caches per RFC 7234 Section 4.4
666                    if !analysis.is_get_head {
667                        let get_cache_key =
668                            cache.options.create_cache_key_for_invalidation(
669                                &analysis.request_parts,
670                                "GET",
671                            );
672                        let _ = cache.manager.delete(&get_cache_key).await;
673                        let head_cache_key =
674                            cache.options.create_cache_key_for_invalidation(
675                                &analysis.request_parts,
676                                "HEAD",
677                            );
678                        let _ = cache.manager.delete(&head_cache_key).await;
679                    }
680                }
681
682                return Ok(response.map(HttpCacheBody::Original));
683            }
684
685            // Special case for Reload mode: skip cache lookup but still cache response
686            if analysis.cache_mode == CacheMode::Reload {
687                let req = Request::from_parts(parts, body);
688                let response =
689                    inner_service.oneshot(req).await.map_err(|_e| {
690                        HttpCacheError::http(Box::new(std::io::Error::other(
691                            "service error".to_string(),
692                        )))
693                    })?;
694
695                let (res_parts, res_body) = response.into_parts();
696                let body_bytes =
697                    collect_body(res_body).await.map_err(|_e| {
698                        HttpCacheError::http(Box::new(std::io::Error::other(
699                            "service error".to_string(),
700                        )))
701                    })?;
702
703                let cached_response = cache
704                    .process_response(
705                        analysis,
706                        Response::from_parts(res_parts, body_bytes.clone()),
707                        None,
708                    )
709                    .await
710                    .cache_err()?;
711
712                return Ok(cached_response.map(HttpCacheBody::Buffered));
713            }
714
715            // Look up cached response using interface
716            if let Some((cached_response, policy)) = cache
717                .lookup_cached_response(&analysis.cache_key)
718                .await
719                .cache_err()?
720            {
721                let before_req =
722                    policy.before_request(&parts, std::time::SystemTime::now());
723                match before_req {
724                    BeforeRequest::Fresh(_) => {
725                        // Return cached response
726                        let mut response = http_cache::HttpCacheOptions::http_response_to_response(
727                            &cached_response,
728                            HttpCacheBody::Buffered(cached_response.body.clone()),
729                        ).map_err(HttpCacheError::other)?;
730
731                        // Add cache status headers if enabled
732                        if cache.options.cache_status_headers {
733                            response = add_cache_status_headers(
734                                response, "HIT", "HIT",
735                            );
736                        }
737
738                        // Insert metadata into response extensions if present
739                        if let Some(metadata) = cached_response.metadata {
740                            response.extensions_mut().insert(
741                                http_cache::HttpCacheMetadata::from(metadata),
742                            );
743                        }
744
745                        return Ok(response);
746                    }
747                    BeforeRequest::Stale {
748                        request: conditional_parts, ..
749                    } => {
750                        // Make conditional request
751                        let conditional_req =
752                            Request::from_parts(conditional_parts, body);
753                        let conditional_response = inner_service
754                            .oneshot(conditional_req)
755                            .await
756                            .map_err(|_e| {
757                                HttpCacheError::http(Box::new(
758                                    std::io::Error::other(
759                                        "service error".to_string(),
760                                    ),
761                                ))
762                            })?;
763
764                        if conditional_response.status() == 304 {
765                            // Use cached response with updated headers
766                            let (fresh_parts, _) =
767                                conditional_response.into_parts();
768                            let updated_response = cache
769                                .handle_not_modified(
770                                    cached_response,
771                                    &fresh_parts,
772                                )
773                                .await
774                                .cache_err()?;
775
776                            let mut response = http_cache::HttpCacheOptions::http_response_to_response(
777                                &updated_response,
778                                HttpCacheBody::Buffered(updated_response.body.clone()),
779                            ).map_err(HttpCacheError::other)?;
780
781                            // Add cache status headers if enabled
782                            if cache.options.cache_status_headers {
783                                response = add_cache_status_headers(
784                                    response, "HIT", "HIT",
785                                );
786                            }
787
788                            // Insert metadata into response extensions if present
789                            if let Some(metadata) = updated_response.metadata {
790                                response.extensions_mut().insert(
791                                    http_cache::HttpCacheMetadata::from(
792                                        metadata,
793                                    ),
794                                );
795                            }
796
797                            return Ok(response);
798                        } else {
799                            // Process fresh response
800                            let (parts, res_body) =
801                                conditional_response.into_parts();
802                            let body_bytes =
803                                collect_body(res_body).await.map_err(|_e| {
804                                    HttpCacheError::http(Box::new(
805                                        std::io::Error::other(
806                                            "service error".to_string(),
807                                        ),
808                                    ))
809                                })?;
810
811                            let cached_response = cache
812                                .process_response(
813                                    analysis,
814                                    Response::from_parts(
815                                        parts,
816                                        body_bytes.clone(),
817                                    ),
818                                    None,
819                                )
820                                .await
821                                .cache_err()?;
822
823                            let mut response =
824                                cached_response.map(HttpCacheBody::Buffered);
825
826                            // Add cache status headers if enabled
827                            if cache.options.cache_status_headers {
828                                response = add_cache_status_headers(
829                                    response, "MISS", "MISS",
830                                );
831                            }
832
833                            return Ok(response);
834                        }
835                    }
836                }
837            }
838
839            // Fetch fresh response
840            let req = Request::from_parts(parts, body);
841            let response = inner_service.oneshot(req).await.map_err(|_e| {
842                HttpCacheError::http(Box::new(std::io::Error::other(
843                    "service error".to_string(),
844                )))
845            })?;
846
847            let (res_parts, res_body) = response.into_parts();
848            let body_bytes = collect_body(res_body).await.map_err(|_e| {
849                HttpCacheError::http(Box::new(std::io::Error::other(
850                    "service error".to_string(),
851                )))
852            })?;
853
854            // Process and cache using interface
855            let cached_response = cache
856                .process_response(
857                    analysis,
858                    Response::from_parts(res_parts, body_bytes.clone()),
859                    None,
860                )
861                .await
862                .cache_err()?;
863
864            let mut response = cached_response.map(HttpCacheBody::Buffered);
865
866            // Add cache status headers if enabled
867            if cache.options.cache_status_headers {
868                response = add_cache_status_headers(response, "MISS", "MISS");
869            }
870
871            Ok(response)
872        })
873    }
874}
875
876// Hyper service implementation for HttpCacheService
877impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
878    for HttpCacheService<S, CM>
879where
880    S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
881    S::Response: Into<Response<http_body_util::Full<Bytes>>>,
882    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
883    S::Future: Send + 'static,
884    CM: CacheManager,
885{
886    type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
887    type Error = HttpCacheError;
888    type Future = Pin<
889        Box<
890            dyn std::future::Future<
891                    Output = Result<Self::Response, Self::Error>,
892                > + Send,
893        >,
894    >;
895
896    fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
897        // Convert to the format expected by the generic Service implementation
898        let service_clone = self.clone();
899        Box::pin(async move { service_clone.call(_req).await })
900    }
901}
902
903#[cfg(feature = "streaming")]
904impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
905    for HttpCacheStreamingService<S, CM>
906where
907    S: Service<Request<ReqBody>, Response = Response<ResBody>>
908        + Clone
909        + Send
910        + 'static,
911    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
912    S::Future: Send + 'static,
913    ReqBody: Body + Send + 'static,
914    ReqBody::Data: Send,
915    ReqBody::Error: Into<StreamingError>,
916    ResBody: Body + Send + 'static,
917    ResBody::Data: Send,
918    ResBody::Error: Into<StreamingError>,
919    CM: StreamingCacheManager,
920    <CM::Body as http_body::Body>::Data: Send,
921    <CM::Body as http_body::Body>::Error:
922        Into<StreamingError> + Send + Sync + 'static,
923{
924    type Response = Response<CM::Body>;
925    type Error = HttpCacheError;
926    type Future = Pin<
927        Box<
928            dyn std::future::Future<
929                    Output = Result<Self::Response, Self::Error>,
930                > + Send,
931        >,
932    >;
933
934    fn poll_ready(
935        &mut self,
936        cx: &mut Context<'_>,
937    ) -> Poll<Result<(), Self::Error>> {
938        self.inner.poll_ready(cx).map_err(|_e| {
939            HttpCacheError::http(Box::new(std::io::Error::other(
940                "service error".to_string(),
941            )))
942        })
943    }
944
945    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
946        let cache = self.cache.clone();
947        let (parts, body) = req.into_parts();
948        let inner_service = self.inner.clone();
949
950        Box::pin(async move {
951            use http_cache_semantics::BeforeRequest;
952
953            // Use the core library's streaming cache interface
954            let analysis = cache.analyze_request(&parts, None).cache_err()?;
955
956            // If not cacheable, execute request FIRST, then invalidate on success (RFC 7234 Section 4.4)
957            if !analysis.should_cache {
958                // Apply rate limiting before non-cached request
959                #[cfg(feature = "rate-limiting")]
960                if let Some(rate_limiter) = &cache.options.rate_limiter {
961                    if let Ok(url) = analysis
962                        .request_parts
963                        .uri
964                        .to_string()
965                        .parse::<::url::Url>()
966                    {
967                        let rate_limit_key =
968                            url.host_str().unwrap_or("unknown");
969                        rate_limiter.until_key_ready(rate_limit_key).await;
970                    }
971                }
972
973                let req = Request::from_parts(parts, body);
974                let response =
975                    inner_service.oneshot(req).await.map_err(|_e| {
976                        HttpCacheError::http(Box::new(std::io::Error::other(
977                            "service error".to_string(),
978                        )))
979                    })?;
980
981                // Only invalidate AFTER successful response (RFC 7234 Section 4.4)
982                if response.status().is_success()
983                    || response.status().is_redirection()
984                {
985                    for key in &analysis.cache_bust_keys {
986                        let _ = cache.manager.delete(key).await;
987                    }
988                    // Invalidate both GET and HEAD caches per RFC 7234 Section 4.4
989                    if !analysis.is_get_head {
990                        let get_cache_key =
991                            cache.options.create_cache_key_for_invalidation(
992                                &analysis.request_parts,
993                                "GET",
994                            );
995                        let _ = cache.manager.delete(&get_cache_key).await;
996                        let head_cache_key =
997                            cache.options.create_cache_key_for_invalidation(
998                                &analysis.request_parts,
999                                "HEAD",
1000                            );
1001                        let _ = cache.manager.delete(&head_cache_key).await;
1002                    }
1003                }
1004
1005                let mut converted_response =
1006                    cache.manager.convert_body(response).await.cache_err()?;
1007
1008                // Add cache status headers if enabled
1009                if cache.options.cache_status_headers {
1010                    converted_response = add_cache_status_headers_streaming(
1011                        converted_response,
1012                        "MISS",
1013                        "MISS",
1014                    );
1015                }
1016
1017                return Ok(converted_response);
1018            }
1019
1020            // Special case for Reload mode: skip cache lookup but still cache response
1021            if analysis.cache_mode == CacheMode::Reload {
1022                // Apply rate limiting before reload request
1023                #[cfg(feature = "rate-limiting")]
1024                if let Some(rate_limiter) = &cache.options.rate_limiter {
1025                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
1026                    {
1027                        let rate_limit_key =
1028                            url.host_str().unwrap_or("unknown");
1029                        rate_limiter.until_key_ready(rate_limit_key).await;
1030                    }
1031                }
1032
1033                let req = Request::from_parts(parts, body);
1034                let response =
1035                    inner_service.oneshot(req).await.map_err(|_e| {
1036                        HttpCacheError::http(Box::new(std::io::Error::other(
1037                            "service error".to_string(),
1038                        )))
1039                    })?;
1040
1041                let cached_response = cache
1042                    .process_response(analysis, response, None)
1043                    .await
1044                    .cache_err()?;
1045
1046                let mut final_response = cached_response;
1047
1048                // Add cache status headers if enabled
1049                if cache.options.cache_status_headers {
1050                    final_response = add_cache_status_headers_streaming(
1051                        final_response,
1052                        "MISS",
1053                        "MISS",
1054                    );
1055                }
1056
1057                return Ok(final_response);
1058            }
1059
1060            // Look up cached response using interface
1061            if let Some((cached_response, policy)) = cache
1062                .lookup_cached_response(&analysis.cache_key)
1063                .await
1064                .cache_err()?
1065            {
1066                let before_req =
1067                    policy.before_request(&parts, std::time::SystemTime::now());
1068                match before_req {
1069                    BeforeRequest::Fresh(_) => {
1070                        let mut response = cached_response;
1071
1072                        // Add cache status headers if enabled
1073                        if cache.options.cache_status_headers {
1074                            response = add_cache_status_headers_streaming(
1075                                response, "HIT", "HIT",
1076                            );
1077                        }
1078
1079                        return Ok(response);
1080                    }
1081                    BeforeRequest::Stale {
1082                        request: conditional_parts, ..
1083                    } => {
1084                        // Apply rate limiting before conditional request
1085                        #[cfg(feature = "rate-limiting")]
1086                        if let Some(rate_limiter) = &cache.options.rate_limiter
1087                        {
1088                            if let Ok(url) = conditional_parts
1089                                .uri
1090                                .to_string()
1091                                .parse::<::url::Url>()
1092                            {
1093                                let rate_limit_key =
1094                                    url.host_str().unwrap_or("unknown");
1095                                rate_limiter
1096                                    .until_key_ready(rate_limit_key)
1097                                    .await;
1098                            }
1099                        }
1100
1101                        let conditional_req =
1102                            Request::from_parts(conditional_parts, body);
1103                        let conditional_response = inner_service
1104                            .oneshot(conditional_req)
1105                            .await
1106                            .map_err(|_e| {
1107                                HttpCacheError::http(Box::new(
1108                                    std::io::Error::other(
1109                                        "service error".to_string(),
1110                                    ),
1111                                ))
1112                            })?;
1113
1114                        if conditional_response.status() == 304 {
1115                            let (fresh_parts, _) =
1116                                conditional_response.into_parts();
1117                            let updated_response = cache
1118                                .handle_not_modified(
1119                                    cached_response,
1120                                    &fresh_parts,
1121                                )
1122                                .await
1123                                .cache_err()?;
1124
1125                            let mut response = updated_response;
1126
1127                            // Add cache status headers if enabled
1128                            if cache.options.cache_status_headers {
1129                                response = add_cache_status_headers_streaming(
1130                                    response, "HIT", "HIT",
1131                                );
1132                            }
1133
1134                            return Ok(response);
1135                        } else {
1136                            let cached_response = cache
1137                                .process_response(
1138                                    analysis,
1139                                    conditional_response,
1140                                    None,
1141                                )
1142                                .await
1143                                .cache_err()?;
1144
1145                            let mut response = cached_response;
1146
1147                            // Add cache status headers if enabled
1148                            if cache.options.cache_status_headers {
1149                                response = add_cache_status_headers_streaming(
1150                                    response, "MISS", "MISS",
1151                                );
1152                            }
1153
1154                            return Ok(response);
1155                        }
1156                    }
1157                }
1158            }
1159
1160            // Handle OnlyIfCached mode: return 504 Gateway Timeout on cache miss
1161            if analysis.cache_mode == CacheMode::OnlyIfCached {
1162                let mut response = Response::builder()
1163                    .status(504)
1164                    .body(cache.manager.empty_body())
1165                    .map_err(|e| HttpCacheError::other(e.to_string()))?;
1166
1167                if cache.options.cache_status_headers {
1168                    response = add_cache_status_headers_streaming(
1169                        response, "MISS", "MISS",
1170                    );
1171                }
1172
1173                return Ok(response);
1174            }
1175
1176            // Apply rate limiting before fresh request
1177            #[cfg(feature = "rate-limiting")]
1178            if let Some(rate_limiter) = &cache.options.rate_limiter {
1179                if let Ok(url) = parts.uri.to_string().parse::<url::Url>() {
1180                    let rate_limit_key = url.host_str().unwrap_or("unknown");
1181                    rate_limiter.until_key_ready(rate_limit_key).await;
1182                }
1183            }
1184
1185            // Fetch fresh response
1186            let req = Request::from_parts(parts, body);
1187            let response = inner_service.oneshot(req).await.map_err(|_e| {
1188                HttpCacheError::http(Box::new(std::io::Error::other(
1189                    "service error".to_string(),
1190                )))
1191            })?;
1192
1193            // Process using streaming interface
1194            let cached_response = cache
1195                .process_response(analysis, response, None)
1196                .await
1197                .cache_err()?;
1198
1199            let mut final_response = cached_response;
1200
1201            // Add cache status headers if enabled
1202            if cache.options.cache_status_headers {
1203                final_response = add_cache_status_headers_streaming(
1204                    final_response,
1205                    "MISS",
1206                    "MISS",
1207                );
1208            }
1209
1210            Ok(final_response)
1211        })
1212    }
1213}
1214
1215/// Body type that wraps cached responses  
1216pub enum HttpCacheBody<B> {
1217    /// Buffered body from cache
1218    Buffered(Vec<u8>),
1219    /// Original body (fallback)
1220    Original(B),
1221}
1222
1223impl<B> Body for HttpCacheBody<B>
1224where
1225    B: Body + Unpin,
1226    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1227    B::Data: Into<bytes::Bytes>,
1228{
1229    type Data = bytes::Bytes;
1230    type Error = Box<dyn std::error::Error + Send + Sync>;
1231
1232    fn poll_frame(
1233        mut self: Pin<&mut Self>,
1234        cx: &mut Context<'_>,
1235    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1236        match &mut *self {
1237            HttpCacheBody::Buffered(bytes) => {
1238                if bytes.is_empty() {
1239                    Poll::Ready(None)
1240                } else {
1241                    let data = std::mem::take(bytes);
1242                    Poll::Ready(Some(Ok(http_body::Frame::data(
1243                        bytes::Bytes::from(data),
1244                    ))))
1245                }
1246            }
1247            HttpCacheBody::Original(body) => {
1248                Pin::new(body).poll_frame(cx).map(|opt| {
1249                    opt.map(|res| {
1250                        res.map(|frame| frame.map_data(Into::into))
1251                            .map_err(Into::into)
1252                    })
1253                })
1254            }
1255        }
1256    }
1257
1258    fn is_end_stream(&self) -> bool {
1259        match self {
1260            HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1261            HttpCacheBody::Original(body) => body.is_end_stream(),
1262        }
1263    }
1264
1265    fn size_hint(&self) -> http_body::SizeHint {
1266        match self {
1267            HttpCacheBody::Buffered(bytes) => {
1268                let len = bytes.len() as u64;
1269                http_body::SizeHint::with_exact(len)
1270            }
1271            HttpCacheBody::Original(body) => body.size_hint(),
1272        }
1273    }
1274}
1275
1276#[cfg(test)]
1277mod test;