use std::{fmt, ops, mem, ptr, io, error, time, thread};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::sync::{Arc, Mutex};
/// Owning handle for a cancelable [`CancellationToken`].
///
/// The source creates the token in the "not canceled" state and is the only
/// public way to cancel it (`CancellationToken::cancel` itself is private).
#[repr(C)]
pub struct CancellationTokenSource {
    /// The token controlled by this source; shared via `Arc` so that
    /// `cancel_after` can hand a clone to a background thread.
    token: Arc<CancellationToken>,
}
// Values stored in `CancellationToken::status`.
//
// The numeric order is significant: `is_canceled` tests
// `status >= STATUS_CANCELING`, and the zero value must stay equal to
// `ATOMIC_USIZE_INIT`, which initializes the `NO_CANCELLATION` static.

/// Token without a registration list; it never becomes canceled.
const STATUS_CANNOT_BE_CANCELED: usize = 0;
/// Token that can be canceled but has not been yet.
const STATUS_NOT_CANCELED: usize = 1;
/// `cancel` is currently running the registered callbacks.
const STATUS_CANCELING: usize = 2;
/// `cancel` has finished running all callbacks.
const STATUS_CANCELED: usize = 3;
/// Observable cancellation state plus the list of callbacks to run on cancel.
pub struct CancellationToken {
    /// One of the `STATUS_*` constants.
    status: AtomicUsize,
    /// Head pointer of the intrusive list of active registrations, guarded by
    /// a mutex. `None` for the token returned by `CancellationToken::none()`.
    registrations: Option<Mutex<*mut Registration<'static>>>,
}
// The raw `*mut Registration` inside the mutex suppresses the automatic
// `Send`/`Sync` impls, so they are asserted by hand.
// NOTE(review): soundness appears to rest on (a) every access to the list
// going through the `registrations` mutex and (b) `run` requiring
// `on_cancel: Send`, since callbacks execute on whichever thread calls
// `cancel` — confirm before relaxing either property.
unsafe impl Send for CancellationToken {}
unsafe impl Sync for CancellationToken {}
/// Shared token handed out by `CancellationToken::none()`.
///
/// `ATOMIC_USIZE_INIT` is zero, i.e. `STATUS_CANNOT_BE_CANCELED`, and there is
/// no registration list, so `run` never registers a callback on this token.
static NO_CANCELLATION: CancellationToken = CancellationToken {
    status: ATOMIC_USIZE_INIT,
    registrations: None,
};
/// Error value reported by `CancellationToken::result` once the token has
/// been canceled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OperationCanceled;
/// Object-safe stand-in for `FnOnce()`.
///
/// An `FnOnce` cannot be invoked through `&mut`, so the closure is kept in an
/// `Option` and moved out on the first call.
trait FnOnceOption {
    /// Invokes the stored closure if it is still present.
    ///
    /// Returns `Some(())` when the closure ran and `None` when it had already
    /// been consumed.
    fn call_once(&mut self) -> Option<()>;
}

impl<C: FnOnce()> FnOnceOption for Option<C> {
    fn call_once(&mut self) -> Option<()> {
        // `take` leaves `None` behind, which makes any later call a no-op.
        match self.take() {
            Some(callback) => {
                callback();
                Some(())
            }
            None => None,
        }
    }
}
/// One node of the intrusive list of pending `on_cancel` callbacks.
///
/// Nodes are created by `CancellationToken::run` and linked into the list by
/// raw pointer, which is why the link fields use an erased `'static` lifetime.
struct Registration<'a> {
    /// Callback to invoke when the token is canceled.
    on_cancel: &'a mut (FnOnceOption + Send + 'a),
    /// Token this node is registered with; `Drop` uses it to reach the lock.
    cancellation_token: &'a CancellationToken,
    /// Following node in the list, or null.
    next: *mut Registration<'static>,
    /// Address of the pointer that currently points at this node; null once
    /// the node has been unlinked.
    link_to_this: *mut *mut Registration<'static>,
}
/// Converts a borrowed registration into a raw pointer whose type claims a
/// `'static` lifetime, so it can be stored in the list links.
///
/// # Safety
///
/// The returned pointer is only valid while the registration behind `r` is
/// alive and has not moved; the caller must not dereference it afterwards.
unsafe fn erase_lifetime(r: &mut Registration) -> *mut Registration<'static> {
    let erased: *mut Registration<'static> = mem::transmute(r);
    erased
}
/// Removes `r` from the registration list and clears both of its links.
///
/// # Safety
///
/// `r.link_to_this` must be non-null and point at the pointer that currently
/// references `r`, and `r.next` must be null or point at a live registration.
/// NOTE(review): every caller in this file holds the token's `registrations`
/// mutex while calling this — presumably a requirement; confirm.
///
/// # Panics
///
/// Panics if the pointer behind `r.link_to_this` does not reference `r`.
unsafe fn unlink(r: &mut Registration) {
    let incoming = r.link_to_this;
    let successor = r.next;
    assert!(*incoming == erase_lifetime(r));
    // Whatever pointed at `r` now skips over it ...
    *incoming = successor;
    if !successor.is_null() {
        // ... and the successor records that same pointer as its back-link.
        (*successor).link_to_this = incoming;
    }
    // A null `link_to_this` marks the node as detached (checked in `Drop`).
    r.link_to_this = ptr::null_mut();
    r.next = ptr::null_mut();
}
impl CancellationTokenSource {
pub fn new() -> CancellationTokenSource {
CancellationTokenSource {
token: Arc::new(CancellationToken {
status: AtomicUsize::new(STATUS_NOT_CANCELED),
registrations: Some(Mutex::new(ptr::null_mut()))
})
}
}
#[inline]
pub fn token(&self) -> &Arc<CancellationToken> {
&self.token
}
pub fn cancel(&self) {
self.token.cancel()
}
pub fn cancel_after(&self, dur: time::Duration) {
let token = self.token.clone();
thread::spawn(move || {
thread::sleep(dur);
token.cancel()
});
}
}
impl CancellationToken {
    /// Returns a shared token that can never be canceled.
    #[inline]
    pub fn none() -> &'static CancellationToken {
        &NO_CANCELLATION
    }

    /// Human-readable name of the current status, used by the `Debug` impls.
    fn status_string(&self) -> &'static str {
        match self.status.load(Ordering::Acquire) {
            STATUS_CANNOT_BE_CANCELED => "cannot be canceled",
            STATUS_NOT_CANCELED => "not canceled",
            STATUS_CANCELING => "canceling",
            STATUS_CANCELED => "canceled",
            _ => "invalid"
        }
    }

    /// Returns `true` once cancellation has started (status "canceling" or
    /// "canceled").
    #[inline]
    pub fn is_canceled(&self) -> bool {
        self.status.load(Ordering::Acquire) >= STATUS_CANCELING
    }

    /// Returns `Err(OperationCanceled)` if `is_canceled()` is true and
    /// `Ok(())` otherwise.
    #[inline]
    pub fn result(&self) -> Result<(), OperationCanceled> {
        if self.is_canceled() {
            Err(OperationCanceled)
        } else {
            Ok(())
        }
    }

    /// Marks the token as canceled and runs every registered callback.
    ///
    /// Callbacks run on the calling thread while the registration lock is
    /// held. A call made while cancellation is already in progress or done
    /// returns immediately without waiting.
    ///
    /// # Panics
    ///
    /// Panics if the token has no registration list (the `none()` token), if
    /// the mutex is poisoned, or if the status is not one of the expected
    /// values.
    fn cancel(&self) {
        if self.is_canceled() {
            // Returning before touching the mutex keeps a `cancel()` issued
            // from inside an `on_cancel` callback (which runs with the lock
            // held, below) from deadlocking on the non-reentrant mutex.
            return;
        }
        let mut registrations = self.registrations.as_ref().unwrap().lock().unwrap();
        let status = self.status.load(Ordering::Relaxed);
        if status == STATUS_CANCELED {
            // Another thread completed the cancellation while we waited.
            return;
        }
        assert!(status == STATUS_NOT_CANCELED);
        self.status.store(STATUS_CANCELING, Ordering::Release);
        while !registrations.is_null() {
            // SAFETY (as far as this file shows): the head pointer is only
            // modified with the lock held, and a registration removes itself
            // under the same lock before it is destroyed, so a non-null head
            // refers to a live node.
            unsafe {
                let registration = &mut **registrations;
                // Detach first: `Drop` then sees a null `link_to_this` and
                // does not try to unlink the node a second time.
                unlink(registration);
                registration.on_cancel.call_once();
            }
        }
        self.status.store(STATUS_CANCELED, Ordering::Release);
    }

    /// Runs `f` and returns its result, with `on_cancel` registered for the
    /// duration of the call.
    ///
    /// * If the token is canceled while `f` is running, `on_cancel` is invoked
    ///   on the thread that cancels (hence the `Send` bound).
    /// * If the token is already canceled, `on_cancel` is invoked immediately,
    ///   before `f`.
    /// * If the token cannot be canceled, `on_cancel` is never invoked.
    ///
    /// `f` is always executed, and `on_cancel` runs at most once. The
    /// registration is removed (under the lock) when this function returns.
    ///
    /// # Panics
    ///
    /// Panics if the registration mutex is poisoned or the status is invalid.
    pub fn run<C, F, R>(&self, on_cancel: C, f: F) -> R
        where C: FnOnce() + Send,
              F: FnOnce() -> R
    {
        impl <'a> Drop for Registration<'a> {
            /// Removes the registration from its token's list unless `cancel`
            /// has already detached it.
            fn drop(&mut self) {
                // Taking the lock also waits for a concurrently running
                // `cancel` to finish with this node.
                let _mutex_guard = self.cancellation_token.registrations.as_ref().unwrap().lock().unwrap();
                if !self.link_to_this.is_null() {
                    unsafe { unlink(self); }
                }
            }
        }

        /// Registers `on_cancel` with `token`, or calls it right away when the
        /// token is already canceled. `registration` must stay at a fixed
        /// address for as long as it is linked into the list.
        fn init_registration<'a>(
            token: &'a CancellationToken,
            on_cancel: &'a mut (FnOnceOption + Send + 'a),
            registration: &mut Option<Registration<'a>>)
        {
            match token.status.load(Ordering::Acquire) {
                STATUS_CANNOT_BE_CANCELED => { }
                STATUS_NOT_CANCELED => {
                    let mut mutex_guard = token.registrations.as_ref().unwrap().lock().unwrap();
                    // Re-check under the lock: the status only changes while
                    // the lock is held.
                    match token.status.load(Ordering::Relaxed) {
                        STATUS_NOT_CANCELED => {
                            let first_registration: &mut *mut Registration = &mut *mutex_guard;
                            *registration = Some(Registration {
                                on_cancel: on_cancel,
                                cancellation_token: token,
                                next: *first_registration,
                                link_to_this: first_registration
                            });
                            unsafe {
                                let new_head = erase_lifetime(registration.as_mut().unwrap());
                                let old_head = (*new_head).next;
                                if !old_head.is_null() {
                                    // The previous head is now reached through
                                    // the new node's `next` field, so its
                                    // back-link must point there. Without this
                                    // update, unlinking a non-head node trips
                                    // the assertion in `unlink`.
                                    let back_link: *mut *mut Registration<'static> = &mut (*new_head).next;
                                    (*old_head).link_to_this = back_link;
                                }
                                *first_registration = new_head;
                            }
                        },
                        STATUS_CANCELED => {
                            on_cancel.call_once();
                        },
                        _ => {
                            panic!("invalid status")
                        }
                    }
                }, STATUS_CANCELING | STATUS_CANCELED => {
                    on_cancel.call_once();
                },
                _ => {
                    panic!("invalid status")
                }
            }
        }

        // Declaration order matters: locals drop in reverse order, so the
        // registration is unlinked before the closure it borrows is dropped.
        let mut on_cancel = Some(on_cancel);
        let mut registration: Option<Registration> = None;
        init_registration(self, &mut on_cancel, &mut registration);
        f()
    }
}
/// Debug output shows only the token's status string.
impl fmt::Debug for CancellationTokenSource {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // `status_string` is reached through `Deref<Target = CancellationToken>`.
        let status = self.status_string();
        let mut builder = f.debug_struct("CancellationTokenSource");
        builder.field("status", &status);
        builder.finish()
    }
}
/// Debug output shows only the status string; the registration list is not
/// printed.
impl fmt::Debug for CancellationToken {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let status = self.status_string();
        let mut builder = f.debug_struct("CancellationToken");
        builder.field("status", &status);
        builder.finish()
    }
}
/// Lets a source be used wherever a `&CancellationToken` is expected, e.g.
/// `source.is_canceled()` or `source.run(..)`.
impl ops::Deref for CancellationTokenSource {
    type Target = CancellationToken;

    #[inline]
    fn deref(&self) -> &CancellationToken {
        // Explicitly look through the `Arc` to the token it owns.
        &*self.token
    }
}
impl fmt::Display for OperationCanceled {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.write_str(error::Error::description(self))
}
}
impl error::Error for OperationCanceled {
    /// Fixed description of the error; `Display` prints this same text.
    fn description(&self) -> &'static str {
        const MESSAGE: &'static str = "The operation was canceled.";
        MESSAGE
    }
}
/// Wraps a cancellation in an `io::Error` of kind `TimedOut`, keeping the
/// `OperationCanceled` value as the inner error.
impl From<OperationCanceled> for io::Error {
    fn from(oc: OperationCanceled) -> Self {
        let kind = io::ErrorKind::TimedOut;
        io::Error::new(kind, oc)
    }
}
#[cfg(test)]
mod test {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use super::*;

    /// The `none()` token never reports cancellation.
    #[test]
    fn none_not_canceled() {
        assert!(!CancellationToken::none().is_canceled());
    }

    /// `run` on the `none()` token executes `f` and never calls `on_cancel`.
    #[test]
    fn none_run() {
        let callback_ran = AtomicBool::new(false);
        let token = CancellationToken::none();
        let answer = token.run(
            || callback_ran.store(true, Ordering::Relaxed),
            || 42,
        );
        assert_eq!(42, answer);
        assert!(!callback_ran.load(Ordering::Relaxed));
    }

    /// Canceling flips both `is_canceled` and `result`.
    #[test]
    fn cancel() {
        let source = CancellationTokenSource::new();
        assert!(!source.is_canceled());
        assert_eq!(Ok(()), source.result());
        source.cancel();
        assert!(source.is_canceled());
        assert_eq!(Err(OperationCanceled), source.result());
    }

    /// Asserts that `state` currently holds `expected_state`, then advances it
    /// by one. The tests use this to pin the exact order of callbacks.
    fn expect(state: &AtomicUsize, expected_state: usize) {
        assert_eq!(state.load(Ordering::Acquire), expected_state);
        state.store(expected_state + 1, Ordering::Release);
    }

    /// On an already-canceled token, `on_cancel` runs first and `f` second.
    #[test]
    fn run_already_canceled() {
        let source = CancellationTokenSource::new();
        source.cancel();
        let step = AtomicUsize::new(0);
        expect(&step, 0);
        source.run(
            || {
                assert!(source.is_canceled());
                expect(&step, 1);
            },
            || expect(&step, 2),
        );
        expect(&step, 3);
    }

    /// `run` may be nested inside both closures of an already-canceled token.
    #[test]
    fn recursive_run_already_canceled() {
        let source = CancellationTokenSource::new();
        source.cancel();
        let step = AtomicUsize::new(0);
        source.run(
            || source.run(
                || expect(&step, 0),
                || expect(&step, 1),
            ),
            || source.run(
                || expect(&step, 2),
                || expect(&step, 3),
            ),
        );
        expect(&step, 4);
    }

    /// Canceling inside nested `run`s calls the innermost callback first.
    #[test]
    fn cancel_in_recursive_run() {
        let source = CancellationTokenSource::new();
        let step = AtomicUsize::new(0);
        source.run(
            || expect(&step, 3),
            || {
                expect(&step, 0);
                source.run(
                    || expect(&step, 2),
                    || {
                        expect(&step, 1);
                        source.cancel();
                        expect(&step, 4);
                    },
                );
                expect(&step, 5);
            },
        );
        expect(&step, 6);
    }

    /// A callback whose `run` has already returned must not be invoked by a
    /// later `cancel`.
    #[test]
    fn on_cancel_is_not_called_after_end_of_run() {
        let source = CancellationTokenSource::new();
        let step = AtomicUsize::new(0);
        source.run(
            || expect(&step, 2),
            || {
                source.run(
                    || panic!("bad!"),
                    || expect(&step, 0),
                );
                expect(&step, 1);
                source.cancel();
                expect(&step, 3);
            },
        );
    }
}