Struct oqueue::Sequencer

source ·
pub struct Sequencer { /* private fields */ }
Expand description

Synchronization mechanism for performing non-interleaved output from concurrent tasks.

Bare-bones skeleton

This example performs 30 tasks in parallel across a pool of 10 threads. Each task writes one line of output. All output is guaranteed to appear in order by task index from 0 through 29.

use oqueue::Sequencer;

fn main() {
    let oqueue = Sequencer::stderr();

    // Launch 10 worker threads.
    rayon::scope(|scope| {
        for _ in 0..10 {
            scope.spawn(|_| worker(&oqueue));
        }
    });
}

fn worker(oqueue: &Sequencer) {
    loop {
        let task = oqueue.begin();
        if task.index >= 30 {
            return;
        }
        writeln!(task, "hello from task #{}", task.index);
    }
}
▷ Click to show output
hello from task #0
hello from task #1
hello from task #2
hello from task #3
hello from task #4
hello from task #5
hello from task #6
hello from task #7
hello from task #8
hello from task #9
hello from task #10
hello from task #11
hello from task #12
hello from task #13
hello from task #14
hello from task #15
hello from task #16
hello from task #17
hello from task #18
hello from task #19
hello from task #20
hello from task #21
hello from task #22
hello from task #23
hello from task #24
hello from task #25
hello from task #26
hello from task #27
hello from task #28
hello from task #29

Shared slice skeleton

This example uses a shared slice to coordinate work that needs to be performed. Tasks perform work on one element of the slice according to their task index.

use oqueue::Sequencer;

struct WorkItem(u8);

fn main() {
    let oqueue = Sequencer::stderr();
    let work = (b'A'..=b'Z').map(WorkItem).collect::<Vec<_>>();

    // Launch 10 worker threads.
    rayon::scope(|scope| {
        for i in 0..10 {
            let oqueue = &oqueue;
            let work = &work;
            scope.spawn(move |_| worker(i, oqueue, work));
        }
    });
}

fn worker(thread: usize, oqueue: &Sequencer, work: &[WorkItem]) {
    loop {
        let task = oqueue.begin();
        let input = match work.get(task.index) {
            Some(input) => input,
            None => return,
        };
        writeln!(
            task,
            "thread {} is performing work {}",
            thread, input.0 as char,
        );
    }
}
▷ Click to show output
thread 0 is performing work A
thread 9 is performing work B
thread 1 is performing work C
thread 2 is performing work D
thread 0 is performing work E
thread 0 is performing work F
thread 0 is performing work G
thread 0 is performing work H
thread 0 is performing work I
thread 0 is performing work J
thread 2 is performing work K
thread 9 is performing work L
thread 9 is performing work M
thread 9 is performing work N
thread 9 is performing work O
thread 9 is performing work P
thread 9 is performing work Q
thread 9 is performing work R
thread 0 is performing work S
thread 0 is performing work T
thread 0 is performing work U
thread 0 is performing work V
thread 2 is performing work W
thread 1 is performing work X
thread 1 is performing work Y
thread 1 is performing work Z

Synchronized queue skeleton

This example uses a synchronized queue of work items in the form of a mutex holding an iterator, although any other channel-like implementation could work too. The task index is not used in this approach.

use oqueue::Sequencer;
use std::sync::Mutex;

struct WorkItem(u8);

fn main() {
    let oqueue = Sequencer::stderr();
    let work = Mutex::new((b'A'..=b'Z').map(WorkItem));

    // Launch 10 worker threads.
    rayon::scope(|scope| {
        for i in 0..10 {
            let oqueue = &oqueue;
            let work = &work;
            scope.spawn(move |_| worker(i, oqueue, work));
        }
    });
}

fn worker(thread: usize, oqueue: &Sequencer, work: &Mutex<dyn Iterator<Item = WorkItem>>) {
    loop {
        let task = oqueue.begin();
        let input = match work.lock().unwrap().next() {
            Some(input) => input,
            None => return,
        };
        writeln!(
            task,
            "thread {} is performing work {}",
            thread, input.0 as char,
        );
    }
}
▷ Click to show output
thread 9 is performing work A
thread 0 is performing work B
thread 9 is performing work C
thread 9 is performing work D
thread 9 is performing work E
thread 9 is performing work F
thread 9 is performing work G
thread 9 is performing work H
thread 9 is performing work I
thread 1 is performing work J
thread 9 is performing work K
thread 2 is performing work L
thread 1 is performing work M
thread 2 is performing work N
thread 0 is performing work O
thread 9 is performing work P
thread 1 is performing work Q
thread 1 is performing work R
thread 1 is performing work S
thread 0 is performing work T
thread 1 is performing work U
thread 2 is performing work V
thread 9 is performing work W
thread 0 is performing work X
thread 1 is performing work Y
thread 1 is performing work Z

Implementations§

source§

impl Sequencer

source

pub fn stdout() -> Self

Makes a sequencer whose output goes to stdout.

source

pub fn stderr() -> Self

Makes a sequencer whose output goes to stderr.

source

pub fn begin(&self) -> Task

Begins the next available task.

The caller may figure out what work to perform based on the index of this task available in task.index, or by acquiring work from a synchronized queue that is shared across workers.

This call does not block.

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for Twhere U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.