// http_cache_tower/lib.rs
1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//! // Create cache manager with disk storage
28//! let cache_manager = CACacheManager::new("./cache".into(), true);
29//!
30//! // Create cache layer
31//! let cache_layer = HttpCacheLayer::new(cache_manager);
32//!
33//! // Build service with caching
34//! let service = ServiceBuilder::new()
35//! .layer(cache_layer)
36//! .service_fn(handler);
37//!
38//! // Use the service
39//! let request = Request::builder()
40//! .uri("http://example.com")
41//! .body(Full::new(Bytes::new()))
42//! .unwrap();
43//! let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//! mode: CacheMode::Default,
61//! manager: cache_manager,
62//! options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//!
78//! # #[tokio::main]
79//! # async fn main() {
80//! // Create streaming cache setup
81//! let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
82//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
83//!
84//! // Use with your service
85//! // let service = streaming_layer.layer(your_service);
86//! # }
87//! ```
88//!
89//! ## Cache Modes
90//!
91//! Different cache modes provide different behaviors:
92//!
93//! - `CacheMode::Default`: Follow HTTP caching rules strictly
94//! - `CacheMode::NoStore`: Never cache responses
95//! - `CacheMode::NoCache`: Always revalidate with the origin server
96//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
97//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
98//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
99//!
100//! ## Cache Invalidation
101//!
102//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
103//!
104//! ```text
105//! These methods will invalidate any cached GET response for the same URI:
106//! - PUT /api/users/123 -> invalidates GET /api/users/123
107//! - POST /api/users/123 -> invalidates GET /api/users/123
108//! - DELETE /api/users/123 -> invalidates GET /api/users/123
109//! - PATCH /api/users/123 -> invalidates GET /api/users/123
110//! ```
111//!
112//! ## Integration with Other Tower Layers
113//!
114//! The cache layer works with other Tower middleware:
115//!
116//! ```rust,no_run
117//! use tower::ServiceBuilder;
118//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
119//! use tower::service_fn;
120//! use tower::ServiceExt;
121//! use http::{Request, Response};
122//! use http_body_util::Full;
123//! use bytes::Bytes;
124//! use std::convert::Infallible;
125//!
126//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
127//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
128//! }
129//!
130//! #[tokio::main]
131//! async fn main() {
132//! let cache_manager = CACacheManager::new("./cache".into(), true);
133//! let cache_layer = HttpCacheLayer::new(cache_manager);
134//!
135//! let service = ServiceBuilder::new()
136//! // .layer(TraceLayer::new_for_http()) // Logging (requires tower-http)
137//! // .layer(CompressionLayer::new()) // Compression (requires tower-http)
138//! .layer(cache_layer) // Caching
139//! .service_fn(handler);
140//!
141//! // Use the service
142//! let request = Request::builder()
143//! .uri("http://example.com")
144//! .body(Full::new(Bytes::new()))
145//! .unwrap();
146//! let response = service.oneshot(request).await.unwrap();
147//! }
148//! ```
149
150use bytes::Bytes;
151use http::{HeaderValue, Request, Response};
152use http_body::Body;
153use http_body_util::BodyExt;
154
155#[cfg(feature = "manager-cacache")]
156pub use http_cache::CACacheManager;
157
158#[cfg(feature = "rate-limiting")]
159pub use http_cache::rate_limiting::{
160 CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
161};
162#[cfg(feature = "streaming")]
163use http_cache::StreamingError;
164use http_cache::{
165 CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
166};
167#[cfg(feature = "streaming")]
168use http_cache::{
169 HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
170};
171use std::{
172 pin::Pin,
173 sync::Arc,
174 task::{Context, Poll},
175};
176use tower::{Layer, Service, ServiceExt};
177
178// Re-export unified error types from http-cache core
179pub use http_cache::HttpCacheError;
180
#[cfg(feature = "streaming")]
/// Type alias for tower streaming errors, using the unified streaming error
/// system from the `http-cache` core crate.
pub type TowerStreamingError = http_cache::ClientStreamingError;
184
185/// Helper functions for error conversions
186trait HttpCacheErrorExt<T> {
187 fn cache_err(self) -> Result<T, HttpCacheError>;
188}
189
190impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
191where
192 E: ToString,
193{
194 fn cache_err(self) -> Result<T, HttpCacheError> {
195 self.map_err(|e| HttpCacheError::cache(e.to_string()))
196 }
197}
198
199/// Helper function to collect a body into bytes
200async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
201where
202 B: Body,
203{
204 let collected = BodyExt::collect(body).await?;
205 Ok(collected.to_bytes().to_vec())
206}
207
208/// Helper function to add cache status headers to a response
209fn add_cache_status_headers<B>(
210 mut response: Response<HttpCacheBody<B>>,
211 hit_or_miss: &str,
212 cache_lookup: &str,
213) -> Response<HttpCacheBody<B>> {
214 let headers = response.headers_mut();
215 headers.insert(
216 http_cache::XCACHE,
217 HeaderValue::from_str(hit_or_miss).unwrap(),
218 );
219 headers.insert(
220 http_cache::XCACHELOOKUP,
221 HeaderValue::from_str(cache_lookup).unwrap(),
222 );
223 response
224}
225
#[cfg(feature = "streaming")]
/// Stamp `x-cache` / `x-cache-lookup` status headers onto a streaming response.
///
/// Callers only ever pass the internal constants "HIT"/"MISS", which are
/// valid header values, so the parse cannot fail in practice.
fn add_cache_status_headers_streaming<B>(
    mut response: Response<B>,
    hit_or_miss: &str,
    cache_lookup: &str,
) -> Response<B> {
    let pairs = [
        (http_cache::XCACHE, hit_or_miss),
        (http_cache::XCACHELOOKUP, cache_lookup),
    ];
    for (name, value) in pairs {
        response
            .headers_mut()
            .insert(name, HeaderValue::from_str(value).unwrap());
    }
    response
}
243
244/// HTTP cache layer for Tower services.
245///
246/// This layer implements HTTP caching according to RFC 7234, automatically caching
247/// GET and HEAD responses based on their cache-control headers and invalidating
248/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
249///
250/// # Example
251///
252/// ```rust
253/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
254/// use tower::ServiceBuilder;
255/// use tower::service_fn;
256/// use http::{Request, Response};
257/// use http_body_util::Full;
258/// use bytes::Bytes;
259/// use std::convert::Infallible;
260///
261/// # #[tokio::main]
262/// # async fn main() {
263/// let cache_manager = CACacheManager::new("./cache".into(), true);
264/// let cache_layer = HttpCacheLayer::new(cache_manager);
265///
266/// // Use with ServiceBuilder
267/// let service = ServiceBuilder::new()
268/// .layer(cache_layer)
269/// .service_fn(|_req: Request<Full<Bytes>>| async {
270/// Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
271/// });
272/// # }
273/// ```
#[derive(Clone)]
pub struct HttpCacheLayer<CM>
where
    CM: CacheManager,
{
    // Shared cache configuration + manager; `Arc` lets cloned layers and the
    // services they produce reuse a single cache instance.
    cache: Arc<HttpCache<CM>>,
}
281
282impl<CM> HttpCacheLayer<CM>
283where
284 CM: CacheManager,
285{
286 /// Create a new HTTP cache layer with default configuration.
287 ///
288 /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
289 ///
290 /// # Arguments
291 ///
292 /// * `cache_manager` - The cache manager to use for storing responses
293 ///
294 /// # Example
295 ///
296 /// ```rust
297 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
298 ///
299 /// # #[tokio::main]
300 /// # async fn main() {
301 /// let cache_manager = CACacheManager::new("./cache".into(), true);
302 /// let layer = HttpCacheLayer::new(cache_manager);
303 /// # }
304 /// ```
305 pub fn new(cache_manager: CM) -> Self {
306 Self {
307 cache: Arc::new(HttpCache {
308 mode: CacheMode::Default,
309 manager: cache_manager,
310 options: HttpCacheOptions::default(),
311 }),
312 }
313 }
314
315 /// Create a new HTTP cache layer with custom options.
316 ///
317 /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
318 /// through [`HttpCacheOptions`].
319 ///
320 /// # Arguments
321 ///
322 /// * `cache_manager` - The cache manager to use for storing responses
323 /// * `options` - Custom cache options
324 ///
325 /// # Example
326 ///
327 /// ```rust
328 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
329 /// use http_cache::HttpCacheOptions;
330 ///
331 /// # #[tokio::main]
332 /// # async fn main() {
333 /// let cache_manager = CACacheManager::new("./cache".into(), true);
334 ///
335 /// let options = HttpCacheOptions {
336 /// cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
337 /// format!("custom:{}:{}", req.method, req.uri)
338 /// })),
339 /// ..Default::default()
340 /// };
341 ///
342 /// let layer = HttpCacheLayer::with_options(cache_manager, options);
343 /// # }
344 /// ```
345 pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
346 Self {
347 cache: Arc::new(HttpCache {
348 mode: CacheMode::Default,
349 manager: cache_manager,
350 options,
351 }),
352 }
353 }
354
355 /// Create a new HTTP cache layer with a pre-configured cache.
356 ///
357 /// This method gives you full control over the cache configuration,
358 /// including the cache mode.
359 ///
360 /// # Arguments
361 ///
362 /// * `cache` - A fully configured HttpCache instance
363 ///
364 /// # Example
365 ///
366 /// ```rust
367 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
368 /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
369 ///
370 /// # #[tokio::main]
371 /// # async fn main() {
372 /// let cache_manager = CACacheManager::new("./cache".into(), true);
373 ///
374 /// let cache = HttpCache {
375 /// mode: CacheMode::ForceCache,
376 /// manager: cache_manager,
377 /// options: HttpCacheOptions::default(),
378 /// };
379 ///
380 /// let layer = HttpCacheLayer::with_cache(cache);
381 /// # }
382 /// ```
383 pub fn with_cache(cache: HttpCache<CM>) -> Self {
384 Self { cache: Arc::new(cache) }
385 }
386}
387
388/// HTTP cache layer with streaming support for Tower services.
389///
390/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
391/// but handles streaming responses. It can work with large
392/// responses without buffering them entirely in memory.
393///
394/// # Example
395///
396/// ```rust
397/// use http_cache_tower::HttpCacheStreamingLayer;
398/// use http_cache::StreamingManager;
399/// use tower::ServiceBuilder;
400/// use tower::service_fn;
401/// use http::{Request, Response};
402/// use http_body_util::Full;
403/// use bytes::Bytes;
404/// use std::convert::Infallible;
405///
406/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
407/// Ok(Response::new(Full::new(Bytes::from("Hello"))))
408/// }
409///
410/// # #[tokio::main]
411/// # async fn main() {
412/// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
413/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
414///
415/// // Use with ServiceBuilder
416/// let service = ServiceBuilder::new()
417/// .layer(streaming_layer)
418/// .service_fn(handler);
419/// # }
420/// ```
#[cfg(feature = "streaming")]
#[derive(Clone)]
pub struct HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    // Shared streaming cache; `Arc` lets cloned layers and the services they
    // produce reuse a single cache instance.
    cache: Arc<HttpStreamingCache<CM>>,
}
429
#[cfg(feature = "streaming")]
impl<CM> HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    /// Create a new HTTP cache streaming layer with default configuration.
    ///
    /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::StreamingManager;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
    /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
    /// # }
    /// ```
    pub fn new(cache_manager: CM) -> Self {
        // Delegate so every constructor funnels through `with_cache`.
        Self::with_options(cache_manager, HttpCacheOptions::default())
    }

    /// Create a new HTTP cache streaming layer with custom options.
    ///
    /// Uses [`CacheMode::Default`] but allows customizing cache behavior.
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    /// * `options` - Custom cache options
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
    ///
    /// let options = HttpCacheOptions {
    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
    ///         format!("stream:{}:{}", req.method, req.uri)
    ///     })),
    ///     ..Default::default()
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
    /// # }
    /// ```
    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
        Self::with_cache(HttpStreamingCache {
            mode: CacheMode::Default,
            manager: cache_manager,
            options,
        })
    }

    /// Create a new HTTP cache streaming layer from a pre-configured cache.
    ///
    /// Gives full control over the streaming cache configuration.
    ///
    /// # Arguments
    ///
    /// * `cache` - A fully configured HttpStreamingCache instance
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
    ///
    /// let cache = HttpStreamingCache {
    ///     mode: CacheMode::ForceCache,
    ///     manager: streaming_manager,
    ///     options: HttpCacheOptions::default(),
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
    /// # }
    /// ```
    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
        Self { cache: Arc::new(cache) }
    }
}
535
536impl<S, CM> Layer<S> for HttpCacheLayer<CM>
537where
538 CM: CacheManager,
539{
540 type Service = HttpCacheService<S, CM>;
541
542 fn layer(&self, inner: S) -> Self::Service {
543 HttpCacheService { inner, cache: self.cache.clone() }
544 }
545}
546
#[cfg(feature = "streaming")]
impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    type Service = HttpCacheStreamingService<S, CM>;

    /// Wrap `inner` with streaming-aware HTTP caching; the cache
    /// configuration is shared (not copied) with the layer via `Arc`.
    fn layer(&self, inner: S) -> Self::Service {
        let cache = Arc::clone(&self.cache);
        HttpCacheStreamingService { inner, cache }
    }
}
558
/// HTTP cache service for Tower/Hyper
pub struct HttpCacheService<S, CM>
where
    CM: CacheManager,
{
    // Downstream service invoked on cache misses and revalidations.
    inner: S,
    // Shared cache configuration and storage manager; `Arc` keeps clones cheap.
    cache: Arc<HttpCache<CM>>,
}
567
568impl<S, CM> Clone for HttpCacheService<S, CM>
569where
570 S: Clone,
571 CM: CacheManager,
572{
573 fn clone(&self) -> Self {
574 Self { inner: self.inner.clone(), cache: self.cache.clone() }
575 }
576}
577
/// HTTP cache streaming service for Tower/Hyper
#[cfg(feature = "streaming")]
pub struct HttpCacheStreamingService<S, CM>
where
    CM: StreamingCacheManager,
{
    // Downstream service invoked on cache misses and revalidations.
    inner: S,
    // Shared streaming cache; `Arc` keeps clones cheap.
    cache: Arc<HttpStreamingCache<CM>>,
}
587
#[cfg(feature = "streaming")]
impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
where
    S: Clone,
    CM: StreamingCacheManager,
{
    /// Clone the service. The inner service is cloned; the streaming cache is
    /// shared between clones through the `Arc`, not duplicated.
    fn clone(&self) -> Self {
        let inner = self.inner.clone();
        let cache = Arc::clone(&self.cache);
        Self { inner, cache }
    }
}
598
// Tower `Service` implementation: buffers response bodies so they can be
// stored and replayed from the cache. The request flow is:
//   1. analyze the request (cacheability, cache key, invalidation keys);
//   2. non-cacheable requests pass through, then invalidate related entries;
//   3. `Reload` mode skips lookup but still stores the fresh response;
//   4. otherwise look up the cache, serving fresh hits directly and
//      revalidating stale hits with a conditional request;
//   5. on a miss, fetch, buffer, store, and return the response.
impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
    for HttpCacheService<S, CM>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>
        + Clone
        + Send
        + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    ReqBody: Body + Send + 'static,
    ReqBody::Data: Send,
    ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    ResBody: Body + Send + 'static,
    ResBody::Data: Send,
    ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    CM: CacheManager,
{
    type Response = Response<HttpCacheBody<ResBody>>;
    type Error = HttpCacheError;
    // Boxed future because the body mixes cache I/O and the inner service.
    type Future = Pin<
        Box<
            dyn std::future::Future<
                Output = Result<Self::Response, Self::Error>,
            > + Send,
        >,
    >;

    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        // NOTE(review): the inner service's error is discarded and replaced
        // with a generic "service error"; consider preserving `_e` so the
        // original failure is diagnosable.
        self.inner.poll_ready(cx).map_err(|_e| {
            HttpCacheError::http(Box::new(std::io::Error::other(
                "service error".to_string(),
            )))
        })
    }

    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        // Clone the shared cache handle and the inner service so the boxed
        // future owns everything it needs ('static requirement).
        let cache = self.cache.clone();
        let (parts, body) = req.into_parts();
        let inner_service = self.inner.clone();

        Box::pin(async move {
            use http_cache_semantics::BeforeRequest;

            // Use the core library's cache interface for analysis
            let analysis = cache.analyze_request(&parts, None).cache_err()?;

            // If not cacheable, execute request FIRST, then invalidate on success (RFC 7234 Section 4.4)
            if !analysis.should_cache {
                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;

                // Only invalidate AFTER successful response (RFC 7234 Section 4.4)
                if response.status().is_success()
                    || response.status().is_redirection()
                {
                    // Delete failures are intentionally ignored: invalidation
                    // is best-effort and must not fail the request.
                    for key in &analysis.cache_bust_keys {
                        let _ = cache.manager.delete(key).await;
                    }
                    // Invalidate both GET and HEAD caches per RFC 7234 Section 4.4
                    if !analysis.is_get_head {
                        let get_cache_key =
                            cache.options.create_cache_key_for_invalidation(
                                &analysis.request_parts,
                                "GET",
                            );
                        let _ = cache.manager.delete(&get_cache_key).await;
                        let head_cache_key =
                            cache.options.create_cache_key_for_invalidation(
                                &analysis.request_parts,
                                "HEAD",
                            );
                        let _ = cache.manager.delete(&head_cache_key).await;
                    }
                }

                // Pass-through responses keep the original (unbuffered) body.
                return Ok(response.map(HttpCacheBody::Original));
            }

            // Special case for Reload mode: skip cache lookup but still cache response
            if analysis.cache_mode == CacheMode::Reload {
                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;

                let (res_parts, res_body) = response.into_parts();
                let body_bytes =
                    collect_body(res_body).await.map_err(|_e| {
                        HttpCacheError::http(Box::new(std::io::Error::other(
                            "service error".to_string(),
                        )))
                    })?;

                // NOTE(review): `body_bytes.clone()` looks redundant here —
                // `body_bytes` is not used afterwards; confirm and drop the clone.
                let cached_response = cache
                    .process_response(
                        analysis,
                        Response::from_parts(res_parts, body_bytes.clone()),
                        None,
                    )
                    .await
                    .cache_err()?;

                return Ok(cached_response.map(HttpCacheBody::Buffered));
            }

            // Look up cached response using interface
            if let Some((cached_response, policy)) = cache
                .lookup_cached_response(&analysis.cache_key)
                .await
                .cache_err()?
            {
                // Ask the cache policy whether the stored entry is still fresh
                // for this request at the current wall-clock time.
                let before_req =
                    policy.before_request(&parts, std::time::SystemTime::now());
                match before_req {
                    BeforeRequest::Fresh(_) => {
                        // Return cached response
                        let mut response = http_cache::HttpCacheOptions::http_response_to_response(
                            &cached_response,
                            HttpCacheBody::Buffered(cached_response.body.clone()),
                        ).map_err(HttpCacheError::other)?;

                        // Add cache status headers if enabled
                        if cache.options.cache_status_headers {
                            response = add_cache_status_headers(
                                response, "HIT", "HIT",
                            );
                        }

                        // Insert metadata into response extensions if present
                        if let Some(metadata) = cached_response.metadata {
                            response.extensions_mut().insert(
                                http_cache::HttpCacheMetadata::from(metadata),
                            );
                        }

                        return Ok(response);
                    }
                    BeforeRequest::Stale {
                        request: conditional_parts, ..
                    } => {
                        // Make conditional request
                        let conditional_req =
                            Request::from_parts(conditional_parts, body);
                        let conditional_response = inner_service
                            .oneshot(conditional_req)
                            .await
                            .map_err(|_e| {
                                HttpCacheError::http(Box::new(
                                    std::io::Error::other(
                                        "service error".to_string(),
                                    ),
                                ))
                            })?;

                        // 304 Not Modified: the cached body is still valid.
                        if conditional_response.status() == 304 {
                            // Use cached response with updated headers
                            let (fresh_parts, _) =
                                conditional_response.into_parts();
                            let updated_response = cache
                                .handle_not_modified(
                                    cached_response,
                                    &fresh_parts,
                                )
                                .await
                                .cache_err()?;

                            let mut response = http_cache::HttpCacheOptions::http_response_to_response(
                                &updated_response,
                                HttpCacheBody::Buffered(updated_response.body.clone()),
                            ).map_err(HttpCacheError::other)?;

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers(
                                    response, "HIT", "HIT",
                                );
                            }

                            // Insert metadata into response extensions if present
                            if let Some(metadata) = updated_response.metadata {
                                response.extensions_mut().insert(
                                    http_cache::HttpCacheMetadata::from(
                                        metadata,
                                    ),
                                );
                            }

                            return Ok(response);
                        } else {
                            // Process fresh response
                            let (parts, res_body) =
                                conditional_response.into_parts();
                            let body_bytes =
                                collect_body(res_body).await.map_err(|_e| {
                                    HttpCacheError::http(Box::new(
                                        std::io::Error::other(
                                            "service error".to_string(),
                                        ),
                                    ))
                                })?;

                            let cached_response = cache
                                .process_response(
                                    analysis,
                                    Response::from_parts(
                                        parts,
                                        body_bytes.clone(),
                                    ),
                                    None,
                                )
                                .await
                                .cache_err()?;

                            let mut response =
                                cached_response.map(HttpCacheBody::Buffered);

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers(
                                    response, "MISS", "MISS",
                                );
                            }

                            return Ok(response);
                        }
                    }
                }
            }

            // Fetch fresh response
            let req = Request::from_parts(parts, body);
            let response = inner_service.oneshot(req).await.map_err(|_e| {
                HttpCacheError::http(Box::new(std::io::Error::other(
                    "service error".to_string(),
                )))
            })?;

            let (res_parts, res_body) = response.into_parts();
            let body_bytes = collect_body(res_body).await.map_err(|_e| {
                HttpCacheError::http(Box::new(std::io::Error::other(
                    "service error".to_string(),
                )))
            })?;

            // Process and cache using interface
            let cached_response = cache
                .process_response(
                    analysis,
                    Response::from_parts(res_parts, body_bytes.clone()),
                    None,
                )
                .await
                .cache_err()?;

            let mut response = cached_response.map(HttpCacheBody::Buffered);

            // Add cache status headers if enabled
            if cache.options.cache_status_headers {
                response = add_cache_status_headers(response, "MISS", "MISS");
            }

            Ok(response)
        })
    }
}
875
// Hyper service implementation for HttpCacheService
impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
    for HttpCacheService<S, CM>
where
    S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
    S::Response: Into<Response<http_body_util::Full<Bytes>>>,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    CM: CacheManager,
{
    type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                Output = Result<Self::Response, Self::Error>,
            > + Send,
        >,
    >;

    fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
        // Convert to the format expected by the generic Service implementation
        //
        // NOTE(review): `service_clone.call(_req)` appears to resolve back to
        // this same `hyper::service::Service::call` — the tower `Service` impl
        // above requires `S::Response = Response<ResBody>`, which these bounds
        // (only `S::Response: Into<Response<Full<Bytes>>>`) do not guarantee —
        // so this would recurse without ever reaching the inner service.
        // Confirm which `call` is intended and disambiguate explicitly (e.g.
        // `tower::Service::call(&mut service_clone, _req)`), with bounds that
        // make the tower impl applicable.
        let service_clone = self.clone();
        Box::pin(async move { service_clone.call(_req).await })
    }
}
902
903#[cfg(feature = "streaming")]
904impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
905 for HttpCacheStreamingService<S, CM>
906where
907 S: Service<Request<ReqBody>, Response = Response<ResBody>>
908 + Clone
909 + Send
910 + 'static,
911 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
912 S::Future: Send + 'static,
913 ReqBody: Body + Send + 'static,
914 ReqBody::Data: Send,
915 ReqBody::Error: Into<StreamingError>,
916 ResBody: Body + Send + 'static,
917 ResBody::Data: Send,
918 ResBody::Error: Into<StreamingError>,
919 CM: StreamingCacheManager,
920 <CM::Body as http_body::Body>::Data: Send,
921 <CM::Body as http_body::Body>::Error:
922 Into<StreamingError> + Send + Sync + 'static,
923{
924 type Response = Response<CM::Body>;
925 type Error = HttpCacheError;
926 type Future = Pin<
927 Box<
928 dyn std::future::Future<
929 Output = Result<Self::Response, Self::Error>,
930 > + Send,
931 >,
932 >;
933
934 fn poll_ready(
935 &mut self,
936 cx: &mut Context<'_>,
937 ) -> Poll<Result<(), Self::Error>> {
938 self.inner.poll_ready(cx).map_err(|_e| {
939 HttpCacheError::http(Box::new(std::io::Error::other(
940 "service error".to_string(),
941 )))
942 })
943 }
944
945 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
946 let cache = self.cache.clone();
947 let (parts, body) = req.into_parts();
948 let inner_service = self.inner.clone();
949
950 Box::pin(async move {
951 use http_cache_semantics::BeforeRequest;
952
953 // Use the core library's streaming cache interface
954 let analysis = cache.analyze_request(&parts, None).cache_err()?;
955
956 // If not cacheable, execute request FIRST, then invalidate on success (RFC 7234 Section 4.4)
957 if !analysis.should_cache {
958 // Apply rate limiting before non-cached request
959 #[cfg(feature = "rate-limiting")]
960 if let Some(rate_limiter) = &cache.options.rate_limiter {
961 if let Ok(url) = analysis
962 .request_parts
963 .uri
964 .to_string()
965 .parse::<::url::Url>()
966 {
967 let rate_limit_key =
968 url.host_str().unwrap_or("unknown");
969 rate_limiter.until_key_ready(rate_limit_key).await;
970 }
971 }
972
973 let req = Request::from_parts(parts, body);
974 let response =
975 inner_service.oneshot(req).await.map_err(|_e| {
976 HttpCacheError::http(Box::new(std::io::Error::other(
977 "service error".to_string(),
978 )))
979 })?;
980
981 // Only invalidate AFTER successful response (RFC 7234 Section 4.4)
982 if response.status().is_success()
983 || response.status().is_redirection()
984 {
985 for key in &analysis.cache_bust_keys {
986 let _ = cache.manager.delete(key).await;
987 }
988 // Invalidate both GET and HEAD caches per RFC 7234 Section 4.4
989 if !analysis.is_get_head {
990 let get_cache_key =
991 cache.options.create_cache_key_for_invalidation(
992 &analysis.request_parts,
993 "GET",
994 );
995 let _ = cache.manager.delete(&get_cache_key).await;
996 let head_cache_key =
997 cache.options.create_cache_key_for_invalidation(
998 &analysis.request_parts,
999 "HEAD",
1000 );
1001 let _ = cache.manager.delete(&head_cache_key).await;
1002 }
1003 }
1004
1005 let mut converted_response =
1006 cache.manager.convert_body(response).await.cache_err()?;
1007
1008 // Add cache status headers if enabled
1009 if cache.options.cache_status_headers {
1010 converted_response = add_cache_status_headers_streaming(
1011 converted_response,
1012 "MISS",
1013 "MISS",
1014 );
1015 }
1016
1017 return Ok(converted_response);
1018 }
1019
1020 // Special case for Reload mode: skip cache lookup but still cache response
1021 if analysis.cache_mode == CacheMode::Reload {
1022 // Apply rate limiting before reload request
1023 #[cfg(feature = "rate-limiting")]
1024 if let Some(rate_limiter) = &cache.options.rate_limiter {
1025 if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
1026 {
1027 let rate_limit_key =
1028 url.host_str().unwrap_or("unknown");
1029 rate_limiter.until_key_ready(rate_limit_key).await;
1030 }
1031 }
1032
1033 let req = Request::from_parts(parts, body);
1034 let response =
1035 inner_service.oneshot(req).await.map_err(|_e| {
1036 HttpCacheError::http(Box::new(std::io::Error::other(
1037 "service error".to_string(),
1038 )))
1039 })?;
1040
1041 let cached_response = cache
1042 .process_response(analysis, response, None)
1043 .await
1044 .cache_err()?;
1045
1046 let mut final_response = cached_response;
1047
1048 // Add cache status headers if enabled
1049 if cache.options.cache_status_headers {
1050 final_response = add_cache_status_headers_streaming(
1051 final_response,
1052 "MISS",
1053 "MISS",
1054 );
1055 }
1056
1057 return Ok(final_response);
1058 }
1059
1060 // Look up cached response using interface
1061 if let Some((cached_response, policy)) = cache
1062 .lookup_cached_response(&analysis.cache_key)
1063 .await
1064 .cache_err()?
1065 {
1066 let before_req =
1067 policy.before_request(&parts, std::time::SystemTime::now());
1068 match before_req {
1069 BeforeRequest::Fresh(_) => {
1070 let mut response = cached_response;
1071
1072 // Add cache status headers if enabled
1073 if cache.options.cache_status_headers {
1074 response = add_cache_status_headers_streaming(
1075 response, "HIT", "HIT",
1076 );
1077 }
1078
1079 return Ok(response);
1080 }
1081 BeforeRequest::Stale {
1082 request: conditional_parts, ..
1083 } => {
1084 // Apply rate limiting before conditional request
1085 #[cfg(feature = "rate-limiting")]
1086 if let Some(rate_limiter) = &cache.options.rate_limiter
1087 {
1088 if let Ok(url) = conditional_parts
1089 .uri
1090 .to_string()
1091 .parse::<::url::Url>()
1092 {
1093 let rate_limit_key =
1094 url.host_str().unwrap_or("unknown");
1095 rate_limiter
1096 .until_key_ready(rate_limit_key)
1097 .await;
1098 }
1099 }
1100
1101 let conditional_req =
1102 Request::from_parts(conditional_parts, body);
1103 let conditional_response = inner_service
1104 .oneshot(conditional_req)
1105 .await
1106 .map_err(|_e| {
1107 HttpCacheError::http(Box::new(
1108 std::io::Error::other(
1109 "service error".to_string(),
1110 ),
1111 ))
1112 })?;
1113
1114 if conditional_response.status() == 304 {
1115 let (fresh_parts, _) =
1116 conditional_response.into_parts();
1117 let updated_response = cache
1118 .handle_not_modified(
1119 cached_response,
1120 &fresh_parts,
1121 )
1122 .await
1123 .cache_err()?;
1124
1125 let mut response = updated_response;
1126
1127 // Add cache status headers if enabled
1128 if cache.options.cache_status_headers {
1129 response = add_cache_status_headers_streaming(
1130 response, "HIT", "HIT",
1131 );
1132 }
1133
1134 return Ok(response);
1135 } else {
1136 let cached_response = cache
1137 .process_response(
1138 analysis,
1139 conditional_response,
1140 None,
1141 )
1142 .await
1143 .cache_err()?;
1144
1145 let mut response = cached_response;
1146
1147 // Add cache status headers if enabled
1148 if cache.options.cache_status_headers {
1149 response = add_cache_status_headers_streaming(
1150 response, "MISS", "MISS",
1151 );
1152 }
1153
1154 return Ok(response);
1155 }
1156 }
1157 }
1158 }
1159
1160 // Handle OnlyIfCached mode: return 504 Gateway Timeout on cache miss
1161 if analysis.cache_mode == CacheMode::OnlyIfCached {
1162 let mut response = Response::builder()
1163 .status(504)
1164 .body(cache.manager.empty_body())
1165 .map_err(|e| HttpCacheError::other(e.to_string()))?;
1166
1167 if cache.options.cache_status_headers {
1168 response = add_cache_status_headers_streaming(
1169 response, "MISS", "MISS",
1170 );
1171 }
1172
1173 return Ok(response);
1174 }
1175
1176 // Apply rate limiting before fresh request
1177 #[cfg(feature = "rate-limiting")]
1178 if let Some(rate_limiter) = &cache.options.rate_limiter {
1179 if let Ok(url) = parts.uri.to_string().parse::<url::Url>() {
1180 let rate_limit_key = url.host_str().unwrap_or("unknown");
1181 rate_limiter.until_key_ready(rate_limit_key).await;
1182 }
1183 }
1184
1185 // Fetch fresh response
1186 let req = Request::from_parts(parts, body);
1187 let response = inner_service.oneshot(req).await.map_err(|_e| {
1188 HttpCacheError::http(Box::new(std::io::Error::other(
1189 "service error".to_string(),
1190 )))
1191 })?;
1192
1193 // Process using streaming interface
1194 let cached_response = cache
1195 .process_response(analysis, response, None)
1196 .await
1197 .cache_err()?;
1198
1199 let mut final_response = cached_response;
1200
1201 // Add cache status headers if enabled
1202 if cache.options.cache_status_headers {
1203 final_response = add_cache_status_headers_streaming(
1204 final_response,
1205 "MISS",
1206 "MISS",
1207 );
1208 }
1209
1210 Ok(final_response)
1211 })
1212 }
1213}
1214
/// Body type that wraps cached responses.
///
/// A response body is either served from the cache as an in-memory
/// buffer, or passed through unchanged from the inner service.
#[derive(Debug)]
pub enum HttpCacheBody<B> {
    /// Buffered body served from cache; the `Body` impl emits the whole
    /// buffer as a single data frame the first time it is polled.
    Buffered(Vec<u8>),
    /// Original, un-wrapped body from the inner service (fallback path).
    Original(B),
}
1222
1223impl<B> Body for HttpCacheBody<B>
1224where
1225 B: Body + Unpin,
1226 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1227 B::Data: Into<bytes::Bytes>,
1228{
1229 type Data = bytes::Bytes;
1230 type Error = Box<dyn std::error::Error + Send + Sync>;
1231
1232 fn poll_frame(
1233 mut self: Pin<&mut Self>,
1234 cx: &mut Context<'_>,
1235 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1236 match &mut *self {
1237 HttpCacheBody::Buffered(bytes) => {
1238 if bytes.is_empty() {
1239 Poll::Ready(None)
1240 } else {
1241 let data = std::mem::take(bytes);
1242 Poll::Ready(Some(Ok(http_body::Frame::data(
1243 bytes::Bytes::from(data),
1244 ))))
1245 }
1246 }
1247 HttpCacheBody::Original(body) => {
1248 Pin::new(body).poll_frame(cx).map(|opt| {
1249 opt.map(|res| {
1250 res.map(|frame| frame.map_data(Into::into))
1251 .map_err(Into::into)
1252 })
1253 })
1254 }
1255 }
1256 }
1257
1258 fn is_end_stream(&self) -> bool {
1259 match self {
1260 HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1261 HttpCacheBody::Original(body) => body.is_end_stream(),
1262 }
1263 }
1264
1265 fn size_hint(&self) -> http_body::SizeHint {
1266 match self {
1267 HttpCacheBody::Buffered(bytes) => {
1268 let len = bytes.len() as u64;
1269 http_body::SizeHint::with_exact(len)
1270 }
1271 HttpCacheBody::Original(body) => body.size_hint(),
1272 }
1273 }
1274}
1275
1276#[cfg(test)]
1277mod test;