Skip to main content

oxanus/
worker.rs

1use crate::{
2    QueueConfig, WorkerConfigKind, context::JobContext, job_envelope::JobConflictStrategy,
3};
4
5pub trait Job: Send + Sync + serde::Serialize {
6    fn worker_name() -> &'static str
7    where
8        Self: Sized;
9
10    fn unique_id(&self) -> Option<String> {
11        None
12    }
13
14    fn on_conflict(&self) -> JobConflictStrategy {
15        JobConflictStrategy::Skip
16    }
17
18    fn should_resurrect() -> bool
19    where
20        Self: Sized,
21    {
22        true
23    }
24
25    fn throttle_cost(&self) -> Option<u64> {
26        None
27    }
28}
29
30#[async_trait::async_trait]
31pub trait Worker<Args: Send + Sync>: Send + Sync {
32    type Error: std::error::Error + Send + Sync;
33
34    async fn process(&self, job: &Args, ctx: &JobContext) -> Result<(), Self::Error>;
35
36    fn max_retries(&self, _job: &Args) -> u32 {
37        2
38    }
39
40    fn retry_delay(&self, _job: &Args, retries: u32) -> u64 {
41        // 0 -> 25 seconds
42        // 1 -> 125 seconds
43        // 2 -> 625 seconds
44        // 3 -> 3125 seconds
45        // 4 -> 15625 seconds
46        // 5 -> 78125 seconds
47        // 6 -> 390625 seconds
48        // 7 -> 1953125 seconds
49        u64::pow(5, retries + 2)
50    }
51
52    /// 6 part cron schedule: "* * * * * *"
53    fn cron_schedule() -> Option<String>
54    where
55        Self: Sized,
56    {
57        None
58    }
59
60    fn cron_queue_config() -> Option<QueueConfig>
61    where
62        Self: Sized,
63    {
64        None
65    }
66
67    fn to_config() -> WorkerConfigKind
68    where
69        Self: Sized,
70        Args: Job,
71    {
72        if let Some(schedule) = Self::cron_schedule() {
73            let queue_config = Self::cron_queue_config()
74                .expect("Cron worker must define cron_queue_config (use #[oxanus(cron(schedule = \"...\", queue = MyQueue))])");
75            let queue_key = queue_config.static_key().expect(
76                "Cron workers must use static queues. Dynamic queues are not supported for cron workers.",
77            );
78            return WorkerConfigKind::Cron {
79                schedule,
80                queue_key,
81                resurrect: Args::should_resurrect(),
82            };
83        }
84        WorkerConfigKind::Normal
85    }
86}
87
88pub trait FromContext<T> {
89    fn from_context(ctx: &T) -> Self;
90}
91
92#[async_trait::async_trait]
93pub trait Processable: Send + Sync {
94    type Error: std::error::Error + Send + Sync;
95
96    async fn process(&self, ctx: &JobContext) -> Result<(), Self::Error>;
97    fn max_retries(&self) -> u32;
98    fn retry_delay(&self, retries: u32) -> u64;
99}
100
101pub type BoxedProcessable<ET> = Box<dyn Processable<Error = ET>>;
102
103pub(crate) struct BoundJob<W, A> {
104    pub worker: W,
105    pub job: A,
106}
107
108#[async_trait::async_trait]
109impl<W, A> Processable for BoundJob<W, A>
110where
111    W: Worker<A> + Send + Sync + 'static,
112    A: Send + Sync + 'static,
113{
114    type Error = W::Error;
115
116    async fn process(&self, ctx: &JobContext) -> Result<(), Self::Error> {
117        self.worker.process(&self.job, ctx).await
118    }
119
120    fn max_retries(&self) -> u32 {
121        self.worker.max_retries(&self.job)
122    }
123
124    fn retry_delay(&self, retries: u32) -> u64 {
125        self.worker.retry_delay(&self.job, retries)
126    }
127}
128
129#[cfg(feature = "macros")]
130#[cfg(test)]
131mod tests {
132    use super::{Job, JobConflictStrategy};
133    use crate as oxanus;
134    use serde::{Deserialize, Serialize};
135    use std::io::Error as WorkerError;
136
137    #[derive(Clone, Default)]
138    struct WorkerContext {}
139
140    #[derive(oxanus::Registry)]
141    #[allow(dead_code)]
142    struct ComponentRegistry(oxanus::ComponentRegistry<WorkerContext, WorkerError>);
143
144    #[derive(oxanus::Registry)]
145    #[allow(dead_code)]
146    struct ComponentRegistryFmt(oxanus::ComponentRegistry<WorkerContext, std::fmt::Error>);
147
148    #[tokio::test]
149    async fn test_define_worker_with_macro() {
150        #[derive(Debug, Serialize, Deserialize)]
151        struct TestJob {}
152
153        #[derive(oxanus::Worker)]
154
155        struct TestWorker;
156
157        impl TestWorker {
158            async fn process(
159                &self,
160                _job: &TestJob,
161                _ctx: &oxanus::JobContext,
162            ) -> Result<(), WorkerError> {
163                Ok(())
164            }
165        }
166
167        assert_eq!(
168            oxanus::Worker::<TestJob>::max_retries(&TestWorker, &TestJob {}),
169            2
170        );
171
172        #[derive(Debug, Serialize, Deserialize)]
173        struct TestWorkerCustomErrorJob {}
174
175        #[derive(oxanus::Worker)]
176        #[oxanus(error = std::fmt::Error, registry = ComponentRegistryFmt)]
177        #[oxanus(max_retries = 3, retry_delay = 10)]
178        #[oxanus(on_conflict = Replace)]
179        struct TestWorkerCustomError;
180
181        impl TestWorkerCustomError {
182            async fn process(
183                &self,
184                _job: &TestWorkerCustomErrorJob,
185                _ctx: &oxanus::JobContext,
186            ) -> Result<(), std::fmt::Error> {
187                use std::fmt::Write;
188                let mut s = String::new();
189                write!(&mut s, "hi")
190            }
191        }
192
193        assert_eq!(
194            oxanus::Worker::<TestWorkerCustomErrorJob>::max_retries(
195                &TestWorkerCustomError,
196                &TestWorkerCustomErrorJob {}
197            ),
198            3
199        );
200        assert_eq!(
201            oxanus::Worker::<TestWorkerCustomErrorJob>::retry_delay(
202                &TestWorkerCustomError,
203                &TestWorkerCustomErrorJob {},
204                1
205            ),
206            10
207        );
208        assert_eq!(
209            TestWorkerCustomErrorJob {}.on_conflict(),
210            JobConflictStrategy::Replace
211        );
212
213        #[derive(Debug, Serialize, Deserialize)]
214        struct TestWorkerUniqueIdJob {
215            id: i32,
216            _1: i32,
217        }
218
219        #[derive(oxanus::Worker)]
220        #[oxanus(unique_id = "test_worker_{id}")]
221        struct TestWorkerUniqueId;
222
223        impl TestWorkerUniqueId {
224            async fn process(
225                &self,
226                _job: &TestWorkerUniqueIdJob,
227                _ctx: &oxanus::JobContext,
228            ) -> Result<(), WorkerError> {
229                Ok(())
230            }
231        }
232
233        assert_eq!(
234            oxanus::Worker::<TestWorkerUniqueIdJob>::max_retries(
235                &TestWorkerUniqueId,
236                &TestWorkerUniqueIdJob { id: 0, _1: 0 }
237            ),
238            2
239        );
240        assert_eq!(
241            oxanus::Job::unique_id(&TestWorkerUniqueIdJob { id: 1, _1: 0 }),
242            Some("test_worker_1".to_string())
243        );
244        assert_eq!(
245            oxanus::Job::unique_id(&TestWorkerUniqueIdJob { id: 12, _1: 0 }),
246            Some("test_worker_12".to_string())
247        );
248
249        #[derive(Debug, Serialize, Deserialize, Default)]
250        struct NestedTask {
251            name: String,
252        }
253
254        #[derive(Debug, Serialize, Deserialize)]
255        struct TestWorkerNestedUniqueIdJob {
256            id: i32,
257            task: NestedTask,
258        }
259
260        #[derive(oxanus::Worker)]
261        #[oxanus(unique_id(fmt = "test_worker_{id}_{task}", id = self.id, task = self.task.name))]
262        struct TestWorkerNestedUniqueId;
263
264        impl TestWorkerNestedUniqueId {
265            async fn process(
266                &self,
267                _job: &TestWorkerNestedUniqueIdJob,
268                _ctx: &oxanus::JobContext,
269            ) -> Result<(), WorkerError> {
270                Ok(())
271            }
272        }
273
274        assert_eq!(
275            oxanus::Job::unique_id(&TestWorkerNestedUniqueIdJob {
276                id: 1,
277                task: NestedTask {
278                    name: "task1".to_owned(),
279                }
280            }),
281            Some("test_worker_1_task1".to_string())
282        );
283        assert_eq!(
284            oxanus::Job::unique_id(&TestWorkerNestedUniqueIdJob {
285                id: 2,
286                task: NestedTask {
287                    name: "task2".to_owned(),
288                }
289            }),
290            Some("test_worker_2_task2".to_string())
291        );
292
293        #[derive(Debug, Serialize, Deserialize)]
294        struct TestWorkerCustomUniqueIdJob {
295            id: i32,
296            task: NestedTask,
297        }
298
299        impl TestWorkerCustomUniqueIdJob {
300            fn unique_id(&self) -> Option<String> {
301                Some(format!("worker_id_{}_task_{}", self.id, self.task.name))
302            }
303        }
304
305        #[derive(oxanus::Worker)]
306        #[oxanus(unique_id = Self::unique_id)]
307        #[oxanus(retry_delay = Self::retry_delay)]
308        #[oxanus(max_retries = Self::max_retries)]
309        struct TestWorkerCustomUniqueId;
310
311        impl TestWorkerCustomUniqueId {
312            async fn process(
313                &self,
314                _job: &TestWorkerCustomUniqueIdJob,
315                _ctx: &oxanus::JobContext,
316            ) -> Result<(), WorkerError> {
317                Ok(())
318            }
319
320            fn retry_delay(&self, _job: &TestWorkerCustomUniqueIdJob, retries: u32) -> u64 {
321                retries as u64 * 2
322            }
323
324            fn max_retries(&self, _job: &TestWorkerCustomUniqueIdJob) -> u32 {
325                9
326            }
327        }
328
329        assert_eq!(
330            oxanus::Job::unique_id(&TestWorkerCustomUniqueIdJob {
331                id: 1,
332                task: NestedTask {
333                    name: "11".to_owned(),
334                }
335            }),
336            Some("worker_id_1_task_11".to_string())
337        );
338        let job2 = TestWorkerCustomUniqueIdJob {
339            id: 2,
340            task: NestedTask {
341                name: "22".to_owned(),
342            },
343        };
344        assert_eq!(
345            oxanus::Job::unique_id(&job2),
346            Some("worker_id_2_task_22".to_string())
347        );
348        let worker = TestWorkerCustomUniqueId;
349        assert_eq!(
350            oxanus::Worker::<TestWorkerCustomUniqueIdJob>::retry_delay(&worker, &job2, 1),
351            2
352        );
353        assert_eq!(
354            oxanus::Worker::<TestWorkerCustomUniqueIdJob>::retry_delay(&worker, &job2, 2),
355            4
356        );
357        assert_eq!(
358            oxanus::Worker::<TestWorkerCustomUniqueIdJob>::max_retries(&worker, &job2),
359            9
360        );
361    }
362
363    #[tokio::test]
364    async fn test_define_cron_worker_with_macro() {
365        use crate as oxanus;
366        use crate::Queue;
367        use std::io::Error as WorkerError;
368
369        #[derive(Serialize, oxanus::Queue)]
370        struct DefaultQueue;
371
372        #[derive(Debug, Serialize, Deserialize)]
373        struct TestCronJob {}
374
375        #[derive(oxanus::Worker)]
376        #[oxanus(cron(schedule = "*/1 * * * * *", queue = DefaultQueue))]
377        struct TestCronWorker;
378
379        impl TestCronWorker {
380            async fn process(
381                &self,
382                _job: &TestCronJob,
383                _ctx: &oxanus::JobContext,
384            ) -> Result<(), WorkerError> {
385                Ok(())
386            }
387        }
388
389        assert_eq!(
390            <TestCronWorker as oxanus::Worker<TestCronJob>>::cron_schedule(),
391            Some("*/1 * * * * *".to_string())
392        );
393        assert_eq!(
394            <TestCronWorker as oxanus::Worker<TestCronJob>>::cron_queue_config(),
395            Some(DefaultQueue::to_config()),
396        );
397        assert!(<TestCronJob as oxanus::Job>::should_resurrect());
398    }
399
400    #[tokio::test]
401    async fn test_define_worker_with_resurrect_false() {
402        use crate as oxanus;
403        use std::io::Error as WorkerError;
404
405        #[derive(Debug, Serialize, Deserialize)]
406        struct NoResurrectJob {}
407
408        #[derive(oxanus::Worker)]
409        #[oxanus(resurrect = false)]
410        struct NoResurrectWorker;
411
412        impl NoResurrectWorker {
413            async fn process(
414                &self,
415                _job: &NoResurrectJob,
416                _ctx: &oxanus::JobContext,
417            ) -> Result<(), WorkerError> {
418                Ok(())
419            }
420        }
421
422        assert!(!<NoResurrectJob as oxanus::Job>::should_resurrect());
423
424        #[derive(Debug, Serialize, Deserialize)]
425        struct DefaultResurrectJob {}
426
427        #[derive(oxanus::Worker)]
428
429        struct DefaultResurrectWorker;
430
431        impl DefaultResurrectWorker {
432            async fn process(
433                &self,
434                _job: &DefaultResurrectJob,
435                _ctx: &oxanus::JobContext,
436            ) -> Result<(), WorkerError> {
437                Ok(())
438            }
439        }
440
441        assert!(<DefaultResurrectJob as oxanus::Job>::should_resurrect());
442    }
443}