1use anyhow::{anyhow, Context, Result};
2use chrono::prelude::*;
3use chrono::{DateTime, Utc};
4use dsn::DSN;
5use rand::Rng;
6use sqlx::{
7 mysql::{MySqlConnectOptions, MySqlDatabaseError},
8 ConnectOptions, Connection, Executor,
9};
10use uuid::Uuid;
11
12pub async fn test_rw(dsn: &DSN, now: DateTime<Utc>, range: u32) -> Result<String> {
13 let mut options = MySqlConnectOptions::new()
14 .username(dsn.username.clone().unwrap_or_default().as_ref())
15 .password(dsn.password.clone().unwrap_or_default().as_str())
16 .database(dsn.database.clone().unwrap_or_default().as_ref());
17
18 if let Some(host) = &dsn.host {
19 options = options.host(host.as_str()).port(dsn.port.unwrap_or(3306));
20 } else if let Some(socket) = &dsn.socket {
21 options = options.socket(socket.as_str());
22 }
23
24 let mut conn = match options.connect().await {
25 Ok(conn) => conn,
26 Err(err) => match err {
27 sqlx::Error::Database(db_err) => {
28 if db_err
29 .as_error()
30 .downcast_ref::<MySqlDatabaseError>()
31 .map(MySqlDatabaseError::number)
32 == Some(1049)
33 {
34 let tmp_options = options.clone().database("mysql");
35 let mut tmp_conn = tmp_options.connect().await?;
36 sqlx::query(&format!(
37 "CREATE DATABASE {}",
38 dsn.database.clone().unwrap_or_default()
39 ))
40 .execute(&mut tmp_conn)
41 .await?;
42 drop(tmp_conn);
43 options.connect().await?
44 } else {
45 return Err(db_err.into());
46 }
47 }
48 _ => return Err(err.into()),
49 },
50 };
51
52 let version: Option<String> = sqlx::query_scalar("SELECT VERSION()")
54 .fetch_optional(&mut conn)
55 .await
56 .context("Failed to fetch database version")?;
57
58 let is_read_only: (i32,) = sqlx::query_as("SELECT @@read_only;")
60 .fetch_one(&mut conn)
61 .await
62 .context("Failed to check if the database is in read-only mode")?;
63
64 if is_read_only.0 != 0 {
65 return Ok(format!(
66 "{} - Database is in read-only mode",
67 version.unwrap_or_default()
68 ));
69 }
70
71 let create_table_sql = r#"
73 CREATE TABLE IF NOT EXISTS dbpulse_rw (
74 id INT NOT NULL,
75 t1 INT(11) NOT NULL ,
76 t2 TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
77 uuid CHAR(36) CHARACTER SET ascii,
78 UNIQUE KEY(uuid),
79 PRIMARY KEY(id)
80 ) ENGINE=InnoDB
81 "#;
82
83 conn.execute(create_table_sql).await?;
84
85 let id: u32 = rand::thread_rng().gen_range(0..range);
87 let uuid = Uuid::new_v4();
88
89 sqlx::query(
91 r#"
92 INSERT INTO dbpulse_rw (id, t1, uuid)
93 VALUES (?, ?, ?)
94 ON DUPLICATE KEY UPDATE
95 t1 = VALUES(t1), uuid = VALUES(uuid)
96 "#,
97 )
98 .bind(id)
99 .bind(now.timestamp())
100 .bind(uuid.to_string())
101 .execute(&mut conn)
102 .await?;
103
104 let row: Option<(i64, String)> = sqlx::query_as(
106 r#"
107 SELECT t1, uuid
108 FROM dbpulse_rw
109 WHERE id = ?
110 "#,
111 )
112 .bind(id)
113 .fetch_optional(&mut conn)
114 .await
115 .context("Failed to query the database")?;
116
117 let (t1, v4) = row.context("Expected records")?;
119 if now.timestamp() != t1 || uuid.to_string() != v4 {
120 return Err(anyhow!(
121 "Records don't match: expected ({}, {}), got ({}, {})",
122 now.timestamp(),
123 uuid,
124 t1,
125 v4
126 ));
127 }
128
129 let mut tx = conn.begin().await?;
131 sqlx::query("UPDATE dbpulse_rw SET t1 = ?")
132 .bind(0)
133 .execute(tx.as_mut())
134 .await?;
135 let rows: Vec<i64> = sqlx::query_scalar("SELECT t1 FROM dbpulse_rw")
136 .fetch_all(tx.as_mut())
137 .await
138 .context("Failed to fetch rows")?;
139
140 for row in rows {
141 if row != 0 {
142 return Err(anyhow!("Records don't match: {} != {}", row, 0));
143 }
144 }
145
146 tx.rollback().await?;
148
149 let mut tx = conn.begin().await?;
151 sqlx::query(
152 r#"
153 INSERT INTO dbpulse_rw (id, t1, uuid)
154 VALUES (0, ?, UUID())
155 ON DUPLICATE KEY UPDATE t1 = ?
156 "#,
157 )
158 .bind(now.timestamp())
159 .bind(now.timestamp())
160 .execute(tx.as_mut())
161 .await
162 .context("Failed to insert or update record")?;
163 tx.commit().await?;
164
165 if now.minute() == id {
167 sqlx::query("DROP TABLE dbpulse_rw")
168 .execute(&mut conn)
169 .await
170 .context("Failed to drop table")?;
171 }
172
173 drop(conn);
174
175 version.context("Expected database version")
176}