extern crate redis;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use crate::backend::{CommitBackend, DequeuBackend, EnqueuBackend};
use crate::backends::redis::RedisBackend;
#[derive(Debug)]
pub enum JsonRedisError {
Redis(redis::RedisError),
Serde(serde_json::Error),
}
#[derive(Clone)]
pub struct JsonRedisBackend<T> {
backend: RedisBackend,
_phantom: std::marker::PhantomData<T>,
}
impl<T> JsonRedisBackend<T> {
pub fn new(backend: RedisBackend) -> Self {
Self {
backend,
_phantom: std::marker::PhantomData,
}
}
pub async fn is_redis_version_ok(&self) -> Result<bool, redis::RedisError> {
self.backend.is_redis_version_ok().await
}
}
impl<T> JsonRedisBackend<T>
where
T: Serialize + Send + Sync,
{
pub async fn write(&self, data: &T, score: &f64) -> Result<(), JsonRedisError> {
let data = match serde_json::to_string(&data) {
Ok(data) => data,
Err(e) => return Err(JsonRedisError::Serde(e)),
};
match self.backend.write(&data, score).await {
Ok(_) => Ok(()),
Err(e) => Err(JsonRedisError::Redis(e)),
}
}
pub async fn delete(&self, data: &T) -> Result<(), JsonRedisError> {
if self.backend.autodelete {
return Ok(());
}
let data = match serde_json::to_string(&data) {
Ok(data) => data,
Err(e) => return Err(JsonRedisError::Serde(e)),
};
match self.backend.commit(&data).await {
Ok(_) => Ok(()),
Err(e) => Err(JsonRedisError::Redis(e)),
}
}
}
impl<T> JsonRedisBackend<T>
where
T: DeserializeOwned + Send + Sync,
{
pub async fn read(&self, score: &f64) -> Result<Vec<serde_json::Result<T>>, redis::RedisError> {
let data = self.backend.read(score).await?;
Ok(data.into_iter().map(|d| serde_json::from_str(&d)).collect())
}
}
#[async_trait]
impl<T, 'a> CommitBackend<'a, T, JsonRedisError> for JsonRedisBackend<T>
where
T: Serialize + Send + Sync,
{
async fn commit(&self, data: &'a T) -> Result<(), JsonRedisError> {
self.delete(data).await
}
}
#[async_trait]
impl<T, 'a> EnqueuBackend<'a, T, f64, JsonRedisError> for JsonRedisBackend<T>
where
T: Serialize + Send + Sync,
{
async fn enqueue(&self, data: &'a T, score: &'a f64) -> Result<(), JsonRedisError> {
self.write(data, score).await
}
}
#[async_trait]
impl<T, 'a> DequeuBackend<'a, serde_json::Result<T>, f64, redis::RedisError> for JsonRedisBackend<T>
where
T: DeserializeOwned + Send + Sync,
{
async fn dequeue(
&self,
score: &'a f64,
) -> Result<Vec<serde_json::Result<T>>, redis::RedisError> {
self.read(score).await
}
}