1use std::{future::Future, time::Duration};
2
3use chrono::prelude::*;
4use cron::Schedule;
5use tokio::{spawn, task::JoinHandle};
6use tokio_util::sync::CancellationToken;
7
8use crate::{execute_sleep, LoopState};
9
10pub use sqlx;
11pub mod holder;
12pub type SqlxPool = sqlx::Pool<sqlx::Postgres>;
13
14pub fn make_looper<Fut1, Fut2>(
15 pg_pool: SqlxPool,
16 token: CancellationToken,
17 schedule: Schedule,
18 stop_check_duration: Duration,
19 task_function: impl Fn(DateTime<Utc>, SqlxPool, CancellationToken) -> Fut1 + Send + Sync + 'static,
20 stop_function: impl Fn(SqlxPool) -> Fut2 + Send + Sync + 'static,
21) -> JoinHandle<()>
22where
23 Fut1: Future<Output = LoopState> + Send,
24 Fut2: Future<Output = ()> + Send,
25{
26 spawn(async move {
27 let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
28 Some(next_tick) => next_tick,
29 None => {
30 stop_function(pg_pool.clone()).await;
31 return;
32 }
33 };
34 loop {
35 if token.is_cancelled() {
37 stop_function(pg_pool.clone()).await;
38 break;
39 }
40
41 let now = Utc::now();
42 if now >= next_tick {
43 if let Some(res) = task_function(now, pg_pool.clone(), token.clone())
45 .await
46 .looper(&token, &now, &schedule)
47 {
48 next_tick = res;
49 } else {
50 stop_function(pg_pool.clone()).await;
51 break;
52 }
53 }
54
55 execute_sleep(&stop_check_duration, &next_tick, &now).await;
56 }
57 })
58}
59
60pub fn make_worker<Fut1, Fut2>(
61 pg_pool: SqlxPool,
62 token: CancellationToken,
63 stop_check_duration: Duration,
64 task_function: impl Fn(DateTime<Utc>, SqlxPool, CancellationToken) -> Fut1 + Send + Sync + 'static,
65 stop_function: impl Fn(SqlxPool) -> Fut2 + Send + Sync + 'static,
66) -> JoinHandle<()>
67where
68 Fut1: Future<Output = LoopState> + Send,
69 Fut2: Future<Output = ()> + Send,
70{
71 spawn(async move {
72 let mut next_tick: DateTime<Utc> = Utc::now();
74 loop {
75 if token.is_cancelled() {
77 stop_function(pg_pool.clone()).await;
78 break;
79 }
80
81 let now = Utc::now();
83 if now >= next_tick {
84 if let Some(res) = task_function(now, pg_pool.clone(), token.clone())
86 .await
87 .worker(&token, &now)
88 {
89 next_tick = res;
90 } else {
91 stop_function(pg_pool.clone()).await;
92 break;
93 }
94 }
95
96 execute_sleep(&stop_check_duration, &next_tick, &now).await;
97 }
98 })
99}