Skip to main content

apiary_runtime/
cache.rs

1//! Local cell cache with LRU eviction.
2//!
3//! Each node maintains a local cache of recently accessed cells from object storage.
4//! The cache uses an LRU (Least Recently Used) eviction policy to stay within its
5//! size limit. Cache hits eliminate S3 fetches, significantly improving query performance.
6
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12
13use tokio::fs;
14use tokio::io::AsyncWriteExt;
15use tokio::sync::RwLock;
16use tracing::{debug, info, warn};
17
18use apiary_core::error::ApiaryError;
19use apiary_core::storage::StorageBackend;
20use apiary_core::Result;
21
/// One cell held in the local cache.
///
/// Records where a cell fetched from object storage lives on local disk, how
/// many bytes it occupies, and when it was last read (the LRU ordering key).
#[derive(Debug, Clone)]
pub struct CacheEntry {
    /// Object-storage key the cell was fetched from
    /// (for example `"cells/frame_id/cell_abc123.parquet"`).
    pub storage_key: String,
    /// Location of the cached copy on the local filesystem.
    pub local_path: PathBuf,
    /// Number of bytes occupied by the cached copy.
    pub size: u64,
    /// Time of the most recent access; entries with the oldest value are evicted first.
    pub last_accessed: Instant,
}
37
38/// Local cell cache with LRU eviction policy.
39///
40/// The cache stores recently accessed cells from object storage in the local
41/// filesystem. When the cache exceeds its size limit, the least recently
42/// accessed entries are evicted.
43///
44/// # Thread Safety
45///
46/// The cache is designed for concurrent access from multiple bees. All
47/// operations use interior mutability via `RwLock` and atomic operations.
48pub struct CellCache {
49    /// Directory where cached cells are stored.
50    cache_dir: PathBuf,
51
52    /// Maximum total size of the cache in bytes.
53    max_size: u64,
54
55    /// Current total size of cached files in bytes (atomic for fast reads).
56    current_size: Arc<AtomicU64>,
57
58    /// Map of storage keys to cache entries.
59    entries: Arc<RwLock<HashMap<String, CacheEntry>>>,
60
61    /// Reference to the storage backend for fetching cells on cache miss.
62    storage: Arc<dyn StorageBackend>,
63}
64
65impl CellCache {
66    /// Create a new `CellCache` with the specified directory and size limit.
67    ///
68    /// # Arguments
69    ///
70    /// * `cache_dir` — Directory where cached cells will be stored.
71    /// * `max_size` — Maximum total size of cached files in bytes.
72    /// * `storage` — Storage backend for fetching cells on cache miss.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the cache directory cannot be created.
77    pub async fn new(
78        cache_dir: PathBuf,
79        max_size: u64,
80        storage: Arc<dyn StorageBackend>,
81    ) -> Result<Self> {
82        // Create the cache directory if it doesn't exist
83        fs::create_dir_all(&cache_dir)
84            .await
85            .map_err(|e| ApiaryError::Storage {
86                message: format!("Failed to create cache directory: {:?}", cache_dir),
87                source: Some(Box::new(e)),
88            })?;
89
90        info!(
91            cache_dir = ?cache_dir,
92            max_size_mb = max_size / (1024 * 1024),
93            "Cell cache initialized"
94        );
95
96        Ok(Self {
97            cache_dir,
98            max_size,
99            current_size: Arc::new(AtomicU64::new(0)),
100            entries: Arc::new(RwLock::new(HashMap::new())),
101            storage,
102        })
103    }
104
105    /// Get a cell from the cache or fetch it from storage on cache miss.
106    ///
107    /// # Arguments
108    ///
109    /// * `storage_key` — The storage key for the cell (e.g., "cells/frame_id/cell_abc.parquet").
110    ///
111    /// # Returns
112    ///
113    /// Returns the local filesystem path to the cached cell.
114    ///
115    /// # Behavior
116    ///
117    /// - **Cache hit**: Updates last accessed time and returns the local path.
118    /// - **Cache miss**: Fetches the cell from storage, adds it to the cache, and returns the path.
119    /// - Automatically evicts LRU entries if the cache exceeds its size limit.
120    pub async fn get(&self, storage_key: &str) -> Result<PathBuf> {
121        // Check for cache hit with read lock first
122        {
123            let entries = self.entries.read().await;
124            if let Some(entry) = entries.get(storage_key) {
125                let path = entry.local_path.clone();
126                drop(entries); // Release read lock before acquiring write lock
127
128                // Update last accessed time with write lock
129                let mut entries_write = self.entries.write().await;
130                if let Some(entry) = entries_write.get_mut(storage_key) {
131                    entry.last_accessed = Instant::now();
132                }
133
134                debug!(storage_key, "Cache hit");
135                return Ok(path);
136            }
137        }
138
139        // Cache miss - fetch from storage
140        debug!(storage_key, "Cache miss - fetching from storage");
141
142        // Create a local path based on the storage key (sanitize for filesystem)
143        let sanitized = storage_key.replace('/', "_");
144        let local_path = self.cache_dir.join(&sanitized);
145
146        // Fetch the cell from storage
147        let data = self.storage.get(storage_key).await?;
148        let size = data.len() as u64;
149
150        // Write to local cache
151        let mut file = fs::File::create(&local_path)
152            .await
153            .map_err(|e| ApiaryError::Storage {
154                message: format!("Failed to create cache file: {:?}", local_path),
155                source: Some(Box::new(e)),
156            })?;
157
158        file.write_all(&data)
159            .await
160            .map_err(|e| ApiaryError::Storage {
161                message: format!("Failed to write cache file: {:?}", local_path),
162                source: Some(Box::new(e)),
163            })?;
164
165        // Add to cache entries
166        {
167            let mut entries = self.entries.write().await;
168            entries.insert(
169                storage_key.to_string(),
170                CacheEntry {
171                    storage_key: storage_key.to_string(),
172                    local_path: local_path.clone(),
173                    size,
174                    last_accessed: Instant::now(),
175                },
176            );
177        }
178
179        // Update current size
180        self.current_size.fetch_add(size, Ordering::SeqCst);
181
182        // Evict if needed
183        self.evict_if_needed().await?;
184
185        debug!(storage_key, size, "Cell cached");
186        Ok(local_path)
187    }
188
189    /// Evict the least recently used entries until the cache is under its size limit.
190    ///
191    /// This method is called automatically after adding a new entry to the cache.
192    ///
193    /// Note: Current implementation clones and sorts all entries on each eviction.
194    /// This is acceptable for v1 since:
195    /// - Evictions are rare (only when cache exceeds limit)
196    /// - Typical cache sizes are manageable (hundreds of cells)
197    ///
198    /// Future optimization: Use a linked hashmap or priority queue for O(1) LRU tracking
199    async fn evict_if_needed(&self) -> Result<()> {
200        let current = self.current_size.load(Ordering::SeqCst);
201        if current <= self.max_size {
202            return Ok(());
203        }
204
205        info!(
206            current_mb = current / (1024 * 1024),
207            max_mb = self.max_size / (1024 * 1024),
208            "Cache size exceeded - starting LRU eviction"
209        );
210
211        let mut entries = self.entries.write().await;
212
213        // Sort entries by last accessed time (oldest first)
214        // Note: This clones entries for sorting - acceptable for v1 scale
215        let mut sorted: Vec<_> = entries.values().cloned().collect();
216        sorted.sort_by_key(|e| e.last_accessed);
217
218        // Evict entries until we're under the limit
219        let mut freed = 0u64;
220        for entry in sorted {
221            if self.current_size.load(Ordering::SeqCst) <= self.max_size {
222                break;
223            }
224
225            // Remove from entries map
226            entries.remove(&entry.storage_key);
227
228            // Delete the local file
229            if let Err(e) = fs::remove_file(&entry.local_path).await {
230                warn!(
231                    path = ?entry.local_path,
232                    error = %e,
233                    "Failed to delete evicted cache file"
234                );
235            } else {
236                freed += entry.size;
237                self.current_size.fetch_sub(entry.size, Ordering::SeqCst);
238                debug!(
239                    storage_key = entry.storage_key,
240                    size = entry.size,
241                    "Evicted cache entry"
242                );
243            }
244        }
245
246        info!(
247            freed_mb = freed / (1024 * 1024),
248            remaining_mb = self.current_size.load(Ordering::SeqCst) / (1024 * 1024),
249            "Cache eviction complete"
250        );
251
252        Ok(())
253    }
254
255    /// Get the current size of the cache in bytes.
256    pub fn size(&self) -> u64 {
257        self.current_size.load(Ordering::SeqCst)
258    }
259
260    /// Get a list of all cached cells for heartbeat reporting.
261    ///
262    /// Returns a map of storage keys to their sizes in bytes.
263    pub async fn list_cached_cells(&self) -> HashMap<String, u64> {
264        let entries = self.entries.read().await;
265        entries
266            .iter()
267            .map(|(key, entry)| (key.clone(), entry.size))
268            .collect()
269    }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use apiary_storage::local::LocalBackend;
    use tempfile::TempDir;

    /// Byte budget large enough that eviction never kicks in.
    const ROOMY: u64 = 10 * 1024 * 1024;

    /// Fresh storage and cache directories plus a local backend over the storage one.
    /// The returned `TempDir`s must be kept alive for the duration of the test.
    async fn scratch() -> (TempDir, TempDir, Arc<LocalBackend>) {
        let storage_dir = TempDir::new().unwrap();
        let cache_dir = TempDir::new().unwrap();
        let backend = Arc::new(LocalBackend::new(storage_dir.path()).await.unwrap());
        (storage_dir, cache_dir, backend)
    }

    /// Opens a `CellCache` in `dir` with a `budget`-byte limit, reading from `backend`.
    async fn open_cache(dir: &TempDir, budget: u64, backend: &Arc<LocalBackend>) -> CellCache {
        CellCache::new(dir.path().to_path_buf(), budget, backend.clone())
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_cache_miss_fetches_from_storage() {
        let (_storage_dir, cache_dir, backend) = scratch().await;

        let payload = b"test cell data";
        backend
            .put("cells/test.parquet", payload.as_slice().into())
            .await
            .unwrap();

        let cache = open_cache(&cache_dir, ROOMY, &backend).await;

        // Nothing is cached yet, so this request has to go to storage.
        let cached_path = cache.get("cells/test.parquet").await.unwrap();
        assert!(cached_path.exists());

        let on_disk = fs::read(&cached_path).await.unwrap();
        assert_eq!(on_disk, payload);
        assert_eq!(cache.size(), payload.len() as u64);
    }

    #[tokio::test]
    async fn test_cache_hit_returns_cached_path() {
        let (_storage_dir, cache_dir, backend) = scratch().await;
        backend
            .put("cells/test.parquet", b"data".as_slice().into())
            .await
            .unwrap();

        let cache = open_cache(&cache_dir, ROOMY, &backend).await;

        let on_miss = cache.get("cells/test.parquet").await.unwrap();
        let on_hit = cache.get("cells/test.parquet").await.unwrap();
        assert_eq!(on_miss, on_hit, "a hit must return the path recorded on the miss");
    }

    #[tokio::test]
    async fn test_lru_eviction() {
        let (_storage_dir, cache_dir, backend) = scratch().await;

        // Three 1 KB cells, each filled with a distinct byte value.
        let keys = [
            "cells/cell1.parquet",
            "cells/cell2.parquet",
            "cells/cell3.parquet",
        ];
        for (fill, key) in (1u8..).zip(keys) {
            backend.put(key, vec![fill; 1024].into()).await.unwrap();
        }

        // A 2.5 KB budget holds two cells; the third insert forces an eviction.
        let cache = open_cache(&cache_dir, 2500, &backend).await;
        let pause = || tokio::time::sleep(tokio::time::Duration::from_millis(10));

        let oldest = cache.get(keys[0]).await.unwrap();
        assert!(oldest.exists());
        pause().await; // distinct access timestamps

        let middle = cache.get(keys[1]).await.unwrap();
        assert!(middle.exists());
        pause().await;

        let newest = cache.get(keys[2]).await.unwrap();
        assert!(newest.exists());

        // The least recently used cell is gone; the other two survive.
        assert!(!oldest.exists());
        assert!(middle.exists());
        assert!(newest.exists());
        assert!(cache.size() <= 2500);
    }

    #[tokio::test]
    async fn test_list_cached_cells() {
        let (_storage_dir, cache_dir, backend) = scratch().await;
        backend
            .put("cells/cell1.parquet", b"data1".as_slice().into())
            .await
            .unwrap();
        backend
            .put("cells/cell2.parquet", b"data22".as_slice().into())
            .await
            .unwrap();

        let cache = open_cache(&cache_dir, ROOMY, &backend).await;
        for key in ["cells/cell1.parquet", "cells/cell2.parquet"] {
            cache.get(key).await.unwrap();
        }

        let listed = cache.list_cached_cells().await;
        assert_eq!(listed.len(), 2);
        assert_eq!(listed.get("cells/cell1.parquet"), Some(&5u64));
        assert_eq!(listed.get("cells/cell2.parquet"), Some(&6u64));
    }
}