parallel_processor/
debug_allocator.rs

1use dashmap::DashMap;
2use once_cell::sync::Lazy;
3use serde::{Deserialize, Serialize};
4use serde_json::to_string_pretty;
5use std::alloc::{GlobalAlloc, Layout, System};
6use std::fs::File;
7use std::io::Write;
8use std::path::Path;
9use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
10use std::time::Duration;
11
12pub struct DebugAllocator {
13    default_allocator: std::alloc::System,
14}
15
16struct AllocationInfo {
17    bt: String,
18    current_count: AtomicUsize,
19    current_size: AtomicUsize,
20    max_size: AtomicUsize,
21    total_count: AtomicUsize,
22}
23
24impl AllocationInfo {
25    pub fn as_writable(&self) -> AllocationInfoWritable {
26        AllocationInfoWritable {
27            bt: self.bt.clone(),
28            current_count: self.current_count.load(Ordering::Relaxed),
29            current_size: self.current_size.load(Ordering::Relaxed),
30            max_size: self.max_size.load(Ordering::Relaxed),
31            total_count: self.total_count.load(Ordering::Relaxed),
32        }
33    }
34}
35
36#[derive(Serialize, Deserialize, Clone)]
37struct AllocationInfoWritable {
38    bt: String,
39    current_count: usize,
40    current_size: usize,
41    max_size: usize,
42    total_count: usize,
43}
44
45static ALLOCATION_INFOS: Lazy<DashMap<String, AllocationInfo>> = Lazy::new(|| DashMap::new());
46static ADDRESSES_BACKTRACE: Lazy<DashMap<usize, String>> = Lazy::new(|| DashMap::new());
47
48pub fn debug_print_allocations(dir: impl AsRef<Path>, period: Duration) {
49    let dir = dir.as_ref().to_path_buf();
50    std::thread::spawn(move || {
51        IS_NESTED.with(|n| n.store(true, Ordering::Relaxed));
52        let mut count = 1;
53        loop {
54            std::thread::sleep(period);
55
56            let path = dir.join(format!("memory-log{}.json", count));
57
58            let mut allocations: Vec<_> =
59                ALLOCATION_INFOS.iter().map(|x| x.as_writable()).collect();
60
61            allocations.sort_by(|x, y| y.max_size.cmp(&x.max_size));
62
63            let _ = File::create(path)
64                .unwrap()
65                .write_all(to_string_pretty(&allocations).unwrap().as_bytes());
66
67            count += 1;
68        }
69    });
70}
71
72fn store_backtrace(addr: *mut u8, size: usize) {
73    let bt: backtrace::Backtrace = backtrace::Backtrace::new();
74
75    let bt_string = format!("{:?}", bt);
76
77    let parts = bt_string.split("  5:").collect::<Vec<_>>();
78
79    let bt_string = parts.last().unwrap().to_string();
80
81    ADDRESSES_BACKTRACE.insert(addr as usize, bt_string.clone());
82
83    let info = ALLOCATION_INFOS
84        .entry(bt_string.clone())
85        .or_insert(AllocationInfo {
86            bt: bt_string,
87            current_count: AtomicUsize::new(0),
88            current_size: AtomicUsize::new(0),
89            max_size: AtomicUsize::new(0),
90            total_count: AtomicUsize::new(0),
91        });
92
93    info.current_count.fetch_add(1, Ordering::Relaxed);
94    info.current_size.fetch_add(size, Ordering::Relaxed);
95    info.total_count.fetch_add(1, Ordering::Relaxed);
96    info.max_size
97        .fetch_max(info.current_size.load(Ordering::Relaxed), Ordering::Relaxed);
98}
99
100fn update_backtrace(ptr: *mut u8, new_ptr: *mut u8, diff: isize) {
101    let (_, bt) = ADDRESSES_BACKTRACE.remove(&(ptr as usize)).unwrap();
102
103    let aref = ALLOCATION_INFOS.get(&bt).unwrap();
104    if diff > 0 {
105        aref.current_size
106            .fetch_add(diff as usize, Ordering::Relaxed);
107        aref.max_size
108            .fetch_max(aref.current_size.load(Ordering::Relaxed), Ordering::Relaxed);
109    } else {
110        aref.current_size
111            .fetch_sub((-diff) as usize, Ordering::Relaxed);
112    }
113
114    ADDRESSES_BACKTRACE.insert(new_ptr as usize, bt);
115}
116
117fn dealloc_backtrace(ptr: *mut u8, size: usize) {
118    let (_, bt) = ADDRESSES_BACKTRACE.remove(&(ptr as usize)).unwrap();
119
120    let aref = ALLOCATION_INFOS.get(&bt).unwrap();
121    aref.current_count.fetch_sub(1, Ordering::Relaxed);
122    aref.current_size.fetch_sub(size, Ordering::Relaxed);
123}
124
125impl DebugAllocator {
126    pub const fn new() -> Self {
127        Self {
128            default_allocator: System,
129        }
130    }
131}
132
133thread_local! {
134    static IS_NESTED: AtomicBool = AtomicBool::new(false);
135}
136
137unsafe impl GlobalAlloc for DebugAllocator {
138    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
139        let ptr = self.default_allocator.alloc(layout);
140        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
141            store_backtrace(ptr, layout.size());
142            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
143        }
144        ptr
145    }
146
147    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
148        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
149            dealloc_backtrace(ptr, layout.size());
150            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
151        }
152        self.default_allocator.dealloc(ptr, layout)
153    }
154
155    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
156        let ptr = self.default_allocator.alloc_zeroed(layout);
157        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
158            store_backtrace(ptr, layout.size());
159            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
160        }
161        ptr
162    }
163
164    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
165        let new_ptr = self.default_allocator.realloc(ptr, layout, new_size);
166        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
167            update_backtrace(ptr, new_ptr, (new_size as isize) - (layout.size() as isize));
168            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
169        }
170        new_ptr
171    }
172}