// chie_core/tier_migration.rs

//! Content migration between storage tiers.
//!
//! This module implements actual file operations for moving content between
//! storage tiers (Hot SSD → Warm HDD → Cold Archive), with proper error
//! handling, retry logic, and progress tracking.
//!
//! # Example
//!
//! ```rust
//! use chie_core::tier_migration::{TierMigration, MigrationConfig};
//! use chie_core::tiered_storage::{TieredStorageManager, TieredStorageConfig, StorageTier};
//! use std::sync::Arc;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let storage_config = TieredStorageConfig::default();
//! let storage = Arc::new(TieredStorageManager::new(storage_config));
//!
//! let config = MigrationConfig::default();
//! let migration = TierMigration::new(storage.clone(), config);
//!
//! // Execute pending migrations
//! let result = migration.execute_pending_migrations().await?;
//! println!("Migrated {} items ({} bytes)", result.successful, result.bytes_moved);
//! # Ok(())
//! # }
//! ```

use crate::tiered_storage::{PendingMove, StorageTier, TieredStorageManager};
use serde::{Deserialize, Serialize};
use std::io;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::fs;
use tokio::time::Duration;
use tracing::{debug, error, info, warn};

/// Configuration for tier migration.
///
/// All fields are public, so a value can be built field by field or taken
/// from `MigrationConfig::default()` and adjusted.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Maximum number of migrations allowed to run concurrently.
    pub max_concurrent: usize,
    /// Timeout for a single migration, in seconds.
    pub migration_timeout_secs: u64,
    /// Maximum number of retries for a failed migration.
    pub max_retries: u32,
    /// Delay between retries, in milliseconds.
    pub retry_delay_ms: u64,
    /// Whether the copied file is verified before the source is removed.
    pub verify_after_move: bool,
    /// Whether to keep the source file after a failed verification.
    ///
    /// NOTE(review): the migration routine reads this flag to decide whether
    /// the *target* copy is deleted after a failed verification; the source is
    /// not removed on that path either way. Confirm the intended meaning.
    pub keep_source_on_error: bool,
    /// Minimum free space required in the target tier, in bytes.
    pub min_free_space: u64,
}

57impl Default for MigrationConfig {
58    fn default() -> Self {
59        Self {
60            max_concurrent: 4,
61            migration_timeout_secs: 300, // 5 minutes
62            max_retries: 3,
63            retry_delay_ms: 1000, // 1 second
64            verify_after_move: true,
65            keep_source_on_error: true,
66            min_free_space: 1024 * 1024 * 1024, // 1 GB
67        }
68    }
69}
70
71/// Migration status for a single content item.
72#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
73pub enum MigrationStatus {
74    /// Migration pending.
75    Pending,
76    /// Migration in progress.
77    InProgress,
78    /// Migration completed successfully.
79    Completed,
80    /// Migration failed.
81    Failed(String),
82    /// Migration cancelled.
83    Cancelled,
84}
85
86/// A migration task with tracking.
87#[derive(Debug, Clone)]
88pub struct MigrationTask {
89    /// Content CID.
90    pub cid: String,
91    /// Source tier.
92    pub from: StorageTier,
93    /// Target tier.
94    pub to: StorageTier,
95    /// Content size.
96    pub size: u64,
97    /// Current status.
98    pub status: MigrationStatus,
99    /// Retry count.
100    pub retries: u32,
101    /// Created timestamp (Unix seconds).
102    pub created_at: u64,
103    /// Last update timestamp.
104    pub updated_at: u64,
105}
106
107/// Result of a migration operation.
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct MigrationResult {
110    /// Number of successful migrations.
111    pub successful: usize,
112    /// Number of failed migrations.
113    pub failed: usize,
114    /// Number of cancelled migrations.
115    pub cancelled: usize,
116    /// Total bytes moved.
117    pub bytes_moved: u64,
118    /// Total time taken (milliseconds).
119    pub duration_ms: u64,
120    /// Average migration speed (MB/s).
121    pub avg_speed_mbps: f64,
122}
123
124/// Tier migration manager.
125pub struct TierMigration {
126    /// Reference to tiered storage manager.
127    storage: Arc<TieredStorageManager>,
128    /// Migration configuration.
129    config: MigrationConfig,
130}
131
132impl TierMigration {
133    /// Create a new tier migration manager.
134    #[must_use]
135    pub fn new(storage: Arc<TieredStorageManager>, config: MigrationConfig) -> Self {
136        Self { storage, config }
137    }
138
139    /// Execute pending migrations from storage manager.
140    ///
141    /// This reads pending moves from the storage manager and executes them
142    /// with proper error handling and retry logic.
143    pub async fn execute_pending_migrations(&self) -> Result<MigrationResult, MigrationError> {
144        let pending = self.storage.get_pending_moves();
145        if pending.is_empty() {
146            return Ok(MigrationResult {
147                successful: 0,
148                failed: 0,
149                cancelled: 0,
150                bytes_moved: 0,
151                duration_ms: 0,
152                avg_speed_mbps: 0.0,
153            });
154        }
155
156        info!("Starting migration of {} pending items", pending.len());
157        let start = Instant::now();
158
159        let mut tasks: Vec<MigrationTask> =
160            pending.into_iter().map(|pm| self.create_task(pm)).collect();
161
162        // Execute migrations with concurrency limit
163        let mut successful = 0;
164        let mut failed = 0;
165        let cancelled = 0;
166        let mut bytes_moved = 0u64;
167
168        let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent));
169
170        let mut handles = Vec::new();
171        for task in tasks.iter_mut() {
172            let permit = semaphore.clone().acquire_owned().await.unwrap();
173            let storage = self.storage.clone();
174            let config = self.config.clone();
175            let mut task_clone = task.clone();
176
177            let handle = tokio::spawn(async move {
178                let result = execute_migration(&storage, &config, &mut task_clone).await;
179                drop(permit);
180                result
181            });
182
183            handles.push((handle, task));
184        }
185
186        // Collect results
187        for (handle, task) in handles {
188            match handle.await {
189                Ok(Ok(size)) => {
190                    successful += 1;
191                    bytes_moved += size;
192                    task.status = MigrationStatus::Completed;
193                }
194                Ok(Err(e)) => {
195                    failed += 1;
196                    task.status = MigrationStatus::Failed(e.to_string());
197                    warn!("Migration failed for {}: {}", task.cid, e);
198                }
199                Err(e) => {
200                    failed += 1;
201                    task.status = MigrationStatus::Failed(format!("Task panic: {}", e));
202                    error!("Migration task panicked for {}: {}", task.cid, e);
203                }
204            }
205        }
206
207        let duration = start.elapsed();
208        let duration_ms = duration.as_millis() as u64;
209        let avg_speed_mbps = if duration_ms > 0 {
210            (bytes_moved as f64 / 1_000_000.0) / (duration_ms as f64 / 1000.0)
211        } else {
212            0.0
213        };
214
215        info!(
216            "Migration complete: {} successful, {} failed, {} MB moved in {} ms ({:.2} MB/s)",
217            successful,
218            failed,
219            bytes_moved / 1_000_000,
220            duration_ms,
221            avg_speed_mbps
222        );
223
224        Ok(MigrationResult {
225            successful,
226            failed,
227            cancelled,
228            bytes_moved,
229            duration_ms,
230            avg_speed_mbps,
231        })
232    }
233
234    /// Migrate a single content item.
235    pub async fn migrate_content(
236        &self,
237        cid: &str,
238        target_tier: StorageTier,
239    ) -> Result<u64, MigrationError> {
240        let location = self
241            .storage
242            .get_location(cid)
243            .ok_or_else(|| MigrationError::ContentNotFound(cid.to_string()))?;
244
245        if location.tier == target_tier {
246            return Err(MigrationError::AlreadyInTargetTier(cid.to_string()));
247        }
248
249        let mut task = MigrationTask {
250            cid: cid.to_string(),
251            from: location.tier,
252            to: target_tier,
253            size: location.size,
254            status: MigrationStatus::Pending,
255            retries: 0,
256            created_at: current_timestamp(),
257            updated_at: current_timestamp(),
258        };
259
260        execute_migration(&self.storage, &self.config, &mut task).await
261    }
262
263    /// Cancel pending migrations.
264    #[must_use]
265    #[inline]
266    pub fn cancel_pending(&self) -> usize {
267        // Note: In a real implementation, this would cancel in-flight migrations
268        // For now, we just return the count of pending moves
269        self.storage.get_pending_moves().len()
270    }
271
272    /// Get migration statistics.
273    #[must_use]
274    #[inline]
275    pub fn config(&self) -> &MigrationConfig {
276        &self.config
277    }
278
279    /// Create a migration task from pending move.
280    #[must_use]
281    pub fn create_task(&self, pm: PendingMove) -> MigrationTask {
282        let now = current_timestamp();
283        MigrationTask {
284            cid: pm.cid,
285            from: pm.from,
286            to: pm.to,
287            size: pm.size,
288            status: MigrationStatus::Pending,
289            retries: 0,
290            created_at: now,
291            updated_at: now,
292        }
293    }
294}
295
296/// Execute a single migration with retry logic.
297async fn execute_migration(
298    storage: &TieredStorageManager,
299    config: &MigrationConfig,
300    task: &mut MigrationTask,
301) -> Result<u64, MigrationError> {
302    task.status = MigrationStatus::InProgress;
303    task.updated_at = current_timestamp();
304
305    let timeout = Duration::from_secs(config.migration_timeout_secs);
306    let migration_future = perform_migration(storage, config, task);
307
308    match tokio::time::timeout(timeout, migration_future).await {
309        Ok(result) => result,
310        Err(_) => {
311            task.status = MigrationStatus::Failed("Timeout".to_string());
312            Err(MigrationError::Timeout(task.cid.clone()))
313        }
314    }
315}
316
317/// Perform the actual file migration.
318async fn perform_migration(
319    storage: &TieredStorageManager,
320    config: &MigrationConfig,
321    task: &mut MigrationTask,
322) -> Result<u64, MigrationError> {
323    let source_path = storage
324        .get_content_path(&task.cid)
325        .ok_or_else(|| MigrationError::SourcePathNotFound(task.cid.clone()))?;
326
327    let target_path = get_target_path(storage, &task.cid, task.to)?;
328
329    // Check if source exists
330    if !source_path.exists() {
331        return Err(MigrationError::SourceFileNotFound(source_path));
332    }
333
334    // Create target directory if needed
335    if let Some(parent) = target_path.parent() {
336        fs::create_dir_all(parent)
337            .await
338            .map_err(|e| MigrationError::IoError(format!("Failed to create target dir: {}", e)))?;
339    }
340
341    // Check free space
342    if !has_free_space(&target_path, task.size, config.min_free_space).await {
343        return Err(MigrationError::InsufficientSpace(task.to));
344    }
345
346    debug!(
347        "Migrating {} from {:?} to {:?} ({} bytes)",
348        task.cid, task.from, task.to, task.size
349    );
350
351    // Copy file to target
352    fs::copy(&source_path, &target_path)
353        .await
354        .map_err(|e| MigrationError::IoError(format!("Copy failed: {}", e)))?;
355
356    // Verify if enabled
357    if config.verify_after_move && !verify_migration(&source_path, &target_path).await? {
358        if !config.keep_source_on_error {
359            let _ = fs::remove_file(&target_path).await;
360        }
361        return Err(MigrationError::VerificationFailed(task.cid.clone()));
362    }
363
364    // Remove source file
365    fs::remove_file(&source_path)
366        .await
367        .map_err(|e| MigrationError::IoError(format!("Remove source failed: {}", e)))?;
368
369    // Update storage manager
370    storage.execute_move(&task.cid, task.to);
371
372    task.status = MigrationStatus::Completed;
373    task.updated_at = current_timestamp();
374
375    info!(
376        "Successfully migrated {} from {:?} to {:?}",
377        task.cid, task.from, task.to
378    );
379
380    Ok(task.size)
381}
382
383/// Get target path for migration.
384fn get_target_path(
385    storage: &TieredStorageManager,
386    cid: &str,
387    target_tier: StorageTier,
388) -> Result<PathBuf, MigrationError> {
389    let tier_path = storage
390        .get_tier_path(target_tier)
391        .ok_or_else(|| MigrationError::TargetPathNotFound(cid.to_string()))?;
392
393    Ok(tier_path.join(cid))
394}
395
/// Check if the target has sufficient free space.
///
/// Placeholder: no disk query is performed and all three arguments are
/// ignored, so this always returns `true`. A real implementation needs a
/// platform-specific API to read the free space of the file system that
/// holds `path`.
async fn has_free_space(path: &Path, required: u64, min_free: u64) -> bool {
    // Note: This is a simplified check
    // In production, use platform-specific APIs to check disk space
    // For now, assume sufficient space
    let _ = (path, required, min_free);
    true
}

405/// Verify migration by comparing file sizes.
406async fn verify_migration(source: &Path, target: &Path) -> Result<bool, MigrationError> {
407    let source_metadata = fs::metadata(source)
408        .await
409        .map_err(|e| MigrationError::IoError(format!("Read source metadata: {}", e)))?;
410
411    let target_metadata = fs::metadata(target)
412        .await
413        .map_err(|e| MigrationError::IoError(format!("Read target metadata: {}", e)))?;
414
415    Ok(source_metadata.len() == target_metadata.len())
416}
417
/// Get the current Unix timestamp in whole seconds.
///
/// Returns 0 if the system clock reports a time before the Unix epoch.
fn current_timestamp() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0)
}

426/// Migration errors.
427#[derive(Debug, thiserror::Error)]
428pub enum MigrationError {
429    /// Content not found.
430    #[error("Content not found: {0}")]
431    ContentNotFound(String),
432
433    /// Already in target tier.
434    #[error("Content {0} is already in target tier")]
435    AlreadyInTargetTier(String),
436
437    /// Source path not found.
438    #[error("Source path not found for content: {0}")]
439    SourcePathNotFound(String),
440
441    /// Target path not found.
442    #[error("Target path not found for content: {0}")]
443    TargetPathNotFound(String),
444
445    /// Source file not found.
446    #[error("Source file not found: {0}")]
447    SourceFileNotFound(PathBuf),
448
449    /// Insufficient space in target tier.
450    #[error("Insufficient space in target tier: {0:?}")]
451    InsufficientSpace(StorageTier),
452
453    /// Migration timeout.
454    #[error("Migration timeout for content: {0}")]
455    Timeout(String),
456
457    /// Verification failed.
458    #[error("Migration verification failed for content: {0}")]
459    VerificationFailed(String),
460
461    /// IO error.
462    #[error("IO error: {0}")]
463    IoError(String),
464}
465
466impl From<io::Error> for MigrationError {
467    fn from(e: io::Error) -> Self {
468        MigrationError::IoError(e.to_string())
469    }
470}
471
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiered_storage::TieredStorageConfig;

    /// The default configuration exposes the documented stock values.
    #[test]
    fn test_migration_config_default() {
        let config = MigrationConfig::default();
        assert_eq!(config.max_concurrent, 4);
        assert_eq!(config.migration_timeout_secs, 300);
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_delay_ms, 1000);
        assert!(config.verify_after_move);
        assert!(config.keep_source_on_error);
        assert_eq!(config.min_free_space, 1024 * 1024 * 1024);
    }

    /// A task built from a pending move copies its fields and starts pending.
    #[test]
    fn test_migration_task_creation() {
        let pm = PendingMove {
            cid: "QmTest123".to_string(),
            from: StorageTier::Warm,
            to: StorageTier::Hot,
            size: 1024,
            priority: 10,
        };

        let storage_config = TieredStorageConfig::default();
        let storage = Arc::new(TieredStorageManager::new(storage_config));
        let migration = TierMigration::new(storage, MigrationConfig::default());

        let task = migration.create_task(pm);
        assert_eq!(task.cid, "QmTest123");
        assert_eq!(task.from, StorageTier::Warm);
        assert_eq!(task.to, StorageTier::Hot);
        assert_eq!(task.size, 1024);
        assert_eq!(task.status, MigrationStatus::Pending);
        assert_eq!(task.retries, 0);
        assert_eq!(task.created_at, task.updated_at);
    }

    /// Status values compare by variant and payload.
    #[test]
    fn test_migration_status() {
        assert_eq!(MigrationStatus::Pending, MigrationStatus::Pending);
        assert_ne!(
            MigrationStatus::Completed,
            MigrationStatus::Failed("error".to_string())
        );
    }
}