async_profiler_agent/
profiler.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
5
6use crate::{
7    asprof::{self, AsProfError},
8    metadata::{AgentMetadata, ReportMetadata},
9    reporter::{Reporter, local::LocalReporter},
10};
11use std::{
12    fs::File,
13    io,
14    path::{Path, PathBuf},
15    time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
/// A pair of files used for the profiler's JFR output.
///
/// The two files can trade places via [`JfrFile::swap`].
struct JfrFile {
    /// The file whose path is exposed by `active_path`.
    active: File,
    /// The other file of the pair; it is the one truncated by
    /// `empty_inactive_file`.
    inactive: File,
}
23
24impl JfrFile {
25    #[cfg(target_os = "linux")]
26    fn new() -> Result<Self, io::Error> {
27        Ok(Self {
28            active: tempfile::tempfile().unwrap(),
29            inactive: tempfile::tempfile().unwrap(),
30        })
31    }
32
33    #[cfg(not(target_os = "linux"))]
34    fn new() -> Result<Self, io::Error> {
35        Err(io::Error::other(
36            "async-profiler is only supported on Linux",
37        ))
38    }
39
40    fn swap(&mut self) {
41        std::mem::swap(&mut self.active, &mut self.inactive);
42    }
43
44    #[cfg(target_os = "linux")]
45    fn file_path(file: &std::fs::File) -> PathBuf {
46        use std::os::fd::AsRawFd;
47
48        format!("/proc/self/fd/{}", file.as_raw_fd()).into()
49    }
50
51    #[cfg(not(target_os = "linux"))]
52    fn file_path(_file: &std::fs::File) -> PathBuf {
53        unimplemented!()
54    }
55
56    fn active_path(&self) -> PathBuf {
57        Self::file_path(&self.active)
58    }
59
60    fn inactive_path(&self) -> PathBuf {
61        Self::file_path(&self.inactive)
62    }
63
64    fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
65        // Empty the file, or create it for the first time if the profiler hasn't
66        // started yet.
67        File::create(Self::file_path(&self.inactive))?;
68        tracing::debug!(message = "emptied the file");
69        Ok(())
70    }
71}
72
/// Options for configuring the async-profiler behavior.
///
/// Currently supports:
/// - Native memory allocation tracking
/// - The CPU-time sampling interval
/// - The wall-clock sampling interval
///
/// The two sampling intervals are private fields; they are set through
/// [`ProfilerOptionsBuilder`].
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    /// For example, `"10m"` will sample an allocation for every
    /// 10 megabytes of memory allocated. Passing `"0"` will sample
    /// all allocations.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    /// CPU-time sampling interval in nanoseconds. When `None`,
    /// [`DEFAULT_CPU_INTERVAL_NANOS`] is used.
    cpu_interval: Option<u128>,
    /// Wall-clock sampling interval in milliseconds. When `None`,
    /// [`DEFAULT_WALL_CLOCK_INTERVAL_MILLIS`] is used.
    wall_clock_millis: Option<u128>,
}

/// CPU-time sampling interval used when none was configured: 100 milliseconds,
/// expressed in nanoseconds.
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
/// Wall-clock sampling interval used when none was configured: one second,
/// expressed in milliseconds.
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;

impl ProfilerOptions {
    /// Convert the profiler options to a string of arguments for the async-profiler.
    ///
    /// `jfr_file_path` is rendered as the `file=` argument. Unset intervals
    /// fall back to the defaults above, and `nativemem=` is appended only
    /// when [`ProfilerOptions::native_mem`] is set.
    pub fn to_args_string(&self, jfr_file_path: &Path) -> String {
        let cpu_interval_nanos = self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS);
        let wall_interval_millis = self
            .wall_clock_millis
            .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS);

        let mut args = format!(
            "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
            cpu_interval_nanos,
            wall_interval_millis,
            jfr_file_path.display()
        );
        if let Some(native_mem) = self.native_mem.as_deref() {
            args += &format!(",nativemem={native_mem}");
        }
        args
    }
}
115
/// Builder for [`ProfilerOptions`].
///
/// All settings start out unset (`None`) when the builder is created via
/// [`Default`].
#[derive(Debug, Default)]
pub struct ProfilerOptionsBuilder {
    /// Native memory sampling interval, as a string handed to async-profiler.
    native_mem: Option<String>,
    /// CPU-time sampling interval, stored in nanoseconds.
    cpu_interval: Option<u128>,
    /// Wall-clock sampling interval, stored in milliseconds.
    wall_clock_millis: Option<u128>,
}
123
124impl ProfilerOptionsBuilder {
125    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass
126    /// the string input directly to async_profiler.
127    ///
128    /// The value is the interval in bytes or in other units,
129    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
130    ///
131    /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
132    /// type-checked.
133    ///
134    /// ### Examples
135    ///
136    /// This will sample allocations for every 10 megabytes allocated:
137    ///
138    /// ```
139    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
140    /// # use async_profiler_agent::profiler::SpawnError;
141    /// # fn main() -> Result<(), SpawnError> {
142    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
143    /// let profiler = ProfilerBuilder::default()
144    ///     .with_profiler_options(opts)
145    ///     .with_local_reporter("/tmp/profiles")
146    ///     .build();
147    /// # if false { // don't spawn the profiler in doctests
148    /// profiler.spawn()?;
149    /// # }
150    /// # Ok(())
151    /// # }
152    /// ```
153    pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
154        self.native_mem = Some(native_mem_interval);
155        self
156    }
157
158    /// If set, the profiler will collect information about
159    /// native memory allocations.
160    ///
161    /// The argument passed is the profiling interval - the profiler will
162    /// sample allocations every about that many bytes.
163    ///
164    /// See [ProfilingModes in the async-profiler docs] for more details.
165    ///
166    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
167    ///
168    /// ### Examples
169    ///
170    /// This will sample allocations for every 10 megabytes allocated:
171    ///
172    /// ```
173    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
174    /// # use async_profiler_agent::profiler::SpawnError;
175    /// # fn main() -> Result<(), SpawnError> {
176    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
177    /// let profiler = ProfilerBuilder::default()
178    ///     .with_profiler_options(opts)
179    ///     .with_local_reporter("/tmp/profiles")
180    ///     .build();
181    /// # if false { // don't spawn the profiler in doctests
182    /// profiler.spawn()?;
183    /// # }
184    /// # Ok(())
185    /// # }
186    /// ```
187    ///
188    /// This will sample every allocation (potentially slow):
189    /// ```
190    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
191    /// # use async_profiler_agent::profiler::SpawnError;
192    /// # fn main() -> Result<(), SpawnError> {
193    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
194    /// let profiler = ProfilerBuilder::default()
195    ///     .with_profiler_options(opts)
196    ///     .with_local_reporter("/tmp/profiles")
197    ///     .build();
198    /// # if false { // don't spawn the profiler in doctests
199    /// profiler.spawn()?;
200    /// # }
201    /// # Ok(())
202    /// # }
203    /// ```
204    pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
205        self.native_mem = Some(native_mem_interval.to_string());
206        self
207    }
208
209    /// Sets the interval in which the profiler will collect
210    /// CPU-time samples, via the [async-profiler `interval` option].
211    ///
212    /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
213    /// are currently running on a CPU, not threads that are sleeping.
214    ///
215    /// It can use a higher frequency than wall-clock sampling since the
216    /// number of the threads that are running on a CPU at a given time is
217    /// naturally limited by the number of CPUs, while the number of sleeping
218    /// threads can be much larger.
219    ///
220    /// The default is to do a CPU-time sample every 100 milliseconds.
221    ///
222    /// The async-profiler agent collects both CPU time and wall-clock time
223    /// samples, so this function should normally be used along with
224    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
225    ///
226    /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
227    ///
228    /// ### Examples
229    ///
230    /// This will sample allocations for every 10 CPU milliseconds (when running)
231    /// and 100 wall-clock milliseconds (running or sleeping):
232    ///
233    /// ```
234    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
235    /// # use async_profiler_agent::profiler::SpawnError;
236    /// # use std::time::Duration;
237    /// # fn main() -> Result<(), SpawnError> {
238    /// let opts = ProfilerOptionsBuilder::default()
239    ///     .with_cpu_interval(Duration::from_millis(10))
240    ///     .with_wall_clock_interval(Duration::from_millis(100))
241    ///     .build();
242    /// let profiler = ProfilerBuilder::default()
243    ///     .with_profiler_options(opts)
244    ///     .with_local_reporter("/tmp/profiles")
245    ///     .build();
246    /// # if false { // don't spawn the profiler in doctests
247    /// profiler.spawn()?;
248    /// # }
249    /// # Ok(())
250    /// # }
251    /// ```
252    pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
253        self.cpu_interval = Some(cpu_interval.as_nanos());
254        self
255    }
256
257    /// Sets the interval, in milliseconds, in which the profiler will collect
258    /// wall-clock samples, via the [async-profiler `wall` option].
259    ///
260    /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
261    /// whether they are sleeping or running, and can therefore be
262    /// very useful for finding threads that are blocked, for example
263    /// on a synchronous lock or a slow system call.
264    ///
265    /// When using Tokio, since tasks are not threads, tasks that are not
266    /// currently running will not be sampled by a wall clock sample. However,
267    /// a wall clock sample is still very useful in Tokio, since it is what
268    /// you want to catch tasks that are blocking a thread by waiting on
269    /// synchronous operations.
270    ///
271    /// The default is to do a wall-clock sample every second.
272    ///
273    /// The async-profiler agent collects both CPU time and wall-clock time
274    /// samples, so this function should normally be used along with
275    /// [ProfilerOptionsBuilder::with_cpu_interval].
276    ///
277    /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
278    ///
279    /// ### Examples
280    ///
281    /// This will sample allocations for every 10 CPU milliseconds (when running)
282    /// and 100 wall-clock milliseconds (running or sleeping):
283    ///
284    /// ```
285    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
286    /// # use async_profiler_agent::profiler::SpawnError;
287    /// # use std::time::Duration;
288    /// # fn main() -> Result<(), SpawnError> {
289    /// let opts = ProfilerOptionsBuilder::default()
290    ///     .with_cpu_interval(Duration::from_millis(10))
291    ///     .with_wall_clock_interval(Duration::from_millis(10))
292    ///     .build();
293    /// let profiler = ProfilerBuilder::default()
294    ///     .with_profiler_options(opts)
295    ///     .with_local_reporter("/tmp/profiles")
296    ///     .build();
297    /// # if false { // don't spawn the profiler in doctests
298    /// profiler.spawn()?;
299    /// # }
300    /// # Ok(())
301    /// # }
302    /// ```
303    pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
304        self.wall_clock_millis = Some(wall_clock.as_millis());
305        self
306    }
307
308    /// Build the [`ProfilerOptions`] from the builder.
309    pub fn build(self) -> ProfilerOptions {
310        ProfilerOptions {
311            native_mem: self.native_mem,
312            wall_clock_millis: self.wall_clock_millis,
313            cpu_interval: self.cpu_interval,
314        }
315    }
316}
317
318/// Builds a [`Profiler`], panicking if any required fields were not set by the
319/// time `build` is called.
320#[derive(Debug, Default)]
321pub struct ProfilerBuilder {
322    reporting_interval: Option<Duration>,
323    reporter: Option<Box<dyn Reporter + Send + Sync>>,
324    agent_metadata: Option<AgentMetadata>,
325    profiler_options: Option<ProfilerOptions>,
326}
327
328impl ProfilerBuilder {
329    /// Sets the reporting interval (default: 30 seconds).
330    ///
331    /// This is the interval that samples are *reported* to the backend,
332    /// and is unrelated to the interval at which the application
333    /// is *sampled* by async profiler, which is controlled by
334    /// [ProfilerOptionsBuilder::with_cpu_interval] and
335    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
336    ///
337    /// Most users should not change this setting.
338    ///
339    /// ## Example
340    ///
341    /// ```no_run
342    /// # use std::path::PathBuf;
343    /// # use std::time::Duration;
344    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
345    /// # let path = PathBuf::from(".");
346    /// let agent = ProfilerBuilder::default()
347    ///     .with_local_reporter(path)
348    ///     .with_reporting_interval(Duration::from_secs(15))
349    ///     .build()
350    ///     .spawn()?;
351    /// # Ok::<_, SpawnError>(())
352    /// ```
353    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
354        self.reporting_interval = Some(i);
355        self
356    }
357
358    /// Sets the [`Reporter`], which is used to upload the collected profiling
359    /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
360    /// feature enabled,
361    #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
362    #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
363    /// It is also possible to write your own [`Reporter`].
364    ///
365    /// It's normally easier to use [`LocalReporter`] directly via
366    /// [`ProfilerBuilder::with_local_reporter`].
367    ///
368    /// If you want to output to multiple reporters, you can use
369    /// [`MultiReporter`].
370    ///
371    /// [`LocalReporter`]: crate::reporter::local::LocalReporter
372    /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
373    #[cfg_attr(
374        feature = "s3-no-defaults",
375        doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
376    )]
377    ///
378    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
379    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
380        self.reporter = Some(Box::new(r));
381        self
382    }
383
384    /// Sets the profiler to ues [LocalReporter], which will write `.jfr` files to `path`,
385    /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
386    /// since the [LocalReporter] does not need that.
387    ///
388    /// This is useful for testing, since metadata auto-detection currently only works
389    /// on [Amazon EC2] or [Amazon Fargate] instances.
390    ///
391    /// The local reporter should normally not be used in production, since it will
392    /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
393    /// or write your own (see [`ProfilerBuilder::with_reporter`]).
394    ///
395    /// [Amazon EC2]: https://aws.amazon.com/ec2
396    /// [Amazon Fargate]: https://aws.amazon.com/fargate
397    ///
398    /// ## Example
399    ///
400    /// This will write profiles as `.jfr` files to `./path-to-profiles`:
401    ///
402    /// ```no_run
403    /// # use std::path::PathBuf;
404    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
405    /// # use async_profiler_agent::reporter::local::LocalReporter;
406    /// # use async_profiler_agent::metadata::AgentMetadata;
407    /// let path = PathBuf::from("./path-to-profiles");
408    /// let agent = ProfilerBuilder::default()
409    ///     .with_local_reporter(path)
410    ///     .build()
411    ///     .spawn()?;
412    /// # Ok::<_, SpawnError>(())
413    /// ```
414    pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
415        self.reporter = Some(Box::new(LocalReporter::new(path.into())));
416        self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
417    }
418
419    /// Provide custom agent metadata.
420    ///
421    /// The async-profiler Rust agent sends metadata to the [Reporter] with
422    /// the identity of the current host and process, which is normally
423    /// transmitted as `metadata.json` within the generated `.zip` file,
424    /// using the schema format [`reporter::s3::MetadataJson`].
425    ///
426    /// That metadata can later be used by tooling to be able to sort
427    /// profiling reports by host.
428    ///
429    /// async-profiler Rust agent will by default try to fetch the metadata
430    /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
431    /// will error if it's unable to find it. If you are running the
432    /// async-profiler agent on any other form of compute,
433    /// you will need to create and attach your own metadata
434    /// by calling this function.
435    ///
436    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
437    /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
438    /// [Amazon EC2]: https://aws.amazon.com/ec2
439    /// [Amazon Fargate]: https://aws.amazon.com/fargate
440    /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
441    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
442        self.agent_metadata = Some(j);
443        self
444    }
445
446    /// Provide custom profiler options.
447    ///
448    /// ### Example
449    ///
450    /// This will sample allocations for every 10 megabytes allocated:
451    ///
452    /// ```
453    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
454    /// # use async_profiler_agent::profiler::SpawnError;
455    /// # fn main() -> Result<(), SpawnError> {
456    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
457    /// let profiler = ProfilerBuilder::default()
458    ///     .with_profiler_options(opts)
459    ///     .with_local_reporter("/tmp/profiles")
460    ///     .build();
461    /// # if false { // don't spawn the profiler in doctests
462    /// profiler.spawn()?;
463    /// # }
464    /// # Ok(())
465    /// # }
466    /// ```
467    pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
468        self.profiler_options = Some(c);
469        self
470    }
471
472    /// Turn this builder into a profiler!
473    pub fn build(self) -> Profiler {
474        Profiler {
475            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
476            reporter: self.reporter.expect("reporter is required"),
477            agent_metadata: self.agent_metadata,
478            profiler_options: self.profiler_options.unwrap_or_default(),
479        }
480    }
481}
482
/// Lifecycle state tracked for the wrapped profiler.
enum Status {
    /// The profiler is not running.
    Idle,
    /// A start has been requested but has not been confirmed as successful.
    Starting,
    /// The profiler is running; the payload is the time recorded right after
    /// the start call returned successfully.
    Running(SystemTime),
}
488
489/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
490/// of the state of the profiler. The primary benefit of this is RAII - when
491/// this type drops, it will stop the profiler if it's running.
492struct ProfilerState<E: ProfilerEngine> {
493    // this is only None in the destructor when stopping the async-profiler fails
494    jfr_file: Option<JfrFile>,
495    asprof: E,
496    status: Status,
497    profiler_options: ProfilerOptions,
498}
499
500impl<E: ProfilerEngine> ProfilerState<E> {
501    fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
502        Ok(Self {
503            jfr_file: Some(JfrFile::new()?),
504            asprof,
505            status: Status::Idle,
506            profiler_options,
507        })
508    }
509
510    fn jfr_file_mut(&mut self) -> &mut JfrFile {
511        self.jfr_file.as_mut().unwrap()
512    }
513
514    async fn start(&mut self) -> Result<(), AsProfError> {
515        let active = self.jfr_file.as_ref().unwrap().active_path();
516        // drop guard - make sure the files are leaked if the profiler might have started
517        self.status = Status::Starting;
518        E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
519        self.status = Status::Running(SystemTime::now());
520        Ok(())
521    }
522
523    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
524        E::stop_async_profiler()?;
525        let status = std::mem::replace(&mut self.status, Status::Idle);
526        Ok(match status {
527            Status::Idle | Status::Starting => None,
528            Status::Running(since) => Some(since),
529        })
530    }
531
532    fn is_started(&self) -> bool {
533        matches!(self.status, Status::Running(_))
534    }
535}
536
537impl<E: ProfilerEngine> Drop for ProfilerState<E> {
538    fn drop(&mut self) {
539        match self.status {
540            Status::Running(_) => {
541                if let Err(err) = self.stop() {
542                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
543                    // to avoid symlink races
544                    std::mem::forget(self.jfr_file.take());
545                    // XXX: Rust defines leaking resources during drop as safe.
546                    tracing::warn!(?err, "unable to stop profiler during drop glue");
547                }
548            }
549            Status::Idle => {}
550            Status::Starting => {
551                // SECURITY: avoid removing the JFR file if stopping the profiler fails,
552                // to avoid symlink races
553                std::mem::forget(self.jfr_file.take());
554            }
555        }
556    }
557}
558
559pub(crate) trait ProfilerEngine: Send + Sync + 'static {
560    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
561    fn start_async_profiler(
562        &self,
563        jfr_file_path: &Path,
564        options: &ProfilerOptions,
565    ) -> Result<(), asprof::AsProfError>;
566    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
567}
568
569#[derive(Debug, Error)]
570#[non_exhaustive]
571enum TickError {
572    #[error(transparent)]
573    AsProf(#[from] AsProfError),
574    #[error(transparent)]
575    #[cfg(feature = "aws-metadata-no-defaults")]
576    Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
577    #[error("reporter: {0}")]
578    Reporter(Box<dyn std::error::Error + Send>),
579    #[error("broken clock: {0}")]
580    BrokenClock(#[from] SystemTimeError),
581    #[error("jfr read error: {0}")]
582    JfrRead(io::Error),
583    #[error("empty inactive file error: {0}")]
584    EmptyInactiveFile(io::Error),
585}
586
587#[derive(Debug, Error)]
588#[non_exhaustive]
589/// An error that happened spawning a profiler
590pub enum SpawnError {
591    /// Error from async-profiler
592    #[error(transparent)]
593    AsProf(#[from] asprof::AsProfError),
594    /// Error writing to a tempfile
595    #[error("tempfile error")]
596    TempFile(#[source] io::Error),
597}
598
599#[derive(Debug, Error)]
600#[non_exhaustive]
601/// An error from [`Profiler::spawn_thread`]
602pub enum SpawnThreadError {
603    /// Error from async-profiler
604    #[error(transparent)]
605    AsProf(#[from] SpawnError),
606    /// Error constructing Tokio runtime
607    #[error("constructing Tokio runtime")]
608    ConstructRt(#[source] io::Error),
609}
610
/// Message type of the profiler's stop channel.
///
/// There are no control messages currently, so the enum has no variants and
/// no value of it can ever be sent.
enum Control {}
613
614/// A handle to a running profiler
615///
616/// Currently just allows for stopping the profiler.
617///
618/// Dropping this handle will request that the profiler will stop.
619#[must_use = "dropping this stops the profiler, call .detach() to detach"]
620pub struct RunningProfiler {
621    stop_channel: tokio::sync::oneshot::Sender<Control>,
622    join_handle: tokio::task::JoinHandle<()>,
623}
624
625impl RunningProfiler {
626    /// Request that the current profiler stops and wait until it exits.
627    ///
628    /// This will cause the currently-pending profile information to be flushed.
629    ///
630    /// After this function returns, it is correct and safe to [spawn] a new
631    /// [Profiler], possibly with a different configuration. Therefore,
632    /// this function can be used to "reconfigure" a profiler by stopping
633    /// it and then starting a new one with a different configuration.
634    ///
635    /// [spawn]: Profiler::spawn_controllable
636    pub async fn stop(self) {
637        drop(self.stop_channel);
638        let _ = self.join_handle.await;
639    }
640
641    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
642    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
643        tokio::task::spawn(async move {
644            // move the control channel to the spawned task. this way, it will be dropped
645            // just when the task is aborted.
646            let _abort_channel = self.stop_channel;
647            self.join_handle.await.ok();
648        })
649    }
650
651    /// Detach this profiler. This will prevent the profiler from being stopped
652    /// when this handle is dropped. You should call this (or [Profiler::spawn]
653    /// instead of [Profiler::spawn_controllable], which does the same thing)
654    /// if you don't intend to reconfigure your profiler at runtime.
655    pub fn detach(self) {
656        self.detach_inner();
657    }
658
659    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
660    /// and returns a [RunningProfilerThread] attached to it.
661    fn spawn_attached(
662        self,
663        runtime: tokio::runtime::Runtime,
664        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
665    ) -> RunningProfilerThread {
666        RunningProfilerThread {
667            stop_channel: self.stop_channel,
668            join_handle: spawn_fn(Box::new(move || {
669                let _ = runtime.block_on(self.join_handle);
670            })),
671        }
672    }
673
674    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
675    /// and detaches it.
676    fn spawn_detached(
677        self,
678        runtime: tokio::runtime::Runtime,
679        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
680    ) {
681        spawn_fn(Box::new(move || {
682            let _stop_channel = self.stop_channel;
683            let _ = runtime.block_on(self.join_handle);
684        }));
685    }
686}
687
688/// A handle to a running profiler, running on a separate thread.
689///
690/// Currently just allows for stopping the profiler.
691///
692/// Dropping this handle will request that the profiler will stop.
693#[must_use = "dropping this stops the profiler, call .detach() to detach"]
694pub struct RunningProfilerThread {
695    stop_channel: tokio::sync::oneshot::Sender<Control>,
696    join_handle: std::thread::JoinHandle<()>,
697}
698
699impl RunningProfilerThread {
700    /// Request that the current profiler stops and wait until it exits.
701    ///
702    /// This will cause the currently-pending profile information to be flushed.
703    ///
704    /// After this function returns, it is correct and safe to [spawn] a new
705    /// [Profiler], possibly with a different configuration. Therefore,
706    /// this function can be used to "reconfigure" a profiler by stopping
707    /// it and then starting a new one with a different configuration.
708    ///
709    /// [spawn]: Profiler::spawn_controllable
710    pub fn stop(self) {
711        drop(self.stop_channel);
712        let _ = self.join_handle.join();
713    }
714}
715
716/// Rust profiler based on [async-profiler].
717///
718/// Spawning a profiler can be done either in an attached (controllable)
719/// mode, which allows for stopping the profiler (and, in fact, stops
720/// it when the relevant handle is dropped), or in detached mode,
721/// in which the profiler keeps running forever. Applications that can
722/// shut down the profiler at run-time, for example applications that
723/// support reconfiguration of a running profiler, generally want to use
724/// controllable mode. Other applications (most of them) should use
725/// detached mode.
726///
727/// In addition, the profiler can either be spawned into the current Tokio
728/// runtime, or into a new one. Normally, applications should spawn
729/// the profiler into their own Tokio runtime, but applications that
730/// don't have a default Tokio runtime should spawn it into a
731/// different one
732///
733/// This leaves 4 functions:
734/// 1. [Self::spawn] - detached, same runtime
735/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
736/// 3. [Self::spawn_controllable] - controllable, same runtime
737/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
738///
739/// In addition, there's a helper function that just spawns the profiler
740/// to a new runtime in a new thread, for applications that don't have
741/// a Tokio runtime and don't need complex control:
742///
743/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
744///
745/// [async-profiler]: https://github.com/async-profiler/async-profiler
746pub struct Profiler {
747    reporting_interval: Duration,
748    reporter: Box<dyn Reporter + Send + Sync>,
749    agent_metadata: Option<AgentMetadata>,
750    profiler_options: ProfilerOptions,
751}
752
753impl Profiler {
754    /// Start profiling. The profiler will run in a tokio task at the configured interval.
755    ///
756    /// This is the same as calling [Profiler::spawn_controllable] followed by
757    /// [RunningProfiler::detach], except it returns a [JoinHandle].
758    ///
759    /// The returned [JoinHandle] can be used to detect if the profiler has exited
760    /// due to a fatal error.
761    ///
762    /// This function will fail if it is unable to start async-profiler, for example
763    /// if it can't find or load `libasyncProfiler.so`.
764    ///
765    /// [JoinHandle]: tokio::task::JoinHandle
766    ///
767    /// ### Tokio Runtime
768    ///
769    /// This function must be run within a Tokio runtime, otherwise it will panic. If
770    /// your application does not have a `main` Tokio runtime, see
771    /// [Profiler::spawn_thread].
772    ///
773    /// ### Example
774    ///
775    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
776    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
777    ///
778    /// ```
779    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
780    /// # #[tokio::main]
781    /// # async fn main() -> Result<(), SpawnError> {
782    /// let profiler = ProfilerBuilder::default()
783    ///    .with_local_reporter("/tmp/profiles")
784    ///    .build();
785    /// # if false { // don't spawn the profiler in doctests
786    /// profiler.spawn()?;
787    /// # }
788    /// # Ok(())
789    /// # }
790    /// ```
791    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
792        self.spawn_controllable().map(RunningProfiler::detach_inner)
793    }
794
795    /// Like [Self::spawn], but instead of spawning within the current Tokio
796    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
797    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
798    ///
799    /// If your configuration is standard, use [Profiler::spawn_thread].
800    ///
801    /// If you want to be able to stop the resulting profiler, use
802    /// [Profiler::spawn_controllable_thread_to_runtime].
803    ///
804    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
805    /// allow for configuring thread properties, for example thread names).
806    ///
807    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
808    ///
809    /// ### Example
810    ///
811    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
812    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
813    ///
814    /// ```no_run
815    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
816    /// let rt = tokio::runtime::Builder::new_current_thread()
817    ///     .enable_all()
818    ///     .build()?;
819    /// let profiler = ProfilerBuilder::default()
820    ///    .with_local_reporter("/tmp/profiles")
821    ///    .build();
822    ///
823    /// profiler.spawn_thread_to_runtime(
824    ///     rt,
825    ///     |t| {
826    ///         std::thread::Builder::new()
827    ///             .name("asprof-agent".to_owned())
828    ///             .spawn(t)
829    ///             .expect("thread name contains nuls")
830    ///     }
831    /// )?;
832    /// # Ok::<_, anyhow::Error>(())
833    /// ```
834    pub fn spawn_thread_to_runtime(
835        self,
836        runtime: tokio::runtime::Runtime,
837        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
838    ) -> Result<(), SpawnError> {
839        self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
840    }
841
842    /// Like [Self::spawn], but instead of spawning within the current Tokio
843    /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
844    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
845    /// by itself.
846    ///
847    /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
848    /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
849    /// with the following:
850    /// 1. a current thread runtime with background worker threads (these exist
851    ///    for blocking IO) named "asprof-worker"
852    /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
853    ///
854    /// If you want to be able to stop the resulting profiler, use
855    /// [Profiler::spawn_controllable_thread_to_runtime].
856    ///
857    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
858    ///
859    /// ### Example
860    ///
861    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
862    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
863    ///
864    /// ```no_run
865    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
866    /// # use async_profiler_agent::reporter::local::LocalReporter;
867    /// let profiler = ProfilerBuilder::default()
868    ///    .with_local_reporter("/tmp/profiles")
869    ///    .build();
870    ///
871    /// profiler.spawn_thread()?;
872    /// # Ok::<_, anyhow::Error>(())
873    /// ```
874    pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
875        // using "asprof" in thread name to deal with 15 character + \0 length limit
876        let rt = tokio::runtime::Builder::new_current_thread()
877            .thread_name("asprof-worker".to_owned())
878            .enable_all()
879            .build()
880            .map_err(SpawnThreadError::ConstructRt)?;
881        let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
882        self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
883            .map_err(SpawnThreadError::AsProf)
884    }
885
886    fn spawn_thread_inner<E: ProfilerEngine>(
887        self,
888        asprof: E,
889        runtime: tokio::runtime::Runtime,
890        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
891    ) -> Result<(), SpawnError> {
892        let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
893        handle.spawn_detached(runtime, spawn_fn);
894        Ok(())
895    }
896
897    /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
898    /// (currently only stopping) the profiler.
899    ///
900    /// This allows for changing the configuration of the profiler at runtime, by
901    /// stopping it and then starting a new Profiler with a new configuration. It
902    /// also allows for stopping profiling in case the profiler is suspected to
903    /// cause operational issues.
904    ///
905    /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
906    /// so if your application doen't need to change the profiler's configuration at runtime,
907    /// it will be easier to use [Profiler::spawn].
908    ///
909    /// This function will fail if it is unable to start async-profiler, for example
910    /// if it can't find or load `libasyncProfiler.so`.
911    ///
912    /// ### Tokio Runtime
913    ///
914    /// This function must be run within a Tokio runtime, otherwise it will panic. If
915    /// your application does not have a `main` Tokio runtime, see
916    /// [Profiler::spawn_controllable_thread_to_runtime].
917    ///
918    /// ### Example
919    ///
920    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
921    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
922    ///
923    /// ```no_run
924    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
925    /// # #[tokio::main]
926    /// # async fn main() -> Result<(), SpawnError> {
927    /// let profiler = ProfilerBuilder::default()
928    ///    .with_local_reporter("/tmp/profiles")
929    ///    .build();
930    ///
931    /// let profiler = profiler.spawn_controllable()?;
932    ///
933    /// // [insert your signaling/monitoring mechanism to have a request to disable
934    /// // profiling in case of a problem]
935    /// let got_request_to_disable_profiling = async move {
936    ///     // ...
937    /// #   false
938    /// };
939    /// // spawn a task that will disable profiling if requested
940    /// tokio::task::spawn(async move {
941    ///     if got_request_to_disable_profiling.await {
942    ///         profiler.stop().await;
943    ///     }
944    /// });
945    /// # Ok(())
946    /// # }
947    /// ```
948    pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
949        self.spawn_inner(asprof::AsProf::builder().build())
950    }
951
952    /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
953    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
954    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
955    ///
956    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
957    /// allow for configuring thread properties, for example thread names).
958    ///
959    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
960    ///
961    /// ### Example
962    ///
963    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
964    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
965    ///
966    /// ```no_run
967    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
968    /// let rt = tokio::runtime::Builder::new_current_thread()
969    ///     .enable_all()
970    ///     .build()?;
971    /// let profiler = ProfilerBuilder::default()
972    ///    .with_local_reporter("/tmp/profiles")
973    ///    .build();
974    ///
975    /// let profiler = profiler.spawn_controllable_thread_to_runtime(
976    ///     rt,
977    ///     |t| {
978    ///         std::thread::Builder::new()
979    ///             .name("asprof-agent".to_owned())
980    ///             .spawn(t)
981    ///             .expect("thread name contains nuls")
982    ///     }
983    /// )?;
984    ///
985    /// # fn got_request_to_disable_profiling() -> bool { false }
986    /// // spawn a task that will disable profiling if requested
987    /// std::thread::spawn(move || {
988    ///     if got_request_to_disable_profiling() {
989    ///         profiler.stop();
990    ///     }
991    /// });
992    /// # Ok::<_, anyhow::Error>(())
993    /// ```
994    pub fn spawn_controllable_thread_to_runtime(
995        self,
996        runtime: tokio::runtime::Runtime,
997        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
998    ) -> Result<RunningProfilerThread, SpawnError> {
999        self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1000    }
1001
1002    fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1003        self,
1004        asprof: E,
1005        runtime: tokio::runtime::Runtime,
1006        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1007    ) -> Result<RunningProfilerThread, SpawnError> {
1008        let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1009        Ok(handle.spawn_attached(runtime, spawn_fn))
1010    }
1011
    /// Engine-generic core of every `spawn*` entry point.
    ///
    /// Initializes the engine `E`, then spawns a Tokio task that calls
    /// `profiler_tick` each time the reporting-interval ticker fires, until a
    /// stop is requested or a non-reporter error occurs. Returns a
    /// `RunningProfiler` holding the sending half of the stop channel and the
    /// task's join handle.
    ///
    /// Returns an error only if `E::init_async_profiler` fails. A failure to
    /// create the profiler state happens inside the spawned task, so it is only
    /// logged: the caller still receives `Ok` with a task that has already ended.
    ///
    /// Uses `tokio::time::interval` and `tokio::spawn`, so it must be called
    /// from within a Tokio runtime context.
    fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
        // Initialize async profiler - needs to be done once.
        E::init_async_profiler()?;
        tracing::info!("successfully initialized async profiler.");

        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
        let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();

        // Get profiles at the configured interval rate.
        let join_handle = tokio::spawn(async move {
            let mut state = match ProfilerState::new(asprof, self.profiler_options) {
                Ok(state) => state,
                Err(err) => {
                    tracing::error!(?err, "unable to create profiler state");
                    return;
                }
            };

            // Lazily-loaded if not specified up front.
            let mut agent_metadata = self.agent_metadata;
            // Set once a stop has been observed; the loop then runs one last
            // tick below and exits.
            let mut done = false;

            while !done {
                // Wait until a timer or exit event
                tokio::select! {
                    // Poll the branches in declaration order, so a pending stop
                    // request wins over a timer tick.
                    biased;

                    // The guard skips this branch once the receiver has already completed.
                    r = &mut stop_rx, if !stop_rx.is_terminated() => {
                        // NOTE(review): only an `Err` arm exists, so the channel's
                        // payload type is presumably uninhabited and a stop is
                        // signalled by the sender going away - confirm against
                        // `RunningProfiler`.
                        match r {
                            Err(_) => {
                                tracing::info!("profiler stop requested, doing a final tick");
                                done = true;
                            }
                        }
                    }
                    _ = sampling_ticker.tick() => {
                        tracing::debug!("profiler timer woke up");
                    }
                }

                // Runs after either branch, so a stop request still gets one final tick.
                if let Err(err) = profiler_tick(
                    &mut state,
                    &mut agent_metadata,
                    &*self.reporter,
                    self.reporting_interval,
                )
                .await
                {
                    match &err {
                        TickError::Reporter(_) => {
                            // don't stop on IO errors
                            tracing::error!(?err, "error during profiling, continuing");
                        }
                        // Any other tick error ends the profiling task.
                        _stop => {
                            tracing::error!(?err, "error during profiling, stopping");
                            break;
                        }
                    }
                }
            }

            tracing::info!("profiling task finished");
        });

        Ok(RunningProfiler {
            stop_channel,
            join_handle,
        })
    }
1081}
1082
1083async fn profiler_tick<E: ProfilerEngine>(
1084    state: &mut ProfilerState<E>,
1085    agent_metadata: &mut Option<AgentMetadata>,
1086    reporter: &(dyn Reporter + Send + Sync),
1087    reporting_interval: Duration,
1088) -> Result<(), TickError> {
1089    if !state.is_started() {
1090        state.start().await?;
1091        return Ok(());
1092    }
1093
1094    let Some(start) = state.stop()? else {
1095        tracing::warn!("stopped the profiler but it wasn't running?");
1096        return Ok(());
1097    };
1098    let start = start.duration_since(UNIX_EPOCH)?;
1099    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
1100
1101    // Start it up immediately, writing to the "other" file, so that we keep
1102    // profiling the application while we're reporting data.
1103    state
1104        .jfr_file_mut()
1105        .empty_inactive_file()
1106        .map_err(TickError::EmptyInactiveFile)?;
1107    state.jfr_file_mut().swap();
1108    state.start().await?;
1109
1110    // Lazily load the agent metadata if it was not provided in
1111    // the constructor. See the struct comments for why this is.
1112    // This code runs at most once.
1113    if agent_metadata.is_none() {
1114        #[cfg(feature = "aws-metadata-no-defaults")]
1115        let md = crate::metadata::aws::load_agent_metadata().await?;
1116        #[cfg(not(feature = "aws-metadata-no-defaults"))]
1117        let md = crate::metadata::AgentMetadata::NoMetadata;
1118        tracing::debug!("loaded metadata");
1119        agent_metadata.replace(md);
1120    }
1121
1122    let report_metadata = ReportMetadata {
1123        instance: agent_metadata.as_ref().unwrap(),
1124        start,
1125        end,
1126        reporting_interval,
1127    };
1128
1129    let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
1130        .await
1131        .map_err(TickError::JfrRead)?;
1132
1133    reporter
1134        .report(jfr, &report_metadata)
1135        .await
1136        .map_err(TickError::Reporter)?;
1137
1138    Ok(())
1139}
1140
1141#[cfg(test)]
1142mod tests {
1143    use std::sync::Arc;
1144    use std::sync::atomic::{self, AtomicBool, AtomicU32};
1145
1146    use test_case::test_case;
1147
1148    use super::*;
1149
1150    #[test]
1151    fn test_jfr_file_drop() {
1152        let mut jfr = JfrFile::new().unwrap();
1153
1154        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
1155        jfr.swap();
1156        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
1157        jfr.empty_inactive_file().unwrap();
1158        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
1159    }
1160
1161    struct MockProfilerEngine {
1162        counter: AtomicU32,
1163    }
1164    impl ProfilerEngine for MockProfilerEngine {
1165        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1166            Ok(())
1167        }
1168
1169        fn start_async_profiler(
1170            &self,
1171            jfr_file_path: &Path,
1172            _options: &ProfilerOptions,
1173        ) -> Result<(), asprof::AsProfError> {
1174            let contents = format!(
1175                "JFR{}",
1176                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
1177            );
1178            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
1179            Ok(())
1180        }
1181
1182        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1183            Ok(())
1184        }
1185    }
1186
1187    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
1188    impl std::fmt::Debug for MockReporter {
1189        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1190            f.debug_struct("MockReporter").finish()
1191        }
1192    }
1193
1194    #[async_trait::async_trait]
1195    impl Reporter for MockReporter {
1196        async fn report(
1197            &self,
1198            jfr: Vec<u8>,
1199            metadata: &ReportMetadata,
1200        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1201            self.0
1202                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
1203                .await
1204                .unwrap();
1205            Ok(())
1206        }
1207    }
1208
1209    fn make_mock_profiler() -> (
1210        Profiler,
1211        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
1212    ) {
1213        let (tx, rx) = tokio::sync::mpsc::channel(1);
1214        let agent = ProfilerBuilder::default()
1215            .with_reporter(MockReporter(tx))
1216            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
1217                aws_account_id: "0".into(),
1218                aws_region_id: "us-east-1".into(),
1219                ec2_instance_id: "i-fake".into(),
1220                #[cfg(feature = "__unstable-fargate-cpu-count")]
1221                ec2_instance_type: "t3.micro".into(),
1222            })
1223            .build();
1224        (agent, rx)
1225    }
1226
1227    #[tokio::test(start_paused = true)]
1228    async fn test_profiler_agent() {
1229        let e_md = AgentMetadata::Ec2AgentMetadata {
1230            aws_account_id: "0".into(),
1231            aws_region_id: "us-east-1".into(),
1232            ec2_instance_id: "i-fake".into(),
1233            #[cfg(feature = "__unstable-fargate-cpu-count")]
1234            ec2_instance_type: "t3.micro".into(),
1235        };
1236        let (agent, mut rx) = make_mock_profiler();
1237        agent
1238            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1239                counter: AtomicU32::new(0),
1240            })
1241            .unwrap()
1242            .detach();
1243        let (jfr, md) = rx.recv().await.unwrap();
1244        assert_eq!(jfr, "JFR0");
1245        assert_eq!(e_md, md);
1246        let (jfr, md) = rx.recv().await.unwrap();
1247        assert_eq!(jfr, "JFR1");
1248        assert_eq!(e_md, md);
1249    }
1250
1251    #[test_case(false; "uncontrollable")]
1252    #[test_case(true; "controllable")]
1253    fn test_profiler_local_rt(controllable: bool) {
1254        let e_md = AgentMetadata::Ec2AgentMetadata {
1255            aws_account_id: "0".into(),
1256            aws_region_id: "us-east-1".into(),
1257            ec2_instance_id: "i-fake".into(),
1258            #[cfg(feature = "__unstable-fargate-cpu-count")]
1259            ec2_instance_type: "t3.micro".into(),
1260        };
1261        let (agent, mut rx) = make_mock_profiler();
1262        let rt = tokio::runtime::Builder::new_current_thread()
1263            .enable_all()
1264            .start_paused(true)
1265            .build()
1266            .unwrap();
1267        // spawn the profiler, doing this before spawning a thread to allow
1268        // capturing errors from `spawn`
1269        let handle = if controllable {
1270            Some(
1271                agent
1272                    .spawn_controllable_thread_inner::<MockProfilerEngine>(
1273                        MockProfilerEngine {
1274                            counter: AtomicU32::new(0),
1275                        },
1276                        rt,
1277                        std::thread::spawn,
1278                    )
1279                    .unwrap(),
1280            )
1281        } else {
1282            agent
1283                .spawn_thread_inner::<MockProfilerEngine>(
1284                    MockProfilerEngine {
1285                        counter: AtomicU32::new(0),
1286                    },
1287                    rt,
1288                    std::thread::spawn,
1289                )
1290                .unwrap();
1291            None
1292        };
1293
1294        let (jfr, md) = rx.blocking_recv().unwrap();
1295        assert_eq!(jfr, "JFR0");
1296        assert_eq!(e_md, md);
1297        let (jfr, md) = rx.blocking_recv().unwrap();
1298        assert_eq!(jfr, "JFR1");
1299        assert_eq!(e_md, md);
1300
1301        if let Some(handle) = handle {
1302            let drain_thread =
1303                std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
1304            // request a stop
1305            handle.stop();
1306            // the drain thread should be done
1307            drain_thread.join().unwrap();
1308        }
1309    }
1310
1311    enum StopKind {
1312        Delibrate,
1313        Drop,
1314        Abort,
1315    }
1316
1317    #[tokio::test(start_paused = true)]
1318    #[test_case(StopKind::Delibrate; "deliberate stop")]
1319    #[test_case(StopKind::Drop; "drop stop")]
1320    #[test_case(StopKind::Abort; "abort stop")]
1321    async fn test_profiler_stop(stop_kind: StopKind) {
1322        let e_md = AgentMetadata::Ec2AgentMetadata {
1323            aws_account_id: "0".into(),
1324            aws_region_id: "us-east-1".into(),
1325            ec2_instance_id: "i-fake".into(),
1326            #[cfg(feature = "__unstable-fargate-cpu-count")]
1327            ec2_instance_type: "t3.micro".into(),
1328        };
1329        let (agent, mut rx) = make_mock_profiler();
1330        let profiler_ref = agent
1331            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1332                counter: AtomicU32::new(0),
1333            })
1334            .unwrap();
1335        let (jfr, md) = rx.recv().await.unwrap();
1336        assert_eq!(jfr, "JFR0");
1337        assert_eq!(e_md, md);
1338        let (jfr, md) = rx.recv().await.unwrap();
1339        assert_eq!(jfr, "JFR1");
1340        assert_eq!(e_md, md);
1341        // check that stop is faster than an interval and returns an "immediate" next jfr
1342        match stop_kind {
1343            StopKind::Drop => drop(profiler_ref),
1344            StopKind::Delibrate => {
1345                tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
1346                    .await
1347                    .unwrap();
1348            }
1349            StopKind::Abort => {
1350                // You can call Abort on the JoinHandle. make sure that is not buggy.
1351                profiler_ref.detach_inner().abort();
1352            }
1353        }
1354        // check that we get the next JFR "quickly", and the JFR after that is empty.
1355        let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
1356            .await
1357            .unwrap()
1358            .unwrap();
1359        assert_eq!(jfr, "JFR2");
1360        assert_eq!(e_md, md);
1361        assert!(rx.recv().await.is_none());
1362    }
1363
1364    // simulate a badly-behaved profiler that errors on start/stop and then
1365    // tries to access the JFR file
1366    struct StopErrorProfilerEngine {
1367        start_error: bool,
1368        counter: Arc<AtomicBool>,
1369    }
1370    impl ProfilerEngine for StopErrorProfilerEngine {
1371        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1372            Ok(())
1373        }
1374
1375        fn start_async_profiler(
1376            &self,
1377            jfr_file_path: &Path,
1378            _options: &ProfilerOptions,
1379        ) -> Result<(), asprof::AsProfError> {
1380            let jfr_file_path = jfr_file_path.to_owned();
1381            std::fs::write(&jfr_file_path, "JFR").unwrap();
1382            let counter = self.counter.clone();
1383            tokio::task::spawn(async move {
1384                tokio::time::sleep(Duration::from_secs(5)).await;
1385                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
1386                counter.store(true, atomic::Ordering::Release);
1387            });
1388            if self.start_error {
1389                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1390            } else {
1391                Ok(())
1392            }
1393        }
1394
1395        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1396            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1397        }
1398    }
1399
1400    #[tokio::test(start_paused = true)]
1401    #[test_case(false; "error on stop")]
1402    #[test_case(true; "error on start")]
1403    async fn test_profiler_error(start_error: bool) {
1404        let (agent, mut rx) = make_mock_profiler();
1405        let counter = Arc::new(AtomicBool::new(false));
1406        let engine = StopErrorProfilerEngine {
1407            start_error,
1408            counter: counter.clone(),
1409        };
1410        let handle = agent.spawn_inner(engine).unwrap().detach_inner();
1411        assert!(rx.recv().await.is_none());
1412        // check that the "sleep 5" step in start_async_profiler succeeds
1413        for _ in 0..100 {
1414            tokio::time::sleep(Duration::from_secs(1)).await;
1415            if counter.load(atomic::Ordering::Acquire) {
1416                handle.await.unwrap(); // Check that the JoinHandle is done
1417                return;
1418            }
1419        }
1420        panic!("didn't read from file");
1421    }
1422
1423    #[test]
1424    fn test_profiler_options_to_args_string_default() {
1425        let opts = ProfilerOptions::default();
1426        let dummy_path = Path::new("/tmp/test.jfr");
1427        let args = opts.to_args_string(dummy_path);
1428        assert!(
1429            args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
1430            "Default args string not constructed correctly"
1431        );
1432        assert!(args.contains("file=/tmp/test.jfr"));
1433        assert!(!args.contains("nativemem="));
1434    }
1435
1436    #[test]
1437    fn test_profiler_options_to_args_string_with_native_mem() {
1438        let opts = ProfilerOptions {
1439            native_mem: Some("10m".to_string()),
1440            wall_clock_millis: None,
1441            cpu_interval: None,
1442        };
1443        let dummy_path = Path::new("/tmp/test.jfr");
1444        let args = opts.to_args_string(dummy_path);
1445        assert!(args.contains("nativemem=10m"));
1446    }
1447
1448    #[test]
1449    fn test_profiler_options_builder() {
1450        let opts = ProfilerOptionsBuilder::default()
1451            .with_native_mem_bytes(5000000)
1452            .build();
1453
1454        assert_eq!(opts.native_mem, Some("5000000".to_string()));
1455    }
1456
1457    #[test]
1458    fn test_profiler_options_builder_all_options() {
1459        let opts = ProfilerOptionsBuilder::default()
1460            .with_native_mem_bytes(5000000)
1461            .with_cpu_interval(Duration::from_secs(1))
1462            .with_wall_clock_interval(Duration::from_secs(10))
1463            .build();
1464
1465        let dummy_path = Path::new("/tmp/test.jfr");
1466        let args = opts.to_args_string(dummy_path);
1467        assert_eq!(
1468            args,
1469            "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
1470        );
1471    }
1472
1473    #[test]
1474    fn test_local_reporter_has_no_metadata() {
1475        // Check that with_local_reporter sets some configuration
1476        let reporter = ProfilerBuilder::default().with_local_reporter(".");
1477        assert_eq!(
1478            format!("{:?}", reporter.reporter),
1479            r#"Some(LocalReporter { directory: "." })"#
1480        );
1481        match reporter.agent_metadata {
1482            Some(AgentMetadata::NoMetadata) => {}
1483            bad => panic!("{bad:?}"),
1484        };
1485    }
1486}