opentelemetry_system_metrics/
lib.rs

//! Observe system metrics of a process with OpenTelemetry.
//!
//! The metrics currently observed are:
//! - CPU
//! - Memory
//! - Disk
//! - Network (not yet collected)
//! - GPU (optional, see below)
//!
10//! # Getting started
11//!
12//! To use this crate, add the following to your `Cargo.toml`:
13//! ```toml
14//! [dependencies]
15//! opentelemetry = "0.29"
16//! opentelemetry-system-metrics = "0.4"
17//! tokio = { version = "1", features = ["full"] }
18//! sysinfo = "0.34"
19//! eyre = { version = "0.6", features = ["tokio"] }
20//! tracing = "0.1"
21//! ```
22//!
23//! ## GPU metrics
24//!
25//! GPU metrics are optional. To enable GPU metrics, enable the `gpu` feature for this crate.
26//!
27//! ```toml
28//! [dependencies]
29//! opentelemetry-system-metrics = { version = "0.4", features = ["gpu"] }
30//! ```
31//!
//! ## Example
//!
//! Record the metrics of the current process once:
//!
//! ```
//! use opentelemetry::global;
//! use opentelemetry_system_metrics::init_process_observer_once;
//!
//! #[tokio::main]
//! async fn main() {
//!     let meter = global::meter("process-meter");
//!     let result = init_process_observer_once(meter).await;
//! }
//! ```
43//!
44
45use eyre::ContextCompat;
46use eyre::Result;
47#[cfg(feature = "gpu")]
48use nvml_wrapper::enums::device::UsedGpuMemory;
49#[cfg(feature = "gpu")]
50use nvml_wrapper::Nvml;
51use opentelemetry::metrics::Meter;
52use opentelemetry::Key;
53use opentelemetry::KeyValue;
54use std::time::Duration;
55use sysinfo::{get_current_pid, System};
56
57const PROCESS_PID: Key = Key::from_static_str("process.pid");
58const PROCESS_EXECUTABLE_NAME: Key = Key::from_static_str("process.executable.name");
59const PROCESS_EXECUTABLE_PATH: Key = Key::from_static_str("process.executable.path");
60const PROCESS_COMMAND: Key = Key::from_static_str("process.command");
61const PROCESS_CPU_USAGE: &str = "process.cpu.usage";
62const PROCESS_CPU_UTILIZATION: &str = "process.cpu.utilization";
63const PROCESS_MEMORY_USAGE: &str = "process.memory.usage";
64const PROCESS_MEMORY_VIRTUAL: &str = "process.memory.virtual";
65const PROCESS_DISK_IO: &str = "process.disk.io";
66// const PROCESS_NETWORK_IO: &str = "process.network.io";
67const DIRECTION: Key = Key::from_static_str("direction");
68#[cfg(feature = "gpu")]
69const PROCESS_GPU_MEMORY_USAGE: &str = "process.gpu.memory.usage";
70
71/// Record asynchronously information about the current process.
72///
73/// # Parameters
74/// * `meter`: The OpenTelemetry meter to use for recording metrics.
75pub async fn init_process_observer(meter: Meter) -> Result<()> {
76    let pid =
77        get_current_pid().map_err(|err| eyre::eyre!("could not get current pid. Error: {err}"))?;
78    register_metrics(meter, pid, None).await
79}
80
81/// Record asynchronously information about a specific process by its PID.
82///
83/// # Parameters
84/// * `meter`: The OpenTelemetry meter to use for recording metrics.
85/// * `pid`: The PID of the process to observe.
86pub async fn init_process_observer_for_pid(meter: Meter, pid: u32) -> Result<()> {
87    let pid = sysinfo::Pid::from_u32(pid);
88    register_metrics(meter, pid, None).await
89}
90
91/// Record asynchronously information about the current process once.
92///
93/// # Parameters
94/// * `meter`: The OpenTelemetry meter to use for recording metrics.
95///
96/// # Example
97/// ```
98/// use opentelemetry::global;
99/// use opentelemetry_system_metrics::init_process_observer_once;
100///
101/// #[tokio::main]
102/// async fn main() {
103///    let meter = global::meter("process-meter");
104///   let result = init_process_observer_once(meter).await;
105/// }
106/// ```
107pub async fn init_process_observer_once(meter: Meter) -> Result<()> {
108    let pid =
109        get_current_pid().map_err(|err| eyre::eyre!("could not get current pid. Error: {err}"))?;
110    register_metrics(meter, pid, Some(1)).await
111}
112
113/// Register metrics for the current process.
114///
115/// # Parameters
116/// * `meter`: The OpenTelemetry meter to use for recording metrics.
117/// * `pid`: The PID of the process to observe.
118/// * `iterations`: Optional number of iterations to run the observer. If `None`, it will run indefinitely.
119///
120async fn register_metrics(
121    meter: Meter,
122    pid: sysinfo::Pid,
123    iterations: Option<usize>,
124) -> Result<()> {
125    let core_count =
126        System::physical_core_count().with_context(|| "Could not get physical core count")?;
127
128    #[cfg(feature = "gpu")]
129    let nvml = Nvml::init();
130
131    let process_cpu_utilization = meter
132        .f64_gauge(PROCESS_CPU_USAGE)
133        .with_description("The percentage of CPU in use.")
134        .with_unit("percent")
135        .build();
136    let process_cpu_usage = meter
137        .f64_gauge(PROCESS_CPU_UTILIZATION)
138        .with_description("The amount of CPU in use.")
139        .with_unit("percent")
140        .build();
141    let process_memory_usage = meter
142        .i64_gauge(PROCESS_MEMORY_USAGE)
143        .with_description("The amount of physical memory in use.")
144        .with_unit("byte")
145        .build();
146    let process_memory_virtual = meter
147        .i64_gauge(PROCESS_MEMORY_VIRTUAL)
148        .with_description("The amount of committed virtual memory.")
149        .with_unit("byte")
150        .build();
151    let process_disk_io = meter
152        .i64_gauge(PROCESS_DISK_IO)
153        .with_description("Disk bytes transferred.")
154        .with_unit("byte")
155        .build();
156
157    #[cfg(feature = "gpu")]
158    let process_gpu_memory_usage = meter
159        .u64_gauge(PROCESS_GPU_MEMORY_USAGE)
160        .with_description("The amount of physical GPU memory in use.")
161        .with_unit("byte")
162        .build();
163
164    let mut sys = System::new();
165    sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
166
167    let common_attributes = if let Some(process) = sys.process(pid) {
168        [
169            KeyValue::new(PROCESS_PID, pid.as_u32().clone() as i64),
170            KeyValue::new(
171                PROCESS_EXECUTABLE_NAME,
172                process
173                    .name()
174                    .to_os_string()
175                    .into_string()
176                    .unwrap_or_default(),
177            ),
178            KeyValue::new(
179                PROCESS_EXECUTABLE_PATH,
180                process
181                    .exe()
182                    .map(|path| path.to_string_lossy().into_owned())
183                    .unwrap_or_default(),
184            ),
185            KeyValue::new(
186                PROCESS_COMMAND,
187                process.cmd().iter().fold(String::new(), |t1, t2| {
188                    t1 + " " + t2.to_str().unwrap_or_default()
189                }),
190            ),
191        ]
192    } else {
193        unimplemented!()
194    };
195
196    let mut interval = tokio::time::interval(Duration::from_millis(
197        std::env::var("OTEL_METRIC_EXPORT_INTERVAL")
198            .unwrap_or_else(|_| "30000".to_string())
199            .parse::<u64>()
200            .unwrap_or(30000),
201    ));
202
203    let mut counter = 0;
204    loop {
205        interval.tick().await;
206
207        sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
208
209        if let Some(process) = sys.process(pid) {
210            let cpu_usage = process.cpu_usage();
211            let disk_io = process.disk_usage();
212            // let status = process.status();
213
214            process_cpu_usage.record(cpu_usage.into(), &[]);
215            process_cpu_utilization
216                .record((cpu_usage / core_count as f32).into(), &common_attributes);
217            process_memory_usage.record((process.memory()).try_into()?, &common_attributes);
218            process_memory_virtual
219                .record((process.virtual_memory()).try_into()?, &common_attributes);
220            process_disk_io.record(
221                disk_io.read_bytes.try_into()?,
222                &[
223                    common_attributes.as_slice(),
224                    &[KeyValue::new(DIRECTION, "read")],
225                ]
226                .concat(),
227            );
228            process_disk_io.record(
229                disk_io.written_bytes.try_into()?,
230                &[
231                    common_attributes.as_slice(),
232                    &[KeyValue::new(DIRECTION, "write")],
233                ]
234                .concat(),
235            );
236            if let Some(max) = iterations {
237                counter += 1;
238                if counter >= max && max > 0 {
239                    break Ok(());
240                }
241            }
242        }
243
244        #[cfg(feature = "gpu")]
245        {
246            match &nvml {
247                Ok(nvml) => {
248                    // Get the first `Device` (GPU) in the system
249                    if let Ok(device) = nvml.device_by_index(0) {
250                        if let Ok(gpu_stats) = device.running_compute_processes() {
251                            for stat in gpu_stats.iter() {
252                                if stat.pid == pid.as_u32() {
253                                    let memory_used = match stat.used_gpu_memory {
254                                        UsedGpuMemory::Used(bytes) => bytes,
255                                        UsedGpuMemory::Unavailable => 0,
256                                    };
257
258                                    process_gpu_memory_usage
259                                        .record(memory_used, &common_attributes);
260
261                                    break;
262                                }
263                            }
264                        };
265                    }
266                }
267                Err(_) => {
268                    // If we can't get the NVML, we just put 0.
269                }
270            }
271        }
272    }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::global;
    use tokio::runtime::Runtime;

    /// Sampling the current process once must succeed and terminate.
    #[test]
    fn test_init_process_observer_once() {
        let rt = Runtime::new().unwrap();
        let result = rt.block_on(async {
            let meter = global::meter("test-meter");
            init_process_observer_once(meter).await
        });
        // Include the eyre report so a failure is diagnosable from the test log.
        assert!(result.is_ok(), "init_process_observer_once failed: {result:?}");
    }
}