rocketmq_runtime/
lib.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::time::Duration;
19
20pub enum RocketMQRuntime {
21    Multi(tokio::runtime::Runtime),
22}
23
24impl RocketMQRuntime {
25    #[inline]
26    pub fn new_multi(threads: usize, name: &str) -> Self {
27        Self::Multi(
28            tokio::runtime::Builder::new_multi_thread()
29                .worker_threads(threads)
30                .thread_name(name)
31                .enable_all()
32                .build()
33                .unwrap(),
34        )
35    }
36}
37
38impl RocketMQRuntime {
39    #[inline]
40    pub fn get_handle(&self) -> &tokio::runtime::Handle {
41        match self {
42            Self::Multi(runtime) => runtime.handle(),
43        }
44    }
45
46    #[inline]
47    pub fn get_runtime(&self) -> &tokio::runtime::Runtime {
48        match self {
49            Self::Multi(runtime) => runtime,
50        }
51    }
52
53    #[inline]
54    pub fn shutdown(self) {
55        match self {
56            Self::Multi(runtime) => runtime.shutdown_background(),
57        }
58    }
59
60    #[inline]
61    pub fn shutdown_timeout(self, timeout: Duration) {
62        match self {
63            Self::Multi(runtime) => runtime.shutdown_timeout(timeout),
64        }
65    }
66
67    #[inline]
68    pub fn schedule_at_fixed_rate<F>(
69        &self,
70        task: F,
71        initial_delay: Option<Duration>,
72        period: Duration,
73    ) where
74        F: Fn() + Send + 'static,
75    {
76        match self {
77            RocketMQRuntime::Multi(runtime) => {
78                runtime.handle().spawn(async move {
79                    // initial delay
80                    if let Some(initial_delay_inner) = initial_delay {
81                        tokio::time::sleep(initial_delay_inner).await;
82                    }
83
84                    loop {
85                        // record current execution time
86                        let current_execution_time = tokio::time::Instant::now();
87                        // execute task
88                        task();
89                        // Calculate the time of the next execution
90                        let next_execution_time = current_execution_time + period;
91
92                        // Wait until the next execution
93                        let delay = next_execution_time
94                            .saturating_duration_since(tokio::time::Instant::now());
95                        tokio::time::sleep(delay).await;
96                    }
97                });
98            }
99        }
100    }
101
102    #[inline]
103    pub fn schedule_at_fixed_rate_mut<F>(
104        &self,
105        mut task: F,
106        initial_delay: Option<Duration>,
107        period: Duration,
108    ) where
109        F: FnMut() + Send + 'static,
110    {
111        match self {
112            RocketMQRuntime::Multi(runtime) => {
113                runtime.handle().spawn(async move {
114                    // initial delay
115                    if let Some(initial_delay_inner) = initial_delay {
116                        tokio::time::sleep(initial_delay_inner).await;
117                    }
118
119                    loop {
120                        // record current execution time
121                        let current_execution_time = tokio::time::Instant::now();
122                        // execute task
123                        task();
124                        // Calculate the time of the next execution
125                        let next_execution_time = current_execution_time + period;
126
127                        // Wait until the next execution
128                        let delay = next_execution_time
129                            .saturating_duration_since(tokio::time::Instant::now());
130                        tokio::time::sleep(delay).await;
131                    }
132                });
133            }
134        }
135    }
136}