parallel_processor/
debug_allocator.rs

1use bincode::{Decode, Encode};
2use dashmap::DashMap;
3use once_cell::sync::Lazy;
4use std::alloc::{GlobalAlloc, Layout, System};
5use std::fs::File;
6use std::io::Write;
7use std::path::Path;
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use std::time::Duration;
10
/// Allocator wrapper that forwards every request to the [`System`] allocator.
///
/// Its `GlobalAlloc` implementation additionally records per-backtrace
/// allocation statistics for each request it forwards.
pub struct DebugAllocator {
    /// Allocator that actually services every request.
    default_allocator: System,
}
14
/// Live statistics for all allocations that share one backtrace.
///
/// The counters are atomics so they can be updated through a shared reference.
struct AllocationInfo {
    /// Textual backtrace identifying the allocation site.
    bt: String,
    /// Number of allocations ever recorded for this site (never decremented).
    total_count: AtomicUsize,
    /// Number of allocations from this site that are currently live.
    current_count: AtomicUsize,
    /// Bytes currently held by live allocations from this site.
    current_size: AtomicUsize,
    /// Largest value recorded for `current_size` so far.
    max_size: AtomicUsize,
}
22
23impl AllocationInfo {
24    pub fn as_writable(&self) -> AllocationInfoWritable {
25        AllocationInfoWritable {
26            bt: self.bt.clone(),
27            current_count: self.current_count.load(Ordering::Relaxed),
28            current_size: self.current_size.load(Ordering::Relaxed),
29            max_size: self.max_size.load(Ordering::Relaxed),
30            total_count: self.total_count.load(Ordering::Relaxed),
31        }
32    }
33}
34
35#[derive(Encode, Decode, Clone, Debug)]
36struct AllocationInfoWritable {
37    bt: String,
38    current_count: usize,
39    current_size: usize,
40    max_size: usize,
41    total_count: usize,
42}
43
44static ALLOCATION_INFOS: Lazy<DashMap<String, AllocationInfo>> = Lazy::new(|| DashMap::new());
45static ADDRESSES_BACKTRACE: Lazy<DashMap<usize, String>> = Lazy::new(|| DashMap::new());
46
47pub fn debug_print_allocations(dir: impl AsRef<Path>, period: Duration) {
48    let dir = dir.as_ref().to_path_buf();
49    std::thread::spawn(move || {
50        IS_NESTED.with(|n| n.store(true, Ordering::Relaxed));
51        let mut count = 1;
52        loop {
53            std::thread::sleep(period);
54
55            let path = dir.join(format!("memory-log{}.json", count));
56
57            let mut allocations: Vec<_> =
58                ALLOCATION_INFOS.iter().map(|x| x.as_writable()).collect();
59
60            allocations.sort_by(|x, y| y.max_size.cmp(&x.max_size));
61
62            let _ = File::create(path)
63                .unwrap()
64                .write_all(format!("{:?}", allocations).as_bytes());
65
66            count += 1;
67        }
68    });
69}
70
71fn store_backtrace(addr: *mut u8, size: usize) {
72    let bt: backtrace::Backtrace = backtrace::Backtrace::new();
73
74    let bt_string = format!("{:?}", bt);
75
76    let parts = bt_string.split("  5:").collect::<Vec<_>>();
77
78    let bt_string = parts.last().unwrap().to_string();
79
80    ADDRESSES_BACKTRACE.insert(addr as usize, bt_string.clone());
81
82    let info = ALLOCATION_INFOS
83        .entry(bt_string.clone())
84        .or_insert(AllocationInfo {
85            bt: bt_string,
86            current_count: AtomicUsize::new(0),
87            current_size: AtomicUsize::new(0),
88            max_size: AtomicUsize::new(0),
89            total_count: AtomicUsize::new(0),
90        });
91
92    info.current_count.fetch_add(1, Ordering::Relaxed);
93    info.current_size.fetch_add(size, Ordering::Relaxed);
94    info.total_count.fetch_add(1, Ordering::Relaxed);
95    info.max_size
96        .fetch_max(info.current_size.load(Ordering::Relaxed), Ordering::Relaxed);
97}
98
99fn update_backtrace(ptr: *mut u8, new_ptr: *mut u8, diff: isize) {
100    let (_, bt) = ADDRESSES_BACKTRACE.remove(&(ptr as usize)).unwrap();
101
102    let aref = ALLOCATION_INFOS.get(&bt).unwrap();
103    if diff > 0 {
104        aref.current_size
105            .fetch_add(diff as usize, Ordering::Relaxed);
106        aref.max_size
107            .fetch_max(aref.current_size.load(Ordering::Relaxed), Ordering::Relaxed);
108    } else {
109        aref.current_size
110            .fetch_sub((-diff) as usize, Ordering::Relaxed);
111    }
112
113    ADDRESSES_BACKTRACE.insert(new_ptr as usize, bt);
114}
115
116fn dealloc_backtrace(ptr: *mut u8, size: usize) {
117    let (_, bt) = ADDRESSES_BACKTRACE.remove(&(ptr as usize)).unwrap();
118
119    let aref = ALLOCATION_INFOS.get(&bt).unwrap();
120    aref.current_count.fetch_sub(1, Ordering::Relaxed);
121    aref.current_size.fetch_sub(size, Ordering::Relaxed);
122}
123
124impl DebugAllocator {
125    pub const fn new() -> Self {
126        Self {
127            default_allocator: System,
128        }
129    }
130}
131
thread_local! {
    /// Per-thread flag that is `true` while allocation tracking must be skipped
    /// on this thread. It starts out `false`.
    ///
    /// The allocator hooks raise it around their own bookkeeping so that
    /// allocations made by the bookkeeping itself are not tracked recursively.
    static IS_NESTED: AtomicBool = AtomicBool::default();
}
135
136unsafe impl GlobalAlloc for DebugAllocator {
137    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
138        let ptr = self.default_allocator.alloc(layout);
139        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
140            store_backtrace(ptr, layout.size());
141            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
142        }
143        ptr
144    }
145
146    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
147        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
148            dealloc_backtrace(ptr, layout.size());
149            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
150        }
151        self.default_allocator.dealloc(ptr, layout)
152    }
153
154    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
155        let ptr = self.default_allocator.alloc_zeroed(layout);
156        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
157            store_backtrace(ptr, layout.size());
158            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
159        }
160        ptr
161    }
162
163    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
164        let new_ptr = self.default_allocator.realloc(ptr, layout, new_size);
165        if !IS_NESTED.with(|n| n.swap(true, Ordering::Relaxed)) {
166            update_backtrace(ptr, new_ptr, (new_size as isize) - (layout.size() as isize));
167            IS_NESTED.with(|n| n.store(false, Ordering::Relaxed));
168        }
169        new_ptr
170    }
171}