use crate::RecordId;
use crate::blob_store::BlobStoreStats;
use crate::error::{Result, ZiporaError};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{Mutex, RwLock};

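/// Asynchronous blob store interface.
///
/// All operations are `async`, and implementations must be shareable across
/// tasks (`Send + Sync`). The batch methods have sequential default
/// implementations that implementors can override with something faster.
///
/// A minimal usage sketch (marked `ignore` because the import paths are
/// illustrative and may not match this crate's module layout):
///
/// ```ignore
/// use zipora::{AsyncBlobStore, AsyncMemoryBlobStore};
///
/// #[tokio::main]
/// async fn main() {
///     let store = AsyncMemoryBlobStore::new();
///     let id = store.put(b"hello world").await.unwrap();
///     assert_eq!(store.get(id).await.unwrap(), b"hello world");
///     store.remove(id).await.unwrap();
/// }
/// ```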
#[async_trait::async_trait]
pub trait AsyncBlobStore: Send + Sync {
    /// Stores a blob and returns its record ID.
    async fn put(&self, data: &[u8]) -> Result<RecordId>;

    /// Retrieves the blob with the given record ID.
    async fn get(&self, id: RecordId) -> Result<Vec<u8>>;

    /// Removes the blob with the given record ID.
    async fn remove(&self, id: RecordId) -> Result<()>;

    /// Returns `true` if a blob with the given record ID exists.
    async fn contains(&self, id: RecordId) -> bool;

    /// Returns the size in bytes of the blob, or `None` if it does not exist.
    async fn size(&self, id: RecordId) -> Result<Option<usize>>;

    /// Returns the number of blobs in the store.
    async fn len(&self) -> usize;

    /// Returns `true` if the store contains no blobs.
    async fn is_empty(&self) -> bool {
        self.len().await == 0
    }

    /// Flushes any buffered data to the underlying storage.
    async fn flush(&self) -> Result<()>;

    /// Returns a snapshot of the store's operation statistics.
    async fn stats(&self) -> BlobStoreStats;

    /// Stores multiple blobs; the default implementation puts them one at a time.
    async fn put_batch(&self, data: Vec<&[u8]>) -> Result<Vec<RecordId>> {
        let mut ids = Vec::with_capacity(data.len());
        for item in data {
            ids.push(self.put(item).await?);
        }
        Ok(ids)
    }

    /// Retrieves multiple blobs; the default implementation gets them one at a time.
    async fn get_batch(&self, ids: Vec<RecordId>) -> Result<Vec<Vec<u8>>> {
        let mut results = Vec::with_capacity(ids.len());
        for id in ids {
            results.push(self.get(id).await?);
        }
        Ok(results)
    }
}

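/// In-memory [`AsyncBlobStore`] backed by a `HashMap` behind an async
/// `RwLock`. Record IDs come from an atomic counter starting at 1. Contents
/// are not persisted; this store is mainly useful for tests and ephemeral data.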
pub struct AsyncMemoryBlobStore {
    data: Arc<RwLock<HashMap<RecordId, Vec<u8>>>>,
    next_id: AtomicU64,
    stats: Arc<RwLock<BlobStoreStats>>,
}

impl AsyncMemoryBlobStore {
    /// Creates an empty in-memory store.
    pub fn new() -> Self {
        Self {
            data: Arc::new(RwLock::new(HashMap::new())),
            next_id: AtomicU64::new(1),
            stats: Arc::new(RwLock::new(BlobStoreStats::default())),
        }
    }

    /// Creates an empty store with space preallocated for `capacity` blobs.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            data: Arc::new(RwLock::new(HashMap::with_capacity(capacity))),
            next_id: AtomicU64::new(1),
            stats: Arc::new(RwLock::new(BlobStoreStats::default())),
        }
    }
}

impl Default for AsyncMemoryBlobStore {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait::async_trait]
impl AsyncBlobStore for AsyncMemoryBlobStore {
    async fn put(&self, data: &[u8]) -> Result<RecordId> {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed) as RecordId;

        // Hold the write lock only while inserting.
        {
            let mut store = self.data.write().await;
            store.insert(id, data.to_vec());
        }

        let mut stats = self.stats.write().await;
        stats.put_count += 1;

        Ok(id)
    }

    async fn get(&self, id: RecordId) -> Result<Vec<u8>> {
        // Clone the blob under the read lock, then release it before
        // touching the stats lock.
        let result = {
            let store = self.data.read().await;
            store.get(&id).cloned()
        };

        let mut stats = self.stats.write().await;
        stats.get_count += 1;

        match result {
            Some(data) => Ok(data),
            None => Err(ZiporaError::invalid_data(&format!(
                "record not found: {}",
                id
            ))),
        }
    }

    async fn remove(&self, id: RecordId) -> Result<()> {
        let removed = {
            let mut store = self.data.write().await;
            store.remove(&id)
        };

        if removed.is_some() {
            Ok(())
        } else {
            Err(ZiporaError::invalid_data(&format!(
                "record not found: {}",
                id
            )))
        }
    }

    async fn contains(&self, id: RecordId) -> bool {
        let store = self.data.read().await;
        store.contains_key(&id)
    }

    async fn size(&self, id: RecordId) -> Result<Option<usize>> {
        let store = self.data.read().await;
        Ok(store.get(&id).map(|data| data.len()))
    }

    async fn len(&self) -> usize {
        let store = self.data.read().await;
        store.len()
    }

    async fn flush(&self) -> Result<()> {
        // Nothing is buffered; writes land directly in the map.
        Ok(())
    }

    async fn stats(&self) -> BlobStoreStats {
        let stats = self.stats.read().await;
        stats.clone()
    }

    async fn put_batch(&self, data: Vec<&[u8]>) -> Result<Vec<RecordId>> {
        let mut ids = Vec::with_capacity(data.len());

        // Insert the whole batch under a single write-lock acquisition.
        {
            let mut store = self.data.write().await;
            for item in data {
                let id = self.next_id.fetch_add(1, Ordering::Relaxed) as RecordId;
                store.insert(id, item.to_vec());
                ids.push(id);
            }
        }

        let mut stats = self.stats.write().await;
        stats.put_count += ids.len() as u64;

        Ok(ids)
    }

    async fn get_batch(&self, ids: Vec<RecordId>) -> Result<Vec<Vec<u8>>> {
        let mut results = Vec::with_capacity(ids.len());

        // Read the whole batch under a single read lock; fail fast on the
        // first missing record.
        {
            let store = self.data.read().await;
            for id in ids {
                match store.get(&id) {
                    Some(data) => results.push(data.clone()),
                    None => {
                        return Err(ZiporaError::invalid_data(&format!(
                            "record not found: {}",
                            id
                        )));
                    }
                }
            }
        }

        let mut stats = self.stats.write().await;
        stats.get_count += results.len() as u64;

        Ok(results)
    }
}

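/// File-backed [`AsyncBlobStore`] that writes each blob to its own file under
/// `base_path`, named `blob_<id>.dat`. Blob sizes are tracked in an in-memory
/// metadata map, so the set of known records does not survive a restart.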
pub struct AsyncFileStore {
    base_path: PathBuf,
    file_handles: Arc<Mutex<HashMap<RecordId, File>>>,
    metadata: Arc<RwLock<HashMap<RecordId, FileMetadata>>>,
    next_id: AtomicU64,
    stats: Arc<RwLock<BlobStoreStats>>,
}

/// Per-record bookkeeping kept in memory alongside the on-disk file.
#[derive(Clone)]
#[allow(dead_code)]
struct FileMetadata {
    size: usize,
    file_path: PathBuf,
    created_at: Instant,
}

impl AsyncFileStore {
    /// Creates a store rooted at `base_path`, creating the directory if needed.
    pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self> {
        let base_path = base_path.as_ref().to_path_buf();

        tokio::fs::create_dir_all(&base_path)
            .await
            .map_err(|e| ZiporaError::io_error(&format!("failed to create directory: {}", e)))?;

        Ok(Self {
            base_path,
            file_handles: Arc::new(Mutex::new(HashMap::new())),
            metadata: Arc::new(RwLock::new(HashMap::new())),
            next_id: AtomicU64::new(1),
            stats: Arc::new(RwLock::new(BlobStoreStats::default())),
        })
    }

    /// Returns the on-disk path for a record, e.g. `blob_000000000000002a.dat`.
    fn get_file_path(&self, id: RecordId) -> PathBuf {
        self.base_path.join(format!("blob_{:016x}.dat", id))
    }
}

#[async_trait::async_trait]
impl AsyncBlobStore for AsyncFileStore {
    async fn put(&self, data: &[u8]) -> Result<RecordId> {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed) as RecordId;
        let file_path = self.get_file_path(id);

        let mut file = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&file_path)
            .await
            .map_err(|e| ZiporaError::io_error(&format!("failed to create file: {}", e)))?;

        file.write_all(data)
            .await
            .map_err(|e| ZiporaError::io_error(&format!("failed to write data: {}", e)))?;

        file.flush()
            .await
            .map_err(|e| ZiporaError::io_error(&format!("failed to flush file: {}", e)))?;

        {
            let mut metadata = self.metadata.write().await;
            metadata.insert(
                id,
                FileMetadata {
                    size: data.len(),
                    file_path: file_path.clone(),
                    created_at: Instant::now(),
                },
            );
        }

        {
            let mut stats = self.stats.write().await;
            stats.put_count += 1;
        }

        Ok(id)
    }

    async fn get(&self, id: RecordId) -> Result<Vec<u8>> {
        let file_path = self.get_file_path(id);

        let metadata = {
            let metadata = self.metadata.read().await;
            metadata.get(&id).cloned()
        };

        // A miss still counts as a get in the statistics.
        let size = match metadata {
            Some(meta) => meta.size,
            None => {
                let mut stats = self.stats.write().await;
                stats.get_count += 1;

                return Err(ZiporaError::invalid_data(&format!(
                    "record not found: {}",
                    id
                )));
            }
        };

        let mut file = File::open(&file_path)
            .await
            .map_err(|e| ZiporaError::io_error(&format!("failed to open file: {}", e)))?;

        let mut data = vec![0u8; size];
        file.read_exact(&mut data)
            .await
            .map_err(|e| ZiporaError::io_error(&format!("failed to read data: {}", e)))?;

        {
            let mut stats = self.stats.write().await;
            stats.get_count += 1;
        }

        Ok(data)
    }

    async fn remove(&self, id: RecordId) -> Result<()> {
        let file_path = self.get_file_path(id);

        let existed = {
            let mut metadata = self.metadata.write().await;
            metadata.remove(&id).is_some()
        };

        if !existed {
            return Err(ZiporaError::invalid_data(&format!(
                "record not found: {}",
                id
            )));
        }

        tokio::fs::remove_file(&file_path)
            .await
            .map_err(|e| ZiporaError::io_error(&format!("failed to remove file: {}", e)))?;

        Ok(())
    }

    async fn contains(&self, id: RecordId) -> bool {
        let metadata = self.metadata.read().await;
        metadata.contains_key(&id)
    }

    async fn size(&self, id: RecordId) -> Result<Option<usize>> {
        let metadata = self.metadata.read().await;
        Ok(metadata.get(&id).map(|meta| meta.size))
    }

    async fn len(&self) -> usize {
        let metadata = self.metadata.read().await;
        metadata.len()
    }

    async fn flush(&self) -> Result<()> {
        // Flush any cached file handles; `put` currently flushes eagerly,
        // so this map is normally empty.
        let mut handles = self.file_handles.lock().await;
        for file in handles.values_mut() {
            file.flush()
                .await
                .map_err(|e| ZiporaError::io_error(&format!("failed to flush file: {}", e)))?;
        }
        Ok(())
    }

    async fn stats(&self) -> BlobStoreStats {
        let stats = self.stats.read().await;
        stats.clone()
    }
}

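/// Transparent compression adapter: blobs are ZSTD-compressed before being
/// written to the wrapped store and decompressed on the way out. Requires the
/// `zstd` feature; without it every `put`/`get` returns a configuration error.
///
/// A minimal sketch of wrapping another store (`ignore`d because the
/// surrounding setup is illustrative):
///
/// ```ignore
/// let store = AsyncCompressedBlobStore::new(AsyncMemoryBlobStore::new(), 3);
/// let id = store.put(b"highly repetitive data ...").await.unwrap();
/// let original = store.get(id).await.unwrap(); // decompressed automatically
/// ```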
pub struct AsyncCompressedBlobStore<S: AsyncBlobStore> {
    inner: S,
    compression_level: i32,
}

impl<S: AsyncBlobStore> AsyncCompressedBlobStore<S> {
    /// Wraps `inner`, compressing blobs at the given ZSTD `compression_level`.
    pub fn new(inner: S, compression_level: i32) -> Self {
        Self {
            inner,
            compression_level,
        }
    }

    #[cfg(feature = "zstd")]
    async fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        let level = self.compression_level;
        let data = data.to_vec();

        // Run CPU-bound compression on the blocking thread pool so it does
        // not stall the async executor.
        tokio::task::spawn_blocking(move || {
            zstd::bulk::compress(&data, level)
                .map_err(|e| ZiporaError::configuration(&format!("compression failed: {}", e)))
        })
        .await
        .map_err(|e| ZiporaError::configuration(&format!("compression task failed: {}", e)))?
    }

    #[cfg(not(feature = "zstd"))]
    async fn compress(&self, _data: &[u8]) -> Result<Vec<u8>> {
        Err(ZiporaError::configuration(
            "ZSTD compression not available - enable 'zstd' feature",
        ))
    }

    #[cfg(feature = "zstd")]
    async fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
        let data = data.to_vec();

        tokio::task::spawn_blocking(move || {
            // Cap decompressed output at 10 MiB to guard against
            // decompression bombs.
            zstd::bulk::decompress(&data, 10 * 1024 * 1024)
                .map_err(|e| ZiporaError::configuration(&format!("decompression failed: {}", e)))
        })
        .await
        .map_err(|e| ZiporaError::configuration(&format!("decompression task failed: {}", e)))?
    }

    #[cfg(not(feature = "zstd"))]
    async fn decompress(&self, _data: &[u8]) -> Result<Vec<u8>> {
        Err(ZiporaError::configuration(
            "ZSTD decompression not available - enable 'zstd' feature",
        ))
    }
}

#[async_trait::async_trait]
impl<S: AsyncBlobStore> AsyncBlobStore for AsyncCompressedBlobStore<S> {
    async fn put(&self, data: &[u8]) -> Result<RecordId> {
        let compressed = self.compress(data).await?;
        self.inner.put(&compressed).await
    }

    async fn get(&self, id: RecordId) -> Result<Vec<u8>> {
        let compressed = self.inner.get(id).await?;
        self.decompress(&compressed).await
    }

    async fn remove(&self, id: RecordId) -> Result<()> {
        self.inner.remove(id).await
    }

    async fn contains(&self, id: RecordId) -> bool {
        self.inner.contains(id).await
    }

    async fn size(&self, id: RecordId) -> Result<Option<usize>> {
        // Note: this reports the stored (compressed) size, not the size of
        // the original blob.
        self.inner.size(id).await
    }

    async fn len(&self) -> usize {
        self.inner.len().await
    }

    async fn flush(&self) -> Result<()> {
        self.inner.flush().await
    }

    async fn stats(&self) -> BlobStoreStats {
        self.inner.stats().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_async_memory_blob_store() {
        let store = AsyncMemoryBlobStore::new();

        let data = b"hello world";
        let id = store.put(data).await.unwrap();
        let retrieved = store.get(id).await.unwrap();

        assert_eq!(data, retrieved.as_slice());
        assert_eq!(store.len().await, 1);
        assert!(store.contains(id).await);

        let size = store.size(id).await.unwrap();
        assert_eq!(size, Some(data.len()));

        store.remove(id).await.unwrap();
        assert_eq!(store.len().await, 0);
        assert!(!store.contains(id).await);
    }

    #[tokio::test]
    async fn test_async_file_store() {
        let temp_dir = TempDir::new().unwrap();
        let store = AsyncFileStore::new(temp_dir.path()).await.unwrap();

        let data = b"hello file world";
        let id = store.put(data).await.unwrap();
        let retrieved = store.get(id).await.unwrap();

        assert_eq!(data, retrieved.as_slice());
        assert_eq!(store.len().await, 1);
        assert!(store.contains(id).await);

        store.remove(id).await.unwrap();
        assert_eq!(store.len().await, 0);
        assert!(!store.contains(id).await);
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let store = AsyncMemoryBlobStore::new();

        let data = vec![b"one".as_slice(), b"two".as_slice(), b"three".as_slice()];
        let ids = store.put_batch(data).await.unwrap();

        assert_eq!(ids.len(), 3);
        assert_eq!(store.len().await, 3);

        let retrieved = store.get_batch(ids).await.unwrap();
        assert_eq!(retrieved.len(), 3);
        assert_eq!(retrieved[0], b"one");
        assert_eq!(retrieved[1], b"two");
        assert_eq!(retrieved[2], b"three");
    }

    #[tokio::test]
    #[cfg(feature = "zstd")]
    async fn test_compressed_blob_store() {
        let inner = AsyncMemoryBlobStore::new();
        let store = AsyncCompressedBlobStore::new(inner, 3);

        let data = b"hello world hello world hello world hello world";
        let id = store.put(data).await.unwrap();
        let retrieved = store.get(id).await.unwrap();

        assert_eq!(data, retrieved.as_slice());
        assert!(store.contains(id).await);
    }

    #[tokio::test]
    async fn test_store_statistics() {
        let store = AsyncMemoryBlobStore::new();

        let id1 = store.put(b"data1").await.unwrap();
        let id2 = store.put(b"data2").await.unwrap();
        let _data1 = store.get(id1).await.unwrap();
        let _data2 = store.get(id2).await.unwrap();

        let stats = store.stats().await;
        assert_eq!(stats.put_count, 2);
        assert_eq!(stats.get_count, 2);
    }

    #[tokio::test]
    async fn test_error_handling() {
        let store = AsyncMemoryBlobStore::new();

        let result = store.get(999 as RecordId).await;
        assert!(result.is_err());

        let result = store.remove(999 as RecordId).await;
        assert!(result.is_err());

        let size = store.size(999 as RecordId).await.unwrap();
        assert_eq!(size, None);
    }
}