pub struct Pipeline { /* private fields */ }
Core pipeline entity representing a configurable processing workflow.
A Pipeline is a domain entity that orchestrates file processing through
an ordered sequence of stages. Each pipeline has a unique identity and
maintains its own configuration, metrics, and processing history.
§Entity Characteristics
- Identity: Unique PipelineId that persists through all changes
- Mutability: Can be modified while preserving identity
- Business Logic: Enforces stage compatibility and ordering rules
- Lifecycle: Tracks creation and modification timestamps
- Persistence: Supports both generic and SQLite repository patterns
§Automatic Integrity Verification
Every pipeline automatically includes integrity verification stages:
[Input Checksum] -> [User Stage 1] -> [User Stage 2] -> [Output Checksum]
    (order: 0)        (order: 1)        (order: 2)        (order: 3)
This ensures data integrity is maintained throughout the entire processing workflow.
§Usage Examples
§Creating a New Pipeline
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
// Create user-defined stages
let compression = PipelineStage::new(
"compress".to_string(),
StageType::Compression,
StageConfiguration::new("zstd".to_string(), HashMap::new(), true),
0,
)
.unwrap();
let encryption = PipelineStage::new(
"encrypt".to_string(),
StageType::Encryption,
StageConfiguration::new("aes256gcm".to_string(), HashMap::new(), false),
1,
)
.unwrap();
// Create pipeline (checksum stages added automatically)
let pipeline =
Pipeline::new("Secure Backup".to_string(), vec![compression, encryption]).unwrap();
assert_eq!(pipeline.name(), "Secure Backup");
// Pipeline has 4 stages: input_checksum + 2 user stages + output_checksum
assert_eq!(pipeline.stages().len(), 4);
§Modifying Pipeline Configuration
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
let stage = PipelineStage::new(
"transform".to_string(),
StageType::Transform,
StageConfiguration::default(),
0,
)
.unwrap();
let mut pipeline = Pipeline::new("Data Pipeline".to_string(), vec![stage]).unwrap();
// Add configuration parameters
let mut config = HashMap::new();
config.insert("output_format".to_string(), "json".to_string());
config.insert("compression_level".to_string(), "6".to_string());
pipeline.update_configuration(config);
assert_eq!(
pipeline.configuration().get("output_format"),
Some(&"json".to_string())
);
§Adding Stages Dynamically
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
let initial_stage = PipelineStage::new(
"compress".to_string(),
StageType::Compression,
StageConfiguration::default(),
0,
)
.unwrap();
let mut pipeline =
Pipeline::new("Processing Pipeline".to_string(), vec![initial_stage]).unwrap();
// Add a new encryption stage
let encryption_stage = PipelineStage::new(
"encrypt".to_string(),
StageType::Encryption,
StageConfiguration::default(),
0, // order will be adjusted
)
.unwrap();
pipeline.add_stage(encryption_stage).unwrap();
// Pipeline now has 4 stages: input_checksum + compression + encryption + output_checksum
assert_eq!(pipeline.stages().len(), 4);
§Business Rules and Validation
The pipeline enforces several important business rules:
§Stage Compatibility
Consecutive stages must be compatible with each other:
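For example, compression followed by encryption is an accepted ordering. A minimal sketch, reusing the constructors from the usage examples above:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let compress = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Compat Demo".to_string(), vec![compress]).unwrap();
// Compression -> Encryption is the compatible ordering (compress before encrypt).
let encrypt = PipelineStage::new(
    "encrypt".to_string(),
    StageType::Encryption,
    StageConfiguration::default(),
    0,
)
.unwrap();
assert!(pipeline.add_stage(encrypt).is_ok());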
§Minimum Stage Requirement
Pipelines must contain at least one stage (including auto-added checksum stages).
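A minimal sketch of the rule; constructing a pipeline without user stages is rejected:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
// At least one user-defined stage is required.
assert!(Pipeline::new("Empty".to_string(), vec![]).is_err());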
§Stage Ordering
Stages are automatically reordered to maintain proper execution sequence.
§Metrics and Monitoring
Pipelines track processing metrics for performance analysis:
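A minimal sketch of reading and replacing metrics; the ProcessingMetrics import path and its Default impl are assumptions for illustration:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
// Assumed import path; adjust to the crate's actual module layout.
use adaptive_pipeline_domain::entities::ProcessingMetrics;
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Metrics Demo".to_string(), vec![stage]).unwrap();
// Record statistics after a processing run (Default assumed here).
pipeline.update_metrics(ProcessingMetrics::default());
let _current = pipeline.metrics();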
§Repository Integration
The pipeline supports multiple repository patterns:
§Generic Repository
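The entity stays persistence-agnostic; a hypothetical shape for a generic repository port (trait, method, and import names are assumptions, not the crate's actual API):
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
// Assumed error import path for illustration.
use adaptive_pipeline_domain::PipelineError;
// Hypothetical port; the crate's real repository trait may differ.
trait PipelineRepository {
    fn save(&self, pipeline: &Pipeline) -> Result<(), PipelineError>;
    fn find_by_name(&self, name: &str) -> Result<Option<Pipeline>, PipelineError>;
}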
§SQLite Repository
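A SQLite-backed implementor would plug into the same port; the type and method names below are purely illustrative:
// Hypothetical SQLite-backed implementor of the trait sketched above:
// let repo = SqlitePipelineRepository::connect("pipelines.db")?;
// repo.save(&pipeline)?;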
§Error Handling
Pipeline operations return Result types with specific error variants:
- InvalidConfiguration: Invalid pipeline setup or parameters
- IncompatibleStage: Stages cannot be used together
- InvalidInput: Invalid data provided to pipeline methods
§Thread Safety and Concurrency
While the pipeline entity itself is not thread-safe (following DDD
principles), it can be safely shared across threads when wrapped in
appropriate synchronization primitives like Arc<Mutex<Pipeline>>.
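A minimal sketch of that sharing pattern using only the standard library:
use std::sync::{Arc, Mutex};
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let pipeline = Pipeline::new("Shared".to_string(), vec![stage]).unwrap();
// Wrap the entity so multiple threads can access it safely.
let shared = Arc::new(Mutex::new(pipeline));
let worker = {
    let shared = Arc::clone(&shared);
    std::thread::spawn(move || {
        let guard = shared.lock().unwrap();
        assert_eq!(guard.name(), "Shared");
    })
};
worker.join().unwrap();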
§Performance Considerations
- Stage validation is performed during modification, not during processing
- Metrics collection has minimal overhead
- Automatic stage insertion occurs only during pipeline creation
- Repository operations are optimized for both read and write performance
Implementations§
impl Pipeline
pub fn new(name: String, user_stages: Vec<PipelineStage>) -> Result<Self, PipelineError>
Creates a new pipeline with the given name and user-defined stages.
§Automatic Stage Insertion
IMPORTANT: This constructor automatically inserts mandatory checksum stages:
- input_checksum stage is prepended (order: 0)
- User-defined stages follow (order: 1, 2, 3…)
- output_checksum stage is appended (order: final)
This ensures the database reflects the complete processing pipeline that actually gets executed, maintaining the “database as single source of truth” principle.
§Example
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
let compression = PipelineStage::new(
"compress".to_string(),
StageType::Compression,
StageConfiguration::default(),
0,
)
.unwrap();
let pipeline = Pipeline::new("My Pipeline".to_string(), vec![compression]).unwrap();
// Verify automatic checksum stages were added
assert_eq!(pipeline.stages().len(), 3); // input_checksum + user stage + output_checksum
assert_eq!(pipeline.name(), "My Pipeline");
§Arguments
- name - Pipeline name (must not be empty)
- user_stages - User-defined processing stages (must not be empty)
§Returns
Returns a Pipeline with automatic checksum stages inserted, or
PipelineError if validation fails.
§Errors
- InvalidConfiguration - If name is empty or no user stages provided
pub fn id(&self) -> &PipelineId
Gets the unique identifier of this pipeline
pub fn stages(&self) -> &[PipelineStage]
Gets the ordered list of processing stages in this pipeline
Returns all stages including automatically-inserted checksum stages. Stages are returned in execution order (order: 0, 1, 2, …).
§Why This Returns a Slice
Returning a slice (&[PipelineStage]) instead of a Vec provides:
- No cloning: Efficient access without copying data
- Immutability: Prevents external modification of stages
- Standard interface: Works with all slice methods and iterators
§Returns
An immutable slice containing all pipeline stages in execution order
§Examples
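A minimal sketch using the slice interface:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let pipeline = Pipeline::new("Stages Demo".to_string(), vec![stage]).unwrap();
let stages = pipeline.stages();
// input_checksum + user stage + output_checksum
assert_eq!(stages.len(), 3);
// Standard slice and iterator methods work directly.
assert_eq!(stages.iter().count(), 3);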
pub fn configuration(&self) -> &HashMap<String, String>
Gets the pipeline configuration parameters
Configuration parameters are key-value pairs that control pipeline behavior. Common parameters include:
- max_workers: Number of concurrent worker threads
- chunk_size: Size of data chunks for processing
- timeout: Processing timeout in seconds
- buffer_size: I/O buffer size in bytes
§Returns
An immutable reference to the configuration HashMap
§Examples
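A minimal sketch of setting and reading configuration values:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Config Demo".to_string(), vec![stage]).unwrap();
let mut config = HashMap::new();
config.insert("chunk_size".to_string(), "1048576".to_string());
pipeline.update_configuration(config);
assert_eq!(
    pipeline.configuration().get("chunk_size"),
    Some(&"1048576".to_string())
);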
pub fn metrics(&self) -> &ProcessingMetrics
Gets the pipeline's processing metrics
pub fn created_at(&self) -> &DateTime<Utc>
Gets the timestamp when this pipeline was created
pub fn updated_at(&self) -> &DateTime<Utc>
Gets the timestamp of the last modification to this pipeline
The updated timestamp changes whenever the pipeline is modified, including configuration updates, stage additions/removals, or metrics updates.
§Why Track Updates?
Tracking update times enables:
- Optimistic locking: Detect concurrent modifications
- Audit trails: Know when changes occurred
- Cache invalidation: Know when cached data is stale
- Sorting: Order pipelines by recency
§Returns
Reference to the UTC timestamp of the last update
§Examples
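A minimal sketch showing the timestamp moving forward on modification:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Timestamps".to_string(), vec![stage]).unwrap();
let before = *pipeline.updated_at();
pipeline.update_configuration(HashMap::new());
// The update timestamp never moves backwards after a modification.
assert!(*pipeline.updated_at() >= before);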
pub fn status(&self) -> &'static str
Gets the current status of the pipeline
Returns a simple status string indicating whether the pipeline is currently active or archived. This is a basic status indicator; detailed operational status (running, idle, failed) should be obtained from monitoring systems like Prometheus/Grafana.
§Why Simple Status?
This returns a static status because:
- Domain purity: Pipeline entity shouldn’t know about runtime state
- Separation of concerns: Operational status belongs in monitoring
- Simplicity: Avoid mixing persistent state with transient state
§Returns
"Active"if the pipeline is available for use"Archived"if the pipeline has been soft-deleted
§Examples
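A minimal sketch, assuming a freshly created pipeline starts out active:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let pipeline = Pipeline::new("Status Demo".to_string(), vec![stage]).unwrap();
// Assumption: a new pipeline has not been archived yet.
assert_eq!(pipeline.status(), "Active");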
§Note
For real-time operational status (running, idle, error states), query your monitoring system (Prometheus/Grafana) instead.
pub fn archived(&self) -> bool
Checks if the pipeline is archived (soft-deleted)
Archived pipelines are not physically deleted but are hidden from normal queries and cannot be executed. Archiving provides:
- Reversibility: Can be restored if needed
- Audit trail: Maintains history of deleted pipelines
- Data integrity: Preserves foreign key relationships
§Returns
- true if the pipeline is archived
- false if the pipeline is active
§Examples
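A minimal sketch, again assuming a new pipeline starts unarchived:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let pipeline = Pipeline::new("Archive Demo".to_string(), vec![stage]).unwrap();
assert!(!pipeline.archived());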
pub fn update_configuration(&mut self, config: HashMap<String, String>)
Updates the complete pipeline configuration
Replaces the entire configuration HashMap with new values. Any previous configuration is discarded. For updating individual keys, retrieve the configuration, modify it, and pass it back.
§Why Replace Instead of Merge?
Complete replacement provides:
- Clear semantics: No ambiguity about what happens to old values
- Simplicity: No complex merge logic needed
- Explicit control: Caller decides exact final state
- Immutability pattern: Aligns with functional programming principles
§Arguments
- config - The new configuration HashMap to set. Common keys include:
  - max_workers: Number of worker threads
  - chunk_size: Processing chunk size in bytes
  - timeout: Timeout in seconds
  - buffer_size: I/O buffer size
§Side Effects
- Replaces all configuration values
- Updates the updated_at timestamp
§Examples
§Updating Individual Keys
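The retrieve-modify-replace pattern described above, as a minimal sketch:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let stage = PipelineStage::new(
    "transform".to_string(),
    StageType::Transform,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Config".to_string(), vec![stage]).unwrap();
// Clone the current map, change one key, then replace the whole map.
let mut updated = pipeline.configuration().clone();
updated.insert("timeout".to_string(), "30".to_string());
pipeline.update_configuration(updated);
assert_eq!(
    pipeline.configuration().get("timeout"),
    Some(&"30".to_string())
);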
pub fn add_stage(&mut self, stage: PipelineStage) -> Result<(), PipelineError>
Adds a new processing stage to the pipeline
Appends a stage to the end of the pipeline’s stage sequence. The new stage must be compatible with the last existing stage according to compatibility rules (e.g., compression should precede encryption).
§Why Compatibility Checking?
Stage compatibility ensures:
- Correct ordering: Stages execute in a logical sequence
- Data integrity: Each stage can process the previous stage’s output
- Performance: Optimal stage ordering (compress before encrypt)
- Correctness: Prevents invalid combinations (e.g., double compression)
§Arguments
- stage - The pipeline stage to add. Must be compatible with the current last stage.
§Returns
- Ok(()) if the stage was added successfully
- Err(PipelineError::IncompatibleStage) if stage is incompatible
§Errors
Returns IncompatibleStage if the new stage is not compatible with
the last stage in the pipeline. For example:
- Attempting to add encryption before compression
- Adding duplicate stage types
- Incompatible algorithm combinations
§Side Effects
- Appends stage to the pipeline’s stage list
- Updates the updated_at timestamp
§Examples
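A minimal sketch: compression first, then a compatible encryption stage:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let compress = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Add Stage".to_string(), vec![compress]).unwrap();
let before = pipeline.stages().len();
let encrypt = PipelineStage::new(
    "encrypt".to_string(),
    StageType::Encryption,
    StageConfiguration::default(),
    0,
)
.unwrap();
pipeline.add_stage(encrypt).unwrap();
assert_eq!(pipeline.stages().len(), before + 1);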
pub fn remove_stage(&mut self, index: usize) -> Result<PipelineStage, PipelineError>
Removes a processing stage from the pipeline by its index position
Removes the stage at the specified index and returns it. The pipeline must always have at least one stage remaining after removal.
§Why Index-Based Removal?
Index-based removal provides:
- Precision: Remove exact stage by position
- Simplicity: No need to search by name or ID
- Efficiency: O(n) removal where n is stages after index
- Flexibility: Works even with duplicate stage names
§Arguments
- index - Zero-based position of the stage to remove (0 = first stage)
§Returns
- Ok(PipelineStage) - The removed stage
- Err(PipelineError) - If index is invalid or removal would leave pipeline empty
§Errors
This function returns an error if:
- InvalidConfiguration: Index is out of bounds (>= stage count)
- InvalidConfiguration: Removing the last remaining stage
§Side Effects
- Removes stage from the pipeline
- Shifts subsequent stages down by one position
- Updates the updated_at timestamp
§Examples
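A minimal sketch of index-based removal, including an out-of-bounds error:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Remove Demo".to_string(), vec![stage]).unwrap();
// Stages: input_checksum (0), compress (1), output_checksum (2).
let _removed = pipeline.remove_stage(1).unwrap();
assert_eq!(pipeline.stages().len(), 2);
// Out-of-bounds indices are rejected.
assert!(pipeline.remove_stage(99).is_err());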
pub fn update_metrics(&mut self, metrics: ProcessingMetrics)
Updates the pipeline’s processing metrics with new values
Replaces the entire metrics object with new performance data. This is typically called after processing completes to record final statistics.
§Why Replace Metrics?
Complete replacement instead of incremental updates provides:
- Atomicity: Metrics represent a single processing run
- Clarity: No confusion about partial vs. complete metrics
- Simplicity: No merge logic needed
- Immutability: Aligns with functional programming patterns
§Arguments
- metrics - New processing metrics to replace current metrics. Should contain complete statistics from a processing run including:
  - Bytes processed and throughput
  - Processing duration
  - Error and warning counts
  - Stage-specific metrics
§Side Effects
- Replaces current metrics completely
- Updates the updated_at timestamp
§Examples
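A minimal sketch; as above, the ProcessingMetrics import path and Default impl are assumptions:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
// Assumed import path for ProcessingMetrics.
use adaptive_pipeline_domain::entities::ProcessingMetrics;
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let mut pipeline = Pipeline::new("Metrics".to_string(), vec![stage]).unwrap();
// Replace metrics wholesale after a processing run completes.
pipeline.update_metrics(ProcessingMetrics::default());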
pub fn validate(&self) -> Result<(), PipelineError>
Validates the complete pipeline configuration for correctness
Performs comprehensive validation of the pipeline’s configuration including stage count, stage compatibility, and ordering rules. This should be called before attempting to execute the pipeline.
§What is Validated?
- Stage Count: Pipeline must have at least one stage
- Stage Compatibility: Each stage must be compatible with the next
- Stage Ordering: Stages must be in correct execution order
- Configuration Completeness: All required configuration present
§Why Validate?
Validation prevents:
- Runtime errors: Catch issues before execution starts
- Data corruption: Ensure stages can process each other’s output
- Resource waste: Don’t start processing with invalid configuration
- Poor UX: Provide clear error messages upfront
§Returns
- Ok(()) if pipeline configuration is valid
- Err(PipelineError) with details if validation fails
§Errors
Returns an error if:
- InvalidConfiguration: No stages present in pipeline
- IncompatibleStage: Adjacent stages are incompatible
§Examples
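A minimal sketch; a freshly constructed pipeline is expected to pass its own validation:
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
let stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();
let pipeline = Pipeline::new("Valid".to_string(), vec![stage]).unwrap();
assert!(pipeline.validate().is_ok());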
§Implementation Note
Validation uses a sliding window to check pairwise stage compatibility, which is O(n) where n is the number of stages.
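Schematically, the pairwise check can be expressed with slice::windows; is_compatible_with below is an assumed predicate name, not necessarily the crate's:
use adaptive_pipeline_domain::entities::pipeline_stage::PipelineStage;
// Illustrative only: adjacent-pair compatibility over the stage list.
fn all_adjacent_compatible(stages: &[PipelineStage]) -> bool {
    stages
        .windows(2)
        // hypothetical predicate; the real method name may differ
        .all(|pair| pair[0].is_compatible_with(&pair[1]))
}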
pub fn from_database(data: PipelineData) -> Result<Self, PipelineError>
Creates a pipeline from database data (for repository use).
§Arguments
- data - A PipelineData DTO containing all fields from the database
§Returns
Returns Ok(Pipeline) if the data is valid, or Err(PipelineError) if
validation fails.
§Errors
- PipelineError::InvalidConfiguration - If name is empty or no stages provided
Trait Implementations§
impl<'de> Deserialize<'de> for Pipeline
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error> where __D: Deserializer<'de>
Auto Trait Implementations§
impl Freeze for Pipeline
impl RefUnwindSafe for Pipeline
impl Send for Pipeline
impl Sync for Pipeline
impl Unpin for Pipeline
impl UnwindSafe for Pipeline
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T where T: Clone
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true, or into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true, or into a Right variant of Either<Self, Self> otherwise.