rustfs_filemeta/
metacache.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::error::{Error, Result};
16use crate::{FileInfo, FileInfoVersions, FileMeta, FileMetaShallowVersion, VersionType, merge_file_meta_versions};
17use rmp::Marker;
18use serde::{Deserialize, Serialize};
19use std::cmp::Ordering;
20use std::str::from_utf8;
21use std::{
22    fmt::Debug,
23    future::Future,
24    pin::Pin,
25    ptr,
26    sync::{
27        Arc,
28        atomic::{AtomicPtr, AtomicU64, Ordering as AtomicOrdering},
29    },
30    time::{Duration, SystemTime, UNIX_EPOCH},
31};
32use time::OffsetDateTime;
33use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
34use tokio::spawn;
35use tokio::sync::Mutex;
36use tracing::warn;
37
38const SLASH_SEPARATOR: &str = "/";
39
40#[derive(Clone, Debug, Default)]
41pub struct MetadataResolutionParams {
42    pub dir_quorum: usize,
43    pub obj_quorum: usize,
44    pub requested_versions: usize,
45    pub bucket: String,
46    pub strict: bool,
47    pub candidates: Vec<Vec<FileMetaShallowVersion>>,
48}
49
50#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
51pub struct MetaCacheEntry {
52    /// name is the full name of the object including prefixes
53    pub name: String,
54    /// Metadata. If none is present it is not an object but only a prefix.
55    /// Entries without metadata will only be present in non-recursive scans.
56    pub metadata: Vec<u8>,
57
58    /// cached contains the metadata if decoded.
59    #[serde(skip)]
60    pub cached: Option<FileMeta>,
61
62    /// Indicates the entry can be reused and only one reference to metadata is expected.
63    pub reusable: bool,
64}
65
66impl MetaCacheEntry {
67    pub fn marshal_msg(&self) -> Result<Vec<u8>> {
68        let mut wr = Vec::new();
69        rmp::encode::write_bool(&mut wr, true)?;
70        rmp::encode::write_str(&mut wr, &self.name)?;
71        rmp::encode::write_bin(&mut wr, &self.metadata)?;
72        Ok(wr)
73    }
74
75    pub fn is_dir(&self) -> bool {
76        self.metadata.is_empty() && self.name.ends_with('/')
77    }
78
79    pub fn is_in_dir(&self, dir: &str, separator: &str) -> bool {
80        if dir.is_empty() {
81            let idx = self.name.find(separator);
82            return idx.is_none() || idx.unwrap() == self.name.len() - separator.len();
83        }
84
85        let ext = self.name.trim_start_matches(dir);
86
87        if ext.len() != self.name.len() {
88            let idx = ext.find(separator);
89            return idx.is_none() || idx.unwrap() == ext.len() - separator.len();
90        }
91
92        false
93    }
94
95    pub fn is_object(&self) -> bool {
96        !self.metadata.is_empty()
97    }
98
99    pub fn is_object_dir(&self) -> bool {
100        !self.metadata.is_empty() && self.name.ends_with(SLASH_SEPARATOR)
101    }
102
103    pub fn is_latest_delete_marker(&mut self) -> bool {
104        if let Some(cached) = &self.cached {
105            if cached.versions.is_empty() {
106                return true;
107            }
108            return cached.versions[0].header.version_type == VersionType::Delete;
109        }
110
111        if !FileMeta::is_xl2_v1_format(&self.metadata) {
112            return false;
113        }
114
115        match FileMeta::check_xl2_v1(&self.metadata) {
116            Ok((meta, _, _)) => {
117                if !meta.is_empty() {
118                    return FileMeta::is_latest_delete_marker(meta);
119                }
120            }
121            Err(_) => return true,
122        }
123
124        match self.xl_meta() {
125            Ok(res) => {
126                if res.versions.is_empty() {
127                    return true;
128                }
129                res.versions[0].header.version_type == VersionType::Delete
130            }
131            Err(_) => true,
132        }
133    }
134
135    #[tracing::instrument(level = "debug", skip(self))]
136    pub fn to_fileinfo(&self, bucket: &str) -> Result<FileInfo> {
137        if self.is_dir() {
138            return Ok(FileInfo {
139                volume: bucket.to_owned(),
140                name: self.name.clone(),
141                ..Default::default()
142            });
143        }
144
145        if self.cached.is_some() {
146            let fm = self.cached.as_ref().unwrap();
147            if fm.versions.is_empty() {
148                return Ok(FileInfo {
149                    volume: bucket.to_owned(),
150                    name: self.name.clone(),
151                    deleted: true,
152                    is_latest: true,
153                    mod_time: Some(OffsetDateTime::UNIX_EPOCH),
154                    ..Default::default()
155                });
156            }
157
158            let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
159            return Ok(fi);
160        }
161
162        let mut fm = FileMeta::new();
163        fm.unmarshal_msg(&self.metadata)?;
164        let fi = fm.into_fileinfo(bucket, self.name.as_str(), "", false, false)?;
165        Ok(fi)
166    }
167
168    pub fn file_info_versions(&self, bucket: &str) -> Result<FileInfoVersions> {
169        if self.is_dir() {
170            return Ok(FileInfoVersions {
171                volume: bucket.to_string(),
172                name: self.name.clone(),
173                versions: vec![FileInfo {
174                    volume: bucket.to_string(),
175                    name: self.name.clone(),
176                    ..Default::default()
177                }],
178                ..Default::default()
179            });
180        }
181
182        let mut fm = FileMeta::new();
183        fm.unmarshal_msg(&self.metadata)?;
184        fm.into_file_info_versions(bucket, self.name.as_str(), false)
185    }
186
187    pub fn matches(&self, other: Option<&MetaCacheEntry>, strict: bool) -> (Option<MetaCacheEntry>, bool) {
188        if other.is_none() {
189            return (None, false);
190        }
191
192        let other = other.unwrap();
193        if self.name != other.name {
194            if self.name < other.name {
195                return (Some(self.clone()), false);
196            }
197            return (Some(other.clone()), false);
198        }
199
200        if other.is_dir() || self.is_dir() {
201            if self.is_dir() {
202                return (Some(self.clone()), other.is_dir() == self.is_dir());
203            }
204            return (Some(other.clone()), other.is_dir() == self.is_dir());
205        }
206
207        let self_vers = match &self.cached {
208            Some(file_meta) => file_meta.clone(),
209            None => match FileMeta::load(&self.metadata) {
210                Ok(meta) => meta,
211                Err(_) => return (None, false),
212            },
213        };
214
215        let other_vers = match &other.cached {
216            Some(file_meta) => file_meta.clone(),
217            None => match FileMeta::load(&other.metadata) {
218                Ok(meta) => meta,
219                Err(_) => return (None, false),
220            },
221        };
222
223        if self_vers.versions.len() != other_vers.versions.len() {
224            match self_vers.lastest_mod_time().cmp(&other_vers.lastest_mod_time()) {
225                Ordering::Greater => return (Some(self.clone()), false),
226                Ordering::Less => return (Some(other.clone()), false),
227                _ => {}
228            }
229
230            if self_vers.versions.len() > other_vers.versions.len() {
231                return (Some(self.clone()), false);
232            }
233            return (Some(other.clone()), false);
234        }
235
236        let mut prefer = None;
237        for (s_version, o_version) in self_vers.versions.iter().zip(other_vers.versions.iter()) {
238            if s_version.header != o_version.header {
239                if s_version.header.has_ec() != o_version.header.has_ec() {
240                    // One version has EC and the other doesn't - may have been written later.
241                    // Compare without considering EC.
242                    let (mut a, mut b) = (s_version.header.clone(), o_version.header.clone());
243                    (a.ec_n, a.ec_m, b.ec_n, b.ec_m) = (0, 0, 0, 0);
244                    if a == b {
245                        continue;
246                    }
247                }
248
249                if !strict && s_version.header.matches_not_strict(&o_version.header) {
250                    if prefer.is_none() {
251                        if s_version.header.sorts_before(&o_version.header) {
252                            prefer = Some(self.clone());
253                        } else {
254                            prefer = Some(other.clone());
255                        }
256                    }
257                    continue;
258                }
259
260                if prefer.is_some() {
261                    return (prefer, false);
262                }
263
264                if s_version.header.sorts_before(&o_version.header) {
265                    return (Some(self.clone()), false);
266                }
267
268                return (Some(other.clone()), false);
269            }
270        }
271
272        if prefer.is_none() {
273            prefer = Some(self.clone());
274        }
275
276        (prefer, true)
277    }
278
279    pub fn xl_meta(&mut self) -> Result<FileMeta> {
280        if self.is_dir() {
281            return Err(Error::FileNotFound);
282        }
283
284        if let Some(meta) = &self.cached {
285            Ok(meta.clone())
286        } else {
287            if self.metadata.is_empty() {
288                return Err(Error::FileNotFound);
289            }
290
291            let meta = FileMeta::load(&self.metadata)?;
292            self.cached = Some(meta.clone());
293            Ok(meta)
294        }
295    }
296}
297
298#[derive(Debug, Default)]
299pub struct MetaCacheEntries(pub Vec<Option<MetaCacheEntry>>);
300
301impl MetaCacheEntries {
302    #[allow(clippy::should_implement_trait)]
303    pub fn as_ref(&self) -> &[Option<MetaCacheEntry>] {
304        &self.0
305    }
306
307    pub fn resolve(&self, mut params: MetadataResolutionParams) -> Option<MetaCacheEntry> {
308        if self.0.is_empty() {
309            warn!("decommission_pool: entries resolve empty");
310            return None;
311        }
312
313        let mut dir_exists = 0;
314        let mut selected = None;
315
316        params.candidates.clear();
317        let mut objs_agree = 0;
318        let mut objs_valid = 0;
319
320        for entry in self.0.iter().flatten() {
321            let mut entry = entry.clone();
322
323            warn!("decommission_pool: entries resolve entry {:?}", entry.name);
324            if entry.name.is_empty() {
325                continue;
326            }
327            if entry.is_dir() {
328                dir_exists += 1;
329                selected = Some(entry.clone());
330                warn!("decommission_pool: entries resolve entry dir {:?}", entry.name);
331                continue;
332            }
333
334            let xl = match entry.xl_meta() {
335                Ok(xl) => xl,
336                Err(e) => {
337                    warn!("decommission_pool: entries resolve entry xl_meta {:?}", e);
338                    continue;
339                }
340            };
341
342            objs_valid += 1;
343            params.candidates.push(xl.versions.clone());
344
345            if selected.is_none() {
346                selected = Some(entry.clone());
347                objs_agree = 1;
348                warn!("decommission_pool: entries resolve entry selected {:?}", entry.name);
349                continue;
350            }
351
352            if let (prefer, true) = entry.matches(selected.as_ref(), params.strict) {
353                selected = prefer;
354                objs_agree += 1;
355                warn!("decommission_pool: entries resolve entry prefer {:?}", entry.name);
356                continue;
357            }
358        }
359
360        let Some(selected) = selected else {
361            warn!("decommission_pool: entries resolve entry no selected");
362            return None;
363        };
364
365        if selected.is_dir() && dir_exists >= params.dir_quorum {
366            warn!("decommission_pool: entries resolve entry dir selected {:?}", selected.name);
367            return Some(selected);
368        }
369
370        // If we would never be able to reach read quorum.
371        if objs_valid < params.obj_quorum {
372            warn!(
373                "decommission_pool: entries resolve entry not enough objects {} < {}",
374                objs_valid, params.obj_quorum
375            );
376            return None;
377        }
378
379        if objs_agree == objs_valid {
380            warn!("decommission_pool: entries resolve entry all agree {} == {}", objs_agree, objs_valid);
381            return Some(selected);
382        }
383
384        let Some(cached) = selected.cached else {
385            warn!("decommission_pool: entries resolve entry no cached");
386            return None;
387        };
388
389        let versions = merge_file_meta_versions(params.obj_quorum, params.strict, params.requested_versions, &params.candidates);
390        if versions.is_empty() {
391            warn!("decommission_pool: entries resolve entry no versions");
392            return None;
393        }
394
395        let metadata = match cached.marshal_msg() {
396            Ok(meta) => meta,
397            Err(e) => {
398                warn!("decommission_pool: entries resolve entry marshal_msg {:?}", e);
399                return None;
400            }
401        };
402
403        // Merge if we have disagreement.
404        // Create a new merged result.
405        let new_selected = MetaCacheEntry {
406            name: selected.name.clone(),
407            cached: Some(FileMeta {
408                meta_ver: cached.meta_ver,
409                versions,
410                ..Default::default()
411            }),
412            reusable: true,
413            metadata,
414        };
415
416        warn!("decommission_pool: entries resolve entry selected {:?}", new_selected.name);
417        Some(new_selected)
418    }
419
420    pub fn first_found(&self) -> (Option<MetaCacheEntry>, usize) {
421        (self.0.iter().find(|x| x.is_some()).cloned().unwrap_or_default(), self.0.len())
422    }
423}
424
425#[derive(Debug, Default)]
426pub struct MetaCacheEntriesSortedResult {
427    pub entries: Option<MetaCacheEntriesSorted>,
428    pub err: Option<Error>,
429}
430
431#[derive(Debug, Default)]
432pub struct MetaCacheEntriesSorted {
433    pub o: MetaCacheEntries,
434    pub list_id: Option<String>,
435    pub reuse: bool,
436    pub last_skipped_entry: Option<String>,
437}
438
439impl MetaCacheEntriesSorted {
440    pub fn entries(&self) -> Vec<&MetaCacheEntry> {
441        let entries: Vec<&MetaCacheEntry> = self.o.0.iter().flatten().collect();
442        entries
443    }
444
445    pub fn forward_past(&mut self, marker: Option<String>) {
446        if let Some(val) = marker {
447            if let Some(idx) = self.o.0.iter().flatten().position(|v| v.name > val) {
448                self.o.0 = self.o.0.split_off(idx);
449            }
450        }
451    }
452}
453
454const METACACHE_STREAM_VERSION: u8 = 2;
455
456#[derive(Debug)]
457pub struct MetacacheWriter<W> {
458    wr: W,
459    created: bool,
460    buf: Vec<u8>,
461}
462
463impl<W: AsyncWrite + Unpin> MetacacheWriter<W> {
464    pub fn new(wr: W) -> Self {
465        Self {
466            wr,
467            created: false,
468            buf: Vec::new(),
469        }
470    }
471
472    pub async fn flush(&mut self) -> Result<()> {
473        self.wr.write_all(&self.buf).await?;
474        self.buf.clear();
475        Ok(())
476    }
477
478    pub async fn init(&mut self) -> Result<()> {
479        if !self.created {
480            rmp::encode::write_u8(&mut self.buf, METACACHE_STREAM_VERSION).map_err(|e| Error::other(format!("{e:?}")))?;
481            self.flush().await?;
482            self.created = true;
483        }
484        Ok(())
485    }
486
487    pub async fn write(&mut self, objs: &[MetaCacheEntry]) -> Result<()> {
488        if objs.is_empty() {
489            return Ok(());
490        }
491
492        self.init().await?;
493
494        for obj in objs.iter() {
495            if obj.name.is_empty() {
496                return Err(Error::other("metacacheWriter: no name"));
497            }
498
499            self.write_obj(obj).await?;
500        }
501
502        Ok(())
503    }
504
505    pub async fn write_obj(&mut self, obj: &MetaCacheEntry) -> Result<()> {
506        self.init().await?;
507
508        rmp::encode::write_bool(&mut self.buf, true).map_err(|e| Error::other(format!("{e:?}")))?;
509        rmp::encode::write_str(&mut self.buf, &obj.name).map_err(|e| Error::other(format!("{e:?}")))?;
510        rmp::encode::write_bin(&mut self.buf, &obj.metadata).map_err(|e| Error::other(format!("{e:?}")))?;
511        self.flush().await?;
512
513        Ok(())
514    }
515
516    pub async fn close(&mut self) -> Result<()> {
517        rmp::encode::write_bool(&mut self.buf, false).map_err(|e| Error::other(format!("{e:?}")))?;
518        self.flush().await?;
519        Ok(())
520    }
521}
522
523pub struct MetacacheReader<R> {
524    rd: R,
525    init: bool,
526    err: Option<Error>,
527    buf: Vec<u8>,
528    offset: usize,
529    current: Option<MetaCacheEntry>,
530}
531
532impl<R: AsyncRead + Unpin> MetacacheReader<R> {
533    pub fn new(rd: R) -> Self {
534        Self {
535            rd,
536            init: false,
537            err: None,
538            buf: Vec::new(),
539            offset: 0,
540            current: None,
541        }
542    }
543
544    pub async fn read_more(&mut self, read_size: usize) -> Result<&[u8]> {
545        let ext_size = read_size + self.offset;
546
547        let extra = ext_size - self.offset;
548        if self.buf.capacity() >= ext_size {
549            // Extend the buffer if we have enough space.
550            self.buf.resize(ext_size, 0);
551        } else {
552            self.buf.extend(vec![0u8; extra]);
553        }
554
555        let pref = self.offset;
556
557        self.rd.read_exact(&mut self.buf[pref..ext_size]).await?;
558
559        self.offset += read_size;
560
561        let data = &self.buf[pref..ext_size];
562
563        Ok(data)
564    }
565
566    fn reset(&mut self) {
567        self.buf.clear();
568        self.offset = 0;
569    }
570
571    async fn check_init(&mut self) -> Result<()> {
572        if !self.init {
573            let ver = match rmp::decode::read_u8(&mut self.read_more(2).await?) {
574                Ok(res) => res,
575                Err(err) => {
576                    self.err = Some(Error::other(format!("{err:?}")));
577                    0
578                }
579            };
580            match ver {
581                1 | 2 => (),
582                _ => {
583                    self.err = Some(Error::other("invalid version"));
584                }
585            }
586
587            self.init = true;
588        }
589        Ok(())
590    }
591
592    async fn read_str_len(&mut self) -> Result<u32> {
593        let mark = match rmp::decode::read_marker(&mut self.read_more(1).await?) {
594            Ok(res) => res,
595            Err(err) => {
596                let err: Error = err.into();
597                self.err = Some(err.clone());
598                return Err(err);
599            }
600        };
601
602        match mark {
603            Marker::FixStr(size) => Ok(u32::from(size)),
604            Marker::Str8 => Ok(u32::from(self.read_u8().await?)),
605            Marker::Str16 => Ok(u32::from(self.read_u16().await?)),
606            Marker::Str32 => Ok(self.read_u32().await?),
607            _marker => Err(Error::other("str marker err")),
608        }
609    }
610
611    async fn read_bin_len(&mut self) -> Result<u32> {
612        let mark = match rmp::decode::read_marker(&mut self.read_more(1).await?) {
613            Ok(res) => res,
614            Err(err) => {
615                let err: Error = err.into();
616                self.err = Some(err.clone());
617                return Err(err);
618            }
619        };
620
621        match mark {
622            Marker::Bin8 => Ok(u32::from(self.read_u8().await?)),
623            Marker::Bin16 => Ok(u32::from(self.read_u16().await?)),
624            Marker::Bin32 => Ok(self.read_u32().await?),
625            _ => Err(Error::other("bin marker err")),
626        }
627    }
628
629    async fn read_u8(&mut self) -> Result<u8> {
630        let buf = self.read_more(1).await?;
631        Ok(u8::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
632    }
633
634    async fn read_u16(&mut self) -> Result<u16> {
635        let buf = self.read_more(2).await?;
636        Ok(u16::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
637    }
638
639    async fn read_u32(&mut self) -> Result<u32> {
640        let buf = self.read_more(4).await?;
641        Ok(u32::from_be_bytes(buf.try_into().expect("Slice with incorrect length")))
642    }
643
644    pub async fn skip(&mut self, size: usize) -> Result<()> {
645        self.check_init().await?;
646
647        if let Some(err) = &self.err {
648            return Err(err.clone());
649        }
650
651        let mut n = size;
652
653        if self.current.is_some() {
654            n -= 1;
655            self.current = None;
656        }
657
658        while n > 0 {
659            match rmp::decode::read_bool(&mut self.read_more(1).await?) {
660                Ok(res) => {
661                    if !res {
662                        return Ok(());
663                    }
664                }
665                Err(err) => {
666                    let err: Error = err.into();
667                    self.err = Some(err.clone());
668                    return Err(err);
669                }
670            };
671
672            let l = self.read_str_len().await?;
673            let _ = self.read_more(l as usize).await?;
674            let l = self.read_bin_len().await?;
675            let _ = self.read_more(l as usize).await?;
676
677            n -= 1;
678        }
679
680        Ok(())
681    }
682
683    pub async fn peek(&mut self) -> Result<Option<MetaCacheEntry>> {
684        self.check_init().await?;
685
686        if let Some(err) = &self.err {
687            return Err(err.clone());
688        }
689
690        match rmp::decode::read_bool(&mut self.read_more(1).await?) {
691            Ok(res) => {
692                if !res {
693                    return Ok(None);
694                }
695            }
696            Err(err) => {
697                let err: Error = err.into();
698                self.err = Some(err.clone());
699                return Err(err);
700            }
701        };
702
703        let l = self.read_str_len().await?;
704
705        let buf = self.read_more(l as usize).await?;
706        let name_buf = buf.to_vec();
707        let name = match from_utf8(&name_buf) {
708            Ok(decoded) => decoded.to_owned(),
709            Err(err) => {
710                self.err = Some(Error::other(err.to_string()));
711                return Err(Error::other(err.to_string()));
712            }
713        };
714
715        let l = self.read_bin_len().await?;
716
717        let buf = self.read_more(l as usize).await?;
718
719        let metadata = buf.to_vec();
720
721        self.reset();
722
723        let entry = Some(MetaCacheEntry {
724            name,
725            metadata,
726            cached: None,
727            reusable: false,
728        });
729        self.current = entry.clone();
730
731        Ok(entry)
732    }
733
734    pub async fn read_all(&mut self) -> Result<Vec<MetaCacheEntry>> {
735        let mut ret = Vec::new();
736
737        loop {
738            if let Some(entry) = self.peek().await? {
739                ret.push(entry);
740                continue;
741            }
742            break;
743        }
744
745        Ok(ret)
746    }
747}
748
749pub type UpdateFn<T> = Box<dyn Fn() -> Pin<Box<dyn Future<Output = std::io::Result<T>> + Send>> + Send + Sync + 'static>;
750
751#[derive(Clone, Debug, Default)]
752pub struct Opts {
753    pub return_last_good: bool,
754    pub no_wait: bool,
755}
756
757pub struct Cache<T: Clone + Debug + Send> {
758    update_fn: UpdateFn<T>,
759    ttl: Duration,
760    opts: Opts,
761    val: AtomicPtr<T>,
762    last_update_ms: AtomicU64,
763    updating: Arc<Mutex<bool>>,
764}
765
766impl<T: Clone + Debug + Send + 'static> Cache<T> {
767    pub fn new(update_fn: UpdateFn<T>, ttl: Duration, opts: Opts) -> Self {
768        let val = AtomicPtr::new(ptr::null_mut());
769        Self {
770            update_fn,
771            ttl,
772            opts,
773            val,
774            last_update_ms: AtomicU64::new(0),
775            updating: Arc::new(Mutex::new(false)),
776        }
777    }
778
779    #[allow(unsafe_code)]
780    pub async fn get(self: Arc<Self>) -> std::io::Result<T> {
781        let v_ptr = self.val.load(AtomicOrdering::SeqCst);
782        let v = if v_ptr.is_null() {
783            None
784        } else {
785            Some(unsafe { (*v_ptr).clone() })
786        };
787
788        let now = SystemTime::now()
789            .duration_since(UNIX_EPOCH)
790            .expect("Time went backwards")
791            .as_secs();
792        if now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs() {
793            if let Some(v) = v {
794                return Ok(v);
795            }
796        }
797
798        if self.opts.no_wait && v.is_some() && now - self.last_update_ms.load(AtomicOrdering::SeqCst) < self.ttl.as_secs() * 2 {
799            if self.updating.try_lock().is_ok() {
800                let this = Arc::clone(&self);
801                spawn(async move {
802                    let _ = this.update().await;
803                });
804            }
805
806            return Ok(v.unwrap());
807        }
808
809        let _ = self.updating.lock().await;
810
811        if let Ok(duration) =
812            SystemTime::now().duration_since(UNIX_EPOCH + Duration::from_secs(self.last_update_ms.load(AtomicOrdering::SeqCst)))
813        {
814            if duration < self.ttl {
815                return Ok(v.unwrap());
816            }
817        }
818
819        match self.update().await {
820            Ok(_) => {
821                let v_ptr = self.val.load(AtomicOrdering::SeqCst);
822                let v = if v_ptr.is_null() {
823                    None
824                } else {
825                    Some(unsafe { (*v_ptr).clone() })
826                };
827                Ok(v.unwrap())
828            }
829            Err(err) => Err(err),
830        }
831    }
832
833    async fn update(&self) -> std::io::Result<()> {
834        match (self.update_fn)().await {
835            Ok(val) => {
836                self.val.store(Box::into_raw(Box::new(val)), AtomicOrdering::SeqCst);
837                let now = SystemTime::now()
838                    .duration_since(UNIX_EPOCH)
839                    .expect("Time went backwards")
840                    .as_secs();
841                self.last_update_ms.store(now, AtomicOrdering::SeqCst);
842                Ok(())
843            }
844            Err(err) => {
845                let v_ptr = self.val.load(AtomicOrdering::SeqCst);
846                if self.opts.return_last_good && !v_ptr.is_null() {
847                    return Ok(());
848                }
849
850                Err(err)
851            }
852        }
853    }
854}
855
856#[cfg(test)]
857mod tests {
858    use super::*;
859    use std::io::Cursor;
860
861    #[tokio::test]
862    async fn test_writer() {
863        let mut f = Cursor::new(Vec::new());
864        let mut w = MetacacheWriter::new(&mut f);
865
866        let mut objs = Vec::new();
867        for i in 0..10 {
868            let info = MetaCacheEntry {
869                name: format!("item{i}"),
870                metadata: vec![0u8, 10],
871                cached: None,
872                reusable: false,
873            };
874            objs.push(info);
875        }
876
877        w.write(&objs).await.unwrap();
878        w.close().await.unwrap();
879
880        let data = f.into_inner();
881        let nf = Cursor::new(data);
882
883        let mut r = MetacacheReader::new(nf);
884        let nobjs = r.read_all().await.unwrap();
885
886        assert_eq!(objs, nobjs);
887    }
888}