parallel_processor/debug_allocator.rs
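//! Debug allocator: wraps the system allocator, records a truncated backtrace
//! for every tracked allocation, and can periodically dump per-call-site
//! statistics (live count and size, peak size, total count) to JSON files.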
use dashmap::DashMap;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::to_string_pretty;
use std::alloc::{GlobalAlloc, Layout, System};
use std::fs::File;
use std::io::Write;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;

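/// A `GlobalAlloc` implementation that forwards every request to the system
/// allocator while recording which call site made it.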
pub struct DebugAllocator {
    default_allocator: System,
}

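/// Per-call-site statistics, keyed in `ALLOCATION_INFOS` by the truncated
/// backtrace string of the allocating call site.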
struct AllocationInfo {
    bt: String,
    current_count: AtomicUsize,
    current_size: AtomicUsize,
    max_size: AtomicUsize,
    total_count: AtomicUsize,
}

impl AllocationInfo {
    /// Takes a non-atomic snapshot of the counters for serialization.
    pub fn as_writable(&self) -> AllocationInfoWritable {
        AllocationInfoWritable {
            bt: self.bt.clone(),
            current_count: self.current_count.load(Ordering::Relaxed),
            current_size: self.current_size.load(Ordering::Relaxed),
            max_size: self.max_size.load(Ordering::Relaxed),
            total_count: self.total_count.load(Ordering::Relaxed),
        }
    }
}

#[derive(Serialize, Deserialize, Clone)]
struct AllocationInfoWritable {
    bt: String,
    current_count: usize,
    current_size: usize,
    max_size: usize,
    total_count: usize,
}

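// Global tracking state: per-call-site statistics keyed by backtrace string,
// and a map from each live allocation's address to its backtrace key.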
static ALLOCATION_INFOS: Lazy<DashMap<String, AllocationInfo>> = Lazy::new(DashMap::new);
static ADDRESSES_BACKTRACE: Lazy<DashMap<usize, String>> = Lazy::new(DashMap::new);

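/// Spawns a background thread that, every `period`, writes the current
/// allocation statistics (sorted by peak size, descending) to
/// `dir/memory-log{N}.json`. Note that `dir` must already exist.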
pub fn debug_print_allocations(dir: impl AsRef<Path>, period: Duration) {
    let dir = dir.as_ref().to_path_buf();
    std::thread::spawn(move || {
        // Permanently mark this thread as nested so its own allocations
        // (the snapshot vector, the JSON string, ...) are never tracked.
        IS_NESTED.with(|n| n.store(true, Ordering::Relaxed));
        let mut count = 1;
        loop {
            std::thread::sleep(period);

            let path = dir.join(format!("memory-log{}.json", count));

            let mut allocations: Vec<_> =
                ALLOCATION_INFOS.iter().map(|x| x.as_writable()).collect();

            // Largest peak size first.
            allocations.sort_by(|x, y| y.max_size.cmp(&x.max_size));

            let _ = File::create(path)
                .unwrap()
                .write_all(to_string_pretty(&allocations).unwrap().as_bytes());

            count += 1;
        }
    });
}

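/// Records a new allocation: captures a backtrace, trims the allocator's own
/// frames from it, and updates the counters for that call site.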
fn store_backtrace(addr: *mut u8, size: usize) {
    let bt = backtrace::Backtrace::new();

    let bt_string = format!("{:?}", bt);

    // Trim the top of the backtrace (the first frames belong to the allocator
    // and backtrace machinery) by keeping only the text after the " 5:" frame
    // marker, so that allocations are grouped by their caller.
    let parts = bt_string.split(" 5:").collect::<Vec<_>>();

    let bt_string = parts.last().unwrap().to_string();

    ADDRESSES_BACKTRACE.insert(addr as usize, bt_string.clone());

    let info = ALLOCATION_INFOS
        .entry(bt_string.clone())
        .or_insert(AllocationInfo {
            bt: bt_string,
            current_count: AtomicUsize::new(0),
            current_size: AtomicUsize::new(0),
            max_size: AtomicUsize::new(0),
            total_count: AtomicUsize::new(0),
        });

    info.current_count.fetch_add(1, Ordering::Relaxed);
    info.current_size.fetch_add(size, Ordering::Relaxed);
    info.total_count.fetch_add(1, Ordering::Relaxed);
    info.max_size
        .fetch_max(info.current_size.load(Ordering::Relaxed), Ordering::Relaxed);
}

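/// Re-keys a reallocated block from `ptr` to `new_ptr` and adjusts the tracked
/// size of its call site by `diff` bytes.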
fn update_backtrace(ptr: *mut u8, new_ptr: *mut u8, diff: isize) {
    let bt = match ADDRESSES_BACKTRACE.remove(&(ptr as usize)) {
        Some((_, bt)) => bt,
        // Pointers allocated while tracking was suppressed were never recorded;
        // skip them instead of panicking inside the allocator.
        None => return,
    };

    let aref = ALLOCATION_INFOS.get(&bt).unwrap();
    if diff > 0 {
        aref.current_size
            .fetch_add(diff as usize, Ordering::Relaxed);
        aref.max_size
            .fetch_max(aref.current_size.load(Ordering::Relaxed), Ordering::Relaxed);
    } else {
        aref.current_size
            .fetch_sub((-diff) as usize, Ordering::Relaxed);
    }

    ADDRESSES_BACKTRACE.insert(new_ptr as usize, bt);
}

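/// Forgets a freed pointer and decrements the counters of the call site that
/// allocated it.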
fn dealloc_backtrace(ptr: *mut u8, size: usize) {
    let bt = match ADDRESSES_BACKTRACE.remove(&(ptr as usize)) {
        Some((_, bt)) => bt,
        // Pointers allocated while tracking was suppressed were never recorded.
        None => return,
    };

    let aref = ALLOCATION_INFOS.get(&bt).unwrap();
    aref.current_count.fetch_sub(1, Ordering::Relaxed);
    aref.current_size.fetch_sub(size, Ordering::Relaxed);
}

impl DebugAllocator {
    pub const fn new() -> Self {
        Self {
            default_allocator: System,
        }
    }
}

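// Per-thread re-entrancy guard: the tracking code itself allocates (backtrace
// capture, map entries, strings), so allocations made while this flag is set
// are deliberately not recorded.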
thread_local! {
    static IS_NESTED: AtomicBool = AtomicBool::new(false);
}

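// The wrapper delegates all real allocation work to `System` and only adds
// bookkeeping around each call, skipped whenever the thread is already nested.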
unsafe impl GlobalAlloc for DebugAllocator {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = self.default_allocator.alloc(layout);
        // Track the allocation only if this thread is not already inside the
        // tracking code (the swap sets the guard and returns its old value).
        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
            store_backtrace(ptr, layout.size());
            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
            dealloc_backtrace(ptr, layout.size());
            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
        }
        self.default_allocator.dealloc(ptr, layout)
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        let ptr = self.default_allocator.alloc_zeroed(layout);
        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
            store_backtrace(ptr, layout.size());
            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
        }
        ptr
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        let new_ptr = self.default_allocator.realloc(ptr, layout, new_size);
        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
            update_backtrace(ptr, new_ptr, (new_size as isize) - (layout.size() as isize));
            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
        }
        new_ptr
    }
}
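
// Example usage (a minimal sketch, not part of this file; the exact module path
// and the `mem-logs` directory name are assumptions): register the allocator in
// the consuming binary and start the periodic dump.
//
//     use parallel_processor::debug_allocator::{debug_print_allocations, DebugAllocator};
//     use std::time::Duration;
//
//     #[global_allocator]
//     static GLOBAL: DebugAllocator = DebugAllocator::new();
//
//     fn main() {
//         // Write memory-log{N}.json into ./mem-logs (must already exist) every 10 s.
//         debug_print_allocations("mem-logs", Duration::from_secs(10));
//         // ... rest of the program ...
//     }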