resident_utils/
sqlx_redis.rs

1use std::{future::Future, time::Duration};
2
3use chrono::prelude::*;
4use cron::Schedule;
5use tokio::{spawn, task::JoinHandle};
6use tokio_util::sync::CancellationToken;
7
8use crate::{execute_sleep, sqlx::SqlxPool, LoopState};
9
10pub fn make_looper<Fut1, Fut2>(
11    pg_pool: SqlxPool,
12    redis_pool: deadpool_redis::Pool,
13    token: CancellationToken,
14    schedule: Schedule,
15    stop_check_duration: Duration,
16    task_function: impl Fn(
17            DateTime<Utc>,
18            SqlxPool,
19            Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
20            CancellationToken,
21        ) -> Fut1
22        + Send
23        + Sync
24        + 'static,
25    stop_function: impl Fn(SqlxPool, Result<deadpool_redis::Connection, deadpool_redis::PoolError>) -> Fut2
26        + Send
27        + Sync
28        + 'static,
29) -> JoinHandle<()>
30where
31    Fut1: Future<Output = LoopState> + Send,
32    Fut2: Future<Output = ()> + Send,
33{
34    spawn(async move {
35        let mut next_tick: DateTime<Utc> = match schedule.upcoming(Utc).next() {
36            Some(next_tick) => next_tick,
37            None => {
38                stop_function(pg_pool.clone(), redis_pool.get().await).await;
39                return;
40            }
41        };
42        loop {
43            // グレースフルストップのチェック
44            if token.is_cancelled() {
45                stop_function(pg_pool.clone(), redis_pool.get().await).await;
46                break;
47            }
48
49            let now = Utc::now();
50            if now >= next_tick {
51                // 定期的に行う処理実行
52                if let Some(res) =
53                    task_function(now, pg_pool.clone(), redis_pool.get().await, token.clone())
54                        .await
55                        .looper(&token, &now, &schedule)
56                {
57                    next_tick = res;
58                } else {
59                    stop_function(pg_pool.clone(), redis_pool.get().await).await;
60                    break;
61                }
62            }
63
64            execute_sleep(&stop_check_duration, &next_tick, &now).await;
65        }
66    })
67}
68
69pub fn make_worker<Fut1, Fut2>(
70    pg_pool: SqlxPool,
71    redis_pool: deadpool_redis::Pool,
72    token: CancellationToken,
73    stop_check_duration: Duration,
74    task_function: impl Fn(
75            DateTime<Utc>,
76            SqlxPool,
77            Result<deadpool_redis::Connection, deadpool_redis::PoolError>,
78            CancellationToken,
79        ) -> Fut1
80        + Send
81        + Sync
82        + 'static,
83    stop_function: impl Fn(SqlxPool, Result<deadpool_redis::Connection, deadpool_redis::PoolError>) -> Fut2
84        + Send
85        + Sync
86        + 'static,
87) -> JoinHandle<()>
88where
89    Fut1: Future<Output = LoopState> + Send,
90    Fut2: Future<Output = ()> + Send,
91{
92    spawn(async move {
93        // 動き出した瞬間は実行する
94        let mut next_tick: DateTime<Utc> = Utc::now();
95        loop {
96            // グレースフルストップのチェック
97            if token.is_cancelled() {
98                stop_function(pg_pool.clone(), redis_pool.get().await).await;
99                break;
100            }
101
102            // 現在時間と次実行する処理の時間をチェックする
103            let now = Utc::now();
104            if now >= next_tick {
105                // 定期的に行う処理実行
106                if let Some(res) =
107                    task_function(now, pg_pool.clone(), redis_pool.get().await, token.clone())
108                        .await
109                        .worker(&token, &now)
110                {
111                    next_tick = res;
112                } else {
113                    stop_function(pg_pool.clone(), redis_pool.get().await).await;
114                    break;
115                }
116            }
117
118            execute_sleep(&stop_check_duration, &next_tick, &now).await;
119        }
120    })
121}