use bytes::Bytes;
use resp_async::response::RespError;
use sqlx::error::DatabaseError;
use sqlx::{MySqlPool, Row};
pub const TYPE_STRING: u8 = 0;
pub const TYPE_HASH: u8 = 1;
pub const TYPE_LIST: u8 = 2;
pub const TYPE_SET: u8 = 3;
pub const TYPE_ZSET: u8 = 4;
#[derive(Debug, Clone)]
pub struct KeyMeta {
pub r_type: u8,
pub r_len: i64,
#[allow(dead_code)]
pub expires_at_ms: Option<i64>,
}
pub fn type_name(r_type: u8) -> &'static [u8] {
match r_type {
TYPE_STRING => b"string",
TYPE_HASH => b"hash",
TYPE_LIST => b"list",
TYPE_SET => b"set",
TYPE_ZSET => b"zset",
_ => b"none",
}
}
pub fn is_expired(expires_at_ms: Option<i64>, now_ms: i64) -> bool {
matches!(expires_at_ms, Some(exp) if exp <= now_ms)
}
pub async fn load_meta(
pool: &MySqlPool,
key: &Bytes,
now_ms: i64,
) -> Result<Option<KeyMeta>, RespError> {
let row = sqlx::query("SELECT r_type, r_len, expires_at_ms FROM redis_kv WHERE r_key = ?")
.bind(key.as_ref())
.fetch_optional(pool)
.await
.map_err(map_sql_err)?;
let Some(row) = row else {
return Ok(None);
};
let r_type: u8 = row.try_get("r_type").map_err(map_sql_err)?;
let r_len: i64 = row.try_get("r_len").map_err(map_sql_err)?;
let expires_at_ms: Option<i64> = row.try_get("expires_at_ms").map_err(map_sql_err)?;
if is_expired(expires_at_ms, now_ms) {
delete_key_all(pool, key).await?;
return Ok(None);
}
Ok(Some(KeyMeta {
r_type,
r_len,
expires_at_ms,
}))
}
pub fn map_sql_err(err: sqlx::Error) -> RespError {
log_sql_error(&err);
match err {
sqlx::Error::Database(db_err) => map_db_err(&*db_err),
sqlx::Error::Io(_) | sqlx::Error::Tls(_) | sqlx::Error::Protocol(_) => {
RespError::invalid_data("ERR backend unavailable")
}
sqlx::Error::PoolTimedOut | sqlx::Error::PoolClosed | sqlx::Error::WorkerCrashed => {
RespError::invalid_data("ERR backend unavailable")
}
sqlx::Error::RowNotFound => RespError::invalid_data("ERR backend error"),
sqlx::Error::Decode(_)
| sqlx::Error::TypeNotFound { .. }
| sqlx::Error::ColumnNotFound(_) => RespError::invalid_data("ERR backend schema mismatch"),
_ => RespError::invalid_data("ERR backend error"),
}
}
fn log_sql_error(err: &sqlx::Error) {
match err {
sqlx::Error::Database(db_err) => {
let code = db_err
.code()
.map(|c| c.to_string())
.unwrap_or_else(|| "<none>".to_string());
log::error!(
target: "sql",
"db error code={} message={}",
code,
db_err.message()
);
}
other => {
log::error!(target: "sql", "sqlx error: {:?}", other);
}
}
}
fn map_db_err(err: &dyn DatabaseError) -> RespError {
let Some(code) = err.code() else {
return RespError::invalid_data("ERR backend error");
};
let code = code.as_ref().to_string();
match code.as_str() {
"1044" => RespError::invalid_data("ERR backend permission denied"),
"1045" => RespError::invalid_data("ERR backend auth error"),
"1049" => RespError::invalid_data("ERR backend database not found"),
"1054" => RespError::invalid_data("ERR backend schema mismatch"),
"1064" | "1065" => RespError::invalid_data("ERR backend query error"),
"1062" => RespError::invalid_data("ERR backend constraint error"),
"1146" => RespError::invalid_data("ERR backend schema missing"),
"1142" | "1143" => RespError::invalid_data("ERR backend permission denied"),
"1205" => RespError::invalid_data("ERR backend timeout"),
"1213" => RespError::invalid_data("ERR backend transaction conflict"),
"1227" => RespError::invalid_data("ERR backend permission denied"),
"1364" => RespError::invalid_data("ERR backend schema mismatch"),
"1366" => RespError::invalid_data("ERR backend data error"),
"42000" => RespError::invalid_data("ERR backend permission denied"),
_ => RespError::invalid_data(format!("backend error (code {})", code)),
}
}
pub async fn delete_key_all(pool: &MySqlPool, key: &Bytes) -> Result<(), RespError> {
delete_keys_all(pool, std::slice::from_ref(key)).await
}
pub async fn delete_keys_all(pool: &MySqlPool, keys: &[Bytes]) -> Result<(), RespError> {
if keys.is_empty() {
return Ok(());
}
delete_from_table(pool, "redis_hash", keys).await?;
delete_from_table(pool, "redis_set", keys).await?;
delete_from_table(pool, "redis_zset", keys).await?;
delete_from_table(pool, "redis_list", keys).await?;
delete_from_table(pool, "redis_list_meta", keys).await?;
delete_from_table(pool, "redis_kv", keys).await?;
Ok(())
}
pub async fn delete_key_in_tx(
tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
key: &Bytes,
) -> Result<(), RespError> {
for sql in [
"DELETE FROM redis_hash WHERE r_key = ?",
"DELETE FROM redis_set WHERE r_key = ?",
"DELETE FROM redis_zset WHERE r_key = ?",
"DELETE FROM redis_list WHERE r_key = ?",
"DELETE FROM redis_list_meta WHERE r_key = ?",
"DELETE FROM redis_kv WHERE r_key = ?",
] {
sqlx::query(sql)
.bind(key.as_ref())
.execute(&mut **tx)
.await
.map_err(map_sql_err)?;
}
Ok(())
}
async fn delete_from_table(pool: &MySqlPool, table: &str, keys: &[Bytes]) -> Result<(), RespError> {
let mut qb = sqlx::QueryBuilder::new("DELETE FROM ");
qb.push(table);
qb.push(" WHERE r_key IN (");
let mut separated = qb.separated(", ");
for key in keys {
separated.push_bind(key.as_ref());
}
qb.push(")");
qb.build().execute(pool).await.map_err(map_sql_err)?;
Ok(())
}