sidekiq/
lib.rs

1use async_trait::async_trait;
2use middleware::Chain;
3use rand::{Rng, RngCore};
4use serde::{
5    de::{self, Deserializer, Visitor},
6    Deserialize, Serialize, Serializer,
7};
8use serde_json::Value as JsonValue;
9use sha2::{Digest, Sha256};
10use std::future::Future;
11use std::marker::PhantomData;
12use std::pin::Pin;
13use std::sync::Arc;
14
15pub mod periodic;
16
17mod middleware;
18mod processor;
19mod redis;
20mod scheduled;
21mod stats;
22
23// Re-export
24pub use crate::redis::{
25    with_custom_namespace, RedisConnection, RedisConnectionManager, RedisError, RedisPool,
26};
27pub use ::redis as redis_rs;
28pub use middleware::{ChainIter, ServerMiddleware};
29pub use processor::{BalanceStrategy, Processor, ProcessorConfig, QueueConfig, WorkFetcher};
30pub use scheduled::Scheduled;
31pub use stats::{Counter, StatsPublisher};
32
33#[derive(thiserror::Error, Debug)]
34pub enum Error {
35    #[error("{0}")]
36    Message(String),
37
38    #[error(transparent)]
39    Json(#[from] serde_json::Error),
40
41    #[error(transparent)]
42    CronClock(#[from] cron_clock::error::Error),
43
44    #[error(transparent)]
45    BB8(#[from] bb8::RunError<redis::RedisError>),
46
47    #[error(transparent)]
48    ChronoRange(#[from] chrono::OutOfRangeError),
49
50    #[error(transparent)]
51    Redis(#[from] redis::RedisError),
52
53    #[error(transparent)]
54    Any(#[from] Box<dyn std::error::Error + Send + Sync>),
55}
56
57pub type Result<T> = std::result::Result<T, Error>;
58
59#[must_use]
60pub fn opts() -> EnqueueOpts {
61    EnqueueOpts {
62        queue: "default".into(),
63        retry: RetryOpts::Yes,
64        unique_for: None,
65        retry_queue: None,
66    }
67}
68
69pub struct EnqueueOpts {
70    queue: String,
71    retry: RetryOpts,
72    unique_for: Option<std::time::Duration>,
73    retry_queue: Option<String>,
74}
75
76impl EnqueueOpts {
77    #[must_use]
78    pub fn queue<S: Into<String>>(self, queue: S) -> Self {
79        Self {
80            queue: queue.into(),
81            ..self
82        }
83    }
84
85    #[must_use]
86    pub fn retry<RO>(self, retry: RO) -> Self
87    where
88        RO: Into<RetryOpts>,
89    {
90        Self {
91            retry: retry.into(),
92            ..self
93        }
94    }
95
96    #[must_use]
97    pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
98        Self {
99            unique_for: Some(unique_for),
100            ..self
101        }
102    }
103
104    #[must_use]
105    pub fn retry_queue(self, retry_queue: String) -> Self {
106        Self {
107            retry_queue: Some(retry_queue),
108            ..self
109        }
110    }
111
112    pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result<Job> {
113        let args = serde_json::to_value(args)?;
114
115        // Ensure args are always wrapped in an array.
116        let args = if args.is_array() {
117            args
118        } else {
119            JsonValue::Array(vec![args])
120        };
121
122        Ok(Job {
123            queue: self.queue.clone(),
124            class,
125            jid: new_jid(),
126            created_at: chrono::Utc::now().timestamp() as f64,
127            enqueued_at: None,
128            retry: self.retry.clone(),
129            args,
130
131            // Make default eventually...
132            error_message: None,
133            error_class: None,
134            failed_at: None,
135            retry_count: None,
136            retried_at: None,
137
138            // Meta for enqueueing
139            retry_queue: self.retry_queue.clone(),
140            unique_for: self.unique_for,
141        })
142    }
143
144    pub async fn perform_async(
145        self,
146        redis: &RedisPool,
147        class: String,
148        args: impl serde::Serialize,
149    ) -> Result<()> {
150        let job = self.create_job(class, args)?;
151        UnitOfWork::from_job(job).enqueue(redis).await?;
152        Ok(())
153    }
154
155    pub async fn perform_in(
156        &self,
157        redis: &RedisPool,
158        class: String,
159        duration: std::time::Duration,
160        args: impl serde::Serialize,
161    ) -> Result<()> {
162        let job = self.create_job(class, args)?;
163        UnitOfWork::from_job(job).schedule(redis, duration).await?;
164        Ok(())
165    }
166}
167
168/// Helper function for enqueueing a worker into sidekiq.
169/// This can be used to enqueue a job for a ruby sidekiq worker to process.
170pub async fn perform_async(
171    redis: &RedisPool,
172    class: String,
173    queue: String,
174    args: impl serde::Serialize,
175) -> Result<()> {
176    opts().queue(queue).perform_async(redis, class, args).await
177}
178
179/// Helper function for enqueueing a worker into sidekiq.
180/// This can be used to enqueue a job for a ruby sidekiq worker to process.
181pub async fn perform_in(
182    redis: &RedisPool,
183    duration: std::time::Duration,
184    class: String,
185    queue: String,
186    args: impl serde::Serialize,
187) -> Result<()> {
188    opts()
189        .queue(queue)
190        .perform_in(redis, class, duration, args)
191        .await
192}
193
194fn new_jid() -> String {
195    let mut bytes = [0u8; 12];
196    rand::thread_rng().fill_bytes(&mut bytes);
197    hex::encode(bytes)
198}
199
200pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
201    queue: String,
202    retry: RetryOpts,
203    args: PhantomData<Args>,
204    worker: PhantomData<W>,
205    unique_for: Option<std::time::Duration>,
206    retry_queue: Option<String>,
207}
208
209impl<Args, W> WorkerOpts<Args, W>
210where
211    W: Worker<Args>,
212{
213    #[must_use]
214    pub fn new() -> Self {
215        Self {
216            queue: "default".into(),
217            retry: RetryOpts::Yes,
218            args: PhantomData,
219            worker: PhantomData,
220            unique_for: None,
221            retry_queue: None,
222        }
223    }
224
225    #[must_use]
226    pub fn retry<RO>(self, retry: RO) -> Self
227    where
228        RO: Into<RetryOpts>,
229    {
230        Self {
231            retry: retry.into(),
232            ..self
233        }
234    }
235
236    #[must_use]
237    pub fn retry_queue<S: Into<String>>(self, retry_queue: S) -> Self {
238        Self {
239            retry_queue: Some(retry_queue.into()),
240            ..self
241        }
242    }
243
244    #[must_use]
245    pub fn queue<S: Into<String>>(self, queue: S) -> Self {
246        Self {
247            queue: queue.into(),
248            ..self
249        }
250    }
251
252    #[must_use]
253    pub fn unique_for(self, unique_for: std::time::Duration) -> Self {
254        Self {
255            unique_for: Some(unique_for),
256            ..self
257        }
258    }
259
260    #[allow(clippy::wrong_self_convention)]
261    pub fn into_opts(&self) -> EnqueueOpts {
262        self.into()
263    }
264
265    pub async fn perform_async(
266        &self,
267        redis: &RedisPool,
268        args: impl serde::Serialize + Send + 'static,
269    ) -> Result<()> {
270        self.into_opts()
271            .perform_async(redis, W::class_name(), args)
272            .await
273    }
274
275    pub async fn perform_in(
276        &self,
277        redis: &RedisPool,
278        duration: std::time::Duration,
279        args: impl serde::Serialize + Send + 'static,
280    ) -> Result<()> {
281        self.into_opts()
282            .perform_in(redis, W::class_name(), duration, args)
283            .await
284    }
285}
286
287impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
288    fn from(opts: &WorkerOpts<Args, W>) -> Self {
289        Self {
290            retry: opts.retry.clone(),
291            queue: opts.queue.clone(),
292            unique_for: opts.unique_for,
293            retry_queue: opts.retry_queue.clone(),
294        }
295    }
296}
297
298impl<Args, W: Worker<Args>> Default for WorkerOpts<Args, W> {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304#[async_trait]
305pub trait Worker<Args>: Send + Sync {
306    /// Signal to WorkerRef to not attempt to modify the JsonValue args
307    /// before calling the perform function. This is useful if the args
308    /// are expected to be a `Vec<T>` that might be `len() == 1` or a
309    /// single sized tuple `(T,)`.
310    fn disable_argument_coercion(&self) -> bool {
311        false
312    }
313
314    #[must_use]
315    fn opts() -> WorkerOpts<Args, Self>
316    where
317        Self: Sized,
318    {
319        WorkerOpts::new()
320    }
321
322    // TODO: Make configurable through opts and make opts accessible to the
323    // retry middleware through a Box<dyn Worker>.
324    fn max_retries(&self) -> usize {
325        25
326    }
327
328    /// Derive a class_name from the Worker type to be used with sidekiq. By default
329    /// this method will
330    #[must_use]
331    fn class_name() -> String
332    where
333        Self: Sized,
334    {
335        use convert_case::{Case, Casing};
336
337        let type_name = std::any::type_name::<Self>();
338        let name = type_name.split("::").last().unwrap_or(type_name);
339        name.to_case(Case::UpperCamel)
340    }
341
342    async fn perform_async(redis: &RedisPool, args: Args) -> Result<()>
343    where
344        Self: Sized,
345        Args: Send + Sync + serde::Serialize + 'static,
346    {
347        Self::opts().perform_async(redis, args).await
348    }
349
350    async fn perform_in(redis: &RedisPool, duration: std::time::Duration, args: Args) -> Result<()>
351    where
352        Self: Sized,
353        Args: Send + Sync + serde::Serialize + 'static,
354    {
355        Self::opts().perform_in(redis, duration, args).await
356    }
357
358    async fn perform(&self, args: Args) -> Result<()>;
359}
360
361// We can't store a Vec<Box<dyn Worker<Args>>>, because that will only work
362// for a single arg type, but since any worker is JsonValue in and Result out,
363// we can wrap that generic work in a callback that shares the same type.
364// I'm sure this has a fancy name, but I don't know what it is.
365#[derive(Clone)]
366pub struct WorkerRef {
367    #[allow(clippy::type_complexity)]
368    work_fn: Arc<
369        Box<dyn Fn(JsonValue) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> + Send + Sync>,
370    >,
371    max_retries: usize,
372}
373
374async fn invoke_worker<Args, W>(args: JsonValue, worker: Arc<W>) -> Result<()>
375where
376    Args: Send + Sync + 'static,
377    W: Worker<Args> + 'static,
378    for<'de> Args: Deserialize<'de>,
379{
380    let args = if worker.disable_argument_coercion() {
381        args
382    } else {
383        // Ensure any caller expecting to receive `()` will always work.
384        if std::any::TypeId::of::<Args>() == std::any::TypeId::of::<()>() {
385            JsonValue::Null
386        } else {
387            // If the value contains a single item Vec then
388            // you can probably be sure that this is a single value item.
389            // Otherwise, the caller can impl a tuple type.
390            match args {
391                JsonValue::Array(mut arr) if arr.len() == 1 => {
392                    arr.pop().expect("value change after size check")
393                }
394                _ => args,
395            }
396        }
397    };
398
399    let args: Args = serde_json::from_value(args)?;
400    worker.perform(args).await
401}
402
403impl WorkerRef {
404    pub(crate) fn wrap<Args, W>(worker: Arc<W>) -> Self
405    where
406        Args: Send + Sync + 'static,
407        W: Worker<Args> + 'static,
408        for<'de> Args: Deserialize<'de>,
409    {
410        Self {
411            work_fn: Arc::new(Box::new({
412                let worker = worker.clone();
413                move |args: JsonValue| {
414                    let worker = worker.clone();
415                    Box::pin(async move { invoke_worker(args, worker).await })
416                }
417            })),
418            max_retries: worker.max_retries(),
419        }
420    }
421
422    #[must_use]
423    pub fn max_retries(&self) -> usize {
424        self.max_retries
425    }
426
427    pub async fn call(&self, args: JsonValue) -> Result<()> {
428        (Arc::clone(&self.work_fn))(args).await
429    }
430}
431
432#[derive(Clone, Debug, PartialEq)]
433pub enum RetryOpts {
434    Yes,
435    Never,
436    Max(usize),
437}
438
439impl Serialize for RetryOpts {
440    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
441    where
442        S: Serializer,
443    {
444        match *self {
445            RetryOpts::Yes => serializer.serialize_bool(true),
446            RetryOpts::Never => serializer.serialize_bool(false),
447            RetryOpts::Max(value) => serializer.serialize_u64(value as u64),
448        }
449    }
450}
451
452impl<'de> Deserialize<'de> for RetryOpts {
453    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
454    where
455        D: Deserializer<'de>,
456    {
457        struct RetryOptsVisitor;
458
459        impl Visitor<'_> for RetryOptsVisitor {
460            type Value = RetryOpts;
461
462            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
463                formatter.write_str("a boolean, null, or a positive integer")
464            }
465
466            fn visit_bool<E>(self, value: bool) -> std::result::Result<Self::Value, E>
467            where
468                E: de::Error,
469            {
470                if value {
471                    Ok(RetryOpts::Yes)
472                } else {
473                    Ok(RetryOpts::Never)
474                }
475            }
476
477            fn visit_none<E>(self) -> std::result::Result<Self::Value, E>
478            where
479                E: de::Error,
480            {
481                Ok(RetryOpts::Never)
482            }
483
484            fn visit_u64<E>(self, value: u64) -> std::result::Result<Self::Value, E>
485            where
486                E: de::Error,
487            {
488                Ok(RetryOpts::Max(value as usize))
489            }
490        }
491
492        deserializer.deserialize_any(RetryOptsVisitor)
493    }
494}
495
496impl From<bool> for RetryOpts {
497    fn from(value: bool) -> Self {
498        match value {
499            true => RetryOpts::Yes,
500            false => RetryOpts::Never,
501        }
502    }
503}
504
505impl From<usize> for RetryOpts {
506    fn from(value: usize) -> Self {
507        RetryOpts::Max(value)
508    }
509}
510
511//
512// {
513//   "retry": true,
514//   "queue": "yolo",
515//   "class": "YoloWorker",
516//   "args": [
517//     {
518//       "yolo": "hiiii"
519//     }
520//   ],
521//   "jid": "f33f7063c6d7a4db0869289a",
522//   "created_at": 1647119929.3788748,
523//   "enqueued_at": 1647119929.378998
524// }
525//
526#[derive(Serialize, Deserialize, Debug, Clone)]
527pub struct Job {
528    pub queue: String,
529    pub args: JsonValue,
530    pub retry: RetryOpts,
531    pub class: String,
532    pub jid: String,
533    pub created_at: f64,
534    pub enqueued_at: Option<f64>,
535    pub failed_at: Option<f64>,
536    pub error_message: Option<String>,
537    pub error_class: Option<String>,
538    pub retry_count: Option<usize>,
539    pub retried_at: Option<f64>,
540    pub retry_queue: Option<String>,
541
542    #[serde(skip)]
543    pub unique_for: Option<std::time::Duration>,
544}
545
546#[derive(Debug)]
547pub struct UnitOfWork {
548    pub queue: String,
549    pub job: Job,
550}
551
552impl UnitOfWork {
553    #[must_use]
554    pub fn from_job(job: Job) -> Self {
555        Self {
556            queue: format!("queue:{}", &job.queue),
557            job,
558        }
559    }
560
561    pub fn from_job_string(job_str: String) -> Result<Self> {
562        let job: Job = serde_json::from_str(&job_str)?;
563        Ok(Self::from_job(job))
564    }
565
566    pub async fn enqueue(&self, redis: &RedisPool) -> Result<()> {
567        let mut redis = redis.get().await?;
568        self.enqueue_direct(&mut redis).await
569    }
570
571    async fn enqueue_direct(&self, redis: &mut RedisConnection) -> Result<()> {
572        let mut job = self.job.clone();
573        job.enqueued_at = Some(chrono::Utc::now().timestamp() as f64);
574
575        if let Some(ref duration) = job.unique_for {
576            // Check to see if this is unique for the given duration.
577            // Even though SET k v NX EQ ttl isn't the best locking
578            // mechanism, I think it's "good enough" to prove this out.
579            let args_as_json_string: String = serde_json::to_string(&job.args)?;
580            let args_hash = format!("{:x}", Sha256::digest(&args_as_json_string));
581            let redis_key = format!(
582                "sidekiq:unique:{}:{}:{}",
583                &job.queue, &job.class, &args_hash
584            );
585            if let redis::RedisValue::Nil = redis
586                .set_nx_ex(redis_key, "", duration.as_secs() as usize)
587                .await?
588            {
589                // This job has already been enqueued. Do not submit it to redis.
590                return Ok(());
591            }
592        }
593
594        redis.sadd("queues".to_string(), job.queue.clone()).await?;
595
596        redis
597            .lpush(self.queue.clone(), serde_json::to_string(&job)?)
598            .await?;
599        Ok(())
600    }
601
602    pub async fn reenqueue(&mut self, redis: &RedisPool) -> Result<()> {
603        if let Some(retry_count) = self.job.retry_count {
604            redis
605                .get()
606                .await?
607                .zadd(
608                    "retry".to_string(),
609                    serde_json::to_string(&self.job)?,
610                    Self::retry_job_at(retry_count).timestamp(),
611                )
612                .await?;
613        }
614
615        Ok(())
616    }
617
618    fn retry_job_at(count: usize) -> chrono::DateTime<chrono::Utc> {
619        let seconds_to_delay =
620            count.pow(4) + 15 + (rand::thread_rng().gen_range(0..30) * (count + 1));
621
622        chrono::Utc::now() + chrono::Duration::seconds(seconds_to_delay as i64)
623    }
624
625    pub async fn schedule(
626        &mut self,
627        redis: &RedisPool,
628        duration: std::time::Duration,
629    ) -> Result<()> {
630        let enqueue_at = chrono::Utc::now() + chrono::Duration::from_std(duration)?;
631
632        redis
633            .get()
634            .await?
635            .zadd(
636                "schedule".to_string(),
637                serde_json::to_string(&self.job)?,
638                enqueue_at.timestamp(),
639            )
640            .await?;
641
642        Ok(())
643    }
644}
645
646#[cfg(test)]
647mod test {
648    use super::*;
649
650    mod my {
651        pub mod cool {
652            pub mod workers {
653                use super::super::super::super::*;
654
655                pub struct TestOpts;
656
657                #[async_trait]
658                impl Worker<()> for TestOpts {
659                    fn opts() -> WorkerOpts<(), Self>
660                    where
661                        Self: Sized,
662                    {
663                        WorkerOpts::new()
664                            // Test bool
665                            .retry(false)
666                            // Test usize
667                            .retry(42)
668                            // Test the new type
669                            .retry(RetryOpts::Never)
670                            .unique_for(std::time::Duration::from_secs(30))
671                            .queue("yolo_quue")
672                    }
673
674                    async fn perform(&self, _args: ()) -> Result<()> {
675                        Ok(())
676                    }
677                }
678
679                pub struct X1Y2MyJob;
680
681                #[async_trait]
682                impl Worker<()> for X1Y2MyJob {
683                    async fn perform(&self, _args: ()) -> Result<()> {
684                        Ok(())
685                    }
686                }
687
688                pub struct TestModuleWorker;
689
690                #[async_trait]
691                impl Worker<()> for TestModuleWorker {
692                    async fn perform(&self, _args: ()) -> Result<()> {
693                        Ok(())
694                    }
695                }
696
697                pub struct TestCustomClassNameWorker;
698
699                #[async_trait]
700                impl Worker<()> for TestCustomClassNameWorker {
701                    async fn perform(&self, _args: ()) -> Result<()> {
702                        Ok(())
703                    }
704
705                    fn class_name() -> String
706                    where
707                        Self: Sized,
708                    {
709                        "My::Cool::Workers::TestCustomClassNameWorker".to_string()
710                    }
711                }
712            }
713        }
714    }
715
716    #[tokio::test]
717    async fn ignores_modules_in_ruby_worker_name() {
718        assert_eq!(
719            my::cool::workers::TestModuleWorker::class_name(),
720            "TestModuleWorker".to_string()
721        );
722    }
723
724    #[tokio::test]
725    async fn does_not_reformat_valid_ruby_class_names() {
726        assert_eq!(
727            my::cool::workers::X1Y2MyJob::class_name(),
728            "X1Y2MyJob".to_string()
729        );
730    }
731
732    #[tokio::test]
733    async fn supports_custom_class_name_for_workers() {
734        assert_eq!(
735            my::cool::workers::TestCustomClassNameWorker::class_name(),
736            "My::Cool::Workers::TestCustomClassNameWorker".to_string()
737        );
738    }
739
740    #[derive(Clone, Deserialize, Serialize, Debug)]
741    struct TestArg {
742        name: String,
743        age: i32,
744    }
745
746    struct TestGenericWorker;
747    #[async_trait]
748    impl Worker<TestArg> for TestGenericWorker {
749        async fn perform(&self, _args: TestArg) -> Result<()> {
750            Ok(())
751        }
752    }
753
754    struct TestMultiArgWorker;
755    #[async_trait]
756    impl Worker<(TestArg, TestArg)> for TestMultiArgWorker {
757        async fn perform(&self, _args: (TestArg, TestArg)) -> Result<()> {
758            Ok(())
759        }
760    }
761
762    struct TestTupleArgWorker;
763    #[async_trait]
764    impl Worker<(TestArg,)> for TestTupleArgWorker {
765        fn disable_argument_coercion(&self) -> bool {
766            true
767        }
768        async fn perform(&self, _args: (TestArg,)) -> Result<()> {
769            Ok(())
770        }
771    }
772
773    struct TestVecArgWorker;
774    #[async_trait]
775    impl Worker<Vec<TestArg>> for TestVecArgWorker {
776        fn disable_argument_coercion(&self) -> bool {
777            true
778        }
779        async fn perform(&self, _args: Vec<TestArg>) -> Result<()> {
780            Ok(())
781        }
782    }
783
784    #[tokio::test]
785    async fn can_have_a_vec_with_one_or_more_items() {
786        // One item
787        let worker = Arc::new(TestVecArgWorker);
788        let wrap = Arc::new(WorkerRef::wrap(worker));
789        let wrap = wrap.clone();
790        let arg = serde_json::to_value(vec![TestArg {
791            name: "test A".into(),
792            age: 1337,
793        }])
794        .unwrap();
795        wrap.call(arg).await.unwrap();
796
797        // Multiple items
798        let worker = Arc::new(TestVecArgWorker);
799        let wrap = Arc::new(WorkerRef::wrap(worker));
800        let wrap = wrap.clone();
801        let arg = serde_json::to_value(vec![
802            TestArg {
803                name: "test A".into(),
804                age: 1337,
805            },
806            TestArg {
807                name: "test A".into(),
808                age: 1337,
809            },
810        ])
811        .unwrap();
812        wrap.call(arg).await.unwrap();
813    }
814
815    #[tokio::test]
816    async fn can_have_multiple_arguments() {
817        let worker = Arc::new(TestMultiArgWorker);
818        let wrap = Arc::new(WorkerRef::wrap(worker));
819        let wrap = wrap.clone();
820        let arg = serde_json::to_value((
821            TestArg {
822                name: "test A".into(),
823                age: 1337,
824            },
825            TestArg {
826                name: "test B".into(),
827                age: 1336,
828            },
829        ))
830        .unwrap();
831        wrap.call(arg).await.unwrap();
832    }
833
834    #[tokio::test]
835    async fn can_have_a_single_tuple_argument() {
836        let worker = Arc::new(TestTupleArgWorker);
837        let wrap = Arc::new(WorkerRef::wrap(worker));
838        let wrap = wrap.clone();
839        let arg = serde_json::to_value((TestArg {
840            name: "test".into(),
841            age: 1337,
842        },))
843        .unwrap();
844        wrap.call(arg).await.unwrap();
845    }
846
847    #[tokio::test]
848    async fn can_have_a_single_argument() {
849        let worker = Arc::new(TestGenericWorker);
850        let wrap = Arc::new(WorkerRef::wrap(worker));
851        let wrap = wrap.clone();
852        let arg = serde_json::to_value(TestArg {
853            name: "test".into(),
854            age: 1337,
855        })
856        .unwrap();
857        wrap.call(arg).await.unwrap();
858    }
859
860    #[tokio::test]
861    async fn processor_config_has_workers_by_default() {
862        let cfg = ProcessorConfig::default();
863
864        assert!(
865            cfg.num_workers > 0,
866            "num_workers should be greater than 0 (using num cpu)"
867        );
868
869        let cfg = cfg.num_workers(1000);
870
871        assert_eq!(cfg.num_workers, 1000);
872    }
873}