1#[cfg(feature = "postgres")]
2pub mod postgres;
3
4#[cfg(feature = "redis")]
5pub mod redis;
6
7#[cfg(feature = "sqlx")]
8pub mod sqlx;
9
10#[cfg(all(feature = "postgres", feature = "redis"))]
11pub mod postgres_redis;
12
13#[cfg(all(feature = "sqlx", feature = "redis"))]
14pub mod sqlx_redis;
15
16pub mod retry;
17
18use chrono::prelude::*;
19pub use cron::Schedule;
20use std::{future::Future, time::Duration};
21use tokio::{signal::ctrl_c, spawn, task::JoinHandle, time::sleep};
22pub use tokio_util::sync::CancellationToken;
23use tracing::debug;
24
25pub enum LoopState {
33 AllTerminate,
34 Continue,
35 Terminate,
36 Duration(Duration),
37}
38
39impl LoopState {
40 pub(crate) fn looper(
41 &self,
42 token: &CancellationToken,
43 now: &DateTime<Utc>,
44 schedule: &Schedule,
45 ) -> Option<DateTime<Utc>> {
46 match self {
47 LoopState::AllTerminate => {
48 token.cancel();
49 None
50 }
51 LoopState::Terminate => None,
52 LoopState::Duration(duration) => {
53 Some(*now + *duration)
55 }
56 LoopState::Continue => {
57 schedule.upcoming(Utc).next()
59 }
60 }
61 }
62 pub(crate) fn worker(
63 &self,
64 token: &CancellationToken,
65 now: &DateTime<Utc>,
66 ) -> Option<DateTime<Utc>> {
67 match self {
68 LoopState::AllTerminate => {
69 token.cancel();
70 None
71 }
72 LoopState::Terminate => None,
73 LoopState::Duration(duration) => {
74 Some(*now + *duration)
76 }
77 LoopState::Continue => {
78 Some(*now)
80 }
81 }
82 }
83}
84
85#[allow(dead_code)]
87pub(crate) async fn execute_sleep(
88 stop_check_duration: &Duration,
89 next_tick: &DateTime<Utc>,
90 now: &DateTime<Utc>,
91) {
92 if now >= next_tick {
94 return;
95 }
96
97 let tick_duration = Duration::from_secs((*next_tick - *now).num_seconds() as u64);
99 let duration = if stop_check_duration < &tick_duration {
100 stop_check_duration
101 } else {
102 &tick_duration
103 };
104 sleep(*duration).await;
105}
106
107pub fn ctrl_c_handler() -> (JoinHandle<()>, CancellationToken) {
108 let token = CancellationToken::new();
109 let cloned_token = token.clone();
110 (
111 spawn(async move {
112 if let Err(err) = ctrl_c().await {
113 debug!(error = ?err, "ctrl-c error");
114 } else {
115 debug!("received ctrl-c");
116 }
117 cloned_token.cancel();
118 }),
119 token,
120 )
121}
122
123pub fn make_looper<Fut1, Fut2>(
124 token: CancellationToken,
125 schedule: Schedule,
126 stop_check_duration: Duration,
127 task_function: impl Fn(DateTime<Utc>) -> Fut1 + Send + Sync + 'static,
128 stop_function: impl Fn() -> Fut2 + Send + Sync + 'static,
129) -> JoinHandle<()>
130where
131 Fut1: Future<Output = LoopState> + Send,
132 Fut2: Future<Output = ()> + Send,
133{
134 spawn(async move {
135 let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
136 Some(next_tick) => next_tick,
137 None => {
138 stop_function().await;
139 return;
140 }
141 };
142 loop {
143 if token.is_cancelled() {
145 stop_function().await;
146 break;
147 }
148
149 let now = Utc::now();
150 if now >= next_tick {
151 if let Some(res) = task_function(now).await.looper(&token, &now, &schedule) {
153 next_tick = res;
154 } else {
155 stop_function().await;
156 break;
157 }
158 }
159
160 execute_sleep(&stop_check_duration, &next_tick, &now).await;
161 }
162 })
163}
164
165pub fn make_worker<Fut1, Fut2>(
166 token: CancellationToken,
167 stop_check_duration: Duration,
168 task_function: impl Fn(DateTime<Utc>) -> Fut1 + Send + Sync + 'static,
169 stop_function: impl Fn() -> Fut2 + Send + Sync + 'static,
170) -> JoinHandle<()>
171where
172 Fut1: Future<Output = LoopState> + Send,
173 Fut2: Future<Output = ()> + Send,
174{
175 spawn(async move {
176 let mut next_tick: DateTime<Utc> = Utc::now();
178 loop {
179 if token.is_cancelled() {
181 stop_function().await;
182 break;
183 }
184
185 let now = Utc::now();
187 if now >= next_tick {
188 if let Some(res) = task_function(now).await.worker(&token, &now) {
190 next_tick = res;
191 } else {
192 stop_function().await;
193 break;
194 }
195 }
196
197 execute_sleep(&stop_check_duration, &next_tick, &now).await;
198 }
199 })
200}