redis-on-mysql 0.0.1

A Redis-compatible proxy that stores all data and Pub/Sub state in MySQL
Documentation
use std::sync::Arc;

use resp_async::response::RespError;
use sqlx::Row;
use tokio::time;

use bytes::Bytes;

use crate::state::{AppState, now_ms};
use crate::storage::{delete_keys_all, map_sql_err};

/// Spawn background cleanup loops for expiry, pub/sub, and pool eviction.
pub fn spawn_maintenance_tasks(state: AppState) {
    let expire_state = state.clone();
    tokio::spawn(async move {
        let mut interval = time::interval(expire_state.config.cleanup.expire_interval);
        loop {
            interval.tick().await;
            let now = now_ms() as i64;
            let pools = expire_state.pools.snapshot_pools();
            for pool in pools {
                let _ =
                    cleanup_expired_keys(pool, now, expire_state.config.cleanup.expire_batch).await;
            }
        }
    });

    let pubsub_state = state.clone();
    tokio::spawn(async move {
        let mut interval = time::interval(pubsub_state.config.cleanup.pubsub_interval);
        loop {
            interval.tick().await;
            let pools = pubsub_state.pools.snapshot_pools();
            for pool in pools {
                let _ = cleanup_pubsub(
                    pool,
                    pubsub_state.config.cleanup.pubsub_idle_ttl.as_secs(),
                    pubsub_state.config.cleanup.pubsub_message_ttl.as_secs(),
                    pubsub_state.config.cleanup.expire_batch,
                )
                .await;
            }
        }
    });

    let pool_state = state.clone();
    tokio::spawn(async move {
        let mut interval = time::interval(pool_state.config.cleanup.expire_interval);
        loop {
            interval.tick().await;
            let _ = pool_state
                .pools
                .prune_idle(pool_state.config.pool.idle_ttl.as_millis() as u64);
        }
    });
}

async fn cleanup_expired_keys(
    pool: Arc<sqlx::MySqlPool>,
    now_ms: i64,
    batch: u64,
) -> Result<(), RespError> {
    let rows = sqlx::query(
        "SELECT r_key FROM redis_kv WHERE expires_at_ms IS NOT NULL AND expires_at_ms <= ? LIMIT ?",
    )
    .bind(now_ms)
    .bind(batch as i64)
    .fetch_all(pool.as_ref())
    .await
    .map_err(map_sql_err)?;
    if rows.is_empty() {
        return Ok(());
    }
    let mut keys = Vec::with_capacity(rows.len());
    for row in rows {
        let key: Vec<u8> = row.try_get("r_key").map_err(map_sql_err)?;
        keys.push(Bytes::from(key));
    }
    delete_keys_all(pool.as_ref(), &keys).await?;
    Ok(())
}

async fn cleanup_pubsub(
    pool: Arc<sqlx::MySqlPool>,
    idle_secs: u64,
    message_ttl_secs: u64,
    batch: u64,
) -> Result<(), RespError> {
    let mut tx = pool.begin().await.map_err(map_sql_err)?;

    sqlx::query(
        "DELETE FROM redis_pubsub_subscription WHERE subscriber_id IN \
         (SELECT id FROM redis_pubsub_subscriber WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND))",
    )
    .bind(idle_secs as i64)
    .execute(&mut *tx)
    .await
    .map_err(map_sql_err)?;

    sqlx::query(
        "DELETE FROM redis_pubsub_mailbox WHERE subscriber_id IN \
         (SELECT id FROM redis_pubsub_subscriber WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND))",
    )
    .bind(idle_secs as i64)
    .execute(&mut *tx)
    .await
    .map_err(map_sql_err)?;

    sqlx::query(
        "DELETE FROM redis_pubsub_subscriber \
         WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND)",
    )
    .bind(idle_secs as i64)
    .execute(&mut *tx)
    .await
    .map_err(map_sql_err)?;

    sqlx::query(
        "DELETE FROM redis_pubsub_message \
         WHERE created_at < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND) LIMIT ?",
    )
    .bind(message_ttl_secs as i64)
    .bind(batch as i64)
    .execute(&mut *tx)
    .await
    .map_err(map_sql_err)?;

    tx.commit().await.map_err(map_sql_err)?;
    Ok(())
}