1use redis::aio::ConnectionManager;
2use redis::Client;
3use yq::{DequeueAtAction, DequeueAtStatus, Queue, YqError, YqResult};
4
5pub struct Scheduler {
6 connection_manager: ConnectionManager,
7 dequeue_at_action: DequeueAtAction,
8}
9
10impl Scheduler {
11 pub async fn new(redis_url: &str) -> YqResult<Self> {
12 let client = Client::open(redis_url).map_err(YqError::CreateRedisClient)?;
13 let connection_manager = client
14 .get_tokio_connection_manager()
15 .await
16 .map_err(YqError::GetRedisConn)?;
17
18 let queue = Queue::default();
19
20 Ok(Self {
21 connection_manager,
22 dequeue_at_action: DequeueAtAction::new(queue),
23 })
24 }
25
26 pub async fn run(mut self) -> YqResult<()> {
27 loop {
28 if let Err(err) = self.dequeue_loop().await {
29 tracing::error!("dequeue_at_loop ERROR: {err:?}");
30 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
31 }
32 }
33 }
34
35 async fn dequeue_loop(&mut self) -> YqResult<()> {
36 loop {
37 let now = time::OffsetDateTime::now_utc();
38
39 let dequeue_at_status: DequeueAtStatus = self
40 .dequeue_at_action
41 .prepare_invoke(now.unix_timestamp())
42 .invoke_async(&mut self.connection_manager)
43 .await
44 .map_err(YqError::DequeueAt)?;
45
46 match dequeue_at_status {
47 DequeueAtStatus::Dequeued(count) => {
48 tracing::trace!("dequeued {count} jobs");
49 }
50 DequeueAtStatus::NoJob => {
51 tracing::trace!("dequeued no jobs");
52 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
53 }
54 DequeueAtStatus::Unknown(err) => {
55 tracing::error!("dequeued ERROR: {err}");
56 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
57 }
58 }
59 }
60 }
61}