1use crate::{periodic::PeriodicJob, RedisPool, UnitOfWork};
2use tracing::debug;
3
4pub struct Scheduled {
5 redis: RedisPool,
6}
7
8impl Scheduled {
9 #[must_use]
10 pub fn new(redis: RedisPool) -> Self {
11 Self { redis }
12 }
13
14 pub async fn enqueue_jobs(
15 &self,
16 now: chrono::DateTime<chrono::Utc>,
17 sorted_sets: &Vec<String>,
18 ) -> Result<usize, Box<dyn std::error::Error>> {
19 let mut n = 0;
20 for sorted_set in sorted_sets {
21 let mut redis = self.redis.get().await?;
22
23 let jobs: Vec<String> = redis
24 .zrangebyscore_limit(sorted_set.clone(), "-inf", now.timestamp(), 0, 10)
25 .await?;
26
27 for job in jobs {
28 if redis.zrem(sorted_set.clone(), job.clone()).await? > 0 {
29 let work = UnitOfWork::from_job_string(job)?;
30
31 debug!({
32 "class" = &work.job.class,
33 "queue" = &work.queue
34 }, "Enqueueing job");
35
36 work.enqueue_direct(&mut redis).await?;
37
38 n += 1;
39 }
40 }
41 }
42
43 Ok(n)
44 }
45
46 pub async fn enqueue_periodic_jobs(
47 &self,
48 now: chrono::DateTime<chrono::Utc>,
49 ) -> Result<usize, Box<dyn std::error::Error>> {
50 let mut conn = self.redis.get().await?;
51
52 let periodic_jobs: Vec<String> = conn
53 .zrangebyscore_limit("periodic".to_string(), "-inf", now.timestamp(), 0, 100)
54 .await?;
55
56 for periodic_job in &periodic_jobs {
57 let pj = PeriodicJob::from_periodic_job_string(periodic_job.clone())?;
58
59 if pj.update(&mut conn, periodic_job).await? {
60 let job = pj.into_job();
61 let work = UnitOfWork::from_job(job);
62
63 debug!({
64 "args" = &pj.args,
65 "class" = &work.job.class,
66 "queue" = &work.queue,
67 "name" = &pj.name,
68 "cron" = &pj.cron,
69 }, "Enqueueing periodic job");
70
71 work.enqueue_direct(&mut conn).await?;
72 }
73 }
74
75 Ok(periodic_jobs.len())
76 }
77}