async_global_executor/
threading.rs

1use crate::Task;
2use async_channel::{Receiver, Sender};
3use async_lock::Mutex;
4use futures_lite::future;
5use std::{cell::OnceCell, io, thread};
6
// The current number of executor threads. Threads that are shutting down are still
// counted here: the counter is only decremented after a thread's shutdown has been
// acknowledged.
static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
// The expected number of threads once every pending shutdown is complete, i.e.
// excluding the ones that are shutting down. It is decremented as soon as a shutdown
// request has been sent to a thread.
static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
11
thread_local! {
    // Per-thread shutdown channel endpoints, set once by `thread_main_loop`.
    // The `Sender` is used to ask this thread to shut down; the `Receiver` yields the
    // ack that the thread sends once it has finished shutting down.
    static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = const { OnceCell::new() };
}
17
18/// Spawn more executor threads, up to configured max value.
19///
20/// Returns how many threads we spawned.
21///
22/// # Examples
23///
24/// ```
25/// async_global_executor::spawn_more_threads(2);
26/// ```
27pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
28    // Get the current configuration, or initialize the thread pool.
29    let config = crate::config::GLOBAL_EXECUTOR_CONFIG
30        .get()
31        .unwrap_or_else(|| {
32            crate::init();
33            crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
34        });
35    // How many threads do we have (including shutting down)
36    let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
37    // How many threads are we supposed to have (when all shutdowns are complete)
38    let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
39    // Ensure we don't exceed configured max threads (including shutting down)
40    let count = count.min(config.max_threads - *threads_number);
41    for _ in 0..count {
42        thread::Builder::new()
43            .name((config.thread_name_fn)())
44            .spawn(thread_main_loop)?;
45        *threads_number += 1;
46        *expected_threads_number += 1;
47    }
48    Ok(count)
49}
50
51/// Stop one of the executor threads, down to configured min value
52///
53/// Returns whether a thread has been stopped.
54///
55/// # Examples
56///
57/// ```
58/// async_global_executor::stop_thread();
59/// ```
60pub fn stop_thread() -> Task<bool> {
61    crate::spawn(stop_current_executor_thread())
62}
63
64/// Stop the current executor thread, if we exceed the configured min value
65///
66/// Returns whether the thread has been stopped.
67///
68/// # Examples
69///
70/// ```
71/// async_global_executor::stop_current_thread();
72/// ```
73pub fn stop_current_thread() -> Task<bool> {
74    crate::spawn_local(stop_current_executor_thread())
75}
76
77fn thread_main_loop() {
78    // This will be used to ask for shutdown.
79    let (s, r) = async_channel::bounded(1);
80    // This wil be used to ack once shutdown is complete.
81    let (s_ack, r_ack) = async_channel::bounded(1);
82    THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));
83
84    // Main loop
85    loop {
86        #[allow(clippy::blocks_in_conditions)]
87        if std::panic::catch_unwind(|| {
88            crate::executor::LOCAL_EXECUTOR.with(|executor| {
89                let local = executor.run(async {
90                    // Wait until we're asked to shutdown.
91                    let _ = r.recv().await;
92                });
93                let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
94                crate::reactor::block_on(future::or(local, global));
95            });
96        })
97        .is_ok()
98        {
99            break;
100        }
101    }
102
103    wait_for_local_executor_completion();
104
105    // Ack that we're done shutting down.
106    crate::reactor::block_on(async {
107        let _ = s_ack.send(()).await;
108    });
109}
110
111fn wait_for_local_executor_completion() {
112    loop {
113        #[allow(clippy::blocks_in_conditions)]
114        if std::panic::catch_unwind(|| {
115            crate::executor::LOCAL_EXECUTOR.with(|executor| {
116                crate::reactor::block_on(async {
117                    // Wait for spawned tasks completion
118                    while !executor.is_empty() {
119                        executor.tick().await;
120                    }
121                });
122            });
123        })
124        .is_ok()
125        {
126            break;
127        }
128    }
129}
130
131async fn stop_current_executor_thread() -> bool {
132    // How many threads are we supposed to have (when all shutdowns are complete)
133    let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
134    // Ensure we don't go below the configured min_threads (ignoring shutting down)
135    if *expected_threads_number
136        > crate::config::GLOBAL_EXECUTOR_CONFIG
137            .get()
138            .unwrap()
139            .min_threads
140    {
141        let (s, r_ack) =
142            THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
143        let _ = s.send(()).await;
144        // We now expect to have one less thread (this one is shutting down)
145        *expected_threads_number -= 1;
146        // Unlock the Mutex
147        drop(expected_threads_number);
148        let _ = r_ack.recv().await;
149        // This thread is done shutting down
150        *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
151        true
152    } else {
153        false
154    }
155}