// memscope_rs/async_memory/buffer.rs

//! Lock-free event buffering for async memory tracking
//!
//! Provides high-performance, lock-free ring buffers for collecting allocation
//! events from async tasks with quality monitoring and overflow handling.

use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, Ordering};

use crate::async_memory::error::{AsyncError, AsyncResult, BufferType};
use crate::async_memory::task_id::TaskId;
use crate::async_memory::DEFAULT_BUFFER_SIZE;

/// Memory allocation or deallocation event
///
/// Optimized for cache efficiency with 64-byte alignment to avoid false sharing.
/// Uses minimal fields to reduce memory overhead while capturing essential data.
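///
/// A minimal construction sketch (values are hypothetical; `TaskId` is treated
/// as an integer alias here, as the tests in this module assume):
///
/// ```ignore
/// let alloc = AllocationEvent::allocation(42, 0x1000, 256, 1_000);
/// let free = AllocationEvent::deallocation(42, 0x1000, 256, 2_000);
/// assert!(alloc.is_allocation());
/// assert!(free.is_deallocation());
/// ```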
#[repr(C, align(64))]
#[derive(Clone, Copy, Debug)]
pub struct AllocationEvent {
    /// Unique task identifier
    pub task_id: TaskId,
    /// Memory pointer address
    pub ptr: usize,
    /// Allocation size in bytes
    pub size: usize,
    /// Timestamp (TSC ticks or nanoseconds)
    pub timestamp: u64,
    /// Event type: 0=allocation, 1=deallocation
    pub event_type: u8,
    /// Reserved for future use; pads the struct to a full 64-byte cache line
    _padding: [u8; 31],
}

impl AllocationEvent {
    /// Create new allocation event
    pub fn allocation(task_id: TaskId, ptr: usize, size: usize, timestamp: u64) -> Self {
        Self {
            task_id,
            ptr,
            size,
            timestamp,
            event_type: 0,
            _padding: [0; 31],
        }
    }

    /// Create new deallocation event
    pub fn deallocation(task_id: TaskId, ptr: usize, size: usize, timestamp: u64) -> Self {
        Self {
            task_id,
            ptr,
            size,
            timestamp,
            event_type: 1,
            _padding: [0; 31],
        }
    }

    /// Check if this is an allocation event
    pub fn is_allocation(&self) -> bool {
        self.event_type == 0
    }

    /// Check if this is a deallocation event
    pub fn is_deallocation(&self) -> bool {
        self.event_type == 1
    }
}

/// Lock-free ring buffer for allocation events
///
/// Uses a single-producer, single-consumer (SPSC) design where the producer
/// (allocator hook) runs on the application thread and the consumer
/// (aggregator) runs on a dedicated background thread.
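///
/// A minimal single-threaded sketch of the intended flow (in production the
/// two sides run on different threads; the event values are hypothetical):
///
/// ```ignore
/// let buffer = EventBuffer::new();
/// // Producer side: push never blocks; a full buffer yields an Err instead.
/// buffer
///     .push(AllocationEvent::allocation(1, 0x2000, 64, 10))
///     .expect("buffer full");
/// // Consumer side: pop returns None once the buffer is drained.
/// while let Some(event) = buffer.pop() {
///     println!("task {} touched {} bytes", event.task_id, event.size);
/// }
/// ```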
pub struct EventBuffer {
    /// Pre-allocated ring buffer storage using UnsafeCell for interior mutability
    events: UnsafeCell<Box<[AllocationEvent]>>,
    /// Write position (modified only by producer)
    write_pos: AtomicUsize,
    /// Read position (modified only by consumer)
    read_pos: AtomicUsize,
    /// Count of dropped events due to buffer overflow
    dropped_events: AtomicUsize,
    /// Buffer size mask for efficient modulo operations
    mask: usize,
}

impl EventBuffer {
    /// Create new event buffer with default size
    pub fn new() -> Self {
        Self::with_capacity(DEFAULT_BUFFER_SIZE)
    }

    /// Create new event buffer with the specified capacity (must be a power of 2)
    fn with_capacity(capacity: usize) -> Self {
        assert!(
            capacity.is_power_of_two(),
            "Buffer capacity must be a power of 2"
        );

        // Create buffer on the heap to avoid stack overflow
        let mut events = Vec::with_capacity(capacity);
        events.resize(capacity, AllocationEvent::allocation(0, 0, 0, 0));
        let events_box = events.into_boxed_slice();

        Self {
            events: UnsafeCell::new(events_box),
            write_pos: AtomicUsize::new(0),
            read_pos: AtomicUsize::new(0),
            dropped_events: AtomicUsize::new(0),
            mask: capacity - 1,
        }
    }

    /// Create small buffer for testing (avoids long test times)
    #[cfg(test)]
    fn new_test() -> Self {
        Self::with_capacity(1024) // 1K events for testing
    }

    /// Push event to buffer (producer side)
    ///
    /// Returns `Ok(())` on success, or an `Err` carrying the cumulative drop
    /// count if the buffer is full. Never blocks; drops the event instead to
    /// preserve real-time performance.
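    ///
    /// A producer-side sketch of overflow handling (hypothetical call site;
    /// assumes the error type is printable):
    ///
    /// ```ignore
    /// if let Err(e) = buffer.push(event) {
    ///     // The error carries the cumulative drop count; log rather than block.
    ///     eprintln!("event dropped: {e}");
    /// }
    /// ```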
    #[inline(always)]
    pub fn push(&self, event: AllocationEvent) -> AsyncResult<()> {
        let write_pos = self.write_pos.load(Ordering::Relaxed);
        let next_write = (write_pos + 1) & self.mask;
        let read_pos = self.read_pos.load(Ordering::Acquire);

        if next_write == read_pos {
            // Buffer full - record drop and return error
            let dropped = self.dropped_events.fetch_add(1, Ordering::Relaxed) + 1;
            return Err(AsyncError::buffer_management(
                BufferType::AllocationEvents,
                "Ring buffer overflow - event dropped",
                Some(dropped),
            ));
        }

        // Safe write - only this thread writes to this position
        // Use UnsafeCell to get mutable access
        unsafe {
            let events_ptr = self.events.get();
            let event_ptr = (*events_ptr).as_mut_ptr().add(write_pos);
            std::ptr::write_volatile(event_ptr, event);
        }

        // Publish write with release ordering
        self.write_pos.store(next_write, Ordering::Release);
        Ok(())
    }

    /// Pop event from buffer (consumer side)
    ///
    /// Returns `Some(event)` if available, or `None` if the buffer is empty.
    /// Only called by the aggregator thread.
    pub fn pop(&self) -> Option<AllocationEvent> {
        let read_pos = self.read_pos.load(Ordering::Relaxed);
        let write_pos = self.write_pos.load(Ordering::Acquire);

        if read_pos == write_pos {
            return None; // Buffer empty
        }

        // Safe read - only this thread reads from this position
        let event = unsafe {
            let events_ptr = self.events.get();
            let event_ptr = (*events_ptr).as_ptr().add(read_pos);
            std::ptr::read_volatile(event_ptr)
        };

        // Advance read position
        let next_read = (read_pos + 1) & self.mask;
        self.read_pos.store(next_read, Ordering::Release);

        Some(event)
    }

    /// Get current number of events in buffer
    pub fn len(&self) -> usize {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        (write_pos.wrapping_sub(read_pos)) & self.mask
    }

    /// Check if buffer is empty
    pub fn is_empty(&self) -> bool {
        let write_pos = self.write_pos.load(Ordering::Acquire);
        let read_pos = self.read_pos.load(Ordering::Acquire);
        write_pos == read_pos
    }

    /// Get usable buffer capacity
    pub fn capacity(&self) -> usize {
        self.mask // One slot is reserved to distinguish full from empty
    }

    /// Get number of dropped events
    pub fn dropped_count(&self) -> usize {
        self.dropped_events.load(Ordering::Relaxed)
    }

    /// Reset dropped event counter (for testing)
    #[cfg(test)]
    pub fn reset_dropped_count(&self) {
        self.dropped_events.store(0, Ordering::Relaxed);
    }

    /// Drain all events from buffer
    ///
    /// Returns a vector of all current events, clearing the buffer.
    /// Used for bulk processing and testing.
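    ///
    /// A minimal bulk-processing sketch on the aggregator side (`buffer` is
    /// any `EventBuffer`):
    ///
    /// ```ignore
    /// let batch = buffer.drain();
    /// let bytes_allocated: usize = batch
    ///     .iter()
    ///     .filter(|e| e.is_allocation())
    ///     .map(|e| e.size)
    ///     .sum();
    /// ```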
    pub fn drain(&self) -> Vec<AllocationEvent> {
        let mut events = Vec::new();
        while let Some(event) = self.pop() {
            events.push(event);
        }
        events
    }
}

impl Default for EventBuffer {
    fn default() -> Self {
        Self::new()
    }
}

// Safe to share between threads because:
// - Only one thread (producer) writes to write_pos and events at write_pos
// - Only one thread (consumer) reads from read_pos and events at read_pos
// - Atomic operations ensure proper synchronization
unsafe impl Sync for EventBuffer {}
unsafe impl Send for EventBuffer {}

// Thread-local event buffer storage
//
// Each thread gets its own event buffer to avoid contention.
// Uses UnsafeCell for zero-overhead access from allocator hooks.
thread_local! {
    static THREAD_EVENT_BUFFER: UnsafeCell<EventBuffer> = UnsafeCell::new(EventBuffer::new());
}

/// Run a closure with a reference to the current thread's event buffer
///
/// Used by allocator hooks to record allocation events.
/// Safe because each thread has exclusive access to its own buffer.
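///
/// A minimal sketch (the closure only ever sees this thread's buffer):
///
/// ```ignore
/// let pending = with_thread_buffer(|buffer| buffer.len());
/// ```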
#[inline(always)]
pub fn with_thread_buffer<F, R>(f: F) -> R
where
    F: FnOnce(&EventBuffer) -> R,
{
    THREAD_EVENT_BUFFER.with(|buffer| {
        // Safe: thread-local access, no concurrent access possible
        let buffer_ref = unsafe { &*buffer.get() };
        f(buffer_ref)
    })
}

/// Record allocation event in current thread's buffer
///
/// High-performance path called from the global allocator hook.
/// Optimized for minimal overhead on the allocation fast path.
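///
/// A hook-side sketch (hypothetical values; a real hook would pass the
/// current task id and a TSC-derived timestamp):
///
/// ```ignore
/// // Record a 128-byte allocation for task 7; `true` marks an allocation.
/// let _ = record_allocation_event(7, 0x3000, 128, 42, true);
/// ```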
#[inline(always)]
pub fn record_allocation_event(
    task_id: TaskId,
    ptr: usize,
    size: usize,
    timestamp: u64,
    is_allocation: bool,
) -> AsyncResult<()> {
    let event = if is_allocation {
        AllocationEvent::allocation(task_id, ptr, size, timestamp)
    } else {
        AllocationEvent::deallocation(task_id, ptr, size, timestamp)
    };

    with_thread_buffer(|buffer| buffer.push(event))
}

/// Collect events from all thread buffers
///
/// Called by the aggregator to gather events for processing.
/// This is a simplified version; a production implementation would
/// maintain a registry of all thread buffers.
pub fn collect_all_events() -> Vec<AllocationEvent> {
    // In a real implementation, this would iterate over all thread buffers.
    // For now, just return events from the current thread.
    with_thread_buffer(|buffer| buffer.drain())
}

/// Get buffer statistics for monitoring
#[derive(Debug, Clone)]
pub struct BufferStats {
    /// Current number of events in buffer
    pub current_events: usize,
    /// Buffer capacity
    pub capacity: usize,
    /// Total events dropped due to overflow
    pub events_dropped: usize,
    /// Buffer utilization ratio (0.0 to 1.0)
    pub utilization: f64,
}

impl BufferStats {
    /// Get utilization warning level
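    ///
    /// A threshold sketch: utilization of 0.75 maps to "medium", 0.85 to
    /// "high", 0.95 to "critical", and anything lower to `None` (field
    /// values below are hypothetical):
    ///
    /// ```ignore
    /// let stats = BufferStats {
    ///     current_events: 950,
    ///     capacity: 1000,
    ///     events_dropped: 0,
    ///     utilization: 0.95,
    /// };
    /// assert_eq!(stats.warning_level(), Some("critical"));
    /// ```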
    pub fn warning_level(&self) -> Option<&'static str> {
        match self.utilization {
            u if u >= 0.95 => Some("critical"),
            u if u >= 0.85 => Some("high"),
            u if u >= 0.75 => Some("medium"),
            _ => None,
        }
    }
}

/// Get current thread buffer statistics
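///
/// A monitoring sketch (values depend on the calling thread's activity):
///
/// ```ignore
/// let stats = get_buffer_stats();
/// if let Some(level) = stats.warning_level() {
///     eprintln!("buffer at {:.0}% utilization ({level})", stats.utilization * 100.0);
/// }
/// ```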
pub fn get_buffer_stats() -> BufferStats {
    with_thread_buffer(|buffer| {
        let current_events = buffer.len();
        let capacity = buffer.capacity();
        let events_dropped = buffer.dropped_count();
        let utilization = current_events as f64 / capacity as f64;

        BufferStats {
            current_events,
            capacity,
            events_dropped,
            utilization,
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_allocation_event_creation() {
        let alloc_event = AllocationEvent::allocation(12345, 0x1000, 1024, 567890);
        assert_eq!(alloc_event.task_id, 12345);
        assert_eq!(alloc_event.ptr, 0x1000);
        assert_eq!(alloc_event.size, 1024);
        assert_eq!(alloc_event.timestamp, 567890);
        assert!(alloc_event.is_allocation());
        assert!(!alloc_event.is_deallocation());

        let dealloc_event = AllocationEvent::deallocation(12345, 0x1000, 1024, 567891);
        assert!(dealloc_event.is_deallocation());
        assert!(!dealloc_event.is_allocation());
    }

    #[test]
    fn test_event_buffer_basic_operations() {
        let buffer = EventBuffer::new_test();
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.dropped_count(), 0);

        let event = AllocationEvent::allocation(1, 0x1000, 100, 123);
        buffer.push(event).expect("Failed to push event");

        assert!(!buffer.is_empty());
        assert_eq!(buffer.len(), 1);

        let popped = buffer.pop().expect("Failed to pop event");
        assert_eq!(popped.task_id, 1);
        assert_eq!(popped.ptr, 0x1000);
        assert_eq!(popped.size, 100);

        assert!(buffer.is_empty());
        assert!(buffer.pop().is_none());
    }

    #[test]
    fn test_buffer_overflow_handling() {
        let buffer = EventBuffer::new_test();
        buffer.reset_dropped_count();

        // Fill buffer to capacity
        let capacity = buffer.capacity();
        for i in 0..capacity {
            let event = AllocationEvent::allocation(i as TaskId, i, 100, i as u64);
            buffer.push(event).expect("Failed to push event");
        }

        // Next push should fail with overflow
        let overflow_event = AllocationEvent::allocation(99999, 0x9999, 100, 99999);
        let result = buffer.push(overflow_event);
        assert!(result.is_err());
        assert_eq!(buffer.dropped_count(), 1);

        // Buffer should still be full
        assert_eq!(buffer.len(), capacity);

        // Pop one event and push should succeed again
        buffer.pop().expect("Failed to pop event");
        buffer
            .push(overflow_event)
            .expect("Failed to push after pop");
    }

    #[test]
    fn test_buffer_wraparound() {
        let buffer = EventBuffer::new_test();
        let capacity = buffer.capacity();

        // Fill and empty buffer multiple times to test wraparound
        for round in 0..3 {
            for i in 0..capacity / 2 {
                let event = AllocationEvent::allocation(
                    (round * 1000 + i) as TaskId,
                    i,
                    100,
                    (round * 1000 + i) as u64,
                );
                buffer.push(event).expect("Failed to push event");
            }

            let events = buffer.drain();
            assert_eq!(events.len(), capacity / 2);

            // Verify correct order
            for (i, event) in events.iter().enumerate() {
                assert_eq!(event.task_id, (round * 1000 + i) as TaskId);
            }
        }
    }

    #[test]
    fn test_thread_local_buffer() {
        use std::thread;

        // Test that each thread gets its own buffer
        let handle1 = thread::spawn(|| {
            record_allocation_event(1, 0x1000, 100, 123, true).expect("Failed to record");
            get_buffer_stats().current_events
        });

        let handle2 = thread::spawn(|| {
            record_allocation_event(2, 0x2000, 200, 456, true).expect("Failed to record");
            get_buffer_stats().current_events
        });

        assert_eq!(handle1.join().expect("Thread 1 panicked"), 1);
        assert_eq!(handle2.join().expect("Thread 2 panicked"), 1);

        // Main thread buffer should be empty
        assert_eq!(get_buffer_stats().current_events, 0);
    }

    #[test]
    fn test_buffer_stats() {
        with_thread_buffer(|buffer| buffer.reset_dropped_count());

        // Empty buffer
        let stats = get_buffer_stats();
        assert_eq!(stats.current_events, 0);
        assert_eq!(stats.utilization, 0.0);
        assert!(stats.warning_level().is_none());

        // Fill buffer to trigger warnings
        let capacity = get_buffer_stats().capacity;
        let high_fill = (capacity as f64 * 0.9) as usize;

        for i in 0..high_fill {
            record_allocation_event(i as TaskId, i, 100, i as u64, true).expect("Failed to record");
        }

        let stats = get_buffer_stats();
        assert!(stats.utilization >= 0.85);
        assert!(stats.warning_level().is_some());
    }

    #[test]
    fn test_concurrent_producer_consumer() {
        use std::sync::Arc;
        use std::thread;
        use std::time::Duration;

        let buffer = Arc::new(EventBuffer::new_test());
        let producer_buffer = Arc::clone(&buffer);
        let consumer_buffer = Arc::clone(&buffer);

        // Producer thread
        let producer = thread::spawn(move || {
            for i in 0..1000 {
                let event =
                    AllocationEvent::allocation(i as TaskId, (i * 1000) as usize, 100, i as u64);
                // Ignore overflow errors for this test
                let _ = producer_buffer.push(event);
            }
        });

        // Consumer thread
        let consumer = thread::spawn(move || {
            let mut consumed = 0;
            let mut last_task_id = None;

            for _ in 0..100 {
                while let Some(event) = consumer_buffer.pop() {
                    // Verify ordering within what we can consume
                    if let Some(last_id) = last_task_id {
                        assert!(event.task_id >= last_id);
                    }
                    last_task_id = Some(event.task_id);
                    consumed += 1;
                }
                thread::sleep(Duration::from_micros(10));
            }
            consumed
        });

        producer.join().expect("Producer thread panicked");
        let consumed = consumer.join().expect("Consumer thread panicked");

        // Should have consumed some events (exact number depends on timing)
        assert!(consumed > 0);
        assert!(consumed <= 1000);
    }
}