http_cache_reqwest/
lib.rs

1#![forbid(unsafe_code, future_incompatible)]
2#![deny(
3    missing_docs,
4    missing_debug_implementations,
5    missing_copy_implementations,
6    nonstandard_style,
7    unused_qualifications,
8    unused_import_braces,
9    unused_extern_crates,
10    trivial_casts,
11    trivial_numeric_casts
12)]
13#![allow(clippy::doc_lazy_continuation)]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15//! # http-cache-reqwest
16//!
17//! HTTP caching middleware for the [reqwest] HTTP client.
18//!
19//! This middleware implements HTTP caching according to RFC 7234 for the reqwest HTTP client library.
20//! It works as part of the [reqwest-middleware] ecosystem to provide caching capabilities.
21//!
22//! ```no_run
23//! use reqwest::Client;
24//! use reqwest_middleware::{ClientBuilder, Result};
25//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<()> {
29//!     let client = ClientBuilder::new(Client::new())
30//!         .with(Cache(HttpCache {
31//!             mode: CacheMode::Default,
32//!             manager: CACacheManager::new("./cache".into(), true),
33//!             options: HttpCacheOptions::default(),
34//!         }))
35//!         .build();
36//!     
37//!     // This request will be cached according to response headers
38//!     let response = client
39//!         .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
40//!         .send()
41//!         .await?;
42//!     println!("Status: {}", response.status());
43//!     
44//!     // Subsequent identical requests may be served from cache
45//!     let cached_response = client
46//!         .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
47//!         .send()
48//!         .await?;
49//!     println!("Cached status: {}", cached_response.status());
50//!     
51//!     Ok(())
52//! }
53//! ```
54//!
55//! ## Streaming Support
56//!
57//! The `StreamingCache` provides streaming support for large responses without buffering
58//! them entirely in memory. This is particularly useful for downloading large files or
59//! processing streaming APIs while still benefiting from HTTP caching.
60//!
61//! **Note**: Requires the `streaming` feature and a compatible cache manager that implements
62//! [`StreamingCacheManager`]. Currently only the `StreamingCacheManager` supports streaming -
63//! `CACacheManager` and `MokaManager` do not support streaming and will buffer responses
64//! in memory. The streaming implementation achieves significant memory savings
65//! (typically 35-40% reduction) compared to traditional buffered approaches.
66//!
67//! ```no_run
68//! # #[cfg(feature = "streaming")]
69//! use reqwest::Client;
70//! # #[cfg(feature = "streaming")]
71//! use reqwest_middleware::ClientBuilder;
72//! # #[cfg(feature = "streaming")]
73//! use http_cache_reqwest::{StreamingCache, CacheMode};
74//! # #[cfg(feature = "streaming")]
75//! use http_cache::StreamingManager;
76//!
77//! # #[cfg(feature = "streaming")]
78//! #[tokio::main]
79//! async fn main() -> reqwest_middleware::Result<()> {
80//!     let client = ClientBuilder::new(Client::new())
81//!         .with(StreamingCache::new(
82//!             StreamingManager::new("./cache".into()),
83//!             CacheMode::Default,
84//!         ))
85//!         .build();
86//!         
87//!     // Stream large responses efficiently - cached responses are also streamed
88//!     let response = client
89//!         .get("https://httpbin.org/stream/1000")
90//!         .send()
91//!         .await?;
92//!     println!("Status: {}", response.status());
93//!     
94//!     // Process the streaming body chunk by chunk
95//!     use futures_util::StreamExt;
96//!     let mut stream = response.bytes_stream();
97//!     while let Some(chunk) = stream.next().await {
98//!         let chunk = chunk?;
99//!         println!("Received chunk of {} bytes", chunk.len());
100//!         // Process chunk without loading entire response into memory
101//!     }
102//!     
103//!     Ok(())
104//! }
105//! # #[cfg(not(feature = "streaming"))]
106//! # fn main() {}
107//! ```
108//!
109//! ### Streaming Cache with Custom Options
110//!
111//! ```no_run
112//! # #[cfg(feature = "streaming")]
113//! use reqwest::Client;
114//! # #[cfg(feature = "streaming")]
115//! use reqwest_middleware::ClientBuilder;
116//! # #[cfg(feature = "streaming")]
117//! use http_cache_reqwest::{StreamingCache, CacheMode, HttpCacheOptions};
118//! # #[cfg(feature = "streaming")]
119//! use http_cache::StreamingManager;
120//!
121//! # #[cfg(feature = "streaming")]
122//! #[tokio::main]
123//! async fn main() -> reqwest_middleware::Result<()> {
124//!     let options = HttpCacheOptions {
125//!         cache_bust: Some(std::sync::Arc::new(|req: &http::request::Parts, _cache_key: &Option<std::sync::Arc<dyn Fn(&http::request::Parts) -> String + Send + Sync>>, _uri: &str| {
126//!             // Custom cache busting logic for streaming requests
127//!             if req.uri.path().contains("/stream/") {
128//!                 vec![format!("stream:{}", req.uri)]
129//!             } else {
130//!                 vec![]
131//!             }
132//!         })),
133//!         ..Default::default()
134//!     };
135//!
136//!     let client = ClientBuilder::new(Client::new())
137//!         .with(StreamingCache::with_options(
138//!             StreamingManager::new("./cache".into()),
139//!             CacheMode::Default,
140//!             options,
141//!         ))
142//!         .build();
143//!         
144//!     Ok(())
145//! }
146//! # #[cfg(not(feature = "streaming"))]
147//! # fn main() {}
148//! ```
149//!
150//! ## Cache Modes
151//!
152//! Control caching behavior with different modes:
153//!
154//! ```no_run
155//! use reqwest::Client;
156//! use reqwest_middleware::ClientBuilder;
157//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
158//!
159//! #[tokio::main]
160//! async fn main() -> reqwest_middleware::Result<()> {
161//!     let client = ClientBuilder::new(Client::new())
162//!         .with(Cache(HttpCache {
163//!             mode: CacheMode::ForceCache, // Cache everything, ignore headers
164//!             manager: CACacheManager::new("./cache".into(), true),
165//!             options: HttpCacheOptions::default(),
166//!         }))
167//!         .build();
168//!     
169//!     // This will be cached even if headers say not to cache
170//!     client.get("https://httpbin.org/uuid").send().await?;
171//!     Ok(())
172//! }
173//! ```
174//!
175//! ## Per-Request Cache Control
176//!
177//! Override the cache mode on individual requests:
178//!
179//! ```no_run
180//! use reqwest::Client;
181//! use reqwest_middleware::ClientBuilder;
182//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
183//!
184//! #[tokio::main]
185//! async fn main() -> reqwest_middleware::Result<()> {
186//!     let client = ClientBuilder::new(Client::new())
187//!         .with(Cache(HttpCache {
188//!             mode: CacheMode::Default,
189//!             manager: CACacheManager::new("./cache".into(), true),
190//!             options: HttpCacheOptions::default(),
191//!         }))
192//!         .build();
193//!     
194//!     // Override cache mode for this specific request
195//!     let response = client.get("https://httpbin.org/uuid")
196//!         .with_extension(CacheMode::OnlyIfCached) // Only serve from cache
197//!         .send()
198//!         .await?;
199//!         
200//!     // This request bypasses cache completely
201//!     let fresh_response = client.get("https://httpbin.org/uuid")
202//!         .with_extension(CacheMode::NoStore)
203//!         .send()
204//!         .await?;
205//!         
206//!     Ok(())
207//! }
208//! ```
209//!
210//! ## Custom Cache Keys
211//!
212//! Customize how cache keys are generated:
213//!
214//! ```no_run
215//! use reqwest::Client;
216//! use reqwest_middleware::ClientBuilder;
217//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
218//! use std::sync::Arc;
219//!
220//! #[tokio::main]
221//! async fn main() -> reqwest_middleware::Result<()> {
222//!     let options = HttpCacheOptions {
223//!         cache_key: Some(Arc::new(|req: &http::request::Parts| {
224//!             // Include query parameters in cache key
225//!             format!("{}:{}", req.method, req.uri)
226//!         })),
227//!         ..Default::default()
228//!     };
229//!     
230//!     let client = ClientBuilder::new(Client::new())
231//!         .with(Cache(HttpCache {
232//!             mode: CacheMode::Default,
233//!             manager: CACacheManager::new("./cache".into(), true),
234//!             options,
235//!         }))
236//!         .build();
237//!         
238//!     Ok(())
239//! }
240//! ```
241//!
242//! ## In-Memory Caching
243//!
244//! Use the Moka in-memory cache:
245//!
246//! ```no_run
247//! # #[cfg(feature = "manager-moka")]
248//! use reqwest::Client;
249//! # #[cfg(feature = "manager-moka")]
250//! use reqwest_middleware::ClientBuilder;
251//! # #[cfg(feature = "manager-moka")]
252//! use http_cache_reqwest::{Cache, CacheMode, MokaManager, HttpCache, HttpCacheOptions};
253//! # #[cfg(feature = "manager-moka")]
254//! use http_cache_reqwest::MokaCache;
255//!
256//! # #[cfg(feature = "manager-moka")]
257//! #[tokio::main]
258//! async fn main() -> reqwest_middleware::Result<()> {
259//!     let client = ClientBuilder::new(Client::new())
260//!         .with(Cache(HttpCache {
261//!             mode: CacheMode::Default,
262//!             manager: MokaManager::new(MokaCache::new(1000)), // Max 1000 entries
263//!             options: HttpCacheOptions::default(),
264//!         }))
265//!         .build();
266//!         
267//!     Ok(())
268//! }
269//! # #[cfg(not(feature = "manager-moka"))]
270//! # fn main() {}
271//! ```
272// Re-export unified error types from http-cache core
273pub use http_cache::{BadRequest, HttpCacheError};
274
275#[cfg(feature = "streaming")]
276/// Type alias for reqwest streaming errors, using the unified streaming error system
277pub type ReqwestStreamingError = http_cache::ClientStreamingError;
278
279#[cfg(feature = "streaming")]
280use http_cache::StreamingCacheManager;
281
282use std::{convert::TryInto, str::FromStr, time::SystemTime};
283
284pub use http::request::Parts;
285use http::{
286    header::{HeaderName, CACHE_CONTROL},
287    Extensions, HeaderValue, Method,
288};
289use http_cache::{
290    BoxError, HitOrMiss, Middleware, Result, XCACHE, XCACHELOOKUP,
291};
292use http_cache_semantics::CachePolicy;
293use reqwest::{Request, Response, ResponseBuilderExt};
294use reqwest_middleware::{Error, Next};
295
296/// Helper function to convert our error types to reqwest middleware errors
297fn to_middleware_error<E: std::error::Error + Send + Sync + 'static>(
298    error: E,
299) -> Error {
300    // Convert to anyhow::Error which is what reqwest-middleware expects
301    Error::Middleware(anyhow::Error::new(error))
302}
303use url::Url;
304
305pub use http_cache::{
306    CacheManager, CacheMode, CacheOptions, HttpCache, HttpCacheMetadata,
307    HttpCacheOptions, HttpResponse, MetadataProvider, ResponseCacheModeFn,
308};
309
310#[cfg(feature = "streaming")]
311// Re-export streaming types for future use
312pub use http_cache::{
313    HttpCacheStreamInterface, HttpStreamingCache, StreamingBody,
314    StreamingManager,
315};
316
317#[cfg(feature = "manager-cacache")]
318#[cfg_attr(docsrs, doc(cfg(feature = "manager-cacache")))]
319pub use http_cache::CACacheManager;
320
321#[cfg(feature = "manager-moka")]
322#[cfg_attr(docsrs, doc(cfg(feature = "manager-moka")))]
323pub use http_cache::{MokaCache, MokaCacheBuilder, MokaManager};
324
325#[cfg(feature = "rate-limiting")]
326#[cfg_attr(docsrs, doc(cfg(feature = "rate-limiting")))]
327pub use http_cache::rate_limiting::{
328    CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
329};
330
331/// Wrapper for [`HttpCache`]
332#[derive(Debug)]
333pub struct Cache<T: CacheManager>(pub HttpCache<T>);
334
335#[cfg(feature = "streaming")]
336/// Streaming cache wrapper that implements reqwest middleware for streaming responses
337#[derive(Debug, Clone)]
338pub struct StreamingCache<T: StreamingCacheManager> {
339    cache: HttpStreamingCache<T>,
340}
341
342#[cfg(feature = "streaming")]
343impl<T: StreamingCacheManager> StreamingCache<T> {
344    /// Create a new streaming cache with the given manager and mode
345    pub fn new(manager: T, mode: CacheMode) -> Self {
346        Self {
347            cache: HttpStreamingCache {
348                mode,
349                manager,
350                options: HttpCacheOptions::default(),
351            },
352        }
353    }
354
355    /// Create a new streaming cache with custom options
356    pub fn with_options(
357        manager: T,
358        mode: CacheMode,
359        options: HttpCacheOptions,
360    ) -> Self {
361        Self { cache: HttpStreamingCache { mode, manager, options } }
362    }
363}
364
365/// Implements ['Middleware'] for reqwest
366pub(crate) struct ReqwestMiddleware<'a> {
367    pub req: Request,
368    pub next: Next<'a>,
369    pub extensions: &'a mut Extensions,
370}
371
372fn clone_req(request: &Request) -> std::result::Result<Request, Error> {
373    match request.try_clone() {
374        Some(r) => Ok(r),
375        None => Err(to_middleware_error(BadRequest)),
376    }
377}
378
379#[async_trait::async_trait]
380impl Middleware for ReqwestMiddleware<'_> {
381    fn overridden_cache_mode(&self) -> Option<CacheMode> {
382        self.extensions.get().cloned()
383    }
384    fn is_method_get_head(&self) -> bool {
385        self.req.method() == Method::GET || self.req.method() == Method::HEAD
386    }
387    fn policy(&self, response: &HttpResponse) -> Result<CachePolicy> {
388        Ok(CachePolicy::new(&self.parts()?, &response.parts()?))
389    }
390    fn policy_with_options(
391        &self,
392        response: &HttpResponse,
393        options: CacheOptions,
394    ) -> Result<CachePolicy> {
395        Ok(CachePolicy::new_options(
396            &self.parts()?,
397            &response.parts()?,
398            SystemTime::now(),
399            options,
400        ))
401    }
402    fn update_headers(&mut self, parts: &Parts) -> Result<()> {
403        for header in parts.headers.iter() {
404            self.req.headers_mut().insert(header.0.clone(), header.1.clone());
405        }
406        Ok(())
407    }
408    fn force_no_cache(&mut self) -> Result<()> {
409        self.req
410            .headers_mut()
411            .insert(CACHE_CONTROL, HeaderValue::from_str("no-cache")?);
412        Ok(())
413    }
414    fn parts(&self) -> Result<Parts> {
415        // Extract request parts without cloning the body
416        let mut builder = http::Request::builder()
417            .method(self.req.method().as_str())
418            .uri(self.req.url().as_str())
419            .version(self.req.version());
420
421        // Add headers
422        for (name, value) in self.req.headers() {
423            builder = builder.header(name, value);
424        }
425
426        // Add extensions
427        if let Some(no_error) = builder.extensions_mut() {
428            *no_error = self.extensions.clone();
429        }
430
431        // Build with empty body just to get the Parts
432        let http_req = builder.body(()).map_err(Box::new)?;
433        Ok(http_req.into_parts().0)
434    }
435    fn url(&self) -> Result<Url> {
436        Ok(self.req.url().clone())
437    }
438    fn method(&self) -> Result<String> {
439        Ok(self.req.method().as_ref().to_string())
440    }
441    async fn remote_fetch(&mut self) -> Result<HttpResponse> {
442        let copied_req = clone_req(&self.req)?;
443        let res = self
444            .next
445            .clone()
446            .run(copied_req, self.extensions)
447            .await
448            .map_err(BoxError::from)?;
449        let headers = res.headers().into();
450        let url = res.url().clone();
451        let status = res.status().into();
452        let version = res.version();
453        let body: Vec<u8> = res.bytes().await.map_err(BoxError::from)?.to_vec();
454        Ok(HttpResponse {
455            body,
456            headers,
457            status,
458            url,
459            version: version.try_into()?,
460            metadata: None,
461        })
462    }
463}
464
465// Converts an [`HttpResponse`] to a reqwest [`Response`]
466fn convert_response(response: HttpResponse) -> Result<Response> {
467    let metadata = response.metadata.clone();
468    let mut ret_res = http::Response::builder()
469        .status(response.status)
470        .url(response.url)
471        .version(response.version.into())
472        .body(response.body)?;
473    for header in response.headers {
474        ret_res.headers_mut().insert(
475            HeaderName::from_str(&header.0)?,
476            HeaderValue::from_str(&header.1)?,
477        );
478    }
479    // Insert metadata into response extensions if present
480    if let Some(metadata) = metadata {
481        ret_res.extensions_mut().insert(HttpCacheMetadata::from(metadata));
482    }
483    Ok(Response::from(ret_res))
484}
485
486#[cfg(feature = "streaming")]
487// Converts a reqwest Response to an http::Response with Full body for streaming cache processing
488async fn convert_reqwest_response_to_http_full_body(
489    response: Response,
490) -> Result<http::Response<http_body_util::Full<bytes::Bytes>>> {
491    let status = response.status();
492    let version = response.version();
493    let headers = response.headers().clone();
494    let body_bytes = response.bytes().await.map_err(BoxError::from)?;
495
496    let mut http_response =
497        http::Response::builder().status(status).version(version);
498
499    for (name, value) in headers.iter() {
500        http_response = http_response.header(name, value);
501    }
502
503    http_response
504        .body(http_body_util::Full::new(body_bytes))
505        .map_err(BoxError::from)
506}
507
508#[cfg(feature = "streaming")]
509// Converts reqwest Response to http response parts (for 304 handling)
510fn convert_reqwest_response_to_http_parts(
511    response: Response,
512) -> Result<(http::response::Parts, ())> {
513    let status = response.status();
514    let version = response.version();
515    let headers = response.headers();
516
517    let mut http_response =
518        http::Response::builder().status(status).version(version);
519
520    for (name, value) in headers.iter() {
521        http_response = http_response.header(name, value);
522    }
523
524    let response = http_response.body(()).map_err(BoxError::from)?;
525    Ok(response.into_parts())
526}
527
528#[cfg(feature = "streaming")]
529// Helper function to add cache status headers to a streaming response
530fn add_cache_status_headers_to_response<T>(
531    mut response: http::Response<T>,
532    hit_or_miss: &str,
533    cache_lookup: &str,
534) -> http::Response<T> {
535    use http::HeaderValue;
536    use http_cache::{XCACHE, XCACHELOOKUP};
537
538    let headers = response.headers_mut();
539    if let Ok(value1) = HeaderValue::from_str(hit_or_miss) {
540        headers.insert(XCACHE, value1);
541    }
542    if let Ok(value2) = HeaderValue::from_str(cache_lookup) {
543        headers.insert(XCACHELOOKUP, value2);
544    }
545    response
546}
547
548#[cfg(feature = "streaming")]
549// Converts a streaming response to reqwest Response using the StreamingCacheManager's method
550async fn convert_streaming_body_to_reqwest<T>(
551    response: http::Response<T::Body>,
552) -> Result<Response>
553where
554    T: StreamingCacheManager,
555    <T::Body as http_body::Body>::Data: Send,
556    <T::Body as http_body::Body>::Error: Send + Sync + 'static,
557{
558    let (parts, body) = response.into_parts();
559
560    // Use the cache manager's body_to_bytes_stream method for streaming
561    let bytes_stream = T::body_to_bytes_stream(body);
562
563    // Use reqwest's Body::wrap_stream to create a streaming body
564    let reqwest_body = reqwest::Body::wrap_stream(bytes_stream);
565
566    let mut http_response =
567        http::Response::builder().status(parts.status).version(parts.version);
568
569    for (name, value) in parts.headers.iter() {
570        http_response = http_response.header(name, value);
571    }
572
573    let response = http_response.body(reqwest_body)?;
574    Ok(Response::from(response))
575}
576
577fn bad_header(e: reqwest::header::InvalidHeaderValue) -> Error {
578    to_middleware_error(HttpCacheError::Cache(e.to_string()))
579}
580
581fn from_box_error(e: BoxError) -> Error {
582    to_middleware_error(HttpCacheError::Cache(e.to_string()))
583}
584
585#[async_trait::async_trait]
586impl<T: CacheManager> reqwest_middleware::Middleware for Cache<T> {
587    async fn handle(
588        &self,
589        req: Request,
590        extensions: &mut Extensions,
591        next: Next<'_>,
592    ) -> std::result::Result<Response, Error> {
593        let mut middleware = ReqwestMiddleware { req, next, extensions };
594        let can_cache =
595            self.0.can_cache_request(&middleware).map_err(from_box_error)?;
596
597        if can_cache {
598            let res = self.0.run(middleware).await.map_err(from_box_error)?;
599            let converted = convert_response(res).map_err(|e| {
600                to_middleware_error(HttpCacheError::Cache(e.to_string()))
601            })?;
602            Ok(converted)
603        } else {
604            self.0
605                .run_no_cache(&mut middleware)
606                .await
607                .map_err(from_box_error)?;
608            let mut res = middleware
609                .next
610                .run(middleware.req, middleware.extensions)
611                .await?;
612
613            let miss =
614                HeaderValue::from_str(HitOrMiss::MISS.to_string().as_ref())
615                    .map_err(bad_header)?;
616            res.headers_mut().insert(XCACHE, miss.clone());
617            res.headers_mut().insert(XCACHELOOKUP, miss);
618            Ok(res)
619        }
620    }
621}
622
623#[cfg(feature = "streaming")]
624#[async_trait::async_trait]
625impl<T: StreamingCacheManager> reqwest_middleware::Middleware
626    for StreamingCache<T>
627where
628    T::Body: Send + 'static,
629    <T::Body as http_body::Body>::Data: Send,
630    <T::Body as http_body::Body>::Error:
631        Into<http_cache::StreamingError> + Send + Sync + 'static,
632{
633    async fn handle(
634        &self,
635        req: Request,
636        extensions: &mut Extensions,
637        next: Next<'_>,
638    ) -> std::result::Result<Response, Error> {
639        use http_cache::HttpCacheStreamInterface;
640
641        // Convert reqwest Request to http::Request for analysis
642        // If the request can't be cloned (e.g., streaming body), bypass cache gracefully
643        let copied_req = match clone_req(&req) {
644            Ok(req) => req,
645            Err(_) => {
646                // Request has non-cloneable body (streaming/multipart), bypass cache
647                let response = next.run(req, extensions).await?;
648                return Ok(response);
649            }
650        };
651        let http_req = match http::Request::try_from(copied_req) {
652            Ok(r) => r,
653            Err(e) => {
654                return Err(to_middleware_error(HttpCacheError::Cache(
655                    e.to_string(),
656                )))
657            }
658        };
659        let (parts, _) = http_req.into_parts();
660
661        // Check for mode override from extensions
662        let mode_override = extensions.get::<CacheMode>().cloned();
663
664        // Analyze the request for caching behavior
665        let analysis = match self.cache.analyze_request(&parts, mode_override) {
666            Ok(a) => a,
667            Err(e) => {
668                return Err(to_middleware_error(HttpCacheError::Cache(
669                    e.to_string(),
670                )))
671            }
672        };
673
674        // Check if we should bypass cache entirely
675        if !analysis.should_cache {
676            let response = next.run(req, extensions).await?;
677            return Ok(response);
678        }
679
680        // Look up cached response
681        if let Some((cached_response, policy)) = self
682            .cache
683            .lookup_cached_response(&analysis.cache_key)
684            .await
685            .map_err(|e| {
686                to_middleware_error(HttpCacheError::Cache(e.to_string()))
687            })?
688        {
689            // Check if cached response is still fresh
690            use http_cache_semantics::BeforeRequest;
691            let before_req = policy.before_request(&parts, SystemTime::now());
692            match before_req {
693                BeforeRequest::Fresh(_fresh_parts) => {
694                    // Convert cached streaming response back to reqwest Response
695                    // Now using streaming instead of buffering!
696                    let mut cached_response = cached_response;
697
698                    // Add cache status headers if enabled
699                    if self.cache.options.cache_status_headers {
700                        cached_response = add_cache_status_headers_to_response(
701                            cached_response,
702                            "HIT",
703                            "HIT",
704                        );
705                    }
706
707                    return convert_streaming_body_to_reqwest::<T>(
708                        cached_response,
709                    )
710                    .await
711                    .map_err(|e| {
712                        to_middleware_error(HttpCacheError::Cache(
713                            e.to_string(),
714                        ))
715                    });
716                }
717                BeforeRequest::Stale { request: conditional_parts, .. } => {
718                    // Apply rate limiting before revalidation request
719                    #[cfg(feature = "rate-limiting")]
720                    if let Some(rate_limiter) = &self.cache.options.rate_limiter
721                    {
722                        let url = req.url().clone();
723                        let rate_limit_key =
724                            url.host_str().unwrap_or("unknown");
725                        rate_limiter.until_key_ready(rate_limit_key).await;
726                    }
727
728                    // Create conditional request
729                    let mut conditional_req = req;
730                    for (name, value) in conditional_parts.headers.iter() {
731                        conditional_req
732                            .headers_mut()
733                            .insert(name.clone(), value.clone());
734                    }
735
736                    let conditional_response =
737                        next.run(conditional_req, extensions).await?;
738
739                    if conditional_response.status() == 304 {
740                        // Convert reqwest response parts for handling not modified
741                        let (fresh_parts, _) =
742                            convert_reqwest_response_to_http_parts(
743                                conditional_response,
744                            )
745                            .map_err(|e| {
746                                to_middleware_error(HttpCacheError::Cache(
747                                    e.to_string(),
748                                ))
749                            })?;
750                        let updated_response = self
751                            .cache
752                            .handle_not_modified(cached_response, &fresh_parts)
753                            .await
754                            .map_err(|e| {
755                                to_middleware_error(HttpCacheError::Cache(
756                                    e.to_string(),
757                                ))
758                            })?;
759
760                        let mut final_response = updated_response;
761
762                        // Add cache status headers if enabled
763                        if self.cache.options.cache_status_headers {
764                            final_response =
765                                add_cache_status_headers_to_response(
766                                    final_response,
767                                    "HIT",
768                                    "HIT",
769                                );
770                        }
771
772                        return convert_streaming_body_to_reqwest::<T>(
773                            final_response,
774                        )
775                        .await
776                        .map_err(|e| {
777                            to_middleware_error(HttpCacheError::Cache(
778                                e.to_string(),
779                            ))
780                        });
781                    } else {
782                        // Fresh response received, process it through the cache
783                        let http_response =
784                            convert_reqwest_response_to_http_full_body(
785                                conditional_response,
786                            )
787                            .await
788                            .map_err(|e| {
789                                to_middleware_error(HttpCacheError::Cache(
790                                    e.to_string(),
791                                ))
792                            })?;
793                        let cached_response = self
794                            .cache
795                            .process_response(analysis, http_response, None)
796                            .await
797                            .map_err(|e| {
798                                to_middleware_error(HttpCacheError::Cache(
799                                    e.to_string(),
800                                ))
801                            })?;
802
803                        let mut final_response = cached_response;
804
805                        // Add cache status headers if enabled
806                        if self.cache.options.cache_status_headers {
807                            final_response =
808                                add_cache_status_headers_to_response(
809                                    final_response,
810                                    "MISS",
811                                    "MISS",
812                                );
813                        }
814
815                        return convert_streaming_body_to_reqwest::<T>(
816                            final_response,
817                        )
818                        .await
819                        .map_err(|e| {
820                            to_middleware_error(HttpCacheError::Cache(
821                                e.to_string(),
822                            ))
823                        });
824                    }
825                }
826            }
827        }
828
829        // Apply rate limiting before fresh request
830        #[cfg(feature = "rate-limiting")]
831        if let Some(rate_limiter) = &self.cache.options.rate_limiter {
832            let url = req.url().clone();
833            let rate_limit_key = url.host_str().unwrap_or("unknown");
834            rate_limiter.until_key_ready(rate_limit_key).await;
835        }
836
837        // Fetch fresh response from upstream
838        let response = next.run(req, extensions).await?;
839        let http_response =
840            convert_reqwest_response_to_http_full_body(response)
841                .await
842                .map_err(|e| {
843                    to_middleware_error(HttpCacheError::Cache(e.to_string()))
844                })?;
845
846        // Process and potentially cache the response
847        let cached_response = self
848            .cache
849            .process_response(analysis, http_response, None)
850            .await
851            .map_err(|e| {
852                to_middleware_error(HttpCacheError::Cache(e.to_string()))
853            })?;
854
855        let mut final_response = cached_response;
856
857        // Add cache status headers if enabled
858        if self.cache.options.cache_status_headers {
859            final_response = add_cache_status_headers_to_response(
860                final_response,
861                "MISS",
862                "MISS",
863            );
864        }
865
866        convert_streaming_body_to_reqwest::<T>(final_response).await.map_err(
867            |e| to_middleware_error(HttpCacheError::Cache(e.to_string())),
868        )
869    }
870}
871
872#[cfg(test)]
873mod test;