lightproc/
proc_handle.rs

1//!
2//! Handle for tasks which don't need to unwind panics inside
3//! the given futures.
4use crate::proc_data::ProcData;
5use crate::proc_stack::ProcStack;
6use crate::state::*;
7use std::fmt::{self, Debug, Formatter};
8use std::future::Future;
9use std::marker::{PhantomData, Unpin};
10use std::pin::Pin;
11use std::ptr::NonNull;
12use std::sync::atomic::Ordering;
13use std::task::{Context, Poll};
14
15/// A handle that awaits the result of a proc.
16///
17/// This type is a future that resolves to an `Option<R>` where:
18///
19/// * `None` indicates the proc has panicked or was cancelled
20/// * `Some(res)` indicates the proc has completed with `res`
21pub struct ProcHandle<R> {
22    /// A raw proc pointer.
23    pub(crate) raw_proc: NonNull<()>,
24
25    /// A marker capturing the generic type `R`.
26    pub(crate) _marker: PhantomData<R>,
27}
28
29unsafe impl<R> Send for ProcHandle<R> {}
30unsafe impl<R> Sync for ProcHandle<R> {}
31
32impl<R> Unpin for ProcHandle<R> {}
33
34impl<R> ProcHandle<R> {
35    /// Cancels the proc.
36    ///
37    /// If the proc has already completed, calling this method will have no effect.
38    ///
39    /// When a proc is cancelled, its future cannot be polled again and will be dropped instead.
40    pub fn cancel(&self) {
41        let ptr = self.raw_proc.as_ptr();
42        let pdata = ptr as *const ProcData;
43
44        unsafe {
45            let mut state = (*pdata).state.load(Ordering::Acquire);
46
47            loop {
48                // If the proc has been completed or closed, it can't be cancelled.
49                if state & (COMPLETED | CLOSED) != 0 {
50                    break;
51                }
52
53                // If the proc is not scheduled nor running, we'll need to schedule it.
54                let new = if state & (SCHEDULED | RUNNING) == 0 {
55                    (state | SCHEDULED | CLOSED) + REFERENCE
56                } else {
57                    state | CLOSED
58                };
59
60                // Mark the proc as closed.
61                match (*pdata).state.compare_exchange_weak(
62                    state,
63                    new,
64                    Ordering::AcqRel,
65                    Ordering::Acquire,
66                ) {
67                    Ok(_) => {
68                        // If the proc is not scheduled nor running, schedule it so that its future
69                        // gets dropped by the executor.
70                        if state & (SCHEDULED | RUNNING) == 0 {
71                            ((*pdata).vtable.schedule)(ptr);
72                        }
73
74                        // Notify the awaiter that the proc has been closed.
75                        if state & AWAITER != 0 {
76                            (*pdata).notify();
77                        }
78
79                        break;
80                    }
81                    Err(s) => state = s,
82                }
83            }
84        }
85    }
86
87    /// Returns a reference to the stack stored inside the proc.
88    pub fn stack(&self) -> &ProcStack {
89        let offset = ProcData::offset_stack();
90        let ptr = self.raw_proc.as_ptr();
91
92        unsafe {
93            let raw = (ptr as *mut u8).add(offset) as *const ProcStack;
94            &*raw
95        }
96    }
97
98    /// Returns current state of the handle.
99    pub fn state(&self) -> State {
100        let ptr = self.raw_proc.as_ptr();
101        let pdata = ptr as *const ProcData;
102        let raw_state = unsafe { (*pdata).state.load(Ordering::SeqCst) };
103        State::new(raw_state)
104    }
105}
106
107impl<R> Future for ProcHandle<R> {
108    type Output = Option<R>;
109
110    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
111        let ptr = self.raw_proc.as_ptr();
112        let pdata = ptr as *const ProcData;
113
114        unsafe {
115            let mut state = (*pdata).state.load(Ordering::Acquire);
116
117            loop {
118                // If the proc has been closed, notify the awaiter and return `None`.
119                if state & CLOSED != 0 {
120                    // Even though the awaiter is most likely the current proc, it could also be
121                    // another proc.
122                    (*pdata).notify_unless(cx.waker());
123                    return Poll::Ready(None);
124                }
125
126                // If the proc is not completed, register the current proc.
127                if state & COMPLETED == 0 {
128                    // Replace the waker with one associated with the current proc. We need a
129                    // safeguard against panics because dropping the previous waker can panic.
130                    (*pdata).swap_awaiter(Some(cx.waker().clone()));
131
132                    // Reload the state after registering. It is possible that the proc became
133                    // completed or closed just before registration so we need to check for that.
134                    state = (*pdata).state.load(Ordering::Acquire);
135
136                    // If the proc has been closed, notify the awaiter and return `None`.
137                    if state & CLOSED != 0 {
138                        // Even though the awaiter is most likely the current proc, it could also
139                        // be another proc.
140                        (*pdata).notify_unless(cx.waker());
141                        return Poll::Ready(None);
142                    }
143
144                    // If the proc is still not completed, we're blocked on it.
145                    if state & COMPLETED == 0 {
146                        return Poll::Pending;
147                    }
148                }
149
150                // Since the proc is now completed, mark it as closed in order to grab its output.
151                match (*pdata).state.compare_exchange(
152                    state,
153                    state | CLOSED,
154                    Ordering::AcqRel,
155                    Ordering::Acquire,
156                ) {
157                    Ok(_) => {
158                        // Notify the awaiter. Even though the awaiter is most likely the current
159                        // proc, it could also be another proc.
160                        if state & AWAITER != 0 {
161                            (*pdata).notify_unless(cx.waker());
162                        }
163
164                        // Take the output from the proc.
165                        let output = ((*pdata).vtable.get_output)(ptr) as *mut R;
166                        return Poll::Ready(Some(output.read()));
167                    }
168                    Err(s) => state = s,
169                }
170            }
171        }
172    }
173}
174
175impl<R> Debug for ProcHandle<R> {
176    fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
177        let ptr = self.raw_proc.as_ptr();
178        let pdata = ptr as *const ProcData;
179
180        fmt.debug_struct("ProcHandle")
181            .field("pdata", unsafe { &(*pdata) })
182            .field("stack", self.stack())
183            .finish()
184    }
185}
186
187impl<R> Drop for ProcHandle<R> {
188    fn drop(&mut self) {
189        let ptr = self.raw_proc.as_ptr();
190        let pdata = ptr as *const ProcData;
191
192        // A place where the output will be stored in case it needs to be dropped.
193        let mut output = None;
194
195        unsafe {
196            // Optimistically assume the `ProcHandle` is being dropped just after creating the
197            // proc. This is a common case so if the handle is not used, the overhead of it is only
198            // one compare-exchange operation.
199            if let Err(mut state) = (*pdata).state.compare_exchange_weak(
200                SCHEDULED | HANDLE | REFERENCE,
201                SCHEDULED | REFERENCE,
202                Ordering::AcqRel,
203                Ordering::Acquire,
204            ) {
205                loop {
206                    // If the proc has been completed but not yet closed, that means its output
207                    // must be dropped.
208                    if state & COMPLETED != 0 && state & CLOSED == 0 {
209                        // Mark the proc as closed in order to grab its output.
210                        match (*pdata).state.compare_exchange_weak(
211                            state,
212                            state | CLOSED,
213                            Ordering::AcqRel,
214                            Ordering::Acquire,
215                        ) {
216                            Ok(_) => {
217                                // Read the output.
218                                output = Some((((*pdata).vtable.get_output)(ptr) as *mut R).read());
219
220                                // Update the state variable because we're continuing the loop.
221                                state |= CLOSED;
222                            }
223                            Err(s) => state = s,
224                        }
225                    } else {
226                        // If this is the last reference to the proc and it's not closed, then
227                        // close it and schedule one more time so that its future gets dropped by
228                        // the executor.
229                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
230                            SCHEDULED | CLOSED | REFERENCE
231                        } else {
232                            state & !HANDLE
233                        };
234
235                        // Unset the handle flag.
236                        match (*pdata).state.compare_exchange_weak(
237                            state,
238                            new,
239                            Ordering::AcqRel,
240                            Ordering::Acquire,
241                        ) {
242                            Ok(_) => {
243                                // If this is the last reference to the proc, we need to either
244                                // schedule dropping its future or destroy it.
245                                if state & !(REFERENCE - 1) == 0 {
246                                    if state & CLOSED == 0 {
247                                        ((*pdata).vtable.schedule)(ptr);
248                                    } else {
249                                        ((*pdata).vtable.destroy)(ptr);
250                                    }
251                                }
252
253                                break;
254                            }
255                            Err(s) => state = s,
256                        }
257                    }
258                }
259            }
260        }
261
262        // Drop the output if it was taken out of the proc.
263        drop(output);
264    }
265}