replication_engine/
sync_engine.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Sync engine integration traits.
5//!
6//! Defines the interface for integrating with a sync/storage backend.
7//! Uses `sync_engine::SyncItem` directly for tight integration with the sync-engine crate.
8//!
9//! # Example
10//!
11//! ```rust,no_run
12//! use replication_engine::sync_engine::{SyncEngineRef, SyncResult, SyncError, BoxFuture};
13//! use sync_engine::SyncItem;
14//! use std::pin::Pin;
15//! use std::future::Future;
16//!
17//! struct MyBackend { /* ... */ }
18//!
19//! impl SyncEngineRef for MyBackend {
20//!     fn should_accept_writes(&self) -> bool {
21//!         true // Always accept in example
22//!     }
23//!
24//!     fn is_current(&self, _key: &str, _hash: &str) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
25//!         Box::pin(async move { Ok(true) })
26//!     }
27//!
28//!     fn submit(&self, item: SyncItem) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
29//!         Box::pin(async move { Ok(()) })
30//!     }
31//!
32//!     fn delete(&self, _key: String) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
33//!         Box::pin(async move { Ok(true) })
34//!     }
35//!
36//!     fn delete_replicated(&self, _key: String) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
37//!         Box::pin(async move { Ok(true) })
38//!     }
39//!
40//!     fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
41//!         Box::pin(async move { Ok(None) })
42//!     }
43//!
44//!     fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
45//!         Box::pin(async move { Ok(vec![]) })
46//!     }
47//!
48//!     fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
49//!         Box::pin(async move { Ok(None) })
50//!     }
51//! }
52//! ```
53
54use std::future::Future;
55use std::pin::Pin;
56
57// Re-export SyncItem and BackpressureLevel for convenience
58pub use sync_engine::{BackpressureLevel, SyncItem};
59
60/// Result type for sync engine operations.
61pub type SyncResult<T> = std::result::Result<T, SyncError>;
62
63/// Type alias for boxed async futures (reduces trait signature complexity).
64pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = SyncResult<T>> + Send + 'a>>;
65
/// Simplified, string-backed error for sync engine operations.
///
/// The wrapped `String` is the whole error: there is no error code or source
/// chain. The field is public, so callers can construct an error from any
/// message and read the message back via `.0`.
#[derive(Clone, Debug)]
pub struct SyncError(pub String);

impl std::fmt::Display for SyncError {
    /// Writes the wrapped message verbatim, with no prefix or decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

// No `source()` override: the message text is the only information carried.
impl std::error::Error for SyncError {}
77
78/// Trait defining what we need from sync-engine.
79///
80/// The daemon provides an implementation of this trait, allowing us to:
81/// 1. Write replicated data (`submit`)
82/// 2. Check for duplicates (`is_current`)
83/// 3. Query Merkle tree for cold path repair
84///
85/// This trait allows testing with mocks and decouples us from sync-engine internals.
86pub trait SyncEngineRef: Send + Sync + 'static {
87    /// Check if the sync-engine is accepting writes (backpressure check).
88    ///
89    /// Returns `false` when the engine is under critical pressure (>= 90% memory).
90    /// Callers should pause ingestion when this returns `false` to avoid
91    /// wasting CPU on events that will be rejected.
92    ///
93    /// Default implementation returns `true` (always accept).
94    fn should_accept_writes(&self) -> bool {
95        true
96    }
97
98    /// Submit an item to the local sync-engine.
99    ///
100    /// This is how we write replicated data from peers.
101    /// The `SyncItem` contains all necessary fields (key, content, hash, version).
102    fn submit(
103        &self,
104        item: SyncItem,
105    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>>;
106
107    /// Delete an item from the local sync-engine.
108    /// 
109    /// Note: This emits CDC events. For replicated deletes, use `delete_replicated()`.
110    fn delete(
111        &self,
112        key: String,
113    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
114
115    /// Delete a replicated item (does NOT emit CDC events).
116    /// 
117    /// Use this for items received via replication to prevent CDC loops.
118    fn delete_replicated(
119        &self,
120        key: String,
121    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
122
123    /// Check if we already have content with this hash.
124    ///
125    /// Returns `true` if the item exists AND its content hash matches.
126    /// Used for CDC deduplication (loop prevention).
127    fn is_current(
128        &self,
129        key: &str,
130        content_hash: &str,
131    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
132
133    /// Get the Merkle root hash (for cold path comparison).
134    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>>;
135
136    /// Get children of a Merkle path (for cold path drill-down).
137    fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>>;
138
139    /// Fetch an item by key (for cold path repair).
140    fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>>;
141    
142    /// Get clean branches (no pending merkle recalcs) with their hashes.
143    ///
144    /// Returns branches that are safe to compare with peers.
145    /// Branches with pending writes should be skipped during cold path sync.
146    fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
147        // Default: return empty (no branch hygiene info available)
148        Box::pin(async { Ok(Vec::new()) })
149    }
150    
151    /// Check if a specific branch is clean (no pending merkle recalcs).
152    ///
153    /// Returns `true` if the branch has no dirty items and is safe to compare.
154    fn is_branch_clean(&self, _prefix: &str) -> BoxFuture<'_, bool> {
155        // Default: assume clean (conservative for mocks)
156        Box::pin(async { Ok(true) })
157    }
158}
159
160/// Implementation of SyncEngineRef for the real SyncEngine.
161///
162/// This allows the replication engine to drive the real storage backend directly.
163impl SyncEngineRef for sync_engine::SyncEngine {
164    fn should_accept_writes(&self) -> bool {
165        self.pressure().should_accept_writes()
166    }
167
168    fn submit(
169        &self,
170        item: SyncItem,
171    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
172        Box::pin(async move {
173            self.submit(item).await.map_err(|e| SyncError(e.to_string()))
174        })
175    }
176
177    fn delete(
178        &self,
179        key: String,
180    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
181        Box::pin(async move {
182            self.delete(&key).await.map_err(|e| SyncError(e.to_string()))
183        })
184    }
185
186    fn delete_replicated(
187        &self,
188        key: String,
189    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
190        Box::pin(async move {
191            self.delete_replicated(&key).await.map_err(|e| SyncError(e.to_string()))
192        })
193    }
194
195    fn is_current(
196        &self,
197        key: &str,
198        content_hash: &str,
199    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
200        let key = key.to_string();
201        let hash = content_hash.to_string();
202        Box::pin(async move {
203            // Check if we have the item and the hash matches
204            match self.get_verified(&key).await {
205                Ok(Some(item)) => Ok(item.content_hash == hash),
206                Ok(None) => Ok(false),
207                Err(_) => Ok(false), // If corrupt or error, assume not current to force re-sync
208            }
209        })
210    }
211
212    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
213        Box::pin(async move {
214            Ok(self.get_merkle_root().await.ok().flatten())
215        })
216    }
217
218    fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
219        let path = path.to_string();
220        Box::pin(async move {
221            Ok(match self.get_merkle_children(&path).await {
222                Ok(children_map) => children_map.into_iter().collect(),
223                Err(_) => Vec::new(),
224            })
225        })
226    }
227
228    fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
229        let key = key.to_string();
230        Box::pin(async move {
231            Ok(match self.get(&key).await {
232                Ok(Some(item)) => Some(item.content),
233                _ => None,
234            })
235        })
236    }
237    
238    fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
239        Box::pin(async move {
240            Ok(match self.get_clean_branches().await {
241                Ok(branches_map) => branches_map.into_iter().collect(),
242                Err(_) => Vec::new(),
243            })
244        })
245    }
246    
247    fn is_branch_clean(&self, prefix: &str) -> BoxFuture<'_, bool> {
248        let prefix = prefix.to_string();
249        Box::pin(async move {
250            Ok(match self.branch_dirty_count(&prefix).await {
251                Ok(count) => count == 0,
252                Err(_) => false, // Assume dirty on error (conservative)
253            })
254        })
255    }
256}
257
/// A no-op implementation for testing/standalone mode.
///
/// Logs operations but doesn't actually store anything.
///
/// This is a stateless unit struct: create one with the literal
/// `NoOpSyncEngine` or with `NoOpSyncEngine::default()`. `Debug` is derived so
/// the engine can be embedded in `#[derive(Debug)]` types and printed in logs.
#[derive(Debug, Clone, Default)]
pub struct NoOpSyncEngine;
263
264impl SyncEngineRef for NoOpSyncEngine {
265    fn submit(
266        &self,
267        item: SyncItem,
268    ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
269        Box::pin(async move {
270            tracing::debug!(
271                key = %item.object_id,
272                hash = %item.content_hash,
273                len = item.content.len(),
274                version = item.version,
275                "NoOp: would submit item"
276            );
277            Ok(())
278        })
279    }
280
281    fn delete(
282        &self,
283        key: String,
284    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
285        Box::pin(async move {
286            tracing::debug!(key = %key, "NoOp: would delete item");
287            Ok(true)
288        })
289    }
290
291    fn delete_replicated(
292        &self,
293        key: String,
294    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
295        Box::pin(async move {
296            tracing::debug!(key = %key, "NoOp: would delete replicated item");
297            Ok(true)
298        })
299    }
300
301    fn is_current(
302        &self,
303        key: &str,
304        content_hash: &str,
305    ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
306        let key = key.to_string();
307        let hash = content_hash.to_string();
308        Box::pin(async move {
309            tracing::trace!(key = %key, hash = %hash, "NoOp: is_current check (returning false)");
310            Ok(false) // Always apply in no-op mode
311        })
312    }
313
314    fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
315        Box::pin(async { Ok(None) })
316    }
317
318    fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
319        Box::pin(async { Ok(vec![]) })
320    }
321
322    fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
323        Box::pin(async { Ok(None) })
324    }
325}
326
/// Unit tests for `NoOpSyncEngine`, the default methods of `SyncEngineRef`,
/// and `SyncError`.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_noop_sync_engine_submit() {
        let engine = NoOpSyncEngine;

        // Submit should succeed
        let item = SyncItem::new("test.key".to_string(), b"data".to_vec());
        assert!(engine.submit(item).await.is_ok());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_submit_large_content() {
        let engine = NoOpSyncEngine;

        // Large content should work
        let large_content = vec![0u8; 1024 * 1024]; // 1MB
        let item = SyncItem::new("large.key".to_string(), large_content);
        assert!(engine.submit(item).await.is_ok());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_delete() {
        let engine = NoOpSyncEngine;

        // Delete should succeed and return true
        let result = engine.delete("some.key".to_string()).await;
        assert!(result.is_ok());
        assert!(result.unwrap()); // NoOp always returns true
    }

    #[tokio::test]
    async fn test_noop_sync_engine_delete_replicated() {
        let engine = NoOpSyncEngine;

        // The replicated delete path behaves like the regular one in no-op mode
        let result = engine.delete_replicated("some.key".to_string()).await;
        assert!(result.is_ok());
        assert!(result.unwrap());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current() {
        let engine = NoOpSyncEngine;

        // is_current should return false (no dedup in noop mode)
        let result = engine.is_current("test.key", "hash123").await;
        assert!(result.is_ok());
        assert!(!result.unwrap()); // Always false in noop mode
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current_various_keys() {
        let engine = NoOpSyncEngine;

        // All keys should return false, including the empty key/hash
        for (key, hash) in [("key1", "hash1"), ("key2", "hash2"), ("", "")] {
            assert!(!engine.is_current(key, hash).await.unwrap());
        }
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_root() {
        let engine = NoOpSyncEngine;

        // Should return None
        let result = engine.get_merkle_root().await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_children() {
        let engine = NoOpSyncEngine;

        // Should return an empty vec, even for the empty path
        for path in ["some/path", ""] {
            let result = engine.get_merkle_children(path).await;
            assert!(result.is_ok());
            assert!(result.unwrap().is_empty());
        }
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get() {
        let engine = NoOpSyncEngine;

        // Should return None
        let result = engine.get("some.key").await;
        assert!(result.is_ok());
        assert!(result.unwrap().is_none());
    }

    #[test]
    fn test_noop_sync_engine_default_should_accept_writes() {
        // NoOpSyncEngine does not override the default, which always accepts
        let engine = NoOpSyncEngine;
        assert!(engine.should_accept_writes());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_default_branch_hygiene() {
        let engine = NoOpSyncEngine;

        // Default get_clean_branches reports no branches
        let branches = engine.get_clean_branches().await;
        assert!(branches.is_ok());
        assert!(branches.unwrap().is_empty());

        // Default is_branch_clean reports every prefix as clean
        assert!(engine.is_branch_clean("any/prefix").await.unwrap());
        assert!(engine.is_branch_clean("").await.unwrap());
    }

    #[test]
    fn test_sync_error_display() {
        let error = SyncError("test error message".to_string());
        assert_eq!(format!("{}", error), "test error message");
    }

    #[test]
    fn test_sync_error_debug() {
        let error = SyncError("debug message".to_string());
        let debug = format!("{:?}", error);
        assert!(debug.contains("debug message"));
    }

    #[test]
    fn test_sync_error_is_error() {
        let error = SyncError("error".to_string());
        // Verify it implements std::error::Error
        let _: &dyn std::error::Error = &error;
    }

    #[test]
    fn test_noop_sync_engine_clone() {
        let engine = NoOpSyncEngine;
        let _cloned = engine.clone();
        // Just verify Clone works
    }

    #[test]
    fn test_sync_error_clone() {
        let error = SyncError("original".to_string());
        let cloned = error.clone();
        assert_eq!(error.0, cloned.0);
    }
}