// midas_core/sequel/postgres.rs
use anyhow::Context;
use indoc::indoc;
use postgres::tls::{
  MakeTlsConnect,
  TlsConnect,
};
use postgres::{
  Client,
  NoTls,
  Socket,
};
use url::Url;

use super::{
  AnyhowResult,
  Driver as SequelDriver,
  VecSerial,
};

/// Migration driver backed by a PostgreSQL connection.
pub struct Postgres {
  /// Synchronous client used for every statement issued by this driver.
  client: Client,
  /// Database name taken from the last path segment of the connection URL.
  database_name: String,
}

impl Postgres {
  /// Connects to the database at `database_url` without TLS.
  ///
  /// Shorthand for [`Postgres::new_tls`] called with [`NoTls`].
  ///
  /// # Errors
  ///
  /// Fails under the same conditions as [`Postgres::new_tls`].
  pub fn new(database_url: &str) -> AnyhowResult<Self> {
    Self::new_tls(database_url, NoTls)
  }

  /// Connects to the database at `database_url` using the given TLS connector
  /// and makes sure the `midas` bookkeeping schema exists.
  ///
  /// The database name is taken from the last path segment of the URL.
  ///
  /// # Errors
  ///
  /// Returns an error when the URL cannot be parsed, when it has no (or an
  /// empty) database name, when the connection cannot be established, or when
  /// creating the `midas` schema fails.
  pub fn new_tls<T>(database_url: &str, tls_mode: T) -> AnyhowResult<Self>
  where
    T: MakeTlsConnect<Socket> + 'static + Send,
    T::TlsConnect: Send,
    T::Stream: Send,
    <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
  {
    // The URL itself is kept out of the error messages: it may carry credentials.
    let url = Url::parse(database_url).context("Failed to parse database URL")?;

    // A URL such as `postgres://host/` has a single, empty path segment; treat
    // that the same as a missing database name instead of storing "".
    // NOTE(review): segments are returned percent-encoded by `url`; confirm
    // whether names containing escaped characters need decoding.
    let database_name = url
      .path_segments()
      .and_then(|mut segments| segments.next_back())
      .filter(|name| !name.is_empty())
      .context("Database name not found")?;

    let client =
      Client::connect(url.as_str(), tls_mode).context("Failed to connect to the database")?;

    let mut db = Postgres {
      client,
      database_name: database_name.into(),
    };
    db.ensure_midas_schema()?;
    Ok(db)
  }
}

impl SequelDriver for Postgres {
  /// Creates the `midas` schema and its `__schema_migrations` table when they
  /// do not exist yet, and grants all privileges on the schema to `public`.
  ///
  /// # Errors
  ///
  /// Returns the first error reported by the client; later statements are
  /// not attempted.
  fn ensure_midas_schema(&mut self) -> AnyhowResult<()> {
    let create_migrations_table = indoc! {"
      create table if not exists midas.__schema_migrations (
        id bigint generated by default as identity primary key,
        migration bigint
      )
    "};
    let statements = [
      "create schema if not exists midas",
      "grant all on schema midas to public",
      create_migrations_table,
    ];

    // Order matters: the schema has to exist before the grant and the table.
    for statement in statements {
      self.client.execute(statement, &[])?;
    }
    Ok(())
  }

  /// Drops the `midas.__schema_migrations` table.
  ///
  /// # Errors
  ///
  /// Returns the client error when the statement fails; the statement has no
  /// `if exists` guard.
  fn drop_migration_table(&mut self) -> AnyhowResult<()> {
    self
      .client
      .execute("drop table midas.__schema_migrations", &[])?;
    Ok(())
  }

  /// Drops the database named `db_name` if it exists, then creates it again.
  ///
  /// The name is emitted as a double-quoted identifier, so names containing
  /// hyphens, upper-case letters or other special characters are handled and
  /// cannot break out of the statement. Quoted identifiers are matched
  /// case-sensitively by PostgreSQL.
  ///
  /// # Errors
  ///
  /// Returns an error when either the `drop database` or the
  /// `create database` statement fails.
  fn drop_database(&mut self, db_name: &str) -> AnyhowResult<()> {
    // Identifiers cannot be sent as bind parameters, so quote the name
    // manually; an embedded `"` is escaped by doubling it.
    let quoted_name = format!("\"{}\"", db_name.replace('"', "\"\""));

    let payload = format!("drop database if exists {quoted_name}");
    self
      .client
      .execute(&payload, &[])
      .context("Failed to drop database")?;

    let payload = format!("create database {quoted_name}");
    self
      .client
      .execute(&payload, &[])
      .context("Failed to create database")?;
    Ok(())
  }

  /// Returns the number of rows stored in `midas.__schema_migrations`.
  ///
  /// # Errors
  ///
  /// Returns the client error when the count query fails.
  fn count_migrations(&mut self) -> AnyhowResult<i64> {
    log::trace!("Retrieving migrations count");
    let total: i64 = self
      .client
      .query_one("select count(*) as count from midas.__schema_migrations", &[])?
      .get(0);
    Ok(total)
  }

  /// Returns every recorded migration number, ordered by insertion id
  /// (oldest first).
  ///
  /// # Errors
  ///
  /// Returns an error when the query fails or when a stored `migration`
  /// value cannot be read (for example a NULL, since the column is
  /// nullable). The original `Row::get` would panic in that case.
  fn get_completed_migrations(&mut self) -> AnyhowResult<VecSerial> {
    log::trace!("Retrieving all completed migrations");
    let payload = "select migration from midas.__schema_migrations order by id asc";
    let rows = self.client.query(payload, &[])?;

    // `try_get` reports a conversion failure as an error instead of panicking.
    let migrations = rows
      .iter()
      .map(|row| row.try_get("migration"))
      .collect::<Result<VecSerial, _>>()
      .context("Failed to read completed migrations")?;
    Ok(migrations)
  }

  /// Returns the migration number of the most recently inserted row, or `-1`
  /// when the migrations table is empty.
  ///
  /// # Errors
  ///
  /// Returns an error when the query fails or when the stored `migration`
  /// value cannot be read as an `i64` (for example a NULL, since the column
  /// is nullable). The original `Row::get` would panic in that case.
  fn get_last_completed_migration(&mut self) -> AnyhowResult<i64> {
    log::trace!("Checking and retrieving the last migration stored on migrations table");
    let payload = "select migration from midas.__schema_migrations order by id desc limit 1";
    let rows = self
      .client
      .query(payload, &[])
      .with_context(|| "Failed to get last completed migration".to_string())?;

    match rows.first() {
      // `-1` is the sentinel callers already rely on for "nothing applied yet".
      None => Ok(-1),
      Some(row) => {
        // `try_get` surfaces a NULL or mistyped value as an error, not a panic.
        let migration = row
          .try_get::<_, i64>(0)
          .context("Failed to read last completed migration")?;
        Ok(migration)
      }
    }
  }

  /// Records `migration_number` as a new row in `midas.__schema_migrations`.
  ///
  /// # Errors
  ///
  /// Returns the client error, wrapped with context, when the insert fails.
  fn add_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
    log::trace!("Adding migration to migrations table");
    let insertion = self.client.execute(
      "insert into midas.__schema_migrations (migration) values ($1)",
      &[&migration_number],
    );
    insertion.with_context(|| String::from("Failed to add completed migration"))?;
    Ok(())
  }

  /// Deletes every row of `midas.__schema_migrations` whose `migration`
  /// equals `migration_number`.
  ///
  /// # Errors
  ///
  /// Returns the client error, wrapped with context, when the delete fails.
  fn delete_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
    log::trace!("Removing a migration in the migrations table");
    let statement = "delete from midas.__schema_migrations where migration = $1";
    let deletion = self.client.execute(statement, &[&migration_number]);
    deletion.with_context(|| String::from("Failed to delete completed migration"))?;
    Ok(())
  }

  /// Deletes the row with the highest `id` from `midas.__schema_migrations`.
  ///
  /// # Errors
  ///
  /// Returns the client error, wrapped with context, when the delete fails.
  fn delete_last_completed_migration(&mut self) -> AnyhowResult<()> {
    let deletion = self.client.execute(
      "delete from midas.__schema_migrations where id=(select max(id) from midas.__schema_migrations);",
      &[],
    );
    deletion.with_context(|| String::from("Failed to delete last completed migration"))?;
    Ok(())
  }

  /// Sends the migration SQL in `query` to the server through the client's
  /// `simple_query` call.
  ///
  /// `migration_number` is only used to label the error if execution fails;
  /// the migrations table is not touched here.
  ///
  /// # Errors
  ///
  /// Returns the client error, wrapped with the migration number, when
  /// execution fails.
  fn migrate(&mut self, query: &str, migration_number: i64) -> AnyhowResult<()> {
    let outcome = self.client.simple_query(query);
    outcome.with_context(|| format!("Failed to execute migration - {migration_number}"))?;
    Ok(())
  }

  /// Returns the database name captured when this driver was constructed.
  fn db_name(&self) -> &str {
    self.database_name.as_str()
  }
}