1use async_trait::async_trait;
2use middleware::Chain;
3use rand::{Rng, RngCore};
4use serde::{
5 de::{self, Deserializer, Visitor},
6 Deserialize, Serialize, Serializer,
7};
8use serde_json::Value as JsonValue;
9use sha2::{Digest, Sha256};
10use std::future::Future;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::sync::Arc;
14
15pub mod periodic;
16
17mod middleware;
18mod processor;
19mod redis;
20mod scheduled;
21mod stats;
22
23pub use crate::redis::{
25 with_custom_namespace, RedisConnection, RedisConnectionManager, RedisError, RedisPool,
26};
27pub use ::redis as redis_rs;
28pub use middleware::{ChainIter, ServerMiddleware};
29pub use processor::{BalanceStrategy, Processor, ProcessorConfig, QueueConfig, WorkFetcher};
30pub use scheduled::Scheduled;
31pub use stats::{Counter, StatsPublisher};
32
/// Errors produced by this crate.
///
/// Every variant except [`Error::Message`] wraps an underlying error type and is
/// marked `#[error(transparent)]`, so its display output comes from the wrapped
/// error. The `#[from]` attributes let `?` convert those errors automatically.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A free-form error described only by its message.
    #[error("{0}")]
    Message(String),

    /// A JSON serialization or deserialization failure from `serde_json`.
    #[error(transparent)]
    Json(#[from] serde_json::Error),

    /// An error reported by the `cron_clock` crate.
    #[error(transparent)]
    CronClock(#[from] cron_clock::error::Error),

    /// A `bb8::RunError` carrying a Redis error.
    #[error(transparent)]
    BB8(#[from] bb8::RunError<redis::RedisError>),

    /// A `chrono::OutOfRangeError`, e.g. from converting a `std::time::Duration`.
    #[error(transparent)]
    ChronoRange(#[from] chrono::OutOfRangeError),

    /// An error returned by a Redis operation.
    #[error(transparent)]
    Redis(#[from] redis::RedisError),

    /// Any other boxed error that is `Send + Sync`.
    #[error(transparent)]
    Any(#[from] Box<dyn std::error::Error + Send + Sync>),
}
56
/// Shorthand for `std::result::Result` with this crate's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
58
59#[must_use]
60pub fn opts() -> EnqueueOpts {
61 EnqueueOpts {
62 queue: "default".into(),
63 retry: RetryOpts::Yes,
64 unique_for: None,
65 retry_queue: None,
66 }
67}
68
/// Builder-style options applied when a job is created and enqueued.
///
/// Start from [`opts`] and adjust with the chained setters.
pub struct EnqueueOpts {
    /// Name of the queue the job is created for.
    queue: String,
    /// Retry policy copied into the created job.
    retry: RetryOpts,
    /// Optional uniqueness window copied into the created job.
    unique_for: Option<std::time::Duration>,
    /// Optional retry queue name copied into the created job.
    retry_queue: Option<String>,
}
75
76impl EnqueueOpts {
77 #[must_use]
78 pub fn queue<S: Into<String>>(self, queue: S) -> Self {
79 Self {
80 queue: queue.into(),
81 ..self
82 }
83 }
84
85 #[must_use]
86 pub fn retry<RO>(self, retry: RO) -> Self
87 where
88 RO: Into<RetryOpts>,
89 {
90 Self {
91 retry: retry.into(),
92 ..self
93 }
94 }
95
96 #[must_use]
97 pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
98 Self {
99 unique_for: Some(unique_for),
100 ..self
101 }
102 }
103
104 #[must_use]
105 pub fn retry_queue(self, retry_queue: String) -> Self {
106 Self {
107 retry_queue: Some(retry_queue),
108 ..self
109 }
110 }
111
112 pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result<Job> {
113 let args = serde_json::to_value(args)?;
114
115 let args = if args.is_array() {
117 args
118 } else {
119 JsonValue::Array(vec![args])
120 };
121
122 Ok(Job {
123 queue: self.queue.clone(),
124 class,
125 jid: new_jid(),
126 created_at: chrono::Utc::now().timestamp_millis() as f64,
127 enqueued_at: None,
128 retry: self.retry.clone(),
129 args,
130
131 error_message: None,
133 error_class: None,
134 failed_at: None,
135 retry_count: None,
136 retried_at: None,
137
138 retry_queue: self.retry_queue.clone(),
140 unique_for: self.unique_for,
141 })
142 }
143
144 pub async fn perform_async(
145 self,
146 redis: &RedisPool,
147 class: String,
148 args: impl serde::Serialize,
149 ) -> Result<()> {
150 let job = self.create_job(class, args)?;
151 UnitOfWork::from_job(job).enqueue(redis).await?;
152 Ok(())
153 }
154
155 pub async fn perform_in(
156 &self,
157 redis: &RedisPool,
158 class: String,
159 duration: std::time::Duration,
160 args: impl serde::Serialize,
161 ) -> Result<()> {
162 let job = self.create_job(class, args)?;
163 UnitOfWork::from_job(job).schedule(redis, duration).await?;
164 Ok(())
165 }
166}
167
168pub async fn perform_async(
171 redis: &RedisPool,
172 class: String,
173 queue: String,
174 args: impl serde::Serialize,
175) -> Result<()> {
176 opts().queue(queue).perform_async(redis, class, args).await
177}
178
179pub async fn perform_in(
182 redis: &RedisPool,
183 duration: std::time::Duration,
184 class: String,
185 queue: String,
186 args: impl serde::Serialize,
187) -> Result<()> {
188 opts()
189 .queue(queue)
190 .perform_in(redis, class, duration, args)
191 .await
192}
193
194fn new_jid() -> String {
195 let mut bytes = [0u8; 12];
196 rand::rng().fill_bytes(&mut bytes);
197 hex::encode(bytes)
198}
199
/// Enqueue options tied at the type level to a worker `W` taking `Args`.
///
/// Holds the same settings as [`EnqueueOpts`]; the two `PhantomData` fields
/// only record the type parameters and carry no data.
pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
    /// Name of the queue jobs are created for.
    queue: String,
    /// Retry policy for created jobs.
    retry: RetryOpts,
    /// Marker for the worker's argument type.
    args: PhantomData<Args>,
    /// Marker for the worker type.
    worker: PhantomData<W>,
    /// Optional uniqueness window for created jobs.
    unique_for: Option<std::time::Duration>,
    /// Optional retry queue name for created jobs.
    retry_queue: Option<String>,
}
208
209impl<Args, W> WorkerOpts<Args, W>
210where
211 W: Worker<Args>,
212{
213 #[must_use]
214 pub fn new() -> Self {
215 Self {
216 queue: "default".into(),
217 retry: RetryOpts::Yes,
218 args: PhantomData,
219 worker: PhantomData,
220 unique_for: None,
221 retry_queue: None,
222 }
223 }
224
225 #[must_use]
226 pub fn retry<RO>(self, retry: RO) -> Self
227 where
228 RO: Into<RetryOpts>,
229 {
230 Self {
231 retry: retry.into(),
232 ..self
233 }
234 }
235
236 #[must_use]
237 pub fn retry_queue<S: Into<String>>(self, retry_queue: S) -> Self {
238 Self {
239 retry_queue: Some(retry_queue.into()),
240 ..self
241 }
242 }
243
244 #[must_use]
245 pub fn queue<S: Into<String>>(self, queue: S) -> Self {
246 Self {
247 queue: queue.into(),
248 ..self
249 }
250 }
251
252 #[must_use]
253 pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
254 Self {
255 unique_for: Some(unique_for),
256 ..self
257 }
258 }
259
260 #[allow(clippy::wrong_self_convention)]
261 pub fn into_opts(&self) -> EnqueueOpts {
262 self.into()
263 }
264
265 pub async fn perform_async(
266 &self,
267 redis: &RedisPool,
268 args: impl serde::Serialize + Send + 'static,
269 ) -> Result<()> {
270 self.into_opts()
271 .perform_async(redis, W::class_name(), args)
272 .await
273 }
274
275 pub async fn perform_in(
276 &self,
277 redis: &RedisPool,
278 duration: std::time::Duration,
279 args: impl serde::Serialize + Send + 'static,
280 ) -> Result<()> {
281 self.into_opts()
282 .perform_in(redis, W::class_name(), duration, args)
283 .await
284 }
285}
286
287impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
288 fn from(opts: &WorkerOpts<Args, W>) -> Self {
289 Self {
290 retry: opts.retry.clone(),
291 queue: opts.queue.clone(),
292 unique_for: opts.unique_for,
293 retry_queue: opts.retry_queue.clone(),
294 }
295 }
296}
297
298impl<Args, W: Worker<Args>> Default for WorkerOpts<Args, W> {
299 fn default() -> Self {
300 Self::new()
301 }
302}
303
304#[async_trait]
305pub trait Worker<Args>: Send + Sync {
306 fn disable_argument_coercion(&self) -> bool {
311 false
312 }
313
314 #[must_use]
315 fn opts() -> WorkerOpts<Args, Self>
316 where
317 Self: Sized,
318 {
319 WorkerOpts::new()
320 }
321
322 fn max_retries(&self) -> usize {
325 25
326 }
327
328 #[must_use]
331 fn class_name() -> String
332 where
333 Self: Sized,
334 {
335 use convert_case::{Case, Casing};
336
337 let type_name = std::any::type_name::<Self>();
338 let name = type_name.split("::").last().unwrap_or(type_name);
339 name.to_case(Case::UpperCamel)
340 }
341
342 async fn perform_async(redis: &RedisPool, args: Args) -> Result<()>
343 where
344 Self: Sized,
345 Args: Send + Sync + serde::Serialize + 'static,
346 {
347 Self::opts().perform_async(redis, args).await
348 }
349
350 async fn perform_in(redis: &RedisPool, duration: std::time::Duration, args: Args) -> Result<()>
351 where
352 Self: Sized,
353 Args: Send + Sync + serde::Serialize + 'static,
354 {
355 Self::opts().perform_in(redis, duration, args).await
356 }
357
358 async fn perform(&self, args: Args) -> Result<()>;
359}
360
/// Type-erased handle to a worker: a shared closure that takes a job's JSON
/// arguments and returns a boxed future, plus the worker's retry limit.
///
/// Cloning is cheap because the closure sits behind an `Arc`.
#[derive(Clone)]
pub struct WorkerRef {
    // The boxed-closure-returning-boxed-future type is spelled out inline,
    // which trips the type-complexity lint.
    #[allow(clippy::type_complexity)]
    work_fn: Arc<
        Box<dyn Fn(JsonValue) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>,
    >,
    /// Retry limit recorded when this reference was built.
    max_retries: usize,
}
373
374async fn invoke_worker<Args, W>(args: JsonValue, worker: Arc<W>) -> Result<()>
375where
376 Args: Send + Sync + 'static,
377 W: Worker<Args> + 'static,
378 for<'de> Args: Deserialize<'de>,
379{
380 let args = if worker.disable_argument_coercion() {
381 args
382 } else {
383 if std::any::TypeId::of::<Args>() == std::any::TypeId::of::<()>() {
385 JsonValue::Null
386 } else {
387 match args {
391 JsonValue::Array(mut arr) if arr.len() == 1 => {
392 arr.pop().expect("value change after size check")
393 }
394 _ => args,
395 }
396 }
397 };
398
399 let args: Args = serde_json::from_value(args)?;
400 worker.perform(args).await
401}
402
403impl WorkerRef {
404 pub(crate) fn wrap<Args, W>(worker: Arc<W>) -> Self
405 where
406 Args: Send + Sync + 'static,
407 W: Worker<Args> + 'static,
408 for<'de> Args: Deserialize<'de>,
409 {
410 Self {
411 work_fn: Arc::new(Box::new({
412 let worker = worker.clone();
413 move |args: JsonValue| {
414 let worker = worker.clone();
415 Box::pin(async move { invoke_worker(args, worker).await })
416 }
417 })),
418 max_retries: worker.max_retries(),
419 }
420 }
421
422 pub(crate) fn not_found(class: String) -> Self {
423 Self {
424 work_fn: Arc::new(Box::new(move |_args: JsonValue| {
425 let class = class.clone();
426 Box::pin(async move {
427 Err(Error::Message(format!(
428 "Worker not found for class: {class}"
429 )))
430 })
431 })),
432 max_retries: 25,
433 }
434 }
435
436 #[must_use]
437 pub fn max_retries(&self) -> usize {
438 self.max_retries
439 }
440
441 pub async fn call(&self, args: JsonValue) -> Result<()> {
442 (Arc::clone(&self.work_fn))(args).await
443 }
444}
445
/// Retry policy attached to a job.
///
/// Converts from `bool` (`true` → `Yes`, `false` → `Never`) and from `usize`
/// (→ `Max`).
#[derive(Clone, Debug, PartialEq)]
pub enum RetryOpts {
    /// Retry the job; serialized as boolean `true`.
    Yes,
    /// Do not retry the job; serialized as boolean `false`.
    Never,
    /// Carries an explicit retry count; serialized as an unsigned integer.
    Max(usize),
}
452
453impl Serialize for RetryOpts {
454 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
455 where
456 S: Serializer,
457 {
458 match *self {
459 RetryOpts::Yes => serializer.serialize_bool(true),
460 RetryOpts::Never => serializer.serialize_bool(false),
461 RetryOpts::Max(value) => serializer.serialize_u64(value as u64),
462 }
463 }
464}
465
466impl<'de> Deserialize<'de> for RetryOpts {
467 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
468 where
469 D: Deserializer<'de>,
470 {
471 struct RetryOptsVisitor;
472
473 impl Visitor<'_> for RetryOptsVisitor {
474 type Value = RetryOpts;
475
476 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
477 formatter.write_str("a boolean, null, or a positive integer")
478 }
479
480 fn visit_bool<E>(self, value: bool) -> std::result::Result<Self::Value, E>
481 where
482 E: de::Error,
483 {
484 if value {
485 Ok(RetryOpts::Yes)
486 } else {
487 Ok(RetryOpts::Never)
488 }
489 }
490
491 fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
492 where
493 E: de::Error,
494 {
495 Ok(RetryOpts::Never)
496 }
497
498 fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
499 where
500 E: de::Error,
501 {
502 Ok(RetryOpts::Max(value as usize))
503 }
504 }
505
506 deserializer.deserialize_any(RetryOptsVisitor)
507 }
508}
509
510impl From<bool> for RetryOpts {
511 fn from(value: bool) -> Self {
512 match value {
513 true => RetryOpts::Yes,
514 false => RetryOpts::Never,
515 }
516 }
517}
518
519impl From<usize> for RetryOpts {
520 fn from(value: usize) -> Self {
521 RetryOpts::Max(value)
522 }
523}
524
/// A job payload as it is serialized to and deserialized from JSON.
///
/// NOTE(review): the field set presumably mirrors the Sidekiq job format —
/// confirm before renaming or reordering fields, since both affect the JSON.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Job {
    /// Name of the queue the job belongs to (without the `queue:` key prefix).
    pub queue: String,
    /// Job arguments; `EnqueueOpts::create_job` always stores a JSON array here.
    pub args: JsonValue,
    /// Retry policy for the job.
    pub retry: RetryOpts,
    /// Name of the worker class the job is addressed to.
    pub class: String,
    /// Job id.
    pub jid: String,
    /// Creation time; `create_job` fills this with the current UTC timestamp
    /// in milliseconds.
    pub created_at: f64,
    /// Enqueue time; set to the current UTC timestamp in milliseconds when the
    /// job is pushed by `UnitOfWork::enqueue_direct`.
    pub enqueued_at: Option<f64>,
    /// Failure bookkeeping; `None` on a freshly created job.
    pub failed_at: Option<f64>,
    /// Failure bookkeeping; `None` on a freshly created job.
    pub error_message: Option<String>,
    /// Failure bookkeeping; `None` on a freshly created job.
    pub error_class: Option<String>,
    /// Retry counter; `UnitOfWork::reenqueue` only schedules a retry when this
    /// is `Some`, and derives the delay from its value.
    pub retry_count: Option<usize>,
    /// Failure bookkeeping; `None` on a freshly created job.
    pub retried_at: Option<f64>,
    /// Optional retry queue name copied from the enqueue options.
    pub retry_queue: Option<String>,

    /// Uniqueness window consulted at enqueue time. Skipped by serde, so it is
    /// never part of the JSON and is `None` after deserialization.
    #[serde(skip)]
    pub unique_for: Option<std::time::Duration>,
}
559
/// A [`Job`] paired with the Redis key of the queue it is pushed onto.
#[derive(Debug)]
pub struct UnitOfWork {
    /// Redis key for the queue: the job's queue name prefixed with `queue:`.
    pub queue: String,
    /// The job payload.
    pub job: Job,
}
565
566impl UnitOfWork {
567 #[must_use]
568 pub fn from_job(job: Job) -> Self {
569 Self {
570 queue: format!("queue:{}", &job.queue),
571 job,
572 }
573 }
574
575 pub fn from_job_string(job_str: String) -> Result<Self> {
576 let job: Job = serde_json::from_str(&job_str)?;
577 Ok(Self::from_job(job))
578 }
579
580 pub async fn enqueue(&self, redis: &RedisPool) -> Result<()> {
581 let mut redis = redis.get().await?;
582 self.enqueue_direct(&mut redis).await
583 }
584
585 pub(crate) async fn enqueue_direct(&self, redis: &mut RedisConnection) -> Result<()> {
586 let mut job = self.job.clone();
587 job.enqueued_at = Some(chrono::Utc::now().timestamp_millis() as f64);
588
589 if let Some(ref duration) = job.unique_for {
590 let args_as_json_string: String = serde_json::to_string(&job.args)?;
594 let args_hash = format!("{:x}", Sha256::digest(&args_as_json_string));
595 let redis_key = format!(
596 "sidekiq:unique:{}:{}:{}",
597 &job.queue, &job.class, &args_hash
598 );
599 let result = redis
600 .set_nx_ex(redis_key, "", duration.as_secs() as usize)
601 .await?;
602
603 if matches!(result, redis::RedisValue::Nil) {
606 return Ok(());
608 }
609 }
610
611 redis.sadd("queues".to_string(), job.queue.clone()).await?;
612
613 redis
614 .lpush(self.queue.clone(), serde_json::to_string(&job)?)
615 .await?;
616 Ok(())
617 }
618
619 pub async fn reenqueue(&mut self, redis: &RedisPool) -> Result<()> {
620 if let Some(retry_count) = self.job.retry_count {
621 redis
622 .get()
623 .await?
624 .zadd(
625 "retry".to_string(),
626 serde_json::to_string(&self.job)?,
627 Self::retry_job_at(retry_count).timestamp(),
628 )
629 .await?;
630 }
631
632 Ok(())
633 }
634
635 fn retry_job_at(count: usize) -> chrono::DateTime<chrono::Utc> {
636 let seconds_to_delay = count.pow(4) + 15 + rand::rng().random_range(0..(10 * (count + 1)));
637
638 chrono::Utc::now() + chrono::Duration::seconds(seconds_to_delay as i64)
639 }
640
641 pub async fn schedule(
642 &mut self,
643 redis: &RedisPool,
644 duration: std::time::Duration,
645 ) -> Result<()> {
646 let enqueue_at = chrono::Utc::now() + chrono::Duration::from_std(duration)?;
647
648 redis
649 .get()
650 .await?
651 .zadd(
652 "schedule".to_string(),
653 serde_json::to_string(&self.job)?,
654 enqueue_at.timestamp(),
655 )
656 .await?;
657
658 Ok(())
659 }
660}
661
#[cfg(test)]
mod test {
    use super::*;

    // Workers are nested a few modules deep so the default `class_name` has a
    // module path to strip.
    mod my {
        pub mod cool {
            pub mod workers {
                use super::super::super::super::*;

                /// Worker that only exists to exercise the `WorkerOpts` builder.
                #[allow(dead_code)]
                pub struct TestOpts;

                #[async_trait]
                impl Worker<()> for TestOpts {
                    fn opts() -> WorkerOpts<(), Self>
                    where
                        Self: Sized,
                    {
                        // The three `retry` calls cover the bool, integer and
                        // `RetryOpts` argument forms.
                        WorkerOpts::new()
                            .retry(false)
                            .retry(42)
                            .retry(RetryOpts::Never)
                            .unique_for(std::time::Duration::from_secs(30))
                            .queue("yolo_quue")
                    }

                    async fn perform(&self, _args: ()) -> Result<()> {
                        Ok(())
                    }
                }

                /// Name containing digits, used to check the default class name
                /// is left untouched.
                pub struct X1Y2MyJob;

                #[async_trait]
                impl Worker<()> for X1Y2MyJob {
                    async fn perform(&self, _args: ()) -> Result<()> {
                        Ok(())
                    }
                }

                /// Plain worker relying on the default class name.
                pub struct TestModuleWorker;

                #[async_trait]
                impl Worker<()> for TestModuleWorker {
                    async fn perform(&self, _args: ()) -> Result<()> {
                        Ok(())
                    }
                }

                /// Worker that overrides `class_name` with a module-qualified name.
                pub struct TestCustomClassNameWorker;

                #[async_trait]
                impl Worker<()> for TestCustomClassNameWorker {
                    async fn perform(&self, _args: ()) -> Result<()> {
                        Ok(())
                    }

                    fn class_name() -> String
                    where
                        Self: Sized,
                    {
                        "My::Cool::Workers::TestCustomClassNameWorker".to_string()
                    }
                }
            }
        }
    }

    #[tokio::test]
    async fn ignores_modules_in_ruby_worker_name() {
        let class_name = my::cool::workers::TestModuleWorker::class_name();
        assert_eq!(class_name, "TestModuleWorker");
    }

    #[tokio::test]
    async fn does_not_reformat_valid_ruby_class_names() {
        let class_name = my::cool::workers::X1Y2MyJob::class_name();
        assert_eq!(class_name, "X1Y2MyJob");
    }

    #[tokio::test]
    async fn supports_custom_class_name_for_workers() {
        let class_name = my::cool::workers::TestCustomClassNameWorker::class_name();
        assert_eq!(class_name, "My::Cool::Workers::TestCustomClassNameWorker");
    }

    #[derive(Clone, Deserialize, Serialize, Debug)]
    struct TestArg {
        name: String,
        age: i32,
    }

    /// Builds a `TestArg` without repeating the struct literal in every test.
    fn test_arg(name: &str, age: i32) -> TestArg {
        TestArg {
            name: name.into(),
            age,
        }
    }

    struct TestGenericWorker;
    #[async_trait]
    impl Worker<TestArg> for TestGenericWorker {
        async fn perform(&self, _args: TestArg) -> Result<()> {
            Ok(())
        }
    }

    struct TestMultiArgWorker;
    #[async_trait]
    impl Worker<(TestArg, TestArg)> for TestMultiArgWorker {
        async fn perform(&self, _args: (TestArg, TestArg)) -> Result<()> {
            Ok(())
        }
    }

    struct TestTupleArgWorker;
    #[async_trait]
    impl Worker<(TestArg,)> for TestTupleArgWorker {
        // Coercion is disabled so the one-element array reaches the tuple intact.
        fn disable_argument_coercion(&self) -> bool {
            true
        }
        async fn perform(&self, _args: (TestArg,)) -> Result<()> {
            Ok(())
        }
    }

    struct TestVecArgWorker;
    #[async_trait]
    impl Worker<Vec<TestArg>> for TestVecArgWorker {
        // Coercion is disabled so a one-element array is not unwrapped.
        fn disable_argument_coercion(&self) -> bool {
            true
        }
        async fn perform(&self, _args: Vec<TestArg>) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn can_have_a_vec_with_one_or_more_items() {
        // One item.
        let single = serde_json::to_value(vec![test_arg("test A", 1337)]).unwrap();
        let worker_ref = WorkerRef::wrap(Arc::new(TestVecArgWorker));
        worker_ref.call(single).await.unwrap();

        // Several items.
        let several =
            serde_json::to_value(vec![test_arg("test A", 1337), test_arg("test A", 1337)])
                .unwrap();
        let worker_ref = WorkerRef::wrap(Arc::new(TestVecArgWorker));
        worker_ref.call(several).await.unwrap();
    }

    #[tokio::test]
    async fn can_have_multiple_arguments() {
        let pair =
            serde_json::to_value((test_arg("test A", 1337), test_arg("test B", 1336))).unwrap();
        let worker_ref = WorkerRef::wrap(Arc::new(TestMultiArgWorker));
        worker_ref.call(pair).await.unwrap();
    }

    #[tokio::test]
    async fn can_have_a_single_tuple_argument() {
        let one_tuple = serde_json::to_value((test_arg("test", 1337),)).unwrap();
        let worker_ref = WorkerRef::wrap(Arc::new(TestTupleArgWorker));
        worker_ref.call(one_tuple).await.unwrap();
    }

    #[tokio::test]
    async fn can_have_a_single_argument() {
        let single = serde_json::to_value(test_arg("test", 1337)).unwrap();
        let worker_ref = WorkerRef::wrap(Arc::new(TestGenericWorker));
        worker_ref.call(single).await.unwrap();
    }

    #[tokio::test]
    async fn processor_config_has_workers_by_default() {
        let default_cfg = ProcessorConfig::default();

        assert!(
            default_cfg.num_workers > 0,
            "num_workers should be greater than 0 (using num cpu)"
        );

        let resized_cfg = default_cfg.num_workers(1000);

        assert_eq!(resized_cfg.num_workers, 1000);
    }
}