PipelineStage

Struct PipelineStage 

Source
pub struct PipelineStage { /* private fields */ }
Expand description

Core pipeline stage entity representing a single processing step.

A PipelineStage is a domain entity that encapsulates a specific data transformation operation within a pipeline. Each stage has a unique identity, maintains its own configuration, and can be enabled/disabled independently.

§Entity Characteristics

  • Identity: Unique StageId that persists through configuration changes
  • Type Safety: Strongly typed stage operations prevent configuration errors
  • Ordering: Explicit ordering ensures predictable execution sequence
  • Lifecycle: Tracks creation and modification timestamps
  • State Management: Can be enabled/disabled without removal

§Stage Lifecycle

  1. Creation: Stage is created with initial configuration
  2. Configuration: Parameters can be updated as needed
  3. Ordering: Position in pipeline can be adjusted
  4. Execution: Stage processes data according to its configuration
  5. Monitoring: Timestamps track when changes occur

§Usage Examples

§Creating a Compression Stage

use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

let mut params = HashMap::new();
params.insert("level".to_string(), "6".to_string());

let config = StageConfiguration::new("brotli".to_string(), params, true);
let stage =
    PipelineStage::new("compression".to_string(), StageType::Compression, config, 0).unwrap();

assert_eq!(stage.name(), "compression");
assert_eq!(stage.stage_type(), &StageType::Compression);
assert_eq!(stage.algorithm(), "brotli");
assert!(stage.is_enabled());

§Creating an Encryption Stage

use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

let mut params = HashMap::new();
params.insert("key_size".to_string(), "256".to_string());

let config = StageConfiguration::new("aes256gcm".to_string(), params, false);
let stage =
    PipelineStage::new("encryption".to_string(), StageType::Encryption, config, 1).unwrap();

assert_eq!(stage.algorithm(), "aes256gcm");
assert_eq!(stage.order(), 1);

§Modifying Stage Configuration

use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

let config = StageConfiguration::default();
let mut stage =
    PipelineStage::new("transform".to_string(), StageType::Transform, config, 0).unwrap();

// Update configuration
let mut new_params = HashMap::new();
new_params.insert("format".to_string(), "json".to_string());
let new_config = StageConfiguration::new("transform".to_string(), new_params, true);
stage.update_configuration(new_config);

assert_eq!(stage.algorithm(), "transform");

§Stage Compatibility Checking

use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let compression = PipelineStage::new(
    "compression".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();

let encryption = PipelineStage::new(
    "encryption".to_string(),
    StageType::Encryption,
    StageConfiguration::default(),
    1,
)
.unwrap();

// Compression should come before encryption
assert!(compression.is_compatible_with(&encryption));

§Enabling and Disabling Stages

use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let mut stage = PipelineStage::new(
    "checksum".to_string(),
    StageType::Checksum,
    StageConfiguration::default(),
    0,
)
.unwrap();

assert!(stage.is_enabled());

// Disable the stage
stage.set_enabled(false);
assert!(!stage.is_enabled());

// Re-enable the stage
stage.set_enabled(true);
assert!(stage.is_enabled());

§Stage Compatibility Rules

The stage compatibility system ensures optimal pipeline performance:

  1. Input Checksum (automatic)
  2. Compression (reduces data size)
  3. Encryption (secures compressed data)
  4. Output Checksum (automatic)

§Compatibility Matrix

From \ To      | Compression | Encryption | Checksum | PassThrough
----------------|-------------|------------|----------|------------
Compression     | ❌ No       | ✅ Yes     | ✅ Yes   | ✅ Yes
Encryption      | ❌ No       | ❌ No      | ✅ Yes   | ✅ Yes
Checksum        | ✅ Yes      | ✅ Yes     | ✅ Yes   | ✅ Yes
PassThrough     | ✅ Yes      | ✅ Yes     | ✅ Yes   | ✅ Yes

§Validation and Error Handling

Stages perform validation during creation and modification:

§Performance Considerations

  • Stage creation and modification are lightweight operations
  • Compatibility checking is performed in constant time
  • Configuration updates only affect the specific stage
  • Parallel processing settings can significantly impact performance

Implementations§

Source§

impl PipelineStage

Source

pub fn new( name: String, stage_type: StageType, configuration: StageConfiguration, order: u32, ) -> Result<PipelineStage, PipelineError>

Creates a new pipeline stage with the specified configuration

Constructs a new stage entity with a unique identifier and timestamps. The stage is created in an enabled state by default.

§Arguments
  • name - Human-readable stage identifier (must not be empty)
  • stage_type - Type of processing operation (Compression, Encryption, etc.)
  • configuration - Algorithm and parameter configuration for the stage
  • order - Execution order position in the pipeline (0-based)
§Returns
  • Ok(PipelineStage) - Successfully created stage
  • Err(PipelineError::InvalidConfiguration) - If name is empty
§Errors

Returns InvalidConfiguration if the stage name is empty.

§Examples
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

// Create a stage successfully
let mut params = HashMap::new();
params.insert("level".to_string(), "9".to_string());
let config = StageConfiguration::new("zstd".to_string(), params, true);

let stage = PipelineStage::new(
    "my-compression-stage".to_string(),
    StageType::Compression,
    config,
    0,
)
.unwrap();

assert_eq!(stage.name(), "my-compression-stage");

// Empty name returns an error
let result = PipelineStage::new(
    "".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
);
assert!(result.is_err());
Source

pub fn id(&self) -> &StageId

Gets the unique identifier for this stage

§Returns

Reference to the stage’s unique identifier

Source

pub fn name(&self) -> &str

Gets the human-readable name of the stage

§Returns

The stage name as a string slice

Source

pub fn stage_type(&self) -> &StageType

Gets the processing operation type for this stage

§Returns

Reference to the stage type (Compression, Encryption, Checksum, or PassThrough)

Source

pub fn configuration(&self) -> &StageConfiguration

Gets the complete configuration for this stage

Includes algorithm selection, parameters, and processing options.

§Returns

Reference to the stage’s configuration

Source

pub fn algorithm(&self) -> &str

Gets the algorithm name from the stage configuration

Convenience method for accessing the algorithm without going through the configuration object. Useful for test framework compatibility.

§Returns

The algorithm name as a string slice

Source

pub fn is_enabled(&self) -> bool

Checks whether the stage is currently enabled for execution

Disabled stages are skipped during pipeline execution.

§Returns

true if enabled, false if disabled

Source

pub fn order(&self) -> u32

Gets the execution order position of this stage

Lower numbers execute first. Order determines the sequence of processing operations in the pipeline.

§Returns

The stage’s order position (0-based)

Source

pub fn created_at(&self) -> &DateTime<Utc>

Gets the timestamp when this stage was created

§Returns

Reference to the UTC creation timestamp

Source

pub fn updated_at(&self) -> &DateTime<Utc>

Gets the timestamp of the last modification to this stage

Updated whenever configuration, enabled state, or order changes.

§Returns

Reference to the UTC timestamp of the last update

Source

pub fn set_enabled(&mut self, enabled: bool)

Enables or disables the stage for execution

Disabled stages are skipped during pipeline execution without being removed. This allows temporary deactivation while preserving stage configuration.

§Arguments
  • enabled - true to enable execution, false to disable
§Side Effects

Updates the updated_at timestamp

Source

pub fn update_configuration(&mut self, configuration: StageConfiguration)

Updates the complete stage configuration

Replaces the entire configuration including algorithm, parameters, and processing options.

§Arguments
  • configuration - New configuration to apply to the stage
§Side Effects

Updates the updated_at timestamp

Source

pub fn update_order(&mut self, order: u32)

Updates the execution order position of this stage

Changes where this stage executes in the pipeline sequence. Lower order values execute first.

§Arguments
  • order - New order position (0-based)
§Side Effects

Updates the updated_at timestamp

Source

pub fn is_compatible_with(&self, other: &PipelineStage) -> bool

Checks if this stage is compatible with another stage

Source

pub fn validate(&self) -> Result<(), PipelineError>

Validates the stage configuration

Trait Implementations§

Source§

impl Clone for PipelineStage

Source§

fn clone(&self) -> PipelineStage

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for PipelineStage

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter. Read more
Source§

impl<'de> Deserialize<'de> for PipelineStage

Source§

fn deserialize<__D>( __deserializer: __D, ) -> Result<PipelineStage, <__D as Deserializer<'de>>::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more
Source§

impl Serialize for PipelineStage

Source§

fn serialize<__S>( &self, __serializer: __S, ) -> Result<<__S as Serializer>::Ok, <__S as Serializer>::Error>
where __S: Serializer,

Serialize this value into the given Serde serializer. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,

Source§

impl<T> ErasedDestructor for T
where T: 'static,