http_cache_reqwest/lib.rs
1#![forbid(unsafe_code, future_incompatible)]
2#![deny(
3 missing_docs,
4 missing_debug_implementations,
5 missing_copy_implementations,
6 nonstandard_style,
7 unused_qualifications,
8 unused_import_braces,
9 unused_extern_crates,
10 trivial_casts,
11 trivial_numeric_casts
12)]
13#![allow(clippy::doc_lazy_continuation)]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15//! # http-cache-reqwest
16//!
17//! HTTP caching middleware for the [reqwest] HTTP client.
18//!
19//! This middleware implements HTTP caching according to RFC 7234 for the reqwest HTTP client library.
20//! It works as part of the [reqwest-middleware] ecosystem to provide caching capabilities.
21//!
22//! ```no_run
23//! use reqwest::Client;
24//! use reqwest_middleware::{ClientBuilder, Result};
25//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<()> {
29//! let client = ClientBuilder::new(Client::new())
30//! .with(Cache(HttpCache {
31//! mode: CacheMode::Default,
32//! manager: CACacheManager::new("./cache".into(), true),
33//! options: HttpCacheOptions::default(),
34//! }))
35//! .build();
36//!
37//! // This request will be cached according to response headers
38//! let response = client
39//! .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
40//! .send()
41//! .await?;
42//! println!("Status: {}", response.status());
43//!
44//! // Subsequent identical requests may be served from cache
45//! let cached_response = client
46//! .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
47//! .send()
48//! .await?;
49//! println!("Cached status: {}", cached_response.status());
50//!
51//! Ok(())
52//! }
53//! ```
54//!
55//! ## Streaming Support
56//!
57//! The `StreamingCache` provides streaming support for large responses without buffering
58//! them entirely in memory. This is particularly useful for downloading large files or
59//! processing streaming APIs while still benefiting from HTTP caching.
60//!
61//! **Note**: Requires the `streaming` feature and a compatible cache manager that implements
62//! [`StreamingCacheManager`]. Currently only the `StreamingCacheManager` supports streaming -
63//! `CACacheManager` and `MokaManager` do not support streaming and will buffer responses
64//! in memory. The streaming implementation achieves significant memory savings
65//! (typically 35-40% reduction) compared to traditional buffered approaches.
66//!
67//! ```no_run
68//! # #[cfg(feature = "streaming")]
69//! use reqwest::Client;
70//! # #[cfg(feature = "streaming")]
71//! use reqwest_middleware::ClientBuilder;
72//! # #[cfg(feature = "streaming")]
73//! use http_cache_reqwest::{StreamingCache, CacheMode};
74//! # #[cfg(feature = "streaming")]
75//! use http_cache::StreamingManager;
76//!
77//! # #[cfg(feature = "streaming")]
78//! #[tokio::main]
79//! async fn main() -> reqwest_middleware::Result<()> {
80//! let client = ClientBuilder::new(Client::new())
81//! .with(StreamingCache::new(
82//! StreamingManager::new("./cache".into()),
83//! CacheMode::Default,
84//! ))
85//! .build();
86//!
87//! // Stream large responses efficiently - cached responses are also streamed
88//! let response = client
89//! .get("https://httpbin.org/stream/1000")
90//! .send()
91//! .await?;
92//! println!("Status: {}", response.status());
93//!
94//! // Process the streaming body chunk by chunk
95//! use futures_util::StreamExt;
96//! let mut stream = response.bytes_stream();
97//! while let Some(chunk) = stream.next().await {
98//! let chunk = chunk?;
99//! println!("Received chunk of {} bytes", chunk.len());
100//! // Process chunk without loading entire response into memory
101//! }
102//!
103//! Ok(())
104//! }
105//! # #[cfg(not(feature = "streaming"))]
106//! # fn main() {}
107//! ```
108//!
109//! ### Streaming Cache with Custom Options
110//!
111//! ```no_run
112//! # #[cfg(feature = "streaming")]
113//! use reqwest::Client;
114//! # #[cfg(feature = "streaming")]
115//! use reqwest_middleware::ClientBuilder;
116//! # #[cfg(feature = "streaming")]
117//! use http_cache_reqwest::{StreamingCache, CacheMode, HttpCacheOptions};
118//! # #[cfg(feature = "streaming")]
119//! use http_cache::StreamingManager;
120//!
121//! # #[cfg(feature = "streaming")]
122//! #[tokio::main]
123//! async fn main() -> reqwest_middleware::Result<()> {
124//! let options = HttpCacheOptions {
125//! cache_bust: Some(std::sync::Arc::new(|req: &http::request::Parts, _cache_key: &Option<std::sync::Arc<dyn Fn(&http::request::Parts) -> String + Send + Sync>>, _uri: &str| {
126//! // Custom cache busting logic for streaming requests
127//! if req.uri.path().contains("/stream/") {
128//! vec![format!("stream:{}", req.uri)]
129//! } else {
130//! vec![]
131//! }
132//! })),
133//! ..Default::default()
134//! };
135//!
136//! let client = ClientBuilder::new(Client::new())
137//! .with(StreamingCache::with_options(
138//! StreamingManager::new("./cache".into()),
139//! CacheMode::Default,
140//! options,
141//! ))
142//! .build();
143//!
144//! Ok(())
145//! }
146//! # #[cfg(not(feature = "streaming"))]
147//! # fn main() {}
148//! ```
149//!
150//! ## Cache Modes
151//!
152//! Control caching behavior with different modes:
153//!
154//! ```no_run
155//! use reqwest::Client;
156//! use reqwest_middleware::ClientBuilder;
157//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
158//!
159//! #[tokio::main]
160//! async fn main() -> reqwest_middleware::Result<()> {
161//! let client = ClientBuilder::new(Client::new())
162//! .with(Cache(HttpCache {
163//! mode: CacheMode::ForceCache, // Cache everything, ignore headers
164//! manager: CACacheManager::new("./cache".into(), true),
165//! options: HttpCacheOptions::default(),
166//! }))
167//! .build();
168//!
169//! // This will be cached even if headers say not to cache
170//! client.get("https://httpbin.org/uuid").send().await?;
171//! Ok(())
172//! }
173//! ```
174//!
175//! ## Per-Request Cache Control
176//!
177//! Override the cache mode on individual requests:
178//!
179//! ```no_run
180//! use reqwest::Client;
181//! use reqwest_middleware::ClientBuilder;
182//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
183//!
184//! #[tokio::main]
185//! async fn main() -> reqwest_middleware::Result<()> {
186//! let client = ClientBuilder::new(Client::new())
187//! .with(Cache(HttpCache {
188//! mode: CacheMode::Default,
189//! manager: CACacheManager::new("./cache".into(), true),
190//! options: HttpCacheOptions::default(),
191//! }))
192//! .build();
193//!
194//! // Override cache mode for this specific request
195//! let response = client.get("https://httpbin.org/uuid")
196//! .with_extension(CacheMode::OnlyIfCached) // Only serve from cache
197//! .send()
198//! .await?;
199//!
200//! // This request bypasses cache completely
201//! let fresh_response = client.get("https://httpbin.org/uuid")
202//! .with_extension(CacheMode::NoStore)
203//! .send()
204//! .await?;
205//!
206//! Ok(())
207//! }
208//! ```
209//!
210//! ## Custom Cache Keys
211//!
212//! Customize how cache keys are generated:
213//!
214//! ```no_run
215//! use reqwest::Client;
216//! use reqwest_middleware::ClientBuilder;
217//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
218//! use std::sync::Arc;
219//!
220//! #[tokio::main]
221//! async fn main() -> reqwest_middleware::Result<()> {
222//! let options = HttpCacheOptions {
223//! cache_key: Some(Arc::new(|req: &http::request::Parts| {
224//! // Include query parameters in cache key
225//! format!("{}:{}", req.method, req.uri)
226//! })),
227//! ..Default::default()
228//! };
229//!
230//! let client = ClientBuilder::new(Client::new())
231//! .with(Cache(HttpCache {
232//! mode: CacheMode::Default,
233//! manager: CACacheManager::new("./cache".into(), true),
234//! options,
235//! }))
236//! .build();
237//!
238//! Ok(())
239//! }
240//! ```
241//!
242//! ## In-Memory Caching
243//!
244//! Use the Moka in-memory cache:
245//!
246//! ```no_run
247//! # #[cfg(feature = "manager-moka")]
248//! use reqwest::Client;
249//! # #[cfg(feature = "manager-moka")]
250//! use reqwest_middleware::ClientBuilder;
251//! # #[cfg(feature = "manager-moka")]
252//! use http_cache_reqwest::{Cache, CacheMode, MokaManager, HttpCache, HttpCacheOptions};
253//! # #[cfg(feature = "manager-moka")]
254//! use http_cache_reqwest::MokaCache;
255//!
256//! # #[cfg(feature = "manager-moka")]
257//! #[tokio::main]
258//! async fn main() -> reqwest_middleware::Result<()> {
259//! let client = ClientBuilder::new(Client::new())
260//! .with(Cache(HttpCache {
261//! mode: CacheMode::Default,
262//! manager: MokaManager::new(MokaCache::new(1000)), // Max 1000 entries
263//! options: HttpCacheOptions::default(),
264//! }))
265//! .build();
266//!
267//! Ok(())
268//! }
269//! # #[cfg(not(feature = "manager-moka"))]
270//! # fn main() {}
271//! ```
272// Re-export unified error types from http-cache core
273pub use http_cache::{BadRequest, HttpCacheError};
274
275#[cfg(feature = "streaming")]
276/// Type alias for reqwest streaming errors, using the unified streaming error system
277pub type ReqwestStreamingError = http_cache::ClientStreamingError;
278
279#[cfg(feature = "streaming")]
280use http_cache::StreamingCacheManager;
281
282use std::{convert::TryInto, str::FromStr, time::SystemTime};
283
284pub use http::request::Parts;
285use http::{
286 header::{HeaderName, CACHE_CONTROL},
287 Extensions, HeaderValue, Method,
288};
289use http_cache::{
290 BoxError, HitOrMiss, Middleware, Result, XCACHE, XCACHELOOKUP,
291};
292use http_cache_semantics::CachePolicy;
293use reqwest::{Request, Response, ResponseBuilderExt};
294use reqwest_middleware::{Error, Next};
295
296/// Helper function to convert our error types to reqwest middleware errors
297fn to_middleware_error<E: std::error::Error + Send + Sync + 'static>(
298 error: E,
299) -> Error {
300 // Convert to anyhow::Error which is what reqwest-middleware expects
301 Error::Middleware(anyhow::Error::new(error))
302}
303use url::Url;
304
305pub use http_cache::{
306 CacheManager, CacheMode, CacheOptions, HttpCache, HttpCacheMetadata,
307 HttpCacheOptions, HttpResponse, MetadataProvider, ResponseCacheModeFn,
308};
309
310#[cfg(feature = "streaming")]
311// Re-export streaming types for future use
312pub use http_cache::{
313 HttpCacheStreamInterface, HttpStreamingCache, StreamingBody,
314 StreamingManager,
315};
316
317#[cfg(feature = "manager-cacache")]
318#[cfg_attr(docsrs, doc(cfg(feature = "manager-cacache")))]
319pub use http_cache::CACacheManager;
320
321#[cfg(feature = "manager-moka")]
322#[cfg_attr(docsrs, doc(cfg(feature = "manager-moka")))]
323pub use http_cache::{MokaCache, MokaCacheBuilder, MokaManager};
324
325#[cfg(feature = "rate-limiting")]
326#[cfg_attr(docsrs, doc(cfg(feature = "rate-limiting")))]
327pub use http_cache::rate_limiting::{
328 CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
329};
330
331/// Wrapper for [`HttpCache`]
332#[derive(Debug)]
333pub struct Cache<T: CacheManager>(pub HttpCache<T>);
334
335#[cfg(feature = "streaming")]
336/// Streaming cache wrapper that implements reqwest middleware for streaming responses
337#[derive(Debug, Clone)]
338pub struct StreamingCache<T: StreamingCacheManager> {
339 cache: HttpStreamingCache<T>,
340}
341
342#[cfg(feature = "streaming")]
343impl<T: StreamingCacheManager> StreamingCache<T> {
344 /// Create a new streaming cache with the given manager and mode
345 pub fn new(manager: T, mode: CacheMode) -> Self {
346 Self {
347 cache: HttpStreamingCache {
348 mode,
349 manager,
350 options: HttpCacheOptions::default(),
351 },
352 }
353 }
354
355 /// Create a new streaming cache with custom options
356 pub fn with_options(
357 manager: T,
358 mode: CacheMode,
359 options: HttpCacheOptions,
360 ) -> Self {
361 Self { cache: HttpStreamingCache { mode, manager, options } }
362 }
363}
364
365/// Implements ['Middleware'] for reqwest
366pub(crate) struct ReqwestMiddleware<'a> {
367 pub req: Request,
368 pub next: Next<'a>,
369 pub extensions: &'a mut Extensions,
370}
371
372fn clone_req(request: &Request) -> std::result::Result<Request, Error> {
373 match request.try_clone() {
374 Some(r) => Ok(r),
375 None => Err(to_middleware_error(BadRequest)),
376 }
377}
378
379#[async_trait::async_trait]
380impl Middleware for ReqwestMiddleware<'_> {
381 fn overridden_cache_mode(&self) -> Option<CacheMode> {
382 self.extensions.get().cloned()
383 }
384 fn is_method_get_head(&self) -> bool {
385 self.req.method() == Method::GET || self.req.method() == Method::HEAD
386 }
387 fn policy(&self, response: &HttpResponse) -> Result<CachePolicy> {
388 Ok(CachePolicy::new(&self.parts()?, &response.parts()?))
389 }
390 fn policy_with_options(
391 &self,
392 response: &HttpResponse,
393 options: CacheOptions,
394 ) -> Result<CachePolicy> {
395 Ok(CachePolicy::new_options(
396 &self.parts()?,
397 &response.parts()?,
398 SystemTime::now(),
399 options,
400 ))
401 }
402 fn update_headers(&mut self, parts: &Parts) -> Result<()> {
403 for header in parts.headers.iter() {
404 self.req.headers_mut().insert(header.0.clone(), header.1.clone());
405 }
406 Ok(())
407 }
408 fn force_no_cache(&mut self) -> Result<()> {
409 self.req
410 .headers_mut()
411 .insert(CACHE_CONTROL, HeaderValue::from_str("no-cache")?);
412 Ok(())
413 }
414 fn parts(&self) -> Result<Parts> {
415 // Extract request parts without cloning the body
416 let mut builder = http::Request::builder()
417 .method(self.req.method().as_str())
418 .uri(self.req.url().as_str())
419 .version(self.req.version());
420
421 // Add headers
422 for (name, value) in self.req.headers() {
423 builder = builder.header(name, value);
424 }
425
426 // Add extensions
427 if let Some(no_error) = builder.extensions_mut() {
428 *no_error = self.extensions.clone();
429 }
430
431 // Build with empty body just to get the Parts
432 let http_req = builder.body(()).map_err(Box::new)?;
433 Ok(http_req.into_parts().0)
434 }
435 fn url(&self) -> Result<Url> {
436 Ok(self.req.url().clone())
437 }
438 fn method(&self) -> Result<String> {
439 Ok(self.req.method().as_ref().to_string())
440 }
441 async fn remote_fetch(&mut self) -> Result<HttpResponse> {
442 let copied_req = clone_req(&self.req)?;
443 let res = self
444 .next
445 .clone()
446 .run(copied_req, self.extensions)
447 .await
448 .map_err(BoxError::from)?;
449 let headers = res.headers().into();
450 let url = res.url().clone();
451 let status = res.status().into();
452 let version = res.version();
453 let body: Vec<u8> = res.bytes().await.map_err(BoxError::from)?.to_vec();
454 Ok(HttpResponse {
455 body,
456 headers,
457 status,
458 url,
459 version: version.try_into()?,
460 metadata: None,
461 })
462 }
463}
464
465// Converts an [`HttpResponse`] to a reqwest [`Response`]
466fn convert_response(response: HttpResponse) -> Result<Response> {
467 let metadata = response.metadata.clone();
468 let mut ret_res = http::Response::builder()
469 .status(response.status)
470 .url(response.url)
471 .version(response.version.into())
472 .body(response.body)?;
473 for header in response.headers {
474 ret_res.headers_mut().insert(
475 HeaderName::from_str(&header.0)?,
476 HeaderValue::from_str(&header.1)?,
477 );
478 }
479 // Insert metadata into response extensions if present
480 if let Some(metadata) = metadata {
481 ret_res.extensions_mut().insert(HttpCacheMetadata::from(metadata));
482 }
483 Ok(Response::from(ret_res))
484}
485
486#[cfg(feature = "streaming")]
487// Converts a reqwest Response to an http::Response with Full body for streaming cache processing
488async fn convert_reqwest_response_to_http_full_body(
489 response: Response,
490) -> Result<http::Response<http_body_util::Full<bytes::Bytes>>> {
491 let status = response.status();
492 let version = response.version();
493 let headers = response.headers().clone();
494 let body_bytes = response.bytes().await.map_err(BoxError::from)?;
495
496 let mut http_response =
497 http::Response::builder().status(status).version(version);
498
499 for (name, value) in headers.iter() {
500 http_response = http_response.header(name, value);
501 }
502
503 http_response
504 .body(http_body_util::Full::new(body_bytes))
505 .map_err(BoxError::from)
506}
507
508#[cfg(feature = "streaming")]
509// Converts reqwest Response to http response parts (for 304 handling)
510fn convert_reqwest_response_to_http_parts(
511 response: Response,
512) -> Result<(http::response::Parts, ())> {
513 let status = response.status();
514 let version = response.version();
515 let headers = response.headers();
516
517 let mut http_response =
518 http::Response::builder().status(status).version(version);
519
520 for (name, value) in headers.iter() {
521 http_response = http_response.header(name, value);
522 }
523
524 let response = http_response.body(()).map_err(BoxError::from)?;
525 Ok(response.into_parts())
526}
527
528#[cfg(feature = "streaming")]
529// Helper function to add cache status headers to a streaming response
530fn add_cache_status_headers_to_response<T>(
531 mut response: http::Response<T>,
532 hit_or_miss: &str,
533 cache_lookup: &str,
534) -> http::Response<T> {
535 use http::HeaderValue;
536 use http_cache::{XCACHE, XCACHELOOKUP};
537
538 let headers = response.headers_mut();
539 if let Ok(value1) = HeaderValue::from_str(hit_or_miss) {
540 headers.insert(XCACHE, value1);
541 }
542 if let Ok(value2) = HeaderValue::from_str(cache_lookup) {
543 headers.insert(XCACHELOOKUP, value2);
544 }
545 response
546}
547
548#[cfg(feature = "streaming")]
549// Converts a streaming response to reqwest Response using the StreamingCacheManager's method
550async fn convert_streaming_body_to_reqwest<T>(
551 response: http::Response<T::Body>,
552) -> Result<Response>
553where
554 T: StreamingCacheManager,
555 <T::Body as http_body::Body>::Data: Send,
556 <T::Body as http_body::Body>::Error: Send + Sync + 'static,
557{
558 let (parts, body) = response.into_parts();
559
560 // Use the cache manager's body_to_bytes_stream method for streaming
561 let bytes_stream = T::body_to_bytes_stream(body);
562
563 // Use reqwest's Body::wrap_stream to create a streaming body
564 let reqwest_body = reqwest::Body::wrap_stream(bytes_stream);
565
566 let mut http_response =
567 http::Response::builder().status(parts.status).version(parts.version);
568
569 for (name, value) in parts.headers.iter() {
570 http_response = http_response.header(name, value);
571 }
572
573 let response = http_response.body(reqwest_body)?;
574 Ok(Response::from(response))
575}
576
577fn bad_header(e: reqwest::header::InvalidHeaderValue) -> Error {
578 to_middleware_error(HttpCacheError::Cache(e.to_string()))
579}
580
581fn from_box_error(e: BoxError) -> Error {
582 to_middleware_error(HttpCacheError::Cache(e.to_string()))
583}
584
585#[async_trait::async_trait]
586impl<T: CacheManager> reqwest_middleware::Middleware for Cache<T> {
587 async fn handle(
588 &self,
589 req: Request,
590 extensions: &mut Extensions,
591 next: Next<'_>,
592 ) -> std::result::Result<Response, Error> {
593 let mut middleware = ReqwestMiddleware { req, next, extensions };
594 let can_cache =
595 self.0.can_cache_request(&middleware).map_err(from_box_error)?;
596
597 if can_cache {
598 let res = self.0.run(middleware).await.map_err(from_box_error)?;
599 let converted = convert_response(res).map_err(|e| {
600 to_middleware_error(HttpCacheError::Cache(e.to_string()))
601 })?;
602 Ok(converted)
603 } else {
604 self.0
605 .run_no_cache(&mut middleware)
606 .await
607 .map_err(from_box_error)?;
608 let mut res = middleware
609 .next
610 .run(middleware.req, middleware.extensions)
611 .await?;
612
613 let miss =
614 HeaderValue::from_str(HitOrMiss::MISS.to_string().as_ref())
615 .map_err(bad_header)?;
616 res.headers_mut().insert(XCACHE, miss.clone());
617 res.headers_mut().insert(XCACHELOOKUP, miss);
618 Ok(res)
619 }
620 }
621}
622
623#[cfg(feature = "streaming")]
624#[async_trait::async_trait]
625impl<T: StreamingCacheManager> reqwest_middleware::Middleware
626 for StreamingCache<T>
627where
628 T::Body: Send + 'static,
629 <T::Body as http_body::Body>::Data: Send,
630 <T::Body as http_body::Body>::Error:
631 Into<http_cache::StreamingError> + Send + Sync + 'static,
632{
633 async fn handle(
634 &self,
635 req: Request,
636 extensions: &mut Extensions,
637 next: Next<'_>,
638 ) -> std::result::Result<Response, Error> {
639 use http_cache::HttpCacheStreamInterface;
640
641 // Convert reqwest Request to http::Request for analysis
642 // If the request can't be cloned (e.g., streaming body), bypass cache gracefully
643 let copied_req = match clone_req(&req) {
644 Ok(req) => req,
645 Err(_) => {
646 // Request has non-cloneable body (streaming/multipart), bypass cache
647 let response = next.run(req, extensions).await?;
648 return Ok(response);
649 }
650 };
651 let http_req = match http::Request::try_from(copied_req) {
652 Ok(r) => r,
653 Err(e) => {
654 return Err(to_middleware_error(HttpCacheError::Cache(
655 e.to_string(),
656 )))
657 }
658 };
659 let (parts, _) = http_req.into_parts();
660
661 // Check for mode override from extensions
662 let mode_override = extensions.get::<CacheMode>().cloned();
663
664 // Analyze the request for caching behavior
665 let analysis = match self.cache.analyze_request(&parts, mode_override) {
666 Ok(a) => a,
667 Err(e) => {
668 return Err(to_middleware_error(HttpCacheError::Cache(
669 e.to_string(),
670 )))
671 }
672 };
673
674 // Check if we should bypass cache entirely
675 if !analysis.should_cache {
676 let response = next.run(req, extensions).await?;
677 return Ok(response);
678 }
679
680 // Look up cached response
681 if let Some((cached_response, policy)) = self
682 .cache
683 .lookup_cached_response(&analysis.cache_key)
684 .await
685 .map_err(|e| {
686 to_middleware_error(HttpCacheError::Cache(e.to_string()))
687 })?
688 {
689 // Check if cached response is still fresh
690 use http_cache_semantics::BeforeRequest;
691 let before_req = policy.before_request(&parts, SystemTime::now());
692 match before_req {
693 BeforeRequest::Fresh(_fresh_parts) => {
694 // Convert cached streaming response back to reqwest Response
695 // Now using streaming instead of buffering!
696 let mut cached_response = cached_response;
697
698 // Add cache status headers if enabled
699 if self.cache.options.cache_status_headers {
700 cached_response = add_cache_status_headers_to_response(
701 cached_response,
702 "HIT",
703 "HIT",
704 );
705 }
706
707 return convert_streaming_body_to_reqwest::<T>(
708 cached_response,
709 )
710 .await
711 .map_err(|e| {
712 to_middleware_error(HttpCacheError::Cache(
713 e.to_string(),
714 ))
715 });
716 }
717 BeforeRequest::Stale { request: conditional_parts, .. } => {
718 // Apply rate limiting before revalidation request
719 #[cfg(feature = "rate-limiting")]
720 if let Some(rate_limiter) = &self.cache.options.rate_limiter
721 {
722 let url = req.url().clone();
723 let rate_limit_key =
724 url.host_str().unwrap_or("unknown");
725 rate_limiter.until_key_ready(rate_limit_key).await;
726 }
727
728 // Create conditional request
729 let mut conditional_req = req;
730 for (name, value) in conditional_parts.headers.iter() {
731 conditional_req
732 .headers_mut()
733 .insert(name.clone(), value.clone());
734 }
735
736 let conditional_response =
737 next.run(conditional_req, extensions).await?;
738
739 if conditional_response.status() == 304 {
740 // Convert reqwest response parts for handling not modified
741 let (fresh_parts, _) =
742 convert_reqwest_response_to_http_parts(
743 conditional_response,
744 )
745 .map_err(|e| {
746 to_middleware_error(HttpCacheError::Cache(
747 e.to_string(),
748 ))
749 })?;
750 let updated_response = self
751 .cache
752 .handle_not_modified(cached_response, &fresh_parts)
753 .await
754 .map_err(|e| {
755 to_middleware_error(HttpCacheError::Cache(
756 e.to_string(),
757 ))
758 })?;
759
760 let mut final_response = updated_response;
761
762 // Add cache status headers if enabled
763 if self.cache.options.cache_status_headers {
764 final_response =
765 add_cache_status_headers_to_response(
766 final_response,
767 "HIT",
768 "HIT",
769 );
770 }
771
772 return convert_streaming_body_to_reqwest::<T>(
773 final_response,
774 )
775 .await
776 .map_err(|e| {
777 to_middleware_error(HttpCacheError::Cache(
778 e.to_string(),
779 ))
780 });
781 } else {
782 // Fresh response received, process it through the cache
783 let http_response =
784 convert_reqwest_response_to_http_full_body(
785 conditional_response,
786 )
787 .await
788 .map_err(|e| {
789 to_middleware_error(HttpCacheError::Cache(
790 e.to_string(),
791 ))
792 })?;
793 let cached_response = self
794 .cache
795 .process_response(analysis, http_response, None)
796 .await
797 .map_err(|e| {
798 to_middleware_error(HttpCacheError::Cache(
799 e.to_string(),
800 ))
801 })?;
802
803 let mut final_response = cached_response;
804
805 // Add cache status headers if enabled
806 if self.cache.options.cache_status_headers {
807 final_response =
808 add_cache_status_headers_to_response(
809 final_response,
810 "MISS",
811 "MISS",
812 );
813 }
814
815 return convert_streaming_body_to_reqwest::<T>(
816 final_response,
817 )
818 .await
819 .map_err(|e| {
820 to_middleware_error(HttpCacheError::Cache(
821 e.to_string(),
822 ))
823 });
824 }
825 }
826 }
827 }
828
829 // Apply rate limiting before fresh request
830 #[cfg(feature = "rate-limiting")]
831 if let Some(rate_limiter) = &self.cache.options.rate_limiter {
832 let url = req.url().clone();
833 let rate_limit_key = url.host_str().unwrap_or("unknown");
834 rate_limiter.until_key_ready(rate_limit_key).await;
835 }
836
837 // Fetch fresh response from upstream
838 let response = next.run(req, extensions).await?;
839 let http_response =
840 convert_reqwest_response_to_http_full_body(response)
841 .await
842 .map_err(|e| {
843 to_middleware_error(HttpCacheError::Cache(e.to_string()))
844 })?;
845
846 // Process and potentially cache the response
847 let cached_response = self
848 .cache
849 .process_response(analysis, http_response, None)
850 .await
851 .map_err(|e| {
852 to_middleware_error(HttpCacheError::Cache(e.to_string()))
853 })?;
854
855 let mut final_response = cached_response;
856
857 // Add cache status headers if enabled
858 if self.cache.options.cache_status_headers {
859 final_response = add_cache_status_headers_to_response(
860 final_response,
861 "MISS",
862 "MISS",
863 );
864 }
865
866 convert_streaming_body_to_reqwest::<T>(final_response).await.map_err(
867 |e| to_middleware_error(HttpCacheError::Cache(e.to_string())),
868 )
869 }
870}
871
872#[cfg(test)]
873mod test;