1use std::path::{Path, PathBuf};
4use tracing::{debug, trace};
5
6use crate::{Result, ensure_dir, get_cache_dir};
7
8pub struct GenericCache {
10 base_dir: PathBuf,
12}
13
14impl GenericCache {
15 pub async fn new() -> Result<Self> {
17 let base_dir = get_cache_dir()?.join("generic");
18 ensure_dir(&base_dir).await?;
19
20 debug!("Initialized generic cache at: {:?}", base_dir);
21
22 Ok(Self { base_dir })
23 }
24
25 pub async fn with_subdirectory(subdir: &str) -> Result<Self> {
27 let base_dir = get_cache_dir()?.join("generic").join(subdir);
28 ensure_dir(&base_dir).await?;
29
30 debug!("Initialized generic cache at: {:?}", base_dir);
31
32 Ok(Self { base_dir })
33 }
34
35 pub fn get_path(&self, key: &str) -> PathBuf {
37 self.base_dir.join(key)
38 }
39
40 pub async fn exists(&self, key: &str) -> bool {
42 tokio::fs::metadata(self.get_path(key)).await.is_ok()
43 }
44
45 pub async fn write(&self, key: &str, data: &[u8]) -> Result<()> {
47 let path = self.get_path(key);
48
49 if let Some(parent) = path.parent() {
51 ensure_dir(parent).await?;
52 }
53
54 trace!("Writing {} bytes to cache key: {}", data.len(), key);
55 tokio::fs::write(&path, data).await?;
56
57 Ok(())
58 }
59
60 pub async fn read(&self, key: &str) -> Result<Vec<u8>> {
62 let path = self.get_path(key);
63
64 trace!("Reading from cache key: {}", key);
65 let data = tokio::fs::read(&path).await?;
66
67 Ok(data)
68 }
69
70 pub async fn delete(&self, key: &str) -> Result<()> {
72 let path = self.get_path(key);
73
74 if tokio::fs::metadata(&path).await.is_ok() {
75 trace!("Deleting cache key: {}", key);
76 tokio::fs::remove_file(&path).await?;
77 }
78
79 Ok(())
80 }
81
82 pub async fn clear(&self) -> Result<()> {
84 debug!("Clearing all entries in generic cache");
85
86 let mut entries = tokio::fs::read_dir(&self.base_dir).await?;
87 while let Some(entry) = entries.next_entry().await? {
88 let path = entry.path();
89 if let Ok(metadata) = tokio::fs::metadata(&path).await {
90 if metadata.is_file() {
91 tokio::fs::remove_file(&path).await?;
92 }
93 }
94 }
95
96 Ok(())
97 }
98
99 pub fn base_dir(&self) -> &Path {
101 &self.base_dir
102 }
103
104 pub async fn write_batch(&self, entries: &[(String, Vec<u8>)]) -> Result<()> {
108 use futures::future::try_join_all;
109
110 let futures = entries.iter().map(|(key, data)| self.write(key, data));
111
112 try_join_all(futures).await?;
113 Ok(())
114 }
115
116 pub async fn read_batch(&self, keys: &[String]) -> Vec<Result<Vec<u8>>> {
121 use futures::future::join_all;
122
123 let futures = keys.iter().map(|key| self.read(key));
124 join_all(futures).await
125 }
126
127 pub async fn delete_batch(&self, keys: &[String]) -> Result<()> {
131 use futures::future::try_join_all;
132
133 let futures = keys.iter().map(|key| self.delete(key));
134 try_join_all(futures).await?;
135 Ok(())
136 }
137
138 pub async fn exists_batch(&self, keys: &[String]) -> Vec<bool> {
142 use futures::future::join_all;
143
144 let futures = keys.iter().map(|key| self.exists(key));
145 join_all(futures).await
146 }
147
148 pub async fn read_streaming<W>(&self, key: &str, mut writer: W) -> Result<u64>
152 where
153 W: tokio::io::AsyncWrite + Unpin,
154 {
155 use tokio::io::AsyncWriteExt;
156
157 let path = self.get_path(key);
158 trace!("Streaming from cache key: {}", key);
159
160 let mut file = tokio::fs::File::open(&path).await?;
161 let bytes_copied = tokio::io::copy(&mut file, &mut writer).await?;
162 writer.flush().await?;
163
164 Ok(bytes_copied)
165 }
166
167 pub async fn write_streaming<R>(&self, key: &str, mut reader: R) -> Result<u64>
171 where
172 R: tokio::io::AsyncRead + Unpin,
173 {
174 use tokio::io::AsyncWriteExt;
175
176 let path = self.get_path(key);
177
178 if let Some(parent) = path.parent() {
180 ensure_dir(parent).await?;
181 }
182
183 trace!("Streaming to cache key: {}", key);
184
185 let mut file = tokio::fs::File::create(&path).await?;
186 let bytes_copied = tokio::io::copy(&mut reader, &mut file).await?;
187 file.flush().await?;
188
189 Ok(bytes_copied)
190 }
191
192 pub async fn read_chunked<F>(&self, key: &str, mut callback: F) -> Result<u64>
196 where
197 F: FnMut(&[u8]) -> Result<()>,
198 {
199 use tokio::io::AsyncReadExt;
200
201 let path = self.get_path(key);
202 trace!("Reading cache key in chunks: {}", key);
203
204 let mut file = tokio::fs::File::open(&path).await?;
205 let mut buffer = vec![0u8; 8192]; let mut total_bytes = 0u64;
207
208 loop {
209 let bytes_read = file.read(&mut buffer).await?;
210 if bytes_read == 0 {
211 break; }
213
214 callback(&buffer[..bytes_read])?;
215 total_bytes += bytes_read as u64;
216 }
217
218 Ok(total_bytes)
219 }
220
221 pub async fn write_chunked<I>(&self, key: &str, chunks: I) -> Result<u64>
225 where
226 I: IntoIterator<Item = Result<Vec<u8>>>,
227 {
228 use tokio::io::AsyncWriteExt;
229
230 let path = self.get_path(key);
231
232 if let Some(parent) = path.parent() {
234 ensure_dir(parent).await?;
235 }
236
237 trace!("Writing cache key in chunks: {}", key);
238
239 let mut file = tokio::fs::File::create(&path).await?;
240 let mut total_bytes = 0u64;
241
242 for chunk_result in chunks {
243 let chunk = chunk_result?;
244 file.write_all(&chunk).await?;
245 total_bytes += chunk.len() as u64;
246 }
247
248 file.flush().await?;
249 Ok(total_bytes)
250 }
251
252 pub async fn copy(&self, from_key: &str, to_key: &str) -> Result<u64> {
256 use tokio::io::AsyncWriteExt;
257
258 let from_path = self.get_path(from_key);
259 let to_path = self.get_path(to_key);
260
261 if let Some(parent) = to_path.parent() {
263 ensure_dir(parent).await?;
264 }
265
266 trace!("Copying cache from {} to {}", from_key, to_key);
267
268 let mut from_file = tokio::fs::File::open(&from_path).await?;
269 let mut to_file = tokio::fs::File::create(&to_path).await?;
270
271 let bytes_copied = tokio::io::copy(&mut from_file, &mut to_file).await?;
272 to_file.flush().await?;
273
274 Ok(bytes_copied)
275 }
276
277 pub async fn size(&self, key: &str) -> Result<u64> {
279 let path = self.get_path(key);
280 let metadata = tokio::fs::metadata(&path).await?;
281 Ok(metadata.len())
282 }
283
284 pub async fn read_streaming_buffered<W>(
288 &self,
289 key: &str,
290 writer: W,
291 buffer_size: usize,
292 ) -> Result<u64>
293 where
294 W: tokio::io::AsyncWrite + Unpin,
295 {
296 use tokio::io::{AsyncWriteExt, BufWriter};
297
298 let path = self.get_path(key);
299 trace!(
300 "Streaming from cache key with {}B buffer: {}",
301 buffer_size, key
302 );
303
304 let file = tokio::fs::File::open(&path).await?;
305 let mut reader = tokio::io::BufReader::with_capacity(buffer_size, file);
306 let mut writer = BufWriter::with_capacity(buffer_size, writer);
307
308 let bytes_copied = tokio::io::copy(&mut reader, &mut writer).await?;
309 writer.flush().await?;
310
311 Ok(bytes_copied)
312 }
313}
314
315#[cfg(test)]
316mod tests {
317 use super::*;
318
319 #[tokio::test]
320 async fn test_generic_cache_operations() {
321 let cache = GenericCache::with_subdirectory("test").await.unwrap();
322
323 let key = "test_key";
325 let data = b"test data";
326
327 cache.write(key, data).await.unwrap();
328 assert!(cache.exists(key).await);
329
330 let read_data = cache.read(key).await.unwrap();
331 assert_eq!(read_data, data);
332
333 cache.delete(key).await.unwrap();
335 assert!(!cache.exists(key).await);
336
337 let _ = cache.clear().await;
339 }
340
341 #[tokio::test]
342 async fn test_batch_operations() {
343 let cache = GenericCache::with_subdirectory("test_batch").await.unwrap();
344
345 let entries = vec![
347 ("key1".to_string(), b"data1".to_vec()),
348 ("key2".to_string(), b"data2".to_vec()),
349 ("key3".to_string(), b"data3".to_vec()),
350 ];
351
352 cache.write_batch(&entries).await.unwrap();
353
354 let keys = vec![
356 "key1".to_string(),
357 "key2".to_string(),
358 "key3".to_string(),
359 "key4".to_string(),
360 ];
361 let exists = cache.exists_batch(&keys).await;
362 assert_eq!(exists, vec![true, true, true, false]);
363
364 let keys = vec!["key1".to_string(), "key2".to_string(), "key3".to_string()];
366 let results = cache.read_batch(&keys).await;
367 assert_eq!(results.len(), 3);
368 assert_eq!(results[0].as_ref().unwrap(), b"data1");
369 assert_eq!(results[1].as_ref().unwrap(), b"data2");
370 assert_eq!(results[2].as_ref().unwrap(), b"data3");
371
372 let keys = vec!["key1".to_string(), "key2".to_string()];
374 cache.delete_batch(&keys).await.unwrap();
375 assert!(!cache.exists("key1").await);
376 assert!(!cache.exists("key2").await);
377 assert!(cache.exists("key3").await);
378
379 let _ = cache.clear().await;
381 }
382
383 #[tokio::test]
384 async fn test_streaming_operations() {
385 let cache = GenericCache::with_subdirectory("test_streaming")
386 .await
387 .unwrap();
388
389 let key = "streaming_test";
391 let test_data = b"Hello, streaming world! This is a test of streaming I/O operations.";
392 let mut reader = std::io::Cursor::new(test_data);
393
394 let bytes_written = cache.write_streaming(key, &mut reader).await.unwrap();
395 assert_eq!(bytes_written, test_data.len() as u64);
396 assert!(cache.exists(key).await);
397
398 let mut output = Vec::new();
400 let bytes_read = cache.read_streaming(key, &mut output).await.unwrap();
401 assert_eq!(bytes_read, test_data.len() as u64);
402 assert_eq!(output, test_data);
403
404 let size = cache.size(key).await.unwrap();
406 assert_eq!(size, test_data.len() as u64);
407
408 let _ = cache.clear().await;
410 }
411
412 #[tokio::test]
413 async fn test_chunked_operations() {
414 let cache = GenericCache::with_subdirectory("test_chunked")
415 .await
416 .unwrap();
417
418 let key = "chunked_test";
420 let chunks = vec![
421 Ok(b"chunk1".to_vec()),
422 Ok(b"chunk2".to_vec()),
423 Ok(b"chunk3".to_vec()),
424 ];
425
426 let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
427 assert_eq!(bytes_written, 18); assert!(cache.exists(key).await);
429
430 let mut collected_data = Vec::new();
432 let bytes_read = cache
433 .read_chunked(key, |chunk| {
434 collected_data.extend_from_slice(chunk);
435 Ok(())
436 })
437 .await
438 .unwrap();
439
440 assert_eq!(bytes_read, 18);
441 assert_eq!(collected_data, b"chunk1chunk2chunk3");
442
443 let _ = cache.clear().await;
445 }
446
447 #[tokio::test]
448 async fn test_copy_operation() {
449 let cache = GenericCache::with_subdirectory("test_copy").await.unwrap();
450
451 let source_key = "source";
453 let dest_key = "destination";
454 let test_data = b"This data will be copied between cache entries";
455
456 cache.write(source_key, test_data).await.unwrap();
457
458 let bytes_copied = cache.copy(source_key, dest_key).await.unwrap();
460 assert_eq!(bytes_copied, test_data.len() as u64);
461
462 assert!(cache.exists(source_key).await);
464 assert!(cache.exists(dest_key).await);
465
466 let source_data = cache.read(source_key).await.unwrap();
467 let dest_data = cache.read(dest_key).await.unwrap();
468 assert_eq!(source_data, dest_data);
469 assert_eq!(source_data, test_data);
470
471 let _ = cache.clear().await;
473 }
474
475 #[tokio::test]
476 async fn test_buffered_streaming() {
477 let cache = GenericCache::with_subdirectory("test_buffered")
478 .await
479 .unwrap();
480
481 let key = "buffered_test";
483 let test_data = vec![42u8; 16384]; cache.write(key, &test_data).await.unwrap();
486
487 let mut output = Vec::new();
489 let bytes_read = cache
490 .read_streaming_buffered(key, &mut output, 4096)
491 .await
492 .unwrap();
493
494 assert_eq!(bytes_read, test_data.len() as u64);
495 assert_eq!(output, test_data);
496
497 let _ = cache.clear().await;
499 }
500
501 #[tokio::test]
502 async fn test_large_file_streaming() {
503 let cache = GenericCache::with_subdirectory("test_large").await.unwrap();
504
505 let key = "large_test";
507 let chunk_size = 8192;
508 let num_chunks = 128; let chunks: Vec<Result<Vec<u8>>> = (0..num_chunks)
512 .map(|i| Ok(vec![(i % 256) as u8; chunk_size]))
513 .collect();
514
515 let bytes_written = cache.write_chunked(key, chunks).await.unwrap();
516 assert_eq!(bytes_written, (chunk_size * num_chunks) as u64);
517
518 let mut total_read = 0u64;
520 let mut chunk_count = 0;
521
522 cache
523 .read_chunked(key, |chunk| {
524 total_read += chunk.len() as u64;
525 chunk_count += 1;
526 Ok(())
527 })
528 .await
529 .unwrap();
530
531 assert_eq!(total_read, bytes_written);
532 assert!(chunk_count > 0); let _ = cache.clear().await;
536 }
537}