resident_utils/
sqlx_redis.rs1use std::{future::Future, time::Duration};
2
3use chrono::prelude::*;
4use cron::Schedule;
5use tokio::{spawn, task::JoinHandle};
6use tokio_util::sync::CancellationToken;
7
8use crate::{execute_sleep, sqlx::SqlxPool, LoopState};
9
10pub fn make_looper<Fut1, Fut2>(
11 pg_pool: SqlxPool,
12 redis_pool: deadpool_redis::Pool,
13 token: CancellationToken,
14 schedule: Schedule,
15 stop_check_duration: Duration,
16 task_function: impl Fn(
17 DateTime<Utc>,
18 SqlxPool,
19 Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
20 CancellationToken,
21 ) -> Fut1
22 + Send
23 + Sync
24 + 'static,
25 stop_function: impl Fn(SqlxPool, Result<deadpool_redis::Connection, deadpool_redis::PoolError>) -> Fut2
26 + Send
27 + Sync
28 + 'static,
29) -> JoinHandle<()>
30where
31 Fut1: Future<Output = LoopState> + Send,
32 Fut2: Future<Output = ()> + Send,
33{
34 spawn(async move {
35 let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
36 Some(next_tick) => next_tick,
37 None => {
38 stop_function(pg_pool.clone(), redis_pool.get().await).await;
39 return;
40 }
41 };
42 loop {
43 if token.is_cancelled() {
45 stop_function(pg_pool.clone(), redis_pool.get().await).await;
46 break;
47 }
48
49 let now = Utc::now();
50 if now >= next_tick {
51 if let Some(res) =
53 task_function(now, pg_pool.clone(), redis_pool.get().await, token.clone())
54 .await
55 .looper(&token, &now, &schedule)
56 {
57 next_tick = res;
58 } else {
59 stop_function(pg_pool.clone(), redis_pool.get().await).await;
60 break;
61 }
62 }
63
64 execute_sleep(&stop_check_duration, &next_tick, &now).await;
65 }
66 })
67}
68
69pub fn make_worker<Fut1, Fut2>(
70 pg_pool: SqlxPool,
71 redis_pool: deadpool_redis::Pool,
72 token: CancellationToken,
73 stop_check_duration: Duration,
74 task_function: impl Fn(
75 DateTime<Utc>,
76 SqlxPool,
77 Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
78 CancellationToken,
79 ) -> Fut1
80 + Send
81 + Sync
82 + 'static,
83 stop_function: impl Fn(SqlxPool, Result<deadpool_redis::Connection, deadpool_redis::PoolError>) -> Fut2
84 + Send
85 + Sync
86 + 'static,
87) -> JoinHandle<()>
88where
89 Fut1: Future<Output = LoopState> + Send,
90 Fut2: Future<Output = ()> + Send,
91{
92 spawn(async move {
93 let mut next_tick: DateTime<Utc> = Utc::now();
95 loop {
96 if token.is_cancelled() {
98 stop_function(pg_pool.clone(), redis_pool.get().await).await;
99 break;
100 }
101
102 let now = Utc::now();
104 if now >= next_tick {
105 if let Some(res) =
107 task_function(now, pg_pool.clone(), redis_pool.get().await, token.clone())
108 .await
109 .worker(&token, &now)
110 {
111 next_tick = res;
112 } else {
113 stop_function(pg_pool.clone(), redis_pool.get().await).await;
114 break;
115 }
116 }
117
118 execute_sleep(&stop_check_duration, &next_tick, &now).await;
119 }
120 })
121}