use crate::card_service::REDIS_CONNECT_CELL;
use std::future::Future;
use std::time::Duration;
use crate::static_def::{BASE_CONFIG, CONFIG};
use anyhow::Result;
use once_cell::sync::OnceCell;
use redis::aio::{ConnectionManager, ConnectionManagerConfig};
use redis::{Client, IntoConnectionInfo, RedisResult};
use tokio::sync::Mutex;
/// Number of retries `RedisConnect::retry_redis_func` performs after a dropped
/// connection before it gives up and returns an error.
const REDIS_RETRY_COUNT: usize = 5;
/// Async lock held for the whole body of `RedisConnect::init`, so concurrent
/// callers run the initialisation one at a time.
static LOCK_REDIS_INIT: Mutex<()> = Mutex::const_new(());
/// Holder for the shared Redis connection.
///
/// One instance is stored in `REDIS_CONNECT_CELL` by `RedisConnect::init`; the
/// associated functions of this type clone its connection manager per operation.
pub struct RedisConnect {
// Connection manager built in `init` and cloned by `get_redis_connect`.
connect: ConnectionManager,
}
/// One `unfinish_*` entry as returned by `RedisConnect::get_all_unfinish_by_server`.
///
/// The key has the shape
/// `unfinish_{server_id}_{level_id}_{bet_money}_{tag}_{drawing}_{index}`; the
/// numeric fields below are parsed from its segments and `position` is the value
/// stored under the key.
#[derive(Debug, Clone, Default)]
pub struct UnfinishInfo {
/// `level_id` segment of the key.
pub level_id: i32,
/// `bet_money` segment of the key.
pub bet_money: f64,
/// `tag` segment of the key.
pub tag: i32,
/// `drawing` segment of the key.
pub drawing: i32,
/// `index` segment of the key.
pub index: i64,
/// Value stored under the key (written by `RedisConnect::set_unfinish_index`).
pub position: u32,
// Lazily filled cache used by `get_redis_key`; it is cloned together with the struct.
key: OnceCell<String>,
}
impl UnfinishInfo {
    /// Returns the `game_cache:…` Redis key belonging to this record.
    ///
    /// The key is assembled from the configured server id together with
    /// `level_id`, `bet_money` and `tag` on the first call and cached in `key`.
    /// Later calls return the cached string, even if the public fields were
    /// modified in between.
    #[inline]
    pub fn get_redis_key(&self) -> &str {
        let build_key = || {
            format!(
                "game_cache:{}_{}_{}_{}",
                BASE_CONFIG.base.server_id, self.level_id, self.bet_money, self.tag
            )
        };
        self.key.get_or_init(build_key).as_str()
    }
}
impl RedisConnect {
/// Creates the shared Redis connection once and stores it in `REDIS_CONNECT_CELL`.
///
/// Calls are serialised through `LOCK_REDIS_INIT`; once the cell is initialised,
/// every further call returns `Ok(())` without touching Redis.
///
/// # Errors
/// Returns an error when the configured URL cannot be parsed, the client cannot
/// be opened, the connection manager fails to connect, or the cell rejects the
/// new value.
pub async fn init() -> Result<()> {
    let _init_guard = LOCK_REDIS_INIT.lock().await;
    if REDIS_CONNECT_CELL.initialized() {
        return Ok(());
    }

    // The configured index is appended only when the URL itself selects
    // database 0 and a non-zero index is configured.
    let parsed = (&*CONFIG.redis.redis_url).into_connection_info()?;
    let db_index = CONFIG.redis.redis_index.unwrap_or(0);
    let url = if parsed.redis_settings().db() == 0 && db_index != 0 {
        format!("{}/{}", CONFIG.redis.redis_url, db_index)
    } else {
        CONFIG.redis.redis_url.clone()
    };

    let client = Client::open(url)?;
    let manager_config = ConnectionManagerConfig::new()
        .set_connection_timeout(Some(Duration::from_secs(5)))
        .set_response_timeout(Some(Duration::from_secs(5)))
        .set_number_of_retries(6)
        .set_max_delay(Duration::from_secs(10));
    let connect = ConnectionManager::new_with_config(client, manager_config).await?;

    REDIS_CONNECT_CELL
        .set(RedisConnect { connect })
        .map_err(|_| anyhow::anyhow!("redis init error"))?;
    log::info!("redis init ok");
    Ok(())
}
/// Computes the pause before retry number `attempt`.
///
/// The delay is an exponential backoff of `5ms * 2^min(attempt, 6)` capped at
/// `MAX_MS`, plus a deterministic pseudo-jitter of 3..=19 ms derived from
/// `attempt`. With the shift capped at 6 the backoff part never exceeds 320 ms,
/// so the `MAX_MS` cap is only a safety net.
///
/// Never panics: the jitter is reduced modulo 17 before the multiplication, so
/// arbitrarily large `attempt` values cannot overflow.
#[inline]
fn retry_delay(attempt: usize) -> Duration {
    const BASE_MS: u64 = 5;
    const MAX_MS: u64 = 1_000;
    const MAX_SHIFT: usize = 6;
    const JITTER_MODULUS: usize = 17;

    let exp = 1u64 << attempt.min(MAX_SHIFT);
    let backoff = BASE_MS.saturating_mul(exp).min(MAX_MS);
    // (a * 13) mod 17 == ((a mod 17) * 13) mod 17, but the right-hand form keeps
    // the intermediate product at most 16 * 13 and therefore cannot overflow.
    let jitter = ((attempt % JITTER_MODULUS) as u64 * 13) % JITTER_MODULUS as u64 + 3;
    Duration::from_millis(backoff.saturating_add(jitter))
}
/// Runs `func` and repeats it while it fails with a dropped connection.
///
/// `func` is called again after a pause from `retry_delay` each time its error
/// reports `is_connection_dropped()`, up to `REDIS_RETRY_COUNT` retries. Any
/// other error is logged and returned immediately.
///
/// # Errors
/// Returns the wrapped Redis error for non-retryable failures, or a summary
/// error once the retry budget is used up.
#[inline]
pub async fn retry_redis_func<F, Fut, T>(mut func: F) -> Result<T>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = RedisResult<T>>,
{
    let mut retries = 0;
    loop {
        let e = match func().await {
            Ok(result) => return Ok(result),
            Err(e) => e,
        };

        // Only a dropped connection is worth retrying; everything else is final.
        if !e.is_connection_dropped() {
            log::warn!(
                "Redis operation failed with non-retryable error attempt:{} max_retries:{REDIS_RETRY_COUNT} error:{e}",
                retries + 1
            );
            return Err(anyhow::anyhow!(e));
        }

        if retries >= REDIS_RETRY_COUNT {
            return Err(anyhow::anyhow!(
                "Redis operation failed after {} retries: {}",
                REDIS_RETRY_COUNT,
                e
            ));
        }

        retries += 1;
        let delay = Self::retry_delay(retries);
        log::warn!(
            "Redis operation failed, retrying attempt:{retries} max_retries:{REDIS_RETRY_COUNT} error:{e}, next retry in {} ms",
            delay.as_millis() as u64
        );
        tokio::time::sleep(delay).await;
    }
}
/// Returns a clone of the shared connection manager.
///
/// # Panics
/// Panics when `REDIS_CONNECT_CELL` has not been filled yet, i.e. `init` has
/// not completed successfully.
#[inline]
fn get_redis_connect() -> ConnectionManager {
    let shared = REDIS_CONNECT_CELL
        .get()
        .expect("not found redis connect init");
    shared.connect.clone()
}
/// Writes the `drawing` and `current_index` fields of the hash stored at `key`.
///
/// Both `HSET` commands are sent in one pipeline, and the whole write is wrapped
/// in `retry_redis_func`.
///
/// # Errors
/// Returns the error produced by `retry_redis_func` when the write fails.
pub async fn set_current_status(key: &str, drawing: i32, current_index: i64) -> Result<()> {
    let write_status = || async {
        let mut conn = Self::get_redis_connect();
        redis::pipe()
            .hset(key, "drawing", drawing)
            .hset(key, "current_index", current_index)
            .query_async::<()>(&mut conn)
            .await
    };
    Self::retry_redis_func(write_status).await
}
/// Reads the `drawing` and `current_index` fields of the hash stored at `key`.
///
/// When either field is absent (for example because the key does not exist
/// yet), the hash is initialised to `(0, 0)` through `set_current_status` and
/// `(0, 0)` is returned.
///
/// # Errors
/// Returns an error when the read fails (connection problems, timeouts, values
/// that cannot be parsed as integers) or when writing the defaults fails. A
/// failed read never resets the stored status.
pub async fn get_current_status(key: &str) -> Result<(i32, i64)> {
    // Each field is fetched as an `Option` so that "field missing" (nil reply)
    // can be told apart from a genuine Redis error; errors are propagated
    // instead of being mistaken for an empty hash.
    let (drawing, current_index) = Self::retry_redis_func(|| async {
        let mut connect = Self::get_redis_connect();
        redis::pipe()
            .hget(key, "drawing")
            .hget(key, "current_index")
            .query_async::<(Option<i32>, Option<i64>)>(&mut connect)
            .await
    })
    .await?;

    match (drawing, current_index) {
        (Some(drawing), Some(current_index)) => Ok((drawing, current_index)),
        _ => {
            // Incomplete or missing status: store and report the defaults.
            Self::set_current_status(key, 0, 0).await?;
            Ok((0, 0))
        }
    }
}
/// Builds the Redis key under which one unfinished position is stored.
///
/// The result has the shape
/// `unfinish_{server_id}_{level_id}_{bet_money}_{tag}_{drawing}_{index}`, with
/// every value rendered through its `Display` implementation.
fn make_unfinish_key(
    server_id: u32,
    level_id: i32,
    bet_money: f64,
    tag: i32,
    drawing: i32,
    index: i64,
) -> String {
    // `get_all_unfinish_by_server` splits this key on '_' again, so the segment
    // order here must stay in sync with that parser.
    format!("unfinish_{server_id}_{level_id}_{bet_money}_{tag}_{drawing}_{index}")
}
/// Stores `position` under the unfinished-entry key for the given parameters.
///
/// The key is built by `make_unfinish_key` with the configured server id; the
/// `set` command runs inside `retry_redis_func`.
///
/// # Errors
/// Returns the error produced by `retry_redis_func` when the write fails.
pub async fn set_unfinish_index(
    level_id: i32,
    bet_money: f64,
    tag: i32,
    drawing: i32,
    index: i64,
    position: u32,
) -> Result<()> {
    let server_id = BASE_CONFIG.base.server_id;
    let redis_key = Self::make_unfinish_key(server_id, level_id, bet_money, tag, drawing, index);
    let store_position = || async {
        let mut conn = Self::get_redis_connect();
        redis::cmd("set")
            .arg(&redis_key)
            .arg(position)
            .query_async::<()>(&mut conn)
            .await
    };
    Self::retry_redis_func(store_position).await
}
/// Looks up the position stored for the given unfinished-entry parameters.
///
/// Returns `Ok(None)` when Redis has no value under the key.
///
/// # Errors
/// Returns the error produced by `retry_redis_func` when the read fails.
pub async fn get_unfinish_index(
    level_id: i32,
    bet_money: f64,
    tag: i32,
    drawing: i32,
    index: i64,
) -> Result<Option<u32>> {
    let server_id = BASE_CONFIG.base.server_id;
    let redis_key = Self::make_unfinish_key(server_id, level_id, bet_money, tag, drawing, index);
    Self::retry_redis_func(|| async {
        let mut conn = Self::get_redis_connect();
        redis::cmd("get")
            .arg(&redis_key)
            .query_async::<Option<u32>>(&mut conn)
            .await
    })
    .await
}
/// Deletes every `unfinish_{server_id}_*` key of the configured server.
///
/// The keyspace is walked with `scan` (batch hint 200) and each non-empty batch
/// is removed with a single `del`. Returns the number of keys Redis reported as
/// deleted. The whole walk runs inside `retry_redis_func`, so a dropped
/// connection restarts it from the beginning with the counter reset.
///
/// # Errors
/// Returns the error produced by `retry_redis_func` when a command fails.
pub async fn delete_unfinish_by_server_id() -> Result<u32> {
    let pattern = format!("unfinish_{}_*", BASE_CONFIG.base.server_id);
    Self::retry_redis_func(|| async {
        let mut conn = Self::get_redis_connect();
        let mut removed = 0u32;
        let mut cursor = 0u64;
        loop {
            let (next, batch): (u64, Vec<String>) = redis::cmd("scan")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(200)
                .query_async(&mut conn)
                .await?;
            if !batch.is_empty() {
                removed += redis::cmd("del")
                    .arg(batch)
                    .query_async::<u32>(&mut conn)
                    .await?;
            }
            // A returned cursor of 0 marks the end of the scan.
            cursor = next;
            if cursor == 0 {
                break;
            }
        }
        Ok(removed)
    })
    .await
}
/// Deletes the unfinished-entry key for the given parameters.
///
/// Returns the number of keys Redis reported as removed by `del`.
///
/// # Errors
/// Returns the error produced by `retry_redis_func` when the delete fails.
pub async fn delete_unfinish_index(
    level_id: i32,
    bet_money: f64,
    tag: i32,
    drawing: i32,
    index: i64,
) -> Result<u32> {
    let server_id = BASE_CONFIG.base.server_id;
    let redis_key = Self::make_unfinish_key(server_id, level_id, bet_money, tag, drawing, index);
    Self::retry_redis_func(|| async {
        let mut conn = Self::get_redis_connect();
        redis::cmd("del")
            .arg(&redis_key)
            .query_async::<u32>(&mut conn)
            .await
    })
    .await
}
/// Loads every `unfinish_{server_id}_*` entry of the configured server.
///
/// Keys are collected with `scan`, de-duplicated, and their values fetched with
/// one `mget`. Each key is split on `_` and turned into an [`UnfinishInfo`];
/// keys whose value has disappeared in the meantime, or whose name does not
/// consist of exactly seven parseable segments, are skipped. Entries are
/// returned in sorted key order.
///
/// # Errors
/// Returns the error produced by `retry_redis_func` when a command fails or a
/// stored value cannot be read as `u32`.
pub async fn get_all_unfinish_by_server() -> Result<Vec<UnfinishInfo>> {
    let pattern = format!("unfinish_{}_*", BASE_CONFIG.base.server_id);
    Self::retry_redis_func(|| async {
        let mut connect = Self::get_redis_connect();
        let mut cursor: u64 = 0;
        let mut all_keys: Vec<String> = Vec::new();
        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("scan")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(100)
                .query_async(&mut connect)
                .await?;
            all_keys.extend(keys);
            if next_cursor == 0 {
                break;
            }
            cursor = next_cursor;
        }

        // SCAN is allowed to report the same key more than once during a full
        // iteration; without this step the result could contain duplicates.
        all_keys.sort_unstable();
        all_keys.dedup();
        if all_keys.is_empty() {
            return Ok(vec![]);
        }

        let positions: Vec<Option<u32>> = redis::cmd("mget")
            .arg(&all_keys)
            .query_async(&mut connect)
            .await?;

        let result = all_keys
            .iter()
            .zip(positions)
            .filter_map(|(key, position)| {
                // `None` means the key vanished between SCAN and MGET.
                let position = position?;
                // Layout: unfinish_{server}_{level}_{bet}_{tag}_{drawing}_{index}
                let parts: Vec<&str> = key.split('_').collect();
                if parts.len() != 7 {
                    return None;
                }
                Some(UnfinishInfo {
                    level_id: parts[2].parse().ok()?,
                    bet_money: parts[3].parse().ok()?,
                    tag: parts[4].parse().ok()?,
                    drawing: parts[5].parse().ok()?,
                    index: parts[6].parse().ok()?,
                    position,
                    key: Default::default(),
                })
            })
            .collect();
        Ok(result)
    })
    .await
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio;
use tokio::sync::Mutex;
// Taken by every test that creates or deletes `unfinish_*` keys, so those tests
// run one at a time; the `current_status` tests use unique keys and skip it.
static LOCK_REDIS: Mutex<()> = Mutex::const_new(());
// Sample parameters used to build the `unfinish_*` keys in the tests below.
const TEST_LEVEL_ID: i32 = 101;
const TEST_BET_MONEY: f64 = 50.5;
const TEST_TAG: i32 = 9007;
const TEST_DRAWING: i32 = 1;
const TEST_INDEX: i64 = 10;
// Stored values: the initial one and the one used to overwrite it.
const TEST_POSITION: u32 = 888;
const TEST_POSITION2: u32 = 999;
/// Removes all `unfinish_*` keys of the configured server; the outcome is ignored.
async fn clean_test_keys() {
    // Best effort: neither the deleted count nor an error matters to the caller.
    drop(RedisConnect::delete_unfinish_by_server_id().await);
}
/// Builds a status key for one test case, made unique by a nanosecond timestamp.
fn make_test_status_key(case_name: &str) -> String {
    // A clock before the Unix epoch falls back to a zero duration.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    let nonce = since_epoch.as_nanos();
    format!(
        "test:current_status:{}:{}:{}",
        BASE_CONFIG.base.server_id, case_name, nonce
    )
}
/// Deletes `key` from Redis; failures are ignored.
async fn cleanup_status_key(key: &str) {
    let outcome = RedisConnect::retry_redis_func(|| async {
        let mut conn = RedisConnect::get_redis_connect();
        redis::cmd("del")
            .arg(key)
            .query_async::<u32>(&mut conn)
            .await
            .map(|_removed| ())
    })
    .await;
    // Cleanup is best effort, so the result is dropped on purpose.
    drop(outcome);
}
/// A stored position can be read back unchanged.
#[tokio::test]
async fn test_set_and_get_unfinish_index() {
    let _guard = LOCK_REDIS.lock().await;
    RedisConnect::init().await.unwrap();
    clean_test_keys().await;

    // Write one entry ...
    RedisConnect::set_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
        TEST_POSITION,
    )
    .await
    .unwrap();

    // ... and expect exactly that value when reading it again.
    let stored = RedisConnect::get_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
    )
    .await
    .unwrap();
    assert_eq!(stored, Some(TEST_POSITION));

    clean_test_keys().await;
}
/// Writing the same key twice keeps only the second position.
#[tokio::test]
async fn test_update_unfinish_index() {
    let _guard = LOCK_REDIS.lock().await;
    RedisConnect::init().await.unwrap();
    clean_test_keys().await;

    // First write.
    RedisConnect::set_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
        TEST_POSITION,
    )
    .await
    .unwrap();

    // Second write to the same key with a different position.
    RedisConnect::set_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
        TEST_POSITION2,
    )
    .await
    .unwrap();

    let stored = RedisConnect::get_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
    )
    .await
    .unwrap();
    assert_eq!(stored, Some(TEST_POSITION2));

    clean_test_keys().await;
}
/// Deleting an existing entry reports one removed key and the entry is gone.
#[tokio::test]
async fn test_delete_unfinish_index() {
    let _guard = LOCK_REDIS.lock().await;
    RedisConnect::init().await.unwrap();
    clean_test_keys().await;

    RedisConnect::set_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
        TEST_POSITION,
    )
    .await
    .unwrap();

    let removed = RedisConnect::delete_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
    )
    .await
    .unwrap();
    assert_eq!(removed, 1);

    // The key must no longer resolve to a value.
    let stored = RedisConnect::get_unfinish_index(
        TEST_LEVEL_ID,
        TEST_BET_MONEY,
        TEST_TAG,
        TEST_DRAWING,
        TEST_INDEX,
    )
    .await
    .unwrap();
    assert_eq!(stored, None);

    clean_test_keys().await;
}
/// 300 stored entries are all returned by `get_all_unfinish_by_server`.
#[tokio::test]
async fn test_get_all_unfinish_by_server() {
    let _guard = LOCK_REDIS.lock().await;
    RedisConnect::init().await.unwrap();
    clean_test_keys().await;

    // `bet_money` is compared through its bit pattern because `f64` is not `Hash`.
    let mut expected = HashSet::new();
    for step in 0..300 {
        let drawing = TEST_DRAWING + step;
        let index = TEST_INDEX + step as i64;
        let position = TEST_POSITION + step as u32;
        RedisConnect::set_unfinish_index(
            TEST_LEVEL_ID,
            TEST_BET_MONEY,
            TEST_TAG,
            drawing,
            index,
            position,
        )
        .await
        .unwrap();
        expected.insert((
            TEST_LEVEL_ID,
            TEST_BET_MONEY.to_bits(),
            TEST_TAG,
            drawing,
            index,
            position,
        ));
    }

    let mut got = HashSet::new();
    for info in RedisConnect::get_all_unfinish_by_server().await.unwrap() {
        got.insert((
            info.level_id,
            info.bet_money.to_bits(),
            info.tag,
            info.drawing,
            info.index,
            info.position,
        ));
    }
    assert_eq!(got, expected);

    clean_test_keys().await;
}
/// Bulk deletion removes all 300 stored entries and leaves nothing behind.
#[tokio::test]
async fn test_delete_unfinish_by_server_id() {
    let _guard = LOCK_REDIS.lock().await;
    RedisConnect::init().await.unwrap();
    clean_test_keys().await;

    for step in 0..300 {
        let drawing = TEST_DRAWING + step;
        let index = TEST_INDEX + step as i64;
        let position = TEST_POSITION + step as u32;
        RedisConnect::set_unfinish_index(
            TEST_LEVEL_ID,
            TEST_BET_MONEY,
            TEST_TAG,
            drawing,
            index,
            position,
        )
        .await
        .unwrap();
    }

    let removed = RedisConnect::delete_unfinish_by_server_id().await.unwrap();
    assert!(removed >= 300);

    let remaining = RedisConnect::get_all_unfinish_by_server().await.unwrap();
    assert!(remaining.is_empty());
}
/// Reading a key that was never written yields `None`.
#[tokio::test]
async fn test_get_unfinish_index_not_exist() {
    let _guard = LOCK_REDIS.lock().await;
    RedisConnect::init().await.unwrap();
    clean_test_keys().await;

    let lookup = RedisConnect::get_unfinish_index(9999, 1.0, 1, 1, 1).await;
    let stored = lookup.unwrap();
    assert_eq!(stored, None);
}
/// With no `unfinish_*` keys present the listing is empty.
#[tokio::test]
async fn test_get_all_unfinish_empty() {
    let _guard = LOCK_REDIS.lock().await;
    RedisConnect::init().await.unwrap();
    clean_test_keys().await;

    let listing = RedisConnect::get_all_unfinish_by_server().await;
    let entries = listing.unwrap();
    assert!(entries.is_empty());
}
/// A status written with `set_current_status` is returned by `get_current_status`.
#[tokio::test]
async fn test_current_status_set_then_get() {
    RedisConnect::init().await.unwrap();
    // The key is unique per run, so this test does not take `LOCK_REDIS`.
    let status_key = make_test_status_key("set_then_get");
    cleanup_status_key(&status_key).await;

    RedisConnect::set_current_status(&status_key, 123, 456)
        .await
        .unwrap();

    let status = RedisConnect::get_current_status(&status_key)
        .await
        .unwrap();
    assert_eq!(status.0, 123);
    assert_eq!(status.1, 456);

    cleanup_status_key(&status_key).await;
}
/// Reading a missing status yields `(0, 0)`, both on the first and on a repeated read.
#[tokio::test]
async fn test_current_status_get_missing_key_init_default() {
    RedisConnect::init().await.unwrap();
    let status_key = make_test_status_key("missing_default");
    cleanup_status_key(&status_key).await;

    // First read: nothing stored yet.
    let first = RedisConnect::get_current_status(&status_key)
        .await
        .unwrap();
    assert_eq!(first, (0, 0));

    // Second read: the defaults written by the first read.
    let second = RedisConnect::get_current_status(&status_key)
        .await
        .unwrap();
    assert_eq!(second, (0, 0));

    cleanup_status_key(&status_key).await;
}
/// A second `set_current_status` replaces the values of the first.
#[tokio::test]
async fn test_current_status_set_overwrite() {
    RedisConnect::init().await.unwrap();
    let status_key = make_test_status_key("set_overwrite");
    cleanup_status_key(&status_key).await;

    RedisConnect::set_current_status(&status_key, 1, 10)
        .await
        .unwrap();
    RedisConnect::set_current_status(&status_key, 2, 20)
        .await
        .unwrap();

    let (drawing, current_index) = RedisConnect::get_current_status(&status_key)
        .await
        .unwrap();
    assert_eq!((drawing, current_index), (2, 20));

    cleanup_status_key(&status_key).await;
}
}