zrx_executor/executor/strategy/worker/
sharing.rs

1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Work-sharing execution strategy.
27
28use crossbeam::channel::{bounded, Sender};
29use std::sync::atomic::{AtomicUsize, Ordering};
30use std::sync::Arc;
31use std::thread::{self, Builder, JoinHandle};
32use std::{cmp, fmt, panic};
33
34use crate::executor::strategy::Strategy;
35use crate::executor::task::Task;
36use crate::executor::Result;
37
38// ----------------------------------------------------------------------------
39// Structs
40// ----------------------------------------------------------------------------
41
/// Work-sharing execution strategy.
///
/// This strategy manages its tasks centrally in a single bounded [`crossbeam`]
/// channel, from which the workers pull tasks and execute them, repeating the
/// process until they are terminated. This is a very simple, yet reasonably
/// efficient strategy in most cases.
///
/// Tasks are processed in the exact same order they were submitted, although
/// they might not finish in the same order. As this strategy uses a bounded
/// channel, task submission might fail when the channel's capacity is reached,
/// leading to better performance characteristics due to the use of atomics
/// over locks. As an alternative, the [`WorkStealing`][] strategy can be used,
/// which is built on unbounded channels and allows for more flexible task
/// submission, including automatic load balancing between workers which is
/// particularly useful when tasks create subtasks.
///
/// [`WorkStealing`]: crate::executor::strategy::WorkStealing
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use zrx_executor::strategy::{Strategy, WorkSharing};
///
/// // Create strategy and submit task
/// let strategy = WorkSharing::default();
/// strategy.submit(Box::new(|| println!("Task")))?;
/// # Ok(())
/// # }
/// ```
pub struct WorkSharing {
    /// Task submission sender — `Some` for the entire lifetime of the
    /// strategy; only taken out in `Drop` to disconnect the workers.
    sender: Option<Sender<Box<dyn Task>>>,
    /// Join handles of worker threads.
    threads: Vec<JoinHandle<()>>,
    /// Counter for tasks currently being executed, shared with all workers.
    running: Arc<AtomicUsize>,
}
81
82// ----------------------------------------------------------------------------
83// Implementations
84// ----------------------------------------------------------------------------
85
86impl WorkSharing {
87    /// Creates a work-sharing execution strategy.
88    ///
89    /// This method creates a strategy with the given number of worker threads,
90    /// which are spawned immediately before the method returns. Internally, a
91    /// bounded channel is created with a capacity of 8 tasks per worker, so
92    /// for 4 workers, the channel will have a capacity of 32 tasks.
93    ///
94    /// Use [`WorkSharing::with_capacity`] to set a custom capacity.
95    ///
96    /// # Panics
97    ///
98    /// Panics if thread creation fails.
99    ///
100    /// # Examples
101    ///
102    /// ```
103    /// use zrx_executor::strategy::WorkSharing;
104    ///
105    /// // Create strategy
106    /// let strategy = WorkSharing::new(4);
107    /// ```
108    #[must_use]
109    pub fn new(num_workers: usize) -> Self {
110        Self::with_capacity(num_workers, 8 * num_workers)
111    }
112
113    /// Creates a work-sharing execution strategy with the given capacity.
114    ///
115    /// This method creates a strategy with the given number of worker threads,
116    /// which are spawned immediately before the method returns.
117    ///
118    /// This strategy makes use of a bounded channel for its better performance
119    /// characteristics, since the caller is expected to have control over task
120    /// submission, ensuring that the executor can accept new tasks. The given
121    /// capacity sets the number of tasks the executor accepts before starting
122    /// to reject them, which can be used to apply backpressure. Note that the
123    /// capacity is not a per-worker, but a global per-executor limit.
124    ///
125    /// # Panics
126    ///
127    /// Panics if thread creation fails.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use zrx_executor::strategy::WorkSharing;
133    ///
134    /// // Create strategy with capacity
135    /// let strategy = WorkSharing::with_capacity(4, 64);
136    /// ```
137    #[must_use]
138    pub fn with_capacity(num_workers: usize, capacity: usize) -> Self {
139        let (sender, receiver) = bounded::<Box<dyn Task>>(capacity);
140
141        // Keep track of running tasks
142        let running = Arc::new(AtomicUsize::new(0));
143
144        // Initialize worker threads
145        let iter = (0..num_workers).map(|index| {
146            let receiver = receiver.clone();
147
148            // Create worker thread and poll the receiver until the sender is
149            // dropped, automatically exiting the loop. Additionally, we keep
150            // track of the number of running tasks to provide a simple way to
151            // monitor the load of the thread pool.
152            let running = Arc::clone(&running);
153            let h = move || {
154                while let Ok(task) = receiver.recv() {
155                    running.fetch_add(1, Ordering::Release);
156
157                    // Execute task and immediately execute all subtasks on the
158                    // same worker, if any, as the work-sharing strategy has no
159                    // means of distributing work to other workers threads. We
160                    // also keep the running count due to sequential execution,
161                    // and catch panics, as we're running user-land code that
162                    // might be sloppy. However, since the executor has no way
163                    // of reporting panics, tasks should wrap execution as we
164                    // do here, and abort with a proper error.
165                    let _ = panic::catch_unwind(|| {
166                        let subtasks = task.execute();
167                        if !subtasks.is_empty() {
168                            // Execution is recursive, so in case a subtask has
169                            // further subtasks, they are executed depth-first
170                            subtasks.execute();
171                        }
172                    });
173
174                    // Update number of running tasks
175                    running.fetch_sub(1, Ordering::Acquire);
176                }
177            };
178
179            // We deliberately use unwrap here, as the capability to spawn
180            // threads is a fundamental requirement of the executor
181            Builder::new()
182                .name(format!("zrx/executor/{}", index + 1))
183                .spawn(h)
184                .unwrap()
185        });
186
187        // Create worker threads and return strategy
188        let threads = iter.collect();
189        Self {
190            sender: Some(sender),
191            threads,
192            running,
193        }
194    }
195}
196
197// ----------------------------------------------------------------------------
198// Trait implementations
199// ----------------------------------------------------------------------------
200
201impl Strategy for WorkSharing {
202    /// Submits a task.
203    ///
204    /// This method submits a [`Task`], which is executed by one of the worker
205    /// threads as soon as possible. If a task computes a result, a [`Sender`][]
206    /// can be shared with the task, to send the result back to the caller,
207    /// which can then poll a [`Receiver`][].
208    ///
209    /// Note that tasks are intended to only run once, which is why they are
210    /// consumed. If a task needs to be run multiple times, it must be wrapped
211    /// in a closure that creates a new task each time. This allows for safe
212    /// sharing of state between tasks.
213    ///
214    /// [`Receiver`]: crossbeam::channel::Receiver
215    /// [`Sender`]: crossbeam::channel::Sender
216    ///
217    /// # Errors
218    ///
219    /// If the task cannot be submitted, [`Error::Submit`][] is returned, which
220    /// can only happen if the channel is disconnected or at capacity.
221    ///
222    /// [`Error::Submit`]: crate::executor::Error::Submit
223    ///
224    /// # Examples
225    ///
226    /// ```
227    /// # use std::error::Error;
228    /// # fn main() -> Result<(), Box<dyn Error>> {
229    /// use zrx_executor::strategy::{Strategy, WorkSharing};
230    ///
231    /// // Create strategy and submit task
232    /// let strategy = WorkSharing::default();
233    /// strategy.submit(Box::new(|| println!("Task")))?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    fn submit(&self, task: Box<dyn Task>) -> Result {
238        match self.sender.as_ref() {
239            Some(sender) => Ok(sender.try_send(task)?),
240            None => unreachable!(),
241        }
242    }
243
244    /// Returns the number of workers.
245    ///
246    /// # Examples
247    ///
248    /// ```
249    /// use zrx_executor::strategy::{Strategy, WorkSharing};
250    ///
251    /// // Get number of workers
252    /// let strategy = WorkSharing::new(1);
253    /// assert_eq!(strategy.num_workers(), 1);
254    /// ```
255    #[inline]
256    fn num_workers(&self) -> usize {
257        self.threads.len()
258    }
259
260    /// Returns the number of running tasks.
261    ///
262    /// This method allows to monitor the worker load, as it returns how many
263    /// workers are currently actively executing tasks.
264    ///
265    /// # Examples
266    ///
267    /// ```
268    /// use zrx_executor::strategy::{Strategy, WorkSharing};
269    ///
270    /// // Get number of running tasks
271    /// let strategy = WorkSharing::default();
272    /// assert_eq!(strategy.num_tasks_running(), 0);
273    /// ```
274    #[inline]
275    fn num_tasks_running(&self) -> usize {
276        self.running.load(Ordering::Relaxed)
277    }
278
279    /// Returns the number of pending tasks.
280    ///
281    /// This method allows to throttle the submission of tasks, as it returns
282    /// how many tasks are currently waiting to be executed.
283    ///
284    /// # Examples
285    ///
286    /// ```
287    /// use zrx_executor::strategy::{Strategy, WorkSharing};
288    ///
289    /// // Get number of pending tasks
290    /// let strategy = WorkSharing::default();
291    /// assert_eq!(strategy.num_tasks_pending(), 0);
292    /// ```
293    #[inline]
294    fn num_tasks_pending(&self) -> usize {
295        self.sender.as_ref().map_or(0, Sender::len)
296    }
297
298    /// Returns the capacity, if bounded.
299    ///
300    /// This method returns the maximum number of tasks that can be submitted
301    /// at once, which can be used by the strategy for applying backpressure.
302    ///
303    /// # Examples
304    ///
305    /// ```
306    /// use zrx_executor::strategy::{Strategy, WorkSharing};
307    ///
308    /// // Get capacity
309    /// let strategy = WorkSharing::default();
310    /// assert!(strategy.capacity() >= Some(strategy.num_workers()));
311    /// ```
312    #[inline]
313    fn capacity(&self) -> Option<usize> {
314        self.sender.as_ref().and_then(Sender::capacity)
315    }
316}
317
318// ----------------------------------------------------------------------------
319
320impl Default for WorkSharing {
321    /// Creates a work-sharing execution strategy using all CPUs - 1.
322    ///
323    /// The number of workers is determined by the number of logical CPUs minus
324    /// one, which reserves one core for the main thread for orchestration. If
325    /// the number of logical CPUs is fewer than 1, the strategy defaults to a
326    /// single worker thread.
327    ///
328    /// __Warning__: this method makes use of [`thread::available_parallelism`]
329    /// to determine the number of available cores, which has some limitations.
330    /// Please refer to the documentation of that function for more details, or
331    /// consider using [`num_cpus`][] as an alternative.
332    ///
333    /// [`num_cpus`]: https://crates.io/crates/num_cpus
334    ///
335    /// # Examples
336    ///
337    /// ```
338    /// use zrx_executor::strategy::WorkSharing;
339    ///
340    /// // Create strategy
341    /// let strategy = WorkSharing::default();
342    /// ```
343    #[inline]
344    fn default() -> Self {
345        Self::new(cmp::max(
346            thread::available_parallelism()
347                .map(|num| num.get().saturating_sub(1))
348                .unwrap_or(1),
349            1,
350        ))
351    }
352}
353
354impl Drop for WorkSharing {
355    /// Terminates and joins all worker threads.
356    ///
357    /// This method waits for all worker threads to finish executing currently
358    /// running tasks, while ignoring any pending tasks. All worker threads are
359    /// joined before the method returns. This is necessary to prevent worker
360    /// threads from running after the strategy has been dropped.
361    fn drop(&mut self) {
362        // Dropping the sender causes all receivers to terminate
363        if let Some(sender) = self.sender.take() {
364            drop(sender);
365        }
366
367        // Join all worker threads without panicking on errors
368        for handle in self.threads.drain(..) {
369            let _ = handle.join();
370        }
371    }
372}
373
374// ----------------------------------------------------------------------------
375
376impl fmt::Debug for WorkSharing {
377    /// Formats the execution strategy for debugging.
378    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
379        f.debug_struct("WorkSharing")
380            .field("workers", &self.num_workers())
381            .field("running", &self.num_tasks_running())
382            .field("pending", &self.num_tasks_pending())
383            .finish()
384    }
385}