Skip to main content

rusmes_cli/commands/
migrate.rs

1//! Storage migration command
2
3use anyhow::{Context, Result};
4use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
5use rusmes_proto::{Mail, MessageId, Username};
6use rusmes_storage::backends::{
7    amaters::{AmatersBackend, AmatersConfig},
8    filesystem::FilesystemBackend,
9    postgres_complete::PostgresCompleteBackend,
10};
11use rusmes_storage::{MailboxId, MessageStore, StorageBackend};
12use serde::{Deserialize, Serialize};
13use sha2::{Digest, Sha256};
14use std::collections::HashMap;
15use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::Semaphore;
19
20/// Storage backend type
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22pub enum BackendType {
23    Filesystem,
24    Postgres,
25    Amaters,
26}
27
28impl std::str::FromStr for BackendType {
29    type Err = anyhow::Error;
30
31    fn from_str(s: &str) -> Result<Self> {
32        match s.to_lowercase().as_str() {
33            "filesystem" | "fs" => Ok(BackendType::Filesystem),
34            "postgres" | "postgresql" | "pg" => Ok(BackendType::Postgres),
35            "amaters" => Ok(BackendType::Amaters),
36            _ => Err(anyhow::anyhow!("Unknown backend type: {}", s)),
37        }
38    }
39}
40
41impl std::fmt::Display for BackendType {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        match self {
44            BackendType::Filesystem => write!(f, "filesystem"),
45            BackendType::Postgres => write!(f, "postgres"),
46            BackendType::Amaters => write!(f, "amaters"),
47        }
48    }
49}
50
51/// Migration configuration
52#[derive(Debug, Clone)]
53pub struct MigrationConfig {
54    pub source_type: BackendType,
55    pub source_config: String,
56    pub dest_type: BackendType,
57    pub dest_config: String,
58    pub batch_size: usize,
59    pub parallel: usize,
60    pub verify: bool,
61    pub dry_run: bool,
62    pub resume: bool,
63}
64
65impl Default for MigrationConfig {
66    fn default() -> Self {
67        Self {
68            source_type: BackendType::Filesystem,
69            source_config: "/var/lib/rusmes/mail".to_string(),
70            dest_type: BackendType::Postgres,
71            dest_config: "postgresql://localhost/rusmes".to_string(),
72            batch_size: 100,
73            parallel: 4,
74            verify: true,
75            dry_run: false,
76            resume: false,
77        }
78    }
79}
80
81/// Migration progress tracker
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MigrationProgress {
84    pub total_users: usize,
85    pub migrated_users: usize,
86    pub total_mailboxes: usize,
87    pub migrated_mailboxes: usize,
88    pub total_messages: usize,
89    pub migrated_messages: usize,
90    pub total_bytes: u64,
91    pub migrated_bytes: u64,
92    pub failed_messages: Vec<String>,
93    pub migrated_user_list: Vec<String>,
94    pub migrated_mailbox_map: HashMap<String, String>,
95    pub started_at: i64,
96    pub last_updated_at: i64,
97    pub completed_at: Option<i64>,
98}
99
100impl MigrationProgress {
101    pub fn new() -> Self {
102        let now = chrono::Utc::now().timestamp();
103        Self {
104            total_users: 0,
105            migrated_users: 0,
106            total_mailboxes: 0,
107            migrated_mailboxes: 0,
108            total_messages: 0,
109            migrated_messages: 0,
110            total_bytes: 0,
111            migrated_bytes: 0,
112            failed_messages: Vec::new(),
113            migrated_user_list: Vec::new(),
114            migrated_mailbox_map: HashMap::new(),
115            started_at: now,
116            last_updated_at: now,
117            completed_at: None,
118        }
119    }
120
121    pub fn save_to_file(&self, path: &PathBuf) -> Result<()> {
122        let json = serde_json::to_string_pretty(self)?;
123        std::fs::write(path, json)?;
124        Ok(())
125    }
126
127    pub fn load_from_file(path: &PathBuf) -> Result<Self> {
128        let json = std::fs::read_to_string(path)?;
129        Ok(serde_json::from_str(&json)?)
130    }
131
132    pub fn mark_user_migrated(&mut self, user: &str) {
133        self.migrated_user_list.push(user.to_string());
134        self.migrated_users += 1;
135        self.last_updated_at = chrono::Utc::now().timestamp();
136    }
137
138    pub fn is_user_migrated(&self, user: &str) -> bool {
139        self.migrated_user_list.contains(&user.to_string())
140    }
141
142    pub fn mark_mailbox_migrated(&mut self, mailbox_key: String, mailbox_id: String) {
143        self.migrated_mailbox_map.insert(mailbox_key, mailbox_id);
144        self.migrated_mailboxes += 1;
145        self.last_updated_at = chrono::Utc::now().timestamp();
146    }
147
148    pub fn is_mailbox_migrated(&self, mailbox_key: &str) -> bool {
149        self.migrated_mailbox_map.contains_key(mailbox_key)
150    }
151
152    pub fn progress_percentage(&self) -> f64 {
153        if self.total_messages == 0 {
154            0.0
155        } else {
156            (self.migrated_messages as f64 / self.total_messages as f64) * 100.0
157        }
158    }
159
160    pub fn eta_seconds(&self) -> Option<u64> {
161        if self.migrated_messages == 0 {
162            return None;
163        }
164
165        let elapsed = self.last_updated_at - self.started_at;
166        if elapsed <= 0 {
167            return None;
168        }
169
170        let remaining = self.total_messages.saturating_sub(self.migrated_messages);
171        let rate = self.migrated_messages as f64 / elapsed as f64;
172
173        if rate <= 0.0 {
174            return None;
175        }
176
177        Some((remaining as f64 / rate) as u64)
178    }
179
180    pub fn messages_per_second(&self) -> f64 {
181        let elapsed = self.last_updated_at - self.started_at;
182        if elapsed <= 0 {
183            return 0.0;
184        }
185        self.migrated_messages as f64 / elapsed as f64
186    }
187}
188
189impl Default for MigrationProgress {
190    fn default() -> Self {
191        Self::new()
192    }
193}
194
195/// Migration statistics
196#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct MigrationStats {
198    pub total_users: usize,
199    pub total_mailboxes: usize,
200    pub total_messages: usize,
201    pub total_bytes: u64,
202    pub migrated_bytes: u64,
203    pub failed_messages: usize,
204    pub duration_secs: u64,
205    pub throughput_msg_sec: f64,
206    pub throughput_mbps: f64,
207}
208
209impl MigrationStats {
210    pub fn from_progress(progress: &MigrationProgress) -> Self {
211        let duration = if let Some(completed) = progress.completed_at {
212            (completed - progress.started_at) as u64
213        } else {
214            (chrono::Utc::now().timestamp() - progress.started_at) as u64
215        };
216
217        let duration_secs = duration.max(1);
218        let throughput_msg_sec = progress.migrated_messages as f64 / duration_secs as f64;
219        let throughput_mbps = if duration_secs > 0 {
220            (progress.migrated_bytes as f64 / 1_048_576.0) / duration_secs as f64
221        } else {
222            0.0
223        };
224
225        Self {
226            total_users: progress.total_users,
227            total_mailboxes: progress.total_mailboxes,
228            total_messages: progress.total_messages,
229            total_bytes: progress.total_bytes,
230            migrated_bytes: progress.migrated_bytes,
231            failed_messages: progress.failed_messages.len(),
232            duration_secs,
233            throughput_msg_sec,
234            throughput_mbps,
235        }
236    }
237
238    pub fn print(&self) {
239        println!("\n=== Migration Statistics ===");
240        println!("Users migrated: {}", self.total_users);
241        println!("Mailboxes migrated: {}", self.total_mailboxes);
242        println!("Messages migrated: {}", self.total_messages);
243        println!(
244            "Data migrated: {:.2} MB",
245            self.migrated_bytes as f64 / 1_048_576.0
246        );
247        println!("Failed messages: {}", self.failed_messages);
248        println!("Duration: {} seconds", self.duration_secs);
249        println!("Throughput: {:.2} msg/s", self.throughput_msg_sec);
250        println!("Throughput: {:.2} MB/s", self.throughput_mbps);
251    }
252}
253
254/// Message checksum for verification
255#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct MessageChecksum {
257    pub message_id: String,
258    pub size: usize,
259    pub sha256: String,
260}
261
262impl MessageChecksum {
263    pub fn compute(mail: &Mail) -> Self {
264        let size = mail.size();
265
266        // For checksum, we use the message ID and size as a simple integrity check
267        // In a full implementation, we'd serialize the entire message
268        let mut hasher = Sha256::new();
269        hasher.update(mail.message_id().to_string().as_bytes());
270        hasher.update(size.to_le_bytes());
271        let hash = hasher.finalize();
272
273        Self {
274            message_id: mail.message_id().to_string(),
275            size,
276            sha256: format!("{:x}", hash),
277        }
278    }
279}
280
281/// Storage migrator
282pub struct StorageMigrator {
283    config: MigrationConfig,
284    progress: MigrationProgress,
285    progress_file: PathBuf,
286    backup_dir: PathBuf,
287}
288
289impl StorageMigrator {
290    /// Create a new migrator
291    pub fn new(config: MigrationConfig) -> Self {
292        let progress_file = PathBuf::from("/tmp/rusmes_migration_progress.json");
293        let backup_dir = PathBuf::from("/tmp/rusmes_migration_backup");
294
295        let progress = if config.resume && progress_file.exists() {
296            MigrationProgress::load_from_file(&progress_file)
297                .unwrap_or_else(|_| MigrationProgress::new())
298        } else {
299            MigrationProgress::new()
300        };
301
302        Self {
303            config,
304            progress,
305            progress_file,
306            backup_dir,
307        }
308    }
309
310    /// Run the migration
311    pub async fn migrate(&mut self) -> Result<MigrationStats> {
312        println!(
313            "Starting migration from {} to {}",
314            self.config.source_type, self.config.dest_type
315        );
316
317        if self.config.dry_run {
318            println!("DRY RUN MODE - No changes will be made");
319        }
320
321        if self.config.resume {
322            println!("Resuming migration from previous state");
323            println!(
324                "Previously migrated: {} users, {} mailboxes, {} messages",
325                self.progress.migrated_users,
326                self.progress.migrated_mailboxes,
327                self.progress.migrated_messages
328            );
329        }
330
331        // Create backup directory
332        if !self.config.dry_run {
333            std::fs::create_dir_all(&self.backup_dir)?;
334        }
335
336        // Create backends
337        let source = self.create_source_backend().await?;
338        let dest = self.create_dest_backend().await?;
339
340        // Backup destination state before migration
341        if !self.config.dry_run && !self.config.resume {
342            println!("Creating backup of destination state...");
343            self.backup_destination_state(dest.as_ref()).await?;
344        }
345
346        // Get all users
347        let users = self.get_users(source.as_ref()).await?;
348        self.progress.total_users = users.len();
349
350        // Count total messages and mailboxes
351        self.count_totals(source.as_ref(), &users).await?;
352
353        println!(
354            "Migration scope: {} users, {} mailboxes, {} messages ({:.2} MB)",
355            self.progress.total_users,
356            self.progress.total_mailboxes,
357            self.progress.total_messages,
358            self.progress.total_bytes as f64 / 1_048_576.0
359        );
360
361        // Create progress bars
362        let multi_progress = MultiProgress::new();
363        let user_pb = multi_progress.add(ProgressBar::new(users.len() as u64));
364        user_pb.set_style(
365            ProgressStyle::default_bar()
366                .template("[{elapsed_precise}] Users {bar:30.cyan/blue} {pos}/{len} ({eta})")
367                .expect("Invalid progress bar template")
368                .progress_chars("=>-"),
369        );
370
371        let message_pb = multi_progress.add(ProgressBar::new(self.progress.total_messages as u64));
372        message_pb.set_style(
373            ProgressStyle::default_bar()
374                .template("[{elapsed_precise}] Messages {bar:30.green/blue} {pos}/{len} ({msg})")
375                .expect("Invalid progress bar template")
376                .progress_chars("=>-"),
377        );
378
379        let start_time = Instant::now();
380
381        // Migrate users
382        for user in &users {
383            if self.config.resume && self.progress.is_user_migrated(&user.to_string()) {
384                user_pb.inc(1);
385                continue;
386            }
387
388            if !self.config.dry_run {
389                self.migrate_user(source.as_ref(), dest.as_ref(), user, &message_pb)
390                    .await?;
391            }
392
393            self.progress.mark_user_migrated(&user.to_string());
394            user_pb.inc(1);
395
396            // Save progress periodically
397            if !self.config.dry_run {
398                self.progress.save_to_file(&self.progress_file)?;
399            }
400
401            // Update message progress bar with current rate
402            let rate = self.progress.messages_per_second();
403            message_pb.set_message(format!("{:.1} msg/s", rate));
404        }
405
406        user_pb.finish_with_message("All users migrated");
407        message_pb.finish_with_message("All messages migrated");
408
409        self.progress.completed_at = Some(chrono::Utc::now().timestamp());
410
411        if !self.config.dry_run {
412            self.progress.save_to_file(&self.progress_file)?;
413        }
414
415        let stats = MigrationStats::from_progress(&self.progress);
416
417        // Verification
418        if self.config.verify && !self.config.dry_run {
419            println!("\nVerifying migration integrity...");
420            self.verify_migration(source.as_ref(), dest.as_ref())
421                .await?;
422        }
423
424        let elapsed = start_time.elapsed();
425        println!("\nMigration completed in {:.2}s", elapsed.as_secs_f64());
426
427        Ok(stats)
428    }
429
430    async fn create_source_backend(&self) -> Result<Box<dyn StorageBackend>> {
431        match self.config.source_type {
432            BackendType::Filesystem => {
433                Ok(Box::new(FilesystemBackend::new(&self.config.source_config)))
434            }
435            BackendType::Postgres => {
436                let backend = PostgresCompleteBackend::new(&self.config.source_config).await?;
437                Ok(Box::new(backend))
438            }
439            BackendType::Amaters => {
440                let config = AmatersConfig::from_url(&self.config.source_config)?;
441                let backend = AmatersBackend::new(config).await?;
442                Ok(Box::new(backend))
443            }
444        }
445    }
446
447    async fn create_dest_backend(&self) -> Result<Box<dyn StorageBackend>> {
448        match self.config.dest_type {
449            BackendType::Filesystem => {
450                let path = &self.config.dest_config;
451                std::fs::create_dir_all(path)?;
452                Ok(Box::new(FilesystemBackend::new(path)))
453            }
454            BackendType::Postgres => {
455                let backend = PostgresCompleteBackend::new(&self.config.dest_config).await?;
456                if !self.config.dry_run {
457                    backend.init_schema().await?;
458                }
459                Ok(Box::new(backend))
460            }
461            BackendType::Amaters => {
462                let config = AmatersConfig::from_url(&self.config.dest_config)?;
463                let backend = AmatersBackend::new(config).await?;
464                if !self.config.dry_run {
465                    backend.init_schema().await?;
466                }
467                Ok(Box::new(backend))
468            }
469        }
470    }
471
472    async fn get_users(&self, backend: &dyn StorageBackend) -> Result<Vec<Username>> {
473        backend.list_all_users().await
474    }
475
476    async fn count_totals(
477        &mut self,
478        source: &dyn StorageBackend,
479        users: &[Username],
480    ) -> Result<()> {
481        let mailbox_store = source.mailbox_store();
482        let message_store = source.message_store();
483
484        let mut total_mailboxes = 0;
485        let mut total_messages = 0;
486        let mut total_bytes = 0u64;
487
488        for user in users {
489            let mailboxes = mailbox_store.list_mailboxes(user).await?;
490            total_mailboxes += mailboxes.len();
491
492            for mailbox in mailboxes {
493                let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
494                total_messages += messages.len();
495                total_bytes += messages.iter().map(|m| m.size() as u64).sum::<u64>();
496            }
497        }
498
499        self.progress.total_mailboxes = total_mailboxes;
500        self.progress.total_messages = total_messages;
501        self.progress.total_bytes = total_bytes;
502
503        Ok(())
504    }
505
506    async fn migrate_user(
507        &mut self,
508        source: &dyn StorageBackend,
509        dest: &dyn StorageBackend,
510        user: &Username,
511        message_pb: &ProgressBar,
512    ) -> Result<()> {
513        let source_mailboxes = source.mailbox_store();
514        let dest_mailboxes = dest.mailbox_store();
515        let source_messages = source.message_store();
516        let dest_messages = dest.message_store();
517
518        // Get all mailboxes for user
519        let mailboxes = source_mailboxes.list_mailboxes(user).await?;
520
521        for mailbox in mailboxes {
522            let mailbox_key = format!("{}:{}", user, mailbox.path());
523
524            if self.config.resume && self.progress.is_mailbox_migrated(&mailbox_key) {
525                continue;
526            }
527
528            // Create mailbox in destination
529            let dest_mailbox_id = dest_mailboxes.create_mailbox(mailbox.path()).await?;
530
531            self.progress
532                .mark_mailbox_migrated(mailbox_key, dest_mailbox_id.to_string());
533
534            // Migrate messages in batches with parallel processing
535            let messages = source_messages.get_mailbox_messages(mailbox.id()).await?;
536
537            let semaphore = Arc::new(Semaphore::new(self.config.parallel));
538            let mut handles = vec![];
539
540            for chunk in messages.chunks(self.config.batch_size) {
541                for message_meta in chunk {
542                    let permit = semaphore.clone().acquire_owned().await?;
543                    let source_messages = Arc::clone(&source_messages);
544                    let dest_messages = Arc::clone(&dest_messages);
545                    let message_id = *message_meta.message_id();
546                    let mailbox_id = dest_mailbox_id;
547                    let size = message_meta.size();
548                    let flags = message_meta.flags().clone();
549
550                    let handle = tokio::spawn(async move {
551                        let _permit = permit;
552                        let result = Self::migrate_single_message(
553                            &source_messages,
554                            &dest_messages,
555                            &message_id,
556                            &mailbox_id,
557                        )
558                        .await;
559                        (message_id, size, flags, result)
560                    });
561
562                    handles.push(handle);
563                }
564
565                // Process batch results
566                for handle in handles.drain(..) {
567                    match handle.await {
568                        Ok((msg_id, size, _flags, result)) => match result {
569                            Ok(_) => {
570                                self.progress.migrated_messages += 1;
571                                self.progress.migrated_bytes += size as u64;
572                                message_pb.inc(1);
573                            }
574                            Err(e) => {
575                                tracing::error!("Failed to migrate message {}: {}", msg_id, e);
576                                self.progress.failed_messages.push(msg_id.to_string());
577                            }
578                        },
579                        Err(e) => {
580                            tracing::error!("Task failed: {}", e);
581                        }
582                    }
583                }
584            }
585        }
586
587        Ok(())
588    }
589
590    async fn migrate_single_message(
591        source: &Arc<dyn MessageStore>,
592        dest: &Arc<dyn MessageStore>,
593        message_id: &MessageId,
594        dest_mailbox_id: &MailboxId,
595    ) -> Result<()> {
596        let message = source
597            .get_message(message_id)
598            .await?
599            .context("Message not found in source")?;
600
601        dest.append_message(dest_mailbox_id, message).await?;
602
603        Ok(())
604    }
605
606    async fn verify_migration(
607        &self,
608        source: &dyn StorageBackend,
609        dest: &dyn StorageBackend,
610    ) -> Result<()> {
611        let source_mailboxes = source.mailbox_store();
612        let dest_mailboxes = dest.mailbox_store();
613        let source_messages = source.message_store();
614        let dest_messages = dest.message_store();
615
616        let users = self.get_users(source).await?;
617
618        let pb = ProgressBar::new(users.len() as u64);
619        pb.set_style(
620            ProgressStyle::default_bar()
621                .template("[{elapsed_precise}] Verifying {bar:40.yellow/blue} {pos}/{len}")
622                .expect("Invalid progress bar template")
623                .progress_chars("=>-"),
624        );
625
626        for user in &users {
627            let source_mboxes = source_mailboxes.list_mailboxes(user).await?;
628            let dest_mboxes = dest_mailboxes.list_mailboxes(user).await?;
629
630            if source_mboxes.len() != dest_mboxes.len() {
631                return Err(anyhow::anyhow!(
632                    "Mailbox count mismatch for user {}: source={}, dest={}",
633                    user,
634                    source_mboxes.len(),
635                    dest_mboxes.len()
636                ));
637            }
638
639            // Verify message counts
640            for (src_mbox, dst_mbox) in source_mboxes.iter().zip(dest_mboxes.iter()) {
641                let src_msgs = source_messages.get_mailbox_messages(src_mbox.id()).await?;
642                let dst_msgs = dest_messages.get_mailbox_messages(dst_mbox.id()).await?;
643
644                if src_msgs.len() != dst_msgs.len() {
645                    return Err(anyhow::anyhow!(
646                        "Message count mismatch in mailbox {}: source={}, dest={}",
647                        src_mbox.path(),
648                        src_msgs.len(),
649                        dst_msgs.len()
650                    ));
651                }
652
653                // Verify checksums for sample messages
654                if !src_msgs.is_empty() {
655                    let sample_size = (src_msgs.len() / 10).clamp(1, 10);
656                    for i in 0..sample_size {
657                        let idx = i * (src_msgs.len() / sample_size);
658                        if let Some(src_meta) = src_msgs.get(idx) {
659                            if let (Some(src_msg), Some(dst_msg)) = (
660                                source_messages.get_message(src_meta.message_id()).await?,
661                                dest_messages.get_message(src_meta.message_id()).await?,
662                            ) {
663                                let src_checksum = MessageChecksum::compute(&src_msg);
664                                let dst_checksum = MessageChecksum::compute(&dst_msg);
665
666                                if src_checksum.sha256 != dst_checksum.sha256 {
667                                    return Err(anyhow::anyhow!(
668                                        "Checksum mismatch for message {}",
669                                        src_meta.message_id()
670                                    ));
671                                }
672                            }
673                        }
674                    }
675                }
676            }
677
678            pb.inc(1);
679        }
680
681        pb.finish_with_message("Verification completed successfully");
682        println!("Verification passed: all mailboxes and messages verified");
683        Ok(())
684    }
685
686    async fn backup_destination_state(&self, _dest: &dyn StorageBackend) -> Result<()> {
687        let backup_metadata = serde_json::json!({
688            "timestamp": chrono::Utc::now().to_rfc3339(),
689            "dest_type": self.config.dest_type,
690            "dest_config": self.config.dest_config,
691        });
692
693        let backup_file = self.backup_dir.join("migration_backup_metadata.json");
694        std::fs::write(backup_file, serde_json::to_string_pretty(&backup_metadata)?)?;
695
696        println!("Backup metadata saved to {:?}", self.backup_dir);
697        Ok(())
698    }
699
700    /// Rollback migration
701    pub async fn rollback(&self) -> Result<()> {
702        println!("Rolling back migration...");
703
704        let backup_file = self.backup_dir.join("migration_backup_metadata.json");
705        if !backup_file.exists() {
706            return Err(anyhow::anyhow!("No backup found to rollback"));
707        }
708
709        println!(
710            "Rollback would restore from backup at {:?}",
711            self.backup_dir
712        );
713        println!("Note: Full rollback implementation requires backend-specific restore logic");
714
715        Ok(())
716    }
717
718    /// Get migration progress
719    pub fn get_progress(&self) -> &MigrationProgress {
720        &self.progress
721    }
722
723    /// Print migration report
724    pub fn print_report(&self) {
725        println!("\n=== Migration Report ===");
726        println!(
727            "Users: {}/{}",
728            self.progress.migrated_users, self.progress.total_users
729        );
730        println!(
731            "Mailboxes: {}/{}",
732            self.progress.migrated_mailboxes, self.progress.total_mailboxes
733        );
734        println!(
735            "Messages: {}/{}",
736            self.progress.migrated_messages, self.progress.total_messages
737        );
738        println!(
739            "Data: {:.2}/{:.2} MB",
740            self.progress.migrated_bytes as f64 / 1_048_576.0,
741            self.progress.total_bytes as f64 / 1_048_576.0
742        );
743
744        if !self.progress.failed_messages.is_empty() {
745            println!("\nFailed messages: {}", self.progress.failed_messages.len());
746            for msg_id in self.progress.failed_messages.iter().take(10) {
747                println!("  - {}", msg_id);
748            }
749            if self.progress.failed_messages.len() > 10 {
750                println!(
751                    "  ... and {} more",
752                    self.progress.failed_messages.len() - 10
753                );
754            }
755        }
756
757        let stats = MigrationStats::from_progress(&self.progress);
758        println!("\nDuration: {} seconds", stats.duration_secs);
759        println!(
760            "Throughput: {:.2} messages/second",
761            stats.throughput_msg_sec
762        );
763        println!("Throughput: {:.2} MB/second", stats.throughput_mbps);
764
765        if let Some(eta) = self.progress.eta_seconds() {
766            let eta_duration = Duration::from_secs(eta);
767            println!("ETA: {:?}", eta_duration);
768        }
769    }
770}
771
772/// Integrity checker
773pub struct IntegrityChecker {
774    backend: Box<dyn StorageBackend>,
775}
776
777impl IntegrityChecker {
778    pub fn new(backend: Box<dyn StorageBackend>) -> Self {
779        Self { backend }
780    }
781
782    /// Check integrity of storage backend
783    pub async fn check(&self) -> Result<IntegrityReport> {
784        let mut report = IntegrityReport::new();
785
786        let mailbox_store = self.backend.mailbox_store();
787        let message_store = self.backend.message_store();
788
789        // Get sample users for testing
790        let users = vec![
791            Username::new("user1@example.com".to_string())?,
792            Username::new("user2@example.com".to_string())?,
793        ];
794
795        for user in &users {
796            let mailboxes = mailbox_store.list_mailboxes(user).await?;
797            report.total_mailboxes += mailboxes.len();
798
799            for mailbox in mailboxes {
800                let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
801                report.total_messages += messages.len();
802
803                // Check for messages that can't be retrieved
804                for msg_meta in messages {
805                    match message_store.get_message(msg_meta.message_id()).await {
806                        Ok(Some(_)) => {}
807                        Ok(None) => {
808                            report
809                                .orphaned_messages
810                                .push(msg_meta.message_id().to_string());
811                        }
812                        Err(e) => {
813                            report.errors.push(format!(
814                                "Error reading message {}: {}",
815                                msg_meta.message_id(),
816                                e
817                            ));
818                        }
819                    }
820                }
821            }
822        }
823
824        Ok(report)
825    }
826}
827
828/// Integrity check report
829#[derive(Debug, Clone, Serialize, Deserialize)]
830pub struct IntegrityReport {
831    pub total_mailboxes: usize,
832    pub total_messages: usize,
833    pub orphaned_messages: Vec<String>,
834    pub errors: Vec<String>,
835}
836
837impl IntegrityReport {
838    pub fn new() -> Self {
839        Self {
840            total_mailboxes: 0,
841            total_messages: 0,
842            orphaned_messages: Vec::new(),
843            errors: Vec::new(),
844        }
845    }
846
847    pub fn print(&self) {
848        println!("\n=== Integrity Report ===");
849        println!("Total mailboxes: {}", self.total_mailboxes);
850        println!("Total messages: {}", self.total_messages);
851        println!("Orphaned messages: {}", self.orphaned_messages.len());
852
853        if !self.orphaned_messages.is_empty() {
854            println!("\nOrphaned messages:");
855            for msg_id in self.orphaned_messages.iter().take(10) {
856                println!("  - {}", msg_id);
857            }
858            if self.orphaned_messages.len() > 10 {
859                println!("  ... and {} more", self.orphaned_messages.len() - 10);
860            }
861        }
862
863        if !self.errors.is_empty() {
864            println!("\nErrors:");
865            for error in self.errors.iter().take(10) {
866                println!("  - {}", error);
867            }
868            if self.errors.len() > 10 {
869                println!("  ... and {} more", self.errors.len() - 10);
870            }
871        }
872
873        if self.orphaned_messages.is_empty() && self.errors.is_empty() {
874            println!("\nNo integrity issues found");
875        }
876    }
877}
878
879impl Default for IntegrityReport {
880    fn default() -> Self {
881        Self::new()
882    }
883}
884
885#[cfg(test)]
886mod tests {
887    use super::*;
888
889    #[test]
890    fn test_backend_type_from_str() {
891        assert_eq!(
892            "filesystem".parse::<BackendType>().unwrap(),
893            BackendType::Filesystem
894        );
895        assert_eq!(
896            "postgres".parse::<BackendType>().unwrap(),
897            BackendType::Postgres
898        );
899        assert_eq!(
900            "amaters".parse::<BackendType>().unwrap(),
901            BackendType::Amaters
902        );
903        assert!("unknown".parse::<BackendType>().is_err());
904    }
905
906    #[test]
907    fn test_backend_type_case_insensitive() {
908        assert_eq!(
909            "FILESYSTEM".parse::<BackendType>().unwrap(),
910            BackendType::Filesystem
911        );
912        assert_eq!(
913            "PostgreSQL".parse::<BackendType>().unwrap(),
914            BackendType::Postgres
915        );
916    }
917
918    #[test]
919    fn test_backend_type_aliases() {
920        assert_eq!(
921            "fs".parse::<BackendType>().unwrap(),
922            BackendType::Filesystem
923        );
924        assert_eq!("pg".parse::<BackendType>().unwrap(), BackendType::Postgres);
925        assert_eq!(
926            "postgresql".parse::<BackendType>().unwrap(),
927            BackendType::Postgres
928        );
929    }
930
931    #[test]
932    fn test_backend_type_display() {
933        assert_eq!(BackendType::Filesystem.to_string(), "filesystem");
934        assert_eq!(BackendType::Postgres.to_string(), "postgres");
935        assert_eq!(BackendType::Amaters.to_string(), "amaters");
936    }
937
938    #[test]
939    fn test_migration_config_default() {
940        let config = MigrationConfig::default();
941        assert_eq!(config.source_type, BackendType::Filesystem);
942        assert_eq!(config.dest_type, BackendType::Postgres);
943        assert_eq!(config.batch_size, 100);
944        assert_eq!(config.parallel, 4);
945        assert!(config.verify);
946        assert!(!config.dry_run);
947        assert!(!config.resume);
948    }
949
950    #[test]
951    fn test_migration_progress_new() {
952        let progress = MigrationProgress::new();
953        assert_eq!(progress.total_users, 0);
954        assert_eq!(progress.migrated_users, 0);
955        assert_eq!(progress.total_mailboxes, 0);
956        assert_eq!(progress.migrated_messages, 0);
957        assert_eq!(progress.total_bytes, 0);
958        assert_eq!(progress.migrated_bytes, 0);
959        assert!(progress.completed_at.is_none());
960        assert!(progress.migrated_user_list.is_empty());
961        assert!(progress.migrated_mailbox_map.is_empty());
962    }
963
964    #[test]
965    fn test_migration_progress_serialization() {
966        let progress = MigrationProgress::new();
967        let json = serde_json::to_string(&progress).unwrap();
968        let deserialized: MigrationProgress = serde_json::from_str(&json).unwrap();
969        assert_eq!(progress.total_users, deserialized.total_users);
970        assert_eq!(progress.migrated_users, deserialized.migrated_users);
971    }
972
973    #[test]
974    fn test_migration_progress_mark_user_migrated() {
975        let mut progress = MigrationProgress::new();
976        progress.total_users = 2;
977
978        assert!(!progress.is_user_migrated("user1"));
979        progress.mark_user_migrated("user1");
980        assert!(progress.is_user_migrated("user1"));
981        assert_eq!(progress.migrated_users, 1);
982        assert!(!progress.is_user_migrated("user2"));
983    }
984
985    #[test]
986    fn test_migration_progress_mark_mailbox_migrated() {
987        let mut progress = MigrationProgress::new();
988
989        assert!(!progress.is_mailbox_migrated("user1:INBOX"));
990        progress.mark_mailbox_migrated("user1:INBOX".to_string(), "mailbox-123".to_string());
991        assert!(progress.is_mailbox_migrated("user1:INBOX"));
992        assert_eq!(progress.migrated_mailboxes, 1);
993    }
994
995    #[test]
996    fn test_migration_progress_percentage() {
997        let mut progress = MigrationProgress::new();
998        assert_eq!(progress.progress_percentage(), 0.0);
999
1000        progress.total_messages = 100;
1001        progress.migrated_messages = 50;
1002        assert_eq!(progress.progress_percentage(), 50.0);
1003
1004        progress.migrated_messages = 100;
1005        assert_eq!(progress.progress_percentage(), 100.0);
1006    }
1007
1008    #[test]
1009    fn test_migration_progress_messages_per_second() {
1010        let mut progress = MigrationProgress::new();
1011        progress.started_at = chrono::Utc::now().timestamp() - 10;
1012        progress.last_updated_at = chrono::Utc::now().timestamp();
1013        progress.migrated_messages = 100;
1014
1015        let rate = progress.messages_per_second();
1016        assert!(rate > 0.0);
1017        assert!(rate <= 10.0);
1018    }
1019
1020    #[test]
1021    fn test_migration_stats_from_progress() {
1022        let mut progress = MigrationProgress::new();
1023        progress.total_users = 5;
1024        progress.total_mailboxes = 20;
1025        progress.total_messages = 1000;
1026        progress.migrated_messages = 1000;
1027        progress.total_bytes = 10_485_760;
1028        progress.migrated_bytes = 10_485_760;
1029        progress.started_at = chrono::Utc::now().timestamp() - 100;
1030        progress.completed_at = Some(chrono::Utc::now().timestamp());
1031
1032        let stats = MigrationStats::from_progress(&progress);
1033        assert_eq!(stats.total_users, 5);
1034        assert_eq!(stats.total_mailboxes, 20);
1035        assert_eq!(stats.total_messages, 1000);
1036        assert!(stats.duration_secs > 0);
1037        assert!(stats.throughput_msg_sec > 0.0);
1038    }
1039
1040    #[test]
1041    fn test_integrity_report_new() {
1042        let report = IntegrityReport::new();
1043        assert_eq!(report.total_mailboxes, 0);
1044        assert_eq!(report.total_messages, 0);
1045        assert!(report.orphaned_messages.is_empty());
1046        assert!(report.errors.is_empty());
1047    }
1048
1049    #[test]
1050    fn test_migration_config_custom() {
1051        let config = MigrationConfig {
1052            source_type: BackendType::Postgres,
1053            dest_type: BackendType::Amaters,
1054            batch_size: 500,
1055            parallel: 8,
1056            verify: false,
1057            dry_run: true,
1058            resume: true,
1059            ..Default::default()
1060        };
1061
1062        assert_eq!(config.source_type, BackendType::Postgres);
1063        assert_eq!(config.dest_type, BackendType::Amaters);
1064        assert_eq!(config.batch_size, 500);
1065        assert_eq!(config.parallel, 8);
1066        assert!(!config.verify);
1067        assert!(config.dry_run);
1068        assert!(config.resume);
1069    }
1070
1071    #[test]
1072    fn test_backend_type_equality() {
1073        assert_eq!(BackendType::Filesystem, BackendType::Filesystem);
1074        assert_ne!(BackendType::Filesystem, BackendType::Postgres);
1075    }
1076
1077    #[tokio::test]
1078    async fn test_migrator_creation() {
1079        let config = MigrationConfig::default();
1080        let migrator = StorageMigrator::new(config);
1081        assert_eq!(migrator.progress.total_users, 0);
1082    }
1083
1084    #[test]
1085    fn test_progress_failed_messages() {
1086        let mut progress = MigrationProgress::new();
1087        progress.failed_messages.push("msg1".to_string());
1088        progress.failed_messages.push("msg2".to_string());
1089        assert_eq!(progress.failed_messages.len(), 2);
1090    }
1091
1092    #[test]
1093    fn test_migration_stats_fields() {
1094        let progress = MigrationProgress::new();
1095        let stats = MigrationStats::from_progress(&progress);
1096        assert_eq!(stats.total_bytes, 0);
1097        assert_eq!(stats.migrated_bytes, 0);
1098        assert_eq!(stats.failed_messages, 0);
1099    }
1100
1101    #[test]
1102    fn test_integrity_report_with_errors() {
1103        let mut report = IntegrityReport::new();
1104        report.errors.push("Error 1".to_string());
1105        report.errors.push("Error 2".to_string());
1106        assert_eq!(report.errors.len(), 2);
1107    }
1108
1109    #[test]
1110    fn test_backend_type_all_variants() {
1111        let fs = BackendType::Filesystem;
1112        let pg = BackendType::Postgres;
1113        let am = BackendType::Amaters;
1114
1115        assert_ne!(fs, pg);
1116        assert_ne!(pg, am);
1117        assert_ne!(fs, am);
1118    }
1119
1120    #[test]
1121    fn test_migration_progress_completion() {
1122        let mut progress = MigrationProgress::new();
1123        assert!(progress.completed_at.is_none());
1124
1125        progress.completed_at = Some(chrono::Utc::now().timestamp());
1126        assert!(progress.completed_at.is_some());
1127    }
1128
1129    #[test]
1130    fn test_migration_progress_eta() {
1131        let mut progress = MigrationProgress::new();
1132        assert!(progress.eta_seconds().is_none());
1133
1134        progress.started_at = chrono::Utc::now().timestamp() - 10;
1135        progress.last_updated_at = chrono::Utc::now().timestamp();
1136        progress.total_messages = 1000;
1137        progress.migrated_messages = 100;
1138
1139        let eta = progress.eta_seconds();
1140        assert!(eta.is_some());
1141    }
1142
1143    #[test]
1144    fn test_message_checksum_compute() {
1145        use bytes::Bytes;
1146        use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
1147
1148        let headers = HeaderMap::new();
1149        let body = MessageBody::Small(Bytes::from("Test body"));
1150        let message = MimeMessage::new(headers, body);
1151
1152        let mail = Mail::new(
1153            Some("sender@example.com".parse().unwrap()),
1154            vec!["recipient@example.com".parse().unwrap()],
1155            message,
1156            None,
1157            None,
1158        );
1159
1160        let checksum = MessageChecksum::compute(&mail);
1161
1162        assert!(!checksum.sha256.is_empty());
1163        assert_eq!(checksum.sha256.len(), 64);
1164        assert!(checksum.size > 0);
1165    }
1166
1167    #[test]
1168    fn test_migration_progress_default() {
1169        let progress = MigrationProgress::default();
1170        assert_eq!(progress.total_users, 0);
1171        assert_eq!(progress.migrated_users, 0);
1172    }
1173
1174    #[test]
1175    fn test_integrity_report_default() {
1176        let report = IntegrityReport::default();
1177        assert_eq!(report.total_mailboxes, 0);
1178        assert_eq!(report.total_messages, 0);
1179    }
1180
1181    #[test]
1182    fn test_backend_type_serialization() {
1183        let backend = BackendType::Filesystem;
1184        let json = serde_json::to_string(&backend).unwrap();
1185        let deserialized: BackendType = serde_json::from_str(&json).unwrap();
1186        assert_eq!(backend, deserialized);
1187    }
1188
1189    #[test]
1190    fn test_migration_config_batch_size() {
1191        let mut config = MigrationConfig::default();
1192        assert_eq!(config.batch_size, 100);
1193
1194        config.batch_size = 200;
1195        assert_eq!(config.batch_size, 200);
1196    }
1197
1198    #[test]
1199    fn test_migration_config_parallel() {
1200        let mut config = MigrationConfig::default();
1201        assert_eq!(config.parallel, 4);
1202
1203        config.parallel = 8;
1204        assert_eq!(config.parallel, 8);
1205    }
1206
1207    #[test]
1208    fn test_migration_progress_bytes_tracking() {
1209        let mut progress = MigrationProgress::new();
1210        progress.total_bytes = 1_048_576;
1211        progress.migrated_bytes = 524_288;
1212
1213        assert_eq!(progress.total_bytes, 1_048_576);
1214        assert_eq!(progress.migrated_bytes, 524_288);
1215    }
1216
1217    #[test]
1218    fn test_integrity_report_orphaned_messages() {
1219        let mut report = IntegrityReport::new();
1220        report.orphaned_messages.push("msg1".to_string());
1221        report.orphaned_messages.push("msg2".to_string());
1222
1223        assert_eq!(report.orphaned_messages.len(), 2);
1224    }
1225
1226    // ---------------------------------------------------------------------------
1227    // Tests added for Slice G stub-check pass
1228    // ---------------------------------------------------------------------------
1229
1230    /// Verify that `get_users` delegates to `StorageBackend::list_all_users`.
1231    ///
1232    /// We use `AmatersBackend` (the in-memory mock) as the backend.  An empty
1233    /// store returns an empty user list, proving the call is forwarded rather
1234    /// than returning a hardcoded list.
1235    #[tokio::test]
1236    async fn test_get_users_delegates_to_backend() {
1237        use rusmes_storage::backends::amaters::{AmatersBackend, AmatersConfig};
1238
1239        let backend = AmatersBackend::new(AmatersConfig::default())
1240            .await
1241            .expect("AmatersBackend::new failed");
1242
1243        let config = MigrationConfig::default();
1244        let migrator = StorageMigrator::new(config);
1245
1246        let users = migrator
1247            .get_users(&backend)
1248            .await
1249            .expect("get_users failed");
1250
1251        // The in-memory AmateRS backend has no registered users, so the list
1252        // must be empty — confirming the delegation (the old stub returned
1253        // two hardcoded placeholders).
1254        assert!(
1255            users.is_empty(),
1256            "expected empty user list from a fresh AmatersBackend, got {:?}",
1257            users
1258        );
1259    }
1260
1261    /// Verify `AmatersConfig::from_url` parses valid URLs and rejects invalid
1262    /// ones.
1263    #[test]
1264    fn test_amaters_config_from_url() {
1265        use rusmes_storage::backends::amaters::AmatersConfig;
1266
1267        // Single endpoint, explicit keyspace.
1268        let cfg = AmatersConfig::from_url("amaters://node1:9042/my_keyspace")
1269            .expect("valid single-endpoint URL should parse");
1270        assert_eq!(cfg.cluster_endpoints, vec!["node1:9042"]);
1271        assert_eq!(cfg.metadata_keyspace, "my_keyspace");
1272        assert_eq!(cfg.blob_keyspace, "my_keyspace_blobs");
1273
1274        // Multiple endpoints, no keyspace path → defaults.
1275        let cfg = AmatersConfig::from_url("amaters://host1:9042,host2:9042")
1276            .expect("valid multi-endpoint URL should parse");
1277        assert_eq!(cfg.cluster_endpoints, vec!["host1:9042", "host2:9042"]);
1278        assert_eq!(cfg.metadata_keyspace, "rusmes_metadata");
1279        assert_eq!(cfg.blob_keyspace, "rusmes_blobs");
1280
1281        // Multiple endpoints with keyspace.
1282        let cfg = AmatersConfig::from_url("amaters://host1:9042,host2:9042,host3:9042/prod")
1283            .expect("valid three-endpoint URL should parse");
1284        assert_eq!(cfg.cluster_endpoints.len(), 3);
1285        assert_eq!(cfg.metadata_keyspace, "prod");
1286        assert_eq!(cfg.blob_keyspace, "prod_blobs");
1287
1288        // Wrong scheme → error.
1289        assert!(
1290            AmatersConfig::from_url("cassandra://host1:9042/ks").is_err(),
1291            "wrong scheme must be rejected"
1292        );
1293
1294        // Missing port → error.
1295        assert!(
1296            AmatersConfig::from_url("amaters://host1/ks").is_err(),
1297            "endpoint without port must be rejected"
1298        );
1299
1300        // Empty authority → error.
1301        assert!(
1302            AmatersConfig::from_url("amaters:///keyspace").is_err(),
1303            "empty host list must be rejected"
1304        );
1305    }
1306}