adaptive_pipeline/infrastructure/repositories/sqlite_pipeline.rs

// /////////////////////////////////////////////////////////////////////////////
// Adaptive Pipeline
// Copyright (c) 2025 Michael Gardner, A Bit of Help, Inc.
// SPDX-License-Identifier: BSD-3-Clause
// See LICENSE file in the project root.
// /////////////////////////////////////////////////////////////////////////////

//! # SQLite Pipeline Repository Adapter
//!
//! Implements the pipeline repository interface using SQLite for persistence.
//! Follows Hexagonal Architecture with proper relational schema, ACID
//! transactions, connection pooling, and parameterized queries for security.
//! See mdBook for detailed schema documentation and usage examples.
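//!
//! A minimal usage sketch (hypothetical database path and pipeline value;
//! error handling elided):
//!
//! ```rust,ignore
//! // Connect to a file-backed database (use ":memory:" for tests)
//! let repository = SqlitePipelineRepository::new("data/pipelines.db").await?;
//!
//! // Persist a pipeline and read it back by name
//! repository.save(&pipeline).await?;
//! let loaded = repository.find_by_name(pipeline.name()).await?;
//! ```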

use adaptive_pipeline_domain::entities::pipeline_stage::{StageConfiguration, StageType};
use adaptive_pipeline_domain::value_objects::PipelineId;
use adaptive_pipeline_domain::{Pipeline, PipelineError, PipelineStage, ProcessingMetrics};
use sqlx::{Row, SqlitePool};
use std::collections::HashMap;
use tracing::debug;
// REMOVED: Generic Repository import - violates DIP
// DDD Principle: Use only domain-specific repository interfaces

/// Structured SQLite pipeline repository using proper database columns
///
/// This type provides a concrete, SQLite-backed implementation of the
/// pipeline repository interface, following Domain-Driven Design principles
/// and proper relational database design patterns.
///
/// # Key Features
///
/// - **Relational Design**: Properly normalized schema with separate tables
/// - **Type Safety**: Strongly typed parameters and rows via sqlx
/// - **Transaction Support**: ACID transactions for data consistency
/// - **Performance Optimization**: Efficient queries with proper indexing
/// - **Error Handling**: Comprehensive error handling and recovery
///
/// # Database Schema
///
/// The repository uses a normalized relational schema:
/// - **pipelines**: Main pipeline entity data
/// - **pipeline_configuration**: Key-value configuration parameters
/// - **pipeline_stages**: Pipeline stage configurations
/// - **stage_parameters**: Parameters for each stage
///
/// # Architecture
///
/// This implementation avoids JSON serialization issues by using proper
/// relational database design with separate tables for pipeline data,
/// configuration, stages, and metrics.
///
/// # Examples
///
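/// A minimal sketch of typical usage (hypothetical pipeline value; requires an
/// async context):
///
/// ```rust,ignore
/// // In-memory database, convenient for tests
/// let repository = SqlitePipelineRepository::new(":memory:").await?;
///
/// repository.save(&pipeline).await?;
/// assert_eq!(repository.count().await?, 1);
/// ```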
///
/// # Visibility
///
/// - **Public**: For dependency injection and external usage
/// - **Private Fields**: Database connection pool is encapsulated
pub struct SqlitePipelineRepository {
    // PRIVATE: Database connection pool - internal implementation detail
    pool: SqlitePool,
}

impl SqlitePipelineRepository {
    /// Creates a new structured pipeline repository with database connection
    ///
    /// This constructor establishes a connection pool to the SQLite database,
    /// which will be used for all subsequent repository operations. The
    /// connection pool provides efficient resource management and supports
    /// concurrent access.
    ///
    /// # Why Connection Pooling?
    ///
    /// Connection pooling is used because:
    /// 1. **Performance**: Reusing connections is faster than creating new ones
    /// 2. **Resource Management**: Limits the number of open database
    ///    connections
    /// 3. **Concurrency**: Allows multiple operations to share connections
    ///    safely
    /// 4. **Reliability**: Automatically handles connection failures and
    ///    retries
    ///
    /// # Arguments
    ///
    /// * `database_path` - Path to the SQLite database file, or special values:
    ///   - `:memory:` or `sqlite::memory:` for an in-memory database (useful
    ///     for testing)
    ///   - Any file path like `"data/pipelines.db"` for persistent storage
    ///
    /// # Returns
    ///
    /// * `Ok(SqlitePipelineRepository)` - Successfully connected repository
    /// * `Err(PipelineError)` - Connection failed with error details
    ///
    /// # Errors
    ///
    /// This function returns an error if:
    /// - The database file cannot be opened or created
    /// - File permissions prevent access
    /// - The database format is incompatible
    /// - The connection URL is malformed
    ///
    /// # Examples
    ///
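    /// A minimal sketch (paths are illustrative; requires an async context):
    ///
    /// ```rust,ignore
    /// // Persistent, file-backed database
    /// let repository = SqlitePipelineRepository::new("data/pipelines.db").await?;
    ///
    /// // In-memory database for testing
    /// let test_repository = SqlitePipelineRepository::new(":memory:").await?;
    /// ```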
    ///
    /// # Implementation Notes
    ///
    /// The function normalizes the database path to sqlx's expected format:
    /// - `:memory:` → `sqlite::memory:`
    /// - File paths → `sqlite://<path>`
    pub async fn new(database_path: &str) -> Result<Self, PipelineError> {
        debug!("Creating SqlitePipelineRepository with database: {}", database_path);

        // Build a proper SQLite connection URL. sqlx expects either:
        // - "sqlite::memory:" for in-memory DB, or
        // - "sqlite://<path>" for file-backed DB
        let database_url = if database_path == ":memory:" || database_path == "sqlite::memory:" {
            "sqlite::memory:".to_string()
        } else {
            format!("sqlite://{}", database_path)
        };

        // Use schema initialization which handles database creation and migrations
        let pool = crate::infrastructure::repositories::schema::initialize_database(&database_url)
            .await
            .map_err(|e| {
                PipelineError::database_error(format!("Failed to initialize database '{}': {}", database_path, e))
            })?;

        debug!("Successfully connected to structured SQLite database");
        Ok(Self { pool })
    }

    /// Saves a pipeline to the database with ACID transaction guarantees
    ///
    /// This method persists a complete pipeline entity to the database,
    /// including all associated data: configuration parameters, stages, and
    /// stage parameters. The entire operation is wrapped in a database
    /// transaction to ensure atomicity - either all data is saved
    /// successfully, or none of it is.
    ///
    /// # Why ACID Transactions?
    ///
    /// ACID (Atomicity, Consistency, Isolation, Durability) transactions
    /// ensure:
    /// 1. **Atomicity**: All-or-nothing - if any part fails, everything rolls
    ///    back
    /// 2. **Consistency**: Database constraints are always maintained
    /// 3. **Isolation**: Concurrent operations don't interfere with each other
    /// 4. **Durability**: Once committed, data survives system crashes
    ///
    /// # What Gets Saved?
    ///
    /// The method saves to multiple related tables:
    /// - **pipelines**: Main pipeline record (id, name, archived status,
    ///   timestamps)
    /// - **pipeline_configuration**: Key-value configuration parameters
    /// - **pipeline_stages**: Processing stages with their configurations
    /// - **stage_parameters**: Parameters for each stage
    ///
    /// # Arguments
    ///
    /// * `entity` - The pipeline entity to save. Must be a complete, valid
    ///   pipeline with all required fields populated.
    ///
    /// # Returns
    ///
    /// * `Ok(())` - Pipeline saved successfully
    /// * `Err(PipelineError)` - Save operation failed, transaction rolled back
    ///
    /// # Errors
    ///
    /// This function returns an error if:
    /// - A pipeline with the same ID already exists (unique constraint
    ///   violation)
    /// - Database connection is lost during the operation
    /// - Any SQL query fails (syntax error, constraint violation, etc.)
    /// - Transaction cannot be started or committed
    ///
    /// Note: If an error occurs, the transaction is automatically rolled back,
    /// leaving the database in its original state.
    ///
    /// # Examples
    ///
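    /// A minimal sketch (assumes a fully constructed `Pipeline` entity and an
    /// async context):
    ///
    /// ```rust,ignore
    /// let repository = SqlitePipelineRepository::new(":memory:").await?;
    ///
    /// // All inserts run in a single transaction: on error, nothing persists
    /// repository.save(&pipeline).await?;
    ///
    /// // Saving the same pipeline again violates the unique id constraint
    /// assert!(repository.save(&pipeline).await.is_err());
    /// ```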
    ///
    /// # Thread Safety
    ///
    /// This method is safe to call concurrently from multiple tasks. The
    /// database connection pool handles concurrent access, and transaction
    /// isolation prevents interference between concurrent saves.
    ///
    /// # Performance
    ///
    /// - **Complexity**: O(n + m) where n = number of config entries, m =
    ///   number of stages
    /// - **Database Writes**: Multiple INSERT statements within one transaction
    /// - **Network**: Single round-trip for transaction commit
    /// - **Locking**: Row-level locks acquired during transaction
    pub async fn save(&self, entity: &Pipeline) -> Result<(), PipelineError> {
        debug!(
            pipeline_name = %entity.name(),
            "SqlitePipelineRepository::save called"
        );

        // Start database transaction for ACID compliance
        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to start transaction: {}", e)))?;

        // Insert main pipeline record
        let pipeline_query = r#"
            INSERT INTO pipelines (id, name, archived, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
        "#;

        sqlx::query(pipeline_query)
            .bind(entity.id().to_string())
            .bind(entity.name())
            .bind(entity.archived())
            .bind(entity.created_at().to_rfc3339())
            .bind(entity.updated_at().to_rfc3339())
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to insert pipeline: {}", e)))?;

        // Insert pipeline configuration
        for (key, value) in entity.configuration() {
            let config_query = r#"
                INSERT INTO pipeline_configuration (pipeline_id, key, value, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
            "#;

            sqlx::query(config_query)
                .bind(entity.id().to_string())
                .bind(key)
                .bind(value)
                .bind(entity.created_at().to_rfc3339())
                .bind(entity.updated_at().to_rfc3339())
                .execute(&mut *tx)
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to insert configuration: {}", e)))?;
        }

        // Insert pipeline stages
        for (index, stage) in entity.stages().iter().enumerate() {
            let stage_query = r#"
                INSERT INTO pipeline_stages (
                    id, pipeline_id, name, stage_type, enabled, stage_order,
                    algorithm, parallel_processing, chunk_size, created_at, updated_at
                )
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#;

            sqlx::query(stage_query)
                .bind(stage.id().to_string())
                .bind(entity.id().to_string())
                .bind(stage.name())
                .bind(stage.stage_type().to_string())
                .bind(stage.is_enabled())
                .bind(index as i32)
                .bind(&stage.configuration().algorithm)
                .bind(stage.configuration().parallel_processing)
                .bind(stage.configuration().chunk_size.map(|s| s as i64))
                .bind(stage.created_at().to_rfc3339())
                .bind(stage.updated_at().to_rfc3339())
                .execute(&mut *tx)
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to insert stage: {}", e)))?;

            // Insert stage parameters
            for (param_key, param_value) in &stage.configuration().parameters {
                let param_query = r#"
                    INSERT INTO stage_parameters (stage_id, key, value, created_at, updated_at)
                    VALUES (?, ?, ?, ?, ?)
                "#;

                sqlx::query(param_query)
                    .bind(stage.id().to_string())
                    .bind(param_key)
                    .bind(param_value)
                    .bind(stage.created_at().to_rfc3339())
                    .bind(stage.updated_at().to_rfc3339())
                    .execute(&mut *tx)
                    .await
                    .map_err(|e| PipelineError::database_error(format!("Failed to insert stage parameter: {}", e)))?;
            }
        }

        // NOTE: Metrics are handled by Prometheus (per SRS requirements) and are
        // not stored in the database. Metrics insertion is skipped because
        // observability is handled externally; this keeps the database focused
        // on core pipeline data only.

        // Commit transaction - ensures ACID compliance
        tx.commit()
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to commit transaction: {}", e)))?;

        debug!(
            pipeline_name = %entity.name(),
            "Successfully saved pipeline with ACID transaction"
        );
        Ok(())
    }

    /// PUBLIC: Domain interface - Find pipeline by ID
    pub async fn find_by_id(&self, id: PipelineId) -> Result<Option<Pipeline>, PipelineError> {
        self.load_pipeline_from_db(id).await
    }

    /// PUBLIC: Domain interface - Update a pipeline
    ///
    /// NOTE: Currently a no-op stub; it logs the call and returns `Ok(())`
    /// without persisting any changes.
    pub async fn update(&self, pipeline: &Pipeline) -> Result<(), PipelineError> {
        debug!(
            pipeline_name = %pipeline.name(),
            "SqlitePipelineRepository::update called"
        );
        Ok(())
    }

    /// PUBLIC: Domain interface - Soft delete a pipeline with cascading archive
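    ///
    /// Archives the pipeline's stages, stage parameters, and configuration
    /// before archiving the pipeline record itself; returns `Ok(true)` only if
    /// an active pipeline was actually archived.
    ///
    /// A minimal usage sketch (hypothetical ID; assumes an initialized
    /// repository and an async context):
    ///
    /// ```rust,ignore
    /// let deleted = repository.delete(pipeline_id).await?;
    /// assert!(deleted, "pipeline was already archived or does not exist");
    /// ```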
    pub async fn delete(&self, id: PipelineId) -> Result<bool, PipelineError> {
        debug!(pipeline_id = %id, "Starting delete for pipeline");

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to begin transaction: {}", e)))?;

        let now = chrono::Utc::now().to_rfc3339();
        let id_str = id.to_string();

        debug!("Archiving pipeline stages...");
        // Archive pipeline stages first
        let stages_query = r#"
            UPDATE pipeline_stages
            SET archived = true, updated_at = ?
            WHERE pipeline_id = ? AND archived = false
        "#;

        let stages_result = sqlx::query(stages_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive pipeline stages: {}", e)))?;

        debug!(
            stages_archived = stages_result.rows_affected(),
            "Archived pipeline stages"
        );

        debug!("Archiving stage parameters...");
        // Archive stage parameters
        let params_query = r#"
            UPDATE stage_parameters
            SET archived = true, updated_at = ?
            WHERE stage_id IN (
                SELECT id FROM pipeline_stages
                WHERE pipeline_id = ?
            ) AND archived = false
        "#;

        let params_result = sqlx::query(params_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive stage parameters: {}", e)))?;

        debug!(
            parameters_archived = params_result.rows_affected(),
            "Archived stage parameters"
        );

        debug!("Archiving pipeline configuration...");
        // Archive pipeline configuration
        let config_query = r#"
            UPDATE pipeline_configuration
            SET archived = true, updated_at = ?
            WHERE pipeline_id = ? AND archived = false
        "#;

        let config_result = sqlx::query(config_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive pipeline configuration: {}", e)))?;

        debug!(
            config_entries_archived = config_result.rows_affected(),
            "Archived config entries"
        );

        debug!("Archiving main pipeline...");
        // Finally, archive the main pipeline record
        let pipeline_query = r#"
            UPDATE pipelines
            SET archived = true, updated_at = ?
            WHERE id = ? AND archived = false
        "#;

        let result = sqlx::query(pipeline_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive pipeline: {}", e)))?;

        let success = result.rows_affected() > 0;
        debug!(
            success = success,
            rows_affected = result.rows_affected(),
            "Pipeline archive result"
        );

        if success {
            tx.commit()
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to commit archive transaction: {}", e)))?;
            debug!("Transaction committed successfully");
        } else {
            tx.rollback()
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to rollback archive transaction: {}", e)))?;
            debug!("Transaction rolled back");
        }

        Ok(success)
    }

    /// PUBLIC: Domain interface - List all active pipelines
    pub async fn list_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
        debug!("SqlitePipelineRepository::list_all called (excluding archived)");

        // Get all non-archived pipelines
        let query = "SELECT id FROM pipelines WHERE archived = false ORDER BY name";
        let rows = sqlx::query(query)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to query pipelines: {}", e)))?;

        let mut pipelines = Vec::new();
        for row in rows {
            let id_str: String = row.get("id");
            let pipeline_id = PipelineId::from_string(&id_str)?;

            if let Some(pipeline) = self.load_pipeline_from_db(pipeline_id).await? {
                pipelines.push(pipeline);
            }
        }

        debug!(pipeline_count = pipelines.len(), "Found active pipelines");
        Ok(pipelines)
    }

    /// PUBLIC: Domain interface - Find all active pipelines (alias for
    /// list_all)
    pub async fn find_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
        self.list_all().await
    }

    /// PUBLIC: Domain interface - List archived pipelines
    pub async fn list_archived(&self) -> Result<Vec<Pipeline>, PipelineError> {
        debug!("SqlitePipelineRepository::list_archived called");

        // Get all archived pipelines
        let query = "SELECT id FROM pipelines WHERE archived = true ORDER BY name";
        let rows = sqlx::query(query)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to query pipelines: {}", e)))?;

        let mut pipelines = Vec::new();
        for row in rows {
            let id_str: String = row.get("id");
            let pipeline_id = PipelineId::from_string(&id_str)?;

            if let Some(pipeline) = self.load_pipeline_from_db_with_archived(pipeline_id, true).await? {
                pipelines.push(pipeline);
            }
        }

        debug!(pipeline_count = pipelines.len(), "Found archived pipelines");
        Ok(pipelines)
    }

    /// PUBLIC: Domain interface - Check if pipeline exists
    pub async fn exists(&self, id: PipelineId) -> Result<bool, PipelineError> {
        let query = "SELECT 1 FROM pipelines WHERE id = ? AND archived = false";
        let result = sqlx::query(query)
            .bind(id.to_string())
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to check pipeline existence: {}", e)))?;

        Ok(result.is_some())
    }

    /// PUBLIC: Domain interface - Find pipeline by name
    pub async fn find_by_name(&self, name: &str) -> Result<Option<Pipeline>, PipelineError> {
        debug!("SqlitePipelineRepository::find_by_name called for: {}", name);

        let query = "SELECT id FROM pipelines WHERE name = ? AND archived = false";
        let row = sqlx::query(query)
            .bind(name)
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to find pipeline by name: {}", e)))?;

        if let Some(row) = row {
            let id_str: String = row.get("id");
            let pipeline_id = PipelineId::from_string(&id_str)?;
            self.load_pipeline_from_db(pipeline_id).await
        } else {
            debug!(pipeline_name = name, "No pipeline found with name");
            Ok(None)
        }
    }

    /// PUBLIC: Domain interface - List pipelines with pagination
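    ///
    /// A minimal sketch (assumes an initialized repository; `offset` is the
    /// number of records to skip, `limit` the page size):
    ///
    /// ```rust,ignore
    /// // Second page of 10 pipelines, ordered by name
    /// let page = repository.list_paginated(10, 10).await?;
    /// ```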
    pub async fn list_paginated(&self, offset: usize, limit: usize) -> Result<Vec<Pipeline>, PipelineError> {
        let query = "SELECT id FROM pipelines WHERE archived = false ORDER BY name LIMIT ? OFFSET ?";
        let rows = sqlx::query(query)
            .bind(limit as i64)
            .bind(offset as i64)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to query paginated pipelines: {}", e)))?;

        let mut pipelines = Vec::new();
        for row in rows {
            let id_str: String = row.get("id");
            let pipeline_id = PipelineId::from_string(&id_str)?;

            if let Some(pipeline) = self.load_pipeline_from_db(pipeline_id).await? {
                pipelines.push(pipeline);
            }
        }

        Ok(pipelines)
    }

    /// PUBLIC: Domain interface - Count active pipelines
    pub async fn count(&self) -> Result<usize, PipelineError> {
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM pipelines WHERE archived = false")
            .fetch_one(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to count pipelines: {}", e)))?;

        Ok(count as usize)
    }

    /// PUBLIC: Domain interface - Find pipelines by configuration parameter
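    ///
    /// A minimal sketch (the key/value pair is hypothetical; assumes an
    /// initialized repository):
    ///
    /// ```rust,ignore
    /// // All active pipelines whose configuration sets "compression" to "zstd"
    /// let matches = repository.find_by_config("compression", "zstd").await?;
    /// ```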
    pub async fn find_by_config(&self, key: &str, value: &str) -> Result<Vec<Pipeline>, PipelineError> {
        debug!(
            config_key = key,
            config_value = value,
            "SqlitePipelineRepository::find_by_config called"
        );

        let query = r#"
            SELECT DISTINCT p.id
            FROM pipelines p
            JOIN pipeline_configuration pc ON p.id = pc.pipeline_id
            WHERE pc.key = ? AND pc.value = ? AND p.archived = false AND pc.archived = false
        "#;

        let rows = sqlx::query(query)
            .bind(key)
            .bind(value)
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to find pipelines by config: {}", e)))?;

        let mut pipelines = Vec::new();
        for row in rows {
            let id_str: String = row.get("id");
            let pipeline_id = PipelineId::from_string(&id_str)?;

            if let Some(pipeline) = self.load_pipeline_from_db(pipeline_id).await? {
                pipelines.push(pipeline);
            }
        }

        debug!(
            pipeline_count = pipelines.len(),
            config_key = key,
            config_value = value,
            "Found pipelines with config"
        );
        Ok(pipelines)
    }

    /// PUBLIC: Domain interface - Archive a pipeline (soft delete)
    pub async fn archive(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.delete(id).await
    }

    /// PUBLIC: Domain interface - Restore an archived pipeline
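    ///
    /// A minimal sketch (hypothetical ID; returns `Ok(false)` if no archived
    /// pipeline matches):
    ///
    /// ```rust,ignore
    /// let restored = repository.restore(pipeline_id).await?;
    /// assert!(restored);
    /// ```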
    pub async fn restore(&self, id: PipelineId) -> Result<bool, PipelineError> {
        let query = r#"
            UPDATE pipelines
            SET archived = false, updated_at = ?
            WHERE id = ? AND archived = true
        "#;

        let now = chrono::Utc::now().to_rfc3339();
        let result = sqlx::query(query)
            .bind(now)
            .bind(id.to_string())
            .execute(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to restore pipeline: {}", e)))?;

        Ok(result.rows_affected() > 0)
    }

    // PRIVATE: Internal helper methods
    async fn load_pipeline_from_db(&self, id: PipelineId) -> Result<Option<Pipeline>, PipelineError> {
        self.load_pipeline_from_db_with_archived(id, false).await
    }

    async fn load_pipeline_from_db_with_archived(
        &self,
        id: PipelineId,
        include_archived: bool,
    ) -> Result<Option<Pipeline>, PipelineError> {
        debug!("Loading pipeline from structured DB: {}", id);

        // Load main pipeline record
        let pipeline_query = if include_archived {
            "SELECT id, name, archived, created_at, updated_at FROM pipelines WHERE id = ?"
        } else {
            "SELECT id, name, archived, created_at, updated_at FROM pipelines WHERE id = ? AND archived = false"
        };
        let pipeline_row = sqlx::query(pipeline_query)
            .bind(id.to_string())
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to load pipeline: {}", e)))?;

        let pipeline_row = match pipeline_row {
            Some(row) => row,
            None => {
                return Ok(None);
            }
        };

        // Parse pipeline data
        let name: String = pipeline_row.get("name");
        let archived: bool = pipeline_row.get("archived");
        let created_at_str: String = pipeline_row.get("created_at");
        let updated_at_str: String = pipeline_row.get("updated_at");

        let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
            .map_err(|e| PipelineError::SerializationError(format!("Invalid created_at format: {}", e)))?
            .with_timezone(&chrono::Utc);
        let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_str)
            .map_err(|e| PipelineError::SerializationError(format!("Invalid updated_at format: {}", e)))?
            .with_timezone(&chrono::Utc);

        // Load configuration
        let config_query = "SELECT key, value FROM pipeline_configuration WHERE pipeline_id = ?";
        let config_rows = sqlx::query(config_query)
            .bind(id.to_string())
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to load configuration: {}", e)))?;

        let mut configuration = HashMap::new();
        for row in config_rows {
            let key: String = row.get("key");
            let value: String = row.get("value");
            configuration.insert(key, value);
        }

        // Load stages
        let stage_query = r#"
            SELECT id, name, stage_type, enabled, stage_order, algorithm,
                   parallel_processing, chunk_size, created_at, updated_at
            FROM pipeline_stages
            WHERE pipeline_id = ?
            ORDER BY stage_order
        "#;
        let stage_rows = sqlx::query(stage_query)
            .bind(id.to_string())
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to load stages: {}", e)))?;

        let mut stages = Vec::new();
        for row in stage_rows {
            let stage_id_str: String = row.get("id");
            let stage_name: String = row.get("name");
            let stage_type_str: String = row.get("stage_type");
            let _enabled: bool = row.get("enabled");
            let stage_order: i32 = row.get("stage_order");
            let algorithm: String = row.get("algorithm");
            let parallel_processing: bool = row.get("parallel_processing");
            let chunk_size: Option<i64> = row.get("chunk_size");
            let created_at_str: String = row.get("created_at");
            let updated_at_str: String = row.get("updated_at");

            // Parse stage type
            let stage_type = stage_type_str
                .parse::<StageType>()
                .map_err(|e| PipelineError::SerializationError(format!("Invalid stage type: {}", e)))?;

            // Load stage parameters from stage_parameters table
            let params_query = "SELECT key, value FROM stage_parameters WHERE stage_id = ?";
            let params_rows = sqlx::query(params_query)
                .bind(&stage_id_str)
                .fetch_all(&self.pool)
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to load stage parameters: {}", e)))?;

            let mut parameters = std::collections::HashMap::new();
            for param_row in params_rows {
                let key: String = param_row.get("key");
                let value: String = param_row.get("value");
                parameters.insert(key, value);
            }

            // Build stage configuration
            let stage_config = StageConfiguration {
                algorithm,
                operation: adaptive_pipeline_domain::entities::Operation::Forward, // Default to Forward
                parameters,
                parallel_processing,
                chunk_size: chunk_size.map(|s| s as usize),
            };

            // Parse timestamps (validated here, but currently unused; see note below)
            let _created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
                .map_err(|e| PipelineError::SerializationError(format!("Invalid stage created_at format: {}", e)))?
                .with_timezone(&chrono::Utc);
            let _updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_str)
                .map_err(|e| PipelineError::SerializationError(format!("Invalid stage updated_at format: {}", e)))?
                .with_timezone(&chrono::Utc);

            // Create stage with proper arguments: name, stage_type, configuration, order
            let stage = PipelineStage::new(stage_name, stage_type, stage_config, stage_order as u32)?;

            // Note: the stored `enabled` flag and timestamps cannot be applied here;
            // PipelineStage::new creates fresh timestamps. Setters or a from_database
            // constructor would be needed to restore them faithfully.

            stages.push(stage);
        }

        // Load metrics (simplified for now)
        let metrics = ProcessingMetrics::new(0, 0); // TODO: Implement metrics loading

        // Construct DTO and reconstruct pipeline
        let data = adaptive_pipeline_domain::entities::pipeline::PipelineData {
            id,
            name,
            archived,
            configuration,
            metrics,
            stages,
            created_at,
            updated_at,
        };

        let pipeline = Pipeline::from_database(data)?;

        debug!("Successfully loaded pipeline: {}", pipeline.name());
        Ok(Some(pipeline))
    }
}

// Clean trait implementation that delegates to public methods
#[async_trait::async_trait]
impl adaptive_pipeline_domain::repositories::pipeline_repository::PipelineRepository for SqlitePipelineRepository {
    async fn save(&self, entity: &Pipeline) -> Result<(), PipelineError> {
        self.save(entity).await
    }

    async fn find_by_id(&self, id: PipelineId) -> Result<Option<Pipeline>, PipelineError> {
        self.find_by_id(id).await
    }

    async fn update(&self, pipeline: &Pipeline) -> Result<(), PipelineError> {
        self.update(pipeline).await
    }

    async fn delete(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.delete(id).await
    }

    async fn list_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
        self.list_all().await
    }

    async fn find_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
        self.find_all().await
    }

    async fn list_archived(&self) -> Result<Vec<Pipeline>, PipelineError> {
        self.list_archived().await
    }

    async fn exists(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.exists(id).await
    }

    async fn find_by_name(&self, name: &str) -> Result<Option<Pipeline>, PipelineError> {
        self.find_by_name(name).await
    }

    async fn list_paginated(&self, offset: usize, limit: usize) -> Result<Vec<Pipeline>, PipelineError> {
        self.list_paginated(offset, limit).await
    }

    async fn count(&self) -> Result<usize, PipelineError> {
        self.count().await
    }

    async fn find_by_config(&self, key: &str, value: &str) -> Result<Vec<Pipeline>, PipelineError> {
        self.find_by_config(key, value).await
    }

    async fn archive(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.archive(id).await
    }

    async fn restore(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.restore(id).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Tests database URL formatting for various SQLite connection paths.
    #[test]
    fn test_database_url_formatting() {
        // Unit test: Test the database URL formatting logic
        // This tests the internal logic without requiring actual database connectivity
        // INFRASTRUCTURE CONCERN: Testing repository URL generation

        let test_cases = vec![
            ("/path/to/database.db", "sqlite:///path/to/database.db"),
            ("./local.db", "sqlite://./local.db"),
            (":memory:", "sqlite::memory:"),
            ("/tmp/test.db", "sqlite:///tmp/test.db"),
        ];

        for (input_path, expected_url) in test_cases {
            let formatted_url = if input_path == ":memory:" {
                "sqlite::memory:".to_string()
            } else {
                format!("sqlite://{}", input_path)
            };
            assert_eq!(
                formatted_url, expected_url,
                "Database URL formatting failed for path: {}",
                input_path
            );
        }
    }

    /// Tests repository error handling and PipelineError type mapping.
    #[test]
    fn test_error_handling() {
        // Unit test: Test repository error handling
        // INFRASTRUCTURE CONCERN: Testing error mapping and handling

        let db_error = PipelineError::DatabaseError("Connection failed".to_string());
        match db_error {
            PipelineError::DatabaseError(msg) => {
                assert_eq!(msg, "Connection failed");
            }
            _ => panic!("Expected DatabaseError"),
        }

        // Test using the helper method
        let db_error_helper = PipelineError::database_error("SQL syntax error");
        match db_error_helper {
            PipelineError::DatabaseError(msg) => {
                assert_eq!(msg, "SQL syntax error");
            }
            _ => panic!("Expected DatabaseError from helper"),
        }
    }

    // NOTE: Domain logic tests (Pipeline creation, Stage configuration, etc.)
    // have been moved to their proper domain entity files following DDD
    // principles. Repository tests should focus on infrastructure concerns
    // only.
    //
    // Repository functionality is tested through:
    // 1. Application layer integration tests (respecting DIP)
    // 2. The create_test_database utility (validates schema and operations)
    // 3. These focused infrastructure unit tests
}