windows_threading/pool.rs
1use super::*;
2use core::{marker::PhantomData, ops::Deref};
3
4/// A `Pool` object represents a private thread pool with its own thread limits.
5///
6/// This is in contrast to the default, or shared, thread pool used by the crate's `submit` function
7/// as well as other code within the same process.
8pub struct Pool(Box<TP_CALLBACK_ENVIRON_V3>);
9
10impl Pool {
11 /// Creates a new `Pool` object.
12 pub fn new() -> Self {
13 let mut e = TP_CALLBACK_ENVIRON_V3 {
14 Version: 3,
15 CallbackPriority: TP_CALLBACK_PRIORITY_NORMAL,
16 Size: core::mem::size_of::<TP_CALLBACK_ENVIRON_V3>() as u32,
17 ..Default::default()
18 };
19
20 unsafe {
21 e.Pool = check(CreateThreadpool(core::ptr::null()));
22 e.CleanupGroup = check(CreateThreadpoolCleanupGroup());
23 }
24
25 // The `TP_CALLBACK_ENVIRON_V3` is boxed to ensure its memory address remains stable for the life of the `Pool` object.
26 Self(Box::new(e))
27 }
28
29 /// Convenience function for creating a new pool and calling [`scope`][Self::scope].
30 pub fn with_scope<'env, F>(f: F)
31 where
32 F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>),
33 {
34 let pool = Pool::new();
35 pool.scope(f);
36 }
37
38 /// Sets the thread limits for the `Pool` object.
39 pub fn set_thread_limits(&self, min: u32, max: u32) {
40 unsafe {
41 check(SetThreadpoolThreadMinimum(self.0.Pool, min));
42 SetThreadpoolThreadMaximum(self.0.Pool, max);
43 }
44 }
45
46 /// Submit the closure to the thread pool.
47 ///
48 /// * The closure must have `'static` lifetime as the thread may outlive the lifetime in which `submit` is called.
49 /// * The closure must be `Send` as it will be sent to another thread for execution.
50 pub fn submit<F: FnOnce() + Send + 'static>(&self, f: F) {
51 // This is safe because the closure has a `'static` lifetime.
52 unsafe {
53 try_submit(&*self.0, f);
54 }
55 }
56
57 /// Create a scope for submitting closures.
58 ///
59 /// Within this scope local variables can be sent to the pool thread for execution.
60 /// This is possible because `scope` will wait for all submitted closures to finish before returning,
61 /// Note however that it will also wait for closures that were submitted from other threads.
62 pub fn scope<'env, F>(&self, f: F)
63 where
64 F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>),
65 {
66 struct DropGuard<'a>(&'a Pool);
67 impl Drop for DropGuard<'_> {
68 fn drop(&mut self) {
69 self.0.join();
70 }
71 }
72 // Ensure that we always join the pool before returning.
73 let _guard = DropGuard(self);
74 let scope = Scope {
75 pool: self,
76 env: PhantomData,
77 scope: PhantomData,
78 };
79 f(&scope);
80 }
81
82 /// Waits for all submissions to finish.
83 ///
84 /// Dropping the `Pool` will also wait for all submissions to finish.
85 pub fn join(&self) {
86 unsafe {
87 CloseThreadpoolCleanupGroupMembers(self.0.CleanupGroup, 0, core::ptr::null_mut());
88 }
89 }
90}
91
92impl Default for Pool {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98unsafe impl Sync for Pool {}
99unsafe impl Send for Pool {}
100
101impl Drop for Pool {
102 fn drop(&mut self) {
103 // The `Pool` object cannot be dropped without waiting for all closures to complete, as their
104 // lifetimes are only guaranteed to be as long as the `Pool` object.
105 self.join();
106
107 unsafe {
108 CloseThreadpoolCleanupGroup(self.0.CleanupGroup);
109 CloseThreadpool(self.0.Pool);
110 }
111 }
112}
113
114/// A scope to submit closures in.
115///
116/// See [`scope`][Pool::scope] for details.
117pub struct Scope<'scope, 'env: 'scope> {
118 pool: &'scope Pool,
119 scope: PhantomData<&'scope mut &'scope ()>,
120 env: PhantomData<&'env mut &'env ()>,
121}
122
123impl<'scope, 'env> Scope<'scope, 'env> {
124 /// Submits the closure to run on the `Pool`.
125 ///
126 /// The closure cannot outlive the `Scope` it's run in.
127 pub fn submit<F: FnOnce() + Send + 'scope>(&'scope self, f: F) {
128 unsafe {
129 try_submit(&*self.pool.0, f);
130 }
131 }
132}
133
134impl Deref for Scope<'_, '_> {
135 type Target = Pool;
136 fn deref(&self) -> &Self::Target {
137 self.pool
138 }
139}