async_profiler_agent/
profiler.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
5
6use crate::{
7    asprof::{self, AsProfError},
8    metadata::{AgentMetadata, ReportMetadata},
9    reporter::{local::LocalReporter, Reporter},
10};
11use std::{
12    fs::File,
13    io,
14    path::{Path, PathBuf},
15    time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
/// The two JFR output files the agent alternates between.
///
/// `active` is the file whose path is handed to the profiler when it is
/// started, `inactive` is the one that gets truncated by
/// `empty_inactive_file`; `swap` exchanges the two roles.
struct JfrFile {
    /// File currently designated as the profiler's output.
    active: File,
    /// The other file of the pair, not currently designated as output.
    inactive: File,
}
23
24impl JfrFile {
25    #[cfg(target_os = "linux")]
26    fn new() -> Result<Self, io::Error> {
27        Ok(Self {
28            active: tempfile::tempfile().unwrap(),
29            inactive: tempfile::tempfile().unwrap(),
30        })
31    }
32
33    #[cfg(not(target_os = "linux"))]
34    fn new() -> Result<Self, io::Error> {
35        io::Error::new(
36            io::ErrorKind::Other,
37            "async-profiler is only supported on Linux",
38        )
39    }
40
41    fn swap(&mut self) {
42        std::mem::swap(&mut self.active, &mut self.inactive);
43    }
44
45    #[cfg(target_os = "linux")]
46    fn file_path(file: &std::fs::File) -> PathBuf {
47        use std::os::fd::AsRawFd;
48
49        format!("/proc/self/fd/{}", file.as_raw_fd()).into()
50    }
51
52    #[cfg(not(target_os = "linux"))]
53    fn file_path(_file: &std::fs::File) -> PathBuf {
54        unimplemented!()
55    }
56
57    fn active_path(&self) -> PathBuf {
58        Self::file_path(&self.active)
59    }
60
61    fn inactive_path(&self) -> PathBuf {
62        Self::file_path(&self.inactive)
63    }
64
65    fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
66        // Empty the file, or create it for the first time if the profiler hasn't
67        // started yet.
68        File::create(Self::file_path(&self.inactive))?;
69        tracing::debug!(message = "emptied the file");
70        Ok(())
71    }
72}
73
/// Options for configuring the async-profiler behavior.
///
/// Currently covers:
/// - native memory allocation tracking (`native_mem`)
/// - the CPU-time sampling interval
/// - the wall-clock sampling interval
///
/// The two intervals are private fields; unset values fall back to the
/// defaults defined next to this type.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    /// For example, `"10m"` will sample an allocation for every
    /// 10 megabytes of memory allocated. Passing `"0"` will sample
    /// all allocations.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    /// CPU-time sampling interval in nanoseconds, emitted as `interval=`.
    cpu_interval: Option<u128>,
    /// Wall-clock sampling interval in milliseconds, emitted as `wall=…ms`.
    wall_clock_millis: Option<u128>,
}

/// Default CPU-time sampling interval: 100 milliseconds, in nanoseconds.
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
/// Default wall-clock sampling interval: one second, in milliseconds.
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;

impl ProfilerOptions {
    /// Renders these options as the comma-separated argument string that
    /// starts async-profiler, writing JFR output to `jfr_file_path`.
    ///
    /// Unset intervals use the defaults; `,nativemem=<value>` is appended
    /// only when `native_mem` is set.
    pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
        let cpu_nanos = self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS);
        let wall_millis = self
            .wall_clock_millis
            .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS);

        let mut rendered = format!(
            "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
            cpu_nanos,
            wall_millis,
            jfr_file_path.display()
        );
        if let Some(native_mem) = self.native_mem.as_deref() {
            rendered.push_str(",nativemem=");
            rendered.push_str(native_mem);
        }
        rendered
    }
}
116
/// Builder for [`ProfilerOptions`].
///
/// Every field starts out unset (`None`); `build` copies the fields into a
/// [`ProfilerOptions`] unchanged.
#[derive(Debug, Default)]
pub struct ProfilerOptionsBuilder {
    /// Native-memory sampling interval, as the string passed to async-profiler.
    native_mem: Option<String>,
    /// CPU-time sampling interval, stored in nanoseconds.
    cpu_interval: Option<u128>,
    /// Wall-clock sampling interval, stored in milliseconds.
    wall_clock_millis: Option<u128>,
}
124
125impl ProfilerOptionsBuilder {
126    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass
127    /// the string input directly to async_profiler.
128    ///
129    /// The value is the interval in bytes or in other units,
130    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
131    ///
132    /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
133    /// type-checked.
134    ///
135    /// ### Examples
136    ///
137    /// This will sample allocations for every 10 megabytes allocated:
138    ///
139    /// ```
140    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
141    /// # use async_profiler_agent::profiler::SpawnError;
142    /// # fn main() -> Result<(), SpawnError> {
143    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
144    /// let profiler = ProfilerBuilder::default()
145    ///     .with_profiler_options(opts)
146    ///     .with_local_reporter("/tmp/profiles")
147    ///     .build();
148    /// # if false { // don't spawn the profiler in doctests
149    /// profiler.spawn()?;
150    /// # }
151    /// # Ok(())
152    /// # }
153    /// ```
154    pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
155        self.native_mem = Some(native_mem_interval);
156        self
157    }
158
159    /// If set, the profiler will collect information about
160    /// native memory allocations.
161    ///
    /// The argument passed is the profiling interval: the profiler will
    /// sample an allocation roughly once per that many bytes allocated.
164    ///
165    /// See [ProfilingModes in the async-profiler docs] for more details.
166    ///
167    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
168    ///
169    /// ### Examples
170    ///
171    /// This will sample allocations for every 10 megabytes allocated:
172    ///
173    /// ```
174    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
175    /// # use async_profiler_agent::profiler::SpawnError;
176    /// # fn main() -> Result<(), SpawnError> {
177    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
178    /// let profiler = ProfilerBuilder::default()
179    ///     .with_profiler_options(opts)
180    ///     .with_local_reporter("/tmp/profiles")
181    ///     .build();
182    /// # if false { // don't spawn the profiler in doctests
183    /// profiler.spawn()?;
184    /// # }
185    /// # Ok(())
186    /// # }
187    /// ```
188    ///
189    /// This will sample every allocation (potentially slow):
190    /// ```
191    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
192    /// # use async_profiler_agent::profiler::SpawnError;
193    /// # fn main() -> Result<(), SpawnError> {
194    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
195    /// let profiler = ProfilerBuilder::default()
196    ///     .with_profiler_options(opts)
197    ///     .with_local_reporter("/tmp/profiles")
198    ///     .build();
199    /// # if false { // don't spawn the profiler in doctests
200    /// profiler.spawn()?;
201    /// # }
202    /// # Ok(())
203    /// # }
204    /// ```
205    pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
206        self.native_mem = Some(native_mem_interval.to_string());
207        self
208    }
209
210    /// Sets the interval in which the profiler will collect
211    /// CPU-time samples, via the [async-profiler `interval` option].
212    ///
213    /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
214    /// are currently running on a CPU, not threads that are sleeping.
215    ///
216    /// It can use a higher frequency than wall-clock sampling since the
217    /// number of the threads that are running on a CPU at a given time is
218    /// naturally limited by the number of CPUs, while the number of sleeping
219    /// threads can be much larger.
220    ///
221    /// The default is to do a CPU-time sample every 100 milliseconds.
222    ///
223    /// The async-profiler agent collects both CPU time and wall-clock time
224    /// samples, so this function should normally be used along with
225    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
226    ///
227    /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
228    ///
229    /// ### Examples
230    ///
    /// This will take a CPU-time sample every 10 milliseconds (when running)
    /// and a wall-clock sample every 100 milliseconds (running or sleeping):
233    ///
234    /// ```
235    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
236    /// # use async_profiler_agent::profiler::SpawnError;
237    /// # use std::time::Duration;
238    /// # fn main() -> Result<(), SpawnError> {
239    /// let opts = ProfilerOptionsBuilder::default()
240    ///     .with_cpu_interval(Duration::from_millis(10))
241    ///     .with_wall_clock_interval(Duration::from_millis(100))
242    ///     .build();
243    /// let profiler = ProfilerBuilder::default()
244    ///     .with_profiler_options(opts)
245    ///     .with_local_reporter("/tmp/profiles")
246    ///     .build();
247    /// # if false { // don't spawn the profiler in doctests
248    /// profiler.spawn()?;
249    /// # }
250    /// # Ok(())
251    /// # }
252    /// ```
253    pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
254        self.cpu_interval = Some(cpu_interval.as_nanos());
255        self
256    }
257
258    /// Sets the interval, in milliseconds, in which the profiler will collect
259    /// wall-clock samples, via the [async-profiler `wall` option].
260    ///
261    /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
262    /// whether they are sleeping or running, and can therefore be
263    /// very useful for finding threads that are blocked, for example
264    /// on a synchronous lock or a slow system call.
265    ///
266    /// When using Tokio, since tasks are not threads, tasks that are not
267    /// currently running will not be sampled by a wall clock sample. However,
268    /// a wall clock sample is still very useful in Tokio, since it is what
269    /// you want to catch tasks that are blocking a thread by waiting on
270    /// synchronous operations.
271    ///
272    /// The default is to do a wall-clock sample every second.
273    ///
274    /// The async-profiler agent collects both CPU time and wall-clock time
275    /// samples, so this function should normally be used along with
276    /// [ProfilerOptionsBuilder::with_cpu_interval].
277    ///
278    /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
279    ///
280    /// ### Examples
281    ///
    /// This will take a CPU-time sample every 10 milliseconds (when running)
    /// and a wall-clock sample every 100 milliseconds (running or sleeping):
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # use std::time::Duration;
    /// # fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default()
    ///     .with_cpu_interval(Duration::from_millis(10))
    ///     .with_wall_clock_interval(Duration::from_millis(100))
293    ///     .build();
294    /// let profiler = ProfilerBuilder::default()
295    ///     .with_profiler_options(opts)
296    ///     .with_local_reporter("/tmp/profiles")
297    ///     .build();
298    /// # if false { // don't spawn the profiler in doctests
299    /// profiler.spawn()?;
300    /// # }
301    /// # Ok(())
302    /// # }
303    /// ```
304    pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
305        self.wall_clock_millis = Some(wall_clock.as_millis());
306        self
307    }
308
309    /// Build the [`ProfilerOptions`] from the builder.
310    pub fn build(self) -> ProfilerOptions {
311        ProfilerOptions {
312            native_mem: self.native_mem,
313            wall_clock_millis: self.wall_clock_millis,
314            cpu_interval: self.cpu_interval,
315        }
316    }
317}
318
/// Builds a [`Profiler`], panicking if any required fields were not set by the
/// time `build` is called.
///
/// The reporter is the only required field; the others have fallbacks that
/// are applied in `build`.
#[derive(Debug, Default)]
pub struct ProfilerBuilder {
    /// How often collected samples are reported; `build` falls back to 30 seconds.
    reporting_interval: Option<Duration>,
    /// Destination for the collected profiles; `build` panics if this is unset.
    reporter: Option<Box<dyn Reporter + Send + Sync>>,
    /// Metadata supplied by the caller; handed to the `Profiler` as-is, including `None`.
    agent_metadata: Option<AgentMetadata>,
    /// async-profiler options; `build` falls back to `ProfilerOptions::default()`.
    profiler_options: Option<ProfilerOptions>,
}
328
329impl ProfilerBuilder {
330    /// Sets the reporting interval (default: 30 seconds).
331    ///
332    /// This is the interval that samples are *reported* to the backend,
333    /// and is unrelated to the interval at which the application
334    /// is *sampled* by async profiler, which is controlled by
335    /// [ProfilerOptionsBuilder::with_cpu_interval] and
336    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
337    ///
338    /// Most users should not change this setting.
339    ///
340    /// ## Example
341    ///
342    /// ```no_run
343    /// # use std::path::PathBuf;
344    /// # use std::time::Duration;
345    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
346    /// # let path = PathBuf::from(".");
347    /// let agent = ProfilerBuilder::default()
348    ///     .with_local_reporter(path)
349    ///     .with_reporting_interval(Duration::from_secs(15))
350    ///     .build()
351    ///     .spawn()?;
352    /// # Ok::<_, SpawnError>(())
353    /// ```
354    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
355        self.reporting_interval = Some(i);
356        self
357    }
358
359    /// Sets the [`Reporter`], which is used to upload the collected profiling
360    /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
361    /// feature enabled,
362    #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
363    #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
364    /// It is also possible to write your own [`Reporter`].
365    ///
366    /// It's normally easier to use [`LocalReporter`] directly via
367    /// [`ProfilerBuilder::with_local_reporter`].
368    ///
369    /// If you want to output to multiple reporters, you can use
370    /// [`MultiReporter`].
371    ///
372    /// [`LocalReporter`]: crate::reporter::local::LocalReporter
373    /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
374    #[cfg_attr(
375        feature = "s3-no-defaults",
376        doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
377    )]
378    ///
379    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
380    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
381        self.reporter = Some(Box::new(r));
382        self
383    }
384
    /// Sets the profiler to use [LocalReporter], which will write `.jfr` files to `path`,
386    /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
387    /// since the [LocalReporter] does not need that.
388    ///
389    /// This is useful for testing, since metadata auto-detection currently only works
390    /// on [Amazon EC2] or [Amazon Fargate] instances.
391    ///
392    /// The local reporter should normally not be used in production, since it will
393    /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
394    /// or write your own (see [`ProfilerBuilder::with_reporter`]).
395    ///
396    /// [Amazon EC2]: https://aws.amazon.com/ec2
397    /// [Amazon Fargate]: https://aws.amazon.com/fargate
398    ///
399    /// ## Example
400    ///
401    /// This will write profiles as `.jfr` files to `./path-to-profiles`:
402    ///
403    /// ```no_run
404    /// # use std::path::PathBuf;
405    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
406    /// # use async_profiler_agent::reporter::local::LocalReporter;
407    /// # use async_profiler_agent::metadata::AgentMetadata;
408    /// let path = PathBuf::from("./path-to-profiles");
409    /// let agent = ProfilerBuilder::default()
410    ///     .with_local_reporter(path)
411    ///     .build()
412    ///     .spawn()?;
413    /// # Ok::<_, SpawnError>(())
414    /// ```
415    pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
416        self.reporter = Some(Box::new(LocalReporter::new(path.into())));
417        self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
418    }
419
420    /// Provide custom agent metadata.
421    ///
422    /// The async-profiler Rust agent sends metadata to the [Reporter] with
423    /// the identity of the current host and process, which is normally
424    /// transmitted as `metadata.json` within the generated `.zip` file,
425    /// using the schema format [`reporter::s3::MetadataJson`].
426    ///
427    /// That metadata can later be used by tooling to be able to sort
428    /// profiling reports by host.
429    ///
430    /// async-profiler Rust agent will by default try to fetch the metadata
431    /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
432    /// will error if it's unable to find it. If you are running the
433    /// async-profiler agent on any other form of compute,
434    /// you will need to create and attach your own metadata
435    /// by calling this function.
436    ///
437    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
438    /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
439    /// [Amazon EC2]: https://aws.amazon.com/ec2
440    /// [Amazon Fargate]: https://aws.amazon.com/fargate
441    /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
442    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
443        self.agent_metadata = Some(j);
444        self
445    }
446
447    /// Provide custom profiler options.
448    ///
449    /// ### Example
450    ///
451    /// This will sample allocations for every 10 megabytes allocated:
452    ///
453    /// ```
454    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
455    /// # use async_profiler_agent::profiler::SpawnError;
456    /// # fn main() -> Result<(), SpawnError> {
457    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
458    /// let profiler = ProfilerBuilder::default()
459    ///     .with_profiler_options(opts)
460    ///     .with_local_reporter("/tmp/profiles")
461    ///     .build();
462    /// # if false { // don't spawn the profiler in doctests
463    /// profiler.spawn()?;
464    /// # }
465    /// # Ok(())
466    /// # }
467    /// ```
468    pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
469        self.profiler_options = Some(c);
470        self
471    }
472
473    /// Turn this builder into a profiler!
474    pub fn build(self) -> Profiler {
475        Profiler {
476            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
477            reporter: self.reporter.expect("reporter is required"),
478            agent_metadata: self.agent_metadata,
479            profiler_options: self.profiler_options.unwrap_or_default(),
480        }
481    }
482}
483
/// Lifecycle state tracked by `ProfilerState`.
enum Status {
    /// The profiler is not running: the initial state, and the state after a
    /// successful stop.
    Idle,
    /// A start has been requested but has not (yet) completed successfully.
    /// This is set before the engine's start call and is left in place if
    /// that call fails.
    Starting,
    /// The profiler was started successfully; carries the time recorded right
    /// after the start call returned.
    Running(SystemTime),
}
489
/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
/// of the state of the profiler. The primary benefit of this is RAII - when
/// this type drops, it will stop the profiler if it's running.
struct ProfilerState<E: ProfilerEngine> {
    // This is only `None` inside the destructor, after the files have been
    // deliberately leaked: either stopping the async-profiler failed, or a
    // start was attempted and never completed successfully.
    jfr_file: Option<JfrFile>,
    // Engine instance on which `start_async_profiler` is invoked.
    asprof: E,
    // Current lifecycle state; decides what the destructor has to do.
    status: Status,
    // Options passed to the engine on every start.
    profiler_options: ProfilerOptions,
}
500
501impl<E: ProfilerEngine> ProfilerState<E> {
502    fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
503        Ok(Self {
504            jfr_file: Some(JfrFile::new()?),
505            asprof,
506            status: Status::Idle,
507            profiler_options,
508        })
509    }
510
511    fn jfr_file_mut(&mut self) -> &mut JfrFile {
512        self.jfr_file.as_mut().unwrap()
513    }
514
515    async fn start(&mut self) -> Result<(), AsProfError> {
516        let active = self.jfr_file.as_ref().unwrap().active_path();
517        // drop guard - make sure the files are leaked if the profiler might have started
518        self.status = Status::Starting;
519        E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
520        self.status = Status::Running(SystemTime::now());
521        Ok(())
522    }
523
524    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
525        E::stop_async_profiler()?;
526        let status = std::mem::replace(&mut self.status, Status::Idle);
527        Ok(match status {
528            Status::Idle | Status::Starting => None,
529            Status::Running(since) => Some(since),
530        })
531    }
532
533    fn is_started(&self) -> bool {
534        matches!(self.status, Status::Running(_))
535    }
536}
537
impl<E: ProfilerEngine> Drop for ProfilerState<E> {
    /// Stops the profiler if it is running.
    ///
    /// Whenever the profiler cannot be shown to be stopped, the JFR files are
    /// leaked with `mem::forget` instead of being dropped.
    fn drop(&mut self) {
        match self.status {
            Status::Running(_) => {
                if let Err(err) = self.stop() {
                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
                    // to avoid symlink races
                    std::mem::forget(self.jfr_file.take());
                    // XXX: Rust defines leaking resources during drop as safe.
                    tracing::warn!(?err, "unable to stop profiler during drop glue");
                }
            }
            // Nothing is running, so the files can be dropped normally.
            Status::Idle => {}
            Status::Starting => {
                // SECURITY: a start was attempted but never completed successfully,
                // so the profiler might still have started. Avoid removing the JFR
                // file in that case, to avoid symlink races
                std::mem::forget(self.jfr_file.take());
            }
        }
    }
}
559
/// Abstraction over the profiler operations that `ProfilerState` relies on.
///
/// `ProfilerState` is generic over this trait, so a different engine can be
/// substituted for the real async-profiler bindings.
pub(crate) trait ProfilerEngine: Send + Sync + 'static {
    /// Initializes the engine.
    // NOTE(review): the call site is outside this part of the file — presumably
    // invoked once before the first start; confirm.
    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
    /// Starts profiling with `options`, writing JFR output to `jfr_file_path`.
    fn start_async_profiler(
        &self,
        jfr_file_path: &Path,
        options: &ProfilerOptions,
    ) -> Result<(), asprof::AsProfError>;
    /// Stops profiling. Takes no receiver, so it does not depend on any
    /// particular engine value.
    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}
569
/// Internal (non-public) error type.
// NOTE(review): judging by the name, this covers failures of a single
// profiling/reporting tick; the code producing it is outside this part of
// the file — confirm.
#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
    /// Error from async-profiler; displayed as the inner error.
    #[error(transparent)]
    AsProf(#[from] AsProfError),
    /// Error from the AWS metadata module; only present with the
    /// `aws-metadata-no-defaults` feature.
    #[error(transparent)]
    #[cfg(feature = "aws-metadata-no-defaults")]
    Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
    /// Error returned by the reporter, boxed because its concrete type is
    /// not known here.
    #[error("reporter: {0}")]
    Reporter(Box<dyn std::error::Error + Send>),
    /// A `SystemTime` computation failed.
    #[error("broken clock: {0}")]
    BrokenClock(#[from] SystemTimeError),
    /// I/O error while reading the JFR data.
    #[error("jfr read error: {0}")]
    JfrRead(io::Error),
    /// I/O error while emptying the inactive JFR file.
    #[error("empty inactive file error: {0}")]
    EmptyInactiveFile(io::Error),
}
587
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error that happened spawning a profiler
///
/// The enum is `#[non_exhaustive]`, so matches on it need a wildcard arm.
pub enum SpawnError {
    /// Error from async-profiler
    #[error(transparent)]
    AsProf(#[from] asprof::AsProfError),
    /// Error writing to a tempfile
    ///
    /// The underlying I/O error is available as the error's source.
    #[error("tempfile error")]
    TempFile(#[source] io::Error),
}
599
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error from [`Profiler::spawn_thread`]
///
/// The enum is `#[non_exhaustive]`, so matches on it need a wildcard arm.
pub enum SpawnThreadError {
    /// Error from async-profiler
    ///
    /// Wraps the [`SpawnError`] and displays it transparently.
    #[error(transparent)]
    AsProf(#[from] SpawnError),
    /// Error constructing Tokio runtime
    ///
    /// The underlying I/O error is available as the error's source.
    #[error("constructing Tokio runtime")]
    ConstructRt(#[source] io::Error),
}
611
// no control messages currently
//
// The enum has no variants, so no value of it can be constructed and nothing
// can be sent on a channel carrying it. The handles in this file only ever
// drop their sender.
enum Control {}
614
/// A handle to a running profiler
///
/// Currently just allows for stopping the profiler.
///
/// Dropping this handle requests that the profiler stop.
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfiler {
    /// Sender half of the control channel. No message can be sent on it
    /// (`Control` has no variants); the handle only ever drops it.
    stop_channel: tokio::sync::oneshot::Sender<Control>,
    /// Join handle of the Tokio task, awaited by `stop` to wait for exit.
    join_handle: tokio::task::JoinHandle<()>,
}
625
626impl RunningProfiler {
627    /// Request that the current profiler stops and wait until it exits.
628    ///
629    /// This will cause the currently-pending profile information to be flushed.
630    ///
631    /// After this function returns, it is correct and safe to [spawn] a new
632    /// [Profiler], possibly with a different configuration. Therefore,
633    /// this function can be used to "reconfigure" a profiler by stopping
634    /// it and then starting a new one with a different configuration.
635    ///
636    /// [spawn]: Profiler::spawn_controllable
637    pub async fn stop(self) {
638        drop(self.stop_channel);
639        let _ = self.join_handle.await;
640    }
641
642    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
643    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
644        tokio::task::spawn(async move {
645            // move the control channel to the spawned task. this way, it will be dropped
646            // just when the task is aborted.
647            let _abort_channel = self.stop_channel;
648            self.join_handle.await.ok();
649        })
650    }
651
652    /// Detach this profiler. This will prevent the profiler from being stopped
653    /// when this handle is dropped. You should call this (or [Profiler::spawn]
654    /// instead of [Profiler::spawn_controllable], which does the same thing)
655    /// if you don't intend to reconfigure your profiler at runtime.
656    pub fn detach(self) {
657        self.detach_inner();
658    }
659
660    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
661    /// and returns a [RunningProfilerThread] attached to it.
662    fn spawn_attached(
663        self,
664        runtime: tokio::runtime::Runtime,
665        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
666    ) -> RunningProfilerThread {
667        RunningProfilerThread {
668            stop_channel: self.stop_channel,
669            join_handle: spawn_fn(Box::new(move || {
670                let _ = runtime.block_on(self.join_handle);
671            })),
672        }
673    }
674
675    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
676    /// and detaches it.
677    fn spawn_detached(
678        self,
679        runtime: tokio::runtime::Runtime,
680        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
681    ) {
682        spawn_fn(Box::new(move || {
683            let _stop_channel = self.stop_channel;
684            let _ = runtime.block_on(self.join_handle);
685        }));
686    }
687}
688
/// A handle to a running profiler, running on a separate thread.
///
/// Currently just allows for stopping the profiler.
///
/// Dropping this handle requests that the profiler stop.
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfilerThread {
    /// Sender half of the control channel. No message can be sent on it
    /// (`Control` has no variants); the handle only ever drops it.
    stop_channel: tokio::sync::oneshot::Sender<Control>,
    /// Join handle of the thread, joined by `stop` to wait for exit.
    join_handle: std::thread::JoinHandle<()>,
}
699
700impl RunningProfilerThread {
701    /// Request that the current profiler stops and wait until it exits.
702    ///
703    /// This will cause the currently-pending profile information to be flushed.
704    ///
705    /// After this function returns, it is correct and safe to [spawn] a new
706    /// [Profiler], possibly with a different configuration. Therefore,
707    /// this function can be used to "reconfigure" a profiler by stopping
708    /// it and then starting a new one with a different configuration.
709    ///
710    /// [spawn]: Profiler::spawn_controllable
711    pub fn stop(self) {
712        drop(self.stop_channel);
713        let _ = self.join_handle.join();
714    }
715}
716
/// Rust profiler based on [async-profiler].
///
/// Spawning a profiler can be done either in an attached (controllable)
/// mode, which allows for stopping the profiler (and, in fact, stops
/// it when the relevant handle is dropped), or in detached mode,
/// in which the profiler keeps running forever. Applications that can
/// shut down the profiler at run-time, for example applications that
/// support reconfiguration of a running profiler, generally want to use
/// controllable mode. Other applications (most of them) should use
/// detached mode.
///
/// In addition, the profiler can either be spawned into the current Tokio
/// runtime, or into a new one. Normally, applications should spawn
/// the profiler into their own Tokio runtime, but applications that
/// don't have a default Tokio runtime should spawn it into a
/// different one.
///
/// This leaves 4 functions:
/// 1. [Self::spawn] - detached, same runtime
/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
/// 3. [Self::spawn_controllable] - controllable, same runtime
/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
///
/// In addition, there's a helper function that just spawns the profiler
/// to a new runtime in a new thread, for applications that don't have
/// a Tokio runtime and don't need complex control:
///
/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
///
/// [async-profiler]: https://github.com/async-profiler/async-profiler
pub struct Profiler {
    /// How often collected samples are reported (30 seconds unless the
    /// builder was given another value).
    reporting_interval: Duration,
    /// Destination for the collected profiles.
    reporter: Box<dyn Reporter + Send + Sync>,
    /// Metadata supplied through the builder, or `None` if none was given.
    agent_metadata: Option<AgentMetadata>,
    /// Options used to build the async-profiler argument string.
    profiler_options: ProfilerOptions,
}
753
754impl Profiler {
755    /// Start profiling. The profiler will run in a tokio task at the configured interval.
756    ///
757    /// This is the same as calling [Profiler::spawn_controllable] followed by
758    /// [RunningProfiler::detach], except it returns a [JoinHandle].
759    ///
760    /// The returned [JoinHandle] can be used to detect if the profiler has exited
761    /// due to a fatal error.
762    ///
763    /// This function will fail if it is unable to start async-profiler, for example
764    /// if it can't find or load `libasyncProfiler.so`.
765    ///
766    /// [JoinHandle]: tokio::task::JoinHandle
767    ///
768    /// ### Tokio Runtime
769    ///
770    /// This function must be run within a Tokio runtime, otherwise it will panic. If
771    /// your application does not have a `main` Tokio runtime, see
772    /// [Profiler::spawn_thread].
773    ///
774    /// ### Example
775    ///
776    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
777    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
778    ///
779    /// ```
780    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
781    /// # #[tokio::main]
782    /// # async fn main() -> Result<(), SpawnError> {
783    /// let profiler = ProfilerBuilder::default()
784    ///    .with_local_reporter("/tmp/profiles")
785    ///    .build();
786    /// # if false { // don't spawn the profiler in doctests
787    /// profiler.spawn()?;
788    /// # }
789    /// # Ok(())
790    /// # }
791    /// ```
792    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
793        self.spawn_controllable().map(RunningProfiler::detach_inner)
794    }
795
796    /// Like [Self::spawn], but instead of spawning within the current Tokio
797    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
798    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
799    ///
800    /// If your configuration is standard, use [Profiler::spawn_thread].
801    ///
802    /// If you want to be able to stop the resulting profiler, use
803    /// [Profiler::spawn_controllable_thread_to_runtime].
804    ///
805    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
806    /// allow for configuring thread properties, for example thread names).
807    ///
808    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
809    ///
810    /// ### Example
811    ///
812    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
813    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
814    ///
815    /// ```no_run
816    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
817    /// let rt = tokio::runtime::Builder::new_current_thread()
818    ///     .enable_all()
819    ///     .build()?;
820    /// let profiler = ProfilerBuilder::default()
821    ///    .with_local_reporter("/tmp/profiles")
822    ///    .build();
823    ///
824    /// profiler.spawn_thread_to_runtime(
825    ///     rt,
826    ///     |t| {
827    ///         std::thread::Builder::new()
828    ///             .name("asprof-agent".to_owned())
829    ///             .spawn(t)
830    ///             .expect("thread name contains nuls")
831    ///     }
832    /// )?;
833    /// # Ok::<_, anyhow::Error>(())
834    /// ```
835    pub fn spawn_thread_to_runtime(
836        self,
837        runtime: tokio::runtime::Runtime,
838        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
839    ) -> Result<(), SpawnError> {
840        self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
841    }
842
843    /// Like [Self::spawn], but instead of spawning within the current Tokio
844    /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
845    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
846    /// by itself.
847    ///
848    /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
849    /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
850    /// with the following:
851    /// 1. a current thread runtime with background worker threads (these exist
852    ///    for blocking IO) named "asprof-worker"
853    /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
854    ///
855    /// If you want to be able to stop the resulting profiler, use
856    /// [Profiler::spawn_controllable_thread_to_runtime].
857    ///
858    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
859    ///
860    /// ### Example
861    ///
862    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
863    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
864    ///
865    /// ```no_run
866    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
867    /// # use async_profiler_agent::reporter::local::LocalReporter;
868    /// let profiler = ProfilerBuilder::default()
869    ///    .with_local_reporter("/tmp/profiles")
870    ///    .build();
871    ///
872    /// profiler.spawn_thread()?;
873    /// # Ok::<_, anyhow::Error>(())
874    /// ```
875    pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
876        // using "asprof" in thread name to deal with 15 character + \0 length limit
877        let rt = tokio::runtime::Builder::new_current_thread()
878            .thread_name("asprof-worker".to_owned())
879            .enable_all()
880            .build()
881            .map_err(SpawnThreadError::ConstructRt)?;
882        let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
883        self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
884            .map_err(SpawnThreadError::AsProf)
885    }
886
887    fn spawn_thread_inner<E: ProfilerEngine>(
888        self,
889        asprof: E,
890        runtime: tokio::runtime::Runtime,
891        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
892    ) -> Result<(), SpawnError> {
893        let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
894        handle.spawn_detached(runtime, spawn_fn);
895        Ok(())
896    }
897
898    /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
899    /// (currently only stopping) the profiler.
900    ///
901    /// This allows for changing the configuration of the profiler at runtime, by
902    /// stopping it and then starting a new Profiler with a new configuration. It
903    /// also allows for stopping profiling in case the profiler is suspected to
904    /// cause operational issues.
905    ///
906    /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
907    /// so if your application doen't need to change the profiler's configuration at runtime,
908    /// it will be easier to use [Profiler::spawn].
909    ///
910    /// This function will fail if it is unable to start async-profiler, for example
911    /// if it can't find or load `libasyncProfiler.so`.
912    ///
913    /// ### Tokio Runtime
914    ///
915    /// This function must be run within a Tokio runtime, otherwise it will panic. If
916    /// your application does not have a `main` Tokio runtime, see
917    /// [Profiler::spawn_controllable_thread_to_runtime].
918    ///
919    /// ### Example
920    ///
921    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
922    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
923    ///
924    /// ```no_run
925    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
926    /// # #[tokio::main]
927    /// # async fn main() -> Result<(), SpawnError> {
928    /// let profiler = ProfilerBuilder::default()
929    ///    .with_local_reporter("/tmp/profiles")
930    ///    .build();
931    ///
932    /// let profiler = profiler.spawn_controllable()?;
933    ///
934    /// // [insert your signaling/monitoring mechanism to have a request to disable
935    /// // profiling in case of a problem]
936    /// let got_request_to_disable_profiling = async move {
937    ///     // ...
938    /// #   false
939    /// };
940    /// // spawn a task that will disable profiling if requested
941    /// tokio::task::spawn(async move {
942    ///     if got_request_to_disable_profiling.await {
943    ///         profiler.stop().await;
944    ///     }
945    /// });
946    /// # Ok(())
947    /// # }
948    /// ```
949    pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
950        self.spawn_inner(asprof::AsProf::builder().build())
951    }
952
953    /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
954    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
955    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
956    ///
957    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
958    /// allow for configuring thread properties, for example thread names).
959    ///
960    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
961    ///
962    /// ### Example
963    ///
964    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
965    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
966    ///
967    /// ```no_run
968    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
969    /// let rt = tokio::runtime::Builder::new_current_thread()
970    ///     .enable_all()
971    ///     .build()?;
972    /// let profiler = ProfilerBuilder::default()
973    ///    .with_local_reporter("/tmp/profiles")
974    ///    .build();
975    ///
976    /// let profiler = profiler.spawn_controllable_thread_to_runtime(
977    ///     rt,
978    ///     |t| {
979    ///         std::thread::Builder::new()
980    ///             .name("asprof-agent".to_owned())
981    ///             .spawn(t)
982    ///             .expect("thread name contains nuls")
983    ///     }
984    /// )?;
985    ///
986    /// # fn got_request_to_disable_profiling() -> bool { false }
987    /// // spawn a task that will disable profiling if requested
988    /// std::thread::spawn(move || {
989    ///     if got_request_to_disable_profiling() {
990    ///         profiler.stop();
991    ///     }
992    /// });
993    /// # Ok::<_, anyhow::Error>(())
994    /// ```
995    pub fn spawn_controllable_thread_to_runtime(
996        self,
997        runtime: tokio::runtime::Runtime,
998        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
999    ) -> Result<RunningProfilerThread, SpawnError> {
1000        self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1001    }
1002
1003    fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1004        self,
1005        asprof: E,
1006        runtime: tokio::runtime::Runtime,
1007        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1008    ) -> Result<RunningProfilerThread, SpawnError> {
1009        let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1010        Ok(handle.spawn_attached(runtime, spawn_fn))
1011    }
1012
1013    fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
1014        // Initialize async profiler - needs to be done once.
1015        E::init_async_profiler()?;
1016        tracing::info!("successfully initialized async profiler.");
1017
1018        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
1019        let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();
1020
1021        // Get profiles at the configured interval rate.
1022        let join_handle = tokio::spawn(async move {
1023            let mut state = match ProfilerState::new(asprof, self.profiler_options) {
1024                Ok(state) => state,
1025                Err(err) => {
1026                    tracing::error!(?err, "unable to create profiler state");
1027                    return;
1028                }
1029            };
1030
1031            // Lazily-loaded if not specified up front.
1032            let mut agent_metadata = self.agent_metadata;
1033            let mut done = false;
1034
1035            while !done {
1036                // Wait until a timer or exit event
1037                tokio::select! {
1038                    biased;
1039
1040                    r = &mut stop_rx, if !stop_rx.is_terminated() => {
1041                        match r {
1042                            Err(_) => {
1043                                tracing::info!("profiler stop requested, doing a final tick");
1044                                done = true;
1045                            }
1046                        }
1047                    }
1048                    _ = sampling_ticker.tick() => {
1049                        tracing::debug!("profiler timer woke up");
1050                    }
1051                }
1052
1053                if let Err(err) = profiler_tick(
1054                    &mut state,
1055                    &mut agent_metadata,
1056                    &*self.reporter,
1057                    self.reporting_interval,
1058                )
1059                .await
1060                {
1061                    match &err {
1062                        TickError::Reporter(_) => {
1063                            // don't stop on IO errors
1064                            tracing::error!(?err, "error during profiling, continuing");
1065                        }
1066                        _stop => {
1067                            tracing::error!(?err, "error during profiling, stopping");
1068                            break;
1069                        }
1070                    }
1071                }
1072            }
1073
1074            tracing::info!("profiling task finished");
1075        });
1076
1077        Ok(RunningProfiler {
1078            stop_channel,
1079            join_handle,
1080        })
1081    }
1082}
1083
1084async fn profiler_tick<E: ProfilerEngine>(
1085    state: &mut ProfilerState<E>,
1086    agent_metadata: &mut Option<AgentMetadata>,
1087    reporter: &(dyn Reporter + Send + Sync),
1088    reporting_interval: Duration,
1089) -> Result<(), TickError> {
1090    if !state.is_started() {
1091        state.start().await?;
1092        return Ok(());
1093    }
1094
1095    let Some(start) = state.stop()? else {
1096        tracing::warn!("stopped the profiler but it wasn't running?");
1097        return Ok(());
1098    };
1099    let start = start.duration_since(UNIX_EPOCH)?;
1100    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
1101
1102    // Start it up immediately, writing to the "other" file, so that we keep
1103    // profiling the application while we're reporting data.
1104    state
1105        .jfr_file_mut()
1106        .empty_inactive_file()
1107        .map_err(TickError::EmptyInactiveFile)?;
1108    state.jfr_file_mut().swap();
1109    state.start().await?;
1110
1111    // Lazily load the agent metadata if it was not provided in
1112    // the constructor. See the struct comments for why this is.
1113    // This code runs at most once.
1114    if agent_metadata.is_none() {
1115        #[cfg(feature = "aws-metadata-no-defaults")]
1116        let md = crate::metadata::aws::load_agent_metadata().await?;
1117        #[cfg(not(feature = "aws-metadata-no-defaults"))]
1118        let md = crate::metadata::AgentMetadata::NoMetadata;
1119        tracing::debug!("loaded metadata");
1120        agent_metadata.replace(md);
1121    }
1122
1123    let report_metadata = ReportMetadata {
1124        instance: agent_metadata.as_ref().unwrap(),
1125        start,
1126        end,
1127        reporting_interval,
1128    };
1129
1130    let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
1131        .await
1132        .map_err(TickError::JfrRead)?;
1133
1134    reporter
1135        .report(jfr, &report_metadata)
1136        .await
1137        .map_err(TickError::Reporter)?;
1138
1139    Ok(())
1140}
1141
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicBool, AtomicU32};
    use std::sync::Arc;

    use test_case::test_case;

    use super::*;

    /// The metadata configured by [`make_mock_profiler`], which every report
    /// produced by the mock profiler is expected to carry.
    fn expected_metadata() -> AgentMetadata {
        AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
        }
    }

    /// Data written through the active path must be readable through the
    /// inactive path after a swap, and emptying the inactive file must truncate it.
    #[test]
    fn test_jfr_file_drop() {
        let mut jfr = JfrFile::new().unwrap();

        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
        jfr.swap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
        jfr.empty_inactive_file().unwrap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
    }

    /// An engine that never fails and, on every start, writes `JFR<n>` to the
    /// requested file, where `n` counts the starts so far.
    struct MockProfilerEngine {
        counter: AtomicU32,
    }
    impl ProfilerEngine for MockProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(
            &self,
            jfr_file_path: &Path,
            _options: &ProfilerOptions,
        ) -> Result<(), asprof::AsProfError> {
            let contents = format!(
                "JFR{}",
                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
            );
            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
            Ok(())
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }
    }

    /// A reporter that forwards every report (as UTF-8 text plus the instance
    /// metadata) over a channel so the test can inspect it.
    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
    impl std::fmt::Debug for MockReporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("MockReporter").finish()
        }
    }

    #[async_trait::async_trait]
    impl Reporter for MockReporter {
        async fn report(
            &self,
            jfr: Vec<u8>,
            metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            self.0
                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
                .await
                .unwrap();
            Ok(())
        }
    }

    /// Build a profiler wired to a [`MockReporter`], together with the receiving
    /// end of the channel the reporter sends to.
    fn make_mock_profiler() -> (
        Profiler,
        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let agent = ProfilerBuilder::default()
            .with_reporter(MockReporter(tx))
            .with_custom_agent_metadata(expected_metadata())
            .build();
        (agent, rx)
    }

    #[tokio::test(start_paused = true)]
    async fn test_profiler_agent() {
        let e_md = expected_metadata();
        let (agent, mut rx) = make_mock_profiler();
        agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap()
            .detach();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
    }

    #[test_case(false; "uncontrollable")]
    #[test_case(true; "controllable")]
    fn test_profiler_local_rt(controllable: bool) {
        let e_md = expected_metadata();
        let (agent, mut rx) = make_mock_profiler();
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .start_paused(true)
            .build()
            .unwrap();
        // spawn the profiler, doing this before spawning a thread to allow
        // capturing errors from `spawn`
        let handle = if controllable {
            Some(
                agent
                    .spawn_controllable_thread_inner::<MockProfilerEngine>(
                        MockProfilerEngine {
                            counter: AtomicU32::new(0),
                        },
                        rt,
                        std::thread::spawn,
                    )
                    .unwrap(),
            )
        } else {
            agent
                .spawn_thread_inner::<MockProfilerEngine>(
                    MockProfilerEngine {
                        counter: AtomicU32::new(0),
                    },
                    rt,
                    std::thread::spawn,
                )
                .unwrap();
            None
        };

        let (jfr, md) = rx.blocking_recv().unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.blocking_recv().unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);

        if let Some(handle) = handle {
            // keep receiving until the channel closes, so the final report the
            // profiler sends while stopping has somewhere to go
            let drain_thread = std::thread::spawn(move || while rx.blocking_recv().is_some() {});
            // request a stop
            handle.stop();
            // the drain thread should be done
            drain_thread.join().unwrap();
        }
    }

    /// The ways a running profiler can be brought down in `test_profiler_stop`.
    enum StopKind {
        Deliberate,
        Drop,
        Abort,
    }

    #[tokio::test(start_paused = true)]
    #[test_case(StopKind::Deliberate; "deliberate stop")]
    #[test_case(StopKind::Drop; "drop stop")]
    #[test_case(StopKind::Abort; "abort stop")]
    async fn test_profiler_stop(stop_kind: StopKind) {
        let e_md = expected_metadata();
        let (agent, mut rx) = make_mock_profiler();
        let profiler_ref = agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
        // check that stop is faster than an interval and returns an "immediate" next jfr
        match stop_kind {
            StopKind::Drop => drop(profiler_ref),
            StopKind::Deliberate => {
                tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
                    .await
                    .unwrap();
            }
            StopKind::Abort => {
                // You can call Abort on the JoinHandle. make sure that is not buggy.
                profiler_ref.detach_inner().abort();
            }
        }
        // check that we get the next JFR "quickly", and the JFR after that is empty.
        let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(jfr, "JFR2");
        assert_eq!(e_md, md);
        assert!(rx.recv().await.is_none());
    }

    // simulate a badly-behaved profiler that errors on start/stop and then
    // tries to access the JFR file
    struct StopErrorProfilerEngine {
        start_error: bool,
        // set to `true` once the delayed read of the JFR file has succeeded
        counter: Arc<AtomicBool>,
    }
    impl ProfilerEngine for StopErrorProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(
            &self,
            jfr_file_path: &Path,
            _options: &ProfilerOptions,
        ) -> Result<(), asprof::AsProfError> {
            let jfr_file_path = jfr_file_path.to_owned();
            std::fs::write(&jfr_file_path, "JFR").unwrap();
            let counter = self.counter.clone();
            tokio::task::spawn(async move {
                tokio::time::sleep(Duration::from_secs(5)).await;
                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
                counter.store(true, atomic::Ordering::Release);
            });
            if self.start_error {
                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
            } else {
                Ok(())
            }
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
        }
    }

    #[tokio::test(start_paused = true)]
    #[test_case(false; "error on stop")]
    #[test_case(true; "error on start")]
    async fn test_profiler_error(start_error: bool) {
        let (agent, mut rx) = make_mock_profiler();
        let counter = Arc::new(AtomicBool::new(false));
        let engine = StopErrorProfilerEngine {
            start_error,
            counter: counter.clone(),
        };
        let handle = agent.spawn_inner(engine).unwrap().detach_inner();
        // the profiler gives up on the engine error without ever reporting
        assert!(rx.recv().await.is_none());
        // check that the "sleep 5" step in start_async_profiler succeeds
        for _ in 0..100 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if counter.load(atomic::Ordering::Acquire) {
                handle.await.unwrap(); // Check that the JoinHandle is done
                return;
            }
        }
        panic!("didn't read from file");
    }

    #[test]
    fn test_profiler_options_to_args_string_default() {
        let opts = ProfilerOptions::default();
        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert!(
            args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
            "Default args string not constructed correctly"
        );
        assert!(args.contains("file=/tmp/test.jfr"));
        assert!(!args.contains("nativemem="));
    }

    #[test]
    fn test_profiler_options_to_args_string_with_native_mem() {
        let opts = ProfilerOptions {
            native_mem: Some("10m".to_string()),
            wall_clock_millis: None,
            cpu_interval: None,
        };
        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert!(args.contains("nativemem=10m"));
    }

    #[test]
    fn test_profiler_options_builder() {
        let opts = ProfilerOptionsBuilder::default()
            .with_native_mem_bytes(5000000)
            .build();

        assert_eq!(opts.native_mem, Some("5000000".to_string()));
    }

    #[test]
    fn test_profiler_options_builder_all_options() {
        let opts = ProfilerOptionsBuilder::default()
            .with_native_mem_bytes(5000000)
            .with_cpu_interval(Duration::from_secs(1))
            .with_wall_clock_interval(Duration::from_secs(10))
            .build();

        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert_eq!(
            args,
            "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
        );
    }

    #[test]
    fn test_local_reporter_has_no_metadata() {
        // Check that with_local_reporter sets some configuration
        let builder = ProfilerBuilder::default().with_local_reporter(".");
        assert_eq!(
            format!("{:?}", builder.reporter),
            r#"Some(LocalReporter { directory: "." })"#
        );
        match builder.agent_metadata {
            Some(AgentMetadata::NoMetadata) => {}
            bad => panic!("{bad:?}"),
        }
    }
}