midas_core/sequel/
postgres.rs1use anyhow::Context;
2use indoc::indoc;
3use postgres::tls::{
4 MakeTlsConnect,
5 TlsConnect,
6};
7use postgres::{
8 Client,
9 NoTls,
10 Socket,
11};
12use url::Url;
13
14use super::{
15 AnyhowResult,
16 Driver as SequelDriver,
17 VecSerial,
18};
19
20pub struct Postgres {
22 client: Client,
24 database_name: String,
26}
27
28impl Postgres {
30 pub fn new(database_url: &str) -> AnyhowResult<Self> {
32 Self::new_tls(database_url, NoTls)
33 }
34
35 pub fn new_tls<T>(database_url: &str, tls_mode: T) -> AnyhowResult<Self>
37 where
38 T: MakeTlsConnect<Socket> + 'static + Send,
39 T::TlsConnect: Send,
40 T::Stream: Send,
41 <T::TlsConnect as TlsConnect<Socket>>::Future: Send,
42 {
43 let url = Url::parse(database_url)?;
45 let database_name = url
46 .path_segments()
47 .and_then(|s| s.last())
48 .context("Database name not found")?;
49
50 let client = Client::connect(url.as_str(), tls_mode)?;
52
53 let mut db = Postgres {
55 client,
56 database_name: database_name.into(),
57 };
58
59 db.ensure_midas_schema()?;
61 Ok(db)
62 }
63}
64
65impl SequelDriver for Postgres {
66 fn ensure_midas_schema(&mut self) -> AnyhowResult<()> {
67 self.client.execute("create schema if not exists midas", &[])?;
68 self.client.execute("grant all on schema midas to public", &[])?;
69 let payload = indoc! {"
70 create table if not exists midas.__schema_migrations (
71 id bigint generated by default as identity primary key,
72 migration bigint
73 )
74 "};
75 self.client.execute(payload, &[])?;
76 Ok(())
77 }
78
79 fn drop_migration_table(&mut self) -> AnyhowResult<()> {
80 let payload = "drop table midas.__schema_migrations";
81 self.client.execute(payload, &[])?;
82 Ok(())
83 }
84
85 fn drop_database(&mut self, db_name: &str) -> AnyhowResult<()> {
86 let payload = format! {"drop database if exists {db_name}"};
87 self.client.execute(&payload, &[])?;
88
89 let payload = format! {"create database {db_name}"};
90 self.client.execute(&payload, &[])?;
91 Ok(())
92 }
93
94 fn count_migrations(&mut self) -> AnyhowResult<i64> {
95 log::trace!("Retrieving migrations count");
96 let payload = "select count(*) as count from midas.__schema_migrations";
97 let row = self.client.query_one(payload, &[])?;
98 let result = row.get::<_, i64>(0);
99 Ok(result)
100 }
101
102 fn get_completed_migrations(&mut self) -> AnyhowResult<VecSerial> {
103 log::trace!("Retrieving all completed migrations");
104 let payload = "select migration from midas.__schema_migrations order by id asc";
105 let it = self.client.query(payload, &[])?;
106 let result = it.iter().map(|r| r.get("migration")).collect::<_>();
107 Ok(result)
108 }
109
110 fn get_last_completed_migration(&mut self) -> AnyhowResult<i64> {
111 log::trace!("Checking and retrieving the last migration stored on migrations table");
112 let payload = "select migration from midas.__schema_migrations order by id desc limit 1";
113 let result = self
114 .client
115 .query(payload, &[])
116 .with_context(|| "Failed to get last completed migration".to_string())?;
117
118 if result.is_empty() {
119 Ok(-1)
120 } else {
121 Ok(result[0].get(0))
122 }
123 }
124
125 fn add_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
126 log::trace!("Adding migration to migrations table");
127 let payload = "insert into midas.__schema_migrations (migration) values ($1)";
128 self
129 .client
130 .execute(payload, &[&migration_number])
131 .with_context(|| "Failed to add completed migration".to_string())?;
132 Ok(())
133 }
134
135 fn delete_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
136 log::trace!("Removing a migration in the migrations table");
137 let payload = "delete from midas.__schema_migrations where migration = $1";
138 self
139 .client
140 .execute(payload, &[&migration_number])
141 .with_context(|| "Failed to delete completed migration".to_string())?;
142 Ok(())
143 }
144
145 fn delete_last_completed_migration(&mut self) -> AnyhowResult<()> {
146 let payload =
147 "delete from midas.__schema_migrations where id=(select max(id) from midas.__schema_migrations);";
148 self
149 .client
150 .execute(payload, &[])
151 .with_context(|| "Failed to delete last completed migration".to_string())?;
152 Ok(())
153 }
154
155 fn migrate(&mut self, query: &str, migration_number: i64) -> AnyhowResult<()> {
156 self
157 .client
158 .simple_query(query)
159 .with_context(|| format!("Failed to execute migration - {migration_number}"))?;
160 Ok(())
161 }
162
163 fn db_name(&self) -> &str {
164 &self.database_name
165 }
166}