kubert_prometheus_process/
lib.rs

1//! Process metrics for Prometheus.
2//!
3//! This crate registers a collector that provides the standard set of [Process
4//! metrics][pm].
5//!
6//! ```
7//! let mut prom = prometheus_client::registry::Registry::default();
8//! if let Err(error) =
9//!     kubert_prometheus_process::register(prom.sub_registry_with_prefix("process"))
10//! {
11//!     tracing::warn!(%error, "Failed to register process metrics");
12//! }
13//! ```
14//!
15//! [pm]: https://prometheus.io/docs/instrumenting/writing_clientlibs/#process-metrics
16//
17// Based on linkerd2-proxy.
18//
19// Copyright The Linkerd Authors
20//
21// Licensed under the Apache License, Version 2.0 (the "License");
22// you may not use this file except in compliance with the License.
23// You may obtain a copy of the License at
24//
25//     http://www.apache.org/licenses/LICENSE-2.0
26//
27// Unless required by applicable law or agreed to in writing, software
28// distributed under the License is distributed on an "AS IS" BASIS,
29// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
30// See the License for the specific language governing permissions and
31// limitations under the License.
32
33#![deny(
34    rust_2018_idioms,
35    clippy::disallowed_methods,
36    unsafe_code,
37    missing_docs
38)]
39#![cfg_attr(docsrs, feature(doc_cfg))]
40
41use prometheus_client::{
42    collector::Collector,
43    encoding::{DescriptorEncoder, EncodeMetric},
44    metrics::{
45        counter::ConstCounter,
46        gauge::{self, ConstGauge, Gauge},
47        MetricType,
48    },
49    registry::{Registry, Unit},
50};
51use std::time::{Instant, SystemTime, UNIX_EPOCH};
52
53/// Registers process metrics with the given registry. Note that the 'process_'
54/// prefix is NOT added and should be specified by the caller if desired.
55pub fn register(reg: &mut Registry) -> std::io::Result<()> {
56    let start_time = Instant::now();
57    let start_time_from_epoch = SystemTime::now()
58        .duration_since(UNIX_EPOCH)
59        .expect("process start time");
60
61    #[cfg(target_os = "linux")]
62    let system = linux::System::load()?;
63
64    reg.register_with_unit(
65        "start_time",
66        "Time that the process started (in seconds since the UNIX epoch)",
67        Unit::Seconds,
68        ConstGauge::new(start_time_from_epoch.as_secs_f64()),
69    );
70
71    let clock_time_ts = Gauge::<f64, ClockMetric>::default();
72    reg.register_with_unit(
73        "clock_time",
74        "Current system time for this proxy",
75        Unit::Seconds,
76        clock_time_ts,
77    );
78
79    reg.register_collector(Box::new(ProcessCollector {
80        start_time,
81        #[cfg(target_os = "linux")]
82        system,
83    }));
84
85    Ok(())
86}
87
// Collector that reports the process uptime and, on Linux, the statistics
// encoded by `linux::System`.
#[derive(Debug)]
struct ProcessCollector {
    // Monotonic timestamp captured in `register`; uptime is measured from it.
    start_time: Instant,
    // System parameters loaded once in `register` (Linux only).
    #[cfg(target_os = "linux")]
    system: linux::System,
}
94
95impl Collector for ProcessCollector {
96    fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> std::fmt::Result {
97        let uptime = ConstCounter::new(
98            Instant::now()
99                .saturating_duration_since(self.start_time)
100                .as_secs_f64(),
101        );
102        let ue = encoder.encode_descriptor(
103            "uptime",
104            "Total time since the process started (in seconds)",
105            Some(&Unit::Seconds),
106            MetricType::Counter,
107        )?;
108        uptime.encode(ue)?;
109
110        #[cfg(target_os = "linux")]
111        self.system.encode(encoder)?;
112
113        Ok(())
114    }
115}
116
// Stateless gauge backend: every mutation is ignored and every read reports
// the current system time, in whole seconds relative to the UNIX epoch, as
// computed by `get`.
#[derive(Copy, Clone, Debug, Default)]
struct ClockMetric;
120
121impl gauge::Atomic<f64> for ClockMetric {
122    fn inc(&self) -> f64 {
123        self.get()
124    }
125
126    fn inc_by(&self, _v: f64) -> f64 {
127        self.get()
128    }
129
130    fn dec(&self) -> f64 {
131        self.get()
132    }
133
134    fn dec_by(&self, _v: f64) -> f64 {
135        self.get()
136    }
137
138    fn set(&self, _v: f64) -> f64 {
139        self.get()
140    }
141
142    fn get(&self) -> f64 {
143        match SystemTime::now().duration_since(UNIX_EPOCH) {
144            Ok(elapsed) => elapsed.as_secs_f64().floor(),
145            Err(e) => {
146                tracing::warn!(
147                    "System time is before the UNIX epoch; reporting negative timestamp"
148                );
149                -e.duration().as_secs_f64().floor()
150            }
151        }
152    }
153}
154
155#[cfg(target_os = "linux")]
156mod linux {
157    use super::*;
158    use libc::{self, pid_t};
159    use process::Stat;
160    use procfs::{
161        process::{self, LimitValue, Process},
162        ProcResult,
163    };
164    use std::time::Duration;
165    use std::{fs, io};
166    use tracing::{error, warn};
167
    /// System parameters, queried once via `sysconf`, that are needed to
    /// convert raw process statistics into bytes and seconds.
    #[derive(Clone, Debug)]
    pub(super) struct System {
        /// Value of `_SC_PAGESIZE`; multiplied by the resident page count to
        /// obtain resident memory in bytes.
        page_size: u64,
        /// Milliseconds per clock tick, derived from `_SC_CLK_TCK` using
        /// integer division.
        ms_per_tick: u64,
    }
173
174    impl System {
175        pub fn load() -> std::io::Result<Self> {
176            let page_size = page_size()?;
177            let ms_per_tick = ms_per_tick()?;
178            Ok(Self {
179                page_size,
180                ms_per_tick,
181            })
182        }
183    }
184
185    impl Collector for System {
186        fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> std::fmt::Result {
187            let stat = match blocking_stat() {
188                Ok(stat) => stat,
189                Err(error) => {
190                    tracing::warn!(%error, "Failed to read process stats");
191                    return Ok(());
192                }
193            };
194
195            let clock_ticks = stat.utime + stat.stime;
196            let cpu = ConstCounter::new(
197                Duration::from_millis(clock_ticks * self.ms_per_tick).as_secs_f64(),
198            );
199            let cpue = encoder.encode_descriptor(
200                "cpu",
201                "Total user and system CPU time spent in seconds",
202                Some(&Unit::Seconds),
203                MetricType::Counter,
204            )?;
205            cpu.encode(cpue)?;
206
207            let vm_bytes = ConstGauge::new(stat.vsize as i64);
208            let vme = encoder.encode_descriptor(
209                "virtual_memory",
210                "Virtual memory size in bytes",
211                Some(&Unit::Bytes),
212                MetricType::Gauge,
213            )?;
214            vm_bytes.encode(vme)?;
215
216            let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64);
217            let rsse = encoder.encode_descriptor(
218                "resident_memory",
219                "Resident memory size in bytes",
220                Some(&Unit::Bytes),
221                MetricType::Gauge,
222            )?;
223            rss_bytes.encode(rsse)?;
224
225            match open_fds(stat.pid) {
226                Ok(open_fds) => {
227                    let fds = ConstGauge::new(open_fds as i64);
228                    let fdse = encoder.encode_descriptor(
229                        "open_fds",
230                        "Number of open file descriptors",
231                        None,
232                        MetricType::Gauge,
233                    )?;
234                    fds.encode(fdse)?;
235                }
236                Err(error) => {
237                    tracing::warn!(%error, "Could not determine open fds");
238                }
239            }
240
241            match max_fds() {
242                Ok(max_fds) => {
243                    let fds = ConstGauge::new(max_fds as i64);
244                    let fdse = encoder.encode_descriptor(
245                        "max_fds",
246                        "Maximum number of open file descriptors",
247                        None,
248                        MetricType::Gauge,
249                    )?;
250                    fds.encode(fdse)?;
251                }
252                Err(error) => {
253                    tracing::warn!(%error, "Could not determine max fds");
254                }
255            }
256
257            let threads = ConstGauge::new(stat.num_threads);
258            let te = encoder.encode_descriptor(
259                "threads",
260                "Number of OS threads in the process.",
261                None,
262                MetricType::Gauge,
263            )?;
264            threads.encode(te)?;
265
266            Ok(())
267        }
268    }
269
270    fn page_size() -> io::Result<u64> {
271        sysconf(libc::_SC_PAGESIZE, "page size")
272    }
273
274    fn ms_per_tick() -> io::Result<u64> {
275        // On Linux, CLK_TCK is ~always `100`, so pure integer division
276        // works. This is probably not suitable if we encounter other
277        // values.
278        let clock_ticks_per_sec = sysconf(libc::_SC_CLK_TCK, "clock ticks per second")?;
279        let ms_per_tick = 1_000 / clock_ticks_per_sec;
280        if clock_ticks_per_sec != 100 {
281            warn!(
282                clock_ticks_per_sec,
283                ms_per_tick, "Unexpected value; process_cpu_seconds_total may be inaccurate."
284            );
285        }
286        Ok(ms_per_tick)
287    }
288
289    fn blocking_stat() -> ProcResult<Stat> {
290        Process::myself()?.stat()
291    }
292
293    fn open_fds(pid: pid_t) -> io::Result<u64> {
294        let mut open = 0;
295        for f in fs::read_dir(format!("/proc/{}/fd", pid))? {
296            if !f?.file_type()?.is_dir() {
297                open += 1;
298            }
299        }
300        Ok(open)
301    }
302
303    fn max_fds() -> ProcResult<u64> {
304        let limits = Process::myself()?.limits()?.max_open_files;
305        match limits.soft_limit {
306            LimitValue::Unlimited => match limits.hard_limit {
307                LimitValue::Unlimited => Ok(0),
308                LimitValue::Value(hard) => Ok(hard),
309            },
310            LimitValue::Value(soft) => Ok(soft),
311        }
312    }
313
314    #[allow(unsafe_code)]
315    fn sysconf(num: libc::c_int, name: &'static str) -> Result<u64, io::Error> {
316        match unsafe { libc::sysconf(num) } {
317            e if e <= 0 => {
318                let error = io::Error::last_os_error();
319                error!("error getting {}: {:?}", name, error);
320                Err(error)
321            }
322            val => Ok(val as u64),
323        }
324    }
325}