1use std::future::Future;
55use std::pin::Pin;
56
57pub use sync_engine::{BackpressureLevel, SyncItem};
59
60pub type SyncResult<T> = std::result::Result<T, SyncError>;
62
63pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = SyncResult<T>> + Send + 'a>>;
65
/// Error type for sync operations: a plain, human-readable message.
///
/// Engine-specific errors are flattened into their string form before being
/// wrapped, so this type carries no structured cause.
///
/// `PartialEq`/`Eq` are derived so errors can be compared directly (for
/// example in tests) instead of reaching into the inner `String`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncError(pub String);

impl std::fmt::Display for SyncError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Writing the string directly is equivalent to `write!(f, "{}", ..)`
        // here and skips the formatting machinery.
        f.write_str(&self.0)
    }
}

/// Lets `SyncError` be used wherever a `std::error::Error` is expected.
impl std::error::Error for SyncError {}
77
78pub trait SyncEngineRef: Send + Sync + 'static {
87 fn should_accept_writes(&self) -> bool {
95 true
96 }
97
98 fn submit(
103 &self,
104 item: SyncItem,
105 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>>;
106
107 fn delete(
111 &self,
112 key: String,
113 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
114
115 fn delete_replicated(
119 &self,
120 key: String,
121 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
122
123 fn is_current(
128 &self,
129 key: &str,
130 content_hash: &str,
131 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
132
133 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>>;
135
136 fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>>;
138
139 fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>>;
141
142 fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
147 Box::pin(async { Ok(Vec::new()) })
149 }
150
151 fn is_branch_clean(&self, _prefix: &str) -> BoxFuture<'_, bool> {
155 Box::pin(async { Ok(true) })
157 }
158}
159
160impl SyncEngineRef for sync_engine::SyncEngine {
164 fn should_accept_writes(&self) -> bool {
165 self.pressure().should_accept_writes()
166 }
167
168 fn submit(
169 &self,
170 item: SyncItem,
171 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
172 Box::pin(async move {
173 self.submit(item).await.map_err(|e| SyncError(e.to_string()))
174 })
175 }
176
177 fn delete(
178 &self,
179 key: String,
180 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
181 Box::pin(async move {
182 self.delete(&key).await.map_err(|e| SyncError(e.to_string()))
183 })
184 }
185
186 fn delete_replicated(
187 &self,
188 key: String,
189 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
190 Box::pin(async move {
191 self.delete_replicated(&key).await.map_err(|e| SyncError(e.to_string()))
192 })
193 }
194
195 fn is_current(
196 &self,
197 key: &str,
198 content_hash: &str,
199 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
200 let key = key.to_string();
201 let hash = content_hash.to_string();
202 Box::pin(async move {
203 match self.get_verified(&key).await {
205 Ok(Some(item)) => Ok(item.content_hash == hash),
206 Ok(None) => Ok(false),
207 Err(_) => Ok(false), }
209 })
210 }
211
212 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
213 Box::pin(async move {
214 Ok(self.get_merkle_root().await.ok().flatten())
215 })
216 }
217
218 fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
219 let path = path.to_string();
220 Box::pin(async move {
221 Ok(match self.get_merkle_children(&path).await {
222 Ok(children_map) => children_map.into_iter().collect(),
223 Err(_) => Vec::new(),
224 })
225 })
226 }
227
228 fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
229 let key = key.to_string();
230 Box::pin(async move {
231 Ok(match self.get(&key).await {
232 Ok(Some(item)) => Some(item.content),
233 _ => None,
234 })
235 })
236 }
237
238 fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
239 Box::pin(async move {
240 Ok(match self.get_clean_branches().await {
241 Ok(branches_map) => branches_map.into_iter().collect(),
242 Err(_) => Vec::new(),
243 })
244 })
245 }
246
247 fn is_branch_clean(&self, prefix: &str) -> BoxFuture<'_, bool> {
248 let prefix = prefix.to_string();
249 Box::pin(async move {
250 Ok(match self.branch_dirty_count(&prefix).await {
251 Ok(count) => count == 0,
252 Err(_) => false, })
254 })
255 }
256}
257
/// Stand-in sync engine that holds no state.
///
/// Its [`SyncEngineRef`] implementation in this file only logs what it would
/// have done and returns fixed values.
///
/// `Debug` and `Default` are derived so the type can be embedded in
/// `#[derive(Debug)]` / `#[derive(Default)]` structs and printed in
/// diagnostics.
#[derive(Debug, Clone, Default)]
pub struct NoOpSyncEngine;
263
264impl SyncEngineRef for NoOpSyncEngine {
265 fn submit(
266 &self,
267 item: SyncItem,
268 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
269 Box::pin(async move {
270 tracing::debug!(
271 key = %item.object_id,
272 hash = %item.content_hash,
273 len = item.content.len(),
274 version = item.version,
275 "NoOp: would submit item"
276 );
277 Ok(())
278 })
279 }
280
281 fn delete(
282 &self,
283 key: String,
284 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
285 Box::pin(async move {
286 tracing::debug!(key = %key, "NoOp: would delete item");
287 Ok(true)
288 })
289 }
290
291 fn delete_replicated(
292 &self,
293 key: String,
294 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
295 Box::pin(async move {
296 tracing::debug!(key = %key, "NoOp: would delete replicated item");
297 Ok(true)
298 })
299 }
300
301 fn is_current(
302 &self,
303 key: &str,
304 content_hash: &str,
305 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
306 let key = key.to_string();
307 let hash = content_hash.to_string();
308 Box::pin(async move {
309 tracing::trace!(key = %key, hash = %hash, "NoOp: is_current check (returning false)");
310 Ok(false) })
312 }
313
314 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
315 Box::pin(async { Ok(None) })
316 }
317
318 fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
319 Box::pin(async { Ok(vec![]) })
320 }
321
322 fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
323 Box::pin(async { Ok(None) })
324 }
325}
326
#[cfg(test)]
mod tests {
    //! Unit tests for [`NoOpSyncEngine`] (explicit methods and the
    //! [`SyncEngineRef`] defaults it inherits) and for [`SyncError`].

    use super::*;

    // ---- NoOpSyncEngine: explicitly implemented methods -------------------

    #[tokio::test]
    async fn test_noop_sync_engine_submit() {
        let engine = NoOpSyncEngine;

        let item = SyncItem::new("test.key".to_string(), b"data".to_vec());
        assert!(engine.submit(item).await.is_ok());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_submit_large_content() {
        let engine = NoOpSyncEngine;

        // 1 MiB payload: the no-op engine must accept any size.
        let large_content = vec![0u8; 1024 * 1024];
        let item = SyncItem::new("large.key".to_string(), large_content);
        assert!(engine.submit(item).await.is_ok());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_delete() {
        let engine = NoOpSyncEngine;

        let deleted = engine
            .delete("some.key".to_string())
            .await
            .expect("no-op delete never fails");
        assert!(deleted);
    }

    #[tokio::test]
    async fn test_noop_sync_engine_delete_replicated() {
        let engine = NoOpSyncEngine;

        let deleted = engine
            .delete_replicated("some.key".to_string())
            .await
            .expect("no-op delete_replicated never fails");
        assert!(deleted);
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current() {
        let engine = NoOpSyncEngine;

        let current = engine
            .is_current("test.key", "hash123")
            .await
            .expect("no-op is_current never fails");
        assert!(!current);
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_current_various_keys() {
        let engine = NoOpSyncEngine;

        for (key, hash) in [("key1", "hash1"), ("key2", "hash2"), ("", "")] {
            assert!(!engine.is_current(key, hash).await.unwrap());
        }
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_root() {
        let engine = NoOpSyncEngine;

        let root = engine
            .get_merkle_root()
            .await
            .expect("no-op get_merkle_root never fails");
        assert!(root.is_none());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_children() {
        let engine = NoOpSyncEngine;

        for path in ["some/path", ""] {
            let children = engine
                .get_merkle_children(path)
                .await
                .expect("no-op get_merkle_children never fails");
            assert!(children.is_empty());
        }
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get() {
        let engine = NoOpSyncEngine;

        let content = engine
            .get("some.key")
            .await
            .expect("no-op get never fails");
        assert!(content.is_none());
    }

    // ---- NoOpSyncEngine: inherited SyncEngineRef defaults -----------------
    // NoOpSyncEngine does not override these, so they pin the trait defaults.

    #[test]
    fn test_noop_sync_engine_should_accept_writes_default() {
        let engine = NoOpSyncEngine;
        assert!(engine.should_accept_writes());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_get_clean_branches_default() {
        let engine = NoOpSyncEngine;

        let branches = engine
            .get_clean_branches()
            .await
            .expect("default get_clean_branches never fails");
        assert!(branches.is_empty());
    }

    #[tokio::test]
    async fn test_noop_sync_engine_is_branch_clean_default() {
        let engine = NoOpSyncEngine;

        for prefix in ["some/prefix", ""] {
            assert!(engine.is_branch_clean(prefix).await.unwrap());
        }
    }

    #[test]
    fn test_noop_sync_engine_clone() {
        let engine = NoOpSyncEngine;
        let _cloned = engine.clone();
    }

    // ---- SyncError ---------------------------------------------------------

    #[test]
    fn test_sync_error_display() {
        let error = SyncError("test error message".to_string());
        assert_eq!(format!("{}", error), "test error message");
    }

    #[test]
    fn test_sync_error_debug() {
        let error = SyncError("debug message".to_string());
        let debug = format!("{:?}", error);
        assert!(debug.contains("debug message"));
    }

    #[test]
    fn test_sync_error_is_error() {
        let error = SyncError("error".to_string());
        // Compiles only if SyncError implements std::error::Error.
        let _: &dyn std::error::Error = &error;
    }

    #[test]
    fn test_sync_error_clone() {
        let error = SyncError("original".to_string());
        let cloned = error.clone();
        assert_eq!(error.0, cloned.0);
    }
}