zng_task/http/
cache.rs

1use std::{
2    fmt,
3    time::{Duration, SystemTime},
4};
5
6use super::{Body, Error};
7use async_trait::async_trait;
8use serde::*;
9use zng_unit::*;
10
11use http_cache_semantics as hcs;
12
13pub(super) use hcs::BeforeRequest;
14
15/// Represents a serializable configuration for a cache entry in a [`CacheDb`].
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct CachePolicy(PolicyInner);
18impl CachePolicy {
19    pub(super) fn new(request: &isahc::Request<super::Body>, response: &isahc::Response<isahc::AsyncBody>) -> Self {
20        let p = hcs::CachePolicy::new_options(
21            request,
22            response,
23            SystemTime::now(),
24            hcs::CacheOptions {
25                shared: false,
26                ignore_cargo_cult: true,
27                ..Default::default()
28            },
29        );
30        Self(PolicyInner::Policy(p))
31    }
32
33    pub(super) fn should_store(&self) -> bool {
34        match &self.0 {
35            PolicyInner::Policy(p) => p.is_storable() && p.time_to_live(SystemTime::now()) > 5.secs(),
36            PolicyInner::Permanent(_) => true,
37        }
38    }
39
40    pub(super) fn new_permanent(response: &isahc::Response<isahc::AsyncBody>) -> Self {
41        let p = PermanentHeader {
42            res: response.headers().clone(),
43            status: response.status(),
44        };
45        Self(PolicyInner::Permanent(p))
46    }
47
48    pub(super) fn is_permanent(&self) -> bool {
49        matches!(self.0, PolicyInner::Permanent(_))
50    }
51
52    pub(super) fn before_request(&self, request: &isahc::Request<super::Body>) -> BeforeRequest {
53        match &self.0 {
54            PolicyInner::Policy(p) => p.before_request(request, SystemTime::now()),
55            PolicyInner::Permanent(p) => BeforeRequest::Fresh(p.parts()),
56        }
57    }
58
59    pub(super) fn after_response(
60        &self,
61        request: &isahc::Request<super::Body>,
62        response: &isahc::Response<isahc::AsyncBody>,
63    ) -> AfterResponse {
64        match &self.0 {
65            PolicyInner::Policy(p) => p.after_response(request, response, SystemTime::now()).into(),
66            PolicyInner::Permanent(_) => unreachable!(), // don't call `after_response` for `Fresh` `before_request`
67        }
68    }
69
70    /// Returns how long the response has been sitting in cache.
71    pub fn age(&self, now: SystemTime) -> Duration {
72        match &self.0 {
73            PolicyInner::Policy(p) => p.age(now),
74            PolicyInner::Permanent(_) => Duration::MAX,
75        }
76    }
77
78    /// Returns approximate time in milliseconds until the response becomes stale.
79    pub fn time_to_live(&self, now: SystemTime) -> Duration {
80        match &self.0 {
81            PolicyInner::Policy(p) => p.time_to_live(now),
82            PolicyInner::Permanent(_) => Duration::MAX,
83        }
84    }
85
86    /// Returns `true` if the cache entry has expired.
87    pub fn is_stale(&self, now: SystemTime) -> bool {
88        match &self.0 {
89            PolicyInner::Policy(p) => p.is_stale(now),
90            PolicyInner::Permanent(_) => false,
91        }
92    }
93}
94impl From<hcs::CachePolicy> for CachePolicy {
95    fn from(p: hcs::CachePolicy) -> Self {
96        CachePolicy(PolicyInner::Policy(p))
97    }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[allow(clippy::large_enum_variant)]
102enum PolicyInner {
103    Policy(hcs::CachePolicy),
104    Permanent(PermanentHeader),
105}
106#[derive(Debug, Clone, Serialize, Deserialize)]
107struct PermanentHeader {
108    #[serde(with = "http_serde::header_map")]
109    res: super::header::HeaderMap,
110    #[serde(with = "http_serde::status_code")]
111    status: super::StatusCode,
112}
113impl PermanentHeader {
114    pub fn parts(&self) -> isahc::http::response::Parts {
115        let mut r = isahc::Response::<()>::default().into_parts().0;
116        r.headers = self.res.clone();
117        r.status = self.status;
118        r
119    }
120}
121
122/// New policy and flags to act on `after_response()`
123pub(super) enum AfterResponse {
124    /// You can use the cached body! Make sure to use these updated headers
125    NotModified(CachePolicy, isahc::http::response::Parts),
126    /// You need to update the body in the cache
127    Modified(CachePolicy, isahc::http::response::Parts),
128}
129impl From<hcs::AfterResponse> for AfterResponse {
130    fn from(s: hcs::AfterResponse) -> Self {
131        match s {
132            hcs::AfterResponse::NotModified(po, pa) => AfterResponse::NotModified(po.into(), pa),
133            hcs::AfterResponse::Modified(po, pa) => AfterResponse::Modified(po.into(), pa),
134        }
135    }
136}
137
138/// Represents a download cache in a [`Client`].
139///
140/// Cache implementers must store a [`CachePolicy`] and [`Body`] for a given [`CacheKey`].
141///
142/// [`Client`]: crate::http::Client
143#[async_trait]
144pub trait CacheDb: Send + Sync + 'static {
145    /// Dynamic clone.
146    fn clone_boxed(&self) -> Box<dyn CacheDb>;
147
148    /// Retrieves the cache-policy for the given `key`.
149    async fn policy(&self, key: &CacheKey) -> Option<CachePolicy>;
150
151    /// Replaces the cache-policy for the given `key`.
152    ///
153    /// Returns `false` if the entry does not exist.
154    async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool;
155
156    /// Read/clone the cached body for the given `key`.
157    async fn body(&self, key: &CacheKey) -> Option<Body>;
158
159    /// Caches the `policy` and `body` for the given `key`.
160    ///
161    /// The `body` is fully downloaded and stored into the cache, this method can await for the full download
162    /// before returning or return immediately with a body that updates as data is cached.
163    ///
164    /// In case of error the cache entry is removed, the returned body may continue downloading data if possible.
165    /// In case of a cache entry creation error the input `body` may be returned if it was not lost in the error.
166    async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body>;
167
168    /// Remove cached policy and body for the given `key`.
169    async fn remove(&self, key: &CacheKey);
170
171    /// Remove all cached entries that are not locked in a `set*` operation.
172    async fn purge(&self);
173
174    /// Remove cache entries to reduce pressure.
175    ///
176    /// What entries are removed depends on the cache DB implementer.
177    async fn prune(&self);
178}
179
180/// Cache mode selected for a [`Uri`].
181///
182/// See [`ClientBuilder::cache_mode`] for more information.
183///
184/// [`Uri`]: crate::http::Uri
185///
186/// [`ClientBuilder::cache_mode`]: crate::http::ClientBuilder::cache_mode
187#[derive(Debug, Clone, Default)]
188pub enum CacheMode {
189    /// Always requests the server, never caches the response.
190    NoCache,
191
192    /// Follow the standard cache policy as computed by [`http-cache-semantics`].
193    ///
194    /// [`http-cache-semantics`]: https://docs.rs/http-cache-semantics
195    #[default]
196    Default,
197
198    /// Always caches the response, overwriting cache control configs.
199    ///
200    /// If the response is cached returns it instead of requesting an update.
201    Permanent,
202
203    /// Returns the error.
204    Error(Error),
205}
206
207/// Represents a SHA-512/256 hash computed from a normalized request.
208#[derive(Debug, Clone, PartialEq, Eq, Hash)]
209pub struct CacheKey([u8; 32]);
210impl CacheKey {
211    /// Compute key from request.
212    pub fn from_request(request: &super::Request) -> Self {
213        Self::new(&request.req)
214    }
215
216    pub(super) fn new(request: &isahc::Request<super::Body>) -> Self {
217        let mut headers: Vec<_> = request.headers().iter().map(|(n, v)| (n.clone(), v.clone())).collect();
218
219        headers.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
220
221        use sha2::Digest;
222
223        let mut m = sha2::Sha512_256::new();
224        m.update(request.uri().to_string().as_bytes());
225        m.update(request.method().as_str());
226        for (name, value) in headers {
227            m.update(name.as_str().as_bytes());
228            m.update(value.as_bytes());
229        }
230        let hash = m.finalize();
231
232        CacheKey(hash.into())
233    }
234
235    /// Returns the SHA-512/256 hash.
236    pub fn sha(&self) -> [u8; 32] {
237        self.0
238    }
239
240    /// Computes a URI safe base64 encoded SHA-512/256 from the key data.
241    pub fn sha_str(&self) -> String {
242        use base64::*;
243
244        let hash = self.sha();
245        base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&hash[..])
246    }
247}
248impl fmt::Display for CacheKey {
249    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250        write!(f, "{}", self.sha_str())
251    }
252}
253
254pub use file_cache::FileSystemCache;
255
256mod file_cache {
257    use std::{
258        fs::{self, File, OpenOptions},
259        io::{self, Read, Write},
260        path::{Path, PathBuf},
261    };
262
263    use crate::http::util::{lock_exclusive, lock_shared, unlock_ok};
264    use crate::{
265        self as task,
266        io::{McBufErrorExt, McBufReader},
267    };
268    use async_trait::async_trait;
269    use fs4::fs_std::FileExt;
270    use zng_unit::TimeUnits;
271
272    use super::*;
273
274    /// A simple [`CacheDb`] implementation that uses a local directory.
275    ///
276    /// # Implementation Details
277    ///
278    /// A file lock is used to control data access, read operations use a shared lock so concurrent reads can happen,
279    /// the [`set`] operation uses a exclusive lock for the duration of the body download, so subsequent requests for
280    /// a caching resource will await until the cache is completed to return a body that will then read the cached data.
281    ///
282    /// The [`set`] operation returns a body as soon as the entry is created, the body will receive data as it is downloaded and cached,
283    /// in case of a cache error mid-download the cache entry is removed but the returned body will still download the rest of the data.
284    /// In case of an error creating the entry the original body is always returned so the [`Client`] can continue with a normal
285    /// download also.
286    ///
287    /// The cache does not pull data, only data read by the returned body is written to the cache, dropping the body without reading
288    /// to end cancels the cache entry.
289    ///
290    /// [`Client`]: crate::http::Client
291    /// [`set`]: crate::http::CacheDb::set
292    #[derive(Clone)]
293    pub struct FileSystemCache {
294        dir: PathBuf,
295    }
296    impl FileSystemCache {
297        /// Open the cache in `dir` or create it.
298        pub fn new(dir: impl Into<PathBuf>) -> io::Result<Self> {
299            let dir = dir.into();
300            std::fs::create_dir_all(&dir)?;
301
302            Ok(FileSystemCache { dir })
303        }
304
305        async fn entry(&self, key: &CacheKey, write: bool) -> Option<CacheEntry> {
306            let dir = self.dir.clone();
307            let key = key.sha_str();
308            task::wait(move || CacheEntry::open(dir.join(key), write)).await
309        }
310    }
311    #[async_trait]
312    impl CacheDb for FileSystemCache {
313        fn clone_boxed(&self) -> Box<dyn CacheDb> {
314            Box::new(self.clone())
315        }
316
317        async fn policy(&self, key: &CacheKey) -> Option<CachePolicy> {
318            self.entry(key, false).await.map(|mut e| e.policy.take().unwrap())
319        }
320        async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool {
321            if let Some(entry) = self.entry(key, true).await {
322                task::wait(move || entry.write_policy(policy)).await
323            } else {
324                false
325            }
326        }
327
328        async fn body(&self, key: &CacheKey) -> Option<Body> {
329            self.entry(key, false).await?.open_body().await
330        }
331        async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body> {
332            match self.entry(key, true).await {
333                Some(entry) => {
334                    let (entry, ok) = task::wait(move || {
335                        let ok = entry.write_policy(policy);
336                        (entry, ok)
337                    })
338                    .await;
339
340                    if ok { Some(entry.write_body(body).await) } else { Some(body) }
341                }
342                _ => Some(body),
343            }
344        }
345
346        async fn remove(&self, key: &CacheKey) {
347            if let Some(entry) = self.entry(key, true).await {
348                task::wait(move || {
349                    CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
350                })
351                .await
352            }
353        }
354
355        async fn purge(&self) {
356            let dir = self.dir.clone();
357            task::wait(move || {
358                if let Ok(entries) = std::fs::read_dir(dir) {
359                    for entry in entries.flatten() {
360                        let entry = entry.path();
361                        if entry.is_dir()
362                            && let Ok(lock) = File::open(entry.join(CacheEntry::LOCK))
363                            && FileExt::try_lock_shared(&lock).is_ok()
364                        {
365                            CacheEntry::try_delete_locked_dir(&entry, &lock);
366                        }
367                    }
368                }
369            })
370            .await
371        }
372
373        async fn prune(&self) {
374            let dir = self.dir.clone();
375            task::wait(move || {
376                if let Ok(entries) = std::fs::read_dir(dir) {
377                    let now = SystemTime::now();
378                    let old = (24 * 3).hours();
379
380                    for entry in entries.flatten() {
381                        let entry = entry.path();
382                        if entry.is_dir()
383                            && let Some(entry) = CacheEntry::open(entry, false)
384                        {
385                            let policy = entry.policy.as_ref().unwrap();
386                            if policy.is_stale(now) && policy.age(now) > old {
387                                CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
388                            }
389                        }
390                    }
391                }
392            })
393            .await
394        }
395    }
396
397    struct CacheEntry {
398        dir: PathBuf,
399        lock: File,
400
401        policy: Option<CachePolicy>,
402    }
403    impl CacheEntry {
404        const LOCK: &'static str = ".lock";
405        const WRITING: &'static str = ".w";
406        const POLICY: &'static str = ".policy";
407        const BODY: &'static str = ".body";
408
409        /// Open or create an entry.
410        fn open(dir: PathBuf, write: bool) -> Option<Self> {
411            if write
412                && !dir.exists()
413                && let Err(e) = fs::create_dir_all(&dir)
414            {
415                tracing::error!("cache dir error, {e:?}");
416                return None;
417            }
418
419            let lock = dir.join(Self::LOCK);
420            let mut opt = OpenOptions::new();
421            if write {
422                opt.read(true).write(true).create(true);
423            } else {
424                opt.read(true);
425            }
426
427            let mut lock = match opt.open(lock) {
428                Ok(l) => l,
429                Err(e) if e.kind() == std::io::ErrorKind::NotFound && !dir.exists() => return None,
430                Err(e) => {
431                    tracing::error!("cache lock open error, {e:?}");
432                    Self::try_delete_dir(&dir);
433                    return None;
434                }
435            };
436
437            const TIMEOUT: Duration = Duration::from_secs(10);
438
439            let lock_r = if write {
440                lock_exclusive(&lock, TIMEOUT)
441            } else {
442                lock_shared(&lock, TIMEOUT)
443            };
444            if let Err(e) = lock_r {
445                tracing::error!("cache lock error, {e:?}");
446                Self::try_delete_dir(&dir);
447                return None;
448            }
449
450            let mut version = String::new();
451            if let Err(e) = lock.read_to_string(&mut version) {
452                tracing::error!("cache lock read error, {e:?}");
453                Self::try_delete_locked_dir(&dir, &lock);
454                return None;
455            }
456
457            let expected_version = "zng::http::FileCache 1.0";
458            if version != expected_version {
459                if write && version.is_empty() {
460                    if let Err(e) = lock.set_len(0).and_then(|()| lock.write_all(expected_version.as_bytes())) {
461                        tracing::error!("cache lock write error, {e:?}");
462                        Self::try_delete_locked_dir(&dir, &lock);
463                        return None;
464                    }
465                } else {
466                    tracing::error!("unknown cache version, {version:?}");
467                    Self::try_delete_locked_dir(&dir, &lock);
468                    return None;
469                }
470            }
471
472            let policy_file = dir.join(Self::POLICY);
473
474            if dir.join(Self::WRITING).exists() {
475                tracing::error!("cache has partial files, removing");
476
477                if write {
478                    if let Err(e) = Self::remove_files(&dir) {
479                        tracing::error!("failed to clear partial files, {e:?}");
480                        Self::try_delete_locked_dir(&dir, &lock);
481                        return None;
482                    }
483                } else {
484                    Self::try_delete_locked_dir(&dir, &lock);
485                    return None;
486                }
487            }
488
489            if policy_file.exists() {
490                let policy = match Self::read_policy(&policy_file) {
491                    Ok(i) => i,
492                    Err(e) => {
493                        tracing::error!("cache policy read error, {e:?}");
494                        Self::try_delete_locked_dir(&dir, &lock);
495                        return None;
496                    }
497                };
498
499                Some(Self {
500                    dir,
501                    lock,
502                    policy: Some(policy),
503                })
504            } else if write {
505                Some(Self { dir, lock, policy: None })
506            } else {
507                tracing::error!("cache policy missing");
508                Self::try_delete_locked_dir(&dir, &lock);
509                None
510            }
511        }
512        fn read_policy(file: &Path) -> Result<CachePolicy, Box<dyn std::error::Error>> {
513            let file = std::fs::File::open(file)?;
514            let file = std::io::BufReader::new(file);
515            let policy = serde_json::from_reader(file)?;
516            Ok(policy)
517        }
518
519        /// Replace the .policy content, returns `true` if the entry still exists.
520        pub fn write_policy(&self, policy: CachePolicy) -> bool {
521            let w_tag = if let Some(t) = self.writing_tag() {
522                t
523            } else {
524                return false;
525            };
526
527            if let Err(e) = self.write_policy_impl(policy) {
528                tracing::error!("cache policy serialize error, {e:?}");
529                Self::try_delete_locked_dir(&self.dir, &self.lock);
530                return false;
531            }
532
533            let _ = fs::remove_file(w_tag);
534
535            true
536        }
537        fn write_policy_impl(&self, policy: CachePolicy) -> Result<(), Box<dyn std::error::Error>> {
538            let file = std::fs::File::create(self.dir.join(Self::POLICY))?;
539            serde_json::to_writer(file, &policy)?;
540            Ok(())
541        }
542
543        /// Start reading the body content, returns `Some(_)` if the entry still exists.
544        pub async fn open_body(&self) -> Option<Body> {
545            match task::fs::File::open(self.dir.join(Self::BODY)).await {
546                Ok(body) => {
547                    if let Ok(metadata) = body.metadata().await {
548                        Some(Body::from_reader_sized(task::io::BufReader::new(body), metadata.len()))
549                    } else {
550                        Some(Body::from_reader(task::io::BufReader::new(body)))
551                    }
552                }
553                Err(e) => {
554                    tracing::error!("cache open body error, {e:?}");
555                    Self::try_delete_locked_dir(&self.dir, &self.lock);
556                    None
557                }
558            }
559        }
560
561        /// Start downloading and writing a copy of the body to the cache entry.
562        pub async fn write_body(self, body: Body) -> Body {
563            let w_tag = if let Some(t) = self.writing_tag() {
564                t
565            } else {
566                return body;
567            };
568
569            match task::fs::File::create(self.dir.join(Self::BODY)).await {
570                Ok(cache_body) => {
571                    let cache_body = task::io::BufWriter::new(cache_body);
572                    let len = body.len();
573                    let mut cache_copy = McBufReader::new(body);
574                    let body_copy = cache_copy.clone();
575                    cache_copy.set_lazy(true); // don't read more than body, gets error if body is dropped before EOF.
576
577                    task::spawn(async move {
578                        if let Err(e) = task::io::copy(cache_copy, cache_body).await {
579                            if e.is_only_lazy_left() {
580                                tracing::warn!("cache cancel");
581                            } else {
582                                tracing::error!("cache body write error, {e:?}");
583                            }
584                            // cleanup partial download, stopped by error of by user dropping body reader.
585                            Self::try_delete_locked_dir(&self.dir, &self.lock);
586                        } else {
587                            let _ = fs::remove_file(w_tag);
588                        }
589                    });
590
591                    if let Some(len) = len {
592                        Body::from_reader_sized(body_copy, len)
593                    } else {
594                        Body::from_reader(body_copy)
595                    }
596                }
597                Err(e) => {
598                    tracing::error!("cache body create error, {e:?}");
599                    Self::try_delete_locked_dir(&self.dir, &self.lock);
600                    body
601                }
602            }
603        }
604
605        fn try_delete_locked_dir(dir: &Path, lock: &File) {
606            let _ = unlock_ok(lock);
607            Self::try_delete_dir(dir);
608        }
609
610        fn try_delete_dir(dir: &Path) {
611            let _ = remove_dir_all::remove_dir_all(dir);
612        }
613
614        fn writing_tag(&self) -> Option<PathBuf> {
615            let tag = self.dir.join(Self::WRITING);
616
617            if let Err(e) = fs::write(&tag, "w") {
618                tracing::error!("cache write tag error, {e:?}");
619                Self::try_delete_locked_dir(&self.dir, &self.lock);
620                None
621            } else {
622                Some(tag)
623            }
624        }
625
626        fn remove_files(dir: &Path) -> std::io::Result<()> {
627            for file in [Self::BODY, Self::POLICY, Self::WRITING] {
628                fs::remove_file(dir.join(file))?
629            }
630            Ok(())
631        }
632    }
633    impl Drop for CacheEntry {
634        fn drop(&mut self) {
635            if let Err(e) = unlock_ok(&self.lock) {
636                tracing::error!("cache unlock error, {e:?}");
637                Self::try_delete_dir(&self.dir);
638            }
639        }
640    }
641}
642
643#[cfg(test)]
644mod tests {
645    use std::{path::PathBuf, time::SystemTime};
646
647    use zng_clone_move::async_clmv;
648
649    use crate::{
650        self as task,
651        http::{header::*, util::*, *},
652    };
653    use zng_unit::*;
654
655    #[test]
656    pub fn file_cache_miss() {
657        test_log();
658        let tmp = TestTempDir::new("file_cache_miss");
659
660        let test = FileSystemCache::new(&tmp).unwrap();
661        let request = Request::get("https://file_cache_miss.invalid/content").unwrap().build();
662        let key = CacheKey::from_request(&request);
663
664        let r = async_test(async move { test.policy(&key).await });
665
666        assert!(r.is_none());
667    }
668
669    #[test]
670    pub fn file_cache_set_no_headers() {
671        test_log();
672        let tmp = TestTempDir::new("file_cache_set_no_headers");
673
674        let test = FileSystemCache::new(&tmp).unwrap();
675        let request = Request::get("https://file_cache_set_no_headers.invalid/content").unwrap().build();
676        let response = Response::new_message(StatusCode::OK, "test content.");
677
678        let key = CacheKey::from_request(&request);
679        let policy = CachePolicy::new(&request.req, &response.0);
680
681        let (headers, body) = async_test(async move {
682            let (parts, body) = response.into_parts();
683
684            let body = test.set(&key, policy, body).await.unwrap();
685
686            let mut response = Response::from_parts(parts, body);
687
688            let body = response.text().await.unwrap();
689
690            (response.into_parts().0.headers, body)
691        });
692
693        assert_eq!(body, "test content.");
694        assert!(headers.is_empty());
695    }
696
697    #[test]
698    pub fn file_cache_set() {
699        test_log();
700        let tmp = TestTempDir::new("file_cache_set");
701
702        let test = FileSystemCache::new(&tmp).unwrap();
703        let request = Request::get("https://file_cache_set.invalid/content").unwrap().build();
704        let key = CacheKey::from_request(&request);
705
706        let mut headers = HeaderMap::default();
707        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
708        let body = Body::from_reader(task::io::Cursor::new("test content."));
709        let response = Response::new(StatusCode::OK, headers, body);
710
711        let policy = CachePolicy::new(&request.req, &response.0);
712
713        let (headers, body) = async_test(async move {
714            let (parts, body) = response.into_parts();
715
716            let body = test.set(&key, policy, body).await.unwrap();
717
718            let mut response = Response::from_parts(parts, body);
719
720            let body = response.text().await.unwrap();
721
722            (response.into_parts().0.headers, body)
723        });
724
725        assert_eq!(
726            headers.get(&header::CONTENT_LENGTH),
727            Some(&HeaderValue::from("test content.".len()))
728        );
729        assert_eq!(body, "test content.");
730    }
731
732    #[test]
733    pub fn file_cache_get_cached() {
734        test_log();
735        let tmp = TestTempDir::new("file_cache_get_cached");
736        let request = Request::get("https://file_cache_get_cached.invalid/content").unwrap().build();
737        let key = CacheKey::from_request(&request);
738
739        let test = FileSystemCache::new(&tmp).unwrap();
740
741        let mut headers = HeaderMap::default();
742        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
743        let body = Body::from_reader(task::io::Cursor::new("test content."));
744        let response = Response::new(StatusCode::OK, headers, body);
745
746        let policy = CachePolicy::new(&request.req, &response.0);
747
748        async_test(async_clmv!(key, {
749            let (_, body) = response.into_parts();
750
751            let mut body = test.set(&key, policy, body).await.unwrap();
752            Body::bytes(&mut body).await.unwrap();
753
754            drop(test);
755        }));
756
757        let test = FileSystemCache::new(&tmp).unwrap();
758
759        let body = async_test(async move {
760            let mut body = test.body(&key).await.unwrap();
761
762            body.text_utf8().await.unwrap()
763        });
764
765        assert_eq!(body, "test content.");
766    }
767
768    #[test]
769    pub fn file_cache_get_policy() {
770        test_log();
771        let tmp = TestTempDir::new("get_etag");
772
773        let test = FileSystemCache::new(&tmp).unwrap();
774
775        let request = Request::get("https://get_etag.invalid/content").unwrap().build();
776        let key = CacheKey::from_request(&request);
777
778        let mut headers = HeaderMap::default();
779        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
780        let response = Response::new(StatusCode::OK, headers, Body::from_reader(task::io::Cursor::new("test content.")));
781        let policy = CachePolicy::new(&request.req, &response.0);
782
783        let r_policy = async_test(async_clmv!(policy, {
784            let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
785            Body::bytes(&mut body).await.unwrap();
786
787            let test = FileSystemCache::new(&tmp).unwrap();
788
789            test.policy(&key).await.unwrap()
790        }));
791
792        let now = SystemTime::now();
793        assert_eq!(policy.age(now), r_policy.age(now));
794    }
795
796    #[test]
797    pub fn file_cache_concurrent_get() {
798        test_log();
799        let tmp = TestTempDir::new("file_cache_concurrent_get");
800        let request = Request::get("https://file_cache_concurrent_get.invalid/content").unwrap().build();
801        let key = CacheKey::from_request(&request);
802
803        let test = FileSystemCache::new(&tmp).unwrap();
804
805        let mut headers = HeaderMap::default();
806        headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
807        let body = Body::from_reader(task::io::Cursor::new("test content."));
808        let response = Response::new(StatusCode::OK, headers, body);
809        let policy = CachePolicy::new(&request.req, &response.0);
810
811        async_test(async_clmv!(key, {
812            let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
813            Body::bytes(&mut body).await.unwrap();
814
815            drop(test);
816        }));
817
818        async_test(async move {
819            let a = concurrent_get(tmp.path().to_owned(), key.clone());
820            let b = concurrent_get(tmp.path().to_owned(), key.clone());
821            let c = concurrent_get(tmp.path().to_owned(), key);
822
823            task::all!(a, b, c).await;
824        });
825    }
826    async fn concurrent_get(tmp: PathBuf, body: CacheKey) {
827        task::run(async move {
828            let test = FileSystemCache::new(&tmp).unwrap();
829
830            let mut body = test.body(&body).await.unwrap();
831
832            let body = body.text_utf8().await.unwrap();
833
834            assert_eq!(body, "test content.");
835        })
836        .await
837    }
838
839    #[test]
840    pub fn file_cache_concurrent_set() {
841        test_log();
842        let tmp = TestTempDir::new("file_cache_concurrent_set");
843        let uri = Uri::try_from("https://file_cache_concurrent_set.invalid/content").unwrap();
844
845        async_test(async move {
846            let a = concurrent_set(tmp.path().to_owned(), uri.clone());
847            let b = concurrent_set(tmp.path().to_owned(), uri.clone());
848            let c = concurrent_set(tmp.path().to_owned(), uri);
849
850            task::all!(a, b, c).await;
851        });
852    }
853    async fn concurrent_set(tmp: PathBuf, uri: Uri) {
854        task::run(async move {
855            let test = FileSystemCache::new(&tmp).unwrap();
856
857            let request = Request::get(uri).unwrap().build();
858            let key = CacheKey::from_request(&request);
859
860            let mut headers = HeaderMap::default();
861            headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
862            let body = Body::from_reader(task::io::Cursor::new("test content."));
863            let response = Response::new(StatusCode::OK, headers, body);
864
865            let policy = CachePolicy::new(&request.req, &response.0);
866
867            let (headers, body) = async move {
868                let (parts, body) = response.into_parts();
869
870                let body = test.set(&key, policy, body).await.unwrap();
871                let mut response = Response::from_parts(parts, body);
872
873                let body = response.text().await.unwrap();
874
875                (response.into_parts().0.headers, body)
876            }
877            .await;
878
879            assert_eq!(
880                headers.get(&header::CONTENT_LENGTH),
881                Some(&HeaderValue::from("test content.".len()))
882            );
883            assert_eq!(body, "test content.");
884        })
885        .await
886    }
887
888    #[track_caller]
889    fn async_test<F>(test: F) -> F::Output
890    where
891        F: Future,
892    {
893        task::block_on(task::with_deadline(test, 30.secs())).unwrap()
894    }
895}