use async_trait::async_trait;
use middleware::Chain;
use rand::{Rng, RngCore};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sha2::{Digest, Sha256};
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
pub mod periodic;
mod middleware;
mod processor;
mod redis;
mod scheduled;
mod stats;
pub use crate::redis::{
with_custom_namespace, RedisConnection, RedisConnectionManager, RedisError, RedisPool,
};
pub use middleware::{ChainIter, ServerMiddleware, ServerResult};
pub use processor::{Processor, WorkFetcher};
pub use scheduled::Scheduled;
pub use stats::{Counter, StatsPublisher};
pub fn opts() -> EnqueueOpts {
EnqueueOpts {
queue: "default".into(),
retry: true,
unique_for: None,
}
}
pub struct EnqueueOpts {
queue: String,
retry: bool,
unique_for: Option<std::time::Duration>,
}
impl EnqueueOpts {
pub fn queue<S: Into<String>>(self, queue: S) -> Self {
Self {
queue: queue.into(),
..self
}
}
pub fn retry(self, retry: bool) -> Self {
Self { retry, ..self }
}
pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
Self {
unique_for: Some(unique_for),
..self
}
}
fn create_job(
&self,
class: String,
args: impl serde::Serialize,
) -> Result<Job, Box<dyn std::error::Error>> {
let args = serde_json::to_value(args)?;
let args = if args.is_array() {
args
} else {
JsonValue::Array(vec![args])
};
Ok(Job {
queue: self.queue.clone(),
class,
jid: new_jid(),
created_at: chrono::Utc::now().timestamp() as f64,
enqueued_at: None,
retry: self.retry,
args,
error_message: None,
failed_at: None,
retry_count: None,
retried_at: None,
unique_for: self.unique_for,
})
}
pub async fn perform_async(
self,
redis: &mut RedisPool,
class: String,
args: impl serde::Serialize,
) -> Result<(), Box<dyn std::error::Error>> {
let job = self.create_job(class, args)?;
UnitOfWork::from_job(job).enqueue(redis).await?;
Ok(())
}
pub async fn perform_in(
&self,
redis: &mut RedisPool,
class: String,
duration: std::time::Duration,
args: impl serde::Serialize,
) -> Result<(), Box<dyn std::error::Error>> {
let job = self.create_job(class, args)?;
UnitOfWork::from_job(job).schedule(redis, duration).await?;
Ok(())
}
}
pub async fn perform_async(
redis: &mut RedisPool,
class: String,
queue: String,
args: impl serde::Serialize,
) -> Result<(), Box<dyn std::error::Error>> {
opts().queue(queue).perform_async(redis, class, args).await
}
pub async fn perform_in(
redis: &mut RedisPool,
duration: std::time::Duration,
class: String,
queue: String,
args: impl serde::Serialize,
) -> Result<(), Box<dyn std::error::Error>> {
opts()
.queue(queue)
.perform_in(redis, class, duration, args)
.await
}
fn new_jid() -> String {
let mut bytes = [0u8; 12];
rand::thread_rng().fill_bytes(&mut bytes);
hex::encode(bytes)
}
pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
queue: String,
retry: bool,
args: PhantomData<Args>,
worker: PhantomData<W>,
unique_for: Option<std::time::Duration>,
}
impl<Args, W> WorkerOpts<Args, W>
where
W: Worker<Args>,
{
pub fn new() -> Self {
Self {
queue: "default".into(),
retry: true,
args: PhantomData,
worker: PhantomData,
unique_for: None,
}
}
pub fn retry(self, retry: bool) -> Self {
Self { retry, ..self }
}
pub fn queue<S: Into<String>>(self, queue: S) -> Self {
Self {
queue: queue.into(),
..self
}
}
pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
Self {
unique_for: Some(unique_for),
..self
}
}
#[allow(clippy::wrong_self_convention)]
fn into_opts(&self) -> EnqueueOpts {
self.into()
}
pub async fn perform_async(
&self,
redis: &mut RedisPool,
args: impl serde::Serialize + Send + 'static,
) -> Result<(), Box<dyn std::error::Error>> {
self.into_opts()
.perform_async(redis, W::class_name(), args)
.await
}
pub async fn perform_in(
&self,
redis: &mut RedisPool,
duration: std::time::Duration,
args: impl serde::Serialize + Send + 'static,
) -> Result<(), Box<dyn std::error::Error>> {
self.into_opts()
.perform_in(redis, W::class_name(), duration, args)
.await
}
}
impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
fn from(opts: &WorkerOpts<Args, W>) -> Self {
Self {
retry: opts.retry,
queue: opts.queue.clone(),
unique_for: opts.unique_for,
}
}
}
impl<Args, W: Worker<Args>> Default for WorkerOpts<Args, W> {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
pub trait Worker<Args>: Send + Sync {
fn disable_argument_coercion(&self) -> bool {
false
}
fn opts() -> WorkerOpts<Args, Self>
where
Self: Sized,
{
WorkerOpts::new()
}
fn max_retries(&self) -> usize {
25
}
fn class_name() -> String
where
Self: Sized,
{
use heck::ToUpperCamelCase;
let type_name = std::any::type_name::<Self>();
let name = type_name.split("::").last().unwrap_or(type_name);
name.to_upper_camel_case()
}
async fn perform_async(
redis: &mut RedisPool,
args: Args,
) -> Result<(), Box<dyn std::error::Error>>
where
Self: Sized,
Args: Send + Sync + serde::Serialize + 'static,
{
Self::opts().perform_async(redis, args).await
}
async fn perform_in(
redis: &mut RedisPool,
duration: std::time::Duration,
args: Args,
) -> Result<(), Box<dyn std::error::Error>>
where
Self: Sized,
Args: Send + Sync + serde::Serialize + 'static,
{
Self::opts().perform_in(redis, duration, args).await
}
async fn perform(&self, args: Args) -> Result<(), Box<dyn std::error::Error>>;
}
#[derive(Clone)]
pub struct WorkerRef {
#[allow(clippy::type_complexity)]
work_fn: Arc<
Box<
dyn Fn(
JsonValue,
)
-> Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error>>> + Send>>
+ Send
+ Sync,
>,
>,
max_retries: usize,
}
async fn invoke_worker<Args, W>(args: JsonValue, worker: Arc<W>) -> ServerResult
where
Args: Send + Sync + 'static,
W: Worker<Args> + 'static,
for<'de> Args: Deserialize<'de>,
{
let args = if worker.disable_argument_coercion() {
args
} else {
if std::any::TypeId::of::<Args>() == std::any::TypeId::of::<()>() {
JsonValue::Null
} else {
match args {
JsonValue::Array(mut arr) if arr.len() == 1 => {
arr.pop().expect("value change after size check")
}
_ => args,
}
}
};
let args: Args = serde_json::from_value(args)?;
worker.perform(args).await
}
impl WorkerRef {
pub(crate) fn wrap<Args, W>(worker: Arc<W>) -> Self
where
Args: Send + Sync + 'static,
W: Worker<Args> + 'static,
for<'de> Args: Deserialize<'de>,
{
Self {
work_fn: Arc::new(Box::new({
let worker = worker.clone();
move |args: JsonValue| {
let worker = worker.clone();
Box::pin(async move { invoke_worker(args, worker).await })
}
})),
max_retries: worker.max_retries(),
}
}
pub fn max_retries(&self) -> usize {
self.max_retries
}
pub async fn call(&self, args: JsonValue) -> Result<(), Box<dyn std::error::Error>> {
(Arc::clone(&self.work_fn))(args).await
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
pub queue: String,
pub args: JsonValue,
pub retry: bool,
pub class: String,
pub jid: String,
pub created_at: f64,
pub enqueued_at: Option<f64>,
pub failed_at: Option<f64>,
pub error_message: Option<String>,
pub retry_count: Option<usize>,
pub retried_at: Option<f64>,
#[serde(skip)]
pub unique_for: Option<std::time::Duration>,
}
#[derive(Debug)]
pub struct UnitOfWork {
queue: String,
job: Job,
}
impl UnitOfWork {
pub fn from_job(job: Job) -> Self {
UnitOfWork {
queue: format!("queue:{}", &job.queue),
job,
}
}
pub fn from_job_string(job_str: String) -> Result<Self, Box<dyn std::error::Error>> {
let job: Job = serde_json::from_str(&job_str)?;
Ok(Self::from_job(job))
}
pub async fn enqueue(&self, redis: &mut RedisPool) -> Result<(), Box<dyn std::error::Error>> {
let mut redis = redis.get().await?;
self.enqueue_direct(&mut *redis).await
}
async fn enqueue_direct(
&self,
redis: &mut RedisConnection,
) -> Result<(), Box<dyn std::error::Error>> {
let mut job = self.job.clone();
job.enqueued_at = Some(chrono::Utc::now().timestamp() as f64);
if let Some(ref duration) = job.unique_for {
let args_as_json_string: String = serde_json::to_string(&job.args)?;
let args_hash = format!("{:x}", Sha256::digest(&args_as_json_string));
let redis_key = format!(
"sidekiq:unique:{}:{}:{}",
&job.queue, &job.class, &args_hash
);
if let redis::RedisValue::Nil = redis
.set_nx_ex(redis_key, "".into(), duration.as_secs() as usize)
.await?
{
return Ok(());
}
}
redis.sadd("queues".to_string(), job.queue.clone()).await?;
redis
.lpush(self.queue.clone(), serde_json::to_string(&job)?)
.await?;
Ok(())
}
pub async fn reenqueue(
&mut self,
redis: &mut RedisPool,
) -> Result<(), Box<dyn std::error::Error>> {
if let Some(retry_count) = self.job.retry_count {
redis
.get()
.await?
.zadd(
"retry".to_string(),
serde_json::to_string(&self.job)?,
Self::retry_job_at(retry_count).timestamp(),
)
.await?;
}
Ok(())
}
fn retry_job_at(count: usize) -> chrono::DateTime<chrono::Utc> {
let seconds_to_delay =
count.pow(4) + 15 + (rand::thread_rng().gen_range(0..30) * (count + 1));
chrono::Utc::now() + chrono::Duration::seconds(seconds_to_delay as i64)
}
pub async fn schedule(
&mut self,
redis: &mut RedisPool,
duration: std::time::Duration,
) -> Result<(), Box<dyn std::error::Error>> {
let enqueue_at = chrono::Utc::now() + chrono::Duration::from_std(duration)?;
redis
.get()
.await?
.zadd(
"schedule".to_string(),
serde_json::to_string(&self.job)?,
enqueue_at.timestamp(),
)
.await?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
mod my {
pub mod cool {
pub mod workers {
use super::super::super::super::*;
pub struct TestModuleWorker;
#[async_trait]
impl Worker<()> for TestModuleWorker {
async fn perform(&self, _args: ()) -> ServerResult {
Ok(())
}
}
pub struct TestCustomClassNameWorker;
#[async_trait]
impl Worker<()> for TestCustomClassNameWorker {
async fn perform(&self, _args: ()) -> ServerResult {
Ok(())
}
fn class_name() -> String
where
Self: Sized,
{
"My::Cool::Workers::TestCustomClassNameWorker".to_string()
}
}
}
}
}
#[tokio::test]
async fn ignores_modules_in_ruby_worker_name() {
assert_eq!(
my::cool::workers::TestModuleWorker::class_name(),
"TestModuleWorker".to_string()
);
}
#[tokio::test]
async fn supports_custom_class_name_for_workers() {
assert_eq!(
my::cool::workers::TestCustomClassNameWorker::class_name(),
"My::Cool::Workers::TestCustomClassNameWorker".to_string()
);
}
#[derive(Deserialize, Serialize, Debug)]
struct TestArg {
name: String,
age: i32,
}
struct TestGenericWorker;
#[async_trait]
impl Worker<TestArg> for TestGenericWorker {
async fn perform(&self, _args: TestArg) -> ServerResult {
Ok(())
}
}
struct TestMultiArgWorker;
#[async_trait]
impl Worker<(TestArg, TestArg)> for TestMultiArgWorker {
async fn perform(&self, _args: (TestArg, TestArg)) -> ServerResult {
Ok(())
}
}
struct TestTupleArgWorker;
#[async_trait]
impl Worker<(TestArg,)> for TestTupleArgWorker {
fn disable_argument_coercion(&self) -> bool {
true
}
async fn perform(&self, _args: (TestArg,)) -> ServerResult {
Ok(())
}
}
struct TestVecArgWorker;
#[async_trait]
impl Worker<Vec<TestArg>> for TestVecArgWorker {
fn disable_argument_coercion(&self) -> bool {
true
}
async fn perform(&self, _args: Vec<TestArg>) -> ServerResult {
Ok(())
}
}
#[tokio::test]
async fn can_have_a_vec_with_one_or_more_items() {
let worker = Arc::new(TestVecArgWorker);
let wrap = Arc::new(WorkerRef::wrap(worker));
let wrap = wrap.clone();
let arg = serde_json::to_value(vec![TestArg {
name: "test A".into(),
age: 1337,
}])
.unwrap();
wrap.call(arg).await.unwrap();
let worker = Arc::new(TestVecArgWorker);
let wrap = Arc::new(WorkerRef::wrap(worker));
let wrap = wrap.clone();
let arg = serde_json::to_value(vec![
TestArg {
name: "test A".into(),
age: 1337,
},
TestArg {
name: "test A".into(),
age: 1337,
},
])
.unwrap();
wrap.call(arg).await.unwrap();
}
#[tokio::test]
async fn can_have_multiple_arguments() {
let worker = Arc::new(TestMultiArgWorker);
let wrap = Arc::new(WorkerRef::wrap(worker));
let wrap = wrap.clone();
let arg = serde_json::to_value((
TestArg {
name: "test A".into(),
age: 1337,
},
TestArg {
name: "test B".into(),
age: 1336,
},
))
.unwrap();
wrap.call(arg).await.unwrap();
}
#[tokio::test]
async fn can_have_a_single_tuple_argument() {
let worker = Arc::new(TestTupleArgWorker);
let wrap = Arc::new(WorkerRef::wrap(worker));
let wrap = wrap.clone();
let arg = serde_json::to_value((TestArg {
name: "test".into(),
age: 1337,
},))
.unwrap();
wrap.call(arg).await.unwrap();
}
#[tokio::test]
async fn can_have_a_single_argument() {
let worker = Arc::new(TestGenericWorker);
let wrap = Arc::new(WorkerRef::wrap(worker));
let wrap = wrap.clone();
let arg = serde_json::to_value(TestArg {
name: "test".into(),
age: 1337,
})
.unwrap();
wrap.call(arg).await.unwrap();
}
}