1use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
31
32use super::hash::hash_frames;
33use super::raw_mem::{alloc_pages, free_pages};
34use super::walk::Frames;
35
/// Bucket count used when `MOD_ALLOC_BUCKETS` is unset or cannot be parsed (2^12).
const DEFAULT_BUCKETS: usize = 1 << 12;
/// Lower bound a configured bucket count is clamped to (2^6).
const MIN_BUCKETS: usize = 1 << 6;
/// Upper bound a configured bucket count is clamped to (2^20).
const MAX_BUCKETS: usize = 1 << 20;

/// Aggregated statistics for one call site, as returned by
/// [`call_sites_report`].
#[derive(Debug, Clone, Copy)]
pub struct CallSiteStats {
    /// Sample frames stored for the site. Only the first `frame_count`
    /// entries are filled in; the remaining entries are zero.
    pub frames: [u64; 8],
    /// Number of valid entries in `frames` (at most 8).
    pub frame_count: u8,
    /// How many times the site was recorded.
    pub count: u64,
    /// Sum of the `size` values recorded for the site.
    pub total_bytes: u64,
}
58
/// One slot of the open-addressed call-site table.
///
/// Every field is an atomic so slots can be read and updated through shared
/// references. A slot whose `hash` is 0 is treated as empty.
#[repr(C, align(16))]
struct Bucket {
    /// Hash of the call site that owns this slot; 0 while the slot is free.
    hash: AtomicU64,
    /// Number of samples folded into this slot.
    count: AtomicU64,
    /// Sum of the sizes of those samples.
    total_bytes: AtomicU64,
    /// Number of valid entries in `sample_frames`. Written last (with
    /// `Release`) by the thread that claims the slot, so a non-zero value
    /// tells readers the sample frames are in place.
    frame_count: AtomicU64,
    /// Frames of the first capture that claimed this slot.
    sample_frames: [AtomicU64; 8],
}

/// Size in bytes of one [`Bucket`]; used to size the table allocation.
const BUCKET_SIZE: usize = core::mem::size_of::<Bucket>();

/// Start of the bucket array, or null until the table has been allocated.
static TABLE_BASE: AtomicPtr<Bucket> = AtomicPtr::new(core::ptr::null_mut());
/// Number of buckets in the table; 0 until initialization has stored it.
static TABLE_BUCKETS: AtomicUsize = AtomicUsize::new(0);
/// Bucket count minus one, used to mask hashes into table indices.
static TABLE_MASK: AtomicUsize = AtomicUsize::new(0);
73
74fn configured_bucket_count() -> usize {
75 let raw = match std::env::var("MOD_ALLOC_BUCKETS") {
76 Ok(s) => s,
77 Err(_) => return DEFAULT_BUCKETS,
78 };
79 let n: usize = raw.trim().parse().unwrap_or(DEFAULT_BUCKETS);
80 let n = n.clamp(MIN_BUCKETS, MAX_BUCKETS);
81 n.next_power_of_two()
82}
83
84#[inline(always)]
90fn ensure_init() -> Option<(*mut Bucket, usize, usize)> {
91 let existing = TABLE_BASE.load(Ordering::Acquire);
92 if !existing.is_null() {
93 let buckets = TABLE_BUCKETS.load(Ordering::Acquire);
94 let mask = TABLE_MASK.load(Ordering::Relaxed);
95 return Some((existing, buckets, mask));
96 }
97 ensure_init_slow()
98}
99
100#[cold]
104#[inline(never)]
105fn ensure_init_slow() -> Option<(*mut Bucket, usize, usize)> {
106 let buckets = configured_bucket_count();
107 let bytes = buckets * BUCKET_SIZE;
108 let pages = unsafe { alloc_pages(bytes) };
111 if pages.is_null() {
112 return None;
113 }
114 let new_base = pages as *mut Bucket;
115
116 match TABLE_BASE.compare_exchange(
117 core::ptr::null_mut(),
118 new_base,
119 Ordering::Release,
120 Ordering::Acquire,
121 ) {
122 Ok(_) => {
123 TABLE_BUCKETS.store(buckets, Ordering::Release);
124 TABLE_MASK.store(buckets - 1, Ordering::Release);
125 Some((new_base, buckets, buckets - 1))
126 }
127 Err(other) => {
128 unsafe { free_pages(pages, bytes) };
130 loop {
132 let b = TABLE_BUCKETS.load(Ordering::Acquire);
133 if b > 0 {
134 let mask = TABLE_MASK.load(Ordering::Relaxed);
135 return Some((other, b, mask));
136 }
137 core::hint::spin_loop();
138 }
139 }
140 }
141}
142
143#[inline]
151pub(crate) fn record(frames: &Frames, size: u64) {
152 let count = frames.count as usize;
153 if count == 0 {
154 return;
157 }
158 let Some((base, _buckets, mask)) = ensure_init() else {
159 return;
160 };
161 let h = hash_frames(&frames.frames, count);
162 let mut idx = (h as usize) & mask;
163 let start = idx;
164
165 loop {
166 let bucket = unsafe { &*base.add(idx) };
170 let existing = bucket.hash.load(Ordering::Acquire);
171
172 if existing == 0 {
173 match bucket
174 .hash
175 .compare_exchange(0, h, Ordering::Release, Ordering::Acquire)
176 {
177 Ok(_) => {
178 for i in 0..count {
186 bucket.sample_frames[i].store(frames.frames[i], Ordering::Relaxed);
187 }
188 bucket.count.fetch_add(1, Ordering::Relaxed);
189 bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
190 bucket.frame_count.store(count as u64, Ordering::Release);
191 return;
192 }
193 Err(observed) => {
194 if observed == h {
195 bucket.count.fetch_add(1, Ordering::Relaxed);
201 bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
202 return;
203 }
204 }
207 }
208 } else if existing == h {
209 bucket.count.fetch_add(1, Ordering::Relaxed);
215 bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
216 return;
217 }
218
219 idx = (idx + 1) & mask;
220 if idx == start {
221 return;
223 }
224 }
225}
226
227pub fn call_sites_report() -> Vec<CallSiteStats> {
232 let Some((base, buckets, _mask)) = ensure_init() else {
233 return Vec::new();
234 };
235
236 let mut out = Vec::new();
237 for i in 0..buckets {
238 let bucket = unsafe { &*base.add(i) };
241 let h = bucket.hash.load(Ordering::Acquire);
242 if h == 0 {
243 continue;
244 }
245 let fc = bucket.frame_count.load(Ordering::Acquire);
246 if fc == 0 {
247 continue;
249 }
250 let n = (fc as usize).min(8);
251 let mut frames = [0u64; 8];
252 for (j, slot) in frames.iter_mut().enumerate().take(n) {
253 *slot = bucket.sample_frames[j].load(Ordering::Relaxed);
254 }
255 out.push(CallSiteStats {
256 frames,
257 frame_count: n as u8,
258 count: bucket.count.load(Ordering::Relaxed),
259 total_bytes: bucket.total_bytes.load(Ordering::Relaxed),
260 });
261 }
262 out
263}
264
265#[doc(hidden)]
268pub fn _reset_for_test() {
269 let Some((base, buckets, _mask)) = ensure_init() else {
270 return;
271 };
272 for i in 0..buckets {
273 let bucket = unsafe { &*base.add(i) };
276 bucket.hash.store(0, Ordering::Release);
277 bucket.count.store(0, Ordering::Relaxed);
278 bucket.total_bytes.store(0, Ordering::Relaxed);
279 bucket.frame_count.store(0, Ordering::Release);
280 for j in 0..8 {
281 bucket.sample_frames[j].store(0, Ordering::Relaxed);
282 }
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Serializes the tests in this module: they all share the one global table.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the test lock (recovering it if an earlier test panicked while
    /// holding it) and clears the table. Keep the returned guard alive for
    /// the duration of the test.
    fn fresh_table() -> std::sync::MutexGuard<'static, ()> {
        let guard = TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        _reset_for_test();
        guard
    }

    #[test]
    fn records_and_reports_a_single_site() {
        let _guard = fresh_table();
        let capture = Frames {
            frames: [0xAAAA, 0xBBBB, 0xCCCC, 0, 0, 0, 0, 0],
            count: 3,
        };
        for size in [100, 200] {
            record(&capture, size);
        }

        let report = call_sites_report();
        let site = report
            .iter()
            .find(|s| s.frames[0] == 0xAAAA && s.frames[1] == 0xBBBB)
            .expect("expected our site in the report");
        assert_eq!(site.frame_count, 3);
        assert_eq!(site.count, 2);
        assert_eq!(site.total_bytes, 300);
    }

    #[test]
    fn distinct_sites_are_separately_aggregated() {
        let _guard = fresh_table();
        let first = Frames {
            frames: [0xA000, 0xA001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        let second = Frames {
            frames: [0xB000, 0xB001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        (0..5).for_each(|_| record(&first, 10));
        (0..3).for_each(|_| record(&second, 20));

        let report = call_sites_report();
        let by_first_frame = |addr: u64| report.iter().find(|s| s.frames[0] == addr).unwrap();
        let stats_first = by_first_frame(0xA000);
        let stats_second = by_first_frame(0xB000);
        assert_eq!(stats_first.count, 5);
        assert_eq!(stats_first.total_bytes, 50);
        assert_eq!(stats_second.count, 3);
        assert_eq!(stats_second.total_bytes, 60);
    }

    #[test]
    fn zero_frame_capture_is_ignored() {
        let _guard = fresh_table();
        let no_frames = Frames {
            frames: [0; 8],
            count: 0,
        };
        record(&no_frames, 50);

        let any_empty = call_sites_report().iter().any(|s| s.frame_count == 0);
        assert!(!any_empty, "zero-frame capture should not appear");
    }
}