rocketmq_common/
thread_pool.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18use std::cmp;
19use std::future::Future;
20use std::sync::atomic::AtomicUsize;
21use std::sync::atomic::Ordering;
22use std::time::Duration;
23
24use tokio::runtime::Handle;
25use tokio::task::JoinHandle;
26
27pub struct TokioExecutorService {
28    inner: tokio::runtime::Runtime,
29}
30
31impl Default for TokioExecutorService {
32    fn default() -> Self {
33        Self::new()
34    }
35}
36
37impl TokioExecutorService {
38    pub fn shutdown(self) {
39        self.inner.shutdown_background();
40    }
41
42    pub fn shutdown_timeout(self, timeout: Duration) {
43        self.inner.shutdown_timeout(timeout);
44    }
45}
46
47impl TokioExecutorService {
48    pub fn new() -> TokioExecutorService {
49        TokioExecutorService {
50            inner: tokio::runtime::Builder::new_multi_thread()
51                .worker_threads(num_cpus::get())
52                .enable_all()
53                .build()
54                .unwrap(),
55        }
56    }
57
58    pub fn new_with_config(
59        thread_num: usize,
60        thread_prefix: Option<impl Into<String>>,
61        keep_alive: Duration,
62        max_blocking_threads: usize,
63    ) -> TokioExecutorService {
64        let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
65            thread_prefix.into()
66        } else {
67            "rocketmq-thread-".to_string()
68        };
69        TokioExecutorService {
70            inner: tokio::runtime::Builder::new_multi_thread()
71                .worker_threads(thread_num)
72                .thread_keep_alive(keep_alive)
73                .max_blocking_threads(max_blocking_threads)
74                .thread_name_fn(move || {
75                    static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
76                    let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
77                    format!("{thread_prefix_inner}{id}")
78                })
79                .enable_all()
80                .build()
81                .unwrap(),
82        }
83    }
84}
85
86impl TokioExecutorService {
87    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
88    where
89        F: Future + Send + 'static,
90        F::Output: Send + 'static,
91    {
92        self.inner.spawn(future)
93    }
94
95    pub fn get_handle(&self) -> &Handle {
96        self.inner.handle()
97    }
98
99    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
100        self.inner.block_on(future)
101    }
102}
103
104pub struct FuturesExecutorService {
105    inner: futures::executor::ThreadPool,
106}
107impl FuturesExecutorService {
108    pub fn spawn<F>(&self, future: F)
109    where
110        F: Future<Output = ()> + Send + 'static,
111    {
112        self.inner.spawn_ok(future);
113    }
114}
115
116#[derive(Debug, Default)]
117pub struct FuturesExecutorServiceBuilder {
118    pool_size: usize,
119    stack_size: usize,
120    thread_name_prefix: Option<String>,
121}
122
123impl FuturesExecutorServiceBuilder {
124    pub fn new() -> FuturesExecutorServiceBuilder {
125        FuturesExecutorServiceBuilder {
126            pool_size: cmp::max(1, num_cpus::get()),
127            stack_size: 0,
128            thread_name_prefix: None,
129        }
130    }
131
132    pub fn pool_size(mut self, pool_size: usize) -> Self {
133        self.pool_size = pool_size;
134        self
135    }
136
137    pub fn stack_size(mut self, stack_size: usize) -> Self {
138        self.stack_size = stack_size;
139        self
140    }
141
142    pub fn create(&mut self) -> anyhow::Result<FuturesExecutorService> {
143        let thread_pool = futures::executor::ThreadPool::builder()
144            .stack_size(self.stack_size)
145            .pool_size(self.pool_size)
146            .name_prefix(
147                self.thread_name_prefix
148                    .as_ref()
149                    .unwrap_or(&String::from("Default-Executor")),
150            )
151            .create()
152            .unwrap();
153        Ok(FuturesExecutorService { inner: thread_pool })
154    }
155}
156
157pub struct ScheduledExecutorService {
158    inner: tokio::runtime::Runtime,
159}
160
161impl Default for ScheduledExecutorService {
162    fn default() -> Self {
163        Self::new()
164    }
165}
166impl ScheduledExecutorService {
167    pub fn new() -> ScheduledExecutorService {
168        ScheduledExecutorService {
169            inner: tokio::runtime::Builder::new_multi_thread()
170                .worker_threads(num_cpus::get())
171                .enable_all()
172                .build()
173                .unwrap(),
174        }
175    }
176
177    pub fn new_with_config(
178        thread_num: usize,
179        thread_prefix: Option<impl Into<String>>,
180        keep_alive: Duration,
181        max_blocking_threads: usize,
182    ) -> ScheduledExecutorService {
183        let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
184            thread_prefix.into()
185        } else {
186            "rocketmq-thread-".to_string()
187        };
188        ScheduledExecutorService {
189            inner: tokio::runtime::Builder::new_multi_thread()
190                .worker_threads(thread_num)
191                .thread_keep_alive(keep_alive)
192                .max_blocking_threads(max_blocking_threads)
193                .thread_name_fn(move || {
194                    static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
195                    let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
196                    format!("{thread_prefix_inner}{id}")
197                })
198                .enable_all()
199                .build()
200                .unwrap(),
201        }
202    }
203
204    pub fn schedule_at_fixed_rate<F>(
205        &self,
206        mut task: F,
207        initial_delay: Option<Duration>,
208        period: Duration,
209    ) where
210        F: FnMut() + Send + 'static,
211    {
212        self.inner.spawn(async move {
213            // initial delay
214
215            if let Some(initial_delay_inner) = initial_delay {
216                tokio::time::sleep(initial_delay_inner).await;
217            }
218
219            loop {
220                // record current execution time
221                let current_execution_time = tokio::time::Instant::now();
222                // execute task
223                task();
224                // Calculate the time of the next execution
225                let next_execution_time = current_execution_time + period;
226
227                // Wait until the next execution
228                let delay =
229                    next_execution_time.saturating_duration_since(tokio::time::Instant::now());
230                tokio::time::sleep(delay).await;
231            }
232        });
233    }
234}
235
236impl ScheduledExecutorService {
237    pub fn shutdown(self) {
238        self.inner.shutdown_background();
239    }
240
241    pub fn shutdown_timeout(self, timeout: Duration) {
242        self.inner.shutdown_timeout(timeout);
243    }
244}