1mod any;
4pub mod forest;
5mod many;
6pub mod plain;
7
8pub use any::AnyCar;
9pub use forest::ForestCar;
10use get_size2::GetSize as _;
11pub use many::{ManyCar, ReloadableManyCar};
12pub use plain::PlainCar;
13
14use cid::Cid;
15use positioned_io::{ReadAt, Size};
16use std::{
17 num::NonZeroUsize,
18 sync::{
19 Arc, LazyLock,
20 atomic::{AtomicUsize, Ordering},
21 },
22};
23
24use crate::utils::{ShallowClone, cache::SizeTrackingLruCache, get_size::CidWrapper};
25
26pub trait RandomAccessFileReader: ReadAt + Size + Send + Sync + 'static {}
27impl<X: ReadAt + Size + Send + Sync + 'static> RandomAccessFileReader for X {}
28
/// Second component of the `(FrameOffset, CacheKey)` tuple that keys entries
/// in `ZstdFrameCache`.
// NOTE(review): presumably identifies which archive a cached frame belongs to
// when several archives share one cache — confirm against the callers.
pub type CacheKey = u64;
32
/// First component of the `(FrameOffset, CacheKey)` tuple that keys entries in
/// `ZstdFrameCache`.
// NOTE(review): presumably the byte offset of a zstd frame inside its archive
// — confirm against the code that computes it.
type FrameOffset = u64;
34
/// Root count constant for v2 snapshots (currently `1`).
// NOTE(review): the meaning is inferred from the name only; the constant is not
// used in this part of the file — confirm against the snapshot format spec.
pub const V2_SNAPSHOT_ROOT_COUNT: usize = 1;
37
38pub static ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE: LazyLock<usize> = LazyLock::new(|| {
39 const ENV_KEY: &str = "FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE";
40 if let Ok(value) = std::env::var(ENV_KEY) {
41 if let Ok(size) = value.parse::<NonZeroUsize>() {
42 let size = size.get();
43 tracing::info!("zstd frame max size is set to {size} via {ENV_KEY}");
44 return size;
45 } else {
46 tracing::error!(
47 "Failed to parse {ENV_KEY}={value}, value should be a positive integer"
48 );
49 }
50 }
51 256 * 1024 * 1024
53});
54
/// Byte-budgeted LRU cache of per-frame indexes.
///
/// Each entry is keyed by `(FrameOffset, CacheKey)` and stores a map from
/// `CidWrapper` to the associated bytes. The underlying LRU is created
/// unbounded; the byte budget is enforced by `put`, which evicts
/// least-recently-used entries once `current_size` exceeds `max_size`.
pub struct ZstdFrameCache {
    /// Maximum total size in bytes (as reported by `GetSize`) that `put`
    /// allows the cache to reach. A single entry whose size is at or above
    /// this value is never inserted.
    pub max_size: usize,
    /// Running total of the sizes of all cached keys and values, maintained by
    /// `put`. Held in an `Arc` and passed on by `shallow_clone`.
    current_size: Arc<AtomicUsize>,
    /// The LRU store itself, created via `unbounded_with_metrics("zstd_frame")`.
    lru: SizeTrackingLruCache<(FrameOffset, CacheKey), hashbrown::HashMap<CidWrapper, Vec<u8>>>,
}
64
65impl ShallowClone for ZstdFrameCache {
66 fn shallow_clone(&self) -> Self {
67 Self {
68 max_size: self.max_size,
69 current_size: self.current_size.shallow_clone(),
70 lru: self.lru.shallow_clone(),
71 }
72 }
73}
74
75impl Default for ZstdFrameCache {
76 fn default() -> Self {
77 ZstdFrameCache::new(*ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE)
78 }
79}
80
81impl ZstdFrameCache {
82 pub fn new(max_size: usize) -> Self {
83 ZstdFrameCache {
84 max_size,
85 current_size: Arc::new(AtomicUsize::new(0)),
86 lru: SizeTrackingLruCache::unbounded_with_metrics("zstd_frame".into()),
87 }
88 }
89
90 pub fn get(&self, offset: FrameOffset, key: CacheKey, cid: Cid) -> Option<Option<Vec<u8>>> {
93 self.lru
94 .cache()
95 .write()
96 .get(&(offset, key))
97 .map(|index| index.get(&CidWrapper::from(cid)).cloned())
98 }
99
100 pub fn put(
102 &self,
103 offset: FrameOffset,
104 key: CacheKey,
105 mut index: hashbrown::HashMap<CidWrapper, Vec<u8>>,
106 ) {
107 index.shrink_to_fit();
108
109 let lru_key = (offset, key);
110 let lru_key_size = lru_key.get_size();
111 let entry_size = index.get_size();
112 if entry_size.saturating_add(lru_key_size) >= self.max_size {
114 return;
115 }
116
117 if let Some(prev_entry) = self.lru.push(lru_key, index) {
118 self.current_size.fetch_add(entry_size, Ordering::Relaxed);
120 self.current_size
121 .fetch_sub(prev_entry.get_size(), Ordering::Relaxed);
122 } else {
123 self.current_size
124 .fetch_add(entry_size.saturating_add(lru_key_size), Ordering::Relaxed);
125 }
126 while self.current_size.load(Ordering::Relaxed) > self.max_size {
127 if let Some((prev_key, prev_entry)) = self.lru.pop_lru() {
128 self.current_size.fetch_sub(
129 prev_key.get_size().saturating_add(prev_entry.get_size()),
130 Ordering::Relaxed,
131 );
132 } else {
133 break;
134 }
135 }
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::{multihash::MultihashCode, rand::forest_rng};
    use fvm_ipld_encoding::IPLD_RAW;
    use multihash_derive::MultihashDigest;
    use rand::Rng;

    /// Budget for the cache under test.
    ///
    /// It must be larger than any single generated index (ten payloads of at
    /// most 1023 bytes each, plus map overhead), otherwise `put` rejects every
    /// entry and the test only compares `0 == 0`. It must also be small enough
    /// that 100 distinct keys overflow it, so the eviction loop runs.
    const TEST_MAX_SIZE: usize = 64 * 1024;

    /// The byte counter maintained by `put` must match the size recomputed by
    /// the LRU store across fresh inserts, same-key replacements and evictions.
    #[test]
    fn test_zstd_frame_cache_size() {
        let mut rng = forest_rng();
        let cache = ZstdFrameCache::new(TEST_MAX_SIZE);
        for i in 0..100 {
            // Fresh key: exercises the insert path (and eviction once full).
            let index = gen_index(&mut rng);
            cache.put(i, i, index);
            assert_size_invariants(&cache);

            // Same key again: exercises the replacement path.
            let index2 = gen_index(&mut rng);
            cache.put(i, i, index2);
            assert_size_invariants(&cache);
        }
    }

    /// Checks the accounting invariants that must hold after every `put`.
    fn assert_size_invariants(cache: &ZstdFrameCache) {
        let tracked = cache.current_size.load(Ordering::Relaxed);
        assert_eq!(tracked, cache.lru.size_in_bytes());
        // Guards against the test silently becoming vacuous again.
        assert!(tracked > 0, "entry was not inserted into the cache");
        assert!(tracked <= cache.max_size, "cache exceeded its budget");
    }

    /// Generates an index of ten random payloads (64 to 1023 bytes each),
    /// keyed by the CID of the payload.
    fn gen_index(rng: &mut impl Rng) -> hashbrown::HashMap<CidWrapper, Vec<u8>> {
        let mut map = hashbrown::HashMap::default();
        for _ in 0..10 {
            let vec_len = rng.gen_range(64..1024);
            let mut data = vec![0; vec_len];
            rng.fill_bytes(&mut data);
            let cid = Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data));
            map.insert(cid.into(), data);
        }
        map
    }
}