adaptive_pipeline_domain/entities/
pipeline.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Pipeline Entity
//!
//! Core domain entity representing a configurable file processing workflow with
//! ordered stages (compression, encryption, validation). Automatically adds
//! input/output checksum stages for integrity verification. Follows DDD
//! principles with unique identity, business rule enforcement, and repository
//! support. See mdBook for usage examples and architecture details.

use crate::entities::{PipelineStage, ProcessingMetrics};
use crate::services::datetime_serde;
use crate::value_objects::PipelineId;
use crate::PipelineError;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

// Import for generic repository support
// Note: These traits should be defined in domain/repositories, not
// infrastructure.
// use crate::repositories::RepositoryEntity;
// use crate::repositories::SqliteEntity;

/// Data Transfer Object for reconstituting a Pipeline from database storage.
///
/// This DTO represents the raw database row structure and is used by repository
/// implementations to reconstruct Pipeline entities. It separates database
/// representation from domain logic, following the Repository pattern.
///
/// # Usage
///
/// This DTO is typically created by repository implementations when fetching
/// data from the database, then passed to `Pipeline::from_database()` to
/// create a domain entity.
#[derive(Debug, Clone)]
pub struct PipelineData {
    pub id: PipelineId,
    pub name: String,
    pub archived: bool,
    pub configuration: HashMap<String, String>,
    pub metrics: ProcessingMetrics,
    pub stages: Vec<PipelineStage>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

/// Core pipeline entity representing a configurable processing workflow.
///
/// A `Pipeline` is a domain entity that orchestrates file processing through
/// an ordered sequence of stages. Each pipeline has a unique identity and
/// maintains its own configuration, metrics, and processing history.
///
/// ## Entity Characteristics
///
/// - **Identity**: Unique `PipelineId` that persists through all changes
/// - **Mutability**: Can be modified while preserving identity
/// - **Business Logic**: Enforces stage compatibility and ordering rules
/// - **Lifecycle**: Tracks creation and modification timestamps
/// - **Persistence**: Supports both generic and SQLite repository patterns
///
/// ## Automatic Integrity Verification
///
/// Every pipeline automatically includes integrity verification stages:
///
/// ```text
/// [Input Checksum] -> [User Stage 1] -> [User Stage 2] -> [Output Checksum]
///      (order: 0)       (order: 1)       (order: 2)        (order: 3)
/// ```
///
/// This ensures data integrity is maintained throughout the entire processing
/// workflow.
///
/// ## Usage Examples
///
/// ### Creating a New Pipeline
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
/// use std::collections::HashMap;
///
/// // Create user-defined stages
/// let compression = PipelineStage::new(
///     "compress".to_string(),
///     StageType::Compression,
///     StageConfiguration::new("zstd".to_string(), HashMap::new(), true),
///     0,
/// )
/// .unwrap();
///
/// let encryption = PipelineStage::new(
///     "encrypt".to_string(),
///     StageType::Encryption,
///     StageConfiguration::new("aes256gcm".to_string(), HashMap::new(), false),
///     1,
/// )
/// .unwrap();
///
/// // Create pipeline (checksum stages added automatically)
/// let pipeline =
///     Pipeline::new("Secure Backup".to_string(), vec![compression, encryption]).unwrap();
///
/// assert_eq!(pipeline.name(), "Secure Backup");
/// // Pipeline has 4 stages: input_checksum + 2 user stages + output_checksum
/// assert_eq!(pipeline.stages().len(), 4);
/// ```
///
/// ### Modifying Pipeline Configuration
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
/// use std::collections::HashMap;
///
/// let stage = PipelineStage::new(
///     "transform".to_string(),
///     StageType::Transform,
///     StageConfiguration::default(),
///     0,
/// )
/// .unwrap();
///
/// let mut pipeline = Pipeline::new("Data Pipeline".to_string(), vec![stage]).unwrap();
///
/// // Add configuration parameters
/// let mut config = HashMap::new();
/// config.insert("output_format".to_string(), "json".to_string());
/// config.insert("compression_level".to_string(), "6".to_string());
/// pipeline.update_configuration(config);
///
/// assert_eq!(
///     pipeline.configuration().get("output_format"),
///     Some(&"json".to_string())
/// );
/// ```
///
/// ### Adding Stages Dynamically
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
///
/// let initial_stage = PipelineStage::new(
///     "compress".to_string(),
///     StageType::Compression,
///     StageConfiguration::default(),
///     0,
/// )
/// .unwrap();
///
/// let mut pipeline =
///     Pipeline::new("Processing Pipeline".to_string(), vec![initial_stage]).unwrap();
///
/// // Add a new encryption stage
/// let encryption_stage = PipelineStage::new(
///     "encrypt".to_string(),
///     StageType::Encryption,
///     StageConfiguration::default(),
///     0, // order will be adjusted
/// )
/// .unwrap();
///
/// pipeline.add_stage(encryption_stage).unwrap();
///
/// // Pipeline now has 4 stages: input_checksum + compression + encryption + output_checksum
/// assert_eq!(pipeline.stages().len(), 4);
/// ```
///
/// ## Business Rules and Validation
///
/// The pipeline enforces several important business rules:
///
/// ### Stage Compatibility
/// Consecutive stages must be compatible with each other, as checked by
/// `PipelineStage::is_compatible_with`:
190/// Pipelines must contain at least one stage (including auto-added checksum
191/// stages).
192///
193/// ### Stage Ordering
194/// Stages are automatically reordered to maintain proper execution sequence.
195///
196/// ## Metrics and Monitoring
197///
198/// Pipelines track processing metrics for performance analysis:
199///
200///
201/// ## Repository Integration
202///
203/// The pipeline supports multiple repository patterns:
204///
205/// ### Generic Repository
206///
207/// ### SQLite Repository
208///
209/// ## Error Handling
210///
211/// Pipeline operations return `Result` types with specific error variants:
212///
213/// - `InvalidConfiguration`: Invalid pipeline setup or parameters
214/// - `IncompatibleStage`: Stages cannot be used together
215/// - `InvalidInput`: Invalid data provided to pipeline methods
216///
217/// ## Thread Safety and Concurrency
218///
219/// While the pipeline entity itself is not thread-safe (following DDD
220/// principles), it can be safely shared across threads when wrapped in
221/// appropriate synchronization primitives like `Arc<Mutex<Pipeline>>`.
222///
223/// ## Performance Considerations
224///
225/// - Stage validation is performed during modification, not during processing
226/// - Metrics collection has minimal overhead
227/// - Automatic stage insertion occurs only during pipeline creation
228/// - Repository operations are optimized for both read and write performance
229#[derive(Debug, Clone, Serialize, Deserialize)]
230pub struct Pipeline {
231    // Identity fields (always first)
232    id: PipelineId,
233    name: String,
234
235    // Core business fields (alphabetical within group)
236    archived: bool,
237    configuration: HashMap<String, String>,
238    metrics: ProcessingMetrics,
239    stages: Vec<PipelineStage>,
240
241    // Metadata fields (always last)
242    #[serde(with = "datetime_serde")]
243    created_at: chrono::DateTime<chrono::Utc>,
244    #[serde(with = "datetime_serde")]
245    updated_at: chrono::DateTime<chrono::Utc>,
246}
247
impl Pipeline {
    /// Creates the mandatory input checksum stage
    ///
    /// This stage is automatically prepended to every pipeline to ensure
    /// input file integrity verification.
    fn create_input_checksum_stage() -> Result<PipelineStage, PipelineError> {
        PipelineStage::new(
            "input_checksum".to_string(),
            crate::entities::pipeline_stage::StageType::Checksum,
            crate::entities::pipeline_stage::StageConfiguration::new(
                "sha256".to_string(),
                HashMap::new(),
                false, // not parallel
            ),
            0, // order: first
        )
    }

    /// Creates the mandatory output checksum stage
    ///
    /// This stage is automatically appended to every pipeline to ensure
    /// output file integrity verification.
    ///
    /// # Arguments
    ///
    /// * `order` - The order position for this stage (should be last)
    fn create_output_checksum_stage(order: u32) -> Result<PipelineStage, PipelineError> {
        PipelineStage::new(
            "output_checksum".to_string(),
            crate::entities::pipeline_stage::StageType::Checksum,
            crate::entities::pipeline_stage::StageConfiguration::new(
                "sha256".to_string(),
                HashMap::new(),
                false, // not parallel
            ),
            order, // order: last
        )
    }

    /// Creates a new pipeline with the given name and user-defined stages.
    ///
    /// # Automatic Stage Insertion
    ///
    /// **IMPORTANT**: This constructor automatically inserts mandatory checksum
    /// stages:
    /// - `input_checksum` stage is prepended (order: 0)
    /// - User-defined stages follow (order: 1, 2, 3...)
    /// - `output_checksum` stage is appended (order: final)
    ///
    /// This ensures the database reflects the complete processing pipeline that
    /// actually gets executed, maintaining the "database as single source
    /// of truth" principle.
    ///
    /// # Example
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let compression = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    ///
    /// let pipeline = Pipeline::new("My Pipeline".to_string(), vec![compression]).unwrap();
    ///
    /// // Verify automatic checksum stages were added
    /// assert_eq!(pipeline.stages().len(), 3); // input_checksum + user stage + output_checksum
    /// assert_eq!(pipeline.name(), "My Pipeline");
    /// ```
    ///
    /// # Arguments
    ///
    /// * `name` - Pipeline name (must not be empty)
    /// * `user_stages` - User-defined processing stages (must not be empty)
    ///
    /// # Returns
    ///
    /// Returns a `Pipeline` with automatic checksum stages inserted, or
    /// `PipelineError` if validation fails.
    ///
    /// # Errors
    ///
    /// * `InvalidConfiguration` - If name is empty or no user stages provided
    pub fn new(name: String, user_stages: Vec<PipelineStage>) -> Result<Self, PipelineError> {
        if name.is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline name cannot be empty".to_string(),
            ));
        }

        if user_stages.is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline must have at least one user-defined stage".to_string(),
            ));
        }

        let now = chrono::Utc::now();

        // Calculate total stages before consuming user_stages vector
        let user_stage_count = user_stages.len();

        // Build complete pipeline stages: input_checksum + user_stages +
        // output_checksum
        let mut complete_stages = Vec::with_capacity(user_stage_count + 2);

        // 1. Create and add input_checksum stage (order: 0)
        let input_checksum_stage = Self::create_input_checksum_stage()?;
        complete_stages.push(input_checksum_stage);

        // 2. Add user stages with proper order (starting from 1)
        for (index, stage) in user_stages.into_iter().enumerate() {
            // Create new stage with adjusted order to account for input_checksum at
            // position 0
            let user_stage = PipelineStage::new(
                stage.name().to_string(),
                *stage.stage_type(),
                stage.configuration().clone(),
                (index + 1) as u32, // order: 1, 2, 3...
            )?;
            complete_stages.push(user_stage);
        }

        // 3. Create and add output_checksum stage (order: last)
        let output_checksum_stage = Self::create_output_checksum_stage((user_stage_count + 1) as u32)?;
        complete_stages.push(output_checksum_stage);

        Ok(Pipeline {
            // Identity fields
            id: PipelineId::new(),
            name,

            // Core business fields (alphabetical)
            archived: false,
            configuration: HashMap::new(),
            metrics: ProcessingMetrics::default(),
            stages: complete_stages, // Complete pipeline with mandatory stages

            // Metadata fields
            created_at: now,
            updated_at: now,
        })
    }

    /// Gets the unique identifier for this pipeline
    ///
    /// The pipeline ID is immutable and persists throughout the entity's
    /// lifetime. This ID is used for database lookups, API references, and
    /// maintaining entity identity across system boundaries.
    ///
    /// # Returns
    ///
    /// A reference to the pipeline's unique identifier
    ///
    /// # Examples
    pub fn id(&self) -> &PipelineId {
        &self.id
    }

    /// Gets the human-readable name of the pipeline
    ///
    /// The name is used for display purposes, logging, and user identification.
    /// Unlike the ID, the name can be duplicated across different pipelines.
    ///
    /// # Returns
    ///
    /// The pipeline name as a string slice
    ///
    /// # Examples
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Gets the ordered list of processing stages in this pipeline
    ///
    /// Returns all stages including automatically-inserted checksum stages.
    /// Stages are returned in execution order (order: 0, 1, 2, ...).
    ///
    /// # Why This Returns a Slice
    ///
    /// Returning a slice (`&[PipelineStage]`) instead of a Vec provides:
    /// - **No cloning**: Efficient access without copying data
    /// - **Immutability**: Prevents external modification of stages
    /// - **Standard interface**: Works with all slice methods and iterators
    ///
    /// # Returns
    ///
    /// An immutable slice containing all pipeline stages in execution order
    ///
    /// # Examples
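    ///
    /// A short sketch of inspecting the stage sequence:
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Stages Demo".to_string(), vec![stage]).unwrap();
    ///
    /// // Checksum stages are included, in execution order
    /// assert_eq!(pipeline.stages().len(), 3);
    /// assert_eq!(pipeline.stages().first().unwrap().name(), "input_checksum");
    /// ```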
    pub fn stages(&self) -> &[PipelineStage] {
        &self.stages
    }

    /// Gets the pipeline configuration parameters
    ///
    /// Configuration parameters are key-value pairs that control pipeline
    /// behavior. Common parameters include:
    /// - `max_workers`: Number of concurrent worker threads
    /// - `chunk_size`: Size of data chunks for processing
    /// - `timeout`: Processing timeout in seconds
    /// - `buffer_size`: I/O buffer size in bytes
    ///
    /// # Returns
    ///
    /// An immutable reference to the configuration HashMap
    ///
    /// # Examples
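    ///
    /// A short sketch (the key shown is illustrative, not enforced by the
    /// entity):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    /// use std::collections::HashMap;
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Config Demo".to_string(), vec![stage]).unwrap();
    ///
    /// let mut config = HashMap::new();
    /// config.insert("chunk_size".to_string(), "1048576".to_string());
    /// pipeline.update_configuration(config);
    ///
    /// assert_eq!(
    ///     pipeline.configuration().get("chunk_size"),
    ///     Some(&"1048576".to_string())
    /// );
    /// ```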
    pub fn configuration(&self) -> &HashMap<String, String> {
        &self.configuration
    }

    /// Gets the current processing metrics for this pipeline
    ///
    /// Metrics track performance and execution statistics including:
    /// - Bytes processed and throughput
    /// - Processing duration and timestamps
    /// - Error and warning counts
    /// - Stage-specific metrics
    ///
    /// # Returns
    ///
    /// An immutable reference to the pipeline's metrics
    ///
    /// # Examples
    pub fn metrics(&self) -> &ProcessingMetrics {
        &self.metrics
    }

    /// Gets the timestamp when this pipeline was created
    ///
    /// The creation timestamp is set when the pipeline entity is first
    /// constructed and never changes. It's useful for auditing, sorting,
    /// and determining pipeline age.
    ///
    /// # Returns
    ///
    /// Reference to the UTC creation timestamp
    ///
    /// # Examples
    pub fn created_at(&self) -> &chrono::DateTime<chrono::Utc> {
        &self.created_at
    }

    /// Gets the timestamp of the last modification to this pipeline
    ///
    /// The updated timestamp changes whenever the pipeline is modified,
    /// including configuration updates, stage additions/removals, or
    /// metrics updates.
    ///
    /// # Why Track Updates?
    ///
    /// Tracking update times enables:
    /// - **Optimistic locking**: Detect concurrent modifications
    /// - **Audit trails**: Know when changes occurred
    /// - **Cache invalidation**: Know when cached data is stale
    /// - **Sorting**: Order pipelines by recency
    ///
    /// # Returns
    ///
    /// Reference to the UTC timestamp of the last update
    ///
    /// # Examples
    pub fn updated_at(&self) -> &chrono::DateTime<chrono::Utc> {
        &self.updated_at
    }

    /// Gets the current status of the pipeline
    ///
    /// Returns a simple status string indicating whether the pipeline is
    /// currently active or archived. This is a basic status indicator;
    /// detailed operational status (running, idle, failed) should be
    /// obtained from monitoring systems like Prometheus/Grafana.
    ///
    /// # Why Simple Status?
    ///
    /// This returns a static status because:
    /// - **Domain purity**: Pipeline entity shouldn't know about runtime state
    /// - **Separation of concerns**: Operational status belongs in monitoring
    /// - **Simplicity**: Avoid mixing persistent state with transient state
    ///
    /// # Returns
    ///
    /// - `"Active"` if the pipeline is available for use
    /// - `"Archived"` if the pipeline has been soft-deleted
    ///
    /// # Examples
    ///
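    /// A short sketch:
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Status Demo".to_string(), vec![stage]).unwrap();
    ///
    /// // Newly created pipelines are active (not archived)
    /// assert_eq!(pipeline.status(), "Active");
    /// assert!(!pipeline.archived());
    /// ```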
    ///
    /// # Note
    ///
    /// For real-time operational status (running, idle, error states),
    /// query your monitoring system (Prometheus/Grafana) instead.
    pub fn status(&self) -> &'static str {
        if self.archived {
            "Archived"
        } else {
            "Active"
        }
    }

    /// Checks if the pipeline is archived (soft-deleted)
    ///
    /// Archived pipelines are not physically deleted but are hidden from
    /// normal queries and cannot be executed. Archiving provides:
    /// - **Reversibility**: Can be restored if needed
    /// - **Audit trail**: Maintains history of deleted pipelines
    /// - **Data integrity**: Preserves foreign key relationships
    ///
    /// # Returns
    ///
    /// - `true` if the pipeline is archived
    /// - `false` if the pipeline is active
    ///
    /// # Examples
    pub fn archived(&self) -> bool {
        self.archived
    }

    /// Updates the complete pipeline configuration
    ///
    /// Replaces the entire configuration HashMap with new values. Any previous
    /// configuration is discarded. For updating individual keys, retrieve the
    /// configuration, modify it, and pass it back.
    ///
    /// # Why Replace Instead of Merge?
    ///
    /// Complete replacement provides:
    /// - **Clear semantics**: No ambiguity about what happens to old values
    /// - **Simplicity**: No complex merge logic needed
    /// - **Explicit control**: Caller decides exact final state
    /// - **Immutability pattern**: Aligns with functional programming
    ///   principles
    ///
    /// # Arguments
    ///
    /// * `config` - The new configuration HashMap to set. Common keys include:
    ///   - `max_workers`: Number of worker threads
    ///   - `chunk_size`: Processing chunk size in bytes
    ///   - `timeout`: Timeout in seconds
    ///   - `buffer_size`: I/O buffer size
    ///
    /// # Side Effects
    ///
    /// - Replaces all configuration values
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
    ///
    /// ## Updating Individual Keys
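    ///
    /// Since `update_configuration` replaces the whole map, updating a single
    /// key is a read-modify-write (a short sketch):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Config Demo".to_string(), vec![stage]).unwrap();
    ///
    /// // Clone the current map, change one key, write it back
    /// let mut config = pipeline.configuration().clone();
    /// config.insert("timeout".to_string(), "30".to_string());
    /// pipeline.update_configuration(config);
    ///
    /// assert_eq!(
    ///     pipeline.configuration().get("timeout"),
    ///     Some(&"30".to_string())
    /// );
    /// ```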
    pub fn update_configuration(&mut self, config: HashMap<String, String>) {
        self.configuration = config;
        self.updated_at = chrono::Utc::now();
    }

    /// Adds a new processing stage to the pipeline
    ///
    /// Appends a stage to the end of the pipeline's stage sequence. The new
    /// stage must be compatible with the last existing stage according to
    /// compatibility rules (e.g., compression should precede encryption).
    ///
    /// # Why Compatibility Checking?
    ///
    /// Stage compatibility ensures:
    /// - **Correct ordering**: Stages execute in a logical sequence
    /// - **Data integrity**: Each stage can process the previous stage's output
    /// - **Performance**: Optimal stage ordering (compress before encrypt)
    /// - **Correctness**: Prevents invalid combinations (e.g., double
    ///   compression)
    ///
    /// # Arguments
    ///
    /// * `stage` - The pipeline stage to add. Must be compatible with the
    ///   current last stage.
    ///
    /// # Returns
    ///
    /// - `Ok(())` if the stage was added successfully
    /// - `Err(PipelineError::IncompatibleStage)` if stage is incompatible
    ///
    /// # Errors
    ///
    /// Returns `IncompatibleStage` if the new stage is not compatible with
    /// the last stage in the pipeline. For example:
    /// - Attempting to add encryption before compression
    /// - Adding duplicate stage types
    /// - Incompatible algorithm combinations
    ///
    /// # Side Effects
    ///
    /// - Appends stage to the pipeline's stage list
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
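    ///
    /// A short sketch, mirroring the struct-level example:
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let compress = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Add Demo".to_string(), vec![compress]).unwrap();
    ///
    /// let encrypt = PipelineStage::new(
    ///     "encrypt".to_string(),
    ///     StageType::Encryption,
    ///     StageConfiguration::default(),
    ///     0, // order is adjusted by the pipeline
    /// )
    /// .unwrap();
    ///
    /// pipeline.add_stage(encrypt).unwrap();
    /// // Four stages total: both checksum stages plus both user stages
    /// assert_eq!(pipeline.stages().len(), 4);
    /// ```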
    pub fn add_stage(&mut self, stage: PipelineStage) -> Result<(), PipelineError> {
        // Validate stage compatibility
        if let Some(last_stage) = self.stages.last() {
            if !last_stage.is_compatible_with(&stage) {
                return Err(PipelineError::IncompatibleStage(format!(
                    "Stage {} is not compatible with {}",
                    stage.name(),
                    last_stage.name()
                )));
            }
        }

        self.stages.push(stage);
        self.updated_at = chrono::Utc::now();
        Ok(())
    }

    /// Removes a processing stage from the pipeline by its index position
    ///
    /// Removes the stage at the specified index and returns it. The pipeline
    /// must always have at least one stage remaining after removal.
    ///
    /// # Why Index-Based Removal?
    ///
    /// Index-based removal provides:
    /// - **Precision**: Remove exact stage by position
    /// - **Simplicity**: No need to search by name or ID
    /// - **Efficiency**: O(n) removal, where n is the number of stages after
    ///   the index
    /// - **Flexibility**: Works even with duplicate stage names
    ///
    /// # Arguments
    ///
    /// * `index` - Zero-based position of the stage to remove (0 = first stage)
    ///
    /// # Returns
    ///
    /// - `Ok(PipelineStage)` - The removed stage
    /// - `Err(PipelineError)` - If index is invalid or removal would leave
    ///   pipeline empty
    ///
    /// # Errors
    ///
    /// This function returns an error if:
    /// - `InvalidConfiguration`: Index is out of bounds (>= stage count)
    /// - `InvalidConfiguration`: Removing the last remaining stage
    ///
    /// # Side Effects
    ///
    /// - Removes stage from the pipeline
    /// - Shifts subsequent stages down by one position
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
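    ///
    /// A short sketch (index 1 is the first user-defined stage, since
    /// `input_checksum` occupies index 0):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Remove Demo".to_string(), vec![stage]).unwrap();
    /// assert_eq!(pipeline.stages().len(), 3);
    ///
    /// let removed = pipeline.remove_stage(1).unwrap();
    /// assert_eq!(removed.name(), "compress");
    /// assert_eq!(pipeline.stages().len(), 2);
    /// ```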
    pub fn remove_stage(&mut self, index: usize) -> Result<PipelineStage, PipelineError> {
        if index >= self.stages.len() {
            return Err(PipelineError::InvalidConfiguration(
                "Stage index out of bounds".to_string(),
            ));
        }

        if self.stages.len() == 1 {
            return Err(PipelineError::InvalidConfiguration(
                "Cannot remove the last stage".to_string(),
            ));
        }

        self.updated_at = chrono::Utc::now();
        Ok(self.stages.remove(index))
    }

    /// Updates the pipeline's processing metrics with new values
    ///
    /// Replaces the entire metrics object with new performance data. This is
    /// typically called after processing completes to record final statistics.
    ///
    /// # Why Replace Metrics?
    ///
    /// Complete replacement instead of incremental updates provides:
    /// - **Atomicity**: Metrics represent a single processing run
    /// - **Clarity**: No confusion about partial vs. complete metrics
    /// - **Simplicity**: No merge logic needed
    /// - **Immutability**: Aligns with functional programming patterns
    ///
    /// # Arguments
    ///
    /// * `metrics` - New processing metrics to replace current metrics. Should
    ///   contain complete statistics from a processing run including:
    ///   - Bytes processed and throughput
    ///   - Processing duration
    ///   - Error and warning counts
    ///   - Stage-specific metrics
    ///
    /// # Side Effects
    ///
    /// - Replaces current metrics completely
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
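    ///
    /// A short sketch (assuming `ProcessingMetrics` is exported from the
    /// `entities` module, as in this file's imports; `Default` stands in for
    /// real run statistics):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    /// use adaptive_pipeline_domain::entities::ProcessingMetrics;
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Metrics Demo".to_string(), vec![stage]).unwrap();
    ///
    /// let before = *pipeline.updated_at();
    /// pipeline.update_metrics(ProcessingMetrics::default());
    /// // Replacing metrics also bumps the modification timestamp
    /// assert!(*pipeline.updated_at() >= before);
    /// ```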
    pub fn update_metrics(&mut self, metrics: ProcessingMetrics) {
        self.metrics = metrics;
        self.updated_at = chrono::Utc::now();
    }

    /// Validates the complete pipeline configuration for correctness
    ///
    /// Performs comprehensive validation of the pipeline's configuration
    /// including stage count, stage compatibility, and ordering rules. This
    /// should be called before attempting to execute the pipeline.
    ///
    /// # What is Validated?
    ///
    /// - **Stage Count**: Pipeline must have at least one stage
    /// - **Stage Compatibility**: Each stage must be compatible with the next
    /// - **Stage Ordering**: Stages must be in correct execution order
    /// - **Configuration Completeness**: All required configuration present
    ///
    /// # Why Validate?
    ///
    /// Validation prevents:
    /// - **Runtime errors**: Catch issues before execution starts
    /// - **Data corruption**: Ensure stages can process each other's output
    /// - **Resource waste**: Don't start processing with invalid configuration
    /// - **Poor UX**: Provide clear error messages upfront
    ///
    /// # Returns
    ///
    /// - `Ok(())` if pipeline configuration is valid
    /// - `Err(PipelineError)` with details if validation fails
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `InvalidConfiguration`: No stages present in pipeline
    /// - `IncompatibleStage`: Adjacent stages are incompatible
    ///
    /// # Examples
    ///
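    /// A short sketch (the validation outcome depends on the compatibility
    /// rules in `PipelineStage`, so both branches are handled):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Validate Demo".to_string(), vec![stage]).unwrap();
    ///
    /// // Check the whole stage sequence before executing the pipeline
    /// match pipeline.validate() {
    ///     Ok(()) => println!("pipeline is ready to run"),
    ///     Err(_) => eprintln!("pipeline is misconfigured"),
    /// }
    /// ```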
    ///
    /// # Implementation Note
    ///
    /// Validation uses a sliding window to check pairwise stage compatibility,
    /// which is O(n) where n is the number of stages.
    pub fn validate(&self) -> Result<(), PipelineError> {
        if self.stages.is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline must have at least one stage".to_string(),
            ));
        }

        // Validate stage sequence
        for window in self.stages.windows(2) {
            if !window[0].is_compatible_with(&window[1]) {
                return Err(PipelineError::IncompatibleStage(format!(
                    "Stages {} and {} are not compatible",
                    window[0].name(),
                    window[1].name()
                )));
            }
        }

        Ok(())
    }

    /// Creates a pipeline from database data (for repository use).
    ///
    /// # Arguments
    ///
    /// * `data` - A `PipelineData` DTO containing all fields from the database
    ///
    /// # Returns
    ///
    /// Returns `Ok(Pipeline)` if the data is valid, or `Err(PipelineError)` if
    /// validation fails.
    ///
    /// # Errors
    ///
    /// * `PipelineError::InvalidConfiguration` - If name is empty or no stages
    ///   provided
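    ///
    /// # Examples
    ///
    /// A sketch of reconstitution on the repository side (assuming `PipelineId`
    /// and `ProcessingMetrics` are exported from `value_objects` and `entities`
    /// respectively, as in this file's imports):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::{Pipeline, PipelineData};
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    /// use adaptive_pipeline_domain::entities::ProcessingMetrics;
    /// use adaptive_pipeline_domain::value_objects::PipelineId;
    /// use std::collections::HashMap;
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    ///
    /// // In practice these fields come from a database row
    /// let data = PipelineData {
    ///     id: PipelineId::new(),
    ///     name: "Restored Pipeline".to_string(),
    ///     archived: false,
    ///     configuration: HashMap::new(),
    ///     metrics: ProcessingMetrics::default(),
    ///     stages: vec![stage],
    ///     created_at: chrono::Utc::now(),
    ///     updated_at: chrono::Utc::now(),
    /// };
    ///
    /// let pipeline = Pipeline::from_database(data).unwrap();
    /// assert_eq!(pipeline.name(), "Restored Pipeline");
    /// ```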
    pub fn from_database(data: PipelineData) -> Result<Self, PipelineError> {
        if data.name.is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline name cannot be empty".to_string(),
            ));
        }

        if data.stages.is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline must have at least one stage".to_string(),
            ));
        }

        Ok(Pipeline {
            id: data.id,
            name: data.name,
            archived: data.archived,
            configuration: data.configuration,
            metrics: data.metrics,
            stages: data.stages,
            created_at: data.created_at,
            updated_at: data.updated_at,
        })
    }
}

// Implementation for generic repository support
// This allows Pipeline to be used with the generic InMemoryRepository<T>
// TODO: These traits should be defined in domain/repositories, not referenced
// from infrastructure
/*
impl RepositoryEntity for Pipeline {
    type Id = PipelineId;

    fn id(&self) -> Self::Id {
        self.id.clone()
    }

    fn name(&self) -> Option<&str> {
        Some(&self.name)
    }
}

// Implementation for SQLite repository support
// This allows Pipeline to be used with the SQLite repository through adapters
impl SqliteEntity for Pipeline {
    type Id = PipelineId;

    fn id(&self) -> Self::Id {
        self.id.clone()
    }

    fn table_name() -> &'static str {
        "pipelines"
    }

    fn table_schema() -> &'static str {
        r#"
        CREATE TABLE IF NOT EXISTS pipelines (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            data TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            archived BOOLEAN NOT NULL DEFAULT false,
            UNIQUE(name)
        )
        "#
    }

    fn name(&self) -> Option<&str> {
        Some(&self.name)
    }

    fn id_to_string(&self) -> String {
        self.id.to_string()
    }

    fn id_to_string_static(id: &Self::Id) -> String {
        id.to_string()
    }

    fn id_from_string(s: &str) -> Result<Self::Id, PipelineError> {
        PipelineId::from_string(s)
    }
}
*/

/// Helper function to convert PipelineId to Uuid
///
/// This is used primarily for event sourcing, where events use `Uuid`
/// while the domain entities use `PipelineId`.
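///
/// # Examples
///
/// A short sketch (assuming `PipelineId` is exported from `value_objects`, as
/// in this file's imports):
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::pipeline_id_to_uuid;
/// use adaptive_pipeline_domain::value_objects::PipelineId;
///
/// let id = PipelineId::new();
/// // The ULID's 128-bit value is reinterpreted as a UUID
/// let uuid = pipeline_id_to_uuid(&id);
/// assert_eq!(uuid.as_u128(), id.as_ulid().0);
/// ```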
pub fn pipeline_id_to_uuid(pipeline_id: &PipelineId) -> uuid::Uuid {
    let ulid = pipeline_id.as_ulid();
    uuid::Uuid::from_u128(ulid.0)
}