//! seq_runtime/memory_stats.rs — lock-free per-thread memory statistics registry.

use std::sync::OnceLock;
37use std::sync::atomic::{AtomicU64, Ordering};
38
/// Maximum number of threads that can claim a statistics slot; attempts
/// beyond this are counted in `MemoryStatsRegistry::overflow_count`.
const MAX_THREADS: usize = 64;
42
/// One thread's memory counters.
///
/// Every field is an atomic so the owning thread can publish updates while
/// other threads read concurrently. `thread_id == 0` marks the slot as
/// unclaimed (real ids start at 1; see `NEXT_THREAD_ID`).
#[derive(Debug)]
pub struct MemorySlot {
    /// Id of the owning thread; 0 while the slot is free.
    pub thread_id: AtomicU64,
    /// Last arena byte count reported via `update_arena`.
    pub arena_bytes: AtomicU64,
    /// Last pool free-entry count reported via `update_pool`.
    pub pool_free_count: AtomicU64,
    /// Last pool capacity reported via `update_pool`.
    pub pool_capacity: AtomicU64,
    /// Running count of pool allocations (monotonically increasing).
    pub pool_allocations: AtomicU64,
}
57
58impl MemorySlot {
59 const fn new() -> Self {
60 Self {
61 thread_id: AtomicU64::new(0),
62 arena_bytes: AtomicU64::new(0),
63 pool_free_count: AtomicU64::new(0),
64 pool_capacity: AtomicU64::new(0),
65 pool_allocations: AtomicU64::new(0),
66 }
67 }
68}
69
/// Fixed-capacity, lock-free registry of per-thread memory slots.
pub struct MemoryStatsRegistry {
    /// Slot storage; capacity is fixed at construction time.
    slots: Box<[MemorySlot]>,
    /// Number of `register` calls that found every slot already claimed.
    pub overflow_count: AtomicU64,
}
76
77impl MemoryStatsRegistry {
78 fn new(capacity: usize) -> Self {
80 let slots: Vec<MemorySlot> = (0..capacity).map(|_| MemorySlot::new()).collect();
81 Self {
82 slots: slots.into_boxed_slice(),
83 overflow_count: AtomicU64::new(0),
84 }
85 }
86
87 pub fn register(&self) -> Option<usize> {
92 let thread_id = current_thread_id();
93
94 for (idx, slot) in self.slots.iter().enumerate() {
96 if slot
98 .thread_id
99 .compare_exchange(0, thread_id, Ordering::AcqRel, Ordering::Relaxed)
100 .is_ok()
101 {
102 return Some(idx);
103 }
104 }
105
106 self.overflow_count.fetch_add(1, Ordering::Relaxed);
108 None
109 }
110
111 #[inline]
116 pub fn update_arena(&self, slot_idx: usize, arena_bytes: usize) {
117 if let Some(slot) = self.slots.get(slot_idx) {
118 slot.arena_bytes
119 .store(arena_bytes as u64, Ordering::Relaxed);
120 }
121 }
122
123 #[inline]
128 pub fn update_pool(&self, slot_idx: usize, free_count: usize, capacity: usize) {
129 if let Some(slot) = self.slots.get(slot_idx) {
130 slot.pool_free_count
131 .store(free_count as u64, Ordering::Relaxed);
132 slot.pool_capacity.store(capacity as u64, Ordering::Relaxed);
133 }
134 }
135
136 #[inline]
138 pub fn increment_pool_allocations(&self, slot_idx: usize) {
139 if let Some(slot) = self.slots.get(slot_idx) {
140 slot.pool_allocations.fetch_add(1, Ordering::Relaxed);
141 }
142 }
143
144 pub fn aggregate_stats(&self) -> AggregateMemoryStats {
146 let mut total_arena_bytes: u64 = 0;
147 let mut total_pool_free: u64 = 0;
148 let mut total_pool_capacity: u64 = 0;
149 let mut total_pool_allocations: u64 = 0;
150 let mut active_threads: usize = 0;
151
152 for slot in self.slots.iter() {
153 let thread_id = slot.thread_id.load(Ordering::Acquire);
154 if thread_id > 0 {
155 active_threads += 1;
156 total_arena_bytes += slot.arena_bytes.load(Ordering::Relaxed);
157 total_pool_free += slot.pool_free_count.load(Ordering::Relaxed);
158 total_pool_capacity += slot.pool_capacity.load(Ordering::Relaxed);
159 total_pool_allocations += slot.pool_allocations.load(Ordering::Relaxed);
160 }
161 }
162
163 AggregateMemoryStats {
164 active_threads,
165 total_arena_bytes,
166 total_pool_free,
167 total_pool_capacity,
168 total_pool_allocations,
169 overflow_count: self.overflow_count.load(Ordering::Relaxed),
170 }
171 }
172
173 pub fn per_thread_stats(&self) -> impl Iterator<Item = ThreadMemoryStats> + '_ {
175 self.slots.iter().filter_map(|slot| {
176 let thread_id = slot.thread_id.load(Ordering::Acquire);
177 if thread_id > 0 {
178 Some(ThreadMemoryStats {
179 thread_id,
180 arena_bytes: slot.arena_bytes.load(Ordering::Relaxed),
181 pool_free_count: slot.pool_free_count.load(Ordering::Relaxed),
182 pool_capacity: slot.pool_capacity.load(Ordering::Relaxed),
183 pool_allocations: slot.pool_allocations.load(Ordering::Relaxed),
184 })
185 } else {
186 None
187 }
188 })
189 }
190
191 pub fn capacity(&self) -> usize {
193 self.slots.len()
194 }
195}
196
/// Totals across all registered threads, as returned by
/// `MemoryStatsRegistry::aggregate_stats`.
#[derive(Debug, Clone, Copy)]
pub struct AggregateMemoryStats {
    /// Number of slots with a nonzero `thread_id`.
    pub active_threads: usize,
    /// Sum of per-thread arena byte counts.
    pub total_arena_bytes: u64,
    /// Sum of per-thread pool free counts.
    pub total_pool_free: u64,
    /// Sum of per-thread pool capacities.
    pub total_pool_capacity: u64,
    /// Sum of per-thread pool allocation counters.
    pub total_pool_allocations: u64,
    /// Registrations that failed because every slot was taken.
    pub overflow_count: u64,
}
207
/// Snapshot of one thread's counters, yielded by
/// `MemoryStatsRegistry::per_thread_stats`.
#[derive(Debug, Clone, Copy)]
pub struct ThreadMemoryStats {
    /// Owning thread's id (always nonzero here).
    pub thread_id: u64,
    /// Arena bytes last reported by that thread.
    pub arena_bytes: u64,
    /// Pool free entries last reported by that thread.
    pub pool_free_count: u64,
    /// Pool capacity last reported by that thread.
    pub pool_capacity: u64,
    /// Pool allocations performed by that thread.
    pub pool_allocations: u64,
}
217
// Counter for handing out thread ids. Starts at 1 so that 0 can serve as
// the "slot unclaimed" sentinel in `MemorySlot::thread_id`.
static NEXT_THREAD_ID: AtomicU64 = AtomicU64::new(1);

thread_local! {
    // This thread's id, assigned lazily on first access and stable for
    // the thread's lifetime. Ids are never reused.
    static THIS_THREAD_ID: u64 = NEXT_THREAD_ID.fetch_add(1, Ordering::Relaxed);
}
226
227fn current_thread_id() -> u64 {
232 THIS_THREAD_ID.with(|&id| id)
233}
234
// Lazily-initialized process-wide registry.
static MEMORY_REGISTRY: OnceLock<MemoryStatsRegistry> = OnceLock::new();

/// Returns the global registry, creating it with `MAX_THREADS` slots on
/// first use.
pub fn memory_registry() -> &'static MemoryStatsRegistry {
    MEMORY_REGISTRY.get_or_init(|| MemoryStatsRegistry::new(MAX_THREADS))
}
242
thread_local! {
    // This thread's cached slot index in the global registry. `None`
    // until `get_or_register_slot` succeeds (a failed registration also
    // leaves `None`, so a later call retries).
    static SLOT_INDEX: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
}
247
248pub fn get_or_register_slot() -> Option<usize> {
252 SLOT_INDEX.with(|cell| {
253 if let Some(idx) = cell.get() {
254 Some(idx)
255 } else {
256 let idx = memory_registry().register();
257 cell.set(idx);
258 idx
259 }
260 })
261}
262
263#[inline]
267pub fn update_arena_stats(arena_bytes: usize) {
268 if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
269 memory_registry().update_arena(idx, arena_bytes);
270 }
271}
272
273#[inline]
275pub fn update_pool_stats(free_count: usize, capacity: usize) {
276 if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
277 memory_registry().update_pool(idx, free_count, capacity);
278 }
279}
280
281#[inline]
283pub fn increment_pool_allocations() {
284 if let Some(idx) = SLOT_INDEX.with(|cell| cell.get()) {
285 memory_registry().increment_pool_allocations(idx);
286 }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// A private registry hands out a slot and aggregates what was stored.
    #[test]
    fn test_registry_basic() {
        let registry = MemoryStatsRegistry::new(4);

        let slot = registry.register();
        assert!(slot.is_some());
        let idx = slot.unwrap();

        registry.update_arena(idx, 1024);
        registry.update_pool(idx, 10, 100);

        let stats = registry.aggregate_stats();
        assert_eq!(stats.active_threads, 1);
        assert_eq!(stats.total_arena_bytes, 1024);
        assert_eq!(stats.total_pool_free, 10);
        assert_eq!(stats.total_pool_capacity, 100);
    }

    /// Exhausting every slot must be reported via `overflow_count`.
    ///
    /// Fix: the previous version only filled the registry and asserted
    /// `overflow_count == 0`, so the overflow path this test is named for
    /// was never exercised. The third `register()` actually overflows.
    #[test]
    fn test_registry_overflow() {
        let registry = MemoryStatsRegistry::new(2);

        // The same thread may claim both slots here: a fresh registry has
        // thread_id == 0 everywhere, so each CAS claims a new slot.
        assert!(registry.register().is_some());
        assert!(registry.register().is_some());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 0);

        // Registry is now full: the next attempt fails and is counted.
        assert!(registry.register().is_none());
        assert_eq!(registry.overflow_count.load(Ordering::Relaxed), 1);
    }

    /// Repeated calls on one thread return the same cached slot index.
    #[test]
    fn test_thread_local_slot() {
        let slot1 = get_or_register_slot();

        let slot2 = get_or_register_slot();
        assert_eq!(slot1, slot2);
    }

    /// The free-function helpers write through to the global registry.
    /// Assertions use `>=` because other tests share the global registry.
    #[test]
    fn test_update_helpers() {
        let slot = get_or_register_slot();

        if slot.is_some() {
            update_arena_stats(2048);
            update_pool_stats(5, 50);
            increment_pool_allocations();
            increment_pool_allocations();

            let stats = memory_registry().aggregate_stats();
            assert!(stats.total_arena_bytes >= 2048);
            assert!(stats.total_pool_allocations >= 2);
        }
    }

    /// `per_thread_stats` yields an entry carrying this thread's values.
    #[test]
    fn test_per_thread_stats() {
        let slot = get_or_register_slot();

        if slot.is_some() {
            // A distinctive value so we can recognize our own entry.
            let unique_arena_bytes: usize = 999_777_555;
            update_arena_stats(unique_arena_bytes);

            let per_thread: Vec<_> = memory_registry().per_thread_stats().collect();
            assert!(!per_thread.is_empty());

            let our_stats = per_thread
                .iter()
                .find(|s| s.arena_bytes == unique_arena_bytes as u64);
            assert!(our_stats.is_some());
        }
    }

    /// Several threads can register and update concurrently; aggregates
    /// reflect at least the threads that succeeded.
    #[test]
    fn test_concurrent_registration() {
        use std::thread;

        let handles: Vec<_> = (0..4)
            .map(|i| {
                thread::spawn(move || {
                    let slot = get_or_register_slot();
                    if slot.is_some() {
                        update_arena_stats(1000 * (i + 1));
                        update_pool_stats(i * 10, 100);
                        increment_pool_allocations();
                    }
                    slot.is_some()
                })
            })
            .collect();

        let mut registered_count = 0;
        for h in handles {
            if h.join().unwrap() {
                registered_count += 1;
            }
        }

        let stats = memory_registry().aggregate_stats();
        // `>=` because other tests' threads also occupy slots.
        assert!(stats.active_threads >= registered_count);
        if registered_count > 0 {
            assert!(stats.total_pool_allocations >= registered_count as u64);
        }
    }

    /// Each OS thread receives a distinct id from `current_thread_id`.
    #[test]
    fn test_thread_ids_are_unique() {
        use std::collections::HashSet;
        use std::sync::{Arc, Mutex};
        use std::thread;

        let ids = Arc::new(Mutex::new(HashSet::new()));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let ids = Arc::clone(&ids);
                thread::spawn(move || {
                    let id = current_thread_id();
                    ids.lock().unwrap().insert(id);
                    id
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let unique_count = ids.lock().unwrap().len();
        assert_eq!(unique_count, 8, "Thread IDs should be unique");
    }
}