dbpulse/queries/
mysql.rs

1use anyhow::{anyhow, Context, Result};
2use chrono::prelude::*;
3use chrono::{DateTime, Utc};
4use dsn::DSN;
5use rand::Rng;
6use sqlx::{
7    mysql::{MySqlConnectOptions, MySqlDatabaseError},
8    ConnectOptions, Connection, Executor,
9};
10use uuid::Uuid;
11
12pub async fn test_rw(dsn: &DSN, now: DateTime<Utc>, range: u32) -> Result<String> {
13    let mut options = MySqlConnectOptions::new()
14        .username(dsn.username.clone().unwrap_or_default().as_ref())
15        .password(dsn.password.clone().unwrap_or_default().as_str())
16        .database(dsn.database.clone().unwrap_or_default().as_ref());
17
18    if let Some(host) = &dsn.host {
19        options = options.host(host.as_str()).port(dsn.port.unwrap_or(3306));
20    } else if let Some(socket) = &dsn.socket {
21        options = options.socket(socket.as_str());
22    }
23
24    let mut conn = match options.connect().await {
25        Ok(conn) => conn,
26        Err(err) => match err {
27            sqlx::Error::Database(db_err) => {
28                if db_err
29                    .as_error()
30                    .downcast_ref::<MySqlDatabaseError>()
31                    .map(MySqlDatabaseError::number)
32                    == Some(1049)
33                {
34                    let tmp_options = options.clone().database("mysql");
35                    let mut tmp_conn = tmp_options.connect().await?;
36                    sqlx::query(&format!(
37                        "CREATE DATABASE {}",
38                        dsn.database.clone().unwrap_or_default()
39                    ))
40                    .execute(&mut tmp_conn)
41                    .await?;
42                    drop(tmp_conn);
43                    options.connect().await?
44                } else {
45                    return Err(db_err.into());
46                }
47            }
48            _ => return Err(err.into()),
49        },
50    };
51
52    // Get database version
53    let version: Option<String> = sqlx::query_scalar("SELECT VERSION()")
54        .fetch_optional(&mut conn)
55        .await
56        .context("Failed to fetch database version")?;
57
58    // check if db is in read-only mode
59    let is_read_only: (i32,) = sqlx::query_as("SELECT @@read_only;")
60        .fetch_one(&mut conn)
61        .await
62        .context("Failed to check if the database is in read-only mode")?;
63
64    if is_read_only.0 != 0 {
65        return Ok(format!(
66            "{} - Database is in read-only mode",
67            version.unwrap_or_default()
68        ));
69    }
70
71    // create table
72    let create_table_sql = r#"
73        CREATE TABLE IF NOT EXISTS dbpulse_rw (
74            id INT NOT NULL,
75            t1 INT(11) NOT NULL ,
76            t2 TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
77            uuid CHAR(36) CHARACTER SET ascii,
78            UNIQUE KEY(uuid),
79            PRIMARY KEY(id)
80        ) ENGINE=InnoDB
81    "#;
82
83    conn.execute(create_table_sql).await?;
84
85    // write into table
86    let id: u32 = rand::thread_rng().gen_range(0..range);
87    let uuid = Uuid::new_v4();
88
89    // SQL Query
90    sqlx::query(
91        r#"
92        INSERT INTO dbpulse_rw (id, t1, uuid)
93        VALUES (?, ?, ?)
94        ON DUPLICATE KEY UPDATE
95        t1 = VALUES(t1), uuid = VALUES(uuid)
96        "#,
97    )
98    .bind(id)
99    .bind(now.timestamp())
100    .bind(uuid.to_string())
101    .execute(&mut conn)
102    .await?;
103
104    // Check if stored record matches
105    let row: Option<(i64, String)> = sqlx::query_as(
106        r#"
107        SELECT t1, uuid
108        FROM dbpulse_rw
109        WHERE id = ?
110        "#,
111    )
112    .bind(id)
113    .fetch_optional(&mut conn)
114    .await
115    .context("Failed to query the database")?;
116
117    // Ensure the row exists and matches
118    let (t1, v4) = row.context("Expected records")?;
119    if now.timestamp() != t1 || uuid.to_string() != v4 {
120        return Err(anyhow!(
121            "Records don't match: expected ({}, {}), got ({}, {})",
122            now.timestamp(),
123            uuid,
124            t1,
125            v4
126        ));
127    }
128
129    // Start a transaction to set all `t1` records to 0
130    let mut tx = conn.begin().await?;
131    sqlx::query("UPDATE dbpulse_rw SET t1 = ?")
132        .bind(0)
133        .execute(tx.as_mut())
134        .await?;
135    let rows: Vec<i64> = sqlx::query_scalar("SELECT t1 FROM dbpulse_rw")
136        .fetch_all(tx.as_mut())
137        .await
138        .context("Failed to fetch rows")?;
139
140    for row in rows {
141        if row != 0 {
142            return Err(anyhow!("Records don't match: {} != {}", row, 0));
143        }
144    }
145
146    // Roll back this transaction
147    tx.rollback().await?;
148
149    // Start a new transaction to update record 0 with current timestamp
150    let mut tx = conn.begin().await?;
151    sqlx::query(
152        r#"
153        INSERT INTO dbpulse_rw (id, t1, uuid)
154        VALUES (0, ?, UUID())
155        ON DUPLICATE KEY UPDATE t1 = ?
156        "#,
157    )
158    .bind(now.timestamp())
159    .bind(now.timestamp())
160    .execute(tx.as_mut())
161    .await
162    .context("Failed to insert or update record")?;
163    tx.commit().await?;
164
165    // Drop the table conditionally
166    if now.minute() == id {
167        sqlx::query("DROP TABLE dbpulse_rw")
168            .execute(&mut conn)
169            .await
170            .context("Failed to drop table")?;
171    }
172
173    drop(conn);
174
175    version.context("Expected database version")
176}