http_cache_tower/lib.rs
1//! HTTP caching middleware for Tower services and Axum applications.
2//!
3//! This crate provides Tower layers that implement HTTP caching according to RFC 7234.
4//! It supports both traditional buffered caching and streaming responses for large payloads.
5//!
6//! ## Basic Usage
7//!
8//! ### With Tower Services
9//!
10//! ```rust,no_run
11//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
12//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
13//! use tower::ServiceBuilder;
14//! use tower::service_fn;
15//! use tower::ServiceExt;
16//! use http::{Request, Response};
17//! use http_body_util::Full;
18//! use bytes::Bytes;
19//! use std::convert::Infallible;
20//!
21//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
22//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
23//! }
24//!
25//! #[tokio::main]
26//! async fn main() {
27//! // Create cache manager with disk storage
28//! let cache_manager = CACacheManager::new("./cache".into(), true);
29//!
30//! // Create cache layer
31//! let cache_layer = HttpCacheLayer::new(cache_manager);
32//!
33//! // Build service with caching
34//! let service = ServiceBuilder::new()
35//! .layer(cache_layer)
36//! .service_fn(handler);
37//!
38//! // Use the service
39//! let request = Request::builder()
40//! .uri("http://example.com")
41//! .body(Full::new(Bytes::new()))
42//! .unwrap();
43//! let response = service.oneshot(request).await.unwrap();
44//! }
45//! ```
46//!
47//! ### With Custom Cache Configuration
48//!
49//! ```rust
50//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
51//! use http_cache::{CacheMode, HttpCache, HttpCacheOptions};
52//!
53//! # #[tokio::main]
54//! # async fn main() {
55//! // Create cache manager
56//! let cache_manager = CACacheManager::new("./cache".into(), true);
57//!
58//! // Configure cache behavior
59//! let cache = HttpCache {
60//! mode: CacheMode::Default,
61//! manager: cache_manager,
62//! options: HttpCacheOptions::default(),
63//! };
64//!
65//! // Create layer with custom cache
66//! let cache_layer = HttpCacheLayer::with_cache(cache);
67//! # }
68//! ```
69//!
70//! ### Streaming Support
71//!
72//! For handling large responses without buffering, use `StreamingManager`:
73//!
74//! ```rust,ignore
75//! use http_cache_tower::HttpCacheStreamingLayer;
76//! use http_cache::StreamingManager;
77//!
78//! # #[tokio::main]
79//! # async fn main() {
80//! // Create streaming cache setup
81//! let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
82//! let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
83//!
84//! // Use with your service
85//! // let service = streaming_layer.layer(your_service);
86//! # }
87//! ```
88//!
89//! ## Cache Modes
90//!
91//! Different cache modes provide different behaviors:
92//!
93//! - `CacheMode::Default`: Follow HTTP caching rules strictly
94//! - `CacheMode::NoStore`: Never cache responses
95//! - `CacheMode::NoCache`: Always revalidate with the origin server
96//! - `CacheMode::ForceCache`: Cache responses even if headers suggest otherwise
97//! - `CacheMode::OnlyIfCached`: Only serve from cache, never hit origin server
98//! - `CacheMode::IgnoreRules`: Cache everything regardless of headers
99//!
100//! ## Cache Invalidation
101//!
102//! The middleware automatically handles cache invalidation for unsafe HTTP methods:
103//!
104//! ```text
105//! These methods will invalidate any cached GET response for the same URI:
106//! - PUT /api/users/123 -> invalidates GET /api/users/123
107//! - POST /api/users/123 -> invalidates GET /api/users/123
108//! - DELETE /api/users/123 -> invalidates GET /api/users/123
109//! - PATCH /api/users/123 -> invalidates GET /api/users/123
110//! ```
111//!
112//! ## Integration with Other Tower Layers
113//!
114//! The cache layer works with other Tower middleware:
115//!
116//! ```rust,no_run
117//! use tower::ServiceBuilder;
118//! use http_cache_tower::{HttpCacheLayer, CACacheManager};
119//! use tower::service_fn;
120//! use tower::ServiceExt;
121//! use http::{Request, Response};
122//! use http_body_util::Full;
123//! use bytes::Bytes;
124//! use std::convert::Infallible;
125//!
126//! async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
127//! Ok(Response::new(Full::new(Bytes::from("Hello, World!"))))
128//! }
129//!
130//! #[tokio::main]
131//! async fn main() {
132//! let cache_manager = CACacheManager::new("./cache".into(), true);
133//! let cache_layer = HttpCacheLayer::new(cache_manager);
134//!
135//! let service = ServiceBuilder::new()
136//! // .layer(TraceLayer::new_for_http()) // Logging (requires tower-http)
137//! // .layer(CompressionLayer::new()) // Compression (requires tower-http)
138//! .layer(cache_layer) // Caching
139//! .service_fn(handler);
140//!
141//! // Use the service
142//! let request = Request::builder()
143//! .uri("http://example.com")
144//! .body(Full::new(Bytes::new()))
145//! .unwrap();
146//! let response = service.oneshot(request).await.unwrap();
147//! }
148//! ```
149
150use bytes::Bytes;
151use http::{
152 header::CACHE_CONTROL, request, HeaderValue, Method, Request, Response,
153};
154use http_body::Body;
155use http_body_util::BodyExt;
156
157#[cfg(feature = "manager-cacache")]
158pub use http_cache::CACacheManager;
159
160#[cfg(feature = "rate-limiting")]
161pub use http_cache::rate_limiting::{
162 CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
163};
164#[cfg(feature = "streaming")]
165use http_cache::StreamingError;
166use http_cache::{
167 url_parse, BoxError, CacheManager, CacheMode, CacheOptions, HitOrMiss,
168 HttpCache, HttpCacheOptions, HttpResponse, Middleware, Url, XCACHE,
169 XCACHELOOKUP,
170};
171#[cfg(feature = "streaming")]
172use http_cache::{HttpStreamingCache, StreamingCacheManager};
173use http_cache_semantics::CachePolicy;
174use std::{
175 pin::Pin,
176 sync::Arc,
177 task::{Context, Poll},
178 time::SystemTime,
179};
180use tower::{Layer, Service, ServiceExt};
181
182// Re-export unified error types from http-cache core
183pub use http_cache::HttpCacheError;
184
185#[cfg(feature = "streaming")]
186/// Type alias for tower streaming errors, using the unified streaming error system
187pub type TowerStreamingError = http_cache::ClientStreamingError;
188
189/// Helper functions for error conversions
190trait HttpCacheErrorExt<T> {
191 fn cache_err(self) -> Result<T, HttpCacheError>;
192}
193
194impl<T, E> HttpCacheErrorExt<T> for Result<T, E>
195where
196 E: ToString,
197{
198 fn cache_err(self) -> Result<T, HttpCacheError> {
199 self.map_err(|e| HttpCacheError::cache(e.to_string()))
200 }
201}
202
203/// Helper function to add cache status headers to a response
204fn add_cache_status_headers<B>(
205 mut response: Response<HttpCacheBody<B>>,
206 hit_or_miss: &str,
207 cache_lookup: &str,
208) -> Response<HttpCacheBody<B>> {
209 let headers = response.headers_mut();
210 if let Ok(hv) = HeaderValue::from_str(hit_or_miss) {
211 headers.insert(XCACHE, hv);
212 }
213 if let Ok(hv) = HeaderValue::from_str(cache_lookup) {
214 headers.insert(XCACHELOOKUP, hv);
215 }
216 response
217}
218
219/// Middleware adapter that bridges Tower services to the `http_cache::Middleware`
220/// trait, allowing `HttpCache::run` to drive the full cache flow (mode dispatch,
221/// conditional revalidation, 5xx handling, warning headers, etc.) instead of
222/// reimplementing it inline.
223struct TowerMiddleware<S, ReqBody> {
224 parts: request::Parts,
225 body: Option<ReqBody>,
226 service: Option<S>,
227}
228
229impl<S, ReqBody, ResBody> Middleware for TowerMiddleware<S, ReqBody>
230where
231 S: Service<Request<ReqBody>, Response = Response<ResBody>>
232 + Clone
233 + Send
234 + 'static,
235 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
236 S::Future: Send + 'static,
237 ReqBody: Body + Send + 'static,
238 ReqBody::Data: Send,
239 ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
240 ResBody: Body + Send + 'static,
241 ResBody::Data: Send,
242 ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
243{
244 fn is_method_get_head(&self) -> bool {
245 self.parts.method == Method::GET || self.parts.method == Method::HEAD
246 }
247
248 fn policy(
249 &self,
250 response: &HttpResponse,
251 ) -> http_cache::Result<CachePolicy> {
252 Ok(CachePolicy::new(&self.parts, &response.parts()?))
253 }
254
255 fn policy_with_options(
256 &self,
257 response: &HttpResponse,
258 options: CacheOptions,
259 ) -> http_cache::Result<CachePolicy> {
260 Ok(CachePolicy::new_options(
261 &self.parts,
262 &response.parts()?,
263 SystemTime::now(),
264 options,
265 ))
266 }
267
268 fn update_headers(
269 &mut self,
270 parts: &request::Parts,
271 ) -> http_cache::Result<()> {
272 for (name, value) in parts.headers.iter() {
273 self.parts.headers.insert(name.clone(), value.clone());
274 }
275 Ok(())
276 }
277
278 fn force_no_cache(&mut self) -> http_cache::Result<()> {
279 self.parts
280 .headers
281 .insert(CACHE_CONTROL, HeaderValue::from_static("no-cache"));
282 Ok(())
283 }
284
285 fn parts(&self) -> http_cache::Result<request::Parts> {
286 Ok(self.parts.clone())
287 }
288
289 fn url(&self) -> http_cache::Result<Url> {
290 url_parse(self.parts.uri.to_string().as_str())
291 }
292
293 fn method(&self) -> http_cache::Result<String> {
294 Ok(self.parts.method.as_ref().to_string())
295 }
296
297 async fn remote_fetch(&mut self) -> http_cache::Result<HttpResponse> {
298 let body = self
299 .body
300 .take()
301 .ok_or_else(|| BoxError::from("request body already consumed"))?;
302 let service = self
303 .service
304 .take()
305 .ok_or_else(|| BoxError::from("inner service already consumed"))?;
306
307 let request = Request::from_parts(self.parts.clone(), body);
308 let response = service.oneshot(request).await.map_err(|e| {
309 let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
310 boxed
311 })?;
312
313 let (res_parts, res_body) = response.into_parts();
314 let collected = BodyExt::collect(res_body).await.map_err(|e| {
315 let boxed: Box<dyn std::error::Error + Send + Sync> = e.into();
316 boxed
317 })?;
318 let body_bytes = collected.to_bytes().to_vec();
319
320 let url = url_parse(self.parts.uri.to_string().as_str())?;
321 let headers = (&res_parts.headers).into();
322 let status = res_parts.status.as_u16();
323 let version = res_parts.version.try_into()?;
324
325 Ok(HttpResponse {
326 body: body_bytes,
327 headers,
328 status,
329 url,
330 version,
331 metadata: None,
332 })
333 }
334}
335
336/// Convert an [`HttpResponse`] from the cache core into a Tower
337/// `Response<HttpCacheBody<B>>`.
338fn http_response_to_tower_response<B>(
339 http_response: HttpResponse,
340) -> Result<Response<HttpCacheBody<B>>, HttpCacheError> {
341 let mut response = HttpCacheOptions::http_response_to_response(
342 &http_response,
343 HttpCacheBody::Buffered(http_response.body.clone()),
344 )
345 .map_err(HttpCacheError::other)?;
346
347 // Preserve metadata in response extensions
348 if let Some(metadata) = http_response.metadata {
349 response
350 .extensions_mut()
351 .insert(http_cache::HttpCacheMetadata::from(metadata));
352 }
353
354 Ok(response)
355}
356
357#[cfg(feature = "streaming")]
358fn add_cache_status_headers_streaming<B>(
359 mut response: Response<B>,
360 hit_or_miss: &str,
361 cache_lookup: &str,
362) -> Response<B> {
363 let headers = response.headers_mut();
364 if let Ok(hv) = HeaderValue::from_str(hit_or_miss) {
365 headers.insert(XCACHE, hv);
366 }
367 if let Ok(hv) = HeaderValue::from_str(cache_lookup) {
368 headers.insert(XCACHELOOKUP, hv);
369 }
370 response
371}
372
373/// HTTP cache layer for Tower services.
374///
375/// This layer implements HTTP caching according to RFC 7234, automatically caching
376/// GET and HEAD responses based on their cache-control headers and invalidating
377/// cache entries when unsafe methods (PUT, POST, DELETE, PATCH) are used.
378///
379/// # Example
380///
381/// ```rust
382/// use http_cache_tower::{HttpCacheLayer, CACacheManager};
383/// use tower::ServiceBuilder;
384/// use tower::service_fn;
385/// use http::{Request, Response};
386/// use http_body_util::Full;
387/// use bytes::Bytes;
388/// use std::convert::Infallible;
389///
390/// # #[tokio::main]
391/// # async fn main() {
392/// let cache_manager = CACacheManager::new("./cache".into(), true);
393/// let cache_layer = HttpCacheLayer::new(cache_manager);
394///
395/// // Use with ServiceBuilder
396/// let service = ServiceBuilder::new()
397/// .layer(cache_layer)
398/// .service_fn(|_req: Request<Full<Bytes>>| async {
399/// Ok::<_, Infallible>(Response::new(Full::new(Bytes::from("Hello"))))
400/// });
401/// # }
402/// ```
403#[derive(Clone)]
404pub struct HttpCacheLayer<CM>
405where
406 CM: CacheManager,
407{
408 cache: Arc<HttpCache<CM>>,
409}
410
411impl<CM> HttpCacheLayer<CM>
412where
413 CM: CacheManager,
414{
415 /// Create a new HTTP cache layer with default configuration.
416 ///
417 /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
418 ///
419 /// # Arguments
420 ///
421 /// * `cache_manager` - The cache manager to use for storing responses
422 ///
423 /// # Example
424 ///
425 /// ```rust
426 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
427 ///
428 /// # #[tokio::main]
429 /// # async fn main() {
430 /// let cache_manager = CACacheManager::new("./cache".into(), true);
431 /// let layer = HttpCacheLayer::new(cache_manager);
432 /// # }
433 /// ```
434 pub fn new(cache_manager: CM) -> Self {
435 Self {
436 cache: Arc::new(HttpCache {
437 mode: CacheMode::Default,
438 manager: cache_manager,
439 options: HttpCacheOptions::default(),
440 }),
441 }
442 }
443
444 /// Create a new HTTP cache layer with custom options.
445 ///
446 /// Uses [`CacheMode::Default`] but allows customizing the cache behavior
447 /// through [`HttpCacheOptions`].
448 ///
449 /// # Arguments
450 ///
451 /// * `cache_manager` - The cache manager to use for storing responses
452 /// * `options` - Custom cache options
453 ///
454 /// # Example
455 ///
456 /// ```rust
457 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
458 /// use http_cache::HttpCacheOptions;
459 ///
460 /// # #[tokio::main]
461 /// # async fn main() {
462 /// let cache_manager = CACacheManager::new("./cache".into(), true);
463 ///
464 /// let options = HttpCacheOptions {
465 /// cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
466 /// format!("custom:{}:{}", req.method, req.uri)
467 /// })),
468 /// ..Default::default()
469 /// };
470 ///
471 /// let layer = HttpCacheLayer::with_options(cache_manager, options);
472 /// # }
473 /// ```
474 pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
475 Self {
476 cache: Arc::new(HttpCache {
477 mode: CacheMode::Default,
478 manager: cache_manager,
479 options,
480 }),
481 }
482 }
483
484 /// Create a new HTTP cache layer with a pre-configured cache.
485 ///
486 /// This method gives you full control over the cache configuration,
487 /// including the cache mode.
488 ///
489 /// # Arguments
490 ///
491 /// * `cache` - A fully configured HttpCache instance
492 ///
493 /// # Example
494 ///
495 /// ```rust
496 /// use http_cache_tower::{HttpCacheLayer, CACacheManager};
497 /// use http_cache::{HttpCache, CacheMode, HttpCacheOptions};
498 ///
499 /// # #[tokio::main]
500 /// # async fn main() {
501 /// let cache_manager = CACacheManager::new("./cache".into(), true);
502 ///
503 /// let cache = HttpCache {
504 /// mode: CacheMode::ForceCache,
505 /// manager: cache_manager,
506 /// options: HttpCacheOptions::default(),
507 /// };
508 ///
509 /// let layer = HttpCacheLayer::with_cache(cache);
510 /// # }
511 /// ```
512 pub fn with_cache(cache: HttpCache<CM>) -> Self {
513 Self { cache: Arc::new(cache) }
514 }
515}
516
517/// HTTP cache layer with streaming support for Tower services.
518///
519/// This layer provides the same HTTP caching functionality as [`HttpCacheLayer`]
520/// but handles streaming responses. It can work with large
521/// responses without buffering them entirely in memory.
522///
523/// # Example
524///
525/// ```rust
526/// use http_cache_tower::HttpCacheStreamingLayer;
527/// use http_cache::StreamingManager;
528/// use tower::ServiceBuilder;
529/// use tower::service_fn;
530/// use http::{Request, Response};
531/// use http_body_util::Full;
532/// use bytes::Bytes;
533/// use std::convert::Infallible;
534///
535/// async fn handler(_req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
536/// Ok(Response::new(Full::new(Bytes::from("Hello"))))
537/// }
538///
539/// # #[tokio::main]
540/// # async fn main() {
541/// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
542/// let streaming_layer = HttpCacheStreamingLayer::new(streaming_manager);
543///
544/// // Use with ServiceBuilder
545/// let service = ServiceBuilder::new()
546/// .layer(streaming_layer)
547/// .service_fn(handler);
548/// # }
549/// ```
550#[cfg(feature = "streaming")]
551#[derive(Clone)]
552pub struct HttpCacheStreamingLayer<CM>
553where
554 CM: StreamingCacheManager,
555{
556 cache: Arc<HttpStreamingCache<CM>>,
557}
558
559#[cfg(feature = "streaming")]
560impl<CM> HttpCacheStreamingLayer<CM>
561where
562 CM: StreamingCacheManager,
563{
564 /// Create a new HTTP cache streaming layer with default configuration.
565 ///
566 /// Uses [`CacheMode::Default`] and default [`HttpCacheOptions`].
567 ///
568 /// # Arguments
569 ///
570 /// * `cache_manager` - The streaming cache manager to use
571 ///
572 /// # Example
573 ///
574 /// ```rust
575 /// use http_cache_tower::HttpCacheStreamingLayer;
576 /// use http_cache::StreamingManager;
577 ///
578 /// # #[tokio::main]
579 /// # async fn main() {
580 /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
581 /// let layer = HttpCacheStreamingLayer::new(streaming_manager);
582 /// # }
583 /// ```
584 pub fn new(cache_manager: CM) -> Self {
585 Self {
586 cache: Arc::new(HttpStreamingCache {
587 mode: CacheMode::Default,
588 manager: cache_manager,
589 options: HttpCacheOptions::default(),
590 }),
591 }
592 }
593
594 /// Create a new HTTP cache streaming layer with custom options.
595 ///
596 /// Uses [`CacheMode::Default`] but allows customizing cache behavior.
597 ///
598 /// # Arguments
599 ///
600 /// * `cache_manager` - The streaming cache manager to use
601 /// * `options` - Custom cache options
602 ///
603 /// # Example
604 ///
605 /// ```rust
606 /// use http_cache_tower::HttpCacheStreamingLayer;
607 /// use http_cache::{StreamingManager, HttpCacheOptions};
608 ///
609 /// # #[tokio::main]
610 /// # async fn main() {
611 /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
612 ///
613 /// let options = HttpCacheOptions {
614 /// cache_key: Some(std::sync::Arc::new(|req: &http::request::Parts| {
615 /// format!("stream:{}:{}", req.method, req.uri)
616 /// })),
617 /// ..Default::default()
618 /// };
619 ///
620 /// let layer = HttpCacheStreamingLayer::with_options(streaming_manager, options);
621 /// # }
622 /// ```
623 pub fn with_options(cache_manager: CM, options: HttpCacheOptions) -> Self {
624 Self {
625 cache: Arc::new(HttpStreamingCache {
626 mode: CacheMode::Default,
627 manager: cache_manager,
628 options,
629 }),
630 }
631 }
632
633 /// Create a new HTTP cache streaming layer with a pre-configured cache.
634 ///
635 /// This method gives you full control over the streaming cache configuration.
636 ///
637 /// # Arguments
638 ///
639 /// * `cache` - A fully configured HttpStreamingCache instance
640 ///
641 /// # Example
642 ///
643 /// ```rust
644 /// use http_cache_tower::HttpCacheStreamingLayer;
645 /// use http_cache::{StreamingManager, HttpStreamingCache, CacheMode, HttpCacheOptions};
646 ///
647 /// # #[tokio::main]
648 /// # async fn main() {
649 /// let streaming_manager = StreamingManager::with_temp_dir(1000).await.unwrap();
650 ///
651 /// let cache = HttpStreamingCache {
652 /// mode: CacheMode::ForceCache,
653 /// manager: streaming_manager,
654 /// options: HttpCacheOptions::default(),
655 /// };
656 ///
657 /// let layer = HttpCacheStreamingLayer::with_cache(cache);
658 /// # }
659 /// ```
660 pub fn with_cache(cache: HttpStreamingCache<CM>) -> Self {
661 Self { cache: Arc::new(cache) }
662 }
663}
664
665impl<S, CM> Layer<S> for HttpCacheLayer<CM>
666where
667 CM: CacheManager,
668{
669 type Service = HttpCacheService<S, CM>;
670
671 fn layer(&self, inner: S) -> Self::Service {
672 HttpCacheService { inner, cache: self.cache.clone() }
673 }
674}
675
676#[cfg(feature = "streaming")]
677impl<S, CM> Layer<S> for HttpCacheStreamingLayer<CM>
678where
679 CM: StreamingCacheManager,
680{
681 type Service = HttpCacheStreamingService<S, CM>;
682
683 fn layer(&self, inner: S) -> Self::Service {
684 HttpCacheStreamingService { inner, cache: self.cache.clone() }
685 }
686}
687
688/// HTTP cache service for Tower/Hyper
689pub struct HttpCacheService<S, CM>
690where
691 CM: CacheManager,
692{
693 inner: S,
694 cache: Arc<HttpCache<CM>>,
695}
696
697impl<S, CM> Clone for HttpCacheService<S, CM>
698where
699 S: Clone,
700 CM: CacheManager,
701{
702 fn clone(&self) -> Self {
703 Self { inner: self.inner.clone(), cache: self.cache.clone() }
704 }
705}
706
707/// HTTP cache streaming service for Tower/Hyper
708#[cfg(feature = "streaming")]
709pub struct HttpCacheStreamingService<S, CM>
710where
711 CM: StreamingCacheManager,
712{
713 inner: S,
714 cache: Arc<HttpStreamingCache<CM>>,
715}
716
717#[cfg(feature = "streaming")]
718impl<S, CM> Clone for HttpCacheStreamingService<S, CM>
719where
720 S: Clone,
721 CM: StreamingCacheManager,
722{
723 fn clone(&self) -> Self {
724 Self { inner: self.inner.clone(), cache: self.cache.clone() }
725 }
726}
727
728impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
729 for HttpCacheService<S, CM>
730where
731 S: Service<Request<ReqBody>, Response = Response<ResBody>>
732 + Clone
733 + Send
734 + 'static,
735 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
736 S::Future: Send + 'static,
737 ReqBody: Body + Send + 'static,
738 ReqBody::Data: Send,
739 ReqBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
740 ResBody: Body + Send + 'static,
741 ResBody::Data: Send,
742 ResBody::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
743 CM: CacheManager,
744{
745 type Response = Response<HttpCacheBody<ResBody>>;
746 type Error = HttpCacheError;
747 type Future = Pin<
748 Box<
749 dyn std::future::Future<
750 Output = Result<Self::Response, Self::Error>,
751 > + Send,
752 >,
753 >;
754
755 fn poll_ready(
756 &mut self,
757 cx: &mut Context<'_>,
758 ) -> Poll<Result<(), Self::Error>> {
759 self.inner.poll_ready(cx).map_err(|e| HttpCacheError::http(e.into()))
760 }
761
762 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
763 let cache = self.cache.clone();
764 let (parts, body) = req.into_parts();
765 let inner_service = self.inner.clone();
766
767 Box::pin(async move {
768 let middleware = TowerMiddleware {
769 parts: parts.clone(),
770 body: Some(body),
771 service: Some(inner_service),
772 };
773
774 let can_cache = cache.can_cache_request(&middleware).cache_err()?;
775
776 if can_cache {
777 // Delegate the full cache orchestration (mode dispatch,
778 // conditional revalidation, 304/5xx handling, warning
779 // headers, rate limiting, cache busting) to the core.
780 let res = cache.run(middleware).await.cache_err()?;
781 http_response_to_tower_response(res)
782 } else {
783 // Not cacheable -- forward directly, then invalidate on
784 // success (RFC 7234 Section 4.4).
785 let parts_for_invalidation = middleware.parts().cache_err()?;
786
787 // Reconstruct the request from the middleware's parts.
788 let body = middleware.body.ok_or_else(|| {
789 HttpCacheError::cache(
790 "request body already consumed".to_string(),
791 )
792 })?;
793 let service = middleware.service.ok_or_else(|| {
794 HttpCacheError::cache(
795 "inner service already consumed".to_string(),
796 )
797 })?;
798 let req = Request::from_parts(parts, body);
799
800 let response = service.oneshot(req).await.map_err(|e| {
801 let boxed: Box<dyn std::error::Error + Send + Sync> =
802 e.into();
803 HttpCacheError::http(boxed)
804 })?;
805
806 // Only invalidate for unsafe methods after successful response (RFC 7234 s4.4)
807 if !parts_for_invalidation.method.is_safe()
808 && (response.status().is_success()
809 || response.status().is_redirection())
810 {
811 cache
812 .run_no_cache_from_parts(&parts_for_invalidation)
813 .await
814 .cache_err()?;
815 }
816
817 let mut response = response.map(HttpCacheBody::Original);
818
819 if cache.options.cache_status_headers {
820 response = add_cache_status_headers(
821 response,
822 HitOrMiss::MISS.to_string().as_ref(),
823 HitOrMiss::MISS.to_string().as_ref(),
824 );
825 }
826
827 Ok(response)
828 }
829 })
830 }
831}
832
833// Hyper service implementation for HttpCacheService
834impl<S, CM> hyper::service::Service<Request<hyper::body::Incoming>>
835 for HttpCacheService<S, CM>
836where
837 S: Service<
838 Request<hyper::body::Incoming>,
839 Response = Response<http_body_util::Full<Bytes>>,
840 > + Clone
841 + Send
842 + 'static,
843 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
844 S::Future: Send + 'static,
845 CM: CacheManager,
846{
847 type Response = Response<HttpCacheBody<http_body_util::Full<Bytes>>>;
848 type Error = HttpCacheError;
849 type Future = Pin<
850 Box<
851 dyn std::future::Future<
852 Output = Result<Self::Response, Self::Error>,
853 > + Send,
854 >,
855 >;
856
857 fn call(&self, req: Request<hyper::body::Incoming>) -> Self::Future {
858 // Delegate to the Tower Service impl, which takes &mut self
859 let mut service_clone = self.clone();
860 Box::pin(
861 async move { tower::Service::call(&mut service_clone, req).await },
862 )
863 }
864}
865
866#[cfg(feature = "streaming")]
867impl<S, CM, ReqBody, ResBody> Service<Request<ReqBody>>
868 for HttpCacheStreamingService<S, CM>
869where
870 S: Service<Request<ReqBody>, Response = Response<ResBody>>
871 + Clone
872 + Send
873 + 'static,
874 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
875 S::Future: Send + 'static,
876 ReqBody: Body + Send + 'static,
877 ReqBody::Data: Send,
878 ReqBody::Error: Into<StreamingError>,
879 ResBody: Body + Send + 'static,
880 ResBody::Data: Send,
881 ResBody::Error: Into<StreamingError>,
882 CM: StreamingCacheManager,
883 <CM::Body as http_body::Body>::Data: Send,
884 <CM::Body as http_body::Body>::Error:
885 Into<StreamingError> + Send + Sync + 'static,
886{
887 type Response = Response<CM::Body>;
888 type Error = HttpCacheError;
889 type Future = Pin<
890 Box<
891 dyn std::future::Future<
892 Output = Result<Self::Response, Self::Error>,
893 > + Send,
894 >,
895 >;
896
897 fn poll_ready(
898 &mut self,
899 cx: &mut Context<'_>,
900 ) -> Poll<Result<(), Self::Error>> {
901 self.inner.poll_ready(cx).map_err(|e| HttpCacheError::http(e.into()))
902 }
903
904 fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
905 let cache = self.cache.clone();
906 let (parts, body) = req.into_parts();
907 let inner_service = self.inner.clone();
908
909 Box::pin(async move {
910 // Check whether this request is cacheable. Non-cacheable
911 // requests (e.g. POST/PUT/DELETE) are forwarded directly and
912 // only trigger cache invalidation on success.
913 let can_cache =
914 cache.can_cache_request(&parts, None).cache_err()?;
915
916 if !can_cache {
917 // Forward the request without cache orchestration.
918 let req = Request::from_parts(parts.clone(), body);
919 let response =
920 inner_service.oneshot(req).await.map_err(|e| {
921 let boxed: Box<dyn std::error::Error + Send + Sync> =
922 e.into();
923 HttpCacheError::http(boxed)
924 })?;
925
926 // Only invalidate for unsafe methods after successful response (RFC 7234 s4.4)
927 if !parts.method.is_safe()
928 && (response.status().is_success()
929 || response.status().is_redirection())
930 {
931 cache.run_no_cache(&parts).await.cache_err()?;
932 }
933
934 let mut converted =
935 cache.manager.convert_body(response).await.cache_err()?;
936
937 if cache.options.cache_status_headers {
938 converted = add_cache_status_headers_streaming(
939 converted, "MISS", "MISS",
940 );
941 }
942
943 return Ok(converted);
944 }
945
946 // Delegate the full cache orchestration (analyse, lookup,
947 // conditional revalidation, 304/200/5xx handling, rate
948 // limiting, warning headers, cache busting) to the core
949 // library.
950 //
951 // The closure is `FnOnce` and called at most once.
952 // We move `body` and `inner_service` directly into the
953 // closure.
954 let result = cache
955 .run(&parts, None, |fetch_req| {
956 let parts_ref = parts.clone();
957 async move {
958 let request_parts = match fetch_req {
959 http_cache::FetchRequest::Fresh => parts_ref,
960 http_cache::FetchRequest::FreshNoCache => {
961 let mut p = parts_ref;
962 p.headers.insert(
963 CACHE_CONTROL,
964 HeaderValue::from_static("no-cache"),
965 );
966 p
967 }
968 http_cache::FetchRequest::Conditional(
969 cond_parts,
970 ) => *cond_parts,
971 };
972
973 let req = Request::from_parts(request_parts, body);
974
975 inner_service.oneshot(req).await.map_err(|e| {
976 let boxed: Box<
977 dyn std::error::Error + Send + Sync,
978 > = e.into();
979 boxed
980 })
981 }
982 })
983 .await
984 .cache_err()?;
985
986 Ok(result)
987 })
988 }
989}
990
991/// Body type that wraps cached responses
992pub enum HttpCacheBody<B> {
993 /// Buffered body from cache
994 Buffered(Vec<u8>),
995 /// Original body (fallback)
996 Original(B),
997}
998
999impl<B> Body for HttpCacheBody<B>
1000where
1001 B: Body + Unpin,
1002 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
1003 B::Data: Into<bytes::Bytes>,
1004{
1005 type Data = bytes::Bytes;
1006 type Error = Box<dyn std::error::Error + Send + Sync>;
1007
1008 fn poll_frame(
1009 mut self: Pin<&mut Self>,
1010 cx: &mut Context<'_>,
1011 ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
1012 match &mut *self {
1013 HttpCacheBody::Buffered(bytes) => {
1014 if bytes.is_empty() {
1015 Poll::Ready(None)
1016 } else {
1017 let data = std::mem::take(bytes);
1018 Poll::Ready(Some(Ok(http_body::Frame::data(
1019 bytes::Bytes::from(data),
1020 ))))
1021 }
1022 }
1023 HttpCacheBody::Original(body) => {
1024 Pin::new(body).poll_frame(cx).map(|opt| {
1025 opt.map(|res| {
1026 res.map(|frame| frame.map_data(Into::into))
1027 .map_err(Into::into)
1028 })
1029 })
1030 }
1031 }
1032 }
1033
1034 fn is_end_stream(&self) -> bool {
1035 match self {
1036 HttpCacheBody::Buffered(bytes) => bytes.is_empty(),
1037 HttpCacheBody::Original(body) => body.is_end_stream(),
1038 }
1039 }
1040
1041 fn size_hint(&self) -> http_body::SizeHint {
1042 match self {
1043 HttpCacheBody::Buffered(bytes) => {
1044 let len = bytes.len() as u64;
1045 http_body::SizeHint::with_exact(len)
1046 }
1047 HttpCacheBody::Original(body) => body.size_hint(),
1048 }
1049 }
1050}
1051
1052#[cfg(test)]
1053mod test;