nacos_sdk/common/executor/
mod.rs

1use crate::api::error::Result;
2use futures::Future;
3use tokio::{
4    runtime::{Builder, Runtime},
5    task::JoinHandle,
6    time::{Duration, interval, sleep},
7};
8use tracing::{Instrument, error};
9
10static COMMON_THREAD_CORES: std::sync::LazyLock<usize> = std::sync::LazyLock::new(|| {
11    std::env::var(crate::api::constants::ENV_NACOS_CLIENT_COMMON_THREAD_CORES)
12        .ok()
13        .and_then(|v| v.parse::<usize>().ok().filter(|n| *n > 0))
14        .unwrap_or(1)
15});
16
17static RT: std::sync::LazyLock<Runtime> = std::sync::LazyLock::new(|| {
18    Builder::new_multi_thread()
19        .enable_all()
20        .thread_name("nacos-client-thread-pool")
21        .worker_threads(*COMMON_THREAD_CORES)
22        .build()
23        .unwrap()
24});
25
26pub(crate) fn spawn<F>(future: F) -> JoinHandle<F::Output>
27where
28    F: Future + Send + 'static,
29    F::Output: Send + 'static,
30{
31    RT.spawn(future)
32}
33
34#[allow(dead_code)]
35pub(crate) fn schedule<F>(future: F, delay: Duration) -> JoinHandle<F::Output>
36where
37    F: Future + Send + 'static,
38    F::Output: Send + 'static,
39{
40    RT.spawn(async move {
41        sleep(delay).await;
42        future.await
43    })
44}
45
46#[allow(dead_code)]
47pub(crate) fn schedule_at_fixed_rate(
48    task: impl Fn() -> Result<()> + Send + Sync + 'static,
49    duration: Duration,
50) -> JoinHandle<()> {
51    RT.spawn(
52        async move {
53            loop {
54                let ret = async { task() }.await;
55                if let Err(e) = ret {
56                    error!("schedule_at_fixed_rate occur an error: {e}");
57                    break;
58                }
59                sleep(duration).await;
60            }
61        }
62        .in_current_span(),
63    )
64}
65
66#[allow(dead_code)]
67pub(crate) fn schedule_at_fixed_delay(
68    task: impl Fn() -> Result<()> + Send + Sync + 'static,
69    duration: Duration,
70) -> JoinHandle<()> {
71    RT.spawn(
72        async move {
73            let mut interval = interval(duration);
74            loop {
75                interval.tick().await;
76                let ret = async { task() }.await;
77                if let Err(e) = ret {
78                    error!("schedule_at_fixed_delay occur an error: {e}");
79                    break;
80                }
81            }
82        }
83        .in_current_span(),
84    )
85}
86
#[cfg(test)]
mod tests {

    use super::*;
    use crate::api::constants::ENV_NACOS_CLIENT_COMMON_THREAD_CORES;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };

    /// Mirrors the parsing used to initialise `COMMON_THREAD_CORES`: a positive integer taken
    /// from the environment, otherwise a single worker thread.
    fn configured_thread_cores() -> usize {
        std::env::var(ENV_NACOS_CLIENT_COMMON_THREAD_CORES)
            .ok()
            .and_then(|v| v.parse::<usize>().ok().filter(|n| *n > 0))
            .unwrap_or(1)
    }

    /// Overrides the thread-cores environment variable for the current process.
    fn set_thread_cores_env(value: &str) {
        // SAFETY: the tests in this module only touch the environment through `std::env`,
        // whose accessors are synchronised with each other.
        // NOTE(review): assumes no native code reads the environment while the tests run.
        unsafe {
            std::env::set_var(ENV_NACOS_CLIENT_COMMON_THREAD_CORES, value);
        }
    }

    /// Builds a run counter together with a task closure that bumps it on every call.
    fn counting_task() -> (
        Arc<AtomicUsize>,
        impl Fn() -> Result<()> + Send + Sync + 'static,
    ) {
        let runs = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&runs);
        let task = move || -> Result<()> {
            counter.fetch_add(1, Ordering::SeqCst);
            Ok(())
        };
        (runs, task)
    }

    /// Checks that a periodic task keeps running until its handle is aborted and then stops.
    fn assert_runs_until_aborted(runs: &AtomicUsize, handler: JoinHandle<()>) {
        std::thread::sleep(Duration::from_millis(1000));
        assert!(
            runs.load(Ordering::SeqCst) >= 2,
            "the task should have run repeatedly before being aborted"
        );

        handler.abort();
        // A run already in progress when the abort was requested may still finish.
        std::thread::sleep(Duration::from_millis(100));
        let runs_after_abort = runs.load(Ordering::SeqCst);

        std::thread::sleep(Duration::from_millis(500));
        assert_eq!(
            runs.load(Ordering::SeqCst),
            runs_after_abort,
            "the task must not run again once aborted"
        );
    }

    #[test]
    fn test_common_thread_cores() {
        // Whatever the ambient environment holds, the resolved value is always positive.
        assert!(configured_thread_cores() > 0);
        assert!(*COMMON_THREAD_CORES > 0);

        // Values that are not positive integers fall back to a single thread.
        set_thread_cores_env("0");
        assert_eq!(configured_thread_cores(), 1);
        set_thread_cores_env("not-a-number");
        assert_eq!(configured_thread_cores(), 1);

        set_thread_cores_env("4");
        assert_eq!(configured_thread_cores(), 4);
    }

    #[test]
    fn test_spawn() {
        let handler = spawn(async {
            println!("test spawn task");
            5
        });
        let ret = RT.block_on(handler);
        let ret = ret.unwrap();
        assert_eq!(ret, 5);
    }

    #[test]
    fn test_schedule() {
        let delay = Duration::from_millis(200);
        let started = std::time::Instant::now();
        let handler = schedule(
            async move {
                println!("test schedule task");
                5
            },
            delay,
        );

        let ret = RT.block_on(handler);
        let ret = ret.unwrap();
        assert_eq!(ret, 5);
        // The future must not have been driven before the delay elapsed.
        assert!(started.elapsed() >= delay);
    }

    #[test]
    fn test_schedule_at_fixed_delay() {
        let (runs, task) = counting_task();
        let handler = schedule_at_fixed_delay(task, Duration::from_millis(200));
        assert_runs_until_aborted(&runs, handler);
    }

    #[test]
    fn test_schedule_at_fixed_rate() {
        let (runs, task) = counting_task();
        let handler = schedule_at_fixed_rate(task, Duration::from_millis(200));
        assert_runs_until_aborted(&runs, handler);
    }

    #[test]
    fn test_spawn_hundred_task() {
        let mut handles = Vec::with_capacity(200);
        for i in 1..=100 {
            handles.push(spawn(async move {
                println!("test_spawn_hundred_task spawn {i}");
            }));
        }
        for j in 1..=100 {
            handles.push(schedule(
                async move {
                    println!("test_spawn_hundred_task schedule {j}");
                },
                Duration::from_millis(j),
            ));
        }

        // Join every task instead of sleeping and hoping they all finished.
        RT.block_on(async {
            for handle in handles {
                handle.await.expect("spawned task should run to completion");
            }
        });
    }
}