resident_utils/
postgres.rs

1pub use deadpool_postgres;
2use std::{future::Future, time::Duration};
3
4use chrono::prelude::*;
5use cron::Schedule;
6use tokio::{spawn, task::JoinHandle};
7use tokio_util::sync::CancellationToken;
8
9use crate::{execute_sleep, LoopState};
10
11pub mod holder;
12
13pub fn make_looper<Fut1, Fut2>(
14    pg_pool: deadpool_postgres::Pool,
15    token: CancellationToken,
16    schedule: Schedule,
17    stop_check_duration: Duration,
18    task_function: impl Fn(
19            DateTime<Utc>,
20            Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
21            CancellationToken,
22        ) -> Fut1
23        + Send
24        + Sync
25        + 'static,
26    stop_function: impl Fn(Result<deadpool_postgres::Client, deadpool_postgres::PoolError>) -> Fut2
27        + Send
28        + Sync
29        + 'static,
30) -> JoinHandle<()>
31where
32    Fut1: Future<Output = LoopState> + Send,
33    Fut2: Future<Output = ()> + Send,
34{
35    spawn(async move {
36        let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
37            Some(next_tick) => next_tick,
38            None => {
39                stop_function(pg_pool.get().await).await;
40                return;
41            }
42        };
43        loop {
44            // グレースフルストップのチェック
45            if token.is_cancelled() {
46                stop_function(pg_pool.get().await).await;
47                break;
48            }
49
50            let now = Utc::now();
51            if now >= next_tick {
52                // 定期的に行う処理実行
53                if let Some(res) = task_function(now, pg_pool.get().await, token.clone())
54                    .await
55                    .looper(&token, &now, &schedule)
56                {
57                    next_tick = res;
58                } else {
59                    stop_function(pg_pool.get().await).await;
60                    break;
61                }
62            }
63
64            execute_sleep(&stop_check_duration, &next_tick, &now).await;
65        }
66    })
67}
68
69pub fn make_worker<Fut1, Fut2>(
70    pg_pool: deadpool_postgres::Pool,
71    token: CancellationToken,
72    stop_check_duration: Duration,
73    task_function: impl Fn(
74            DateTime<Utc>,
75            Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
76            CancellationToken,
77        ) -> Fut1
78        + Send
79        + Sync
80        + 'static,
81    stop_function: impl Fn(Result<deadpool_postgres::Client, deadpool_postgres::PoolError>) -> Fut2
82        + Send
83        + Sync
84        + 'static,
85) -> JoinHandle<()>
86where
87    Fut1: Future<Output = LoopState> + Send,
88    Fut2: Future<Output = ()> + Send,
89{
90    spawn(async move {
91        // 動き出した瞬間は実行する
92        let mut next_tick: DateTime<Utc> = Utc::now();
93        loop {
94            // グレースフルストップのチェック
95            if token.is_cancelled() {
96                stop_function(pg_pool.get().await).await;
97                break;
98            }
99
100            // 現在時間と次実行する処理の時間をチェックする
101            let now = Utc::now();
102            if now >= next_tick {
103                // 定期的に行う処理実行
104                if let Some(res) = task_function(now, pg_pool.get().await, token.clone())
105                    .await
106                    .worker(&token, &now)
107                {
108                    next_tick = res;
109                } else {
110                    stop_function(pg_pool.get().await).await;
111                    break;
112                }
113            }
114
115            execute_sleep(&stop_check_duration, &next_tick, &now).await;
116        }
117    })
118}