1use std::alloc::{alloc, dealloc, Layout};
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11
12use crate::sync::mutex::Mutex;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
16pub struct StreamId(u64);
17
18impl StreamId {
19 pub fn raw(&self) -> u64 {
21 self.0
22 }
23}
24
25#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
27pub enum StreamPriority {
28 Low = 0,
30 Normal = 1,
32 High = 2,
34 Critical = 3,
36}
37
38impl Default for StreamPriority {
39 fn default() -> Self {
40 Self::Normal
41 }
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
46pub enum StreamState {
47 Reserved,
49 Loading,
51 Ready,
53 Evicting,
55}
56
57#[derive(Debug)]
59struct StreamAllocation {
60 id: StreamId,
62 ptr: *mut u8,
64 reserved_size: usize,
66 loaded_bytes: usize,
68 state: StreamState,
70 priority: StreamPriority,
72 last_access: u64,
74 tag: Option<&'static str>,
76}
77
78pub struct StreamingAllocator {
80 allocations: Mutex<HashMap<StreamId, StreamAllocation>>,
82
83 next_id: AtomicU64,
85
86 total_reserved: AtomicUsize,
88
89 total_loaded: AtomicUsize,
91
92 budget: usize,
94
95 current_frame: AtomicU64,
97
98 eviction_callback: Mutex<Option<Box<dyn Fn(StreamId) + Send + Sync>>>,
100}
101
102impl StreamingAllocator {
103 pub fn new(budget: usize) -> Self {
105 Self {
106 allocations: Mutex::new(HashMap::new()),
107 next_id: AtomicU64::new(1),
108 total_reserved: AtomicUsize::new(0),
109 total_loaded: AtomicUsize::new(0),
110 budget,
111 current_frame: AtomicU64::new(0),
112 eviction_callback: Mutex::new(None),
113 }
114 }
115
116 pub fn set_eviction_callback<F>(&self, callback: F)
118 where
119 F: Fn(StreamId) + Send + Sync + 'static,
120 {
121 let mut cb = self.eviction_callback.lock();
122 *cb = Some(Box::new(callback));
123 }
124
125 pub fn reserve(&self, size: usize, priority: StreamPriority) -> Option<StreamId> {
129 self.reserve_tagged(size, priority, None)
130 }
131
132 pub fn reserve_tagged(
134 &self,
135 size: usize,
136 priority: StreamPriority,
137 tag: Option<&'static str>,
138 ) -> Option<StreamId> {
139 let current_reserved = self.total_reserved.load(Ordering::Relaxed);
141 if current_reserved + size > self.budget {
142 let needed = (current_reserved + size) - self.budget;
143 if !self.try_evict(needed, priority) {
144 return None; }
146 }
147
148 let layout = Layout::from_size_align(size, 16).ok()?;
150 let ptr = unsafe { alloc(layout) };
151
152 if ptr.is_null() {
153 return None;
154 }
155
156 let id = StreamId(self.next_id.fetch_add(1, Ordering::Relaxed));
157 let frame = self.current_frame.load(Ordering::Relaxed);
158
159 let allocation = StreamAllocation {
160 id,
161 ptr,
162 reserved_size: size,
163 loaded_bytes: 0,
164 state: StreamState::Reserved,
165 priority,
166 last_access: frame,
167 tag,
168 };
169
170 let mut allocs = self.allocations.lock();
171 allocs.insert(id, allocation);
172 self.total_reserved.fetch_add(size, Ordering::Relaxed);
173
174 Some(id)
175 }
176
177 pub fn begin_load(&self, id: StreamId) -> Option<*mut u8> {
181 let mut allocs = self.allocations.lock();
182 let alloc = allocs.get_mut(&id)?;
183
184 match alloc.state {
185 StreamState::Reserved | StreamState::Loading => {
186 alloc.state = StreamState::Loading;
187 Some(alloc.ptr)
188 }
189 _ => None,
190 }
191 }
192
193 pub fn report_progress(&self, id: StreamId, bytes_loaded: usize) {
195 let mut allocs = self.allocations.lock();
196 if let Some(alloc) = allocs.get_mut(&id) {
197 let old_loaded = alloc.loaded_bytes;
198 alloc.loaded_bytes = bytes_loaded.min(alloc.reserved_size);
199
200 let delta = alloc.loaded_bytes as isize - old_loaded as isize;
201 if delta > 0 {
202 self.total_loaded.fetch_add(delta as usize, Ordering::Relaxed);
203 } else if delta < 0 {
204 self.total_loaded.fetch_sub((-delta) as usize, Ordering::Relaxed);
205 }
206 }
207 }
208
209 pub fn finish_load(&self, id: StreamId) {
211 let mut allocs = self.allocations.lock();
212 if let Some(alloc) = allocs.get_mut(&id) {
213 alloc.state = StreamState::Ready;
214 alloc.loaded_bytes = alloc.reserved_size;
215 alloc.last_access = self.current_frame.load(Ordering::Relaxed);
216 }
217 }
218
219 pub fn access(&self, id: StreamId) -> Option<*const u8> {
223 let mut allocs = self.allocations.lock();
224 let alloc = allocs.get_mut(&id)?;
225
226 if alloc.state == StreamState::Ready {
227 alloc.last_access = self.current_frame.load(Ordering::Relaxed);
228 Some(alloc.ptr as *const u8)
229 } else {
230 None
231 }
232 }
233
234 pub fn access_mut(&self, id: StreamId) -> Option<*mut u8> {
236 let mut allocs = self.allocations.lock();
237 let alloc = allocs.get_mut(&id)?;
238
239 if alloc.state == StreamState::Ready {
240 alloc.last_access = self.current_frame.load(Ordering::Relaxed);
241 Some(alloc.ptr)
242 } else {
243 None
244 }
245 }
246
247 pub fn free(&self, id: StreamId) {
249 let mut allocs = self.allocations.lock();
250 if let Some(alloc) = allocs.remove(&id) {
251 self.total_reserved.fetch_sub(alloc.reserved_size, Ordering::Relaxed);
252 self.total_loaded.fetch_sub(alloc.loaded_bytes, Ordering::Relaxed);
253
254 let layout = Layout::from_size_align(alloc.reserved_size, 16)
255 .expect("Invalid layout");
256 unsafe {
257 dealloc(alloc.ptr, layout);
258 }
259 }
260 }
261
262 fn try_evict(&self, bytes_needed: usize, min_priority: StreamPriority) -> bool {
266 let mut allocs = self.allocations.lock();
267
268 let mut candidates: Vec<_> = allocs
270 .values()
271 .filter(|a| a.priority < min_priority && a.state == StreamState::Ready)
272 .map(|a| (a.id, a.priority, a.last_access, a.reserved_size))
273 .collect();
274
275 candidates.sort_by(|a, b| {
277 a.1.cmp(&b.1).then_with(|| a.2.cmp(&b.2))
278 });
279
280 let mut freed = 0;
281 let mut to_evict = Vec::new();
282
283 for (id, _, _, size) in candidates {
284 if freed >= bytes_needed {
285 break;
286 }
287 to_evict.push(id);
288 freed += size;
289 }
290
291 for id in &to_evict {
293 if let Some(alloc) = allocs.remove(id) {
294 self.total_reserved.fetch_sub(alloc.reserved_size, Ordering::Relaxed);
295 self.total_loaded.fetch_sub(alloc.loaded_bytes, Ordering::Relaxed);
296
297 let layout = Layout::from_size_align(alloc.reserved_size, 16)
298 .expect("Invalid layout");
299 unsafe {
300 dealloc(alloc.ptr, layout);
301 }
302 }
303 }
304
305 drop(allocs); if let Some(ref callback) = *self.eviction_callback.lock() {
309 for id in to_evict {
310 callback(id);
311 }
312 }
313
314 freed >= bytes_needed
315 }
316
317 pub fn next_frame(&self) {
319 self.current_frame.fetch_add(1, Ordering::Relaxed);
320 }
321
322 pub fn budget(&self) -> usize {
324 self.budget
325 }
326
327 pub fn total_reserved(&self) -> usize {
329 self.total_reserved.load(Ordering::Relaxed)
330 }
331
332 pub fn total_loaded(&self) -> usize {
334 self.total_loaded.load(Ordering::Relaxed)
335 }
336
337 pub fn available(&self) -> usize {
339 self.budget.saturating_sub(self.total_reserved.load(Ordering::Relaxed))
340 }
341
342 pub fn state(&self, id: StreamId) -> Option<StreamState> {
344 let allocs = self.allocations.lock();
345 allocs.get(&id).map(|a| a.state)
346 }
347
348 pub fn stats(&self) -> StreamingStats {
350 let allocs = self.allocations.lock();
351
352 let mut stats = StreamingStats {
353 budget: self.budget,
354 total_reserved: self.total_reserved.load(Ordering::Relaxed),
355 total_loaded: self.total_loaded.load(Ordering::Relaxed),
356 allocation_count: allocs.len(),
357 reserved_count: 0,
358 loading_count: 0,
359 ready_count: 0,
360 };
361
362 for alloc in allocs.values() {
363 match alloc.state {
364 StreamState::Reserved => stats.reserved_count += 1,
365 StreamState::Loading => stats.loading_count += 1,
366 StreamState::Ready => stats.ready_count += 1,
367 StreamState::Evicting => {}
368 }
369 }
370
371 stats
372 }
373}
374
375unsafe impl Send for StreamingAllocator {}
377unsafe impl Sync for StreamingAllocator {}
378
379#[derive(Debug, Clone, Default)]
381pub struct StreamingStats {
382 pub budget: usize,
384 pub total_reserved: usize,
386 pub total_loaded: usize,
388 pub allocation_count: usize,
390 pub reserved_count: usize,
392 pub loading_count: usize,
394 pub ready_count: usize,
396}
397
398impl StreamingStats {
399 pub fn utilization_percent(&self) -> f64 {
401 if self.budget == 0 {
402 0.0
403 } else {
404 (self.total_reserved as f64 / self.budget as f64) * 100.0
405 }
406 }
407
408 pub fn load_progress_percent(&self) -> f64 {
410 if self.total_reserved == 0 {
411 100.0
412 } else {
413 (self.total_loaded as f64 / self.total_reserved as f64) * 100.0
414 }
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use super::*;
421
422 #[test]
423 fn test_reserve_and_load() {
424 let streaming = StreamingAllocator::new(1024 * 1024); let id = streaming.reserve(1024, StreamPriority::Normal).unwrap();
427 assert_eq!(streaming.state(id), Some(StreamState::Reserved));
428
429 let ptr = streaming.begin_load(id).unwrap();
430 assert!(!ptr.is_null());
431 assert_eq!(streaming.state(id), Some(StreamState::Loading));
432
433 streaming.report_progress(id, 512);
434 streaming.finish_load(id);
435 assert_eq!(streaming.state(id), Some(StreamState::Ready));
436
437 let read_ptr = streaming.access(id).unwrap();
438 assert!(!read_ptr.is_null());
439
440 streaming.free(id);
441 assert_eq!(streaming.state(id), None);
442 }
443
444 #[test]
445 fn test_budget_enforcement() {
446 let streaming = StreamingAllocator::new(1024); let id1 = streaming.reserve(512, StreamPriority::Normal);
450 assert!(id1.is_some());
451
452 let id2 = streaming.reserve(512, StreamPriority::Normal);
454 assert!(id2.is_some());
455
456 let id3 = streaming.reserve(512, StreamPriority::Critical);
458 assert!(id3.is_none());
459 }
460
461 #[test]
462 fn test_eviction() {
463 let streaming = StreamingAllocator::new(1024);
464
465 let id1 = streaming.reserve(512, StreamPriority::Low).unwrap();
467 streaming.finish_load(id1);
468
469 let id2 = streaming.reserve(512, StreamPriority::Low).unwrap();
470 streaming.finish_load(id2);
471
472 let id3 = streaming.reserve(512, StreamPriority::High);
474 assert!(id3.is_some());
475
476 let remaining = [id1, id2].iter().filter(|id| streaming.state(**id).is_some()).count();
478 assert_eq!(remaining, 1);
479 }
480}