resident_utils/
sqlx.rs

1use std::{future::Future, time::Duration};
2
3use chrono::prelude::*;
4use cron::Schedule;
5use tokio::{spawn, task::JoinHandle};
6use tokio_util::sync::CancellationToken;
7
8use crate::{execute_sleep, LoopState};
9
10pub use sqlx;
11pub mod holder;
/// Connection pool type used throughout this module: an `sqlx` pool bound to the Postgres driver.
pub type SqlxPool = sqlx::Pool<sqlx::Postgres>;
13
14pub fn make_looper<Fut1, Fut2>(
15    pg_pool: SqlxPool,
16    token: CancellationToken,
17    schedule: Schedule,
18    stop_check_duration: Duration,
19    task_function: impl Fn(DateTime<Utc>, SqlxPool, CancellationToken) -> Fut1 + Send + Sync + 'static,
20    stop_function: impl Fn(SqlxPool) -> Fut2 + Send + Sync + 'static,
21) -> JoinHandle<()>
22where
23    Fut1: Future<Output = LoopState> + Send,
24    Fut2: Future<Output = ()> + Send,
25{
26    spawn(async move {
27        let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
28            Some(next_tick) => next_tick,
29            None => {
30                stop_function(pg_pool.clone()).await;
31                return;
32            }
33        };
34        loop {
35            // グレースフルストップのチェック
36            if token.is_cancelled() {
37                stop_function(pg_pool.clone()).await;
38                break;
39            }
40
41            let now = Utc::now();
42            if now >= next_tick {
43                // 定期的に行う処理実行
44                if let Some(res) = task_function(now, pg_pool.clone(), token.clone())
45                    .await
46                    .looper(&token, &now, &schedule)
47                {
48                    next_tick = res;
49                } else {
50                    stop_function(pg_pool.clone()).await;
51                    break;
52                }
53            }
54
55            execute_sleep(&stop_check_duration, &next_tick, &now).await;
56        }
57    })
58}
59
60pub fn make_worker<Fut1, Fut2>(
61    pg_pool: SqlxPool,
62    token: CancellationToken,
63    stop_check_duration: Duration,
64    task_function: impl Fn(DateTime<Utc>, SqlxPool, CancellationToken) -> Fut1 + Send + Sync + 'static,
65    stop_function: impl Fn(SqlxPool) -> Fut2 + Send + Sync + 'static,
66) -> JoinHandle<()>
67where
68    Fut1: Future<Output = LoopState> + Send,
69    Fut2: Future<Output = ()> + Send,
70{
71    spawn(async move {
72        // 動き出した瞬間は実行する
73        let mut next_tick: DateTime<Utc> = Utc::now();
74        loop {
75            // グレースフルストップのチェック
76            if token.is_cancelled() {
77                stop_function(pg_pool.clone()).await;
78                break;
79            }
80
81            // 現在時間と次実行する処理の時間をチェックする
82            let now = Utc::now();
83            if now >= next_tick {
84                // 定期的に行う処理実行
85                if let Some(res) = task_function(now, pg_pool.clone(), token.clone())
86                    .await
87                    .worker(&token, &now)
88                {
89                    next_tick = res;
90                } else {
91                    stop_function(pg_pool.clone()).await;
92                    break;
93                }
94            }
95
96            execute_sleep(&stop_check_duration, &next_tick, &now).await;
97        }
98    })
99}