// asynchronix/executor/task/cancel_token.rs
1extern crate alloc;
2
3use std::alloc::{dealloc, Layout};
4use std::future::Future;
5use std::mem::ManuallyDrop;
6use std::panic::{RefUnwindSafe, UnwindSafe};
7
8use crate::loom_exports::sync::atomic::{self, Ordering};
9
10use super::runnable::Runnable;
11use super::util::{runnable_exists, RunOnDrop};
12use super::Task;
13use super::{CLOSED, POLLING, REF_INC, REF_MASK};
14
/// Virtual table for a `CancelToken`.
///
/// Dispatches the token's operations to the function instantiations matching
/// the concrete `Task<F, S, T>` behind the token's type-erased pointer.
#[derive(Debug)]
struct VTable {
    // Cancels the task, dropping the future or output as appropriate.
    cancel: unsafe fn(*const ()),
    // Releases the token's reference without cancelling the task.
    drop: unsafe fn(*const ()),
}
21
/// Cancels a pending task.
///
/// If the task is completed, nothing is done. If the task is not completed
/// but not currently scheduled (no `Runnable` exist) then the future is
/// dropped immediately. Otherwise, the future will be dropped at a later
/// time by the scheduled `Runnable` once it runs.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F, S, T>` allocated with the global
/// allocator, and the task's reference count must account for the token
/// reference consumed by this call.
unsafe fn cancel<F, S, T>(ptr: *const ())
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable, T) + Send + Sync + 'static,
    T: Clone + Send + Sync + 'static,
{
    let this = &*(ptr as *const Task<F, S, T>);

    // Enter the `Closed` or `Wind-down` phase if the task is not
    // completed.
    //
    // Ordering: Acquire ordering is necessary to synchronize with any
    // operation that modified or dropped the future or output. This ensures
    // that the future or output can be safely dropped or that the task can
    // be safely deallocated if necessary. The Release ordering synchronizes
    // with any of the Acquire atomic fences and ensure that this atomic
    // access is fully completed upon deallocation.
    let state = this
        .state
        .fetch_update(Ordering::AcqRel, Ordering::Relaxed, |s| {
            if s & POLLING == 0 {
                // The task has completed or is closed so there is no need
                // to drop the future or output and the reference count can
                // be decremented right away.
                Some(s - REF_INC)
            } else if runnable_exists(s) {
                // A `Runnable` exists so the future cannot be dropped (this
                // will be done by the `Runnable`) and the reference count
                // can be decremented right away.
                Some((s | CLOSED) - REF_INC)
            } else {
                // The future or the output needs to be dropped so the
                // reference count cannot be decremented just yet, otherwise
                // another reference could deallocate the task before the
                // drop is complete.
                Some((s | CLOSED) & !POLLING)
            }
        })
        .unwrap();

    if runnable_exists(state) {
        // The task is in the `Wind-down` phase so the cancellation is now
        // the responsibility of the current `Runnable`.
        return;
    }

    if state & POLLING == 0 {
        // Deallocate the task if this was the last reference.
        if state & REF_MASK == REF_INC {
            // Ensure that all atomic accesses to the state are visible.

            // FIXME: the fence does not seem necessary since the fetch_update
            // uses AcqRel.
            //
            // Ordering: this Acquire fence synchronizes with all Release
            // operations that decrement the number of references to the task.
            atomic::fence(Ordering::Acquire);

            // Set a drop guard to ensure that the task is deallocated,
            // whether or not the output panics when dropped.
            let _drop_guard = RunOnDrop::new(|| {
                dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>());
            });

            // Drop the output if any. `CLOSED` unset here means the task
            // completed with an output that was never retrieved.
            if state & CLOSED == 0 {
                this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).output));
            }
        }

        return;
    }

    // Set a drop guard to ensure that reference count is decremented and
    // the task is deallocated if this is the last reference, whether or not
    // the future panics when dropped.
    let _drop_guard = RunOnDrop::new(|| {
        // Ordering: Release ordering is necessary to ensure that the drop
        // of the future or output is visible when the last reference
        // deallocates the task.
        let state = this.state.fetch_sub(REF_INC, Ordering::Release);
        if state & REF_MASK == REF_INC {
            // Ensure that all atomic accesses to the state are visible.
            //
            // Ordering: this Acquire fence synchronizes with all Release
            // operations that decrement the number of references to the
            // task.
            atomic::fence(Ordering::Acquire);

            dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>());
        }
    });

    // The task was in the `Polling` phase with no live `Runnable`, so it is
    // this call's responsibility to drop the future; the guard above then
    // releases this token's reference.
    this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).future));
}
124
/// Drops the token without cancelling the task.
///
/// The token's reference is released; if it was the last reference and no
/// `Runnable` exists, the future or output still held by the task is dropped
/// and the task is deallocated.
///
/// # Safety
///
/// `ptr` must point to a live `Task<F, S, T>` allocated with the global
/// allocator, and the task's reference count must account for the token
/// reference consumed by this call.
unsafe fn drop<F, S, T>(ptr: *const ())
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
    S: Fn(Runnable, T) + Send + Sync + 'static,
    T: Clone + Send + Sync + 'static,
{
    let this = &*(ptr as *const Task<F, S, T>);

    // Decrement the reference count.
    //
    // Ordering: the Release ordering synchronizes with any of the Acquire
    // atomic fences and ensure that this atomic access is fully completed
    // upon deallocation.
    let state = this.state.fetch_sub(REF_INC, Ordering::Release);

    // Deallocate the task if this token was the last reference to the task.
    if state & REF_MASK == REF_INC && !runnable_exists(state) {
        // Ensure that the newest state of the future or output is visible
        // before it is dropped.
        //
        // Ordering: this Acquire fence synchronizes with all Release
        // operations that decrement the number of references to the task.
        atomic::fence(Ordering::Acquire);

        // Set a drop guard to ensure that the task is deallocated whether
        // or not the future or output panics when dropped.
        let _drop_guard = RunOnDrop::new(|| {
            dealloc(ptr as *mut u8, Layout::new::<Task<F, S, T>>());
        });

        if state & POLLING == POLLING {
            // Still in the `Polling` phase: the future was never dropped.
            this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).future));
        } else if state & CLOSED == 0 {
            // Completed but never closed: the output is still stored.
            this.core.with_mut(|c| ManuallyDrop::drop(&mut (*c).output));
        }
        // Else the `CLOSED` flag is set but the `POLLING` flag is cleared
        // so the future was already dropped.
    }
}
166
/// A token that can be used to cancel a task.
#[derive(Debug)]
pub(crate) struct CancelToken {
    // Type-erased pointer to the task; the task's reference count accounts
    // for this token (see `new_unchecked`).
    task: *const (),
    // Monomorphized `cancel`/`drop` entry points for the erased task type.
    vtable: &'static VTable,
}
173
174impl CancelToken {
175 /// Creates a `CancelToken`.
176 ///
177 /// Safety: this is safe provided that:
178 ///
179 /// - the task pointer points to a live task allocated with the global
180 /// allocator,
181 /// - the reference count has been incremented to account for this new task
182 /// reference.
183 pub(super) unsafe fn new_unchecked<F, S, T>(task: *const Task<F, S, T>) -> Self
184 where
185 F: Future + Send + 'static,
186 F::Output: Send + 'static,
187 S: Fn(Runnable, T) + Send + Sync + 'static,
188 T: Clone + Send + Sync + 'static,
189 {
190 Self {
191 task: task as *const (),
192 vtable: &VTable {
193 cancel: cancel::<F, S, T>,
194 drop: drop::<F, S, T>,
195 },
196 }
197 }
198
199 /// Cancels the task.
200 ///
201 /// If the task is completed, nothing is done. If the task is not completed
202 /// but not currently scheduled (no `Runnable` exist) then the future is
203 /// dropped immediately. Otherwise, the future will be dropped at a later
204 /// time by the scheduled `Runnable` once it runs.
205 pub(crate) fn cancel(self) {
206 // Prevent the drop handler from being called, as it would call
207 // `drop_token` on the inner field.
208 let this = ManuallyDrop::new(self);
209
210 unsafe { (this.vtable.cancel)(this.task) }
211 }
212}
213
impl Drop for CancelToken {
    fn drop(&mut self) {
        // Release this token's reference without cancelling the task: the
        // vtable's `drop` entry decrements the reference count and
        // deallocates the task if this was the last reference.
        unsafe { (self.vtable.drop)(self.task) }
    }
}
219
// SAFETY: the task pointer is only dereferenced through the vtable functions,
// whose bounds require `F`, `F::Output`, `S` and `T` to be `Send` (and
// `S`/`T` to be `Sync`), so moving the token across threads is sound.
unsafe impl Send for CancelToken {}
// NOTE(review): these impls assert that a panic observed through a token
// cannot leave it in a logically broken state — confirm against the vtable
// functions' panic paths (both use `RunOnDrop` guards around drops).
impl UnwindSafe for CancelToken {}
impl RefUnwindSafe for CancelToken {}