use crate::ToWasmtimeResult as _;
use crate::prelude::*;
use crate::profiling_agent::ProfilingAgent;
use crate::vm::Interpreter;
use pulley_interpreter::profile::{ExecutingPc, Recorder, Samples};
use std::mem;
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
struct PulleyAgent {
state: Arc<State>,
sampling_thread: Option<JoinHandle<()>>,
recording_thread: Option<JoinHandle<()>>,
}
struct State {
recorder: Mutex<Recorder>,
sampling: Mutex<SamplingState>,
sampling_done: Condvar,
sampling_freq: u32,
sampling_flush_amt: u32,
}
#[derive(Default)]
struct SamplingState {
interpreters: Vec<ExecutingPc>,
samples: Samples,
}
pub fn new() -> Result<Box<dyn ProfilingAgent>> {
let pid = std::process::id();
let filename = format!("./pulley-{pid}.data");
let mut agent = PulleyAgent {
state: Arc::new(State {
recorder: Mutex::new(Recorder::new(&filename).to_wasmtime_result()?),
sampling: Default::default(),
sampling_done: Condvar::new(),
sampling_freq: std::env::var("PULLEY_SAMPLING_FREQ")
.ok()
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(1_000),
sampling_flush_amt: std::env::var("PULLEY_SAMPLING_FLUSH_AMT")
.ok()
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(20_000),
}),
sampling_thread: None,
recording_thread: None,
};
let (tx, rx) = mpsc::channel();
let state = agent.state.clone();
agent.sampling_thread = Some(thread::spawn(move || sampling_thread(&state, tx)));
let state = agent.state.clone();
agent.recording_thread = Some(thread::spawn(move || recording_thread(&state, rx)));
Ok(Box::new(agent))
}
impl ProfilingAgent for PulleyAgent {
fn register_function(&self, name: &str, code: &[u8]) {
self.state
.recorder
.lock()
.unwrap()
.add_function(name, code)
.expect("failed to register pulley function");
}
fn register_interpreter(&self, interpreter: &Interpreter) {
let pc = interpreter.pulley().executing_pc();
self.state
.sampling
.lock()
.unwrap()
.interpreters
.push(pc.clone());
}
}
fn sampling_thread(state: &State, to_record: mpsc::Sender<Samples>) {
let between_ticks = Duration::new(0, 1_000_000_000 / state.sampling_freq);
let start = Instant::now();
let mut next_sample = start + between_ticks;
let record = |sampling: &mut SamplingState| {
if sampling.samples.num_samples() == 0 {
return;
}
let samples = mem::take(&mut sampling.samples);
to_record.send(samples).unwrap();
};
let mut sampling = state.sampling.lock().unwrap();
loop {
let dur = next_sample
.checked_duration_since(Instant::now())
.unwrap_or(Duration::new(0, 0));
let (guard, result) = state.sampling_done.wait_timeout(sampling, dur).unwrap();
sampling = guard;
if !result.timed_out() {
break;
}
next_sample += between_ticks;
let SamplingState {
interpreters,
samples,
} = &mut *sampling;
interpreters.retain(|a| !a.is_done());
for interpreter in interpreters.iter() {
if let Some(pc) = interpreter.get() {
samples.append(pc);
}
}
if samples.num_samples() > state.sampling_flush_amt {
record(&mut sampling);
}
}
record(&mut sampling);
}
fn recording_thread(state: &State, to_record: mpsc::Receiver<Samples>) {
for mut samples in to_record {
state
.recorder
.lock()
.unwrap()
.add_samples(&mut samples)
.expect("failed to write samples");
}
state.recorder.lock().unwrap().flush().unwrap();
}
impl Drop for PulleyAgent {
fn drop(&mut self) {
self.state.sampling_done.notify_one();
self.sampling_thread.take().unwrap().join().unwrap();
self.recording_thread.take().unwrap().join().unwrap();
}
}