Struct WorkStealing

pub struct WorkStealing { /* private fields */ }

Work-stealing execution strategy.

This strategy implements work stealing, where each worker thread has its own local queue. When the local queue of a worker is empty, the worker can steal tasks from a central injector or from other workers. This allows for more efficient execution when there is a large number of workers and tasks.

Work stealing improves load balancing by allowing idle workers to take on tasks from busier peers, which reduces idle time and can improve overall throughput. Unlike the simpler WorkSharing strategy, which uses a central queue where workers may become stuck waiting for new tasks, this method can yield better resource utilization. Additionally, work stealing mitigates contention over shared resources (i.e., channels), which can become a bottleneck in central queueing systems, since each worker primarily operates on its local queue. However, stealing is an optimistic, best-effort operation, so if task runtimes are short, utilization can be lower than with a central queue. When task runtimes are long, work stealing can be more efficient than central queueing.

This reduced contention is particularly beneficial in dynamic environments with significantly fluctuating workloads, enabling faster task completion as workers can quickly adapt to take on shorter or less complex tasks as they become available.
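The lookup order described above (local queue, then central injector, then other workers) can be illustrated with a minimal sketch that uses only the standard library. This is not how the crate is implemented: the `Job` and `Queue` aliases and the `find_job` and `run_all` functions are hypothetical, and mutex-protected deques stand in for the lock-free queues a real implementation would likely use.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// A boxed unit of work (hypothetical stand-in for the crate's task type).
type Job = Box<dyn FnOnce() + Send>;
/// A queue of jobs guarded by a mutex (assumption: simplicity over speed).
type Queue = Mutex<VecDeque<Job>>;

/// Finds the next job for worker `id`: its own queue first, then the
/// central injector, then the queues of the other workers.
fn find_job(id: usize, locals: &[Queue], injector: &Queue) -> Option<Job> {
    // Each lock guard is a temporary that is released at the end of its
    // statement, so no two locks are ever held at the same time.
    let local = locals[id].lock().unwrap().pop_front();
    if local.is_some() {
        return local;
    }
    let injected = injector.lock().unwrap().pop_front();
    if injected.is_some() {
        return injected;
    }
    // Steal from the opposite end of a peer's queue.
    locals
        .iter()
        .enumerate()
        .filter(|(peer, _)| *peer != id)
        .find_map(|(_, queue)| queue.lock().unwrap().pop_back())
}

/// Runs all jobs on `num_workers` threads. Every job starts in worker 0's
/// local queue, so the other workers only get work by stealing.
fn run_all(num_workers: usize, jobs: Vec<Job>) {
    assert!(num_workers > 0, "at least one worker is required");
    let mut locals: Vec<Queue> =
        (0..num_workers).map(|_| Mutex::new(VecDeque::new())).collect();
    locals[0] = Mutex::new(VecDeque::from(jobs));
    let locals = Arc::new(locals);
    // The injector stays empty in this sketch; it is only part of the lookup.
    let injector: Arc<Queue> = Arc::new(Mutex::new(VecDeque::new()));

    let handles: Vec<_> = (0..num_workers)
        .map(|id| {
            let locals = Arc::clone(&locals);
            let injector = Arc::clone(&injector);
            thread::spawn(move || {
                // No job creates further jobs, so empty queues mean done.
                while let Some(job) = find_job(id, &locals, &injector) {
                    job();
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Calling `run_all(4, jobs)` executes every job exactly once, even though three of the four workers start with empty queues.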

§Examples

use zrx_executor::strategy::{Strategy, WorkStealing};

// Create strategy and submit task
let strategy = WorkStealing::default();
strategy.submit(Box::new(|| println!("Task")))?;

Implementations§

impl WorkStealing

pub fn new(num_workers: usize) -> Self

Creates a work-stealing execution strategy.

This method creates a strategy with the given number of worker threads, which are spawned before the method returns. Internally, a default limit of 8 tasks per worker is used, so with 4 workers, the executor has a capacity of 32 tasks.

Use WorkStealing::with_capacity to set a custom capacity.
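The default capacity described above is a simple product. The following sketch only restates that arithmetic; the constant and function names are hypothetical and not part of the crate.

```rust
/// Default number of tasks per worker, as stated in the text above.
const TASKS_PER_WORKER: usize = 8;

/// Computes the default capacity for the given number of workers
/// (hypothetical helper, shown for illustration only).
fn default_capacity(num_workers: usize) -> usize {
    num_workers * TASKS_PER_WORKER
}
```

With 4 workers, this yields the capacity of 32 tasks mentioned above.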

§Panics

Panics if thread creation fails.

§Examples
use zrx_executor::strategy::WorkStealing;

// Create strategy
let strategy = WorkStealing::new(4);

pub fn with_capacity(num_workers: usize, capacity: usize) -> Self

Creates a work-stealing execution strategy with the given capacity.

This method creates a strategy with the given number of worker threads, which are spawned before the method returns.

While this strategy uses unbounded channels due to how the Injector and Worker concepts are implemented, it still enforces a capacity limit to ensure the executor does not end up being overwhelmed. The given capacity sets the number of tasks the executor accepts before it starts rejecting them, which can be used to apply backpressure. Note that the capacity is not a per-worker limit, but a global limit for the whole executor.
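One way to put a global limit on top of an unbounded queue is to check the length before accepting an item. The sketch below assumes a mutex-protected deque and counts only queued items; the `BoundedQueue` type is hypothetical, and the crate may track capacity differently.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// An unbounded deque with a global capacity check on top
/// (hypothetical type, for illustration only).
struct BoundedQueue<T> {
    items: Mutex<VecDeque<T>>,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    fn new(capacity: usize) -> Self {
        BoundedQueue { items: Mutex::new(VecDeque::new()), capacity }
    }

    /// Accepts the item, or hands it back to the caller if the queue is
    /// at capacity, so the caller can retry later (backpressure).
    fn try_push(&self, item: T) -> Result<(), T> {
        let mut items = self.items.lock().unwrap();
        if items.len() >= self.capacity {
            return Err(item);
        }
        items.push_back(item);
        Ok(())
    }

    /// Removes the oldest item, which frees one slot.
    fn pop(&self) -> Option<T> {
        self.items.lock().unwrap().pop_front()
    }
}
```

Returning the rejected item in the `Err` variant lets the caller decide whether to retry, drop, or run it inline.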

§Panics

Panics if thread creation fails.

§Examples
use zrx_executor::strategy::WorkStealing;

// Create strategy
let strategy = WorkStealing::with_capacity(4, 64);

Trait Implementations§

impl Debug for WorkStealing

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the execution strategy for debugging.

impl Default for WorkStealing

fn default() -> Self

Creates a work-stealing execution strategy using all logical CPUs minus one.

The number of workers is the number of logical CPUs minus one, which reserves one core for the main thread for orchestration. If this would leave fewer than one worker, the strategy defaults to a single worker thread.

Warning: this method makes use of thread::available_parallelism to determine the number of available cores, which has some limitations. Please refer to the documentation of that function for more details, or consider using num_cpus as an alternative.
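The worker count described above could be derived as in the following sketch. The function names are hypothetical, and falling back to one CPU when `thread::available_parallelism` returns an error is an assumption rather than documented behavior.

```rust
use std::thread;

/// Reserves one CPU for the main thread, but never returns fewer than
/// one worker (hypothetical helper).
fn worker_count_for(cpus: usize) -> usize {
    cpus.saturating_sub(1).max(1)
}

/// Determines the worker count from the available parallelism, assuming
/// a single CPU if the value cannot be queried.
fn default_worker_count() -> usize {
    let cpus = thread::available_parallelism().map_or(1, |n| n.get());
    worker_count_for(cpus)
}
```

Using `saturating_sub` avoids an underflow on a single-CPU machine, and `max(1)` guarantees that at least one worker exists.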

§Examples
use zrx_executor::strategy::WorkStealing;

// Create strategy
let strategy = WorkStealing::default();

impl Drop for WorkStealing

fn drop(&mut self)

Terminates and joins all worker threads.

This method waits for all worker threads to finish executing currently running tasks, while ignoring any pending tasks. All worker threads are joined before the method returns. This is necessary to prevent worker threads from running after the strategy has been dropped.
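The join-on-drop behavior can be sketched with a shared shutdown flag. The `Pool` type below is hypothetical and only shows the general pattern; the workers idle instead of running tasks, and a counter of live workers is included solely to make the effect observable.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// A pool whose workers are stopped and joined on drop
/// (hypothetical type, for illustration only).
struct Pool {
    shutdown: Arc<AtomicBool>,
    live: Arc<AtomicUsize>,
    handles: Vec<JoinHandle<()>>,
}

impl Pool {
    fn new(num_workers: usize) -> Self {
        let shutdown = Arc::new(AtomicBool::new(false));
        let live = Arc::new(AtomicUsize::new(num_workers));
        let handles: Vec<JoinHandle<()>> = (0..num_workers)
            .map(|_| {
                let shutdown = Arc::clone(&shutdown);
                let live = Arc::clone(&live);
                thread::spawn(move || {
                    // A real worker would look for a task here.
                    while !shutdown.load(Ordering::Acquire) {
                        thread::sleep(Duration::from_millis(1));
                    }
                    live.fetch_sub(1, Ordering::AcqRel);
                })
            })
            .collect();
        Pool { shutdown, live, handles }
    }

    /// Returns a handle to the number of workers that are still running.
    fn live_workers(&self) -> Arc<AtomicUsize> {
        Arc::clone(&self.live)
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Signal termination, then wait for every worker to exit.
        self.shutdown.store(true, Ordering::Release);
        for handle in self.handles.drain(..) {
            let _ = handle.join();
        }
    }
}
```

Because `drop` blocks until all threads are joined, no worker can outlive the pool.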

impl Strategy for WorkStealing

fn submit(&self, task: Box<dyn Task>) -> Result

Submits a task.

This method submits a Task, which is executed by one of the worker threads as soon as possible. If a task computes a result, a Sender can be shared with the task, to send the result back to the caller, which can then poll a Receiver.
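Sending a result back through a channel might look like the following sketch. It uses `std::sync::mpsc` and `thread::spawn` as a stand-in for submitting the task to a strategy; the function name is hypothetical, and the crate's own Sender and Receiver types may differ.

```rust
use std::sync::mpsc;
use std::thread;

/// Squares the input on another thread and returns the receiving end of
/// the channel on which the result arrives (hypothetical helper).
fn square_in_background(input: u64) -> mpsc::Receiver<u64> {
    let (sender, receiver) = mpsc::channel();
    let task = Box::new(move || {
        // Ignore the error that occurs if the receiver was dropped.
        let _ = sender.send(input * input);
    });
    // Stand-in for submitting the task to an execution strategy.
    thread::spawn(task);
    receiver
}
```

The caller can block on the result with `recv`, or poll with `try_recv` to stay responsive while the task is still running.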

Note that tasks are intended to only run once, which is why they are consumed. If a task needs to be run multiple times, it must be wrapped in a closure that creates a new task each time. This allows for safe sharing of state between tasks.
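Wrapping task creation in a closure, as suggested above, can be sketched as follows. The `Job` alias and `make_factory` function are hypothetical; the shared counter shows how state can be shared safely between the tasks a factory creates.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// A boxed one-shot task (hypothetical stand-in for the crate's task type).
type Job = Box<dyn FnOnce() + Send>;

/// Returns a factory that creates a fresh one-shot task on every call.
/// All tasks share the same counter through an `Arc`.
fn make_factory(counter: Arc<AtomicUsize>) -> impl Fn() -> Job {
    move || {
        let counter = Arc::clone(&counter);
        Box::new(move || {
            counter.fetch_add(1, Ordering::SeqCst);
        }) as Job
    }
}
```

Each call to the factory yields a new task that can be submitted and consumed independently of the previous ones.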

§Errors

If the task cannot be submitted, Error::Submit is returned, which can only happen if the channel is disconnected or the executor is at capacity.

§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};

// Create strategy and submit task
let strategy = WorkStealing::default();
strategy.submit(Box::new(|| println!("Task")))?;

fn num_workers(&self) -> usize

Returns the number of workers.

§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};

// Get number of workers
let strategy = WorkStealing::new(1);
assert_eq!(strategy.num_workers(), 1);

fn num_tasks_running(&self) -> usize

Returns the number of running tasks.

This method makes it possible to monitor the worker load, as it returns how many workers are currently executing tasks.

§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};

// Get number of running tasks
let strategy = WorkStealing::default();
assert_eq!(strategy.num_tasks_running(), 0);

fn num_tasks_pending(&self) -> usize

Returns the number of pending tasks.

This method makes it possible to throttle the submission of tasks, as it returns how many tasks are currently waiting to be executed.

§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};

// Get number of pending tasks
let strategy = WorkStealing::default();
assert_eq!(strategy.num_tasks_pending(), 0);

fn capacity(&self) -> usize

Returns the capacity.

Although the underlying queues are unbounded, this strategy enforces a global limit on the number of tasks it accepts, which is set with WorkStealing::with_capacity or derived from the number of workers by WorkStealing::new. Once the limit is reached, further submissions are rejected, which can be used to apply backpressure.

§Examples
use zrx_executor::strategy::{Strategy, WorkStealing};

// Get capacity
let strategy = WorkStealing::default();
assert!(strategy.capacity() >= strategy.num_workers());

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Pointable for T

const ALIGN: usize

The alignment of pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.