// ipfrs_storage/migration.rs

1//! Storage migration utilities
2//!
3//! This module provides utilities for migrating data between different storage backends,
4//! enabling seamless transitions in production deployments.
5
6use crate::traits::BlockStore;
7use ipfrs_core::{Cid, Result};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11
/// Statistics accumulated over a single migration run.
///
/// Throughput fields are zero until filled in by
/// `calculate_throughput`, which is done before the stats are returned.
#[derive(Debug, Clone, Default)]
pub struct MigrationStats {
    /// Total blocks migrated
    pub blocks_migrated: u64,
    /// Total bytes migrated
    pub bytes_migrated: u64,
    /// Number of blocks skipped (already present in destination)
    pub blocks_skipped: u64,
    /// Number of errors encountered (missing source blocks, failed puts,
    /// failed verifications)
    pub errors: u64,
    /// Migration duration (wall-clock, measured from start of the run)
    pub duration: Duration,
    /// Migration throughput in blocks per second
    pub blocks_per_second: f64,
    /// Migration throughput in bytes per second
    pub bytes_per_second: f64,
}
30
31impl MigrationStats {
32    /// Calculate throughput metrics
33    fn calculate_throughput(&mut self, duration: Duration) {
34        let seconds = duration.as_secs_f64();
35        if seconds > 0.0 {
36            self.blocks_per_second = self.blocks_migrated as f64 / seconds;
37            self.bytes_per_second = self.bytes_migrated as f64 / seconds;
38        }
39    }
40}
41
/// Migration configuration (see `Default` for the default values).
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Number of CIDs processed per bulk has/get/put round-trip
    pub batch_size: usize,
    /// Whether to skip blocks that already exist in destination
    pub skip_existing: bool,
    /// Whether to verify each block after migration (read-back presence check)
    pub verify: bool,
    /// Maximum number of concurrent operations
    // NOTE(review): this field is not consulted by the migration loop in this
    // file; batches are processed sequentially — confirm intended use.
    pub concurrency: usize,
}
54
55impl Default for MigrationConfig {
56    fn default() -> Self {
57        Self {
58            batch_size: 100,
59            skip_existing: true,
60            verify: false,
61            concurrency: 4,
62        }
63    }
64}
65
/// Progress callback type; invoked after each batch with
/// `(blocks_migrated_so_far, total_blocks)`.
pub type ProgressCallback = Arc<dyn Fn(u64, u64) + Send + Sync>;
68
/// Copies blocks from a source [`BlockStore`] into a destination store.
pub struct StorageMigrator<S: BlockStore, D: BlockStore> {
    // Store blocks are read from.
    source: Arc<S>,
    // Store blocks are written to.
    destination: Arc<D>,
    // Batching / skipping / verification options.
    config: MigrationConfig,
    // Optional per-batch progress reporter.
    progress_callback: Option<ProgressCallback>,
}
76
77impl<S: BlockStore, D: BlockStore> StorageMigrator<S, D> {
78    /// Create a new migrator
79    pub fn new(source: Arc<S>, destination: Arc<D>) -> Self {
80        Self {
81            source,
82            destination,
83            config: MigrationConfig::default(),
84            progress_callback: None,
85        }
86    }
87
88    /// Create with custom configuration
89    pub fn with_config(source: Arc<S>, destination: Arc<D>, config: MigrationConfig) -> Self {
90        Self {
91            source,
92            destination,
93            config,
94            progress_callback: None,
95        }
96    }
97
98    /// Set progress callback
99    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
100    where
101        F: Fn(u64, u64) + Send + Sync + 'static,
102    {
103        self.progress_callback = Some(Arc::new(callback));
104        self
105    }
106
107    /// Migrate all blocks from source to destination
108    pub async fn migrate_all(&self) -> Result<MigrationStats> {
109        let start = Instant::now();
110
111        let blocks_migrated = AtomicU64::new(0);
112        let bytes_migrated = AtomicU64::new(0);
113        let blocks_skipped = AtomicU64::new(0);
114        let errors = AtomicU64::new(0);
115
116        // Get all CIDs from source
117        let all_cids = self.source.list_cids()?;
118        let total_blocks = all_cids.len() as u64;
119
120        // Migrate in batches
121        for batch in all_cids.chunks(self.config.batch_size) {
122            // Check which blocks already exist in destination if skip_existing is enabled
123            let cids_to_migrate = if self.config.skip_existing {
124                let exists = self.destination.has_many(batch).await?;
125                batch
126                    .iter()
127                    .zip(exists.iter())
128                    .filter_map(|(cid, exists)| {
129                        if *exists {
130                            blocks_skipped.fetch_add(1, Ordering::Relaxed);
131                            None
132                        } else {
133                            Some(*cid)
134                        }
135                    })
136                    .collect::<Vec<_>>()
137            } else {
138                batch.to_vec()
139            };
140
141            if cids_to_migrate.is_empty() {
142                continue;
143            }
144
145            // Get blocks from source
146            let blocks_result = self.source.get_many(&cids_to_migrate).await?;
147
148            // Filter out None values and collect valid blocks
149            let mut valid_blocks = Vec::new();
150            for block_opt in blocks_result {
151                if let Some(block) = block_opt {
152                    bytes_migrated.fetch_add(block.data().len() as u64, Ordering::Relaxed);
153                    valid_blocks.push(block);
154                } else {
155                    errors.fetch_add(1, Ordering::Relaxed);
156                }
157            }
158
159            // Put blocks to destination
160            if !valid_blocks.is_empty() {
161                match self.destination.put_many(&valid_blocks).await {
162                    Ok(_) => {
163                        blocks_migrated.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);
164
165                        // Verify if enabled
166                        if self.config.verify {
167                            let cids: Vec<Cid> = valid_blocks.iter().map(|b| *b.cid()).collect();
168                            let verified = self.destination.has_many(&cids).await?;
169                            let failed = verified.iter().filter(|&&exists| !exists).count();
170                            if failed > 0 {
171                                errors.fetch_add(failed as u64, Ordering::Relaxed);
172                            }
173                        }
174                    }
175                    Err(_) => {
176                        errors.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);
177                    }
178                }
179            }
180
181            // Call progress callback
182            if let Some(ref callback) = self.progress_callback {
183                let migrated = blocks_migrated.load(Ordering::Relaxed);
184                callback(migrated, total_blocks);
185            }
186        }
187
188        let mut stats = MigrationStats {
189            blocks_migrated: blocks_migrated.load(Ordering::Relaxed),
190            bytes_migrated: bytes_migrated.load(Ordering::Relaxed),
191            blocks_skipped: blocks_skipped.load(Ordering::Relaxed),
192            errors: errors.load(Ordering::Relaxed),
193            duration: start.elapsed(),
194            blocks_per_second: 0.0,
195            bytes_per_second: 0.0,
196        };
197
198        stats.calculate_throughput(stats.duration);
199
200        Ok(stats)
201    }
202
203    /// Migrate specific CIDs
204    pub async fn migrate_cids(&self, cids: &[Cid]) -> Result<MigrationStats> {
205        let start = Instant::now();
206
207        let blocks_migrated = AtomicU64::new(0);
208        let bytes_migrated = AtomicU64::new(0);
209        let blocks_skipped = AtomicU64::new(0);
210        let errors = AtomicU64::new(0);
211
212        // Migrate in batches
213        for batch in cids.chunks(self.config.batch_size) {
214            // Check which blocks already exist
215            let cids_to_migrate = if self.config.skip_existing {
216                let exists = self.destination.has_many(batch).await?;
217                batch
218                    .iter()
219                    .zip(exists.iter())
220                    .filter_map(|(cid, exists)| {
221                        if *exists {
222                            blocks_skipped.fetch_add(1, Ordering::Relaxed);
223                            None
224                        } else {
225                            Some(*cid)
226                        }
227                    })
228                    .collect::<Vec<_>>()
229            } else {
230                batch.to_vec()
231            };
232
233            if cids_to_migrate.is_empty() {
234                continue;
235            }
236
237            // Get and migrate blocks
238            let blocks_result = self.source.get_many(&cids_to_migrate).await?;
239            let mut valid_blocks = Vec::new();
240
241            for block_opt in blocks_result {
242                if let Some(block) = block_opt {
243                    bytes_migrated.fetch_add(block.data().len() as u64, Ordering::Relaxed);
244                    valid_blocks.push(block);
245                } else {
246                    errors.fetch_add(1, Ordering::Relaxed);
247                }
248            }
249
250            if !valid_blocks.is_empty() {
251                match self.destination.put_many(&valid_blocks).await {
252                    Ok(_) => {
253                        blocks_migrated.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);
254                    }
255                    Err(_) => {
256                        errors.fetch_add(valid_blocks.len() as u64, Ordering::Relaxed);
257                    }
258                }
259            }
260        }
261
262        let mut stats = MigrationStats {
263            blocks_migrated: blocks_migrated.load(Ordering::Relaxed),
264            bytes_migrated: bytes_migrated.load(Ordering::Relaxed),
265            blocks_skipped: blocks_skipped.load(Ordering::Relaxed),
266            errors: errors.load(Ordering::Relaxed),
267            duration: start.elapsed(),
268            blocks_per_second: 0.0,
269            bytes_per_second: 0.0,
270        };
271
272        stats.calculate_throughput(stats.duration);
273
274        Ok(stats)
275    }
276}
277
278/// Helper function to migrate between stores
279pub async fn migrate_storage<S: BlockStore, D: BlockStore>(
280    source: Arc<S>,
281    destination: Arc<D>,
282) -> Result<MigrationStats> {
283    let migrator = StorageMigrator::new(source, destination);
284    migrator.migrate_all().await
285}
286
287/// Helper function to migrate with progress reporting
288pub async fn migrate_storage_with_progress<S: BlockStore, D: BlockStore, F>(
289    source: Arc<S>,
290    destination: Arc<D>,
291    progress_callback: F,
292) -> Result<MigrationStats>
293where
294    F: Fn(u64, u64) + Send + Sync + 'static,
295{
296    let migrator =
297        StorageMigrator::new(source, destination).with_progress_callback(progress_callback);
298    migrator.migrate_all().await
299}
300
301/// Migrate with custom batch size for optimal performance
302pub async fn migrate_storage_batched<S: BlockStore, D: BlockStore>(
303    source: Arc<S>,
304    destination: Arc<D>,
305    batch_size: usize,
306) -> Result<MigrationStats> {
307    let config = MigrationConfig {
308        batch_size,
309        ..Default::default()
310    };
311    let migrator = StorageMigrator::with_config(source, destination, config);
312    migrator.migrate_all().await
313}
314
315/// Migrate with verification enabled (slower but safer)
316pub async fn migrate_storage_verified<S: BlockStore, D: BlockStore>(
317    source: Arc<S>,
318    destination: Arc<D>,
319) -> Result<MigrationStats> {
320    let config = MigrationConfig {
321        verify: true,
322        ..Default::default()
323    };
324    let migrator = StorageMigrator::with_config(source, destination, config);
325    migrator.migrate_all().await
326}
327
/// Estimated migration time and space requirements, produced by
/// [`estimate_migration`]. Byte figures are extrapolated from a sample and
/// are approximate.
#[derive(Debug, Clone)]
pub struct MigrationEstimate {
    /// Total blocks to migrate
    pub total_blocks: usize,
    /// Total bytes to migrate (estimated from sampled average block size)
    pub total_bytes: u64,
    /// Estimated duration at 100 blocks/sec
    pub estimated_duration_low: Duration,
    /// Estimated duration at 1000 blocks/sec
    pub estimated_duration_high: Duration,
    /// Space required in destination (same estimate as `total_bytes`)
    pub space_required: u64,
}
342
343/// Estimate migration requirements
344pub async fn estimate_migration<S: BlockStore>(source: Arc<S>) -> Result<MigrationEstimate> {
345    let all_cids = source.list_cids()?;
346    let total_blocks = all_cids.len();
347
348    // Sample first 100 blocks to estimate average size
349    let sample_size = total_blocks.min(100);
350    let sample_cids: Vec<_> = all_cids.iter().take(sample_size).copied().collect();
351
352    let blocks = source.get_many(&sample_cids).await?;
353    let sample_bytes: u64 = blocks
354        .iter()
355        .filter_map(|b| b.as_ref())
356        .map(|b| b.data().len() as u64)
357        .sum();
358
359    let avg_block_size = if sample_size > 0 {
360        sample_bytes / sample_size as u64
361    } else {
362        0
363    };
364
365    let total_bytes = avg_block_size * total_blocks as u64;
366
367    // Estimate durations (conservative: 100 blocks/sec, optimistic: 1000 blocks/sec)
368    let estimated_duration_low = Duration::from_secs(total_blocks as u64 / 100);
369    let estimated_duration_high = Duration::from_secs(total_blocks as u64 / 1000);
370
371    Ok(MigrationEstimate {
372        total_blocks,
373        total_bytes,
374        estimated_duration_low,
375        estimated_duration_high,
376        space_required: total_bytes,
377    })
378}
379
380/// Migration validation - verify both stores have identical content
381pub async fn validate_migration<S: BlockStore, D: BlockStore>(
382    source: Arc<S>,
383    destination: Arc<D>,
384) -> Result<bool> {
385    let source_cids = source.list_cids()?;
386    let dest_cids = destination.list_cids()?;
387
388    // Check if same number of blocks
389    if source_cids.len() != dest_cids.len() {
390        return Ok(false);
391    }
392
393    // Check all source CIDs exist in destination
394    let exists = destination.has_many(&source_cids).await?;
395    Ok(exists.iter().all(|&e| e))
396}
397
// Unit tests exercising the migration helpers against in-memory stores.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;
    use bytes::Bytes;
    use ipfrs_core::Block;

    // Full migration into an empty destination: everything is copied.
    #[tokio::test]
    async fn test_basic_migration() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        // Add some blocks to source
        for i in 0..10 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        assert_eq!(source.len(), 10);
        assert_eq!(destination.len(), 0);

        // Migrate
        let stats = migrate_storage(source.clone(), destination.clone())
            .await
            .unwrap();

        assert_eq!(stats.blocks_migrated, 10);
        assert_eq!(stats.blocks_skipped, 0);
        assert_eq!(stats.errors, 0);
        assert_eq!(destination.len(), 10);
    }

    // Pre-populated destination: already-present blocks are skipped, not re-copied.
    #[tokio::test]
    async fn test_migration_skip_existing() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        // Add blocks to both stores
        let mut blocks = Vec::new();
        for i in 0..10 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            blocks.push(block);
        }

        // Add all to source
        for block in &blocks {
            source.put(block).await.unwrap();
        }

        // Add first 5 to destination
        for block in blocks.iter().take(5) {
            destination.put(block).await.unwrap();
        }

        // Migrate with skip_existing
        let config = MigrationConfig {
            skip_existing: true,
            ..Default::default()
        };
        let migrator = StorageMigrator::with_config(source, destination.clone(), config);
        let stats = migrator.migrate_all().await.unwrap();

        assert_eq!(stats.blocks_migrated, 5); // Only new blocks
        assert_eq!(stats.blocks_skipped, 5); // Existing blocks
        assert_eq!(destination.len(), 10);
    }

    // Progress callback is invoked at least once during the run.
    #[tokio::test]
    async fn test_migration_with_progress() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        // Add blocks
        for i in 0..20 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        // Count callback invocations via a shared atomic counter.
        let progress_called = Arc::new(AtomicU64::new(0));
        let progress_called_clone = progress_called.clone();

        let stats = migrate_storage_with_progress(source, destination, move |_current, _total| {
            progress_called_clone.fetch_add(1, Ordering::Relaxed);
        })
        .await
        .unwrap();

        assert_eq!(stats.blocks_migrated, 20);
        assert!(progress_called.load(Ordering::Relaxed) > 0);
    }

    // Custom batch size (10) still migrates all 50 blocks.
    #[tokio::test]
    async fn test_migrate_storage_batched() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        // Add blocks
        for i in 0..50 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        let stats = migrate_storage_batched(source, destination.clone(), 10)
            .await
            .unwrap();

        assert_eq!(stats.blocks_migrated, 50);
        assert_eq!(destination.len(), 50);
    }

    // Estimation reports the correct block count and non-zero byte figures.
    #[tokio::test]
    async fn test_estimate_migration() {
        let source = Arc::new(MemoryBlockStore::new());

        // Add blocks with unique data (so they have unique CIDs)
        for i in 0..100 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
        }

        let estimate = estimate_migration(source).await.unwrap();

        assert_eq!(estimate.total_blocks, 100);
        assert!(estimate.total_bytes > 0);
        assert!(estimate.space_required > 0);
    }

    // Validation passes for identical stores and fails once they diverge.
    #[tokio::test]
    async fn test_validate_migration() {
        let source = Arc::new(MemoryBlockStore::new());
        let destination = Arc::new(MemoryBlockStore::new());

        // Add same blocks to both
        for i in 0..10 {
            let block = Block::new(Bytes::from(format!("block {}", i))).unwrap();
            source.put(&block).await.unwrap();
            destination.put(&block).await.unwrap();
        }

        let valid = validate_migration(source.clone(), destination.clone())
            .await
            .unwrap();

        assert!(valid);

        // Add one more block to source only
        let extra_block = Block::new(Bytes::from("extra")).unwrap();
        source.put(&extra_block).await.unwrap();

        let valid = validate_migration(source, destination).await.unwrap();
        assert!(!valid);
    }
}