// zipora/concurrency/async_blob_store.rs

1//! Asynchronous blob storage implementations
2
3use crate::RecordId;
4use crate::blob_store::BlobStoreStats;
5use crate::error::{Result, ZiporaError};
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Instant;
11use tokio::fs::{File, OpenOptions};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::sync::{Mutex, RwLock};
14
/// Asynchronous blob store trait
///
/// Provides async methods for storing, retrieving, and managing binary data blobs
/// Uses async_trait to enable async functions in traits
///
/// This macro transforms async trait methods into regular trait methods returning Pin<Box<dyn Future>>
#[allow(missing_docs)] // async_trait macro causes false positive
#[async_trait::async_trait]
pub trait AsyncBlobStore: Send + Sync {
    /// Store a blob and return its record ID
    async fn put(&self, data: &[u8]) -> Result<RecordId>;

    /// Retrieve a blob by its record ID
    async fn get(&self, id: RecordId) -> Result<Vec<u8>>;

    /// Remove a blob by its record ID
    async fn remove(&self, id: RecordId) -> Result<()>;

    /// Check if a blob exists
    async fn contains(&self, id: RecordId) -> bool;

    /// Get the size of a blob without loading it
    async fn size(&self, id: RecordId) -> Result<Option<usize>>;

    /// Get the number of blobs in the store
    async fn len(&self) -> usize;

    /// Check if the store is empty
    async fn is_empty(&self) -> bool {
        self.len().await == 0
    }

    /// Flush any pending writes
    async fn flush(&self) -> Result<()>;

    /// Get store statistics
    async fn stats(&self) -> BlobStoreStats;

    /// Store several blobs in one call.
    ///
    /// Default implementation stores them sequentially via [`AsyncBlobStore::put`];
    /// implementors may override for better performance (e.g. taking a lock once).
    async fn put_batch(&self, data: Vec<&[u8]>) -> Result<Vec<RecordId>> {
        let mut ids = Vec::with_capacity(data.len());
        for item in data {
            ids.push(self.put(item).await?);
        }
        Ok(ids)
    }

    /// Retrieve several blobs in one call, failing on the first missing record.
    ///
    /// Default implementation fetches them sequentially via [`AsyncBlobStore::get`].
    async fn get_batch(&self, ids: Vec<RecordId>) -> Result<Vec<Vec<u8>>> {
        let mut results = Vec::with_capacity(ids.len());
        for id in ids {
            results.push(self.get(id).await?);
        }
        Ok(results)
    }
}
70
/// Asynchronous in-memory blob store
///
/// Blobs live in a `HashMap` guarded by a `tokio::sync::RwLock`; record IDs
/// come from a monotonically increasing atomic counter starting at 1.
pub struct AsyncMemoryBlobStore {
    /// Blob payloads keyed by record ID.
    data: Arc<RwLock<HashMap<RecordId, Vec<u8>>>>,
    /// Next record ID to hand out (starts at 1, so 0 is never issued).
    next_id: AtomicU64,
    /// Operation counters updated by the trait methods.
    stats: Arc<RwLock<BlobStoreStats>>,
}
77
78impl AsyncMemoryBlobStore {
79    /// Create a new async memory blob store
80    pub fn new() -> Self {
81        Self {
82            data: Arc::new(RwLock::new(HashMap::new())),
83            next_id: AtomicU64::new(1),
84            stats: Arc::new(RwLock::new(BlobStoreStats::default())),
85        }
86    }
87
88    /// Create with initial capacity
89    pub fn with_capacity(capacity: usize) -> Self {
90        Self {
91            data: Arc::new(RwLock::new(HashMap::with_capacity(capacity))),
92            next_id: AtomicU64::new(1),
93            stats: Arc::new(RwLock::new(BlobStoreStats::default())),
94        }
95    }
96}
97
98impl Default for AsyncMemoryBlobStore {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104#[async_trait::async_trait]
105impl AsyncBlobStore for AsyncMemoryBlobStore {
106    async fn put(&self, data: &[u8]) -> Result<RecordId> {
107        let _start_time = Instant::now();
108        let id = self.next_id.fetch_add(1, Ordering::Relaxed) as RecordId;
109
110        {
111            let mut store = self.data.write().await;
112            store.insert(id, data.to_vec());
113        }
114
115        let mut stats = self.stats.write().await;
116        stats.put_count += 1;
117
118        Ok(id)
119    }
120
121    async fn get(&self, id: RecordId) -> Result<Vec<u8>> {
122        let _start_time = Instant::now();
123
124        let result = {
125            let store = self.data.read().await;
126            store.get(&id).cloned()
127        };
128
129        let mut stats = self.stats.write().await;
130        stats.get_count += 1;
131
132        match result {
133            Some(data) => Ok(data),
134            None => Err(ZiporaError::invalid_data(&format!(
135                "record not found: {}",
136                id
137            ))),
138        }
139    }
140
141    async fn remove(&self, id: RecordId) -> Result<()> {
142        let _start_time = Instant::now();
143
144        let removed = {
145            let mut store = self.data.write().await;
146            store.remove(&id)
147        };
148
149        let _stats = self.stats.write().await;
150
151        if removed.is_some() {
152            Ok(())
153        } else {
154            Err(ZiporaError::invalid_data(&format!(
155                "record not found: {}",
156                id
157            )))
158        }
159    }
160
161    async fn contains(&self, id: RecordId) -> bool {
162        let store = self.data.read().await;
163        store.contains_key(&id)
164    }
165
166    async fn size(&self, id: RecordId) -> Result<Option<usize>> {
167        let store = self.data.read().await;
168        Ok(store.get(&id).map(|data| data.len()))
169    }
170
171    async fn len(&self) -> usize {
172        let store = self.data.read().await;
173        store.len()
174    }
175
176    async fn flush(&self) -> Result<()> {
177        // Memory store doesn't need flushing
178        Ok(())
179    }
180
181    async fn stats(&self) -> BlobStoreStats {
182        let stats = self.stats.read().await;
183        stats.clone()
184    }
185
186    async fn put_batch(&self, data: Vec<&[u8]>) -> Result<Vec<RecordId>> {
187        let _start_time = Instant::now();
188        let mut ids = Vec::with_capacity(data.len());
189        let mut _total_bytes = 0;
190
191        {
192            let mut store = self.data.write().await;
193            for item in data {
194                let id = self.next_id.fetch_add(1, Ordering::Relaxed) as RecordId;
195                store.insert(id, item.to_vec());
196                ids.push(id);
197                _total_bytes += item.len();
198            }
199        }
200
201        let mut stats = self.stats.write().await;
202        stats.put_count += ids.len() as u64;
203
204        Ok(ids)
205    }
206
207    async fn get_batch(&self, ids: Vec<RecordId>) -> Result<Vec<Vec<u8>>> {
208        let _start_time = Instant::now();
209        let mut results = Vec::with_capacity(ids.len());
210        let mut _total_bytes = 0;
211        let mut _misses = 0;
212
213        {
214            let store = self.data.read().await;
215            for id in ids {
216                match store.get(&id) {
217                    Some(data) => {
218                        _total_bytes += data.len();
219                        results.push(data.clone());
220                    }
221                    None => {
222                        _misses += 1;
223                        return Err(ZiporaError::invalid_data(&format!(
224                            "record not found: {}",
225                            id
226                        )));
227                    }
228                }
229            }
230        }
231
232        let mut stats = self.stats.write().await;
233        stats.get_count += results.len() as u64;
234
235        Ok(results)
236    }
237}
238
/// Asynchronous file-based blob store
///
/// Each blob is written to its own `blob_<id>.dat` file under `base_path`;
/// sizes and paths are tracked in an in-memory metadata map, which is the
/// authority for existence checks.
pub struct AsyncFileStore {
    /// Directory holding one file per record.
    base_path: PathBuf,
    /// Cache of open file handles, flushed by `flush`.
    /// NOTE(review): nothing in this file ever inserts into this map — confirm
    /// whether it is populated elsewhere or is dead state.
    file_handles: Arc<Mutex<HashMap<RecordId, File>>>,
    /// Per-record size/path bookkeeping; lost on restart (not persisted).
    metadata: Arc<RwLock<HashMap<RecordId, FileMetadata>>>,
    /// Next record ID to hand out (starts at 1).
    next_id: AtomicU64,
    /// Operation counters updated by the trait methods.
    stats: Arc<RwLock<BlobStoreStats>>,
}
247
/// Bookkeeping recorded for each stored blob.
#[derive(Clone)]
#[allow(dead_code)]
struct FileMetadata {
    /// Blob size in bytes, as written.
    size: usize,
    /// Full path of the backing file.
    file_path: PathBuf,
    /// When the blob was written (monotonic clock; not persisted).
    created_at: Instant,
}
255
256impl AsyncFileStore {
257    /// Create a new async file store
258    pub async fn new<P: AsRef<Path>>(base_path: P) -> Result<Self> {
259        let base_path = base_path.as_ref().to_path_buf();
260
261        // Create directory if it doesn't exist
262        tokio::fs::create_dir_all(&base_path)
263            .await
264            .map_err(|e| ZiporaError::io_error(&format!("failed to create directory: {}", e)))?;
265
266        Ok(Self {
267            base_path,
268            file_handles: Arc::new(Mutex::new(HashMap::new())),
269            metadata: Arc::new(RwLock::new(HashMap::new())),
270            next_id: AtomicU64::new(1),
271            stats: Arc::new(RwLock::new(BlobStoreStats::default())),
272        })
273    }
274
275    /// Get the file path for a record ID
276    fn get_file_path(&self, id: RecordId) -> PathBuf {
277        self.base_path.join(format!("blob_{:016x}.dat", id))
278    }
279}
280
281#[async_trait::async_trait]
282impl AsyncBlobStore for AsyncFileStore {
283    async fn put(&self, data: &[u8]) -> Result<RecordId> {
284        let _start_time = Instant::now();
285        let id = self.next_id.fetch_add(1, Ordering::Relaxed) as RecordId;
286        let file_path = self.get_file_path(id);
287
288        // Write data to file
289        let mut file = OpenOptions::new()
290            .create(true)
291            .write(true)
292            .truncate(true)
293            .open(&file_path)
294            .await
295            .map_err(|e| ZiporaError::io_error(&format!("failed to create file: {}", e)))?;
296
297        file.write_all(data)
298            .await
299            .map_err(|e| ZiporaError::io_error(&format!("failed to write data: {}", e)))?;
300
301        file.flush()
302            .await
303            .map_err(|e| ZiporaError::io_error(&format!("failed to flush file: {}", e)))?;
304
305        // Update metadata
306        {
307            let mut metadata = self.metadata.write().await;
308            metadata.insert(
309                id,
310                FileMetadata {
311                    size: data.len(),
312                    file_path: file_path.clone(),
313                    created_at: Instant::now(),
314                },
315            );
316        }
317
318        // Update statistics
319        {
320            let mut stats = self.stats.write().await;
321            stats.put_count += 1;
322        }
323
324        Ok(id)
325    }
326
327    async fn get(&self, id: RecordId) -> Result<Vec<u8>> {
328        let _start_time = Instant::now();
329        let file_path = self.get_file_path(id);
330
331        // Check if file exists in metadata
332        let metadata = {
333            let metadata = self.metadata.read().await;
334            metadata.get(&id).cloned()
335        };
336
337        let size = match metadata {
338            Some(meta) => meta.size,
339            None => {
340                let mut stats = self.stats.write().await;
341                stats.get_count += 1;
342
343                return Err(ZiporaError::invalid_data(&format!(
344                    "record not found: {}",
345                    id
346                )));
347            }
348        };
349
350        // Read data from file
351        let mut file = File::open(&file_path)
352            .await
353            .map_err(|e| ZiporaError::io_error(&format!("failed to open file: {}", e)))?;
354
355        let mut data = vec![0u8; size];
356        file.read_exact(&mut data)
357            .await
358            .map_err(|e| ZiporaError::io_error(&format!("failed to read data: {}", e)))?;
359
360        // Update statistics
361        {
362            let mut stats = self.stats.write().await;
363            stats.get_count += 1;
364        }
365
366        Ok(data)
367    }
368
369    async fn remove(&self, id: RecordId) -> Result<()> {
370        let _start_time = Instant::now();
371        let file_path = self.get_file_path(id);
372
373        // Remove from metadata first
374        let existed = {
375            let mut metadata = self.metadata.write().await;
376            metadata.remove(&id).is_some()
377        };
378
379        if !existed {
380            return Err(ZiporaError::invalid_data(&format!(
381                "record not found: {}",
382                id
383            )));
384        }
385
386        // Remove file
387        tokio::fs::remove_file(&file_path)
388            .await
389            .map_err(|e| ZiporaError::io_error(&format!("failed to remove file: {}", e)))?;
390
391        // Update statistics
392        {
393            let _stats = self.stats.write().await;
394        }
395
396        Ok(())
397    }
398
399    async fn contains(&self, id: RecordId) -> bool {
400        let metadata = self.metadata.read().await;
401        metadata.contains_key(&id)
402    }
403
404    async fn size(&self, id: RecordId) -> Result<Option<usize>> {
405        let metadata = self.metadata.read().await;
406        Ok(metadata.get(&id).map(|meta| meta.size))
407    }
408
409    async fn len(&self) -> usize {
410        let metadata = self.metadata.read().await;
411        metadata.len()
412    }
413
414    async fn flush(&self) -> Result<()> {
415        // Flush all open file handles
416        let mut handles = self.file_handles.lock().await;
417        for file in handles.values_mut() {
418            file.flush()
419                .await
420                .map_err(|e| ZiporaError::io_error(&format!("failed to flush file: {}", e)))?;
421        }
422        Ok(())
423    }
424
425    async fn stats(&self) -> BlobStoreStats {
426        let stats = self.stats.read().await;
427        stats.clone()
428    }
429}
430
/// Wrapper that adds compression to any async blob store
///
/// Blobs are zstd-compressed on `put` and decompressed on `get`; all other
/// operations delegate to the wrapped store unchanged.
pub struct AsyncCompressedBlobStore<S: AsyncBlobStore> {
    /// The underlying store holding the compressed payloads.
    inner: S,
    /// zstd compression level passed to the encoder.
    compression_level: i32,
}
436
437impl<S: AsyncBlobStore> AsyncCompressedBlobStore<S> {
438    /// Create a new compressed blob store wrapper
439    pub fn new(inner: S, compression_level: i32) -> Self {
440        Self {
441            inner,
442            compression_level,
443        }
444    }
445
446    /// Compress data using zstd
447    #[cfg(feature = "zstd")]
448    async fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
449        let level = self.compression_level;
450        let data = data.to_vec();
451
452        tokio::task::spawn_blocking(move || {
453            zstd::bulk::compress(&data, level)
454                .map_err(|e| ZiporaError::configuration(&format!("compression failed: {}", e)))
455        })
456        .await
457        .map_err(|e| ZiporaError::configuration(&format!("compression task failed: {}", e)))?
458    }
459
460    /// Compress data using zstd
461    #[cfg(not(feature = "zstd"))]
462    async fn compress(&self, _data: &[u8]) -> Result<Vec<u8>> {
463        Err(ZiporaError::configuration(
464            "ZSTD compression not available - enable 'zstd' feature",
465        ))
466    }
467
468    /// Decompress data using zstd
469    #[cfg(feature = "zstd")]
470    async fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
471        let data = data.to_vec();
472
473        tokio::task::spawn_blocking(move || {
474            zstd::bulk::decompress(&data, 10 * 1024 * 1024) // 10MB limit
475                .map_err(|e| ZiporaError::configuration(&format!("decompression failed: {}", e)))
476        })
477        .await
478        .map_err(|e| ZiporaError::configuration(&format!("decompression task failed: {}", e)))?
479    }
480
481    /// Decompress data using zstd
482    #[cfg(not(feature = "zstd"))]
483    async fn decompress(&self, _data: &[u8]) -> Result<Vec<u8>> {
484        Err(ZiporaError::configuration(
485            "ZSTD decompression not available - enable 'zstd' feature",
486        ))
487    }
488}
489
490#[async_trait::async_trait]
491impl<S: AsyncBlobStore> AsyncBlobStore for AsyncCompressedBlobStore<S> {
492    async fn put(&self, data: &[u8]) -> Result<RecordId> {
493        let compressed = self.compress(data).await?;
494        self.inner.put(&compressed).await
495    }
496
497    async fn get(&self, id: RecordId) -> Result<Vec<u8>> {
498        let compressed = self.inner.get(id).await?;
499        self.decompress(&compressed).await
500    }
501
502    async fn remove(&self, id: RecordId) -> Result<()> {
503        self.inner.remove(id).await
504    }
505
506    async fn contains(&self, id: RecordId) -> bool {
507        self.inner.contains(id).await
508    }
509
510    async fn size(&self, id: RecordId) -> Result<Option<usize>> {
511        // This returns the compressed size, not the original size
512        self.inner.size(id).await
513    }
514
515    async fn len(&self) -> usize {
516        self.inner.len().await
517    }
518
519    async fn flush(&self) -> Result<()> {
520        self.inner.flush().await
521    }
522
523    async fn stats(&self) -> BlobStoreStats {
524        self.inner.stats().await
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use tokio;

    #[tokio::test]
    async fn test_async_memory_blob_store() {
        let store = AsyncMemoryBlobStore::new();
        let payload = b"hello world";

        // Round-trip a blob through the store.
        let id = store.put(payload).await.unwrap();
        assert_eq!(store.get(id).await.unwrap().as_slice(), payload);
        assert_eq!(store.len().await, 1);
        assert!(store.contains(id).await);

        // Size is reported without loading the blob.
        assert_eq!(store.size(id).await.unwrap(), Some(payload.len()));

        // Removal empties the store.
        store.remove(id).await.unwrap();
        assert_eq!(store.len().await, 0);
        assert!(!store.contains(id).await);
    }

    #[tokio::test]
    async fn test_async_file_store() {
        let dir = TempDir::new().unwrap();
        let store = AsyncFileStore::new(dir.path()).await.unwrap();
        let payload = b"hello file world";

        // Round-trip a blob through the filesystem-backed store.
        let id = store.put(payload).await.unwrap();
        assert_eq!(store.get(id).await.unwrap().as_slice(), payload);
        assert_eq!(store.len().await, 1);
        assert!(store.contains(id).await);

        // Removal drops both metadata and the backing file.
        store.remove(id).await.unwrap();
        assert_eq!(store.len().await, 0);
        assert!(!store.contains(id).await);
    }

    #[tokio::test]
    async fn test_batch_operations() {
        let store = AsyncMemoryBlobStore::new();
        let items = vec![b"one".as_slice(), b"two".as_slice(), b"three".as_slice()];

        // Batch put assigns one ID per item.
        let ids = store.put_batch(items).await.unwrap();
        assert_eq!(ids.len(), 3);
        assert_eq!(store.len().await, 3);

        // Batch get returns blobs in ID order.
        let fetched = store.get_batch(ids).await.unwrap();
        assert_eq!(fetched.len(), 3);
        assert_eq!(fetched[0], b"one");
        assert_eq!(fetched[1], b"two");
        assert_eq!(fetched[2], b"three");
    }

    #[tokio::test]
    #[cfg(feature = "zstd")]
    async fn test_compressed_blob_store() {
        let store = AsyncCompressedBlobStore::new(AsyncMemoryBlobStore::new(), 3);

        // Repetitive data compresses well; the round trip must be lossless.
        let payload = b"hello world hello world hello world hello world";
        let id = store.put(payload).await.unwrap();
        assert_eq!(store.get(id).await.unwrap().as_slice(), payload);
        assert!(store.contains(id).await);
    }

    #[tokio::test]
    async fn test_store_statistics() {
        let store = AsyncMemoryBlobStore::new();

        // Two puts followed by two gets should be reflected in the counters.
        let first = store.put(b"data1").await.unwrap();
        let second = store.put(b"data2").await.unwrap();
        store.get(first).await.unwrap();
        store.get(second).await.unwrap();

        let stats = store.stats().await;
        assert_eq!(stats.put_count, 2);
        assert_eq!(stats.get_count, 2);
    }

    #[tokio::test]
    async fn test_error_handling() {
        let store = AsyncMemoryBlobStore::new();
        let missing = 999 as RecordId;

        // Operations on an unknown record fail (get/remove) or report None (size).
        assert!(store.get(missing).await.is_err());
        assert!(store.remove(missing).await.is_err());
        assert_eq!(store.size(missing).await.unwrap(), None);
    }
}