use std::sync::Arc;
use resp_async::response::RespError;
use sqlx::Row;
use tokio::time;
use bytes::Bytes;
use crate::state::{AppState, now_ms};
use crate::storage::{delete_keys_all, map_sql_err};
pub fn spawn_maintenance_tasks(state: AppState) {
let expire_state = state.clone();
tokio::spawn(async move {
let mut interval = time::interval(expire_state.config.cleanup.expire_interval);
loop {
interval.tick().await;
let now = now_ms() as i64;
let pools = expire_state.pools.snapshot_pools();
for pool in pools {
let _ =
cleanup_expired_keys(pool, now, expire_state.config.cleanup.expire_batch).await;
}
}
});
let pubsub_state = state.clone();
tokio::spawn(async move {
let mut interval = time::interval(pubsub_state.config.cleanup.pubsub_interval);
loop {
interval.tick().await;
let pools = pubsub_state.pools.snapshot_pools();
for pool in pools {
let _ = cleanup_pubsub(
pool,
pubsub_state.config.cleanup.pubsub_idle_ttl.as_secs(),
pubsub_state.config.cleanup.pubsub_message_ttl.as_secs(),
pubsub_state.config.cleanup.expire_batch,
)
.await;
}
}
});
let pool_state = state.clone();
tokio::spawn(async move {
let mut interval = time::interval(pool_state.config.cleanup.expire_interval);
loop {
interval.tick().await;
let _ = pool_state
.pools
.prune_idle(pool_state.config.pool.idle_ttl.as_millis() as u64);
}
});
}
async fn cleanup_expired_keys(
pool: Arc<sqlx::MySqlPool>,
now_ms: i64,
batch: u64,
) -> Result<(), RespError> {
let rows = sqlx::query(
"SELECT r_key FROM redis_kv WHERE expires_at_ms IS NOT NULL AND expires_at_ms <= ? LIMIT ?",
)
.bind(now_ms)
.bind(batch as i64)
.fetch_all(pool.as_ref())
.await
.map_err(map_sql_err)?;
if rows.is_empty() {
return Ok(());
}
let mut keys = Vec::with_capacity(rows.len());
for row in rows {
let key: Vec<u8> = row.try_get("r_key").map_err(map_sql_err)?;
keys.push(Bytes::from(key));
}
delete_keys_all(pool.as_ref(), &keys).await?;
Ok(())
}
async fn cleanup_pubsub(
pool: Arc<sqlx::MySqlPool>,
idle_secs: u64,
message_ttl_secs: u64,
batch: u64,
) -> Result<(), RespError> {
let mut tx = pool.begin().await.map_err(map_sql_err)?;
sqlx::query(
"DELETE FROM redis_pubsub_subscription WHERE subscriber_id IN \
(SELECT id FROM redis_pubsub_subscriber WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND))",
)
.bind(idle_secs as i64)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
sqlx::query(
"DELETE FROM redis_pubsub_mailbox WHERE subscriber_id IN \
(SELECT id FROM redis_pubsub_subscriber WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND))",
)
.bind(idle_secs as i64)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
sqlx::query(
"DELETE FROM redis_pubsub_subscriber \
WHERE last_seen < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND)",
)
.bind(idle_secs as i64)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
sqlx::query(
"DELETE FROM redis_pubsub_message \
WHERE created_at < DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? SECOND) LIMIT ?",
)
.bind(message_ttl_secs as i64)
.bind(batch as i64)
.execute(&mut *tx)
.await
.map_err(map_sql_err)?;
tx.commit().await.map_err(map_sql_err)?;
Ok(())
}