dyn-future 3.0.4

Convenient and fast dynamic Futures for Rust.
Documentation
use once_cell::unsync::Lazy;
use spin::Mutex;
use std::alloc::Layout;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::task::{Context, Poll};

/// Allocator region size of 256 KiB.
const REGION_SIZE: usize = 256 * 1024;

/// Try 2 regions before creating a new one.
const REGION_TRIES: usize = 2;

thread_local! {
    static POOL: Lazy<RefCell<Pool>> = Lazy::new(|| RefCell::new(Pool::new()));
}

fn align_up(mut v: usize, a: usize) -> usize {
    debug_assert!(a.is_power_of_two());

    let d = a - 1;
    while v & d != 0 {
        v = v.next_power_of_two();
    }

    v
}

struct Pool {
    queue: VecDeque<Arc<Mutex<Region>>>,
}

impl Pool {
    fn new() -> Self {
        let mut queue = VecDeque::new();
        for _ in 0..REGION_TRIES {
            queue.push_front(Arc::new(Mutex::new(Region::new())));
        }
        Self { queue }
    }

    fn allocate<T, F: Future<Output = T> + Send + 'static>(
        &mut self,
        f: F,
    ) -> (
        &'static mut (dyn Future<Output = T> + Send),
        Arc<Mutex<Region>>,
    ) {
        let mut f = Some(f);

        for _ in 0..REGION_TRIES {
            let region = self.queue.pop_front().unwrap();
            let mut r = region.lock();

            match r.allocate(f.take().unwrap()) {
                Ok(ptr) => {
                    drop(r);
                    self.queue.push_front(region.clone());
                    return (unsafe { mem::transmute(ptr) }, region);
                }
                Err(v) => {
                    drop(r);
                    self.queue.push_back(region);
                    f = Some(v);
                }
            }
        }

        self.queue.push_front(Arc::new(Mutex::new(Region::new())));
        self.allocate(f.unwrap())
    }
}

struct Region {
    start: usize,
    next: usize,
    refs: usize,
}

impl Region {
    fn new() -> Self {
        let start: Box<[u8; REGION_SIZE]> = Box::new([0; REGION_SIZE]);
        let start = Box::into_raw(start) as usize;

        Self {
            start,
            next: start,
            refs: 0,
        }
    }

    fn deallocate(&mut self) {
        self.refs -= 1;
        if self.refs == 0 {
            self.next = self.start;
        }
    }

    fn allocate<T, F: Future<Output = T> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Result<*mut (dyn Future<Output = T> + Send), F> {
        let layout = Layout::for_value(&f);
        let aligned_next = align_up(self.next, layout.align());
        let potential_end = aligned_next + layout.size();
        if potential_end > self.start + REGION_SIZE {
            return Err(f);
        }
        unsafe {
            self.refs += 1;
            self.next = aligned_next + layout.size();
            ptr::write(aligned_next as *mut F, f);
            let faked_box = Box::from_raw(aligned_next as *mut F);
            let faked_box = faked_box as Box<dyn Future<Output = T> + Send>;
            Ok(Box::into_raw(faked_box))
        }
    }
}

/// Represents an asyncronous computation.
///
/// This is different from `std::future::Future` in that it
/// is a fixed size struct with a boxed future inside.
///
/// Using this is advantageous over `Box<dyn Future<Output = T>>` due to less overhead.
#[must_use = "futures do nothing unless polled"]
pub struct DynFuture<T: 'static> {
    inner: Pin<*mut (dyn Future<Output = T> + Send)>,
    region: Arc<Mutex<Region>>,
}

unsafe impl<T> Send for DynFuture<T> {}

impl<T> Unpin for DynFuture<T> {}

impl<T> Deref for DynFuture<T> {
    type Target = dyn Future<Output = T> + Send;

    #[inline]
    fn deref(&self) -> &Self::Target {
        unsafe { mem::transmute(self.inner) }
    }
}

impl<T> DerefMut for DynFuture<T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        unsafe { mem::transmute(self.inner) }
    }
}

impl<T> Drop for DynFuture<T> {
    #[inline]
    fn drop(&mut self) {
        let ptr = self.deref_mut();
        let (data, fdrop): (&mut u8, &fn(&mut u8)) = unsafe { mem::transmute(ptr) };
        fdrop(data);

        self.region.lock().deallocate();
    }
}

impl<T> DynFuture<T> {
    /// Creates a new `DynFuture` from a `std::future::Future`.
    ///
    /// This method may but rarely calls the global allocator.
    /// Almost all allocations occur via a fast path using a bump allocator.
    #[inline]
    pub fn new(f: impl Future<Output = T> + Send + 'static) -> Pin<Self> {
        let (inner, region) = POOL.with(|pool| pool.borrow_mut().allocate(f));
        unsafe {
            Pin::new_unchecked({
                Self {
                    inner: mem::transmute(inner),
                    region,
                }
            })
        }
    }

    /// Converts a `DynFuture<T>` into a `Pin<Box<dyn Future<Output = T> + Send>>`.
    #[inline]
    pub fn into_boxed(self) -> Pin<Box<dyn Future<Output = T> + Send>> {
        Box::pin(self)
    }

    /// Converts a `Pin<Box<dyn Future<Output = T> + Send>>` into a `DynFuture<T>`.
    #[inline]
    pub fn from_boxed(boxed: Pin<Box<dyn Future<Output = T> + Send>>) -> Pin<Self> {
        Self::new(boxed)
    }
}

impl<T> Future for DynFuture<T> {
    type Output = T;

    #[inline]
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let inner: Pin<&mut (dyn Future<Output = T> + Send)> =
            unsafe { mem::transmute(self.inner) };
        inner.poll(cx)
    }
}