Skip to main content

sidekiq/
lib.rs

1use async_trait::async_trait;
2use middleware::Chain;
3use rand::{Rng, RngCore};
4use serde::{
5    de::{self, Deserializer, Visitor},
6    Deserialize, Serialize, Serializer,
7};
8use serde_json::Value as JsonValue;
9use sha2::{Digest, Sha256};
10use std::future::Future;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::sync::Arc;
14
15pub mod periodic;
16
17mod middleware;
18mod processor;
19mod redis;
20mod scheduled;
21mod stats;
22
23// Re-export
24pub use crate::redis::{
25    with_custom_namespace, RedisConnection, RedisConnectionManager, RedisError, RedisPool,
26};
27pub use ::redis as redis_rs;
28pub use middleware::{ChainIter, ServerMiddleware};
29pub use processor::{BalanceStrategy, Processor, ProcessorConfig, QueueConfig, WorkFetcher};
30pub use scheduled::Scheduled;
31pub use stats::{Counter, StatsPublisher};
32
33#[derive(thiserror::Error, Debug)]
34pub enum Error {
35    #[error("{0}")]
36    Message(String),
37
38    #[error(transparent)]
39    Json(#[from] serde_json::Error),
40
41    #[error(transparent)]
42    CronClock(#[from] cron_clock::error::Error),
43
44    #[error(transparent)]
45    BB8(#[from] bb8::RunError<redis::RedisError>),
46
47    #[error(transparent)]
48    ChronoRange(#[from] chrono::OutOfRangeError),
49
50    #[error(transparent)]
51    Redis(#[from] redis::RedisError),
52
53    #[error(transparent)]
54    Any(#[from] Box<dyn std::error::Error + Send + Sync>),
55}
56
57pub type Result<T> = std::result::Result<T, Error>;
58
59#[must_use]
60pub fn opts() -> EnqueueOpts {
61    EnqueueOpts {
62        queue: "default".into(),
63        retry: RetryOpts::Yes,
64        unique_for: None,
65        retry_queue: None,
66    }
67}
68
69pub struct EnqueueOpts {
70    queue: String,
71    retry: RetryOpts,
72    unique_for: Option<std::time::Duration>,
73    retry_queue: Option<String>,
74}
75
76impl EnqueueOpts {
77    #[must_use]
78    pub fn queue<S: Into<String>>(self, queue: S) -> Self {
79        Self {
80            queue: queue.into(),
81            ..self
82        }
83    }
84
85    #[must_use]
86    pub fn retry<RO>(self, retry: RO) -> Self
87    where
88        RO: Into<RetryOpts>,
89    {
90        Self {
91            retry: retry.into(),
92            ..self
93        }
94    }
95
96    #[must_use]
97    pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
98        Self {
99            unique_for: Some(unique_for),
100            ..self
101        }
102    }
103
104    #[must_use]
105    pub fn retry_queue(self, retry_queue: String) -> Self {
106        Self {
107            retry_queue: Some(retry_queue),
108            ..self
109        }
110    }
111
112    pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result<Job> {
113        let args = serde_json::to_value(args)?;
114
115        // Ensure args are always wrapped in an array.
116        let args = if args.is_array() {
117            args
118        } else {
119            JsonValue::Array(vec![args])
120        };
121
122        Ok(Job {
123            queue: self.queue.clone(),
124            class,
125            jid: new_jid(),
126            created_at: chrono::Utc::now().timestamp_millis() as f64,
127            enqueued_at: None,
128            retry: self.retry.clone(),
129            args,
130
131            // Make default eventually...
132            error_message: None,
133            error_class: None,
134            failed_at: None,
135            retry_count: None,
136            retried_at: None,
137
138            // Meta for enqueueing
139            retry_queue: self.retry_queue.clone(),
140            unique_for: self.unique_for,
141        })
142    }
143
144    pub async fn perform_async(
145        self,
146        redis: &RedisPool,
147        class: String,
148        args: impl serde::Serialize,
149    ) -> Result<()> {
150        let job = self.create_job(class, args)?;
151        UnitOfWork::from_job(job).enqueue(redis).await?;
152        Ok(())
153    }
154
155    pub async fn perform_in(
156        &self,
157        redis: &RedisPool,
158        class: String,
159        duration: std::time::Duration,
160        args: impl serde::Serialize,
161    ) -> Result<()> {
162        let job = self.create_job(class, args)?;
163        UnitOfWork::from_job(job).schedule(redis, duration).await?;
164        Ok(())
165    }
166}
167
168/// Helper function for enqueueing a worker into sidekiq.
169/// This can be used to enqueue a job for a ruby sidekiq worker to process.
170pub async fn perform_async(
171    redis: &RedisPool,
172    class: String,
173    queue: String,
174    args: impl serde::Serialize,
175) -> Result<()> {
176    opts().queue(queue).perform_async(redis, class, args).await
177}
178
179/// Helper function for enqueueing a worker into sidekiq.
180/// This can be used to enqueue a job for a ruby sidekiq worker to process.
181pub async fn perform_in(
182    redis: &RedisPool,
183    duration: std::time::Duration,
184    class: String,
185    queue: String,
186    args: impl serde::Serialize,
187) -> Result<()> {
188    opts()
189        .queue(queue)
190        .perform_in(redis, class, duration, args)
191        .await
192}
193
194fn new_jid() -> String {
195    let mut bytes = [0u8; 12];
196    rand::rng().fill_bytes(&mut bytes);
197    hex::encode(bytes)
198}
199
200pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
201    queue: String,
202    retry: RetryOpts,
203    args: PhantomData<Args>,
204    worker: PhantomData<W>,
205    unique_for: Option<std::time::Duration>,
206    retry_queue: Option<String>,
207}
208
209impl<Args, W> WorkerOpts<Args, W>
210where
211    W: Worker<Args>,
212{
213    #[must_use]
214    pub fn new() -> Self {
215        Self {
216            queue: "default".into(),
217            retry: RetryOpts::Yes,
218            args: PhantomData,
219            worker: PhantomData,
220            unique_for: None,
221            retry_queue: None,
222        }
223    }
224
225    #[must_use]
226    pub fn retry<RO>(self, retry: RO) -> Self
227    where
228        RO: Into<RetryOpts>,
229    {
230        Self {
231            retry: retry.into(),
232            ..self
233        }
234    }
235
236    #[must_use]
237    pub fn retry_queue<S: Into<String>>(self, retry_queue: S) -> Self {
238        Self {
239            retry_queue: Some(retry_queue.into()),
240            ..self
241        }
242    }
243
244    #[must_use]
245    pub fn queue<S: Into<String>>(self, queue: S) -> Self {
246        Self {
247            queue: queue.into(),
248            ..self
249        }
250    }
251
252    #[must_use]
253    pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
254        Self {
255            unique_for: Some(unique_for),
256            ..self
257        }
258    }
259
260    #[allow(clippy::wrong_self_convention)]
261    pub fn into_opts(&self) -> EnqueueOpts {
262        self.into()
263    }
264
265    pub async fn perform_async(
266        &self,
267        redis: &RedisPool,
268        args: impl serde::Serialize + Send + 'static,
269    ) -> Result<()> {
270        self.into_opts()
271            .perform_async(redis, W::class_name(), args)
272            .await
273    }
274
275    pub async fn perform_in(
276        &self,
277        redis: &RedisPool,
278        duration: std::time::Duration,
279        args: impl serde::Serialize + Send + 'static,
280    ) -> Result<()> {
281        self.into_opts()
282            .perform_in(redis, W::class_name(), duration, args)
283            .await
284    }
285}
286
287impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
288    fn from(opts: &WorkerOpts<Args, W>) -> Self {
289        Self {
290            retry: opts.retry.clone(),
291            queue: opts.queue.clone(),
292            unique_for: opts.unique_for,
293            retry_queue: opts.retry_queue.clone(),
294        }
295    }
296}
297
298impl<Args, W: Worker<Args>> Default for WorkerOpts<Args, W> {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304#[async_trait]
305pub trait Worker<Args>: Send + Sync {
306    /// Signal to WorkerRef to not attempt to modify the JsonValue args
307    /// before calling the perform function. This is useful if the args
308    /// are expected to be a `Vec<T>` that might be `len() == 1` or a
309    /// single sized tuple `(T,)`.
310    fn disable_argument_coercion(&self) -> bool {
311        false
312    }
313
314    #[must_use]
315    fn opts() -> WorkerOpts<Args, Self>
316    where
317        Self: Sized,
318    {
319        WorkerOpts::new()
320    }
321
322    // TODO: Make configurable through opts and make opts accessible to the
323    // retry middleware through a Box<dyn Worker>.
324    fn max_retries(&self) -> usize {
325        25
326    }
327
328    /// Derive a class_name from the Worker type to be used with sidekiq. By default
329    /// this method will
330    #[must_use]
331    fn class_name() -> String
332    where
333        Self: Sized,
334    {
335        use convert_case::{Case, Casing};
336
337        let type_name = std::any::type_name::<Self>();
338        let name = type_name.split("::").last().unwrap_or(type_name);
339        name.to_case(Case::UpperCamel)
340    }
341
342    async fn perform_async(redis: &RedisPool, args: Args) -> Result<()>
343    where
344        Self: Sized,
345        Args: Send + Sync + serde::Serialize + 'static,
346    {
347        Self::opts().perform_async(redis, args).await
348    }
349
350    async fn perform_in(redis: &RedisPool, duration: std::time::Duration, args: Args) -> Result<()>
351    where
352        Self: Sized,
353        Args: Send + Sync + serde::Serialize + 'static,
354    {
355        Self::opts().perform_in(redis, duration, args).await
356    }
357
358    async fn perform(&self, args: Args) -> Result<()>;
359}
360
361// We can't store a Vec<Box<dyn Worker<Args>>>, because that will only work
362// for a single arg type, but since any worker is JsonValue in and Result out,
363// we can wrap that generic work in a callback that shares the same type.
364// I'm sure this has a fancy name, but I don't know what it is.
365#[derive(Clone)]
366pub struct WorkerRef {
367    #[allow(clippy::type_complexity)]
368    work_fn: Arc<
369        Box<dyn Fn(JsonValue) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>,
370    >,
371    max_retries: usize,
372}
373
374async fn invoke_worker<Args, W>(args: JsonValue, worker: Arc<W>) -> Result<()>
375where
376    Args: Send + Sync + 'static,
377    W: Worker<Args> + 'static,
378    for<'de> Args: Deserialize<'de>,
379{
380    let args = if worker.disable_argument_coercion() {
381        args
382    } else {
383        // Ensure any caller expecting to receive `()` will always work.
384        if std::any::TypeId::of::<Args>() == std::any::TypeId::of::<()>() {
385            JsonValue::Null
386        } else {
387            // If the value contains a single item Vec then
388            // you can probably be sure that this is a single value item.
389            // Otherwise, the caller can impl a tuple type.
390            match args {
391                JsonValue::Array(mut arr) if arr.len() == 1 => {
392                    arr.pop().expect("value change after size check")
393                }
394                _ => args,
395            }
396        }
397    };
398
399    let args: Args = serde_json::from_value(args)?;
400    worker.perform(args).await
401}
402
403impl WorkerRef {
404    pub(crate) fn wrap<Args, W>(worker: Arc<W>) -> Self
405    where
406        Args: Send + Sync + 'static,
407        W: Worker<Args> + 'static,
408        for<'de> Args: Deserialize<'de>,
409    {
410        Self {
411            work_fn: Arc::new(Box::new({
412                let worker = worker.clone();
413                move |args: JsonValue| {
414                    let worker = worker.clone();
415                    Box::pin(async move { invoke_worker(args, worker).await })
416                }
417            })),
418            max_retries: worker.max_retries(),
419        }
420    }
421
422    pub(crate) fn not_found(class: String) -> Self {
423        Self {
424            work_fn: Arc::new(Box::new(move |_args: JsonValue| {
425                let class = class.clone();
426                Box::pin(async move {
427                    Err(Error::Message(format!(
428                        "Worker not found for class: {class}"
429                    )))
430                })
431            })),
432            max_retries: 25,
433        }
434    }
435
436    #[must_use]
437    pub fn max_retries(&self) -> usize {
438        self.max_retries
439    }
440
441    pub async fn call(&self, args: JsonValue) -> Result<()> {
442        (Arc::clone(&self.work_fn))(args).await
443    }
444}
445
446#[derive(Clone, Debug, PartialEq)]
447pub enum RetryOpts {
448    Yes,
449    Never,
450    Max(usize),
451}
452
453impl Serialize for RetryOpts {
454    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
455    where
456        S: Serializer,
457    {
458        match *self {
459            RetryOpts::Yes => serializer.serialize_bool(true),
460            RetryOpts::Never => serializer.serialize_bool(false),
461            RetryOpts::Max(value) => serializer.serialize_u64(value as u64),
462        }
463    }
464}
465
466impl<'de> Deserialize<'de> for RetryOpts {
467    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
468    where
469        D: Deserializer<'de>,
470    {
471        struct RetryOptsVisitor;
472
473        impl Visitor<'_> for RetryOptsVisitor {
474            type Value = RetryOpts;
475
476            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
477                formatter.write_str("a boolean, null, or a positive integer")
478            }
479
480            fn visit_bool<E>(self, value: bool) -> std::result::Result<Self::Value, E>
481            where
482                E: de::Error,
483            {
484                if value {
485                    Ok(RetryOpts::Yes)
486                } else {
487                    Ok(RetryOpts::Never)
488                }
489            }
490
491            fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
492            where
493                E: de::Error,
494            {
495                Ok(RetryOpts::Never)
496            }
497
498            fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
499            where
500                E: de::Error,
501            {
502                Ok(RetryOpts::Max(value as usize))
503            }
504        }
505
506        deserializer.deserialize_any(RetryOptsVisitor)
507    }
508}
509
510impl From<bool> for RetryOpts {
511    fn from(value: bool) -> Self {
512        match value {
513            true => RetryOpts::Yes,
514            false => RetryOpts::Never,
515        }
516    }
517}
518
519impl From<usize> for RetryOpts {
520    fn from(value: usize) -> Self {
521        RetryOpts::Max(value)
522    }
523}
524
525//
526// {
527//   "retry": true,
528//   "queue": "yolo",
529//   "class": "YoloWorker",
530//   "args": [
531//     {
532//       "yolo": "hiiii"
533//     }
534//   ],
535//   "jid": "f33f7063c6d7a4db0869289a",
536//   "created_at": 1647119929.3788748,
537//   "enqueued_at": 1647119929.378998
538// }
539//
540#[derive(Serialize, Deserialize, Debug, Clone)]
541pub struct Job {
542    pub queue: String,
543    pub args: JsonValue,
544    pub retry: RetryOpts,
545    pub class: String,
546    pub jid: String,
547    pub created_at: f64,
548    pub enqueued_at: Option<f64>,
549    pub failed_at: Option<f64>,
550    pub error_message: Option<String>,
551    pub error_class: Option<String>,
552    pub retry_count: Option<usize>,
553    pub retried_at: Option<f64>,
554    pub retry_queue: Option<String>,
555
556    #[serde(skip)]
557    pub unique_for: Option<std::time::Duration>,
558}
559
560#[derive(Debug)]
561pub struct UnitOfWork {
562    pub queue: String,
563    pub job: Job,
564}
565
566impl UnitOfWork {
567    #[must_use]
568    pub fn from_job(job: Job) -> Self {
569        Self {
570            queue: format!("queue:{}", &job.queue),
571            job,
572        }
573    }
574
575    pub fn from_job_string(job_str: String) -> Result<Self> {
576        let job: Job = serde_json::from_str(&job_str)?;
577        Ok(Self::from_job(job))
578    }
579
580    pub async fn enqueue(&self, redis: &RedisPool) -> Result<()> {
581        let mut redis = redis.get().await?;
582        self.enqueue_direct(&mut redis).await
583    }
584
585    pub(crate) async fn enqueue_direct(&self, redis: &mut RedisConnection) -> Result<()> {
586        let mut job = self.job.clone();
587        job.enqueued_at = Some(chrono::Utc::now().timestamp_millis() as f64);
588
589        if let Some(ref duration) = job.unique_for {
590            // Check to see if this is unique for the given duration.
591            // Even though SET k v NX EQ ttl isn't the best locking
592            // mechanism, I think it's "good enough" to prove this out.
593            let args_as_json_string: String = serde_json::to_string(&job.args)?;
594            let args_hash = format!("{:x}", Sha256::digest(&args_as_json_string));
595            let redis_key = format!(
596                "sidekiq:unique:{}:{}:{}",
597                &job.queue, &job.class, &args_hash
598            );
599            let result = redis
600                .set_nx_ex(redis_key, "", duration.as_secs() as usize)
601                .await?;
602
603            // SET NX returns Nil when the key already exists (job is a duplicate).
604            // Any non-Nil response (e.g. "OK") means the key was set successfully.
605            if matches!(result, redis::RedisValue::Nil) {
606                // This job has already been enqueued. Do not submit it to redis.
607                return Ok(());
608            }
609        }
610
611        redis.sadd("queues".to_string(), job.queue.clone()).await?;
612
613        redis
614            .lpush(self.queue.clone(), serde_json::to_string(&job)?)
615            .await?;
616        Ok(())
617    }
618
619    pub async fn reenqueue(&mut self, redis: &RedisPool) -> Result<()> {
620        if let Some(retry_count) = self.job.retry_count {
621            redis
622                .get()
623                .await?
624                .zadd(
625                    "retry".to_string(),
626                    serde_json::to_string(&self.job)?,
627                    Self::retry_job_at(retry_count).timestamp(),
628                )
629                .await?;
630        }
631
632        Ok(())
633    }
634
635    fn retry_job_at(count: usize) -> chrono::DateTime<chrono::Utc> {
636        let seconds_to_delay = count.pow(4) + 15 + rand::rng().random_range(0..(10 * (count + 1)));
637
638        chrono::Utc::now() + chrono::Duration::seconds(seconds_to_delay as i64)
639    }
640
641    pub async fn schedule(
642        &mut self,
643        redis: &RedisPool,
644        duration: std::time::Duration,
645    ) -> Result<()> {
646        let enqueue_at = chrono::Utc::now() + chrono::Duration::from_std(duration)?;
647
648        redis
649            .get()
650            .await?
651            .zadd(
652                "schedule".to_string(),
653                serde_json::to_string(&self.job)?,
654                enqueue_at.timestamp(),
655            )
656            .await?;
657
658        Ok(())
659    }
660}
661
662#[cfg(test)]
663mod test {
664    use super::*;
665
666    mod my {
667        pub mod cool {
668            pub mod workers {
669                use super::super::super::super::*;
670
671                #[allow(dead_code)]
672                pub struct TestOpts;
673
674                #[async_trait]
675                impl Worker<()> for TestOpts {
676                    fn opts() -> WorkerOpts<(), Self>
677                    where
678                        Self: Sized,
679                    {
680                        WorkerOpts::new()
681                            // Test bool
682                            .retry(false)
683                            // Test usize
684                            .retry(42)
685                            // Test the new type
686                            .retry(RetryOpts::Never)
687                            .unique_for(std::time::Duration::from_secs(30))
688                            .queue("yolo_quue")
689                    }
690
691                    async fn perform(&self, _args: ()) -> Result<()> {
692                        Ok(())
693                    }
694                }
695
696                pub struct X1Y2MyJob;
697
698                #[async_trait]
699                impl Worker<()> for X1Y2MyJob {
700                    async fn perform(&self, _args: ()) -> Result<()> {
701                        Ok(())
702                    }
703                }
704
705                pub struct TestModuleWorker;
706
707                #[async_trait]
708                impl Worker<()> for TestModuleWorker {
709                    async fn perform(&self, _args: ()) -> Result<()> {
710                        Ok(())
711                    }
712                }
713
714                pub struct TestCustomClassNameWorker;
715
716                #[async_trait]
717                impl Worker<()> for TestCustomClassNameWorker {
718                    async fn perform(&self, _args: ()) -> Result<()> {
719                        Ok(())
720                    }
721
722                    fn class_name() -> String
723                    where
724                        Self: Sized,
725                    {
726                        "My::Cool::Workers::TestCustomClassNameWorker".to_string()
727                    }
728                }
729            }
730        }
731    }
732
733    #[tokio::test]
734    async fn ignores_modules_in_ruby_worker_name() {
735        assert_eq!(
736            my::cool::workers::TestModuleWorker::class_name(),
737            "TestModuleWorker".to_string()
738        );
739    }
740
741    #[tokio::test]
742    async fn does_not_reformat_valid_ruby_class_names() {
743        assert_eq!(
744            my::cool::workers::X1Y2MyJob::class_name(),
745            "X1Y2MyJob".to_string()
746        );
747    }
748
749    #[tokio::test]
750    async fn supports_custom_class_name_for_workers() {
751        assert_eq!(
752            my::cool::workers::TestCustomClassNameWorker::class_name(),
753            "My::Cool::Workers::TestCustomClassNameWorker".to_string()
754        );
755    }
756
757    #[derive(Clone, Deserialize, Serialize, Debug)]
758    struct TestArg {
759        name: String,
760        age: i32,
761    }
762
763    struct TestGenericWorker;
764    #[async_trait]
765    impl Worker<TestArg> for TestGenericWorker {
766        async fn perform(&self, _args: TestArg) -> Result<()> {
767            Ok(())
768        }
769    }
770
771    struct TestMultiArgWorker;
772    #[async_trait]
773    impl Worker<(TestArg, TestArg)> for TestMultiArgWorker {
774        async fn perform(&self, _args: (TestArg, TestArg)) -> Result<()> {
775            Ok(())
776        }
777    }
778
779    struct TestTupleArgWorker;
780    #[async_trait]
781    impl Worker<(TestArg,)> for TestTupleArgWorker {
782        fn disable_argument_coercion(&self) -> bool {
783            true
784        }
785        async fn perform(&self, _args: (TestArg,)) -> Result<()> {
786            Ok(())
787        }
788    }
789
790    struct TestVecArgWorker;
791    #[async_trait]
792    impl Worker<Vec<TestArg>> for TestVecArgWorker {
793        fn disable_argument_coercion(&self) -> bool {
794            true
795        }
796        async fn perform(&self, _args: Vec<TestArg>) -> Result<()> {
797            Ok(())
798        }
799    }
800
801    #[tokio::test]
802    async fn can_have_a_vec_with_one_or_more_items() {
803        // One item
804        let worker = Arc::new(TestVecArgWorker);
805        let wrap = Arc::new(WorkerRef::wrap(worker));
806        let wrap = wrap.clone();
807        let arg = serde_json::to_value(vec![TestArg {
808            name: "test A".into(),
809            age: 1337,
810        }])
811        .unwrap();
812        wrap.call(arg).await.unwrap();
813
814        // Multiple items
815        let worker = Arc::new(TestVecArgWorker);
816        let wrap = Arc::new(WorkerRef::wrap(worker));
817        let wrap = wrap.clone();
818        let arg = serde_json::to_value(vec![
819            TestArg {
820                name: "test A".into(),
821                age: 1337,
822            },
823            TestArg {
824                name: "test A".into(),
825                age: 1337,
826            },
827        ])
828        .unwrap();
829        wrap.call(arg).await.unwrap();
830    }
831
832    #[tokio::test]
833    async fn can_have_multiple_arguments() {
834        let worker = Arc::new(TestMultiArgWorker);
835        let wrap = Arc::new(WorkerRef::wrap(worker));
836        let wrap = wrap.clone();
837        let arg = serde_json::to_value((
838            TestArg {
839                name: "test A".into(),
840                age: 1337,
841            },
842            TestArg {
843                name: "test B".into(),
844                age: 1336,
845            },
846        ))
847        .unwrap();
848        wrap.call(arg).await.unwrap();
849    }
850
851    #[tokio::test]
852    async fn can_have_a_single_tuple_argument() {
853        let worker = Arc::new(TestTupleArgWorker);
854        let wrap = Arc::new(WorkerRef::wrap(worker));
855        let wrap = wrap.clone();
856        let arg = serde_json::to_value((TestArg {
857            name: "test".into(),
858            age: 1337,
859        },))
860        .unwrap();
861        wrap.call(arg).await.unwrap();
862    }
863
864    #[tokio::test]
865    async fn can_have_a_single_argument() {
866        let worker = Arc::new(TestGenericWorker);
867        let wrap = Arc::new(WorkerRef::wrap(worker));
868        let wrap = wrap.clone();
869        let arg = serde_json::to_value(TestArg {
870            name: "test".into(),
871            age: 1337,
872        })
873        .unwrap();
874        wrap.call(arg).await.unwrap();
875    }
876
877    #[tokio::test]
878    async fn processor_config_has_workers_by_default() {
879        let cfg = ProcessorConfig::default();
880
881        assert!(
882            cfg.num_workers > 0,
883            "num_workers should be greater than 0 (using num cpu)"
884        );
885
886        let cfg = cfg.num_workers(1000);
887
888        assert_eq!(cfg.num_workers, 1000);
889    }
890}