replication_engine/
sync_engine.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
//! Sync engine integration traits.
//!
//! Defines the interface for integrating with a sync/storage backend.
//! Uses `sync_engine::SyncItem` directly for tight integration with the sync-engine crate.
//!
//! # Example
//!
//! ```rust,no_run
//! use replication_engine::sync_engine::{SyncEngineRef, SyncResult, SyncError, BoxFuture};
//! use sync_engine::SyncItem;
//! use std::pin::Pin;
//! use std::future::Future;
//!
//! struct MyBackend { /* ... */ }
//!
//! impl SyncEngineRef for MyBackend {
//!     // Optional: the trait's default implementation also returns `true`.
//!     fn should_accept_writes(&self) -> bool {
//!         true // Always accept in example
//!     }
//!
//!     fn is_current(&self, _key: &str, _hash: &str) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
//!         Box::pin(async move { Ok(true) })
//!     }
//!
//!     fn submit(&self, _item: SyncItem) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
//!         Box::pin(async move { Ok(()) })
//!     }
//!
//!     fn delete(&self, _key: String) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
//!         Box::pin(async move { Ok(true) })
//!     }
//!
//!     fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
//!         Box::pin(async move { Ok(None) })
//!     }
//!
//!     fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
//!         Box::pin(async move { Ok(vec![]) })
//!     }
//!
//!     fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
//!         Box::pin(async move { Ok(None) })
//!     }
//! }
//! ```
49
50use std::future::Future;
51use std::pin::Pin;
52
53// Re-export SyncItem and BackpressureLevel for convenience
54pub use sync_engine::{BackpressureLevel, SyncItem};
55
/// Result alias returned by every [`SyncEngineRef`] operation.
pub type SyncResult<R> = Result<R, SyncError>;

/// Boxed, `Send` future that resolves to a [`SyncResult`].
///
/// Naming this once keeps the [`SyncEngineRef`] method signatures short. The
/// lifetime parameter bounds whatever the future borrows (in this module,
/// the `&self` of the method that created it).
pub type BoxFuture<'fut, R> = Pin<Box<dyn Future<Output = SyncResult<R>> + Send + 'fut>>;

/// Error type for sync engine operations.
///
/// A thin wrapper around a human-readable message. Backend errors are turned
/// into this type from their `Display` text, and `Display` on a `SyncError`
/// prints that message unchanged.
#[derive(Debug, Clone)]
pub struct SyncError(pub String);

impl std::fmt::Display for SyncError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str(&self.0)
    }
}

impl std::error::Error for SyncError {}
73
74/// Trait defining what we need from sync-engine.
75///
76/// The daemon provides an implementation of this trait, allowing us to:
77/// 1. Write replicated data (`submit`)
78/// 2. Check for duplicates (`is_current`)
79/// 3. Query Merkle tree for cold path repair
80///
81/// This trait allows testing with mocks and decouples us from sync-engine internals.
82pub trait SyncEngineRef: Send + Sync + 'static {
83    /// Check if the sync-engine is accepting writes (backpressure check).
84    ///
85    /// Returns `false` when the engine is under critical pressure (>= 90% memory).
86    /// Callers should pause ingestion when this returns `false` to avoid
87    /// wasting CPU on events that will be rejected.
88    ///
89    /// Default implementation returns `true` (always accept).
90    fn should_accept_writes(&self) -> bool {
91        true
92    }
93
94    /// Submit an item to the local sync-engine.
95    ///
96    /// This is how we write replicated data from peers.
97    /// The `SyncItem` contains all necessary fields (key, content, hash, version).
98    fn submit(
99        &self,
100        item: SyncItem,
101    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>>;
102
103    /// Delete an item from the local sync-engine.
104    fn delete(
105        &self,
106        key: String,
107    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
108
109    /// Check if we already have content with this hash.
110    ///
111    /// Returns `true` if the item exists AND its content hash matches.
112    /// Used for CDC deduplication (loop prevention).
113    fn is_current(
114        &self,
115        key: &str,
116        content_hash: &str,
117    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
118
119    /// Get the Merkle root hash (for cold path comparison).
120    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>>;
121
122    /// Get children of a Merkle path (for cold path drill-down).
123    fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>>;
124
125    /// Fetch an item by key (for cold path repair).
126    fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>>;
127}
128
129/// Implementation of SyncEngineRef for the real SyncEngine.
130///
131/// This allows the replication engine to drive the real storage backend directly.
132impl SyncEngineRef for sync_engine::SyncEngine {
133    fn should_accept_writes(&self) -> bool {
134        self.pressure().should_accept_writes()
135    }
136
137    fn submit(
138        &self,
139        item: SyncItem,
140    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
141        Box::pin(async move {
142            self.submit(item).await.map_err(|e| SyncError(e.to_string()))
143        })
144    }
145
146    fn delete(
147        &self,
148        key: String,
149    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
150        Box::pin(async move {
151            self.delete(&key).await.map_err(|e| SyncError(e.to_string()))
152        })
153    }
154
155    fn is_current(
156        &self,
157        key: &str,
158        content_hash: &str,
159    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
160        let key = key.to_string();
161        let hash = content_hash.to_string();
162        Box::pin(async move {
163            // Check if we have the item and the hash matches
164            match self.get_verified(&key).await {
165                Ok(Some(item)) => Ok(item.content_hash == hash),
166                Ok(None) => Ok(false),
167                Err(_) => Ok(false), // If corrupt or error, assume not current to force re-sync
168            }
169        })
170    }
171
172    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
173        Box::pin(async move {
174            self.get_merkle_root().await.map_err(|e| SyncError(e.to_string()))
175        })
176    }
177
178    fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
179        let path = path.to_string();
180        Box::pin(async move {
181            let children_map = self.get_merkle_children(&path).await.map_err(|e| SyncError(e.to_string()))?;
182            // Convert BTreeMap to Vec
183            Ok(children_map.into_iter().collect())
184        })
185    }
186
187    fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
188        let key = key.to_string();
189        Box::pin(async move {
190            match self.get(&key).await {
191                Ok(Some(item)) => Ok(Some(item.content)),
192                Ok(None) => Ok(None),
193                Err(e) => Err(SyncError(e.to_string())),
194            }
195        })
196    }
197}
198
199/// A no-op implementation for testing/standalone mode.
200///
201/// Logs operations but doesn't actually store anything.
202#[derive(Clone)]
203pub struct NoOpSyncEngine;
204
205impl SyncEngineRef for NoOpSyncEngine {
206    fn submit(
207        &self,
208        item: SyncItem,
209    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
210        Box::pin(async move {
211            tracing::debug!(
212                key = %item.object_id,
213                hash = %item.content_hash,
214                len = item.content.len(),
215                version = item.version,
216                "NoOp: would submit item"
217            );
218            Ok(())
219        })
220    }
221
222    fn delete(
223        &self,
224        key: String,
225    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
226        Box::pin(async move {
227            tracing::debug!(key = %key, "NoOp: would delete item");
228            Ok(true)
229        })
230    }
231
232    fn is_current(
233        &self,
234        key: &str,
235        content_hash: &str,
236    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
237        let key = key.to_string();
238        let hash = content_hash.to_string();
239        Box::pin(async move {
240            tracing::trace!(key = %key, hash = %hash, "NoOp: is_current check (returning false)");
241            Ok(false) // Always apply in no-op mode
242        })
243    }
244
245    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
246        Box::pin(async { Ok(None) })
247    }
248
249    fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
250        Box::pin(async { Ok(vec![]) })
251    }
252
253    fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
254        Box::pin(async { Ok(None) })
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_noop_sync_engine_accepts_writes() {
        // NoOpSyncEngine does not override `should_accept_writes`, so this
        // exercises the trait's default implementation.
        assert!(NoOpSyncEngine.should_accept_writes());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_submit() {
        let engine = NoOpSyncEngine;

        // Submit should succeed
        let item = SyncItem::new("test.key".to_string(), b"data".to_vec());
        engine.submit(item).await.expect("NoOp submit should succeed");
    }

    #[tokio::test]
    async fn test_noop_sync_engine_submit_large_content() {
        let engine = NoOpSyncEngine;

        // Large content should work
        let large_content = vec![0u8; 1024 * 1024]; // 1MB
        let item = SyncItem::new("large.key".to_string(), large_content);
        engine.submit(item).await.expect("NoOp submit of 1MB content should succeed");
    }

    #[tokio::test]
    async fn test_noop_sync_engine_delete() {
        let engine = NoOpSyncEngine;

        // Delete should succeed and return true (NoOp always returns true)
        let deleted = engine
            .delete("some.key".to_string())
            .await
            .expect("NoOp delete should succeed");
        assert!(deleted);
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current() {
        let engine = NoOpSyncEngine;

        // is_current should return false (no dedup in noop mode)
        let current = engine
            .is_current("test.key", "hash123")
            .await
            .expect("NoOp is_current should succeed");
        assert!(!current);
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current_various_keys() {
        let engine = NoOpSyncEngine;

        // All keys should return false, including empty ones
        assert!(!engine.is_current("key1", "hash1").await.unwrap());
        assert!(!engine.is_current("key2", "hash2").await.unwrap());
        assert!(!engine.is_current("", "").await.unwrap());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_root() {
        let engine = NoOpSyncEngine;

        // Should return None
        let root = engine
            .get_merkle_root()
            .await
            .expect("NoOp get_merkle_root should succeed");
        assert!(root.is_none());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_children() {
        let engine = NoOpSyncEngine;

        // Should return empty vec
        let children = engine
            .get_merkle_children("some/path")
            .await
            .expect("NoOp get_merkle_children should succeed");
        assert!(children.is_empty());

        // Even for empty path
        let children = engine
            .get_merkle_children("")
            .await
            .expect("NoOp get_merkle_children should succeed for empty path");
        assert!(children.is_empty());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get() {
        let engine = NoOpSyncEngine;

        // Should return None
        let content = engine.get("some.key").await.expect("NoOp get should succeed");
        assert!(content.is_none());
    }

    #[test]
    fn test_sync_error_display() {
        let error = SyncError("test error message".to_string());
        assert_eq!(format!("{}", error), "test error message");
    }

    #[test]
    fn test_sync_error_debug() {
        let error = SyncError("debug message".to_string());
        let debug = format!("{:?}", error);
        assert!(debug.contains("debug message"));
    }

    #[test]
    fn test_sync_error_is_error() {
        let error = SyncError("error".to_string());
        // Verify it implements std::error::Error
        let _: &dyn std::error::Error = &error;
    }

    #[test]
    fn test_noop_sync_engine_clone() {
        let engine = NoOpSyncEngine;
        let _cloned = engine.clone();
        // Just verify Clone works
    }

    #[test]
    fn test_sync_error_clone() {
        let error = SyncError("original".to_string());
        let cloned = error.clone();
        assert_eq!(error.0, cloned.0);
    }
}