memkit_async/allocator.rs

//! Async-aware frame allocator.

use std::alloc::Layout;
use std::cell::UnsafeCell;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::backpressure::MkBackpressure;

/// Configuration for async frame allocator.
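///
/// # Example
///
/// A sketch of a non-default configuration (values are illustrative; the
/// doctest is ignored because the crate context is not shown here):
///
/// ```ignore
/// let config = MkAsyncFrameConfig {
///     arena_size: 4 * 1024 * 1024,        // 4 MiB arena
///     backpressure: MkBackpressure::Fail, // fail fast instead of waiting
/// };
/// let alloc = MkAsyncFrameAlloc::new(config);
/// ```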
#[derive(Debug, Clone)]
pub struct MkAsyncFrameConfig {
    /// Size of each frame arena.
    pub arena_size: usize,
    /// Backpressure policy when memory is exhausted.
    pub backpressure: MkBackpressure,
}

impl Default for MkAsyncFrameConfig {
    fn default() -> Self {
        Self {
            arena_size: 16 * 1024 * 1024, // 16 MiB
            backpressure: MkBackpressure::Wait,
        }
    }
}

/// Async-aware frame allocator.
///
/// Tracks allocations per-task rather than per-thread,
/// making it safe to use across `.await` points.
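///
/// # Example
///
/// A minimal usage sketch (illustrative; the doctest is ignored because it
/// assumes an async runtime driving the futures):
///
/// ```ignore
/// let alloc = MkAsyncFrameAlloc::new(MkAsyncFrameConfig::default());
/// let _frame = alloc.begin_frame().await;
/// if let Some(ptr) = alloc.alloc::<u64>().await {
///     unsafe { ptr.write(7) };
///     alloc.release(); // pair every successful alloc with a release
/// }
/// ```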
pub struct MkAsyncFrameAlloc {
    inner: Arc<AsyncAllocInner>,
}

struct AsyncAllocInner {
    config: MkAsyncFrameConfig,
    /// Memory arena. `UnsafeCell` permits writes through the handed-out
    /// pointers; a plain `Vec<u8>` would require mutating through a
    /// pointer derived from a shared reference, which is unsound.
    arena: Box<[UnsafeCell<u8>]>,
    /// Current allocation head (bump offset in bytes).
    head: AtomicUsize,
    /// Frame counter.
    frame: AtomicU64,
    /// Active allocation count (prevents reset while in use).
    active_count: AtomicUsize,
}

// SAFETY: the bump allocator hands out non-overlapping byte ranges, and the
// arena is only reset once `active_count` has drained, so no two threads
// ever write the same bytes concurrently.
unsafe impl Sync for AsyncAllocInner {}

impl MkAsyncFrameAlloc {
    /// Create a new async frame allocator.
    pub fn new(config: MkAsyncFrameConfig) -> Self {
        let arena = (0..config.arena_size)
            .map(|_| UnsafeCell::new(0u8))
            .collect();
        Self {
            inner: Arc::new(AsyncAllocInner {
                config,
                arena,
                head: AtomicUsize::new(0),
                frame: AtomicU64::new(0),
                active_count: AtomicUsize::new(0),
            }),
        }
    }

    /// Begin a new async frame.
    ///
    /// Waits until every allocation from the previous frame has been
    /// released, then advances the frame counter and resets the arena.
    pub async fn begin_frame(&self) -> MkAsyncFrameGuard {
        // Wait for all active allocations to be released.
        WaitForEmpty::new(&self.inner.active_count).await;

        // Advance the frame counter and reset the bump pointer.
        self.inner.frame.fetch_add(1, Ordering::SeqCst);
        self.inner.head.store(0, Ordering::SeqCst);

        MkAsyncFrameGuard {
            alloc: self.clone(),
        }
    }

    /// Get the current frame number.
    pub fn frame(&self) -> u64 {
        self.inner.frame.load(Ordering::SeqCst)
    }

    /// Allocate memory for a `T` asynchronously.
    ///
    /// The returned pointer is aligned for `T` but uninitialized; pair
    /// each successful allocation with a call to `release`.
    pub async fn alloc<T>(&self) -> Option<*mut T> {
        let layout = Layout::new::<T>();
        self.alloc_layout(layout).await.map(|p| p.cast::<T>())
    }

    /// Allocate raw memory with the given layout.
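    ///
    /// A sketch (illustrative): reserve 64 bytes aligned to 16.
    ///
    /// ```ignore
    /// let layout = Layout::from_size_align(64, 16).unwrap();
    /// if let Some(ptr) = alloc.alloc_layout(layout).await {
    ///     // ... write through ptr ...
    ///     alloc.release();
    /// }
    /// ```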
    pub async fn alloc_layout(&self, layout: Layout) -> Option<*mut u8> {
        let size = layout.size();
        let align = layout.align();
        // Arena base address, used to align absolute addresses: the arena
        // itself is only guaranteed byte-aligned, so aligning the offset
        // alone would not be enough.
        let base_addr = self.inner.arena.as_ptr() as usize;

        loop {
            let current = self.inner.head.load(Ordering::Acquire);
            let aligned = ((base_addr + current + align - 1) & !(align - 1)) - base_addr;

            if aligned + size > self.inner.arena.len() {
                // Out of memory: apply the configured backpressure policy.
                match self.inner.config.backpressure {
                    MkBackpressure::Fail => return None,
                    MkBackpressure::Wait => {
                        // Yield to the runtime and retry.
                        yield_now().await;
                        continue;
                    }
                    MkBackpressure::Timeout(duration) => {
                        // TODO: Implement timeout
                        let _ = duration;
                        return None;
                    }
                    MkBackpressure::Evict => {
                        // TODO: Implement eviction
                        return None;
                    }
                }
            }

            // Try to bump the head past this allocation.
            match self.inner.head.compare_exchange_weak(
                current,
                aligned + size,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    self.inner.active_count.fetch_add(1, Ordering::Relaxed);
                    // `raw_get` yields `*mut u8` without routing the write
                    // permission through a shared reference.
                    let base = UnsafeCell::raw_get(self.inner.arena.as_ptr());
                    return Some(unsafe { base.add(aligned) });
                }
                Err(_) => continue,
            }
        }
    }

    /// Release an allocation (for tracking).
    pub fn release(&self) {
        // Release ordering pairs with the Acquire load in `WaitForEmpty`,
        // so writes into the allocation happen-before arena reuse.
        self.inner.active_count.fetch_sub(1, Ordering::Release);
    }

    /// Get current memory usage in bytes.
    pub fn used(&self) -> usize {
        self.inner.head.load(Ordering::Relaxed)
    }

    /// Get remaining capacity in bytes.
    pub fn remaining(&self) -> usize {
        self.inner.arena.len() - self.used()
    }
}

impl Clone for MkAsyncFrameAlloc {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
        }
    }
}

/// Guard that represents an active frame.
pub struct MkAsyncFrameGuard {
    alloc: MkAsyncFrameAlloc,
}

impl MkAsyncFrameGuard {
    /// Get the frame number.
    pub fn frame(&self) -> u64 {
        self.alloc.frame()
    }
}

impl Drop for MkAsyncFrameGuard {
    fn drop(&mut self) {
        // Only reset the head when no allocations are live; resetting under
        // live allocations would let the next frame overwrite them. If any
        // remain, the next `begin_frame` resets after the count drains.
        if self.alloc.inner.active_count.load(Ordering::Acquire) == 0 {
            self.alloc.inner.head.store(0, Ordering::SeqCst);
        }
    }
}

/// Future that resolves once the active-allocation count reaches zero.
struct WaitForEmpty<'a> {
    counter: &'a AtomicUsize,
}

impl<'a> WaitForEmpty<'a> {
    fn new(counter: &'a AtomicUsize) -> Self {
        Self { counter }
    }
}

impl<'a> Future for WaitForEmpty<'a> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.counter.load(Ordering::Acquire) == 0 {
            Poll::Ready(())
        } else {
            // Busy-poll: wake immediately so the executor retries soon.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Yield to the async runtime once (runtime-agnostic, despite serving
/// the same role as `tokio::task::yield_now`).
async fn yield_now() {
    struct Yield(bool);

    impl Future for Yield {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.0 {
                Poll::Ready(())
            } else {
                // Pend once, but schedule an immediate re-poll.
                self.0 = true;
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    Yield(false).await
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_async_frame_alloc_sync() {
        let alloc = MkAsyncFrameAlloc::new(MkAsyncFrameConfig::default());
        assert_eq!(alloc.frame(), 0);
        assert_eq!(alloc.used(), 0);
    }
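
    // A minimal sketch of driving the async paths without any runtime
    // dependency: `block_on` is hand-rolled on `std::task::Wake` (stable
    // std; no tokio assumed) and simply spins, which is acceptable here
    // because every future in this module self-wakes. The roundtrip test
    // below is illustrative, exercising begin_frame / alloc / release.
    fn block_on<F: Future>(fut: F) -> F::Output {
        use std::task::{Wake, Waker};

        struct NoopWaker;
        impl Wake for NoopWaker {
            fn wake(self: Arc<Self>) {}
        }

        let waker: Waker = Arc::new(NoopWaker).into();
        let mut cx = Context::from_waker(&waker);
        let mut fut = std::pin::pin!(fut);
        loop {
            if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
                return out;
            }
        }
    }

    #[test]
    fn test_alloc_roundtrip() {
        let alloc = MkAsyncFrameAlloc::new(MkAsyncFrameConfig::default());
        block_on(async {
            let _frame = alloc.begin_frame().await;
            let ptr = alloc.alloc::<u64>().await.expect("arena has room");
            unsafe { ptr.write(42) };
            assert!(alloc.used() >= std::mem::size_of::<u64>());
            alloc.release();
        });
        assert_eq!(alloc.frame(), 1);
    }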
}