adaptive_pipeline_domain/services/pipeline_service.rs
// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Pipeline Service Interface
//!
//! This module defines the domain service interface for pipeline processing
//! operations within the adaptive pipeline system. It provides abstractions for
//! executing processing pipelines, managing pipeline lifecycle, and monitoring
//! execution.
//!
//! ## Overview
//!
//! The pipeline service provides:
//!
//! - **Pipeline Execution**: Execute configured processing pipelines
//! - **Lifecycle Management**: Manage pipeline creation, execution, and cleanup
//! - **Progress Monitoring**: Real-time progress tracking and reporting
//! - **Event Handling**: Comprehensive event system for pipeline operations
//! - **Resource Management**: Efficient resource allocation and management
//!
//! ## Architecture
//!
//! The service follows Domain-Driven Design principles:
//!
//! - **Domain Interface**: `PipelineService` trait defines the contract
//! - **Event System**: Observer pattern for pipeline events
//! - **Processing Context**: Maintains state throughout pipeline execution
//! - **Security Integration**: Integrated security context and validation
//!
//! ## Key Features
//!
//! ### Pipeline Execution
//!
//! - **Stage Orchestration**: Coordinate execution of pipeline stages
//! - **Data Flow**: Manage data flow between processing stages
//! - **Error Handling**: Comprehensive error handling and recovery
//! - **Resource Allocation**: Intelligent resource allocation and cleanup
//!
//! ### Progress Monitoring
//!
//! - **Real-time Updates**: Live progress updates during execution
//! - **Performance Metrics**: Detailed performance metrics and statistics
//! - **Event Notifications**: Comprehensive event system for monitoring
//! - **Throughput Tracking**: Track processing throughput and efficiency
//!
//! ### Security Integration
//!
//! - **Security Context**: Integrated security context validation
//! - **Access Control**: Enforce access control policies
//! - **Audit Logging**: Comprehensive audit trail of operations
//! - **Compliance**: Support for regulatory compliance requirements
//!
//! ## Usage Examples
//!
//! ### Basic Pipeline Execution
//!
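//! A minimal sketch, assuming `service` implements [`PipelineService`] and
//! that `pipeline_id` and `security_context` are already in scope:
//!
//! ```rust,ignore
//! use std::path::Path;
//!
//! let context = ProcessFileContext::new(pipeline_id, security_context);
//! let metrics = service
//!     .process_file(Path::new("input.dat"), Path::new("output.dat"), context)
//!     .await?;
//! println!("processed {} bytes", metrics.bytes_processed());
//! ```
//!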
//! ### Pipeline with Observer
//!
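//! A sketch of attaching a [`ProcessingObserver`]; `LoggingObserver` is a
//! hypothetical implementor that only overrides the progress hook:
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! struct LoggingObserver;
//!
//! #[async_trait::async_trait]
//! impl ProcessingObserver for LoggingObserver {
//!     async fn on_progress_update(&self, bytes_processed: u64, total_bytes: u64, throughput_mbps: f64) {
//!         println!("{}/{} bytes ({:.1} MB/s)", bytes_processed, total_bytes, throughput_mbps);
//!     }
//! }
//!
//! let context = ProcessFileContext::new(pipeline_id, security_context)
//!     .with_observer(Arc::new(LoggingObserver));
//! let metrics = service.process_file(input_path, output_path, context).await?;
//! ```
//!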
//! ### Batch Pipeline Processing
//!
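//! A sequential batch sketch; `jobs` is assumed to yield `(PathBuf, PathBuf)`
//! input/output pairs, and a real implementation might run files concurrently:
//!
//! ```rust,ignore
//! for (input, output) in jobs {
//!     let context = ProcessFileContext::new(pipeline_id.clone(), security_context.clone());
//!     let metrics = service.process_file(&input, &output, context).await?;
//!     println!("{}: {} bytes", input.display(), metrics.bytes_processed());
//! }
//! ```
//!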
//! ## Event System
//!
//! ### Processing Observer
//!
//! The `ProcessingObserver` trait provides hooks for monitoring pipeline
//! execution:
//!
//! - **Chunk Events**: Track individual chunk processing
//! - **Progress Updates**: Receive periodic progress updates
//! - **Lifecycle Events**: Monitor pipeline start and completion
//! - **Error Events**: Handle processing errors and failures
//!
//! ### Event Types
//!
//! - **on_processing_started**: Called when pipeline execution begins
//! - **on_chunk_started**: Called when a chunk begins processing
//! - **on_chunk_completed**: Called when a chunk completes processing
//! - **on_progress_update**: Called periodically with progress information
//! - **on_processing_completed**: Called when pipeline execution completes
//! - **on_error**: Called when errors occur during processing
//!
//! ## Processing Context
//!
//! ### Context Management
//!
//! - **State Tracking**: Track processing state throughout execution
//! - **Resource Management**: Manage resources and cleanup
//! - **Security Context**: Maintain security context and validation
//! - **Metrics Collection**: Collect processing metrics and statistics
//!
//! ### Context Lifecycle
//!
//! - **Initialization**: Initialize context before processing
//! - **Stage Transitions**: Update context between processing stages
//! - **Error Handling**: Maintain context during error conditions
//! - **Cleanup**: Clean up context after processing completion
//!
//! ## Security Integration
//!
//! ### Security Context
//!
//! - **Authentication**: Verify user authentication and authorization
//! - **Access Control**: Enforce access control policies
//! - **Audit Logging**: Log all security-relevant operations
//! - **Compliance**: Support regulatory compliance requirements
//!
//! ### Security Validation
//!
//! - **Input Validation**: Validate all input parameters
//! - **Permission Checks**: Verify permissions for file operations
//! - **Resource Limits**: Enforce resource usage limits
//! - **Threat Detection**: Detect and prevent security threats
//!
//! ## Error Handling
//!
//! ### Error Categories
//!
//! - **Configuration Errors**: Invalid pipeline configuration
//! - **Processing Errors**: Errors during pipeline execution
//! - **Security Errors**: Security violations and access denials
//! - **Resource Errors**: Resource exhaustion and allocation failures
//!
//! ### Recovery Strategies
//!
//! - **Retry Logic**: Automatic retry for transient failures
//! - **Fallback Processing**: Alternative processing strategies
//! - **Partial Results**: Return partial results when possible
//! - **Graceful Degradation**: Graceful handling of service failures
//!
//! ## Performance Considerations
//!
//! ### Execution Optimization
//!
//! - **Parallel Processing**: Parallel execution of pipeline stages
//! - **Resource Pooling**: Efficient resource pooling and reuse
//! - **Memory Management**: Optimized memory usage and cleanup
//! - **I/O Optimization**: Efficient file I/O operations
//!
//! ### Monitoring and Metrics
//!
//! - **Performance Metrics**: Detailed performance monitoring
//! - **Resource Usage**: Track resource utilization
//! - **Bottleneck Detection**: Identify performance bottlenecks
//! - **Optimization Recommendations**: Suggest performance improvements
//!
//! ## Integration
//!
//! The pipeline service integrates with:
//!
//! - **Pipeline Repository**: Load and manage pipeline configurations
//! - **File Processor**: Execute file processing operations
//! - **Security Service**: Validate security context and permissions
//! - **Metrics Service**: Report processing metrics and statistics
//!
//! ## Thread Safety
//!
//! The service interface is designed for thread safety:
//!
//! - **Concurrent Execution**: Safe concurrent pipeline execution
//! - **Shared Resources**: Safe sharing of pipeline resources
//! - **State Management**: Thread-safe state management
//!
//! ## Future Enhancements
//!
//! Planned enhancements include:
//!
//! - **Distributed Execution**: Support for distributed pipeline execution
//! - **Dynamic Scaling**: Automatic scaling based on workload
//! - **Advanced Scheduling**: Sophisticated pipeline scheduling
//! - **Machine Learning**: ML-based optimization and prediction

use crate::entities::security_context::SecurityLevel;
use crate::entities::{Pipeline, ProcessingContext, SecurityContext};
use crate::repositories::stage_executor::ResourceRequirements;
use crate::services::datetime_serde;
use crate::value_objects::{FileChunk, PipelineId};
use crate::{PipelineError, ProcessingMetrics};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::sync::Arc;

/// Observer trait for pipeline processing events
///
/// This trait defines the interface for observing pipeline processing events,
/// enabling real-time monitoring, progress tracking, and event handling during
/// pipeline execution.
///
/// # Key Features
///
/// - **Event Notifications**: Receive notifications for various pipeline events
/// - **Progress Monitoring**: Track processing progress in real-time
/// - **Performance Metrics**: Access detailed performance metrics
/// - **Error Handling**: Handle processing errors and failures
/// - **Lifecycle Management**: Monitor pipeline lifecycle events
///
/// # Event Types
///
/// - **Chunk Events**: Individual chunk processing start/completion
/// - **Progress Events**: Periodic progress updates with throughput
/// - **Lifecycle Events**: Pipeline start/completion events
/// - **Error Events**: Processing errors and failure notifications
///
/// # Examples
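///
/// A minimal sketch of a custom observer; only the hooks of interest are
/// overridden, the rest keep their no-op defaults:
///
/// ```rust,ignore
/// struct TimingObserver;
///
/// #[async_trait::async_trait]
/// impl ProcessingObserver for TimingObserver {
///     async fn on_chunk_completed(&self, chunk_id: u64, duration: std::time::Duration) {
///         println!("chunk {} completed in {:?}", chunk_id, duration);
///     }
/// }
/// ```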
#[async_trait]
pub trait ProcessingObserver: Send + Sync {
    /// Called when a chunk starts processing
    async fn on_chunk_started(&self, _chunk_id: u64, _size: usize) {}

    /// Called when a chunk completes processing
    async fn on_chunk_completed(&self, _chunk_id: u64, _duration: std::time::Duration) {}

    /// Called periodically with progress updates
    async fn on_progress_update(&self, _bytes_processed: u64, _total_bytes: u64, _throughput_mbps: f64) {}

    /// Called when processing starts
    async fn on_processing_started(&self, _total_bytes: u64) {}

    /// Called when processing completes
    async fn on_processing_completed(
        &self,
        _total_duration: std::time::Duration,
        _final_metrics: Option<&ProcessingMetrics>,
    ) {
    }
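
    /// Called when an error occurs during processing
    ///
    /// Default is a no-op, like the other hooks. This is the `on_error` event
    /// referenced in the module documentation; the `&PipelineError` parameter
    /// type is an assumption, as the original signature was not specified.
    async fn on_error(&self, _error: &PipelineError) {}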
}

/// Configuration for processing a file through a pipeline
///
/// Groups related parameters to avoid excessive function arguments.
/// This context is passed to `PipelineService::process_file`.
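///
/// # Example
///
/// A minimal sketch of the builder-style configuration (`pipeline_id`,
/// `security_context`, and `observer` are assumed to be in scope):
///
/// ```rust,ignore
/// let context = ProcessFileContext::new(pipeline_id, security_context)
///     .with_workers(8)
///     .with_channel_depth(32)
///     .with_observer(observer);
/// ```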
#[derive(Clone)]
pub struct ProcessFileContext {
    /// Pipeline identifier
    pub pipeline_id: PipelineId,
    /// Security context for processing
    pub security_context: SecurityContext,
    /// Optional override for number of worker threads
    pub user_worker_override: Option<usize>,
    /// Optional override for channel depth
    pub channel_depth_override: Option<usize>,
    /// Optional observer for progress tracking
    pub observer: Option<Arc<dyn ProcessingObserver>>,
}

impl ProcessFileContext {
    /// Creates a new process file context with the given pipeline ID and
    /// security context
    pub fn new(pipeline_id: PipelineId, security_context: SecurityContext) -> Self {
        Self {
            pipeline_id,
            security_context,
            user_worker_override: None,
            channel_depth_override: None,
            observer: None,
        }
    }

    /// Sets the worker count override
    pub fn with_workers(mut self, workers: usize) -> Self {
        self.user_worker_override = Some(workers);
        self
    }

    /// Sets the channel depth override
    pub fn with_channel_depth(mut self, depth: usize) -> Self {
        self.channel_depth_override = Some(depth);
        self
    }

    /// Sets the progress observer
    pub fn with_observer(mut self, observer: Arc<dyn ProcessingObserver>) -> Self {
        self.observer = Some(observer);
        self
    }
}

/// Domain service for pipeline operations
///
/// Defines the contract for executing, validating, monitoring, and
/// controlling processing pipelines.
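///
/// # Example
///
/// A sketch of polling a running pipeline (`service`, `pipeline_id`, and
/// `context` are assumed to be in scope):
///
/// ```rust,ignore
/// let status = service.monitor_execution(pipeline_id.clone(), &context).await?;
/// if status.is_active() {
///     println!("{:.1}% complete", status.progress_percentage);
/// }
/// ```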
#[async_trait]
pub trait PipelineService: Send + Sync {
    /// Processes a file through the pipeline
    async fn process_file(
        &self,
        input_path: &Path,
        output_path: &Path,
        context: ProcessFileContext,
    ) -> Result<ProcessingMetrics, PipelineError>;

    /// Processes file chunks through a pipeline
    async fn process_chunks(
        &self,
        pipeline: &Pipeline,
        chunks: Vec<FileChunk>,
        context: &mut ProcessingContext,
    ) -> Result<Vec<FileChunk>, PipelineError>;

    /// Validates a pipeline configuration
    async fn validate_pipeline(&self, pipeline: &Pipeline) -> Result<(), PipelineError>;

    /// Estimates processing time for a pipeline
    async fn estimate_processing_time(
        &self,
        pipeline: &Pipeline,
        file_size: u64,
    ) -> Result<std::time::Duration, PipelineError>;

    /// Gets resource requirements for a pipeline
    async fn get_resource_requirements(
        &self,
        pipeline: &Pipeline,
        file_size: u64,
    ) -> Result<ResourceRequirements, PipelineError>;

    /// Creates an optimized pipeline for a file type
    async fn create_optimized_pipeline(
        &self,
        file_path: &Path,
        requirements: PipelineRequirements,
    ) -> Result<Pipeline, PipelineError>;

    /// Monitors pipeline execution
    async fn monitor_execution(
        &self,
        pipeline_id: PipelineId,
        context: &ProcessingContext,
    ) -> Result<ExecutionStatus, PipelineError>;

    /// Pauses pipeline execution
    async fn pause_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError>;

    /// Resumes pipeline execution
    async fn resume_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError>;

    /// Cancels pipeline execution
    async fn cancel_execution(&self, pipeline_id: PipelineId) -> Result<(), PipelineError>;

    /// Gets execution history for a pipeline
    async fn get_execution_history(
        &self,
        pipeline_id: PipelineId,
        limit: Option<usize>,
    ) -> Result<Vec<ExecutionRecord>, PipelineError>;
}

/// Requirements for pipeline creation
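///
/// # Example
///
/// A sketch of the fluent builder API, starting from the defaults:
///
/// ```rust,ignore
/// let requirements = PipelineRequirements::new()
///     .with_compression(true)
///     .with_encryption(true)
///     .with_performance_priority(PerformancePriority::Speed);
/// ```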
#[derive(Debug, Clone)]
pub struct PipelineRequirements {
    /// Whether a compression stage is required
    pub compression_required: bool,
    /// Whether an encryption stage is required
    pub encryption_required: bool,
    /// Whether an integrity verification stage is required
    pub integrity_required: bool,
    /// Optimization priority for pipeline creation
    pub performance_priority: PerformancePriority,
    /// Security level the pipeline must satisfy
    pub security_level: SecurityLevel,
    /// Maximum memory usage in bytes, if limited
    pub max_memory_usage: Option<u64>,
    /// Maximum allowed processing time, if limited
    pub max_processing_time: Option<std::time::Duration>,
    /// Whether stages may process chunks in parallel
    pub parallel_processing: bool,
}

/// Performance priority levels
#[derive(Debug, Clone, PartialEq)]
pub enum PerformancePriority {
    Speed,
    Compression,
    Security,
    Balanced,
}

/// Pipeline execution status
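///
/// # Example
///
/// A sketch of status tracking (`pipeline_id` is assumed to be in scope):
///
/// ```rust,ignore
/// let mut status = ExecutionStatus::new(pipeline_id, 1_024);
/// status.status = ExecutionState::Running;
/// assert!(status.is_active() && !status.is_complete());
/// ```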
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStatus {
    /// Pipeline being executed
    pub pipeline_id: PipelineId,
    /// Current execution state
    pub status: ExecutionState,
    /// Progress percentage (0.0 to 100.0)
    pub progress_percentage: f64,
    /// Bytes processed so far
    pub bytes_processed: u64,
    /// Total bytes to process
    pub bytes_total: u64,
    /// Name of the stage currently executing, if any
    pub current_stage: Option<String>,
    /// Estimated time remaining, if known
    pub estimated_remaining_time: Option<std::time::Duration>,
    /// Number of errors encountered so far
    pub error_count: u64,
    /// Number of warnings encountered so far
    pub warning_count: u64,
    /// When execution started
    #[serde(with = "datetime_serde")]
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// When this status was last updated
    #[serde(with = "datetime_serde")]
    pub updated_at: chrono::DateTime<chrono::Utc>,
}

/// Pipeline execution states
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ExecutionState {
    Pending,
    Running,
    Paused,
    Completed,
    Failed,
    Cancelled,
}

/// Pipeline execution record
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
    /// Unique identifier for this execution record
    pub id: PipelineId,
    /// Pipeline that was executed
    pub pipeline_id: PipelineId,
    /// Input file path
    pub input_path: std::path::PathBuf,
    /// Output file path
    pub output_path: std::path::PathBuf,
    /// Final execution state
    pub status: ExecutionState,
    /// Metrics collected during execution
    pub metrics: ProcessingMetrics,
    /// Error message if execution failed
    pub error_message: Option<String>,
    /// When execution started
    #[serde(with = "datetime_serde")]
    pub started_at: chrono::DateTime<chrono::Utc>,
    /// When execution completed, if it has
    #[serde(with = "datetime_serde::optional")]
    pub completed_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Security context under which execution ran
    pub security_context: SecurityContext,
}

impl Default for PipelineRequirements {
    fn default() -> Self {
        Self {
            compression_required: false,
            encryption_required: false,
            integrity_required: false,
            performance_priority: PerformancePriority::Balanced,
            security_level: SecurityLevel::Internal,
            max_memory_usage: None,
            max_processing_time: None,
            parallel_processing: true,
        }
    }
}

impl PipelineRequirements {
    /// Creates new pipeline requirements
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets compression requirement
    pub fn with_compression(mut self, required: bool) -> Self {
        self.compression_required = required;
        self
    }

    /// Sets encryption requirement
    pub fn with_encryption(mut self, required: bool) -> Self {
        self.encryption_required = required;
        self
    }

    /// Sets integrity requirement
    pub fn with_integrity(mut self, required: bool) -> Self {
        self.integrity_required = required;
        self
    }

    /// Sets performance priority
    pub fn with_performance_priority(mut self, priority: PerformancePriority) -> Self {
        self.performance_priority = priority;
        self
    }

    /// Sets security level
    pub fn with_security_level(mut self, level: SecurityLevel) -> Self {
        self.security_level = level;
        self
    }

    /// Sets maximum memory usage
    pub fn with_max_memory(mut self, max_memory: u64) -> Self {
        self.max_memory_usage = Some(max_memory);
        self
    }

    /// Sets maximum processing time
    pub fn with_max_time(mut self, max_time: std::time::Duration) -> Self {
        self.max_processing_time = Some(max_time);
        self
    }

    /// Sets parallel processing
    pub fn with_parallel_processing(mut self, enabled: bool) -> Self {
        self.parallel_processing = enabled;
        self
    }
}

impl ExecutionStatus {
    /// Creates new execution status
    pub fn new(pipeline_id: PipelineId, bytes_total: u64) -> Self {
        let now = chrono::Utc::now();
        Self {
            pipeline_id,
            status: ExecutionState::Pending,
            progress_percentage: 0.0,
            bytes_processed: 0,
            bytes_total,
            current_stage: None,
            estimated_remaining_time: None,
            error_count: 0,
            warning_count: 0,
            started_at: now,
            updated_at: now,
        }
    }

    /// Updates the execution status from the latest metrics
    pub fn update(&mut self, metrics: &ProcessingMetrics, current_stage: Option<String>) {
        self.bytes_processed = metrics.bytes_processed();
        self.progress_percentage = metrics.progress_percentage();
        self.current_stage = current_stage;
        self.estimated_remaining_time = metrics.estimated_remaining_time();
        self.error_count = metrics.error_count();
        self.warning_count = metrics.warning_count();
        self.updated_at = chrono::Utc::now();
    }

    /// Checks if execution has finished (completed, failed, or cancelled)
    pub fn is_complete(&self) -> bool {
        matches!(
            self.status,
            ExecutionState::Completed | ExecutionState::Failed | ExecutionState::Cancelled
        )
    }

    /// Checks if execution is active (running or paused)
    pub fn is_active(&self) -> bool {
        matches!(self.status, ExecutionState::Running | ExecutionState::Paused)
    }
}