// ngdp_cache/generic.rs

1//! Generic cache implementation for arbitrary data
2
3use std::path::{Path, PathBuf};
4use tracing::{debug, trace};
5
6use crate::{Result, ensure_dir, get_cache_dir};
7
/// Generic cache for storing arbitrary data
///
/// Entries are plain files under `base_dir`; keys may contain path
/// separators, in which case subdirectories are created on write.
#[derive(Debug, Clone)]
pub struct GenericCache {
    /// Base directory for this cache
    base_dir: PathBuf,
}
13
14impl GenericCache {
15    /// Create a new generic cache with the default directory
16    pub async fn new() -> Result<Self> {
17        let base_dir = get_cache_dir()?.join("generic");
18        ensure_dir(&base_dir).await?;
19
20        debug!("Initialized generic cache at: {:?}", base_dir);
21
22        Ok(Self { base_dir })
23    }
24
25    /// Create a new generic cache with a custom subdirectory
26    pub async fn with_subdirectory(subdir: &str) -> Result<Self> {
27        let base_dir = get_cache_dir()?.join("generic").join(subdir);
28        ensure_dir(&base_dir).await?;
29
30        debug!("Initialized generic cache at: {:?}", base_dir);
31
32        Ok(Self { base_dir })
33    }
34
35    /// Get the full path for a cache key
36    pub fn get_path(&self, key: &str) -> PathBuf {
37        self.base_dir.join(key)
38    }
39
40    /// Check if a cache entry exists
41    pub async fn exists(&self, key: &str) -> bool {
42        tokio::fs::metadata(self.get_path(key)).await.is_ok()
43    }
44
45    /// Write data to the cache
46    pub async fn write(&self, key: &str, data: &[u8]) -> Result<()> {
47        let path = self.get_path(key);
48
49        // Ensure parent directory exists
50        if let Some(parent) = path.parent() {
51            ensure_dir(parent).await?;
52        }
53
54        trace!("Writing {} bytes to cache key: {}", data.len(), key);
55        tokio::fs::write(&path, data).await?;
56
57        Ok(())
58    }
59
60    /// Read data from the cache
61    pub async fn read(&self, key: &str) -> Result<Vec<u8>> {
62        let path = self.get_path(key);
63
64        trace!("Reading from cache key: {}", key);
65        let data = tokio::fs::read(&path).await?;
66
67        Ok(data)
68    }
69
70    /// Delete a cache entry
71    pub async fn delete(&self, key: &str) -> Result<()> {
72        let path = self.get_path(key);
73
74        if tokio::fs::metadata(&path).await.is_ok() {
75            trace!("Deleting cache key: {}", key);
76            tokio::fs::remove_file(&path).await?;
77        }
78
79        Ok(())
80    }
81
82    /// Clear all entries in this cache
83    pub async fn clear(&self) -> Result<()> {
84        debug!("Clearing all entries in generic cache");
85
86        let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
87        while let Some(entry) = entries.next_entry().await? {
88            let path = entry.path();
89            if let Ok(metadata) = tokio::fs::metadata(&path).await {
90                if metadata.is_file() {
91                    tokio::fs::remove_file(&path).await?;
92                }
93            }
94        }
95
96        Ok(())
97    }
98
99    /// Get the base directory of this cache
100    pub fn base_dir(&self) -> &Path {
101        &self.base_dir
102    }
103
104    /// Write multiple entries to the cache in parallel
105    ///
106    /// This is more efficient than calling write() multiple times sequentially.
107    pub async fn write_batch(&self, entries: &[(String, Vec<u8>)]) -> Result<()> {
108        use futures::future::try_join_all;
109
110        let futures = entries.iter().map(|(key, data)| self.write(key, data));
111
112        try_join_all(futures).await?;
113        Ok(())
114    }
115
116    /// Read multiple entries from the cache in parallel
117    ///
118    /// Returns a vector of results in the same order as the input keys.
119    /// Failed reads will be represented as Err values in the vector.
120    pub async fn read_batch(&self, keys: &[String]) -> Vec<Result<Vec<u8>>> {
121        use futures::future::join_all;
122
123        let futures = keys.iter().map(|key| self.read(key));
124        join_all(futures).await
125    }
126
127    /// Delete multiple entries from the cache in parallel
128    ///
129    /// This is more efficient than calling delete() multiple times sequentially.
130    pub async fn delete_batch(&self, keys: &[String]) -> Result<()> {
131        use futures::future::try_join_all;
132
133        let futures = keys.iter().map(|key| self.delete(key));
134        try_join_all(futures).await?;
135        Ok(())
136    }
137
138    /// Check existence of multiple entries in parallel
139    ///
140    /// Returns a vector of booleans in the same order as the input keys.
141    pub async fn exists_batch(&self, keys: &[String]) -> Vec<bool> {
142        use futures::future::join_all;
143
144        let futures = keys.iter().map(|key| self.exists(key));
145        join_all(futures).await
146    }
147
148    /// Stream data from cache to a writer
149    ///
150    /// This is more memory-efficient than `read()` for large files.
151    pub async fn read_streaming<W>(&self, key: &str, mut writer: W) -> Result<u64>
152    where
153        W: tokio::io::AsyncWrite + Unpin,
154    {
155        use tokio::io::AsyncWriteExt;
156
157        let path = self.get_path(key);
158        trace!("Streaming from cache key: {}", key);
159
160        let mut file = tokio::fs::File::open(&path).await?;
161        let bytes_copied = tokio::io::copy(&mut file, &mut writer).await?;
162        writer.flush().await?;
163
164        Ok(bytes_copied)
165    }
166
167    /// Stream data from a reader to cache
168    ///
169    /// This is more memory-efficient than `write()` for large data.
170    pub async fn write_streaming<R>(&self, key: &str, mut reader: R) -> Result<u64>
171    where
172        R: tokio::io::AsyncRead + Unpin,
173    {
174        use tokio::io::AsyncWriteExt;
175
176        let path = self.get_path(key);
177
178        // Ensure parent directory exists
179        if let Some(parent) = path.parent() {
180            ensure_dir(parent).await?;
181        }
182
183        trace!("Streaming to cache key: {}", key);
184
185        let mut file = tokio::fs::File::create(&path).await?;
186        let bytes_copied = tokio::io::copy(&mut reader, &mut file).await?;
187        file.flush().await?;
188
189        Ok(bytes_copied)
190    }
191
192    /// Process cache data in chunks without loading it all into memory
193    ///
194    /// The callback is called for each chunk read from the cache file.
195    pub async fn read_chunked<F>(&self, key: &str, mut callback: F) -> Result<u64>
196    where
197        F: FnMut(&[u8]) -> Result<()>,
198    {
199        use tokio::io::AsyncReadExt;
200
201        let path = self.get_path(key);
202        trace!("Reading cache key in chunks: {}", key);
203
204        let mut file = tokio::fs::File::open(&path).await?;
205        let mut buffer = vec![0u8; 8192]; // 8KB chunks
206        let mut total_bytes = 0u64;
207
208        loop {
209            let bytes_read = file.read(&mut buffer).await?;
210            if bytes_read == 0 {
211                break; // EOF
212            }
213
214            callback(&buffer[..bytes_read])?;
215            total_bytes += bytes_read as u64;
216        }
217
218        Ok(total_bytes)
219    }
220
221    /// Write data to cache in chunks from an iterator
222    ///
223    /// This allows writing large data without keeping it all in memory.
224    pub async fn write_chunked<I>(&self, key: &str, chunks: I) -> Result<u64>
225    where
226        I: IntoIterator<Item = Result<Vec<u8>>>,
227    {
228        use tokio::io::AsyncWriteExt;
229
230        let path = self.get_path(key);
231
232        // Ensure parent directory exists
233        if let Some(parent) = path.parent() {
234            ensure_dir(parent).await?;
235        }
236
237        trace!("Writing cache key in chunks: {}", key);
238
239        let mut file = tokio::fs::File::create(&path).await?;
240        let mut total_bytes = 0u64;
241
242        for chunk_result in chunks {
243            let chunk = chunk_result?;
244            file.write_all(&chunk).await?;
245            total_bytes += chunk.len() as u64;
246        }
247
248        file.flush().await?;
249        Ok(total_bytes)
250    }
251
252    /// Copy data between cache entries efficiently
253    ///
254    /// This is more efficient than read + write for large files.
255    pub async fn copy(&self, from_key: &str, to_key: &str) -> Result<u64> {
256        use tokio::io::AsyncWriteExt;
257
258        let from_path = self.get_path(from_key);
259        let to_path = self.get_path(to_key);
260
261        // Ensure parent directory exists for destination
262        if let Some(parent) = to_path.parent() {
263            ensure_dir(parent).await?;
264        }
265
266        trace!("Copying cache from {} to {}", from_key, to_key);
267
268        let mut from_file = tokio::fs::File::open(&from_path).await?;
269        let mut to_file = tokio::fs::File::create(&to_path).await?;
270
271        let bytes_copied = tokio::io::copy(&mut from_file, &mut to_file).await?;
272        to_file.flush().await?;
273
274        Ok(bytes_copied)
275    }
276
277    /// Get the size of a cache entry without reading it
278    pub async fn size(&self, key: &str) -> Result<u64> {
279        let path = self.get_path(key);
280        let metadata = tokio::fs::metadata(&path).await?;
281        Ok(metadata.len())
282    }
283
284    /// Stream data from cache with a custom buffer size
285    ///
286    /// Useful for optimizing I/O based on expected data size.
287    pub async fn read_streaming_buffered<W>(
288        &self,
289        key: &str,
290        writer: W,
291        buffer_size: usize,
292    ) -> Result<u64>
293    where
294        W: tokio::io::AsyncWrite + Unpin,
295    {
296        use tokio::io::{AsyncWriteExt, BufWriter};
297
298        let path = self.get_path(key);
299        trace!(
300            "Streaming from cache key with {}B buffer: {}",
301            buffer_size, key
302        );
303
304        let file = tokio::fs::File::open(&path).await?;
305        let mut reader = tokio::io::BufReader::with_capacity(buffer_size, file);
306        let mut writer = BufWriter::with_capacity(buffer_size, writer);
307
308        let bytes_copied = tokio::io::copy(&mut reader, &mut writer).await?;
309        writer.flush().await?;
310
311        Ok(bytes_copied)
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these tests write to the real user cache directory (via
    // get_cache_dir), not a tempdir — TODO confirm that is intentional. Each
    // test uses a distinct subdirectory to avoid cross-test interference
    // when the harness runs tests in parallel.

    // Round-trips a single entry: write -> exists -> read -> delete.
    #[tokio::test]
    async fn test_generic_cache_operations() {
        let cache = GenericCache::with_subdirectory("test").await.unwrap();

        // Test write and read
        let key = "test_key";
        let data = b"test data";

        cache.write(key, data).await.unwrap();
        assert!(cache.exists(key).await);

        let read_data = cache.read(key).await.unwrap();
        assert_eq!(read_data, data);

        // Test delete
        cache.delete(key).await.unwrap();
        assert!(!cache.exists(key).await);

        // Cleanup (best-effort; failure is ignored)
        let _ = cache.clear().await;
    }

    // Exercises the parallel batch APIs: write_batch, exists_batch,
    // read_batch (order-preserving), delete_batch.
    #[tokio::test]
    async fn test_batch_operations() {
        let cache = GenericCache::with_subdirectory("test_batch").await.unwrap();

        // Test batch write
        let entries = vec![
            ("key1".to_string(), b"data1".to_vec()),
            ("key2".to_string(), b"data2".to_vec()),
            ("key3".to_string(), b"data3".to_vec()),
        ];

        cache.write_batch(&entries).await.unwrap();

        // Test batch exists — key4 was never written, so it must be false
        let keys = vec![
            "key1".to_string(),
            "key2".to_string(),
            "key3".to_string(),
            "key4".to_string(),
        ];
        let exists = cache.exists_batch(&keys).await;
        assert_eq!(exists, vec![true, true, true, false]);

        // Test batch read — results come back in input-key order
        let keys = vec!["key1".to_string(), "key2".to_string(), "key3".to_string()];
        let results = cache.read_batch(&keys).await;
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().unwrap(), b"data1");
        assert_eq!(results[1].as_ref().unwrap(), b"data2");
        assert_eq!(results[2].as_ref().unwrap(), b"data3");

        // Test batch delete — key3 is untouched and must survive
        let keys = vec!["key1".to_string(), "key2".to_string()];
        cache.delete_batch(&keys).await.unwrap();
        assert!(!cache.exists("key1").await);
        assert!(!cache.exists("key2").await);
        assert!(cache.exists("key3").await);

        // Cleanup
        let _ = cache.clear().await;
    }

    // Streaming write from a reader and streaming read into a writer, plus
    // size() agreement with the data length.
    #[tokio::test]
    async fn test_streaming_operations() {
        let cache = GenericCache::with_subdirectory("test_streaming")
            .await
            .unwrap();

        // Test streaming write
        let key = "streaming_test";
        let test_data = b"Hello, streaming world! This is a test of streaming I/O operations.";
        let mut reader = std::io::Cursor::new(test_data);

        let bytes_written = cache.write_streaming(key, &mut reader).await.unwrap();
        assert_eq!(bytes_written, test_data.len() as u64);
        assert!(cache.exists(key).await);

        // Test streaming read
        let mut output = Vec::new();
        let bytes_read = cache.read_streaming(key, &mut output).await.unwrap();
        assert_eq!(bytes_read, test_data.len() as u64);
        assert_eq!(output, test_data);

        // Test size
        let size = cache.size(key).await.unwrap();
        assert_eq!(size, test_data.len() as u64);

        // Cleanup
        let _ = cache.clear().await;
    }

    // Chunk-iterator write followed by callback-driven chunked read; the
    // reassembled bytes must equal the concatenated chunks.
    #[tokio::test]
    async fn test_chunked_operations() {
        let cache = GenericCache::with_subdirectory("test_chunked")
            .await
            .unwrap();

        // Test chunked write
        let key = "chunked_test";
        let chunks = vec![
            Ok(b"chunk1".to_vec()),
            Ok(b"chunk2".to_vec()),
            Ok(b"chunk3".to_vec()),
        ];

        let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
        assert_eq!(bytes_written, 18); // 6 + 6 + 6 bytes
        assert!(cache.exists(key).await);

        // Test chunked read
        let mut collected_data = Vec::new();
        let bytes_read = cache
            .read_chunked(key, |chunk| {
                collected_data.extend_from_slice(chunk);
                Ok(())
            })
            .await
            .unwrap();

        assert_eq!(bytes_read, 18);
        assert_eq!(collected_data, b"chunk1chunk2chunk3");

        // Cleanup
        let _ = cache.clear().await;
    }

    // copy() duplicates an entry; source must remain intact and both entries
    // must hold identical bytes.
    #[tokio::test]
    async fn test_copy_operation() {
        let cache = GenericCache::with_subdirectory("test_copy").await.unwrap();

        // Create source data
        let source_key = "source";
        let dest_key = "destination";
        let test_data = b"This data will be copied between cache entries";

        cache.write(source_key, test_data).await.unwrap();

        // Test copy
        let bytes_copied = cache.copy(source_key, dest_key).await.unwrap();
        assert_eq!(bytes_copied, test_data.len() as u64);

        // Verify both entries exist and have same content
        assert!(cache.exists(source_key).await);
        assert!(cache.exists(dest_key).await);

        let source_data = cache.read(source_key).await.unwrap();
        let dest_data = cache.read(dest_key).await.unwrap();
        assert_eq!(source_data, dest_data);
        assert_eq!(source_data, test_data);

        // Cleanup
        let _ = cache.clear().await;
    }

    // read_streaming_buffered with a 4 KiB buffer on a 16 KiB entry — the
    // data must round-trip regardless of the buffer size chosen.
    #[tokio::test]
    async fn test_buffered_streaming() {
        let cache = GenericCache::with_subdirectory("test_buffered")
            .await
            .unwrap();

        // Create test data
        let key = "buffered_test";
        let test_data = vec![42u8; 16384]; // 16KB of data

        cache.write(key, &test_data).await.unwrap();

        // Test buffered streaming with custom buffer size
        let mut output = Vec::new();
        let bytes_read = cache
            .read_streaming_buffered(key, &mut output, 4096)
            .await
            .unwrap();

        assert_eq!(bytes_read, test_data.len() as u64);
        assert_eq!(output, test_data);

        // Cleanup
        let _ = cache.clear().await;
    }

    // 1 MiB chunked write/read; verifies total byte count only (contents are
    // deterministic per-chunk fills).
    #[tokio::test]
    async fn test_large_file_streaming() {
        let cache = GenericCache::with_subdirectory("test_large").await.unwrap();

        // Create a larger test file (1MB)
        let key = "large_test";
        let chunk_size = 8192;
        let num_chunks = 128; // 128 * 8192 = 1MB

        // Write in chunks
        let chunks: Vec<Result<Vec<u8>>> = (0..num_chunks)
            .map(|i| Ok(vec![(i % 256) as u8; chunk_size]))
            .collect();

        let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
        assert_eq!(bytes_written, (chunk_size * num_chunks) as u64);

        // Read back in chunks and verify
        let mut total_read = 0u64;
        let mut chunk_count = 0;

        cache
            .read_chunked(key, |chunk| {
                total_read += chunk.len() as u64;
                chunk_count += 1;
                Ok(())
            })
            .await
            .unwrap();

        assert_eq!(total_read, bytes_written);
        assert!(chunk_count > 0); // Should be multiple chunks due to 8KB buffer

        // Cleanup
        let _ = cache.clear().await;
    }
}