midas_core/sequel/
postgres.rs

1use anyhow::Context;
2use indoc::indoc;
3use postgres::tls::{
4  MakeTlsConnect,
5  TlsConnect,
6};
7use postgres::{
8  Client,
9  NoTls,
10  Socket,
11};
12use url::Url;
13
14use super::{
15  AnyhowResult,
16  Driver as SequelDriver,
17  VecSerial,
18};
19
20/// The Postgres struct definition
21pub struct Postgres {
22  /// The Postgres client
23  client: Client,
24  /// The database name
25  database_name: String,
26}
27
28/// Implement the Postgres struct
29impl Postgres {
30  /// Create a new instance of Postgres
31  pub fn new(database_url: &str) -> AnyhowResult<Self> {
32    Self::new_tls(database_url, NoTls)
33  }
34
35  /// Create a new instance of Postgres with TLS
36  pub fn new_tls<T>(database_url: &str, tls_mode: T) -> AnyhowResult<Self>
37  where
38    T: MakeTlsConnect<Socket> + 'static + Send,
39    T::TlsConnect: Send,
40    T::Stream: Send,
41    <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
42  {
43    // Get the database name from the URL
44    let url = Url::parse(database_url)?;
45    let database_name = url
46      .path_segments()
47      .and_then(|s| s.last())
48      .context("Database name not found")?;
49
50    // Open the connection
51    let client = Client::connect(url.as_str(), tls_mode)?;
52
53    // Create a new instance of Postgres
54    let mut db = Postgres {
55      client,
56      database_name: database_name.into(),
57    };
58
59    // Ensure the midas schema
60    db.ensure_midas_schema()?;
61    Ok(db)
62  }
63}
64
65impl SequelDriver for Postgres {
66  fn ensure_midas_schema(&mut self) -> AnyhowResult<()> {
67    self.client.execute("create schema if not exists midas", &[])?;
68    self.client.execute("grant all on schema midas to public", &[])?;
69    let payload = indoc! {"
70      create table if not exists midas.__schema_migrations (
71        id bigint generated by default as identity primary key,
72        migration bigint
73      )
74    "};
75    self.client.execute(payload, &[])?;
76    Ok(())
77  }
78
79  fn drop_migration_table(&mut self) -> AnyhowResult<()> {
80    let payload = "drop table midas.__schema_migrations";
81    self.client.execute(payload, &[])?;
82    Ok(())
83  }
84
85  fn drop_database(&mut self, db_name: &str) -> AnyhowResult<()> {
86    let payload = format! {"drop database if exists {db_name}"};
87    self.client.execute(&payload, &[])?;
88
89    let payload = format! {"create database {db_name}"};
90    self.client.execute(&payload, &[])?;
91    Ok(())
92  }
93
94  fn count_migrations(&mut self) -> AnyhowResult<i64> {
95    log::trace!("Retrieving migrations count");
96    let payload = "select count(*) as count from midas.__schema_migrations";
97    let row = self.client.query_one(payload, &[])?;
98    let result = row.get::<_, i64>(0);
99    Ok(result)
100  }
101
102  fn get_completed_migrations(&mut self) -> AnyhowResult<VecSerial> {
103    log::trace!("Retrieving all completed migrations");
104    let payload = "select migration from midas.__schema_migrations order by id asc";
105    let it = self.client.query(payload, &[])?;
106    let result = it.iter().map(|r| r.get("migration")).collect::<_>();
107    Ok(result)
108  }
109
110  fn get_last_completed_migration(&mut self) -> AnyhowResult<i64> {
111    log::trace!("Checking and retrieving the last migration stored on migrations table");
112    let payload = "select migration from midas.__schema_migrations order by id desc limit 1";
113    let result = self
114      .client
115      .query(payload, &[])
116      .with_context(|| "Failed to get last completed migration".to_string())?;
117
118    if result.is_empty() {
119      Ok(-1)
120    } else {
121      Ok(result[0].get(0))
122    }
123  }
124
125  fn add_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
126    log::trace!("Adding migration to migrations table");
127    let payload = "insert into midas.__schema_migrations (migration) values ($1)";
128    self
129      .client
130      .execute(payload, &[&migration_number])
131      .with_context(|| "Failed to add completed migration".to_string())?;
132    Ok(())
133  }
134
135  fn delete_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
136    log::trace!("Removing a migration in the migrations table");
137    let payload = "delete from midas.__schema_migrations where migration = $1";
138    self
139      .client
140      .execute(payload, &[&migration_number])
141      .with_context(|| "Failed to delete completed migration".to_string())?;
142    Ok(())
143  }
144
145  fn delete_last_completed_migration(&mut self) -> AnyhowResult<()> {
146    let payload =
147      "delete from midas.__schema_migrations where id=(select max(id) from midas.__schema_migrations);";
148    self
149      .client
150      .execute(payload, &[])
151      .with_context(|| "Failed to delete last completed migration".to_string())?;
152    Ok(())
153  }
154
155  fn migrate(&mut self, query: &str, migration_number: i64) -> AnyhowResult<()> {
156    self
157      .client
158      .simple_query(query)
159      .with_context(|| format!("Failed to execute migration - {migration_number}"))?;
160    Ok(())
161  }
162
163  fn db_name(&self) -> &str {
164    &self.database_name
165  }
166}