zrx_executor/executor/strategy/worker/sharing.rs
1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Work-sharing execution strategy.
27
28use crossbeam::channel::{bounded, Sender};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::Arc;
31use std::thread::{self, Builder, JoinHandle};
32use std::{cmp, fmt, panic};
33
34use crate::executor::strategy::Strategy;
35use crate::executor::task::Task;
36use crate::executor::Result;
37
38// ----------------------------------------------------------------------------
39// Structs
40// ----------------------------------------------------------------------------
41
42/// Work-sharing execution strategy.
43///
44/// This strategy manages its tasks centrally in a single bounded [`crossbeam`]
45/// channel, which pull tasks from it and execute them, repeating the process
46/// until they are terminated. This is a very simple, yet reasonably efficient
47/// strategy in most cases.
48///
49/// Tasks are processed in the exact same order they were submitted, albeit they
50/// might not finish in the same order. As this strategy uses a bounded channel,
51/// task submission might fail when the channel's capacity is reached, leading
52/// to better performance characteristics due to the use of atomics over locks.
53/// As an alternative, the [`WorkStealing`][] strategy can be used, which is
54/// built on unbounded channels and allows for more flexible task submission,
55/// including automatic load balancing between workers which is particularly
56/// useful when tasks create subtasks.
57///
58/// [`WorkStealing`]: crate::executor::strategy::WorkStealing
59///
60/// # Examples
61///
62/// ```
63/// # use std::error::Error;
64/// # fn main() -> Result<(), Box<dyn Error>> {
65/// use zrx_executor::strategy::{Strategy, WorkSharing};
66///
67/// // Create strategy and submit task
68/// let strategy = WorkSharing::default();
69/// strategy.submit(Box::new(|| println!("Task")))?;
70/// # Ok(())
71/// # }
72/// ```
73pub struct WorkSharing {
74 /// Task submission sender.
75 sender: Option<Sender<Box<dyn Task>>>,
76 /// Join handles of worker threads.
77 threads: Vec<JoinHandle<()>>,
78 /// Counter for running tasks.
79 running: Arc<AtomicUsize>,
80}
81
82// ----------------------------------------------------------------------------
83// Implementations
84// ----------------------------------------------------------------------------
85
86impl WorkSharing {
87 /// Creates a work-sharing execution strategy.
88 ///
89 /// This method creates a strategy with the given number of worker threads,
90 /// which are spawned immediately before the method returns. Internally, a
91 /// bounded channel is created with a capacity of 8 tasks per worker, so
92 /// for 4 workers, the channel will have a capacity of 32 tasks.
93 ///
94 /// Use [`WorkSharing::with_capacity`] to set a custom capacity.
95 ///
96 /// # Panics
97 ///
98 /// Panics if thread creation fails.
99 ///
100 /// # Examples
101 ///
102 /// ```
103 /// use zrx_executor::strategy::WorkSharing;
104 ///
105 /// // Create strategy
106 /// let strategy = WorkSharing::new(4);
107 /// ```
108 #[must_use]
109 pub fn new(num_workers: usize) -> Self {
110 Self::with_capacity(num_workers, 8 * num_workers)
111 }
112
113 /// Creates a work-sharing execution strategy with the given capacity.
114 ///
115 /// This method creates a strategy with the given number of worker threads,
116 /// which are spawned immediately before the method returns.
117 ///
118 /// This strategy makes use of a bounded channel for its better performance
119 /// characteristics, since the caller is expected to have control over task
120 /// submission, ensuring that the executor can accept new tasks. The given
121 /// capacity sets the number of tasks the executor accepts before starting
122 /// to reject them, which can be used to apply backpressure. Note that the
123 /// capacity is not a per-worker, but a global per-executor limit.
124 ///
125 /// # Panics
126 ///
127 /// Panics if thread creation fails.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// use zrx_executor::strategy::WorkSharing;
133 ///
134 /// // Create strategy with capacity
135 /// let strategy = WorkSharing::with_capacity(4, 64);
136 /// ```
137 #[must_use]
138 pub fn with_capacity(num_workers: usize, capacity: usize) -> Self {
139 let (sender, receiver) = bounded::<Box<dyn Task>>(capacity);
140
141 // Keep track of running tasks
142 let running = Arc::new(AtomicUsize::new(0));
143
144 // Initialize worker threads
145 let iter = (0..num_workers).map(|index| {
146 let receiver = receiver.clone();
147
148 // Create worker thread and poll the receiver until the sender is
149 // dropped, automatically exiting the loop. Additionally, we keep
150 // track of the number of running tasks to provide a simple way to
151 // monitor the load of the thread pool.
152 let running = Arc::clone(&running);
153 let h = move || {
154 while let Ok(task) = receiver.recv() {
155 running.fetch_add(1, Ordering::Release);
156
157 // Execute task and immediately execute all subtasks on the
158 // same worker, if any, as the work-sharing strategy has no
159 // means of distributing work to other workers threads. We
160 // also keep the running count due to sequential execution,
161 // and catch panics, as we're running user-land code that
162 // might be sloppy. However, since the executor has no way
163 // of reporting panics, tasks should wrap execution as we
164 // do here, and abort with a proper error.
165 let _ = panic::catch_unwind(|| {
166 let subtasks = task.execute();
167 if !subtasks.is_empty() {
168 // Execution is recursive, so in case a subtask has
169 // further subtasks, they are executed depth-first
170 subtasks.execute();
171 }
172 });
173
174 // Update number of running tasks
175 running.fetch_sub(1, Ordering::Acquire);
176 }
177 };
178
179 // We deliberately use unwrap here, as the capability to spawn
180 // threads is a fundamental requirement of the executor
181 Builder::new()
182 .name(format!("zrx/executor/{}", index + 1))
183 .spawn(h)
184 .unwrap()
185 });
186
187 // Create worker threads and return strategy
188 let threads = iter.collect();
189 Self {
190 sender: Some(sender),
191 threads,
192 running,
193 }
194 }
195}
196
197// ----------------------------------------------------------------------------
198// Trait implementations
199// ----------------------------------------------------------------------------
200
201impl Strategy for WorkSharing {
202 /// Submits a task.
203 ///
204 /// This method submits a [`Task`], which is executed by one of the worker
205 /// threads as soon as possible. If a task computes a result, a [`Sender`][]
206 /// can be shared with the task, to send the result back to the caller,
207 /// which can then poll a [`Receiver`][].
208 ///
209 /// Note that tasks are intended to only run once, which is why they are
210 /// consumed. If a task needs to be run multiple times, it must be wrapped
211 /// in a closure that creates a new task each time. This allows for safe
212 /// sharing of state between tasks.
213 ///
214 /// [`Receiver`]: crossbeam::channel::Receiver
215 /// [`Sender`]: crossbeam::channel::Sender
216 ///
217 /// # Errors
218 ///
219 /// If the task cannot be submitted, [`Error::Submit`][] is returned, which
220 /// can only happen if the channel is disconnected or at capacity.
221 ///
222 /// [`Error::Submit`]: crate::executor::Error::Submit
223 ///
224 /// # Examples
225 ///
226 /// ```
227 /// # use std::error::Error;
228 /// # fn main() -> Result<(), Box<dyn Error>> {
229 /// use zrx_executor::strategy::{Strategy, WorkSharing};
230 ///
231 /// // Create strategy and submit task
232 /// let strategy = WorkSharing::default();
233 /// strategy.submit(Box::new(|| println!("Task")))?;
234 /// # Ok(())
235 /// # }
236 /// ```
237 fn submit(&self, task: Box<dyn Task>) -> Result {
238 match self.sender.as_ref() {
239 Some(sender) => Ok(sender.try_send(task)?),
240 None => unreachable!(),
241 }
242 }
243
244 /// Returns the number of workers.
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use zrx_executor::strategy::{Strategy, WorkSharing};
250 ///
251 /// // Get number of workers
252 /// let strategy = WorkSharing::new(1);
253 /// assert_eq!(strategy.num_workers(), 1);
254 /// ```
255 #[inline]
256 fn num_workers(&self) -> usize {
257 self.threads.len()
258 }
259
260 /// Returns the number of running tasks.
261 ///
262 /// This method allows to monitor the worker load, as it returns how many
263 /// workers are currently actively executing tasks.
264 ///
265 /// # Examples
266 ///
267 /// ```
268 /// use zrx_executor::strategy::{Strategy, WorkSharing};
269 ///
270 /// // Get number of running tasks
271 /// let strategy = WorkSharing::default();
272 /// assert_eq!(strategy.num_tasks_running(), 0);
273 /// ```
274 #[inline]
275 fn num_tasks_running(&self) -> usize {
276 self.running.load(Ordering::Relaxed)
277 }
278
279 /// Returns the number of pending tasks.
280 ///
281 /// This method allows to throttle the submission of tasks, as it returns
282 /// how many tasks are currently waiting to be executed.
283 ///
284 /// # Examples
285 ///
286 /// ```
287 /// use zrx_executor::strategy::{Strategy, WorkSharing};
288 ///
289 /// // Get number of pending tasks
290 /// let strategy = WorkSharing::default();
291 /// assert_eq!(strategy.num_tasks_pending(), 0);
292 /// ```
293 #[inline]
294 fn num_tasks_pending(&self) -> usize {
295 self.sender.as_ref().map_or(0, Sender::len)
296 }
297
298 /// Returns the capacity, if bounded.
299 ///
300 /// This method returns the maximum number of tasks that can be submitted
301 /// at once, which can be used by the strategy for applying backpressure.
302 ///
303 /// # Examples
304 ///
305 /// ```
306 /// use zrx_executor::strategy::{Strategy, WorkSharing};
307 ///
308 /// // Get capacity
309 /// let strategy = WorkSharing::default();
310 /// assert!(strategy.capacity() >= Some(strategy.num_workers()));
311 /// ```
312 #[inline]
313 fn capacity(&self) -> Option<usize> {
314 self.sender.as_ref().and_then(Sender::capacity)
315 }
316}
317
318// ----------------------------------------------------------------------------
319
320impl Default for WorkSharing {
321 /// Creates a work-sharing execution strategy using all CPUs - 1.
322 ///
323 /// The number of workers is determined by the number of logical CPUs minus
324 /// one, which reserves one core for the main thread for orchestration. If
325 /// the number of logical CPUs is fewer than 1, the strategy defaults to a
326 /// single worker thread.
327 ///
328 /// __Warning__: this method makes use of [`thread::available_parallelism`]
329 /// to determine the number of available cores, which has some limitations.
330 /// Please refer to the documentation of that function for more details, or
331 /// consider using [`num_cpus`][] as an alternative.
332 ///
333 /// [`num_cpus`]: https://crates.io/crates/num_cpus
334 ///
335 /// # Examples
336 ///
337 /// ```
338 /// use zrx_executor::strategy::WorkSharing;
339 ///
340 /// // Create strategy
341 /// let strategy = WorkSharing::default();
342 /// ```
343 #[inline]
344 fn default() -> Self {
345 Self::new(cmp::max(
346 thread::available_parallelism()
347 .map(|num| num.get().saturating_sub(1))
348 .unwrap_or(1),
349 1,
350 ))
351 }
352}
353
354impl Drop for WorkSharing {
355 /// Terminates and joins all worker threads.
356 ///
357 /// This method waits for all worker threads to finish executing currently
358 /// running tasks, while ignoring any pending tasks. All worker threads are
359 /// joined before the method returns. This is necessary to prevent worker
360 /// threads from running after the strategy has been dropped.
361 fn drop(&mut self) {
362 // Dropping the sender causes all receivers to terminate
363 if let Some(sender) = self.sender.take() {
364 drop(sender);
365 }
366
367 // Join all worker threads without panicking on errors
368 for handle in self.threads.drain(..) {
369 let _ = handle.join();
370 }
371 }
372}
373
374// ----------------------------------------------------------------------------
375
376impl fmt::Debug for WorkSharing {
377 /// Formats the execution strategy for debugging.
378 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
379 f.debug_struct("WorkSharing")
380 .field("workers", &self.num_workers())
381 .field("running", &self.num_tasks_running())
382 .field("pending", &self.num_tasks_pending())
383 .finish()
384 }
385}