memscope_rs/async_memory/buffer.rs

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::async_memory::error::{AsyncError, AsyncResult, BufferType};
use crate::async_memory::task_id::TaskId;
use crate::async_memory::DEFAULT_BUFFER_SIZE;

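/// A single allocation or deallocation record.
///
/// `#[repr(C, align(64))]` plus the trailing padding keeps each event on its
/// own 64-byte cache line, so adjacent ring buffer slots never share a line
/// between the producer and the consumer.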
#[repr(C, align(64))]
#[derive(Clone, Copy, Debug)]
pub struct AllocationEvent {
    /// Task that performed the operation.
    pub task_id: TaskId,
    /// Address of the allocated or freed memory.
    pub ptr: usize,
    /// Size of the allocation in bytes.
    pub size: usize,
    /// Time of the event.
    pub timestamp: u64,
    /// 0 = allocation, 1 = deallocation.
    pub event_type: u8,
    /// Pads the struct to a full cache line.
    _padding: [u8; 31],
}

impl AllocationEvent {
    /// Creates an allocation event (`event_type == 0`).
    pub fn allocation(task_id: TaskId, ptr: usize, size: usize, timestamp: u64) -> Self {
        Self {
            task_id,
            ptr,
            size,
            timestamp,
            event_type: 0,
            _padding: [0; 31],
        }
    }

    /// Creates a deallocation event (`event_type == 1`).
    pub fn deallocation(task_id: TaskId, ptr: usize, size: usize, timestamp: u64) -> Self {
        Self {
            task_id,
            ptr,
            size,
            timestamp,
            event_type: 1,
            _padding: [0; 31],
        }
    }

    /// Returns `true` if this event records an allocation.
    pub fn is_allocation(&self) -> bool {
        self.event_type == 0
    }

    /// Returns `true` if this event records a deallocation.
    pub fn is_deallocation(&self) -> bool {
        self.event_type == 1
    }
}

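/// Lock-free single-producer/single-consumer ring buffer of allocation events.
///
/// The capacity is a power of two so index arithmetic reduces to a bitmask,
/// and one slot is kept permanently empty to distinguish "full" from "empty".
///
/// A minimal usage sketch (illustrative only; `ignore`d so it is not run as a
/// doctest):
///
/// ```ignore
/// let buffer = EventBuffer::new();
/// buffer.push(AllocationEvent::allocation(1, 0x1000, 64, 0)).expect("push failed");
/// assert_eq!(buffer.pop().map(|e| e.size), Some(64));
/// ```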
pub struct EventBuffer {
    /// Fixed-size slot storage, accessed through raw pointers.
    events: UnsafeCell<Box<[AllocationEvent]>>,
    /// Next slot the producer will write.
    write_pos: AtomicUsize,
    /// Next slot the consumer will read.
    read_pos: AtomicUsize,
    /// Number of events rejected because the buffer was full.
    dropped_events: AtomicUsize,
    /// `capacity - 1`; valid because capacity is a power of two.
    mask: usize,
}

impl EventBuffer {
    /// Creates a buffer with the default capacity.
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_BUFFER_SIZE)
    }

    fn with_capacity(capacity: usize) -> Self {
        assert!(
            capacity.is_power_of_two(),
            "Buffer capacity must be power of 2"
        );

        let mut events = Vec::with_capacity(capacity);
        events.resize(capacity, AllocationEvent::allocation(0, 0, 0, 0));
        let events_box = events.into_boxed_slice();

        Self {
            events: UnsafeCell::new(events_box),
            write_pos: AtomicUsize::new(0),
            read_pos: AtomicUsize::new(0),
            dropped_events: AtomicUsize::new(0),
            mask: capacity - 1,
        }
    }

    /// Small fixed capacity so tests can exercise overflow cheaply.
    #[cfg(test)]
    fn new_test() -> Self {
        Self::with_capacity(1024)
    }

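    /// Appends an event, or returns an error and bumps the drop counter when
    /// the buffer is full. Must only be called from the single producer
    /// thread.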
    #[inline(always)]
    pub fn push(&self, event: AllocationEvent) -> AsyncResult<()> {
        let write_pos = self.write_pos.load(Ordering::Relaxed);
        let next_write = (write_pos + 1) & self.mask;
        let read_pos = self.read_pos.load(Ordering::Acquire);

        // Full when the advanced write index would collide with the read
        // index; the reserved empty slot makes this check unambiguous.
        if next_write == read_pos {
            let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
            return Err(AsyncError::buffer_management(
                BufferType::AllocationEvents,
                "Ring buffer overflow - event dropped",
                Some(dropped),
            ));
        }

        // SAFETY: only the producer writes this slot, and the consumer will
        // not read it until `write_pos` is published below.
        unsafe {
            let events_ptr = self.events.get();
            let event_ptr = (*events_ptr).as_mut_ptr().add(write_pos);
            std::ptr::write_volatile(event_ptr, event);
        }

        // Release ordering publishes the event data before the new index.
        self.write_pos.store(next_write, Ordering::Release);
        Ok(())
    }

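    /// Removes and returns the oldest event, or `None` when the buffer is
    /// empty. Must only be called from the single consumer thread.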
    pub fn pop(&self) -> Option<AllocationEvent> {
        let read_pos = self.read_pos.load(Ordering::Relaxed);
        let write_pos = self.write_pos.load(Ordering::Acquire);

        if read_pos == write_pos {
            return None;
        }

        // SAFETY: the Acquire load of `write_pos` guarantees the slot at
        // `read_pos` was fully written before the producer published it.
        let event = unsafe {
            let events_ptr = self.events.get();
            let event_ptr = (*events_ptr).as_ptr().add(read_pos);
            std::ptr::read_volatile(event_ptr)
        };

        let next_read = (read_pos + 1) & self.mask;
        self.read_pos.store(next_read, Ordering::Release);

        Some(event)
    }

    /// Number of events currently queued.
    pub fn len(&self) -> usize {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        (write_pos.wrapping_sub(read_pos)) & self.mask
    }

    /// Returns `true` when no events are queued.
    pub fn is_empty(&self) -> bool {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        write_pos == read_pos
    }

    /// Usable capacity: one of the allocated slots is always kept empty.
    pub fn capacity(&self) -> usize {
        self.mask
    }

    /// Number of events dropped because the buffer was full.
    pub fn dropped_count(&self) -> usize {
        self.dropped_events.load(Ordering::Relaxed)
    }

    /// Resets the drop counter so tests can make clean assertions.
    #[cfg(test)]
    pub fn reset_dropped_count(&self) {
        self.dropped_events.store(0, Ordering::Relaxed);
    }

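    /// Pops every queued event into a `Vec`, leaving the buffer empty.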
    pub fn drain(&self) -> Vec<AllocationEvent> {
        let mut events = Vec::new();
        while let Some(event) = self.pop() {
            events.push(event);
        }
        events
    }
}

impl Default for EventBuffer {
    fn default() -> Self {
        Self::new()
    }
}

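// SAFETY: all cross-thread access to the slot array is mediated by the atomic
// read/write indices; sharing the buffer is sound as long as the
// single-producer/single-consumer protocol documented on `push` and `pop`
// is upheld.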
unsafe impl Sync for EventBuffer {}
unsafe impl Send for EventBuffer {}

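// Each thread records into its own buffer, so the recording hot path never
// contends on shared state.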
thread_local! {
    static THREAD_EVENT_BUFFER: UnsafeCell<EventBuffer> = UnsafeCell::new(EventBuffer::new());
}

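/// Runs `f` against the current thread's event buffer.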
#[inline(always)]
pub fn with_thread_buffer<F, R>(f: F) -> R
where
    F: FnOnce(&EventBuffer) -> R,
{
    THREAD_EVENT_BUFFER.with(|buffer| {
        // SAFETY: only a shared reference escapes; `EventBuffer` handles all
        // mutation internally through atomics and volatile slot access.
        let buffer_ref = unsafe { &*buffer.get() };
        f(buffer_ref)
    })
}

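/// Builds an allocation or deallocation event and pushes it onto the current
/// thread's buffer.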
#[inline(always)]
pub fn record_allocation_event(
    task_id: TaskId,
    ptr: usize,
    size: usize,
    timestamp: u64,
    is_allocation: bool,
) -> AsyncResult<()> {
    let event = if is_allocation {
        AllocationEvent::allocation(task_id, ptr, size, timestamp)
    } else {
        AllocationEvent::deallocation(task_id, ptr, size, timestamp)
    };

    with_thread_buffer(|buffer| buffer.push(event))
}

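/// Drains every event queued on the calling thread's buffer. Note that this
/// only sees the current thread; other threads' buffers must be drained from
/// those threads.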
pub fn collect_all_events() -> Vec<AllocationEvent> {
    with_thread_buffer(|buffer| buffer.drain())
}

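/// Point-in-time statistics for the calling thread's event buffer.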
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Events currently queued.
    pub current_events: usize,
    /// Usable buffer capacity.
    pub capacity: usize,
    /// Events dropped due to overflow.
    pub events_dropped: usize,
    /// `current_events / capacity`, in the range `[0.0, 1.0]`.
    pub utilization: f64,
}

impl BufferStats {
    /// Maps utilization to a coarse warning level; `None` below 75%.
    pub fn warning_level(&self) -> Option<&'static str> {
        match self.utilization {
            u if u >= 0.95 => Some("critical"),
            u if u >= 0.85 => Some("high"),
            u if u >= 0.75 => Some("medium"),
            _ => None,
        }
    }
}

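/// Computes `BufferStats` for the calling thread's buffer.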
pub fn get_buffer_stats() -> BufferStats {
    with_thread_buffer(|buffer| {
        let current_events = buffer.len();
        let capacity = buffer.capacity();
        let events_dropped = buffer.dropped_count();
        let utilization = current_events as f64 / capacity as f64;

        BufferStats {
            current_events,
            capacity,
            events_dropped,
            utilization,
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_allocation_event_creation() {
        let alloc_event = AllocationEvent::allocation(12345, 0x1000, 1024, 567890);
        assert_eq!(alloc_event.task_id, 12345);
        assert_eq!(alloc_event.ptr, 0x1000);
        assert_eq!(alloc_event.size, 1024);
        assert_eq!(alloc_event.timestamp, 567890);
        assert!(alloc_event.is_allocation());
        assert!(!alloc_event.is_deallocation());

        let dealloc_event = AllocationEvent::deallocation(12345, 0x1000, 1024, 567891);
        assert!(dealloc_event.is_deallocation());
        assert!(!dealloc_event.is_allocation());
    }

    #[test]
    fn test_event_buffer_basic_operations() {
        let buffer = EventBuffer::new_test();
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.dropped_count(), 0);

        let event = AllocationEvent::allocation(1, 0x1000, 100, 123);
        buffer.push(event).expect("Failed to push event");

        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);

        let popped = buffer.pop().expect("Failed to pop event");
        assert_eq!(popped.task_id, 1);
        assert_eq!(popped.ptr, 0x1000);
        assert_eq!(popped.size, 100);

        assert!(buffer.is_empty());
        assert!(buffer.pop().is_none());
    }

    #[test]
    fn test_buffer_overflow_handling() {
        let buffer = EventBuffer::new_test();
        buffer.reset_dropped_count();

        // Fill the buffer to its usable capacity.
        let capacity = buffer.capacity();
        for i in 0..capacity {
            let event = AllocationEvent::allocation(i as TaskId, i, 100, i as u64);
            buffer.push(event).expect("Failed to push event");
        }

        // The next push must fail and be counted as a drop.
        let overflow_event = AllocationEvent::allocation(99999, 0x9999, 100, 99999);
        let result = buffer.push(overflow_event);
        assert!(result.is_err());
        assert_eq!(buffer.dropped_count(), 1);

        assert_eq!(buffer.len(), capacity);

        // Freeing one slot makes the push succeed again.
        buffer.pop().expect("Failed to pop event");
        buffer
            .push(overflow_event)
            .expect("Failed to push after pop");
    }

    #[test]
    fn test_buffer_wraparound() {
        let buffer = EventBuffer::new_test();
        let capacity = buffer.capacity();

        // Repeatedly half-fill and drain so the indices wrap several times.
        for round in 0..3 {
            for i in 0..capacity / 2 {
                let event = AllocationEvent::allocation(
                    (round * 1000 + i) as TaskId,
                    i,
                    100,
                    (round * 1000 + i) as u64,
                );
                buffer.push(event).expect("Failed to push event");
            }

            let events = buffer.drain();
            assert_eq!(events.len(), capacity / 2);

            // FIFO order must survive the wraparound.
            for (i, event) in events.iter().enumerate() {
                assert_eq!(event.task_id, (round * 1000 + i) as TaskId);
            }
        }
    }

    #[test]
    fn test_thread_local_buffer() {
        use std::thread;

        // Each spawned thread records into its own thread-local buffer.
        let handle1 = thread::spawn(|| {
            record_allocation_event(1, 0x1000, 100, 123, true).expect("Failed to record");
            get_buffer_stats().current_events
        });

        let handle2 = thread::spawn(|| {
            record_allocation_event(2, 0x2000, 200, 456, true).expect("Failed to record");
            get_buffer_stats().current_events
        });

        assert_eq!(handle1.join().expect("Thread 1 panicked"), 1);
        assert_eq!(handle2.join().expect("Thread 2 panicked"), 1);

        // The main thread's buffer is unaffected by the spawned threads.
        assert_eq!(get_buffer_stats().current_events, 0);
    }

    #[test]
    fn test_buffer_stats() {
        with_thread_buffer(|buffer| buffer.reset_dropped_count());

        let stats = get_buffer_stats();
        assert_eq!(stats.current_events, 0);
        assert_eq!(stats.utilization, 0.0);
        assert!(stats.warning_level().is_none());

        // Fill to roughly 90% so a warning level is reported.
        let capacity = get_buffer_stats().capacity;
        let high_fill = (capacity as f64 * 0.9) as usize;

        for i in 0..high_fill {
            record_allocation_event(i as TaskId, i, 100, i as u64, true).expect("Failed to record");
        }

        let stats = get_buffer_stats();
        assert!(stats.utilization >= 0.85);
        assert!(stats.warning_level().is_some());
    }

    #[test]
    fn test_concurrent_producer_consumer() {
        use std::sync::Arc;
        use std::thread;
        use std::time::Duration;

        let buffer = Arc::new(EventBuffer::new_test());
        let producer_buffer = Arc::clone(&buffer);
        let consumer_buffer = Arc::clone(&buffer);

        // Single producer; drops are acceptable if the consumer falls behind.
        let producer = thread::spawn(move || {
            for i in 0..1000 {
                let event =
                    AllocationEvent::allocation(i as TaskId, (i * 1000) as usize, 100, i as u64);
                let _ = producer_buffer.push(event);
            }
        });

        // Single consumer; task ids must arrive in FIFO order.
        let consumer = thread::spawn(move || {
            let mut consumed = 0;
            let mut last_task_id = None;

            for _ in 0..100 {
                while let Some(event) = consumer_buffer.pop() {
                    if let Some(last_id) = last_task_id {
                        assert!(event.task_id >= last_id);
                    }
                    last_task_id = Some(event.task_id);
                    consumed += 1;
                }
                thread::sleep(Duration::from_micros(10));
            }
            consumed
        });

        producer.join().expect("Producer thread panicked");
        let consumed = consumer.join().expect("Consumer thread panicked");

        // At least some events must get through, and never more than produced.
        assert!(consumed > 0);
        assert!(consumed <= 1000);
    }
}