continue/lib.rs
1//SPDX-License-Identifier: MIT OR Apache-2.0
2/*!
3# continue
4
5
6
7continue is a Rust implementation of a Swift-style continuation API.
8
9# For those more familiar with Rust
10
11A continuation is a type of single-use channel. The sender side of the channel sends a value. The receiver of the channel is a `Future`, that `Output`s said value. (It is a programmer error to drop a sender before sending).
12
13A common usecase for the continuation type is when you need to implement a custom Future based on being signaled by an external source when the `Future` is complete.
14
15# For those more familiar with Swift
16
17continue is the answer to how to do [`withCheckedContinuation`](https://developer.apple.com/documentation/swift/withcheckedcontinuation(isolation:function:_:)), [`CheckedContinuation`](https://developer.apple.com/documentation/swift/checkedcontinuation), and related APIs when in Rust.
18
19# For those entirely *too* familiar with Rust
20
21You may well ask: why use this? I can 'simply' write my output into the future's memory, signal the waker, and be done with it.
22
23Not quite. First, because wakers implicitly have a short lifetime (until the next poll, e.g. you must re-register wakers on each poll), you need some way to
24smuggle this value across threads. The usual hammer for this nail is [`atomic-waker`](https://crates.io/crates/atomic-waker), which it will not surprise you to learn is a dependency.
25
26Secondly Drop is surprisingly hard. In Rust, the Future side can be dropped early. In which case: a) are you writing to a sound memory location, b) will you `Drop` the right number of times regardless of how Dropped or in-the-process-of-being-Dropped the future side is, c) Did you want to run some code on Drop to cancel in-flight tasks, d) Did you want to optimistically poll the cancellation state and how will you smuggle that across, etc.
27
28Thirdly, executors are surprisingly hard. It would be *sound* for an executor to keep polling you forever after it has a result, is your implementation sound in that case? Across `Drop` and `!Clone` types?
29
30I found myself making too many mistakes, in too many places, and so I've decided to make them all in one place: right here!
31
32
33*/
34
35
36use std::cell::UnsafeCell;
37use std::fmt::{Debug};
38use std::mem::{ManuallyDrop, MaybeUninit};
39use std::pin::Pin;
40use std::sync::Arc;
41use std::sync::atomic::{AtomicU8, Ordering};
42use std::task::{Context, Poll};
43
44#[repr(u8)]
45enum State {
46 Empty,
47 Data,
48 Gone,
49 FutureHangup,
50}
51
52#[derive(Debug)]
53struct Shared<R> {
54 data: UnsafeCell<MaybeUninit<R>>,
55 state: AtomicU8,
56 waker: atomic_waker::AtomicWaker,
57}
58
59
60/**
61The Sender-side of the continuation.
62
63It is a programmer error (panic) to drop this type without sending a value.
64*/
65#[derive(Debug)]
66pub struct Sender<R> {
67 shared: Arc<Shared<R>>,
68 sent: bool,
69}
70
71/**
72The receive side of a continuation, with cancellation support.
73*/
74#[derive(Debug)]
75pub struct FutureCancel<R,C: FutureCancellation> {
76 future: ManuallyDrop<Future<R>>,
77 cancellation: C,
78}
79
80pub trait FutureCancellation {
81 fn cancel(&mut self);
82}
83
84
85/**
86Creates a new continuation.
87
88If you need to provide a custom cancel implementation, use [continuation_cancel] instead.
89*/
90
91pub fn continuation<R>() -> (Sender<R>,Future<R>) {
92 let shared = Arc::new(Shared {
93 data: UnsafeCell::new(MaybeUninit::uninit()),
94 state: AtomicU8::new(State::Empty as u8),
95 waker: atomic_waker::AtomicWaker::new(),
96 });
97 (Sender { shared: shared.clone(), sent: false }, Future { shared })
98}
99
100/**
101Creates a new continuation. Allows for a custom cancel implementation.
102
103# Parameters
104- `cancellation` - The cancellation implementation to use. You can use the [crate::FutureCancellation] trait to react to cancel events, or Drop to react to drop events
105(regardless of whether the future is cancelled).
106*/
107pub fn continuation_cancel<R,C: FutureCancellation>(cancellation: C) -> (Sender<R>,FutureCancel<R,C>) {
108 let shared = Arc::new(Shared {
109 data: UnsafeCell::new(MaybeUninit::uninit()),
110 state: AtomicU8::new(State::Empty as u8),
111 waker: atomic_waker::AtomicWaker::new(),
112 });
113 (Sender { shared: shared.clone(), sent: false }, FutureCancel { future: ManuallyDrop::new(Future { shared }), cancellation })
114}
115
116
117
118
119impl<R> Sender<R> {
120 /**
121 Sends the data to the remote side.
122
123 Note that there is no particular guarantee that the remote side will receive this. For example,
124 the remote side may be dropped already, in which case sending has no effect. Alternatively, the remote
125 side may become dropped after sending.
126
127 If you have a particularly good way of handling this, you may want to check [Self::is_cancelled] to avoid doing unnecessary work.
128 Note that this is not perfect either (since the remote side may be dropped after the check but before the send).
129*/
130 pub fn send(mut self, data: R) {
131 self.sent = true;
132
133 /*
134 Safety: Data can only be written by this type. Since the type is unclonable, we're
135 the only possible writer.
136
137 It should be ok to write by default.
138 */
139 unsafe {
140 let opt = &mut *self.shared.data.get();
141 std::ptr::write(opt.as_mut_ptr(), data); //data is moved here!
142 }
143 loop {
144 let swap = self.shared.state.compare_exchange_weak(State::Empty as u8, State::Data as u8, Ordering::Release, Ordering::Relaxed);
145 match swap {
146 Ok(_) => {
147 self.shared.waker.wake();
148 return
149 }
150 Err(u) => {
151 match u {
152 u if u == State::Empty as u8 => {/* spurious, go around again */}
153 u if u == State::Data as u8 || u == State::Gone as u8 => {unreachable!("Continuation already resumed")}
154 u if u == State::FutureHangup as u8 => {
155 //sending to a hungup continuation is a no-op
156 //however, we did write our data, so we need to drop it.
157 unsafe {
158 //safety: We know that the continuation has been resumed, so we can read the data
159 let data = &mut *self.shared.data.get();
160 //safety: we know the data was initialized and will never be written to again (only
161 //written to in empty state.
162 let _ = data.assume_init_read();
163 }
164 }
165 //sender hangup is impossible
166 _ => unreachable!("Invalid state"),
167 }
168 }
169 }
170 }
171
172 }
173
174 /**
175 Determines if the underlying future is cancelled. And thus, that sending data will have no effect.
176
177 Even if this function returns `false`, it is possible that by the time you send data, the future will be cancelled.
178 */
179 pub fn is_cancelled(&self) -> bool {
180 self.shared.state.load(Ordering::Relaxed) == State::FutureHangup as u8
181 }
182
183}
184
185impl<R> Drop for Sender<R> {
186 fn drop(&mut self) {
187 assert!(self.sent, "Sender dropped without sending data");
188 }
189}
190
191/**
192The receive side of the continuation, without cancellation support.
193
194See also: [FutureCancel]
195*/
196#[derive(Debug)]
197pub struct Future<R> {
198 shared: Arc<Shared<R>>,
199}
200
201enum DropState {
202 Cancelled,
203 NotCancelled,
204}
205impl<R> Future<R> {
206 /**
207 implementation detail of drop.
208
209 # Returns
210 a value indicating whether, at the time the function ran, the future is dropped before receiving data.
211
212 Note that this is not a guarantee that at any future time – including immediately after this function returns – the data will not be sent.
213 */
214 fn drop_impl(&mut self) -> DropState {
215 let swap = self.shared.state.swap(State::FutureHangup as u8, Ordering::Acquire);
216 match swap {
217 u if u == State::Empty as u8 => {
218 DropState::Cancelled
219 }
220 u if u == State::Data as u8 => {
221 //data needs to be dropped here
222 unsafe {
223 //safety: We know that the continuation has been resumed, so we can read the data
224 let data = &mut *self.shared.data.get();
225 //safety: we know the data was initialized and will never be written to again (only
226 //written to in empty state.
227 let _ = data.assume_init_read();
228 }
229 DropState::NotCancelled
230 }
231 u if u == State::Gone as u8 => {
232 DropState::NotCancelled
233 }
234 _ => unreachable!("Invalid state"),
235 }
236 }
237}
238
239impl<R> Drop for Future<R> {
240 fn drop(&mut self) {
241 self.drop_impl();
242 }
243}
244
245impl<R,C: FutureCancellation> Drop for FutureCancel<R,C> {
246 fn drop(&mut self) {
247 //kill future first
248 let mut future = unsafe{ManuallyDrop::take(&mut self.future)};
249 match future.drop_impl() {
250 DropState::Cancelled => {
251 self.cancellation.cancel();
252 }
253 DropState::NotCancelled => {}
254 }
255 //don't run drop - we already ran drop_impl
256 std::mem::forget(future);
257 }
258}
259
260enum ReadStatus<R> {
261 Data(R),
262 Waiting,
263 Spurious,
264}
265
266impl<R> Future<R> {
267 fn interpret_result(result: Result<u8, u8>, data: &UnsafeCell<MaybeUninit<R>>) -> ReadStatus<R> {
268 match result {
269 Ok(..) => {
270 unsafe {
271 //safety: We know that the continuation has been resumed, so we can read the data
272 let data = &mut *data.get();
273 /*safety: we know the data was initialized and will never be written to again (only
274 written to in empty state.
275
276 We know it will never be read again because we set gone before leaving the function.
277 It can only be polled exclusively in this function since we have &mut self.
278 */
279 let r = data.assume_init_read();
280 ReadStatus::Data(r)
281 }
282 }
283 Err(u) => {
284 match u {
285 u if u == State::Empty as u8 => { ReadStatus::Waiting }
286 u if u == State::Data as u8 => { ReadStatus::Spurious }
287 u if u == State::Gone as u8 => { panic!("Continuation already polled") }
288 _ => { unreachable!("Invalid state") }
289 }
290 }
291 }
292 }
293}
294
295
296
297impl<R> std::future::Future for Future<R> {
298 type Output = R;
299 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
300 //optimistic read.
301 let state = self.shared.state.compare_exchange_weak(State::Data as u8, State::Gone as u8, Ordering::Acquire, Ordering::Relaxed);
302 match Self::interpret_result(state, &self.shared.data) {
303 ReadStatus::Data(data) => {return Poll::Ready(data)}
304 ReadStatus::Waiting | ReadStatus::Spurious => {}
305 }
306 //register for wakeup
307 self.shared.waker.register(cx.waker());
308 loop {
309 let state2 = self.shared.state.compare_exchange_weak(State::Data as u8, State::Gone as u8, Ordering::Acquire, Ordering::Relaxed);
310 match Self::interpret_result(state2, &self.shared.data) {
311 ReadStatus::Data(data) => {return Poll::Ready(data)}
312 ReadStatus::Waiting => {return Poll::Pending}
313 ReadStatus::Spurious => {continue}
314 }
315 }
316 }
317}
318
319impl<R,C: FutureCancellation> std::future::Future for FutureCancel<R,C> {
320 type Output = R;
321
322 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
323 //nothing is unpinned here
324 unsafe{self.map_unchecked_mut(|s| &mut s.future as &mut Future<R> )}.poll(cx)
325 }
326}
327
328//tedious traits
329
330//I think we don't want clone on either type, because it creates problems for implementing Send.
331unsafe impl<R: Send> Send for Future<R> {}
332unsafe impl<R: Send, C: Send + FutureCancellation> Send for FutureCancel<R,C> {}
333unsafe impl <R: Send> Send for Sender<R> {}
334
335/*Since no clone, no copy
336
337I think we don't want Eq/Ord/hash because we don't expect multiple instances, since no clone.
338
339Default does not make a lot of sense because we generate types as a pair.
340 */
341
342
343
344
345#[cfg(test)]
346mod test {
347 use std::pin::Pin;
348 use std::task::Poll;
349 use crate::continuation;
350
351 #[test]
352 fn test_continue() {
353 let(c,mut f) = continuation();
354 let mut f = Pin::new(&mut f);
355 assert!(test_executors::poll_once(f.as_mut()).is_pending());
356 c.send(23);
357 match test_executors::poll_once(f) {
358 Poll::Ready(23) => {}
359 x => panic!("Unexpected result {:?}",x),
360 }
361 }
362
363 #[test] fn test_is_send() {
364 fn is_send<T: Send>() {}
365 is_send::<crate::Future<i32>>();
366 is_send::<crate::Sender<i32>>();
367 }
368
369
370}