1use super::Result;
2use crate::stats::generate_tid;
3use crate::{
4 periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware,
5 StatsPublisher, UnitOfWork, Worker, WorkerRef,
6};
7use std::collections::{BTreeMap, VecDeque};
8use std::sync::Arc;
9use tokio::select;
10use tokio::task::JoinSet;
11use tokio_util::sync::CancellationToken;
12use tracing::{debug, error, info};
13
/// Outcome of a single polling tick performed by `Processor::process_one_tick_once`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WorkFetcher {
    /// The fetch returned no unit of work, so nothing was executed on this tick.
    NoWorkFound,
    /// A unit of work was fetched and run through the middleware chain without
    /// an error being returned.
    Done,
}
19
/// Polls Redis queues for jobs and runs them through the registered workers.
///
/// The struct is `Clone`: `Processor::run` clones it once per spawned worker task.
#[derive(Clone)]
pub struct Processor {
    // Pool used for every Redis interaction (fetching work, work-state bookkeeping, pollers).
    redis: RedisPool,
    // Redis list keys polled by `fetch`; built in `new` as `queue:{name}`.
    queues: VecDeque<String>,
    // Queue names exactly as passed to `new`; handed to the stats publisher in `run`.
    human_readable_queues: Vec<String>,
    // Periodic jobs recorded by `register_periodic`.
    periodic_jobs: Vec<PeriodicJob>,
    // Registered workers, keyed by `W::class_name()` (see `register`).
    workers: BTreeMap<String, Arc<WorkerRef>>,
    // Middleware chain every job is dispatched through.
    chain: Chain,
    // Counter handed both to the chain (`Chain::new_with_stats`) and to the stats publisher.
    busy_jobs: Counter,
    // Cancelling this token makes the worker and poller loops exit.
    cancellation_token: CancellationToken,
    // Worker counts, balance strategy, and per-queue settings.
    config: ProcessorConfig,
    // Stats-publisher identity; `None` until `run` assigns it to a worker clone.
    identity: Option<String>,
    // Per-worker id from `generate_tid`; `None` until `run` assigns it to a worker clone.
    // Together with `identity` it addresses this worker's entry in the `{identity}:work` hash.
    tid: Option<String>,
}
38
/// Tunable settings for a `Processor`, applied via `Processor::with_config`.
#[derive(Clone)]
#[non_exhaustive]
pub struct ProcessorConfig {
    /// Number of general worker tasks spawned by `Processor::run`. Each of them
    /// polls every queue passed to `Processor::new`. Defaults to `num_cpus::get()`.
    pub num_workers: usize,

    /// How the queue list is reordered before each fetch.
    /// Defaults to `BalanceStrategy::RoundRobin`.
    pub balance_strategy: BalanceStrategy,

    /// Per-queue settings, keyed by queue name. For each entry `Processor::run`
    /// spawns `QueueConfig::num_workers` additional workers whose queue list
    /// contains only that queue.
    pub queue_configs: BTreeMap<String, QueueConfig>,
}
69
/// Strategy used to reorder a processor's queue list before each fetch.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Default` and `Clone`
/// so the value can be logged and compared (for example in tests and in
/// `Debug` output of types that embed it).
#[derive(Default, Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BalanceStrategy {
    /// Rotate the queue list by one position before every fetch so that a
    /// different queue is listed first each time. This is the default.
    #[default]
    RoundRobin,
    /// Leave the queue list in the order it was given.
    None,
}
82
/// Settings that apply to a single queue.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Default` and `Clone`
/// so the value can be logged and compared.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct QueueConfig {
    /// Number of worker tasks dedicated exclusively to this queue.
    /// Defaults to `0`, i.e. no dedicated workers.
    pub num_workers: usize,
}
91
92impl ProcessorConfig {
93 #[must_use]
94 pub fn num_workers(mut self, num_workers: usize) -> Self {
95 self.num_workers = num_workers;
96 self
97 }
98
99 #[must_use]
100 pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self {
101 self.balance_strategy = balance_strategy;
102 self
103 }
104
105 #[must_use]
106 pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
107 self.queue_configs.insert(queue, config);
108 self
109 }
110}
111
112impl Default for ProcessorConfig {
113 fn default() -> Self {
114 Self {
115 num_workers: num_cpus::get(),
116 balance_strategy: Default::default(),
117 queue_configs: Default::default(),
118 }
119 }
120}
121
122impl QueueConfig {
123 #[must_use]
124 pub fn num_workers(mut self, num_workers: usize) -> Self {
125 self.num_workers = num_workers;
126 self
127 }
128}
129
130impl Processor {
131 #[must_use]
132 pub fn new(redis: RedisPool, queues: Vec<String>) -> Self {
133 let busy_jobs = Counter::new(0);
134
135 Self {
136 chain: Chain::new_with_stats(busy_jobs.clone()),
137 workers: BTreeMap::new(),
138 periodic_jobs: vec![],
139 busy_jobs,
140
141 redis,
142 queues: queues
143 .iter()
144 .map(|queue| format!("queue:{queue}"))
145 .collect(),
146 human_readable_queues: queues,
147 cancellation_token: CancellationToken::new(),
148 config: Default::default(),
149 identity: None,
150 tid: None,
151 }
152 }
153
154 pub fn with_config(mut self, config: ProcessorConfig) -> Self {
155 self.config = config;
156 self
157 }
158
159 pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>> {
160 self.run_balance_strategy();
161
162 let response: Option<(String, String)> = self
163 .redis
164 .get()
165 .await?
166 .brpop(self.queues.clone().into(), 2)
167 .await?;
168
169 if let Some((queue, job_raw)) = response {
170 let job: Job = serde_json::from_str(&job_raw)?;
171 return Ok(Some(UnitOfWork { queue, job }));
172 }
173
174 Ok(None)
175 }
176
177 fn run_balance_strategy(&mut self) {
179 if self.queues.is_empty() {
180 return;
181 }
182
183 match self.config.balance_strategy {
184 BalanceStrategy::RoundRobin => self.queues.rotate_right(1),
185 BalanceStrategy::None => {}
186 }
187 }
188
189 pub async fn process_one(&mut self) -> Result<()> {
190 loop {
191 if self.cancellation_token.is_cancelled() {
192 return Ok(());
193 }
194
195 if let WorkFetcher::NoWorkFound = self.process_one_tick_once().await? {
196 continue;
197 }
198
199 return Ok(());
200 }
201 }
202
203 pub async fn process_one_tick_once(&mut self) -> Result<WorkFetcher> {
204 let work = self.fetch().await?;
205
206 if work.is_none() {
207 tokio::task::yield_now().await;
212 return Ok(WorkFetcher::NoWorkFound);
213 }
214 let work = work.expect("polled and found some work");
215
216 let started = std::time::Instant::now();
217
218 info!({
219 "status" = "start",
220 "class" = &work.job.class,
221 "queue" = &work.job.queue,
222 "jid" = &work.job.jid
223 }, "sidekiq");
224
225 let worker = if let Some(worker) = self.workers.get(&work.job.class) {
226 worker.clone()
227 } else {
228 Arc::new(WorkerRef::not_found(work.job.class.clone()))
229 };
230
231 self.set_work(&work).await;
234 let result = self
235 .chain
236 .call(&work.job, worker, self.redis.clone())
237 .await;
238 self.clear_work().await;
239 result?;
240
241 info!({
245 "elapsed" = format!("{:?}", started.elapsed()),
246 "status" = "done",
247 "class" = &work.job.class,
248 "queue" = &work.job.queue,
249 "jid" = &work.job.jid}, "sidekiq");
250
251 Ok(WorkFetcher::Done)
252 }
253
254 async fn set_work(&self, work: &UnitOfWork) {
259 let (Some(identity), Some(tid)) = (self.identity.as_deref(), self.tid.as_deref()) else {
260 return;
261 };
262
263 let result: Result<()> = async {
264 let key = format!("{identity}:work");
265 let mut conn = self.redis.get().await?;
266 conn.hset(key.clone(), tid.to_string(), work_record(&work.job)?)
267 .await?;
268 conn.expire(key, 60).await?;
269 Ok(())
270 }
271 .await;
272
273 if let Err(err) = result {
274 error!("Error recording sidekiq work state: {:?}", err);
275 }
276 }
277
278 async fn clear_work(&self) {
280 let (Some(identity), Some(tid)) = (self.identity.as_deref(), self.tid.as_deref()) else {
281 return;
282 };
283
284 let result: Result<()> = async {
285 let mut conn = self.redis.get().await?;
286 conn.hdel(format!("{identity}:work"), tid.to_string())
287 .await?;
288 Ok(())
289 }
290 .await;
291
292 if let Err(err) = result {
293 error!("Error clearing sidekiq work state: {:?}", err);
294 }
295 }
296
297 pub fn register<
298 Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
299 W: Worker<Args> + 'static,
300 >(
301 &mut self,
302 worker: W,
303 ) {
304 self.workers
305 .insert(W::class_name(), Arc::new(WorkerRef::wrap(Arc::new(worker))));
306 }
307
308 pub fn get_cancellation_token(&self) -> CancellationToken {
309 self.cancellation_token.clone()
310 }
311
312 pub(crate) async fn register_periodic(&mut self, periodic_job: PeriodicJob) -> Result<()> {
313 self.periodic_jobs.push(periodic_job.clone());
314
315 let mut conn = self.redis.get().await?;
316 periodic_job.insert(&mut conn).await?;
317
318 info!({
319 "args" = &periodic_job.args,
320 "class" = &periodic_job.class,
321 "queue" = &periodic_job.queue,
322 "name" = &periodic_job.name,
323 "cron" = &periodic_job.cron,
324 },"Inserting periodic job");
325
326 Ok(())
327 }
328
329 pub async fn run(self) {
332 let mut join_set: JoinSet<()> = JoinSet::new();
333
334 let hostname = if let Some(host) = gethostname::gethostname().to_str() {
340 host.to_string()
341 } else {
342 "UNKNOWN_HOSTNAME".to_string()
343 };
344 let stats_publisher = StatsPublisher::new(
345 hostname,
346 self.human_readable_queues.clone(),
347 self.busy_jobs.clone(),
348 self.config.num_workers,
349 );
350 let identity = stats_publisher.identity().to_string();
351
352 let spawn_worker = |mut processor: Processor,
355 cancellation_token: CancellationToken,
356 num: usize,
357 dedicated_queue_name: Option<String>| {
358 async move {
359 loop {
360 if let Err(err) = processor.process_one().await {
361 error!("Error leaked out the bottom: {:?}", err);
362 }
363
364 if cancellation_token.is_cancelled() {
365 break;
366 }
367 }
368
369 let dedicated_queue_str = dedicated_queue_name
370 .map(|name| format!(" dedicated to queue '{name}'"))
371 .unwrap_or_default();
372 debug!("Broke out of loop for worker {num}{dedicated_queue_str}");
373 }
374 };
375
376 for i in 0..self.config.num_workers {
378 let mut processor = self.clone();
379 processor.identity = Some(identity.clone());
380 processor.tid = Some(generate_tid());
381 join_set.spawn(spawn_worker(
382 processor,
383 self.cancellation_token.clone(),
384 i,
385 None,
386 ));
387 }
388
389 for (queue, config) in &self.config.queue_configs {
391 for i in 0..config.num_workers {
392 join_set.spawn({
393 let mut processor = self.clone();
394 processor.queues = [queue.clone()].into();
395 processor.identity = Some(identity.clone());
396 processor.tid = Some(generate_tid());
397 spawn_worker(
398 processor,
399 self.cancellation_token.clone(),
400 i,
401 Some(queue.clone()),
402 )
403 });
404 }
405 }
406
407 join_set.spawn({
410 let redis = self.redis.clone();
411 let cancellation_token = self.cancellation_token.clone();
412 async move {
413 loop {
414 select! {
416 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
417 _ = cancellation_token.cancelled() => {
418 break;
419 }
420 }
421
422 if let Err(err) = stats_publisher.publish_stats(redis.clone()).await {
423 error!("Error publishing processor stats: {:?}", err);
424 }
425 }
426
427 let identity = stats_publisher.identity().to_string();
435 if let Err(err) = stats_publisher.deregister(redis.clone()).await {
436 error!(
437 identity = %identity,
438 "Error deregistering processor from Redis on shutdown: {:?}",
439 err
440 );
441 }
442
443 debug!(identity = %identity, "Deregistered processor from Redis");
444 }
445 });
446
447 join_set.spawn({
449 let redis = self.redis.clone();
450 let cancellation_token = self.cancellation_token.clone();
451 async move {
452 let sched = Scheduled::new(redis);
453 let sorted_sets = vec!["retry".to_string(), "schedule".to_string()];
454
455 loop {
456 select! {
458 _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
459 _ = cancellation_token.cancelled() => {
460 break;
461 }
462 }
463
464 if let Err(err) = sched.enqueue_jobs(chrono::Utc::now(), &sorted_sets).await {
465 error!("Error in scheduled poller routine: {:?}", err);
466 }
467 }
468
469 debug!("Broke out of loop for retry and scheduled");
470 }
471 });
472
473 join_set.spawn({
475 let redis = self.redis.clone();
476 let cancellation_token = self.cancellation_token.clone();
477 async move {
478 let sched = Scheduled::new(redis);
479
480 loop {
481 select! {
483 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {}
484 _ = cancellation_token.cancelled() => {
485 break;
486 }
487 }
488
489 if let Err(err) = sched.enqueue_periodic_jobs(chrono::Utc::now()).await {
490 error!("Error in periodic job poller routine: {}", err);
491 }
492 }
493
494 debug!("Broke out of loop for periodic");
495 }
496 });
497
498 while let Some(result) = join_set.join_next().await {
499 if let Err(err) = result {
500 error!("Processor had a spawned task return an error: {}", err);
501 }
502 }
503 }
504
505 pub async fn using<M>(&mut self, middleware: M)
506 where
507 M: ServerMiddleware + Send + Sync + 'static,
508 {
509 self.chain.using(Box::new(middleware)).await;
510 }
511}
512
513fn work_record(job: &Job) -> Result<String> {
518 let record = serde_json::json!({
519 "queue": job.queue,
520 "payload": serde_json::to_string(job)?,
521 "run_at": chrono::Utc::now().timestamp(),
522 });
523
524 Ok(record.to_string())
525}
526
#[cfg(test)]
mod work_set_tests {
    use super::*;

    /// The work record must carry `queue`, a numeric `run_at`, and the job
    /// itself as a JSON *string* under `payload`.
    #[test]
    fn work_record_matches_sidekiq_shape() {
        let raw_job = r#"{"queue":"default","args":[1,"x"],"retry":true,"class":"HardWorker","jid":"abc123","created_at":1700000000.0}"#;
        let job: Job = serde_json::from_str(raw_job).expect("parse job");

        let encoded = work_record(&job).expect("build record");
        let record: serde_json::Value = serde_json::from_str(&encoded).expect("parse record");

        assert_eq!(record["queue"], "default");
        assert!(record["run_at"].is_number());

        // The payload is a nested JSON document encoded as a string; decode it again.
        let payload_text = record["payload"].as_str().expect("payload is a string");
        let payload: serde_json::Value =
            serde_json::from_str(payload_text).expect("parse payload");
        assert_eq!(payload["class"], "HardWorker");
        assert_eq!(payload["jid"], "abc123");
        assert_eq!(payload["args"][1], "x");
        assert!(payload["args"][0].is_number());
    }
}