pyroscope_rbspy_oncpu/recorder/record.rs

1use anyhow::{Context, Error, Result};
2use std::fs::File;
3use std::path::PathBuf;
4use std::sync::{Arc, Mutex};
5
6use crate::storage::Store;
7use crate::ui::summary;
8
9/// A configuration bundle for the recorder
10pub struct Config {
11    /// The format to use for recorded traces. See `OutputFormat` for a list of available options.
12    pub format: crate::core::types::OutputFormat,
13    /// Where to write rbspy's raw trace output, which can be used for later processing.
14    pub raw_path: Option<PathBuf>,
15    /// Where to write rbspy's output. If `-` is given, output is written to standard output.
16    pub out_path: Option<PathBuf>,
17    /// The process ID (PID) of the process to profile. This is usually a ruby process, but rbspy
18    /// will locate and profile any ruby subprocesses of the target process if `with_subprocesses`
19    /// is enabled.
20    pub pid: crate::core::process::Pid,
21    /// Whether to profile the target process (given by `pid`) as well as its child processes, and
22    /// their child processes, and so on. Default: `false`.
23    pub with_subprocesses: bool,
24    /// The number of traces that should be collected each second. Default: `100`.
25    pub sample_rate: u32,
26    /// The length of time that the recorder should run before stopping. Default: none (run until
27    /// interrupted).
28    pub maybe_duration: Option<std::time::Duration>,
29    /// Minimum flame width. Applies to flamegraph output only. If your sample has many small
30    /// functions in it and is difficult to read, then consider increasing this value.
31    /// Default: 0.1.
32    pub flame_min_width: f64,
33    /// Locks the process when a sample is being taken.
34    ///
35    /// You should enable this option for the most accurate samples. However, it briefly
36    /// stops the process from executing and can affect performance. The performance impact
37    /// is most noticeable in CPU-bound ruby programs or when a high sampling rate is used.
38    pub lock_process: bool,
39    /// Forces the recorder to use the given Ruby version. If not given, rbspy will attempt to
40    /// determine the Ruby version from the running process.
41    ///
42    /// This option shouldn't be needed unless you're testing a pre-release Ruby version.
43    pub force_version: Option<String>,
44    pub on_cpu: bool,
45}
46
47pub struct Recorder {
48    format: crate::core::types::OutputFormat,
49    flame_min_width: f64,
50    out_path: Option<PathBuf>,
51    raw_path: Option<PathBuf>,
52    sample_rate: u32,
53    sampler: crate::sampler::Sampler,
54    summary: Arc<Mutex<summary::Stats>>,
55}
56
57impl Recorder {
58    pub fn new(config: Config) -> Self {
59        let sampler = crate::sampler::Sampler::new(
60            config.pid,
61            config.sample_rate,
62            config.lock_process,
63            config.maybe_duration,
64            config.with_subprocesses,
65            config.force_version,
66            config.on_cpu,
67        );
68
69        Recorder {
70            format: config.format,
71            flame_min_width: config.flame_min_width,
72            out_path: config.out_path,
73            raw_path: config.raw_path,
74            sample_rate: config.sample_rate,
75            sampler,
76            summary: Arc::new(Mutex::new(summary::Stats::new())),
77        }
78    }
79
80    /// Records traces until the process exits or the stop function is called
81    pub fn record(&self) -> Result<(), Error> {
82        // Create the sender/receiver channels and start the child threads off collecting stack traces
83        // from each target process.
84        // Give the child threads a buffer in case we fall a little behind with aggregating the stack
85        // traces, but not an unbounded buffer.
86        let (trace_sender, trace_receiver) = std::sync::mpsc::sync_channel(100);
87        let (result_sender, result_receiver) = std::sync::mpsc::channel();
88        self.sampler.start(trace_sender, result_sender)?;
89
90        // Aggregate stack traces as we receive them from the threads that are collecting them
91        // Aggregate to 3 places: the raw output (`.raw.gz`), some summary statistics we display live,
92        // and the formatted output (a flamegraph or something)
93        let mut out = None;
94        if self.out_path.is_some() {
95            out = Some(self.format.clone().outputter(self.flame_min_width));
96        }
97        let mut raw_store = None;
98        if let Some(raw_path) = &self.raw_path {
99            raw_store = Some(Store::new(&raw_path, self.sample_rate)?);
100        }
101
102        for trace in trace_receiver {
103            if let Some(out) = &mut out {
104                out.record(&trace)?;
105            }
106            if let Some(raw_store) = &mut raw_store {
107                raw_store.write(&trace)?;
108            }
109
110            let mut summary = self.summary.lock().unwrap();
111            summary.add_function_name(&trace.trace);
112        }
113
114        // Finish writing all data to disk
115        if let (Some(out), Some(out_path)) = (&mut out, self.out_path.as_ref()) {
116            if out_path.display().to_string() == "-" {
117                out.complete(&mut std::io::stdout())?;
118            } else {
119                let mut out_file = File::create(&out_path).context(format!(
120                    "Failed to create output file {}",
121                    &out_path.display()
122                ))?;
123                out.complete(&mut out_file)?;
124            }
125        }
126        if let Some(raw_store) = raw_store {
127            raw_store.complete();
128        }
129
130        // Check for errors from the child threads. Ignore errors unless every single thread
131        // returned an error. If that happens, return the last error. This lets rbspy successfully
132        // record processes even if the parent thread isn't a Ruby process.
133        let mut num_ok = 0;
134        let mut last_result = Ok(());
135        for result in result_receiver {
136            if result.is_ok() {
137                num_ok += 1;
138            }
139            last_result = result;
140        }
141
142        match num_ok {
143            0 => last_result,
144            _ => Ok(()),
145        }
146    }
147
148    /// Stops the recorder
149    pub fn stop(&self) {
150        self.sampler.stop();
151    }
152
153    /// Writes a summary of collected traces
154    pub fn write_summary(&self, w: &mut dyn std::io::Write) -> Result<(), Error> {
155        let width = match term_size::dimensions() {
156            Some((w, _)) => Some(w as usize),
157            None => None,
158        };
159        let timing_error_traces = self.sampler.timing_error_traces();
160        let total_traces = self.sampler.total_traces();
161        let percent_timing_error = (timing_error_traces as f64) / (total_traces as f64) * 100.0;
162
163        let summary = self.summary.lock().unwrap();
164        writeln!(
165            w,
166            "Time since start: {}s. Press Ctrl+C to stop.",
167            summary.elapsed_time().as_secs()
168        )?;
169
170        writeln!(w, "Summary of profiling data so far:")?;
171        summary.write_top_n(w, 20, width)?;
172
173        if total_traces > 100 && percent_timing_error > 0.5 {
174            // Only include this warning if timing errors are more than 0.5% of total traces. rbspy
175            // is a statistical profiler, so smaller differences don't really matter.
176            writeln!(w, "{:.1}% ({}/{}) of stack traces were sampled late because we couldn't sample at expected rate; results may be inaccurate. Current rate: {}. Try sampling at a lower rate with `--rate`.", percent_timing_error, timing_error_traces, total_traces, self.sample_rate)?;
177        }
178        Ok(())
179    }
180}
181
182impl Drop for Recorder {
183    fn drop(&mut self) {
184        self.stop();
185    }
186}