// mq/worker.rs

use std::{future::ready, pin::pin, time::Duration};

use crate::{Consumer, Context, Error, JobProcessor};
use futures::{stream, FutureExt, StreamExt, TryStreamExt};
use serde_json::json;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};

9pub struct Worker {
10    consumer: Consumer,
11    cancellation_token: CancellationToken,
12    concurrency: Option<usize>,
13    poll_interval: Option<u64>,
14}
15
16impl Worker {
17    pub fn new(consumer: Consumer) -> Self {
18        Self {
19            cancellation_token: CancellationToken::new(),
20            consumer,
21            concurrency: None,
22            poll_interval: Some(3000),
23        }
24    }
25
26    pub fn concurrency(&self) -> Option<usize> {
27        self.concurrency
28    }
29
30    pub fn with_concurrency(mut self, concurrency: Option<usize>) -> Self {
31        self.concurrency = concurrency;
32        self
33    }
34
35    pub fn poll_interval(&self) -> &Option<u64> {
36        &self.poll_interval
37    }
38
39    pub fn with_poll_interval(mut self, poll_interval: Option<u64>) -> Self {
40        self.poll_interval = poll_interval;
41        self
42    }
43
44    pub fn cancellation_token(&self) -> &CancellationToken {
45        &self.cancellation_token
46    }
47
48    pub fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
49        self.cancellation_token = cancellation_token;
50        self
51    }
52
53    pub async fn run(self, job_processor: impl JobProcessor) -> Result<(), Error> {
54        let interval =
55            tokio::time::interval(Duration::from_millis(self.poll_interval.unwrap_or(3000)));
56
57        let queues: Vec<&str> = self.consumer.handlers().keys().map(|k| &**k).collect();
58
59        let ct = pin!(self.cancellation_token.cancelled().fuse());
60
61        let job_stream = stream::unfold((interval, ct), |mut f| async {
62            tokio::select! {
63                 _ = (&mut f.0).tick() => Some((StreamSource::Polling, f)),
64                _ = &mut f.1 => None,
65            }
66        });
67
68        job_stream
69            .then(|source| self.process_next_job(source, &job_processor, &queues))
70            .try_for_each_concurrent(self.concurrency, |_| ready(Ok(())))
71            .await?;
72
73        Ok(())
74    }
75
76    async fn process_next_job<T: JobProcessor>(
77        &self,
78        _source: StreamSource,
79        job_processor: &T,
80        queues: &[&str],
81    ) -> Result<(), Error> {
82        loop {
83            match job_processor.poll_next_job(queues).await? {
84                Some(job) => {
85                    // TODO: Probably want to filter via queues+kind instead of just queue. But for now
86                    // using queues so it is compatible with other backends.
87                    let handler = self
88                        .consumer
89                        .handlers()
90                        .get(job.queue())
91                        .unwrap()
92                        .get(job.kind());
93
94                    match handler {
95                        Some(handler) => {
96                            let id = job.id().to_string();
97                            let ctx = Context::new(job, self.cancellation_token.clone());
98
99                            match handler.handle(ctx).await {
100                                Ok(result) => match result {
101                                    crate::JobResult::CompleteWithSuccess => {
102                                        job_processor
103                                            .complete_job_with_success(
104                                                handler.queue(),
105                                                handler.kind(),
106                                                &id,
107                                            )
108                                            .await?;
109                                    }
110                                    crate::JobResult::CompleteWithCancelled(message) => {
111                                        job_processor
112                                            .complete_job_with_cancelled(
113                                                handler.queue(),
114                                                handler.kind(),
115                                                &id,
116                                                message,
117                                            )
118                                            .await?;
119                                    }
120                                },
121                                Err(e) => {
122                                    error!(
123                                        "Job queue={}, kind={}, id={} failed with {:?}",
124                                        handler.queue(),
125                                        handler.kind(),
126                                        &id,
127                                        &e
128                                    );
129                                    job_processor
130                                        .fail_job(
131                                            handler.queue(),
132                                            handler.kind(),
133                                            &id,
134                                            json!({ "error": e.to_string() }),
135                                        )
136                                        .await?;
137                                }
138                            }
139                        }
140                        None => {
141                            warn!(
142                                "handler not registered. queue={} kind={}",
143                                job.queue(),
144                                job.kind()
145                            );
146                        }
147                    }
148                }
149                None => {
150                    debug!("No new jobs found");
151                    break;
152                }
153            }
154        }
155
156        Ok(())
157    }
158}
159
/// What caused the worker to look for jobs.
#[derive(Debug)]
enum StreamSource {
    /// A tick of the polling interval.
    Polling,
}