Skip to main content

sidekiq/
processor.rs

1use super::Result;
2use crate::{
3    periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware,
4    StatsPublisher, UnitOfWork, Worker, WorkerRef,
5};
6use std::collections::{BTreeMap, VecDeque};
7use std::sync::Arc;
8use tokio::select;
9use tokio::task::JoinSet;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info};
12
/// Outcome of a single fetch-and-process attempt made by
/// [`Processor::process_one_tick_once`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkFetcher {
    /// The fetch came back empty, so no job was run.
    NoWorkFound,
    /// A job was fetched and passed through the middleware chain (not necessarily
    /// successfully — retries and failures are handled inside the chain).
    Done,
}
18
19#[derive(Clone)]
20pub struct Processor {
21    redis: RedisPool,
22    queues: VecDeque<String>,
23    human_readable_queues: Vec<String>,
24    periodic_jobs: Vec<PeriodicJob>,
25    workers: BTreeMap<String, Arc<WorkerRef>>,
26    chain: Chain,
27    busy_jobs: Counter,
28    cancellation_token: CancellationToken,
29    config: ProcessorConfig,
30}
31
32#[derive(Clone)]
33#[non_exhaustive]
34pub struct ProcessorConfig {
35    /// The number of Sidekiq workers that can run at the same time. Adjust as needed based on
36    /// your workload and resource (cpu/memory/etc) usage.
37    ///
38    /// This config value controls how many workers are spawned to handle the queues provided
39    /// to [`Processor::new`]. These workers will be shared across all of these queues.
40    ///
41    /// If your workload is largely CPU-bound (computationally expensive), this should probably
42    /// match your CPU count. This is the default.
43    ///
44    /// If your workload is largely IO-bound (e.g. reading from a DB, making web requests and
45    /// waiting for responses, etc), this can probably be quite a bit higher than your CPU count.
46    pub num_workers: usize,
47
48    /// The strategy for balancing the priority of fetching queues' jobs from Redis. Defaults
49    /// to [`BalanceStrategy::RoundRobin`].
50    ///
51    /// The Redis API used to fetch jobs ([brpop](https://redis.io/docs/latest/commands/brpop/))
52    /// checks queues for jobs in the order the queues are provided. This means that if the first
53    /// queue in the list provided to [`Processor::new`] always has an item, the other queues
54    /// will never have their jobs run. To mitigate this, a [`BalanceStrategy`] can be provided
55    /// to allow ensuring that no queue is starved indefinitely.
56    pub balance_strategy: BalanceStrategy,
57
58    /// Queue-specific configurations. The queues specified in this field do not need to match
59    /// the list of queues provided to [`Processor::new`].
60    pub queue_configs: BTreeMap<String, QueueConfig>,
61}
62
/// How a [`Processor`] reorders its queue list before each fetch from Redis.
#[derive(Clone, Default)]
#[non_exhaustive]
pub enum BalanceStrategy {
    /// Rotate the queue list by one position before every fetch. Each queue takes a turn at
    /// the front, so every queue gets an equal chance to have its jobs run. This is the
    /// default.
    #[default]
    RoundRobin,
    /// Keep the queue list in the order it was given.
    ///
    /// Warning: this can starve queues. If the first queue passed to [`Processor::new`] is
    /// busy enough to always have a job ready, jobs in the other queues will never run.
    None,
}
75
/// Settings for one specific queue. Entries are stored in [`ProcessorConfig::queue_configs`].
#[derive(Clone, Default)]
#[non_exhaustive]
pub struct QueueConfig {
    /// Like [`ProcessorConfig::num_workers`], but for workers dedicated to this queue alone.
    ///
    /// These workers are created *in addition to* the shared pool and poll only this queue.
    /// Defaults to `0` (no dedicated workers).
    pub num_workers: usize,
}
84
85impl ProcessorConfig {
86    #[must_use]
87    pub fn num_workers(mut self, num_workers: usize) -> Self {
88        self.num_workers = num_workers;
89        self
90    }
91
92    #[must_use]
93    pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self {
94        self.balance_strategy = balance_strategy;
95        self
96    }
97
98    #[must_use]
99    pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
100        self.queue_configs.insert(queue, config);
101        self
102    }
103}
104
105impl Default for ProcessorConfig {
106    fn default() -> Self {
107        Self {
108            num_workers: num_cpus::get(),
109            balance_strategy: Default::default(),
110            queue_configs: Default::default(),
111        }
112    }
113}
114
115impl QueueConfig {
116    #[must_use]
117    pub fn num_workers(mut self, num_workers: usize) -> Self {
118        self.num_workers = num_workers;
119        self
120    }
121}
122
123impl Processor {
124    #[must_use]
125    pub fn new(redis: RedisPool, queues: Vec<String>) -> Self {
126        let busy_jobs = Counter::new(0);
127
128        Self {
129            chain: Chain::new_with_stats(busy_jobs.clone()),
130            workers: BTreeMap::new(),
131            periodic_jobs: vec![],
132            busy_jobs,
133
134            redis,
135            queues: queues
136                .iter()
137                .map(|queue| format!("queue:{queue}"))
138                .collect(),
139            human_readable_queues: queues,
140            cancellation_token: CancellationToken::new(),
141            config: Default::default(),
142        }
143    }
144
145    pub fn with_config(mut self, config: ProcessorConfig) -> Self {
146        self.config = config;
147        self
148    }
149
150    pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>> {
151        self.run_balance_strategy();
152
153        let response: Option<(String, String)> = self
154            .redis
155            .get()
156            .await?
157            .brpop(self.queues.clone().into(), 2)
158            .await?;
159
160        if let Some((queue, job_raw)) = response {
161            let job: Job = serde_json::from_str(&job_raw)?;
162            return Ok(Some(UnitOfWork { queue, job }));
163        }
164
165        Ok(None)
166    }
167
168    /// Re-order the `Processor#queues` based on the `ProcessorConfig#balance_strategy`.
169    fn run_balance_strategy(&mut self) {
170        if self.queues.is_empty() {
171            return;
172        }
173
174        match self.config.balance_strategy {
175            BalanceStrategy::RoundRobin => self.queues.rotate_right(1),
176            BalanceStrategy::None => {}
177        }
178    }
179
180    pub async fn process_one(&mut self) -> Result<()> {
181        loop {
182            if self.cancellation_token.is_cancelled() {
183                return Ok(());
184            }
185
186            if let WorkFetcher::NoWorkFound = self.process_one_tick_once().await? {
187                continue;
188            }
189
190            return Ok(());
191        }
192    }
193
194    pub async fn process_one_tick_once(&mut self) -> Result<WorkFetcher> {
195        let work = self.fetch().await?;
196
197        if work.is_none() {
198            // If there is no job to handle, we need to add a `yield_now` in order to allow tokio's
199            // scheduler to wake up another task that may be waiting to acquire a connection from
200            // the Redis connection pool. See the following issue for more details:
201            // https://github.com/film42/sidekiq-rs/issues/43
202            tokio::task::yield_now().await;
203            return Ok(WorkFetcher::NoWorkFound);
204        }
205        let work = work.expect("polled and found some work");
206
207        let started = std::time::Instant::now();
208
209        info!({
210            "status" = "start",
211            "class" = &work.job.class,
212            "queue" = &work.job.queue,
213            "jid" = &work.job.jid
214        }, "sidekiq");
215
216        let worker = if let Some(worker) = self.workers.get(&work.job.class) {
217            worker.clone()
218        } else {
219            Arc::new(WorkerRef::not_found(work.job.class.clone()))
220        };
221
222        self.chain
223            .call(&work.job, worker, self.redis.clone())
224            .await?;
225
226        // TODO: Make this only say "done" when the job is successful.
227        // We might need to change the ChainIter to return the final job and
228        // detect any retries?
229        info!({
230            "elapsed" = format!("{:?}", started.elapsed()),
231            "status" = "done",
232            "class" = &work.job.class,
233            "queue" = &work.job.queue,
234            "jid" = &work.job.jid}, "sidekiq");
235
236        Ok(WorkFetcher::Done)
237    }
238
239    pub fn register<
240        Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
241        W: Worker<Args> + 'static,
242    >(
243        &mut self,
244        worker: W,
245    ) {
246        self.workers
247            .insert(W::class_name(), Arc::new(WorkerRef::wrap(Arc::new(worker))));
248    }
249
250    pub fn get_cancellation_token(&self) -> CancellationToken {
251        self.cancellation_token.clone()
252    }
253
254    pub(crate) async fn register_periodic(&mut self, periodic_job: PeriodicJob) -> Result<()> {
255        self.periodic_jobs.push(periodic_job.clone());
256
257        let mut conn = self.redis.get().await?;
258        periodic_job.insert(&mut conn).await?;
259
260        info!({
261            "args" = &periodic_job.args,
262            "class" = &periodic_job.class,
263            "queue" = &periodic_job.queue,
264            "name" = &periodic_job.name,
265            "cron" = &periodic_job.cron,
266        },"Inserting periodic job");
267
268        Ok(())
269    }
270
271    /// Takes self to consume the processor. This is for life-cycle management, not
272    /// memory safety because you can clone processor pretty easily.
273    pub async fn run(self) {
274        let mut join_set: JoinSet<()> = JoinSet::new();
275
276        // Logic for spawning shared workers (workers that handles multiple queues) and dedicated
277        // workers (workers that handle a single queue).
278        let spawn_worker = |mut processor: Processor,
279                            cancellation_token: CancellationToken,
280                            num: usize,
281                            dedicated_queue_name: Option<String>| {
282            async move {
283                loop {
284                    if let Err(err) = processor.process_one().await {
285                        error!("Error leaked out the bottom: {:?}", err);
286                    }
287
288                    if cancellation_token.is_cancelled() {
289                        break;
290                    }
291                }
292
293                let dedicated_queue_str = dedicated_queue_name
294                    .map(|name| format!(" dedicated to queue '{name}'"))
295                    .unwrap_or_default();
296                debug!("Broke out of loop for worker {num}{dedicated_queue_str}");
297            }
298        };
299
300        // Start worker routines.
301        for i in 0..self.config.num_workers {
302            join_set.spawn(spawn_worker(
303                self.clone(),
304                self.cancellation_token.clone(),
305                i,
306                None,
307            ));
308        }
309
310        // Start dedicated worker routines.
311        for (queue, config) in &self.config.queue_configs {
312            for i in 0..config.num_workers {
313                join_set.spawn({
314                    let mut processor = self.clone();
315                    processor.queues = [queue.clone()].into();
316                    spawn_worker(
317                        processor,
318                        self.cancellation_token.clone(),
319                        i,
320                        Some(queue.clone()),
321                    )
322                });
323            }
324        }
325
326        // Start sidekiq-web metrics publisher.
327        join_set.spawn({
328            let redis = self.redis.clone();
329            let queues = self.human_readable_queues.clone();
330            let busy_jobs = self.busy_jobs.clone();
331            let cancellation_token = self.cancellation_token.clone();
332            async move {
333                let hostname = if let Some(host) = gethostname::gethostname().to_str() {
334                    host.to_string()
335                } else {
336                    "UNKNOWN_HOSTNAME".to_string()
337                };
338
339                let stats_publisher =
340                    StatsPublisher::new(hostname, queues, busy_jobs, self.config.num_workers);
341
342                loop {
343                    // TODO: Use process count to meet a 5 second avg.
344                    select! {
345                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
346                        _ = cancellation_token.cancelled() => {
347                            break;
348                        }
349                    }
350
351                    if let Err(err) = stats_publisher.publish_stats(redis.clone()).await {
352                        error!("Error publishing processor stats: {:?}", err);
353                    }
354                }
355
356                debug!("Broke out of loop web metrics");
357            }
358        });
359
360        // Start retry and scheduled routines.
361        join_set.spawn({
362            let redis = self.redis.clone();
363            let cancellation_token = self.cancellation_token.clone();
364            async move {
365                let sched = Scheduled::new(redis);
366                let sorted_sets = vec!["retry".to_string(), "schedule".to_string()];
367
368                loop {
369                    // TODO: Use process count to meet a 5 second avg.
370                    select! {
371                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
372                        _ = cancellation_token.cancelled() => {
373                            break;
374                        }
375                    }
376
377                    if let Err(err) = sched.enqueue_jobs(chrono::Utc::now(), &sorted_sets).await {
378                        error!("Error in scheduled poller routine: {:?}", err);
379                    }
380                }
381
382                debug!("Broke out of loop for retry and scheduled");
383            }
384        });
385
386        // Watch for periodic jobs and enqueue jobs.
387        join_set.spawn({
388            let redis = self.redis.clone();
389            let cancellation_token = self.cancellation_token.clone();
390            async move {
391                let sched = Scheduled::new(redis);
392
393                loop {
394                    // TODO: Use process count to meet a 30 second avg.
395                    select! {
396                        _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {}
397                        _ = cancellation_token.cancelled() => {
398                            break;
399                        }
400                    }
401
402                    if let Err(err) = sched.enqueue_periodic_jobs(chrono::Utc::now()).await {
403                        error!("Error in periodic job poller routine: {}", err);
404                    }
405                }
406
407                debug!("Broke out of loop for periodic");
408            }
409        });
410
411        while let Some(result) = join_set.join_next().await {
412            if let Err(err) = result {
413                error!("Processor had a spawned task return an error: {}", err);
414            }
415        }
416    }
417
418    pub async fn using<M>(&mut self, middleware: M)
419    where
420        M: ServerMiddleware + Send + Sync + 'static,
421    {
422        self.chain.using(Box::new(middleware)).await;
423    }
424}