simple_job_queue/
job_queue.rs1use std::{marker::PhantomData, time::Duration};
2
3use async_trait::async_trait;
4use tokio::task::JoinHandle;
5
6use crate::{
7 error::{JobError, JobQueueError},
8 Job,
9};
10
11#[async_trait]
12pub trait JobQueueBackend<T>: Clone {
13 type Context: Send;
14
15 async fn setup(&self) -> Result<(), JobQueueError>;
16 async fn produce(&self, job: Job<T>) -> Result<(), JobQueueError>;
17 async fn consume(&self) -> Result<(Job<T>, Self::Context), JobQueueError>;
18 async fn done(&self, job: Job<T>, ctx: Self::Context);
19 async fn failed(&self, job: Job<T>, ctx: Self::Context);
20}
21
22#[async_trait]
23pub trait Processor<T> {
24 async fn process(&mut self, job: &Job<T>) -> Result<(), JobError>;
25}
26
/// Tunable settings shared by a [`JobQueue`] and the worker it spawns.
///
/// Obtain a value through [`Default::default`] and adjust it with the
/// builder-style setter of the same name as the field.
///
/// `Debug` is derived so the options can be logged or embedded in other
/// `Debug` types.
#[derive(Clone, Debug)]
pub struct JobQueueOptions {
    /// How long the worker sleeps after the backend's `consume` call returns
    /// an error, before it polls again.
    consumption_failure_backoff_interval: Duration,
}
31
32impl JobQueueOptions {
33 pub fn consumption_failure_backoff_interval(mut self, d: Duration) -> Self {
34 self.consumption_failure_backoff_interval = d;
35 self
36 }
37}
38
39impl Default for JobQueueOptions {
40 fn default() -> Self {
41 Self {
42 consumption_failure_backoff_interval: Duration::from_secs(5),
43 }
44 }
45}
46
47struct JobQueueWorker<T, B, P>
48where
49 B: JobQueueBackend<T>,
50 P: Processor<T>,
51{
52 backend: B,
53 processor: P,
54 options: JobQueueOptions,
55 _t: PhantomData<T>,
56}
57
58impl<T, B, P> JobQueueWorker<T, B, P>
59where
60 B: JobQueueBackend<T>,
61 P: Processor<T>,
62{
63 async fn start(&mut self) -> () {
64 loop {
65 match self.backend.consume().await {
66 Ok((job, ctx)) => match self.processor.process(&job).await {
67 Ok(_) => self.backend.done(job, ctx).await,
68 Err(_) => self.backend.failed(job, ctx).await,
69 },
70 Err(_) => {
71 tokio::time::sleep(self.options.consumption_failure_backoff_interval).await;
72 }
73 };
74 }
75 }
76}
77
78pub struct JobQueue<T, B>
79where
80 B: JobQueueBackend<T>,
81{
82 backend: B,
83 worker_handle: Option<JoinHandle<()>>,
84 options: JobQueueOptions,
85 _t: PhantomData<T>,
86}
87
88impl<T, B> JobQueue<T, B>
89where
90 B: JobQueueBackend<T>,
91{
92 pub fn new(backend: B, options: JobQueueOptions) -> Self {
93 Self {
94 backend,
95 worker_handle: None,
96 options,
97 _t: PhantomData,
98 }
99 }
100
101 pub async fn submit(&self, job: Job<T>) -> Result<(), JobQueueError> {
102 self.backend.produce(job).await?;
103
104 Ok(())
105 }
106}
107
108impl<T, B> JobQueue<T, B>
109where
110 T: Send + Sync + 'static,
111 B: JobQueueBackend<T> + Send + Sync + 'static,
112{
113 pub async fn start<P>(&mut self, processor: P) -> Result<(), JobQueueError>
114 where
115 P: Processor<T> + Send + Sync + 'static,
116 {
117 self.backend.setup().await?;
118
119 let mut worker = JobQueueWorker {
120 backend: self.backend.clone(),
121 processor,
122 options: self.options.clone(),
123 _t: PhantomData,
124 };
125
126 let handle = tokio::spawn(async move { worker.start().await });
127 self.worker_handle = Some(handle);
128
129 Ok(())
130 }
131}
132
133impl<T, B> Drop for JobQueue<T, B>
134where
135 B: JobQueueBackend<T>,
136{
137 fn drop(&mut self) {
138 if let Some(handle) = self.worker_handle.take() {
139 handle.abort();
140 }
141 }
142}