Skip to main content

zrx_executor/executor/strategy/worker/
sharing.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Work-sharing execution strategy.
27
28use crossbeam::channel::{bounded, Sender};
29use std::fmt::{self, Debug};
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::thread::{self, Builder, JoinHandle};
33use std::{cmp, panic};
34
35use crate::executor::strategy::Strategy;
36use crate::executor::task::Task;
37use crate::executor::Result;
38
39// ----------------------------------------------------------------------------
40// Structs
41// ----------------------------------------------------------------------------
42
/// Work-sharing execution strategy.
///
/// This strategy manages its tasks centrally in a single bounded [`crossbeam`]
/// channel, from which workers pull tasks and execute them, repeating the
/// process until they are terminated. This is a very simple, yet reasonably
/// efficient strategy in most cases.
///
/// Tasks are processed in the exact same order they were submitted, albeit they
/// might not finish in the same order. As this strategy uses a bounded channel,
/// task submission might fail when the channel's capacity is reached, leading
/// to better performance characteristics due to the use of atomics over locks.
/// As an alternative, the [`WorkStealing`][] strategy can be used, which is
/// built on unbounded channels and allows for more flexible task submission,
/// including automatic load balancing between workers which is particularly
/// useful when tasks create subtasks.
///
/// [`WorkStealing`]: crate::executor::strategy::WorkStealing
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use zrx_executor::strategy::{Strategy, WorkSharing};
///
/// // Create strategy and submit task
/// let strategy = WorkSharing::default();
/// strategy.submit(Box::new(|| println!("Task")))?;
/// # Ok(())
/// # }
/// ```
pub struct WorkSharing {
    /// Task submission sender — `None` only after `Drop` has taken it.
    sender: Option<Sender<Box<dyn Task>>>,
    /// Join handles of worker threads.
    threads: Vec<JoinHandle<()>>,
    /// Counter for tasks that were submitted but have not finished executing.
    pending: Arc<AtomicUsize>,
}
82
83// ----------------------------------------------------------------------------
84// Implementations
85// ----------------------------------------------------------------------------
86
87impl WorkSharing {
88    /// Creates a work-sharing execution strategy.
89    ///
90    /// This method creates a strategy with the given number of worker threads,
91    /// which are spawned immediately before the method returns. Internally, a
92    /// bounded channel is created with a capacity of 8 tasks per worker, so
93    /// for 4 workers, the channel will have a capacity of 32 tasks.
94    ///
95    /// Use [`WorkSharing::with_capacity`] to set a custom capacity.
96    ///
97    /// # Panics
98    ///
99    /// Panics if thread creation fails.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use zrx_executor::strategy::WorkSharing;
105    ///
106    /// // Create strategy
107    /// let strategy = WorkSharing::new(4);
108    /// ```
109    #[must_use]
110    pub fn new(num_workers: usize) -> Self {
111        Self::with_capacity(num_workers, 8 * num_workers)
112    }
113
114    /// Creates a work-sharing execution strategy with the given capacity.
115    ///
116    /// This method creates a strategy with the given number of worker threads,
117    /// which are spawned immediately before the method returns.
118    ///
119    /// This strategy makes use of a bounded channel for its better performance
120    /// characteristics, since the caller is expected to have control over task
121    /// submission, ensuring that the executor can accept new tasks. The given
122    /// capacity sets the number of tasks the executor accepts before starting
123    /// to reject them, which can be used to apply backpressure. Note that the
124    /// capacity is not a per-worker, but a global per-executor limit.
125    ///
126    /// # Panics
127    ///
128    /// Panics if thread creation fails.
129    ///
130    /// # Examples
131    ///
132    /// ```
133    /// use zrx_executor::strategy::WorkSharing;
134    ///
135    /// // Create strategy with capacity
136    /// let strategy = WorkSharing::with_capacity(4, 64);
137    /// ```
138    #[must_use]
139    pub fn with_capacity(num_workers: usize, capacity: usize) -> Self {
140        let (sender, receiver) = bounded::<Box<dyn Task>>(capacity);
141
142        // Keep track of pending tasks
143        let pending = Arc::new(AtomicUsize::new(0));
144
145        // Initialize worker threads
146        let iter = (0..num_workers).map(|index| {
147            let receiver = receiver.clone();
148
149            // Create worker thread and poll the receiver until the sender is
150            // dropped, automatically exiting the loop. Additionally, we keep
151            // track of the number of pending tasks to provide a simple way to
152            // monitor the load of the thread pool.
153            let pending = Arc::clone(&pending);
154            let h = move || {
155                while let Ok(task) = receiver.recv() {
156                    // Execute task and immediately execute all subtasks on the
157                    // same worker, if any, as the work-sharing strategy has no
158                    // means of distributing work to other workers threads. We
159                    // also keep the pending count due to sequential execution,
160                    // and catch panics, as we're running user-land code that
161                    // might be sloppy. However, since the executor has no way
162                    // of reporting panics, tasks should wrap execution as we
163                    // do here, and abort with a proper error.
164                    let _ = panic::catch_unwind(|| {
165                        let subtasks = task.execute();
166                        if !subtasks.is_empty() {
167                            // Execution is recursive, so in case a subtask has
168                            // further subtasks, they are executed depth-first
169                            subtasks.execute();
170                        }
171                    });
172
173                    // Update number of pending tasks
174                    pending.fetch_sub(1, Ordering::Acquire);
175                }
176            };
177
178            // We deliberately use unwrap here, as the capability to spawn
179            // threads is a fundamental requirement of the executor
180            Builder::new()
181                .name(format!("zrx/executor/{}", index + 1))
182                .spawn(h)
183                .unwrap()
184        });
185
186        // Create worker threads and return strategy
187        let threads = iter.collect();
188        Self {
189            sender: Some(sender),
190            threads,
191            pending,
192        }
193    }
194}
195
196// ----------------------------------------------------------------------------
197// Trait implementations
198// ----------------------------------------------------------------------------
199
200impl Strategy for WorkSharing {
201    /// Submits a task.
202    ///
203    /// This method submits a [`Task`], which is executed by one of the worker
204    /// threads as soon as possible. If a task computes a result, a [`Sender`][]
205    /// can be shared with the task, to send the result back to the caller,
206    /// which can then poll a [`Receiver`][].
207    ///
208    /// Note that tasks are intended to only run once, which is why they are
209    /// consumed. If a task needs to be run multiple times, it must be wrapped
210    /// in a closure that creates a new task each time. This allows for safe
211    /// sharing of state between tasks.
212    ///
213    /// [`Receiver`]: crossbeam::channel::Receiver
214    /// [`Sender`]: crossbeam::channel::Sender
215    ///
216    /// # Errors
217    ///
218    /// If the task cannot be submitted, [`Error::Submit`][] is returned, which
219    /// can only happen if the channel is disconnected or at capacity.
220    ///
221    /// [`Error::Submit`]: crate::executor::Error::Submit
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// # use std::error::Error;
227    /// # fn main() -> Result<(), Box<dyn Error>> {
228    /// use zrx_executor::strategy::{Strategy, WorkSharing};
229    ///
230    /// // Create strategy and submit task
231    /// let strategy = WorkSharing::default();
232    /// strategy.submit(Box::new(|| println!("Task")))?;
233    /// # Ok(())
234    /// # }
235    /// ```
236    fn submit(&self, task: Box<dyn Task>) -> Result {
237        // We track the total number of pending tasks, and when queried for the
238        // number of running tasks, compute those dynamically. Otherwise, there
239        // is the possibility of a race condition, where the task is acquired
240        // by a worker before the running count can be incremented.
241        self.pending.fetch_add(1, Ordering::Release);
242
243        // Submit the task to the sender
244        match self.sender.as_ref() {
245            Some(sender) => Ok(sender.try_send(task)?),
246            None => unreachable!(),
247        }
248    }
249
250    /// Returns the number of workers.
251    ///
252    /// # Examples
253    ///
254    /// ```
255    /// use zrx_executor::strategy::{Strategy, WorkSharing};
256    ///
257    /// // Get number of workers
258    /// let strategy = WorkSharing::new(1);
259    /// assert_eq!(strategy.num_workers(), 1);
260    /// ```
261    #[inline]
262    fn num_workers(&self) -> usize {
263        self.threads.len()
264    }
265
266    /// Returns the number of running tasks.
267    ///
268    /// This method allows to monitor the worker load, as it returns how many
269    /// workers are currently actively executing tasks.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use zrx_executor::strategy::{Strategy, WorkSharing};
275    ///
276    /// // Get number of running tasks
277    /// let strategy = WorkSharing::default();
278    /// assert_eq!(strategy.num_tasks_running(), 0);
279    /// ```
280    #[inline]
281    fn num_tasks_running(&self) -> usize {
282        self.num_tasks_pending() - self.sender.as_ref().map_or(0, Sender::len)
283    }
284
285    /// Returns the number of pending tasks.
286    ///
287    /// This method allows to throttle the submission of tasks, as it returns
288    /// how many tasks are currently waiting to be executed.
289    ///
290    /// # Examples
291    ///
292    /// ```
293    /// use zrx_executor::strategy::{Strategy, WorkSharing};
294    ///
295    /// // Get number of pending tasks
296    /// let strategy = WorkSharing::default();
297    /// assert_eq!(strategy.num_tasks_pending(), 0);
298    /// ```
299    #[inline]
300    fn num_tasks_pending(&self) -> usize {
301        self.pending.load(Ordering::Relaxed)
302    }
303
304    /// Returns the capacity.
305    ///
306    /// This method returns the maximum number of tasks that can be submitted
307    /// at once, which can be used by the strategy for applying backpressure.
308    ///
309    /// # Examples
310    ///
311    /// ```
312    /// use zrx_executor::strategy::{Strategy, WorkSharing};
313    ///
314    /// // Get capacity
315    /// let strategy = WorkSharing::default();
316    /// assert!(strategy.capacity() >= strategy.num_workers());
317    /// ```
318    #[inline]
319    fn capacity(&self) -> usize {
320        self.sender
321            .as_ref()
322            .and_then(Sender::capacity)
323            .unwrap_or_default()
324    }
325}
326
327// ----------------------------------------------------------------------------
328
329impl Default for WorkSharing {
330    /// Creates a work-sharing execution strategy using all CPUs - 1.
331    ///
332    /// The number of workers is determined by the number of logical CPUs minus
333    /// one, which reserves one core for the main thread for orchestration. If
334    /// the number of logical CPUs is fewer than 1, the strategy defaults to a
335    /// single worker thread.
336    ///
337    /// __Warning__: this method makes use of [`thread::available_parallelism`]
338    /// to determine the number of available cores, which has some limitations.
339    /// Please refer to the documentation of that function for more details, or
340    /// consider using [`num_cpus`][] as an alternative.
341    ///
342    /// [`num_cpus`]: https://crates.io/crates/num_cpus
343    ///
344    /// # Examples
345    ///
346    /// ```
347    /// use zrx_executor::strategy::WorkSharing;
348    ///
349    /// // Create strategy
350    /// let strategy = WorkSharing::default();
351    /// ```
352    #[inline]
353    fn default() -> Self {
354        Self::new(cmp::max(
355            thread::available_parallelism()
356                .map(|num| num.get().saturating_sub(1))
357                .unwrap_or(1),
358            1,
359        ))
360    }
361}
362
impl Drop for WorkSharing {
    /// Terminates and joins all worker threads.
    ///
    /// Dropping the sender disconnects the channel, which terminates the
    /// worker loops. Note that disconnection does not discard buffered
    /// messages: per crossbeam's semantics, workers keep receiving tasks that
    /// were already queued until the channel is empty, and only then exit.
    /// All worker threads are joined before the method returns, which is
    /// necessary to prevent worker threads from running after the strategy
    /// has been dropped.
    fn drop(&mut self) {
        // Dropping the sender causes all receivers to terminate once the
        // remaining buffered tasks have been drained
        if let Some(sender) = self.sender.take() {
            drop(sender);
        }

        // Join all worker threads without panicking on errors
        for handle in self.threads.drain(..) {
            let _ = handle.join();
        }
    }
}
382
383// ----------------------------------------------------------------------------
384
385impl Debug for WorkSharing {
386    /// Formats the execution strategy for debugging.
387    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388        f.debug_struct("WorkSharing")
389            .field("workers", &self.num_workers())
390            .field("running", &self.num_tasks_running())
391            .field("pending", &self.num_tasks_pending())
392            .finish()
393    }
394}