libsalmo/backend/
pg.rs

1use std::{str::FromStr, collections::HashMap};
2
3use anyhow::bail;
4use log::{warn, debug};
5use postgres::{Config, NoTls, Client, GenericClient, Transaction};
6use::anyhow::anyhow;
7use semver::Version;
8use crate::{config::ConnectionInfo, migration_data::{migrations::Migration, committed::CommittedFile}};
9
10use super::{DatabaseBackend, MigrationWithStatus, MigrationStatus};
11
12
13#[derive(Debug)]
14pub struct PgBackend {
15  config: postgres::Config,
16  meta_tables_schema: String
17}
18
/// One row of the tried-migrations table.
///
/// The migration id is not stored here; callers key these records by id instead.
struct TryRecord {
    /// Hash of the migration script as it was when the migration was tried.
    hash: String,
}
23
24impl PgBackend {
25    pub fn new(connection_info: &ConnectionInfo, schema_name: &Option<String>) -> anyhow::Result<Self> {
26      let config = match connection_info {
27        ConnectionInfo::Url(u) => Config::from_str(u.as_ref().ok_or_else(|| anyhow!("connection string url is blank"))?)?,
28        ConnectionInfo::Params {
29          user,
30          password,
31          dbname,
32          options,
33          host,
34          port
35        } => {
36          let mut c = postgres::Config::new();
37          if let Some(u) = user { c.user(u); } else { c.user(&whoami::username()); }
38          if let Some(pw) = password { c.password(pw); }
39          if let Some(db) = dbname { c.dbname(db); }
40          if let Some(opt) = options { c.options(opt); }
41          if let Some(h) = host { c.host(h); } else { c.host("localhost"); }
42          if let Some(p) = port { c.port(p.parse()?); }
43
44          c
45        },
46      };
47      let instance = Self { config, meta_tables_schema: schema_name.clone().unwrap_or_else(||"public".into())};
48      instance.setup()?;
49      Ok(instance)
50    }
51
52    fn connect(&self) -> anyhow::Result<Client> {
53      Ok(self.config.connect(NoTls)?)
54    }
55
56    fn config_table(&self) -> String {
57      format!("{}.salmo_meta", self.meta_tables_schema)
58    }
59
60    fn tries_table(&self) -> String {
61      format!("{}.salmo_tried_migrations", self.meta_tables_schema)
62    }
63
64    fn executions_table(&self) -> String {
65      format!("{}.salmo_executed_migrations", self.meta_tables_schema)
66    }
67
68    fn setup(&self) -> anyhow::Result<()> {
69      let mut client = self.connect()?;
70      let mut t = client.transaction()?;
71      let config_table_name = self.config_table();
72      let try_table_name = self.tries_table();
73      let exe_table_name = self.executions_table();
74
75      t.execute(&format!("CREATE SCHEMA IF NOT EXISTS {};", self.meta_tables_schema), &[])?;
76
77      t.execute(&format!("
78      CREATE TABLE IF NOT EXISTS {config_table_name} (
79        id int PRIMARY KEY,
80        version text
81      )"), &[])?;
82      let v: Option<String> = t.query_opt(&format!("SELECT version FROM {config_table_name} WHERE id = 0"), &[])?.map(|r| r.get(0));
83      let app_version = META_MIGRATIONS[META_MIGRATIONS.len() - 1].0;
84      if let Some(db_version) = &v {
85        if Version::parse(db_version)? > Version::parse(app_version)? {
86          bail!("database is managed using a later version of salmo. This schema version: {}, db schema version: {}", app_version, db_version)
87        }
88      }
89
90      let meta_migration_index = v.and_then(|version|{
91        META_MIGRATIONS.iter().position(|m| m.0 == version).map(|p| p + 1)
92      }).unwrap_or(0);
93      for (m_version, m_text) in META_MIGRATIONS[meta_migration_index..].iter() {
94        let processed_m = m_text
95          .replace("__meta_table__", &config_table_name)
96          .replace("__tries_table__", &try_table_name)
97          .replace("__executions_table__", &exe_table_name);
98        debug!("Executing meta migration: {m_version}");
99        t.batch_execute(&processed_m)?;
100      }
101      t.execute(
102        &format!("INSERT INTO {config_table_name} (id, version) VALUES (0, $1) ON CONFLICT (id) DO UPDATE SET version = EXCLUDED.version"),
103        &[&app_version])?;
104
105      t.commit()?;
106      Ok(())
107    }
108
109
110    fn get_status(&self, client: &mut impl GenericClient, commits_file: &CommittedFile, migrations: &[Migration]) -> anyhow::Result<Vec<MigrationWithStatus>> {
111      let migration_ids = migrations.iter().map(|m| m.id.as_str()).collect::<Vec<_>>();
112      let tries = get_tries(client, &self.tries_table(), &migration_ids)?;
113      let executions = get_executions(client, &self.executions_table(), &migration_ids)?;
114      let commits = commits_file.commits_hash();
115
116      migrations.iter().map(|m| {
117        let status = match (tries.get(&m.id), commits.get(&m.id), executions.get(&m.id)) {
118            (None, None, None) => MigrationStatus::Untried,
119            (Some(t), None, None) => MigrationStatus::Tried { up_to_date: m.migrate_hash()? == t.hash },
120            (None, Some(_), None) => MigrationStatus::Committed {tried: false},
121            (Some(_), Some(_), None) => MigrationStatus::Committed {tried: true},
122            (None, Some(_), Some(_)) => MigrationStatus::Executed,
123            (t, c, e) => panic!("invalid migration: {}; tried={}, committed={}, executed={}", m.id, t.is_some(), c.is_some(), e.is_some()),
124        };
125
126        Ok(MigrationWithStatus {
127            migration: m.clone(),
128            status,
129        })
130      }).collect::<anyhow::Result<_>>()
131    }
132
133    fn try_m(&self, client: &mut Transaction, m: &Migration) -> anyhow::Result<()> {
134      self.run_migration_script(client, &m.migrate_sql()?)?;
135      let q = format!("INSERT INTO {} (id, hash) VALUES ($1, $2)", self.tries_table());
136      client.execute(&q, &[&m.id, &m.migrate_hash()?])?;
137      Ok(())
138    }
139
140    fn untry_m(&self, client: &mut Transaction, m: &Migration, run_script: bool) -> anyhow::Result<()> {
141      if run_script {
142        self.run_migration_script(client, &m.revert_sql()?)?;
143      }
144      let q = format!("DELETE FROM {} WHERE id = $1", self.tries_table());
145      client.execute(&q, &[&m.id])?;
146      Ok(())
147    }
148
149    fn execute_m(&self, client: &mut Transaction, m: &Migration, run_migration: bool) -> anyhow::Result<()> {
150      if run_migration {
151        self.run_migration_script(client, &m.migrate_sql()?)?;
152      }
153
154      let q = format!("INSERT INTO {} (id) VALUES ($1)", self.executions_table());
155      client.execute(&q, &[&m.id])?;
156      Ok(())
157    }
158
159    fn run_migration_script(&self, client: &mut Transaction, script: &str) -> anyhow::Result<()> {
160      client.batch_execute(script)?;
161      Ok(())
162    }
163
164}
165
166impl DatabaseBackend for PgBackend {
167    fn try_migrations(&mut self, migrations: &[MigrationWithStatus]) -> anyhow::Result<()> {
168      let mut client = self.connect()?;
169      let mut t = client.transaction()?;
170      for m in migrations {
171        match m.status {
172            MigrationStatus::Untried => {
173              self.try_m(&mut t, &m.migration)?
174            },
175            MigrationStatus::Tried { up_to_date } => {
176              warn!("migration {} has already been tried, skipping (it is{} up to date)", m.migration.id, if up_to_date { "" } else { " not" })
177            },
178            MigrationStatus::Committed {tried: _} => bail!("Cannot try {} it is already committed", m.migration.id),
179            MigrationStatus::Executed => bail!("Cannot try {} it is already committed and executed", m.migration.id),
180        }
181      }
182      t.commit()?;
183      Ok(())
184    }
185
186    fn untry_migrations(&mut self, migrations: &[MigrationWithStatus], execute_revert_script: bool) -> anyhow::Result<()> {
187      let mut client = self.connect()?;
188      let mut t = client.transaction()?;
189      for m in migrations {
190        match m.status {
191            MigrationStatus::Untried => {
192              warn!("migration {} has not been tried, skipping", m.migration.id)
193            },
194            MigrationStatus::Tried { up_to_date: _  } => {
195              self.untry_m(&mut t, &m.migration, execute_revert_script)?
196            },
197            MigrationStatus::Committed {tried: _} => bail!("Cannot untry {} it is already committed", m.migration.id),
198            MigrationStatus::Executed => bail!("Cannot untry {} it is already committed and executed", m.migration.id),
199        }
200      }
201      t.commit()?;
202      Ok(())
203    }
204
205    fn migration_status(&mut self, commits_file: &CommittedFile, migrations: &[Migration]) -> anyhow::Result<Vec<MigrationWithStatus>> {
206      let mut client = self.connect()?;
207      self.get_status(&mut client, commits_file, migrations)
208    }
209
210    fn execute_migrations(&mut self, migrations: &[MigrationWithStatus]) -> anyhow::Result<()> {
211      let mut client = self.connect()?;
212      let mut t = client.transaction()?;
213      for m in migrations {
214        match m.status {
215            MigrationStatus::Untried => {
216              warn!("migration {} has not been tried, skipping", m.migration.id)
217            },
218            MigrationStatus::Tried { up_to_date: _ } => {
219              warn!("migration {} has not been committed, skipping", m.migration.id)
220            },
221            MigrationStatus::Committed {tried} => {
222              debug!("executing {}; tried={:?}", m.migration.id, tried);
223              self.execute_m(&mut t, &m.migration, !tried)?
224            },
225            MigrationStatus::Executed => {}, // do nothing for these
226        }
227      }
228      t.commit()?;
229      Ok(())
230    }
231}
232
233
234fn get_tries(client: &mut impl GenericClient, table_name: &str, migration_ids: &[&str]) -> anyhow::Result<HashMap<String, TryRecord>> {
235  let tries_query = format!("SELECT id, hash FROM {} WHERE id = ANY ($1)", table_name);
236  let tries = client.query(&tries_query, &[&migration_ids])?
237        .into_iter()
238        .map(|r| {
239          let migration_id: String = r.get(0);
240          (migration_id, TryRecord { hash: r.get(1) })
241        })
242        .collect();
243  Ok(tries)
244}
245
246fn get_executions(client: &mut impl GenericClient, table_name: &str, migration_ids: &[&str]) -> anyhow::Result<HashMap<String, String>> {
247  let tries_query = format!("SELECT id FROM {} WHERE id = ANY ($1)", table_name);
248  let tries = client.query(&tries_query, &[&migration_ids])?
249        .into_iter()
250        .map(|r| {
251          let migration_id: String = r.get(0);
252          (migration_id.clone(), migration_id)
253        })
254        .collect();
255  Ok(tries)
256}
257
/// SQL of the initial bookkeeping migration: creates the tries and executions tables.
/// `__tries_table__` / `__executions_table__` are placeholders replaced before execution.
const META_MIGRATION_INITIAL_SQL: &str = "
    CREATE TABLE IF NOT EXISTS __tries_table__ (
      id TEXT PRIMARY KEY,
      hash TEXT,
      tried_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
    );
    CREATE TABLE IF NOT EXISTS __executions_table__ (
      id TEXT PRIMARY KEY,
      committed_index integer
    );
  ";

/// SQL of the second bookkeeping migration: drops the `committed_index` column again.
const META_MIGRATION_DROP_COMMITTED_INDEX_SQL: &str =
    "ALTER TABLE __executions_table__ DROP COLUMN IF EXISTS committed_index";

/// Bookkeeping-schema migrations as `(version, sql)` pairs, oldest first.
///
/// The version of the last entry is the bookkeeping version of this build.
const META_MIGRATIONS: &[(&str, &str)] = &[
    ("0.1.0", META_MIGRATION_INITIAL_SQL), // placeholder for initial
    ("0.2.0", META_MIGRATION_DROP_COMMITTED_INDEX_SQL),
];