adaptive_pipeline_domain/entities/pipeline.rs
// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # Pipeline Entity
//!
//! Core domain entity representing a configurable file processing workflow with
//! ordered stages (compression, encryption, validation). Automatically adds
//! input/output checksum stages for integrity verification. Follows DDD
//! principles with unique identity, business rule enforcement, and repository
//! support. See mdBook for usage examples and architecture details.

use crate::entities::{PipelineStage, ProcessingMetrics};
use crate::services::datetime_serde;
use crate::value_objects::PipelineId;
use crate::PipelineError;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

// Import for generic repository support.
// Note: These traits should be defined in domain/repositories, not in
// infrastructure.
// use crate::repositories::RepositoryEntity;
// use crate::repositories::SqliteEntity;

/// Data Transfer Object for reconstituting a Pipeline from database storage.
///
/// This DTO represents the raw database row structure and is used by repository
/// implementations to reconstruct Pipeline entities. It separates database
/// representation from domain logic, following the Repository pattern.
///
/// # Usage
///
/// This DTO is typically created by repository implementations when fetching
/// data from the database, then passed to `Pipeline::from_database()` to
/// create a domain entity.
#[derive(Debug, Clone)]
pub struct PipelineData {
    pub id: PipelineId,
    pub name: String,
    pub archived: bool,
    pub configuration: HashMap<String, String>,
    pub metrics: ProcessingMetrics,
    pub stages: Vec<PipelineStage>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

/// Core pipeline entity representing a configurable processing workflow.
///
/// A `Pipeline` is a domain entity that orchestrates file processing through
/// an ordered sequence of stages. Each pipeline has a unique identity and
/// maintains its own configuration, metrics, and processing history.
///
/// ## Entity Characteristics
///
/// - **Identity**: Unique `PipelineId` that persists through all changes
/// - **Mutability**: Can be modified while preserving identity
/// - **Business Logic**: Enforces stage compatibility and ordering rules
/// - **Lifecycle**: Tracks creation and modification timestamps
/// - **Persistence**: Supports both generic and SQLite repository patterns
///
/// ## Automatic Integrity Verification
///
/// Every pipeline automatically includes integrity verification stages:
///
/// ```text
/// [Input Checksum] -> [User Stage 1] -> [User Stage 2] -> [Output Checksum]
///    (order: 0)          (order: 1)        (order: 2)        (order: 3)
/// ```
///
/// This ensures data integrity is maintained throughout the entire processing
/// workflow.
///
/// ## Usage Examples
///
/// ### Creating a New Pipeline
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
/// use std::collections::HashMap;
///
/// // Create user-defined stages
/// let compression = PipelineStage::new(
///     "compress".to_string(),
///     StageType::Compression,
///     StageConfiguration::new("zstd".to_string(), HashMap::new(), true),
///     0,
/// )
/// .unwrap();
///
/// let encryption = PipelineStage::new(
///     "encrypt".to_string(),
///     StageType::Encryption,
///     StageConfiguration::new("aes256gcm".to_string(), HashMap::new(), false),
///     1,
/// )
/// .unwrap();
///
/// // Create pipeline (checksum stages added automatically)
/// let pipeline =
///     Pipeline::new("Secure Backup".to_string(), vec![compression, encryption]).unwrap();
///
/// assert_eq!(pipeline.name(), "Secure Backup");
/// // Pipeline has 4 stages: input_checksum + 2 user stages + output_checksum
/// assert_eq!(pipeline.stages().len(), 4);
/// ```
///
/// ### Modifying Pipeline Configuration
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
/// use std::collections::HashMap;
///
/// let stage = PipelineStage::new(
///     "transform".to_string(),
///     StageType::Transform,
///     StageConfiguration::default(),
///     0,
/// )
/// .unwrap();
///
/// let mut pipeline = Pipeline::new("Data Pipeline".to_string(), vec![stage]).unwrap();
///
/// // Add configuration parameters
/// let mut config = HashMap::new();
/// config.insert("output_format".to_string(), "json".to_string());
/// config.insert("compression_level".to_string(), "6".to_string());
/// pipeline.update_configuration(config);
///
/// assert_eq!(
///     pipeline.configuration().get("output_format"),
///     Some(&"json".to_string())
/// );
/// ```
///
/// ### Adding Stages Dynamically
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
///
/// let initial_stage = PipelineStage::new(
///     "compress".to_string(),
///     StageType::Compression,
///     StageConfiguration::default(),
///     0,
/// )
/// .unwrap();
///
/// let mut pipeline =
///     Pipeline::new("Processing Pipeline".to_string(), vec![initial_stage]).unwrap();
///
/// // Add a new encryption stage
/// let encryption_stage = PipelineStage::new(
///     "encrypt".to_string(),
///     StageType::Encryption,
///     StageConfiguration::default(),
///     0, // order will be adjusted
/// )
/// .unwrap();
///
/// pipeline.add_stage(encryption_stage).unwrap();
///
/// // Pipeline now has 4 stages: input_checksum + compression + encryption + output_checksum
/// assert_eq!(pipeline.stages().len(), 4);
/// ```
///
/// ## Business Rules and Validation
///
/// The pipeline enforces several important business rules:
///
/// ### Stage Compatibility
/// Consecutive stages must be compatible with each other:
///
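/// A minimal sketch of rejecting an incompatible stage (construction mirrors
/// the examples above; which pairs are accepted is decided by the stage
/// compatibility rules):
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
///
/// let compress = PipelineStage::new(
///     "compress".to_string(),
///     StageType::Compression,
///     StageConfiguration::default(),
///     0,
/// )
/// .unwrap();
/// let mut pipeline = Pipeline::new("Example".to_string(), vec![compress]).unwrap();
///
/// let compress_again = PipelineStage::new(
///     "compress_again".to_string(),
///     StageType::Compression,
///     StageConfiguration::default(),
///     0,
/// )
/// .unwrap();
///
/// // `add_stage` returns `IncompatibleStage` when the new stage cannot follow
/// // the current last stage.
/// if let Err(e) = pipeline.add_stage(compress_again) {
///     eprintln!("rejected: {:?}", e);
/// }
/// ```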
///
/// ### Minimum Stage Requirement
/// Pipelines must contain at least one stage (including auto-added checksum
/// stages).
///
/// ### Stage Ordering
/// Stages are automatically reordered to maintain proper execution sequence.
///
/// ## Metrics and Monitoring
///
/// Pipelines track processing metrics for performance analysis:
///
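/// A minimal sketch (assuming `ProcessingMetrics` is exported from
/// `entities`, as in this module's imports):
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
/// use adaptive_pipeline_domain::entities::pipeline_stage::{
///     PipelineStage, StageConfiguration, StageType,
/// };
/// use adaptive_pipeline_domain::entities::ProcessingMetrics;
///
/// let stage = PipelineStage::new(
///     "compress".to_string(),
///     StageType::Compression,
///     StageConfiguration::default(),
///     0,
/// )
/// .unwrap();
/// let mut pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
///
/// // Record the results of a processing run, then read them back.
/// pipeline.update_metrics(ProcessingMetrics::default());
/// let _metrics = pipeline.metrics();
/// ```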
///
/// ## Repository Integration
///
/// The pipeline supports multiple repository patterns:
///
/// ### Generic Repository
/// Intended for use with the generic in-memory repository via a
/// `RepositoryEntity` trait (the implementation is currently commented out
/// pending a move of that trait into `domain/repositories`).
///
/// ### SQLite Repository
/// Intended for use with the SQLite repository via a `SqliteEntity` trait
/// (also currently commented out, for the same reason).
///
/// ## Error Handling
///
/// Pipeline operations return `Result` types with specific error variants:
///
/// - `InvalidConfiguration`: Invalid pipeline setup or parameters
/// - `IncompatibleStage`: Stages cannot be used together
/// - `InvalidInput`: Invalid data provided to pipeline methods
///
/// ## Thread Safety and Concurrency
///
/// While the pipeline entity itself is not thread-safe (following DDD
/// principles), it can be safely shared across threads when wrapped in
/// appropriate synchronization primitives such as `Arc<Mutex<Pipeline>>`.
///
/// ## Performance Considerations
///
/// - Stage validation is performed during modification, not during processing
/// - Metrics collection has minimal overhead
/// - Automatic stage insertion occurs only during pipeline creation
/// - Repository operations are optimized for both read and write performance
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
    // Identity fields (always first)
    id: PipelineId,
    name: String,

    // Core business fields (alphabetical within group)
    archived: bool,
    configuration: HashMap<String, String>,
    metrics: ProcessingMetrics,
    stages: Vec<PipelineStage>,

    // Metadata fields (always last)
    #[serde(with = "datetime_serde")]
    created_at: chrono::DateTime<chrono::Utc>,
    #[serde(with = "datetime_serde")]
    updated_at: chrono::DateTime<chrono::Utc>,
}

impl Pipeline {
    /// Creates the mandatory input checksum stage
    ///
    /// This stage is automatically prepended to every pipeline to ensure
    /// input file integrity verification.
    fn create_input_checksum_stage() -> Result<PipelineStage, PipelineError> {
        PipelineStage::new(
            "input_checksum".to_string(),
            crate::entities::pipeline_stage::StageType::Checksum,
            crate::entities::pipeline_stage::StageConfiguration::new(
                "sha256".to_string(),
                HashMap::new(),
                false, // not parallel
            ),
            0, // order: first
        )
    }

    /// Creates the mandatory output checksum stage
    ///
    /// This stage is automatically appended to every pipeline to ensure
    /// output file integrity verification.
    ///
    /// # Arguments
    ///
    /// * `order` - The order position for this stage (should be last)
    fn create_output_checksum_stage(order: u32) -> Result<PipelineStage, PipelineError> {
        PipelineStage::new(
            "output_checksum".to_string(),
            crate::entities::pipeline_stage::StageType::Checksum,
            crate::entities::pipeline_stage::StageConfiguration::new(
                "sha256".to_string(),
                HashMap::new(),
                false, // not parallel
            ),
            order, // order: last
        )
    }

    /// Creates a new pipeline with the given name and user-defined stages.
    ///
    /// # Automatic Stage Insertion
    ///
    /// **IMPORTANT**: This constructor automatically inserts mandatory checksum
    /// stages:
    /// - `input_checksum` stage is prepended (order: 0)
    /// - User-defined stages follow (order: 1, 2, 3...)
    /// - `output_checksum` stage is appended (order: final)
    ///
    /// This ensures the database reflects the complete processing pipeline that
    /// actually gets executed, maintaining the "database as single source
    /// of truth" principle.
    ///
    /// # Example
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let compression = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    ///
    /// let pipeline = Pipeline::new("My Pipeline".to_string(), vec![compression]).unwrap();
    ///
    /// // Verify automatic checksum stages were added
    /// assert_eq!(pipeline.stages().len(), 3); // input_checksum + user stage + output_checksum
    /// assert_eq!(pipeline.name(), "My Pipeline");
    /// ```
    ///
    /// # Arguments
    ///
    /// * `name` - Pipeline name (must not be empty)
    /// * `user_stages` - User-defined processing stages (must not be empty)
    ///
    /// # Returns
    ///
    /// Returns a `Pipeline` with automatic checksum stages inserted, or
    /// `PipelineError` if validation fails.
    ///
    /// # Errors
    ///
    /// * `InvalidConfiguration` - If name is empty or no user stages provided
    pub fn new(name: String, user_stages: Vec<PipelineStage>) -> Result<Self, PipelineError> {
        if name.is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline name cannot be empty".to_string(),
            ));
        }

        if user_stages.is_empty() {
            return Err(PipelineError::InvalidConfiguration(
                "Pipeline must have at least one user-defined stage".to_string(),
            ));
        }

        let now = chrono::Utc::now();

        // Calculate total stages before consuming the user_stages vector
        let user_stage_count = user_stages.len();

        // Build complete pipeline stages: input_checksum + user_stages + output_checksum
        let mut complete_stages = Vec::with_capacity(user_stage_count + 2);

        // 1. Create and add input_checksum stage (order: 0)
        let input_checksum_stage = Self::create_input_checksum_stage()?;
        complete_stages.push(input_checksum_stage);

        // 2. Add user stages with proper order (starting from 1)
        for (index, stage) in user_stages.into_iter().enumerate() {
            // Create a new stage with adjusted order to account for input_checksum
            // at position 0
            let user_stage = PipelineStage::new(
                stage.name().to_string(),
                *stage.stage_type(),
                stage.configuration().clone(),
                (index + 1) as u32, // order: 1, 2, 3...
            )?;
            complete_stages.push(user_stage);
        }

        // 3. Create and add output_checksum stage (order: last)
        let output_checksum_stage = Self::create_output_checksum_stage((user_stage_count + 1) as u32)?;
        complete_stages.push(output_checksum_stage);

        Ok(Pipeline {
            // Identity fields
            id: PipelineId::new(),
            name,

            // Core business fields (alphabetical)
            archived: false,
            configuration: HashMap::new(),
            metrics: ProcessingMetrics::default(),
            stages: complete_stages, // Complete pipeline with mandatory stages

            // Metadata fields
            created_at: now,
            updated_at: now,
        })
    }

    /// Gets the unique identifier for this pipeline
    ///
    /// The pipeline ID is immutable and persists throughout the entity's
    /// lifetime. This ID is used for database lookups, API references, and
    /// maintaining entity identity across system boundaries.
    ///
    /// # Returns
    ///
    /// A reference to the pipeline's unique identifier
    ///
    /// # Examples
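    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // The ID is assigned at construction and never changes.
    /// let _id = pipeline.id();
    /// ```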
    pub fn id(&self) -> &PipelineId {
        &self.id
    }

    /// Gets the human-readable name of the pipeline
    ///
    /// The name is used for display purposes, logging, and user identification.
    /// Unlike the ID, the name can be duplicated across different pipelines.
    ///
    /// # Returns
    ///
    /// The pipeline name as a string slice
    ///
    /// # Examples
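    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// assert_eq!(pipeline.name(), "Example");
    /// ```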
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Gets the ordered list of processing stages in this pipeline
    ///
    /// Returns all stages including automatically-inserted checksum stages.
    /// Stages are returned in execution order (order: 0, 1, 2, ...).
    ///
    /// # Why This Returns a Slice
    ///
    /// Returning a slice (`&[PipelineStage]`) instead of a Vec provides:
    /// - **No cloning**: Efficient access without copying data
    /// - **Immutability**: Prevents external modification of stages
    /// - **Standard interface**: Works with all slice methods and iterators
    ///
    /// # Returns
    ///
    /// An immutable slice containing all pipeline stages in execution order
    ///
    /// # Examples
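    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // Includes the auto-added checksum stages, in execution order.
    /// for stage in pipeline.stages() {
    ///     println!("{}", stage.name());
    /// }
    /// assert_eq!(pipeline.stages().len(), 3);
    /// ```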
    pub fn stages(&self) -> &[PipelineStage] {
        &self.stages
    }

    /// Gets the pipeline configuration parameters
    ///
    /// Configuration parameters are key-value pairs that control pipeline
    /// behavior. Common parameters include:
    /// - `max_workers`: Number of concurrent worker threads
    /// - `chunk_size`: Size of data chunks for processing
    /// - `timeout`: Processing timeout in seconds
    /// - `buffer_size`: I/O buffer size in bytes
    ///
    /// # Returns
    ///
    /// An immutable reference to the configuration HashMap
    ///
    /// # Examples
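    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // A freshly created pipeline starts with an empty configuration.
    /// assert!(pipeline.configuration().is_empty());
    /// ```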
    pub fn configuration(&self) -> &HashMap<String, String> {
        &self.configuration
    }

    /// Gets the current processing metrics for this pipeline
    ///
    /// Metrics track performance and execution statistics including:
    /// - Bytes processed and throughput
    /// - Processing duration and timestamps
    /// - Error and warning counts
    /// - Stage-specific metrics
    ///
    /// # Returns
    ///
    /// An immutable reference to the pipeline's metrics
    ///
    /// # Examples
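    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // Metrics remain at their defaults until a processing run updates them.
    /// let _metrics = pipeline.metrics();
    /// ```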
    pub fn metrics(&self) -> &ProcessingMetrics {
        &self.metrics
    }

    /// Gets the timestamp when this pipeline was created
    ///
    /// The creation timestamp is set when the pipeline entity is first
    /// constructed and never changes. It's useful for auditing, sorting,
    /// and determining pipeline age.
    ///
    /// # Returns
    ///
    /// Reference to the UTC creation timestamp
    ///
    /// # Examples
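    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // Creation and update timestamps start out equal.
    /// assert!(pipeline.created_at() <= pipeline.updated_at());
    /// ```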
    pub fn created_at(&self) -> &chrono::DateTime<chrono::Utc> {
        &self.created_at
    }

    /// Gets the timestamp of the last modification to this pipeline
    ///
    /// The updated timestamp changes whenever the pipeline is modified,
    /// including configuration updates, stage additions/removals, or
    /// metrics updates.
    ///
    /// # Why Track Updates?
    ///
    /// Tracking update times enables:
    /// - **Optimistic locking**: Detect concurrent modifications
    /// - **Audit trails**: Know when changes occurred
    /// - **Cache invalidation**: Know when cached data is stale
    /// - **Sorting**: Order pipelines by recency
    ///
    /// # Returns
    ///
    /// Reference to the UTC timestamp of the last update
    ///
    /// # Examples
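    ///
    /// A minimal usage sketch showing that modifications bump the timestamp
    /// (construction mirrors the constructor example above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    /// use std::collections::HashMap;
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// let before = *pipeline.updated_at();
    /// pipeline.update_configuration(HashMap::new());
    /// assert!(*pipeline.updated_at() >= before);
    /// ```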
    pub fn updated_at(&self) -> &chrono::DateTime<chrono::Utc> {
        &self.updated_at
    }

    /// Gets the current status of the pipeline
    ///
    /// Returns a simple status string indicating whether the pipeline is
    /// currently active or archived. This is a basic status indicator;
    /// detailed operational status (running, idle, failed) should be
    /// obtained from monitoring systems like Prometheus/Grafana.
    ///
    /// # Why Simple Status?
    ///
    /// This returns a static status because:
    /// - **Domain purity**: Pipeline entity shouldn't know about runtime state
    /// - **Separation of concerns**: Operational status belongs in monitoring
    /// - **Simplicity**: Avoid mixing persistent state with transient state
    ///
    /// # Returns
    ///
    /// - `"Active"` if the pipeline is available for use
    /// - `"Archived"` if the pipeline has been soft-deleted
    ///
    /// # Examples
    ///
540 ///
541 /// # Note
542 ///
543 /// For real-time operational status (running, idle, error states),
544 /// query your monitoring system (Prometheus/Grafana) instead.
545 pub fn status(&self) -> &'static str {
546 if self.archived {
547 "Archived"
548 } else {
549 "Active"
550 }
551 }
552
553 /// Checks if the pipeline is archived (soft-deleted)
554 ///
555 /// Archived pipelines are not physically deleted but are hidden from
556 /// normal queries and cannot be executed. Archiving provides:
557 /// - **Reversibility**: Can be restored if needed
558 /// - **Audit trail**: Maintains history of deleted pipelines
559 /// - **Data integrity**: Preserves foreign key relationships
560 ///
561 /// # Returns
562 ///
563 /// - `true` if the pipeline is archived
564 /// - `false` if the pipeline is active
565 ///
566 /// # Examples
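    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // Newly created pipelines are not archived.
    /// assert!(!pipeline.archived());
    /// ```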
    pub fn archived(&self) -> bool {
        self.archived
    }

    /// Updates the complete pipeline configuration
    ///
    /// Replaces the entire configuration HashMap with new values. Any previous
    /// configuration is discarded. For updating individual keys, retrieve the
    /// configuration, modify it, and pass it back.
    ///
    /// # Why Replace Instead of Merge?
    ///
    /// Complete replacement provides:
    /// - **Clear semantics**: No ambiguity about what happens to old values
    /// - **Simplicity**: No complex merge logic needed
    /// - **Explicit control**: Caller decides exact final state
    /// - **Immutability pattern**: Aligns with functional programming
    ///   principles
    ///
    /// # Arguments
    ///
    /// * `config` - The new configuration HashMap to set. Common keys include:
    ///   - `max_workers`: Number of worker threads
    ///   - `chunk_size`: Processing chunk size in bytes
    ///   - `timeout`: Timeout in seconds
    ///   - `buffer_size`: I/O buffer size
    ///
    /// # Side Effects
    ///
    /// - Replaces all configuration values
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
    ///
    /// ## Updating Individual Keys
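    ///
    /// A minimal sketch of the retrieve-modify-replace pattern described above
    /// (construction mirrors the constructor example; `chunk_size` is just an
    /// illustrative key):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // Retrieve, modify, and pass the configuration back to change one key.
    /// let mut config = pipeline.configuration().clone();
    /// config.insert("chunk_size".to_string(), "65536".to_string());
    /// pipeline.update_configuration(config);
    ///
    /// assert_eq!(
    ///     pipeline.configuration().get("chunk_size"),
    ///     Some(&"65536".to_string())
    /// );
    /// ```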
    pub fn update_configuration(&mut self, config: HashMap<String, String>) {
        self.configuration = config;
        self.updated_at = chrono::Utc::now();
    }

    /// Adds a new processing stage to the pipeline
    ///
    /// Appends a stage to the end of the pipeline's stage sequence. The new
    /// stage must be compatible with the last existing stage according to
    /// compatibility rules (e.g., compression should precede encryption).
    ///
    /// # Why Compatibility Checking?
    ///
    /// Stage compatibility ensures:
    /// - **Correct ordering**: Stages execute in a logical sequence
    /// - **Data integrity**: Each stage can process the previous stage's output
    /// - **Performance**: Optimal stage ordering (compress before encrypt)
    /// - **Correctness**: Prevents invalid combinations (e.g., double
    ///   compression)
    ///
    /// # Arguments
    ///
    /// * `stage` - The pipeline stage to add. Must be compatible with the
    ///   current last stage.
    ///
    /// # Returns
    ///
    /// - `Ok(())` if the stage was added successfully
    /// - `Err(PipelineError::IncompatibleStage)` if stage is incompatible
    ///
    /// # Errors
    ///
    /// Returns `IncompatibleStage` if the new stage is not compatible with
    /// the last stage in the pipeline. For example:
    /// - Attempting to add encryption before compression
    /// - Adding duplicate stage types
    /// - Incompatible algorithm combinations
    ///
    /// # Side Effects
    ///
    /// - Appends stage to the pipeline's stage list
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
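    ///
    /// A minimal sketch mirroring the "Adding Stages Dynamically" example
    /// above:
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// let encryption = PipelineStage::new(
    ///     "encrypt".to_string(),
    ///     StageType::Encryption,
    ///     StageConfiguration::default(),
    ///     0, // order will be adjusted
    /// )
    /// .unwrap();
    ///
    /// pipeline.add_stage(encryption).unwrap();
    /// assert_eq!(pipeline.stages().len(), 4);
    /// ```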
    pub fn add_stage(&mut self, stage: PipelineStage) -> Result<(), PipelineError> {
        // Validate stage compatibility
        if let Some(last_stage) = self.stages.last() {
            if !last_stage.is_compatible_with(&stage) {
                return Err(PipelineError::IncompatibleStage(format!(
                    "Stage {} is not compatible with {}",
                    stage.name(),
                    last_stage.name()
                )));
            }
        }

        self.stages.push(stage);
        self.updated_at = chrono::Utc::now();
        Ok(())
    }

    /// Removes a processing stage from the pipeline by its index position
    ///
    /// Removes the stage at the specified index and returns it. The pipeline
    /// must always have at least one stage remaining after removal.
    ///
    /// # Why Index-Based Removal?
    ///
    /// Index-based removal provides:
    /// - **Precision**: Remove exact stage by position
    /// - **Simplicity**: No need to search by name or ID
    /// - **Efficiency**: O(n) removal, where n is the number of stages after
    ///   the index
    /// - **Flexibility**: Works even with duplicate stage names
    ///
    /// # Arguments
    ///
    /// * `index` - Zero-based position of the stage to remove (0 = first stage)
    ///
    /// # Returns
    ///
    /// - `Ok(PipelineStage)` - The removed stage
    /// - `Err(PipelineError)` - If index is invalid or removal would leave the
    ///   pipeline empty
    ///
    /// # Errors
    ///
    /// This function returns an error if:
    /// - `InvalidConfiguration`: Index is out of bounds (>= stage count)
    /// - `InvalidConfiguration`: Removing the last remaining stage
    ///
    /// # Side Effects
    ///
    /// - Removes stage from the pipeline
    /// - Shifts subsequent stages down by one position
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
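    ///
    /// A minimal usage sketch (construction mirrors the constructor example
    /// above):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // Index 1 is the user-defined stage (index 0 is the auto-added input checksum).
    /// let removed = pipeline.remove_stage(1).unwrap();
    /// assert_eq!(removed.name(), "compress");
    /// assert_eq!(pipeline.stages().len(), 2);
    /// ```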
    pub fn remove_stage(&mut self, index: usize) -> Result<PipelineStage, PipelineError> {
        if index >= self.stages.len() {
            return Err(PipelineError::InvalidConfiguration(
                "Stage index out of bounds".to_string(),
            ));
        }

        if self.stages.len() == 1 {
            return Err(PipelineError::InvalidConfiguration(
                "Cannot remove the last stage".to_string(),
            ));
        }

        self.updated_at = chrono::Utc::now();
        Ok(self.stages.remove(index))
    }

    /// Updates the pipeline's processing metrics with new values
    ///
    /// Replaces the entire metrics object with new performance data. This is
    /// typically called after processing completes to record final statistics.
    ///
    /// # Why Replace Metrics?
    ///
    /// Complete replacement instead of incremental updates provides:
    /// - **Atomicity**: Metrics represent a single processing run
    /// - **Clarity**: No confusion about partial vs. complete metrics
    /// - **Simplicity**: No merge logic needed
    /// - **Immutability**: Aligns with functional programming patterns
    ///
    /// # Arguments
    ///
    /// * `metrics` - New processing metrics to replace current metrics. Should
    ///   contain complete statistics from a processing run including:
    ///   - Bytes processed and throughput
    ///   - Processing duration
    ///   - Error and warning counts
    ///   - Stage-specific metrics
    ///
    /// # Side Effects
    ///
    /// - Replaces current metrics completely
    /// - Updates the `updated_at` timestamp
    ///
    /// # Examples
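    ///
    /// A minimal sketch (assuming `ProcessingMetrics` is exported from
    /// `entities`, as in this module's imports):
    ///
    /// ```
    /// use adaptive_pipeline_domain::entities::pipeline::Pipeline;
    /// use adaptive_pipeline_domain::entities::pipeline_stage::{
    ///     PipelineStage, StageConfiguration, StageType,
    /// };
    /// use adaptive_pipeline_domain::entities::ProcessingMetrics;
    ///
    /// let stage = PipelineStage::new(
    ///     "compress".to_string(),
    ///     StageType::Compression,
    ///     StageConfiguration::default(),
    ///     0,
    /// )
    /// .unwrap();
    /// let mut pipeline = Pipeline::new("Example".to_string(), vec![stage]).unwrap();
    ///
    /// // Record the final statistics of a processing run.
    /// pipeline.update_metrics(ProcessingMetrics::default());
    /// ```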
    pub fn update_metrics(&mut self, metrics: ProcessingMetrics) {
        self.metrics = metrics;
        self.updated_at = chrono::Utc::now();
    }

    /// Validates the complete pipeline configuration for correctness
    ///
    /// Performs comprehensive validation of the pipeline's configuration
    /// including stage count, stage compatibility, and ordering rules. This
    /// should be called before attempting to execute the pipeline.
    ///
    /// # What is Validated?
    ///
    /// - **Stage Count**: Pipeline must have at least one stage
    /// - **Stage Compatibility**: Each stage must be compatible with the next
    /// - **Stage Ordering**: Stages must be in correct execution order
    /// - **Configuration Completeness**: All required configuration present
    ///
    /// # Why Validate?
    ///
    /// Validation prevents:
    /// - **Runtime errors**: Catch issues before execution starts
    /// - **Data corruption**: Ensure stages can process each other's output
    /// - **Resource waste**: Don't start processing with invalid configuration
    /// - **Poor UX**: Provide clear error messages upfront
    ///
    /// # Returns
    ///
    /// - `Ok(())` if pipeline configuration is valid
    /// - `Err(PipelineError)` with details if validation fails
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `InvalidConfiguration`: No stages present in pipeline
    /// - `IncompatibleStage`: Adjacent stages are incompatible
    ///
    /// # Examples
    ///
784 ///
785 /// # Implementation Note
786 ///
787 /// Validation uses a sliding window to check pairwise stage compatibility,
788 /// which is O(n) where n is the number of stages.
789 pub fn validate(&self) -> Result<(), PipelineError> {
790 if self.stages.is_empty() {
791 return Err(PipelineError::InvalidConfiguration(
792 "Pipeline must have at least one stage".to_string(),
793 ));
794 }
795
796 // Validate stage sequence
797 for window in self.stages.windows(2) {
798 if !window[0].is_compatible_with(&window[1]) {
799 return Err(PipelineError::IncompatibleStage(format!(
800 "Stages {} and {} are not compatible",
801 window[0].name(),
802 window[1].name()
803 )));
804 }
805 }
806
807 Ok(())
808 }
809
810 /// Creates a pipeline from database data (for repository use).
811 ///
812 /// # Arguments
813 ///
814 /// * `data` - A `PipelineData` DTO containing all fields from the database
815 ///
816 /// # Returns
817 ///
818 /// Returns `Ok(Pipeline)` if the data is valid, or `Err(PipelineError)` if
819 /// validation fails.
820 ///
821 /// # Errors
822 ///
823 /// * `PipelineError::InvalidConfiguration` - If name is empty or no stages
824 /// provided
825 pub fn from_database(data: PipelineData) -> Result<Self, PipelineError> {
826 if data.name.is_empty() {
827 return Err(PipelineError::InvalidConfiguration(
828 "Pipeline name cannot be empty".to_string(),
829 ));
830 }
831
832 if data.stages.is_empty() {
833 return Err(PipelineError::InvalidConfiguration(
834 "Pipeline must have at least one stage".to_string(),
835 ));
836 }
837
838 Ok(Pipeline {
839 id: data.id,
840 name: data.name,
841 archived: data.archived,
842 configuration: data.configuration,
843 metrics: data.metrics,
844 stages: data.stages,
845 created_at: data.created_at,
846 updated_at: data.updated_at,
847 })
848 }
849}

// Implementation for generic repository support
// This allows Pipeline to be used with the generic InMemoryRepository<T>
// TODO: These traits should be defined in domain/repositories, not referenced
// from infrastructure
/*
impl RepositoryEntity for Pipeline {
    type Id = PipelineId;

    fn id(&self) -> Self::Id {
        self.id.clone()
    }

    fn name(&self) -> Option<&str> {
        Some(&self.name)
    }
}

// Implementation for SQLite repository support
// This allows Pipeline to be used with the SQLite repository through adapters
impl SqliteEntity for Pipeline {
    type Id = PipelineId;

    fn id(&self) -> Self::Id {
        self.id.clone()
    }

    fn table_name() -> &'static str {
        "pipelines"
    }

    fn table_schema() -> &'static str {
        r#"
        CREATE TABLE IF NOT EXISTS pipelines (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            data TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            archived BOOLEAN NOT NULL DEFAULT false,
            UNIQUE(name)
        )
        "#
    }

    fn name(&self) -> Option<&str> {
        Some(&self.name)
    }

    fn id_to_string(&self) -> String {
        self.id.to_string()
    }

    fn id_to_string_static(id: &Self::Id) -> String {
        id.to_string()
    }

    fn id_from_string(s: &str) -> Result<Self::Id, PipelineError> {
        PipelineId::from_string(s)
    }
}
*/

/// Helper function to convert a `PipelineId` to a `Uuid`
///
/// This is used primarily for event sourcing, where events use `Uuid`
/// while the domain entities use `PipelineId`.
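///
/// # Examples
///
/// A minimal sketch (assuming `PipelineId` is exported from `value_objects`,
/// as in this module's imports):
///
/// ```
/// use adaptive_pipeline_domain::entities::pipeline::pipeline_id_to_uuid;
/// use adaptive_pipeline_domain::value_objects::PipelineId;
///
/// let id = PipelineId::new();
/// let uuid = pipeline_id_to_uuid(&id);
/// println!("{}", uuid);
/// ```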
pub fn pipeline_id_to_uuid(pipeline_id: &PipelineId) -> uuid::Uuid {
    let ulid = pipeline_id.as_ulid();
    uuid::Uuid::from_u128(ulid.0)
}