continue/
lib.rs

1//SPDX-License-Identifier: MIT OR Apache-2.0
2/*!
3# continue
4
5![logo](art/logo.png)
6
7continue is a Rust implementation of a Swift-style continuation API.
8
9# For those more familiar with Rust
10
11A continuation is a type of single-use channel.  The sender side of the channel sends a value.  The receiver of the channel is a `Future`, that `Output`s said value.  (It is a programmer error to drop a sender before sending).
12
13A common usecase for the continuation type is when you need to implement a custom Future based on being signaled by an external source when the `Future` is complete.
14
15# For those more familiar with Swift
16
17continue is the answer to how to do [`withCheckedContinuation`](https://developer.apple.com/documentation/swift/withcheckedcontinuation(isolation:function:_:)), [`CheckedContinuation`](https://developer.apple.com/documentation/swift/checkedcontinuation), and related APIs when in Rust.
18
19# For those entirely *too* familiar with Rust
20
21You may well ask: why use this?  I can 'simply' write my output into the future's memory, signal the waker, and be done with it.
22
23Not quite.  First, because wakers implicitly have a short lifetime (until the next poll, e.g. you must re-register wakers on each poll), you need some way to
24smuggle this value across threads.  The usual hammer for this nail is [`atomic-waker`](https://crates.io/crates/atomic-waker), which it will not surprise you to learn is a dependency.
25
26Secondly Drop is surprisingly hard.  In Rust, the Future side can be dropped early.  In which case: a) are you writing to a sound memory location, b) will you `Drop` the right number of times regardless of how Dropped or in-the-process-of-being-Dropped the future side is,  c) Did you want to run some code on Drop to cancel in-flight tasks, d) Did you want to optimistically poll the cancellation state and how will you smuggle that across, etc.
27
28Thirdly, executors are surprisingly hard.  It would be *sound* for an executor to keep polling you forever after it has a result, is your implementation sound in that case? Across `Drop` and `!Clone` types?
29
30I found myself making too many mistakes, in too many places, and so I've decided to make them all in one place: right here!
31
32
33*/
34
35
36use std::cell::UnsafeCell;
37use std::fmt::{Debug};
38use std::mem::{ManuallyDrop, MaybeUninit};
39use std::pin::Pin;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicU8, Ordering};
42use std::task::{Context, Poll};
43
44#[repr(u8)]
45enum State {
46    Empty,
47    Data,
48    Gone,
49    FutureHangup,
50}
51
52#[derive(Debug)]
53struct Shared<R> {
54    data: UnsafeCell<MaybeUninit<R>>,
55    state: AtomicU8,
56    waker: atomic_waker::AtomicWaker,
57}
58
59
60/**
61The Sender-side of the continuation.
62
63It is a programmer error (panic) to drop this type without sending a value.
64*/
65#[derive(Debug)]
66pub struct Sender<R> {
67    shared: Arc<Shared<R>>,
68    sent: bool,
69}
70
71/**
72The receive side of a continuation, with cancellation support.
73*/
74#[derive(Debug)]
75pub struct FutureCancel<R,C: FutureCancellation> {
76    future: ManuallyDrop<Future<R>>,
77    cancellation: C,
78}
79
80pub trait FutureCancellation {
81    fn cancel(&mut self);
82}
83
84
85/**
86Creates a new continuation.
87
88If you need to provide a custom cancel implementation, use [continuation_cancel] instead.
89*/
90
91pub fn continuation<R>() -> (Sender<R>,Future<R>) {
92    let shared = Arc::new(Shared {
93        data: UnsafeCell::new(MaybeUninit::uninit()),
94        state: AtomicU8::new(State::Empty as u8),
95        waker: atomic_waker::AtomicWaker::new(),
96    });
97    (Sender { shared: shared.clone(), sent: false }, Future { shared })
98}
99
100/**
101Creates a new continuation.  Allows for a custom cancel implementation.
102
103# Parameters
104- `cancellation` - The cancellation implementation to use.  You can use the [crate::FutureCancellation] trait to react to cancel events, or Drop to react to drop events
105(regardless of whether the future is cancelled).
106*/
107pub fn continuation_cancel<R,C: FutureCancellation>(cancellation: C) -> (Sender<R>,FutureCancel<R,C>) {
108    let shared = Arc::new(Shared {
109        data: UnsafeCell::new(MaybeUninit::uninit()),
110        state: AtomicU8::new(State::Empty as u8),
111        waker: atomic_waker::AtomicWaker::new(),
112    });
113    (Sender { shared: shared.clone(), sent: false }, FutureCancel { future: ManuallyDrop::new(Future { shared }), cancellation })
114}
115
116
117
118
119impl<R> Sender<R> {
120    /**
121    Sends the data to the remote side.
122
123    Note that there is no particular guarantee that the remote side will receive this.  For example,
124    the remote side may be dropped already, in which case sending has no effect.  Alternatively, the remote
125    side may become dropped after sending.
126
127    If you have a particularly good way of handling this, you may want to check [Self::is_cancelled] to avoid doing unnecessary work.
128    Note that this is not perfect either (since the remote side may be dropped after the check but before the send).
129*/
130    pub fn send(mut self, data: R)  {
131        self.sent = true;
132
133        /*
134        Safety: Data can only be written by this type. Since the type is unclonable, we're
135        the only possible writer.
136
137        It should be ok to write by default.
138        */
139        unsafe {
140            let opt = &mut *self.shared.data.get();
141            std::ptr::write(opt.as_mut_ptr(), data); //data is moved here!
142        }
143        loop {
144            let swap = self.shared.state.compare_exchange_weak(State::Empty as u8, State::Data as u8, Ordering::Release, Ordering::Relaxed);
145            match swap {
146                Ok(_) => {
147                    self.shared.waker.wake();
148                    return
149                }
150                Err(u) => {
151                    match u {
152                        u if u == State::Empty as u8 => {/* spurious, go around again */}
153                        u if u == State::Data as u8 || u == State::Gone as u8 => {unreachable!("Continuation already resumed")}
154                        u if u == State::FutureHangup as u8 => {
155                            //sending to a hungup continuation is a no-op
156                            //however, we did write our data, so we need to drop it.
157                            unsafe {
158                                //safety: We know that the continuation has been resumed, so we can read the data
159                                let data = &mut *self.shared.data.get();
160                                //safety: we know the data was initialized and will never be written to again (only
161                                //written to in empty state.
162                                let _ = data.assume_init_read();
163                            }
164                        }
165                        //sender hangup is impossible
166                        _ => unreachable!("Invalid state"),
167                    }
168                }
169            }
170        }
171
172    }
173
174    /**
175    Determines if the underlying future is cancelled.  And thus, that sending data will have no effect.
176
177    Even if this function returns `false`, it is possible that by the time you send data, the future will be cancelled.
178    */
179    pub fn is_cancelled(&self) -> bool {
180        self.shared.state.load(Ordering::Relaxed) == State::FutureHangup as u8
181    }
182
183}
184
185impl<R> Drop for Sender<R> {
186    fn drop(&mut self) {
187        assert!(self.sent, "Sender dropped without sending data");
188    }
189}
190
191/**
192The receive side of the continuation, without cancellation support.
193
194See also: [FutureCancel]
195*/
196#[derive(Debug)]
197pub struct Future<R> {
198    shared: Arc<Shared<R>>,
199}
200
201enum DropState {
202    Cancelled,
203    NotCancelled,
204}
205impl<R> Future<R> {
206    /**
207    implementation detail of drop.
208
209    # Returns
210    a value indicating whether, at the time the function ran, the future is dropped before receiving data.
211
212    Note that this is not a guarantee that at any future time – including immediately after this function returns – the data will not be sent.
213    */
214    fn drop_impl(&mut self) -> DropState {
215        let swap = self.shared.state.swap(State::FutureHangup as u8, Ordering::Acquire);
216        match swap {
217            u if u == State::Empty as u8 => {
218                DropState::Cancelled
219            }
220            u if u == State::Data as u8 => {
221                //data needs to be dropped here
222                unsafe {
223                    //safety: We know that the continuation has been resumed, so we can read the data
224                    let data = &mut *self.shared.data.get();
225                    //safety: we know the data was initialized and will never be written to again (only
226                    //written to in empty state.
227                    let _ = data.assume_init_read();
228                }
229                DropState::NotCancelled
230            }
231            u if u == State::Gone as u8 => {
232                DropState::NotCancelled
233            }
234            _ => unreachable!("Invalid state"),
235        }
236    }
237}
238
239impl<R> Drop for Future<R> {
240    fn drop(&mut self) {
241        self.drop_impl();
242    }
243}
244
245impl<R,C: FutureCancellation> Drop for FutureCancel<R,C> {
246    fn drop(&mut self) {
247        //kill future first
248        let mut future = unsafe{ManuallyDrop::take(&mut self.future)};
249        match future.drop_impl() {
250            DropState::Cancelled => {
251                self.cancellation.cancel();
252            }
253            DropState::NotCancelled => {}
254        }
255        //don't run drop - we already ran drop_impl
256        std::mem::forget(future);
257    }
258}
259
260enum ReadStatus<R> {
261    Data(R),
262    Waiting,
263    Spurious,
264}
265
266impl<R> Future<R> {
267    fn interpret_result(result: Result<u8, u8>, data: &UnsafeCell<MaybeUninit<R>>) -> ReadStatus<R> {
268        match result {
269            Ok(..) => {
270                unsafe {
271                    //safety: We know that the continuation has been resumed, so we can read the data
272                    let data = &mut *data.get();
273                    /*safety: we know the data was initialized and will never be written to again (only
274                    written to in empty state.
275
276                    We know it will never be read again because we set gone before leaving the function.
277                    It can only be polled exclusively in this function since we have &mut self.
278                     */
279                    let r = data.assume_init_read();
280                    ReadStatus::Data(r)
281                }
282            }
283            Err(u) => {
284                match u {
285                    u if u == State::Empty as u8 => { ReadStatus::Waiting }
286                    u if u == State::Data as u8 => { ReadStatus::Spurious }
287                    u if u == State::Gone as u8 => { panic!("Continuation already polled") }
288                    _ => { unreachable!("Invalid state") }
289                }
290            }
291        }
292    }
293}
294
295
296
297impl<R> std::future::Future for Future<R> {
298    type Output = R;
299    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
300        //optimistic read.
301        let state = self.shared.state.compare_exchange_weak(State::Data as u8, State::Gone as u8, Ordering::Acquire, Ordering::Relaxed);
302        match Self::interpret_result(state, &self.shared.data) {
303            ReadStatus::Data(data) => {return Poll::Ready(data)}
304            ReadStatus::Waiting | ReadStatus::Spurious => {}
305        }
306        //register for wakeup
307        self.shared.waker.register(cx.waker());
308        loop {
309            let state2 = self.shared.state.compare_exchange_weak(State::Data as u8, State::Gone as u8, Ordering::Acquire, Ordering::Relaxed);
310            match Self::interpret_result(state2, &self.shared.data) {
311                ReadStatus::Data(data) => {return Poll::Ready(data)}
312                ReadStatus::Waiting => {return Poll::Pending}
313                ReadStatus::Spurious => {continue}
314            }
315        }
316    }
317}
318
319impl<R,C: FutureCancellation> std::future::Future for FutureCancel<R,C> {
320    type Output = R;
321
322    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
323        //nothing is unpinned here
324       unsafe{self.map_unchecked_mut(|s| &mut s.future as &mut Future<R> )}.poll(cx)
325    }
326}
327
328//tedious traits
329
330//I think we don't want clone on either type, because it creates problems for implementing Send.
331unsafe impl<R: Send> Send for Future<R> {}
332unsafe impl<R: Send, C: Send + FutureCancellation> Send for FutureCancel<R,C> {}
333unsafe impl <R: Send> Send for Sender<R> {}
334
335/*Since no clone, no copy
336
337I think we don't want Eq/Ord/hash because we don't expect multiple instances, since no clone.
338
339Default does not make a lot of sense because we generate types as a pair.
340 */
341
342
343
344
345#[cfg(test)]
346mod test {
347    use std::pin::Pin;
348    use std::task::Poll;
349    use crate::continuation;
350
351    #[test]
352    fn test_continue() {
353        let(c,mut f) = continuation();
354        let mut f = Pin::new(&mut f);
355        assert!(test_executors::poll_once(f.as_mut()).is_pending());
356        c.send(23);
357        match test_executors::poll_once(f) {
358            Poll::Ready(23) => {}
359            x => panic!("Unexpected result {:?}",x),
360        }
361    }
362
363    #[test] fn test_is_send() {
364        fn is_send<T: Send>() {}
365        is_send::<crate::Future<i32>>();
366        is_send::<crate::Sender<i32>>();
367    }
368
369
370}