nacos_sdk/common/executor/
mod.rs1use crate::api::error::Result;
2use futures::Future;
3use tokio::{
4 runtime::{Builder, Runtime},
5 task::JoinHandle,
6 time::{Duration, interval, sleep},
7};
8use tracing::{Instrument, error};
9
10static COMMON_THREAD_CORES: std::sync::LazyLock<usize> = std::sync::LazyLock::new(|| {
11 std::env::var(crate::api::constants::ENV_NACOS_CLIENT_COMMON_THREAD_CORES)
12 .ok()
13 .and_then(|v| v.parse::<usize>().ok().filter(|n| *n > 0))
14 .unwrap_or(1)
15});
16
17static RT: std::sync::LazyLock<Runtime> = std::sync::LazyLock::new(|| {
18 Builder::new_multi_thread()
19 .enable_all()
20 .thread_name("nacos-client-thread-pool")
21 .worker_threads(*COMMON_THREAD_CORES)
22 .build()
23 .unwrap()
24});
25
26pub(crate) fn spawn<F>(future: F) -> JoinHandle<F::Output>
27where
28 F: Future + Send + 'static,
29 F::Output: Send + 'static,
30{
31 RT.spawn(future)
32}
33
34#[allow(dead_code)]
35pub(crate) fn schedule<F>(future: F, delay: Duration) -> JoinHandle<F::Output>
36where
37 F: Future + Send + 'static,
38 F::Output: Send + 'static,
39{
40 RT.spawn(async move {
41 sleep(delay).await;
42 future.await
43 })
44}
45
46#[allow(dead_code)]
47pub(crate) fn schedule_at_fixed_rate(
48 task: impl Fn() -> Result<()> + Send + Sync + 'static,
49 duration: Duration,
50) -> JoinHandle<()> {
51 RT.spawn(
52 async move {
53 loop {
54 let ret = async { task() }.await;
55 if let Err(e) = ret {
56 error!("schedule_at_fixed_rate occur an error: {e}");
57 break;
58 }
59 sleep(duration).await;
60 }
61 }
62 .in_current_span(),
63 )
64}
65
66#[allow(dead_code)]
67pub(crate) fn schedule_at_fixed_delay(
68 task: impl Fn() -> Result<()> + Send + Sync + 'static,
69 duration: Duration,
70) -> JoinHandle<()> {
71 RT.spawn(
72 async move {
73 let mut interval = interval(duration);
74 loop {
75 interval.tick().await;
76 let ret = async { task() }.await;
77 if let Err(e) = ret {
78 error!("schedule_at_fixed_delay occur an error: {e}");
79 break;
80 }
81 }
82 }
83 .in_current_span(),
84 )
85}
86
#[cfg(test)]
mod tests {

    use super::*;
    use crate::api::constants::ENV_NACOS_CLIENT_COMMON_THREAD_CORES;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Starts a periodic schedule through `start`, lets it run for a while, aborts it,
    /// and checks that it ran repeatedly before the abort and never afterwards.
    ///
    /// `start` receives the run counter that the scheduled task must increment.
    fn assert_runs_until_aborted(start: impl FnOnce(Arc<AtomicUsize>) -> JoinHandle<()>) {
        let runs = Arc::new(AtomicUsize::new(0));
        let handler = start(Arc::clone(&runs));

        std::thread::sleep(Duration::from_millis(350));
        handler.abort();
        // The scheduled task never returns `Err`, so it can only end by cancellation.
        let join_error = RT
            .block_on(handler)
            .expect_err("an aborted periodic task must not finish normally");
        assert!(join_error.is_cancelled());

        let runs_at_abort = runs.load(Ordering::SeqCst);
        assert!(
            runs_at_abort >= 2,
            "expected the task to run repeatedly, got {runs_at_abort} run(s)"
        );

        // No further runs may happen once the task has been cancelled.
        std::thread::sleep(Duration::from_millis(250));
        assert_eq!(runs.load(Ordering::SeqCst), runs_at_abort);
        println!("task has been canceled!")
    }

    #[test]
    fn test_common_thread_cores() {
        // Check the real static against the documented rule without mutating the
        // process environment, which would race with tests running in parallel.
        let expected = std::env::var(ENV_NACOS_CLIENT_COMMON_THREAD_CORES)
            .ok()
            .and_then(|v| v.parse::<usize>().ok().filter(|n| *n > 0))
            .unwrap_or(1);
        assert!(*COMMON_THREAD_CORES > 0);
        assert_eq!(*COMMON_THREAD_CORES, expected);
    }

    #[test]
    fn test_spawn() {
        let handler = spawn(async {
            println!("test spawn task");
            5
        });
        let ret = RT
            .block_on(handler)
            .expect("spawned task should run to completion");
        assert_eq!(ret, 5);
    }

    #[test]
    fn test_schedule() {
        let delay = Duration::from_millis(100);
        let started = std::time::Instant::now();
        let handler = schedule(
            async move {
                println!("test schedule task");
                5
            },
            delay,
        );

        let ret = RT
            .block_on(handler)
            .expect("scheduled task should run to completion");
        assert_eq!(ret, 5);
        assert!(
            started.elapsed() >= delay,
            "scheduled task finished before its delay elapsed"
        );
    }

    #[test]
    fn test_schedule_at_fixed_delay() {
        assert_runs_until_aborted(|runs| {
            schedule_at_fixed_delay(
                move || {
                    println!("test schedule at fixed delay");
                    runs.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                },
                Duration::from_millis(100),
            )
        });
    }

    #[test]
    fn test_schedule_at_fixed_rate() {
        assert_runs_until_aborted(|runs| {
            schedule_at_fixed_rate(
                move || {
                    println!("test schedule at fixed rate");
                    runs.fetch_add(1, Ordering::SeqCst);
                    Ok(())
                },
                Duration::from_millis(100),
            )
        });
    }

    #[test]
    fn test_spawn_hundred_task() {
        let mut handles = Vec::with_capacity(200);
        for i in 1..=100 {
            handles.push(spawn(async move {
                println!("test_spawn_hundred_task spawn {i}");
            }));
        }
        for j in 1..=100 {
            handles.push(schedule(
                async move {
                    println!("test_spawn_hundred_task schedule {j}");
                },
                Duration::from_millis(j),
            ));
        }

        // Wait for every task instead of sleeping for a guessed amount of time.
        RT.block_on(async {
            for handle in handles {
                handle.await.expect("task should run to completion");
            }
        });
    }
}