framealloc/allocators/
streaming.rs

1//! Streaming allocator for large assets loaded over time.
2//!
3//! Designed for loading assets like textures, meshes, and audio that:
4//! - Are loaded incrementally (streamed from disk/network)
5//! - Have known final sizes
6//! - May be evicted under memory pressure
7
8use std::alloc::{alloc, dealloc, Layout};
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11
12use crate::sync::mutex::Mutex;
13
/// Handle that identifies a single streaming allocation.
///
/// The wrapped value is private; use [`StreamId::raw`] to read it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct StreamId(u64);

impl StreamId {
    /// Returns the numeric value wrapped by this ID.
    pub fn raw(&self) -> u64 {
        let StreamId(value) = *self;
        value
    }
}
24
/// Eviction priority of a streaming allocation.
///
/// Variants are declared from least to most important, so the derived
/// ordering satisfies `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum StreamPriority {
    /// First in line for eviction when memory is tight.
    Low = 0,
    /// Regular priority; this is the default.
    Normal = 1,
    /// Important data that should be evicted last.
    High = 2,
    /// Highest level; never evicted automatically.
    Critical = 3,
}

impl Default for StreamPriority {
    /// The default priority is [`StreamPriority::Normal`].
    fn default() -> Self {
        StreamPriority::Normal
    }
}
43
/// Lifecycle stage of a streaming allocation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamState {
    /// Memory has been set aside, but no data has been written yet.
    Reserved,
    /// Data is currently being written into the allocation.
    Loading,
    /// All data is present; the allocation can be used.
    Ready,
    /// The allocation has been flagged for eviction.
    Evicting,
}
56
57/// Metadata for a streaming allocation.
58#[derive(Debug)]
59struct StreamAllocation {
60    /// Unique ID
61    id: StreamId,
62    /// Base pointer
63    ptr: *mut u8,
64    /// Total reserved size
65    reserved_size: usize,
66    /// Currently loaded bytes
67    loaded_bytes: usize,
68    /// Current state
69    state: StreamState,
70    /// Priority for eviction
71    priority: StreamPriority,
72    /// Last access timestamp (frame number)
73    last_access: u64,
74    /// User-defined tag for categorization
75    tag: Option<&'static str>,
76}
77
78/// Streaming allocator for large assets.
79pub struct StreamingAllocator {
80    /// Active allocations
81    allocations: Mutex<HashMap<StreamId, StreamAllocation>>,
82    
83    /// Next allocation ID
84    next_id: AtomicU64,
85    
86    /// Total reserved bytes
87    total_reserved: AtomicUsize,
88    
89    /// Total loaded bytes
90    total_loaded: AtomicUsize,
91    
92    /// Memory budget for streaming
93    budget: usize,
94    
95    /// Current frame number for LRU tracking
96    current_frame: AtomicU64,
97    
98    /// Eviction callback
99    eviction_callback: Mutex<Option<Box<dyn Fn(StreamId) + Send + Sync>>>,
100}
101
102impl StreamingAllocator {
103    /// Create a new streaming allocator with the given budget.
104    pub fn new(budget: usize) -> Self {
105        Self {
106            allocations: Mutex::new(HashMap::new()),
107            next_id: AtomicU64::new(1),
108            total_reserved: AtomicUsize::new(0),
109            total_loaded: AtomicUsize::new(0),
110            budget,
111            current_frame: AtomicU64::new(0),
112            eviction_callback: Mutex::new(None),
113        }
114    }
115
116    /// Set a callback for when allocations are evicted.
117    pub fn set_eviction_callback<F>(&self, callback: F)
118    where
119        F: Fn(StreamId) + Send + Sync + 'static,
120    {
121        let mut cb = self.eviction_callback.lock();
122        *cb = Some(Box::new(callback));
123    }
124
125    /// Reserve memory for a streaming asset.
126    ///
127    /// Returns None if the budget would be exceeded and eviction fails.
128    pub fn reserve(&self, size: usize, priority: StreamPriority) -> Option<StreamId> {
129        self.reserve_tagged(size, priority, None)
130    }
131
132    /// Reserve memory with a tag for categorization.
133    pub fn reserve_tagged(
134        &self,
135        size: usize,
136        priority: StreamPriority,
137        tag: Option<&'static str>,
138    ) -> Option<StreamId> {
139        // Check if we need to evict
140        let current_reserved = self.total_reserved.load(Ordering::Relaxed);
141        if current_reserved + size > self.budget {
142            let needed = (current_reserved + size) - self.budget;
143            if !self.try_evict(needed, priority) {
144                return None; // Cannot make room
145            }
146        }
147
148        // Allocate the memory
149        let layout = Layout::from_size_align(size, 16).ok()?;
150        let ptr = unsafe { alloc(layout) };
151        
152        if ptr.is_null() {
153            return None;
154        }
155
156        let id = StreamId(self.next_id.fetch_add(1, Ordering::Relaxed));
157        let frame = self.current_frame.load(Ordering::Relaxed);
158
159        let allocation = StreamAllocation {
160            id,
161            ptr,
162            reserved_size: size,
163            loaded_bytes: 0,
164            state: StreamState::Reserved,
165            priority,
166            last_access: frame,
167            tag,
168        };
169
170        let mut allocs = self.allocations.lock();
171        allocs.insert(id, allocation);
172        self.total_reserved.fetch_add(size, Ordering::Relaxed);
173
174        Some(id)
175    }
176
177    /// Get a pointer for writing data into a streaming allocation.
178    ///
179    /// Returns None if the ID is invalid or the allocation is not in a writable state.
180    pub fn begin_load(&self, id: StreamId) -> Option<*mut u8> {
181        let mut allocs = self.allocations.lock();
182        let alloc = allocs.get_mut(&id)?;
183
184        match alloc.state {
185            StreamState::Reserved | StreamState::Loading => {
186                alloc.state = StreamState::Loading;
187                Some(alloc.ptr)
188            }
189            _ => None,
190        }
191    }
192
193    /// Report progress on loading.
194    pub fn report_progress(&self, id: StreamId, bytes_loaded: usize) {
195        let mut allocs = self.allocations.lock();
196        if let Some(alloc) = allocs.get_mut(&id) {
197            let old_loaded = alloc.loaded_bytes;
198            alloc.loaded_bytes = bytes_loaded.min(alloc.reserved_size);
199            
200            let delta = alloc.loaded_bytes as isize - old_loaded as isize;
201            if delta > 0 {
202                self.total_loaded.fetch_add(delta as usize, Ordering::Relaxed);
203            } else if delta < 0 {
204                self.total_loaded.fetch_sub((-delta) as usize, Ordering::Relaxed);
205            }
206        }
207    }
208
209    /// Mark a streaming allocation as fully loaded.
210    pub fn finish_load(&self, id: StreamId) {
211        let mut allocs = self.allocations.lock();
212        if let Some(alloc) = allocs.get_mut(&id) {
213            alloc.state = StreamState::Ready;
214            alloc.loaded_bytes = alloc.reserved_size;
215            alloc.last_access = self.current_frame.load(Ordering::Relaxed);
216        }
217    }
218
219    /// Access a ready allocation.
220    ///
221    /// Updates the LRU timestamp.
222    pub fn access(&self, id: StreamId) -> Option<*const u8> {
223        let mut allocs = self.allocations.lock();
224        let alloc = allocs.get_mut(&id)?;
225
226        if alloc.state == StreamState::Ready {
227            alloc.last_access = self.current_frame.load(Ordering::Relaxed);
228            Some(alloc.ptr as *const u8)
229        } else {
230            None
231        }
232    }
233
234    /// Access a ready allocation mutably.
235    pub fn access_mut(&self, id: StreamId) -> Option<*mut u8> {
236        let mut allocs = self.allocations.lock();
237        let alloc = allocs.get_mut(&id)?;
238
239        if alloc.state == StreamState::Ready {
240            alloc.last_access = self.current_frame.load(Ordering::Relaxed);
241            Some(alloc.ptr)
242        } else {
243            None
244        }
245    }
246
247    /// Free a streaming allocation.
248    pub fn free(&self, id: StreamId) {
249        let mut allocs = self.allocations.lock();
250        if let Some(alloc) = allocs.remove(&id) {
251            self.total_reserved.fetch_sub(alloc.reserved_size, Ordering::Relaxed);
252            self.total_loaded.fetch_sub(alloc.loaded_bytes, Ordering::Relaxed);
253            
254            let layout = Layout::from_size_align(alloc.reserved_size, 16)
255                .expect("Invalid layout");
256            unsafe {
257                dealloc(alloc.ptr, layout);
258            }
259        }
260    }
261
262    /// Try to evict allocations to free up the specified amount.
263    ///
264    /// Returns true if enough memory was freed.
265    fn try_evict(&self, bytes_needed: usize, min_priority: StreamPriority) -> bool {
266        let mut allocs = self.allocations.lock();
267        
268        // Collect candidates for eviction (lower priority than requested)
269        let mut candidates: Vec<_> = allocs
270            .values()
271            .filter(|a| a.priority < min_priority && a.state == StreamState::Ready)
272            .map(|a| (a.id, a.priority, a.last_access, a.reserved_size))
273            .collect();
274
275        // Sort by priority (ascending), then by last access (ascending = LRU first)
276        candidates.sort_by(|a, b| {
277            a.1.cmp(&b.1).then_with(|| a.2.cmp(&b.2))
278        });
279
280        let mut freed = 0;
281        let mut to_evict = Vec::new();
282
283        for (id, _, _, size) in candidates {
284            if freed >= bytes_needed {
285                break;
286            }
287            to_evict.push(id);
288            freed += size;
289        }
290
291        // Actually evict
292        for id in &to_evict {
293            if let Some(alloc) = allocs.remove(id) {
294                self.total_reserved.fetch_sub(alloc.reserved_size, Ordering::Relaxed);
295                self.total_loaded.fetch_sub(alloc.loaded_bytes, Ordering::Relaxed);
296                
297                let layout = Layout::from_size_align(alloc.reserved_size, 16)
298                    .expect("Invalid layout");
299                unsafe {
300                    dealloc(alloc.ptr, layout);
301                }
302            }
303        }
304
305        drop(allocs); // Release lock before callback
306
307        // Notify about evictions
308        if let Some(ref callback) = *self.eviction_callback.lock() {
309            for id in to_evict {
310                callback(id);
311            }
312        }
313
314        freed >= bytes_needed
315    }
316
317    /// Advance to the next frame (for LRU tracking).
318    pub fn next_frame(&self) {
319        self.current_frame.fetch_add(1, Ordering::Relaxed);
320    }
321
322    /// Get the current memory budget.
323    pub fn budget(&self) -> usize {
324        self.budget
325    }
326
327    /// Get total reserved bytes.
328    pub fn total_reserved(&self) -> usize {
329        self.total_reserved.load(Ordering::Relaxed)
330    }
331
332    /// Get total loaded bytes.
333    pub fn total_loaded(&self) -> usize {
334        self.total_loaded.load(Ordering::Relaxed)
335    }
336
337    /// Get available budget.
338    pub fn available(&self) -> usize {
339        self.budget.saturating_sub(self.total_reserved.load(Ordering::Relaxed))
340    }
341
342    /// Get the state of an allocation.
343    pub fn state(&self, id: StreamId) -> Option<StreamState> {
344        let allocs = self.allocations.lock();
345        allocs.get(&id).map(|a| a.state)
346    }
347
348    /// Get statistics about streaming allocations.
349    pub fn stats(&self) -> StreamingStats {
350        let allocs = self.allocations.lock();
351        
352        let mut stats = StreamingStats {
353            budget: self.budget,
354            total_reserved: self.total_reserved.load(Ordering::Relaxed),
355            total_loaded: self.total_loaded.load(Ordering::Relaxed),
356            allocation_count: allocs.len(),
357            reserved_count: 0,
358            loading_count: 0,
359            ready_count: 0,
360        };
361
362        for alloc in allocs.values() {
363            match alloc.state {
364                StreamState::Reserved => stats.reserved_count += 1,
365                StreamState::Loading => stats.loading_count += 1,
366                StreamState::Ready => stats.ready_count += 1,
367                StreamState::Evicting => {}
368            }
369        }
370
371        stats
372    }
373}
374
375// SAFETY: StreamingAllocator uses internal synchronization
376unsafe impl Send for StreamingAllocator {}
377unsafe impl Sync for StreamingAllocator {}
378
/// Snapshot of a streaming allocator's counters.
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
    /// Maximum number of bytes that may be reserved.
    pub budget: usize,
    /// Bytes currently reserved.
    pub total_reserved: usize,
    /// Bytes currently reported as loaded.
    pub total_loaded: usize,
    /// Number of live allocations.
    pub allocation_count: usize,
    /// Number of allocations in the `Reserved` state.
    pub reserved_count: usize,
    /// Number of allocations in the `Loading` state.
    pub loading_count: usize,
    /// Number of allocations in the `Ready` state.
    pub ready_count: usize,
}

impl StreamingStats {
    /// Reserved bytes as a percentage of the budget.
    ///
    /// Returns `0.0` when the budget is zero.
    pub fn utilization_percent(&self) -> f64 {
        match self.budget {
            0 => 0.0,
            budget => (self.total_reserved as f64 / budget as f64) * 100.0,
        }
    }

    /// Loaded bytes as a percentage of reserved bytes.
    ///
    /// Returns `100.0` when nothing is reserved.
    pub fn load_progress_percent(&self) -> f64 {
        match self.total_reserved {
            0 => 100.0,
            reserved => (self.total_loaded as f64 / reserved as f64) * 100.0,
        }
    }
}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks one allocation through reserve, load, access, and free.
    #[test]
    fn test_reserve_and_load() {
        // 1MB budget
        let allocator = StreamingAllocator::new(1024 * 1024);

        let handle = allocator.reserve(1024, StreamPriority::Normal).unwrap();
        assert_eq!(allocator.state(handle), Some(StreamState::Reserved));

        let write_ptr = allocator.begin_load(handle).unwrap();
        assert!(!write_ptr.is_null());
        assert_eq!(allocator.state(handle), Some(StreamState::Loading));

        allocator.report_progress(handle, 512);
        allocator.finish_load(handle);
        assert_eq!(allocator.state(handle), Some(StreamState::Ready));

        let read_ptr = allocator.access(handle).unwrap();
        assert!(!read_ptr.is_null());

        allocator.free(handle);
        assert_eq!(allocator.state(handle), None);
    }

    /// A reservation past the budget fails when nothing can be evicted.
    #[test]
    fn test_budget_enforcement() {
        // 1KB budget
        let allocator = StreamingAllocator::new(1024);

        // Half of the budget: fits.
        let first = allocator.reserve(512, StreamPriority::Normal);
        assert!(first.is_some());

        // Second half: fills the budget exactly.
        let second = allocator.reserve(512, StreamPriority::Normal);
        assert!(second.is_some());

        // Over budget, and the existing allocations are not `Ready`.
        let third = allocator.reserve(512, StreamPriority::Critical);
        assert!(third.is_none());
    }

    /// A high-priority reservation pushes out one ready low-priority one.
    #[test]
    fn test_eviction() {
        let allocator = StreamingAllocator::new(1024);

        // Fill the budget with two ready, low-priority allocations.
        let low_a = allocator.reserve(512, StreamPriority::Low).unwrap();
        allocator.finish_load(low_a);

        let low_b = allocator.reserve(512, StreamPriority::Low).unwrap();
        allocator.finish_load(low_b);

        // The high-priority request should succeed through eviction.
        let high = allocator.reserve(512, StreamPriority::High);
        assert!(high.is_some());

        // Exactly one of the low-priority allocations should survive.
        let survivors = [low_a, low_b]
            .iter()
            .filter(|&&id| allocator.state(id).is_some())
            .count();
        assert_eq!(survivors, 1);
    }
}