1use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
28
29use super::hash::hash_frames;
30use super::raw_mem::{alloc_pages, free_pages};
31use super::walk::Frames;
32
/// Bucket count used when `MOD_ALLOC_BUCKETS` is unset or does not parse as a `usize`.
const DEFAULT_BUCKETS: usize = 4_096;
/// Lower bound applied to a configured bucket count.
const MIN_BUCKETS: usize = 64;
/// Upper bound applied to a configured bucket count (2^20 buckets).
const MAX_BUCKETS: usize = 1 << 20;

/// Aggregated statistics for one call site, as produced by `call_sites_report`.
#[derive(Debug, Clone, Copy)]
pub struct CallSiteStats {
    /// Frames saved for the site. Only the first `frame_count` entries are
    /// copied from the table; the remaining entries are zero.
    pub frames: [u64; 8],
    /// Number of meaningful entries in `frames` (never more than 8).
    pub frame_count: u8,
    /// Number of `record` calls attributed to this site.
    pub count: u64,
    /// Sum of the `size` arguments of those `record` calls.
    pub total_bytes: u64,
}
55
/// One slot of the open-addressed call-site table.
///
/// Every field is an atomic, so slots are read and updated through shared
/// references. A `hash` of zero marks a slot that nobody has claimed yet.
#[repr(C, align(16))]
struct Bucket {
    /// Hash of the owning call site's frames; `0` means the slot is empty.
    hash: AtomicU64,
    /// Number of `record` calls attributed to this slot.
    count: AtomicU64,
    /// Sum of the sizes passed to `record` for this slot.
    total_bytes: AtomicU64,
    /// Number of frames saved in `sample_frames`. The claiming thread stores
    /// it last (with `Release`), and `call_sites_report` skips slots where it
    /// is still zero.
    frame_count: AtomicU64,
    /// Frames written by the thread that claimed the slot.
    sample_frames: [AtomicU64; 8],
}
64
/// Size in bytes of one `Bucket`; multiplied by the bucket count to size the
/// table allocation.
const BUCKET_SIZE: usize = core::mem::size_of::<Bucket>();

/// Base pointer of the table; null until `ensure_init` installs an allocation.
static TABLE_BASE: AtomicPtr<Bucket> = AtomicPtr::new(core::ptr::null_mut());
/// Number of buckets in the installed table; zero until `ensure_init` stores it.
static TABLE_BUCKETS: AtomicUsize = AtomicUsize::new(0);
/// Bucket count minus one, used to wrap probe indices with a bitwise AND.
static TABLE_MASK: AtomicUsize = AtomicUsize::new(0);
70
71fn configured_bucket_count() -> usize {
72 let raw = match std::env::var("MOD_ALLOC_BUCKETS") {
73 Ok(s) => s,
74 Err(_) => return DEFAULT_BUCKETS,
75 };
76 let n: usize = raw.trim().parse().unwrap_or(DEFAULT_BUCKETS);
77 let n = n.clamp(MIN_BUCKETS, MAX_BUCKETS);
78 n.next_power_of_two()
79}
80
81fn ensure_init() -> Option<(*mut Bucket, usize, usize)> {
82 let existing = TABLE_BASE.load(Ordering::Acquire);
83 if !existing.is_null() {
84 let buckets = TABLE_BUCKETS.load(Ordering::Acquire);
85 let mask = TABLE_MASK.load(Ordering::Relaxed);
86 return Some((existing, buckets, mask));
87 }
88
89 let buckets = configured_bucket_count();
90 let bytes = buckets * BUCKET_SIZE;
91 let pages = unsafe { alloc_pages(bytes) };
94 if pages.is_null() {
95 return None;
96 }
97 let new_base = pages as *mut Bucket;
98
99 match TABLE_BASE.compare_exchange(
100 core::ptr::null_mut(),
101 new_base,
102 Ordering::Release,
103 Ordering::Acquire,
104 ) {
105 Ok(_) => {
106 TABLE_BUCKETS.store(buckets, Ordering::Release);
107 TABLE_MASK.store(buckets - 1, Ordering::Release);
108 Some((new_base, buckets, buckets - 1))
109 }
110 Err(other) => {
111 unsafe { free_pages(pages, bytes) };
113 loop {
115 let b = TABLE_BUCKETS.load(Ordering::Acquire);
116 if b > 0 {
117 let mask = TABLE_MASK.load(Ordering::Relaxed);
118 return Some((other, b, mask));
119 }
120 core::hint::spin_loop();
121 }
122 }
123 }
124}
125
126pub(crate) fn record(frames: &Frames, size: u64) {
128 let count = frames.count as usize;
129 if count == 0 {
130 return;
133 }
134 let Some((base, _buckets, mask)) = ensure_init() else {
135 return;
136 };
137 let h = hash_frames(&frames.frames, count);
138 let mut idx = (h as usize) & mask;
139 let start = idx;
140
141 loop {
142 let bucket = unsafe { &*base.add(idx) };
146 let existing = bucket.hash.load(Ordering::Acquire);
147
148 if existing == 0 {
149 match bucket
150 .hash
151 .compare_exchange(0, h, Ordering::Release, Ordering::Acquire)
152 {
153 Ok(_) => {
154 for i in 0..count {
162 bucket.sample_frames[i].store(frames.frames[i], Ordering::Relaxed);
163 }
164 bucket.count.fetch_add(1, Ordering::Relaxed);
165 bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
166 bucket.frame_count.store(count as u64, Ordering::Release);
167 return;
168 }
169 Err(observed) => {
170 if observed == h {
171 bucket.count.fetch_add(1, Ordering::Relaxed);
177 bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
178 return;
179 }
180 }
183 }
184 } else if existing == h {
185 bucket.count.fetch_add(1, Ordering::Relaxed);
191 bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
192 return;
193 }
194
195 idx = (idx + 1) & mask;
196 if idx == start {
197 return;
199 }
200 }
201}
202
203pub fn call_sites_report() -> Vec<CallSiteStats> {
208 super::arena::flush_current_thread();
209
210 let Some((base, buckets, _mask)) = ensure_init() else {
211 return Vec::new();
212 };
213
214 let mut out = Vec::new();
215 for i in 0..buckets {
216 let bucket = unsafe { &*base.add(i) };
219 let h = bucket.hash.load(Ordering::Acquire);
220 if h == 0 {
221 continue;
222 }
223 let fc = bucket.frame_count.load(Ordering::Acquire);
224 if fc == 0 {
225 continue;
227 }
228 let n = (fc as usize).min(8);
229 let mut frames = [0u64; 8];
230 for (j, slot) in frames.iter_mut().enumerate().take(n) {
231 *slot = bucket.sample_frames[j].load(Ordering::Relaxed);
232 }
233 out.push(CallSiteStats {
234 frames,
235 frame_count: n as u8,
236 count: bucket.count.load(Ordering::Relaxed),
237 total_bytes: bucket.total_bytes.load(Ordering::Relaxed),
238 });
239 }
240 out
241}
242
243#[doc(hidden)]
246pub fn _reset_for_test() {
247 let Some((base, buckets, _mask)) = ensure_init() else {
248 return;
249 };
250 for i in 0..buckets {
251 let bucket = unsafe { &*base.add(i) };
254 bucket.hash.store(0, Ordering::Release);
255 bucket.count.store(0, Ordering::Relaxed);
256 bucket.total_bytes.store(0, Ordering::Relaxed);
257 bucket.frame_count.store(0, Ordering::Release);
258 for j in 0..8 {
259 bucket.sample_frames[j].store(0, Ordering::Relaxed);
260 }
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// Serializes the tests, which all share the process-wide table.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the test lock (recovering from poisoning) and clears the table.
    /// The returned guard must be held for the duration of the test.
    fn exclusive_clean_table() -> MutexGuard<'static, ()> {
        let guard = TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        _reset_for_test();
        guard
    }

    /// Builds a `Frames` whose leading entries are `pcs`, zero-padded to 8.
    fn stack(pcs: &[u64]) -> Frames {
        let mut frames = [0u64; 8];
        frames[..pcs.len()].copy_from_slice(pcs);
        Frames {
            frames,
            count: pcs.len() as _,
        }
    }

    #[test]
    fn records_and_reports_a_single_site() {
        let _guard = exclusive_clean_table();
        let site_frames = stack(&[0xAAAA, 0xBBBB, 0xCCCC]);
        for size in [100, 200] {
            record(&site_frames, size);
        }

        let report = call_sites_report();
        let site = report
            .iter()
            .find(|entry| entry.frames[..2] == [0xAAAA, 0xBBBB])
            .expect("expected our site in the report");
        assert_eq!(site.frame_count, 3);
        assert_eq!(site.count, 2);
        assert_eq!(site.total_bytes, 300);
    }

    #[test]
    fn distinct_sites_are_separately_aggregated() {
        let _guard = exclusive_clean_table();
        let first = stack(&[0xA000, 0xA001]);
        let second = stack(&[0xB000, 0xB001]);
        (0..5).for_each(|_| record(&first, 10));
        (0..3).for_each(|_| record(&second, 20));

        let report = call_sites_report();
        let by_leading_frame = |pc: u64| {
            report
                .iter()
                .find(|entry| entry.frames[0] == pc)
                .unwrap()
        };
        let first_stats = by_leading_frame(0xA000);
        let second_stats = by_leading_frame(0xB000);
        assert_eq!(first_stats.count, 5);
        assert_eq!(first_stats.total_bytes, 50);
        assert_eq!(second_stats.count, 3);
        assert_eq!(second_stats.total_bytes, 60);
    }

    #[test]
    fn zero_frame_capture_is_ignored() {
        let _guard = exclusive_clean_table();
        record(&stack(&[]), 50);

        let report = call_sites_report();
        assert!(
            report.iter().all(|entry| entry.frame_count > 0),
            "zero-frame capture should not appear"
        );
    }
}