1use std::time::Duration;
2
3use coil_config::{DatabaseConfig, SecretRef};
4
5use crate::{
6 CompiledMigrationBatch, CompiledQuery, CompiledStatement, CompiledTransaction, DataModelError,
7 DataValue, MigrationRegistry, MutationSpec, PostgresDataClient, QuerySpec, RepositorySpec,
8 TransactionPlan, quote_identifier, require_non_empty,
9};
10
/// Sizing and timeout settings for a database connection pool.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConnectionPoolProfile {
    /// Lower bound on the number of pooled connections.
    pub min_connections: u16,
    /// Upper bound on the number of pooled connections.
    pub max_connections: u16,
    /// Time budget applied to a statement.
    pub statement_timeout: Duration,
}
17
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub struct DataRuntime {
20 pub driver: coil_config::DatabaseDriver,
21 pub connection_secret_ref: Option<SecretRef>,
22 pub connection_secret: Option<String>,
23 pub schema: String,
24 pub migrations_table: String,
25 pub pool: ConnectionPoolProfile,
26}
27
28impl DataRuntime {
29 pub fn from_config(config: &DatabaseConfig) -> Result<Self, DataModelError> {
30 if config.max_connections == 0 || config.min_connections > config.max_connections {
31 return Err(DataModelError::InvalidPoolSizing {
32 min_connections: config.min_connections,
33 max_connections: config.max_connections,
34 });
35 }
36
37 if config.statement_timeout_secs == 0 {
38 return Err(DataModelError::InvalidStatementTimeout);
39 }
40
41 Ok(Self {
42 driver: config.driver,
43 connection_secret_ref: config.url.clone(),
44 connection_secret: config.url.as_ref().map(|secret| secret.redacted()),
45 schema: require_non_empty("database_schema", config.schema.clone())?,
46 migrations_table: require_non_empty(
47 "database_migrations_table",
48 config.migrations_table.clone(),
49 )?,
50 pool: ConnectionPoolProfile {
51 min_connections: config.min_connections,
52 max_connections: config.max_connections,
53 statement_timeout: Duration::from_secs(config.statement_timeout_secs),
54 },
55 })
56 }
57
58 pub fn resolve_connection_url(&self) -> Result<String, DataModelError> {
59 match self.connection_secret_ref.as_ref() {
60 Some(SecretRef::Env { var }) => std::env::var(var)
61 .map_err(|_| DataModelError::MissingConnectionSecretEnv { var: var.clone() }),
62 Some(secret_ref) => Err(DataModelError::UnsupportedSecretRef {
63 secret_ref: secret_ref.redacted(),
64 }),
65 None => self
66 .connection_secret
67 .clone()
68 .ok_or(DataModelError::MissingConnectionSecret),
69 }
70 }
71
72 pub fn with_resolved_connection_url(&self, connection_url: impl Into<String>) -> Self {
73 let mut runtime = self.clone();
74 runtime.connection_secret_ref = None;
75 runtime.connection_secret = Some(connection_url.into());
76 runtime
77 }
78
79 pub fn connect_lazy_postgres(&self) -> Result<PostgresDataClient, DataModelError> {
80 PostgresDataClient::connect_lazy(self)
81 }
82
83 pub fn compile_query(
84 &self,
85 repository: &RepositorySpec,
86 spec: &QuerySpec,
87 ) -> Result<CompiledQuery, DataModelError> {
88 repository.compile_query(spec)
89 }
90
91 pub fn compile_transaction(
92 &self,
93 plan: &TransactionPlan,
94 mutations: &[MutationSpec],
95 ) -> Result<CompiledTransaction, DataModelError> {
96 if plan.writes.len() != mutations.len() {
97 return Err(DataModelError::TransactionWriteCountMismatch {
98 expected: plan.writes.len(),
99 actual: mutations.len(),
100 });
101 }
102
103 let mut statements = Vec::new();
104 let mut bind_index = 1;
105 for mutation in mutations {
106 let compiled = mutation.compile(bind_index)?;
107 bind_index += compiled.bind_values.len();
108 statements.push(compiled);
109 }
110
111 let jobs_table = quote_identifier(&format!("{}.job_outbox", self.schema));
112 for job in &plan.after_commit_jobs {
113 statements.push(CompiledStatement {
114 sql: format!(
115 "INSERT INTO {jobs_table} (transaction_name, job_name) VALUES ($1, $2)"
116 ),
117 bind_values: vec![
118 DataValue::String(plan.name.clone()),
119 DataValue::String(job.clone()),
120 ],
121 });
122 }
123
124 let events_table = quote_identifier(&format!("{}.event_outbox", self.schema));
125 for event in &plan.after_commit_events {
126 statements.push(CompiledStatement {
127 sql: format!(
128 "INSERT INTO {events_table} (transaction_name, event_name) VALUES ($1, $2)"
129 ),
130 bind_values: vec![
131 DataValue::String(plan.name.clone()),
132 DataValue::String(event.clone()),
133 ],
134 });
135 }
136
137 Ok(CompiledTransaction {
138 begin_sql: "BEGIN".to_string(),
139 commit_sql: "COMMIT".to_string(),
140 statements: std::iter::once(CompiledStatement {
141 sql: format!("SET TRANSACTION ISOLATION LEVEL {}", plan.isolation),
142 bind_values: Vec::new(),
143 })
144 .chain(statements)
145 .collect(),
146 })
147 }
148
149 pub fn compile_migrations(
150 &self,
151 registry: &MigrationRegistry,
152 ) -> Result<CompiledMigrationBatch, DataModelError> {
153 registry.compile_apply_batch(self)
154 }
155}