1use super::Result;
2use crate::{
3 periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware,
4 StatsPublisher, UnitOfWork, Worker, WorkerRef,
5};
6use std::collections::{BTreeMap, VecDeque};
7use std::sync::Arc;
8use tokio::select;
9use tokio::task::JoinSet;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info};
12
/// Outcome of a single polling attempt made by
/// [`Processor::process_one_tick_once`].
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WorkFetcher {
    /// The fetch returned no job, so nothing was processed.
    NoWorkFound,
    /// A job was fetched and the middleware chain call returned without error.
    Done,
}
18
/// Pulls jobs from Redis queues and dispatches them to registered workers.
///
/// The type is `Clone`; [`Processor::run`] hands a clone to every worker task
/// it spawns.
#[derive(Clone)]
pub struct Processor {
    /// Connection pool used for every Redis operation.
    redis: RedisPool,
    /// Redis keys polled by `fetch`; `new` stores each name as `queue:<name>`.
    queues: VecDeque<String>,
    /// Queue names exactly as passed to `new`; handed to the stats publisher
    /// in `run`.
    human_readable_queues: Vec<String>,
    /// Periodic jobs recorded by `register_periodic`.
    periodic_jobs: Vec<PeriodicJob>,
    /// Registered workers, keyed by the worker's `class_name()`.
    workers: BTreeMap<String, Arc<WorkerRef>>,
    /// Middleware chain every fetched job is dispatched through.
    chain: Chain,
    /// Counter shared with the chain (see `new`) and the stats publisher.
    busy_jobs: Counter,
    /// Token checked by the loops in `process_one` and `run` to know when to
    /// stop.
    cancellation_token: CancellationToken,
    /// Worker counts and queue balancing options.
    config: ProcessorConfig,
}
31
/// Tuning options for a [`Processor`].
///
/// The struct is `#[non_exhaustive]`; start from [`ProcessorConfig::default`]
/// and chain the builder methods to customise it.
#[derive(Clone)]
#[non_exhaustive]
pub struct ProcessorConfig {
    /// Number of worker tasks [`Processor::run`] spawns that poll every queue
    /// the processor was created with. Defaults to `num_cpus::get()`.
    pub num_workers: usize,

    /// How the processor's queue list is reordered before each fetch.
    pub balance_strategy: BalanceStrategy,

    /// Per-queue settings, keyed by queue name. For each entry,
    /// [`Processor::run`] spawns additional workers that poll only that queue.
    pub queue_configs: BTreeMap<String, QueueConfig>,
}
62
/// Strategy for reordering the processor's queue list before each fetch.
///
/// Derives `Debug`, `PartialEq` and `Eq` so the strategy can be logged and
/// compared, matching the derives on `WorkFetcher` in this module.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BalanceStrategy {
    /// Rotate the queue list one position to the right before every fetch, so
    /// a different queue is listed first each time.
    #[default]
    RoundRobin,
    /// Leave the queue list in its original order.
    None,
}
75
/// Settings for the workers dedicated to a single queue.
///
/// Derives `Debug`, `PartialEq` and `Eq` so configurations can be logged and
/// compared, matching the derives on `WorkFetcher` in this module.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct QueueConfig {
    /// Number of dedicated workers to spawn for the queue. The derived
    /// `Default` sets this to `0`.
    pub num_workers: usize,
}
84
85impl ProcessorConfig {
86 #[must_use]
87 pub fn num_workers(mut self, num_workers: usize) -> Self {
88 self.num_workers = num_workers;
89 self
90 }
91
92 #[must_use]
93 pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self {
94 self.balance_strategy = balance_strategy;
95 self
96 }
97
98 #[must_use]
99 pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
100 self.queue_configs.insert(queue, config);
101 self
102 }
103}
104
105impl Default for ProcessorConfig {
106 fn default() -> Self {
107 Self {
108 num_workers: num_cpus::get(),
109 balance_strategy: Default::default(),
110 queue_configs: Default::default(),
111 }
112 }
113}
114
115impl QueueConfig {
116 #[must_use]
117 pub fn num_workers(mut self, num_workers: usize) -> Self {
118 self.num_workers = num_workers;
119 self
120 }
121}
122
123impl Processor {
124 #[must_use]
125 pub fn new(redis: RedisPool, queues: Vec<String>) -> Self {
126 let busy_jobs = Counter::new(0);
127
128 Self {
129 chain: Chain::new_with_stats(busy_jobs.clone()),
130 workers: BTreeMap::new(),
131 periodic_jobs: vec![],
132 busy_jobs,
133
134 redis,
135 queues: queues
136 .iter()
137 .map(|queue| format!("queue:{queue}"))
138 .collect(),
139 human_readable_queues: queues,
140 cancellation_token: CancellationToken::new(),
141 config: Default::default(),
142 }
143 }
144
145 pub fn with_config(mut self, config: ProcessorConfig) -> Self {
146 self.config = config;
147 self
148 }
149
150 pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>> {
151 self.run_balance_strategy();
152
153 let response: Option<(String, String)> = self
154 .redis
155 .get()
156 .await?
157 .brpop(self.queues.clone().into(), 2)
158 .await?;
159
160 if let Some((queue, job_raw)) = response {
161 let job: Job = serde_json::from_str(&job_raw)?;
162 return Ok(Some(UnitOfWork { queue, job }));
163 }
164
165 Ok(None)
166 }
167
168 fn run_balance_strategy(&mut self) {
170 if self.queues.is_empty() {
171 return;
172 }
173
174 match self.config.balance_strategy {
175 BalanceStrategy::RoundRobin => self.queues.rotate_right(1),
176 BalanceStrategy::None => {}
177 }
178 }
179
180 pub async fn process_one(&mut self) -> Result<()> {
181 loop {
182 if self.cancellation_token.is_cancelled() {
183 return Ok(());
184 }
185
186 if let WorkFetcher::NoWorkFound = self.process_one_tick_once().await? {
187 continue;
188 }
189
190 return Ok(());
191 }
192 }
193
194 pub async fn process_one_tick_once(&mut self) -> Result<WorkFetcher> {
195 let work = self.fetch().await?;
196
197 if work.is_none() {
198 tokio::task::yield_now().await;
203 return Ok(WorkFetcher::NoWorkFound);
204 }
205 let work = work.expect("polled and found some work");
206
207 let started = std::time::Instant::now();
208
209 info!({
210 "status" = "start",
211 "class" = &work.job.class,
212 "queue" = &work.job.queue,
213 "jid" = &work.job.jid
214 }, "sidekiq");
215
216 let worker = if let Some(worker) = self.workers.get(&work.job.class) {
217 worker.clone()
218 } else {
219 Arc::new(WorkerRef::not_found(work.job.class.clone()))
220 };
221
222 self.chain
223 .call(&work.job, worker, self.redis.clone())
224 .await?;
225
226 info!({
230 "elapsed" = format!("{:?}", started.elapsed()),
231 "status" = "done",
232 "class" = &work.job.class,
233 "queue" = &work.job.queue,
234 "jid" = &work.job.jid}, "sidekiq");
235
236 Ok(WorkFetcher::Done)
237 }
238
239 pub fn register<
240 Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
241 W: Worker<Args> + 'static,
242 >(
243 &mut self,
244 worker: W,
245 ) {
246 self.workers
247 .insert(W::class_name(), Arc::new(WorkerRef::wrap(Arc::new(worker))));
248 }
249
250 pub fn get_cancellation_token(&self) -> CancellationToken {
251 self.cancellation_token.clone()
252 }
253
254 pub(crate) async fn register_periodic(&mut self, periodic_job: PeriodicJob) -> Result<()> {
255 self.periodic_jobs.push(periodic_job.clone());
256
257 let mut conn = self.redis.get().await?;
258 periodic_job.insert(&mut conn).await?;
259
260 info!({
261 "args" = &periodic_job.args,
262 "class" = &periodic_job.class,
263 "queue" = &periodic_job.queue,
264 "name" = &periodic_job.name,
265 "cron" = &periodic_job.cron,
266 },"Inserting periodic job");
267
268 Ok(())
269 }
270
271 pub async fn run(self) {
274 let mut join_set: JoinSet<()> = JoinSet::new();
275
276 let spawn_worker = |mut processor: Processor,
279 cancellation_token: CancellationToken,
280 num: usize,
281 dedicated_queue_name: Option<String>| {
282 async move {
283 loop {
284 if let Err(err) = processor.process_one().await {
285 error!("Error leaked out the bottom: {:?}", err);
286 }
287
288 if cancellation_token.is_cancelled() {
289 break;
290 }
291 }
292
293 let dedicated_queue_str = dedicated_queue_name
294 .map(|name| format!(" dedicated to queue '{name}'"))
295 .unwrap_or_default();
296 debug!("Broke out of loop for worker {num}{dedicated_queue_str}");
297 }
298 };
299
300 for i in 0..self.config.num_workers {
302 join_set.spawn(spawn_worker(
303 self.clone(),
304 self.cancellation_token.clone(),
305 i,
306 None,
307 ));
308 }
309
310 for (queue, config) in &self.config.queue_configs {
312 for i in 0..config.num_workers {
313 join_set.spawn({
314 let mut processor = self.clone();
315 processor.queues = [queue.clone()].into();
316 spawn_worker(
317 processor,
318 self.cancellation_token.clone(),
319 i,
320 Some(queue.clone()),
321 )
322 });
323 }
324 }
325
326 join_set.spawn({
328 let redis = self.redis.clone();
329 let queues = self.human_readable_queues.clone();
330 let busy_jobs = self.busy_jobs.clone();
331 let cancellation_token = self.cancellation_token.clone();
332 async move {
333 let hostname = if let Some(host) = gethostname::gethostname().to_str() {
334 host.to_string()
335 } else {
336 "UNKNOWN_HOSTNAME".to_string()
337 };
338
339 let stats_publisher =
340 StatsPublisher::new(hostname, queues, busy_jobs, self.config.num_workers);
341
342 loop {
343 select! {
345 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
346 _ = cancellation_token.cancelled() => {
347 break;
348 }
349 }
350
351 if let Err(err) = stats_publisher.publish_stats(redis.clone()).await {
352 error!("Error publishing processor stats: {:?}", err);
353 }
354 }
355
356 debug!("Broke out of loop web metrics");
357 }
358 });
359
360 join_set.spawn({
362 let redis = self.redis.clone();
363 let cancellation_token = self.cancellation_token.clone();
364 async move {
365 let sched = Scheduled::new(redis);
366 let sorted_sets = vec!["retry".to_string(), "schedule".to_string()];
367
368 loop {
369 select! {
371 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
372 _ = cancellation_token.cancelled() => {
373 break;
374 }
375 }
376
377 if let Err(err) = sched.enqueue_jobs(chrono::Utc::now(), &sorted_sets).await {
378 error!("Error in scheduled poller routine: {:?}", err);
379 }
380 }
381
382 debug!("Broke out of loop for retry and scheduled");
383 }
384 });
385
386 join_set.spawn({
388 let redis = self.redis.clone();
389 let cancellation_token = self.cancellation_token.clone();
390 async move {
391 let sched = Scheduled::new(redis);
392
393 loop {
394 select! {
396 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {}
397 _ = cancellation_token.cancelled() => {
398 break;
399 }
400 }
401
402 if let Err(err) = sched.enqueue_periodic_jobs(chrono::Utc::now()).await {
403 error!("Error in periodic job poller routine: {}", err);
404 }
405 }
406
407 debug!("Broke out of loop for periodic");
408 }
409 });
410
411 while let Some(result) = join_set.join_next().await {
412 if let Err(err) = result {
413 error!("Processor had a spawned task return an error: {}", err);
414 }
415 }
416 }
417
418 pub async fn using<M>(&mut self, middleware: M)
419 where
420 M: ServerMiddleware + Send + Sync + 'static,
421 {
422 self.chain.using(Box::new(middleware)).await;
423 }
424}