// hdf5_reader/cache.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4
5use lru::LruCache;
6use parking_lot::{Condvar, Mutex};
7use smallvec::SmallVec;
8
9use crate::error::Result;
10
11/// Key for the chunk cache: (dataset object header address, chunk offset tuple).
12///
13/// Uses `SmallVec<[u64; 4]>` to avoid heap allocation for datasets with up to
14/// 4 dimensions (the common case for climate/science data).
15#[derive(Debug, Clone, PartialEq, Eq, Hash)]
16pub struct ChunkKey {
17    pub dataset_addr: u64,
18    pub chunk_offsets: SmallVec<[u64; 4]>,
19}
20
21/// LRU cache for decompressed chunks.
22///
23/// Thread-safe via `parking_lot::Mutex` (non-poisoning). Values are
24/// `Arc<Vec<u8>>` so multiple readers can share the same decompressed chunk data.
25pub struct ChunkCache {
26    inner: Mutex<ChunkCacheState>,
27    max_bytes: usize,
28    max_slots: usize,
29}
30
31struct ChunkCacheState {
32    cache: LruCache<ChunkKey, Arc<Vec<u8>>>,
33    current_bytes: usize,
34    in_flight: HashMap<ChunkKey, Arc<InFlightLoad>>,
35    hits: u64,
36    misses: u64,
37    inserts: u64,
38    evictions: u64,
39}
40
41struct InFlightLoad {
42    completed: Mutex<bool>,
43    ready: Condvar,
44}
45
/// Snapshot of the counters and limits of a [`ChunkCache`] at one moment.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ChunkCacheStats {
    /// Lookups that found a cached entry.
    pub hits: u64,
    /// Lookups that found no cached entry.
    pub misses: u64,
    /// Values stored in the cache.
    pub inserts: u64,
    /// Evictions recorded by the cache.
    pub evictions: u64,
    /// Bytes of chunk data accounted as cached.
    pub current_bytes: usize,
    /// Number of entries in the cache.
    pub entries: usize,
    /// Number of loads in progress.
    pub in_flight: usize,
    /// Configured byte limit.
    pub max_bytes: usize,
    /// Configured slot limit.
    pub max_slots: usize,
}
59
60impl ChunkCache {
61    /// Create a new chunk cache.
62    ///
63    /// - `max_bytes`: maximum total bytes of decompressed data to cache (default 64 MiB)
64    /// - `max_slots`: maximum number of entries (default 521)
65    pub fn new(max_bytes: usize, max_slots: usize) -> Self {
66        let slots = NonZeroUsize::new(max_slots).unwrap_or(NonZeroUsize::new(521).unwrap());
67        ChunkCache {
68            inner: Mutex::new(ChunkCacheState {
69                cache: LruCache::new(slots),
70                current_bytes: 0,
71                in_flight: HashMap::new(),
72                hits: 0,
73                misses: 0,
74                inserts: 0,
75                evictions: 0,
76            }),
77            max_bytes,
78            max_slots: slots.get(),
79        }
80    }
81
82    /// Get a cached chunk, if present. Promotes the entry in LRU order.
83    pub fn get(&self, key: &ChunkKey) -> Option<Arc<Vec<u8>>> {
84        let mut cache = self.inner.lock();
85        let result = cache.cache.get(key).cloned();
86        if result.is_some() {
87            cache.hits += 1;
88        } else {
89            cache.misses += 1;
90        }
91        result
92    }
93
94    /// Insert a chunk into the cache. Evicts LRU entries if over capacity.
95    pub fn insert(&self, key: ChunkKey, data: Vec<u8>) -> Arc<Vec<u8>> {
96        let data_len = data.len();
97        let arc = Arc::new(data);
98
99        if self.max_bytes == 0 || data_len > self.max_bytes {
100            return arc;
101        }
102
103        let mut state = self.inner.lock();
104        // Evict until we have room
105        while state.current_bytes + data_len > self.max_bytes && !state.cache.is_empty() {
106            if let Some((_, evicted)) = state.cache.pop_lru() {
107                state.current_bytes = state.current_bytes.saturating_sub(evicted.len());
108                state.evictions += 1;
109            }
110        }
111
112        if let Some(replaced) = state.cache.peek(&key) {
113            state.current_bytes = state.current_bytes.saturating_sub(replaced.len());
114        }
115        state.current_bytes += data_len;
116        state.inserts += 1;
117        state.cache.put(key, arc.clone());
118
119        arc
120    }
121
122    /// Return a cached chunk or compute it once across concurrent callers.
123    pub fn get_or_insert_with<F>(&self, key: ChunkKey, load: F) -> Result<Arc<Vec<u8>>>
124    where
125        F: FnOnce() -> Result<Vec<u8>>,
126    {
127        loop {
128            let in_flight = {
129                let mut state = self.inner.lock();
130                if let Some(cached) = state.cache.get(&key).cloned() {
131                    state.hits += 1;
132                    return Ok(cached);
133                }
134                state.misses += 1;
135
136                if let Some(in_flight) = state.in_flight.get(&key) {
137                    Arc::clone(in_flight)
138                } else {
139                    let in_flight = Arc::new(InFlightLoad {
140                        completed: Mutex::new(false),
141                        ready: Condvar::new(),
142                    });
143                    state.in_flight.insert(key.clone(), Arc::clone(&in_flight));
144                    drop(state);
145
146                    let result = load().map(|data| self.insert(key.clone(), data));
147
148                    let mut state = self.inner.lock();
149                    state.in_flight.remove(&key);
150                    let mut completed = in_flight.completed.lock();
151                    *completed = true;
152                    in_flight.ready.notify_all();
153
154                    return result;
155                }
156            };
157
158            let mut completed = in_flight.completed.lock();
159            while !*completed {
160                in_flight.ready.wait(&mut completed);
161            }
162        }
163    }
164
165    /// Return current cache statistics.
166    pub fn stats(&self) -> ChunkCacheStats {
167        let state = self.inner.lock();
168        ChunkCacheStats {
169            hits: state.hits,
170            misses: state.misses,
171            inserts: state.inserts,
172            evictions: state.evictions,
173            current_bytes: state.current_bytes,
174            entries: state.cache.len(),
175            in_flight: state.in_flight.len(),
176            max_bytes: self.max_bytes,
177            max_slots: self.max_slots,
178        }
179    }
180}
181
182impl Default for ChunkCache {
183    fn default() -> Self {
184        Self::new(64 * 1024 * 1024, 521)
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `ChunkKey` for `dataset_addr` with the given chunk offsets.
    fn chunk_key(dataset_addr: u64, offsets: &[u64]) -> ChunkKey {
        ChunkKey {
            dataset_addr,
            chunk_offsets: SmallVec::from_vec(offsets.to_vec()),
        }
    }

    #[test]
    fn test_cache_insert_and_get() {
        let cache = ChunkCache::new(1024, 10);
        let key = chunk_key(100, &[0, 0]);

        cache.insert(key.clone(), vec![1, 2, 3]);

        let stored = cache.get(&key).unwrap();
        assert_eq!(stored.as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn test_cache_eviction() {
        // Byte limit of 10 with 4-byte chunks: at most two chunks (8 bytes)
        // fit at once, so older entries must be evicted along the way.
        let cache = ChunkCache::new(10, 10);
        for offset in 0..5 {
            cache.insert(chunk_key(100, &[offset]), vec![0; 4]);
        }

        // The very first chunk is the oldest and must be gone.
        assert!(cache.get(&chunk_key(100, &[0])).is_none());
    }

    #[test]
    fn test_cache_disabled_bypasses_storage() {
        let cache = ChunkCache::new(0, 10);
        let key = chunk_key(100, &[0]);

        cache.insert(key.clone(), vec![1, 2, 3]);

        assert!(cache.get(&key).is_none());
    }

    #[test]
    fn test_cache_promotes_on_get() {
        // `get()` must promote entries in LRU order.
        let cache = ChunkCache::new(12, 10); // room for three 4-byte entries
        let key_a = chunk_key(1, &[0]);
        let key_b = chunk_key(2, &[0]);
        let key_c = chunk_key(3, &[0]);

        // LRU order after these inserts: a, b, c.
        for key in [&key_a, &key_b, &key_c] {
            cache.insert(key.clone(), vec![0; 4]);
        }

        // Touching `a` moves it to the most-recent end: b, c, a.
        assert!(cache.get(&key_a).is_some());

        // A fourth entry forces one eviction, which must hit `b`.
        cache.insert(chunk_key(4, &[0]), vec![0; 4]);

        assert!(cache.get(&key_a).is_some()); // promoted, so it survives
        assert!(cache.get(&key_b).is_none()); // least recently used, evicted
    }

    #[test]
    fn test_cache_replacement_updates_accounting() {
        let cache = ChunkCache::new(8, 10);
        let key = chunk_key(100, &[0]);
        let other = chunk_key(100, &[1]);

        cache.insert(key.clone(), vec![1, 2, 3, 4]);
        cache.insert(key.clone(), vec![5, 6]);
        cache.insert(other.clone(), vec![7, 8, 9, 10]);

        let replaced = cache.get(&key).unwrap();
        assert_eq!(replaced.as_slice(), &[5, 6]);
        assert!(cache.get(&other).is_some());
    }

    #[test]
    fn test_cache_get_or_insert_with_deduplicates_concurrent_loads() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let cache = ChunkCache::new(1024, 10);
        let key = chunk_key(100, &[0, 0]);
        let load_count = AtomicUsize::new(0);

        // Scoped threads may borrow the cache, key and counter directly.
        std::thread::scope(|scope| {
            for _ in 0..8 {
                scope.spawn(|| {
                    let value = cache
                        .get_or_insert_with(key.clone(), || {
                            load_count.fetch_add(1, Ordering::SeqCst);
                            std::thread::sleep(std::time::Duration::from_millis(10));
                            Ok(vec![1, 2, 3, 4])
                        })
                        .unwrap();
                    assert_eq!(value.as_slice(), &[1, 2, 3, 4]);
                });
            }
        });

        assert_eq!(load_count.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_cache_stats_track_hits_misses_and_evictions() {
        let cache = ChunkCache::new(8, 2);
        let key_a = chunk_key(1, &[0]);

        assert!(cache.get(&key_a).is_none());
        cache.insert(key_a.clone(), vec![1, 2, 3, 4]);
        assert!(cache.get(&key_a).is_some());
        cache.insert(chunk_key(1, &[1]), vec![5, 6, 7, 8]);
        cache.insert(chunk_key(1, &[2]), vec![9, 10, 11, 12]);

        let stats = cache.stats();
        assert_eq!(stats.hits, 1);
        assert_eq!(stats.misses, 1);
        assert_eq!(stats.inserts, 3);
        assert_eq!(stats.evictions, 1);
        assert_eq!(stats.entries, 2);
        assert_eq!(stats.current_bytes, 8);
        assert_eq!(stats.max_bytes, 8);
        assert_eq!(stats.max_slots, 2);
    }
}