zrx_executor/executor.rs
1// Copyright (c) 2025-2026 Zensical and contributors
2
3// SPDX-License-Identifier: MIT
4// All contributions are certified under the DCO
5
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to
8// deal in the Software without restriction, including without limitation the
9// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10// sell copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12
13// The above copyright notice and this permission notice shall be included in
14// all copies or substantial portions of the Software.
15
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22// IN THE SOFTWARE.
23
24// ----------------------------------------------------------------------------
25
26//! Executor.
27
28use std::sync::Arc;
29use std::thread;
30use std::time::Duration;
31
32mod error;
33mod signal;
34pub mod strategy;
35pub mod task;
36
37pub use error::{Error, Result};
38use strategy::{Strategy, WorkSharing};
39use task::Task;
40
41// ----------------------------------------------------------------------------
42// Structs
43// ----------------------------------------------------------------------------
44
45/// Executor.
46///
47/// Executors serve as the primary interface for submitting and monitoring tasks
48/// within the system. They act as a frontend to various execution [`Strategy`]
49/// implementations, which define how tasks are prioritized and executed. Each
50/// execution [`Strategy`] encapsulates an implementation that determines the
51/// order and concurrency of execution. Abstracting the execution mechanism
52/// allows for flexible and interchangeable task management strategies.
53///
54/// Additionally, executors implement [`Clone`], which allows to easily share
55/// them among different parts of the system without borrowing issues.
56///
57/// Note that executors are not responsible for managing the lifetime of tasks,
58/// as it is assumed that tasks are self-contained and can be run independently.
59/// If a [`Task`] is submitted to an executor, it can't be cancelled or stopped,
60/// as the executor is not aware of the task's internal state. However, callers
61/// can implement fine-grained execution strategies on top of the executor to
62/// gain fine-grained control over task execution.
63///
64/// This is an opinionated implementation that specifically targets the needs of
65/// our execution model. It is not meant to be a general-purpose executor.
66///
67/// # Examples
68///
69/// Create an executor spawning 8 tasks using all CPUs - 1:
70///
71/// ```
72/// # use std::error::Error;
73/// # fn main() -> Result<(), Box<dyn Error>> {
74/// use std::thread;
75/// use std::time::Duration;
76/// use zrx_executor::Executor;
77///
78/// // Create executor
79/// let executor = Executor::default();
80///
81/// // Create tasks up to the executor's capacity
82/// for _ in 0..executor.capacity() {
83/// executor.submit(|| {
84/// thread::sleep(Duration::from_millis(20));
85/// })?;
86/// }
87/// # Ok(())
88/// # }
89/// ```
90#[derive(Debug)]
91pub struct Executor<S>
92where
93 S: Strategy,
94{
95 // Execution strategy.
96 strategy: Arc<S>,
97}
98
99// ----------------------------------------------------------------------------
100// Implementations
101// ----------------------------------------------------------------------------
102
103impl<S> Executor<S>
104where
105 S: Strategy,
106{
107 /// Creates an executor.
108 ///
109 /// # Examples
110 ///
111 /// ```
112 /// use zrx_executor::strategy::WorkSharing;
113 /// use zrx_executor::Executor;
114 ///
115 /// // Create executor with strategy
116 /// let executor = Executor::new(WorkSharing::default());
117 /// ```
118 #[must_use]
119 pub fn new(strategy: S) -> Self {
120 Self { strategy: Arc::new(strategy) }
121 }
122
123 /// Submits a task.
124 ///
125 /// This method submits a [`Task`], which is executed by one of the worker
126 /// threads as soon as possible. If a task computes a result, a [`Sender`][]
127 /// can be shared with the task, to send the result back to the caller,
128 /// which can then poll a [`Receiver`][].
129 ///
130 /// Note that tasks are intended to only run once, which is why they are
131 /// consumed. If a task needs to be run multiple times, it must be wrapped
132 /// in a closure that creates a new task each time. This allows for safe
133 /// sharing of state between tasks.
134 ///
135 /// [`Receiver`]: crossbeam::channel::Receiver
136 /// [`Sender`]: crossbeam::channel::Sender
137 ///
138 /// # Errors
139 ///
140 /// If the executor encounters a problem during task submission, it will
141 /// forward the encountered error to the caller, returning the task. Most
142 /// likely, the underlying execution strategy is at capacity, which means
143 /// the caller should resubmit the task at a later time. This is possible,
144 /// since this method accepts any type that implements the [`Task`] trait
145 /// and converts it into a boxed task.
146 ///
147 /// # Examples
148 ///
149 /// Submit a task:
150 ///
151 /// ```
152 /// # use std::error::Error;
153 /// # fn main() -> Result<(), Box<dyn Error>> {
154 /// use zrx_executor::Executor;
155 ///
156 /// // Create executor and submit task
157 /// let executor = Executor::default();
158 /// executor.submit(|| println!("Task"))?;
159 /// # Ok(())
160 /// # }
161 /// ```
162 ///
163 /// Submit a task returning subtasks:
164 ///
165 /// ```
166 /// # use std::error::Error;
167 /// # fn main() -> Result<(), Box<dyn Error>> {
168 /// use zrx_executor::Executor;
169 ///
170 /// // Create executor and submit task
171 /// let executor = Executor::default();
172 /// executor.submit(|| {
173 /// println!("Task 1");
174 /// || {
175 /// println!("Task 1.1");
176 /// || {
177 /// println!("Task 1.1.1");
178 /// }
179 /// }
180 /// })?;
181 /// # Ok(())
182 /// # }
183 /// ```
184 ///
185 /// Submit a task returning a task collection:
186 ///
187 /// ```
188 /// # use std::error::Error;
189 /// # fn main() -> Result<(), Box<dyn Error>> {
190 /// use zrx_executor::task::Tasks;
191 /// use zrx_executor::Executor;
192 ///
193 /// // Create executor and submit task
194 /// let executor = Executor::default();
195 /// executor.submit(|| {
196 /// println!("Task 1");
197 ///
198 /// // Create subtasks
199 /// let mut tasks = Tasks::new();
200 /// tasks.add(|| println!("Task 1.1"));
201 /// tasks.add(|| println!("Task 1.2"));
202 /// tasks.add(|| println!("Task 1.3"));
203 /// tasks
204 /// })?;
205 /// # Ok(())
206 /// # }
207 /// ```
208 #[inline]
209 pub fn submit<T>(&self, task: T) -> Result
210 where
211 T: Into<Box<dyn Task>>,
212 {
213 self.strategy.submit(task.into())
214 }
215
216 /// Waits for all tasks to finish.
217 ///
218 /// This method blocks the current thread until all submitted running and
219 /// pending tasks have been completed. Calling this method is not necessary,
220 /// as it's called automatically when the executor is dropped, but it might
221 /// be helpful for testing and debugging purposes.
222 ///
223 /// # Examples
224 ///
225 /// ```
226 /// # use std::error::Error;
227 /// # fn main() -> Result<(), Box<dyn Error>> {
228 /// use std::thread;
229 /// use std::time::Duration;
230 /// use zrx_executor::Executor;
231 ///
232 /// // Create executor
233 /// let executor = Executor::default();
234 ///
235 /// // Create tasks up to the executor's capacity
236 /// for _ in 0..executor.capacity() {
237 /// executor.submit(|| {
238 /// thread::sleep(Duration::from_millis(20));
239 /// })?;
240 /// }
241 ///
242 /// // Wait for all tasks to finish
243 /// executor.wait();
244 /// assert!(executor.is_empty());
245 /// # Ok(())
246 /// # }
247 /// ```
248 pub fn wait(&self) {
249 let duration = Duration::from_millis(10);
250 while !self.is_empty() {
251 thread::sleep(duration);
252 }
253 }
254}
255
256#[allow(clippy::must_use_candidate)]
257impl<S> Executor<S>
258where
259 S: Strategy,
260{
261 /// Returns the number of tasks.
262 ///
263 /// This method returns the total number of tasks currently managed by the
264 /// executor, which includes running as well as pending tasks.
265 ///
266 /// # Examples
267 ///
268 /// ```
269 /// use zrx_executor::Executor;
270 ///
271 /// // Get number of tasks
272 /// let executor = Executor::default();
273 /// assert_eq!(executor.len(), 0);
274 /// ```
275 #[inline]
276 pub fn len(&self) -> usize {
277 self.num_tasks_running() + self.num_tasks_pending()
278 }
279
280 /// Returns whether there are any tasks.
281 ///
282 /// This method checks whether the executor has running or pending tasks,
283 /// and if not, considers the executor as idle. It's particularly useful
284 /// for waiting until an executor has processed all tasks, which is
285 /// necessary for implementing schedulers on top of executors.
286 ///
287 /// # Examples
288 ///
289 /// ```
290 /// use zrx_executor::Executor;
291 ///
292 /// // Check whether executor is idle
293 /// let executor = Executor::default();
294 /// assert!(executor.is_empty());
295 /// ```
296 #[inline]
297 pub fn is_empty(&self) -> bool {
298 self.len() == 0
299 }
300
301 /// Returns whether the executor is saturated.
302 ///
303 /// This method checks whether the executor is at capacity, which means
304 /// task submission will fail until a worker has finished a task.
305 ///
306 /// # Examples
307 ///
308 /// ```
309 /// use zrx_executor::Executor;
310 ///
311 /// // Check whether executor is saturated
312 /// let executor = Executor::default();
313 /// assert!(!executor.is_saturated());
314 /// ```
315 #[inline]
316 pub fn is_saturated(&self) -> bool {
317 self.num_tasks_pending() >= self.capacity()
318 }
319
320 /// Returns the number of workers.
321 ///
322 /// # Examples
323 ///
324 /// ```
325 /// use zrx_executor::strategy::WorkSharing;
326 /// use zrx_executor::Executor;
327 ///
328 /// // Get number of workers
329 /// let executor = Executor::new(WorkSharing::new(1));
330 /// assert_eq!(executor.num_workers(), 1);
331 /// ```
332 #[inline]
333 pub fn num_workers(&self) -> usize {
334 self.strategy.num_workers()
335 }
336
337 /// Returns the number of running tasks.
338 ///
339 /// This method allows to monitor the worker load, as it returns how many
340 /// workers are currently actively executing tasks.
341 ///
342 /// # Examples
343 ///
344 /// ```
345 /// use zrx_executor::Executor;
346 ///
347 /// // Get number of running tasks
348 /// let executor = Executor::default();
349 /// assert_eq!(executor.num_tasks_running(), 0);
350 /// ```
351 #[inline]
352 pub fn num_tasks_running(&self) -> usize {
353 self.strategy.num_tasks_running()
354 }
355
356 /// Returns the number of pending tasks.
357 ///
358 /// This method allows to throttle the submission of tasks, as it returns
359 /// how many tasks are currently waiting to be executed.
360 ///
361 /// # Examples
362 ///
363 /// ```
364 /// use zrx_executor::Executor;
365 ///
366 /// // Get number of pending tasks
367 /// let executor = Executor::default();
368 /// assert_eq!(executor.num_tasks_pending(), 0);
369 /// ```
370 #[inline]
371 pub fn num_tasks_pending(&self) -> usize {
372 self.strategy.num_tasks_pending()
373 }
374
375 /// Returns the capacity.
376 ///
377 /// This method returns the maximum number of tasks that can be submitted
378 /// at once, which can be used by the strategy for applying backpressure.
379 ///
380 /// # Examples
381 ///
382 /// ```
383 /// use zrx_executor::Executor;
384 ///
385 /// // Get maximum number of tasks
386 /// let executor = Executor::default();
387 /// assert!(executor.capacity() >= executor.num_workers());
388 /// ```
389 #[inline]
390 pub fn capacity(&self) -> usize {
391 self.strategy.capacity()
392 }
393}
394
395// ----------------------------------------------------------------------------
396// Trait implementations
397// ----------------------------------------------------------------------------
398
399impl<S> Clone for Executor<S>
400where
401 S: Strategy,
402{
403 /// Clones the executor.
404 ///
405 /// This method creates a new executor with the same execution strategy,
406 /// which allows to share them without borrowing issues.
407 ///
408 /// # Examples
409 ///
410 /// ```
411 /// use zrx_executor::Executor;
412 ///
413 /// // Create and clone executor
414 /// let executor = Executor::default();
415 /// executor.clone();
416 /// ```
417 #[inline]
418 fn clone(&self) -> Self {
419 Self {
420 strategy: Arc::clone(&self.strategy),
421 }
422 }
423}
424
425impl Default for Executor<WorkSharing> {
426 /// Creates an executor using the default work-sharing strategy.
427 ///
428 /// # Examples
429 ///
430 /// ```
431 /// use zrx_executor::Executor;
432 ///
433 /// // Create executor
434 /// let executor = Executor::default();
435 /// ```
436 #[inline]
437 fn default() -> Self {
438 Self::new(WorkSharing::default())
439 }
440}
441
442impl<S> Drop for Executor<S>
443where
444 S: Strategy,
445{
446 /// Waits for all tasks to finish.
447 fn drop(&mut self) {
448 self.wait();
449 }
450}