forest/db/car/mod.rs

// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

mod any;
pub mod forest;
mod many;
pub mod plain;

pub use any::AnyCar;
pub use forest::ForestCar;
use get_size2::GetSize as _;
pub use many::ManyCar;
pub use plain::PlainCar;

use cid::Cid;
use positioned_io::{ReadAt, Size};
use std::{
    num::NonZeroUsize,
    sync::{
        LazyLock,
        atomic::{AtomicUsize, Ordering},
    },
};

use crate::utils::{cache::SizeTrackingLruCache, get_size::CidWrapper};

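/// Blanket trait for thread-safe readers that support positioned (random-access)
/// reads and can report their own size.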
pub trait RandomAccessFileReader: ReadAt + Size + Send + Sync + 'static {}
impl<X: ReadAt + Size + Send + Sync + 'static> RandomAccessFileReader for X {}

/// Multiple `.forest.car.zst` archives may use the same cache, each with a
/// unique cache key.
pub type CacheKey = u64;

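/// Offset of a zstd frame within a `.forest.car.zst` archive; combined with a
/// [`CacheKey`] to identify an entry in the [`ZstdFrameCache`].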
type FrameOffset = u64;

/// According to FRC-0108, v2 snapshots have exactly one root, which points to the
/// snapshot metadata.
pub const V2_SNAPSHOT_ROOT_COUNT: usize = 1;

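/// Default maximum size, in bytes, of the [`ZstdFrameCache`]. The built-in default is
/// 256 MiB; it can be overridden via the `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE`
/// environment variable (a positive integer number of bytes).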
pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
    const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
    if let Ok(value) = std::env::var(ENV_KEY) {
        if let Ok(size) = value.parse::<NonZeroUsize>() {
            let size = size.get();
            tracing::info!("zstd frame cache max size is set to {size} via {ENV_KEY}");
            return size;
        } else {
            tracing::error!(
                "Failed to parse {ENV_KEY}={value}; the value should be a positive integer"
            );
        }
    }
    // Default: 256 MiB
    256 * 1024 * 1024
});

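/// LRU cache of decoded zstd frames, keyed by `(frame offset, cache key)` and bounded
/// by a total byte budget. A minimal usage sketch is shown below; `cid` is an
/// illustrative [`Cid`] that is assumed not to be present in the cached frame.
///
/// ```ignore
/// let cache = ZstdFrameCache::new(1024 * 1024);
/// // Cache the (here empty) block index decoded from the frame at offset 0 of archive 0.
/// cache.put(0, 0, hashbrown::HashMap::default());
/// // The frame is cached, but `cid` is not in its index, hence `Some(None)`.
/// assert_eq!(cache.get(0, 0, cid), Some(None));
/// ```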
pub struct ZstdFrameCache {
    /// Maximum size in bytes. Pages will be evicted if the total size of the
    /// cache exceeds this amount.
    pub max_size: usize,
    current_size: AtomicUsize,
    // Use `hashbrown::HashMap` here because its `GetSize` implementation is accurate
    // (thanks to `hashbrown::HashMap::allocation_size`).
    lru: SizeTrackingLruCache<(FrameOffset, CacheKey), hashbrown::HashMap<CidWrapper, Vec<u8>>>,
}

impl Default for ZstdFrameCache {
    fn default() -> Self {
        ZstdFrameCache::new(*ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE)
    }
}

impl ZstdFrameCache {
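    /// Create an empty cache that holds at most `max_size` bytes of decoded frame data.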
    pub fn new(max_size: usize) -> Self {
        ZstdFrameCache {
            max_size,
            current_size: AtomicUsize::new(0),
            lru: SizeTrackingLruCache::unbounded_with_metrics("zstd_frame".into()),
        }
    }

    /// Return a clone of the value associated with `cid` in the given frame. If the
    /// frame entry is found, it is moved to the top of the LRU queue. Returns `None`
    /// if the frame is not cached at all, and `Some(None)` if the frame is cached but
    /// does not contain `cid`.
    pub fn get(&self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
        self.lru
            .cache()
            .write()
            .get(&(offset, key))
            .map(|index| index.get(&CidWrapper::from(cid)).cloned())
    }

    /// Insert an entry into the LRU cache and evict pages if `max_size` has been
    /// exceeded.
    pub fn put(
        &self,
        offset: FrameOffset,
        key: CacheKey,
        mut index: hashbrown::HashMap<CidWrapper, Vec<u8>>,
    ) {
        index.shrink_to_fit();

        let lru_key = (offset, key);
        let lru_key_size = lru_key.get_size();
        let entry_size = index.get_size();
        // Skip items too large to ever fit within the cache budget.
        if entry_size.saturating_add(lru_key_size) >= self.max_size {
            return;
        }

        if let Some(prev_entry) = self.lru.push(lru_key, index) {
            // An entry already existed for this key, so the key size is already
            // accounted for; only swap the entry sizes.
            self.current_size.fetch_add(entry_size, Ordering::Relaxed);
            self.current_size
                .fetch_sub(prev_entry.get_size(), Ordering::Relaxed);
        } else {
            self.current_size
                .fetch_add(entry_size.saturating_add(lru_key_size), Ordering::Relaxed);
        }
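        // Evict least-recently-used frames until the tracked size is back within the budget.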
        while self.current_size.load(Ordering::Relaxed) > self.max_size {
            if let Some((prev_key, prev_entry)) = self.lru.pop_lru() {
                self.current_size.fetch_sub(
                    prev_key.get_size().saturating_add(prev_entry.get_size()),
                    Ordering::Relaxed,
                );
            } else {
                break;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::{multihash::MultihashCode, rand::forest_rng};
    use fvm_ipld_encoding::IPLD_RAW;
    use multihash_derive::MultihashDigest;
    use rand::Rng;

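    // Repeatedly `put` (and overwrite) random frame indices and check that the manually
    // tracked `current_size` always matches the LRU cache's own byte accounting.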
    #[test]
    fn test_zstd_frame_cache_size() {
        let mut rng = forest_rng();
        let cache = ZstdFrameCache::new(10);
        for i in 0..100 {
            let index = gen_index(&mut rng);
            cache.put(i, i, index);
            assert_eq!(
                cache.current_size.load(Ordering::Relaxed),
                cache.lru.size_in_bytes()
            );
            let index2 = gen_index(&mut rng);
            cache.put(i, i, index2);
            assert_eq!(
                cache.current_size.load(Ordering::Relaxed),
                cache.lru.size_in_bytes()
            );
        }
    }

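    // Build a small random block index: 10 entries of 64..1024 random bytes each,
    // keyed by the CID of their contents.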
    fn gen_index(rng: &mut impl Rng) -> hashbrown::HashMap<CidWrapper, Vec<u8>> {
        let mut map = hashbrown::HashMap::default();
        for _ in 0..10 {
            let vec_len = rng.gen_range(64..1024);
            let mut data = vec![0; vec_len];
            rng.fill_bytes(&mut data);
            let cid = Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data));
            map.insert(cid.into(), data);
        }
        map
    }
}