use log::LevelFilter;
use sea_query::Values;
use std::{future::Future, pin::Pin, sync::Arc};
use sqlx::{
mysql::{MySqlConnectOptions, MySqlQueryResult, MySqlRow},
pool::PoolConnection,
Connection, Executor, MySql, MySqlPool,
};
use sea_query_binder::SqlxValues;
use tracing::instrument;
use crate::{
debug_print, error::*, executor::*, AccessMode, ConnectOptions, DatabaseConnection,
DatabaseTransaction, DbBackend, IsolationLevel, QueryStream, Statement, TransactionError,
};
use super::sqlx_common::*;
#[derive(Debug)]
pub struct SqlxMySqlConnector;
#[derive(Clone)]
pub struct SqlxMySqlPoolConnection {
pub(crate) pool: MySqlPool,
metric_callback: Option<crate::metric::Callback>,
}
impl std::fmt::Debug for SqlxMySqlPoolConnection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "SqlxMySqlPoolConnection {{ pool: {:?} }}", self.pool)
}
}
impl SqlxMySqlConnector {
pub fn accepts(string: &str) -> bool {
string.starts_with("mysql://") && string.parse::<MySqlConnectOptions>().is_ok()
}
#[instrument(level = "trace")]
pub async fn connect(options: ConnectOptions) -> Result<DatabaseConnection, DbErr> {
let mut opt = options
.url
.parse::<MySqlConnectOptions>()
.map_err(sqlx_error_to_conn_err)?;
use sqlx::ConnectOptions;
if !options.sqlx_logging {
opt = opt.disable_statement_logging();
} else {
opt = opt.log_statements(options.sqlx_logging_level);
if options.sqlx_slow_statements_logging_level != LevelFilter::Off {
opt = opt.log_slow_statements(
options.sqlx_slow_statements_logging_level,
options.sqlx_slow_statements_logging_threshold,
);
}
}
match options.pool_options().connect_with(opt).await {
Ok(pool) => Ok(DatabaseConnection::SqlxMySqlPoolConnection(
SqlxMySqlPoolConnection {
pool,
metric_callback: None,
},
)),
Err(e) => Err(sqlx_error_to_conn_err(e)),
}
}
}
impl SqlxMySqlConnector {
pub fn from_sqlx_mysql_pool(pool: MySqlPool) -> DatabaseConnection {
DatabaseConnection::SqlxMySqlPoolConnection(SqlxMySqlPoolConnection {
pool,
metric_callback: None,
})
}
}
impl SqlxMySqlPoolConnection {
#[instrument(level = "trace")]
pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> {
debug_print!("{}", stmt);
let query = sqlx_query(&stmt);
let mut conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.execute(&mut *conn).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
})
}
#[instrument(level = "trace")]
pub async fn execute_unprepared(&self, sql: &str) -> Result<ExecResult, DbErr> {
debug_print!("{}", sql);
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
match conn.execute(sql).await {
Ok(res) => Ok(res.into()),
Err(err) => Err(sqlx_error_to_exec_err(err)),
}
}
#[instrument(level = "trace")]
pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> {
debug_print!("{}", stmt);
let query = sqlx_query(&stmt);
let mut conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_one(&mut *conn).await {
Ok(row) => Ok(Some(row.into())),
Err(err) => match err {
sqlx::Error::RowNotFound => Ok(None),
_ => Err(sqlx_error_to_query_err(err)),
},
}
})
}
#[instrument(level = "trace")]
pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> {
debug_print!("{}", stmt);
let query = sqlx_query(&stmt);
let mut conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
crate::metric::metric!(self.metric_callback, &stmt, {
match query.fetch_all(&mut *conn).await {
Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()),
Err(err) => Err(sqlx_error_to_query_err(err)),
}
})
}
#[instrument(level = "trace")]
pub async fn stream(&self, stmt: Statement) -> Result<QueryStream, DbErr> {
debug_print!("{}", stmt);
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
Ok(QueryStream::from((
conn,
stmt,
self.metric_callback.clone(),
)))
}
#[instrument(level = "trace")]
pub async fn begin(
&self,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<DatabaseTransaction, DbErr> {
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
}
#[instrument(level = "trace", skip(callback))]
pub async fn transaction<F, T, E>(
&self,
callback: F,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<T, TransactionError<E>>
where
F: for<'b> FnOnce(
&'b DatabaseTransaction,
) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'b>>
+ Send,
T: Send,
E: std::error::Error + Send,
{
let conn = self.pool.acquire().await.map_err(conn_acquire_err)?;
let transaction = DatabaseTransaction::new_mysql(
conn,
self.metric_callback.clone(),
isolation_level,
access_mode,
)
.await
.map_err(|e| TransactionError::Connection(e))?;
transaction.run(callback).await
}
pub(crate) fn set_metric_callback<F>(&mut self, callback: F)
where
F: Fn(&crate::metric::Info<'_>) + Send + Sync + 'static,
{
self.metric_callback = Some(Arc::new(callback));
}
pub async fn ping(&self) -> Result<(), DbErr> {
let conn = &mut self.pool.acquire().await.map_err(conn_acquire_err)?;
match conn.ping().await {
Ok(_) => Ok(()),
Err(err) => Err(sqlx_error_to_conn_err(err)),
}
}
pub async fn close(self) -> Result<(), DbErr> {
self.pool.close().await;
Ok(())
}
}
impl From<MySqlRow> for QueryResult {
fn from(row: MySqlRow) -> QueryResult {
QueryResult {
row: QueryResultRow::SqlxMySql(row),
}
}
}
impl From<MySqlQueryResult> for ExecResult {
fn from(result: MySqlQueryResult) -> ExecResult {
ExecResult {
result: ExecResultHolder::SqlxMySql(result),
}
}
}
pub(crate) fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, MySql, SqlxValues> {
let values = stmt
.values
.as_ref()
.map_or(Values(Vec::new()), |values| values.clone());
sqlx::query_with(&stmt.sql, SqlxValues(values))
}
pub(crate) async fn set_transaction_config(
conn: &mut PoolConnection<MySql>,
isolation_level: Option<IsolationLevel>,
access_mode: Option<AccessMode>,
) -> Result<(), DbErr> {
if let Some(isolation_level) = isolation_level {
let stmt = Statement {
sql: format!("SET TRANSACTION ISOLATION LEVEL {isolation_level}"),
values: None,
db_backend: DbBackend::MySql,
};
let query = sqlx_query(&stmt);
conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
}
if let Some(access_mode) = access_mode {
let stmt = Statement {
sql: format!("SET TRANSACTION {access_mode}"),
values: None,
db_backend: DbBackend::MySql,
};
let query = sqlx_query(&stmt);
conn.execute(query).await.map_err(sqlx_error_to_exec_err)?;
}
Ok(())
}