Skip to main content

coil_data/
runtime.rs

1use std::time::Duration;
2
3use coil_config::{DatabaseConfig, SecretRef};
4
5use crate::{
6    CompiledMigrationBatch, CompiledQuery, CompiledStatement, CompiledTransaction, DataModelError,
7    DataValue, MigrationRegistry, MutationSpec, PostgresDataClient, QuerySpec, RepositorySpec,
8    TransactionPlan, quote_identifier, require_non_empty,
9};
10
/// Connection-pool sizing and per-statement timeout for a [`DataRuntime`].
///
/// `DataRuntime::from_config` fills this in only after checking that
/// `max_connections` is non-zero, that `min_connections` does not exceed it,
/// and that the configured timeout is non-zero. Constructing the struct
/// directly bypasses those checks.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConnectionPoolProfile {
    /// Lower bound on the number of pooled connections.
    pub min_connections: u16,
    /// Upper bound on the number of pooled connections.
    pub max_connections: u16,
    /// Timeout applied to statements; built from a whole number of seconds
    /// by `DataRuntime::from_config`.
    pub statement_timeout: Duration,
}
17
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct DataRuntime {
20    pub driver: coil_config::DatabaseDriver,
21    pub connection_secret_ref: Option<SecretRef>,
22    pub connection_secret: Option<String>,
23    pub schema: String,
24    pub migrations_table: String,
25    pub pool: ConnectionPoolProfile,
26}
27
28impl DataRuntime {
29    pub fn from_config(config: &DatabaseConfig) -> Result<Self, DataModelError> {
30        if config.max_connections == 0 || config.min_connections > config.max_connections {
31            return Err(DataModelError::InvalidPoolSizing {
32                min_connections: config.min_connections,
33                max_connections: config.max_connections,
34            });
35        }
36
37        if config.statement_timeout_secs == 0 {
38            return Err(DataModelError::InvalidStatementTimeout);
39        }
40
41        Ok(Self {
42            driver: config.driver,
43            connection_secret_ref: config.url.clone(),
44            connection_secret: config.url.as_ref().map(|secret| secret.redacted()),
45            schema: require_non_empty("database_schema", config.schema.clone())?,
46            migrations_table: require_non_empty(
47                "database_migrations_table",
48                config.migrations_table.clone(),
49            )?,
50            pool: ConnectionPoolProfile {
51                min_connections: config.min_connections,
52                max_connections: config.max_connections,
53                statement_timeout: Duration::from_secs(config.statement_timeout_secs),
54            },
55        })
56    }
57
58    pub fn resolve_connection_url(&self) -> Result<String, DataModelError> {
59        match self.connection_secret_ref.as_ref() {
60            Some(SecretRef::Env { var }) => std::env::var(var)
61                .map_err(|_| DataModelError::MissingConnectionSecretEnv { var: var.clone() }),
62            Some(secret_ref) => Err(DataModelError::UnsupportedSecretRef {
63                secret_ref: secret_ref.redacted(),
64            }),
65            None => self
66                .connection_secret
67                .clone()
68                .ok_or(DataModelError::MissingConnectionSecret),
69        }
70    }
71
72    pub fn with_resolved_connection_url(&self, connection_url: impl Into<String>) -> Self {
73        let mut runtime = self.clone();
74        runtime.connection_secret_ref = None;
75        runtime.connection_secret = Some(connection_url.into());
76        runtime
77    }
78
79    pub fn connect_lazy_postgres(&self) -> Result<PostgresDataClient, DataModelError> {
80        PostgresDataClient::connect_lazy(self)
81    }
82
83    pub fn compile_query(
84        &self,
85        repository: &RepositorySpec,
86        spec: &QuerySpec,
87    ) -> Result<CompiledQuery, DataModelError> {
88        repository.compile_query(spec)
89    }
90
91    pub fn compile_transaction(
92        &self,
93        plan: &TransactionPlan,
94        mutations: &[MutationSpec],
95    ) -> Result<CompiledTransaction, DataModelError> {
96        if plan.writes.len() != mutations.len() {
97            return Err(DataModelError::TransactionWriteCountMismatch {
98                expected: plan.writes.len(),
99                actual: mutations.len(),
100            });
101        }
102
103        let mut statements = Vec::new();
104        let mut bind_index = 1;
105        for mutation in mutations {
106            let compiled = mutation.compile(bind_index)?;
107            bind_index += compiled.bind_values.len();
108            statements.push(compiled);
109        }
110
111        let jobs_table = quote_identifier(&format!("{}.job_outbox", self.schema));
112        for job in &plan.after_commit_jobs {
113            statements.push(CompiledStatement {
114                sql: format!(
115                    "INSERT INTO {jobs_table} (transaction_name, job_name) VALUES ($1, $2)"
116                ),
117                bind_values: vec![
118                    DataValue::String(plan.name.clone()),
119                    DataValue::String(job.clone()),
120                ],
121            });
122        }
123
124        let events_table = quote_identifier(&format!("{}.event_outbox", self.schema));
125        for event in &plan.after_commit_events {
126            statements.push(CompiledStatement {
127                sql: format!(
128                    "INSERT INTO {events_table} (transaction_name, event_name) VALUES ($1, $2)"
129                ),
130                bind_values: vec![
131                    DataValue::String(plan.name.clone()),
132                    DataValue::String(event.clone()),
133                ],
134            });
135        }
136
137        Ok(CompiledTransaction {
138            begin_sql: "BEGIN".to_string(),
139            commit_sql: "COMMIT".to_string(),
140            statements: std::iter::once(CompiledStatement {
141                sql: format!("SET TRANSACTION ISOLATION LEVEL {}", plan.isolation),
142                bind_values: Vec::new(),
143            })
144            .chain(statements)
145            .collect(),
146        })
147    }
148
149    pub fn compile_migrations(
150        &self,
151        registry: &MigrationRegistry,
152    ) -> Result<CompiledMigrationBatch, DataModelError> {
153        registry.compile_apply_batch(self)
154    }
155}