// http_cache_tower/lib.rs

1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     // Create cache manager with disk storage
28//!     let cache_manager = CACacheManager::new("./cache".into(), true);
29//!     
30//!     // Create cache layer
31//!     let cache_layer = HttpCacheLayer::new(cache_manager);
32//!     
33//!     // Build service with caching
34//!     let service = ServiceBuilder::new()
35//!         .layer(cache_layer)
36//!         .service_fn(handler);
37//!     
38//!     // Use the service
39//!     let request = Request::builder()
40//!         .uri("http://example.com")
41//!         .body(Full::new(Bytes::new()))
42//!         .unwrap();
43//!     let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//!     mode: CacheMode::Default,
61//!     manager: cache_manager,
62//!     options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//! use std::path::PathBuf;
78//!
79//! # #[tokio::main]
80//! # async fn main() {
81//! // Create streaming cache setup
82//! let streaming_manager = StreamingManager::new("./streaming-cache".into());
83//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
84//!
85//! // Use with your service
86//! // let service = streaming_layer.layer(your_service);
87//! # }
88//! ```
89//!
90//! ## Cache Modes
91//!
92//! Different cache modes provide different behaviors:
93//!
94//! - `CacheMode::Default`: Follow HTTP caching rules strictly
95//! - `CacheMode::NoStore`: Never cache responses
96//! - `CacheMode::NoCache`: Always revalidate with the origin server
97//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
98//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
99//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
100//!
101//! ## Cache Invalidation
102//!
103//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
104//!
105//! ```text
106//! These methods will invalidate any cached GET response for the same URI:
107//! - PUT /api/users/123    -> invalidates GET /api/users/123
108//! - POST /api/users/123   -> invalidates GET /api/users/123  
109//! - DELETE /api/users/123 -> invalidates GET /api/users/123
110//! - PATCH /api/users/123  -> invalidates GET /api/users/123
111//! ```
112//!
113//! ## Integration with Other Tower Layers
114//!
115//! The cache layer works with other Tower middleware:
116//!
117//! ```rust,no_run
118//! use tower::ServiceBuilder;
119//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
120//! use tower::service_fn;
121//! use tower::ServiceExt;
122//! use http::{Request, Response};
123//! use http_body_util::Full;
124//! use bytes::Bytes;
125//! use std::convert::Infallible;
126//!
127//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
128//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
129//! }
130//!
131//! #[tokio::main]
132//! async fn main() {
133//!     let cache_manager = CACacheManager::new("./cache".into(), true);
134//!     let cache_layer = HttpCacheLayer::new(cache_manager);
135//!
136//!     let service = ServiceBuilder::new()
137//!         // .layer(TraceLayer::new_for_http())  // Logging (requires tower-http)
138//!         // .layer(CompressionLayer::new())     // Compression (requires tower-http)
139//!         .layer(cache_layer)                    // Caching
140//!         .service_fn(handler);
141//!     
142//!     // Use the service
143//!     let request = Request::builder()
144//!         .uri("http://example.com")
145//!         .body(Full::new(Bytes::new()))
146//!         .unwrap();
147//!     let response = service.oneshot(request).await.unwrap();
148//! }
149//! ```
150
151use bytes::Bytes;
152use http::{Request, Response};
153use http_body::Body;
154use http_body_util::BodyExt;
155#[cfg(feature = "manager-cacache")]
156pub use http_cache::CACacheManager;
157#[cfg(feature = "streaming")]
158use http_cache::StreamingError;
159use http_cache::{
160    CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
161};
162#[cfg(feature = "streaming")]
163use http_cache::{
164    HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
165};
166use std::{
167    pin::Pin,
168    sync::Arc,
169    task::{Context, Poll},
170};
171use tower::{Layer, Service, ServiceExt};
172
173pub mod error;
174pub use error::HttpCacheError;
175#[cfg(feature = "streaming")]
176pub use error::TowerStreamingError;
177
/// Extension trait that maps an arbitrary error into
/// [`HttpCacheError::CacheError`] (storage/lookup failures).
trait HttpCacheErrorExt<T> {
    /// Convert the `Err` variant into `HttpCacheError::CacheError`,
    /// stringifying the original error.
    fn cache_err(self) -> Result<T, HttpCacheError>;
}
182
/// Extension trait that maps an arbitrary error into
/// [`HttpCacheError::HttpError`] (failures from the wrapped service/body).
trait HttpErrorExt<T> {
    /// Convert the `Err` variant into `HttpCacheError::HttpError`, boxing
    /// the original error.
    fn http_err(self) -> Result<T, HttpCacheError>;
}
186
187impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
188where
189    E: ToString,
190{
191    fn cache_err(self) -> Result<T, HttpCacheError> {
192        self.map_err(|e| HttpCacheError::CacheError(e.to_string()))
193    }
194}
195
196impl<T, E> HttpErrorExt<T> for Result<T, E>
197where
198    E: Into<Box<dyn std::error::Error + Send + Sync>>,
199{
200    fn http_err(self) -> Result<T, HttpCacheError> {
201        self.map_err(|e| HttpCacheError::HttpError(e.into()))
202    }
203}
204
205/// Helper function to collect a body into bytes
206async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
207where
208    B: Body,
209{
210    let collected = BodyExt::collect(body).await?;
211    Ok(collected.to_bytes().to_vec())
212}
213
214/// HTTP cache layer for Tower services.
215///
216/// This layer implements HTTP caching according to RFC 7234, automatically caching
217/// GET and HEAD responses based on their cache-control headers and invalidating
218/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
219///
220/// # Example
221///
222/// ```rust
223/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
224/// use tower::ServiceBuilder;
225/// use tower::service_fn;
226/// use http::{Request, Response};
227/// use http_body_util::Full;
228/// use bytes::Bytes;
229/// use std::convert::Infallible;
230///
231/// # #[tokio::main]
232/// # async fn main() {
233/// let cache_manager = CACacheManager::new("./cache".into(), true);
234/// let cache_layer = HttpCacheLayer::new(cache_manager);
235///
236/// // Use with ServiceBuilder
237/// let service = ServiceBuilder::new()
238///     .layer(cache_layer)
239///     .service_fn(|_req: Request<Full<Bytes>>| async {
240///         Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
241///     });
242/// # }
243/// ```
#[derive(Clone)]
pub struct HttpCacheLayer<CM>
where
    CM: CacheManager,
{
    // Shared cache configuration; the `Arc` makes cloning the layer (and
    // every service it produces) a cheap refcount bump.
    cache: Arc<HttpCache<CM>>,
}
251
252impl<CM> HttpCacheLayer<CM>
253where
254    CM: CacheManager,
255{
256    /// Create a new HTTP cache layer with default configuration.
257    ///
258    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
259    ///
260    /// # Arguments
261    ///
262    /// * `cache_manager` - The cache manager to use for storing responses
263    ///
264    /// # Example
265    ///
266    /// ```rust
267    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
268    ///
269    /// # #[tokio::main]
270    /// # async fn main() {
271    /// let cache_manager = CACacheManager::new("./cache".into(), true);
272    /// let layer = HttpCacheLayer::new(cache_manager);
273    /// # }
274    /// ```
275    pub fn new(cache_manager: CM) -> Self {
276        Self {
277            cache: Arc::new(HttpCache {
278                mode: CacheMode::Default,
279                manager: cache_manager,
280                options: HttpCacheOptions::default(),
281            }),
282        }
283    }
284
285    /// Create a new HTTP cache layer with custom options.
286    ///
287    /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
288    /// through [`HttpCacheOptions`].
289    ///
290    /// # Arguments
291    ///
292    /// * `cache_manager` - The cache manager to use for storing responses
293    /// * `options` - Custom cache options
294    ///
295    /// # Example
296    ///
297    /// ```rust
298    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
299    /// use http_cache::HttpCacheOptions;
300    ///
301    /// # #[tokio::main]
302    /// # async fn main() {
303    /// let cache_manager = CACacheManager::new("./cache".into(), true);
304    ///
305    /// let options = HttpCacheOptions {
306    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
307    ///         format!("custom:{}:{}", req.method, req.uri)
308    ///     })),
309    ///     ..Default::default()
310    /// };
311    ///
312    /// let layer = HttpCacheLayer::with_options(cache_manager, options);
313    /// # }
314    /// ```
315    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
316        Self {
317            cache: Arc::new(HttpCache {
318                mode: CacheMode::Default,
319                manager: cache_manager,
320                options,
321            }),
322        }
323    }
324
325    /// Create a new HTTP cache layer with a pre-configured cache.
326    ///
327    /// This method gives you full control over the cache configuration,
328    /// including the cache mode.
329    ///
330    /// # Arguments
331    ///
332    /// * `cache` - A fully configured HttpCache instance
333    ///
334    /// # Example
335    ///
336    /// ```rust
337    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
338    /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
339    ///
340    /// # #[tokio::main]
341    /// # async fn main() {
342    /// let cache_manager = CACacheManager::new("./cache".into(), true);
343    ///
344    /// let cache = HttpCache {
345    ///     mode: CacheMode::ForceCache,
346    ///     manager: cache_manager,
347    ///     options: HttpCacheOptions::default(),
348    /// };
349    ///
350    /// let layer = HttpCacheLayer::with_cache(cache);
351    /// # }
352    /// ```
353    pub fn with_cache(cache: HttpCache<CM>) -> Self {
354        Self { cache: Arc::new(cache) }
355    }
356}
357
358/// HTTP cache layer with streaming support for Tower services.
359///
360/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
361/// but handles streaming responses. It can work with large
362/// responses without buffering them entirely in memory.
363///
364/// # Example
365///
366/// ```rust
367/// use http_cache_tower::HttpCacheStreamingLayer;
368/// use http_cache::StreamingManager;
369/// use tower::ServiceBuilder;
370/// use tower::service_fn;
371/// use http::{Request, Response};
372/// use http_body_util::Full;
373/// use bytes::Bytes;
374/// use std::convert::Infallible;
375///
376/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
377///     Ok(Response::new(Full::new(Bytes::from("Hello"))))
378/// }
379///
380/// # #[tokio::main]
381/// # async fn main() {
382/// let streaming_manager = StreamingManager::new("./cache".into());
383/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
384///
385/// // Use with ServiceBuilder
386/// let service = ServiceBuilder::new()
387///     .layer(streaming_layer)
388///     .service_fn(handler);
389/// # }
390/// ```
#[cfg(feature = "streaming")]
#[derive(Clone)]
pub struct HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    // Shared streaming cache configuration; `Arc` keeps clones cheap.
    cache: Arc<HttpStreamingCache<CM>>,
}
399
#[cfg(feature = "streaming")]
impl<CM> HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    /// Build a streaming layer using [`CacheMode::Default`] and default
    /// [`HttpCacheOptions`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::StreamingManager;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::new("./cache".into());
    /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
    /// # }
    /// ```
    pub fn new(cache_manager: CM) -> Self {
        // Delegate so the default options live in exactly one place.
        Self::with_options(cache_manager, HttpCacheOptions::default())
    }

    /// Build a streaming layer using [`CacheMode::Default`] but with
    /// caller-supplied [`HttpCacheOptions`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    /// * `options` - Custom cache options
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::new("./cache".into());
    ///
    /// let options = HttpCacheOptions {
    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
    ///         format!("stream:{}:{}", req.method, req.uri)
    ///     })),
    ///     ..Default::default()
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
    /// # }
    /// ```
    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
        // Delegate to `with_cache`; only the mode is fixed here.
        Self::with_cache(HttpStreamingCache {
            mode: CacheMode::Default,
            manager: cache_manager,
            options,
        })
    }

    /// Build a streaming layer from a fully pre-configured
    /// [`HttpStreamingCache`], giving the caller control over every field
    /// including the cache mode.
    ///
    /// # Arguments
    ///
    /// * `cache` - A fully configured HttpStreamingCache instance
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::new("./cache".into());
    ///
    /// let cache = HttpStreamingCache {
    ///     mode: CacheMode::ForceCache,
    ///     manager: streaming_manager,
    ///     options: HttpCacheOptions::default(),
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
    /// # }
    /// ```
    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
        let cache = Arc::new(cache);
        Self { cache }
    }
}
505
506impl<S, CM> Layer<S> for HttpCacheLayer<CM>
507where
508    CM: CacheManager,
509{
510    type Service = HttpCacheService<S, CM>;
511
512    fn layer(&self, inner: S) -> Self::Service {
513        HttpCacheService { inner, cache: self.cache.clone() }
514    }
515}
516
#[cfg(feature = "streaming")]
impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    type Service = HttpCacheStreamingService<S, CM>;

    /// Wrap `inner` in a streaming caching service that shares this layer's
    /// cache handle (refcount bump, no deep copy).
    fn layer(&self, inner: S) -> Self::Service {
        let cache = Arc::clone(&self.cache);
        HttpCacheStreamingService { inner, cache }
    }
}
528
/// HTTP cache service for Tower/Hyper.
///
/// Produced by [`HttpCacheLayer::layer`]; wraps an inner service and a
/// shared cache configuration.
pub struct HttpCacheService<S, CM>
where
    CM: CacheManager,
{
    // The wrapped service that produces fresh responses.
    inner: S,
    // Shared cache configuration (same `Arc` as the originating layer).
    cache: Arc<HttpCache<CM>>,
}
537
538impl<S, CM> Clone for HttpCacheService<S, CM>
539where
540    S: Clone,
541    CM: CacheManager,
542{
543    fn clone(&self) -> Self {
544        Self { inner: self.inner.clone(), cache: self.cache.clone() }
545    }
546}
547
/// HTTP cache streaming service for Tower/Hyper.
///
/// Produced by [`HttpCacheStreamingLayer::layer`]; wraps an inner service
/// and a shared streaming cache configuration.
#[cfg(feature = "streaming")]
pub struct HttpCacheStreamingService<S, CM>
where
    CM: StreamingCacheManager,
{
    // The wrapped service that produces fresh responses.
    inner: S,
    // Shared streaming cache configuration (same `Arc` as the layer).
    cache: Arc<HttpStreamingCache<CM>>,
}
557
#[cfg(feature = "streaming")]
impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
where
    S: Clone,
    CM: StreamingCacheManager,
{
    /// Clone the wrapped service; the cache handle is a cheap `Arc` bump.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let cache = Arc::clone(&self.cache);
        Self { inner, cache }
    }
}
568
impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
    for HttpCacheService<S, CM>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>
        + Clone
        + Send
        + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    ReqBody: Body + Send + 'static,
    ReqBody::Data: Send,
    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    ResBody: Body + Send + 'static,
    ResBody::Data: Send,
    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    CM: CacheManager,
{
    // Wrapped body type lets one response type carry either a buffered
    // cache payload or the upstream body (pass-through case).
    type Response = Response<HttpCacheBody<ResBody>>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                    Output = Result<Self::Response, Self::Error>,
                > + Send,
        >,
    >;

    // Readiness is delegated to the inner service; its error is boxed into
    // `HttpCacheError::HttpError`.
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.inner
            .poll_ready(cx)
            .map_err(|e| HttpCacheError::HttpError(e.into()))
    }

    // Full buffered caching flow: analyze, invalidate, serve fresh cache
    // hits, revalidate stale entries via conditional request, or fetch from
    // the inner service and store the (fully buffered) result.
    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        // Clone handles so the returned future is 'static and independent
        // of `&mut self`.
        let cache = self.cache.clone();
        let (parts, body) = req.into_parts();
        let inner_service = self.inner.clone();

        Box::pin(async move {
            use http_cache_semantics::BeforeRequest;

            // Use the core library's cache interface for analysis
            let analysis = cache.analyze_request(&parts, None).cache_err()?;

            // Handle cache busting and non-cacheable requests
            for key in &analysis.cache_bust_keys {
                cache.manager.delete(key).await.cache_err()?;
            }

            // For non-GET/HEAD requests, invalidate cached GET responses.
            // Deletion failure is deliberately ignored (best-effort).
            if !analysis.should_cache && !analysis.is_get_head {
                let get_cache_key = cache
                    .options
                    .create_cache_key_for_invalidation(&parts, "GET");
                let _ = cache.manager.delete(&get_cache_key).await;
            }

            // If not cacheable, just pass through with the original body.
            if !analysis.should_cache {
                let req = Request::from_parts(parts, body);
                let response = inner_service.oneshot(req).await.http_err()?;
                return Ok(response.map(HttpCacheBody::Original));
            }

            // Special case for Reload mode: skip cache lookup but still cache response
            if analysis.cache_mode == CacheMode::Reload {
                let req = Request::from_parts(parts, body);
                let response = inner_service.oneshot(req).await.http_err()?;

                let (res_parts, res_body) = response.into_parts();
                // Buffer the upstream body fully before handing to the cache.
                let body_bytes = collect_body(res_body).await.http_err()?;

                let cached_response = cache
                    .process_response(
                        analysis,
                        Response::from_parts(res_parts, body_bytes.clone()),
                    )
                    .await
                    .cache_err()?;

                return Ok(cached_response.map(HttpCacheBody::Buffered));
            }

            // Look up cached response using interface
            if let Some((cached_response, policy)) = cache
                .lookup_cached_response(&analysis.cache_key)
                .await
                .cache_err()?
            {
                // Ask the caching policy whether the stored entry may be
                // served as-is or must be revalidated.
                let before_req =
                    policy.before_request(&parts, std::time::SystemTime::now());
                match before_req {
                    BeforeRequest::Fresh(_) => {
                        // Return cached response
                        let response = http_cache::HttpCacheOptions::http_response_to_response(
                            &cached_response,
                            HttpCacheBody::Buffered(cached_response.body.clone()),
                        ).map_err(HttpCacheError::HttpError)?;
                        return Ok(response);
                    }
                    BeforeRequest::Stale {
                        request: conditional_parts, ..
                    } => {
                        // Make conditional request using the policy-supplied
                        // request parts (e.g. validators added by the policy).
                        let conditional_req =
                            Request::from_parts(conditional_parts, body);
                        let conditional_response = inner_service
                            .oneshot(conditional_req)
                            .await
                            .http_err()?;

                        if conditional_response.status() == 304 {
                            // Use cached response with updated headers
                            let (fresh_parts, _) =
                                conditional_response.into_parts();
                            let updated_response = cache
                                .handle_not_modified(
                                    cached_response,
                                    &fresh_parts,
                                )
                                .await
                                .cache_err()?;

                            let response = http_cache::HttpCacheOptions::http_response_to_response(
                                &updated_response,
                                HttpCacheBody::Buffered(updated_response.body.clone()),
                            ).map_err(HttpCacheError::HttpError)?;
                            return Ok(response);
                        } else {
                            // Process fresh response: buffer, store, return.
                            let (parts, res_body) =
                                conditional_response.into_parts();
                            let body_bytes =
                                collect_body(res_body).await.http_err()?;

                            let cached_response = cache
                                .process_response(
                                    analysis,
                                    Response::from_parts(
                                        parts,
                                        body_bytes.clone(),
                                    ),
                                )
                                .await
                                .cache_err()?;

                            return Ok(
                                cached_response.map(HttpCacheBody::Buffered)
                            );
                        }
                    }
                }
            }

            // Cache miss: fetch fresh response from the inner service.
            let req = Request::from_parts(parts, body);
            let response = inner_service.oneshot(req).await.http_err()?;

            let (res_parts, res_body) = response.into_parts();
            let body_bytes = collect_body(res_body).await.http_err()?;

            // Process and cache using interface
            let cached_response = cache
                .process_response(
                    analysis,
                    Response::from_parts(res_parts, body_bytes.clone()),
                )
                .await
                .cache_err()?;

            Ok(cached_response.map(HttpCacheBody::Buffered))
        })
    }
}
746
// Hyper service implementation for HttpCacheService.
//
// NOTE(review): under these bounds (`S::Response: Into<Response<Full<Bytes>>>`
// rather than `Response = Response<ResBody>`), the generic tower `Service`
// impl above does not appear applicable, so `service_clone.call(_req)` below
// looks like it resolves back to *this* hyper `call` — i.e. unbounded
// recursion that never reaches `inner`. Confirm the intended dispatch; if
// the tower impl was meant, the call likely needs explicit disambiguation
// and matching bounds.
impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
    for HttpCacheService<S, CM>
where
    S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
    S::Response: Into<Response<http_body_util::Full<Bytes>>>,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    CM: CacheManager,
{
    type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                    Output = Result<Self::Response, Self::Error>,
                > + Send,
        >,
    >;

    fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
        // Convert to the format expected by the generic Service implementation
        // (hyper's `call` takes `&self`, so the service is cloned first).
        let service_clone = self.clone();
        Box::pin(async move { service_clone.call(_req).await })
    }
}
773
#[cfg(feature = "streaming")]
impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
    for HttpCacheStreamingService<S, CM>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>
        + Clone
        + Send
        + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    ReqBody: Body + Send + 'static,
    ReqBody::Data: Send,
    ReqBody::Error: Into<StreamingError>,
    ResBody: Body + Send + 'static,
    ResBody::Data: Send,
    ResBody::Error: Into<StreamingError>,
    CM: StreamingCacheManager,
    <CM::Body as http_body::Body>::Data: Send,
    <CM::Body as http_body::Body>::Error:
        Into<StreamingError> + Send + Sync + 'static,
{
    // Responses are returned as the manager's own body type; unlike the
    // buffered service there is no full-body collection in this impl.
    type Response = Response<CM::Body>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                    Output = Result<Self::Response, Self::Error>,
                > + Send,
        >,
    >;

    // Readiness is delegated to the inner service; its error is boxed into
    // `HttpCacheError::HttpError`.
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.inner
            .poll_ready(cx)
            .map_err(|e| HttpCacheError::HttpError(e.into()))
    }

    // Streaming caching flow: analyze, invalidate, serve cache hits,
    // revalidate stale entries, or fetch fresh and hand the response to the
    // streaming cache interface without buffering it here.
    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        // Clone handles so the returned future is 'static and independent
        // of `&mut self`.
        let cache = self.cache.clone();
        let (parts, body) = req.into_parts();
        let inner_service = self.inner.clone();

        Box::pin(async move {
            use http_cache_semantics::BeforeRequest;

            // Use the core library's streaming cache interface
            let analysis = cache.analyze_request(&parts, None).cache_err()?;

            // Handle cache busting
            for key in &analysis.cache_bust_keys {
                cache.manager.delete(key).await.cache_err()?;
            }

            // For non-GET/HEAD requests, invalidate cached GET responses.
            // Deletion failure is deliberately ignored (best-effort).
            if !analysis.should_cache && !analysis.is_get_head {
                let get_cache_key = cache
                    .options
                    .create_cache_key_for_invalidation(&parts, "GET");
                let _ = cache.manager.delete(&get_cache_key).await;
            }

            // If not cacheable, convert body type and return
            if !analysis.should_cache {
                let req = Request::from_parts(parts, body);
                let response = inner_service.oneshot(req).await.http_err()?;
                return cache.manager.convert_body(response).await.cache_err();
            }

            // Special case for Reload mode: skip cache lookup but still cache response
            if analysis.cache_mode == CacheMode::Reload {
                let req = Request::from_parts(parts, body);
                let response = inner_service.oneshot(req).await.http_err()?;

                let cached_response = cache
                    .process_response(analysis, response)
                    .await
                    .cache_err()?;

                return Ok(cached_response);
            }

            // Look up cached response using interface
            if let Some((cached_response, policy)) = cache
                .lookup_cached_response(&analysis.cache_key)
                .await
                .cache_err()?
            {
                // Ask the caching policy whether the stored entry may be
                // served as-is or must be revalidated.
                let before_req =
                    policy.before_request(&parts, std::time::SystemTime::now());
                match before_req {
                    BeforeRequest::Fresh(_) => {
                        // Fresh: serve the cached response directly.
                        return Ok(cached_response);
                    }
                    BeforeRequest::Stale {
                        request: conditional_parts, ..
                    } => {
                        // Stale: revalidate with the policy-supplied
                        // conditional request parts.
                        let conditional_req =
                            Request::from_parts(conditional_parts, body);
                        let conditional_response = inner_service
                            .oneshot(conditional_req)
                            .await
                            .http_err()?;

                        if conditional_response.status() == 304 {
                            // 304: keep cached body, merge fresh headers.
                            let (fresh_parts, _) =
                                conditional_response.into_parts();
                            let updated_response = cache
                                .handle_not_modified(
                                    cached_response,
                                    &fresh_parts,
                                )
                                .await
                                .cache_err()?;
                            return Ok(updated_response);
                        } else {
                            // Full response: store and return it.
                            let cached_response = cache
                                .process_response(
                                    analysis,
                                    conditional_response,
                                )
                                .await
                                .cache_err()?;
                            return Ok(cached_response);
                        }
                    }
                }
            }

            // Cache miss: fetch fresh response from the inner service.
            let req = Request::from_parts(parts, body);
            let response = inner_service.oneshot(req).await.http_err()?;

            // Process using streaming interface
            let cached_response =
                cache.process_response(analysis, response).await.cache_err()?;

            Ok(cached_response)
        })
    }
}
917
/// Body type returned by the buffered cache service: either a fully
/// buffered cached payload or the untouched upstream body.
pub enum HttpCacheBody<B> {
    /// Fully buffered body served from (or destined for) the cache.
    Buffered(Vec<u8>),
    /// Original upstream body, passed through unmodified (fallback).
    Original(B),
}
925
926impl<B> Body for HttpCacheBody<B>
927where
928    B: Body + Unpin,
929    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
930    B::Data: Into<bytes::Bytes>,
931{
932    type Data = bytes::Bytes;
933    type Error = Box<dyn std::error::Error + Send + Sync>;
934
935    fn poll_frame(
936        mut self: Pin<&mut Self>,
937        cx: &mut Context<'_>,
938    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
939        match &mut *self {
940            HttpCacheBody::Buffered(bytes) => {
941                if bytes.is_empty() {
942                    Poll::Ready(None)
943                } else {
944                    let data = std::mem::take(bytes);
945                    Poll::Ready(Some(Ok(http_body::Frame::data(
946                        bytes::Bytes::from(data),
947                    ))))
948                }
949            }
950            HttpCacheBody::Original(body) => {
951                Pin::new(body).poll_frame(cx).map(|opt| {
952                    opt.map(|res| {
953                        res.map(|frame| frame.map_data(Into::into))
954                            .map_err(Into::into)
955                    })
956                })
957            }
958        }
959    }
960
961    fn is_end_stream(&self) -> bool {
962        match self {
963            HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
964            HttpCacheBody::Original(body) => body.is_end_stream(),
965        }
966    }
967
968    fn size_hint(&self) -> http_body::SizeHint {
969        match self {
970            HttpCacheBody::Buffered(bytes) => {
971                let len = bytes.len() as u64;
972                http_body::SizeHint::with_exact(len)
973            }
974            HttpCacheBody::Original(body) => body.size_hint(),
975        }
976    }
977}
978
979#[cfg(test)]
980mod test;