readings_probe/
lib.rs

1//! # Instrumentation Probe for [Readings](http://github.com/kali/readings)
2//!
3//!
//! The goal of Readings is to make instrumenting a process's vital metrics as
//! easy as possible.
6//!
7//! This is the instrumentation library that must be embedded in the client
8//! code.
9//!
//! Please refer to the [Readings](http://github.com/kali/readings) project page for more information.
11//!
12//!
13//! ```rust
14//! // this is optional but the cost may be worth it. YMMV. It instruments
15//! // Rust global allocator.
16//! readings_probe::instrumented_allocator!();
17//!
18//! fn main() -> readings_probe::ReadingsResult<()> {
19//!     // setup the probe
20//!     let mut probe =
21//!         readings_probe::Probe::new(std::fs::File::create("readings.out").unwrap()).unwrap();
22//!
//!     // We will use an AtomicI64 to communicate a user-defined metric ("progress") to the probe.
24//!     let progress = probe.register_i64("progress".to_string())?;
25//!
//!     // Start the probe (1 second is a lot: the heartbeat can realistically be set as low as a few milliseconds).
27//!     probe.spawn_heartbeat(std::time::Duration::from_millis(1000))?;
28//!
29//!     // do some stuff, update progress
30//!     let percent_done = 12;
31//!     progress.store(percent_done, std::sync::atomic::Ordering::Relaxed);
32//!
33//!     // do more stuff, log an event
34//!     probe.log_event("about to get crazy")?;
35//!
36//!     // still more stuff, and another event
37//!     probe.log_event("done")?;
38//!     Ok(())
39//! }
40//! ```
41
42extern crate lazy_static;
43
44/// allocator instrumentation
45pub mod alloc;
/// global default probe instance and associated macros
47pub mod global;
48
49use std::io::Write;
50use std::sync::atomic::AtomicI64;
51use std::sync::atomic::Ordering::Relaxed;
52use std::sync::Arc;
53use std::time::Duration;
54use std::{io, sync, time};
55
56use thiserror::Error;
57
58/// Reading error enumeration.
59#[derive(Error, Debug)]
60pub enum ReadingsError {
61    #[error("Metrics can only be added before first event")]
62    LateRegistertingMetricsAttempt,
63    #[error("io Error accessing /proc/self/stat")]
64    ProcStat(io::Error),
65    #[error("Io error writing readings")]
66    Io(#[from] io::Error),
67    #[error("Poisoned probe")]
68    PoisonedProbe,
69}
70
/// Generic `Result` alias whose error type is [`ReadingsError`].
72pub type ReadingsResult<T> = Result<T, ReadingsError>;
73
74#[cfg(any(target_os = "linux", target_os = "android"))]
75mod linux;
76
77#[cfg(any(target_os = "macos", target_os = "ios"))]
78mod macos;
79
80#[cfg(target_os = "windows")]
81mod windows;
82
83/// Returns metrics from the operating system interface.
84///
85/// Beware, not all operating systems are made equal.
86#[allow(unreachable_code)]
87pub fn get_os_readings() -> ReadingsResult<OsReadings> {
88    #[cfg(any(target_os = "linux", target_os = "android"))]
89    return linux::get_os_readings();
90    #[cfg(any(target_os = "macos", target_os = "ios"))]
91    return macos::get_os_readings();
92    #[cfg(target_os = "windows")]
93    return windows::get_os_readings();
94    return unsafe { Ok(std::mem::zeroed()) };
95}
96
/// Snapshot of the process metrics obtained from the operating system.
///
/// The `Default` value has every counter at zero and both CPU times at a
/// zero duration.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OsReadings {
    /// Process virtual size
    pub virtual_size: u64,
    /// Process resident size
    pub resident_size: u64,
    /// Process resident size high-water mark
    pub resident_size_max: u64,
    /// CPU Time in userland (in s)
    pub user_time: Duration,
    /// CPU Time in kernel (in s)
    pub system_time: Duration,
    /// Minor faults counter
    pub minor_fault: u64,
    /// Major faults counter
    pub major_fault: u64,
}
114
115/// The interface to reading probe.
116#[derive(Clone)]
117pub struct Probe(sync::Arc<sync::Mutex<ProbeData>>);
118
/// State behind a `Probe` handle, only ever accessed with the probe mutex
/// held.
struct ProbeData {
    /// Value printed in the `cor` column of every line.
    cores: usize,
    /// Timestamp of the first line written, `None` until then. The `time`
    /// column is relative to it, and metrics can only be registered while it
    /// is still `None`.
    origin: Option<time::Instant>,
    /// Buffered sink receiving the readings.
    writer: io::BufWriter<Box<dyn io::Write + Send>>,
    /// User-defined metrics as (column name, shared value) pairs, in
    /// registration order.
    metrics_i64: Vec<(String, Arc<AtomicI64>)>,
}
125
126impl ProbeData {
127    fn write_line(&mut self, now: time::Instant, reason: &str) -> ReadingsResult<()> {
128        if self.origin.is_none() {
129            write!(self.writer, "   time cor        vsz        rsz     rszmax")?;
130            write!(self.writer, "    utime    stime       minf       majf")?;
131            write!(self.writer, "      alloc       free")?;
132            for m in &self.metrics_i64 {
133                write!(self.writer, " {:>10}", m.0)?;
134            }
135            writeln!(self.writer, " event")?;
136            self.origin = Some(now)
137        }
138        let usage = get_os_readings()?;
139        write!(
140            self.writer,
141            "{:7.3} {:3}",
142            (now - self.origin.unwrap()).as_secs_f32(),
143            self.cores
144        )?;
145        write!(
146            self.writer,
147            " {:10} {:10} {:10}",
148            usage.virtual_size, usage.resident_size, usage.resident_size_max
149        )?;
150        write!(
151            self.writer,
152            " {:8.6} {:8.6} {:10} {:10}",
153            usage.user_time.as_secs_f64(),
154            usage.system_time.as_secs_f64(),
155            usage.minor_fault,
156            usage.major_fault
157        )?;
158        write!(
159            self.writer,
160            " {:10} {:10}",
161            alloc::ALLOCATED.load(Relaxed),
162            alloc::FREEED.load(Relaxed)
163        )?;
164        for m in &self.metrics_i64 {
165            write!(self.writer, " {:10}", m.1.load(Relaxed),)?;
166        }
167        writeln!(self.writer, " {}", reason)?;
168        self.writer.flush()?;
169        Ok(())
170    }
171}
172
173impl Probe {
174    /// Creates a probe logging its data to Write implementation (usually a
175    /// file).
176    pub fn new<W: Write + Send + 'static>(write: W) -> ReadingsResult<Probe> {
177        let mut writer = io::BufWriter::new(Box::new(write) as _);
178        writeln!(writer, "#ReadingsV1")?;
179        let data = ProbeData {
180            cores: num_cpus::get(),
181            origin: None,
182            writer,
183            metrics_i64: vec![],
184        };
185        Ok(Probe(sync::Arc::new(sync::Mutex::new(data))))
186    }
187
188    /// Register an i64 used-defined metric.
189    ///
190    /// Must be called prior to the first call to `log_event` or `spawn_heartbeat`.
191    ///
192    /// The result is shared AtomicI64 that can be used by client code to share
193    /// communicate updates with the probe.
194    ///
195    /// TODO: type-enforce this using the Builder pattern
196    pub fn register_i64<S: AsRef<str>>(&mut self, name: S) -> ReadingsResult<Arc<AtomicI64>> {
197        let mut m = self.0.lock().map_err(|_| ReadingsError::PoisonedProbe)?;
198        if m.origin.is_some() {
199            return Err(ReadingsError::LateRegistertingMetricsAttempt);
200        }
201        let it = Arc::new(AtomicI64::new(0));
202        m.metrics_i64
203            .push((name.as_ref().replace(" ", "_"), it.clone()));
204        Ok(it)
205    }
206
207    /// Spawn a thread that will record all vitals at every "interval".
208    pub fn spawn_heartbeat(&mut self, interval: time::Duration) -> ReadingsResult<()> {
209        let probe = self.clone();
210        probe.log_event("spawned_heartbeat")?;
211        let origin = probe
212            .0
213            .lock()
214            .map_err(|_| ReadingsError::PoisonedProbe)?
215            .origin
216            .unwrap();
217        std::thread::spawn(move || {
218            for step in 1.. {
219                let wanted = origin + (step * interval);
220                let now = std::time::Instant::now();
221                if wanted > now {
222                    ::std::thread::sleep(wanted - now);
223                }
224                if let Err(e) = probe.log_event("") {
225                    eprintln!("{:?}", e);
226                }
227            }
228        });
229        Ok(())
230    }
231
232    /// Log an individual event with a label and the current values of metrics.
233    pub fn log_event(&self, event: &str) -> ReadingsResult<()> {
234        self.write_line(std::time::Instant::now(), &event.replace(" ", "_"))
235    }
236
237    /// Recover a pre-registered used-defined metrics from the probe.
238    ///
239    /// The result is shared AtomicI64 that can be used by client code to share
240    /// communicate updates with the probe.
241    ///
242    /// It is more efficient for the client code to keep the shared AtomicI64 somewhere
243    /// handy than calling this at every update. Nevertheless, it allows for
244    /// intermediate code to just have to propagate the probe without worrying
245    /// about the various metrics that the underlying code may need.
246    pub fn get_i64<S: AsRef<str>>(&self, name: S) -> Option<Arc<AtomicI64>> {
247        let name = name.as_ref().replace(" ", "_");
248        self.0.lock().ok().and_then(|l| {
249            l.metrics_i64
250                .iter()
251                .find(|m| (m.0 == name))
252                .map(|m| m.1.clone())
253        })
254    }
255
256    fn write_line(&self, now: time::Instant, reason: &str) -> ReadingsResult<()> {
257        self.0
258            .lock()
259            .map_err(|_| ReadingsError::PoisonedProbe)?
260            .write_line(now, reason)
261    }
262}