asynchronix/executor/task/
cancel_token.rs

1extern crate alloc;
2
3use std::alloc::{dealloc, Layout};
4use std::future::Future;
5use std::mem::ManuallyDrop;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7
8use crate::loom_exports::sync::atomic::{self, Ordering};
9
10use super::runnable::Runnable;
11use super::util::{runnable_exists, RunOnDrop};
12use super::Task;
13use super::{CLOSED, POLLING, REF_INC, REF_MASK};
14
15/// Virtual table for a `CancelToken`.
16#[derive(Debug)]
17struct VTable {
18    cancel: unsafe fn(*const ()),
19    drop: unsafe fn(*const ()),
20}
21
22/// Cancels a pending task.
23///
24/// If the task is completed, nothing is done. If the task is not completed
25/// but not currently scheduled (no `Runnable` exist) then the future is
26/// dropped immediately. Otherwise, the future will be dropped at a later
27/// time by the scheduled `Runnable` once it runs.
28unsafe fn cancel<F, S, T>(ptr: *const ())
29where
30    F: Future + Send + 'static,
31    F::Output: Send + 'static,
32    S: Fn(Runnable, T) + Send + Sync + 'static,
33    T: Clone + Send + Sync + 'static,
34{
35    let this = &*(ptr as *const Task<F, S, T>);
36
37    // Enter the `Closed` or `Wind-down` phase if the tasks is not
38    // completed.
39    //
40    // Ordering: Acquire ordering is necessary to synchronize with any
41    // operation that modified or dropped the future or output. This ensures
42    // that the future or output can be safely dropped or that the task can
43    // be safely deallocated if necessary. The Release ordering synchronizes
44    // with any of the Acquire atomic fences and ensure that this atomic
45    // access is fully completed upon deallocation.
46    let state = this
47        .state
48        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |s| {
49            if s & POLLING == 0 {
50                // The task has completed or is closed so there is no need
51                // to drop the future or output and the reference count can
52                // be decremented right away.
53                Some(s - REF_INC)
54            } else if runnable_exists(s) {
55                // A `Runnable` exists so the future cannot be dropped (this
56                // will be done by the `Runnable`) and the reference count
57                // can be decremented right away.
58                Some((s | CLOSED) - REF_INC)
59            } else {
60                // The future or the output needs to be dropped so the
61                // reference count cannot be decremented just yet, otherwise
62                // another reference could deallocate the task before the
63                // drop is complete.
64                Some((s | CLOSED) & !POLLING)
65            }
66        })
67        .unwrap();
68
69    if runnable_exists(state) {
70        // The task is in the `Wind-down` phase so the cancellation is now
71        // the responsibility of the current `Runnable`.
72        return;
73    }
74
75    if state & POLLING == 0 {
76        // Deallocate the task if this was the last reference.
77        if state & REF_MASK == REF_INC {
78            // Ensure that all atomic accesses to the state are visible.
79
80            // FIXME: the fence does not seem necessary since the fetch_update
81            // uses AcqRel.
82            //
83            // Ordering: this Acquire fence synchronizes with all Release
84            // operations that decrement the number of references to the task.
85            atomic::fence(Ordering::Acquire);
86
87            // Set a drop guard to ensure that the task is deallocated,
88            // whether or not the output panics when dropped.
89            let _drop_guard = RunOnDrop::new(|| {
90                dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>());
91            });
92
93            // Drop the output if any.
94            if state & CLOSED == 0 {
95                this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).output));
96            }
97        }
98
99        return;
100    }
101
102    // Set a drop guard to ensure that reference count is decremented and
103    // the task is deallocated if this is the last reference, whether or not
104    // the future panics when dropped.
105    let _drop_guard = RunOnDrop::new(|| {
106        // Ordering: Release ordering is necessary to ensure that the drop
107        // of the future or output is visible when the last reference
108        // deallocates the task.
109        let state = this.state.fetch_sub(REF_INC, Ordering::Release);
110        if state & REF_MASK == REF_INC {
111            // Ensure that all atomic accesses to the state are visible.
112            //
113            // Ordering: this Acquire fence synchronizes with all Release
114            // operations that decrement the number of references to the
115            // task.
116            atomic::fence(Ordering::Acquire);
117
118            dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>());
119        }
120    });
121
122    this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).future));
123}
124
125/// Drops the token without cancelling the task.
126unsafe fn drop<F, S, T>(ptr: *const ())
127where
128    F: Future + Send + 'static,
129    F::Output: Send + 'static,
130    S: Fn(Runnable, T) + Send + Sync + 'static,
131    T: Clone + Send + Sync + 'static,
132{
133    let this = &*(ptr as *const Task<F, S, T>);
134
135    // Decrement the reference count.
136    //
137    // Ordering: the Release ordering synchronizes with any of the Acquire
138    // atomic fences and ensure that this atomic access is fully completed
139    // upon deallocation.
140    let state = this.state.fetch_sub(REF_INC, Ordering::Release);
141
142    // Deallocate the task if this token was the last reference to the task.
143    if state & REF_MASK == REF_INC && !runnable_exists(state) {
144        // Ensure that the newest state of the future or output is visible
145        // before it is dropped.
146        //
147        // Ordering: this Acquire fence synchronizes with all Release
148        // operations that decrement the number of references to the task.
149        atomic::fence(Ordering::Acquire);
150
151        // Set a drop guard to ensure that the task is deallocated whether
152        // or not the future or output panics when dropped.
153        let _drop_guard = RunOnDrop::new(|| {
154            dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>());
155        });
156
157        if state & POLLING == POLLING {
158            this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).future));
159        } else if state & CLOSED == 0 {
160            this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).output));
161        }
162        // Else the `CLOSED` flag is set but the `POLLING` flag is cleared
163        // so the future was already dropped.
164    }
165}
166
167/// A token that can be used to cancel a task.
168#[derive(Debug)]
169pub(crate) struct CancelToken {
170    task: *const (),
171    vtable: &'static VTable,
172}
173
174impl CancelToken {
175    /// Creates a `CancelToken`.
176    ///
177    /// Safety: this is safe provided that:
178    ///
179    /// - the task pointer points to a live task allocated with the global
180    ///   allocator,
181    /// - the reference count has been incremented to account for this new task
182    ///   reference.
183    pub(super) unsafe fn new_unchecked<F, S, T>(task: *const Task<F, S, T>) -> Self
184    where
185        F: Future + Send + 'static,
186        F::Output: Send + 'static,
187        S: Fn(Runnable, T) + Send + Sync + 'static,
188        T: Clone + Send + Sync + 'static,
189    {
190        Self {
191            task: task as *const (),
192            vtable: &VTable {
193                cancel: cancel::<F, S, T>,
194                drop: drop::<F, S, T>,
195            },
196        }
197    }
198
199    /// Cancels the task.
200    ///
201    /// If the task is completed, nothing is done. If the task is not completed
202    /// but not currently scheduled (no `Runnable` exist) then the future is
203    /// dropped immediately. Otherwise, the future will be dropped at a later
204    /// time by the scheduled `Runnable` once it runs.
205    pub(crate) fn cancel(self) {
206        // Prevent the drop handler from being called, as it would call
207        // `drop_token` on the inner field.
208        let this = ManuallyDrop::new(self);
209
210        unsafe { (this.vtable.cancel)(this.task) }
211    }
212}
213
214impl Drop for CancelToken {
215    fn drop(&mut self) {
216        unsafe { (self.vtable.drop)(self.task) }
217    }
218}
219
220unsafe impl Send for CancelToken {}
221impl UnwindSafe for CancelToken {}
222impl RefUnwindSafe for CancelToken {}