http_cache_tower/
lib.rs

1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//!     // Create cache manager with disk storage
28//!     let cache_manager = CACacheManager::new("./cache".into(), true);
29//!     
30//!     // Create cache layer
31//!     let cache_layer = HttpCacheLayer::new(cache_manager);
32//!     
33//!     // Build service with caching
34//!     let service = ServiceBuilder::new()
35//!         .layer(cache_layer)
36//!         .service_fn(handler);
37//!     
38//!     // Use the service
39//!     let request = Request::builder()
40//!         .uri("http://example.com")
41//!         .body(Full::new(Bytes::new()))
42//!         .unwrap();
43//!     let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//!     mode: CacheMode::Default,
61//!     manager: cache_manager,
62//!     options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//! use std::path::PathBuf;
78//!
79//! # #[tokio::main]
80//! # async fn main() {
81//! // Create streaming cache setup
82//! let streaming_manager = StreamingManager::new("./streaming-cache".into());
83//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
84//!
85//! // Use with your service
86//! // let service = streaming_layer.layer(your_service);
87//! # }
88//! ```
89//!
90//! ## Cache Modes
91//!
92//! Different cache modes provide different behaviors:
93//!
94//! - `CacheMode::Default`: Follow HTTP caching rules strictly
95//! - `CacheMode::NoStore`: Never cache responses
96//! - `CacheMode::NoCache`: Always revalidate with the origin server
97//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
98//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
99//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
100//!
101//! ## Cache Invalidation
102//!
103//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
104//!
105//! ```text
106//! These methods will invalidate any cached GET response for the same URI:
107//! - PUT /api/users/123    -> invalidates GET /api/users/123
108//! - POST /api/users/123   -> invalidates GET /api/users/123  
109//! - DELETE /api/users/123 -> invalidates GET /api/users/123
110//! - PATCH /api/users/123  -> invalidates GET /api/users/123
111//! ```
112//!
113//! ## Integration with Other Tower Layers
114//!
115//! The cache layer works with other Tower middleware:
116//!
117//! ```rust,no_run
118//! use tower::ServiceBuilder;
119//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
120//! use tower::service_fn;
121//! use tower::ServiceExt;
122//! use http::{Request, Response};
123//! use http_body_util::Full;
124//! use bytes::Bytes;
125//! use std::convert::Infallible;
126//!
127//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
128//!     Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
129//! }
130//!
131//! #[tokio::main]
132//! async fn main() {
133//!     let cache_manager = CACacheManager::new("./cache".into(), true);
134//!     let cache_layer = HttpCacheLayer::new(cache_manager);
135//!
136//!     let service = ServiceBuilder::new()
137//!         // .layer(TraceLayer::new_for_http())  // Logging (requires tower-http)
138//!         // .layer(CompressionLayer::new())     // Compression (requires tower-http)
139//!         .layer(cache_layer)                    // Caching
140//!         .service_fn(handler);
141//!     
142//!     // Use the service
143//!     let request = Request::builder()
144//!         .uri("http://example.com")
145//!         .body(Full::new(Bytes::new()))
146//!         .unwrap();
147//!     let response = service.oneshot(request).await.unwrap();
148//! }
149//! ```
150
151use bytes::Bytes;
152use http::{HeaderValue, Request, Response};
153use http_body::Body;
154use http_body_util::BodyExt;
155
156#[cfg(feature = "manager-cacache")]
157pub use http_cache::CACacheManager;
158
159#[cfg(feature = "rate-limiting")]
160pub use http_cache::rate_limiting::{
161    CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
162};
163#[cfg(feature = "streaming")]
164use http_cache::StreamingError;
165use http_cache::{
166    CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
167};
168#[cfg(feature = "streaming")]
169use http_cache::{
170    HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
171};
172use std::{
173    pin::Pin,
174    sync::Arc,
175    task::{Context, Poll},
176};
177use tower::{Layer, Service, ServiceExt};
178
179// Re-export unified error types from http-cache core
180pub use http_cache::HttpCacheError;
181
182#[cfg(feature = "streaming")]
183/// Type alias for tower streaming errors, using the unified streaming error system
184pub type TowerStreamingError = http_cache::ClientStreamingError;
185
186/// Helper functions for error conversions
187trait HttpCacheErrorExt<T> {
188    fn cache_err(self) -> Result<T, HttpCacheError>;
189}
190
191impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
192where
193    E: ToString,
194{
195    fn cache_err(self) -> Result<T, HttpCacheError> {
196        self.map_err(|e| HttpCacheError::cache(e.to_string()))
197    }
198}
199
200/// Helper function to collect a body into bytes
201async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
202where
203    B: Body,
204{
205    let collected = BodyExt::collect(body).await?;
206    Ok(collected.to_bytes().to_vec())
207}
208
209/// Helper function to add cache status headers to a response
210fn add_cache_status_headers<B>(
211    mut response: Response<HttpCacheBody<B>>,
212    hit_or_miss: &str,
213    cache_lookup: &str,
214) -> Response<HttpCacheBody<B>> {
215    let headers = response.headers_mut();
216    headers.insert(
217        http_cache::XCACHE,
218        HeaderValue::from_str(hit_or_miss).unwrap(),
219    );
220    headers.insert(
221        http_cache::XCACHELOOKUP,
222        HeaderValue::from_str(cache_lookup).unwrap(),
223    );
224    response
225}
226
227#[cfg(feature = "streaming")]
228fn add_cache_status_headers_streaming<B>(
229    mut response: Response<B>,
230    hit_or_miss: &str,
231    cache_lookup: &str,
232) -> Response<B> {
233    let headers = response.headers_mut();
234    headers.insert(
235        http_cache::XCACHE,
236        HeaderValue::from_str(hit_or_miss).unwrap(),
237    );
238    headers.insert(
239        http_cache::XCACHELOOKUP,
240        HeaderValue::from_str(cache_lookup).unwrap(),
241    );
242    response
243}
244
245/// HTTP cache layer for Tower services.
246///
247/// This layer implements HTTP caching according to RFC 7234, automatically caching
248/// GET and HEAD responses based on their cache-control headers and invalidating
249/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
250///
251/// # Example
252///
253/// ```rust
254/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
255/// use tower::ServiceBuilder;
256/// use tower::service_fn;
257/// use http::{Request, Response};
258/// use http_body_util::Full;
259/// use bytes::Bytes;
260/// use std::convert::Infallible;
261///
262/// # #[tokio::main]
263/// # async fn main() {
264/// let cache_manager = CACacheManager::new("./cache".into(), true);
265/// let cache_layer = HttpCacheLayer::new(cache_manager);
266///
267/// // Use with ServiceBuilder
268/// let service = ServiceBuilder::new()
269///     .layer(cache_layer)
270///     .service_fn(|_req: Request<Full<Bytes>>| async {
271///         Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
272///     });
273/// # }
274/// ```
275#[derive(Clone)]
276pub struct HttpCacheLayer<CM>
277where
278    CM: CacheManager,
279{
280    cache: Arc<HttpCache<CM>>,
281}
282
283impl<CM> HttpCacheLayer<CM>
284where
285    CM: CacheManager,
286{
287    /// Create a new HTTP cache layer with default configuration.
288    ///
289    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
290    ///
291    /// # Arguments
292    ///
293    /// * `cache_manager` - The cache manager to use for storing responses
294    ///
295    /// # Example
296    ///
297    /// ```rust
298    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
299    ///
300    /// # #[tokio::main]
301    /// # async fn main() {
302    /// let cache_manager = CACacheManager::new("./cache".into(), true);
303    /// let layer = HttpCacheLayer::new(cache_manager);
304    /// # }
305    /// ```
306    pub fn new(cache_manager: CM) -> Self {
307        Self {
308            cache: Arc::new(HttpCache {
309                mode: CacheMode::Default,
310                manager: cache_manager,
311                options: HttpCacheOptions::default(),
312            }),
313        }
314    }
315
316    /// Create a new HTTP cache layer with custom options.
317    ///
318    /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
319    /// through [`HttpCacheOptions`].
320    ///
321    /// # Arguments
322    ///
323    /// * `cache_manager` - The cache manager to use for storing responses
324    /// * `options` - Custom cache options
325    ///
326    /// # Example
327    ///
328    /// ```rust
329    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
330    /// use http_cache::HttpCacheOptions;
331    ///
332    /// # #[tokio::main]
333    /// # async fn main() {
334    /// let cache_manager = CACacheManager::new("./cache".into(), true);
335    ///
336    /// let options = HttpCacheOptions {
337    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
338    ///         format!("custom:{}:{}", req.method, req.uri)
339    ///     })),
340    ///     ..Default::default()
341    /// };
342    ///
343    /// let layer = HttpCacheLayer::with_options(cache_manager, options);
344    /// # }
345    /// ```
346    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
347        Self {
348            cache: Arc::new(HttpCache {
349                mode: CacheMode::Default,
350                manager: cache_manager,
351                options,
352            }),
353        }
354    }
355
356    /// Create a new HTTP cache layer with a pre-configured cache.
357    ///
358    /// This method gives you full control over the cache configuration,
359    /// including the cache mode.
360    ///
361    /// # Arguments
362    ///
363    /// * `cache` - A fully configured HttpCache instance
364    ///
365    /// # Example
366    ///
367    /// ```rust
368    /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
369    /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
370    ///
371    /// # #[tokio::main]
372    /// # async fn main() {
373    /// let cache_manager = CACacheManager::new("./cache".into(), true);
374    ///
375    /// let cache = HttpCache {
376    ///     mode: CacheMode::ForceCache,
377    ///     manager: cache_manager,
378    ///     options: HttpCacheOptions::default(),
379    /// };
380    ///
381    /// let layer = HttpCacheLayer::with_cache(cache);
382    /// # }
383    /// ```
384    pub fn with_cache(cache: HttpCache<CM>) -> Self {
385        Self { cache: Arc::new(cache) }
386    }
387}
388
389/// HTTP cache layer with streaming support for Tower services.
390///
391/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
392/// but handles streaming responses. It can work with large
393/// responses without buffering them entirely in memory.
394///
395/// # Example
396///
397/// ```rust
398/// use http_cache_tower::HttpCacheStreamingLayer;
399/// use http_cache::StreamingManager;
400/// use tower::ServiceBuilder;
401/// use tower::service_fn;
402/// use http::{Request, Response};
403/// use http_body_util::Full;
404/// use bytes::Bytes;
405/// use std::convert::Infallible;
406///
407/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
408///     Ok(Response::new(Full::new(Bytes::from("Hello"))))
409/// }
410///
411/// # #[tokio::main]
412/// # async fn main() {
413/// let streaming_manager = StreamingManager::new("./cache".into());
414/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
415///
416/// // Use with ServiceBuilder
417/// let service = ServiceBuilder::new()
418///     .layer(streaming_layer)
419///     .service_fn(handler);
420/// # }
421/// ```
422#[cfg(feature = "streaming")]
423#[derive(Clone)]
424pub struct HttpCacheStreamingLayer<CM>
425where
426    CM: StreamingCacheManager,
427{
428    cache: Arc<HttpStreamingCache<CM>>,
429}
430
431#[cfg(feature = "streaming")]
432impl<CM> HttpCacheStreamingLayer<CM>
433where
434    CM: StreamingCacheManager,
435{
436    /// Create a new HTTP cache streaming layer with default configuration.
437    ///
438    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
439    ///
440    /// # Arguments
441    ///
442    /// * `cache_manager` - The streaming cache manager to use
443    ///
444    /// # Example
445    ///
446    /// ```rust
447    /// use http_cache_tower::HttpCacheStreamingLayer;
448    /// use http_cache::StreamingManager;
449    ///
450    /// # #[tokio::main]
451    /// # async fn main() {
452    /// let streaming_manager = StreamingManager::new("./cache".into());
453    /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
454    /// # }
455    /// ```
456    pub fn new(cache_manager: CM) -> Self {
457        Self {
458            cache: Arc::new(HttpStreamingCache {
459                mode: CacheMode::Default,
460                manager: cache_manager,
461                options: HttpCacheOptions::default(),
462            }),
463        }
464    }
465
466    /// Create a new HTTP cache streaming layer with custom options.
467    ///
468    /// Uses [`CacheMode::Default`] but allows customizing cache behavior.
469    ///
470    /// # Arguments
471    ///
472    /// * `cache_manager` - The streaming cache manager to use
473    /// * `options` - Custom cache options
474    ///
475    /// # Example
476    ///
477    /// ```rust
478    /// use http_cache_tower::HttpCacheStreamingLayer;
479    /// use http_cache::{StreamingManager, HttpCacheOptions};
480    ///
481    /// # #[tokio::main]
482    /// # async fn main() {
483    /// let streaming_manager = StreamingManager::new("./cache".into());
484    ///
485    /// let options = HttpCacheOptions {
486    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
487    ///         format!("stream:{}:{}", req.method, req.uri)
488    ///     })),
489    ///     ..Default::default()
490    /// };
491    ///
492    /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
493    /// # }
494    /// ```
495    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
496        Self {
497            cache: Arc::new(HttpStreamingCache {
498                mode: CacheMode::Default,
499                manager: cache_manager,
500                options,
501            }),
502        }
503    }
504
505    /// Create a new HTTP cache streaming layer with a pre-configured cache.
506    ///
507    /// This method gives you full control over the streaming cache configuration.
508    ///
509    /// # Arguments
510    ///
511    /// * `cache` - A fully configured HttpStreamingCache instance
512    ///
513    /// # Example
514    ///
515    /// ```rust
516    /// use http_cache_tower::HttpCacheStreamingLayer;
517    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
518    ///
519    /// # #[tokio::main]
520    /// # async fn main() {
521    /// let streaming_manager = StreamingManager::new("./cache".into());
522    ///
523    /// let cache = HttpStreamingCache {
524    ///     mode: CacheMode::ForceCache,
525    ///     manager: streaming_manager,
526    ///     options: HttpCacheOptions::default(),
527    /// };
528    ///
529    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
530    /// # }
531    /// ```
532    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
533        Self { cache: Arc::new(cache) }
534    }
535}
536
537impl<S, CM> Layer<S> for HttpCacheLayer<CM>
538where
539    CM: CacheManager,
540{
541    type Service = HttpCacheService<S, CM>;
542
543    fn layer(&self, inner: S) -> Self::Service {
544        HttpCacheService { inner, cache: self.cache.clone() }
545    }
546}
547
548#[cfg(feature = "streaming")]
549impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
550where
551    CM: StreamingCacheManager,
552{
553    type Service = HttpCacheStreamingService<S, CM>;
554
555    fn layer(&self, inner: S) -> Self::Service {
556        HttpCacheStreamingService { inner, cache: self.cache.clone() }
557    }
558}
559
560/// HTTP cache service for Tower/Hyper
561pub struct HttpCacheService<S, CM>
562where
563    CM: CacheManager,
564{
565    inner: S,
566    cache: Arc<HttpCache<CM>>,
567}
568
569impl<S, CM> Clone for HttpCacheService<S, CM>
570where
571    S: Clone,
572    CM: CacheManager,
573{
574    fn clone(&self) -> Self {
575        Self { inner: self.inner.clone(), cache: self.cache.clone() }
576    }
577}
578
579/// HTTP cache streaming service for Tower/Hyper
580#[cfg(feature = "streaming")]
581pub struct HttpCacheStreamingService<S, CM>
582where
583    CM: StreamingCacheManager,
584{
585    inner: S,
586    cache: Arc<HttpStreamingCache<CM>>,
587}
588
589#[cfg(feature = "streaming")]
590impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
591where
592    S: Clone,
593    CM: StreamingCacheManager,
594{
595    fn clone(&self) -> Self {
596        Self { inner: self.inner.clone(), cache: self.cache.clone() }
597    }
598}
599
600impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
601    for HttpCacheService<S, CM>
602where
603    S: Service<Request<ReqBody>, Response = Response<ResBody>>
604        + Clone
605        + Send
606        + 'static,
607    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
608    S::Future: Send + 'static,
609    ReqBody: Body + Send + 'static,
610    ReqBody::Data: Send,
611    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
612    ResBody: Body + Send + 'static,
613    ResBody::Data: Send,
614    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
615    CM: CacheManager,
616{
617    type Response = Response<HttpCacheBody<ResBody>>;
618    type Error = HttpCacheError;
619    type Future = Pin<
620        Box<
621            dyn std::future::Future<
622                    Output = Result<Self::Response, Self::Error>,
623                > + Send,
624        >,
625    >;
626
627    fn poll_ready(
628        &mut self,
629        cx: &mut Context<'_>,
630    ) -> Poll<Result<(), Self::Error>> {
631        self.inner.poll_ready(cx).map_err(|_e| {
632            HttpCacheError::http(Box::new(std::io::Error::other(
633                "service error".to_string(),
634            )))
635        })
636    }
637
638    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
639        let cache = self.cache.clone();
640        let (parts, body) = req.into_parts();
641        let inner_service = self.inner.clone();
642
643        Box::pin(async move {
644            use http_cache_semantics::BeforeRequest;
645
646            // Use the core library's cache interface for analysis
647            let analysis = cache.analyze_request(&parts, None).cache_err()?;
648
649            // Handle cache busting and non-cacheable requests
650            for key in &analysis.cache_bust_keys {
651                cache.manager.delete(key).await.cache_err()?;
652            }
653
654            // For non-GET/HEAD requests, invalidate cached GET responses
655            if !analysis.should_cache && !analysis.is_get_head {
656                let get_cache_key = cache
657                    .options
658                    .create_cache_key_for_invalidation(&parts, "GET");
659                let _ = cache.manager.delete(&get_cache_key).await;
660            }
661
662            // If not cacheable, just pass through
663            if !analysis.should_cache {
664                let req = Request::from_parts(parts, body);
665                let response =
666                    inner_service.oneshot(req).await.map_err(|_e| {
667                        HttpCacheError::http(Box::new(std::io::Error::other(
668                            "service error".to_string(),
669                        )))
670                    })?;
671                return Ok(response.map(HttpCacheBody::Original));
672            }
673
674            // Special case for Reload mode: skip cache lookup but still cache response
675            if analysis.cache_mode == CacheMode::Reload {
676                let req = Request::from_parts(parts, body);
677                let response =
678                    inner_service.oneshot(req).await.map_err(|_e| {
679                        HttpCacheError::http(Box::new(std::io::Error::other(
680                            "service error".to_string(),
681                        )))
682                    })?;
683
684                let (res_parts, res_body) = response.into_parts();
685                let body_bytes =
686                    collect_body(res_body).await.map_err(|_e| {
687                        HttpCacheError::http(Box::new(std::io::Error::other(
688                            "service error".to_string(),
689                        )))
690                    })?;
691
692                let cached_response = cache
693                    .process_response(
694                        analysis,
695                        Response::from_parts(res_parts, body_bytes.clone()),
696                    )
697                    .await
698                    .cache_err()?;
699
700                return Ok(cached_response.map(HttpCacheBody::Buffered));
701            }
702
703            // Look up cached response using interface
704            if let Some((cached_response, policy)) = cache
705                .lookup_cached_response(&analysis.cache_key)
706                .await
707                .cache_err()?
708            {
709                let before_req =
710                    policy.before_request(&parts, std::time::SystemTime::now());
711                match before_req {
712                    BeforeRequest::Fresh(_) => {
713                        // Return cached response
714                        let mut response = http_cache::HttpCacheOptions::http_response_to_response(
715                            &cached_response,
716                            HttpCacheBody::Buffered(cached_response.body.clone()),
717                        ).map_err(HttpCacheError::other)?;
718
719                        // Add cache status headers if enabled
720                        if cache.options.cache_status_headers {
721                            response = add_cache_status_headers(
722                                response, "HIT", "HIT",
723                            );
724                        }
725
726                        return Ok(response);
727                    }
728                    BeforeRequest::Stale {
729                        request: conditional_parts, ..
730                    } => {
731                        // Make conditional request
732                        let conditional_req =
733                            Request::from_parts(conditional_parts, body);
734                        let conditional_response = inner_service
735                            .oneshot(conditional_req)
736                            .await
737                            .map_err(|_e| {
738                                HttpCacheError::http(Box::new(
739                                    std::io::Error::other(
740                                        "service error".to_string(),
741                                    ),
742                                ))
743                            })?;
744
745                        if conditional_response.status() == 304 {
746                            // Use cached response with updated headers
747                            let (fresh_parts, _) =
748                                conditional_response.into_parts();
749                            let updated_response = cache
750                                .handle_not_modified(
751                                    cached_response,
752                                    &fresh_parts,
753                                )
754                                .await
755                                .cache_err()?;
756
757                            let mut response = http_cache::HttpCacheOptions::http_response_to_response(
758                                &updated_response,
759                                HttpCacheBody::Buffered(updated_response.body.clone()),
760                            ).map_err(HttpCacheError::other)?;
761
762                            // Add cache status headers if enabled
763                            if cache.options.cache_status_headers {
764                                response = add_cache_status_headers(
765                                    response, "HIT", "HIT",
766                                );
767                            }
768
769                            return Ok(response);
770                        } else {
771                            // Process fresh response
772                            let (parts, res_body) =
773                                conditional_response.into_parts();
774                            let body_bytes =
775                                collect_body(res_body).await.map_err(|_e| {
776                                    HttpCacheError::http(Box::new(
777                                        std::io::Error::other(
778                                            "service error".to_string(),
779                                        ),
780                                    ))
781                                })?;
782
783                            let cached_response = cache
784                                .process_response(
785                                    analysis,
786                                    Response::from_parts(
787                                        parts,
788                                        body_bytes.clone(),
789                                    ),
790                                )
791                                .await
792                                .cache_err()?;
793
794                            let mut response =
795                                cached_response.map(HttpCacheBody::Buffered);
796
797                            // Add cache status headers if enabled
798                            if cache.options.cache_status_headers {
799                                response = add_cache_status_headers(
800                                    response, "MISS", "MISS",
801                                );
802                            }
803
804                            return Ok(response);
805                        }
806                    }
807                }
808            }
809
810            // Fetch fresh response
811            let req = Request::from_parts(parts, body);
812            let response = inner_service.oneshot(req).await.map_err(|_e| {
813                HttpCacheError::http(Box::new(std::io::Error::other(
814                    "service error".to_string(),
815                )))
816            })?;
817
818            let (res_parts, res_body) = response.into_parts();
819            let body_bytes = collect_body(res_body).await.map_err(|_e| {
820                HttpCacheError::http(Box::new(std::io::Error::other(
821                    "service error".to_string(),
822                )))
823            })?;
824
825            // Process and cache using interface
826            let cached_response = cache
827                .process_response(
828                    analysis,
829                    Response::from_parts(res_parts, body_bytes.clone()),
830                )
831                .await
832                .cache_err()?;
833
834            let mut response = cached_response.map(HttpCacheBody::Buffered);
835
836            // Add cache status headers if enabled
837            if cache.options.cache_status_headers {
838                response = add_cache_status_headers(response, "MISS", "MISS");
839            }
840
841            Ok(response)
842        })
843    }
844}
845
846// Hyper service implementation for HttpCacheService
847impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
848    for HttpCacheService<S, CM>
849where
850    S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
851    S::Response: Into<Response<http_body_util::Full<Bytes>>>,
852    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
853    S::Future: Send + 'static,
854    CM: CacheManager,
855{
856    type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
857    type Error = HttpCacheError;
858    type Future = Pin<
859        Box<
860            dyn std::future::Future<
861                    Output = Result<Self::Response, Self::Error>,
862                > + Send,
863        >,
864    >;
865
866    fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
867        // Convert to the format expected by the generic Service implementation
868        let service_clone = self.clone();
869        Box::pin(async move { service_clone.call(_req).await })
870    }
871}
872
873#[cfg(feature = "streaming")]
874impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
875    for HttpCacheStreamingService<S, CM>
876where
877    S: Service<Request<ReqBody>, Response = Response<ResBody>>
878        + Clone
879        + Send
880        + 'static,
881    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
882    S::Future: Send + 'static,
883    ReqBody: Body + Send + 'static,
884    ReqBody::Data: Send,
885    ReqBody::Error: Into<StreamingError>,
886    ResBody: Body + Send + 'static,
887    ResBody::Data: Send,
888    ResBody::Error: Into<StreamingError>,
889    CM: StreamingCacheManager,
890    <CM::Body as http_body::Body>::Data: Send,
891    <CM::Body as http_body::Body>::Error:
892        Into<StreamingError> + Send + Sync + 'static,
893{
894    type Response = Response<CM::Body>;
895    type Error = HttpCacheError;
896    type Future = Pin<
897        Box<
898            dyn std::future::Future<
899                    Output = Result<Self::Response, Self::Error>,
900                > + Send,
901        >,
902    >;
903
904    fn poll_ready(
905        &mut self,
906        cx: &mut Context<'_>,
907    ) -> Poll<Result<(), Self::Error>> {
908        self.inner.poll_ready(cx).map_err(|_e| {
909            HttpCacheError::http(Box::new(std::io::Error::other(
910                "service error".to_string(),
911            )))
912        })
913    }
914
915    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
916        let cache = self.cache.clone();
917        let (parts, body) = req.into_parts();
918        let inner_service = self.inner.clone();
919
920        Box::pin(async move {
921            use http_cache_semantics::BeforeRequest;
922
923            // Use the core library's streaming cache interface
924            let analysis = cache.analyze_request(&parts, None).cache_err()?;
925
926            // Handle cache busting
927            for key in &analysis.cache_bust_keys {
928                cache.manager.delete(key).await.cache_err()?;
929            }
930
931            // For non-GET/HEAD requests, invalidate cached GET responses
932            if !analysis.should_cache && !analysis.is_get_head {
933                let get_cache_key = cache
934                    .options
935                    .create_cache_key_for_invalidation(&parts, "GET");
936                let _ = cache.manager.delete(&get_cache_key).await;
937            }
938
939            // If not cacheable, convert body type and return
940            if !analysis.should_cache {
941                // Apply rate limiting before non-cached request
942                #[cfg(feature = "rate-limiting")]
943                if let Some(rate_limiter) = &cache.options.rate_limiter {
944                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
945                    {
946                        let rate_limit_key =
947                            url.host_str().unwrap_or("unknown");
948                        rate_limiter.until_key_ready(rate_limit_key).await;
949                    }
950                }
951
952                let req = Request::from_parts(parts, body);
953                let response =
954                    inner_service.oneshot(req).await.map_err(|_e| {
955                        HttpCacheError::http(Box::new(std::io::Error::other(
956                            "service error".to_string(),
957                        )))
958                    })?;
959                let mut converted_response =
960                    cache.manager.convert_body(response).await.cache_err()?;
961
962                // Add cache status headers if enabled
963                if cache.options.cache_status_headers {
964                    converted_response = add_cache_status_headers_streaming(
965                        converted_response,
966                        "MISS",
967                        "MISS",
968                    );
969                }
970
971                return Ok(converted_response);
972            }
973
974            // Special case for Reload mode: skip cache lookup but still cache response
975            if analysis.cache_mode == CacheMode::Reload {
976                // Apply rate limiting before reload request
977                #[cfg(feature = "rate-limiting")]
978                if let Some(rate_limiter) = &cache.options.rate_limiter {
979                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
980                    {
981                        let rate_limit_key =
982                            url.host_str().unwrap_or("unknown");
983                        rate_limiter.until_key_ready(rate_limit_key).await;
984                    }
985                }
986
987                let req = Request::from_parts(parts, body);
988                let response =
989                    inner_service.oneshot(req).await.map_err(|_e| {
990                        HttpCacheError::http(Box::new(std::io::Error::other(
991                            "service error".to_string(),
992                        )))
993                    })?;
994
995                let cached_response = cache
996                    .process_response(analysis, response)
997                    .await
998                    .cache_err()?;
999
1000                let mut final_response = cached_response;
1001
1002                // Add cache status headers if enabled
1003                if cache.options.cache_status_headers {
1004                    final_response = add_cache_status_headers_streaming(
1005                        final_response,
1006                        "MISS",
1007                        "MISS",
1008                    );
1009                }
1010
1011                return Ok(final_response);
1012            }
1013
1014            // Look up cached response using interface
1015            if let Some((cached_response, policy)) = cache
1016                .lookup_cached_response(&analysis.cache_key)
1017                .await
1018                .cache_err()?
1019            {
1020                let before_req =
1021                    policy.before_request(&parts, std::time::SystemTime::now());
1022                match before_req {
1023                    BeforeRequest::Fresh(_) => {
1024                        let mut response = cached_response;
1025
1026                        // Add cache status headers if enabled
1027                        if cache.options.cache_status_headers {
1028                            response = add_cache_status_headers_streaming(
1029                                response, "HIT", "HIT",
1030                            );
1031                        }
1032
1033                        return Ok(response);
1034                    }
1035                    BeforeRequest::Stale {
1036                        request: conditional_parts, ..
1037                    } => {
1038                        // Apply rate limiting before conditional request
1039                        #[cfg(feature = "rate-limiting")]
1040                        if let Some(rate_limiter) = &cache.options.rate_limiter
1041                        {
1042                            if let Ok(url) = conditional_parts
1043                                .uri
1044                                .to_string()
1045                                .parse::<::url::Url>()
1046                            {
1047                                let rate_limit_key =
1048                                    url.host_str().unwrap_or("unknown");
1049                                rate_limiter
1050                                    .until_key_ready(rate_limit_key)
1051                                    .await;
1052                            }
1053                        }
1054
1055                        let conditional_req =
1056                            Request::from_parts(conditional_parts, body);
1057                        let conditional_response = inner_service
1058                            .oneshot(conditional_req)
1059                            .await
1060                            .map_err(|_e| {
1061                                HttpCacheError::http(Box::new(
1062                                    std::io::Error::other(
1063                                        "service error".to_string(),
1064                                    ),
1065                                ))
1066                            })?;
1067
1068                        if conditional_response.status() == 304 {
1069                            let (fresh_parts, _) =
1070                                conditional_response.into_parts();
1071                            let updated_response = cache
1072                                .handle_not_modified(
1073                                    cached_response,
1074                                    &fresh_parts,
1075                                )
1076                                .await
1077                                .cache_err()?;
1078
1079                            let mut response = updated_response;
1080
1081                            // Add cache status headers if enabled
1082                            if cache.options.cache_status_headers {
1083                                response = add_cache_status_headers_streaming(
1084                                    response, "HIT", "HIT",
1085                                );
1086                            }
1087
1088                            return Ok(response);
1089                        } else {
1090                            let cached_response = cache
1091                                .process_response(
1092                                    analysis,
1093                                    conditional_response,
1094                                )
1095                                .await
1096                                .cache_err()?;
1097
1098                            let mut response = cached_response;
1099
1100                            // Add cache status headers if enabled
1101                            if cache.options.cache_status_headers {
1102                                response = add_cache_status_headers_streaming(
1103                                    response, "MISS", "MISS",
1104                                );
1105                            }
1106
1107                            return Ok(response);
1108                        }
1109                    }
1110                }
1111            }
1112
1113            // Apply rate limiting before fresh request
1114            #[cfg(feature = "rate-limiting")]
1115            if let Some(rate_limiter) = &cache.options.rate_limiter {
1116                if let Ok(url) = parts.uri.to_string().parse::<url::Url>() {
1117                    let rate_limit_key = url.host_str().unwrap_or("unknown");
1118                    rate_limiter.until_key_ready(rate_limit_key).await;
1119                }
1120            }
1121
1122            // Fetch fresh response
1123            let req = Request::from_parts(parts, body);
1124            let response = inner_service.oneshot(req).await.map_err(|_e| {
1125                HttpCacheError::http(Box::new(std::io::Error::other(
1126                    "service error".to_string(),
1127                )))
1128            })?;
1129
1130            // Process using streaming interface
1131            let cached_response =
1132                cache.process_response(analysis, response).await.cache_err()?;
1133
1134            let mut final_response = cached_response;
1135
1136            // Add cache status headers if enabled
1137            if cache.options.cache_status_headers {
1138                final_response = add_cache_status_headers_streaming(
1139                    final_response,
1140                    "MISS",
1141                    "MISS",
1142                );
1143            }
1144
1145            Ok(final_response)
1146        })
1147    }
1148}
1149
1150/// Body type that wraps cached responses  
1151pub enum HttpCacheBody<B> {
1152    /// Buffered body from cache
1153    Buffered(Vec<u8>),
1154    /// Original body (fallback)
1155    Original(B),
1156}
1157
1158impl<B> Body for HttpCacheBody<B>
1159where
1160    B: Body + Unpin,
1161    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1162    B::Data: Into<bytes::Bytes>,
1163{
1164    type Data = bytes::Bytes;
1165    type Error = Box<dyn std::error::Error + Send + Sync>;
1166
1167    fn poll_frame(
1168        mut self: Pin<&mut Self>,
1169        cx: &mut Context<'_>,
1170    ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1171        match &mut *self {
1172            HttpCacheBody::Buffered(bytes) => {
1173                if bytes.is_empty() {
1174                    Poll::Ready(None)
1175                } else {
1176                    let data = std::mem::take(bytes);
1177                    Poll::Ready(Some(Ok(http_body::Frame::data(
1178                        bytes::Bytes::from(data),
1179                    ))))
1180                }
1181            }
1182            HttpCacheBody::Original(body) => {
1183                Pin::new(body).poll_frame(cx).map(|opt| {
1184                    opt.map(|res| {
1185                        res.map(|frame| frame.map_data(Into::into))
1186                            .map_err(Into::into)
1187                    })
1188                })
1189            }
1190        }
1191    }
1192
1193    fn is_end_stream(&self) -> bool {
1194        match self {
1195            HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1196            HttpCacheBody::Original(body) => body.is_end_stream(),
1197        }
1198    }
1199
1200    fn size_hint(&self) -> http_body::SizeHint {
1201        match self {
1202            HttpCacheBody::Buffered(bytes) => {
1203                let len = bytes.len() as u64;
1204                http_body::SizeHint::with_exact(len)
1205            }
1206            HttpCacheBody::Original(body) => body.size_hint(),
1207        }
1208    }
1209}
1210
1211#[cfg(test)]
1212mod test;