1use anyhow::{Context, Result};
4use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
5use rusmes_proto::{Mail, MessageId, Username};
6use rusmes_storage::backends::{
7 amaters::{AmatersBackend, AmatersConfig},
8 filesystem::FilesystemBackend,
9 postgres_complete::PostgresCompleteBackend,
10};
11use rusmes_storage::{MailboxId, MessageStore, StorageBackend};
12use serde::{Deserialize, Serialize};
13use sha2::{Digest, Sha256};
14use std::collections::HashMap;
15use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18use tokio::sync::Semaphore;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22pub enum BackendType {
23 Filesystem,
24 Postgres,
25 Amaters,
26}
27
28impl std::str::FromStr for BackendType {
29 type Err = anyhow::Error;
30
31 fn from_str(s: &str) -> Result<Self> {
32 match s.to_lowercase().as_str() {
33 "filesystem" | "fs" => Ok(BackendType::Filesystem),
34 "postgres" | "postgresql" | "pg" => Ok(BackendType::Postgres),
35 "amaters" => Ok(BackendType::Amaters),
36 _ => Err(anyhow::anyhow!("Unknown backend type: {}", s)),
37 }
38 }
39}
40
41impl std::fmt::Display for BackendType {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 match self {
44 BackendType::Filesystem => write!(f, "filesystem"),
45 BackendType::Postgres => write!(f, "postgres"),
46 BackendType::Amaters => write!(f, "amaters"),
47 }
48 }
49}
50
51#[derive(Debug, Clone)]
53pub struct MigrationConfig {
54 pub source_type: BackendType,
55 pub source_config: String,
56 pub dest_type: BackendType,
57 pub dest_config: String,
58 pub batch_size: usize,
59 pub parallel: usize,
60 pub verify: bool,
61 pub dry_run: bool,
62 pub resume: bool,
63}
64
65impl Default for MigrationConfig {
66 fn default() -> Self {
67 Self {
68 source_type: BackendType::Filesystem,
69 source_config: "/var/lib/rusmes/mail".to_string(),
70 dest_type: BackendType::Postgres,
71 dest_config: "postgresql://localhost/rusmes".to_string(),
72 batch_size: 100,
73 parallel: 4,
74 verify: true,
75 dry_run: false,
76 resume: false,
77 }
78 }
79}
80
81#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct MigrationProgress {
84 pub total_users: usize,
85 pub migrated_users: usize,
86 pub total_mailboxes: usize,
87 pub migrated_mailboxes: usize,
88 pub total_messages: usize,
89 pub migrated_messages: usize,
90 pub total_bytes: u64,
91 pub migrated_bytes: u64,
92 pub failed_messages: Vec<String>,
93 pub migrated_user_list: Vec<String>,
94 pub migrated_mailbox_map: HashMap<String, String>,
95 pub started_at: i64,
96 pub last_updated_at: i64,
97 pub completed_at: Option<i64>,
98}
99
100impl MigrationProgress {
101 pub fn new() -> Self {
102 let now = chrono::Utc::now().timestamp();
103 Self {
104 total_users: 0,
105 migrated_users: 0,
106 total_mailboxes: 0,
107 migrated_mailboxes: 0,
108 total_messages: 0,
109 migrated_messages: 0,
110 total_bytes: 0,
111 migrated_bytes: 0,
112 failed_messages: Vec::new(),
113 migrated_user_list: Vec::new(),
114 migrated_mailbox_map: HashMap::new(),
115 started_at: now,
116 last_updated_at: now,
117 completed_at: None,
118 }
119 }
120
121 pub fn save_to_file(&self, path: &PathBuf) -> Result<()> {
122 let json = serde_json::to_string_pretty(self)?;
123 std::fs::write(path, json)?;
124 Ok(())
125 }
126
127 pub fn load_from_file(path: &PathBuf) -> Result<Self> {
128 let json = std::fs::read_to_string(path)?;
129 Ok(serde_json::from_str(&json)?)
130 }
131
132 pub fn mark_user_migrated(&mut self, user: &str) {
133 self.migrated_user_list.push(user.to_string());
134 self.migrated_users += 1;
135 self.last_updated_at = chrono::Utc::now().timestamp();
136 }
137
138 pub fn is_user_migrated(&self, user: &str) -> bool {
139 self.migrated_user_list.contains(&user.to_string())
140 }
141
142 pub fn mark_mailbox_migrated(&mut self, mailbox_key: String, mailbox_id: String) {
143 self.migrated_mailbox_map.insert(mailbox_key, mailbox_id);
144 self.migrated_mailboxes += 1;
145 self.last_updated_at = chrono::Utc::now().timestamp();
146 }
147
148 pub fn is_mailbox_migrated(&self, mailbox_key: &str) -> bool {
149 self.migrated_mailbox_map.contains_key(mailbox_key)
150 }
151
152 pub fn progress_percentage(&self) -> f64 {
153 if self.total_messages == 0 {
154 0.0
155 } else {
156 (self.migrated_messages as f64 / self.total_messages as f64) * 100.0
157 }
158 }
159
160 pub fn eta_seconds(&self) -> Option<u64> {
161 if self.migrated_messages == 0 {
162 return None;
163 }
164
165 let elapsed = self.last_updated_at - self.started_at;
166 if elapsed <= 0 {
167 return None;
168 }
169
170 let remaining = self.total_messages.saturating_sub(self.migrated_messages);
171 let rate = self.migrated_messages as f64 / elapsed as f64;
172
173 if rate <= 0.0 {
174 return None;
175 }
176
177 Some((remaining as f64 / rate) as u64)
178 }
179
180 pub fn messages_per_second(&self) -> f64 {
181 let elapsed = self.last_updated_at - self.started_at;
182 if elapsed <= 0 {
183 return 0.0;
184 }
185 self.migrated_messages as f64 / elapsed as f64
186 }
187}
188
189impl Default for MigrationProgress {
190 fn default() -> Self {
191 Self::new()
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct MigrationStats {
198 pub total_users: usize,
199 pub total_mailboxes: usize,
200 pub total_messages: usize,
201 pub total_bytes: u64,
202 pub migrated_bytes: u64,
203 pub failed_messages: usize,
204 pub duration_secs: u64,
205 pub throughput_msg_sec: f64,
206 pub throughput_mbps: f64,
207}
208
209impl MigrationStats {
210 pub fn from_progress(progress: &MigrationProgress) -> Self {
211 let duration = if let Some(completed) = progress.completed_at {
212 (completed - progress.started_at) as u64
213 } else {
214 (chrono::Utc::now().timestamp() - progress.started_at) as u64
215 };
216
217 let duration_secs = duration.max(1);
218 let throughput_msg_sec = progress.migrated_messages as f64 / duration_secs as f64;
219 let throughput_mbps = if duration_secs > 0 {
220 (progress.migrated_bytes as f64 / 1_048_576.0) / duration_secs as f64
221 } else {
222 0.0
223 };
224
225 Self {
226 total_users: progress.total_users,
227 total_mailboxes: progress.total_mailboxes,
228 total_messages: progress.total_messages,
229 total_bytes: progress.total_bytes,
230 migrated_bytes: progress.migrated_bytes,
231 failed_messages: progress.failed_messages.len(),
232 duration_secs,
233 throughput_msg_sec,
234 throughput_mbps,
235 }
236 }
237
238 pub fn print(&self) {
239 println!("\n=== Migration Statistics ===");
240 println!("Users migrated: {}", self.total_users);
241 println!("Mailboxes migrated: {}", self.total_mailboxes);
242 println!("Messages migrated: {}", self.total_messages);
243 println!(
244 "Data migrated: {:.2} MB",
245 self.migrated_bytes as f64 / 1_048_576.0
246 );
247 println!("Failed messages: {}", self.failed_messages);
248 println!("Duration: {} seconds", self.duration_secs);
249 println!("Throughput: {:.2} msg/s", self.throughput_msg_sec);
250 println!("Throughput: {:.2} MB/s", self.throughput_mbps);
251 }
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct MessageChecksum {
257 pub message_id: String,
258 pub size: usize,
259 pub sha256: String,
260}
261
262impl MessageChecksum {
263 pub fn compute(mail: &Mail) -> Self {
264 let size = mail.size();
265
266 let mut hasher = Sha256::new();
269 hasher.update(mail.message_id().to_string().as_bytes());
270 hasher.update(size.to_le_bytes());
271 let hash = hasher.finalize();
272
273 Self {
274 message_id: mail.message_id().to_string(),
275 size,
276 sha256: format!("{:x}", hash),
277 }
278 }
279}
280
281pub struct StorageMigrator {
283 config: MigrationConfig,
284 progress: MigrationProgress,
285 progress_file: PathBuf,
286 backup_dir: PathBuf,
287}
288
289impl StorageMigrator {
290 pub fn new(config: MigrationConfig) -> Self {
292 let progress_file = PathBuf::from("/tmp/rusmes_migration_progress.json");
293 let backup_dir = PathBuf::from("/tmp/rusmes_migration_backup");
294
295 let progress = if config.resume && progress_file.exists() {
296 MigrationProgress::load_from_file(&progress_file)
297 .unwrap_or_else(|_| MigrationProgress::new())
298 } else {
299 MigrationProgress::new()
300 };
301
302 Self {
303 config,
304 progress,
305 progress_file,
306 backup_dir,
307 }
308 }
309
310 pub async fn migrate(&mut self) -> Result<MigrationStats> {
312 println!(
313 "Starting migration from {} to {}",
314 self.config.source_type, self.config.dest_type
315 );
316
317 if self.config.dry_run {
318 println!("DRY RUN MODE - No changes will be made");
319 }
320
321 if self.config.resume {
322 println!("Resuming migration from previous state");
323 println!(
324 "Previously migrated: {} users, {} mailboxes, {} messages",
325 self.progress.migrated_users,
326 self.progress.migrated_mailboxes,
327 self.progress.migrated_messages
328 );
329 }
330
331 if !self.config.dry_run {
333 std::fs::create_dir_all(&self.backup_dir)?;
334 }
335
336 let source = self.create_source_backend().await?;
338 let dest = self.create_dest_backend().await?;
339
340 if !self.config.dry_run && !self.config.resume {
342 println!("Creating backup of destination state...");
343 self.backup_destination_state(dest.as_ref()).await?;
344 }
345
346 let users = self.get_users(source.as_ref()).await?;
348 self.progress.total_users = users.len();
349
350 self.count_totals(source.as_ref(), &users).await?;
352
353 println!(
354 "Migration scope: {} users, {} mailboxes, {} messages ({:.2} MB)",
355 self.progress.total_users,
356 self.progress.total_mailboxes,
357 self.progress.total_messages,
358 self.progress.total_bytes as f64 / 1_048_576.0
359 );
360
361 let multi_progress = MultiProgress::new();
363 let user_pb = multi_progress.add(ProgressBar::new(users.len() as u64));
364 user_pb.set_style(
365 ProgressStyle::default_bar()
366 .template("[{elapsed_precise}] Users {bar:30.cyan/blue} {pos}/{len} ({eta})")
367 .expect("Invalid progress bar template")
368 .progress_chars("=>-"),
369 );
370
371 let message_pb = multi_progress.add(ProgressBar::new(self.progress.total_messages as u64));
372 message_pb.set_style(
373 ProgressStyle::default_bar()
374 .template("[{elapsed_precise}] Messages {bar:30.green/blue} {pos}/{len} ({msg})")
375 .expect("Invalid progress bar template")
376 .progress_chars("=>-"),
377 );
378
379 let start_time = Instant::now();
380
381 for user in &users {
383 if self.config.resume && self.progress.is_user_migrated(&user.to_string()) {
384 user_pb.inc(1);
385 continue;
386 }
387
388 if !self.config.dry_run {
389 self.migrate_user(source.as_ref(), dest.as_ref(), user, &message_pb)
390 .await?;
391 }
392
393 self.progress.mark_user_migrated(&user.to_string());
394 user_pb.inc(1);
395
396 if !self.config.dry_run {
398 self.progress.save_to_file(&self.progress_file)?;
399 }
400
401 let rate = self.progress.messages_per_second();
403 message_pb.set_message(format!("{:.1} msg/s", rate));
404 }
405
406 user_pb.finish_with_message("All users migrated");
407 message_pb.finish_with_message("All messages migrated");
408
409 self.progress.completed_at = Some(chrono::Utc::now().timestamp());
410
411 if !self.config.dry_run {
412 self.progress.save_to_file(&self.progress_file)?;
413 }
414
415 let stats = MigrationStats::from_progress(&self.progress);
416
417 if self.config.verify && !self.config.dry_run {
419 println!("\nVerifying migration integrity...");
420 self.verify_migration(source.as_ref(), dest.as_ref())
421 .await?;
422 }
423
424 let elapsed = start_time.elapsed();
425 println!("\nMigration completed in {:.2}s", elapsed.as_secs_f64());
426
427 Ok(stats)
428 }
429
430 async fn create_source_backend(&self) -> Result<Box<dyn StorageBackend>> {
431 match self.config.source_type {
432 BackendType::Filesystem => {
433 Ok(Box::new(FilesystemBackend::new(&self.config.source_config)))
434 }
435 BackendType::Postgres => {
436 let backend = PostgresCompleteBackend::new(&self.config.source_config).await?;
437 Ok(Box::new(backend))
438 }
439 BackendType::Amaters => {
440 let config = AmatersConfig::from_url(&self.config.source_config)?;
441 let backend = AmatersBackend::new(config).await?;
442 Ok(Box::new(backend))
443 }
444 }
445 }
446
447 async fn create_dest_backend(&self) -> Result<Box<dyn StorageBackend>> {
448 match self.config.dest_type {
449 BackendType::Filesystem => {
450 let path = &self.config.dest_config;
451 std::fs::create_dir_all(path)?;
452 Ok(Box::new(FilesystemBackend::new(path)))
453 }
454 BackendType::Postgres => {
455 let backend = PostgresCompleteBackend::new(&self.config.dest_config).await?;
456 if !self.config.dry_run {
457 backend.init_schema().await?;
458 }
459 Ok(Box::new(backend))
460 }
461 BackendType::Amaters => {
462 let config = AmatersConfig::from_url(&self.config.dest_config)?;
463 let backend = AmatersBackend::new(config).await?;
464 if !self.config.dry_run {
465 backend.init_schema().await?;
466 }
467 Ok(Box::new(backend))
468 }
469 }
470 }
471
472 async fn get_users(&self, backend: &dyn StorageBackend) -> Result<Vec<Username>> {
473 backend.list_all_users().await
474 }
475
476 async fn count_totals(
477 &mut self,
478 source: &dyn StorageBackend,
479 users: &[Username],
480 ) -> Result<()> {
481 let mailbox_store = source.mailbox_store();
482 let message_store = source.message_store();
483
484 let mut total_mailboxes = 0;
485 let mut total_messages = 0;
486 let mut total_bytes = 0u64;
487
488 for user in users {
489 let mailboxes = mailbox_store.list_mailboxes(user).await?;
490 total_mailboxes += mailboxes.len();
491
492 for mailbox in mailboxes {
493 let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
494 total_messages += messages.len();
495 total_bytes += messages.iter().map(|m| m.size() as u64).sum::<u64>();
496 }
497 }
498
499 self.progress.total_mailboxes = total_mailboxes;
500 self.progress.total_messages = total_messages;
501 self.progress.total_bytes = total_bytes;
502
503 Ok(())
504 }
505
506 async fn migrate_user(
507 &mut self,
508 source: &dyn StorageBackend,
509 dest: &dyn StorageBackend,
510 user: &Username,
511 message_pb: &ProgressBar,
512 ) -> Result<()> {
513 let source_mailboxes = source.mailbox_store();
514 let dest_mailboxes = dest.mailbox_store();
515 let source_messages = source.message_store();
516 let dest_messages = dest.message_store();
517
518 let mailboxes = source_mailboxes.list_mailboxes(user).await?;
520
521 for mailbox in mailboxes {
522 let mailbox_key = format!("{}:{}", user, mailbox.path());
523
524 if self.config.resume && self.progress.is_mailbox_migrated(&mailbox_key) {
525 continue;
526 }
527
528 let dest_mailbox_id = dest_mailboxes.create_mailbox(mailbox.path()).await?;
530
531 self.progress
532 .mark_mailbox_migrated(mailbox_key, dest_mailbox_id.to_string());
533
534 let messages = source_messages.get_mailbox_messages(mailbox.id()).await?;
536
537 let semaphore = Arc::new(Semaphore::new(self.config.parallel));
538 let mut handles = vec![];
539
540 for chunk in messages.chunks(self.config.batch_size) {
541 for message_meta in chunk {
542 let permit = semaphore.clone().acquire_owned().await?;
543 let source_messages = Arc::clone(&source_messages);
544 let dest_messages = Arc::clone(&dest_messages);
545 let message_id = *message_meta.message_id();
546 let mailbox_id = dest_mailbox_id;
547 let size = message_meta.size();
548 let flags = message_meta.flags().clone();
549
550 let handle = tokio::spawn(async move {
551 let _permit = permit;
552 let result = Self::migrate_single_message(
553 &source_messages,
554 &dest_messages,
555 &message_id,
556 &mailbox_id,
557 )
558 .await;
559 (message_id, size, flags, result)
560 });
561
562 handles.push(handle);
563 }
564
565 for handle in handles.drain(..) {
567 match handle.await {
568 Ok((msg_id, size, _flags, result)) => match result {
569 Ok(_) => {
570 self.progress.migrated_messages += 1;
571 self.progress.migrated_bytes += size as u64;
572 message_pb.inc(1);
573 }
574 Err(e) => {
575 tracing::error!("Failed to migrate message {}: {}", msg_id, e);
576 self.progress.failed_messages.push(msg_id.to_string());
577 }
578 },
579 Err(e) => {
580 tracing::error!("Task failed: {}", e);
581 }
582 }
583 }
584 }
585 }
586
587 Ok(())
588 }
589
590 async fn migrate_single_message(
591 source: &Arc<dyn MessageStore>,
592 dest: &Arc<dyn MessageStore>,
593 message_id: &MessageId,
594 dest_mailbox_id: &MailboxId,
595 ) -> Result<()> {
596 let message = source
597 .get_message(message_id)
598 .await?
599 .context("Message not found in source")?;
600
601 dest.append_message(dest_mailbox_id, message).await?;
602
603 Ok(())
604 }
605
606 async fn verify_migration(
607 &self,
608 source: &dyn StorageBackend,
609 dest: &dyn StorageBackend,
610 ) -> Result<()> {
611 let source_mailboxes = source.mailbox_store();
612 let dest_mailboxes = dest.mailbox_store();
613 let source_messages = source.message_store();
614 let dest_messages = dest.message_store();
615
616 let users = self.get_users(source).await?;
617
618 let pb = ProgressBar::new(users.len() as u64);
619 pb.set_style(
620 ProgressStyle::default_bar()
621 .template("[{elapsed_precise}] Verifying {bar:40.yellow/blue} {pos}/{len}")
622 .expect("Invalid progress bar template")
623 .progress_chars("=>-"),
624 );
625
626 for user in &users {
627 let source_mboxes = source_mailboxes.list_mailboxes(user).await?;
628 let dest_mboxes = dest_mailboxes.list_mailboxes(user).await?;
629
630 if source_mboxes.len() != dest_mboxes.len() {
631 return Err(anyhow::anyhow!(
632 "Mailbox count mismatch for user {}: source={}, dest={}",
633 user,
634 source_mboxes.len(),
635 dest_mboxes.len()
636 ));
637 }
638
639 for (src_mbox, dst_mbox) in source_mboxes.iter().zip(dest_mboxes.iter()) {
641 let src_msgs = source_messages.get_mailbox_messages(src_mbox.id()).await?;
642 let dst_msgs = dest_messages.get_mailbox_messages(dst_mbox.id()).await?;
643
644 if src_msgs.len() != dst_msgs.len() {
645 return Err(anyhow::anyhow!(
646 "Message count mismatch in mailbox {}: source={}, dest={}",
647 src_mbox.path(),
648 src_msgs.len(),
649 dst_msgs.len()
650 ));
651 }
652
653 if !src_msgs.is_empty() {
655 let sample_size = (src_msgs.len() / 10).clamp(1, 10);
656 for i in 0..sample_size {
657 let idx = i * (src_msgs.len() / sample_size);
658 if let Some(src_meta) = src_msgs.get(idx) {
659 if let (Some(src_msg), Some(dst_msg)) = (
660 source_messages.get_message(src_meta.message_id()).await?,
661 dest_messages.get_message(src_meta.message_id()).await?,
662 ) {
663 let src_checksum = MessageChecksum::compute(&src_msg);
664 let dst_checksum = MessageChecksum::compute(&dst_msg);
665
666 if src_checksum.sha256 != dst_checksum.sha256 {
667 return Err(anyhow::anyhow!(
668 "Checksum mismatch for message {}",
669 src_meta.message_id()
670 ));
671 }
672 }
673 }
674 }
675 }
676 }
677
678 pb.inc(1);
679 }
680
681 pb.finish_with_message("Verification completed successfully");
682 println!("Verification passed: all mailboxes and messages verified");
683 Ok(())
684 }
685
686 async fn backup_destination_state(&self, _dest: &dyn StorageBackend) -> Result<()> {
687 let backup_metadata = serde_json::json!({
688 "timestamp": chrono::Utc::now().to_rfc3339(),
689 "dest_type": self.config.dest_type,
690 "dest_config": self.config.dest_config,
691 });
692
693 let backup_file = self.backup_dir.join("migration_backup_metadata.json");
694 std::fs::write(backup_file, serde_json::to_string_pretty(&backup_metadata)?)?;
695
696 println!("Backup metadata saved to {:?}", self.backup_dir);
697 Ok(())
698 }
699
700 pub async fn rollback(&self) -> Result<()> {
702 println!("Rolling back migration...");
703
704 let backup_file = self.backup_dir.join("migration_backup_metadata.json");
705 if !backup_file.exists() {
706 return Err(anyhow::anyhow!("No backup found to rollback"));
707 }
708
709 println!(
710 "Rollback would restore from backup at {:?}",
711 self.backup_dir
712 );
713 println!("Note: Full rollback implementation requires backend-specific restore logic");
714
715 Ok(())
716 }
717
718 pub fn get_progress(&self) -> &MigrationProgress {
720 &self.progress
721 }
722
723 pub fn print_report(&self) {
725 println!("\n=== Migration Report ===");
726 println!(
727 "Users: {}/{}",
728 self.progress.migrated_users, self.progress.total_users
729 );
730 println!(
731 "Mailboxes: {}/{}",
732 self.progress.migrated_mailboxes, self.progress.total_mailboxes
733 );
734 println!(
735 "Messages: {}/{}",
736 self.progress.migrated_messages, self.progress.total_messages
737 );
738 println!(
739 "Data: {:.2}/{:.2} MB",
740 self.progress.migrated_bytes as f64 / 1_048_576.0,
741 self.progress.total_bytes as f64 / 1_048_576.0
742 );
743
744 if !self.progress.failed_messages.is_empty() {
745 println!("\nFailed messages: {}", self.progress.failed_messages.len());
746 for msg_id in self.progress.failed_messages.iter().take(10) {
747 println!(" - {}", msg_id);
748 }
749 if self.progress.failed_messages.len() > 10 {
750 println!(
751 " ... and {} more",
752 self.progress.failed_messages.len() - 10
753 );
754 }
755 }
756
757 let stats = MigrationStats::from_progress(&self.progress);
758 println!("\nDuration: {} seconds", stats.duration_secs);
759 println!(
760 "Throughput: {:.2} messages/second",
761 stats.throughput_msg_sec
762 );
763 println!("Throughput: {:.2} MB/second", stats.throughput_mbps);
764
765 if let Some(eta) = self.progress.eta_seconds() {
766 let eta_duration = Duration::from_secs(eta);
767 println!("ETA: {:?}", eta_duration);
768 }
769 }
770}
771
772pub struct IntegrityChecker {
774 backend: Box<dyn StorageBackend>,
775}
776
777impl IntegrityChecker {
778 pub fn new(backend: Box<dyn StorageBackend>) -> Self {
779 Self { backend }
780 }
781
782 pub async fn check(&self) -> Result<IntegrityReport> {
784 let mut report = IntegrityReport::new();
785
786 let mailbox_store = self.backend.mailbox_store();
787 let message_store = self.backend.message_store();
788
789 let users = vec![
791 Username::new("user1@example.com".to_string())?,
792 Username::new("user2@example.com".to_string())?,
793 ];
794
795 for user in &users {
796 let mailboxes = mailbox_store.list_mailboxes(user).await?;
797 report.total_mailboxes += mailboxes.len();
798
799 for mailbox in mailboxes {
800 let messages = message_store.get_mailbox_messages(mailbox.id()).await?;
801 report.total_messages += messages.len();
802
803 for msg_meta in messages {
805 match message_store.get_message(msg_meta.message_id()).await {
806 Ok(Some(_)) => {}
807 Ok(None) => {
808 report
809 .orphaned_messages
810 .push(msg_meta.message_id().to_string());
811 }
812 Err(e) => {
813 report.errors.push(format!(
814 "Error reading message {}: {}",
815 msg_meta.message_id(),
816 e
817 ));
818 }
819 }
820 }
821 }
822 }
823
824 Ok(report)
825 }
826}
827
828#[derive(Debug, Clone, Serialize, Deserialize)]
830pub struct IntegrityReport {
831 pub total_mailboxes: usize,
832 pub total_messages: usize,
833 pub orphaned_messages: Vec<String>,
834 pub errors: Vec<String>,
835}
836
837impl IntegrityReport {
838 pub fn new() -> Self {
839 Self {
840 total_mailboxes: 0,
841 total_messages: 0,
842 orphaned_messages: Vec::new(),
843 errors: Vec::new(),
844 }
845 }
846
847 pub fn print(&self) {
848 println!("\n=== Integrity Report ===");
849 println!("Total mailboxes: {}", self.total_mailboxes);
850 println!("Total messages: {}", self.total_messages);
851 println!("Orphaned messages: {}", self.orphaned_messages.len());
852
853 if !self.orphaned_messages.is_empty() {
854 println!("\nOrphaned messages:");
855 for msg_id in self.orphaned_messages.iter().take(10) {
856 println!(" - {}", msg_id);
857 }
858 if self.orphaned_messages.len() > 10 {
859 println!(" ... and {} more", self.orphaned_messages.len() - 10);
860 }
861 }
862
863 if !self.errors.is_empty() {
864 println!("\nErrors:");
865 for error in self.errors.iter().take(10) {
866 println!(" - {}", error);
867 }
868 if self.errors.len() > 10 {
869 println!(" ... and {} more", self.errors.len() - 10);
870 }
871 }
872
873 if self.orphaned_messages.is_empty() && self.errors.is_empty() {
874 println!("\nNo integrity issues found");
875 }
876 }
877}
878
879impl Default for IntegrityReport {
880 fn default() -> Self {
881 Self::new()
882 }
883}
884
885#[cfg(test)]
886mod tests {
887 use super::*;
888
889 #[test]
890 fn test_backend_type_from_str() {
891 assert_eq!(
892 "filesystem".parse::<BackendType>().unwrap(),
893 BackendType::Filesystem
894 );
895 assert_eq!(
896 "postgres".parse::<BackendType>().unwrap(),
897 BackendType::Postgres
898 );
899 assert_eq!(
900 "amaters".parse::<BackendType>().unwrap(),
901 BackendType::Amaters
902 );
903 assert!("unknown".parse::<BackendType>().is_err());
904 }
905
906 #[test]
907 fn test_backend_type_case_insensitive() {
908 assert_eq!(
909 "FILESYSTEM".parse::<BackendType>().unwrap(),
910 BackendType::Filesystem
911 );
912 assert_eq!(
913 "PostgreSQL".parse::<BackendType>().unwrap(),
914 BackendType::Postgres
915 );
916 }
917
918 #[test]
919 fn test_backend_type_aliases() {
920 assert_eq!(
921 "fs".parse::<BackendType>().unwrap(),
922 BackendType::Filesystem
923 );
924 assert_eq!("pg".parse::<BackendType>().unwrap(), BackendType::Postgres);
925 assert_eq!(
926 "postgresql".parse::<BackendType>().unwrap(),
927 BackendType::Postgres
928 );
929 }
930
931 #[test]
932 fn test_backend_type_display() {
933 assert_eq!(BackendType::Filesystem.to_string(), "filesystem");
934 assert_eq!(BackendType::Postgres.to_string(), "postgres");
935 assert_eq!(BackendType::Amaters.to_string(), "amaters");
936 }
937
938 #[test]
939 fn test_migration_config_default() {
940 let config = MigrationConfig::default();
941 assert_eq!(config.source_type, BackendType::Filesystem);
942 assert_eq!(config.dest_type, BackendType::Postgres);
943 assert_eq!(config.batch_size, 100);
944 assert_eq!(config.parallel, 4);
945 assert!(config.verify);
946 assert!(!config.dry_run);
947 assert!(!config.resume);
948 }
949
950 #[test]
951 fn test_migration_progress_new() {
952 let progress = MigrationProgress::new();
953 assert_eq!(progress.total_users, 0);
954 assert_eq!(progress.migrated_users, 0);
955 assert_eq!(progress.total_mailboxes, 0);
956 assert_eq!(progress.migrated_messages, 0);
957 assert_eq!(progress.total_bytes, 0);
958 assert_eq!(progress.migrated_bytes, 0);
959 assert!(progress.completed_at.is_none());
960 assert!(progress.migrated_user_list.is_empty());
961 assert!(progress.migrated_mailbox_map.is_empty());
962 }
963
964 #[test]
965 fn test_migration_progress_serialization() {
966 let progress = MigrationProgress::new();
967 let json = serde_json::to_string(&progress).unwrap();
968 let deserialized: MigrationProgress = serde_json::from_str(&json).unwrap();
969 assert_eq!(progress.total_users, deserialized.total_users);
970 assert_eq!(progress.migrated_users, deserialized.migrated_users);
971 }
972
973 #[test]
974 fn test_migration_progress_mark_user_migrated() {
975 let mut progress = MigrationProgress::new();
976 progress.total_users = 2;
977
978 assert!(!progress.is_user_migrated("user1"));
979 progress.mark_user_migrated("user1");
980 assert!(progress.is_user_migrated("user1"));
981 assert_eq!(progress.migrated_users, 1);
982 assert!(!progress.is_user_migrated("user2"));
983 }
984
985 #[test]
986 fn test_migration_progress_mark_mailbox_migrated() {
987 let mut progress = MigrationProgress::new();
988
989 assert!(!progress.is_mailbox_migrated("user1:INBOX"));
990 progress.mark_mailbox_migrated("user1:INBOX".to_string(), "mailbox-123".to_string());
991 assert!(progress.is_mailbox_migrated("user1:INBOX"));
992 assert_eq!(progress.migrated_mailboxes, 1);
993 }
994
995 #[test]
996 fn test_migration_progress_percentage() {
997 let mut progress = MigrationProgress::new();
998 assert_eq!(progress.progress_percentage(), 0.0);
999
1000 progress.total_messages = 100;
1001 progress.migrated_messages = 50;
1002 assert_eq!(progress.progress_percentage(), 50.0);
1003
1004 progress.migrated_messages = 100;
1005 assert_eq!(progress.progress_percentage(), 100.0);
1006 }
1007
1008 #[test]
1009 fn test_migration_progress_messages_per_second() {
1010 let mut progress = MigrationProgress::new();
1011 progress.started_at = chrono::Utc::now().timestamp() - 10;
1012 progress.last_updated_at = chrono::Utc::now().timestamp();
1013 progress.migrated_messages = 100;
1014
1015 let rate = progress.messages_per_second();
1016 assert!(rate > 0.0);
1017 assert!(rate <= 10.0);
1018 }
1019
1020 #[test]
1021 fn test_migration_stats_from_progress() {
1022 let mut progress = MigrationProgress::new();
1023 progress.total_users = 5;
1024 progress.total_mailboxes = 20;
1025 progress.total_messages = 1000;
1026 progress.migrated_messages = 1000;
1027 progress.total_bytes = 10_485_760;
1028 progress.migrated_bytes = 10_485_760;
1029 progress.started_at = chrono::Utc::now().timestamp() - 100;
1030 progress.completed_at = Some(chrono::Utc::now().timestamp());
1031
1032 let stats = MigrationStats::from_progress(&progress);
1033 assert_eq!(stats.total_users, 5);
1034 assert_eq!(stats.total_mailboxes, 20);
1035 assert_eq!(stats.total_messages, 1000);
1036 assert!(stats.duration_secs > 0);
1037 assert!(stats.throughput_msg_sec > 0.0);
1038 }
1039
1040 #[test]
1041 fn test_integrity_report_new() {
1042 let report = IntegrityReport::new();
1043 assert_eq!(report.total_mailboxes, 0);
1044 assert_eq!(report.total_messages, 0);
1045 assert!(report.orphaned_messages.is_empty());
1046 assert!(report.errors.is_empty());
1047 }
1048
1049 #[test]
1050 fn test_migration_config_custom() {
1051 let config = MigrationConfig {
1052 source_type: BackendType::Postgres,
1053 dest_type: BackendType::Amaters,
1054 batch_size: 500,
1055 parallel: 8,
1056 verify: false,
1057 dry_run: true,
1058 resume: true,
1059 ..Default::default()
1060 };
1061
1062 assert_eq!(config.source_type, BackendType::Postgres);
1063 assert_eq!(config.dest_type, BackendType::Amaters);
1064 assert_eq!(config.batch_size, 500);
1065 assert_eq!(config.parallel, 8);
1066 assert!(!config.verify);
1067 assert!(config.dry_run);
1068 assert!(config.resume);
1069 }
1070
1071 #[test]
1072 fn test_backend_type_equality() {
1073 assert_eq!(BackendType::Filesystem, BackendType::Filesystem);
1074 assert_ne!(BackendType::Filesystem, BackendType::Postgres);
1075 }
1076
1077 #[tokio::test]
1078 async fn test_migrator_creation() {
1079 let config = MigrationConfig::default();
1080 let migrator = StorageMigrator::new(config);
1081 assert_eq!(migrator.progress.total_users, 0);
1082 }
1083
1084 #[test]
1085 fn test_progress_failed_messages() {
1086 let mut progress = MigrationProgress::new();
1087 progress.failed_messages.push("msg1".to_string());
1088 progress.failed_messages.push("msg2".to_string());
1089 assert_eq!(progress.failed_messages.len(), 2);
1090 }
1091
1092 #[test]
1093 fn test_migration_stats_fields() {
1094 let progress = MigrationProgress::new();
1095 let stats = MigrationStats::from_progress(&progress);
1096 assert_eq!(stats.total_bytes, 0);
1097 assert_eq!(stats.migrated_bytes, 0);
1098 assert_eq!(stats.failed_messages, 0);
1099 }
1100
1101 #[test]
1102 fn test_integrity_report_with_errors() {
1103 let mut report = IntegrityReport::new();
1104 report.errors.push("Error 1".to_string());
1105 report.errors.push("Error 2".to_string());
1106 assert_eq!(report.errors.len(), 2);
1107 }
1108
1109 #[test]
1110 fn test_backend_type_all_variants() {
1111 let fs = BackendType::Filesystem;
1112 let pg = BackendType::Postgres;
1113 let am = BackendType::Amaters;
1114
1115 assert_ne!(fs, pg);
1116 assert_ne!(pg, am);
1117 assert_ne!(fs, am);
1118 }
1119
1120 #[test]
1121 fn test_migration_progress_completion() {
1122 let mut progress = MigrationProgress::new();
1123 assert!(progress.completed_at.is_none());
1124
1125 progress.completed_at = Some(chrono::Utc::now().timestamp());
1126 assert!(progress.completed_at.is_some());
1127 }
1128
1129 #[test]
1130 fn test_migration_progress_eta() {
1131 let mut progress = MigrationProgress::new();
1132 assert!(progress.eta_seconds().is_none());
1133
1134 progress.started_at = chrono::Utc::now().timestamp() - 10;
1135 progress.last_updated_at = chrono::Utc::now().timestamp();
1136 progress.total_messages = 1000;
1137 progress.migrated_messages = 100;
1138
1139 let eta = progress.eta_seconds();
1140 assert!(eta.is_some());
1141 }
1142
1143 #[test]
1144 fn test_message_checksum_compute() {
1145 use bytes::Bytes;
1146 use rusmes_proto::{HeaderMap, MessageBody, MimeMessage};
1147
1148 let headers = HeaderMap::new();
1149 let body = MessageBody::Small(Bytes::from("Test body"));
1150 let message = MimeMessage::new(headers, body);
1151
1152 let mail = Mail::new(
1153 Some("sender@example.com".parse().unwrap()),
1154 vec!["recipient@example.com".parse().unwrap()],
1155 message,
1156 None,
1157 None,
1158 );
1159
1160 let checksum = MessageChecksum::compute(&mail);
1161
1162 assert!(!checksum.sha256.is_empty());
1163 assert_eq!(checksum.sha256.len(), 64);
1164 assert!(checksum.size > 0);
1165 }
1166
1167 #[test]
1168 fn test_migration_progress_default() {
1169 let progress = MigrationProgress::default();
1170 assert_eq!(progress.total_users, 0);
1171 assert_eq!(progress.migrated_users, 0);
1172 }
1173
1174 #[test]
1175 fn test_integrity_report_default() {
1176 let report = IntegrityReport::default();
1177 assert_eq!(report.total_mailboxes, 0);
1178 assert_eq!(report.total_messages, 0);
1179 }
1180
1181 #[test]
1182 fn test_backend_type_serialization() {
1183 let backend = BackendType::Filesystem;
1184 let json = serde_json::to_string(&backend).unwrap();
1185 let deserialized: BackendType = serde_json::from_str(&json).unwrap();
1186 assert_eq!(backend, deserialized);
1187 }
1188
1189 #[test]
1190 fn test_migration_config_batch_size() {
1191 let mut config = MigrationConfig::default();
1192 assert_eq!(config.batch_size, 100);
1193
1194 config.batch_size = 200;
1195 assert_eq!(config.batch_size, 200);
1196 }
1197
1198 #[test]
1199 fn test_migration_config_parallel() {
1200 let mut config = MigrationConfig::default();
1201 assert_eq!(config.parallel, 4);
1202
1203 config.parallel = 8;
1204 assert_eq!(config.parallel, 8);
1205 }
1206
1207 #[test]
1208 fn test_migration_progress_bytes_tracking() {
1209 let mut progress = MigrationProgress::new();
1210 progress.total_bytes = 1_048_576;
1211 progress.migrated_bytes = 524_288;
1212
1213 assert_eq!(progress.total_bytes, 1_048_576);
1214 assert_eq!(progress.migrated_bytes, 524_288);
1215 }
1216
1217 #[test]
1218 fn test_integrity_report_orphaned_messages() {
1219 let mut report = IntegrityReport::new();
1220 report.orphaned_messages.push("msg1".to_string());
1221 report.orphaned_messages.push("msg2".to_string());
1222
1223 assert_eq!(report.orphaned_messages.len(), 2);
1224 }
1225
1226 #[tokio::test]
1236 async fn test_get_users_delegates_to_backend() {
1237 use rusmes_storage::backends::amaters::{AmatersBackend, AmatersConfig};
1238
1239 let backend = AmatersBackend::new(AmatersConfig::default())
1240 .await
1241 .expect("AmatersBackend::new failed");
1242
1243 let config = MigrationConfig::default();
1244 let migrator = StorageMigrator::new(config);
1245
1246 let users = migrator
1247 .get_users(&backend)
1248 .await
1249 .expect("get_users failed");
1250
1251 assert!(
1255 users.is_empty(),
1256 "expected empty user list from a fresh AmatersBackend, got {:?}",
1257 users
1258 );
1259 }
1260
1261 #[test]
1264 fn test_amaters_config_from_url() {
1265 use rusmes_storage::backends::amaters::AmatersConfig;
1266
1267 let cfg = AmatersConfig::from_url("amaters://node1:9042/my_keyspace")
1269 .expect("valid single-endpoint URL should parse");
1270 assert_eq!(cfg.cluster_endpoints, vec!["node1:9042"]);
1271 assert_eq!(cfg.metadata_keyspace, "my_keyspace");
1272 assert_eq!(cfg.blob_keyspace, "my_keyspace_blobs");
1273
1274 let cfg = AmatersConfig::from_url("amaters://host1:9042,host2:9042")
1276 .expect("valid multi-endpoint URL should parse");
1277 assert_eq!(cfg.cluster_endpoints, vec!["host1:9042", "host2:9042"]);
1278 assert_eq!(cfg.metadata_keyspace, "rusmes_metadata");
1279 assert_eq!(cfg.blob_keyspace, "rusmes_blobs");
1280
1281 let cfg = AmatersConfig::from_url("amaters://host1:9042,host2:9042,host3:9042/prod")
1283 .expect("valid three-endpoint URL should parse");
1284 assert_eq!(cfg.cluster_endpoints.len(), 3);
1285 assert_eq!(cfg.metadata_keyspace, "prod");
1286 assert_eq!(cfg.blob_keyspace, "prod_blobs");
1287
1288 assert!(
1290 AmatersConfig::from_url("cassandra://host1:9042/ks").is_err(),
1291 "wrong scheme must be rejected"
1292 );
1293
1294 assert!(
1296 AmatersConfig::from_url("amaters://host1/ks").is_err(),
1297 "endpoint without port must be rejected"
1298 );
1299
1300 assert!(
1302 AmatersConfig::from_url("amaters:///keyspace").is_err(),
1303 "empty host list must be rejected"
1304 );
1305 }
1306}