Skip to main content

seq_core/
memory_stats.rs

1//! Cross-thread memory statistics registry
2//!
3//! Provides visibility into arena memory usage across all worker threads.
4//! Each thread registers itself and updates its own slot with minimal overhead.
5//!
6//! # Design
7//!
8//! The challenge: Arena is thread-local, but diagnostics runs on a
9//! separate signal handler thread. We solve this with a global registry where
10//! each thread has an exclusive slot for its stats.
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────┐
14//! │              MemoryStatsRegistry (global)               │
15//! ├─────────────────────────────────────────────────────────┤
16//! │ slots: [MemorySlot; MAX_THREADS]                        │
17//! │                                                         │
18//! │  ┌──────────────────┐  ┌──────────────────┐             │
19//! │  │ Slot 0 (Thread A)│  │ Slot 1 (Thread B)│  ...        │
20//! │  │ thread_id: u64   │  │ thread_id: u64   │             │
21//! │  │ arena_bytes: u64 │  │ arena_bytes: u64 │             │
22//! │  └──────────────────┘  └──────────────────┘             │
23//! └─────────────────────────────────────────────────────────┘
24//! ```
25//!
26//! # Performance
27//!
28//! - **Registration**: One-time CAS per thread (on first arena access)
29//! - **Updates**: Single atomic store per operation (~1-2 cycles, no contention)
30//! - **Reads**: Only during diagnostics (SIGQUIT), iterates all slots
31//!
32//! This maintains the "fast path stays fast" principle.
33
34use std::sync::OnceLock;
35use std::sync::atomic::{AtomicU64, Ordering};
36
37/// Maximum number of worker threads we can track
38/// May's default is typically fewer threads, but we allow headroom
39const MAX_THREADS: usize = 64;
40
41/// Statistics for a single thread's memory usage
42#[derive(Debug)]
43pub struct MemorySlot {
44    /// Thread ID (0 = slot is free)
45    pub thread_id: AtomicU64,
46    /// Arena allocated bytes
47    pub arena_bytes: AtomicU64,
48    /// Peak arena allocated bytes (high-water mark)
49    pub peak_arena_bytes: AtomicU64,
50}
51
52impl MemorySlot {
53    const fn new() -> Self {
54        Self {
55            thread_id: AtomicU64::new(0),
56            arena_bytes: AtomicU64::new(0),
57            peak_arena_bytes: AtomicU64::new(0),
58        }
59    }
60}
61
62/// Aggregated memory statistics across all threads
63#[derive(Debug, Clone, Copy)]
64pub struct AggregateMemoryStats {
65    pub active_threads: usize,
66    pub total_arena_bytes: u64,
67    pub total_peak_arena_bytes: u64,
68    pub overflow_count: u64,
69}
70
71/// Global registry for cross-thread memory statistics
72pub struct MemoryStatsRegistry {
73    slots: Box<[MemorySlot]>,
74    /// Count of threads that couldn't get a slot
75    pub overflow_count: AtomicU64,
76}
77
78impl MemoryStatsRegistry {
79    /// Create a new registry with the given capacity
80    fn new(capacity: usize) -> Self {
81        let slots: Vec<MemorySlot> = (0..capacity).map(|_| MemorySlot::new()).collect();
82        Self {
83            slots: slots.into_boxed_slice(),
84            overflow_count: AtomicU64::new(0),
85        }
86    }
87
88    /// Register a thread and get its slot index
89    ///
90    /// Returns Some(index) if a slot was claimed, None if registry is full.
91    /// Uses the current thread's ID as the identifier.
92    fn register(&self) -> Option<usize> {
93        let thread_id = current_thread_id();
94
95        // Scan for a free slot
96        for (idx, slot) in self.slots.iter().enumerate() {
97            // Try to claim this slot (CAS from 0 to thread_id)
98            if slot
99                .thread_id
100                .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
101                .is_ok()
102            {
103                return Some(idx);
104            }
105        }
106
107        // Registry full
108        self.overflow_count.fetch_add(1, Ordering::Relaxed);
109        None
110    }
111
112    /// Update arena stats for a slot
113    ///
114    /// # Safety
115    /// Caller must own the slot (be the thread that registered it)
116    #[inline]
117    fn update_arena(&self, slot_idx: usize, arena_bytes: usize) {
118        if let Some(slot) = self.slots.get(slot_idx) {
119            let bytes = arena_bytes as u64;
120            slot.arena_bytes.store(bytes, Ordering::Relaxed);
121
122            // Update peak via CAS loop (same pattern as PEAK_STRANDS in scheduler.rs)
123            let mut peak = slot.peak_arena_bytes.load(Ordering::Relaxed);
124            while bytes > peak {
125                match slot.peak_arena_bytes.compare_exchange_weak(
126                    peak,
127                    bytes,
128                    Ordering::Relaxed,
129                    Ordering::Relaxed,
130                ) {
131                    Ok(_) => break,
132                    Err(current) => peak = current,
133                }
134            }
135        }
136    }
137
138    /// Get aggregated memory statistics across all threads
139    pub fn aggregate_stats(&self) -> AggregateMemoryStats {
140        let mut total_arena_bytes: u64 = 0;
141        let mut total_peak_arena_bytes: u64 = 0;
142        let mut active_threads: usize = 0;
143
144        for slot in self.slots.iter() {
145            let thread_id = slot.thread_id.load(Ordering::Acquire);
146            if thread_id > 0 {
147                active_threads += 1;
148                total_arena_bytes += slot.arena_bytes.load(Ordering::Relaxed);
149                total_peak_arena_bytes += slot.peak_arena_bytes.load(Ordering::Relaxed);
150            }
151        }
152
153        AggregateMemoryStats {
154            active_threads,
155            total_arena_bytes,
156            total_peak_arena_bytes,
157            overflow_count: self.overflow_count.load(Ordering::Relaxed),
158        }
159    }
160}
161
162/// Global counter for generating unique thread IDs
163/// Starts at 1 because 0 means "empty slot"
164static NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);
165
166// Thread-local storage for this thread's unique ID
167thread_local! {
168    static THIS_THREAD_ID: u64 = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
169}
170
171/// Get a unique ID for the current thread
172///
173/// Uses a global atomic counter to guarantee uniqueness (no hash collisions).
174/// Thread IDs start at 1 and increment monotonically.
175fn current_thread_id() -> u64 {
176    THIS_THREAD_ID.with(|&id| id)
177}
178
179// Global registry instance
180static MEMORY_REGISTRY: OnceLock<MemoryStatsRegistry> = OnceLock::new();
181
182/// Get the global memory stats registry
183pub fn memory_registry() -> &'static MemoryStatsRegistry {
184    MEMORY_REGISTRY.get_or_init(|| MemoryStatsRegistry::new(MAX_THREADS))
185}
186
187// Thread-local slot index (cached after first registration)
188thread_local! {
189    static SLOT_INDEX: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
190}
191
192/// Get or register the current thread's slot index
193///
194/// Returns Some(index) if registered (or already was), None if registry is full.
195pub fn get_or_register_slot() -> Option<usize> {
196    SLOT_INDEX.with(|cell| {
197        if let Some(idx) = cell.get() {
198            Some(idx)
199        } else {
200            let idx = memory_registry().register();
201            cell.set(idx);
202            idx
203        }
204    })
205}
206
207/// Update arena stats for the current thread
208///
209/// Call this after arena operations to keep stats current.
210#[inline]
211pub fn update_arena_stats(arena_bytes: usize) {
212    if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
213        memory_registry().update_arena(idx, arena_bytes);
214    }
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220
221    #[test]
222    fn test_registry_basic() {
223        let registry = MemoryStatsRegistry::new(4);
224
225        // Register should succeed
226        let slot = registry.register();
227        assert!(slot.is_some());
228        let idx = slot.unwrap();
229
230        // Update stats
231        registry.update_arena(idx, 1024);
232
233        // Aggregate should reflect our updates
234        let stats = registry.aggregate_stats();
235        assert_eq!(stats.active_threads, 1);
236        assert_eq!(stats.total_arena_bytes, 1024);
237    }
238
239    #[test]
240    fn test_registry_overflow() {
241        let registry = MemoryStatsRegistry::new(2);
242
243        // Fill up the registry from different "threads" (simulated)
244        // Note: In real usage, each thread gets one slot
245        // Here we just test the CAS logic
246        assert!(registry.register().is_some());
247        assert!(registry.register().is_some());
248
249        // Third registration should fail (we're on the same thread, so it won't
250        // actually fail - but if we had 3 threads, the 3rd would fail)
251        // For now, just verify overflow_count is accessible
252        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 0);
253    }
254
255    #[test]
256    fn test_thread_local_slot() {
257        // First call should register (or return cached if already registered)
258        let slot1 = get_or_register_slot();
259
260        // Second call should return same value (cached)
261        let slot2 = get_or_register_slot();
262        assert_eq!(slot1, slot2);
263
264        // If registration succeeded, slot should be Some
265        // If registry was full, slot is None (acceptable in parallel test execution)
266        // Either way, the caching behavior is correct
267    }
268
269    #[test]
270    fn test_update_helpers() {
271        // Try to register (may fail if registry full from parallel tests)
272        let slot = get_or_register_slot();
273
274        if slot.is_some() {
275            // Update stats
276            update_arena_stats(2048);
277
278            // Verify via aggregate
279            let stats = memory_registry().aggregate_stats();
280            assert!(stats.total_arena_bytes >= 2048); // May have other test data
281        }
282        // If slot is None, registry was full - that's OK for this test
283    }
284
285    #[test]
286    fn test_concurrent_registration() {
287        use std::thread;
288
289        // Spawn multiple threads that each register and update stats
290        let handles: Vec<_> = (0..4)
291            .map(|i| {
292                thread::spawn(move || {
293                    let slot = get_or_register_slot();
294                    if slot.is_some() {
295                        // Each thread sets a unique arena value
296                        update_arena_stats(1000 * (i + 1));
297                    }
298                    slot.is_some()
299                })
300            })
301            .collect();
302
303        // Wait for all threads and count successful registrations
304        let mut registered_count = 0;
305        for h in handles {
306            if h.join().unwrap() {
307                registered_count += 1;
308            }
309        }
310
311        // Verify aggregate stats reflect the registrations
312        let stats = memory_registry().aggregate_stats();
313        // active_threads includes all threads that have registered (including test threads)
314        assert!(stats.active_threads >= registered_count);
315    }
316
317    #[test]
318    fn test_thread_ids_are_unique() {
319        use std::collections::HashSet;
320        use std::sync::{Arc, Mutex};
321        use std::thread;
322
323        let ids = Arc::new(Mutex::new(HashSet::new()));
324
325        let handles: Vec<_> = (0..8)
326            .map(|_| {
327                let ids = Arc::clone(&ids);
328                thread::spawn(move || {
329                    let id = current_thread_id();
330                    ids.lock().unwrap().insert(id);
331                    id
332                })
333            })
334            .collect();
335
336        for h in handles {
337            h.join().unwrap();
338        }
339
340        // All thread IDs should be unique
341        let unique_count = ids.lock().unwrap().len();
342        assert_eq!(unique_count, 8, "Thread IDs should be unique");
343    }
344}