Struct Pipeline

pub struct Pipeline { /* private fields */ }

Core pipeline entity representing a configurable processing workflow.

A Pipeline is a domain entity that orchestrates file processing through an ordered sequence of stages. Each pipeline has a unique identity and maintains its own configuration, metrics, and processing history.

§Entity Characteristics

  • Identity: Unique PipelineId that persists through all changes
  • Mutability: Can be modified while preserving identity
  • Business Logic: Enforces stage compatibility and ordering rules
  • Lifecycle: Tracks creation and modification timestamps
  • Persistence: Supports both generic and SQLite repository patterns

§Automatic Integrity Verification

Every pipeline automatically includes integrity verification stages:

[Input Checksum] -> [User Stage 1] -> [User Stage 2] -> [Output Checksum]
     (order: 0)       (order: 1)       (order: 2)        (order: 3)

This ensures data integrity is maintained throughout the entire processing workflow.

§Usage Examples

§Creating a New Pipeline

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

// Create user-defined stages
let compression = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::new("zstd".to_string(), HashMap::new(), true),
    0,
)
.unwrap();

let encryption = PipelineStage::new(
    "encrypt".to_string(),
    StageType::Encryption,
    StageConfiguration::new("aes256gcm".to_string(), HashMap::new(), false),
    1,
)
.unwrap();

// Create pipeline (checksum stages added automatically)
let pipeline =
    Pipeline::new("Secure Backup".to_string(), vec![compression, encryption]).unwrap();

assert_eq!(pipeline.name(), "Secure Backup");
// Pipeline has 4 stages: input_checksum + 2 user stages + output_checksum
assert_eq!(pipeline.stages().len(), 4);

§Modifying Pipeline Configuration

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

let stage = PipelineStage::new(
    "transform".to_string(),
    StageType::Transform,
    StageConfiguration::default(),
    0,
)
.unwrap();

let mut pipeline = Pipeline::new("Data Pipeline".to_string(), vec![stage]).unwrap();

// Add configuration parameters
let mut config = HashMap::new();
config.insert("output_format".to_string(), "json".to_string());
config.insert("compression_level".to_string(), "6".to_string());
pipeline.update_configuration(config);

assert_eq!(
    pipeline.configuration().get("output_format"),
    Some(&"json".to_string())
);

§Adding Stages Dynamically

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let initial_stage = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();

let mut pipeline =
    Pipeline::new("Processing Pipeline".to_string(), vec![initial_stage]).unwrap();

// Add a new encryption stage
let encryption_stage = PipelineStage::new(
    "encrypt".to_string(),
    StageType::Encryption,
    StageConfiguration::default(),
    0, // order will be adjusted
)
.unwrap();

pipeline.add_stage(encryption_stage).unwrap();

// Pipeline now has 4 stages: input_checksum + compression + encryption + output_checksum
assert_eq!(pipeline.stages().len(), 4);

§Business Rules and Validation

The pipeline enforces several important business rules:

§Stage Compatibility

Consecutive stages must be compatible with each other. For example, a compression stage may precede an encryption stage, but not the reverse, and duplicate stage types are rejected.

§Minimum Stage Requirement

Pipelines must contain at least one stage (including auto-added checksum stages).

§Stage Ordering

Stages are automatically reordered to maintain proper execution sequence.

§Metrics and Monitoring

Pipelines track processing metrics for performance analysis; see the metrics() and update_metrics() methods below.

§Repository Integration

The pipeline supports multiple repository patterns, including a generic repository abstraction and a SQLite-backed repository.

§Error Handling

Pipeline operations return Result types with specific error variants:

  • InvalidConfiguration: Invalid pipeline setup or parameters
  • IncompatibleStage: Stages cannot be used together
  • InvalidInput: Invalid data provided to pipeline methods

§Thread Safety and Concurrency

While the pipeline entity itself is not thread-safe (following DDD principles), it can be safely shared across threads when wrapped in appropriate synchronization primitives like Arc<Mutex<Pipeline>>.
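
As a minimal sketch (assuming Pipeline is Send, as the statement above implies), a pipeline can be wrapped in Arc<Mutex<_>> and handed to another thread:

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::sync::{Arc, Mutex};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Shared Pipeline".to_string(), vec![stage]).unwrap();

// Wrap the entity in synchronization primitives before sharing it.
let shared = Arc::new(Mutex::new(pipeline));
let worker = {
    let shared = Arc::clone(&shared);
    std::thread::spawn(move || {
        // Lock to read (or mutate) the pipeline from another thread.
        shared.lock().unwrap().stages().len()
    })
};
assert_eq!(worker.join().unwrap(), 3); // input_checksum + user stage + output_checksum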

§Performance Considerations

  • Stage validation is performed during modification, not during processing
  • Metrics collection has minimal overhead
  • Automatic stage insertion occurs only during pipeline creation
  • Repository operations are optimized for both read and write performance

Implementations§


impl Pipeline


pub fn new(name: String, user_stages: Vec<PipelineStage>) -> Result<Self, PipelineError>

Creates a new pipeline with the given name and user-defined stages.

§Automatic Stage Insertion

IMPORTANT: This constructor automatically inserts mandatory checksum stages:

  • input_checksum stage is prepended (order: 0)
  • User-defined stages follow (order: 1, 2, 3…)
  • output_checksum stage is appended (order: final)

This ensures the database reflects the complete processing pipeline that actually gets executed, maintaining the “database as single source of truth” principle.

§Example
use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let compression = PipelineStage::new(
    "compress".to_string(),
    StageType::Compression,
    StageConfiguration::default(),
    0,
)
.unwrap();

let pipeline = Pipeline::new("My Pipeline".to_string(), vec![compression]).unwrap();

// Verify automatic checksum stages were added
assert_eq!(pipeline.stages().len(), 3); // input_checksum + user stage + output_checksum
assert_eq!(pipeline.name(), "My Pipeline");
§Arguments
  • name - Pipeline name (must not be empty)
  • user_stages - User-defined processing stages (must not be empty)
§Returns

Returns a Pipeline with automatic checksum stages inserted, or PipelineError if validation fails.

§Errors
  • InvalidConfiguration - If name is empty or no user stages provided

pub fn id(&self) -> &PipelineId

Gets the unique identifier for this pipeline

The pipeline ID is immutable and persists throughout the entity’s lifetime. This ID is used for database lookups, API references, and maintaining entity identity across system boundaries.

§Returns

A reference to the pipeline’s unique identifier

§Examples
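
A brief sketch; it assumes PipelineId implements Debug (typical for identifier value objects, but an assumption here).

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("ID Demo".to_string(), vec![stage]).unwrap();

// The ID is assigned at construction and identifies the entity everywhere.
let id = pipeline.id();
println!("pipeline {:?} is named {}", id, pipeline.name()); // Debug impl assumed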

pub fn name(&self) -> &str

Gets the human-readable name of the pipeline

The name is used for display purposes, logging, and user identification. Unlike the ID, the name can be duplicated across different pipelines.

§Returns

The pipeline name as a string slice

§Examples
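
A minimal sketch using only the constructor and accessor shown on this page.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Nightly Backup".to_string(), vec![stage]).unwrap();

// The name is returned as a borrowed &str for display and logging.
assert_eq!(pipeline.name(), "Nightly Backup");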

pub fn stages(&self) -> &[PipelineStage]

Gets the ordered list of processing stages in this pipeline

Returns all stages including automatically-inserted checksum stages. Stages are returned in execution order (order: 0, 1, 2, …).

§Why This Returns a Slice

Returning a slice (&[PipelineStage]) instead of a Vec provides:

  • No cloning: Efficient access without copying data
  • Immutability: Prevents external modification of stages
  • Standard interface: Works with all slice methods and iterators
§Returns

An immutable slice containing all pipeline stages in execution order

§Examples
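
A short sketch; the stage setup mirrors the struct-level examples.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Stage Demo".to_string(), vec![stage]).unwrap();

// The slice contains the automatic checksum stages plus the user stage,
// in execution order, and works with standard iterators.
assert_eq!(pipeline.stages().len(), 3);
for stage in pipeline.stages() {
    let _ = stage; // inspect each &PipelineStage here
}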

pub fn configuration(&self) -> &HashMap<String, String>

Gets the pipeline configuration parameters

Configuration parameters are key-value pairs that control pipeline behavior. Common parameters include:

  • max_workers: Number of concurrent worker threads
  • chunk_size: Size of data chunks for processing
  • timeout: Processing timeout in seconds
  • buffer_size: I/O buffer size in bytes
§Returns

An immutable reference to the configuration HashMap

§Examples
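
A short sketch; the configuration keys are illustrative.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let mut pipeline = Pipeline::new("Config Read".to_string(), vec![stage]).unwrap();

let mut config = HashMap::new();
config.insert("chunk_size".to_string(), "65536".to_string());
pipeline.update_configuration(config);

// Read configuration values back as plain key-value strings.
assert_eq!(
    pipeline.configuration().get("chunk_size"),
    Some(&"65536".to_string())
);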

pub fn metrics(&self) -> &ProcessingMetrics

Gets the current processing metrics for this pipeline

Metrics track performance and execution statistics including:

  • Bytes processed and throughput
  • Processing duration and timestamps
  • Error and warning counts
  • Stage-specific metrics
§Returns

An immutable reference to the pipeline’s metrics

§Examples
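
A minimal sketch; the metrics fields themselves are not inspected here because only the accessor is documented on this page.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Metrics Read".to_string(), vec![stage]).unwrap();

// Metrics are available immediately and are replaced by update_metrics()
// after a processing run completes.
let _metrics = pipeline.metrics();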

pub fn created_at(&self) -> &DateTime<Utc>

Gets the timestamp when this pipeline was created

The creation timestamp is set when the pipeline entity is first constructed and never changes. It’s useful for auditing, sorting, and determining pipeline age.

§Returns

Reference to the UTC creation timestamp

§Examples
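
A short sketch; it assumes the chrono crate is available, which the DateTime<Utc> return type already implies.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use chrono::Utc;

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Created Demo".to_string(), vec![stage]).unwrap();

// The creation timestamp is fixed at construction time.
assert!(*pipeline.created_at() <= Utc::now());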

pub fn updated_at(&self) -> &DateTime<Utc>

Gets the timestamp of the last modification to this pipeline

The updated timestamp changes whenever the pipeline is modified, including configuration updates, stage additions/removals, or metrics updates.

§Why Track Updates?

Tracking update times enables:

  • Optimistic locking: Detect concurrent modifications
  • Audit trails: Know when changes occurred
  • Cache invalidation: Know when cached data is stale
  • Sorting: Order pipelines by recency
§Returns

Reference to the UTC timestamp of the last update

§Examples
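
A short sketch showing that a modification refreshes the timestamp.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let mut pipeline = Pipeline::new("Updated Demo".to_string(), vec![stage]).unwrap();

let before = *pipeline.updated_at();
pipeline.update_configuration(HashMap::new());

// The modification above bumps updated_at.
assert!(*pipeline.updated_at() >= before);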

pub fn status(&self) -> &'static str

Gets the current status of the pipeline

Returns a simple status string indicating whether the pipeline is currently active or archived. This is a basic status indicator; detailed operational status (running, idle, failed) should be obtained from monitoring systems like Prometheus/Grafana.

§Why Simple Status?

This returns a static status because:

  • Domain purity: Pipeline entity shouldn’t know about runtime state
  • Separation of concerns: Operational status belongs in monitoring
  • Simplicity: Avoid mixing persistent state with transient state
§Returns
  • "Active" if the pipeline is available for use
  • "Archived" if the pipeline has been soft-deleted
§Examples
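
A minimal sketch; it assumes a newly constructed pipeline has not been archived, so it reports the "Active" value documented above.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Status Demo".to_string(), vec![stage]).unwrap();

// A new pipeline is available for use.
assert_eq!(pipeline.status(), "Active");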
§Note

For real-time operational status (running, idle, error states), query your monitoring system (Prometheus/Grafana) instead.


pub fn archived(&self) -> bool

Checks if the pipeline is archived (soft-deleted)

Archived pipelines are not physically deleted but are hidden from normal queries and cannot be executed. Archiving provides:

  • Reversibility: Can be restored if needed
  • Audit trail: Maintains history of deleted pipelines
  • Data integrity: Preserves foreign key relationships
§Returns
  • true if the pipeline is archived
  • false if the pipeline is active
§Examples
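
A minimal sketch; as above, it assumes a freshly constructed pipeline starts out un-archived.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Archive Demo".to_string(), vec![stage]).unwrap();

// Not soft-deleted, so it still appears in normal queries and can run.
assert!(!pipeline.archived());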

pub fn update_configuration(&mut self, config: HashMap<String, String>)

Updates the complete pipeline configuration

Replaces the entire configuration HashMap with new values. Any previous configuration is discarded. For updating individual keys, retrieve the configuration, modify it, and pass it back.

§Why Replace Instead of Merge?

Complete replacement provides:

  • Clear semantics: No ambiguity about what happens to old values
  • Simplicity: No complex merge logic needed
  • Explicit control: Caller decides exact final state
  • Immutability pattern: Aligns with functional programming principles
§Arguments
  • config - The new configuration HashMap to set. Common keys include:
    • max_workers: Number of worker threads
    • chunk_size: Processing chunk size in bytes
    • timeout: Timeout in seconds
    • buffer_size: I/O buffer size
§Side Effects
  • Replaces all configuration values
  • Updates the updated_at timestamp
§Examples
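
A sketch of a full configuration replacement; the keys and values are illustrative.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use std::collections::HashMap;

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let mut pipeline = Pipeline::new("Config Demo".to_string(), vec![stage]).unwrap();

// Build a complete configuration; it replaces whatever was set before.
let mut config = HashMap::new();
config.insert("max_workers".to_string(), "8".to_string());
config.insert("timeout".to_string(), "300".to_string());
pipeline.update_configuration(config);

assert_eq!(
    pipeline.configuration().get("max_workers"),
    Some(&"8".to_string())
);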
§Updating Individual Keys
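
Continuing from the snippet above, an individual key can be changed by cloning the current map, editing it, and setting it back.

// Clone the current configuration, tweak one key, and replace the whole map.
let mut config = pipeline.configuration().clone();
config.insert("chunk_size".to_string(), "1048576".to_string());
pipeline.update_configuration(config);

assert_eq!(
    pipeline.configuration().get("chunk_size"),
    Some(&"1048576".to_string())
);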

pub fn add_stage(&mut self, stage: PipelineStage) -> Result<(), PipelineError>

Adds a new processing stage to the pipeline

Appends a stage to the end of the pipeline’s stage sequence. The new stage must be compatible with the last existing stage according to compatibility rules (e.g., compression should precede encryption).

§Why Compatibility Checking?

Stage compatibility ensures:

  • Correct ordering: Stages execute in a logical sequence
  • Data integrity: Each stage can process the previous stage’s output
  • Performance: Optimal stage ordering (compress before encrypt)
  • Correctness: Prevents invalid combinations (e.g., double compression)
§Arguments
  • stage - The pipeline stage to add. Must be compatible with the current last stage.
§Returns
  • Ok(()) if the stage was added successfully
  • Err(PipelineError::IncompatibleStage) if stage is incompatible
§Errors

Returns IncompatibleStage if the new stage is not compatible with the last stage in the pipeline. For example:

  • Attempting to add encryption before compression
  • Adding duplicate stage types
  • Incompatible algorithm combinations
§Side Effects
  • Appends stage to the pipeline’s stage list
  • Updates the updated_at timestamp
§Examples
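
A sketch mirroring the struct-level “Adding Stages Dynamically” example; the stage names are illustrative.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let compression = PipelineStage::new("compress".to_string(), StageType::Compression,
                                     StageConfiguration::default(), 0).unwrap();
let mut pipeline = Pipeline::new("Add Demo".to_string(), vec![compression]).unwrap();

// Encryption after compression is a compatible, recommended ordering.
let encryption = PipelineStage::new("encrypt".to_string(), StageType::Encryption,
                                    StageConfiguration::default(), 0).unwrap(); // order adjusted
pipeline.add_stage(encryption).unwrap();
assert_eq!(pipeline.stages().len(), 4);

// An incompatible stage (for example, a duplicate stage type) would instead
// return PipelineError::IncompatibleStage.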

pub fn remove_stage(&mut self, index: usize) -> Result<PipelineStage, PipelineError>

Removes a processing stage from the pipeline by its index position

Removes the stage at the specified index and returns it. The pipeline must always have at least one stage remaining after removal.

§Why Index-Based Removal?

Index-based removal provides:

  • Precision: Remove exact stage by position
  • Simplicity: No need to search by name or ID
  • Efficiency: O(n) removal, where n is the number of stages after the removed index
  • Flexibility: Works even with duplicate stage names
§Arguments
  • index - Zero-based position of the stage to remove (0 = first stage)
§Returns
  • Ok(PipelineStage) - The removed stage
  • Err(PipelineError) - If index is invalid or removal would leave pipeline empty
§Errors

This function returns an error if:

  • InvalidConfiguration: Index is out of bounds (>= stage count)
  • InvalidConfiguration: Removing the last remaining stage
§Side Effects
  • Removes stage from the pipeline
  • Shifts subsequent stages down by one position
  • Updates the updated_at timestamp
§Examples
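
A sketch; index 1 targets the user-defined stage because index 0 is the automatic input checksum stage.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let mut pipeline = Pipeline::new("Remove Demo".to_string(), vec![stage]).unwrap();
assert_eq!(pipeline.stages().len(), 3);

// Remove the user-defined compression stage and get it back by value.
let _removed = pipeline.remove_stage(1).unwrap();
assert_eq!(pipeline.stages().len(), 2);

// An out-of-bounds index is rejected with InvalidConfiguration.
assert!(pipeline.remove_stage(10).is_err());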

pub fn update_metrics(&mut self, metrics: ProcessingMetrics)

Updates the pipeline’s processing metrics with new values

Replaces the entire metrics object with new performance data. This is typically called after processing completes to record final statistics.

§Why Replace Metrics?

Complete replacement instead of incremental updates provides:

  • Atomicity: Metrics represent a single processing run
  • Clarity: No confusion about partial vs. complete metrics
  • Simplicity: No merge logic needed
  • Immutability: Aligns with functional programming patterns
§Arguments
  • metrics - New processing metrics to replace current metrics. Should contain complete statistics from a processing run including:
    • Bytes processed and throughput
    • Processing duration
    • Error and warning counts
    • Stage-specific metrics
§Side Effects
  • Replaces current metrics completely
  • Updates the updated_at timestamp
§Examples
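
A sketch only: the import path for ProcessingMetrics and its Default implementation are assumptions made for illustration; in practice the value comes from a completed processing run.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};
use adaptive_pipeline_domain::ProcessingMetrics; // path assumed for illustration

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let mut pipeline = Pipeline::new("Metrics Demo".to_string(), vec![stage]).unwrap();

// Replace the metrics wholesale after a processing run completes.
let run_metrics = ProcessingMetrics::default(); // Default impl assumed
pipeline.update_metrics(run_metrics);
let _ = pipeline.metrics(); // now reflects the latest run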

pub fn validate(&self) -> Result<(), PipelineError>

Validates the complete pipeline configuration for correctness

Performs comprehensive validation of the pipeline’s configuration including stage count, stage compatibility, and ordering rules. This should be called before attempting to execute the pipeline.

§What is Validated?
  • Stage Count: Pipeline must have at least one stage
  • Stage Compatibility: Each stage must be compatible with the next
  • Stage Ordering: Stages must be in correct execution order
  • Configuration Completeness: All required configuration present
§Why Validate?

Validation prevents:

  • Runtime errors: Catch issues before execution starts
  • Data corruption: Ensure stages can process each other’s output
  • Resource waste: Don’t start processing with invalid configuration
  • Poor UX: Provide clear error messages upfront
§Returns
  • Ok(()) if pipeline configuration is valid
  • Err(PipelineError) with details if validation fails
§Errors

Returns an error if:

  • InvalidConfiguration: No stages present in pipeline
  • IncompatibleStage: Adjacent stages are incompatible
§Examples
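
A minimal sketch; a pipeline built through Pipeline::new, with its automatic checksum stages, is expected to validate cleanly.

use adaptive_pipeline_domain::entities::pipeline::Pipeline;
use adaptive_pipeline_domain::entities::pipeline_stage::{
    PipelineStage, StageConfiguration, StageType,
};

let stage = PipelineStage::new("compress".to_string(), StageType::Compression,
                               StageConfiguration::default(), 0).unwrap();
let pipeline = Pipeline::new("Validate Demo".to_string(), vec![stage]).unwrap();

// Check stage count, compatibility, and ordering before execution.
assert!(pipeline.validate().is_ok());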
§Implementation Note

Validation uses a sliding window to check pairwise stage compatibility, which is O(n) where n is the number of stages.


pub fn from_database(data: PipelineData) -> Result<Self, PipelineError>

Creates a pipeline from database data (for repository use).

§Arguments
  • data - A PipelineData DTO containing all fields from the database
§Returns

Returns Ok(Pipeline) if the data is valid, or Err(PipelineError) if validation fails.

§Errors
  • PipelineError::InvalidConfiguration - If name is empty or no stages provided

Trait Implementations§

impl Clone for Pipeline

fn clone(&self) -> Pipeline

Returns a duplicate of the value. Read more

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

impl Debug for Pipeline

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

impl<'de> Deserialize<'de> for Pipeline

fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>
where __D: Deserializer<'de>,

Deserialize this value from the given Serde deserializer. Read more

impl Serialize for Pipeline

fn serialize<__S>(&self, __serializer: __S) -> Result<__S::Ok, __S::Error>
where __S: Serializer,

Serialize this value into the given Serde serializer. Read more

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more

impl<T> Same for T

type Output = T

Should always be Self

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> DeserializeOwned for T
where T: for<'de> Deserialize<'de>,