// adaptive_pipeline_domain/services/pipeline_service.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Pipeline Service Interface
9//!
10//! This module defines the domain service interface for pipeline processing
11//! operations within the adaptive pipeline system. It provides abstractions for
12//! executing processing pipelines, managing pipeline lifecycle, and monitoring
13//! execution.
14//!
15//! ## Overview
16//!
17//! The pipeline service provides:
18//!
19//! - **Pipeline Execution**: Execute configured processing pipelines
20//! - **Lifecycle Management**: Manage pipeline creation, execution, and cleanup
21//! - **Progress Monitoring**: Real-time progress tracking and reporting
22//! - **Event Handling**: Comprehensive event system for pipeline operations
23//! - **Resource Management**: Efficient resource allocation and management
24//!
25//! ## Architecture
26//!
27//! The service follows Domain-Driven Design principles:
28//!
29//! - **Domain Interface**: `PipelineService` trait defines the contract
30//! - **Event System**: Observer pattern for pipeline events
31//! - **Processing Context**: Maintains state throughout pipeline execution
32//! - **Security Integration**: Integrated security context and validation
33//!
34//! ## Key Features
35//!
36//! ### Pipeline Execution
37//!
38//! - **Stage Orchestration**: Coordinate execution of pipeline stages
39//! - **Data Flow**: Manage data flow between processing stages
40//! - **Error Handling**: Comprehensive error handling and recovery
41//! - **Resource Allocation**: Intelligent resource allocation and cleanup
42//!
43//! ### Progress Monitoring
44//!
45//! - **Real-time Updates**: Live progress updates during execution
46//! - **Performance Metrics**: Detailed performance metrics and statistics
47//! - **Event Notifications**: Comprehensive event system for monitoring
48//! - **Throughput Tracking**: Track processing throughput and efficiency
49//!
50//! ### Security Integration
51//!
52//! - **Security Context**: Integrated security context validation
53//! - **Access Control**: Enforce access control policies
54//! - **Audit Logging**: Comprehensive audit trail of operations
55//! - **Compliance**: Support for regulatory compliance requirements
56//!
//! ## Usage Examples
//!
//! Typical usage scenarios include basic pipeline execution via
//! `PipelineService::process_file`, execution with a progress observer
//! attached through `ProcessFileContext::with_observer`, and batch
//! processing of multiple files. See the implementing crate for runnable
//! examples.
//!
68//! ## Event System
69//!
70//! ### Processing Observer
71//!
72//! The `ProcessingObserver` trait provides hooks for monitoring pipeline
73//! execution:
74//!
75//! - **Chunk Events**: Track individual chunk processing
76//! - **Progress Updates**: Receive periodic progress updates
77//! - **Lifecycle Events**: Monitor pipeline start and completion
78//! - **Error Events**: Handle processing errors and failures
79//!
80//! ### Event Types
81//!
82//! - **on_processing_started**: Called when pipeline execution begins
83//! - **on_chunk_started**: Called when a chunk begins processing
84//! - **on_chunk_completed**: Called when a chunk completes processing
85//! - **on_progress_update**: Called periodically with progress information
86//! - **on_processing_completed**: Called when pipeline execution completes
87//! - **on_error**: Called when errors occur during processing
88//!
89//! ## Processing Context
90//!
91//! ### Context Management
92//!
93//! - **State Tracking**: Track processing state throughout execution
94//! - **Resource Management**: Manage resources and cleanup
95//! - **Security Context**: Maintain security context and validation
96//! - **Metrics Collection**: Collect processing metrics and statistics
97//!
98//! ### Context Lifecycle
99//!
100//! - **Initialization**: Initialize context before processing
101//! - **Stage Transitions**: Update context between processing stages
102//! - **Error Handling**: Maintain context during error conditions
103//! - **Cleanup**: Clean up context after processing completion
104//!
105//! ## Security Integration
106//!
107//! ### Security Context
108//!
109//! - **Authentication**: Verify user authentication and authorization
110//! - **Access Control**: Enforce access control policies
111//! - **Audit Logging**: Log all security-relevant operations
112//! - **Compliance**: Support regulatory compliance requirements
113//!
114//! ### Security Validation
115//!
116//! - **Input Validation**: Validate all input parameters
117//! - **Permission Checks**: Verify permissions for file operations
118//! - **Resource Limits**: Enforce resource usage limits
119//! - **Threat Detection**: Detect and prevent security threats
120//!
121//! ## Error Handling
122//!
123//! ### Error Categories
124//!
125//! - **Configuration Errors**: Invalid pipeline configuration
126//! - **Processing Errors**: Errors during pipeline execution
127//! - **Security Errors**: Security violations and access denials
128//! - **Resource Errors**: Resource exhaustion and allocation failures
129//!
130//! ### Recovery Strategies
131//!
132//! - **Retry Logic**: Automatic retry for transient failures
133//! - **Fallback Processing**: Alternative processing strategies
134//! - **Partial Results**: Return partial results when possible
135//! - **Graceful Degradation**: Graceful handling of service failures
136//!
137//! ## Performance Considerations
138//!
139//! ### Execution Optimization
140//!
141//! - **Parallel Processing**: Parallel execution of pipeline stages
142//! - **Resource Pooling**: Efficient resource pooling and reuse
143//! - **Memory Management**: Optimized memory usage and cleanup
144//! - **I/O Optimization**: Efficient file I/O operations
145//!
146//! ### Monitoring and Metrics
147//!
148//! - **Performance Metrics**: Detailed performance monitoring
149//! - **Resource Usage**: Track resource utilization
150//! - **Bottleneck Detection**: Identify performance bottlenecks
151//! - **Optimization Recommendations**: Suggest performance improvements
152//!
153//! ## Integration
154//!
155//! The pipeline service integrates with:
156//!
157//! - **Pipeline Repository**: Load and manage pipeline configurations
158//! - **File Processor**: Execute file processing operations
159//! - **Security Service**: Validate security context and permissions
160//! - **Metrics Service**: Report processing metrics and statistics
161//!
162//! ## Thread Safety
163//!
164//! The service interface is designed for thread safety:
165//!
166//! - **Concurrent Execution**: Safe concurrent pipeline execution
167//! - **Shared Resources**: Safe sharing of pipeline resources
168//! - **State Management**: Thread-safe state management
169//!
170//! ## Future Enhancements
171//!
172//! Planned enhancements include:
173//!
174//! - **Distributed Execution**: Support for distributed pipeline execution
175//! - **Dynamic Scaling**: Automatic scaling based on workload
176//! - **Advanced Scheduling**: Sophisticated pipeline scheduling
177//! - **Machine Learning**: ML-based optimization and prediction
178
179use crate::entities::security_context::SecurityLevel;
180use crate::entities::{Pipeline, ProcessingContext, SecurityContext};
181use crate::repositories::stage_executor::ResourceRequirements;
182use crate::services::datetime_serde;
183use crate::value_objects::{FileChunk, PipelineId};
184use crate::{PipelineError, ProcessingMetrics};
185use async_trait::async_trait;
186use serde::{Deserialize, Serialize};
187use std::path::Path;
188use std::sync::Arc;
189
190/// Observer trait for pipeline processing events
191///
192/// This trait defines the interface for observing pipeline processing events,
193/// enabling real-time monitoring, progress tracking, and event handling during
194/// pipeline execution.
195///
196/// # Key Features
197///
198/// - **Event Notifications**: Receive notifications for various pipeline events
199/// - **Progress Monitoring**: Track processing progress in real-time
200/// - **Performance Metrics**: Access detailed performance metrics
201/// - **Error Handling**: Handle processing errors and failures
202/// - **Lifecycle Management**: Monitor pipeline lifecycle events
203///
204/// # Event Types
205///
206/// - **Chunk Events**: Individual chunk processing start/completion
207/// - **Progress Events**: Periodic progress updates with throughput
208/// - **Lifecycle Events**: Pipeline start/completion events
209/// - **Error Events**: Processing errors and failure notifications
///
/// # Examples
///
/// All methods have no-op default implementations, so implementors
/// override only the events they care about.
212#[async_trait]
213pub trait ProcessingObserver: Send + Sync {
214    /// Called when a chunk starts processing
215    async fn on_chunk_started(&self, _chunk_id: u64, _size: usize) {}
216
217    /// Called when a chunk completes processing
218    async fn on_chunk_completed(&self, _chunk_id: u64, _duration: std::time::Duration) {}
219
220    /// Called periodically with progress updates
221    async fn on_progress_update(&self, _bytes_processed: u64, _total_bytes: u64, _throughput_mbps: f64) {}
222
223    /// Called when processing starts
224    async fn on_processing_started(&self, _total_bytes: u64) {}
225
226    /// Called when processing completes
227    async fn on_processing_completed(
228        &self,
229        _total_duration: std::time::Duration,
230        _final_metrics: Option<&ProcessingMetrics>,
231    ) {
232    }
233}
234
/// Configuration for processing a file through a pipeline
///
/// Groups related parameters to avoid excessive function arguments.
/// This context is passed to `PipelineService::process_file`. Build one
/// with `ProcessFileContext::new` and customize it with the `with_*`
/// builder methods.
#[derive(Clone)]
pub struct ProcessFileContext {
    /// Pipeline identifier
    pub pipeline_id: PipelineId,
    /// Security context for processing
    pub security_context: SecurityContext,
    /// Optional override for number of worker threads; the implementation
    /// chooses its own default when `None`
    pub user_worker_override: Option<usize>,
    /// Optional override for channel depth (presumably the bounded-queue
    /// size between pipeline stages — confirm with implementation)
    pub channel_depth_override: Option<usize>,
    /// Optional observer for progress tracking
    pub observer: Option<Arc<dyn ProcessingObserver>>,
}
252
253impl ProcessFileContext {
254    /// Creates a new process file context with the given pipeline ID and
255    /// security context
256    pub fn new(pipeline_id: PipelineId, security_context: SecurityContext) -> Self {
257        Self {
258            pipeline_id,
259            security_context,
260            user_worker_override: None,
261            channel_depth_override: None,
262            observer: None,
263        }
264    }
265
266    /// Sets the worker count override
267    pub fn with_workers(mut self, workers: usize) -> Self {
268        self.user_worker_override = Some(workers);
269        self
270    }
271
272    /// Sets the channel depth override
273    pub fn with_channel_depth(mut self, depth: usize) -> Self {
274        self.channel_depth_override = Some(depth);
275        self
276    }
277
278    /// Sets the progress observer
279    pub fn with_observer(mut self, observer: Arc<dyn ProcessingObserver>) -> Self {
280        self.observer = Some(observer);
281        self
282    }
283}
284
/// Domain service for pipeline operations
///
/// Defines the contract for executing, validating, monitoring, and
/// controlling processing pipelines. Implementations are provided outside
/// this module; all methods are async and return
/// `Result<_, PipelineError>`.
#[async_trait]
pub trait PipelineService: Send + Sync {
    /// Process a file through the pipeline
    ///
    /// Reads `input_path`, runs the data through the pipeline identified by
    /// `context.pipeline_id`, and writes the result to `output_path`.
    /// Worker count, channel depth, and progress observation can be tuned
    /// via `context`.
    ///
    /// # Errors
    /// Returns `PipelineError` if the pipeline cannot be resolved, a stage
    /// fails, or I/O fails.
    async fn process_file(
        &self,
        input_path: &Path,
        output_path: &Path,
        context: ProcessFileContext,
    ) -> Result<ProcessingMetrics, PipelineError>;

    /// Processes file chunks through a pipeline
    ///
    /// Transforms `chunks` through the pipeline's stages, updating
    /// `context` as processing proceeds, and returns the processed chunks.
    async fn process_chunks(
        &self,
        pipeline: &Pipeline,
        chunks: Vec<FileChunk>,
        context: &mut ProcessingContext,
    ) -> Result<Vec<FileChunk>, PipelineError>;

    /// Validates a pipeline configuration
    ///
    /// # Errors
    /// Returns `PipelineError` if the configuration is invalid.
    async fn validate_pipeline(&self, pipeline: &Pipeline) -> Result<(), PipelineError>;

    /// Estimates processing time for a pipeline
    ///
    /// Produces an estimate for processing `file_size` bytes; accuracy
    /// depends on the implementation's cost model.
    async fn estimate_processing_time(
        &self,
        pipeline: &Pipeline,
        file_size: u64,
    ) -> Result<std::time::Duration, PipelineError>;

    /// Gets resource requirements for a pipeline
    ///
    /// Reports the resources (see `ResourceRequirements`) needed to process
    /// `file_size` bytes through `pipeline`.
    async fn get_resource_requirements(
        &self,
        pipeline: &Pipeline,
        file_size: u64,
    ) -> Result<ResourceRequirements, PipelineError>;

    /// Creates an optimized pipeline for a file type
    ///
    /// Builds a pipeline tailored to `file_path` that satisfies
    /// `requirements`.
    async fn create_optimized_pipeline(
        &self,
        file_path: &Path,
        requirements: PipelineRequirements,
    ) -> Result<Pipeline, PipelineError>;

    /// Monitors pipeline execution
    ///
    /// Returns a point-in-time `ExecutionStatus` snapshot for the given
    /// pipeline and processing context.
    async fn monitor_execution(
        &self,
        pipeline_id: PipelineId,
        context: &ProcessingContext,
    ) -> Result<ExecutionStatus, PipelineError>;

    /// Pauses pipeline execution
    async fn pause_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError>;

    /// Resumes pipeline execution
    async fn resume_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError>;

    /// Cancels pipeline execution
    async fn cancel_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError>;

    /// Gets execution history for a pipeline
    ///
    /// Returns past execution records for `pipeline_id`, limited to `limit`
    /// entries when provided (ordering assumed most-recent-first — confirm
    /// with implementation).
    async fn get_execution_history(
        &self,
        pipeline_id: PipelineId,
        limit: Option<usize>,
    ) -> Result<Vec<ExecutionRecord>, PipelineError>;
}
351
/// Requirements for pipeline creation
///
/// Passed to `PipelineService::create_optimized_pipeline` to constrain the
/// generated pipeline. Construct via `Default` or the `with_*` builders.
#[derive(Debug, Clone)]
pub struct PipelineRequirements {
    /// Whether a compression stage is required
    pub compression_required: bool,
    /// Whether an encryption stage is required
    pub encryption_required: bool,
    /// Whether an integrity-verification stage is required
    pub integrity_required: bool,
    /// Optimization priority used when selecting stage algorithms
    pub performance_priority: PerformancePriority,
    /// Security level the pipeline must operate at
    pub security_level: SecurityLevel,
    /// Optional cap on memory usage (assumed bytes — confirm with
    /// implementation)
    pub max_memory_usage: Option<u64>,
    /// Optional cap on total processing time
    pub max_processing_time: Option<std::time::Duration>,
    /// Whether stages may process data in parallel
    pub parallel_processing: bool,
}
364
/// Performance priority levels
///
/// Guides trade-offs when building an optimized pipeline (see
/// `PipelineService::create_optimized_pipeline`).
///
/// `Eq`, `Copy`, and `Hash` are derived eagerly: the enum is fieldless, and
/// deriving `PartialEq` without `Eq` trips clippy's
/// `derive_partial_eq_without_eq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PerformancePriority {
    /// Favor raw throughput over compression ratio or security overhead
    Speed,
    /// Favor compression ratio
    Compression,
    /// Favor security guarantees
    Security,
    /// Balance speed, compression, and security
    Balanced,
}
373
/// Pipeline execution status
///
/// Point-in-time snapshot of a pipeline execution, suitable for progress
/// reporting and serialization. Updated from `ProcessingMetrics` via
/// `ExecutionStatus::update`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStatus {
    /// Pipeline being executed
    pub pipeline_id: PipelineId,
    /// Current execution state
    pub status: ExecutionState,
    /// Progress percentage as reported by `ProcessingMetrics`
    /// (presumably 0.0–100.0 — confirm with that type)
    pub progress_percentage: f64,
    /// Bytes processed so far
    pub bytes_processed: u64,
    /// Total bytes to process
    pub bytes_total: u64,
    /// Name of the stage currently executing, if any
    pub current_stage: Option<String>,
    /// Estimated time remaining, when it can be computed
    pub estimated_remaining_time: Option<std::time::Duration>,
    /// Number of errors observed so far
    pub error_count: u64,
    /// Number of warnings observed so far
    pub warning_count: u64,
    /// When this status was created (UTC)
    #[serde(with = "datetime_serde")]
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// When this snapshot was last updated (UTC)
    #[serde(with = "datetime_serde")]
    pub updated_at: chrono::DateTime<chrono::Utc>,
}
391
392/// Pipeline execution states
393#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
394pub enum ExecutionState {
395    Pending,
396    Running,
397    Paused,
398    Completed,
399    Failed,
400    Cancelled,
401}
402
/// Pipeline execution record
///
/// Historical record of a single pipeline execution, returned by
/// `PipelineService::get_execution_history`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
    /// Record identifier
    /// (NOTE(review): typed as `PipelineId` — a dedicated execution-ID type
    /// may be clearer; confirm intent)
    pub id: PipelineId,
    /// Pipeline that was executed
    pub pipeline_id: PipelineId,
    /// Input file path
    pub input_path: std::path::PathBuf,
    /// Output file path
    pub output_path: std::path::PathBuf,
    /// Final execution state
    pub status: ExecutionState,
    /// Metrics collected during the execution
    pub metrics: ProcessingMetrics,
    /// Error message when the execution failed
    pub error_message: Option<String>,
    /// When execution started (UTC)
    #[serde(with = "datetime_serde")]
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// When execution finished; `None` if it never completed
    #[serde(with = "datetime_serde::optional")]
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Security context under which the execution ran
    pub security_context: SecurityContext,
}
419
impl Default for PipelineRequirements {
    /// Returns permissive defaults: no compression, encryption, or
    /// integrity stages required; balanced performance priority; internal
    /// security level; no memory or time caps; parallel processing enabled.
    fn default() -> Self {
        Self {
            compression_required: false,
            encryption_required: false,
            integrity_required: false,
            performance_priority: PerformancePriority::Balanced,
            security_level: SecurityLevel::Internal,
            max_memory_usage: None,
            max_processing_time: None,
            parallel_processing: true,
        }
    }
}
434
435impl PipelineRequirements {
436    /// Creates new pipeline requirements
437    pub fn new() -> Self {
438        Self::default()
439    }
440
441    /// Sets compression requirement
442    pub fn with_compression(mut self, required: bool) -> Self {
443        self.compression_required = required;
444        self
445    }
446
447    /// Sets encryption requirement
448    pub fn with_encryption(mut self, required: bool) -> Self {
449        self.encryption_required = required;
450        self
451    }
452
453    /// Sets integrity requirement
454    pub fn with_integrity(mut self, required: bool) -> Self {
455        self.integrity_required = required;
456        self
457    }
458
459    /// Sets performance priority
460    pub fn with_performance_priority(mut self, priority: PerformancePriority) -> Self {
461        self.performance_priority = priority;
462        self
463    }
464
465    /// Sets security level
466    pub fn with_security_level(mut self, level: SecurityLevel) -> Self {
467        self.security_level = level;
468        self
469    }
470
471    /// Sets maximum memory usage
472    pub fn with_max_memory(mut self, max_memory: u64) -> Self {
473        self.max_memory_usage = Some(max_memory);
474        self
475    }
476
477    /// Sets maximum processing time
478    pub fn with_max_time(mut self, max_time: std::time::Duration) -> Self {
479        self.max_processing_time = Some(max_time);
480        self
481    }
482
483    /// Sets parallel processing
484    pub fn with_parallel_processing(mut self, enabled: bool) -> Self {
485        self.parallel_processing = enabled;
486        self
487    }
488}
489
490impl ExecutionStatus {
491    /// Creates new execution status
492    pub fn new(pipeline_id: PipelineId, bytes_total: u64) -> Self {
493        let now = chrono::Utc::now();
494        Self {
495            pipeline_id,
496            status: ExecutionState::Pending,
497            progress_percentage: 0.0,
498            bytes_processed: 0,
499            bytes_total,
500            current_stage: None,
501            estimated_remaining_time: None,
502            error_count: 0,
503            warning_count: 0,
504            started_at: now,
505            updated_at: now,
506        }
507    }
508
509    /// Updates the execution status
510    pub fn update(&mut self, metrics: &ProcessingMetrics, current_stage: Option<String>) {
511        self.bytes_processed = metrics.bytes_processed();
512        self.progress_percentage = metrics.progress_percentage();
513        self.current_stage = current_stage;
514        self.estimated_remaining_time = metrics.estimated_remaining_time();
515        self.error_count = metrics.error_count();
516        self.warning_count = metrics.warning_count();
517        self.updated_at = chrono::Utc::now();
518    }
519
520    /// Checks if execution is complete
521    pub fn is_complete(&self) -> bool {
522        matches!(
523            self.status,
524            ExecutionState::Completed | ExecutionState::Failed | ExecutionState::Cancelled
525        )
526    }
527
528    /// Checks if execution is active
529    pub fn is_active(&self) -> bool {
530        matches!(self.status, ExecutionState::Running | ExecutionState::Paused)
531    }
532}