parallel_processor/execution_manager/
memory_tracker.rs

1use crate::execution_manager::executor::AsyncExecutor;
2use crate::execution_manager::packet::PacketTrait;
3use crate::memory_data_size::MemoryDataSize;
4use dashmap::DashMap;
5use std::any::TypeId;
6use std::cmp::max;
7use std::io::stdout;
8use std::marker::PhantomData;
9use std::sync::Arc;
10
11pub struct MemoryTrackerManager {
12    packet_sizes: DashMap<TypeId, ((usize, usize), usize)>,
13    executors_sizes: DashMap<TypeId, ((usize, usize), usize)>,
14    type_names: DashMap<TypeId, &'static str>,
15}
16
17unsafe impl Sync for MemoryTrackerManager {}
18unsafe impl Send for MemoryTrackerManager {}
19
20impl MemoryTrackerManager {
21    pub fn new() -> Self {
22        MemoryTrackerManager {
23            packet_sizes: DashMap::new(),
24            executors_sizes: DashMap::new(),
25            type_names: DashMap::new(),
26        }
27    }
28
29    pub fn get_executor_instance<E: AsyncExecutor>(self: &Arc<Self>) -> MemoryTracker<E> {
30        MemoryTracker::new(self.clone())
31    }
32
33    pub fn add_queue_packet<T: PacketTrait>(&self, packet: &T) {
34        if packet.get_size() > 0 {
35            let mut entry = self
36                .packet_sizes
37                .entry(TypeId::of::<T>())
38                .or_insert(((0, 0), 0));
39            self.type_names
40                .entry(TypeId::of::<T>())
41                .or_insert(std::any::type_name::<T>());
42            entry.value_mut().0 .0 += packet.get_size();
43            entry.value_mut().0 .1 += 1;
44
45            let crt_val = entry.value_mut().0 .0;
46            let max_val = entry.value().1;
47            entry.value_mut().1 = max(max_val, crt_val);
48        }
49    }
50    pub fn remove_queue_packet<T: PacketTrait>(&self, packet: &T) {
51        if packet.get_size() > 0 {
52            let mut entry = self.packet_sizes.get_mut(&TypeId::of::<T>()).unwrap();
53            entry.value_mut().0 .0 -= packet.get_size();
54            entry.value_mut().0 .1 -= 1;
55        }
56    }
57
58    fn get_pretty_name(&self, ptr: TypeId) -> String {
59        let string = self.type_names.get(&ptr).unwrap();
60
61        let mut builder = String::new();
62        let mut last_was_col = false;
63        for ch in string.chars() {
64            builder.push(ch);
65
66            let current_is_col = ch == ':';
67
68            if last_was_col & current_is_col {
69                builder.pop();
70                builder.pop();
71
72                while builder
73                    .chars()
74                    .last()
75                    .map(|c| c.is_ascii_alphanumeric() || c == '_')
76                    .unwrap_or(false)
77                {
78                    builder.pop();
79                }
80
81                last_was_col = false;
82            } else {
83                last_was_col = current_is_col;
84            }
85        }
86        builder
87    }
88
89    pub fn print_debug(&self) {
90        let _out = stdout().lock();
91
92        crate::log_info!("Executors usages:");
93        for executor in self.executors_sizes.iter() {
94            crate::log_info!(
95                "\t{} ==> {:.2} with {} instances [MAX {:.2}]",
96                self.get_pretty_name(*executor.key()),
97                MemoryDataSize::from_bytes(executor.value().0 .0),
98                executor.value().0 .1,
99                MemoryDataSize::from_bytes(executor.value().1)
100            );
101        }
102
103        crate::log_info!("Packets in queue:");
104        for packet in self.packet_sizes.iter() {
105            crate::log_info!(
106                "\t{} ==> {:.2} with {} instances [MAX {:.2}]",
107                self.get_pretty_name(*packet.key()),
108                MemoryDataSize::from_bytes(packet.value().0 .0),
109                packet.value().0 .1,
110                MemoryDataSize::from_bytes(packet.value().1)
111            );
112        }
113    }
114}
115
116pub struct MemoryTracker<E: AsyncExecutor> {
117    manager: Arc<MemoryTrackerManager>,
118    last_memory_usage: usize,
119    _phantom: PhantomData<E>,
120}
121
122impl<E: AsyncExecutor> Clone for MemoryTracker<E> {
123    fn clone(&self) -> Self {
124        self.manager
125            .executors_sizes
126            .get_mut(&TypeId::of::<E>())
127            .unwrap()
128            .value_mut()
129            .0
130             .1 += 1;
131        Self {
132            manager: self.manager.clone(),
133            last_memory_usage: 0,
134            _phantom: PhantomData,
135        }
136    }
137}
138
139impl<E: AsyncExecutor> MemoryTracker<E> {
140    fn new(manager: Arc<MemoryTrackerManager>) -> Self {
141        manager
142            .executors_sizes
143            .entry(TypeId::of::<E>())
144            .or_insert(((0, 0), 0))
145            .value_mut()
146            .0
147             .1 += 1;
148        manager
149            .type_names
150            .entry(TypeId::of::<E>())
151            .or_insert(std::any::type_name::<E>());
152
153        MemoryTracker {
154            manager,
155            last_memory_usage: 0,
156            _phantom: PhantomData,
157        }
158    }
159
160    pub fn update_memory_usage(&mut self, usages: &[usize]) {
161        let new_memory_usage = usages.iter().sum::<usize>();
162        let mut entry = self
163            .manager
164            .executors_sizes
165            .get_mut(&TypeId::of::<E>())
166            .unwrap();
167        entry.value_mut().0 .0 -= self.last_memory_usage;
168        entry.value_mut().0 .0 += new_memory_usage;
169        self.last_memory_usage = new_memory_usage;
170
171        let crt_val = entry.value_mut().0 .0;
172        let max_val = entry.value().1;
173        entry.value_mut().1 = max(max_val, crt_val);
174    }
175}
176
177impl<E: AsyncExecutor> Drop for MemoryTracker<E> {
178    fn drop(&mut self) {
179        let mut entry = self
180            .manager
181            .executors_sizes
182            .get_mut(&TypeId::of::<E>())
183            .unwrap();
184        entry.value_mut().0 .0 -= self.last_memory_usage;
185        entry.value_mut().0 .1 -= 1;
186    }
187}