// http_cache_tower/lib.rs
1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//! // Create cache manager with disk storage
28//! let cache_manager = CACacheManager::new("./cache".into(), true);
29//!
30//! // Create cache layer
31//! let cache_layer = HttpCacheLayer::new(cache_manager);
32//!
33//! // Build service with caching
34//! let service = ServiceBuilder::new()
35//! .layer(cache_layer)
36//! .service_fn(handler);
37//!
38//! // Use the service
39//! let request = Request::builder()
40//! .uri("http://example.com")
41//! .body(Full::new(Bytes::new()))
42//! .unwrap();
43//! let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//! mode: CacheMode::Default,
61//! manager: cache_manager,
62//! options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//! use std::path::PathBuf;
78//!
79//! # #[tokio::main]
80//! # async fn main() {
81//! // Create streaming cache setup
82//! let streaming_manager = StreamingManager::new("./streaming-cache".into());
83//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
84//!
85//! // Use with your service
86//! // let service = streaming_layer.layer(your_service);
87//! # }
88//! ```
89//!
90//! ## Cache Modes
91//!
92//! Different cache modes provide different behaviors:
93//!
94//! - `CacheMode::Default`: Follow HTTP caching rules strictly
95//! - `CacheMode::NoStore`: Never cache responses
96//! - `CacheMode::NoCache`: Always revalidate with the origin server
97//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
98//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
99//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
100//!
101//! ## Cache Invalidation
102//!
103//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
104//!
105//! ```text
106//! These methods will invalidate any cached GET response for the same URI:
107//! - PUT /api/users/123 -> invalidates GET /api/users/123
108//! - POST /api/users/123 -> invalidates GET /api/users/123
109//! - DELETE /api/users/123 -> invalidates GET /api/users/123
110//! - PATCH /api/users/123 -> invalidates GET /api/users/123
111//! ```
112//!
113//! ## Integration with Other Tower Layers
114//!
115//! The cache layer works with other Tower middleware:
116//!
117//! ```rust,no_run
118//! use tower::ServiceBuilder;
119//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
120//! use tower::service_fn;
121//! use tower::ServiceExt;
122//! use http::{Request, Response};
123//! use http_body_util::Full;
124//! use bytes::Bytes;
125//! use std::convert::Infallible;
126//!
127//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
128//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
129//! }
130//!
131//! #[tokio::main]
132//! async fn main() {
133//! let cache_manager = CACacheManager::new("./cache".into(), true);
134//! let cache_layer = HttpCacheLayer::new(cache_manager);
135//!
136//! let service = ServiceBuilder::new()
137//! // .layer(TraceLayer::new_for_http()) // Logging (requires tower-http)
138//! // .layer(CompressionLayer::new()) // Compression (requires tower-http)
139//! .layer(cache_layer) // Caching
140//! .service_fn(handler);
141//!
142//! // Use the service
143//! let request = Request::builder()
144//! .uri("http://example.com")
145//! .body(Full::new(Bytes::new()))
146//! .unwrap();
147//! let response = service.oneshot(request).await.unwrap();
148//! }
149//! ```
150
151use bytes::Bytes;
152use http::{HeaderValue, Request, Response};
153use http_body::Body;
154use http_body_util::BodyExt;
155
156#[cfg(feature = "manager-cacache")]
157pub use http_cache::CACacheManager;
158
159#[cfg(feature = "rate-limiting")]
160pub use http_cache::rate_limiting::{
161 CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
162};
163#[cfg(feature = "streaming")]
164use http_cache::StreamingError;
165use http_cache::{
166 CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
167};
168#[cfg(feature = "streaming")]
169use http_cache::{
170 HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
171};
172use std::{
173 pin::Pin,
174 sync::Arc,
175 task::{Context, Poll},
176};
177use tower::{Layer, Service, ServiceExt};
178
179// Re-export unified error types from http-cache core
180pub use http_cache::HttpCacheError;
181
#[cfg(feature = "streaming")]
/// Type alias for Tower streaming errors, backed by the unified
/// [`http_cache::ClientStreamingError`] streaming error system.
pub type TowerStreamingError = http_cache::ClientStreamingError;
185
186/// Helper functions for error conversions
187trait HttpCacheErrorExt<T> {
188 fn cache_err(self) -> Result<T, HttpCacheError>;
189}
190
191impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
192where
193 E: ToString,
194{
195 fn cache_err(self) -> Result<T, HttpCacheError> {
196 self.map_err(|e| HttpCacheError::cache(e.to_string()))
197 }
198}
199
200/// Helper function to collect a body into bytes
201async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
202where
203 B: Body,
204{
205 let collected = BodyExt::collect(body).await?;
206 Ok(collected.to_bytes().to_vec())
207}
208
209/// Helper function to add cache status headers to a response
210fn add_cache_status_headers<B>(
211 mut response: Response<HttpCacheBody<B>>,
212 hit_or_miss: &str,
213 cache_lookup: &str,
214) -> Response<HttpCacheBody<B>> {
215 let headers = response.headers_mut();
216 headers.insert(
217 http_cache::XCACHE,
218 HeaderValue::from_str(hit_or_miss).unwrap(),
219 );
220 headers.insert(
221 http_cache::XCACHELOOKUP,
222 HeaderValue::from_str(cache_lookup).unwrap(),
223 );
224 response
225}
226
#[cfg(feature = "streaming")]
/// Stamp `x-cache` / `x-cache-lookup` status headers onto a streaming
/// response before it is returned to the caller.
fn add_cache_status_headers_streaming<B>(
    mut response: Response<B>,
    hit_or_miss: &str,
    cache_lookup: &str,
) -> Response<B> {
    // Values are internal constants ("HIT"/"MISS"), always valid headers.
    let hit_value = HeaderValue::from_str(hit_or_miss).unwrap();
    let lookup_value = HeaderValue::from_str(cache_lookup).unwrap();
    let headers = response.headers_mut();
    headers.insert(http_cache::XCACHE, hit_value);
    headers.insert(http_cache::XCACHELOOKUP, lookup_value);
    response
}
244
245/// HTTP cache layer for Tower services.
246///
247/// This layer implements HTTP caching according to RFC 7234, automatically caching
248/// GET and HEAD responses based on their cache-control headers and invalidating
249/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
250///
251/// # Example
252///
253/// ```rust
254/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
255/// use tower::ServiceBuilder;
256/// use tower::service_fn;
257/// use http::{Request, Response};
258/// use http_body_util::Full;
259/// use bytes::Bytes;
260/// use std::convert::Infallible;
261///
262/// # #[tokio::main]
263/// # async fn main() {
264/// let cache_manager = CACacheManager::new("./cache".into(), true);
265/// let cache_layer = HttpCacheLayer::new(cache_manager);
266///
267/// // Use with ServiceBuilder
268/// let service = ServiceBuilder::new()
269/// .layer(cache_layer)
270/// .service_fn(|_req: Request<Full<Bytes>>| async {
271/// Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
272/// });
273/// # }
274/// ```
#[derive(Clone)]
pub struct HttpCacheLayer<CM>
where
    CM: CacheManager,
{
    // Shared cache configuration; the `Arc` lets clones of this layer and
    // every service it produces operate on the same underlying cache.
    cache: Arc<HttpCache<CM>>,
}
282
283impl<CM> HttpCacheLayer<CM>
284where
285 CM: CacheManager,
286{
287 /// Create a new HTTP cache layer with default configuration.
288 ///
289 /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
290 ///
291 /// # Arguments
292 ///
293 /// * `cache_manager` - The cache manager to use for storing responses
294 ///
295 /// # Example
296 ///
297 /// ```rust
298 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
299 ///
300 /// # #[tokio::main]
301 /// # async fn main() {
302 /// let cache_manager = CACacheManager::new("./cache".into(), true);
303 /// let layer = HttpCacheLayer::new(cache_manager);
304 /// # }
305 /// ```
306 pub fn new(cache_manager: CM) -> Self {
307 Self {
308 cache: Arc::new(HttpCache {
309 mode: CacheMode::Default,
310 manager: cache_manager,
311 options: HttpCacheOptions::default(),
312 }),
313 }
314 }
315
316 /// Create a new HTTP cache layer with custom options.
317 ///
318 /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
319 /// through [`HttpCacheOptions`].
320 ///
321 /// # Arguments
322 ///
323 /// * `cache_manager` - The cache manager to use for storing responses
324 /// * `options` - Custom cache options
325 ///
326 /// # Example
327 ///
328 /// ```rust
329 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
330 /// use http_cache::HttpCacheOptions;
331 ///
332 /// # #[tokio::main]
333 /// # async fn main() {
334 /// let cache_manager = CACacheManager::new("./cache".into(), true);
335 ///
336 /// let options = HttpCacheOptions {
337 /// cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
338 /// format!("custom:{}:{}", req.method, req.uri)
339 /// })),
340 /// ..Default::default()
341 /// };
342 ///
343 /// let layer = HttpCacheLayer::with_options(cache_manager, options);
344 /// # }
345 /// ```
346 pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
347 Self {
348 cache: Arc::new(HttpCache {
349 mode: CacheMode::Default,
350 manager: cache_manager,
351 options,
352 }),
353 }
354 }
355
356 /// Create a new HTTP cache layer with a pre-configured cache.
357 ///
358 /// This method gives you full control over the cache configuration,
359 /// including the cache mode.
360 ///
361 /// # Arguments
362 ///
363 /// * `cache` - A fully configured HttpCache instance
364 ///
365 /// # Example
366 ///
367 /// ```rust
368 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
369 /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
370 ///
371 /// # #[tokio::main]
372 /// # async fn main() {
373 /// let cache_manager = CACacheManager::new("./cache".into(), true);
374 ///
375 /// let cache = HttpCache {
376 /// mode: CacheMode::ForceCache,
377 /// manager: cache_manager,
378 /// options: HttpCacheOptions::default(),
379 /// };
380 ///
381 /// let layer = HttpCacheLayer::with_cache(cache);
382 /// # }
383 /// ```
384 pub fn with_cache(cache: HttpCache<CM>) -> Self {
385 Self { cache: Arc::new(cache) }
386 }
387}
388
389/// HTTP cache layer with streaming support for Tower services.
390///
391/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
392/// but handles streaming responses. It can work with large
393/// responses without buffering them entirely in memory.
394///
395/// # Example
396///
397/// ```rust
398/// use http_cache_tower::HttpCacheStreamingLayer;
399/// use http_cache::StreamingManager;
400/// use tower::ServiceBuilder;
401/// use tower::service_fn;
402/// use http::{Request, Response};
403/// use http_body_util::Full;
404/// use bytes::Bytes;
405/// use std::convert::Infallible;
406///
407/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
408/// Ok(Response::new(Full::new(Bytes::from("Hello"))))
409/// }
410///
411/// # #[tokio::main]
412/// # async fn main() {
413/// let streaming_manager = StreamingManager::new("./cache".into());
414/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
415///
416/// // Use with ServiceBuilder
417/// let service = ServiceBuilder::new()
418/// .layer(streaming_layer)
419/// .service_fn(handler);
420/// # }
421/// ```
#[cfg(feature = "streaming")]
#[derive(Clone)]
pub struct HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    // Shared streaming cache configuration; the `Arc` keeps clones of this
    // layer and all produced services pointing at the same cache.
    cache: Arc<HttpStreamingCache<CM>>,
}
430
#[cfg(feature = "streaming")]
impl<CM> HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    /// Build a streaming cache layer with [`CacheMode::Default`] behavior and
    /// default [`HttpCacheOptions`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::StreamingManager;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let manager = StreamingManager::new("./cache".into());
    /// let layer = HttpCacheStreamingLayer::new(manager);
    /// # }
    /// ```
    pub fn new(cache_manager: CM) -> Self {
        // Delegate so every constructor builds the cache in one place.
        Self::with_options(cache_manager, HttpCacheOptions::default())
    }

    /// Build a streaming cache layer with [`CacheMode::Default`] behavior but
    /// custom [`HttpCacheOptions`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    /// * `options` - Custom cache options
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let manager = StreamingManager::new("./cache".into());
    ///
    /// let options = HttpCacheOptions {
    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
    ///         format!("stream:{}:{}", req.method, req.uri)
    ///     })),
    ///     ..Default::default()
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_options(manager, options);
    /// # }
    /// ```
    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
        Self::with_cache(HttpStreamingCache {
            mode: CacheMode::Default,
            manager: cache_manager,
            options,
        })
    }

    /// Build a streaming cache layer from a fully configured
    /// [`HttpStreamingCache`], giving complete control over configuration.
    ///
    /// # Arguments
    ///
    /// * `cache` - A fully configured `HttpStreamingCache` instance
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let manager = StreamingManager::new("./cache".into());
    ///
    /// let cache = HttpStreamingCache {
    ///     mode: CacheMode::ForceCache,
    ///     manager,
    ///     options: HttpCacheOptions::default(),
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
    /// # }
    /// ```
    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
        Self { cache: Arc::new(cache) }
    }
}
536
537impl<S, CM> Layer<S> for HttpCacheLayer<CM>
538where
539 CM: CacheManager,
540{
541 type Service = HttpCacheService<S, CM>;
542
543 fn layer(&self, inner: S) -> Self::Service {
544 HttpCacheService { inner, cache: self.cache.clone() }
545 }
546}
547
#[cfg(feature = "streaming")]
impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    type Service = HttpCacheStreamingService<S, CM>;

    // Wrap `inner` with the streaming caching middleware, sharing this
    // layer's cache via a cheap refcount bump.
    fn layer(&self, inner: S) -> Self::Service {
        let cache = Arc::clone(&self.cache);
        HttpCacheStreamingService { inner, cache }
    }
}
559
/// HTTP cache service for Tower/Hyper
pub struct HttpCacheService<S, CM>
where
    CM: CacheManager,
{
    // The wrapped inner service that produces fresh responses on cache miss.
    inner: S,
    // Shared cache used for lookup, storage, and invalidation.
    cache: Arc<HttpCache<CM>>,
}
568
569impl<S, CM> Clone for HttpCacheService<S, CM>
570where
571 S: Clone,
572 CM: CacheManager,
573{
574 fn clone(&self) -> Self {
575 Self { inner: self.inner.clone(), cache: self.cache.clone() }
576 }
577}
578
/// HTTP cache streaming service for Tower/Hyper
#[cfg(feature = "streaming")]
pub struct HttpCacheStreamingService<S, CM>
where
    CM: StreamingCacheManager,
{
    // The wrapped inner service that produces fresh responses on cache miss.
    inner: S,
    // Shared streaming cache used for lookup, storage, and invalidation.
    cache: Arc<HttpStreamingCache<CM>>,
}
588
#[cfg(feature = "streaming")]
impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
where
    S: Clone,
    CM: StreamingCacheManager,
{
    // Clones duplicate the inner service but share the same cache
    // (only the `Arc` refcount is bumped).
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let cache = Arc::clone(&self.cache);
        Self { inner, cache }
    }
}
599
impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
    for HttpCacheService<S, CM>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>
        + Clone
        + Send
        + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    ReqBody: Body + Send + 'static,
    ReqBody::Data: Send,
    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    ResBody: Body + Send + 'static,
    ResBody::Data: Send,
    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    CM: CacheManager,
{
    // Responses are rewrapped in `HttpCacheBody` so cached (buffered) bodies
    // and pass-through (original) bodies share a single response type.
    type Response = Response<HttpCacheBody<ResBody>>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                Output = Result<Self::Response, Self::Error>,
            > + Send,
        >,
    >;

    // Delegates readiness to the inner service; the inner error type is
    // opaque here, so any failure is reported as a generic "service error".
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx).map_err(|_e| {
            HttpCacheError::http(Box::new(std::io::Error::other(
                "service error".to_string(),
            )))
        })
    }

    // Request flow: analyze -> bust/invalidate -> (pass through | Reload |
    // cached lookup: fresh hit / revalidate | fetch fresh) -> cache + return.
    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        let cache = self.cache.clone();
        let (parts, body) = req.into_parts();
        // Clone the inner service so the returned future is 'static and
        // independent of `self`; `oneshot` drives the clone's readiness.
        let inner_service = self.inner.clone();

        Box::pin(async move {
            use http_cache_semantics::BeforeRequest;

            // Use the core library's cache interface for analysis
            let analysis = cache.analyze_request(&parts, None).cache_err()?;

            // Handle cache busting and non-cacheable requests
            for key in &analysis.cache_bust_keys {
                cache.manager.delete(key).await.cache_err()?;
            }

            // For non-GET/HEAD requests, invalidate cached GET responses.
            // Best effort: a failed delete is deliberately ignored.
            if !analysis.should_cache && !analysis.is_get_head {
                let get_cache_key = cache
                    .options
                    .create_cache_key_for_invalidation(&parts, "GET");
                let _ = cache.manager.delete(&get_cache_key).await;
            }

            // If not cacheable, just pass through
            if !analysis.should_cache {
                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;
                return Ok(response.map(HttpCacheBody::Original));
            }

            // Special case for Reload mode: skip cache lookup but still cache response
            if analysis.cache_mode == CacheMode::Reload {
                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;

                // Buffer the full body so it can be stored in the cache.
                let (res_parts, res_body) = response.into_parts();
                let body_bytes =
                    collect_body(res_body).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;

                // NOTE(review): `body_bytes.clone()` appears redundant —
                // `body_bytes` is not used again after this call.
                let cached_response = cache
                    .process_response(
                        analysis,
                        Response::from_parts(res_parts, body_bytes.clone()),
                    )
                    .await
                    .cache_err()?;

                return Ok(cached_response.map(HttpCacheBody::Buffered));
            }

            // Look up cached response using interface
            if let Some((cached_response, policy)) = cache
                .lookup_cached_response(&analysis.cache_key)
                .await
                .cache_err()?
            {
                // The stored policy decides whether the cached response can
                // be served as-is or must be revalidated with the origin.
                let before_req =
                    policy.before_request(&parts, std::time::SystemTime::now());
                match before_req {
                    BeforeRequest::Fresh(_) => {
                        // Return cached response
                        let mut response = http_cache::HttpCacheOptions::http_response_to_response(
                            &cached_response,
                            HttpCacheBody::Buffered(cached_response.body.clone()),
                        ).map_err(HttpCacheError::other)?;

                        // Add cache status headers if enabled
                        if cache.options.cache_status_headers {
                            response = add_cache_status_headers(
                                response, "HIT", "HIT",
                            );
                        }

                        return Ok(response);
                    }
                    BeforeRequest::Stale {
                        request: conditional_parts, ..
                    } => {
                        // Make conditional request
                        let conditional_req =
                            Request::from_parts(conditional_parts, body);
                        let conditional_response = inner_service
                            .oneshot(conditional_req)
                            .await
                            .map_err(|_e| {
                                HttpCacheError::http(Box::new(
                                    std::io::Error::other(
                                        "service error".to_string(),
                                    ),
                                ))
                            })?;

                        if conditional_response.status() == 304 {
                            // Use cached response with updated headers
                            let (fresh_parts, _) =
                                conditional_response.into_parts();
                            let updated_response = cache
                                .handle_not_modified(
                                    cached_response,
                                    &fresh_parts,
                                )
                                .await
                                .cache_err()?;

                            let mut response = http_cache::HttpCacheOptions::http_response_to_response(
                                &updated_response,
                                HttpCacheBody::Buffered(updated_response.body.clone()),
                            ).map_err(HttpCacheError::other)?;

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers(
                                    response, "HIT", "HIT",
                                );
                            }

                            return Ok(response);
                        } else {
                            // Process fresh response
                            let (parts, res_body) =
                                conditional_response.into_parts();
                            let body_bytes =
                                collect_body(res_body).await.map_err(|_e| {
                                    HttpCacheError::http(Box::new(
                                        std::io::Error::other(
                                            "service error".to_string(),
                                        ),
                                    ))
                                })?;

                            // NOTE(review): `body_bytes.clone()` appears
                            // redundant — `body_bytes` is unused afterwards.
                            let cached_response = cache
                                .process_response(
                                    analysis,
                                    Response::from_parts(
                                        parts,
                                        body_bytes.clone(),
                                    ),
                                )
                                .await
                                .cache_err()?;

                            let mut response =
                                cached_response.map(HttpCacheBody::Buffered);

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers(
                                    response, "MISS", "MISS",
                                );
                            }

                            return Ok(response);
                        }
                    }
                }
            }

            // Fetch fresh response
            let req = Request::from_parts(parts, body);
            let response = inner_service.oneshot(req).await.map_err(|_e| {
                HttpCacheError::http(Box::new(std::io::Error::other(
                    "service error".to_string(),
                )))
            })?;

            let (res_parts, res_body) = response.into_parts();
            let body_bytes = collect_body(res_body).await.map_err(|_e| {
                HttpCacheError::http(Box::new(std::io::Error::other(
                    "service error".to_string(),
                )))
            })?;

            // Process and cache using interface
            let cached_response = cache
                .process_response(
                    analysis,
                    Response::from_parts(res_parts, body_bytes.clone()),
                )
                .await
                .cache_err()?;

            let mut response = cached_response.map(HttpCacheBody::Buffered);

            // Add cache status headers if enabled
            if cache.options.cache_status_headers {
                response = add_cache_status_headers(response, "MISS", "MISS");
            }

            Ok(response)
        })
    }
}
845
846// Hyper service implementation for HttpCacheService
847impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
848 for HttpCacheService<S, CM>
849where
850 S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
851 S::Response: Into<Response<http_body_util::Full<Bytes>>>,
852 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
853 S::Future: Send + 'static,
854 CM: CacheManager,
855{
856 type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
857 type Error = HttpCacheError;
858 type Future = Pin<
859 Box<
860 dyn std::future::Future<
861 Output = Result<Self::Response, Self::Error>,
862 > + Send,
863 >,
864 >;
865
866 fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
867 // Convert to the format expected by the generic Service implementation
868 let service_clone = self.clone();
869 Box::pin(async move { service_clone.call(_req).await })
870 }
871}
872
#[cfg(feature = "streaming")]
impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
    for HttpCacheStreamingService<S, CM>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>
        + Clone
        + Send
        + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    ReqBody: Body + Send + 'static,
    ReqBody::Data: Send,
    ReqBody::Error: Into<StreamingError>,
    ResBody: Body + Send + 'static,
    ResBody::Data: Send,
    ResBody::Error: Into<StreamingError>,
    CM: StreamingCacheManager,
    <CM::Body as http_body::Body>::Data: Send,
    <CM::Body as http_body::Body>::Error:
        Into<StreamingError> + Send + Sync + 'static,
{
    // The response body type is chosen by the streaming cache manager, which
    // allows serving cached payloads without buffering them in memory.
    type Response = Response<CM::Body>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                Output = Result<Self::Response, Self::Error>,
            > + Send,
        >,
    >;

    // Delegates readiness to the inner service; the inner error type is
    // opaque here, so any failure is reported as a generic "service error".
    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx).map_err(|_e| {
            HttpCacheError::http(Box::new(std::io::Error::other(
                "service error".to_string(),
            )))
        })
    }

    // Same flow as the buffered service impl above, except bodies are
    // converted through the streaming manager instead of collected into
    // memory, and (with the "rate-limiting" feature) origin requests are
    // rate limited per host before being sent.
    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        let cache = self.cache.clone();
        let (parts, body) = req.into_parts();
        // Clone the inner service so the returned future is 'static and
        // independent of `self`; `oneshot` drives the clone's readiness.
        let inner_service = self.inner.clone();

        Box::pin(async move {
            use http_cache_semantics::BeforeRequest;

            // Use the core library's streaming cache interface
            let analysis = cache.analyze_request(&parts, None).cache_err()?;

            // Handle cache busting
            for key in &analysis.cache_bust_keys {
                cache.manager.delete(key).await.cache_err()?;
            }

            // For non-GET/HEAD requests, invalidate cached GET responses.
            // Best effort: a failed delete is deliberately ignored.
            if !analysis.should_cache && !analysis.is_get_head {
                let get_cache_key = cache
                    .options
                    .create_cache_key_for_invalidation(&parts, "GET");
                let _ = cache.manager.delete(&get_cache_key).await;
            }

            // If not cacheable, convert body type and return
            if !analysis.should_cache {
                // Apply rate limiting before non-cached request
                #[cfg(feature = "rate-limiting")]
                if let Some(rate_limiter) = &cache.options.rate_limiter {
                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
                    {
                        let rate_limit_key =
                            url.host_str().unwrap_or("unknown");
                        rate_limiter.until_key_ready(rate_limit_key).await;
                    }
                }

                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;
                // The inner body type must be mapped to the manager's body
                // type so all paths return `Response<CM::Body>`.
                let mut converted_response =
                    cache.manager.convert_body(response).await.cache_err()?;

                // Add cache status headers if enabled
                if cache.options.cache_status_headers {
                    converted_response = add_cache_status_headers_streaming(
                        converted_response,
                        "MISS",
                        "MISS",
                    );
                }

                return Ok(converted_response);
            }

            // Special case for Reload mode: skip cache lookup but still cache response
            if analysis.cache_mode == CacheMode::Reload {
                // Apply rate limiting before reload request
                #[cfg(feature = "rate-limiting")]
                if let Some(rate_limiter) = &cache.options.rate_limiter {
                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
                    {
                        let rate_limit_key =
                            url.host_str().unwrap_or("unknown");
                        rate_limiter.until_key_ready(rate_limit_key).await;
                    }
                }

                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;

                let cached_response = cache
                    .process_response(analysis, response)
                    .await
                    .cache_err()?;

                let mut final_response = cached_response;

                // Add cache status headers if enabled
                if cache.options.cache_status_headers {
                    final_response = add_cache_status_headers_streaming(
                        final_response,
                        "MISS",
                        "MISS",
                    );
                }

                return Ok(final_response);
            }

            // Look up cached response using interface
            if let Some((cached_response, policy)) = cache
                .lookup_cached_response(&analysis.cache_key)
                .await
                .cache_err()?
            {
                // The stored policy decides whether the cached response can
                // be served as-is or must be revalidated with the origin.
                let before_req =
                    policy.before_request(&parts, std::time::SystemTime::now());
                match before_req {
                    BeforeRequest::Fresh(_) => {
                        let mut response = cached_response;

                        // Add cache status headers if enabled
                        if cache.options.cache_status_headers {
                            response = add_cache_status_headers_streaming(
                                response, "HIT", "HIT",
                            );
                        }

                        return Ok(response);
                    }
                    BeforeRequest::Stale {
                        request: conditional_parts, ..
                    } => {
                        // Apply rate limiting before conditional request
                        #[cfg(feature = "rate-limiting")]
                        if let Some(rate_limiter) = &cache.options.rate_limiter
                        {
                            if let Ok(url) = conditional_parts
                                .uri
                                .to_string()
                                .parse::<::url::Url>()
                            {
                                let rate_limit_key =
                                    url.host_str().unwrap_or("unknown");
                                rate_limiter
                                    .until_key_ready(rate_limit_key)
                                    .await;
                            }
                        }

                        let conditional_req =
                            Request::from_parts(conditional_parts, body);
                        let conditional_response = inner_service
                            .oneshot(conditional_req)
                            .await
                            .map_err(|_e| {
                                HttpCacheError::http(Box::new(
                                    std::io::Error::other(
                                        "service error".to_string(),
                                    ),
                                ))
                            })?;

                        if conditional_response.status() == 304 {
                            // Origin confirmed the cached copy is still
                            // valid; merge the fresh headers into it.
                            let (fresh_parts, _) =
                                conditional_response.into_parts();
                            let updated_response = cache
                                .handle_not_modified(
                                    cached_response,
                                    &fresh_parts,
                                )
                                .await
                                .cache_err()?;

                            let mut response = updated_response;

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers_streaming(
                                    response, "HIT", "HIT",
                                );
                            }

                            return Ok(response);
                        } else {
                            let cached_response = cache
                                .process_response(
                                    analysis,
                                    conditional_response,
                                )
                                .await
                                .cache_err()?;

                            let mut response = cached_response;

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers_streaming(
                                    response, "MISS", "MISS",
                                );
                            }

                            return Ok(response);
                        }
                    }
                }
            }

            // Apply rate limiting before fresh request
            #[cfg(feature = "rate-limiting")]
            if let Some(rate_limiter) = &cache.options.rate_limiter {
                if let Ok(url) = parts.uri.to_string().parse::<url::Url>() {
                    let rate_limit_key = url.host_str().unwrap_or("unknown");
                    rate_limiter.until_key_ready(rate_limit_key).await;
                }
            }

            // Fetch fresh response
            let req = Request::from_parts(parts, body);
            let response = inner_service.oneshot(req).await.map_err(|_e| {
                HttpCacheError::http(Box::new(std::io::Error::other(
                    "service error".to_string(),
                )))
            })?;

            // Process using streaming interface
            let cached_response =
                cache.process_response(analysis, response).await.cache_err()?;

            let mut final_response = cached_response;

            // Add cache status headers if enabled
            if cache.options.cache_status_headers {
                final_response = add_cache_status_headers_streaming(
                    final_response,
                    "MISS",
                    "MISS",
                );
            }

            Ok(final_response)
        })
    }
}
1149
/// Body type that wraps cached responses
///
/// Lets the middleware return either bytes served from the cache or the
/// untouched body produced by the inner service, behind a single type.
pub enum HttpCacheBody<B> {
    /// Buffered body from cache
    Buffered(Vec<u8>),
    /// Original body (fallback, passed through unmodified)
    Original(B),
}
1157
1158impl<B> Body for HttpCacheBody<B>
1159where
1160 B: Body + Unpin,
1161 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1162 B::Data: Into<bytes::Bytes>,
1163{
1164 type Data = bytes::Bytes;
1165 type Error = Box<dyn std::error::Error + Send + Sync>;
1166
1167 fn poll_frame(
1168 mut self: Pin<&mut Self>,
1169 cx: &mut Context<'_>,
1170 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1171 match &mut *self {
1172 HttpCacheBody::Buffered(bytes) => {
1173 if bytes.is_empty() {
1174 Poll::Ready(None)
1175 } else {
1176 let data = std::mem::take(bytes);
1177 Poll::Ready(Some(Ok(http_body::Frame::data(
1178 bytes::Bytes::from(data),
1179 ))))
1180 }
1181 }
1182 HttpCacheBody::Original(body) => {
1183 Pin::new(body).poll_frame(cx).map(|opt| {
1184 opt.map(|res| {
1185 res.map(|frame| frame.map_data(Into::into))
1186 .map_err(Into::into)
1187 })
1188 })
1189 }
1190 }
1191 }
1192
1193 fn is_end_stream(&self) -> bool {
1194 match self {
1195 HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1196 HttpCacheBody::Original(body) => body.is_end_stream(),
1197 }
1198 }
1199
1200 fn size_hint(&self) -> http_body::SizeHint {
1201 match self {
1202 HttpCacheBody::Buffered(bytes) => {
1203 let len = bytes.len() as u64;
1204 http_body::SizeHint::with_exact(len)
1205 }
1206 HttpCacheBody::Original(body) => body.size_hint(),
1207 }
1208 }
1209}
1210
1211#[cfg(test)]
1212mod test;