use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use super::hash::hash_frames;
use super::raw_mem::{alloc_pages, free_pages};
use super::walk::Frames;
/// Bucket count used when `MOD_ALLOC_BUCKETS` is unset or does not parse as a `usize`.
const DEFAULT_BUCKETS: usize = 4_096;
/// Lower bound a configured bucket count is clamped to.
const MIN_BUCKETS: usize = 64;
/// Upper bound a configured bucket count is clamped to. It is a power of two, so
/// rounding a clamped value up to the next power of two can never exceed it.
const MAX_BUCKETS: usize = 1 << 20;
/// Aggregated totals for one call site, as returned by `call_sites_report`.
#[derive(Debug, Clone, Copy)]
pub struct CallSiteStats {
/// Frame values copied from the bucket; only the first `frame_count` entries are
/// filled in, the remainder are left at 0.
pub frames: [u64; 8],
/// Number of valid entries in `frames` (at most 8).
pub frame_count: u8,
/// Number of samples recorded for this call site.
pub count: u64,
/// Sum of the `size` values recorded for this call site.
pub total_bytes: u64,
}
/// One slot of the open-addressed call-site table. Every field is an atomic so
/// that slots can be claimed and updated through shared references.
#[repr(C, align(16))]
struct Bucket {
/// Hash of the call site owning this slot; 0 means the slot is still unclaimed.
hash: AtomicU64,
/// Number of samples accumulated in this slot.
count: AtomicU64,
/// Sum of the sizes of the samples accumulated in this slot.
total_bytes: AtomicU64,
/// Number of valid entries in `sample_frames`. `record` writes it last, after the
/// frames themselves, and the report skips slots where it is still 0.
frame_count: AtomicU64,
/// Frames stored by the `record` call that claimed the slot.
sample_frames: [AtomicU64; 8],
}
/// Size in bytes of one `Bucket`; multiplied by the bucket count to size the table allocation.
const BUCKET_SIZE: usize = core::mem::size_of::<Bucket>();
/// Pointer to the first bucket of the global table; null until `ensure_init` installs one.
static TABLE_BASE: AtomicPtr<Bucket> = AtomicPtr::new(core::ptr::null_mut());
/// Number of buckets in the table; stays 0 until `ensure_init` has installed `TABLE_BASE`.
static TABLE_BUCKETS: AtomicUsize = AtomicUsize::new(0);
/// Bucket count minus one; `record` ANDs it with a hash to get a bucket index.
static TABLE_MASK: AtomicUsize = AtomicUsize::new(0);
/// Decides how many buckets the table should have.
///
/// Without a readable `MOD_ALLOC_BUCKETS` environment variable the answer is
/// `DEFAULT_BUCKETS`. Otherwise the trimmed value is parsed as a `usize` (falling
/// back to `DEFAULT_BUCKETS` when it does not parse), clamped to
/// `MIN_BUCKETS..=MAX_BUCKETS`, and rounded up to a power of two so that an index
/// can be derived with a bit mask.
fn configured_bucket_count() -> usize {
    let Ok(text) = std::env::var("MOD_ALLOC_BUCKETS") else {
        return DEFAULT_BUCKETS;
    };
    text.trim()
        .parse::<usize>()
        .unwrap_or(DEFAULT_BUCKETS)
        .clamp(MIN_BUCKETS, MAX_BUCKETS)
        .next_power_of_two()
}
/// Returns the global table as `(base, bucket_count, mask)`, allocating it on first use.
///
/// Returns `None` only when the table does not exist yet and `alloc_pages` fails.
///
/// Several threads may race to create the table. Exactly one wins the
/// compare-exchange on `TABLE_BASE`; the losers release their allocation and use
/// the winner's table. Because the base pointer becomes visible *before* the
/// bucket count is stored, every path that did not install the table itself waits
/// for a non-zero `TABLE_BUCKETS` and derives the mask from it. This guarantees a
/// caller never sees a valid base paired with a bucket count or mask of 0.
fn ensure_init() -> Option<(*mut Bucket, usize, usize)> {
    let existing = TABLE_BASE.load(Ordering::Acquire);
    if !existing.is_null() {
        // The installer may still be between its CAS and its size store.
        let buckets = wait_for_bucket_count();
        return Some((existing, buckets, buckets - 1));
    }

    let buckets = configured_bucket_count();
    let bytes = buckets * BUCKET_SIZE;
    // SAFETY: `bytes` is non-zero (`buckets >= MIN_BUCKETS`).
    // NOTE(review): `record` treats a zero hash as "empty", so this relies on
    // `alloc_pages` handing back zero-filled memory — confirm against raw_mem.
    let pages = unsafe { alloc_pages(bytes) };
    if pages.is_null() {
        return None;
    }
    let new_base = pages as *mut Bucket;

    match TABLE_BASE.compare_exchange(
        core::ptr::null_mut(),
        new_base,
        Ordering::Release,
        Ordering::Acquire,
    ) {
        Ok(_) => {
            // Store the mask first: the Release store of the bucket count then
            // publishes both values to anyone who Acquire-loads a non-zero count.
            TABLE_MASK.store(buckets - 1, Ordering::Relaxed);
            TABLE_BUCKETS.store(buckets, Ordering::Release);
            Some((new_base, buckets, buckets - 1))
        }
        Err(winner) => {
            // SAFETY: `pages` came from `alloc_pages(bytes)` above and was never
            // published, so no other thread can be using it.
            unsafe { free_pages(pages, bytes) };
            let published = wait_for_bucket_count();
            Some((winner, published, published - 1))
        }
    }
}

/// Spins until the thread that installed `TABLE_BASE` has published the bucket
/// count, then returns it. Must only be called once `TABLE_BASE` is non-null.
fn wait_for_bucket_count() -> usize {
    loop {
        let buckets = TABLE_BUCKETS.load(Ordering::Acquire);
        if buckets > 0 {
            return buckets;
        }
        core::hint::spin_loop();
    }
}
pub(crate) fn record(frames: &Frames, size: u64) {
let count = frames.count as usize;
if count == 0 {
return;
}
let Some((base, _buckets, mask)) = ensure_init() else {
return;
};
let h = hash_frames(&frames.frames, count);
let mut idx = (h as usize) & mask;
let start = idx;
loop {
let bucket = unsafe { &*base.add(idx) };
let existing = bucket.hash.load(Ordering::Acquire);
if existing == 0 {
match bucket
.hash
.compare_exchange(0, h, Ordering::Release, Ordering::Acquire)
{
Ok(_) => {
for i in 0..count {
bucket.sample_frames[i].store(frames.frames[i], Ordering::Relaxed);
}
bucket.count.fetch_add(1, Ordering::Relaxed);
bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
bucket.frame_count.store(count as u64, Ordering::Release);
return;
}
Err(observed) => {
if observed == h {
bucket.count.fetch_add(1, Ordering::Relaxed);
bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
return;
}
}
}
} else if existing == h {
bucket.count.fetch_add(1, Ordering::Relaxed);
bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
return;
}
idx = (idx + 1) & mask;
if idx == start {
return;
}
}
}
/// Takes a snapshot of every published call site in the table.
///
/// The current thread's arena is flushed first. Buckets that are unclaimed
/// (hash 0) or whose frames have not been published yet (`frame_count` 0) are
/// left out. An empty vector is returned when the table is unavailable.
pub fn call_sites_report() -> Vec<CallSiteStats> {
    super::arena::flush_current_thread();
    let (base, buckets) = match ensure_init() {
        Some((base, buckets, _mask)) => (base, buckets),
        None => return Vec::new(),
    };

    (0..buckets)
        .filter_map(|slot_index| {
            // SAFETY: `slot_index` is below the bucket count reported by `ensure_init`.
            let bucket = unsafe { &*base.add(slot_index) };
            if bucket.hash.load(Ordering::Acquire) == 0 {
                return None;
            }
            // The Acquire load pairs with the Release store in `record`, making
            // the frame values written before it visible here.
            let published = bucket.frame_count.load(Ordering::Acquire);
            if published == 0 {
                return None;
            }

            let valid = (published as usize).min(8);
            let mut frames = [0u64; 8];
            for (dst, src) in frames.iter_mut().zip(&bucket.sample_frames).take(valid) {
                *dst = src.load(Ordering::Relaxed);
            }
            let count = bucket.count.load(Ordering::Relaxed);
            let total_bytes = bucket.total_bytes.load(Ordering::Relaxed);
            Some(CallSiteStats {
                frames,
                frame_count: valid as u8,
                count,
                total_bytes,
            })
        })
        .collect()
}
#[doc(hidden)]
pub fn _reset_for_test() {
let Some((base, buckets, _mask)) = ensure_init() else {
return;
};
for i in 0..buckets {
let bucket = unsafe { &*base.add(i) };
bucket.hash.store(0, Ordering::Release);
bucket.count.store(0, Ordering::Relaxed);
bucket.total_bytes.store(0, Ordering::Relaxed);
bucket.frame_count.store(0, Ordering::Release);
for j in 0..8 {
bucket.sample_frames[j].store(0, Ordering::Relaxed);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    /// The table is process-global, so tests touching it must not overlap.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Takes the test lock (ignoring poisoning from an earlier failed test) and
    /// clears the table. Hold the returned guard for the duration of the test.
    fn exclusive_clean_table() -> MutexGuard<'static, ()> {
        let guard = TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        _reset_for_test();
        guard
    }

    /// Two samples from the same stack end up in one entry with summed totals.
    #[test]
    fn records_and_reports_a_single_site() {
        let _guard = exclusive_clean_table();
        let stack = Frames {
            frames: [0xAAAA, 0xBBBB, 0xCCCC, 0, 0, 0, 0, 0],
            count: 3,
        };
        for size in [100, 200] {
            record(&stack, size);
        }

        let report = call_sites_report();
        let site = report
            .iter()
            .find(|entry| entry.frames[0] == 0xAAAA && entry.frames[1] == 0xBBBB)
            .expect("expected our site in the report");
        assert_eq!(site.frame_count, 3);
        assert_eq!(site.count, 2);
        assert_eq!(site.total_bytes, 300);
    }

    /// Different stacks are counted independently of each other.
    #[test]
    fn distinct_sites_are_separately_aggregated() {
        let _guard = exclusive_clean_table();
        let first = Frames {
            frames: [0xA000, 0xA001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        let second = Frames {
            frames: [0xB000, 0xB001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        (0..5).for_each(|_| record(&first, 10));
        (0..3).for_each(|_| record(&second, 20));

        let report = call_sites_report();
        let by_top_frame =
            |top: u64| report.iter().find(|entry| entry.frames[0] == top).unwrap();
        let first_stats = by_top_frame(0xA000);
        let second_stats = by_top_frame(0xB000);
        assert_eq!(first_stats.count, 5);
        assert_eq!(first_stats.total_bytes, 50);
        assert_eq!(second_stats.count, 3);
        assert_eq!(second_stats.total_bytes, 60);
    }

    /// A capture without frames must not produce a report entry.
    #[test]
    fn zero_frame_capture_is_ignored() {
        let _guard = exclusive_clean_table();
        let no_frames = Frames {
            frames: [0; 8],
            count: 0,
        };
        record(&no_frames, 50);

        let report = call_sites_report();
        assert!(
            report.iter().all(|entry| entry.frame_count > 0),
            "zero-frame capture should not appear"
        );
    }
}