midas_core/sequel/
mysql.rs1use anyhow::Context as _;
2use indoc::{
3 formatdoc,
4 indoc,
5};
6use mysql::prelude::Queryable;
7use mysql::{
8 params,
9 Pool,
10 PooledConn,
11};
12
13use super::{
14 AnyhowResult,
15 Driver as SequelDriver,
16 VecSerial,
17};
18
19pub struct Mysql {
21 conn: PooledConn,
23
24 database_name: String,
26}
27
28impl Mysql {
30 pub fn new(database_url: &str) -> AnyhowResult<Self> {
31 let pool = Pool::new(database_url)?;
33 let conn = pool.get_conn()?;
34
35 let url = url::Url::parse(database_url)?;
37 let database_name = url
38 .path_segments()
39 .and_then(|s| s.last())
40 .context("Database name not found")?;
41
42 let mut db = Mysql {
44 conn,
45 database_name: database_name.into(),
46 };
47
48 db.ensure_midas_schema()?;
50 Ok(db)
51 }
52}
53
54impl SequelDriver for Mysql {
56 fn ensure_midas_schema(&mut self) -> AnyhowResult<()> {
58 let payload = indoc! {"
59 CREATE TABLE IF NOT EXISTS __schema_migrations (
60 id INT NOT NULL AUTO_INCREMENT,
61 migration BIGINT,
62 PRIMARY KEY (id)
63 ) AUTO_INCREMENT = 100;
64 "};
65 self.conn.query_drop(payload)?;
66 Ok(())
67 }
68
69 fn drop_migration_table(&mut self) -> AnyhowResult<()> {
71 let payload = "DROP TABLE __schema_migrations";
72 self.conn.query_drop(payload)?;
73 Ok(())
74 }
75
76 fn drop_database(&mut self, db_name: &str) -> AnyhowResult<()> {
78 let payload = formatdoc! {"
79 DROP DATABASE IF EXISTS `{db_name}`;
80 CREATE DATABASE `{db_name}`;
81 ", db_name = db_name};
82 self.conn.exec_drop(payload, ())?;
83 Ok(())
84 }
85
86 fn count_migrations(&mut self) -> AnyhowResult<i64> {
88 log::trace!("Retrieving migrations count");
89 let payload = "SELECT COUNT(*) as count FROM __schema_migrations";
90 let row: Option<i64> = self.conn.query_first(payload)?;
91 let result = row.unwrap();
92 Ok(result)
93 }
94
95 fn get_completed_migrations(&mut self) -> AnyhowResult<VecSerial> {
97 log::trace!("Retrieving all completed migrations");
98 let payload = "SELECT migration FROM __schema_migrations ORDER BY id ASC";
99 let result: VecSerial = self.conn.query(payload)?;
100 Ok(result)
101 }
102
103 fn get_last_completed_migration(&mut self) -> AnyhowResult<i64> {
105 log::trace!("Checking and retrieving the last migration stored on migrations table");
106 let payload = "SELECT migration FROM __schema_migrations ORDER BY id DESC LIMIT 1";
107 let row: Option<i64> = self.conn.query_first(payload)?;
108 let result = row.unwrap();
109 Ok(result)
110 }
111
112 fn add_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
114 log::trace!("Adding migration to migrations table");
115 let payload = "INSERT INTO __schema_migrations (migration) VALUES (:migration_number)";
116 self
117 .conn
118 .exec_drop(payload, params! { "migration_number" => migration_number })?;
119 Ok(())
120 }
121
122 fn delete_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
124 log::trace!("Removing a migration in the migrations table");
125 let payload = "DELETE FROM __schema_migrations WHERE migration = :migration_number";
126 self
127 .conn
128 .exec_drop(payload, params! { "migration_number" => migration_number })?;
129 Ok(())
130 }
131
132 fn delete_last_completed_migration(&mut self) -> AnyhowResult<()> {
134 let payload = "DELETE FROM __schema_migrations WHERE id=(SELECT MAX(id) FROM __schema_migrations);";
135 self.conn.query_drop(payload)?;
136 Ok(())
137 }
138
139 fn migrate(&mut self, query: &str, _migration_number: i64) -> AnyhowResult<()> {
141 self.conn.query_drop(query)?;
142 Ok(())
143 }
144
145 fn db_name(&self) -> &str {
147 &self.database_name
148 }
149}