yq_scheduler/
lib.rs

1use redis::aio::ConnectionManager;
2use redis::Client;
3use yq::{DequeueAtAction, DequeueAtStatus, Queue, YqError, YqResult};
4
5pub struct Scheduler {
6    connection_manager: ConnectionManager,
7    dequeue_at_action: DequeueAtAction,
8}
9
10impl Scheduler {
11    pub async fn new(redis_url: &str) -> YqResult<Self> {
12        let client = Client::open(redis_url).map_err(YqError::CreateRedisClient)?;
13        let connection_manager = client
14            .get_tokio_connection_manager()
15            .await
16            .map_err(YqError::GetRedisConn)?;
17
18        let queue = Queue::default();
19
20        Ok(Self {
21            connection_manager,
22            dequeue_at_action: DequeueAtAction::new(queue),
23        })
24    }
25
26    pub async fn run(mut self) -> YqResult<()> {
27        loop {
28            if let Err(err) = self.dequeue_loop().await {
29                tracing::error!("dequeue_at_loop ERROR: {err:?}");
30                tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
31            }
32        }
33    }
34
35    async fn dequeue_loop(&mut self) -> YqResult<()> {
36        loop {
37            let now = time::OffsetDateTime::now_utc();
38
39            let dequeue_at_status: DequeueAtStatus = self
40                .dequeue_at_action
41                .prepare_invoke(now.unix_timestamp())
42                .invoke_async(&mut self.connection_manager)
43                .await
44                .map_err(YqError::DequeueAt)?;
45
46            match dequeue_at_status {
47                DequeueAtStatus::Dequeued(count) => {
48                    tracing::trace!("dequeued {count} jobs");
49                }
50                DequeueAtStatus::NoJob => {
51                    tracing::trace!("dequeued no jobs");
52                    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
53                }
54                DequeueAtStatus::Unknown(err) => {
55                    tracing::error!("dequeued ERROR: {err}");
56                    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
57                }
58            }
59        }
60    }
61}