replication_engine/
sync_engine.rs1use std::future::Future;
51use std::pin::Pin;
52
53pub use sync_engine::{BackpressureLevel, SyncItem};
55
56pub type SyncResult<T> = std::result::Result<T, SyncError>;
58
59pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = SyncResult<T>> + Send + 'a>>;
61
62#[derive(Debug, Clone)]
64pub struct SyncError(pub String);
65
66impl std::fmt::Display for SyncError {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 write!(f, "{}", self.0)
69 }
70}
71
72impl std::error::Error for SyncError {}
73
74pub trait SyncEngineRef: Send + Sync + 'static {
83 fn should_accept_writes(&self) -> bool {
91 true
92 }
93
94 fn submit(
99 &self,
100 item: SyncItem,
101 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>>;
102
103 fn delete(
105 &self,
106 key: String,
107 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
108
109 fn is_current(
114 &self,
115 key: &str,
116 content_hash: &str,
117 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
118
119 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>>;
121
122 fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>>;
124
125 fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>>;
127}
128
129impl SyncEngineRef for sync_engine::SyncEngine {
133 fn should_accept_writes(&self) -> bool {
134 self.pressure().should_accept_writes()
135 }
136
137 fn submit(
138 &self,
139 item: SyncItem,
140 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
141 Box::pin(async move {
142 self.submit(item).await.map_err(|e| SyncError(e.to_string()))
143 })
144 }
145
146 fn delete(
147 &self,
148 key: String,
149 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
150 Box::pin(async move {
151 self.delete(&key).await.map_err(|e| SyncError(e.to_string()))
152 })
153 }
154
155 fn is_current(
156 &self,
157 key: &str,
158 content_hash: &str,
159 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
160 let key = key.to_string();
161 let hash = content_hash.to_string();
162 Box::pin(async move {
163 match self.get_verified(&key).await {
165 Ok(Some(item)) => Ok(item.content_hash == hash),
166 Ok(None) => Ok(false),
167 Err(_) => Ok(false), }
169 })
170 }
171
172 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
173 Box::pin(async move {
174 self.get_merkle_root().await.map_err(|e| SyncError(e.to_string()))
175 })
176 }
177
178 fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
179 let path = path.to_string();
180 Box::pin(async move {
181 let children_map = self.get_merkle_children(&path).await.map_err(|e| SyncError(e.to_string()))?;
182 Ok(children_map.into_iter().collect())
184 })
185 }
186
187 fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
188 let key = key.to_string();
189 Box::pin(async move {
190 match self.get(&key).await {
191 Ok(Some(item)) => Ok(Some(item.content)),
192 Ok(None) => Ok(None),
193 Err(e) => Err(SyncError(e.to_string())),
194 }
195 })
196 }
197}
198
199#[derive(Clone)]
203pub struct NoOpSyncEngine;
204
205impl SyncEngineRef for NoOpSyncEngine {
206 fn submit(
207 &self,
208 item: SyncItem,
209 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
210 Box::pin(async move {
211 tracing::debug!(
212 key = %item.object_id,
213 hash = %item.content_hash,
214 len = item.content.len(),
215 version = item.version,
216 "NoOp: would submit item"
217 );
218 Ok(())
219 })
220 }
221
222 fn delete(
223 &self,
224 key: String,
225 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
226 Box::pin(async move {
227 tracing::debug!(key = %key, "NoOp: would delete item");
228 Ok(true)
229 })
230 }
231
232 fn is_current(
233 &self,
234 key: &str,
235 content_hash: &str,
236 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
237 let key = key.to_string();
238 let hash = content_hash.to_string();
239 Box::pin(async move {
240 tracing::trace!(key = %key, hash = %hash, "NoOp: is_current check (returning false)");
241 Ok(false) })
243 }
244
245 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
246 Box::pin(async { Ok(None) })
247 }
248
249 fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
250 Box::pin(async { Ok(vec![]) })
251 }
252
253 fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
254 Box::pin(async { Ok(None) })
255 }
256}
257
258#[cfg(test)]
259mod tests {
260 use super::*;
261
262 #[tokio::test]
263 async fn test_noop_sync_engine_submit() {
264 let engine = NoOpSyncEngine;
265
266 let item = SyncItem::new("test.key".to_string(), b"data".to_vec());
268 let result = engine.submit(item).await;
269 assert!(result.is_ok());
270 }
271
272 #[tokio::test]
273 async fn test_noop_sync_engine_submit_large_content() {
274 let engine = NoOpSyncEngine;
275
276 let large_content = vec![0u8; 1024 * 1024]; let item = SyncItem::new("large.key".to_string(), large_content);
279 let result = engine.submit(item).await;
280 assert!(result.is_ok());
281 }
282
283 #[tokio::test]
284 async fn test_noop_sync_engine_delete() {
285 let engine = NoOpSyncEngine;
286
287 let result = engine.delete("some.key".to_string()).await;
289 assert!(result.is_ok());
290 assert!(result.unwrap()); }
292
293 #[tokio::test]
294 async fn test_noop_sync_engine_is_current() {
295 let engine = NoOpSyncEngine;
296
297 let result = engine.is_current("test.key", "hash123").await;
299 assert!(result.is_ok());
300 assert!(!result.unwrap()); }
302
303 #[tokio::test]
304 async fn test_noop_sync_engine_is_current_various_keys() {
305 let engine = NoOpSyncEngine;
306
307 assert!(!engine.is_current("key1", "hash1").await.unwrap());
309 assert!(!engine.is_current("key2", "hash2").await.unwrap());
310 assert!(!engine.is_current("", "").await.unwrap());
311 }
312
313 #[tokio::test]
314 async fn test_noop_sync_engine_get_merkle_root() {
315 let engine = NoOpSyncEngine;
316
317 let result = engine.get_merkle_root().await;
319 assert!(result.is_ok());
320 assert!(result.unwrap().is_none());
321 }
322
323 #[tokio::test]
324 async fn test_noop_sync_engine_get_merkle_children() {
325 let engine = NoOpSyncEngine;
326
327 let result = engine.get_merkle_children("some/path").await;
329 assert!(result.is_ok());
330 assert!(result.unwrap().is_empty());
331
332 let result = engine.get_merkle_children("").await;
334 assert!(result.is_ok());
335 assert!(result.unwrap().is_empty());
336 }
337
338 #[tokio::test]
339 async fn test_noop_sync_engine_get() {
340 let engine = NoOpSyncEngine;
341
342 let result = engine.get("some.key").await;
344 assert!(result.is_ok());
345 assert!(result.unwrap().is_none());
346 }
347
348 #[test]
349 fn test_sync_error_display() {
350 let error = SyncError("test error message".to_string());
351 assert_eq!(format!("{}", error), "test error message");
352 }
353
354 #[test]
355 fn test_sync_error_debug() {
356 let error = SyncError("debug message".to_string());
357 let debug = format!("{:?}", error);
358 assert!(debug.contains("debug message"));
359 }
360
361 #[test]
362 fn test_sync_error_is_error() {
363 let error = SyncError("error".to_string());
364 let _: &dyn std::error::Error = &error;
366 }
367
368 #[test]
369 fn test_noop_sync_engine_clone() {
370 let engine = NoOpSyncEngine;
371 let _cloned = engine.clone();
372 }
374
375 #[test]
376 fn test_sync_error_clone() {
377 let error = SyncError("original".to_string());
378 let cloned = error.clone();
379 assert_eq!(error.0, cloned.0);
380 }
381}