use super::Result;
use crate::{
    periodic::PeriodicJob, Chain, Counter, Job, RedisPool, Scheduled, ServerMiddleware,
    StatsPublisher, UnitOfWork, Worker, WorkerRef,
};
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use tokio::select;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

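/// Outcome of a single processing tick: either no job was found on any queue
/// within the fetch timeout, or one job was fetched and handled.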
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum WorkFetcher {
    NoWorkFound,
    Done,
}

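/// Pulls jobs from Redis and dispatches them to registered [`Worker`]s, running
/// any configured server middleware around each job.
///
/// A minimal usage sketch (assuming an already-built `RedisPool` and a `MyWorker`
/// type implementing [`Worker`]; both names are illustrative):
///
/// ```ignore
/// let mut processor = Processor::new(redis_pool, vec!["default".to_string()]);
/// processor.register(MyWorker);
/// processor.run().await;
/// ```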
#[derive(Clone)]
pub struct Processor {
    redis: RedisPool,
    queues: VecDeque<String>,
    human_readable_queues: Vec<String>,
    periodic_jobs: Vec<PeriodicJob>,
    workers: BTreeMap<String, Arc<WorkerRef>>,
    chain: Chain,
    busy_jobs: Counter,
    cancellation_token: CancellationToken,
    config: ProcessorConfig,
}

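/// Configuration for a [`Processor`], normally built with the `num_workers`,
/// `balance_strategy`, and `queue_config` builder methods.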
#[derive(Clone)]
#[non_exhaustive]
pub struct ProcessorConfig {
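    /// Number of shared worker tasks that pull jobs from all configured queues.
    /// Defaults to the number of logical CPUs (`num_cpus::get()`).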
    pub num_workers: usize,

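    /// Strategy used to reorder the queue list before each fetch.
    /// Defaults to [`BalanceStrategy::RoundRobin`].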
    pub balance_strategy: BalanceStrategy,

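    /// Per-queue overrides; each entry spawns additional worker tasks dedicated
    /// to that queue on top of the shared workers.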
    pub queue_configs: BTreeMap<String, QueueConfig>,
}

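/// Controls how the queue list is reordered between fetches.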
#[derive(Default, Clone)]
#[non_exhaustive]
pub enum BalanceStrategy {
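    /// Rotate the queue list by one before each fetch so that every queue
    /// periodically gets first priority.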
    #[default]
    RoundRobin,
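    /// Leave the queue order exactly as configured; earlier queues are always
    /// checked first, so later queues may be starved under heavy load.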
    None,
}

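/// Per-queue configuration, registered via [`ProcessorConfig::queue_config`].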
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct QueueConfig {
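    /// Number of worker tasks dedicated exclusively to this queue.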
    pub num_workers: usize,
}

impl ProcessorConfig {
    #[must_use]
    pub fn num_workers(mut self, num_workers: usize) -> Self {
        self.num_workers = num_workers;
        self
    }

    #[must_use]
    pub fn balance_strategy(mut self, balance_strategy: BalanceStrategy) -> Self {
        self.balance_strategy = balance_strategy;
        self
    }

    #[must_use]
    pub fn queue_config(mut self, queue: String, config: QueueConfig) -> Self {
        self.queue_configs.insert(queue, config);
        self
    }
}

impl Default for ProcessorConfig {
    fn default() -> Self {
        Self {
            num_workers: num_cpus::get(),
            balance_strategy: Default::default(),
            queue_configs: Default::default(),
        }
    }
}

impl QueueConfig {
    #[must_use]
    pub fn num_workers(mut self, num_workers: usize) -> Self {
        self.num_workers = num_workers;
        self
    }
}

impl Processor {
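    /// Create a processor that polls the given queues, which are stored
    /// internally as the `queue:<name>` keys Sidekiq uses in Redis.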
    #[must_use]
    pub fn new(redis: RedisPool, queues: Vec<String>) -> Self {
        let busy_jobs = Counter::new(0);

        Self {
            chain: Chain::new_with_stats(busy_jobs.clone()),
            workers: BTreeMap::new(),
            periodic_jobs: vec![],
            busy_jobs,

            redis,
            queues: queues
                .iter()
                .map(|queue| format!("queue:{queue}"))
                .collect(),
            human_readable_queues: queues,
            cancellation_token: CancellationToken::new(),
            config: Default::default(),
        }
    }

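    /// Replace the default [`ProcessorConfig`].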
    pub fn with_config(mut self, config: ProcessorConfig) -> Self {
        self.config = config;
        self
    }

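    /// Apply the balance strategy, then issue a blocking `BRPOP` across all
    /// configured queues, waiting up to two seconds for a job.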
    pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>> {
        self.run_balance_strategy();

        let response: Option<(String, String)> = self
            .redis
            .get()
            .await?
            .brpop(self.queues.clone().into(), 2)
            .await?;

        if let Some((queue, job_raw)) = response {
            let job: Job = serde_json::from_str(&job_raw)?;
            return Ok(Some(UnitOfWork { queue, job }));
        }

        Ok(None)
    }

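    /// Reorder the queue list in place according to the configured [`BalanceStrategy`].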
    fn run_balance_strategy(&mut self) {
        if self.queues.is_empty() {
            return;
        }

        match self.config.balance_strategy {
            BalanceStrategy::RoundRobin => self.queues.rotate_right(1),
            BalanceStrategy::None => {}
        }
    }

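    /// Poll until one job has been processed or the cancellation token is triggered.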
    pub async fn process_one(&mut self) -> Result<()> {
        loop {
            if self.cancellation_token.is_cancelled() {
                return Ok(());
            }

            if let WorkFetcher::NoWorkFound = self.process_one_tick_once().await? {
                continue;
            }

            return Ok(());
        }
    }

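    /// Perform a single fetch/execute cycle, reporting whether a job was processed.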
    pub async fn process_one_tick_once(&mut self) -> Result<WorkFetcher> {
        let work = self.fetch().await?;

        if work.is_none() {
            tokio::task::yield_now().await;
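            // No job arrived within the fetch timeout; the caller decides
            // whether to poll again.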
            return Ok(WorkFetcher::NoWorkFound);
        }
        let mut work = work.expect("polled and found some work");

        let started = std::time::Instant::now();

        info!({
            "status" = "start",
            "class" = &work.job.class,
            "queue" = &work.job.queue,
            "jid" = &work.job.jid
        }, "sidekiq");

        if let Some(worker) = self.workers.get_mut(&work.job.class) {
            self.chain
                .call(&work.job, worker.clone(), self.redis.clone())
                .await?;
        } else {
            error!({
                "status" = "fail",
                "class" = &work.job.class,
                "queue" = &work.job.queue,
                "jid" = &work.job.jid
            }, "!!! Worker not found !!!");
            work.reenqueue(&self.redis).await?;
        }

        info!({
            "elapsed" = format!("{:?}", started.elapsed()),
            "status" = "done",
            "class" = &work.job.class,
            "queue" = &work.job.queue,
            "jid" = &work.job.jid
        }, "sidekiq");

        Ok(WorkFetcher::Done)
    }

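    /// Register a [`Worker`]; jobs whose `class` matches `W::class_name()` are
    /// dispatched to it.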
    pub fn register<
        Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
        W: Worker<Args> + 'static,
    >(
        &mut self,
        worker: W,
    ) {
        self.workers
            .insert(W::class_name(), Arc::new(WorkerRef::wrap(Arc::new(worker))));
    }

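    /// Return a clone of the processor's cancellation token; cancelling it stops
    /// the processing loops and the background routines spawned by [`Processor::run`].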
    pub fn get_cancellation_token(&self) -> CancellationToken {
        self.cancellation_token.clone()
    }

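    /// Track a periodic job on this processor and write its definition to Redis.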
    pub(crate) async fn register_periodic(&mut self, periodic_job: PeriodicJob) -> Result<()> {
        self.periodic_jobs.push(periodic_job.clone());

        let mut conn = self.redis.get().await?;
        periodic_job.insert(&mut conn).await?;

        info!({
            "args" = &periodic_job.args,
            "class" = &periodic_job.class,
            "queue" = &periodic_job.queue,
            "name" = &periodic_job.name,
            "cron" = &periodic_job.cron,
        }, "Inserting periodic job");

        Ok(())
    }

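    /// Run the processor: spawn the shared workers, any per-queue dedicated
    /// workers, the stats publisher, the retry/scheduled poller, and the
    /// periodic job poller, then wait for all of them to finish.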
    pub async fn run(self) {
        let mut join_set: JoinSet<()> = JoinSet::new();

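        // A single worker routine: keep processing jobs until the cancellation
        // token fires. Reused for both the shared workers and the workers
        // dedicated to a single queue.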
        let spawn_worker = |mut processor: Processor,
                            cancellation_token: CancellationToken,
                            num: usize,
                            dedicated_queue_name: Option<String>| {
            async move {
                loop {
                    if let Err(err) = processor.process_one().await {
                        error!("Error leaked out the bottom: {:?}", err);
                    }

                    if cancellation_token.is_cancelled() {
                        break;
                    }
                }

                let dedicated_queue_str = dedicated_queue_name
                    .map(|name| format!(" dedicated to queue '{name}'"))
                    .unwrap_or_default();
                debug!("Broke out of loop for worker {num}{dedicated_queue_str}");
            }
        };

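        // Start the shared worker routines; each one polls every configured queue.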
        for i in 0..self.config.num_workers {
            join_set.spawn(spawn_worker(
                self.clone(),
                self.cancellation_token.clone(),
                i,
                None,
            ));
        }

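        // Start worker routines dedicated to a single queue, as requested via
        // `ProcessorConfig::queue_configs`.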
        for (queue, config) in &self.config.queue_configs {
            for i in 0..config.num_workers {
                join_set.spawn({
                    let mut processor = self.clone();
                    processor.queues = [format!("queue:{queue}")].into();
                    spawn_worker(
                        processor,
                        self.cancellation_token.clone(),
                        i,
                        Some(queue.clone()),
                    )
                });
            }
        }

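        // Periodically publish processor stats to Redis for the Sidekiq web UI.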
        join_set.spawn({
            let redis = self.redis.clone();
            let queues = self.human_readable_queues.clone();
            let busy_jobs = self.busy_jobs.clone();
            let cancellation_token = self.cancellation_token.clone();
            async move {
                let hostname = if let Some(host) = gethostname::gethostname().to_str() {
                    host.to_string()
                } else {
                    "UNKNOWN_HOSTNAME".to_string()
                };

                let stats_publisher =
                    StatsPublisher::new(hostname, queues, busy_jobs, self.config.num_workers);

                loop {
                    select! {
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                        _ = cancellation_token.cancelled() => {
                            break;
                        }
                    }

                    if let Err(err) = stats_publisher.publish_stats(redis.clone()).await {
                        error!("Error publishing processor stats: {:?}", err);
                    }
                }

                debug!("Broke out of loop for web metrics");
            }
        });

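        // Move due jobs from the "retry" and "schedule" sorted sets back onto
        // their queues.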
        join_set.spawn({
            let redis = self.redis.clone();
            let cancellation_token = self.cancellation_token.clone();
            async move {
                let sched = Scheduled::new(redis);
                let sorted_sets = vec!["retry".to_string(), "schedule".to_string()];

                loop {
                    select! {
                        _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {}
                        _ = cancellation_token.cancelled() => {
                            break;
                        }
                    }

                    if let Err(err) = sched.enqueue_jobs(chrono::Utc::now(), &sorted_sets).await {
                        error!("Error in scheduled poller routine: {:?}", err);
                    }
                }

                debug!("Broke out of loop for retry and scheduled");
            }
        });

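        // Enqueue periodic (cron-scheduled) jobs as they come due.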
        join_set.spawn({
            let redis = self.redis.clone();
            let cancellation_token = self.cancellation_token.clone();
            async move {
                let sched = Scheduled::new(redis);

                loop {
                    select! {
                        _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {}
                        _ = cancellation_token.cancelled() => {
                            break;
                        }
                    }

                    if let Err(err) = sched.enqueue_periodic_jobs(chrono::Utc::now()).await {
                        error!("Error in periodic job poller routine: {}", err);
                    }
                }

                debug!("Broke out of loop for periodic");
            }
        });

        while let Some(result) = join_set.join_next().await {
            if let Err(err) = result {
                error!("Processor had a spawned task return an error: {}", err);
            }
        }
    }

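    /// Append a [`ServerMiddleware`] to the server middleware chain; middlewares
    /// wrap the execution of every job.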
    pub async fn using<M>(&mut self, middleware: M)
    where
        M: ServerMiddleware + Send + Sync + 'static,
    {
        self.chain.using(Box::new(middleware)).await;
    }
}