resident_utils/
postgres.rs1pub use deadpool_postgres;
2use std::{future::Future, time::Duration};
3
4use chrono::prelude::*;
5use cron::Schedule;
6use tokio::{spawn, task::JoinHandle};
7use tokio_util::sync::CancellationToken;
8
9use crate::{execute_sleep, LoopState};
10
11pub mod holder;
12
13pub fn make_looper<Fut1, Fut2>(
14 pg_pool: deadpool_postgres::Pool,
15 token: CancellationToken,
16 schedule: Schedule,
17 stop_check_duration: Duration,
18 task_function: impl Fn(
19 DateTime<Utc>,
20 Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
21 CancellationToken,
22 ) -> Fut1
23 + Send
24 + Sync
25 + 'static,
26 stop_function: impl Fn(Result<deadpool_postgres::Client, deadpool_postgres::PoolError>) -> Fut2
27 + Send
28 + Sync
29 + 'static,
30) -> JoinHandle<()>
31where
32 Fut1: Future<Output = LoopState> + Send,
33 Fut2: Future<Output = ()> + Send,
34{
35 spawn(async move {
36 let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
37 Some(next_tick) => next_tick,
38 None => {
39 stop_function(pg_pool.get().await).await;
40 return;
41 }
42 };
43 loop {
44 if token.is_cancelled() {
46 stop_function(pg_pool.get().await).await;
47 break;
48 }
49
50 let now = Utc::now();
51 if now >= next_tick {
52 if let Some(res) = task_function(now, pg_pool.get().await, token.clone())
54 .await
55 .looper(&token, &now, &schedule)
56 {
57 next_tick = res;
58 } else {
59 stop_function(pg_pool.get().await).await;
60 break;
61 }
62 }
63
64 execute_sleep(&stop_check_duration, &next_tick, &now).await;
65 }
66 })
67}
68
69pub fn make_worker<Fut1, Fut2>(
70 pg_pool: deadpool_postgres::Pool,
71 token: CancellationToken,
72 stop_check_duration: Duration,
73 task_function: impl Fn(
74 DateTime<Utc>,
75 Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
76 CancellationToken,
77 ) -> Fut1
78 + Send
79 + Sync
80 + 'static,
81 stop_function: impl Fn(Result<deadpool_postgres::Client, deadpool_postgres::PoolError>) -> Fut2
82 + Send
83 + Sync
84 + 'static,
85) -> JoinHandle<()>
86where
87 Fut1: Future<Output = LoopState> + Send,
88 Fut2: Future<Output = ()> + Send,
89{
90 spawn(async move {
91 let mut next_tick: DateTime<Utc> = Utc::now();
93 loop {
94 if token.is_cancelled() {
96 stop_function(pg_pool.get().await).await;
97 break;
98 }
99
100 let now = Utc::now();
102 if now >= next_tick {
103 if let Some(res) = task_function(now, pg_pool.get().await, token.clone())
105 .await
106 .worker(&token, &now)
107 {
108 next_tick = res;
109 } else {
110 stop_function(pg_pool.get().await).await;
111 break;
112 }
113 }
114
115 execute_sleep(&stop_check_duration, &next_tick, &now).await;
116 }
117 })
118}