use crate::FOXTIVE;
use crate::prelude::{AppResult, AppStateExt};
use crate::redis::conn::create_redis_connection;
use crate::results::redis_result::RedisResultToAppResult;
use anyhow::Error;
use futures_util::StreamExt;
use redis::{AsyncCommands, FromRedisValue, ToRedisArgs, ToSingleRedisArg};
use serde::Serialize;
use std::future::Future;
use std::num::{NonZeroU64, NonZeroUsize};
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time;
use tracing::{error, info};
pub mod config;
pub mod conn;
/// Convenience wrapper around a `deadpool_redis` connection pool.
///
/// The instance methods each check a connection out of the pool, run a single
/// Redis command (or a short sequence) and convert the reply into an `AppResult`.
pub struct Redis {
/// Pool that [`Redis::redis`] draws connections from.
pool: deadpool_redis::Pool,
}
impl Redis {
pub fn new(pool: deadpool_redis::Pool) -> Self {
Self { pool }
}
pub async fn redis(&self) -> AppResult<deadpool_redis::Connection> {
self.pool.get().await.map_err(Error::msg)
}
/// Pushes `data` onto the list stored at `queue` with `LPUSH`.
///
/// Unlike [`Redis::rpush`], the value is sent through its `ToRedisArgs`
/// implementation rather than being JSON-encoded first.
///
/// # Errors
/// Fails when no connection can be obtained or the command is rejected.
pub async fn queue<T>(&self, queue: &str, data: &T) -> AppResult<i32>
where
    T: ToRedisArgs + Send + Sync,
{
    let mut connection = self.redis().await?;
    let reply = connection.lpush(queue, data).await;
    reply.into_app_result()
}
/// Stores `value` under `key` with `SET` and returns the server's reply as a string.
///
/// # Errors
/// Fails when no connection can be obtained or the command is rejected.
pub async fn set<T>(&self, key: &str, value: &T) -> AppResult<String>
where
    T: ToSingleRedisArg + Send + Sync,
{
    let mut connection = self.redis().await?;
    let reply = connection.set(key, value).await;
    reply.into_app_result()
}
/// Reads the value stored at `key` with `GET`, decoded as `T`.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into `T`.
pub async fn get<T>(&self, key: &str) -> AppResult<T>
where
    T: FromRedisValue,
{
    let mut connection = self.redis().await?;
    let reply = connection.get(key).await;
    reply.into_app_result()
}
/// Removes `key` with `DEL` and returns the integer reply from the server.
///
/// # Errors
/// Fails when no connection can be obtained or the command is rejected.
pub async fn delete(&self, key: &str) -> AppResult<i32> {
    let mut connection = self.redis().await?;
    let removed = connection.del(key).await;
    removed.into_app_result()
}
/// Deletes every key matching `pattern`.
///
/// The matching keys are listed with `KEYS` and then removed with a single
/// `DEL`; the two commands are issued separately on the same connection.
/// Returns `0` without issuing `DEL` when nothing matches.
///
/// # Errors
/// Fails when no connection can be obtained or either command is rejected.
pub async fn delete_by_pattern(&self, pattern: &str) -> AppResult<u32> {
    let mut connection = self.redis().await?;
    // NOTE(review): `KEYS` is documented by Redis as walking the whole keyspace;
    // presumably acceptable for the data sizes used here — confirm.
    let matched: Vec<String> = connection.keys(pattern).await?;
    if !matched.is_empty() {
        return connection.del(matched).await.into_app_result();
    }
    Ok(0)
}
/// Publishes `data`, encoded as JSON, on `channel`.
///
/// # Errors
/// Fails when serialization fails, no connection can be obtained, or the
/// `PUBLISH` command is rejected.
pub async fn publish<T>(&self, channel: &str, data: &T) -> AppResult<i32>
where
    T: Serialize,
{
    let payload = serde_json::to_string(data)?;
    let mut connection = self.redis().await?;
    let reply = connection.publish(channel, payload).await;
    reply.into_app_result()
}
/// Pops from the list at `key` with `RPOP`, decoding the reply as `V`.
///
/// `count` is forwarded to the command as-is; the caller picks a `V` that
/// matches the reply shape it expects.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into `V`.
pub async fn rpop<V>(&self, key: &str, count: Option<NonZeroUsize>) -> AppResult<V>
where
    V: FromRedisValue,
{
    let mut connection = self.redis().await?;
    let popped = connection.rpop(key, count).await;
    popped.into_app_result()
}
/// Appends `data`, encoded as JSON, to the list at `queue` with `RPUSH`.
///
/// # Errors
/// Fails when serialization fails, no connection can be obtained, or the
/// command is rejected.
pub async fn rpush<T>(&self, queue: &str, data: &T) -> AppResult<i32>
where
    T: Serialize,
{
    let payload = serde_json::to_string(data)?;
    let mut connection = self.redis().await?;
    let reply = connection.rpush(queue, payload).await;
    reply.into_app_result()
}
/// Pops from the list at `key` with `LPOP`, decoding the reply as `V`.
///
/// `count` is forwarded to the command as-is; the caller picks a `V` that
/// matches the reply shape it expects.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into `V`.
pub async fn lpop<V>(&self, key: &str, count: Option<NonZeroUsize>) -> AppResult<V>
where
    V: FromRedisValue,
{
    let mut connection = self.redis().await?;
    let popped = connection.lpop(key, count).await;
    popped.into_app_result()
}
/// Adds `value`, encoded as JSON, to the set at `key` with `SADD`.
///
/// # Errors
/// Fails when serialization fails, no connection can be obtained, or the
/// command is rejected.
pub async fn sadd<T>(&self, key: &str, value: &T) -> AppResult<i32>
where
    T: Serialize,
{
    let member = serde_json::to_string(value)?;
    let mut connection = self.redis().await?;
    let reply = connection.sadd(key, member).await;
    reply.into_app_result()
}
/// Pops a member from the set at `key` with `SPOP`, decoding the reply as `V`.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into `V`.
pub async fn spop<V>(&self, key: &str) -> AppResult<V>
where
    V: FromRedisValue,
{
    let mut connection = self.redis().await?;
    let popped = connection.spop(key).await;
    popped.into_app_result()
}
/// Adds `value`, encoded as JSON, to the sorted set at `key` with the given `score`.
///
/// # Errors
/// Fails when serialization fails, no connection can be obtained, or the
/// `ZADD` command is rejected.
pub async fn zadd<T>(&self, key: &str, score: f64, value: &T) -> AppResult<i32>
where
    T: Serialize,
{
    let member = serde_json::to_string(value)?;
    let mut connection = self.redis().await?;
    let reply = connection.zadd(key, score, member).await;
    reply.into_app_result()
}
/// Runs `ZPOPMIN` on the sorted set at `key` and decodes the reply as an
/// optional `(member, score)` pair.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into the return type.
pub async fn zpopmin(&self, key: &str, count: isize) -> AppResult<Option<(String, f64)>> {
    // NOTE(review): the return type holds at most one member/score pair; a
    // `count` other than 1 presumably produces a reply that cannot be decoded
    // into it — confirm against callers.
    let mut connection = self.redis().await?;
    let popped = connection.zpopmin(key, count).await;
    popped.into_app_result()
}
/// Runs `ZPOPMAX` on the sorted set at `key` and decodes the reply as an
/// optional `(member, score)` pair.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into the return type.
pub async fn zpopmax(&self, key: &str, count: isize) -> AppResult<Option<(String, f64)>> {
    // NOTE(review): the return type holds at most one member/score pair; a
    // `count` other than 1 presumably produces a reply that cannot be decoded
    // into it — confirm against callers.
    let mut connection = self.redis().await?;
    let popped = connection.zpopmax(key, count).await;
    popped.into_app_result()
}
/// Runs `BLPOP` on `key` with the given `timeout`, decoding the reply as `V`.
///
/// The pooled connection stays checked out for as long as the command blocks.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into `V`.
pub async fn blpop<V>(&self, key: &str, timeout: f64) -> AppResult<V>
where
    V: FromRedisValue,
{
    let mut connection = self.redis().await?;
    let popped = connection.blpop(key, timeout).await;
    popped.into_app_result()
}
/// Runs `BRPOP` on `key` with the given `timeout`, decoding the reply as `V`.
///
/// The pooled connection stays checked out for as long as the command blocks.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded into `V`.
pub async fn brpop<V>(&self, key: &str, timeout: f64) -> AppResult<V>
where
    V: FromRedisValue,
{
    let mut connection = self.redis().await?;
    let popped = connection.brpop(key, timeout).await;
    popped.into_app_result()
}
/// Reads the elements of the list at `key` between `start` and `stop` with
/// `LRANGE`, decoding each element as `T`.
///
/// # Errors
/// Fails when no connection can be obtained, the command is rejected, or the
/// reply cannot be decoded.
pub async fn lrange<T>(&self, key: &str, start: isize, stop: isize) -> AppResult<Vec<T>>
where
    T: FromRedisValue,
{
    let mut connection = self.redis().await?;
    let elements = connection.lrange(key, start, stop).await;
    elements.into_app_result()
}
/// Removes occurrences of `value` (compared by its JSON encoding) from the
/// list at `key` with `LREM`; `count` is forwarded to the command unchanged.
///
/// # Errors
/// Fails when serialization fails, no connection can be obtained, or the
/// command is rejected.
pub async fn lrem<T>(&self, key: &str, count: isize, value: &T) -> AppResult<i32>
where
    T: Serialize,
{
    let needle = serde_json::to_string(value)?;
    let mut connection = self.redis().await?;
    let removed = connection.lrem(key, count, needle).await;
    removed.into_app_result()
}
/// Sends `FLUSHALL` to the server.
///
/// # Errors
/// Fails when no connection can be obtained or the command is rejected.
pub async fn flush_all(&self) -> AppResult<()> {
    let mut connection = self.redis().await?;
    let command = redis::cmd("FLUSHALL");
    let outcome = command.query_async(&mut *connection).await;
    outcome.into_app_result()
}
/// Sends `FLUSHDB` to the server.
///
/// # Errors
/// Fails when no connection can be obtained or the command is rejected.
pub async fn flush_db(&self) -> AppResult<()> {
    let mut connection = self.redis().await?;
    let command = redis::cmd("FLUSHDB");
    let outcome = command.query_async(&mut *connection).await;
    outcome.into_app_result()
}
pub async fn poll_queue<F, Fut>(
queue: String,
interval: Option<NonZeroU64>,
len: Option<NonZeroUsize>,
mut func: F,
) where
F: FnMut(String) -> Fut + Send + Copy + 'static,
Fut: Future<Output = AppResult<()>> + Send + 'static,
{
info!("[queue] polling: {queue}");
let mut interval = time::interval(Duration::from_micros(
interval.map(|v| v.get()).unwrap_or(500_000),
));
loop {
match FOXTIVE.redis().rpop(&queue, len).await {
Ok(Some(item)) => {
let queue_clone = queue.clone();
Handle::current().spawn(async move {
if let Err(err) = func(item).await {
error!("[queue][{queue_clone}] executor error: {err:?}");
}
});
}
Ok(None) | Err(_) => {
interval.tick().await;
}
}
}
}
/// Subscribes to `channel` on a dedicated connection created from `dns` and
/// dispatches every received message to `func` on a freshly spawned task.
///
/// The handler receives the message payload decoded as a `String`, or the
/// decoding error. A handler error is logged and otherwise ignored.
///
/// Returns `Ok(())` once the message stream ends.
///
/// # Errors
/// Fails when the connection cannot be created, the pub/sub handle cannot be
/// obtained, or the `SUBSCRIBE` command is rejected.
pub async fn subscribe<F, Fut>(channel: String, dns: String, func: F) -> AppResult<()>
where
    F: FnMut(AppResult<String>) -> Fut + Copy + Send + 'static,
    Fut: Future<Output = AppResult<()>> + Send + 'static,
{
    info!("[subscriber] establishing connection...");
    let client = create_redis_connection(&dns)?;
    let mut pubsub = client.get_async_pubsub().await?;

    info!("[subscriber] subscribing to: {channel}");
    pubsub.subscribe(std::slice::from_ref(&channel)).await?;

    let mut messages = pubsub.into_on_message();
    loop {
        let Some(message) = messages.next().await else {
            break;
        };

        let channel_clone = channel.clone();
        // `F: Copy`, so every task gets its own handler instance.
        let mut handler = func;
        let task = async move {
            let payload = message.get_payload::<String>().into_app_result();
            if let Err(err) = handler(payload).await {
                error!("[subscriber][{channel_clone}] executor error: {err:?}");
            }
        };
        Handle::current().spawn(task);
    }

    Ok(())
}
/// Lists every key by delegating to [`Redis::keys_by_pattern`] with `"*"`.
///
/// # Errors
/// Propagates whatever [`Redis::keys_by_pattern`] returns.
pub async fn keys(&self) -> AppResult<Vec<String>> {
    let match_everything = "*";
    self.keys_by_pattern(match_everything).await
}
/// Lists the keys matching `pattern` with `KEYS`.
///
/// # Errors
/// Fails when no connection can be obtained or the command is rejected.
pub async fn keys_by_pattern(&self, pattern: &str) -> AppResult<Vec<String>> {
    let mut connection = self.redis().await?;
    let matched = connection.keys(pattern).await;
    matched.into_app_result()
}
}