// pipeflow 0.0.4
//
// A lightweight, configuration-driven data pipeline framework
// Documentation
//! Shared SQL database utilities for sources and sinks
//!
//! This module provides common types and functions for working with SQL databases
//! across both source and sink implementations.

use serde::{Deserialize, Serialize};
use sqlx::postgres::{PgPool, PgPoolOptions};
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};

use crate::error::{Error, Result};

/// SQL database driver
///
/// Identifies which database backend a connection string refers to.
/// With serde the variants are written and read by their lowercase names
/// (`sqlite`, `postgres`). [`SqlDriver::Sqlite`] is the `Default` value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SqlDriver {
    /// SQLite database (the default driver)
    #[default]
    Sqlite,
    /// PostgreSQL database
    Postgres,
}

/// Internal SQL connection pool wrapper
///
/// Holds the driver-specific sqlx pool so crate-internal code can carry
/// either backend behind a single type; one variant per [`SqlDriver`].
#[derive(Debug)]
pub(crate) enum SqlPool {
    /// Pool of SQLite connections
    Sqlite(SqlitePool),
    /// Pool of PostgreSQL connections
    Postgres(PgPool),
}

impl SqlPool {
    /// Open a connection pool for `driver` using the given `connection` string.
    ///
    /// * `SqlDriver::Sqlite` - `connection` is prefixed with the `sqlite:`
    ///   scheme (the literal `:memory:` becomes `sqlite::memory:`) and the
    ///   pool is limited to a single connection.
    /// * `SqlDriver::Postgres` - `connection` is passed to the pool builder
    ///   unchanged and the pool is limited to five connections.
    ///
    /// # Errors
    ///
    /// Returns an error built with `Error::config` that embeds the underlying
    /// driver message when the pool cannot be established.
    pub(crate) async fn connect(driver: SqlDriver, connection: &str) -> Result<Self> {
        let pool = match driver {
            SqlDriver::Sqlite => {
                // Build the URL form expected by the SQLite pool builder.
                let url = match connection {
                    ":memory:" => String::from("sqlite::memory:"),
                    location => format!("sqlite:{}", location),
                };

                let options = SqlitePoolOptions::new().max_connections(1);
                let sqlite = options
                    .connect(&url)
                    .await
                    .map_err(|e| Error::config(format!("Failed to connect to sqlite: {}", e)))?;

                Self::Sqlite(sqlite)
            }
            SqlDriver::Postgres => {
                let options = PgPoolOptions::new().max_connections(5);
                let postgres = options
                    .connect(connection)
                    .await
                    .map_err(|e| Error::config(format!("Failed to connect to postgres: {}", e)))?;

                Self::Postgres(postgres)
            }
        };

        Ok(pool)
    }
}