adaptive_pipeline/application/use_cases/
create_pipeline.rs

1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Create Pipeline Use Case
9//!
10//! This module implements the use case for creating new processing pipelines.
11//! It handles pipeline configuration, stage parsing, name validation, and
12//! persistence to the repository.
13//!
14//! ## Overview
15//!
16//! The Create Pipeline use case provides:
17//!
18//! - **Pipeline Configuration**: Define processing stages and algorithms
19//! - **Name Validation**: Ensure pipeline names follow conventions
20//! - **Stage Parsing**: Parse comma-separated stage specifications
21//! - **Algorithm Selection**: Support multiple compression/encryption
22//!   algorithms
23//! - **Persistence**: Save pipeline configuration to repository
24//! - **Custom Stages**: Support for user-defined transformation stages
25//!
26//! ## Architecture
27//!
28//! Following Clean Architecture and Domain-Driven Design principles:
29//!
30//! - **Use Case Layer**: Orchestrates pipeline creation workflow
31//! - **Repository Pattern**: Delegates persistence to repository interface
32//! - **Dependency Inversion**: Depends on abstractions, not implementations
33//! - **Single Responsibility**: Focused solely on pipeline creation
34//! - **Domain Validation**: Enforces business rules for pipeline configuration
35//!
36//! ## Business Rules
37//!
38//! - Pipeline names must be at least 4 characters
39//! - Names are normalized to kebab-case
40//! - Reserved names (help, version, list, etc.) are rejected
41//! - Stages are specified as comma-separated values
42//! - Supported compression: brotli, gzip, zstd, lz4
43//! - Supported encryption: aes256gcm, aes128gcm, chacha20poly1305
44//! - Supported transforms: base64, pii_masking, tee, debug, passthrough
45//! - Custom stages default to Transform type
46//! - Debug stages auto-generate unique ULID labels
47//!
48//! ## Usage Examples
49//!
50//! ```rust,ignore
51//! use adaptive_pipeline::application::use_cases::CreatePipelineUseCase;
52//!
53//! let use_case = CreatePipelineUseCase::new(pipeline_repository);
54//!
55//! // Simple compression pipeline
56//! use_case.execute(
57//!     "compress-files".to_string(),
58//!     "brotli".to_string(),
59//!     None,
60//! ).await?;
61//!
62//! // Multi-stage pipeline
63//! use_case.execute(
64//!     "secure-backup".to_string(),
65//!     "brotli,aes256gcm,checksum".to_string(),
66//!     None,
67//! ).await?;
68//! ```
69
70use anyhow::Result;
71use std::collections::HashMap;
72use std::path::PathBuf;
73use std::sync::Arc;
74use tracing::info;
75
76use crate::infrastructure::repositories::sqlite_pipeline::SqlitePipelineRepository;
77use adaptive_pipeline_domain::entities::pipeline::Pipeline;
78use adaptive_pipeline_domain::entities::pipeline_stage::{PipelineStage, StageConfiguration, StageType};
79
80/// Use case for creating new processing pipelines.
81///
82/// This use case handles the complete workflow for creating a new pipeline,
83/// including name validation, stage parsing, configuration setup, and
84/// persistence to the repository.
85///
86/// ## Responsibilities
87///
88/// - Validate and normalize pipeline name
89/// - Parse stage specifications from comma-separated string
90/// - Map stage names to types and algorithms
91/// - Create pipeline domain entity
92/// - Save pipeline to repository
93/// - Handle creation errors gracefully
94///
95/// ## Dependencies
96///
97/// - **Pipeline Repository**: For persisting pipeline data
98///
99/// ## Example
100///
101/// ```rust,ignore
102/// let use_case = CreatePipelineUseCase::new(pipeline_repository);
103///
104/// match use_case.execute(
105///     "data-backup".to_string(),
106///     "brotli,aes256gcm".to_string(),
107///     None,
108/// ).await {
109///     Ok(()) => println!("Pipeline created successfully"),
110///     Err(e) => eprintln!("Failed to create pipeline: {}", e),
111/// }
112/// ```
113pub struct CreatePipelineUseCase {
114    pipeline_repository: Arc<SqlitePipelineRepository>,
115}
116
117impl CreatePipelineUseCase {
118    /// Creates a new Create Pipeline use case.
119    ///
120    /// # Parameters
121    ///
122    /// * `pipeline_repository` - Repository for persisting pipeline data
123    ///
124    /// # Returns
125    ///
126    /// A new instance of `CreatePipelineUseCase`
127    pub fn new(pipeline_repository: Arc<SqlitePipelineRepository>) -> Self {
128        Self { pipeline_repository }
129    }
130
131    /// Executes the create pipeline use case.
132    ///
133    /// Creates a new pipeline with the specified name and stages, validates
134    /// the configuration, and persists it to the repository.
135    ///
136    /// ## Parameters
137    ///
138    /// * `name` - Pipeline name (will be normalized to kebab-case)
139    /// * `stages` - Comma-separated list of stage specifications
140    ///   - Examples: "brotli", "brotli,aes256gcm",
141    ///     "compression,encryption,checksum"
142    /// * `output` - Optional file path for pipeline configuration export (not
143    ///   yet implemented)
144    ///
145    /// ## Stage Specifications
146    ///
147    /// **Generic Types** (use default algorithms):
148    /// - `compression` → brotli
149    /// - `encryption` → aes256gcm
150    /// - `checksum` → sha256
151    /// - `passthrough` → no-op
152    ///
153    /// **Specific Algorithms**:
154    /// - Compression: `brotli`, `gzip`, `zstd`, `lz4`
155    /// - Encryption: `aes256gcm`, `aes128gcm`, `chacha20poly1305`
156    /// - Transform: `base64`, `pii_masking`, `tee`, `debug`
157    ///
158    /// **Type:Algorithm Syntax**:
159    /// - `compression:lz4`
160    /// - `encryption:chacha20poly1305`
161    ///
162    /// ## Returns
163    ///
164    /// - `Ok(())` - Pipeline created and saved successfully
165    /// - `Err(anyhow::Error)` - Validation or persistence failed
166    ///
167    /// ## Errors
168    ///
169    /// Returns errors for:
170    /// - Empty pipeline name
171    /// - Name less than 4 characters after normalization
172    /// - Reserved pipeline names
173    /// - Invalid stage specifications
174    /// - Repository save failures
175    /// - Database connection errors
176    ///
177    /// ## Example
178    ///
179    /// ```rust,ignore
180    /// // Create simple compression pipeline
181    /// use_case.execute("backup".to_string(), "brotli".to_string(), None).await?;
182    ///
183    /// // Create secure multi-stage pipeline
184    /// use_case.execute(
185    ///     "Secure Backup!".to_string(),  // Will be normalized to "secure-backup"
186    ///     "brotli,aes256gcm,checksum".to_string(),
187    ///     None,
188    /// ).await?;
189    /// ```
190    pub async fn execute(&self, name: String, stages: String, output: Option<PathBuf>) -> Result<()> {
191        info!("Creating pipeline: {}", name);
192        info!("Stages: {}", stages);
193
194        // Validate and normalize pipeline name
195        let _normalized_name = Self::validate_pipeline_name(&name)?;
196
197        // Parse stage specifications
198        let stage_names: Vec<&str> = stages.split(',').collect();
199        let mut pipeline_stages = Vec::new();
200
201        for (index, stage_name) in stage_names.iter().enumerate() {
202            let (stage_type, algorithm) = match stage_name.trim() {
203                // Generic stage types with default algorithms
204                "compression" => (StageType::Compression, "brotli".to_string()),
205                "encryption" => (StageType::Encryption, "aes256gcm".to_string()),
206                "integrity" | "checksum" => (StageType::Checksum, "sha256".to_string()),
207                custom_name if custom_name.contains("checksum") => (StageType::Checksum, "sha256".to_string()),
208                "passthrough" => (StageType::PassThrough, "passthrough".to_string()),
209
210                // Compression algorithms
211                "brotli" | "gzip" | "zstd" | "lz4" => (StageType::Compression, stage_name.trim().to_string()),
212
213                // Encryption algorithms
214                "aes256gcm" | "aes128gcm" | "chacha20poly1305" => {
215                    (StageType::Encryption, stage_name.trim().to_string())
216                }
217
218                // Transform stages (production stages)
219                "base64" | "pii_masking" | "tee" | "debug" => (StageType::Transform, stage_name.trim().to_string()),
220
221                // Handle compression:algorithm syntax
222                custom_name if custom_name.starts_with("compression:") => {
223                    let algorithm = custom_name.strip_prefix("compression:").unwrap_or("brotli").to_string();
224                    (StageType::Compression, algorithm)
225                }
226
227                // Handle encryption:algorithm syntax
228                custom_name if custom_name.starts_with("encryption:") => {
229                    let algorithm = custom_name
230                        .strip_prefix("encryption:")
231                        .unwrap_or("aes256gcm")
232                        .to_string();
233                    (StageType::Encryption, algorithm)
234                }
235
236                _custom => {
237                    // For unknown stages, treat them as Transform with the name as the algorithm
238                    // This allows for custom stages to be used without code changes
239                    (StageType::Transform, stage_name.trim().to_string())
240                }
241            };
242
243            // Create parameters HashMap with algorithm
244            let mut parameters = HashMap::new();
245            parameters.insert("algorithm".to_string(), algorithm.clone());
246
247            // For debug stages, add a unique ULID label
248            if algorithm == "debug" {
249                parameters.insert("label".to_string(), ulid::Ulid::new().to_string());
250            }
251
252            let config = StageConfiguration {
253                algorithm,
254                parameters,
255                ..Default::default()
256            };
257
258            let stage = PipelineStage::new(stage_name.trim().to_string(), stage_type, config, index as u32)?;
259
260            pipeline_stages.push(stage);
261        }
262
263        // Create pipeline domain entity
264        let pipeline = Pipeline::new(name, pipeline_stages)?;
265
266        // Save pipeline to repository
267        self.pipeline_repository
268            .save(&pipeline)
269            .await
270            .map_err(|e| anyhow::anyhow!("Failed to save pipeline: {}", e))?;
271
272        info!(
273            "Pipeline '{}' created successfully with ID: {}",
274            pipeline.name(),
275            pipeline.id()
276        );
277        info!("Pipeline saved to database");
278
279        if output.is_some() {
280            info!("Note: File output not yet implemented, pipeline saved to database only");
281        }
282
283        Ok(())
284    }
285
286    /// Normalizes pipeline name to kebab-case.
287    ///
288    /// Converts any valid input string to a clean kebab-case identifier by:
289    /// - Converting to lowercase
290    /// - Replacing separators and special characters with hyphens
291    /// - Removing non-alphanumeric characters (except hyphens)
292    /// - Collapsing multiple consecutive hyphens
293    ///
294    /// ## Parameters
295    ///
296    /// * `name` - Raw pipeline name
297    ///
298    /// ## Returns
299    ///
300    /// Normalized kebab-case string
301    ///
302    /// ## Examples
303    ///
304    /// ```text
305    /// "My Pipeline" → "my-pipeline"
306    /// "data_backup" → "data-backup"
307    /// "Test::Pipeline!" → "test-pipeline"
308    /// ```
309    fn normalize_pipeline_name(name: &str) -> String {
310        name.to_lowercase()
311            // Replace common separators with hyphens
312            .replace(
313                [
314                    ' ', '_', '.', '/', '\\', ':', ';', ',', '|', '&', '+', '=', '!', '?', '*', '%', '#', '@', '$',
315                    '^', '(', ')', '[', ']', '{', '}', '<', '>', '"', '\'', '`', '~',
316                ],
317                "-",
318            )
319            // Remove any remaining non-alphanumeric, non-hyphen characters
320            .chars()
321            .filter(|c| c.is_ascii_alphanumeric() || *c == '-')
322            .collect::<String>()
323            // Clean up multiple consecutive hyphens
324            .split('-')
325            .filter(|s| !s.is_empty())
326            .collect::<Vec<&str>>()
327            .join("-")
328    }
329
330    /// Validates pipeline name according to business rules.
331    ///
332    /// Ensures pipeline names meet the following criteria:
333    /// - Not empty
334    /// - At least 4 characters after normalization
335    /// - Not a reserved system name
336    ///
337    /// Names are automatically normalized to kebab-case.
338    ///
339    /// ## Parameters
340    ///
341    /// * `name` - Pipeline name to validate
342    ///
343    /// ## Returns
344    ///
345    /// - `Ok(String)` - Normalized pipeline name
346    /// - `Err(anyhow::Error)` - Validation failed
347    ///
348    /// ## Errors
349    ///
350    /// Returns errors for:
351    /// - Empty name
352    /// - Name less than 4 characters after normalization
353    /// - Reserved names: help, version, list, show, create, delete, update,
354    ///   config
355    ///
356    /// ## Example
357    ///
358    /// ```rust,ignore
359    /// assert!(validate_pipeline_name("").is_err());  // Too short
360    /// assert!(validate_pipeline_name("abc").is_err());  // Too short after normalization
361    /// assert!(validate_pipeline_name("help").is_err());  // Reserved
362    /// assert_eq!(validate_pipeline_name("My Pipeline").unwrap(), "my-pipeline");
363    /// ```
364    fn validate_pipeline_name(name: &str) -> Result<String> {
365        // Check for empty name
366        if name.is_empty() {
367            return Err(anyhow::anyhow!("Pipeline name cannot be empty"));
368        }
369
370        // Normalize to kebab-case
371        let normalized = Self::normalize_pipeline_name(name);
372
373        // Check minimum length after normalization
374        if normalized.len() < 4 {
375            return Err(anyhow::anyhow!("Pipeline name must be at least 4 characters long"));
376        }
377
378        // Reserved names
379        let reserved_names = [
380            "help", "version", "list", "show", "create", "delete", "update", "config",
381        ];
382        if reserved_names.contains(&normalized.as_str()) {
383            return Err(anyhow::anyhow!(
384                "Pipeline name '{}' is reserved. Please choose a different name.",
385                name
386            ));
387        }
388
389        // Inform user if name was normalized
390        if normalized != name {
391            info!(
392                "Pipeline name normalized from '{}' to '{}' (kebab-case standard)",
393                name, normalized
394            );
395        }
396
397        Ok(normalized)
398    }
399}
400
401#[cfg(test)]
402mod tests {
403    use super::*;
404
405    #[test]
406    fn test_normalize_pipeline_name() {
407        assert_eq!(
408            CreatePipelineUseCase::normalize_pipeline_name("My Pipeline"),
409            "my-pipeline"
410        );
411        assert_eq!(
412            CreatePipelineUseCase::normalize_pipeline_name("data_backup"),
413            "data-backup"
414        );
415        assert_eq!(
416            CreatePipelineUseCase::normalize_pipeline_name("Test::Pipeline!"),
417            "test-pipeline"
418        );
419        assert_eq!(
420            CreatePipelineUseCase::normalize_pipeline_name("---multiple---hyphens---"),
421            "multiple-hyphens"
422        );
423    }
424
425    #[test]
426    fn test_validate_pipeline_name() {
427        // Valid names
428        assert!(CreatePipelineUseCase::validate_pipeline_name("test-pipeline").is_ok());
429        assert!(CreatePipelineUseCase::validate_pipeline_name("my-backup").is_ok());
430
431        // Invalid: empty
432        assert!(CreatePipelineUseCase::validate_pipeline_name("").is_err());
433
434        // Invalid: too short after normalization
435        assert!(CreatePipelineUseCase::validate_pipeline_name("abc").is_err());
436
437        // Invalid: reserved names
438        assert!(CreatePipelineUseCase::validate_pipeline_name("help").is_err());
439        assert!(CreatePipelineUseCase::validate_pipeline_name("list").is_err());
440        assert!(CreatePipelineUseCase::validate_pipeline_name("create").is_err());
441    }
442
443    #[tokio::test]
444    #[ignore] // Requires database setup
445    async fn test_create_pipeline_with_real_repository() {
446        // This test would require a real database setup
447        // For now, marked as ignored
448        // See tests/integration/ for full end-to-end tests
449    }
450}