pub struct PipelineStage { /* private fields */ }Expand description
Core pipeline stage entity representing a single processing step.
A PipelineStage is a domain entity that encapsulates a specific data
transformation operation within a pipeline. Each stage has a unique
identity, maintains its own configuration, and can be enabled/disabled
independently.
§Entity Characteristics
- Identity: Unique
StageIdthat persists through configuration changes - Type Safety: Strongly typed stage operations prevent configuration errors
- Ordering: Explicit ordering ensures predictable execution sequence
- Lifecycle: Tracks creation and modification timestamps
- State Management: Can be enabled/disabled without removal
§Stage Lifecycle
- Creation: Stage is created with initial configuration
- Configuration: Parameters can be updated as needed
- Ordering: Position in pipeline can be adjusted
- Execution: Stage processes data according to its configuration
- Monitoring: Timestamps track when changes occur
§Usage Examples
§Creating a Compression Stage
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
let mut params = HashMap::new();
params.insert("level".to_string(), "6".to_string());
let config = StageConfiguration::new("brotli".to_string(), params, true);
let stage =
PipelineStage::new("compression".to_string(), StageType::Compression, config, 0).unwrap();
assert_eq!(stage.name(), "compression");
assert_eq!(stage.stage_type(), &StageType::Compression);
assert_eq!(stage.algorithm(), "brotli");
assert!(stage.is_enabled());§Creating an Encryption Stage
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
let mut params = HashMap::new();
params.insert("key_size".to_string(), "256".to_string());
let config = StageConfiguration::new("aes256gcm".to_string(), params, false);
let stage =
PipelineStage::new("encryption".to_string(), StageType::Encryption, config, 1).unwrap();
assert_eq!(stage.algorithm(), "aes256gcm");
assert_eq!(stage.order(), 1);§Modifying Stage Configuration
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
let config = StageConfiguration::default();
let mut stage =
PipelineStage::new("transform".to_string(), StageType::Transform, config, 0).unwrap();
// Update configuration
let mut new_params = HashMap::new();
new_params.insert("format".to_string(), "json".to_string());
let new_config = StageConfiguration::new("transform".to_string(), new_params, true);
stage.update_configuration(new_config);
assert_eq!(stage.algorithm(), "transform");§Stage Compatibility Checking
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
let compression = PipelineStage::new(
"compression".to_string(),
StageType::Compression,
StageConfiguration::default(),
0,
)
.unwrap();
let encryption = PipelineStage::new(
"encryption".to_string(),
StageType::Encryption,
StageConfiguration::default(),
1,
)
.unwrap();
// Compression should come before encryption
assert!(compression.is_compatible_with(&encryption));§Enabling and Disabling Stages
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
let mut stage = PipelineStage::new(
"checksum".to_string(),
StageType::Checksum,
StageConfiguration::default(),
0,
)
.unwrap();
assert!(stage.is_enabled());
// Disable the stage
stage.set_enabled(false);
assert!(!stage.is_enabled());
// Re-enable the stage
stage.set_enabled(true);
assert!(stage.is_enabled());§Stage Compatibility Rules
The stage compatibility system ensures optimal pipeline performance:
§Recommended Ordering
- Input Checksum (automatic)
- Compression (reduces data size)
- Encryption (secures compressed data)
- Output Checksum (automatic)
§Compatibility Matrix
From \ To | Compression | Encryption | Checksum | PassThrough
----------------|-------------|------------|----------|------------
Compression | ❌ No | ✅ Yes | ✅ Yes | ✅ Yes
Encryption | ❌ No | ❌ No | ✅ Yes | ✅ Yes
Checksum | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes
PassThrough | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes§Validation and Error Handling
Stages perform validation during creation and modification:
§Performance Considerations
- Stage creation and modification are lightweight operations
- Compatibility checking is performed in constant time
- Configuration updates only affect the specific stage
- Parallel processing settings can significantly impact performance
Implementations§
Source§impl PipelineStage
impl PipelineStage
Sourcepub fn new(
name: String,
stage_type: StageType,
configuration: StageConfiguration,
order: u32,
) -> Result<PipelineStage, PipelineError>
pub fn new( name: String, stage_type: StageType, configuration: StageConfiguration, order: u32, ) -> Result<PipelineStage, PipelineError>
Creates a new pipeline stage with the specified configuration
Constructs a new stage entity with a unique identifier and timestamps. The stage is created in an enabled state by default.
§Arguments
name- Human-readable stage identifier (must not be empty)stage_type- Type of processing operation (Compression, Encryption, etc.)configuration- Algorithm and parameter configuration for the stageorder- Execution order position in the pipeline (0-based)
§Returns
Ok(PipelineStage)- Successfully created stageErr(PipelineError::InvalidConfiguration)- If name is empty
§Errors
Returns InvalidConfiguration if the stage name is empty.
§Examples
use adaptive_pipeline_domain::entities::pipeline_stage::{
PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;
// Create a stage successfully
let mut params = HashMap::new();
params.insert("level".to_string(), "9".to_string());
let config = StageConfiguration::new("zstd".to_string(), params, true);
let stage = PipelineStage::new(
"my-compression-stage".to_string(),
StageType::Compression,
config,
0,
)
.unwrap();
assert_eq!(stage.name(), "my-compression-stage");
// Empty name returns an error
let result = PipelineStage::new(
"".to_string(),
StageType::Compression,
StageConfiguration::default(),
0,
);
assert!(result.is_err());Sourcepub fn stage_type(&self) -> &StageType
pub fn stage_type(&self) -> &StageType
Gets the processing operation type for this stage
§Returns
Reference to the stage type (Compression, Encryption, Checksum, or PassThrough)
Sourcepub fn configuration(&self) -> &StageConfiguration
pub fn configuration(&self) -> &StageConfiguration
Gets the complete configuration for this stage
Includes algorithm selection, parameters, and processing options.
§Returns
Reference to the stage’s configuration
Sourcepub fn algorithm(&self) -> &str
pub fn algorithm(&self) -> &str
Gets the algorithm name from the stage configuration
Convenience method for accessing the algorithm without going through the configuration object. Useful for test framework compatibility.
§Returns
The algorithm name as a string slice
Sourcepub fn is_enabled(&self) -> bool
pub fn is_enabled(&self) -> bool
Checks whether the stage is currently enabled for execution
Disabled stages are skipped during pipeline execution.
§Returns
true if enabled, false if disabled
Sourcepub fn order(&self) -> u32
pub fn order(&self) -> u32
Gets the execution order position of this stage
Lower numbers execute first. Order determines the sequence of processing operations in the pipeline.
§Returns
The stage’s order position (0-based)
Sourcepub fn created_at(&self) -> &DateTime<Utc>
pub fn created_at(&self) -> &DateTime<Utc>
Sourcepub fn updated_at(&self) -> &DateTime<Utc>
pub fn updated_at(&self) -> &DateTime<Utc>
Gets the timestamp of the last modification to this stage
Updated whenever configuration, enabled state, or order changes.
§Returns
Reference to the UTC timestamp of the last update
Sourcepub fn set_enabled(&mut self, enabled: bool)
pub fn set_enabled(&mut self, enabled: bool)
Enables or disables the stage for execution
Disabled stages are skipped during pipeline execution without being removed. This allows temporary deactivation while preserving stage configuration.
§Arguments
enabled-trueto enable execution,falseto disable
§Side Effects
Updates the updated_at timestamp
Sourcepub fn update_configuration(&mut self, configuration: StageConfiguration)
pub fn update_configuration(&mut self, configuration: StageConfiguration)
Sourcepub fn update_order(&mut self, order: u32)
pub fn update_order(&mut self, order: u32)
Sourcepub fn is_compatible_with(&self, other: &PipelineStage) -> bool
pub fn is_compatible_with(&self, other: &PipelineStage) -> bool
Checks if this stage is compatible with another stage
Sourcepub fn validate(&self) -> Result<(), PipelineError>
pub fn validate(&self) -> Result<(), PipelineError>
Validates the stage configuration
Trait Implementations§
Source§impl Clone for PipelineStage
impl Clone for PipelineStage
Source§fn clone(&self) -> PipelineStage
fn clone(&self) -> PipelineStage
1.0.0 · Source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moreSource§impl Debug for PipelineStage
impl Debug for PipelineStage
Source§impl<'de> Deserialize<'de> for PipelineStage
impl<'de> Deserialize<'de> for PipelineStage
Source§fn deserialize<__D>(
__deserializer: __D,
) -> Result<PipelineStage, <__D as Deserializer<'de>>::Error>where
__D: Deserializer<'de>,
fn deserialize<__D>(
__deserializer: __D,
) -> Result<PipelineStage, <__D as Deserializer<'de>>::Error>where
__D: Deserializer<'de>,
Source§impl Serialize for PipelineStage
impl Serialize for PipelineStage
Source§fn serialize<__S>(
&self,
__serializer: __S,
) -> Result<<__S as Serializer>::Ok, <__S as Serializer>::Error>where
__S: Serializer,
fn serialize<__S>(
&self,
__serializer: __S,
) -> Result<<__S as Serializer>::Ok, <__S as Serializer>::Error>where
__S: Serializer,
Auto Trait Implementations§
impl Freeze for PipelineStage
impl RefUnwindSafe for PipelineStage
impl Send for PipelineStage
impl Sync for PipelineStage
impl Unpin for PipelineStage
impl UnwindSafe for PipelineStage
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> CloneToUninit for Twhere
T: Clone,
impl<T> CloneToUninit for Twhere
T: Clone,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more