#![allow(dead_code, unused_imports, unused_variables)]

use async_trait::async_trait;
use byte_unit::Byte;
use futures::future;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

use adaptive_pipeline_domain::aggregates::PipelineAggregate;
use adaptive_pipeline_domain::entities::pipeline_stage::StageType;
use adaptive_pipeline_domain::entities::{
    Pipeline, PipelineStage, ProcessingContext, ProcessingMetrics, SecurityContext,
};
use adaptive_pipeline_domain::repositories::stage_executor::ResourceRequirements;
use adaptive_pipeline_domain::repositories::{PipelineRepository, StageExecutor};
use adaptive_pipeline_domain::services::file_io_service::{FileIOService, ReadOptions};
use adaptive_pipeline_domain::services::file_processor_service::ChunkProcessor;
use adaptive_pipeline_domain::services::{
    CompressionService, EncryptionService, ExecutionRecord, ExecutionState, ExecutionStatus, KeyMaterial,
    PipelineRequirements, PipelineService,
};
use adaptive_pipeline_domain::value_objects::{ChunkFormat, FileChunk, PipelineId, WorkerCount};
use adaptive_pipeline_domain::PipelineError;

use crate::infrastructure::services::binary_format::{BinaryFormatService, BinaryFormatWriter};
use crate::infrastructure::services::progress_indicator::ProgressIndicatorService;

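// Channel-based concurrency model used by `ConcurrentPipeline::process_file`:
//
//   reader_task --(bounded mpsc ChunkMessage channel)--> CPU worker pool --> BinaryFormatWriter
//
// The reader streams chunks from disk and gets backpressure from the bounded
// channel; each worker acquires a CPU permit from the global resource manager,
// runs the pipeline stages on its chunk, and writes the result at the chunk's
// position in the .adapipe output file. A cancellation token from the shutdown
// coordinator is checked by the reader and the workers so the whole pipeline
// can stop promptly.
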
/// Message sent from the reader task to the CPU workers.
#[derive(Debug)]
struct ChunkMessage {
    /// Zero-based index of the chunk within the input file.
    chunk_index: usize,

    /// Raw chunk bytes as read from disk.
    data: Vec<u8>,

    /// True if this is the last chunk of the file.
    is_final: bool,

    /// Domain chunk value object that is run through the pipeline stages.
    file_chunk: FileChunk,

    /// When the message was enqueued; used to measure queue wait time.
    enqueued_at: std::time::Instant,
}

/// Result of processing a single chunk (not used by the channel-based path shown here).
#[derive(Debug)]
struct ProcessedChunkMessage {
    /// Zero-based index of the chunk within the input file.
    chunk_index: usize,

    /// Chunk bytes after all pipeline stages have been applied.
    processed_data: Vec<u8>,

    /// True if this is the last chunk of the file.
    is_final: bool,
}

/// Statistics reported by the reader task on completion.
#[derive(Debug)]
struct ReaderStats {
    chunks_read: usize,
    bytes_read: u64,
}

/// Statistics reported by each CPU worker on completion.
#[derive(Debug)]
struct WorkerStats {
    worker_id: usize,
    chunks_processed: usize,
}

/// Statistics reported by the writer on completion.
#[derive(Debug)]
struct WriterStats {
    chunks_written: usize,
    bytes_written: u64,
}

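/// Reads the input file in chunks and feeds them to the CPU workers through a
/// bounded channel, updating the CPU queue-depth metric as it goes. Once every
/// chunk has been sent (or cancellation wins), the sender is dropped so the
/// workers observe a closed channel and drain.
///
/// A minimal spawn sketch, mirroring how `process_file` and the tests below
/// wire it up (the `TokioFileIO` adapter and the channel depth of 4 are just
/// illustrative choices, not requirements):
///
/// ```rust,ignore
/// let (tx_cpu, rx_cpu) = tokio::sync::mpsc::channel::<ChunkMessage>(4);
/// let reader = tokio::spawn(reader_task(
///     input_path.to_path_buf(),
///     chunk_size,
///     tx_cpu,
///     file_io_service.clone(),
///     4, // must match the channel capacity above
///     cancel_token.clone(),
/// ));
/// let stats = reader.await??;
/// ```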
async fn reader_task(
    input_path: PathBuf,
    chunk_size: usize,
    tx_cpu: tokio::sync::mpsc::Sender<ChunkMessage>,
    file_io_service: Arc<dyn FileIOService>,
    channel_capacity: usize,
    cancel_token: adaptive_pipeline_bootstrap::shutdown::CancellationToken,
) -> Result<ReaderStats, PipelineError> {
    use crate::infrastructure::metrics::CONCURRENCY_METRICS;

    // Bail out early if shutdown was already requested.
    if cancel_token.is_cancelled() {
        return Err(PipelineError::cancelled());
    }

    let read_options = ReadOptions {
        chunk_size: Some(chunk_size),
        use_memory_mapping: false,
        calculate_checksums: false,
        ..Default::default()
    };

    let read_result = file_io_service
        .read_file_chunks(&input_path, read_options)
        .await
        .map_err(|e| PipelineError::IoError(format!("Failed to read file chunks: {}", e)))?;

    let total_chunks = read_result.chunks.len();
    let mut bytes_read = 0u64;

    for (index, file_chunk) in read_result.chunks.into_iter().enumerate() {
        let chunk_data = file_chunk.data().to_vec();
        let chunk_size_bytes = chunk_data.len() as u64;
        bytes_read += chunk_size_bytes;

        let message = ChunkMessage {
            chunk_index: index,
            data: chunk_data,
            is_final: index == total_chunks - 1,
            file_chunk,
            enqueued_at: std::time::Instant::now(),
        };

        // Either the send completes or cancellation wins the race.
        tokio::select! {
            _ = cancel_token.cancelled() => {
                return Err(PipelineError::cancelled_with_msg("reader cancelled during send"));
            }
            send_result = tx_cpu.send(message) => {
                send_result.map_err(|_e| PipelineError::io_error("CPU worker channel closed unexpectedly"))?;
            }
        }

        // Report how full the CPU channel is (capacity() returns free slots).
        let remaining_capacity = tx_cpu.capacity();
        let current_depth = channel_capacity.saturating_sub(remaining_capacity);
        CONCURRENCY_METRICS.update_cpu_queue_depth(current_depth);
    }

    // Dropping the sender closes the channel so the workers can drain and exit.
    drop(tx_cpu);

    Ok(ReaderStats {
        chunks_read: total_chunks,
        bytes_read,
    })
}

/// Shared, read-only context handed to each CPU worker.
struct CpuWorkerContext {
    writer: Arc<dyn BinaryFormatWriter>,
    pipeline: Arc<Pipeline>,
    stage_executor: Arc<dyn StageExecutor>,
    input_path: PathBuf,
    output_path: PathBuf,
    input_size: u64,
    security_context: SecurityContext,
}

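/// Standalone CPU worker (kept for reference; `process_file` currently inlines
/// an equivalent loop so it can also watch the cancellation token). For each
/// received chunk it acquires a CPU permit, runs every pipeline stage, splits
/// off the 12-byte nonce when the stage metadata marks the chunk as encrypted,
/// and writes the chunk at its original position in the output file.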
#[allow(dead_code)]
async fn cpu_worker_task(
    worker_id: usize,
    mut rx_cpu: tokio::sync::mpsc::Receiver<ChunkMessage>,
    ctx: CpuWorkerContext,
) -> Result<WorkerStats, PipelineError> {
    use crate::infrastructure::metrics::CONCURRENCY_METRICS;
    use crate::infrastructure::runtime::RESOURCE_MANAGER;

    let mut chunks_processed = 0;

    while let Some(chunk_msg) = rx_cpu.recv().await {
        // Acquire a CPU permit and record how long we waited for it.
        let cpu_wait_start = std::time::Instant::now();
        let _cpu_permit = RESOURCE_MANAGER
            .acquire_cpu()
            .await
            .map_err(|e| PipelineError::resource_exhausted(format!("Failed to acquire CPU token: {}", e)))?;
        let cpu_wait_duration = cpu_wait_start.elapsed();

        CONCURRENCY_METRICS.record_cpu_wait(cpu_wait_duration);
        CONCURRENCY_METRICS.worker_started();

        // Each chunk gets its own processing context so workers never share
        // mutable state.
        let mut local_context = ProcessingContext::new(ctx.input_size, ctx.security_context.clone());

        let mut file_chunk = chunk_msg.file_chunk;

        for stage in ctx.pipeline.stages() {
            file_chunk = ctx
                .stage_executor
                .execute(stage, file_chunk, &mut local_context)
                .await
                .map_err(|e| PipelineError::processing_failed(format!("Stage execution failed: {}", e)))?;
        }

        // Encrypted chunks carry a 12-byte nonce prefix; split it out so the
        // writer stores nonce and payload separately. Unencrypted chunks use
        // an all-zero nonce.
        let (nonce, chunk_data) = if file_chunk.data().len() >= 12 {
            let is_encrypted = local_context
                .metadata()
                .get("encrypted")
                .map(|v| v == "true")
                .unwrap_or(false);

            if is_encrypted {
                let mut nonce_array = [0u8; 12];
                nonce_array.copy_from_slice(&file_chunk.data()[..12]);
                (nonce_array, file_chunk.data()[12..].to_vec())
            } else {
                ([0u8; 12], file_chunk.data().to_vec())
            }
        } else {
            ([0u8; 12], file_chunk.data().to_vec())
        };

        let chunk_format = ChunkFormat::new(nonce, chunk_data);

        ctx.writer
            .write_chunk_at_position(chunk_format, chunk_msg.chunk_index as u64)
            .await?;

        CONCURRENCY_METRICS.worker_completed();
        chunks_processed += 1;
    }

    Ok(WorkerStats {
        worker_id,
        chunks_processed,
    })
}

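/// Channel-based implementation of the domain `PipelineService`. It loads a
/// pipeline definition from the repository, streams the input file through a
/// reader task and a pool of CPU workers, and writes an .adapipe formatted
/// output file.
///
/// A construction sketch, assuming the concrete services have already been
/// built elsewhere (the variable names are illustrative, not part of this API):
///
/// ```rust,ignore
/// let service = ConcurrentPipeline::new(
///     compression_service,   // Arc<dyn CompressionService>
///     encryption_service,    // Arc<dyn EncryptionService>
///     file_io_service,       // Arc<dyn FileIOService>
///     pipeline_repository,   // Arc<dyn PipelineRepository>
///     stage_executor,        // Arc<dyn StageExecutor>
///     binary_format_service, // Arc<dyn BinaryFormatService>
/// );
/// let metrics = service.process_file(&input_path, &output_path, ctx).await?;
/// ```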
pub struct ConcurrentPipeline {
    compression_service: Arc<dyn CompressionService>,
    encryption_service: Arc<dyn EncryptionService>,
    file_io_service: Arc<dyn FileIOService>,
    pipeline_repository: Arc<dyn PipelineRepository>,
    stage_executor: Arc<dyn StageExecutor>,
    binary_format_service: Arc<dyn BinaryFormatService>,
    active_pipelines: Arc<RwLock<std::collections::HashMap<String, PipelineAggregate>>>,
}

impl ConcurrentPipeline {
    pub fn new(
        compression_service: Arc<dyn CompressionService>,
        encryption_service: Arc<dyn EncryptionService>,
        file_io_service: Arc<dyn FileIOService>,
        pipeline_repository: Arc<dyn PipelineRepository>,
        stage_executor: Arc<dyn StageExecutor>,
        binary_format_service: Arc<dyn BinaryFormatService>,
    ) -> Self {
        Self {
            compression_service,
            encryption_service,
            file_io_service,
            pipeline_repository,
            stage_executor,
            binary_format_service,
            active_pipelines: Arc::new(RwLock::new(std::collections::HashMap::new())),
        }
    }

    async fn process_chunk_through_stage(
        &self,
        chunk: FileChunk,
        stage: &PipelineStage,
        context: &mut ProcessingContext,
    ) -> Result<FileChunk, PipelineError> {
        debug!("Processing chunk through stage: {}", stage.name());

        match stage.stage_type() {
            StageType::Compression => {
                let compression_config = self.extract_compression_config(stage)?;
                self.compression_service
                    .compress_chunk(chunk, &compression_config, context)
            }
            StageType::Encryption => {
                let encryption_config = self.extract_encryption_config(stage)?;
                // Note: placeholder key material (zeroed key/nonce/salt).
                let key_material = KeyMaterial {
                    key: vec![0u8; 32],
                    nonce: vec![0u8; 12],
                    salt: vec![0u8; 32],
                    algorithm: encryption_config.algorithm.clone(),
                    created_at: chrono::Utc::now(),
                    expires_at: None,
                };
                self.encryption_service
                    .encrypt_chunk(chunk, &encryption_config, &key_material, context)
            }
            StageType::Checksum => {
                // Checksum stages do not modify the chunk; pass it through unchanged.
                Ok(chunk)
            }
            StageType::Transform => self.stage_executor.execute(stage, chunk, context).await,
            StageType::PassThrough => self.stage_executor.execute(stage, chunk, context).await,
        }
    }

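    /// Maps a stage's configuration onto the domain `CompressionConfig`.
    /// Recognized algorithms: "brotli", "gzip", "zstd", "lz4". The optional
    /// "level" parameter is bucketed as 0-3 => Fast, 4-6 => Balanced,
    /// 7+ => Best, defaulting to Balanced when absent or unparsable.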
    fn extract_compression_config(
        &self,
        stage: &PipelineStage,
    ) -> Result<adaptive_pipeline_domain::services::CompressionConfig, PipelineError> {
        let algorithm_str = stage.configuration().algorithm.as_str();
        let algorithm = match algorithm_str {
            "brotli" => adaptive_pipeline_domain::services::CompressionAlgorithm::Brotli,
            "gzip" => adaptive_pipeline_domain::services::CompressionAlgorithm::Gzip,
            "zstd" => adaptive_pipeline_domain::services::CompressionAlgorithm::Zstd,
            "lz4" => adaptive_pipeline_domain::services::CompressionAlgorithm::Lz4,
            _ => {
                return Err(PipelineError::InvalidConfiguration(format!(
                    "Unsupported compression algorithm: {}",
                    algorithm_str
                )));
            }
        };

        let level = stage
            .configuration()
            .parameters
            .get("level")
            .and_then(|v| v.parse::<u32>().ok())
            .map(|l| match l {
                0..=3 => adaptive_pipeline_domain::services::CompressionLevel::Fast,
                4..=6 => adaptive_pipeline_domain::services::CompressionLevel::Balanced,
                7.. => adaptive_pipeline_domain::services::CompressionLevel::Best,
            })
            .unwrap_or(adaptive_pipeline_domain::services::CompressionLevel::Balanced);

        Ok(adaptive_pipeline_domain::services::CompressionConfig {
            algorithm,
            level,
            dictionary: None,
            window_size: None,
            parallel_processing: stage.configuration().parallel_processing,
        })
    }

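    /// Maps a stage's configuration onto the domain `EncryptionConfig`.
    /// Recognized algorithms: "aes256-gcm"/"aes256gcm", "aes192-gcm"/"aes192gcm",
    /// "aes128-gcm"/"aes128gcm", and "chacha20-poly1305"/"chacha20poly1305".
    /// The optional "kdf" parameter selects argon2, scrypt, or pbkdf2 and
    /// defaults to Argon2.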
    fn extract_encryption_config(
        &self,
        stage: &PipelineStage,
    ) -> Result<adaptive_pipeline_domain::services::EncryptionConfig, PipelineError> {
        let algorithm_str = stage.configuration().algorithm.as_str();
        let algorithm = match algorithm_str {
            "aes256-gcm" | "aes256gcm" => adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes256Gcm,
            "chacha20-poly1305" | "chacha20poly1305" => {
                adaptive_pipeline_domain::services::EncryptionAlgorithm::ChaCha20Poly1305
            }
            "aes128-gcm" | "aes128gcm" => adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes128Gcm,
            "aes192-gcm" | "aes192gcm" => adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes192Gcm,
            _ => {
                return Err(PipelineError::InvalidConfiguration(format!(
                    "Unsupported encryption algorithm: {}",
                    algorithm_str
                )));
            }
        };

        let kdf = stage
            .configuration()
            .parameters
            .get("kdf")
            .map(|kdf_str| match kdf_str.as_str() {
                "argon2" => adaptive_pipeline_domain::services::KeyDerivationFunction::Argon2,
                "scrypt" => adaptive_pipeline_domain::services::KeyDerivationFunction::Scrypt,
                "pbkdf2" => adaptive_pipeline_domain::services::KeyDerivationFunction::Pbkdf2,
                _ => adaptive_pipeline_domain::services::KeyDerivationFunction::Argon2,
            });

        Ok(adaptive_pipeline_domain::services::EncryptionConfig {
            algorithm,
            key_derivation: kdf.unwrap_or(adaptive_pipeline_domain::services::KeyDerivationFunction::Argon2),
            key_size: 32,
            nonce_size: 12,
            salt_size: 16,
            iterations: 100_000,
            memory_cost: Some(65536),
            parallel_cost: Some(1),
            associated_data: None,
        })
    }

    fn update_metrics(&self, context: &mut ProcessingContext, stage_name: &str, duration: std::time::Duration) {
        let mut metrics = context.metrics().clone();

        let mut stage_metrics =
            adaptive_pipeline_domain::entities::processing_metrics::StageMetrics::new(stage_name.to_string());
        stage_metrics.update(metrics.bytes_processed(), duration);
        metrics.add_stage_metrics(stage_metrics);

        context.update_metrics(metrics);
    }
}

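// `process_file` below is the main entry point. At a high level it:
//   1. loads and validates the pipeline definition,
//   2. reads the input in chunks and computes its SHA-256 checksum,
//   3. builds the .adapipe `FileHeader` from the configured stages,
//   4. spawns one reader task plus an adaptively sized pool of CPU workers
//      connected by a bounded channel,
//   5. joins all tasks, finalizes the binary output, and returns metrics.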
#[async_trait]
impl PipelineService for ConcurrentPipeline {
    async fn process_file(
        &self,
        input_path: &std::path::Path,
        output_path: &std::path::Path,
        context: adaptive_pipeline_domain::services::pipeline_service::ProcessFileContext,
    ) -> Result<ProcessingMetrics, PipelineError> {
        debug!(
            "Processing file: {} -> {} with pipeline {} (.adapipe format)",
            input_path.display(),
            output_path.display(),
            context.pipeline_id
        );

        let start_time = std::time::Instant::now();

        // Load and validate the pipeline definition.
        let pipeline = self
            .pipeline_repository
            .find_by_id(context.pipeline_id.clone())
            .await?
            .ok_or_else(|| PipelineError::PipelineNotFound(context.pipeline_id.to_string()))?;

        self.validate_pipeline(&pipeline).await?;

        let input_metadata = tokio::fs::metadata(input_path)
            .await
            .map_err(|e| PipelineError::IoError(e.to_string()))?;
        let input_size = input_metadata.len();

        let chunk_size = adaptive_pipeline_domain::value_objects::ChunkSize::optimal_for_file_size(input_size).bytes();

        let read_options = adaptive_pipeline_domain::services::file_io_service::ReadOptions {
            chunk_size: Some(chunk_size),
            use_memory_mapping: false,
            calculate_checksums: false,
            ..Default::default()
        };

        let read_result = self.file_io_service.read_file_chunks(input_path, read_options).await?;

        let input_chunks = read_result.chunks;

        // SHA-256 of the original file, recorded in the header and metrics.
        let original_checksum = {
            let mut context = ring::digest::Context::new(&ring::digest::SHA256);
            for chunk in &input_chunks {
                context.update(chunk.data());
            }
            let digest = context.finish();
            hex::encode(digest.as_ref())
        };

        debug!(
            "Input file: {}, SHA256: {}",
            Byte::from_u128(input_size as u128)
                .unwrap_or_else(|| Byte::from_u64(0))
                .get_appropriate_unit(byte_unit::UnitType::Decimal)
                .to_string(),
            original_checksum
        );

        let mut header = adaptive_pipeline_domain::value_objects::FileHeader::new(
            input_path
                .file_name()
                .and_then(|n| n.to_str())
                .unwrap_or("unknown")
                .to_string(),
            input_size,
            original_checksum.clone(),
        );

        // Record each stage in the header so the file can later be restored.
        for stage in pipeline.stages() {
            debug!(
                "Processing pipeline stage: name='{}', type='{:?}', algorithm='{}'",
                stage.name(),
                stage.stage_type(),
                stage.configuration().algorithm
            );
            match stage.stage_type() {
                adaptive_pipeline_domain::entities::pipeline_stage::StageType::Compression => {
                    debug!("✅ Matched Compression stage: {}", stage.name());
                    let config = self.extract_compression_config(stage)?;
                    let algorithm_str = match config.algorithm {
                        adaptive_pipeline_domain::services::CompressionAlgorithm::Brotli => "brotli",
                        adaptive_pipeline_domain::services::CompressionAlgorithm::Gzip => "gzip",
                        adaptive_pipeline_domain::services::CompressionAlgorithm::Zstd => "zstd",
                        adaptive_pipeline_domain::services::CompressionAlgorithm::Lz4 => "lz4",
                        adaptive_pipeline_domain::services::CompressionAlgorithm::Custom(ref name) => name.as_str(),
                    };
                    let level = match config.level {
                        adaptive_pipeline_domain::services::CompressionLevel::Fastest => 1,
                        adaptive_pipeline_domain::services::CompressionLevel::Fast => 3,
                        adaptive_pipeline_domain::services::CompressionLevel::Balanced => 6,
                        adaptive_pipeline_domain::services::CompressionLevel::Best => 9,
                        adaptive_pipeline_domain::services::CompressionLevel::Custom(level) => level,
                    };
                    header = header.add_compression_step(algorithm_str, level);
                }
                adaptive_pipeline_domain::entities::pipeline_stage::StageType::Encryption => {
                    debug!("✅ Matched Encryption stage: {}", stage.name());
                    let config = self.extract_encryption_config(stage)?;
                    let algorithm_str = match config.algorithm {
                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes128Gcm => "aes128gcm",
                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes192Gcm => "aes192gcm",
                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes256Gcm => "aes256gcm",
                        adaptive_pipeline_domain::services::EncryptionAlgorithm::ChaCha20Poly1305 => "chacha20poly1305",
                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Custom(ref name) => name.as_str(),
                    };
                    header = header.add_encryption_step(algorithm_str, "argon2", 32, 12);
                }
                adaptive_pipeline_domain::entities::pipeline_stage::StageType::Checksum => {
                    debug!("✅ Matched Checksum stage: {}", stage.name());
                    header = header.add_checksum_step(stage.configuration().algorithm.as_str());
                }
                adaptive_pipeline_domain::entities::pipeline_stage::StageType::PassThrough => {
                    debug!("✅ Matched PassThrough stage: {}", stage.name());
                    header = header.add_passthrough_step(stage.configuration().algorithm.as_str());
                }
                _ => {
                    debug!(
                        "⚠️ Unhandled stage type: name='{}', type='{:?}', algorithm='{}'",
                        stage.name(),
                        stage.stage_type(),
                        stage.configuration().algorithm
                    );
                    header = header.add_custom_step(
                        stage.name(),
                        stage.configuration().algorithm.as_str(),
                        stage.configuration().parameters.clone(),
                    );
                }
            }
        }

        header = header
            .with_chunk_info(chunk_size as u32, 0) // total chunk count is not known yet at this point
            .with_pipeline_id(context.pipeline_id.to_string());

        // Keep a copy of the security context for the spawned tasks; the
        // original is moved into the main processing context below.
        let security_context_for_tasks = context.security_context.clone();

        let mut processing_context = ProcessingContext::new(input_size, context.security_context);

        {
            let mut metrics = processing_context.metrics().clone();
            metrics.set_input_file_info(input_size, Some(original_checksum.clone()));
            processing_context.update_metrics(metrics);
        }

        let total_chunks = (input_size as usize).div_ceil(chunk_size);

        let binary_writer = self
            .binary_format_service
            .create_writer(output_path, header.clone())
            .await?;
        let writer_shared = Arc::new(binary_writer);

        let progress_indicator = Arc::new(ProgressIndicatorService::new(total_chunks as u64));

        // Pick a worker count adapted to the file size and available cores.
        let available_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
        // Compression and encryption stages are the CPU-heavy ones; their
        // presence selects the CPU-intensive worker-count strategy.
        let is_cpu_intensive = pipeline
            .stages()
            .iter()
            .any(|stage| matches!(stage.stage_type(), StageType::Compression | StageType::Encryption));

        let optimal_worker_count =
            WorkerCount::optimal_for_processing_type(input_size, available_cores, is_cpu_intensive);

        let worker_count = if let Some(user_workers) = context.user_worker_override {
            let validated = WorkerCount::validate_user_input(user_workers, available_cores, input_size);
            match validated {
                Ok(count) => {
                    debug!("Using user-specified worker count: {} (validated)", count);
                    count
                }
                Err(warning) => {
                    warn!(
                        "User worker count invalid: {}. Using adaptive: {}",
                        warning,
                        optimal_worker_count.count()
                    );
                    optimal_worker_count.count()
                }
            }
        } else {
            debug!("Using adaptive worker count: {}", optimal_worker_count.count());
            optimal_worker_count.count()
        };

        debug!(
            "Channel-based pipeline: {} workers for {} bytes ({})",
            worker_count,
            input_size,
            WorkerCount::strategy_description(input_size)
        );

        let shutdown_coordinator =
            adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator::new(std::time::Duration::from_secs(5));
        let cancel_token = shutdown_coordinator.token();

        // Bounded channel between the reader and the CPU workers.
        let channel_depth = context.channel_depth_override.unwrap_or(4);
        debug!("Using channel depth: {}", channel_depth);
        let (tx_cpu, rx_cpu) = tokio::sync::mpsc::channel::<ChunkMessage>(channel_depth);

        // Workers share one receiver behind a mutex so each chunk is taken by
        // exactly one worker.
        let rx_cpu_shared = Arc::new(tokio::sync::Mutex::new(rx_cpu));

        let reader_handle = tokio::spawn(reader_task(
            input_path.to_path_buf(),
            chunk_size,
            tx_cpu,
            self.file_io_service.clone(),
            channel_depth,
            cancel_token.clone(),
        ));

        let mut worker_handles = Vec::new();
        let pipeline_arc = Arc::new(pipeline.clone());

        for worker_id in 0..worker_count {
            let rx_cpu_clone = rx_cpu_shared.clone();
            let writer_clone = writer_shared.clone();
            let pipeline_clone = pipeline_arc.clone();
            let stage_executor_clone = self.stage_executor.clone();
            let input_path_clone = input_path.to_path_buf();
            let output_path_clone = output_path.to_path_buf();
            let security_context_clone = security_context_for_tasks.clone();
            let cancel_token_clone = cancel_token.clone();

            let worker_handle = tokio::spawn(async move {
                use crate::infrastructure::metrics::CONCURRENCY_METRICS;
                use crate::infrastructure::runtime::RESOURCE_MANAGER;

                let mut chunks_processed = 0;

                loop {
                    // Race the shared receiver against cancellation. The mutex
                    // is only held while waiting for the next chunk.
                    #[allow(clippy::await_holding_lock)]
                    let chunk_result = tokio::select! {
                        _ = cancel_token_clone.cancelled() => {
                            break;
                        }
                        chunk_msg = async {
                            let mut rx = rx_cpu_clone.lock().await;
                            rx.recv().await
                        } => chunk_msg,
                    };

                    match chunk_result {
                        Some(chunk_msg) => {
                            // Record how long the chunk sat in the channel.
                            let queue_wait = chunk_msg.enqueued_at.elapsed();
                            CONCURRENCY_METRICS.record_cpu_queue_wait(queue_wait);

                            // Acquire a CPU permit and record the wait.
                            let cpu_wait_start = std::time::Instant::now();
                            let _cpu_permit = RESOURCE_MANAGER.acquire_cpu().await.map_err(|e| {
                                PipelineError::resource_exhausted(format!("Failed to acquire CPU token: {}", e))
                            })?;
                            let cpu_wait_duration = cpu_wait_start.elapsed();

                            CONCURRENCY_METRICS.record_cpu_wait(cpu_wait_duration);
                            CONCURRENCY_METRICS.worker_started();

                            let mut local_context =
                                ProcessingContext::new(input_size, security_context_clone.clone());

                            let mut file_chunk = chunk_msg.file_chunk;
                            for stage in pipeline_clone.stages() {
                                file_chunk = stage_executor_clone
                                    .execute(stage, file_chunk, &mut local_context)
                                    .await
                                    .map_err(|e| {
                                        PipelineError::processing_failed(format!("Stage execution failed: {}", e))
                                    })?;
                            }

                            // Split off the 12-byte nonce for encrypted chunks;
                            // unencrypted chunks get an all-zero nonce.
                            let (nonce, chunk_data) = if file_chunk.data().len() >= 12 {
                                let is_encrypted = local_context
                                    .metadata()
                                    .get("encrypted")
                                    .map(|v| v == "true")
                                    .unwrap_or(false);

                                if is_encrypted {
                                    let mut nonce_array = [0u8; 12];
                                    nonce_array.copy_from_slice(&file_chunk.data()[..12]);
                                    (nonce_array, file_chunk.data()[12..].to_vec())
                                } else {
                                    ([0u8; 12], file_chunk.data().to_vec())
                                }
                            } else {
                                ([0u8; 12], file_chunk.data().to_vec())
                            };

                            let chunk_format = ChunkFormat::new(nonce, chunk_data);
                            writer_clone
                                .write_chunk_at_position(chunk_format, chunk_msg.chunk_index as u64)
                                .await?;

                            CONCURRENCY_METRICS.worker_completed();
                            chunks_processed += 1;
                        }
                        None => {
                            // Channel closed: the reader is done and the queue is drained.
                            break;
                        }
                    }
                }

                Ok::<WorkerStats, PipelineError>(WorkerStats {
                    worker_id,
                    chunks_processed,
                })
            });

            worker_handles.push(worker_handle);
        }

        // Wait for the reader first; its stats give the true byte count.
        let reader_stats = reader_handle
            .await
            .map_err(|e| PipelineError::processing_failed(format!("Reader task failed: {}", e)))??;

        debug!(
            "Reader completed: {} chunks read, {} bytes",
            reader_stats.chunks_read, reader_stats.bytes_read
        );

        let mut total_chunks_processed = 0;
        for (worker_id, worker_handle) in worker_handles.into_iter().enumerate() {
            let worker_stats = worker_handle
                .await
                .map_err(|e| PipelineError::processing_failed(format!("Worker {} failed: {}", worker_id, e)))??;

            debug!(
                "Worker {} completed: {} chunks processed",
                worker_stats.worker_id, worker_stats.chunks_processed
            );
            total_chunks_processed += worker_stats.chunks_processed;
        }

        // All chunks are written; finalize the .adapipe container.
        let _total_bytes_written = writer_shared.finalize(header).await?;

        let chunks_processed = total_chunks_processed as u64;
        let total_bytes_processed = reader_stats.bytes_read;

        let total_duration = start_time.elapsed();
        let throughput = (total_bytes_processed as f64) / total_duration.as_secs_f64() / (1024.0 * 1024.0);
        progress_indicator
            .show_completion(total_bytes_processed, throughput, total_duration)
            .await;

        let total_output_bytes = tokio::fs::metadata(output_path)
            .await
            .map_err(|e| PipelineError::io_error(format!("Failed to get output file size: {}", e)))?
            .len();

        let mut processing_metrics = processing_context.metrics().clone();
        processing_metrics.start();
        processing_metrics.update_bytes_processed(total_bytes_processed);
        processing_metrics.update_chunks_processed(chunks_processed);
        processing_metrics.set_output_file_info(total_output_bytes, None);
        processing_metrics.end();

        debug!(
            "Channel pipeline completed: {} chunks, {:.2} MB/s, {} → {} in {:?}",
            chunks_processed,
            throughput,
            Byte::from_u128(total_bytes_processed as u128)
                .unwrap_or_else(|| Byte::from_u64(0))
                .get_appropriate_unit(byte_unit::UnitType::Decimal),
            Byte::from_u128(total_output_bytes as u128)
                .unwrap_or_else(|| Byte::from_u64(0))
                .get_appropriate_unit(byte_unit::UnitType::Decimal),
            total_duration
        );

        // Build the metrics returned to the caller, including the output checksum.
        let mut metrics = processing_context.metrics().clone();
        metrics.start();
        metrics.update_bytes_processed(total_bytes_processed);
        metrics.update_chunks_processed(chunks_processed);

        let output_checksum = {
            let output_data = tokio::fs::read(output_path)
                .await
                .map_err(|e| PipelineError::io_error(e.to_string()))?;
            let digest = ring::digest::digest(&ring::digest::SHA256, &output_data);
            hex::encode(digest.as_ref())
        };

        metrics.set_output_file_info(total_output_bytes, Some(output_checksum));
        metrics.end();

        if let Some(obs) = &context.observer {
            obs.on_processing_completed(total_duration, Some(&metrics)).await;
        }

        Ok(metrics)
    }

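    // Processes an in-memory batch of chunks stage by stage, fanning every
    // chunk of a stage out concurrently with `try_join_all`. Each chunk works
    // on a clone of the processing context, so per-chunk metric updates are
    // not merged back; stage-level metrics are recorded via `update_metrics`.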
    async fn process_chunks(
        &self,
        pipeline: &Pipeline,
        chunks: Vec<FileChunk>,
        context: &mut ProcessingContext,
    ) -> Result<Vec<FileChunk>, PipelineError> {
        let mut processed_chunks = chunks;

        for stage in pipeline.stages() {
            info!("Processing through stage: {}", stage.name());
            let stage_start = std::time::Instant::now();

            let futures: Vec<_> = processed_chunks
                .into_iter()
                .map(|chunk| {
                    let mut ctx = context.clone();
                    async move { self.process_chunk_through_stage(chunk, stage, &mut ctx).await }
                })
                .collect();

            processed_chunks = future::try_join_all(futures).await?;

            let stage_duration = stage_start.elapsed();
            self.update_metrics(context, stage.name(), stage_duration);

            info!("Completed stage {} in {:?}", stage.name(), stage_duration);
        }

        Ok(processed_chunks)
    }

    async fn validate_pipeline(&self, pipeline: &Pipeline) -> Result<(), PipelineError> {
        debug!("Validating pipeline: {}", pipeline.id());

        if pipeline.stages().is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline has no stages".to_string(),
            ));
        }

        for stage in pipeline.stages() {
            if stage.configuration().algorithm.is_empty() {
                return Err(PipelineError::InvalidConfiguration(format!(
                    "Stage '{}' has no algorithm specified",
                    stage.name()
                )));
            }

            if let Err(e) = stage.validate() {
                return Err(PipelineError::InvalidConfiguration(format!(
                    "Stage '{}' validation failed: {}",
                    stage.name(),
                    e
                )));
            }
        }

        debug!("Validating stage ordering...");
        self.stage_executor.validate_stage_ordering(pipeline.stages()).await?;

        debug!("Pipeline validation passed");
        Ok(())
    }

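    // Rough time estimate from assumed per-stage throughputs: compression at
    // 50 MB/s, encryption at 100 MB/s, everything else at 200 MB/s. For
    // example, a 100 MB file through a compression + encryption pipeline is
    // estimated at 100/50 + 100/100 = 3 seconds.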
    async fn estimate_processing_time(
        &self,
        pipeline: &Pipeline,
        file_size: u64,
    ) -> Result<std::time::Duration, PipelineError> {
        let mut total_seconds = 0.0;
        let file_size_mb = (file_size as f64) / (1024.0 * 1024.0);

        for stage in pipeline.stages() {
            let stage_seconds = match stage.stage_type() {
                adaptive_pipeline_domain::entities::StageType::Compression => file_size_mb / 50.0,
                adaptive_pipeline_domain::entities::StageType::Encryption => file_size_mb / 100.0,
                _ => file_size_mb / 200.0,
            };
            total_seconds += stage_seconds;
        }

        Ok(std::time::Duration::from_secs_f64(total_seconds))
    }

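    // Resource estimate: roughly 2x the configured chunk size of memory per
    // stage (default chunk size 1 MiB => ~2 MB per stage), up to 4 cores when
    // any stage allows parallel processing, and disk space for about twice the
    // input size. Time reuses the same 50/100/200 MB/s throughput assumptions
    // as estimate_processing_time.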
    async fn get_resource_requirements(
        &self,
        pipeline: &Pipeline,
        file_size: u64,
    ) -> Result<ResourceRequirements, PipelineError> {
        let mut total_memory_mb = 0.0;
        let mut total_cpu_cores = 0;
        let mut estimated_time_seconds = 0.0;

        for stage in pipeline.stages() {
            let chunk_size = stage.configuration().chunk_size.unwrap_or(1024 * 1024) as f64;
            let stage_memory = (chunk_size / (1024.0 * 1024.0)) * 2.0;
            total_memory_mb += stage_memory;

            if stage.configuration().parallel_processing {
                total_cpu_cores = total_cpu_cores.max(4);
            } else {
                total_cpu_cores = total_cpu_cores.max(1);
            }

            let throughput_mbps = match stage.stage_type() {
                adaptive_pipeline_domain::entities::StageType::Compression => 50.0,
                adaptive_pipeline_domain::entities::StageType::Encryption => 100.0,
                _ => 200.0,
            };

            let file_size_mb = (file_size as f64) / (1024.0 * 1024.0);
            estimated_time_seconds += file_size_mb / throughput_mbps;
        }

        Ok(ResourceRequirements {
            memory_bytes: (total_memory_mb * 1024.0 * 1024.0) as u64,
            cpu_cores: total_cpu_cores,
            disk_space_bytes: ((file_size as f64) * 2.0) as u64,
            network_bandwidth_bps: None,
            gpu_memory_bytes: None,
            estimated_duration: std::time::Duration::from_secs_f64(estimated_time_seconds),
        })
    }

    async fn create_optimized_pipeline(
        &self,
        file_path: &std::path::Path,
        requirements: PipelineRequirements,
    ) -> Result<Pipeline, PipelineError> {
        let file_extension = file_path.extension().and_then(|ext| ext.to_str()).unwrap_or("");

        let pipeline_name = format!("optimized_pipeline_{}", uuid::Uuid::new_v4());
        let mut stages = Vec::new();

        if requirements.compression_required {
            // Choose a compression algorithm by file extension: brotli for
            // text-like formats, zstd for binaries, brotli as the default.
            let algorithm = match file_extension {
                "txt" | "log" | "csv" | "json" | "xml" | "html" => "brotli",
                "bin" | "exe" | "dll" => "zstd",
                _ => "brotli",
            };

            let compression_config = adaptive_pipeline_domain::entities::StageConfiguration {
                algorithm: algorithm.to_string(),
                operation: adaptive_pipeline_domain::entities::Operation::Forward,
                parameters: std::collections::HashMap::new(),
                parallel_processing: requirements.parallel_processing,
                chunk_size: Some(1024 * 1024),
            };

            let compression_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
                "compression".to_string(),
                adaptive_pipeline_domain::entities::StageType::Compression,
                compression_config,
                stages.len() as u32,
            )?;

            stages.push(compression_stage);
        }

        if requirements.encryption_required {
            let encryption_config = adaptive_pipeline_domain::entities::StageConfiguration {
                algorithm: "aes256-gcm".to_string(),
                operation: adaptive_pipeline_domain::entities::Operation::Forward,
                parameters: std::collections::HashMap::new(),
                parallel_processing: requirements.parallel_processing,
                chunk_size: Some(1024 * 1024),
            };

            let encryption_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
                "encryption".to_string(),
                adaptive_pipeline_domain::entities::StageType::Encryption,
                encryption_config,
                stages.len() as u32,
            )?;

            stages.push(encryption_stage);
        }

        Pipeline::new(pipeline_name, stages)
    }

    async fn monitor_execution(
        &self,
        pipeline_id: PipelineId,
        context: &ProcessingContext,
    ) -> Result<ExecutionStatus, PipelineError> {
        let active_pipelines = self.active_pipelines.read().await;

        if let Some(_aggregate) = active_pipelines.get(&pipeline_id.to_string()) {
            // Placeholder status; detailed progress fields are not tracked here.
            Ok(ExecutionStatus {
                pipeline_id,
                status: ExecutionState::Running,
                progress_percentage: 0.0,
                bytes_processed: 0,
                bytes_total: 0,
                current_stage: Some("unknown".to_string()),
                estimated_remaining_time: None,
                error_count: 0,
                warning_count: 0,
                started_at: chrono::Utc::now(),
                updated_at: chrono::Utc::now(),
            })
        } else {
            Err(PipelineError::PipelineNotFound(pipeline_id.to_string()))
        }
    }

    async fn pause_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError> {
        // Pause is currently a no-op beyond logging.
        info!("Pipeline {} paused", pipeline_id);
        Ok(())
    }

    async fn resume_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError> {
        // Resume is currently a no-op beyond logging.
        info!("Pipeline {} resumed", pipeline_id);
        Ok(())
    }

    async fn cancel_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError> {
        let mut active_pipelines = self.active_pipelines.write().await;

        if active_pipelines.remove(&pipeline_id.to_string()).is_some() {
            info!("Pipeline {} cancelled", pipeline_id);
            Ok(())
        } else {
            Err(PipelineError::PipelineNotFound(pipeline_id.to_string()))
        }
    }

    async fn get_execution_history(
        &self,
        pipeline_id: PipelineId,
        _limit: Option<usize>,
    ) -> Result<Vec<ExecutionRecord>, PipelineError> {
        // Execution history persistence is not implemented; return an empty list.
        Ok(Vec::new())
    }
}

/// Pairs a pipeline definition with a stage executor for per-chunk processing.
pub struct PipelineChunkProcessor {
    pipeline: Pipeline,
    stage_executor: Arc<dyn StageExecutor>,
}

impl PipelineChunkProcessor {
    pub fn new(pipeline: Pipeline, stage_executor: Arc<dyn StageExecutor>) -> Self {
        Self {
            pipeline,
            stage_executor,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::infrastructure::adapters::{MultiAlgoCompression, MultiAlgoEncryption};
    use crate::infrastructure::repositories::sqlite_pipeline::SqlitePipelineRepository;
    use crate::infrastructure::runtime::stage_executor::BasicStageExecutor;
    use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    use adaptive_pipeline_domain::entities::security_context::SecurityContext;
    use adaptive_pipeline_domain::value_objects::binary_file_format::{
        FileHeader, CURRENT_FORMAT_VERSION, MAGIC_BYTES,
    };
    use std::path::PathBuf;
    use tempfile::TempDir;
    use tokio::fs;

    #[test]
    fn test_pipeline_creation_for_database() {
        println!("Testing pipeline creation for database operations");

        let compression_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
            "compression".to_string(),
            StageType::Compression,
            adaptive_pipeline_domain::entities::pipeline_stage::StageConfiguration {
                algorithm: "brotli".to_string(),
                operation: adaptive_pipeline_domain::entities::Operation::Forward,
                parameters: std::collections::HashMap::new(),
                parallel_processing: false,
                chunk_size: Some(1024),
            },
            1,
        )
        .unwrap();
        println!("✅ Created compression stage");

        let test_pipeline = Pipeline::new("test-database-integration".to_string(), vec![compression_stage]).unwrap();
        println!("✅ Created test pipeline with {} stages", test_pipeline.stages().len());

        assert!(!test_pipeline.name().is_empty());
        assert!(!test_pipeline.stages().is_empty());

        println!("✅ Pipeline creation test passed!");
    }

    #[test]
    fn test_database_path_and_url_generation() {
        println!("Testing database path and URL generation");

        let temp_dir = TempDir::new().unwrap();
        let db_file = temp_dir.path().join("test_pipeline.db");
        let db_path = db_file.to_str().unwrap();

        println!("📁 Creating temporary database path: {}", db_path);

        let database_url = format!("sqlite:{}", db_path);
        println!("🔗 Generated database URL: {}", database_url);

        assert!(database_url.starts_with("sqlite:"));
        assert!(database_url.contains("test_pipeline.db"));

        let schema_sql = include_str!("../../../scripts/test_data/create_fresh_structured_database.sql");
        println!("📝 Schema file loaded: {} characters", schema_sql.len());
        assert!(!schema_sql.is_empty());
        assert!(schema_sql.contains("CREATE TABLE"));

        let compression_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
            "compression".to_string(),
            StageType::Compression,
            adaptive_pipeline_domain::entities::pipeline_stage::StageConfiguration {
                algorithm: "brotli".to_string(),
                operation: adaptive_pipeline_domain::entities::Operation::Forward,
                parameters: std::collections::HashMap::new(),
                parallel_processing: false,
                chunk_size: Some(1024),
            },
            1,
        )
        .unwrap();

        let test_pipeline = Pipeline::new("test-database-operations".to_string(), vec![compression_stage]).unwrap();
        println!(
            "✅ Created test pipeline: {} with {} stages",
            test_pipeline.name(),
            test_pipeline.stages().len()
        );

        assert!(!test_pipeline.name().is_empty());
        assert!(!test_pipeline.stages().is_empty());
        assert!(!test_pipeline.id().to_string().is_empty());

        println!("✅ Database preparation test passed!");
    }

    #[tokio::test]
    async fn test_reader_task_cancellation() {
        use crate::infrastructure::adapters::file_io::TokioFileIO;
        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
        use adaptive_pipeline_domain::services::file_io_service::FileIOConfig;
        use std::time::Duration;

        let temp_dir = TempDir::new().unwrap();
        let input_file = temp_dir.path().join("test_input.txt");
        fs::write(&input_file, b"test data for cancellation").await.unwrap();

        let (tx, _rx) = tokio::sync::mpsc::channel(10);
        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
        let cancel_token = coordinator.token();

        // Cancel before the reader even starts.
        cancel_token.cancel();

        let file_io = Arc::new(TokioFileIO::new(FileIOConfig::default())) as Arc<dyn FileIOService>;
        let result = reader_task(input_file, 1024, tx, file_io, 10, cancel_token).await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("cancel"),
            "Expected cancellation error, got: {}",
            err
        );
    }

    #[tokio::test]
    async fn test_cancellation_during_processing() {
        use crate::infrastructure::adapters::file_io::TokioFileIO;
        use crate::infrastructure::runtime::{init_resource_manager, ResourceConfig};
        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
        use adaptive_pipeline_domain::services::file_io_service::FileIOConfig;
        use std::time::Duration;

        let _ = init_resource_manager(ResourceConfig::default());

        let temp_dir = TempDir::new().unwrap();
        let input_file = temp_dir.path().join("large_input.txt");
        // 100 KiB of data so the reader has many 1 KiB chunks to send.
        let test_data = vec![b'X'; 1024 * 100];
        fs::write(&input_file, &test_data).await.unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel::<ChunkMessage>(5);
        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
        let cancel_token = coordinator.token();
        let cancel_clone = cancel_token.clone();

        let file_io = Arc::new(TokioFileIO::new(FileIOConfig::default())) as Arc<dyn FileIOService>;
        let reader_handle =
            tokio::spawn(async move { reader_task(input_file, 1024, tx, file_io, 5, cancel_clone).await });

        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        // Cancel while the reader is blocked on the small, unconsumed channel.
        cancel_token.cancel();

        let reader_result = reader_handle.await.unwrap();
        assert!(reader_result.is_err());

        // Drain anything that was already enqueued.
        while rx.try_recv().is_ok() {}

        assert!(rx.recv().await.is_none(), "Channel should be closed after cancellation");
    }

    #[tokio::test]
    async fn test_worker_cancellation() {
        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
        use std::time::Duration;

        let (_tx, rx) = tokio::sync::mpsc::channel::<ChunkMessage>(10);
        let rx_shared = Arc::new(tokio::sync::Mutex::new(rx));

        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
        let cancel_token = coordinator.token();
        let cancel_clone = cancel_token.clone();

        let worker_handle = tokio::spawn(async move {
            loop {
                let mut rx_lock = rx_shared.lock().await;

                tokio::select! {
                    _ = cancel_clone.cancelled() => {
                        break;
                    }
                    _chunk_msg = rx_lock.recv() => {
                        continue;
                    }
                };

                #[allow(unreachable_code)]
                {}
            }
            Ok::<(), PipelineError>(())
        });

        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        cancel_token.cancel();

        let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), worker_handle).await;

        assert!(result.is_ok(), "Worker should exit within timeout");
        let worker_result = result.unwrap().unwrap();
        assert!(worker_result.is_ok(), "Worker should exit without error");
    }

    #[tokio::test]
    async fn test_early_cancellation_detection() {
        use crate::infrastructure::adapters::file_io::TokioFileIO;
        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
        use adaptive_pipeline_domain::services::file_io_service::FileIOConfig;
        use std::time::Duration;

        let temp_dir = TempDir::new().unwrap();
        let input_file = temp_dir.path().join("input.txt");
        fs::write(&input_file, b"data").await.unwrap();

        let (tx, _rx) = tokio::sync::mpsc::channel(10);
        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
        let cancel_token = coordinator.token();

        cancel_token.cancel();
        assert!(cancel_token.is_cancelled(), "Token should be cancelled");

        let file_io = Arc::new(TokioFileIO::new(FileIOConfig::default())) as Arc<dyn FileIOService>;
        let result = reader_task(input_file, 1024, tx, file_io, 10, cancel_token).await;

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("cancel"));
    }

    #[tokio::test]
    async fn test_cancellation_token_propagation() {
        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
        use std::time::Duration;

        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
        let token = coordinator.token();
        let clone1 = token.clone();
        let clone2 = token.clone();

        assert!(!token.is_cancelled());
        assert!(!clone1.is_cancelled());
        assert!(!clone2.is_cancelled());

        token.cancel();

        assert!(token.is_cancelled());
        assert!(clone1.is_cancelled());
        assert!(clone2.is_cancelled());

        tokio::time::timeout(tokio::time::Duration::from_millis(100), clone1.cancelled())
            .await
            .unwrap();

        tokio::time::timeout(tokio::time::Duration::from_millis(100), clone2.cancelled())
            .await
            .unwrap();
    }
}