Skip to main content

mod_alloc/backtrace/
table.rs

1//! Global call-site aggregation table.
2//!
3//! Open-addressed, fixed-size, atomic-only hash table allocated
4//! once via raw OS pages. Per-thread arenas flush their captured
//! events into this table; the public report API copies a snapshot of
//! it into a `Vec<CallSiteStats>` (the table itself is not cleared).
7//!
8//! ## Sizing
9//!
10//! Default: 4,096 buckets × 96 bytes = ~384 KB. Override via the
11//! `MOD_ALLOC_BUCKETS` environment variable at process start. The
12//! value is rounded up to the next power of two and clamped to
13//! `[64, 1,048,576]`. Reading the env var allocates a small
14//! `String` on first call; this happens inside the allocator hook
15//! with the reentrancy guard set, so the recursive `alloc` is
16//! forwarded directly to `System` without tracking.
17//!
18//! ## Concurrency
19//!
20//! Buckets use a two-phase publish protocol: the hash field is
21//! CAS-claimed first, then `sample_frames` are written, then
22//! `frame_count` is stored with `Release` to mark the bucket
23//! fully populated. Readers gate on `frame_count > 0` after
24//! observing a non-zero hash; this avoids torn reads of the
25//! sample frames.
26
27use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
28
29use super::hash::hash_frames;
30use super::raw_mem::{alloc_pages, free_pages};
31use super::walk::Frames;
32
/// Bucket count used when `MOD_ALLOC_BUCKETS` is unset or unparsable (4,096).
const DEFAULT_BUCKETS: usize = 1 << 12;
/// Smallest table `MOD_ALLOC_BUCKETS` may request (64).
const MIN_BUCKETS: usize = 1 << 6;
/// Largest table `MOD_ALLOC_BUCKETS` may request (1,048,576).
const MAX_BUCKETS: usize = 1 << 20;
36
/// One row in the per-call-site report.
///
/// Frames are raw return addresses, top of stack first. The
/// `frames[..frame_count]` slice is the captured trace (see
/// [`CallSiteStats::trace`]); the rest is zero. Symbolication happens at
/// report-generation time (lands in `v0.9.2`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CallSiteStats {
    /// Raw return addresses, top of stack first.
    pub frames: [u64; 8],
    /// Number of valid frames in `frames`.
    pub frame_count: u8,
    /// Number of allocations attributed to this call site.
    pub count: u64,
    /// Total bytes allocated at this call site (across all
    /// recorded events).
    pub total_bytes: u64,
}

impl CallSiteStats {
    /// The captured trace: the first `frame_count` entries of `frames`.
    ///
    /// `frame_count` is clamped to the array length, so a hand-built row
    /// with an out-of-range count yields every frame instead of panicking.
    #[must_use]
    pub fn trace(&self) -> &[u64] {
        let len = usize::from(self.frame_count).min(self.frames.len());
        &self.frames[..len]
    }
}
55
/// One slot of the open-addressed aggregation table.
///
/// `hash == 0` marks an unclaimed slot. The thread that CAS-claims the slot
/// writes `sample_frames`, then stores `frame_count` with `Release`; readers
/// treat `frame_count == 0` as "claimed but not yet published".
/// Size: 4 × 8 + 8 × 8 = 96 bytes, already a multiple of the 16-byte
/// alignment.
#[repr(C, align(16))]
struct Bucket {
    /// Hash of the call site's frames; 0 means the slot is empty.
    hash: AtomicU64,
    /// Number of events attributed to this site.
    count: AtomicU64,
    /// Sum of the `size` of every event attributed to this site.
    total_bytes: AtomicU64,
    /// Number of valid entries in `sample_frames`; 0 until published.
    frame_count: AtomicU64,
    /// Return addresses written by the thread that claimed the slot.
    sample_frames: [AtomicU64; 8],
}
64
65const BUCKET_SIZE: usize = core::mem::size_of::<Bucket>();
66
67static TABLE_BASE: AtomicPtr<Bucket> = AtomicPtr::new(core::ptr::null_mut());
68static TABLE_BUCKETS: AtomicUsize = AtomicUsize::new(0);
69static TABLE_MASK: AtomicUsize = AtomicUsize::new(0);
70
71fn configured_bucket_count() -> usize {
72    let raw = match std::env::var("MOD_ALLOC_BUCKETS") {
73        Ok(s) => s,
74        Err(_) => return DEFAULT_BUCKETS,
75    };
76    let n: usize = raw.trim().parse().unwrap_or(DEFAULT_BUCKETS);
77    let n = n.clamp(MIN_BUCKETS, MAX_BUCKETS);
78    n.next_power_of_two()
79}
80
81fn ensure_init() -> Option<(*mut Bucket, usize, usize)> {
82    let existing = TABLE_BASE.load(Ordering::Acquire);
83    if !existing.is_null() {
84        let buckets = TABLE_BUCKETS.load(Ordering::Acquire);
85        let mask = TABLE_MASK.load(Ordering::Relaxed);
86        return Some((existing, buckets, mask));
87    }
88
89    let buckets = configured_bucket_count();
90    let bytes = buckets * BUCKET_SIZE;
91    // SAFETY: alloc_pages returns either null or a writable,
92    // zero-init region of `bytes` bytes.
93    let pages = unsafe { alloc_pages(bytes) };
94    if pages.is_null() {
95        return None;
96    }
97    let new_base = pages as *mut Bucket;
98
99    match TABLE_BASE.compare_exchange(
100        core::ptr::null_mut(),
101        new_base,
102        Ordering::Release,
103        Ordering::Acquire,
104    ) {
105        Ok(_) => {
106            TABLE_BUCKETS.store(buckets, Ordering::Release);
107            TABLE_MASK.store(buckets - 1, Ordering::Release);
108            Some((new_base, buckets, buckets - 1))
109        }
110        Err(other) => {
111            // SAFETY: we own `pages`; the CAS loser has not used it.
112            unsafe { free_pages(pages, bytes) };
113            // Wait until the winner publishes the count.
114            loop {
115                let b = TABLE_BUCKETS.load(Ordering::Acquire);
116                if b > 0 {
117                    let mask = TABLE_MASK.load(Ordering::Relaxed);
118                    return Some((other, b, mask));
119                }
120                core::hint::spin_loop();
121            }
122        }
123    }
124}
125
126/// Record one captured event into the global table.
127pub(crate) fn record(frames: &Frames, size: u64) {
128    let count = frames.count as usize;
129    if count == 0 {
130        // Nothing meaningful to bucket on a zero-frame capture
131        // (target unsupported, FP unavailable, etc.). Skip.
132        return;
133    }
134    let Some((base, _buckets, mask)) = ensure_init() else {
135        return;
136    };
137    let h = hash_frames(&frames.frames, count);
138    let mut idx = (h as usize) & mask;
139    let start = idx;
140
141    loop {
142        // SAFETY: `base` is the start of an array of length
143        // `mask + 1`, and `idx <= mask`. Pointer arithmetic stays
144        // in-bounds.
145        let bucket = unsafe { &*base.add(idx) };
146        let existing = bucket.hash.load(Ordering::Acquire);
147
148        if existing == 0 {
149            match bucket
150                .hash
151                .compare_exchange(0, h, Ordering::Release, Ordering::Acquire)
152            {
153                Ok(_) => {
154                    // We own the initialisation phase. Use
155                    // `fetch_add` (not `store`) on `count` and
156                    // `total_bytes` so concurrent matching
157                    // writers never have their increments
158                    // clobbered by our initial value. This lets
159                    // the matching path skip `wait_published`
160                    // entirely.
161                    for i in 0..count {
162                        bucket.sample_frames[i].store(frames.frames[i], Ordering::Relaxed);
163                    }
164                    bucket.count.fetch_add(1, Ordering::Relaxed);
165                    bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
166                    bucket.frame_count.store(count as u64, Ordering::Release);
167                    return;
168                }
169                Err(observed) => {
170                    if observed == h {
171                        // Same call site, another writer claimed
172                        // first. The init thread now uses
173                        // `fetch_add` (not `store`), so our
174                        // increment cannot be clobbered even if
175                        // it lands before init finishes.
176                        bucket.count.fetch_add(1, Ordering::Relaxed);
177                        bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
178                        return;
179                    }
180                    // Different site collided on this slot;
181                    // probe forward.
182                }
183            }
184        } else if existing == h {
185            // Hot path: same site already in this bucket. No
186            // wait_published needed because the init thread now
187            // uses fetch_add for count/total_bytes; whether init
188            // has finished publishing `sample_frames` is a
189            // reader (`call_sites_report`) concern, not ours.
190            bucket.count.fetch_add(1, Ordering::Relaxed);
191            bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
192            return;
193        }
194
195        idx = (idx + 1) & mask;
196        if idx == start {
197            // Table full; drop event silently. Tracked in v0.9.2.
198            return;
199        }
200    }
201}
202
203/// Drain the per-call-site table into a `Vec<CallSiteStats>`.
204///
205/// Flushes the calling thread's arena first so recent events
206/// are visible in the report.
207pub fn call_sites_report() -> Vec<CallSiteStats> {
208    super::arena::flush_current_thread();
209
210    let Some((base, buckets, _mask)) = ensure_init() else {
211        return Vec::new();
212    };
213
214    let mut out = Vec::new();
215    for i in 0..buckets {
216        // SAFETY: `i < buckets`, the bound used to allocate the
217        // table, so `base.add(i)` is in-bounds.
218        let bucket = unsafe { &*base.add(i) };
219        let h = bucket.hash.load(Ordering::Acquire);
220        if h == 0 {
221            continue;
222        }
223        let fc = bucket.frame_count.load(Ordering::Acquire);
224        if fc == 0 {
225            // Claimed but not yet published; skip this snapshot.
226            continue;
227        }
228        let n = (fc as usize).min(8);
229        let mut frames = [0u64; 8];
230        for (j, slot) in frames.iter_mut().enumerate().take(n) {
231            *slot = bucket.sample_frames[j].load(Ordering::Relaxed);
232        }
233        out.push(CallSiteStats {
234            frames,
235            frame_count: n as u8,
236            count: bucket.count.load(Ordering::Relaxed),
237            total_bytes: bucket.total_bytes.load(Ordering::Relaxed),
238        });
239    }
240    out
241}
242
243/// Reset the global table. Intended for tests only; production
244/// callers should treat the table as monotonic.
245#[doc(hidden)]
246pub fn _reset_for_test() {
247    let Some((base, buckets, _mask)) = ensure_init() else {
248        return;
249    };
250    for i in 0..buckets {
251        // SAFETY: bounds-checked by `buckets`, same justification
252        // as `call_sites_report`.
253        let bucket = unsafe { &*base.add(i) };
254        bucket.hash.store(0, Ordering::Release);
255        bucket.count.store(0, Ordering::Relaxed);
256        bucket.total_bytes.store(0, Ordering::Relaxed);
257        bucket.frame_count.store(0, Ordering::Release);
258        for j in 0..8 {
259            bucket.sample_frames[j].store(0, Ordering::Relaxed);
260        }
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    // Every test here shares the one process-wide aggregation table, but
    // cargo runs unit tests in parallel by default. So each test holds
    // this lock for its whole body.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Take `TEST_LOCK` (recovering from poisoning left by a failed test),
    /// clear the table, and hand back the guard that keeps other tests out.
    fn lock_fresh_table() -> MutexGuard<'static, ()> {
        let guard = TEST_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        _reset_for_test();
        guard
    }

    /// Find the report row whose top frame is `leaf`.
    fn row_for_leaf(report: &[CallSiteStats], leaf: u64) -> &CallSiteStats {
        report.iter().find(|row| row.frames[0] == leaf).unwrap()
    }

    #[test]
    fn records_and_reports_a_single_site() {
        let _table = lock_fresh_table();
        let site = Frames {
            frames: [0xAAAA, 0xBBBB, 0xCCCC, 0, 0, 0, 0, 0],
            count: 3,
        };
        for size in [100, 200] {
            record(&site, size);
        }

        let report = call_sites_report();
        let row = report
            .iter()
            .find(|row| row.frames[..2] == [0xAAAA, 0xBBBB])
            .expect("expected our site in the report");
        assert_eq!(row.frame_count, 3);
        assert_eq!(row.count, 2);
        assert_eq!(row.total_bytes, 300);
    }

    #[test]
    fn distinct_sites_are_separately_aggregated() {
        let _table = lock_fresh_table();
        let site_a = Frames {
            frames: [0xA000, 0xA001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        let site_b = Frames {
            frames: [0xB000, 0xB001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        (0..5).for_each(|_| record(&site_a, 10));
        (0..3).for_each(|_| record(&site_b, 20));

        let report = call_sites_report();
        let row_a = row_for_leaf(&report, 0xA000);
        let row_b = row_for_leaf(&report, 0xB000);
        assert_eq!(row_a.count, 5);
        assert_eq!(row_a.total_bytes, 50);
        assert_eq!(row_b.count, 3);
        assert_eq!(row_b.total_bytes, 60);
    }

    #[test]
    fn zero_frame_capture_is_ignored() {
        let _table = lock_fresh_table();
        let no_frames = Frames {
            frames: [0; 8],
            count: 0,
        };
        record(&no_frames, 50);

        let report = call_sites_report();
        let every_row_has_frames = report.iter().all(|row| row.frame_count > 0);
        assert!(every_row_has_frames, "zero-frame capture should not appear");
    }
}