http_cache_reqwest/
lib.rs

1#![forbid(unsafe_code, future_incompatible)]
2#![deny(
3    missing_docs,
4    missing_debug_implementations,
5    missing_copy_implementations,
6    nonstandard_style,
7    unused_qualifications,
8    unused_import_braces,
9    unused_extern_crates,
10    trivial_casts,
11    trivial_numeric_casts
12)]
13#![allow(clippy::doc_lazy_continuation)]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15//! # http-cache-reqwest
16//!
17//! HTTP caching middleware for the [reqwest] HTTP client.
18//!
19//! This middleware implements HTTP caching according to RFC 7234 for the reqwest HTTP client library.
20//! It works as part of the [reqwest-middleware] ecosystem to provide caching capabilities.
21//!
22//! ```no_run
23//! use reqwest::Client;
24//! use reqwest_middleware::{ClientBuilder, Result};
25//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
26//!
27//! #[tokio::main]
28//! async fn main() -> Result<()> {
29//!     let client = ClientBuilder::new(Client::new())
30//!         .with(Cache(HttpCache {
31//!             mode: CacheMode::Default,
32//!             manager: CACacheManager::new("./cache".into(), true),
33//!             options: HttpCacheOptions::default(),
34//!         }))
35//!         .build();
36//!     
37//!     // This request will be cached according to response headers
38//!     let response = client
39//!         .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
40//!         .send()
41//!         .await?;
42//!     println!("Status: {}", response.status());
43//!     
44//!     // Subsequent identical requests may be served from cache
45//!     let cached_response = client
46//!         .get("https://developer.mozilla.org/en-US/docs/Web/HTTP/Caching")
47//!         .send()
48//!         .await?;
49//!     println!("Cached status: {}", cached_response.status());
50//!     
51//!     Ok(())
52//! }
53//! ```
54//!
55//! ## Streaming Support
56//!
57//! The `StreamingCache` provides streaming support for large responses without buffering
58//! them entirely in memory. This is particularly useful for downloading large files or
59//! processing streaming APIs while still benefiting from HTTP caching.
60//!
61//! **Note**: Requires the `streaming` feature and a compatible cache manager that implements
62//! [`StreamingCacheManager`]. Currently only the `StreamingCacheManager` supports streaming -
63//! `CACacheManager` and `MokaManager` do not support streaming and will buffer responses
64//! in memory. The streaming implementation achieves significant memory savings
65//! (typically 35-40% reduction) compared to traditional buffered approaches.
66//!
67//! ```no_run
68//! # #[cfg(feature = "streaming")]
69//! use reqwest::Client;
70//! # #[cfg(feature = "streaming")]
71//! use reqwest_middleware::ClientBuilder;
72//! # #[cfg(feature = "streaming")]
73//! use http_cache_reqwest::{StreamingCache, CacheMode};
74//! # #[cfg(feature = "streaming")]
75//! use http_cache::StreamingManager;
76//!
77//! # #[cfg(feature = "streaming")]
78//! #[tokio::main]
79//! async fn main() -> reqwest_middleware::Result<()> {
80//!     let client = ClientBuilder::new(Client::new())
81//!         .with(StreamingCache::new(
82//!             StreamingManager::new("./cache".into()),
83//!             CacheMode::Default,
84//!         ))
85//!         .build();
86//!         
87//!     // Stream large responses efficiently - cached responses are also streamed
88//!     let response = client
89//!         .get("https://httpbin.org/stream/1000")
90//!         .send()
91//!         .await?;
92//!     println!("Status: {}", response.status());
93//!     
94//!     // Process the streaming body chunk by chunk
95//!     use futures_util::StreamExt;
96//!     let mut stream = response.bytes_stream();
97//!     while let Some(chunk) = stream.next().await {
98//!         let chunk = chunk?;
99//!         println!("Received chunk of {} bytes", chunk.len());
100//!         // Process chunk without loading entire response into memory
101//!     }
102//!     
103//!     Ok(())
104//! }
105//! # #[cfg(not(feature = "streaming"))]
106//! # fn main() {}
107//! ```
108//!
109//! ### Streaming Cache with Custom Options
110//!
111//! ```no_run
112//! # #[cfg(feature = "streaming")]
113//! use reqwest::Client;
114//! # #[cfg(feature = "streaming")]
115//! use reqwest_middleware::ClientBuilder;
116//! # #[cfg(feature = "streaming")]
117//! use http_cache_reqwest::{StreamingCache, CacheMode, HttpCacheOptions};
118//! # #[cfg(feature = "streaming")]
119//! use http_cache::StreamingManager;
120//!
121//! # #[cfg(feature = "streaming")]
122//! #[tokio::main]
123//! async fn main() -> reqwest_middleware::Result<()> {
124//!     let options = HttpCacheOptions {
125//!         cache_bust: Some(std::sync::Arc::new(|req: &http::request::Parts, _cache_key: &Option<std::sync::Arc<dyn Fn(&http::request::Parts) -> String + Send + Sync>>, _uri: &str| {
126//!             // Custom cache busting logic for streaming requests
127//!             if req.uri.path().contains("/stream/") {
128//!                 vec![format!("stream:{}", req.uri)]
129//!             } else {
130//!                 vec![]
131//!             }
132//!         })),
133//!         ..Default::default()
134//!     };
135//!
136//!     let client = ClientBuilder::new(Client::new())
137//!         .with(StreamingCache::with_options(
138//!             StreamingManager::new("./cache".into()),
139//!             CacheMode::Default,
140//!             options,
141//!         ))
142//!         .build();
143//!         
144//!     Ok(())
145//! }
146//! # #[cfg(not(feature = "streaming"))]
147//! # fn main() {}
148//! ```
149//!
150//! ## Cache Modes
151//!
152//! Control caching behavior with different modes:
153//!
154//! ```no_run
155//! use reqwest::Client;
156//! use reqwest_middleware::ClientBuilder;
157//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
158//!
159//! #[tokio::main]
160//! async fn main() -> reqwest_middleware::Result<()> {
161//!     let client = ClientBuilder::new(Client::new())
162//!         .with(Cache(HttpCache {
163//!             mode: CacheMode::ForceCache, // Cache everything, ignore headers
164//!             manager: CACacheManager::new("./cache".into(), true),
165//!             options: HttpCacheOptions::default(),
166//!         }))
167//!         .build();
168//!     
169//!     // This will be cached even if headers say not to cache
170//!     client.get("https://httpbin.org/uuid").send().await?;
171//!     Ok(())
172//! }
173//! ```
174//!
175//! ## Per-Request Cache Control
176//!
177//! Override the cache mode on individual requests:
178//!
179//! ```no_run
180//! use reqwest::Client;
181//! use reqwest_middleware::ClientBuilder;
182//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
183//!
184//! #[tokio::main]
185//! async fn main() -> reqwest_middleware::Result<()> {
186//!     let client = ClientBuilder::new(Client::new())
187//!         .with(Cache(HttpCache {
188//!             mode: CacheMode::Default,
189//!             manager: CACacheManager::new("./cache".into(), true),
190//!             options: HttpCacheOptions::default(),
191//!         }))
192//!         .build();
193//!     
194//!     // Override cache mode for this specific request
195//!     let response = client.get("https://httpbin.org/uuid")
196//!         .with_extension(CacheMode::OnlyIfCached) // Only serve from cache
197//!         .send()
198//!         .await?;
199//!         
200//!     // This request bypasses cache completely
201//!     let fresh_response = client.get("https://httpbin.org/uuid")
202//!         .with_extension(CacheMode::NoStore)
203//!         .send()
204//!         .await?;
205//!         
206//!     Ok(())
207//! }
208//! ```
209//!
210//! ## Custom Cache Keys
211//!
212//! Customize how cache keys are generated:
213//!
214//! ```no_run
215//! use reqwest::Client;
216//! use reqwest_middleware::ClientBuilder;
217//! use http_cache_reqwest::{Cache, CacheMode, CACacheManager, HttpCache, HttpCacheOptions};
218//! use std::sync::Arc;
219//!
220//! #[tokio::main]
221//! async fn main() -> reqwest_middleware::Result<()> {
222//!     let options = HttpCacheOptions {
223//!         cache_key: Some(Arc::new(|req: &http::request::Parts| {
224//!             // Include query parameters in cache key
225//!             format!("{}:{}", req.method, req.uri)
226//!         })),
227//!         ..Default::default()
228//!     };
229//!     
230//!     let client = ClientBuilder::new(Client::new())
231//!         .with(Cache(HttpCache {
232//!             mode: CacheMode::Default,
233//!             manager: CACacheManager::new("./cache".into(), true),
234//!             options,
235//!         }))
236//!         .build();
237//!         
238//!     Ok(())
239//! }
240//! ```
241//!
242//! ## In-Memory Caching
243//!
244//! Use the Moka in-memory cache:
245//!
246//! ```no_run
247//! # #[cfg(feature = "manager-moka")]
248//! use reqwest::Client;
249//! # #[cfg(feature = "manager-moka")]
250//! use reqwest_middleware::ClientBuilder;
251//! # #[cfg(feature = "manager-moka")]
252//! use http_cache_reqwest::{Cache, CacheMode, MokaManager, HttpCache, HttpCacheOptions};
253//! # #[cfg(feature = "manager-moka")]
254//! use http_cache_reqwest::MokaCache;
255//!
256//! # #[cfg(feature = "manager-moka")]
257//! #[tokio::main]
258//! async fn main() -> reqwest_middleware::Result<()> {
259//!     let client = ClientBuilder::new(Client::new())
260//!         .with(Cache(HttpCache {
261//!             mode: CacheMode::Default,
262//!             manager: MokaManager::new(MokaCache::new(1000)), // Max 1000 entries
263//!             options: HttpCacheOptions::default(),
264//!         }))
265//!         .build();
266//!         
267//!     Ok(())
268//! }
269//! # #[cfg(not(feature = "manager-moka"))]
270//! # fn main() {}
271//! ```
272// Re-export unified error types from http-cache core
273pub use http_cache::{BadRequest, HttpCacheError};
274
275#[cfg(feature = "streaming")]
276/// Type alias for reqwest streaming errors, using the unified streaming error system
277pub type ReqwestStreamingError = http_cache::ClientStreamingError;
278
279#[cfg(feature = "streaming")]
280use http_cache::StreamingCacheManager;
281
282use std::{
283    collections::HashMap, convert::TryInto, str::FromStr, time::SystemTime,
284};
285
286pub use http::request::Parts;
287use http::{
288    header::{HeaderName, CACHE_CONTROL},
289    Extensions, HeaderValue, Method,
290};
291use http_cache::{
292    BoxError, HitOrMiss, Middleware, Result, XCACHE, XCACHELOOKUP,
293};
294use http_cache_semantics::CachePolicy;
295use reqwest::{Request, Response, ResponseBuilderExt};
296use reqwest_middleware::{Error, Next};
297
298/// Helper function to convert our error types to reqwest middleware errors
299fn to_middleware_error<E: std::error::Error + Send + Sync + 'static>(
300    error: E,
301) -> Error {
302    // Convert to anyhow::Error which is what reqwest-middleware expects
303    Error::Middleware(anyhow::Error::new(error))
304}
305use url::Url;
306
307pub use http_cache::{
308    CacheManager, CacheMode, CacheOptions, HttpCache, HttpCacheOptions,
309    HttpResponse, ResponseCacheModeFn,
310};
311
312#[cfg(feature = "streaming")]
313// Re-export streaming types for future use
314pub use http_cache::{
315    HttpCacheStreamInterface, HttpStreamingCache, StreamingBody,
316    StreamingManager,
317};
318
319#[cfg(feature = "manager-cacache")]
320#[cfg_attr(docsrs, doc(cfg(feature = "manager-cacache")))]
321pub use http_cache::CACacheManager;
322
323#[cfg(feature = "manager-moka")]
324#[cfg_attr(docsrs, doc(cfg(feature = "manager-moka")))]
325pub use http_cache::{MokaCache, MokaCacheBuilder, MokaManager};
326
327#[cfg(feature = "rate-limiting")]
328#[cfg_attr(docsrs, doc(cfg(feature = "rate-limiting")))]
329pub use http_cache::rate_limiting::{
330    CacheAwareRateLimiter, DirectRateLimiter, DomainRateLimiter, Quota,
331};
332
333/// Wrapper for [`HttpCache`]
334#[derive(Debug)]
335pub struct Cache<T: CacheManager>(pub HttpCache<T>);
336
337#[cfg(feature = "streaming")]
338/// Streaming cache wrapper that implements reqwest middleware for streaming responses
339#[derive(Debug, Clone)]
340pub struct StreamingCache<T: StreamingCacheManager> {
341    cache: HttpStreamingCache<T>,
342}
343
344#[cfg(feature = "streaming")]
345impl<T: StreamingCacheManager> StreamingCache<T> {
346    /// Create a new streaming cache with the given manager and mode
347    pub fn new(manager: T, mode: CacheMode) -> Self {
348        Self {
349            cache: HttpStreamingCache {
350                mode,
351                manager,
352                options: HttpCacheOptions::default(),
353            },
354        }
355    }
356
357    /// Create a new streaming cache with custom options
358    pub fn with_options(
359        manager: T,
360        mode: CacheMode,
361        options: HttpCacheOptions,
362    ) -> Self {
363        Self { cache: HttpStreamingCache { mode, manager, options } }
364    }
365}
366
367/// Implements ['Middleware'] for reqwest
368pub(crate) struct ReqwestMiddleware<'a> {
369    pub req: Request,
370    pub next: Next<'a>,
371    pub extensions: &'a mut Extensions,
372}
373
374fn clone_req(request: &Request) -> std::result::Result<Request, Error> {
375    match request.try_clone() {
376        Some(r) => Ok(r),
377        None => Err(to_middleware_error(BadRequest)),
378    }
379}
380
381#[async_trait::async_trait]
382impl Middleware for ReqwestMiddleware<'_> {
383    fn overridden_cache_mode(&self) -> Option<CacheMode> {
384        self.extensions.get().cloned()
385    }
386    fn is_method_get_head(&self) -> bool {
387        self.req.method() == Method::GET || self.req.method() == Method::HEAD
388    }
389    fn policy(&self, response: &HttpResponse) -> Result<CachePolicy> {
390        Ok(CachePolicy::new(&self.parts()?, &response.parts()?))
391    }
392    fn policy_with_options(
393        &self,
394        response: &HttpResponse,
395        options: CacheOptions,
396    ) -> Result<CachePolicy> {
397        Ok(CachePolicy::new_options(
398            &self.parts()?,
399            &response.parts()?,
400            SystemTime::now(),
401            options,
402        ))
403    }
404    fn update_headers(&mut self, parts: &Parts) -> Result<()> {
405        for header in parts.headers.iter() {
406            self.req.headers_mut().insert(header.0.clone(), header.1.clone());
407        }
408        Ok(())
409    }
410    fn force_no_cache(&mut self) -> Result<()> {
411        self.req
412            .headers_mut()
413            .insert(CACHE_CONTROL, HeaderValue::from_str("no-cache")?);
414        Ok(())
415    }
416    fn parts(&self) -> Result<Parts> {
417        // Extract request parts without cloning the body
418        let mut builder = http::Request::builder()
419            .method(self.req.method().as_str())
420            .uri(self.req.url().as_str())
421            .version(self.req.version());
422
423        // Add headers
424        for (name, value) in self.req.headers() {
425            builder = builder.header(name, value);
426        }
427
428        // Build with empty body just to get the Parts
429        let http_req = builder.body(()).map_err(Box::new)?;
430        Ok(http_req.into_parts().0)
431    }
432    fn url(&self) -> Result<Url> {
433        Ok(self.req.url().clone())
434    }
435    fn method(&self) -> Result<String> {
436        Ok(self.req.method().as_ref().to_string())
437    }
438    async fn remote_fetch(&mut self) -> Result<HttpResponse> {
439        let copied_req = clone_req(&self.req)?;
440        let res = self
441            .next
442            .clone()
443            .run(copied_req, self.extensions)
444            .await
445            .map_err(BoxError::from)?;
446        let mut headers = HashMap::new();
447        for header in res.headers() {
448            headers.insert(
449                header.0.as_str().to_owned(),
450                header.1.to_str()?.to_owned(),
451            );
452        }
453        let url = res.url().clone();
454        let status = res.status().into();
455        let version = res.version();
456        let body: Vec<u8> = res.bytes().await.map_err(BoxError::from)?.to_vec();
457        Ok(HttpResponse {
458            body,
459            headers,
460            status,
461            url,
462            version: version.try_into()?,
463        })
464    }
465}
466
467// Converts an [`HttpResponse`] to a reqwest [`Response`]
468fn convert_response(response: HttpResponse) -> Result<Response> {
469    let mut ret_res = http::Response::builder()
470        .status(response.status)
471        .url(response.url)
472        .version(response.version.into())
473        .body(response.body)?;
474    for header in response.headers {
475        ret_res.headers_mut().insert(
476            HeaderName::from_str(&header.0)?,
477            HeaderValue::from_str(&header.1)?,
478        );
479    }
480    Ok(Response::from(ret_res))
481}
482
483#[cfg(feature = "streaming")]
484// Converts a reqwest Response to an http::Response with Full body for streaming cache processing
485async fn convert_reqwest_response_to_http_full_body(
486    response: Response,
487) -> Result<http::Response<http_body_util::Full<bytes::Bytes>>> {
488    let status = response.status();
489    let version = response.version();
490    let headers = response.headers().clone();
491    let body_bytes = response.bytes().await.map_err(BoxError::from)?;
492
493    let mut http_response =
494        http::Response::builder().status(status).version(version);
495
496    for (name, value) in headers.iter() {
497        http_response = http_response.header(name, value);
498    }
499
500    http_response
501        .body(http_body_util::Full::new(body_bytes))
502        .map_err(BoxError::from)
503}
504
505#[cfg(feature = "streaming")]
506// Converts reqwest Response to http response parts (for 304 handling)
507fn convert_reqwest_response_to_http_parts(
508    response: Response,
509) -> Result<(http::response::Parts, ())> {
510    let status = response.status();
511    let version = response.version();
512    let headers = response.headers();
513
514    let mut http_response =
515        http::Response::builder().status(status).version(version);
516
517    for (name, value) in headers.iter() {
518        http_response = http_response.header(name, value);
519    }
520
521    let response = http_response.body(()).map_err(BoxError::from)?;
522    Ok(response.into_parts())
523}
524
525#[cfg(feature = "streaming")]
526// Helper function to add cache status headers to a streaming response
527fn add_cache_status_headers_to_response<T>(
528    mut response: http::Response<T>,
529    hit_or_miss: &str,
530    cache_lookup: &str,
531) -> http::Response<T> {
532    use http::HeaderValue;
533    use http_cache::{XCACHE, XCACHELOOKUP};
534
535    let headers = response.headers_mut();
536    if let Ok(value1) = HeaderValue::from_str(hit_or_miss) {
537        headers.insert(XCACHE, value1);
538    }
539    if let Ok(value2) = HeaderValue::from_str(cache_lookup) {
540        headers.insert(XCACHELOOKUP, value2);
541    }
542    response
543}
544
545#[cfg(feature = "streaming")]
546// Converts a streaming response to reqwest Response using the StreamingCacheManager's method
547async fn convert_streaming_body_to_reqwest<T>(
548    response: http::Response<T::Body>,
549) -> Result<Response>
550where
551    T: StreamingCacheManager,
552    <T::Body as http_body::Body>::Data: Send,
553    <T::Body as http_body::Body>::Error: Send + Sync + 'static,
554{
555    let (parts, body) = response.into_parts();
556
557    // Use the cache manager's body_to_bytes_stream method for streaming
558    let bytes_stream = T::body_to_bytes_stream(body);
559
560    // Use reqwest's Body::wrap_stream to create a streaming body
561    let reqwest_body = reqwest::Body::wrap_stream(bytes_stream);
562
563    let mut http_response =
564        http::Response::builder().status(parts.status).version(parts.version);
565
566    for (name, value) in parts.headers.iter() {
567        http_response = http_response.header(name, value);
568    }
569
570    let response = http_response.body(reqwest_body)?;
571    Ok(Response::from(response))
572}
573
574fn bad_header(e: reqwest::header::InvalidHeaderValue) -> Error {
575    to_middleware_error(HttpCacheError::Cache(e.to_string()))
576}
577
578fn from_box_error(e: BoxError) -> Error {
579    to_middleware_error(HttpCacheError::Cache(e.to_string()))
580}
581
582#[async_trait::async_trait]
583impl<T: CacheManager> reqwest_middleware::Middleware for Cache<T> {
584    async fn handle(
585        &self,
586        req: Request,
587        extensions: &mut Extensions,
588        next: Next<'_>,
589    ) -> std::result::Result<Response, Error> {
590        let mut middleware = ReqwestMiddleware { req, next, extensions };
591        let can_cache =
592            self.0.can_cache_request(&middleware).map_err(from_box_error)?;
593
594        if can_cache {
595            let res = self.0.run(middleware).await.map_err(from_box_error)?;
596            let converted = convert_response(res).map_err(|e| {
597                to_middleware_error(HttpCacheError::Cache(e.to_string()))
598            })?;
599            Ok(converted)
600        } else {
601            self.0
602                .run_no_cache(&mut middleware)
603                .await
604                .map_err(from_box_error)?;
605            let mut res = middleware
606                .next
607                .run(middleware.req, middleware.extensions)
608                .await?;
609
610            let miss =
611                HeaderValue::from_str(HitOrMiss::MISS.to_string().as_ref())
612                    .map_err(bad_header)?;
613            res.headers_mut().insert(XCACHE, miss.clone());
614            res.headers_mut().insert(XCACHELOOKUP, miss);
615            Ok(res)
616        }
617    }
618}
619
620#[cfg(feature = "streaming")]
621#[async_trait::async_trait]
622impl<T: StreamingCacheManager> reqwest_middleware::Middleware
623    for StreamingCache<T>
624where
625    T::Body: Send + 'static,
626    <T::Body as http_body::Body>::Data: Send,
627    <T::Body as http_body::Body>::Error:
628        Into<http_cache::StreamingError> + Send + Sync + 'static,
629{
630    async fn handle(
631        &self,
632        req: Request,
633        extensions: &mut Extensions,
634        next: Next<'_>,
635    ) -> std::result::Result<Response, Error> {
636        use http_cache::HttpCacheStreamInterface;
637
638        // Convert reqwest Request to http::Request for analysis
639        // If the request can't be cloned (e.g., streaming body), bypass cache gracefully
640        let copied_req = match clone_req(&req) {
641            Ok(req) => req,
642            Err(_) => {
643                // Request has non-cloneable body (streaming/multipart), bypass cache
644                let response = next.run(req, extensions).await?;
645                return Ok(response);
646            }
647        };
648        let http_req = match http::Request::try_from(copied_req) {
649            Ok(r) => r,
650            Err(e) => {
651                return Err(to_middleware_error(HttpCacheError::Cache(
652                    e.to_string(),
653                )))
654            }
655        };
656        let (parts, _) = http_req.into_parts();
657
658        // Check for mode override from extensions
659        let mode_override = extensions.get::<CacheMode>().cloned();
660
661        // Analyze the request for caching behavior
662        let analysis = match self.cache.analyze_request(&parts, mode_override) {
663            Ok(a) => a,
664            Err(e) => {
665                return Err(to_middleware_error(HttpCacheError::Cache(
666                    e.to_string(),
667                )))
668            }
669        };
670
671        // Check if we should bypass cache entirely
672        if !analysis.should_cache {
673            let response = next.run(req, extensions).await?;
674            return Ok(response);
675        }
676
677        // Look up cached response
678        if let Some((cached_response, policy)) = self
679            .cache
680            .lookup_cached_response(&analysis.cache_key)
681            .await
682            .map_err(|e| {
683                to_middleware_error(HttpCacheError::Cache(e.to_string()))
684            })?
685        {
686            // Check if cached response is still fresh
687            use http_cache_semantics::BeforeRequest;
688            let before_req = policy.before_request(&parts, SystemTime::now());
689            match before_req {
690                BeforeRequest::Fresh(_fresh_parts) => {
691                    // Convert cached streaming response back to reqwest Response
692                    // Now using streaming instead of buffering!
693                    let mut cached_response = cached_response;
694
695                    // Add cache status headers if enabled
696                    if self.cache.options.cache_status_headers {
697                        cached_response = add_cache_status_headers_to_response(
698                            cached_response,
699                            "HIT",
700                            "HIT",
701                        );
702                    }
703
704                    return convert_streaming_body_to_reqwest::<T>(
705                        cached_response,
706                    )
707                    .await
708                    .map_err(|e| {
709                        to_middleware_error(HttpCacheError::Cache(
710                            e.to_string(),
711                        ))
712                    });
713                }
714                BeforeRequest::Stale { request: conditional_parts, .. } => {
715                    // Apply rate limiting before revalidation request
716                    #[cfg(feature = "rate-limiting")]
717                    if let Some(rate_limiter) = &self.cache.options.rate_limiter
718                    {
719                        let url = req.url().clone();
720                        let rate_limit_key =
721                            url.host_str().unwrap_or("unknown");
722                        rate_limiter.until_key_ready(rate_limit_key).await;
723                    }
724
725                    // Create conditional request
726                    let mut conditional_req = req;
727                    for (name, value) in conditional_parts.headers.iter() {
728                        conditional_req
729                            .headers_mut()
730                            .insert(name.clone(), value.clone());
731                    }
732
733                    let conditional_response =
734                        next.run(conditional_req, extensions).await?;
735
736                    if conditional_response.status() == 304 {
737                        // Convert reqwest response parts for handling not modified
738                        let (fresh_parts, _) =
739                            convert_reqwest_response_to_http_parts(
740                                conditional_response,
741                            )
742                            .map_err(|e| {
743                                to_middleware_error(HttpCacheError::Cache(
744                                    e.to_string(),
745                                ))
746                            })?;
747                        let updated_response = self
748                            .cache
749                            .handle_not_modified(cached_response, &fresh_parts)
750                            .await
751                            .map_err(|e| {
752                                to_middleware_error(HttpCacheError::Cache(
753                                    e.to_string(),
754                                ))
755                            })?;
756
757                        let mut final_response = updated_response;
758
759                        // Add cache status headers if enabled
760                        if self.cache.options.cache_status_headers {
761                            final_response =
762                                add_cache_status_headers_to_response(
763                                    final_response,
764                                    "HIT",
765                                    "HIT",
766                                );
767                        }
768
769                        return convert_streaming_body_to_reqwest::<T>(
770                            final_response,
771                        )
772                        .await
773                        .map_err(|e| {
774                            to_middleware_error(HttpCacheError::Cache(
775                                e.to_string(),
776                            ))
777                        });
778                    } else {
779                        // Fresh response received, process it through the cache
780                        let http_response =
781                            convert_reqwest_response_to_http_full_body(
782                                conditional_response,
783                            )
784                            .await
785                            .map_err(|e| {
786                                to_middleware_error(HttpCacheError::Cache(
787                                    e.to_string(),
788                                ))
789                            })?;
790                        let cached_response = self
791                            .cache
792                            .process_response(analysis, http_response)
793                            .await
794                            .map_err(|e| {
795                                to_middleware_error(HttpCacheError::Cache(
796                                    e.to_string(),
797                                ))
798                            })?;
799
800                        let mut final_response = cached_response;
801
802                        // Add cache status headers if enabled
803                        if self.cache.options.cache_status_headers {
804                            final_response =
805                                add_cache_status_headers_to_response(
806                                    final_response,
807                                    "MISS",
808                                    "MISS",
809                                );
810                        }
811
812                        return convert_streaming_body_to_reqwest::<T>(
813                            final_response,
814                        )
815                        .await
816                        .map_err(|e| {
817                            to_middleware_error(HttpCacheError::Cache(
818                                e.to_string(),
819                            ))
820                        });
821                    }
822                }
823            }
824        }
825
826        // Apply rate limiting before fresh request
827        #[cfg(feature = "rate-limiting")]
828        if let Some(rate_limiter) = &self.cache.options.rate_limiter {
829            let url = req.url().clone();
830            let rate_limit_key = url.host_str().unwrap_or("unknown");
831            rate_limiter.until_key_ready(rate_limit_key).await;
832        }
833
834        // Fetch fresh response from upstream
835        let response = next.run(req, extensions).await?;
836        let http_response =
837            convert_reqwest_response_to_http_full_body(response)
838                .await
839                .map_err(|e| {
840                    to_middleware_error(HttpCacheError::Cache(e.to_string()))
841                })?;
842
843        // Process and potentially cache the response
844        let cached_response = self
845            .cache
846            .process_response(analysis, http_response)
847            .await
848            .map_err(|e| {
849                to_middleware_error(HttpCacheError::Cache(e.to_string()))
850            })?;
851
852        let mut final_response = cached_response;
853
854        // Add cache status headers if enabled
855        if self.cache.options.cache_status_headers {
856            final_response = add_cache_status_headers_to_response(
857                final_response,
858                "MISS",
859                "MISS",
860            );
861        }
862
863        convert_streaming_body_to_reqwest::<T>(final_response).await.map_err(
864            |e| to_middleware_error(HttpCacheError::Cache(e.to_string())),
865        )
866    }
867}
868
869#[cfg(test)]
870mod test;