1use adaptive_pipeline_domain::entities::pipeline_stage::{StageConfiguration, StageType};
16use adaptive_pipeline_domain::value_objects::PipelineId;
17use adaptive_pipeline_domain::{Pipeline, PipelineError, PipelineStage, ProcessingMetrics};
18use sqlx::{Row, SqlitePool};
19use std::collections::HashMap;
20use tracing::debug;
/// SQLite-backed repository for persisting [`Pipeline`] aggregates.
///
/// All queries run through the wrapped `sqlx` connection pool; writes are
/// performed inside transactions by the individual methods.
pub struct SqlitePipelineRepository {
    // Connection pool for the underlying SQLite database (cheap to clone, shared).
    pool: SqlitePool,
}
64
65impl SqlitePipelineRepository {
66 pub async fn new(database_path: &str) -> Result<Self, PipelineError> {
113 debug!("Creating SqlitePipelineRepository with database: {}", database_path);
114
115 let database_url = if database_path == ":memory:" || database_path == "sqlite::memory:" {
119 "sqlite::memory:".to_string()
120 } else {
121 format!("sqlite://{}", database_path)
122 };
123
124 let pool = crate::infrastructure::repositories::schema::initialize_database(&database_url)
126 .await
127 .map_err(|e| {
128 PipelineError::database_error(format!("Failed to initialize database '{}': {}", database_path, e))
129 })?;
130
131 debug!("Successfully connected to structured SQLite database");
132 Ok(Self { pool })
133 }
134
    /// Persists a new pipeline — root row, configuration, stages, and stage
    /// parameters — inside a single ACID transaction.
    ///
    /// Uses plain `INSERT`s, so saving a pipeline whose id already exists
    /// fails with a database error (use `update` for existing pipelines).
    ///
    /// # Errors
    /// Returns a database error if any insert or the commit fails; the
    /// transaction is rolled back implicitly when `tx` is dropped on error.
    pub async fn save(&self, entity: &Pipeline) -> Result<(), PipelineError> {
        debug!(
            pipeline_name = %entity.name(),
            "SqlitePipelineRepository::save called"
        );

        // One transaction for the whole aggregate: all-or-nothing persistence.
        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to start transaction: {}", e)))?;

        // 1. Root row in `pipelines`.
        let pipeline_query = r#"
            INSERT INTO pipelines (id, name, archived, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
        "#;

        sqlx::query(pipeline_query)
            .bind(entity.id().to_string())
            .bind(entity.name())
            .bind(entity.archived())
            .bind(entity.created_at().to_rfc3339())
            .bind(entity.updated_at().to_rfc3339())
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to insert pipeline: {}", e)))?;

        // 2. Key/value configuration rows; timestamps mirror the pipeline's.
        for (key, value) in entity.configuration() {
            let config_query = r#"
            INSERT INTO pipeline_configuration (pipeline_id, key, value, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?)
            "#;

            sqlx::query(config_query)
                .bind(entity.id().to_string())
                .bind(key)
                .bind(value)
                .bind(entity.created_at().to_rfc3339())
                .bind(entity.updated_at().to_rfc3339())
                .execute(&mut *tx)
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to insert configuration: {}", e)))?;
        }

        // 3. Stages, with `stage_order` taken from the in-memory iteration order.
        for (index, stage) in entity.stages().iter().enumerate() {
            let stage_query = r#"
            INSERT INTO pipeline_stages (
                id, pipeline_id, name, stage_type, enabled, stage_order,
                algorithm, parallel_processing, chunk_size, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#;

            sqlx::query(stage_query)
                .bind(stage.id().to_string())
                .bind(entity.id().to_string())
                .bind(stage.name())
                .bind(stage.stage_type().to_string())
                .bind(stage.is_enabled())
                .bind(index as i32)
                .bind(&stage.configuration().algorithm)
                .bind(stage.configuration().parallel_processing)
                // chunk_size is stored as a nullable i64 column.
                .bind(stage.configuration().chunk_size.map(|s| s as i64))
                .bind(stage.created_at().to_rfc3339())
                .bind(stage.updated_at().to_rfc3339())
                .execute(&mut *tx)
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to insert stage: {}", e)))?;

            // 4. Per-stage parameters, keyed by the stage id inserted above.
            for (param_key, param_value) in &stage.configuration().parameters {
                let param_query = r#"
                INSERT INTO stage_parameters (stage_id, key, value, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
                "#;

                sqlx::query(param_query)
                    .bind(stage.id().to_string())
                    .bind(param_key)
                    .bind(param_value)
                    .bind(stage.created_at().to_rfc3339())
                    .bind(stage.updated_at().to_rfc3339())
                    .execute(&mut *tx)
                    .await
                    .map_err(|e| PipelineError::database_error(format!("Failed to insert stage parameter: {}", e)))?;
            }
        }

        tx.commit()
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to commit transaction: {}", e)))?;

        debug!(
            pipeline_name = %entity.name(),
            "Successfully saved pipeline with ACID transaction"
        );
        Ok(())
    }
307
308 pub async fn find_by_id(&self, id: PipelineId) -> Result<Option<Pipeline>, PipelineError> {
310 self.load_pipeline_from_db(id).await
311 }
312
313 pub async fn update(&self, pipeline: &Pipeline) -> Result<(), PipelineError> {
315 debug!(
317 pipeline_name = %pipeline.name(),
318 "SqlitePipelineRepository::update called"
319 );
320 Ok(())
321 }
322
    /// Soft-deletes (archives) a pipeline together with all of its child rows.
    ///
    /// Child rows (stages, stage parameters, configuration) are archived
    /// first, then the root `pipelines` row. The transaction is committed
    /// only when the root row was actually archived; otherwise (unknown id,
    /// or pipeline already archived) everything is rolled back and
    /// `Ok(false)` is returned.
    ///
    /// # Errors
    /// Returns a database error if any statement, the commit, or the
    /// rollback fails.
    pub async fn delete(&self, id: PipelineId) -> Result<bool, PipelineError> {
        debug!(pipeline_id = %id, "Starting delete for pipeline");

        let mut tx = self
            .pool
            .begin()
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to begin transaction: {}", e)))?;

        // One timestamp so every archived row carries the same updated_at.
        let now = chrono::Utc::now().to_rfc3339();
        let id_str = id.to_string();

        debug!("Archiving pipeline stages...");
        let stages_query = r#"
            UPDATE pipeline_stages
            SET archived = true, updated_at = ?
            WHERE pipeline_id = ? AND archived = false
        "#;

        let stages_result = sqlx::query(stages_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive pipeline stages: {}", e)))?;

        debug!(
            stages_archived = stages_result.rows_affected(),
            "Archived pipeline stages"
        );

        debug!("Archiving stage parameters...");
        // Parameters are reached through their parent stages. The subquery has
        // no archived filter, so parameters of stages archived earlier are
        // still matched.
        let params_query = r#"
            UPDATE stage_parameters
            SET archived = true, updated_at = ?
            WHERE stage_id IN (
                SELECT id FROM pipeline_stages
                WHERE pipeline_id = ?
            ) AND archived = false
        "#;

        let params_result = sqlx::query(params_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive stage parameters: {}", e)))?;

        debug!(
            parameters_archived = params_result.rows_affected(),
            "Archived stage parameters"
        );

        debug!("Archiving pipeline configuration...");
        let config_query = r#"
            UPDATE pipeline_configuration
            SET archived = true, updated_at = ?
            WHERE pipeline_id = ? AND archived = false
        "#;

        let config_result = sqlx::query(config_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive pipeline configuration: {}", e)))?;

        debug!(
            config_entries_archived = config_result.rows_affected(),
            "Archived config entries"
        );

        debug!("Archiving main pipeline...");
        // Root row last; its rows_affected decides commit vs rollback below.
        let pipeline_query = r#"
            UPDATE pipelines
            SET archived = true, updated_at = ?
            WHERE id = ? AND archived = false
        "#;

        let result = sqlx::query(pipeline_query)
            .bind(&now)
            .bind(&id_str)
            .execute(&mut *tx)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to archive pipeline: {}", e)))?;

        let success = result.rows_affected() > 0;
        debug!(
            success = success,
            rows_affected = result.rows_affected(),
            "Pipeline archive result"
        );

        if success {
            tx.commit()
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to commit archive transaction: {}", e)))?;
            debug!("Transaction committed successfully");
        } else {
            // Nothing was archived at the root, so undo the child updates too.
            tx.rollback()
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to rollback archive transaction: {}", e)))?;
            debug!("Transaction rolled back");
        }

        Ok(success)
    }
435
436 pub async fn list_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
438 debug!("SqlitePipelineRepository::list_all called (excluding archived)");
439
440 let query = "SELECT id FROM pipelines WHERE archived = false ORDER BY name";
442 let rows = sqlx::query(query)
443 .fetch_all(&self.pool)
444 .await
445 .map_err(|e| PipelineError::database_error(format!("Failed to query pipelines: {}", e)))?;
446
447 let mut pipelines = Vec::new();
448 for row in rows {
449 let id_str: String = row.get("id");
450 let pipeline_id = PipelineId::from_string(&id_str)?;
451
452 if let Some(pipeline) = self.load_pipeline_from_db(pipeline_id).await? {
453 pipelines.push(pipeline);
454 }
455 }
456
457 debug!(pipeline_count = pipelines.len(), "Found active pipelines");
458 Ok(pipelines)
459 }
460
461 pub async fn find_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
464 self.list_all().await
465 }
466
467 pub async fn list_archived(&self) -> Result<Vec<Pipeline>, PipelineError> {
469 debug!("SqlitePipelineRepository::list_archived called");
470
471 let query = "SELECT id FROM pipelines WHERE archived = true ORDER BY name";
473 let rows = sqlx::query(query)
474 .fetch_all(&self.pool)
475 .await
476 .map_err(|e| PipelineError::database_error(format!("Failed to query pipelines: {}", e)))?;
477
478 let mut pipelines = Vec::new();
479 for row in rows {
480 let id_str: String = row.get("id");
481 let pipeline_id = PipelineId::from_string(&id_str)?;
482
483 if let Some(pipeline) = self.load_pipeline_from_db_with_archived(pipeline_id, true).await? {
484 pipelines.push(pipeline);
485 }
486 }
487
488 debug!(pipeline_count = pipelines.len(), "Found archived pipelines");
489 Ok(pipelines)
490 }
491
492 pub async fn exists(&self, id: PipelineId) -> Result<bool, PipelineError> {
494 let query = "SELECT 1 FROM pipelines WHERE id = ? AND archived = false";
495 let result = sqlx::query(query)
496 .bind(id.to_string())
497 .fetch_optional(&self.pool)
498 .await
499 .map_err(|e| PipelineError::database_error(format!("Failed to check pipeline existence: {}", e)))?;
500
501 Ok(result.is_some())
502 }
503
504 pub async fn find_by_name(&self, name: &str) -> Result<Option<Pipeline>, PipelineError> {
506 debug!("SqlitePipelineRepository::find_by_name called for: {}", name);
507
508 let query = "SELECT id FROM pipelines WHERE name = ? AND archived = false";
509 let row = sqlx::query(query)
510 .bind(name)
511 .fetch_optional(&self.pool)
512 .await
513 .map_err(|e| PipelineError::database_error(format!("Failed to find pipeline by name: {}", e)))?;
514
515 if let Some(row) = row {
516 let id_str: String = row.get("id");
517 let pipeline_id = PipelineId::from_string(&id_str)?;
518 self.load_pipeline_from_db(pipeline_id).await
519 } else {
520 debug!(pipeline_name = name, "No pipeline found with name");
521 Ok(None)
522 }
523 }
524
525 pub async fn list_paginated(&self, offset: usize, limit: usize) -> Result<Vec<Pipeline>, PipelineError> {
527 let query = "SELECT id FROM pipelines WHERE archived = false ORDER BY name LIMIT ? OFFSET ?";
528 let rows = sqlx::query(query)
529 .bind(limit as i64)
530 .bind(offset as i64)
531 .fetch_all(&self.pool)
532 .await
533 .map_err(|e| PipelineError::database_error(format!("Failed to query paginated pipelines: {}", e)))?;
534
535 let mut pipelines = Vec::new();
536 for row in rows {
537 let id_str: String = row.get("id");
538 let pipeline_id = PipelineId::from_string(&id_str)?;
539
540 if let Some(pipeline) = self.load_pipeline_from_db(pipeline_id).await? {
541 pipelines.push(pipeline);
542 }
543 }
544
545 Ok(pipelines)
546 }
547
548 pub async fn count(&self) -> Result<usize, PipelineError> {
550 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM pipelines WHERE archived = false")
551 .fetch_one(&self.pool)
552 .await
553 .map_err(|e| PipelineError::database_error(format!("Failed to count pipelines: {}", e)))?;
554
555 Ok(count as usize)
556 }
557
558 pub async fn find_by_config(&self, key: &str, value: &str) -> Result<Vec<Pipeline>, PipelineError> {
560 debug!(
561 config_key = key,
562 config_value = value,
563 "SqlitePipelineRepository::find_by_config called"
564 );
565
566 let query = r#"
567 SELECT DISTINCT p.id
568 FROM pipelines p
569 JOIN pipeline_configuration pc ON p.id = pc.pipeline_id
570 WHERE pc.key = ? AND pc.value = ? AND p.archived = false AND pc.archived = false
571 "#;
572
573 let rows = sqlx::query(query)
574 .bind(key)
575 .bind(value)
576 .fetch_all(&self.pool)
577 .await
578 .map_err(|e| PipelineError::database_error(format!("Failed to find pipelines by config: {}", e)))?;
579
580 let mut pipelines = Vec::new();
581 for row in rows {
582 let id_str: String = row.get("id");
583 let pipeline_id = PipelineId::from_string(&id_str)?;
584
585 if let Some(pipeline) = self.load_pipeline_from_db(pipeline_id).await? {
586 pipelines.push(pipeline);
587 }
588 }
589
590 debug!(
591 pipeline_count = pipelines.len(),
592 config_key = key,
593 config_value = value,
594 "Found pipelines with config"
595 );
596 Ok(pipelines)
597 }
598
599 pub async fn archive(&self, id: PipelineId) -> Result<bool, PipelineError> {
601 self.delete(id).await
602 }
603
604 pub async fn restore(&self, id: PipelineId) -> Result<bool, PipelineError> {
606 let query = r#"
607 UPDATE pipelines
608 SET archived = false, updated_at = ?
609 WHERE id = ? AND archived = true
610 "#;
611
612 let now = chrono::Utc::now().to_rfc3339();
613 let result = sqlx::query(query)
614 .bind(now)
615 .bind(id.to_string())
616 .execute(&self.pool)
617 .await
618 .map_err(|e| PipelineError::database_error(format!("Failed to restore pipeline: {}", e)))?;
619
620 Ok(result.rows_affected() > 0)
621 }
622
623 async fn load_pipeline_from_db(&self, id: PipelineId) -> Result<Option<Pipeline>, PipelineError> {
625 self.load_pipeline_from_db_with_archived(id, false).await
626 }
627
    /// Reconstructs a full [`Pipeline`] aggregate from the database.
    ///
    /// The root query filters out archived pipelines unless
    /// `include_archived` is true. Child rows (configuration, stages,
    /// parameters) are loaded without an archived filter in either mode.
    ///
    /// Returns `Ok(None)` when no matching pipeline row exists.
    ///
    /// # Errors
    /// Returns a database error on query failure, or a serialization error
    /// when a stored timestamp or stage type fails to parse.
    async fn load_pipeline_from_db_with_archived(
        &self,
        id: PipelineId,
        include_archived: bool,
    ) -> Result<Option<Pipeline>, PipelineError> {
        debug!("Loading pipeline from structured DB: {}", id);

        // Root row; the archived filter depends on the caller's flag.
        let pipeline_query = if include_archived {
            "SELECT id, name, archived, created_at, updated_at FROM pipelines WHERE id = ?"
        } else {
            "SELECT id, name, archived, created_at, updated_at FROM pipelines WHERE id = ? AND archived = false"
        };
        let pipeline_row = sqlx::query(pipeline_query)
            .bind(id.to_string())
            .fetch_optional(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to load pipeline: {}", e)))?;

        let pipeline_row = match pipeline_row {
            Some(row) => row,
            None => {
                return Ok(None);
            }
        };

        let name: String = pipeline_row.get("name");
        let archived: bool = pipeline_row.get("archived");
        let created_at_str: String = pipeline_row.get("created_at");
        let updated_at_str: String = pipeline_row.get("updated_at");

        // Timestamps are stored as RFC 3339 text; malformed values are errors.
        let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
            .map_err(|e| PipelineError::SerializationError(format!("Invalid created_at format: {}", e)))?
            .with_timezone(&chrono::Utc);
        let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_str)
            .map_err(|e| PipelineError::SerializationError(format!("Invalid updated_at format: {}", e)))?
            .with_timezone(&chrono::Utc);

        // Key/value configuration. NOTE(review): no archived filter here, so
        // archived config rows are loaded too — confirm this is intended.
        let config_query = "SELECT key, value FROM pipeline_configuration WHERE pipeline_id = ?";
        let config_rows = sqlx::query(config_query)
            .bind(id.to_string())
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to load configuration: {}", e)))?;

        let mut configuration = HashMap::new();
        for row in config_rows {
            let key: String = row.get("key");
            let value: String = row.get("value");
            configuration.insert(key, value);
        }

        // Stages in execution order. Each stage triggers one extra query for
        // its parameters (N+1 pattern; acceptable for small stage counts).
        let stage_query = r#"
            SELECT id, name, stage_type, enabled, stage_order, algorithm,
                   parallel_processing, chunk_size, created_at, updated_at
            FROM pipeline_stages
            WHERE pipeline_id = ?
            ORDER BY stage_order
        "#;
        let stage_rows = sqlx::query(stage_query)
            .bind(id.to_string())
            .fetch_all(&self.pool)
            .await
            .map_err(|e| PipelineError::database_error(format!("Failed to load stages: {}", e)))?;

        let mut stages = Vec::new();
        for row in stage_rows {
            let stage_id_str: String = row.get("id");
            let stage_name: String = row.get("name");
            let stage_type_str: String = row.get("stage_type");
            // NOTE(review): the stored `enabled` flag is read but discarded;
            // the reconstructed stage's enabled state comes solely from
            // `PipelineStage::new` — confirm whether persistence should win.
            let _enabled: bool = row.get("enabled");
            let stage_order: i32 = row.get("stage_order");
            let algorithm: String = row.get("algorithm");
            let parallel_processing: bool = row.get("parallel_processing");
            let chunk_size: Option<i64> = row.get("chunk_size");
            let created_at_str: String = row.get("created_at");
            let updated_at_str: String = row.get("updated_at");

            let stage_type = stage_type_str
                .parse::<StageType>()
                .map_err(|e| PipelineError::SerializationError(format!("Invalid stage type: {}", e)))?;

            let params_query = "SELECT key, value FROM stage_parameters WHERE stage_id = ?";
            let params_rows = sqlx::query(params_query)
                .bind(&stage_id_str)
                .fetch_all(&self.pool)
                .await
                .map_err(|e| PipelineError::database_error(format!("Failed to load stage parameters: {}", e)))?;

            let mut parameters = std::collections::HashMap::new();
            for param_row in params_rows {
                let key: String = param_row.get("key");
                let value: String = param_row.get("value");
                parameters.insert(key, value);
            }

            // The operation is not persisted; loaded stages are always
            // reconstructed with `Operation::Forward`.
            let stage_config = StageConfiguration {
                algorithm,
                operation: adaptive_pipeline_domain::entities::Operation::Forward,
                parameters,
                parallel_processing,
                chunk_size: chunk_size.map(|s| s as usize),
            };

            // Parsed only to validate the stored format — the values are not
            // applied to the reconstructed stage (it gets fresh timestamps).
            let _created_at = chrono::DateTime::parse_from_rfc3339(&created_at_str)
                .map_err(|e| PipelineError::SerializationError(format!("Invalid stage created_at format: {}", e)))?
                .with_timezone(&chrono::Utc);
            let _updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_str)
                .map_err(|e| PipelineError::SerializationError(format!("Invalid stage updated_at format: {}", e)))?
                .with_timezone(&chrono::Utc);

            let stage = PipelineStage::new(stage_name, stage_type, stage_config, stage_order as u32)?;

            stages.push(stage);
        }

        // Metrics are not persisted; a loaded pipeline starts with fresh metrics.
        let metrics = ProcessingMetrics::new(0, 0);
        let data = adaptive_pipeline_domain::entities::pipeline::PipelineData {
            id,
            name,
            archived,
            configuration,
            metrics,
            stages,
            created_at,
            updated_at,
        };

        let pipeline = Pipeline::from_database(data)?;

        debug!("Successfully loaded pipeline: {}", pipeline.name());
        Ok(Some(pipeline))
    }
776}
777
/// Trait implementation that forwards every repository operation to the
/// inherent methods above. Kept as thin one-line delegations so the inherent
/// methods stay directly callable without importing the trait.
#[async_trait::async_trait]
impl adaptive_pipeline_domain::repositories::pipeline_repository::PipelineRepository for SqlitePipelineRepository {
    async fn save(&self, entity: &Pipeline) -> Result<(), PipelineError> {
        self.save(entity).await
    }

    async fn find_by_id(&self, id: PipelineId) -> Result<Option<Pipeline>, PipelineError> {
        self.find_by_id(id).await
    }

    async fn update(&self, pipeline: &Pipeline) -> Result<(), PipelineError> {
        self.update(pipeline).await
    }

    async fn delete(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.delete(id).await
    }

    async fn list_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
        self.list_all().await
    }

    async fn find_all(&self) -> Result<Vec<Pipeline>, PipelineError> {
        self.find_all().await
    }

    async fn list_archived(&self) -> Result<Vec<Pipeline>, PipelineError> {
        self.list_archived().await
    }

    async fn exists(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.exists(id).await
    }

    async fn find_by_name(&self, name: &str) -> Result<Option<Pipeline>, PipelineError> {
        self.find_by_name(name).await
    }

    async fn list_paginated(&self, offset: usize, limit: usize) -> Result<Vec<Pipeline>, PipelineError> {
        self.list_paginated(offset, limit).await
    }

    async fn count(&self) -> Result<usize, PipelineError> {
        self.count().await
    }

    async fn find_by_config(&self, key: &str, value: &str) -> Result<Vec<Pipeline>, PipelineError> {
        self.find_by_config(key, value).await
    }

    async fn archive(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.archive(id).await
    }

    async fn restore(&self, id: PipelineId) -> Result<bool, PipelineError> {
        self.restore(id).await
    }
}
837
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies the database-URL formatting rule used by `new()`.
    ///
    /// The previous version of this test checked only `":memory:"` as the
    /// in-memory spelling, while `new()` also accepts `"sqlite::memory:"` —
    /// the predicate here now mirrors `new()` exactly and covers both cases.
    #[test]
    fn test_database_url_formatting() {
        let test_cases = vec![
            ("/path/to/database.db", "sqlite:///path/to/database.db"),
            ("./local.db", "sqlite://./local.db"),
            (":memory:", "sqlite::memory:"),
            // Already-canonical in-memory spelling must pass through unchanged.
            ("sqlite::memory:", "sqlite::memory:"),
            ("/tmp/test.db", "sqlite:///tmp/test.db"),
        ];

        for (input_path, expected_url) in test_cases {
            // Same predicate as `SqlitePipelineRepository::new`.
            let formatted_url = if input_path == ":memory:" || input_path == "sqlite::memory:" {
                "sqlite::memory:".to_string()
            } else {
                format!("sqlite://{}", input_path)
            };
            assert_eq!(
                formatted_url, expected_url,
                "Database URL formatting failed for path: {}",
                input_path
            );
        }
    }

    /// Exercises `PipelineError::DatabaseError` construction both directly
    /// and through the `database_error` helper.
    #[test]
    fn test_error_handling() {
        let db_error = PipelineError::DatabaseError("Connection failed".to_string());
        match db_error {
            PipelineError::DatabaseError(msg) => {
                assert_eq!(msg, "Connection failed");
            }
            _ => panic!("Expected DatabaseError"),
        }

        let db_error_helper = PipelineError::database_error("SQL syntax error");
        match db_error_helper {
            PipelineError::DatabaseError(msg) => {
                assert_eq!(msg, "SQL syntax error");
            }
            _ => panic!("Expected DatabaseError from helper"),
        }
    }
}