rocketmq_common/
thread_pool.rs1use std::cmp;
16use std::future::Future;
17use std::sync::atomic::AtomicUsize;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
21use tokio::runtime::Handle;
22use tokio::task::JoinHandle;
23
24pub struct TokioExecutorService {
25 inner: tokio::runtime::Runtime,
26}
27
28impl Default for TokioExecutorService {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl TokioExecutorService {
35 pub fn shutdown(self) {
36 self.inner.shutdown_background();
37 }
38
39 pub fn shutdown_timeout(self, timeout: Duration) {
40 self.inner.shutdown_timeout(timeout);
41 }
42}
43
44impl TokioExecutorService {
45 pub fn new() -> TokioExecutorService {
46 TokioExecutorService {
47 inner: tokio::runtime::Builder::new_multi_thread()
48 .worker_threads(num_cpus::get())
49 .enable_all()
50 .build()
51 .unwrap(),
52 }
53 }
54
55 pub fn new_with_config(
56 thread_num: usize,
57 thread_prefix: Option<impl Into<String>>,
58 keep_alive: Duration,
59 max_blocking_threads: usize,
60 ) -> TokioExecutorService {
61 let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
62 thread_prefix.into()
63 } else {
64 "rocketmq-thread-".to_string()
65 };
66 TokioExecutorService {
67 inner: tokio::runtime::Builder::new_multi_thread()
68 .worker_threads(thread_num)
69 .thread_keep_alive(keep_alive)
70 .max_blocking_threads(max_blocking_threads)
71 .thread_name_fn(move || {
72 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
73 let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
74 format!("{thread_prefix_inner}{id}")
75 })
76 .enable_all()
77 .build()
78 .unwrap(),
79 }
80 }
81}
82
83impl TokioExecutorService {
84 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
85 where
86 F: Future + Send + 'static,
87 F::Output: Send + 'static,
88 {
89 self.inner.spawn(future)
90 }
91
92 pub fn get_handle(&self) -> &Handle {
93 self.inner.handle()
94 }
95
96 pub fn block_on<F: Future>(&self, future: F) -> F::Output {
97 self.inner.block_on(future)
98 }
99}
100
101pub struct FuturesExecutorService {
102 inner: futures::executor::ThreadPool,
103}
104impl FuturesExecutorService {
105 pub fn spawn<F>(&self, future: F)
106 where
107 F: Future<Output = ()> + Send + 'static,
108 {
109 self.inner.spawn_ok(future);
110 }
111}
112
113#[derive(Debug, Default)]
114pub struct FuturesExecutorServiceBuilder {
115 pool_size: usize,
116 stack_size: usize,
117 thread_name_prefix: Option<String>,
118}
119
120impl FuturesExecutorServiceBuilder {
121 pub fn new() -> FuturesExecutorServiceBuilder {
122 FuturesExecutorServiceBuilder {
123 pool_size: cmp::max(1, num_cpus::get()),
124 stack_size: 0,
125 thread_name_prefix: None,
126 }
127 }
128
129 pub fn pool_size(mut self, pool_size: usize) -> Self {
130 self.pool_size = pool_size;
131 self
132 }
133
134 pub fn stack_size(mut self, stack_size: usize) -> Self {
135 self.stack_size = stack_size;
136 self
137 }
138
139 pub fn create(&mut self) -> anyhow::Result<FuturesExecutorService> {
140 let thread_pool = futures::executor::ThreadPool::builder()
141 .stack_size(self.stack_size)
142 .pool_size(self.pool_size)
143 .name_prefix(
144 self.thread_name_prefix
145 .as_ref()
146 .unwrap_or(&String::from("Default-Executor")),
147 )
148 .create()
149 .unwrap();
150 Ok(FuturesExecutorService { inner: thread_pool })
151 }
152}
153
154pub struct ScheduledExecutorService {
155 inner: tokio::runtime::Runtime,
156}
157
158impl Default for ScheduledExecutorService {
159 fn default() -> Self {
160 Self::new()
161 }
162}
163impl ScheduledExecutorService {
164 pub fn new() -> ScheduledExecutorService {
165 ScheduledExecutorService {
166 inner: tokio::runtime::Builder::new_multi_thread()
167 .worker_threads(num_cpus::get())
168 .enable_all()
169 .build()
170 .unwrap(),
171 }
172 }
173
174 pub fn new_with_config(
175 thread_num: usize,
176 thread_prefix: Option<impl Into<String>>,
177 keep_alive: Duration,
178 max_blocking_threads: usize,
179 ) -> ScheduledExecutorService {
180 let thread_prefix_inner = if let Some(thread_prefix) = thread_prefix {
181 thread_prefix.into()
182 } else {
183 "rocketmq-thread-".to_string()
184 };
185 ScheduledExecutorService {
186 inner: tokio::runtime::Builder::new_multi_thread()
187 .worker_threads(thread_num)
188 .thread_keep_alive(keep_alive)
189 .max_blocking_threads(max_blocking_threads)
190 .thread_name_fn(move || {
191 static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
192 let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
193 format!("{thread_prefix_inner}{id}")
194 })
195 .enable_all()
196 .build()
197 .unwrap(),
198 }
199 }
200
201 pub fn schedule_at_fixed_rate<F>(&self, mut task: F, initial_delay: Option<Duration>, period: Duration)
202 where
203 F: FnMut() + Send + 'static,
204 {
205 self.inner.spawn(async move {
206 if let Some(initial_delay_inner) = initial_delay {
209 tokio::time::sleep(initial_delay_inner).await;
210 }
211
212 loop {
213 let current_execution_time = tokio::time::Instant::now();
215 task();
217 let next_execution_time = current_execution_time + period;
219
220 let delay = next_execution_time.saturating_duration_since(tokio::time::Instant::now());
222 tokio::time::sleep(delay).await;
223 }
224 });
225 }
226}
227
228impl ScheduledExecutorService {
229 pub fn shutdown(self) {
230 self.inner.shutdown_background();
231 }
232
233 pub fn shutdown_timeout(self, timeout: Duration) {
234 self.inner.shutdown_timeout(timeout);
235 }
236}