// http_cache_tower/lib.rs
1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//! // Create cache manager with disk storage
28//! let cache_manager = CACacheManager::new("./cache".into(), true);
29//!
30//! // Create cache layer
31//! let cache_layer = HttpCacheLayer::new(cache_manager);
32//!
33//! // Build service with caching
34//! let service = ServiceBuilder::new()
35//! .layer(cache_layer)
36//! .service_fn(handler);
37//!
38//! // Use the service
39//! let request = Request::builder()
40//! .uri("http://example.com")
41//! .body(Full::new(Bytes::new()))
42//! .unwrap();
43//! let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//! mode: CacheMode::Default,
61//! manager: cache_manager,
62//! options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
//! use http_cache::StreamingManager;
78//!
79//! # #[tokio::main]
80//! # async fn main() {
81//! // Create streaming cache setup
82//! let streaming_manager = StreamingManager::new("./streaming-cache".into());
83//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
84//!
85//! // Use with your service
86//! // let service = streaming_layer.layer(your_service);
87//! # }
88//! ```
89//!
90//! ## Cache Modes
91//!
92//! Different cache modes provide different behaviors:
93//!
94//! - `CacheMode::Default`: Follow HTTP caching rules strictly
95//! - `CacheMode::NoStore`: Never cache responses
96//! - `CacheMode::NoCache`: Always revalidate with the origin server
97//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
98//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
99//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
100//!
101//! ## Cache Invalidation
102//!
103//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
104//!
105//! ```text
106//! These methods will invalidate any cached GET response for the same URI:
107//! - PUT /api/users/123 -> invalidates GET /api/users/123
108//! - POST /api/users/123 -> invalidates GET /api/users/123
109//! - DELETE /api/users/123 -> invalidates GET /api/users/123
110//! - PATCH /api/users/123 -> invalidates GET /api/users/123
111//! ```
112//!
113//! ## Integration with Other Tower Layers
114//!
115//! The cache layer works with other Tower middleware:
116//!
117//! ```rust,no_run
118//! use tower::ServiceBuilder;
119//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
120//! use tower::service_fn;
121//! use tower::ServiceExt;
122//! use http::{Request, Response};
123//! use http_body_util::Full;
124//! use bytes::Bytes;
125//! use std::convert::Infallible;
126//!
127//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
128//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
129//! }
130//!
131//! #[tokio::main]
132//! async fn main() {
133//! let cache_manager = CACacheManager::new("./cache".into(), true);
134//! let cache_layer = HttpCacheLayer::new(cache_manager);
135//!
136//! let service = ServiceBuilder::new()
137//! // .layer(TraceLayer::new_for_http()) // Logging (requires tower-http)
138//! // .layer(CompressionLayer::new()) // Compression (requires tower-http)
139//! .layer(cache_layer) // Caching
140//! .service_fn(handler);
141//!
142//! // Use the service
143//! let request = Request::builder()
144//! .uri("http://example.com")
145//! .body(Full::new(Bytes::new()))
146//! .unwrap();
147//! let response = service.oneshot(request).await.unwrap();
148//! }
149//! ```
150
151use bytes::Bytes;
152use http::{HeaderValue, Request, Response};
153use http_body::Body;
154use http_body_util::BodyExt;
155
156#[cfg(feature = "manager-cacache")]
157pub use http_cache::CACacheManager;
158
159#[cfg(feature = "rate-limiting")]
160pub use http_cache::rate_limiting::{
161 CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
162};
163#[cfg(feature = "streaming")]
164use http_cache::StreamingError;
165use http_cache::{
166 CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
167};
168#[cfg(feature = "streaming")]
169use http_cache::{
170 HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
171};
172use std::{
173 pin::Pin,
174 sync::Arc,
175 task::{Context, Poll},
176};
177use tower::{Layer, Service, ServiceExt};
178
179// Re-export unified error types from http-cache core
180pub use http_cache::HttpCacheError;
181
182#[cfg(feature = "streaming")]
183/// Type alias for tower streaming errors, using the unified streaming error system
184pub type TowerStreamingError = http_cache::ClientStreamingError;
185
186/// Helper functions for error conversions
187trait HttpCacheErrorExt<T> {
188 fn cache_err(self) -> Result<T, HttpCacheError>;
189}
190
191impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
192where
193 E: ToString,
194{
195 fn cache_err(self) -> Result<T, HttpCacheError> {
196 self.map_err(|e| HttpCacheError::cache(e.to_string()))
197 }
198}
199
200/// Helper function to collect a body into bytes
201async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
202where
203 B: Body,
204{
205 let collected = BodyExt::collect(body).await?;
206 Ok(collected.to_bytes().to_vec())
207}
208
209/// Helper function to add cache status headers to a response
210fn add_cache_status_headers<B>(
211 mut response: Response<HttpCacheBody<B>>,
212 hit_or_miss: &str,
213 cache_lookup: &str,
214) -> Response<HttpCacheBody<B>> {
215 let headers = response.headers_mut();
216 headers.insert(
217 http_cache::XCACHE,
218 HeaderValue::from_str(hit_or_miss).unwrap(),
219 );
220 headers.insert(
221 http_cache::XCACHELOOKUP,
222 HeaderValue::from_str(cache_lookup).unwrap(),
223 );
224 response
225}
226
#[cfg(feature = "streaming")]
/// Adds the `x-cache` / `x-cache-lookup` status headers to a streaming
/// response.
///
/// # Panics
///
/// Panics if `hit_or_miss` or `cache_lookup` is not a valid HTTP header
/// value. Callers in this module only pass the literals `"HIT"` and
/// `"MISS"`, which are always valid.
fn add_cache_status_headers_streaming<B>(
    mut response: Response<B>,
    hit_or_miss: &str,
    cache_lookup: &str,
) -> Response<B> {
    let headers = response.headers_mut();
    headers.insert(
        http_cache::XCACHE,
        HeaderValue::from_str(hit_or_miss)
            .expect("cache status must be a valid header value"),
    );
    headers.insert(
        http_cache::XCACHELOOKUP,
        HeaderValue::from_str(cache_lookup)
            .expect("cache lookup status must be a valid header value"),
    );
    response
}
244
245/// HTTP cache layer for Tower services.
246///
247/// This layer implements HTTP caching according to RFC 7234, automatically caching
248/// GET and HEAD responses based on their cache-control headers and invalidating
249/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
250///
251/// # Example
252///
253/// ```rust
254/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
255/// use tower::ServiceBuilder;
256/// use tower::service_fn;
257/// use http::{Request, Response};
258/// use http_body_util::Full;
259/// use bytes::Bytes;
260/// use std::convert::Infallible;
261///
262/// # #[tokio::main]
263/// # async fn main() {
264/// let cache_manager = CACacheManager::new("./cache".into(), true);
265/// let cache_layer = HttpCacheLayer::new(cache_manager);
266///
267/// // Use with ServiceBuilder
268/// let service = ServiceBuilder::new()
269/// .layer(cache_layer)
270/// .service_fn(|_req: Request<Full<Bytes>>| async {
271/// Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
272/// });
273/// # }
274/// ```
#[derive(Clone)]
pub struct HttpCacheLayer<CM>
where
    CM: CacheManager,
{
    // Shared cache configuration; the `Arc` lets every service produced by
    // `Layer::layer` reuse the same manager/options without copying them.
    cache: Arc<HttpCache<CM>>,
}
282
283impl<CM> HttpCacheLayer<CM>
284where
285 CM: CacheManager,
286{
287 /// Create a new HTTP cache layer with default configuration.
288 ///
289 /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
290 ///
291 /// # Arguments
292 ///
293 /// * `cache_manager` - The cache manager to use for storing responses
294 ///
295 /// # Example
296 ///
297 /// ```rust
298 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
299 ///
300 /// # #[tokio::main]
301 /// # async fn main() {
302 /// let cache_manager = CACacheManager::new("./cache".into(), true);
303 /// let layer = HttpCacheLayer::new(cache_manager);
304 /// # }
305 /// ```
306 pub fn new(cache_manager: CM) -> Self {
307 Self {
308 cache: Arc::new(HttpCache {
309 mode: CacheMode::Default,
310 manager: cache_manager,
311 options: HttpCacheOptions::default(),
312 }),
313 }
314 }
315
316 /// Create a new HTTP cache layer with custom options.
317 ///
318 /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
319 /// through [`HttpCacheOptions`].
320 ///
321 /// # Arguments
322 ///
323 /// * `cache_manager` - The cache manager to use for storing responses
324 /// * `options` - Custom cache options
325 ///
326 /// # Example
327 ///
328 /// ```rust
329 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
330 /// use http_cache::HttpCacheOptions;
331 ///
332 /// # #[tokio::main]
333 /// # async fn main() {
334 /// let cache_manager = CACacheManager::new("./cache".into(), true);
335 ///
336 /// let options = HttpCacheOptions {
337 /// cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
338 /// format!("custom:{}:{}", req.method, req.uri)
339 /// })),
340 /// ..Default::default()
341 /// };
342 ///
343 /// let layer = HttpCacheLayer::with_options(cache_manager, options);
344 /// # }
345 /// ```
346 pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
347 Self {
348 cache: Arc::new(HttpCache {
349 mode: CacheMode::Default,
350 manager: cache_manager,
351 options,
352 }),
353 }
354 }
355
356 /// Create a new HTTP cache layer with a pre-configured cache.
357 ///
358 /// This method gives you full control over the cache configuration,
359 /// including the cache mode.
360 ///
361 /// # Arguments
362 ///
363 /// * `cache` - A fully configured HttpCache instance
364 ///
365 /// # Example
366 ///
367 /// ```rust
368 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
369 /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
370 ///
371 /// # #[tokio::main]
372 /// # async fn main() {
373 /// let cache_manager = CACacheManager::new("./cache".into(), true);
374 ///
375 /// let cache = HttpCache {
376 /// mode: CacheMode::ForceCache,
377 /// manager: cache_manager,
378 /// options: HttpCacheOptions::default(),
379 /// };
380 ///
381 /// let layer = HttpCacheLayer::with_cache(cache);
382 /// # }
383 /// ```
384 pub fn with_cache(cache: HttpCache<CM>) -> Self {
385 Self { cache: Arc::new(cache) }
386 }
387}
388
389/// HTTP cache layer with streaming support for Tower services.
390///
391/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
392/// but handles streaming responses. It can work with large
393/// responses without buffering them entirely in memory.
394///
395/// # Example
396///
397/// ```rust
398/// use http_cache_tower::HttpCacheStreamingLayer;
399/// use http_cache::StreamingManager;
400/// use tower::ServiceBuilder;
401/// use tower::service_fn;
402/// use http::{Request, Response};
403/// use http_body_util::Full;
404/// use bytes::Bytes;
405/// use std::convert::Infallible;
406///
407/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
408/// Ok(Response::new(Full::new(Bytes::from("Hello"))))
409/// }
410///
411/// # #[tokio::main]
412/// # async fn main() {
413/// let streaming_manager = StreamingManager::new("./cache".into());
414/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
415///
416/// // Use with ServiceBuilder
417/// let service = ServiceBuilder::new()
418/// .layer(streaming_layer)
419/// .service_fn(handler);
420/// # }
421/// ```
#[cfg(feature = "streaming")]
#[derive(Clone)]
pub struct HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    // Shared streaming cache; cloned `Arc` handles are handed to each
    // service created by `Layer::layer`.
    cache: Arc<HttpStreamingCache<CM>>,
}
430
#[cfg(feature = "streaming")]
impl<CM> HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    /// Create a new streaming cache layer with the default configuration.
    ///
    /// Shorthand for [`HttpCacheStreamingLayer::with_options`] with
    /// [`HttpCacheOptions::default`]; the resulting layer runs in
    /// [`CacheMode::Default`].
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::StreamingManager;
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::new("./cache".into());
    /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
    /// # }
    /// ```
    pub fn new(cache_manager: CM) -> Self {
        Self::with_options(cache_manager, HttpCacheOptions::default())
    }

    /// Create a new streaming cache layer with caller-supplied options.
    ///
    /// The layer runs in [`CacheMode::Default`] while the supplied
    /// [`HttpCacheOptions`] customize cache-key generation and related
    /// behavior.
    ///
    /// # Arguments
    ///
    /// * `cache_manager` - The streaming cache manager to use
    /// * `options` - Custom cache options
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::new("./cache".into());
    ///
    /// let options = HttpCacheOptions {
    ///     cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
    ///         format!("stream:{}:{}", req.method, req.uri)
    ///     })),
    ///     ..Default::default()
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
    /// # }
    /// ```
    pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
        Self::with_cache(HttpStreamingCache {
            mode: CacheMode::Default,
            manager: cache_manager,
            options,
        })
    }

    /// Create a new streaming cache layer from a pre-configured cache.
    ///
    /// Grants full control over the streaming cache configuration by
    /// accepting a complete [`HttpStreamingCache`] value.
    ///
    /// # Arguments
    ///
    /// * `cache` - A fully configured HttpStreamingCache instance
    ///
    /// # Example
    ///
    /// ```rust
    /// use http_cache_tower::HttpCacheStreamingLayer;
    /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
    ///
    /// # #[tokio::main]
    /// # async fn main() {
    /// let streaming_manager = StreamingManager::new("./cache".into());
    ///
    /// let cache = HttpStreamingCache {
    ///     mode: CacheMode::ForceCache,
    ///     manager: streaming_manager,
    ///     options: HttpCacheOptions::default(),
    /// };
    ///
    /// let layer = HttpCacheStreamingLayer::with_cache(cache);
    /// # }
    /// ```
    pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
        Self { cache: Arc::new(cache) }
    }
}
536
537impl<S, CM> Layer<S> for HttpCacheLayer<CM>
538where
539 CM: CacheManager,
540{
541 type Service = HttpCacheService<S, CM>;
542
543 fn layer(&self, inner: S) -> Self::Service {
544 HttpCacheService { inner, cache: self.cache.clone() }
545 }
546}
547
#[cfg(feature = "streaming")]
impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
where
    CM: StreamingCacheManager,
{
    type Service = HttpCacheStreamingService<S, CM>;

    /// Wraps `inner` with streaming caching backed by this layer's shared cache.
    fn layer(&self, inner: S) -> Self::Service {
        let cache = Arc::clone(&self.cache);
        HttpCacheStreamingService { inner, cache }
    }
}
559
/// HTTP cache service for Tower/Hyper
pub struct HttpCacheService<S, CM>
where
    CM: CacheManager,
{
    // The wrapped downstream service that produces fresh responses.
    inner: S,
    // Cache shared with the layer that created this service.
    cache: Arc<HttpCache<CM>>,
}
568
569impl<S, CM> Clone for HttpCacheService<S, CM>
570where
571 S: Clone,
572 CM: CacheManager,
573{
574 fn clone(&self) -> Self {
575 Self { inner: self.inner.clone(), cache: self.cache.clone() }
576 }
577}
578
/// HTTP cache streaming service for Tower/Hyper
#[cfg(feature = "streaming")]
pub struct HttpCacheStreamingService<S, CM>
where
    CM: StreamingCacheManager,
{
    // The wrapped downstream service that produces fresh responses.
    inner: S,
    // Streaming cache shared with the layer that created this service.
    cache: Arc<HttpStreamingCache<CM>>,
}
588
#[cfg(feature = "streaming")]
impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
where
    S: Clone,
    CM: StreamingCacheManager,
{
    /// Clones the inner service and shares the streaming cache via its `Arc`.
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            cache: Arc::clone(&self.cache),
        }
    }
}
599
600impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
601 for HttpCacheService<S, CM>
602where
603 S: Service<Request<ReqBody>, Response = Response<ResBody>>
604 + Clone
605 + Send
606 + 'static,
607 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
608 S::Future: Send + 'static,
609 ReqBody: Body + Send + 'static,
610 ReqBody::Data: Send,
611 ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
612 ResBody: Body + Send + 'static,
613 ResBody::Data: Send,
614 ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
615 CM: CacheManager,
616{
617 type Response = Response<HttpCacheBody<ResBody>>;
618 type Error = HttpCacheError;
619 type Future = Pin<
620 Box<
621 dyn std::future::Future<
622 Output = Result<Self::Response, Self::Error>,
623 > + Send,
624 >,
625 >;
626
627 fn poll_ready(
628 &mut self,
629 cx: &mut Context<'_>,
630 ) -> Poll<Result<(), Self::Error>> {
631 self.inner.poll_ready(cx).map_err(|_e| {
632 HttpCacheError::http(Box::new(std::io::Error::other(
633 "service error".to_string(),
634 )))
635 })
636 }
637
638 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
639 let cache = self.cache.clone();
640 let (parts, body) = req.into_parts();
641 let inner_service = self.inner.clone();
642
643 Box::pin(async move {
644 use http_cache_semantics::BeforeRequest;
645
646 // Use the core library's cache interface for analysis
647 let analysis = cache.analyze_request(&parts, None).cache_err()?;
648
649 // Handle cache busting and non-cacheable requests
650 for key in &analysis.cache_bust_keys {
651 cache.manager.delete(key).await.cache_err()?;
652 }
653
654 // For non-GET/HEAD requests, invalidate cached GET responses
655 if !analysis.should_cache && !analysis.is_get_head {
656 let get_cache_key = cache
657 .options
658 .create_cache_key_for_invalidation(&parts, "GET");
659 let _ = cache.manager.delete(&get_cache_key).await;
660 }
661
662 // If not cacheable, just pass through
663 if !analysis.should_cache {
664 let req = Request::from_parts(parts, body);
665 let response =
666 inner_service.oneshot(req).await.map_err(|_e| {
667 HttpCacheError::http(Box::new(std::io::Error::other(
668 "service error".to_string(),
669 )))
670 })?;
671 return Ok(response.map(HttpCacheBody::Original));
672 }
673
674 // Special case for Reload mode: skip cache lookup but still cache response
675 if analysis.cache_mode == CacheMode::Reload {
676 let req = Request::from_parts(parts, body);
677 let response =
678 inner_service.oneshot(req).await.map_err(|_e| {
679 HttpCacheError::http(Box::new(std::io::Error::other(
680 "service error".to_string(),
681 )))
682 })?;
683
684 let (res_parts, res_body) = response.into_parts();
685 let body_bytes =
686 collect_body(res_body).await.map_err(|_e| {
687 HttpCacheError::http(Box::new(std::io::Error::other(
688 "service error".to_string(),
689 )))
690 })?;
691
692 let cached_response = cache
693 .process_response(
694 analysis,
695 Response::from_parts(res_parts, body_bytes.clone()),
696 None,
697 )
698 .await
699 .cache_err()?;
700
701 return Ok(cached_response.map(HttpCacheBody::Buffered));
702 }
703
704 // Look up cached response using interface
705 if let Some((cached_response, policy)) = cache
706 .lookup_cached_response(&analysis.cache_key)
707 .await
708 .cache_err()?
709 {
710 let before_req =
711 policy.before_request(&parts, std::time::SystemTime::now());
712 match before_req {
713 BeforeRequest::Fresh(_) => {
714 // Return cached response
715 let mut response = http_cache::HttpCacheOptions::http_response_to_response(
716 &cached_response,
717 HttpCacheBody::Buffered(cached_response.body.clone()),
718 ).map_err(HttpCacheError::other)?;
719
720 // Add cache status headers if enabled
721 if cache.options.cache_status_headers {
722 response = add_cache_status_headers(
723 response, "HIT", "HIT",
724 );
725 }
726
727 // Insert metadata into response extensions if present
728 if let Some(metadata) = cached_response.metadata {
729 response.extensions_mut().insert(
730 http_cache::HttpCacheMetadata::from(metadata),
731 );
732 }
733
734 return Ok(response);
735 }
736 BeforeRequest::Stale {
737 request: conditional_parts, ..
738 } => {
739 // Make conditional request
740 let conditional_req =
741 Request::from_parts(conditional_parts, body);
742 let conditional_response = inner_service
743 .oneshot(conditional_req)
744 .await
745 .map_err(|_e| {
746 HttpCacheError::http(Box::new(
747 std::io::Error::other(
748 "service error".to_string(),
749 ),
750 ))
751 })?;
752
753 if conditional_response.status() == 304 {
754 // Use cached response with updated headers
755 let (fresh_parts, _) =
756 conditional_response.into_parts();
757 let updated_response = cache
758 .handle_not_modified(
759 cached_response,
760 &fresh_parts,
761 )
762 .await
763 .cache_err()?;
764
765 let mut response = http_cache::HttpCacheOptions::http_response_to_response(
766 &updated_response,
767 HttpCacheBody::Buffered(updated_response.body.clone()),
768 ).map_err(HttpCacheError::other)?;
769
770 // Add cache status headers if enabled
771 if cache.options.cache_status_headers {
772 response = add_cache_status_headers(
773 response, "HIT", "HIT",
774 );
775 }
776
777 // Insert metadata into response extensions if present
778 if let Some(metadata) = updated_response.metadata {
779 response.extensions_mut().insert(
780 http_cache::HttpCacheMetadata::from(
781 metadata,
782 ),
783 );
784 }
785
786 return Ok(response);
787 } else {
788 // Process fresh response
789 let (parts, res_body) =
790 conditional_response.into_parts();
791 let body_bytes =
792 collect_body(res_body).await.map_err(|_e| {
793 HttpCacheError::http(Box::new(
794 std::io::Error::other(
795 "service error".to_string(),
796 ),
797 ))
798 })?;
799
800 let cached_response = cache
801 .process_response(
802 analysis,
803 Response::from_parts(
804 parts,
805 body_bytes.clone(),
806 ),
807 None,
808 )
809 .await
810 .cache_err()?;
811
812 let mut response =
813 cached_response.map(HttpCacheBody::Buffered);
814
815 // Add cache status headers if enabled
816 if cache.options.cache_status_headers {
817 response = add_cache_status_headers(
818 response, "MISS", "MISS",
819 );
820 }
821
822 return Ok(response);
823 }
824 }
825 }
826 }
827
828 // Fetch fresh response
829 let req = Request::from_parts(parts, body);
830 let response = inner_service.oneshot(req).await.map_err(|_e| {
831 HttpCacheError::http(Box::new(std::io::Error::other(
832 "service error".to_string(),
833 )))
834 })?;
835
836 let (res_parts, res_body) = response.into_parts();
837 let body_bytes = collect_body(res_body).await.map_err(|_e| {
838 HttpCacheError::http(Box::new(std::io::Error::other(
839 "service error".to_string(),
840 )))
841 })?;
842
843 // Process and cache using interface
844 let cached_response = cache
845 .process_response(
846 analysis,
847 Response::from_parts(res_parts, body_bytes.clone()),
848 None,
849 )
850 .await
851 .cache_err()?;
852
853 let mut response = cached_response.map(HttpCacheBody::Buffered);
854
855 // Add cache status headers if enabled
856 if cache.options.cache_status_headers {
857 response = add_cache_status_headers(response, "MISS", "MISS");
858 }
859
860 Ok(response)
861 })
862 }
863}
864
// Hyper service implementation for HttpCacheService
impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
    for HttpCacheService<S, CM>
where
    S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
    S::Response: Into<Response<http_body_util::Full<Bytes>>>,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    CM: CacheManager,
{
    type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                Output = Result<Self::Response, Self::Error>,
            > + Send,
        >,
    >;

    fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
        // Convert to the format expected by the generic Service implementation
        // NOTE(review): `service_clone.call(_req)` is intended to dispatch to
        // the Tower `Service` impl above, but the bounds on this impl
        // (`S::Response: Into<...>` rather than `S::Response = Response<ResBody>`)
        // do not appear to satisfy that impl, so method resolution may pick
        // *this* hyper `call` and recurse forever. Verify which impl is
        // selected and that requests actually reach the inner service.
        let service_clone = self.clone();
        Box::pin(async move { service_clone.call(_req).await })
    }
}
891
#[cfg(feature = "streaming")]
impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
    for HttpCacheStreamingService<S, CM>
where
    S: Service<Request<ReqBody>, Response = Response<ResBody>>
        + Clone
        + Send
        + 'static,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S::Future: Send + 'static,
    ReqBody: Body + Send + 'static,
    ReqBody::Data: Send,
    ReqBody::Error: Into<StreamingError>,
    ResBody: Body + Send + 'static,
    ResBody::Data: Send,
    ResBody::Error: Into<StreamingError>,
    CM: StreamingCacheManager,
    <CM::Body as http_body::Body>::Data: Send,
    <CM::Body as http_body::Body>::Error:
        Into<StreamingError> + Send + Sync + 'static,
{
    type Response = Response<CM::Body>;
    type Error = HttpCacheError;
    type Future = Pin<
        Box<
            dyn std::future::Future<
                Output = Result<Self::Response, Self::Error>,
            > + Send,
        >,
    >;

    fn poll_ready(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        // Preserve the inner service's error as the source rather than
        // replacing it with an opaque "service error" string.
        self.inner.poll_ready(cx).map_err(|e| {
            let e: Box<dyn std::error::Error + Send + Sync> = e.into();
            HttpCacheError::http(e)
        })
    }

    /// Drives one request through the streaming cache: analyze it, serve a
    /// fresh cached response, revalidate a stale one, or fetch from the
    /// inner service and store the result — all without buffering bodies.
    fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
        let cache = self.cache.clone();
        let (parts, body) = req.into_parts();
        let inner_service = self.inner.clone();

        // Converts inner-service errors into `HttpCacheError` while keeping
        // the original error as the source (previously every failure was
        // collapsed into a generic "service error" message).
        fn service_error(
            e: impl Into<Box<dyn std::error::Error + Send + Sync>>,
        ) -> HttpCacheError {
            let e: Box<dyn std::error::Error + Send + Sync> = e.into();
            HttpCacheError::http(e)
        }

        Box::pin(async move {
            use http_cache_semantics::BeforeRequest;

            // Use the core library's streaming cache interface
            let analysis = cache.analyze_request(&parts, None).cache_err()?;

            // Handle cache busting
            for key in &analysis.cache_bust_keys {
                cache.manager.delete(key).await.cache_err()?;
            }

            // For non-GET/HEAD requests, invalidate cached GET responses
            if !analysis.should_cache && !analysis.is_get_head {
                let get_cache_key = cache
                    .options
                    .create_cache_key_for_invalidation(&parts, "GET");
                // Best-effort: a missing entry is not an error.
                let _ = cache.manager.delete(&get_cache_key).await;
            }

            // If not cacheable, convert body type and return
            if !analysis.should_cache {
                // Apply rate limiting before non-cached request
                #[cfg(feature = "rate-limiting")]
                if let Some(rate_limiter) = &cache.options.rate_limiter {
                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
                    {
                        let rate_limit_key =
                            url.host_str().unwrap_or("unknown");
                        rate_limiter.until_key_ready(rate_limit_key).await;
                    }
                }

                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(service_error)?;
                let mut converted_response =
                    cache.manager.convert_body(response).await.cache_err()?;

                // Add cache status headers if enabled
                if cache.options.cache_status_headers {
                    converted_response = add_cache_status_headers_streaming(
                        converted_response,
                        "MISS",
                        "MISS",
                    );
                }

                return Ok(converted_response);
            }

            // Special case for Reload mode: skip cache lookup but still cache response
            if analysis.cache_mode == CacheMode::Reload {
                // Apply rate limiting before reload request
                #[cfg(feature = "rate-limiting")]
                if let Some(rate_limiter) = &cache.options.rate_limiter {
                    if let Ok(url) = parts.uri.to_string().parse::<::url::Url>()
                    {
                        let rate_limit_key =
                            url.host_str().unwrap_or("unknown");
                        rate_limiter.until_key_ready(rate_limit_key).await;
                    }
                }

                let req = Request::from_parts(parts, body);
                let response =
                    inner_service.oneshot(req).await.map_err(service_error)?;

                let cached_response = cache
                    .process_response(analysis, response, None)
                    .await
                    .cache_err()?;

                let mut final_response = cached_response;

                // Add cache status headers if enabled
                if cache.options.cache_status_headers {
                    final_response = add_cache_status_headers_streaming(
                        final_response,
                        "MISS",
                        "MISS",
                    );
                }

                return Ok(final_response);
            }

            // Look up cached response using interface
            if let Some((cached_response, policy)) = cache
                .lookup_cached_response(&analysis.cache_key)
                .await
                .cache_err()?
            {
                let before_req =
                    policy.before_request(&parts, std::time::SystemTime::now());
                match before_req {
                    BeforeRequest::Fresh(_) => {
                        let mut response = cached_response;

                        // Add cache status headers if enabled
                        if cache.options.cache_status_headers {
                            response = add_cache_status_headers_streaming(
                                response, "HIT", "HIT",
                            );
                        }

                        return Ok(response);
                    }
                    BeforeRequest::Stale {
                        request: conditional_parts, ..
                    } => {
                        // Apply rate limiting before conditional request
                        #[cfg(feature = "rate-limiting")]
                        if let Some(rate_limiter) = &cache.options.rate_limiter
                        {
                            if let Ok(url) = conditional_parts
                                .uri
                                .to_string()
                                .parse::<::url::Url>()
                            {
                                let rate_limit_key =
                                    url.host_str().unwrap_or("unknown");
                                rate_limiter
                                    .until_key_ready(rate_limit_key)
                                    .await;
                            }
                        }

                        let conditional_req =
                            Request::from_parts(conditional_parts, body);
                        let conditional_response = inner_service
                            .oneshot(conditional_req)
                            .await
                            .map_err(service_error)?;

                        if conditional_response.status() == 304 {
                            let (fresh_parts, _) =
                                conditional_response.into_parts();
                            let updated_response = cache
                                .handle_not_modified(
                                    cached_response,
                                    &fresh_parts,
                                )
                                .await
                                .cache_err()?;

                            let mut response = updated_response;

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers_streaming(
                                    response, "HIT", "HIT",
                                );
                            }

                            return Ok(response);
                        } else {
                            let cached_response = cache
                                .process_response(
                                    analysis,
                                    conditional_response,
                                    None,
                                )
                                .await
                                .cache_err()?;

                            let mut response = cached_response;

                            // Add cache status headers if enabled
                            if cache.options.cache_status_headers {
                                response = add_cache_status_headers_streaming(
                                    response, "MISS", "MISS",
                                );
                            }

                            return Ok(response);
                        }
                    }
                }
            }

            // Apply rate limiting before fresh request
            #[cfg(feature = "rate-limiting")]
            if let Some(rate_limiter) = &cache.options.rate_limiter {
                if let Ok(url) = parts.uri.to_string().parse::<::url::Url>() {
                    let rate_limit_key = url.host_str().unwrap_or("unknown");
                    rate_limiter.until_key_ready(rate_limit_key).await;
                }
            }

            // Fetch fresh response
            let req = Request::from_parts(parts, body);
            let response =
                inner_service.oneshot(req).await.map_err(service_error)?;

            // Process using streaming interface
            let cached_response = cache
                .process_response(analysis, response, None)
                .await
                .cache_err()?;

            let mut final_response = cached_response;

            // Add cache status headers if enabled
            if cache.options.cache_status_headers {
                final_response = add_cache_status_headers_streaming(
                    final_response,
                    "MISS",
                    "MISS",
                );
            }

            Ok(final_response)
        })
    }
}
1171
/// Body type that wraps cached responses
pub enum HttpCacheBody<B> {
    /// Buffered body from cache; drained as a single frame when polled.
    Buffered(Vec<u8>),
    /// Original body (fallback) from the inner service, streamed through unchanged.
    Original(B),
}
1179
1180impl<B> Body for HttpCacheBody<B>
1181where
1182 B: Body + Unpin,
1183 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1184 B::Data: Into<bytes::Bytes>,
1185{
1186 type Data = bytes::Bytes;
1187 type Error = Box<dyn std::error::Error + Send + Sync>;
1188
1189 fn poll_frame(
1190 mut self: Pin<&mut Self>,
1191 cx: &mut Context<'_>,
1192 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1193 match &mut *self {
1194 HttpCacheBody::Buffered(bytes) => {
1195 if bytes.is_empty() {
1196 Poll::Ready(None)
1197 } else {
1198 let data = std::mem::take(bytes);
1199 Poll::Ready(Some(Ok(http_body::Frame::data(
1200 bytes::Bytes::from(data),
1201 ))))
1202 }
1203 }
1204 HttpCacheBody::Original(body) => {
1205 Pin::new(body).poll_frame(cx).map(|opt| {
1206 opt.map(|res| {
1207 res.map(|frame| frame.map_data(Into::into))
1208 .map_err(Into::into)
1209 })
1210 })
1211 }
1212 }
1213 }
1214
1215 fn is_end_stream(&self) -> bool {
1216 match self {
1217 HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1218 HttpCacheBody::Original(body) => body.is_end_stream(),
1219 }
1220 }
1221
1222 fn size_hint(&self) -> http_body::SizeHint {
1223 match self {
1224 HttpCacheBody::Buffered(bytes) => {
1225 let len = bytes.len() as u64;
1226 http_body::SizeHint::with_exact(len)
1227 }
1228 HttpCacheBody::Original(body) => body.size_hint(),
1229 }
1230 }
1231}
1232
1233#[cfg(test)]
1234mod test;