// seq_core/memory_stats.rs
1//! Cross-thread memory statistics registry
2//!
3//! Provides visibility into arena memory usage across all worker threads.
4//! Each thread registers itself and updates its own slot with minimal overhead.
5//!
6//! # Design
7//!
8//! The challenge: Arena is thread-local, but diagnostics runs on a
9//! separate signal handler thread. We solve this with a global registry where
10//! each thread has an exclusive slot for its stats.
11//!
12//! ```text
13//! ┌─────────────────────────────────────────────────────────┐
14//! │ MemoryStatsRegistry (global) │
15//! ├─────────────────────────────────────────────────────────┤
16//! │ slots: [MemorySlot; MAX_THREADS] │
17//! │ │
18//! │ ┌──────────────────┐ ┌──────────────────┐ │
19//! │ │ Slot 0 (Thread A)│ │ Slot 1 (Thread B)│ ... │
20//! │ │ thread_id: u64 │ │ thread_id: u64 │ │
21//! │ │ arena_bytes: u64 │ │ arena_bytes: u64 │ │
22//! │ └──────────────────┘ └──────────────────┘ │
23//! └─────────────────────────────────────────────────────────┘
24//! ```
25//!
26//! # Performance
27//!
28//! - **Registration**: One-time CAS per thread (on first arena access)
29//! - **Updates**: Single atomic store per operation (~1-2 cycles, no contention)
30//! - **Reads**: Only during diagnostics (SIGQUIT), iterates all slots
31//!
32//! This maintains the "fast path stays fast" principle.
33
34use std::sync::OnceLock;
35use std::sync::atomic::{AtomicU64, Ordering};
36
/// Maximum number of worker threads we can track.
/// May's default is typically fewer threads, but we allow headroom;
/// threads that register after every slot is claimed are tallied in
/// `overflow_count` instead of being tracked.
const MAX_THREADS: usize = 64;
40
/// Statistics for a single thread's memory usage.
///
/// All fields are atomics so the owning thread can store updates while the
/// diagnostics thread reads them concurrently, without locks.
#[derive(Debug)]
pub struct MemorySlot {
    /// Unique ID of the owning thread; 0 means the slot is free and
    /// claimable via CAS (see `MemoryStatsRegistry::register`).
    pub thread_id: AtomicU64,
    /// Currently allocated arena bytes, as last reported by the owner.
    pub arena_bytes: AtomicU64,
    /// Peak arena allocated bytes observed so far (high-water mark).
    pub peak_arena_bytes: AtomicU64,
}
51
52impl MemorySlot {
53 const fn new() -> Self {
54 Self {
55 thread_id: AtomicU64::new(0),
56 arena_bytes: AtomicU64::new(0),
57 peak_arena_bytes: AtomicU64::new(0),
58 }
59 }
60}
61
/// Aggregated memory statistics across all threads.
///
/// A plain-value snapshot produced by `MemoryStatsRegistry::aggregate_stats`.
/// Each counter is read independently, so fields may be slightly mutually
/// inconsistent under concurrent updates — acceptable for diagnostics output.
#[derive(Debug, Clone, Copy)]
pub struct AggregateMemoryStats {
    /// Number of slots with a nonzero `thread_id` (i.e. registered threads).
    pub active_threads: usize,
    /// Sum of `arena_bytes` over all registered slots.
    pub total_arena_bytes: u64,
    /// Sum of `peak_arena_bytes` over all registered slots.
    pub total_peak_arena_bytes: u64,
    /// Number of registrations that failed because the registry was full.
    pub overflow_count: u64,
}
70
/// Global registry for cross-thread memory statistics.
///
/// Holds a fixed slot array allocated once at construction. Slots are
/// claimed by threads via CAS; no unregistration path exists in this module,
/// so a claimed slot stays claimed for the process lifetime.
pub struct MemoryStatsRegistry {
    /// Fixed-capacity slot array, one slot per registered thread.
    slots: Box<[MemorySlot]>,
    /// Count of registration attempts that failed because every slot was taken.
    pub overflow_count: AtomicU64,
}
77
78impl MemoryStatsRegistry {
79 /// Create a new registry with the given capacity
80 fn new(capacity: usize) -> Self {
81 let slots: Vec<MemorySlot> = (0..capacity).map(|_| MemorySlot::new()).collect();
82 Self {
83 slots: slots.into_boxed_slice(),
84 overflow_count: AtomicU64::new(0),
85 }
86 }
87
88 /// Register a thread and get its slot index
89 ///
90 /// Returns Some(index) if a slot was claimed, None if registry is full.
91 /// Uses the current thread's ID as the identifier.
92 fn register(&self) -> Option<usize> {
93 let thread_id = current_thread_id();
94
95 // Scan for a free slot
96 for (idx, slot) in self.slots.iter().enumerate() {
97 // Try to claim this slot (CAS from 0 to thread_id)
98 if slot
99 .thread_id
100 .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
101 .is_ok()
102 {
103 return Some(idx);
104 }
105 }
106
107 // Registry full
108 self.overflow_count.fetch_add(1, Ordering::Relaxed);
109 None
110 }
111
112 /// Update arena stats for a slot
113 ///
114 /// # Safety
115 /// Caller must own the slot (be the thread that registered it)
116 #[inline]
117 fn update_arena(&self, slot_idx: usize, arena_bytes: usize) {
118 if let Some(slot) = self.slots.get(slot_idx) {
119 let bytes = arena_bytes as u64;
120 slot.arena_bytes.store(bytes, Ordering::Relaxed);
121
122 // Update peak via CAS loop (same pattern as PEAK_STRANDS in scheduler.rs)
123 let mut peak = slot.peak_arena_bytes.load(Ordering::Relaxed);
124 while bytes > peak {
125 match slot.peak_arena_bytes.compare_exchange_weak(
126 peak,
127 bytes,
128 Ordering::Relaxed,
129 Ordering::Relaxed,
130 ) {
131 Ok(_) => break,
132 Err(current) => peak = current,
133 }
134 }
135 }
136 }
137
138 /// Get aggregated memory statistics across all threads
139 pub fn aggregate_stats(&self) -> AggregateMemoryStats {
140 let mut total_arena_bytes: u64 = 0;
141 let mut total_peak_arena_bytes: u64 = 0;
142 let mut active_threads: usize = 0;
143
144 for slot in self.slots.iter() {
145 let thread_id = slot.thread_id.load(Ordering::Acquire);
146 if thread_id > 0 {
147 active_threads += 1;
148 total_arena_bytes += slot.arena_bytes.load(Ordering::Relaxed);
149 total_peak_arena_bytes += slot.peak_arena_bytes.load(Ordering::Relaxed);
150 }
151 }
152
153 AggregateMemoryStats {
154 active_threads,
155 total_arena_bytes,
156 total_peak_arena_bytes,
157 overflow_count: self.overflow_count.load(Ordering::Relaxed),
158 }
159 }
160}
161
/// Monotonic source of thread IDs.
/// Starts at 1 because 0 is the "empty slot" sentinel in `MemorySlot`.
static NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);

thread_local! {
    // Each thread draws its ID exactly once, lazily, on first access.
    static THIS_THREAD_ID: u64 = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
}

/// Return this thread's unique, nonzero ID.
///
/// IDs come from a global atomic counter, so they are guaranteed unique
/// (no hash collisions) and increase monotonically from 1.
fn current_thread_id() -> u64 {
    THIS_THREAD_ID.with(|id| *id)
}
178
// Global registry instance, created lazily on first access.
static MEMORY_REGISTRY: OnceLock<MemoryStatsRegistry> = OnceLock::new();

/// Get the global memory stats registry, initializing it with
/// `MAX_THREADS` slots on first call. Safe to call from any thread;
/// `OnceLock` guarantees the initializer runs at most once.
pub fn memory_registry() -> &'static MemoryStatsRegistry {
    MEMORY_REGISTRY.get_or_init(|| MemoryStatsRegistry::new(MAX_THREADS))
}
186
// Thread-local cache of this thread's registry slot index.
// Starts as None; populated by get_or_register_slot() after a registration
// attempt (stays None if the registry was full).
thread_local! {
    static SLOT_INDEX: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
}
191
192/// Get or register the current thread's slot index
193///
194/// Returns Some(index) if registered (or already was), None if registry is full.
195pub fn get_or_register_slot() -> Option<usize> {
196 SLOT_INDEX.with(|cell| {
197 if let Some(idx) = cell.get() {
198 Some(idx)
199 } else {
200 let idx = memory_registry().register();
201 cell.set(idx);
202 idx
203 }
204 })
205}
206
207/// Update arena stats for the current thread
208///
209/// Call this after arena operations to keep stats current.
210#[inline]
211pub fn update_arena_stats(arena_bytes: usize) {
212 if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
213 memory_registry().update_arena(idx, arena_bytes);
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_registry_basic() {
        let registry = MemoryStatsRegistry::new(4);

        // Register should succeed
        let slot = registry.register();
        assert!(slot.is_some());
        let idx = slot.unwrap();

        // Update stats
        registry.update_arena(idx, 1024);

        // Aggregate should reflect our updates, including the peak
        // high-water mark.
        let stats = registry.aggregate_stats();
        assert_eq!(stats.active_threads, 1);
        assert_eq!(stats.total_arena_bytes, 1024);
        assert_eq!(stats.total_peak_arena_bytes, 1024);
    }

    #[test]
    fn test_registry_overflow() {
        let registry = MemoryStatsRegistry::new(2);

        // register() claims a fresh slot on every call — it does not dedupe
        // by thread id — so two calls from this one thread fill a
        // capacity-2 registry.
        assert!(registry.register().is_some());
        assert!(registry.register().is_some());

        // The third registration finds no free slot: it must fail and be
        // recorded in overflow_count.
        assert!(registry.register().is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_thread_local_slot() {
        // First call should register (or return cached if already registered)
        let slot1 = get_or_register_slot();

        // Second call should return same value (cached)
        let slot2 = get_or_register_slot();
        assert_eq!(slot1, slot2);

        // If registration succeeded, slot should be Some.
        // If the global registry was full, slot is None (acceptable in
        // parallel test execution). Either way, the caching behavior holds.
    }

    #[test]
    fn test_update_helpers() {
        // Try to register (may fail if registry full from parallel tests)
        let slot = get_or_register_slot();

        if slot.is_some() {
            // Update stats
            update_arena_stats(2048);

            // Verify via aggregate
            let stats = memory_registry().aggregate_stats();
            assert!(stats.total_arena_bytes >= 2048); // May have other test data
        }
        // If slot is None, registry was full - that's OK for this test
    }

    #[test]
    fn test_concurrent_registration() {
        use std::thread;

        // Spawn multiple threads that each register and update stats
        let handles: Vec<_> = (0..4)
            .map(|i| {
                thread::spawn(move || {
                    let slot = get_or_register_slot();
                    if slot.is_some() {
                        // Each thread sets a unique arena value
                        update_arena_stats(1000 * (i + 1));
                    }
                    slot.is_some()
                })
            })
            .collect();

        // Wait for all threads and count successful registrations
        let mut registered_count = 0;
        for h in handles {
            if h.join().unwrap() {
                registered_count += 1;
            }
        }

        // Verify aggregate stats reflect the registrations.
        // active_threads includes every thread that has ever registered
        // (including other test threads), so >= is the strongest safe check.
        let stats = memory_registry().aggregate_stats();
        assert!(stats.active_threads >= registered_count);
    }

    #[test]
    fn test_thread_ids_are_unique() {
        use std::collections::HashSet;
        use std::sync::{Arc, Mutex};
        use std::thread;

        let ids = Arc::new(Mutex::new(HashSet::new()));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let ids = Arc::clone(&ids);
                thread::spawn(move || {
                    let id = current_thread_id();
                    ids.lock().unwrap().insert(id);
                    id
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All thread IDs should be unique
        let unique_count = ids.lock().unwrap().len();
        assert_eq!(unique_count, 8, "Thread IDs should be unique");
    }
}
344}