1use anyhow::{Context, Result};
4use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
5use rusmes_proto::{Mail, MessageId, Username};
6use rusmes_storage::backends::{
7 filesystem::FilesystemBackend, postgres_complete::PostgresCompleteBackend,
8};
9use rusmes_storage::{MailboxId, MessageStore, StorageBackend};
10use serde::{Deserialize, Serialize};
11use sha2::{Digest, Sha256};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16use tokio::sync::Semaphore;
17
/// Kinds of storage backend a migration can read from or write to.
///
/// The type is `Copy`, comparable, and serializable with serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum BackendType {
    /// Filesystem-based storage; its configuration string is passed to `FilesystemBackend::new`.
    Filesystem,
    /// PostgreSQL storage; its configuration string is passed to `PostgresCompleteBackend::new`.
    Postgres,
    /// AmateRS storage. Backend construction in this file currently rejects it with
    /// "AmateRS backend not yet implemented".
    Amaters,
}
25
26impl std::str::FromStr for BackendType {
27 type Err = anyhow::Error;
28
29 fn from_str(s: &str) -> Result<Self> {
30 match s.to_lowercase().as_str() {
31 "filesystem" | "fs" => Ok(BackendType::Filesystem),
32 "postgres" | "postgresql" | "pg" => Ok(BackendType::Postgres),
33 "amaters" => Ok(BackendType::Amaters),
34 _ => Err(anyhow::anyhow!("Unknown backend type: {}", s)),
35 }
36 }
37}
38
39impl std::fmt::Display for BackendType {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 BackendType::Filesystem => write!(f, "filesystem"),
43 BackendType::Postgres => write!(f, "postgres"),
44 BackendType::Amaters => write!(f, "amaters"),
45 }
46 }
47}
48
/// User-supplied settings for one storage migration run.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Backend kind to read from.
    pub source_type: BackendType,
    /// Configuration string handed to the source backend constructor
    /// (the defaults use a directory path for filesystem and a connection URL for Postgres).
    pub source_config: String,
    /// Backend kind to write to.
    pub dest_type: BackendType,
    /// Configuration string handed to the destination backend constructor.
    pub dest_config: String,
    /// Number of messages per chunk; all copy tasks of a chunk are awaited before the next
    /// chunk starts.
    pub batch_size: usize,
    /// Number of semaphore permits limiting concurrently running message-copy tasks.
    pub parallel: usize,
    /// When set (and not a dry run), the destination is verified against the source afterwards.
    pub verify: bool,
    /// When set, no messages are copied and no progress or backup files are written.
    pub dry_run: bool,
    /// When set, previously saved progress is loaded and users/mailboxes recorded there are
    /// skipped.
    pub resume: bool,
}
62
63impl Default for MigrationConfig {
64 fn default() -> Self {
65 Self {
66 source_type: BackendType::Filesystem,
67 source_config: "/var/lib/rusmes/mail".to_string(),
68 dest_type: BackendType::Postgres,
69 dest_config: "postgresql://localhost/rusmes".to_string(),
70 batch_size: 100,
71 parallel: 4,
72 verify: true,
73 dry_run: false,
74 resume: false,
75 }
76 }
77}
78
/// Resumable bookkeeping for a migration run; serialized to JSON between runs.
///
/// All timestamps are Unix seconds obtained from `chrono::Utc::now().timestamp()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationProgress {
    /// Number of users found in the source.
    pub total_users: usize,
    /// Number of users recorded as migrated.
    pub migrated_users: usize,
    /// Number of mailboxes found in the source.
    pub total_mailboxes: usize,
    /// Number of mailboxes recorded as migrated.
    pub migrated_mailboxes: usize,
    /// Number of messages found in the source.
    pub total_messages: usize,
    /// Number of messages copied successfully.
    pub migrated_messages: usize,
    /// Sum of the sizes of all source messages, in bytes.
    pub total_bytes: u64,
    /// Sum of the sizes of successfully copied messages, in bytes.
    pub migrated_bytes: u64,
    /// String form of the IDs of messages whose copy failed.
    pub failed_messages: Vec<String>,
    /// String form of every user recorded as migrated.
    pub migrated_user_list: Vec<String>,
    /// Maps a mailbox key (built by the migrator as `"<user>:<mailbox path>"`) to the string
    /// form of the destination mailbox ID.
    pub migrated_mailbox_map: HashMap<String, String>,
    /// When this progress record was created.
    pub started_at: i64,
    /// When a user or mailbox was last marked as migrated.
    pub last_updated_at: i64,
    /// When the migration finished; `None` while it is still running.
    pub completed_at: Option<i64>,
}
97
98impl MigrationProgress {
99 pub fn new() -> Self {
100 let now = chrono::Utc::now().timestamp();
101 Self {
102 total_users: 0,
103 migrated_users: 0,
104 total_mailboxes: 0,
105 migrated_mailboxes: 0,
106 total_messages: 0,
107 migrated_messages: 0,
108 total_bytes: 0,
109 migrated_bytes: 0,
110 failed_messages: Vec::new(),
111 migrated_user_list: Vec::new(),
112 migrated_mailbox_map: HashMap::new(),
113 started_at: now,
114 last_updated_at: now,
115 completed_at: None,
116 }
117 }
118
119 pub fn save_to_file(&self, path: &PathBuf) -> Result<()> {
120 let json = serde_json::to_string_pretty(self)?;
121 std::fs::write(path, json)?;
122 Ok(())
123 }
124
125 pub fn load_from_file(path: &PathBuf) -> Result<Self> {
126 let json = std::fs::read_to_string(path)?;
127 Ok(serde_json::from_str(&json)?)
128 }
129
130 pub fn mark_user_migrated(&mut self, user: &str) {
131 self.migrated_user_list.push(user.to_string());
132 self.migrated_users += 1;
133 self.last_updated_at = chrono::Utc::now().timestamp();
134 }
135
136 pub fn is_user_migrated(&self, user: &str) -> bool {
137 self.migrated_user_list.contains(&user.to_string())
138 }
139
140 pub fn mark_mailbox_migrated(&mut self, mailbox_key: String, mailbox_id: String) {
141 self.migrated_mailbox_map.insert(mailbox_key, mailbox_id);
142 self.migrated_mailboxes += 1;
143 self.last_updated_at = chrono::Utc::now().timestamp();
144 }
145
146 pub fn is_mailbox_migrated(&self, mailbox_key: &str) -> bool {
147 self.migrated_mailbox_map.contains_key(mailbox_key)
148 }
149
150 pub fn progress_percentage(&self) -> f64 {
151 if self.total_messages == 0 {
152 0.0
153 } else {
154 (self.migrated_messages as f64 / self.total_messages as f64) * 100.0
155 }
156 }
157
158 pub fn eta_seconds(&self) -> Option<u64> {
159 if self.migrated_messages == 0 {
160 return None;
161 }
162
163 let elapsed = self.last_updated_at - self.started_at;
164 if elapsed <= 0 {
165 return None;
166 }
167
168 let remaining = self.total_messages.saturating_sub(self.migrated_messages);
169 let rate = self.migrated_messages as f64 / elapsed as f64;
170
171 if rate <= 0.0 {
172 return None;
173 }
174
175 Some((remaining as f64 / rate) as u64)
176 }
177
178 pub fn messages_per_second(&self) -> f64 {
179 let elapsed = self.last_updated_at - self.started_at;
180 if elapsed <= 0 {
181 return 0.0;
182 }
183 self.migrated_messages as f64 / elapsed as f64
184 }
185}
186
impl Default for MigrationProgress {
    /// Same as [`MigrationProgress::new`]: an empty record whose timestamps are taken at call
    /// time.
    fn default() -> Self {
        Self::new()
    }
}
192
/// Summary figures derived from a [`MigrationProgress`] snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationStats {
    /// Number of users found in the source.
    pub total_users: usize,
    /// Number of mailboxes found in the source.
    pub total_mailboxes: usize,
    /// Number of messages found in the source.
    pub total_messages: usize,
    /// Total size of the source messages, in bytes.
    pub total_bytes: u64,
    /// Total size of the successfully copied messages, in bytes.
    pub migrated_bytes: u64,
    /// Number of messages whose copy failed.
    pub failed_messages: usize,
    /// Elapsed time in whole seconds, never less than 1.
    pub duration_secs: u64,
    /// Migrated messages per second over `duration_secs`.
    pub throughput_msg_sec: f64,
    /// Migrated data per second, in units of 1_048_576 bytes, over `duration_secs`.
    pub throughput_mbps: f64,
}
206
207impl MigrationStats {
208 pub fn from_progress(progress: &MigrationProgress) -> Self {
209 let duration = if let Some(completed) = progress.completed_at {
210 (completed - progress.started_at) as u64
211 } else {
212 (chrono::Utc::now().timestamp() - progress.started_at) as u64
213 };
214
215 let duration_secs = duration.max(1);
216 let throughput_msg_sec = progress.migrated_messages as f64 / duration_secs as f64;
217 let throughput_mbps = if duration_secs > 0 {
218 (progress.migrated_bytes as f64 / 1_048_576.0) / duration_secs as f64
219 } else {
220 0.0
221 };
222
223 Self {
224 total_users: progress.total_users,
225 total_mailboxes: progress.total_mailboxes,
226 total_messages: progress.total_messages,
227 total_bytes: progress.total_bytes,
228 migrated_bytes: progress.migrated_bytes,
229 failed_messages: progress.failed_messages.len(),
230 duration_secs,
231 throughput_msg_sec,
232 throughput_mbps,
233 }
234 }
235
236 pub fn print(&self) {
237 println!("\n=== Migration Statistics ===");
238 println!("Users migrated: {}", self.total_users);
239 println!("Mailboxes migrated: {}", self.total_mailboxes);
240 println!("Messages migrated: {}", self.total_messages);
241 println!(
242 "Data migrated: {:.2} MB",
243 self.migrated_bytes as f64 / 1_048_576.0
244 );
245 println!("Failed messages: {}", self.failed_messages);
246 println!("Duration: {} seconds", self.duration_secs);
247 println!("Throughput: {:.2} msg/s", self.throughput_msg_sec);
248 println!("Throughput: {:.2} MB/s", self.throughput_mbps);
249 }
250}
251
/// Fingerprint of a message used when comparing source and destination copies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageChecksum {
    /// String form of the message ID.
    pub message_id: String,
    /// Message size as reported by `Mail::size()`.
    pub size: usize,
    /// Lowercase hexadecimal SHA-256 digest (see [`MessageChecksum::compute`] for its input).
    pub sha256: String,
}
259
260impl MessageChecksum {
261 pub fn compute(mail: &Mail) -> Self {
262 let size = mail.size();
263
264 let mut hasher = Sha256::new();
267 hasher.update(mail.message_id().to_string().as_bytes());
268 hasher.update(size.to_le_bytes());
269 let hash = hasher.finalize();
270
271 Self {
272 message_id: mail.message_id().to_string(),
273 size,
274 sha256: format!("{:x}", hash),
275 }
276 }
277}
278
/// Drives a migration between two storage backends and tracks its progress.
pub struct StorageMigrator {
    /// Settings for this run.
    config: MigrationConfig,
    /// In-memory progress, persisted to `progress_file` during a real (non-dry) run.
    progress: MigrationProgress,
    /// Location of the JSON progress file used for resuming.
    progress_file: PathBuf,
    /// Directory where backup metadata is written before a fresh migration.
    backup_dir: PathBuf,
}
286
287impl StorageMigrator {
288 pub fn new(config: MigrationConfig) -> Self {
290 let progress_file = PathBuf::from("/tmp/rusmes_migration_progress.json");
291 let backup_dir = PathBuf::from("/tmp/rusmes_migration_backup");
292
293 let progress = if config.resume && progress_file.exists() {
294 MigrationProgress::load_from_file(&progress_file)
295 .unwrap_or_else(|_| MigrationProgress::new())
296 } else {
297 MigrationProgress::new()
298 };
299
300 Self {
301 config,
302 progress,
303 progress_file,
304 backup_dir,
305 }
306 }
307
308 pub async fn migrate(&mut self) -> Result<MigrationStats> {
310 println!(
311 "Starting migration from {} to {}",
312 self.config.source_type, self.config.dest_type
313 );
314
315 if self.config.dry_run {
316 println!("DRY RUN MODE - No changes will be made");
317 }
318
319 if self.config.resume {
320 println!("Resuming migration from previous state");
321 println!(
322 "Previously migrated: {} users, {} mailboxes, {} messages",
323 self.progress.migrated_users,
324 self.progress.migrated_mailboxes,
325 self.progress.migrated_messages
326 );
327 }
328
329 if !self.config.dry_run {
331 std::fs::create_dir_all(&self.backup_dir)?;
332 }
333
334 let source = self.create_source_backend().await?;
336 let dest = self.create_dest_backend().await?;
337
338 if !self.config.dry_run && !self.config.resume {
340 println!("Creating backup of destination state...");
341 self.backup_destination_state(dest.as_ref()).await?;
342 }
343
344 let users = self.get_users(source.as_ref()).await?;
346 self.progress.total_users = users.len();
347
348 self.count_totals(source.as_ref(), &users).await?;
350
351 println!(
352 "Migration scope: {} users, {} mailboxes, {} messages ({:.2} MB)",
353 self.progress.total_users,
354 self.progress.total_mailboxes,
355 self.progress.total_messages,
356 self.progress.total_bytes as f64 / 1_048_576.0
357 );
358
359 let multi_progress = MultiProgress::new();
361 let user_pb = multi_progress.add(ProgressBar::new(users.len() as u64));
362 user_pb.set_style(
363 ProgressStyle::default_bar()
364 .template("[{elapsed_precise}] Users {bar:30.cyan/blue} {pos}/{len} ({eta})")
365 .expect("Invalid progress bar template")
366 .progress_chars("=>-"),
367 );
368
369 let message_pb = multi_progress.add(ProgressBar::new(self.progress.total_messages as u64));
370 message_pb.set_style(
371 ProgressStyle::default_bar()
372 .template("[{elapsed_precise}] Messages {bar:30.green/blue} {pos}/{len} ({msg})")
373 .expect("Invalid progress bar template")
374 .progress_chars("=>-"),
375 );
376
377 let start_time = Instant::now();
378
379 for user in &users {
381 if self.config.resume && self.progress.is_user_migrated(&user.to_string()) {
382 user_pb.inc(1);
383 continue;
384 }
385
386 if !self.config.dry_run {
387 self.migrate_user(source.as_ref(), dest.as_ref(), user, &message_pb)
388 .await?;
389 }
390
391 self.progress.mark_user_migrated(&user.to_string());
392 user_pb.inc(1);
393
394 if !self.config.dry_run {
396 self.progress.save_to_file(&self.progress_file)?;
397 }
398
399 let rate = self.progress.messages_per_second();
401 message_pb.set_message(format!("{:.1} msg/s", rate));
402 }
403
404 user_pb.finish_with_message("All users migrated");
405 message_pb.finish_with_message("All messages migrated");
406
407 self.progress.completed_at = Some(chrono::Utc::now().timestamp());
408
409 if !self.config.dry_run {
410 self.progress.save_to_file(&self.progress_file)?;
411 }
412
413 let stats = MigrationStats::from_progress(&self.progress);
414
415 if self.config.verify && !self.config.dry_run {
417 println!("\nVerifying migration integrity...");
418 self.verify_migration(source.as_ref(), dest.as_ref())
419 .await?;
420 }
421
422 let elapsed = start_time.elapsed();
423 println!("\nMigration completed in {:.2}s", elapsed.as_secs_f64());
424
425 Ok(stats)
426 }
427
428 async fn create_source_backend(&self) -> Result<Box<dyn StorageBackend>> {
429 match self.config.source_type {
430 BackendType::Filesystem => {
431 Ok(Box::new(FilesystemBackend::new(&self.config.source_config)))
432 }
433 BackendType::Postgres => {
434 let backend = PostgresCompleteBackend::new(&self.config.source_config).await?;
435 Ok(Box::new(backend))
436 }
437 BackendType::Amaters => Err(anyhow::anyhow!("AmateRS backend not yet implemented")),
438 }
439 }
440
441 async fn create_dest_backend(&self) -> Result<Box<dyn StorageBackend>> {
442 match self.config.dest_type {
443 BackendType::Filesystem => {
444 let path = &self.config.dest_config;
445 std::fs::create_dir_all(path)?;
446 Ok(Box::new(FilesystemBackend::new(path)))
447 }
448 BackendType::Postgres => {
449 let backend = PostgresCompleteBackend::new(&self.config.dest_config).await?;
450 if !self.config.dry_run {
451 backend.init_schema().await?;
452 }
453 Ok(Box::new(backend))
454 }
455 BackendType::Amaters => Err(anyhow::anyhow!("AmateRS backend not yet implemented")),
456 }
457 }
458
459 async fn get_users(&self, _backend: &dyn StorageBackend) -> Result<Vec<Username>> {
460 Ok(vec![
463 Username::new("user1@example.com".to_string())?,
464 Username::new("user2@example.com".to_string())?,
465 ])
466 }
467
468 async fn count_totals(
469 &mut self,
470 source: &dyn StorageBackend,
471 users: &[Username],
472 ) -> Result<()> {
473 let mailbox_store = source.mailbox_store();
474 let message_store = source.message_store();
475
476 let mut total_mailboxes = 0;
477 let mut total_messages = 0;
478 let mut total_bytes = 0u64;
479
480 for user in users {
481 let mailboxes = mailbox_store.list_mailboxes(user).await?;
482 total_mailboxes += mailboxes.len();
483
484 for mailbox in mailboxes {
485 let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
486 total_messages += messages.len();
487 total_bytes += messages.iter().map(|m| m.size() as u64).sum::<u64>();
488 }
489 }
490
491 self.progress.total_mailboxes = total_mailboxes;
492 self.progress.total_messages = total_messages;
493 self.progress.total_bytes = total_bytes;
494
495 Ok(())
496 }
497
498 async fn migrate_user(
499 &mut self,
500 source: &dyn StorageBackend,
501 dest: &dyn StorageBackend,
502 user: &Username,
503 message_pb: &ProgressBar,
504 ) -> Result<()> {
505 let source_mailboxes = source.mailbox_store();
506 let dest_mailboxes = dest.mailbox_store();
507 let source_messages = source.message_store();
508 let dest_messages = dest.message_store();
509
510 let mailboxes = source_mailboxes.list_mailboxes(user).await?;
512
513 for mailbox in mailboxes {
514 let mailbox_key = format!("{}:{}", user, mailbox.path());
515
516 if self.config.resume && self.progress.is_mailbox_migrated(&mailbox_key) {
517 continue;
518 }
519
520 let dest_mailbox_id = dest_mailboxes.create_mailbox(mailbox.path()).await?;
522
523 self.progress
524 .mark_mailbox_migrated(mailbox_key, dest_mailbox_id.to_string());
525
526 let messages = source_messages.get_mailbox_messages(mailbox.id()).await?;
528
529 let semaphore = Arc::new(Semaphore::new(self.config.parallel));
530 let mut handles = vec![];
531
532 for chunk in messages.chunks(self.config.batch_size) {
533 for message_meta in chunk {
534 let permit = semaphore.clone().acquire_owned().await?;
535 let source_messages = Arc::clone(&source_messages);
536 let dest_messages = Arc::clone(&dest_messages);
537 let message_id = *message_meta.message_id();
538 let mailbox_id = dest_mailbox_id;
539 let size = message_meta.size();
540 let flags = message_meta.flags().clone();
541
542 let handle = tokio::spawn(async move {
543 let _permit = permit;
544 let result = Self::migrate_single_message(
545 &source_messages,
546 &dest_messages,
547 &message_id,
548 &mailbox_id,
549 )
550 .await;
551 (message_id, size, flags, result)
552 });
553
554 handles.push(handle);
555 }
556
557 for handle in handles.drain(..) {
559 match handle.await {
560 Ok((msg_id, size, _flags, result)) => match result {
561 Ok(_) => {
562 self.progress.migrated_messages += 1;
563 self.progress.migrated_bytes += size as u64;
564 message_pb.inc(1);
565 }
566 Err(e) => {
567 tracing::error!("Failed to migrate message {}: {}", msg_id, e);
568 self.progress.failed_messages.push(msg_id.to_string());
569 }
570 },
571 Err(e) => {
572 tracing::error!("Task failed: {}", e);
573 }
574 }
575 }
576 }
577 }
578
579 Ok(())
580 }
581
582 async fn migrate_single_message(
583 source: &Arc<dyn MessageStore>,
584 dest: &Arc<dyn MessageStore>,
585 message_id: &MessageId,
586 dest_mailbox_id: &MailboxId,
587 ) -> Result<()> {
588 let message = source
589 .get_message(message_id)
590 .await?
591 .context("Message not found in source")?;
592
593 dest.append_message(dest_mailbox_id, message).await?;
594
595 Ok(())
596 }
597
598 async fn verify_migration(
599 &self,
600 source: &dyn StorageBackend,
601 dest: &dyn StorageBackend,
602 ) -> Result<()> {
603 let source_mailboxes = source.mailbox_store();
604 let dest_mailboxes = dest.mailbox_store();
605 let source_messages = source.message_store();
606 let dest_messages = dest.message_store();
607
608 let users = self.get_users(source).await?;
609
610 let pb = ProgressBar::new(users.len() as u64);
611 pb.set_style(
612 ProgressStyle::default_bar()
613 .template("[{elapsed_precise}] Verifying {bar:40.yellow/blue} {pos}/{len}")
614 .expect("Invalid progress bar template")
615 .progress_chars("=>-"),
616 );
617
618 for user in &users {
619 let source_mboxes = source_mailboxes.list_mailboxes(user).await?;
620 let dest_mboxes = dest_mailboxes.list_mailboxes(user).await?;
621
622 if source_mboxes.len() != dest_mboxes.len() {
623 return Err(anyhow::anyhow!(
624 "Mailbox count mismatch for user {}: source={}, dest={}",
625 user,
626 source_mboxes.len(),
627 dest_mboxes.len()
628 ));
629 }
630
631 for (src_mbox, dst_mbox) in source_mboxes.iter().zip(dest_mboxes.iter()) {
633 let src_msgs = source_messages.get_mailbox_messages(src_mbox.id()).await?;
634 let dst_msgs = dest_messages.get_mailbox_messages(dst_mbox.id()).await?;
635
636 if src_msgs.len() != dst_msgs.len() {
637 return Err(anyhow::anyhow!(
638 "Message count mismatch in mailbox {}: source={}, dest={}",
639 src_mbox.path(),
640 src_msgs.len(),
641 dst_msgs.len()
642 ));
643 }
644
645 if !src_msgs.is_empty() {
647 let sample_size = (src_msgs.len() / 10).clamp(1, 10);
648 for i in 0..sample_size {
649 let idx = i * (src_msgs.len() / sample_size);
650 if let Some(src_meta) = src_msgs.get(idx) {
651 if let (Some(src_msg), Some(dst_msg)) = (
652 source_messages.get_message(src_meta.message_id()).await?,
653 dest_messages.get_message(src_meta.message_id()).await?,
654 ) {
655 let src_checksum = MessageChecksum::compute(&src_msg);
656 let dst_checksum = MessageChecksum::compute(&dst_msg);
657
658 if src_checksum.sha256 != dst_checksum.sha256 {
659 return Err(anyhow::anyhow!(
660 "Checksum mismatch for message {}",
661 src_meta.message_id()
662 ));
663 }
664 }
665 }
666 }
667 }
668 }
669
670 pb.inc(1);
671 }
672
673 pb.finish_with_message("Verification completed successfully");
674 println!("Verification passed: all mailboxes and messages verified");
675 Ok(())
676 }
677
678 async fn backup_destination_state(&self, _dest: &dyn StorageBackend) -> Result<()> {
679 let backup_metadata = serde_json::json!({
680 "timestamp": chrono::Utc::now().to_rfc3339(),
681 "dest_type": self.config.dest_type,
682 "dest_config": self.config.dest_config,
683 });
684
685 let backup_file = self.backup_dir.join("migration_backup_metadata.json");
686 std::fs::write(backup_file, serde_json::to_string_pretty(&backup_metadata)?)?;
687
688 println!("Backup metadata saved to {:?}", self.backup_dir);
689 Ok(())
690 }
691
692 pub async fn rollback(&self) -> Result<()> {
694 println!("Rolling back migration...");
695
696 let backup_file = self.backup_dir.join("migration_backup_metadata.json");
697 if !backup_file.exists() {
698 return Err(anyhow::anyhow!("No backup found to rollback"));
699 }
700
701 println!(
702 "Rollback would restore from backup at {:?}",
703 self.backup_dir
704 );
705 println!("Note: Full rollback implementation requires backend-specific restore logic");
706
707 Ok(())
708 }
709
    /// Returns a shared reference to the current in-memory migration progress.
    pub fn get_progress(&self) -> &MigrationProgress {
        &self.progress
    }
714
715 pub fn print_report(&self) {
717 println!("\n=== Migration Report ===");
718 println!(
719 "Users: {}/{}",
720 self.progress.migrated_users, self.progress.total_users
721 );
722 println!(
723 "Mailboxes: {}/{}",
724 self.progress.migrated_mailboxes, self.progress.total_mailboxes
725 );
726 println!(
727 "Messages: {}/{}",
728 self.progress.migrated_messages, self.progress.total_messages
729 );
730 println!(
731 "Data: {:.2}/{:.2} MB",
732 self.progress.migrated_bytes as f64 / 1_048_576.0,
733 self.progress.total_bytes as f64 / 1_048_576.0
734 );
735
736 if !self.progress.failed_messages.is_empty() {
737 println!("\nFailed messages: {}", self.progress.failed_messages.len());
738 for msg_id in self.progress.failed_messages.iter().take(10) {
739 println!(" - {}", msg_id);
740 }
741 if self.progress.failed_messages.len() > 10 {
742 println!(
743 " ... and {} more",
744 self.progress.failed_messages.len() - 10
745 );
746 }
747 }
748
749 let stats = MigrationStats::from_progress(&self.progress);
750 println!("\nDuration: {} seconds", stats.duration_secs);
751 println!(
752 "Throughput: {:.2} messages/second",
753 stats.throughput_msg_sec
754 );
755 println!("Throughput: {:.2} MB/second", stats.throughput_mbps);
756
757 if let Some(eta) = self.progress.eta_seconds() {
758 let eta_duration = Duration::from_secs(eta);
759 println!("ETA: {:?}", eta_duration);
760 }
761 }
762}
763
/// Checks that every message listed in a backend's mailboxes can actually be read.
pub struct IntegrityChecker {
    /// Backend whose mailbox and message stores are inspected.
    backend: Box<dyn StorageBackend>,
}
768
769impl IntegrityChecker {
770 pub fn new(backend: Box<dyn StorageBackend>) -> Self {
771 Self { backend }
772 }
773
774 pub async fn check(&self) -> Result<IntegrityReport> {
776 let mut report = IntegrityReport::new();
777
778 let mailbox_store = self.backend.mailbox_store();
779 let message_store = self.backend.message_store();
780
781 let users = vec![
783 Username::new("user1@example.com".to_string())?,
784 Username::new("user2@example.com".to_string())?,
785 ];
786
787 for user in &users {
788 let mailboxes = mailbox_store.list_mailboxes(user).await?;
789 report.total_mailboxes += mailboxes.len();
790
791 for mailbox in mailboxes {
792 let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
793 report.total_messages += messages.len();
794
795 for msg_meta in messages {
797 match message_store.get_message(msg_meta.message_id()).await {
798 Ok(Some(_)) => {}
799 Ok(None) => {
800 report
801 .orphaned_messages
802 .push(msg_meta.message_id().to_string());
803 }
804 Err(e) => {
805 report.errors.push(format!(
806 "Error reading message {}: {}",
807 msg_meta.message_id(),
808 e
809 ));
810 }
811 }
812 }
813 }
814 }
815
816 Ok(report)
817 }
818}
819
/// Result of an integrity check over a storage backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IntegrityReport {
    /// Number of mailboxes inspected.
    pub total_mailboxes: usize,
    /// Number of messages listed in those mailboxes.
    pub total_messages: usize,
    /// IDs (as strings) of messages that were listed but whose lookup returned nothing.
    pub orphaned_messages: Vec<String>,
    /// Descriptions of messages whose lookup failed with an error.
    pub errors: Vec<String>,
}
828
829impl IntegrityReport {
830 pub fn new() -> Self {
831 Self {
832 total_mailboxes: 0,
833 total_messages: 0,
834 orphaned_messages: Vec::new(),
835 errors: Vec::new(),
836 }
837 }
838
839 pub fn print(&self) {
840 println!("\n=== Integrity Report ===");
841 println!("Total mailboxes: {}", self.total_mailboxes);
842 println!("Total messages: {}", self.total_messages);
843 println!("Orphaned messages: {}", self.orphaned_messages.len());
844
845 if !self.orphaned_messages.is_empty() {
846 println!("\nOrphaned messages:");
847 for msg_id in self.orphaned_messages.iter().take(10) {
848 println!(" - {}", msg_id);
849 }
850 if self.orphaned_messages.len() > 10 {
851 println!(" ... and {} more", self.orphaned_messages.len() - 10);
852 }
853 }
854
855 if !self.errors.is_empty() {
856 println!("\nErrors:");
857 for error in self.errors.iter().take(10) {
858 println!(" - {}", error);
859 }
860 if self.errors.len() > 10 {
861 println!(" ... and {} more", self.errors.len() - 10);
862 }
863 }
864
865 if self.orphaned_messages.is_empty() && self.errors.is_empty() {
866 println!("\nNo integrity issues found");
867 }
868 }
869}
870
impl Default for IntegrityReport {
    /// Same as [`IntegrityReport::new`]: zero counts and no findings.
    fn default() -> Self {
        Self::new()
    }
}
876
// Unit tests for the configuration, progress, statistics and report types in this file.
// None of them talks to a real storage backend.
#[cfg(test)]
mod tests {
    use super::*;

    // Canonical names parse to their variants; unknown names are rejected.
    #[test]
    fn test_backend_type_from_str() {
        assert_eq!(
            "filesystem".parse::<BackendType>().unwrap(),
            BackendType::Filesystem
        );
        assert_eq!(
            "postgres".parse::<BackendType>().unwrap(),
            BackendType::Postgres
        );
        assert_eq!(
            "amaters".parse::<BackendType>().unwrap(),
            BackendType::Amaters
        );
        assert!("unknown".parse::<BackendType>().is_err());
    }

    // Parsing ignores letter case.
    #[test]
    fn test_backend_type_case_insensitive() {
        assert_eq!(
            "FILESYSTEM".parse::<BackendType>().unwrap(),
            BackendType::Filesystem
        );
        assert_eq!(
            "PostgreSQL".parse::<BackendType>().unwrap(),
            BackendType::Postgres
        );
    }

    // Short aliases are accepted.
    #[test]
    fn test_backend_type_aliases() {
        assert_eq!(
            "fs".parse::<BackendType>().unwrap(),
            BackendType::Filesystem
        );
        assert_eq!("pg".parse::<BackendType>().unwrap(), BackendType::Postgres);
        assert_eq!(
            "postgresql".parse::<BackendType>().unwrap(),
            BackendType::Postgres
        );
    }

    // Display yields the canonical lowercase names.
    #[test]
    fn test_backend_type_display() {
        assert_eq!(BackendType::Filesystem.to_string(), "filesystem");
        assert_eq!(BackendType::Postgres.to_string(), "postgres");
        assert_eq!(BackendType::Amaters.to_string(), "amaters");
    }

    // Default configuration values.
    #[test]
    fn test_migration_config_default() {
        let config = MigrationConfig::default();
        assert_eq!(config.source_type, BackendType::Filesystem);
        assert_eq!(config.dest_type, BackendType::Postgres);
        assert_eq!(config.batch_size, 100);
        assert_eq!(config.parallel, 4);
        assert!(config.verify);
        assert!(!config.dry_run);
        assert!(!config.resume);
    }

    // A new progress record starts with zero counters and empty collections.
    #[test]
    fn test_migration_progress_new() {
        let progress = MigrationProgress::new();
        assert_eq!(progress.total_users, 0);
        assert_eq!(progress.migrated_users, 0);
        assert_eq!(progress.total_mailboxes, 0);
        assert_eq!(progress.migrated_messages, 0);
        assert_eq!(progress.total_bytes, 0);
        assert_eq!(progress.migrated_bytes, 0);
        assert!(progress.completed_at.is_none());
        assert!(progress.migrated_user_list.is_empty());
        assert!(progress.migrated_mailbox_map.is_empty());
    }

    // Progress survives a JSON round trip.
    #[test]
    fn test_migration_progress_serialization() {
        let progress = MigrationProgress::new();
        let json = serde_json::to_string(&progress).unwrap();
        let deserialized: MigrationProgress = serde_json::from_str(&json).unwrap();
        assert_eq!(progress.total_users, deserialized.total_users);
        assert_eq!(progress.migrated_users, deserialized.migrated_users);
    }

    // Marking a user makes it (and only it) show up as migrated.
    #[test]
    fn test_migration_progress_mark_user_migrated() {
        let mut progress = MigrationProgress::new();
        progress.total_users = 2;

        assert!(!progress.is_user_migrated("user1"));
        progress.mark_user_migrated("user1");
        assert!(progress.is_user_migrated("user1"));
        assert_eq!(progress.migrated_users, 1);
        assert!(!progress.is_user_migrated("user2"));
    }

    // Marking a mailbox records its key and bumps the counter.
    #[test]
    fn test_migration_progress_mark_mailbox_migrated() {
        let mut progress = MigrationProgress::new();

        assert!(!progress.is_mailbox_migrated("user1:INBOX"));
        progress.mark_mailbox_migrated("user1:INBOX".to_string(), "mailbox-123".to_string());
        assert!(progress.is_mailbox_migrated("user1:INBOX"));
        assert_eq!(progress.migrated_mailboxes, 1);
    }

    // Percentage is 0 without messages and scales linearly otherwise.
    #[test]
    fn test_migration_progress_percentage() {
        let mut progress = MigrationProgress::new();
        assert_eq!(progress.progress_percentage(), 0.0);

        progress.total_messages = 100;
        progress.migrated_messages = 50;
        assert_eq!(progress.progress_percentage(), 50.0);

        progress.migrated_messages = 100;
        assert_eq!(progress.progress_percentage(), 100.0);
    }

    // 100 messages over at least 10 seconds gives a rate in (0, 10].
    #[test]
    fn test_migration_progress_messages_per_second() {
        let mut progress = MigrationProgress::new();
        progress.started_at = chrono::Utc::now().timestamp() - 10;
        progress.last_updated_at = chrono::Utc::now().timestamp();
        progress.migrated_messages = 100;

        let rate = progress.messages_per_second();
        assert!(rate > 0.0);
        assert!(rate <= 10.0);
    }

    // Statistics copy the totals and derive a positive duration and throughput.
    #[test]
    fn test_migration_stats_from_progress() {
        let mut progress = MigrationProgress::new();
        progress.total_users = 5;
        progress.total_mailboxes = 20;
        progress.total_messages = 1000;
        progress.migrated_messages = 1000;
        progress.total_bytes = 10_485_760;
        progress.migrated_bytes = 10_485_760;
        progress.started_at = chrono::Utc::now().timestamp() - 100;
        progress.completed_at = Some(chrono::Utc::now().timestamp());

        let stats = MigrationStats::from_progress(&progress);
        assert_eq!(stats.total_users, 5);
        assert_eq!(stats.total_mailboxes, 20);
        assert_eq!(stats.total_messages, 1000);
        assert!(stats.duration_secs > 0);
        assert!(stats.throughput_msg_sec > 0.0);
    }

    // A new integrity report has no findings.
    #[test]
    fn test_integrity_report_new() {
        let report = IntegrityReport::new();
        assert_eq!(report.total_mailboxes, 0);
        assert_eq!(report.total_messages, 0);
        assert!(report.orphaned_messages.is_empty());
        assert!(report.errors.is_empty());
    }

    // Struct-update syntax overrides selected defaults.
    #[test]
    fn test_migration_config_custom() {
        let config = MigrationConfig {
            source_type: BackendType::Postgres,
            dest_type: BackendType::Amaters,
            batch_size: 500,
            parallel: 8,
            verify: false,
            dry_run: true,
            resume: true,
            ..Default::default()
        };

        assert_eq!(config.source_type, BackendType::Postgres);
        assert_eq!(config.dest_type, BackendType::Amaters);
        assert_eq!(config.batch_size, 500);
        assert_eq!(config.parallel, 8);
        assert!(!config.verify);
        assert!(config.dry_run);
        assert!(config.resume);
    }

    // Variants compare equal to themselves only.
    #[test]
    fn test_backend_type_equality() {
        assert_eq!(BackendType::Filesystem, BackendType::Filesystem);
        assert_ne!(BackendType::Filesystem, BackendType::Postgres);
    }

    // With the default config (resume off) the migrator starts from fresh progress.
    #[tokio::test]
    async fn test_migrator_creation() {
        let config = MigrationConfig::default();
        let migrator = StorageMigrator::new(config);
        assert_eq!(migrator.progress.total_users, 0);
    }

    // Failed message IDs accumulate in the list.
    #[test]
    fn test_progress_failed_messages() {
        let mut progress = MigrationProgress::new();
        progress.failed_messages.push("msg1".to_string());
        progress.failed_messages.push("msg2".to_string());
        assert_eq!(progress.failed_messages.len(), 2);
    }

    // Statistics of a fresh progress record are all zero.
    #[test]
    fn test_migration_stats_fields() {
        let progress = MigrationProgress::new();
        let stats = MigrationStats::from_progress(&progress);
        assert_eq!(stats.total_bytes, 0);
        assert_eq!(stats.migrated_bytes, 0);
        assert_eq!(stats.failed_messages, 0);
    }

    // Errors accumulate in the report.
    #[test]
    fn test_integrity_report_with_errors() {
        let mut report = IntegrityReport::new();
        report.errors.push("Error 1".to_string());
        report.errors.push("Error 2".to_string());
        assert_eq!(report.errors.len(), 2);
    }

    // All three variants are pairwise distinct.
    #[test]
    fn test_backend_type_all_variants() {
        let fs = BackendType::Filesystem;
        let pg = BackendType::Postgres;
        let am = BackendType::Amaters;

        assert_ne!(fs, pg);
        assert_ne!(pg, am);
        assert_ne!(fs, am);
    }

    // `completed_at` starts unset and can be set.
    #[test]
    fn test_migration_progress_completion() {
        let mut progress = MigrationProgress::new();
        assert!(progress.completed_at.is_none());

        progress.completed_at = Some(chrono::Utc::now().timestamp());
        assert!(progress.completed_at.is_some());
    }

    // No ETA before any message is migrated; an ETA once there is a measurable rate.
    #[test]
    fn test_migration_progress_eta() {
        let mut progress = MigrationProgress::new();
        assert!(progress.eta_seconds().is_none());

        progress.started_at = chrono::Utc::now().timestamp() - 10;
        progress.last_updated_at = chrono::Utc::now().timestamp();
        progress.total_messages = 1000;
        progress.migrated_messages = 100;

        let eta = progress.eta_seconds();
        assert!(eta.is_some());
    }

    // The checksum of a real `Mail` is a 64-character hex string with a non-zero size.
    #[test]
    fn test_message_checksum_compute() {
        use bytes::Bytes;
        use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};

        let headers = HeaderMap::new();
        let body = MessageBody::Small(Bytes::from("Test body"));
        let message = MimeMessage::new(headers, body);

        let mail = Mail::new(
            Some("sender@example.com".parse().unwrap()),
            vec!["recipient@example.com".parse().unwrap()],
            message,
            None,
            None,
        );

        let checksum = MessageChecksum::compute(&mail);

        assert!(!checksum.sha256.is_empty());
        assert_eq!(checksum.sha256.len(), 64);
        assert!(checksum.size > 0);
    }

    // `Default` for progress matches a fresh record.
    #[test]
    fn test_migration_progress_default() {
        let progress = MigrationProgress::default();
        assert_eq!(progress.total_users, 0);
        assert_eq!(progress.migrated_users, 0);
    }

    // `Default` for the report matches a fresh report.
    #[test]
    fn test_integrity_report_default() {
        let report = IntegrityReport::default();
        assert_eq!(report.total_mailboxes, 0);
        assert_eq!(report.total_messages, 0);
    }

    // A backend type survives a JSON round trip.
    #[test]
    fn test_backend_type_serialization() {
        let backend = BackendType::Filesystem;
        let json = serde_json::to_string(&backend).unwrap();
        let deserialized: BackendType = serde_json::from_str(&json).unwrap();
        assert_eq!(backend, deserialized);
    }

    // `batch_size` is a plain mutable field.
    #[test]
    fn test_migration_config_batch_size() {
        let mut config = MigrationConfig::default();
        assert_eq!(config.batch_size, 100);

        config.batch_size = 200;
        assert_eq!(config.batch_size, 200);
    }

    // `parallel` is a plain mutable field.
    #[test]
    fn test_migration_config_parallel() {
        let mut config = MigrationConfig::default();
        assert_eq!(config.parallel, 4);

        config.parallel = 8;
        assert_eq!(config.parallel, 8);
    }

    // Byte counters store what is assigned to them.
    #[test]
    fn test_migration_progress_bytes_tracking() {
        let mut progress = MigrationProgress::new();
        progress.total_bytes = 1_048_576;
        progress.migrated_bytes = 524_288;

        assert_eq!(progress.total_bytes, 1_048_576);
        assert_eq!(progress.migrated_bytes, 524_288);
    }

    // Orphaned message IDs accumulate in the report.
    #[test]
    fn test_integrity_report_orphaned_messages() {
        let mut report = IntegrityReport::new();
        report.orphaned_messages.push("msg1".to_string());
        report.orphaned_messages.push("msg2".to_string());

        assert_eq!(report.orphaned_messages.len(), 2);
    }
}