use crate::compression::{CompressedData, CompressionConfig, CompressionEngine, CompressionType};
use crate::crypto::{EncryptedData, EncryptionConfig, EncryptionEngine, KeyManager, MasterKey};
use crate::error::{BackupError, Result};
use aes_gcm::aead::Aead;
use aes_gcm::KeyInit;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

/// Configuration for the end-to-end processing pipeline.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Encryption settings; `None` disables encryption.
    pub encryption: Option<EncryptionConfig>,
    /// Compression settings.
    pub compression: CompressionConfig,
    /// Which compression algorithm to apply.
    pub compression_type: CompressionType,
    /// Performance tuning (threads, buffers, batching).
    pub performance: PerformanceConfig,
}

impl Default for PipelineConfig {
    fn default() -> Self {
        Self {
            encryption: None,
            compression: CompressionConfig::zstd_default(),
            compression_type: CompressionType::Zstd,
            performance: PerformanceConfig::default(),
        }
    }
}

impl PipelineConfig {
    /// Enables encryption with the given settings.
    #[must_use]
    pub fn with_encryption(mut self, config: EncryptionConfig) -> Self {
        self.encryption = Some(config);
        self
    }

    /// Sets the compression algorithm and its settings.
    #[must_use]
    pub fn with_compression(
        mut self,
        compression_type: CompressionType,
        config: CompressionConfig,
    ) -> Self {
        self.compression_type = compression_type;
        self.compression = config;
        self
    }

    /// Preset tuned for throughput over compression ratio.
    #[must_use]
    pub fn fast(mut self) -> Self {
        self.compression = CompressionConfig::fast(self.compression_type);
        self.performance = PerformanceConfig::fast();
        self
    }

    /// Preset tuned for compression ratio over throughput.
    #[must_use]
    pub fn best_compression(mut self) -> Self {
        self.compression = CompressionConfig::best(self.compression_type);
        self.performance = PerformanceConfig::quality();
        self
    }
}
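
// Example (sketch): composing a configuration with the builders above. The
// `zstd_default()` / `default()` constructors are the same ones used in the
// tests at the bottom of this file.
//
//     let config = PipelineConfig::default()
//         .with_encryption(EncryptionConfig::default())
//         .with_compression(CompressionType::Zstd, CompressionConfig::zstd_default())
//         .fast();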

/// Performance tuning knobs for the pipeline.
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    /// Number of worker threads for parallel processing.
    pub parallel_threads: usize,
    /// I/O buffer size in bytes.
    pub buffer_size: usize,
    /// Soft memory budget in bytes.
    pub memory_limit: usize,
    /// Number of files per parallel batch.
    pub batch_size: usize,
}

impl Default for PerformanceConfig {
    fn default() -> Self {
        Self {
            parallel_threads: optimal_parallelism(),
            buffer_size: 1024 * 1024,
            memory_limit: 512 * 1024 * 1024,
            batch_size: 32,
        }
    }
}

impl PerformanceConfig {
    /// Preset favoring throughput: all cores, larger buffers and batches.
    #[must_use]
    pub fn fast() -> Self {
        Self {
            parallel_threads: num_cpus::get(),
            buffer_size: 2 * 1024 * 1024,
            memory_limit: 1024 * 1024 * 1024,
            batch_size: 64,
        }
    }

    /// Preset favoring compression quality: fewer threads, smaller buffers.
    #[must_use]
    pub fn quality() -> Self {
        Self {
            parallel_threads: (num_cpus::get() / 2).max(1),
            buffer_size: 512 * 1024,
            memory_limit: 256 * 1024 * 1024,
            batch_size: 16,
        }
    }

    /// Overrides the thread count (clamped to at least 1).
    #[must_use]
    pub fn with_parallelism(mut self, threads: usize) -> Self {
        self.parallel_threads = threads.max(1);
        self
    }

    /// Overrides the batch size (clamped to at least 1).
    #[must_use]
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size.max(1);
        self
    }
}

/// Returns a reasonable default parallelism: 3/4 of available CPUs,
/// clamped to the range 1..=32.
#[must_use]
pub fn optimal_parallelism() -> usize {
    let cpus = num_cpus::get();
    (cpus * 3 / 4).clamp(1, 32)
}

/// Adjusts parallelism to the workload shape:
/// - fewer files than threads: one thread per file;
/// - many small files (< 1 MiB average): halve the parallelism to reduce
///   per-task overhead;
/// - very large files (> 100 MiB average): raise parallelism by a third,
///   capped at 32.
#[must_use]
pub fn dynamic_parallelism(file_count: usize, avg_file_size: u64) -> usize {
    let base_parallelism = optimal_parallelism();

    if file_count < base_parallelism {
        return file_count.max(1);
    }

    if avg_file_size < 1024 * 1024 {
        return (base_parallelism / 2).max(1);
    }

    if avg_file_size > 100 * 1024 * 1024 {
        return (base_parallelism * 4 / 3).min(32);
    }

    base_parallelism
}
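
// Example (sketch): sizing a dedicated rayon pool for a hypothetical batch of
// 200 files averaging 4 MiB each; this workload falls through to the base
// parallelism value.
//
//     let threads = dynamic_parallelism(200, 4 * 1024 * 1024);
//     let pool = ThreadPoolBuilder::new().num_threads(threads).build()?;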

/// The result of processing a single file through the pipeline.
#[derive(Debug, Clone)]
pub struct ProcessedData {
    /// Path of the source file.
    pub original_path: PathBuf,
    /// Final (compressed and/or encrypted) bytes.
    pub data: Vec<u8>,
    /// Compression details, if compression was applied.
    pub compression_info: Option<CompressedData>,
    /// Encryption details, if encryption was applied.
    pub encryption_info: Option<EncryptedData>,
    /// Size and timing statistics.
    pub metadata: ProcessingMetadata,
}

/// Statistics collected while processing data.
#[derive(Debug, Clone)]
pub struct ProcessingMetadata {
    /// Input size in bytes.
    pub original_size: u64,
    /// Size after compression in bytes.
    pub compressed_size: u64,
    /// Size after compression and encryption in bytes.
    pub final_size: u64,
    /// Wall-clock processing time in milliseconds.
    pub processing_time_ms: u64,
    /// Space saved by compression, as a percentage of the original size.
    pub compression_ratio: f64,
    /// Approximate memory used, in bytes.
    pub memory_usage: u64,
}

/// Processing pipeline combining compression, encryption, and parallelism.
pub struct ProcessingPipeline {
    config: PipelineConfig,
    encryption_engine: Option<Arc<EncryptionEngine>>,
    compression_engine: Arc<CompressionEngine>,
    key_manager: Option<Arc<KeyManager>>,
    thread_pool: Option<rayon::ThreadPool>,
}

impl ProcessingPipeline {
    /// Creates a pipeline from the given configuration.
    #[must_use]
    pub fn new(config: PipelineConfig) -> Self {
        let encryption_engine = config
            .encryption
            .as_ref()
            .map(|cfg| Arc::new(EncryptionEngine::new(cfg.clone())));

        let compression_engine = Arc::new(CompressionEngine::new(
            config.compression_type,
            config.compression.clone(),
        ));

        let key_manager = encryption_engine
            .as_ref()
            .map(|_| Arc::new(KeyManager::default()));

        // Fall back to the global rayon pool if a dedicated pool cannot be built.
        let thread_pool = Self::create_thread_pool(&config.performance).ok();

        Self {
            config,
            encryption_engine,
            compression_engine,
            key_manager,
            thread_pool,
        }
    }

    fn create_thread_pool(performance: &PerformanceConfig) -> Result<rayon::ThreadPool> {
        ThreadPoolBuilder::new()
            .num_threads(performance.parallel_threads)
            .thread_name(|i| format!("backup-worker-{i}"))
            .stack_size(8 * 1024 * 1024)
            .build()
            .map_err(|e| BackupError::Other(anyhow::anyhow!("ThreadPool creation error: {e}")))
    }

    /// Creates an encryption-enabled pipeline from a password, returning the
    /// pipeline together with the key-derivation salt.
    pub fn with_encryption(password: &str) -> Result<(Self, [u8; 16])> {
        let config = PipelineConfig::default().with_encryption(EncryptionConfig::default());
        let mut pipeline = Self::new(config);

        let key_manager = KeyManager::default();
        let (_master_key, salt) = key_manager.create_master_key(password)?;

        pipeline.key_manager = Some(Arc::new(key_manager));
        Ok((pipeline, salt))
    }
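
    // Example (sketch): password-based construction. The returned salt has to
    // be persisted so the same master key can be re-derived on restore;
    // "correct horse" is a placeholder password.
    //
    //     let (pipeline, salt) = ProcessingPipeline::with_encryption("correct horse")?;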

    /// Processes a single file: compress, then (optionally) encrypt.
    pub fn process_file<P: AsRef<Path>>(
        &self,
        file_path: P,
        master_key: Option<&MasterKey>,
        salt: Option<[u8; 16]>,
    ) -> Result<ProcessedData> {
        let start_time = std::time::Instant::now();
        let file_path = file_path.as_ref().to_path_buf();

        let original_data = std::fs::read(&file_path)?;
        let original_size = original_data.len() as u64;

        // Compression stage.
        let (compressed_data, compression_info) =
            if self.config.compression_type != CompressionType::None {
                let compressed = self.compression_engine.compress(&original_data)?;
                (compressed.data.clone(), Some(compressed))
            } else {
                (original_data, None)
            };

        let compressed_size = compressed_data.len() as u64;

        // Encryption stage: runs only when the engine, key, and salt are all present.
        let (final_data, encryption_info) = if let (Some(engine), Some(key), Some(s)) =
            (&self.encryption_engine, master_key, salt)
        {
            let encrypted = engine.encrypt(&compressed_data, key, s)?;
            (encrypted.to_bytes(), Some(encrypted))
        } else {
            (compressed_data, None)
        };

        let final_size = final_data.len() as u64;
        let processing_time = start_time.elapsed().as_millis() as u64;

        let metadata = ProcessingMetadata {
            original_size,
            compressed_size,
            final_size,
            processing_time_ms: processing_time,
            compression_ratio: if original_size > 0 {
                (original_size.saturating_sub(compressed_size) as f64 / original_size as f64)
                    * 100.0
            } else {
                0.0
            },
            memory_usage: original_size + compressed_size + final_size,
        };

        Ok(ProcessedData {
            original_path: file_path,
            data: final_data,
            compression_info,
            encryption_info,
            metadata,
        })
    }

    /// Restores original bytes: decrypt if needed, then decompress.
    pub fn restore_data(
        &self,
        processed_data: &ProcessedData,
        master_key: Option<&MasterKey>,
    ) -> Result<Vec<u8>> {
        let mut data = processed_data.data.clone();

        if let Some(encryption_info) = &processed_data.encryption_info {
            if let (Some(engine), Some(key)) = (&self.encryption_engine, master_key) {
                data = engine.decrypt(encryption_info, key)?;
            } else {
                return Err(BackupError::EncryptionError(
                    "A master key is required for decryption".to_string(),
                ));
            }
        }

        if let Some(compression_info) = &processed_data.compression_info {
            data = self.compression_engine.decompress(compression_info)?;
        }

        Ok(data)
    }

    /// Streams data from `reader` to `writer`, applying compression and
    /// encryption according to the pipeline configuration.
    pub fn process_stream<R: Read, W: Write>(
        &self,
        reader: R,
        writer: W,
        master_key: Option<&MasterKey>,
        salt: Option<[u8; 16]>,
    ) -> Result<ProcessingMetadata> {
        let start_time = std::time::Instant::now();

        let (original_size, compressed_size, final_size) =
            if let (Some(engine), Some(key), Some(s)) = (&self.encryption_engine, master_key, salt)
            {
                self.compress_and_encrypt_stream(reader, writer, engine, key, s)?
            } else if self.config.compression_type != CompressionType::None {
                self.compress_stream_only(reader, writer)?
            } else {
                self.copy_stream(reader, writer)?
            };

        let processing_time = start_time.elapsed().as_millis() as u64;

        Ok(ProcessingMetadata {
            original_size,
            compressed_size,
            final_size,
            processing_time_ms: processing_time,
            compression_ratio: if original_size > 0 {
                (original_size.saturating_sub(compressed_size) as f64 / original_size as f64)
                    * 100.0
            } else {
                0.0
            },
            // Streaming keeps memory roughly constant; report a fixed estimate.
            memory_usage: 2 * 1024 * 1024,
        })
    }

    /// Compresses the stream into memory, then encrypts it chunk by chunk.
    ///
    /// Output layout: `nonce (12 bytes) || salt (16 bytes)` followed by
    /// repeated `chunk_len (u32 LE) || chunk_ciphertext` records.
    fn compress_and_encrypt_stream<R: Read, W: Write>(
        &self,
        reader: R,
        writer: W,
        encryption_engine: &EncryptionEngine,
        master_key: &MasterKey,
        salt: [u8; 16],
    ) -> Result<(u64, u64, u64)> {
        use std::io::Cursor;

        let mut compressed_buffer = Vec::new();

        let compressed_data = self
            .compression_engine
            .compress_stream(reader, &mut compressed_buffer)?;

        let original_size = compressed_data.original_size;
        let compressed_size = compressed_data.compressed_size;

        let compressed_reader = Cursor::new(compressed_buffer);
        let mut encrypted_buffer = Vec::new();

        // Header: base nonce followed by the key-derivation salt.
        let nonce_bytes = crate::crypto::encryption::EncryptionEngine::generate_nonce_internal();
        encrypted_buffer.extend_from_slice(&nonce_bytes);
        encrypted_buffer.extend_from_slice(&salt);

        #[allow(deprecated)]
        let key = aes_gcm::Key::<aes_gcm::Aes256Gcm>::from_slice(master_key.as_bytes());
        let cipher = aes_gcm::Aes256Gcm::new(key);

        let chunk_size = encryption_engine.get_chunk_size();
        let mut buffer = vec![0u8; chunk_size];
        let mut compressed_reader = compressed_reader;
        let mut chunk_index = 0u64;

        loop {
            let bytes_read = compressed_reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            // Derive a unique per-chunk nonce by embedding the chunk index
            // into bytes 4..12 of the base nonce.
            let mut chunk_nonce = nonce_bytes;
            chunk_nonce[4..12].copy_from_slice(&chunk_index.to_le_bytes());

            #[allow(deprecated)]
            let nonce = aes_gcm::Nonce::from_slice(&chunk_nonce);
            let chunk_ciphertext = cipher
                .encrypt(nonce, &buffer[..bytes_read])
                .map_err(|e| BackupError::EncryptionError(format!("Chunk encryption error: {e}")))?;

            // Length-prefix each chunk so it can be framed on decryption.
            encrypted_buffer.extend_from_slice(&(chunk_ciphertext.len() as u32).to_le_bytes());
            encrypted_buffer.extend_from_slice(&chunk_ciphertext);

            chunk_index += 1;
        }

        let final_size = encrypted_buffer.len() as u64;

        let mut writer = writer;
        writer.write_all(&encrypted_buffer)?;

        Ok((original_size, compressed_size, final_size))
    }

    fn compress_stream_only<R: Read, W: Write>(
        &self,
        reader: R,
        writer: W,
    ) -> Result<(u64, u64, u64)> {
        let compressed_data = self.compression_engine.compress_stream(reader, writer)?;

        let original_size = compressed_data.original_size;
        let compressed_size = compressed_data.compressed_size;

        Ok((original_size, compressed_size, compressed_size))
    }

    fn copy_stream<R: Read, W: Write>(
        &self,
        mut reader: R,
        mut writer: W,
    ) -> Result<(u64, u64, u64)> {
        let mut total_size = 0u64;
        let mut buffer = vec![0u8; 1024 * 1024];

        loop {
            let bytes_read = reader.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }

            writer.write_all(&buffer[..bytes_read])?;
            total_size += bytes_read as u64;
        }

        Ok((total_size, total_size, total_size))
    }

    /// Processes multiple files in parallel, using the dedicated thread pool
    /// when available and the global rayon pool otherwise. Results are
    /// returned per file, so one failure does not abort the batch.
    pub fn process_files_parallel<P: AsRef<Path> + Send + Sync>(
        &self,
        files: &[P],
        master_key: Option<&MasterKey>,
        salt: Option<[u8; 16]>,
    ) -> Vec<Result<ProcessedData>> {
        if files.is_empty() {
            return Vec::new();
        }

        if let Some(pool) = &self.thread_pool {
            pool.install(|| self.process_files_parallel_internal(files, master_key, salt))
        } else {
            self.process_files_parallel_internal(files, master_key, salt)
        }
    }

    fn process_files_parallel_internal<P: AsRef<Path> + Send + Sync>(
        &self,
        files: &[P],
        master_key: Option<&MasterKey>,
        salt: Option<[u8; 16]>,
    ) -> Vec<Result<ProcessedData>> {
        let batch_size = self.config.performance.batch_size;

        // Split the file list into batches, then process each batch in parallel.
        files
            .par_chunks(batch_size)
            .flat_map(|batch| {
                batch
                    .par_iter()
                    .map(|file| self.process_file(file, master_key, salt))
                    .collect::<Vec<_>>()
            })
            .collect()
    }

    /// Like [`process_files_parallel`](Self::process_files_parallel), but
    /// invokes `progress_callback(index + 1, total)` as each file finishes.
    pub fn process_files_with_progress<P, F>(
        &self,
        files: &[P],
        master_key: Option<&MasterKey>,
        salt: Option<[u8; 16]>,
        progress_callback: F,
    ) -> Vec<Result<ProcessedData>>
    where
        P: AsRef<Path> + Send + Sync,
        F: Fn(usize, usize) + Send + Sync,
    {
        if files.is_empty() {
            return Vec::new();
        }

        let total = files.len();
        let progress_callback = Arc::new(progress_callback);

        files
            .par_iter()
            .enumerate()
            .map(|(idx, file)| {
                let result = self.process_file(file, master_key, salt);
                progress_callback(idx + 1, total);
                result
            })
            .collect()
    }
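
    // Example (sketch): wiring up a progress callback. `current` is the
    // 1-based index passed by the method above; with parallel execution the
    // calls may arrive out of order.
    //
    //     let results = pipeline.process_files_with_progress(&files, None, None,
    //         |current, total| eprintln!("{current}/{total}"));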

    /// Returns the pipeline configuration.
    #[must_use]
    pub fn config(&self) -> &PipelineConfig {
        &self.config
    }

    /// Returns a snapshot of the performance-related settings.
    #[must_use]
    pub fn get_performance_stats(&self) -> PerformanceStats {
        PerformanceStats {
            available_threads: self.config.performance.parallel_threads,
            buffer_size: self.config.performance.buffer_size,
            memory_limit: self.config.performance.memory_limit,
            encryption_enabled: self.encryption_engine.is_some(),
            compression_type: self.config.compression_type,
        }
    }

    /// Returns `true` if a dedicated thread pool was created successfully.
    #[must_use]
    pub fn is_parallel_ready(&self) -> bool {
        self.thread_pool.is_some()
    }

    /// Returns the thread count of the dedicated pool, or 1 without one.
    #[must_use]
    pub fn current_parallelism(&self) -> usize {
        self.thread_pool
            .as_ref()
            .map(rayon::ThreadPool::current_num_threads)
            .unwrap_or(1)
    }
}

impl Default for ProcessingPipeline {
    fn default() -> Self {
        Self::new(PipelineConfig::default())
    }
}
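
// Example (sketch): a full round trip with the default (compression-only)
// pipeline; "data.bin" is a placeholder path.
//
//     let pipeline = ProcessingPipeline::default();
//     let processed = pipeline.process_file("data.bin", None, None)?;
//     let restored = pipeline.restore_data(&processed, None)?;
//     assert_eq!(restored, std::fs::read("data.bin")?);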

/// Snapshot of pipeline performance settings.
#[derive(Debug, Clone)]
pub struct PerformanceStats {
    pub available_threads: usize,
    pub buffer_size: usize,
    pub memory_limit: usize,
    pub encryption_enabled: bool,
    pub compression_type: CompressionType,
}

/// Thin wrapper around the `num_cpus` crate so the call site can be swapped
/// in one place.
mod num_cpus {
    #[must_use]
    pub fn get() -> usize {
        ::num_cpus::get()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::MasterKey;
    use std::io::Cursor;

    #[test]
    fn test_pipeline_without_encryption() {
        let config = PipelineConfig::default()
            .with_compression(CompressionType::Zstd, CompressionConfig::zstd_default());
        let pipeline = ProcessingPipeline::new(config);

        let test_data = b"Hello, World! This is a test message for compression.".repeat(100);
        let temp_file = std::env::temp_dir().join("test_pipeline.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline.process_file(&temp_file, None, None).unwrap();

        assert!(processed.metadata.compressed_size < processed.metadata.original_size);
        assert!(processed.metadata.compression_ratio > 0.0);
        assert!(processed.compression_info.is_some());
        assert!(processed.encryption_info.is_none());

        let restored = pipeline.restore_data(&processed, None).unwrap();
        assert_eq!(test_data, restored);

        let _ = std::fs::remove_file(&temp_file);
    }

    #[test]
    fn test_pipeline_with_encryption() {
        let config = PipelineConfig::default()
            .with_encryption(EncryptionConfig::default())
            .with_compression(CompressionType::Zstd, CompressionConfig::zstd_default());
        let pipeline = ProcessingPipeline::new(config);

        let master_key = MasterKey::generate();
        let salt = crate::crypto::key_management::KeyDerivation::generate_salt();
        let test_data = b"Secret message for encryption and compression test.".repeat(50);
        let temp_file = std::env::temp_dir().join("test_encrypted.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline
            .process_file(&temp_file, Some(&master_key), Some(salt))
            .unwrap();

        assert!(processed.compression_info.is_some());
        assert!(processed.encryption_info.is_some());
        assert!(processed.metadata.final_size > 0);

        let restored = pipeline
            .restore_data(&processed, Some(&master_key))
            .unwrap();
        assert_eq!(test_data, restored);

        let wrong_key = MasterKey::generate();
        assert!(pipeline.restore_data(&processed, Some(&wrong_key)).is_err());

        let _ = std::fs::remove_file(&temp_file);
    }

    #[test]
    fn test_stream_processing() {
        let config = PipelineConfig::default().fast();
        let pipeline = ProcessingPipeline::new(config);

        let test_data = b"Stream processing test data. ".repeat(1000);
        let reader = Cursor::new(&test_data);
        let mut output = Vec::new();

        let metadata = pipeline
            .process_stream(reader, &mut output, None, None)
            .unwrap();

        assert_eq!(metadata.original_size, test_data.len() as u64);
        assert!(!output.is_empty());
    }

    #[test]
    fn test_performance_config() {
        let fast_config = PipelineConfig::default().fast();
        let best_config = PipelineConfig::default().best_compression();

        assert!(fast_config.compression.buffer_size >= best_config.compression.buffer_size);

        let pipeline = ProcessingPipeline::new(fast_config);
        let stats = pipeline.get_performance_stats();

        assert!(stats.available_threads >= 1);
        assert!(stats.buffer_size > 0);
        assert_eq!(stats.compression_type, CompressionType::Zstd);
    }

    #[test]
    fn test_optimal_parallelism() {
        let parallelism = optimal_parallelism();
        let cpus = num_cpus::get();

        assert!(parallelism >= 1);
        assert!(parallelism <= 32);
        assert!(parallelism <= cpus);
    }

    #[test]
    fn test_dynamic_parallelism() {
        let parallelism = dynamic_parallelism(2, 1024 * 1024);
        assert!(parallelism <= 2);

        let parallelism = dynamic_parallelism(100, 512 * 1024);
        assert!(parallelism >= 1);

        let parallelism = dynamic_parallelism(100, 200 * 1024 * 1024);
        assert!(parallelism >= optimal_parallelism());
    }

    #[test]
    fn test_parallel_processing() {
        let config = PipelineConfig::default().fast();
        let pipeline = ProcessingPipeline::new(config);

        let temp_dir = std::env::temp_dir();
        let test_files: Vec<PathBuf> = (0..10)
            .map(|i| {
                let path = temp_dir.join(format!("test_parallel_{i}.txt"));
                let data = format!("Test data for file {i}").repeat(100);
                std::fs::write(&path, data).unwrap();
                path
            })
            .collect();

        let results = pipeline.process_files_parallel(&test_files, None, None);

        assert_eq!(results.len(), test_files.len());
        assert!(results.iter().all(std::result::Result::is_ok));

        assert!(pipeline.is_parallel_ready());
        assert!(pipeline.current_parallelism() >= 1);

        for file in test_files {
            let _ = std::fs::remove_file(file);
        }
    }

    #[test]
    fn test_parallel_with_progress() {
        let config = PipelineConfig::default().fast();
        let pipeline = ProcessingPipeline::new(config);

        let temp_dir = std::env::temp_dir();
        let test_files: Vec<PathBuf> = (0..5)
            .map(|i| {
                let path = temp_dir.join(format!("test_progress_{i}.txt"));
                let data = format!("Progress test {i}").repeat(50);
                std::fs::write(&path, data).unwrap();
                path
            })
            .collect();

        let progress_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let progress_count_clone = Arc::clone(&progress_count);

        let results =
            pipeline.process_files_with_progress(&test_files, None, None, move |current, total| {
                progress_count_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                assert!(current <= total);
            });

        assert_eq!(results.len(), test_files.len());
        assert!(results.iter().all(std::result::Result::is_ok));

        assert_eq!(
            progress_count.load(std::sync::atomic::Ordering::SeqCst),
            test_files.len()
        );

        for file in test_files {
            let _ = std::fs::remove_file(file);
        }
    }

    #[test]
    fn test_custom_parallelism() {
        let perf_config = PerformanceConfig::default()
            .with_parallelism(4)
            .with_batch_size(8);

        assert_eq!(perf_config.parallel_threads, 4);
        assert_eq!(perf_config.batch_size, 8);

        let pipeline_config = PipelineConfig {
            performance: perf_config,
            ..Default::default()
        };

        let pipeline = ProcessingPipeline::new(pipeline_config);
        assert_eq!(pipeline.current_parallelism(), 4);
    }

    #[test]
    fn test_pipeline_config_builder() {
        let default_config = PipelineConfig::default();
        assert!(default_config.encryption.is_none());
        assert_eq!(default_config.compression_type, CompressionType::Zstd);

        let enc_config = PipelineConfig::default().with_encryption(EncryptionConfig::default());
        assert!(enc_config.encryption.is_some());

        let comp_config = PipelineConfig::default()
            .with_compression(CompressionType::Gzip, CompressionConfig::gzip_default());
        assert_eq!(comp_config.compression_type, CompressionType::Gzip);

        let fast_config = PipelineConfig::default().fast();
        assert_eq!(fast_config.performance.parallel_threads, num_cpus::get());
        assert_eq!(fast_config.performance.buffer_size, 2 * 1024 * 1024);

        let best_config = PipelineConfig::default().best_compression();
        assert_eq!(
            best_config.performance.parallel_threads,
            (num_cpus::get() / 2).max(1)
        );
    }

    #[test]
    fn test_performance_config_presets() {
        let fast = PerformanceConfig::fast();
        assert_eq!(fast.parallel_threads, num_cpus::get());
        assert_eq!(fast.buffer_size, 2 * 1024 * 1024);
        assert_eq!(fast.memory_limit, 1024 * 1024 * 1024);
        assert_eq!(fast.batch_size, 64);

        let quality = PerformanceConfig::quality();
        assert_eq!(quality.parallel_threads, (num_cpus::get() / 2).max(1));
        assert_eq!(quality.buffer_size, 512 * 1024);
        assert_eq!(quality.memory_limit, 256 * 1024 * 1024);
        assert_eq!(quality.batch_size, 16);

        let default = PerformanceConfig::default();
        assert_eq!(default.parallel_threads, optimal_parallelism());
        assert_eq!(default.buffer_size, 1024 * 1024);
        assert_eq!(default.memory_limit, 512 * 1024 * 1024);
        assert_eq!(default.batch_size, 32);
    }

    #[test]
    fn test_performance_config_custom_values() {
        let config = PerformanceConfig::default().with_parallelism(0);
        assert_eq!(config.parallel_threads, 1);

        let config = PerformanceConfig::default().with_batch_size(0);
        assert_eq!(config.batch_size, 1);

        let config = PerformanceConfig::default()
            .with_parallelism(100)
            .with_batch_size(1000);
        assert_eq!(config.parallel_threads, 100);
        assert_eq!(config.batch_size, 1000);
    }

    #[test]
    fn test_compression_type_variations() {
        let zstd_config = PipelineConfig::default()
            .with_compression(CompressionType::Zstd, CompressionConfig::zstd_default());
        let pipeline = ProcessingPipeline::new(zstd_config);

        let test_data = b"Zstd compression test. ".repeat(100);
        let temp_file = std::env::temp_dir().join("test_zstd.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline.process_file(&temp_file, None, None).unwrap();
        assert!(processed.compression_info.is_some());
        assert_eq!(processed.metadata.original_size, test_data.len() as u64);

        std::fs::remove_file(&temp_file).ok();

        let gzip_config = PipelineConfig::default()
            .with_compression(CompressionType::Gzip, CompressionConfig::gzip_default());
        let pipeline = ProcessingPipeline::new(gzip_config);

        let test_data = b"Gzip compression test. ".repeat(100);
        let temp_file = std::env::temp_dir().join("test_gzip.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline.process_file(&temp_file, None, None).unwrap();
        assert!(processed.compression_info.is_some());
        assert_eq!(processed.metadata.original_size, test_data.len() as u64);

        std::fs::remove_file(&temp_file).ok();
    }

    #[test]
    fn test_empty_file_processing() {
        let config = PipelineConfig::default();
        let pipeline = ProcessingPipeline::new(config);

        let temp_file = std::env::temp_dir().join("test_empty.txt");
        std::fs::write(&temp_file, b"").unwrap();

        let processed = pipeline.process_file(&temp_file, None, None).unwrap();
        assert_eq!(processed.metadata.original_size, 0);
        assert_eq!(
            processed.metadata.final_size,
            processed.metadata.compressed_size
        );

        let restored = pipeline.restore_data(&processed, None).unwrap();
        assert!(restored.is_empty());

        std::fs::remove_file(&temp_file).ok();
    }

    #[test]
    fn test_large_file_processing() {
        let config = PipelineConfig::default().fast();
        let pipeline = ProcessingPipeline::new(config);

        let test_data = vec![b'A'; 10 * 1024 * 1024];
        let temp_file = std::env::temp_dir().join("test_large.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline.process_file(&temp_file, None, None).unwrap();
        assert_eq!(processed.metadata.original_size, test_data.len() as u64);
        assert!(processed.metadata.compressed_size < processed.metadata.original_size);

        assert!(processed.metadata.compression_ratio > 0.9);

        std::fs::remove_file(&temp_file).ok();
    }

    #[test]
    fn test_encryption_config_without_key_skips_encryption() {
        let config = PipelineConfig::default().with_encryption(EncryptionConfig::default());
        let pipeline = ProcessingPipeline::new(config);

        let test_data = b"Test data without key".repeat(10);
        let temp_file = std::env::temp_dir().join("test_no_key.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline.process_file(&temp_file, None, None).unwrap();
        assert!(processed.encryption_info.is_none());
        assert!(processed.compression_info.is_some());

        std::fs::remove_file(&temp_file).ok();
    }

    #[test]
    fn test_restore_with_wrong_key() {
        let config = PipelineConfig::default().with_encryption(EncryptionConfig::default());
        let pipeline = ProcessingPipeline::new(config);

        let master_key = MasterKey::generate();
        let salt = crate::crypto::key_management::KeyDerivation::generate_salt();
        let test_data = b"Secret data for key test".repeat(10);
        let temp_file = std::env::temp_dir().join("test_wrong_key.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline
            .process_file(&temp_file, Some(&master_key), Some(salt))
            .unwrap();

        let wrong_key = MasterKey::generate();
        let result = pipeline.restore_data(&processed, Some(&wrong_key));
        assert!(result.is_err());

        std::fs::remove_file(&temp_file).ok();
    }

    #[test]
    fn test_restore_without_key() {
        let config = PipelineConfig::default().with_encryption(EncryptionConfig::default());
        let pipeline = ProcessingPipeline::new(config);

        let master_key = MasterKey::generate();
        let salt = crate::crypto::key_management::KeyDerivation::generate_salt();
        let test_data = b"Encrypted data".repeat(10);
        let temp_file = std::env::temp_dir().join("test_restore_no_key.txt");
        std::fs::write(&temp_file, &test_data).unwrap();

        let processed = pipeline
            .process_file(&temp_file, Some(&master_key), Some(salt))
            .unwrap();

        let result = pipeline.restore_data(&processed, None);
        assert!(result.is_err());

        std::fs::remove_file(&temp_file).ok();
    }

    #[test]
    fn test_parallel_error_handling() {
        let config = PipelineConfig::default().fast();
        let pipeline = ProcessingPipeline::new(config);

        let nonexistent_files: Vec<PathBuf> = (0..5)
            .map(|i| PathBuf::from(format!("/tmp/nonexistent_file_{i}.txt")))
            .collect();

        let results = pipeline.process_files_parallel(&nonexistent_files, None, None);

        assert_eq!(results.len(), nonexistent_files.len());
        assert!(results.iter().all(std::result::Result::is_err));
    }

    #[test]
    fn test_parallel_mixed_results() {
        let config = PipelineConfig::default().fast();
        let pipeline = ProcessingPipeline::new(config);

        let temp_dir = std::env::temp_dir();
        let mut files = Vec::new();

        for i in 0..3 {
            let path = temp_dir.join(format!("test_mixed_{i}.txt"));
            std::fs::write(&path, format!("Test data {i}").repeat(10)).unwrap();
            files.push(path);
        }

        files.push(PathBuf::from("/tmp/nonexistent_1.txt"));
        files.push(PathBuf::from("/tmp/nonexistent_2.txt"));

        let results = pipeline.process_files_parallel(&files, None, None);

        assert_eq!(results.len(), files.len());

        let success_count = results.iter().filter(|r| r.is_ok()).count();
        let error_count = results.iter().filter(|r| r.is_err()).count();

        assert_eq!(success_count, 3);
        assert_eq!(error_count, 2);

        for i in 0..3 {
            let path = temp_dir.join(format!("test_mixed_{i}.txt"));
            std::fs::remove_file(path).ok();
        }
    }

    #[test]
    fn test_stream_empty_data() {
        let config = PipelineConfig::default();
        let pipeline = ProcessingPipeline::new(config);

        let empty_data: &[u8] = &[];
        let reader = Cursor::new(empty_data);
        let mut output = Vec::new();

        let metadata = pipeline
            .process_stream(reader, &mut output, None, None)
            .unwrap();

        assert_eq!(metadata.original_size, 0);
        assert_eq!(metadata.final_size, metadata.compressed_size);
    }

    #[test]
    fn test_stream_large_data() {
        let config = PipelineConfig::default().fast();
        let pipeline = ProcessingPipeline::new(config);

        let large_data = vec![b'B'; 5 * 1024 * 1024];
        let reader = Cursor::new(&large_data);
        let mut output = Vec::new();

        let metadata = pipeline
            .process_stream(reader, &mut output, None, None)
            .unwrap();

        assert_eq!(metadata.original_size, large_data.len() as u64);
        assert!(metadata.compressed_size < metadata.original_size);
        assert!(!output.is_empty());
    }

    #[test]
    fn test_performance_stats() {
        let config = PipelineConfig::default()
            .fast()
            .with_compression(CompressionType::Gzip, CompressionConfig::gzip_default());
        let pipeline = ProcessingPipeline::new(config);

        let stats = pipeline.get_performance_stats();

        assert_eq!(stats.available_threads, num_cpus::get());
        assert_eq!(stats.buffer_size, 2 * 1024 * 1024);
        assert_eq!(stats.compression_type, CompressionType::Gzip);
        assert!(!stats.encryption_enabled);
    }

    #[test]
    fn test_batch_processing() {
        let config = PerformanceConfig::default()
            .with_batch_size(3)
            .with_parallelism(2);

        assert_eq!(config.batch_size, 3);
        assert_eq!(config.parallel_threads, 2);

        let pipeline_config = PipelineConfig {
            performance: config,
            ..Default::default()
        };

        let pipeline = ProcessingPipeline::new(pipeline_config);

        let temp_dir = std::env::temp_dir();
        let test_files: Vec<PathBuf> = (0..10)
            .map(|i| {
                let path = temp_dir.join(format!("test_batch_{i}.txt"));
                std::fs::write(&path, format!("Batch test {i}").repeat(10)).unwrap();
                path
            })
            .collect();

        let results = pipeline.process_files_parallel(&test_files, None, None);

        assert_eq!(results.len(), 10);
        assert!(results.iter().all(std::result::Result::is_ok));

        for file in test_files {
            std::fs::remove_file(file).ok();
        }
    }

    #[test]
    fn test_compression_ratio_calculation() {
        let config = PipelineConfig::default();
        let pipeline = ProcessingPipeline::new(config);

        let compressible_data = b"A".repeat(10000);
        let temp_file = std::env::temp_dir().join("test_ratio.txt");
        std::fs::write(&temp_file, &compressible_data).unwrap();

        let processed = pipeline.process_file(&temp_file, None, None).unwrap();

        assert!(processed.compression_info.is_some());
        assert!(processed.metadata.compressed_size < processed.metadata.original_size);

        assert!(processed.metadata.compression_ratio >= 0.0);

        assert!(processed.metadata.compressed_size > 0);

        std::fs::remove_file(&temp_file).ok();
    }
}