1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
use crate::{periodic::PeriodicJob, RedisPool, UnitOfWork};
use slog::debug;

pub struct Scheduled {
    redis: RedisPool,
    logger: slog::Logger,
}

impl Scheduled {
    pub fn new(redis: RedisPool, logger: slog::Logger) -> Self {
        Self { redis, logger }
    }

    pub async fn enqueue_jobs(
        &self,
        now: chrono::DateTime<chrono::Utc>,
        sorted_sets: &Vec<String>,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        let mut n = 0;
        for sorted_set in sorted_sets {
            let mut redis = self.redis.get().await?;

            let jobs: Vec<String> = redis
                .zrangebyscore_limit(sorted_set.clone(), "-inf", now.timestamp(), 0, 100)
                .await?;

            n += jobs.len();

            for job in jobs {
                if redis.zrem(sorted_set.clone(), job.clone()).await? {
                    let work = UnitOfWork::from_job_string(job)?;

                    debug!(self.logger, "Enqueueing job";
                        "class" => &work.job.class,
                        "queue" => &work.queue
                    );

                    work.enqueue_direct(&mut redis).await?;
                }
            }
        }

        Ok(n)
    }

    pub async fn enqueue_periodic_jobs(
        &self,
        now: chrono::DateTime<chrono::Utc>,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        let mut conn = self.redis.get().await?;

        let periodic_jobs: Vec<String> = conn
            .zrangebyscore_limit("periodic".to_string(), "-inf", now.timestamp(), 0, 100)
            .await?;

        for periodic_job in &periodic_jobs {
            let pj = PeriodicJob::from_periodic_job_string(periodic_job.clone())?;

            if pj.update(&mut conn, periodic_job).await? {
                let job = pj.into_job();
                let work = UnitOfWork::from_job(job);

                debug!(self.logger, "Enqueueing periodic job";
                    "args" => &pj.args,
                    "class" => &work.job.class,
                    "queue" => &work.queue,
                    "name" => &pj.name,
                    "cron" => &pj.cron,
                );

                work.enqueue_direct(&mut conn).await?;
            }
        }

        Ok(periodic_jobs.len())
    }
}