async_global_executor/
threading.rs1use crate::Task;
2use async_channel::{Receiver, Sender};
3use async_lock::Mutex;
4use futures_lite::future;
5use std::{cell::OnceCell, io, thread};
6
7static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
9static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
11
12thread_local! {
13 static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = const { OnceCell::new() };
16}
17
18pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
28 let config = crate::config::GLOBAL_EXECUTOR_CONFIG
30 .get()
31 .unwrap_or_else(|| {
32 crate::init();
33 crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
34 });
35 let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
37 let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
39 let count = count.min(config.max_threads - *threads_number);
41 for _ in 0..count {
42 thread::Builder::new()
43 .name((config.thread_name_fn)())
44 .spawn(thread_main_loop)?;
45 *threads_number += 1;
46 *expected_threads_number += 1;
47 }
48 Ok(count)
49}
50
51pub fn stop_thread() -> Task<bool> {
61 crate::spawn(stop_current_executor_thread())
62}
63
64pub fn stop_current_thread() -> Task<bool> {
74 crate::spawn_local(stop_current_executor_thread())
75}
76
77fn thread_main_loop() {
78 let (s, r) = async_channel::bounded(1);
80 let (s_ack, r_ack) = async_channel::bounded(1);
82 THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));
83
84 loop {
86 #[allow(clippy::blocks_in_conditions)]
87 if std::panic::catch_unwind(|| {
88 crate::executor::LOCAL_EXECUTOR.with(|executor| {
89 let local = executor.run(async {
90 let _ = r.recv().await;
92 });
93 let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
94 crate::reactor::block_on(future::or(local, global));
95 });
96 })
97 .is_ok()
98 {
99 break;
100 }
101 }
102
103 wait_for_local_executor_completion();
104
105 crate::reactor::block_on(async {
107 let _ = s_ack.send(()).await;
108 });
109}
110
111fn wait_for_local_executor_completion() {
112 loop {
113 #[allow(clippy::blocks_in_conditions)]
114 if std::panic::catch_unwind(|| {
115 crate::executor::LOCAL_EXECUTOR.with(|executor| {
116 crate::reactor::block_on(async {
117 while !executor.is_empty() {
119 executor.tick().await;
120 }
121 });
122 });
123 })
124 .is_ok()
125 {
126 break;
127 }
128 }
129}
130
131async fn stop_current_executor_thread() -> bool {
132 let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
134 if *expected_threads_number
136 > crate::config::GLOBAL_EXECUTOR_CONFIG
137 .get()
138 .unwrap()
139 .min_threads
140 {
141 let (s, r_ack) =
142 THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
143 let _ = s.send(()).await;
144 *expected_threads_number -= 1;
146 drop(expected_threads_number);
148 let _ = r_ack.recv().await;
149 *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
151 true
152 } else {
153 false
154 }
155}