// seq_core/memory_stats.rs
//! Cross-thread memory statistics registry
//!
//! Provides visibility into arena memory usage across all worker threads.
//! Each thread registers itself and updates its own slot with minimal overhead.
//!
//! # Design
//!
//! The challenge: Arena is thread-local, but diagnostics runs on a
//! separate signal handler thread. We solve this with a global registry where
//! each thread has an exclusive slot for its stats.
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────┐
//! │ MemoryStatsRegistry (global)                            │
//! ├─────────────────────────────────────────────────────────┤
//! │ slots: [MemorySlot; MAX_THREADS]                        │
//! │                                                         │
//! │ ┌──────────────────┐ ┌──────────────────┐               │
//! │ │ Slot 0 (Thread A)│ │ Slot 1 (Thread B)│ ...           │
//! │ │ thread_id: u64   │ │ thread_id: u64   │               │
//! │ │ arena_bytes: u64 │ │ arena_bytes: u64 │               │
//! │ └──────────────────┘ └──────────────────┘               │
//! └─────────────────────────────────────────────────────────┘
//! ```
//!
//! # Performance
//!
//! - **Registration**: One-time CAS per thread (on first arena access)
//! - **Updates**: Single atomic store per operation (~1-2 cycles, no contention)
//! - **Reads**: Only during diagnostics (SIGQUIT), iterates all slots
//!
//! This maintains the "fast path stays fast" principle.

use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};

/// Maximum number of worker threads we can track.
/// May's default is typically fewer threads, but we allow headroom.
const MAX_THREADS: usize = 64;

/// Statistics for a single thread's memory usage.
///
/// Every field is an independent atomic: the owning thread publishes updates
/// with plain stores while the diagnostics thread reads concurrently.
#[derive(Debug)]
pub struct MemorySlot {
    /// Thread ID (0 = slot is free)
    pub thread_id: AtomicU64,
    /// Arena allocated bytes
    pub arena_bytes: AtomicU64,
    /// Peak arena allocated bytes (high-water mark)
    pub peak_arena_bytes: AtomicU64,
}

impl MemorySlot {
    /// Create an empty, unclaimed slot (`thread_id == 0` marks it free).
    /// `const` so slots could also live in statics if ever needed.
    const fn new() -> Self {
        Self {
            thread_id: AtomicU64::new(0),
            arena_bytes: AtomicU64::new(0),
            peak_arena_bytes: AtomicU64::new(0),
        }
    }
}

62/// Global registry for cross-thread memory statistics
63pub struct MemoryStatsRegistry {
64 slots: Box<[MemorySlot]>,
65 /// Count of threads that couldn't get a slot
66 pub overflow_count: AtomicU64,
67}
68
69impl MemoryStatsRegistry {
70 /// Create a new registry with the given capacity
71 fn new(capacity: usize) -> Self {
72 let slots: Vec<MemorySlot> = (0..capacity).map(|_| MemorySlot::new()).collect();
73 Self {
74 slots: slots.into_boxed_slice(),
75 overflow_count: AtomicU64::new(0),
76 }
77 }
78
79 /// Register a thread and get its slot index
80 ///
81 /// Returns Some(index) if a slot was claimed, None if registry is full.
82 /// Uses the current thread's ID as the identifier.
83 pub fn register(&self) -> Option<usize> {
84 let thread_id = current_thread_id();
85
86 // Scan for a free slot
87 for (idx, slot) in self.slots.iter().enumerate() {
88 // Try to claim this slot (CAS from 0 to thread_id)
89 if slot
90 .thread_id
91 .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
92 .is_ok()
93 {
94 return Some(idx);
95 }
96 }
97
98 // Registry full
99 self.overflow_count.fetch_add(1, Ordering::Relaxed);
100 None
101 }
102
103 /// Update arena stats for a slot
104 ///
105 /// # Safety
106 /// Caller must own the slot (be the thread that registered it)
107 #[inline]
108 pub fn update_arena(&self, slot_idx: usize, arena_bytes: usize) {
109 if let Some(slot) = self.slots.get(slot_idx) {
110 let bytes = arena_bytes as u64;
111 slot.arena_bytes.store(bytes, Ordering::Relaxed);
112
113 // Update peak via CAS loop (same pattern as PEAK_STRANDS in scheduler.rs)
114 let mut peak = slot.peak_arena_bytes.load(Ordering::Relaxed);
115 while bytes > peak {
116 match slot.peak_arena_bytes.compare_exchange_weak(
117 peak,
118 bytes,
119 Ordering::Relaxed,
120 Ordering::Relaxed,
121 ) {
122 Ok(_) => break,
123 Err(current) => peak = current,
124 }
125 }
126 }
127 }
128
129 /// Get aggregated memory statistics across all threads
130 pub fn aggregate_stats(&self) -> AggregateMemoryStats {
131 let mut total_arena_bytes: u64 = 0;
132 let mut total_peak_arena_bytes: u64 = 0;
133 let mut active_threads: usize = 0;
134
135 for slot in self.slots.iter() {
136 let thread_id = slot.thread_id.load(Ordering::Acquire);
137 if thread_id > 0 {
138 active_threads += 1;
139 total_arena_bytes += slot.arena_bytes.load(Ordering::Relaxed);
140 total_peak_arena_bytes += slot.peak_arena_bytes.load(Ordering::Relaxed);
141 }
142 }
143
144 AggregateMemoryStats {
145 active_threads,
146 total_arena_bytes,
147 total_peak_arena_bytes,
148 overflow_count: self.overflow_count.load(Ordering::Relaxed),
149 }
150 }
151
152 /// Get registry capacity
153 pub fn capacity(&self) -> usize {
154 self.slots.len()
155 }
156}
157
/// Aggregated memory statistics across all threads.
#[derive(Debug, Clone, Copy)]
pub struct AggregateMemoryStats {
    /// Number of slots currently claimed.
    pub active_threads: usize,
    /// Sum of current arena bytes across all claimed slots.
    pub total_arena_bytes: u64,
    /// Sum of per-thread peak arena bytes (not a global simultaneous peak).
    pub total_peak_arena_bytes: u64,
    /// Number of threads that failed to claim a slot.
    pub overflow_count: u64,
}

/// Global counter for generating unique thread IDs.
/// Starts at 1 because 0 means "empty slot".
static NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);

// Thread-local storage for this thread's unique ID, assigned lazily on
// first access from the global counter.
thread_local! {
    static THIS_THREAD_ID: u64 = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
}

/// Get a unique ID for the current thread.
///
/// Uses a global atomic counter to guarantee uniqueness (no hash collisions).
/// Thread IDs start at 1 and increment monotonically; IDs are never reused,
/// even after a thread exits.
fn current_thread_id() -> u64 {
    THIS_THREAD_ID.with(|&id| id)
}

184// Global registry instance
185static MEMORY_REGISTRY: OnceLock<MemoryStatsRegistry> = OnceLock::new();
186
187/// Get the global memory stats registry
188pub fn memory_registry() -> &'static MemoryStatsRegistry {
189 MEMORY_REGISTRY.get_or_init(|| MemoryStatsRegistry::new(MAX_THREADS))
190}
191
192// Thread-local slot index (cached after first registration)
193thread_local! {
194 static SLOT_INDEX: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
195}
196
197/// Get or register the current thread's slot index
198///
199/// Returns Some(index) if registered (or already was), None if registry is full.
200pub fn get_or_register_slot() -> Option<usize> {
201 SLOT_INDEX.with(|cell| {
202 if let Some(idx) = cell.get() {
203 Some(idx)
204 } else {
205 let idx = memory_registry().register();
206 cell.set(idx);
207 idx
208 }
209 })
210}
211
212/// Update arena stats for the current thread
213///
214/// Call this after arena operations to keep stats current.
215#[inline]
216pub fn update_arena_stats(arena_bytes: usize) {
217 if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
218 memory_registry().update_arena(idx, arena_bytes);
219 }
220}
221
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_registry_basic() {
        let registry = MemoryStatsRegistry::new(4);

        // Register should succeed
        let slot = registry.register();
        assert!(slot.is_some());
        let idx = slot.unwrap();

        // Update stats
        registry.update_arena(idx, 1024);

        // Aggregate should reflect our updates
        let stats = registry.aggregate_stats();
        assert_eq!(stats.active_threads, 1);
        assert_eq!(stats.total_arena_bytes, 1024);
    }

    #[test]
    fn test_registry_overflow() {
        let registry = MemoryStatsRegistry::new(2);

        // Fill up the registry from different "threads" (simulated)
        // Note: In real usage, each thread gets one slot
        // Here we just test the CAS logic
        assert!(registry.register().is_some());
        assert!(registry.register().is_some());

        // Third registration should fail (we're on the same thread, so it won't
        // actually fail - but if we had 3 threads, the 3rd would fail)
        // For now, just verify overflow_count is accessible
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_thread_local_slot() {
        // First call should register (or return cached if already registered)
        let slot1 = get_or_register_slot();

        // Second call should return same value (cached)
        let slot2 = get_or_register_slot();
        assert_eq!(slot1, slot2);

        // If registration succeeded, slot should be Some
        // If registry was full, slot is None (acceptable in parallel test execution)
        // Either way, the caching behavior is correct
    }

    #[test]
    fn test_update_helpers() {
        // Try to register (may fail if registry full from parallel tests)
        let slot = get_or_register_slot();

        if slot.is_some() {
            // Update stats
            update_arena_stats(2048);

            // Verify via aggregate
            let stats = memory_registry().aggregate_stats();
            assert!(stats.total_arena_bytes >= 2048); // May have other test data
        }
        // If slot is None, registry was full - that's OK for this test
    }

    #[test]
    fn test_concurrent_registration() {
        use std::thread;

        // Spawn multiple threads that each register and update stats
        let handles: Vec<_> = (0..4)
            .map(|i| {
                thread::spawn(move || {
                    let slot = get_or_register_slot();
                    if slot.is_some() {
                        // Each thread sets a unique arena value
                        update_arena_stats(1000 * (i + 1));
                    }
                    slot.is_some()
                })
            })
            .collect();

        // Wait for all threads and count successful registrations
        let mut registered_count = 0;
        for h in handles {
            if h.join().unwrap() {
                registered_count += 1;
            }
        }

        // Verify aggregate stats reflect the registrations
        let stats = memory_registry().aggregate_stats();
        // active_threads includes all threads that have registered (including test threads)
        assert!(stats.active_threads >= registered_count);
    }

    #[test]
    fn test_thread_ids_are_unique() {
        use std::collections::HashSet;
        use std::sync::{Arc, Mutex};
        use std::thread;

        let ids = Arc::new(Mutex::new(HashSet::new()));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let ids = Arc::clone(&ids);
                thread::spawn(move || {
                    let id = current_thread_id();
                    ids.lock().unwrap().insert(id);
                    id
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All thread IDs should be unique
        let unique_count = ids.lock().unwrap().len();
        assert_eq!(unique_count, 8, "Thread IDs should be unique");
    }
}
349}