http_cache_tower/
lib.rs

1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     // Create cache manager with disk storage
28//!     let cache_manager = CACacheManager::new("./cache".into(), true);
29//!     
30//!     // Create cache layer
31//!     let cache_layer = HttpCacheLayer::new(cache_manager);
32//!     
33//!     // Build service with caching
34//!     let service = ServiceBuilder::new()
35//!         .layer(cache_layer)
36//!         .service_fn(handler);
37//!     
38//!     // Use the service
39//!     let request = Request::builder()
40//!         .uri("http://example.com")
41//!         .body(Full::new(Bytes::new()))
42//!         .unwrap();
43//!     let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//!     mode: CacheMode::Default,
61//!     manager: cache_manager,
62//!     options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//! use std::path::PathBuf;
78//!
79//! # #[tokio::main]
80//! # async fn main() {
81//! // Create streaming cache setup
82//! let streaming_manager = StreamingManager::new("./streaming-cache".into());
83//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
84//!
85//! // Use with your service
86//! // let service = streaming_layer.layer(your_service);
87//! # }
88//! ```
89//!
90//! ## Cache Modes
91//!
92//! Different cache modes provide different behaviors:
93//!
94//! - `CacheMode::Default`: Follow HTTP caching rules strictly
95//! - `CacheMode::NoStore`: Never cache responses
96//! - `CacheMode::NoCache`: Always revalidate with the origin server
97//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
98//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
99//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
100//!
101//! ## Cache Invalidation
102//!
103//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
104//!
105//! ```text
106//! These methods will invalidate any cached GET response for the same URI:
107//! - PUT /api/users/123    -> invalidates GET /api/users/123
108//! - POST /api/users/123   -> invalidates GET /api/users/123  
109//! - DELETE /api/users/123 -> invalidates GET /api/users/123
110//! - PATCH /api/users/123  -> invalidates GET /api/users/123
111//! ```
112//!
113//! ## Integration with Other Tower Layers
114//!
115//! The cache layer works with other Tower middleware:
116//!
117//! ```rust,no_run
118//! use tower::ServiceBuilder;
119//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
120//! use tower::service_fn;
121//! use tower::ServiceExt;
122//! use http::{Request, Response};
123//! use http_body_util::Full;
124//! use bytes::Bytes;
125//! use std::convert::Infallible;
126//!
127//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
128//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
129//! }
130//!
131//! #[tokio::main]
132//! async fn main() {
133//!     let cache_manager = CACacheManager::new("./cache".into(), true);
134//!     let cache_layer = HttpCacheLayer::new(cache_manager);
135//!
136//!     let service = ServiceBuilder::new()
137//!         // .layer(TraceLayer::new_for_http())  // Logging (requires tower-http)
138//!         // .layer(CompressionLayer::new())     // Compression (requires tower-http)
139//!         .layer(cache_layer)                    // Caching
140//!         .service_fn(handler);
141//!     
142//!     // Use the service
143//!     let request = Request::builder()
144//!         .uri("http://example.com")
145//!         .body(Full::new(Bytes::new()))
146//!         .unwrap();
147//!     let response = service.oneshot(request).await.unwrap();
148//! }
149//! ```
150
151use bytes::Bytes;
152use http::{HeaderValue, Request, Response};
153use http_body::Body;
154use http_body_util::BodyExt;
155
156#[cfg(feature = "manager-cacache")]
157pub use http_cache::CACacheManager;
158
159#[cfg(feature = "rate-limiting")]
160pub use http_cache::rate_limiting::{
161    CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
162};
163#[cfg(feature = "streaming")]
164use http_cache::StreamingError;
165use http_cache::{
166    CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
167};
168#[cfg(feature = "streaming")]
169use http_cache::{
170    HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
171};
172use std::{
173    pin::Pin,
174    sync::Arc,
175    task::{Context, Poll},
176};
177use tower::{Layer, Service, ServiceExt};
178
179// Re-export unified error types from http-cache core
180pub use http_cache::HttpCacheError;
181
182#[cfg(feature = "streaming")]
183/// Type alias for tower streaming errors, using the unified streaming error system
184pub type TowerStreamingError = http_cache::ClientStreamingError;
185
186/// Helper functions for error conversions
187trait HttpCacheErrorExt<T> {
188    fn cache_err(self) -> Result<T, HttpCacheError>;
189}
190
191impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
192where
193    E: ToString,
194{
195    fn cache_err(self) -> Result<T, HttpCacheError> {
196        self.map_err(|e| HttpCacheError::cache(e.to_string()))
197    }
198}
199
200/// Helper function to collect a body into bytes
201async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
202where
203    B: Body,
204{
205    let collected = BodyExt::collect(body).await?;
206    Ok(collected.to_bytes().to_vec())
207}
208
209/// Helper function to add cache status headers to a response
210fn add_cache_status_headers<B>(
211    mut response: Response<HttpCacheBody<B>>,
212    hit_or_miss: &str,
213    cache_lookup: &str,
214) -> Response<HttpCacheBody<B>> {
215    let headers = response.headers_mut();
216    headers.insert(
217        http_cache::XCACHE,
218        HeaderValue::from_str(hit_or_miss).unwrap(),
219    );
220    headers.insert(
221        http_cache::XCACHELOOKUP,
222        HeaderValue::from_str(cache_lookup).unwrap(),
223    );
224    response
225}
226
227#[cfg(feature = "streaming")]
228fn add_cache_status_headers_streaming<B>(
229    mut response: Response<B>,
230    hit_or_miss: &str,
231    cache_lookup: &str,
232) -> Response<B> {
233    let headers = response.headers_mut();
234    headers.insert(
235        http_cache::XCACHE,
236        HeaderValue::from_str(hit_or_miss).unwrap(),
237    );
238    headers.insert(
239        http_cache::XCACHELOOKUP,
240        HeaderValue::from_str(cache_lookup).unwrap(),
241    );
242    response
243}
244
245/// HTTP cache layer for Tower services.
246///
247/// This layer implements HTTP caching according to RFC 7234, automatically caching
248/// GET and HEAD responses based on their cache-control headers and invalidating
249/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
250///
251/// # Example
252///
253/// ```rust
254/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
255/// use tower::ServiceBuilder;
256/// use tower::service_fn;
257/// use http::{Request, Response};
258/// use http_body_util::Full;
259/// use bytes::Bytes;
260/// use std::convert::Infallible;
261///
262/// # #[tokio::main]
263/// # async fn main() {
264/// let cache_manager = CACacheManager::new("./cache".into(), true);
265/// let cache_layer = HttpCacheLayer::new(cache_manager);
266///
267/// // Use with ServiceBuilder
268/// let service = ServiceBuilder::new()
269///     .layer(cache_layer)
270///     .service_fn(|_req: Request<Full<Bytes>>| async {
271///         Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
272///     });
273/// # }
274/// ```
275#[derive(Clone)]
276pub struct HttpCacheLayer<CM>
277where
278    CM: CacheManager,
279{
280    cache: Arc<HttpCache<CM>>,
281}
282
283impl<CM> HttpCacheLayer<CM>
284where
285    CM: CacheManager,
286{
287    /// Create a new HTTP cache layer with default configuration.
288    ///
289    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
290    ///
291    /// # Arguments
292    ///
293    /// * `cache_manager` - The cache manager to use for storing responses
294    ///
295    /// # Example
296    ///
297    /// ```rust
298    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
299    ///
300    /// # #[tokio::main]
301    /// # async fn main() {
302    /// let cache_manager = CACacheManager::new("./cache".into(), true);
303    /// let layer = HttpCacheLayer::new(cache_manager);
304    /// # }
305    /// ```
306    pub fn new(cache_manager: CM) -> Self {
307        Self {
308            cache: Arc::new(HttpCache {
309                mode: CacheMode::Default,
310                manager: cache_manager,
311                options: HttpCacheOptions::default(),
312            }),
313        }
314    }
315
316    /// Create a new HTTP cache layer with custom options.
317    ///
318    /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
319    /// through [`HttpCacheOptions`].
320    ///
321    /// # Arguments
322    ///
323    /// * `cache_manager` - The cache manager to use for storing responses
324    /// * `options` - Custom cache options
325    ///
326    /// # Example
327    ///
328    /// ```rust
329    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
330    /// use http_cache::HttpCacheOptions;
331    ///
332    /// # #[tokio::main]
333    /// # async fn main() {
334    /// let cache_manager = CACacheManager::new("./cache".into(), true);
335    ///
336    /// let options = HttpCacheOptions {
337    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
338    ///         format!("custom:{}:{}", req.method, req.uri)
339    ///     })),
340    ///     ..Default::default()
341    /// };
342    ///
343    /// let layer = HttpCacheLayer::with_options(cache_manager, options);
344    /// # }
345    /// ```
346    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
347        Self {
348            cache: Arc::new(HttpCache {
349                mode: CacheMode::Default,
350                manager: cache_manager,
351                options,
352            }),
353        }
354    }
355
356    /// Create a new HTTP cache layer with a pre-configured cache.
357    ///
358    /// This method gives you full control over the cache configuration,
359    /// including the cache mode.
360    ///
361    /// # Arguments
362    ///
363    /// * `cache` - A fully configured HttpCache instance
364    ///
365    /// # Example
366    ///
367    /// ```rust
368    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
369    /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
370    ///
371    /// # #[tokio::main]
372    /// # async fn main() {
373    /// let cache_manager = CACacheManager::new("./cache".into(), true);
374    ///
375    /// let cache = HttpCache {
376    ///     mode: CacheMode::ForceCache,
377    ///     manager: cache_manager,
378    ///     options: HttpCacheOptions::default(),
379    /// };
380    ///
381    /// let layer = HttpCacheLayer::with_cache(cache);
382    /// # }
383    /// ```
384    pub fn with_cache(cache: HttpCache<CM>) -> Self {
385        Self { cache: Arc::new(cache) }
386    }
387}
388
389/// HTTP cache layer with streaming support for Tower services.
390///
391/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
392/// but handles streaming responses. It can work with large
393/// responses without buffering them entirely in memory.
394///
395/// # Example
396///
397/// ```rust
398/// use http_cache_tower::HttpCacheStreamingLayer;
399/// use http_cache::StreamingManager;
400/// use tower::ServiceBuilder;
401/// use tower::service_fn;
402/// use http::{Request, Response};
403/// use http_body_util::Full;
404/// use bytes::Bytes;
405/// use std::convert::Infallible;
406///
407/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
408///     Ok(Response::new(Full::new(Bytes::from("Hello"))))
409/// }
410///
411/// # #[tokio::main]
412/// # async fn main() {
413/// let streaming_manager = StreamingManager::new("./cache".into());
414/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
415///
416/// // Use with ServiceBuilder
417/// let service = ServiceBuilder::new()
418///     .layer(streaming_layer)
419///     .service_fn(handler);
420/// # }
421/// ```
422#[cfg(feature = "streaming")]
423#[derive(Clone)]
424pub struct HttpCacheStreamingLayer<CM>
425where
426    CM: StreamingCacheManager,
427{
428    cache: Arc<HttpStreamingCache<CM>>,
429}
430
431#[cfg(feature = "streaming")]
432impl<CM> HttpCacheStreamingLayer<CM>
433where
434    CM: StreamingCacheManager,
435{
436    /// Create a new HTTP cache streaming layer with default configuration.
437    ///
438    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
439    ///
440    /// # Arguments
441    ///
442    /// * `cache_manager` - The streaming cache manager to use
443    ///
444    /// # Example
445    ///
446    /// ```rust
447    /// use http_cache_tower::HttpCacheStreamingLayer;
448    /// use http_cache::StreamingManager;
449    ///
450    /// # #[tokio::main]
451    /// # async fn main() {
452    /// let streaming_manager = StreamingManager::new("./cache".into());
453    /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
454    /// # }
455    /// ```
456    pub fn new(cache_manager: CM) -> Self {
457        Self {
458            cache: Arc::new(HttpStreamingCache {
459                mode: CacheMode::Default,
460                manager: cache_manager,
461                options: HttpCacheOptions::default(),
462            }),
463        }
464    }
465
466    /// Create a new HTTP cache streaming layer with custom options.
467    ///
468    /// Uses [`CacheMode::Default`] but allows customizing cache behavior.
469    ///
470    /// # Arguments
471    ///
472    /// * `cache_manager` - The streaming cache manager to use
473    /// * `options` - Custom cache options
474    ///
475    /// # Example
476    ///
477    /// ```rust
478    /// use http_cache_tower::HttpCacheStreamingLayer;
479    /// use http_cache::{StreamingManager, HttpCacheOptions};
480    ///
481    /// # #[tokio::main]
482    /// # async fn main() {
483    /// let streaming_manager = StreamingManager::new("./cache".into());
484    ///
485    /// let options = HttpCacheOptions {
486    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
487    ///         format!("stream:{}:{}", req.method, req.uri)
488    ///     })),
489    ///     ..Default::default()
490    /// };
491    ///
492    /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
493    /// # }
494    /// ```
495    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
496        Self {
497            cache: Arc::new(HttpStreamingCache {
498                mode: CacheMode::Default,
499                manager: cache_manager,
500                options,
501            }),
502        }
503    }
504
505    /// Create a new HTTP cache streaming layer with a pre-configured cache.
506    ///
507    /// This method gives you full control over the streaming cache configuration.
508    ///
509    /// # Arguments
510    ///
511    /// * `cache` - A fully configured HttpStreamingCache instance
512    ///
513    /// # Example
514    ///
515    /// ```rust
516    /// use http_cache_tower::HttpCacheStreamingLayer;
517    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
518    ///
519    /// # #[tokio::main]
520    /// # async fn main() {
521    /// let streaming_manager = StreamingManager::new("./cache".into());
522    ///
523    /// let cache = HttpStreamingCache {
524    ///     mode: CacheMode::ForceCache,
525    ///     manager: streaming_manager,
526    ///     options: HttpCacheOptions::default(),
527    /// };
528    ///
529    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
530    /// # }
531    /// ```
532    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
533        Self { cache: Arc::new(cache) }
534    }
535}
536
537impl<S, CM> Layer<S> for HttpCacheLayer<CM>
538where
539    CM: CacheManager,
540{
541    type Service = HttpCacheService<S, CM>;
542
543    fn layer(&self, inner: S) -> Self::Service {
544        HttpCacheService { inner, cache: self.cache.clone() }
545    }
546}
547
548#[cfg(feature = "streaming")]
549impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
550where
551    CM: StreamingCacheManager,
552{
553    type Service = HttpCacheStreamingService<S, CM>;
554
555    fn layer(&self, inner: S) -> Self::Service {
556        HttpCacheStreamingService { inner, cache: self.cache.clone() }
557    }
558}
559
560/// HTTP cache service for Tower/Hyper
561pub struct HttpCacheService<S, CM>
562where
563    CM: CacheManager,
564{
565    inner: S,
566    cache: Arc<HttpCache<CM>>,
567}
568
569impl<S, CM> Clone for HttpCacheService<S, CM>
570where
571    S: Clone,
572    CM: CacheManager,
573{
574    fn clone(&self) -> Self {
575        Self { inner: self.inner.clone(), cache: self.cache.clone() }
576    }
577}
578
579/// HTTP cache streaming service for Tower/Hyper
580#[cfg(feature = "streaming")]
581pub struct HttpCacheStreamingService<S, CM>
582where
583    CM: StreamingCacheManager,
584{
585    inner: S,
586    cache: Arc<HttpStreamingCache<CM>>,
587}
588
589#[cfg(feature = "streaming")]
590impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
591where
592    S: Clone,
593    CM: StreamingCacheManager,
594{
595    fn clone(&self) -> Self {
596        Self { inner: self.inner.clone(), cache: self.cache.clone() }
597    }
598}
599
600impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
601    for HttpCacheService<S, CM>
602where
603    S: Service<Request<ReqBody>, Response = Response<ResBody>>
604        + Clone
605        + Send
606        + 'static,
607    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
608    S::Future: Send + 'static,
609    ReqBody: Body + Send + 'static,
610    ReqBody::Data: Send,
611    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
612    ResBody: Body + Send + 'static,
613    ResBody::Data: Send,
614    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
615    CM: CacheManager,
616{
617    type Response = Response<HttpCacheBody<ResBody>>;
618    type Error = HttpCacheError;
619    type Future = Pin<
620        Box<
621            dyn std::future::Future<
622                    Output = Result<Self::Response, Self::Error>,
623                > + Send,
624        >,
625    >;
626
627    fn poll_ready(
628        &mut self,
629        cx: &mut Context<'_>,
630    ) -> Poll<Result<(), Self::Error>> {
631        self.inner.poll_ready(cx).map_err(|_e| {
632            HttpCacheError::http(Box::new(std::io::Error::other(
633                "service error".to_string(),
634            )))
635        })
636    }
637
638    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
639        let cache = self.cache.clone();
640        let (parts, body) = req.into_parts();
641        let inner_service = self.inner.clone();
642
643        Box::pin(async move {
644            use http_cache_semantics::BeforeRequest;
645
646            // Use the core library's cache interface for analysis
647            let analysis = cache.analyze_request(&parts, None).cache_err()?;
648
649            // Handle cache busting and non-cacheable requests
650            for key in &analysis.cache_bust_keys {
651                cache.manager.delete(key).await.cache_err()?;
652            }
653
654            // For non-GET/HEAD requests, invalidate cached GET responses
655            if !analysis.should_cache && !analysis.is_get_head {
656                let get_cache_key = cache
657                    .options
658                    .create_cache_key_for_invalidation(&parts, "GET");
659                let _ = cache.manager.delete(&get_cache_key).await;
660            }
661
662            // If not cacheable, just pass through
663            if !analysis.should_cache {
664                let req = Request::from_parts(parts, body);
665                let response =
666                    inner_service.oneshot(req).await.map_err(|_e| {
667                        HttpCacheError::http(Box::new(std::io::Error::other(
668                            "service error".to_string(),
669                        )))
670                    })?;
671                return Ok(response.map(HttpCacheBody::Original));
672            }
673
674            // Special case for Reload mode: skip cache lookup but still cache response
675            if analysis.cache_mode == CacheMode::Reload {
676                let req = Request::from_parts(parts, body);
677                let response =
678                    inner_service.oneshot(req).await.map_err(|_e| {
679                        HttpCacheError::http(Box::new(std::io::Error::other(
680                            "service error".to_string(),
681                        )))
682                    })?;
683
684                let (res_parts, res_body) = response.into_parts();
685                let body_bytes =
686                    collect_body(res_body).await.map_err(|_e| {
687                        HttpCacheError::http(Box::new(std::io::Error::other(
688                            "service error".to_string(),
689                        )))
690                    })?;
691
692                let cached_response = cache
693                    .process_response(
694                        analysis,
695                        Response::from_parts(res_parts, body_bytes.clone()),
696                        None,
697                    )
698                    .await
699                    .cache_err()?;
700
701                return Ok(cached_response.map(HttpCacheBody::Buffered));
702            }
703
704            // Look up cached response using interface
705            if let Some((cached_response, policy)) = cache
706                .lookup_cached_response(&analysis.cache_key)
707                .await
708                .cache_err()?
709            {
710                let before_req =
711                    policy.before_request(&parts, std::time::SystemTime::now());
712                match before_req {
713                    BeforeRequest::Fresh(_) => {
714                        // Return cached response
715                        let mut response = http_cache::HttpCacheOptions::http_response_to_response(
716                            &cached_response,
717                            HttpCacheBody::Buffered(cached_response.body.clone()),
718                        ).map_err(HttpCacheError::other)?;
719
720                        // Add cache status headers if enabled
721                        if cache.options.cache_status_headers {
722                            response = add_cache_status_headers(
723                                response, "HIT", "HIT",
724                            );
725                        }
726
727                        // Insert metadata into response extensions if present
728                        if let Some(metadata) = cached_response.metadata {
729                            response.extensions_mut().insert(
730                                http_cache::HttpCacheMetadata::from(metadata),
731                            );
732                        }
733
734                        return Ok(response);
735                    }
736                    BeforeRequest::Stale {
737                        request: conditional_parts, ..
738                    } => {
739                        // Make conditional request
740                        let conditional_req =
741                            Request::from_parts(conditional_parts, body);
742                        let conditional_response = inner_service
743                            .oneshot(conditional_req)
744                            .await
745                            .map_err(|_e| {
746                                HttpCacheError::http(Box::new(
747                                    std::io::Error::other(
748                                        "service error".to_string(),
749                                    ),
750                                ))
751                            })?;
752
753                        if conditional_response.status() == 304 {
754                            // Use cached response with updated headers
755                            let (fresh_parts, _) =
756                                conditional_response.into_parts();
757                            let updated_response = cache
758                                .handle_not_modified(
759                                    cached_response,
760                                    &fresh_parts,
761                                )
762                                .await
763                                .cache_err()?;
764
765                            let mut response = http_cache::HttpCacheOptions::http_response_to_response(
766                                &updated_response,
767                                HttpCacheBody::Buffered(updated_response.body.clone()),
768                            ).map_err(HttpCacheError::other)?;
769
770                            // Add cache status headers if enabled
771                            if cache.options.cache_status_headers {
772                                response = add_cache_status_headers(
773                                    response, "HIT", "HIT",
774                                );
775                            }
776
777                            // Insert metadata into response extensions if present
778                            if let Some(metadata) = updated_response.metadata {
779                                response.extensions_mut().insert(
780                                    http_cache::HttpCacheMetadata::from(
781                                        metadata,
782                                    ),
783                                );
784                            }
785
786                            return Ok(response);
787                        } else {
788                            // Process fresh response
789                            let (parts, res_body) =
790                                conditional_response.into_parts();
791                            let body_bytes =
792                                collect_body(res_body).await.map_err(|_e| {
793                                    HttpCacheError::http(Box::new(
794                                        std::io::Error::other(
795                                            "service error".to_string(),
796                                        ),
797                                    ))
798                                })?;
799
800                            let cached_response = cache
801                                .process_response(
802                                    analysis,
803                                    Response::from_parts(
804                                        parts,
805                                        body_bytes.clone(),
806                                    ),
807                                    None,
808                                )
809                                .await
810                                .cache_err()?;
811
812                            let mut response =
813                                cached_response.map(HttpCacheBody::Buffered);
814
815                            // Add cache status headers if enabled
816                            if cache.options.cache_status_headers {
817                                response = add_cache_status_headers(
818                                    response, "MISS", "MISS",
819                                );
820                            }
821
822                            return Ok(response);
823                        }
824                    }
825                }
826            }
827
828            // Fetch fresh response
829            let req = Request::from_parts(parts, body);
830            let response = inner_service.oneshot(req).await.map_err(|_e| {
831                HttpCacheError::http(Box::new(std::io::Error::other(
832                    "service error".to_string(),
833                )))
834            })?;
835
836            let (res_parts, res_body) = response.into_parts();
837            let body_bytes = collect_body(res_body).await.map_err(|_e| {
838                HttpCacheError::http(Box::new(std::io::Error::other(
839                    "service error".to_string(),
840                )))
841            })?;
842
843            // Process and cache using interface
844            let cached_response = cache
845                .process_response(
846                    analysis,
847                    Response::from_parts(res_parts, body_bytes.clone()),
848                    None,
849                )
850                .await
851                .cache_err()?;
852
853            let mut response = cached_response.map(HttpCacheBody::Buffered);
854
855            // Add cache status headers if enabled
856            if cache.options.cache_status_headers {
857                response = add_cache_status_headers(response, "MISS", "MISS");
858            }
859
860            Ok(response)
861        })
862    }
863}
864
865// Hyper service implementation for HttpCacheService
866impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
867    for HttpCacheService<S, CM>
868where
869    S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
870    S::Response: Into<Response<http_body_util::Full<Bytes>>>,
871    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
872    S::Future: Send + 'static,
873    CM: CacheManager,
874{
875    type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
876    type Error = HttpCacheError;
877    type Future = Pin<
878        Box<
879            dyn std::future::Future<
880                    Output = Result<Self::Response, Self::Error>,
881                > + Send,
882        >,
883    >;
884
885    fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
886        // Convert to the format expected by the generic Service implementation
887        let service_clone = self.clone();
888        Box::pin(async move { service_clone.call(_req).await })
889    }
890}
891
892#[cfg(feature = "streaming")]
893impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
894    for HttpCacheStreamingService<S, CM>
895where
896    S: Service<Request<ReqBody>, Response = Response<ResBody>>
897        + Clone
898        + Send
899        + 'static,
900    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
901    S::Future: Send + 'static,
902    ReqBody: Body + Send + 'static,
903    ReqBody::Data: Send,
904    ReqBody::Error: Into<StreamingError>,
905    ResBody: Body + Send + 'static,
906    ResBody::Data: Send,
907    ResBody::Error: Into<StreamingError>,
908    CM: StreamingCacheManager,
909    <CM::Body as http_body::Body>::Data: Send,
910    <CM::Body as http_body::Body>::Error:
911        Into<StreamingError> + Send + Sync + 'static,
912{
913    type Response = Response<CM::Body>;
914    type Error = HttpCacheError;
915    type Future = Pin<
916        Box<
917            dyn std::future::Future<
918                    Output = Result<Self::Response, Self::Error>,
919                > + Send,
920        >,
921    >;
922
923    fn poll_ready(
924        &mut self,
925        cx: &mut Context<'_>,
926    ) -> Poll<Result<(), Self::Error>> {
927        self.inner.poll_ready(cx).map_err(|_e| {
928            HttpCacheError::http(Box::new(std::io::Error::other(
929                "service error".to_string(),
930            )))
931        })
932    }
933
934    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
935        let cache = self.cache.clone();
936        let (parts, body) = req.into_parts();
937        let inner_service = self.inner.clone();
938
939        Box::pin(async move {
940            use http_cache_semantics::BeforeRequest;
941
942            // Use the core library's streaming cache interface
943            let analysis = cache.analyze_request(&parts, None).cache_err()?;
944
945            // Handle cache busting
946            for key in &analysis.cache_bust_keys {
947                cache.manager.delete(key).await.cache_err()?;
948            }
949
950            // For non-GET/HEAD requests, invalidate cached GET responses
951            if !analysis.should_cache && !analysis.is_get_head {
952                let get_cache_key = cache
953                    .options
954                    .create_cache_key_for_invalidation(&parts, "GET");
955                let _ = cache.manager.delete(&get_cache_key).await;
956            }
957
958            // If not cacheable, convert body type and return
959            if !analysis.should_cache {
960                // Apply rate limiting before non-cached request
961                #[cfg(feature = "rate-limiting")]
962                if let Some(rate_limiter) = &cache.options.rate_limiter {
963                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
964                    {
965                        let rate_limit_key =
966                            url.host_str().unwrap_or("unknown");
967                        rate_limiter.until_key_ready(rate_limit_key).await;
968                    }
969                }
970
971                let req = Request::from_parts(parts, body);
972                let response =
973                    inner_service.oneshot(req).await.map_err(|_e| {
974                        HttpCacheError::http(Box::new(std::io::Error::other(
975                            "service error".to_string(),
976                        )))
977                    })?;
978                let mut converted_response =
979                    cache.manager.convert_body(response).await.cache_err()?;
980
981                // Add cache status headers if enabled
982                if cache.options.cache_status_headers {
983                    converted_response = add_cache_status_headers_streaming(
984                        converted_response,
985                        "MISS",
986                        "MISS",
987                    );
988                }
989
990                return Ok(converted_response);
991            }
992
993            // Special case for Reload mode: skip cache lookup but still cache response
994            if analysis.cache_mode == CacheMode::Reload {
995                // Apply rate limiting before reload request
996                #[cfg(feature = "rate-limiting")]
997                if let Some(rate_limiter) = &cache.options.rate_limiter {
998                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
999                    {
1000                        let rate_limit_key =
1001                            url.host_str().unwrap_or("unknown");
1002                        rate_limiter.until_key_ready(rate_limit_key).await;
1003                    }
1004                }
1005
1006                let req = Request::from_parts(parts, body);
1007                let response =
1008                    inner_service.oneshot(req).await.map_err(|_e| {
1009                        HttpCacheError::http(Box::new(std::io::Error::other(
1010                            "service error".to_string(),
1011                        )))
1012                    })?;
1013
1014                let cached_response = cache
1015                    .process_response(analysis, response, None)
1016                    .await
1017                    .cache_err()?;
1018
1019                let mut final_response = cached_response;
1020
1021                // Add cache status headers if enabled
1022                if cache.options.cache_status_headers {
1023                    final_response = add_cache_status_headers_streaming(
1024                        final_response,
1025                        "MISS",
1026                        "MISS",
1027                    );
1028                }
1029
1030                return Ok(final_response);
1031            }
1032
1033            // Look up cached response using interface
1034            if let Some((cached_response, policy)) = cache
1035                .lookup_cached_response(&analysis.cache_key)
1036                .await
1037                .cache_err()?
1038            {
1039                let before_req =
1040                    policy.before_request(&parts, std::time::SystemTime::now());
1041                match before_req {
1042                    BeforeRequest::Fresh(_) => {
1043                        let mut response = cached_response;
1044
1045                        // Add cache status headers if enabled
1046                        if cache.options.cache_status_headers {
1047                            response = add_cache_status_headers_streaming(
1048                                response, "HIT", "HIT",
1049                            );
1050                        }
1051
1052                        return Ok(response);
1053                    }
1054                    BeforeRequest::Stale {
1055                        request: conditional_parts, ..
1056                    } => {
1057                        // Apply rate limiting before conditional request
1058                        #[cfg(feature = "rate-limiting")]
1059                        if let Some(rate_limiter) = &cache.options.rate_limiter
1060                        {
1061                            if let Ok(url) = conditional_parts
1062                                .uri
1063                                .to_string()
1064                                .parse::<::url::Url>()
1065                            {
1066                                let rate_limit_key =
1067                                    url.host_str().unwrap_or("unknown");
1068                                rate_limiter
1069                                    .until_key_ready(rate_limit_key)
1070                                    .await;
1071                            }
1072                        }
1073
1074                        let conditional_req =
1075                            Request::from_parts(conditional_parts, body);
1076                        let conditional_response = inner_service
1077                            .oneshot(conditional_req)
1078                            .await
1079                            .map_err(|_e| {
1080                                HttpCacheError::http(Box::new(
1081                                    std::io::Error::other(
1082                                        "service error".to_string(),
1083                                    ),
1084                                ))
1085                            })?;
1086
1087                        if conditional_response.status() == 304 {
1088                            let (fresh_parts, _) =
1089                                conditional_response.into_parts();
1090                            let updated_response = cache
1091                                .handle_not_modified(
1092                                    cached_response,
1093                                    &fresh_parts,
1094                                )
1095                                .await
1096                                .cache_err()?;
1097
1098                            let mut response = updated_response;
1099
1100                            // Add cache status headers if enabled
1101                            if cache.options.cache_status_headers {
1102                                response = add_cache_status_headers_streaming(
1103                                    response, "HIT", "HIT",
1104                                );
1105                            }
1106
1107                            return Ok(response);
1108                        } else {
1109                            let cached_response = cache
1110                                .process_response(
1111                                    analysis,
1112                                    conditional_response,
1113                                    None,
1114                                )
1115                                .await
1116                                .cache_err()?;
1117
1118                            let mut response = cached_response;
1119
1120                            // Add cache status headers if enabled
1121                            if cache.options.cache_status_headers {
1122                                response = add_cache_status_headers_streaming(
1123                                    response, "MISS", "MISS",
1124                                );
1125                            }
1126
1127                            return Ok(response);
1128                        }
1129                    }
1130                }
1131            }
1132
1133            // Apply rate limiting before fresh request
1134            #[cfg(feature = "rate-limiting")]
1135            if let Some(rate_limiter) = &cache.options.rate_limiter {
1136                if let Ok(url) = parts.uri.to_string().parse::<url::Url>() {
1137                    let rate_limit_key = url.host_str().unwrap_or("unknown");
1138                    rate_limiter.until_key_ready(rate_limit_key).await;
1139                }
1140            }
1141
1142            // Fetch fresh response
1143            let req = Request::from_parts(parts, body);
1144            let response = inner_service.oneshot(req).await.map_err(|_e| {
1145                HttpCacheError::http(Box::new(std::io::Error::other(
1146                    "service error".to_string(),
1147                )))
1148            })?;
1149
1150            // Process using streaming interface
1151            let cached_response = cache
1152                .process_response(analysis, response, None)
1153                .await
1154                .cache_err()?;
1155
1156            let mut final_response = cached_response;
1157
1158            // Add cache status headers if enabled
1159            if cache.options.cache_status_headers {
1160                final_response = add_cache_status_headers_streaming(
1161                    final_response,
1162                    "MISS",
1163                    "MISS",
1164                );
1165            }
1166
1167            Ok(final_response)
1168        })
1169    }
1170}
1171
1172/// Body type that wraps cached responses  
1173pub enum HttpCacheBody<B> {
1174    /// Buffered body from cache
1175    Buffered(Vec<u8>),
1176    /// Original body (fallback)
1177    Original(B),
1178}
1179
1180impl<B> Body for HttpCacheBody<B>
1181where
1182    B: Body + Unpin,
1183    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1184    B::Data: Into<bytes::Bytes>,
1185{
1186    type Data = bytes::Bytes;
1187    type Error = Box<dyn std::error::Error + Send + Sync>;
1188
1189    fn poll_frame(
1190        mut self: Pin<&mut Self>,
1191        cx: &mut Context<'_>,
1192    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1193        match &mut *self {
1194            HttpCacheBody::Buffered(bytes) => {
1195                if bytes.is_empty() {
1196                    Poll::Ready(None)
1197                } else {
1198                    let data = std::mem::take(bytes);
1199                    Poll::Ready(Some(Ok(http_body::Frame::data(
1200                        bytes::Bytes::from(data),
1201                    ))))
1202                }
1203            }
1204            HttpCacheBody::Original(body) => {
1205                Pin::new(body).poll_frame(cx).map(|opt| {
1206                    opt.map(|res| {
1207                        res.map(|frame| frame.map_data(Into::into))
1208                            .map_err(Into::into)
1209                    })
1210                })
1211            }
1212        }
1213    }
1214
1215    fn is_end_stream(&self) -> bool {
1216        match self {
1217            HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1218            HttpCacheBody::Original(body) => body.is_end_stream(),
1219        }
1220    }
1221
1222    fn size_hint(&self) -> http_body::SizeHint {
1223        match self {
1224            HttpCacheBody::Buffered(bytes) => {
1225                let len = bytes.len() as u64;
1226                http_body::SizeHint::with_exact(len)
1227            }
1228            HttpCacheBody::Original(body) => body.size_hint(),
1229        }
1230    }
1231}
1232
1233#[cfg(test)]
1234mod test;