1use crate::traits::BlockStore;
39use ipfrs_core::{Cid, Error, Result};
40use std::collections::{HashMap, HashSet};
41use std::sync::Arc;
42use std::time::{Duration, Instant};
43
/// Selects which blocks a [`Replicator`] considers during a sync pass.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncStrategy {
    /// Process every CID listed by the source store, including blocks the
    /// target already holds (those go through conflict resolution).
    Full,
    /// Process only the CIDs that the target reports as missing.
    Incremental,
    /// Run an incremental pass from source to target, then a second
    /// incremental pass in the opposite direction.
    Bidirectional,
}
54
/// Decides what happens when a block being synced already exists in the
/// target store under the same CID.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictStrategy {
    /// Overwrite the target's block with the source's block.
    KeepSource,
    /// Leave the target's block untouched.
    KeepTarget,
    /// Prefer the "newer" block. No timestamps are available, so the
    /// replicator uses data length as a stand-in: the source block replaces
    /// the target block only when its data is strictly longer.
    KeepNewer,
    /// Abort the sync with a storage error on the first conflict.
    Fail,
}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
70pub struct SyncResult {
71 pub blocks_synced: usize,
73 pub bytes_synced: u64,
75 pub conflicts: usize,
77 pub duration: Duration,
79 pub conflicting_cids: Vec<Cid>,
81}
82
83#[derive(Debug, Clone, Default)]
85pub struct ReplicationState {
86 pub last_sync: Option<Instant>,
88 pub last_synced_cids: HashSet<Cid>,
90 pub total_blocks_synced: usize,
92 pub total_bytes_synced: u64,
94}
95
96pub struct Replicator<S: BlockStore, T: BlockStore> {
98 source: Arc<S>,
100 target: Arc<T>,
102 state: parking_lot::RwLock<ReplicationState>,
104}
105
106impl<S: BlockStore, T: BlockStore> Replicator<S, T> {
107 pub fn new(source: Arc<S>, target: Arc<T>) -> Self {
109 Self {
110 source,
111 target,
112 state: parking_lot::RwLock::new(ReplicationState::default()),
113 }
114 }
115
116 pub async fn sync(
122 &self,
123 strategy: SyncStrategy,
124 conflict_strategy: Option<ConflictStrategy>,
125 ) -> Result<SyncResult> {
126 let start_time = Instant::now();
127 let conflict_strategy = conflict_strategy.unwrap_or(ConflictStrategy::KeepSource);
128
129 match strategy {
130 SyncStrategy::Full => self.sync_full(conflict_strategy).await,
131 SyncStrategy::Incremental => self.sync_incremental(conflict_strategy).await,
132 SyncStrategy::Bidirectional => {
133 let result1 = self.sync_incremental(conflict_strategy).await?;
135
136 let reverse = Replicator::new(self.target.clone(), self.source.clone());
138 let result2 = reverse.sync_incremental(conflict_strategy).await?;
139
140 Ok(SyncResult {
141 blocks_synced: result1.blocks_synced + result2.blocks_synced,
142 bytes_synced: result1.bytes_synced + result2.bytes_synced,
143 conflicts: result1.conflicts + result2.conflicts,
144 duration: start_time.elapsed(),
145 conflicting_cids: [result1.conflicting_cids, result2.conflicting_cids].concat(),
146 })
147 }
148 }
149 }
150
151 async fn sync_full(&self, conflict_strategy: ConflictStrategy) -> Result<SyncResult> {
153 let start_time = Instant::now();
154
155 let source_cids = self.source.list_cids()?;
157
158 self.sync_cids(&source_cids, conflict_strategy, start_time)
159 .await
160 }
161
162 async fn sync_incremental(&self, conflict_strategy: ConflictStrategy) -> Result<SyncResult> {
164 let start_time = Instant::now();
165
166 let source_cids = self.source.list_cids()?;
168
169 let target_has = self.target.has_many(&source_cids).await?;
171 let missing_cids: Vec<Cid> = source_cids
172 .into_iter()
173 .zip(target_has.iter())
174 .filter_map(|(cid, has)| if !*has { Some(cid) } else { None })
175 .collect();
176
177 self.sync_cids(&missing_cids, conflict_strategy, start_time)
178 .await
179 }
180
181 async fn sync_cids(
183 &self,
184 cids: &[Cid],
185 conflict_strategy: ConflictStrategy,
186 start_time: Instant,
187 ) -> Result<SyncResult> {
188 let mut blocks_synced = 0;
189 let mut bytes_synced = 0u64;
190 let mut conflicts = 0;
191 let mut conflicting_cids = Vec::new();
192 let mut synced_cids = HashSet::new();
193
194 const BATCH_SIZE: usize = 100;
196 for chunk in cids.chunks(BATCH_SIZE) {
197 let blocks = self.source.get_many(chunk).await?;
199
200 let mut blocks_to_put = Vec::new();
201
202 for (cid, block_opt) in chunk.iter().zip(blocks.iter()) {
203 if let Some(block) = block_opt {
204 if let Some(existing) = self.target.get(cid).await? {
206 let should_replace = match conflict_strategy {
208 ConflictStrategy::KeepSource => true,
209 ConflictStrategy::KeepTarget => false,
210 ConflictStrategy::KeepNewer => {
211 block.data().len() > existing.data().len()
214 }
215 ConflictStrategy::Fail => {
216 return Err(Error::Storage(format!(
217 "Conflict detected for block {cid}"
218 )));
219 }
220 };
221
222 if should_replace {
223 blocks_to_put.push(block.clone());
224 bytes_synced += block.data().len() as u64;
225 synced_cids.insert(*cid);
226 }
227
228 conflicts += 1;
229 conflicting_cids.push(*cid);
230 } else {
231 blocks_to_put.push(block.clone());
233 bytes_synced += block.data().len() as u64;
234 synced_cids.insert(*cid);
235 }
236 }
237 }
238
239 if !blocks_to_put.is_empty() {
241 self.target.put_many(&blocks_to_put).await?;
242 blocks_synced += blocks_to_put.len();
243 }
244 }
245
246 {
248 let mut state = self.state.write();
249 state.last_sync = Some(Instant::now());
250 state.last_synced_cids = synced_cids;
251 state.total_blocks_synced += blocks_synced;
252 state.total_bytes_synced += bytes_synced;
253 }
254
255 Ok(SyncResult {
256 blocks_synced,
257 bytes_synced,
258 conflicts,
259 duration: start_time.elapsed(),
260 conflicting_cids,
261 })
262 }
263
264 pub fn state(&self) -> ReplicationState {
266 self.state.read().clone()
267 }
268
269 pub async fn sync_blocks(
271 &self,
272 cids: &[Cid],
273 conflict_strategy: Option<ConflictStrategy>,
274 ) -> Result<SyncResult> {
275 let conflict_strategy = conflict_strategy.unwrap_or(ConflictStrategy::KeepSource);
276 self.sync_cids(cids, conflict_strategy, Instant::now())
277 .await
278 }
279
280 pub async fn verify(&self) -> Result<Vec<Cid>> {
282 let source_cids = self.source.list_cids()?;
283 let target_has = self.target.has_many(&source_cids).await?;
284
285 let missing: Vec<Cid> = source_cids
286 .into_iter()
287 .zip(target_has.iter())
288 .filter_map(|(cid, has)| if !*has { Some(cid) } else { None })
289 .collect();
290
291 Ok(missing)
292 }
293}
294
295pub struct ReplicationManager<S: BlockStore> {
297 primary: Arc<S>,
299 replicas: Vec<Arc<S>>,
301 stats: parking_lot::RwLock<HashMap<usize, ReplicationState>>,
303}
304
305impl<S: BlockStore> ReplicationManager<S> {
306 pub fn new(primary: Arc<S>) -> Self {
308 Self {
309 primary,
310 replicas: Vec::new(),
311 stats: parking_lot::RwLock::new(HashMap::new()),
312 }
313 }
314
315 pub fn add_replica(&mut self, replica: Arc<S>) {
317 self.replicas.push(replica);
318 }
319
320 pub async fn sync_all(&self, strategy: SyncStrategy) -> Result<Vec<SyncResult>> {
322 let mut results = Vec::new();
323
324 for (idx, replica) in self.replicas.iter().enumerate() {
325 let replicator = Replicator::new(self.primary.clone(), replica.clone());
326 let result = replicator.sync(strategy, None).await?;
327
328 self.stats.write().insert(idx, replicator.state());
330
331 results.push(result);
332 }
333
334 Ok(results)
335 }
336
337 pub fn replica_stats(&self, index: usize) -> Option<ReplicationState> {
339 self.stats.read().get(&index).cloned()
340 }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use bytes::Bytes;
    use ipfrs_core::Block;
    use std::path::PathBuf;

    /// Opens an empty sled-backed store in `dir_name` under the platform's
    /// temporary directory, removing anything left over from an earlier run.
    fn open_store(dir_name: &str) -> Arc<SledBlockStore> {
        let path: PathBuf = std::env::temp_dir().join(dir_name);
        // Best effort: the directory does not exist on a first run.
        let _ = std::fs::remove_dir_all(&path);

        let config = BlockStoreConfig {
            path,
            cache_size: 10 * 1024 * 1024,
        };
        Arc::new(SledBlockStore::new(config).unwrap())
    }

    /// A full sync copies every source block into an empty target.
    #[tokio::test]
    async fn test_full_sync() {
        let source = open_store("ipfrs-replication-source");
        let target = open_store("ipfrs-replication-target");

        let block1 = Block::new(Bytes::from("block 1")).unwrap();
        let block2 = Block::new(Bytes::from("block 2")).unwrap();
        source.put(&block1).await.unwrap();
        source.put(&block2).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator.sync(SyncStrategy::Full, None).await.unwrap();

        assert_eq!(result.blocks_synced, 2);
        assert_eq!(result.conflicts, 0);
        assert!(target.has(block1.cid()).await.unwrap());
        assert!(target.has(block2.cid()).await.unwrap());
    }

    /// An incremental sync copies only the block the target is missing.
    #[tokio::test]
    async fn test_incremental_sync() {
        let source = open_store("ipfrs-replication-inc-source");
        let target = open_store("ipfrs-replication-inc-target");

        // Present on both sides: must not be copied again.
        let block1 = Block::new(Bytes::from("block 1")).unwrap();
        source.put(&block1).await.unwrap();
        target.put(&block1).await.unwrap();

        // Present on the source only: the one block to copy.
        let block2 = Block::new(Bytes::from("block 2")).unwrap();
        source.put(&block2).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator
            .sync(SyncStrategy::Incremental, None)
            .await
            .unwrap();

        assert_eq!(result.blocks_synced, 1);
        assert!(target.has(block2.cid()).await.unwrap());
    }

    /// A block that exists on both sides is reported as a conflict and is
    /// handled according to the chosen strategy.
    #[tokio::test]
    async fn test_conflict_resolution() {
        let source = open_store("ipfrs-replication-conflict-source");
        let target = open_store("ipfrs-replication-conflict-target");

        let block1 = Block::new(Bytes::from("source version")).unwrap();
        source.put(&block1).await.unwrap();
        // Put the same CID into the target so the full sync below actually
        // runs into a conflict.
        target.put(&block1).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());

        // KeepSource: the conflict is counted and the block is rewritten.
        let result = replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::KeepSource))
            .await
            .unwrap();

        assert_eq!(result.blocks_synced, 1);
        assert_eq!(result.conflicts, 1);
        assert!(result.conflicting_cids.contains(block1.cid()));
        assert!(target.has(block1.cid()).await.unwrap());

        // Fail: the same conflict aborts the sync with an error.
        let failed = replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::Fail))
            .await;
        assert!(failed.is_err());
    }
}