zrx_executor/executor.rs
1// Copyright (c) Zensical LLC <https://zensical.org>
2
3// SPDX-License-Identifier: MIT
4// Third-party contributions licensed under CLA
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Executor.
27
28use std::rc::Rc;
29use std::thread;
30use std::time::Duration;
31
32mod error;
33mod signal;
34pub mod strategy;
35pub mod task;
36
37pub use error::{Error, Result};
38use strategy::{Strategy, WorkSharing};
39use task::Task;
40
41// ----------------------------------------------------------------------------
42// Structs
43// ----------------------------------------------------------------------------
44
45/// Executor.
46///
47/// Executors serve as the primary interface for submitting and monitoring tasks
48/// within the system. They act as a frontend to various execution [`Strategy`]
49/// implementations, which define how tasks are prioritized and executed. Each
50/// execution [`Strategy`] encapsulates an implementation that determines the
51/// order and concurrency of execution. Abstracting the execution mechanism
52/// allows for flexible and interchangeable task management strategies.
53///
54/// Additionally, executors implement [`Clone`], which allows to easily share
55/// them among different parts of the system without borrowing issues.
56///
57/// Note that executors are not responsible for managing the lifetime of tasks,
58/// as it is assumed that tasks are self-contained and can be run independently.
59/// If a [`Task`] is submitted to an executor, it can't be cancelled or stopped,
60/// as the executor is not aware of the task's internal state. However, callers
61/// can implement fine-grained execution strategies on top of the executor to
62/// gain fine-grained control over task execution.
63///
64/// This is an opinionated implementation that specifically targets the needs of
65/// our execution model. It is not meant to be a general-purpose executor.
66///
67/// # Examples
68///
69/// Create an executor spawning 1,000 tasks using all CPUs - 1:
70///
71/// ```
72/// # use std::error::Error;
73/// # fn main() -> Result<(), Box<dyn Error>> {
74/// use std::thread;
75/// use std::time::Duration;
76/// use zrx_executor::strategy::WorkStealing;
77/// use zrx_executor::Executor;
78///
79/// // Create executor with strategy
80/// let strategy = WorkStealing::default();
81/// let executor = Executor::new(strategy);
82///
83/// // Create 1,000 tasks taking 20ms each
84/// for _ in 0..1000 {
85/// executor.submit(|| {
86/// thread::sleep(Duration::from_millis(20));
87/// })?;
88/// }
89/// # Ok(())
90/// # }
91/// ```
92#[derive(Debug)]
93pub struct Executor<S>
94where
95 S: Strategy,
96{
97 // Execution strategy.
98 strategy: Rc<S>,
99}
100
101// ----------------------------------------------------------------------------
102// Implementations
103// ----------------------------------------------------------------------------
104
105impl<S> Executor<S>
106where
107 S: Strategy,
108{
109 /// Creates an executor.
110 ///
111 /// # Examples
112 ///
113 /// ```
114 /// use zrx_executor::strategy::WorkSharing;
115 /// use zrx_executor::Executor;
116 ///
117 /// // Create executor with strategy
118 /// let executor = Executor::new(WorkSharing::default());
119 /// ```
120 #[must_use]
121 pub fn new(strategy: S) -> Self {
122 Self { strategy: Rc::new(strategy) }
123 }
124
125 /// Submits a task.
126 ///
127 /// This method submits a [`Task`], which is executed by one of the worker
128 /// threads as soon as possible. If a task computes a result, a [`Sender`][]
129 /// can be shared with the task, to send the result back to the caller,
130 /// which can then poll a [`Receiver`][].
131 ///
132 /// Note that tasks are intended to only run once, which is why they are
133 /// consumed. If a task needs to be run multiple times, it must be wrapped
134 /// in a closure that creates a new task each time. This allows for safe
135 /// sharing of state between tasks.
136 ///
137 /// [`Receiver`]: crossbeam::channel::Receiver
138 /// [`Sender`]: crossbeam::channel::Sender
139 ///
140 /// # Errors
141 ///
142 /// If the executor encounters a problem during task submission, it will
143 /// forward the encountered error to the caller, returning the task. Most
144 /// likely, the underlying execution strategy is at capacity, which means
145 /// the caller should resubmit the task at a later time. This is possible,
146 /// since this method accepts any type that implements the [`Task`] trait
147 /// and converts it into a boxed task.
148 ///
149 /// # Examples
150 ///
151 /// Submit a task:
152 ///
153 /// ```
154 /// # use std::error::Error;
155 /// # fn main() -> Result<(), Box<dyn Error>> {
156 /// use zrx_executor::Executor;
157 ///
158 /// // Create executor and submit task
159 /// let executor = Executor::default();
160 /// executor.submit(|| println!("Task"))?;
161 /// # Ok(())
162 /// # }
163 /// ```
164 ///
165 /// Submit a task returning subtasks:
166 ///
167 /// ```
168 /// # use std::error::Error;
169 /// # fn main() -> Result<(), Box<dyn Error>> {
170 /// use zrx_executor::Executor;
171 ///
172 /// // Create executor and submit task
173 /// let executor = Executor::default();
174 /// executor.submit(|| {
175 /// println!("Task 1");
176 /// || {
177 /// println!("Task 1.1");
178 /// || {
179 /// println!("Task 1.1.1");
180 /// }
181 /// }
182 /// })?;
183 /// # Ok(())
184 /// # }
185 /// ```
186 ///
187 /// Submit a task returning a task collection:
188 ///
189 /// ```
190 /// # use std::error::Error;
191 /// # fn main() -> Result<(), Box<dyn Error>> {
192 /// use zrx_executor::task::Tasks;
193 /// use zrx_executor::Executor;
194 ///
195 /// // Create executor and submit task
196 /// let executor = Executor::default();
197 /// executor.submit(|| {
198 /// println!("Task 1");
199 ///
200 /// // Create subtasks
201 /// let mut tasks = Tasks::new();
202 /// tasks.add(|| println!("Task 1.1"));
203 /// tasks.add(|| println!("Task 1.2"));
204 /// tasks.add(|| println!("Task 1.3"));
205 /// tasks
206 /// })?;
207 /// # Ok(())
208 /// # }
209 /// ```
210 #[inline]
211 pub fn submit<T>(&self, task: T) -> Result
212 where
213 T: Into<Box<dyn Task>>,
214 {
215 self.strategy.submit(task.into())
216 }
217
218 /// Waits for all tasks to finish.
219 ///
220 /// This method blocks the current thread until all submitted running and
221 /// pending tasks have been completed. Calling this method is not necessary,
222 /// as it's called automatically when the executor is dropped, but it might
223 /// be helpful for testing and debugging purposes.
224 ///
225 /// # Examples
226 ///
227 /// ```
228 /// # use std::error::Error;
229 /// # fn main() -> Result<(), Box<dyn Error>> {
230 /// use std::thread;
231 /// use std::time::Duration;
232 /// use zrx_executor::strategy::WorkStealing;
233 /// use zrx_executor::Executor;
234 ///
235 /// // Create executor with strategy
236 /// let strategy = WorkStealing::default();
237 /// let executor = Executor::new(strategy);
238 ///
239 /// // Create 1,000 tasks taking 20ms each
240 /// for _ in 0..1000 {
241 /// executor.submit(|| {
242 /// thread::sleep(Duration::from_millis(20));
243 /// })?;
244 /// }
245 ///
246 /// // Wait for all tasks to finish
247 /// executor.wait();
248 /// assert!(executor.is_empty());
249 /// # Ok(())
250 /// # }
251 /// ```
252 pub fn wait(&self) {
253 let duration = Duration::from_millis(10);
254 while !self.is_empty() {
255 thread::sleep(duration);
256 }
257 }
258}
259
260#[allow(clippy::must_use_candidate)]
261impl<S> Executor<S>
262where
263 S: Strategy,
264{
265 /// Returns the number of tasks.
266 ///
267 /// This method returns the total number of tasks currently managed by the
268 /// executor, which includes running as well as pending tasks.
269 ///
270 /// # Examples
271 ///
272 /// ```
273 /// use zrx_executor::Executor;
274 ///
275 /// // Get number of tasks
276 /// let executor = Executor::default();
277 /// assert_eq!(executor.len(), 0);
278 /// ```
279 #[inline]
280 pub fn len(&self) -> usize {
281 self.num_tasks_running() + self.num_tasks_pending()
282 }
283
284 /// Returns whether there are any tasks.
285 ///
286 /// This method checks whether the executor has running or pending tasks,
287 /// and if not, considers the executor as idle. It's particularly useful
288 /// for waiting until an executor has processed all tasks, which is
289 /// necessary for implementing schedulers on top of executors.
290 ///
291 /// # Examples
292 ///
293 /// ```
294 /// use zrx_executor::Executor;
295 ///
296 /// // Check whether executor is idle
297 /// let executor = Executor::default();
298 /// assert!(executor.is_empty());
299 /// ```
300 #[inline]
301 pub fn is_empty(&self) -> bool {
302 self.len() == 0
303 }
304
305 /// Returns whether the executor is saturated.
306 ///
307 /// This method checks whether the executor is at capacity, which means
308 /// task submission will fail until a worker has finished a task.
309 ///
310 /// # Examples
311 ///
312 /// ```
313 /// use zrx_executor::Executor;
314 ///
315 /// // Check whether executor is saturated
316 /// let executor = Executor::default();
317 /// assert!(!executor.is_saturated());
318 /// ```
319 #[inline]
320 pub fn is_saturated(&self) -> bool {
321 self.capacity()
322 .is_some_and(|capacity| self.num_tasks_pending() >= capacity)
323 }
324
325 /// Returns the number of workers.
326 ///
327 /// # Examples
328 ///
329 /// ```
330 /// use zrx_executor::strategy::WorkSharing;
331 /// use zrx_executor::Executor;
332 ///
333 /// // Get number of workers
334 /// let executor = Executor::new(WorkSharing::new(1));
335 /// assert_eq!(executor.num_workers(), 1);
336 /// ```
337 #[inline]
338 pub fn num_workers(&self) -> usize {
339 self.strategy.num_workers()
340 }
341
342 /// Returns the number of running tasks.
343 ///
344 /// This method allows to monitor the worker load, as it returns how many
345 /// workers are currently actively executing tasks.
346 ///
347 /// # Examples
348 ///
349 /// ```
350 /// use zrx_executor::Executor;
351 ///
352 /// // Get number of running tasks
353 /// let executor = Executor::default();
354 /// assert_eq!(executor.num_tasks_running(), 0);
355 /// ```
356 #[inline]
357 pub fn num_tasks_running(&self) -> usize {
358 self.strategy.num_tasks_running()
359 }
360
361 /// Returns the number of pending tasks.
362 ///
363 /// This method allows to throttle the submission of tasks, as it returns
364 /// how many tasks are currently waiting to be executed.
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// use zrx_executor::Executor;
370 ///
371 /// // Get number of pending tasks
372 /// let executor = Executor::default();
373 /// assert_eq!(executor.num_tasks_pending(), 0);
374 /// ```
375 #[inline]
376 pub fn num_tasks_pending(&self) -> usize {
377 self.strategy.num_tasks_pending()
378 }
379
380 /// Returns the capacity, if bounded.
381 ///
382 /// This method returns the maximum number of tasks that can be submitted
383 /// at once, which can be used by the strategy for applying backpressure.
384 ///
385 /// # Examples
386 ///
387 /// ```
388 /// use zrx_executor::Executor;
389 ///
390 /// // Get maximum number of tasks
391 /// let executor = Executor::default();
392 /// assert!(executor.capacity() >= Some(executor.num_workers()));
393 /// ```
394 #[inline]
395 pub fn capacity(&self) -> Option<usize> {
396 self.strategy.capacity()
397 }
398}
399
400// ----------------------------------------------------------------------------
401// Trait implementations
402// ----------------------------------------------------------------------------
403
404impl<S> Clone for Executor<S>
405where
406 S: Strategy,
407{
408 /// Clones the executor.
409 ///
410 /// This method creates a new executor with the same execution strategy,
411 /// which allows to share them without borrowing issues.
412 ///
413 /// # Examples
414 ///
415 /// ```
416 /// use zrx_executor::Executor;
417 ///
418 /// // Create and clone executor
419 /// let executor = Executor::default();
420 /// executor.clone();
421 /// ```
422 #[inline]
423 fn clone(&self) -> Self {
424 Self {
425 strategy: Rc::clone(&self.strategy),
426 }
427 }
428}
429
430impl Default for Executor<WorkSharing> {
431 /// Creates an executor using the default work-sharing strategy.
432 ///
433 /// # Examples
434 ///
435 /// ```
436 /// use zrx_executor::Executor;
437 ///
438 /// // Create executor
439 /// let executor = Executor::default();
440 /// ```
441 #[inline]
442 fn default() -> Self {
443 Self::new(WorkSharing::default())
444 }
445}
446
447impl<S> Drop for Executor<S>
448where
449 S: Strategy,
450{
451 /// Waits for all tasks to finish.
452 fn drop(&mut self) {
453 self.wait();
454 }
455}