Skip to main content

mod_alloc/backtrace/
table.rs

1//! Global call-site aggregation table.
2//!
3//! Open-addressed, fixed-size, atomic-only hash table allocated
4//! once via raw OS pages. `record_event` writes directly here on
5//! every tracked alloc; the public report API drains it into a
6//! `Vec<CallSiteStats>`. The per-thread arena that v0.9.1–v0.9.5
7//! used as a batching layer was removed in v0.9.6 because the
8//! table's matching-path fast write (two atomic ops) is already
9//! cheap enough that batching added more cost than it saved.
10//!
11//! ## Sizing
12//!
13//! Default: 4,096 buckets × 96 bytes = ~384 KB. Override via the
14//! `MOD_ALLOC_BUCKETS` environment variable at process start. The
15//! value is rounded up to the next power of two and clamped to
16//! `[64, 1,048,576]`. Reading the env var allocates a small
17//! `String` on first call; this happens inside the allocator hook
18//! with the reentrancy guard set, so the recursive `alloc` is
19//! forwarded directly to `System` without tracking.
20//!
21//! ## Concurrency
22//!
23//! Buckets use a two-phase publish protocol: the hash field is
24//! CAS-claimed first, then `sample_frames` are written, then
25//! `frame_count` is stored with `Release` to mark the bucket
26//! fully populated. Readers gate on `frame_count > 0` after
27//! observing a non-zero hash; this avoids torn reads of the
28//! sample frames.
29
30use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
31
32use super::hash::hash_frames;
33use super::raw_mem::{alloc_pages, free_pages};
34use super::walk::Frames;
35
36const DEFAULT_BUCKETS: usize = 4_096;
37const MIN_BUCKETS: usize = 64;
38const MAX_BUCKETS: usize = 1 << 20; // 1,048,576
39
40/// One row in the per-call-site report.
41///
42/// Frames are raw return addresses, top of stack first. The
43/// `frames[..frame_count]` slice is the captured trace; the rest
44/// is zero. Symbolication happens at report-generation time
45/// (shipped in v0.9.2 behind the `symbolicate` feature).
46///
47/// # Stability
48///
49/// Marked `#[non_exhaustive]` as of v1.0.0. New counter fields
50/// (e.g. per-bucket high-water marks) may be added in future
51/// minor versions without bumping the major version. Reading
52/// fields by name is fully stable. Iterate via
53/// [`ModAlloc::call_sites`](crate::ModAlloc::call_sites) rather
54/// than constructing literals.
55#[derive(Debug, Clone, Copy, Default)]
56#[non_exhaustive]
57pub struct CallSiteStats {
58    /// Raw return addresses, top of stack first.
59    pub frames: [u64; 8],
60    /// Number of valid frames in `frames`.
61    pub frame_count: u8,
62    /// Number of allocations attributed to this call site.
63    pub count: u64,
64    /// Total bytes allocated at this call site (across all
65    /// recorded events).
66    pub total_bytes: u64,
67}
68
69#[repr(C, align(16))]
70struct Bucket {
71    hash: AtomicU64,
72    count: AtomicU64,
73    total_bytes: AtomicU64,
74    frame_count: AtomicU64,
75    sample_frames: [AtomicU64; 8],
76}
77
78const BUCKET_SIZE: usize = core::mem::size_of::<Bucket>();
79
80static TABLE_BASE: AtomicPtr<Bucket> = AtomicPtr::new(core::ptr::null_mut());
81static TABLE_BUCKETS: AtomicUsize = AtomicUsize::new(0);
82static TABLE_MASK: AtomicUsize = AtomicUsize::new(0);
83
84fn configured_bucket_count() -> usize {
85    let raw = match std::env::var("MOD_ALLOC_BUCKETS") {
86        Ok(s) => s,
87        Err(_) => return DEFAULT_BUCKETS,
88    };
89    let n: usize = raw.trim().parse().unwrap_or(DEFAULT_BUCKETS);
90    let n = n.clamp(MIN_BUCKETS, MAX_BUCKETS);
91    n.next_power_of_two()
92}
93
94/// Steady-state inline accessor. Folds into the calling
95/// `table::record` body so the fast path (table already
96/// initialised, which is true after the very first event for
97/// the process) becomes three atomic loads with no function-
98/// call boundary.
99#[inline(always)]
100fn ensure_init() -> Option<(*mut Bucket, usize, usize)> {
101    let existing = TABLE_BASE.load(Ordering::Acquire);
102    if !existing.is_null() {
103        let buckets = TABLE_BUCKETS.load(Ordering::Acquire);
104        let mask = TABLE_MASK.load(Ordering::Relaxed);
105        return Some((existing, buckets, mask));
106    }
107    ensure_init_slow()
108}
109
110/// First-event-per-process slow path. Allocates the table via
111/// raw OS pages and CAS-publishes it. Kept out-of-line so the
112/// `record` hot path stays compact.
113#[cold]
114#[inline(never)]
115fn ensure_init_slow() -> Option<(*mut Bucket, usize, usize)> {
116    let buckets = configured_bucket_count();
117    let bytes = buckets * BUCKET_SIZE;
118    // SAFETY: alloc_pages returns either null or a writable,
119    // zero-init region of `bytes` bytes.
120    let pages = unsafe { alloc_pages(bytes) };
121    if pages.is_null() {
122        return None;
123    }
124    let new_base = pages as *mut Bucket;
125
126    match TABLE_BASE.compare_exchange(
127        core::ptr::null_mut(),
128        new_base,
129        Ordering::Release,
130        Ordering::Acquire,
131    ) {
132        Ok(_) => {
133            TABLE_BUCKETS.store(buckets, Ordering::Release);
134            TABLE_MASK.store(buckets - 1, Ordering::Release);
135            Some((new_base, buckets, buckets - 1))
136        }
137        Err(other) => {
138            // SAFETY: we own `pages`; the CAS loser has not used it.
139            unsafe { free_pages(pages, bytes) };
140            // Wait until the winner publishes the count.
141            loop {
142                let b = TABLE_BUCKETS.load(Ordering::Acquire);
143                if b > 0 {
144                    let mask = TABLE_MASK.load(Ordering::Relaxed);
145                    return Some((other, b, mask));
146                }
147                core::hint::spin_loop();
148            }
149        }
150    }
151}
152
153/// Record one captured event into the global table.
154///
155/// Called from `backtrace::record_event` on every tracked alloc.
156/// Marked `#[inline]` so thin-LTO can stitch the hot path
157/// (record_event → hash_frames → record) into the calling
158/// `GlobalAlloc::alloc` body without a cross-module function
159/// call.
160#[inline]
161pub(crate) fn record(frames: &Frames, size: u64) {
162    let count = frames.count as usize;
163    if count == 0 {
164        // Nothing meaningful to bucket on a zero-frame capture
165        // (target unsupported, FP unavailable, etc.). Skip.
166        return;
167    }
168    let Some((base, _buckets, mask)) = ensure_init() else {
169        return;
170    };
171    let h = hash_frames(&frames.frames, count);
172    let mut idx = (h as usize) & mask;
173    let start = idx;
174
175    loop {
176        // SAFETY: `base` is the start of an array of length
177        // `mask + 1`, and `idx <= mask`. Pointer arithmetic stays
178        // in-bounds.
179        let bucket = unsafe { &*base.add(idx) };
180        let existing = bucket.hash.load(Ordering::Acquire);
181
182        if existing == 0 {
183            match bucket
184                .hash
185                .compare_exchange(0, h, Ordering::Release, Ordering::Acquire)
186            {
187                Ok(_) => {
188                    // We own the initialisation phase. Use
189                    // `fetch_add` (not `store`) on `count` and
190                    // `total_bytes` so concurrent matching
191                    // writers never have their increments
192                    // clobbered by our initial value. This lets
193                    // the matching path skip `wait_published`
194                    // entirely.
195                    for i in 0..count {
196                        bucket.sample_frames[i].store(frames.frames[i], Ordering::Relaxed);
197                    }
198                    bucket.count.fetch_add(1, Ordering::Relaxed);
199                    bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
200                    bucket.frame_count.store(count as u64, Ordering::Release);
201                    return;
202                }
203                Err(observed) => {
204                    if observed == h {
205                        // Same call site, another writer claimed
206                        // first. The init thread now uses
207                        // `fetch_add` (not `store`), so our
208                        // increment cannot be clobbered even if
209                        // it lands before init finishes.
210                        bucket.count.fetch_add(1, Ordering::Relaxed);
211                        bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
212                        return;
213                    }
214                    // Different site collided on this slot;
215                    // probe forward.
216                }
217            }
218        } else if existing == h {
219            // Hot path: same site already in this bucket. No
220            // wait_published needed because the init thread now
221            // uses fetch_add for count/total_bytes; whether init
222            // has finished publishing `sample_frames` is a
223            // reader (`call_sites_report`) concern, not ours.
224            bucket.count.fetch_add(1, Ordering::Relaxed);
225            bucket.total_bytes.fetch_add(size, Ordering::Relaxed);
226            return;
227        }
228
229        idx = (idx + 1) & mask;
230        if idx == start {
231            // Table full; drop event silently. Tracked in v0.9.2.
232            return;
233        }
234    }
235}
236
237/// Drain the per-call-site table into a `Vec<CallSiteStats>`.
238///
239/// As of v0.9.6, events go straight from `record_event` into the
240/// global table — there is no per-thread arena to flush first.
241pub fn call_sites_report() -> Vec<CallSiteStats> {
242    let Some((base, buckets, _mask)) = ensure_init() else {
243        return Vec::new();
244    };
245
246    let mut out = Vec::new();
247    for i in 0..buckets {
248        // SAFETY: `i < buckets`, the bound used to allocate the
249        // table, so `base.add(i)` is in-bounds.
250        let bucket = unsafe { &*base.add(i) };
251        let h = bucket.hash.load(Ordering::Acquire);
252        if h == 0 {
253            continue;
254        }
255        let fc = bucket.frame_count.load(Ordering::Acquire);
256        if fc == 0 {
257            // Claimed but not yet published; skip this snapshot.
258            continue;
259        }
260        let n = (fc as usize).min(8);
261        let mut frames = [0u64; 8];
262        for (j, slot) in frames.iter_mut().enumerate().take(n) {
263            *slot = bucket.sample_frames[j].load(Ordering::Relaxed);
264        }
265        out.push(CallSiteStats {
266            frames,
267            frame_count: n as u8,
268            count: bucket.count.load(Ordering::Relaxed),
269            total_bytes: bucket.total_bytes.load(Ordering::Relaxed),
270        });
271    }
272    out
273}
274
275/// Reset the global table. Intended for tests only; production
276/// callers should treat the table as monotonic.
277#[doc(hidden)]
278pub fn _reset_for_test() {
279    let Some((base, buckets, _mask)) = ensure_init() else {
280        return;
281    };
282    for i in 0..buckets {
283        // SAFETY: bounds-checked by `buckets`, same justification
284        // as `call_sites_report`.
285        let bucket = unsafe { &*base.add(i) };
286        bucket.hash.store(0, Ordering::Release);
287        bucket.count.store(0, Ordering::Relaxed);
288        bucket.total_bytes.store(0, Ordering::Relaxed);
289        bucket.frame_count.store(0, Ordering::Release);
290        for j in 0..8 {
291            bucket.sample_frames[j].store(0, Ordering::Relaxed);
292        }
293    }
294}
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299    use std::sync::Mutex;
300
301    // Serialise the table tests within this binary. They share
302    // the process-wide aggregation table, and cargo runs unit
303    // tests in parallel by default.
304    static TEST_LOCK: Mutex<()> = Mutex::new(());
305
306    #[test]
307    fn records_and_reports_a_single_site() {
308        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
309        _reset_for_test();
310        let frames = Frames {
311            frames: [0xAAAA, 0xBBBB, 0xCCCC, 0, 0, 0, 0, 0],
312            count: 3,
313        };
314        record(&frames, 100);
315        record(&frames, 200);
316
317        let report = call_sites_report();
318        let site = report
319            .iter()
320            .find(|s| s.frames[0] == 0xAAAA && s.frames[1] == 0xBBBB)
321            .expect("expected our site in the report");
322        assert_eq!(site.frame_count, 3);
323        assert_eq!(site.count, 2);
324        assert_eq!(site.total_bytes, 300);
325    }
326
327    #[test]
328    fn distinct_sites_are_separately_aggregated() {
329        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
330        _reset_for_test();
331        let a = Frames {
332            frames: [0xA000, 0xA001, 0, 0, 0, 0, 0, 0],
333            count: 2,
334        };
335        let b = Frames {
336            frames: [0xB000, 0xB001, 0, 0, 0, 0, 0, 0],
337            count: 2,
338        };
339        for _ in 0..5 {
340            record(&a, 10);
341        }
342        for _ in 0..3 {
343            record(&b, 20);
344        }
345        let report = call_sites_report();
346        let sa = report.iter().find(|s| s.frames[0] == 0xA000).unwrap();
347        let sb = report.iter().find(|s| s.frames[0] == 0xB000).unwrap();
348        assert_eq!(sa.count, 5);
349        assert_eq!(sa.total_bytes, 50);
350        assert_eq!(sb.count, 3);
351        assert_eq!(sb.total_bytes, 60);
352    }
353
354    #[test]
355    fn zero_frame_capture_is_ignored() {
356        let _g = TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
357        _reset_for_test();
358        let empty = Frames {
359            frames: [0; 8],
360            count: 0,
361        };
362        record(&empty, 50);
363        let report = call_sites_report();
364        assert!(
365            report.iter().all(|s| s.frame_count > 0),
366            "zero-frame capture should not appear"
367        );
368    }
369}