adaptive_pipeline/application/use_cases/create_pipeline.rs
1// /////////////////////////////////////////////////////////////////////////////
2// Adaptive Pipeline
3// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
4// SPDX-License-Identifier: BSD-3-Clause
5// See LICENSE file in the project root.
6// /////////////////////////////////////////////////////////////////////////////
7
8//! # Create Pipeline Use Case
9//!
10//! This module implements the use case for creating new processing pipelines.
11//! It handles pipeline configuration, stage parsing, name validation, and
12//! persistence to the repository.
13//!
14//! ## Overview
15//!
16//! The Create Pipeline use case provides:
17//!
18//! - **Pipeline Configuration**: Define processing stages and algorithms
19//! - **Name Validation**: Ensure pipeline names follow conventions
20//! - **Stage Parsing**: Parse comma-separated stage specifications
21//! - **Algorithm Selection**: Support multiple compression/encryption
22//! algorithms
23//! - **Persistence**: Save pipeline configuration to repository
24//! - **Custom Stages**: Support for user-defined transformation stages
25//!
26//! ## Architecture
27//!
28//! Following Clean Architecture and Domain-Driven Design principles:
29//!
30//! - **Use Case Layer**: Orchestrates pipeline creation workflow
31//! - **Repository Pattern**: Delegates persistence to repository interface
32//! - **Dependency Inversion**: Depends on abstractions, not implementations
33//! - **Single Responsibility**: Focused solely on pipeline creation
34//! - **Domain Validation**: Enforces business rules for pipeline configuration
35//!
36//! ## Business Rules
37//!
38//! - Pipeline names must be at least 4 characters
39//! - Names are normalized to kebab-case
40//! - Reserved names (help, version, list, etc.) are rejected
41//! - Stages are specified as comma-separated values
42//! - Supported compression: brotli, gzip, zstd, lz4
43//! - Supported encryption: aes256gcm, aes128gcm, chacha20poly1305
44//! - Supported transforms: base64, pii_masking, tee, debug, passthrough
45//! - Custom stages default to Transform type
46//! - Debug stages auto-generate unique ULID labels
47//!
48//! ## Usage Examples
49//!
50//! ```rust,ignore
51//! use adaptive_pipeline::application::use_cases::CreatePipelineUseCase;
52//!
53//! let use_case = CreatePipelineUseCase::new(pipeline_repository);
54//!
55//! // Simple compression pipeline
56//! use_case.execute(
57//! "compress-files".to_string(),
58//! "brotli".to_string(),
59//! None,
60//! ).await?;
61//!
62//! // Multi-stage pipeline
63//! use_case.execute(
64//! "secure-backup".to_string(),
65//! "brotli,aes256gcm,checksum".to_string(),
66//! None,
67//! ).await?;
68//! ```
69
70use anyhow::Result;
71use std::collections::HashMap;
72use std::path::PathBuf;
73use std::sync::Arc;
74use tracing::info;
75
76use crate::infrastructure::repositories::sqlite_pipeline::SqlitePipelineRepository;
77use adaptive_pipeline_domain::entities::pipeline::Pipeline;
78use adaptive_pipeline_domain::entities::pipeline_stage::{PipelineStage, StageConfiguration, StageType};
79
80/// Use case for creating new processing pipelines.
81///
82/// This use case handles the complete workflow for creating a new pipeline,
83/// including name validation, stage parsing, configuration setup, and
84/// persistence to the repository.
85///
86/// ## Responsibilities
87///
88/// - Validate and normalize pipeline name
89/// - Parse stage specifications from comma-separated string
90/// - Map stage names to types and algorithms
91/// - Create pipeline domain entity
92/// - Save pipeline to repository
93/// - Handle creation errors gracefully
94///
95/// ## Dependencies
96///
97/// - **Pipeline Repository**: For persisting pipeline data
98///
99/// ## Example
100///
101/// ```rust,ignore
102/// let use_case = CreatePipelineUseCase::new(pipeline_repository);
103///
104/// match use_case.execute(
105/// "data-backup".to_string(),
106/// "brotli,aes256gcm".to_string(),
107/// None,
108/// ).await {
109/// Ok(()) => println!("Pipeline created successfully"),
110/// Err(e) => eprintln!("Failed to create pipeline: {}", e),
111/// }
112/// ```
113pub struct CreatePipelineUseCase {
114 pipeline_repository: Arc<SqlitePipelineRepository>,
115}
116
117impl CreatePipelineUseCase {
118 /// Creates a new Create Pipeline use case.
119 ///
120 /// # Parameters
121 ///
122 /// * `pipeline_repository` - Repository for persisting pipeline data
123 ///
124 /// # Returns
125 ///
126 /// A new instance of `CreatePipelineUseCase`
127 pub fn new(pipeline_repository: Arc<SqlitePipelineRepository>) -> Self {
128 Self { pipeline_repository }
129 }
130
131 /// Executes the create pipeline use case.
132 ///
133 /// Creates a new pipeline with the specified name and stages, validates
134 /// the configuration, and persists it to the repository.
135 ///
136 /// ## Parameters
137 ///
138 /// * `name` - Pipeline name (will be normalized to kebab-case)
139 /// * `stages` - Comma-separated list of stage specifications
140 /// - Examples: "brotli", "brotli,aes256gcm",
141 /// "compression,encryption,checksum"
142 /// * `output` - Optional file path for pipeline configuration export (not
143 /// yet implemented)
144 ///
145 /// ## Stage Specifications
146 ///
147 /// **Generic Types** (use default algorithms):
148 /// - `compression` → brotli
149 /// - `encryption` → aes256gcm
150 /// - `checksum` → sha256
151 /// - `passthrough` → no-op
152 ///
153 /// **Specific Algorithms**:
154 /// - Compression: `brotli`, `gzip`, `zstd`, `lz4`
155 /// - Encryption: `aes256gcm`, `aes128gcm`, `chacha20poly1305`
156 /// - Transform: `base64`, `pii_masking`, `tee`, `debug`
157 ///
158 /// **Type:Algorithm Syntax**:
159 /// - `compression:lz4`
160 /// - `encryption:chacha20poly1305`
161 ///
162 /// ## Returns
163 ///
164 /// - `Ok(())` - Pipeline created and saved successfully
165 /// - `Err(anyhow::Error)` - Validation or persistence failed
166 ///
167 /// ## Errors
168 ///
169 /// Returns errors for:
170 /// - Empty pipeline name
171 /// - Name less than 4 characters after normalization
172 /// - Reserved pipeline names
173 /// - Invalid stage specifications
174 /// - Repository save failures
175 /// - Database connection errors
176 ///
177 /// ## Example
178 ///
179 /// ```rust,ignore
180 /// // Create simple compression pipeline
181 /// use_case.execute("backup".to_string(), "brotli".to_string(), None).await?;
182 ///
183 /// // Create secure multi-stage pipeline
184 /// use_case.execute(
185 /// "Secure Backup!".to_string(), // Will be normalized to "secure-backup"
186 /// "brotli,aes256gcm,checksum".to_string(),
187 /// None,
188 /// ).await?;
189 /// ```
190 pub async fn execute(&self, name: String, stages: String, output: Option<PathBuf>) -> Result<()> {
191 info!("Creating pipeline: {}", name);
192 info!("Stages: {}", stages);
193
194 // Validate and normalize pipeline name
195 let _normalized_name = Self::validate_pipeline_name(&name)?;
196
197 // Parse stage specifications
198 let stage_names: Vec<&str> = stages.split(',').collect();
199 let mut pipeline_stages = Vec::new();
200
201 for (index, stage_name) in stage_names.iter().enumerate() {
202 let (stage_type, algorithm) = match stage_name.trim() {
203 // Generic stage types with default algorithms
204 "compression" => (StageType::Compression, "brotli".to_string()),
205 "encryption" => (StageType::Encryption, "aes256gcm".to_string()),
206 "integrity" | "checksum" => (StageType::Checksum, "sha256".to_string()),
207 custom_name if custom_name.contains("checksum") => (StageType::Checksum, "sha256".to_string()),
208 "passthrough" => (StageType::PassThrough, "passthrough".to_string()),
209
210 // Compression algorithms
211 "brotli" | "gzip" | "zstd" | "lz4" => (StageType::Compression, stage_name.trim().to_string()),
212
213 // Encryption algorithms
214 "aes256gcm" | "aes128gcm" | "chacha20poly1305" => {
215 (StageType::Encryption, stage_name.trim().to_string())
216 }
217
218 // Transform stages (production stages)
219 "base64" | "pii_masking" | "tee" | "debug" => (StageType::Transform, stage_name.trim().to_string()),
220
221 // Handle compression:algorithm syntax
222 custom_name if custom_name.starts_with("compression:") => {
223 let algorithm = custom_name.strip_prefix("compression:").unwrap_or("brotli").to_string();
224 (StageType::Compression, algorithm)
225 }
226
227 // Handle encryption:algorithm syntax
228 custom_name if custom_name.starts_with("encryption:") => {
229 let algorithm = custom_name
230 .strip_prefix("encryption:")
231 .unwrap_or("aes256gcm")
232 .to_string();
233 (StageType::Encryption, algorithm)
234 }
235
236 _custom => {
237 // For unknown stages, treat them as Transform with the name as the algorithm
238 // This allows for custom stages to be used without code changes
239 (StageType::Transform, stage_name.trim().to_string())
240 }
241 };
242
243 // Create parameters HashMap with algorithm
244 let mut parameters = HashMap::new();
245 parameters.insert("algorithm".to_string(), algorithm.clone());
246
247 // For debug stages, add a unique ULID label
248 if algorithm == "debug" {
249 parameters.insert("label".to_string(), ulid::Ulid::new().to_string());
250 }
251
252 let config = StageConfiguration {
253 algorithm,
254 parameters,
255 ..Default::default()
256 };
257
258 let stage = PipelineStage::new(stage_name.trim().to_string(), stage_type, config, index as u32)?;
259
260 pipeline_stages.push(stage);
261 }
262
263 // Create pipeline domain entity
264 let pipeline = Pipeline::new(name, pipeline_stages)?;
265
266 // Save pipeline to repository
267 self.pipeline_repository
268 .save(&pipeline)
269 .await
270 .map_err(|e| anyhow::anyhow!("Failed to save pipeline: {}", e))?;
271
272 info!(
273 "Pipeline '{}' created successfully with ID: {}",
274 pipeline.name(),
275 pipeline.id()
276 );
277 info!("Pipeline saved to database");
278
279 if output.is_some() {
280 info!("Note: File output not yet implemented, pipeline saved to database only");
281 }
282
283 Ok(())
284 }
285
286 /// Normalizes pipeline name to kebab-case.
287 ///
288 /// Converts any valid input string to a clean kebab-case identifier by:
289 /// - Converting to lowercase
290 /// - Replacing separators and special characters with hyphens
291 /// - Removing non-alphanumeric characters (except hyphens)
292 /// - Collapsing multiple consecutive hyphens
293 ///
294 /// ## Parameters
295 ///
296 /// * `name` - Raw pipeline name
297 ///
298 /// ## Returns
299 ///
300 /// Normalized kebab-case string
301 ///
302 /// ## Examples
303 ///
304 /// ```text
305 /// "My Pipeline" → "my-pipeline"
306 /// "data_backup" → "data-backup"
307 /// "Test::Pipeline!" → "test-pipeline"
308 /// ```
309 fn normalize_pipeline_name(name: &str) -> String {
310 name.to_lowercase()
311 // Replace common separators with hyphens
312 .replace(
313 [
314 ' ', '_', '.', '/', '\\', ':', ';', ',', '|', '&', '+', '=', '!', '?', '*', '%', '#', '@', '$',
315 '^', '(', ')', '[', ']', '{', '}', '<', '>', '"', '\'', '`', '~',
316 ],
317 "-",
318 )
319 // Remove any remaining non-alphanumeric, non-hyphen characters
320 .chars()
321 .filter(|c| c.is_ascii_alphanumeric() || *c == '-')
322 .collect::<String>()
323 // Clean up multiple consecutive hyphens
324 .split('-')
325 .filter(|s| !s.is_empty())
326 .collect::<Vec<&str>>()
327 .join("-")
328 }
329
330 /// Validates pipeline name according to business rules.
331 ///
332 /// Ensures pipeline names meet the following criteria:
333 /// - Not empty
334 /// - At least 4 characters after normalization
335 /// - Not a reserved system name
336 ///
337 /// Names are automatically normalized to kebab-case.
338 ///
339 /// ## Parameters
340 ///
341 /// * `name` - Pipeline name to validate
342 ///
343 /// ## Returns
344 ///
345 /// - `Ok(String)` - Normalized pipeline name
346 /// - `Err(anyhow::Error)` - Validation failed
347 ///
348 /// ## Errors
349 ///
350 /// Returns errors for:
351 /// - Empty name
352 /// - Name less than 4 characters after normalization
353 /// - Reserved names: help, version, list, show, create, delete, update,
354 /// config
355 ///
356 /// ## Example
357 ///
358 /// ```rust,ignore
359 /// assert!(validate_pipeline_name("").is_err()); // Too short
360 /// assert!(validate_pipeline_name("abc").is_err()); // Too short after normalization
361 /// assert!(validate_pipeline_name("help").is_err()); // Reserved
362 /// assert_eq!(validate_pipeline_name("My Pipeline").unwrap(), "my-pipeline");
363 /// ```
364 fn validate_pipeline_name(name: &str) -> Result<String> {
365 // Check for empty name
366 if name.is_empty() {
367 return Err(anyhow::anyhow!("Pipeline name cannot be empty"));
368 }
369
370 // Normalize to kebab-case
371 let normalized = Self::normalize_pipeline_name(name);
372
373 // Check minimum length after normalization
374 if normalized.len() < 4 {
375 return Err(anyhow::anyhow!("Pipeline name must be at least 4 characters long"));
376 }
377
378 // Reserved names
379 let reserved_names = [
380 "help", "version", "list", "show", "create", "delete", "update", "config",
381 ];
382 if reserved_names.contains(&normalized.as_str()) {
383 return Err(anyhow::anyhow!(
384 "Pipeline name '{}' is reserved. Please choose a different name.",
385 name
386 ));
387 }
388
389 // Inform user if name was normalized
390 if normalized != name {
391 info!(
392 "Pipeline name normalized from '{}' to '{}' (kebab-case standard)",
393 name, normalized
394 );
395 }
396
397 Ok(normalized)
398 }
399}
400
401#[cfg(test)]
402mod tests {
403 use super::*;
404
405 #[test]
406 fn test_normalize_pipeline_name() {
407 assert_eq!(
408 CreatePipelineUseCase::normalize_pipeline_name("My Pipeline"),
409 "my-pipeline"
410 );
411 assert_eq!(
412 CreatePipelineUseCase::normalize_pipeline_name("data_backup"),
413 "data-backup"
414 );
415 assert_eq!(
416 CreatePipelineUseCase::normalize_pipeline_name("Test::Pipeline!"),
417 "test-pipeline"
418 );
419 assert_eq!(
420 CreatePipelineUseCase::normalize_pipeline_name("---multiple---hyphens---"),
421 "multiple-hyphens"
422 );
423 }
424
425 #[test]
426 fn test_validate_pipeline_name() {
427 // Valid names
428 assert!(CreatePipelineUseCase::validate_pipeline_name("test-pipeline").is_ok());
429 assert!(CreatePipelineUseCase::validate_pipeline_name("my-backup").is_ok());
430
431 // Invalid: empty
432 assert!(CreatePipelineUseCase::validate_pipeline_name("").is_err());
433
434 // Invalid: too short after normalization
435 assert!(CreatePipelineUseCase::validate_pipeline_name("abc").is_err());
436
437 // Invalid: reserved names
438 assert!(CreatePipelineUseCase::validate_pipeline_name("help").is_err());
439 assert!(CreatePipelineUseCase::validate_pipeline_name("list").is_err());
440 assert!(CreatePipelineUseCase::validate_pipeline_name("create").is_err());
441 }
442
443 #[tokio::test]
444 #[ignore] // Requires database setup
445 async fn test_create_pipeline_with_real_repository() {
446 // This test would require a real database setup
447 // For now, marked as ignored
448 // See tests/integration/ for full end-to-end tests
449 }
450}