pub struct WorkStealing { /* private fields */ }
Work-stealing execution strategy.
This strategy implements work stealing: each worker thread has its own local queue, and a worker whose local queue is empty can take tasks from a central injector or steal them from other workers. This allows for more efficient execution when there is a large number of workers and tasks.
Work stealing enhances load balancing by allowing idle workers to take on
tasks from busier peers, which helps to reduce idle time and can improve
overall throughput. Unlike the simpler WorkSharing strategy that uses
a central queue where workers may become stuck waiting for new tasks, this
method can yield better resource utilization. Additionally, work-stealing
helps mitigate contention over shared resources (i.e. channels), which can
become a bottleneck in central queueing systems, allowing each worker to
primarily operate on its local queue. However, if task runtimes are short,
the utilization can be lower than with a central queue, since stealing uses
an optimistic strategy and is a best-effort operation. When task runtimes
are long, work stealing can be more efficient than central queueing.
This reduced contention is particularly beneficial in dynamic environments with significantly fluctuating workloads, enabling faster task completion as workers can quickly adapt to take on shorter or less complex tasks as they become available.
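The lookup order described above (local queue first, then the injector, then other workers) can be sketched with the standard library alone. This is a toy model and not the crate's implementation: the `Queues` type and its `next_task` method are hypothetical names, and mutex-guarded `VecDeque`s stand in for the lock-free deques a real work-stealing scheduler would typically use.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Toy model of the queues: one shared injector plus one local queue per
/// worker. (Hypothetical type, for illustration only.)
struct Queues<T> {
    injector: Mutex<VecDeque<T>>,
    locals: Vec<Mutex<VecDeque<T>>>,
}

impl<T> Queues<T> {
    fn new(num_workers: usize) -> Self {
        Queues {
            injector: Mutex::new(VecDeque::new()),
            locals: (0..num_workers)
                .map(|_| Mutex::new(VecDeque::new()))
                .collect(),
        }
    }

    /// Finds the next task for worker `id`.
    fn next_task(&self, id: usize) -> Option<T> {
        // 1. Prefer the worker's own queue, which is rarely contended.
        let local = self.locals[id].lock().unwrap().pop_front();
        if local.is_some() {
            return local;
        }
        // 2. Fall back to the central injector.
        let injected = self.injector.lock().unwrap().pop_front();
        if injected.is_some() {
            return injected;
        }
        // 3. Try to steal from the back of each peer's queue in turn.
        for (peer, queue) in self.locals.iter().enumerate() {
            if peer == id {
                continue;
            }
            let stolen = queue.lock().unwrap().pop_back();
            if stolen.is_some() {
                return stolen;
            }
        }
        // Nothing found anywhere: the worker is idle for now.
        None
    }
}
```

In this sketch an idle worker only looks at its peers after both its own queue and the injector come up empty, which is why busy workers mostly touch their own queue.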
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Create strategy and submit task
let strategy = WorkStealing::default();
strategy.submit(Box::new(|| println!("Task")))?;
Implementations§
impl WorkStealing
pub fn new(num_workers: usize) -> Self
Creates a work-stealing execution strategy.
This method creates a strategy with the given number of worker threads, which are spawned before the method returns. Internally, a default limit of 8 tasks per worker is used, so with 4 workers the executor has a capacity of 32 tasks.
Use WorkStealing::with_capacity to set a custom capacity.
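The default capacity follows directly from the per-worker limit stated above. The arithmetic can be written out as below; `TASKS_PER_WORKER` and `default_capacity` are hypothetical names chosen for this sketch and are not part of the crate's API.

```rust
/// Default per-worker task limit, as stated in the documentation above.
const TASKS_PER_WORKER: usize = 8;

/// Hypothetical helper mirroring the documented arithmetic:
/// total capacity = number of workers * per-worker limit.
fn default_capacity(num_workers: usize) -> usize {
    num_workers * TASKS_PER_WORKER
}
```

Under this reading, `WorkStealing::new(4)` should behave like `WorkStealing::with_capacity(4, 32)`.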
§Panics
Panics if thread creation fails.
§Examples
use zrx_executor::strategy::WorkStealing;
// Create strategy
let strategy = WorkStealing::new(4);
pub fn with_capacity(num_workers: usize, capacity: usize) -> Self
Creates a work-stealing execution strategy with the given capacity.
This method creates a strategy with the given number of worker threads, which are spawned before the method returns.
While this strategy uses unbounded channels due to how the Injector
and Worker concepts are implemented, it still provides a capacity
limit to ensure the executor doesn’t end up being overwhelmed. The given
capacity sets the number of tasks the executor accepts before starting
to reject them, which can be used to apply backpressure. Note that the
capacity is not a per-worker limit, but a global, per-executor limit.
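One way to put a global limit in front of unbounded queues is a single atomic counter that is checked on submission. The sketch below illustrates that idea only. `Gate` is a hypothetical type, and whether the real limit counts running tasks as well as pending ones is not stated here; this sketch counts every accepted task until it is released.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical admission gate: tracks accepted, unfinished tasks against
/// one global limit shared by all workers.
struct Gate {
    capacity: usize,
    in_flight: AtomicUsize,
}

impl Gate {
    fn new(capacity: usize) -> Self {
        Gate { capacity, in_flight: AtomicUsize::new(0) }
    }

    /// Tries to reserve a slot. Returns `false` when the limit is reached,
    /// which is the point where a submission would be rejected.
    fn try_acquire(&self) -> bool {
        self.in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                // Only increment while below the limit.
                if n < self.capacity { Some(n + 1) } else { None }
            })
            .is_ok()
    }

    /// Frees a slot once a task has finished.
    fn release(&self) {
        self.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}
```

Because the counter is shared, the limit applies to the executor as a whole rather than to any single worker, matching the note above.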
§Panics
Panics if thread creation fails.
§Examples
use zrx_executor::strategy::WorkStealing;
// Create strategy
let strategy = WorkStealing::with_capacity(4, 64);
Trait Implementations§
impl Debug for WorkStealing
impl Default for WorkStealing
fn default() -> Self
Creates a work-stealing execution strategy using all CPUs - 1.
The number of workers is the number of logical CPUs minus one, which reserves one core for the main thread for orchestration. If that leaves fewer than one worker, as on a single-CPU system, the strategy falls back to a single worker thread.
Warning: this method makes use of thread::available_parallelism
to determine the number of available cores, which has some limitations.
Please refer to the documentation of that function for more details, or
consider using num_cpus as an alternative.
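The worker count described above can be computed with the standard library as follows. `workers_for` and `default_num_workers` are hypothetical helper names, and treating a failed `available_parallelism` query as a single CPU is an assumption of this sketch, not documented behavior.

```rust
use std::thread;

/// Hypothetical helper: one worker fewer than `cpus`, but never fewer
/// than one worker.
fn workers_for(cpus: usize) -> usize {
    cpus.saturating_sub(1).max(1)
}

/// Derives the worker count from the number of logical CPUs.
fn default_num_workers() -> usize {
    // Assumption: if the query fails, behave as if one CPU were available.
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    workers_for(cpus)
}
```

A caller who needs a different policy, such as using every core, can bypass the default and pass an explicit count to `WorkStealing::new`.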
§Examples
use zrx_executor::strategy::WorkStealing;
// Create strategy
let strategy = WorkStealing::default();
impl Drop for WorkStealing
fn drop(&mut self)
Terminates and joins all worker threads.
This method waits for all worker threads to finish executing currently running tasks, while ignoring any pending tasks. All worker threads are joined before the method returns. This is necessary to prevent worker threads from running after the strategy has been dropped.
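The terminate-then-join pattern described above can be sketched with a shared shutdown flag. `Pool` is a hypothetical stand-in for the strategy, and its workers only idle instead of executing tasks; the actual signalling mechanism used by the crate may differ.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical pool that owns its worker threads.
struct Pool {
    shutdown: Arc<AtomicBool>,
    handles: Vec<JoinHandle<()>>,
}

impl Pool {
    fn new(num_workers: usize) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let handles = (0..num_workers)
            .map(|_| {
                let shutdown = Arc::clone(&shutdown);
                thread::spawn(move || {
                    // A real worker would look for tasks here; this one
                    // only idles until it is told to stop.
                    while !shutdown.load(Ordering::Acquire) {
                        thread::sleep(Duration::from_millis(1));
                    }
                })
            })
            .collect();
        Pool { shutdown, handles }
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Signal termination, then wait for every worker to exit.
        self.shutdown.store(true, Ordering::Release);
        for handle in self.handles.drain(..) {
            // A panicked worker yields `Err`; it is ignored in this sketch.
            let _ = handle.join();
        }
    }
}
```

Joining inside `drop` guarantees that no worker thread outlives the value that owns it, at the cost of blocking the dropping thread until running work has finished.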
impl Strategy for WorkStealing
fn submit(&self, task: Box<dyn Task>) -> Result
Submits a task.
This method submits a Task, which is executed by one of the worker
threads as soon as possible. If a task computes a result, a Sender
can be shared with the task, to send the result back to the caller,
which can then poll a Receiver.
Note that tasks are intended to only run once, which is why they are consumed. If a task needs to be run multiple times, it must be wrapped in a closure that creates a new task each time. This allows for safe sharing of state between tasks.
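Both points above, returning a result through a channel and building a fresh task for every run, can be sketched with the standard library. The sketch makes several assumptions: `std::sync::mpsc` is used for the channel, a task is modelled as a boxed `FnOnce` closure, and `thread::spawn` stands in for `submit`. `BoxedTask`, `make_square_task` and `squares` are hypothetical names.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

/// A one-shot unit of work, modelled here as a boxed `FnOnce` (assumption).
type BoxedTask = Box<dyn FnOnce() + Send + 'static>;

/// Factory: builds a fresh task on every call, since each task is consumed
/// when it runs.
fn make_square_task(n: u64, tx: Sender<u64>) -> BoxedTask {
    Box::new(move || {
        // Ignore the error if the caller already dropped the receiver.
        let _ = tx.send(n * n);
    })
}

/// Runs one task per input and collects the results in ascending order.
fn squares(inputs: &[u64]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for &n in inputs {
        // Stand-in for `strategy.submit(task)`: run the task on a thread.
        thread::spawn(make_square_task(n, tx.clone()));
    }
    // Drop the original sender so the receiver ends once all tasks are done.
    drop(tx);
    let mut results: Vec<u64> = rx.iter().collect();
    // Completion order is not deterministic, so sort before returning.
    results.sort_unstable();
    results
}
```

Each task owns its own clone of the sender, so no state is shared between tasks beyond the channel itself.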
§Errors
If the task cannot be submitted, Error::Submit is returned, which
can only happen if the channel is disconnected or at capacity.
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Create strategy and submit task
let strategy = WorkStealing::default();
strategy.submit(Box::new(|| println!("Task")))?;
fn num_workers(&self) -> usize
Returns the number of workers.
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get number of workers
let strategy = WorkStealing::new(1);
assert_eq!(strategy.num_workers(), 1);
fn num_tasks_running(&self) -> usize
Returns the number of running tasks.
This method makes it possible to monitor the worker load, as it returns how many workers are currently executing tasks.
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get number of running tasks
let strategy = WorkStealing::default();
assert_eq!(strategy.num_tasks_running(), 0);
fn num_tasks_pending(&self) -> usize
Returns the number of pending tasks.
This method makes it possible to throttle the submission of tasks, as it returns how many tasks are currently waiting to be executed.
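A producer could throttle itself by polling the pending count before each submission. The helper below is a minimal sketch of that loop: `wait_for_room` is a hypothetical function that takes the count as a closure, so it does not depend on the crate. With a real strategy, the closure would presumably be `|| strategy.num_tasks_pending()`.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper: waits until `pending()` reports fewer than `limit`
/// tasks, sleeping `interval` between polls. Returns how many times the
/// count was polled. Note that a `limit` of 0 never lets the loop exit.
fn wait_for_room(
    pending: impl Fn() -> usize,
    limit: usize,
    interval: Duration,
) -> usize {
    let mut polls = 1;
    while pending() >= limit {
        // Still too many tasks waiting: back off before checking again.
        thread::sleep(interval);
        polls += 1;
    }
    polls
}
```

Polling keeps the sketch simple. A design that blocks on a condition variable or channel would avoid the sleep, but needs cooperation from the executor.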
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get number of pending tasks
let strategy = WorkStealing::default();
assert_eq!(strategy.num_tasks_pending(), 0);
fn capacity(&self) -> usize
Returns the capacity.
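Because the underlying queues are unbounded, the capacity is not a hard limit imposed by the queues themselves. It is the global number of tasks the executor accepts before it starts rejecting submissions, as set by WorkStealing::new or WorkStealing::with_capacity. Callers that produce tasks faster than they can be executed should expect rejections and throttle accordingly.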
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get capacity
let strategy = WorkStealing::default();
assert!(strategy.capacity() >= strategy.num_workers());