Struct WorkSharing 

pub struct WorkSharing { /* private fields */ }

Work-sharing execution strategy.

This strategy manages its tasks centrally in a single bounded crossbeam channel. Worker threads pull tasks from the channel and execute them, repeating the process until they are terminated. This is a very simple, yet reasonably efficient strategy in most cases.

Tasks are picked up in exactly the order they were submitted, although they might not finish in that order. Because this strategy uses a bounded channel, task submission might fail when the channel’s capacity is reached. In return, the bounded channel offers better performance characteristics due to the use of atomics over locks. As an alternative, the WorkStealing strategy can be used, which is built on unbounded channels and allows for more flexible task submission, including automatic load balancing between workers, which is particularly useful when tasks create subtasks.
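The following is a minimal sketch of the work-sharing idea, not the crate's implementation. It assumes std's `sync_channel` as a stand-in for the bounded crossbeam channel, and wraps the receiver in a `Mutex` because std's receiver is single-consumer. The names `Task`, `spawn_workers` and `run_tasks` are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

// A task is a boxed closure that runs exactly once on some worker.
type Task = Box<dyn FnOnce() + Send + 'static>;

// Spawns `num_workers` threads that all pull from one bounded queue.
// `sync_channel` is a std stand-in for the bounded crossbeam channel.
fn spawn_workers(num_workers: usize, capacity: usize) -> (SyncSender<Task>, Vec<JoinHandle<()>>) {
    let (sender, receiver) = sync_channel::<Task>(capacity);
    let receiver: Arc<Mutex<Receiver<Task>>> = Arc::new(Mutex::new(receiver));
    let workers = (0..num_workers)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The lock guard is a temporary, so it is released before the task runs.
                let next = receiver.lock().unwrap().recv();
                match next {
                    Ok(task) => task(),
                    Err(_) => break, // every sender is gone: terminate
                }
            })
        })
        .collect();
    (sender, workers)
}

// Submits `num_tasks` counting tasks and returns how many of them ran.
fn run_tasks(num_workers: usize, capacity: usize, num_tasks: usize) -> usize {
    let (sender, workers) = spawn_workers(num_workers, capacity);
    let counter = Arc::new(AtomicUsize::new(0));
    for _ in 0..num_tasks {
        let counter = Arc::clone(&counter);
        // `send` blocks while the queue is full, so no task is rejected here.
        sender
            .send(Box::new(move || {
                counter.fetch_add(1, Ordering::SeqCst);
            }))
            .unwrap();
    }
    // Dropping the sender lets workers exit once the queue has drained.
    drop(sender);
    for worker in workers {
        worker.join().unwrap();
    }
    counter.load(Ordering::SeqCst)
}

fn main() {
    println!("tasks run: {}", run_tasks(2, 4, 10));
}
```

Unlike the strategy described above, this sketch blocks on a full queue instead of rejecting the task, which keeps the example short.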

§Examples

use zrx_executor::strategy::{Strategy, WorkSharing};

// Create strategy and submit task
let strategy = WorkSharing::default();
strategy.submit(Box::new(|| println!("Task")))?;

Implementations§

impl WorkSharing

pub fn new(num_workers: usize) -> Self

Creates a work-sharing execution strategy.

This method creates a strategy with the given number of worker threads, all of which are spawned before the method returns. Internally, a bounded channel is created with a capacity of 8 tasks per worker, so for 4 workers, the channel has a capacity of 32 tasks.

Use WorkSharing::with_capacity to set a custom capacity.

§Panics

Panics if thread creation fails.

§Examples
use zrx_executor::strategy::WorkSharing;

// Create strategy
let strategy = WorkSharing::new(4);

pub fn with_capacity(num_workers: usize, capacity: usize) -> Self

Creates a work-sharing execution strategy with the given capacity.

This method creates a strategy with the given number of worker threads, which are spawned immediately before the method returns.

This strategy makes use of a bounded channel for its better performance characteristics. The caller is expected to control task submission and to ensure that the executor can accept new tasks. The given capacity sets the number of tasks the executor accepts before starting to reject them, which can be used to apply backpressure. Note that the capacity is not a per-worker limit, but a global per-executor limit.
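To illustrate how a global capacity leads to rejected submissions, here is a small sketch. It assumes std's `sync_channel` and `try_send` as stand-ins for the crate's internal channel; `fill_queue` is a hypothetical helper, and nothing drains the queue while it runs.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Tries to enqueue `total` items into a queue with a global `capacity`
// while nothing drains it, and returns `(accepted, rejected)`.
fn fill_queue(capacity: usize, total: usize) -> (usize, usize) {
    // Keep the receiver alive so the channel stays connected.
    let (sender, _receiver) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for item in 0..total {
        match sender.try_send(item) {
            Ok(()) => accepted += 1,
            // The queue is at capacity: this is the backpressure signal.
            Err(TrySendError::Full(_)) => rejected += 1,
            // No receiver is left, so further attempts are pointless.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}

fn main() {
    let (accepted, rejected) = fill_queue(64, 100);
    println!("accepted {accepted}, rejected {rejected}");
}
```

With a capacity of 64 and 100 attempts, 64 items are accepted and 36 are rejected, regardless of how many workers would eventually consume them.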

§Panics

Panics if thread creation fails.

§Examples
use zrx_executor::strategy::WorkSharing;

// Create strategy with capacity
let strategy = WorkSharing::with_capacity(4, 64);

Trait Implementations§

impl Debug for WorkSharing

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the execution strategy for debugging.

impl Default for WorkSharing

fn default() -> Self

Creates a work-sharing execution strategy using all logical CPUs but one.

The number of workers is the number of logical CPUs minus one, which reserves one core for the main thread for orchestration. If this leaves fewer than 1 worker, the strategy defaults to a single worker thread.

Warning: this method makes use of thread::available_parallelism to determine the number of available cores, which has some limitations. Please refer to the documentation of that function for more details, or consider using num_cpus as an alternative.
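The rule above can be sketched with std alone. The helper names `workers_for` and `default_num_workers` are hypothetical, and falling back to one CPU when `available_parallelism` returns an error is an assumption, not confirmed behavior of this crate.

```rust
use std::thread;

// Logical CPUs minus one, but never fewer than one worker.
fn workers_for(logical_cpus: usize) -> usize {
    logical_cpus.saturating_sub(1).max(1)
}

// Derives the worker count from the parallelism reported by the OS.
fn default_num_workers() -> usize {
    // Assumption: treat a failed query as a single logical CPU.
    let cpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    workers_for(cpus)
}

fn main() {
    println!("8 CPUs -> {} workers", workers_for(8));
    println!("1 CPU  -> {} worker", workers_for(1));
    println!("this machine -> {} workers", default_num_workers());
}
```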

§Examples
use zrx_executor::strategy::WorkSharing;

// Create strategy
let strategy = WorkSharing::default();

impl Drop for WorkSharing

fn drop(&mut self)

Terminates and joins all worker threads.

This method waits for all worker threads to finish executing currently running tasks, while ignoring any pending tasks. All worker threads are joined before the method returns. This is necessary to prevent worker threads from running after the strategy has been dropped.
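One way to get this behavior is a shared termination flag that workers check before running the next task. The sketch below is an assumption about how such a `Drop` could look, built on std only. `Pool`, its fields, and `tasks_run_before_drop` are hypothetical and not taken from the crate.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, sync_channel, Receiver, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Task = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical pool; field and method names are illustrative only.
struct Pool {
    sender: Option<SyncSender<Task>>,
    terminate: Arc<AtomicBool>,
    workers: Vec<JoinHandle<()>>,
}

impl Pool {
    fn new(num_workers: usize, capacity: usize) -> Self {
        let (sender, receiver) = sync_channel::<Task>(capacity);
        let receiver: Arc<Mutex<Receiver<Task>>> = Arc::new(Mutex::new(receiver));
        let terminate = Arc::new(AtomicBool::new(false));
        let workers = (0..num_workers)
            .map(|_| {
                let (receiver, terminate) = (Arc::clone(&receiver), Arc::clone(&terminate));
                thread::spawn(move || {
                    while !terminate.load(Ordering::SeqCst) {
                        let next = receiver.lock().unwrap().recv();
                        match next {
                            // Skip tasks dequeued after termination began.
                            Ok(task) if !terminate.load(Ordering::SeqCst) => task(),
                            _ => break,
                        }
                    }
                })
            })
            .collect();
        Pool { sender: Some(sender), terminate, workers }
    }

    // Returns `false` if the queue is full or disconnected.
    fn submit(&self, task: Task) -> bool {
        self.sender.as_ref().map_or(false, |s| s.try_send(task).is_ok())
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // Stop workers from picking up further tasks.
        self.terminate.store(true, Ordering::SeqCst);
        // Disconnect the queue so idle workers blocked in `recv` wake up.
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            let _ = worker.join();
        }
    }
}

// Runs one long task plus three pending ones on a single worker,
// drops the pool mid-task, and returns how many tasks ran.
fn tasks_run_before_drop() -> usize {
    let pool = Pool::new(1, 4);
    let terminate = Arc::clone(&pool.terminate);
    let ran = Arc::new(AtomicUsize::new(0));
    let (started_tx, started_rx) = channel();

    let first = Arc::clone(&ran);
    pool.submit(Box::new(move || {
        first.fetch_add(1, Ordering::SeqCst);
        started_tx.send(()).unwrap();
        // Keep "working" until the pool starts to terminate.
        while !terminate.load(Ordering::SeqCst) {
            thread::yield_now();
        }
    }));
    for _ in 0..3 {
        let ran = Arc::clone(&ran);
        pool.submit(Box::new(move || {
            ran.fetch_add(1, Ordering::SeqCst);
        }));
    }

    started_rx.recv().unwrap(); // the first task is now running
    drop(pool); // joins the worker; the three queued tasks never run
    ran.load(Ordering::SeqCst)
}

fn main() {
    println!("tasks run: {}", tasks_run_before_drop());
}
```

In this sketch only the task that was already running completes, so the function returns 1 even though four tasks were submitted.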

impl Strategy for WorkSharing

fn submit(&self, task: Box<dyn Task>) -> Result

Submits a task.

This method submits a Task, which is executed by one of the worker threads as soon as possible. If a task computes a result, a Sender can be shared with the task, to send the result back to the caller, which can then poll a Receiver.

Note that tasks are intended to only run once, which is why they are consumed. If a task needs to be run multiple times, it must be wrapped in a closure that creates a new task each time. This allows for safe sharing of state between tasks.
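Both ideas, sending a result back through a `Sender` and creating a fresh task for every run, can be sketched with std alone. Here a plain thread stands in for an executor worker, and `Task`, `make_square_task` and `squares` are hypothetical names; with the real strategy, the boxed closure would be handed to `submit` instead.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

type Task = Box<dyn FnOnce() + Send + 'static>;

// Factory: every call builds a fresh one-shot task, so the same
// computation can be submitted repeatedly.
fn make_square_task(input: u64, results: Sender<u64>) -> Task {
    Box::new(move || {
        // Ignore the error if the caller stopped listening.
        let _ = results.send(input * input);
    })
}

// Runs one task per input and collects the results from the receiver.
fn squares(inputs: &[u64]) -> Vec<u64> {
    let (sender, receiver) = channel();
    let handles: Vec<_> = inputs
        .iter()
        // A plain thread stands in for an executor worker here.
        .map(|&n| thread::spawn(make_square_task(n, sender.clone())))
        .collect();
    // Drop the original sender so the receiver ends once all tasks are done.
    drop(sender);
    let mut results: Vec<u64> = receiver.iter().collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Tasks may finish in any order, so sort for a stable result.
    results.sort_unstable();
    results
}

fn main() {
    println!("{:?}", squares(&[1, 2, 3]));
}
```

The sketch blocks on `receiver.iter()` for brevity. A caller that must stay responsive could poll with `try_recv` instead.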

§Errors

If the task cannot be submitted, Error::Submit is returned, which can only happen if the channel is disconnected or at capacity.

§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};

// Create strategy and submit task
let strategy = WorkSharing::default();
strategy.submit(Box::new(|| println!("Task")))?;

fn num_workers(&self) -> usize

Returns the number of workers.

§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};

// Get number of workers
let strategy = WorkSharing::new(1);
assert_eq!(strategy.num_workers(), 1);

fn num_tasks_running(&self) -> usize

Returns the number of running tasks.

This method allows monitoring the worker load, as it returns how many workers are currently executing tasks.

§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};

// Get number of running tasks
let strategy = WorkSharing::default();
assert_eq!(strategy.num_tasks_running(), 0);

fn num_tasks_pending(&self) -> usize

Returns the number of pending tasks.

This method allows throttling the submission of tasks, as it returns how many tasks are currently waiting to be executed.
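A throttling policy built on this counter might look like the sketch below. The `QueueStats` trait is a hypothetical local stand-in that mirrors only the documented `num_tasks_pending` and `capacity` methods, and `Fixed` is a mock with constant values. With the real strategy the counters can change between the check and the submission, so a submission may still be rejected.

```rust
// Hypothetical stand-in exposing the two documented counters.
trait QueueStats {
    fn num_tasks_pending(&self) -> usize;
    fn capacity(&self) -> usize;
}

// Number of tasks that can still be queued before submissions are rejected.
fn free_slots(stats: &impl QueueStats) -> usize {
    stats.capacity().saturating_sub(stats.num_tasks_pending())
}

// Splits a backlog into the part to submit now and the part to hold back.
fn next_batch<'a, T>(stats: &impl QueueStats, backlog: &'a [T]) -> (&'a [T], &'a [T]) {
    let count = free_slots(stats).min(backlog.len());
    backlog.split_at(count)
}

// Mock with fixed counters, used only for demonstration.
struct Fixed {
    pending: usize,
    capacity: usize,
}

impl QueueStats for Fixed {
    fn num_tasks_pending(&self) -> usize {
        self.pending
    }
    fn capacity(&self) -> usize {
        self.capacity
    }
}

fn main() {
    let stats = Fixed { pending: 30, capacity: 32 };
    let backlog = ["a", "b", "c", "d", "e"];
    let (now, later) = next_batch(&stats, &backlog);
    println!("submit now: {now:?}, hold back: {later:?}");
}
```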

§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};

// Get number of pending tasks
let strategy = WorkSharing::default();
assert_eq!(strategy.num_tasks_pending(), 0);

fn capacity(&self) -> usize

Returns the capacity.

This method returns the maximum number of tasks that can be submitted at once, which can be used by the strategy for applying backpressure.

§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};

// Get capacity
let strategy = WorkSharing::default();
assert!(strategy.capacity() >= strategy.num_workers());

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> Pointable for T

const ALIGN: usize

The alignment of the pointer.

type Init = T

The type for initializers.

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes an object with the given initializer.

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer.

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer.

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.