http_cache_reqwest/lib.rs
1#![forbid(unsafe_code, future_incompatible)]
2#![deny(
3 missing_docs,
4 missing_debug_implementations,
5 missing_copy_implementations,
6 nonstandard_style,
7 unused_qualifications,
8 unused_import_braces,
9 unused_extern_crates,
10 trivial_casts,
11 trivial_numeric_casts
12)]
13#![allow(clippy::doc_lazy_continuation)]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15//! # http-cache-reqwest
16//!
17//! HTTP caching middleware for the [reqwest] HTTP client.
18//!
19//! This middleware implements HTTP caching according to RFC 7234 for the reqwest HTTP client library.
20//! It works as part of the [reqwest-middleware] ecosystem to provide caching capabilities.
21//!
22//! ```no_run
23//! use reqwest::Client;
24//! use reqwest_middleware::{ClientBuilder, Result};
25//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<()> {
29//! let client = ClientBuilder::new(Client::new())
30//! .with(Cache(HttpCache {
31//! mode: CacheMode::Default,
32//! manager: CACacheManager::new("./cache".into(), true),
33//! options: HttpCacheOptions::default(),
34//! }))
35//! .build();
36//!
37//! // This request will be cached according to response headers
38//! let response = client
39//! .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
40//! .send()
41//! .await?;
42//! println!("Status: {}", response.status());
43//!
44//! // Subsequent identical requests may be served from cache
45//! let cached_response = client
46//! .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
47//! .send()
48//! .await?;
49//! println!("Cached status: {}", cached_response.status());
50//!
51//! Ok(())
52//! }
53//! ```
54//!
55//! ## Streaming Support
56//!
57//! The `StreamingCache` provides streaming support for large responses without buffering
58//! them entirely in memory. This is particularly useful for downloading large files or
59//! processing streaming APIs while still benefiting from HTTP caching.
60//!
61//! **Note**: Requires the `streaming` feature and a compatible cache manager that implements
62//! [`StreamingCacheManager`]. Currently only the `StreamingCacheManager` supports streaming -
63//! `CACacheManager` and `MokaManager` do not support streaming and will buffer responses
64//! in memory. The streaming implementation achieves significant memory savings
65//! (typically 35-40% reduction) compared to traditional buffered approaches.
66//!
67//! ```no_run
68//! # #[cfg(feature = "streaming")]
69//! use reqwest::Client;
70//! # #[cfg(feature = "streaming")]
71//! use reqwest_middleware::ClientBuilder;
72//! # #[cfg(feature = "streaming")]
73//! use http_cache_reqwest::{StreamingCache, CacheMode};
74//! # #[cfg(feature = "streaming")]
75//! use http_cache::StreamingManager;
76//!
77//! # #[cfg(feature = "streaming")]
78//! #[tokio::main]
79//! async fn main() -> reqwest_middleware::Result<()> {
80//! let client = ClientBuilder::new(Client::new())
81//! .with(StreamingCache::new(
82//! StreamingManager::new("./cache".into()),
83//! CacheMode::Default,
84//! ))
85//! .build();
86//!
87//! // Stream large responses efficiently - cached responses are also streamed
88//! let response = client
89//! .get("https://httpbin.org/stream/1000")
90//! .send()
91//! .await?;
92//! println!("Status: {}", response.status());
93//!
94//! // Process the streaming body chunk by chunk
95//! use futures_util::StreamExt;
96//! let mut stream = response.bytes_stream();
97//! while let Some(chunk) = stream.next().await {
98//! let chunk = chunk?;
99//! println!("Received chunk of {} bytes", chunk.len());
100//! // Process chunk without loading entire response into memory
101//! }
102//!
103//! Ok(())
104//! }
105//! # #[cfg(not(feature = "streaming"))]
106//! # fn main() {}
107//! ```
108//!
109//! ### Streaming Cache with Custom Options
110//!
111//! ```no_run
112//! # #[cfg(feature = "streaming")]
113//! use reqwest::Client;
114//! # #[cfg(feature = "streaming")]
115//! use reqwest_middleware::ClientBuilder;
116//! # #[cfg(feature = "streaming")]
117//! use http_cache_reqwest::{StreamingCache, CacheMode, HttpCacheOptions};
118//! # #[cfg(feature = "streaming")]
119//! use http_cache::StreamingManager;
120//!
121//! # #[cfg(feature = "streaming")]
122//! #[tokio::main]
123//! async fn main() -> reqwest_middleware::Result<()> {
124//! let options = HttpCacheOptions {
125//! cache_bust: Some(std::sync::Arc::new(|req: &http::request::Parts, _cache_key: &Option<std::sync::Arc<dyn Fn(&http::request::Parts) -> String + Send + Sync>>, _uri: &str| {
126//! // Custom cache busting logic for streaming requests
127//! if req.uri.path().contains("/stream/") {
128//! vec![format!("stream:{}", req.uri)]
129//! } else {
130//! vec![]
131//! }
132//! })),
133//! ..Default::default()
134//! };
135//!
136//! let client = ClientBuilder::new(Client::new())
137//! .with(StreamingCache::with_options(
138//! StreamingManager::new("./cache".into()),
139//! CacheMode::Default,
140//! options,
141//! ))
142//! .build();
143//!
144//! Ok(())
145//! }
146//! # #[cfg(not(feature = "streaming"))]
147//! # fn main() {}
148//! ```
149//!
150//! ## Cache Modes
151//!
152//! Control caching behavior with different modes:
153//!
154//! ```no_run
155//! use reqwest::Client;
156//! use reqwest_middleware::ClientBuilder;
157//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
158//!
159//! #[tokio::main]
160//! async fn main() -> reqwest_middleware::Result<()> {
161//! let client = ClientBuilder::new(Client::new())
162//! .with(Cache(HttpCache {
163//! mode: CacheMode::ForceCache, // Cache everything, ignore headers
164//! manager: CACacheManager::new("./cache".into(), true),
165//! options: HttpCacheOptions::default(),
166//! }))
167//! .build();
168//!
169//! // This will be cached even if headers say not to cache
170//! client.get("https://httpbin.org/uuid").send().await?;
171//! Ok(())
172//! }
173//! ```
174//!
175//! ## Per-Request Cache Control
176//!
177//! Override the cache mode on individual requests:
178//!
179//! ```no_run
180//! use reqwest::Client;
181//! use reqwest_middleware::ClientBuilder;
182//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
183//!
184//! #[tokio::main]
185//! async fn main() -> reqwest_middleware::Result<()> {
186//! let client = ClientBuilder::new(Client::new())
187//! .with(Cache(HttpCache {
188//! mode: CacheMode::Default,
189//! manager: CACacheManager::new("./cache".into(), true),
190//! options: HttpCacheOptions::default(),
191//! }))
192//! .build();
193//!
194//! // Override cache mode for this specific request
195//! let response = client.get("https://httpbin.org/uuid")
196//! .with_extension(CacheMode::OnlyIfCached) // Only serve from cache
197//! .send()
198//! .await?;
199//!
200//! // This request bypasses cache completely
201//! let fresh_response = client.get("https://httpbin.org/uuid")
202//! .with_extension(CacheMode::NoStore)
203//! .send()
204//! .await?;
205//!
206//! Ok(())
207//! }
208//! ```
209//!
210//! ## Custom Cache Keys
211//!
212//! Customize how cache keys are generated:
213//!
214//! ```no_run
215//! use reqwest::Client;
216//! use reqwest_middleware::ClientBuilder;
217//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
218//! use std::sync::Arc;
219//!
220//! #[tokio::main]
221//! async fn main() -> reqwest_middleware::Result<()> {
222//! let options = HttpCacheOptions {
223//! cache_key: Some(Arc::new(|req: &http::request::Parts| {
224//! // Include query parameters in cache key
225//! format!("{}:{}", req.method, req.uri)
226//! })),
227//! ..Default::default()
228//! };
229//!
230//! let client = ClientBuilder::new(Client::new())
231//! .with(Cache(HttpCache {
232//! mode: CacheMode::Default,
233//! manager: CACacheManager::new("./cache".into(), true),
234//! options,
235//! }))
236//! .build();
237//!
238//! Ok(())
239//! }
240//! ```
241//!
242//! ## In-Memory Caching
243//!
244//! Use the Moka in-memory cache:
245//!
246//! ```no_run
247//! # #[cfg(feature = "manager-moka")]
248//! use reqwest::Client;
249//! # #[cfg(feature = "manager-moka")]
250//! use reqwest_middleware::ClientBuilder;
251//! # #[cfg(feature = "manager-moka")]
252//! use http_cache_reqwest::{Cache, CacheMode, MokaManager, HttpCache, HttpCacheOptions};
253//! # #[cfg(feature = "manager-moka")]
254//! use http_cache_reqwest::MokaCache;
255//!
256//! # #[cfg(feature = "manager-moka")]
257//! #[tokio::main]
258//! async fn main() -> reqwest_middleware::Result<()> {
259//! let client = ClientBuilder::new(Client::new())
260//! .with(Cache(HttpCache {
261//! mode: CacheMode::Default,
262//! manager: MokaManager::new(MokaCache::new(1000)), // Max 1000 entries
263//! options: HttpCacheOptions::default(),
264//! }))
265//! .build();
266//!
267//! Ok(())
268//! }
269//! # #[cfg(not(feature = "manager-moka"))]
270//! # fn main() {}
271//! ```
272// Re-export unified error types from http-cache core
273pub use http_cache::{BadRequest, HttpCacheError};
274
275#[cfg(feature = "streaming")]
276/// Type alias for reqwest streaming errors, using the unified streaming error system
277pub type ReqwestStreamingError = http_cache::ClientStreamingError;
278
279#[cfg(feature = "streaming")]
280use http_cache::StreamingCacheManager;
281
282use std::{
283 collections::HashMap, convert::TryInto, str::FromStr, time::SystemTime,
284};
285
286pub use http::request::Parts;
287use http::{
288 header::{HeaderName, CACHE_CONTROL},
289 Extensions, HeaderValue, Method,
290};
291use http_cache::{
292 BoxError, HitOrMiss, Middleware, Result, XCACHE, XCACHELOOKUP,
293};
294use http_cache_semantics::CachePolicy;
295use reqwest::{Request, Response, ResponseBuilderExt};
296use reqwest_middleware::{Error, Next};
297
298/// Helper function to convert our error types to reqwest middleware errors
299fn to_middleware_error<E: std::error::Error + Send + Sync + 'static>(
300 error: E,
301) -> Error {
302 // Convert to anyhow::Error which is what reqwest-middleware expects
303 Error::Middleware(anyhow::Error::new(error))
304}
305use url::Url;
306
307pub use http_cache::{
308 CacheManager, CacheMode, CacheOptions, HttpCache, HttpCacheOptions,
309 HttpResponse, ResponseCacheModeFn,
310};
311
312#[cfg(feature = "streaming")]
313// Re-export streaming types for future use
314pub use http_cache::{
315 HttpCacheStreamInterface, HttpStreamingCache, StreamingBody,
316 StreamingManager,
317};
318
319#[cfg(feature = "manager-cacache")]
320#[cfg_attr(docsrs, doc(cfg(feature = "manager-cacache")))]
321pub use http_cache::CACacheManager;
322
323#[cfg(feature = "manager-moka")]
324#[cfg_attr(docsrs, doc(cfg(feature = "manager-moka")))]
325pub use http_cache::{MokaCache, MokaCacheBuilder, MokaManager};
326
327#[cfg(feature = "rate-limiting")]
328#[cfg_attr(docsrs, doc(cfg(feature = "rate-limiting")))]
329pub use http_cache::rate_limiting::{
330 CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
331};
332
333/// Wrapper for [`HttpCache`]
334#[derive(Debug)]
335pub struct Cache<T: CacheManager>(pub HttpCache<T>);
336
337#[cfg(feature = "streaming")]
338/// Streaming cache wrapper that implements reqwest middleware for streaming responses
339#[derive(Debug, Clone)]
340pub struct StreamingCache<T: StreamingCacheManager> {
341 cache: HttpStreamingCache<T>,
342}
343
344#[cfg(feature = "streaming")]
345impl<T: StreamingCacheManager> StreamingCache<T> {
346 /// Create a new streaming cache with the given manager and mode
347 pub fn new(manager: T, mode: CacheMode) -> Self {
348 Self {
349 cache: HttpStreamingCache {
350 mode,
351 manager,
352 options: HttpCacheOptions::default(),
353 },
354 }
355 }
356
357 /// Create a new streaming cache with custom options
358 pub fn with_options(
359 manager: T,
360 mode: CacheMode,
361 options: HttpCacheOptions,
362 ) -> Self {
363 Self { cache: HttpStreamingCache { mode, manager, options } }
364 }
365}
366
367/// Implements ['Middleware'] for reqwest
368pub(crate) struct ReqwestMiddleware<'a> {
369 pub req: Request,
370 pub next: Next<'a>,
371 pub extensions: &'a mut Extensions,
372}
373
374fn clone_req(request: &Request) -> std::result::Result<Request, Error> {
375 match request.try_clone() {
376 Some(r) => Ok(r),
377 None => Err(to_middleware_error(BadRequest)),
378 }
379}
380
381#[async_trait::async_trait]
382impl Middleware for ReqwestMiddleware<'_> {
383 fn overridden_cache_mode(&self) -> Option<CacheMode> {
384 self.extensions.get().cloned()
385 }
386 fn is_method_get_head(&self) -> bool {
387 self.req.method() == Method::GET || self.req.method() == Method::HEAD
388 }
389 fn policy(&self, response: &HttpResponse) -> Result<CachePolicy> {
390 Ok(CachePolicy::new(&self.parts()?, &response.parts()?))
391 }
392 fn policy_with_options(
393 &self,
394 response: &HttpResponse,
395 options: CacheOptions,
396 ) -> Result<CachePolicy> {
397 Ok(CachePolicy::new_options(
398 &self.parts()?,
399 &response.parts()?,
400 SystemTime::now(),
401 options,
402 ))
403 }
404 fn update_headers(&mut self, parts: &Parts) -> Result<()> {
405 for header in parts.headers.iter() {
406 self.req.headers_mut().insert(header.0.clone(), header.1.clone());
407 }
408 Ok(())
409 }
410 fn force_no_cache(&mut self) -> Result<()> {
411 self.req
412 .headers_mut()
413 .insert(CACHE_CONTROL, HeaderValue::from_str("no-cache")?);
414 Ok(())
415 }
416 fn parts(&self) -> Result<Parts> {
417 // Extract request parts without cloning the body
418 let mut builder = http::Request::builder()
419 .method(self.req.method().as_str())
420 .uri(self.req.url().as_str())
421 .version(self.req.version());
422
423 // Add headers
424 for (name, value) in self.req.headers() {
425 builder = builder.header(name, value);
426 }
427
428 // Build with empty body just to get the Parts
429 let http_req = builder.body(()).map_err(Box::new)?;
430 Ok(http_req.into_parts().0)
431 }
432 fn url(&self) -> Result<Url> {
433 Ok(self.req.url().clone())
434 }
435 fn method(&self) -> Result<String> {
436 Ok(self.req.method().as_ref().to_string())
437 }
438 async fn remote_fetch(&mut self) -> Result<HttpResponse> {
439 let copied_req = clone_req(&self.req)?;
440 let res = self
441 .next
442 .clone()
443 .run(copied_req, self.extensions)
444 .await
445 .map_err(BoxError::from)?;
446 let mut headers = HashMap::new();
447 for header in res.headers() {
448 headers.insert(
449 header.0.as_str().to_owned(),
450 header.1.to_str()?.to_owned(),
451 );
452 }
453 let url = res.url().clone();
454 let status = res.status().into();
455 let version = res.version();
456 let body: Vec<u8> = res.bytes().await.map_err(BoxError::from)?.to_vec();
457 Ok(HttpResponse {
458 body,
459 headers,
460 status,
461 url,
462 version: version.try_into()?,
463 })
464 }
465}
466
467// Converts an [`HttpResponse`] to a reqwest [`Response`]
468fn convert_response(response: HttpResponse) -> Result<Response> {
469 let mut ret_res = http::Response::builder()
470 .status(response.status)
471 .url(response.url)
472 .version(response.version.into())
473 .body(response.body)?;
474 for header in response.headers {
475 ret_res.headers_mut().insert(
476 HeaderName::from_str(&header.0)?,
477 HeaderValue::from_str(&header.1)?,
478 );
479 }
480 Ok(Response::from(ret_res))
481}
482
483#[cfg(feature = "streaming")]
484// Converts a reqwest Response to an http::Response with Full body for streaming cache processing
485async fn convert_reqwest_response_to_http_full_body(
486 response: Response,
487) -> Result<http::Response<http_body_util::Full<bytes::Bytes>>> {
488 let status = response.status();
489 let version = response.version();
490 let headers = response.headers().clone();
491 let body_bytes = response.bytes().await.map_err(BoxError::from)?;
492
493 let mut http_response =
494 http::Response::builder().status(status).version(version);
495
496 for (name, value) in headers.iter() {
497 http_response = http_response.header(name, value);
498 }
499
500 http_response
501 .body(http_body_util::Full::new(body_bytes))
502 .map_err(BoxError::from)
503}
504
505#[cfg(feature = "streaming")]
506// Converts reqwest Response to http response parts (for 304 handling)
507fn convert_reqwest_response_to_http_parts(
508 response: Response,
509) -> Result<(http::response::Parts, ())> {
510 let status = response.status();
511 let version = response.version();
512 let headers = response.headers();
513
514 let mut http_response =
515 http::Response::builder().status(status).version(version);
516
517 for (name, value) in headers.iter() {
518 http_response = http_response.header(name, value);
519 }
520
521 let response = http_response.body(()).map_err(BoxError::from)?;
522 Ok(response.into_parts())
523}
524
525#[cfg(feature = "streaming")]
526// Helper function to add cache status headers to a streaming response
527fn add_cache_status_headers_to_response<T>(
528 mut response: http::Response<T>,
529 hit_or_miss: &str,
530 cache_lookup: &str,
531) -> http::Response<T> {
532 use http::HeaderValue;
533 use http_cache::{XCACHE, XCACHELOOKUP};
534
535 let headers = response.headers_mut();
536 if let Ok(value1) = HeaderValue::from_str(hit_or_miss) {
537 headers.insert(XCACHE, value1);
538 }
539 if let Ok(value2) = HeaderValue::from_str(cache_lookup) {
540 headers.insert(XCACHELOOKUP, value2);
541 }
542 response
543}
544
545#[cfg(feature = "streaming")]
546// Converts a streaming response to reqwest Response using the StreamingCacheManager's method
547async fn convert_streaming_body_to_reqwest<T>(
548 response: http::Response<T::Body>,
549) -> Result<Response>
550where
551 T: StreamingCacheManager,
552 <T::Body as http_body::Body>::Data: Send,
553 <T::Body as http_body::Body>::Error: Send + Sync + 'static,
554{
555 let (parts, body) = response.into_parts();
556
557 // Use the cache manager's body_to_bytes_stream method for streaming
558 let bytes_stream = T::body_to_bytes_stream(body);
559
560 // Use reqwest's Body::wrap_stream to create a streaming body
561 let reqwest_body = reqwest::Body::wrap_stream(bytes_stream);
562
563 let mut http_response =
564 http::Response::builder().status(parts.status).version(parts.version);
565
566 for (name, value) in parts.headers.iter() {
567 http_response = http_response.header(name, value);
568 }
569
570 let response = http_response.body(reqwest_body)?;
571 Ok(Response::from(response))
572}
573
574fn bad_header(e: reqwest::header::InvalidHeaderValue) -> Error {
575 to_middleware_error(HttpCacheError::Cache(e.to_string()))
576}
577
578fn from_box_error(e: BoxError) -> Error {
579 to_middleware_error(HttpCacheError::Cache(e.to_string()))
580}
581
582#[async_trait::async_trait]
583impl<T: CacheManager> reqwest_middleware::Middleware for Cache<T> {
584 async fn handle(
585 &self,
586 req: Request,
587 extensions: &mut Extensions,
588 next: Next<'_>,
589 ) -> std::result::Result<Response, Error> {
590 let mut middleware = ReqwestMiddleware { req, next, extensions };
591 let can_cache =
592 self.0.can_cache_request(&middleware).map_err(from_box_error)?;
593
594 if can_cache {
595 let res = self.0.run(middleware).await.map_err(from_box_error)?;
596 let converted = convert_response(res).map_err(|e| {
597 to_middleware_error(HttpCacheError::Cache(e.to_string()))
598 })?;
599 Ok(converted)
600 } else {
601 self.0
602 .run_no_cache(&mut middleware)
603 .await
604 .map_err(from_box_error)?;
605 let mut res = middleware
606 .next
607 .run(middleware.req, middleware.extensions)
608 .await?;
609
610 let miss =
611 HeaderValue::from_str(HitOrMiss::MISS.to_string().as_ref())
612 .map_err(bad_header)?;
613 res.headers_mut().insert(XCACHE, miss.clone());
614 res.headers_mut().insert(XCACHELOOKUP, miss);
615 Ok(res)
616 }
617 }
618}
619
620#[cfg(feature = "streaming")]
621#[async_trait::async_trait]
622impl<T: StreamingCacheManager> reqwest_middleware::Middleware
623 for StreamingCache<T>
624where
625 T::Body: Send + 'static,
626 <T::Body as http_body::Body>::Data: Send,
627 <T::Body as http_body::Body>::Error:
628 Into<http_cache::StreamingError> + Send + Sync + 'static,
629{
630 async fn handle(
631 &self,
632 req: Request,
633 extensions: &mut Extensions,
634 next: Next<'_>,
635 ) -> std::result::Result<Response, Error> {
636 use http_cache::HttpCacheStreamInterface;
637
638 // Convert reqwest Request to http::Request for analysis
639 // If the request can't be cloned (e.g., streaming body), bypass cache gracefully
640 let copied_req = match clone_req(&req) {
641 Ok(req) => req,
642 Err(_) => {
643 // Request has non-cloneable body (streaming/multipart), bypass cache
644 let response = next.run(req, extensions).await?;
645 return Ok(response);
646 }
647 };
648 let http_req = match http::Request::try_from(copied_req) {
649 Ok(r) => r,
650 Err(e) => {
651 return Err(to_middleware_error(HttpCacheError::Cache(
652 e.to_string(),
653 )))
654 }
655 };
656 let (parts, _) = http_req.into_parts();
657
658 // Check for mode override from extensions
659 let mode_override = extensions.get::<CacheMode>().cloned();
660
661 // Analyze the request for caching behavior
662 let analysis = match self.cache.analyze_request(&parts, mode_override) {
663 Ok(a) => a,
664 Err(e) => {
665 return Err(to_middleware_error(HttpCacheError::Cache(
666 e.to_string(),
667 )))
668 }
669 };
670
671 // Check if we should bypass cache entirely
672 if !analysis.should_cache {
673 let response = next.run(req, extensions).await?;
674 return Ok(response);
675 }
676
677 // Look up cached response
678 if let Some((cached_response, policy)) = self
679 .cache
680 .lookup_cached_response(&analysis.cache_key)
681 .await
682 .map_err(|e| {
683 to_middleware_error(HttpCacheError::Cache(e.to_string()))
684 })?
685 {
686 // Check if cached response is still fresh
687 use http_cache_semantics::BeforeRequest;
688 let before_req = policy.before_request(&parts, SystemTime::now());
689 match before_req {
690 BeforeRequest::Fresh(_fresh_parts) => {
691 // Convert cached streaming response back to reqwest Response
692 // Now using streaming instead of buffering!
693 let mut cached_response = cached_response;
694
695 // Add cache status headers if enabled
696 if self.cache.options.cache_status_headers {
697 cached_response = add_cache_status_headers_to_response(
698 cached_response,
699 "HIT",
700 "HIT",
701 );
702 }
703
704 return convert_streaming_body_to_reqwest::<T>(
705 cached_response,
706 )
707 .await
708 .map_err(|e| {
709 to_middleware_error(HttpCacheError::Cache(
710 e.to_string(),
711 ))
712 });
713 }
714 BeforeRequest::Stale { request: conditional_parts, .. } => {
715 // Apply rate limiting before revalidation request
716 #[cfg(feature = "rate-limiting")]
717 if let Some(rate_limiter) = &self.cache.options.rate_limiter
718 {
719 let url = req.url().clone();
720 let rate_limit_key =
721 url.host_str().unwrap_or("unknown");
722 rate_limiter.until_key_ready(rate_limit_key).await;
723 }
724
725 // Create conditional request
726 let mut conditional_req = req;
727 for (name, value) in conditional_parts.headers.iter() {
728 conditional_req
729 .headers_mut()
730 .insert(name.clone(), value.clone());
731 }
732
733 let conditional_response =
734 next.run(conditional_req, extensions).await?;
735
736 if conditional_response.status() == 304 {
737 // Convert reqwest response parts for handling not modified
738 let (fresh_parts, _) =
739 convert_reqwest_response_to_http_parts(
740 conditional_response,
741 )
742 .map_err(|e| {
743 to_middleware_error(HttpCacheError::Cache(
744 e.to_string(),
745 ))
746 })?;
747 let updated_response = self
748 .cache
749 .handle_not_modified(cached_response, &fresh_parts)
750 .await
751 .map_err(|e| {
752 to_middleware_error(HttpCacheError::Cache(
753 e.to_string(),
754 ))
755 })?;
756
757 let mut final_response = updated_response;
758
759 // Add cache status headers if enabled
760 if self.cache.options.cache_status_headers {
761 final_response =
762 add_cache_status_headers_to_response(
763 final_response,
764 "HIT",
765 "HIT",
766 );
767 }
768
769 return convert_streaming_body_to_reqwest::<T>(
770 final_response,
771 )
772 .await
773 .map_err(|e| {
774 to_middleware_error(HttpCacheError::Cache(
775 e.to_string(),
776 ))
777 });
778 } else {
779 // Fresh response received, process it through the cache
780 let http_response =
781 convert_reqwest_response_to_http_full_body(
782 conditional_response,
783 )
784 .await
785 .map_err(|e| {
786 to_middleware_error(HttpCacheError::Cache(
787 e.to_string(),
788 ))
789 })?;
790 let cached_response = self
791 .cache
792 .process_response(analysis, http_response)
793 .await
794 .map_err(|e| {
795 to_middleware_error(HttpCacheError::Cache(
796 e.to_string(),
797 ))
798 })?;
799
800 let mut final_response = cached_response;
801
802 // Add cache status headers if enabled
803 if self.cache.options.cache_status_headers {
804 final_response =
805 add_cache_status_headers_to_response(
806 final_response,
807 "MISS",
808 "MISS",
809 );
810 }
811
812 return convert_streaming_body_to_reqwest::<T>(
813 final_response,
814 )
815 .await
816 .map_err(|e| {
817 to_middleware_error(HttpCacheError::Cache(
818 e.to_string(),
819 ))
820 });
821 }
822 }
823 }
824 }
825
826 // Apply rate limiting before fresh request
827 #[cfg(feature = "rate-limiting")]
828 if let Some(rate_limiter) = &self.cache.options.rate_limiter {
829 let url = req.url().clone();
830 let rate_limit_key = url.host_str().unwrap_or("unknown");
831 rate_limiter.until_key_ready(rate_limit_key).await;
832 }
833
834 // Fetch fresh response from upstream
835 let response = next.run(req, extensions).await?;
836 let http_response =
837 convert_reqwest_response_to_http_full_body(response)
838 .await
839 .map_err(|e| {
840 to_middleware_error(HttpCacheError::Cache(e.to_string()))
841 })?;
842
843 // Process and potentially cache the response
844 let cached_response = self
845 .cache
846 .process_response(analysis, http_response)
847 .await
848 .map_err(|e| {
849 to_middleware_error(HttpCacheError::Cache(e.to_string()))
850 })?;
851
852 let mut final_response = cached_response;
853
854 // Add cache status headers if enabled
855 if self.cache.options.cache_status_headers {
856 final_response = add_cache_status_headers_to_response(
857 final_response,
858 "MISS",
859 "MISS",
860 );
861 }
862
863 convert_streaming_body_to_reqwest::<T>(final_response).await.map_err(
864 |e| to_middleware_error(HttpCacheError::Cache(e.to_string())),
865 )
866 }
867}
868
869#[cfg(test)]
870mod test;