pub struct WorkStealing { /* private fields */ }
Work-stealing execution strategy.
This strategy implements work-stealing, where each worker thread has its own local queue. Workers can steal tasks from a central injector or from other workers, if their local queues are empty. This allows for more efficient execution if there’s a large number of workers and tasks.
Work stealing enhances load balancing by allowing idle workers to take on
tasks from busier peers, which helps to reduce idle time and can improve
overall throughput. Unlike the simpler WorkSharing strategy that uses
a central queue where workers may become stuck waiting for new tasks, this
method can yield better resource utilization. Additionally, work-stealing
helps mitigate contention over shared resources (i.e. channels), which can
become a bottleneck in central queueing systems, allowing each worker to
primarily operate on its local queue. However, if task runtimes are short,
the utilization can be lower than with a central queue, since stealing uses
an optimistic strategy and is a best-effort operation. When task runtimes
are long, work stealing can be more efficient than central queueing.
This reduced contention is particularly beneficial in dynamic environments with significantly fluctuating workloads, enabling faster task completion as workers can quickly adapt to take on shorter or less complex tasks as they become available.
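The lookup order described above (local queue first, then the central injector, then other workers) can be sketched with standard-library types only. This is an illustrative sketch, not the crate's implementation: the `Queues` type, the `find_task` method, and the use of `Mutex<VecDeque<_>>` are assumptions made for the example, and tasks are modeled as plain integers.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// A task is modeled as a plain integer identifier to keep the sketch small.
type Task = u32;

/// Hypothetical scheduler state: one shared injector plus one queue per worker.
struct Queues {
    injector: Mutex<VecDeque<Task>>,
    locals: Vec<Mutex<VecDeque<Task>>>,
}

impl Queues {
    fn new(num_workers: usize) -> Self {
        Queues {
            injector: Mutex::new(VecDeque::new()),
            locals: (0..num_workers)
                .map(|_| Mutex::new(VecDeque::new()))
                .collect(),
        }
    }

    /// Finds the next task for `worker`: local queue, then injector, then peers.
    fn find_task(&self, worker: usize) -> Option<Task> {
        // 1. Prefer the worker's own queue, which is rarely contended.
        if let Some(task) = self.locals[worker].lock().unwrap().pop_front() {
            return Some(task);
        }
        // 2. Fall back to the central injector.
        if let Some(task) = self.injector.lock().unwrap().pop_front() {
            return Some(task);
        }
        // 3. Steal from the back of a peer's queue. This is best effort: a
        //    peer whose queue is currently locked is skipped, not waited on.
        for (index, queue) in self.locals.iter().enumerate() {
            if index == worker {
                continue;
            }
            if let Ok(mut peer) = queue.try_lock() {
                if let Some(task) = peer.pop_back() {
                    return Some(task);
                }
            }
        }
        None
    }
}

fn main() {
    let queues = Queues::new(2);
    queues.locals[0].lock().unwrap().extend([1, 2]);
    queues.injector.lock().unwrap().push_back(3);

    // Worker 1 has no local work: it drains the injector, then steals.
    assert_eq!(queues.find_task(1), Some(3));
    assert_eq!(queues.find_task(1), Some(2));
    assert_eq!(queues.find_task(0), Some(1));
    assert_eq!(queues.find_task(0), None);
}
```

Skipping a locked peer instead of blocking on it is what makes stealing optimistic: a worker may come up empty even though work exists, which is one reason utilization can suffer when tasks are very short.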
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Create strategy and submit task
let strategy = WorkStealing::default();
strategy.submit(Box::new(|| println!("Task")))?;
Implementations§
impl WorkStealing
pub fn new(num_workers: usize) -> Self
Creates a work-stealing execution strategy.
This method creates a strategy with the given number of worker threads,
which are spawned immediately before the method returns. Note that this
strategy uses an unbounded channel, so it has no capacity limit, unlike
the WorkSharing execution strategy.
§Panics
Panics if thread creation fails.
§Examples
use zrx_executor::strategy::WorkStealing;
// Create strategy
let strategy = WorkStealing::new(4);
Trait Implementations§
impl Debug for WorkStealing
impl Default for WorkStealing
fn default() -> Self
Creates a work-stealing execution strategy using all CPUs - 1.
The number of workers is determined by the number of logical CPUs minus one, which reserves one core for the main thread for orchestration. If the number of logical CPUs is fewer than 1, the strategy defaults to a single worker thread.
Warning: this method makes use of thread::available_parallelism
to determine the number of available cores, which has some limitations.
Please refer to the documentation of that function for more details, or
consider using num_cpus as an alternative.
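The worker count described above can be approximated as follows. This is a minimal sketch, not the crate's actual code: the helper name `default_num_workers` is hypothetical, and falling back to one worker when the CPU count cannot be determined is an assumption.

```rust
use std::thread;

/// Computes a default worker count: logical CPUs minus one, but at least one.
fn default_num_workers() -> usize {
    thread::available_parallelism()
        // Reserve one core for the main thread, never going below one worker.
        .map(|count| count.get().saturating_sub(1).max(1))
        // Assumption: use a single worker if the count is unavailable.
        .unwrap_or(1)
}

fn main() {
    let num_workers = default_num_workers();
    assert!(num_workers >= 1);
    println!("default workers: {num_workers}");
}
```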
§Examples
use zrx_executor::strategy::WorkStealing;
// Create strategy
let strategy = WorkStealing::default();
impl Drop for WorkStealing
fn drop(&mut self)
Terminates and joins all worker threads.
This method waits for all worker threads to finish executing currently running tasks, while ignoring any pending tasks. All worker threads are joined before the method returns. This is necessary to prevent worker threads from running after the strategy has been dropped.
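The terminate-and-join behavior can be illustrated with a small standard-library sketch. The `Pool` type and its shutdown flag are hypothetical and only stand in for the strategy's internals, which this page does not show.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical pool that owns its worker threads.
struct Pool {
    shutdown: Arc<AtomicBool>,
    workers: Vec<JoinHandle<()>>,
}

impl Pool {
    fn new(num_workers: usize) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let workers = (0..num_workers)
            .map(|_| {
                let shutdown = Arc::clone(&shutdown);
                thread::spawn(move || {
                    // Stand-in for the task loop: idle until asked to stop.
                    while !shutdown.load(Ordering::Acquire) {
                        thread::sleep(Duration::from_millis(1));
                    }
                })
            })
            .collect();
        Pool { shutdown, workers }
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Signal termination, then join every worker before returning.
        self.shutdown.store(true, Ordering::Release);
        for handle in self.workers.drain(..) {
            let _ = handle.join();
        }
    }
}

fn main() {
    let pool = Pool::new(2);
    let flag = Arc::clone(&pool.shutdown);
    drop(pool); // Blocks until both workers have exited.
    assert!(flag.load(Ordering::Acquire));
}
```

Joining inside `drop` guarantees that no worker outlives the value that owns it, at the cost of blocking the dropping thread until the currently running tasks finish.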
impl Strategy for WorkStealing
fn submit(&self, task: Box<dyn Task>) -> Result
Submits a task.
This method submits a Task, which is executed by one of the worker
threads as soon as possible. If a task computes a result, a Sender
can be shared with the task, to send the result back to the caller,
which can then poll a Receiver.
Note that tasks are intended to only run once, which is why they are consumed. If a task needs to be run multiple times, it must be wrapped in a closure that creates a new task each time. This allows for safe sharing of state between tasks.
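The two patterns above, returning a result through a channel and creating a fresh task for each run, might look like the following. This sketch uses only the standard library: the `Task` alias models a task as a boxed one-shot closure, `make_square_task` is a hypothetical factory, and a plain thread stands in for a worker, so none of it reflects the crate's own types.

```rust
use std::sync::mpsc;
use std::thread;

/// Assumption: a task is modeled as a boxed one-shot closure.
type Task = Box<dyn FnOnce() + Send + 'static>;

/// Builds a fresh task on every call, since each task can only run once.
fn make_square_task(input: u64, sender: mpsc::Sender<u64>) -> Task {
    Box::new(move || {
        // Ignore the error if the receiver has already been dropped.
        let _ = sender.send(input * input);
    })
}

fn main() {
    let (sender, receiver) = mpsc::channel();

    // Each task gets its own clone of the sender.
    let tasks: Vec<Task> = (1..=3)
        .map(|input| make_square_task(input, sender.clone()))
        .collect();
    drop(sender); // Only the tasks hold senders now.

    // A plain thread stands in for a worker that executes submitted tasks.
    let worker = thread::spawn(move || {
        for task in tasks {
            task();
        }
    });
    worker.join().unwrap();

    // The caller collects results from the receiving end.
    let results: Vec<u64> = receiver.iter().collect();
    assert_eq!(results, vec![1, 4, 9]);
}
```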
§Errors
This method is infallible, and will always return Ok.
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Create strategy and submit task
let strategy = WorkStealing::default();
strategy.submit(Box::new(|| println!("Task")))?;
fn num_workers(&self) -> usize
Returns the number of workers.
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get number of workers
let strategy = WorkStealing::new(1);
assert_eq!(strategy.num_workers(), 1);
fn num_tasks_running(&self) -> usize
Returns the number of running tasks.
This method can be used to monitor worker load, as it returns how many workers are currently executing tasks.
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get number of running tasks
let strategy = WorkStealing::default();
assert_eq!(strategy.num_tasks_running(), 0);
fn num_tasks_pending(&self) -> usize
Returns the number of pending tasks.
This method can be used to throttle task submission, as it returns how many tasks are currently waiting to be executed.
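One way to throttle on the pending count is to wait until the backlog falls below a limit before submitting the next task. The sketch below is an assumption-laden illustration: `Backlog` is a hypothetical stand-in backed by an atomic counter, `submit_throttled` is not part of the crate, and only the method name `num_tasks_pending` mirrors this page.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a strategy that reports its pending task count.
struct Backlog {
    pending: Arc<AtomicUsize>,
}

impl Backlog {
    fn num_tasks_pending(&self) -> usize {
        self.pending.load(Ordering::Acquire)
    }

    /// Waits until the backlog drops below `limit`, then records a submission.
    fn submit_throttled(&self, limit: usize) {
        while self.num_tasks_pending() >= limit {
            thread::sleep(Duration::from_millis(1));
        }
        self.pending.fetch_add(1, Ordering::AcqRel);
    }
}

fn main() {
    let pending = Arc::new(AtomicUsize::new(0));
    let backlog = Backlog { pending: Arc::clone(&pending) };

    // A consumer thread drains the backlog, at most one task per millisecond.
    let consumer = thread::spawn(move || {
        let mut done = 0;
        while done < 10 {
            if pending.load(Ordering::Acquire) > 0 {
                pending.fetch_sub(1, Ordering::AcqRel);
                done += 1;
            }
            thread::sleep(Duration::from_millis(1));
        }
    });

    // With a single producer, the backlog never exceeds the limit of 4.
    for _ in 0..10 {
        backlog.submit_throttled(4);
        assert!(backlog.num_tasks_pending() <= 4);
    }
    consumer.join().unwrap();
    assert_eq!(backlog.num_tasks_pending(), 0);
}
```

Polling with a short sleep is the simplest form of backpressure. With several producers the check and the increment would race, so a stricter bound would need a semaphore or a bounded channel instead.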
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get number of pending tasks
let strategy = WorkStealing::default();
assert_eq!(strategy.num_tasks_pending(), 0);
fn capacity(&self) -> Option<usize>
Returns the capacity, if bounded.
The work-stealing execution strategy does not impose a hard limit on the number of tasks. Thus, this strategy should only be used if tasks are not produced faster than they can be executed, or the number of tasks is limited by some other means.
§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};
// Get capacity
let strategy = WorkStealing::default();
assert_eq!(strategy.capacity(), None);