memkit_async/
allocator.rs1use std::alloc::Layout;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
7use std::sync::Arc;
8use std::task::{Context, Poll};
9
10use crate::backpressure::MkBackpressure;
11
12#[derive(Debug, Clone)]
14pub struct MkAsyncFrameConfig {
15 pub arena_size: usize,
17 pub backpressure: MkBackpressure,
19}
20
21impl Default for MkAsyncFrameConfig {
22 fn default() -> Self {
23 Self {
24 arena_size: 16 * 1024 * 1024, backpressure: MkBackpressure::Wait,
26 }
27 }
28}
29
30pub struct MkAsyncFrameAlloc {
35 inner: Arc<AsyncAllocInner>,
36}
37
38struct AsyncAllocInner {
39 config: MkAsyncFrameConfig,
40 arena: Vec<u8>,
42 head: AtomicUsize,
44 frame: AtomicU64,
46 active_count: AtomicUsize,
48}
49
50impl MkAsyncFrameAlloc {
51 pub fn new(config: MkAsyncFrameConfig) -> Self {
53 let arena = vec![0u8; config.arena_size];
54 Self {
55 inner: Arc::new(AsyncAllocInner {
56 config,
57 arena,
58 head: AtomicUsize::new(0),
59 frame: AtomicU64::new(0),
60 active_count: AtomicUsize::new(0),
61 }),
62 }
63 }
64
65 pub async fn begin_frame(&self) -> MkAsyncFrameGuard {
67 WaitForEmpty::new(&self.inner.active_count).await;
69
70 self.inner.frame.fetch_add(1, Ordering::SeqCst);
72 self.inner.head.store(0, Ordering::SeqCst);
73
74 MkAsyncFrameGuard {
75 alloc: self.clone(),
76 }
77 }
78
79 pub fn frame(&self) -> u64 {
81 self.inner.frame.load(Ordering::SeqCst)
82 }
83
84 pub async fn alloc<T>(&self) -> Option<*mut T> {
86 let layout = Layout::new::<T>();
87 self.alloc_layout(layout).await.map(|p| p as *mut T)
88 }
89
90 pub async fn alloc_layout(&self, layout: Layout) -> Option<*mut u8> {
92 let size = layout.size();
93 let align = layout.align();
94
95 loop {
96 let current = self.inner.head.load(Ordering::Acquire);
97 let aligned = (current + align - 1) & !(align - 1);
98
99 if aligned + size > self.inner.arena.len() {
100 match self.inner.config.backpressure {
102 MkBackpressure::Fail => return None,
103 MkBackpressure::Wait => {
104 tokio_yield().await;
106 continue;
107 }
108 MkBackpressure::Timeout(duration) => {
109 let _ = duration;
111 return None;
112 }
113 MkBackpressure::Evict => {
114 return None;
116 }
117 }
118 }
119
120 match self.inner.head.compare_exchange_weak(
122 current,
123 aligned + size,
124 Ordering::AcqRel,
125 Ordering::Relaxed,
126 ) {
127 Ok(_) => {
128 self.inner.active_count.fetch_add(1, Ordering::Relaxed);
129 let ptr = self.inner.arena.as_ptr() as *mut u8;
130 return Some(unsafe { ptr.add(aligned) });
131 }
132 Err(_) => continue,
133 }
134 }
135 }
136
137 pub fn release(&self) {
139 self.inner.active_count.fetch_sub(1, Ordering::Relaxed);
140 }
141
142 pub fn used(&self) -> usize {
144 self.inner.head.load(Ordering::Relaxed)
145 }
146
147 pub fn remaining(&self) -> usize {
149 self.inner.arena.len() - self.used()
150 }
151}
152
153impl Clone for MkAsyncFrameAlloc {
154 fn clone(&self) -> Self {
155 Self {
156 inner: Arc::clone(&self.inner),
157 }
158 }
159}
160
161pub struct MkAsyncFrameGuard {
163 alloc: MkAsyncFrameAlloc,
164}
165
166impl MkAsyncFrameGuard {
167 pub fn frame(&self) -> u64 {
169 self.alloc.frame()
170 }
171}
172
173impl Drop for MkAsyncFrameGuard {
174 fn drop(&mut self) {
175 self.alloc.inner.head.store(0, Ordering::SeqCst);
177 }
178}
179
/// Future that completes once the borrowed counter is observed to be zero.
///
/// No waker is stored anywhere: while the counter is non-zero, each poll
/// immediately re-wakes its own task, so the executor keeps re-polling this
/// future (a cooperative spin) instead of sleeping until the counter changes.
#[must_use = "futures do nothing unless polled or awaited"]
struct WaitForEmpty<'a> {
    /// Counter that must reach zero for the future to resolve.
    counter: &'a AtomicUsize,
}

impl<'a> WaitForEmpty<'a> {
    /// Wraps `counter` without reading it; the first read happens on poll.
    fn new(counter: &'a AtomicUsize) -> Self {
        Self { counter }
    }
}

impl Future for WaitForEmpty<'_> {
    type Output = ();

    /// Returns `Ready` when the counter reads zero; otherwise schedules an
    /// immediate re-poll and returns `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.counter.load(Ordering::Acquire) == 0 {
            return Poll::Ready(());
        }

        // Nobody else will wake this task, so request another poll right away.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
204
/// Yields control back to the executor exactly once.
///
/// Despite the name, no runtime-specific API is used: the inner future
/// returns `Pending` on its first poll after waking its own task, and
/// `Ready` on every later poll.
async fn tokio_yield() {
    /// One-shot future backing `tokio_yield`.
    struct YieldOnce {
        /// Whether the single `Pending` has already been returned.
        yielded: bool,
    }

    impl Future for YieldOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let already_yielded = std::mem::replace(&mut self.yielded, true);
            if already_yielded {
                return Poll::Ready(());
            }

            // First poll: ask to be polled again, then hand control back.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }

    YieldOnce { yielded: false }.await
}
225
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed allocator reports frame 0 and no bytes in use.
    #[test]
    fn test_async_frame_alloc_sync() {
        let config = MkAsyncFrameConfig::default();
        let alloc = MkAsyncFrameAlloc::new(config);

        assert_eq!(alloc.frame(), 0);
        assert_eq!(alloc.used(), 0);
    }
}
236}