async_profiler_agent/profiler.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
5
6use crate::{
7 asprof::{self, AsProfError},
8 metadata::{AgentMetadata, ReportMetadata},
9 reporter::{local::LocalReporter, Reporter},
10};
11use std::{
12 fs::File,
13 io,
14 path::{Path, PathBuf},
15 time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
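/// A pair of temporary JFR files used as a double buffer: async-profiler writes
/// into the `active` file, and on each reporting tick the files are swapped so
/// the freshly collected data can be read and reported while profiling continues
/// into the other file.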
19struct JfrFile {
20 active: std::fs::File,
21 inactive: std::fs::File,
22}
23
24impl JfrFile {
25 #[cfg(target_os = "linux")]
26 fn new() -> Result<Self, io::Error> {
27 Ok(Self {
28            active: tempfile::tempfile()?,
29            inactive: tempfile::tempfile()?,
30 })
31 }
32
33 #[cfg(not(target_os = "linux"))]
34 fn new() -> Result<Self, io::Error> {
35        Err(io::Error::new(
36            io::ErrorKind::Other,
37            "async-profiler is only supported on Linux",
38        ))
39 }
40
41 fn swap(&mut self) {
42 std::mem::swap(&mut self.active, &mut self.inactive);
43 }
44
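    /// Returns a path that refers to the given file via its file descriptor
    /// (`/proc/self/fd/N`), so that async-profiler can open the anonymous
    /// temporary file by name.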
45 #[cfg(target_os = "linux")]
46 fn file_path(file: &std::fs::File) -> PathBuf {
47 use std::os::fd::AsRawFd;
48
49 format!("/proc/self/fd/{}", file.as_raw_fd()).into()
50 }
51
52 #[cfg(not(target_os = "linux"))]
53 fn file_path(_file: &std::fs::File) -> PathBuf {
54 unimplemented!()
55 }
56
57 fn active_path(&self) -> PathBuf {
58 Self::file_path(&self.active)
59 }
60
61 fn inactive_path(&self) -> PathBuf {
62 Self::file_path(&self.inactive)
63 }
64
65 fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
66 // Empty the file, or create it for the first time if the profiler hasn't
67 // started yet.
68 File::create(Self::file_path(&self.inactive))?;
69 tracing::debug!(message = "emptied the file");
70 Ok(())
71 }
72}
73
74/// Options for configuring the async-profiler behavior.
75/// Currently supports:
76/// - Native memory allocation tracking
77#[derive(Debug, Default)]
78#[non_exhaustive]
79pub struct ProfilerOptions {
80 /// If set, the profiler will collect information about
81 /// native memory allocations.
82 ///
83    /// The value is the sampling interval in bytes, or in other units
84    /// when suffixed with k (kilobytes), m (megabytes), or g (gigabytes).
85 /// For example, `"10m"` will sample an allocation for every
86 /// 10 megabytes of memory allocated. Passing `"0"` will sample
87 /// all allocations.
88 ///
89 /// See [ProfilingModes in the async-profiler docs] for more details.
90 ///
91 /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
92 pub native_mem: Option<String>,
93 cpu_interval: Option<u128>,
94 wall_clock_millis: Option<u128>,
95}
96
97const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
98const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;
99
100impl ProfilerOptions {
101 /// Convert the profiler options to a string of arguments for the async-profiler.
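    ///
    /// ### Example
    ///
    /// With the default options, the generated argument string looks like this:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::ProfilerOptions;
    /// # use std::path::Path;
    /// let args = ProfilerOptions::default().to_args_string(Path::new("/tmp/test.jfr"));
    /// assert_eq!(
    ///     args,
    ///     "start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf,file=/tmp/test.jfr"
    /// );
    /// ```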
102 pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
103 let mut args = format!(
104 "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
105 self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS),
106 self.wall_clock_millis
107 .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS),
108 jfr_file_path.display()
109 );
110 if let Some(ref native_mem) = self.native_mem {
111 args.push_str(&format!(",nativemem={native_mem}"));
112 }
113 args
114 }
115}
116
117/// Builder for [`ProfilerOptions`].
118#[derive(Debug, Default)]
119pub struct ProfilerOptionsBuilder {
120 native_mem: Option<String>,
121 cpu_interval: Option<u128>,
122 wall_clock_millis: Option<u128>,
123}
124
125impl ProfilerOptionsBuilder {
126    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but passes
127    /// the string argument directly to async-profiler.
128 ///
129    /// The value is the sampling interval in bytes, or in other units
130    /// when suffixed with k (kilobytes), m (megabytes), or g (gigabytes).
131 ///
132 /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
133 /// type-checked.
134 ///
135 /// ### Examples
136 ///
137 /// This will sample allocations for every 10 megabytes allocated:
138 ///
139 /// ```
140 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
141 /// # use async_profiler_agent::profiler::SpawnError;
142 /// # fn main() -> Result<(), SpawnError> {
143 /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
144 /// let profiler = ProfilerBuilder::default()
145 /// .with_profiler_options(opts)
146 /// .with_local_reporter("/tmp/profiles")
147 /// .build();
148 /// # if false { // don't spawn the profiler in doctests
149 /// profiler.spawn()?;
150 /// # }
151 /// # Ok(())
152 /// # }
153 /// ```
154 pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
155 self.native_mem = Some(native_mem_interval);
156 self
157 }
158
159    /// Configures the profiler to collect information about
160    /// native memory allocations.
161 ///
162    /// The argument is the sampling interval in bytes - the profiler will
163    /// sample an allocation roughly once per that many bytes allocated.
164 ///
165 /// See [ProfilingModes in the async-profiler docs] for more details.
166 ///
167 /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
168 ///
169 /// ### Examples
170 ///
171 /// This will sample allocations for every 10 megabytes allocated:
172 ///
173 /// ```
174 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
175 /// # use async_profiler_agent::profiler::SpawnError;
176 /// # fn main() -> Result<(), SpawnError> {
177 /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
178 /// let profiler = ProfilerBuilder::default()
179 /// .with_profiler_options(opts)
180 /// .with_local_reporter("/tmp/profiles")
181 /// .build();
182 /// # if false { // don't spawn the profiler in doctests
183 /// profiler.spawn()?;
184 /// # }
185 /// # Ok(())
186 /// # }
187 /// ```
188 ///
189 /// This will sample every allocation (potentially slow):
190 /// ```
191 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
192 /// # use async_profiler_agent::profiler::SpawnError;
193 /// # fn main() -> Result<(), SpawnError> {
194 /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
195 /// let profiler = ProfilerBuilder::default()
196 /// .with_profiler_options(opts)
197 /// .with_local_reporter("/tmp/profiles")
198 /// .build();
199 /// # if false { // don't spawn the profiler in doctests
200 /// profiler.spawn()?;
201 /// # }
202 /// # Ok(())
203 /// # }
204 /// ```
205 pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
206 self.native_mem = Some(native_mem_interval.to_string());
207 self
208 }
209
210    /// Sets the interval at which the profiler will collect
211 /// CPU-time samples, via the [async-profiler `interval` option].
212 ///
213 /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
214 /// are currently running on a CPU, not threads that are sleeping.
215 ///
216 /// It can use a higher frequency than wall-clock sampling since the
217    /// number of threads that are running on a CPU at a given time is
218 /// naturally limited by the number of CPUs, while the number of sleeping
219 /// threads can be much larger.
220 ///
221 /// The default is to do a CPU-time sample every 100 milliseconds.
222 ///
223 /// The async-profiler agent collects both CPU time and wall-clock time
224 /// samples, so this function should normally be used along with
225 /// [ProfilerOptionsBuilder::with_wall_clock_interval].
226 ///
227 /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
228 ///
229 /// ### Examples
230 ///
231    /// This will take a CPU-time sample every 10 CPU milliseconds (when running)
232    /// and a wall-clock sample every 100 milliseconds (running or sleeping):
233 ///
234 /// ```
235 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
236 /// # use async_profiler_agent::profiler::SpawnError;
237 /// # use std::time::Duration;
238 /// # fn main() -> Result<(), SpawnError> {
239 /// let opts = ProfilerOptionsBuilder::default()
240 /// .with_cpu_interval(Duration::from_millis(10))
241 /// .with_wall_clock_interval(Duration::from_millis(100))
242 /// .build();
243 /// let profiler = ProfilerBuilder::default()
244 /// .with_profiler_options(opts)
245 /// .with_local_reporter("/tmp/profiles")
246 /// .build();
247 /// # if false { // don't spawn the profiler in doctests
248 /// profiler.spawn()?;
249 /// # }
250 /// # Ok(())
251 /// # }
252 /// ```
253 pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
254 self.cpu_interval = Some(cpu_interval.as_nanos());
255 self
256 }
257
258    /// Sets the interval at which the profiler will collect
259 /// wall-clock samples, via the [async-profiler `wall` option].
260 ///
261 /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
262 /// whether they are sleeping or running, and can therefore be
263 /// very useful for finding threads that are blocked, for example
264 /// on a synchronous lock or a slow system call.
265 ///
266    /// When using Tokio, since tasks are not threads, tasks that are not
267    /// currently running on a thread will not be captured by a wall-clock sample.
268    /// However, wall-clock sampling is still very useful in Tokio, since it is
269    /// what catches tasks that block a thread by waiting on
270    /// synchronous operations.
271 ///
272 /// The default is to do a wall-clock sample every second.
273 ///
274 /// The async-profiler agent collects both CPU time and wall-clock time
275 /// samples, so this function should normally be used along with
276 /// [ProfilerOptionsBuilder::with_cpu_interval].
277 ///
278 /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
279 ///
280 /// ### Examples
281 ///
282    /// This will take a CPU-time sample every 10 CPU milliseconds (when running)
283    /// and a wall-clock sample every 100 milliseconds (running or sleeping):
284 ///
285 /// ```
286 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
287 /// # use async_profiler_agent::profiler::SpawnError;
288 /// # use std::time::Duration;
289 /// # fn main() -> Result<(), SpawnError> {
290 /// let opts = ProfilerOptionsBuilder::default()
291 /// .with_cpu_interval(Duration::from_millis(10))
292    ///     .with_wall_clock_interval(Duration::from_millis(100))
293 /// .build();
294 /// let profiler = ProfilerBuilder::default()
295 /// .with_profiler_options(opts)
296 /// .with_local_reporter("/tmp/profiles")
297 /// .build();
298 /// # if false { // don't spawn the profiler in doctests
299 /// profiler.spawn()?;
300 /// # }
301 /// # Ok(())
302 /// # }
303 /// ```
304 pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
305 self.wall_clock_millis = Some(wall_clock.as_millis());
306 self
307 }
308
309 /// Build the [`ProfilerOptions`] from the builder.
310 pub fn build(self) -> ProfilerOptions {
311 ProfilerOptions {
312 native_mem: self.native_mem,
313 wall_clock_millis: self.wall_clock_millis,
314 cpu_interval: self.cpu_interval,
315 }
316 }
317}
318
319/// Builds a [`Profiler`], panicking if any required fields were not set by the
320/// time `build` is called.
321#[derive(Debug, Default)]
322pub struct ProfilerBuilder {
323 reporting_interval: Option<Duration>,
324 reporter: Option<Box<dyn Reporter + Send + Sync>>,
325 agent_metadata: Option<AgentMetadata>,
326 profiler_options: Option<ProfilerOptions>,
327}
328
329impl ProfilerBuilder {
330 /// Sets the reporting interval (default: 30 seconds).
331 ///
332 /// This is the interval that samples are *reported* to the backend,
333 /// and is unrelated to the interval at which the application
334 /// is *sampled* by async profiler, which is controlled by
335 /// [ProfilerOptionsBuilder::with_cpu_interval] and
336 /// [ProfilerOptionsBuilder::with_wall_clock_interval].
337 ///
338 /// Most users should not change this setting.
339 ///
340 /// ## Example
341 ///
342 /// ```no_run
343 /// # use std::path::PathBuf;
344 /// # use std::time::Duration;
345 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
346 /// # let path = PathBuf::from(".");
347 /// let agent = ProfilerBuilder::default()
348 /// .with_local_reporter(path)
349 /// .with_reporting_interval(Duration::from_secs(15))
350 /// .build()
351 /// .spawn()?;
352 /// # Ok::<_, SpawnError>(())
353 /// ```
354 pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
355 self.reporting_interval = Some(i);
356 self
357 }
358
359 /// Sets the [`Reporter`], which is used to upload the collected profiling
360 /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
361 /// feature enabled,
362 #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
363 #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
364 /// It is also possible to write your own [`Reporter`].
365 ///
366 /// It's normally easier to use [`LocalReporter`] directly via
367 /// [`ProfilerBuilder::with_local_reporter`].
368 ///
369 /// If you want to output to multiple reporters, you can use
370 /// [`MultiReporter`].
371 ///
372 /// [`LocalReporter`]: crate::reporter::local::LocalReporter
373 /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
374 #[cfg_attr(
375 feature = "s3-no-defaults",
376 doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
377 )]
378 ///
379 #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
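    ///
    /// ### Example
    ///
    /// A minimal sketch that wires a [`LocalReporter`] in through `with_reporter`
    /// (normally you would just call [`ProfilerBuilder::with_local_reporter`]):
    ///
    /// ```no_run
    /// # use std::path::PathBuf;
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # use async_profiler_agent::reporter::local::LocalReporter;
    /// let agent = ProfilerBuilder::default()
    ///     .with_reporter(LocalReporter::new(PathBuf::from("/tmp/profiles")))
    ///     .build()
    ///     .spawn()?;
    /// # Ok::<_, SpawnError>(())
    /// ```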
380 pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
381 self.reporter = Some(Box::new(r));
382 self
383 }
384
385    /// Sets the profiler to use [LocalReporter], which will write `.jfr` files to `path`,
386 /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
387 /// since the [LocalReporter] does not need that.
388 ///
389 /// This is useful for testing, since metadata auto-detection currently only works
390 /// on [Amazon EC2] or [Amazon Fargate] instances.
391 ///
392 /// The local reporter should normally not be used in production, since it will
393 /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
394 /// or write your own (see [`ProfilerBuilder::with_reporter`]).
395 ///
396 /// [Amazon EC2]: https://aws.amazon.com/ec2
397 /// [Amazon Fargate]: https://aws.amazon.com/fargate
398 ///
399 /// ## Example
400 ///
401 /// This will write profiles as `.jfr` files to `./path-to-profiles`:
402 ///
403 /// ```no_run
404 /// # use std::path::PathBuf;
405 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
406 /// # use async_profiler_agent::reporter::local::LocalReporter;
407 /// # use async_profiler_agent::metadata::AgentMetadata;
408 /// let path = PathBuf::from("./path-to-profiles");
409 /// let agent = ProfilerBuilder::default()
410 /// .with_local_reporter(path)
411 /// .build()
412 /// .spawn()?;
413 /// # Ok::<_, SpawnError>(())
414 /// ```
415 pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
416 self.reporter = Some(Box::new(LocalReporter::new(path.into())));
417 self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
418 }
419
420 /// Provide custom agent metadata.
421 ///
422 /// The async-profiler Rust agent sends metadata to the [Reporter] with
423 /// the identity of the current host and process, which is normally
424 /// transmitted as `metadata.json` within the generated `.zip` file,
425 /// using the schema format [`reporter::s3::MetadataJson`].
426 ///
427 /// That metadata can later be used by tooling to be able to sort
428 /// profiling reports by host.
429 ///
430    /// The async-profiler Rust agent will by default try to fetch the metadata
431 /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
432 /// will error if it's unable to find it. If you are running the
433 /// async-profiler agent on any other form of compute,
434 /// you will need to create and attach your own metadata
435 /// by calling this function.
436 ///
437 #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
438 /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
439 /// [Amazon EC2]: https://aws.amazon.com/ec2
440 /// [Amazon Fargate]: https://aws.amazon.com/fargate
441 /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
442 pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
443 self.agent_metadata = Some(j);
444 self
445 }
446
447 /// Provide custom profiler options.
448 ///
449 /// ### Example
450 ///
451 /// This will sample allocations for every 10 megabytes allocated:
452 ///
453 /// ```
454 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
455 /// # use async_profiler_agent::profiler::SpawnError;
456 /// # fn main() -> Result<(), SpawnError> {
457 /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
458 /// let profiler = ProfilerBuilder::default()
459 /// .with_profiler_options(opts)
460 /// .with_local_reporter("/tmp/profiles")
461 /// .build();
462 /// # if false { // don't spawn the profiler in doctests
463 /// profiler.spawn()?;
464 /// # }
465 /// # Ok(())
466 /// # }
467 /// ```
468 pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
469 self.profiler_options = Some(c);
470 self
471 }
472
473 /// Turn this builder into a profiler!
474 pub fn build(self) -> Profiler {
475 Profiler {
476 reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
477 reporter: self.reporter.expect("reporter is required"),
478 agent_metadata: self.agent_metadata,
479 profiler_options: self.profiler_options.unwrap_or_default(),
480 }
481 }
482}
483
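/// The profiler's lifecycle state, as tracked by [`ProfilerState`]; `Running`
/// records the time at which profiling started.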
484enum Status {
485 Idle,
486 Starting,
487 Running(SystemTime),
488}
489
490/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
491/// of the state of the profiler. The primary benefit of this is RAII - when
492/// this type drops, it will stop the profiler if it's running.
493struct ProfilerState<E: ProfilerEngine> {
494 // this is only None in the destructor when stopping the async-profiler fails
495 jfr_file: Option<JfrFile>,
496 asprof: E,
497 status: Status,
498 profiler_options: ProfilerOptions,
499}
500
501impl<E: ProfilerEngine> ProfilerState<E> {
502 fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
503 Ok(Self {
504 jfr_file: Some(JfrFile::new()?),
505 asprof,
506 status: Status::Idle,
507 profiler_options,
508 })
509 }
510
511 fn jfr_file_mut(&mut self) -> &mut JfrFile {
512 self.jfr_file.as_mut().unwrap()
513 }
514
515 async fn start(&mut self) -> Result<(), AsProfError> {
516 let active = self.jfr_file.as_ref().unwrap().active_path();
517        // drop guard - set `Starting` first so that Drop leaks the JFR files if starting fails partway
518 self.status = Status::Starting;
519 E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
520 self.status = Status::Running(SystemTime::now());
521 Ok(())
522 }
523
524 fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
525 E::stop_async_profiler()?;
526 let status = std::mem::replace(&mut self.status, Status::Idle);
527 Ok(match status {
528 Status::Idle | Status::Starting => None,
529 Status::Running(since) => Some(since),
530 })
531 }
532
533 fn is_started(&self) -> bool {
534 matches!(self.status, Status::Running(_))
535 }
536}
537
538impl<E: ProfilerEngine> Drop for ProfilerState<E> {
539 fn drop(&mut self) {
540 match self.status {
541 Status::Running(_) => {
542 if let Err(err) = self.stop() {
543 // SECURITY: avoid removing the JFR file if stopping the profiler fails,
544 // to avoid symlink races
545 std::mem::forget(self.jfr_file.take());
546 // XXX: Rust defines leaking resources during drop as safe.
547 tracing::warn!(?err, "unable to stop profiler during drop glue");
548 }
549 }
550 Status::Idle => {}
551 Status::Starting => {
552 // SECURITY: avoid removing the JFR file if stopping the profiler fails,
553 // to avoid symlink races
554 std::mem::forget(self.jfr_file.take());
555 }
556 }
557 }
558}
559
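/// Abstraction over the async-profiler FFI used by [`Profiler`], which lets the
/// tests below substitute a mock engine instead of loading the real
/// async-profiler library.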
560pub(crate) trait ProfilerEngine: Send + Sync + 'static {
561 fn init_async_profiler() -> Result<(), asprof::AsProfError>;
562 fn start_async_profiler(
563 &self,
564 jfr_file_path: &Path,
565 options: &ProfilerOptions,
566 ) -> Result<(), asprof::AsProfError>;
567 fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
568}
569
570#[derive(Debug, Error)]
571#[non_exhaustive]
572enum TickError {
573 #[error(transparent)]
574 AsProf(#[from] AsProfError),
575 #[error(transparent)]
576 #[cfg(feature = "aws-metadata-no-defaults")]
577 Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
578 #[error("reporter: {0}")]
579 Reporter(Box<dyn std::error::Error + Send>),
580 #[error("broken clock: {0}")]
581 BrokenClock(#[from] SystemTimeError),
582 #[error("jfr read error: {0}")]
583 JfrRead(io::Error),
584 #[error("empty inactive file error: {0}")]
585 EmptyInactiveFile(io::Error),
586}
587
588#[derive(Debug, Error)]
589#[non_exhaustive]
590/// An error that happened spawning a profiler
591pub enum SpawnError {
592 /// Error from async-profiler
593 #[error(transparent)]
594 AsProf(#[from] asprof::AsProfError),
595 /// Error writing to a tempfile
596 #[error("tempfile error")]
597 TempFile(#[source] io::Error),
598}
599
600#[derive(Debug, Error)]
601#[non_exhaustive]
602/// An error from [`Profiler::spawn_thread`]
603pub enum SpawnThreadError {
604 /// Error from async-profiler
605 #[error(transparent)]
606 AsProf(#[from] SpawnError),
607 /// Error constructing Tokio runtime
608 #[error("constructing Tokio runtime")]
609 ConstructRt(#[source] io::Error),
610}
611
612// no control messages currently
613enum Control {}
614
615/// A handle to a running profiler
616///
617/// Currently just allows for stopping the profiler.
618///
619/// Dropping this handle will request that the profiler stop.
620#[must_use = "dropping this stops the profiler, call .detach() to detach"]
621pub struct RunningProfiler {
622 stop_channel: tokio::sync::oneshot::Sender<Control>,
623 join_handle: tokio::task::JoinHandle<()>,
624}
625
626impl RunningProfiler {
627 /// Request that the current profiler stops and wait until it exits.
628 ///
629 /// This will cause the currently-pending profile information to be flushed.
630 ///
631 /// After this function returns, it is correct and safe to [spawn] a new
632 /// [Profiler], possibly with a different configuration. Therefore,
633 /// this function can be used to "reconfigure" a profiler by stopping
634 /// it and then starting a new one with a different configuration.
635 ///
636 /// [spawn]: Profiler::spawn_controllable
637 pub async fn stop(self) {
638 drop(self.stop_channel);
639 let _ = self.join_handle.await;
640 }
641
642 /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
643 fn detach_inner(self) -> tokio::task::JoinHandle<()> {
644 tokio::task::spawn(async move {
645 // move the control channel to the spawned task. this way, it will be dropped
646 // just when the task is aborted.
647 let _abort_channel = self.stop_channel;
648 self.join_handle.await.ok();
649 })
650 }
651
652 /// Detach this profiler. This will prevent the profiler from being stopped
653 /// when this handle is dropped. You should call this (or [Profiler::spawn]
654 /// instead of [Profiler::spawn_controllable], which does the same thing)
655 /// if you don't intend to reconfigure your profiler at runtime.
656 pub fn detach(self) {
657 self.detach_inner();
658 }
659
660    /// Spawns this [RunningProfiler] onto a separate thread that drives the given
661    /// Tokio runtime, and returns a [RunningProfilerThread] attached to it.
662 fn spawn_attached(
663 self,
664 runtime: tokio::runtime::Runtime,
665 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
666 ) -> RunningProfilerThread {
667 RunningProfilerThread {
668 stop_channel: self.stop_channel,
669 join_handle: spawn_fn(Box::new(move || {
670 let _ = runtime.block_on(self.join_handle);
671 })),
672 }
673 }
674
675    /// Spawns this [RunningProfiler] onto a separate thread that drives the given
676    /// Tokio runtime, and detaches it.
677 fn spawn_detached(
678 self,
679 runtime: tokio::runtime::Runtime,
680 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
681 ) {
682 spawn_fn(Box::new(move || {
683 let _stop_channel = self.stop_channel;
684 let _ = runtime.block_on(self.join_handle);
685 }));
686 }
687}
688
689/// A handle to a running profiler, running on a separate thread.
690///
691/// Currently just allows for stopping the profiler.
692///
693/// Dropping this handle will request that the profiler stop.
694#[must_use = "dropping this stops the profiler, call .detach() to detach"]
695pub struct RunningProfilerThread {
696 stop_channel: tokio::sync::oneshot::Sender<Control>,
697 join_handle: std::thread::JoinHandle<()>,
698}
699
700impl RunningProfilerThread {
701 /// Request that the current profiler stops and wait until it exits.
702 ///
703 /// This will cause the currently-pending profile information to be flushed.
704 ///
705 /// After this function returns, it is correct and safe to [spawn] a new
706 /// [Profiler], possibly with a different configuration. Therefore,
707 /// this function can be used to "reconfigure" a profiler by stopping
708 /// it and then starting a new one with a different configuration.
709 ///
710 /// [spawn]: Profiler::spawn_controllable
711 pub fn stop(self) {
712 drop(self.stop_channel);
713 let _ = self.join_handle.join();
714 }
715}
716
717/// Rust profiler based on [async-profiler].
718///
719/// Spawning a profiler can be done either in an attached (controllable)
720/// mode, which allows for stopping the profiler (and, in fact, stops
721/// it when the relevant handle is dropped), or in detached mode,
722/// in which the profiler keeps running forever. Applications that can
723/// shut down the profiler at run-time, for example applications that
724/// support reconfiguration of a running profiler, generally want to use
725/// controllable mode. Other applications (most of them) should use
726/// detached mode.
727///
728/// In addition, the profiler can either be spawned into the current Tokio
729/// runtime, or into a new one. Normally, applications should spawn
730/// the profiler into their own Tokio runtime, but applications that
731/// don't have a default Tokio runtime should spawn it into a
732/// different one
733/// dedicated one.
734/// This leaves 4 functions:
735/// 1. [Self::spawn] - detached, same runtime
736/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
737/// 3. [Self::spawn_controllable] - controllable, same runtime
738/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
739///
740/// In addition, there's a helper function that just spawns the profiler
741/// to a new runtime in a new thread, for applications that don't have
742/// a Tokio runtime and don't need complex control:
743///
744/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
745///
746/// [async-profiler]: https://github.com/async-profiler/async-profiler
747pub struct Profiler {
748 reporting_interval: Duration,
749 reporter: Box<dyn Reporter + Send + Sync>,
750 agent_metadata: Option<AgentMetadata>,
751 profiler_options: ProfilerOptions,
752}
753
754impl Profiler {
755    /// Start profiling. The profiler will run in a Tokio task, reporting at the configured interval.
756 ///
757 /// This is the same as calling [Profiler::spawn_controllable] followed by
758 /// [RunningProfiler::detach], except it returns a [JoinHandle].
759 ///
760 /// The returned [JoinHandle] can be used to detect if the profiler has exited
761 /// due to a fatal error.
762 ///
763 /// This function will fail if it is unable to start async-profiler, for example
764 /// if it can't find or load `libasyncProfiler.so`.
765 ///
766 /// [JoinHandle]: tokio::task::JoinHandle
767 ///
768 /// ### Tokio Runtime
769 ///
770 /// This function must be run within a Tokio runtime, otherwise it will panic. If
771 /// your application does not have a `main` Tokio runtime, see
772 /// [Profiler::spawn_thread].
773 ///
774 /// ### Example
775 ///
776 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
777 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
778 ///
779 /// ```
780 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
781 /// # #[tokio::main]
782 /// # async fn main() -> Result<(), SpawnError> {
783 /// let profiler = ProfilerBuilder::default()
784 /// .with_local_reporter("/tmp/profiles")
785 /// .build();
786 /// # if false { // don't spawn the profiler in doctests
787 /// profiler.spawn()?;
788 /// # }
789 /// # Ok(())
790 /// # }
791 /// ```
792 pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
793 self.spawn_controllable().map(RunningProfiler::detach_inner)
794 }
795
796 /// Like [Self::spawn], but instead of spawning within the current Tokio
797    /// runtime, spawns within the given Tokio runtime and then runs a thread that calls
798 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
799 ///
800 /// If your configuration is standard, use [Profiler::spawn_thread].
801 ///
802 /// If you want to be able to stop the resulting profiler, use
803 /// [Profiler::spawn_controllable_thread_to_runtime].
804 ///
805 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
806 /// allow for configuring thread properties, for example thread names).
807 ///
808 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
809 ///
810 /// ### Example
811 ///
812 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
813 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
814 ///
815 /// ```no_run
816 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
817 /// let rt = tokio::runtime::Builder::new_current_thread()
818 /// .enable_all()
819 /// .build()?;
820 /// let profiler = ProfilerBuilder::default()
821 /// .with_local_reporter("/tmp/profiles")
822 /// .build();
823 ///
824 /// profiler.spawn_thread_to_runtime(
825 /// rt,
826 /// |t| {
827 /// std::thread::Builder::new()
828 /// .name("asprof-agent".to_owned())
829 /// .spawn(t)
830 /// .expect("thread name contains nuls")
831 /// }
832 /// )?;
833 /// # Ok::<_, anyhow::Error>(())
834 /// ```
835 pub fn spawn_thread_to_runtime(
836 self,
837 runtime: tokio::runtime::Runtime,
838 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
839 ) -> Result<(), SpawnError> {
840 self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
841 }
842
843 /// Like [Self::spawn], but instead of spawning within the current Tokio
844 /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
845 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
846 /// by itself.
847 ///
848 /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
849 /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
850 /// with the following:
851 /// 1. a current thread runtime with background worker threads (these exist
852 /// for blocking IO) named "asprof-worker"
853 /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
854 ///
855 /// If you want to be able to stop the resulting profiler, use
856 /// [Profiler::spawn_controllable_thread_to_runtime].
857 ///
858 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
859 ///
860 /// ### Example
861 ///
862 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
863 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
864 ///
865 /// ```no_run
866 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
867 /// # use async_profiler_agent::reporter::local::LocalReporter;
868 /// let profiler = ProfilerBuilder::default()
869 /// .with_local_reporter("/tmp/profiles")
870 /// .build();
871 ///
872 /// profiler.spawn_thread()?;
873 /// # Ok::<_, anyhow::Error>(())
874 /// ```
875 pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
876 // using "asprof" in thread name to deal with 15 character + \0 length limit
877 let rt = tokio::runtime::Builder::new_current_thread()
878 .thread_name("asprof-worker".to_owned())
879 .enable_all()
880 .build()
881 .map_err(SpawnThreadError::ConstructRt)?;
882 let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
883 self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
884 .map_err(SpawnThreadError::AsProf)
885 }
886
887 fn spawn_thread_inner<E: ProfilerEngine>(
888 self,
889 asprof: E,
890 runtime: tokio::runtime::Runtime,
891 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
892 ) -> Result<(), SpawnError> {
893 let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
894 handle.spawn_detached(runtime, spawn_fn);
895 Ok(())
896 }
897
898 /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
899 /// (currently only stopping) the profiler.
900 ///
901 /// This allows for changing the configuration of the profiler at runtime, by
902 /// stopping it and then starting a new Profiler with a new configuration. It
903 /// also allows for stopping profiling in case the profiler is suspected to
904 /// cause operational issues.
905 ///
906 /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
907    /// so if your application doesn't need to change the profiler's configuration at runtime,
908 /// it will be easier to use [Profiler::spawn].
909 ///
910 /// This function will fail if it is unable to start async-profiler, for example
911 /// if it can't find or load `libasyncProfiler.so`.
912 ///
913 /// ### Tokio Runtime
914 ///
915 /// This function must be run within a Tokio runtime, otherwise it will panic. If
916 /// your application does not have a `main` Tokio runtime, see
917 /// [Profiler::spawn_controllable_thread_to_runtime].
918 ///
919 /// ### Example
920 ///
921 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
922 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
923 ///
924 /// ```no_run
925 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
926 /// # #[tokio::main]
927 /// # async fn main() -> Result<(), SpawnError> {
928 /// let profiler = ProfilerBuilder::default()
929 /// .with_local_reporter("/tmp/profiles")
930 /// .build();
931 ///
932 /// let profiler = profiler.spawn_controllable()?;
933 ///
934 /// // [insert your signaling/monitoring mechanism to have a request to disable
935 /// // profiling in case of a problem]
936 /// let got_request_to_disable_profiling = async move {
937 /// // ...
938 /// # false
939 /// };
940 /// // spawn a task that will disable profiling if requested
941 /// tokio::task::spawn(async move {
942 /// if got_request_to_disable_profiling.await {
943 /// profiler.stop().await;
944 /// }
945 /// });
946 /// # Ok(())
947 /// # }
948 /// ```
949 pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
950 self.spawn_inner(asprof::AsProf::builder().build())
951 }
952
953 /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
954    /// runtime, spawns within the given Tokio runtime and then runs a thread that calls
955 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
956 ///
957 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
958 /// allow for configuring thread properties, for example thread names).
959 ///
960 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
961 ///
962 /// ### Example
963 ///
964 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
965 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
966 ///
967 /// ```no_run
968 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
969 /// let rt = tokio::runtime::Builder::new_current_thread()
970 /// .enable_all()
971 /// .build()?;
972 /// let profiler = ProfilerBuilder::default()
973 /// .with_local_reporter("/tmp/profiles")
974 /// .build();
975 ///
976 /// let profiler = profiler.spawn_controllable_thread_to_runtime(
977 /// rt,
978 /// |t| {
979 /// std::thread::Builder::new()
980 /// .name("asprof-agent".to_owned())
981 /// .spawn(t)
982 /// .expect("thread name contains nuls")
983 /// }
984 /// )?;
985 ///
986 /// # fn got_request_to_disable_profiling() -> bool { false }
987 /// // spawn a task that will disable profiling if requested
988 /// std::thread::spawn(move || {
989 /// if got_request_to_disable_profiling() {
990 /// profiler.stop();
991 /// }
992 /// });
993 /// # Ok::<_, anyhow::Error>(())
994 /// ```
995 pub fn spawn_controllable_thread_to_runtime(
996 self,
997 runtime: tokio::runtime::Runtime,
998 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
999 ) -> Result<RunningProfilerThread, SpawnError> {
1000 self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1001 }
1002
1003 fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1004 self,
1005 asprof: E,
1006 runtime: tokio::runtime::Runtime,
1007 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1008 ) -> Result<RunningProfilerThread, SpawnError> {
1009 let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1010 Ok(handle.spawn_attached(runtime, spawn_fn))
1011 }
1012
1013 fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
1014 // Initialize async profiler - needs to be done once.
1015 E::init_async_profiler()?;
1016 tracing::info!("successfully initialized async profiler.");
1017
1018 let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
1019 let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();
1020
1021 // Get profiles at the configured interval rate.
1022 let join_handle = tokio::spawn(async move {
1023 let mut state = match ProfilerState::new(asprof, self.profiler_options) {
1024 Ok(state) => state,
1025 Err(err) => {
1026 tracing::error!(?err, "unable to create profiler state");
1027 return;
1028 }
1029 };
1030
1031 // Lazily-loaded if not specified up front.
1032 let mut agent_metadata = self.agent_metadata;
1033 let mut done = false;
1034
1035 while !done {
1036 // Wait until a timer or exit event
1037 tokio::select! {
1038 biased;
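                    // `biased` makes select! poll the branches in order, so a pending
                    // stop request is seen before the timer and triggers one final tick.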
1039
1040 r = &mut stop_rx, if !stop_rx.is_terminated() => {
1041 match r {
1042 Err(_) => {
1043 tracing::info!("profiler stop requested, doing a final tick");
1044 done = true;
1045 }
1046 }
1047 }
1048 _ = sampling_ticker.tick() => {
1049 tracing::debug!("profiler timer woke up");
1050 }
1051 }
1052
1053 if let Err(err) = profiler_tick(
1054 &mut state,
1055 &mut agent_metadata,
1056 &*self.reporter,
1057 self.reporting_interval,
1058 )
1059 .await
1060 {
1061 match &err {
1062 TickError::Reporter(_) => {
1063 // don't stop on IO errors
1064 tracing::error!(?err, "error during profiling, continuing");
1065 }
1066 _stop => {
1067 tracing::error!(?err, "error during profiling, stopping");
1068 break;
1069 }
1070 }
1071 }
1072 }
1073
1074 tracing::info!("profiling task finished");
1075 });
1076
1077 Ok(RunningProfiler {
1078 stop_channel,
1079 join_handle,
1080 })
1081 }
1082}
1083
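/// Runs one reporting tick. On the first tick this only starts the profiler;
/// afterwards it stops the profiler, empties and swaps the JFR double buffer,
/// restarts profiling into the fresh file, and then reads the completed file
/// and sends it to the reporter.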
1084async fn profiler_tick<E: ProfilerEngine>(
1085 state: &mut ProfilerState<E>,
1086 agent_metadata: &mut Option<AgentMetadata>,
1087 reporter: &(dyn Reporter + Send + Sync),
1088 reporting_interval: Duration,
1089) -> Result<(), TickError> {
1090 if !state.is_started() {
1091 state.start().await?;
1092 return Ok(());
1093 }
1094
1095 let Some(start) = state.stop()? else {
1096 tracing::warn!("stopped the profiler but it wasn't running?");
1097 return Ok(());
1098 };
1099 let start = start.duration_since(UNIX_EPOCH)?;
1100 let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
1101
1102 // Start it up immediately, writing to the "other" file, so that we keep
1103 // profiling the application while we're reporting data.
1104 state
1105 .jfr_file_mut()
1106 .empty_inactive_file()
1107 .map_err(TickError::EmptyInactiveFile)?;
1108 state.jfr_file_mut().swap();
1109 state.start().await?;
1110
1111 // Lazily load the agent metadata if it was not provided in
1112 // the constructor. See the struct comments for why this is.
1113 // This code runs at most once.
1114 if agent_metadata.is_none() {
1115 #[cfg(feature = "aws-metadata-no-defaults")]
1116 let md = crate::metadata::aws::load_agent_metadata().await?;
1117 #[cfg(not(feature = "aws-metadata-no-defaults"))]
1118 let md = crate::metadata::AgentMetadata::NoMetadata;
1119 tracing::debug!("loaded metadata");
1120 agent_metadata.replace(md);
1121 }
1122
1123 let report_metadata = ReportMetadata {
1124 instance: agent_metadata.as_ref().unwrap(),
1125 start,
1126 end,
1127 reporting_interval,
1128 };
1129
1130 let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
1131 .await
1132 .map_err(TickError::JfrRead)?;
1133
1134 reporter
1135 .report(jfr, &report_metadata)
1136 .await
1137 .map_err(TickError::Reporter)?;
1138
1139 Ok(())
1140}
1141
1142#[cfg(test)]
1143mod tests {
1144 use std::sync::atomic::{self, AtomicBool, AtomicU32};
1145 use std::sync::Arc;
1146
1147 use test_case::test_case;
1148
1149 use super::*;
1150
1151 #[test]
1152 fn test_jfr_file_drop() {
1153 let mut jfr = JfrFile::new().unwrap();
1154
1155 std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
1156 jfr.swap();
1157 assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
1158 jfr.empty_inactive_file().unwrap();
1159 assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
1160 }
1161
1162 struct MockProfilerEngine {
1163 counter: AtomicU32,
1164 }
1165 impl ProfilerEngine for MockProfilerEngine {
1166 fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1167 Ok(())
1168 }
1169
1170 fn start_async_profiler(
1171 &self,
1172 jfr_file_path: &Path,
1173 _options: &ProfilerOptions,
1174 ) -> Result<(), asprof::AsProfError> {
1175 let contents = format!(
1176 "JFR{}",
1177 self.counter.fetch_add(1, atomic::Ordering::Relaxed)
1178 );
1179 std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
1180 Ok(())
1181 }
1182
1183 fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1184 Ok(())
1185 }
1186 }
1187
1188 struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
1189 impl std::fmt::Debug for MockReporter {
1190 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1191 f.debug_struct("MockReporter").finish()
1192 }
1193 }
1194
1195 #[async_trait::async_trait]
1196 impl Reporter for MockReporter {
1197 async fn report(
1198 &self,
1199 jfr: Vec<u8>,
1200 metadata: &ReportMetadata,
1201 ) -> Result<(), Box<dyn std::error::Error + Send>> {
1202 self.0
1203 .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
1204 .await
1205 .unwrap();
1206 Ok(())
1207 }
1208 }
1209
1210 fn make_mock_profiler() -> (
1211 Profiler,
1212 tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
1213 ) {
1214 let (tx, rx) = tokio::sync::mpsc::channel(1);
1215 let agent = ProfilerBuilder::default()
1216 .with_reporter(MockReporter(tx))
1217 .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
1218 aws_account_id: "0".into(),
1219 aws_region_id: "us-east-1".into(),
1220 ec2_instance_id: "i-fake".into(),
1221 })
1222 .build();
1223 (agent, rx)
1224 }
1225
1226 #[tokio::test(start_paused = true)]
1227 async fn test_profiler_agent() {
1228 let e_md = AgentMetadata::Ec2AgentMetadata {
1229 aws_account_id: "0".into(),
1230 aws_region_id: "us-east-1".into(),
1231 ec2_instance_id: "i-fake".into(),
1232 };
1233 let (agent, mut rx) = make_mock_profiler();
1234 agent
1235 .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1236 counter: AtomicU32::new(0),
1237 })
1238 .unwrap()
1239 .detach();
1240 let (jfr, md) = rx.recv().await.unwrap();
1241 assert_eq!(jfr, "JFR0");
1242 assert_eq!(e_md, md);
1243 let (jfr, md) = rx.recv().await.unwrap();
1244 assert_eq!(jfr, "JFR1");
1245 assert_eq!(e_md, md);
1246 }
1247
1248 #[test_case(false; "uncontrollable")]
1249 #[test_case(true; "controllable")]
1250 fn test_profiler_local_rt(controllable: bool) {
1251 let e_md = AgentMetadata::Ec2AgentMetadata {
1252 aws_account_id: "0".into(),
1253 aws_region_id: "us-east-1".into(),
1254 ec2_instance_id: "i-fake".into(),
1255 };
1256 let (agent, mut rx) = make_mock_profiler();
1257 let rt = tokio::runtime::Builder::new_current_thread()
1258 .enable_all()
1259 .start_paused(true)
1260 .build()
1261 .unwrap();
1262 // spawn the profiler, doing this before spawning a thread to allow
1263 // capturing errors from `spawn`
1264 let handle = if controllable {
1265 Some(
1266 agent
1267 .spawn_controllable_thread_inner::<MockProfilerEngine>(
1268 MockProfilerEngine {
1269 counter: AtomicU32::new(0),
1270 },
1271 rt,
1272 std::thread::spawn,
1273 )
1274 .unwrap(),
1275 )
1276 } else {
1277 agent
1278 .spawn_thread_inner::<MockProfilerEngine>(
1279 MockProfilerEngine {
1280 counter: AtomicU32::new(0),
1281 },
1282 rt,
1283 std::thread::spawn,
1284 )
1285 .unwrap();
1286 None
1287 };
1288
1289 let (jfr, md) = rx.blocking_recv().unwrap();
1290 assert_eq!(jfr, "JFR0");
1291 assert_eq!(e_md, md);
1292 let (jfr, md) = rx.blocking_recv().unwrap();
1293 assert_eq!(jfr, "JFR1");
1294 assert_eq!(e_md, md);
1295
1296 if let Some(handle) = handle {
1297 let drain_thread =
1298 std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
1299 // request a stop
1300 handle.stop();
1301 // the drain thread should be done
1302 drain_thread.join().unwrap();
1303 }
1304 }
1305
1306 enum StopKind {
1307        Deliberate,
1308 Drop,
1309 Abort,
1310 }
1311
1312 #[tokio::test(start_paused = true)]
1313    #[test_case(StopKind::Deliberate; "deliberate stop")]
1314 #[test_case(StopKind::Drop; "drop stop")]
1315 #[test_case(StopKind::Abort; "abort stop")]
1316 async fn test_profiler_stop(stop_kind: StopKind) {
1317 let e_md = AgentMetadata::Ec2AgentMetadata {
1318 aws_account_id: "0".into(),
1319 aws_region_id: "us-east-1".into(),
1320 ec2_instance_id: "i-fake".into(),
1321 };
1322 let (agent, mut rx) = make_mock_profiler();
1323 let profiler_ref = agent
1324 .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1325 counter: AtomicU32::new(0),
1326 })
1327 .unwrap();
1328 let (jfr, md) = rx.recv().await.unwrap();
1329 assert_eq!(jfr, "JFR0");
1330 assert_eq!(e_md, md);
1331 let (jfr, md) = rx.recv().await.unwrap();
1332 assert_eq!(jfr, "JFR1");
1333 assert_eq!(e_md, md);
1334 // check that stop is faster than an interval and returns an "immediate" next jfr
1335 match stop_kind {
1336 StopKind::Drop => drop(profiler_ref),
1337            StopKind::Deliberate => {
1338 tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
1339 .await
1340 .unwrap();
1341 }
1342 StopKind::Abort => {
1343 // You can call Abort on the JoinHandle. make sure that is not buggy.
1344 profiler_ref.detach_inner().abort();
1345 }
1346 }
1347 // check that we get the next JFR "quickly", and the JFR after that is empty.
1348 let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
1349 .await
1350 .unwrap()
1351 .unwrap();
1352 assert_eq!(jfr, "JFR2");
1353 assert_eq!(e_md, md);
1354 assert!(rx.recv().await.is_none());
1355 }
1356
1357 // simulate a badly-behaved profiler that errors on start/stop and then
1358 // tries to access the JFR file
1359 struct StopErrorProfilerEngine {
1360 start_error: bool,
1361 counter: Arc<AtomicBool>,
1362 }
1363 impl ProfilerEngine for StopErrorProfilerEngine {
1364 fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1365 Ok(())
1366 }
1367
1368 fn start_async_profiler(
1369 &self,
1370 jfr_file_path: &Path,
1371 _options: &ProfilerOptions,
1372 ) -> Result<(), asprof::AsProfError> {
1373 let jfr_file_path = jfr_file_path.to_owned();
1374 std::fs::write(&jfr_file_path, "JFR").unwrap();
1375 let counter = self.counter.clone();
1376 tokio::task::spawn(async move {
1377 tokio::time::sleep(Duration::from_secs(5)).await;
1378 assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
1379 counter.store(true, atomic::Ordering::Release);
1380 });
1381 if self.start_error {
1382 Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1383 } else {
1384 Ok(())
1385 }
1386 }
1387
1388 fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1389 Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1390 }
1391 }
1392
1393 #[tokio::test(start_paused = true)]
1394 #[test_case(false; "error on stop")]
1395 #[test_case(true; "error on start")]
1396 async fn test_profiler_error(start_error: bool) {
1397 let (agent, mut rx) = make_mock_profiler();
1398 let counter = Arc::new(AtomicBool::new(false));
1399 let engine = StopErrorProfilerEngine {
1400 start_error,
1401 counter: counter.clone(),
1402 };
1403 let handle = agent.spawn_inner(engine).unwrap().detach_inner();
1404 assert!(rx.recv().await.is_none());
1405 // check that the "sleep 5" step in start_async_profiler succeeds
1406 for _ in 0..100 {
1407 tokio::time::sleep(Duration::from_secs(1)).await;
1408 if counter.load(atomic::Ordering::Acquire) {
1409 handle.await.unwrap(); // Check that the JoinHandle is done
1410 return;
1411 }
1412 }
1413 panic!("didn't read from file");
1414 }
1415
1416 #[test]
1417 fn test_profiler_options_to_args_string_default() {
1418 let opts = ProfilerOptions::default();
1419 let dummy_path = Path::new("/tmp/test.jfr");
1420 let args = opts.to_args_string(dummy_path);
1421 assert!(
1422 args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
1423 "Default args string not constructed correctly"
1424 );
1425 assert!(args.contains("file=/tmp/test.jfr"));
1426 assert!(!args.contains("nativemem="));
1427 }
1428
1429 #[test]
1430 fn test_profiler_options_to_args_string_with_native_mem() {
1431 let opts = ProfilerOptions {
1432 native_mem: Some("10m".to_string()),
1433 wall_clock_millis: None,
1434 cpu_interval: None,
1435 };
1436 let dummy_path = Path::new("/tmp/test.jfr");
1437 let args = opts.to_args_string(dummy_path);
1438 assert!(args.contains("nativemem=10m"));
1439 }
1440
1441 #[test]
1442 fn test_profiler_options_builder() {
1443 let opts = ProfilerOptionsBuilder::default()
1444 .with_native_mem_bytes(5000000)
1445 .build();
1446
1447 assert_eq!(opts.native_mem, Some("5000000".to_string()));
1448 }
1449
1450 #[test]
1451 fn test_profiler_options_builder_all_options() {
1452 let opts = ProfilerOptionsBuilder::default()
1453 .with_native_mem_bytes(5000000)
1454 .with_cpu_interval(Duration::from_secs(1))
1455 .with_wall_clock_interval(Duration::from_secs(10))
1456 .build();
1457
1458 let dummy_path = Path::new("/tmp/test.jfr");
1459 let args = opts.to_args_string(dummy_path);
1460 assert_eq!(args, "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000");
1461 }
1462
1463 #[test]
1464 fn test_local_reporter_has_no_metadata() {
1465 // Check that with_local_reporter sets some configuration
1466 let reporter = ProfilerBuilder::default().with_local_reporter(".");
1467 assert_eq!(
1468 format!("{:?}", reporter.reporter),
1469 r#"Some(LocalReporter { directory: "." })"#
1470 );
1471 match reporter.agent_metadata {
1472 Some(AgentMetadata::NoMetadata) => {}
1473 bad => panic!("{bad:?}"),
1474 };
1475 }
1476}