sidekiq/
scheduled.rs

1use crate::{periodic::PeriodicJob, RedisPool, UnitOfWork};
2use tracing::debug;
3
4pub struct Scheduled {
5    redis: RedisPool,
6}
7
8impl Scheduled {
9    #[must_use]
10    pub fn new(redis: RedisPool) -> Self {
11        Self { redis }
12    }
13
14    pub async fn enqueue_jobs(
15        &self,
16        now: chrono::DateTime<chrono::Utc>,
17        sorted_sets: &Vec<String>,
18    ) -> Result<usize, Box<dyn std::error::Error>> {
19        let mut n = 0;
20        for sorted_set in sorted_sets {
21            let mut redis = self.redis.get().await?;
22
23            let jobs: Vec<String> = redis
24                .zrangebyscore_limit(sorted_set.clone(), "-inf", now.timestamp(), 0, 10)
25                .await?;
26
27            for job in jobs {
28                if redis.zrem(sorted_set.clone(), job.clone()).await? > 0 {
29                    let work = UnitOfWork::from_job_string(job)?;
30
31                    debug!({
32                        "class" = &work.job.class,
33                        "queue" = &work.queue
34                    },  "Enqueueing job");
35
36                    work.enqueue_direct(&mut redis).await?;
37
38                    n += 1;
39                }
40            }
41        }
42
43        Ok(n)
44    }
45
46    pub async fn enqueue_periodic_jobs(
47        &self,
48        now: chrono::DateTime<chrono::Utc>,
49    ) -> Result<usize, Box<dyn std::error::Error>> {
50        let mut conn = self.redis.get().await?;
51
52        let periodic_jobs: Vec<String> = conn
53            .zrangebyscore_limit("periodic".to_string(), "-inf", now.timestamp(), 0, 100)
54            .await?;
55
56        for periodic_job in &periodic_jobs {
57            let pj = PeriodicJob::from_periodic_job_string(periodic_job.clone())?;
58
59            if pj.update(&mut conn, periodic_job).await? {
60                let job = pj.into_job();
61                let work = UnitOfWork::from_job(job);
62
63                debug!({
64                    "args" = &pj.args,
65                    "class" = &work.job.class,
66                    "queue" = &work.queue,
67                    "name" = &pj.name,
68                    "cron" = &pj.cron,
69                }, "Enqueueing periodic job");
70
71                work.enqueue_direct(&mut conn).await?;
72            }
73        }
74
75        Ok(periodic_jobs.len())
76    }
77}