Skip to main content

rusmes_cli/commands/
migrate.rs

1//! Storage migration command
2
3use anyhow::{Context, Result};
4use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
5use rusmes_proto::{Mail, MessageId, Username};
6use rusmes_storage::backends::{
7    filesystem::FilesystemBackend, postgres_complete::PostgresCompleteBackend,
8};
9use rusmes_storage::{MailboxId, MessageStore, StorageBackend};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::Semaphore;
17
18/// Storage backend type
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20pub enum BackendType {
21    Filesystem,
22    Postgres,
23    Amaters,
24}
25
26impl std::str::FromStr for BackendType {
27    type Err = anyhow::Error;
28
29    fn from_str(s: &str) -> Result<Self> {
30        match s.to_lowercase().as_str() {
31            "filesystem" | "fs" => Ok(BackendType::Filesystem),
32            "postgres" | "postgresql" | "pg" => Ok(BackendType::Postgres),
33            "amaters" => Ok(BackendType::Amaters),
34            _ => Err(anyhow::anyhow!("Unknown backend type: {}", s)),
35        }
36    }
37}
38
39impl std::fmt::Display for BackendType {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        match self {
42            BackendType::Filesystem => write!(f, "filesystem"),
43            BackendType::Postgres => write!(f, "postgres"),
44            BackendType::Amaters => write!(f, "amaters"),
45        }
46    }
47}
48
49/// Migration configuration
50#[derive(Debug, Clone)]
51pub struct MigrationConfig {
52    pub source_type: BackendType,
53    pub source_config: String,
54    pub dest_type: BackendType,
55    pub dest_config: String,
56    pub batch_size: usize,
57    pub parallel: usize,
58    pub verify: bool,
59    pub dry_run: bool,
60    pub resume: bool,
61}
62
63impl Default for MigrationConfig {
64    fn default() -> Self {
65        Self {
66            source_type: BackendType::Filesystem,
67            source_config: "/var/lib/rusmes/mail".to_string(),
68            dest_type: BackendType::Postgres,
69            dest_config: "postgresql://localhost/rusmes".to_string(),
70            batch_size: 100,
71            parallel: 4,
72            verify: true,
73            dry_run: false,
74            resume: false,
75        }
76    }
77}
78
79/// Migration progress tracker
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct MigrationProgress {
82    pub total_users: usize,
83    pub migrated_users: usize,
84    pub total_mailboxes: usize,
85    pub migrated_mailboxes: usize,
86    pub total_messages: usize,
87    pub migrated_messages: usize,
88    pub total_bytes: u64,
89    pub migrated_bytes: u64,
90    pub failed_messages: Vec<String>,
91    pub migrated_user_list: Vec<String>,
92    pub migrated_mailbox_map: HashMap<String, String>,
93    pub started_at: i64,
94    pub last_updated_at: i64,
95    pub completed_at: Option<i64>,
96}
97
98impl MigrationProgress {
99    pub fn new() -> Self {
100        let now = chrono::Utc::now().timestamp();
101        Self {
102            total_users: 0,
103            migrated_users: 0,
104            total_mailboxes: 0,
105            migrated_mailboxes: 0,
106            total_messages: 0,
107            migrated_messages: 0,
108            total_bytes: 0,
109            migrated_bytes: 0,
110            failed_messages: Vec::new(),
111            migrated_user_list: Vec::new(),
112            migrated_mailbox_map: HashMap::new(),
113            started_at: now,
114            last_updated_at: now,
115            completed_at: None,
116        }
117    }
118
119    pub fn save_to_file(&self, path: &PathBuf) -> Result<()> {
120        let json = serde_json::to_string_pretty(self)?;
121        std::fs::write(path, json)?;
122        Ok(())
123    }
124
125    pub fn load_from_file(path: &PathBuf) -> Result<Self> {
126        let json = std::fs::read_to_string(path)?;
127        Ok(serde_json::from_str(&json)?)
128    }
129
130    pub fn mark_user_migrated(&mut self, user: &str) {
131        self.migrated_user_list.push(user.to_string());
132        self.migrated_users += 1;
133        self.last_updated_at = chrono::Utc::now().timestamp();
134    }
135
136    pub fn is_user_migrated(&self, user: &str) -> bool {
137        self.migrated_user_list.contains(&user.to_string())
138    }
139
140    pub fn mark_mailbox_migrated(&mut self, mailbox_key: String, mailbox_id: String) {
141        self.migrated_mailbox_map.insert(mailbox_key, mailbox_id);
142        self.migrated_mailboxes += 1;
143        self.last_updated_at = chrono::Utc::now().timestamp();
144    }
145
146    pub fn is_mailbox_migrated(&self, mailbox_key: &str) -> bool {
147        self.migrated_mailbox_map.contains_key(mailbox_key)
148    }
149
150    pub fn progress_percentage(&self) -> f64 {
151        if self.total_messages == 0 {
152            0.0
153        } else {
154            (self.migrated_messages as f64 / self.total_messages as f64) * 100.0
155        }
156    }
157
158    pub fn eta_seconds(&self) -> Option<u64> {
159        if self.migrated_messages == 0 {
160            return None;
161        }
162
163        let elapsed = self.last_updated_at - self.started_at;
164        if elapsed <= 0 {
165            return None;
166        }
167
168        let remaining = self.total_messages.saturating_sub(self.migrated_messages);
169        let rate = self.migrated_messages as f64 / elapsed as f64;
170
171        if rate <= 0.0 {
172            return None;
173        }
174
175        Some((remaining as f64 / rate) as u64)
176    }
177
178    pub fn messages_per_second(&self) -> f64 {
179        let elapsed = self.last_updated_at - self.started_at;
180        if elapsed <= 0 {
181            return 0.0;
182        }
183        self.migrated_messages as f64 / elapsed as f64
184    }
185}
186
187impl Default for MigrationProgress {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193/// Migration statistics
194#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct MigrationStats {
196    pub total_users: usize,
197    pub total_mailboxes: usize,
198    pub total_messages: usize,
199    pub total_bytes: u64,
200    pub migrated_bytes: u64,
201    pub failed_messages: usize,
202    pub duration_secs: u64,
203    pub throughput_msg_sec: f64,
204    pub throughput_mbps: f64,
205}
206
207impl MigrationStats {
208    pub fn from_progress(progress: &MigrationProgress) -> Self {
209        let duration = if let Some(completed) = progress.completed_at {
210            (completed - progress.started_at) as u64
211        } else {
212            (chrono::Utc::now().timestamp() - progress.started_at) as u64
213        };
214
215        let duration_secs = duration.max(1);
216        let throughput_msg_sec = progress.migrated_messages as f64 / duration_secs as f64;
217        let throughput_mbps = if duration_secs > 0 {
218            (progress.migrated_bytes as f64 / 1_048_576.0) / duration_secs as f64
219        } else {
220            0.0
221        };
222
223        Self {
224            total_users: progress.total_users,
225            total_mailboxes: progress.total_mailboxes,
226            total_messages: progress.total_messages,
227            total_bytes: progress.total_bytes,
228            migrated_bytes: progress.migrated_bytes,
229            failed_messages: progress.failed_messages.len(),
230            duration_secs,
231            throughput_msg_sec,
232            throughput_mbps,
233        }
234    }
235
236    pub fn print(&self) {
237        println!("\n=== Migration Statistics ===");
238        println!("Users migrated: {}", self.total_users);
239        println!("Mailboxes migrated: {}", self.total_mailboxes);
240        println!("Messages migrated: {}", self.total_messages);
241        println!(
242            "Data migrated: {:.2} MB",
243            self.migrated_bytes as f64 / 1_048_576.0
244        );
245        println!("Failed messages: {}", self.failed_messages);
246        println!("Duration: {} seconds", self.duration_secs);
247        println!("Throughput: {:.2} msg/s", self.throughput_msg_sec);
248        println!("Throughput: {:.2} MB/s", self.throughput_mbps);
249    }
250}
251
252/// Message checksum for verification
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct MessageChecksum {
255    pub message_id: String,
256    pub size: usize,
257    pub sha256: String,
258}
259
260impl MessageChecksum {
261    pub fn compute(mail: &Mail) -> Self {
262        let size = mail.size();
263
264        // For checksum, we use the message ID and size as a simple integrity check
265        // In a full implementation, we'd serialize the entire message
266        let mut hasher = Sha256::new();
267        hasher.update(mail.message_id().to_string().as_bytes());
268        hasher.update(size.to_le_bytes());
269        let hash = hasher.finalize();
270
271        Self {
272            message_id: mail.message_id().to_string(),
273            size,
274            sha256: format!("{:x}", hash),
275        }
276    }
277}
278
279/// Storage migrator
280pub struct StorageMigrator {
281    config: MigrationConfig,
282    progress: MigrationProgress,
283    progress_file: PathBuf,
284    backup_dir: PathBuf,
285}
286
287impl StorageMigrator {
288    /// Create a new migrator
289    pub fn new(config: MigrationConfig) -> Self {
290        let progress_file = PathBuf::from("/tmp/rusmes_migration_progress.json");
291        let backup_dir = PathBuf::from("/tmp/rusmes_migration_backup");
292
293        let progress = if config.resume && progress_file.exists() {
294            MigrationProgress::load_from_file(&progress_file)
295                .unwrap_or_else(|_| MigrationProgress::new())
296        } else {
297            MigrationProgress::new()
298        };
299
300        Self {
301            config,
302            progress,
303            progress_file,
304            backup_dir,
305        }
306    }
307
308    /// Run the migration
309    pub async fn migrate(&mut self) -> Result<MigrationStats> {
310        println!(
311            "Starting migration from {} to {}",
312            self.config.source_type, self.config.dest_type
313        );
314
315        if self.config.dry_run {
316            println!("DRY RUN MODE - No changes will be made");
317        }
318
319        if self.config.resume {
320            println!("Resuming migration from previous state");
321            println!(
322                "Previously migrated: {} users, {} mailboxes, {} messages",
323                self.progress.migrated_users,
324                self.progress.migrated_mailboxes,
325                self.progress.migrated_messages
326            );
327        }
328
329        // Create backup directory
330        if !self.config.dry_run {
331            std::fs::create_dir_all(&self.backup_dir)?;
332        }
333
334        // Create backends
335        let source = self.create_source_backend().await?;
336        let dest = self.create_dest_backend().await?;
337
338        // Backup destination state before migration
339        if !self.config.dry_run && !self.config.resume {
340            println!("Creating backup of destination state...");
341            self.backup_destination_state(dest.as_ref()).await?;
342        }
343
344        // Get all users
345        let users = self.get_users(source.as_ref()).await?;
346        self.progress.total_users = users.len();
347
348        // Count total messages and mailboxes
349        self.count_totals(source.as_ref(), &users).await?;
350
351        println!(
352            "Migration scope: {} users, {} mailboxes, {} messages ({:.2} MB)",
353            self.progress.total_users,
354            self.progress.total_mailboxes,
355            self.progress.total_messages,
356            self.progress.total_bytes as f64 / 1_048_576.0
357        );
358
359        // Create progress bars
360        let multi_progress = MultiProgress::new();
361        let user_pb = multi_progress.add(ProgressBar::new(users.len() as u64));
362        user_pb.set_style(
363            ProgressStyle::default_bar()
364                .template("[{elapsed_precise}] Users {bar:30.cyan/blue} {pos}/{len} ({eta})")
365                .expect("Invalid progress bar template")
366                .progress_chars("=>-"),
367        );
368
369        let message_pb = multi_progress.add(ProgressBar::new(self.progress.total_messages as u64));
370        message_pb.set_style(
371            ProgressStyle::default_bar()
372                .template("[{elapsed_precise}] Messages {bar:30.green/blue} {pos}/{len} ({msg})")
373                .expect("Invalid progress bar template")
374                .progress_chars("=>-"),
375        );
376
377        let start_time = Instant::now();
378
379        // Migrate users
380        for user in &users {
381            if self.config.resume && self.progress.is_user_migrated(&user.to_string()) {
382                user_pb.inc(1);
383                continue;
384            }
385
386            if !self.config.dry_run {
387                self.migrate_user(source.as_ref(), dest.as_ref(), user, &message_pb)
388                    .await?;
389            }
390
391            self.progress.mark_user_migrated(&user.to_string());
392            user_pb.inc(1);
393
394            // Save progress periodically
395            if !self.config.dry_run {
396                self.progress.save_to_file(&self.progress_file)?;
397            }
398
399            // Update message progress bar with current rate
400            let rate = self.progress.messages_per_second();
401            message_pb.set_message(format!("{:.1} msg/s", rate));
402        }
403
404        user_pb.finish_with_message("All users migrated");
405        message_pb.finish_with_message("All messages migrated");
406
407        self.progress.completed_at = Some(chrono::Utc::now().timestamp());
408
409        if !self.config.dry_run {
410            self.progress.save_to_file(&self.progress_file)?;
411        }
412
413        let stats = MigrationStats::from_progress(&self.progress);
414
415        // Verification
416        if self.config.verify && !self.config.dry_run {
417            println!("\nVerifying migration integrity...");
418            self.verify_migration(source.as_ref(), dest.as_ref())
419                .await?;
420        }
421
422        let elapsed = start_time.elapsed();
423        println!("\nMigration completed in {:.2}s", elapsed.as_secs_f64());
424
425        Ok(stats)
426    }
427
428    async fn create_source_backend(&self) -> Result<Box<dyn StorageBackend>> {
429        match self.config.source_type {
430            BackendType::Filesystem => {
431                Ok(Box::new(FilesystemBackend::new(&self.config.source_config)))
432            }
433            BackendType::Postgres => {
434                let backend = PostgresCompleteBackend::new(&self.config.source_config).await?;
435                Ok(Box::new(backend))
436            }
437            BackendType::Amaters => Err(anyhow::anyhow!("AmateRS backend not yet implemented")),
438        }
439    }
440
441    async fn create_dest_backend(&self) -> Result<Box<dyn StorageBackend>> {
442        match self.config.dest_type {
443            BackendType::Filesystem => {
444                let path = &self.config.dest_config;
445                std::fs::create_dir_all(path)?;
446                Ok(Box::new(FilesystemBackend::new(path)))
447            }
448            BackendType::Postgres => {
449                let backend = PostgresCompleteBackend::new(&self.config.dest_config).await?;
450                if !self.config.dry_run {
451                    backend.init_schema().await?;
452                }
453                Ok(Box::new(backend))
454            }
455            BackendType::Amaters => Err(anyhow::anyhow!("AmateRS backend not yet implemented")),
456        }
457    }
458
459    async fn get_users(&self, _backend: &dyn StorageBackend) -> Result<Vec<Username>> {
460        // In production: query backend for all users
461        // For now, return placeholder users
462        Ok(vec![
463            Username::new("user1@example.com".to_string())?,
464            Username::new("user2@example.com".to_string())?,
465        ])
466    }
467
468    async fn count_totals(
469        &mut self,
470        source: &dyn StorageBackend,
471        users: &[Username],
472    ) -> Result<()> {
473        let mailbox_store = source.mailbox_store();
474        let message_store = source.message_store();
475
476        let mut total_mailboxes = 0;
477        let mut total_messages = 0;
478        let mut total_bytes = 0u64;
479
480        for user in users {
481            let mailboxes = mailbox_store.list_mailboxes(user).await?;
482            total_mailboxes += mailboxes.len();
483
484            for mailbox in mailboxes {
485                let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
486                total_messages += messages.len();
487                total_bytes += messages.iter().map(|m| m.size() as u64).sum::<u64>();
488            }
489        }
490
491        self.progress.total_mailboxes = total_mailboxes;
492        self.progress.total_messages = total_messages;
493        self.progress.total_bytes = total_bytes;
494
495        Ok(())
496    }
497
498    async fn migrate_user(
499        &mut self,
500        source: &dyn StorageBackend,
501        dest: &dyn StorageBackend,
502        user: &Username,
503        message_pb: &ProgressBar,
504    ) -> Result<()> {
505        let source_mailboxes = source.mailbox_store();
506        let dest_mailboxes = dest.mailbox_store();
507        let source_messages = source.message_store();
508        let dest_messages = dest.message_store();
509
510        // Get all mailboxes for user
511        let mailboxes = source_mailboxes.list_mailboxes(user).await?;
512
513        for mailbox in mailboxes {
514            let mailbox_key = format!("{}:{}", user, mailbox.path());
515
516            if self.config.resume && self.progress.is_mailbox_migrated(&mailbox_key) {
517                continue;
518            }
519
520            // Create mailbox in destination
521            let dest_mailbox_id = dest_mailboxes.create_mailbox(mailbox.path()).await?;
522
523            self.progress
524                .mark_mailbox_migrated(mailbox_key, dest_mailbox_id.to_string());
525
526            // Migrate messages in batches with parallel processing
527            let messages = source_messages.get_mailbox_messages(mailbox.id()).await?;
528
529            let semaphore = Arc::new(Semaphore::new(self.config.parallel));
530            let mut handles = vec![];
531
532            for chunk in messages.chunks(self.config.batch_size) {
533                for message_meta in chunk {
534                    let permit = semaphore.clone().acquire_owned().await?;
535                    let source_messages = Arc::clone(&source_messages);
536                    let dest_messages = Arc::clone(&dest_messages);
537                    let message_id = *message_meta.message_id();
538                    let mailbox_id = dest_mailbox_id;
539                    let size = message_meta.size();
540                    let flags = message_meta.flags().clone();
541
542                    let handle = tokio::spawn(async move {
543                        let _permit = permit;
544                        let result = Self::migrate_single_message(
545                            &source_messages,
546                            &dest_messages,
547                            &message_id,
548                            &mailbox_id,
549                        )
550                        .await;
551                        (message_id, size, flags, result)
552                    });
553
554                    handles.push(handle);
555                }
556
557                // Process batch results
558                for handle in handles.drain(..) {
559                    match handle.await {
560                        Ok((msg_id, size, _flags, result)) => match result {
561                            Ok(_) => {
562                                self.progress.migrated_messages += 1;
563                                self.progress.migrated_bytes += size as u64;
564                                message_pb.inc(1);
565                            }
566                            Err(e) => {
567                                tracing::error!("Failed to migrate message {}: {}", msg_id, e);
568                                self.progress.failed_messages.push(msg_id.to_string());
569                            }
570                        },
571                        Err(e) => {
572                            tracing::error!("Task failed: {}", e);
573                        }
574                    }
575                }
576            }
577        }
578
579        Ok(())
580    }
581
582    async fn migrate_single_message(
583        source: &Arc<dyn MessageStore>,
584        dest: &Arc<dyn MessageStore>,
585        message_id: &MessageId,
586        dest_mailbox_id: &MailboxId,
587    ) -> Result<()> {
588        let message = source
589            .get_message(message_id)
590            .await?
591            .context("Message not found in source")?;
592
593        dest.append_message(dest_mailbox_id, message).await?;
594
595        Ok(())
596    }
597
598    async fn verify_migration(
599        &self,
600        source: &dyn StorageBackend,
601        dest: &dyn StorageBackend,
602    ) -> Result<()> {
603        let source_mailboxes = source.mailbox_store();
604        let dest_mailboxes = dest.mailbox_store();
605        let source_messages = source.message_store();
606        let dest_messages = dest.message_store();
607
608        let users = self.get_users(source).await?;
609
610        let pb = ProgressBar::new(users.len() as u64);
611        pb.set_style(
612            ProgressStyle::default_bar()
613                .template("[{elapsed_precise}] Verifying {bar:40.yellow/blue} {pos}/{len}")
614                .expect("Invalid progress bar template")
615                .progress_chars("=>-"),
616        );
617
618        for user in &users {
619            let source_mboxes = source_mailboxes.list_mailboxes(user).await?;
620            let dest_mboxes = dest_mailboxes.list_mailboxes(user).await?;
621
622            if source_mboxes.len() != dest_mboxes.len() {
623                return Err(anyhow::anyhow!(
624                    "Mailbox count mismatch for user {}: source={}, dest={}",
625                    user,
626                    source_mboxes.len(),
627                    dest_mboxes.len()
628                ));
629            }
630
631            // Verify message counts
632            for (src_mbox, dst_mbox) in source_mboxes.iter().zip(dest_mboxes.iter()) {
633                let src_msgs = source_messages.get_mailbox_messages(src_mbox.id()).await?;
634                let dst_msgs = dest_messages.get_mailbox_messages(dst_mbox.id()).await?;
635
636                if src_msgs.len() != dst_msgs.len() {
637                    return Err(anyhow::anyhow!(
638                        "Message count mismatch in mailbox {}: source={}, dest={}",
639                        src_mbox.path(),
640                        src_msgs.len(),
641                        dst_msgs.len()
642                    ));
643                }
644
645                // Verify checksums for sample messages
646                if !src_msgs.is_empty() {
647                    let sample_size = (src_msgs.len() / 10).clamp(1, 10);
648                    for i in 0..sample_size {
649                        let idx = i * (src_msgs.len() / sample_size);
650                        if let Some(src_meta) = src_msgs.get(idx) {
651                            if let (Some(src_msg), Some(dst_msg)) = (
652                                source_messages.get_message(src_meta.message_id()).await?,
653                                dest_messages.get_message(src_meta.message_id()).await?,
654                            ) {
655                                let src_checksum = MessageChecksum::compute(&src_msg);
656                                let dst_checksum = MessageChecksum::compute(&dst_msg);
657
658                                if src_checksum.sha256 != dst_checksum.sha256 {
659                                    return Err(anyhow::anyhow!(
660                                        "Checksum mismatch for message {}",
661                                        src_meta.message_id()
662                                    ));
663                                }
664                            }
665                        }
666                    }
667                }
668            }
669
670            pb.inc(1);
671        }
672
673        pb.finish_with_message("Verification completed successfully");
674        println!("Verification passed: all mailboxes and messages verified");
675        Ok(())
676    }
677
678    async fn backup_destination_state(&self, _dest: &dyn StorageBackend) -> Result<()> {
679        let backup_metadata = serde_json::json!({
680            "timestamp": chrono::Utc::now().to_rfc3339(),
681            "dest_type": self.config.dest_type,
682            "dest_config": self.config.dest_config,
683        });
684
685        let backup_file = self.backup_dir.join("migration_backup_metadata.json");
686        std::fs::write(backup_file, serde_json::to_string_pretty(&backup_metadata)?)?;
687
688        println!("Backup metadata saved to {:?}", self.backup_dir);
689        Ok(())
690    }
691
692    /// Rollback migration
693    pub async fn rollback(&self) -> Result<()> {
694        println!("Rolling back migration...");
695
696        let backup_file = self.backup_dir.join("migration_backup_metadata.json");
697        if !backup_file.exists() {
698            return Err(anyhow::anyhow!("No backup found to rollback"));
699        }
700
701        println!(
702            "Rollback would restore from backup at {:?}",
703            self.backup_dir
704        );
705        println!("Note: Full rollback implementation requires backend-specific restore logic");
706
707        Ok(())
708    }
709
710    /// Get migration progress
711    pub fn get_progress(&self) -> &MigrationProgress {
712        &self.progress
713    }
714
715    /// Print migration report
716    pub fn print_report(&self) {
717        println!("\n=== Migration Report ===");
718        println!(
719            "Users: {}/{}",
720            self.progress.migrated_users, self.progress.total_users
721        );
722        println!(
723            "Mailboxes: {}/{}",
724            self.progress.migrated_mailboxes, self.progress.total_mailboxes
725        );
726        println!(
727            "Messages: {}/{}",
728            self.progress.migrated_messages, self.progress.total_messages
729        );
730        println!(
731            "Data: {:.2}/{:.2} MB",
732            self.progress.migrated_bytes as f64 / 1_048_576.0,
733            self.progress.total_bytes as f64 / 1_048_576.0
734        );
735
736        if !self.progress.failed_messages.is_empty() {
737            println!("\nFailed messages: {}", self.progress.failed_messages.len());
738            for msg_id in self.progress.failed_messages.iter().take(10) {
739                println!("  - {}", msg_id);
740            }
741            if self.progress.failed_messages.len() > 10 {
742                println!(
743                    "  ... and {} more",
744                    self.progress.failed_messages.len() - 10
745                );
746            }
747        }
748
749        let stats = MigrationStats::from_progress(&self.progress);
750        println!("\nDuration: {} seconds", stats.duration_secs);
751        println!(
752            "Throughput: {:.2} messages/second",
753            stats.throughput_msg_sec
754        );
755        println!("Throughput: {:.2} MB/second", stats.throughput_mbps);
756
757        if let Some(eta) = self.progress.eta_seconds() {
758            let eta_duration = Duration::from_secs(eta);
759            println!("ETA: {:?}", eta_duration);
760        }
761    }
762}
763
764/// Integrity checker
765pub struct IntegrityChecker {
766    backend: Box<dyn StorageBackend>,
767}
768
769impl IntegrityChecker {
770    pub fn new(backend: Box<dyn StorageBackend>) -> Self {
771        Self { backend }
772    }
773
774    /// Check integrity of storage backend
775    pub async fn check(&self) -> Result<IntegrityReport> {
776        let mut report = IntegrityReport::new();
777
778        let mailbox_store = self.backend.mailbox_store();
779        let message_store = self.backend.message_store();
780
781        // Get sample users for testing
782        let users = vec![
783            Username::new("user1@example.com".to_string())?,
784            Username::new("user2@example.com".to_string())?,
785        ];
786
787        for user in &users {
788            let mailboxes = mailbox_store.list_mailboxes(user).await?;
789            report.total_mailboxes += mailboxes.len();
790
791            for mailbox in mailboxes {
792                let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
793                report.total_messages += messages.len();
794
795                // Check for messages that can't be retrieved
796                for msg_meta in messages {
797                    match message_store.get_message(msg_meta.message_id()).await {
798                        Ok(Some(_)) => {}
799                        Ok(None) => {
800                            report
801                                .orphaned_messages
802                                .push(msg_meta.message_id().to_string());
803                        }
804                        Err(e) => {
805                            report.errors.push(format!(
806                                "Error reading message {}: {}",
807                                msg_meta.message_id(),
808                                e
809                            ));
810                        }
811                    }
812                }
813            }
814        }
815
816        Ok(report)
817    }
818}
819
820/// Integrity check report
821#[derive(Debug, Clone, Serialize, Deserialize)]
822pub struct IntegrityReport {
823    pub total_mailboxes: usize,
824    pub total_messages: usize,
825    pub orphaned_messages: Vec<String>,
826    pub errors: Vec<String>,
827}
828
829impl IntegrityReport {
830    pub fn new() -> Self {
831        Self {
832            total_mailboxes: 0,
833            total_messages: 0,
834            orphaned_messages: Vec::new(),
835            errors: Vec::new(),
836        }
837    }
838
839    pub fn print(&self) {
840        println!("\n=== Integrity Report ===");
841        println!("Total mailboxes: {}", self.total_mailboxes);
842        println!("Total messages: {}", self.total_messages);
843        println!("Orphaned messages: {}", self.orphaned_messages.len());
844
845        if !self.orphaned_messages.is_empty() {
846            println!("\nOrphaned messages:");
847            for msg_id in self.orphaned_messages.iter().take(10) {
848                println!("  - {}", msg_id);
849            }
850            if self.orphaned_messages.len() > 10 {
851                println!("  ... and {} more", self.orphaned_messages.len() - 10);
852            }
853        }
854
855        if !self.errors.is_empty() {
856            println!("\nErrors:");
857            for error in self.errors.iter().take(10) {
858                println!("  - {}", error);
859            }
860            if self.errors.len() > 10 {
861                println!("  ... and {} more", self.errors.len() - 10);
862            }
863        }
864
865        if self.orphaned_messages.is_empty() && self.errors.is_empty() {
866            println!("\nNo integrity issues found");
867        }
868    }
869}
870
871impl Default for IntegrityReport {
872    fn default() -> Self {
873        Self::new()
874    }
875}
876
#[cfg(test)]
mod tests {
    use super::*;

    // ---- BackendType -------------------------------------------------------

    /// Canonical backend names parse to their variants; unknown names fail.
    #[test]
    fn test_backend_type_from_str() {
        for &(input, expected) in &[
            ("filesystem", BackendType::Filesystem),
            ("postgres", BackendType::Postgres),
            ("amaters", BackendType::Amaters),
        ] {
            assert_eq!(input.parse::<BackendType>().unwrap(), expected);
        }
        assert!("unknown".parse::<BackendType>().is_err());
    }

    /// Parsing ignores the case of the input.
    #[test]
    fn test_backend_type_case_insensitive() {
        for &(input, expected) in &[
            ("FILESYSTEM", BackendType::Filesystem),
            ("PostgreSQL", BackendType::Postgres),
        ] {
            assert_eq!(input.parse::<BackendType>().unwrap(), expected);
        }
    }

    /// Short and alternative spellings map to the same variants.
    #[test]
    fn test_backend_type_aliases() {
        for &(alias, expected) in &[
            ("fs", BackendType::Filesystem),
            ("pg", BackendType::Postgres),
            ("postgresql", BackendType::Postgres),
        ] {
            assert_eq!(alias.parse::<BackendType>().unwrap(), expected);
        }
    }

    /// `Display` renders the canonical lowercase name of each variant.
    #[test]
    fn test_backend_type_display() {
        for &(variant, rendered) in &[
            (BackendType::Filesystem, "filesystem"),
            (BackendType::Postgres, "postgres"),
            (BackendType::Amaters, "amaters"),
        ] {
            assert_eq!(variant.to_string(), rendered);
        }
    }

    /// A variant equals itself and differs from another variant.
    #[test]
    fn test_backend_type_equality() {
        let filesystem = BackendType::Filesystem;
        assert_eq!(filesystem, BackendType::Filesystem);
        assert_ne!(filesystem, BackendType::Postgres);
    }

    /// Every pair of distinct variants compares unequal.
    #[test]
    fn test_backend_type_all_variants() {
        let variants = [
            BackendType::Filesystem,
            BackendType::Postgres,
            BackendType::Amaters,
        ];

        for (index, left) in variants.iter().enumerate() {
            for right in &variants[index + 1..] {
                assert_ne!(left, right);
            }
        }
    }

    /// A variant survives a JSON round trip.
    #[test]
    fn test_backend_type_serialization() {
        let original = BackendType::Filesystem;
        let encoded = serde_json::to_string(&original).unwrap();
        let restored = serde_json::from_str::<BackendType>(&encoded).unwrap();
        assert_eq!(original, restored);
    }

    // ---- MigrationConfig ---------------------------------------------------

    /// The default configuration migrates filesystem -> postgres with
    /// verification enabled.
    #[test]
    fn test_migration_config_default() {
        let MigrationConfig {
            source_type,
            dest_type,
            batch_size,
            parallel,
            verify,
            dry_run,
            resume,
            ..
        } = MigrationConfig::default();

        assert_eq!(source_type, BackendType::Filesystem);
        assert_eq!(dest_type, BackendType::Postgres);
        assert_eq!(batch_size, 100);
        assert_eq!(parallel, 4);
        assert!(verify);
        assert!(!dry_run);
        assert!(!resume);
    }

    /// Overridden fields keep the values they were given.
    #[test]
    fn test_migration_config_custom() {
        let mut config = MigrationConfig::default();
        config.source_type = BackendType::Postgres;
        config.dest_type = BackendType::Amaters;
        config.batch_size = 500;
        config.parallel = 8;
        config.verify = false;
        config.dry_run = true;
        config.resume = true;

        assert_eq!(config.source_type, BackendType::Postgres);
        assert_eq!(config.dest_type, BackendType::Amaters);
        assert_eq!(config.batch_size, 500);
        assert_eq!(config.parallel, 8);
        assert!(!config.verify);
        assert!(config.dry_run);
        assert!(config.resume);
    }

    /// `batch_size` starts at 100 and can be overridden.
    #[test]
    fn test_migration_config_batch_size() {
        let defaults = MigrationConfig::default();
        assert_eq!(defaults.batch_size, 100);

        let tuned = MigrationConfig {
            batch_size: 200,
            ..defaults
        };
        assert_eq!(tuned.batch_size, 200);
    }

    /// `parallel` starts at 4 and can be overridden.
    #[test]
    fn test_migration_config_parallel() {
        let defaults = MigrationConfig::default();
        assert_eq!(defaults.parallel, 4);

        let tuned = MigrationConfig {
            parallel: 8,
            ..defaults
        };
        assert_eq!(tuned.parallel, 8);
    }

    // ---- MigrationProgress -------------------------------------------------

    /// A fresh progress record has zero counters and empty bookkeeping.
    #[test]
    fn test_migration_progress_new() {
        let fresh = MigrationProgress::new();

        assert_eq!(fresh.total_users, 0);
        assert_eq!(fresh.migrated_users, 0);
        assert_eq!(fresh.total_mailboxes, 0);
        assert_eq!(fresh.migrated_messages, 0);
        assert_eq!(fresh.total_bytes, 0);
        assert_eq!(fresh.migrated_bytes, 0);
        assert!(fresh.completed_at.is_none());
        assert!(fresh.migrated_user_list.is_empty());
        assert!(fresh.migrated_mailbox_map.is_empty());
    }

    /// `Default` yields zeroed user counters.
    #[test]
    fn test_migration_progress_default() {
        let fresh = MigrationProgress::default();
        assert_eq!(fresh.total_users, 0);
        assert_eq!(fresh.migrated_users, 0);
    }

    /// User counters survive a JSON round trip.
    #[test]
    fn test_migration_progress_serialization() {
        let original = MigrationProgress::new();
        let encoded = serde_json::to_string(&original).unwrap();
        let restored = serde_json::from_str::<MigrationProgress>(&encoded).unwrap();

        assert_eq!(original.total_users, restored.total_users);
        assert_eq!(original.migrated_users, restored.migrated_users);
    }

    /// Marking a user records exactly that user and bumps the counter.
    #[test]
    fn test_migration_progress_mark_user_migrated() {
        let mut tracker = MigrationProgress::new();
        tracker.total_users = 2;
        assert!(!tracker.is_user_migrated("user1"));

        tracker.mark_user_migrated("user1");

        assert!(tracker.is_user_migrated("user1"));
        assert_eq!(tracker.migrated_users, 1);
        assert!(!tracker.is_user_migrated("user2"));
    }

    /// Marking a mailbox records it and bumps the counter.
    #[test]
    fn test_migration_progress_mark_mailbox_migrated() {
        let mut tracker = MigrationProgress::new();
        assert!(!tracker.is_mailbox_migrated("user1:INBOX"));

        tracker.mark_mailbox_migrated("user1:INBOX".to_string(), "mailbox-123".to_string());

        assert!(tracker.is_mailbox_migrated("user1:INBOX"));
        assert_eq!(tracker.migrated_mailboxes, 1);
    }

    /// Percentage is 0 for an empty record and tracks migrated/total after.
    #[test]
    fn test_migration_progress_percentage() {
        let mut tracker = MigrationProgress::new();
        assert_eq!(tracker.progress_percentage(), 0.0);

        tracker.total_messages = 100;
        for &(migrated, expected) in &[(50, 50.0), (100, 100.0)] {
            tracker.migrated_messages = migrated;
            assert_eq!(tracker.progress_percentage(), expected);
        }
    }

    /// 100 messages over at least ten seconds gives a rate in (0, 10].
    #[test]
    fn test_migration_progress_messages_per_second() {
        let tracker = {
            let mut p = MigrationProgress::new();
            p.started_at = chrono::Utc::now().timestamp() - 10;
            p.last_updated_at = chrono::Utc::now().timestamp();
            p.migrated_messages = 100;
            p
        };

        let throughput = tracker.messages_per_second();
        assert!(throughput > 0.0);
        assert!(throughput <= 10.0);
    }

    /// No ETA on a fresh record; an ETA exists once work is under way.
    #[test]
    fn test_migration_progress_eta() {
        let mut tracker = MigrationProgress::new();
        assert!(tracker.eta_seconds().is_none());

        tracker.started_at = chrono::Utc::now().timestamp() - 10;
        tracker.last_updated_at = chrono::Utc::now().timestamp();
        tracker.total_messages = 1000;
        tracker.migrated_messages = 100;

        assert!(tracker.eta_seconds().is_some());
    }

    /// `completed_at` is unset until explicitly assigned.
    #[test]
    fn test_migration_progress_completion() {
        let mut tracker = MigrationProgress::new();
        assert!(tracker.completed_at.is_none());

        tracker.completed_at = Some(chrono::Utc::now().timestamp());
        assert!(tracker.completed_at.is_some());
    }

    /// Byte counters hold the values written to them.
    #[test]
    fn test_migration_progress_bytes_tracking() {
        let tracker = {
            let mut p = MigrationProgress::new();
            p.total_bytes = 1_048_576;
            p.migrated_bytes = 524_288;
            p
        };

        assert_eq!(tracker.total_bytes, 1_048_576);
        assert_eq!(tracker.migrated_bytes, 524_288);
    }

    /// Failed message ids accumulate in `failed_messages`.
    #[test]
    fn test_progress_failed_messages() {
        let mut tracker = MigrationProgress::new();
        tracker
            .failed_messages
            .extend(["msg1", "msg2"].iter().map(|id| id.to_string()));
        assert_eq!(tracker.failed_messages.len(), 2);
    }

    // ---- MigrationStats ----------------------------------------------------

    /// Stats mirror the progress totals and report a positive duration and
    /// throughput for a completed run.
    #[test]
    fn test_migration_stats_from_progress() {
        let finished = {
            let mut p = MigrationProgress::new();
            p.total_users = 5;
            p.total_mailboxes = 20;
            p.total_messages = 1000;
            p.migrated_messages = 1000;
            p.total_bytes = 10_485_760;
            p.migrated_bytes = 10_485_760;
            p.started_at = chrono::Utc::now().timestamp() - 100;
            p.completed_at = Some(chrono::Utc::now().timestamp());
            p
        };

        let stats = MigrationStats::from_progress(&finished);

        assert_eq!(stats.total_users, 5);
        assert_eq!(stats.total_mailboxes, 20);
        assert_eq!(stats.total_messages, 1000);
        assert!(stats.duration_secs > 0);
        assert!(stats.throughput_msg_sec > 0.0);
    }

    /// Stats derived from a fresh progress record have zeroed byte and
    /// failure counters.
    #[test]
    fn test_migration_stats_fields() {
        let stats = MigrationStats::from_progress(&MigrationProgress::new());

        assert_eq!(stats.total_bytes, 0);
        assert_eq!(stats.migrated_bytes, 0);
        assert_eq!(stats.failed_messages, 0);
    }

    // ---- StorageMigrator ---------------------------------------------------

    /// A migrator built from the default config starts with no users counted.
    #[tokio::test]
    async fn test_migrator_creation() {
        let migrator = StorageMigrator::new(MigrationConfig::default());
        assert_eq!(migrator.progress.total_users, 0);
    }

    // ---- IntegrityReport ---------------------------------------------------

    /// `new` yields zero counters and empty finding lists.
    #[test]
    fn test_integrity_report_new() {
        let empty = IntegrityReport::new();

        assert_eq!(empty.total_mailboxes, 0);
        assert_eq!(empty.total_messages, 0);
        assert!(empty.orphaned_messages.is_empty());
        assert!(empty.errors.is_empty());
    }

    /// `Default` yields zero counters.
    #[test]
    fn test_integrity_report_default() {
        let empty = IntegrityReport::default();
        assert_eq!(empty.total_mailboxes, 0);
        assert_eq!(empty.total_messages, 0);
    }

    /// Errors pushed onto the report are retained.
    #[test]
    fn test_integrity_report_with_errors() {
        let mut findings = IntegrityReport::new();
        findings
            .errors
            .extend(["Error 1", "Error 2"].iter().map(|text| text.to_string()));
        assert_eq!(findings.errors.len(), 2);
    }

    /// Orphaned message ids pushed onto the report are retained.
    #[test]
    fn test_integrity_report_orphaned_messages() {
        let mut findings = IntegrityReport::new();
        findings
            .orphaned_messages
            .extend(["msg1", "msg2"].iter().map(|id| id.to_string()));

        assert_eq!(findings.orphaned_messages.len(), 2);
    }

    // ---- MessageChecksum ---------------------------------------------------

    /// A computed checksum carries a 64-character SHA-256 digest string and a
    /// non-zero size.
    #[test]
    fn test_message_checksum_compute() {
        use bytes::Bytes;
        use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

        let message = MimeMessage::new(
            HeaderMap::new(),
            MessageBody::Small(Bytes::from("Test body")),
        );
        let mail = Mail::new(
            Some("sender@example.com".parse().unwrap()),
            vec!["recipient@example.com".parse().unwrap()],
            message,
            None,
            None,
        );

        let digest = MessageChecksum::compute(&mail);

        assert!(!digest.sha256.is_empty());
        assert_eq!(digest.sha256.len(), 64);
        assert!(digest.size > 0);
    }
}