simple_job_queue/
job_queue.rs

1use std::{marker::PhantomData, time::Duration};
2
3use async_trait::async_trait;
4use tokio::task::JoinHandle;
5
6use crate::{
7    error::{JobError, JobQueueError},
8    Job,
9};
10
11#[async_trait]
12pub trait JobQueueBackend<T>: Clone {
13    type Context: Send;
14
15    async fn setup(&self) -> Result<(), JobQueueError>;
16    async fn produce(&self, job: Job<T>) -> Result<(), JobQueueError>;
17    async fn consume(&self) -> Result<(Job<T>, Self::Context), JobQueueError>;
18    async fn done(&self, job: Job<T>, ctx: Self::Context);
19    async fn failed(&self, job: Job<T>, ctx: Self::Context);
20}
21
22#[async_trait]
23pub trait Processor<T> {
24    async fn process(&mut self, job: &Job<T>) -> Result<(), JobError>;
25}
26
27#[derive(Clone)]
28pub struct JobQueueOptions {
29    consumption_failure_backoff_interval: Duration,
30}
31
32impl JobQueueOptions {
33    pub fn consumption_failure_backoff_interval(mut self, d: Duration) -> Self {
34        self.consumption_failure_backoff_interval = d;
35        self
36    }
37}
38
39impl Default for JobQueueOptions {
40    fn default() -> Self {
41        Self {
42            consumption_failure_backoff_interval: Duration::from_secs(5),
43        }
44    }
45}
46
47struct JobQueueWorker<T, B, P>
48where
49    B: JobQueueBackend<T>,
50    P: Processor<T>,
51{
52    backend: B,
53    processor: P,
54    options: JobQueueOptions,
55    _t: PhantomData<T>,
56}
57
58impl<T, B, P> JobQueueWorker<T, B, P>
59where
60    B: JobQueueBackend<T>,
61    P: Processor<T>,
62{
63    async fn start(&mut self) -> () {
64        loop {
65            match self.backend.consume().await {
66                Ok((job, ctx)) => match self.processor.process(&job).await {
67                    Ok(_) => self.backend.done(job, ctx).await,
68                    Err(_) => self.backend.failed(job, ctx).await,
69                },
70                Err(_) => {
71                    tokio::time::sleep(self.options.consumption_failure_backoff_interval).await;
72                }
73            };
74        }
75    }
76}
77
78pub struct JobQueue<T, B>
79where
80    B: JobQueueBackend<T>,
81{
82    backend: B,
83    worker_handle: Option<JoinHandle<()>>,
84    options: JobQueueOptions,
85    _t: PhantomData<T>,
86}
87
88impl<T, B> JobQueue<T, B>
89where
90    B: JobQueueBackend<T>,
91{
92    pub fn new(backend: B, options: JobQueueOptions) -> Self {
93        Self {
94            backend,
95            worker_handle: None,
96            options,
97            _t: PhantomData,
98        }
99    }
100
101    pub async fn submit(&self, job: Job<T>) -> Result<(), JobQueueError> {
102        self.backend.produce(job).await?;
103
104        Ok(())
105    }
106}
107
108impl<T, B> JobQueue<T, B>
109where
110    T: Send + Sync + 'static,
111    B: JobQueueBackend<T> + Send + Sync + 'static,
112{
113    pub async fn start<P>(&mut self, processor: P) -> Result<(), JobQueueError>
114    where
115        P: Processor<T> + Send + Sync + 'static,
116    {
117        self.backend.setup().await?;
118
119        let mut worker = JobQueueWorker {
120            backend: self.backend.clone(),
121            processor,
122            options: self.options.clone(),
123            _t: PhantomData,
124        };
125
126        let handle = tokio::spawn(async move { worker.start().await });
127        self.worker_handle = Some(handle);
128
129        Ok(())
130    }
131}
132
133impl<T, B> Drop for JobQueue<T, B>
134where
135    B: JobQueueBackend<T>,
136{
137    fn drop(&mut self) {
138        if let Some(handle) = self.worker_handle.take() {
139            handle.abort();
140        }
141    }
142}