#![forbid(unsafe_code)]
mod error;
pub use error::{RsmqError, RsmqResult};
use bb8_redis::{
bb8,
redis::{cmd, pipe, Script},
RedisConnectionManager,
};
use lazy_static::lazy_static;
use radix_fmt::radix_36;
use rand::seq::IteratorRandom;
use serde::{de::DeserializeOwned, ser::Serialize};
use std::{borrow::Cow, marker::PhantomData};
/// Internal snapshot of a queue's configuration together with the Redis
/// server time, as assembled by `Rsmq::get_queue`.
#[derive(Debug)]
struct QueueDescriptor {
// Parsed from the `vt` field of the queue hash; `receive_message` uses it
// (multiplied by 1000) when the caller passes no `seconds_hidden`.
vt: u64,
// Parsed from the `delay` field of the queue hash; `send_message` uses it
// (multiplied by 1000) when the caller passes no `delay`.
delay: u64,
// Parsed from the `maxsize` field of the queue hash; `send_message` skips
// the size check when this is -1.
maxsize: i64,
// Redis `TIME` reply folded into milliseconds.
ts: u64,
// Freshly generated message id; `Some` only when the caller of
// `get_queue` asked for one.
uid: Option<String>,
}
/// Connection and behaviour settings consumed by `Rsmq::new` and by the
/// queue operations.
#[derive(Debug, Clone)]
pub struct RsmqOptions {
/// Host part of the Redis URL built by `Rsmq::new`.
pub host: String,
/// Port part of the Redis URL built by `Rsmq::new`.
pub port: String,
/// Database index, appended as the path of the Redis URL.
pub db: u8,
/// When `true`, `send_message` also runs `ZCARD` on the queue and
/// `PUBLISH`es the result on the channel `{ns}rt:{qname}`.
pub realtime: bool,
/// User name for the Redis URL. Only used when `password` is also set.
pub username: Option<String>,
/// Password for the Redis URL. When set without `username`, the URL uses
/// `redis` as the user name.
pub password: Option<String>,
/// Prefix prepended verbatim (no separator is inserted) to every Redis key
/// this client touches.
pub ns: String,
}
impl Default for RsmqOptions {
fn default() -> Self {
RsmqOptions {
host: "localhost".to_string(),
port: "6379".to_string(),
db: 0,
realtime: false,
username: None,
password: None,
ns: "rsmq".to_string(),
}
}
}
/// A message handed back by `pop_message` or `receive_message`.
#[derive(Debug, Clone)]
pub struct RsmqMessage<T: Serialize + DeserializeOwned + Clone> {
/// Message id as returned by the Redis script.
pub id: String,
/// Payload, deserialised from the stored JSON text.
pub message: T,
/// Fourth value returned by the Redis script.
/// NOTE(review): presumably the receive count — confirm against the Lua scripts.
pub rc: u64,
/// Fifth value returned by the Redis script.
/// NOTE(review): presumably the first-receive timestamp — confirm against the Lua scripts.
pub fr: u64,
/// First ten characters of `id` decoded as a base-36 number; `0` when they
/// do not parse.
pub sent: u64,
}
/// Queue settings and counters as reported by `get_queue_attributes`.
#[derive(Debug, Clone)]
pub struct RsmqQueueAttributes {
/// `vt` field of the queue hash.
pub vt: u64,
/// `delay` field of the queue hash.
pub delay: u64,
/// `maxsize` field of the queue hash.
pub maxsize: i64,
/// `totalrecv` field of the queue hash.
pub totalrecv: u64,
/// `totalsent` field of the queue hash (incremented by `send_message`).
pub totalsent: u64,
/// `created` field of the queue hash (Redis `TIME` seconds at creation).
pub created: u64,
/// `modified` field of the queue hash (Redis `TIME` seconds at last change).
pub modified: u64,
/// `ZCARD` of the queue's sorted set.
pub msgs: u64,
/// `ZCOUNT` of the sorted set for scores from the current server time
/// (in milliseconds) to `+inf`.
pub hiddenmsgs: u64,
}
// Lua scripts embedded at compile time with `include_str!` and wrapped in
// `redis::Script`, constructed lazily on first use.
lazy_static! {
// Used by `change_message_visibility`.
// NOTE(review): "VISIVILITY" is a misspelling of "VISIBILITY"; the name is
// kept because `change_message_visibility` refers to it.
static ref CHANGE_MESSAGE_VISIVILITY: Script =
Script::new(include_str!("./redis-scripts/changeMessageVisibility.lua"));
// Used by `pop_message`.
static ref POP_MESSAGE: Script = Script::new(include_str!("./redis-scripts/popMessage.lua"));
// Used by `receive_message`.
static ref RECEIVE_MESSAGE: Script =
Script::new(include_str!("./redis-scripts/receiveMessage.lua"));
}
/// Queue client whose messages carry payloads of type `T`, serialised as JSON.
#[derive(Debug, Clone)]
pub struct Rsmq<T: Serialize + DeserializeOwned + Clone> {
// Connection pool every operation borrows a Redis connection from.
pool: bb8::Pool<RedisConnectionManager>,
// Settings supplied at construction (namespace, realtime flag, ...).
options: RsmqOptions,
// Ties the payload type `T` to the client without storing a value of it.
_marker: PhantomData<T>,
}
impl<T> Rsmq<T>
where
T: Serialize + DeserializeOwned + Clone,
{
pub async fn new(options: RsmqOptions) -> RsmqResult<Rsmq<T>> {
let auth: Cow<'_, str> = match (options.username.as_ref(), options.password.as_ref()) {
(Some(username), Some(password)) => format!("{}:{}@", username, password).into(),
(None, Some(password)) => format!("redis:{}@", password).into(),
_ => "".into(),
};
let url = format!(
"redis://{}{}:{}/{}",
auth, options.host, options.port, options.db
);
let manager = RedisConnectionManager::new(url)?;
let pool = bb8::Pool::builder().build(manager).await?;
Ok(Rsmq::new_with_pool(options, pool))
}
pub fn new_with_pool(options: RsmqOptions, pool: bb8::Pool<RedisConnectionManager>) -> Rsmq<T> {
Rsmq {
pool,
options,
_marker: PhantomData,
}
}
/// Registers a new queue and stores its settings.
///
/// * `seconds_hidden` — stored as `vt`; defaults to 30, must be in `0..=9_999_999`.
/// * `delay` — defaults to 0, must be in `0..=9_999_999`.
/// * `maxsize` — defaults to 65536; either `-1` or in `1024..=65536`.
///
/// # Errors
/// * whatever `valid_name_format` / `number_in_range` report for bad input,
/// * `RsmqError::QueueExists` when the name was already in the queue set,
/// * any pool or Redis error.
pub async fn create_queue(
    &mut self,
    qname: &str,
    seconds_hidden: Option<u64>,
    delay: Option<u64>,
    maxsize: Option<i64>,
) -> RsmqResult<()> {
    valid_name_format(qname)?;

    let queue_hash = format!("{}{}:Q", self.options.ns, qname);
    let vt = seconds_hidden.unwrap_or(30);
    let delay = delay.unwrap_or(0);
    let maxsize = maxsize.unwrap_or(65536);

    number_in_range(vt, 0, 9_999_999)?;
    number_in_range(delay, 0, 9_999_999)?;
    if maxsize != -1 {
        number_in_range(maxsize, 1024, 65536)?;
    }

    let mut conn = self.pool.get().await?;
    let (now_seconds, _now_micros): (u64, u64) = cmd("TIME").query_async(&mut *conn).await?;

    // One MULTI/EXEC transaction: register the name, then seed every hash
    // field. HSETNX never overwrites, so an existing queue keeps its values.
    let mut pipeline = pipe();
    pipeline
        .atomic()
        .cmd("SADD")
        .arg(format!("{}QUEUES", self.options.ns))
        .arg(qname);
    pipeline.cmd("HSETNX").arg(&queue_hash).arg("vt").arg(vt);
    pipeline.cmd("HSETNX").arg(&queue_hash).arg("delay").arg(delay);
    pipeline.cmd("HSETNX").arg(&queue_hash).arg("maxsize").arg(maxsize);
    pipeline.cmd("HSETNX").arg(&queue_hash).arg("created").arg(now_seconds);
    pipeline.cmd("HSETNX").arg(&queue_hash).arg("modified").arg(now_seconds);
    pipeline.cmd("HSETNX").arg(&queue_hash).arg("totalrecv").arg(0_i32);
    pipeline.cmd("HSETNX").arg(&queue_hash).arg("totalsent").arg(0_i32);

    let replies: Vec<u64> = pipeline.query_async(&mut *conn).await?;

    // The first reply is SADD's: anything other than 1 means the name was
    // already a member of the queue set.
    if replies[0] == 1 {
        Ok(())
    } else {
        Err(RsmqError::QueueExists)
    }
}
/// Removes a queue: drops its name from the queue set and deletes both the
/// queue hash and the message sorted set in one transaction.
///
/// # Errors
/// `RsmqError::QueueNotFound` when the name was not in the queue set, or any
/// pool / Redis error.
pub async fn delete_queue(&mut self, qname: &str) -> RsmqResult<()> {
    let mut conn = self.pool.get().await?;

    let messages_key = format!("{}{}", self.options.ns, qname);
    let queue_hash = format!("{}:Q", &messages_key);
    let queue_set = format!("{}QUEUES", self.options.ns);

    let mut pipeline = pipe();
    pipeline.atomic();
    pipeline.cmd("SREM").arg(queue_set).arg(qname);
    pipeline.cmd("DEL").arg(queue_hash).arg(messages_key);

    let (removed_names, _deleted_keys): (u16, u16) = pipeline.query_async(&mut *conn).await?;

    if removed_names == 1 {
        Ok(())
    } else {
        Err(RsmqError::QueueNotFound)
    }
}
/// Reports whether `qname` is a member of the namespace's queue set.
///
/// # Errors
/// Any pool or Redis error.
pub async fn has_queue(&mut self, qname: &str) -> RsmqResult<bool> {
    let mut conn = self.pool.get().await?;
    let queue_set = format!("{}QUEUES", self.options.ns);
    let is_member: bool = cmd("SISMEMBER")
        .arg(queue_set)
        .arg(qname)
        .query_async(&mut *conn)
        .await?;
    Ok(is_member)
}
/// Returns every queue name stored in the namespace's queue set.
///
/// # Errors
/// Any pool or Redis error.
pub async fn list_queues(&mut self) -> RsmqResult<Vec<String>> {
    let mut conn = self.pool.get().await?;
    let queue_set = format!("{}QUEUES", self.options.ns);
    let names: Vec<String> = cmd("SMEMBERS")
        .arg(queue_set)
        .query_async(&mut *conn)
        .await?;
    Ok(names)
}
/// Serialises `message` as JSON and enqueues it, returning the new message id.
///
/// * `delay` — seconds before the message becomes visible; falls back to the
///   queue's configured delay. Must be in `0..=9_999_999`.
///
/// When `options.realtime` is set, the queue size after the insert is
/// published on `{ns}rt:{qname}`.
///
/// # Errors
/// * `RsmqError::QueueNotFound` (via `get_queue`) when the queue is missing,
/// * the error from `number_in_range` when `delay` is out of range,
/// * `RsmqError::MessageTooLong` when the JSON text exceeds the queue's
///   `maxsize` (unless `maxsize` is `-1`),
/// * serialisation, pool and Redis errors.
pub async fn send_message(
    &mut self,
    qname: &str,
    message: &T,
    delay: Option<u64>,
) -> RsmqResult<String> {
    let queue = self.get_queue(qname, true).await?;
    let mut conn = self.pool.get().await?;

    // Validate while the value is still in seconds. Checking the millisecond
    // value against the seconds limit rejected every delay above 9_999 s,
    // and multiplying first could overflow on absurd input.
    let delay_seconds = delay.unwrap_or(queue.delay);
    number_in_range(delay_seconds, 0, 9_999_999)?;
    let delay_millis = delay_seconds * 1000;

    let key = format!("{}{}", self.options.ns, qname);
    let message = serde_json::to_string(message)?;
    if queue.maxsize != -1 && message.len() as i64 > queue.maxsize {
        return Err(RsmqError::MessageTooLong);
    }

    let queue_uid = queue
        .uid
        .expect("get_queue(.., true) always generates a message id");
    let queue_key = format!("{}:Q", key);

    let mut pipeline = pipe();
    let mut commands = pipeline
        .atomic()
        // Score = moment (ms) at which the message becomes visible.
        .cmd("ZADD")
        .arg(&key)
        .arg(queue.ts + delay_millis)
        .arg(&queue_uid)
        .cmd("HSET")
        .arg(&queue_key)
        .arg(&queue_uid)
        .arg(message)
        .cmd("HINCRBY")
        .arg(&queue_key)
        .arg("totalsent")
        .arg(1_u64);
    if self.options.realtime {
        commands = commands.cmd("ZCARD").arg(&key);
    }

    let results: Vec<u64> = commands.query_async(&mut *conn).await?;

    if self.options.realtime {
        // results[3] is the ZCARD reply appended above.
        cmd("PUBLISH")
            .arg(format!("{}rt:{}", self.options.ns, qname))
            .arg(results[3])
            .query_async::<_, ()>(&mut *conn)
            .await?;
    }

    Ok(queue_uid)
}
/// Deletes message `id` from queue `qname`.
///
/// Removes the id from the sorted set and deletes the hash fields `id`,
/// `id:rc` and `id:fr` in one transaction. Returns `true` only when the id
/// was removed from the sorted set and at least one hash field was deleted.
///
/// # Errors
/// Any pool or Redis error.
pub async fn delete_message(&mut self, qname: &str, id: &str) -> RsmqResult<bool> {
    let mut conn = self.pool.get().await?;

    let messages_key = format!("{}{}", self.options.ns, qname);
    let queue_hash = format!("{}:Q", &messages_key);

    let mut pipeline = pipe();
    pipeline.atomic();
    pipeline.cmd("ZREM").arg(&messages_key).arg(id);
    pipeline
        .cmd("HDEL")
        .arg(queue_hash)
        .arg(id)
        .arg(format!("{}:rc", id))
        .arg(format!("{}:fr", id));

    let (removed_ids, removed_fields): (u16, u16) = pipeline.query_async(&mut *conn).await?;

    Ok(removed_ids == 1 && removed_fields > 0)
}
pub async fn change_message_visibility(
&mut self,
qname: &str,
message_id: &str,
seconds_hidden: u64,
) -> RsmqResult<()> {
let queue = self.get_queue(qname, false).await?;
let mut conn = self.pool.get().await?;
number_in_range(seconds_hidden, 0, 9_999_999)?;
CHANGE_MESSAGE_VISIVILITY
.key(format!("{}{}", self.options.ns, qname))
.key(message_id)
.key(queue.ts + seconds_hidden * 1000)
.invoke_async::<_, bool>(&mut *conn)
.await?;
Ok(())
}
/// Runs the pop script against queue `qname` and returns the message it
/// yields, or `None` when the script reports that nothing is available.
///
/// `sent` is the first ten characters of the id decoded as base 36; it is
/// `0` when the id is shorter than that or the prefix does not parse.
///
/// # Errors
/// `RsmqError::QueueNotFound` (via `get_queue`), JSON deserialisation
/// errors, or any pool / Redis error.
pub async fn pop_message(&mut self, qname: &str) -> RsmqResult<Option<RsmqMessage<T>>> {
    let queue = self.get_queue(qname, false).await?;
    let mut conn = self.pool.get().await?;

    let (found, id, body, rc, fr): (bool, String, String, u64, u64) = POP_MESSAGE
        .key(format!("{}{}", self.options.ns, qname))
        .key(queue.ts)
        .invoke_async(&mut *conn)
        .await?;

    if !found {
        return Ok(None);
    }

    // `get` instead of indexing: an unexpectedly short id must not panic.
    let sent = id
        .get(..10)
        .and_then(|prefix| u64::from_str_radix(prefix, 36).ok())
        .unwrap_or(0);
    let message = serde_json::from_str::<T>(&body)?;

    Ok(Some(RsmqMessage {
        id,
        message,
        rc,
        fr,
        sent,
    }))
}
/// Runs the receive script against queue `qname` and returns the message it
/// yields, or `None` when the script reports that nothing is available.
///
/// * `seconds_hidden` — seconds added to the current server time and passed
///   to the script as the new score; falls back to the queue's `vt`. Must be
///   in `0..=9_999_999`.
///
/// `sent` is the first ten characters of the id decoded as base 36; it is
/// `0` when the id is shorter than that or the prefix does not parse.
///
/// # Errors
/// `RsmqError::QueueNotFound` (via `get_queue`), the range error from
/// `number_in_range`, JSON deserialisation errors, or any pool / Redis error.
pub async fn receive_message(
    &mut self,
    qname: &str,
    seconds_hidden: Option<u64>,
) -> RsmqResult<Option<RsmqMessage<T>>> {
    let queue = self.get_queue(qname, false).await?;
    let mut conn = self.pool.get().await?;

    // Range-check in seconds before converting, so an oversized value can
    // neither overflow the multiplication nor wrap past the check.
    let hidden_seconds = seconds_hidden.unwrap_or(queue.vt);
    number_in_range(hidden_seconds, 0, 9_999_999)?;
    let hidden_millis = hidden_seconds * 1000;

    let (found, id, body, rc, fr): (bool, String, String, u64, u64) = RECEIVE_MESSAGE
        .key(format!("{}{}", self.options.ns, qname))
        .key(queue.ts)
        .key(queue.ts + hidden_millis)
        .invoke_async(&mut *conn)
        .await?;

    if !found {
        return Ok(None);
    }

    // `get` instead of indexing: an unexpectedly short id must not panic.
    let sent = id
        .get(..10)
        .and_then(|prefix| u64::from_str_radix(prefix, 36).ok())
        .unwrap_or(0);
    let message = serde_json::from_str::<T>(&body)?;

    Ok(Some(RsmqMessage {
        id,
        message,
        rc,
        fr,
        sent,
    }))
}
/// Updates whichever of `vt`, `delay` and `maxsize` are supplied, stamps
/// `modified` with the current server time, and returns the refreshed
/// attributes.
///
/// * `seconds_hidden` — new `vt`, in `0..=9_999_999`.
/// * `delay` — in `0..=9_999_999`.
/// * `maxsize` — `-1` or in `1024..=65536`.
///
/// # Errors
/// `RsmqError::QueueNotFound` (via `get_queue`), the range error from
/// `number_in_range`, or any pool / Redis error. Nothing is written when a
/// range check fails, because the transaction is only sent afterwards.
pub async fn set_queue_attributes(
    &mut self,
    qname: &str,
    seconds_hidden: Option<u64>,
    delay: Option<u64>,
    maxsize: Option<i64>,
) -> RsmqResult<RsmqQueueAttributes> {
    self.get_queue(qname, false).await?;

    // Inner scope: the connection is released before the attributes are
    // re-read below.
    {
        let mut conn = self.pool.get().await?;
        let queue_hash = format!("{}{}:Q", self.options.ns, qname);
        let (now_seconds, _now_micros): (u64, u64) =
            cmd("TIME").query_async(&mut *conn).await?;

        let mut pipeline = pipe();
        pipeline
            .atomic()
            .cmd("HSET")
            .arg(&queue_hash)
            .arg("modified")
            .arg(now_seconds);

        if let Some(vt) = seconds_hidden {
            number_in_range(vt, 0, 9_999_999)?;
            pipeline.cmd("HSET").arg(&queue_hash).arg("vt").arg(vt);
        }

        if let Some(new_delay) = delay {
            number_in_range(new_delay, 0, 9_999_999)?;
            pipeline
                .cmd("HSET")
                .arg(&queue_hash)
                .arg("delay")
                .arg(new_delay);
        }

        if let Some(new_maxsize) = maxsize {
            if new_maxsize != -1 {
                number_in_range(new_maxsize, 1024, 65536)?;
            }
            pipeline
                .cmd("HSET")
                .arg(&queue_hash)
                .arg("maxsize")
                .arg(new_maxsize);
        }

        pipeline.query_async::<_, ()>(&mut *conn).await?;
    }

    self.get_queue_attributes(qname).await
}
/// Reads the queue's stored settings and counters, plus its current total
/// and hidden message counts.
///
/// # Errors
/// `RsmqError::QueueNotFound` when none of the queue's hash fields exist,
/// or any pool / Redis error.
pub async fn get_queue_attributes(&mut self, qname: &str) -> RsmqResult<RsmqQueueAttributes> {
    let mut conn = self.pool.get().await?;
    let key = format!("{}{}", self.options.ns, qname);
    let time: (u64, u64) = cmd("TIME").query_async(&mut *conn).await?;

    // HMGET always answers with one entry per requested field, nil for the
    // missing ones, and `maxsize` may legitimately be -1. Decoding into
    // `Option<i64>` handles both; `u64` would fail on either.
    let (fields, msgs, hiddenmsgs): (Vec<Option<i64>>, u64, u64) = pipe()
        .cmd("HMGET")
        .arg(format!("{}:Q", key))
        .arg("vt")
        .arg("delay")
        .arg("maxsize")
        .arg("totalrecv")
        .arg("totalsent")
        .arg("created")
        .arg("modified")
        .cmd("ZCARD")
        .arg(&key)
        .cmd("ZCOUNT")
        .arg(&key)
        .arg(time.0 * 1000)
        .arg("+inf")
        .query_async(&mut *conn)
        .await?;

    // No field present at all means the hash, and so the queue, is absent.
    if fields.iter().all(Option::is_none) {
        return Err(RsmqError::QueueNotFound);
    }

    let signed = |index: usize| fields.get(index).copied().flatten().unwrap_or(0);
    // Negative values are clamped to 0 before the (then lossless) cast.
    let unsigned = |index: usize| signed(index).max(0) as u64;

    Ok(RsmqQueueAttributes {
        vt: unsigned(0),
        delay: unsigned(1),
        maxsize: signed(2),
        totalrecv: unsigned(3),
        totalsent: unsigned(4),
        created: unsigned(5),
        modified: unsigned(6),
        msgs,
        hiddenmsgs,
    })
}
/// Loads `vt`, `delay` and `maxsize` of queue `qname` together with the
/// Redis server time.
///
/// When `uid` is `true` a fresh message id is generated as well: the server
/// time in microseconds rendered in base 36, followed by 22 random
/// alphanumeric characters. With current timestamps that prefix is ten
/// characters long, which is the width `pop_message` and `receive_message`
/// decode.
///
/// # Errors
/// `RsmqError::QueueNotFound` when any of the three fields is missing, or
/// any pool / Redis error.
///
/// # Panics
/// When a stored field is present but is not a valid integer.
async fn get_queue(&mut self, qname: &str, uid: bool) -> RsmqResult<QueueDescriptor> {
    let mut conn = self.pool.get().await?;

    // HMGET answers nil for absent fields. Decoding into `Option<String>`
    // lets a missing queue reach the `QueueNotFound` branch below instead
    // of failing as a type-conversion error.
    let (fields, (time_seconds, time_microseconds)): (Vec<Option<String>>, (u64, u64)) =
        pipe()
            .cmd("HMGET")
            .arg(format!("{}{}:Q", self.options.ns, qname))
            .arg("vt")
            .arg("delay")
            .arg("maxsize")
            .cmd("TIME")
            .query_async(&mut *conn)
            .await?;

    let (vt, delay, maxsize) = match (fields.get(0), fields.get(1), fields.get(2)) {
        (Some(Some(vt)), Some(Some(delay)), Some(Some(maxsize))) => (vt, delay, maxsize),
        _ => return Err(RsmqError::QueueNotFound),
    };

    let time_micros = time_seconds * 1_000_000 + time_microseconds;
    let ts = time_micros / 1000;

    let quid = if uid {
        // Microsecond resolution keeps the timestamp prefix at ten base-36
        // digits; a millisecond prefix is only eight, so decoding the first
        // ten characters would swallow two random ones.
        Some(radix_36(time_micros).to_string() + &Rsmq::<T>::make_id(22))
    } else {
        None
    };

    Ok(QueueDescriptor {
        vt: vt.parse().expect("cannot parse queue vt"),
        delay: delay.parse().expect("cannot parse queue delay"),
        maxsize: maxsize.parse().expect("cannot parse queue maxsize"),
        ts,
        uid: quid,
    })
}
/// Returns `len` characters drawn at random from `A-Z`, `a-z` and `0-9`
/// using the thread-local RNG.
fn make_id(len: usize) -> String {
    const ALPHABET: &str = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

    let mut rng = rand::thread_rng();
    (0..len)
        .map(|_| {
            ALPHABET
                .chars()
                .choose(&mut rng)
                .expect("failed to choose character")
        })
        .collect()
}
}
/// Succeeds when `min <= value <= max`; otherwise returns
/// `RsmqError::InvalidValue` carrying the value and both bounds rendered as
/// text.
fn number_in_range<T: std::cmp::PartialOrd + std::fmt::Display>(
    value: T,
    min: T,
    max: T,
) -> RsmqResult<()> {
    let allowed = min..=max;
    if allowed.contains(&value) {
        return Ok(());
    }

    Err(RsmqError::InvalidValue(
        value.to_string(),
        allowed.start().to_string(),
        allowed.end().to_string(),
    ))
}
/// Checks that a queue name is 1 to 160 bytes long and consists solely of
/// ASCII letters, ASCII digits, `-` and `_`.
///
/// # Errors
/// `RsmqError::InvalidFormat` carrying the offending name.
fn valid_name_format(name: &str) -> RsmqResult<()> {
    // Either length violation is enough to reject (the two can never hold
    // at once, so `&&` here would accept everything).
    let length_ok = !name.is_empty() && name.len() <= 160;
    // The character check must feed the result; computing it and dropping
    // the boolean validates nothing.
    let charset_ok = name
        .chars()
        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_');

    if length_ok && charset_ok {
        Ok(())
    } else {
        Err(RsmqError::InvalidFormat(name.to_string()))
    }
}