resident_utils/
postgres_redis.rs1pub use deadpool_postgres;
2use std::{future::Future, time::Duration};
3
4use chrono::prelude::*;
5use cron::Schedule;
6use tokio::{spawn, task::JoinHandle};
7use tokio_util::sync::CancellationToken;
8
9use crate::{execute_sleep, LoopState};
10
11pub fn make_looper<Fut1, Fut2>(
12 pg_pool: deadpool_postgres::Pool,
13 redis_pool: deadpool_redis::Pool,
14 token: CancellationToken,
15 schedule: Schedule,
16 stop_check_duration: Duration,
17 task_function: impl Fn(
18 DateTime<Utc>,
19 Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
20 Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
21 CancellationToken,
22 ) -> Fut1
23 + Send
24 + Sync
25 + 'static,
26 stop_function: impl Fn(
27 Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
28 Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
29 ) -> Fut2
30 + Send
31 + Sync
32 + 'static,
33) -> JoinHandle<()>
34where
35 Fut1: Future<Output = LoopState> + Send,
36 Fut2: Future<Output = ()> + Send,
37{
38 spawn(async move {
39 let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
40 Some(next_tick) => next_tick,
41 None => {
42 stop_function(pg_pool.get().await, redis_pool.get().await).await;
43 return;
44 }
45 };
46 loop {
47 if token.is_cancelled() {
49 stop_function(pg_pool.get().await, redis_pool.get().await).await;
50 break;
51 }
52
53 let now = Utc::now();
54 if now >= next_tick {
55 if let Some(res) = task_function(
57 now,
58 pg_pool.get().await,
59 redis_pool.get().await,
60 token.clone(),
61 )
62 .await
63 .looper(&token, &now, &schedule)
64 {
65 next_tick = res;
66 } else {
67 stop_function(pg_pool.get().await, redis_pool.get().await).await;
68 break;
69 }
70 }
71
72 execute_sleep(&stop_check_duration, &next_tick, &now).await;
73 }
74 })
75}
76
77pub fn make_worker<Fut1, Fut2>(
78 pg_pool: deadpool_postgres::Pool,
79 redis_pool: deadpool_redis::Pool,
80 token: CancellationToken,
81 stop_check_duration: Duration,
82 task_function: impl Fn(
83 DateTime<Utc>,
84 Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
85 Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
86 CancellationToken,
87 ) -> Fut1
88 + Send
89 + Sync
90 + 'static,
91 stop_function: impl Fn(
92 Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
93 Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
94 ) -> Fut2
95 + Send
96 + Sync
97 + 'static,
98) -> JoinHandle<()>
99where
100 Fut1: Future<Output = LoopState> + Send,
101 Fut2: Future<Output = ()> + Send,
102{
103 spawn(async move {
104 let mut next_tick: DateTime<Utc> = Utc::now();
106 loop {
107 if token.is_cancelled() {
109 stop_function(pg_pool.get().await, redis_pool.get().await).await;
110 break;
111 }
112
113 let now = Utc::now();
115 if now >= next_tick {
116 if let Some(res) = task_function(
118 now,
119 pg_pool.get().await,
120 redis_pool.get().await,
121 token.clone(),
122 )
123 .await
124 .worker(&token, &now)
125 {
126 next_tick = res;
127 } else {
128 stop_function(pg_pool.get().await, redis_pool.get().await).await;
129 break;
130 }
131 }
132
133 execute_sleep(&stop_check_duration, &next_tick, &now).await;
134 }
135 })
136}