parallel_processor/execution_manager/
memory_tracker.rs1use crate::execution_manager::executor::AsyncExecutor;
2use crate::execution_manager::packet::PacketTrait;
3use crate::memory_data_size::MemoryDataSize;
4use dashmap::DashMap;
5use std::any::TypeId;
6use std::cmp::max;
7use std::io::stdout;
8use std::marker::PhantomData;
9use std::sync::Arc;
10
11pub struct MemoryTrackerManager {
12 packet_sizes: DashMap<TypeId, ((usize, usize), usize)>,
13 executors_sizes: DashMap<TypeId, ((usize, usize), usize)>,
14 type_names: DashMap<TypeId, &'static str>,
15}
16
17unsafe impl Sync for MemoryTrackerManager {}
18unsafe impl Send for MemoryTrackerManager {}
19
20impl MemoryTrackerManager {
21 pub fn new() -> Self {
22 MemoryTrackerManager {
23 packet_sizes: DashMap::new(),
24 executors_sizes: DashMap::new(),
25 type_names: DashMap::new(),
26 }
27 }
28
29 pub fn get_executor_instance<E: AsyncExecutor>(self: &Arc<Self>) -> MemoryTracker<E> {
30 MemoryTracker::new(self.clone())
31 }
32
33 pub fn add_queue_packet<T: PacketTrait>(&self, packet: &T) {
34 if packet.get_size() > 0 {
35 let mut entry = self
36 .packet_sizes
37 .entry(TypeId::of::<T>())
38 .or_insert(((0, 0), 0));
39 self.type_names
40 .entry(TypeId::of::<T>())
41 .or_insert(std::any::type_name::<T>());
42 entry.value_mut().0 .0 += packet.get_size();
43 entry.value_mut().0 .1 += 1;
44
45 let crt_val = entry.value_mut().0 .0;
46 let max_val = entry.value().1;
47 entry.value_mut().1 = max(max_val, crt_val);
48 }
49 }
50 pub fn remove_queue_packet<T: PacketTrait>(&self, packet: &T) {
51 if packet.get_size() > 0 {
52 let mut entry = self.packet_sizes.get_mut(&TypeId::of::<T>()).unwrap();
53 entry.value_mut().0 .0 -= packet.get_size();
54 entry.value_mut().0 .1 -= 1;
55 }
56 }
57
58 fn get_pretty_name(&self, ptr: TypeId) -> String {
59 let string = self.type_names.get(&ptr).unwrap();
60
61 let mut builder = String::new();
62 let mut last_was_col = false;
63 for ch in string.chars() {
64 builder.push(ch);
65
66 let current_is_col = ch == ':';
67
68 if last_was_col & current_is_col {
69 builder.pop();
70 builder.pop();
71
72 while builder
73 .chars()
74 .last()
75 .map(|c| c.is_ascii_alphanumeric() || c == '_')
76 .unwrap_or(false)
77 {
78 builder.pop();
79 }
80
81 last_was_col = false;
82 } else {
83 last_was_col = current_is_col;
84 }
85 }
86 builder
87 }
88
89 pub fn print_debug(&self) {
90 let _out = stdout().lock();
91
92 crate::log_info!("Executors usages:");
93 for executor in self.executors_sizes.iter() {
94 crate::log_info!(
95 "\t{} ==> {:.2} with {} instances [MAX {:.2}]",
96 self.get_pretty_name(*executor.key()),
97 MemoryDataSize::from_bytes(executor.value().0 .0),
98 executor.value().0 .1,
99 MemoryDataSize::from_bytes(executor.value().1)
100 );
101 }
102
103 crate::log_info!("Packets in queue:");
104 for packet in self.packet_sizes.iter() {
105 crate::log_info!(
106 "\t{} ==> {:.2} with {} instances [MAX {:.2}]",
107 self.get_pretty_name(*packet.key()),
108 MemoryDataSize::from_bytes(packet.value().0 .0),
109 packet.value().0 .1,
110 MemoryDataSize::from_bytes(packet.value().1)
111 );
112 }
113 }
114}
115
116pub struct MemoryTracker<E: AsyncExecutor> {
117 manager: Arc<MemoryTrackerManager>,
118 last_memory_usage: usize,
119 _phantom: PhantomData<E>,
120}
121
122impl<E: AsyncExecutor> Clone for MemoryTracker<E> {
123 fn clone(&self) -> Self {
124 self.manager
125 .executors_sizes
126 .get_mut(&TypeId::of::<E>())
127 .unwrap()
128 .value_mut()
129 .0
130 .1 += 1;
131 Self {
132 manager: self.manager.clone(),
133 last_memory_usage: 0,
134 _phantom: PhantomData,
135 }
136 }
137}
138
139impl<E: AsyncExecutor> MemoryTracker<E> {
140 fn new(manager: Arc<MemoryTrackerManager>) -> Self {
141 manager
142 .executors_sizes
143 .entry(TypeId::of::<E>())
144 .or_insert(((0, 0), 0))
145 .value_mut()
146 .0
147 .1 += 1;
148 manager
149 .type_names
150 .entry(TypeId::of::<E>())
151 .or_insert(std::any::type_name::<E>());
152
153 MemoryTracker {
154 manager,
155 last_memory_usage: 0,
156 _phantom: PhantomData,
157 }
158 }
159
160 pub fn update_memory_usage(&mut self, usages: &[usize]) {
161 let new_memory_usage = usages.iter().sum::<usize>();
162 let mut entry = self
163 .manager
164 .executors_sizes
165 .get_mut(&TypeId::of::<E>())
166 .unwrap();
167 entry.value_mut().0 .0 -= self.last_memory_usage;
168 entry.value_mut().0 .0 += new_memory_usage;
169 self.last_memory_usage = new_memory_usage;
170
171 let crt_val = entry.value_mut().0 .0;
172 let max_val = entry.value().1;
173 entry.value_mut().1 = max(max_val, crt_val);
174 }
175}
176
177impl<E: AsyncExecutor> Drop for MemoryTracker<E> {
178 fn drop(&mut self) {
179 let mut entry = self
180 .manager
181 .executors_sizes
182 .get_mut(&TypeId::of::<E>())
183 .unwrap();
184 entry.value_mut().0 .0 -= self.last_memory_usage;
185 entry.value_mut().0 .1 -= 1;
186 }
187}