1use async_trait::async_trait;
2use middleware::Chain;
3use rand::{Rng, RngCore};
4use serde::{
5 de::{self, Deserializer, Visitor},
6 Deserialize, Serialize, Serializer,
7};
8use serde_json::Value as JsonValue;
9use sha2::{Digest, Sha256};
10use std::future::Future;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::sync::Arc;
14
15pub mod periodic;
16
17mod middleware;
18mod processor;
19mod redis;
20mod scheduled;
21mod stats;
22
23pub use crate::redis::{
25 with_custom_namespace, RedisConnection, RedisConnectionManager, RedisError, RedisPool,
26};
27pub use ::redis as redis_rs;
28pub use middleware::{ChainIter, ServerMiddleware};
29pub use processor::{BalanceStrategy, Processor, ProcessorConfig, QueueConfig, WorkFetcher};
30pub use scheduled::Scheduled;
31pub use stats::{Counter, StatsPublisher};
32
33#[derive(thiserror::Error, Debug)]
34pub enum Error {
35 #[error("{0}")]
36 Message(String),
37
38 #[error(transparent)]
39 Json(#[from] serde_json::Error),
40
41 #[error(transparent)]
42 CronClock(#[from] cron_clock::error::Error),
43
44 #[error(transparent)]
45 BB8(#[from] bb8::RunError<redis::RedisError>),
46
47 #[error(transparent)]
48 ChronoRange(#[from] chrono::OutOfRangeError),
49
50 #[error(transparent)]
51 Redis(#[from] redis::RedisError),
52
53 #[error(transparent)]
54 Any(#[from] Box<dyn std::error::Error + Send + Sync>),
55}
56
57pub type Result<T> = std::result::Result<T, Error>;
58
59#[must_use]
60pub fn opts() -> EnqueueOpts {
61 EnqueueOpts {
62 queue: "default".into(),
63 retry: RetryOpts::Yes,
64 unique_for: None,
65 retry_queue: None,
66 }
67}
68
69pub struct EnqueueOpts {
70 queue: String,
71 retry: RetryOpts,
72 unique_for: Option<std::time::Duration>,
73 retry_queue: Option<String>,
74}
75
76impl EnqueueOpts {
77 #[must_use]
78 pub fn queue<S: Into<String>>(self, queue: S) -> Self {
79 Self {
80 queue: queue.into(),
81 ..self
82 }
83 }
84
85 #[must_use]
86 pub fn retry<RO>(self, retry: RO) -> Self
87 where
88 RO: Into<RetryOpts>,
89 {
90 Self {
91 retry: retry.into(),
92 ..self
93 }
94 }
95
96 #[must_use]
97 pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
98 Self {
99 unique_for: Some(unique_for),
100 ..self
101 }
102 }
103
104 #[must_use]
105 pub fn retry_queue(self, retry_queue: String) -> Self {
106 Self {
107 retry_queue: Some(retry_queue),
108 ..self
109 }
110 }
111
112 pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result<Job> {
113 let args = serde_json::to_value(args)?;
114
115 let args = if args.is_array() {
117 args
118 } else {
119 JsonValue::Array(vec![args])
120 };
121
122 Ok(Job {
123 queue: self.queue.clone(),
124 class,
125 jid: new_jid(),
126 created_at: chrono::Utc::now().timestamp() as f64,
127 enqueued_at: None,
128 retry: self.retry.clone(),
129 args,
130
131 error_message: None,
133 error_class: None,
134 failed_at: None,
135 retry_count: None,
136 retried_at: None,
137
138 retry_queue: self.retry_queue.clone(),
140 unique_for: self.unique_for,
141 })
142 }
143
144 pub async fn perform_async(
145 self,
146 redis: &RedisPool,
147 class: String,
148 args: impl serde::Serialize,
149 ) -> Result<()> {
150 let job = self.create_job(class, args)?;
151 UnitOfWork::from_job(job).enqueue(redis).await?;
152 Ok(())
153 }
154
155 pub async fn perform_in(
156 &self,
157 redis: &RedisPool,
158 class: String,
159 duration: std::time::Duration,
160 args: impl serde::Serialize,
161 ) -> Result<()> {
162 let job = self.create_job(class, args)?;
163 UnitOfWork::from_job(job).schedule(redis, duration).await?;
164 Ok(())
165 }
166}
167
168pub async fn perform_async(
171 redis: &RedisPool,
172 class: String,
173 queue: String,
174 args: impl serde::Serialize,
175) -> Result<()> {
176 opts().queue(queue).perform_async(redis, class, args).await
177}
178
179pub async fn perform_in(
182 redis: &RedisPool,
183 duration: std::time::Duration,
184 class: String,
185 queue: String,
186 args: impl serde::Serialize,
187) -> Result<()> {
188 opts()
189 .queue(queue)
190 .perform_in(redis, class, duration, args)
191 .await
192}
193
194fn new_jid() -> String {
195 let mut bytes = [0u8; 12];
196 rand::thread_rng().fill_bytes(&mut bytes);
197 hex::encode(bytes)
198}
199
200pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
201 queue: String,
202 retry: RetryOpts,
203 args: PhantomData<Args>,
204 worker: PhantomData<W>,
205 unique_for: Option<std::time::Duration>,
206 retry_queue: Option<String>,
207}
208
209impl<Args, W> WorkerOpts<Args, W>
210where
211 W: Worker<Args>,
212{
213 #[must_use]
214 pub fn new() -> Self {
215 Self {
216 queue: "default".into(),
217 retry: RetryOpts::Yes,
218 args: PhantomData,
219 worker: PhantomData,
220 unique_for: None,
221 retry_queue: None,
222 }
223 }
224
225 #[must_use]
226 pub fn retry<RO>(self, retry: RO) -> Self
227 where
228 RO: Into<RetryOpts>,
229 {
230 Self {
231 retry: retry.into(),
232 ..self
233 }
234 }
235
236 #[must_use]
237 pub fn retry_queue<S: Into<String>>(self, retry_queue: S) -> Self {
238 Self {
239 retry_queue: Some(retry_queue.into()),
240 ..self
241 }
242 }
243
244 #[must_use]
245 pub fn queue<S: Into<String>>(self, queue: S) -> Self {
246 Self {
247 queue: queue.into(),
248 ..self
249 }
250 }
251
252 #[must_use]
253 pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
254 Self {
255 unique_for: Some(unique_for),
256 ..self
257 }
258 }
259
260 #[allow(clippy::wrong_self_convention)]
261 pub fn into_opts(&self) -> EnqueueOpts {
262 self.into()
263 }
264
265 pub async fn perform_async(
266 &self,
267 redis: &RedisPool,
268 args: impl serde::Serialize + Send + 'static,
269 ) -> Result<()> {
270 self.into_opts()
271 .perform_async(redis, W::class_name(), args)
272 .await
273 }
274
275 pub async fn perform_in(
276 &self,
277 redis: &RedisPool,
278 duration: std::time::Duration,
279 args: impl serde::Serialize + Send + 'static,
280 ) -> Result<()> {
281 self.into_opts()
282 .perform_in(redis, W::class_name(), duration, args)
283 .await
284 }
285}
286
287impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
288 fn from(opts: &WorkerOpts<Args, W>) -> Self {
289 Self {
290 retry: opts.retry.clone(),
291 queue: opts.queue.clone(),
292 unique_for: opts.unique_for,
293 retry_queue: opts.retry_queue.clone(),
294 }
295 }
296}
297
298impl<Args, W: Worker<Args>> Default for WorkerOpts<Args, W> {
299 fn default() -> Self {
300 Self::new()
301 }
302}
303
304#[async_trait]
305pub trait Worker<Args>: Send + Sync {
306 fn disable_argument_coercion(&self) -> bool {
311 false
312 }
313
314 #[must_use]
315 fn opts() -> WorkerOpts<Args, Self>
316 where
317 Self: Sized,
318 {
319 WorkerOpts::new()
320 }
321
322 fn max_retries(&self) -> usize {
325 25
326 }
327
328 #[must_use]
331 fn class_name() -> String
332 where
333 Self: Sized,
334 {
335 use convert_case::{Case, Casing};
336
337 let type_name = std::any::type_name::<Self>();
338 let name = type_name.split("::").last().unwrap_or(type_name);
339 name.to_case(Case::UpperCamel)
340 }
341
342 async fn perform_async(redis: &RedisPool, args: Args) -> Result<()>
343 where
344 Self: Sized,
345 Args: Send + Sync + serde::Serialize + 'static,
346 {
347 Self::opts().perform_async(redis, args).await
348 }
349
350 async fn perform_in(redis: &RedisPool, duration: std::time::Duration, args: Args) -> Result<()>
351 where
352 Self: Sized,
353 Args: Send + Sync + serde::Serialize + 'static,
354 {
355 Self::opts().perform_in(redis, duration, args).await
356 }
357
358 async fn perform(&self, args: Args) -> Result<()>;
359}
360
361#[derive(Clone)]
366pub struct WorkerRef {
367 #[allow(clippy::type_complexity)]
368 work_fn: Arc<
369 Box<dyn Fn(JsonValue) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>,
370 >,
371 max_retries: usize,
372}
373
374async fn invoke_worker<Args, W>(args: JsonValue, worker: Arc<W>) -> Result<()>
375where
376 Args: Send + Sync + 'static,
377 W: Worker<Args> + 'static,
378 for<'de> Args: Deserialize<'de>,
379{
380 let args = if worker.disable_argument_coercion() {
381 args
382 } else {
383 if std::any::TypeId::of::<Args>() == std::any::TypeId::of::<()>() {
385 JsonValue::Null
386 } else {
387 match args {
391 JsonValue::Array(mut arr) if arr.len() == 1 => {
392 arr.pop().expect("value change after size check")
393 }
394 _ => args,
395 }
396 }
397 };
398
399 let args: Args = serde_json::from_value(args)?;
400 worker.perform(args).await
401}
402
403impl WorkerRef {
404 pub(crate) fn wrap<Args, W>(worker: Arc<W>) -> Self
405 where
406 Args: Send + Sync + 'static,
407 W: Worker<Args> + 'static,
408 for<'de> Args: Deserialize<'de>,
409 {
410 Self {
411 work_fn: Arc::new(Box::new({
412 let worker = worker.clone();
413 move |args: JsonValue| {
414 let worker = worker.clone();
415 Box::pin(async move { invoke_worker(args, worker).await })
416 }
417 })),
418 max_retries: worker.max_retries(),
419 }
420 }
421
422 #[must_use]
423 pub fn max_retries(&self) -> usize {
424 self.max_retries
425 }
426
427 pub async fn call(&self, args: JsonValue) -> Result<()> {
428 (Arc::clone(&self.work_fn))(args).await
429 }
430}
431
432#[derive(Clone, Debug, PartialEq)]
433pub enum RetryOpts {
434 Yes,
435 Never,
436 Max(usize),
437}
438
439impl Serialize for RetryOpts {
440 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
441 where
442 S: Serializer,
443 {
444 match *self {
445 RetryOpts::Yes => serializer.serialize_bool(true),
446 RetryOpts::Never => serializer.serialize_bool(false),
447 RetryOpts::Max(value) => serializer.serialize_u64(value as u64),
448 }
449 }
450}
451
452impl<'de> Deserialize<'de> for RetryOpts {
453 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
454 where
455 D: Deserializer<'de>,
456 {
457 struct RetryOptsVisitor;
458
459 impl Visitor<'_> for RetryOptsVisitor {
460 type Value = RetryOpts;
461
462 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
463 formatter.write_str("a boolean, null, or a positive integer")
464 }
465
466 fn visit_bool<E>(self, value: bool) -> std::result::Result<Self::Value, E>
467 where
468 E: de::Error,
469 {
470 if value {
471 Ok(RetryOpts::Yes)
472 } else {
473 Ok(RetryOpts::Never)
474 }
475 }
476
477 fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
478 where
479 E: de::Error,
480 {
481 Ok(RetryOpts::Never)
482 }
483
484 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
485 where
486 E: de::Error,
487 {
488 Ok(RetryOpts::Max(value as usize))
489 }
490 }
491
492 deserializer.deserialize_any(RetryOptsVisitor)
493 }
494}
495
496impl From<bool> for RetryOpts {
497 fn from(value: bool) -> Self {
498 match value {
499 true => RetryOpts::Yes,
500 false => RetryOpts::Never,
501 }
502 }
503}
504
505impl From<usize> for RetryOpts {
506 fn from(value: usize) -> Self {
507 RetryOpts::Max(value)
508 }
509}
510
511#[derive(Serialize, Deserialize, Debug, Clone)]
527pub struct Job {
528 pub queue: String,
529 pub args: JsonValue,
530 pub retry: RetryOpts,
531 pub class: String,
532 pub jid: String,
533 pub created_at: f64,
534 pub enqueued_at: Option<f64>,
535 pub failed_at: Option<f64>,
536 pub error_message: Option<String>,
537 pub error_class: Option<String>,
538 pub retry_count: Option<usize>,
539 pub retried_at: Option<f64>,
540 pub retry_queue: Option<String>,
541
542 #[serde(skip)]
543 pub unique_for: Option<std::time::Duration>,
544}
545
546#[derive(Debug)]
547pub struct UnitOfWork {
548 pub queue: String,
549 pub job: Job,
550}
551
552impl UnitOfWork {
553 #[must_use]
554 pub fn from_job(job: Job) -> Self {
555 Self {
556 queue: format!("queue:{}", &job.queue),
557 job,
558 }
559 }
560
561 pub fn from_job_string(job_str: String) -> Result<Self> {
562 let job: Job = serde_json::from_str(&job_str)?;
563 Ok(Self::from_job(job))
564 }
565
566 pub async fn enqueue(&self, redis: &RedisPool) -> Result<()> {
567 let mut redis = redis.get().await?;
568 self.enqueue_direct(&mut redis).await
569 }
570
571 async fn enqueue_direct(&self, redis: &mut RedisConnection) -> Result<()> {
572 let mut job = self.job.clone();
573 job.enqueued_at = Some(chrono::Utc::now().timestamp() as f64);
574
575 if let Some(ref duration) = job.unique_for {
576 let args_as_json_string: String = serde_json::to_string(&job.args)?;
580 let args_hash = format!("{:x}", Sha256::digest(&args_as_json_string));
581 let redis_key = format!(
582 "sidekiq:unique:{}:{}:{}",
583 &job.queue, &job.class, &args_hash
584 );
585 if let redis::RedisValue::Nil = redis
586 .set_nx_ex(redis_key, "", duration.as_secs() as usize)
587 .await?
588 {
589 return Ok(());
591 }
592 }
593
594 redis.sadd("queues".to_string(), job.queue.clone()).await?;
595
596 redis
597 .lpush(self.queue.clone(), serde_json::to_string(&job)?)
598 .await?;
599 Ok(())
600 }
601
602 pub async fn reenqueue(&mut self, redis: &RedisPool) -> Result<()> {
603 if let Some(retry_count) = self.job.retry_count {
604 redis
605 .get()
606 .await?
607 .zadd(
608 "retry".to_string(),
609 serde_json::to_string(&self.job)?,
610 Self::retry_job_at(retry_count).timestamp(),
611 )
612 .await?;
613 }
614
615 Ok(())
616 }
617
618 fn retry_job_at(count: usize) -> chrono::DateTime<chrono::Utc> {
619 let seconds_to_delay =
620 count.pow(4) + 15 + (rand::thread_rng().gen_range(0..30) * (count + 1));
621
622 chrono::Utc::now() + chrono::Duration::seconds(seconds_to_delay as i64)
623 }
624
625 pub async fn schedule(
626 &mut self,
627 redis: &RedisPool,
628 duration: std::time::Duration,
629 ) -> Result<()> {
630 let enqueue_at = chrono::Utc::now() + chrono::Duration::from_std(duration)?;
631
632 redis
633 .get()
634 .await?
635 .zadd(
636 "schedule".to_string(),
637 serde_json::to_string(&self.job)?,
638 enqueue_at.timestamp(),
639 )
640 .await?;
641
642 Ok(())
643 }
644}
645
646#[cfg(test)]
647mod test {
648 use super::*;
649
650 mod my {
651 pub mod cool {
652 pub mod workers {
653 use super::super::super::super::*;
654
655 pub struct TestOpts;
656
657 #[async_trait]
658 impl Worker<()> for TestOpts {
659 fn opts() -> WorkerOpts<(), Self>
660 where
661 Self: Sized,
662 {
663 WorkerOpts::new()
664 .retry(false)
666 .retry(42)
668 .retry(RetryOpts::Never)
670 .unique_for(std::time::Duration::from_secs(30))
671 .queue("yolo_quue")
672 }
673
674 async fn perform(&self, _args: ()) -> Result<()> {
675 Ok(())
676 }
677 }
678
679 pub struct X1Y2MyJob;
680
681 #[async_trait]
682 impl Worker<()> for X1Y2MyJob {
683 async fn perform(&self, _args: ()) -> Result<()> {
684 Ok(())
685 }
686 }
687
688 pub struct TestModuleWorker;
689
690 #[async_trait]
691 impl Worker<()> for TestModuleWorker {
692 async fn perform(&self, _args: ()) -> Result<()> {
693 Ok(())
694 }
695 }
696
697 pub struct TestCustomClassNameWorker;
698
699 #[async_trait]
700 impl Worker<()> for TestCustomClassNameWorker {
701 async fn perform(&self, _args: ()) -> Result<()> {
702 Ok(())
703 }
704
705 fn class_name() -> String
706 where
707 Self: Sized,
708 {
709 "My::Cool::Workers::TestCustomClassNameWorker".to_string()
710 }
711 }
712 }
713 }
714 }
715
716 #[tokio::test]
717 async fn ignores_modules_in_ruby_worker_name() {
718 assert_eq!(
719 my::cool::workers::TestModuleWorker::class_name(),
720 "TestModuleWorker".to_string()
721 );
722 }
723
724 #[tokio::test]
725 async fn does_not_reformat_valid_ruby_class_names() {
726 assert_eq!(
727 my::cool::workers::X1Y2MyJob::class_name(),
728 "X1Y2MyJob".to_string()
729 );
730 }
731
732 #[tokio::test]
733 async fn supports_custom_class_name_for_workers() {
734 assert_eq!(
735 my::cool::workers::TestCustomClassNameWorker::class_name(),
736 "My::Cool::Workers::TestCustomClassNameWorker".to_string()
737 );
738 }
739
740 #[derive(Clone, Deserialize, Serialize, Debug)]
741 struct TestArg {
742 name: String,
743 age: i32,
744 }
745
746 struct TestGenericWorker;
747 #[async_trait]
748 impl Worker<TestArg> for TestGenericWorker {
749 async fn perform(&self, _args: TestArg) -> Result<()> {
750 Ok(())
751 }
752 }
753
754 struct TestMultiArgWorker;
755 #[async_trait]
756 impl Worker<(TestArg, TestArg)> for TestMultiArgWorker {
757 async fn perform(&self, _args: (TestArg, TestArg)) -> Result<()> {
758 Ok(())
759 }
760 }
761
762 struct TestTupleArgWorker;
763 #[async_trait]
764 impl Worker<(TestArg,)> for TestTupleArgWorker {
765 fn disable_argument_coercion(&self) -> bool {
766 true
767 }
768 async fn perform(&self, _args: (TestArg,)) -> Result<()> {
769 Ok(())
770 }
771 }
772
773 struct TestVecArgWorker;
774 #[async_trait]
775 impl Worker<Vec<TestArg>> for TestVecArgWorker {
776 fn disable_argument_coercion(&self) -> bool {
777 true
778 }
779 async fn perform(&self, _args: Vec<TestArg>) -> Result<()> {
780 Ok(())
781 }
782 }
783
784 #[tokio::test]
785 async fn can_have_a_vec_with_one_or_more_items() {
786 let worker = Arc::new(TestVecArgWorker);
788 let wrap = Arc::new(WorkerRef::wrap(worker));
789 let wrap = wrap.clone();
790 let arg = serde_json::to_value(vec![TestArg {
791 name: "test A".into(),
792 age: 1337,
793 }])
794 .unwrap();
795 wrap.call(arg).await.unwrap();
796
797 let worker = Arc::new(TestVecArgWorker);
799 let wrap = Arc::new(WorkerRef::wrap(worker));
800 let wrap = wrap.clone();
801 let arg = serde_json::to_value(vec![
802 TestArg {
803 name: "test A".into(),
804 age: 1337,
805 },
806 TestArg {
807 name: "test A".into(),
808 age: 1337,
809 },
810 ])
811 .unwrap();
812 wrap.call(arg).await.unwrap();
813 }
814
815 #[tokio::test]
816 async fn can_have_multiple_arguments() {
817 let worker = Arc::new(TestMultiArgWorker);
818 let wrap = Arc::new(WorkerRef::wrap(worker));
819 let wrap = wrap.clone();
820 let arg = serde_json::to_value((
821 TestArg {
822 name: "test A".into(),
823 age: 1337,
824 },
825 TestArg {
826 name: "test B".into(),
827 age: 1336,
828 },
829 ))
830 .unwrap();
831 wrap.call(arg).await.unwrap();
832 }
833
834 #[tokio::test]
835 async fn can_have_a_single_tuple_argument() {
836 let worker = Arc::new(TestTupleArgWorker);
837 let wrap = Arc::new(WorkerRef::wrap(worker));
838 let wrap = wrap.clone();
839 let arg = serde_json::to_value((TestArg {
840 name: "test".into(),
841 age: 1337,
842 },))
843 .unwrap();
844 wrap.call(arg).await.unwrap();
845 }
846
847 #[tokio::test]
848 async fn can_have_a_single_argument() {
849 let worker = Arc::new(TestGenericWorker);
850 let wrap = Arc::new(WorkerRef::wrap(worker));
851 let wrap = wrap.clone();
852 let arg = serde_json::to_value(TestArg {
853 name: "test".into(),
854 age: 1337,
855 })
856 .unwrap();
857 wrap.call(arg).await.unwrap();
858 }
859
860 #[tokio::test]
861 async fn processor_config_has_workers_by_default() {
862 let cfg = ProcessorConfig::default();
863
864 assert!(
865 cfg.num_workers > 0,
866 "num_workers should be greater than 0 (using num cpu)"
867 );
868
869 let cfg = cfg.num_workers(1000);
870
871 assert_eq!(cfg.num_workers, 1000);
872 }
873}