midas_core/sequel/
mysql.rs

1use anyhow::Context as _;
2use indoc::{
3  formatdoc,
4  indoc,
5};
6use mysql::prelude::Queryable;
7use mysql::{
8  params,
9  Pool,
10  PooledConn,
11};
12
13use super::{
14  AnyhowResult,
15  Driver as SequelDriver,
16  VecSerial,
17};
18
19/// The MySQL struct definition
20pub struct Mysql {
21  /// The MySQL connection
22  conn: PooledConn,
23
24  /// The database name
25  database_name: String,
26}
27
28/// Implement the MySQL struct
29impl Mysql {
30  pub fn new(database_url: &str) -> AnyhowResult<Self> {
31    // Open the connection
32    let pool = Pool::new(database_url)?;
33    let conn = pool.get_conn()?;
34
35    // Get the database name from the URL
36    let url = url::Url::parse(database_url)?;
37    let database_name = url
38      .path_segments()
39      .and_then(|s| s.last())
40      .context("Database name not found")?;
41
42    // Create a new instance of MySQL
43    let mut db = Mysql {
44      conn,
45      database_name: database_name.into(),
46    };
47
48    // Ensure the midas schema
49    db.ensure_midas_schema()?;
50    Ok(db)
51  }
52}
53
54/// Implement the SequelDriver trait for MySQL
55impl SequelDriver for Mysql {
56  /// Implement the ensure_midas_schema method
57  fn ensure_midas_schema(&mut self) -> AnyhowResult<()> {
58    let payload = indoc! {"
59      CREATE TABLE IF NOT EXISTS __schema_migrations (
60        id INT NOT NULL AUTO_INCREMENT,
61        migration BIGINT,
62        PRIMARY KEY (id)
63      ) AUTO_INCREMENT = 100;
64    "};
65    self.conn.query_drop(payload)?;
66    Ok(())
67  }
68
69  /// Drop the migration table
70  fn drop_migration_table(&mut self) -> AnyhowResult<()> {
71    let payload = "DROP TABLE __schema_migrations";
72    self.conn.query_drop(payload)?;
73    Ok(())
74  }
75
76  /// Drop the database
77  fn drop_database(&mut self, db_name: &str) -> AnyhowResult<()> {
78    let payload = formatdoc! {"
79      DROP DATABASE IF EXISTS `{db_name}`;
80      CREATE DATABASE `{db_name}`;
81    ", db_name = db_name};
82    self.conn.exec_drop(payload, ())?;
83    Ok(())
84  }
85
86  /// Count the number of migrations
87  fn count_migrations(&mut self) -> AnyhowResult<i64> {
88    log::trace!("Retrieving migrations count");
89    let payload = "SELECT COUNT(*) as count FROM __schema_migrations";
90    let row: Option<i64> = self.conn.query_first(payload)?;
91    let result = row.unwrap();
92    Ok(result)
93  }
94
95  /// Get all completed migrations
96  fn get_completed_migrations(&mut self) -> AnyhowResult<VecSerial> {
97    log::trace!("Retrieving all completed migrations");
98    let payload = "SELECT migration FROM __schema_migrations ORDER BY id ASC";
99    let result: VecSerial = self.conn.query(payload)?;
100    Ok(result)
101  }
102
103  /// Get the last completed migration
104  fn get_last_completed_migration(&mut self) -> AnyhowResult<i64> {
105    log::trace!("Checking and retrieving the last migration stored on migrations table");
106    let payload = "SELECT migration FROM __schema_migrations ORDER BY id DESC LIMIT 1";
107    let row: Option<i64> = self.conn.query_first(payload)?;
108    let result = row.unwrap();
109    Ok(result)
110  }
111
112  /// Add a completed migration
113  fn add_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
114    log::trace!("Adding migration to migrations table");
115    let payload = "INSERT INTO __schema_migrations (migration) VALUES (:migration_number)";
116    self
117      .conn
118      .exec_drop(payload, params! { "migration_number" => migration_number })?;
119    Ok(())
120  }
121
122  /// Delete a completed migration
123  fn delete_completed_migration(&mut self, migration_number: i64) -> AnyhowResult<()> {
124    log::trace!("Removing a migration in the migrations table");
125    let payload = "DELETE FROM __schema_migrations WHERE migration = :migration_number";
126    self
127      .conn
128      .exec_drop(payload, params! { "migration_number" => migration_number })?;
129    Ok(())
130  }
131
132  /// Delete the last completed migration
133  fn delete_last_completed_migration(&mut self) -> AnyhowResult<()> {
134    let payload = "DELETE FROM __schema_migrations WHERE id=(SELECT MAX(id) FROM __schema_migrations);";
135    self.conn.query_drop(payload)?;
136    Ok(())
137  }
138
139  /// Run a migration
140  fn migrate(&mut self, query: &str, _migration_number: i64) -> AnyhowResult<()> {
141    self.conn.query_drop(query)?;
142    Ok(())
143  }
144
145  /// Get the database name
146  fn db_name(&self) -> &str {
147    &self.database_name
148  }
149}