opentelemetry_system_metrics/lib.rs

use eyre::ContextCompat;
use eyre::Result;
#[cfg(feature = "gpu")]
use nvml_wrapper::enums::device::UsedGpuMemory;
#[cfg(feature = "gpu")]
use nvml_wrapper::Nvml;
use opentelemetry::metrics::Meter;
use opentelemetry::Key;
use opentelemetry::KeyValue;
use std::time::Duration;
use sysinfo::{get_current_pid, System};

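// Attribute keys and instrument names for the process metrics emitted below,
// modeled on the OpenTelemetry process semantic conventions.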
const PROCESS_PID: Key = Key::from_static_str("process.pid");
const PROCESS_EXECUTABLE_NAME: Key = Key::from_static_str("process.executable.name");
const PROCESS_EXECUTABLE_PATH: Key = Key::from_static_str("process.executable.path");
const PROCESS_COMMAND: Key = Key::from_static_str("process.command");
const PROCESS_CPU_USAGE: &str = "process.cpu.usage";
const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization";
const PROCESS_MEMORY_USAGE: &str = "process.memory.usage";
const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual";
const PROCESS_DISK_IO: &str = "process.disk.io";
const DIRECTION: Key = Key::from_static_str("direction");
#[cfg(feature = "gpu")]
const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";

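/// Register system metrics for the current process on the given [`Meter`] and
/// keep sampling them on the configured interval. With no iteration limit the
/// returned future only completes on an internal error, so callers typically
/// spawn it as a background task.
///
/// A minimal usage sketch, assuming a global meter provider has already been
/// installed by your exporter setup; the "my-service" scope name is
/// illustrative:
///
/// ```no_run
/// # async fn run() -> eyre::Result<()> {
/// let meter = opentelemetry::global::meter("my-service");
/// // Samples until the future is dropped or an internal error occurs.
/// opentelemetry_system_metrics::init_process_observer(meter).await?;
/// # Ok(())
/// # }
/// ```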
pub async fn init_process_observer(meter: Meter) -> Result<()> {
    let pid =
        get_current_pid().map_err(|err| eyre::eyre!("could not get current pid. Error: {err}"))?;
    register_metrics(meter, pid, None).await
}

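/// Same as [`init_process_observer`], but observes the process identified by
/// `pid` instead of the current process.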
pub async fn init_process_observer_for_pid(meter: Meter, pid: u32) -> Result<()> {
    let pid = sysinfo::Pid::from_u32(pid);
    register_metrics(meter, pid, None).await
}

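/// Register the same metrics as [`init_process_observer`], record a single
/// sample for the current process, and then return.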
pub async fn init_process_observer_once(meter: Meter) -> Result<()> {
    let pid =
        get_current_pid().map_err(|err| eyre::eyre!("could not get current pid. Error: {err}"))?;
    register_metrics(meter, pid, Some(1)).await
}

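/// Build the gauges on `meter` and sample the process identified by `pid` on
/// every interval tick. With `iterations` of `Some(n)` (n > 0) the loop stops
/// after `n` successful samples; with `None` it runs until the future is
/// dropped or a recording fails.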
async fn register_metrics(
    meter: Meter,
    pid: sysinfo::Pid,
    iterations: Option<usize>,
) -> Result<()> {
    let core_count =
        System::physical_core_count().with_context(|| "Could not get physical core count")?;

    #[cfg(feature = "gpu")]
    let nvml = Nvml::init();

    let process_cpu_usage = meter
        .f64_gauge(PROCESS_CPU_USAGE)
        .with_description("The percentage of CPU in use.")
        .with_unit("percent")
        .build();
    let process_cpu_utilization = meter
        .f64_gauge(PROCESS_CPU_UTILIZATION)
        .with_description("The percentage of CPU in use, divided by the number of physical cores.")
        .with_unit("percent")
        .build();
    let process_memory_usage = meter
        .i64_gauge(PROCESS_MEMORY_USAGE)
        .with_description("The amount of physical memory in use.")
        .with_unit("byte")
        .build();
    let process_memory_virtual = meter
        .i64_gauge(PROCESS_MEMORY_VIRTUAL)
        .with_description("The amount of committed virtual memory.")
        .with_unit("byte")
        .build();
    let process_disk_io = meter
        .i64_gauge(PROCESS_DISK_IO)
        .with_description("Disk bytes transferred.")
        .with_unit("byte")
        .build();

    #[cfg(feature = "gpu")]
    let process_gpu_memory_usage = meter
        .u64_gauge(PROCESS_GPU_MEMORY_USAGE)
        .with_description("The amount of physical GPU memory in use.")
        .with_unit("byte")
        .build();

    let mut sys = System::new();
    sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);

    let common_attributes = if let Some(process) = sys.process(pid) {
        [
            KeyValue::new(PROCESS_PID, pid.as_u32() as i64),
            KeyValue::new(
                PROCESS_EXECUTABLE_NAME,
                process
                    .name()
                    .to_os_string()
                    .into_string()
                    .unwrap_or_default(),
            ),
            KeyValue::new(
                PROCESS_EXECUTABLE_PATH,
                process
                    .exe()
                    .map(|path| path.to_string_lossy().into_owned())
                    .unwrap_or_default(),
            ),
            KeyValue::new(
                PROCESS_COMMAND,
                process.cmd().iter().fold(String::new(), |t1, t2| {
                    t1 + " " + t2.to_str().unwrap_or_default()
                }),
            ),
        ]
    } else {
        eyre::bail!("Could not find process with pid {pid}")
    };

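    // Sampling interval in milliseconds; defaults to 30s and can be overridden
    // via the OTEL_METRIC_EXPORT_INTERVAL environment variable.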
    let mut interval = tokio::time::interval(Duration::from_millis(
        std::env::var("OTEL_METRIC_EXPORT_INTERVAL")
            .unwrap_or_else(|_| "30000".to_string())
            .parse::<u64>()
            .unwrap_or(30000),
    ));

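    // Sample the target process on every tick; `counter` only advances when a
    // sample was actually recorded, so bounded runs count successful samples.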
    let mut counter = 0;
    loop {
        interval.tick().await;

        sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);

        if let Some(process) = sys.process(pid) {
            let cpu_usage = process.cpu_usage();
            let disk_io = process.disk_usage();
            process_cpu_usage.record(cpu_usage.into(), &common_attributes);
            process_cpu_utilization
                .record((cpu_usage / core_count as f32).into(), &common_attributes);
            process_memory_usage.record((process.memory()).try_into()?, &common_attributes);
            process_memory_virtual
                .record((process.virtual_memory()).try_into()?, &common_attributes);
            process_disk_io.record(
                disk_io.read_bytes.try_into()?,
                &[
                    common_attributes.as_slice(),
                    &[KeyValue::new(DIRECTION, "read")],
                ]
                .concat(),
            );
            process_disk_io.record(
                disk_io.written_bytes.try_into()?,
                &[
                    common_attributes.as_slice(),
                    &[KeyValue::new(DIRECTION, "write")],
                ]
                .concat(),
            );
            if let Some(max) = iterations {
                counter += 1;
                if counter >= max && max > 0 {
                    break Ok(());
                }
            }
        }

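        // GPU memory is sampled only when NVML initialised successfully, and
        // only the first GPU (index 0) is inspected.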
        #[cfg(feature = "gpu")]
        {
            match &nvml {
                Ok(nvml) => {
                    if let Ok(device) = nvml.device_by_index(0) {
                        if let Ok(gpu_stats) = device.running_compute_processes() {
                            for stat in gpu_stats.iter() {
                                if stat.pid == pid.as_u32() {
                                    let memory_used = match stat.used_gpu_memory {
                                        UsedGpuMemory::Used(bytes) => bytes,
                                        UsedGpuMemory::Unavailable => 0,
                                    };

                                    process_gpu_memory_usage
                                        .record(memory_used, &common_attributes);

                                    break;
                                }
                            }
                        };
                    }
                }
                Err(_) => {}
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::global;
    use tokio::runtime::Runtime;

    #[test]
    fn test_init_process_observer_once() {
        let rt = Runtime::new().unwrap();
        rt.block_on(async {
            let meter = global::meter("test-meter");
            let result = init_process_observer_once(meter).await;
            assert!(result.is_ok());
        });
    }
}