near_rust_allocator_proxy/
allocator.rs

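//! A proxy `GlobalAlloc` implementation that forwards to jemalloc while
//! tracking per-thread allocation counts and bytes and, on Linux, sampling
//! stack traces so memory spikes can be attributed to the allocating code.
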
use arr_macro::arr;
use backtrace::Backtrace;
use libc;
use log::{info, warn};
use rand::Rng;
use std::alloc::{GlobalAlloc, Layout};
use std::cell::RefCell;
use std::cmp::{max, min};
use std::fs::OpenOptions;
use std::io::Write;
use std::mem;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicUsize, Ordering};

const MEBIBYTE: usize = 1024 * 1024;
// Allocations of at least MIN_BLOCK_SIZE bytes always get a stack trace;
// smaller ones are sampled with SMALL_BLOCK_TRACE_PROBABILITY percent chance.
const MIN_BLOCK_SIZE: usize = 1000;
const SMALL_BLOCK_TRACE_PROBABILITY: usize = 1;
// Log a warning with a backtrace each time a thread's usage grows by this much.
const REPORT_USAGE_INTERVAL: usize = 512 * MEBIBYTE;
// Frames at or above this address are never attributed to an allocation.
const SKIP_ADDR: u64 = 0x700000000000;
const PRINT_STACK_TRACE_ON_MEMORY_SPIKE: bool = true;

#[cfg(target_os = "linux")]
const ENABLE_STACK_TRACE: bool = true;

// Currently there is no point in collecting stack traces on non-Linux
// platforms, because the tooling that consumes them only supports Linux.
#[cfg(not(target_os = "linux"))]
const ENABLE_STACK_TRACE: bool = false;

const COUNTERS_SIZE: usize = 16384;
static JEMALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
// Per-thread usage counters, indexed by `tid % COUNTERS_SIZE`. The `arr!`
// macro needs a literal length, which must stay in sync with COUNTERS_SIZE.
static MEM_SIZE: [AtomicUsize; COUNTERS_SIZE] = arr![AtomicUsize::new(0); 16384];
static MEM_CNT: [AtomicUsize; COUNTERS_SIZE] = arr![AtomicUsize::new(0); 16384];

// One-bit-per-hash filters over return addresses: SKIP_PTR marks addresses
// whose symbols matched the ignore lists below, CHECKED_PTR marks addresses
// already resolved and accepted. Each array holds 1 << 23 bits.
static mut SKIP_PTR: [u8; 1 << 20] = [0; 1 << 20];
static mut CHECKED_PTR: [u8; 1 << 20] = [0; 1 << 20];

// Number of stack entries stored in each allocation header, and the maximum
// number of frames captured per backtrace.
const STACK_SIZE: usize = 1;
const MAX_STACK: usize = 15;
const SAVE_STACK_TRACES_TO_FILE: bool = false;

// Sentinel "addresses" stored when a trace was sampled away or unavailable.
const SKIPPED_TRACE: *mut c_void = 1 as *mut c_void;
const MISSING_TRACE: *mut c_void = 2 as *mut c_void;

// Header prepended to every allocation handed out by `MyAllocator`.
#[repr(C)]
struct AllocHeader {
    magic: u64,
    size: u64,
    tid: u64,
    stack: [*mut c_void; STACK_SIZE],
}

const HEADER_SIZE: usize = mem::size_of::<AllocHeader>();
const MAGIC_RUST: usize = 0x12345678991100;

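// Size sketch (assumes a 64-bit target): with STACK_SIZE == 1 the header is
// three u64 fields plus one pointer, i.e. 3 * 8 + 1 * 8 = 32 bytes of
// overhead prepended to every allocation.
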
thread_local! {
    pub static TID: RefCell<usize> = RefCell::new(0);
    pub static IN_TRACE: RefCell<usize> = RefCell::new(0);
    pub static MEMORY_USAGE_MAX: RefCell<usize> = RefCell::new(0);
    pub static MEMORY_USAGE_LAST_REPORT: RefCell<usize> = RefCell::new(0);
}

#[cfg(not(target_os = "linux"))]
pub static NTHREADS: AtomicUsize = AtomicUsize::new(0);

#[cfg(target_os = "linux")]
pub fn get_tid() -> usize {
    TID.with(|t| {
        if *t.borrow() == 0 {
            *t.borrow_mut() = nix::unistd::gettid().as_raw() as usize;
        }
        *t.borrow()
    })
}

#[cfg(not(target_os = "linux"))]
pub fn get_tid() -> usize {
    TID.with(|t| {
        if *t.borrow() == 0 {
            // Assigned ids start at 1, because 0 doubles as the "unset"
            // sentinel; `fetch_add` returns the previous counter value.
            *t.borrow_mut() = NTHREADS.fetch_add(1, Ordering::SeqCst) + 1;
        }
        *t.borrow()
    })
}

/// The 64-bit finalizer ("fmix64") step of MurmurHash3: mixes the bits of `h`
/// so that nearby pointer values index the bit filters uniformly.
pub fn murmur64(mut h: u64) -> u64 {
    h ^= h >> 33;
    h = h.wrapping_mul(0xff51afd7ed558ccd);
    h ^= h >> 33;
    h = h.wrapping_mul(0xc4ceb9fe1a85ec53);
    h ^= h >> 33;
    h
}

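// Usage sketch (`frame_addr` is illustrative), mirroring how `alloc` below
// indexes the bit filters:
//
//     let hash = murmur64(frame_addr as u64) % (1 << 23);
//     let byte = (hash / 8) as usize; // which byte of the 1 MiB filter
//     let bit = hash % 8;             // which bit within that byte
//
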
// Mangled symbol prefixes and fragments to ignore when picking the frame that
// "owns" an allocation; these are allocator, runtime, and framework internals.
const IGNORE_START: &[&str] = &[
    "__rg_",
    "_ZN5actix",
    "_ZN5alloc",
    "_ZN6base64",
    "_ZN6cached",
    "_ZN4core",
    "_ZN9hashbrown",
    "_ZN20reed_solomon_erasure",
    "_ZN5tokio",
    "_ZN10tokio_util",
    "_ZN3std",
    "_ZN8smallvec",
];

const IGNORE_INSIDE: &[&str] = &[
    "$LT$actix..",
    "$LT$alloc..",
    "$LT$base64..",
    "$LT$cached..",
    "$LT$core..",
    "$LT$hashbrown..",
    "$LT$reed_solomon_erasure..",
    "$LT$tokio..",
    "$LT$tokio_util..",
    "$LT$serde_json..",
    "$LT$std..",
    "$LT$tracing_subscriber..",
];

// Returns true if the frame at `addr` should not be credited with an
// allocation, either because it lies at or above SKIP_ADDR or because its
// symbol matches one of the ignore lists above.
fn skip_ptr(addr: *mut c_void) -> bool {
    if addr as u64 >= SKIP_ADDR {
        return true;
    }
    let mut found = false;
    backtrace::resolve(addr, |symbol| {
        if let Some(name) = symbol.name() {
            let name = name.as_str().unwrap_or("");
            for &s in IGNORE_START {
                if name.starts_with(s) {
                    found = true;
                    break;
                }
            }
            for &s in IGNORE_INSIDE {
                if name.contains(s) {
                    found = true;
                    break;
                }
            }
        }
    });

    found
}

pub fn current_thread_memory_usage() -> usize {
    let tid = get_tid();
    MEM_SIZE[tid % COUNTERS_SIZE].load(Ordering::SeqCst)
}

pub fn thread_memory_usage(tid: usize) -> usize {
    MEM_SIZE[tid % COUNTERS_SIZE].load(Ordering::SeqCst)
}

pub fn thread_memory_count(tid: usize) -> usize {
    MEM_CNT[tid % COUNTERS_SIZE].load(Ordering::SeqCst)
}

pub fn current_thread_peak_memory_usage() -> usize {
    MEMORY_USAGE_MAX.with(|x| *x.borrow())
}

pub fn reset_memory_usage_max() {
    let tid = get_tid();
    let memory_usage = MEM_SIZE[tid % COUNTERS_SIZE].load(Ordering::SeqCst);
    MEMORY_USAGE_MAX.with(|x| *x.borrow_mut() = memory_usage);
    MEMORY_USAGE_LAST_REPORT.with(|x| *x.borrow_mut() = memory_usage);
}

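/// Proxy allocator: forwards every request to jemalloc with extra room for an
/// `AllocHeader`, then records the size, thread id, and (optionally) a sampled
/// stack pointer for the allocation.
///
/// A minimal registration sketch (the static's name `ALLOC` is illustrative,
/// not part of this crate):
///
/// ```ignore
/// #[global_allocator]
/// static ALLOC: MyAllocator = MyAllocator;
/// ```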
pub struct MyAllocator;
unsafe impl GlobalAlloc for MyAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let new_layout =
            Layout::from_size_align(layout.size() + HEADER_SIZE, layout.align()).unwrap();

        let res = JEMALLOC.alloc(new_layout);
        if res.is_null() {
            // Propagate allocation failure instead of writing a header to null.
            return res;
        }

        let tid = get_tid();
        // `fetch_add` returns the previous value, so add this block's size to
        // get the thread's usage after the allocation.
        let memory_usage = layout.size()
            + MEM_SIZE[tid % COUNTERS_SIZE].fetch_add(layout.size(), Ordering::SeqCst);
        MEM_CNT[tid % COUNTERS_SIZE].fetch_add(1, Ordering::SeqCst);

        if PRINT_STACK_TRACE_ON_MEMORY_SPIKE
            && memory_usage
                > REPORT_USAGE_INTERVAL + MEMORY_USAGE_LAST_REPORT.with(|x| *x.borrow())
        {
            // IN_TRACE guards against re-entering this allocator while the
            // backtrace machinery itself allocates.
            if IN_TRACE.with(|in_trace| *in_trace.borrow()) == 0 {
                IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 1);
                MEMORY_USAGE_LAST_REPORT.with(|x| *x.borrow_mut() = memory_usage);

                let bt = Backtrace::new();

                warn!(
                    "Thread {} reached new record of memory usage {}MiB\n{:?}",
                    tid,
                    memory_usage / MEBIBYTE,
                    bt
                );
                IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 0);
            }
        }
        if memory_usage > MEMORY_USAGE_MAX.with(|x| *x.borrow()) {
            MEMORY_USAGE_MAX.with(|x| *x.borrow_mut() = memory_usage);
        }

        let mut addr: Option<*mut c_void> = Some(MISSING_TRACE);
        let mut ary: [*mut c_void; MAX_STACK + 1] = [0 as *mut c_void; MAX_STACK + 1];
        let mut chosen_i = 0;

        if ENABLE_STACK_TRACE && IN_TRACE.with(|in_trace| *in_trace.borrow()) == 0 {
            IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 1);
            // Large blocks are always traced; small ones are sampled.
            if layout.size() >= MIN_BLOCK_SIZE
                || rand::thread_rng().gen_range(0, 100) < SMALL_BLOCK_TRACE_PROBABILITY
            {
                let size = libc::backtrace(ary.as_mut_ptr(), MAX_STACK as i32);
                ary[0] = 0 as *mut c_void;
                // Walk up the stack until a frame that is not on the ignore
                // lists is found; that frame is recorded in the header.
                for i in 1..min(size as usize, MAX_STACK) {
                    addr = Some(ary[i]);
                    chosen_i = i;
                    if ary[i] < SKIP_ADDR as *mut c_void {
                        let hash = murmur64(ary[i] as u64) % (1 << 23);
                        if (SKIP_PTR[(hash / 8) as usize] >> (hash % 8)) & 1 == 1 {
                            // Known-ignored frame; keep walking up.
                            continue;
                        }
                        if (CHECKED_PTR[(hash / 8) as usize] >> (hash % 8)) & 1 == 1 {
                            // Known-good frame; use it.
                            break;
                        }
                        if SAVE_STACK_TRACES_TO_FILE {
                            backtrace::resolve(ary[i], |symbol| {
                                let fname = format!("/tmp/logs/{}", tid);
                                if let Ok(mut f) = OpenOptions::new()
                                    .create(true)
                                    .write(true)
                                    .append(true)
                                    .open(fname)
                                {
                                    if let Some(path) = symbol.filename() {
                                        f.write_all(
                                            format!(
                                                "PATH {:?} {} {}\n",
                                                ary[i],
                                                symbol.lineno().unwrap_or(0),
                                                path.to_str().unwrap_or("<UNKNOWN>")
                                            )
                                            .as_bytes(),
                                        )
                                        .unwrap();
                                    }
                                    if let Some(name) = symbol.name() {
                                        f.write_all(
                                            format!("SYMBOL {:?} {}\n", ary[i], name.to_string())
                                                .as_bytes(),
                                        )
                                        .unwrap();
                                    }
                                }
                            });
                        }

                        let should_skip = skip_ptr(ary[i]);
                        if should_skip {
                            SKIP_PTR[(hash / 8) as usize] |= 1 << (hash % 8);
                            continue;
                        }
                        CHECKED_PTR[(hash / 8) as usize] |= 1 << (hash % 8);

                        if SAVE_STACK_TRACES_TO_FILE {
                            let fname = format!("/tmp/logs/{}", tid);

                            if let Ok(mut f) = OpenOptions::new()
                                .create(true)
                                .write(true)
                                .append(true)
                                .open(fname)
                            {
                                f.write_all(format!("STACK_FOR {:?}\n", addr).as_bytes()).unwrap();
                                let mut ary2: [*mut c_void; 256] = [0 as *mut c_void; 256];
                                let size2 = libc::backtrace(ary2.as_mut_ptr(), 256) as usize;
                                for i in 0..size2 {
                                    let addr2 = ary2[i];

                                    backtrace::resolve(addr2, |symbol| {
                                        if let Some(name) = symbol.name() {
                                            let name = name.as_str().unwrap_or("");

                                            f.write_all(
                                                format!("STACK {:?} {:?} {:?}\n", i, addr2, name)
                                                    .as_bytes(),
                                            )
                                            .unwrap();
                                        }
                                    });
                                }
                            }
                        }
                        break;
                    }
                }
            } else {
                addr = Some(SKIPPED_TRACE);
            }
            IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 0);
        }

        let mut stack = [0 as *mut c_void; STACK_SIZE];
        stack[0] = addr.unwrap_or(0 as *mut c_void);
        for i in 1..STACK_SIZE {
            stack[i] =
                ary[min(MAX_STACK as isize, max(0, chosen_i as isize + i as isize)) as usize];
        }

        let header = AllocHeader {
            magic: (MAGIC_RUST + STACK_SIZE) as u64,
            size: layout.size() as u64,
            tid: tid as u64,
            stack,
        };

        // Write the header at the start of the block and hand the caller the
        // address just past it.
        *(res as *mut AllocHeader) = header;

        res.add(HEADER_SIZE)
    }

    unsafe fn dealloc(&self, mut ptr: *mut u8, layout: Layout) {
        let new_layout =
            Layout::from_size_align(layout.size() + HEADER_SIZE, layout.align()).unwrap();

        // Step back to the header that `alloc` wrote in front of the block.
        ptr = ptr.sub(HEADER_SIZE);

        // Overwrite the magic so freed blocks are distinguishable from live ones.
        (*(ptr as *mut AllocHeader)).magic = (MAGIC_RUST + STACK_SIZE + 0x100) as u64;
        let tid = (*(ptr as *mut AllocHeader)).tid as usize;

        MEM_SIZE[tid % COUNTERS_SIZE].fetch_sub(layout.size(), Ordering::SeqCst);
        MEM_CNT[tid % COUNTERS_SIZE].fetch_sub(1, Ordering::SeqCst);

        JEMALLOC.dealloc(ptr, new_layout);
    }
}
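
// Usage sketch (hypothetical caller, not part of this file): once MyAllocator
// is installed as the global allocator, per-thread stats can be polled via the
// accessors above:
//
//     let tid = get_tid();
//     info!("thread {}: {} bytes in {} blocks",
//         tid, thread_memory_usage(tid), thread_memory_count(tid));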

pub fn print_counters_ary() {
    info!("tid {}", get_tid());
    let mut total_cnt: usize = 0;
    let mut total_size: usize = 0;
    for idx in 0..COUNTERS_SIZE {
        let size = MEM_SIZE[idx].load(Ordering::SeqCst);
        if size != 0 {
            let cnt = MEM_CNT[idx].load(Ordering::SeqCst);
            total_cnt += cnt;
            total_size += size;
            info!("COUNTERS {}: {} {}", idx, cnt, size);
        }
    }
    info!("COUNTERS TOTAL {} {}", total_cnt, total_size);
}