zrx_executor/
executor.rs

1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Executor.
27
28use std::rc::Rc;
29use std::thread;
30use std::time::Duration;
31
32mod error;
33mod signal;
34pub mod strategy;
35pub mod task;
36
37pub use error::{Error, Result};
38use strategy::{Strategy, WorkSharing};
39use task::Task;
40
41// ----------------------------------------------------------------------------
42// Structs
43// ----------------------------------------------------------------------------
44
45/// Executor.
46///
47/// Executors serve as the primary interface for submitting and monitoring tasks
48/// within the system. They act as a frontend to various execution [`Strategy`]
49/// implementations, which define how tasks are prioritized and executed. Each
50/// execution [`Strategy`] encapsulates an implementation that determines the
51/// order and concurrency of execution. Abstracting the execution mechanism
52/// allows for flexible and interchangeable task management strategies.
53///
/// Additionally, executors implement [`Clone`], which makes it easy to share
/// them among different parts of the system without borrowing issues.
56///
57/// Note that executors are not responsible for managing the lifetime of tasks,
58/// as it is assumed that tasks are self-contained and can be run independently.
59/// If a [`Task`] is submitted to an executor, it can't be cancelled or stopped,
60/// as the executor is not aware of the task's internal state. However, callers
/// can implement their own execution strategies on top of the executor to
/// gain fine-grained control over task execution.
63///
64/// This is an opinionated implementation that specifically targets the needs of
65/// our execution model. It is not meant to be a general-purpose executor.
66///
67/// # Examples
68///
69/// Create an executor spawning 1,000 tasks using all CPUs - 1:
70///
71/// ```
72/// # use std::error::Error;
73/// # fn main() -> Result<(), Box<dyn Error>> {
74/// use std::thread;
75/// use std::time::Duration;
76/// use zrx_executor::strategy::WorkStealing;
77/// use zrx_executor::Executor;
78///
79/// // Create executor with strategy
80/// let strategy = WorkStealing::default();
81/// let executor = Executor::new(strategy);
82///
83/// // Create 1,000 tasks taking 20ms each
84/// for _ in 0..1000 {
85///     executor.submit(|| {
86///         thread::sleep(Duration::from_millis(20));
87///     })?;
88/// }
89/// # Ok(())
90/// # }
91/// ```
#[derive(Debug)]
pub struct Executor<S>
where
    // The bound must be declared on the struct itself, as the `Drop`
    // implementation of the executor is bounded by it, and Rust requires a
    // `Drop` implementation to carry exactly the bounds of the struct
    S: Strategy,
{
    /// Execution strategy, held behind a reference-counted pointer, so all
    /// clones of the executor share the same strategy instance.
    strategy: Rc<S>,
}
100
101// ----------------------------------------------------------------------------
102// Implementations
103// ----------------------------------------------------------------------------
104
105impl<S> Executor<S>
106where
107    S: Strategy,
108{
109    /// Creates an executor.
110    ///
111    /// # Examples
112    ///
113    /// ```
114    /// use zrx_executor::strategy::WorkSharing;
115    /// use zrx_executor::Executor;
116    ///
117    /// // Create executor with strategy
118    /// let executor = Executor::new(WorkSharing::default());
119    /// ```
120    #[must_use]
121    pub fn new(strategy: S) -> Self {
122        Self { strategy: Rc::new(strategy) }
123    }
124
125    /// Submits a task.
126    ///
127    /// This method submits a [`Task`], which is executed by one of the worker
128    /// threads as soon as possible. If a task computes a result, a [`Sender`][]
129    /// can be shared with the task, to send the result back to the caller,
130    /// which can then poll a [`Receiver`][].
131    ///
132    /// Note that tasks are intended to only run once, which is why they are
133    /// consumed. If a task needs to be run multiple times, it must be wrapped
134    /// in a closure that creates a new task each time. This allows for safe
135    /// sharing of state between tasks.
136    ///
137    /// [`Receiver`]: crossbeam::channel::Receiver
138    /// [`Sender`]: crossbeam::channel::Sender
139    ///
140    /// # Errors
141    ///
142    /// If the executor encounters a problem during task submission, it will
143    /// forward the encountered error to the caller, returning the task. Most
144    /// likely, the underlying execution strategy is at capacity, which means
145    /// the caller should resubmit the task at a later time. This is possible,
146    /// since this method accepts any type that implements the [`Task`] trait
147    /// and converts it into a boxed task.
148    ///
149    /// # Examples
150    ///
151    /// Submit a task:
152    ///
153    /// ```
154    /// # use std::error::Error;
155    /// # fn main() -> Result<(), Box<dyn Error>> {
156    /// use zrx_executor::Executor;
157    ///
158    /// // Create executor and submit task
159    /// let executor = Executor::default();
160    /// executor.submit(|| println!("Task"))?;
161    /// # Ok(())
162    /// # }
163    /// ```
164    ///
165    /// Submit a task returning subtasks:
166    ///
167    /// ```
168    /// # use std::error::Error;
169    /// # fn main() -> Result<(), Box<dyn Error>> {
170    /// use zrx_executor::Executor;
171    ///
172    /// // Create executor and submit task
173    /// let executor = Executor::default();
174    /// executor.submit(|| {
175    ///     println!("Task 1");
176    ///     || {
177    ///         println!("Task 1.1");
178    ///         || {
179    ///             println!("Task 1.1.1");
180    ///         }
181    ///     }
182    /// })?;
183    /// # Ok(())
184    /// # }
185    /// ```
186    ///
187    /// Submit a task returning a task collection:
188    ///
189    /// ```
190    /// # use std::error::Error;
191    /// # fn main() -> Result<(), Box<dyn Error>> {
192    /// use zrx_executor::task::Tasks;
193    /// use zrx_executor::Executor;
194    ///
195    /// // Create executor and submit task
196    /// let executor = Executor::default();
197    /// executor.submit(|| {
198    ///     println!("Task 1");
199    ///
200    ///     // Create subtasks
201    ///     let mut tasks = Tasks::new();
202    ///     tasks.add(|| println!("Task 1.1"));
203    ///     tasks.add(|| println!("Task 1.2"));
204    ///     tasks.add(|| println!("Task 1.3"));
205    ///     tasks
206    /// })?;
207    /// # Ok(())
208    /// # }
209    /// ```
210    #[inline]
211    pub fn submit<T>(&self, task: T) -> Result
212    where
213        T: Into<Box<dyn Task>>,
214    {
215        self.strategy.submit(task.into())
216    }
217
218    /// Waits for all tasks to finish.
219    ///
220    /// This method blocks the current thread until all submitted running and
221    /// pending tasks have been completed. Calling this method is not necessary,
222    /// as it's called automatically when the executor is dropped, but it might
223    /// be helpful for testing and debugging purposes.
224    ///
225    /// # Examples
226    ///
227    /// ```
228    /// # use std::error::Error;
229    /// # fn main() -> Result<(), Box<dyn Error>> {
230    /// use std::thread;
231    /// use std::time::Duration;
232    /// use zrx_executor::strategy::WorkStealing;
233    /// use zrx_executor::Executor;
234    ///
235    /// // Create executor with strategy
236    /// let strategy = WorkStealing::default();
237    /// let executor = Executor::new(strategy);
238    ///
239    /// // Create 1,000 tasks taking 20ms each
240    /// for _ in 0..1000 {
241    ///     executor.submit(|| {
242    ///         thread::sleep(Duration::from_millis(20));
243    ///     })?;
244    /// }
245    ///
246    /// // Wait for all tasks to finish
247    /// executor.wait();
248    /// assert!(executor.is_empty());
249    /// # Ok(())
250    /// # }
251    /// ```
252    pub fn wait(&self) {
253        let duration = Duration::from_millis(10);
254        while !self.is_empty() {
255            thread::sleep(duration);
256        }
257    }
258}
259
260#[allow(clippy::must_use_candidate)]
261impl<S> Executor<S>
262where
263    S: Strategy,
264{
265    /// Returns the number of tasks.
266    ///
267    /// This method returns the total number of tasks currently managed by the
268    /// executor, which includes running as well as pending tasks.
269    ///
270    /// # Examples
271    ///
272    /// ```
273    /// use zrx_executor::Executor;
274    ///
275    /// // Get number of tasks
276    /// let executor = Executor::default();
277    /// assert_eq!(executor.len(), 0);
278    /// ```
279    #[inline]
280    pub fn len(&self) -> usize {
281        self.num_tasks_running() + self.num_tasks_pending()
282    }
283
284    /// Returns whether there are any tasks.
285    ///
286    /// This method checks whether the executor has running or pending tasks,
287    /// and if not, considers the executor as idle. It's particularly useful
288    /// for waiting until an executor has processed all tasks, which is
289    /// necessary for implementing schedulers on top of executors.
290    ///
291    /// # Examples
292    ///
293    /// ```
294    /// use zrx_executor::Executor;
295    ///
296    /// // Check whether executor is idle
297    /// let executor = Executor::default();
298    /// assert!(executor.is_empty());
299    /// ```
300    #[inline]
301    pub fn is_empty(&self) -> bool {
302        self.len() == 0
303    }
304
305    /// Returns whether the executor is saturated.
306    ///
307    /// This method checks whether the executor is at capacity, which means
308    /// task submission will fail until a worker has finished a task.
309    ///
310    /// # Examples
311    ///
312    /// ```
313    /// use zrx_executor::Executor;
314    ///
315    /// // Check whether executor is saturated
316    /// let executor = Executor::default();
317    /// assert!(!executor.is_saturated());
318    /// ```
319    #[inline]
320    pub fn is_saturated(&self) -> bool {
321        self.capacity()
322            .is_some_and(|capacity| self.num_tasks_pending() >= capacity)
323    }
324
325    /// Returns the number of workers.
326    ///
327    /// # Examples
328    ///
329    /// ```
330    /// use zrx_executor::strategy::WorkSharing;
331    /// use zrx_executor::Executor;
332    ///
333    /// // Get number of workers
334    /// let executor = Executor::new(WorkSharing::new(1));
335    /// assert_eq!(executor.num_workers(), 1);
336    /// ```
337    #[inline]
338    pub fn num_workers(&self) -> usize {
339        self.strategy.num_workers()
340    }
341
342    /// Returns the number of running tasks.
343    ///
344    /// This method allows to monitor the worker load, as it returns how many
345    /// workers are currently actively executing tasks.
346    ///
347    /// # Examples
348    ///
349    /// ```
350    /// use zrx_executor::Executor;
351    ///
352    /// // Get number of running tasks
353    /// let executor = Executor::default();
354    /// assert_eq!(executor.num_tasks_running(), 0);
355    /// ```
356    #[inline]
357    pub fn num_tasks_running(&self) -> usize {
358        self.strategy.num_tasks_running()
359    }
360
361    /// Returns the number of pending tasks.
362    ///
363    /// This method allows to throttle the submission of tasks, as it returns
364    /// how many tasks are currently waiting to be executed.
365    ///
366    /// # Examples
367    ///
368    /// ```
369    /// use zrx_executor::Executor;
370    ///
371    /// // Get number of pending tasks
372    /// let executor = Executor::default();
373    /// assert_eq!(executor.num_tasks_pending(), 0);
374    /// ```
375    #[inline]
376    pub fn num_tasks_pending(&self) -> usize {
377        self.strategy.num_tasks_pending()
378    }
379
380    /// Returns the capacity, if bounded.
381    ///
382    /// This method returns the maximum number of tasks that can be submitted
383    /// at once, which can be used by the strategy for applying backpressure.
384    ///
385    /// # Examples
386    ///
387    /// ```
388    /// use zrx_executor::Executor;
389    ///
390    /// // Get maximum number of tasks
391    /// let executor = Executor::default();
392    /// assert!(executor.capacity() >= Some(executor.num_workers()));
393    /// ```
394    #[inline]
395    pub fn capacity(&self) -> Option<usize> {
396        self.strategy.capacity()
397    }
398}
399
400// ----------------------------------------------------------------------------
401// Trait implementations
402// ----------------------------------------------------------------------------
403
404impl<S> Clone for Executor<S>
405where
406    S: Strategy,
407{
408    /// Clones the executor.
409    ///
410    /// This method creates a new executor with the same execution strategy,
411    /// which allows to share them without borrowing issues.
412    ///
413    /// # Examples
414    ///
415    /// ```
416    /// use zrx_executor::Executor;
417    ///
418    /// // Create and clone executor
419    /// let executor = Executor::default();
420    /// executor.clone();
421    /// ```
422    #[inline]
423    fn clone(&self) -> Self {
424        Self {
425            strategy: Rc::clone(&self.strategy),
426        }
427    }
428}
429
430impl Default for Executor<WorkSharing> {
431    /// Creates an executor using the default work-sharing strategy.
432    ///
433    /// # Examples
434    ///
435    /// ```
436    /// use zrx_executor::Executor;
437    ///
438    /// // Create executor
439    /// let executor = Executor::default();
440    /// ```
441    #[inline]
442    fn default() -> Self {
443        Self::new(WorkSharing::default())
444    }
445}
446
447impl<S> Drop for Executor<S>
448where
449    S: Strategy,
450{
451    /// Waits for all tasks to finish.
452    fn drop(&mut self) {
453        self.wait();
454    }
455}