my_ecs/ds/
fut.rs

1use std::{
2    future::Future,
3    marker::PhantomPinned,
4    mem,
5    pin::Pin,
6    ptr::NonNull,
7    sync::{
8        atomic::{AtomicBool, Ordering},
9        Arc,
10    },
11    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12    thread::{self, Thread},
13};
14
15/// A trait to wake a worker and send [`UnsafeFuture`] to the worker.
16pub trait WakeSend: Send + Sync + Clone + 'static {
17    /// Wakes associated worker and send [`UnsafeFuture`] to the worker.
18    fn wake_send(&self, handle: UnsafeFuture);
19}
20
21/// A handle to a future data.
22///
23/// Name contains `future`, but this struct doesn't implement [`Future`] trait.
24/// It provides you poll function instead. You can call poll function on a
25/// handle and get the result if the `FutureData` is ready.
26///
27/// Plus, this is actually a type-erased pointer to a `FutureData` so that
28/// owners must deal with the pointer carefully. See the example below to get a
29/// feel for how to use the struct.
30///
31/// # Examples
32///
33/// ```
34/// use my_ecs::ds::{WakeSend, UnsafeFuture, ReadyFuture};
35/// use std::{
36///     future::Future,
37///     task::{Poll, Context},
38///     sync::mpsc::{self, Sender},
39///     pin::Pin,
40/// };
41///
42/// #[derive(Clone)]
43/// struct MyWaker(Sender<UnsafeFuture>);
44///
45/// impl WakeSend for MyWaker {
46///     fn wake_send(&self, handle: UnsafeFuture) {
47///         self.0.send(handle).unwrap();
48///     }
49/// }
50///
51/// struct MyFuture(u32);
52///
53/// impl Future for MyFuture {
54///     type Output = u32;
55///
56///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57///         let this = self.get_mut();
58///         if this.0 == 0 {
59///             Poll::Ready(10)
60///         } else {
61///             this.0 -= 1;
62///             cx.waker().wake_by_ref();
63///             Poll::Pending
64///         }
65///     }
66/// }
67///
68/// let (tx, rx) = mpsc::channel();
69/// let fut = MyFuture(2);
70/// let waker = MyWaker(tx);
71/// let consume = |ret: u32, arg: u32| ret + arg;
72/// let mut u_fut = UnsafeFuture::new(fut, waker, consume);
73///
74/// unsafe {
75///     let mut pending = 0;
76///     while u_fut.poll() == Poll::Pending {
77///         u_fut = rx.recv().unwrap();
78///         pending += 1;
79///     }
80///     let r_fut = ReadyFuture::new(u_fut);
81///     let res: u32 = r_fut.consume(1);
82///
83///     assert_eq!(pending, 2);
84///     assert_eq!(res, consume(10, 1));
85/// }
86/// ```
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88#[repr(transparent)]
89pub struct UnsafeFuture {
90    /// Type-erased pointer to a future data.
91    ///
92    /// This pointer is something like 'data' field of [`RawWaker`].
93    data: NonNull<u8>,
94}
95
96unsafe impl Send for UnsafeFuture {}
97
98impl UnsafeFuture {
99    /// Creates a future data in heap memory and returns its handle.
100    ///
101    /// # Leaks
102    ///
103    /// There will be memory leak if caller doesn't deallocate the future data.
104    /// Future data can be deallocated by
105    /// - Calling [`UnsafeFuture::destroy`].
106    /// - Turning [`UnsafeFuture`] into [`ReadyFuture`] then dropping it.
107    ///   `ReadyFuture` calls [`UnsafeFuture::destroy`] when it's dropped.
108    ///
109    /// # Examples
110    ///
111    /// See [`UnsafeFuture`] documentation.
112    pub fn new<F, R, W, Arg, CR>(future: F, waker: W, consume: fn(R, Arg) -> CR) -> Self
113    where
114        F: Future<Output = R> + Send + 'static,
115        R: Send + 'static,
116        W: WakeSend,
117    {
118        let pinned = FutureData::new(future, waker, consume);
119
120        // `FutureData` is created wrapped in `Pin<Box<T>>` because it's another type of future.
121        // But this struct will manage it through its pointer, so let's unwrap it.
122        //
123        // # Safety: We won't move the `FutureData` in this struct.
124        let data = unsafe {
125            let boxed = Pin::into_inner_unchecked(pinned);
126            NonNull::new_unchecked(Box::into_raw(boxed)).cast::<u8>()
127        };
128
129        Self { data }
130    }
131
132    /// Drops and deallocates associated future data.
133    ///
134    /// You may need this method when you have to cancel out not ready futures.
135    ///
136    /// # Safety
137    ///
138    /// This method must be called only once for the same handles.
139    pub unsafe fn destroy(self) {
140        // Safety
141        // - `self.data` is a valid pointer to a `FutureData`.
142        // - drop method will be called only once.
143        unsafe {
144            let vtable = self.data.cast::<FutureVTable>().as_mut();
145            (vtable.drop)(self.data)
146        }
147    }
148
149    /// Returns true if associated future data is ready.
150    ///
151    /// # Safety
152    ///
153    /// Undefined behavior if associated `FutureData` has been dropped.
154    pub unsafe fn is_ready(&self) -> bool {
155        unsafe {
156            let vtable = self.data.cast::<FutureVTable>().as_mut();
157            (vtable.is_ready)(self.data)
158        }
159    }
160
161    /// Tries to make more progress on the associated future data.
162    ///
163    /// Returning value [`Poll::Ready`] means the `FutureData` is completely
164    /// resolved and ready to provide its output. [`Poll::Pending`], on the
165    /// other hand, means the `FutureData` is not yet ready and will wake async
166    /// runtime via the waker you inserted at [`UnsafeFuture::new`] when it can
167    /// make more progress.
168    ///
169    /// # Safety
170    ///
171    /// Associated future data must be alive, not have been dropped.
172    ///
173    /// # Examples
174    ///
175    /// See [`UnsafeFuture`] documentation.
176    pub unsafe fn poll(self) -> Poll<()> {
177        unsafe {
178            let vtable = self.data.cast::<FutureVTable>().as_mut();
179            if (vtable.poll_unchecked)(self.data) {
180                Poll::Ready(())
181            } else {
182                Poll::Pending
183            }
184        }
185    }
186
187    /// Returns true if the given waker is the same as the type you inserted at
188    /// [`UnsafeFuture::new`].
189    ///
190    /// # Safety
191    ///
192    /// Waker type `W` must be the same as the type you inserted at
193    /// [`UnsafeFuture::new`].
194    pub unsafe fn will_wake<W>(self, other: &W) -> bool
195    where
196        W: WakeSend + PartialEq,
197    {
198        unsafe {
199            let waker_ptr = FutureData::<(), (), W, (), ()>::waker_ptr(self.data);
200            waker_ptr.as_ref() == other
201        }
202    }
203
204    /// Sets a new waker to the associated future data.
205    ///
206    /// # Safety
207    ///
208    /// Waker type `W` must be the same as the type you inserted at
209    /// [`UnsafeFuture::new`].
210    pub unsafe fn set_waker<W>(self, waker: W) -> W
211    where
212        W: WakeSend,
213    {
214        let old = unsafe { FutureData::<(), (), W, (), ()>::waker_ptr(self.data).as_mut() };
215        mem::replace(old, waker)
216    }
217
218    /// # Safety
219    ///
220    /// Argument types `Arg` and `CR` must be the same as the types determined
221    /// on [`UnsafeFuture::new`].
222    unsafe fn consume<Arg, CR>(self, arg: Arg) -> CR {
223        unsafe {
224            let vtable = self.data.cast::<FutureVTable>().as_mut();
225            let delegate_consume = mem::transmute::<unsafe fn(), unsafe fn(NonNull<u8>, Arg) -> CR>(
226                vtable.delegate_consume,
227            );
228            delegate_consume(self.data, arg)
229        }
230    }
231}
232
233/// A handle to a *ready* future data.
234///
235/// The struct can be created from ready [`UnsafeFuture`] only, and it doesn't
236/// provide methods such as poll except [`ReadyFuture::consume`]. You can get
237/// the result from the ready `FutureData` through the consume method, then
238/// associated `FutureData` will be dropped and deallocated.
239///
240/// See [`UnsafeFuture`] documentation to see how this struct is used.
241#[derive(Debug)]
242#[repr(transparent)]
243pub struct ReadyFuture(UnsafeFuture);
244
245impl ReadyFuture {
246    /// Creates a new [`ReadyFuture`] from the given ready [`UnsafeFuture`].
247    ///
248    /// # Panics
249    ///
250    /// Panics if associated future data is not ready.
251    ///
252    /// # Safety
253    ///
254    /// Undefined behavior if associated `FutureData` is not alive.
255    ///
256    /// # Examples
257    ///
258    /// See [`UnsafeFuture`] documentation.
259    pub unsafe fn new(future: UnsafeFuture) -> Self {
260        assert!(unsafe { future.is_ready() });
261
262        Self(future)
263    }
264
265    /// Takes the result out of associated future data, then converts it by
266    /// the consume function registered at [`UnsafeFuture::new`], and then
267    /// returns the converted result.
268    ///
269    /// By taking `self`, it's dropped at the end of the method, then drops and
270    /// deallocates the associated future data as well.
271    ///
272    /// # Safety
273    ///
274    /// `Arg` and `CR` must be the same as the types determined on
275    /// [`UnsafeFuture::new`].
276    ///
277    /// # Examples
278    ///
279    /// See [`UnsafeFuture`] documentation.
280    pub unsafe fn consume<Arg, CR>(self, arg: Arg) -> CR {
281        unsafe { self.0.consume(arg) }
282        // `self` goes out of scope then be dropped.
283    }
284}
285
286impl Drop for ReadyFuture {
287    fn drop(&mut self) {
288        unsafe { self.0.destroy() };
289    }
290}
291
292#[derive(Debug)]
293#[repr(C)]
294struct FutureData<F, R, W, Arg, CR> {
295    /// Functions that receive a pointer to this struct as first parameter.
296    //
297    // This field must be located at the first position of this struct, So, raw
298    // pointers to this structs can be translated as `FutureVTable`s as well,
299    // in turn, clients can call to various functions in vtable just using the
300    // one pointer.
301    vtable: FutureVTable,
302
303    /// Waker that wakes up the polling thread, a.k.a. executor or runtime.
304    //
305    // This field must be located at the second position of this struct, So, raw
306    // pointers to this structs can be translated as `W`s as well,
307    waker: W,
308
309    /// Future data.
310    future: F,
311
312    /// Output of the future.
313    output: Option<R>,
314
315    /// Function consuming the output with anonymous argument.
316    consume: fn(R, Arg) -> CR,
317
318    /// Atomic variable to synchronize memory over workers.
319    sync: AtomicBool,
320
321    _pin: PhantomPinned,
322}
323
324impl<F, R, W, Arg, CR> FutureData<F, R, W, Arg, CR>
325where
326    F: Future<Output = R> + Send + 'static,
327    R: Send + 'static,
328    W: WakeSend,
329{
330    fn new(future: F, waker: W, consume: fn(R, Arg) -> CR) -> Pin<Box<Self>> {
331        // Erases type `Arg` and `CR` from `delegate_consume`, so we can hold
332        // it.
333        let delegate_consume = unsafe {
334            mem::transmute::<unsafe fn(NonNull<u8>, Arg) -> CR, unsafe fn()>(Self::delegate_consume)
335        };
336
337        // See vtable functions below.
338        let vtable = FutureVTable {
339            is_ready: Self::is_ready,
340            poll_unchecked: Self::poll_unchecked,
341            drop: Self::drop,
342            wake_send: Self::wake_send,
343            delegate_consume,
344        };
345
346        Box::pin(Self {
347            vtable,
348            waker,
349            future,
350            output: None,
351            consume,
352            sync: AtomicBool::new(false),
353            _pin: PhantomPinned,
354        })
355    }
356
357    /// * data - A pointer to [`FutureData`].
358    ///
359    /// # Safety
360    ///
361    /// - The given pointer must be a valid pointer to *pinned* [`FutureData`].
362    unsafe fn is_ready(data: NonNull<u8>) -> bool {
363        let this = unsafe { data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut() };
364        this.output.is_some()
365    }
366
367    /// * data - A pointer to [`FutureData`].
368    ///
369    /// # Safety
370    ///
371    /// - The given pointer must be a valid pointer to *pinned* [`FutureData`].
372    unsafe fn poll_unchecked(data: NonNull<u8>) -> bool {
373        let this = unsafe { data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut() };
374
375        // Synchronize memory.
376        //
377        // Future data, `FutureData`, and its handle, `UnsafeFuture` are designed
378        // to be stolen by other workers, which makes a problem in terms of
379        // synchronization.
380        // Imagine `A` polled on a future data and wrote something on it. `B`
381        // wakes `C` up and gives future handle through `WakeSend`
382        // implementation. Here's the problem. `B` and `C` may be synchronized,
383        // but `A` and `C` isn't. Therefore, `C` cannot see what `A` made on the
384        // future data.
385        // This atomic variable synchronizes memory for polling workers.
386        //
387        // Is spin lock without limit fine?
388        // Blocking here means that poll() below results in wake-poll by another
389        // worker before atomic store operation finished.
390        // Therefore, we have to wait just one atomic store operation, which
391        // will be finished quickly.
392        while this
393            .sync
394            .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
395            .is_err()
396        {
397            thread::yield_now();
398        }
399
400        let pinned_future = unsafe { Pin::new_unchecked(&mut this.future) };
401
402        // Creates `Context` from the given data pointer.
403        let data = data.as_ptr().cast_const().cast::<()>();
404        let raw_waker = RawWaker::new(data, raw_waker_vtable());
405        let waker = unsafe { Waker::from_raw(raw_waker) };
406        let mut cx = Context::from_waker(&waker);
407
408        // Polls the future and returns true if it's ready.
409        let res = if let Poll::Ready(output) = pinned_future.poll(&mut cx) {
410            this.output = Some(output);
411            true
412        } else {
413            false
414        };
415
416        // Synchronize memory.
417        this.sync.store(false, Ordering::Release);
418
419        res
420    }
421
422    /// Calls drop methods on [`FutureData`] pointed by the given data pointer,
423    /// then release the memory.
424    ///
425    /// * data - A pointer to [`FutureData`].
426    ///
427    /// # Safety
428    ///
429    /// The given pointer must be a valid pointer to [`FutureData`].
430    unsafe fn drop(data: NonNull<u8>) {
431        unsafe {
432            let this = data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut();
433            drop(Box::from_raw(this));
434        };
435    }
436
437    /// * data - A pointer to [`FutureData`].
438    ///
439    /// # Safety
440    ///
441    /// The given pointer must be a valid pointer to [`FutureData`].
442    unsafe fn wake_send(data: NonNull<u8>) {
443        let this = unsafe { data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut() };
444        this.waker.wake_send(UnsafeFuture { data })
445    }
446
447    unsafe fn delegate_consume(data: NonNull<u8>, arg: Arg) -> CR {
448        unsafe {
449            let this = data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut();
450            let output: R = this.output.take().unwrap_unchecked();
451            (this.consume)(output, arg)
452        }
453    }
454}
455
456impl<W> FutureData<(), (), W, (), ()> {
457    // Address of waker is determined by its alignment only.
458    // It doesn't depend on `F` and `R` because it is located right after
459    // `FutureVTable` which has fixed size.
460    unsafe fn waker_ptr(data: NonNull<u8>) -> NonNull<W> {
461        unsafe {
462            let this = data.cast::<FutureData<(), (), W, (), ()>>().as_mut();
463            let ptr = &mut this.waker as *mut W;
464            NonNull::new_unchecked(ptr)
465        }
466    }
467}
468
469#[derive(Debug, Clone, Copy)]
470struct FutureVTable {
471    /// A function pointer to [`FutureData::is_ready`].
472    is_ready: unsafe fn(NonNull<u8>) -> bool,
473
474    /// A function pointer to [`FutureData::poll_unchecked`].
475    poll_unchecked: unsafe fn(NonNull<u8>) -> bool,
476
477    /// A function pointer to [`FutureData::drop`].
478    drop: unsafe fn(NonNull<u8>),
479
480    /// A function pointer to [`FutureData::wake_send`].
481    wake_send: unsafe fn(NonNull<u8>),
482
483    /// A function pointer to [`FutureData::delegate_consume`].
484    //
485    // Since future return type is unknown here, this type erased function
486    // pointer must be cast with correct type like
487    // 'unsafe fn(NonNull<u8>, Arg)'.
488    delegate_consume: unsafe fn(),
489}
490
491fn raw_waker_vtable() -> &'static RawWakerVTable {
492    /// * data - A pointer to [`FutureData`].
493    unsafe fn clone(data: *const ()) -> RawWaker {
494        RawWaker::new(data, raw_waker_vtable())
495    }
496
497    /// * data - A pointer to [`FutureData`].
498    unsafe fn wake(data: *const ()) {
499        unsafe { wake_by_ref(data) }
500    }
501
502    /// * data - A pointer to [`FutureData`].
503    unsafe fn wake_by_ref(data: *const ()) {
504        unsafe {
505            let vtable = data.cast::<FutureVTable>().as_ref().unwrap_unchecked();
506            let data = NonNull::new_unchecked(data.cast::<u8>().cast_mut());
507            (vtable.wake_send)(data)
508        }
509    }
510
511    /// * data - A pointer to [`FutureData`].
512    //
513    // This is a drop function for `std::task::RawWaker`, not for `FutureData`.
514    // We're treating `UnsafeFuture` as the `RawWaker`,
515    // So we don't have to do something here.
516    unsafe fn drop(_data: *const ()) {}
517
518    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
519
520    &VTABLE
521}
522
523/// Runs the given future to completion on the current worker.
524///
525/// This blocks until the given future is complete, then returns the result of
526/// the future.
527///
528/// # Examples
529///
530/// ```
531/// use my_ecs::ds::block_on;
532/// use std::{
533///     future::Future,
534///     task::{Poll, Context},
535///     pin::Pin,
536/// };
537///
538/// struct MyFuture {
539///     count: u32,
540///     result: u32,
541/// }
542///
543/// impl Future for MyFuture {
544///     type Output = u32;
545///
546///     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
547///         let this = self.get_mut();
548///         if this.count == 0 {
549///             Poll::Ready(this.result)
550///         } else {
551///             this.count -= 1;
552///             this.result += 1;
553///             cx.waker().wake_by_ref();
554///             Poll::Pending
555///         }
556///     }
557/// }
558///
559/// let res = block_on(MyFuture { count: 2, result: 0 });
560/// assert_eq!(res, 2);
561/// ```
562pub fn block_on<F, R>(future: F) -> R
563where
564    F: Future<Output = R> + 'static,
565    R: 'static,
566{
567    let unparked = Arc::new(AtomicBool::new(false));
568    let waker = Waker {
569        th: thread::current(),
570        unparked: Arc::clone(&unparked),
571    };
572
573    // The future and its output won't be sent elsewhere.
574    let future = DoNotSend(future);
575    let future = UnsafeFuture::new(future, waker, |r, ()| r);
576
577    loop {
578        unsafe {
579            match future.poll() {
580                Poll::Ready(()) => {
581                    let ready_future = ReadyFuture::new(future);
582                    let ret: DoNotSend<R> = ready_future.consume(());
583                    return ret.0;
584                }
585                Poll::Pending => {
586                    while !unparked.load(Ordering::Relaxed) {
587                        thread::park();
588                    }
589                    unparked.store(false, Ordering::Relaxed);
590                }
591            };
592        }
593    }
594
595    // === Internal structs ===
596
597    #[derive(Clone)]
598    struct Waker {
599        th: Thread,
600        unparked: Arc<AtomicBool>,
601    }
602
603    impl WakeSend for Waker {
604        fn wake_send(&self, _handle: UnsafeFuture) {
605            self.unparked.store(true, Ordering::Relaxed);
606            self.th.unpark(); // Release in terms of Ordering.
607        }
608    }
609
610    #[repr(transparent)]
611    struct DoNotSend<T>(T);
612
613    unsafe impl<T> Send for DoNotSend<T> {}
614
615    impl<T: Future> Future for DoNotSend<T> {
616        type Output = DoNotSend<T::Output>;
617
618        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
619            // Safety: Possible thanks to repr(transparent)
620            let this: Pin<&mut T> = unsafe { mem::transmute(self) };
621            match this.poll(cx) {
622                Poll::Pending => Poll::Pending,
623                Poll::Ready(r) => Poll::Ready(DoNotSend(r)),
624            }
625        }
626    }
627}
628
629#[cfg(test)]
630mod tests {
631    #[allow(unused)]
632    use super::*;
633
634    #[cfg(not(target_arch = "wasm32"))]
635    #[test]
636    fn test_block_on() {
637        use async_io::Timer;
638        use std::{sync::Arc, thread, time::Duration};
639
640        let tid = Arc::new(thread::current().id());
641
642        // Future will be run on the same thread calling `block_on`.
643        let future = async move {
644            let cur_tid = thread::current().id();
645            assert_eq!(cur_tid, *tid);
646            Timer::after(Duration::from_millis(1)).await;
647
648            let cur_tid = thread::current().id();
649            assert_eq!(cur_tid, *tid);
650            Timer::after(Duration::from_millis(1)).await;
651
652            let cur_tid = thread::current().id();
653            assert_eq!(cur_tid, *tid);
654            42
655        };
656
657        let res = block_on(future);
658        assert_eq!(res, 42);
659    }
660}