kubert_prometheus_process/
lib.rs1#![deny(
34 rust_2018_idioms,
35 clippy::disallowed_methods,
36 unsafe_code,
37 missing_docs
38)]
39#![cfg_attr(docsrs, feature(doc_cfg))]
40
41use prometheus_client::{
42 collector::Collector,
43 encoding::{DescriptorEncoder, EncodeMetric},
44 metrics::{
45 counter::ConstCounter,
46 gauge::{self, ConstGauge, Gauge},
47 MetricType,
48 },
49 registry::{Registry, Unit},
50};
51use std::time::{Instant, SystemTime, UNIX_EPOCH};
52
53pub fn register(reg: &mut Registry) -> std::io::Result<()> {
56 let start_time = Instant::now();
57 let start_time_from_epoch = SystemTime::now()
58 .duration_since(UNIX_EPOCH)
59 .expect("process start time");
60
61 #[cfg(target_os = "linux")]
62 let system = linux::System::load()?;
63
64 reg.register_with_unit(
65 "start_time",
66 "Time that the process started (in seconds since the UNIX epoch)",
67 Unit::Seconds,
68 ConstGauge::new(start_time_from_epoch.as_secs_f64()),
69 );
70
71 let clock_time_ts = Gauge::<f64, ClockMetric>::default();
72 reg.register_with_unit(
73 "clock_time",
74 "Current system time for this proxy",
75 Unit::Seconds,
76 clock_time_ts,
77 );
78
79 reg.register_collector(Box::new(ProcessCollector {
80 start_time,
81 #[cfg(target_os = "linux")]
82 system,
83 }));
84
85 Ok(())
86}
87
88#[derive(Debug)]
89struct ProcessCollector {
90 start_time: Instant,
91 #[cfg(target_os = "linux")]
92 system: linux::System,
93}
94
95impl Collector for ProcessCollector {
96 fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> std::fmt::Result {
97 let uptime = ConstCounter::new(
98 Instant::now()
99 .saturating_duration_since(self.start_time)
100 .as_secs_f64(),
101 );
102 let ue = encoder.encode_descriptor(
103 "uptime",
104 "Total time since the process started (in seconds)",
105 Some(&Unit::Seconds),
106 MetricType::Counter,
107 )?;
108 uptime.encode(ue)?;
109
110 #[cfg(target_os = "linux")]
111 self.system.encode(encoder)?;
112
113 Ok(())
114 }
115}
116
117#[derive(Copy, Clone, Debug, Default)]
119struct ClockMetric;
120
121impl gauge::Atomic<f64> for ClockMetric {
122 fn inc(&self) -> f64 {
123 self.get()
124 }
125
126 fn inc_by(&self, _v: f64) -> f64 {
127 self.get()
128 }
129
130 fn dec(&self) -> f64 {
131 self.get()
132 }
133
134 fn dec_by(&self, _v: f64) -> f64 {
135 self.get()
136 }
137
138 fn set(&self, _v: f64) -> f64 {
139 self.get()
140 }
141
142 fn get(&self) -> f64 {
143 match SystemTime::now().duration_since(UNIX_EPOCH) {
144 Ok(elapsed) => elapsed.as_secs_f64().floor(),
145 Err(e) => {
146 tracing::warn!(
147 "System time is before the UNIX epoch; reporting negative timestamp"
148 );
149 -e.duration().as_secs_f64().floor()
150 }
151 }
152 }
153}
154
155#[cfg(target_os = "linux")]
156mod linux {
157 use super::*;
158 use libc::{self, pid_t};
159 use process::Stat;
160 use procfs::{
161 process::{self, LimitValue, Process},
162 ProcResult,
163 };
164 use std::time::Duration;
165 use std::{fs, io};
166 use tracing::{error, warn};
167
168 #[derive(Clone, Debug)]
169 pub(super) struct System {
170 page_size: u64,
171 ms_per_tick: u64,
172 }
173
174 impl System {
175 pub fn load() -> std::io::Result<Self> {
176 let page_size = page_size()?;
177 let ms_per_tick = ms_per_tick()?;
178 Ok(Self {
179 page_size,
180 ms_per_tick,
181 })
182 }
183 }
184
185 impl Collector for System {
186 fn encode(&self, mut encoder: DescriptorEncoder<'_>) -> std::fmt::Result {
187 let stat = match blocking_stat() {
188 Ok(stat) => stat,
189 Err(error) => {
190 tracing::warn!(%error, "Failed to read process stats");
191 return Ok(());
192 }
193 };
194
195 let clock_ticks = stat.utime + stat.stime;
196 let cpu = ConstCounter::new(
197 Duration::from_millis(clock_ticks * self.ms_per_tick).as_secs_f64(),
198 );
199 let cpue = encoder.encode_descriptor(
200 "cpu",
201 "Total user and system CPU time spent in seconds",
202 Some(&Unit::Seconds),
203 MetricType::Counter,
204 )?;
205 cpu.encode(cpue)?;
206
207 let vm_bytes = ConstGauge::new(stat.vsize as i64);
208 let vme = encoder.encode_descriptor(
209 "virtual_memory",
210 "Virtual memory size in bytes",
211 Some(&Unit::Bytes),
212 MetricType::Gauge,
213 )?;
214 vm_bytes.encode(vme)?;
215
216 let rss_bytes = ConstGauge::new((stat.rss * self.page_size) as i64);
217 let rsse = encoder.encode_descriptor(
218 "resident_memory",
219 "Resident memory size in bytes",
220 Some(&Unit::Bytes),
221 MetricType::Gauge,
222 )?;
223 rss_bytes.encode(rsse)?;
224
225 match open_fds(stat.pid) {
226 Ok(open_fds) => {
227 let fds = ConstGauge::new(open_fds as i64);
228 let fdse = encoder.encode_descriptor(
229 "open_fds",
230 "Number of open file descriptors",
231 None,
232 MetricType::Gauge,
233 )?;
234 fds.encode(fdse)?;
235 }
236 Err(error) => {
237 tracing::warn!(%error, "Could not determine open fds");
238 }
239 }
240
241 match max_fds() {
242 Ok(max_fds) => {
243 let fds = ConstGauge::new(max_fds as i64);
244 let fdse = encoder.encode_descriptor(
245 "max_fds",
246 "Maximum number of open file descriptors",
247 None,
248 MetricType::Gauge,
249 )?;
250 fds.encode(fdse)?;
251 }
252 Err(error) => {
253 tracing::warn!(%error, "Could not determine max fds");
254 }
255 }
256
257 let threads = ConstGauge::new(stat.num_threads);
258 let te = encoder.encode_descriptor(
259 "threads",
260 "Number of OS threads in the process.",
261 None,
262 MetricType::Gauge,
263 )?;
264 threads.encode(te)?;
265
266 Ok(())
267 }
268 }
269
270 fn page_size() -> io::Result<u64> {
271 sysconf(libc::_SC_PAGESIZE, "page size")
272 }
273
274 fn ms_per_tick() -> io::Result<u64> {
275 let clock_ticks_per_sec = sysconf(libc::_SC_CLK_TCK, "clock ticks per second")?;
279 let ms_per_tick = 1_000 / clock_ticks_per_sec;
280 if clock_ticks_per_sec != 100 {
281 warn!(
282 clock_ticks_per_sec,
283 ms_per_tick, "Unexpected value; process_cpu_seconds_total may be inaccurate."
284 );
285 }
286 Ok(ms_per_tick)
287 }
288
289 fn blocking_stat() -> ProcResult<Stat> {
290 Process::myself()?.stat()
291 }
292
293 fn open_fds(pid: pid_t) -> io::Result<u64> {
294 let mut open = 0;
295 for f in fs::read_dir(format!("/proc/{}/fd", pid))? {
296 if !f?.file_type()?.is_dir() {
297 open += 1;
298 }
299 }
300 Ok(open)
301 }
302
303 fn max_fds() -> ProcResult<u64> {
304 let limits = Process::myself()?.limits()?.max_open_files;
305 match limits.soft_limit {
306 LimitValue::Unlimited => match limits.hard_limit {
307 LimitValue::Unlimited => Ok(0),
308 LimitValue::Value(hard) => Ok(hard),
309 },
310 LimitValue::Value(soft) => Ok(soft),
311 }
312 }
313
314 #[allow(unsafe_code)]
315 fn sysconf(num: libc::c_int, name: &'static str) -> Result<u64, io::Error> {
316 match unsafe { libc::sysconf(num) } {
317 e if e <= 0 => {
318 let error = io::Error::last_os_error();
319 error!("error getting {}: {:?}", name, error);
320 Err(error)
321 }
322 val => Ok(val as u64),
323 }
324 }
325}