use crate::traits::BlockStore;
use ipfrs_core::{Cid, Result};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

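/// Statistics collected during a migration run: counts of migrated, skipped,
/// and failed blocks, total bytes moved, elapsed time, and derived throughput.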
#[derive(Debug, Clone, Default)]
pub struct MigrationStats {
    pub blocks_migrated: u64,
    pub bytes_migrated: u64,
    pub blocks_skipped: u64,
    pub errors: u64,
    pub duration: Duration,
    pub blocks_per_second: f64,
    pub bytes_per_second: f64,
}

impl MigrationStats {
    fn calculate_throughput(&mut self, duration: Duration) {
        let seconds = duration.as_secs_f64();
        if seconds > 0.0 {
            self.blocks_per_second = self.blocks_migrated as f64 / seconds;
            self.bytes_per_second = self.bytes_migrated as f64 / seconds;
        }
    }
}

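/// Configuration for a migration run.
///
/// `batch_size` controls how many CIDs are processed per round trip,
/// `skip_existing` avoids re-copying blocks the destination already has, and
/// `verify` re-checks the destination after each write. `concurrency` is
/// carried in the config, but the loops in this module currently process
/// batches sequentially.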
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    pub batch_size: usize,
    pub skip_existing: bool,
    pub verify: bool,
    pub concurrency: usize,
}

impl Default for MigrationConfig {
    fn default() -> Self {
        Self {
            batch_size: 100,
            skip_existing: true,
            verify: false,
            concurrency: 4,
        }
    }
}

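/// Progress callback invoked after each batch with
/// `(blocks_migrated_so_far, total_blocks)`.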
pub type ProgressCallback = Arc<dyn Fn(u64, u64) + Send + Sync>;

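/// Copies blocks from a source [`BlockStore`] into a destination [`BlockStore`]
/// in batches, optionally skipping blocks that already exist and verifying writes.
///
/// A minimal usage sketch (assumes the crate's `MemoryBlockStore` and a tokio
/// runtime; any pair of `BlockStore` implementations works the same way):
///
/// ```ignore
/// use std::sync::Arc;
///
/// let source = Arc::new(MemoryBlockStore::new());
/// let destination = Arc::new(MemoryBlockStore::new());
///
/// let migrator = StorageMigrator::new(source, destination)
///     .with_progress_callback(|done, total| println!("{done}/{total} blocks"));
/// let stats = migrator.migrate_all().await?;
/// println!("migrated {} blocks in {:?}", stats.blocks_migrated, stats.duration);
/// ```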
pub struct StorageMigrator<S: BlockStore, D: BlockStore> {
    source: Arc<S>,
    destination: Arc<D>,
    config: MigrationConfig,
    progress_callback: Option<ProgressCallback>,
}

impl<S: BlockStore, D: BlockStore> StorageMigrator<S, D> {
    pub fn new(source: Arc<S>, destination: Arc<D>) -> Self {
        Self {
            source,
            destination,
            config: MigrationConfig::default(),
            progress_callback: None,
        }
    }

    pub fn with_config(source: Arc<S>, destination: Arc<D>, config: MigrationConfig) -> Self {
        Self {
            source,
            destination,
            config,
            progress_callback: None,
        }
    }

    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: Fn(u64, u64) + Send + Sync + 'static,
    {
        self.progress_callback = Some(Arc::new(callback));
        self
    }

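    /// Migrates every block reachable via `source.list_cids()` into the
    /// destination, honouring the configured batch size, skip-existing, and
    /// verify options, and reporting progress after each batch.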
    pub async fn migrate_all(&self) -> Result<MigrationStats> {
        let start = Instant::now();

        let blocks_migrated = AtomicU64::new(0);
        let bytes_migrated = AtomicU64::new(0);
        let blocks_skipped = AtomicU64::new(0);
        let errors = AtomicU64::new(0);

        let all_cids = self.source.list_cids()?;
        let total_blocks = all_cids.len() as u64;

        for batch in all_cids.chunks(self.config.batch_size) {
            let cids_to_migrate = if self.config.skip_existing {
                let exists = self.destination.has_many(batch).await?;
                batch
                    .iter()
                    .zip(exists.iter())
                    .filter_map(|(cid, exists)| {
                        if *exists {
                            blocks_skipped.fetch_add(1, Ordering::Relaxed);
                            None
                        } else {
                            Some(*cid)
                        }
                    })
                    .collect::<Vec<_>>()
            } else {
                batch.to_vec()
            };

            if cids_to_migrate.is_empty() {
                continue;
            }

            let blocks_result = self.source.get_many(&cids_to_migrate).await?;

            let mut valid_blocks = Vec::new();
            for block_opt in blocks_result {
                if let Some(block) = block_opt {
                    bytes_migrated.fetch_add(block.data().len() as u64, Ordering::Relaxed);
                    valid_blocks.push(block);
                } else {
                    errors.fetch_add(1, Ordering::Relaxed);
                }
            }

            if !valid_blocks.is_empty() {
                match self.destination.put_many(&valid_blocks).await {
                    Ok(_) => {
                        blocks_migrated.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);

                        if self.config.verify {
                            let cids: Vec<Cid> = valid_blocks.iter().map(|b| *b.cid()).collect();
                            let verified = self.destination.has_many(&cids).await?;
                            let failed = verified.iter().filter(|&&exists| !exists).count();
                            if failed > 0 {
                                errors.fetch_add(failed as u64, Ordering::Relaxed);
                            }
                        }
                    }
                    Err(_) => {
                        errors.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);
                    }
                }
            }

            if let Some(ref callback) = self.progress_callback {
                let migrated = blocks_migrated.load(Ordering::Relaxed);
                callback(migrated, total_blocks);
            }
        }

        let mut stats = MigrationStats {
            blocks_migrated: blocks_migrated.load(Ordering::Relaxed),
            bytes_migrated: bytes_migrated.load(Ordering::Relaxed),
            blocks_skipped: blocks_skipped.load(Ordering::Relaxed),
            errors: errors.load(Ordering::Relaxed),
            duration: start.elapsed(),
            blocks_per_second: 0.0,
            bytes_per_second: 0.0,
        };

        stats.calculate_throughput(stats.duration);

        Ok(stats)
    }

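    /// Migrates only the given CIDs, using the same batching and
    /// skip-existing logic as [`migrate_all`](Self::migrate_all) but without
    /// the post-write verification step.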
    pub async fn migrate_cids(&self, cids: &[Cid]) -> Result<MigrationStats> {
        let start = Instant::now();

        let blocks_migrated = AtomicU64::new(0);
        let bytes_migrated = AtomicU64::new(0);
        let blocks_skipped = AtomicU64::new(0);
        let errors = AtomicU64::new(0);

        for batch in cids.chunks(self.config.batch_size) {
            let cids_to_migrate = if self.config.skip_existing {
                let exists = self.destination.has_many(batch).await?;
                batch
                    .iter()
                    .zip(exists.iter())
                    .filter_map(|(cid, exists)| {
                        if *exists {
                            blocks_skipped.fetch_add(1, Ordering::Relaxed);
                            None
                        } else {
                            Some(*cid)
                        }
                    })
                    .collect::<Vec<_>>()
            } else {
                batch.to_vec()
            };

            if cids_to_migrate.is_empty() {
                continue;
            }

            let blocks_result = self.source.get_many(&cids_to_migrate).await?;
            let mut valid_blocks = Vec::new();

            for block_opt in blocks_result {
                if let Some(block) = block_opt {
                    bytes_migrated.fetch_add(block.data().len() as u64, Ordering::Relaxed);
                    valid_blocks.push(block);
                } else {
                    errors.fetch_add(1, Ordering::Relaxed);
                }
            }

            if !valid_blocks.is_empty() {
                match self.destination.put_many(&valid_blocks).await {
                    Ok(_) => {
                        blocks_migrated.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);
                    }
                    Err(_) => {
                        errors.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);
                    }
                }
            }
        }

        let mut stats = MigrationStats {
            blocks_migrated: blocks_migrated.load(Ordering::Relaxed),
            bytes_migrated: bytes_migrated.load(Ordering::Relaxed),
            blocks_skipped: blocks_skipped.load(Ordering::Relaxed),
            errors: errors.load(Ordering::Relaxed),
            duration: start.elapsed(),
            blocks_per_second: 0.0,
            bytes_per_second: 0.0,
        };

        stats.calculate_throughput(stats.duration);

        Ok(stats)
    }
}

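/// Convenience helper: migrates all blocks with the default configuration.
///
/// A minimal calling sketch (assumes a tokio runtime and two existing stores
/// wrapped in `Arc`):
///
/// ```ignore
/// let stats = migrate_storage(source.clone(), destination.clone()).await?;
/// assert_eq!(stats.errors, 0);
/// ```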
pub async fn migrate_storage<S: BlockStore, D: BlockStore>(
    source: Arc<S>,
    destination: Arc<D>,
) -> Result<MigrationStats> {
    let migrator = StorageMigrator::new(source, destination);
    migrator.migrate_all().await
}

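/// Convenience helper: migrates all blocks and invokes `progress_callback`
/// with `(migrated, total)` after each batch.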
pub async fn migrate_storage_with_progress<S: BlockStore, D: BlockStore, F>(
    source: Arc<S>,
    destination: Arc<D>,
    progress_callback: F,
) -> Result<MigrationStats>
where
    F: Fn(u64, u64) + Send + Sync + 'static,
{
    let migrator =
        StorageMigrator::new(source, destination).with_progress_callback(progress_callback);
    migrator.migrate_all().await
}

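/// Convenience helper: migrates all blocks using a caller-supplied batch size
/// instead of the default of 100.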
pub async fn migrate_storage_batched<S: BlockStore, D: BlockStore>(
    source: Arc<S>,
    destination: Arc<D>,
    batch_size: usize,
) -> Result<MigrationStats> {
    let config = MigrationConfig {
        batch_size,
        ..Default::default()
    };
    let migrator = StorageMigrator::with_config(source, destination, config);
    migrator.migrate_all().await
}

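/// Convenience helper: migrates all blocks with post-write verification
/// enabled, so each written batch is re-checked against the destination.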
pub async fn migrate_storage_verified<S: BlockStore, D: BlockStore>(
    source: Arc<S>,
    destination: Arc<D>,
) -> Result<MigrationStats> {
    let config = MigrationConfig {
        verify: true,
        ..Default::default()
    };
    let migrator = StorageMigrator::with_config(source, destination, config);
    migrator.migrate_all().await
}

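/// Rough, up-front estimate of a migration: block and byte totals plus a
/// low/high duration range derived from assumed throughput.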
#[derive(Debug, Clone)]
pub struct MigrationEstimate {
    pub total_blocks: usize,
    pub total_bytes: u64,
    pub estimated_duration_low: Duration,
    pub estimated_duration_high: Duration,
    pub space_required: u64,
}

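/// Estimates migration size and duration by sampling up to 100 blocks to get
/// an average block size. The duration range assumes roughly 100-1000 blocks
/// per second; actual throughput depends on the backing stores, so treat the
/// numbers as a rough guide only.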
pub async fn estimate_migration<S: BlockStore>(source: Arc<S>) -> Result<MigrationEstimate> {
    let all_cids = source.list_cids()?;
    let total_blocks = all_cids.len();

    let sample_size = total_blocks.min(100);
    let sample_cids: Vec<_> = all_cids.iter().take(sample_size).copied().collect();

    let blocks = source.get_many(&sample_cids).await?;
    let sample_bytes: u64 = blocks
        .iter()
        .filter_map(|b| b.as_ref())
        .map(|b| b.data().len() as u64)
        .sum();

    let avg_block_size = if sample_size > 0 {
        sample_bytes / sample_size as u64
    } else {
        0
    };

    let total_bytes = avg_block_size * total_blocks as u64;

    // Optimistic bound assumes ~1000 blocks/s, pessimistic bound ~100 blocks/s,
    // so the "low" estimate is always the shorter duration.
    let estimated_duration_low = Duration::from_secs(total_blocks as u64 / 1000);
    let estimated_duration_high = Duration::from_secs(total_blocks as u64 / 100);

    Ok(MigrationEstimate {
        total_blocks,
        total_bytes,
        estimated_duration_low,
        estimated_duration_high,
        space_required: total_bytes,
    })
}

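/// Checks that a migration looks complete: the two stores hold the same
/// number of CIDs and every source CID is present in the destination.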
pub async fn validate_migration<S: BlockStore, D: BlockStore>(
    source: Arc<S>,
    destination: Arc<D>,
) -> Result<bool> {
    let source_cids = source.list_cids()?;
    let dest_cids = destination.list_cids()?;

    if source_cids.len() != dest_cids.len() {
        return Ok(false);
    }

    let exists = destination.has_many(&source_cids).await?;
    Ok(exists.iter().all(|&e| e))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;
    use bytes::Bytes;
    use ipfrs_core::Block;

    #[tokio::test]
    async fn test_basic_migration() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        for i in 0..10 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        assert_eq!(source.len(), 10);
        assert_eq!(destination.len(), 0);

        let stats = migrate_storage(source.clone(), destination.clone())
            .await
            .unwrap();

        assert_eq!(stats.blocks_migrated, 10);
        assert_eq!(stats.blocks_skipped, 0);
        assert_eq!(stats.errors, 0);
        assert_eq!(destination.len(), 10);
    }

    #[tokio::test]
    async fn test_migration_skip_existing() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        let mut blocks = Vec::new();
        for i in 0..10 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            blocks.push(block);
        }

        for block in &blocks {
            source.put(block).await.unwrap();
        }

        for block in blocks.iter().take(5) {
            destination.put(block).await.unwrap();
        }

        let config = MigrationConfig {
            skip_existing: true,
            ..Default::default()
        };
        let migrator = StorageMigrator::with_config(source, destination.clone(), config);
        let stats = migrator.migrate_all().await.unwrap();

        assert_eq!(stats.blocks_migrated, 5);
        assert_eq!(stats.blocks_skipped, 5);
        assert_eq!(destination.len(), 10);
    }

    #[tokio::test]
    async fn test_migration_with_progress() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        for i in 0..20 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        let progress_called = Arc::new(AtomicU64::new(0));
        let progress_called_clone = progress_called.clone();

        let stats = migrate_storage_with_progress(source, destination, move |_current, _total| {
            progress_called_clone.fetch_add(1, Ordering::Relaxed);
        })
        .await
        .unwrap();

        assert_eq!(stats.blocks_migrated, 20);
        assert!(progress_called.load(Ordering::Relaxed) > 0);
    }

    #[tokio::test]
    async fn test_migrate_storage_batched() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        for i in 0..50 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        let stats = migrate_storage_batched(source, destination.clone(), 10)
            .await
            .unwrap();

        assert_eq!(stats.blocks_migrated, 50);
        assert_eq!(destination.len(), 50);
    }

    #[tokio::test]
    async fn test_estimate_migration() {
        let source = Arc::new(MemoryBlockStore::new());

        for i in 0..100 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        let estimate = estimate_migration(source).await.unwrap();

        assert_eq!(estimate.total_blocks, 100);
        assert!(estimate.total_bytes > 0);
        assert!(estimate.space_required > 0);
    }

    #[tokio::test]
    async fn test_validate_migration() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        for i in 0..10 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
            destination.put(&block).await.unwrap();
        }

        let valid = validate_migration(source.clone(), destination.clone())
            .await
            .unwrap();

        assert!(valid);

        let extra_block = Block::new(Bytes::from("extra")).unwrap();
        source.put(&extra_block).await.unwrap();

        let valid = validate_migration(source, destination).await.unwrap();
        assert!(!valid);
    }
}