replication_engine/
sync_engine.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Sync engine integration traits.
5//!
6//! Defines the interface for integrating with a sync/storage backend.
7//! Uses `sync_engine::SyncItem` directly for tight integration with the sync-engine crate.
8//!
9//! # Example
10//!
11//! ```rust,no_run
12//! use replication_engine::sync_engine::{SyncEngineRef, SyncResult, SyncError, BoxFuture};
13//! use sync_engine::SyncItem;
14//! use std::pin::Pin;
15//! use std::future::Future;
16//!
17//! struct MyBackend { /* ... */ }
18//!
19//! impl SyncEngineRef for MyBackend {
20//!     fn should_accept_writes(&self) -> bool {
21//!         true // Always accept in example
22//!     }
23//!
24//!     fn is_current(&self, _key: &str, _hash: &str) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
25//!         Box::pin(async move { Ok(true) })
26//!     }
27//!
28//!     fn submit(&self, item: SyncItem) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
29//!         Box::pin(async move { Ok(()) })
30//!     }
31//!
32//!     fn delete(&self, _key: String) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
33//!         Box::pin(async move { Ok(true) })
34//!     }
35//!
36//!     fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
37//!         Box::pin(async move { Ok(None) })
38//!     }
39//!
40//!     fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
41//!         Box::pin(async move { Ok(vec![]) })
42//!     }
43//!
44//!     fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
45//!         Box::pin(async move { Ok(None) })
46//!     }
47//! }
48//! ```
49
50use std::future::Future;
51use std::pin::Pin;
52
53// Re-export SyncItem and BackpressureLevel for convenience
54pub use sync_engine::{BackpressureLevel, SyncItem};
55
/// Result type for sync engine operations.
pub type SyncResult<T> = std::result::Result<T, SyncError>;

/// Type alias for boxed async futures (reduces trait signature complexity).
///
/// The future is `Send`, may borrow for `'a`, and resolves to a [`SyncResult<T>`].
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = SyncResult<T>> + Send + 'a>>;

/// Simplified error for sync engine operations.
///
/// Carries only a human-readable message. Backend errors are converted into
/// this type by taking their `Display` text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncError(pub String);

impl std::fmt::Display for SyncError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The message is already a string; write it verbatim.
        f.write_str(&self.0)
    }
}

impl std::error::Error for SyncError {}
73
74/// Trait defining what we need from sync-engine.
75///
76/// The daemon provides an implementation of this trait, allowing us to:
77/// 1. Write replicated data (`submit`)
78/// 2. Check for duplicates (`is_current`)
79/// 3. Query Merkle tree for cold path repair
80///
81/// This trait allows testing with mocks and decouples us from sync-engine internals.
82pub trait SyncEngineRef: Send + Sync + 'static {
83    /// Check if the sync-engine is accepting writes (backpressure check).
84    ///
85    /// Returns `false` when the engine is under critical pressure (>= 90% memory).
86    /// Callers should pause ingestion when this returns `false` to avoid
87    /// wasting CPU on events that will be rejected.
88    ///
89    /// Default implementation returns `true` (always accept).
90    fn should_accept_writes(&self) -> bool {
91        true
92    }
93
94    /// Submit an item to the local sync-engine.
95    ///
96    /// This is how we write replicated data from peers.
97    /// The `SyncItem` contains all necessary fields (key, content, hash, version).
98    fn submit(
99        &self,
100        item: SyncItem,
101    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>>;
102
103    /// Delete an item from the local sync-engine.
104    fn delete(
105        &self,
106        key: String,
107    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
108
109    /// Check if we already have content with this hash.
110    ///
111    /// Returns `true` if the item exists AND its content hash matches.
112    /// Used for CDC deduplication (loop prevention).
113    fn is_current(
114        &self,
115        key: &str,
116        content_hash: &str,
117    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
118
119    /// Get the Merkle root hash (for cold path comparison).
120    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>>;
121
122    /// Get children of a Merkle path (for cold path drill-down).
123    fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>>;
124
125    /// Fetch an item by key (for cold path repair).
126    fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>>;
127    
128    /// Get clean branches (no pending merkle recalcs) with their hashes.
129    ///
130    /// Returns branches that are safe to compare with peers.
131    /// Branches with pending writes should be skipped during cold path sync.
132    fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
133        // Default: return empty (no branch hygiene info available)
134        Box::pin(async { Ok(Vec::new()) })
135    }
136    
137    /// Check if a specific branch is clean (no pending merkle recalcs).
138    ///
139    /// Returns `true` if the branch has no dirty items and is safe to compare.
140    fn is_branch_clean(&self, _prefix: &str) -> BoxFuture<'_, bool> {
141        // Default: assume clean (conservative for mocks)
142        Box::pin(async { Ok(true) })
143    }
144}
145
146/// Implementation of SyncEngineRef for the real SyncEngine.
147///
148/// This allows the replication engine to drive the real storage backend directly.
149impl SyncEngineRef for sync_engine::SyncEngine {
150    fn should_accept_writes(&self) -> bool {
151        self.pressure().should_accept_writes()
152    }
153
154    fn submit(
155        &self,
156        item: SyncItem,
157    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
158        Box::pin(async move {
159            self.submit(item).await.map_err(|e| SyncError(e.to_string()))
160        })
161    }
162
163    fn delete(
164        &self,
165        key: String,
166    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
167        Box::pin(async move {
168            self.delete(&key).await.map_err(|e| SyncError(e.to_string()))
169        })
170    }
171
172    fn is_current(
173        &self,
174        key: &str,
175        content_hash: &str,
176    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
177        let key = key.to_string();
178        let hash = content_hash.to_string();
179        Box::pin(async move {
180            // Check if we have the item and the hash matches
181            match self.get_verified(&key).await {
182                Ok(Some(item)) => Ok(item.content_hash == hash),
183                Ok(None) => Ok(false),
184                Err(_) => Ok(false), // If corrupt or error, assume not current to force re-sync
185            }
186        })
187    }
188
189    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
190        Box::pin(async move {
191            Ok(self.get_merkle_root().await.ok().flatten())
192        })
193    }
194
195    fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
196        let path = path.to_string();
197        Box::pin(async move {
198            Ok(match self.get_merkle_children(&path).await {
199                Ok(children_map) => children_map.into_iter().collect(),
200                Err(_) => Vec::new(),
201            })
202        })
203    }
204
205    fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
206        let key = key.to_string();
207        Box::pin(async move {
208            Ok(match self.get(&key).await {
209                Ok(Some(item)) => Some(item.content),
210                _ => None,
211            })
212        })
213    }
214    
215    fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
216        Box::pin(async move {
217            Ok(match self.get_clean_branches().await {
218                Ok(branches_map) => branches_map.into_iter().collect(),
219                Err(_) => Vec::new(),
220            })
221        })
222    }
223    
224    fn is_branch_clean(&self, prefix: &str) -> BoxFuture<'_, bool> {
225        let prefix = prefix.to_string();
226        Box::pin(async move {
227            Ok(match self.branch_dirty_count(&prefix).await {
228                Ok(count) => count == 0,
229                Err(_) => false, // Assume dirty on error (conservative)
230            })
231        })
232    }
233}
234
/// A no-op implementation for testing/standalone mode.
///
/// Logs operations but doesn't actually store anything. It is a stateless unit
/// struct, so `NoOpSyncEngine`, `NoOpSyncEngine::default()` and clones are all
/// interchangeable.
#[derive(Debug, Default, Clone)]
pub struct NoOpSyncEngine;
240
241impl SyncEngineRef for NoOpSyncEngine {
242    fn submit(
243        &self,
244        item: SyncItem,
245    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
246        Box::pin(async move {
247            tracing::debug!(
248                key = %item.object_id,
249                hash = %item.content_hash,
250                len = item.content.len(),
251                version = item.version,
252                "NoOp: would submit item"
253            );
254            Ok(())
255        })
256    }
257
258    fn delete(
259        &self,
260        key: String,
261    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
262        Box::pin(async move {
263            tracing::debug!(key = %key, "NoOp: would delete item");
264            Ok(true)
265        })
266    }
267
268    fn is_current(
269        &self,
270        key: &str,
271        content_hash: &str,
272    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
273        let key = key.to_string();
274        let hash = content_hash.to_string();
275        Box::pin(async move {
276            tracing::trace!(key = %key, hash = %hash, "NoOp: is_current check (returning false)");
277            Ok(false) // Always apply in no-op mode
278        })
279    }
280
281    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
282        Box::pin(async { Ok(None) })
283    }
284
285    fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
286        Box::pin(async { Ok(vec![]) })
287    }
288
289    fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
290        Box::pin(async { Ok(None) })
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_noop_sync_engine_submit() {
        let engine = NoOpSyncEngine;

        // Submit should succeed
        let item = SyncItem::new("test.key".to_string(), b"data".to_vec());
        let result = engine.submit(item).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_submit_large_content() {
        let engine = NoOpSyncEngine;

        // Large content should work
        let large_content = vec![0u8; 1024 * 1024]; // 1MB
        let item = SyncItem::new("large.key".to_string(), large_content);
        let result = engine.submit(item).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_delete() {
        let engine = NoOpSyncEngine;

        // Delete should succeed and report `true` (NoOp always does)
        let result = engine.delete("some.key".to_string()).await;
        assert!(matches!(result, Ok(true)));
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current() {
        let engine = NoOpSyncEngine;

        // is_current should return false (no dedup in noop mode)
        let result = engine.is_current("test.key", "hash123").await;
        assert!(matches!(result, Ok(false)));
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current_various_keys() {
        let engine = NoOpSyncEngine;

        // All keys should return false
        assert!(!engine.is_current("key1", "hash1").await.unwrap());
        assert!(!engine.is_current("key2", "hash2").await.unwrap());
        assert!(!engine.is_current("", "").await.unwrap());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_root() {
        let engine = NoOpSyncEngine;

        // Should return None
        let result = engine.get_merkle_root().await;
        assert!(matches!(result, Ok(None)));
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_children() {
        let engine = NoOpSyncEngine;

        // Should return empty vec
        let result = engine.get_merkle_children("some/path").await;
        assert!(result.unwrap().is_empty());

        // Even for empty path
        let result = engine.get_merkle_children("").await;
        assert!(result.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get() {
        let engine = NoOpSyncEngine;

        // Should return None
        let result = engine.get("some.key").await;
        assert!(matches!(result, Ok(None)));
    }

    #[test]
    fn test_noop_sync_engine_accepts_writes() {
        // NoOp relies on the trait's default backpressure check.
        assert!(NoOpSyncEngine.should_accept_writes());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_default_branch_hygiene() {
        let engine = NoOpSyncEngine;

        // Default get_clean_branches: no hygiene info, so nothing is listed.
        assert!(engine.get_clean_branches().await.unwrap().is_empty());

        // Default is_branch_clean: every branch is reported as clean.
        assert!(engine.is_branch_clean("some/prefix").await.unwrap());
        assert!(engine.is_branch_clean("").await.unwrap());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_as_trait_object() {
        // The trait must stay usable behind `dyn` (e.g. shared via `Arc`).
        let engine: std::sync::Arc<dyn SyncEngineRef> = std::sync::Arc::new(NoOpSyncEngine);
        assert!(engine.should_accept_writes());
        assert!(!engine.is_current("k", "h").await.unwrap());
        assert!(engine.get("k").await.unwrap().is_none());
    }

    #[test]
    fn test_sync_error_display() {
        let error = SyncError("test error message".to_string());
        assert_eq!(format!("{}", error), "test error message");
    }

    #[test]
    fn test_sync_error_debug() {
        let error = SyncError("debug message".to_string());
        let debug = format!("{:?}", error);
        assert!(debug.contains("debug message"));
    }

    #[test]
    fn test_sync_error_is_error() {
        let error = SyncError("error".to_string());
        // Verify it implements std::error::Error
        let _: &dyn std::error::Error = &error;
    }

    #[test]
    fn test_noop_sync_engine_clone() {
        let engine = NoOpSyncEngine;
        let _cloned = engine.clone();
        // Just verify Clone works
    }

    #[test]
    fn test_sync_error_clone() {
        let error = SyncError("original".to_string());
        let cloned = error.clone();
        assert_eq!(error.0, cloned.0);
    }
}