Skip to main content

Crate senders_receivers

Crate senders_receivers 

Source
Expand description

§A Tiny Example

use senders_receivers::*;

let sender = Just::from((1, 2, 3, 4))
           | Then::from(|(a, b, c, d)| (a * b * c * d,));
println!("outcome: {}", sender.sync_wait().expect("no error").expect("no cancelation").0);

What this does:

  • Just::from: declares an starting value for the sender chain.
  • Then::from: declares a transformation on these values.
  • | (the pipe symbol): used to bind them together.

None of the steps are run, until sync_wait is invoked.

§Signals

Each sender, produces either a value signal, an error signal, or a done signal. Exactly one of these will be produced.

A value signal indicates that the sender completed successfully, and produced a value.
An error signal indicates that the sender failed, and produced an Error.
A done signal indicates that the sender canceled its work (producing neither a value, nor an error).

§Schedulers

Each operation will run on a Scheduler. (Sometimes more than one, for example if you Transfer to a different scheduler.)

A scheduler encapsulates the concept of CPU-time. Different schedulers will have different characteristics, related to when/where things run.

Currently, the following schedulers are implemented:

§How To Use This

The system works by creating sender-chains, which consist of a sequence of senders.

§Initial Element of the Sender-Chain

The first element in a sender-chain is a TypedSender, which produces a value of some kind. Usually, this will be a Just or a Scheduler::schedule_value. The inital element produces a value, which is a tuple.

Example:

use senders_receivers::Just;

let sender_chain = Just::from((1, 2, 3));

To get the value produced by a sender-chain, you can use SyncWait::sync_wait. (This function will block until the sender-chain complets.)

use senders_receivers::{Just, SyncWait};

let sender_chain = Just::from((1, 2, 3));
let outcome = match sender_chain.sync_wait() {
    Ok(Some(values)) => values,
    Ok(None) => panic!("execution was canceled"),
    Err(error) => panic!("execution failed: {:?}", error),
};
assert_eq!(
    (1, 2, 3),
    outcome);

The SyncWait::sync_wait method returns a Result<Option< value-type >>, so we need two unwraps.

§Making the Sender-Chain do Actual Work

The above sender-chain is not very useful. But we can make the sender-chain do some work for us. For example, we can use Then to run some computation. To attach a sender, we use the | (pipe) symbol.

use senders_receivers::{Just, Then, SyncWait};

let sender_chain = Just::from((1, 2, 3));

// Declare we want to do a thing.
let sender_chain = sender_chain
                 | Then::from(|(x, y, z)| (x + y + z,));

// The code in `Then` doesn't run until we call `sync_wait`.
let outcome = match sender_chain.sync_wait() {
    Ok(Some(values)) => values,
    Ok(None) => panic!("execution was canceled"),
    Err(error) => panic!("execution failed: {:?}", error),
};
assert_eq!(
    (6,),
    outcome);

Then is a Sender. That means it can be added to a sender-chain, and the addition produces a new sender-chain. Since attaching a sender to a sender-chain results in a new sender-chain, you can keep doing this, creating more complex chains.

The operations on the sender-chain won’t run, until the sender is started using sync_wait.

§Composition

Multiple sender-chains can be merged together into a single sender-chain.

use senders_receivers::{when_all, Just, Then, SyncWait};

// The same chain from the previous example.
let sender_chain = Just::from((1, 2, 3))
                 | Then::from(|(x, y, z)| (x + y + z,));

// Some other sender-chain, that computes a different value.
let another_sender_chain = Just::from((7, 8));

let sender_chain = when_all!(
    sender_chain,
    another_sender_chain,
);

let outcome = match sender_chain.sync_wait() {
    Ok(Some(values)) => values,
    Ok(None) => panic!("execution was canceled"),
    Err(error) => panic!("execution failed: {:?}", error),
};
assert_eq!(
    (6, 7, 8),
    outcome);

§Schedulers

We can make the sender-chain run on a different scheduler. For example, a threadpool. sync_wait_send must be used, to allow completion across a thread boundary.

use senders_receivers::{Scheduler, SyncWaitSend};
use threadpool::ThreadPool;

let pool = ThreadPool::with_name("senders-receivers example".into(), 2);
let sender_chain = pool.schedule_value((1, 2, 3));
let outcome = match sender_chain.sync_wait_send() {
    Ok(Some(values)) => values,
    Ok(None) => panic!("execution was canceled"),
    Err(error) => panic!("execution failed: {:?}", error),
};
assert_eq!(
    (1, 2, 3),
    outcome);

We can even use temporary references.

use senders_receivers::{Then, Scheduler, SyncWaitSend};
use threadpool::ThreadPool;

let pool = ThreadPool::with_name("senders-receivers example".into(), 2);

// We place the code in a scope, to show off that it handles lifetimes.
let (outcome,) = {
    let x = 6;
    let y = 7;

    let sender = pool.schedule()
               | Then::from(|_| (x * y,));
    sender.sync_wait_send().unwrap().unwrap()
};

assert_eq!(42, outcome);

We can also swap to a different scheduler part-way through a calculation, using the Transfer sender.

use senders_receivers::{Just, Then, Transfer, SyncWaitSend};
use threadpool::ThreadPool;

let pool = ThreadPool::with_name("senders-receivers example".into(), 2);

let sender_chain = Just::default()
                 | Then::from(|_| (String::from("I run on the local thread"),))
                 | Transfer::new(pool)
                 | Then::from(|(previous,)| (previous, String::from("I run on the threadpool")));

let outcome = match sender_chain.sync_wait_send() {
    Ok(Some(values)) => values,
    Ok(None) => panic!("execution was canceled"),
    Err(error) => panic!("execution failed: {:?}", error),
};
assert_eq!(
    (String::from("I run on the local thread"), String::from("I run on the threadpool")),
    outcome);

§Combining Threadpool and Composition

We can combine the threadpool and composition, to run two or more tasks in parallel.

This example will print out first task, second task, and third task in some unspecified order. They will run on the thread-pool, which will place them on one of its worker threads.

use senders_receivers::{when_all, Scheduler, Then, SyncWaitSend};
use threadpool::ThreadPool;

let pool = ThreadPool::with_name("senders-receivers example".into(), 2);

let first_sender_chain = pool.schedule_value(("first",))
                       | Then::from(|(x,)| {
                             println!("{} task", x);
                             (x,)
                         });
let second_sender_chain = pool.schedule_value(("second",))
                        | Then::from(|(x,)| {
                              println!("{} task", x);
                              (x,)
                          });
let third_sender_chain = pool.schedule_value(("third",))
                       | Then::from(|(x,)| {
                             println!("{} task", x);
                             (x,)
                         });
// Declare we want all three tasks to run as part of our sender-chain.
let sender_chain = when_all!(
    first_sender_chain,
    second_sender_chain,
    third_sender_chain,
);

let outcome = match sender_chain.sync_wait_send() {
    Ok(Some(values)) => values,
    Ok(None) => panic!("execution was canceled"),
    Err(error) => panic!("execution failed: {:?}", error),
};
assert_eq!(
    ("first", "second", "third"),
    outcome);

Modules§

embarrasingly_parallel
A scheduler that uses an embarrasingly-parallel strategy for executing tasks.
functor
The functors mod holds functions.
io
The io mod holds IO utilities.
refs
Reference logic for senders/receivers.
stop_token
Stop-tokens are used to signal a sender-chain to stop processing early.
tuple
Tuple utility types.

Macros§

transfer_when_all
Combine multiple sender-chains into a single sender-chain.
when_all
Combine multiple sender-chains into a single sender-chain.

Structs§

ImmediateScheduler
An immediate-scheduler is a Scheduler which runs any tasks on it immediately.
Just
A typed-sender that holds a tuple of values.
JustDone
A TypedSender that always generates a done-signal.
JustError
A TypedSender that always generates an Error.
LetDone
Create a let-done Sender.
LetError
Create a let-error Sender.
LetValue
Create a let-value Sender.
SharedError
Wrap an Error, so it can be shared across multiple sender-chains.
Split
Split turns a sender-chain into a re-usable sender-chain.
SplitSend
Split turns a sender-chain into a re-usable sender-chain.
StopIfRequested
Sender that cancels further processing, if the StopToken requests such.
Then
A then operation takes the current value signal, and transforms it in some way. The result of the function is the new value signal.
Transfer
Transfer to a different Scheduler.
UponDone
Handle a done-signal, by transforming it into a value signal.
UponError
Handle an error-signal, by transforming an Error into a value signal.

Traits§

BindSender
Sender can extend TypedSender. In order to do that, a function is invoked on that sender, with the typed sender as an argument. BindSender models the binding of the sender with a typed sender.
OperationState
An operation state is a TypedSender with matching ReceiverOf. It’s ready to run, just waiting to be started.
Receiver
Common receiver logic. All receivers can accept the done signal, and the error signal.
ReceiverOf
Declare that this is a receiver that can accept a specific Values type.
Scheduler
Schedulers are things that can do work. All sender chains run on a scheduler.
Sender
A sender is a type that describes a step in an operation.
StartDetached
The start-detached operation allows to start an operation, without waiting for its completion.
StartDetachedWithStopToken
The StartDetached operation, but with an associated StopToken to allow for stopping the operation.
SyncWait
Trait that implements the sync_wait method.
SyncWaitSend
Trait that implements the sync_wait_send method.
TypedSender
A typed sender is a sender, which describes an entire operation.
TypedSenderConnect
Trait for implementing connect functionality.
WithScheduler
This trait allows us to construct things from a scheduler and an argument.

Functions§

ensure_started
Start the sender-chain, while allowing for attaching further elements.
ensure_started_send
Start the sender-chain, while allowing for attaching further elements.
new_error
Create a new error from something that looks like an error.

Type Aliases§

Error
Errors are passed as opaque types.
Result
Result type used in senders/receivers.