coil-data 0.1.1

Data access and persistence primitives for the Coil framework.
Documentation
use std::time::Duration;

use coil_config::{DatabaseConfig, SecretRef};

use crate::{
    CompiledMigrationBatch, CompiledQuery, CompiledStatement, CompiledTransaction, DataModelError,
    DataValue, MigrationRegistry, MutationSpec, PostgresDataClient, QuerySpec, RepositorySpec,
    TransactionPlan, quote_identifier, require_non_empty,
};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectionPoolProfile {
    pub min_connections: u16,
    pub max_connections: u16,
    pub statement_timeout: Duration,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataRuntime {
    pub driver: coil_config::DatabaseDriver,
    pub connection_secret_ref: Option<SecretRef>,
    pub connection_secret: Option<String>,
    pub schema: String,
    pub migrations_table: String,
    pub pool: ConnectionPoolProfile,
}

impl DataRuntime {
    pub fn from_config(config: &DatabaseConfig) -> Result<Self, DataModelError> {
        if config.max_connections == 0 || config.min_connections > config.max_connections {
            return Err(DataModelError::InvalidPoolSizing {
                min_connections: config.min_connections,
                max_connections: config.max_connections,
            });
        }

        if config.statement_timeout_secs == 0 {
            return Err(DataModelError::InvalidStatementTimeout);
        }

        Ok(Self {
            driver: config.driver,
            connection_secret_ref: config.url.clone(),
            connection_secret: config.url.as_ref().map(|secret| secret.redacted()),
            schema: require_non_empty("database_schema", config.schema.clone())?,
            migrations_table: require_non_empty(
                "database_migrations_table",
                config.migrations_table.clone(),
            )?,
            pool: ConnectionPoolProfile {
                min_connections: config.min_connections,
                max_connections: config.max_connections,
                statement_timeout: Duration::from_secs(config.statement_timeout_secs),
            },
        })
    }

    pub fn resolve_connection_url(&self) -> Result<String, DataModelError> {
        match self.connection_secret_ref.as_ref() {
            Some(SecretRef::Env { var }) => std::env::var(var)
                .map_err(|_| DataModelError::MissingConnectionSecretEnv { var: var.clone() }),
            Some(secret_ref) => Err(DataModelError::UnsupportedSecretRef {
                secret_ref: secret_ref.redacted(),
            }),
            None => self
                .connection_secret
                .clone()
                .ok_or(DataModelError::MissingConnectionSecret),
        }
    }

    pub fn with_resolved_connection_url(&self, connection_url: impl Into<String>) -> Self {
        let mut runtime = self.clone();
        runtime.connection_secret_ref = None;
        runtime.connection_secret = Some(connection_url.into());
        runtime
    }

    pub fn connect_lazy_postgres(&self) -> Result<PostgresDataClient, DataModelError> {
        PostgresDataClient::connect_lazy(self)
    }

    pub fn compile_query(
        &self,
        repository: &RepositorySpec,
        spec: &QuerySpec,
    ) -> Result<CompiledQuery, DataModelError> {
        repository.compile_query(spec)
    }

    pub fn compile_transaction(
        &self,
        plan: &TransactionPlan,
        mutations: &[MutationSpec],
    ) -> Result<CompiledTransaction, DataModelError> {
        if plan.writes.len() != mutations.len() {
            return Err(DataModelError::TransactionWriteCountMismatch {
                expected: plan.writes.len(),
                actual: mutations.len(),
            });
        }

        let mut statements = Vec::new();
        let mut bind_index = 1;
        for mutation in mutations {
            let compiled = mutation.compile(bind_index)?;
            bind_index += compiled.bind_values.len();
            statements.push(compiled);
        }

        let jobs_table = quote_identifier(&format!("{}.job_outbox", self.schema));
        for job in &plan.after_commit_jobs {
            statements.push(CompiledStatement {
                sql: format!(
                    "INSERT INTO {jobs_table} (transaction_name, job_name) VALUES ($1, $2)"
                ),
                bind_values: vec![
                    DataValue::String(plan.name.clone()),
                    DataValue::String(job.clone()),
                ],
            });
        }

        let events_table = quote_identifier(&format!("{}.event_outbox", self.schema));
        for event in &plan.after_commit_events {
            statements.push(CompiledStatement {
                sql: format!(
                    "INSERT INTO {events_table} (transaction_name, event_name) VALUES ($1, $2)"
                ),
                bind_values: vec![
                    DataValue::String(plan.name.clone()),
                    DataValue::String(event.clone()),
                ],
            });
        }

        Ok(CompiledTransaction {
            begin_sql: "BEGIN".to_string(),
            commit_sql: "COMMIT".to_string(),
            statements: std::iter::once(CompiledStatement {
                sql: format!("SET TRANSACTION ISOLATION LEVEL {}", plan.isolation),
                bind_values: Vec::new(),
            })
            .chain(statements)
            .collect(),
        })
    }

    pub fn compile_migrations(
        &self,
        registry: &MigrationRegistry,
    ) -> Result<CompiledMigrationBatch, DataModelError> {
        registry.compile_apply_batch(self)
    }
}