Skip to main content

array_format/delta/
cache.rs

1use std::ops::Range;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use moka::future::Cache;
6
7use crate::{
8    block::{BlockMeta, CodecId},
9    codec::decompress_by_id,
10    error::{Error, Result},
11    storage::Storage,
12};
13
/// Size of one raw I/O slab (1 MiB). Slab `i` covers file bytes
/// `[i * IO_SLAB_SIZE, (i + 1) * IO_SLAB_SIZE)`.
const IO_SLAB_SIZE: u64 = 1 << 20;
15
16/// Raw I/O cache: caches compressed file bytes in 1 MiB aligned slabs.
17///
18/// Key: `(delta_path, slab_index)`. Concurrent requests for the same slab are
19/// coalesced — only one task performs the actual storage read.
20#[derive(Clone)]
21struct IoCache {
22    inner: Cache<(Arc<str>, u64), Bytes>,
23    /// File sizes cached per path so `storage.size()` is only called once per delta.
24    file_sizes: Cache<Arc<str>, u64>,
25}
26
27impl IoCache {
28    fn new(max_capacity_bytes: u64) -> Self {
29        let inner = Cache::builder()
30            .weigher(|_: &(Arc<str>, u64), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
31            .max_capacity(max_capacity_bytes)
32            .build();
33        let file_sizes = Cache::builder().max_capacity(1024).build();
34        Self { inner, file_sizes }
35    }
36
37    /// Reads `range` bytes, routing through aligned 1 MiB slab loads.
38    async fn read_range(
39        &self,
40        path: &Arc<str>,
41        range: Range<u64>,
42        storage: &(dyn Storage + Sync),
43    ) -> Result<Bytes> {
44        let start_slab = range.start / IO_SLAB_SIZE;
45        let end_slab = (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
46
47        if start_slab == end_slab {
48            let slab = self.load_slab(path, start_slab, storage).await?;
49            let offset = (range.start - start_slab * IO_SLAB_SIZE) as usize;
50            let len = (range.end - range.start) as usize;
51            Ok(slab.slice(offset..offset + len))
52        } else {
53            let mut out = Vec::with_capacity((range.end - range.start) as usize);
54            for slab_idx in start_slab..=end_slab {
55                let slab = self.load_slab(path, slab_idx, storage).await?;
56                let slab_start = slab_idx * IO_SLAB_SIZE;
57                let from = (range.start.max(slab_start) - slab_start) as usize;
58                let to = (range.end.min(slab_start + IO_SLAB_SIZE) - slab_start) as usize;
59                out.extend_from_slice(&slab[from..to.min(slab.len())]);
60            }
61            Ok(Bytes::from(out))
62        }
63    }
64
65    async fn load_slab(
66        &self,
67        path: &Arc<str>,
68        slab_idx: u64,
69        storage: &(dyn Storage + Sync),
70    ) -> Result<Bytes> {
71        let file_size = self
72            .file_sizes
73            .try_get_with(Arc::clone(path), async move { storage.size().await })
74            .await
75            .map_err(|e| Error::Storage(format!("size fetch failed: {e}")))?;
76
77        let slab_start = slab_idx * IO_SLAB_SIZE;
78        let slab_end = (slab_start + IO_SLAB_SIZE).min(file_size);
79        let key = (Arc::clone(path), slab_idx);
80        self.inner
81            .try_get_with(key, async move {
82                storage.read_range(slab_start..slab_end).await
83            })
84            .await
85            .map_err(|e| Error::Storage(format!("io slab load failed: {e}")))
86    }
87}
88
89/// Two-level cache shared across all delta layers in an [`ArrayFile`](crate::file::ArrayFile).
90///
91/// Level 1 — block cache: decompressed block bytes, keyed by `(path, block_id)`.
92/// Level 2 — I/O slab cache: raw compressed bytes in 1 MiB slabs, keyed by `(path, slab_index)`.
93///
94/// On a block cache miss, raw bytes are fetched via the I/O slab cache (if enabled),
95/// then decompressed and stored in the block cache.
96#[derive(Clone)]
97pub struct DeltaCache {
98    block_cache: Cache<(Arc<str>, u32), Bytes>,
99    io_cache: Option<IoCache>,
100}
101
102impl DeltaCache {
103    /// Creates a new cache.
104    ///
105    /// `block_capacity` is the byte budget for decompressed blocks.
106    /// `io_capacity` is the byte budget for raw I/O slabs (0 disables the I/O tier).
107    pub fn new(block_capacity: u64, io_capacity: u64) -> Self {
108        let block_cache = Cache::builder()
109            .weigher(|_: &(Arc<str>, u32), v: &Bytes| v.len().try_into().unwrap_or(u32::MAX))
110            .max_capacity(block_capacity)
111            .build();
112        let io_cache = if io_capacity > 0 {
113            Some(IoCache::new(io_capacity))
114        } else {
115            None
116        };
117        Self {
118            block_cache,
119            io_cache,
120        }
121    }
122
123    /// Returns the decompressed block, loading from `storage` on a cache miss.
124    ///
125    /// Concurrent requests for the same `(path, block_id)` are coalesced.
126    pub async fn get_or_load(
127        &self,
128        path: &Arc<str>,
129        block_meta: &BlockMeta,
130        storage: &(dyn Storage + Sync),
131    ) -> Result<Bytes> {
132        let key = (Arc::clone(path), block_meta.id.0);
133        let range = block_meta.file_range();
134        let uncompressed_size = block_meta.uncompressed_size as usize;
135        let codec_id = block_meta.codec.clone();
136        let io_cache = self.io_cache.clone();
137        let path_for_io = Arc::clone(path);
138
139        self.block_cache
140            .try_get_with(key, async move {
141                // Only use the I/O slab cache when the block fits in a single slab.
142                // Multi-slab blocks (compressed_size > IO_SLAB_SIZE) are read directly:
143                // splitting them across N slab loads adds overhead with no coalescing benefit.
144                let same_slab =
145                    range.start / IO_SLAB_SIZE == (range.end.saturating_sub(1)) / IO_SLAB_SIZE;
146                let raw = match &io_cache {
147                    Some(io) if same_slab => io.read_range(&path_for_io, range, storage).await?,
148                    _ => storage.read_range(range).await?,
149                };
150                let decompressed = if codec_id == CodecId::None {
151                    raw
152                } else {
153                    let vec = tokio::task::spawn_blocking(move || {
154                        decompress_by_id(&codec_id, &raw, uncompressed_size)
155                    })
156                    .await
157                    .map_err(|e| Error::Storage(format!("decompression task failed: {e}")))??;
158                    Bytes::from(vec)
159                };
160                Ok::<Bytes, Error>(decompressed)
161            })
162            .await
163            .map_err(|e| Error::Storage(format!("cache load failed: {e}")))
164    }
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::BlockId;
    use crate::block::{BlockMeta, CodecId};
    use crate::storage::InMemoryStorage;

    const MIB: usize = 1024 * 1024;

    /// Metadata for an uncompressed block of `size` bytes starting at `offset`.
    fn make_block_meta(id: u32, offset: u64, size: u64) -> BlockMeta {
        BlockMeta {
            id: BlockId(id),
            file_offset: offset,
            compressed_size: size,
            uncompressed_size: size,
            codec: CodecId::None,
        }
    }

    /// `len` bytes following a period-251 pattern, so a read from the wrong
    /// offset yields different bytes.
    fn patterned(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 251) as u8).collect()
    }

    // ── Block cache tests ─────────────────────────────────────────────

    #[tokio::test]
    async fn block_cache_miss_loads_block() {
        let expected = vec![0xAAu8; 100];
        let storage = InMemoryStorage::from_bytes(expected.clone());
        let cache = DeltaCache::new(1024, 0);
        let path: Arc<str> = Arc::from("p");

        let loaded = cache
            .get_or_load(&path, &make_block_meta(0, 0, 100), &storage)
            .await
            .unwrap();

        assert_eq!(&loaded[..], &expected[..]);
    }

    #[tokio::test]
    async fn block_cache_hit_returns_same_bytes() {
        let expected = vec![0xBBu8; 200];
        let storage = InMemoryStorage::from_bytes(expected.clone());
        let cache = DeltaCache::new(4096, 0);
        let meta = make_block_meta(0, 0, 200);
        let path: Arc<str> = Arc::from("p");

        let miss = cache.get_or_load(&path, &meta, &storage).await.unwrap();
        let hit = cache.get_or_load(&path, &meta, &storage).await.unwrap();

        assert_eq!(miss, hit);
        assert_eq!(&miss[..], &expected[..]);
    }

    // ── IoCache tests ─────────────────────────────────────────────────

    #[tokio::test]
    async fn io_cache_read_within_single_slab() {
        let data = patterned(2 * MIB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * MIB as u64);

        let got = io
            .read_range(&Arc::from("p"), 100..500, &storage)
            .await
            .unwrap();

        assert_eq!(&got[..], &data[100..500]);
    }

    #[tokio::test]
    async fn io_cache_read_spanning_two_slabs() {
        let data = patterned(2 * MIB);
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(4 * MIB as u64);

        // 100 bytes on either side of the first slab boundary.
        let (start, end) = (MIB - 100, MIB + 100);
        let got = io
            .read_range(&Arc::from("p"), start as u64..end as u64, &storage)
            .await
            .unwrap();

        assert_eq!(&got[..], &data[start..end]);
    }

    #[tokio::test]
    async fn io_cache_handles_partial_last_slab() {
        // The whole file is smaller than one slab.
        let data = vec![0xCCu8; 500_000];
        let storage = InMemoryStorage::from_bytes(data.clone());
        let io = IoCache::new(2 * MIB as u64);

        let got = io
            .read_range(&Arc::from("p"), 0..500_000, &storage)
            .await
            .unwrap();

        assert_eq!(&got[..], &data[..]);
    }

    #[tokio::test]
    async fn block_cache_with_io_tier_returns_correct_bytes() {
        let expected = vec![0xDEu8; 300];
        let storage = InMemoryStorage::from_bytes(expected.clone());
        let cache = DeltaCache::new(4096, 2 * MIB as u64);
        let path: Arc<str> = Arc::from("p");

        let loaded = cache
            .get_or_load(&path, &make_block_meta(0, 0, 300), &storage)
            .await
            .unwrap();

        assert_eq!(&loaded[..], &expected[..]);
    }
}