async_profiler_agent/profiler.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
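//!
//! A minimal usage sketch (the output directory is illustrative): build a
//! [Profiler] with a [ProfilerBuilder], spawn it, and it will report a profile
//! once per reporting interval.
//!
//! ```no_run
//! # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnThreadError};
//! ProfilerBuilder::default()
//!     .with_local_reporter("/tmp/profiles")
//!     .build()
//!     .spawn_thread()?;
//! # Ok::<_, SpawnThreadError>(())
//! ```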
5
6use crate::{
7 asprof::{self, AsProfError},
8 metadata::{AgentMetadata, ReportMetadata},
9 reporter::{Reporter, local::LocalReporter},
10};
11use std::{
12 fs::File,
13 io,
14 path::{Path, PathBuf},
15 time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
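/// A pair of anonymous temp files used to double-buffer JFR output:
/// async-profiler writes into `active`, and on every reporting tick the two
/// files are swapped so the just-collected data (now `inactive`) can be read
/// and reported while profiling continues into the freshly emptied file.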
19struct JfrFile {
20 active: std::fs::File,
21 inactive: std::fs::File,
22}
23
24impl JfrFile {
25 #[cfg(target_os = "linux")]
26 fn new() -> Result<Self, io::Error> {
27 Ok(Self {
            active: tempfile::tempfile()?,
            inactive: tempfile::tempfile()?,
30 })
31 }
32
33 #[cfg(not(target_os = "linux"))]
34 fn new() -> Result<Self, io::Error> {
35 Err(io::Error::other(
36 "async-profiler is only supported on Linux",
37 ))
38 }
39
40 fn swap(&mut self) {
41 std::mem::swap(&mut self.active, &mut self.inactive);
42 }
43
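    // `tempfile::tempfile()` returns an anonymous file with no name on disk;
    // on Linux it can still be handed to async-profiler as a path through the
    // `/proc/self/fd/<fd>` pseudo-file.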
44 #[cfg(target_os = "linux")]
45 fn file_path(file: &std::fs::File) -> PathBuf {
46 use std::os::fd::AsRawFd;
47
48 format!("/proc/self/fd/{}", file.as_raw_fd()).into()
49 }
50
51 #[cfg(not(target_os = "linux"))]
52 fn file_path(_file: &std::fs::File) -> PathBuf {
53 unimplemented!()
54 }
55
56 fn active_path(&self) -> PathBuf {
57 Self::file_path(&self.active)
58 }
59
60 fn inactive_path(&self) -> PathBuf {
61 Self::file_path(&self.inactive)
62 }
63
64 fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
65 // Empty the file, or create it for the first time if the profiler hasn't
66 // started yet.
67 File::create(Self::file_path(&self.inactive))?;
68 tracing::debug!(message = "emptied the file");
69 Ok(())
70 }
71}
72
73/// Options for configuring the async-profiler behavior.
74/// Currently supports:
75/// - Native memory allocation tracking
76#[derive(Debug, Default)]
77#[non_exhaustive]
78pub struct ProfilerOptions {
79 /// If set, the profiler will collect information about
80 /// native memory allocations.
81 ///
    /// The value is the allocation interval in bytes, or in another unit
    /// if suffixed with k (kilobytes), m (megabytes), or g (gigabytes).
84 /// For example, `"10m"` will sample an allocation for every
85 /// 10 megabytes of memory allocated. Passing `"0"` will sample
86 /// all allocations.
87 ///
88 /// See [ProfilingModes in the async-profiler docs] for more details.
89 ///
90 /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
91 pub native_mem: Option<String>,
92 cpu_interval: Option<u128>,
93 wall_clock_millis: Option<u128>,
94}
95
96const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
97const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;
98
99impl ProfilerOptions {
100 /// Convert the profiler options to a string of arguments for the async-profiler.
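    ///
    /// ### Example
    ///
    /// A small sketch of the produced format with default options (the path is
    /// illustrative):
    ///
    /// ```
    /// # use std::path::Path;
    /// # use async_profiler_agent::profiler::ProfilerOptions;
    /// let args = ProfilerOptions::default().to_args_string(Path::new("/tmp/profile.jfr"));
    /// assert_eq!(
    ///     args,
    ///     "start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf,file=/tmp/profile.jfr"
    /// );
    /// ```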
101 pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
102 let mut args = format!(
103 "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
104 self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS),
105 self.wall_clock_millis
106 .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS),
107 jfr_file_path.display()
108 );
109 if let Some(ref native_mem) = self.native_mem {
110 args.push_str(&format!(",nativemem={native_mem}"));
111 }
112 args
113 }
114}
115
116/// Builder for [`ProfilerOptions`].
117#[derive(Debug, Default)]
118pub struct ProfilerOptionsBuilder {
119 native_mem: Option<String>,
120 cpu_interval: Option<u128>,
121 wall_clock_millis: Option<u128>,
122}
123
124impl ProfilerOptionsBuilder {
    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but passes
    /// the string input directly to async-profiler.
    ///
    /// The value is the allocation interval in bytes, or in another unit
    /// if suffixed with k (kilobytes), m (megabytes), or g (gigabytes).
130 ///
131 /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
132 /// type-checked.
133 ///
134 /// ### Examples
135 ///
136 /// This will sample allocations for every 10 megabytes allocated:
137 ///
138 /// ```
139 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
140 /// # use async_profiler_agent::profiler::SpawnError;
141 /// # fn main() -> Result<(), SpawnError> {
142 /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
143 /// let profiler = ProfilerBuilder::default()
144 /// .with_profiler_options(opts)
145 /// .with_local_reporter("/tmp/profiles")
146 /// .build();
147 /// # if false { // don't spawn the profiler in doctests
148 /// profiler.spawn()?;
149 /// # }
150 /// # Ok(())
151 /// # }
152 /// ```
153 pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
154 self.native_mem = Some(native_mem_interval);
155 self
156 }
157
158 /// If set, the profiler will collect information about
159 /// native memory allocations.
160 ///
    /// The argument is the sampling interval in bytes: the profiler will
    /// sample about one allocation per `native_mem_interval` bytes allocated.
163 ///
164 /// See [ProfilingModes in the async-profiler docs] for more details.
165 ///
166 /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
167 ///
168 /// ### Examples
169 ///
170 /// This will sample allocations for every 10 megabytes allocated:
171 ///
172 /// ```
173 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
174 /// # use async_profiler_agent::profiler::SpawnError;
175 /// # fn main() -> Result<(), SpawnError> {
176 /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
177 /// let profiler = ProfilerBuilder::default()
178 /// .with_profiler_options(opts)
179 /// .with_local_reporter("/tmp/profiles")
180 /// .build();
181 /// # if false { // don't spawn the profiler in doctests
182 /// profiler.spawn()?;
183 /// # }
184 /// # Ok(())
185 /// # }
186 /// ```
187 ///
188 /// This will sample every allocation (potentially slow):
189 /// ```
190 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
191 /// # use async_profiler_agent::profiler::SpawnError;
192 /// # fn main() -> Result<(), SpawnError> {
193 /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
194 /// let profiler = ProfilerBuilder::default()
195 /// .with_profiler_options(opts)
196 /// .with_local_reporter("/tmp/profiles")
197 /// .build();
198 /// # if false { // don't spawn the profiler in doctests
199 /// profiler.spawn()?;
200 /// # }
201 /// # Ok(())
202 /// # }
203 /// ```
204 pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
205 self.native_mem = Some(native_mem_interval.to_string());
206 self
207 }
208
    /// Sets the interval at which the profiler will collect
210 /// CPU-time samples, via the [async-profiler `interval` option].
211 ///
212 /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
213 /// are currently running on a CPU, not threads that are sleeping.
214 ///
215 /// It can use a higher frequency than wall-clock sampling since the
    /// number of threads that are running on a CPU at a given time is
217 /// naturally limited by the number of CPUs, while the number of sleeping
218 /// threads can be much larger.
219 ///
220 /// The default is to do a CPU-time sample every 100 milliseconds.
221 ///
222 /// The async-profiler agent collects both CPU time and wall-clock time
223 /// samples, so this function should normally be used along with
224 /// [ProfilerOptionsBuilder::with_wall_clock_interval].
225 ///
226 /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
227 ///
228 /// ### Examples
229 ///
    /// This will take a CPU-time sample every 10 CPU milliseconds (when running)
    /// and a wall-clock sample every 100 milliseconds (running or sleeping):
232 ///
233 /// ```
234 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
235 /// # use async_profiler_agent::profiler::SpawnError;
236 /// # use std::time::Duration;
237 /// # fn main() -> Result<(), SpawnError> {
238 /// let opts = ProfilerOptionsBuilder::default()
239 /// .with_cpu_interval(Duration::from_millis(10))
240 /// .with_wall_clock_interval(Duration::from_millis(100))
241 /// .build();
242 /// let profiler = ProfilerBuilder::default()
243 /// .with_profiler_options(opts)
244 /// .with_local_reporter("/tmp/profiles")
245 /// .build();
246 /// # if false { // don't spawn the profiler in doctests
247 /// profiler.spawn()?;
248 /// # }
249 /// # Ok(())
250 /// # }
251 /// ```
252 pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
253 self.cpu_interval = Some(cpu_interval.as_nanos());
254 self
255 }
256
    /// Sets the interval (rounded down to whole milliseconds) at which the
    /// profiler will collect
258 /// wall-clock samples, via the [async-profiler `wall` option].
259 ///
260 /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
261 /// whether they are sleeping or running, and can therefore be
262 /// very useful for finding threads that are blocked, for example
263 /// on a synchronous lock or a slow system call.
264 ///
    /// When using Tokio, since tasks are not threads, tasks that are not
    /// currently running will not be caught by a wall-clock sample. However,
    /// wall-clock sampling is still very useful with Tokio, since it is what
    /// you want for catching tasks that block a thread by waiting on
    /// synchronous operations.
270 ///
271 /// The default is to do a wall-clock sample every second.
272 ///
273 /// The async-profiler agent collects both CPU time and wall-clock time
274 /// samples, so this function should normally be used along with
275 /// [ProfilerOptionsBuilder::with_cpu_interval].
276 ///
277 /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
278 ///
279 /// ### Examples
280 ///
    /// This will take a CPU-time sample every 10 CPU milliseconds (when running)
    /// and a wall-clock sample every 10 milliseconds (running or sleeping):
283 ///
284 /// ```
285 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
286 /// # use async_profiler_agent::profiler::SpawnError;
287 /// # use std::time::Duration;
288 /// # fn main() -> Result<(), SpawnError> {
289 /// let opts = ProfilerOptionsBuilder::default()
290 /// .with_cpu_interval(Duration::from_millis(10))
291 /// .with_wall_clock_interval(Duration::from_millis(10))
292 /// .build();
293 /// let profiler = ProfilerBuilder::default()
294 /// .with_profiler_options(opts)
295 /// .with_local_reporter("/tmp/profiles")
296 /// .build();
297 /// # if false { // don't spawn the profiler in doctests
298 /// profiler.spawn()?;
299 /// # }
300 /// # Ok(())
301 /// # }
302 /// ```
303 pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
304 self.wall_clock_millis = Some(wall_clock.as_millis());
305 self
306 }
307
308 /// Build the [`ProfilerOptions`] from the builder.
309 pub fn build(self) -> ProfilerOptions {
310 ProfilerOptions {
311 native_mem: self.native_mem,
312 wall_clock_millis: self.wall_clock_millis,
313 cpu_interval: self.cpu_interval,
314 }
315 }
316}
317
318/// Builds a [`Profiler`], panicking if any required fields were not set by the
319/// time `build` is called.
320#[derive(Debug, Default)]
321pub struct ProfilerBuilder {
322 reporting_interval: Option<Duration>,
323 reporter: Option<Box<dyn Reporter + Send + Sync>>,
324 agent_metadata: Option<AgentMetadata>,
325 profiler_options: Option<ProfilerOptions>,
326}
327
328impl ProfilerBuilder {
329 /// Sets the reporting interval (default: 30 seconds).
330 ///
    /// This is the interval at which samples are *reported* to the backend,
    /// and is unrelated to the interval at which the application
    /// is *sampled* by async-profiler, which is controlled by
334 /// [ProfilerOptionsBuilder::with_cpu_interval] and
335 /// [ProfilerOptionsBuilder::with_wall_clock_interval].
336 ///
337 /// Most users should not change this setting.
338 ///
339 /// ## Example
340 ///
341 /// ```no_run
342 /// # use std::path::PathBuf;
343 /// # use std::time::Duration;
344 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
345 /// # let path = PathBuf::from(".");
346 /// let agent = ProfilerBuilder::default()
347 /// .with_local_reporter(path)
348 /// .with_reporting_interval(Duration::from_secs(15))
349 /// .build()
350 /// .spawn()?;
351 /// # Ok::<_, SpawnError>(())
352 /// ```
353 pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
354 self.reporting_interval = Some(i);
355 self
356 }
357
358 /// Sets the [`Reporter`], which is used to upload the collected profiling
359 /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
360 /// feature enabled,
361 #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
362 #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
363 /// It is also possible to write your own [`Reporter`].
364 ///
365 /// It's normally easier to use [`LocalReporter`] directly via
366 /// [`ProfilerBuilder::with_local_reporter`].
367 ///
368 /// If you want to output to multiple reporters, you can use
369 /// [`MultiReporter`].
370 ///
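    /// ## Example
    ///
    /// A minimal sketch that passes a [`LocalReporter`] explicitly and disables
    /// metadata auto-detection, which is roughly what
    /// [`ProfilerBuilder::with_local_reporter`] does for you:
    ///
    /// ```no_run
    /// # use std::path::PathBuf;
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # use async_profiler_agent::reporter::local::LocalReporter;
    /// # use async_profiler_agent::metadata::AgentMetadata;
    /// let agent = ProfilerBuilder::default()
    ///     .with_reporter(LocalReporter::new(PathBuf::from("/tmp/profiles")))
    ///     .with_custom_agent_metadata(AgentMetadata::NoMetadata)
    ///     .build()
    ///     .spawn()?;
    /// # Ok::<_, SpawnError>(())
    /// ```
    ///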
371 /// [`LocalReporter`]: crate::reporter::local::LocalReporter
372 /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
373 #[cfg_attr(
374 feature = "s3-no-defaults",
375 doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
376 )]
377 ///
378 #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
379 pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
380 self.reporter = Some(Box::new(r));
381 self
382 }
383
    /// Sets the profiler to use [LocalReporter], which will write `.jfr` files to `path`,
385 /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
386 /// since the [LocalReporter] does not need that.
387 ///
388 /// This is useful for testing, since metadata auto-detection currently only works
389 /// on [Amazon EC2] or [Amazon Fargate] instances.
390 ///
391 /// The local reporter should normally not be used in production, since it will
392 /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
393 /// or write your own (see [`ProfilerBuilder::with_reporter`]).
394 ///
395 /// [Amazon EC2]: https://aws.amazon.com/ec2
396 /// [Amazon Fargate]: https://aws.amazon.com/fargate
397 ///
398 /// ## Example
399 ///
400 /// This will write profiles as `.jfr` files to `./path-to-profiles`:
401 ///
402 /// ```no_run
403 /// # use std::path::PathBuf;
404 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
405 /// # use async_profiler_agent::reporter::local::LocalReporter;
406 /// # use async_profiler_agent::metadata::AgentMetadata;
407 /// let path = PathBuf::from("./path-to-profiles");
408 /// let agent = ProfilerBuilder::default()
409 /// .with_local_reporter(path)
410 /// .build()
411 /// .spawn()?;
412 /// # Ok::<_, SpawnError>(())
413 /// ```
414 pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
415 self.reporter = Some(Box::new(LocalReporter::new(path.into())));
416 self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
417 }
418
419 /// Provide custom agent metadata.
420 ///
421 /// The async-profiler Rust agent sends metadata to the [Reporter] with
422 /// the identity of the current host and process, which is normally
423 /// transmitted as `metadata.json` within the generated `.zip` file,
424 /// using the schema format [`reporter::s3::MetadataJson`].
425 ///
426 /// That metadata can later be used by tooling to be able to sort
427 /// profiling reports by host.
428 ///
    /// The async-profiler Rust agent will by default try to fetch the metadata
430 /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
431 /// will error if it's unable to find it. If you are running the
432 /// async-profiler agent on any other form of compute,
433 /// you will need to create and attach your own metadata
434 /// by calling this function.
435 ///
436 #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
437 /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
438 /// [Amazon EC2]: https://aws.amazon.com/ec2
439 /// [Amazon Fargate]: https://aws.amazon.com/fargate
440 /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
441 pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
442 self.agent_metadata = Some(j);
443 self
444 }
445
446 /// Provide custom profiler options.
447 ///
448 /// ### Example
449 ///
450 /// This will sample allocations for every 10 megabytes allocated:
451 ///
452 /// ```
453 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
454 /// # use async_profiler_agent::profiler::SpawnError;
455 /// # fn main() -> Result<(), SpawnError> {
456 /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
457 /// let profiler = ProfilerBuilder::default()
458 /// .with_profiler_options(opts)
459 /// .with_local_reporter("/tmp/profiles")
460 /// .build();
461 /// # if false { // don't spawn the profiler in doctests
462 /// profiler.spawn()?;
463 /// # }
464 /// # Ok(())
465 /// # }
466 /// ```
467 pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
468 self.profiler_options = Some(c);
469 self
470 }
471
472 /// Turn this builder into a profiler!
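    ///
    /// # Panics
    ///
    /// Panics if no reporter was configured via [ProfilerBuilder::with_reporter]
    /// or [ProfilerBuilder::with_local_reporter].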
473 pub fn build(self) -> Profiler {
474 Profiler {
475 reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
476 reporter: self.reporter.expect("reporter is required"),
477 agent_metadata: self.agent_metadata,
478 profiler_options: self.profiler_options.unwrap_or_default(),
479 }
480 }
481}
482
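/// Tracks whether async-profiler is currently running. `Running` records when
/// the current profiling segment started; `Starting` marks the window where a
/// start was attempted but may not have completed.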
483enum Status {
484 Idle,
485 Starting,
486 Running(SystemTime),
487}
488
489/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
490/// of the state of the profiler. The primary benefit of this is RAII - when
491/// this type drops, it will stop the profiler if it's running.
492struct ProfilerState<E: ProfilerEngine> {
493 // this is only None in the destructor when stopping the async-profiler fails
494 jfr_file: Option<JfrFile>,
495 asprof: E,
496 status: Status,
497 profiler_options: ProfilerOptions,
498}
499
500impl<E: ProfilerEngine> ProfilerState<E> {
501 fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
502 Ok(Self {
503 jfr_file: Some(JfrFile::new()?),
504 asprof,
505 status: Status::Idle,
506 profiler_options,
507 })
508 }
509
510 fn jfr_file_mut(&mut self) -> &mut JfrFile {
511 self.jfr_file.as_mut().unwrap()
512 }
513
514 async fn start(&mut self) -> Result<(), AsProfError> {
515 let active = self.jfr_file.as_ref().unwrap().active_path();
516 // drop guard - make sure the files are leaked if the profiler might have started
517 self.status = Status::Starting;
518 E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
519 self.status = Status::Running(SystemTime::now());
520 Ok(())
521 }
522
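    // Stops async-profiler and returns the start time of the segment that was
    // just stopped, or None if the profiler wasn't actually running.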
523 fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
524 E::stop_async_profiler()?;
525 let status = std::mem::replace(&mut self.status, Status::Idle);
526 Ok(match status {
527 Status::Idle | Status::Starting => None,
528 Status::Running(since) => Some(since),
529 })
530 }
531
532 fn is_started(&self) -> bool {
533 matches!(self.status, Status::Running(_))
534 }
535}
536
537impl<E: ProfilerEngine> Drop for ProfilerState<E> {
538 fn drop(&mut self) {
539 match self.status {
540 Status::Running(_) => {
541 if let Err(err) = self.stop() {
542 // SECURITY: avoid removing the JFR file if stopping the profiler fails,
543 // to avoid symlink races
544 std::mem::forget(self.jfr_file.take());
545 // XXX: Rust defines leaking resources during drop as safe.
546 tracing::warn!(?err, "unable to stop profiler during drop glue");
547 }
548 }
549 Status::Idle => {}
550 Status::Starting => {
551 // SECURITY: avoid removing the JFR file if stopping the profiler fails,
552 // to avoid symlink races
553 std::mem::forget(self.jfr_file.take());
554 }
555 }
556 }
557}
558
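/// Abstraction over the async-profiler entry points used by the profiling
/// loop, so tests can drive it with a mock engine (see `MockProfilerEngine`
/// in the tests) instead of the real [`asprof::AsProf`] bindings.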
559pub(crate) trait ProfilerEngine: Send + Sync + 'static {
560 fn init_async_profiler() -> Result<(), asprof::AsProfError>;
561 fn start_async_profiler(
562 &self,
563 jfr_file_path: &Path,
564 options: &ProfilerOptions,
565 ) -> Result<(), asprof::AsProfError>;
566 fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
567}
568
569#[derive(Debug, Error)]
570#[non_exhaustive]
571enum TickError {
572 #[error(transparent)]
573 AsProf(#[from] AsProfError),
574 #[error(transparent)]
575 #[cfg(feature = "aws-metadata-no-defaults")]
576 Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
577 #[error("reporter: {0}")]
578 Reporter(Box<dyn std::error::Error + Send>),
579 #[error("broken clock: {0}")]
580 BrokenClock(#[from] SystemTimeError),
581 #[error("jfr read error: {0}")]
582 JfrRead(io::Error),
583 #[error("empty inactive file error: {0}")]
584 EmptyInactiveFile(io::Error),
585}
586
587#[derive(Debug, Error)]
588#[non_exhaustive]
589/// An error that happened spawning a profiler
590pub enum SpawnError {
591 /// Error from async-profiler
592 #[error(transparent)]
593 AsProf(#[from] asprof::AsProfError),
594 /// Error writing to a tempfile
595 #[error("tempfile error")]
596 TempFile(#[source] io::Error),
597}
598
599#[derive(Debug, Error)]
600#[non_exhaustive]
601/// An error from [`Profiler::spawn_thread`]
602pub enum SpawnThreadError {
603 /// Error from async-profiler
604 #[error(transparent)]
605 AsProf(#[from] SpawnError),
606 /// Error constructing Tokio runtime
607 #[error("constructing Tokio runtime")]
608 ConstructRt(#[source] io::Error),
609}
610
611// no control messages currently
612enum Control {}
613
614/// A handle to a running profiler
615///
616/// Currently just allows for stopping the profiler.
617///
/// Dropping this handle will request that the profiler stop.
619#[must_use = "dropping this stops the profiler, call .detach() to detach"]
620pub struct RunningProfiler {
621 stop_channel: tokio::sync::oneshot::Sender<Control>,
622 join_handle: tokio::task::JoinHandle<()>,
623}
624
625impl RunningProfiler {
626 /// Request that the current profiler stops and wait until it exits.
627 ///
628 /// This will cause the currently-pending profile information to be flushed.
629 ///
630 /// After this function returns, it is correct and safe to [spawn] a new
631 /// [Profiler], possibly with a different configuration. Therefore,
632 /// this function can be used to "reconfigure" a profiler by stopping
633 /// it and then starting a new one with a different configuration.
634 ///
635 /// [spawn]: Profiler::spawn_controllable
636 pub async fn stop(self) {
637 drop(self.stop_channel);
638 let _ = self.join_handle.await;
639 }
640
641 /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
642 fn detach_inner(self) -> tokio::task::JoinHandle<()> {
643 tokio::task::spawn(async move {
644 // move the control channel to the spawned task. this way, it will be dropped
645 // just when the task is aborted.
646 let _abort_channel = self.stop_channel;
647 self.join_handle.await.ok();
648 })
649 }
650
651 /// Detach this profiler. This will prevent the profiler from being stopped
652 /// when this handle is dropped. You should call this (or [Profiler::spawn]
653 /// instead of [Profiler::spawn_controllable], which does the same thing)
654 /// if you don't intend to reconfigure your profiler at runtime.
655 pub fn detach(self) {
656 self.detach_inner();
657 }
658
    /// Spawns this [RunningProfiler] onto a separate thread that drives the given
    /// Tokio runtime, and returns a [RunningProfilerThread] attached to it.
661 fn spawn_attached(
662 self,
663 runtime: tokio::runtime::Runtime,
664 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
665 ) -> RunningProfilerThread {
666 RunningProfilerThread {
667 stop_channel: self.stop_channel,
668 join_handle: spawn_fn(Box::new(move || {
669 let _ = runtime.block_on(self.join_handle);
670 })),
671 }
672 }
673
    /// Spawns this [RunningProfiler] onto a separate thread that drives the given
    /// Tokio runtime, and detaches it.
676 fn spawn_detached(
677 self,
678 runtime: tokio::runtime::Runtime,
679 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
680 ) {
681 spawn_fn(Box::new(move || {
682 let _stop_channel = self.stop_channel;
683 let _ = runtime.block_on(self.join_handle);
684 }));
685 }
686}
687
688/// A handle to a running profiler, running on a separate thread.
689///
690/// Currently just allows for stopping the profiler.
691///
/// Dropping this handle will request that the profiler stop.
693#[must_use = "dropping this stops the profiler, call .detach() to detach"]
694pub struct RunningProfilerThread {
695 stop_channel: tokio::sync::oneshot::Sender<Control>,
696 join_handle: std::thread::JoinHandle<()>,
697}
698
699impl RunningProfilerThread {
700 /// Request that the current profiler stops and wait until it exits.
701 ///
702 /// This will cause the currently-pending profile information to be flushed.
703 ///
704 /// After this function returns, it is correct and safe to [spawn] a new
705 /// [Profiler], possibly with a different configuration. Therefore,
706 /// this function can be used to "reconfigure" a profiler by stopping
707 /// it and then starting a new one with a different configuration.
708 ///
709 /// [spawn]: Profiler::spawn_controllable
710 pub fn stop(self) {
711 drop(self.stop_channel);
712 let _ = self.join_handle.join();
713 }
714}
715
716/// Rust profiler based on [async-profiler].
717///
718/// Spawning a profiler can be done either in an attached (controllable)
719/// mode, which allows for stopping the profiler (and, in fact, stops
720/// it when the relevant handle is dropped), or in detached mode,
721/// in which the profiler keeps running forever. Applications that can
722/// shut down the profiler at run-time, for example applications that
723/// support reconfiguration of a running profiler, generally want to use
724/// controllable mode. Other applications (most of them) should use
725/// detached mode.
726///
727/// In addition, the profiler can either be spawned into the current Tokio
728/// runtime, or into a new one. Normally, applications should spawn
729/// the profiler into their own Tokio runtime, but applications that
730/// don't have a default Tokio runtime should spawn it into a
/// different one.
732///
733/// This leaves 4 functions:
734/// 1. [Self::spawn] - detached, same runtime
735/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
736/// 3. [Self::spawn_controllable] - controllable, same runtime
737/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
738///
739/// In addition, there's a helper function that just spawns the profiler
740/// to a new runtime in a new thread, for applications that don't have
741/// a Tokio runtime and don't need complex control:
742///
743/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
744///
745/// [async-profiler]: https://github.com/async-profiler/async-profiler
746pub struct Profiler {
747 reporting_interval: Duration,
748 reporter: Box<dyn Reporter + Send + Sync>,
749 agent_metadata: Option<AgentMetadata>,
750 profiler_options: ProfilerOptions,
751}
752
753impl Profiler {
754 /// Start profiling. The profiler will run in a tokio task at the configured interval.
755 ///
756 /// This is the same as calling [Profiler::spawn_controllable] followed by
757 /// [RunningProfiler::detach], except it returns a [JoinHandle].
758 ///
759 /// The returned [JoinHandle] can be used to detect if the profiler has exited
760 /// due to a fatal error.
761 ///
762 /// This function will fail if it is unable to start async-profiler, for example
763 /// if it can't find or load `libasyncProfiler.so`.
764 ///
765 /// [JoinHandle]: tokio::task::JoinHandle
766 ///
767 /// ### Tokio Runtime
768 ///
769 /// This function must be run within a Tokio runtime, otherwise it will panic. If
770 /// your application does not have a `main` Tokio runtime, see
771 /// [Profiler::spawn_thread].
772 ///
773 /// ### Example
774 ///
775 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
776 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
777 ///
778 /// ```
779 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
780 /// # #[tokio::main]
781 /// # async fn main() -> Result<(), SpawnError> {
782 /// let profiler = ProfilerBuilder::default()
783 /// .with_local_reporter("/tmp/profiles")
784 /// .build();
785 /// # if false { // don't spawn the profiler in doctests
786 /// profiler.spawn()?;
787 /// # }
788 /// # Ok(())
789 /// # }
790 /// ```
791 pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
792 self.spawn_controllable().map(RunningProfiler::detach_inner)
793 }
794
795 /// Like [Self::spawn], but instead of spawning within the current Tokio
    /// runtime, spawns within the provided Tokio runtime and then runs a thread that calls
797 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
798 ///
799 /// If your configuration is standard, use [Profiler::spawn_thread].
800 ///
801 /// If you want to be able to stop the resulting profiler, use
802 /// [Profiler::spawn_controllable_thread_to_runtime].
803 ///
804 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
805 /// allow for configuring thread properties, for example thread names).
806 ///
807 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
808 ///
809 /// ### Example
810 ///
811 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
812 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
813 ///
814 /// ```no_run
815 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
816 /// let rt = tokio::runtime::Builder::new_current_thread()
817 /// .enable_all()
818 /// .build()?;
819 /// let profiler = ProfilerBuilder::default()
820 /// .with_local_reporter("/tmp/profiles")
821 /// .build();
822 ///
823 /// profiler.spawn_thread_to_runtime(
824 /// rt,
825 /// |t| {
826 /// std::thread::Builder::new()
827 /// .name("asprof-agent".to_owned())
828 /// .spawn(t)
829 /// .expect("thread name contains nuls")
830 /// }
831 /// )?;
832 /// # Ok::<_, anyhow::Error>(())
833 /// ```
834 pub fn spawn_thread_to_runtime(
835 self,
836 runtime: tokio::runtime::Runtime,
837 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
838 ) -> Result<(), SpawnError> {
839 self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
840 }
841
842 /// Like [Self::spawn], but instead of spawning within the current Tokio
843 /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
844 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
845 /// by itself.
846 ///
847 /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
848 /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
849 /// with the following:
850 /// 1. a current thread runtime with background worker threads (these exist
851 /// for blocking IO) named "asprof-worker"
852 /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
853 ///
854 /// If you want to be able to stop the resulting profiler, use
855 /// [Profiler::spawn_controllable_thread_to_runtime].
856 ///
857 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
858 ///
859 /// ### Example
860 ///
861 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
862 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
863 ///
864 /// ```no_run
865 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
866 /// # use async_profiler_agent::reporter::local::LocalReporter;
867 /// let profiler = ProfilerBuilder::default()
868 /// .with_local_reporter("/tmp/profiles")
869 /// .build();
870 ///
871 /// profiler.spawn_thread()?;
872 /// # Ok::<_, anyhow::Error>(())
873 /// ```
874 pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
875 // using "asprof" in thread name to deal with 15 character + \0 length limit
876 let rt = tokio::runtime::Builder::new_current_thread()
877 .thread_name("asprof-worker".to_owned())
878 .enable_all()
879 .build()
880 .map_err(SpawnThreadError::ConstructRt)?;
881 let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
882 self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
883 .map_err(SpawnThreadError::AsProf)
884 }
885
886 fn spawn_thread_inner<E: ProfilerEngine>(
887 self,
888 asprof: E,
889 runtime: tokio::runtime::Runtime,
890 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
891 ) -> Result<(), SpawnError> {
892 let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
893 handle.spawn_detached(runtime, spawn_fn);
894 Ok(())
895 }
896
897 /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
898 /// (currently only stopping) the profiler.
899 ///
900 /// This allows for changing the configuration of the profiler at runtime, by
901 /// stopping it and then starting a new Profiler with a new configuration. It
902 /// also allows for stopping profiling in case the profiler is suspected to
903 /// cause operational issues.
904 ///
905 /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
    /// so if your application doesn't need to change the profiler's configuration at runtime,
907 /// it will be easier to use [Profiler::spawn].
908 ///
909 /// This function will fail if it is unable to start async-profiler, for example
910 /// if it can't find or load `libasyncProfiler.so`.
911 ///
912 /// ### Tokio Runtime
913 ///
914 /// This function must be run within a Tokio runtime, otherwise it will panic. If
915 /// your application does not have a `main` Tokio runtime, see
916 /// [Profiler::spawn_controllable_thread_to_runtime].
917 ///
918 /// ### Example
919 ///
920 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
921 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
922 ///
923 /// ```no_run
924 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
925 /// # #[tokio::main]
926 /// # async fn main() -> Result<(), SpawnError> {
927 /// let profiler = ProfilerBuilder::default()
928 /// .with_local_reporter("/tmp/profiles")
929 /// .build();
930 ///
931 /// let profiler = profiler.spawn_controllable()?;
932 ///
933 /// // [insert your signaling/monitoring mechanism to have a request to disable
934 /// // profiling in case of a problem]
935 /// let got_request_to_disable_profiling = async move {
936 /// // ...
937 /// # false
938 /// };
939 /// // spawn a task that will disable profiling if requested
940 /// tokio::task::spawn(async move {
941 /// if got_request_to_disable_profiling.await {
942 /// profiler.stop().await;
943 /// }
944 /// });
945 /// # Ok(())
946 /// # }
947 /// ```
948 pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
949 self.spawn_inner(asprof::AsProf::builder().build())
950 }
951
952 /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
    /// runtime, spawns within the provided Tokio runtime and then runs a thread that calls
954 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
955 ///
956 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
957 /// allow for configuring thread properties, for example thread names).
958 ///
959 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
960 ///
961 /// ### Example
962 ///
963 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
964 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
965 ///
966 /// ```no_run
967 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
968 /// let rt = tokio::runtime::Builder::new_current_thread()
969 /// .enable_all()
970 /// .build()?;
971 /// let profiler = ProfilerBuilder::default()
972 /// .with_local_reporter("/tmp/profiles")
973 /// .build();
974 ///
975 /// let profiler = profiler.spawn_controllable_thread_to_runtime(
976 /// rt,
977 /// |t| {
978 /// std::thread::Builder::new()
979 /// .name("asprof-agent".to_owned())
980 /// .spawn(t)
981 /// .expect("thread name contains nuls")
982 /// }
983 /// )?;
984 ///
985 /// # fn got_request_to_disable_profiling() -> bool { false }
986 /// // spawn a task that will disable profiling if requested
987 /// std::thread::spawn(move || {
988 /// if got_request_to_disable_profiling() {
989 /// profiler.stop();
990 /// }
991 /// });
992 /// # Ok::<_, anyhow::Error>(())
993 /// ```
994 pub fn spawn_controllable_thread_to_runtime(
995 self,
996 runtime: tokio::runtime::Runtime,
997 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
998 ) -> Result<RunningProfilerThread, SpawnError> {
999 self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1000 }
1001
1002 fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1003 self,
1004 asprof: E,
1005 runtime: tokio::runtime::Runtime,
1006 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1007 ) -> Result<RunningProfilerThread, SpawnError> {
1008 let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1009 Ok(handle.spawn_attached(runtime, spawn_fn))
1010 }
1011
1012 fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
1013 // Initialize async profiler - needs to be done once.
1014 E::init_async_profiler()?;
1015 tracing::info!("successfully initialized async profiler.");
1016
1017 let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
1018 let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();
1019
1020 // Get profiles at the configured interval rate.
1021 let join_handle = tokio::spawn(async move {
1022 let mut state = match ProfilerState::new(asprof, self.profiler_options) {
1023 Ok(state) => state,
1024 Err(err) => {
1025 tracing::error!(?err, "unable to create profiler state");
1026 return;
1027 }
1028 };
1029
1030 // Lazily-loaded if not specified up front.
1031 let mut agent_metadata = self.agent_metadata;
1032 let mut done = false;
1033
1034 while !done {
1035 // Wait until a timer or exit event
1036 tokio::select! {
1037 biased;
1038
1039 r = &mut stop_rx, if !stop_rx.is_terminated() => {
1040 match r {
1041 Err(_) => {
1042 tracing::info!("profiler stop requested, doing a final tick");
1043 done = true;
1044 }
1045 }
1046 }
1047 _ = sampling_ticker.tick() => {
1048 tracing::debug!("profiler timer woke up");
1049 }
1050 }
1051
1052 if let Err(err) = profiler_tick(
1053 &mut state,
1054 &mut agent_metadata,
1055 &*self.reporter,
1056 self.reporting_interval,
1057 )
1058 .await
1059 {
1060 match &err {
1061 TickError::Reporter(_) => {
1062 // don't stop on IO errors
1063 tracing::error!(?err, "error during profiling, continuing");
1064 }
1065 _stop => {
1066 tracing::error!(?err, "error during profiling, stopping");
1067 break;
1068 }
1069 }
1070 }
1071 }
1072
1073 tracing::info!("profiling task finished");
1074 });
1075
1076 Ok(RunningProfiler {
1077 stop_channel,
1078 join_handle,
1079 })
1080 }
1081}
1082
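// A single reporting tick: the first tick only starts the profiler; each
// subsequent tick stops it, swaps the JFR double-buffer and restarts profiling
// immediately, then reads the now-inactive file and hands the collected data
// to the reporter along with the report metadata.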
1083async fn profiler_tick<E: ProfilerEngine>(
1084 state: &mut ProfilerState<E>,
1085 agent_metadata: &mut Option<AgentMetadata>,
1086 reporter: &(dyn Reporter + Send + Sync),
1087 reporting_interval: Duration,
1088) -> Result<(), TickError> {
1089 if !state.is_started() {
1090 state.start().await?;
1091 return Ok(());
1092 }
1093
1094 let Some(start) = state.stop()? else {
1095 tracing::warn!("stopped the profiler but it wasn't running?");
1096 return Ok(());
1097 };
1098 let start = start.duration_since(UNIX_EPOCH)?;
1099 let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
1100
1101 // Start it up immediately, writing to the "other" file, so that we keep
1102 // profiling the application while we're reporting data.
1103 state
1104 .jfr_file_mut()
1105 .empty_inactive_file()
1106 .map_err(TickError::EmptyInactiveFile)?;
1107 state.jfr_file_mut().swap();
1108 state.start().await?;
1109
1110 // Lazily load the agent metadata if it was not provided in
1111 // the constructor. See the struct comments for why this is.
1112 // This code runs at most once.
1113 if agent_metadata.is_none() {
1114 #[cfg(feature = "aws-metadata-no-defaults")]
1115 let md = crate::metadata::aws::load_agent_metadata().await?;
1116 #[cfg(not(feature = "aws-metadata-no-defaults"))]
1117 let md = crate::metadata::AgentMetadata::NoMetadata;
1118 tracing::debug!("loaded metadata");
1119 agent_metadata.replace(md);
1120 }
1121
1122 let report_metadata = ReportMetadata {
1123 instance: agent_metadata.as_ref().unwrap(),
1124 start,
1125 end,
1126 reporting_interval,
1127 };
1128
1129 let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
1130 .await
1131 .map_err(TickError::JfrRead)?;
1132
1133 reporter
1134 .report(jfr, &report_metadata)
1135 .await
1136 .map_err(TickError::Reporter)?;
1137
1138 Ok(())
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143 use std::sync::Arc;
1144 use std::sync::atomic::{self, AtomicBool, AtomicU32};
1145
1146 use test_case::test_case;
1147
1148 use super::*;
1149
1150 #[test]
    fn test_jfr_file_swap() {
1152 let mut jfr = JfrFile::new().unwrap();
1153
1154 std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
1155 jfr.swap();
1156 assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
1157 jfr.empty_inactive_file().unwrap();
1158 assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
1159 }
1160
1161 struct MockProfilerEngine {
1162 counter: AtomicU32,
1163 }
1164 impl ProfilerEngine for MockProfilerEngine {
1165 fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1166 Ok(())
1167 }
1168
1169 fn start_async_profiler(
1170 &self,
1171 jfr_file_path: &Path,
1172 _options: &ProfilerOptions,
1173 ) -> Result<(), asprof::AsProfError> {
1174 let contents = format!(
1175 "JFR{}",
1176 self.counter.fetch_add(1, atomic::Ordering::Relaxed)
1177 );
1178 std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
1179 Ok(())
1180 }
1181
1182 fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1183 Ok(())
1184 }
1185 }
1186
1187 struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
1188 impl std::fmt::Debug for MockReporter {
1189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1190 f.debug_struct("MockReporter").finish()
1191 }
1192 }
1193
1194 #[async_trait::async_trait]
1195 impl Reporter for MockReporter {
1196 async fn report(
1197 &self,
1198 jfr: Vec<u8>,
1199 metadata: &ReportMetadata,
1200 ) -> Result<(), Box<dyn std::error::Error + Send>> {
1201 self.0
1202 .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
1203 .await
1204 .unwrap();
1205 Ok(())
1206 }
1207 }
1208
1209 fn make_mock_profiler() -> (
1210 Profiler,
1211 tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
1212 ) {
1213 let (tx, rx) = tokio::sync::mpsc::channel(1);
1214 let agent = ProfilerBuilder::default()
1215 .with_reporter(MockReporter(tx))
1216 .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
1217 aws_account_id: "0".into(),
1218 aws_region_id: "us-east-1".into(),
1219 ec2_instance_id: "i-fake".into(),
1220 #[cfg(feature = "__unstable-fargate-cpu-count")]
1221 ec2_instance_type: "t3.micro".into(),
1222 })
1223 .build();
1224 (agent, rx)
1225 }
1226
1227 #[tokio::test(start_paused = true)]
1228 async fn test_profiler_agent() {
1229 let e_md = AgentMetadata::Ec2AgentMetadata {
1230 aws_account_id: "0".into(),
1231 aws_region_id: "us-east-1".into(),
1232 ec2_instance_id: "i-fake".into(),
1233 #[cfg(feature = "__unstable-fargate-cpu-count")]
1234 ec2_instance_type: "t3.micro".into(),
1235 };
1236 let (agent, mut rx) = make_mock_profiler();
1237 agent
1238 .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1239 counter: AtomicU32::new(0),
1240 })
1241 .unwrap()
1242 .detach();
1243 let (jfr, md) = rx.recv().await.unwrap();
1244 assert_eq!(jfr, "JFR0");
1245 assert_eq!(e_md, md);
1246 let (jfr, md) = rx.recv().await.unwrap();
1247 assert_eq!(jfr, "JFR1");
1248 assert_eq!(e_md, md);
1249 }
1250
1251 #[test_case(false; "uncontrollable")]
1252 #[test_case(true; "controllable")]
1253 fn test_profiler_local_rt(controllable: bool) {
1254 let e_md = AgentMetadata::Ec2AgentMetadata {
1255 aws_account_id: "0".into(),
1256 aws_region_id: "us-east-1".into(),
1257 ec2_instance_id: "i-fake".into(),
1258 #[cfg(feature = "__unstable-fargate-cpu-count")]
1259 ec2_instance_type: "t3.micro".into(),
1260 };
1261 let (agent, mut rx) = make_mock_profiler();
1262 let rt = tokio::runtime::Builder::new_current_thread()
1263 .enable_all()
1264 .start_paused(true)
1265 .build()
1266 .unwrap();
1267 // spawn the profiler, doing this before spawning a thread to allow
1268 // capturing errors from `spawn`
1269 let handle = if controllable {
1270 Some(
1271 agent
1272 .spawn_controllable_thread_inner::<MockProfilerEngine>(
1273 MockProfilerEngine {
1274 counter: AtomicU32::new(0),
1275 },
1276 rt,
1277 std::thread::spawn,
1278 )
1279 .unwrap(),
1280 )
1281 } else {
1282 agent
1283 .spawn_thread_inner::<MockProfilerEngine>(
1284 MockProfilerEngine {
1285 counter: AtomicU32::new(0),
1286 },
1287 rt,
1288 std::thread::spawn,
1289 )
1290 .unwrap();
1291 None
1292 };
1293
1294 let (jfr, md) = rx.blocking_recv().unwrap();
1295 assert_eq!(jfr, "JFR0");
1296 assert_eq!(e_md, md);
1297 let (jfr, md) = rx.blocking_recv().unwrap();
1298 assert_eq!(jfr, "JFR1");
1299 assert_eq!(e_md, md);
1300
1301 if let Some(handle) = handle {
            let drain_thread =
                std::thread::spawn(move || while rx.blocking_recv().is_some() {});
1304 // request a stop
1305 handle.stop();
1306 // the drain thread should be done
1307 drain_thread.join().unwrap();
1308 }
1309 }
1310
1311 enum StopKind {
        Deliberate,
1313 Drop,
1314 Abort,
1315 }
1316
1317 #[tokio::test(start_paused = true)]
    #[test_case(StopKind::Deliberate; "deliberate stop")]
1319 #[test_case(StopKind::Drop; "drop stop")]
1320 #[test_case(StopKind::Abort; "abort stop")]
1321 async fn test_profiler_stop(stop_kind: StopKind) {
1322 let e_md = AgentMetadata::Ec2AgentMetadata {
1323 aws_account_id: "0".into(),
1324 aws_region_id: "us-east-1".into(),
1325 ec2_instance_id: "i-fake".into(),
1326 #[cfg(feature = "__unstable-fargate-cpu-count")]
1327 ec2_instance_type: "t3.micro".into(),
1328 };
1329 let (agent, mut rx) = make_mock_profiler();
1330 let profiler_ref = agent
1331 .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1332 counter: AtomicU32::new(0),
1333 })
1334 .unwrap();
1335 let (jfr, md) = rx.recv().await.unwrap();
1336 assert_eq!(jfr, "JFR0");
1337 assert_eq!(e_md, md);
1338 let (jfr, md) = rx.recv().await.unwrap();
1339 assert_eq!(jfr, "JFR1");
1340 assert_eq!(e_md, md);
1341 // check that stop is faster than an interval and returns an "immediate" next jfr
1342 match stop_kind {
1343 StopKind::Drop => drop(profiler_ref),
            StopKind::Deliberate => {
1345 tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
1346 .await
1347 .unwrap();
1348 }
1349 StopKind::Abort => {
1350 // You can call Abort on the JoinHandle. make sure that is not buggy.
1351 profiler_ref.detach_inner().abort();
1352 }
1353 }
        // check that we get the next JFR "quickly", and that nothing more is reported after it.
1355 let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
1356 .await
1357 .unwrap()
1358 .unwrap();
1359 assert_eq!(jfr, "JFR2");
1360 assert_eq!(e_md, md);
1361 assert!(rx.recv().await.is_none());
1362 }
1363
1364 // simulate a badly-behaved profiler that errors on start/stop and then
1365 // tries to access the JFR file
1366 struct StopErrorProfilerEngine {
1367 start_error: bool,
1368 counter: Arc<AtomicBool>,
1369 }
1370 impl ProfilerEngine for StopErrorProfilerEngine {
1371 fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1372 Ok(())
1373 }
1374
1375 fn start_async_profiler(
1376 &self,
1377 jfr_file_path: &Path,
1378 _options: &ProfilerOptions,
1379 ) -> Result<(), asprof::AsProfError> {
1380 let jfr_file_path = jfr_file_path.to_owned();
1381 std::fs::write(&jfr_file_path, "JFR").unwrap();
1382 let counter = self.counter.clone();
1383 tokio::task::spawn(async move {
1384 tokio::time::sleep(Duration::from_secs(5)).await;
1385 assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
1386 counter.store(true, atomic::Ordering::Release);
1387 });
1388 if self.start_error {
1389 Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1390 } else {
1391 Ok(())
1392 }
1393 }
1394
1395 fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1396 Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1397 }
1398 }
1399
1400 #[tokio::test(start_paused = true)]
1401 #[test_case(false; "error on stop")]
1402 #[test_case(true; "error on start")]
1403 async fn test_profiler_error(start_error: bool) {
1404 let (agent, mut rx) = make_mock_profiler();
1405 let counter = Arc::new(AtomicBool::new(false));
1406 let engine = StopErrorProfilerEngine {
1407 start_error,
1408 counter: counter.clone(),
1409 };
1410 let handle = agent.spawn_inner(engine).unwrap().detach_inner();
1411 assert!(rx.recv().await.is_none());
1412 // check that the "sleep 5" step in start_async_profiler succeeds
1413 for _ in 0..100 {
1414 tokio::time::sleep(Duration::from_secs(1)).await;
1415 if counter.load(atomic::Ordering::Acquire) {
1416 handle.await.unwrap(); // Check that the JoinHandle is done
1417 return;
1418 }
1419 }
1420 panic!("didn't read from file");
1421 }
1422
1423 #[test]
1424 fn test_profiler_options_to_args_string_default() {
1425 let opts = ProfilerOptions::default();
1426 let dummy_path = Path::new("/tmp/test.jfr");
1427 let args = opts.to_args_string(dummy_path);
1428 assert!(
1429 args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
1430 "Default args string not constructed correctly"
1431 );
1432 assert!(args.contains("file=/tmp/test.jfr"));
1433 assert!(!args.contains("nativemem="));
1434 }
1435
1436 #[test]
1437 fn test_profiler_options_to_args_string_with_native_mem() {
1438 let opts = ProfilerOptions {
1439 native_mem: Some("10m".to_string()),
1440 wall_clock_millis: None,
1441 cpu_interval: None,
1442 };
1443 let dummy_path = Path::new("/tmp/test.jfr");
1444 let args = opts.to_args_string(dummy_path);
1445 assert!(args.contains("nativemem=10m"));
1446 }
1447
1448 #[test]
1449 fn test_profiler_options_builder() {
1450 let opts = ProfilerOptionsBuilder::default()
1451 .with_native_mem_bytes(5000000)
1452 .build();
1453
1454 assert_eq!(opts.native_mem, Some("5000000".to_string()));
1455 }
1456
1457 #[test]
1458 fn test_profiler_options_builder_all_options() {
1459 let opts = ProfilerOptionsBuilder::default()
1460 .with_native_mem_bytes(5000000)
1461 .with_cpu_interval(Duration::from_secs(1))
1462 .with_wall_clock_interval(Duration::from_secs(10))
1463 .build();
1464
1465 let dummy_path = Path::new("/tmp/test.jfr");
1466 let args = opts.to_args_string(dummy_path);
1467 assert_eq!(
1468 args,
1469 "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
1470 );
1471 }
1472
1473 #[test]
1474 fn test_local_reporter_has_no_metadata() {
1475 // Check that with_local_reporter sets some configuration
1476 let reporter = ProfilerBuilder::default().with_local_reporter(".");
1477 assert_eq!(
1478 format!("{:?}", reporter.reporter),
1479 r#"Some(LocalReporter { directory: "." })"#
1480 );
1481 match reporter.agent_metadata {
1482 Some(AgentMetadata::NoMetadata) => {}
1483 bad => panic!("{bad:?}"),
1484 };
1485 }
1486}