resident_utils/
postgres_redis.rs

1pub use deadpool_postgres;
2use std::{future::Future, time::Duration};
3
4use chrono::prelude::*;
5use cron::Schedule;
6use tokio::{spawn, task::JoinHandle};
7use tokio_util::sync::CancellationToken;
8
9use crate::{execute_sleep, LoopState};
10
11pub fn make_looper<Fut1, Fut2>(
12    pg_pool: deadpool_postgres::Pool,
13    redis_pool: deadpool_redis::Pool,
14    token: CancellationToken,
15    schedule: Schedule,
16    stop_check_duration: Duration,
17    task_function: impl Fn(
18            DateTime<Utc>,
19            Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
20            Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
21            CancellationToken,
22        ) -> Fut1
23        + Send
24        + Sync
25        + 'static,
26    stop_function: impl Fn(
27            Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
28            Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
29        ) -> Fut2
30        + Send
31        + Sync
32        + 'static,
33) -> JoinHandle<()>
34where
35    Fut1: Future<Output = LoopState> + Send,
36    Fut2: Future<Output = ()> + Send,
37{
38    spawn(async move {
39        let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
40            Some(next_tick) => next_tick,
41            None => {
42                stop_function(pg_pool.get().await, redis_pool.get().await).await;
43                return;
44            }
45        };
46        loop {
47            // グレースフルストップのチェック
48            if token.is_cancelled() {
49                stop_function(pg_pool.get().await, redis_pool.get().await).await;
50                break;
51            }
52
53            let now = Utc::now();
54            if now >= next_tick {
55                // 定期的に行う処理実行
56                if let Some(res) = task_function(
57                    now,
58                    pg_pool.get().await,
59                    redis_pool.get().await,
60                    token.clone(),
61                )
62                .await
63                .looper(&token, &now, &schedule)
64                {
65                    next_tick = res;
66                } else {
67                    stop_function(pg_pool.get().await, redis_pool.get().await).await;
68                    break;
69                }
70            }
71
72            execute_sleep(&stop_check_duration, &next_tick, &now).await;
73        }
74    })
75}
76
77pub fn make_worker<Fut1, Fut2>(
78    pg_pool: deadpool_postgres::Pool,
79    redis_pool: deadpool_redis::Pool,
80    token: CancellationToken,
81    stop_check_duration: Duration,
82    task_function: impl Fn(
83            DateTime<Utc>,
84            Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
85            Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
86            CancellationToken,
87        ) -> Fut1
88        + Send
89        + Sync
90        + 'static,
91    stop_function: impl Fn(
92            Result<deadpool_postgres::Client, deadpool_postgres::PoolError>,
93            Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
94        ) -> Fut2
95        + Send
96        + Sync
97        + 'static,
98) -> JoinHandle<()>
99where
100    Fut1: Future<Output = LoopState> + Send,
101    Fut2: Future<Output = ()> + Send,
102{
103    spawn(async move {
104        // 動き出した瞬間は実行する
105        let mut next_tick: DateTime<Utc> = Utc::now();
106        loop {
107            // グレースフルストップのチェック
108            if token.is_cancelled() {
109                stop_function(pg_pool.get().await, redis_pool.get().await).await;
110                break;
111            }
112
113            // 現在時間と次実行する処理の時間をチェックする
114            let now = Utc::now();
115            if now >= next_tick {
116                // 定期的に行う処理実行
117                if let Some(res) = task_function(
118                    now,
119                    pg_pool.get().await,
120                    redis_pool.get().await,
121                    token.clone(),
122                )
123                .await
124                .worker(&token, &now)
125                {
126                    next_tick = res;
127                } else {
128                    stop_function(pg_pool.get().await, redis_pool.get().await).await;
129                    break;
130                }
131            }
132
133            execute_sleep(&stop_check_duration, &next_tick, &now).await;
134        }
135    })
136}