SqlitePipelineRepository

Struct SqlitePipelineRepository 

Source
pub struct SqlitePipelineRepository { /* private fields */ }
Expand description

Structured SQLite pipeline repository using proper database columns

This implementation provides a concrete SQLite-based implementation of the pipeline repository interface, following Domain-Driven Design principles and proper relational database design patterns.

§Key Features

  • Relational Design: Proper normalized database schema with separate tables
  • Type Safety: Strong typing with compile-time query validation using sqlx
  • Transaction Support: ACID transactions for data consistency
  • Performance Optimization: Efficient queries with proper indexing
  • Error Handling: Comprehensive error handling and recovery

§Database Schema

The repository uses a normalized relational schema:

  • pipelines: Main pipeline entity data
  • pipeline_stages: Pipeline stage configurations
  • pipeline_metrics: Performance and execution metrics

§Architecture

This implementation avoids JSON serialization issues by using proper relational database design with separate tables for pipeline data, configuration, stages, and metrics.

§Examples

§Visibility

  • Public: For dependency injection and external usage
  • Private Fields: Database connection pool is encapsulated

Implementations§

Source§

impl SqlitePipelineRepository

Source

pub async fn new(database_path: &str) -> Result<Self, PipelineError>

Creates a new structured pipeline repository with database connection

This constructor establishes a connection pool to the SQLite database, which will be used for all subsequent repository operations. The connection pool provides efficient resource management and supports concurrent access.

§Why Connection Pooling?

Connection pooling is used because:

  1. Performance: Reusing connections is faster than creating new ones
  2. Resource Management: Limits the number of open database connections
  3. Concurrency: Allows multiple operations to share connections safely
  4. Reliability: Automatically handles connection failures and retries
§Arguments
  • database_path - Path to the SQLite database file, or special values:
    • :memory: or sqlite::memory: for in-memory database (useful for testing)
    • Any file path like "data/pipelines.db" for persistent storage
§Returns
  • Ok(SqlitePipelineRepository) - Successfully connected repository
  • Err(PipelineError) - Connection failed with error details
§Errors

This function returns an error if:

  • The database file cannot be opened or created
  • File permissions prevent access
  • The database format is incompatible
  • The connection URL is malformed
§Examples
§Implementation Notes

The function normalizes the database path to sqlx’s expected format:

  • :memory:sqlite::memory:
  • File paths → sqlite://<path>
Source

pub async fn save(&self, entity: &Pipeline) -> Result<(), PipelineError>

Saves a pipeline to the database with ACID transaction guarantees

This method persists a complete pipeline entity to the database, including all associated data: configuration parameters, stages, and stage parameters. The entire operation is wrapped in a database transaction to ensure atomicity - either all data is saved successfully, or none of it is.

§Why ACID Transactions?

ACID (Atomicity, Consistency, Isolation, Durability) transactions ensure:

  1. Atomicity: All-or-nothing - if any part fails, everything rolls back
  2. Consistency: Database constraints are always maintained
  3. Isolation: Concurrent operations don’t interfere with each other
  4. Durability: Once committed, data survives system crashes
§What Gets Saved?

The method saves to multiple related tables:

  • pipelines: Main pipeline record (id, name, archived status, timestamps)
  • pipeline_configuration: Key-value configuration parameters
  • pipeline_stages: Processing stages with their configurations
  • stage_parameters: Parameters for each stage
§Arguments
  • entity - The pipeline entity to save. Must be a complete, valid pipeline with all required fields populated.
§Returns
  • Ok(()) - Pipeline saved successfully
  • Err(PipelineError) - Save operation failed, transaction rolled back
§Errors

This function returns an error if:

  • A pipeline with the same ID already exists (unique constraint violation)
  • Database connection is lost during the operation
  • Any SQL query fails (syntax error, constraint violation, etc.)
  • Transaction cannot be started or committed

Note: If an error occurs, the transaction is automatically rolled back, leaving the database in its original state.

§Examples
§Thread Safety

This method is safe to call concurrently from multiple tasks. The database connection pool handles concurrent access, and transaction isolation prevents interference between concurrent saves.

§Performance
  • Complexity: O(n + m) where n = number of config entries, m = number of stages
  • Database Writes: Multiple INSERT statements within one transaction
  • Network: Single round-trip for transaction commit
  • Locking: Row-level locks acquired during transaction
Source

pub async fn find_by_id( &self, id: PipelineId, ) -> Result<Option<Pipeline>, PipelineError>

PUBLIC: Domain interface - Find pipeline by ID

Source

pub async fn update(&self, pipeline: &Pipeline) -> Result<(), PipelineError>

PUBLIC: Domain interface - Update a pipeline

Source

pub async fn delete(&self, id: PipelineId) -> Result<bool, PipelineError>

PUBLIC: Domain interface - Soft delete a pipeline with cascading archive

Source

pub async fn list_all(&self) -> Result<Vec<Pipeline>, PipelineError>

PUBLIC: Domain interface - List all active pipelines

Source

pub async fn find_all(&self) -> Result<Vec<Pipeline>, PipelineError>

PUBLIC: Domain interface - Find all active pipelines (alias for list_all)

Source

pub async fn list_archived(&self) -> Result<Vec<Pipeline>, PipelineError>

PUBLIC: Domain interface - List archived pipelines

Source

pub async fn exists(&self, id: PipelineId) -> Result<bool, PipelineError>

PUBLIC: Domain interface - Check if pipeline exists

Source

pub async fn find_by_name( &self, name: &str, ) -> Result<Option<Pipeline>, PipelineError>

PUBLIC: Domain interface - Find pipeline by name

Source

pub async fn list_paginated( &self, offset: usize, limit: usize, ) -> Result<Vec<Pipeline>, PipelineError>

PUBLIC: Domain interface - List pipelines with pagination

Source

pub async fn count(&self) -> Result<usize, PipelineError>

PUBLIC: Domain interface - Count active pipelines

Source

pub async fn find_by_config( &self, key: &str, value: &str, ) -> Result<Vec<Pipeline>, PipelineError>

PUBLIC: Domain interface - Find pipelines by configuration parameter

Source

pub async fn archive(&self, id: PipelineId) -> Result<bool, PipelineError>

PUBLIC: Domain interface - Archive a pipeline (soft delete)

Source

pub async fn restore(&self, id: PipelineId) -> Result<bool, PipelineError>

PUBLIC: Domain interface - Restore an archived pipeline

Trait Implementations§

Source§

impl PipelineRepository for SqlitePipelineRepository

Source§

fn save<'life0, 'life1, 'async_trait>( &'life0 self, entity: &'life1 Pipeline, ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Saves a pipeline
Source§

fn find_by_id<'life0, 'async_trait>( &'life0 self, id: PipelineId, ) -> Pin<Box<dyn Future<Output = Result<Option<Pipeline>, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Finds a pipeline by ID
Source§

fn update<'life0, 'life1, 'async_trait>( &'life0 self, pipeline: &'life1 Pipeline, ) -> Pin<Box<dyn Future<Output = Result<(), PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Updates a pipeline
Source§

fn delete<'life0, 'async_trait>( &'life0 self, id: PipelineId, ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Deletes a pipeline by ID
Source§

fn list_all<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Lists all pipelines
Source§

fn find_all<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Finds all pipelines (alias for list_all)
Source§

fn list_archived<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Lists archived pipelines
Source§

fn exists<'life0, 'async_trait>( &'life0 self, id: PipelineId, ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Checks if a pipeline exists
Source§

fn find_by_name<'life0, 'life1, 'async_trait>( &'life0 self, name: &'life1 str, ) -> Pin<Box<dyn Future<Output = Result<Option<Pipeline>, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait,

Finds a pipeline by name
Source§

fn list_paginated<'life0, 'async_trait>( &'life0 self, offset: usize, limit: usize, ) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Lists pipelines with pagination
Source§

fn count<'life0, 'async_trait>( &'life0 self, ) -> Pin<Box<dyn Future<Output = Result<usize, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Counts total pipelines
Source§

fn find_by_config<'life0, 'life1, 'life2, 'async_trait>( &'life0 self, key: &'life1 str, value: &'life2 str, ) -> Pin<Box<dyn Future<Output = Result<Vec<Pipeline>, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait,

Finds pipelines by configuration parameter
Source§

fn archive<'life0, 'async_trait>( &'life0 self, id: PipelineId, ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Archives a pipeline (soft delete)
Source§

fn restore<'life0, 'async_trait>( &'life0 self, id: PipelineId, ) -> Pin<Box<dyn Future<Output = Result<bool, PipelineError>> + Send + 'async_trait>>
where Self: 'async_trait, 'life0: 'async_trait,

Restores an archived pipeline

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> Pointable for T

Source§

const ALIGN: usize

The alignment of pointer.
Source§

type Init = T

The type for initializers.
Source§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
Source§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
Source§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
Source§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
Source§

impl<T> Same for T

Source§

type Output = T

Should always be Self
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> ErasedDestructor for T
where T: 'static,