1use std::{future::ready, pin::pin, time::Duration};
2
3use crate::{Consumer, Context, Error, JobProcessor};
4use futures::{stream, FutureExt, StreamExt, TryStreamExt};
5use serde_json::json;
6use tokio_util::sync::CancellationToken;
7use tracing::{debug, error, warn};
8
/// Polls a job processor on a fixed interval and dispatches each fetched job
/// to the matching handler registered on its [`Consumer`].
pub struct Worker {
    // Holds the per-queue, per-kind job handlers (see `Consumer::handlers`).
    consumer: Consumer,
    // Stops the polling loop when cancelled; a clone is also handed to every
    // job's `Context` so handlers can observe shutdown.
    cancellation_token: CancellationToken,
    // Upper bound on concurrent processing cycles in `run`; `None` = unlimited.
    concurrency: Option<usize>,
    // Poll interval in milliseconds; `new` defaults this to `Some(3000)`.
    poll_interval: Option<u64>,
}
15
16impl Worker {
17 pub fn new(consumer: Consumer) -> Self {
18 Self {
19 cancellation_token: CancellationToken::new(),
20 consumer,
21 concurrency: None,
22 poll_interval: Some(3000),
23 }
24 }
25
26 pub fn concurrency(&self) -> Option<usize> {
27 self.concurrency
28 }
29
30 pub fn with_concurrency(mut self, concurrency: Option<usize>) -> Self {
31 self.concurrency = concurrency;
32 self
33 }
34
35 pub fn poll_interval(&self) -> &Option<u64> {
36 &self.poll_interval
37 }
38
39 pub fn with_poll_interval(mut self, poll_interval: Option<u64>) -> Self {
40 self.poll_interval = poll_interval;
41 self
42 }
43
44 pub fn cancellation_token(&self) -> &CancellationToken {
45 &self.cancellation_token
46 }
47
48 pub fn with_cancellation_token(mut self, cancellation_token: CancellationToken) -> Self {
49 self.cancellation_token = cancellation_token;
50 self
51 }
52
53 pub async fn run(self, job_processor: impl JobProcessor) -> Result<(), Error> {
54 let interval =
55 tokio::time::interval(Duration::from_millis(self.poll_interval.unwrap_or(3000)));
56
57 let queues: Vec<&str> = self.consumer.handlers().keys().map(|k| &**k).collect();
58
59 let ct = pin!(self.cancellation_token.cancelled().fuse());
60
61 let job_stream = stream::unfold((interval, ct), |mut f| async {
62 tokio::select! {
63 _ = (&mut f.0).tick() => Some((StreamSource::Polling, f)),
64 _ = &mut f.1 => None,
65 }
66 });
67
68 job_stream
69 .then(|source| self.process_next_job(source, &job_processor, &queues))
70 .try_for_each_concurrent(self.concurrency, |_| ready(Ok(())))
71 .await?;
72
73 Ok(())
74 }
75
76 async fn process_next_job<T: JobProcessor>(
77 &self,
78 _source: StreamSource,
79 job_processor: &T,
80 queues: &[&str],
81 ) -> Result<(), Error> {
82 loop {
83 match job_processor.poll_next_job(queues).await? {
84 Some(job) => {
85 let handler = self
88 .consumer
89 .handlers()
90 .get(job.queue())
91 .unwrap()
92 .get(job.kind());
93
94 match handler {
95 Some(handler) => {
96 let id = job.id().to_string();
97 let ctx = Context::new(job, self.cancellation_token.clone());
98
99 match handler.handle(ctx).await {
100 Ok(result) => match result {
101 crate::JobResult::CompleteWithSuccess => {
102 job_processor
103 .complete_job_with_success(
104 handler.queue(),
105 handler.kind(),
106 &id,
107 )
108 .await?;
109 }
110 crate::JobResult::CompleteWithCancelled(message) => {
111 job_processor
112 .complete_job_with_cancelled(
113 handler.queue(),
114 handler.kind(),
115 &id,
116 message,
117 )
118 .await?;
119 }
120 },
121 Err(e) => {
122 error!(
123 "Job queue={}, kind={}, id={} failed with {:?}",
124 handler.queue(),
125 handler.kind(),
126 &id,
127 &e
128 );
129 job_processor
130 .fail_job(
131 handler.queue(),
132 handler.kind(),
133 &id,
134 json!({ "error": e.to_string() }),
135 )
136 .await?;
137 }
138 }
139 }
140 None => {
141 warn!(
142 "handler not registered. queue={} kind={}",
143 job.queue(),
144 job.kind()
145 );
146 }
147 }
148 }
149 None => {
150 debug!("No new jobs found");
151 break;
152 }
153 }
154 }
155
156 Ok(())
157 }
158}
159
// Marks where a stream item in `Worker::run` originated. Currently only
// interval-driven polling produces items; the enum leaves room for other
// trigger sources (e.g. push notifications) without changing `run`'s shape.
#[derive(Debug)]
enum StreamSource {
    Polling,
}