dyn-future 2.0.0

Convenient and fast dynamic Futures for Rust.
Documentation
use once_cell::unsync::Lazy;
use std::alloc::Layout;
use spin::Mutex;
use std::collections::VecDeque;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::ptr;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::cell::RefCell;

/// Allocator region size of 256 KiB.
///
/// Every `Region` owns a backing buffer of exactly this many bytes, and an
/// allocation is rejected when it would end past `start + REGION_SIZE`.
const REGION_SIZE: usize = 256 * 1024;

/// Try 2 regions before creating a new one.
///
/// This is also the number of regions a freshly created `Pool` starts with.
const REGION_TRIES: usize = 2;

thread_local! {
    // One allocation pool per thread, used by `DynFuture::new`.
    // `Lazy` defers building the pool (and its initial regions) until the first
    // access; `RefCell` hands out the `&mut Pool` that `Pool::allocate` requires.
    static POOL: Lazy<RefCell<Pool>> = Lazy::new(|| RefCell::new(Pool::new()));
}

/// Rounds `v` up to the nearest multiple of `a`.
///
/// Returns `v` unchanged when it is already a multiple of `a`. Works for any
/// non-zero `a`, not only powers of two.
///
/// # Panics
///
/// Panics if `a` is zero (remainder by zero).
fn align_up(v: usize, a: usize) -> usize {
    let remainder = v % a;
    if remainder == 0 {
        v
    } else {
        // Advance by exactly the distance to the next multiple. Jumping to the
        // next power of two instead would overshoot, and would never terminate
        // for a power of two that is smaller than `a`.
        v + (a - remainder)
    }
}

/// A queue of allocator regions that futures are placed into.
struct Pool {
    /// Regions available for allocation. The front of the queue is tried first;
    /// a region that rejects an allocation is rotated to the back.
    queue: VecDeque<Arc<Mutex<Region>>>,
}

impl Pool {
    /// Creates a pool pre-populated with `REGION_TRIES` empty regions.
    fn new() -> Self {
        let mut queue = VecDeque::new();
        for _ in 0..REGION_TRIES {
            queue.push_front(Arc::new(Mutex::new(Region::new())));
        }
        Self { queue }
    }

    /// Moves `f` into one of the pool's regions.
    ///
    /// Up to `REGION_TRIES` regions are tried, starting at the front of the
    /// queue. A region that accepts the future is put back at the front; one
    /// that rejects it is moved to the back. If every tried region rejects the
    /// future, a fresh region is created, used, and placed at the front.
    ///
    /// Returns a reference to the type-erased future together with a handle to
    /// the region that stores it. The caller must call `Region::deallocate` on
    /// that region once it is done with the future.
    ///
    /// # Panics
    ///
    /// Panics if the future does not fit even into a freshly created region.
    /// Retrying with further new regions could never succeed and would only
    /// allocate memory without bound.
    fn allocate<T, F: Future<Output = T> + Send + 'static>(
        &mut self,
        f: F,
    ) -> (
        &'static mut (dyn Future<Output = T> + Send),
        Arc<Mutex<Region>>,
    ) {
        let mut pending = f;

        for _ in 0..REGION_TRIES {
            let region = self
                .queue
                .pop_front()
                .expect("pool always holds at least REGION_TRIES regions");
            // The lock guard is a temporary, so it is released before the
            // region handle is moved back into the queue.
            let attempt = region.lock().allocate(pending);

            match attempt {
                Ok(slot) => {
                    self.queue.push_front(Arc::clone(&region));
                    // SAFETY: `slot` points at a future that was just written
                    // into the region's buffer. The returned `Arc` keeps that
                    // region alive for as long as the caller holds it.
                    return (unsafe { &mut *slot }, region);
                }
                Err(rejected) => {
                    self.queue.push_back(region);
                    pending = rejected;
                }
            }
        }

        // Every tried region was full: allocate from a brand-new one.
        let fresh = Arc::new(Mutex::new(Region::new()));
        let attempt = fresh.lock().allocate(pending);

        match attempt {
            Ok(slot) => {
                self.queue.push_front(Arc::clone(&fresh));
                // SAFETY: same reasoning as above; `fresh` owns the buffer.
                (unsafe { &mut *slot }, fresh)
            }
            Err(_) => panic!("future does not fit into an empty allocator region"),
        }
    }
}

/// A fixed-size buffer of `REGION_SIZE` bytes that futures are written into
/// one after another.
///
/// Space is only reclaimed in bulk: once every allocation has been released,
/// the region starts handing out memory from the beginning again.
struct Region {
    /// Address of the first byte of the backing buffer.
    start: usize,
    /// Address from which the next allocation is carved (before alignment).
    next: usize,
    /// Number of allocations handed out and not yet released.
    refs: usize,
}

impl Region {
    /// Allocates an empty region backed by `REGION_SIZE` zeroed bytes.
    fn new() -> Self {
        // Allocate straight on the heap. Building a `[u8; REGION_SIZE]` value
        // and boxing it afterwards may place the whole array on the stack first.
        let buffer: Box<[u8]> = vec![0u8; REGION_SIZE].into_boxed_slice();
        let start = Box::into_raw(buffer) as *mut u8 as usize;

        Self {
            start,
            next: start,
            refs: 0,
        }
    }

    /// Releases one allocation.
    ///
    /// When the last outstanding allocation is released, the whole buffer
    /// becomes available again.
    fn deallocate(&mut self) {
        self.refs -= 1;
        if self.refs == 0 {
            self.next = self.start;
        }
    }

    /// Moves `f` into the region and returns a type-erased pointer to it.
    ///
    /// Returns `Err(f)`, handing the future back untouched, when the remaining
    /// space cannot hold it at its required alignment.
    ///
    /// The returned pointer refers to memory owned by the region: it must not
    /// be freed by the caller, and each successful call must eventually be
    /// paired with one call to `deallocate`.
    fn allocate<T, F: Future<Output = T> + Send + 'static>(
        &mut self,
        f: F,
    ) -> Result<*mut (dyn Future<Output = T> + Send), F> {
        let layout = Layout::new::<F>();
        let aligned_next = align_up(self.next, layout.align());
        let limit = self.start + REGION_SIZE;

        // `checked_add` treats an address overflow as "does not fit".
        let end = match aligned_next.checked_add(layout.size()) {
            Some(end) if end <= limit => end,
            _ => return Err(f),
        };

        let slot = aligned_next as *mut F;
        // SAFETY: `slot` is aligned for `F`, and `slot..end` lies inside the
        // buffer owned by this region. The range starts at or after
        // `self.next`, so it does not overlap any earlier allocation.
        unsafe { ptr::write(slot, f) };

        self.refs += 1;
        self.next = end;

        // Erase the concrete type with a raw-pointer cast. The memory was never
        // allocated as a `Box<F>`, so it must not pass through `Box::from_raw`.
        Ok(slot as *mut (dyn Future<Output = T> + Send))
    }
}

impl Drop for Region {
    /// Frees the backing buffer, which would otherwise be leaked.
    fn drop(&mut self) {
        let buffer = ptr::slice_from_raw_parts_mut(self.start as *mut u8, REGION_SIZE);
        // SAFETY: `start` came from `Box::<[u8]>::into_raw` on a slice of
        // exactly `REGION_SIZE` bytes in `new`, and it is released only here.
        unsafe { drop(Box::from_raw(buffer)) };
    }
}

/// Represents an asynchronous computation.
///
/// This is different from `std::future::Future` in that it
/// is a fixed size struct holding a type-erased future. The future itself is
/// stored in an allocator region obtained from the thread-local pool.
pub struct DynFuture<T: 'static> {
    /// Pinned reference to the type-erased future inside `region`'s buffer.
    inner: Pin<&'static mut (dyn Future<Output = T> + Send)>,
    /// Handle to the region that stores `inner`; notified via
    /// `Region::deallocate` when this value is dropped.
    region: Arc<Mutex<Region>>,
}

impl<T> Drop for DynFuture<T> {
    /// Destroys the stored future and then releases its slot in the region.
    fn drop(&mut self) {
        // `inner` is only a reference, so dropping the field would never run
        // the future's destructor. Run it explicitly, or everything the future
        // owns is leaked.
        //
        // SAFETY: the future was moved into the region when this `DynFuture`
        // was created and is reachable only through `self.inner`. It is
        // dropped in place (never moved, so pinning is respected) and never
        // accessed again afterwards.
        unsafe {
            let inner: *mut (dyn Future<Output = T> + Send) =
                self.inner.as_mut().get_unchecked_mut();
            ptr::drop_in_place(inner);
        }

        // Only release the slot once the future is gone, so the region cannot
        // hand the memory out again while the destructor is still running.
        self.region.lock().deallocate();
    }
}

impl<T> DynFuture<T> {
    /// Wraps a `std::future::Future` in a `DynFuture`.
    ///
    /// The future is moved into a region taken from the calling thread's pool.
    pub fn new(f: impl Future<Output = T> + Send + 'static) -> Self {
        let (slot, region) = POOL.with(|cell| {
            let mut pool = cell.borrow_mut();
            pool.allocate(f)
        });

        // SAFETY: the future stays at the address the pool placed it at; it is
        // only ever reached through this pinned reference.
        let inner = unsafe { Pin::new_unchecked(slot) };

        Self { inner, region }
    }
}

impl<T> Future for DynFuture<T> {
    type Output = T;

    /// Forwards the poll to the type-erased future this value refers to.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.get_mut();
        let inner = this.inner.as_mut();
        inner.poll(cx)
    }
}