1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
use pyroscope::{
    backend::{
        Backend, BackendImpl, BackendUninitialized, Report, Rule, Ruleset, StackBuffer, StackFrame,
        StackTrace,
    },
    error::{PyroscopeError, Result},
};
use rbspy::sampler::Sampler;
use std::{
    ops::Deref,
    sync::{
        mpsc::{channel, sync_channel, Receiver, Sender, SyncSender},
        Arc, Mutex,
    },
    thread::JoinHandle,
};

/// Short-hand function for creating a new Rbspy backend.
pub fn rbspy_backend(config: RbspyConfig) -> BackendImpl<BackendUninitialized> {
    BackendImpl::new(Box::new(Rbspy::new(config)))
}

/// Rbspy Configuration
#[derive(Debug)]
pub struct RbspyConfig {
    /// Process to monitor
    pid: Option<i32>,
    /// Sampling rate
    sample_rate: u32,
    /// Lock Process while sampling
    lock_process: bool,
    /// Profiling duration. None for infinite.
    time_limit: Option<core::time::Duration>,
    /// Include subprocesses
    with_subprocesses: bool,
}

impl Default for RbspyConfig {
    fn default() -> Self {
        RbspyConfig {
            pid: None,
            sample_rate: 100,
            lock_process: false,
            time_limit: None,
            with_subprocesses: false,
        }
    }
}

impl RbspyConfig {
    /// Create a new RbspyConfig
    pub fn new(pid: i32) -> Self {
        RbspyConfig {
            pid: Some(pid),
            ..Default::default()
        }
    }

    /// Set the sampling rate
    pub fn sample_rate(self, sample_rate: u32) -> Self {
        RbspyConfig {
            sample_rate,
            ..self
        }
    }

    /// Set the lock process flag
    pub fn lock_process(self, lock_process: bool) -> Self {
        RbspyConfig {
            lock_process,
            ..self
        }
    }

    /// Set the time limit
    pub fn time_limit(self, time_limit: Option<core::time::Duration>) -> Self {
        RbspyConfig { time_limit, ..self }
    }

    /// Include subprocesses
    pub fn with_subprocesses(self, with_subprocesses: bool) -> Self {
        RbspyConfig {
            with_subprocesses,
            ..self
        }
    }
}

/// Rbspy Backend
#[derive(Default)]
pub struct Rbspy {
    /// Rbspy Configuration
    config: RbspyConfig,
    /// Rbspy Sampler
    sampler: Option<Sampler>,
    /// StackTrace Receiver
    //stack_receiver: Option<Receiver<rbspy::StackTrace>>,
    /// Error Receiver
    error_receiver: Option<Receiver<std::result::Result<(), anyhow::Error>>>,
    /// Profiling buffer
    buffer: Arc<Mutex<StackBuffer>>,
    /// Rulset
    ruleset: Ruleset,
}

impl std::fmt::Debug for Rbspy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Rbspy Backend")
    }
}

impl Rbspy {
    /// Create a new Rbspy instance
    pub fn new(config: RbspyConfig) -> Self {
        Rbspy {
            sampler: None,
            //stack_receiver: None,
            error_receiver: None,
            config,
            buffer: Arc::new(Mutex::new(StackBuffer::default())),
            ruleset: Ruleset::default(),
        }
    }
}

// Type aliases
type ErrorSender = Sender<std::result::Result<(), anyhow::Error>>;
type ErrorReceiver = Receiver<std::result::Result<(), anyhow::Error>>;

impl Backend for Rbspy {
    /// Return the backend name
    fn spy_name(&self) -> Result<String> {
        Ok("rbspy".to_string())
    }

    /// Return the sample rate
    fn sample_rate(&self) -> Result<u32> {
        Ok(self.config.sample_rate)
    }

    /// Add a rule to the ruleset
    fn add_rule(&self, rule: Rule) -> Result<()> {
        self.ruleset.add_rule(rule)?;

        Ok(())
    }

    /// Remove a rule from the ruleset
    fn remove_rule(&self, rule: Rule) -> Result<()> {
        self.ruleset.remove_rule(rule)?;

        Ok(())
    }

    /// Initialize the backend
    fn initialize(&mut self) -> Result<()> {
        // Check if a process ID is set
        if self.config.pid.is_none() {
            return Err(PyroscopeError::new("Rbspy: No Process ID Specified"));
        }

        // Create Sampler
        self.sampler = Some(Sampler::new(
            self.config.pid.unwrap(), // unwrap is safe because of check above
            self.config.sample_rate,
            self.config.lock_process,
            self.config.time_limit,
            self.config.with_subprocesses,
            None,
        ));

        // Channel for Errors generated by the RubySpy Sampler
        let (error_sender, error_receiver): (ErrorSender, ErrorReceiver) = channel();

        // This is provides enough space for 100 threads.
        // It might be a better idea to figure out how many threads are running and determine the
        // size of the channel based on that.
        let queue_size: usize = self.config.sample_rate as usize * 10 * 100;

        // Channel for StackTraces generated by the RubySpy Sampler
        let (stack_sender, stack_receiver): (
            SyncSender<rbspy::StackTrace>,
            Receiver<rbspy::StackTrace>,
        ) = sync_channel(queue_size);

        // Set Error and Stack Receivers
        //self.stack_receiver = Some(stack_receiver);
        self.error_receiver = Some(error_receiver);

        // Get the Sampler
        let sampler = self
            .sampler
            .as_ref()
            .ok_or_else(|| PyroscopeError::new("Rbspy: Sampler is not set"))?;

        // Start the Sampler
        sampler
            .start(stack_sender, error_sender)
            .map_err(|e| PyroscopeError::new(&format!("Rbspy: Sampler Error: {}", e)))?;

        // Start own thread
        //
        // Get an Arc reference to the Report Buffer
        let buffer = self.buffer.clone();

        // ruleset reference
        let ruleset = self.ruleset.clone();

        let _: JoinHandle<Result<()>> = std::thread::spawn(move || {
            // Iterate over the StackTrace
            while let Ok(stack_trace) = stack_receiver.recv() {
                // convert StackTrace
                let own_trace: StackTrace = Into::<StackTraceWrapper>::into(stack_trace).into();

                let stacktrace = own_trace + &ruleset;

                buffer.lock()?.record(stacktrace)?;
            }

            Ok(())
        });

        Ok(())
    }

    fn shutdown(self: Box<Self>) -> Result<()> {
        // Stop Sampler
        self.sampler
            .as_ref()
            .ok_or_else(|| PyroscopeError::new("Rbspy: Sampler is not set"))?
            .stop();

        Ok(())
    }

    fn report(&mut self) -> Result<Vec<Report>> {
        // Get an Arc reference to the Report Buffer
        let buffer = self.buffer.clone();

        let v8: StackBuffer = buffer.lock()?.deref().to_owned();
        let reports: Vec<Report> = v8.into();

        buffer.lock()?.clear();

        // Return the writer's buffer
        Ok(reports)
    }
}

struct StackFrameWrapper(StackFrame);

impl From<StackFrameWrapper> for StackFrame {
    fn from(frame: StackFrameWrapper) -> Self {
        frame.0
    }
}

impl From<rbspy::StackFrame> for StackFrameWrapper {
    fn from(frame: rbspy::StackFrame) -> Self {
        StackFrameWrapper(StackFrame {
            module: None,
            name: Some(frame.name),
            filename: Some(frame.relative_path.clone()),
            relative_path: Some(frame.relative_path),
            absolute_path: frame.absolute_path,
            line: Some(frame.lineno),
        })
    }
}

struct StackTraceWrapper(StackTrace);

impl From<StackTraceWrapper> for StackTrace {
    fn from(trace: StackTraceWrapper) -> Self {
        trace.0
    }
}

impl From<rbspy::StackTrace> for StackTraceWrapper {
    fn from(trace: rbspy::StackTrace) -> Self {
        StackTraceWrapper(StackTrace::new(
            trace.pid.map(|pid| pid as u32),
            trace.thread_id.map(|id| id as u64),
            None,
            trace
                .iter()
                .map(|frame| Into::<StackFrameWrapper>::into(frame.clone()).into())
                .collect(),
        ))
    }
}