adaptive_pipeline/application/services/
pipeline.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8// Application service with infrastructure types and future features
9#![allow(dead_code, unused_imports, unused_variables)]
10
11//! # Pipeline Service Implementation
12//!
13//! Application layer orchestration of file processing workflows. Coordinates
14//! multi-stage pipelines (compression, encryption, I/O) with async processing,
15//! progress monitoring, and parallel chunk processing. Integrates domain
16//! services via dependency injection. Provides real-time metrics, error
//! recovery, and resource management. See mdBook for workflow details and
//! integration patterns.
//!
//! ## Security
//!
//! - **Encryption Integration**: Seamless encryption of sensitive data
//! - **Key Management**: Secure key generation, storage, and rotation
//! - **Memory Protection**: Secure handling of sensitive data in memory
22//!
23//! ## Integration
24//!
25//! The pipeline service integrates with:
26//!
27//! - **Domain Layer**: Implements `PipelineService` trait
28//! - **Repository Layer**: Persists pipeline state and metadata
29//! - **Monitoring Systems**: Reports metrics and traces
30//! - **Configuration System**: Dynamic configuration updates
31
32use async_trait::async_trait;
33use byte_unit::Byte;
34use futures::future;
35use std::path::PathBuf;
36use std::sync::Arc;
37use tokio::sync::RwLock;
38use tracing::{debug, info, warn};
39
40use adaptive_pipeline_domain::aggregates::PipelineAggregate;
41use adaptive_pipeline_domain::entities::pipeline_stage::StageType;
42use adaptive_pipeline_domain::entities::{
43    Pipeline, PipelineStage, ProcessingContext, ProcessingMetrics, SecurityContext,
44};
45use adaptive_pipeline_domain::repositories::stage_executor::ResourceRequirements;
46use adaptive_pipeline_domain::repositories::{PipelineRepository, StageExecutor};
47use adaptive_pipeline_domain::services::file_io_service::{FileIOService, ReadOptions};
48use adaptive_pipeline_domain::services::file_processor_service::ChunkProcessor;
49use adaptive_pipeline_domain::services::{
50    CompressionService, EncryptionService, ExecutionRecord, ExecutionState, ExecutionStatus, KeyMaterial,
51    PipelineRequirements, PipelineService,
52};
53use adaptive_pipeline_domain::value_objects::{ChunkFormat, FileChunk, PipelineId, WorkerCount};
54use adaptive_pipeline_domain::PipelineError;
55
56use crate::infrastructure::services::binary_format::{BinaryFormatService, BinaryFormatWriter};
57use crate::infrastructure::services::progress_indicator::ProgressIndicatorService;
58
// Concrete implementation of the pipeline service
//
// The `ConcurrentPipeline` struct (declared in the "Public Implementation"
// section further down in this file) provides the main orchestration logic for
// the adaptive pipeline system, coordinating multiple services to process files
// through compression, encryption, and binary format operations.
64//
65// # Architecture
66//
67// The pipeline service acts as the central coordinator, managing:
68// - Service dependencies and their lifecycles
69// - Processing workflow orchestration
70// - Resource allocation and management
71// - Progress monitoring and reporting
72// - Error handling and recovery
73//
74// # Dependencies
75//
76// The service requires several injected dependencies:
77// - **Compression Service**: Handles data compression operations
78// - **Encryption Service**: Manages encryption and key operations
79// - **Binary Format Service**: Creates and manages .adapipe format files
80// - **Pipeline Repository**: Persists pipeline state and metadata
81// - **Stage Executor**: Executes individual pipeline stages
82// - **Metrics Service**: Collects and reports performance metrics
83// - **Progress Indicator**: Provides real-time progress updates
84//
85// # Examples
86// ============================================================================
87// Channel-Based Pipeline Architecture
88// ============================================================================
89// Educational: This section implements the three-stage execution pipeline
90// (Reader → CPU Workers → Writer) using channels for communication.
91//
92// See: docs/EXECUTION_VS_PROCESSING_PIPELINES.md for architectural overview
93
94/// Message sent from Reader task to CPU Worker tasks
95///
96/// ## Educational: Channel Message Design
97///
98/// This represents a unit of work flowing through the execution pipeline.
99/// The reader sends these messages to workers, who process them and forward
100/// results to the writer.
101///
102/// ## Design Rationale:
103/// - `chunk_index`: Required for ordered writes (future enhancement)
104/// - `data`: Owned Vec<u8> for zero-copy channel transfer
105/// - `is_final`: Allows writer to finalize file on last chunk
106/// - `enqueued_at`: Timestamp for queue wait metrics
107#[derive(Debug)]
108struct ChunkMessage {
109    /// Index of this chunk in the file (0-based)
110    chunk_index: usize,
111
112    /// Raw chunk data (owned for channel transfer)
113    data: Vec<u8>,
114
115    /// True if this is the last chunk in the file
116    is_final: bool,
117
118    /// Original file chunk with metadata
119    file_chunk: FileChunk,
120
121    /// Timestamp when message was enqueued (for queue wait metrics)
122    enqueued_at: std::time::Instant,
123}
124
/// Result of running one chunk through every processing stage, shaped for
/// hand-off from a CPU worker task to a writer task.
///
/// ## Educational: Processing Result
///
/// The message carries only what a writer needs: where the chunk belongs, the
/// bytes to persist, and whether the file can be finalized afterwards.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type — the CPU worker writes through `BinaryFormatWriter` directly.
/// Confirm whether it is still needed.
#[derive(Debug)]
struct ProcessedChunkMessage {
    /// Zero-based position of the chunk within the file.
    chunk_index: usize,

    /// Bytes produced by the final pipeline stage.
    processed_data: Vec<u8>,

    /// `true` when this is the last chunk of the file.
    is_final: bool,
}
147
/// Summary returned by the reader task once every chunk has been sent.
#[derive(Debug)]
struct ReaderStats {
    /// Number of chunks obtained from the file I/O service.
    chunks_read: usize,
    /// Sum of the byte lengths of all chunks that were sent.
    bytes_read: u64,
}
154
/// Summary returned by a single CPU worker task when its channel closes.
#[derive(Debug)]
struct WorkerStats {
    /// Identifier the worker was started with.
    worker_id: usize,
    /// Number of chunks this worker processed and wrote.
    chunks_processed: usize,
}
161
/// Summary describing the work done by a writer task.
///
/// NOTE(review): no writer task is visible in this part of the file — confirm
/// where this type is produced.
#[derive(Debug)]
struct WriterStats {
    /// Number of chunks written to the output.
    chunks_written: usize,
    /// Total number of bytes written to the output.
    bytes_written: u64,
}
168
169// ============================================================================
170// Pipeline Task Implementations
171// ============================================================================
172
173/// Reader Task - Stage 1 of Execution Pipeline
174///
175/// ## Educational: Single Reader Pattern
176///
177/// This task demonstrates the "single reader" pattern, which eliminates
178/// coordination overhead. Only ONE task reads from disk, ensuring sequential
179/// access patterns optimal for filesystem performance.
180///
181/// ## Backpressure Mechanism
182///
183/// The bounded channel creates natural backpressure:
184/// - When workers are fast: Channel stays empty, reader proceeds immediately
185/// - When workers are slow: Channel fills up, `tx_cpu.send()` blocks
186/// - Result: Automatic flow control without explicit rate limiting!
187///
188/// ## Arguments
189/// - `input_path`: File to read chunks from
190/// - `chunk_size`: Size of each chunk in bytes
191/// - `tx_cpu`: Channel sender to CPU workers (blocks when full)
192/// - `file_io_service`: Service for reading file chunks
193/// - `cancel_token`: Token for graceful cancellation
194///
195/// ## Returns
196/// `ReaderStats` with chunks read and bytes read
197async fn reader_task(
198    input_path: PathBuf,
199    chunk_size: usize,
200    tx_cpu: tokio::sync::mpsc::Sender<ChunkMessage>,
201    file_io_service: Arc<dyn FileIOService>,
202    channel_capacity: usize,
203    cancel_token: adaptive_pipeline_bootstrap::shutdown::CancellationToken,
204) -> Result<ReaderStats, PipelineError> {
205    use crate::infrastructure::metrics::CONCURRENCY_METRICS;
206
207    // Check for cancellation before starting
208    if cancel_token.is_cancelled() {
209        return Err(PipelineError::cancelled());
210    }
211
212    // Configure read options for streaming
213    let read_options = ReadOptions {
214        chunk_size: Some(chunk_size),
215        use_memory_mapping: false,  // Stream from disk, don't load all into memory
216        calculate_checksums: false, // We'll calculate during processing
217        ..Default::default()
218    };
219
220    // Read file into chunks using FileIOService
221    let read_result = file_io_service
222        .read_file_chunks(&input_path, read_options)
223        .await
224        .map_err(|e| PipelineError::IoError(format!("Failed to read file chunks: {}", e)))?;
225
226    let total_chunks = read_result.chunks.len();
227    let mut bytes_read = 0u64;
228
229    // Send each chunk to CPU workers
230    for (index, file_chunk) in read_result.chunks.into_iter().enumerate() {
231        let chunk_data = file_chunk.data().to_vec();
232        let chunk_size_bytes = chunk_data.len() as u64;
233        bytes_read += chunk_size_bytes;
234
235        let message = ChunkMessage {
236            chunk_index: index,
237            data: chunk_data,
238            is_final: index == total_chunks - 1,
239            file_chunk,
240            enqueued_at: std::time::Instant::now(), // Timestamp for queue wait
241        };
242
243        // Educational: This blocks if channel is full → backpressure!
244        // When workers are processing slowly, the reader waits here,
245        // preventing memory overload from reading too far ahead.
246        // Also cancellable for graceful shutdown.
247        tokio::select! {
248            _ = cancel_token.cancelled() => {
249                return Err(PipelineError::cancelled_with_msg("reader cancelled during send"));
250            }
251            send_result = tx_cpu.send(message) => {
252                send_result.map_err(|_e| PipelineError::io_error("CPU worker channel closed unexpectedly"))?;
253            }
254        }
255
256        // Update queue depth metrics after send
257        // Educational: Shows backpressure in real-time
258        let remaining_capacity = tx_cpu.capacity();
259        let current_depth = channel_capacity.saturating_sub(remaining_capacity);
260        CONCURRENCY_METRICS.update_cpu_queue_depth(current_depth);
261    }
262
263    // Educational: Dropping tx_cpu signals "no more chunks" to workers
264    // Workers receive None from rx_cpu.recv() and gracefully shut down
265    drop(tx_cpu);
266
267    Ok(ReaderStats {
268        chunks_read: total_chunks,
269        bytes_read,
270    })
271}
272
273/// Context for CPU worker tasks
274///
275/// Groups related parameters to avoid excessive function arguments
276struct CpuWorkerContext {
277    writer: Arc<dyn BinaryFormatWriter>,
278    pipeline: Arc<Pipeline>,
279    stage_executor: Arc<dyn StageExecutor>,
280    input_path: PathBuf,
281    output_path: PathBuf,
282    input_size: u64,
283    security_context: SecurityContext,
284}
285
286/// CPU Worker Task - Stage 2 of Execution Pipeline
287///
288/// ## Educational: Worker Pool Pattern
289///
290/// Multiple instances of this task run concurrently, forming a worker pool.
291/// Each worker:
292/// 1. Receives chunks from shared channel (MPSC pattern)
293/// 2. Acquires global CPU token (prevents oversubscription)
294/// 3. Executes ALL processing stages sequentially for ONE chunk
295/// 4. Writes directly to shared writer using concurrent random-access writes
296///
297/// ## Execution vs Processing Pipeline
298///
299/// This is where the two pipelines intersect:
300/// - **Execution pipeline**: Concurrency management (receive → process → write)
301/// - **Processing pipeline**: Business logic (compress → encrypt → checksum)
302///
303/// See: docs/EXECUTION_VS_PROCESSING_PIPELINES.md for details
304///
305/// ## Arguments
306/// - `worker_id`: Unique identifier for this worker (for metrics/debugging)
307/// - `rx_cpu`: Channel receiver for chunks (shared among workers)
308/// - `ctx`: Context containing processing dependencies and file information
309///
310/// ## Returns
311/// `WorkerStats` with worker ID and chunks processed
312#[allow(dead_code)]
313async fn cpu_worker_task(
314    worker_id: usize,
315    mut rx_cpu: tokio::sync::mpsc::Receiver<ChunkMessage>,
316    ctx: CpuWorkerContext,
317) -> Result<WorkerStats, PipelineError> {
318    use crate::infrastructure::metrics::CONCURRENCY_METRICS;
319    use crate::infrastructure::runtime::RESOURCE_MANAGER;
320
321    let mut chunks_processed = 0;
322
323    // Educational: Worker loop - receive, process, write
324    while let Some(chunk_msg) = rx_cpu.recv().await {
325        // ===================================================
326        // EXECUTION PIPELINE: Resource acquisition
327        // ===================================================
328
329        // Acquire global CPU token to prevent oversubscription
330        let cpu_wait_start = std::time::Instant::now();
331        let _cpu_permit = RESOURCE_MANAGER
332            .acquire_cpu()
333            .await
334            .map_err(|e| PipelineError::resource_exhausted(format!("Failed to acquire CPU token: {}", e)))?;
335        let cpu_wait_duration = cpu_wait_start.elapsed();
336
337        CONCURRENCY_METRICS.record_cpu_wait(cpu_wait_duration);
338        CONCURRENCY_METRICS.worker_started();
339
340        // ===================================================
341        // PROCESSING PIPELINE: Business logic execution
342        // ===================================================
343
344        // Create local processing context for this chunk
345        let mut local_context = ProcessingContext::new(
346            ctx.input_size,
347            ctx.security_context.clone(),
348        );
349
350        // Execute each configured stage sequentially on this chunk
351        // Start with the FileChunk we received
352        let mut file_chunk = chunk_msg.file_chunk;
353
354        for stage in ctx.pipeline.stages() {
355            file_chunk = ctx
356                .stage_executor
357                .execute(stage, file_chunk, &mut local_context)
358                .await
359                .map_err(|e| PipelineError::processing_failed(format!("Stage execution failed: {}", e)))?;
360        }
361
362        // ===================================================
363        // EXECUTION PIPELINE: Direct concurrent write
364        // ===================================================
365
366        // Educational: No writer task! No mutex contention!
367        // Workers write directly using thread-safe random-access writes.
368        // Each write goes to a different file position, so they don't conflict.
369
370        // Prepare chunk for .adapipe file format
371        // Extract nonce from encrypted data if encryption was applied
372        // When encryption runs, it prepends a 12-byte nonce to the encrypted data
373        let (nonce, chunk_data) = if file_chunk.data().len() >= 12 {
374            // Check if this chunk was encrypted by looking at processing context
375            let is_encrypted = local_context
376                .metadata()
377                .get("encrypted")
378                .map(|v| v == "true")
379                .unwrap_or(false);
380
381            if is_encrypted {
382                // Extract nonce (first 12 bytes) from encrypted data
383                let mut nonce_array = [0u8; 12];
384                nonce_array.copy_from_slice(&file_chunk.data()[..12]);
385                (nonce_array, file_chunk.data()[12..].to_vec())
386            } else {
387                // No encryption: use zero nonce as placeholder, keep all data
388                ([0u8; 12], file_chunk.data().to_vec())
389            }
390        } else {
391            // Data too small to contain nonce: use zero nonce, keep all data
392            ([0u8; 12], file_chunk.data().to_vec())
393        };
394
395        // Convert processed FileChunk to ChunkFormat for binary format
396        let chunk_format = ChunkFormat::new(nonce, chunk_data);
397
398        // Direct concurrent write to calculated position
399        ctx.writer
400            .write_chunk_at_position(chunk_format, chunk_msg.chunk_index as u64)
401            .await?;
402
403        // Educational: CPU token released automatically (RAII drop)
404        CONCURRENCY_METRICS.worker_completed();
405        chunks_processed += 1;
406    }
407
408    Ok(WorkerStats {
409        worker_id,
410        chunks_processed,
411    })
412}
413
414// ============================================================================
415// Public Implementation
416// ============================================================================
417
418pub struct ConcurrentPipeline {
419    compression_service: Arc<dyn CompressionService>,
420    encryption_service: Arc<dyn EncryptionService>,
421    file_io_service: Arc<dyn FileIOService>,
422    pipeline_repository: Arc<dyn PipelineRepository>,
423    stage_executor: Arc<dyn StageExecutor>,
424    binary_format_service: Arc<dyn BinaryFormatService>,
425    active_pipelines: Arc<RwLock<std::collections::HashMap<String, PipelineAggregate>>>,
426}
427
428impl ConcurrentPipeline {
429    /// Creates a new pipeline service with injected dependencies
430    ///
431    /// # Arguments
432    /// * `compression_service` - Service for compression operations
433    /// * `encryption_service` - Service for encryption operations
434    /// * `file_io_service` - Service for file I/O operations
435    /// * `pipeline_repository` - Repository for pipeline persistence
436    /// * `stage_executor` - Executor for pipeline stages
437    /// * `binary_format_service` - Service for binary format operations
438    pub fn new(
439        compression_service: Arc<dyn CompressionService>,
440        encryption_service: Arc<dyn EncryptionService>,
441        file_io_service: Arc<dyn FileIOService>,
442        pipeline_repository: Arc<dyn PipelineRepository>,
443        stage_executor: Arc<dyn StageExecutor>,
444        binary_format_service: Arc<dyn BinaryFormatService>,
445    ) -> Self {
446        Self {
447            compression_service,
448            encryption_service,
449            file_io_service,
450            pipeline_repository,
451            stage_executor,
452            binary_format_service,
453            active_pipelines: Arc::new(RwLock::new(std::collections::HashMap::new())),
454        }
455    }
456
457    /// Processes a single chunk through a pipeline stage
458    async fn process_chunk_through_stage(
459        &self,
460        chunk: FileChunk,
461        stage: &PipelineStage,
462        context: &mut ProcessingContext,
463    ) -> Result<FileChunk, PipelineError> {
464        debug!("Processing chunk through stage: {}", stage.name());
465
466        match stage.stage_type() {
467            StageType::Compression => {
468                // Extract compression configuration from stage
469                let compression_config = self.extract_compression_config(stage)?;
470                self.compression_service
471                    .compress_chunk(chunk, &compression_config, context)
472            }
473            StageType::Encryption => {
474                let encryption_config = self.extract_encryption_config(stage)?;
475                // Generate a temporary key material for demonstration (NOT secure for
476                // production)
477                let key_material = KeyMaterial {
478                    key: vec![0u8; 32],   // 32-byte key
479                    nonce: vec![0u8; 12], // 12-byte nonce
480                    salt: vec![0u8; 32],  // 32-byte salt
481                    algorithm: encryption_config.algorithm.clone(),
482                    created_at: chrono::Utc::now(),
483                    expires_at: None,
484                };
485                self.encryption_service
486                    .encrypt_chunk(chunk, &encryption_config, &key_material, context)
487            }
488
489            StageType::Checksum => {
490                // For integrity checking, just pass through the chunk unchanged
491                // In a real implementation, this would calculate and verify checksums
492                Ok(chunk)
493            }
494            StageType::Transform => {
495                // For transform stages, delegate to the stage executor
496                self.stage_executor.execute(stage, chunk.clone(), context).await
497            }
498            StageType::PassThrough => {
499                // For custom stages, delegate to the stage executor
500                self.stage_executor.execute(stage, chunk.clone(), context).await
501            }
502        }
503    }
504
505    /// Extracts compression configuration from a pipeline stage
506    fn extract_compression_config(
507        &self,
508        stage: &PipelineStage,
509    ) -> Result<adaptive_pipeline_domain::services::CompressionConfig, PipelineError> {
510        let algorithm_str = stage.configuration().algorithm.as_str();
511        let algorithm = match algorithm_str {
512            "brotli" => adaptive_pipeline_domain::services::CompressionAlgorithm::Brotli,
513            "gzip" => adaptive_pipeline_domain::services::CompressionAlgorithm::Gzip,
514            "zstd" => adaptive_pipeline_domain::services::CompressionAlgorithm::Zstd,
515            "lz4" => adaptive_pipeline_domain::services::CompressionAlgorithm::Lz4,
516            _ => {
517                return Err(PipelineError::InvalidConfiguration(format!(
518                    "Unsupported compression algorithm: {}",
519                    algorithm_str
520                )));
521            }
522        };
523
524        // Extract compression level from parameters
525        let level = stage
526            .configuration()
527            .parameters
528            .get("level")
529            .and_then(|v| v.parse::<u32>().ok())
530            .map(|l| match l {
531                0..=3 => adaptive_pipeline_domain::services::CompressionLevel::Fast,
532                4..=6 => adaptive_pipeline_domain::services::CompressionLevel::Balanced,
533                7.. => adaptive_pipeline_domain::services::CompressionLevel::Best,
534            })
535            .unwrap_or(adaptive_pipeline_domain::services::CompressionLevel::Balanced);
536
537        Ok(adaptive_pipeline_domain::services::CompressionConfig {
538            algorithm,
539            level,
540            dictionary: None,
541            window_size: None,
542            parallel_processing: stage.configuration().parallel_processing,
543        })
544    }
545
546    /// Extracts encryption configuration from a pipeline stage
547    fn extract_encryption_config(
548        &self,
549        stage: &PipelineStage,
550    ) -> Result<adaptive_pipeline_domain::services::EncryptionConfig, PipelineError> {
551        let algorithm_str = stage.configuration().algorithm.as_str();
552        let algorithm = match algorithm_str {
553            "aes256-gcm" | "aes256gcm" => adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes256Gcm,
554            "chacha20-poly1305" | "chacha20poly1305" => {
555                adaptive_pipeline_domain::services::EncryptionAlgorithm::ChaCha20Poly1305
556            }
557            "aes128-gcm" | "aes128gcm" => adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes128Gcm,
558            "aes192-gcm" | "aes192gcm" => adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes192Gcm,
559            _ => {
560                return Err(PipelineError::InvalidConfiguration(format!(
561                    "Unsupported encryption algorithm: {}",
562                    algorithm_str
563                )));
564            }
565        };
566
567        let kdf = stage
568            .configuration()
569            .parameters
570            .get("kdf")
571            .map(|kdf_str| match kdf_str.as_str() {
572                "argon2" => adaptive_pipeline_domain::services::KeyDerivationFunction::Argon2,
573                "scrypt" => adaptive_pipeline_domain::services::KeyDerivationFunction::Scrypt,
574                "pbkdf2" => adaptive_pipeline_domain::services::KeyDerivationFunction::Pbkdf2,
575                _ => adaptive_pipeline_domain::services::KeyDerivationFunction::Argon2,
576            });
577
578        Ok(adaptive_pipeline_domain::services::EncryptionConfig {
579            algorithm,
580            key_derivation: kdf.unwrap_or(adaptive_pipeline_domain::services::KeyDerivationFunction::Argon2),
581            key_size: 32,             // Default to 256-bit keys
582            nonce_size: 12,           // Standard for AES-GCM
583            salt_size: 16,            // Standard salt size
584            iterations: 100_000,      // Default iterations for PBKDF2
585            memory_cost: Some(65536), // Default for Argon2
586            parallel_cost: Some(1),   // Default for Argon2
587            associated_data: None,    // No additional authenticated data by default
588        })
589    }
590
591    /// Updates processing metrics based on execution results
592    fn update_metrics(&self, context: &mut ProcessingContext, stage_name: &str, duration: std::time::Duration) {
593        let mut metrics = context.metrics().clone();
594
595        // Create new stage metrics with actual data
596        let mut stage_metrics =
597            adaptive_pipeline_domain::entities::processing_metrics::StageMetrics::new(stage_name.to_string());
598        stage_metrics.update(metrics.bytes_processed(), duration);
599        metrics.add_stage_metrics(stage_metrics);
600
601        context.update_metrics(metrics);
602    }
603}
604
605#[async_trait]
606impl PipelineService for ConcurrentPipeline {
607    async fn process_file(
608        &self,
609        input_path: &std::path::Path,
610        output_path: &std::path::Path,
611        context: adaptive_pipeline_domain::services::pipeline_service::ProcessFileContext,
612    ) -> Result<ProcessingMetrics, PipelineError> {
613        debug!(
614            "Processing file: {} -> {} with pipeline {} (.adapipe format)",
615            input_path.display(),
616            output_path.display(),
617            context.pipeline_id
618        );
619
620        let start_time = std::time::Instant::now();
621
622        // Load pipeline from repository using the provided PipelineId
623        let pipeline = self
624            .pipeline_repository
625            .find_by_id(context.pipeline_id.clone())
626            .await?
627            .ok_or_else(|| PipelineError::PipelineNotFound(context.pipeline_id.to_string()))?;
628
629        // Validate pipeline before execution
630        self.validate_pipeline(&pipeline).await?;
631
632        // Get file metadata first to determine optimal chunk size
633        let input_metadata = tokio::fs::metadata(input_path)
634            .await
635            .map_err(|e| PipelineError::IoError(e.to_string()))?;
636        let input_size = input_metadata.len();
637
638        // Calculate optimal chunk size based on file size
639        let chunk_size = adaptive_pipeline_domain::value_objects::ChunkSize::optimal_for_file_size(input_size).bytes();
640
641        // Use FileIOService to read file in chunks (streaming, memory-efficient)
642        // This avoids loading the entire file into memory
643        let read_options = adaptive_pipeline_domain::services::file_io_service::ReadOptions {
644            chunk_size: Some(chunk_size),
645            use_memory_mapping: false,  // Start with streaming; can optimize later
646            calculate_checksums: false, // We'll calculate overall checksum ourselves
647            ..Default::default()
648        };
649
650        let read_result = self.file_io_service.read_file_chunks(input_path, read_options).await?;
651
652        let input_chunks = read_result.chunks;
653
654        // Calculate original file checksum incrementally from chunks
655        // This way we don't need the entire file in memory
656        let original_checksum = {
657            let mut context = ring::digest::Context::new(&ring::digest::SHA256);
658            for chunk in &input_chunks {
659                context.update(chunk.data());
660            }
661            let digest = context.finish();
662            hex::encode(digest.as_ref())
663        };
664
665        debug!(
666            "Input file: {}, SHA256: {}",
667            Byte::from_u128(input_size as u128)
668                .unwrap_or_else(|| Byte::from_u64(0))
669                .get_appropriate_unit(byte_unit::UnitType::Decimal)
670                .to_string(),
671            original_checksum
672        );
673
674        // Create .adapipe file header
675        let mut header = adaptive_pipeline_domain::value_objects::FileHeader::new(
676            input_path
677                .file_name()
678                .and_then(|n| n.to_str())
679                .unwrap_or("unknown")
680                .to_string(),
681            input_size,
682            original_checksum.clone(),
683        );
684
685        // Add processing steps based on pipeline stages
686        for stage in pipeline.stages() {
687            debug!(
688                "Processing pipeline stage: name='{}', type='{:?}', algorithm='{}'",
689                stage.name(),
690                stage.stage_type(),
691                stage.configuration().algorithm
692            );
693            match stage.stage_type() {
694                adaptive_pipeline_domain::entities::pipeline_stage::StageType::Compression => {
695                    debug!("✅ Matched Compression stage: {}", stage.name());
696                    let config = self.extract_compression_config(stage)?;
697                    let algorithm_str = match config.algorithm {
698                        adaptive_pipeline_domain::services::CompressionAlgorithm::Brotli => "brotli",
699                        adaptive_pipeline_domain::services::CompressionAlgorithm::Gzip => "gzip",
700                        adaptive_pipeline_domain::services::CompressionAlgorithm::Zstd => "zstd",
701                        adaptive_pipeline_domain::services::CompressionAlgorithm::Lz4 => "lz4",
702                        adaptive_pipeline_domain::services::CompressionAlgorithm::Custom(ref name) => name.as_str(),
703                    };
704                    let level = match config.level {
705                        adaptive_pipeline_domain::services::CompressionLevel::Fastest => 1,
706                        adaptive_pipeline_domain::services::CompressionLevel::Fast => 3,
707                        adaptive_pipeline_domain::services::CompressionLevel::Balanced => 6,
708                        adaptive_pipeline_domain::services::CompressionLevel::Best => 9,
709                        adaptive_pipeline_domain::services::CompressionLevel::Custom(level) => level,
710                    };
711                    header = header.add_compression_step(algorithm_str, level);
712                }
713                adaptive_pipeline_domain::entities::pipeline_stage::StageType::Encryption => {
714                    debug!("✅ Matched Encryption stage: {}", stage.name());
715                    let config = self.extract_encryption_config(stage)?;
716                    let algorithm_str = match config.algorithm {
717                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes128Gcm => "aes128gcm",
718                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes192Gcm => "aes192gcm",
719                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Aes256Gcm => "aes256gcm",
720                        adaptive_pipeline_domain::services::EncryptionAlgorithm::ChaCha20Poly1305 => "chacha20poly1305",
721                        adaptive_pipeline_domain::services::EncryptionAlgorithm::Custom(ref name) => name.as_str(),
722                    };
723                    header = header.add_encryption_step(algorithm_str, "argon2", 32, 12);
724                }
725                adaptive_pipeline_domain::entities::pipeline_stage::StageType::Checksum => {
726                    debug!("✅ Matched Checksum stage: {}", stage.name());
727                    // Checksum stages use proper ProcessingStepType::Checksum
728                    header = header.add_checksum_step(stage.configuration().algorithm.as_str());
729                }
730                adaptive_pipeline_domain::entities::pipeline_stage::StageType::PassThrough => {
731                    debug!("✅ Matched PassThrough stage: {}", stage.name());
732                    // PassThrough stages use proper ProcessingStepType::PassThrough
733                    header = header.add_passthrough_step(stage.configuration().algorithm.as_str());
734                }
735                _ => {
736                    // Fallback for any unhandled stage types
737                    debug!(
738                        "⚠️ Unhandled stage type: name='{}', type='{:?}', algorithm='{}'",
739                        stage.name(),
740                        stage.stage_type(),
741                        stage.configuration().algorithm
742                    );
743                    header = header.add_custom_step(
744                        stage.name(),
745                        stage.configuration().algorithm.as_str(),
746                        stage.configuration().parameters.clone(),
747                    );
748                }
749            }
750        }
751
752        // Set chunk info and pipeline ID - chunk_size already calculated above
753        header = header
754            .with_chunk_info(chunk_size as u32, 0) // chunk_count will be updated later
755            .with_pipeline_id(context.pipeline_id.to_string());
756
757        // Clone security context before moving it into ProcessingContext
758        let security_context_for_tasks = context.security_context.clone();
759
760        let mut processing_context = ProcessingContext::new(
761            input_size,
762            context.security_context,
763        );
764
765        // Set input file checksum in metrics
766        {
767            let mut metrics = processing_context.metrics().clone();
768            metrics.set_input_file_info(input_size, Some(original_checksum.clone()));
769            processing_context.update_metrics(metrics);
770        }
771
772        // =============================================================================
773        // CHANNEL-BASED PIPELINE ARCHITECTURE
774        // =============================================================================
775        // This section implements the three-stage execution pipeline using channels
776        // for natural backpressure and lock-free concurrent writes.
777        //
778        // ARCHITECTURE:
779        // Reader Task → [Channel] → CPU Worker Pool → Direct Concurrent Writes
780        //
781        // KEY BENEFITS:
782        // 1. NO MUTEX CONTENTION: Workers write directly using thread-safe
783        //    random-access
784        // 2. NATURAL BACKPRESSURE: Bounded channels prevent memory overload
785        // 3. CLEAR SEPARATION: Reader/Workers have distinct responsibilities
786        // 4. OBSERVABLE: Channel depths reveal bottlenecks
787        // 5. SCALABLE: True parallel writes to non-overlapping file positions
788        //
789        // See: docs/EXECUTION_VS_PROCESSING_PIPELINES.md for architectural details
790
791        // STEP 1: Calculate total number of chunks
792        let total_chunks = (input_size as usize).div_ceil(chunk_size);
793
794        // STEP 2: Create thread-safe writer
795        // Writer uses &self for concurrent writes (no mutex on individual writes!)
796        // But we wrap in Arc for sharing, and Mutex is needed only for finalization
797        let binary_writer = self
798            .binary_format_service
799            .create_writer(output_path, header.clone())
800            .await?;
801        let writer_shared = Arc::new(binary_writer);
802
803        // Create progress indicator for this operation
804        let progress_indicator = Arc::new(ProgressIndicatorService::new(total_chunks as u64));
805
806        // STEP 3: Determine worker count (adaptive or user-specified)
807        let available_cores = std::thread::available_parallelism().map(|n| n.get()).unwrap_or(4);
808        let is_cpu_intensive = pipeline.stages().iter().any(|stage| {
809            matches!(stage.stage_type(), StageType::Checksum)
810                && (stage.name().contains("compression") || stage.name().contains("encryption"))
811        });
812
813        let optimal_worker_count =
814            WorkerCount::optimal_for_processing_type(input_size, available_cores, is_cpu_intensive);
815
816        let worker_count = if let Some(user_workers) = context.user_worker_override {
817            let validated = WorkerCount::validate_user_input(user_workers, available_cores, input_size);
818            match validated {
819                Ok(count) => {
820                    debug!("Using user-specified worker count: {} (validated)", count);
821                    count
822                }
823                Err(warning) => {
824                    warn!(
825                        "User worker count invalid: {}. Using adaptive: {}",
826                        warning,
827                        optimal_worker_count.count()
828                    );
829                    optimal_worker_count.count()
830                }
831            }
832        } else {
833            debug!("Using adaptive worker count: {}", optimal_worker_count.count());
834            optimal_worker_count.count()
835        };
836
837        debug!(
838            "Channel-based pipeline: {} workers for {} bytes ({})",
839            worker_count,
840            input_size,
841            WorkerCount::strategy_description(input_size)
842        );
843
844        // STEP 4: Create cancellation token for graceful shutdown
845        // Educational: Enables graceful cancellation of reader and worker tasks
846        // TODO: Wire this to global ShutdownCoordinator for Ctrl-C handling
847        let shutdown_coordinator =
848            adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator::new(std::time::Duration::from_secs(5));
849        let cancel_token = shutdown_coordinator.token();
850
851        // STEP 5: Create bounded channels for pipeline stages
852        // Educational: Channel depth creates backpressure to prevent memory overload
853        let channel_depth = context.channel_depth_override.unwrap_or(4);
854        debug!("Using channel depth: {}", channel_depth);
855        let (tx_cpu, rx_cpu) = tokio::sync::mpsc::channel::<ChunkMessage>(channel_depth);
856
857        // STEP 5: Wrap receiver in Arc<Mutex> for sharing among workers
858        // Educational: Multiple workers need to share ONE receiver (MPSC pattern)
859        // This adds some contention, but only on channel receive (not on writes!)
860        let rx_cpu_shared = Arc::new(tokio::sync::Mutex::new(rx_cpu));
861
862        // STEP 6: Spawn reader task
863        // Single reader streams chunks from disk to CPU workers
864        let reader_handle = tokio::spawn(reader_task(
865            input_path.to_path_buf(),
866            chunk_size,
867            tx_cpu,
868            self.file_io_service.clone(),
869            channel_depth,
870            cancel_token.clone(),
871        ));
872
873        // STEP 7: Spawn CPU worker pool
874        // Multiple workers receive chunks, process them, and write directly
875        let mut worker_handles = Vec::new();
876        let pipeline_arc = Arc::new(pipeline.clone());
877
878        for worker_id in 0..worker_count {
879            let rx_cpu_clone = rx_cpu_shared.clone();
880            let writer_clone = writer_shared.clone();
881            let pipeline_clone = pipeline_arc.clone();
882            let stage_executor_clone = self.stage_executor.clone();
883            let input_path_clone = input_path.to_path_buf();
884            let output_path_clone = output_path.to_path_buf();
885            let security_context_clone = security_context_for_tasks.clone();
886            let cancel_token_clone = cancel_token.clone();
887
888            // Each worker shares the receiver via Arc<Mutex>
889            let worker_handle = tokio::spawn(async move {
890                use crate::infrastructure::metrics::CONCURRENCY_METRICS;
891                use crate::infrastructure::runtime::RESOURCE_MANAGER;
892
893                let mut chunks_processed = 0;
894
895                loop {
896                    // Check for cancellation before receiving next chunk
897                    // Educational: Cancellation checked at loop boundary (not in hot path)
898                    // IMPORTANT: We hold the mutex across await in the receive - this is correct!
899                    // It ensures atomic receive from shared receiver (work-stealing pattern)
900                    #[allow(clippy::await_holding_lock)]
901                    let chunk_result = tokio::select! {
902                        _ = cancel_token_clone.cancelled() => {
903                            // Graceful shutdown: exit worker loop
904                            break;
905                        }
906                        // Lock receiver to get next chunk
907                        chunk_msg = async {
908                            let mut rx = rx_cpu_clone.lock().await;
909                            rx.recv().await
910                        } => chunk_msg,
911                    };
912
913                    match chunk_result {
914                        Some(chunk_msg) => {
915                            // Record queue wait time (time chunk spent in channel)
916                            // Educational: High wait times indicate worker saturation
917                            let queue_wait = chunk_msg.enqueued_at.elapsed();
918                            CONCURRENCY_METRICS.record_cpu_queue_wait(queue_wait);
919
920                            // Acquire global CPU token
921                            let cpu_wait_start = std::time::Instant::now();
922                            let _cpu_permit = RESOURCE_MANAGER.acquire_cpu().await.map_err(|e| {
923                                PipelineError::resource_exhausted(format!("Failed to acquire CPU token: {}", e))
924                            })?;
925                            let cpu_wait_duration = cpu_wait_start.elapsed();
926
927                            CONCURRENCY_METRICS.record_cpu_wait(cpu_wait_duration);
928                            CONCURRENCY_METRICS.worker_started();
929
930                            // Create local processing context
931                            let mut local_context = ProcessingContext::new(
932                                input_size,
933                                security_context_clone.clone(),
934                            );
935
936                            // Execute all processing stages
937                            let mut file_chunk = chunk_msg.file_chunk;
938                            for stage in pipeline_clone.stages() {
939                                file_chunk = stage_executor_clone
940                                    .execute(stage, file_chunk, &mut local_context)
941                                    .await
942                                    .map_err(|e| {
943                                        PipelineError::processing_failed(format!("Stage execution failed: {}", e))
944                                    })?;
945                            }
946
947                            // Prepare and write chunk
948                            // Extract nonce from encrypted data if encryption was applied
949                            let (nonce, chunk_data) = if file_chunk.data().len() >= 12 {
950                                let is_encrypted = local_context
951                                    .metadata()
952                                    .get("encrypted")
953                                    .map(|v| v == "true")
954                                    .unwrap_or(false);
955
956                                if is_encrypted {
957                                    let mut nonce_array = [0u8; 12];
958                                    nonce_array.copy_from_slice(&file_chunk.data()[..12]);
959                                    (nonce_array, file_chunk.data()[12..].to_vec())
960                                } else {
961                                    ([0u8; 12], file_chunk.data().to_vec())
962                                }
963                            } else {
964                                ([0u8; 12], file_chunk.data().to_vec())
965                            };
966
967                            let chunk_format = ChunkFormat::new(nonce, chunk_data);
968                            writer_clone
969                                .write_chunk_at_position(chunk_format, chunk_msg.chunk_index as u64)
970                                .await?;
971
972                            CONCURRENCY_METRICS.worker_completed();
973                            chunks_processed += 1;
974                        }
975                        None => {
976                            // Channel closed, exit
977                            break;
978                        }
979                    }
980                }
981
982                Ok::<WorkerStats, PipelineError>(WorkerStats {
983                    worker_id,
984                    chunks_processed,
985                })
986            });
987
988            worker_handles.push(worker_handle);
989        }
990
991        // =============================================================================
992        // STEP 7: WAIT FOR PIPELINE COMPLETION
993        // =============================================================================
994        // Reader → Workers all complete independently, coordinated by channels
995
996        // Wait for reader to finish
997        let reader_stats = reader_handle
998            .await
999            .map_err(|e| PipelineError::processing_failed(format!("Reader task failed: {}", e)))??;
1000
1001        debug!(
1002            "Reader completed: {} chunks read, {} bytes",
1003            reader_stats.chunks_read, reader_stats.bytes_read
1004        );
1005
1006        // Wait for all workers to complete
1007        let mut total_chunks_processed = 0;
1008        for (worker_id, worker_handle) in worker_handles.into_iter().enumerate() {
1009            let worker_stats = worker_handle
1010                .await
1011                .map_err(|e| PipelineError::processing_failed(format!("Worker {} failed: {}", worker_id, e)))??;
1012
1013            debug!(
1014                "Worker {} completed: {} chunks processed",
1015                worker_stats.worker_id, worker_stats.chunks_processed
1016            );
1017            total_chunks_processed += worker_stats.chunks_processed;
1018        }
1019
1020        // =============================================================================
1021        // STEP 8: FINALIZE WRITER
1022        // =============================================================================
1023        // All chunks written, now write footer and finalize
1024
1025        // Finalize writer using &self signature (works perfectly with Arc!)
1026        // Educational: No Arc::try_unwrap needed, just call finalize directly
1027        let _total_bytes_written = writer_shared.finalize(header).await?;
1028
1029        // =============================================================================
1030        // STEP 9: COLLECT METRICS AND COMPLETE
1031        // =============================================================================
1032
1033        // Calculate final metrics from task results
1034        let chunks_processed = total_chunks_processed as u64;
1035        let total_bytes_processed = reader_stats.bytes_read;
1036
1037        // Show completion summary to user
1038        let total_duration = start_time.elapsed();
1039        let throughput = (total_bytes_processed as f64) / total_duration.as_secs_f64() / (1024.0 * 1024.0); // MB/s
1040        progress_indicator
1041            .show_completion(total_bytes_processed, throughput, total_duration)
1042            .await;
1043
1044        // Get the final file size for metrics
1045        let total_output_bytes = tokio::fs::metadata(output_path)
1046            .await
1047            .map_err(|e| PipelineError::io_error(format!("Failed to get output file size: {}", e)))?
1048            .len();
1049
1050        // Record metrics to Prometheus
1051        let mut processing_metrics = processing_context.metrics().clone();
1052        processing_metrics.start();
1053        processing_metrics.update_bytes_processed(total_bytes_processed);
1054        processing_metrics.update_chunks_processed(chunks_processed);
1055        processing_metrics.set_output_file_info(total_output_bytes, None);
1056        processing_metrics.end();
1057
1058        // Single concise completion log
1059        debug!(
1060            "Channel pipeline completed: {} chunks, {:.2} MB/s, {} → {} in {:?}",
1061            chunks_processed,
1062            throughput,
1063            Byte::from_u128(total_bytes_processed as u128)
1064                .unwrap_or_else(|| Byte::from_u64(0))
1065                .get_appropriate_unit(byte_unit::UnitType::Decimal),
1066            Byte::from_u128(total_output_bytes as u128)
1067                .unwrap_or_else(|| Byte::from_u64(0))
1068                .get_appropriate_unit(byte_unit::UnitType::Decimal),
1069            total_duration
1070        );
1071
1072        // Create and return processing metrics
1073        let mut metrics = processing_context.metrics().clone();
1074        metrics.start();
1075        metrics.update_bytes_processed(total_bytes_processed);
1076        metrics.update_chunks_processed(chunks_processed);
1077
1078        // Calculate output file checksum
1079        let output_checksum = {
1080            let output_data = tokio::fs::read(output_path)
1081                .await
1082                .map_err(|e| PipelineError::io_error(e.to_string()))?;
1083            let digest = ring::digest::digest(&ring::digest::SHA256, &output_data);
1084            hex::encode(digest.as_ref())
1085        };
1086
1087        // Set the actual output file size and checksum
1088        metrics.set_output_file_info(total_output_bytes, Some(output_checksum));
1089        metrics.end();
1090
1091        // Notify observer that processing completed with final metrics
1092        if let Some(obs) = &context.observer {
1093            obs.on_processing_completed(total_duration, Some(&metrics)).await;
1094        }
1095
1096        Ok(metrics)
1097    }
1098
1099    async fn process_chunks(
1100        &self,
1101        pipeline: &Pipeline,
1102        chunks: Vec<FileChunk>,
1103        context: &mut ProcessingContext,
1104    ) -> Result<Vec<FileChunk>, PipelineError> {
1105        let mut processed_chunks = chunks;
1106
1107        for stage in pipeline.stages() {
1108            info!("Processing through stage: {}", stage.name());
1109            let stage_start = std::time::Instant::now();
1110
1111            // Process chunks in parallel within this stage
1112            // Note: Each chunk gets a cloned context since we're processing in parallel
1113            let futures: Vec<_> = processed_chunks
1114                .into_iter()
1115                .map(|chunk| {
1116                    let mut ctx = context.clone();
1117                    async move { self.process_chunk_through_stage(chunk, stage, &mut ctx).await }
1118                })
1119                .collect();
1120
1121            processed_chunks = future::try_join_all(futures).await?;
1122
1123            let stage_duration = stage_start.elapsed();
1124            self.update_metrics(context, stage.name(), stage_duration);
1125
1126            info!("Completed stage {} in {:?}", stage.name(), stage_duration);
1127        }
1128
1129        Ok(processed_chunks)
1130    }
1131
1132    async fn validate_pipeline(&self, pipeline: &Pipeline) -> Result<(), PipelineError> {
1133        debug!("Validating pipeline: {}", pipeline.id());
1134
1135        // Check if pipeline has stages
1136        if pipeline.stages().is_empty() {
1137            return Err(PipelineError::InvalidConfiguration(
1138                "Pipeline has no stages".to_string(),
1139            ));
1140        }
1141
1142        // Validate each stage
1143        for stage in pipeline.stages() {
1144            // Check stage configuration
1145            if stage.configuration().algorithm.is_empty() {
1146                return Err(PipelineError::InvalidConfiguration(format!(
1147                    "Stage '{}' has no algorithm specified",
1148                    stage.name()
1149                )));
1150            }
1151
1152            // Check stage compatibility
1153            if let Err(e) = stage.validate() {
1154                return Err(PipelineError::InvalidConfiguration(format!(
1155                    "Stage '{}' validation failed: {}",
1156                    stage.name(),
1157                    e
1158                )));
1159            }
1160        }
1161
1162        // Validate stage ordering (PreBinary must come before PostBinary)
1163        debug!("Validating stage ordering...");
1164        self.stage_executor.validate_stage_ordering(pipeline.stages()).await?;
1165
1166        debug!("Pipeline validation passed");
1167        Ok(())
1168    }
1169
1170    async fn estimate_processing_time(
1171        &self,
1172        pipeline: &Pipeline,
1173        file_size: u64,
1174    ) -> Result<std::time::Duration, PipelineError> {
1175        let mut total_seconds = 0.0;
1176        let file_size_mb = (file_size as f64) / (1024.0 * 1024.0);
1177
1178        for stage in pipeline.stages() {
1179            // Estimate based on stage type and file size
1180            let stage_seconds = match stage.stage_type() {
1181                adaptive_pipeline_domain::entities::StageType::Compression => file_size_mb / 50.0, // 50 MB/s
1182                adaptive_pipeline_domain::entities::StageType::Encryption => file_size_mb / 100.0, // 100 MB/s
1183                _ => file_size_mb / 200.0,
1184                /* 200 MB/s for other
1185                 * operations */
1186            };
1187            total_seconds += stage_seconds;
1188        }
1189
1190        Ok(std::time::Duration::from_secs_f64(total_seconds))
1191    }
1192
1193    async fn get_resource_requirements(
1194        &self,
1195        pipeline: &Pipeline,
1196        file_size: u64,
1197    ) -> Result<ResourceRequirements, PipelineError> {
1198        let mut total_memory_mb = 0.0;
1199        let mut total_cpu_cores = 0;
1200        let mut estimated_time_seconds = 0.0;
1201
1202        for stage in pipeline.stages() {
1203            // Estimate memory usage based on stage type and chunk size
1204            let chunk_size = stage.configuration().chunk_size.unwrap_or(1024 * 1024) as f64;
1205            let stage_memory = (chunk_size / (1024.0 * 1024.0)) * 2.0; // Estimate 2x chunk size for processing
1206            total_memory_mb += stage_memory;
1207
1208            // Estimate CPU cores needed
1209            if stage.configuration().parallel_processing {
1210                total_cpu_cores = total_cpu_cores.max(4); // Assume 4 cores for
1211                                                          // parallel stages
1212            } else {
1213                total_cpu_cores = total_cpu_cores.max(1);
1214            }
1215
1216            // Estimate processing time
1217            let throughput_mbps = match stage.stage_type() {
1218                adaptive_pipeline_domain::entities::StageType::Compression => 50.0,
1219                adaptive_pipeline_domain::entities::StageType::Encryption => 100.0,
1220                _ => 200.0,
1221            };
1222
1223            let file_size_mb = (file_size as f64) / (1024.0 * 1024.0);
1224            estimated_time_seconds += file_size_mb / throughput_mbps;
1225        }
1226
1227        Ok(ResourceRequirements {
1228            memory_bytes: (total_memory_mb * 1024.0 * 1024.0) as u64,
1229            cpu_cores: total_cpu_cores,
1230            disk_space_bytes: ((file_size as f64) * 2.0) as u64, // Estimate 2x file size
1231            network_bandwidth_bps: None,                         // Not applicable for local processing
1232            gpu_memory_bytes: None,                              // Not implemented yet
1233            estimated_duration: std::time::Duration::from_secs_f64(estimated_time_seconds),
1234        })
1235    }
1236
1237    async fn create_optimized_pipeline(
1238        &self,
1239        file_path: &std::path::Path,
1240        requirements: PipelineRequirements,
1241    ) -> Result<Pipeline, PipelineError> {
1242        let file_extension = file_path.extension().and_then(|ext| ext.to_str()).unwrap_or("");
1243
1244        let pipeline_name = format!("optimized_pipeline_{}", uuid::Uuid::new_v4());
1245        let mut stages = Vec::new();
1246
1247        // Add compression stage if requested
1248        if requirements.compression_required {
1249            let algorithm = match file_extension {
1250                "txt" | "log" | "csv" | "json" | "xml" | "html" => "brotli",
1251                "bin" | "exe" | "dll" => "zstd",
1252                _ => "brotli", // Default to Brotli
1253            };
1254
1255            let compression_config = adaptive_pipeline_domain::entities::StageConfiguration {
1256                algorithm: algorithm.to_string(),
1257                operation: adaptive_pipeline_domain::entities::Operation::Forward,
1258                parameters: std::collections::HashMap::new(),
1259                parallel_processing: requirements.parallel_processing,
1260                chunk_size: Some(1024 * 1024), // Default 1MB chunks
1261            };
1262
1263            let compression_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
1264                "compression".to_string(),
1265                adaptive_pipeline_domain::entities::StageType::Compression,
1266                compression_config,
1267                stages.len() as u32,
1268            )?;
1269
1270            stages.push(compression_stage);
1271        }
1272
1273        // Add encryption stage if requested
1274        if requirements.encryption_required {
1275            let encryption_config = adaptive_pipeline_domain::entities::StageConfiguration {
1276                algorithm: "aes256-gcm".to_string(),
1277                operation: adaptive_pipeline_domain::entities::Operation::Forward,
1278                parameters: std::collections::HashMap::new(),
1279                parallel_processing: requirements.parallel_processing,
1280                chunk_size: Some(1024 * 1024), // Default 1MB chunks
1281            };
1282
1283            let encryption_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
1284                "encryption".to_string(),
1285                adaptive_pipeline_domain::entities::StageType::Encryption,
1286                encryption_config,
1287                stages.len() as u32,
1288            )?;
1289
1290            stages.push(encryption_stage);
1291        }
1292
1293        Pipeline::new(pipeline_name, stages)
1294    }
1295
1296    async fn monitor_execution(
1297        &self,
1298        pipeline_id: PipelineId,
1299        context: &ProcessingContext,
1300    ) -> Result<ExecutionStatus, PipelineError> {
1301        let active_pipelines = self.active_pipelines.read().await;
1302
1303        if let Some(_aggregate) = active_pipelines.get(&pipeline_id.to_string()) {
1304            Ok(ExecutionStatus {
1305                pipeline_id,
1306                status: ExecutionState::Running,
1307                progress_percentage: 0.0,
1308                bytes_processed: 0,
1309                bytes_total: 0,
1310                current_stage: Some("unknown".to_string()),
1311                estimated_remaining_time: None,
1312                error_count: 0,
1313                warning_count: 0,
1314                started_at: chrono::Utc::now(),
1315                updated_at: chrono::Utc::now(),
1316            })
1317        } else {
1318            Err(PipelineError::PipelineNotFound(pipeline_id.to_string()))
1319        }
1320    }
1321
1322    async fn pause_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError> {
1323        info!("Pipeline {} paused", pipeline_id);
1324        Ok(())
1325    }
1326
1327    async fn resume_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError> {
1328        info!("Pipeline {} resumed", pipeline_id);
1329        Ok(())
1330    }
1331
1332    async fn cancel_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError> {
1333        let mut active_pipelines = self.active_pipelines.write().await;
1334
1335        if active_pipelines.remove(&pipeline_id.to_string()).is_some() {
1336            info!("Pipeline {} cancelled", pipeline_id);
1337            Ok(())
1338        } else {
1339            Err(PipelineError::PipelineNotFound(pipeline_id.to_string()))
1340        }
1341    }
1342
1343    async fn get_execution_history(
1344        &self,
1345        pipeline_id: PipelineId,
1346        _limit: Option<usize>,
1347    ) -> Result<Vec<ExecutionRecord>, PipelineError> {
1348        // In a real implementation, this would query a database
1349        // For now, return empty history
1350        Ok(Vec::new())
1351    }
1352}
1353
1354/// ChunkProcessor implementation that processes chunks through a pipeline
1355pub struct PipelineChunkProcessor {
1356    pipeline: Pipeline,
1357    stage_executor: Arc<dyn StageExecutor>,
1358}
1359
1360impl PipelineChunkProcessor {
1361    pub fn new(pipeline: Pipeline, stage_executor: Arc<dyn StageExecutor>) -> Self {
1362        Self {
1363            pipeline,
1364            stage_executor,
1365        }
1366    }
1367}
1368
1369// NOTE: PipelineChunkProcessor cannot implement the sync ChunkProcessor trait
1370// because it coordinates async operations (stage_executor.execute is async).
1371// This is an application-level service that orchestrates multiple async
1372// operations, not a CPU-bound chunk processor.
1373//
// The ChunkProcessor trait is for sync, CPU-bound processing (compression,
// encryption, etc.). Pipeline orchestration involves async I/O and should use
// application-level patterns instead.
1377//
1378// TODO: If needed, create a separate async pipeline processing interface in the
1379// application layer.
1380
1381// Removed ChunkProcessor implementation - architectural mismatch
1382// impl ChunkProcessor for PipelineChunkProcessor {
1383//     ...
1384// }
1385// Orphaned methods removed (were part of ChunkProcessor trait implementation)
1386
1387#[cfg(test)]
1388mod tests {
1389    use super::*;
1390    use crate::infrastructure::adapters::{MultiAlgoCompression, MultiAlgoEncryption};
1391    use crate::infrastructure::repositories::sqlite_pipeline::SqlitePipelineRepository;
1392    use crate::infrastructure::runtime::stage_executor::BasicStageExecutor;
1393    use adaptive_pipeline_domain::entities::pipeline::Pipeline;
1394    use adaptive_pipeline_domain::entities::security_context::SecurityContext;
1395    use adaptive_pipeline_domain::value_objects::binary_file_format::{
1396        FileHeader, CURRENT_FORMAT_VERSION, MAGIC_BYTES,
1397    };
1398    use std::path::PathBuf;
1399    use tempfile::TempDir;
1400    use tokio::fs;
1401
1402    /// Tests pipeline creation for database operations.
1403    ///
1404    /// This test validates that pipelines can be created with proper
1405    /// configuration for database storage and retrieval operations,
1406    /// including stage creation and pipeline assembly.
1407    ///
1408    /// # Test Coverage
1409    ///
1410    /// - Pipeline stage creation with compression configuration
1411    /// - Pipeline assembly with multiple stages
1412    /// - Stage configuration validation
1413    /// - Pipeline metadata verification
1414    /// - Database-ready pipeline preparation
1415    ///
1416    /// # Test Scenario
1417    ///
1418    /// Creates a compression stage with specific configuration and
1419    /// assembles it into a pipeline suitable for database operations.
1420    ///
1421    /// # Infrastructure Concerns
1422    ///
1423    /// - Pipeline creation for database persistence
1424    /// - Stage configuration and validation
1425    /// - Pipeline metadata management
1426    /// - Database integration preparation
1427    ///
1428    /// # Assertions
1429    ///
1430    /// - Compression stage creation succeeds
1431    /// - Pipeline creation succeeds
1432    /// - Pipeline has non-empty name
1433    /// - Pipeline contains expected number of stages
1434    #[test]
1435    fn test_pipeline_creation_for_database() {
1436        // Test basic pipeline creation that would be used in database operations
1437        println!("Testing pipeline creation for database operations");
1438
1439        // Test that we can create a simple pipeline for database operations
1440        let compression_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
1441            "compression".to_string(),
1442            StageType::Compression,
1443            adaptive_pipeline_domain::entities::pipeline_stage::StageConfiguration {
1444                algorithm: "brotli".to_string(),
1445                operation: adaptive_pipeline_domain::entities::Operation::Forward,
1446                parameters: std::collections::HashMap::new(),
1447                parallel_processing: false,
1448                chunk_size: Some(1024),
1449            },
1450            1,
1451        )
1452        .unwrap();
1453        println!("✅ Created compression stage");
1454
1455        // Just test that we can create a pipeline - no complex assertions
1456        let test_pipeline = Pipeline::new("test-database-integration".to_string(), vec![compression_stage]).unwrap();
1457        println!("✅ Created test pipeline with {} stages", test_pipeline.stages().len());
1458
1459        // Basic sanity checks
1460        assert!(!test_pipeline.name().is_empty());
1461        assert!(!test_pipeline.stages().is_empty());
1462
1463        println!("✅ Pipeline creation test passed!");
1464    }
1465
1466    /// Tests database path handling and URL generation.
1467    ///
1468    /// This test validates that the service can properly handle
1469    /// database file paths, generate SQLite connection URLs,
1470    /// and prepare pipelines for database operations.
1471    ///
1472    /// # Test Coverage
1473    ///
1474    /// - Temporary database file creation
1475    /// - SQLite URL generation and formatting
1476    /// - Database schema file loading
1477    /// - Pipeline creation for database operations
1478    /// - Database preparation validation
1479    ///
1480    /// # Test Scenario
1481    ///
1482    /// Creates a temporary database file, generates a SQLite URL,
1483    /// loads the database schema, and creates a pipeline ready
1484    /// for database operations.
1485    ///
1486    /// # Infrastructure Concerns
1487    ///
1488    /// - Database file path management
1489    /// - SQLite connection URL formatting
1490    /// - Database schema loading and validation
1491    /// - Pipeline-database integration preparation
1492    ///
1493    /// # Assertions
1494    ///
1495    /// - Database URL has correct SQLite prefix
1496    /// - URL contains expected database filename
1497    /// - Schema file loads successfully
1498    /// - Schema contains CREATE TABLE statements
1499    /// - Pipeline is created and ready for database operations
1500    #[test]
1501    fn test_database_path_and_url_generation() {
1502        // Test database path handling and URL generation without async operations
1503        println!("Testing database path and URL generation");
1504
1505        // Create temporary directory for test files
1506        let temp_dir = TempDir::new().unwrap();
1507        let db_file = temp_dir.path().join("test_pipeline.db");
1508        let db_path = db_file.to_str().unwrap();
1509
1510        println!("📁 Creating temporary database path: {}", db_path);
1511
1512        // Test database URL generation
1513        let database_url = format!("sqlite:{}", db_path);
1514        println!("🔗 Generated database URL: {}", database_url);
1515
1516        // Verify URL format
1517        assert!(database_url.starts_with("sqlite:"));
1518        assert!(database_url.contains("test_pipeline.db"));
1519
1520        // Test that we can read the schema file
1521        let schema_sql = include_str!("../../../scripts/test_data/create_fresh_structured_database.sql");
1522        println!("📝 Schema file loaded: {} characters", schema_sql.len());
1523        assert!(!schema_sql.is_empty());
1524        assert!(schema_sql.contains("CREATE TABLE"));
1525
1526        // Test pipeline creation for database operations
1527        let compression_stage = adaptive_pipeline_domain::entities::PipelineStage::new(
1528            "compression".to_string(),
1529            StageType::Compression,
1530            adaptive_pipeline_domain::entities::pipeline_stage::StageConfiguration {
1531                algorithm: "brotli".to_string(),
1532                operation: adaptive_pipeline_domain::entities::Operation::Forward,
1533                parameters: std::collections::HashMap::new(),
1534                parallel_processing: false,
1535                chunk_size: Some(1024),
1536            },
1537            1,
1538        )
1539        .unwrap();
1540
1541        let test_pipeline = Pipeline::new("test-database-operations".to_string(), vec![compression_stage]).unwrap();
1542        println!(
1543            "✅ Created test pipeline: {} with {} stages",
1544            test_pipeline.name(),
1545            test_pipeline.stages().len()
1546        );
1547
1548        // Verify pipeline is ready for database operations
1549        assert!(!test_pipeline.name().is_empty());
1550        assert!(!test_pipeline.stages().is_empty());
1551        assert!(!test_pipeline.id().to_string().is_empty());
1552
1553        println!("✅ Database preparation test passed!");
1554    }
1555
1556    /// Tests cancellation propagation to reader task.
1557    ///
1558    /// This test validates that when a cancellation token is triggered,
1559    /// the reader task stops gracefully and returns a cancellation error.
1560    ///
1561    /// # Test Coverage
1562    ///
1563    /// - Cancellation token creation and triggering
1564    /// - Reader task cancellation detection
1565    /// - Graceful shutdown of reader
1566    /// - Cancellation error propagation
1567    ///
1568    /// # Test Scenario
1569    ///
1570    /// 1. Create a test file with data
1571    /// 2. Start reader task with cancellation token
1572    /// 3. Trigger cancellation immediately
1573    /// 4. Verify reader stops with cancellation error
1574    #[tokio::test]
1575    async fn test_reader_task_cancellation() {
1576        use crate::infrastructure::adapters::file_io::TokioFileIO;
1577        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
1578        use adaptive_pipeline_domain::services::file_io_service::FileIOConfig;
1579        use std::time::Duration;
1580
1581        // Create test file
1582        let temp_dir = TempDir::new().unwrap();
1583        let input_file = temp_dir.path().join("test_input.txt");
1584        fs::write(&input_file, b"test data for cancellation").await.unwrap();
1585
1586        // Create channel and cancellation token
1587        let (tx, _rx) = tokio::sync::mpsc::channel(10);
1588        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
1589        let cancel_token = coordinator.token();
1590
1591        // Cancel immediately
1592        cancel_token.cancel();
1593
1594        // Start reader task (should detect cancellation and exit)
1595        let file_io = Arc::new(TokioFileIO::new(FileIOConfig::default())) as Arc<dyn FileIOService>;
1596        let result = reader_task(input_file, 1024, tx, file_io, 10, cancel_token).await;
1597
1598        // Verify cancellation error
1599        assert!(result.is_err());
1600        let err = result.unwrap_err();
1601        assert!(
1602            err.to_string().contains("cancel"),
1603            "Expected cancellation error, got: {}",
1604            err
1605        );
1606    }
1607
1608    /// Tests cancellation propagation during active processing.
1609    ///
1610    /// This test validates that when cancellation is triggered while
1611    /// processing is in progress, all tasks stop gracefully.
1612    ///
1613    /// # Test Coverage
1614    ///
1615    /// - Cancellation during active file processing
1616    /// - Graceful shutdown of reader and workers
1617    /// - Channel cleanup on cancellation
1618    /// - No resource leaks on cancellation
1619    ///
1620    /// # Test Scenario
1621    ///
1622    /// 1. Create a larger test file
1623    /// 2. Start processing with cancellation token
1624    /// 3. Trigger cancellation during processing
1625    /// 4. Verify all tasks stop gracefully
1626    #[tokio::test]
1627    async fn test_cancellation_during_processing() {
1628        use crate::infrastructure::adapters::file_io::TokioFileIO;
1629        use crate::infrastructure::runtime::{init_resource_manager, ResourceConfig};
1630        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
1631        use adaptive_pipeline_domain::services::file_io_service::FileIOConfig;
1632        use std::time::Duration;
1633
1634        // Initialize resource manager for test (required by CONCURRENCY_METRICS)
1635        let _ = init_resource_manager(ResourceConfig::default());
1636
1637        // Create a larger test file to ensure processing takes time
1638        let temp_dir = TempDir::new().unwrap();
1639        let input_file = temp_dir.path().join("large_input.txt");
1640        let test_data = vec![b'X'; 1024 * 100]; // 100KB
1641        fs::write(&input_file, &test_data).await.unwrap();
1642
1643        // Create channel and cancellation token
1644        let (tx, mut rx) = tokio::sync::mpsc::channel::<ChunkMessage>(5);
1645        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
1646        let cancel_token = coordinator.token();
1647        let cancel_clone = cancel_token.clone();
1648
1649        // Spawn reader task
1650        let file_io = Arc::new(TokioFileIO::new(FileIOConfig::default())) as Arc<dyn FileIOService>;
1651        let reader_handle =
1652            tokio::spawn(async move { reader_task(input_file, 1024, tx, file_io, 5, cancel_clone).await });
1653
1654        // Let some chunks be sent
1655        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1656
1657        // Trigger cancellation
1658        cancel_token.cancel();
1659
1660        // Reader should exit with cancellation error
1661        let reader_result = reader_handle.await.unwrap();
1662        assert!(reader_result.is_err());
1663
1664        // Channel should be closed (no more messages)
1665        // Drain any remaining messages
1666        while rx.try_recv().is_ok() {}
1667
1668        // Verify channel is now empty and closed
1669        assert!(rx.recv().await.is_none(), "Channel should be closed after cancellation");
1670    }
1671
1672    /// Tests that cancelled workers exit gracefully.
1673    ///
1674    /// This test validates that worker tasks respect cancellation
1675    /// and exit their processing loop cleanly.
1676    ///
1677    /// # Test Coverage
1678    ///
1679    /// - Worker task cancellation detection
1680    /// - Graceful exit from worker loop
1681    /// - No panics on cancellation
1682    /// - Resource cleanup in workers
1683    #[tokio::test]
1684    async fn test_worker_cancellation() {
1685        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
1686        use std::time::Duration;
1687
1688        // Create a channel that will receive chunks
1689        let (_tx, rx) = tokio::sync::mpsc::channel::<ChunkMessage>(10);
1690        let rx_shared = Arc::new(tokio::sync::Mutex::new(rx));
1691
1692        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
1693        let cancel_token = coordinator.token();
1694        let cancel_clone = cancel_token.clone();
1695
1696        // Spawn worker that will wait for chunks or cancellation
1697        let worker_handle = tokio::spawn(async move {
1698            loop {
1699                let mut rx_lock = rx_shared.lock().await;
1700
1701                tokio::select! {
1702                    _ = cancel_clone.cancelled() => {
1703                        // Graceful shutdown: exit worker loop
1704                        break;
1705                    }
1706                    _chunk_msg = rx_lock.recv() => {
1707                        continue;
1708                    }
1709                };
1710
1711                #[allow(unreachable_code)]
1712                {}
1713            }
1714            Ok::<(), PipelineError>(())
1715        });
1716
1717        // Give worker time to start
1718        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1719
1720        // Trigger cancellation
1721        cancel_token.cancel();
1722
1723        // Worker should exit cleanly
1724        let result = tokio::time::timeout(tokio::time::Duration::from_secs(1), worker_handle).await;
1725
1726        assert!(result.is_ok(), "Worker should exit within timeout");
1727        let worker_result = result.unwrap().unwrap();
1728        assert!(worker_result.is_ok(), "Worker should exit without error");
1729    }
1730
1731    /// Tests early cancellation before processing starts.
1732    ///
1733    /// This test validates that if cancellation is triggered before
1734    /// processing begins, the system detects it and aborts cleanly.
1735    ///
1736    /// # Test Coverage
1737    ///
1738    /// - Pre-processing cancellation detection
1739    /// - Early abort mechanism
1740    /// - No resource allocation on early cancel
1741    /// - Clean error propagation
1742    #[tokio::test]
1743    async fn test_early_cancellation_detection() {
1744        use crate::infrastructure::adapters::file_io::TokioFileIO;
1745        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
1746        use adaptive_pipeline_domain::services::file_io_service::FileIOConfig;
1747        use std::time::Duration;
1748
1749        let temp_dir = TempDir::new().unwrap();
1750        let input_file = temp_dir.path().join("input.txt");
1751        fs::write(&input_file, b"data").await.unwrap();
1752
1753        let (tx, _rx) = tokio::sync::mpsc::channel(10);
1754        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
1755        let cancel_token = coordinator.token();
1756
1757        // Cancel BEFORE starting any work
1758        cancel_token.cancel();
1759        assert!(cancel_token.is_cancelled(), "Token should be cancelled");
1760
1761        // Attempt to start reader
1762        let file_io = Arc::new(TokioFileIO::new(FileIOConfig::default())) as Arc<dyn FileIOService>;
1763        let result = reader_task(input_file, 1024, tx, file_io, 10, cancel_token).await;
1764
1765        // Should immediately return cancellation error
1766        assert!(result.is_err());
1767        assert!(result.unwrap_err().to_string().contains("cancel"));
1768    }
1769
1770    /// Tests cancellation token cloning and propagation.
1771    ///
1772    /// This test validates that cancellation tokens can be cloned
1773    /// and all clones observe the cancellation state.
1774    ///
1775    /// # Test Coverage
1776    ///
1777    /// - Token cloning behavior
1778    /// - Cancellation propagation to clones
1779    /// - Shared state consistency
1780    /// - Multiple task coordination
1781    #[tokio::test]
1782    async fn test_cancellation_token_propagation() {
1783        use adaptive_pipeline_bootstrap::shutdown::ShutdownCoordinator;
1784        use std::time::Duration;
1785
1786        let coordinator = ShutdownCoordinator::new(Duration::from_secs(5));
1787        let token = coordinator.token();
1788        let clone1 = token.clone();
1789        let clone2 = token.clone();
1790
1791        // None should be cancelled initially
1792        assert!(!token.is_cancelled());
1793        assert!(!clone1.is_cancelled());
1794        assert!(!clone2.is_cancelled());
1795
1796        // Cancel original
1797        token.cancel();
1798
1799        // All clones should see cancellation
1800        assert!(token.is_cancelled());
1801        assert!(clone1.is_cancelled());
1802        assert!(clone2.is_cancelled());
1803
1804        // All should unblock from cancelled()
1805        tokio::time::timeout(tokio::time::Duration::from_millis(100), clone1.cancelled())
1806            .await
1807            .unwrap();
1808
1809        tokio::time::timeout(tokio::time::Duration::from_millis(100), clone2.cancelled())
1810            .await
1811            .unwrap();
1812    }
1813}