use redis::{AsyncCommands, FromRedisValue, RedisError, ToRedisArgs};
static POP_SCHEDULE_SCRIPT: &str = r#"
local key = KEYS[1]
local unix_ts = ARGV[1]
local limit = ARGV[2]
local autodelete = ARGV[3] == '1'
local res = redis.call('zrange', key, '-inf', unix_ts, 'byscore', 'limit', 0, limit)
if autodelete then
for _, raw in ipairs(res) do
redis.call('zrem', key, raw)
end
end
return res
"#;
#[derive(Debug, Clone)]
pub struct TasklineConfig<K: ToRedisArgs + Clone> {
queue_key: K,
read_batch_size: usize,
autodelete: bool,
}
impl<K: ToRedisArgs + Default + Clone> Default for TasklineConfig<K> {
fn default() -> Self {
Self {
queue_key: K::default(),
read_batch_size: 50,
autodelete: true,
}
}
}
impl<K: ToRedisArgs + Default + Clone> TasklineConfig<K> {
pub fn new() -> Self {
Self::default()
}
pub fn queue_key(mut self, queue_key: impl Into<K>) -> Self {
self.queue_key = queue_key.into();
self
}
pub fn read_batch_size(mut self, read_batch_size: usize) -> Self {
self.read_batch_size = read_batch_size;
self
}
pub fn autodelete(mut self, autodelete: bool) -> Self {
self.autodelete = autodelete;
self
}
pub fn build(self) -> Self {
Self {
queue_key: self.queue_key,
read_batch_size: self.read_batch_size,
autodelete: self.autodelete,
}
}
}
#[derive(Debug, Clone)]
pub struct Taskline<K: ToRedisArgs + Clone> {
config: TasklineConfig<K>,
client: redis::Client,
pop_schedule_script: redis::Script,
}
impl<K: ToRedisArgs + Clone> Taskline<K> {
pub fn new(config: TasklineConfig<K>, client: redis::Client) -> Self {
Self {
config,
client,
pop_schedule_script: redis::Script::new(POP_SCHEDULE_SCRIPT),
}
}
pub async fn read<S: ToRedisArgs, R: FromRedisValue>(&self, score: S) -> Result<R, RedisError> {
let mut con = self.client.get_multiplexed_async_connection().await?;
let result: R = self
.pop_schedule_script
.key(self.config.queue_key.clone())
.arg(score)
.arg(self.config.read_batch_size)
.arg(self.config.autodelete as u8)
.invoke_async(&mut con)
.await?;
Ok(result)
}
}
impl<K: ToRedisArgs + Clone + Send + Sync> Taskline<K> {
pub async fn write<M: ToRedisArgs + Send + Sync, S: ToRedisArgs + Send + Sync>(
&self,
task: M,
score: S,
) -> Result<(), RedisError> {
let mut con = self.client.get_multiplexed_async_connection().await?;
con.zadd(self.config.queue_key.clone(), task, score).await
}
pub async fn delete<M: ToRedisArgs + Send + Sync>(&self, task: M) -> Result<(), RedisError> {
if self.config.autodelete {
return Ok(());
}
let mut con = self.client.get_multiplexed_async_connection().await?;
con.zrem(self.config.queue_key.clone(), task).await
}
}