Skip to main content

diskann_disk/utils/instrumentation/
perf_logger.rs

1/*
2 * Copyright (c) Microsoft Corporation.
3 * Licensed under the MIT license.
4 */
5use std::fmt;
6
7#[cfg(feature = "perf_test")]
8use opentelemetry::{
9    global,
10    trace::{get_active_span, Tracer},
11    KeyValue,
12};
13use tracing::info;
14
15use diskann_providers::utils::Timer;
16
17/// Target for logging latency events.
18pub const LATENCY_LOG_TARGET: &str = "latency_event";
19
20mod scenario {
21    pub const DISK_INDEX_BUILD_SCENARIO: &str = "DiskIndexBuild";
22}
23
24#[derive(Debug)]
25pub enum DiskIndexBuildCheckpoint {
26    PqConstruction,
27    InmemIndexBuild,
28    DiskLayout,
29}
30
31impl fmt::Display for DiskIndexBuildCheckpoint {
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        write!(f, "{:?}", self)
34    }
35}
36
37#[derive(Debug)]
38pub enum BuildMergedVamanaIndexCheckpoint {
39    PartitionData,
40    BuildIndicesOnShards,
41    MergeIndices,
42}
43
44impl fmt::Display for BuildMergedVamanaIndexCheckpoint {
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46        write!(f, "{:?}", self)
47    }
48}
49
50pub struct PerfLogger {
51    inner: Option<PerfLoggerInner>,
52}
53
54impl PerfLogger {
55    pub fn new<T: std::fmt::Display>(scenario: T, enable_logging: bool) -> Self {
56        if enable_logging {
57            let inner = PerfLoggerInner {
58                scenario: scenario.to_string(),
59                timer: Timer::new(),
60            };
61            Self { inner: Some(inner) }
62        } else {
63            Self { inner: None }
64        }
65    }
66
67    pub fn new_disk_index_build_logger() -> Self {
68        Self::new(scenario::DISK_INDEX_BUILD_SCENARIO, true)
69    }
70
71    /// Starts or restarts the timer for the perf logger.
72    pub fn start(&mut self) {
73        if let Some(inner) = &mut self.inner {
74            inner.start();
75        }
76    }
77
78    /// Logs the time elapsed since the last checkpoint or since the logger was created.
79    ///
80    /// This method calculates the elapsed time, logs it with the provided checkpoint name,
81    /// and then resets the start time to the current time. If logging is not enabled,
82    /// this method does nothing.
83    ///
84    /// # Arguments
85    ///
86    /// * `checkpoint` - A string slice that holds the name of the checkpoint.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// use diskann_disk::utils::instrumentation::PerfLogger;
92    ///
93    /// let mut logger = PerfLogger::new("Scenario".to_string(), true);
94    /// logger.log_checkpoint("Checkpoint1");
95    /// ```
96    pub fn log_checkpoint<T: fmt::Display>(&mut self, checkpoint: T) {
97        if let Some(inner) = &mut self.inner {
98            inner.log_checkpoint(checkpoint);
99        }
100    }
101
102    /// Returns whether logging is enabled for the perf logger.
103    pub fn log_enabled(&self) -> bool {
104        self.inner.is_some()
105    }
106}
107
108struct PerfLoggerInner {
109    scenario: String,
110    timer: Timer,
111}
112
113impl PerfLoggerInner {
114    /// Starts or restarts the timer for the perf logger.
115    fn start(&mut self) {
116        self.timer.reset();
117    }
118
119    /// Logs the time elapsed since the last checkpoint or since the logger was created.
120    ///
121    /// This method calculates the elapsed time, logs it with the provided checkpoint name,
122    /// and then resets the start time to the current time. If logging is not enabled,
123    /// this method does nothing.
124    ///
125    /// # Arguments
126    ///
127    /// * `checkpoint` - A string slice that holds the name of the checkpoint.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use diskann_disk::utils::instrumentation::PerfLogger;
133    ///
134    /// let mut logger = PerfLogger::new("Scenario".to_string(), true);
135    /// logger.log_checkpoint("Checkpoint1");
136    /// ```
137    fn log_checkpoint<T: fmt::Display>(&mut self, checkpoint: T) {
138        info!( target: LATENCY_LOG_TARGET,
139            "Time for {} [Checkpoint: {}] completed: {:.3} seconds, {:.3}B cycles, {:.3}% CPU time, peak memory {:.3} GBs",
140            self.scenario,
141            checkpoint,
142            self.timer.elapsed().as_secs_f32(),
143            self.timer.elapsed_gcycles(),
144            self.timer.get_average_cpu_time_in_percents(),
145            self.timer.get_peak_memory_usage()
146        );
147
148        #[cfg(feature = "perf_test")]
149        {
150            let tracer = global::tracer("");
151            tracer.in_span(format!("{}-{}", self.scenario, checkpoint), |_context| {
152                get_active_span(|span| {
153                    span.set_attribute(KeyValue::new(
154                        "duration_seconds",
155                        self.timer.elapsed().as_secs_f64(),
156                    ));
157                    span.set_attribute(KeyValue::new(
158                        "elapsed_cycles",
159                        self.timer.elapsed_gcycles() as f64,
160                    ));
161                    span.set_attribute(KeyValue::new(
162                        "cpu_time",
163                        self.timer.get_average_cpu_time_in_percents(),
164                    ));
165                    span.set_attribute(KeyValue::new(
166                        "peak_memory_usage",
167                        self.timer.get_peak_memory_usage() as f64,
168                    ));
169                });
170            });
171        }
172
173        self.timer.reset();
174    }
175}
176
177#[cfg(test)]
178mod perf_logger_tests {
179    use super::*;
180
181    #[test]
182    fn test_log() {
183        let scenario = "test";
184        let mut logger = PerfLogger::new(scenario, true);
185        assert!(logger.log_enabled());
186        logger.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction);
187        logger.start();
188        logger.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild);
189    }
190
191    #[test]
192    fn test_log_disabled() {
193        let scenario = "test";
194        let mut logger = PerfLogger::new(scenario, false);
195        assert!(!logger.log_enabled());
196        logger.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction);
197        logger.start();
198        logger.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild);
199    }
200}