ipfrs_storage/
replication.rs

1//! Block replication and synchronization
2//!
3//! Provides protocols for syncing blocks between stores:
4//! - Incremental sync (delta only)
5//! - Full sync
6//! - Conflict resolution
7//! - Bi-directional replication
8//!
9//! # Example
10//!
11//! ```rust,ignore
12//! use ipfrs_storage::{Replicator, SyncStrategy, SledBlockStore, BlockStoreConfig};
13//! use std::sync::Arc;
14//! use std::path::PathBuf;
15//!
16//! # async fn example() -> ipfrs_core::Result<()> {
17//! // Create source and target stores
18//! let source = Arc::new(SledBlockStore::new(BlockStoreConfig {
19//!     path: PathBuf::from(".ipfrs/source"),
20//!     cache_size: 100 * 1024 * 1024,
21//! })?);
22//!
23//! let target = Arc::new(SledBlockStore::new(BlockStoreConfig {
24//!     path: PathBuf::from(".ipfrs/target"),
25//!     cache_size: 100 * 1024 * 1024,
26//! })?);
27//!
28//! // Create replicator
29//! let replicator = Replicator::new(source, target);
30//!
31//! // Perform incremental sync
32//! let result = replicator.sync(SyncStrategy::Incremental, None).await?;
33//! println!("Synced {} blocks ({} bytes)", result.blocks_synced, result.bytes_synced);
34//! # Ok(())
35//! # }
36//! ```
37
38use crate::traits::BlockStore;
39use ipfrs_core::{Cid, Error, Result};
40use std::collections::{HashMap, HashSet};
41use std::sync::Arc;
42use std::time::{Duration, Instant};
43
/// Synchronization strategy
///
/// Selects which blocks a [`Replicator`] considers when `sync` is called.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStrategy {
    /// Full sync - copy all blocks from source to target
    ///
    /// Every CID listed by the source is processed; blocks that already exist
    /// in the target are handled according to the [`ConflictStrategy`].
    Full,
    /// Incremental sync - only copy blocks missing in target
    ///
    /// Source CIDs that the target already reports as present are skipped.
    Incremental,
    /// Bidirectional sync - sync in both directions
    ///
    /// Runs an incremental sync from source to target, followed by an
    /// incremental sync from target to source.
    Bidirectional,
}
54
/// Conflict resolution strategy
///
/// A conflict is recorded when a block being synced already exists in the
/// target store under the same CID.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictStrategy {
    /// Keep source version on conflict (the source block is written to the target)
    KeepSource,
    /// Keep target version on conflict (the source block is not copied)
    KeepTarget,
    /// Keep newer version
    ///
    /// NOTE: no timestamps are tracked. The replicator currently approximates
    /// "newer" by writing the source block only when its data is longer than
    /// the data of the block already stored in the target.
    KeepNewer,
    /// Fail on conflict: the sync returns an error at the first conflicting block
    Fail,
}
67
/// Result of a synchronization operation
///
/// For a bidirectional sync the counters are the sums over both directions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SyncResult {
    /// Number of blocks synced (i.e. written to the receiving store)
    pub blocks_synced: usize,
    /// Total bytes synced (sum of the data lengths of the written blocks)
    pub bytes_synced: u64,
    /// Number of conflicts encountered, whether or not the block was replaced
    pub conflicts: usize,
    /// Duration of sync operation
    pub duration: Duration,
    /// List of CIDs that had conflicts, in the order they were encountered
    pub conflicting_cids: Vec<Cid>,
}
82
/// Replication state for tracking sync progress
///
/// A default-constructed state has no last sync time, no synced CIDs and
/// zeroed totals.
#[derive(Debug, Clone, Default)]
pub struct ReplicationState {
    /// Last synced timestamp (`None` until a sync operation has completed)
    pub last_sync: Option<Instant>,
    /// CIDs synced in last operation
    pub last_synced_cids: HashSet<Cid>,
    /// Total blocks synced across all operations
    pub total_blocks_synced: usize,
    /// Total bytes synced across all operations
    pub total_bytes_synced: u64,
}
95
/// Block replicator for syncing between stores
///
/// Copies blocks from a source store into a target store. The two stores may
/// be of different [`BlockStore`] implementations. Progress is tracked in an
/// internal [`ReplicationState`] guarded by a read-write lock.
pub struct Replicator<S: BlockStore, T: BlockStore> {
    /// Source store (blocks are read from here)
    source: Arc<S>,
    /// Target store (blocks are written here)
    target: Arc<T>,
    /// Replication state, updated at the end of every completed sync
    state: parking_lot::RwLock<ReplicationState>,
}
105
106impl<S: BlockStore, T: BlockStore> Replicator<S, T> {
107    /// Create a new replicator
108    pub fn new(source: Arc<S>, target: Arc<T>) -> Self {
109        Self {
110            source,
111            target,
112            state: parking_lot::RwLock::new(ReplicationState::default()),
113        }
114    }
115
116    /// Synchronize blocks from source to target
117    ///
118    /// # Arguments
119    /// * `strategy` - Synchronization strategy to use
120    /// * `conflict_strategy` - How to resolve conflicts (if None, defaults to KeepSource)
121    pub async fn sync(
122        &self,
123        strategy: SyncStrategy,
124        conflict_strategy: Option<ConflictStrategy>,
125    ) -> Result<SyncResult> {
126        let start_time = Instant::now();
127        let conflict_strategy = conflict_strategy.unwrap_or(ConflictStrategy::KeepSource);
128
129        match strategy {
130            SyncStrategy::Full => self.sync_full(conflict_strategy).await,
131            SyncStrategy::Incremental => self.sync_incremental(conflict_strategy).await,
132            SyncStrategy::Bidirectional => {
133                // Sync both directions
134                let result1 = self.sync_incremental(conflict_strategy).await?;
135
136                // Create reverse replicator
137                let reverse = Replicator::new(self.target.clone(), self.source.clone());
138                let result2 = reverse.sync_incremental(conflict_strategy).await?;
139
140                Ok(SyncResult {
141                    blocks_synced: result1.blocks_synced + result2.blocks_synced,
142                    bytes_synced: result1.bytes_synced + result2.bytes_synced,
143                    conflicts: result1.conflicts + result2.conflicts,
144                    duration: start_time.elapsed(),
145                    conflicting_cids: [result1.conflicting_cids, result2.conflicting_cids].concat(),
146                })
147            }
148        }
149    }
150
151    /// Perform full sync (all blocks)
152    async fn sync_full(&self, conflict_strategy: ConflictStrategy) -> Result<SyncResult> {
153        let start_time = Instant::now();
154
155        // Get all CIDs from source
156        let source_cids = self.source.list_cids()?;
157
158        self.sync_cids(&source_cids, conflict_strategy, start_time)
159            .await
160    }
161
162    /// Perform incremental sync (only missing blocks)
163    async fn sync_incremental(&self, conflict_strategy: ConflictStrategy) -> Result<SyncResult> {
164        let start_time = Instant::now();
165
166        // Get all CIDs from source
167        let source_cids = self.source.list_cids()?;
168
169        // Filter to only CIDs not in target
170        let target_has = self.target.has_many(&source_cids).await?;
171        let missing_cids: Vec<Cid> = source_cids
172            .into_iter()
173            .zip(target_has.iter())
174            .filter_map(|(cid, has)| if !*has { Some(cid) } else { None })
175            .collect();
176
177        self.sync_cids(&missing_cids, conflict_strategy, start_time)
178            .await
179    }
180
181    /// Sync specific CIDs from source to target
182    async fn sync_cids(
183        &self,
184        cids: &[Cid],
185        conflict_strategy: ConflictStrategy,
186        start_time: Instant,
187    ) -> Result<SyncResult> {
188        let mut blocks_synced = 0;
189        let mut bytes_synced = 0u64;
190        let mut conflicts = 0;
191        let mut conflicting_cids = Vec::new();
192        let mut synced_cids = HashSet::new();
193
194        // Sync in batches for efficiency
195        const BATCH_SIZE: usize = 100;
196        for chunk in cids.chunks(BATCH_SIZE) {
197            // Get blocks from source
198            let blocks = self.source.get_many(chunk).await?;
199
200            let mut blocks_to_put = Vec::new();
201
202            for (cid, block_opt) in chunk.iter().zip(blocks.iter()) {
203                if let Some(block) = block_opt {
204                    // Check for conflicts
205                    if let Some(existing) = self.target.get(cid).await? {
206                        // Conflict: block exists in both stores
207                        let should_replace = match conflict_strategy {
208                            ConflictStrategy::KeepSource => true,
209                            ConflictStrategy::KeepTarget => false,
210                            ConflictStrategy::KeepNewer => {
211                                // For simplicity, compare data content
212                                // In a real implementation, we'd use timestamps or versioning
213                                block.data().len() > existing.data().len()
214                            }
215                            ConflictStrategy::Fail => {
216                                return Err(Error::Storage(format!(
217                                    "Conflict detected for block {cid}"
218                                )));
219                            }
220                        };
221
222                        if should_replace {
223                            blocks_to_put.push(block.clone());
224                            bytes_synced += block.data().len() as u64;
225                            synced_cids.insert(*cid);
226                        }
227
228                        conflicts += 1;
229                        conflicting_cids.push(*cid);
230                    } else {
231                        // No conflict, just copy
232                        blocks_to_put.push(block.clone());
233                        bytes_synced += block.data().len() as u64;
234                        synced_cids.insert(*cid);
235                    }
236                }
237            }
238
239            // Batch write to target
240            if !blocks_to_put.is_empty() {
241                self.target.put_many(&blocks_to_put).await?;
242                blocks_synced += blocks_to_put.len();
243            }
244        }
245
246        // Update state
247        {
248            let mut state = self.state.write();
249            state.last_sync = Some(Instant::now());
250            state.last_synced_cids = synced_cids;
251            state.total_blocks_synced += blocks_synced;
252            state.total_bytes_synced += bytes_synced;
253        }
254
255        Ok(SyncResult {
256            blocks_synced,
257            bytes_synced,
258            conflicts,
259            duration: start_time.elapsed(),
260            conflicting_cids,
261        })
262    }
263
264    /// Get current replication state
265    pub fn state(&self) -> ReplicationState {
266        self.state.read().clone()
267    }
268
269    /// Sync specific blocks by CID list
270    pub async fn sync_blocks(
271        &self,
272        cids: &[Cid],
273        conflict_strategy: Option<ConflictStrategy>,
274    ) -> Result<SyncResult> {
275        let conflict_strategy = conflict_strategy.unwrap_or(ConflictStrategy::KeepSource);
276        self.sync_cids(cids, conflict_strategy, Instant::now())
277            .await
278    }
279
280    /// Verify sync integrity - check that all blocks in source exist in target
281    pub async fn verify(&self) -> Result<Vec<Cid>> {
282        let source_cids = self.source.list_cids()?;
283        let target_has = self.target.has_many(&source_cids).await?;
284
285        let missing: Vec<Cid> = source_cids
286            .into_iter()
287            .zip(target_has.iter())
288            .filter_map(|(cid, has)| if !*has { Some(cid) } else { None })
289            .collect();
290
291        Ok(missing)
292    }
293}
294
/// Replication manager for coordinating multiple replicators
///
/// Holds one primary store and any number of replica stores of the same type,
/// and records a [`ReplicationState`] per replica.
pub struct ReplicationManager<S: BlockStore> {
    /// Primary store (the source of every replication)
    primary: Arc<S>,
    /// Replica stores, in the order they were added
    replicas: Vec<Arc<S>>,
    /// Replication statistics, keyed by the replica's index in `replicas`
    stats: parking_lot::RwLock<HashMap<usize, ReplicationState>>,
}
304
305impl<S: BlockStore> ReplicationManager<S> {
306    /// Create a new replication manager
307    pub fn new(primary: Arc<S>) -> Self {
308        Self {
309            primary,
310            replicas: Vec::new(),
311            stats: parking_lot::RwLock::new(HashMap::new()),
312        }
313    }
314
315    /// Add a replica store
316    pub fn add_replica(&mut self, replica: Arc<S>) {
317        self.replicas.push(replica);
318    }
319
320    /// Sync primary to all replicas
321    pub async fn sync_all(&self, strategy: SyncStrategy) -> Result<Vec<SyncResult>> {
322        let mut results = Vec::new();
323
324        for (idx, replica) in self.replicas.iter().enumerate() {
325            let replicator = Replicator::new(self.primary.clone(), replica.clone());
326            let result = replicator.sync(strategy, None).await?;
327
328            // Update stats
329            self.stats.write().insert(idx, replicator.state());
330
331            results.push(result);
332        }
333
334        Ok(results)
335    }
336
337    /// Get replication statistics for a specific replica
338    pub fn replica_stats(&self, index: usize) -> Option<ReplicationState> {
339        self.stats.read().get(&index).cloned()
340    }
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use bytes::Bytes;
    use ipfrs_core::Block;

    /// Open an empty sled-backed store in a per-test directory.
    ///
    /// The directory lives under the platform's temp dir (not a hard-coded
    /// `/tmp`) and `name` must be unique per store so parallel tests do not
    /// share data.
    fn fresh_store(name: &str) -> Arc<SledBlockStore> {
        let path = std::env::temp_dir().join(format!("ipfrs-replication-{name}"));
        // Best-effort removal of leftovers from a previous run; the directory
        // may simply not exist yet.
        let _ = std::fs::remove_dir_all(&path);

        let config = BlockStoreConfig {
            path,
            cache_size: 10 * 1024 * 1024,
        };
        Arc::new(SledBlockStore::new(config).unwrap())
    }

    #[tokio::test]
    async fn test_full_sync() {
        let source = fresh_store("source");
        let target = fresh_store("target");

        // Add blocks to source
        let block1 = Block::new(Bytes::from("block 1")).unwrap();
        let block2 = Block::new(Bytes::from("block 2")).unwrap();
        source.put(&block1).await.unwrap();
        source.put(&block2).await.unwrap();

        // Sync
        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator.sync(SyncStrategy::Full, None).await.unwrap();

        assert_eq!(result.blocks_synced, 2);
        assert_eq!(result.conflicts, 0);
        assert!(target.has(block1.cid()).await.unwrap());
        assert!(target.has(block2.cid()).await.unwrap());

        // After a full sync nothing may be missing from the target.
        assert!(replicator.verify().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_incremental_sync() {
        let source = fresh_store("inc-source");
        let target = fresh_store("inc-target");

        // Add some blocks to both
        let block1 = Block::new(Bytes::from("block 1")).unwrap();
        source.put(&block1).await.unwrap();
        target.put(&block1).await.unwrap();

        // Add unique block to source
        let block2 = Block::new(Bytes::from("block 2")).unwrap();
        source.put(&block2).await.unwrap();

        // Incremental sync should only copy block2
        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator
            .sync(SyncStrategy::Incremental, None)
            .await
            .unwrap();

        assert_eq!(result.blocks_synced, 1);
        // Blocks already present are filtered out up front, so no conflicts arise.
        assert_eq!(result.conflicts, 0);
        assert!(target.has(block2.cid()).await.unwrap());
    }

    #[tokio::test]
    async fn test_bidirectional_sync() {
        let source = fresh_store("bidi-source");
        let target = fresh_store("bidi-target");

        // Each side holds one block the other side lacks.
        let block1 = Block::new(Bytes::from("only in source")).unwrap();
        let block2 = Block::new(Bytes::from("only in target")).unwrap();
        source.put(&block1).await.unwrap();
        target.put(&block2).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator
            .sync(SyncStrategy::Bidirectional, None)
            .await
            .unwrap();

        // One block travels in each direction.
        assert_eq!(result.blocks_synced, 2);
        assert!(target.has(block1.cid()).await.unwrap());
        assert!(source.has(block2.cid()).await.unwrap());
    }

    #[tokio::test]
    async fn test_conflict_resolution() {
        let source = fresh_store("conflict-source");
        let target = fresh_store("conflict-target");

        // Put the same block into both stores: a full sync then finds the CID
        // already present in the target, which the replicator treats as a conflict.
        let block1 = Block::new(Bytes::from("source version")).unwrap();
        source.put(&block1).await.unwrap();
        target.put(&block1).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());

        // KeepSource: the conflict is recorded and the source block is written.
        let result = replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::KeepSource))
            .await
            .unwrap();
        assert_eq!(result.conflicts, 1);
        assert_eq!(result.conflicting_cids.len(), 1);
        assert_eq!(result.blocks_synced, 1);

        // KeepTarget: the conflict is recorded but nothing is written.
        let result = replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::KeepTarget))
            .await
            .unwrap();
        assert_eq!(result.conflicts, 1);
        assert_eq!(result.blocks_synced, 0);

        // Fail: the sync aborts with an error on the first conflict.
        let outcome = replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::Fail))
            .await;
        assert!(outcome.is_err());

        // The block is still available in the target afterwards.
        assert!(target.has(block1.cid()).await.unwrap());
    }
}