// sync_engine/storage/traits.rs

1use async_trait::async_trait;
2use crate::sync_item::SyncItem;
3use thiserror::Error;
4
/// Errors surfaced by the cache and archive storage backends.
#[derive(Error, Debug)]
pub enum StorageError {
    /// The requested item does not exist in the store.
    #[error("Item not found")]
    NotFound,
    /// An error reported by the underlying backend, carried as text.
    #[error("Storage backend error: {0}")]
    Backend(String),
    /// A stored item's content hash did not match the expected value.
    #[error("Data corruption detected for '{id}': expected hash {expected}, got {actual}")]
    Corruption {
        id: String,
        expected: String,
        actual: String,
    },
}
18
/// Result of a batch write operation with verification.
///
/// Plain data carrier, so the common traits are derived eagerly
/// (`Clone`/`PartialEq`/`Eq` are backward-compatible additions that let
/// callers copy and compare results, e.g. in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchWriteResult {
    /// Unique batch ID for this write (for SQL verification)
    pub batch_id: String,
    /// Number of items successfully written
    pub written: usize,
    /// Whether the write was verified (for SQL, reads back count; for Redis, always true)
    pub verified: bool,
}
29
30#[async_trait]
31pub trait CacheStore: Send + Sync {
32    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>;
33    async fn put(&self, item: &SyncItem) -> Result<(), StorageError>;
34    async fn delete(&self, id: &str) -> Result<(), StorageError>;
35    
36    /// Check if an item exists (Redis EXISTS command - fast, no data transfer).
37    async fn exists(&self, id: &str) -> Result<bool, StorageError>;
38    
39    /// Write a batch of items atomically (pipelined for Redis).
40    /// Default implementation falls back to sequential puts.
41    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
42        self.put_batch_with_ttl(items, None).await
43    }
44    
45    /// Write a batch of items with optional TTL (in seconds).
46    /// For Redis: uses SETEX when ttl is Some, SET when None.
47    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
48        // Default: ignore TTL, just do sequential puts
49        let _ = ttl_secs;
50        for item in items {
51            self.put(item).await?;
52        }
53        Ok(BatchWriteResult {
54            batch_id: String::new(),
55            written: items.len(),
56            verified: true,
57        })
58    }
59}
60
#[async_trait]
pub trait ArchiveStore: Send + Sync {
    /// Fetch an item by id, returning `Ok(None)` when it is absent.
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>;
    /// Insert or overwrite a single item.
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError>;
    /// Remove an item by id.
    async fn delete(&self, id: &str) -> Result<(), StorageError>;

    /// Check if an item exists (SQL EXISTS query - fast, no data transfer).
    async fn exists(&self, id: &str) -> Result<bool, StorageError>;

    /// Write a batch of items with verification.
    /// Implementations are expected to stamp a batch_id onto the items
    /// (hence the `&mut` slice) so the write can be queried back for
    /// verification.
    ///
    /// NOTE(review): the default implementation below does NOT stamp a
    /// batch_id (it returns an empty one) and performs no read-back
    /// verification, yet still reports `verified: true`. Callers that rely
    /// on real verification must use a backend that overrides this method.
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        // Sequential fallback: `?` aborts the batch at the first failing put.
        for item in items.iter() {
            self.put(item).await?;
        }
        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: items.len(),
            verified: true,
        })
    }

    /// Scan keys for cuckoo filter warmup (paginated).
    /// Returns empty vec when offset exceeds total count.
    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError>;

    /// Count total items in store.
    async fn count_all(&self) -> Result<u64, StorageError>;
}