1use crate::{
2 QueueConfig, WorkerConfigKind, context::JobContext, job_envelope::JobConflictStrategy,
3};
4
5pub trait Job: Send + Sync + serde::Serialize {
6 fn worker_name() -> &'static str
7 where
8 Self: Sized;
9
10 fn unique_id(&self) -> Option<String> {
11 None
12 }
13
14 fn on_conflict(&self) -> JobConflictStrategy {
15 JobConflictStrategy::Skip
16 }
17
18 fn should_resurrect() -> bool
19 where
20 Self: Sized,
21 {
22 true
23 }
24
25 fn throttle_cost(&self) -> Option<u64> {
26 None
27 }
28}
29
30#[async_trait::async_trait]
31pub trait Worker<Args: Send + Sync>: Send + Sync {
32 type Error: std::error::Error + Send + Sync;
33
34 async fn process(&self, job: &Args, ctx: &JobContext) -> Result<(), Self::Error>;
35
36 fn max_retries(&self, _job: &Args) -> u32 {
37 2
38 }
39
40 fn retry_delay(&self, _job: &Args, retries: u32) -> u64 {
41 u64::pow(5, retries + 2)
50 }
51
52 fn cron_schedule() -> Option<String>
54 where
55 Self: Sized,
56 {
57 None
58 }
59
60 fn cron_queue_config() -> Option<QueueConfig>
61 where
62 Self: Sized,
63 {
64 None
65 }
66
67 fn to_config() -> WorkerConfigKind
68 where
69 Self: Sized,
70 Args: Job,
71 {
72 if let Some(schedule) = Self::cron_schedule() {
73 let queue_config = Self::cron_queue_config()
74 .expect("Cron worker must define cron_queue_config (use #[oxanus(cron(schedule = \"...\", queue = MyQueue))])");
75 let queue_key = queue_config.static_key().expect(
76 "Cron workers must use static queues. Dynamic queues are not supported for cron workers.",
77 );
78 return WorkerConfigKind::Cron {
79 schedule,
80 queue_key,
81 resurrect: Args::should_resurrect(),
82 };
83 }
84 WorkerConfigKind::Normal
85 }
86}
87
/// Construction of a value from a borrowed context of type `T`.
///
/// The context is only borrowed, so an implementor has to copy or clone
/// whatever it wants to keep.
pub trait FromContext<T> {
    /// Creates `Self` from a shared reference to the context.
    fn from_context(ctx: &T) -> Self;
}
91
92#[async_trait::async_trait]
93pub trait Processable: Send + Sync {
94 type Error: std::error::Error + Send + Sync;
95
96 async fn process(&self, ctx: &JobContext) -> Result<(), Self::Error>;
97 fn max_retries(&self) -> u32;
98 fn retry_delay(&self, retries: u32) -> u64;
99}
100
101pub type BoxedProcessable<ET> = Box<dyn Processable<Error = ET>>;
102
/// Pairs a worker with the job payload it should handle.
///
/// No bounds are placed on the type parameters here.
pub(crate) struct BoundJob<W, A> {
    /// The worker half of the pair.
    pub worker: W,
    /// The job payload half of the pair.
    pub job: A,
}
107
108#[async_trait::async_trait]
109impl<W, A> Processable for BoundJob<W, A>
110where
111 W: Worker<A> + Send + Sync + 'static,
112 A: Send + Sync + 'static,
113{
114 type Error = W::Error;
115
116 async fn process(&self, ctx: &JobContext) -> Result<(), Self::Error> {
117 self.worker.process(&self.job, ctx).await
118 }
119
120 fn max_retries(&self) -> u32 {
121 self.worker.max_retries(&self.job)
122 }
123
124 fn retry_delay(&self, retries: u32) -> u64 {
125 self.worker.retry_delay(&self.job, retries)
126 }
127}
128
// These tests drive the derive macros, so they are compiled only for test
// builds that also enable the `macros` feature.
#[cfg(all(test, feature = "macros"))]
mod tests {
    use super::{Job, JobConflictStrategy};
    use crate as oxanus;
    use serde::{Deserialize, Serialize};
    use std::io::Error as WorkerError;

    #[derive(Clone, Default)]
    struct WorkerContext {}

    // NOTE(review): never named explicitly below; presumably the derive picks
    // it up by name as the default registry — confirm against the macro.
    #[derive(oxanus::Registry)]
    #[allow(dead_code)]
    struct ComponentRegistry(oxanus::ComponentRegistry<WorkerContext, WorkerError>);

    // Registry for workers whose error type is `std::fmt::Error`; selected
    // below through `registry = ComponentRegistryFmt`.
    #[derive(oxanus::Registry)]
    #[allow(dead_code)]
    struct ComponentRegistryFmt(oxanus::ComponentRegistry<WorkerContext, std::fmt::Error>);

    #[tokio::test]
    async fn test_define_worker_with_macro() {
        // --- Plain derive: the trait defaults apply. ---
        #[derive(Debug, Serialize, Deserialize)]
        struct TestJob {}

        #[derive(oxanus::Worker)]
        struct TestWorker;

        impl TestWorker {
            async fn process(
                &self,
                _job: &TestJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), WorkerError> {
                Ok(())
            }
        }

        let default_retries = oxanus::Worker::<TestJob>::max_retries(&TestWorker, &TestJob {});
        assert_eq!(default_retries, 2);

        // --- Custom error type, registry, retry settings and conflict strategy. ---
        #[derive(Debug, Serialize, Deserialize)]
        struct TestWorkerCustomErrorJob {}

        #[derive(oxanus::Worker)]
        #[oxanus(error = std::fmt::Error, registry = ComponentRegistryFmt)]
        #[oxanus(max_retries = 3, retry_delay = 10)]
        #[oxanus(on_conflict = Replace)]
        struct TestWorkerCustomError;

        impl TestWorkerCustomError {
            async fn process(
                &self,
                _job: &TestWorkerCustomErrorJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), std::fmt::Error> {
                use std::fmt::Write;
                let mut buffer = String::new();
                write!(&mut buffer, "hi")
            }
        }

        let custom_retries = oxanus::Worker::<TestWorkerCustomErrorJob>::max_retries(
            &TestWorkerCustomError,
            &TestWorkerCustomErrorJob {},
        );
        assert_eq!(custom_retries, 3);

        let custom_delay = oxanus::Worker::<TestWorkerCustomErrorJob>::retry_delay(
            &TestWorkerCustomError,
            &TestWorkerCustomErrorJob {},
            1,
        );
        assert_eq!(custom_delay, 10);

        let conflict_strategy = TestWorkerCustomErrorJob {}.on_conflict();
        assert_eq!(conflict_strategy, JobConflictStrategy::Replace);

        // --- `unique_id` given as a format string over the job's fields. ---
        #[derive(Debug, Serialize, Deserialize)]
        struct TestWorkerUniqueIdJob {
            id: i32,
            _1: i32,
        }

        #[derive(oxanus::Worker)]
        #[oxanus(unique_id = "test_worker_{id}")]
        struct TestWorkerUniqueId;

        impl TestWorkerUniqueId {
            async fn process(
                &self,
                _job: &TestWorkerUniqueIdJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), WorkerError> {
                Ok(())
            }
        }

        let unique_id_retries = oxanus::Worker::<TestWorkerUniqueIdJob>::max_retries(
            &TestWorkerUniqueId,
            &TestWorkerUniqueIdJob { id: 0, _1: 0 },
        );
        assert_eq!(unique_id_retries, 2);

        for (id, expected) in [(1, "test_worker_1"), (12, "test_worker_12")] {
            let job = TestWorkerUniqueIdJob { id, _1: 0 };
            assert_eq!(oxanus::Job::unique_id(&job), Some(expected.to_string()));
        }

        // --- `unique_id` with explicit bindings, including a nested field. ---
        #[derive(Debug, Serialize, Deserialize, Default)]
        struct NestedTask {
            name: String,
        }

        #[derive(Debug, Serialize, Deserialize)]
        struct TestWorkerNestedUniqueIdJob {
            id: i32,
            task: NestedTask,
        }

        #[derive(oxanus::Worker)]
        #[oxanus(unique_id(fmt = "test_worker_{id}_{task}", id = self.id, task = self.task.name))]
        struct TestWorkerNestedUniqueId;

        impl TestWorkerNestedUniqueId {
            async fn process(
                &self,
                _job: &TestWorkerNestedUniqueIdJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), WorkerError> {
                Ok(())
            }
        }

        for (id, name, expected) in [
            (1, "task1", "test_worker_1_task1"),
            (2, "task2", "test_worker_2_task2"),
        ] {
            let job = TestWorkerNestedUniqueIdJob {
                id,
                task: NestedTask {
                    name: name.to_owned(),
                },
            };
            assert_eq!(oxanus::Job::unique_id(&job), Some(expected.to_string()));
        }

        // --- `unique_id`, `retry_delay` and `max_retries` delegated to methods. ---
        #[derive(Debug, Serialize, Deserialize)]
        struct TestWorkerCustomUniqueIdJob {
            id: i32,
            task: NestedTask,
        }

        impl TestWorkerCustomUniqueIdJob {
            fn unique_id(&self) -> Option<String> {
                Some(format!("worker_id_{}_task_{}", self.id, self.task.name))
            }
        }

        #[derive(oxanus::Worker)]
        #[oxanus(unique_id = Self::unique_id)]
        #[oxanus(retry_delay = Self::retry_delay)]
        #[oxanus(max_retries = Self::max_retries)]
        struct TestWorkerCustomUniqueId;

        impl TestWorkerCustomUniqueId {
            async fn process(
                &self,
                _job: &TestWorkerCustomUniqueIdJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), WorkerError> {
                Ok(())
            }

            fn retry_delay(&self, _job: &TestWorkerCustomUniqueIdJob, retries: u32) -> u64 {
                retries as u64 * 2
            }

            fn max_retries(&self, _job: &TestWorkerCustomUniqueIdJob) -> u32 {
                9
            }
        }

        let job1 = TestWorkerCustomUniqueIdJob {
            id: 1,
            task: NestedTask {
                name: "11".to_owned(),
            },
        };
        assert_eq!(
            oxanus::Job::unique_id(&job1),
            Some("worker_id_1_task_11".to_string())
        );

        let job2 = TestWorkerCustomUniqueIdJob {
            id: 2,
            task: NestedTask {
                name: "22".to_owned(),
            },
        };
        assert_eq!(
            oxanus::Job::unique_id(&job2),
            Some("worker_id_2_task_22".to_string())
        );

        let worker = TestWorkerCustomUniqueId;
        for (retries, expected_delay) in [(1, 2), (2, 4)] {
            assert_eq!(
                oxanus::Worker::<TestWorkerCustomUniqueIdJob>::retry_delay(&worker, &job2, retries),
                expected_delay
            );
        }
        assert_eq!(
            oxanus::Worker::<TestWorkerCustomUniqueIdJob>::max_retries(&worker, &job2),
            9
        );
    }

    #[tokio::test]
    async fn test_define_cron_worker_with_macro() {
        // Needed for the `DefaultQueue::to_config()` call below.
        use crate::Queue;

        #[derive(Serialize, oxanus::Queue)]
        struct DefaultQueue;

        #[derive(Debug, Serialize, Deserialize)]
        struct TestCronJob {}

        #[derive(oxanus::Worker)]
        #[oxanus(cron(schedule = "*/1 * * * * *", queue = DefaultQueue))]
        struct TestCronWorker;

        impl TestCronWorker {
            async fn process(
                &self,
                _job: &TestCronJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), WorkerError> {
                Ok(())
            }
        }

        let schedule = <TestCronWorker as oxanus::Worker<TestCronJob>>::cron_schedule();
        assert_eq!(schedule, Some("*/1 * * * * *".to_string()));

        let queue_config = <TestCronWorker as oxanus::Worker<TestCronJob>>::cron_queue_config();
        assert_eq!(queue_config, Some(DefaultQueue::to_config()));

        // `resurrect` was not overridden, so the job keeps the default.
        assert!(<TestCronJob as oxanus::Job>::should_resurrect());
    }

    #[tokio::test]
    async fn test_define_worker_with_resurrect_false() {
        // --- Explicit opt-out. ---
        #[derive(Debug, Serialize, Deserialize)]
        struct NoResurrectJob {}

        #[derive(oxanus::Worker)]
        #[oxanus(resurrect = false)]
        struct NoResurrectWorker;

        impl NoResurrectWorker {
            async fn process(
                &self,
                _job: &NoResurrectJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), WorkerError> {
                Ok(())
            }
        }

        let opted_out = <NoResurrectJob as oxanus::Job>::should_resurrect();
        assert!(!opted_out);

        // --- No attribute: the default stays `true`. ---
        #[derive(Debug, Serialize, Deserialize)]
        struct DefaultResurrectJob {}

        #[derive(oxanus::Worker)]
        struct DefaultResurrectWorker;

        impl DefaultResurrectWorker {
            async fn process(
                &self,
                _job: &DefaultResurrectJob,
                _ctx: &oxanus::JobContext,
            ) -> Result<(), WorkerError> {
                Ok(())
            }
        }

        let default_resurrect = <DefaultResurrectJob as oxanus::Job>::should_resurrect();
        assert!(default_resurrect);
    }
}