rocketmq_common/
thread_pool.rs1use std::cmp;
19use std::future::Future;
20use std::sync::atomic::AtomicUsize;
21use std::sync::atomic::Ordering;
22use std::time::Duration;
23
24use tokio::runtime::Handle;
25use tokio::task::JoinHandle;
26
27pub struct TokioExecutorService {
28 inner: tokio::runtime::Runtime,
29}
30
31impl Default for TokioExecutorService {
32 fn default() -> Self {
33 Self::new()
34 }
35}
36
37impl TokioExecutorService {
38 pub fn shutdown(self) {
39 self.inner.shutdown_background();
40 }
41
42 pub fn shutdown_timeout(self, timeout: Duration) {
43 self.inner.shutdown_timeout(timeout);
44 }
45}
46
47impl TokioExecutorService {
48 pub fn new() -> TokioExecutorService {
49 TokioExecutorService {
50 inner: tokio::runtime::Builder::new_multi_thread()
51 .worker_threads(num_cpus::get())
52 .enable_all()
53 .build()
54 .unwrap(),
55 }
56 }
57
58 pub fn new_with_config(
59 thread_num: usize,
60 thread_prefix: Option<impl Into<String>>,
61 keep_alive: Duration,
62 max_blocking_threads: usize,
63 ) -> TokioExecutorService {
64 let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
65 thread_prefix.into()
66 } else {
67 "rocketmq-thread-".to_string()
68 };
69 TokioExecutorService {
70 inner: tokio::runtime::Builder::new_multi_thread()
71 .worker_threads(thread_num)
72 .thread_keep_alive(keep_alive)
73 .max_blocking_threads(max_blocking_threads)
74 .thread_name_fn(move || {
75 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
76 let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
77 format!("{thread_prefix_inner}{id}")
78 })
79 .enable_all()
80 .build()
81 .unwrap(),
82 }
83 }
84}
85
86impl TokioExecutorService {
87 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
88 where
89 F: Future + Send + 'static,
90 F::Output: Send + 'static,
91 {
92 self.inner.spawn(future)
93 }
94
95 pub fn get_handle(&self) -> &Handle {
96 self.inner.handle()
97 }
98
99 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
100 self.inner.block_on(future)
101 }
102}
103
104pub struct FuturesExecutorService {
105 inner: futures::executor::ThreadPool,
106}
107impl FuturesExecutorService {
108 pub fn spawn<F>(&self, future: F)
109 where
110 F: Future<Output = ()> + Send + 'static,
111 {
112 self.inner.spawn_ok(future);
113 }
114}
115
116#[derive(Debug, Default)]
117pub struct FuturesExecutorServiceBuilder {
118 pool_size: usize,
119 stack_size: usize,
120 thread_name_prefix: Option<String>,
121}
122
123impl FuturesExecutorServiceBuilder {
124 pub fn new() -> FuturesExecutorServiceBuilder {
125 FuturesExecutorServiceBuilder {
126 pool_size: cmp::max(1, num_cpus::get()),
127 stack_size: 0,
128 thread_name_prefix: None,
129 }
130 }
131
132 pub fn pool_size(mut self, pool_size: usize) -> Self {
133 self.pool_size = pool_size;
134 self
135 }
136
137 pub fn stack_size(mut self, stack_size: usize) -> Self {
138 self.stack_size = stack_size;
139 self
140 }
141
142 pub fn create(&mut self) -> anyhow::Result<FuturesExecutorService> {
143 let thread_pool = futures::executor::ThreadPool::builder()
144 .stack_size(self.stack_size)
145 .pool_size(self.pool_size)
146 .name_prefix(
147 self.thread_name_prefix
148 .as_ref()
149 .unwrap_or(&String::from("Default-Executor")),
150 )
151 .create()
152 .unwrap();
153 Ok(FuturesExecutorService { inner: thread_pool })
154 }
155}
156
157pub struct ScheduledExecutorService {
158 inner: tokio::runtime::Runtime,
159}
160
161impl Default for ScheduledExecutorService {
162 fn default() -> Self {
163 Self::new()
164 }
165}
166impl ScheduledExecutorService {
167 pub fn new() -> ScheduledExecutorService {
168 ScheduledExecutorService {
169 inner: tokio::runtime::Builder::new_multi_thread()
170 .worker_threads(num_cpus::get())
171 .enable_all()
172 .build()
173 .unwrap(),
174 }
175 }
176
177 pub fn new_with_config(
178 thread_num: usize,
179 thread_prefix: Option<impl Into<String>>,
180 keep_alive: Duration,
181 max_blocking_threads: usize,
182 ) -> ScheduledExecutorService {
183 let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
184 thread_prefix.into()
185 } else {
186 "rocketmq-thread-".to_string()
187 };
188 ScheduledExecutorService {
189 inner: tokio::runtime::Builder::new_multi_thread()
190 .worker_threads(thread_num)
191 .thread_keep_alive(keep_alive)
192 .max_blocking_threads(max_blocking_threads)
193 .thread_name_fn(move || {
194 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
195 let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
196 format!("{thread_prefix_inner}{id}")
197 })
198 .enable_all()
199 .build()
200 .unwrap(),
201 }
202 }
203
204 pub fn schedule_at_fixed_rate<F>(
205 &self,
206 mut task: F,
207 initial_delay: Option<Duration>,
208 period: Duration,
209 ) where
210 F: FnMut() + Send + 'static,
211 {
212 self.inner.spawn(async move {
213 if let Some(initial_delay_inner) = initial_delay {
216 tokio::time::sleep(initial_delay_inner).await;
217 }
218
219 loop {
220 let current_execution_time = tokio::time::Instant::now();
222 task();
224 let next_execution_time = current_execution_time + period;
226
227 let delay =
229 next_execution_time.saturating_duration_since(tokio::time::Instant::now());
230 tokio::time::sleep(delay).await;
231 }
232 });
233 }
234}
235
236impl ScheduledExecutorService {
237 pub fn shutdown(self) {
238 self.inner.shutdown_background();
239 }
240
241 pub fn shutdown_timeout(self, timeout: Duration) {
242 self.inner.shutdown_timeout(timeout);
243 }
244}