// sync_engine/storage/traits.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

use async_trait::async_trait;
use crate::sync_item::SyncItem;
use crate::search::SqlParam;
use thiserror::Error;

9#[derive(Error, Debug)]
10pub enum StorageError {
11    #[error("Item not found")]
12    NotFound,
13    #[error("Storage backend error: {0}")]
14    Backend(String),
15    #[error("Connection error: {0}")]
16    Connection(String),
17    #[error("Data corruption detected for '{id}': expected hash {expected}, got {actual}")]
18    Corruption {
19        id: String,
20        expected: String,
21        actual: String,
22    },
23}
24
25/// Result of a batch write operation with verification
26#[derive(Debug)]
27pub struct BatchWriteResult {
28    /// Unique batch ID for this write (for SQL verification)
29    pub batch_id: String,
30    /// Number of items successfully written
31    pub written: usize,
32    /// Whether the write was verified (for SQL, reads back count; for Redis, always true)
33    pub verified: bool,
34}
35
36#[async_trait]
37pub trait CacheStore: Send + Sync {
38    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>;
39    async fn put(&self, item: &SyncItem) -> Result<(), StorageError>;
40    async fn delete(&self, id: &str) -> Result<(), StorageError>;
41    
42    /// Check if an item exists (Redis EXISTS command - fast, no data transfer).
43    async fn exists(&self, id: &str) -> Result<bool, StorageError>;
44    
45    /// Write a batch of items atomically (pipelined for Redis).
46    /// Default implementation falls back to sequential puts.
47    async fn put_batch(&self, items: &[SyncItem]) -> Result<BatchWriteResult, StorageError> {
48        self.put_batch_with_ttl(items, None).await
49    }
50    
51    /// Write a batch of items with optional TTL (in seconds).
52    /// For Redis: uses SETEX when ttl is Some, SET when None.
53    async fn put_batch_with_ttl(&self, items: &[SyncItem], ttl_secs: Option<u64>) -> Result<BatchWriteResult, StorageError> {
54        // Default: ignore TTL, just do sequential puts
55        let _ = ttl_secs;
56        for item in items {
57            self.put(item).await?;
58        }
59        Ok(BatchWriteResult {
60            batch_id: String::new(),
61            written: items.len(),
62            verified: true,
63        })
64    }
65
66    /// Create a RediSearch index (FT.CREATE).
67    async fn ft_create(&self, args: &[String]) -> Result<(), StorageError> {
68        let _ = args;
69        Err(StorageError::Backend("FT.CREATE not supported".into()))
70    }
71
72    /// Drop a RediSearch index (FT.DROPINDEX).
73    async fn ft_dropindex(&self, index: &str) -> Result<(), StorageError> {
74        let _ = index;
75        Err(StorageError::Backend("FT.DROPINDEX not supported".into()))
76    }
77
78    /// Search using RediSearch (FT.SEARCH).
79    /// Returns matching keys.
80    async fn ft_search(&self, index: &str, query: &str, limit: usize) -> Result<Vec<String>, StorageError> {
81        let _ = (index, query, limit);
82        Err(StorageError::Backend("FT.SEARCH not supported".into()))
83    }
84
85    /// Search using RediSearch with binary parameters (for vector KNN search).
86    /// The params are (name, blob) pairs passed as PARAMS to FT.SEARCH.
87    async fn ft_search_with_params(
88        &self,
89        index: &str,
90        query: &str,
91        params: &[(String, Vec<u8>)],
92        limit: usize,
93    ) -> Result<Vec<String>, StorageError> {
94        let _ = (index, query, params, limit);
95        Err(StorageError::Backend("FT.SEARCH with PARAMS not supported".into()))
96    }
97}
98
99#[async_trait]
100pub trait ArchiveStore: Send + Sync {
101    async fn get(&self, id: &str) -> Result<Option<SyncItem>, StorageError>;
102    async fn put(&self, item: &SyncItem) -> Result<(), StorageError>;
103    async fn delete(&self, id: &str) -> Result<(), StorageError>;
104    
105    /// Check if an item exists (SQL EXISTS query - fast, no data transfer).
106    async fn exists(&self, id: &str) -> Result<bool, StorageError>;
107    
108    /// Write a batch of items with verification.
109    /// The batch_id is stamped on items and can be queried back for verification.
110    /// Default implementation falls back to sequential puts.
111    async fn put_batch(&self, items: &mut [SyncItem]) -> Result<BatchWriteResult, StorageError> {
112        for item in items.iter() {
113            self.put(item).await?;
114        }
115        Ok(BatchWriteResult {
116            batch_id: String::new(),
117            written: items.len(),
118            verified: true,
119        })
120    }
121
122    /// Scan keys for cuckoo filter warmup (paginated).
123    /// Returns empty vec when offset exceeds total count.
124    async fn scan_keys(&self, offset: u64, limit: usize) -> Result<Vec<String>, StorageError>;
125    
126    /// Count total items in store.
127    async fn count_all(&self) -> Result<u64, StorageError>;
128
129    /// Search using SQL WHERE clause with JSON_EXTRACT.
130    /// Returns matching items.
131    async fn search(&self, where_clause: &str, params: &[SqlParam], limit: usize) -> Result<Vec<SyncItem>, StorageError> {
132        let _ = (where_clause, params, limit);
133        Err(StorageError::Backend("SQL search not supported".into()))
134    }
135}