1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
// Copyright (c) 2016 Daniel Grunwald
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of this
// software and associated documentation files (the "Software"), to deal in the Software
// without restriction, including without limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
// to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or
// substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
// INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
// FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

/*!
Rust-Cancellation is a small crate that provides the `CancellationToken` type
that can be used to signal cancellation to other code in a composable manner.

Operations that support cancellation usually accept a `ct: &CancellationToken` parameter.
They can either cooperatively check `ct.is_canceled()`, or use `ct.run()` to get notified
via callback when cancellation is requested.

Operations that finish asynchronously may instead accept `ct: Arc<CancellationToken>`.

To create a `CancellationToken`, use the type `CancellationTokenSource`.
A `CancellationTokenSource` contains a `Arc<CancellationToken>`
(which can be obtained using the `token()` method, or using deref coercions),
and additionally provides the `cancel()` operation to mark the token as canceled.

```rust
extern crate cancellation;
use cancellation::{CancellationToken, CancellationTokenSource, OperationCanceled};
use std::{time, thread};

fn cancelable_sum(values: &[i32], ct: &CancellationToken) -> Result<i32, OperationCanceled> {
    let mut sum = 0;
    for val in values {
        try!(ct.result());
        sum = sum + val;
        thread::sleep(time::Duration::from_secs(1));
    }
    Ok(sum)
}

fn main() {
    let cts = CancellationTokenSource::new();
    cts.cancel_after(time::Duration::from_millis(1500));
    assert_eq!(Err(OperationCanceled), cancelable_sum(&[1,2,3], &cts));
}
```

Using the `CancellationToken::run()` method, an action can be executed when the token is canceled.


```rust
extern crate cancellation;
use cancellation::{CancellationToken, CancellationTokenSource, OperationCanceled};
use std::{time, thread};
use std::time::Duration;

fn cancelable_sleep(dur: Duration, ct: &CancellationToken) -> Result<(), OperationCanceled> {
    let th = thread::current();
    ct.run(
        || { // the on_cancel closure runs on the canceling thread when the token is canceled
            th.unpark();
        },
        || { // this code block runs on the current thread and contains the cancelable operation
            thread::park_timeout(dur) // (TODO: handle spurious wakeups)
        }
    );
    if ct.is_canceled() {
        // The run() call above has a race condition: the on_cancel callback might call unpark()
        // after park_timeout gave up after waiting dur, but before the end of the run() call
        // deregistered the on_cancel callback.
        // We use a park() call with 0s timeout to consume the left-over parking token, if any.
        thread::park_timeout(Duration::from_secs(0));
        Err(OperationCanceled)
    } else {
        Ok(())
    }
}

fn main() {
    let cts = CancellationTokenSource::new();
    cts.cancel_after(Duration::from_millis(250));
    assert_eq!(Err(OperationCanceled), cancelable_sleep(Duration::from_secs(10), &cts));
}
```

**/

use std::{fmt, ops, mem, ptr, io, error, time, thread};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::sync::{Arc, Mutex};

#[repr(C)]
pub struct CancellationTokenSource {
    token: Arc<CancellationToken>
}

const STATUS_CANNOT_BE_CANCELED : usize = 0;
const STATUS_NOT_CANCELED : usize = 1;
const STATUS_CANCELING : usize = 2;
const STATUS_CANCELED : usize = 3;

/// A CancellationToken is used to query whether an operation should be canceled.
///
/// To cancel a token, use `CancellationTokenSource`.
///
/// Use `CancellationTokenSource` to obtain instances of `CancellationToken`.
pub struct CancellationToken {
    status: AtomicUsize,
    // Use mutex so we don't have to worry about thread-safety while managing the registrations.
    // The mutex also ensures that `CancellationToken::run()` can't return while the on_cancel callback is still running.
    // The option around the mutex allows us to construct the NO_CANCELLATION token.
    
    // The `*mut Registration` points to the first active registration.
    // Registrations are connected in a double-linked-list in order to
    // support O(1) removal.
    // The back-link (`Registration::link_to_this`) is of type `*mut *mut Registration`
    // and may refer to the contents of this mutex (for the first node) or the `Registration::next` of the previous node.
    registrations: Option<Mutex<*mut Registration<'static>>>
}

// AtomicUsize and Mutex are both Sync;
// we need this unsafe impl only because *mut isn't Send.
unsafe impl Sync for CancellationToken {}
unsafe impl Send for CancellationToken {}

static NO_CANCELLATION: CancellationToken = CancellationToken {
    status: ATOMIC_USIZE_INIT, //AtomicUsize::new(STATUS_CANNOT_BE_CANCELED),
    registrations: None
};

/// Unit struct used to indicate that an operation was canceled.
///
/// Usually used as `Result<T, OperationCanceled>`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct OperationCanceled;

// Helper trait for Option<C> where C:FnOnce()
trait FnOnceOption {
    // equivalent to self.take().map(|c| c())
    fn call_once(&mut self) -> Option<()>;
}

impl<C> FnOnceOption for Option<C> where C: FnOnce() {
    fn call_once(&mut self) -> Option<()> {
        self.take().map(|c| c())
    }
}

/// Registrations are the entries in the linked list of on_cancel callbacks.
/// They are unsafely shared across threads.
/// Access is synchronized using the cancellation token's mutex.
struct Registration<'a> {
    on_cancel: &'a mut (FnOnceOption + Send + 'a),
    cancellation_token: &'a CancellationToken,
    // Next registration in the linked list.
    next: *mut Registration<'static>,
    // Link to the previous node's next field.
    // For the first node, this points to the contents of the CancellationToken::registration mutex.
    // The address of the pointed-to-field is stable:
    // Registrations are never moved (they only exist in `CancellationToken::run()`'s stack frame);
    // and we know the CancellationToken cannot move because it's being borrowed by this registration.
    link_to_this: *mut *mut Registration<'static>
}

unsafe fn erase_lifetime(r: &mut Registration) -> *mut Registration<'static> {
    mem::transmute(r)
}

/// Remove registration from the linked list of registrations.
/// May only be called while the registration mutex is acquired.
/// Assumes that r.link_to_this is not null.
unsafe fn unlink(r: &mut Registration) {
    assert!(*r.link_to_this == erase_lifetime(r));
    // let previous node point to the next:
    *r.link_to_this = r.next;
    if !r.next.is_null() {
        // update link_to_this of the next node
        (*r.next).link_to_this = r.link_to_this
    }
    // null out the links of the removed node
    r.link_to_this = ptr::null_mut();
    r.next = ptr::null_mut();
}

impl CancellationTokenSource {
    pub fn new() -> CancellationTokenSource {
        CancellationTokenSource {
            token: Arc::new(CancellationToken {
                status: AtomicUsize::new(STATUS_NOT_CANCELED),
                registrations: Some(Mutex::new(ptr::null_mut()))
            })
        }
    }

    /// Gets the token managed by this CancellationTokenSource.
    ///
    /// The `Arc` can be cloned so that the token can be passed into
    /// asynchronous tasks that might outlive the CancellationTokenSource.
    #[inline]
    pub fn token(&self) -> &Arc<CancellationToken> {
        &self.token
    }

    /// Marks the cancellation token as canceled.
    ///
    /// Executes the `on_cancel` callback of any active `CancellationToken::run` invocations.
    /// Has no effect if the cancellation token was already canceled.
    pub fn cancel(&self) {
        self.token.cancel()
    }

    /// Creates a new, detached thread that waits for the specified duration
    /// and then marks the cancellation token as canceled.
    pub fn cancel_after(&self, dur: time::Duration) {
        let token = self.token.clone();
        thread::spawn(move || {
            thread::sleep(dur);
            token.cancel()
        });
    }
}

impl CancellationToken {
    /// Returns a reference to a cancellation token that is never canceled.
    #[inline]
    pub fn none() -> &'static CancellationToken {
        &NO_CANCELLATION
    }

    fn status_string(&self) -> &'static str {
        match self.status.load(Ordering::Acquire) {
            STATUS_CANNOT_BE_CANCELED => "cannot be canceled",
            STATUS_NOT_CANCELED => "not canceled",
            STATUS_CANCELING => "canceling",
            STATUS_CANCELED => "canceled",
            _ => "invalid"
        }
    }

    /// Gets whether this token has been canceled.
    ///
    /// This function is inherently racy: it may start returning `true`
    /// at any moment as the token is canceled by another thread.
    ///
    /// However, once the function returns `true`, it will always keep returning `true`
    /// (cancellation tokens cannot be reset).
    #[inline]
    pub fn is_canceled(&self) -> bool {
        self.status.load(Ordering::Acquire) >= STATUS_CANCELING
    }

    /// Returns `Ok(())` if this token has not been canceled.
    /// Returns `Err(OperationCanceled)` if this token has been canceled.
    ///
    /// This is an alternative to `is_canceled()` that can be
    /// used with the `try!()` macro.
    #[inline]
    pub fn result(&self) -> Result<(), OperationCanceled> {
        if self.is_canceled() {
            Err(OperationCanceled)
        } else {
            Ok(())
        }
    }

    fn cancel(&self) {
        if self.is_canceled() {
            // avoid deadlock if cancel() is called within on_cancel callback
            return;
        }
        let mut registrations = self.registrations.as_ref().unwrap().lock().unwrap();
        let status = self.status.load(Ordering::Relaxed);
        if status == STATUS_CANCELED {
            return; // already canceled
        }
        assert!(status == STATUS_NOT_CANCELED);
        self.status.store(STATUS_CANCELING, Ordering::Release);
        while !registrations.is_null() {
            unsafe {
                let registration = &mut **registrations;
                unlink(registration);
                registration.on_cancel.call_once();
            }
        }
        self.status.store(STATUS_CANCELED, Ordering::Release);
    }

    /// Runs function `f` on the current thread.
    /// If the token is canceled while `f` is running,
    /// the `on_cancel` function will be executed by the
    /// thread calling `cancel()`.
    ///
    /// If the `f` callback returns before the `on_cancel` callback does,
    /// `run()` waits for the `on_cancel` callback to finish before returning.
    /// Any memory writes performed by the `on_cancel` callback will be visible
    /// after `run()` returns.
    ///
    /// If the token is already canceled when this function
    /// is called, `on_cancel` will be executed on the
    /// current thread before `f` is called.
    ///
    /// If the token is not canceled during the execution of `f`, the
    /// `on_cancel` callback will not be called at all.
    ///
    /// Panics in the `f` callback are supported: unwinding will wait for
    /// `on_cancel` to finish (if it is running concurrently) and unregister
    /// the `on_cancel` callback from the token.
    ///
    /// Panics in the `on_cancel` callback may abort the process.
    pub fn run<C, F, R>(&self, on_cancel: C, f: F) -> R
        where C: FnOnce() + Send,
              F: FnOnce() -> R
    {
        let mut on_cancel = Some(on_cancel);
        // Create a dummy registration
        let mut registration: Option<Registration> = None;

        // Initialization part is extracted into new function so that it doesn't get
        // unnecessarily monomorphized.
        fn init_registration<'a>(
                token: &'a CancellationToken,
                on_cancel: &'a mut (FnOnceOption + Send + 'a),
                registration: &mut Option<Registration<'a>>)
        {
            // Check the status before acquiring the lock.
            // This is important to avoid deadlocks when the token is re-used within an on_cancel callback.
            match token.status.load(Ordering::Acquire) {
                STATUS_CANNOT_BE_CANCELED => { }
                STATUS_NOT_CANCELED => {
                    let mut mutex_guard = token.registrations.as_ref().unwrap().lock().unwrap();
                    // the status might have changed while we waited for the lock
                    match token.status.load(Ordering::Relaxed) {
                        STATUS_NOT_CANCELED => {
                            // Insert registration into linked list
                            let first_registration: &mut *mut Registration = &mut *mutex_guard;
                            *registration = Some(Registration {
                                on_cancel: on_cancel,
                                cancellation_token: token,
                                next: *first_registration,
                                link_to_this: first_registration
                            });
                            // Erasing the lifetime of the registration is safe,
                            // because the Drop impl of the registration will undo this assignment
                            // before on_cancel is dropped.
                            *first_registration = unsafe { erase_lifetime(registration.as_mut().unwrap()) };
                        },
                        STATUS_CANCELED => {
                            // if already canceled, run the on_cancel callback immediately
                            on_cancel.call_once();
                        },
                        _ => {
                            // STATUS_CANNOT_BE_CANCELED is handled by the outer `match`,
                            // STATUS_CANCELING should be impossible to observe within the mutex
                            panic!("invalid status")
                        }
                    }
                }, // release mutex
                STATUS_CANCELING | STATUS_CANCELED => {
                    // If already canceling/canceled, run the on_cancel callback immediately
                    // It's important to handle this case in the outer match
                    on_cancel.call_once();
                },
                _ => {
                    panic!("invalid status")
                }
            }
        }
        init_registration(self, &mut on_cancel, &mut registration);
        return f();

        // The registration will be dropped automatically here
        impl <'a> Drop for Registration<'a> {
            fn drop(&mut self) {
                let _mutex_guard = self.cancellation_token.registrations.as_ref().unwrap().lock().unwrap();
                if !self.link_to_this.is_null() {
                    unsafe { unlink(self); }
                }
            }
        }
    }
}

impl fmt::Debug for CancellationTokenSource {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        f.debug_struct("CancellationTokenSource")
            .field("status", &self.status_string())
            .finish()
    }
}

impl fmt::Debug for CancellationToken {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        f.debug_struct("CancellationToken")
            .field("status", &self.status_string())
            .finish()
    }
}

impl ops::Deref for CancellationTokenSource {
    type Target = CancellationToken;

    #[inline]
    fn deref(&self) -> &CancellationToken {
        &self.token
    }
}

impl fmt::Display for OperationCanceled {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        f.write_str(error::Error::description(self))
    }
}

impl error::Error for OperationCanceled {
    fn description(&self) -> &'static str {
        "The operation was canceled."
    }
}

impl From<OperationCanceled> for io::Error {
    fn from(oc: OperationCanceled) -> Self {
        io::Error::new(io::ErrorKind::TimedOut, oc)
    }
}

#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use super::*;

    #[test]
    fn none_not_canceled() {
        assert_eq!(false, CancellationToken::none().is_canceled());
    }

    #[test]
    fn none_run() {
        let b = AtomicBool::new(false);
        assert_eq!(42, CancellationToken::none().run(
            || b.store(true, Ordering::Relaxed),
            || 42
        ));
        assert_eq!(false, b.load(Ordering::Relaxed));
    }

    #[test]
    fn cancel() {
        let cts = CancellationTokenSource::new();
        assert_eq!(false, cts.is_canceled());
        assert_eq!(Ok(()), cts.result());
        cts.cancel();
        assert_eq!(true, cts.is_canceled());
        assert_eq!(Err(OperationCanceled), cts.result());
    }

    fn expect(state: &AtomicUsize, expected_state: usize) {
        assert_eq!(state.load(Ordering::Acquire), expected_state);
        state.store(expected_state + 1, Ordering::Release);
    }

    #[test]
    fn run_already_canceled() {
        let cts = CancellationTokenSource::new();
        cts.cancel();
        let state = AtomicUsize::new(0);
        expect(&state, 0);
        cts.run(
            || {
                assert!(cts.is_canceled());
                expect(&state, 1);
            },
            || expect(&state, 2)
        );
        expect(&state, 3);
    }

    #[test]
    fn recursive_run_already_canceled() {
        let cts = CancellationTokenSource::new();
        cts.cancel();
        let state = AtomicUsize::new(0);
        cts.run(
            || cts.run(
                || expect(&state, 0),
                || expect(&state, 1),
               ),
            || cts.run(
                || expect(&state, 2),
                || expect(&state, 3),
        ));
        expect(&state, 4);
    }

    #[test]
    fn cancel_in_recursive_run() {
        let cts = CancellationTokenSource::new();
        let state = AtomicUsize::new(0);
        cts.run(
            || expect(&state, 3),
            || {
                expect(&state, 0);
                cts.run(
                    || expect(&state, 2),
                    || {
                        expect(&state, 1);
                        cts.cancel();
                        expect(&state, 4);
                       }
                );
                expect(&state, 5);
            }
        );
        expect(&state, 6);
    }

    #[test]
    fn on_cancel_is_not_called_after_end_of_run() {
        let cts = CancellationTokenSource::new();
        let state = AtomicUsize::new(0);
        cts.run(
            || expect(&state, 2),
            || {
                cts.run(
                    || panic!("bad!"),
                    || expect(&state, 0)
                );
                expect(&state, 1);
                cts.cancel();
                expect(&state, 3);
            });
    }
}