parallel_processor/
phase_times_monitor.rs

1use crate::{
2    memory_data_size::MemoryDataSize,
3    memory_fs::{allocator::CHUNKS_ALLOCATOR, MemoryFs},
4};
5use parking_lot::lock_api::RawRwLock;
6use parking_lot::RwLock;
7use std::time::{Duration, Instant};
8#[cfg(feature = "process-stats")]
9use {nightly_quirks::utils::NightlyUtils, parking_lot::Mutex, std::cmp::max};
10
/// Outcome of one completed phase: its label and how long it ran.
#[derive(Debug, Clone)]
pub struct PhaseResult {
    /// Name given to the phase when it was started.
    name: String,
    /// Wall-clock time between the start of the phase and its end.
    time: Duration,
}

/// Wall-clock bookkeeping for a run that is split into named phases.
#[derive(Debug)]
pub struct PhaseTimesMonitor {
    /// Instant recorded by `init`; `None` until `init` has been called.
    timer: Option<Instant>,
    /// Name and start instant of the phase currently running, if any.
    phase: Option<(String, Instant)>,
    /// Completed phases, in the order they finished.
    results: Vec<PhaseResult>,
}
21
22pub static PHASES_TIMES_MONITOR: RwLock<PhaseTimesMonitor> =
23    RwLock::const_new(parking_lot::RawRwLock::INIT, PhaseTimesMonitor::new());
24
/// Running aggregate built from successive process samples.
#[cfg(feature = "process-stats")]
struct ProcessStats {
    /// Number of samples folded in so far.
    samples_cnt: u64,
    /// Sum over samples of user CPU time divided by elapsed wall time.
    user_cpu_total: f64,
    /// Sum over samples of kernel CPU time divided by elapsed wall time.
    kernel_cpu_total: f64,
    /// Sum of the sampled memory usage, in bytes.
    mem_total: u128,
    /// Largest sampled memory usage, in bytes.
    mem_max: u64,
    /// Disk usage reported by the most recent sample, in bytes.
    disk_space: u64,
    /// Largest sampled disk usage, in bytes.
    max_disk_space: u64,
}
35
36#[cfg(feature = "process-stats")]
37impl ProcessStats {
38    const fn new() -> Self {
39        ProcessStats {
40            user_cpu_total: 0.0,
41            kernel_cpu_total: 0.0,
42            mem_total: 0,
43            mem_max: 0,
44            samples_cnt: 0,
45            disk_space: 0,
46            max_disk_space: 0,
47        }
48    }
49
50    fn update(
51        &mut self,
52        elapsed_time: Duration,
53        elapsed_cpu: Duration,
54        elapsed_kernel: Duration,
55        current_mem: u64,
56        disk_space: u64,
57        _max_disk_space: u64,
58    ) {
59        self.samples_cnt += 1;
60        self.user_cpu_total += elapsed_cpu.as_secs_f64() / elapsed_time.as_secs_f64();
61        self.kernel_cpu_total += elapsed_kernel.as_secs_f64() / elapsed_time.as_secs_f64();
62        self.mem_total += current_mem as u128;
63        self.mem_max = max(self.mem_max, current_mem);
64        self.disk_space = disk_space;
65        self.max_disk_space = max(self.max_disk_space, disk_space);
66    }
67
68    fn format(&self) -> String {
69        let samples_cnt = if self.samples_cnt == 0 {
70            1
71        } else {
72            self.samples_cnt
73        };
74
75        format!(
76            "(uc:{:.2} kc:{:.2} mm:{:.2} cm:{:.2} ds: {:.2} mds: {:.2})",
77            (self.user_cpu_total / (samples_cnt as f64)),
78            (self.kernel_cpu_total / (samples_cnt as f64)),
79            MemoryDataSize::from_bytes(self.mem_max as usize),
80            MemoryDataSize::from_bytes((self.mem_total / (samples_cnt as u128)) as usize),
81            MemoryDataSize::from_bytes(self.disk_space as usize),
82            MemoryDataSize::from_bytes(self.max_disk_space as usize),
83        )
84    }
85
86    fn reset(&mut self) {
87        *self = Self::new();
88    }
89}
90
/// Aggregate holding only the most recent sample; the sampler clears it before
/// every update.
#[cfg(feature = "process-stats")]
static CURRENT_STATS: Mutex<ProcessStats> = NightlyUtils::new_mutex(ProcessStats::new());
/// Aggregate for the running phase; cleared whenever a new phase starts.
#[cfg(feature = "process-stats")]
static PHASE_STATS: Mutex<ProcessStats> = NightlyUtils::new_mutex(ProcessStats::new());
/// Aggregate over every sample taken by the sampler; nothing in this file resets it.
#[cfg(feature = "process-stats")]
static GLOBAL_STATS: Mutex<ProcessStats> = NightlyUtils::new_mutex(ProcessStats::new());
97
98impl PhaseTimesMonitor {
99    const fn new() -> PhaseTimesMonitor {
100        PhaseTimesMonitor {
101            timer: None,
102            phase: None,
103            results: Vec::new(),
104        }
105    }
106
107    pub fn init(&mut self) {
108        self.timer = Some(Instant::now());
109
110        #[cfg(feature = "process-stats")]
111        {
112            std::thread::spawn(|| {
113                let clock = Instant::now();
114
115                let mut last_stats = crate::simple_process_stats::ProcessStats::get().unwrap();
116                let mut last_clock = clock.elapsed();
117
118                loop {
119                    std::thread::sleep(Duration::from_millis(100));
120                    let stats = crate::simple_process_stats::ProcessStats::get().unwrap();
121
122                    let time_now = clock.elapsed();
123
124                    let elapsed = time_now - last_clock;
125                    let kernel_elapsed_usage = stats.cpu_time_kernel - last_stats.cpu_time_kernel;
126                    let user_elapsed_usage = stats.cpu_time_user - last_stats.cpu_time_user;
127                    let current_memory = stats.memory_usage_bytes;
128                    let disk_stats = MemoryFs::get_stats();
129
130                    GLOBAL_STATS.lock().update(
131                        elapsed,
132                        user_elapsed_usage,
133                        kernel_elapsed_usage,
134                        current_memory,
135                        disk_stats.current_disk_usage,
136                        disk_stats.max_disk_usage,
137                    );
138                    PHASE_STATS.lock().update(
139                        elapsed,
140                        user_elapsed_usage,
141                        kernel_elapsed_usage,
142                        current_memory,
143                        disk_stats.current_disk_usage,
144                        disk_stats.max_disk_usage,
145                    );
146
147                    let mut current_stats = CURRENT_STATS.lock();
148                    current_stats.reset();
149                    current_stats.update(
150                        elapsed,
151                        user_elapsed_usage,
152                        kernel_elapsed_usage,
153                        current_memory,
154                        disk_stats.current_disk_usage,
155                        disk_stats.max_disk_usage,
156                    );
157
158                    last_clock = time_now;
159                    last_stats = stats;
160                }
161            });
162        }
163    }
164
165    fn end_phase(&mut self) {
166        if let Some((name, phase_timer)) = self.phase.take() {
167            let elapsed = phase_timer.elapsed();
168            crate::log_info!(
169                "Finished {}. phase duration: {:.2?} gtime: {:.2?}{}", // memory: {:.2} {:.2}%
170                name,
171                &elapsed,
172                self.get_wallclock(),
173                Self::format_process_stats()
174            );
175            self.results.push(PhaseResult {
176                name,
177                time: elapsed,
178            })
179        }
180    }
181
182    pub fn start_phase(&mut self, name: String) {
183        self.end_phase();
184        crate::log_info!(
185            "Started {}{}{}",
186            name,
187            match () {
188                #[cfg(feature = "process-stats")]
189                () => " prev stats: ",
190                #[cfg(not(feature = "process-stats"))]
191                () => String::new(),
192            },
193            Self::format_process_stats()
194        );
195        #[cfg(feature = "process-stats")]
196        PHASE_STATS.lock().reset();
197        self.phase = Some((name, Instant::now()));
198    }
199
200    pub fn get_wallclock(&self) -> Duration {
201        self.timer
202            .as_ref()
203            .map(|t| t.elapsed())
204            .unwrap_or(Duration::from_millis(0))
205    }
206
207    pub fn get_phase_desc(&self) -> String {
208        self.phase
209            .as_ref()
210            .map(|x| x.0.clone())
211            .unwrap_or(String::new())
212    }
213
214    pub fn get_phase_timer(&self) -> Duration {
215        self.phase
216            .as_ref()
217            .map(|x| x.1.elapsed())
218            .unwrap_or(Duration::from_millis(0))
219    }
220
221    fn format_process_stats() -> String {
222        #[cfg(feature = "process-stats")]
223        {
224            let memory = crate::simple_process_stats::ProcessStats::get()
225                .unwrap()
226                .memory_usage_bytes;
227
228            format!(
229                " GL:{} PH:{} CT: {} CM: {:.2}",
230                GLOBAL_STATS.lock().format(),
231                PHASE_STATS.lock().format(),
232                CURRENT_STATS.lock().format(),
233                MemoryDataSize::from_bytes(memory as usize),
234            )
235        }
236        #[cfg(not(feature = "process-stats"))]
237        String::new()
238    }
239
240    pub fn get_formatted_counter(&self) -> String {
241        let total_mem = CHUNKS_ALLOCATOR.get_total_memory();
242        let free_mem = CHUNKS_ALLOCATOR.get_free_memory();
243
244        format!(
245            " ptime: {:.2?} gtime: {:.2?} memory: {:.2} {:.2}%{}",
246            self.phase
247                .as_ref()
248                .map(|pt| pt.1.elapsed())
249                .unwrap_or(Duration::from_millis(0)),
250            self.get_wallclock(),
251            total_mem - free_mem,
252            ((1.0 - free_mem / total_mem) * 100.0),
253            Self::format_process_stats()
254        )
255    }
256
257    pub fn get_formatted_counter_without_memory(&self) -> String {
258        format!(
259            " ptime: {:.2?} gtime: {:.2?}{}",
260            self.phase
261                .as_ref()
262                .map(|pt| pt.1.elapsed())
263                .unwrap_or(Duration::from_millis(0)),
264            self.get_wallclock(),
265            Self::format_process_stats()
266        )
267    }
268
269    pub fn print_stats(&mut self, end_message: String) {
270        self.end_phase();
271
272        crate::log_info!("{}", end_message);
273        crate::log_info!("TOTAL TIME: {:.2?}", self.get_wallclock());
274        let fs_stats = MemoryFs::get_stats();
275
276        crate::log_info!(
277            "Max virtual fs usage: {:.2}",
278            MemoryDataSize::from_bytes(fs_stats.max_files_usage as usize),
279        );
280        crate::log_info!(
281            "Max disk usage: {:.2}",
282            MemoryDataSize::from_bytes(fs_stats.max_disk_usage as usize)
283        );
284        crate::log_info!("Final stats:");
285
286        for PhaseResult { name, time } in self.results.iter() {
287            crate::log_info!("\t{} \t=> {:.2?}", name, time);
288        }
289    }
290}