replication_engine/
sync_engine.rs1use std::future::Future;
51use std::pin::Pin;
52
53pub use sync_engine::{BackpressureLevel, SyncItem};
55
56pub type SyncResult<T> = std::result::Result<T, SyncError>;
58
59pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = SyncResult<T>> + Send + 'a>>;
61
/// Error returned by the fallible operations in this module.
///
/// It is a thin newtype around a human-readable message. The message is public
/// (`.0`) so callers can read or build it directly.
///
/// Equality compares the wrapped messages, which lets callers and tests write
/// `assert_eq!(err, SyncError("...".into()))`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncError(pub String);

impl std::fmt::Display for SyncError {
    /// Writes the wrapped message verbatim.
    ///
    /// Formatter flags such as width or fill are not applied to the message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

// No `source()` override: the error carries only its message.
impl std::error::Error for SyncError {}
73
74pub trait SyncEngineRef: Send + Sync + 'static {
83 fn should_accept_writes(&self) -> bool {
91 true
92 }
93
94 fn submit(
99 &self,
100 item: SyncItem,
101 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>>;
102
103 fn delete(
105 &self,
106 key: String,
107 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
108
109 fn is_current(
114 &self,
115 key: &str,
116 content_hash: &str,
117 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>>;
118
119 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>>;
121
122 fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>>;
124
125 fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>>;
127
128 fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
133 Box::pin(async { Ok(Vec::new()) })
135 }
136
137 fn is_branch_clean(&self, _prefix: &str) -> BoxFuture<'_, bool> {
141 Box::pin(async { Ok(true) })
143 }
144}
145
146impl SyncEngineRef for sync_engine::SyncEngine {
150 fn should_accept_writes(&self) -> bool {
151 self.pressure().should_accept_writes()
152 }
153
154 fn submit(
155 &self,
156 item: SyncItem,
157 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
158 Box::pin(async move {
159 self.submit(item).await.map_err(|e| SyncError(e.to_string()))
160 })
161 }
162
163 fn delete(
164 &self,
165 key: String,
166 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
167 Box::pin(async move {
168 self.delete(&key).await.map_err(|e| SyncError(e.to_string()))
169 })
170 }
171
172 fn is_current(
173 &self,
174 key: &str,
175 content_hash: &str,
176 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
177 let key = key.to_string();
178 let hash = content_hash.to_string();
179 Box::pin(async move {
180 match self.get_verified(&key).await {
182 Ok(Some(item)) => Ok(item.content_hash == hash),
183 Ok(None) => Ok(false),
184 Err(_) => Ok(false), }
186 })
187 }
188
189 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
190 Box::pin(async move {
191 Ok(self.get_merkle_root().await.ok().flatten())
192 })
193 }
194
195 fn get_merkle_children(&self, path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
196 let path = path.to_string();
197 Box::pin(async move {
198 Ok(match self.get_merkle_children(&path).await {
199 Ok(children_map) => children_map.into_iter().collect(),
200 Err(_) => Vec::new(),
201 })
202 })
203 }
204
205 fn get(&self, key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
206 let key = key.to_string();
207 Box::pin(async move {
208 Ok(match self.get(&key).await {
209 Ok(Some(item)) => Some(item.content),
210 _ => None,
211 })
212 })
213 }
214
215 fn get_clean_branches(&self) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
216 Box::pin(async move {
217 Ok(match self.get_clean_branches().await {
218 Ok(branches_map) => branches_map.into_iter().collect(),
219 Err(_) => Vec::new(),
220 })
221 })
222 }
223
224 fn is_branch_clean(&self, prefix: &str) -> BoxFuture<'_, bool> {
225 let prefix = prefix.to_string();
226 Box::pin(async move {
227 Ok(match self.branch_dirty_count(&prefix).await {
228 Ok(count) => count == 0,
229 Err(_) => false, })
231 })
232 }
233}
234
/// Stand-in engine that performs no storage work.
///
/// It is a unit struct with no state. `Debug` and `Default` are derived so the
/// type can be logged and built generically alongside other engines.
#[derive(Debug, Clone, Default)]
pub struct NoOpSyncEngine;
240
241impl SyncEngineRef for NoOpSyncEngine {
242 fn submit(
243 &self,
244 item: SyncItem,
245 ) -> Pin<Box<dyn Future<Output = SyncResult<()>> + Send + '_>> {
246 Box::pin(async move {
247 tracing::debug!(
248 key = %item.object_id,
249 hash = %item.content_hash,
250 len = item.content.len(),
251 version = item.version,
252 "NoOp: would submit item"
253 );
254 Ok(())
255 })
256 }
257
258 fn delete(
259 &self,
260 key: String,
261 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
262 Box::pin(async move {
263 tracing::debug!(key = %key, "NoOp: would delete item");
264 Ok(true)
265 })
266 }
267
268 fn is_current(
269 &self,
270 key: &str,
271 content_hash: &str,
272 ) -> Pin<Box<dyn Future<Output = SyncResult<bool>> + Send + '_>> {
273 let key = key.to_string();
274 let hash = content_hash.to_string();
275 Box::pin(async move {
276 tracing::trace!(key = %key, hash = %hash, "NoOp: is_current check (returning false)");
277 Ok(false) })
279 }
280
281 fn get_merkle_root(&self) -> BoxFuture<'_, Option<[u8; 32]>> {
282 Box::pin(async { Ok(None) })
283 }
284
285 fn get_merkle_children(&self, _path: &str) -> BoxFuture<'_, Vec<(String, [u8; 32])>> {
286 Box::pin(async { Ok(vec![]) })
287 }
288
289 fn get(&self, _key: &str) -> BoxFuture<'_, Option<Vec<u8>>> {
290 Box::pin(async { Ok(None) })
291 }
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    // ---- NoOpSyncEngine -------------------------------------------------

    /// Submitting a small item succeeds.
    #[tokio::test]
    async fn test_noop_sync_engine_submit() {
        let engine = NoOpSyncEngine;
        let item = SyncItem::new("test.key".to_string(), b"data".to_vec());

        assert!(engine.submit(item).await.is_ok());
    }

    /// Submitting a 1 MiB payload succeeds as well.
    #[tokio::test]
    async fn test_noop_sync_engine_submit_large_content() {
        let engine = NoOpSyncEngine;
        let payload = vec![0u8; 1024 * 1024];
        let item = SyncItem::new("large.key".to_string(), payload);

        assert!(engine.submit(item).await.is_ok());
    }

    /// Delete always reports `true`.
    #[tokio::test]
    async fn test_noop_sync_engine_delete() {
        let engine = NoOpSyncEngine;

        let outcome = engine.delete("some.key".to_string()).await;

        assert!(matches!(outcome, Ok(true)));
    }

    /// `is_current` always reports `false`.
    #[tokio::test]
    async fn test_noop_sync_engine_is_current() {
        let engine = NoOpSyncEngine;

        let outcome = engine.is_current("test.key", "hash123").await;

        assert!(matches!(outcome, Ok(false)));
    }

    /// The answer does not depend on the key or hash, including empty strings.
    #[tokio::test]
    async fn test_noop_sync_engine_is_current_various_keys() {
        let engine = NoOpSyncEngine;
        let cases = [("key1", "hash1"), ("key2", "hash2"), ("", "")];

        for &(key, hash) in &cases {
            assert!(!engine.is_current(key, hash).await.unwrap());
        }
    }

    /// There is never a Merkle root.
    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_root() {
        let engine = NoOpSyncEngine;

        let root = engine.get_merkle_root().await.unwrap();

        assert!(root.is_none());
    }

    /// No path has children, not even the empty path.
    #[tokio::test]
    async fn test_noop_sync_engine_get_merkle_children() {
        let engine = NoOpSyncEngine;

        for path in &["some/path", ""] {
            let children = engine.get_merkle_children(path).await.unwrap();
            assert!(children.is_empty());
        }
    }

    /// Lookups never find anything.
    #[tokio::test]
    async fn test_noop_sync_engine_get() {
        let engine = NoOpSyncEngine;

        let content = engine.get("some.key").await.unwrap();

        assert!(content.is_none());
    }

    /// The engine can be cloned.
    #[test]
    fn test_noop_sync_engine_clone() {
        let engine = NoOpSyncEngine;
        let _cloned = engine.clone();
    }

    // ---- SyncError -------------------------------------------------------

    /// `Display` prints exactly the wrapped message.
    #[test]
    fn test_sync_error_display() {
        let error = SyncError("test error message".to_string());
        assert_eq!(error.to_string(), "test error message");
    }

    /// `Debug` output includes the wrapped message.
    #[test]
    fn test_sync_error_debug() {
        let rendered = format!("{:?}", SyncError("debug message".to_string()));
        assert!(rendered.contains("debug message"));
    }

    /// `SyncError` is usable wherever a `std::error::Error` is required.
    #[test]
    fn test_sync_error_is_error() {
        fn require_error<E: std::error::Error>(_: &E) {}

        let error = SyncError("error".to_string());
        require_error(&error);
    }

    /// A clone carries the same message as its source.
    #[test]
    fn test_sync_error_clone() {
        let error = SyncError("original".to_string());
        let cloned = error.clone();
        assert_eq!(error.0, cloned.0);
    }
}