seq_runtime/
memory_stats.rs

1//! Cross-thread memory statistics registry
2//!
3//! Provides visibility into arena and pool memory usage across all worker threads.
4//! Each thread registers itself and updates its own slot with minimal overhead.
5//!
6//! # Design
7//!
8//! The challenge: Arena and pool are thread-local, but diagnostics runs on a
9//! separate signal handler thread. We solve this with a global registry where
10//! each thread has an exclusive slot for its stats.
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────┐
14//! │              MemoryStatsRegistry (global)               │
15//! ├─────────────────────────────────────────────────────────┤
16//! │ slots: [MemorySlot; MAX_THREADS]                        │
17//! │                                                         │
18//! │  ┌──────────────────┐  ┌──────────────────┐             │
19//! │  │ Slot 0 (Thread A)│  │ Slot 1 (Thread B)│  ...        │
20//! │  │ thread_id: u64   │  │ thread_id: u64   │             │
21//! │  │ arena_bytes: u64 │  │ arena_bytes: u64 │             │
22//! │  │ pool_free: u64   │  │ pool_free: u64   │             │
23//! │  │ pool_allocs: u64 │  │ pool_allocs: u64 │             │
24//! │  └──────────────────┘  └──────────────────┘             │
25//! └─────────────────────────────────────────────────────────┘
26//! ```
27//!
28//! # Performance
29//!
30//! - **Registration**: One-time CAS per thread (on first arena access)
31//! - **Updates**: Single atomic store per operation (~1-2 cycles, no contention)
32//! - **Reads**: Only during diagnostics (SIGQUIT), iterates all slots
33//!
34//! This maintains the "fast path stays fast" principle.
35
36use std::sync::OnceLock;
37use std::sync::atomic::{AtomicU64, Ordering};
38
/// Maximum number of worker threads we can track
/// May's default is typically fewer threads, but we allow headroom
/// (slots are never reclaimed, so this bounds lifetime registrations,
/// not just concurrent threads)
const MAX_THREADS: usize = 64;
42
/// Statistics for a single thread's memory usage
///
/// Exactly one thread (the registrant) writes a slot; the diagnostics
/// thread reads it concurrently. All fields are atomics so no locking
/// is needed on either side.
#[derive(Debug)]
pub struct MemorySlot {
    /// Thread ID (0 = slot is free; real IDs start at 1, see NEXT_THREAD_ID)
    pub thread_id: AtomicU64,
    /// Arena allocated bytes (last reported value, not a running total)
    pub arena_bytes: AtomicU64,
    /// Pool free node count (last reported value)
    pub pool_free_count: AtomicU64,
    /// Pool total capacity (last reported value)
    pub pool_capacity: AtomicU64,
    /// Total allocations from pool (lifetime counter)
    pub pool_allocations: AtomicU64,
}
57
impl MemorySlot {
    /// Construct an empty, unclaimed slot (`thread_id == 0` marks it free).
    /// `const` so slots could also be built in static context if needed.
    const fn new() -> Self {
        Self {
            thread_id: AtomicU64::new(0),
            arena_bytes: AtomicU64::new(0),
            pool_free_count: AtomicU64::new(0),
            pool_capacity: AtomicU64::new(0),
            pool_allocations: AtomicU64::new(0),
        }
    }
}
69
/// Global registry for cross-thread memory statistics
///
/// Holds a fixed number of slots allocated once at construction;
/// threads claim slots via CAS and keep them for the process lifetime.
pub struct MemoryStatsRegistry {
    // Fixed-size slot array; Box<[_]> drops the unused capacity word.
    slots: Box<[MemorySlot]>,
    /// Count of threads that couldn't get a slot
    pub overflow_count: AtomicU64,
}
76
77impl MemoryStatsRegistry {
78    /// Create a new registry with the given capacity
79    fn new(capacity: usize) -> Self {
80        let slots: Vec<MemorySlot> = (0..capacity).map(|_| MemorySlot::new()).collect();
81        Self {
82            slots: slots.into_boxed_slice(),
83            overflow_count: AtomicU64::new(0),
84        }
85    }
86
87    /// Register a thread and get its slot index
88    ///
89    /// Returns Some(index) if a slot was claimed, None if registry is full.
90    /// Uses the current thread's ID as the identifier.
91    pub fn register(&self) -> Option<usize> {
92        let thread_id = current_thread_id();
93
94        // Scan for a free slot
95        for (idx, slot) in self.slots.iter().enumerate() {
96            // Try to claim this slot (CAS from 0 to thread_id)
97            if slot
98                .thread_id
99                .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
100                .is_ok()
101            {
102                return Some(idx);
103            }
104        }
105
106        // Registry full
107        self.overflow_count.fetch_add(1, Ordering::Relaxed);
108        None
109    }
110
111    /// Update arena stats for a slot
112    ///
113    /// # Safety
114    /// Caller must own the slot (be the thread that registered it)
115    #[inline]
116    pub fn update_arena(&self, slot_idx: usize, arena_bytes: usize) {
117        if let Some(slot) = self.slots.get(slot_idx) {
118            slot.arena_bytes
119                .store(arena_bytes as u64, Ordering::Relaxed);
120        }
121    }
122
123    /// Update pool stats for a slot
124    ///
125    /// # Safety
126    /// Caller must own the slot (be the thread that registered it)
127    #[inline]
128    pub fn update_pool(&self, slot_idx: usize, free_count: usize, capacity: usize) {
129        if let Some(slot) = self.slots.get(slot_idx) {
130            slot.pool_free_count
131                .store(free_count as u64, Ordering::Relaxed);
132            slot.pool_capacity.store(capacity as u64, Ordering::Relaxed);
133        }
134    }
135
136    /// Increment pool allocation counter
137    #[inline]
138    pub fn increment_pool_allocations(&self, slot_idx: usize) {
139        if let Some(slot) = self.slots.get(slot_idx) {
140            slot.pool_allocations.fetch_add(1, Ordering::Relaxed);
141        }
142    }
143
144    /// Get aggregated memory statistics across all threads
145    pub fn aggregate_stats(&self) -> AggregateMemoryStats {
146        let mut total_arena_bytes: u64 = 0;
147        let mut total_pool_free: u64 = 0;
148        let mut total_pool_capacity: u64 = 0;
149        let mut total_pool_allocations: u64 = 0;
150        let mut active_threads: usize = 0;
151
152        for slot in self.slots.iter() {
153            let thread_id = slot.thread_id.load(Ordering::Acquire);
154            if thread_id > 0 {
155                active_threads += 1;
156                total_arena_bytes += slot.arena_bytes.load(Ordering::Relaxed);
157                total_pool_free += slot.pool_free_count.load(Ordering::Relaxed);
158                total_pool_capacity += slot.pool_capacity.load(Ordering::Relaxed);
159                total_pool_allocations += slot.pool_allocations.load(Ordering::Relaxed);
160            }
161        }
162
163        AggregateMemoryStats {
164            active_threads,
165            total_arena_bytes,
166            total_pool_free,
167            total_pool_capacity,
168            total_pool_allocations,
169            overflow_count: self.overflow_count.load(Ordering::Relaxed),
170        }
171    }
172
173    /// Iterate over per-thread statistics (for detailed diagnostics)
174    pub fn per_thread_stats(&self) -> impl Iterator<Item = ThreadMemoryStats> + '_ {
175        self.slots.iter().filter_map(|slot| {
176            let thread_id = slot.thread_id.load(Ordering::Acquire);
177            if thread_id > 0 {
178                Some(ThreadMemoryStats {
179                    thread_id,
180                    arena_bytes: slot.arena_bytes.load(Ordering::Relaxed),
181                    pool_free_count: slot.pool_free_count.load(Ordering::Relaxed),
182                    pool_capacity: slot.pool_capacity.load(Ordering::Relaxed),
183                    pool_allocations: slot.pool_allocations.load(Ordering::Relaxed),
184                })
185            } else {
186                None
187            }
188        })
189    }
190
191    /// Get registry capacity
192    pub fn capacity(&self) -> usize {
193        self.slots.len()
194    }
195}
196
/// Aggregated memory statistics across all threads
///
/// A point-in-time-ish snapshot produced by `aggregate_stats`; individual
/// fields are read independently and may not be mutually consistent.
#[derive(Debug, Clone, Copy)]
pub struct AggregateMemoryStats {
    /// Number of slots with a registered thread
    pub active_threads: usize,
    /// Sum of arena bytes across active slots
    pub total_arena_bytes: u64,
    /// Sum of pool free node counts across active slots
    pub total_pool_free: u64,
    /// Sum of pool capacities across active slots
    pub total_pool_capacity: u64,
    /// Sum of lifetime pool allocation counters across active slots
    pub total_pool_allocations: u64,
    /// Threads that failed to register because the registry was full
    pub overflow_count: u64,
}
207
/// Memory statistics for a single thread
///
/// Plain-value snapshot of one `MemorySlot`, yielded by `per_thread_stats`.
#[derive(Debug, Clone, Copy)]
pub struct ThreadMemoryStats {
    /// Registry-assigned thread ID (always >= 1)
    pub thread_id: u64,
    /// Arena allocated bytes as last reported
    pub arena_bytes: u64,
    /// Pool free node count as last reported
    pub pool_free_count: u64,
    /// Pool total capacity as last reported
    pub pool_capacity: u64,
    /// Lifetime pool allocation counter
    pub pool_allocations: u64,
}
217
/// Global counter for generating unique thread IDs
/// Starts at 1 because 0 means "empty slot"
static NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);

// Thread-local storage for this thread's unique ID.
// Lazily assigned on first access; IDs are never reused, even after the
// owning thread exits.
thread_local! {
    static THIS_THREAD_ID: u64 = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
}
226
227/// Get a unique ID for the current thread
228///
229/// Uses a global atomic counter to guarantee uniqueness (no hash collisions).
230/// Thread IDs start at 1 and increment monotonically.
231fn current_thread_id() -> u64 {
232    THIS_THREAD_ID.with(|&id| id)
233}
234
// Global registry instance (lazily initialized on first access)
static MEMORY_REGISTRY: OnceLock<MemoryStatsRegistry> = OnceLock::new();

/// Get the global memory stats registry
///
/// Initializes the registry with `MAX_THREADS` slots on first call;
/// every subsequent call returns the same instance.
pub fn memory_registry() -> &'static MemoryStatsRegistry {
    MEMORY_REGISTRY.get_or_init(|| MemoryStatsRegistry::new(MAX_THREADS))
}
242
// Thread-local slot index (cached after first registration).
// Starts as None; set by `get_or_register_slot` once a slot is claimed.
thread_local! {
    static SLOT_INDEX: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
}
247
248/// Get or register the current thread's slot index
249///
250/// Returns Some(index) if registered (or already was), None if registry is full.
251pub fn get_or_register_slot() -> Option<usize> {
252    SLOT_INDEX.with(|cell| {
253        if let Some(idx) = cell.get() {
254            Some(idx)
255        } else {
256            let idx = memory_registry().register();
257            cell.set(idx);
258            idx
259        }
260    })
261}
262
263/// Update arena stats for the current thread
264///
265/// Call this after arena operations to keep stats current.
266#[inline]
267pub fn update_arena_stats(arena_bytes: usize) {
268    if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
269        memory_registry().update_arena(idx, arena_bytes);
270    }
271}
272
273/// Update pool stats for the current thread
274#[inline]
275pub fn update_pool_stats(free_count: usize, capacity: usize) {
276    if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
277        memory_registry().update_pool(idx, free_count, capacity);
278    }
279}
280
281/// Increment pool allocation counter for the current thread
282#[inline]
283pub fn increment_pool_allocations() {
284    if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
285        memory_registry().increment_pool_allocations(idx);
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_registry_basic() {
        let registry = MemoryStatsRegistry::new(4);

        // Register should succeed
        let slot = registry.register();
        assert!(slot.is_some());
        let idx = slot.unwrap();

        // Update stats
        registry.update_arena(idx, 1024);
        registry.update_pool(idx, 10, 100);

        // Aggregate should reflect our updates
        let stats = registry.aggregate_stats();
        assert_eq!(stats.active_threads, 1);
        assert_eq!(stats.total_arena_bytes, 1024);
        assert_eq!(stats.total_pool_free, 10);
        assert_eq!(stats.total_pool_capacity, 100);
    }

    #[test]
    fn test_registry_overflow() {
        let registry = MemoryStatsRegistry::new(2);

        // `register` has no per-thread dedup — it simply claims the first
        // free slot — so a single thread can exhaust a small registry.
        assert!(registry.register().is_some());
        assert!(registry.register().is_some());

        // With both slots taken, the next registration must fail and be
        // recorded in the overflow counter.
        assert!(registry.register().is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_thread_local_slot() {
        // First call should register (or return cached if already registered)
        let slot1 = get_or_register_slot();

        // Second call should return same value (cached)
        let slot2 = get_or_register_slot();
        assert_eq!(slot1, slot2);

        // If registration succeeded, slot should be Some
        // If registry was full, slot is None (acceptable in parallel test execution)
        // Either way, the caching behavior is correct
    }

    #[test]
    fn test_update_helpers() {
        // Try to register (may fail if registry full from parallel tests)
        let slot = get_or_register_slot();

        if slot.is_some() {
            // Update stats
            update_arena_stats(2048);
            update_pool_stats(5, 50);
            increment_pool_allocations();
            increment_pool_allocations();

            // Verify via aggregate
            let stats = memory_registry().aggregate_stats();
            assert!(stats.total_arena_bytes >= 2048); // May have other test data
            assert!(stats.total_pool_allocations >= 2);
        }
        // If slot is None, registry was full - that's OK for this test
    }

    #[test]
    fn test_per_thread_stats() {
        // Try to register
        let slot = get_or_register_slot();

        if slot.is_some() {
            // Use a unique value to identify our thread's stats
            let unique_arena_bytes: usize = 999_777_555;
            update_arena_stats(unique_arena_bytes);

            // Should be able to iterate per-thread stats
            let per_thread: Vec<_> = memory_registry().per_thread_stats().collect();
            assert!(!per_thread.is_empty());

            // Find our thread's stats
            let our_stats = per_thread
                .iter()
                .find(|s| s.arena_bytes == unique_arena_bytes as u64);
            assert!(our_stats.is_some());
        }
        // If slot is None, registry was full - that's OK for this test
    }

    #[test]
    fn test_concurrent_registration() {
        use std::thread;

        // Spawn multiple threads that each register and update stats
        let handles: Vec<_> = (0..4)
            .map(|i| {
                thread::spawn(move || {
                    let slot = get_or_register_slot();
                    if slot.is_some() {
                        // Each thread sets a unique arena value
                        update_arena_stats(1000 * (i + 1));
                        update_pool_stats(i * 10, 100);
                        increment_pool_allocations();
                    }
                    slot.is_some()
                })
            })
            .collect();

        // Wait for all threads and count successful registrations
        let mut registered_count = 0;
        for h in handles {
            if h.join().unwrap() {
                registered_count += 1;
            }
        }

        // Verify aggregate stats reflect the registrations
        let stats = memory_registry().aggregate_stats();
        // active_threads includes all threads that have registered (including test threads)
        assert!(stats.active_threads >= registered_count);
        // If any threads registered, we should have some pool allocations
        if registered_count > 0 {
            assert!(stats.total_pool_allocations >= registered_count as u64);
        }
    }

    #[test]
    fn test_thread_ids_are_unique() {
        use std::collections::HashSet;
        use std::sync::{Arc, Mutex};
        use std::thread;

        let ids = Arc::new(Mutex::new(HashSet::new()));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let ids = Arc::clone(&ids);
                thread::spawn(move || {
                    // Each spawned thread gets a fresh thread-local, so each
                    // draws a fresh ID from the global counter.
                    let id = current_thread_id();
                    ids.lock().unwrap().insert(id);
                    id
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All thread IDs should be unique
        let unique_count = ids.lock().unwrap().len();
        assert_eq!(unique_count, 8, "Thread IDs should be unique");
    }
}