rocketmq_runtime/lib.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::time::Duration;
19
20pub enum RocketMQRuntime {
21 Multi(tokio::runtime::Runtime),
22}
23
24impl RocketMQRuntime {
25 #[inline]
26 pub fn new_multi(threads: usize, name: &str) -> Self {
27 Self::Multi(
28 tokio::runtime::Builder::new_multi_thread()
29 .worker_threads(threads)
30 .thread_name(name)
31 .enable_all()
32 .build()
33 .unwrap(),
34 )
35 }
36}
37
38impl RocketMQRuntime {
39 #[inline]
40 pub fn get_handle(&self) -> &tokio::runtime::Handle {
41 match self {
42 Self::Multi(runtime) => runtime.handle(),
43 }
44 }
45
46 #[inline]
47 pub fn get_runtime(&self) -> &tokio::runtime::Runtime {
48 match self {
49 Self::Multi(runtime) => runtime,
50 }
51 }
52
53 #[inline]
54 pub fn shutdown(self) {
55 match self {
56 Self::Multi(runtime) => runtime.shutdown_background(),
57 }
58 }
59
60 #[inline]
61 pub fn shutdown_timeout(self, timeout: Duration) {
62 match self {
63 Self::Multi(runtime) => runtime.shutdown_timeout(timeout),
64 }
65 }
66
67 #[inline]
68 pub fn schedule_at_fixed_rate<F>(
69 &self,
70 task: F,
71 initial_delay: Option<Duration>,
72 period: Duration,
73 ) where
74 F: Fn() + Send + 'static,
75 {
76 match self {
77 RocketMQRuntime::Multi(runtime) => {
78 runtime.handle().spawn(async move {
79 // initial delay
80 if let Some(initial_delay_inner) = initial_delay {
81 tokio::time::sleep(initial_delay_inner).await;
82 }
83
84 loop {
85 // record current execution time
86 let current_execution_time = tokio::time::Instant::now();
87 // execute task
88 task();
89 // Calculate the time of the next execution
90 let next_execution_time = current_execution_time + period;
91
92 // Wait until the next execution
93 let delay = next_execution_time
94 .saturating_duration_since(tokio::time::Instant::now());
95 tokio::time::sleep(delay).await;
96 }
97 });
98 }
99 }
100 }
101
102 #[inline]
103 pub fn schedule_at_fixed_rate_mut<F>(
104 &self,
105 mut task: F,
106 initial_delay: Option<Duration>,
107 period: Duration,
108 ) where
109 F: FnMut() + Send + 'static,
110 {
111 match self {
112 RocketMQRuntime::Multi(runtime) => {
113 runtime.handle().spawn(async move {
114 // initial delay
115 if let Some(initial_delay_inner) = initial_delay {
116 tokio::time::sleep(initial_delay_inner).await;
117 }
118
119 loop {
120 // record current execution time
121 let current_execution_time = tokio::time::Instant::now();
122 // execute task
123 task();
124 // Calculate the time of the next execution
125 let next_execution_time = current_execution_time + period;
126
127 // Wait until the next execution
128 let delay = next_execution_time
129 .saturating_duration_since(tokio::time::Instant::now());
130 tokio::time::sleep(delay).await;
131 }
132 });
133 }
134 }
135 }
136}