seq_runtime/memory_stats.rs

//! Cross-thread memory statistics registry
//!
//! Provides visibility into arena memory usage across all worker threads.
//! Each thread registers itself and updates its own slot with minimal overhead.
//!
//! # Design
//!
//! The challenge: Arena is thread-local, but diagnostics runs on a
//! separate signal handler thread. We solve this with a global registry where
//! each thread has an exclusive slot for its stats.
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────┐
//! │              MemoryStatsRegistry (global)               │
//! ├─────────────────────────────────────────────────────────┤
//! │ slots: [MemorySlot; MAX_THREADS]                        │
//! │                                                         │
//! │  ┌──────────────────┐  ┌──────────────────┐             │
//! │  │ Slot 0 (Thread A)│  │ Slot 1 (Thread B)│  ...        │
//! │  │ thread_id: u64   │  │ thread_id: u64   │             │
//! │  │ arena_bytes: u64 │  │ arena_bytes: u64 │             │
//! │  └──────────────────┘  └──────────────────┘             │
//! └─────────────────────────────────────────────────────────┘
//! ```
//!
//! # Performance
//!
//! - **Registration**: One-time CAS per thread (on first arena access)
//! - **Updates**: Single atomic store per operation (~1-2 cycles, no contention)
//! - **Reads**: Only during diagnostics (SIGQUIT), iterates all slots
//!
//! This maintains the "fast path stays fast" principle.
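//!
//! # Usage
//!
//! A minimal, illustrative flow from a worker thread (assuming the module is
//! reachable as `seq_runtime::memory_stats`):
//!
//! ```ignore
//! use seq_runtime::memory_stats::{get_or_register_slot, update_arena_stats};
//!
//! // One-time CAS on first arena access claims this thread's slot.
//! get_or_register_slot();
//!
//! // After each arena grow/reset, publish the new size with one atomic store.
//! update_arena_stats(4096);
//! ```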

use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};

/// Maximum number of worker threads we can track.
/// May's default worker-thread count is typically lower, but we allow headroom.
const MAX_THREADS: usize = 64;

/// Statistics for a single thread's memory usage
#[derive(Debug)]
pub struct MemorySlot {
    /// Thread ID (0 = slot is free)
    pub thread_id: AtomicU64,
    /// Arena allocated bytes
    pub arena_bytes: AtomicU64,
}

impl MemorySlot {
    const fn new() -> Self {
        Self {
            thread_id: AtomicU64::new(0),
            arena_bytes: AtomicU64::new(0),
        }
    }
}

/// Global registry for cross-thread memory statistics
pub struct MemoryStatsRegistry {
    slots: Box<[MemorySlot]>,
    /// Count of threads that couldn't get a slot
    pub overflow_count: AtomicU64,
}

impl MemoryStatsRegistry {
    /// Create a new registry with the given capacity
    fn new(capacity: usize) -> Self {
        let slots: Vec<MemorySlot> = (0..capacity).map(|_| MemorySlot::new()).collect();
        Self {
            slots: slots.into_boxed_slice(),
            overflow_count: AtomicU64::new(0),
        }
    }

    /// Register a thread and get its slot index
    ///
    /// Returns Some(index) if a slot was claimed, None if registry is full.
    /// Uses the current thread's ID as the identifier.
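    ///
    /// A hedged sketch of direct use (most callers should prefer
    /// `get_or_register_slot`, which caches the returned index per thread):
    ///
    /// ```ignore
    /// match memory_registry().register() {
    ///     Some(idx) => memory_registry().update_arena(idx, 0),
    ///     None => { /* registry full; overflow_count was incremented */ }
    /// }
    /// ```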
    pub fn register(&self) -> Option<usize> {
        let thread_id = current_thread_id();

        // Scan for a free slot
        for (idx, slot) in self.slots.iter().enumerate() {
            // Try to claim this slot (CAS from 0 to thread_id)
            if slot
                .thread_id
                .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                return Some(idx);
            }
        }

        // Registry full
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        None
    }

    /// Update arena stats for a slot
    ///
    /// This is safe to call with any index, but the caller should own the slot
    /// (i.e. be the thread that registered it); otherwise stats are misattributed.
    #[inline]
    pub fn update_arena(&self, slot_idx: usize, arena_bytes: usize) {
        if let Some(slot) = self.slots.get(slot_idx) {
            slot.arena_bytes
                .store(arena_bytes as u64, Ordering::Relaxed);
        }
    }

    /// Get aggregated memory statistics across all threads
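    ///
    /// A diagnostics-side sketch (e.g. from the SIGQUIT handler):
    ///
    /// ```ignore
    /// let stats = memory_registry().aggregate_stats();
    /// eprintln!(
    ///     "{} threads, {} arena bytes, {} overflowed registrations",
    ///     stats.active_threads, stats.total_arena_bytes, stats.overflow_count,
    /// );
    /// ```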
    pub fn aggregate_stats(&self) -> AggregateMemoryStats {
        let mut total_arena_bytes: u64 = 0;
        let mut active_threads: usize = 0;

        for slot in self.slots.iter() {
            let thread_id = slot.thread_id.load(Ordering::Acquire);
            if thread_id > 0 {
                active_threads += 1;
                total_arena_bytes += slot.arena_bytes.load(Ordering::Relaxed);
            }
        }

        AggregateMemoryStats {
            active_threads,
            total_arena_bytes,
            overflow_count: self.overflow_count.load(Ordering::Relaxed),
        }
    }

    /// Iterate over per-thread statistics (for detailed diagnostics)
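    ///
    /// Sketch of a per-thread dump (output format is illustrative):
    ///
    /// ```ignore
    /// for t in memory_registry().per_thread_stats() {
    ///     eprintln!("thread {}: {} arena bytes", t.thread_id, t.arena_bytes);
    /// }
    /// ```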
    pub fn per_thread_stats(&self) -> impl Iterator<Item = ThreadMemoryStats> + '_ {
        self.slots.iter().filter_map(|slot| {
            let thread_id = slot.thread_id.load(Ordering::Acquire);
            if thread_id > 0 {
                Some(ThreadMemoryStats {
                    thread_id,
                    arena_bytes: slot.arena_bytes.load(Ordering::Relaxed),
                })
            } else {
                None
            }
        })
    }

    /// Get registry capacity
    pub fn capacity(&self) -> usize {
        self.slots.len()
    }
}

/// Aggregated memory statistics across all threads
#[derive(Debug, Clone, Copy)]
pub struct AggregateMemoryStats {
    pub active_threads: usize,
    pub total_arena_bytes: u64,
    pub overflow_count: u64,
}

/// Memory statistics for a single thread
#[derive(Debug, Clone, Copy)]
pub struct ThreadMemoryStats {
    pub thread_id: u64,
    pub arena_bytes: u64,
}

/// Global counter for generating unique thread IDs
/// Starts at 1 because 0 means "empty slot"
static NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);

// Thread-local storage for this thread's unique ID
thread_local! {
    static THIS_THREAD_ID: u64 = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
}

/// Get a unique ID for the current thread
///
/// Uses a global atomic counter to guarantee uniqueness (no hash collisions).
/// Thread IDs start at 1 and increment monotonically.
fn current_thread_id() -> u64 {
    THIS_THREAD_ID.with(|&id| id)
}

// Global registry instance
static MEMORY_REGISTRY: OnceLock<MemoryStatsRegistry> = OnceLock::new();

/// Get the global memory stats registry
pub fn memory_registry() -> &'static MemoryStatsRegistry {
    MEMORY_REGISTRY.get_or_init(|| MemoryStatsRegistry::new(MAX_THREADS))
}

// Thread-local slot index (cached after first registration)
thread_local! {
    static SLOT_INDEX: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
}

/// Get or register the current thread's slot index
///
/// Returns Some(index) if registered (or already was), None if registry is full.
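///
/// Caching sketch: repeated calls from the same thread return the same index
/// without rescanning the registry.
///
/// ```ignore
/// let first = get_or_register_slot();
/// let second = get_or_register_slot(); // cached in a thread-local Cell
/// assert_eq!(first, second);
/// ```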
pub fn get_or_register_slot() -> Option<usize> {
    SLOT_INDEX.with(|cell| {
        if let Some(idx) = cell.get() {
            Some(idx)
        } else {
            // Not yet registered: claim a slot and cache the index. A failed
            // registration is not cached, so when the registry is full this
            // retries (and bumps overflow_count) on every call.
            let idx = memory_registry().register();
            cell.set(idx);
            idx
        }
    })
}

/// Update arena stats for the current thread
///
/// Call this after arena operations to keep stats current.
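///
/// A hot-path sketch; `arena.allocated_bytes()` is a hypothetical accessor
/// standing in for however the caller tracks its arena size:
///
/// ```ignore
/// get_or_register_slot(); // one-time registration, cheap after the first call
/// update_arena_stats(arena.allocated_bytes()); // single relaxed store
/// ```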
#[inline]
pub fn update_arena_stats(arena_bytes: usize) {
    if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
        memory_registry().update_arena(idx, arena_bytes);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_registry_basic() {
        let registry = MemoryStatsRegistry::new(4);

        // Register should succeed
        let slot = registry.register();
        assert!(slot.is_some());
        let idx = slot.unwrap();

        // Update stats
        registry.update_arena(idx, 1024);

        // Aggregate should reflect our updates
        let stats = registry.aggregate_stats();
        assert_eq!(stats.active_threads, 1);
        assert_eq!(stats.total_arena_bytes, 1024);
    }

    #[test]
    fn test_registry_overflow() {
        let registry = MemoryStatsRegistry::new(2);

        // Fill the registry. Each `register` call claims a fresh slot via
        // CAS, so two calls even from the same thread consume both slots.
        assert!(registry.register().is_some());
        assert!(registry.register().is_some());

        // Third registration fails: every slot already holds a non-zero
        // thread_id, so no CAS succeeds and the overflow counter is bumped.
        assert!(registry.register().is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_thread_local_slot() {
        // First call should register (or return cached if already registered)
        let slot1 = get_or_register_slot();

        // Second call should return the same value (cached)
        let slot2 = get_or_register_slot();
        assert_eq!(slot1, slot2);

        // If registration succeeded, slot is Some; if the registry was full,
        // slot is None (acceptable under parallel test execution). Either way,
        // the caching behavior is correct.
    }

    #[test]
    fn test_update_helpers() {
        // Try to register (may fail if the registry is full from parallel tests)
        let slot = get_or_register_slot();

        if slot.is_some() {
            // Update stats
            update_arena_stats(2048);

            // Verify via aggregate
            let stats = memory_registry().aggregate_stats();
            assert!(stats.total_arena_bytes >= 2048); // May include other tests' data
        }
        // If slot is None, the registry was full - that's OK for this test
    }

    #[test]
    fn test_per_thread_stats() {
        // Try to register
        let slot = get_or_register_slot();

        if slot.is_some() {
            // Use a unique value to identify our thread's stats
            let unique_arena_bytes: usize = 999_777_555;
            update_arena_stats(unique_arena_bytes);

            // Should be able to iterate per-thread stats
            let per_thread: Vec<_> = memory_registry().per_thread_stats().collect();
            assert!(!per_thread.is_empty());

            // Find our thread's stats
            let our_stats = per_thread
                .iter()
                .find(|s| s.arena_bytes == unique_arena_bytes as u64);
            assert!(our_stats.is_some());
        }
        // If slot is None, the registry was full - that's OK for this test
    }

    #[test]
    fn test_concurrent_registration() {
        use std::thread;

        // Spawn multiple threads that each register and update stats
        let handles: Vec<_> = (0..4)
            .map(|i| {
                thread::spawn(move || {
                    let slot = get_or_register_slot();
                    if slot.is_some() {
                        // Each thread sets a unique arena value
                        update_arena_stats(1000 * (i + 1));
                    }
                    slot.is_some()
                })
            })
            .collect();

        // Wait for all threads and count successful registrations
        let mut registered_count = 0;
        for h in handles {
            if h.join().unwrap() {
                registered_count += 1;
            }
        }

        // Verify aggregate stats reflect the registrations
        let stats = memory_registry().aggregate_stats();
        // active_threads includes all threads that have registered (including test threads)
        assert!(stats.active_threads >= registered_count);
    }

    #[test]
    fn test_thread_ids_are_unique() {
        use std::collections::HashSet;
        use std::sync::{Arc, Mutex};
        use std::thread;

        let ids = Arc::new(Mutex::new(HashSet::new()));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let ids = Arc::clone(&ids);
                thread::spawn(move || {
                    let id = current_thread_id();
                    ids.lock().unwrap().insert(id);
                    id
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All thread IDs should be unique
        let unique_count = ids.lock().unwrap().len();
        assert_eq!(unique_count, 8, "Thread IDs should be unique");
    }
}