Skip to main content

sidekiq/
processor.rs

1use super::Result;
2use crate::stats::generate_tid;
3use crate::{
4    periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware,
5    StatsPublisher, UnitOfWork, Worker, WorkerRef,
6};
7use std::collections::{BTreeMap, VecDeque};
8use std::sync::Arc;
9use tokio::select;
10use tokio::task::JoinSet;
11use tokio_util::sync::CancellationToken;
12use tracing::{debug, error, info};
13
/// Outcome of one fetch-and-run attempt made by [`Processor::process_one_tick_once`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkFetcher {
    /// The fetch returned no job, so nothing was run.
    NoWorkFound,
    /// A job was fetched and the middleware chain finished without returning an error.
    Done,
}
19
20#[derive(Clone)]
21pub struct Processor {
22    redis: RedisPool,
23    queues: VecDeque<String>,
24    human_readable_queues: Vec<String>,
25    periodic_jobs: Vec<PeriodicJob>,
26    workers: BTreeMap<String, Arc<WorkerRef>>,
27    chain: Chain,
28    busy_jobs: Counter,
29    cancellation_token: CancellationToken,
30    config: ProcessorConfig,
31    // Sidekiq-web WorkSet bookkeeping. Both are assigned per-worker by `run()`;
32    // when unset (e.g. a bare `process_one()` call), WorkSet publishing is
33    // skipped. `identity` is the shared process identity (the same one the
34    // heartbeat registers in `processes`); `tid` is this worker's thread id.
35    identity: Option<String>,
36    tid: Option<String>,
37}
38
39#[derive(Clone)]
40#[non_exhaustive]
41pub struct ProcessorConfig {
42    /// The number of Sidekiq workers that can run at the same time. Adjust as needed based on
43    /// your workload and resource (cpu/memory/etc) usage.
44    ///
45    /// This config value controls how many workers are spawned to handle the queues provided
46    /// to [`Processor::new`]. These workers will be shared across all of these queues.
47    ///
48    /// If your workload is largely CPU-bound (computationally expensive), this should probably
49    /// match your CPU count. This is the default.
50    ///
51    /// If your workload is largely IO-bound (e.g. reading from a DB, making web requests and
52    /// waiting for responses, etc), this can probably be quite a bit higher than your CPU count.
53    pub num_workers: usize,
54
55    /// The strategy for balancing the priority of fetching queues' jobs from Redis. Defaults
56    /// to [`BalanceStrategy::RoundRobin`].
57    ///
58    /// The Redis API used to fetch jobs ([brpop](https://redis.io/docs/latest/commands/brpop/))
59    /// checks queues for jobs in the order the queues are provided. This means that if the first
60    /// queue in the list provided to [`Processor::new`] always has an item, the other queues
61    /// will never have their jobs run. To mitigate this, a [`BalanceStrategy`] can be provided
62    /// to allow ensuring that no queue is starved indefinitely.
63    pub balance_strategy: BalanceStrategy,
64
65    /// Queue-specific configurations. The queues specified in this field do not need to match
66    /// the list of queues provided to [`Processor::new`].
67    pub queue_configs: BTreeMap<String, QueueConfig>,
68}
69
/// How a [`Processor`] reorders its queue list before each fetch from Redis.
#[non_exhaustive]
#[derive(Clone, Default)]
pub enum BalanceStrategy {
    /// Rotate the queue list by one position on every fetch. Each queue takes turns being
    /// checked first, so every queue gets an equal chance to have its jobs run. This is
    /// the default.
    #[default]
    RoundRobin,
    /// Keep the queue list in its original order. Beware of starvation: if the first queue
    /// passed to [`Processor::new`] always has a job available, jobs in the other queues
    /// never run.
    None,
}
82
/// Settings that apply to a single queue; attach one with
/// [`ProcessorConfig::queue_config`].
#[non_exhaustive]
#[derive(Clone, Default)]
pub struct QueueConfig {
    /// Number of extra workers dedicated to this queue alone. These are created in
    /// addition to the shared pool sized by [`ProcessorConfig::num_workers`]. Defaults to
    /// `0`, meaning no dedicated workers.
    pub num_workers: usize,
}
91
92impl ProcessorConfig {
93    #[must_use]
94    pub fn num_workers(mut self, num_workers: usize) -> Self {
95        self.num_workers = num_workers;
96        self
97    }
98
99    #[must_use]
100    pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self {
101        self.balance_strategy = balance_strategy;
102        self
103    }
104
105    #[must_use]
106    pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
107        self.queue_configs.insert(queue, config);
108        self
109    }
110}
111
112impl Default for ProcessorConfig {
113    fn default() -> Self {
114        Self {
115            num_workers: num_cpus::get(),
116            balance_strategy: Default::default(),
117            queue_configs: Default::default(),
118        }
119    }
120}
121
122impl QueueConfig {
123    #[must_use]
124    pub fn num_workers(mut self, num_workers: usize) -> Self {
125        self.num_workers = num_workers;
126        self
127    }
128}
129
130impl Processor {
131    #[must_use]
132    pub fn new(redis: RedisPool, queues: Vec<String>) -> Self {
133        let busy_jobs = Counter::new(0);
134
135        Self {
136            chain: Chain::new_with_stats(busy_jobs.clone()),
137            workers: BTreeMap::new(),
138            periodic_jobs: vec![],
139            busy_jobs,
140
141            redis,
142            queues: queues
143                .iter()
144                .map(|queue| format!("queue:{queue}"))
145                .collect(),
146            human_readable_queues: queues,
147            cancellation_token: CancellationToken::new(),
148            config: Default::default(),
149            identity: None,
150            tid: None,
151        }
152    }
153
154    pub fn with_config(mut self, config: ProcessorConfig) -> Self {
155        self.config = config;
156        self
157    }
158
159    pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>> {
160        self.run_balance_strategy();
161
162        let response: Option<(String, String)> = self
163            .redis
164            .get()
165            .await?
166            .brpop(self.queues.clone().into(), 2)
167            .await?;
168
169        if let Some((queue, job_raw)) = response {
170            let job: Job = serde_json::from_str(&job_raw)?;
171            return Ok(Some(UnitOfWork { queue, job }));
172        }
173
174        Ok(None)
175    }
176
177    /// Re-order the `Processor#queues` based on the `ProcessorConfig#balance_strategy`.
178    fn run_balance_strategy(&mut self) {
179        if self.queues.is_empty() {
180            return;
181        }
182
183        match self.config.balance_strategy {
184            BalanceStrategy::RoundRobin => self.queues.rotate_right(1),
185            BalanceStrategy::None => {}
186        }
187    }
188
189    pub async fn process_one(&mut self) -> Result<()> {
190        loop {
191            if self.cancellation_token.is_cancelled() {
192                return Ok(());
193            }
194
195            if let WorkFetcher::NoWorkFound = self.process_one_tick_once().await? {
196                continue;
197            }
198
199            return Ok(());
200        }
201    }
202
203    pub async fn process_one_tick_once(&mut self) -> Result<WorkFetcher> {
204        let work = self.fetch().await?;
205
206        if work.is_none() {
207            // If there is no job to handle, we need to add a `yield_now` in order to allow tokio's
208            // scheduler to wake up another task that may be waiting to acquire a connection from
209            // the Redis connection pool. See the following issue for more details:
210            // https://github.com/film42/sidekiq-rs/issues/43
211            tokio::task::yield_now().await;
212            return Ok(WorkFetcher::NoWorkFound);
213        }
214        let work = work.expect("polled and found some work");
215
216        let started = std::time::Instant::now();
217
218        info!({
219            "status" = "start",
220            "class" = &work.job.class,
221            "queue" = &work.job.queue,
222            "jid" = &work.job.jid
223        }, "sidekiq");
224
225        let worker = if let Some(worker) = self.workers.get(&work.job.class) {
226            worker.clone()
227        } else {
228            Arc::new(WorkerRef::not_found(work.job.class.clone()))
229        };
230
231        // Publish this job to the Sidekiq WorkSet (`<identity>:work`) so it shows
232        // on the web "Busy" page, then clear it whether the job succeeds or fails.
233        self.set_work(&work).await;
234        let result = self
235            .chain
236            .call(&work.job, worker, self.redis.clone())
237            .await;
238        self.clear_work().await;
239        result?;
240
241        // TODO: Make this only say "done" when the job is successful.
242        // We might need to change the ChainIter to return the final job and
243        // detect any retries?
244        info!({
245            "elapsed" = format!("{:?}", started.elapsed()),
246            "status" = "done",
247            "class" = &work.job.class,
248            "queue" = &work.job.queue,
249            "jid" = &work.job.jid}, "sidekiq");
250
251        Ok(WorkFetcher::Done)
252    }
253
254    /// Record an in-flight job in this process's Sidekiq WorkSet
255    /// (`<identity>:work`) so it shows on the web UI "Busy" page. Best-effort:
256    /// any Redis error is logged and never interrupts job processing. A no-op
257    /// unless an `identity` + `tid` were assigned (i.e. running under `run()`).
258    async fn set_work(&self, work: &UnitOfWork) {
259        let (Some(identity), Some(tid)) = (self.identity.as_deref(), self.tid.as_deref()) else {
260            return;
261        };
262
263        let result: Result<()> = async {
264            let key = format!("{identity}:work");
265            let mut conn = self.redis.get().await?;
266            conn.hset(key.clone(), tid.to_string(), work_record(&work.job)?)
267                .await?;
268            conn.expire(key, 60).await?;
269            Ok(())
270        }
271        .await;
272
273        if let Err(err) = result {
274            error!("Error recording sidekiq work state: {:?}", err);
275        }
276    }
277
278    /// Clear this worker's WorkSet entry once the job finishes (success or fail).
279    async fn clear_work(&self) {
280        let (Some(identity), Some(tid)) = (self.identity.as_deref(), self.tid.as_deref()) else {
281            return;
282        };
283
284        let result: Result<()> = async {
285            let mut conn = self.redis.get().await?;
286            conn.hdel(format!("{identity}:work"), tid.to_string())
287                .await?;
288            Ok(())
289        }
290        .await;
291
292        if let Err(err) = result {
293            error!("Error clearing sidekiq work state: {:?}", err);
294        }
295    }
296
297    pub fn register<
298        Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
299        W: Worker<Args> + 'static,
300    >(
301        &mut self,
302        worker: W,
303    ) {
304        self.workers
305            .insert(W::class_name(), Arc::new(WorkerRef::wrap(Arc::new(worker))));
306    }
307
308    pub fn get_cancellation_token(&self) -> CancellationToken {
309        self.cancellation_token.clone()
310    }
311
312    pub(crate) async fn register_periodic(&mut self, periodic_job: PeriodicJob) -> Result<()> {
313        self.periodic_jobs.push(periodic_job.clone());
314
315        let mut conn = self.redis.get().await?;
316        periodic_job.insert(&mut conn).await?;
317
318        info!({
319            "args" = &periodic_job.args,
320            "class" = &periodic_job.class,
321            "queue" = &periodic_job.queue,
322            "name" = &periodic_job.name,
323            "cron" = &periodic_job.cron,
324        },"Inserting periodic job");
325
326        Ok(())
327    }
328
329    /// Takes self to consume the processor. This is for life-cycle management, not
330    /// memory safety because you can clone processor pretty easily.
331    pub async fn run(self) {
332        let mut join_set: JoinSet<()> = JoinSet::new();
333
334        // Build the stats publisher up front so its process identity can be shared
335        // with the workers: each worker records its in-flight job under that
336        // identity's WorkSet (`<identity>:work`) — the same identity the heartbeat
337        // registers in the `processes` set — so the web "Busy" page lists running
338        // jobs against this process.
339        let hostname = if let Some(host) = gethostname::gethostname().to_str() {
340            host.to_string()
341        } else {
342            "UNKNOWN_HOSTNAME".to_string()
343        };
344        let stats_publisher = StatsPublisher::new(
345            hostname,
346            self.human_readable_queues.clone(),
347            self.busy_jobs.clone(),
348            self.config.num_workers,
349        );
350        let identity = stats_publisher.identity().to_string();
351
352        // Logic for spawning shared workers (workers that handles multiple queues) and dedicated
353        // workers (workers that handle a single queue).
354        let spawn_worker = |mut processor: Processor,
355                            cancellation_token: CancellationToken,
356                            num: usize,
357                            dedicated_queue_name: Option<String>| {
358            async move {
359                loop {
360                    if let Err(err) = processor.process_one().await {
361                        error!("Error leaked out the bottom: {:?}", err);
362                    }
363
364                    if cancellation_token.is_cancelled() {
365                        break;
366                    }
367                }
368
369                let dedicated_queue_str = dedicated_queue_name
370                    .map(|name| format!(" dedicated to queue '{name}'"))
371                    .unwrap_or_default();
372                debug!("Broke out of loop for worker {num}{dedicated_queue_str}");
373            }
374        };
375
376        // Start worker routines.
377        for i in 0..self.config.num_workers {
378            let mut processor = self.clone();
379            processor.identity = Some(identity.clone());
380            processor.tid = Some(generate_tid());
381            join_set.spawn(spawn_worker(
382                processor,
383                self.cancellation_token.clone(),
384                i,
385                None,
386            ));
387        }
388
389        // Start dedicated worker routines.
390        for (queue, config) in &self.config.queue_configs {
391            for i in 0..config.num_workers {
392                join_set.spawn({
393                    let mut processor = self.clone();
394                    processor.queues = [queue.clone()].into();
395                    processor.identity = Some(identity.clone());
396                    processor.tid = Some(generate_tid());
397                    spawn_worker(
398                        processor,
399                        self.cancellation_token.clone(),
400                        i,
401                        Some(queue.clone()),
402                    )
403                });
404            }
405        }
406
407        // Start sidekiq-web metrics publisher. Consumes the `stats_publisher` built
408        // above (whose identity the workers share for the WorkSet).
409        join_set.spawn({
410            let redis = self.redis.clone();
411            let cancellation_token = self.cancellation_token.clone();
412            async move {
413                loop {
414                    // TODO: Use process count to meet a 5 second avg.
415                    select! {
416                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
417                        _ = cancellation_token.cancelled() => {
418                            break;
419                        }
420                    }
421
422                    if let Err(err) = stats_publisher.publish_stats(redis.clone()).await {
423                        error!("Error publishing processor stats: {:?}", err);
424                    }
425                }
426
427                // On graceful shutdown, remove the process from the `processes` set and
428                // delete the heartbeat hash. This mirrors Ruby Sidekiq's clear_heartbeat():
429                //   pipeline.srem("processes", [identity])
430                //   pipeline.unlink("#{identity}:work")
431                // Without this, stale entries accumulate in the `processes` set until the
432                // heartbeat hash's 60-second TTL expires — but the set membership has no TTL
433                // and never self-cleans.
434                let identity = stats_publisher.identity().to_string();
435                if let Err(err) = stats_publisher.deregister(redis.clone()).await {
436                    error!(
437                        identity = %identity,
438                        "Error deregistering processor from Redis on shutdown: {:?}",
439                        err
440                    );
441                }
442
443                debug!(identity = %identity, "Deregistered processor from Redis");
444            }
445        });
446
447        // Start retry and scheduled routines.
448        join_set.spawn({
449            let redis = self.redis.clone();
450            let cancellation_token = self.cancellation_token.clone();
451            async move {
452                let sched = Scheduled::new(redis);
453                let sorted_sets = vec!["retry".to_string(), "schedule".to_string()];
454
455                loop {
456                    // TODO: Use process count to meet a 5 second avg.
457                    select! {
458                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
459                        _ = cancellation_token.cancelled() => {
460                            break;
461                        }
462                    }
463
464                    if let Err(err) = sched.enqueue_jobs(chrono::Utc::now(), &sorted_sets).await {
465                        error!("Error in scheduled poller routine: {:?}", err);
466                    }
467                }
468
469                debug!("Broke out of loop for retry and scheduled");
470            }
471        });
472
473        // Watch for periodic jobs and enqueue jobs.
474        join_set.spawn({
475            let redis = self.redis.clone();
476            let cancellation_token = self.cancellation_token.clone();
477            async move {
478                let sched = Scheduled::new(redis);
479
480                loop {
481                    // TODO: Use process count to meet a 30 second avg.
482                    select! {
483                        _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {}
484                        _ = cancellation_token.cancelled() => {
485                            break;
486                        }
487                    }
488
489                    if let Err(err) = sched.enqueue_periodic_jobs(chrono::Utc::now()).await {
490                        error!("Error in periodic job poller routine: {}", err);
491                    }
492                }
493
494                debug!("Broke out of loop for periodic");
495            }
496        });
497
498        while let Some(result) = join_set.join_next().await {
499            if let Err(err) = result {
500                error!("Processor had a spawned task return an error: {}", err);
501            }
502        }
503    }
504
505    pub async fn using<M>(&mut self, middleware: M)
506    where
507        M: ServerMiddleware + Send + Sync + 'static,
508    {
509        self.chain.using(Box::new(middleware)).await;
510    }
511}
512
513/// Build the value stored in the `<identity>:work` hash for an in-flight job,
514/// matching Ruby Sidekiq's `{queue, payload, run_at}` work record. `payload` is
515/// the job JSON as a *string*, exactly as Sidekiq stores it and the web UI
516/// expects it (`Sidekiq.load_json(work.payload)`).
517fn work_record(job: &Job) -> Result<String> {
518    let record = serde_json::json!({
519        "queue": job.queue,
520        "payload": serde_json::to_string(job)?,
521        "run_at": chrono::Utc::now().timestamp(),
522    });
523
524    Ok(record.to_string())
525}
526
#[cfg(test)]
mod work_set_tests {
    use super::*;

    /// A minimal Sidekiq job payload used as the fixture for the work-record checks.
    const RAW_JOB: &str = r#"{"queue":"default","args":[1,"x"],"retry":true,"class":"HardWorker","jid":"abc123","created_at":1700000000.0}"#;

    #[test]
    fn work_record_matches_sidekiq_shape() {
        let job: Job = serde_json::from_str(RAW_JOB).expect("parse job");
        let encoded = work_record(&job).expect("build record");
        let record: serde_json::Value = serde_json::from_str(&encoded).expect("parse record");

        assert_eq!(record["queue"], "default");
        assert!(record["run_at"].is_number());

        // Sidekiq stores `payload` as the job JSON *string*, never as a nested object.
        let payload_text = record["payload"].as_str().expect("payload is a string");
        let payload: serde_json::Value =
            serde_json::from_str(payload_text).expect("parse payload");
        assert_eq!(payload["class"], "HardWorker");
        assert_eq!(payload["jid"], "abc123");
        assert_eq!(payload["args"][1], "x");
        assert!(payload["args"][0].is_number());
    }
}