1use std::{str::FromStr, collections::HashMap};
2
3use anyhow::bail;
4use log::{warn, debug};
5use postgres::{Config, NoTls, Client, GenericClient, Transaction};
6use::anyhow::anyhow;
7use semver::Version;
8use crate::{config::ConnectionInfo, migration_data::{migrations::Migration, committed::CommittedFile}};
9
10use super::{DatabaseBackend, MigrationWithStatus, MigrationStatus};
11
12
13#[derive(Debug)]
14pub struct PgBackend {
15 config: postgres::Config,
16 meta_tables_schema: String
17}
18
19struct TryRecord {
20 hash: String
22}
23
24impl PgBackend {
25 pub fn new(connection_info: &ConnectionInfo, schema_name: &Option<String>) -> anyhow::Result<Self> {
26 let config = match connection_info {
27 ConnectionInfo::Url(u) => Config::from_str(u.as_ref().ok_or_else(|| anyhow!("connection string url is blank"))?)?,
28 ConnectionInfo::Params {
29 user,
30 password,
31 dbname,
32 options,
33 host,
34 port
35 } => {
36 let mut c = postgres::Config::new();
37 if let Some(u) = user { c.user(u); } else { c.user(&whoami::username()); }
38 if let Some(pw) = password { c.password(pw); }
39 if let Some(db) = dbname { c.dbname(db); }
40 if let Some(opt) = options { c.options(opt); }
41 if let Some(h) = host { c.host(h); } else { c.host("localhost"); }
42 if let Some(p) = port { c.port(p.parse()?); }
43
44 c
45 },
46 };
47 let instance = Self { config, meta_tables_schema: schema_name.clone().unwrap_or_else(||"public".into())};
48 instance.setup()?;
49 Ok(instance)
50 }
51
52 fn connect(&self) -> anyhow::Result<Client> {
53 Ok(self.config.connect(NoTls)?)
54 }
55
56 fn config_table(&self) -> String {
57 format!("{}.salmo_meta", self.meta_tables_schema)
58 }
59
60 fn tries_table(&self) -> String {
61 format!("{}.salmo_tried_migrations", self.meta_tables_schema)
62 }
63
64 fn executions_table(&self) -> String {
65 format!("{}.salmo_executed_migrations", self.meta_tables_schema)
66 }
67
68 fn setup(&self) -> anyhow::Result<()> {
69 let mut client = self.connect()?;
70 let mut t = client.transaction()?;
71 let config_table_name = self.config_table();
72 let try_table_name = self.tries_table();
73 let exe_table_name = self.executions_table();
74
75 t.execute(&format!("CREATE SCHEMA IF NOT EXISTS {};", self.meta_tables_schema), &[])?;
76
77 t.execute(&format!("
78 CREATE TABLE IF NOT EXISTS {config_table_name} (
79 id int PRIMARY KEY,
80 version text
81 )"), &[])?;
82 let v: Option<String> = t.query_opt(&format!("SELECT version FROM {config_table_name} WHERE id = 0"), &[])?.map(|r| r.get(0));
83 let app_version = META_MIGRATIONS[META_MIGRATIONS.len() - 1].0;
84 if let Some(db_version) = &v {
85 if Version::parse(db_version)? > Version::parse(app_version)? {
86 bail!("database is managed using a later version of salmo. This schema version: {}, db schema version: {}", app_version, db_version)
87 }
88 }
89
90 let meta_migration_index = v.and_then(|version|{
91 META_MIGRATIONS.iter().position(|m| m.0 == version).map(|p| p + 1)
92 }).unwrap_or(0);
93 for (m_version, m_text) in META_MIGRATIONS[meta_migration_index..].iter() {
94 let processed_m = m_text
95 .replace("__meta_table__", &config_table_name)
96 .replace("__tries_table__", &try_table_name)
97 .replace("__executions_table__", &exe_table_name);
98 debug!("Executing meta migration: {m_version}");
99 t.batch_execute(&processed_m)?;
100 }
101 t.execute(
102 &format!("INSERT INTO {config_table_name} (id, version) VALUES (0, $1) ON CONFLICT (id) DO UPDATE SET version = EXCLUDED.version"),
103 &[&app_version])?;
104
105 t.commit()?;
106 Ok(())
107 }
108
109
110 fn get_status(&self, client: &mut impl GenericClient, commits_file: &CommittedFile, migrations: &[Migration]) -> anyhow::Result<Vec<MigrationWithStatus>> {
111 let migration_ids = migrations.iter().map(|m| m.id.as_str()).collect::<Vec<_>>();
112 let tries = get_tries(client, &self.tries_table(), &migration_ids)?;
113 let executions = get_executions(client, &self.executions_table(), &migration_ids)?;
114 let commits = commits_file.commits_hash();
115
116 migrations.iter().map(|m| {
117 let status = match (tries.get(&m.id), commits.get(&m.id), executions.get(&m.id)) {
118 (None, None, None) => MigrationStatus::Untried,
119 (Some(t), None, None) => MigrationStatus::Tried { up_to_date: m.migrate_hash()? == t.hash },
120 (None, Some(_), None) => MigrationStatus::Committed {tried: false},
121 (Some(_), Some(_), None) => MigrationStatus::Committed {tried: true},
122 (None, Some(_), Some(_)) => MigrationStatus::Executed,
123 (t, c, e) => panic!("invalid migration: {}; tried={}, committed={}, executed={}", m.id, t.is_some(), c.is_some(), e.is_some()),
124 };
125
126 Ok(MigrationWithStatus {
127 migration: m.clone(),
128 status,
129 })
130 }).collect::<anyhow::Result<_>>()
131 }
132
133 fn try_m(&self, client: &mut Transaction, m: &Migration) -> anyhow::Result<()> {
134 self.run_migration_script(client, &m.migrate_sql()?)?;
135 let q = format!("INSERT INTO {} (id, hash) VALUES ($1, $2)", self.tries_table());
136 client.execute(&q, &[&m.id, &m.migrate_hash()?])?;
137 Ok(())
138 }
139
140 fn untry_m(&self, client: &mut Transaction, m: &Migration, run_script: bool) -> anyhow::Result<()> {
141 if run_script {
142 self.run_migration_script(client, &m.revert_sql()?)?;
143 }
144 let q = format!("DELETE FROM {} WHERE id = $1", self.tries_table());
145 client.execute(&q, &[&m.id])?;
146 Ok(())
147 }
148
149 fn execute_m(&self, client: &mut Transaction, m: &Migration, run_migration: bool) -> anyhow::Result<()> {
150 if run_migration {
151 self.run_migration_script(client, &m.migrate_sql()?)?;
152 }
153
154 let q = format!("INSERT INTO {} (id) VALUES ($1)", self.executions_table());
155 client.execute(&q, &[&m.id])?;
156 Ok(())
157 }
158
159 fn run_migration_script(&self, client: &mut Transaction, script: &str) -> anyhow::Result<()> {
160 client.batch_execute(script)?;
161 Ok(())
162 }
163
164}
165
166impl DatabaseBackend for PgBackend {
167 fn try_migrations(&mut self, migrations: &[MigrationWithStatus]) -> anyhow::Result<()> {
168 let mut client = self.connect()?;
169 let mut t = client.transaction()?;
170 for m in migrations {
171 match m.status {
172 MigrationStatus::Untried => {
173 self.try_m(&mut t, &m.migration)?
174 },
175 MigrationStatus::Tried { up_to_date } => {
176 warn!("migration {} has already been tried, skipping (it is{} up to date)", m.migration.id, if up_to_date { "" } else { " not" })
177 },
178 MigrationStatus::Committed {tried: _} => bail!("Cannot try {} it is already committed", m.migration.id),
179 MigrationStatus::Executed => bail!("Cannot try {} it is already committed and executed", m.migration.id),
180 }
181 }
182 t.commit()?;
183 Ok(())
184 }
185
186 fn untry_migrations(&mut self, migrations: &[MigrationWithStatus], execute_revert_script: bool) -> anyhow::Result<()> {
187 let mut client = self.connect()?;
188 let mut t = client.transaction()?;
189 for m in migrations {
190 match m.status {
191 MigrationStatus::Untried => {
192 warn!("migration {} has not been tried, skipping", m.migration.id)
193 },
194 MigrationStatus::Tried { up_to_date: _ } => {
195 self.untry_m(&mut t, &m.migration, execute_revert_script)?
196 },
197 MigrationStatus::Committed {tried: _} => bail!("Cannot untry {} it is already committed", m.migration.id),
198 MigrationStatus::Executed => bail!("Cannot untry {} it is already committed and executed", m.migration.id),
199 }
200 }
201 t.commit()?;
202 Ok(())
203 }
204
205 fn migration_status(&mut self, commits_file: &CommittedFile, migrations: &[Migration]) -> anyhow::Result<Vec<MigrationWithStatus>> {
206 let mut client = self.connect()?;
207 self.get_status(&mut client, commits_file, migrations)
208 }
209
210 fn execute_migrations(&mut self, migrations: &[MigrationWithStatus]) -> anyhow::Result<()> {
211 let mut client = self.connect()?;
212 let mut t = client.transaction()?;
213 for m in migrations {
214 match m.status {
215 MigrationStatus::Untried => {
216 warn!("migration {} has not been tried, skipping", m.migration.id)
217 },
218 MigrationStatus::Tried { up_to_date: _ } => {
219 warn!("migration {} has not been committed, skipping", m.migration.id)
220 },
221 MigrationStatus::Committed {tried} => {
222 debug!("executing {}; tried={:?}", m.migration.id, tried);
223 self.execute_m(&mut t, &m.migration, !tried)?
224 },
225 MigrationStatus::Executed => {}, }
227 }
228 t.commit()?;
229 Ok(())
230 }
231}
232
233
234fn get_tries(client: &mut impl GenericClient, table_name: &str, migration_ids: &[&str]) -> anyhow::Result<HashMap<String, TryRecord>> {
235 let tries_query = format!("SELECT id, hash FROM {} WHERE id = ANY ($1)", table_name);
236 let tries = client.query(&tries_query, &[&migration_ids])?
237 .into_iter()
238 .map(|r| {
239 let migration_id: String = r.get(0);
240 (migration_id, TryRecord { hash: r.get(1) })
241 })
242 .collect();
243 Ok(tries)
244}
245
246fn get_executions(client: &mut impl GenericClient, table_name: &str, migration_ids: &[&str]) -> anyhow::Result<HashMap<String, String>> {
247 let tries_query = format!("SELECT id FROM {} WHERE id = ANY ($1)", table_name);
248 let tries = client.query(&tries_query, &[&migration_ids])?
249 .into_iter()
250 .map(|r| {
251 let migration_id: String = r.get(0);
252 (migration_id.clone(), migration_id)
253 })
254 .collect();
255 Ok(tries)
256}
257
258const META_MIGRATIONS: &[(&str, &str)] = &[
259 ("0.1.0", "
260 CREATE TABLE IF NOT EXISTS __tries_table__ (
261 id TEXT PRIMARY KEY,
262 hash TEXT,
263 tried_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
264 );
265 CREATE TABLE IF NOT EXISTS __executions_table__ (
266 id TEXT PRIMARY KEY,
267 committed_index integer
268 );
269 "), ("0.2.0", "ALTER TABLE __executions_table__ DROP COLUMN IF EXISTS committed_index")
271];