use crate::{
asprof::{self, AsProfError},
metadata::{AgentMetadata, ReportMetadata},
reporter::{Reporter, local::LocalReporter},
};
use std::{
fs::File,
io,
path::{Path, PathBuf},
time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};
use thiserror::Error;
struct JfrFile {
active: std::fs::File,
inactive: std::fs::File,
}
impl JfrFile {
#[cfg(target_os = "linux")]
fn new() -> Result<Self, io::Error> {
Ok(Self {
active: tempfile::tempfile().unwrap(),
inactive: tempfile::tempfile().unwrap(),
})
}
#[cfg(not(target_os = "linux"))]
fn new() -> Result<Self, io::Error> {
Err(io::Error::other(
"async-profiler is only supported on Linux",
))
}
fn swap(&mut self) {
std::mem::swap(&mut self.active, &mut self.inactive);
}
#[cfg(target_os = "linux")]
fn file_path(file: &std::fs::File) -> PathBuf {
use std::os::fd::AsRawFd;
format!("/proc/self/fd/{}", file.as_raw_fd()).into()
}
#[cfg(not(target_os = "linux"))]
fn file_path(_file: &std::fs::File) -> PathBuf {
unimplemented!()
}
fn active_path(&self) -> PathBuf {
Self::file_path(&self.active)
}
fn inactive_path(&self) -> PathBuf {
Self::file_path(&self.inactive)
}
fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
File::create(Self::file_path(&self.inactive))?;
tracing::debug!(message = "emptied the file");
Ok(())
}
}
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProfilerOptions {
pub native_mem: Option<String>,
cpu_interval: Option<u128>,
wall_clock_millis: Option<u128>,
}
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;
impl ProfilerOptions {
pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
let mut args = format!(
"start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS),
self.wall_clock_millis
.unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS),
jfr_file_path.display()
);
if let Some(ref native_mem) = self.native_mem {
args.push_str(&format!(",nativemem={native_mem}"));
}
args
}
}
#[derive(Debug, Default)]
pub struct ProfilerOptionsBuilder {
native_mem: Option<String>,
cpu_interval: Option<u128>,
wall_clock_millis: Option<u128>,
}
impl ProfilerOptionsBuilder {
pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
self.native_mem = Some(native_mem_interval);
self
}
pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
self.native_mem = Some(native_mem_interval.to_string());
self
}
pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
self.cpu_interval = Some(cpu_interval.as_nanos());
self
}
pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
self.wall_clock_millis = Some(wall_clock.as_millis());
self
}
pub fn build(self) -> ProfilerOptions {
ProfilerOptions {
native_mem: self.native_mem,
wall_clock_millis: self.wall_clock_millis,
cpu_interval: self.cpu_interval,
}
}
}
#[derive(Debug, Default)]
pub struct ProfilerBuilder {
reporting_interval: Option<Duration>,
reporter: Option<Box<dyn Reporter + Send + Sync>>,
agent_metadata: Option<AgentMetadata>,
profiler_options: Option<ProfilerOptions>,
}
impl ProfilerBuilder {
pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
self.reporting_interval = Some(i);
self
}
#[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
#[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
#[cfg_attr(
feature = "s3-no-defaults",
doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
)]
#[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
self.reporter = Some(Box::new(r));
self
}
pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
self.reporter = Some(Box::new(LocalReporter::new(path.into())));
self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
}
#[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
self.agent_metadata = Some(j);
self
}
pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
self.profiler_options = Some(c);
self
}
pub fn build(self) -> Profiler {
Profiler {
reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
reporter: self.reporter.expect("reporter is required"),
agent_metadata: self.agent_metadata,
profiler_options: self.profiler_options.unwrap_or_default(),
}
}
}
enum Status {
Idle,
Starting,
Running(SystemTime),
}
struct ProfilerState<E: ProfilerEngine> {
jfr_file: Option<JfrFile>,
asprof: E,
status: Status,
profiler_options: ProfilerOptions,
}
impl<E: ProfilerEngine> ProfilerState<E> {
fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
Ok(Self {
jfr_file: Some(JfrFile::new()?),
asprof,
status: Status::Idle,
profiler_options,
})
}
fn jfr_file_mut(&mut self) -> &mut JfrFile {
self.jfr_file.as_mut().unwrap()
}
async fn start(&mut self) -> Result<(), AsProfError> {
let active = self.jfr_file.as_ref().unwrap().active_path();
self.status = Status::Starting;
E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
self.status = Status::Running(SystemTime::now());
Ok(())
}
fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
E::stop_async_profiler()?;
let status = std::mem::replace(&mut self.status, Status::Idle);
Ok(match status {
Status::Idle | Status::Starting => None,
Status::Running(since) => Some(since),
})
}
fn is_started(&self) -> bool {
matches!(self.status, Status::Running(_))
}
}
impl<E: ProfilerEngine> Drop for ProfilerState<E> {
fn drop(&mut self) {
match self.status {
Status::Running(_) => {
if let Err(err) = self.stop() {
std::mem::forget(self.jfr_file.take());
tracing::warn!(?err, "unable to stop profiler during drop glue");
}
}
Status::Idle => {}
Status::Starting => {
std::mem::forget(self.jfr_file.take());
}
}
}
}
pub(crate) trait ProfilerEngine: Send + Sync + 'static {
fn init_async_profiler() -> Result<(), asprof::AsProfError>;
fn start_async_profiler(
&self,
jfr_file_path: &Path,
options: &ProfilerOptions,
) -> Result<(), asprof::AsProfError>;
fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}
struct ProfilerTaskState<E: ProfilerEngine> {
state: ProfilerState<E>,
reporter: Box<dyn Reporter + Send + Sync>,
agent_metadata: Option<AgentMetadata>,
reporting_interval: Duration,
completed_normally: bool,
}
impl<E: ProfilerEngine> ProfilerTaskState<E> {
fn try_final_report(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let start = self.state.stop()?.ok_or("profiler was not running")?;
let jfr_file = self.state.jfr_file.as_ref().ok_or("jfr file missing")?;
let jfr_path = jfr_file.active_path();
if jfr_path.metadata()?.len() == 0 {
return Ok(());
}
let metadata = ReportMetadata {
instance: self
.agent_metadata
.as_ref()
.unwrap_or(&AgentMetadata::NoMetadata),
start: start.duration_since(UNIX_EPOCH)?,
end: SystemTime::now().duration_since(UNIX_EPOCH)?,
reporting_interval: self.reporting_interval,
};
self.reporter
.report_blocking(&jfr_path, &metadata)
.map_err(|e| e.to_string())?;
Ok(())
}
}
impl<E: ProfilerEngine> Drop for ProfilerTaskState<E> {
fn drop(&mut self) {
if self.completed_normally || !self.state.is_started() {
return;
}
tracing::info!("profiler task cancelled, attempting final report on drop");
if let Err(err) = self.try_final_report() {
tracing::warn!(?err, "failed to report on drop");
}
}
}
#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
#[error(transparent)]
AsProf(#[from] AsProfError),
#[error(transparent)]
#[cfg(feature = "aws-metadata-no-defaults")]
Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
#[error("reporter: {0}")]
Reporter(Box<dyn std::error::Error + Send>),
#[error("broken clock: {0}")]
BrokenClock(#[from] SystemTimeError),
#[error("jfr read error: {0}")]
JfrRead(io::Error),
#[error("empty inactive file error: {0}")]
EmptyInactiveFile(io::Error),
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SpawnError {
#[error(transparent)]
AsProf(#[from] asprof::AsProfError),
#[error("tempfile error")]
TempFile(#[source] io::Error),
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SpawnThreadError {
#[error(transparent)]
AsProf(#[from] SpawnError),
#[error("constructing Tokio runtime")]
ConstructRt(#[source] io::Error),
}
enum Control {}
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfiler {
stop_channel: tokio::sync::oneshot::Sender<Control>,
join_handle: tokio::task::JoinHandle<()>,
}
impl RunningProfiler {
pub async fn stop(self) {
drop(self.stop_channel);
let _ = self.join_handle.await;
}
fn detach_inner(self) -> tokio::task::JoinHandle<()> {
tokio::task::spawn(async move {
let _abort_channel = self.stop_channel;
self.join_handle.await.ok();
})
}
pub fn detach(self) {
self.detach_inner();
}
fn spawn_attached(
self,
runtime: tokio::runtime::Runtime,
spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
) -> RunningProfilerThread {
RunningProfilerThread {
stop_channel: self.stop_channel,
join_handle: spawn_fn(Box::new(move || {
let _ = runtime.block_on(self.join_handle);
})),
}
}
fn spawn_detached(
self,
runtime: tokio::runtime::Runtime,
spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
) {
spawn_fn(Box::new(move || {
let _stop_channel = self.stop_channel;
let _ = runtime.block_on(self.join_handle);
}));
}
}
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfilerThread {
stop_channel: tokio::sync::oneshot::Sender<Control>,
join_handle: std::thread::JoinHandle<()>,
}
impl RunningProfilerThread {
pub fn stop(self) {
drop(self.stop_channel);
let _ = self.join_handle.join();
}
}
pub struct Profiler {
reporting_interval: Duration,
reporter: Box<dyn Reporter + Send + Sync>,
agent_metadata: Option<AgentMetadata>,
profiler_options: ProfilerOptions,
}
impl Profiler {
pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
self.spawn_controllable().map(RunningProfiler::detach_inner)
}
pub fn spawn_thread_to_runtime(
self,
runtime: tokio::runtime::Runtime,
spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
) -> Result<(), SpawnError> {
self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
}
pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
let rt = tokio::runtime::Builder::new_current_thread()
.thread_name("asprof-worker".to_owned())
.enable_all()
.build()
.map_err(SpawnThreadError::ConstructRt)?;
let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
.map_err(SpawnThreadError::AsProf)
}
fn spawn_thread_inner<E: ProfilerEngine>(
self,
asprof: E,
runtime: tokio::runtime::Runtime,
spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
) -> Result<(), SpawnError> {
let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
handle.spawn_detached(runtime, spawn_fn);
Ok(())
}
pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
self.spawn_inner(asprof::AsProf::builder().build())
}
pub fn spawn_controllable_thread_to_runtime(
self,
runtime: tokio::runtime::Runtime,
spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
) -> Result<RunningProfilerThread, SpawnError> {
self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
}
fn spawn_controllable_thread_inner<E: ProfilerEngine>(
self,
asprof: E,
runtime: tokio::runtime::Runtime,
spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
) -> Result<RunningProfilerThread, SpawnError> {
let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
Ok(handle.spawn_attached(runtime, spawn_fn))
}
fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
E::init_async_profiler()?;
tracing::info!("successfully initialized async profiler.");
let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let state = match ProfilerState::new(asprof, self.profiler_options) {
Ok(state) => state,
Err(err) => {
tracing::error!(?err, "unable to create profiler state");
return;
}
};
let mut task = ProfilerTaskState {
state,
reporter: self.reporter,
agent_metadata: self.agent_metadata,
reporting_interval: self.reporting_interval,
completed_normally: false,
};
let mut done = false;
while !done {
tokio::select! {
biased;
r = &mut stop_rx, if !stop_rx.is_terminated() => {
match r {
Err(_) => {
tracing::info!("profiler stop requested, doing a final tick");
done = true;
}
}
}
_ = sampling_ticker.tick() => {
tracing::debug!("profiler timer woke up");
}
}
if let Err(err) = profiler_tick(
&mut task.state,
&mut task.agent_metadata,
&*task.reporter,
task.reporting_interval,
)
.await
{
match &err {
TickError::Reporter(_) => {
tracing::error!(?err, "error during profiling, continuing");
}
_stop => {
tracing::error!(?err, "error during profiling, stopping");
break;
}
}
}
}
task.completed_normally = true;
tracing::info!("profiling task finished");
});
Ok(RunningProfiler {
stop_channel,
join_handle,
})
}
}
async fn profiler_tick<E: ProfilerEngine>(
state: &mut ProfilerState<E>,
agent_metadata: &mut Option<AgentMetadata>,
reporter: &(dyn Reporter + Send + Sync),
reporting_interval: Duration,
) -> Result<(), TickError> {
if !state.is_started() {
state.start().await?;
return Ok(());
}
let Some(start) = state.stop()? else {
tracing::warn!("stopped the profiler but it wasn't running?");
return Ok(());
};
let start = start.duration_since(UNIX_EPOCH)?;
let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
state
.jfr_file_mut()
.empty_inactive_file()
.map_err(TickError::EmptyInactiveFile)?;
state.jfr_file_mut().swap();
state.start().await?;
if agent_metadata.is_none() {
#[cfg(feature = "aws-metadata-no-defaults")]
let md = crate::metadata::aws::load_agent_metadata().await?;
#[cfg(not(feature = "aws-metadata-no-defaults"))]
let md = crate::metadata::AgentMetadata::NoMetadata;
tracing::debug!("loaded metadata");
agent_metadata.replace(md);
}
let report_metadata = ReportMetadata {
instance: agent_metadata.as_ref().unwrap(),
start,
end,
reporting_interval,
};
let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
.await
.map_err(TickError::JfrRead)?;
reporter
.report(jfr, &report_metadata)
.await
.map_err(TickError::Reporter)?;
Ok(())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool, AtomicU32};
use test_case::test_case;
use super::*;
#[test]
fn test_jfr_file_drop() {
let mut jfr = JfrFile::new().unwrap();
std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
jfr.swap();
assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
jfr.empty_inactive_file().unwrap();
assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
}
struct MockProfilerEngine {
counter: AtomicU32,
}
impl ProfilerEngine for MockProfilerEngine {
fn init_async_profiler() -> Result<(), asprof::AsProfError> {
Ok(())
}
fn start_async_profiler(
&self,
jfr_file_path: &Path,
_options: &ProfilerOptions,
) -> Result<(), asprof::AsProfError> {
let contents = format!(
"JFR{}",
self.counter.fetch_add(1, atomic::Ordering::Relaxed)
);
std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
Ok(())
}
fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
Ok(())
}
}
struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
impl std::fmt::Debug for MockReporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MockReporter").finish()
}
}
#[async_trait::async_trait]
impl Reporter for MockReporter {
async fn report(
&self,
jfr: Vec<u8>,
metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
self.0
.send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
.await
.unwrap();
Ok(())
}
}
fn make_mock_profiler() -> (
Profiler,
tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
) {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let agent = ProfilerBuilder::default()
.with_reporter(MockReporter(tx))
.with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
aws_account_id: "0".into(),
aws_region_id: "us-east-1".into(),
ec2_instance_id: "i-fake".into(),
ec2_instance_type: "t3.micro".into(),
})
.build();
(agent, rx)
}
#[tokio::test(start_paused = true)]
async fn test_profiler_agent() {
let e_md = AgentMetadata::Ec2AgentMetadata {
aws_account_id: "0".into(),
aws_region_id: "us-east-1".into(),
ec2_instance_id: "i-fake".into(),
ec2_instance_type: "t3.micro".into(),
};
let (agent, mut rx) = make_mock_profiler();
agent
.spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
counter: AtomicU32::new(0),
})
.unwrap()
.detach();
let (jfr, md) = rx.recv().await.unwrap();
assert_eq!(jfr, "JFR0");
assert_eq!(e_md, md);
let (jfr, md) = rx.recv().await.unwrap();
assert_eq!(jfr, "JFR1");
assert_eq!(e_md, md);
}
#[test_case(false; "uncontrollable")]
#[test_case(true; "controllable")]
fn test_profiler_local_rt(controllable: bool) {
let e_md = AgentMetadata::Ec2AgentMetadata {
aws_account_id: "0".into(),
aws_region_id: "us-east-1".into(),
ec2_instance_id: "i-fake".into(),
ec2_instance_type: "t3.micro".into(),
};
let (agent, mut rx) = make_mock_profiler();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.start_paused(true)
.build()
.unwrap();
let handle = if controllable {
Some(
agent
.spawn_controllable_thread_inner::<MockProfilerEngine>(
MockProfilerEngine {
counter: AtomicU32::new(0),
},
rt,
std::thread::spawn,
)
.unwrap(),
)
} else {
agent
.spawn_thread_inner::<MockProfilerEngine>(
MockProfilerEngine {
counter: AtomicU32::new(0),
},
rt,
std::thread::spawn,
)
.unwrap();
None
};
let (jfr, md) = rx.blocking_recv().unwrap();
assert_eq!(jfr, "JFR0");
assert_eq!(e_md, md);
let (jfr, md) = rx.blocking_recv().unwrap();
assert_eq!(jfr, "JFR1");
assert_eq!(e_md, md);
if let Some(handle) = handle {
let drain_thread =
std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
handle.stop();
drain_thread.join().unwrap();
}
}
enum StopKind {
Delibrate,
Drop,
Abort,
}
#[tokio::test(start_paused = true)]
#[test_case(StopKind::Delibrate; "deliberate stop")]
#[test_case(StopKind::Drop; "drop stop")]
#[test_case(StopKind::Abort; "abort stop")]
async fn test_profiler_stop(stop_kind: StopKind) {
let e_md = AgentMetadata::Ec2AgentMetadata {
aws_account_id: "0".into(),
aws_region_id: "us-east-1".into(),
ec2_instance_id: "i-fake".into(),
ec2_instance_type: "t3.micro".into(),
};
let (agent, mut rx) = make_mock_profiler();
let profiler_ref = agent
.spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
counter: AtomicU32::new(0),
})
.unwrap();
let (jfr, md) = rx.recv().await.unwrap();
assert_eq!(jfr, "JFR0");
assert_eq!(e_md, md);
let (jfr, md) = rx.recv().await.unwrap();
assert_eq!(jfr, "JFR1");
assert_eq!(e_md, md);
match stop_kind {
StopKind::Drop => drop(profiler_ref),
StopKind::Delibrate => {
tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
.await
.unwrap();
}
StopKind::Abort => {
profiler_ref.detach_inner().abort();
}
}
let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
.await
.unwrap()
.unwrap();
assert_eq!(jfr, "JFR2");
assert_eq!(e_md, md);
assert!(rx.recv().await.is_none());
}
struct StopErrorProfilerEngine {
start_error: bool,
counter: Arc<AtomicBool>,
}
impl ProfilerEngine for StopErrorProfilerEngine {
fn init_async_profiler() -> Result<(), asprof::AsProfError> {
Ok(())
}
fn start_async_profiler(
&self,
jfr_file_path: &Path,
_options: &ProfilerOptions,
) -> Result<(), asprof::AsProfError> {
let jfr_file_path = jfr_file_path.to_owned();
std::fs::write(&jfr_file_path, "JFR").unwrap();
let counter = self.counter.clone();
tokio::task::spawn(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
counter.store(true, atomic::Ordering::Release);
});
if self.start_error {
Err(asprof::AsProfError::AsyncProfilerError("error".into()))
} else {
Ok(())
}
}
fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
Err(asprof::AsProfError::AsyncProfilerError("error".into()))
}
}
#[tokio::test(start_paused = true)]
#[test_case(false; "error on stop")]
#[test_case(true; "error on start")]
async fn test_profiler_error(start_error: bool) {
let (agent, mut rx) = make_mock_profiler();
let counter = Arc::new(AtomicBool::new(false));
let engine = StopErrorProfilerEngine {
start_error,
counter: counter.clone(),
};
let handle = agent.spawn_inner(engine).unwrap().detach_inner();
assert!(rx.recv().await.is_none());
for _ in 0..100 {
tokio::time::sleep(Duration::from_secs(1)).await;
if counter.load(atomic::Ordering::Acquire) {
handle.await.unwrap(); return;
}
}
panic!("didn't read from file");
}
#[test]
fn test_profiler_options_to_args_string_default() {
let opts = ProfilerOptions::default();
let dummy_path = Path::new("/tmp/test.jfr");
let args = opts.to_args_string(dummy_path);
assert!(
args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
"Default args string not constructed correctly"
);
assert!(args.contains("file=/tmp/test.jfr"));
assert!(!args.contains("nativemem="));
}
#[test]
fn test_profiler_options_to_args_string_with_native_mem() {
let opts = ProfilerOptions {
native_mem: Some("10m".to_string()),
wall_clock_millis: None,
cpu_interval: None,
};
let dummy_path = Path::new("/tmp/test.jfr");
let args = opts.to_args_string(dummy_path);
assert!(args.contains("nativemem=10m"));
}
#[test]
fn test_profiler_options_builder() {
let opts = ProfilerOptionsBuilder::default()
.with_native_mem_bytes(5000000)
.build();
assert_eq!(opts.native_mem, Some("5000000".to_string()));
}
#[test]
fn test_profiler_options_builder_all_options() {
let opts = ProfilerOptionsBuilder::default()
.with_native_mem_bytes(5000000)
.with_cpu_interval(Duration::from_secs(1))
.with_wall_clock_interval(Duration::from_secs(10))
.build();
let dummy_path = Path::new("/tmp/test.jfr");
let args = opts.to_args_string(dummy_path);
assert_eq!(
args,
"start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
);
}
#[test]
fn test_local_reporter_has_no_metadata() {
let reporter = ProfilerBuilder::default().with_local_reporter(".");
assert_eq!(
format!("{:?}", reporter.reporter),
r#"Some(LocalReporter { directory: "." })"#
);
match reporter.agent_metadata {
Some(AgentMetadata::NoMetadata) => {}
bad => panic!("{bad:?}"),
};
}
struct BlockingMockReporter {
async_tx: tokio::sync::mpsc::Sender<String>,
blocking_reports: Arc<std::sync::Mutex<Vec<String>>>,
}
impl std::fmt::Debug for BlockingMockReporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BlockingMockReporter").finish()
}
}
#[async_trait::async_trait]
impl Reporter for BlockingMockReporter {
async fn report(
&self,
jfr: Vec<u8>,
_metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
self.async_tx
.send(String::from_utf8(jfr).unwrap())
.await
.unwrap();
Ok(())
}
fn report_blocking(
&self,
jfr_path: &Path,
_metadata: &ReportMetadata,
) -> Result<(), Box<dyn std::error::Error + Send>> {
let jfr = std::fs::read(jfr_path).map_err(|e| Box::new(e) as _)?;
self.blocking_reports
.lock()
.unwrap()
.push(String::from_utf8(jfr).unwrap());
Ok(())
}
}
#[test]
fn test_profiler_report_on_drop() {
let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new()));
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.start_paused(true)
.build()
.unwrap();
let reports_clone = blocking_reports.clone();
rt.block_on(async {
let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<String>(10);
let agent = ProfilerBuilder::default()
.with_reporter(BlockingMockReporter {
async_tx,
blocking_reports: reports_clone,
})
.with_custom_agent_metadata(AgentMetadata::NoMetadata)
.build();
agent
.spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
counter: AtomicU32::new(0),
})
.unwrap()
.detach();
let jfr = async_rx.recv().await.unwrap();
assert_eq!(jfr, "JFR0");
});
drop(rt);
let reports = blocking_reports.lock().unwrap();
assert_eq!(
reports.len(),
1,
"expected exactly one blocking report on drop"
);
assert_eq!(reports[0], "JFR1");
}
}