sync_engine/storage/
traits.rs

use async_trait::async_trait;
use crate::sync_item::SyncItem;
use crate::search::SqlParam;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum StorageError {
    #[error("Item not found")]
    NotFound,
    #[error("Storage backend error: {0}")]
    Backend(String),
    #[error("Connection error: {0}")]
    Connection(String),
    #[error("Data corruption detected for '{id}': expected hash {expected}, got {actual}")]
    Corruption {
        id: String,
        expected: String,
        actual: String,
    },
}
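
// A minimal sketch of how a caller might branch on StorageError; the wording
// and the "treat as a miss" policy are illustrative, not a contract of this
// module.
fn describe_error(err: &StorageError) -> String {
    match err {
        StorageError::NotFound => "item missing (treat as a cache miss)".to_string(),
        StorageError::Corruption { id, expected, actual } => {
            format!("hash mismatch for '{id}': expected {expected}, got {actual}")
        }
        // Backend and Connection already carry their detail via Display.
        other => other.to_string(),
    }
}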

/// Result of a batch write operation with verification
#[derive(Debug)]
pub struct BatchWriteResult {
    /// Unique batch ID for this write (for SQL verification)
    pub batch_id: String,
    /// Number of items successfully written
    pub written: usize,
    /// Whether the write was verified (for SQL, reads back count; for Redis, always true)
    pub verified: bool,
}
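
// A minimal sketch of how a caller might interpret a BatchWriteResult: treat a
// short or unverified write as a backend error so it can be retried upstream.
// The helper name and the report-as-Backend-error policy are illustrative.
fn check_batch_result(result: &BatchWriteResult, expected: usize) -> Result<(), StorageError> {
    if result.written != expected || !result.verified {
        return Err(StorageError::Backend(format!(
            "batch '{}' incomplete: wrote {} of {} (verified: {})",
            result.batch_id, result.written, expected, result.verified
        )));
    }
    Ok(())
}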

#[async_trait]
pub trait CacheStore: Send + Sync {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>;
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError>;
    async fn delete(&self, id: &str) -> Result<(), StorageError>;

    /// Check if an item exists (Redis EXISTS command - fast, no data transfer).
    async fn exists(&self, id: &str) -> Result<bool, StorageError>;

    /// Write a batch of items atomically (pipelined for Redis).
    /// The default implementation delegates to `put_batch_with_ttl` with no TTL,
    /// which in turn falls back to sequential puts.
    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
        self.put_batch_with_ttl(items, None).await
    }

    /// Write a batch of items with optional TTL (in seconds).
    /// For Redis: uses SETEX when ttl is Some, SET when None.
    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
        // Default: ignore TTL, just do sequential puts
        let _ = ttl_secs;
        for item in items {
            self.put(item).await?;
        }
        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: items.len(),
            verified: true,
        })
    }

    /// Create a RediSearch index (FT.CREATE).
    async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
        let _ = args;
        Err(StorageError::Backend("FT.CREATE not supported".into()))
    }

    /// Drop a RediSearch index (FT.DROPINDEX).
    async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
        let _ = index;
        Err(StorageError::Backend("FT.DROPINDEX not supported".into()))
    }

    /// Search using RediSearch (FT.SEARCH).
    /// Returns matching keys.
    async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
        let _ = (index, query, limit);
        Err(StorageError::Backend("FT.SEARCH not supported".into()))
    }
}
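
// A minimal sketch of driving FT.SEARCH through the trait: look up matching
// keys, then fetch each hit. It assumes the keys returned by ft_search are
// usable directly as item ids; backends without RediSearch support will
// surface the default "FT.SEARCH not supported" error.
async fn search_cache<C: CacheStore>(
    cache: &C,
    index: &str,
    query: &str,
    limit: usize,
) -> Result<Vec<SyncItem>, StorageError> {
    let keys = cache.ft_search(index, query, limit).await?;
    let mut hits = Vec::with_capacity(keys.len());
    for key in keys {
        // Skip keys that were evicted between the search and the fetch.
        if let Some(item) = cache.get(&key).await? {
            hits.push(item);
        }
    }
    Ok(hits)
}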

#[async_trait]
pub trait ArchiveStore: Send + Sync {
    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>;
    async fn put(&self, item: &SyncItem) -> Result<(), StorageError>;
    async fn delete(&self, id: &str) -> Result<(), StorageError>;

    /// Check if an item exists (SQL EXISTS query - fast, no data transfer).
    async fn exists(&self, id: &str) -> Result<bool, StorageError>;

    /// Write a batch of items with verification.
    /// The batch_id is stamped on items and can be queried back for verification.
    /// The default implementation falls back to sequential puts and does not
    /// stamp a batch_id.
    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
        for item in items.iter() {
            self.put(item).await?;
        }
        Ok(BatchWriteResult {
            batch_id: String::new(),
            written: items.len(),
            verified: true,
        })
    }

    /// Scan keys for cuckoo filter warmup (paginated).
    /// Returns an empty vec once the offset exceeds the total count.
    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError>;

    /// Count the total number of items in the store.
    async fn count_all(&self) -> Result<u64, StorageError>;

    /// Search using SQL WHERE clause with JSON_EXTRACT.
    /// Returns matching items.
    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
        let _ = (where_clause, params, limit);
        Err(StorageError::Backend("SQL search not supported".into()))
    }
}
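
// A minimal sketch of the paging pattern behind scan_keys/count_all (the same
// shape a cuckoo-filter warmup would use): walk the archive page by page and
// push each page into the cache via put_batch_with_ttl. The page size, TTL,
// and the choice to skip ids missing from the archive are illustrative, not
// module policy.
async fn warm_cache<A: ArchiveStore, C: CacheStore>(
    archive: &A,
    cache: &C,
    page_size: usize,
    ttl_secs: Option<u64>,
) -> Result<u64, StorageError> {
    let total = archive.count_all().await?;
    let mut offset = 0u64;
    let mut warmed = 0u64;
    while offset < total {
        let keys = archive.scan_keys(offset, page_size).await?;
        if keys.is_empty() {
            break; // offset ran past the end; nothing left to scan
        }
        let mut page = Vec::with_capacity(keys.len());
        for key in &keys {
            if let Some(item) = archive.get(key).await? {
                page.push(item);
            }
        }
        let result = cache.put_batch_with_ttl(&page, ttl_secs).await?;
        warmed += result.written as u64;
        offset += keys.len() as u64;
    }
    Ok(warmed)
}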