ngdp_cache/
generic.rs

1//! Generic cache implementation for arbitrary data
2
3use parking_lot::Mutex;
4use std::collections::{HashMap, VecDeque};
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, atomic::AtomicU64, atomic::Ordering};
7use std::time::{SystemTime, UNIX_EPOCH};
8use tracing::{debug, trace, warn};
9
10use crate::{CacheStats, Result, ensure_dir, get_cache_dir};
11
12/// Cache entry metadata for LRU tracking
13#[derive(Debug, Clone)]
14struct CacheEntryMetadata {
15    /// Last access timestamp
16    last_accessed: u64,
17    /// File size in bytes
18    size: u64,
19    /// Access count for statistics
20    access_count: u64,
21}
22
23/// Combined LRU state to reduce lock contention
24#[derive(Debug)]
25struct LruState {
26    /// Metadata for each cache entry
27    metadata: HashMap<String, CacheEntryMetadata>,
28    /// Access order for LRU eviction (most recent at back)
29    access_order: VecDeque<String>,
30}
31
32/// Generic cache for storing arbitrary data with LRU eviction
33pub struct GenericCache {
34    /// Base directory for this cache
35    base_dir: PathBuf,
36    /// Combined LRU tracking state (reduced lock contention)
37    lru_state: Arc<Mutex<LruState>>,
38    /// Maximum cache size in bytes (None for unlimited)
39    max_size_bytes: Option<u64>,
40    /// Maximum number of entries (None for unlimited)
41    max_entries: Option<usize>,
42    /// Current cache size in bytes (atomic for better performance)
43    current_size: Arc<AtomicU64>,
44    /// Cache statistics
45    stats: Arc<CacheStats>,
46}
47
48impl GenericCache {
49    /// Create a new generic cache with the default directory
50    pub async fn new() -> Result<Self> {
51        Self::with_config(None, None, None).await
52    }
53
54    /// Create a new generic cache with a custom subdirectory
55    pub async fn with_subdirectory(subdir: &str) -> Result<Self> {
56        let base_dir = get_cache_dir()?.join("generic").join(subdir);
57        Self::with_config_and_path(base_dir, None, None, None).await
58    }
59
60    /// Create a new cache with size and entry limits
61    pub async fn with_limits(
62        max_size_bytes: Option<u64>,
63        max_entries: Option<usize>,
64    ) -> Result<Self> {
65        Self::with_config(Some("generic"), max_size_bytes, max_entries).await
66    }
67
68    /// Create a cache with full configuration
69    pub async fn with_config(
70        subdir: Option<&str>,
71        max_size_bytes: Option<u64>,
72        max_entries: Option<usize>,
73    ) -> Result<Self> {
74        let base_dir = match subdir {
75            Some(sub) => get_cache_dir()?.join(sub),
76            None => get_cache_dir()?.join("generic"),
77        };
78
79        Self::with_config_and_path(base_dir, max_size_bytes, max_entries, None).await
80    }
81
82    /// Create a cache with full configuration and custom path
83    pub async fn with_config_and_path(
84        base_dir: PathBuf,
85        max_size_bytes: Option<u64>,
86        max_entries: Option<usize>,
87        stats: Option<Arc<CacheStats>>,
88    ) -> Result<Self> {
89        ensure_dir(&base_dir).await?;
90
91        let stats = stats.unwrap_or_else(|| Arc::new(CacheStats::new()));
92        let cache = Self {
93            base_dir: base_dir.clone(),
94            lru_state: Arc::new(Mutex::new(LruState {
95                metadata: HashMap::new(),
96                access_order: VecDeque::new(),
97            })),
98            max_size_bytes,
99            max_entries,
100            current_size: Arc::new(AtomicU64::new(0)),
101            stats,
102        };
103
104        // Initialize cache state by scanning existing files
105        cache.initialize_cache_state().await?;
106
107        debug!(
108            "Initialized generic cache at: {:?} (max_size: {:?} bytes, max_entries: {:?})",
109            base_dir, max_size_bytes, max_entries
110        );
111
112        Ok(cache)
113    }
114
115    /// Initialize cache state from existing files
116    async fn initialize_cache_state(&self) -> Result<()> {
117        // Collect file information without holding locks during async operations
118        let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
119        let mut file_entries = Vec::new();
120        let mut total_size = 0u64;
121
122        while let Some(entry) = entries.next_entry().await? {
123            let path = entry.path();
124            if let Ok(metadata_fs) = tokio::fs::metadata(&path).await {
125                if metadata_fs.is_file() {
126                    if let Some(key) = path.file_name().and_then(|n| n.to_str()) {
127                        let modified_time = metadata_fs
128                            .modified()
129                            .unwrap_or(SystemTime::UNIX_EPOCH)
130                            .duration_since(UNIX_EPOCH)
131                            .unwrap_or_default()
132                            .as_secs();
133
134                        file_entries.push((key.to_string(), metadata_fs.len(), modified_time));
135                        total_size += metadata_fs.len();
136                    }
137                }
138            }
139        }
140
141        // Sort by modification time (oldest first for proper LRU order)
142        file_entries.sort_by_key(|(_, _, time)| *time);
143
144        // Now update the internal state with single lock acquisition
145        {
146            let mut lru_state = self.lru_state.lock();
147            self.current_size.store(total_size, Ordering::Relaxed);
148
149            for (key, size, time) in file_entries {
150                let entry_metadata = CacheEntryMetadata {
151                    last_accessed: time,
152                    size,
153                    access_count: 0,
154                };
155
156                lru_state.metadata.insert(key.clone(), entry_metadata);
157                lru_state.access_order.push_back(key);
158            }
159
160            debug!(
161                "Initialized cache with {} entries, total size: {} bytes",
162                lru_state.metadata.len(),
163                total_size
164            );
165        }
166
167        Ok(())
168    }
169
170    /// Get current timestamp in seconds since Unix epoch
171    fn current_timestamp() -> u64 {
172        SystemTime::now()
173            .duration_since(UNIX_EPOCH)
174            .unwrap_or_default()
175            .as_secs()
176    }
177
178    /// Update access order for LRU tracking - optimized to O(1) for most cases
179    fn update_access_order(&self, key: &str) {
180        let mut lru_state = self.lru_state.lock();
181
182        // Check if key is already at the back (most recent)
183        if lru_state.access_order.back() == Some(&key.to_string()) {
184            // Already most recent, just update metadata
185            if let Some(entry) = lru_state.metadata.get_mut(key) {
186                entry.last_accessed = Self::current_timestamp();
187                entry.access_count += 1;
188            }
189            return;
190        }
191
192        // Remove key from current position (still O(n) worst case)
193        lru_state.access_order.retain(|k| k != key);
194
195        // Add to back (most recent)
196        lru_state.access_order.push_back(key.to_string());
197
198        // Update access metadata
199        if let Some(entry) = lru_state.metadata.get_mut(key) {
200            entry.last_accessed = Self::current_timestamp();
201            entry.access_count += 1;
202        }
203    }
204
205    /// Evict least recently used entries to make space
206    async fn evict_if_needed(&self, new_entry_size: u64) -> Result<()> {
207        let max_size = self.max_size_bytes;
208        let max_entries = self.max_entries;
209
210        if max_size.is_none() && max_entries.is_none() {
211            return Ok(()); // No limits set
212        }
213
214        let current_size = self.current_size.load(Ordering::Relaxed);
215        let current_entries = self.lru_state.lock().metadata.len();
216
217        // Check if eviction is needed
218        let size_exceeded = max_size
219            .map(|max| current_size + new_entry_size > max)
220            .unwrap_or(false);
221        let entries_exceeded = max_entries
222            .map(|max| current_entries >= max)
223            .unwrap_or(false);
224
225        if !size_exceeded && !entries_exceeded {
226            return Ok(());
227        }
228
229        debug!(
230            "Cache eviction needed: size_exceeded={}, entries_exceeded={}",
231            size_exceeded, entries_exceeded
232        );
233
234        // Evict entries until we have space
235        let mut evicted_count = 0;
236        let mut evicted_bytes = 0;
237
238        loop {
239            let (key_to_evict, entry_size) = {
240                let lru_state = self.lru_state.lock();
241                let key = lru_state.access_order.front().cloned();
242                let size = key
243                    .as_ref()
244                    .and_then(|k| lru_state.metadata.get(k))
245                    .map(|e| e.size)
246                    .unwrap_or(0);
247                (key, size)
248            };
249
250            let Some(key) = key_to_evict else { break };
251
252            // Remove the file and metadata
253            if let Err(e) = self.evict_entry(&key).await {
254                warn!("Failed to evict cache entry '{}': {}", key, e);
255                break;
256            }
257
258            evicted_count += 1;
259            evicted_bytes += entry_size;
260            self.stats.record_eviction(entry_size);
261
262            // Check if we have enough space now
263            let new_current_size = self.current_size.load(Ordering::Relaxed);
264            let new_current_entries = self.lru_state.lock().metadata.len();
265
266            let size_ok = max_size
267                .map(|max| new_current_size + new_entry_size <= max)
268                .unwrap_or(true);
269            let entries_ok = max_entries
270                .map(|max| new_current_entries < max)
271                .unwrap_or(true);
272
273            if size_ok && entries_ok {
274                break;
275            }
276
277            // Safety check to prevent infinite loops
278            if evicted_count > 1000 {
279                warn!(
280                    "Evicted {} entries but still need more space, stopping",
281                    evicted_count
282                );
283                break;
284            }
285        }
286
287        if evicted_count > 0 {
288            debug!(
289                "Evicted {} entries ({} bytes)",
290                evicted_count, evicted_bytes
291            );
292        }
293
294        Ok(())
295    }
296
297    /// Evict a specific cache entry
298    async fn evict_entry(&self, key: &str) -> Result<()> {
299        let path = self.get_path(key);
300
301        // Remove from filesystem
302        if tokio::fs::metadata(&path).await.is_ok() {
303            tokio::fs::remove_file(&path).await?;
304        }
305
306        // Remove from tracking structures with single lock
307        let mut lru_state = self.lru_state.lock();
308
309        if let Some(entry_metadata) = lru_state.metadata.remove(key) {
310            self.current_size
311                .fetch_sub(entry_metadata.size, Ordering::Relaxed);
312        }
313
314        lru_state.access_order.retain(|k| k != key);
315
316        trace!("Evicted cache entry: {}", key);
317        Ok(())
318    }
319
320    /// Get the full path for a cache key
321    pub fn get_path(&self, key: &str) -> PathBuf {
322        self.base_dir.join(key)
323    }
324
325    /// Get cache statistics
326    pub fn stats(&self) -> Arc<CacheStats> {
327        Arc::clone(&self.stats)
328    }
329
330    /// Get current cache size in bytes
331    pub fn current_size(&self) -> u64 {
332        self.current_size.load(Ordering::Relaxed)
333    }
334
335    /// Get current number of entries
336    pub fn current_entries(&self) -> usize {
337        self.lru_state.lock().metadata.len()
338    }
339
340    /// Get cache configuration
341    pub fn config(&self) -> (Option<u64>, Option<usize>) {
342        (self.max_size_bytes, self.max_entries)
343    }
344
345    /// Check if a cache entry exists
346    pub async fn exists(&self, key: &str) -> bool {
347        let exists = tokio::fs::metadata(self.get_path(key)).await.is_ok();
348        if exists {
349            // Update access order for exists check
350            self.update_access_order(key);
351        }
352        exists
353    }
354
355    /// Write data to the cache
356    pub async fn write(&self, key: &str, data: &[u8]) -> Result<()> {
357        let data_size = data.len() as u64;
358
359        // Check if we need to evict entries to make space
360        self.evict_if_needed(data_size).await?;
361
362        let path = self.get_path(key);
363
364        // Ensure parent directory exists
365        if let Some(parent) = path.parent() {
366            ensure_dir(parent).await?;
367        }
368
369        // Check if this is an existing entry (for size tracking)
370        let existing_size = {
371            let lru_state = self.lru_state.lock();
372            lru_state.metadata.get(key).map(|e| e.size).unwrap_or(0)
373        };
374
375        trace!("Writing {} bytes to cache key: {}", data.len(), key);
376        tokio::fs::write(&path, data).await?;
377
378        // Update tracking metadata with single lock
379        {
380            let mut lru_state = self.lru_state.lock();
381
382            // Update size tracking atomically
383            self.current_size
384                .fetch_sub(existing_size, Ordering::Relaxed);
385            self.current_size.fetch_add(data_size, Ordering::Relaxed);
386
387            // Update or create entry metadata
388            let entry_metadata = CacheEntryMetadata {
389                last_accessed: Self::current_timestamp(),
390                size: data_size,
391                access_count: 0, // Will be incremented to 1 by update_access_order below
392            };
393            lru_state.metadata.insert(key.to_string(), entry_metadata);
394        }
395
396        // Update access order
397        self.update_access_order(key);
398
399        // Record statistics
400        self.stats.record_write(data_size);
401
402        Ok(())
403    }
404
405    /// Read data from the cache
406    pub async fn read(&self, key: &str) -> Result<Vec<u8>> {
407        let path = self.get_path(key);
408
409        trace!("Reading from cache key: {}", key);
410        let data = tokio::fs::read(&path).await?;
411
412        // Update access order for cache hit
413        self.update_access_order(key);
414
415        // Record cache hit statistics
416        self.stats.record_hit(data.len() as u64);
417
418        Ok(data)
419    }
420
421    /// Stream data from cache to a writer (memory-efficient for large cached files)
422    pub async fn read_to_writer<W>(&self, key: &str, mut writer: W) -> Result<u64>
423    where
424        W: tokio::io::AsyncWrite + Unpin,
425    {
426        let path = self.get_path(key);
427
428        trace!("Streaming from cache key: {}", key);
429        let mut file = tokio::fs::File::open(&path).await?;
430
431        let bytes_copied = tokio::io::copy(&mut file, &mut writer).await?;
432
433        // Update access order for cache hit
434        self.update_access_order(key);
435
436        // Record cache hit statistics
437        self.stats.record_hit(bytes_copied);
438
439        Ok(bytes_copied)
440    }
441
442    /// Delete a cache entry
443    pub async fn delete(&self, key: &str) -> Result<()> {
444        let path = self.get_path(key);
445
446        if tokio::fs::metadata(&path).await.is_ok() {
447            trace!("Deleting cache key: {}", key);
448            tokio::fs::remove_file(&path).await?;
449        }
450
451        // Update tracking metadata with single lock
452        {
453            let mut lru_state = self.lru_state.lock();
454
455            if let Some(entry_metadata) = lru_state.metadata.remove(key) {
456                self.current_size
457                    .fetch_sub(entry_metadata.size, Ordering::Relaxed);
458            }
459
460            lru_state.access_order.retain(|k| k != key);
461        }
462
463        // Record delete statistics
464        self.stats.record_delete();
465
466        Ok(())
467    }
468
469    /// Clear all entries in this cache
470    pub async fn clear(&self) -> Result<()> {
471        debug!("Clearing all entries in generic cache");
472
473        let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
474        while let Some(entry) = entries.next_entry().await? {
475            let path = entry.path();
476            if let Ok(metadata) = tokio::fs::metadata(&path).await {
477                if metadata.is_file() {
478                    tokio::fs::remove_file(&path).await?;
479                }
480            }
481        }
482
483        // Clear tracking metadata with single lock
484        {
485            let mut lru_state = self.lru_state.lock();
486
487            lru_state.metadata.clear();
488            lru_state.access_order.clear();
489            self.current_size.store(0, Ordering::Relaxed);
490        }
491
492        Ok(())
493    }
494
495    /// Warm cache with a list of keys by pre-loading them into LRU order
496    pub async fn warm_cache(&self, keys: &[String]) -> Result<()> {
497        debug!("Warming cache with {} keys", keys.len());
498
499        for key in keys {
500            if self.exists(key).await {
501                // exists() already updates access order
502                trace!("Warmed cache key: {}", key);
503            }
504        }
505
506        Ok(())
507    }
508
509    /// Get LRU ordered list of cache keys (least recently used first)
510    pub fn get_lru_keys(&self) -> Vec<String> {
511        let lru_state = self.lru_state.lock();
512        lru_state.access_order.iter().cloned().collect()
513    }
514
515    /// Get most recently used keys (up to limit)
516    pub fn get_mru_keys(&self, limit: usize) -> Vec<String> {
517        let lru_state = self.lru_state.lock();
518        lru_state
519            .access_order
520            .iter()
521            .rev()
522            .take(limit)
523            .cloned()
524            .collect()
525    }
526
527    /// Get cache entry metadata
528    pub fn get_entry_info(&self, key: &str) -> Option<(u64, u64, u64)> {
529        let lru_state = self.lru_state.lock();
530        lru_state
531            .metadata
532            .get(key)
533            .map(|e| (e.size, e.last_accessed, e.access_count))
534    }
535
536    /// Get the base directory of this cache
537    pub fn base_dir(&self) -> &Path {
538        &self.base_dir
539    }
540
541    /// Write multiple entries to the cache in parallel
542    ///
543    /// This is more efficient than calling write() multiple times sequentially.
544    pub async fn write_batch(&self, entries: &[(String, Vec<u8>)]) -> Result<()> {
545        use futures::future::try_join_all;
546
547        let futures = entries.iter().map(|(key, data)| self.write(key, data));
548
549        try_join_all(futures).await?;
550        Ok(())
551    }
552
553    /// Read multiple entries from the cache in parallel
554    ///
555    /// Returns a vector of results in the same order as the input keys.
556    /// Failed reads will be represented as Err values in the vector.
557    pub async fn read_batch(&self, keys: &[String]) -> Vec<Result<Vec<u8>>> {
558        use futures::future::join_all;
559
560        let futures = keys.iter().map(|key| self.read(key));
561        join_all(futures).await
562    }
563
564    /// Delete multiple entries from the cache in parallel
565    ///
566    /// This is more efficient than calling delete() multiple times sequentially.
567    pub async fn delete_batch(&self, keys: &[String]) -> Result<()> {
568        use futures::future::try_join_all;
569
570        let futures = keys.iter().map(|key| self.delete(key));
571        try_join_all(futures).await?;
572        Ok(())
573    }
574
575    /// Check existence of multiple entries in parallel
576    ///
577    /// Returns a vector of booleans in the same order as the input keys.
578    pub async fn exists_batch(&self, keys: &[String]) -> Vec<bool> {
579        use futures::future::join_all;
580
581        let futures = keys.iter().map(|key| self.exists(key));
582        join_all(futures).await
583    }
584
585    /// Stream data from cache to a writer
586    ///
587    /// This is more memory-efficient than `read()` for large files.
588    pub async fn read_streaming<W>(&self, key: &str, mut writer: W) -> Result<u64>
589    where
590        W: tokio::io::AsyncWrite + Unpin,
591    {
592        use tokio::io::AsyncWriteExt;
593
594        let path = self.get_path(key);
595        trace!("Streaming from cache key: {}", key);
596
597        let mut file = tokio::fs::File::open(&path).await?;
598        let bytes_copied = tokio::io::copy(&mut file, &mut writer).await?;
599        writer.flush().await?;
600
601        // Update access order and record cache hit
602        self.update_access_order(key);
603        self.stats.record_hit(bytes_copied);
604
605        Ok(bytes_copied)
606    }
607
608    /// Stream data from a reader to cache
609    ///
610    /// This is more memory-efficient than `write()` for large data.
611    pub async fn write_streaming<R>(&self, key: &str, mut reader: R) -> Result<u64>
612    where
613        R: tokio::io::AsyncRead + Unpin,
614    {
615        use tokio::io::AsyncWriteExt;
616
617        // Check if this is an existing entry (for size tracking)
618        let existing_size = {
619            let lru_state = self.lru_state.lock();
620            lru_state.metadata.get(key).map(|e| e.size).unwrap_or(0)
621        };
622
623        let path = self.get_path(key);
624
625        // Ensure parent directory exists
626        if let Some(parent) = path.parent() {
627            ensure_dir(parent).await?;
628        }
629
630        trace!("Streaming to cache key: {}", key);
631
632        let mut file = tokio::fs::File::create(&path).await?;
633        let bytes_copied = tokio::io::copy(&mut reader, &mut file).await?;
634        file.flush().await?;
635
636        // Check if we need to evict entries after writing
637        self.evict_if_needed(0).await?; // Size check after write
638
639        // Update tracking metadata with single lock
640        {
641            let mut lru_state = self.lru_state.lock();
642
643            // Update size tracking atomically
644            self.current_size
645                .fetch_sub(existing_size, Ordering::Relaxed);
646            self.current_size.fetch_add(bytes_copied, Ordering::Relaxed);
647
648            // Update or create entry metadata
649            let entry_metadata = CacheEntryMetadata {
650                last_accessed: Self::current_timestamp(),
651                size: bytes_copied,
652                access_count: 0, // Will be incremented to 1 by update_access_order below
653            };
654            lru_state.metadata.insert(key.to_string(), entry_metadata);
655        }
656
657        // Update access order
658        self.update_access_order(key);
659
660        // Record statistics
661        self.stats.record_write(bytes_copied);
662
663        Ok(bytes_copied)
664    }
665
666    /// Process cache data in chunks without loading it all into memory
667    ///
668    /// The callback is called for each chunk read from the cache file.
669    pub async fn read_chunked<F>(&self, key: &str, mut callback: F) -> Result<u64>
670    where
671        F: FnMut(&[u8]) -> Result<()>,
672    {
673        use tokio::io::AsyncReadExt;
674
675        let path = self.get_path(key);
676        trace!("Reading cache key in chunks: {}", key);
677
678        let mut file = tokio::fs::File::open(&path).await?;
679        let mut buffer = vec![0u8; 8192]; // 8KB chunks
680        let mut total_bytes = 0u64;
681
682        loop {
683            let bytes_read = file.read(&mut buffer).await?;
684            if bytes_read == 0 {
685                break; // EOF
686            }
687
688            callback(&buffer[..bytes_read])?;
689            total_bytes += bytes_read as u64;
690        }
691
692        Ok(total_bytes)
693    }
694
695    /// Write data to cache in chunks from an iterator
696    ///
697    /// This allows writing large data without keeping it all in memory.
698    pub async fn write_chunked<I>(&self, key: &str, chunks: I) -> Result<u64>
699    where
700        I: IntoIterator<Item = Result<Vec<u8>>>,
701    {
702        use tokio::io::AsyncWriteExt;
703
704        let path = self.get_path(key);
705
706        // Ensure parent directory exists
707        if let Some(parent) = path.parent() {
708            ensure_dir(parent).await?;
709        }
710
711        trace!("Writing cache key in chunks: {}", key);
712
713        let mut file = tokio::fs::File::create(&path).await?;
714        let mut total_bytes = 0u64;
715
716        for chunk_result in chunks {
717            let chunk = chunk_result?;
718            file.write_all(&chunk).await?;
719            total_bytes += chunk.len() as u64;
720        }
721
722        file.flush().await?;
723        Ok(total_bytes)
724    }
725
726    /// Copy data between cache entries efficiently
727    ///
728    /// This is more efficient than read + write for large files.
729    pub async fn copy(&self, from_key: &str, to_key: &str) -> Result<u64> {
730        use tokio::io::AsyncWriteExt;
731
732        let from_path = self.get_path(from_key);
733        let to_path = self.get_path(to_key);
734
735        // Ensure parent directory exists for destination
736        if let Some(parent) = to_path.parent() {
737            ensure_dir(parent).await?;
738        }
739
740        trace!("Copying cache from {} to {}", from_key, to_key);
741
742        let mut from_file = tokio::fs::File::open(&from_path).await?;
743        let mut to_file = tokio::fs::File::create(&to_path).await?;
744
745        let bytes_copied = tokio::io::copy(&mut from_file, &mut to_file).await?;
746        to_file.flush().await?;
747
748        Ok(bytes_copied)
749    }
750
751    /// Get the size of a cache entry without reading it
752    pub async fn size(&self, key: &str) -> Result<u64> {
753        let path = self.get_path(key);
754        let metadata = tokio::fs::metadata(&path).await?;
755
756        // Update access order for size check
757        self.update_access_order(key);
758
759        Ok(metadata.len())
760    }
761
762    /// Stream data from cache with a custom buffer size
763    ///
764    /// Useful for optimizing I/O based on expected data size.
765    pub async fn read_streaming_buffered<W>(
766        &self,
767        key: &str,
768        writer: W,
769        buffer_size: usize,
770    ) -> Result<u64>
771    where
772        W: tokio::io::AsyncWrite + Unpin,
773    {
774        use tokio::io::{AsyncWriteExt, BufWriter};
775
776        let path = self.get_path(key);
777        trace!(
778            "Streaming from cache key with {}B buffer: {}",
779            buffer_size, key
780        );
781
782        let file = tokio::fs::File::open(&path).await?;
783        let mut reader = tokio::io::BufReader::with_capacity(buffer_size, file);
784        let mut writer = BufWriter::with_capacity(buffer_size, writer);
785
786        let bytes_copied = tokio::io::copy(&mut reader, &mut writer).await?;
787        writer.flush().await?;
788
789        Ok(bytes_copied)
790    }
791}
792
793#[cfg(test)]
794mod tests {
795    use super::*;
796
797    #[tokio::test]
798    async fn test_generic_cache_operations() {
799        let cache = GenericCache::with_subdirectory("test").await.unwrap();
800
801        // Test write and read
802        let key = "test_key";
803        let data = b"test data";
804
805        cache.write(key, data).await.unwrap();
806        assert!(cache.exists(key).await);
807
808        let read_data = cache.read(key).await.unwrap();
809        assert_eq!(read_data, data);
810
811        // Test delete
812        cache.delete(key).await.unwrap();
813        assert!(!cache.exists(key).await);
814
815        // Cleanup
816        let _ = cache.clear().await;
817    }
818
819    #[tokio::test]
820    async fn test_batch_operations() {
821        let cache = GenericCache::with_subdirectory("test_batch").await.unwrap();
822
823        // Test batch write
824        let entries = vec![
825            ("key1".to_string(), b"data1".to_vec()),
826            ("key2".to_string(), b"data2".to_vec()),
827            ("key3".to_string(), b"data3".to_vec()),
828        ];
829
830        cache.write_batch(&entries).await.unwrap();
831
832        // Test batch exists
833        let keys = vec![
834            "key1".to_string(),
835            "key2".to_string(),
836            "key3".to_string(),
837            "key4".to_string(),
838        ];
839        let exists = cache.exists_batch(&keys).await;
840        assert_eq!(exists, vec![true, true, true, false]);
841
842        // Test batch read
843        let keys = vec!["key1".to_string(), "key2".to_string(), "key3".to_string()];
844        let results = cache.read_batch(&keys).await;
845        assert_eq!(results.len(), 3);
846        assert_eq!(results[0].as_ref().unwrap(), b"data1");
847        assert_eq!(results[1].as_ref().unwrap(), b"data2");
848        assert_eq!(results[2].as_ref().unwrap(), b"data3");
849
850        // Test batch delete
851        let keys = vec!["key1".to_string(), "key2".to_string()];
852        cache.delete_batch(&keys).await.unwrap();
853        assert!(!cache.exists("key1").await);
854        assert!(!cache.exists("key2").await);
855        assert!(cache.exists("key3").await);
856
857        // Cleanup
858        let _ = cache.clear().await;
859    }
860
861    #[tokio::test]
862    async fn test_streaming_operations() {
863        let cache = GenericCache::with_subdirectory("test_streaming")
864            .await
865            .unwrap();
866
867        // Test streaming write
868        let key = "streaming_test";
869        let test_data = b"Hello, streaming world! This is a test of streaming I/O operations.";
870        let mut reader = std::io::Cursor::new(test_data);
871
872        let bytes_written = cache.write_streaming(key, &mut reader).await.unwrap();
873        assert_eq!(bytes_written, test_data.len() as u64);
874        assert!(cache.exists(key).await);
875
876        // Test streaming read
877        let mut output = Vec::new();
878        let bytes_read = cache.read_streaming(key, &mut output).await.unwrap();
879        assert_eq!(bytes_read, test_data.len() as u64);
880        assert_eq!(output, test_data);
881
882        // Test size
883        let size = cache.size(key).await.unwrap();
884        assert_eq!(size, test_data.len() as u64);
885
886        // Cleanup
887        let _ = cache.clear().await;
888    }
889
890    #[tokio::test]
891    async fn test_chunked_operations() {
892        let cache = GenericCache::with_subdirectory("test_chunked")
893            .await
894            .unwrap();
895
896        // Test chunked write
897        let key = "chunked_test";
898        let chunks = vec![
899            Ok(b"chunk1".to_vec()),
900            Ok(b"chunk2".to_vec()),
901            Ok(b"chunk3".to_vec()),
902        ];
903
904        let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
905        assert_eq!(bytes_written, 18); // 6 + 6 + 6 bytes
906        assert!(cache.exists(key).await);
907
908        // Test chunked read
909        let mut collected_data = Vec::new();
910        let bytes_read = cache
911            .read_chunked(key, |chunk| {
912                collected_data.extend_from_slice(chunk);
913                Ok(())
914            })
915            .await
916            .unwrap();
917
918        assert_eq!(bytes_read, 18);
919        assert_eq!(collected_data, b"chunk1chunk2chunk3");
920
921        // Cleanup
922        let _ = cache.clear().await;
923    }
924
925    #[tokio::test]
926    async fn test_copy_operation() {
927        let cache = GenericCache::with_subdirectory("test_copy").await.unwrap();
928
929        // Create source data
930        let source_key = "source";
931        let dest_key = "destination";
932        let test_data = b"This data will be copied between cache entries";
933
934        cache.write(source_key, test_data).await.unwrap();
935
936        // Test copy
937        let bytes_copied = cache.copy(source_key, dest_key).await.unwrap();
938        assert_eq!(bytes_copied, test_data.len() as u64);
939
940        // Verify both entries exist and have same content
941        assert!(cache.exists(source_key).await);
942        assert!(cache.exists(dest_key).await);
943
944        let source_data = cache.read(source_key).await.unwrap();
945        let dest_data = cache.read(dest_key).await.unwrap();
946        assert_eq!(source_data, dest_data);
947        assert_eq!(source_data, test_data);
948
949        // Cleanup
950        let _ = cache.clear().await;
951    }
952
953    #[tokio::test]
954    async fn test_buffered_streaming() {
955        let cache = GenericCache::with_subdirectory("test_buffered")
956            .await
957            .unwrap();
958
959        // Create test data
960        let key = "buffered_test";
961        let test_data = vec![42u8; 16384]; // 16KB of data
962
963        cache.write(key, &test_data).await.unwrap();
964
965        // Test buffered streaming with custom buffer size
966        let mut output = Vec::new();
967        let bytes_read = cache
968            .read_streaming_buffered(key, &mut output, 4096)
969            .await
970            .unwrap();
971
972        assert_eq!(bytes_read, test_data.len() as u64);
973        assert_eq!(output, test_data);
974
975        // Cleanup
976        let _ = cache.clear().await;
977    }
978
979    #[tokio::test]
980    async fn test_large_file_streaming() {
981        let cache = GenericCache::with_subdirectory("test_large").await.unwrap();
982
983        // Create a larger test file (1MB)
984        let key = "large_test";
985        let chunk_size = 8192;
986        let num_chunks = 128; // 128 * 8192 = 1MB
987
988        // Write in chunks
989        let chunks: Vec<Result<Vec<u8>>> = (0..num_chunks)
990            .map(|i| Ok(vec![(i % 256) as u8; chunk_size]))
991            .collect();
992
993        let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
994        assert_eq!(bytes_written, (chunk_size * num_chunks) as u64);
995
996        // Read back in chunks and verify
997        let mut total_read = 0u64;
998        let mut chunk_count = 0;
999
1000        cache
1001            .read_chunked(key, |chunk| {
1002                total_read += chunk.len() as u64;
1003                chunk_count += 1;
1004                Ok(())
1005            })
1006            .await
1007            .unwrap();
1008
1009        assert_eq!(total_read, bytes_written);
1010        assert!(chunk_count > 0); // Should be multiple chunks due to 8KB buffer
1011
1012        // Cleanup
1013        let _ = cache.clear().await;
1014    }
1015
1016    #[tokio::test]
1017    async fn test_lru_eviction_by_size() {
1018        // Create cache with 1KB limit
1019        let cache = GenericCache::with_config_and_path(
1020            get_cache_dir().unwrap().join("test_lru_eviction_by_size"),
1021            Some(1024),
1022            None,
1023            None,
1024        )
1025        .await
1026        .unwrap();
1027
1028        // Write 3 entries of 400 bytes each (1200 bytes total)
1029        let data_400b = vec![42u8; 400];
1030        cache.write("key1", &data_400b).await.unwrap();
1031        cache.write("key2", &data_400b).await.unwrap();
1032        cache.write("key3", &data_400b).await.unwrap(); // Should evict key1
1033
1034        // key1 should be evicted, key2 and key3 should exist
1035        assert!(!cache.exists("key1").await);
1036        assert!(cache.exists("key2").await);
1037        assert!(cache.exists("key3").await);
1038
1039        // Check that cache size is within limits
1040        assert!(cache.current_size() <= 1024);
1041        assert_eq!(cache.current_entries(), 2);
1042
1043        let _ = cache.clear().await;
1044    }
1045
1046    #[tokio::test]
1047    async fn test_lru_eviction_by_entries() {
1048        // Create cache with 2 entry limit
1049        let cache = GenericCache::with_config_and_path(
1050            get_cache_dir()
1051                .unwrap()
1052                .join("test_lru_eviction_by_entries"),
1053            None,
1054            Some(2),
1055            None,
1056        )
1057        .await
1058        .unwrap();
1059
1060        // Write 3 entries
1061        cache.write("key1", b"data1").await.unwrap();
1062        cache.write("key2", b"data2").await.unwrap();
1063        cache.write("key3", b"data3").await.unwrap(); // Should evict key1
1064
1065        // key1 should be evicted, key2 and key3 should exist
1066        assert!(!cache.exists("key1").await);
1067        assert!(cache.exists("key2").await);
1068        assert!(cache.exists("key3").await);
1069        assert_eq!(cache.current_entries(), 2);
1070
1071        let _ = cache.clear().await;
1072    }
1073
1074    #[tokio::test]
1075    async fn test_lru_access_order_update() {
1076        let cache = GenericCache::with_config_and_path(
1077            get_cache_dir()
1078                .unwrap()
1079                .join("test_lru_access_order_update"),
1080            None,
1081            Some(2),
1082            None,
1083        )
1084        .await
1085        .unwrap();
1086
1087        // Write 2 entries
1088        cache.write("key1", b"data1").await.unwrap();
1089        cache.write("key2", b"data2").await.unwrap();
1090
1091        // Access key1 to make it more recently used
1092        let _ = cache.read("key1").await.unwrap();
1093
1094        // Write key3, which should evict key2 (least recently used)
1095        cache.write("key3", b"data3").await.unwrap();
1096
1097        // key1 and key3 should exist, key2 should be evicted
1098        // Use file system check instead of exists() to avoid modifying access order
1099        let key1_path = cache.get_path("key1");
1100        let key2_path = cache.get_path("key2");
1101        let key3_path = cache.get_path("key3");
1102
1103        assert!(tokio::fs::metadata(key1_path).await.is_ok());
1104        assert!(tokio::fs::metadata(key2_path).await.is_err());
1105        assert!(tokio::fs::metadata(key3_path).await.is_ok());
1106
1107        let _ = cache.clear().await;
1108    }
1109
1110    #[tokio::test]
1111    async fn test_cache_statistics_integration() {
1112        let cache = GenericCache::with_config_and_path(
1113            get_cache_dir()
1114                .unwrap()
1115                .join("test_cache_statistics_integration"),
1116            None,
1117            Some(2),
1118            None,
1119        )
1120        .await
1121        .unwrap();
1122        let stats = cache.stats();
1123
1124        // Write some data
1125        cache.write("key1", b"data1").await.unwrap();
1126        assert_eq!(stats.bytes_written(), 5);
1127
1128        // Read data (cache hit)
1129        let _ = cache.read("key1").await.unwrap();
1130        assert_eq!(stats.hits(), 1);
1131        assert_eq!(stats.bytes_saved(), 5);
1132
1133        // Try to read non-existent key (this will be a filesystem miss, not cache miss)
1134        // Our cache doesn't track misses from read attempts, only from business logic
1135
1136        // Delete entry
1137        cache.delete("key1").await.unwrap();
1138
1139        // Verify statistics through snapshot
1140        let snapshot = stats.snapshot();
1141        assert_eq!(snapshot.write_operations, 1);
1142        assert_eq!(snapshot.read_operations, 1);
1143        assert_eq!(snapshot.delete_operations, 1);
1144
1145        let _ = cache.clear().await;
1146    }
1147
1148    #[tokio::test]
1149    async fn test_cache_warming() {
1150        let cache = GenericCache::with_subdirectory("test_warm").await.unwrap();
1151
1152        // Create some entries
1153        cache.write("key1", b"data1").await.unwrap();
1154        cache.write("key2", b"data2").await.unwrap();
1155        cache.write("key3", b"data3").await.unwrap();
1156
1157        // Clear access order (simulate cache restart)
1158        {
1159            let mut lru_state = cache.lru_state.lock();
1160            lru_state.access_order.clear();
1161        }
1162
1163        // Warm cache with specific keys
1164        let warm_keys = vec!["key2".to_string(), "key1".to_string()];
1165        cache.warm_cache(&warm_keys).await.unwrap();
1166
1167        // Check LRU order - key2 should be least recently used, key1 most recent
1168        let lru_keys = cache.get_lru_keys();
1169        assert!(lru_keys.contains(&"key1".to_string()));
1170        assert!(lru_keys.contains(&"key2".to_string()));
1171
1172        let mru_keys = cache.get_mru_keys(1);
1173        assert_eq!(mru_keys[0], "key1"); // Most recently accessed
1174
1175        let _ = cache.clear().await;
1176    }
1177
1178    #[tokio::test]
1179    async fn test_entry_metadata() {
1180        let cache = GenericCache::with_subdirectory("test_metadata")
1181            .await
1182            .unwrap();
1183
1184        // Write an entry
1185        cache.write("test_key", b"test_data").await.unwrap();
1186
1187        // Get entry info
1188        let (size, last_accessed, access_count) = cache.get_entry_info("test_key").unwrap();
1189        assert_eq!(size, 9); // "test_data" is 9 bytes
1190        assert!(last_accessed > 0); // Should have a timestamp
1191        assert_eq!(access_count, 1); // Written once (access count starts at 1)
1192
1193        // Access the entry
1194        let _ = cache.read("test_key").await.unwrap();
1195
1196        // Check updated metadata
1197        let (_, _, access_count) = cache.get_entry_info("test_key").unwrap();
1198        assert!(access_count >= 2); // Written once, read once (may be higher due to exists() calls)
1199
1200        let _ = cache.clear().await;
1201    }
1202
1203    #[tokio::test]
1204    async fn test_cache_size_tracking() {
1205        let cache = GenericCache::with_subdirectory("test_size").await.unwrap();
1206
1207        assert_eq!(cache.current_size(), 0);
1208        assert_eq!(cache.current_entries(), 0);
1209
1210        // Write entries
1211        cache.write("key1", b"hello").await.unwrap(); // 5 bytes
1212        assert_eq!(cache.current_size(), 5);
1213        assert_eq!(cache.current_entries(), 1);
1214
1215        cache.write("key2", b"world!").await.unwrap(); // 6 bytes
1216        assert_eq!(cache.current_size(), 11);
1217        assert_eq!(cache.current_entries(), 2);
1218
1219        // Overwrite existing entry
1220        cache.write("key1", b"hello world").await.unwrap(); // 11 bytes
1221        assert_eq!(cache.current_size(), 17); // 11 + 6 bytes
1222        assert_eq!(cache.current_entries(), 2);
1223
1224        // Delete entry
1225        cache.delete("key2").await.unwrap();
1226        assert_eq!(cache.current_size(), 11); // Only key1 remains
1227        assert_eq!(cache.current_entries(), 1);
1228
1229        let _ = cache.clear().await;
1230    }
1231
1232    #[tokio::test]
1233    async fn test_no_limits_cache() {
1234        // Test cache with no size or entry limits
1235        let cache = GenericCache::with_subdirectory("test_no_limits")
1236            .await
1237            .unwrap();
1238        let (max_size, max_entries) = cache.config();
1239
1240        assert_eq!(max_size, None);
1241        assert_eq!(max_entries, None);
1242
1243        // Clear any existing entries first
1244        let _ = cache.clear().await;
1245        assert_eq!(cache.current_entries(), 0);
1246
1247        // Should be able to write many entries without eviction
1248        for i in 0..100 {
1249            let key = format!("key_{i}");
1250            let data = format!("data_{i}");
1251            cache.write(&key, data.as_bytes()).await.unwrap();
1252        }
1253
1254        assert_eq!(cache.current_entries(), 100);
1255
1256        let _ = cache.clear().await;
1257    }
1258}