Crate cancellation


Rust-Cancellation is a small crate that provides the CancellationToken type, which can be used to signal cancellation to other code in a composable manner.

Operations that support cancellation usually accept a ct: &CancellationToken parameter. They can either cooperatively check ct.is_canceled(), or use ct.run() to get notified via callback when cancellation is requested.

Operations that finish asynchronously may instead accept ct: Arc<CancellationToken>.
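
An operation that keeps running after the call returns cannot simply borrow the token, since the borrow would have to outlive the caller; shared ownership through Arc avoids that. The following is a minimal, std-only sketch of this ownership pattern. `Flag` is a hypothetical stand-in built on AtomicBool, not the crate's CancellationToken, and `spawn_counter` is an illustrative name.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a cancellation token (not the crate's type).
struct Flag(AtomicBool);

impl Flag {
    fn new() -> Flag {
        Flag(AtomicBool::new(false))
    }
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_canceled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Starts a background loop that counts iterations until the flag is set.
/// The thread owns its own Arc clone, so it can outlive the caller's scope.
fn spawn_counter(flag: Arc<Flag>) -> JoinHandle<u64> {
    thread::spawn(move || {
        let mut iterations = 0;
        while !flag.is_canceled() {
            iterations += 1;
            thread::sleep(Duration::from_millis(1));
        }
        iterations
    })
}

fn main() {
    let flag = Arc::new(Flag::new());
    let handle = spawn_counter(flag.clone());
    thread::sleep(Duration::from_millis(20));
    flag.cancel();
    let iterations = handle.join().unwrap();
    println!("worker ran {} iterations before stopping", iterations);
}
```

The caller keeps one Arc to request cancellation and hands a clone to the worker, which is presumably the same reason asynchronous operations would take an Arc<CancellationToken> rather than a reference.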

To create a CancellationToken, use the type CancellationTokenSource. A CancellationTokenSource contains an Arc<CancellationToken> (which can be obtained using the token() method, or using deref coercions), and additionally provides the cancel() operation to mark the token as canceled.

extern crate cancellation;
use cancellation::{CancellationToken, CancellationTokenSource, OperationCanceled};
use std::{time, thread};

fn cancelable_sum(values: &[i32], ct: &CancellationToken) -> Result<i32, OperationCanceled> {
    let mut sum = 0;
    for val in values {
        // Return Err(OperationCanceled) as soon as cancellation has been requested.
        ct.result()?;
        sum += val;
        thread::sleep(time::Duration::from_secs(1));
    }
    Ok(sum)
}

fn main() {
    let cts = CancellationTokenSource::new();
    cts.cancel_after(time::Duration::from_millis(1500));
    assert_eq!(Err(OperationCanceled), cancelable_sum(&[1,2,3], &cts));
}
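
The call above passes &cts where a &CancellationToken is expected, which relies on the deref coercions mentioned earlier. As a rough illustration of how such a source/token split can be arranged, here is a std-only sketch. `Source`, `Token`, and `describe` are hypothetical names, and the crate's actual implementation may differ.

```rust
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical token: a read-only view of the cancellation state.
struct Token {
    canceled: AtomicBool,
}

impl Token {
    fn is_canceled(&self) -> bool {
        self.canceled.load(Ordering::SeqCst)
    }
}

/// Hypothetical source: owns the shared token and is the only way to cancel it.
struct Source {
    token: Arc<Token>,
}

impl Source {
    fn new() -> Source {
        Source {
            token: Arc::new(Token { canceled: AtomicBool::new(false) }),
        }
    }
    fn token(&self) -> &Arc<Token> {
        &self.token
    }
    fn cancel(&self) {
        self.token.canceled.store(true, Ordering::SeqCst);
    }
}

impl Deref for Source {
    type Target = Arc<Token>;
    fn deref(&self) -> &Arc<Token> {
        &self.token
    }
}

/// Accepts only a token, so callees can observe cancellation but not trigger it.
fn describe(ct: &Token) -> &'static str {
    if ct.is_canceled() { "canceled" } else { "running" }
}

fn main() {
    let source = Source::new();
    // &Source coerces to &Arc<Token> and then to &Token.
    println!("{}", describe(&source));
    source.cancel();
    println!("{}", describe(&source));
    // An owned handle for code that needs to keep the token around.
    let shared: Arc<Token> = source.token().clone();
    println!("{}", describe(&shared));
}
```

Keeping cancel() on the source only means that code receiving a token can react to cancellation without being able to cancel its siblings.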

Using the CancellationToken::run() method, an action can be executed when the token is canceled.

extern crate cancellation;
use cancellation::{CancellationToken, CancellationTokenSource, OperationCanceled};
use std::thread;
use std::time::Duration;

fn cancelable_sleep(dur: Duration, ct: &CancellationToken) -> Result<(), OperationCanceled> {
    let th = thread::current();
    ct.run(
        || { // the on_cancel closure runs on the canceling thread when the token is canceled
            th.unpark();
        },
        || { // this code block runs on the current thread and contains the cancelable operation
            thread::park_timeout(dur) // (TODO: handle spurious wakeups)
        }
    );
    if ct.is_canceled() {
        // The run() call above has a race condition: the on_cancel callback might call unpark()
        // after park_timeout gave up after waiting dur, but before the end of the run() call
        // deregistered the on_cancel callback.
        // We use a park_timeout() call with a 0s timeout to consume the left-over parking token, if any.
        thread::park_timeout(Duration::from_secs(0));
        Err(OperationCanceled)
    } else {
        Ok(())
    }
}

fn main() {
    let cts = CancellationTokenSource::new();
    cts.cancel_after(Duration::from_millis(250));
    assert_eq!(Err(OperationCanceled), cancelable_sleep(Duration::from_secs(10), &cts));
}
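
The TODO in cancelable_sleep points out that park_timeout may return before the timeout for no reason. One common way to deal with that is to loop against a fixed deadline and re-check the wake-up condition on every pass. The sketch below shows that loop using only the standard library; the AtomicBool is a hypothetical stand-in for ct.is_canceled(), and `park_until_done` is an illustrative name rather than part of the crate.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Parks the current thread until `dur` has elapsed or `canceled` becomes true.
/// Returns true if the wait ended because of cancellation.
fn park_until_done(dur: Duration, canceled: &AtomicBool) -> bool {
    let deadline = Instant::now() + dur;
    loop {
        if canceled.load(Ordering::SeqCst) {
            return true;
        }
        let now = Instant::now();
        if now >= deadline {
            return false;
        }
        // This may return early; the loop then re-checks both conditions
        // and parks again for the remaining time.
        thread::park_timeout(deadline - now);
    }
}

fn main() {
    let canceled = Arc::new(AtomicBool::new(false));
    let waiter = thread::current();
    let flag = canceled.clone();
    let canceler = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        // Set the flag first, then wake the waiter so it observes the change.
        flag.store(true, Ordering::SeqCst);
        waiter.unpark();
    });
    let was_canceled = park_until_done(Duration::from_secs(10), &canceled);
    canceler.join().unwrap();
    println!("canceled: {}", was_canceled);
}
```

Because the flag is stored before unpark() is called, a wake-up that arrives between the check and the park is not lost: the pending unpark makes the next park_timeout return immediately.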

Structs