Skip to main content

rocketmq_runtime/
lib.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17pub enum RocketMQRuntime {
18    Multi(tokio::runtime::Runtime),
19}
20
21impl RocketMQRuntime {
22    #[inline]
23    pub fn new_multi(threads: usize, name: &str) -> Self {
24        Self::Multi(
25            tokio::runtime::Builder::new_multi_thread()
26                .worker_threads(threads)
27                .thread_name(name)
28                .enable_all()
29                .build()
30                .unwrap(),
31        )
32    }
33}
34
35impl RocketMQRuntime {
36    #[inline]
37    pub fn get_handle(&self) -> &tokio::runtime::Handle {
38        match self {
39            Self::Multi(runtime) => runtime.handle(),
40        }
41    }
42
43    #[inline]
44    pub fn get_runtime(&self) -> &tokio::runtime::Runtime {
45        match self {
46            Self::Multi(runtime) => runtime,
47        }
48    }
49
50    #[inline]
51    pub fn shutdown(self) {
52        match self {
53            Self::Multi(runtime) => runtime.shutdown_background(),
54        }
55    }
56
57    #[inline]
58    pub fn shutdown_timeout(self, timeout: Duration) {
59        match self {
60            Self::Multi(runtime) => runtime.shutdown_timeout(timeout),
61        }
62    }
63
64    #[inline]
65    pub fn schedule_at_fixed_rate<F>(&self, task: F, initial_delay: Option<Duration>, period: Duration)
66    where
67        F: Fn() + Send + 'static,
68    {
69        match self {
70            RocketMQRuntime::Multi(runtime) => {
71                runtime.handle().spawn(async move {
72                    // initial delay
73                    if let Some(initial_delay_inner) = initial_delay {
74                        tokio::time::sleep(initial_delay_inner).await;
75                    }
76
77                    loop {
78                        // record current execution time
79                        let current_execution_time = tokio::time::Instant::now();
80                        // execute task
81                        task();
82                        // Calculate the time of the next execution
83                        let next_execution_time = current_execution_time + period;
84
85                        // Wait until the next execution
86                        let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
87                        tokio::time::sleep(delay).await;
88                    }
89                });
90            }
91        }
92    }
93
94    #[inline]
95    pub fn schedule_at_fixed_rate_mut<F>(&self, mut task: F, initial_delay: Option<Duration>, period: Duration)
96    where
97        F: FnMut() + Send + 'static,
98    {
99        match self {
100            RocketMQRuntime::Multi(runtime) => {
101                runtime.handle().spawn(async move {
102                    // initial delay
103                    if let Some(initial_delay_inner) = initial_delay {
104                        tokio::time::sleep(initial_delay_inner).await;
105                    }
106
107                    loop {
108                        // record current execution time
109                        let current_execution_time = tokio::time::Instant::now();
110                        // execute task
111                        task();
112                        // Calculate the time of the next execution
113                        let next_execution_time = current_execution_time + period;
114
115                        // Wait until the next execution
116                        let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
117                        tokio::time::sleep(delay).await;
118                    }
119                });
120            }
121        }
122    }
123}