redis-on-mysql 0.0.1

A Redis-compatible proxy that stores all data and Pub/Sub state in MySQL
Documentation
use bytes::Bytes;
use resp_async::response::RespError;
use sqlx::error::DatabaseError;
use sqlx::{MySqlPool, Row};

pub const TYPE_STRING: u8 = 0;
pub const TYPE_HASH: u8 = 1;
pub const TYPE_LIST: u8 = 2;
pub const TYPE_SET: u8 = 3;
pub const TYPE_ZSET: u8 = 4;

#[derive(Debug, Clone)]
pub struct KeyMeta {
    pub r_type: u8,
    pub r_len: i64,
    #[allow(dead_code)]
    pub expires_at_ms: Option<i64>,
}

pub fn type_name(r_type: u8) -> &'static [u8] {
    match r_type {
        TYPE_STRING => b"string",
        TYPE_HASH => b"hash",
        TYPE_LIST => b"list",
        TYPE_SET => b"set",
        TYPE_ZSET => b"zset",
        _ => b"none",
    }
}

pub fn is_expired(expires_at_ms: Option<i64>, now_ms: i64) -> bool {
    matches!(expires_at_ms, Some(exp) if exp <= now_ms)
}

pub async fn load_meta(
    pool: &MySqlPool,
    key: &Bytes,
    now_ms: i64,
) -> Result<Option<KeyMeta>, RespError> {
    let row = sqlx::query("SELECT r_type, r_len, expires_at_ms FROM redis_kv WHERE r_key = ?")
        .bind(key.as_ref())
        .fetch_optional(pool)
        .await
        .map_err(map_sql_err)?;
    let Some(row) = row else {
        return Ok(None);
    };
    let r_type: u8 = row.try_get("r_type").map_err(map_sql_err)?;
    let r_len: i64 = row.try_get("r_len").map_err(map_sql_err)?;
    let expires_at_ms: Option<i64> = row.try_get("expires_at_ms").map_err(map_sql_err)?;
    if is_expired(expires_at_ms, now_ms) {
        delete_key_all(pool, key).await?;
        return Ok(None);
    }
    Ok(Some(KeyMeta {
        r_type,
        r_len,
        expires_at_ms,
    }))
}

pub fn map_sql_err(err: sqlx::Error) -> RespError {
    log_sql_error(&err);
    match err {
        sqlx::Error::Database(db_err) => map_db_err(&*db_err),
        sqlx::Error::Io(_) | sqlx::Error::Tls(_) | sqlx::Error::Protocol(_) => {
            RespError::invalid_data("ERR backend unavailable")
        }
        sqlx::Error::PoolTimedOut | sqlx::Error::PoolClosed | sqlx::Error::WorkerCrashed => {
            RespError::invalid_data("ERR backend unavailable")
        }
        sqlx::Error::RowNotFound => RespError::invalid_data("ERR backend error"),
        sqlx::Error::Decode(_)
        | sqlx::Error::TypeNotFound { .. }
        | sqlx::Error::ColumnNotFound(_) => RespError::invalid_data("ERR backend schema mismatch"),
        _ => RespError::invalid_data("ERR backend error"),
    }
}

fn log_sql_error(err: &sqlx::Error) {
    match err {
        sqlx::Error::Database(db_err) => {
            let code = db_err
                .code()
                .map(|c| c.to_string())
                .unwrap_or_else(|| "<none>".to_string());
            log::error!(
                target: "sql",
                "db error code={} message={}",
                code,
                db_err.message()
            );
        }
        other => {
            log::error!(target: "sql", "sqlx error: {:?}", other);
        }
    }
}

fn map_db_err(err: &dyn DatabaseError) -> RespError {
    let Some(code) = err.code() else {
        return RespError::invalid_data("ERR backend error");
    };
    let code = code.as_ref().to_string();
    match code.as_str() {
        "1044" => RespError::invalid_data("ERR backend permission denied"),
        "1045" => RespError::invalid_data("ERR backend auth error"),
        "1049" => RespError::invalid_data("ERR backend database not found"),
        "1054" => RespError::invalid_data("ERR backend schema mismatch"),
        "1064" | "1065" => RespError::invalid_data("ERR backend query error"),
        "1062" => RespError::invalid_data("ERR backend constraint error"),
        "1146" => RespError::invalid_data("ERR backend schema missing"),
        "1142" | "1143" => RespError::invalid_data("ERR backend permission denied"),
        "1205" => RespError::invalid_data("ERR backend timeout"),
        "1213" => RespError::invalid_data("ERR backend transaction conflict"),
        "1227" => RespError::invalid_data("ERR backend permission denied"),
        "1364" => RespError::invalid_data("ERR backend schema mismatch"),
        "1366" => RespError::invalid_data("ERR backend data error"),
        "42000" => RespError::invalid_data("ERR backend permission denied"),
        _ => RespError::invalid_data(format!("backend error (code {})", code)),
    }
}

pub async fn delete_key_all(pool: &MySqlPool, key: &Bytes) -> Result<(), RespError> {
    delete_keys_all(pool, std::slice::from_ref(key)).await
}

pub async fn delete_keys_all(pool: &MySqlPool, keys: &[Bytes]) -> Result<(), RespError> {
    if keys.is_empty() {
        return Ok(());
    }
    delete_from_table(pool, "redis_hash", keys).await?;
    delete_from_table(pool, "redis_set", keys).await?;
    delete_from_table(pool, "redis_zset", keys).await?;
    delete_from_table(pool, "redis_list", keys).await?;
    delete_from_table(pool, "redis_list_meta", keys).await?;
    delete_from_table(pool, "redis_kv", keys).await?;
    Ok(())
}

pub async fn delete_key_in_tx(
    tx: &mut sqlx::Transaction<'_, sqlx::MySql>,
    key: &Bytes,
) -> Result<(), RespError> {
    for sql in [
        "DELETE FROM redis_hash WHERE r_key = ?",
        "DELETE FROM redis_set WHERE r_key = ?",
        "DELETE FROM redis_zset WHERE r_key = ?",
        "DELETE FROM redis_list WHERE r_key = ?",
        "DELETE FROM redis_list_meta WHERE r_key = ?",
        "DELETE FROM redis_kv WHERE r_key = ?",
    ] {
        sqlx::query(sql)
            .bind(key.as_ref())
            .execute(&mut **tx)
            .await
            .map_err(map_sql_err)?;
    }
    Ok(())
}

async fn delete_from_table(pool: &MySqlPool, table: &str, keys: &[Bytes]) -> Result<(), RespError> {
    let mut qb = sqlx::QueryBuilder::new("DELETE FROM ");
    qb.push(table);
    qb.push(" WHERE r_key IN (");
    let mut separated = qb.separated(", ");
    for key in keys {
        separated.push_bind(key.as_ref());
    }
    qb.push(")");
    qb.build().execute(pool).await.map_err(map_sql_err)?;
    Ok(())
}