use std::future::Future;
use std::time::Duration;
use redis::{AsyncCommands, RedisResult};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// An immutable string prefix used to build namespaced Redis key names.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct KeyPrefix {
    prefix: String,
}

impl KeyPrefix {
    /// Wraps an owned string as a key prefix.
    pub fn new(prefix: String) -> KeyPrefix {
        KeyPrefix { prefix }
    }

    /// Builds the full key `prefix + name` as an owned `String`.
    ///
    /// Allocates exactly once, sized to hold both parts.
    pub fn of(&self, name: &str) -> String {
        let mut full = String::with_capacity(self.prefix.len() + name.len());
        full += &self.prefix;
        full += name;
        full
    }

    /// Returns a new, longer prefix equal to `prefix + other`.
    pub fn and(&self, other: &str) -> KeyPrefix {
        KeyPrefix { prefix: self.of(other) }
    }

    /// Extends this prefix in place with `other`, consuming and returning it.
    pub fn concat(mut self, other: &str) -> KeyPrefix {
        self.prefix += other;
        self
    }
}
impl From<String> for KeyPrefix {
fn from(prefix: String) -> KeyPrefix {
KeyPrefix::new(prefix)
}
}
impl From<&str> for KeyPrefix {
    /// Converts a borrowed string into a prefix by copying it to the heap.
    fn from(prefix: &str) -> KeyPrefix {
        KeyPrefix::new(prefix.to_owned())
    }
}
impl From<KeyPrefix> for String {
    /// Unwraps the prefix, yielding its underlying owned string.
    fn from(value: KeyPrefix) -> String {
        value.prefix
    }
}
impl AsRef<str> for KeyPrefix {
    /// Borrows the prefix as a plain string slice.
    fn as_ref(&self) -> &str {
        self.prefix.as_str()
    }
}
/// A single unit of work: an opaque byte payload plus a unique id.
#[derive(Clone, Debug)]
pub struct Item {
    /// Unique identifier for this item; `Item::new` generates it as a UUID.
    pub id: String,
    /// Raw payload bytes; interpretation is up to the consumer (e.g. JSON).
    pub data: Box<[u8]>,
}
impl Item {
    /// Creates an item around `data`, assigning a fresh random UUID as its id.
    pub fn new(data: Box<[u8]>) -> Item {
        let id = Uuid::new_v4().to_string();
        Item { id, data }
    }

    /// Creates an item whose payload is the UTF-8 bytes of `data`.
    pub fn from_string_data(data: String) -> Item {
        let bytes = data.into_bytes();
        Item::new(bytes.into())
    }

    /// Serializes `data` to JSON and wraps the resulting bytes in a new item.
    pub fn from_json_data<T: Serialize>(data: &T) -> serde_json::Result<Item> {
        let bytes = serde_json::to_vec(data)?;
        Ok(Item::new(bytes.into_boxed_slice()))
    }

    /// Deserializes the payload as JSON; `T` may borrow from `self`'s bytes.
    pub fn data_json<'a, T: Deserialize<'a>>(&'a self) -> serde_json::Result<T> {
        serde_json::from_slice(self.data.as_ref())
    }

    /// Deserializes the payload as JSON into a fully owned (`'static`) value.
    pub fn data_json_static<T: for<'de> Deserialize<'de>>(&self) -> serde_json::Result<T> {
        serde_json::from_slice(self.data.as_ref())
    }
}
/// A work queue backed by Redis lists and keys, all derived from one prefix.
pub struct WorkQueue {
    /// Random UUID identifying this queue instance; stored in lease keys.
    session: String,
    /// Redis list holding ids of items waiting to be leased.
    main_queue_key: String,
    /// Redis list holding ids of items currently being processed.
    processing_key: String,
    /// Prefix for per-item lease keys (`lease_key.of(item_id)`).
    lease_key: KeyPrefix,
    /// Prefix for per-item payload keys (`item_data_key.of(item_id)`).
    item_data_key: KeyPrefix,
}
impl WorkQueue {
    /// Creates a work queue whose Redis keys are all derived from `name`.
    ///
    /// Each `WorkQueue` gets its own random session id, which is written into
    /// lease keys so individual workers can be told apart.
    pub fn new(name: KeyPrefix) -> WorkQueue {
        WorkQueue {
            session: Uuid::new_v4().to_string(),
            main_queue_key: name.of(":queue"),
            processing_key: name.of(":processing"),
            lease_key: name.and(":lease:"),
            item_data_key: name.and(":item:"),
        }
    }

    /// Adds an item to the queue only if no item with the same id exists.
    ///
    /// Returns `Ok(true)` if the item was added, `Ok(false)` if an item with
    /// that id was already present (the queue is then left untouched).
    pub async fn add_item<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
        // SETNX stores the payload only when the data key is new.
        let added = db
            .set_nx(self.item_data_key.of(&item.id), item.data.as_ref())
            .await?;
        if added {
            db.lpush(&self.main_queue_key, &item.id).await?;
        }
        Ok(added)
    }

    /// Appends the commands to add `item` to `pipeline`.
    ///
    /// Unlike [`WorkQueue::add_item`], this uses a plain SET and so will
    /// overwrite any existing payload stored under the same id — the caller
    /// guarantees uniqueness.
    pub fn add_unique_item_to_pipeline(&self, pipeline: &mut redis::Pipeline, item: &Item) {
        pipeline.set(self.item_data_key.of(&item.id), item.data.as_ref());
        pipeline.lpush(&self.main_queue_key, &item.id);
    }

    /// Adds an item the caller guarantees to be unique, in one round trip.
    pub async fn add_unique_item<C: AsyncCommands>(
        &self,
        db: &mut C,
        item: &Item,
    ) -> RedisResult<()> {
        // A stack-allocated pipeline is sufficient; boxing it was a needless
        // heap allocation.
        let mut pipeline = redis::pipe();
        self.add_unique_item_to_pipeline(&mut pipeline, item);
        pipeline.query_async(db).await
    }

    /// Returns the number of items waiting in the main queue.
    pub fn queue_len<'a, C: AsyncCommands>(
        &'a self,
        db: &'a mut C,
    ) -> impl Future<Output = RedisResult<usize>> + 'a {
        db.llen(&self.main_queue_key)
    }

    /// Returns the number of items currently in the processing list.
    pub fn processing<'a, C: AsyncCommands>(
        &'a self,
        db: &'a mut C,
    ) -> impl Future<Output = RedisResult<usize>> + 'a {
        db.llen(&self.processing_key)
    }

    /// Leases the next available item, moving its id to the processing list.
    ///
    /// `timeout` controls the wait for an item to appear:
    /// `Some(Duration::ZERO)` polls once without blocking, `None` blocks
    /// until an item arrives, and any other value blocks for at most that
    /// duration. Returns `Ok(None)` when no item became available in time.
    ///
    /// The lease is recorded under the lease key with this queue's session id
    /// and expires after `lease_duration`, after which the item may be handed
    /// to another worker.
    pub async fn lease<C: AsyncCommands>(
        &self,
        db: &mut C,
        timeout: Option<Duration>,
        lease_duration: Duration,
    ) -> RedisResult<Option<Item>> {
        loop {
            let Some(item_id): Option<String> = (match timeout {
                // Zero timeout: non-blocking single poll.
                Some(Duration::ZERO) => {
                    db.rpoplpush(&self.main_queue_key, &self.processing_key)
                        .await?
                }
                _ => {
                    // BRPOPLPUSH with a 0 timeout blocks indefinitely.
                    // `as_secs_f64` keeps fractional seconds; the previous
                    // `as_secs() as f64` truncated sub-second timeouts to 0,
                    // which made them block forever instead.
                    db.brpoplpush(
                        &self.main_queue_key,
                        &self.processing_key,
                        timeout.map(|d| d.as_secs_f64()).unwrap_or(0.),
                    )
                    .await?
                }
            }) else {
                return Ok(None);
            };
            let item_data: Vec<u8> = match db.get(self.item_data_key.of(&item_id)).await? {
                Some(item_data) => item_data,
                // The id was queued but its payload is gone (e.g. completed
                // concurrently). With an infinite timeout, try the next item;
                // otherwise give up for this call.
                None if timeout.is_none() => continue,
                None => return Ok(None),
            };
            // NOTE(review): a `lease_duration` below one second truncates to
            // a 0s expiry, which Redis rejects — confirm callers pass >= 1s.
            db.set_ex(
                self.lease_key.of(&item_id),
                &self.session,
                lease_duration.as_secs(),
            )
            .await?;
            return Ok(Some(Item {
                data: item_data.into_boxed_slice(),
                id: item_id,
            }));
        }
    }

    /// Marks `item` as done: deletes its payload, removes its id from the
    /// processing list, and drops its lease, all in one pipeline.
    ///
    /// Returns `Ok(true)` if this call deleted the payload (first completion),
    /// `Ok(false)` if the item had already been completed.
    pub async fn complete<C: AsyncCommands>(&self, db: &mut C, item: &Item) -> RedisResult<bool> {
        let (items_deleted, (), ()): (usize, (), ()) = redis::pipe()
            .del(self.item_data_key.of(&item.id))
            .lrem(&self.processing_key, 0, &item.id)
            .del(self.lease_key.of(&item.id))
            .query_async(db)
            .await?;
        Ok(items_deleted > 0)
    }
}
#[cfg(test)]
mod tests {
    use super::{Item, KeyPrefix};
    use serde::{Deserialize, Serialize};

    /// Length of a canonical hyphenated UUID string,
    /// e.g. "00112233-4455-6677-8899-aabbccddeeff".
    const UUID_STR_LEN: usize = 36;

    #[test]
    fn test_key_prefix() {
        let prefix = KeyPrefix::new("abc".to_string());

        // `and` yields a new, distinct prefix equal to the concatenation.
        let extended = prefix.and("123");
        let expected = KeyPrefix::new("abc123".to_string());
        assert_eq!(extended, expected);
        assert_ne!(prefix, extended);
        assert_eq!(extended.as_ref(), expected.as_ref());

        // `as_ref` and `of` expose the raw string forms.
        assert_eq!(prefix.as_ref(), "abc");
        assert_eq!(prefix.of("bar"), "abcbar");
        assert_eq!(prefix.of("foo"), "abcfoo".to_string());

        // Conversion back into String, and chained composition.
        assert_eq!(
            Into::<String>::into(prefix.and("foo")),
            "abcfoo".to_string()
        );
        assert_eq!(prefix.and("foo").of("bar"), "abcfoobar".to_string());
    }

    #[test]
    fn test_item_json() {
        #[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
        struct Test {
            #[serde(default)]
            n: usize,
            s: String,
        }

        let test_foo = Test {
            n: 7,
            s: "foo".to_string(),
        };
        let test_bar = Test {
            n: 8,
            s: "bar".to_string(),
        };
        let test_baz = Test {
            n: 0,
            s: "baz".to_string(),
        };

        // A JSON round trip preserves the value exactly.
        assert_eq!(
            test_foo,
            Item::from_json_data(&test_foo)
                .unwrap()
                .data_json()
                .unwrap()
        );

        // Every constructor assigns a UUID-shaped id.
        let test_item_bar = Item::from_json_data(&test_bar).unwrap();
        assert_eq!(test_item_bar.id.len(), UUID_STR_LEN);

        let test_item_baz = Item::new(
            "{\"s\":\"baz\"}"
                .to_string()
                .into_bytes()
                .into_boxed_slice(),
        );
        assert_eq!(test_item_baz.id.len(), UUID_STR_LEN);

        // Ids are unique per item; payloads differ as constructed.
        assert_ne!(test_item_bar.id, test_item_baz.id);
        assert_ne!(test_item_bar.data, test_item_baz.data);

        // Parsing is deterministic and matches the originals; baz's missing
        // "n" field falls back to serde's default (0).
        assert_ne!(
            test_item_bar.data_json::<Test>().unwrap(),
            test_item_baz.data_json().unwrap()
        );
        assert_eq!(
            test_item_bar.data_json::<Test>().unwrap(),
            test_item_bar.data_json().unwrap()
        );
        assert_eq!(test_item_bar.data_json::<Test>().unwrap(), test_bar);
        assert_eq!(test_item_baz.data_json::<Test>().unwrap(), test_baz);
    }
}