use crate::{Bitmap, FiltersInfo};
use redis::cluster::ClusterClient;
use redis::{AsyncCommands, Client, IntoConnectionInfo};
use std::collections::{HashMap, HashSet};
use wd_tools::PFOk;
/// Handle to the Redis deployment used by the stores in this file.
///
/// Wraps either a cluster client or a single-node client so the trait
/// implementations can pick the matching connection type with a `match`.
#[derive(Clone)]
// NOTE(review): presumably silences warnings for variants that some builds
// never construct — confirm this allow is still needed.
#[allow(dead_code)]
pub enum RedisClient {
/// Client for a Redis cluster (`redis::cluster::ClusterClient`).
CLUSTER(ClusterClient),
/// Client for a single Redis server (`redis::Client`).
SINGLE(Client),
}
impl TryFrom<&str> for RedisClient {
type Error = anyhow::Error;
fn try_from(value: &str) -> Result<Self, Self::Error> {
let client = Client::open(value)?;
Ok(RedisClient::SINGLE(client))
}
}
/// Builds a cluster [`RedisClient`] from a list of node connection strings.
impl TryFrom<Vec<String>> for RedisClient {
    type Error = anyhow::Error;

    /// Passes `value` to `ClusterClient::new` and wraps the resulting client
    /// in `RedisClient::CLUSTER`.
    ///
    /// # Errors
    /// Returns the error reported by `ClusterClient::new`, converted into
    /// `anyhow::Error`.
    fn try_from(value: Vec<String>) -> Result<Self, Self::Error> {
        match ClusterClient::new(value) {
            Ok(cluster) => Ok(RedisClient::CLUSTER(cluster)),
            Err(err) => Err(err.into()),
        }
    }
}
/// Redis-backed store that implements the crate's `Bitmap` trait.
pub struct BitmapRedis {
// Client from which each operation opens its own async connection.
client: RedisClient,
}
impl From<RedisClient> for BitmapRedis {
fn from(client: RedisClient) -> Self {
Self { client }
}
}
impl BitmapRedis {
#[allow(dead_code)]
pub fn new_from_cluster(client: ClusterClient) -> Self {
let client = RedisClient::CLUSTER(client);
Self { client }
}
#[allow(dead_code)]
pub fn redis_single_node(url: &str) -> anyhow::Result<Self> {
let client = redis::Client::open(url)?;
let client = RedisClient::SINGLE(client);
Ok(Self { client })
}
#[allow(dead_code)]
pub fn redis_cluster<T: IntoConnectionInfo>(
nodes: impl IntoIterator<Item = T>,
) -> anyhow::Result<ClusterClient> {
let client = ClusterClient::new(nodes)?;
Ok(client)
}
}
/// Sets every bit whose index appears in `offsets` inside `buf` and returns
/// the updated buffer.
///
/// Bit `i` is stored in byte `i / 8` under the mask `0x80 >> (i % 8)`, so
/// bit 0 is the most significant bit of the first byte. The buffer is
/// extended with zero bytes whenever an index lies past its current end.
fn merge_bit_offsets(mut buf: Vec<u8>, offsets: HashSet<usize>) -> Vec<u8> {
    for offset in offsets {
        let byte = offset / 8;
        if byte >= buf.len() {
            // Grow just far enough for `byte` to be a valid index.
            buf.resize(byte + 1, 0);
        }
        buf[byte] |= 0x80u8 >> (offset % 8);
    }
    buf
}

/// Redis-backed bitmap operations.
///
/// Every method opens a new async connection from the stored client
/// (cluster or single node) and issues its commands on that connection.
#[async_trait::async_trait]
impl Bitmap for BitmapRedis {
    /// Writes `value` to the bit at `offset` of `key` with `SETBIT`.
    ///
    /// # Errors
    /// Fails if the connection cannot be opened or the command fails.
    async fn set(&self, key: &str, offset: usize, value: bool) -> anyhow::Result<()> {
        // The reply is discarded. Its type is spelled out as `()` so the call
        // does not depend on never-type fallback to pick the generic reply
        // type (an error in the 2024 edition).
        match self.client {
            RedisClient::CLUSTER(ref clu) => {
                let mut conn = clu.get_async_connection().await?;
                let _: () = conn.setbit(key, offset, value).await?;
            }
            RedisClient::SINGLE(ref sin) => {
                let mut conn = sin.get_async_connection().await?;
                let _: () = conn.setbit(key, offset, value).await?;
            }
        }
        Ok(())
    }

    /// Reads the bit at `offset` of `key` with `GETBIT`.
    ///
    /// # Errors
    /// Fails if the connection cannot be opened or the command fails.
    async fn get(&self, key: &str, offset: usize) -> anyhow::Result<bool> {
        match self.client {
            RedisClient::CLUSTER(ref clu) => {
                let mut conn = clu.get_async_connection().await?;
                let bit: bool = conn.getbit(key, offset).await?;
                Ok(bit)
            }
            RedisClient::SINGLE(ref sin) => {
                let mut conn = sin.get_async_connection().await?;
                let bit: bool = conn.getbit(key, offset).await?;
                Ok(bit)
            }
        }
    }

    /// Sets all bits listed in `list` on `key` in one round of `GET` + `SET`.
    ///
    /// The current value of `key` is fetched (a missing key is treated as an
    /// empty buffer), the bits are merged in locally, and the whole buffer is
    /// written back.
    ///
    /// The read and the write are two separate commands, so this is not
    /// atomic: a write to `key` that lands between them is overwritten.
    ///
    /// # Errors
    /// Fails if the connection cannot be opened or either command fails.
    async fn mul_set(&self, key: &str, list: HashSet<usize>) -> anyhow::Result<()> {
        match self.client {
            RedisClient::CLUSTER(ref clu) => {
                let mut conn = clu.get_async_connection().await?;
                let current: Option<Vec<u8>> = conn.get(key).await?;
                let updated = merge_bit_offsets(current.unwrap_or_default(), list);
                // Explicit `()` reply type: see the note in `set`.
                let _: () = conn.set(key, updated).await?;
            }
            RedisClient::SINGLE(ref sin) => {
                let mut conn = sin.get_async_connection().await?;
                let current: Option<Vec<u8>> = conn.get(key).await?;
                let updated = merge_bit_offsets(current.unwrap_or_default(), list);
                // Explicit `()` reply type: see the note in `set`.
                let _: () = conn.set(key, updated).await?;
            }
        }
        Ok(())
    }

    /// Returns the raw bytes stored at `key`, or an empty vector when the
    /// key has no value.
    ///
    /// # Errors
    /// Fails if the connection cannot be opened or the command fails.
    async fn mul_get(&self, key: &str) -> anyhow::Result<Vec<u8>> {
        match self.client {
            RedisClient::CLUSTER(ref clu) => {
                let mut conn = clu.get_async_connection().await?;
                let stored: Option<Vec<u8>> = conn.get(key).await?;
                Ok(stored.unwrap_or_default())
            }
            RedisClient::SINGLE(ref sin) => {
                let mut conn = sin.get_async_connection().await?;
                let stored: Option<Vec<u8>> = conn.get(key).await?;
                Ok(stored.unwrap_or_default())
            }
        }
    }
}
/// Redis-backed store that implements the crate's `FiltersInfo` trait.
#[derive(Clone)]
pub struct FilterInfoRedis {
// Client from which each operation opens its own async connection.
client: RedisClient,
}
impl From<RedisClient> for FilterInfoRedis {
fn from(client: RedisClient) -> Self {
Self { client }
}
}
impl FilterInfoRedis {
    /// Creates a store backed by an existing cluster client.
    #[allow(dead_code)]
    pub fn new_from_cluster(client: ClusterClient) -> Self {
        Self {
            client: RedisClient::CLUSTER(client),
        }
    }

    /// Creates a store backed by a single Redis node reachable at `url`.
    ///
    /// # Errors
    /// Returns the error reported by `Client::open` for `url`.
    #[allow(dead_code)]
    pub fn redis_single_node(url: &str) -> anyhow::Result<Self> {
        let single = Client::open(url)?;
        Ok(Self {
            client: RedisClient::SINGLE(single),
        })
    }
}
/// Per-group counters kept in a Redis hash: `group` is the hash key, each
/// field is an entry name, and each value is its count.
///
/// Every method opens a new async connection from the stored client.
#[async_trait::async_trait]
impl FiltersInfo for FilterInfoRedis {
    /// Returns all `(field, count)` pairs of the hash `group`, sorted by
    /// field name. An absent reply yields an empty list.
    async fn list(&self, group: &str) -> anyhow::Result<Vec<(String, usize)>> {
        let fields: HashMap<String, usize> = match &self.client {
            RedisClient::CLUSTER(cluster) => {
                let mut conn = cluster.get_async_connection().await?;
                let reply: Option<HashMap<String, usize>> = conn.hgetall(group).await?;
                reply.unwrap_or_default()
            }
            RedisClient::SINGLE(single) => {
                let mut conn = single.get_async_connection().await?;
                let reply: Option<HashMap<String, usize>> = conn.hgetall(group).await?;
                reply.unwrap_or_default()
            }
        };
        let mut entries: Vec<(String, usize)> = fields.into_iter().collect();
        entries.sort_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
        Ok(entries)
    }

    /// Returns the count stored under field `key` of hash `group`, or `0`
    /// when the field has no value.
    async fn count(&self, group: &str, key: &str) -> anyhow::Result<usize> {
        let stored: Option<usize> = match &self.client {
            RedisClient::CLUSTER(cluster) => {
                let mut conn = cluster.get_async_connection().await?;
                let reply: Option<usize> = conn.hget(group, key).await?;
                reply
            }
            RedisClient::SINGLE(single) => {
                let mut conn = single.get_async_connection().await?;
                let reply: Option<usize> = conn.hget(group, key).await?;
                reply
            }
        };
        stored.unwrap_or(0).ok()
    }

    /// Increments field `key` of hash `group` by `count`; the new total
    /// returned by the server is discarded.
    async fn add(&self, group: &str, key: &str, count: usize) -> anyhow::Result<()> {
        match &self.client {
            RedisClient::CLUSTER(cluster) => {
                let mut conn = cluster.get_async_connection().await?;
                let _total: isize = conn.hincr(group, key, count).await?;
            }
            RedisClient::SINGLE(single) => {
                let mut conn = single.get_async_connection().await?;
                let _total: isize = conn.hincr(group, key, count).await?;
            }
        }
        Ok(())
    }
}