use crate::tiered_storage::{PendingMove, StorageTier, TieredStorageManager};
use serde::{Deserialize, Serialize};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::fs;
use tokio::time::Duration;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone)]
pub struct MigrationConfig {
pub max_concurrent: usize,
pub migration_timeout_secs: u64,
pub max_retries: u32,
pub retry_delay_ms: u64,
pub verify_after_move: bool,
pub keep_source_on_error: bool,
pub min_free_space: u64,
}
impl Default for MigrationConfig {
fn default() -> Self {
Self {
max_concurrent: 4,
migration_timeout_secs: 300, max_retries: 3,
retry_delay_ms: 1000, verify_after_move: true,
keep_source_on_error: true,
min_free_space: 1024 * 1024 * 1024, }
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum MigrationStatus {
Pending,
InProgress,
Completed,
Failed(String),
Cancelled,
}
#[derive(Debug, Clone)]
pub struct MigrationTask {
pub cid: String,
pub from: StorageTier,
pub to: StorageTier,
pub size: u64,
pub status: MigrationStatus,
pub retries: u32,
pub created_at: u64,
pub updated_at: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationResult {
pub successful: usize,
pub failed: usize,
pub cancelled: usize,
pub bytes_moved: u64,
pub duration_ms: u64,
pub avg_speed_mbps: f64,
}
pub struct TierMigration {
storage: Arc<TieredStorageManager>,
config: MigrationConfig,
}
impl TierMigration {
#[must_use]
pub fn new(storage: Arc<TieredStorageManager>, config: MigrationConfig) -> Self {
Self { storage, config }
}
pub async fn execute_pending_migrations(&self) -> Result<MigrationResult, MigrationError> {
let pending = self.storage.get_pending_moves();
if pending.is_empty() {
return Ok(MigrationResult {
successful: 0,
failed: 0,
cancelled: 0,
bytes_moved: 0,
duration_ms: 0,
avg_speed_mbps: 0.0,
});
}
info!("Starting migration of {} pending items", pending.len());
let start = Instant::now();
let mut tasks: Vec<MigrationTask> =
pending.into_iter().map(|pm| self.create_task(pm)).collect();
let mut successful = 0;
let mut failed = 0;
let cancelled = 0;
let mut bytes_moved = 0u64;
let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent));
let mut handles = Vec::new();
for task in tasks.iter_mut() {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let storage = self.storage.clone();
let config = self.config.clone();
let mut task_clone = task.clone();
let handle = tokio::spawn(async move {
let result = execute_migration(&storage, &config, &mut task_clone).await;
drop(permit);
result
});
handles.push((handle, task));
}
for (handle, task) in handles {
match handle.await {
Ok(Ok(size)) => {
successful += 1;
bytes_moved += size;
task.status = MigrationStatus::Completed;
}
Ok(Err(e)) => {
failed += 1;
task.status = MigrationStatus::Failed(e.to_string());
warn!("Migration failed for {}: {}", task.cid, e);
}
Err(e) => {
failed += 1;
task.status = MigrationStatus::Failed(format!("Task panic: {}", e));
error!("Migration task panicked for {}: {}", task.cid, e);
}
}
}
let duration = start.elapsed();
let duration_ms = duration.as_millis() as u64;
let avg_speed_mbps = if duration_ms > 0 {
(bytes_moved as f64 / 1_000_000.0) / (duration_ms as f64 / 1000.0)
} else {
0.0
};
info!(
"Migration complete: {} successful, {} failed, {} MB moved in {} ms ({:.2} MB/s)",
successful,
failed,
bytes_moved / 1_000_000,
duration_ms,
avg_speed_mbps
);
Ok(MigrationResult {
successful,
failed,
cancelled,
bytes_moved,
duration_ms,
avg_speed_mbps,
})
}
pub async fn migrate_content(
&self,
cid: &str,
target_tier: StorageTier,
) -> Result<u64, MigrationError> {
let location = self
.storage
.get_location(cid)
.ok_or_else(|| MigrationError::ContentNotFound(cid.to_string()))?;
if location.tier == target_tier {
return Err(MigrationError::AlreadyInTargetTier(cid.to_string()));
}
let mut task = MigrationTask {
cid: cid.to_string(),
from: location.tier,
to: target_tier,
size: location.size,
status: MigrationStatus::Pending,
retries: 0,
created_at: current_timestamp(),
updated_at: current_timestamp(),
};
execute_migration(&self.storage, &self.config, &mut task).await
}
#[must_use]
#[inline]
pub fn cancel_pending(&self) -> usize {
self.storage.get_pending_moves().len()
}
#[must_use]
#[inline]
pub fn config(&self) -> &MigrationConfig {
&self.config
}
#[must_use]
pub fn create_task(&self, pm: PendingMove) -> MigrationTask {
let now = current_timestamp();
MigrationTask {
cid: pm.cid,
from: pm.from,
to: pm.to,
size: pm.size,
status: MigrationStatus::Pending,
retries: 0,
created_at: now,
updated_at: now,
}
}
}
async fn execute_migration(
storage: &TieredStorageManager,
config: &MigrationConfig,
task: &mut MigrationTask,
) -> Result<u64, MigrationError> {
task.status = MigrationStatus::InProgress;
task.updated_at = current_timestamp();
let timeout = Duration::from_secs(config.migration_timeout_secs);
let migration_future = perform_migration(storage, config, task);
match tokio::time::timeout(timeout, migration_future).await {
Ok(result) => result,
Err(_) => {
task.status = MigrationStatus::Failed("Timeout".to_string());
Err(MigrationError::Timeout(task.cid.clone()))
}
}
}
async fn perform_migration(
storage: &TieredStorageManager,
config: &MigrationConfig,
task: &mut MigrationTask,
) -> Result<u64, MigrationError> {
let source_path = storage
.get_content_path(&task.cid)
.ok_or_else(|| MigrationError::SourcePathNotFound(task.cid.clone()))?;
let target_path = get_target_path(storage, &task.cid, task.to)?;
if !source_path.exists() {
return Err(MigrationError::SourceFileNotFound(source_path));
}
if let Some(parent) = target_path.parent() {
fs::create_dir_all(parent)
.await
.map_err(|e| MigrationError::IoError(format!("Failed to create target dir: {}", e)))?;
}
if !has_free_space(&target_path, task.size, config.min_free_space).await {
return Err(MigrationError::InsufficientSpace(task.to));
}
debug!(
"Migrating {} from {:?} to {:?} ({} bytes)",
task.cid, task.from, task.to, task.size
);
fs::copy(&source_path, &target_path)
.await
.map_err(|e| MigrationError::IoError(format!("Copy failed: {}", e)))?;
if config.verify_after_move && !verify_migration(&source_path, &target_path).await? {
if !config.keep_source_on_error {
let _ = fs::remove_file(&target_path).await;
}
return Err(MigrationError::VerificationFailed(task.cid.clone()));
}
fs::remove_file(&source_path)
.await
.map_err(|e| MigrationError::IoError(format!("Remove source failed: {}", e)))?;
storage.execute_move(&task.cid, task.to);
task.status = MigrationStatus::Completed;
task.updated_at = current_timestamp();
info!(
"Successfully migrated {} from {:?} to {:?}",
task.cid, task.from, task.to
);
Ok(task.size)
}
fn get_target_path(
storage: &TieredStorageManager,
cid: &str,
target_tier: StorageTier,
) -> Result<PathBuf, MigrationError> {
let tier_path = storage
.get_tier_path(target_tier)
.ok_or_else(|| MigrationError::TargetPathNotFound(cid.to_string()))?;
Ok(tier_path.join(cid))
}
async fn has_free_space(path: &Path, required: u64, min_free: u64) -> bool {
let _ = (path, required, min_free);
true
}
async fn verify_migration(source: &Path, target: &Path) -> Result<bool, MigrationError> {
let source_metadata = fs::metadata(source)
.await
.map_err(|e| MigrationError::IoError(format!("Read source metadata: {}", e)))?;
let target_metadata = fs::metadata(target)
.await
.map_err(|e| MigrationError::IoError(format!("Read target metadata: {}", e)))?;
Ok(source_metadata.len() == target_metadata.len())
}
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
#[derive(Debug, thiserror::Error)]
pub enum MigrationError {
#[error("Content not found: {0}")]
ContentNotFound(String),
#[error("Content {0} is already in target tier")]
AlreadyInTargetTier(String),
#[error("Source path not found for content: {0}")]
SourcePathNotFound(String),
#[error("Target path not found for content: {0}")]
TargetPathNotFound(String),
#[error("Source file not found: {0}")]
SourceFileNotFound(PathBuf),
#[error("Insufficient space in target tier: {0:?}")]
InsufficientSpace(StorageTier),
#[error("Migration timeout for content: {0}")]
Timeout(String),
#[error("Migration verification failed for content: {0}")]
VerificationFailed(String),
#[error("IO error: {0}")]
IoError(String),
}
impl From<io::Error> for MigrationError {
fn from(e: io::Error) -> Self {
MigrationError::IoError(e.to_string())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tiered_storage::TieredStorageConfig;
#[test]
fn test_migration_config_default() {
let config = MigrationConfig::default();
assert_eq!(config.max_concurrent, 4);
assert_eq!(config.max_retries, 3);
assert!(config.verify_after_move);
}
#[test]
fn test_migration_task_creation() {
let pm = PendingMove {
cid: "QmTest123".to_string(),
from: StorageTier::Warm,
to: StorageTier::Hot,
size: 1024,
priority: 10,
};
let storage_config = TieredStorageConfig::default();
let storage = Arc::new(TieredStorageManager::new(storage_config));
let migration = TierMigration::new(storage, MigrationConfig::default());
let task = migration.create_task(pm);
assert_eq!(task.cid, "QmTest123");
assert_eq!(task.from, StorageTier::Warm);
assert_eq!(task.to, StorageTier::Hot);
assert_eq!(task.size, 1024);
assert_eq!(task.status, MigrationStatus::Pending);
assert_eq!(task.retries, 0);
}
#[test]
fn test_migration_status() {
assert_eq!(MigrationStatus::Pending, MigrationStatus::Pending);
assert_ne!(
MigrationStatus::Completed,
MigrationStatus::Failed("error".to_string())
);
}
}