sidekiq/processor.rs

use super::Result;
use crate::{
    periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware,
    StatsPublisher, UnitOfWork, Worker, WorkerRef,
};
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use tokio::select;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WorkFetcher {
    NoWorkFound,
    Done,
}

#[derive(Clone)]
pub struct Processor {
    redis: RedisPool,
    queues: VecDeque<String>,
    human_readable_queues: Vec<String>,
    periodic_jobs: Vec<PeriodicJob>,
    workers: BTreeMap<String, Arc<WorkerRef>>,
    chain: Chain,
    busy_jobs: Counter,
    cancellation_token: CancellationToken,
    config: ProcessorConfig,
}

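/// Configuration options for a [`Processor`].
///
/// A minimal sketch of building a config with the builder methods defined below; the queue
/// name `"critical"` and the worker counts are illustrative values, not recommendations:
///
/// ```ignore
/// let config = ProcessorConfig::default()
///     .num_workers(10)
///     .balance_strategy(BalanceStrategy::RoundRobin)
///     .queue_config("critical".to_string(), QueueConfig::default().num_workers(2));
/// ```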
#[derive(Clone)]
#[non_exhaustive]
pub struct ProcessorConfig {
    /// The number of Sidekiq workers that can run at the same time. Adjust as needed based on
    /// your workload and resource (cpu/memory/etc) usage.
    ///
    /// This config value controls how many workers are spawned to handle the queues provided
    /// to [`Processor::new`]. These workers will be shared across all of these queues.
    ///
    /// If your workload is largely CPU-bound (computationally expensive), this should probably
    /// match your CPU count. This is the default.
    ///
    /// If your workload is largely IO-bound (e.g. reading from a DB, making web requests and
    /// waiting for responses, etc), this can probably be quite a bit higher than your CPU count.
    pub num_workers: usize,

    /// The strategy for balancing the priority of fetching queues' jobs from Redis. Defaults
    /// to [`BalanceStrategy::RoundRobin`].
    ///
    /// The Redis API used to fetch jobs ([brpop](https://redis.io/docs/latest/commands/brpop/))
    /// checks queues for jobs in the order the queues are provided. This means that if the first
    /// queue in the list provided to [`Processor::new`] always has an item, the other queues
    /// will never have their jobs run. To mitigate this, a [`BalanceStrategy`] can be provided
    /// to ensure that no queue is starved indefinitely.
    pub balance_strategy: BalanceStrategy,

    /// Queue-specific configurations. The queues specified in this field do not need to match
    /// the list of queues provided to [`Processor::new`].
    pub queue_configs: BTreeMap<String, QueueConfig>,
}

#[derive(Default, Clone)]
#[non_exhaustive]
pub enum BalanceStrategy {
    /// Rotate the list of queues by 1 every time jobs are fetched from Redis. This allows each
    /// queue in the list to have an equal opportunity to have its jobs run.
    #[default]
    RoundRobin,
    /// Do not modify the list of queues. Warning: This can lead to queue starvation! For example,
    /// if the first queue in the list provided to [`Processor::new`] is heavily used and always
    /// has a job available to run, then the jobs in the other queues will never run.
    None,
}

#[derive(Default, Clone)]
#[non_exhaustive]
pub struct QueueConfig {
    /// Similar to `ProcessorConfig#num_workers`, except it configures the number of additional
    /// workers to dedicate to a specific queue. If provided, `num_workers` additional workers
    /// will be created for that specific queue.
    pub num_workers: usize,
}

impl ProcessorConfig {
    #[must_use]
    pub fn num_workers(mut self, num_workers: usize) -> Self {
        self.num_workers = num_workers;
        self
    }

    #[must_use]
    pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self {
        self.balance_strategy = balance_strategy;
        self
    }

    #[must_use]
    pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
        self.queue_configs.insert(queue, config);
        self
    }
}

impl Default for ProcessorConfig {
    fn default() -> Self {
        Self {
            num_workers: num_cpus::get(),
            balance_strategy: Default::default(),
            queue_configs: Default::default(),
        }
    }
}

impl QueueConfig {
    #[must_use]
    pub fn num_workers(mut self, num_workers: usize) -> Self {
        self.num_workers = num_workers;
        self
    }
}

impl Processor {
    #[must_use]
    pub fn new(redis: RedisPool, queues: Vec<String>) -> Self {
        let busy_jobs = Counter::new(0);

        Self {
            chain: Chain::new_with_stats(busy_jobs.clone()),
            workers: BTreeMap::new(),
            periodic_jobs: vec![],
            busy_jobs,

            redis,
            queues: queues
                .iter()
                .map(|queue| format!("queue:{queue}"))
                .collect(),
            human_readable_queues: queues,
            cancellation_token: CancellationToken::new(),
            config: Default::default(),
        }
    }

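    /// Replaces the default [`ProcessorConfig`] with the provided one.
    ///
    /// A minimal sketch, assuming an existing [`RedisPool`] named `redis`:
    ///
    /// ```ignore
    /// let processor = Processor::new(redis, vec!["default".to_string()])
    ///     .with_config(ProcessorConfig::default().num_workers(8));
    /// ```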
    pub fn with_config(mut self, config: ProcessorConfig) -> Self {
        self.config = config;
        self
    }

    pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>> {
        self.run_balance_strategy();

        let response: Option<(String, String)> = self
            .redis
            .get()
            .await?
            .brpop(self.queues.clone().into(), 2)
            .await?;

        if let Some((queue, job_raw)) = response {
            let job: Job = serde_json::from_str(&job_raw)?;
            return Ok(Some(UnitOfWork { queue, job }));
        }

        Ok(None)
    }

    /// Re-order the `Processor#queues` based on the `ProcessorConfig#balance_strategy`.
    fn run_balance_strategy(&mut self) {
        if self.queues.is_empty() {
            return;
        }

        match self.config.balance_strategy {
            BalanceStrategy::RoundRobin => self.queues.rotate_right(1),
            BalanceStrategy::None => {}
        }
    }

    pub async fn process_one(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                return Ok(());
            }

            if let WorkFetcher::NoWorkFound = self.process_one_tick_once().await? {
                continue;
            }

            return Ok(());
        }
    }

    pub async fn process_one_tick_once(&mut self) -> Result<WorkFetcher> {
        let work = self.fetch().await?;

        if work.is_none() {
            // If there is no job to handle, we need to add a `yield_now` in order to allow tokio's
            // scheduler to wake up another task that may be waiting to acquire a connection from
            // the Redis connection pool. See the following issue for more details:
            // https://github.com/film42/sidekiq-rs/issues/43
            tokio::task::yield_now().await;
            return Ok(WorkFetcher::NoWorkFound);
        }
        let mut work = work.expect("polled and found some work");

        let started = std::time::Instant::now();

        info!({
            "status" = "start",
            "class" = &work.job.class,
            "queue" = &work.job.queue,
            "jid" = &work.job.jid
        }, "sidekiq");

        if let Some(worker) = self.workers.get_mut(&work.job.class) {
            self.chain
                .call(&work.job, worker.clone(), self.redis.clone())
                .await?;
        } else {
            error!({
                "status" = "fail",
                "class" = &work.job.class,
                "queue" = &work.job.queue,
                "jid" = &work.job.jid
            }, "!!! Worker not found !!!");
            work.reenqueue(&self.redis).await?;
        }

        // TODO: Make this only say "done" when the job is successful.
        // We might need to change the ChainIter to return the final job and
        // detect any retries?
        info!({
            "elapsed" = format!("{:?}", started.elapsed()),
            "status" = "done",
            "class" = &work.job.class,
            "queue" = &work.job.queue,
            "jid" = &work.job.jid
        }, "sidekiq");

        Ok(WorkFetcher::Done)
    }

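    /// Registers a [`Worker`] so that jobs whose class matches [`Worker::class_name`] can be
    /// processed.
    ///
    /// A minimal sketch, assuming a `MyWorker` type that already implements [`Worker`]
    /// (the trait implementation is not shown in this file):
    ///
    /// ```ignore
    /// processor.register(MyWorker::default());
    /// ```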
    pub fn register<
        Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
        W: Worker<Args> + 'static,
    >(
        &mut self,
        worker: W,
    ) {
        self.workers
            .insert(W::class_name(), Arc::new(WorkerRef::wrap(Arc::new(worker))));
    }

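    /// Returns the [`CancellationToken`] used by this processor. Cancelling it asks the
    /// processing and polling loops to shut down.
    ///
    /// A sketch of wiring shutdown to ctrl-c; the signal handling here is illustrative and
    /// not part of this crate:
    ///
    /// ```ignore
    /// let token = processor.get_cancellation_token();
    /// tokio::spawn(async move {
    ///     tokio::signal::ctrl_c().await.ok();
    ///     token.cancel();
    /// });
    /// processor.run().await;
    /// ```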
    pub fn get_cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

    pub(crate) async fn register_periodic(&mut self, periodic_job: PeriodicJob) -> Result<()> {
        self.periodic_jobs.push(periodic_job.clone());

        let mut conn = self.redis.get().await?;
        periodic_job.insert(&mut conn).await?;

        info!({
            "args" = &periodic_job.args,
            "class" = &periodic_job.class,
            "queue" = &periodic_job.queue,
            "name" = &periodic_job.name,
            "cron" = &periodic_job.cron,
        }, "Inserting periodic job");

        Ok(())
    }

    /// Consumes `self` to take ownership of the processor. This is for lifecycle management,
    /// not memory safety, since the processor can easily be cloned.
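    ///
    /// A minimal end-to-end sketch, assuming an existing [`RedisPool`] named `redis` and a
    /// `MyWorker` type implementing [`Worker`] (neither is defined in this file):
    ///
    /// ```ignore
    /// let mut processor = Processor::new(redis, vec!["default".to_string()]);
    /// processor.register(MyWorker::default());
    /// processor.run().await;
    /// ```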
    pub async fn run(self) {
        let mut join_set: JoinSet<()> = JoinSet::new();

        // Logic for spawning shared workers (workers that handle multiple queues) and dedicated
        // workers (workers that handle a single queue).
        let spawn_worker = |mut processor: Processor,
                            cancellation_token: CancellationToken,
                            num: usize,
                            dedicated_queue_name: Option<String>| {
            async move {
                loop {
                    if let Err(err) = processor.process_one().await {
                        error!("Error leaked out the bottom: {:?}", err);
                    }

                    if cancellation_token.is_cancelled() {
                        break;
                    }
                }

                let dedicated_queue_str = dedicated_queue_name
                    .map(|name| format!(" dedicated to queue '{name}'"))
                    .unwrap_or_default();
                debug!("Broke out of loop for worker {num}{dedicated_queue_str}");
            }
        };

        // Start worker routines.
        for i in 0..self.config.num_workers {
            join_set.spawn(spawn_worker(
                self.clone(),
                self.cancellation_token.clone(),
                i,
                None,
            ));
        }

        // Start dedicated worker routines.
        for (queue, config) in &self.config.queue_configs {
            for i in 0..config.num_workers {
                join_set.spawn({
                    let mut processor = self.clone();
                    // Use the fully qualified Redis key for the dedicated queue, matching the
                    // `queue:{name}` form produced in `Processor::new`.
                    processor.queues = [format!("queue:{queue}")].into();
                    spawn_worker(
                        processor,
                        self.cancellation_token.clone(),
                        i,
                        Some(queue.clone()),
                    )
                });
            }
        }

        // Start sidekiq-web metrics publisher.
        join_set.spawn({
            let redis = self.redis.clone();
            let queues = self.human_readable_queues.clone();
            let busy_jobs = self.busy_jobs.clone();
            let cancellation_token = self.cancellation_token.clone();
            async move {
                let hostname = if let Some(host) = gethostname::gethostname().to_str() {
                    host.to_string()
                } else {
                    "UNKNOWN_HOSTNAME".to_string()
                };

                let stats_publisher =
                    StatsPublisher::new(hostname, queues, busy_jobs, self.config.num_workers);

                loop {
                    // TODO: Use process count to meet a 5 second avg.
                    select! {
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                        _ = cancellation_token.cancelled() => {
                            break;
                        }
                    }

                    if let Err(err) = stats_publisher.publish_stats(redis.clone()).await {
                        error!("Error publishing processor stats: {:?}", err);
                    }
                }

                debug!("Broke out of loop for web metrics");
            }
        });

        // Start retry and scheduled routines.
        join_set.spawn({
            let redis = self.redis.clone();
            let cancellation_token = self.cancellation_token.clone();
            async move {
                let sched = Scheduled::new(redis);
                let sorted_sets = vec!["retry".to_string(), "schedule".to_string()];

                loop {
                    // TODO: Use process count to meet a 5 second avg.
                    select! {
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                        _ = cancellation_token.cancelled() => {
                            break;
                        }
                    }

                    if let Err(err) = sched.enqueue_jobs(chrono::Utc::now(), &sorted_sets).await {
                        error!("Error in scheduled poller routine: {:?}", err);
                    }
                }

                debug!("Broke out of loop for retry and scheduled");
            }
        });

        // Watch for periodic jobs and enqueue them when due.
        join_set.spawn({
            let redis = self.redis.clone();
            let cancellation_token = self.cancellation_token.clone();
            async move {
                let sched = Scheduled::new(redis);

                loop {
                    // TODO: Use process count to meet a 30 second avg.
                    select! {
                        _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {}
                        _ = cancellation_token.cancelled() => {
                            break;
                        }
                    }

                    if let Err(err) = sched.enqueue_periodic_jobs(chrono::Utc::now()).await {
                        error!("Error in periodic job poller routine: {}", err);
                    }
                }

                debug!("Broke out of loop for periodic");
            }
        });

        while let Some(result) = join_set.join_next().await {
            if let Err(err) = result {
                error!("Processor had a spawned task return an error: {}", err);
            }
        }
    }

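    /// Adds a [`ServerMiddleware`] to the processor's middleware chain, which is invoked for
    /// each job.
    ///
    /// A minimal sketch, assuming a `LoggingMiddleware` type that already implements
    /// [`ServerMiddleware`] (not shown in this file):
    ///
    /// ```ignore
    /// processor.using(LoggingMiddleware).await;
    /// ```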
    pub async fn using<M>(&mut self, middleware: M)
    where
        M: ServerMiddleware + Send + Sync + 'static,
    {
        self.chain.using(Box::new(middleware)).await;
    }
}