lightproc/proc_handle.rs
1//!
2//! Handle for tasks which don't need to unwind panics inside
3//! the given futures.
4use crate::proc_data::ProcData;
5use crate::proc_stack::ProcStack;
6use crate::state::*;
7use std::fmt::{self, Debug, Formatter};
8use std::future::Future;
9use std::marker::{PhantomData, Unpin};
10use std::pin::Pin;
11use std::ptr::NonNull;
12use std::sync::atomic::Ordering;
13use std::task::{Context, Poll};
14
/// A handle that awaits the result of a proc.
///
/// This type is a future that resolves to an `Option<R>` where:
///
/// * `None` indicates the proc has panicked or was cancelled
/// * `Some(res)` indicates the proc has completed with `res`
///
/// Besides being awaited, the handle can cancel the proc (`cancel`), expose the
/// proc's stack (`stack`) and report a snapshot of the proc's state (`state`).
/// Dropping the handle clears the `HANDLE` flag in the proc's state and drops an
/// output that was produced but never taken.
pub struct ProcHandle<R> {
    /// A raw proc pointer.
    ///
    /// It is type-erased here; every method in this file casts it to
    /// `*const ProcData` to reach the proc's atomic state and vtable.
    pub(crate) raw_proc: NonNull<()>,

    /// A marker capturing the generic type `R`.
    ///
    /// No `R` is stored in the handle itself; the output is read through the
    /// pointer returned by the vtable's `get_output` and cast to `*mut R`.
    pub(crate) _marker: PhantomData<R>,
}
28
// `NonNull<()>` is neither `Send` nor `Sync`, so these traits are opted into
// manually for the handle.
//
// NOTE(review): both impls are unconditional in `R`, yet the handle can move an
// `R` out of the proc (`poll`) and drop one (`Drop`) on whichever thread owns the
// handle. Soundness therefore relies on `R: Send` being enforced wherever procs
// and their handles are constructed - confirm against the constructor.
unsafe impl<R> Send for ProcHandle<R> {}
unsafe impl<R> Sync for ProcHandle<R> {}

// The handle only holds a raw pointer and a zero-sized marker, so it has no
// self-references that moving it could invalidate.
impl<R> Unpin for ProcHandle<R> {}
33
34impl<R> ProcHandle<R> {
35 /// Cancels the proc.
36 ///
37 /// If the proc has already completed, calling this method will have no effect.
38 ///
39 /// When a proc is cancelled, its future cannot be polled again and will be dropped instead.
40 pub fn cancel(&self) {
41 let ptr = self.raw_proc.as_ptr();
42 let pdata = ptr as *const ProcData;
43
44 unsafe {
45 let mut state = (*pdata).state.load(Ordering::Acquire);
46
47 loop {
48 // If the proc has been completed or closed, it can't be cancelled.
49 if state & (COMPLETED | CLOSED) != 0 {
50 break;
51 }
52
53 // If the proc is not scheduled nor running, we'll need to schedule it.
54 let new = if state & (SCHEDULED | RUNNING) == 0 {
55 (state | SCHEDULED | CLOSED) + REFERENCE
56 } else {
57 state | CLOSED
58 };
59
60 // Mark the proc as closed.
61 match (*pdata).state.compare_exchange_weak(
62 state,
63 new,
64 Ordering::AcqRel,
65 Ordering::Acquire,
66 ) {
67 Ok(_) => {
68 // If the proc is not scheduled nor running, schedule it so that its future
69 // gets dropped by the executor.
70 if state & (SCHEDULED | RUNNING) == 0 {
71 ((*pdata).vtable.schedule)(ptr);
72 }
73
74 // Notify the awaiter that the proc has been closed.
75 if state & AWAITER != 0 {
76 (*pdata).notify();
77 }
78
79 break;
80 }
81 Err(s) => state = s,
82 }
83 }
84 }
85 }
86
87 /// Returns a reference to the stack stored inside the proc.
88 pub fn stack(&self) -> &ProcStack {
89 let offset = ProcData::offset_stack();
90 let ptr = self.raw_proc.as_ptr();
91
92 unsafe {
93 let raw = (ptr as *mut u8).add(offset) as *const ProcStack;
94 &*raw
95 }
96 }
97
98 /// Returns current state of the handle.
99 pub fn state(&self) -> State {
100 let ptr = self.raw_proc.as_ptr();
101 let pdata = ptr as *const ProcData;
102 let raw_state = unsafe { (*pdata).state.load(Ordering::SeqCst) };
103 State::new(raw_state)
104 }
105}
106
/// Awaiting the handle yields `Some(output)` once the proc has completed, or
/// `None` when the proc is observed as closed before its output could be taken.
impl<R> Future for ProcHandle<R> {
    type Output = Option<R>;

    /// Polls the proc's state: returns `Ready(None)` if it is closed,
    /// `Pending` (after registering `cx`'s waker) if it is not yet completed, and
    /// `Ready(Some(output))` once this handle wins the race to mark the completed
    /// proc as closed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let ptr = self.raw_proc.as_ptr();
        let pdata = ptr as *const ProcData;

        unsafe {
            let mut state = (*pdata).state.load(Ordering::Acquire);

            loop {
                // If the proc has been closed, notify the awaiter and return `None`.
                if state & CLOSED != 0 {
                    // Even though the awaiter is most likely the current proc, it could also be
                    // another proc.
                    (*pdata).notify_unless(cx.waker());
                    return Poll::Ready(None);
                }

                // If the proc is not completed, register the current proc.
                if state & COMPLETED == 0 {
                    // Replace the waker with one associated with the current proc.
                    // NOTE(review): dropping the previously registered waker can panic and
                    // no panic safeguard is visible at this call site - confirm whether
                    // `swap_awaiter` guards against that internally.
                    (*pdata).swap_awaiter(Some(cx.waker().clone()));

                    // Reload the state after registering. It is possible that the proc became
                    // completed or closed just before registration so we need to check for that.
                    state = (*pdata).state.load(Ordering::Acquire);

                    // If the proc has been closed, notify the awaiter and return `None`.
                    if state & CLOSED != 0 {
                        // Even though the awaiter is most likely the current proc, it could also
                        // be another proc.
                        (*pdata).notify_unless(cx.waker());
                        return Poll::Ready(None);
                    }

                    // If the proc is still not completed, we're blocked on it.
                    if state & COMPLETED == 0 {
                        return Poll::Pending;
                    }
                }

                // Since the proc is now completed, mark it as closed in order to grab its output.
                // The strong `compare_exchange` is used here, so a failure means the state
                // really changed; the loop then re-examines the freshly observed value.
                match (*pdata).state.compare_exchange(
                    state,
                    state | CLOSED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Notify the awaiter. Even though the awaiter is most likely the current
                        // proc, it could also be another proc.
                        if state & AWAITER != 0 {
                            (*pdata).notify_unless(cx.waker());
                        }

                        // Take the output from the proc. `read` moves the value out of the
                        // proc's storage; having set `CLOSED` above is what keeps `Drop` (which
                        // only reads the output when `CLOSED` is unset) from reading it again.
                        let output = ((*pdata).vtable.get_output)(ptr) as *mut R;
                        return Poll::Ready(Some(output.read()));
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
}
174
175impl<R> Debug for ProcHandle<R> {
176 fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
177 let ptr = self.raw_proc.as_ptr();
178 let pdata = ptr as *const ProcData;
179
180 fmt.debug_struct("ProcHandle")
181 .field("pdata", unsafe { &(*pdata) })
182 .field("stack", self.stack())
183 .finish()
184 }
185}
186
/// Dropping the handle clears the `HANDLE` flag in the proc's state. Along the
/// way it takes (and later drops) an output that was produced but never
/// retrieved, and, when no other references remain, either schedules the proc one
/// last time or destroys it.
impl<R> Drop for ProcHandle<R> {
    fn drop(&mut self) {
        let ptr = self.raw_proc.as_ptr();
        let pdata = ptr as *const ProcData;

        // A place where the output will be stored in case it needs to be dropped.
        // It is dropped only after the `unsafe` block below has finished updating the state.
        let mut output = None;

        unsafe {
            // Optimistically assume the `ProcHandle` is being dropped just after creating the
            // proc. This is a common case so if the handle is not used, the overhead of it is only
            // one compare-exchange operation.
            //
            // The weak variant may fail spuriously; that is harmless because the failure path
            // simply falls into the general loop below with the observed state.
            if let Err(mut state) = (*pdata).state.compare_exchange_weak(
                SCHEDULED | HANDLE | REFERENCE,
                SCHEDULED | REFERENCE,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                loop {
                    // If the proc has been completed but not yet closed, that means its output
                    // must be dropped.
                    if state & COMPLETED != 0 && state & CLOSED == 0 {
                        // Mark the proc as closed in order to grab its output.
                        match (*pdata).state.compare_exchange_weak(
                            state,
                            state | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // Read the output. It is moved into `output` and dropped at the
                                // end of this function.
                                output = Some((((*pdata).vtable.get_output)(ptr) as *mut R).read());

                                // Update the state variable because we're continuing the loop.
                                // The next iteration takes the `else` branch since `CLOSED` is
                                // now set.
                                state |= CLOSED;
                            }
                            Err(s) => state = s,
                        }
                    } else {
                        // If this is the last reference to the proc and it's not closed, then
                        // close it and schedule one more time so that its future gets dropped by
                        // the executor.
                        //
                        // `!(REFERENCE - 1)` masks every bit at or above `REFERENCE`; the test is
                        // zero only when all of those bits and `CLOSED` are clear.
                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
                            SCHEDULED | CLOSED | REFERENCE
                        } else {
                            state & !HANDLE
                        };

                        // Unset the handle flag.
                        match (*pdata).state.compare_exchange_weak(
                            state,
                            new,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(_) => {
                                // If this is the last reference to the proc, we need to either
                                // schedule dropping its future or destroy it.
                                if state & !(REFERENCE - 1) == 0 {
                                    if state & CLOSED == 0 {
                                        ((*pdata).vtable.schedule)(ptr);
                                    } else {
                                        ((*pdata).vtable.destroy)(ptr);
                                    }
                                }

                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }

        // Drop the output if it was taken out of the proc.
        drop(output);
    }
}
265}