http_cache_tower/lib.rs
1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//! // Create cache manager with disk storage
28//! let cache_manager = CACacheManager::new("./cache".into(), true);
29//!
30//! // Create cache layer
31//! let cache_layer = HttpCacheLayer::new(cache_manager);
32//!
33//! // Build service with caching
34//! let service = ServiceBuilder::new()
35//! .layer(cache_layer)
36//! .service_fn(handler);
37//!
38//! // Use the service
39//! let request = Request::builder()
40//! .uri("http://example.com")
41//! .body(Full::new(Bytes::new()))
42//! .unwrap();
43//! let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//! mode: CacheMode::Default,
61//! manager: cache_manager,
62//! options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//! use std::path::PathBuf;
78//!
79//! # #[tokio::main]
80//! # async fn main() {
81//! // Create streaming cache setup
82//! let streaming_manager = StreamingManager::new("./streaming-cache".into());
83//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
84//!
85//! // Use with your service
86//! // let service = streaming_layer.layer(your_service);
87//! # }
88//! ```
89//!
90//! ## Cache Modes
91//!
92//! Different cache modes provide different behaviors:
93//!
94//! - `CacheMode::Default`: Follow HTTP caching rules strictly
95//! - `CacheMode::NoStore`: Never cache responses
96//! - `CacheMode::NoCache`: Always revalidate with the origin server
97//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
98//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
99//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
100//!
101//! ## Cache Invalidation
102//!
103//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
104//!
105//! ```text
106//! These methods will invalidate any cached GET response for the same URI:
107//! - PUT /api/users/123 -> invalidates GET /api/users/123
108//! - POST /api/users/123 -> invalidates GET /api/users/123
109//! - DELETE /api/users/123 -> invalidates GET /api/users/123
110//! - PATCH /api/users/123 -> invalidates GET /api/users/123
111//! ```
112//!
113//! ## Integration with Other Tower Layers
114//!
115//! The cache layer works with other Tower middleware:
116//!
117//! ```rust,no_run
118//! use tower::ServiceBuilder;
119//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
120//! use tower::service_fn;
121//! use tower::ServiceExt;
122//! use http::{Request, Response};
123//! use http_body_util::Full;
124//! use bytes::Bytes;
125//! use std::convert::Infallible;
126//!
127//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
128//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
129//! }
130//!
131//! #[tokio::main]
132//! async fn main() {
133//! let cache_manager = CACacheManager::new("./cache".into(), true);
134//! let cache_layer = HttpCacheLayer::new(cache_manager);
135//!
136//! let service = ServiceBuilder::new()
137//! // .layer(TraceLayer::new_for_http()) // Logging (requires tower-http)
138//! // .layer(CompressionLayer::new()) // Compression (requires tower-http)
139//! .layer(cache_layer) // Caching
140//! .service_fn(handler);
141//!
142//! // Use the service
143//! let request = Request::builder()
144//! .uri("http://example.com")
145//! .body(Full::new(Bytes::new()))
146//! .unwrap();
147//! let response = service.oneshot(request).await.unwrap();
148//! }
149//! ```
150
151use bytes::Bytes;
152use http::{Request, Response};
153use http_body::Body;
154use http_body_util::BodyExt;
155#[cfg(feature = "manager-cacache")]
156pub use http_cache::CACacheManager;
157#[cfg(feature = "streaming")]
158use http_cache::StreamingError;
159use http_cache::{
160 CacheManager, CacheMode, HttpCache, HttpCacheInterface, HttpCacheOptions,
161};
162#[cfg(feature = "streaming")]
163use http_cache::{
164 HttpCacheStreamInterface, HttpStreamingCache, StreamingCacheManager,
165};
166use std::{
167 pin::Pin,
168 sync::Arc,
169 task::{Context, Poll},
170};
171use tower::{Layer, Service, ServiceExt};
172
173pub mod error;
174pub use error::HttpCacheError;
175#[cfg(feature = "streaming")]
176pub use error::TowerStreamingError;
177
178/// Helper functions for error conversions
179trait HttpCacheErrorExt<T> {
180 fn cache_err(self) -> Result<T, HttpCacheError>;
181}
182
183trait HttpErrorExt<T> {
184 fn http_err(self) -> Result<T, HttpCacheError>;
185}
186
187impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
188where
189 E: ToString,
190{
191 fn cache_err(self) -> Result<T, HttpCacheError> {
192 self.map_err(|e| HttpCacheError::CacheError(e.to_string()))
193 }
194}
195
196impl<T, E> HttpErrorExt<T> for Result<T, E>
197where
198 E: Into<Box<dyn std::error::Error + Send + Sync>>,
199{
200 fn http_err(self) -> Result<T, HttpCacheError> {
201 self.map_err(|e| HttpCacheError::HttpError(e.into()))
202 }
203}
204
205/// Helper function to collect a body into bytes
206async fn collect_body<B>(body: B) -> Result<Vec<u8>, B::Error>
207where
208 B: Body,
209{
210 let collected = BodyExt::collect(body).await?;
211 Ok(collected.to_bytes().to_vec())
212}
213
214/// HTTP cache layer for Tower services.
215///
216/// This layer implements HTTP caching according to RFC 7234, automatically caching
217/// GET and HEAD responses based on their cache-control headers and invalidating
218/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
219///
220/// # Example
221///
222/// ```rust
223/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
224/// use tower::ServiceBuilder;
225/// use tower::service_fn;
226/// use http::{Request, Response};
227/// use http_body_util::Full;
228/// use bytes::Bytes;
229/// use std::convert::Infallible;
230///
231/// # #[tokio::main]
232/// # async fn main() {
233/// let cache_manager = CACacheManager::new("./cache".into(), true);
234/// let cache_layer = HttpCacheLayer::new(cache_manager);
235///
236/// // Use with ServiceBuilder
237/// let service = ServiceBuilder::new()
238/// .layer(cache_layer)
239/// .service_fn(|_req: Request<Full<Bytes>>| async {
240/// Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
241/// });
242/// # }
243/// ```
244#[derive(Clone)]
245pub struct HttpCacheLayer<CM>
246where
247 CM: CacheManager,
248{
249 cache: Arc<HttpCache<CM>>,
250}
251
252impl<CM> HttpCacheLayer<CM>
253where
254 CM: CacheManager,
255{
256 /// Create a new HTTP cache layer with default configuration.
257 ///
258 /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
259 ///
260 /// # Arguments
261 ///
262 /// * `cache_manager` - The cache manager to use for storing responses
263 ///
264 /// # Example
265 ///
266 /// ```rust
267 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
268 ///
269 /// # #[tokio::main]
270 /// # async fn main() {
271 /// let cache_manager = CACacheManager::new("./cache".into(), true);
272 /// let layer = HttpCacheLayer::new(cache_manager);
273 /// # }
274 /// ```
275 pub fn new(cache_manager: CM) -> Self {
276 Self {
277 cache: Arc::new(HttpCache {
278 mode: CacheMode::Default,
279 manager: cache_manager,
280 options: HttpCacheOptions::default(),
281 }),
282 }
283 }
284
285 /// Create a new HTTP cache layer with custom options.
286 ///
287 /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
288 /// through [`HttpCacheOptions`].
289 ///
290 /// # Arguments
291 ///
292 /// * `cache_manager` - The cache manager to use for storing responses
293 /// * `options` - Custom cache options
294 ///
295 /// # Example
296 ///
297 /// ```rust
298 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
299 /// use http_cache::HttpCacheOptions;
300 ///
301 /// # #[tokio::main]
302 /// # async fn main() {
303 /// let cache_manager = CACacheManager::new("./cache".into(), true);
304 ///
305 /// let options = HttpCacheOptions {
306 /// cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
307 /// format!("custom:{}:{}", req.method, req.uri)
308 /// })),
309 /// ..Default::default()
310 /// };
311 ///
312 /// let layer = HttpCacheLayer::with_options(cache_manager, options);
313 /// # }
314 /// ```
315 pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
316 Self {
317 cache: Arc::new(HttpCache {
318 mode: CacheMode::Default,
319 manager: cache_manager,
320 options,
321 }),
322 }
323 }
324
325 /// Create a new HTTP cache layer with a pre-configured cache.
326 ///
327 /// This method gives you full control over the cache configuration,
328 /// including the cache mode.
329 ///
330 /// # Arguments
331 ///
332 /// * `cache` - A fully configured HttpCache instance
333 ///
334 /// # Example
335 ///
336 /// ```rust
337 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
338 /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
339 ///
340 /// # #[tokio::main]
341 /// # async fn main() {
342 /// let cache_manager = CACacheManager::new("./cache".into(), true);
343 ///
344 /// let cache = HttpCache {
345 /// mode: CacheMode::ForceCache,
346 /// manager: cache_manager,
347 /// options: HttpCacheOptions::default(),
348 /// };
349 ///
350 /// let layer = HttpCacheLayer::with_cache(cache);
351 /// # }
352 /// ```
353 pub fn with_cache(cache: HttpCache<CM>) -> Self {
354 Self { cache: Arc::new(cache) }
355 }
356}
357
358/// HTTP cache layer with streaming support for Tower services.
359///
360/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
361/// but handles streaming responses. It can work with large
362/// responses without buffering them entirely in memory.
363///
364/// # Example
365///
366/// ```rust
367/// use http_cache_tower::HttpCacheStreamingLayer;
368/// use http_cache::StreamingManager;
369/// use tower::ServiceBuilder;
370/// use tower::service_fn;
371/// use http::{Request, Response};
372/// use http_body_util::Full;
373/// use bytes::Bytes;
374/// use std::convert::Infallible;
375///
376/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
377/// Ok(Response::new(Full::new(Bytes::from("Hello"))))
378/// }
379///
380/// # #[tokio::main]
381/// # async fn main() {
382/// let streaming_manager = StreamingManager::new("./cache".into());
383/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
384///
385/// // Use with ServiceBuilder
386/// let service = ServiceBuilder::new()
387/// .layer(streaming_layer)
388/// .service_fn(handler);
389/// # }
390/// ```
391#[cfg(feature = "streaming")]
392#[derive(Clone)]
393pub struct HttpCacheStreamingLayer<CM>
394where
395 CM: StreamingCacheManager,
396{
397 cache: Arc<HttpStreamingCache<CM>>,
398}
399
400#[cfg(feature = "streaming")]
401impl<CM> HttpCacheStreamingLayer<CM>
402where
403 CM: StreamingCacheManager,
404{
405 /// Create a new HTTP cache streaming layer with default configuration.
406 ///
407 /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
408 ///
409 /// # Arguments
410 ///
411 /// * `cache_manager` - The streaming cache manager to use
412 ///
413 /// # Example
414 ///
415 /// ```rust
416 /// use http_cache_tower::HttpCacheStreamingLayer;
417 /// use http_cache::StreamingManager;
418 ///
419 /// # #[tokio::main]
420 /// # async fn main() {
421 /// let streaming_manager = StreamingManager::new("./cache".into());
422 /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
423 /// # }
424 /// ```
425 pub fn new(cache_manager: CM) -> Self {
426 Self {
427 cache: Arc::new(HttpStreamingCache {
428 mode: CacheMode::Default,
429 manager: cache_manager,
430 options: HttpCacheOptions::default(),
431 }),
432 }
433 }
434
435 /// Create a new HTTP cache streaming layer with custom options.
436 ///
437 /// Uses [`CacheMode::Default`] but allows customizing cache behavior.
438 ///
439 /// # Arguments
440 ///
441 /// * `cache_manager` - The streaming cache manager to use
442 /// * `options` - Custom cache options
443 ///
444 /// # Example
445 ///
446 /// ```rust
447 /// use http_cache_tower::HttpCacheStreamingLayer;
448 /// use http_cache::{StreamingManager, HttpCacheOptions};
449 ///
450 /// # #[tokio::main]
451 /// # async fn main() {
452 /// let streaming_manager = StreamingManager::new("./cache".into());
453 ///
454 /// let options = HttpCacheOptions {
455 /// cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
456 /// format!("stream:{}:{}", req.method, req.uri)
457 /// })),
458 /// ..Default::default()
459 /// };
460 ///
461 /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
462 /// # }
463 /// ```
464 pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
465 Self {
466 cache: Arc::new(HttpStreamingCache {
467 mode: CacheMode::Default,
468 manager: cache_manager,
469 options,
470 }),
471 }
472 }
473
474 /// Create a new HTTP cache streaming layer with a pre-configured cache.
475 ///
476 /// This method gives you full control over the streaming cache configuration.
477 ///
478 /// # Arguments
479 ///
480 /// * `cache` - A fully configured HttpStreamingCache instance
481 ///
482 /// # Example
483 ///
484 /// ```rust
485 /// use http_cache_tower::HttpCacheStreamingLayer;
486 /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
487 ///
488 /// # #[tokio::main]
489 /// # async fn main() {
490 /// let streaming_manager = StreamingManager::new("./cache".into());
491 ///
492 /// let cache = HttpStreamingCache {
493 /// mode: CacheMode::ForceCache,
494 /// manager: streaming_manager,
495 /// options: HttpCacheOptions::default(),
496 /// };
497 ///
498 /// let layer = HttpCacheStreamingLayer::with_cache(cache);
499 /// # }
500 /// ```
501 pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
502 Self { cache: Arc::new(cache) }
503 }
504}
505
506impl<S, CM> Layer<S> for HttpCacheLayer<CM>
507where
508 CM: CacheManager,
509{
510 type Service = HttpCacheService<S, CM>;
511
512 fn layer(&self, inner: S) -> Self::Service {
513 HttpCacheService { inner, cache: self.cache.clone() }
514 }
515}
516
517#[cfg(feature = "streaming")]
518impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
519where
520 CM: StreamingCacheManager,
521{
522 type Service = HttpCacheStreamingService<S, CM>;
523
524 fn layer(&self, inner: S) -> Self::Service {
525 HttpCacheStreamingService { inner, cache: self.cache.clone() }
526 }
527}
528
529/// HTTP cache service for Tower/Hyper
530pub struct HttpCacheService<S, CM>
531where
532 CM: CacheManager,
533{
534 inner: S,
535 cache: Arc<HttpCache<CM>>,
536}
537
538impl<S, CM> Clone for HttpCacheService<S, CM>
539where
540 S: Clone,
541 CM: CacheManager,
542{
543 fn clone(&self) -> Self {
544 Self { inner: self.inner.clone(), cache: self.cache.clone() }
545 }
546}
547
548/// HTTP cache streaming service for Tower/Hyper
549#[cfg(feature = "streaming")]
550pub struct HttpCacheStreamingService<S, CM>
551where
552 CM: StreamingCacheManager,
553{
554 inner: S,
555 cache: Arc<HttpStreamingCache<CM>>,
556}
557
558#[cfg(feature = "streaming")]
559impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
560where
561 S: Clone,
562 CM: StreamingCacheManager,
563{
564 fn clone(&self) -> Self {
565 Self { inner: self.inner.clone(), cache: self.cache.clone() }
566 }
567}
568
569impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
570 for HttpCacheService<S, CM>
571where
572 S: Service<Request<ReqBody>, Response = Response<ResBody>>
573 + Clone
574 + Send
575 + 'static,
576 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
577 S::Future: Send + 'static,
578 ReqBody: Body + Send + 'static,
579 ReqBody::Data: Send,
580 ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
581 ResBody: Body + Send + 'static,
582 ResBody::Data: Send,
583 ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
584 CM: CacheManager,
585{
586 type Response = Response<HttpCacheBody<ResBody>>;
587 type Error = HttpCacheError;
588 type Future = Pin<
589 Box<
590 dyn std::future::Future<
591 Output = Result<Self::Response, Self::Error>,
592 > + Send,
593 >,
594 >;
595
596 fn poll_ready(
597 &mut self,
598 cx: &mut Context<'_>,
599 ) -> Poll<Result<(), Self::Error>> {
600 self.inner
601 .poll_ready(cx)
602 .map_err(|e| HttpCacheError::HttpError(e.into()))
603 }
604
605 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
606 let cache = self.cache.clone();
607 let (parts, body) = req.into_parts();
608 let inner_service = self.inner.clone();
609
610 Box::pin(async move {
611 use http_cache_semantics::BeforeRequest;
612
613 // Use the core library's cache interface for analysis
614 let analysis = cache.analyze_request(&parts, None).cache_err()?;
615
616 // Handle cache busting and non-cacheable requests
617 for key in &analysis.cache_bust_keys {
618 cache.manager.delete(key).await.cache_err()?;
619 }
620
621 // For non-GET/HEAD requests, invalidate cached GET responses
622 if !analysis.should_cache && !analysis.is_get_head {
623 let get_cache_key = cache
624 .options
625 .create_cache_key_for_invalidation(&parts, "GET");
626 let _ = cache.manager.delete(&get_cache_key).await;
627 }
628
629 // If not cacheable, just pass through
630 if !analysis.should_cache {
631 let req = Request::from_parts(parts, body);
632 let response = inner_service.oneshot(req).await.http_err()?;
633 return Ok(response.map(HttpCacheBody::Original));
634 }
635
636 // Special case for Reload mode: skip cache lookup but still cache response
637 if analysis.cache_mode == CacheMode::Reload {
638 let req = Request::from_parts(parts, body);
639 let response = inner_service.oneshot(req).await.http_err()?;
640
641 let (res_parts, res_body) = response.into_parts();
642 let body_bytes = collect_body(res_body).await.http_err()?;
643
644 let cached_response = cache
645 .process_response(
646 analysis,
647 Response::from_parts(res_parts, body_bytes.clone()),
648 )
649 .await
650 .cache_err()?;
651
652 return Ok(cached_response.map(HttpCacheBody::Buffered));
653 }
654
655 // Look up cached response using interface
656 if let Some((cached_response, policy)) = cache
657 .lookup_cached_response(&analysis.cache_key)
658 .await
659 .cache_err()?
660 {
661 let before_req =
662 policy.before_request(&parts, std::time::SystemTime::now());
663 match before_req {
664 BeforeRequest::Fresh(_) => {
665 // Return cached response
666 let response = http_cache::HttpCacheOptions::http_response_to_response(
667 &cached_response,
668 HttpCacheBody::Buffered(cached_response.body.clone()),
669 ).map_err(HttpCacheError::HttpError)?;
670 return Ok(response);
671 }
672 BeforeRequest::Stale {
673 request: conditional_parts, ..
674 } => {
675 // Make conditional request
676 let conditional_req =
677 Request::from_parts(conditional_parts, body);
678 let conditional_response = inner_service
679 .oneshot(conditional_req)
680 .await
681 .http_err()?;
682
683 if conditional_response.status() == 304 {
684 // Use cached response with updated headers
685 let (fresh_parts, _) =
686 conditional_response.into_parts();
687 let updated_response = cache
688 .handle_not_modified(
689 cached_response,
690 &fresh_parts,
691 )
692 .await
693 .cache_err()?;
694
695 let response = http_cache::HttpCacheOptions::http_response_to_response(
696 &updated_response,
697 HttpCacheBody::Buffered(updated_response.body.clone()),
698 ).map_err(HttpCacheError::HttpError)?;
699 return Ok(response);
700 } else {
701 // Process fresh response
702 let (parts, res_body) =
703 conditional_response.into_parts();
704 let body_bytes =
705 collect_body(res_body).await.http_err()?;
706
707 let cached_response = cache
708 .process_response(
709 analysis,
710 Response::from_parts(
711 parts,
712 body_bytes.clone(),
713 ),
714 )
715 .await
716 .cache_err()?;
717
718 return Ok(
719 cached_response.map(HttpCacheBody::Buffered)
720 );
721 }
722 }
723 }
724 }
725
726 // Fetch fresh response
727 let req = Request::from_parts(parts, body);
728 let response = inner_service.oneshot(req).await.http_err()?;
729
730 let (res_parts, res_body) = response.into_parts();
731 let body_bytes = collect_body(res_body).await.http_err()?;
732
733 // Process and cache using interface
734 let cached_response = cache
735 .process_response(
736 analysis,
737 Response::from_parts(res_parts, body_bytes.clone()),
738 )
739 .await
740 .cache_err()?;
741
742 Ok(cached_response.map(HttpCacheBody::Buffered))
743 })
744 }
745}
746
747// Hyper service implementation for HttpCacheService
748impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
749 for HttpCacheService<S, CM>
750where
751 S: Service<Request<hyper::body::Incoming>> + Clone + Send + 'static,
752 S::Response: Into<Response<http_body_util::Full<Bytes>>>,
753 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
754 S::Future: Send + 'static,
755 CM: CacheManager,
756{
757 type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
758 type Error = HttpCacheError;
759 type Future = Pin<
760 Box<
761 dyn std::future::Future<
762 Output = Result<Self::Response, Self::Error>,
763 > + Send,
764 >,
765 >;
766
767 fn call(&self, _req: Request<hyper::body::Incoming>) -> Self::Future {
768 // Convert to the format expected by the generic Service implementation
769 let service_clone = self.clone();
770 Box::pin(async move { service_clone.call(_req).await })
771 }
772}
773
774#[cfg(feature = "streaming")]
775impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
776 for HttpCacheStreamingService<S, CM>
777where
778 S: Service<Request<ReqBody>, Response = Response<ResBody>>
779 + Clone
780 + Send
781 + 'static,
782 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
783 S::Future: Send + 'static,
784 ReqBody: Body + Send + 'static,
785 ReqBody::Data: Send,
786 ReqBody::Error: Into<StreamingError>,
787 ResBody: Body + Send + 'static,
788 ResBody::Data: Send,
789 ResBody::Error: Into<StreamingError>,
790 CM: StreamingCacheManager,
791 <CM::Body as http_body::Body>::Data: Send,
792 <CM::Body as http_body::Body>::Error:
793 Into<StreamingError> + Send + Sync + 'static,
794{
795 type Response = Response<CM::Body>;
796 type Error = HttpCacheError;
797 type Future = Pin<
798 Box<
799 dyn std::future::Future<
800 Output = Result<Self::Response, Self::Error>,
801 > + Send,
802 >,
803 >;
804
805 fn poll_ready(
806 &mut self,
807 cx: &mut Context<'_>,
808 ) -> Poll<Result<(), Self::Error>> {
809 self.inner
810 .poll_ready(cx)
811 .map_err(|e| HttpCacheError::HttpError(e.into()))
812 }
813
814 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
815 let cache = self.cache.clone();
816 let (parts, body) = req.into_parts();
817 let inner_service = self.inner.clone();
818
819 Box::pin(async move {
820 use http_cache_semantics::BeforeRequest;
821
822 // Use the core library's streaming cache interface
823 let analysis = cache.analyze_request(&parts, None).cache_err()?;
824
825 // Handle cache busting
826 for key in &analysis.cache_bust_keys {
827 cache.manager.delete(key).await.cache_err()?;
828 }
829
830 // For non-GET/HEAD requests, invalidate cached GET responses
831 if !analysis.should_cache && !analysis.is_get_head {
832 let get_cache_key = cache
833 .options
834 .create_cache_key_for_invalidation(&parts, "GET");
835 let _ = cache.manager.delete(&get_cache_key).await;
836 }
837
838 // If not cacheable, convert body type and return
839 if !analysis.should_cache {
840 let req = Request::from_parts(parts, body);
841 let response = inner_service.oneshot(req).await.http_err()?;
842 return cache.manager.convert_body(response).await.cache_err();
843 }
844
845 // Special case for Reload mode: skip cache lookup but still cache response
846 if analysis.cache_mode == CacheMode::Reload {
847 let req = Request::from_parts(parts, body);
848 let response = inner_service.oneshot(req).await.http_err()?;
849
850 let cached_response = cache
851 .process_response(analysis, response)
852 .await
853 .cache_err()?;
854
855 return Ok(cached_response);
856 }
857
858 // Look up cached response using interface
859 if let Some((cached_response, policy)) = cache
860 .lookup_cached_response(&analysis.cache_key)
861 .await
862 .cache_err()?
863 {
864 let before_req =
865 policy.before_request(&parts, std::time::SystemTime::now());
866 match before_req {
867 BeforeRequest::Fresh(_) => {
868 return Ok(cached_response);
869 }
870 BeforeRequest::Stale {
871 request: conditional_parts, ..
872 } => {
873 let conditional_req =
874 Request::from_parts(conditional_parts, body);
875 let conditional_response = inner_service
876 .oneshot(conditional_req)
877 .await
878 .http_err()?;
879
880 if conditional_response.status() == 304 {
881 let (fresh_parts, _) =
882 conditional_response.into_parts();
883 let updated_response = cache
884 .handle_not_modified(
885 cached_response,
886 &fresh_parts,
887 )
888 .await
889 .cache_err()?;
890 return Ok(updated_response);
891 } else {
892 let cached_response = cache
893 .process_response(
894 analysis,
895 conditional_response,
896 )
897 .await
898 .cache_err()?;
899 return Ok(cached_response);
900 }
901 }
902 }
903 }
904
905 // Fetch fresh response
906 let req = Request::from_parts(parts, body);
907 let response = inner_service.oneshot(req).await.http_err()?;
908
909 // Process using streaming interface
910 let cached_response =
911 cache.process_response(analysis, response).await.cache_err()?;
912
913 Ok(cached_response)
914 })
915 }
916}
917
918/// Body type that wraps cached responses
919pub enum HttpCacheBody<B> {
920 /// Buffered body from cache
921 Buffered(Vec<u8>),
922 /// Original body (fallback)
923 Original(B),
924}
925
926impl<B> Body for HttpCacheBody<B>
927where
928 B: Body + Unpin,
929 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
930 B::Data: Into<bytes::Bytes>,
931{
932 type Data = bytes::Bytes;
933 type Error = Box<dyn std::error::Error + Send + Sync>;
934
935 fn poll_frame(
936 mut self: Pin<&mut Self>,
937 cx: &mut Context<'_>,
938 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
939 match &mut *self {
940 HttpCacheBody::Buffered(bytes) => {
941 if bytes.is_empty() {
942 Poll::Ready(None)
943 } else {
944 let data = std::mem::take(bytes);
945 Poll::Ready(Some(Ok(http_body::Frame::data(
946 bytes::Bytes::from(data),
947 ))))
948 }
949 }
950 HttpCacheBody::Original(body) => {
951 Pin::new(body).poll_frame(cx).map(|opt| {
952 opt.map(|res| {
953 res.map(|frame| frame.map_data(Into::into))
954 .map_err(Into::into)
955 })
956 })
957 }
958 }
959 }
960
961 fn is_end_stream(&self) -> bool {
962 match self {
963 HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
964 HttpCacheBody::Original(body) => body.is_end_stream(),
965 }
966 }
967
968 fn size_hint(&self) -> http_body::SizeHint {
969 match self {
970 HttpCacheBody::Buffered(bytes) => {
971 let len = bytes.len() as u64;
972 http_body::SizeHint::with_exact(len)
973 }
974 HttpCacheBody::Original(body) => body.size_hint(),
975 }
976 }
977}
978
979#[cfg(test)]
980mod test;