windows_threading/
pool.rs

1use super::*;
2use core::{marker::PhantomData, ops::Deref};
3
4/// A `Pool` object represents a private thread pool with its own thread limits.
5///
6/// This is in contrast to the default, or shared, thread pool used by the crate's `submit` function
7/// as well as other code within the same process.
8pub struct Pool(Box<TP_CALLBACK_ENVIRON_V3>);
9
10impl Pool {
11    /// Creates a new `Pool` object.
12    pub fn new() -> Self {
13        let mut e = TP_CALLBACK_ENVIRON_V3 {
14            Version: 3,
15            CallbackPriority: TP_CALLBACK_PRIORITY_NORMAL,
16            Size: core::mem::size_of::<TP_CALLBACK_ENVIRON_V3>() as u32,
17            ..Default::default()
18        };
19
20        unsafe {
21            e.Pool = check(CreateThreadpool(core::ptr::null()));
22            e.CleanupGroup = check(CreateThreadpoolCleanupGroup());
23        }
24
25        // The `TP_CALLBACK_ENVIRON_V3` is boxed to ensure its memory address remains stable for the life of the `Pool` object.
26        Self(Box::new(e))
27    }
28
29    /// Convenience function for creating a new pool and calling [`scope`][Self::scope].
30    pub fn with_scope<'env, F>(f: F)
31    where
32        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>),
33    {
34        let pool = Pool::new();
35        pool.scope(f);
36    }
37
38    /// Sets the thread limits for the `Pool` object.
39    pub fn set_thread_limits(&self, min: u32, max: u32) {
40        unsafe {
41            check(SetThreadpoolThreadMinimum(self.0.Pool, min));
42            SetThreadpoolThreadMaximum(self.0.Pool, max);
43        }
44    }
45
46    /// Submit the closure to the thread pool.
47    ///
48    /// * The closure must have `'static` lifetime as the thread may outlive the lifetime in which `submit` is called.
49    /// * The closure must be `Send` as it will be sent to another thread for execution.
50    pub fn submit<F: FnOnce() + Send + 'static>(&self, f: F) {
51        // This is safe because the closure has a `'static` lifetime.
52        unsafe {
53            try_submit(&*self.0, f);
54        }
55    }
56
57    /// Create a scope for submitting closures.
58    ///
59    /// Within this scope local variables can be sent to the pool thread for execution.
60    /// This is possible because `scope` will wait for all submitted closures to finish before returning,
61    /// Note however that it will also wait for closures that were submitted from other threads.
62    pub fn scope<'env, F>(&self, f: F)
63    where
64        F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>),
65    {
66        struct DropGuard<'a>(&'a Pool);
67        impl Drop for DropGuard<'_> {
68            fn drop(&mut self) {
69                self.0.join();
70            }
71        }
72        // Ensure that we always join the pool before returning.
73        let _guard = DropGuard(self);
74        let scope = Scope {
75            pool: self,
76            env: PhantomData,
77            scope: PhantomData,
78        };
79        f(&scope);
80    }
81
82    /// Waits for all submissions to finish.
83    ///
84    /// Dropping the `Pool` will also wait for all submissions to finish.
85    pub fn join(&self) {
86        unsafe {
87            CloseThreadpoolCleanupGroupMembers(self.0.CleanupGroup, 0, core::ptr::null_mut());
88        }
89    }
90}
91
92impl Default for Pool {
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98unsafe impl Sync for Pool {}
99unsafe impl Send for Pool {}
100
101impl Drop for Pool {
102    fn drop(&mut self) {
103        // The `Pool` object cannot be dropped without waiting for all closures to complete, as their
104        // lifetimes are only guaranteed to be as long as the `Pool` object.
105        self.join();
106
107        unsafe {
108            CloseThreadpoolCleanupGroup(self.0.CleanupGroup);
109            CloseThreadpool(self.0.Pool);
110        }
111    }
112}
113
114/// A scope to submit closures in.
115///
116/// See [`scope`][Pool::scope] for details.
117pub struct Scope<'scope, 'env: 'scope> {
118    pool: &'scope Pool,
119    scope: PhantomData<&'scope mut &'scope ()>,
120    env: PhantomData<&'env mut &'env ()>,
121}
122
123impl<'scope, 'env> Scope<'scope, 'env> {
124    /// Submits the closure to run on the `Pool`.
125    ///
126    /// The closure cannot outlive the `Scope` it's run in.
127    pub fn submit<F: FnOnce() + Send + 'scope>(&'scope self, f: F) {
128        unsafe {
129            try_submit(&*self.pool.0, f);
130        }
131    }
132}
133
134impl Deref for Scope<'_, '_> {
135    type Target = Pool;
136    fn deref(&self) -> &Self::Target {
137        self.pool
138    }
139}