// zrx_executor/executor/strategy/worker/sharing.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Work-sharing execution strategy.
27
28use crossbeam::channel::{bounded, Sender};
29use std::fmt::{self, Debug};
30use std::sync::atomic::{AtomicUsize, Ordering};
31use std::sync::Arc;
32use std::thread::{self, Builder, JoinHandle};
33use std::{cmp, panic};
34
35use crate::executor::strategy::Strategy;
36use crate::executor::task::Task;
37use crate::executor::Result;
38
39// ----------------------------------------------------------------------------
40// Structs
41// ----------------------------------------------------------------------------
42
/// Work-sharing execution strategy.
///
/// This strategy manages its tasks centrally in a single bounded [`crossbeam`]
/// channel, from which workers pull tasks and execute them, repeating the
/// process until they are terminated. This is a very simple, yet reasonably
/// efficient strategy in most cases.
///
/// Tasks are processed in the exact same order they were submitted, albeit they
/// might not finish in the same order. As this strategy uses a bounded channel,
/// task submission might fail when the channel's capacity is reached; in turn,
/// bounded channels offer better performance characteristics due to the use of
/// atomics over locks. As an alternative, the [`WorkStealing`][] strategy can
/// be used, which is built on unbounded channels and allows for more flexible
/// task submission, including automatic load balancing between workers, which
/// is particularly useful when tasks create subtasks.
///
/// [`WorkStealing`]: crate::executor::strategy::WorkStealing
///
/// # Examples
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use zrx_executor::strategy::{Strategy, WorkSharing};
///
/// // Create strategy and submit task
/// let strategy = WorkSharing::default();
/// strategy.submit(Box::new(|| println!("Task")))?;
/// # Ok(())
/// # }
/// ```
pub struct WorkSharing {
    /// Task submission sender — `Some` for the entire lifetime of the
    /// strategy, only taken (set to `None`) on drop to disconnect workers.
    sender: Option<Sender<Box<dyn Task>>>,
    /// Join handles of worker threads, joined on drop.
    threads: Vec<JoinHandle<()>>,
    /// Counter for pending tasks, shared with all worker threads.
    pending: Arc<AtomicUsize>,
}
82
83// ----------------------------------------------------------------------------
84// Implementations
85// ----------------------------------------------------------------------------
86
87impl WorkSharing {
88 /// Creates a work-sharing execution strategy.
89 ///
90 /// This method creates a strategy with the given number of worker threads,
91 /// which are spawned immediately before the method returns. Internally, a
92 /// bounded channel is created with a capacity of 8 tasks per worker, so
93 /// for 4 workers, the channel will have a capacity of 32 tasks.
94 ///
95 /// Use [`WorkSharing::with_capacity`] to set a custom capacity.
96 ///
97 /// # Panics
98 ///
99 /// Panics if thread creation fails.
100 ///
101 /// # Examples
102 ///
103 /// ```
104 /// use zrx_executor::strategy::WorkSharing;
105 ///
106 /// // Create strategy
107 /// let strategy = WorkSharing::new(4);
108 /// ```
109 #[must_use]
110 pub fn new(num_workers: usize) -> Self {
111 Self::with_capacity(num_workers, 8 * num_workers)
112 }
113
114 /// Creates a work-sharing execution strategy with the given capacity.
115 ///
116 /// This method creates a strategy with the given number of worker threads,
117 /// which are spawned immediately before the method returns.
118 ///
119 /// This strategy makes use of a bounded channel for its better performance
120 /// characteristics, since the caller is expected to have control over task
121 /// submission, ensuring that the executor can accept new tasks. The given
122 /// capacity sets the number of tasks the executor accepts before starting
123 /// to reject them, which can be used to apply backpressure. Note that the
124 /// capacity is not a per-worker, but a global per-executor limit.
125 ///
126 /// # Panics
127 ///
128 /// Panics if thread creation fails.
129 ///
130 /// # Examples
131 ///
132 /// ```
133 /// use zrx_executor::strategy::WorkSharing;
134 ///
135 /// // Create strategy with capacity
136 /// let strategy = WorkSharing::with_capacity(4, 64);
137 /// ```
138 #[must_use]
139 pub fn with_capacity(num_workers: usize, capacity: usize) -> Self {
140 let (sender, receiver) = bounded::<Box<dyn Task>>(capacity);
141
142 // Keep track of pending tasks
143 let pending = Arc::new(AtomicUsize::new(0));
144
145 // Initialize worker threads
146 let iter = (0..num_workers).map(|index| {
147 let receiver = receiver.clone();
148
149 // Create worker thread and poll the receiver until the sender is
150 // dropped, automatically exiting the loop. Additionally, we keep
151 // track of the number of pending tasks to provide a simple way to
152 // monitor the load of the thread pool.
153 let pending = Arc::clone(&pending);
154 let h = move || {
155 while let Ok(task) = receiver.recv() {
156 // Execute task and immediately execute all subtasks on the
157 // same worker, if any, as the work-sharing strategy has no
158 // means of distributing work to other workers threads. We
159 // also keep the pending count due to sequential execution,
160 // and catch panics, as we're running user-land code that
161 // might be sloppy. However, since the executor has no way
162 // of reporting panics, tasks should wrap execution as we
163 // do here, and abort with a proper error.
164 let _ = panic::catch_unwind(|| {
165 let subtasks = task.execute();
166 if !subtasks.is_empty() {
167 // Execution is recursive, so in case a subtask has
168 // further subtasks, they are executed depth-first
169 subtasks.execute();
170 }
171 });
172
173 // Update number of pending tasks
174 pending.fetch_sub(1, Ordering::Acquire);
175 }
176 };
177
178 // We deliberately use unwrap here, as the capability to spawn
179 // threads is a fundamental requirement of the executor
180 Builder::new()
181 .name(format!("zrx/executor/{}", index + 1))
182 .spawn(h)
183 .unwrap()
184 });
185
186 // Create worker threads and return strategy
187 let threads = iter.collect();
188 Self {
189 sender: Some(sender),
190 threads,
191 pending,
192 }
193 }
194}
195
196// ----------------------------------------------------------------------------
197// Trait implementations
198// ----------------------------------------------------------------------------
199
200impl Strategy for WorkSharing {
201 /// Submits a task.
202 ///
203 /// This method submits a [`Task`], which is executed by one of the worker
204 /// threads as soon as possible. If a task computes a result, a [`Sender`][]
205 /// can be shared with the task, to send the result back to the caller,
206 /// which can then poll a [`Receiver`][].
207 ///
208 /// Note that tasks are intended to only run once, which is why they are
209 /// consumed. If a task needs to be run multiple times, it must be wrapped
210 /// in a closure that creates a new task each time. This allows for safe
211 /// sharing of state between tasks.
212 ///
213 /// [`Receiver`]: crossbeam::channel::Receiver
214 /// [`Sender`]: crossbeam::channel::Sender
215 ///
216 /// # Errors
217 ///
218 /// If the task cannot be submitted, [`Error::Submit`][] is returned, which
219 /// can only happen if the channel is disconnected or at capacity.
220 ///
221 /// [`Error::Submit`]: crate::executor::Error::Submit
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// # use std::error::Error;
227 /// # fn main() -> Result<(), Box<dyn Error>> {
228 /// use zrx_executor::strategy::{Strategy, WorkSharing};
229 ///
230 /// // Create strategy and submit task
231 /// let strategy = WorkSharing::default();
232 /// strategy.submit(Box::new(|| println!("Task")))?;
233 /// # Ok(())
234 /// # }
235 /// ```
236 fn submit(&self, task: Box<dyn Task>) -> Result {
237 // We track the total number of pending tasks, and when queried for the
238 // number of running tasks, compute those dynamically. Otherwise, there
239 // is the possibility of a race condition, where the task is acquired
240 // by a worker before the running count can be incremented.
241 self.pending.fetch_add(1, Ordering::Release);
242
243 // Submit the task to the sender
244 match self.sender.as_ref() {
245 Some(sender) => Ok(sender.try_send(task)?),
246 None => unreachable!(),
247 }
248 }
249
250 /// Returns the number of workers.
251 ///
252 /// # Examples
253 ///
254 /// ```
255 /// use zrx_executor::strategy::{Strategy, WorkSharing};
256 ///
257 /// // Get number of workers
258 /// let strategy = WorkSharing::new(1);
259 /// assert_eq!(strategy.num_workers(), 1);
260 /// ```
261 #[inline]
262 fn num_workers(&self) -> usize {
263 self.threads.len()
264 }
265
266 /// Returns the number of running tasks.
267 ///
268 /// This method allows to monitor the worker load, as it returns how many
269 /// workers are currently actively executing tasks.
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// use zrx_executor::strategy::{Strategy, WorkSharing};
275 ///
276 /// // Get number of running tasks
277 /// let strategy = WorkSharing::default();
278 /// assert_eq!(strategy.num_tasks_running(), 0);
279 /// ```
280 #[inline]
281 fn num_tasks_running(&self) -> usize {
282 self.num_tasks_pending() - self.sender.as_ref().map_or(0, Sender::len)
283 }
284
285 /// Returns the number of pending tasks.
286 ///
287 /// This method allows to throttle the submission of tasks, as it returns
288 /// how many tasks are currently waiting to be executed.
289 ///
290 /// # Examples
291 ///
292 /// ```
293 /// use zrx_executor::strategy::{Strategy, WorkSharing};
294 ///
295 /// // Get number of pending tasks
296 /// let strategy = WorkSharing::default();
297 /// assert_eq!(strategy.num_tasks_pending(), 0);
298 /// ```
299 #[inline]
300 fn num_tasks_pending(&self) -> usize {
301 self.pending.load(Ordering::Relaxed)
302 }
303
304 /// Returns the capacity.
305 ///
306 /// This method returns the maximum number of tasks that can be submitted
307 /// at once, which can be used by the strategy for applying backpressure.
308 ///
309 /// # Examples
310 ///
311 /// ```
312 /// use zrx_executor::strategy::{Strategy, WorkSharing};
313 ///
314 /// // Get capacity
315 /// let strategy = WorkSharing::default();
316 /// assert!(strategy.capacity() >= strategy.num_workers());
317 /// ```
318 #[inline]
319 fn capacity(&self) -> usize {
320 self.sender
321 .as_ref()
322 .and_then(Sender::capacity)
323 .unwrap_or_default()
324 }
325}
326
327// ----------------------------------------------------------------------------
328
329impl Default for WorkSharing {
330 /// Creates a work-sharing execution strategy using all CPUs - 1.
331 ///
332 /// The number of workers is determined by the number of logical CPUs minus
333 /// one, which reserves one core for the main thread for orchestration. If
334 /// the number of logical CPUs is fewer than 1, the strategy defaults to a
335 /// single worker thread.
336 ///
337 /// __Warning__: this method makes use of [`thread::available_parallelism`]
338 /// to determine the number of available cores, which has some limitations.
339 /// Please refer to the documentation of that function for more details, or
340 /// consider using [`num_cpus`][] as an alternative.
341 ///
342 /// [`num_cpus`]: https://crates.io/crates/num_cpus
343 ///
344 /// # Examples
345 ///
346 /// ```
347 /// use zrx_executor::strategy::WorkSharing;
348 ///
349 /// // Create strategy
350 /// let strategy = WorkSharing::default();
351 /// ```
352 #[inline]
353 fn default() -> Self {
354 Self::new(cmp::max(
355 thread::available_parallelism()
356 .map(|num| num.get().saturating_sub(1))
357 .unwrap_or(1),
358 1,
359 ))
360 }
361}
362
363impl Drop for WorkSharing {
364 /// Terminates and joins all worker threads.
365 ///
366 /// This method waits for all worker threads to finish executing currently
367 /// running tasks, while ignoring any pending tasks. All worker threads are
368 /// joined before the method returns. This is necessary to prevent worker
369 /// threads from running after the strategy has been dropped.
370 fn drop(&mut self) {
371 // Dropping the sender causes all receivers to terminate
372 if let Some(sender) = self.sender.take() {
373 drop(sender);
374 }
375
376 // Join all worker threads without panicking on errors
377 for handle in self.threads.drain(..) {
378 let _ = handle.join();
379 }
380 }
381}
382
383// ----------------------------------------------------------------------------
384
385impl Debug for WorkSharing {
386 /// Formats the execution strategy for debugging.
387 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388 f.debug_struct("WorkSharing")
389 .field("workers", &self.num_workers())
390 .field("running", &self.num_tasks_running())
391 .field("pending", &self.num_tasks_pending())
392 .finish()
393 }
394}