opentelemetry_system_metrics/
lib.rs

//! Enables the system metrics of a process to be observed using OpenTelemetry.
//!
//! Currently observed metrics are:
//! - CPU
//! - Memory
//! - Disk
//! - GPU memory (NVIDIA GPUs, via NVML)
//!
//! Network metrics are not implemented yet.
//!
//! # Getting started
//!
//! ```no_run
//! use opentelemetry::global;
//! use opentelemetry_system_metrics::init_process_observer;
//!
//! # async fn example() -> eyre::Result<()> {
//! let meter = global::meter("process-meter");
//! // Samples the current process until the future is dropped.
//! init_process_observer(meter).await?;
//! # Ok(())
//! # }
//! ```
//!
19
20use std::time::Duration;
21
22use eyre::ContextCompat;
23use eyre::Result;
24use nvml_wrapper::enums::device::UsedGpuMemory;
25use nvml_wrapper::Nvml;
26
27use opentelemetry::KeyValue;
28use sysinfo::{get_current_pid, System};
29
30use opentelemetry::metrics::Meter;
31use opentelemetry::Key;
32use tokio::time::sleep;
33
34const PROCESS_PID: Key = Key::from_static_str("process.pid");
35const PROCESS_EXECUTABLE_NAME: Key = Key::from_static_str("process.executable.name");
36const PROCESS_EXECUTABLE_PATH: Key = Key::from_static_str("process.executable.path");
37const PROCESS_COMMAND: Key = Key::from_static_str("process.command");
38
39// Not implemented yet!
40//
41// const PROCESS_COMMAND_LINE: Key = Key::from_static_str("process.command_line");
42// const PROCESS_COMMAND_ARGS: Key = Key::from_static_str("process.command_args");
43// const PROCESS_OWNER: Key = Key::from_static_str("process.owner");
44
45const PROCESS_CPU_USAGE: &str = "process.cpu.usage";
46const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization";
47const PROCESS_MEMORY_USAGE: &str = "process.memory.usage";
48const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual";
49const PROCESS_DISK_IO: &str = "process.disk.io";
50// const PROCESS_NETWORK_IO: &str = "process.network.io";
51const DIRECTION: Key = Key::from_static_str("direction");
52
53// const PROCESS_GPU_USAGE: &str = "process.gpu.usage";
54const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";
55
56/// Record asynchronnously information about the current process.
57/// # Example
58///
59/// ```
60/// use opentelemetry::global;
61/// use opentelemetry_system_metrics::init_process_observer;
62///
63/// let meter = global::meter("process-meter");
64/// init_process_observer(meter);
65/// ```
66///
67pub async fn init_process_observer(meter: Meter) -> Result<()> {
68    let pid =
69        get_current_pid().map_err(|err| eyre::eyre!("could not get current pid. Error: {err}"))?;
70    register_metrics(meter, pid).await
71}
72
73/// Record asynchronously information about a specific process by its PID.
74/// # Example
75///
76/// ```
77/// use opentelemetry::global;
78/// use opentelemetry_system_metrics::init_process_observer_for_pid;
79///
80/// let meter = global::meter("process-meter");
81/// let pid = 1234; // replace with the actual PID
82/// init_process_observer_for_pid(meter, pid).await;
83/// ```
84///
85pub async fn init_process_observer_for_pid(meter: Meter, pid: u32) -> Result<()> {
86    let pid = sysinfo::Pid::from_u32(pid);
87    register_metrics(meter, pid).await
88}
89
90async fn register_metrics(meter: Meter, pid: sysinfo::Pid) -> Result<()> {
91    let sys_ = System::new_all();
92    let core_count = sys_
93        .physical_core_count()
94        .with_context(|| "Could not get physical core count")?;
95
96    let nvml = Nvml::init();
97
98    let process_cpu_utilization = meter
99        .f64_gauge(PROCESS_CPU_USAGE)
100        .with_description("The percentage of CPU in use.")
101        .with_unit("percent")
102        .build();
103    let process_cpu_usage = meter
104        .f64_gauge(PROCESS_CPU_UTILIZATION)
105        .with_description("The amount of CPU in use.")
106        .with_unit("percent")
107        .build();
108    let process_memory_usage = meter
109        .i64_gauge(PROCESS_MEMORY_USAGE)
110        .with_description("The amount of physical memory in use.")
111        .with_unit("byte")
112        .build();
113    let process_memory_virtual = meter
114        .i64_gauge(PROCESS_MEMORY_VIRTUAL)
115        .with_description("The amount of committed virtual memory.")
116        .with_unit("byte")
117        .build();
118    let process_disk_io = meter
119        .i64_gauge(PROCESS_DISK_IO)
120        .with_description("Disk bytes transferred.")
121        .with_unit("byte")
122        .build();
123
124    let process_gpu_memory_usage = meter
125        .u64_gauge(PROCESS_GPU_MEMORY_USAGE)
126        .with_description("The amount of physical GPU memory in use.")
127        .with_unit("byte")
128        .build();
129
130    let mut sys = System::new_all();
131    sys.refresh_all();
132
133    let common_attributes = if let Some(process) = sys.process(pid) {
134        [
135            KeyValue::new(PROCESS_PID, pid.as_u32().clone() as i64),
136            KeyValue::new(
137                PROCESS_EXECUTABLE_NAME,
138                process
139                    .name()
140                    .to_os_string()
141                    .into_string()
142                    .unwrap_or_default(),
143            ),
144            KeyValue::new(
145                PROCESS_EXECUTABLE_PATH,
146                process
147                    .exe()
148                    .map(|path| path.to_string_lossy().into_owned())
149                    .unwrap_or_default(),
150            ),
151            KeyValue::new(
152                PROCESS_COMMAND,
153                process.cmd().iter().fold(String::new(), |t1, t2| {
154                    t1 + " " + t2.to_str().unwrap_or_default()
155                }),
156            ),
157        ]
158    } else {
159        unimplemented!()
160    };
161
162    loop {
163        sleep(Duration::from_millis(500)).await;
164        sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
165
166        if let Some(process) = sys.process(pid) {
167            let cpu_usage = process.cpu_usage();
168            let disk_io = process.disk_usage();
169            // let status = process.status();
170
171            process_cpu_usage.record(cpu_usage.into(), &[]);
172            process_cpu_utilization
173                .record((cpu_usage / core_count as f32).into(), &common_attributes);
174            process_memory_usage.record((process.memory()).try_into().unwrap(), &common_attributes);
175            process_memory_virtual.record(
176                (process.virtual_memory()).try_into().unwrap(),
177                &common_attributes,
178            );
179            process_disk_io.record(
180                disk_io.read_bytes.try_into().unwrap(),
181                &[
182                    common_attributes.as_slice(),
183                    &[KeyValue::new(DIRECTION, "read")],
184                ]
185                .concat(),
186            );
187            process_disk_io.record(
188                disk_io.written_bytes.try_into().unwrap(),
189                &[
190                    common_attributes.as_slice(),
191                    &[KeyValue::new(DIRECTION, "write")],
192                ]
193                .concat(),
194            );
195        }
196
197        // let mut last_timestamp = last_timestamp.lock().unwrap().clone();
198        match &nvml {
199            Ok(nvml) => {
200                // Get the first `Device` (GPU) in the system
201                if let Ok(device) = nvml.device_by_index(0) {
202                    if let Ok(gpu_stats) = device.running_compute_processes() {
203                        for stat in gpu_stats.iter() {
204                            if stat.pid == pid.as_u32() {
205                                let memory_used = match stat.used_gpu_memory {
206                                    UsedGpuMemory::Used(bytes) => bytes,
207                                    UsedGpuMemory::Unavailable => 0,
208                                };
209
210                                process_gpu_memory_usage.record(memory_used, &common_attributes);
211
212                                break;
213                            }
214                        }
215
216                        // If the loop finishes and no pid matched our pid, put 0.
217                        process_gpu_memory_usage.record(0, &common_attributes);
218                    };
219                }
220            }
221            Err(_) => {
222                // If we can't get the NVML, we just put 0.
223                process_gpu_memory_usage.record(0, &common_attributes);
224            }
225        }
226    }
227}