Skip to main content

array_format/delta/
cache.rs

1//! Two-level read cache shared across the delta layers of a file.
2//!
3//! [`DeltaCache`] holds decompressed blocks (level 1) on top of an internal
4//! raw-bytes slab cache (level 2), so repeated reads avoid both storage round
5//! trips and redundant decompression.
6
7use std::ops::Range;
8use std::sync::Arc;
9
10use bytes::Bytes;
11use moka::future::Cache;
12
13use crate::{
14    block::{BlockMeta, CodecId},
15    codec::decompress_by_id,
16    error::{Error, Result},
17    storage::Storage,
18};
19
/// Size in bytes of one aligned I/O slab (1 MiB).
const IO_SLAB_SIZE: u64 = 1 << 20;
21
22/// Raw I/O cache: caches compressed file bytes in 1 MiB aligned slabs.
23///
24/// Key: `(delta_path, slab_index)`. Concurrent requests for the same slab are
25/// coalesced — only one task performs the actual storage read.
26#[derive(Clone)]
27struct IoCache {
28    inner: Cache<(Arc<str>, u64), Bytes>,
29    /// File sizes cached per path so `storage.size()` is only called once per delta.
30    file_sizes: Cache<Arc<str>, u64>,
31}
32
33impl IoCache {
34    fn new(max_capacity_bytes: u64) -> Self {
35        let inner = Cache::builder()
36            .weigher(|_: &(Arc<str>, u64), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
37            .max_capacity(max_capacity_bytes)
38            .build();
39        let file_sizes = Cache::builder().max_capacity(1024).build();
40        Self { inner, file_sizes }
41    }
42
43    /// Reads `range` bytes, routing through aligned 1 MiB slab loads.
44    async fn read_range(
45        &self,
46        path: &Arc<str>,
47        range: Range<u64>,
48        storage: &(dyn Storage + Sync),
49    ) -> Result<Bytes> {
50        let start_slab = range.start / IO_SLAB_SIZE;
51        let end_slab = (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
52
53        if start_slab == end_slab {
54            let slab = self.load_slab(path, start_slab, storage).await?;
55            let offset = (range.start - start_slab * IO_SLAB_SIZE) as usize;
56            let len = (range.end - range.start) as usize;
57            Ok(slab.slice(offset..offset + len))
58        } else {
59            let mut out = Vec::with_capacity((range.end - range.start) as usize);
60            for slab_idx in start_slab..=end_slab {
61                let slab = self.load_slab(path, slab_idx, storage).await?;
62                let slab_start = slab_idx * IO_SLAB_SIZE;
63                let from = (range.start.max(slab_start) - slab_start) as usize;
64                let to = (range.end.min(slab_start + IO_SLAB_SIZE) - slab_start) as usize;
65                out.extend_from_slice(&slab[from..to.min(slab.len())]);
66            }
67            Ok(Bytes::from(out))
68        }
69    }
70
71    async fn load_slab(
72        &self,
73        path: &Arc<str>,
74        slab_idx: u64,
75        storage: &(dyn Storage + Sync),
76    ) -> Result<Bytes> {
77        let file_size = self
78            .file_sizes
79            .try_get_with(Arc::clone(path), async move { storage.size().await })
80            .await
81            .map_err(|e| Error::Storage(format!("size fetch failed: {e}")))?;
82
83        let slab_start = slab_idx * IO_SLAB_SIZE;
84        let slab_end = (slab_start + IO_SLAB_SIZE).min(file_size);
85        let key = (Arc::clone(path), slab_idx);
86        self.inner
87            .try_get_with(key, async move {
88                storage.read_range(slab_start..slab_end).await
89            })
90            .await
91            .map_err(|e| Error::Storage(format!("io slab load failed: {e}")))
92    }
93}
94
95/// Two-level cache shared across all delta layers in an [`ArrayFile`](crate::file::ArrayFile).
96///
97/// Level 1 — block cache: decompressed block bytes, keyed by `(path, block_id)`.
98/// Level 2 — I/O slab cache: raw compressed bytes in 1 MiB slabs, keyed by `(path, slab_index)`.
99///
100/// On a block cache miss, raw bytes are fetched via the I/O slab cache (if enabled),
101/// then decompressed and stored in the block cache.
102#[derive(Clone)]
103pub struct DeltaCache {
104    block_cache: Cache<(Arc<str>, u32), Bytes>,
105    io_cache: Option<IoCache>,
106}
107
108impl DeltaCache {
109    /// Creates a new cache.
110    ///
111    /// `block_capacity` is the byte budget for decompressed blocks.
112    /// `io_capacity` is the byte budget for raw I/O slabs (0 disables the I/O tier).
113    pub fn new(block_capacity: u64, io_capacity: u64) -> Self {
114        let block_cache = Cache::builder()
115            .weigher(|_: &(Arc<str>, u32), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
116            .max_capacity(block_capacity)
117            .build();
118        let io_cache = if io_capacity > 0 {
119            Some(IoCache::new(io_capacity))
120        } else {
121            None
122        };
123        Self {
124            block_cache,
125            io_cache,
126        }
127    }
128
129    /// Returns the decompressed block, loading from `storage` on a cache miss.
130    ///
131    /// Concurrent requests for the same `(path, block_id)` are coalesced.
132    pub(crate) async fn get_or_load(
133        &self,
134        path: &Arc<str>,
135        block_meta: &BlockMeta,
136        storage: &(dyn Storage + Sync),
137    ) -> Result<Bytes> {
138        let key = (Arc::clone(path), block_meta.id.0);
139        let range = block_meta.file_range();
140        let uncompressed_size = block_meta.uncompressed_size as usize;
141        let codec_id = block_meta.codec.clone();
142        let io_cache = self.io_cache.clone();
143        let path_for_io = Arc::clone(path);
144
145        self.block_cache
146            .try_get_with(key, async move {
147                // Only use the I/O slab cache when the block fits in a single slab.
148                // Multi-slab blocks (compressed_size > IO_SLAB_SIZE) are read directly:
149                // splitting them across N slab loads adds overhead with no coalescing benefit.
150                let same_slab =
151                    range.start / IO_SLAB_SIZE == (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
152                let raw = match &io_cache {
153                    Some(io) if same_slab => io.read_range(&path_for_io, range, storage).await?,
154                    _ => storage.read_range(range).await?,
155                };
156                let decompressed = if codec_id == CodecId::None {
157                    raw
158                } else {
159                    let vec = tokio::task::spawn_blocking(move || {
160                        decompress_by_id(&codec_id, &raw, uncompressed_size)
161                    })
162                    .await
163                    .map_err(|e| Error::Storage(format!("decompression task failed: {e}")))??;
164                    Bytes::from(vec)
165                };
166                Ok::<Bytes, Error>(decompressed)
167            })
168            .await
169            .map_err(|e| Error::Storage(format!("cache load failed: {e}")))
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::BlockId;
    use crate::block::{BlockMeta, CodecId};
    use crate::storage::InMemoryStorage;

    /// One mebibyte, matching the slab size used by the I/O tier.
    const MIB: usize = 1024 * 1024;

    /// Builds metadata for an uncompressed block of `size` bytes at `offset`.
    fn make_block_meta(id: u32, offset: u64, size: u64) -> BlockMeta {
        BlockMeta {
            id: BlockId(id),
            file_offset: offset,
            compressed_size: size,
            uncompressed_size: size,
            codec: CodecId::None,
        }
    }

    /// Path key shared by every test.
    fn test_path() -> Arc<str> {
        Arc::from("p")
    }

    /// Produces `len` bytes of position-dependent data (period 251) so that
    /// misaligned reads are detectable.
    fn patterned(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 251) as u8).collect()
    }

    // ── Block cache tests ─────────────────────────────────────────────

    #[tokio::test]
    async fn block_cache_miss_loads_block() {
        let data = vec![0xAA; 100];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(1024, 0);
        let meta = make_block_meta(0, 0, 100);
        let path = test_path();

        let loaded = cache.get_or_load(&path, &meta, &storage).await.unwrap();

        assert_eq!(&loaded[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_hit_returns_same_bytes() {
        let data = vec![0xBB; 200];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, 0);
        let meta = make_block_meta(0, 0, 200);
        let path = test_path();

        let first = cache.get_or_load(&path, &meta, &storage).await.unwrap();
        let second = cache.get_or_load(&path, &meta, &storage).await.unwrap();

        assert_eq!(first, second);
        assert_eq!(&first[..], &data[..]);
    }

    // ── IoCache tests ─────────────────────────────────────────────────

    #[tokio::test]
    async fn io_cache_read_within_single_slab() {
        let data = patterned(2 * MIB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * 1024 * 1024);
        let path = test_path();

        let bytes = io.read_range(&path, 100u64..500u64, &storage).await.unwrap();

        assert_eq!(&bytes[..], &data[100..500]);
    }

    #[tokio::test]
    async fn io_cache_read_spanning_two_slabs() {
        let data = patterned(2 * MIB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * 1024 * 1024);
        let path = test_path();

        // Range that straddles the 1 MiB boundary
        let (start, end) = (MIB - 100, MIB + 100);
        let bytes = io
            .read_range(&path, start as u64..end as u64, &storage)
            .await
            .unwrap();

        assert_eq!(&bytes[..], &data[start..end]);
    }

    #[tokio::test]
    async fn io_cache_handles_partial_last_slab() {
        // File smaller than 1 MiB
        let data = vec![0xCC; 500_000];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(2 * 1024 * 1024);
        let path = test_path();

        let bytes = io.read_range(&path, 0..500_000, &storage).await.unwrap();

        assert_eq!(&bytes[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_with_io_tier_returns_correct_bytes() {
        let data = vec![0xDE; 300];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let cache = DeltaCache::new(4096, 2 * 1024 * 1024);
        let meta = make_block_meta(0, 0, 300);
        let path = test_path();

        let loaded = cache.get_or_load(&path, &meta, &storage).await.unwrap();

        assert_eq!(&loaded[..], &data[..]);
    }
}