// seq_core/memory_stats.rs
//! Cross-thread memory statistics registry
//!
//! Provides visibility into arena memory usage across all worker threads.
//! Each thread registers itself and updates its own slot with minimal overhead.
//!
//! # Design
//!
//! The challenge: the arena is thread-local, but diagnostics run on a
//! separate signal handler thread. We solve this with a global registry where
//! each thread has an exclusive slot for its stats.
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────┐
//! │ MemoryStatsRegistry (global)                            │
//! ├─────────────────────────────────────────────────────────┤
//! │ slots: [MemorySlot; MAX_THREADS]                        │
//! │                                                         │
//! │ ┌──────────────────┐ ┌──────────────────┐               │
//! │ │ Slot 0 (Thread A)│ │ Slot 1 (Thread B)│ ...           │
//! │ │ thread_id: u64   │ │ thread_id: u64   │               │
//! │ │ arena_bytes: u64 │ │ arena_bytes: u64 │               │
//! │ └──────────────────┘ └──────────────────┘               │
//! └─────────────────────────────────────────────────────────┘
//! ```
//!
//! # Performance
//!
//! - **Registration**: One-time CAS per thread (on first arena access)
//! - **Updates**: Single atomic store per operation (~1-2 cycles, no contention)
//! - **Reads**: Only during diagnostics (SIGQUIT); iterates all slots
//!
//! This maintains the "fast path stays fast" principle.
33
34use std::sync::OnceLock;
35use std::sync::atomic::{AtomicU64, Ordering};
36
/// Upper bound on the number of worker threads the registry can track.
///
/// May's default thread count is typically lower than this; the extra slots
/// are headroom.
const MAX_THREADS: usize = 64;
40
/// Statistics for a single thread's memory usage.
///
/// A slot is free while `thread_id` is 0 and owned once a thread has stored
/// its non-zero ID there.
///
/// The type is aligned (and therefore padded) to 64 bytes, a common cache-line
/// size. Without this, the two `u64` fields make a 16-byte slot, so four
/// neighbouring slots would share one cache line and every `arena_bytes` store
/// by one thread would invalidate that line for the threads owning the
/// adjacent slots (false sharing).
#[derive(Debug)]
#[repr(align(64))]
pub struct MemorySlot {
    /// Thread ID (0 = slot is free)
    pub thread_id: AtomicU64,
    /// Arena allocated bytes
    pub arena_bytes: AtomicU64,
}

impl MemorySlot {
    /// Create a free slot: `thread_id` 0 and no arena bytes recorded.
    const fn new() -> Self {
        Self {
            thread_id: AtomicU64::new(0),
            arena_bytes: AtomicU64::new(0),
        }
    }
}
58
59/// Global registry for cross-thread memory statistics
60pub struct MemoryStatsRegistry {
61 slots: Box<[MemorySlot]>,
62 /// Count of threads that couldn't get a slot
63 pub overflow_count: AtomicU64,
64}
65
66impl MemoryStatsRegistry {
67 /// Create a new registry with the given capacity
68 fn new(capacity: usize) -> Self {
69 let slots: Vec<MemorySlot> = (0..capacity).map(|_| MemorySlot::new()).collect();
70 Self {
71 slots: slots.into_boxed_slice(),
72 overflow_count: AtomicU64::new(0),
73 }
74 }
75
76 /// Register a thread and get its slot index
77 ///
78 /// Returns Some(index) if a slot was claimed, None if registry is full.
79 /// Uses the current thread's ID as the identifier.
80 pub fn register(&self) -> Option<usize> {
81 let thread_id = current_thread_id();
82
83 // Scan for a free slot
84 for (idx, slot) in self.slots.iter().enumerate() {
85 // Try to claim this slot (CAS from 0 to thread_id)
86 if slot
87 .thread_id
88 .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
89 .is_ok()
90 {
91 return Some(idx);
92 }
93 }
94
95 // Registry full
96 self.overflow_count.fetch_add(1, Ordering::Relaxed);
97 None
98 }
99
100 /// Update arena stats for a slot
101 ///
102 /// # Safety
103 /// Caller must own the slot (be the thread that registered it)
104 #[inline]
105 pub fn update_arena(&self, slot_idx: usize, arena_bytes: usize) {
106 if let Some(slot) = self.slots.get(slot_idx) {
107 slot.arena_bytes
108 .store(arena_bytes as u64, Ordering::Relaxed);
109 }
110 }
111
112 /// Get aggregated memory statistics across all threads
113 pub fn aggregate_stats(&self) -> AggregateMemoryStats {
114 let mut total_arena_bytes: u64 = 0;
115 let mut active_threads: usize = 0;
116
117 for slot in self.slots.iter() {
118 let thread_id = slot.thread_id.load(Ordering::Acquire);
119 if thread_id > 0 {
120 active_threads += 1;
121 total_arena_bytes += slot.arena_bytes.load(Ordering::Relaxed);
122 }
123 }
124
125 AggregateMemoryStats {
126 active_threads,
127 total_arena_bytes,
128 overflow_count: self.overflow_count.load(Ordering::Relaxed),
129 }
130 }
131
132 /// Get registry capacity
133 pub fn capacity(&self) -> usize {
134 self.slots.len()
135 }
136}
137
/// Snapshot of memory statistics summed over every registered thread.
#[derive(Clone, Copy, Debug)]
pub struct AggregateMemoryStats {
    /// Number of registry slots currently claimed by a thread
    pub active_threads: usize,
    /// Sum of the arena byte counts of all claimed slots
    pub total_arena_bytes: u64,
    /// Number of registration attempts that found the registry full
    pub overflow_count: u64,
}
145
/// Source of unique thread IDs.
///
/// The first ID handed out is 1, because 0 is reserved to mean "empty slot".
static NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);

thread_local! {
    /// This thread's ID, drawn from `NEXT_THREAD_ID` on first access.
    static THIS_THREAD_ID: u64 = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
}

/// Return the calling thread's unique ID.
///
/// IDs come from a global atomic counter rather than a hash, so two threads
/// never share one. The value is assigned the first time a thread asks for it
/// and stays the same for every later call on that thread.
fn current_thread_id() -> u64 {
    THIS_THREAD_ID.with(|assigned| *assigned)
}
162
163// Global registry instance
164static MEMORY_REGISTRY: OnceLock<MemoryStatsRegistry> = OnceLock::new();
165
166/// Get the global memory stats registry
167pub fn memory_registry() -> &'static MemoryStatsRegistry {
168 MEMORY_REGISTRY.get_or_init(|| MemoryStatsRegistry::new(MAX_THREADS))
169}
170
171// Thread-local slot index (cached after first registration)
172thread_local! {
173 static SLOT_INDEX: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
174}
175
176/// Get or register the current thread's slot index
177///
178/// Returns Some(index) if registered (or already was), None if registry is full.
179pub fn get_or_register_slot() -> Option<usize> {
180 SLOT_INDEX.with(|cell| {
181 if let Some(idx) = cell.get() {
182 Some(idx)
183 } else {
184 let idx = memory_registry().register();
185 cell.set(idx);
186 idx
187 }
188 })
189}
190
191/// Update arena stats for the current thread
192///
193/// Call this after arena operations to keep stats current.
194#[inline]
195pub fn update_arena_stats(arena_bytes: usize) {
196 if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
197 memory_registry().update_arena(idx, arena_bytes);
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh registry accepts a registration and reports the stored stats.
    #[test]
    fn test_registry_basic() {
        let registry = MemoryStatsRegistry::new(4);
        assert_eq!(registry.capacity(), 4);

        // Register should succeed
        let idx = registry
            .register()
            .expect("an empty registry must have a free slot");

        // Update stats
        registry.update_arena(idx, 1024);

        // Aggregate should reflect our updates
        let stats = registry.aggregate_stats();
        assert_eq!(stats.active_threads, 1);
        assert_eq!(stats.total_arena_bytes, 1024);
    }

    /// Registering past capacity returns `None` and counts the overflow.
    #[test]
    fn test_registry_overflow() {
        let registry = MemoryStatsRegistry::new(2);

        // `register` claims the first free slot on every call and does not
        // deduplicate by thread, so two calls from this one thread fill both
        // slots.
        assert_eq!(registry.register(), Some(0));
        assert_eq!(registry.register(), Some(1));
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 0);

        // The registry is now full: the next attempt must fail and be counted.
        assert_eq!(registry.register(), None);
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);

        let stats = registry.aggregate_stats();
        assert_eq!(stats.active_threads, 2);
        assert_eq!(stats.overflow_count, 1);
    }

    /// Repeated lookups on one thread return the same cached result.
    #[test]
    fn test_thread_local_slot() {
        // First call should register (or return cached if already registered)
        let slot1 = get_or_register_slot();

        // Second call should return same value (cached)
        let slot2 = get_or_register_slot();
        assert_eq!(slot1, slot2);

        // If registration succeeded, slot should be Some
        // If registry was full, slot is None (acceptable in parallel test execution)
        // Either way, the caching behavior is correct
    }

    /// The thread-local helper writes into the global registry.
    #[test]
    fn test_update_helpers() {
        // Try to register (may fail if registry full from parallel tests)
        let slot = get_or_register_slot();

        if slot.is_some() {
            // Update stats
            update_arena_stats(2048);

            // Verify via aggregate
            let stats = memory_registry().aggregate_stats();
            assert!(stats.total_arena_bytes >= 2048); // May have other test data
        }
        // If slot is None, registry was full - that's OK for this test
    }

    /// Several threads can register and publish stats concurrently.
    #[test]
    fn test_concurrent_registration() {
        use std::thread;

        // Spawn multiple threads that each register and update stats
        let handles: Vec<_> = (0..4)
            .map(|i| {
                thread::spawn(move || {
                    let slot = get_or_register_slot();
                    if slot.is_some() {
                        // Each thread sets a unique arena value
                        update_arena_stats(1000 * (i + 1));
                    }
                    slot.is_some()
                })
            })
            .collect();

        // Wait for all threads and count successful registrations
        let mut registered_count = 0;
        for h in handles {
            if h.join().unwrap() {
                registered_count += 1;
            }
        }

        // Verify aggregate stats reflect the registrations
        let stats = memory_registry().aggregate_stats();
        // active_threads includes all threads that have registered (including test threads)
        assert!(stats.active_threads >= registered_count);
    }

    /// Every thread receives a distinct ID from `current_thread_id`.
    #[test]
    fn test_thread_ids_are_unique() {
        use std::collections::HashSet;
        use std::sync::{Arc, Mutex};
        use std::thread;

        let ids = Arc::new(Mutex::new(HashSet::new()));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let ids = Arc::clone(&ids);
                thread::spawn(move || {
                    let id = current_thread_id();
                    ids.lock().unwrap().insert(id);
                    id
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        // All thread IDs should be unique
        let unique_count = ids.lock().unwrap().len();
        assert_eq!(unique_count, 8, "Thread IDs should be unique");
    }
}