Skip to main content

zrx_executor/
executor.rs

1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Executor.
27
28use std::sync::Arc;
29use std::thread;
30use std::time::Duration;
31
32mod error;
33mod signal;
34pub mod strategy;
35pub mod task;
36
37pub use error::{Error, Result};
38use strategy::{Strategy, WorkSharing};
39use task::Task;
40
41// ----------------------------------------------------------------------------
42// Structs
43// ----------------------------------------------------------------------------
44
45/// Executor.
46///
47/// Executors serve as the primary interface for submitting and monitoring tasks
48/// within the system. They act as a frontend to various execution [`Strategy`]
49/// implementations, which define how tasks are prioritized and executed. Each
50/// execution [`Strategy`] encapsulates an implementation that determines the
51/// order and concurrency of execution. Abstracting the execution mechanism
52/// allows for flexible and interchangeable task management strategies.
53///
54/// Additionally, executors implement [`Clone`], which allows to easily share
55/// them among different parts of the system without borrowing issues.
56///
57/// Note that executors are not responsible for managing the lifetime of tasks,
58/// as it is assumed that tasks are self-contained and can be run independently.
59/// If a [`Task`] is submitted to an executor, it can't be cancelled or stopped,
60/// as the executor is not aware of the task's internal state. However, callers
61/// can implement fine-grained execution strategies on top of the executor to
62/// gain fine-grained control over task execution.
63///
64/// This is an opinionated implementation that specifically targets the needs of
65/// our execution model. It is not meant to be a general-purpose executor.
66///
67/// # Examples
68///
69/// Create an executor spawning 8 tasks using all CPUs - 1:
70///
71/// ```
72/// # use std::error::Error;
73/// # fn main() -> Result<(), Box<dyn Error>> {
74/// use std::thread;
75/// use std::time::Duration;
76/// use zrx_executor::Executor;
77///
78/// // Create executor
79/// let executor = Executor::default();
80///
81/// // Create tasks up to the executor's capacity
82/// for _ in 0..executor.capacity() {
83///     executor.submit(|| {
84///         thread::sleep(Duration::from_millis(20));
85///     })?;
86/// }
87/// # Ok(())
88/// # }
89/// ```
90#[derive(Debug)]
91pub struct Executor<S>
92where
93    S: Strategy,
94{
95    // Execution strategy.
96    strategy: Arc<S>,
97}
98
99// ----------------------------------------------------------------------------
100// Implementations
101// ----------------------------------------------------------------------------
102
103impl<S> Executor<S>
104where
105    S: Strategy,
106{
107    /// Creates an executor.
108    ///
109    /// # Examples
110    ///
111    /// ```
112    /// use zrx_executor::strategy::WorkSharing;
113    /// use zrx_executor::Executor;
114    ///
115    /// // Create executor with strategy
116    /// let executor = Executor::new(WorkSharing::default());
117    /// ```
118    #[must_use]
119    pub fn new(strategy: S) -> Self {
120        Self { strategy: Arc::new(strategy) }
121    }
122
123    /// Submits a task.
124    ///
125    /// This method submits a [`Task`], which is executed by one of the worker
126    /// threads as soon as possible. If a task computes a result, a [`Sender`][]
127    /// can be shared with the task, to send the result back to the caller,
128    /// which can then poll a [`Receiver`][].
129    ///
130    /// Note that tasks are intended to only run once, which is why they are
131    /// consumed. If a task needs to be run multiple times, it must be wrapped
132    /// in a closure that creates a new task each time. This allows for safe
133    /// sharing of state between tasks.
134    ///
135    /// [`Receiver`]: crossbeam::channel::Receiver
136    /// [`Sender`]: crossbeam::channel::Sender
137    ///
138    /// # Errors
139    ///
140    /// If the executor encounters a problem during task submission, it will
141    /// forward the encountered error to the caller, returning the task. Most
142    /// likely, the underlying execution strategy is at capacity, which means
143    /// the caller should resubmit the task at a later time. This is possible,
144    /// since this method accepts any type that implements the [`Task`] trait
145    /// and converts it into a boxed task.
146    ///
147    /// # Examples
148    ///
149    /// Submit a task:
150    ///
151    /// ```
152    /// # use std::error::Error;
153    /// # fn main() -> Result<(), Box<dyn Error>> {
154    /// use zrx_executor::Executor;
155    ///
156    /// // Create executor and submit task
157    /// let executor = Executor::default();
158    /// executor.submit(|| println!("Task"))?;
159    /// # Ok(())
160    /// # }
161    /// ```
162    ///
163    /// Submit a task returning subtasks:
164    ///
165    /// ```
166    /// # use std::error::Error;
167    /// # fn main() -> Result<(), Box<dyn Error>> {
168    /// use zrx_executor::Executor;
169    ///
170    /// // Create executor and submit task
171    /// let executor = Executor::default();
172    /// executor.submit(|| {
173    ///     println!("Task 1");
174    ///     || {
175    ///         println!("Task 1.1");
176    ///         || {
177    ///             println!("Task 1.1.1");
178    ///         }
179    ///     }
180    /// })?;
181    /// # Ok(())
182    /// # }
183    /// ```
184    ///
185    /// Submit a task returning a task collection:
186    ///
187    /// ```
188    /// # use std::error::Error;
189    /// # fn main() -> Result<(), Box<dyn Error>> {
190    /// use zrx_executor::task::Tasks;
191    /// use zrx_executor::Executor;
192    ///
193    /// // Create executor and submit task
194    /// let executor = Executor::default();
195    /// executor.submit(|| {
196    ///     println!("Task 1");
197    ///
198    ///     // Create subtasks
199    ///     let mut tasks = Tasks::new();
200    ///     tasks.add(|| println!("Task 1.1"));
201    ///     tasks.add(|| println!("Task 1.2"));
202    ///     tasks.add(|| println!("Task 1.3"));
203    ///     tasks
204    /// })?;
205    /// # Ok(())
206    /// # }
207    /// ```
208    #[inline]
209    pub fn submit<T>(&self, task: T) -> Result
210    where
211        T: Into<Box<dyn Task>>,
212    {
213        self.strategy.submit(task.into())
214    }
215
216    /// Waits for all tasks to finish.
217    ///
218    /// This method blocks the current thread until all submitted running and
219    /// pending tasks have been completed. Calling this method is not necessary,
220    /// as it's called automatically when the executor is dropped, but it might
221    /// be helpful for testing and debugging purposes.
222    ///
223    /// # Examples
224    ///
225    /// ```
226    /// # use std::error::Error;
227    /// # fn main() -> Result<(), Box<dyn Error>> {
228    /// use std::thread;
229    /// use std::time::Duration;
230    /// use zrx_executor::Executor;
231    ///
232    /// // Create executor
233    /// let executor = Executor::default();
234    ///
235    /// // Create tasks up to the executor's capacity
236    /// for _ in 0..executor.capacity() {
237    ///     executor.submit(|| {
238    ///         thread::sleep(Duration::from_millis(20));
239    ///     })?;
240    /// }
241    ///
242    /// // Wait for all tasks to finish
243    /// executor.wait();
244    /// assert!(executor.is_empty());
245    /// # Ok(())
246    /// # }
247    /// ```
248    pub fn wait(&self) {
249        let duration = Duration::from_millis(10);
250        while !self.is_empty() {
251            thread::sleep(duration);
252        }
253    }
254}
255
256#[allow(clippy::must_use_candidate)]
257impl<S> Executor<S>
258where
259    S: Strategy,
260{
261    /// Returns the number of tasks.
262    ///
263    /// This method returns the total number of tasks currently managed by the
264    /// executor, which includes running as well as pending tasks.
265    ///
266    /// # Examples
267    ///
268    /// ```
269    /// use zrx_executor::Executor;
270    ///
271    /// // Get number of tasks
272    /// let executor = Executor::default();
273    /// assert_eq!(executor.len(), 0);
274    /// ```
275    #[inline]
276    pub fn len(&self) -> usize {
277        self.num_tasks_running() + self.num_tasks_pending()
278    }
279
280    /// Returns whether there are any tasks.
281    ///
282    /// This method checks whether the executor has running or pending tasks,
283    /// and if not, considers the executor as idle. It's particularly useful
284    /// for waiting until an executor has processed all tasks, which is
285    /// necessary for implementing schedulers on top of executors.
286    ///
287    /// # Examples
288    ///
289    /// ```
290    /// use zrx_executor::Executor;
291    ///
292    /// // Check whether executor is idle
293    /// let executor = Executor::default();
294    /// assert!(executor.is_empty());
295    /// ```
296    #[inline]
297    pub fn is_empty(&self) -> bool {
298        self.len() == 0
299    }
300
301    /// Returns whether the executor is saturated.
302    ///
303    /// This method checks whether the executor is at capacity, which means
304    /// task submission will fail until a worker has finished a task.
305    ///
306    /// # Examples
307    ///
308    /// ```
309    /// use zrx_executor::Executor;
310    ///
311    /// // Check whether executor is saturated
312    /// let executor = Executor::default();
313    /// assert!(!executor.is_saturated());
314    /// ```
315    #[inline]
316    pub fn is_saturated(&self) -> bool {
317        self.num_tasks_pending() >= self.capacity()
318    }
319
320    /// Returns the number of workers.
321    ///
322    /// # Examples
323    ///
324    /// ```
325    /// use zrx_executor::strategy::WorkSharing;
326    /// use zrx_executor::Executor;
327    ///
328    /// // Get number of workers
329    /// let executor = Executor::new(WorkSharing::new(1));
330    /// assert_eq!(executor.num_workers(), 1);
331    /// ```
332    #[inline]
333    pub fn num_workers(&self) -> usize {
334        self.strategy.num_workers()
335    }
336
337    /// Returns the number of running tasks.
338    ///
339    /// This method allows to monitor the worker load, as it returns how many
340    /// workers are currently actively executing tasks.
341    ///
342    /// # Examples
343    ///
344    /// ```
345    /// use zrx_executor::Executor;
346    ///
347    /// // Get number of running tasks
348    /// let executor = Executor::default();
349    /// assert_eq!(executor.num_tasks_running(), 0);
350    /// ```
351    #[inline]
352    pub fn num_tasks_running(&self) -> usize {
353        self.strategy.num_tasks_running()
354    }
355
356    /// Returns the number of pending tasks.
357    ///
358    /// This method allows to throttle the submission of tasks, as it returns
359    /// how many tasks are currently waiting to be executed.
360    ///
361    /// # Examples
362    ///
363    /// ```
364    /// use zrx_executor::Executor;
365    ///
366    /// // Get number of pending tasks
367    /// let executor = Executor::default();
368    /// assert_eq!(executor.num_tasks_pending(), 0);
369    /// ```
370    #[inline]
371    pub fn num_tasks_pending(&self) -> usize {
372        self.strategy.num_tasks_pending()
373    }
374
375    /// Returns the capacity.
376    ///
377    /// This method returns the maximum number of tasks that can be submitted
378    /// at once, which can be used by the strategy for applying backpressure.
379    ///
380    /// # Examples
381    ///
382    /// ```
383    /// use zrx_executor::Executor;
384    ///
385    /// // Get maximum number of tasks
386    /// let executor = Executor::default();
387    /// assert!(executor.capacity() >= executor.num_workers());
388    /// ```
389    #[inline]
390    pub fn capacity(&self) -> usize {
391        self.strategy.capacity()
392    }
393}
394
395// ----------------------------------------------------------------------------
396// Trait implementations
397// ----------------------------------------------------------------------------
398
399impl<S> Clone for Executor<S>
400where
401    S: Strategy,
402{
403    /// Clones the executor.
404    ///
405    /// This method creates a new executor with the same execution strategy,
406    /// which allows to share them without borrowing issues.
407    ///
408    /// # Examples
409    ///
410    /// ```
411    /// use zrx_executor::Executor;
412    ///
413    /// // Create and clone executor
414    /// let executor = Executor::default();
415    /// executor.clone();
416    /// ```
417    #[inline]
418    fn clone(&self) -> Self {
419        Self {
420            strategy: Arc::clone(&self.strategy),
421        }
422    }
423}
424
425impl Default for Executor<WorkSharing> {
426    /// Creates an executor using the default work-sharing strategy.
427    ///
428    /// # Examples
429    ///
430    /// ```
431    /// use zrx_executor::Executor;
432    ///
433    /// // Create executor
434    /// let executor = Executor::default();
435    /// ```
436    #[inline]
437    fn default() -> Self {
438        Self::new(WorkSharing::default())
439    }
440}
441
442impl<S> Drop for Executor<S>
443where
444    S: Strategy,
445{
446    /// Waits for all tasks to finish.
447    fn drop(&mut self) {
448        self.wait();
449    }
450}