1use crate::tiered_storage::{PendingMove, StorageTier, TieredStorageManager};
29use serde::{Deserialize, Serialize};
30use std::io;
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33use std::time::Instant;
34use tokio::fs;
35use tokio::time::Duration;
36use tracing::{debug, error, info, warn};
37
38#[derive(Debug, Clone)]
40pub struct MigrationConfig {
41 pub max_concurrent: usize,
43 pub migration_timeout_secs: u64,
45 pub max_retries: u32,
47 pub retry_delay_ms: u64,
49 pub verify_after_move: bool,
51 pub keep_source_on_error: bool,
53 pub min_free_space: u64,
55}
56
57impl Default for MigrationConfig {
58 fn default() -> Self {
59 Self {
60 max_concurrent: 4,
61 migration_timeout_secs: 300, max_retries: 3,
63 retry_delay_ms: 1000, verify_after_move: true,
65 keep_source_on_error: true,
66 min_free_space: 1024 * 1024 * 1024, }
68 }
69}
70
71#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
73pub enum MigrationStatus {
74 Pending,
76 InProgress,
78 Completed,
80 Failed(String),
82 Cancelled,
84}
85
86#[derive(Debug, Clone)]
88pub struct MigrationTask {
89 pub cid: String,
91 pub from: StorageTier,
93 pub to: StorageTier,
95 pub size: u64,
97 pub status: MigrationStatus,
99 pub retries: u32,
101 pub created_at: u64,
103 pub updated_at: u64,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct MigrationResult {
110 pub successful: usize,
112 pub failed: usize,
114 pub cancelled: usize,
116 pub bytes_moved: u64,
118 pub duration_ms: u64,
120 pub avg_speed_mbps: f64,
122}
123
124pub struct TierMigration {
126 storage: Arc<TieredStorageManager>,
128 config: MigrationConfig,
130}
131
132impl TierMigration {
133 #[must_use]
135 pub fn new(storage: Arc<TieredStorageManager>, config: MigrationConfig) -> Self {
136 Self { storage, config }
137 }
138
139 pub async fn execute_pending_migrations(&self) -> Result<MigrationResult, MigrationError> {
144 let pending = self.storage.get_pending_moves();
145 if pending.is_empty() {
146 return Ok(MigrationResult {
147 successful: 0,
148 failed: 0,
149 cancelled: 0,
150 bytes_moved: 0,
151 duration_ms: 0,
152 avg_speed_mbps: 0.0,
153 });
154 }
155
156 info!("Starting migration of {} pending items", pending.len());
157 let start = Instant::now();
158
159 let mut tasks: Vec<MigrationTask> =
160 pending.into_iter().map(|pm| self.create_task(pm)).collect();
161
162 let mut successful = 0;
164 let mut failed = 0;
165 let cancelled = 0;
166 let mut bytes_moved = 0u64;
167
168 let semaphore = Arc::new(tokio::sync::Semaphore::new(self.config.max_concurrent));
169
170 let mut handles = Vec::new();
171 for task in tasks.iter_mut() {
172 let permit = semaphore.clone().acquire_owned().await.unwrap();
173 let storage = self.storage.clone();
174 let config = self.config.clone();
175 let mut task_clone = task.clone();
176
177 let handle = tokio::spawn(async move {
178 let result = execute_migration(&storage, &config, &mut task_clone).await;
179 drop(permit);
180 result
181 });
182
183 handles.push((handle, task));
184 }
185
186 for (handle, task) in handles {
188 match handle.await {
189 Ok(Ok(size)) => {
190 successful += 1;
191 bytes_moved += size;
192 task.status = MigrationStatus::Completed;
193 }
194 Ok(Err(e)) => {
195 failed += 1;
196 task.status = MigrationStatus::Failed(e.to_string());
197 warn!("Migration failed for {}: {}", task.cid, e);
198 }
199 Err(e) => {
200 failed += 1;
201 task.status = MigrationStatus::Failed(format!("Task panic: {}", e));
202 error!("Migration task panicked for {}: {}", task.cid, e);
203 }
204 }
205 }
206
207 let duration = start.elapsed();
208 let duration_ms = duration.as_millis() as u64;
209 let avg_speed_mbps = if duration_ms > 0 {
210 (bytes_moved as f64 / 1_000_000.0) / (duration_ms as f64 / 1000.0)
211 } else {
212 0.0
213 };
214
215 info!(
216 "Migration complete: {} successful, {} failed, {} MB moved in {} ms ({:.2} MB/s)",
217 successful,
218 failed,
219 bytes_moved / 1_000_000,
220 duration_ms,
221 avg_speed_mbps
222 );
223
224 Ok(MigrationResult {
225 successful,
226 failed,
227 cancelled,
228 bytes_moved,
229 duration_ms,
230 avg_speed_mbps,
231 })
232 }
233
234 pub async fn migrate_content(
236 &self,
237 cid: &str,
238 target_tier: StorageTier,
239 ) -> Result<u64, MigrationError> {
240 let location = self
241 .storage
242 .get_location(cid)
243 .ok_or_else(|| MigrationError::ContentNotFound(cid.to_string()))?;
244
245 if location.tier == target_tier {
246 return Err(MigrationError::AlreadyInTargetTier(cid.to_string()));
247 }
248
249 let mut task = MigrationTask {
250 cid: cid.to_string(),
251 from: location.tier,
252 to: target_tier,
253 size: location.size,
254 status: MigrationStatus::Pending,
255 retries: 0,
256 created_at: current_timestamp(),
257 updated_at: current_timestamp(),
258 };
259
260 execute_migration(&self.storage, &self.config, &mut task).await
261 }
262
263 #[must_use]
265 #[inline]
266 pub fn cancel_pending(&self) -> usize {
267 self.storage.get_pending_moves().len()
270 }
271
272 #[must_use]
274 #[inline]
275 pub fn config(&self) -> &MigrationConfig {
276 &self.config
277 }
278
279 #[must_use]
281 pub fn create_task(&self, pm: PendingMove) -> MigrationTask {
282 let now = current_timestamp();
283 MigrationTask {
284 cid: pm.cid,
285 from: pm.from,
286 to: pm.to,
287 size: pm.size,
288 status: MigrationStatus::Pending,
289 retries: 0,
290 created_at: now,
291 updated_at: now,
292 }
293 }
294}
295
296async fn execute_migration(
298 storage: &TieredStorageManager,
299 config: &MigrationConfig,
300 task: &mut MigrationTask,
301) -> Result<u64, MigrationError> {
302 task.status = MigrationStatus::InProgress;
303 task.updated_at = current_timestamp();
304
305 let timeout = Duration::from_secs(config.migration_timeout_secs);
306 let migration_future = perform_migration(storage, config, task);
307
308 match tokio::time::timeout(timeout, migration_future).await {
309 Ok(result) => result,
310 Err(_) => {
311 task.status = MigrationStatus::Failed("Timeout".to_string());
312 Err(MigrationError::Timeout(task.cid.clone()))
313 }
314 }
315}
316
317async fn perform_migration(
319 storage: &TieredStorageManager,
320 config: &MigrationConfig,
321 task: &mut MigrationTask,
322) -> Result<u64, MigrationError> {
323 let source_path = storage
324 .get_content_path(&task.cid)
325 .ok_or_else(|| MigrationError::SourcePathNotFound(task.cid.clone()))?;
326
327 let target_path = get_target_path(storage, &task.cid, task.to)?;
328
329 if !source_path.exists() {
331 return Err(MigrationError::SourceFileNotFound(source_path));
332 }
333
334 if let Some(parent) = target_path.parent() {
336 fs::create_dir_all(parent)
337 .await
338 .map_err(|e| MigrationError::IoError(format!("Failed to create target dir: {}", e)))?;
339 }
340
341 if !has_free_space(&target_path, task.size, config.min_free_space).await {
343 return Err(MigrationError::InsufficientSpace(task.to));
344 }
345
346 debug!(
347 "Migrating {} from {:?} to {:?} ({} bytes)",
348 task.cid, task.from, task.to, task.size
349 );
350
351 fs::copy(&source_path, &target_path)
353 .await
354 .map_err(|e| MigrationError::IoError(format!("Copy failed: {}", e)))?;
355
356 if config.verify_after_move && !verify_migration(&source_path, &target_path).await? {
358 if !config.keep_source_on_error {
359 let _ = fs::remove_file(&target_path).await;
360 }
361 return Err(MigrationError::VerificationFailed(task.cid.clone()));
362 }
363
364 fs::remove_file(&source_path)
366 .await
367 .map_err(|e| MigrationError::IoError(format!("Remove source failed: {}", e)))?;
368
369 storage.execute_move(&task.cid, task.to);
371
372 task.status = MigrationStatus::Completed;
373 task.updated_at = current_timestamp();
374
375 info!(
376 "Successfully migrated {} from {:?} to {:?}",
377 task.cid, task.from, task.to
378 );
379
380 Ok(task.size)
381}
382
383fn get_target_path(
385 storage: &TieredStorageManager,
386 cid: &str,
387 target_tier: StorageTier,
388) -> Result<PathBuf, MigrationError> {
389 let tier_path = storage
390 .get_tier_path(target_tier)
391 .ok_or_else(|| MigrationError::TargetPathNotFound(cid.to_string()))?;
392
393 Ok(tier_path.join(cid))
394}
395
396async fn has_free_space(path: &Path, required: u64, min_free: u64) -> bool {
398 let _ = (path, required, min_free);
402 true
403}
404
405async fn verify_migration(source: &Path, target: &Path) -> Result<bool, MigrationError> {
407 let source_metadata = fs::metadata(source)
408 .await
409 .map_err(|e| MigrationError::IoError(format!("Read source metadata: {}", e)))?;
410
411 let target_metadata = fs::metadata(target)
412 .await
413 .map_err(|e| MigrationError::IoError(format!("Read target metadata: {}", e)))?;
414
415 Ok(source_metadata.len() == target_metadata.len())
416}
417
418fn current_timestamp() -> u64 {
420 std::time::SystemTime::now()
421 .duration_since(std::time::UNIX_EPOCH)
422 .map(|d| d.as_secs())
423 .unwrap_or(0)
424}
425
426#[derive(Debug, thiserror::Error)]
428pub enum MigrationError {
429 #[error("Content not found: {0}")]
431 ContentNotFound(String),
432
433 #[error("Content {0} is already in target tier")]
435 AlreadyInTargetTier(String),
436
437 #[error("Source path not found for content: {0}")]
439 SourcePathNotFound(String),
440
441 #[error("Target path not found for content: {0}")]
443 TargetPathNotFound(String),
444
445 #[error("Source file not found: {0}")]
447 SourceFileNotFound(PathBuf),
448
449 #[error("Insufficient space in target tier: {0:?}")]
451 InsufficientSpace(StorageTier),
452
453 #[error("Migration timeout for content: {0}")]
455 Timeout(String),
456
457 #[error("Migration verification failed for content: {0}")]
459 VerificationFailed(String),
460
461 #[error("IO error: {0}")]
463 IoError(String),
464}
465
466impl From<io::Error> for MigrationError {
467 fn from(e: io::Error) -> Self {
468 MigrationError::IoError(e.to_string())
469 }
470}
471
472#[cfg(test)]
473mod tests {
474 use super::*;
475 use crate::tiered_storage::TieredStorageConfig;
476
477 #[test]
478 fn test_migration_config_default() {
479 let config = MigrationConfig::default();
480 assert_eq!(config.max_concurrent, 4);
481 assert_eq!(config.max_retries, 3);
482 assert!(config.verify_after_move);
483 }
484
485 #[test]
486 fn test_migration_task_creation() {
487 let pm = PendingMove {
488 cid: "QmTest123".to_string(),
489 from: StorageTier::Warm,
490 to: StorageTier::Hot,
491 size: 1024,
492 priority: 10,
493 };
494
495 let storage_config = TieredStorageConfig::default();
496 let storage = Arc::new(TieredStorageManager::new(storage_config));
497 let migration = TierMigration::new(storage, MigrationConfig::default());
498
499 let task = migration.create_task(pm);
500 assert_eq!(task.cid, "QmTest123");
501 assert_eq!(task.from, StorageTier::Warm);
502 assert_eq!(task.to, StorageTier::Hot);
503 assert_eq!(task.size, 1024);
504 assert_eq!(task.status, MigrationStatus::Pending);
505 assert_eq!(task.retries, 0);
506 }
507
508 #[test]
509 fn test_migration_status() {
510 assert_eq!(MigrationStatus::Pending, MigrationStatus::Pending);
511 assert_ne!(
512 MigrationStatus::Completed,
513 MigrationStatus::Failed("error".to_string())
514 );
515 }
516}