Skip to main content

rocketmq_common/
thread_pool.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp;
16use std::future::Future;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
21use tokio::runtime::Handle;
22use tokio::task::JoinHandle;
23
24pub struct TokioExecutorService {
25    inner: tokio::runtime::Runtime,
26}
27
28impl Default for TokioExecutorService {
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl TokioExecutorService {
35    pub fn shutdown(self) {
36        self.inner.shutdown_background();
37    }
38
39    pub fn shutdown_timeout(self, timeout: Duration) {
40        self.inner.shutdown_timeout(timeout);
41    }
42}
43
44impl TokioExecutorService {
45    pub fn new() -> TokioExecutorService {
46        TokioExecutorService {
47            inner: tokio::runtime::Builder::new_multi_thread()
48                .worker_threads(num_cpus::get())
49                .enable_all()
50                .build()
51                .unwrap(),
52        }
53    }
54
55    pub fn new_with_config(
56        thread_num: usize,
57        thread_prefix: Option<impl Into<String>>,
58        keep_alive: Duration,
59        max_blocking_threads: usize,
60    ) -> TokioExecutorService {
61        let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
62            thread_prefix.into()
63        } else {
64            "rocketmq-thread-".to_string()
65        };
66        TokioExecutorService {
67            inner: tokio::runtime::Builder::new_multi_thread()
68                .worker_threads(thread_num)
69                .thread_keep_alive(keep_alive)
70                .max_blocking_threads(max_blocking_threads)
71                .thread_name_fn(move || {
72                    static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
73                    let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
74                    format!("{thread_prefix_inner}{id}")
75                })
76                .enable_all()
77                .build()
78                .unwrap(),
79        }
80    }
81}
82
83impl TokioExecutorService {
84    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
85    where
86        F: Future + Send + 'static,
87        F::Output: Send + 'static,
88    {
89        self.inner.spawn(future)
90    }
91
92    pub fn get_handle(&self) -> &Handle {
93        self.inner.handle()
94    }
95
96    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
97        self.inner.block_on(future)
98    }
99}
100
101pub struct FuturesExecutorService {
102    inner: futures::executor::ThreadPool,
103}
104impl FuturesExecutorService {
105    pub fn spawn<F>(&self, future: F)
106    where
107        F: Future<Output = ()> + Send + 'static,
108    {
109        self.inner.spawn_ok(future);
110    }
111}
112
113#[derive(Debug, Default)]
114pub struct FuturesExecutorServiceBuilder {
115    pool_size: usize,
116    stack_size: usize,
117    thread_name_prefix: Option<String>,
118}
119
120impl FuturesExecutorServiceBuilder {
121    pub fn new() -> FuturesExecutorServiceBuilder {
122        FuturesExecutorServiceBuilder {
123            pool_size: cmp::max(1, num_cpus::get()),
124            stack_size: 0,
125            thread_name_prefix: None,
126        }
127    }
128
129    pub fn pool_size(mut self, pool_size: usize) -> Self {
130        self.pool_size = pool_size;
131        self
132    }
133
134    pub fn stack_size(mut self, stack_size: usize) -> Self {
135        self.stack_size = stack_size;
136        self
137    }
138
139    pub fn create(&mut self) -> anyhow::Result<FuturesExecutorService> {
140        let thread_pool = futures::executor::ThreadPool::builder()
141            .stack_size(self.stack_size)
142            .pool_size(self.pool_size)
143            .name_prefix(
144                self.thread_name_prefix
145                    .as_ref()
146                    .unwrap_or(&String::from("Default-Executor")),
147            )
148            .create()
149            .unwrap();
150        Ok(FuturesExecutorService { inner: thread_pool })
151    }
152}
153
154pub struct ScheduledExecutorService {
155    inner: tokio::runtime::Runtime,
156}
157
158impl Default for ScheduledExecutorService {
159    fn default() -> Self {
160        Self::new()
161    }
162}
163impl ScheduledExecutorService {
164    pub fn new() -> ScheduledExecutorService {
165        ScheduledExecutorService {
166            inner: tokio::runtime::Builder::new_multi_thread()
167                .worker_threads(num_cpus::get())
168                .enable_all()
169                .build()
170                .unwrap(),
171        }
172    }
173
174    pub fn new_with_config(
175        thread_num: usize,
176        thread_prefix: Option<impl Into<String>>,
177        keep_alive: Duration,
178        max_blocking_threads: usize,
179    ) -> ScheduledExecutorService {
180        let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
181            thread_prefix.into()
182        } else {
183            "rocketmq-thread-".to_string()
184        };
185        ScheduledExecutorService {
186            inner: tokio::runtime::Builder::new_multi_thread()
187                .worker_threads(thread_num)
188                .thread_keep_alive(keep_alive)
189                .max_blocking_threads(max_blocking_threads)
190                .thread_name_fn(move || {
191                    static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
192                    let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
193                    format!("{thread_prefix_inner}{id}")
194                })
195                .enable_all()
196                .build()
197                .unwrap(),
198        }
199    }
200
201    pub fn schedule_at_fixed_rate<F>(&self, mut task: F, initial_delay: Option<Duration>, period: Duration)
202    where
203        F: FnMut() + Send + 'static,
204    {
205        self.inner.spawn(async move {
206            // initial delay
207
208            if let Some(initial_delay_inner) = initial_delay {
209                tokio::time::sleep(initial_delay_inner).await;
210            }
211
212            loop {
213                // record current execution time
214                let current_execution_time = tokio::time::Instant::now();
215                // execute task
216                task();
217                // Calculate the time of the next execution
218                let next_execution_time = current_execution_time + period;
219
220                // Wait until the next execution
221                let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
222                tokio::time::sleep(delay).await;
223            }
224        });
225    }
226}
227
228impl ScheduledExecutorService {
229    pub fn shutdown(self) {
230        self.inner.shutdown_background();
231    }
232
233    pub fn shutdown_timeout(self, timeout: Duration) {
234        self.inner.shutdown_timeout(timeout);
235    }
236}