1use crate::error::{Error, Result};
16use crate::{FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, VersionType, merge_file_meta_versions};
17use rmp::Marker;
18use serde::{Deserialize, Serialize};
19use std::cmp::Ordering;
20use std::str::from_utf8;
21use std::{
22 fmt::Debug,
23 future::Future,
24 pin::Pin,
25 ptr,
26 sync::{
27 Arc,
28 atomic::{AtomicPtr, AtomicU64, Ordering as AtomicOrdering},
29 },
30 time::{Duration, SystemTime, UNIX_EPOCH},
31};
32use time::OffsetDateTime;
33use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
34use tokio::spawn;
35use tokio::sync::Mutex;
36use tracing::warn;
37
38const SLASH_SEPARATOR: &str = "/";
39
40#[derive(Clone, Debug, Default)]
41pub struct MetadataResolutionParams {
42 pub dir_quorum: usize,
43 pub obj_quorum: usize,
44 pub requested_versions: usize,
45 pub bucket: String,
46 pub strict: bool,
47 pub candidates: Vec<Vec<FileMetaShallowVersion>>,
48}
49
50#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
51pub struct MetaCacheEntry {
52 pub name: String,
54 pub metadata: Vec<u8>,
57
58 #[serde(skip)]
60 pub cached: Option<FileMeta>,
61
62 pub reusable: bool,
64}
65
66impl MetaCacheEntry {
67 pub fn marshal_msg(&self) -> Result<Vec<u8>> {
68 let mut wr = Vec::new();
69 rmp::encode::write_bool(&mut wr, true)?;
70 rmp::encode::write_str(&mut wr, &self.name)?;
71 rmp::encode::write_bin(&mut wr, &self.metadata)?;
72 Ok(wr)
73 }
74
75 pub fn is_dir(&self) -> bool {
76 self.metadata.is_empty() && self.name.ends_with('/')
77 }
78
79 pub fn is_in_dir(&self, dir: &str, separator: &str) -> bool {
80 if dir.is_empty() {
81 let idx = self.name.find(separator);
82 return idx.is_none() || idx.unwrap() == self.name.len() - separator.len();
83 }
84
85 let ext = self.name.trim_start_matches(dir);
86
87 if ext.len() != self.name.len() {
88 let idx = ext.find(separator);
89 return idx.is_none() || idx.unwrap() == ext.len() - separator.len();
90 }
91
92 false
93 }
94
95 pub fn is_object(&self) -> bool {
96 !self.metadata.is_empty()
97 }
98
99 pub fn is_object_dir(&self) -> bool {
100 !self.metadata.is_empty() && self.name.ends_with(SLASH_SEPARATOR)
101 }
102
103 pub fn is_latest_delete_marker(&mut self) -> bool {
104 if let Some(cached) = &self.cached {
105 if cached.versions.is_empty() {
106 return true;
107 }
108 return cached.versions[0].header.version_type == VersionType::Delete;
109 }
110
111 if !FileMeta::is_xl2_v1_format(&self.metadata) {
112 return false;
113 }
114
115 match FileMeta::check_xl2_v1(&self.metadata) {
116 Ok((meta, _, _)) => {
117 if !meta.is_empty() {
118 return FileMeta::is_latest_delete_marker(meta);
119 }
120 }
121 Err(_) => return true,
122 }
123
124 match self.xl_meta() {
125 Ok(res) => {
126 if res.versions.is_empty() {
127 return true;
128 }
129 res.versions[0].header.version_type == VersionType::Delete
130 }
131 Err(_) => true,
132 }
133 }
134
135 #[tracing::instrument(level = "debug", skip(self))]
136 pub fn to_fileinfo(&self, bucket: &str) -> Result<FileInfo> {
137 if self.is_dir() {
138 return Ok(FileInfo {
139 volume: bucket.to_owned(),
140 name: self.name.clone(),
141 ..Default::default()
142 });
143 }
144
145 if self.cached.is_some() {
146 let fm = self.cached.as_ref().unwrap();
147 if fm.versions.is_empty() {
148 return Ok(FileInfo {
149 volume: bucket.to_owned(),
150 name: self.name.clone(),
151 deleted: true,
152 is_latest: true,
153 mod_time: Some(OffsetDateTime::UNIX_EPOCH),
154 ..Default::default()
155 });
156 }
157
158 let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
159 return Ok(fi);
160 }
161
162 let mut fm = FileMeta::new();
163 fm.unmarshal_msg(&self.metadata)?;
164 let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
165 Ok(fi)
166 }
167
168 pub fn file_info_versions(&self, bucket: &str) -> Result<FileInfoVersions> {
169 if self.is_dir() {
170 return Ok(FileInfoVersions {
171 volume: bucket.to_string(),
172 name: self.name.clone(),
173 versions: vec![FileInfo {
174 volume: bucket.to_string(),
175 name: self.name.clone(),
176 ..Default::default()
177 }],
178 ..Default::default()
179 });
180 }
181
182 let mut fm = FileMeta::new();
183 fm.unmarshal_msg(&self.metadata)?;
184 fm.into_file_info_versions(bucket, self.name.as_str(), false)
185 }
186
187 pub fn matches(&self, other: Option<&MetaCacheEntry>, strict: bool) -> (Option<MetaCacheEntry>, bool) {
188 if other.is_none() {
189 return (None, false);
190 }
191
192 let other = other.unwrap();
193 if self.name != other.name {
194 if self.name < other.name {
195 return (Some(self.clone()), false);
196 }
197 return (Some(other.clone()), false);
198 }
199
200 if other.is_dir() || self.is_dir() {
201 if self.is_dir() {
202 return (Some(self.clone()), other.is_dir() == self.is_dir());
203 }
204 return (Some(other.clone()), other.is_dir() == self.is_dir());
205 }
206
207 let self_vers = match &self.cached {
208 Some(file_meta) => file_meta.clone(),
209 None => match FileMeta::load(&self.metadata) {
210 Ok(meta) => meta,
211 Err(_) => return (None, false),
212 },
213 };
214
215 let other_vers = match &other.cached {
216 Some(file_meta) => file_meta.clone(),
217 None => match FileMeta::load(&other.metadata) {
218 Ok(meta) => meta,
219 Err(_) => return (None, false),
220 },
221 };
222
223 if self_vers.versions.len() != other_vers.versions.len() {
224 match self_vers.lastest_mod_time().cmp(&other_vers.lastest_mod_time()) {
225 Ordering::Greater => return (Some(self.clone()), false),
226 Ordering::Less => return (Some(other.clone()), false),
227 _ => {}
228 }
229
230 if self_vers.versions.len() > other_vers.versions.len() {
231 return (Some(self.clone()), false);
232 }
233 return (Some(other.clone()), false);
234 }
235
236 let mut prefer = None;
237 for (s_version, o_version) in self_vers.versions.iter().zip(other_vers.versions.iter()) {
238 if s_version.header != o_version.header {
239 if s_version.header.has_ec() != o_version.header.has_ec() {
240 let (mut a, mut b) = (s_version.header.clone(), o_version.header.clone());
243 (a.ec_n, a.ec_m, b.ec_n, b.ec_m) = (0, 0, 0, 0);
244 if a == b {
245 continue;
246 }
247 }
248
249 if !strict && s_version.header.matches_not_strict(&o_version.header) {
250 if prefer.is_none() {
251 if s_version.header.sorts_before(&o_version.header) {
252 prefer = Some(self.clone());
253 } else {
254 prefer = Some(other.clone());
255 }
256 }
257 continue;
258 }
259
260 if prefer.is_some() {
261 return (prefer, false);
262 }
263
264 if s_version.header.sorts_before(&o_version.header) {
265 return (Some(self.clone()), false);
266 }
267
268 return (Some(other.clone()), false);
269 }
270 }
271
272 if prefer.is_none() {
273 prefer = Some(self.clone());
274 }
275
276 (prefer, true)
277 }
278
279 pub fn xl_meta(&mut self) -> Result<FileMeta> {
280 if self.is_dir() {
281 return Err(Error::FileNotFound);
282 }
283
284 if let Some(meta) = &self.cached {
285 Ok(meta.clone())
286 } else {
287 if self.metadata.is_empty() {
288 return Err(Error::FileNotFound);
289 }
290
291 let meta = FileMeta::load(&self.metadata)?;
292 self.cached = Some(meta.clone());
293 Ok(meta)
294 }
295 }
296}
297
298#[derive(Debug, Default)]
299pub struct MetaCacheEntries(pub Vec<Option<MetaCacheEntry>>);
300
301impl MetaCacheEntries {
302 #[allow(clippy::should_implement_trait)]
303 pub fn as_ref(&self) -> &[Option<MetaCacheEntry>] {
304 &self.0
305 }
306
307 pub fn resolve(&self, mut params: MetadataResolutionParams) -> Option<MetaCacheEntry> {
308 if self.0.is_empty() {
309 warn!("decommission_pool: entries resolve empty");
310 return None;
311 }
312
313 let mut dir_exists = 0;
314 let mut selected = None;
315
316 params.candidates.clear();
317 let mut objs_agree = 0;
318 let mut objs_valid = 0;
319
320 for entry in self.0.iter().flatten() {
321 let mut entry = entry.clone();
322
323 warn!("decommission_pool: entries resolve entry {:?}", entry.name);
324 if entry.name.is_empty() {
325 continue;
326 }
327 if entry.is_dir() {
328 dir_exists += 1;
329 selected = Some(entry.clone());
330 warn!("decommission_pool: entries resolve entry dir {:?}", entry.name);
331 continue;
332 }
333
334 let xl = match entry.xl_meta() {
335 Ok(xl) => xl,
336 Err(e) => {
337 warn!("decommission_pool: entries resolve entry xl_meta {:?}", e);
338 continue;
339 }
340 };
341
342 objs_valid += 1;
343 params.candidates.push(xl.versions.clone());
344
345 if selected.is_none() {
346 selected = Some(entry.clone());
347 objs_agree = 1;
348 warn!("decommission_pool: entries resolve entry selected {:?}", entry.name);
349 continue;
350 }
351
352 if let (prefer, true) = entry.matches(selected.as_ref(), params.strict) {
353 selected = prefer;
354 objs_agree += 1;
355 warn!("decommission_pool: entries resolve entry prefer {:?}", entry.name);
356 continue;
357 }
358 }
359
360 let Some(selected) = selected else {
361 warn!("decommission_pool: entries resolve entry no selected");
362 return None;
363 };
364
365 if selected.is_dir() && dir_exists >= params.dir_quorum {
366 warn!("decommission_pool: entries resolve entry dir selected {:?}", selected.name);
367 return Some(selected);
368 }
369
370 if objs_valid < params.obj_quorum {
372 warn!(
373 "decommission_pool: entries resolve entry not enough objects {} < {}",
374 objs_valid, params.obj_quorum
375 );
376 return None;
377 }
378
379 if objs_agree == objs_valid {
380 warn!("decommission_pool: entries resolve entry all agree {} == {}", objs_agree, objs_valid);
381 return Some(selected);
382 }
383
384 let Some(cached) = selected.cached else {
385 warn!("decommission_pool: entries resolve entry no cached");
386 return None;
387 };
388
389 let versions = merge_file_meta_versions(params.obj_quorum, params.strict, params.requested_versions, ¶ms.candidates);
390 if versions.is_empty() {
391 warn!("decommission_pool: entries resolve entry no versions");
392 return None;
393 }
394
395 let metadata = match cached.marshal_msg() {
396 Ok(meta) => meta,
397 Err(e) => {
398 warn!("decommission_pool: entries resolve entry marshal_msg {:?}", e);
399 return None;
400 }
401 };
402
403 let new_selected = MetaCacheEntry {
406 name: selected.name.clone(),
407 cached: Some(FileMeta {
408 meta_ver: cached.meta_ver,
409 versions,
410 ..Default::default()
411 }),
412 reusable: true,
413 metadata,
414 };
415
416 warn!("decommission_pool: entries resolve entry selected {:?}", new_selected.name);
417 Some(new_selected)
418 }
419
420 pub fn first_found(&self) -> (Option<MetaCacheEntry>, usize) {
421 (self.0.iter().find(|x| x.is_some()).cloned().unwrap_or_default(), self.0.len())
422 }
423}
424
425#[derive(Debug, Default)]
426pub struct MetaCacheEntriesSortedResult {
427 pub entries: Option<MetaCacheEntriesSorted>,
428 pub err: Option<Error>,
429}
430
431#[derive(Debug, Default)]
432pub struct MetaCacheEntriesSorted {
433 pub o: MetaCacheEntries,
434 pub list_id: Option<String>,
435 pub reuse: bool,
436 pub last_skipped_entry: Option<String>,
437}
438
439impl MetaCacheEntriesSorted {
440 pub fn entries(&self) -> Vec<&MetaCacheEntry> {
441 let entries: Vec<&MetaCacheEntry> = self.o.0.iter().flatten().collect();
442 entries
443 }
444
445 pub fn forward_past(&mut self, marker: Option<String>) {
446 if let Some(val) = marker {
447 if let Some(idx) = self.o.0.iter().flatten().position(|v| v.name > val) {
448 self.o.0 = self.o.0.split_off(idx);
449 }
450 }
451 }
452}
453
/// Version number written as the first value of every metacache stream.
const METACACHE_STREAM_VERSION: u8 = 2;

/// Writes a stream of entries to `W`: a version header, then one record per
/// entry, then an end marker.
#[derive(Debug)]
pub struct MetacacheWriter<W> {
    /// Destination of the encoded stream.
    wr: W,
    /// `true` once the version header has been written.
    created: bool,
    /// Staging buffer; encoded bytes collect here until `flush` hands them to `wr`.
    buf: Vec<u8>,
}
462
463impl<W: AsyncWrite + Unpin> MetacacheWriter<W> {
464 pub fn new(wr: W) -> Self {
465 Self {
466 wr,
467 created: false,
468 buf: Vec::new(),
469 }
470 }
471
472 pub async fn flush(&mut self) -> Result<()> {
473 self.wr.write_all(&self.buf).await?;
474 self.buf.clear();
475 Ok(())
476 }
477
478 pub async fn init(&mut self) -> Result<()> {
479 if !self.created {
480 rmp::encode::write_u8(&mut self.buf, METACACHE_STREAM_VERSION).map_err(|e| Error::other(format!("{e:?}")))?;
481 self.flush().await?;
482 self.created = true;
483 }
484 Ok(())
485 }
486
487 pub async fn write(&mut self, objs: &[MetaCacheEntry]) -> Result<()> {
488 if objs.is_empty() {
489 return Ok(());
490 }
491
492 self.init().await?;
493
494 for obj in objs.iter() {
495 if obj.name.is_empty() {
496 return Err(Error::other("metacacheWriter: no name"));
497 }
498
499 self.write_obj(obj).await?;
500 }
501
502 Ok(())
503 }
504
505 pub async fn write_obj(&mut self, obj: &MetaCacheEntry) -> Result<()> {
506 self.init().await?;
507
508 rmp::encode::write_bool(&mut self.buf, true).map_err(|e| Error::other(format!("{e:?}")))?;
509 rmp::encode::write_str(&mut self.buf, &obj.name).map_err(|e| Error::other(format!("{e:?}")))?;
510 rmp::encode::write_bin(&mut self.buf, &obj.metadata).map_err(|e| Error::other(format!("{e:?}")))?;
511 self.flush().await?;
512
513 Ok(())
514 }
515
516 pub async fn close(&mut self) -> Result<()> {
517 rmp::encode::write_bool(&mut self.buf, false).map_err(|e| Error::other(format!("{e:?}")))?;
518 self.flush().await?;
519 Ok(())
520 }
521}
522
523pub struct MetacacheReader<R> {
524 rd: R,
525 init: bool,
526 err: Option<Error>,
527 buf: Vec<u8>,
528 offset: usize,
529 current: Option<MetaCacheEntry>,
530}
531
532impl<R: AsyncRead + Unpin> MetacacheReader<R> {
533 pub fn new(rd: R) -> Self {
534 Self {
535 rd,
536 init: false,
537 err: None,
538 buf: Vec::new(),
539 offset: 0,
540 current: None,
541 }
542 }
543
544 pub async fn read_more(&mut self, read_size: usize) -> Result<&[u8]> {
545 let ext_size = read_size + self.offset;
546
547 let extra = ext_size - self.offset;
548 if self.buf.capacity() >= ext_size {
549 self.buf.resize(ext_size, 0);
551 } else {
552 self.buf.extend(vec![0u8; extra]);
553 }
554
555 let pref = self.offset;
556
557 self.rd.read_exact(&mut self.buf[pref..ext_size]).await?;
558
559 self.offset += read_size;
560
561 let data = &self.buf[pref..ext_size];
562
563 Ok(data)
564 }
565
566 fn reset(&mut self) {
567 self.buf.clear();
568 self.offset = 0;
569 }
570
571 async fn check_init(&mut self) -> Result<()> {
572 if !self.init {
573 let ver = match rmp::decode::read_u8(&mut self.read_more(2).await?) {
574 Ok(res) => res,
575 Err(err) => {
576 self.err = Some(Error::other(format!("{err:?}")));
577 0
578 }
579 };
580 match ver {
581 1 | 2 => (),
582 _ => {
583 self.err = Some(Error::other("invalid version"));
584 }
585 }
586
587 self.init = true;
588 }
589 Ok(())
590 }
591
592 async fn read_str_len(&mut self) -> Result<u32> {
593 let mark = match rmp::decode::read_marker(&mut self.read_more(1).await?) {
594 Ok(res) => res,
595 Err(err) => {
596 let err: Error = err.into();
597 self.err = Some(err.clone());
598 return Err(err);
599 }
600 };
601
602 match mark {
603 Marker::FixStr(size) => Ok(u32::from(size)),
604 Marker::Str8 => Ok(u32::from(self.read_u8().await?)),
605 Marker::Str16 => Ok(u32::from(self.read_u16().await?)),
606 Marker::Str32 => Ok(self.read_u32().await?),
607 _marker => Err(Error::other("str marker err")),
608 }
609 }
610
611 async fn read_bin_len(&mut self) -> Result<u32> {
612 let mark = match rmp::decode::read_marker(&mut self.read_more(1).await?) {
613 Ok(res) => res,
614 Err(err) => {
615 let err: Error = err.into();
616 self.err = Some(err.clone());
617 return Err(err);
618 }
619 };
620
621 match mark {
622 Marker::Bin8 => Ok(u32::from(self.read_u8().await?)),
623 Marker::Bin16 => Ok(u32::from(self.read_u16().await?)),
624 Marker::Bin32 => Ok(self.read_u32().await?),
625 _ => Err(Error::other("bin marker err")),
626 }
627 }
628
629 async fn read_u8(&mut self) -> Result<u8> {
630 let buf = self.read_more(1).await?;
631 Ok(u8::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
632 }
633
634 async fn read_u16(&mut self) -> Result<u16> {
635 let buf = self.read_more(2).await?;
636 Ok(u16::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
637 }
638
639 async fn read_u32(&mut self) -> Result<u32> {
640 let buf = self.read_more(4).await?;
641 Ok(u32::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
642 }
643
644 pub async fn skip(&mut self, size: usize) -> Result<()> {
645 self.check_init().await?;
646
647 if let Some(err) = &self.err {
648 return Err(err.clone());
649 }
650
651 let mut n = size;
652
653 if self.current.is_some() {
654 n -= 1;
655 self.current = None;
656 }
657
658 while n > 0 {
659 match rmp::decode::read_bool(&mut self.read_more(1).await?) {
660 Ok(res) => {
661 if !res {
662 return Ok(());
663 }
664 }
665 Err(err) => {
666 let err: Error = err.into();
667 self.err = Some(err.clone());
668 return Err(err);
669 }
670 };
671
672 let l = self.read_str_len().await?;
673 let _ = self.read_more(l as usize).await?;
674 let l = self.read_bin_len().await?;
675 let _ = self.read_more(l as usize).await?;
676
677 n -= 1;
678 }
679
680 Ok(())
681 }
682
683 pub async fn peek(&mut self) -> Result<Option<MetaCacheEntry>> {
684 self.check_init().await?;
685
686 if let Some(err) = &self.err {
687 return Err(err.clone());
688 }
689
690 match rmp::decode::read_bool(&mut self.read_more(1).await?) {
691 Ok(res) => {
692 if !res {
693 return Ok(None);
694 }
695 }
696 Err(err) => {
697 let err: Error = err.into();
698 self.err = Some(err.clone());
699 return Err(err);
700 }
701 };
702
703 let l = self.read_str_len().await?;
704
705 let buf = self.read_more(l as usize).await?;
706 let name_buf = buf.to_vec();
707 let name = match from_utf8(&name_buf) {
708 Ok(decoded) => decoded.to_owned(),
709 Err(err) => {
710 self.err = Some(Error::other(err.to_string()));
711 return Err(Error::other(err.to_string()));
712 }
713 };
714
715 let l = self.read_bin_len().await?;
716
717 let buf = self.read_more(l as usize).await?;
718
719 let metadata = buf.to_vec();
720
721 self.reset();
722
723 let entry = Some(MetaCacheEntry {
724 name,
725 metadata,
726 cached: None,
727 reusable: false,
728 });
729 self.current = entry.clone();
730
731 Ok(entry)
732 }
733
734 pub async fn read_all(&mut self) -> Result<Vec<MetaCacheEntry>> {
735 let mut ret = Vec::new();
736
737 loop {
738 if let Some(entry) = self.peek().await? {
739 ret.push(entry);
740 continue;
741 }
742 break;
743 }
744
745 Ok(ret)
746 }
747}
748
/// Boxed callback that starts an asynchronous computation of a fresh `T`.
///
/// Each call returns a new pinned, boxed, `Send` future resolving to
/// `std::io::Result<T>`. The callback itself is `Send + Sync + 'static`.
pub type UpdateFn<T> = Box<
    dyn Fn() -> Pin<Box<dyn Future<Output = std::io::Result<T>> + Send>>
        + Send
        + Sync
        + 'static,
>;
750
/// Behaviour switches for [`Cache`]. Both default to `false`.
#[derive(Debug, Default, Clone)]
pub struct Opts {
    /// When an update fails and an earlier value exists, treat the update as
    /// successful and keep serving the earlier value.
    pub return_last_good: bool,
    /// When the value is older than the TTL but younger than twice the TTL,
    /// return it immediately and refresh in the background.
    pub no_wait: bool,
}
756
757pub struct Cache<T: Clone + Debug + Send> {
758 update_fn: UpdateFn<T>,
759 ttl: Duration,
760 opts: Opts,
761 val: AtomicPtr<T>,
762 last_update_ms: AtomicU64,
763 updating: Arc<Mutex<bool>>,
764}
765
766impl<T: Clone + Debug + Send + 'static> Cache<T> {
767 pub fn new(update_fn: UpdateFn<T>, ttl: Duration, opts: Opts) -> Self {
768 let val = AtomicPtr::new(ptr::null_mut());
769 Self {
770 update_fn,
771 ttl,
772 opts,
773 val,
774 last_update_ms: AtomicU64::new(0),
775 updating: Arc::new(Mutex::new(false)),
776 }
777 }
778
779 #[allow(unsafe_code)]
780 pub async fn get(self: Arc<Self>) -> std::io::Result<T> {
781 let v_ptr = self.val.load(AtomicOrdering::SeqCst);
782 let v = if v_ptr.is_null() {
783 None
784 } else {
785 Some(unsafe { (*v_ptr).clone() })
786 };
787
788 let now = SystemTime::now()
789 .duration_since(UNIX_EPOCH)
790 .expect("Time went backwards")
791 .as_secs();
792 if now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs() {
793 if let Some(v) = v {
794 return Ok(v);
795 }
796 }
797
798 if self.opts.no_wait && v.is_some() && now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs() * 2 {
799 if self.updating.try_lock().is_ok() {
800 let this = Arc::clone(&self);
801 spawn(async move {
802 let _ = this.update().await;
803 });
804 }
805
806 return Ok(v.unwrap());
807 }
808
809 let _ = self.updating.lock().await;
810
811 if let Ok(duration) =
812 SystemTime::now().duration_since(UNIX_EPOCH + Duration::from_secs(self.last_update_ms.load(AtomicOrdering::SeqCst)))
813 {
814 if duration < self.ttl {
815 return Ok(v.unwrap());
816 }
817 }
818
819 match self.update().await {
820 Ok(_) => {
821 let v_ptr = self.val.load(AtomicOrdering::SeqCst);
822 let v = if v_ptr.is_null() {
823 None
824 } else {
825 Some(unsafe { (*v_ptr).clone() })
826 };
827 Ok(v.unwrap())
828 }
829 Err(err) => Err(err),
830 }
831 }
832
833 async fn update(&self) -> std::io::Result<()> {
834 match (self.update_fn)().await {
835 Ok(val) => {
836 self.val.store(Box::into_raw(Box::new(val)), AtomicOrdering::SeqCst);
837 let now = SystemTime::now()
838 .duration_since(UNIX_EPOCH)
839 .expect("Time went backwards")
840 .as_secs();
841 self.last_update_ms.store(now, AtomicOrdering::SeqCst);
842 Ok(())
843 }
844 Err(err) => {
845 let v_ptr = self.val.load(AtomicOrdering::SeqCst);
846 if self.opts.return_last_good && !v_ptr.is_null() {
847 return Ok(());
848 }
849
850 Err(err)
851 }
852 }
853 }
854}
855
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Round trip: entries written by `MetacacheWriter` come back unchanged
    /// from `MetacacheReader::read_all`.
    #[tokio::test]
    async fn test_writer() {
        let objs: Vec<MetaCacheEntry> = (0..10)
            .map(|i| MetaCacheEntry {
                name: format!("item{i}"),
                metadata: vec![0u8, 10],
                cached: None,
                reusable: false,
            })
            .collect();

        let mut sink = Cursor::new(Vec::new());
        {
            let mut w = MetacacheWriter::new(&mut sink);
            w.write(&objs).await.unwrap();
            w.close().await.unwrap();
        }

        let source = Cursor::new(sink.into_inner());
        let mut r = MetacacheReader::new(source);
        let nobjs = r.read_all().await.unwrap();

        assert_eq!(objs, nobjs);
    }
}