resident_utils/
lib.rs

1#[cfg(feature = "postgres")]
2pub mod postgres;
3
4#[cfg(feature = "redis")]
5pub mod redis;
6
7#[cfg(feature = "sqlx")]
8pub mod sqlx;
9
10#[cfg(all(feature = "postgres", feature = "redis"))]
11pub mod postgres_redis;
12
13#[cfg(all(feature = "sqlx", feature = "redis"))]
14pub mod sqlx_redis;
15
16pub mod retry;
17
18use chrono::prelude::*;
19pub use cron::Schedule;
20use std::{future::Future, time::Duration};
21use tokio::{signal::ctrl_c, spawn, task::JoinHandle, time::sleep};
22pub use tokio_util::sync::CancellationToken;
23use tracing::debug;
24
/// Outcome returned by a task, telling the surrounding loop what to do next.
///
/// How each variant is interpreted (see `LoopState::looper` and
/// `LoopState::worker`):
///
/// * `AllTerminate` - cancels the shared `CancellationToken` and ends this loop.
/// * `Continue` - keeps looping: a looper waits for the next scheduled time,
///   a worker runs again immediately.
/// * `Terminate` - ends this loop only; the token is left untouched.
/// * `Duration(duration)` - schedules the next run `duration` after the
///   timestamp that was handed to the task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoopState {
    AllTerminate,
    Continue,
    Terminate,
    Duration(Duration),
}
38
39impl LoopState {
40    pub(crate) fn looper(
41        &self,
42        token: &CancellationToken,
43        now: &DateTime<Utc>,
44        schedule: &Schedule,
45    ) -> Option<DateTime<Utc>> {
46        match self {
47            LoopState::AllTerminate => {
48                token.cancel();
49                None
50            }
51            LoopState::Terminate => None,
52            LoopState::Duration(duration) => {
53                // 指定時間待つ
54                Some(*now + *duration)
55            }
56            LoopState::Continue => {
57                // 次の時間取得
58                schedule.upcoming(Utc).next()
59            }
60        }
61    }
62    pub(crate) fn worker(
63        &self,
64        token: &CancellationToken,
65        now: &DateTime<Utc>,
66    ) -> Option<DateTime<Utc>> {
67        match self {
68            LoopState::AllTerminate => {
69                token.cancel();
70                None
71            }
72            LoopState::Terminate => None,
73            LoopState::Duration(duration) => {
74                // 指定時間待つ
75                Some(*now + *duration)
76            }
77            LoopState::Continue => {
78                // 即時処理を行う
79                Some(*now)
80            }
81        }
82    }
83}
84
85// 次の処理までスリープする
86#[allow(dead_code)]
87pub(crate) async fn execute_sleep(
88    stop_check_duration: &Duration,
89    next_tick: &DateTime<Utc>,
90    now: &DateTime<Utc>,
91) {
92    // next_tickが過去ならsleepせずに終了
93    if now >= next_tick {
94        return;
95    }
96
97    // 上記でチェックしているので、as u64で問題無い。
98    let tick_duration = Duration::from_secs((*next_tick - *now).num_seconds() as u64);
99    let duration = if stop_check_duration < &tick_duration {
100        stop_check_duration
101    } else {
102        &tick_duration
103    };
104    sleep(*duration).await;
105}
106
107pub fn ctrl_c_handler() -> (JoinHandle<()>, CancellationToken) {
108    let token = CancellationToken::new();
109    let cloned_token = token.clone();
110    (
111        spawn(async move {
112            if let Err(err) = ctrl_c().await {
113                debug!(error = ?err, "ctrl-c error");
114            } else {
115                debug!("received ctrl-c");
116            }
117            cloned_token.cancel();
118        }),
119        token,
120    )
121}
122
123pub fn make_looper<Fut1, Fut2>(
124    token: CancellationToken,
125    schedule: Schedule,
126    stop_check_duration: Duration,
127    task_function: impl Fn(DateTime<Utc>) -> Fut1 + Send + Sync + 'static,
128    stop_function: impl Fn() -> Fut2 + Send + Sync + 'static,
129) -> JoinHandle<()>
130where
131    Fut1: Future<Output = LoopState> + Send,
132    Fut2: Future<Output = ()> + Send,
133{
134    spawn(async move {
135        let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
136            Some(next_tick) => next_tick,
137            None => {
138                stop_function().await;
139                return;
140            }
141        };
142        loop {
143            // グレースフルストップのチェック
144            if token.is_cancelled() {
145                stop_function().await;
146                break;
147            }
148
149            let now = Utc::now();
150            if now >= next_tick {
151                // 定期的に行う処理実行
152                if let Some(res) = task_function(now).await.looper(&token, &now, &schedule) {
153                    next_tick = res;
154                } else {
155                    stop_function().await;
156                    break;
157                }
158            }
159
160            execute_sleep(&stop_check_duration, &next_tick, &now).await;
161        }
162    })
163}
164
165pub fn make_worker<Fut1, Fut2>(
166    token: CancellationToken,
167    stop_check_duration: Duration,
168    task_function: impl Fn(DateTime<Utc>) -> Fut1 + Send + Sync + 'static,
169    stop_function: impl Fn() -> Fut2 + Send + Sync + 'static,
170) -> JoinHandle<()>
171where
172    Fut1: Future<Output = LoopState> + Send,
173    Fut2: Future<Output = ()> + Send,
174{
175    spawn(async move {
176        // 動き出した瞬間は実行する
177        let mut next_tick: DateTime<Utc> = Utc::now();
178        loop {
179            // グレースフルストップのチェック
180            if token.is_cancelled() {
181                stop_function().await;
182                break;
183            }
184
185            // 現在時間と次実行する処理の時間をチェックする
186            let now = Utc::now();
187            if now >= next_tick {
188                // 定期的に行う処理実行
189                if let Some(res) = task_function(now).await.worker(&token, &now) {
190                    next_tick = res;
191                } else {
192                    stop_function().await;
193                    break;
194                }
195            }
196
197            execute_sleep(&stop_check_duration, &next_tick, &now).await;
198        }
199    })
200}