use crate::traits::BlockStore;
use ipfrs_core::{Cid, Error, Result};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Selects which CIDs a [`Replicator::sync`] pass considers.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncStrategy {
    /// Walk every CID listed by the source; CIDs the target already holds are
    /// routed through the conflict strategy.
    Full,
    /// Copy only the source CIDs that the target reports as missing.
    Incremental,
    /// An incremental source→target pass followed by an incremental
    /// target→source pass.
    Bidirectional,
}
/// Decides what happens when a block being synced already exists in the target.
///
/// When no strategy is supplied, [`Replicator::sync`] uses [`ConflictStrategy::KeepSource`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictStrategy {
    /// Overwrite the target's copy with the source's copy.
    KeepSource,
    /// Leave the target's copy untouched (the conflict is still reported).
    KeepTarget,
    /// Replace only when the source block's data is longer than the target's.
    ///
    /// NOTE(review): this uses data length as a stand-in for "newer". If CIDs are
    /// content-derived, both copies are identical and this never replaces; confirm
    /// the intended semantics.
    KeepNewer,
    /// Abort the sync with an error at the first conflicting block.
    Fail,
}
/// Outcome of a single sync operation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct SyncResult {
    /// Number of blocks written to the target store.
    pub blocks_synced: usize,
    /// Sum of the data lengths of the blocks written to the target.
    pub bytes_synced: u64,
    /// Number of blocks that already existed in the target when examined.
    pub conflicts: usize,
    /// Wall-clock time the operation took.
    pub duration: Duration,
    /// CIDs of the blocks counted in `conflicts`, in the order they were found.
    pub conflicting_cids: Vec<Cid>,
}
/// Bookkeeping a [`Replicator`] keeps across sync passes.
#[derive(Clone, Debug, Default)]
pub struct ReplicationState {
    /// When the most recent sync pass finished, or `None` if none has run.
    pub last_sync: Option<Instant>,
    /// CIDs written to the target by the most recent sync pass.
    pub last_synced_cids: HashSet<Cid>,
    /// Running total of blocks written across all passes.
    pub total_blocks_synced: usize,
    /// Running total of bytes written across all passes.
    pub total_bytes_synced: u64,
}
/// Copies blocks from a `source` store into a `target` store and records progress.
pub struct Replicator<S, T>
where
    S: BlockStore,
    T: BlockStore,
{
    /// Store that blocks are read from.
    source: Arc<S>,
    /// Store that blocks are written to.
    target: Arc<T>,
    /// Progress bookkeeping, updated at the end of each sync pass.
    state: parking_lot::RwLock<ReplicationState>,
}
impl<S: BlockStore, T: BlockStore> Replicator<S, T> {
pub fn new(source: Arc<S>, target: Arc<T>) -> Self {
Self {
source,
target,
state: parking_lot::RwLock::new(ReplicationState::default()),
}
}
pub async fn sync(
&self,
strategy: SyncStrategy,
conflict_strategy: Option<ConflictStrategy>,
) -> Result<SyncResult> {
let start_time = Instant::now();
let conflict_strategy = conflict_strategy.unwrap_or(ConflictStrategy::KeepSource);
match strategy {
SyncStrategy::Full => self.sync_full(conflict_strategy).await,
SyncStrategy::Incremental => self.sync_incremental(conflict_strategy).await,
SyncStrategy::Bidirectional => {
let result1 = self.sync_incremental(conflict_strategy).await?;
let reverse = Replicator::new(self.target.clone(), self.source.clone());
let result2 = reverse.sync_incremental(conflict_strategy).await?;
Ok(SyncResult {
blocks_synced: result1.blocks_synced + result2.blocks_synced,
bytes_synced: result1.bytes_synced + result2.bytes_synced,
conflicts: result1.conflicts + result2.conflicts,
duration: start_time.elapsed(),
conflicting_cids: [result1.conflicting_cids, result2.conflicting_cids].concat(),
})
}
}
}
async fn sync_full(&self, conflict_strategy: ConflictStrategy) -> Result<SyncResult> {
let start_time = Instant::now();
let source_cids = self.source.list_cids()?;
self.sync_cids(&source_cids, conflict_strategy, start_time)
.await
}
async fn sync_incremental(&self, conflict_strategy: ConflictStrategy) -> Result<SyncResult> {
let start_time = Instant::now();
let source_cids = self.source.list_cids()?;
let target_has = self.target.has_many(&source_cids).await?;
let missing_cids: Vec<Cid> = source_cids
.into_iter()
.zip(target_has.iter())
.filter_map(|(cid, has)| if !*has { Some(cid) } else { None })
.collect();
self.sync_cids(&missing_cids, conflict_strategy, start_time)
.await
}
async fn sync_cids(
&self,
cids: &[Cid],
conflict_strategy: ConflictStrategy,
start_time: Instant,
) -> Result<SyncResult> {
let mut blocks_synced = 0;
let mut bytes_synced = 0u64;
let mut conflicts = 0;
let mut conflicting_cids = Vec::new();
let mut synced_cids = HashSet::new();
const BATCH_SIZE: usize = 100;
for chunk in cids.chunks(BATCH_SIZE) {
let blocks = self.source.get_many(chunk).await?;
let mut blocks_to_put = Vec::new();
for (cid, block_opt) in chunk.iter().zip(blocks.iter()) {
if let Some(block) = block_opt {
if let Some(existing) = self.target.get(cid).await? {
let should_replace = match conflict_strategy {
ConflictStrategy::KeepSource => true,
ConflictStrategy::KeepTarget => false,
ConflictStrategy::KeepNewer => {
block.data().len() > existing.data().len()
}
ConflictStrategy::Fail => {
return Err(Error::Storage(format!(
"Conflict detected for block {cid}"
)));
}
};
if should_replace {
blocks_to_put.push(block.clone());
bytes_synced += block.data().len() as u64;
synced_cids.insert(*cid);
}
conflicts += 1;
conflicting_cids.push(*cid);
} else {
blocks_to_put.push(block.clone());
bytes_synced += block.data().len() as u64;
synced_cids.insert(*cid);
}
}
}
if !blocks_to_put.is_empty() {
self.target.put_many(&blocks_to_put).await?;
blocks_synced += blocks_to_put.len();
}
}
{
let mut state = self.state.write();
state.last_sync = Some(Instant::now());
state.last_synced_cids = synced_cids;
state.total_blocks_synced += blocks_synced;
state.total_bytes_synced += bytes_synced;
}
Ok(SyncResult {
blocks_synced,
bytes_synced,
conflicts,
duration: start_time.elapsed(),
conflicting_cids,
})
}
pub fn state(&self) -> ReplicationState {
self.state.read().clone()
}
pub async fn sync_blocks(
&self,
cids: &[Cid],
conflict_strategy: Option<ConflictStrategy>,
) -> Result<SyncResult> {
let conflict_strategy = conflict_strategy.unwrap_or(ConflictStrategy::KeepSource);
self.sync_cids(cids, conflict_strategy, Instant::now())
.await
}
pub async fn verify(&self) -> Result<Vec<Cid>> {
let source_cids = self.source.list_cids()?;
let target_has = self.target.has_many(&source_cids).await?;
let missing: Vec<Cid> = source_cids
.into_iter()
.zip(target_has.iter())
.filter_map(|(cid, has)| if !*has { Some(cid) } else { None })
.collect();
Ok(missing)
}
}
/// Fans a single primary store out to any number of replica stores.
pub struct ReplicationManager<S>
where
    S: BlockStore,
{
    /// Store that every replica is synced from.
    primary: Arc<S>,
    /// Replica stores, indexed by registration order.
    replicas: Vec<Arc<S>>,
    /// Per-replica replication state, keyed by replica index.
    stats: parking_lot::RwLock<HashMap<usize, ReplicationState>>,
}
impl<S: BlockStore> ReplicationManager<S> {
    /// Creates a manager for `primary` with no replicas attached.
    pub fn new(primary: Arc<S>) -> Self {
        Self {
            primary,
            replicas: Vec::new(),
            stats: parking_lot::RwLock::new(HashMap::new()),
        }
    }

    /// Registers `replica`. Its index for [`Self::replica_stats`] is its insertion position.
    pub fn add_replica(&mut self, replica: Arc<S>) {
        self.replicas.push(replica);
    }

    /// Syncs the primary into every replica, in registration order.
    ///
    /// Each replica's statistics accumulate across calls: totals are added and
    /// `last_sync`/`last_synced_cids` reflect the latest run.
    ///
    /// # Errors
    ///
    /// Stops at the first replica that fails and returns its error. Statistics for
    /// replicas that already succeeded in this call have been recorded.
    pub async fn sync_all(&self, strategy: SyncStrategy) -> Result<Vec<SyncResult>> {
        let mut results = Vec::with_capacity(self.replicas.len());
        for (idx, replica) in self.replicas.iter().enumerate() {
            let replicator = Replicator::new(Arc::clone(&self.primary), Arc::clone(replica));
            let result = replicator.sync(strategy, None).await?;
            self.record_run(idx, replicator.state());
            results.push(result);
        }
        Ok(results)
    }

    /// Returns the accumulated statistics for the replica at `index`.
    ///
    /// Returns `None` if that replica has never been synced successfully.
    pub fn replica_stats(&self, index: usize) -> Option<ReplicationState> {
        self.stats.read().get(&index).cloned()
    }

    /// Folds one replicator run's state into the stored statistics for `index`.
    ///
    /// Each `sync_all` creates a fresh `Replicator` whose totals start at zero. The
    /// totals are therefore added rather than stored directly, which would reset them
    /// every call.
    fn record_run(&self, index: usize, run: ReplicationState) {
        let mut stats = self.stats.write();
        let entry = stats.entry(index).or_default();
        entry.last_sync = run.last_sync.or(entry.last_sync);
        entry.last_synced_cids = run.last_synced_cids;
        entry.total_blocks_synced += run.total_blocks_synced;
        entry.total_bytes_synced += run.total_bytes_synced;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use bytes::Bytes;
    use ipfrs_core::Block;

    /// Opens an empty Sled-backed store named `name` under the platform temp directory.
    ///
    /// Any directory left behind by a previous run with the same name is removed first.
    /// Names must be unique per test because tests may run concurrently.
    fn fresh_store(name: &str) -> Arc<SledBlockStore> {
        let config = BlockStoreConfig {
            path: std::env::temp_dir().join(name),
            cache_size: 10 * 1024 * 1024,
        };
        let _ = std::fs::remove_dir_all(&config.path);
        Arc::new(SledBlockStore::new(config).unwrap())
    }

    #[tokio::test]
    async fn test_full_sync() {
        let source = fresh_store("ipfrs-replication-source");
        let target = fresh_store("ipfrs-replication-target");
        let block1 = Block::new(Bytes::from("block 1")).unwrap();
        let block2 = Block::new(Bytes::from("block 2")).unwrap();
        source.put(&block1).await.unwrap();
        source.put(&block2).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator.sync(SyncStrategy::Full, None).await.unwrap();

        assert_eq!(result.blocks_synced, 2);
        assert_eq!(result.conflicts, 0);
        assert!(target.has(block1.cid()).await.unwrap());
        assert!(target.has(block2.cid()).await.unwrap());
    }

    #[tokio::test]
    async fn test_incremental_sync() {
        let source = fresh_store("ipfrs-replication-inc-source");
        let target = fresh_store("ipfrs-replication-inc-target");
        let block1 = Block::new(Bytes::from("block 1")).unwrap();
        source.put(&block1).await.unwrap();
        target.put(&block1).await.unwrap();
        let block2 = Block::new(Bytes::from("block 2")).unwrap();
        source.put(&block2).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator
            .sync(SyncStrategy::Incremental, None)
            .await
            .unwrap();

        assert_eq!(result.blocks_synced, 1);
        assert!(target.has(block2.cid()).await.unwrap());
    }

    #[tokio::test]
    async fn test_conflict_resolution() {
        let source = fresh_store("ipfrs-replication-conflict-source");
        let target = fresh_store("ipfrs-replication-conflict-target");
        let block1 = Block::new(Bytes::from("source version")).unwrap();
        source.put(&block1).await.unwrap();
        // Pre-seed the target so that a full sync actually encounters a conflict.
        target.put(&block1).await.unwrap();
        let replicator = Replicator::new(source.clone(), target.clone());

        // KeepTarget reports the conflict but writes nothing.
        let kept = replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::KeepTarget))
            .await
            .unwrap();
        assert_eq!(kept.blocks_synced, 0);
        assert_eq!(kept.conflicts, 1);

        // Fail aborts at the first conflict.
        assert!(replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::Fail))
            .await
            .is_err());

        // KeepSource overwrites the target's copy.
        let result = replicator
            .sync(SyncStrategy::Full, Some(ConflictStrategy::KeepSource))
            .await
            .unwrap();
        assert_eq!(result.blocks_synced, 1);
        assert_eq!(result.conflicts, 1);
        assert_eq!(result.conflicting_cids, vec![*block1.cid()]);
    }

    #[tokio::test]
    async fn test_bidirectional_sync() {
        let source = fresh_store("ipfrs-replication-bidi-source");
        let target = fresh_store("ipfrs-replication-bidi-target");
        let only_source = Block::new(Bytes::from("only in source")).unwrap();
        let only_target = Block::new(Bytes::from("only in target")).unwrap();
        source.put(&only_source).await.unwrap();
        target.put(&only_target).await.unwrap();

        let replicator = Replicator::new(source.clone(), target.clone());
        let result = replicator
            .sync(SyncStrategy::Bidirectional, None)
            .await
            .unwrap();

        assert_eq!(result.blocks_synced, 2);
        assert_eq!(result.conflicts, 0);
        assert!(target.has(only_source.cid()).await.unwrap());
        assert!(source.has(only_target.cid()).await.unwrap());
    }
}