pub struct WorkSharing { /* private fields */ }Expand description
Work-sharing execution strategy.
This strategy manages its tasks centrally in a single bounded crossbeam
channel, which pull tasks from it and execute them, repeating the process
until they are terminated. This is a very simple, yet reasonably efficient
strategy in most cases.
Tasks are processed in the exact same order they were submitted, albeit they
might not finish in the same order. As this strategy uses a bounded channel,
task submission might fail when the channel’s capacity is reached, leading
to better performance characteristics due to the use of atomics over locks.
As an alternative, the WorkStealing strategy can be used, which is
built on unbounded channels and allows for more flexible task submission,
including automatic load balancing between workers which is particularly
useful when tasks create subtasks.
§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};
// Create strategy and submit task
let strategy = WorkSharing::default();
strategy.submit(Box::new(|| println!("Task")))?;Implementations§
Source§impl WorkSharing
impl WorkSharing
Sourcepub fn new(num_workers: usize) -> Self
pub fn new(num_workers: usize) -> Self
Creates a work-sharing execution strategy.
This method creates a strategy with the given number of worker threads, which are spawned immediately before the method returns. Internally, a bounded channel is created with a capacity of 8 tasks per worker, so for 4 workers, the channel will have a capacity of 32 tasks.
Use WorkSharing::with_capacity to set a custom capacity.
§Panics
Panics if thread creation fails.
§Examples
use zrx_executor::strategy::WorkSharing;
// Create strategy
let strategy = WorkSharing::new(4);Sourcepub fn with_capacity(num_workers: usize, capacity: usize) -> Self
pub fn with_capacity(num_workers: usize, capacity: usize) -> Self
Creates a work-sharing execution strategy with the given capacity.
This method creates a strategy with the given number of worker threads, which are spawned immediately before the method returns.
This strategy makes use of a bounded channel for its better performance characteristics, since the caller is expected to have control over task submission, ensuring that the executor can accept new tasks. The given capacity sets the number of tasks the executor accepts before starting to reject them, which can be used to apply backpressure. Note that the capacity is not a per-worker, but a global per-executor limit.
§Panics
Panics if thread creation fails.
§Examples
use zrx_executor::strategy::WorkSharing;
// Create strategy with capacity
let strategy = WorkSharing::with_capacity(4, 64);Trait Implementations§
Source§impl Debug for WorkSharing
impl Debug for WorkSharing
Source§impl Default for WorkSharing
impl Default for WorkSharing
Source§fn default() -> Self
fn default() -> Self
Creates a work-sharing execution strategy using all CPUs - 1.
The number of workers is determined by the number of logical CPUs minus one, which reserves one core for the main thread for orchestration. If the number of logical CPUs is fewer than 1, the strategy defaults to a single worker thread.
Warning: this method makes use of thread::available_parallelism
to determine the number of available cores, which has some limitations.
Please refer to the documentation of that function for more details, or
consider using num_cpus as an alternative.
§Examples
use zrx_executor::strategy::WorkSharing;
// Create strategy
let strategy = WorkSharing::default();Source§impl Drop for WorkSharing
impl Drop for WorkSharing
Source§fn drop(&mut self)
fn drop(&mut self)
Terminates and joins all worker threads.
This method waits for all worker threads to finish executing currently running tasks, while ignoring any pending tasks. All worker threads are joined before the method returns. This is necessary to prevent worker threads from running after the strategy has been dropped.
Source§impl Strategy for WorkSharing
impl Strategy for WorkSharing
Source§fn submit(&self, task: Box<dyn Task>) -> Result
fn submit(&self, task: Box<dyn Task>) -> Result
Submits a task.
This method submits a Task, which is executed by one of the worker
threads as soon as possible. If a task computes a result, a Sender
can be shared with the task, to send the result back to the caller,
which can then poll a Receiver.
Note that tasks are intended to only run once, which is why they are consumed. If a task needs to be run multiple times, it must be wrapped in a closure that creates a new task each time. This allows for safe sharing of state between tasks.
§Errors
If the task cannot be submitted, Error::Submit is returned, which
can only happen if the channel is disconnected or at capacity.
§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};
// Create strategy and submit task
let strategy = WorkSharing::default();
strategy.submit(Box::new(|| println!("Task")))?;Source§fn num_workers(&self) -> usize
fn num_workers(&self) -> usize
Returns the number of workers.
§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};
// Get number of workers
let strategy = WorkSharing::new(1);
assert_eq!(strategy.num_workers(), 1);Source§fn num_tasks_running(&self) -> usize
fn num_tasks_running(&self) -> usize
Returns the number of running tasks.
This method allows to monitor the worker load, as it returns how many workers are currently actively executing tasks.
§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};
// Get number of running tasks
let strategy = WorkSharing::default();
assert_eq!(strategy.num_tasks_running(), 0);Source§fn num_tasks_pending(&self) -> usize
fn num_tasks_pending(&self) -> usize
Returns the number of pending tasks.
This method allows to throttle the submission of tasks, as it returns how many tasks are currently waiting to be executed.
§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};
// Get number of pending tasks
let strategy = WorkSharing::default();
assert_eq!(strategy.num_tasks_pending(), 0);Source§fn capacity(&self) -> usize
fn capacity(&self) -> usize
Returns the capacity.
This method returns the maximum number of tasks that can be submitted at once, which can be used by the strategy for applying backpressure.
§Examples
use zrx_executor::strategy::{Strategy, WorkSharing};
// Get capacity
let strategy = WorkSharing::default();
assert!(strategy.capacity() >= strategy.num_workers());