dyn_future/
lib.rs

1use once_cell::unsync::Lazy;
2use spin::Mutex;
3use std::alloc::Layout;
4use std::cell::RefCell;
5use std::collections::VecDeque;
6use std::future::Future;
7use std::mem;
8use std::ops::{Deref, DerefMut};
9use std::pin::Pin;
10use std::ptr;
11use std::sync::Arc;
12use std::task::{Context, Poll};
13
/// Size in bytes of one allocator region (256 KiB).
const REGION_SIZE: usize = 256 << 10;

/// Number of queued regions an allocation attempts before a fresh region is created.
const REGION_TRIES: usize = 2;
19
thread_local! {
    /// Per-thread region pool that `DynFuture::new` allocates from.
    ///
    /// NOTE(review): `thread_local!` statics are already initialized lazily on first
    /// access, so the extra `Lazy` wrapper looks redundant — confirm before removing.
    static POOL: Lazy<RefCell<Pool>> = Lazy::new(|| RefCell::new(Pool::new()));
}
23
/// Rounds `v` up to the nearest multiple of `a`.
///
/// `a` must be a power of two (checked in debug builds). A value that is already
/// a multiple of `a` is returned unchanged.
fn align_up(v: usize, a: usize) -> usize {
    debug_assert!(a.is_power_of_two());

    // For a power of two, `a - 1` masks exactly the low bits that must be zero in an
    // aligned value. Adding the mask and then clearing those bits rounds up to the
    // next multiple of `a` (rather than to the next power of two).
    let mask = a - 1;
    (v + mask) & !mask
}
34
35struct Pool {
36    queue: VecDeque<Arc<Mutex<Region>>>,
37}
38
39impl Pool {
40    fn new() -> Self {
41        let mut queue = VecDeque::new();
42        for _ in 0..REGION_TRIES {
43            queue.push_front(Arc::new(Mutex::new(Region::new())));
44        }
45        Self { queue }
46    }
47
48    fn allocate<T, F: Future<Output = T> + Send + 'static>(
49        &mut self,
50        f: F,
51    ) -> (
52        &'static mut (dyn Future<Output = T> + Send),
53        Arc<Mutex<Region>>,
54    ) {
55        let mut f = Some(f);
56
57        for _ in 0..REGION_TRIES {
58            let region = self.queue.pop_front().unwrap();
59            let mut r = region.lock();
60
61            match r.allocate(f.take().unwrap()) {
62                Ok(ptr) => {
63                    drop(r);
64                    self.queue.push_front(region.clone());
65                    return (unsafe { mem::transmute(ptr) }, region);
66                }
67                Err(v) => {
68                    drop(r);
69                    self.queue.push_back(region);
70                    f = Some(v);
71                }
72            }
73        }
74
75        self.queue.push_front(Arc::new(Mutex::new(Region::new())));
76        self.allocate(f.unwrap())
77    }
78}
79
80struct Region {
81    start: usize,
82    next: usize,
83    refs: usize,
84}
85
86impl Region {
87    fn new() -> Self {
88        let start: Box<[u8; REGION_SIZE]> = Box::new([0; REGION_SIZE]);
89        let start = Box::into_raw(start) as usize;
90
91        Self {
92            start,
93            next: start,
94            refs: 0,
95        }
96    }
97
98    fn deallocate(&mut self) {
99        self.refs -= 1;
100        if self.refs == 0 {
101            self.next = self.start;
102        }
103    }
104
105    fn allocate<T, F: Future<Output = T> + Send + 'static>(
106        &mut self,
107        f: F,
108    ) -> Result<*mut (dyn Future<Output = T> + Send), F> {
109        let layout = Layout::for_value(&f);
110        let aligned_next = align_up(self.next, layout.align());
111        let potential_end = aligned_next + layout.size();
112        if potential_end > self.start + REGION_SIZE {
113            return Err(f);
114        }
115        unsafe {
116            self.refs += 1;
117            self.next = aligned_next + layout.size();
118            ptr::write(aligned_next as *mut F, f);
119            let faked_box = Box::from_raw(aligned_next as *mut F);
120            let faked_box = faked_box as Box<dyn Future<Output = T> + Send>;
121            Ok(Box::into_raw(faked_box))
122        }
123    }
124}
125
/// Represents an asynchronous computation.
///
/// This is different from `std::future::Future` in that it
/// is a fixed size struct with a boxed future inside.
///
/// Using this is advantageous over `Box<dyn Future<Output = T>>` due to less overhead.
#[must_use = "futures do nothing unless polled"]
pub struct DynFuture<T: 'static> {
    // Type-erased pointer to the future that was moved into `region`.
    inner: Pin<*mut (dyn Future<Output = T> + Send)>,
    // Region holding the future; its `deallocate` is called when this value is dropped.
    region: Arc<Mutex<Region>>,
}
137
// SAFETY: `inner` is a raw pointer, so `Send` is not derived automatically. The pointee
// type is `dyn Future + Send`, and `DynFuture::new` only accepts `Send` futures.
unsafe impl<T> Send for DynFuture<T> {}

// The struct stores only a pointer and an `Arc`, so moving a `DynFuture` does not move
// the future it points at.
impl<T> Unpin for DynFuture<T> {}
141
142impl<T> Deref for DynFuture<T> {
143    type Target = dyn Future<Output = T> + Send;
144
145    #[inline]
146    fn deref(&self) -> &Self::Target {
147        unsafe { mem::transmute(self.inner) }
148    }
149}
150
151impl<T> DerefMut for DynFuture<T> {
152    #[inline]
153    fn deref_mut(&mut self) -> &mut Self::Target {
154        unsafe { mem::transmute(self.inner) }
155    }
156}
157
158impl<T> Drop for DynFuture<T> {
159    #[inline]
160    fn drop(&mut self) {
161        let ptr = self.deref_mut();
162        let (data, fdrop): (&mut u8, &fn(&mut u8)) = unsafe { mem::transmute(ptr) };
163        fdrop(data);
164
165        self.region.lock().deallocate();
166    }
167}
168
169impl<T> DynFuture<T> {
170    /// Creates a new `DynFuture` from a `std::future::Future`.
171    ///
172    /// This method may but rarely calls the global allocator.
173    /// Almost all allocations occur via a fast path using a bump allocator.
174    #[inline]
175    pub fn new(f: impl Future<Output = T> + Send + 'static) -> Pin<Self> {
176        let (inner, region) = POOL.with(|pool| pool.borrow_mut().allocate(f));
177        unsafe {
178            Pin::new_unchecked({
179                Self {
180                    inner: mem::transmute(inner),
181                    region,
182                }
183            })
184        }
185    }
186
187    /// Converts a `DynFuture<T>` into a `Pin<Box<dyn Future<Output = T> + Send>>`.
188    #[inline]
189    pub fn into_boxed(self) -> Pin<Box<dyn Future<Output = T> + Send>> {
190        Box::pin(self)
191    }
192
193    /// Converts a `Pin<Box<dyn Future<Output = T> + Send>>` into a `DynFuture<T>`.
194    #[inline]
195    pub fn from_boxed(boxed: Pin<Box<dyn Future<Output = T> + Send>>) -> Pin<Self> {
196        Self::new(boxed)
197    }
198}
199
200impl<T> Future for DynFuture<T> {
201    type Output = T;
202
203    #[inline]
204    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
205        let inner: Pin<&mut (dyn Future<Output = T> + Send)> =
206            unsafe { mem::transmute(self.inner) };
207        inner.poll(cx)
208    }
209}