opentelemetry_system_metrics/
lib.rs1use std::time::Duration;
21
22use eyre::ContextCompat;
23use eyre::Result;
24use nvml_wrapper::enums::device::UsedGpuMemory;
25use nvml_wrapper::Nvml;
26
27use opentelemetry::KeyValue;
28use sysinfo::{get_current_pid, System};
29
30use opentelemetry::metrics::Meter;
31use opentelemetry::Key;
32use tokio::time::sleep;
33
34const PROCESS_PID: Key = Key::from_static_str("process.pid");
35const PROCESS_EXECUTABLE_NAME: Key = Key::from_static_str("process.executable.name");
36const PROCESS_EXECUTABLE_PATH: Key = Key::from_static_str("process.executable.path");
37const PROCESS_COMMAND: Key = Key::from_static_str("process.command");
38
39const PROCESS_CPU_USAGE: &str = "process.cpu.usage";
46const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization";
47const PROCESS_MEMORY_USAGE: &str = "process.memory.usage";
48const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual";
49const PROCESS_DISK_IO: &str = "process.disk.io";
50const DIRECTION: Key = Key::from_static_str("direction");
52
53const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";
55
56pub async fn init_process_observer(meter: Meter) -> Result<()> {
68 let pid =
69 get_current_pid().map_err(|err| eyre::eyre!("could not get current pid. Error: {err}"))?;
70 register_metrics(meter, pid).await
71}
72
73pub async fn init_process_observer_for_pid(meter: Meter, pid: u32) -> Result<()> {
86 let pid = sysinfo::Pid::from_u32(pid);
87 register_metrics(meter, pid).await
88}
89
90async fn register_metrics(meter: Meter, pid: sysinfo::Pid) -> Result<()> {
91 let sys_ = System::new_all();
92 let core_count = sys_
93 .physical_core_count()
94 .with_context(|| "Could not get physical core count")?;
95
96 let nvml = Nvml::init();
97
98 let process_cpu_utilization = meter
99 .f64_gauge(PROCESS_CPU_USAGE)
100 .with_description("The percentage of CPU in use.")
101 .with_unit("percent")
102 .build();
103 let process_cpu_usage = meter
104 .f64_gauge(PROCESS_CPU_UTILIZATION)
105 .with_description("The amount of CPU in use.")
106 .with_unit("percent")
107 .build();
108 let process_memory_usage = meter
109 .i64_gauge(PROCESS_MEMORY_USAGE)
110 .with_description("The amount of physical memory in use.")
111 .with_unit("byte")
112 .build();
113 let process_memory_virtual = meter
114 .i64_gauge(PROCESS_MEMORY_VIRTUAL)
115 .with_description("The amount of committed virtual memory.")
116 .with_unit("byte")
117 .build();
118 let process_disk_io = meter
119 .i64_gauge(PROCESS_DISK_IO)
120 .with_description("Disk bytes transferred.")
121 .with_unit("byte")
122 .build();
123
124 let process_gpu_memory_usage = meter
125 .u64_gauge(PROCESS_GPU_MEMORY_USAGE)
126 .with_description("The amount of physical GPU memory in use.")
127 .with_unit("byte")
128 .build();
129
130 let mut sys = System::new_all();
131 sys.refresh_all();
132
133 let common_attributes = if let Some(process) = sys.process(pid) {
134 [
135 KeyValue::new(PROCESS_PID, pid.as_u32().clone() as i64),
136 KeyValue::new(
137 PROCESS_EXECUTABLE_NAME,
138 process
139 .name()
140 .to_os_string()
141 .into_string()
142 .unwrap_or_default(),
143 ),
144 KeyValue::new(
145 PROCESS_EXECUTABLE_PATH,
146 process
147 .exe()
148 .map(|path| path.to_string_lossy().into_owned())
149 .unwrap_or_default(),
150 ),
151 KeyValue::new(
152 PROCESS_COMMAND,
153 process.cmd().iter().fold(String::new(), |t1, t2| {
154 t1 + " " + t2.to_str().unwrap_or_default()
155 }),
156 ),
157 ]
158 } else {
159 unimplemented!()
160 };
161
162 loop {
163 sleep(Duration::from_millis(500)).await;
164 sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
165
166 if let Some(process) = sys.process(pid) {
167 let cpu_usage = process.cpu_usage();
168 let disk_io = process.disk_usage();
169 process_cpu_usage.record(cpu_usage.into(), &[]);
172 process_cpu_utilization
173 .record((cpu_usage / core_count as f32).into(), &common_attributes);
174 process_memory_usage.record((process.memory()).try_into().unwrap(), &common_attributes);
175 process_memory_virtual.record(
176 (process.virtual_memory()).try_into().unwrap(),
177 &common_attributes,
178 );
179 process_disk_io.record(
180 disk_io.read_bytes.try_into().unwrap(),
181 &[
182 common_attributes.as_slice(),
183 &[KeyValue::new(DIRECTION, "read")],
184 ]
185 .concat(),
186 );
187 process_disk_io.record(
188 disk_io.written_bytes.try_into().unwrap(),
189 &[
190 common_attributes.as_slice(),
191 &[KeyValue::new(DIRECTION, "write")],
192 ]
193 .concat(),
194 );
195 }
196
197 match &nvml {
199 Ok(nvml) => {
200 if let Ok(device) = nvml.device_by_index(0) {
202 if let Ok(gpu_stats) = device.running_compute_processes() {
203 for stat in gpu_stats.iter() {
204 if stat.pid == pid.as_u32() {
205 let memory_used = match stat.used_gpu_memory {
206 UsedGpuMemory::Used(bytes) => bytes,
207 UsedGpuMemory::Unavailable => 0,
208 };
209
210 process_gpu_memory_usage.record(memory_used, &common_attributes);
211
212 break;
213 }
214 }
215
216 process_gpu_memory_usage.record(0, &common_attributes);
218 };
219 }
220 }
221 Err(_) => {
222 process_gpu_memory_usage.record(0, &common_attributes);
224 }
225 }
226 }
227}