1use std::{
2 fmt,
3 time::{Duration, SystemTime},
4};
5
6use super::{Body, Error};
7use async_trait::async_trait;
8use serde::*;
9use zng_unit::*;
10
11use http_cache_semantics as hcs;
12
13pub(super) use hcs::BeforeRequest;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct CachePolicy(PolicyInner);
18impl CachePolicy {
19 pub(super) fn new(request: &isahc::Request<super::Body>, response: &isahc::Response<isahc::AsyncBody>) -> Self {
20 let p = hcs::CachePolicy::new_options(
21 request,
22 response,
23 SystemTime::now(),
24 hcs::CacheOptions {
25 shared: false,
26 ignore_cargo_cult: true,
27 ..Default::default()
28 },
29 );
30 Self(PolicyInner::Policy(p))
31 }
32
33 pub(super) fn should_store(&self) -> bool {
34 match &self.0 {
35 PolicyInner::Policy(p) => p.is_storable() && p.time_to_live(SystemTime::now()) > 5.secs(),
36 PolicyInner::Permanent(_) => true,
37 }
38 }
39
40 pub(super) fn new_permanent(response: &isahc::Response<isahc::AsyncBody>) -> Self {
41 let p = PermanentHeader {
42 res: response.headers().clone(),
43 status: response.status(),
44 };
45 Self(PolicyInner::Permanent(p))
46 }
47
48 pub(super) fn is_permanent(&self) -> bool {
49 matches!(self.0, PolicyInner::Permanent(_))
50 }
51
52 pub(super) fn before_request(&self, request: &isahc::Request<super::Body>) -> BeforeRequest {
53 match &self.0 {
54 PolicyInner::Policy(p) => p.before_request(request, SystemTime::now()),
55 PolicyInner::Permanent(p) => BeforeRequest::Fresh(p.parts()),
56 }
57 }
58
59 pub(super) fn after_response(
60 &self,
61 request: &isahc::Request<super::Body>,
62 response: &isahc::Response<isahc::AsyncBody>,
63 ) -> AfterResponse {
64 match &self.0 {
65 PolicyInner::Policy(p) => p.after_response(request, response, SystemTime::now()).into(),
66 PolicyInner::Permanent(_) => unreachable!(), }
68 }
69
70 pub fn age(&self, now: SystemTime) -> Duration {
72 match &self.0 {
73 PolicyInner::Policy(p) => p.age(now),
74 PolicyInner::Permanent(_) => Duration::MAX,
75 }
76 }
77
78 pub fn time_to_live(&self, now: SystemTime) -> Duration {
80 match &self.0 {
81 PolicyInner::Policy(p) => p.time_to_live(now),
82 PolicyInner::Permanent(_) => Duration::MAX,
83 }
84 }
85
86 pub fn is_stale(&self, now: SystemTime) -> bool {
88 match &self.0 {
89 PolicyInner::Policy(p) => p.is_stale(now),
90 PolicyInner::Permanent(_) => false,
91 }
92 }
93}
94impl From<hcs::CachePolicy> for CachePolicy {
95 fn from(p: hcs::CachePolicy) -> Self {
96 CachePolicy(PolicyInner::Policy(p))
97 }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101#[allow(clippy::large_enum_variant)]
102enum PolicyInner {
103 Policy(hcs::CachePolicy),
104 Permanent(PermanentHeader),
105}
106#[derive(Debug, Clone, Serialize, Deserialize)]
107struct PermanentHeader {
108 #[serde(with = "http_serde::header_map")]
109 res: super::header::HeaderMap,
110 #[serde(with = "http_serde::status_code")]
111 status: super::StatusCode,
112}
113impl PermanentHeader {
114 pub fn parts(&self) -> isahc::http::response::Parts {
115 let mut r = isahc::Response::<()>::default().into_parts().0;
116 r.headers = self.res.clone();
117 r.status = self.status;
118 r
119 }
120}
121
122pub(super) enum AfterResponse {
124 NotModified(CachePolicy, isahc::http::response::Parts),
126 Modified(CachePolicy, isahc::http::response::Parts),
128}
129impl From<hcs::AfterResponse> for AfterResponse {
130 fn from(s: hcs::AfterResponse) -> Self {
131 match s {
132 hcs::AfterResponse::NotModified(po, pa) => AfterResponse::NotModified(po.into(), pa),
133 hcs::AfterResponse::Modified(po, pa) => AfterResponse::Modified(po.into(), pa),
134 }
135 }
136}
137
138#[async_trait]
144pub trait CacheDb: Send + Sync + 'static {
145 fn clone_boxed(&self) -> Box<dyn CacheDb>;
147
148 async fn policy(&self, key: &CacheKey) -> Option<CachePolicy>;
150
151 async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool;
155
156 async fn body(&self, key: &CacheKey) -> Option<Body>;
158
159 async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body>;
167
168 async fn remove(&self, key: &CacheKey);
170
171 async fn purge(&self);
173
174 async fn prune(&self);
178}
179
180#[derive(Debug, Clone, Default)]
188pub enum CacheMode {
189 NoCache,
191
192 #[default]
196 Default,
197
198 Permanent,
202
203 Error(Error),
205}
206
207#[derive(Debug, Clone, PartialEq, Eq, Hash)]
209pub struct CacheKey([u8; 32]);
210impl CacheKey {
211 pub fn from_request(request: &super::Request) -> Self {
213 Self::new(&request.req)
214 }
215
216 pub(super) fn new(request: &isahc::Request<super::Body>) -> Self {
217 let mut headers: Vec<_> = request.headers().iter().map(|(n, v)| (n.clone(), v.clone())).collect();
218
219 headers.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
220
221 use sha2::Digest;
222
223 let mut m = sha2::Sha512_256::new();
224 m.update(request.uri().to_string().as_bytes());
225 m.update(request.method().as_str());
226 for (name, value) in headers {
227 m.update(name.as_str().as_bytes());
228 m.update(value.as_bytes());
229 }
230 let hash = m.finalize();
231
232 CacheKey(hash.into())
233 }
234
235 pub fn sha(&self) -> [u8; 32] {
237 self.0
238 }
239
240 pub fn sha_str(&self) -> String {
242 use base64::*;
243
244 let hash = self.sha();
245 base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(&hash[..])
246 }
247}
248impl fmt::Display for CacheKey {
249 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
250 write!(f, "{}", self.sha_str())
251 }
252}
253
254pub use file_cache::FileSystemCache;
255
256mod file_cache {
257 use std::{
258 fs::{self, File, OpenOptions},
259 io::{self, Read, Write},
260 path::{Path, PathBuf},
261 };
262
263 use crate::http::util::{lock_exclusive, lock_shared, unlock_ok};
264 use crate::{
265 self as task,
266 io::{McBufErrorExt, McBufReader},
267 };
268 use async_trait::async_trait;
269 use fs4::fs_std::FileExt;
270 use zng_unit::TimeUnits;
271
272 use super::*;
273
274 #[derive(Clone)]
293 pub struct FileSystemCache {
294 dir: PathBuf,
295 }
296 impl FileSystemCache {
297 pub fn new(dir: impl Into<PathBuf>) -> io::Result<Self> {
299 let dir = dir.into();
300 std::fs::create_dir_all(&dir)?;
301
302 Ok(FileSystemCache { dir })
303 }
304
305 async fn entry(&self, key: &CacheKey, write: bool) -> Option<CacheEntry> {
306 let dir = self.dir.clone();
307 let key = key.sha_str();
308 task::wait(move || CacheEntry::open(dir.join(key), write)).await
309 }
310 }
311 #[async_trait]
312 impl CacheDb for FileSystemCache {
313 fn clone_boxed(&self) -> Box<dyn CacheDb> {
314 Box::new(self.clone())
315 }
316
317 async fn policy(&self, key: &CacheKey) -> Option<CachePolicy> {
318 self.entry(key, false).await.map(|mut e| e.policy.take().unwrap())
319 }
320 async fn set_policy(&self, key: &CacheKey, policy: CachePolicy) -> bool {
321 if let Some(entry) = self.entry(key, true).await {
322 task::wait(move || entry.write_policy(policy)).await
323 } else {
324 false
325 }
326 }
327
328 async fn body(&self, key: &CacheKey) -> Option<Body> {
329 self.entry(key, false).await?.open_body().await
330 }
331 async fn set(&self, key: &CacheKey, policy: CachePolicy, body: Body) -> Option<Body> {
332 match self.entry(key, true).await {
333 Some(entry) => {
334 let (entry, ok) = task::wait(move || {
335 let ok = entry.write_policy(policy);
336 (entry, ok)
337 })
338 .await;
339
340 if ok { Some(entry.write_body(body).await) } else { Some(body) }
341 }
342 _ => Some(body),
343 }
344 }
345
346 async fn remove(&self, key: &CacheKey) {
347 if let Some(entry) = self.entry(key, true).await {
348 task::wait(move || {
349 CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
350 })
351 .await
352 }
353 }
354
355 async fn purge(&self) {
356 let dir = self.dir.clone();
357 task::wait(move || {
358 if let Ok(entries) = std::fs::read_dir(dir) {
359 for entry in entries.flatten() {
360 let entry = entry.path();
361 if entry.is_dir()
362 && let Ok(lock) = File::open(entry.join(CacheEntry::LOCK))
363 && FileExt::try_lock_shared(&lock).is_ok()
364 {
365 CacheEntry::try_delete_locked_dir(&entry, &lock);
366 }
367 }
368 }
369 })
370 .await
371 }
372
373 async fn prune(&self) {
374 let dir = self.dir.clone();
375 task::wait(move || {
376 if let Ok(entries) = std::fs::read_dir(dir) {
377 let now = SystemTime::now();
378 let old = (24 * 3).hours();
379
380 for entry in entries.flatten() {
381 let entry = entry.path();
382 if entry.is_dir()
383 && let Some(entry) = CacheEntry::open(entry, false)
384 {
385 let policy = entry.policy.as_ref().unwrap();
386 if policy.is_stale(now) && policy.age(now) > old {
387 CacheEntry::try_delete_locked_dir(&entry.dir, &entry.lock);
388 }
389 }
390 }
391 }
392 })
393 .await
394 }
395 }
396
397 struct CacheEntry {
398 dir: PathBuf,
399 lock: File,
400
401 policy: Option<CachePolicy>,
402 }
403 impl CacheEntry {
404 const LOCK: &'static str = ".lock";
405 const WRITING: &'static str = ".w";
406 const POLICY: &'static str = ".policy";
407 const BODY: &'static str = ".body";
408
409 fn open(dir: PathBuf, write: bool) -> Option<Self> {
411 if write
412 && !dir.exists()
413 && let Err(e) = fs::create_dir_all(&dir)
414 {
415 tracing::error!("cache dir error, {e:?}");
416 return None;
417 }
418
419 let lock = dir.join(Self::LOCK);
420 let mut opt = OpenOptions::new();
421 if write {
422 opt.read(true).write(true).create(true);
423 } else {
424 opt.read(true);
425 }
426
427 let mut lock = match opt.open(lock) {
428 Ok(l) => l,
429 Err(e) if e.kind() == std::io::ErrorKind::NotFound && !dir.exists() => return None,
430 Err(e) => {
431 tracing::error!("cache lock open error, {e:?}");
432 Self::try_delete_dir(&dir);
433 return None;
434 }
435 };
436
437 const TIMEOUT: Duration = Duration::from_secs(10);
438
439 let lock_r = if write {
440 lock_exclusive(&lock, TIMEOUT)
441 } else {
442 lock_shared(&lock, TIMEOUT)
443 };
444 if let Err(e) = lock_r {
445 tracing::error!("cache lock error, {e:?}");
446 Self::try_delete_dir(&dir);
447 return None;
448 }
449
450 let mut version = String::new();
451 if let Err(e) = lock.read_to_string(&mut version) {
452 tracing::error!("cache lock read error, {e:?}");
453 Self::try_delete_locked_dir(&dir, &lock);
454 return None;
455 }
456
457 let expected_version = "zng::http::FileCache 1.0";
458 if version != expected_version {
459 if write && version.is_empty() {
460 if let Err(e) = lock.set_len(0).and_then(|()| lock.write_all(expected_version.as_bytes())) {
461 tracing::error!("cache lock write error, {e:?}");
462 Self::try_delete_locked_dir(&dir, &lock);
463 return None;
464 }
465 } else {
466 tracing::error!("unknown cache version, {version:?}");
467 Self::try_delete_locked_dir(&dir, &lock);
468 return None;
469 }
470 }
471
472 let policy_file = dir.join(Self::POLICY);
473
474 if dir.join(Self::WRITING).exists() {
475 tracing::error!("cache has partial files, removing");
476
477 if write {
478 if let Err(e) = Self::remove_files(&dir) {
479 tracing::error!("failed to clear partial files, {e:?}");
480 Self::try_delete_locked_dir(&dir, &lock);
481 return None;
482 }
483 } else {
484 Self::try_delete_locked_dir(&dir, &lock);
485 return None;
486 }
487 }
488
489 if policy_file.exists() {
490 let policy = match Self::read_policy(&policy_file) {
491 Ok(i) => i,
492 Err(e) => {
493 tracing::error!("cache policy read error, {e:?}");
494 Self::try_delete_locked_dir(&dir, &lock);
495 return None;
496 }
497 };
498
499 Some(Self {
500 dir,
501 lock,
502 policy: Some(policy),
503 })
504 } else if write {
505 Some(Self { dir, lock, policy: None })
506 } else {
507 tracing::error!("cache policy missing");
508 Self::try_delete_locked_dir(&dir, &lock);
509 None
510 }
511 }
512 fn read_policy(file: &Path) -> Result<CachePolicy, Box<dyn std::error::Error>> {
513 let file = std::fs::File::open(file)?;
514 let file = std::io::BufReader::new(file);
515 let policy = serde_json::from_reader(file)?;
516 Ok(policy)
517 }
518
519 pub fn write_policy(&self, policy: CachePolicy) -> bool {
521 let w_tag = if let Some(t) = self.writing_tag() {
522 t
523 } else {
524 return false;
525 };
526
527 if let Err(e) = self.write_policy_impl(policy) {
528 tracing::error!("cache policy serialize error, {e:?}");
529 Self::try_delete_locked_dir(&self.dir, &self.lock);
530 return false;
531 }
532
533 let _ = fs::remove_file(w_tag);
534
535 true
536 }
537 fn write_policy_impl(&self, policy: CachePolicy) -> Result<(), Box<dyn std::error::Error>> {
538 let file = std::fs::File::create(self.dir.join(Self::POLICY))?;
539 serde_json::to_writer(file, &policy)?;
540 Ok(())
541 }
542
543 pub async fn open_body(&self) -> Option<Body> {
545 match task::fs::File::open(self.dir.join(Self::BODY)).await {
546 Ok(body) => {
547 if let Ok(metadata) = body.metadata().await {
548 Some(Body::from_reader_sized(task::io::BufReader::new(body), metadata.len()))
549 } else {
550 Some(Body::from_reader(task::io::BufReader::new(body)))
551 }
552 }
553 Err(e) => {
554 tracing::error!("cache open body error, {e:?}");
555 Self::try_delete_locked_dir(&self.dir, &self.lock);
556 None
557 }
558 }
559 }
560
561 pub async fn write_body(self, body: Body) -> Body {
563 let w_tag = if let Some(t) = self.writing_tag() {
564 t
565 } else {
566 return body;
567 };
568
569 match task::fs::File::create(self.dir.join(Self::BODY)).await {
570 Ok(cache_body) => {
571 let cache_body = task::io::BufWriter::new(cache_body);
572 let len = body.len();
573 let mut cache_copy = McBufReader::new(body);
574 let body_copy = cache_copy.clone();
575 cache_copy.set_lazy(true); task::spawn(async move {
578 if let Err(e) = task::io::copy(cache_copy, cache_body).await {
579 if e.is_only_lazy_left() {
580 tracing::warn!("cache cancel");
581 } else {
582 tracing::error!("cache body write error, {e:?}");
583 }
584 Self::try_delete_locked_dir(&self.dir, &self.lock);
586 } else {
587 let _ = fs::remove_file(w_tag);
588 }
589 });
590
591 if let Some(len) = len {
592 Body::from_reader_sized(body_copy, len)
593 } else {
594 Body::from_reader(body_copy)
595 }
596 }
597 Err(e) => {
598 tracing::error!("cache body create error, {e:?}");
599 Self::try_delete_locked_dir(&self.dir, &self.lock);
600 body
601 }
602 }
603 }
604
605 fn try_delete_locked_dir(dir: &Path, lock: &File) {
606 let _ = unlock_ok(lock);
607 Self::try_delete_dir(dir);
608 }
609
610 fn try_delete_dir(dir: &Path) {
611 let _ = remove_dir_all::remove_dir_all(dir);
612 }
613
614 fn writing_tag(&self) -> Option<PathBuf> {
615 let tag = self.dir.join(Self::WRITING);
616
617 if let Err(e) = fs::write(&tag, "w") {
618 tracing::error!("cache write tag error, {e:?}");
619 Self::try_delete_locked_dir(&self.dir, &self.lock);
620 None
621 } else {
622 Some(tag)
623 }
624 }
625
626 fn remove_files(dir: &Path) -> std::io::Result<()> {
627 for file in [Self::BODY, Self::POLICY, Self::WRITING] {
628 fs::remove_file(dir.join(file))?
629 }
630 Ok(())
631 }
632 }
633 impl Drop for CacheEntry {
634 fn drop(&mut self) {
635 if let Err(e) = unlock_ok(&self.lock) {
636 tracing::error!("cache unlock error, {e:?}");
637 Self::try_delete_dir(&self.dir);
638 }
639 }
640 }
641}
642
643#[cfg(test)]
644mod tests {
645 use std::{path::PathBuf, time::SystemTime};
646
647 use zng_clone_move::async_clmv;
648
649 use crate::{
650 self as task,
651 http::{header::*, util::*, *},
652 };
653 use zng_unit::*;
654
655 #[test]
656 pub fn file_cache_miss() {
657 test_log();
658 let tmp = TestTempDir::new("file_cache_miss");
659
660 let test = FileSystemCache::new(&tmp).unwrap();
661 let request = Request::get("https://file_cache_miss.invalid/content").unwrap().build();
662 let key = CacheKey::from_request(&request);
663
664 let r = async_test(async move { test.policy(&key).await });
665
666 assert!(r.is_none());
667 }
668
669 #[test]
670 pub fn file_cache_set_no_headers() {
671 test_log();
672 let tmp = TestTempDir::new("file_cache_set_no_headers");
673
674 let test = FileSystemCache::new(&tmp).unwrap();
675 let request = Request::get("https://file_cache_set_no_headers.invalid/content").unwrap().build();
676 let response = Response::new_message(StatusCode::OK, "test content.");
677
678 let key = CacheKey::from_request(&request);
679 let policy = CachePolicy::new(&request.req, &response.0);
680
681 let (headers, body) = async_test(async move {
682 let (parts, body) = response.into_parts();
683
684 let body = test.set(&key, policy, body).await.unwrap();
685
686 let mut response = Response::from_parts(parts, body);
687
688 let body = response.text().await.unwrap();
689
690 (response.into_parts().0.headers, body)
691 });
692
693 assert_eq!(body, "test content.");
694 assert!(headers.is_empty());
695 }
696
697 #[test]
698 pub fn file_cache_set() {
699 test_log();
700 let tmp = TestTempDir::new("file_cache_set");
701
702 let test = FileSystemCache::new(&tmp).unwrap();
703 let request = Request::get("https://file_cache_set.invalid/content").unwrap().build();
704 let key = CacheKey::from_request(&request);
705
706 let mut headers = HeaderMap::default();
707 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
708 let body = Body::from_reader(task::io::Cursor::new("test content."));
709 let response = Response::new(StatusCode::OK, headers, body);
710
711 let policy = CachePolicy::new(&request.req, &response.0);
712
713 let (headers, body) = async_test(async move {
714 let (parts, body) = response.into_parts();
715
716 let body = test.set(&key, policy, body).await.unwrap();
717
718 let mut response = Response::from_parts(parts, body);
719
720 let body = response.text().await.unwrap();
721
722 (response.into_parts().0.headers, body)
723 });
724
725 assert_eq!(
726 headers.get(&header::CONTENT_LENGTH),
727 Some(&HeaderValue::from("test content.".len()))
728 );
729 assert_eq!(body, "test content.");
730 }
731
732 #[test]
733 pub fn file_cache_get_cached() {
734 test_log();
735 let tmp = TestTempDir::new("file_cache_get_cached");
736 let request = Request::get("https://file_cache_get_cached.invalid/content").unwrap().build();
737 let key = CacheKey::from_request(&request);
738
739 let test = FileSystemCache::new(&tmp).unwrap();
740
741 let mut headers = HeaderMap::default();
742 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
743 let body = Body::from_reader(task::io::Cursor::new("test content."));
744 let response = Response::new(StatusCode::OK, headers, body);
745
746 let policy = CachePolicy::new(&request.req, &response.0);
747
748 async_test(async_clmv!(key, {
749 let (_, body) = response.into_parts();
750
751 let mut body = test.set(&key, policy, body).await.unwrap();
752 Body::bytes(&mut body).await.unwrap();
753
754 drop(test);
755 }));
756
757 let test = FileSystemCache::new(&tmp).unwrap();
758
759 let body = async_test(async move {
760 let mut body = test.body(&key).await.unwrap();
761
762 body.text_utf8().await.unwrap()
763 });
764
765 assert_eq!(body, "test content.");
766 }
767
768 #[test]
769 pub fn file_cache_get_policy() {
770 test_log();
771 let tmp = TestTempDir::new("get_etag");
772
773 let test = FileSystemCache::new(&tmp).unwrap();
774
775 let request = Request::get("https://get_etag.invalid/content").unwrap().build();
776 let key = CacheKey::from_request(&request);
777
778 let mut headers = HeaderMap::default();
779 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
780 let response = Response::new(StatusCode::OK, headers, Body::from_reader(task::io::Cursor::new("test content.")));
781 let policy = CachePolicy::new(&request.req, &response.0);
782
783 let r_policy = async_test(async_clmv!(policy, {
784 let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
785 Body::bytes(&mut body).await.unwrap();
786
787 let test = FileSystemCache::new(&tmp).unwrap();
788
789 test.policy(&key).await.unwrap()
790 }));
791
792 let now = SystemTime::now();
793 assert_eq!(policy.age(now), r_policy.age(now));
794 }
795
796 #[test]
797 pub fn file_cache_concurrent_get() {
798 test_log();
799 let tmp = TestTempDir::new("file_cache_concurrent_get");
800 let request = Request::get("https://file_cache_concurrent_get.invalid/content").unwrap().build();
801 let key = CacheKey::from_request(&request);
802
803 let test = FileSystemCache::new(&tmp).unwrap();
804
805 let mut headers = HeaderMap::default();
806 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
807 let body = Body::from_reader(task::io::Cursor::new("test content."));
808 let response = Response::new(StatusCode::OK, headers, body);
809 let policy = CachePolicy::new(&request.req, &response.0);
810
811 async_test(async_clmv!(key, {
812 let mut body = test.set(&key, policy, response.into_parts().1).await.unwrap();
813 Body::bytes(&mut body).await.unwrap();
814
815 drop(test);
816 }));
817
818 async_test(async move {
819 let a = concurrent_get(tmp.path().to_owned(), key.clone());
820 let b = concurrent_get(tmp.path().to_owned(), key.clone());
821 let c = concurrent_get(tmp.path().to_owned(), key);
822
823 task::all!(a, b, c).await;
824 });
825 }
826 async fn concurrent_get(tmp: PathBuf, body: CacheKey) {
827 task::run(async move {
828 let test = FileSystemCache::new(&tmp).unwrap();
829
830 let mut body = test.body(&body).await.unwrap();
831
832 let body = body.text_utf8().await.unwrap();
833
834 assert_eq!(body, "test content.");
835 })
836 .await
837 }
838
839 #[test]
840 pub fn file_cache_concurrent_set() {
841 test_log();
842 let tmp = TestTempDir::new("file_cache_concurrent_set");
843 let uri = Uri::try_from("https://file_cache_concurrent_set.invalid/content").unwrap();
844
845 async_test(async move {
846 let a = concurrent_set(tmp.path().to_owned(), uri.clone());
847 let b = concurrent_set(tmp.path().to_owned(), uri.clone());
848 let c = concurrent_set(tmp.path().to_owned(), uri);
849
850 task::all!(a, b, c).await;
851 });
852 }
853 async fn concurrent_set(tmp: PathBuf, uri: Uri) {
854 task::run(async move {
855 let test = FileSystemCache::new(&tmp).unwrap();
856
857 let request = Request::get(uri).unwrap().build();
858 let key = CacheKey::from_request(&request);
859
860 let mut headers = HeaderMap::default();
861 headers.insert(header::CONTENT_LENGTH, HeaderValue::from("test content.".len()));
862 let body = Body::from_reader(task::io::Cursor::new("test content."));
863 let response = Response::new(StatusCode::OK, headers, body);
864
865 let policy = CachePolicy::new(&request.req, &response.0);
866
867 let (headers, body) = async move {
868 let (parts, body) = response.into_parts();
869
870 let body = test.set(&key, policy, body).await.unwrap();
871 let mut response = Response::from_parts(parts, body);
872
873 let body = response.text().await.unwrap();
874
875 (response.into_parts().0.headers, body)
876 }
877 .await;
878
879 assert_eq!(
880 headers.get(&header::CONTENT_LENGTH),
881 Some(&HeaderValue::from("test content.".len()))
882 );
883 assert_eq!(body, "test content.");
884 })
885 .await
886 }
887
888 #[track_caller]
889 fn async_test<F>(test: F) -> F::Output
890 where
891 F: Future,
892 {
893 task::block_on(task::with_deadline(test, 30.secs())).unwrap()
894 }
895}