Skip to main content

ferroalloc_probe/
lib.rs

1use crossbeam_queue::SegQueue;
2use serde::Serialize;
3use std::alloc::{GlobalAlloc, Layout, System};
4use std::cell::Cell;
5use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
6
// Per-thread re-entrancy flag for the probe.
//
// Walking and symbolizing the stack allocates, and those allocations come back
// through the global allocator. While this flag is set on a thread, such nested
// calls are not recorded; without it, recording would recurse forever.
thread_local! {
    static IN_PROBE: Cell<bool> = const {
        Cell::new(false)
    };
}
12
// Recording gate. It stays `false` until the flush thread has connected to the
// analyzer, and the flush thread resets it to `false` when a write fails, so no
// new events are recorded while there is no connected consumer.
static PROBE_ACTIVE: AtomicBool = AtomicBool::new(false);

// Cap on buffered events. Once `EVENT_QUEUE` holds this many, further events are
// discarded instead of letting memory grow without bound while the analyzer is
// unreachable.
const MAX_QUEUE_LEN: usize = 10_000;

// Sampling state. `SAMPLE_RATE` is N in "record 1 out of every N events" (1
// records everything). `ALLOC_COUNTER` counts the events that reach the sampling
// check while N > 1.
static SAMPLE_RATE: AtomicU32 = AtomicU32::new(1);
static ALLOC_COUNTER: AtomicU64 = AtomicU64::new(0);
23
24/// Set the sampling rate. Only 1 in every `n` allocations will be recorded.
25pub fn set_sample_rate(n: u32) {
26    SAMPLE_RATE.store(n.max(1), Ordering::Relaxed);
27}
28
29/// An allocation event with the source location already resolved by the probe.
30/// Resolving at the probe side avoids ASLR/DWARF mismatch issues on macOS.
31#[derive(Serialize, Debug)]
32pub struct AllocEvent {
33    pub kind: &'static str, // "alloc" | "dealloc"
34    pub ptr: u64,
35    pub size: usize,
36    pub file: String,
37    pub line: u32,
38    pub function: String,
39}
40
41// Lock-free global queue drained by the background flush thread
42pub static EVENT_QUEUE: SegQueue<AllocEvent> = SegQueue::new();
43
/// Drop-in global allocator that forwards every request to [`System`] and
/// reports heap operations to the ferroalloc analyzer.
///
/// `alloc`, `alloc_zeroed`, `dealloc` and `realloc` are turned into `"alloc"` /
/// `"dealloc"` events pushed onto [`EVENT_QUEUE`]. Recording is subject to:
///
/// - the probe being active, which happens only once the thread started by
///   [`start_flush_thread`] has connected to the analyzer;
/// - the sampling rate configured with [`set_sample_rate`];
/// - the event queue not being full (excess events are dropped).
///
/// # Usage
///
/// ```rust,no_run
/// use ferroalloc_probe::{FerroAllocator, start_flush_thread};
///
/// #[global_allocator]
/// static ALLOC: FerroAllocator = FerroAllocator;
///
/// fn main() {
///     start_flush_thread(7777);
///     // ... rest of your program
/// }
/// ```
#[derive(Debug, Default, Clone, Copy)]
pub struct FerroAllocator;
61
62unsafe impl GlobalAlloc for FerroAllocator {
63    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
64        let ptr = System.alloc(layout);
65        if !ptr.is_null() {
66            record(ptr as u64, layout.size(), "alloc");
67        }
68        ptr
69    }
70
71    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
72        System.dealloc(ptr, layout);
73        record(ptr as u64, layout.size(), "dealloc");
74    }
75
76    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
77        let ptr = System.alloc_zeroed(layout);
78        if !ptr.is_null() {
79            record(ptr as u64, layout.size(), "alloc");
80        }
81        ptr
82    }
83
84    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
85        let new_ptr = System.realloc(ptr, layout, new_size);
86        if !new_ptr.is_null() {
87            if new_ptr == ptr {
88                // In-place resize: the block did not move. Record a dealloc for
89                // the old size and an alloc for the new size without changing the
90                // pointer — this correctly updates live_bytes without inflating
91                // alloc_count with a spurious extra allocation.
92                record(ptr as u64, layout.size(), "dealloc");
93                record(ptr as u64, new_size, "alloc");
94            } else {
95                // The allocator moved the block to a new address.
96                record(ptr as u64, layout.size(), "dealloc");
97                record(new_ptr as u64, new_size, "alloc");
98            }
99        }
100        new_ptr
101    }
102}
103
104fn record(ptr: u64, size: usize, kind: &'static str) {
105    if !PROBE_ACTIVE.load(Ordering::Relaxed) {
106        return;
107    }
108
109    let already_in = IN_PROBE.with(|g| {
110        if g.get() {
111            true
112        } else {
113            g.set(true);
114            false
115        }
116    });
117    if already_in {
118        return;
119    }
120
121    // Apply sampling
122    let rate = SAMPLE_RATE.load(Ordering::Relaxed);
123    if rate > 1 {
124        let count = ALLOC_COUNTER.fetch_add(1, Ordering::Relaxed);
125        if !count.is_multiple_of(rate as u64) {
126            IN_PROBE.with(|g| g.set(false));
127            return;
128        }
129    }
130
131    // Resolve source location at the probe side using the runtime symbol table.
132    // This avoids ASLR/DWARF address mismatch issues on macOS.
133    let mut file = String::new();
134    let mut line: u32 = 0;
135    let mut function = String::new();
136    let mut found = false;
137
138    unsafe {
139        backtrace::trace_unsynchronized(|frame| {
140            if found {
141                return false;
142            }
143            backtrace::resolve_frame_unsynchronized(frame, |symbol| {
144                let fname = symbol.name().map(|n| n.to_string()).unwrap_or_default();
145
146                // Skip internal frames from the probe, backtrace, std, and core
147                let is_internal = fname.contains("ferroalloc_probe")
148                    || fname.contains("backtrace::")
149                    || fname.starts_with("std::")
150                    || fname.starts_with("core::")
151                    || fname.starts_with("alloc::")
152                    || fname.contains("__rust_")
153                    || fname.contains("_ZN");
154
155                let fpath = symbol
156                    .filename()
157                    .map(|p| p.to_string_lossy().into_owned())
158                    .unwrap_or_default();
159
160                // Skip frames from cargo registry, rustup toolchain, and system paths
161                let is_dep = fpath.contains(".cargo/registry")
162                    || fpath.contains(".rustup")
163                    || fpath.contains("/rustc/")
164                    || fpath.starts_with("/usr/")
165                    || fpath.starts_with("/Library/");
166
167                if is_internal || is_dep || fpath.is_empty() {
168                    return;
169                }
170
171                file = fpath;
172                line = symbol.lineno().unwrap_or(0);
173                function = fname;
174                found = true;
175            });
176            !found
177        });
178    }
179
180    // Drop events when the queue is full to prevent unbounded memory growth
181    // if the analyzer is disconnected (fixes OOM on high-allocation programs).
182    if EVENT_QUEUE.len() < MAX_QUEUE_LEN {
183        EVENT_QUEUE.push(AllocEvent {
184            kind,
185            ptr,
186            size,
187            file,
188            line,
189            function,
190        });
191    }
192
193    IN_PROBE.with(|g| g.set(false));
194}
195
196/// Starts the background flush thread that streams allocation events to the analyzer.
197///
198/// Must be called once at program startup, before allocations of interest occur.
199/// The analyzer must be listening on `127.0.0.1:<port>` (default: 7777).
200pub fn start_flush_thread(port: u16) {
201    std::thread::Builder::new()
202        .name("ferroalloc-flush".into())
203        .spawn(move || flush_loop(port))
204        .expect("failed to spawn ferroalloc flush thread");
205}
206
207fn flush_loop(port: u16) {
208    use std::io::Write;
209    use std::net::TcpStream;
210
211    // Permanently mark this thread so that none of its own allocations
212    // (e.g. serde_json serialization, TcpStream buffers) are ever recorded.
213    // Without this, dealloc() called from inside flush_loop re-enters
214    // backtrace::resolve_frame_unsynchronized and crashes on macOS.
215    IN_PROBE.with(|g| g.set(true));
216
217    let addr = format!("127.0.0.1:{port}");
218    loop {
219        match TcpStream::connect(&addr) {
220            Ok(mut stream) => {
221                PROBE_ACTIVE.store(true, Ordering::Relaxed);
222                'send: loop {
223                    while let Some(event) = EVENT_QUEUE.pop() {
224                        if let Ok(mut json) = serde_json::to_vec(&event) {
225                            json.push(b'\n');
226                            if stream.write_all(&json).is_err() {
227                                PROBE_ACTIVE.store(false, Ordering::Relaxed);
228                                break 'send;
229                            }
230                        }
231                    }
232                    std::thread::sleep(std::time::Duration::from_millis(50));
233                }
234            }
235            Err(_) => std::thread::sleep(std::time::Duration::from_millis(500)),
236        }
237    }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use std::alloc::Layout;
    use std::sync::{Mutex, MutexGuard};

    // Tests share the global EVENT_QUEUE and PROBE_ACTIVE flag, so they must run serially.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Exclusive, active probe for the duration of one test.
    ///
    /// On creation it holds `TEST_LOCK`, enables recording and discards stale
    /// events. On drop it disables recording, including when the test panics.
    struct ProbeSession {
        _lock: MutexGuard<'static, ()>,
    }

    impl ProbeSession {
        fn start() -> Self {
            // A failed test poisons the mutex. The guarded data is `()`, so it is
            // safe to recover the guard instead of failing every later test.
            let lock = TEST_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            PROBE_ACTIVE.store(true, Ordering::Relaxed);
            drain_queue();
            ProbeSession { _lock: lock }
        }
    }

    impl Drop for ProbeSession {
        fn drop(&mut self) {
            // Runs before `_lock` is released, so deactivation happens under the lock.
            PROBE_ACTIVE.store(false, Ordering::Relaxed);
        }
    }

    /// Pops every pending event off the global queue.
    fn drain_queue() -> Vec<AllocEvent> {
        std::iter::from_fn(|| EVENT_QUEUE.pop()).collect()
    }

    #[test]
    fn alloc_pushes_event_to_queue() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(64, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            assert!(!ptr.is_null());

            let events = drain_queue();
            assert!(events
                .iter()
                .any(|e| e.kind == "alloc" && e.size == 64 && e.ptr == ptr as u64));

            FerroAllocator.dealloc(ptr, layout);
        }
    }

    #[test]
    fn dealloc_pushes_event_to_queue() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(128, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            drain_queue();

            FerroAllocator.dealloc(ptr, layout);

            let events = drain_queue();
            assert!(events
                .iter()
                .any(|e| e.kind == "dealloc" && e.ptr == ptr as u64));
        }
    }

    #[test]
    fn realloc_emits_dealloc_then_alloc() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(64, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            drain_queue();

            let new_ptr = FerroAllocator.realloc(ptr, layout, 256);
            assert!(!new_ptr.is_null());

            let events = drain_queue();
            assert!(events
                .iter()
                .any(|e| e.kind == "dealloc" && e.ptr == ptr as u64));
            assert!(events.iter().any(|e| e.kind == "alloc" && e.size == 256));

            FerroAllocator.dealloc(new_ptr, Layout::from_size_align(256, 8).unwrap());
        }
    }

    #[test]
    fn frames_are_captured() {
        let _session = ProbeSession::start();

        let layout = Layout::from_size_align(32, 8).unwrap();
        unsafe {
            let ptr = FerroAllocator.alloc(layout);
            let events = drain_queue();
            // Only capture of the event is asserted, not its resolved location.
            // This test's own frames are named `ferroalloc_probe::tests::...`,
            // which the probe filters out as internal, so `file`/`function`
            // are likely empty here.
            let event = events
                .iter()
                .find(|e| e.kind == "alloc" && e.ptr == ptr as u64);
            assert!(event.is_some(), "alloc event should be captured");

            FerroAllocator.dealloc(ptr, layout);
        }
    }
}