my_ecs/ds/fut.rs
1use std::{
2 future::Future,
3 marker::PhantomPinned,
4 mem,
5 pin::Pin,
6 ptr::NonNull,
7 sync::{
8 atomic::{AtomicBool, Ordering},
9 Arc,
10 },
11 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
12 thread::{self, Thread},
13};
14
15/// A trait to wake a worker and send [`UnsafeFuture`] to the worker.
16pub trait WakeSend: Send + Sync + Clone + 'static {
17 /// Wakes associated worker and send [`UnsafeFuture`] to the worker.
18 fn wake_send(&self, handle: UnsafeFuture);
19}
20
21/// A handle to a future data.
22///
23/// Name contains `future`, but this struct doesn't implement [`Future`] trait.
24/// It provides you poll function instead. You can call poll function on a
25/// handle and get the result if the `FutureData` is ready.
26///
27/// Plus, this is actually a type-erased pointer to a `FutureData` so that
28/// owners must deal with the pointer carefully. See the example below to get a
29/// feel for how to use the struct.
30///
31/// # Examples
32///
33/// ```
34/// use my_ecs::ds::{WakeSend, UnsafeFuture, ReadyFuture};
35/// use std::{
36/// future::Future,
37/// task::{Poll, Context},
38/// sync::mpsc::{self, Sender},
39/// pin::Pin,
40/// };
41///
42/// #[derive(Clone)]
43/// struct MyWaker(Sender<UnsafeFuture>);
44///
45/// impl WakeSend for MyWaker {
46/// fn wake_send(&self, handle: UnsafeFuture) {
47/// self.0.send(handle).unwrap();
48/// }
49/// }
50///
51/// struct MyFuture(u32);
52///
53/// impl Future for MyFuture {
54/// type Output = u32;
55///
56/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
57/// let this = self.get_mut();
58/// if this.0 == 0 {
59/// Poll::Ready(10)
60/// } else {
61/// this.0 -= 1;
62/// cx.waker().wake_by_ref();
63/// Poll::Pending
64/// }
65/// }
66/// }
67///
68/// let (tx, rx) = mpsc::channel();
69/// let fut = MyFuture(2);
70/// let waker = MyWaker(tx);
71/// let consume = |ret: u32, arg: u32| ret + arg;
72/// let mut u_fut = UnsafeFuture::new(fut, waker, consume);
73///
74/// unsafe {
75/// let mut pending = 0;
76/// while u_fut.poll() == Poll::Pending {
77/// u_fut = rx.recv().unwrap();
78/// pending += 1;
79/// }
80/// let r_fut = ReadyFuture::new(u_fut);
81/// let res: u32 = r_fut.consume(1);
82///
83/// assert_eq!(pending, 2);
84/// assert_eq!(res, consume(10, 1));
85/// }
86/// ```
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
88#[repr(transparent)]
89pub struct UnsafeFuture {
90 /// Type-erased pointer to a future data.
91 ///
92 /// This pointer is something like 'data' field of [`RawWaker`].
93 data: NonNull<u8>,
94}
95
96unsafe impl Send for UnsafeFuture {}
97
98impl UnsafeFuture {
99 /// Creates a future data in heap memory and returns its handle.
100 ///
101 /// # Leaks
102 ///
103 /// There will be memory leak if caller doesn't deallocate the future data.
104 /// Future data can be deallocated by
105 /// - Calling [`UnsafeFuture::destroy`].
106 /// - Turning [`UnsafeFuture`] into [`ReadyFuture`] then dropping it.
107 /// `ReadyFuture` calls [`UnsafeFuture::destroy`] when it's dropped.
108 ///
109 /// # Examples
110 ///
111 /// See [`UnsafeFuture`] documentation.
112 pub fn new<F, R, W, Arg, CR>(future: F, waker: W, consume: fn(R, Arg) -> CR) -> Self
113 where
114 F: Future<Output = R> + Send + 'static,
115 R: Send + 'static,
116 W: WakeSend,
117 {
118 let pinned = FutureData::new(future, waker, consume);
119
120 // `FutureData` is created wrapped in `Pin<Box<T>>` because it's another type of future.
121 // But this struct will manage it through its pointer, so let's unwrap it.
122 //
123 // # Safety: We won't move the `FutureData` in this struct.
124 let data = unsafe {
125 let boxed = Pin::into_inner_unchecked(pinned);
126 NonNull::new_unchecked(Box::into_raw(boxed)).cast::<u8>()
127 };
128
129 Self { data }
130 }
131
132 /// Drops and deallocates associated future data.
133 ///
134 /// You may need this method when you have to cancel out not ready futures.
135 ///
136 /// # Safety
137 ///
138 /// This method must be called only once for the same handles.
139 pub unsafe fn destroy(self) {
140 // Safety
141 // - `self.data` is a valid pointer to a `FutureData`.
142 // - drop method will be called only once.
143 unsafe {
144 let vtable = self.data.cast::<FutureVTable>().as_mut();
145 (vtable.drop)(self.data)
146 }
147 }
148
149 /// Returns true if associated future data is ready.
150 ///
151 /// # Safety
152 ///
153 /// Undefined behavior if associated `FutureData` has been dropped.
154 pub unsafe fn is_ready(&self) -> bool {
155 unsafe {
156 let vtable = self.data.cast::<FutureVTable>().as_mut();
157 (vtable.is_ready)(self.data)
158 }
159 }
160
161 /// Tries to make more progress on the associated future data.
162 ///
163 /// Returning value [`Poll::Ready`] means the `FutureData` is completely
164 /// resolved and ready to provide its output. [`Poll::Pending`], on the
165 /// other hand, means the `FutureData` is not yet ready and will wake async
166 /// runtime via the waker you inserted at [`UnsafeFuture::new`] when it can
167 /// make more progress.
168 ///
169 /// # Safety
170 ///
171 /// Associated future data must be alive, not have been dropped.
172 ///
173 /// # Examples
174 ///
175 /// See [`UnsafeFuture`] documentation.
176 pub unsafe fn poll(self) -> Poll<()> {
177 unsafe {
178 let vtable = self.data.cast::<FutureVTable>().as_mut();
179 if (vtable.poll_unchecked)(self.data) {
180 Poll::Ready(())
181 } else {
182 Poll::Pending
183 }
184 }
185 }
186
187 /// Returns true if the given waker is the same as the type you inserted at
188 /// [`UnsafeFuture::new`].
189 ///
190 /// # Safety
191 ///
192 /// Waker type `W` must be the same as the type you inserted at
193 /// [`UnsafeFuture::new`].
194 pub unsafe fn will_wake<W>(self, other: &W) -> bool
195 where
196 W: WakeSend + PartialEq,
197 {
198 unsafe {
199 let waker_ptr = FutureData::<(), (), W, (), ()>::waker_ptr(self.data);
200 waker_ptr.as_ref() == other
201 }
202 }
203
204 /// Sets a new waker to the associated future data.
205 ///
206 /// # Safety
207 ///
208 /// Waker type `W` must be the same as the type you inserted at
209 /// [`UnsafeFuture::new`].
210 pub unsafe fn set_waker<W>(self, waker: W) -> W
211 where
212 W: WakeSend,
213 {
214 let old = unsafe { FutureData::<(), (), W, (), ()>::waker_ptr(self.data).as_mut() };
215 mem::replace(old, waker)
216 }
217
218 /// # Safety
219 ///
220 /// Argument types `Arg` and `CR` must be the same as the types determined
221 /// on [`UnsafeFuture::new`].
222 unsafe fn consume<Arg, CR>(self, arg: Arg) -> CR {
223 unsafe {
224 let vtable = self.data.cast::<FutureVTable>().as_mut();
225 let delegate_consume = mem::transmute::<unsafe fn(), unsafe fn(NonNull<u8>, Arg) -> CR>(
226 vtable.delegate_consume,
227 );
228 delegate_consume(self.data, arg)
229 }
230 }
231}
232
233/// A handle to a *ready* future data.
234///
235/// The struct can be created from ready [`UnsafeFuture`] only, and it doesn't
236/// provide methods such as poll except [`ReadyFuture::consume`]. You can get
237/// the result from the ready `FutureData` through the consume method, then
238/// associated `FutureData` will be dropped and deallocated.
239///
240/// See [`UnsafeFuture`] documentation to see how this struct is used.
241#[derive(Debug)]
242#[repr(transparent)]
243pub struct ReadyFuture(UnsafeFuture);
244
245impl ReadyFuture {
246 /// Creates a new [`ReadyFuture`] from the given ready [`UnsafeFuture`].
247 ///
248 /// # Panics
249 ///
250 /// Panics if associated future data is not ready.
251 ///
252 /// # Safety
253 ///
254 /// Undefined behavior if associated `FutureData` is not alive.
255 ///
256 /// # Examples
257 ///
258 /// See [`UnsafeFuture`] documentation.
259 pub unsafe fn new(future: UnsafeFuture) -> Self {
260 assert!(unsafe { future.is_ready() });
261
262 Self(future)
263 }
264
265 /// Takes the result out of associated future data, then converts it by
266 /// the consume function registered at [`UnsafeFuture::new`], and then
267 /// returns the converted result.
268 ///
269 /// By taking `self`, it's dropped at the end of the method, then drops and
270 /// deallocates the associated future data as well.
271 ///
272 /// # Safety
273 ///
274 /// `Arg` and `CR` must be the same as the types determined on
275 /// [`UnsafeFuture::new`].
276 ///
277 /// # Examples
278 ///
279 /// See [`UnsafeFuture`] documentation.
280 pub unsafe fn consume<Arg, CR>(self, arg: Arg) -> CR {
281 unsafe { self.0.consume(arg) }
282 // `self` goes out of scope then be dropped.
283 }
284}
285
286impl Drop for ReadyFuture {
287 fn drop(&mut self) {
288 unsafe { self.0.destroy() };
289 }
290}
291
292#[derive(Debug)]
293#[repr(C)]
294struct FutureData<F, R, W, Arg, CR> {
295 /// Functions that receive a pointer to this struct as first parameter.
296 //
297 // This field must be located at the first position of this struct, So, raw
298 // pointers to this structs can be translated as `FutureVTable`s as well,
299 // in turn, clients can call to various functions in vtable just using the
300 // one pointer.
301 vtable: FutureVTable,
302
303 /// Waker that wakes up the polling thread, a.k.a. executor or runtime.
304 //
305 // This field must be located at the second position of this struct, So, raw
306 // pointers to this structs can be translated as `W`s as well,
307 waker: W,
308
309 /// Future data.
310 future: F,
311
312 /// Output of the future.
313 output: Option<R>,
314
315 /// Function consuming the output with anonymous argument.
316 consume: fn(R, Arg) -> CR,
317
318 /// Atomic variable to synchronize memory over workers.
319 sync: AtomicBool,
320
321 _pin: PhantomPinned,
322}
323
324impl<F, R, W, Arg, CR> FutureData<F, R, W, Arg, CR>
325where
326 F: Future<Output = R> + Send + 'static,
327 R: Send + 'static,
328 W: WakeSend,
329{
330 fn new(future: F, waker: W, consume: fn(R, Arg) -> CR) -> Pin<Box<Self>> {
331 // Erases type `Arg` and `CR` from `delegate_consume`, so we can hold
332 // it.
333 let delegate_consume = unsafe {
334 mem::transmute::<unsafe fn(NonNull<u8>, Arg) -> CR, unsafe fn()>(Self::delegate_consume)
335 };
336
337 // See vtable functions below.
338 let vtable = FutureVTable {
339 is_ready: Self::is_ready,
340 poll_unchecked: Self::poll_unchecked,
341 drop: Self::drop,
342 wake_send: Self::wake_send,
343 delegate_consume,
344 };
345
346 Box::pin(Self {
347 vtable,
348 waker,
349 future,
350 output: None,
351 consume,
352 sync: AtomicBool::new(false),
353 _pin: PhantomPinned,
354 })
355 }
356
357 /// * data - A pointer to [`FutureData`].
358 ///
359 /// # Safety
360 ///
361 /// - The given pointer must be a valid pointer to *pinned* [`FutureData`].
362 unsafe fn is_ready(data: NonNull<u8>) -> bool {
363 let this = unsafe { data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut() };
364 this.output.is_some()
365 }
366
367 /// * data - A pointer to [`FutureData`].
368 ///
369 /// # Safety
370 ///
371 /// - The given pointer must be a valid pointer to *pinned* [`FutureData`].
372 unsafe fn poll_unchecked(data: NonNull<u8>) -> bool {
373 let this = unsafe { data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut() };
374
375 // Synchronize memory.
376 //
377 // Future data, `FutureData`, and its handle, `UnsafeFuture` are designed
378 // to be stolen by other workers, which makes a problem in terms of
379 // synchronization.
380 // Imagine `A` polled on a future data and wrote something on it. `B`
381 // wakes `C` up and gives future handle through `WakeSend`
382 // implementation. Here's the problem. `B` and `C` may be synchronized,
383 // but `A` and `C` isn't. Therefore, `C` cannot see what `A` made on the
384 // future data.
385 // This atomic variable synchronizes memory for polling workers.
386 //
387 // Is spin lock without limit fine?
388 // Blocking here means that poll() below results in wake-poll by another
389 // worker before atomic store operation finished.
390 // Therefore, we have to wait just one atomic store operation, which
391 // will be finished quickly.
392 while this
393 .sync
394 .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
395 .is_err()
396 {
397 thread::yield_now();
398 }
399
400 let pinned_future = unsafe { Pin::new_unchecked(&mut this.future) };
401
402 // Creates `Context` from the given data pointer.
403 let data = data.as_ptr().cast_const().cast::<()>();
404 let raw_waker = RawWaker::new(data, raw_waker_vtable());
405 let waker = unsafe { Waker::from_raw(raw_waker) };
406 let mut cx = Context::from_waker(&waker);
407
408 // Polls the future and returns true if it's ready.
409 let res = if let Poll::Ready(output) = pinned_future.poll(&mut cx) {
410 this.output = Some(output);
411 true
412 } else {
413 false
414 };
415
416 // Synchronize memory.
417 this.sync.store(false, Ordering::Release);
418
419 res
420 }
421
422 /// Calls drop methods on [`FutureData`] pointed by the given data pointer,
423 /// then release the memory.
424 ///
425 /// * data - A pointer to [`FutureData`].
426 ///
427 /// # Safety
428 ///
429 /// The given pointer must be a valid pointer to [`FutureData`].
430 unsafe fn drop(data: NonNull<u8>) {
431 unsafe {
432 let this = data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut();
433 drop(Box::from_raw(this));
434 };
435 }
436
437 /// * data - A pointer to [`FutureData`].
438 ///
439 /// # Safety
440 ///
441 /// The given pointer must be a valid pointer to [`FutureData`].
442 unsafe fn wake_send(data: NonNull<u8>) {
443 let this = unsafe { data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut() };
444 this.waker.wake_send(UnsafeFuture { data })
445 }
446
447 unsafe fn delegate_consume(data: NonNull<u8>, arg: Arg) -> CR {
448 unsafe {
449 let this = data.cast::<FutureData<F, R, W, Arg, CR>>().as_mut();
450 let output: R = this.output.take().unwrap_unchecked();
451 (this.consume)(output, arg)
452 }
453 }
454}
455
456impl<W> FutureData<(), (), W, (), ()> {
457 // Address of waker is determined by its alignment only.
458 // It doesn't depend on `F` and `R` because it is located right after
459 // `FutureVTable` which has fixed size.
460 unsafe fn waker_ptr(data: NonNull<u8>) -> NonNull<W> {
461 unsafe {
462 let this = data.cast::<FutureData<(), (), W, (), ()>>().as_mut();
463 let ptr = &mut this.waker as *mut W;
464 NonNull::new_unchecked(ptr)
465 }
466 }
467}
468
469#[derive(Debug, Clone, Copy)]
470struct FutureVTable {
471 /// A function pointer to [`FutureData::is_ready`].
472 is_ready: unsafe fn(NonNull<u8>) -> bool,
473
474 /// A function pointer to [`FutureData::poll_unchecked`].
475 poll_unchecked: unsafe fn(NonNull<u8>) -> bool,
476
477 /// A function pointer to [`FutureData::drop`].
478 drop: unsafe fn(NonNull<u8>),
479
480 /// A function pointer to [`FutureData::wake_send`].
481 wake_send: unsafe fn(NonNull<u8>),
482
483 /// A function pointer to [`FutureData::delegate_consume`].
484 //
485 // Since future return type is unknown here, this type erased function
486 // pointer must be cast with correct type like
487 // 'unsafe fn(NonNull<u8>, Arg)'.
488 delegate_consume: unsafe fn(),
489}
490
491fn raw_waker_vtable() -> &'static RawWakerVTable {
492 /// * data - A pointer to [`FutureData`].
493 unsafe fn clone(data: *const ()) -> RawWaker {
494 RawWaker::new(data, raw_waker_vtable())
495 }
496
497 /// * data - A pointer to [`FutureData`].
498 unsafe fn wake(data: *const ()) {
499 unsafe { wake_by_ref(data) }
500 }
501
502 /// * data - A pointer to [`FutureData`].
503 unsafe fn wake_by_ref(data: *const ()) {
504 unsafe {
505 let vtable = data.cast::<FutureVTable>().as_ref().unwrap_unchecked();
506 let data = NonNull::new_unchecked(data.cast::<u8>().cast_mut());
507 (vtable.wake_send)(data)
508 }
509 }
510
511 /// * data - A pointer to [`FutureData`].
512 //
513 // This is a drop function for `std::task::RawWaker`, not for `FutureData`.
514 // We're treating `UnsafeFuture` as the `RawWaker`,
515 // So we don't have to do something here.
516 unsafe fn drop(_data: *const ()) {}
517
518 static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, wake, wake_by_ref, drop);
519
520 &VTABLE
521}
522
523/// Runs the given future to completion on the current worker.
524///
525/// This blocks until the given future is complete, then returns the result of
526/// the future.
527///
528/// # Examples
529///
530/// ```
531/// use my_ecs::ds::block_on;
532/// use std::{
533/// future::Future,
534/// task::{Poll, Context},
535/// pin::Pin,
536/// };
537///
538/// struct MyFuture {
539/// count: u32,
540/// result: u32,
541/// }
542///
543/// impl Future for MyFuture {
544/// type Output = u32;
545///
546/// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
547/// let this = self.get_mut();
548/// if this.count == 0 {
549/// Poll::Ready(this.result)
550/// } else {
551/// this.count -= 1;
552/// this.result += 1;
553/// cx.waker().wake_by_ref();
554/// Poll::Pending
555/// }
556/// }
557/// }
558///
559/// let res = block_on(MyFuture { count: 2, result: 0 });
560/// assert_eq!(res, 2);
561/// ```
562pub fn block_on<F, R>(future: F) -> R
563where
564 F: Future<Output = R> + 'static,
565 R: 'static,
566{
567 let unparked = Arc::new(AtomicBool::new(false));
568 let waker = Waker {
569 th: thread::current(),
570 unparked: Arc::clone(&unparked),
571 };
572
573 // The future and its output won't be sent elsewhere.
574 let future = DoNotSend(future);
575 let future = UnsafeFuture::new(future, waker, |r, ()| r);
576
577 loop {
578 unsafe {
579 match future.poll() {
580 Poll::Ready(()) => {
581 let ready_future = ReadyFuture::new(future);
582 let ret: DoNotSend<R> = ready_future.consume(());
583 return ret.0;
584 }
585 Poll::Pending => {
586 while !unparked.load(Ordering::Relaxed) {
587 thread::park();
588 }
589 unparked.store(false, Ordering::Relaxed);
590 }
591 };
592 }
593 }
594
595 // === Internal structs ===
596
597 #[derive(Clone)]
598 struct Waker {
599 th: Thread,
600 unparked: Arc<AtomicBool>,
601 }
602
603 impl WakeSend for Waker {
604 fn wake_send(&self, _handle: UnsafeFuture) {
605 self.unparked.store(true, Ordering::Relaxed);
606 self.th.unpark(); // Release in terms of Ordering.
607 }
608 }
609
610 #[repr(transparent)]
611 struct DoNotSend<T>(T);
612
613 unsafe impl<T> Send for DoNotSend<T> {}
614
615 impl<T: Future> Future for DoNotSend<T> {
616 type Output = DoNotSend<T::Output>;
617
618 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
619 // Safety: Possible thanks to repr(transparent)
620 let this: Pin<&mut T> = unsafe { mem::transmute(self) };
621 match this.poll(cx) {
622 Poll::Pending => Poll::Pending,
623 Poll::Ready(r) => Poll::Ready(DoNotSend(r)),
624 }
625 }
626 }
627}
628
629#[cfg(test)]
630mod tests {
631 #[allow(unused)]
632 use super::*;
633
634 #[cfg(not(target_arch = "wasm32"))]
635 #[test]
636 fn test_block_on() {
637 use async_io::Timer;
638 use std::{sync::Arc, thread, time::Duration};
639
640 let tid = Arc::new(thread::current().id());
641
642 // Future will be run on the same thread calling `block_on`.
643 let future = async move {
644 let cur_tid = thread::current().id();
645 assert_eq!(cur_tid, *tid);
646 Timer::after(Duration::from_millis(1)).await;
647
648 let cur_tid = thread::current().id();
649 assert_eq!(cur_tid, *tid);
650 Timer::after(Duration::from_millis(1)).await;
651
652 let cur_tid = thread::current().id();
653 assert_eq!(cur_tid, *tid);
654 42
655 };
656
657 let res = block_on(future);
658 assert_eq!(res, 42);
659 }
660}