1use std::ops::Range;
8use std::sync::Arc;
9
10use bytes::Bytes;
11use moka::future::Cache;
12
13use crate::{
14 block::{BlockMeta, CodecId},
15 codec::decompress_by_id,
16 error::{Error, Result},
17 storage::Storage,
18};
19
20const IO_SLAB_SIZE: u64 = 1024 * 1024; #[derive(Clone)]
27struct IoCache {
28 inner: Cache<(Arc<str>, u64), Bytes>,
29 file_sizes: Cache<Arc<str>, u64>,
31}
32
33impl IoCache {
34 fn new(max_capacity_bytes: u64) -> Self {
35 let inner = Cache::builder()
36 .weigher(|_: &(Arc<str>, u64), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
37 .max_capacity(max_capacity_bytes)
38 .build();
39 let file_sizes = Cache::builder().max_capacity(1024).build();
40 Self { inner, file_sizes }
41 }
42
43 async fn read_range(
45 &self,
46 path: &Arc<str>,
47 range: Range<u64>,
48 storage: &(dyn Storage + Sync),
49 ) -> Result<Bytes> {
50 let start_slab = range.start / IO_SLAB_SIZE;
51 let end_slab = (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
52
53 if start_slab == end_slab {
54 let slab = self.load_slab(path, start_slab, storage).await?;
55 let offset = (range.start - start_slab * IO_SLAB_SIZE) as usize;
56 let len = (range.end - range.start) as usize;
57 Ok(slab.slice(offset..offset + len))
58 } else {
59 let mut out = Vec::with_capacity((range.end - range.start) as usize);
60 for slab_idx in start_slab..=end_slab {
61 let slab = self.load_slab(path, slab_idx, storage).await?;
62 let slab_start = slab_idx * IO_SLAB_SIZE;
63 let from = (range.start.max(slab_start) - slab_start) as usize;
64 let to = (range.end.min(slab_start + IO_SLAB_SIZE) - slab_start) as usize;
65 out.extend_from_slice(&slab[from..to.min(slab.len())]);
66 }
67 Ok(Bytes::from(out))
68 }
69 }
70
71 async fn load_slab(
72 &self,
73 path: &Arc<str>,
74 slab_idx: u64,
75 storage: &(dyn Storage + Sync),
76 ) -> Result<Bytes> {
77 let file_size = self
78 .file_sizes
79 .try_get_with(Arc::clone(path), async move { storage.size().await })
80 .await
81 .map_err(|e| Error::Storage(format!("size fetch failed: {e}")))?;
82
83 let slab_start = slab_idx * IO_SLAB_SIZE;
84 let slab_end = (slab_start + IO_SLAB_SIZE).min(file_size);
85 let key = (Arc::clone(path), slab_idx);
86 self.inner
87 .try_get_with(key, async move {
88 storage.read_range(slab_start..slab_end).await
89 })
90 .await
91 .map_err(|e| Error::Storage(format!("io slab load failed: {e}")))
92 }
93}
94
95#[derive(Clone)]
103pub struct DeltaCache {
104 block_cache: Cache<(Arc<str>, u32), Bytes>,
105 io_cache: Option<IoCache>,
106}
107
108impl DeltaCache {
109 pub fn new(block_capacity: u64, io_capacity: u64) -> Self {
114 let block_cache = Cache::builder()
115 .weigher(|_: &(Arc<str>, u32), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
116 .max_capacity(block_capacity)
117 .build();
118 let io_cache = if io_capacity > 0 {
119 Some(IoCache::new(io_capacity))
120 } else {
121 None
122 };
123 Self {
124 block_cache,
125 io_cache,
126 }
127 }
128
129 pub(crate) async fn get_or_load(
133 &self,
134 path: &Arc<str>,
135 block_meta: &BlockMeta,
136 storage: &(dyn Storage + Sync),
137 ) -> Result<Bytes> {
138 let key = (Arc::clone(path), block_meta.id.0);
139 let range = block_meta.file_range();
140 let uncompressed_size = block_meta.uncompressed_size as usize;
141 let codec_id = block_meta.codec.clone();
142 let io_cache = self.io_cache.clone();
143 let path_for_io = Arc::clone(path);
144
145 self.block_cache
146 .try_get_with(key, async move {
147 let same_slab =
151 range.start / IO_SLAB_SIZE == (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
152 let raw = match &io_cache {
153 Some(io) if same_slab => io.read_range(&path_for_io, range, storage).await?,
154 _ => storage.read_range(range).await?,
155 };
156 let decompressed = if codec_id == CodecId::None {
157 raw
158 } else {
159 let vec = tokio::task::spawn_blocking(move || {
160 decompress_by_id(&codec_id, &raw, uncompressed_size)
161 })
162 .await
163 .map_err(|e| Error::Storage(format!("decompression task failed: {e}")))??;
164 Bytes::from(vec)
165 };
166 Ok::<Bytes, Error>(decompressed)
167 })
168 .await
169 .map_err(|e| Error::Storage(format!("cache load failed: {e}")))
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::BlockId;
    use crate::block::{BlockMeta, CodecId};
    use crate::storage::InMemoryStorage;

    /// One mebibyte, as a `usize` for sizing test buffers.
    const MIB: usize = 1024 * 1024;

    /// Metadata for an uncompressed block of `size` bytes starting at `offset`.
    fn make_block_meta(id: u32, offset: u64, size: u64) -> BlockMeta {
        BlockMeta {
            id: BlockId(id),
            file_offset: offset,
            compressed_size: size,
            uncompressed_size: size,
            codec: CodecId::None,
        }
    }

    /// `len` bytes following a repeating `i % 251` pattern, so misplaced offsets
    /// produce visible mismatches.
    fn patterned(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 251) as u8).collect()
    }

    /// The single file path every test reads from.
    fn path() -> Arc<str> {
        Arc::from("p")
    }

    #[tokio::test]
    async fn block_cache_miss_loads_block() {
        let data = vec![0xAA; 100];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(1024, 0);

        let got = cache
            .get_or_load(&path(), &make_block_meta(0, 0, 100), &storage)
            .await
            .unwrap();
        assert_eq!(&got[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_hit_returns_same_bytes() {
        let data = vec![0xBB; 200];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, 0);
        let meta = make_block_meta(0, 0, 200);

        let mut loads = Vec::with_capacity(2);
        for _ in 0..2 {
            loads.push(cache.get_or_load(&path(), &meta, &storage).await.unwrap());
        }
        assert_eq!(loads[0], loads[1]);
        assert_eq!(&loads[0][..], &data[..]);
    }

    #[tokio::test]
    async fn io_cache_read_within_single_slab() {
        let data = patterned(2 * MIB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new((4 * MIB) as u64);

        let got = io.read_range(&path(), 100..500, &storage).await.unwrap();
        assert_eq!(&got[..], &data[100..500]);
    }

    #[tokio::test]
    async fn io_cache_read_spanning_two_slabs() {
        let data = patterned(2 * MIB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new((4 * MIB) as u64);

        let (start, end) = (MIB - 100, MIB + 100);
        let got = io
            .read_range(&path(), start as u64..end as u64, &storage)
            .await
            .unwrap();
        assert_eq!(&got[..], &data[start..end]);
    }

    #[tokio::test]
    async fn io_cache_handles_partial_last_slab() {
        let data = vec![0xCC; 500_000];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new((2 * MIB) as u64);

        let got = io.read_range(&path(), 0..500_000, &storage).await.unwrap();
        assert_eq!(&got[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_with_io_tier_returns_correct_bytes() {
        let data = vec![0xDE; 300];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, (2 * MIB) as u64);

        let got = cache
            .get_or_load(&path(), &make_block_meta(0, 0, 300), &storage)
            .await
            .unwrap();
        assert_eq!(&got[..], &data[..]);
    }
}