Skip to main content

mod_alloc/backtrace/
table.rs

1//! Global call-site aggregation table.
2//!
3//! Open-addressed, fixed-size, atomic-only hash table allocated
4//! once via raw OS pages. `record_event` writes directly here on
5//! every tracked alloc; the public report API drains it into a
6//! `Vec<CallSiteStats>`. The per-thread arena that v0.9.1–v0.9.5
7//! used as a batching layer was removed in v0.9.6 because the
8//! table's matching-path fast write (two atomic ops) is already
9//! cheap enough that batching added more cost than it saved.
10//!
11//! ## Sizing
12//!
13//! Default: 4,096 buckets × 96 bytes = ~384 KB. Override via the
14//! `MOD_ALLOC_BUCKETS` environment variable at process start. The
15//! value is rounded up to the next power of two and clamped to
16//! `[64, 1,048,576]`. Reading the env var allocates a small
17//! `String` on first call; this happens inside the allocator hook
18//! with the reentrancy guard set, so the recursive `alloc` is
19//! forwarded directly to `System` without tracking.
20//!
21//! ## Concurrency
22//!
23//! Buckets use a two-phase publish protocol: the hash field is
24//! CAS-claimed first, then `sample_frames` are written, then
25//! `frame_count` is stored with `Release` to mark the bucket
26//! fully populated. Readers gate on `frame_count > 0` after
27//! observing a non-zero hash; this avoids torn reads of the
28//! sample frames.
29
30use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
31
32use super::hash::hash_frames;
33use super::raw_mem::{alloc_pages, free_pages};
34use super::walk::Frames;
35
/// Bucket count used when `MOD_ALLOC_BUCKETS` is unset or does not parse.
const DEFAULT_BUCKETS: usize = 1 << 12; // 4,096
/// Lower clamp applied to a configured bucket count.
const MIN_BUCKETS: usize = 1 << 6; // 64
/// Upper clamp applied to a configured bucket count.
const MAX_BUCKETS: usize = 1_048_576; // 1 << 20
39
/// One row in the per-call-site report.
///
/// Frames are raw return addresses, top of stack first. The
/// `frames[..frame_count]` slice is the captured trace; the rest
/// is zero. Symbolication happens at report-generation time (lands
/// in `v0.9.2`).
///
/// The type is plain data, so it derives the comparison, hashing
/// and default traits in addition to `Debug`/`Clone`/`Copy`; this
/// lets callers compare, deduplicate, or key on report rows
/// directly. `Default` yields an all-zero row.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct CallSiteStats {
    /// Raw return addresses, top of stack first.
    pub frames: [u64; 8],
    /// Number of valid frames in `frames`.
    pub frame_count: u8,
    /// Number of allocations attributed to this call site.
    pub count: u64,
    /// Total bytes allocated at this call site (across all
    /// recorded events).
    pub total_bytes: u64,
}
58
/// One slot of the open-addressed call-site table.
///
/// Every field is an atomic so slots can be read and written
/// through shared references without locking.
#[repr(C, align(16))]
struct Bucket {
    /// Hash of the call site that claimed this slot; `0` means
    /// the slot is unclaimed.
    hash: AtomicU64,
    /// Number of events recorded against this slot.
    count: AtomicU64,
    /// Sum of the `size` values recorded against this slot.
    total_bytes: AtomicU64,
    /// Number of valid entries in `sample_frames`. Stored last
    /// (with `Release`) by the thread that claimed the slot, so
    /// `0` means the sample frames are not yet published.
    frame_count: AtomicU64,
    /// Return addresses of the trace that first claimed the slot.
    sample_frames: [AtomicU64; 8],
}
67
/// Size in bytes of one `Bucket`; multiplied by the bucket count
/// to size the page allocation.
const BUCKET_SIZE: usize = core::mem::size_of::<Bucket>();

/// Start of the bucket array; null until an initialising thread
/// publishes its allocation.
static TABLE_BASE: AtomicPtr<Bucket> = AtomicPtr::new(core::ptr::null_mut());
/// Number of buckets in the table; `0` until the initialising
/// thread has stored the real count.
static TABLE_BUCKETS: AtomicUsize = AtomicUsize::new(0);
/// Bucket count minus one, used to wrap probe indices; `0` until
/// stored during initialisation.
static TABLE_MASK: AtomicUsize = AtomicUsize::new(0);
73
74fn configured_bucket_count() -> usize {
75    let raw = match std::env::var("MOD_ALLOC_BUCKETS") {
76        Ok(s) => s,
77        Err(_) => return DEFAULT_BUCKETS,
78    };
79    let n: usize = raw.trim().parse().unwrap_or(DEFAULT_BUCKETS);
80    let n = n.clamp(MIN_BUCKETS, MAX_BUCKETS);
81    n.next_power_of_two()
82}
83
84/// Steady-state inline accessor. Folds into the calling
85/// `table::record` body so the fast path (table already
86/// initialised, which is true after the very first event for
87/// the process) becomes three atomic loads with no function-
88/// call boundary.
89#[inline(always)]
90fn ensure_init() -> Option<(*mut Bucket, usize, usize)> {
91    let existing = TABLE_BASE.load(Ordering::Acquire);
92    if !existing.is_null() {
93        let buckets = TABLE_BUCKETS.load(Ordering::Acquire);
94        let mask = TABLE_MASK.load(Ordering::Relaxed);
95        return Some((existing, buckets, mask));
96    }
97    ensure_init_slow()
98}
99
100/// First-event-per-process slow path. Allocates the table via
101/// raw OS pages and CAS-publishes it. Kept out-of-line so the
102/// `record` hot path stays compact.
103#[cold]
104#[inline(never)]
105fn ensure_init_slow() -> Option<(*mut Bucket, usize, usize)> {
106    let buckets = configured_bucket_count();
107    let bytes = buckets * BUCKET_SIZE;
108    // SAFETY: alloc_pages returns either null or a writable,
109    // zero-init region of `bytes` bytes.
110    let pages = unsafe { alloc_pages(bytes) };
111    if pages.is_null() {
112        return None;
113    }
114    let new_base = pages as *mut Bucket;
115
116    match TABLE_BASE.compare_exchange(
117        core::ptr::null_mut(),
118        new_base,
119        Ordering::Release,
120        Ordering::Acquire,
121    ) {
122        Ok(_) => {
123            TABLE_BUCKETS.store(buckets, Ordering::Release);
124            TABLE_MASK.store(buckets - 1, Ordering::Release);
125            Some((new_base, buckets, buckets - 1))
126        }
127        Err(other) => {
128            // SAFETY: we own `pages`; the CAS loser has not used it.
129            unsafe { free_pages(pages, bytes) };
130            // Wait until the winner publishes the count.
131            loop {
132                let b = TABLE_BUCKETS.load(Ordering::Acquire);
133                if b > 0 {
134                    let mask = TABLE_MASK.load(Ordering::Relaxed);
135                    return Some((other, b, mask));
136                }
137                core::hint::spin_loop();
138            }
139        }
140    }
141}
142
143/// Record one captured event into the global table.
144///
145/// Called from `backtrace::record_event` on every tracked alloc.
146/// Marked `#[inline]` so thin-LTO can stitch the hot path
147/// (record_event → hash_frames → record) into the calling
148/// `GlobalAlloc::alloc` body without a cross-module function
149/// call.
150#[inline]
151pub(crate) fn record(frames: &Frames, size: u64) {
152    let count = frames.count as usize;
153    if count == 0 {
154        // Nothing meaningful to bucket on a zero-frame capture
155        // (target unsupported, FP unavailable, etc.). Skip.
156        return;
157    }
158    let Some((base, _buckets, mask)) = ensure_init() else {
159        return;
160    };
161    let h = hash_frames(&frames.frames, count);
162    let mut idx = (h as usize) & mask;
163    let start = idx;
164
165    loop {
166        // SAFETY: `base` is the start of an array of length
167        // `mask + 1`, and `idx <= mask`. Pointer arithmetic stays
168        // in-bounds.
169        let bucket = unsafe { &*base.add(idx) };
170        let existing = bucket.hash.load(Ordering::Acquire);
171
172        if existing == 0 {
173            match bucket
174                .hash
175                .compare_exchange(0, h, Ordering::Release, Ordering::Acquire)
176            {
177                Ok(_) => {
178                    // We own the initialisation phase. Use
179                    // `fetch_add` (not `store`) on `count` and
180                    // `total_bytes` so concurrent matching
181                    // writers never have their increments
182                    // clobbered by our initial value. This lets
183                    // the matching path skip `wait_published`
184                    // entirely.
185                    for i in 0..count {
186                        bucket.sample_frames[i].store(frames.frames[i], Ordering::Relaxed);
187                    }
188                    bucket.count.fetch_add(1, Ordering::Relaxed);
189                    bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
190                    bucket.frame_count.store(count as u64, Ordering::Release);
191                    return;
192                }
193                Err(observed) => {
194                    if observed == h {
195                        // Same call site, another writer claimed
196                        // first. The init thread now uses
197                        // `fetch_add` (not `store`), so our
198                        // increment cannot be clobbered even if
199                        // it lands before init finishes.
200                        bucket.count.fetch_add(1, Ordering::Relaxed);
201                        bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
202                        return;
203                    }
204                    // Different site collided on this slot;
205                    // probe forward.
206                }
207            }
208        } else if existing == h {
209            // Hot path: same site already in this bucket. No
210            // wait_published needed because the init thread now
211            // uses fetch_add for count/total_bytes; whether init
212            // has finished publishing `sample_frames` is a
213            // reader (`call_sites_report`) concern, not ours.
214            bucket.count.fetch_add(1, Ordering::Relaxed);
215            bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
216            return;
217        }
218
219        idx = (idx + 1) & mask;
220        if idx == start {
221            // Table full; drop event silently. Tracked in v0.9.2.
222            return;
223        }
224    }
225}
226
227/// Drain the per-call-site table into a `Vec<CallSiteStats>`.
228///
229/// As of v0.9.6, events go straight from `record_event` into the
230/// global table — there is no per-thread arena to flush first.
231pub fn call_sites_report() -> Vec<CallSiteStats> {
232    let Some((base, buckets, _mask)) = ensure_init() else {
233        return Vec::new();
234    };
235
236    let mut out = Vec::new();
237    for i in 0..buckets {
238        // SAFETY: `i < buckets`, the bound used to allocate the
239        // table, so `base.add(i)` is in-bounds.
240        let bucket = unsafe { &*base.add(i) };
241        let h = bucket.hash.load(Ordering::Acquire);
242        if h == 0 {
243            continue;
244        }
245        let fc = bucket.frame_count.load(Ordering::Acquire);
246        if fc == 0 {
247            // Claimed but not yet published; skip this snapshot.
248            continue;
249        }
250        let n = (fc as usize).min(8);
251        let mut frames = [0u64; 8];
252        for (j, slot) in frames.iter_mut().enumerate().take(n) {
253            *slot = bucket.sample_frames[j].load(Ordering::Relaxed);
254        }
255        out.push(CallSiteStats {
256            frames,
257            frame_count: n as u8,
258            count: bucket.count.load(Ordering::Relaxed),
259            total_bytes: bucket.total_bytes.load(Ordering::Relaxed),
260        });
261    }
262    out
263}
264
265/// Reset the global table. Intended for tests only; production
266/// callers should treat the table as monotonic.
267#[doc(hidden)]
268pub fn _reset_for_test() {
269    let Some((base, buckets, _mask)) = ensure_init() else {
270        return;
271    };
272    for i in 0..buckets {
273        // SAFETY: bounds-checked by `buckets`, same justification
274        // as `call_sites_report`.
275        let bucket = unsafe { &*base.add(i) };
276        bucket.hash.store(0, Ordering::Release);
277        bucket.count.store(0, Ordering::Relaxed);
278        bucket.total_bytes.store(0, Ordering::Relaxed);
279        bucket.frame_count.store(0, Ordering::Release);
280        for j in 0..8 {
281            bucket.sample_frames[j].store(0, Ordering::Relaxed);
282        }
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard};

    // The aggregation table is process-wide and cargo runs unit
    // tests in parallel by default, so every test here holds this
    // lock for its whole body.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Take the test lock (recovering from poisoning) and hand
    /// back the guard after wiping the table.
    fn exclusive_clean_table() -> MutexGuard<'static, ()> {
        let guard = match TEST_LOCK.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        _reset_for_test();
        guard
    }

    #[test]
    fn records_and_reports_a_single_site() {
        let _guard = exclusive_clean_table();
        let trace = Frames {
            frames: [0xAAAA, 0xBBBB, 0xCCCC, 0, 0, 0, 0, 0],
            count: 3,
        };
        record(&trace, 100);
        record(&trace, 200);

        let rows = call_sites_report();
        let row = rows
            .iter()
            .find(|row| row.frames[0] == 0xAAAA && row.frames[1] == 0xBBBB)
            .expect("expected our site in the report");
        assert_eq!(row.frame_count, 3);
        assert_eq!(row.count, 2);
        assert_eq!(row.total_bytes, 300);
    }

    #[test]
    fn distinct_sites_are_separately_aggregated() {
        let _guard = exclusive_clean_table();
        let first = Frames {
            frames: [0xA000, 0xA001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        let second = Frames {
            frames: [0xB000, 0xB001, 0, 0, 0, 0, 0, 0],
            count: 2,
        };
        (0..5).for_each(|_| record(&first, 10));
        (0..3).for_each(|_| record(&second, 20));

        let rows = call_sites_report();
        let first_row = rows.iter().find(|row| row.frames[0] == 0xA000).unwrap();
        let second_row = rows.iter().find(|row| row.frames[0] == 0xB000).unwrap();
        assert_eq!(first_row.count, 5);
        assert_eq!(first_row.total_bytes, 50);
        assert_eq!(second_row.count, 3);
        assert_eq!(second_row.total_bytes, 60);
    }

    #[test]
    fn zero_frame_capture_is_ignored() {
        let _guard = exclusive_clean_table();
        let no_frames = Frames {
            frames: [0; 8],
            count: 0,
        };
        record(&no_frames, 50);

        let rows = call_sites_report();
        assert!(
            !rows.iter().any(|row| row.frame_count == 0),
            "zero-frame capture should not appear"
        );
    }
}
359}