// ferroalloc_probe/lib.rs

use crossbeam_queue::SegQueue;
use serde::Serialize;
use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::Cell;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};

// Per-thread re-entrancy flag for the probe.
thread_local! {
    /// `true` while this thread is inside `record` (and permanently on the flush
    /// thread). Allocator callbacks that find it set are ignored, which keeps the
    /// allocations made by backtrace collection from recursing into the probe.
    static IN_PROBE: Cell<bool> = const { Cell::new(false) };
}

/// Master switch for recording. It starts off; the flush thread turns it on once
/// it has connected to the analyzer, and off again when writing to that
/// connection fails.
static PROBE_ACTIVE: AtomicBool = AtomicBool::new(false);

// 1-in-N sampling state.

/// Keep one allocator event out of every `SAMPLE_RATE`; `1` (the default) keeps all.
static SAMPLE_RATE: AtomicU32 = AtomicU32::new(1);
/// Number of events that went through the sampling check. It only advances while
/// `SAMPLE_RATE` is greater than 1, and counts allocations and deallocations alike.
static ALLOC_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Sets the sampling rate so that only one in every `n` allocator events is recorded.
///
/// Passing `0` behaves exactly like passing `1` (record every event), so the
/// stored divisor is never zero.
pub fn set_sample_rate(n: u32) {
    let rate = if n == 0 { 1 } else { n };
    SAMPLE_RATE.store(rate, Ordering::Relaxed);
}

25/// An allocation event with the source location already resolved by the probe.
26/// Resolving at the probe side avoids ASLR/DWARF mismatch issues on macOS.
27#[derive(Serialize, Debug)]
28pub struct AllocEvent {
29    pub kind: &'static str, // "alloc" | "dealloc"
30    pub ptr: u64,
31    pub size: usize,
32    pub file: String,
33    pub line: u32,
34    pub function: String,
35}
36
37// Lock-free global queue drained by the background flush thread
38pub static EVENT_QUEUE: SegQueue<AllocEvent> = SegQueue::new();
39
40/// Drop-in global allocator that wraps the system allocator and records every
41/// heap operation into `EVENT_QUEUE` for streaming to the ferroalloc analyzer.
42///
43/// # Usage
44///
45/// ```rust,no_run
46/// use ferroalloc_probe::{FerroAllocator, start_flush_thread};
47///
48/// #[global_allocator]
49/// static ALLOC: FerroAllocator = FerroAllocator;
50///
51/// fn main() {
52///     start_flush_thread(7777);
53///     // ... rest of your program
54/// }
55/// ```
56pub struct FerroAllocator;
57
58unsafe impl GlobalAlloc for FerroAllocator {
59    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
60        let ptr = System.alloc(layout);
61        if !ptr.is_null() {
62            record(ptr as u64, layout.size(), "alloc");
63        }
64        ptr
65    }
66
67    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
68        System.dealloc(ptr, layout);
69        record(ptr as u64, layout.size(), "dealloc");
70    }
71
72    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
73        let ptr = System.alloc_zeroed(layout);
74        if !ptr.is_null() {
75            record(ptr as u64, layout.size(), "alloc");
76        }
77        ptr
78    }
79
80    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
81        let new_ptr = System.realloc(ptr, layout, new_size);
82        if !new_ptr.is_null() {
83            record(ptr as u64, layout.size(), "dealloc");
84            record(new_ptr as u64, new_size, "alloc");
85        }
86        new_ptr
87    }
88}
89
90fn record(ptr: u64, size: usize, kind: &'static str) {
91    if !PROBE_ACTIVE.load(Ordering::Relaxed) {
92        return;
93    }
94
95    let already_in = IN_PROBE.with(|g| {
96        if g.get() {
97            true
98        } else {
99            g.set(true);
100            false
101        }
102    });
103    if already_in {
104        return;
105    }
106
107    // Apply sampling
108    let rate = SAMPLE_RATE.load(Ordering::Relaxed);
109    if rate > 1 {
110        let count = ALLOC_COUNTER.fetch_add(1, Ordering::Relaxed);
111        if !count.is_multiple_of(rate as u64) {
112            IN_PROBE.with(|g| g.set(false));
113            return;
114        }
115    }
116
117    // Resolve source location at the probe side using the runtime symbol table.
118    // This avoids ASLR/DWARF address mismatch issues on macOS.
119    let mut file = String::new();
120    let mut line: u32 = 0;
121    let mut function = String::new();
122    let mut found = false;
123
124    unsafe {
125        backtrace::trace_unsynchronized(|frame| {
126            if found {
127                return false;
128            }
129            backtrace::resolve_frame_unsynchronized(frame, |symbol| {
130                let fname = symbol.name().map(|n| n.to_string()).unwrap_or_default();
131
132                // Skip internal frames from the probe, backtrace, std, and core
133                let is_internal = fname.contains("ferroalloc_probe")
134                    || fname.contains("backtrace::")
135                    || fname.starts_with("std::")
136                    || fname.starts_with("core::")
137                    || fname.starts_with("alloc::")
138                    || fname.contains("__rust_")
139                    || fname.contains("_ZN");
140
141                let fpath = symbol
142                    .filename()
143                    .map(|p| p.to_string_lossy().into_owned())
144                    .unwrap_or_default();
145
146                // Skip frames from cargo registry, rustup toolchain, and system paths
147                let is_dep = fpath.contains(".cargo/registry")
148                    || fpath.contains(".rustup")
149                    || fpath.contains("/rustc/")
150                    || fpath.starts_with("/usr/")
151                    || fpath.starts_with("/Library/");
152
153                if is_internal || is_dep || fpath.is_empty() {
154                    return;
155                }
156
157                file = fpath;
158                line = symbol.lineno().unwrap_or(0);
159                function = fname;
160                found = true;
161            });
162            !found
163        });
164    }
165
166    // Always push the event so tests and environments without debug symbols still
167    // record alloc/dealloc operations. file/line/function may be empty when the
168    // backtrace resolver cannot find a user frame (e.g. CI without debug info).
169    EVENT_QUEUE.push(AllocEvent {
170        kind,
171        ptr,
172        size,
173        file,
174        line,
175        function,
176    });
177
178    IN_PROBE.with(|g| g.set(false));
179}
180
181/// Starts the background flush thread that streams allocation events to the analyzer.
182///
183/// Must be called once at program startup, before allocations of interest occur.
184/// The analyzer must be listening on `127.0.0.1:<port>` (default: 7777).
185pub fn start_flush_thread(port: u16) {
186    std::thread::Builder::new()
187        .name("ferroalloc-flush".into())
188        .spawn(move || flush_loop(port))
189        .expect("failed to spawn ferroalloc flush thread");
190}
191
192fn flush_loop(port: u16) {
193    use std::io::Write;
194    use std::net::TcpStream;
195
196    // Permanently mark this thread so that none of its own allocations
197    // (e.g. serde_json serialization, TcpStream buffers) are ever recorded.
198    // Without this, dealloc() called from inside flush_loop re-enters
199    // backtrace::resolve_frame_unsynchronized and crashes on macOS.
200    IN_PROBE.with(|g| g.set(true));
201
202    let addr = format!("127.0.0.1:{port}");
203    loop {
204        match TcpStream::connect(&addr) {
205            Ok(mut stream) => {
206                PROBE_ACTIVE.store(true, Ordering::Relaxed);
207                'send: loop {
208                    while let Some(event) = EVENT_QUEUE.pop() {
209                        if let Ok(mut json) = serde_json::to_vec(&event) {
210                            json.push(b'\n');
211                            if stream.write_all(&json).is_err() {
212                                PROBE_ACTIVE.store(false, Ordering::Relaxed);
213                                break 'send;
214                            }
215                        }
216                    }
217                    std::thread::sleep(std::time::Duration::from_millis(50));
218                }
219            }
220            Err(_) => std::thread::sleep(std::time::Duration::from_millis(500)),
221        }
222    }
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use std::alloc::Layout;
    use std::sync::{Mutex, MutexGuard};

    // The tests share the global EVENT_QUEUE and PROBE_ACTIVE flag, so they must run serially.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Exclusive, probe-enabled window for one test.
    ///
    /// Creating it takes `TEST_LOCK`, turns recording on and empties the queue.
    /// Dropping it turns recording off again (also when the test panics) and then
    /// releases the lock.
    struct ProbeSession {
        _lock: MutexGuard<'static, ()>,
    }

    impl ProbeSession {
        fn start() -> Self {
            // A failing test poisons the lock. The guarded data is `()`, so recover
            // instead of making every later test fail with a PoisonError.
            let lock = TEST_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            PROBE_ACTIVE.store(true, Ordering::Relaxed);
            drain_queue();
            ProbeSession { _lock: lock }
        }
    }

    impl Drop for ProbeSession {
        fn drop(&mut self) {
            // Runs before the `_lock` field is dropped, i.e. while still exclusive.
            PROBE_ACTIVE.store(false, Ordering::Relaxed);
        }
    }

    /// Removes and returns everything currently in `EVENT_QUEUE`.
    fn drain_queue() -> Vec<AllocEvent> {
        std::iter::from_fn(|| EVENT_QUEUE.pop()).collect()
    }

    #[test]
    fn alloc_pushes_event_to_queue() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(64, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            assert!(!ptr.is_null());

            let events = drain_queue();
            assert!(events
                .iter()
                .any(|e| e.kind == "alloc" && e.size == 64 && e.ptr == ptr as u64));

            FerroAllocator.dealloc(ptr, layout);
        }
    }

    #[test]
    fn dealloc_pushes_event_to_queue() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(128, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            drain_queue();

            FerroAllocator.dealloc(ptr, layout);

            let events = drain_queue();
            assert!(events
                .iter()
                .any(|e| e.kind == "dealloc" && e.size == 128 && e.ptr == ptr as u64));
        }
    }

    #[test]
    fn realloc_emits_dealloc_then_alloc() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(64, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            drain_queue();

            let new_ptr = FerroAllocator.realloc(ptr, layout, 256);
            assert!(!new_ptr.is_null());

            let events = drain_queue();
            let dealloc_at = events
                .iter()
                .position(|e| e.kind == "dealloc" && e.ptr == ptr as u64);
            let alloc_at = events
                .iter()
                .position(|e| e.kind == "alloc" && e.size == 256 && e.ptr == new_ptr as u64);
            match (dealloc_at, alloc_at) {
                (Some(d), Some(a)) => assert!(d < a, "dealloc must be queued before alloc"),
                other => panic!("missing realloc events: {other:?} in {events:?}"),
            }

            FerroAllocator.dealloc(new_ptr, Layout::from_size_align(256, 8).unwrap());
        }
    }

    #[test]
    fn frames_are_captured() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(32, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            let events = drain_queue();
            // This crate's own frames (these tests included) are filtered out as
            // internal, so the location may legitimately be unresolved here. Only
            // require the event itself and a consistent "unknown location" encoding.
            let event = events
                .iter()
                .find(|e| e.kind == "alloc" && e.ptr == ptr as u64)
                .expect("alloc event should be captured");
            if event.file.is_empty() {
                assert_eq!(event.line, 0);
                assert!(event.function.is_empty());
            }

            FerroAllocator.dealloc(ptr, layout);
        }
    }
}