#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "redis")]
pub mod redis;
#[cfg(feature = "sqlx")]
pub mod sqlx;
#[cfg(all(feature = "postgres", feature = "redis"))]
pub mod postgres_redis;
#[cfg(all(feature = "sqlx", feature = "redis"))]
pub mod sqlx_redis;
pub mod retry;
use chrono::prelude::*;
pub use cron::Schedule;
use std::{future::Future, time::Duration};
use tokio::{signal::ctrl_c, spawn, task::JoinHandle, time::sleep};
pub use tokio_util::sync::CancellationToken;
use tracing::debug;
/// Outcome reported by a task function, telling the surrounding loop what to do next.
///
/// The value is interpreted by `LoopState::looper` (cron-scheduled loops) and
/// `LoopState::worker` (free-running workers).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoopState {
    /// Cancel the shared [`CancellationToken`] and stop this loop.
    AllTerminate,
    /// Keep going: a scheduled loop waits for the next upcoming time of its
    /// schedule, a worker becomes due again right away.
    Continue,
    /// Stop this loop only; the shared token is left untouched.
    Terminate,
    /// Run again once the given duration has elapsed, measured from the
    /// timestamp of the current run.
    Duration(Duration),
}
impl LoopState {
    /// Translates this state into the next tick of a cron-scheduled loop.
    ///
    /// * `Continue` yields the next upcoming time of `schedule` (which may be `None`).
    /// * `Duration(d)` yields `now + d`.
    /// * `Terminate` yields `None`.
    /// * `AllTerminate` cancels `token` and yields `None`.
    ///
    /// `None` tells the caller to stop looping.
    pub(crate) fn looper(
        &self,
        token: &CancellationToken,
        now: &DateTime<Utc>,
        schedule: &Schedule,
    ) -> Option<DateTime<Utc>> {
        match *self {
            Self::Continue => schedule.upcoming(Utc).next(),
            Self::Duration(delay) => Some(*now + delay),
            Self::Terminate => None,
            Self::AllTerminate => {
                // Propagate the shutdown to every holder of the token.
                token.cancel();
                None
            }
        }
    }

    /// Translates this state into the next tick of a free-running worker.
    ///
    /// * `Continue` yields `now`, i.e. the worker is due again immediately.
    /// * `Duration(d)` yields `now + d`.
    /// * `Terminate` yields `None`.
    /// * `AllTerminate` cancels `token` and yields `None`.
    ///
    /// `None` tells the caller to stop looping.
    pub(crate) fn worker(
        &self,
        token: &CancellationToken,
        now: &DateTime<Utc>,
    ) -> Option<DateTime<Utc>> {
        match *self {
            Self::Continue => Some(*now),
            Self::Duration(delay) => Some(*now + delay),
            Self::Terminate => None,
            Self::AllTerminate => {
                // Propagate the shutdown to every holder of the token.
                token.cancel();
                None
            }
        }
    }
}
/// Sleeps until `next_tick`, but never longer than `stop_check_duration`.
///
/// Returns immediately when `now` is already at or past `next_tick`. Otherwise
/// the sleep lasts `min(stop_check_duration, next_tick - now)`, so callers that
/// re-check their cancellation token after each call do so at least every
/// `stop_check_duration`.
///
/// The remaining time keeps its sub-second part. Truncating it to whole seconds
/// would turn any wait shorter than one second into a zero-length sleep and make
/// the calling loop spin until the tick is reached.
// NOTE(review): `dead_code` is presumably allowed because some feature
// combinations of this crate do not call this helper — confirm.
#[allow(dead_code)]
pub(crate) async fn execute_sleep(
    stop_check_duration: &Duration,
    next_tick: &DateTime<Utc>,
    now: &DateTime<Utc>,
) {
    if now >= next_tick {
        return;
    }
    // `to_std` only fails for a negative delta, which the guard above rules out;
    // fall back to "no wait" rather than panicking.
    let until_tick = (*next_tick - *now).to_std().unwrap_or(Duration::ZERO);
    sleep(until_tick.min(*stop_check_duration)).await;
}
pub fn ctrl_c_handler() -> (JoinHandle<()>, CancellationToken) {
let token = CancellationToken::new();
let cloned_token = token.clone();
(
spawn(async move {
if let Err(err) = ctrl_c().await {
debug!(error = ?err, "ctrl-c error");
} else {
debug!("received ctrl-c");
}
cloned_token.cancel();
}),
token,
)
}
/// Spawns a task that runs `task_function` according to a cron `schedule`.
///
/// * `token` – checked at the top of every iteration; once cancelled the loop ends.
/// * `schedule` – supplies the first tick and, after a [`LoopState::Continue`],
///   every following tick.
/// * `stop_check_duration` – upper bound for a single sleep, i.e. how often the
///   token is re-checked while waiting for the next tick.
/// * `task_function` – called with the current time whenever a tick is due; the
///   returned [`LoopState`] decides the next tick or ends the loop.
/// * `stop_function` – awaited exactly once right before the spawned task
///   finishes, whatever the reason (no upcoming tick, cancellation, or a
///   terminating [`LoopState`]).
///
/// Returns the [`JoinHandle`] of the spawned task.
pub fn make_looper<Fut1, Fut2>(
    token: CancellationToken,
    schedule: Schedule,
    stop_check_duration: Duration,
    task_function: impl Fn(DateTime<Utc>) -> Fut1 + Send + Sync + 'static,
    stop_function: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
    Fut1: Future<Output = LoopState> + Send,
    Fut2: Future<Output = ()> + Send,
{
    spawn(async move {
        // A schedule without any upcoming time skips the loop entirely.
        if let Some(first_tick) = schedule.upcoming(Utc).next() {
            let mut next_tick: DateTime<Utc> = first_tick;
            while !token.is_cancelled() {
                let now = Utc::now();
                if now >= next_tick {
                    match task_function(now).await.looper(&token, &now, &schedule) {
                        Some(tick) => next_tick = tick,
                        None => break,
                    }
                }
                // Re-read the clock: the task may have run for a while, and
                // sleeping relative to the pre-task timestamp would overshoot
                // `next_tick` by up to the task's run time.
                let after_task = Utc::now();
                execute_sleep(&stop_check_duration, &next_tick, &after_task).await;
            }
        }
        stop_function().await;
    })
}
/// Spawns a task that runs `task_function` repeatedly without a cron schedule.
///
/// The first run is due immediately. After each run the returned [`LoopState`]
/// decides when the next one happens: right away ([`LoopState::Continue`]),
/// after a delay ([`LoopState::Duration`]), or never (the terminating variants).
///
/// * `token` – checked at the top of every iteration; once cancelled the loop ends.
/// * `stop_check_duration` – upper bound for a single sleep, i.e. how often the
///   token is re-checked while waiting for the next run.
/// * `task_function` – called with the current time whenever a run is due.
/// * `stop_function` – awaited exactly once right before the spawned task
///   finishes, whether it ends by cancellation or by a terminating [`LoopState`].
///
/// Returns the [`JoinHandle`] of the spawned task.
pub fn make_worker<Fut1, Fut2>(
    token: CancellationToken,
    stop_check_duration: Duration,
    task_function: impl Fn(DateTime<Utc>) -> Fut1 + Send + Sync + 'static,
    stop_function: impl Fn() -> Fut2 + Send + Sync + 'static,
) -> JoinHandle<()>
where
    Fut1: Future<Output = LoopState> + Send,
    Fut2: Future<Output = ()> + Send,
{
    spawn(async move {
        let mut next_tick: DateTime<Utc> = Utc::now();
        while !token.is_cancelled() {
            let now = Utc::now();
            if now >= next_tick {
                match task_function(now).await.worker(&token, &now) {
                    Some(tick) => next_tick = tick,
                    None => break,
                }
            }
            // Re-read the clock: the task may have run for a while, and
            // sleeping relative to the pre-task timestamp would overshoot
            // `next_tick` by up to the task's run time.
            let after_task = Utc::now();
            execute_sleep(&stop_check_duration, &next_tick, &after_task).await;
        }
        stop_function().await;
    })
}