near_rust_allocator_proxy/allocator.rs

use arr_macro::arr;
use backtrace::Backtrace;
use libc;
use log::{info, warn};
use rand::Rng;
use std::alloc::{GlobalAlloc, Layout};
use std::cell::RefCell;
use std::cmp::{max, min};
use std::fs::OpenOptions;
use std::io::Write;
use std::mem;
use std::os::raw::c_void;
use std::sync::atomic::{AtomicUsize, Ordering};

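// Tuning constants: allocations of at least MIN_BLOCK_SIZE bytes always get a
// stack trace, smaller ones are sampled with SMALL_BLOCK_TRACE_PROBABILITY
// percent probability, and a warning with a backtrace is logged each time a
// thread's usage grows by another REPORT_USAGE_INTERVAL. Return addresses at
// or above SKIP_ADDR are assumed not to belong to interesting code and are
// never symbol-resolved.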
const MEBIBYTE: usize = 1024 * 1024;
const MIN_BLOCK_SIZE: usize = 1000;
const SMALL_BLOCK_TRACE_PROBABILITY: usize = 1;
const REPORT_USAGE_INTERVAL: usize = 512 * MEBIBYTE;
const SKIP_ADDR: u64 = 0x700000000000;
const PRINT_STACK_TRACE_ON_MEMORY_SPIKE: bool = true;

#[cfg(target_os = "linux")]
const ENABLE_STACK_TRACE: bool = true;

#[cfg(not(target_os = "linux"))]
const ENABLE_STACK_TRACE: bool = false;

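// Per-thread accounting, keyed by `tid % COUNTERS_SIZE`: MEM_SIZE tracks live
// bytes and MEM_CNT live allocation counts. The actual allocation work is
// delegated to jemalloc.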
const COUNTERS_SIZE: usize = 16384;
static JEMALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
static MEM_SIZE: [AtomicUsize; COUNTERS_SIZE] = arr![AtomicUsize::new(0); 16384];
static MEM_CNT: [AtomicUsize; COUNTERS_SIZE] = arr![AtomicUsize::new(0); 16384];

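// One-bit-per-hash caches of already-classified return addresses (indexed by
// murmur64(addr) % 2^23): SKIP_PTR marks frames known to be uninteresting,
// CHECKED_PTR marks frames that were already accepted for attribution once.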
static mut SKIP_PTR: [u8; 1 << 20] = [0; 1 << 20];
static mut CHECKED_PTR: [u8; 1 << 20] = [0; 1 << 20];

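// STACK_SIZE is the number of frames stored in each allocation header;
// MAX_STACK is how many frames `libc::backtrace` is asked to walk.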
const STACK_SIZE: usize = 1;
const MAX_STACK: usize = 15;
const SAVE_STACK_TRACES_TO_FILE: bool = false;

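// Sentinel "addresses" stored when no real frame was captured: the trace was
// skipped (small allocation that lost the sampling roll) or missing entirely.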
const SKIPPED_TRACE: *mut c_void = 1 as *mut c_void;
const MISSING_TRACE: *mut c_void = 2 as *mut c_void;

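// Header prepended to every allocation handed out by `MyAllocator`: a magic
// value (rewritten on dealloc), the requested size, the owning thread id, and
// the captured stack frame(s).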
#[repr(C)]
struct AllocHeader {
    magic: u64,
    size: u64,
    tid: u64,
    stack: [*mut c_void; STACK_SIZE],
}

const HEADER_SIZE: usize = mem::size_of::<AllocHeader>();
const MAGIC_RUST: usize = 0x12345678991100;

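// Thread-local state: TID caches the thread id, IN_TRACE is a re-entrancy
// guard so allocations made while capturing a backtrace are not traced again,
// and MEMORY_USAGE_MAX / MEMORY_USAGE_LAST_REPORT drive peak tracking and
// spike reporting.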
thread_local! {
    pub static TID: RefCell<usize> = RefCell::new(0);
    pub static IN_TRACE: RefCell<usize> = RefCell::new(0);
    pub static MEMORY_USAGE_MAX: RefCell<usize> = RefCell::new(0);
    pub static MEMORY_USAGE_LAST_REPORT: RefCell<usize> = RefCell::new(0);
}

#[cfg(not(target_os = "linux"))]
pub static NTHREADS: AtomicUsize = AtomicUsize::new(0);

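// Returns a cached per-thread id: the OS thread id (gettid) on Linux, or a
// process-local counter elsewhere. Used as the index into MEM_SIZE / MEM_CNT.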
#[cfg(target_os = "linux")]
pub fn get_tid() -> usize {
    TID.with(|t| {
        if *t.borrow() == 0 {
            *t.borrow_mut() = nix::unistd::gettid().as_raw() as usize;
        }
        *t.borrow()
    })
}

#[cfg(not(target_os = "linux"))]
pub fn get_tid() -> usize {
    TID.with(|t| {
        if *t.borrow() == 0 {
            *t.borrow_mut() = NTHREADS.fetch_add(1, Ordering::SeqCst);
        }
        *t.borrow()
    })
}

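// 64-bit finalizer step of MurmurHash3; used to hash return addresses into
// the SKIP_PTR / CHECKED_PTR bitmaps.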
pub fn murmur64(mut h: u64) -> u64 {
    h ^= h >> 33;
    h = h.overflowing_mul(0xff51afd7ed558ccd).0;
    h ^= h >> 33;
    h = h.overflowing_mul(0xc4ceb9fe1a85ec53).0;
    h ^= h >> 33;
    h
}

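// Mangled-symbol prefixes and substrings treated as "infrastructure" frames
// (allocator, runtime, collection and async machinery). Frames matching these
// are skipped when choosing the frame to attribute an allocation to.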
const IGNORE_START: &[&str] = &[
    "__rg_",
    "_ZN5actix",
    "_ZN5alloc",
    "_ZN6base64",
    "_ZN6cached",
    "_ZN4core",
    "_ZN9hashbrown",
    "_ZN20reed_solomon_erasure",
    "_ZN5tokio",
    "_ZN10tokio_util",
    "_ZN3std",
    "_ZN8smallvec",
];

const IGNORE_INSIDE: &[&str] = &[
    "$LT$actix..",
    "$LT$alloc..",
    "$LT$base64..",
    "$LT$cached..",
    "$LT$core..",
    "$LT$hashbrown..",
    "$LT$reed_solomon_erasure..",
    "$LT$tokio..",
    "$LT$tokio_util..",
    "$LT$serde_json..",
    "$LT$std..",
    "$LT$tracing_subscriber..",
];

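// Decides whether a return address should be skipped for attribution: either
// it lies at or above SKIP_ADDR, or its symbol matches one of the ignore
// lists.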
fn skip_ptr(addr: *mut c_void) -> bool {
    if addr as u64 >= SKIP_ADDR {
        return true;
    }
    let mut found = false;
    backtrace::resolve(addr, |symbol| {
        if let Some(name) = symbol.name() {
            let name = name.as_str().unwrap_or("");
            for &s in IGNORE_START {
                if name.starts_with(s) {
                    found = true;
                    break;
                }
            }
            for &s in IGNORE_INSIDE {
                if name.contains(s) {
                    found = true;
                    break;
                }
            }
        }
    });

    found
}

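// Public accessors over the per-thread counters. Note that slots are shared
// modulo COUNTERS_SIZE, so colliding thread ids read an aggregated value.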
pub fn current_thread_memory_usage() -> usize {
    let tid = get_tid();
    MEM_SIZE[tid % COUNTERS_SIZE].load(Ordering::SeqCst)
}

pub fn thread_memory_usage(tid: usize) -> usize {
    MEM_SIZE[tid % COUNTERS_SIZE].load(Ordering::SeqCst)
}

pub fn thread_memory_count(tid: usize) -> usize {
    MEM_CNT[tid % COUNTERS_SIZE].load(Ordering::SeqCst)
}

pub fn current_thread_peak_memory_usage() -> usize {
    MEMORY_USAGE_MAX.with(|x| *x.borrow())
}

pub fn reset_memory_usage_max() {
    let tid = get_tid();
    let memory_usage = MEM_SIZE[tid % COUNTERS_SIZE].load(Ordering::SeqCst);
    MEMORY_USAGE_MAX.with(|x| *x.borrow_mut() = memory_usage);
    MEMORY_USAGE_LAST_REPORT.with(|x| *x.borrow_mut() = memory_usage);
}

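/// Allocator proxy: a `GlobalAlloc` that forwards to jemalloc, prepends an
/// `AllocHeader` to every block, and maintains the per-thread counters above.
///
/// Sketch of how a binary might install it (hypothetical paths; assumes the
/// crate exposes this module as `near_rust_allocator_proxy::allocator`):
///
/// ```ignore
/// use near_rust_allocator_proxy::allocator::MyAllocator;
///
/// #[global_allocator]
/// static ALLOC: MyAllocator = MyAllocator;
/// ```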
pub struct MyAllocator;
unsafe impl GlobalAlloc for MyAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // Allocate extra room for the header in front of the user data.
        let new_layout =
            Layout::from_size_align(layout.size() + HEADER_SIZE, layout.align()).unwrap();

        let res = JEMALLOC.alloc(new_layout);

        // Account the requested size against this thread's counters.
        let tid = get_tid();
        let memory_usage = MEM_SIZE[tid % COUNTERS_SIZE].fetch_add(layout.size(), Ordering::SeqCst);
        MEM_CNT[tid % COUNTERS_SIZE].fetch_add(1, Ordering::SeqCst);

        // Log a backtrace whenever this thread's usage grows by another
        // REPORT_USAGE_INTERVAL since the last report, unless we are already
        // inside a trace (capturing a backtrace allocates, which would recurse).
        if PRINT_STACK_TRACE_ON_MEMORY_SPIKE
            && memory_usage > REPORT_USAGE_INTERVAL + MEMORY_USAGE_LAST_REPORT.with(|x| *x.borrow())
        {
            if IN_TRACE.with(|in_trace| *in_trace.borrow()) == 0 {
                IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 1);
                MEMORY_USAGE_LAST_REPORT.with(|x| *x.borrow_mut() = memory_usage);

                let bt = Backtrace::new();

                warn!(
                    "Thread {} reached new record of memory usage {}MiB\n{:?}",
                    tid,
                    memory_usage / MEBIBYTE,
                    bt
                );
                IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 0);
            }
        }
        if memory_usage > MEMORY_USAGE_MAX.with(|x| *x.borrow()) {
            MEMORY_USAGE_MAX.with(|x| *x.borrow_mut() = memory_usage);
        }

        // Try to attribute this allocation to one "interesting" caller frame.
        let mut addr: Option<*mut c_void> = Some(MISSING_TRACE);
        let mut ary: [*mut c_void; MAX_STACK + 1] = [0 as *mut c_void; MAX_STACK + 1];
        let mut chosen_i = 0;

        if ENABLE_STACK_TRACE && IN_TRACE.with(|in_trace| *in_trace.borrow()) == 0 {
            IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 1);
            // Large blocks are always traced; small ones only with
            // SMALL_BLOCK_TRACE_PROBABILITY percent probability.
            if layout.size() >= MIN_BLOCK_SIZE
                || rand::thread_rng().gen_range(0, 100) < SMALL_BLOCK_TRACE_PROBABILITY
            {
                let size = libc::backtrace(ary.as_ptr() as *mut *mut c_void, MAX_STACK as i32);
                ary[0] = 0 as *mut c_void;
                // Walk up the stack until a frame not on the ignore lists is
                // found, caching skip/accept verdicts in the bitmaps.
                for i in 1..min(size as usize, MAX_STACK) {
                    addr = Some(ary[i] as *mut c_void);
                    chosen_i = i;
                    if ary[i] < SKIP_ADDR as *mut c_void {
                        let hash = murmur64(ary[i] as u64) % (1 << 23);
                        if (SKIP_PTR[(hash / 8) as usize] >> (hash % 8)) & 1 == 1 {
                            // Known-uninteresting frame: keep walking.
                            continue;
                        }
                        if (CHECKED_PTR[(hash / 8) as usize] >> (hash % 8)) & 1 == 1 {
                            // Frame was accepted before: use it.
                            break;
                        }
                        if SAVE_STACK_TRACES_TO_FILE {
                            backtrace::resolve(ary[i], |symbol| {
                                let fname = format!("/tmp/logs/{}", tid);
                                if let Ok(mut f) =
                                    OpenOptions::new().create(true).write(true).append(true).open(fname)
                                {
                                    if let Some(path) = symbol.filename() {
                                        f.write(
                                            format!(
                                                "PATH {:?} {} {}\n",
                                                ary[i],
                                                symbol.lineno().unwrap_or(0),
                                                path.to_str().unwrap_or("<UNKNOWN>")
                                            )
                                            .as_bytes(),
                                        )
                                        .unwrap();
                                    }
                                    if let Some(name) = symbol.name() {
                                        f.write(
                                            format!("SYMBOL {:?} {}\n", ary[i], name.to_string())
                                                .as_bytes(),
                                        )
                                        .unwrap();
                                    }
                                }
                            });
                        }

                        // Resolve the symbol once and remember the verdict.
                        let should_skip = skip_ptr(ary[i]);
                        if should_skip {
                            SKIP_PTR[(hash / 8) as usize] |= 1 << (hash % 8);
                            continue;
                        }
                        CHECKED_PTR[(hash / 8) as usize] |= 1 << (hash % 8);

                        if SAVE_STACK_TRACES_TO_FILE {
                            let fname = format!("/tmp/logs/{}", tid);

                            if let Ok(mut f) =
                                OpenOptions::new().create(true).write(true).append(true).open(fname)
                            {
                                f.write(format!("STACK_FOR {:?}\n", addr).as_bytes()).unwrap();
                                let ary2: [*mut c_void; 256] = [0 as *mut c_void; 256];
                                let size2 =
                                    libc::backtrace(ary2.as_ptr() as *mut *mut c_void, 256) as usize;
                                for i in 0..size2 {
                                    let addr2 = ary2[i];

                                    backtrace::resolve(addr2, |symbol| {
                                        if let Some(name) = symbol.name() {
                                            let name = name.as_str().unwrap_or("");

                                            f.write(
                                                format!("STACK {:?} {:?} {:?}\n", i, addr2, name)
                                                    .as_bytes(),
                                            )
                                            .unwrap();
                                        }
                                    });
                                }
                            }
                        }
                        break;
                    }
                }
            } else {
                addr = Some(SKIPPED_TRACE);
            }
            IN_TRACE.with(|in_trace| *in_trace.borrow_mut() = 0);
        }

        // Record the chosen frame (plus subsequent frames when STACK_SIZE > 1)
        // in the header, then hand the caller the address just past it.
        let mut stack = [0 as *mut c_void; STACK_SIZE];
        stack[0] = addr.unwrap_or(0 as *mut c_void);
        for i in 1..STACK_SIZE {
            stack[i] =
                ary[min(MAX_STACK as isize, max(0, chosen_i as isize + i as isize)) as usize];
        }

        let header = AllocHeader {
            magic: (MAGIC_RUST + STACK_SIZE) as u64,
            size: layout.size() as u64,
            tid: tid as u64,
            stack,
        };

        *(res as *mut AllocHeader) = header;

        res.offset(HEADER_SIZE as isize)
    }

    unsafe fn dealloc(&self, mut ptr: *mut u8, layout: Layout) {
        let new_layout =
            Layout::from_size_align(layout.size() + HEADER_SIZE, layout.align()).unwrap();

        // Step back to the header, invalidate the magic value, and credit the
        // bytes back to the thread that allocated them (not necessarily the
        // thread calling dealloc).
        ptr = ptr.offset(-(HEADER_SIZE as isize));

        (*(ptr as *mut AllocHeader)).magic = (MAGIC_RUST + STACK_SIZE + 0x100) as u64;
        let tid: usize = (*(ptr as *mut AllocHeader)).tid as usize;

        MEM_SIZE[tid % COUNTERS_SIZE].fetch_sub(layout.size(), Ordering::SeqCst);
        MEM_CNT[tid % COUNTERS_SIZE].fetch_sub(1, Ordering::SeqCst);

        JEMALLOC.dealloc(ptr, new_layout);
    }
}

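// Dumps every non-zero counter slot (allocation count and live bytes) plus
// totals via the `log` crate.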
pub fn print_counters_ary() {
    info!("tid {}", get_tid());
    let mut total_cnt: usize = 0;
    let mut total_size: usize = 0;
    for idx in 0..COUNTERS_SIZE {
        let val: usize = MEM_SIZE.get(idx).unwrap().load(Ordering::SeqCst);
        if val != 0 {
            let cnt = MEM_CNT.get(idx).unwrap().load(Ordering::SeqCst);
            total_cnt += cnt;
            info!("COUNTERS {}: {} {}", idx, cnt, val);
            total_size += val;
        }
    }
    info!("COUNTERS TOTAL {} {}", total_cnt, total_size);
}
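
// A minimal sketch, not part of the original file: since counters are keyed
// by `tid % COUNTERS_SIZE`, summing every slot gives an approximate
// process-wide total of live bytes tracked by this proxy.
pub fn total_memory_usage() -> usize {
    MEM_SIZE.iter().map(|x| x.load(Ordering::SeqCst)).sum()
}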