diskann_disk/utils/instrumentation/
perf_logger.rs1use std::fmt;
6
7#[cfg(feature = "perf_test")]
8use opentelemetry::{
9 global,
10 trace::{get_active_span, Tracer},
11 KeyValue,
12};
13use tracing::info;
14
15use diskann_providers::utils::Timer;
16
/// `tracing` target attached to the checkpoint latency events that
/// `PerfLoggerInner::log_checkpoint` emits via `info!`.
pub const LATENCY_LOG_TARGET: &str = "latency_event";
19
/// Scenario labels used when constructing a `PerfLogger`.
mod scenario {
    /// Scenario label passed to `PerfLogger::new` by
    /// `PerfLogger::new_disk_index_build_logger`.
    pub const DISK_INDEX_BUILD_SCENARIO: &str = "DiskIndexBuild";
}
23
/// Checkpoint labels for the disk index build.
///
/// Values of this type implement [`fmt::Display`] and can therefore be passed
/// to `PerfLogger::log_checkpoint`. The `Display` output is exactly the
/// variant name (for example `PqConstruction`).
#[derive(Debug)]
pub enum DiskIndexBuildCheckpoint {
    PqConstruction,
    InmemIndexBuild,
    DiskLayout,
}

impl fmt::Display for DiskIndexBuildCheckpoint {
    /// Writes the variant name.
    ///
    /// The name is emitted through [`fmt::Formatter::pad`], so width, fill and
    /// alignment requested by the caller (e.g. `{:>20}`) are honored, and a
    /// precision truncates the name as it would for a `&str`. With a plain
    /// `{}` the output is identical to the variant's `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Spelled out explicitly so the user-facing label does not depend on
        // the derived `Debug` implementation.
        let name = match self {
            Self::PqConstruction => "PqConstruction",
            Self::InmemIndexBuild => "InmemIndexBuild",
            Self::DiskLayout => "DiskLayout",
        };
        f.pad(name)
    }
}
36
/// Checkpoint labels for the merged Vamana index build.
///
/// Values of this type implement [`fmt::Display`]; the output is exactly the
/// variant name (for example `PartitionData`).
// NOTE(review): no caller is visible in this file; the stage meanings are
// taken from the variant names only.
#[derive(Debug)]
pub enum BuildMergedVamanaIndexCheckpoint {
    PartitionData,
    BuildIndicesOnShards,
    MergeIndices,
}

impl fmt::Display for BuildMergedVamanaIndexCheckpoint {
    /// Writes the variant name.
    ///
    /// The name is emitted through [`fmt::Formatter::pad`], so width, fill and
    /// alignment requested by the caller (e.g. `{:>24}`) are honored, and a
    /// precision truncates the name as it would for a `&str`. With a plain
    /// `{}` the output is identical to the variant's `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Spelled out explicitly so the user-facing label does not depend on
        // the derived `Debug` implementation.
        let name = match self {
            Self::PartitionData => "PartitionData",
            Self::BuildIndicesOnShards => "BuildIndicesOnShards",
            Self::MergeIndices => "MergeIndices",
        };
        f.pad(name)
    }
}
49
50pub struct PerfLogger {
51 inner: Option<PerfLoggerInner>,
52}
53
54impl PerfLogger {
55 pub fn new<T: std::fmt::Display>(scenario: T, enable_logging: bool) -> Self {
56 if enable_logging {
57 let inner = PerfLoggerInner {
58 scenario: scenario.to_string(),
59 timer: Timer::new(),
60 };
61 Self { inner: Some(inner) }
62 } else {
63 Self { inner: None }
64 }
65 }
66
67 pub fn new_disk_index_build_logger() -> Self {
68 Self::new(scenario::DISK_INDEX_BUILD_SCENARIO, true)
69 }
70
71 pub fn start(&mut self) {
73 if let Some(inner) = &mut self.inner {
74 inner.start();
75 }
76 }
77
78 pub fn log_checkpoint<T: fmt::Display>(&mut self, checkpoint: T) {
97 if let Some(inner) = &mut self.inner {
98 inner.log_checkpoint(checkpoint);
99 }
100 }
101
102 pub fn log_enabled(&self) -> bool {
104 self.inner.is_some()
105 }
106}
107
108struct PerfLoggerInner {
109 scenario: String,
110 timer: Timer,
111}
112
113impl PerfLoggerInner {
114 fn start(&mut self) {
116 self.timer.reset();
117 }
118
119 fn log_checkpoint<T: fmt::Display>(&mut self, checkpoint: T) {
138 info!( target: LATENCY_LOG_TARGET,
139 "Time for {} [Checkpoint: {}] completed: {:.3} seconds, {:.3}B cycles, {:.3}% CPU time, peak memory {:.3} GBs",
140 self.scenario,
141 checkpoint,
142 self.timer.elapsed().as_secs_f32(),
143 self.timer.elapsed_gcycles(),
144 self.timer.get_average_cpu_time_in_percents(),
145 self.timer.get_peak_memory_usage()
146 );
147
148 #[cfg(feature = "perf_test")]
149 {
150 let tracer = global::tracer("");
151 tracer.in_span(format!("{}-{}", self.scenario, checkpoint), |_context| {
152 get_active_span(|span| {
153 span.set_attribute(KeyValue::new(
154 "duration_seconds",
155 self.timer.elapsed().as_secs_f64(),
156 ));
157 span.set_attribute(KeyValue::new(
158 "elapsed_cycles",
159 self.timer.elapsed_gcycles() as f64,
160 ));
161 span.set_attribute(KeyValue::new(
162 "cpu_time",
163 self.timer.get_average_cpu_time_in_percents(),
164 ));
165 span.set_attribute(KeyValue::new(
166 "peak_memory_usage",
167 self.timer.get_peak_memory_usage() as f64,
168 ));
169 });
170 });
171 }
172
173 self.timer.reset();
174 }
175}
176
#[cfg(test)]
mod perf_logger_tests {
    use super::*;

    /// Runs the checkpoint / start / checkpoint sequence shared by both tests.
    fn drive(perf: &mut PerfLogger) {
        perf.log_checkpoint(DiskIndexBuildCheckpoint::PqConstruction);
        perf.start();
        perf.log_checkpoint(DiskIndexBuildCheckpoint::InmemIndexBuild);
    }

    /// An enabled logger reports itself as enabled and accepts checkpoints
    /// both before and after `start`.
    #[test]
    fn test_log() {
        let mut perf = PerfLogger::new("test", true);
        assert!(perf.log_enabled());
        drive(&mut perf);
    }

    /// A disabled logger reports itself as disabled and tolerates the same
    /// call sequence.
    #[test]
    fn test_log_disabled() {
        let mut perf = PerfLogger::new("test", false);
        assert!(!perf.log_enabled());
        drive(&mut perf);
    }
}