1use std::ops::Range;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use moka::future::Cache;
6
7use crate::{
8 block::{BlockMeta, CodecId},
9 codec::decompress_by_id,
10 error::{Error, Result},
11 storage::Storage,
12};
13
14const IO_SLAB_SIZE: u64 = 1024 * 1024; #[derive(Clone)]
21struct IoCache {
22 inner: Cache<(Arc<str>, u64), Bytes>,
23 file_sizes: Cache<Arc<str>, u64>,
25}
26
27impl IoCache {
28 fn new(max_capacity_bytes: u64) -> Self {
29 let inner = Cache::builder()
30 .weigher(|_: &(Arc<str>, u64), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
31 .max_capacity(max_capacity_bytes)
32 .build();
33 let file_sizes = Cache::builder().max_capacity(1024).build();
34 Self { inner, file_sizes }
35 }
36
37 async fn read_range(
39 &self,
40 path: &Arc<str>,
41 range: Range<u64>,
42 storage: &(dyn Storage + Sync),
43 ) -> Result<Bytes> {
44 let start_slab = range.start / IO_SLAB_SIZE;
45 let end_slab = (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
46
47 if start_slab == end_slab {
48 let slab = self.load_slab(path, start_slab, storage).await?;
49 let offset = (range.start - start_slab * IO_SLAB_SIZE) as usize;
50 let len = (range.end - range.start) as usize;
51 Ok(slab.slice(offset..offset + len))
52 } else {
53 let mut out = Vec::with_capacity((range.end - range.start) as usize);
54 for slab_idx in start_slab..=end_slab {
55 let slab = self.load_slab(path, slab_idx, storage).await?;
56 let slab_start = slab_idx * IO_SLAB_SIZE;
57 let from = (range.start.max(slab_start) - slab_start) as usize;
58 let to = (range.end.min(slab_start + IO_SLAB_SIZE) - slab_start) as usize;
59 out.extend_from_slice(&slab[from..to.min(slab.len())]);
60 }
61 Ok(Bytes::from(out))
62 }
63 }
64
65 async fn load_slab(
66 &self,
67 path: &Arc<str>,
68 slab_idx: u64,
69 storage: &(dyn Storage + Sync),
70 ) -> Result<Bytes> {
71 let file_size = self
72 .file_sizes
73 .try_get_with(Arc::clone(path), async move { storage.size().await })
74 .await
75 .map_err(|e| Error::Storage(format!("size fetch failed: {e}")))?;
76
77 let slab_start = slab_idx * IO_SLAB_SIZE;
78 let slab_end = (slab_start + IO_SLAB_SIZE).min(file_size);
79 let key = (Arc::clone(path), slab_idx);
80 self.inner
81 .try_get_with(key, async move {
82 storage.read_range(slab_start..slab_end).await
83 })
84 .await
85 .map_err(|e| Error::Storage(format!("io slab load failed: {e}")))
86 }
87}
88
89#[derive(Clone)]
97pub struct DeltaCache {
98 block_cache: Cache<(Arc<str>, u32), Bytes>,
99 io_cache: Option<IoCache>,
100}
101
102impl DeltaCache {
103 pub fn new(block_capacity: u64, io_capacity: u64) -> Self {
108 let block_cache = Cache::builder()
109 .weigher(|_: &(Arc<str>, u32), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
110 .max_capacity(block_capacity)
111 .build();
112 let io_cache = if io_capacity > 0 {
113 Some(IoCache::new(io_capacity))
114 } else {
115 None
116 };
117 Self {
118 block_cache,
119 io_cache,
120 }
121 }
122
123 pub async fn get_or_load(
127 &self,
128 path: &Arc<str>,
129 block_meta: &BlockMeta,
130 storage: &(dyn Storage + Sync),
131 ) -> Result<Bytes> {
132 let key = (Arc::clone(path), block_meta.id.0);
133 let range = block_meta.file_range();
134 let uncompressed_size = block_meta.uncompressed_size as usize;
135 let codec_id = block_meta.codec.clone();
136 let io_cache = self.io_cache.clone();
137 let path_for_io = Arc::clone(path);
138
139 self.block_cache
140 .try_get_with(key, async move {
141 let same_slab =
145 range.start / IO_SLAB_SIZE == (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
146 let raw = match &io_cache {
147 Some(io) if same_slab => io.read_range(&path_for_io, range, storage).await?,
148 _ => storage.read_range(range).await?,
149 };
150 let decompressed = if codec_id == CodecId::None {
151 raw
152 } else {
153 let vec = tokio::task::spawn_blocking(move || {
154 decompress_by_id(&codec_id, &raw, uncompressed_size)
155 })
156 .await
157 .map_err(|e| Error::Storage(format!("decompression task failed: {e}")))??;
158 Bytes::from(vec)
159 };
160 Ok::<Bytes, Error>(decompressed)
161 })
162 .await
163 .map_err(|e| Error::Storage(format!("cache load failed: {e}")))
164 }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::BlockId;
    use crate::block::{BlockMeta, CodecId};
    use crate::storage::InMemoryStorage;

    /// One slab's worth of bytes, as a `usize` for slicing test buffers.
    const SLAB: usize = IO_SLAB_SIZE as usize;

    /// Builds metadata for an uncompressed (`CodecId::None`) block of `size` bytes
    /// starting at file offset `offset`.
    fn make_block_meta(id: u32, offset: u64, size: u64) -> BlockMeta {
        BlockMeta {
            id: BlockId(id),
            file_offset: offset,
            compressed_size: size,
            uncompressed_size: size,
            codec: CodecId::None,
        }
    }

    /// Returns `len` position-dependent bytes (`i % 251`), so a read from the wrong
    /// offset (other than a multiple of 251) yields different bytes.
    fn patterned_bytes(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 251) as u8).collect()
    }

    #[tokio::test]
    async fn block_cache_miss_loads_block() {
        let data = vec![0xAA; 100];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(1024, 0);
        let meta = make_block_meta(0, 0, 100);

        let result = cache
            .get_or_load(&Arc::from("p"), &meta, &storage)
            .await
            .unwrap();
        assert_eq!(&result[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_hit_returns_same_bytes() {
        let data = vec![0xBB; 200];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, 0);
        let meta = make_block_meta(0, 0, 200);

        let first = cache
            .get_or_load(&Arc::from("p"), &meta, &storage)
            .await
            .unwrap();
        let second = cache
            .get_or_load(&Arc::from("p"), &meta, &storage)
            .await
            .unwrap();
        assert_eq!(first, second);
        assert_eq!(&first[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_keys_blocks_by_id() {
        let data = patterned_bytes(300);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, 0);
        let path: Arc<str> = Arc::from("p");

        let first = cache
            .get_or_load(&path, &make_block_meta(0, 0, 100), &storage)
            .await
            .unwrap();
        let second = cache
            .get_or_load(&path, &make_block_meta(1, 100, 100), &storage)
            .await
            .unwrap();
        assert_eq!(&first[..], &data[..100]);
        assert_eq!(&second[..], &data[100..200]);
    }

    #[tokio::test]
    async fn io_cache_read_within_single_slab() {
        let data = patterned_bytes(2 * SLAB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * 1024 * 1024);

        let range = 100u64..500u64;
        let result = io
            .read_range(&Arc::from("p"), range.clone(), &storage)
            .await
            .unwrap();
        assert_eq!(&result[..], &data[100..500]);
    }

    #[tokio::test]
    async fn io_cache_read_spanning_two_slabs() {
        let data = patterned_bytes(2 * SLAB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * 1024 * 1024);

        let start = SLAB - 100;
        let end = SLAB + 100;
        let result = io
            .read_range(&Arc::from("p"), start as u64..end as u64, &storage)
            .await
            .unwrap();
        assert_eq!(&result[..], &data[start..end]);
    }

    #[tokio::test]
    async fn io_cache_handles_partial_last_slab() {
        let data = vec![0xCC; 500_000];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(2 * 1024 * 1024);

        let result = io
            .read_range(&Arc::from("p"), 0..500_000, &storage)
            .await
            .unwrap();
        assert_eq!(&result[..], &data[..]);
    }

    #[tokio::test]
    async fn io_cache_empty_range_returns_empty() {
        let storage = InMemoryStorage::from_bytes(patterned_bytes(1000));
        let io = IoCache::new(4 * 1024 * 1024);

        let result = io
            .read_range(&Arc::from("p"), 100..100, &storage)
            .await
            .unwrap();
        assert!(result.is_empty());
    }

    #[tokio::test]
    async fn io_cache_reads_aligned_to_slab_boundaries() {
        let data = patterned_bytes(2 * SLAB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * 1024 * 1024);
        let path: Arc<str> = Arc::from("p");

        // Ends exactly on the first slab boundary: served entirely by slab 0.
        let tail = io
            .read_range(&path, (SLAB - 100) as u64..SLAB as u64, &storage)
            .await
            .unwrap();
        assert_eq!(&tail[..], &data[SLAB - 100..SLAB]);

        // Starts exactly on the boundary: served entirely by slab 1.
        let head = io
            .read_range(&path, SLAB as u64..(SLAB + 100) as u64, &storage)
            .await
            .unwrap();
        assert_eq!(&head[..], &data[SLAB..SLAB + 100]);
    }

    #[tokio::test]
    async fn io_cache_multi_slab_read_into_partial_last_slab() {
        let len = SLAB + SLAB / 2;
        let data = patterned_bytes(len);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * 1024 * 1024);

        let result = io
            .read_range(&Arc::from("p"), 0..len as u64, &storage)
            .await
            .unwrap();
        assert_eq!(&result[..], &data[..]);
    }

    #[tokio::test]
    async fn io_cache_repeated_read_returns_same_bytes() {
        let data = patterned_bytes(1000);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * 1024 * 1024);
        let path: Arc<str> = Arc::from("p");

        let first = io.read_range(&path, 10..900, &storage).await.unwrap();
        let second = io.read_range(&path, 10..900, &storage).await.unwrap();
        assert_eq!(first, second);
        assert_eq!(&first[..], &data[10..900]);
    }

    #[tokio::test]
    async fn block_cache_with_io_tier_returns_correct_bytes() {
        let data = vec![0xDE; 300];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, 2 * 1024 * 1024);
        let meta = make_block_meta(0, 0, 300);

        let result = cache
            .get_or_load(&Arc::from("p"), &meta, &storage)
            .await
            .unwrap();
        assert_eq!(&result[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_reads_block_spanning_slabs() {
        let data = patterned_bytes(2 * SLAB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, 2 * 1024 * 1024);
        let meta = make_block_meta(0, (SLAB - 10) as u64, 20);

        let result = cache
            .get_or_load(&Arc::from("p"), &meta, &storage)
            .await
            .unwrap();
        assert_eq!(&result[..], &data[SLAB - 10..SLAB + 10]);
    }
}