pub struct SqlitePipelineRepository { /* private fields */ }Expand description
Structured SQLite pipeline repository using proper database columns
This implementation provides a concrete SQLite-based implementation of the pipeline repository interface, following Domain-Driven Design principles and proper relational database design patterns.
§Key Features
- Relational Design: Proper normalized database schema with separate tables
- Type Safety: Strong typing with compile-time query validation using sqlx
- Transaction Support: ACID transactions for data consistency
- Performance Optimization: Efficient queries with proper indexing
- Error Handling: Comprehensive error handling and recovery
§Database Schema
The repository uses a normalized relational schema:
- pipelines: Main pipeline entity data
- pipeline_stages: Pipeline stage configurations
- pipeline_metrics: Performance and execution metrics
§Architecture
This implementation avoids JSON serialization issues by using proper relational database design with separate tables for pipeline data, configuration, stages, and metrics.
§Examples
§Visibility
- Public: For dependency injection and external usage
- Private Fields: Database connection pool is encapsulated
Implementations§
Source§impl SqlitePipelineRepository
impl SqlitePipelineRepository
Sourcepub async fn new(database_path: &str) -> Result<Self, PipelineError>
pub async fn new(database_path: &str) -> Result<Self, PipelineError>
Creates a new structured pipeline repository with database connection
This constructor establishes a connection pool to the SQLite database, which will be used for all subsequent repository operations. The connection pool provides efficient resource management and supports concurrent access.
§Why Connection Pooling?
Connection pooling is used because:
- Performance: Reusing connections is faster than creating new ones
- Resource Management: Limits the number of open database connections
- Concurrency: Allows multiple operations to share connections safely
- Reliability: Automatically handles connection failures and retries
§Arguments
database_path- Path to the SQLite database file, or special values::memory:orsqlite::memory:for in-memory database (useful for testing)- Any file path like
"data/pipelines.db"for persistent storage
§Returns
Ok(SqlitePipelineRepository)- Successfully connected repositoryErr(PipelineError)- Connection failed with error details
§Errors
This function returns an error if:
- The database file cannot be opened or created
- File permissions prevent access
- The database format is incompatible
- The connection URL is malformed
§Examples
§Implementation Notes
The function normalizes the database path to sqlx’s expected format:
:memory:→sqlite::memory:- File paths →
sqlite://<path>
Sourcepub async fn save(&self, entity: &Pipeline) -> Result<(), PipelineError>
pub async fn save(&self, entity: &Pipeline) -> Result<(), PipelineError>
Saves a pipeline to the database with ACID transaction guarantees
This method persists a complete pipeline entity to the database, including all associated data: configuration parameters, stages, and stage parameters. The entire operation is wrapped in a database transaction to ensure atomicity - either all data is saved successfully, or none of it is.
§Why ACID Transactions?
ACID (Atomicity, Consistency, Isolation, Durability) transactions ensure:
- Atomicity: All-or-nothing - if any part fails, everything rolls back
- Consistency: Database constraints are always maintained
- Isolation: Concurrent operations don’t interfere with each other
- Durability: Once committed, data survives system crashes
§What Gets Saved?
The method saves to multiple related tables:
- pipelines: Main pipeline record (id, name, archived status, timestamps)
- pipeline_configuration: Key-value configuration parameters
- pipeline_stages: Processing stages with their configurations
- stage_parameters: Parameters for each stage
§Arguments
entity- The pipeline entity to save. Must be a complete, valid pipeline with all required fields populated.
§Returns
Ok(())- Pipeline saved successfullyErr(PipelineError)- Save operation failed, transaction rolled back
§Errors
This function returns an error if:
- A pipeline with the same ID already exists (unique constraint violation)
- Database connection is lost during the operation
- Any SQL query fails (syntax error, constraint violation, etc.)
- Transaction cannot be started or committed
Note: If an error occurs, the transaction is automatically rolled back, leaving the database in its original state.
§Examples
§Thread Safety
This method is safe to call concurrently from multiple tasks. The database connection pool handles concurrent access, and transaction isolation prevents interference between concurrent saves.
§Performance
- Complexity: O(n + m) where n = number of config entries, m = number of stages
- Database Writes: Multiple INSERT statements within one transaction
- Network: Single round-trip for transaction commit
- Locking: Row-level locks acquired during transaction
Sourcepub async fn find_by_id(
&self,
id: PipelineId,
) -> Result<Option<Pipeline>, PipelineError>
pub async fn find_by_id( &self, id: PipelineId, ) -> Result<Option<Pipeline>, PipelineError>
PUBLIC: Domain interface - Find pipeline by ID
Sourcepub async fn update(&self, pipeline: &Pipeline) -> Result<(), PipelineError>
pub async fn update(&self, pipeline: &Pipeline) -> Result<(), PipelineError>
PUBLIC: Domain interface - Update a pipeline
Sourcepub async fn delete(&self, id: PipelineId) -> Result<bool, PipelineError>
pub async fn delete(&self, id: PipelineId) -> Result<bool, PipelineError>
PUBLIC: Domain interface - Soft delete a pipeline with cascading archive
Sourcepub async fn list_all(&self) -> Result<Vec<Pipeline>, PipelineError>
pub async fn list_all(&self) -> Result<Vec<Pipeline>, PipelineError>
PUBLIC: Domain interface - List all active pipelines
Sourcepub async fn find_all(&self) -> Result<Vec<Pipeline>, PipelineError>
pub async fn find_all(&self) -> Result<Vec<Pipeline>, PipelineError>
PUBLIC: Domain interface - Find all active pipelines (alias for list_all)
Sourcepub async fn list_archived(&self) -> Result<Vec<Pipeline>, PipelineError>
pub async fn list_archived(&self) -> Result<Vec<Pipeline>, PipelineError>
PUBLIC: Domain interface - List archived pipelines
Sourcepub async fn exists(&self, id: PipelineId) -> Result<bool, PipelineError>
pub async fn exists(&self, id: PipelineId) -> Result<bool, PipelineError>
PUBLIC: Domain interface - Check if pipeline exists
Sourcepub async fn find_by_name(
&self,
name: &str,
) -> Result<Option<Pipeline>, PipelineError>
pub async fn find_by_name( &self, name: &str, ) -> Result<Option<Pipeline>, PipelineError>
PUBLIC: Domain interface - Find pipeline by name
Sourcepub async fn list_paginated(
&self,
offset: usize,
limit: usize,
) -> Result<Vec<Pipeline>, PipelineError>
pub async fn list_paginated( &self, offset: usize, limit: usize, ) -> Result<Vec<Pipeline>, PipelineError>
PUBLIC: Domain interface - List pipelines with pagination
Sourcepub async fn count(&self) -> Result<usize, PipelineError>
pub async fn count(&self) -> Result<usize, PipelineError>
PUBLIC: Domain interface - Count active pipelines
Sourcepub async fn find_by_config(
&self,
key: &str,
value: &str,
) -> Result<Vec<Pipeline>, PipelineError>
pub async fn find_by_config( &self, key: &str, value: &str, ) -> Result<Vec<Pipeline>, PipelineError>
PUBLIC: Domain interface - Find pipelines by configuration parameter
Sourcepub async fn archive(&self, id: PipelineId) -> Result<bool, PipelineError>
pub async fn archive(&self, id: PipelineId) -> Result<bool, PipelineError>
PUBLIC: Domain interface - Archive a pipeline (soft delete)
Sourcepub async fn restore(&self, id: PipelineId) -> Result<bool, PipelineError>
pub async fn restore(&self, id: PipelineId) -> Result<bool, PipelineError>
PUBLIC: Domain interface - Restore an archived pipeline
Trait Implementations§
Source§impl PipelineRepository for SqlitePipelineRepository
impl PipelineRepository for SqlitePipelineRepository
Source§fn save<'life0, 'life1, 'async_trait>(
&'life0 self,
entity: &'life1 Pipeline,
) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn save<'life0, 'life1, 'async_trait>(
&'life0 self,
entity: &'life1 Pipeline,
) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn find_by_id<'life0, 'async_trait>(
&'life0 self,
id: PipelineId,
) -> Pin<Box<dyn Future<Output = Result<Option<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn find_by_id<'life0, 'async_trait>(
&'life0 self,
id: PipelineId,
) -> Pin<Box<dyn Future<Output = Result<Option<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn update<'life0, 'life1, 'async_trait>(
&'life0 self,
pipeline: &'life1 Pipeline,
) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn update<'life0, 'life1, 'async_trait>(
&'life0 self,
pipeline: &'life1 Pipeline,
) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn delete<'life0, 'async_trait>(
&'life0 self,
id: PipelineId,
) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn delete<'life0, 'async_trait>(
&'life0 self,
id: PipelineId,
) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn list_all<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_all<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn find_all<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn find_all<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn list_archived<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_archived<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn exists<'life0, 'async_trait>(
&'life0 self,
id: PipelineId,
) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn exists<'life0, 'async_trait>(
&'life0 self,
id: PipelineId,
) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn find_by_name<'life0, 'life1, 'async_trait>(
&'life0 self,
name: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn find_by_name<'life0, 'life1, 'async_trait>(
&'life0 self,
name: &'life1 str,
) -> Pin<Box<dyn Future<Output = Result<Option<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn list_paginated<'life0, 'async_trait>(
&'life0 self,
offset: usize,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn list_paginated<'life0, 'async_trait>(
&'life0 self,
offset: usize,
limit: usize,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn count<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<usize, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn count<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<usize, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn find_by_config<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
key: &'life1 str,
value: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
fn find_by_config<'life0, 'life1, 'life2, 'async_trait>(
&'life0 self,
key: &'life1 str,
value: &'life2 str,
) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
'life2: 'async_trait,
Auto Trait Implementations§
impl Freeze for SqlitePipelineRepository
impl !RefUnwindSafe for SqlitePipelineRepository
impl Send for SqlitePipelineRepository
impl Sync for SqlitePipelineRepository
impl Unpin for SqlitePipelineRepository
impl !UnwindSafe for SqlitePipelineRepository
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read more