1use parking_lot::Mutex;
4use std::collections::{HashMap, VecDeque};
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, atomic::AtomicU64, atomic::Ordering};
7use std::time::{SystemTime, UNIX_EPOCH};
8use tracing::{debug, trace, warn};
9
10use crate::{CacheStats, Result, ensure_dir, get_cache_dir};
11
/// Per-entry bookkeeping used for LRU eviction decisions.
#[derive(Debug, Clone)]
struct CacheEntryMetadata {
    // Unix timestamp (seconds) of the most recent touch (read/write/exists).
    last_accessed: u64,
    // Payload size in bytes, as written to disk.
    size: u64,
    // Number of touches since the entry was last (re)inserted.
    access_count: u64,
}
22
/// In-memory LRU bookkeeping: per-key metadata plus recency ordering.
#[derive(Debug)]
struct LruState {
    // Metadata for every tracked key.
    metadata: HashMap<String, CacheEntryMetadata>,
    // Keys ordered least-recently-used (front) to most-recently-used (back).
    access_order: VecDeque<String>,
}
31
/// Filesystem-backed key/value cache with optional LRU eviction limits.
///
/// Entries are stored as one file per key under `base_dir`; eviction order
/// and per-entry metadata live in memory and are rebuilt from the directory
/// contents on startup.
pub struct GenericCache {
    // Directory holding one file per cache key.
    base_dir: PathBuf,
    // Shared in-memory LRU bookkeeping (metadata + access order).
    lru_state: Arc<Mutex<LruState>>,
    // Optional cap on total on-disk bytes across all entries.
    max_size_bytes: Option<u64>,
    // Optional cap on the number of entries.
    max_entries: Option<usize>,
    // Running total of all entry sizes, kept in sync with `lru_state`.
    current_size: Arc<AtomicU64>,
    // Shared hit/write/eviction counters.
    stats: Arc<CacheStats>,
}
47
48impl GenericCache {
    /// Creates a cache rooted at the default `generic` subdirectory with no
    /// size or entry limits.
    pub async fn new() -> Result<Self> {
        Self::with_config(None, None, None).await
    }
53
    /// Creates an unlimited cache rooted at `<cache_dir>/generic/<subdir>`.
    pub async fn with_subdirectory(subdir: &str) -> Result<Self> {
        let base_dir = get_cache_dir()?.join("generic").join(subdir);
        Self::with_config_and_path(base_dir, None, None, None).await
    }
59
    /// Creates a cache in the default `generic` directory with optional
    /// size (bytes) and entry-count limits; `None` disables that limit.
    pub async fn with_limits(
        max_size_bytes: Option<u64>,
        max_entries: Option<usize>,
    ) -> Result<Self> {
        Self::with_config(Some("generic"), max_size_bytes, max_entries).await
    }
67
68 pub async fn with_config(
70 subdir: Option<&str>,
71 max_size_bytes: Option<u64>,
72 max_entries: Option<usize>,
73 ) -> Result<Self> {
74 let base_dir = match subdir {
75 Some(sub) => get_cache_dir()?.join(sub),
76 None => get_cache_dir()?.join("generic"),
77 };
78
79 Self::with_config_and_path(base_dir, max_size_bytes, max_entries, None).await
80 }
81
    /// Creates a cache rooted at an explicit `base_dir`.
    ///
    /// Ensures the directory exists, optionally reuses a shared
    /// `CacheStats` instance, and scans the directory to rebuild the
    /// in-memory LRU state from any pre-existing entry files.
    pub async fn with_config_and_path(
        base_dir: PathBuf,
        max_size_bytes: Option<u64>,
        max_entries: Option<usize>,
        stats: Option<Arc<CacheStats>>,
    ) -> Result<Self> {
        ensure_dir(&base_dir).await?;

        // Fall back to a private stats instance when none is shared in.
        let stats = stats.unwrap_or_else(|| Arc::new(CacheStats::new()));
        let cache = Self {
            base_dir: base_dir.clone(),
            lru_state: Arc::new(Mutex::new(LruState {
                metadata: HashMap::new(),
                access_order: VecDeque::new(),
            })),
            max_size_bytes,
            max_entries,
            current_size: Arc::new(AtomicU64::new(0)),
            stats,
        };

        // Pick up entries left on disk by a previous process.
        cache.initialize_cache_state().await?;

        debug!(
            "Initialized generic cache at: {:?} (max_size: {:?} bytes, max_entries: {:?})",
            base_dir, max_size_bytes, max_entries
        );

        Ok(cache)
    }
114
    /// Rebuilds the in-memory LRU state from files already present in
    /// `base_dir` (e.g. left over from a previous process).
    ///
    /// Entries are ordered by filesystem mtime, oldest first, so
    /// pre-existing files sit at the LRU front and are evicted before
    /// anything written in this session.
    async fn initialize_cache_state(&self) -> Result<()> {
        let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
        let mut file_entries = Vec::new();
        let mut total_size = 0u64;

        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if let Ok(metadata_fs) = tokio::fs::metadata(&path).await {
                if metadata_fs.is_file() {
                    // Only files whose names are valid UTF-8 become keys;
                    // anything else (and subdirectories) is ignored.
                    if let Some(key) = path.file_name().and_then(|n| n.to_str()) {
                        // Fall back to the epoch when mtime is unavailable.
                        let modified_time = metadata_fs
                            .modified()
                            .unwrap_or(SystemTime::UNIX_EPOCH)
                            .duration_since(UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs();

                        file_entries.push((key.to_string(), metadata_fs.len(), modified_time));
                        total_size += metadata_fs.len();
                    }
                }
            }
        }

        // Oldest mtime first == least recently used at the queue front.
        file_entries.sort_by_key(|(_, _, time)| *time);

        {
            let mut lru_state = self.lru_state.lock();
            self.current_size.store(total_size, Ordering::Relaxed);

            for (key, size, time) in file_entries {
                let entry_metadata = CacheEntryMetadata {
                    last_accessed: time,
                    size,
                    access_count: 0,
                };

                lru_state.metadata.insert(key.clone(), entry_metadata);
                lru_state.access_order.push_back(key);
            }

            debug!(
                "Initialized cache with {} entries, total size: {} bytes",
                lru_state.metadata.len(),
                total_size
            );
        }

        Ok(())
    }
169
170 fn current_timestamp() -> u64 {
172 SystemTime::now()
173 .duration_since(UNIX_EPOCH)
174 .unwrap_or_default()
175 .as_secs()
176 }
177
178 fn update_access_order(&self, key: &str) {
180 let mut lru_state = self.lru_state.lock();
181
182 if lru_state.access_order.back() == Some(&key.to_string()) {
184 if let Some(entry) = lru_state.metadata.get_mut(key) {
186 entry.last_accessed = Self::current_timestamp();
187 entry.access_count += 1;
188 }
189 return;
190 }
191
192 lru_state.access_order.retain(|k| k != key);
194
195 lru_state.access_order.push_back(key.to_string());
197
198 if let Some(entry) = lru_state.metadata.get_mut(key) {
200 entry.last_accessed = Self::current_timestamp();
201 entry.access_count += 1;
202 }
203 }
204
    /// Evicts least-recently-used entries until the configured limits can
    /// accommodate `new_entry_size` additional bytes.
    ///
    /// No-op when neither limit is configured. Stops early if an eviction
    /// fails, or after a 1000-entry safety cap.
    async fn evict_if_needed(&self, new_entry_size: u64) -> Result<()> {
        let max_size = self.max_size_bytes;
        let max_entries = self.max_entries;

        // Unlimited cache: nothing to enforce.
        if max_size.is_none() && max_entries.is_none() {
            return Ok(());
        }

        let current_size = self.current_size.load(Ordering::Relaxed);
        let current_entries = self.lru_state.lock().metadata.len();

        // `>=` for the entry check: a slot must be freed before the new
        // entry lands, so being exactly at the limit already triggers.
        let size_exceeded = max_size
            .map(|max| current_size + new_entry_size > max)
            .unwrap_or(false);
        let entries_exceeded = max_entries
            .map(|max| current_entries >= max)
            .unwrap_or(false);

        if !size_exceeded && !entries_exceeded {
            return Ok(());
        }

        debug!(
            "Cache eviction needed: size_exceeded={}, entries_exceeded={}",
            size_exceeded, entries_exceeded
        );

        let mut evicted_count = 0;
        let mut evicted_bytes = 0;

        loop {
            // Snapshot the LRU victim under the lock, then release it before
            // the async file removal inside `evict_entry`.
            let (key_to_evict, entry_size) = {
                let lru_state = self.lru_state.lock();
                let key = lru_state.access_order.front().cloned();
                let size = key
                    .as_ref()
                    .and_then(|k| lru_state.metadata.get(k))
                    .map(|e| e.size)
                    .unwrap_or(0);
                (key, size)
            };

            // Queue exhausted: nothing left to evict.
            let Some(key) = key_to_evict else { break };

            if let Err(e) = self.evict_entry(&key).await {
                // Best-effort: stop rather than spin on a failing entry.
                warn!("Failed to evict cache entry '{}': {}", key, e);
                break;
            }

            evicted_count += 1;
            evicted_bytes += entry_size;
            self.stats.record_eviction(entry_size);

            // Re-check both limits after each single eviction.
            let new_current_size = self.current_size.load(Ordering::Relaxed);
            let new_current_entries = self.lru_state.lock().metadata.len();

            let size_ok = max_size
                .map(|max| new_current_size + new_entry_size <= max)
                .unwrap_or(true);
            let entries_ok = max_entries
                .map(|max| new_current_entries < max)
                .unwrap_or(true);

            if size_ok && entries_ok {
                break;
            }

            // Safety valve against pathological loops.
            if evicted_count > 1000 {
                warn!(
                    "Evicted {} entries but still need more space, stopping",
                    evicted_count
                );
                break;
            }
        }

        if evicted_count > 0 {
            debug!(
                "Evicted {} entries ({} bytes)",
                evicted_count, evicted_bytes
            );
        }

        Ok(())
    }
296
297 async fn evict_entry(&self, key: &str) -> Result<()> {
299 let path = self.get_path(key);
300
301 if tokio::fs::metadata(&path).await.is_ok() {
303 tokio::fs::remove_file(&path).await?;
304 }
305
306 let mut lru_state = self.lru_state.lock();
308
309 if let Some(entry_metadata) = lru_state.metadata.remove(key) {
310 self.current_size
311 .fetch_sub(entry_metadata.size, Ordering::Relaxed);
312 }
313
314 lru_state.access_order.retain(|k| k != key);
315
316 trace!("Evicted cache entry: {}", key);
317 Ok(())
318 }
319
    /// Returns the on-disk path backing `key`.
    ///
    /// NOTE(review): `key` is joined onto `base_dir` verbatim, so a key
    /// containing path separators or `..` could escape the cache directory —
    /// confirm callers only pass sanitized key names.
    pub fn get_path(&self, key: &str) -> PathBuf {
        self.base_dir.join(key)
    }
324
    /// Returns a handle to the shared statistics counters.
    pub fn stats(&self) -> Arc<CacheStats> {
        Arc::clone(&self.stats)
    }

    /// Total size of all tracked entries, in bytes.
    pub fn current_size(&self) -> u64 {
        self.current_size.load(Ordering::Relaxed)
    }

    /// Number of entries currently tracked by the LRU state.
    pub fn current_entries(&self) -> usize {
        self.lru_state.lock().metadata.len()
    }

    /// Returns the configured `(max_size_bytes, max_entries)` limits.
    pub fn config(&self) -> (Option<u64>, Option<usize>) {
        (self.max_size_bytes, self.max_entries)
    }
344
    /// Returns whether `key` has a backing file on disk.
    ///
    /// Note: a positive result counts as an access and promotes the key to
    /// most-recently-used (this is what `warm_cache` relies on).
    pub async fn exists(&self, key: &str) -> bool {
        let exists = tokio::fs::metadata(self.get_path(key)).await.is_ok();
        if exists {
            self.update_access_order(key);
        }
        exists
    }
354
355 pub async fn write(&self, key: &str, data: &[u8]) -> Result<()> {
357 let data_size = data.len() as u64;
358
359 self.evict_if_needed(data_size).await?;
361
362 let path = self.get_path(key);
363
364 if let Some(parent) = path.parent() {
366 ensure_dir(parent).await?;
367 }
368
369 let existing_size = {
371 let lru_state = self.lru_state.lock();
372 lru_state.metadata.get(key).map(|e| e.size).unwrap_or(0)
373 };
374
375 trace!("Writing {} bytes to cache key: {}", data.len(), key);
376 tokio::fs::write(&path, data).await?;
377
378 {
380 let mut lru_state = self.lru_state.lock();
381
382 self.current_size
384 .fetch_sub(existing_size, Ordering::Relaxed);
385 self.current_size.fetch_add(data_size, Ordering::Relaxed);
386
387 let entry_metadata = CacheEntryMetadata {
389 last_accessed: Self::current_timestamp(),
390 size: data_size,
391 access_count: 0, };
393 lru_state.metadata.insert(key.to_string(), entry_metadata);
394 }
395
396 self.update_access_order(key);
398
399 self.stats.record_write(data_size);
401
402 Ok(())
403 }
404
405 pub async fn read(&self, key: &str) -> Result<Vec<u8>> {
407 let path = self.get_path(key);
408
409 trace!("Reading from cache key: {}", key);
410 let data = tokio::fs::read(&path).await?;
411
412 self.update_access_order(key);
414
415 self.stats.record_hit(data.len() as u64);
417
418 Ok(data)
419 }
420
421 pub async fn read_to_writer<W>(&self, key: &str, mut writer: W) -> Result<u64>
423 where
424 W: tokio::io::AsyncWrite + Unpin,
425 {
426 let path = self.get_path(key);
427
428 trace!("Streaming from cache key: {}", key);
429 let mut file = tokio::fs::File::open(&path).await?;
430
431 let bytes_copied = tokio::io::copy(&mut file, &mut writer).await?;
432
433 self.update_access_order(key);
435
436 self.stats.record_hit(bytes_copied);
438
439 Ok(bytes_copied)
440 }
441
442 pub async fn delete(&self, key: &str) -> Result<()> {
444 let path = self.get_path(key);
445
446 if tokio::fs::metadata(&path).await.is_ok() {
447 trace!("Deleting cache key: {}", key);
448 tokio::fs::remove_file(&path).await?;
449 }
450
451 {
453 let mut lru_state = self.lru_state.lock();
454
455 if let Some(entry_metadata) = lru_state.metadata.remove(key) {
456 self.current_size
457 .fetch_sub(entry_metadata.size, Ordering::Relaxed);
458 }
459
460 lru_state.access_order.retain(|k| k != key);
461 }
462
463 self.stats.record_delete();
465
466 Ok(())
467 }
468
469 pub async fn clear(&self) -> Result<()> {
471 debug!("Clearing all entries in generic cache");
472
473 let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
474 while let Some(entry) = entries.next_entry().await? {
475 let path = entry.path();
476 if let Ok(metadata) = tokio::fs::metadata(&path).await {
477 if metadata.is_file() {
478 tokio::fs::remove_file(&path).await?;
479 }
480 }
481 }
482
483 {
485 let mut lru_state = self.lru_state.lock();
486
487 lru_state.metadata.clear();
488 lru_state.access_order.clear();
489 self.current_size.store(0, Ordering::Relaxed);
490 }
491
492 Ok(())
493 }
494
    /// Touches each of `keys`, promoting any that exist on disk to
    /// most-recently-used so they survive upcoming evictions longer.
    pub async fn warm_cache(&self, keys: &[String]) -> Result<()> {
        debug!("Warming cache with {} keys", keys.len());

        for key in keys {
            // `exists` already bumps the LRU position on a hit.
            if self.exists(key).await {
                trace!("Warmed cache key: {}", key);
            }
        }

        Ok(())
    }
508
509 pub fn get_lru_keys(&self) -> Vec<String> {
511 let lru_state = self.lru_state.lock();
512 lru_state.access_order.iter().cloned().collect()
513 }
514
515 pub fn get_mru_keys(&self, limit: usize) -> Vec<String> {
517 let lru_state = self.lru_state.lock();
518 lru_state
519 .access_order
520 .iter()
521 .rev()
522 .take(limit)
523 .cloned()
524 .collect()
525 }
526
527 pub fn get_entry_info(&self, key: &str) -> Option<(u64, u64, u64)> {
529 let lru_state = self.lru_state.lock();
530 lru_state
531 .metadata
532 .get(key)
533 .map(|e| (e.size, e.last_accessed, e.access_count))
534 }
535
536 pub fn base_dir(&self) -> &Path {
538 &self.base_dir
539 }
540
    /// Writes all `entries` concurrently; fails fast on the first error.
    pub async fn write_batch(&self, entries: &[(String, Vec<u8>)]) -> Result<()> {
        use futures::future::try_join_all;

        let futures = entries.iter().map(|(key, data)| self.write(key, data));

        try_join_all(futures).await?;
        Ok(())
    }

    /// Reads all `keys` concurrently, returning one result per key in the
    /// same order; individual failures do not abort the batch.
    pub async fn read_batch(&self, keys: &[String]) -> Vec<Result<Vec<u8>>> {
        use futures::future::join_all;

        let futures = keys.iter().map(|key| self.read(key));
        join_all(futures).await
    }

    /// Deletes all `keys` concurrently; fails fast on the first error.
    pub async fn delete_batch(&self, keys: &[String]) -> Result<()> {
        use futures::future::try_join_all;

        let futures = keys.iter().map(|key| self.delete(key));
        try_join_all(futures).await?;
        Ok(())
    }

    /// Checks all `keys` concurrently, returning one flag per key in order.
    pub async fn exists_batch(&self, keys: &[String]) -> Vec<bool> {
        use futures::future::join_all;

        let futures = keys.iter().map(|key| self.exists(key));
        join_all(futures).await
    }
584
585 pub async fn read_streaming<W>(&self, key: &str, mut writer: W) -> Result<u64>
589 where
590 W: tokio::io::AsyncWrite + Unpin,
591 {
592 use tokio::io::AsyncWriteExt;
593
594 let path = self.get_path(key);
595 trace!("Streaming from cache key: {}", key);
596
597 let mut file = tokio::fs::File::open(&path).await?;
598 let bytes_copied = tokio::io::copy(&mut file, &mut writer).await?;
599 writer.flush().await?;
600
601 self.update_access_order(key);
603 self.stats.record_hit(bytes_copied);
604
605 Ok(bytes_copied)
606 }
607
    /// Writes `key` from an async `reader`, returning the bytes written.
    ///
    /// The entry size is only known after the stream is drained, so limit
    /// enforcement runs *after* the file write, with a zero-sized request.
    ///
    /// NOTE(review): `evict_if_needed(0)` runs before the size totals below
    /// are updated and could in principle evict this very key; the
    /// `existing_size` captured at the top would then be stale and get
    /// subtracted twice — confirm whether this race matters in practice.
    pub async fn write_streaming<R>(&self, key: &str, mut reader: R) -> Result<u64>
    where
        R: tokio::io::AsyncRead + Unpin,
    {
        use tokio::io::AsyncWriteExt;

        // Size of any entry being overwritten, to keep totals accurate.
        let existing_size = {
            let lru_state = self.lru_state.lock();
            lru_state.metadata.get(key).map(|e| e.size).unwrap_or(0)
        };

        let path = self.get_path(key);

        if let Some(parent) = path.parent() {
            ensure_dir(parent).await?;
        }

        trace!("Streaming to cache key: {}", key);

        let mut file = tokio::fs::File::create(&path).await?;
        let bytes_copied = tokio::io::copy(&mut reader, &mut file).await?;
        file.flush().await?;

        self.evict_if_needed(0).await?;
        {
            let mut lru_state = self.lru_state.lock();

            // Swap the old entry's size contribution for the new one.
            self.current_size
                .fetch_sub(existing_size, Ordering::Relaxed);
            self.current_size.fetch_add(bytes_copied, Ordering::Relaxed);

            let entry_metadata = CacheEntryMetadata {
                last_accessed: Self::current_timestamp(),
                size: bytes_copied,
                access_count: 0,
            };
            lru_state.metadata.insert(key.to_string(), entry_metadata);
        }

        self.update_access_order(key);

        self.stats.record_write(bytes_copied);

        Ok(bytes_copied)
    }
665
666 pub async fn read_chunked<F>(&self, key: &str, mut callback: F) -> Result<u64>
670 where
671 F: FnMut(&[u8]) -> Result<()>,
672 {
673 use tokio::io::AsyncReadExt;
674
675 let path = self.get_path(key);
676 trace!("Reading cache key in chunks: {}", key);
677
678 let mut file = tokio::fs::File::open(&path).await?;
679 let mut buffer = vec![0u8; 8192]; let mut total_bytes = 0u64;
681
682 loop {
683 let bytes_read = file.read(&mut buffer).await?;
684 if bytes_read == 0 {
685 break; }
687
688 callback(&buffer[..bytes_read])?;
689 total_bytes += bytes_read as u64;
690 }
691
692 Ok(total_bytes)
693 }
694
695 pub async fn write_chunked<I>(&self, key: &str, chunks: I) -> Result<u64>
699 where
700 I: IntoIterator<Item = Result<Vec<u8>>>,
701 {
702 use tokio::io::AsyncWriteExt;
703
704 let path = self.get_path(key);
705
706 if let Some(parent) = path.parent() {
708 ensure_dir(parent).await?;
709 }
710
711 trace!("Writing cache key in chunks: {}", key);
712
713 let mut file = tokio::fs::File::create(&path).await?;
714 let mut total_bytes = 0u64;
715
716 for chunk_result in chunks {
717 let chunk = chunk_result?;
718 file.write_all(&chunk).await?;
719 total_bytes += chunk.len() as u64;
720 }
721
722 file.flush().await?;
723 Ok(total_bytes)
724 }
725
726 pub async fn copy(&self, from_key: &str, to_key: &str) -> Result<u64> {
730 use tokio::io::AsyncWriteExt;
731
732 let from_path = self.get_path(from_key);
733 let to_path = self.get_path(to_key);
734
735 if let Some(parent) = to_path.parent() {
737 ensure_dir(parent).await?;
738 }
739
740 trace!("Copying cache from {} to {}", from_key, to_key);
741
742 let mut from_file = tokio::fs::File::open(&from_path).await?;
743 let mut to_file = tokio::fs::File::create(&to_path).await?;
744
745 let bytes_copied = tokio::io::copy(&mut from_file, &mut to_file).await?;
746 to_file.flush().await?;
747
748 Ok(bytes_copied)
749 }
750
751 pub async fn size(&self, key: &str) -> Result<u64> {
753 let path = self.get_path(key);
754 let metadata = tokio::fs::metadata(&path).await?;
755
756 self.update_access_order(key);
758
759 Ok(metadata.len())
760 }
761
762 pub async fn read_streaming_buffered<W>(
766 &self,
767 key: &str,
768 writer: W,
769 buffer_size: usize,
770 ) -> Result<u64>
771 where
772 W: tokio::io::AsyncWrite + Unpin,
773 {
774 use tokio::io::{AsyncWriteExt, BufWriter};
775
776 let path = self.get_path(key);
777 trace!(
778 "Streaming from cache key with {}B buffer: {}",
779 buffer_size, key
780 );
781
782 let file = tokio::fs::File::open(&path).await?;
783 let mut reader = tokio::io::BufReader::with_capacity(buffer_size, file);
784 let mut writer = BufWriter::with_capacity(buffer_size, writer);
785
786 let bytes_copied = tokio::io::copy(&mut reader, &mut writer).await?;
787 writer.flush().await?;
788
789 Ok(bytes_copied)
790 }
791}
792
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): these tests operate on real directories under the user's
    // cache dir and are not isolated across runs; each test clears its own
    // subdirectory at the end (best-effort via `let _ =`).

    #[tokio::test]
    async fn test_generic_cache_operations() {
        let cache = GenericCache::with_subdirectory("test").await.unwrap();

        let key = "test_key";
        let data = b"test data";

        cache.write(key, data).await.unwrap();
        assert!(cache.exists(key).await);

        let read_data = cache.read(key).await.unwrap();
        assert_eq!(read_data, data);

        cache.delete(key).await.unwrap();
        assert!(!cache.exists(key).await);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let cache = GenericCache::with_subdirectory("test_batch").await.unwrap();

        let entries = vec![
            ("key1".to_string(), b"data1".to_vec()),
            ("key2".to_string(), b"data2".to_vec()),
            ("key3".to_string(), b"data3".to_vec()),
        ];

        cache.write_batch(&entries).await.unwrap();

        // key4 was never written and should report missing.
        let keys = vec![
            "key1".to_string(),
            "key2".to_string(),
            "key3".to_string(),
            "key4".to_string(),
        ];
        let exists = cache.exists_batch(&keys).await;
        assert_eq!(exists, vec![true, true, true, false]);

        let keys = vec!["key1".to_string(), "key2".to_string(), "key3".to_string()];
        let results = cache.read_batch(&keys).await;
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().unwrap(), b"data1");
        assert_eq!(results[1].as_ref().unwrap(), b"data2");
        assert_eq!(results[2].as_ref().unwrap(), b"data3");

        let keys = vec!["key1".to_string(), "key2".to_string()];
        cache.delete_batch(&keys).await.unwrap();
        assert!(!cache.exists("key1").await);
        assert!(!cache.exists("key2").await);
        assert!(cache.exists("key3").await);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_streaming_operations() {
        let cache = GenericCache::with_subdirectory("test_streaming")
            .await
            .unwrap();

        let key = "streaming_test";
        let test_data = b"Hello, streaming world! This is a test of streaming I/O operations.";
        let mut reader = std::io::Cursor::new(test_data);

        let bytes_written = cache.write_streaming(key, &mut reader).await.unwrap();
        assert_eq!(bytes_written, test_data.len() as u64);
        assert!(cache.exists(key).await);

        let mut output = Vec::new();
        let bytes_read = cache.read_streaming(key, &mut output).await.unwrap();
        assert_eq!(bytes_read, test_data.len() as u64);
        assert_eq!(output, test_data);

        let size = cache.size(key).await.unwrap();
        assert_eq!(size, test_data.len() as u64);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_chunked_operations() {
        let cache = GenericCache::with_subdirectory("test_chunked")
            .await
            .unwrap();

        let key = "chunked_test";
        let chunks = vec![
            Ok(b"chunk1".to_vec()),
            Ok(b"chunk2".to_vec()),
            Ok(b"chunk3".to_vec()),
        ];

        // 3 chunks x 6 bytes = 18 bytes total.
        let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
        assert_eq!(bytes_written, 18);
        assert!(cache.exists(key).await);

        let mut collected_data = Vec::new();
        let bytes_read = cache
            .read_chunked(key, |chunk| {
                collected_data.extend_from_slice(chunk);
                Ok(())
            })
            .await
            .unwrap();

        assert_eq!(bytes_read, 18);
        assert_eq!(collected_data, b"chunk1chunk2chunk3");

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_copy_operation() {
        let cache = GenericCache::with_subdirectory("test_copy").await.unwrap();

        let source_key = "source";
        let dest_key = "destination";
        let test_data = b"This data will be copied between cache entries";

        cache.write(source_key, test_data).await.unwrap();

        let bytes_copied = cache.copy(source_key, dest_key).await.unwrap();
        assert_eq!(bytes_copied, test_data.len() as u64);

        // Copy must leave the source intact.
        assert!(cache.exists(source_key).await);
        assert!(cache.exists(dest_key).await);

        let source_data = cache.read(source_key).await.unwrap();
        let dest_data = cache.read(dest_key).await.unwrap();
        assert_eq!(source_data, dest_data);
        assert_eq!(source_data, test_data);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_buffered_streaming() {
        let cache = GenericCache::with_subdirectory("test_buffered")
            .await
            .unwrap();

        let key = "buffered_test";
        // 16 KiB payload, read back through a 4 KiB buffer.
        let test_data = vec![42u8; 16384];
        cache.write(key, &test_data).await.unwrap();

        let mut output = Vec::new();
        let bytes_read = cache
            .read_streaming_buffered(key, &mut output, 4096)
            .await
            .unwrap();

        assert_eq!(bytes_read, test_data.len() as u64);
        assert_eq!(output, test_data);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_large_file_streaming() {
        let cache = GenericCache::with_subdirectory("test_large").await.unwrap();

        let key = "large_test";
        // 128 chunks of 8 KiB = 1 MiB total.
        let chunk_size = 8192;
        let num_chunks = 128;
        let chunks: Vec<Result<Vec<u8>>> = (0..num_chunks)
            .map(|i| Ok(vec![(i % 256) as u8; chunk_size]))
            .collect();

        let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
        assert_eq!(bytes_written, (chunk_size * num_chunks) as u64);

        let mut total_read = 0u64;
        let mut chunk_count = 0;

        cache
            .read_chunked(key, |chunk| {
                total_read += chunk.len() as u64;
                chunk_count += 1;
                Ok(())
            })
            .await
            .unwrap();

        assert_eq!(total_read, bytes_written);
        assert!(chunk_count > 0);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_lru_eviction_by_size() {
        // 1 KiB size limit, no entry limit.
        let cache = GenericCache::with_config_and_path(
            get_cache_dir().unwrap().join("test_lru_eviction_by_size"),
            Some(1024),
            None,
            None,
        )
        .await
        .unwrap();

        // The third 400 B write exceeds 1024 B total, evicting the oldest.
        let data_400b = vec![42u8; 400];
        cache.write("key1", &data_400b).await.unwrap();
        cache.write("key2", &data_400b).await.unwrap();
        cache.write("key3", &data_400b).await.unwrap();

        assert!(!cache.exists("key1").await);
        assert!(cache.exists("key2").await);
        assert!(cache.exists("key3").await);

        assert!(cache.current_size() <= 1024);
        assert_eq!(cache.current_entries(), 2);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_lru_eviction_by_entries() {
        // No size limit, max 2 entries.
        let cache = GenericCache::with_config_and_path(
            get_cache_dir()
                .unwrap()
                .join("test_lru_eviction_by_entries"),
            None,
            Some(2),
            None,
        )
        .await
        .unwrap();

        // The third write evicts the least-recently-used key1.
        cache.write("key1", b"data1").await.unwrap();
        cache.write("key2", b"data2").await.unwrap();
        cache.write("key3", b"data3").await.unwrap();

        assert!(!cache.exists("key1").await);
        assert!(cache.exists("key2").await);
        assert!(cache.exists("key3").await);
        assert_eq!(cache.current_entries(), 2);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_lru_access_order_update() {
        let cache = GenericCache::with_config_and_path(
            get_cache_dir()
                .unwrap()
                .join("test_lru_access_order_update"),
            None,
            Some(2),
            None,
        )
        .await
        .unwrap();

        cache.write("key1", b"data1").await.unwrap();
        cache.write("key2", b"data2").await.unwrap();

        // Reading key1 promotes it, so key2 becomes the eviction victim.
        let _ = cache.read("key1").await.unwrap();

        cache.write("key3", b"data3").await.unwrap();

        // Check the backing files directly — calling exists() here would
        // itself bump the access order.
        let key1_path = cache.get_path("key1");
        let key2_path = cache.get_path("key2");
        let key3_path = cache.get_path("key3");

        assert!(tokio::fs::metadata(key1_path).await.is_ok());
        assert!(tokio::fs::metadata(key2_path).await.is_err());
        assert!(tokio::fs::metadata(key3_path).await.is_ok());

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_cache_statistics_integration() {
        let cache = GenericCache::with_config_and_path(
            get_cache_dir()
                .unwrap()
                .join("test_cache_statistics_integration"),
            None,
            Some(2),
            None,
        )
        .await
        .unwrap();
        let stats = cache.stats();

        cache.write("key1", b"data1").await.unwrap();
        assert_eq!(stats.bytes_written(), 5);

        let _ = cache.read("key1").await.unwrap();
        assert_eq!(stats.hits(), 1);
        assert_eq!(stats.bytes_saved(), 5);

        cache.delete("key1").await.unwrap();

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.write_operations, 1);
        assert_eq!(snapshot.read_operations, 1);
        assert_eq!(snapshot.delete_operations, 1);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_cache_warming() {
        let cache = GenericCache::with_subdirectory("test_warm").await.unwrap();

        cache.write("key1", b"data1").await.unwrap();
        cache.write("key2", b"data2").await.unwrap();
        cache.write("key3", b"data3").await.unwrap();

        // Wipe the access order so warming alone determines it.
        {
            let mut lru_state = cache.lru_state.lock();
            lru_state.access_order.clear();
        }

        let warm_keys = vec!["key2".to_string(), "key1".to_string()];
        cache.warm_cache(&warm_keys).await.unwrap();

        let lru_keys = cache.get_lru_keys();
        assert!(lru_keys.contains(&"key1".to_string()));
        assert!(lru_keys.contains(&"key2".to_string()));

        // key1 was warmed last, so it is the most recently used.
        let mru_keys = cache.get_mru_keys(1);
        assert_eq!(mru_keys[0], "key1");

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_entry_metadata() {
        let cache = GenericCache::with_subdirectory("test_metadata")
            .await
            .unwrap();

        cache.write("test_key", b"test_data").await.unwrap();

        // A write counts as exactly one access.
        let (size, last_accessed, access_count) = cache.get_entry_info("test_key").unwrap();
        assert_eq!(size, 9);
        assert!(last_accessed > 0);
        assert_eq!(access_count, 1);

        let _ = cache.read("test_key").await.unwrap();

        let (_, _, access_count) = cache.get_entry_info("test_key").unwrap();
        assert!(access_count >= 2);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_cache_size_tracking() {
        let cache = GenericCache::with_subdirectory("test_size").await.unwrap();

        assert_eq!(cache.current_size(), 0);
        assert_eq!(cache.current_entries(), 0);

        cache.write("key1", b"hello").await.unwrap();
        assert_eq!(cache.current_size(), 5);
        assert_eq!(cache.current_entries(), 1);

        cache.write("key2", b"world!").await.unwrap();
        assert_eq!(cache.current_size(), 11);
        assert_eq!(cache.current_entries(), 2);

        // Overwriting replaces the old size rather than adding to it.
        cache.write("key1", b"hello world").await.unwrap();
        assert_eq!(cache.current_size(), 17);
        assert_eq!(cache.current_entries(), 2);

        cache.delete("key2").await.unwrap();
        assert_eq!(cache.current_size(), 11);
        assert_eq!(cache.current_entries(), 1);

        let _ = cache.clear().await;
    }

    #[tokio::test]
    async fn test_no_limits_cache() {
        let cache = GenericCache::with_subdirectory("test_no_limits")
            .await
            .unwrap();
        let (max_size, max_entries) = cache.config();

        assert_eq!(max_size, None);
        assert_eq!(max_entries, None);

        // Start from a known-empty state (previous runs may have left files).
        let _ = cache.clear().await;
        assert_eq!(cache.current_entries(), 0);

        // With no limits configured, nothing should ever be evicted.
        for i in 0..100 {
            let key = format!("key_{i}");
            let data = format!("data_{i}");
            cache.write(&key, data.as_bytes()).await.unwrap();
        }

        assert_eq!(cache.current_entries(), 100);

        let _ = cache.clear().await;
    }
}