async_profiler_agent/
profiler.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
5
6use crate::{
7    asprof::{self, AsProfError},
8    metadata::{AgentMetadata, ReportMetadata},
9    reporter::{Reporter, local::LocalReporter},
10};
11use std::{
12    fs::File,
13    io, mem,
14    path::{Path, PathBuf},
15    time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
19struct JfrFile {
20    active: std::fs::File,
21    inactive: std::fs::File,
22}
23
24impl JfrFile {
25    #[cfg(target_os = "linux")]
26    fn new() -> Result<Self, io::Error> {
27        Ok(Self {
28            active: tempfile::tempfile().unwrap(),
29            inactive: tempfile::tempfile().unwrap(),
30        })
31    }
32
33    #[cfg(not(target_os = "linux"))]
34    fn new() -> Result<Self, io::Error> {
35        Err(io::Error::other(
36            "async-profiler is only supported on Linux",
37        ))
38    }
39
40    fn swap(&mut self) {
41        std::mem::swap(&mut self.active, &mut self.inactive);
42    }
43
44    #[cfg(target_os = "linux")]
45    fn file_path(file: &std::fs::File) -> PathBuf {
46        use std::os::fd::AsRawFd;
47
48        format!("/proc/self/fd/{}", file.as_raw_fd()).into()
49    }
50
51    #[cfg(not(target_os = "linux"))]
52    fn file_path(_file: &std::fs::File) -> PathBuf {
53        unimplemented!()
54    }
55
56    fn active_path(&self) -> PathBuf {
57        Self::file_path(&self.active)
58    }
59
60    fn inactive_path(&self) -> PathBuf {
61        Self::file_path(&self.inactive)
62    }
63
64    fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
65        // Empty the file, or create it for the first time if the profiler hasn't
66        // started yet.
67        File::create(Self::file_path(&self.inactive))?;
68        tracing::debug!(message = "emptied the file");
69        Ok(())
70    }
71}
72
/// Settings that control how async-profiler samples the program.
///
/// Covers:
/// - native memory allocation tracking ([`ProfilerOptions::native_mem`])
/// - the CPU-time and wall-clock sampling intervals, which are set through
///   [`ProfilerOptionsBuilder`]
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// When set, native memory allocations are profiled as well.
    ///
    /// The value is the sampling interval: a byte count, optionally followed
    /// by `k` (kilobytes), `m` (megabytes) or `g` (gigabytes). `"10m"`, for
    /// example, samples one allocation per 10 megabytes of memory allocated,
    /// and `"0"` samples every allocation.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    // CPU-time sampling interval in nanoseconds; `None` selects the default.
    cpu_interval: Option<u128>,
    // Wall-clock sampling interval in milliseconds; `None` selects the default.
    wall_clock_millis: Option<u128>,
}

// Fallback CPU-time sampling interval: 100 milliseconds, expressed in nanoseconds.
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
// Fallback wall-clock sampling interval: one second, expressed in milliseconds.
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;

impl ProfilerOptions {
    /// Renders these options as the comma-separated argument string that
    /// starts async-profiler.
    ///
    /// `jfr_file_path` is written into the `file=` argument. Unset intervals
    /// fall back to the module defaults, and `nativemem=` is appended only
    /// when [`ProfilerOptions::native_mem`] is set.
    pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
        let cpu_nanos = self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS);
        let wall_millis = self
            .wall_clock_millis
            .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS);
        let file = jfr_file_path.display();

        let base = format!(
            "start,event=cpu,interval={cpu_nanos},wall={wall_millis}ms,jfr,cstack=dwarf,file={file}"
        );
        match &self.native_mem {
            Some(interval) => format!("{base},nativemem={interval}"),
            None => base,
        }
    }
}
115
/// Builder for [`ProfilerOptions`].
///
/// Every field starts out as `None` (through the derived `Default`). The
/// `with_*` methods fill fields in, and [`ProfilerOptionsBuilder::build`]
/// moves them into a [`ProfilerOptions`].
#[derive(Debug, Default)]
pub struct ProfilerOptionsBuilder {
    // Native memory sampling interval, kept as the string handed to async-profiler.
    native_mem: Option<String>,
    // CPU-time sampling interval in nanoseconds (stored from `Duration::as_nanos`).
    cpu_interval: Option<u128>,
    // Wall-clock sampling interval in milliseconds (stored from `Duration::as_millis`).
    wall_clock_millis: Option<u128>,
}
123
124impl ProfilerOptionsBuilder {
125    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass
126    /// the string input directly to async_profiler.
127    ///
128    /// The value is the interval in bytes or in other units,
129    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
130    ///
131    /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
132    /// type-checked.
133    ///
134    /// ### Examples
135    ///
136    /// This will sample allocations for every 10 megabytes allocated:
137    ///
138    /// ```
139    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
140    /// # use async_profiler_agent::profiler::SpawnError;
141    /// # #[tokio::main]
142    /// # async fn main() -> Result<(), SpawnError> {
143    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
144    /// let profiler = ProfilerBuilder::default()
145    ///     .with_profiler_options(opts)
146    ///     .with_local_reporter("/tmp/profiles")
147    ///     .build();
148    /// # if false { // don't spawn the profiler in doctests
149    /// let profiler = profiler.spawn_controllable()?;
150    /// // ... your program goes here
151    /// profiler.stop().await; // make sure the last profile is flushed
152    /// # }
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
157        self.native_mem = Some(native_mem_interval);
158        self
159    }
160
161    /// If set, the profiler will collect information about
162    /// native memory allocations.
163    ///
    /// The argument passed is the profiling interval - the profiler will
    /// sample allocations roughly once per that many bytes allocated.
166    ///
167    /// See [ProfilingModes in the async-profiler docs] for more details.
168    ///
169    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
170    ///
171    /// ### Examples
172    ///
173    /// This will sample allocations for every 10 megabytes allocated:
174    ///
175    /// ```
176    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
177    /// # use async_profiler_agent::profiler::SpawnError;
178    /// # #[tokio::main]
179    /// # async fn main() -> Result<(), SpawnError> {
180    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
181    /// let profiler = ProfilerBuilder::default()
182    ///     .with_profiler_options(opts)
183    ///     .with_local_reporter("/tmp/profiles")
184    ///     .build();
185    /// # if false { // don't spawn the profiler in doctests
186    /// let profiler = profiler.spawn_controllable()?;
187    /// // ... your program goes here
188    /// profiler.stop().await; // make sure the last profile is flushed
189    /// # }
190    /// # Ok(())
191    /// # }
192    /// ```
193    ///
194    /// This will sample every allocation (potentially slow):
195    /// ```
196    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
197    /// # use async_profiler_agent::profiler::SpawnError;
198    /// # #[tokio::main]
199    /// # async fn main() -> Result<(), SpawnError> {
200    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
201    /// let profiler = ProfilerBuilder::default()
202    ///     .with_profiler_options(opts)
203    ///     .with_local_reporter("/tmp/profiles")
204    ///     .build();
205    /// # if false { // don't spawn the profiler in doctests
206    /// let profiler = profiler.spawn_controllable()?;
207    /// // ... your program goes here
208    /// profiler.stop().await; // make sure the last profile is flushed
209    /// # }
210    /// # Ok(())
211    /// # }
212    /// ```
213    pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
214        self.native_mem = Some(native_mem_interval.to_string());
215        self
216    }
217
218    /// Sets the interval in which the profiler will collect
219    /// CPU-time samples, via the [async-profiler `interval` option].
220    ///
221    /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
222    /// are currently running on a CPU, not threads that are sleeping.
223    ///
224    /// It can use a higher frequency than wall-clock sampling since the
225    /// number of the threads that are running on a CPU at a given time is
226    /// naturally limited by the number of CPUs, while the number of sleeping
227    /// threads can be much larger.
228    ///
229    /// The default is to do a CPU-time sample every 100 milliseconds.
230    ///
231    /// The async-profiler agent collects both CPU time and wall-clock time
232    /// samples, so this function should normally be used along with
233    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
234    ///
235    /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
236    ///
237    /// ### Examples
238    ///
    /// This will take a CPU-time sample every 10 milliseconds (while running)
    /// and a wall-clock sample every 100 milliseconds (running or sleeping):
241    ///
242    /// ```
243    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
244    /// # use async_profiler_agent::profiler::SpawnError;
245    /// # use std::time::Duration;
246    /// # #[tokio::main]
247    /// # async fn main() -> Result<(), SpawnError> {
248    /// let opts = ProfilerOptionsBuilder::default()
249    ///     .with_cpu_interval(Duration::from_millis(10))
250    ///     .with_wall_clock_interval(Duration::from_millis(100))
251    ///     .build();
252    /// let profiler = ProfilerBuilder::default()
253    ///     .with_profiler_options(opts)
254    ///     .with_local_reporter("/tmp/profiles")
255    ///     .build();
256    /// # if false { // don't spawn the profiler in doctests
257    /// let profiler = profiler.spawn_controllable()?;
258    /// // ... your program goes here
259    /// profiler.stop().await; // make sure the last profile is flushed
260    /// # }
261    /// # Ok(())
262    /// # }
263    /// ```
264    pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
265        self.cpu_interval = Some(cpu_interval.as_nanos());
266        self
267    }
268
269    /// Sets the interval, in milliseconds, in which the profiler will collect
270    /// wall-clock samples, via the [async-profiler `wall` option].
271    ///
272    /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
273    /// whether they are sleeping or running, and can therefore be
274    /// very useful for finding threads that are blocked, for example
275    /// on a synchronous lock or a slow system call.
276    ///
277    /// When using Tokio, since tasks are not threads, tasks that are not
278    /// currently running will not be sampled by a wall clock sample. However,
279    /// a wall clock sample is still very useful in Tokio, since it is what
280    /// you want to catch tasks that are blocking a thread by waiting on
281    /// synchronous operations.
282    ///
283    /// The default is to do a wall-clock sample every second.
284    ///
285    /// The async-profiler agent collects both CPU time and wall-clock time
286    /// samples, so this function should normally be used along with
287    /// [ProfilerOptionsBuilder::with_cpu_interval].
288    ///
289    /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
290    ///
291    /// ### Examples
292    ///
    /// This will take a CPU-time sample every 10 milliseconds (while running)
    /// and a wall-clock sample every 10 milliseconds (running or sleeping):
295    ///
296    /// ```
297    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
298    /// # use async_profiler_agent::profiler::SpawnError;
299    /// # use std::time::Duration;
300    /// # #[tokio::main]
301    /// # async fn main() -> Result<(), SpawnError> {
302    /// let opts = ProfilerOptionsBuilder::default()
303    ///     .with_cpu_interval(Duration::from_millis(10))
304    ///     .with_wall_clock_interval(Duration::from_millis(10))
305    ///     .build();
306    /// let profiler = ProfilerBuilder::default()
307    ///     .with_profiler_options(opts)
308    ///     .with_local_reporter("/tmp/profiles")
309    ///     .build();
310    /// # if false { // don't spawn the profiler in doctests
311    /// let profiler = profiler.spawn_controllable()?;
312    /// // ... your program goes here
313    /// profiler.stop().await; // make sure the last profile is flushed
314    /// # }
315    /// # Ok(())
316    /// # }
317    /// ```
318    pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
319        self.wall_clock_millis = Some(wall_clock.as_millis());
320        self
321    }
322
323    /// Build the [`ProfilerOptions`] from the builder.
324    pub fn build(self) -> ProfilerOptions {
325        ProfilerOptions {
326            native_mem: self.native_mem,
327            wall_clock_millis: self.wall_clock_millis,
328            cpu_interval: self.cpu_interval,
329        }
330    }
331}
332
/// Builds a [`Profiler`], panicking if any required fields were not set by the
/// time `build` is called.
///
/// The only required field is the reporter; every other field has a fallback
/// that `build` applies.
#[derive(Debug, Default)]
pub struct ProfilerBuilder {
    // How often reports are sent; `build` falls back to 30 seconds when unset.
    reporting_interval: Option<Duration>,
    // Destination for the collected profiles; `build` panics when this is unset.
    reporter: Option<Box<dyn Reporter + Send + Sync>>,
    // Explicit agent metadata, passed to the `Profiler` as-is.
    // NOTE(review): `None` presumably leaves metadata auto-detection enabled - confirm.
    agent_metadata: Option<AgentMetadata>,
    // async-profiler options; `build` falls back to `ProfilerOptions::default()`.
    profiler_options: Option<ProfilerOptions>,
}
342
343impl ProfilerBuilder {
344    /// Sets the reporting interval (default: 30 seconds).
345    ///
346    /// This is the interval that samples are *reported* to the backend,
347    /// and is unrelated to the interval at which the application
348    /// is *sampled* by async profiler, which is controlled by
349    /// [ProfilerOptionsBuilder::with_cpu_interval] and
350    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
351    ///
352    /// Most users should not change this setting.
353    ///
354    /// ## Example
355    ///
356    /// ```no_run
357    /// # use async_profiler_agent::profiler::SpawnError;
358    /// # #[tokio::main]
359    /// # async fn main() -> Result<(), SpawnError> {
360    /// # use std::path::PathBuf;
361    /// # use std::time::Duration;
362    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
363    /// # let path = PathBuf::from(".");
364    /// let agent = ProfilerBuilder::default()
365    ///     .with_local_reporter(path)
366    ///     .with_reporting_interval(Duration::from_secs(15))
367    ///     .build()
368    ///     .spawn_controllable()?;
369    /// // .. your program goes here
370    /// agent.stop().await; // make sure the last profile is flushed
371    /// # Ok::<_, SpawnError>(())
372    /// # }
373    /// ```
374    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
375        self.reporting_interval = Some(i);
376        self
377    }
378
379    /// Sets the [`Reporter`], which is used to upload the collected profiling
380    /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
381    /// feature enabled,
382    #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
383    #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
384    /// It is also possible to write your own [`Reporter`].
385    ///
386    /// It's normally easier to use [`LocalReporter`] directly via
387    /// [`ProfilerBuilder::with_local_reporter`].
388    ///
389    /// If you want to output to multiple reporters, you can use
390    /// [`MultiReporter`].
391    ///
392    /// [`LocalReporter`]: crate::reporter::local::LocalReporter
393    /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
394    #[cfg_attr(
395        feature = "s3-no-defaults",
396        doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
397    )]
398    ///
399    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
400    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
401        self.reporter = Some(Box::new(r));
402        self
403    }
404
    /// Sets the profiler to use [LocalReporter], which will write `.jfr` files to `path`,
406    /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
407    /// since the [LocalReporter] does not need that.
408    ///
409    /// This is useful for testing, since metadata auto-detection currently only works
410    /// on [Amazon EC2] or [Amazon Fargate] instances.
411    ///
412    /// The local reporter should normally not be used in production, since it will
413    /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
414    /// or write your own (see [`ProfilerBuilder::with_reporter`]).
415    ///
416    /// [Amazon EC2]: https://aws.amazon.com/ec2
417    /// [Amazon Fargate]: https://aws.amazon.com/fargate
418    ///
419    /// ## Example
420    ///
421    /// This will write profiles as `.jfr` files to `./path-to-profiles`:
422    ///
423    /// ```no_run
424    /// # use std::path::PathBuf;
425    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
426    /// # use async_profiler_agent::reporter::local::LocalReporter;
427    /// # use async_profiler_agent::metadata::AgentMetadata;
428    /// let path = PathBuf::from("./path-to-profiles");
429    /// let agent = ProfilerBuilder::default()
430    ///     .with_local_reporter(path)
431    ///     .build()
432    ///     .spawn()?;
433    /// # Ok::<_, SpawnError>(())
434    /// ```
435    pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
436        self.reporter = Some(Box::new(LocalReporter::new(path.into())));
437        self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
438    }
439
440    /// Provide custom agent metadata.
441    ///
442    /// The async-profiler Rust agent sends metadata to the [Reporter] with
443    /// the identity of the current host and process, which is normally
444    /// transmitted as `metadata.json` within the generated `.zip` file,
445    /// using the schema format [`reporter::s3::MetadataJson`].
446    ///
447    /// That metadata can later be used by tooling to be able to sort
448    /// profiling reports by host.
449    ///
450    /// async-profiler Rust agent will by default try to fetch the metadata
451    /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
452    /// will error if it's unable to find it. If you are running the
453    /// async-profiler agent on any other form of compute,
454    /// you will need to create and attach your own metadata
455    /// by calling this function.
456    ///
457    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
458    /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
459    /// [Amazon EC2]: https://aws.amazon.com/ec2
460    /// [Amazon Fargate]: https://aws.amazon.com/fargate
461    /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
462    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
463        self.agent_metadata = Some(j);
464        self
465    }
466
467    /// Provide custom profiler options.
468    ///
469    /// ### Example
470    ///
471    /// This will sample allocations for every 10 megabytes allocated:
472    ///
473    /// ```
474    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
475    /// # use async_profiler_agent::profiler::SpawnError;
476    /// # #[tokio::main]
477    /// # async fn main() -> Result<(), SpawnError> {
478    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
479    /// let profiler = ProfilerBuilder::default()
480    ///     .with_profiler_options(opts)
481    ///     .with_local_reporter("/tmp/profiles")
482    ///     .build();
483    /// # if false { // don't spawn the profiler in doctests
484    /// let profiler = profiler.spawn_controllable()?;
485    /// // ... your program goes here
486    /// profiler.stop().await; // make sure the last profile is flushed
487    /// # }
488    /// # Ok(())
489    /// # }
490    /// ```
491    pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
492        self.profiler_options = Some(c);
493        self
494    }
495
496    /// Turn this builder into a profiler!
497    pub fn build(self) -> Profiler {
498        Profiler {
499            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
500            reporter: self.reporter.expect("reporter is required"),
501            agent_metadata: self.agent_metadata,
502            profiler_options: self.profiler_options.unwrap_or_default(),
503        }
504    }
505}
506
/// Lifecycle state of the profiler as tracked by `ProfilerState`.
enum Status {
    /// The profiler is not running.
    Idle,
    /// A start has been requested but has not (yet) been confirmed as
    /// successful; the profiler might or might not be running.
    Starting,
    /// The profiler is running; the payload is the time recorded right after
    /// the start call returned successfully.
    Running(SystemTime),
}
512
/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
/// of the state of the profiler. The primary benefit of this is RAII - when
/// this type drops, it will stop the profiler if it's running.
struct ProfilerState<E: ProfilerEngine> {
    // This is only `None` inside the destructor, on the paths that deliberately
    // leak the files (stopping failed, or the profiler was still starting).
    jfr_file: Option<JfrFile>,
    // Engine instance used to start the profiler.
    asprof: E,
    // Lifecycle state; the destructor uses it to decide whether to stop the
    // profiler and whether the files may be released.
    status: Status,
    // Options handed to the engine on every start.
    profiler_options: ProfilerOptions,
}

impl<E: ProfilerEngine> ProfilerState<E> {
    /// Creates an idle state that owns a fresh [`JfrFile`].
    ///
    /// # Errors
    ///
    /// Returns the [`io::Error`] from [`JfrFile::new`].
    fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
        Ok(Self {
            jfr_file: Some(JfrFile::new()?),
            asprof,
            status: Status::Idle,
            profiler_options,
        })
    }

    /// Mutable access to the JFR files.
    ///
    /// Panics if the files were already taken, which only happens inside the
    /// destructor.
    fn jfr_file_mut(&mut self) -> &mut JfrFile {
        self.jfr_file.as_mut().unwrap()
    }

    /// Starts the profiler, pointing it at the active JFR file.
    ///
    /// On success the status becomes `Running` with the current time. On
    /// failure the status is left at `Starting`, so that the destructor leaks
    /// the files instead of releasing them. The visible body contains no
    /// `.await`.
    async fn start(&mut self) -> Result<(), AsProfError> {
        let active = self.jfr_file.as_ref().unwrap().active_path();
        // drop guard - make sure the files are leaked if the profiler might have started
        // (the status must be set *before* the fallible call: `?` returns early
        // on error and leaves `Starting` in place)
        self.status = Status::Starting;
        E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
        self.status = Status::Running(SystemTime::now());
        Ok(())
    }

    /// Stops the profiler and resets the status to `Idle`.
    ///
    /// Returns the time profiling started if the status was `Running`, and
    /// `None` otherwise. If the engine fails to stop, the error is returned
    /// and the status is left unchanged.
    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
        E::stop_async_profiler()?;
        let status = std::mem::replace(&mut self.status, Status::Idle);
        Ok(match status {
            Status::Idle | Status::Starting => None,
            Status::Running(since) => Some(since),
        })
    }

    /// Whether the status is `Running`.
    fn is_started(&self) -> bool {
        matches!(self.status, Status::Running(_))
    }
}
560
// Stops the profiler if it is running when the state is dropped. Whenever it
// is not certain that the profiler has stopped, the JFR files are leaked
// (`mem::forget`) instead of being dropped.
impl<E: ProfilerEngine> Drop for ProfilerState<E> {
    fn drop(&mut self) {
        match self.status {
            Status::Running(_) => {
                if let Err(err) = self.stop() {
                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
                    // to avoid symlink races
                    std::mem::forget(self.jfr_file.take());
                    // XXX: Rust defines leaking resources during drop as safe.
                    tracing::warn!(?err, "unable to stop profiler during drop glue");
                }
            }
            // Never started (or already stopped): the files are dropped normally.
            Status::Idle => {}
            Status::Starting => {
                // SECURITY: a start was attempted but never confirmed, so the
                // profiler might have started. Avoid removing the JFR file in
                // that case, to avoid symlink races.
                std::mem::forget(self.jfr_file.take());
            }
        }
    }
}
582
/// The async-profiler operations that `ProfilerState` is generic over.
pub(crate) trait ProfilerEngine: Send + Sync + 'static {
    /// Initializes async-profiler. Takes no `self`.
    // NOTE(review): no call site is visible in this part of the file - confirm
    // when it is expected to be invoked.
    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
    /// Starts profiling with the given `options`.
    ///
    /// `jfr_file_path` is the path of the JFR file for this run; callers in
    /// this file pass the path of the active file.
    fn start_async_profiler(
        &self,
        jfr_file_path: &Path,
        options: &ProfilerOptions,
    ) -> Result<(), asprof::AsProfError>;
    /// Stops profiling. Takes no `self`.
    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}
592
// Internal error type.
// NOTE(review): going by the name, this is the error of one reporting "tick";
// the code that produces it is outside the visible part of the file - confirm.
#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
    /// An async-profiler error, displayed as the wrapped error itself.
    #[error(transparent)]
    AsProf(#[from] AsProfError),
    /// An AWS metadata error; only present with the `aws-metadata-no-defaults`
    /// feature.
    #[error(transparent)]
    #[cfg(feature = "aws-metadata-no-defaults")]
    Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
    /// A boxed error attributed to the reporter.
    #[error("reporter: {0}")]
    Reporter(Box<dyn std::error::Error + Send>),
    /// A [`SystemTimeError`], i.e. a system time comparison went backwards.
    #[error("broken clock: {0}")]
    BrokenClock(#[from] SystemTimeError),
    /// An I/O error attributed to reading JFR data.
    #[error("jfr read error: {0}")]
    JfrRead(io::Error),
    /// An I/O error attributed to emptying the inactive JFR file.
    #[error("empty inactive file error: {0}")]
    EmptyInactiveFile(io::Error),
}
610
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error that happened while spawning a profiler.
pub enum SpawnError {
    /// Error from async-profiler.
    ///
    /// Displayed as the wrapped error itself, and converted automatically
    /// from [`asprof::AsProfError`].
    #[error(transparent)]
    AsProf(#[from] asprof::AsProfError),
    /// Error writing to a tempfile; the underlying I/O error is available as
    /// the error source.
    #[error("tempfile error")]
    TempFile(#[source] io::Error),
}
622
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error from [`Profiler::spawn_thread`].
pub enum SpawnThreadError {
    /// Error from spawning the profiler (a wrapped [`SpawnError`], for example
    /// an error from async-profiler). Displayed as the wrapped error itself.
    #[error(transparent)]
    AsProf(#[from] SpawnError),
    /// Error constructing the Tokio runtime; the underlying I/O error is
    /// available as the error source.
    #[error("constructing Tokio runtime")]
    ConstructRt(#[source] io::Error),
}
634
// No control messages currently. The enum has no variants, so a value of it
// can never be constructed or sent: a channel carrying `Control` can only
// signal by being closed (the sender being dropped).
enum Control {}
637
638/// A handle to a running profiler
639///
640/// Currently just allows for stopping the profiler.
641///
642/// Dropping this handle will request that the profiler will stop.
643#[must_use = "dropping this stops the profiler, call .detach() to detach"]
644pub struct RunningProfiler {
645    stop_channel: tokio::sync::oneshot::Sender<Control>,
646    join_handle: tokio::task::JoinHandle<()>,
647}
648
649impl RunningProfiler {
650    /// Request that the current profiler stops and wait until it exits.
651    ///
652    /// This will cause the currently-pending profile information to be flushed.
653    ///
654    /// After this function returns, it is correct and safe to [spawn] a new
655    /// [Profiler], possibly with a different configuration. Therefore,
656    /// this function can be used to "reconfigure" a profiler by stopping
657    /// it and then starting a new one with a different configuration.
658    ///
659    /// [spawn]: Profiler::spawn_controllable
660    pub async fn stop(self) {
661        drop(self.stop_channel);
662        let _ = self.join_handle.await;
663    }
664
665    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
666    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
667        tokio::task::spawn(async move {
668            // move the control channel to the spawned task. this way, it will be dropped
669            // just when the task is aborted.
670            let _abort_channel = self.stop_channel;
671            self.join_handle.await.ok();
672        })
673    }
674
675    /// Detach this profiler. This will prevent the profiler from being stopped
676    /// when this handle is dropped. You should call this (or [Profiler::spawn]
677    /// instead of [Profiler::spawn_controllable], which does the same thing)
678    /// if you don't intend to reconfigure your profiler at runtime.
679    pub fn detach(self) {
680        self.detach_inner();
681    }
682
683    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
684    /// and returns a [RunningProfilerThread] attached to it.
685    fn spawn_attached(
686        self,
687        runtime: tokio::runtime::Runtime,
688        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
689    ) -> RunningProfilerThread {
690        RunningProfilerThread {
691            stop_channel: self.stop_channel,
692            join_handle: spawn_fn(Box::new(move || {
693                let _ = runtime.block_on(self.join_handle);
694            })),
695        }
696    }
697
698    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
699    /// and detaches it.
700    fn spawn_detached(
701        self,
702        runtime: tokio::runtime::Runtime,
703        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
704    ) {
705        spawn_fn(Box::new(move || {
706            let _stop_channel = self.stop_channel;
707            let _ = runtime.block_on(self.join_handle);
708        }));
709    }
710}
711
712/// A handle to a running profiler, running on a separate thread.
713///
714/// Currently just allows for stopping the profiler.
715///
716/// Dropping this handle will request that the profiler will stop.
717#[must_use = "dropping this stops the profiler, call .detach() to detach"]
718pub struct RunningProfilerThread {
719    stop_channel: tokio::sync::oneshot::Sender<Control>,
720    join_handle: std::thread::JoinHandle<()>,
721}
722
723impl RunningProfilerThread {
724    /// Request that the current profiler stops and wait until it exits.
725    ///
726    /// This will cause the currently-pending profile information to be flushed.
727    ///
728    /// After this function returns, it is correct and safe to [spawn] a new
729    /// [Profiler], possibly with a different configuration. Therefore,
730    /// this function can be used to "reconfigure" a profiler by stopping
731    /// it and then starting a new one with a different configuration.
732    ///
733    /// [spawn]: Profiler::spawn_controllable
734    pub fn stop(self) {
735        drop(self.stop_channel);
736        let _ = self.join_handle.join();
737    }
738}
739
/// Rust profiler based on [async-profiler].
///
/// Spawning a profiler can be done either in an attached (controllable)
/// mode, which allows for stopping the profiler (and, in fact, stops
/// it when the relevant handle is dropped), or in detached mode,
/// in which the profiler keeps running forever. Applications that can
/// shut down the profiler at run-time, for example applications that
/// support reconfiguration of a running profiler, generally want to use
/// controllable mode. Other applications (most of them) should use
/// detached mode.
///
/// In addition, the profiler can either be spawned into the current Tokio
/// runtime, or into a new one. Normally, applications should spawn
/// the profiler into their own Tokio runtime, but applications that
/// don't have a default Tokio runtime should spawn it into a
/// different one.
///
/// This leaves 4 functions:
/// 1. [Self::spawn] - detached, same runtime
/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
/// 3. [Self::spawn_controllable] - controllable, same runtime
/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
///
/// In addition, there's a helper function that just spawns the profiler
/// to a new runtime in a new thread, for applications that don't have
/// a Tokio runtime and don't need complex control:
///
/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
///
/// [async-profiler]: https://github.com/async-profiler/async-profiler
pub struct Profiler {
    // Interval at which samples are reported (30 seconds unless the builder overrides it).
    reporting_interval: Duration,
    // Destination that the collected profiles are handed to.
    reporter: Box<dyn Reporter + Send + Sync>,
    // Agent metadata supplied through the builder, if any.
    agent_metadata: Option<AgentMetadata>,
    // Options used when starting async-profiler.
    profiler_options: ProfilerOptions,
}
776
777impl Profiler {
778    /// Start profiling. The profiler will run in a tokio task at the configured interval.
779    ///
780    /// This is the same as calling [Profiler::spawn_controllable] followed by
781    /// [RunningProfiler::detach], except it returns a [JoinHandle].
782    ///
783    /// The returned [JoinHandle] can be used to detect if the profiler has exited
784    /// due to a fatal error.
785    ///
786    /// This function will fail if it is unable to start async-profiler, for example
787    /// if it can't find or load `libasyncProfiler.so`.
788    ///
789    /// [JoinHandle]: tokio::task::JoinHandle
790    ///
791    /// ### Uploading the last sample
792    ///
793    /// When you return from the Tokio `main`, the agent will terminate without waiting
794    /// for the last profiling JFR to be uploaded. Especially if you have a
795    /// short-running program, if you want to ensure the last profiling JFR
796    /// is uploaded, you should use [Profiler::spawn_controllable] and
    /// [RunningProfiler::stop], which allows waiting for the upload
798    /// to finish.
799    ///
800    /// If you do not care about losing the last sample, it is fine to directly
801    /// return from the Tokio `main` without stopping the profiler.
802    ///
803    /// ### Tokio Runtime
804    ///
805    /// This function must be run within a Tokio runtime, otherwise it will panic. If
806    /// your application does not have a `main` Tokio runtime, see
807    /// [Profiler::spawn_thread].
808    ///
809    /// ### Example
810    ///
811    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
812    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
813    ///
814    /// ```
815    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
816    /// # #[tokio::main]
817    /// # async fn main() -> Result<(), SpawnError> {
818    /// let profiler = ProfilerBuilder::default()
819    ///    .with_local_reporter("/tmp/profiles")
820    ///    .build();
821    /// # if false { // don't spawn the profiler in doctests
822    /// profiler.spawn()?;
823    /// # }
824    /// # Ok(())
825    /// # }
826    /// ```
827    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
828        self.spawn_controllable().map(RunningProfiler::detach_inner)
829    }
830
831    /// Like [Self::spawn], but instead of spawning within the current Tokio
832    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
833    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
834    ///
835    /// If your configuration is standard, use [Profiler::spawn_thread].
836    ///
837    /// If you want to be able to stop the resulting profiler, use
838    /// [Profiler::spawn_controllable_thread_to_runtime].
839    ///
840    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
841    /// allow for configuring thread properties, for example thread names).
842    ///
843    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
844    ///
845    /// ### Uploading the last sample
846    ///
847    /// When you return from `main`, the agent will terminate without waiting
848    /// for the last profiling JFR to be uploaded. Especially if you have a
849    /// short-running program, if you want to ensure the last profiling JFR
850    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
851    /// and [RunningProfilerThread::stop], which allows waiting for the upload
852    /// to finish.
853    ///
854    /// If you do not care about losing the last sample, it is fine to directly
    /// return from `main` without stopping the profiler.
856    ///
857    /// ### Example
858    ///
859    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
860    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
861    ///
862    /// ```no_run
863    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
864    /// let rt = tokio::runtime::Builder::new_current_thread()
865    ///     .enable_all()
866    ///     .build()?;
867    /// let profiler = ProfilerBuilder::default()
868    ///    .with_local_reporter("/tmp/profiles")
869    ///    .build();
870    ///
871    /// profiler.spawn_thread_to_runtime(
872    ///     rt,
873    ///     |t| {
874    ///         std::thread::Builder::new()
875    ///             .name("asprof-agent".to_owned())
876    ///             .spawn(t)
877    ///             .expect("thread name contains nuls")
878    ///     }
879    /// )?;
880    /// # Ok::<_, anyhow::Error>(())
881    /// ```
882    pub fn spawn_thread_to_runtime(
883        self,
884        runtime: tokio::runtime::Runtime,
885        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
886    ) -> Result<(), SpawnError> {
887        self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
888    }
889
890    /// Like [Self::spawn], but instead of spawning within the current Tokio
891    /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
892    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
893    /// by itself.
894    ///
895    /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
896    /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
897    /// with the following:
898    /// 1. a current thread runtime with background worker threads (these exist
899    ///    for blocking IO) named "asprof-worker"
900    /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
901    ///
902    /// If you want to be able to stop the resulting profiler, use
903    /// [Profiler::spawn_controllable_thread_to_runtime].
904    ///
905    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
906    ///
907    /// ### Uploading the last sample
908    ///
909    /// When you return from `main`, the agent will terminate without waiting
910    /// for the last profiling JFR to be uploaded. Especially if you have a
911    /// short-running program, if you want to ensure the last profiling JFR
912    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
913    /// and [RunningProfilerThread::stop], which allows waiting for the upload
914    /// to finish.
915    ///
916    /// If you do not care about losing the last sample, it is fine to directly
    /// return from `main` without stopping the profiler.
918    ///
919    /// ### Example
920    ///
921    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
922    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
923    ///
924    /// ```no_run
925    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
926    /// # use async_profiler_agent::reporter::local::LocalReporter;
927    /// let profiler = ProfilerBuilder::default()
928    ///    .with_local_reporter("/tmp/profiles")
929    ///    .build();
930    ///
931    /// profiler.spawn_thread()?;
932    /// # Ok::<_, anyhow::Error>(())
933    /// ```
934    pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
935        // using "asprof" in thread name to deal with 15 character + \0 length limit
936        let rt = tokio::runtime::Builder::new_current_thread()
937            .thread_name("asprof-worker".to_owned())
938            .enable_all()
939            .build()
940            .map_err(SpawnThreadError::ConstructRt)?;
941        let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
942        self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
943            .map_err(SpawnThreadError::AsProf)
944    }
945
946    fn spawn_thread_inner<E: ProfilerEngine>(
947        self,
948        asprof: E,
949        runtime: tokio::runtime::Runtime,
950        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
951    ) -> Result<(), SpawnError> {
952        let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
953        handle.spawn_detached(runtime, spawn_fn);
954        Ok(())
955    }
956
957    /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
958    /// (currently only stopping) the profiler.
959    ///
960    /// This allows for changing the configuration of the profiler at runtime, by
961    /// stopping it and then starting a new Profiler with a new configuration. It
962    /// also allows for stopping profiling in case the profiler is suspected to
963    /// cause operational issues.
964    ///
965    /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
    /// so if your application doesn't need to change the profiler's configuration at runtime,
967    /// it will be easier to use [Profiler::spawn].
968    ///
969    /// This function will fail if it is unable to start async-profiler, for example
970    /// if it can't find or load `libasyncProfiler.so`.
971    ///
972    /// ### Uploading the last sample
973    ///
974    /// When you return from the Tokio `main`, the agent will terminate without waiting
975    /// for the last profiling JFR to be uploaded. Especially if you have a
976    /// short-running program, if you want to ensure the last profiling JFR
977    /// is uploaded, you should use [RunningProfiler::stop], which allows waiting for
978    /// the upload to finish.
979    ///
980    /// If you do not care about losing the last sample, it is fine to directly
981    /// return from the Tokio `main` without stopping the profiler.
982    ///
983    /// ### Tokio Runtime
984    ///
985    /// This function must be run within a Tokio runtime, otherwise it will panic. If
986    /// your application does not have a `main` Tokio runtime, see
987    /// [Profiler::spawn_controllable_thread_to_runtime].
988    ///
989    /// ### Example
990    ///
991    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
992    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
993    ///
994    /// ```no_run
995    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
996    /// # #[tokio::main]
997    /// # async fn main() -> Result<(), SpawnError> {
998    /// let profiler = ProfilerBuilder::default()
999    ///    .with_local_reporter("/tmp/profiles")
1000    ///    .build();
1001    ///
1002    /// let profiler = profiler.spawn_controllable()?;
1003    ///
1004    /// // [insert your signaling/monitoring mechanism to have a request to disable
1005    /// // profiling in case of a problem]
1006    /// let got_request_to_disable_profiling = async move {
1007    ///     // ...
1008    /// #   false
1009    /// };
1010    /// // spawn a task that will disable profiling if requested
1011    /// tokio::task::spawn(async move {
1012    ///     if got_request_to_disable_profiling.await {
1013    ///         profiler.stop().await;
1014    ///     }
1015    /// });
1016    /// # Ok(())
1017    /// # }
1018    /// ```
1019    pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
1020        self.spawn_inner(asprof::AsProf::builder().build())
1021    }
1022
1023    /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
1024    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
1025    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
1026    ///
1027    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
1028    /// allow for configuring thread properties, for example thread names).
1029    ///
1030    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
1031    ///
1032    /// ### Uploading the last sample
1033    ///
1034    /// When you return from `main`, the agent will terminate without waiting
1035    /// for the last profiling JFR to be uploaded. Especially if you have a
1036    /// short-running program, if you want to ensure the last profiling JFR
1037    /// is uploaded, you should use [RunningProfilerThread::stop], which allows waiting
1038    /// for the upload to finish.
1039    ///
1040    /// If you do not care about losing the last sample, it is fine to directly
    /// return from `main` without stopping the profiler.
1042    ///
1043    /// ### Example
1044    ///
1045    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1046    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1047    ///
1048    /// ```no_run
1049    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1050    /// let rt = tokio::runtime::Builder::new_current_thread()
1051    ///     .enable_all()
1052    ///     .build()?;
1053    /// let profiler = ProfilerBuilder::default()
1054    ///    .with_local_reporter("/tmp/profiles")
1055    ///    .build();
1056    ///
1057    /// let profiler = profiler.spawn_controllable_thread_to_runtime(
1058    ///     rt,
1059    ///     |t| {
1060    ///         std::thread::Builder::new()
1061    ///             .name("asprof-agent".to_owned())
1062    ///             .spawn(t)
1063    ///             .expect("thread name contains nuls")
1064    ///     }
1065    /// )?;
1066    ///
1067    /// # fn got_request_to_disable_profiling() -> bool { false }
1068    /// // spawn a task that will disable profiling if requested
1069    /// std::thread::spawn(move || {
1070    ///     if got_request_to_disable_profiling() {
1071    ///         profiler.stop();
1072    ///     }
1073    /// });
1074    /// # Ok::<_, anyhow::Error>(())
1075    /// ```
1076    pub fn spawn_controllable_thread_to_runtime(
1077        self,
1078        runtime: tokio::runtime::Runtime,
1079        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1080    ) -> Result<RunningProfilerThread, SpawnError> {
1081        self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1082    }
1083
1084    fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1085        self,
1086        asprof: E,
1087        runtime: tokio::runtime::Runtime,
1088        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1089    ) -> Result<RunningProfilerThread, SpawnError> {
1090        let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1091        Ok(handle.spawn_attached(runtime, spawn_fn))
1092    }
1093
1094    fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
1095        struct LogOnDrop;
1096        impl Drop for LogOnDrop {
1097            fn drop(&mut self) {
1098                // Tokio will call destructors during runtime shutdown. Have something that will
1099                // emit a log in that case
1100                tracing::info!(
1101                    "unable to upload the last jfr due to Tokio runtime shutdown. \
1102                Add a call to `RunningProfiler::stop` to wait for jfr upload to finish."
1103                );
1104            }
1105        }
1106
1107        // Initialize async profiler - needs to be done once.
1108        E::init_async_profiler()?;
1109        tracing::info!("successfully initialized async profiler.");
1110
1111        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
1112        let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();
1113
1114        // Get profiles at the configured interval rate.
1115        let join_handle = tokio::spawn(async move {
1116            let mut state = match ProfilerState::new(asprof, self.profiler_options) {
1117                Ok(state) => state,
1118                Err(err) => {
1119                    tracing::error!(?err, "unable to create profiler state");
1120                    return;
1121                }
1122            };
1123
1124            // Lazily-loaded if not specified up front.
1125            let mut agent_metadata = self.agent_metadata;
1126            let mut done = false;
1127
1128            let guard = LogOnDrop;
1129            while !done {
1130                // Wait until a timer or exit event
1131                tokio::select! {
1132                    biased;
1133
1134                    r = &mut stop_rx, if !stop_rx.is_terminated() => {
1135                        match r {
1136                            Err(_) => {
1137                                tracing::info!("profiler stop requested, doing a final tick");
1138                                done = true;
1139                            }
1140                        }
1141                    }
1142                    _ = sampling_ticker.tick() => {
1143                        tracing::debug!("profiler timer woke up");
1144                    }
1145                }
1146
1147                if let Err(err) = profiler_tick(
1148                    &mut state,
1149                    &mut agent_metadata,
1150                    &*self.reporter,
1151                    self.reporting_interval,
1152                )
1153                .await
1154                {
1155                    match &err {
1156                        TickError::Reporter(_) => {
1157                            // don't stop on IO errors
1158                            tracing::error!(?err, "error during profiling, continuing");
1159                        }
1160                        _stop => {
1161                            tracing::error!(?err, "error during profiling, stopping");
1162                            break;
1163                        }
1164                    }
1165                }
1166            }
1167
1168            mem::forget(guard);
1169            tracing::info!("profiling task finished");
1170        });
1171
1172        Ok(RunningProfiler {
1173            stop_channel,
1174            join_handle,
1175        })
1176    }
1177}
1178
1179async fn profiler_tick<E: ProfilerEngine>(
1180    state: &mut ProfilerState<E>,
1181    agent_metadata: &mut Option<AgentMetadata>,
1182    reporter: &(dyn Reporter + Send + Sync),
1183    reporting_interval: Duration,
1184) -> Result<(), TickError> {
1185    if !state.is_started() {
1186        state.start().await?;
1187        return Ok(());
1188    }
1189
1190    let Some(start) = state.stop()? else {
1191        tracing::warn!("stopped the profiler but it wasn't running?");
1192        return Ok(());
1193    };
1194    let start = start.duration_since(UNIX_EPOCH)?;
1195    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
1196
1197    // Start it up immediately, writing to the "other" file, so that we keep
1198    // profiling the application while we're reporting data.
1199    state
1200        .jfr_file_mut()
1201        .empty_inactive_file()
1202        .map_err(TickError::EmptyInactiveFile)?;
1203    state.jfr_file_mut().swap();
1204    state.start().await?;
1205
1206    // Lazily load the agent metadata if it was not provided in
1207    // the constructor. See the struct comments for why this is.
1208    // This code runs at most once.
1209    if agent_metadata.is_none() {
1210        #[cfg(feature = "aws-metadata-no-defaults")]
1211        let md = crate::metadata::aws::load_agent_metadata().await?;
1212        #[cfg(not(feature = "aws-metadata-no-defaults"))]
1213        let md = crate::metadata::AgentMetadata::NoMetadata;
1214        tracing::debug!("loaded metadata");
1215        agent_metadata.replace(md);
1216    }
1217
1218    let report_metadata = ReportMetadata {
1219        instance: agent_metadata.as_ref().unwrap(),
1220        start,
1221        end,
1222        reporting_interval,
1223    };
1224
1225    let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
1226        .await
1227        .map_err(TickError::JfrRead)?;
1228
1229    reporter
1230        .report(jfr, &report_metadata)
1231        .await
1232        .map_err(TickError::Reporter)?;
1233
1234    Ok(())
1235}
1236
1237#[cfg(test)]
1238mod tests {
1239    use std::sync::Arc;
1240    use std::sync::atomic::{self, AtomicBool, AtomicU32};
1241
1242    use test_case::test_case;
1243
1244    use super::*;
1245
1246    #[test]
1247    fn test_jfr_file_drop() {
1248        let mut jfr = JfrFile::new().unwrap();
1249
1250        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
1251        jfr.swap();
1252        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
1253        jfr.empty_inactive_file().unwrap();
1254        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
1255    }
1256
1257    struct MockProfilerEngine {
1258        counter: AtomicU32,
1259    }
1260    impl ProfilerEngine for MockProfilerEngine {
1261        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1262            Ok(())
1263        }
1264
1265        fn start_async_profiler(
1266            &self,
1267            jfr_file_path: &Path,
1268            _options: &ProfilerOptions,
1269        ) -> Result<(), asprof::AsProfError> {
1270            let contents = format!(
1271                "JFR{}",
1272                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
1273            );
1274            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
1275            Ok(())
1276        }
1277
1278        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1279            Ok(())
1280        }
1281    }
1282
1283    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
1284    impl std::fmt::Debug for MockReporter {
1285        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1286            f.debug_struct("MockReporter").finish()
1287        }
1288    }
1289
1290    #[async_trait::async_trait]
1291    impl Reporter for MockReporter {
1292        async fn report(
1293            &self,
1294            jfr: Vec<u8>,
1295            metadata: &ReportMetadata,
1296        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1297            self.0
1298                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
1299                .await
1300                .unwrap();
1301            Ok(())
1302        }
1303    }
1304
1305    fn make_mock_profiler() -> (
1306        Profiler,
1307        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
1308    ) {
1309        let (tx, rx) = tokio::sync::mpsc::channel(1);
1310        let agent = ProfilerBuilder::default()
1311            .with_reporter(MockReporter(tx))
1312            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
1313                aws_account_id: "0".into(),
1314                aws_region_id: "us-east-1".into(),
1315                ec2_instance_id: "i-fake".into(),
1316                ec2_instance_type: "t3.micro".into(),
1317            })
1318            .build();
1319        (agent, rx)
1320    }
1321
1322    #[tokio::test(start_paused = true)]
1323    async fn test_profiler_agent() {
1324        let e_md = AgentMetadata::Ec2AgentMetadata {
1325            aws_account_id: "0".into(),
1326            aws_region_id: "us-east-1".into(),
1327            ec2_instance_id: "i-fake".into(),
1328            ec2_instance_type: "t3.micro".into(),
1329        };
1330        let (agent, mut rx) = make_mock_profiler();
1331        agent
1332            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1333                counter: AtomicU32::new(0),
1334            })
1335            .unwrap()
1336            .detach();
1337        let (jfr, md) = rx.recv().await.unwrap();
1338        assert_eq!(jfr, "JFR0");
1339        assert_eq!(e_md, md);
1340        let (jfr, md) = rx.recv().await.unwrap();
1341        assert_eq!(jfr, "JFR1");
1342        assert_eq!(e_md, md);
1343    }
1344
1345    #[test_case(false; "uncontrollable")]
1346    #[test_case(true; "controllable")]
1347    fn test_profiler_local_rt(controllable: bool) {
1348        let e_md = AgentMetadata::Ec2AgentMetadata {
1349            aws_account_id: "0".into(),
1350            aws_region_id: "us-east-1".into(),
1351            ec2_instance_id: "i-fake".into(),
1352            ec2_instance_type: "t3.micro".into(),
1353        };
1354        let (agent, mut rx) = make_mock_profiler();
1355        let rt = tokio::runtime::Builder::new_current_thread()
1356            .enable_all()
1357            .start_paused(true)
1358            .build()
1359            .unwrap();
1360        // spawn the profiler, doing this before spawning a thread to allow
1361        // capturing errors from `spawn`
1362        let handle = if controllable {
1363            Some(
1364                agent
1365                    .spawn_controllable_thread_inner::<MockProfilerEngine>(
1366                        MockProfilerEngine {
1367                            counter: AtomicU32::new(0),
1368                        },
1369                        rt,
1370                        std::thread::spawn,
1371                    )
1372                    .unwrap(),
1373            )
1374        } else {
1375            agent
1376                .spawn_thread_inner::<MockProfilerEngine>(
1377                    MockProfilerEngine {
1378                        counter: AtomicU32::new(0),
1379                    },
1380                    rt,
1381                    std::thread::spawn,
1382                )
1383                .unwrap();
1384            None
1385        };
1386
1387        let (jfr, md) = rx.blocking_recv().unwrap();
1388        assert_eq!(jfr, "JFR0");
1389        assert_eq!(e_md, md);
1390        let (jfr, md) = rx.blocking_recv().unwrap();
1391        assert_eq!(jfr, "JFR1");
1392        assert_eq!(e_md, md);
1393
1394        if let Some(handle) = handle {
1395            let drain_thread =
1396                std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
1397            // request a stop
1398            handle.stop();
1399            // the drain thread should be done
1400            drain_thread.join().unwrap();
1401        }
1402    }
1403
1404    enum StopKind {
1405        Delibrate,
1406        Drop,
1407        Abort,
1408    }
1409
1410    #[tokio::test(start_paused = true)]
1411    #[test_case(StopKind::Delibrate; "deliberate stop")]
1412    #[test_case(StopKind::Drop; "drop stop")]
1413    #[test_case(StopKind::Abort; "abort stop")]
1414    async fn test_profiler_stop(stop_kind: StopKind) {
1415        let e_md = AgentMetadata::Ec2AgentMetadata {
1416            aws_account_id: "0".into(),
1417            aws_region_id: "us-east-1".into(),
1418            ec2_instance_id: "i-fake".into(),
1419            ec2_instance_type: "t3.micro".into(),
1420        };
1421        let (agent, mut rx) = make_mock_profiler();
1422        let profiler_ref = agent
1423            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1424                counter: AtomicU32::new(0),
1425            })
1426            .unwrap();
1427        let (jfr, md) = rx.recv().await.unwrap();
1428        assert_eq!(jfr, "JFR0");
1429        assert_eq!(e_md, md);
1430        let (jfr, md) = rx.recv().await.unwrap();
1431        assert_eq!(jfr, "JFR1");
1432        assert_eq!(e_md, md);
1433        // check that stop is faster than an interval and returns an "immediate" next jfr
1434        match stop_kind {
1435            StopKind::Drop => drop(profiler_ref),
1436            StopKind::Delibrate => {
1437                tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
1438                    .await
1439                    .unwrap();
1440            }
1441            StopKind::Abort => {
1442                // You can call Abort on the JoinHandle. make sure that is not buggy.
1443                profiler_ref.detach_inner().abort();
1444            }
1445        }
1446        // check that we get the next JFR "quickly", and the JFR after that is empty.
1447        let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
1448            .await
1449            .unwrap()
1450            .unwrap();
1451        assert_eq!(jfr, "JFR2");
1452        assert_eq!(e_md, md);
1453        assert!(rx.recv().await.is_none());
1454    }
1455
1456    // simulate a badly-behaved profiler that errors on start/stop and then
1457    // tries to access the JFR file
1458    struct StopErrorProfilerEngine {
1459        start_error: bool,
1460        counter: Arc<AtomicBool>,
1461    }
1462    impl ProfilerEngine for StopErrorProfilerEngine {
1463        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1464            Ok(())
1465        }
1466
1467        fn start_async_profiler(
1468            &self,
1469            jfr_file_path: &Path,
1470            _options: &ProfilerOptions,
1471        ) -> Result<(), asprof::AsProfError> {
1472            let jfr_file_path = jfr_file_path.to_owned();
1473            std::fs::write(&jfr_file_path, "JFR").unwrap();
1474            let counter = self.counter.clone();
1475            tokio::task::spawn(async move {
1476                tokio::time::sleep(Duration::from_secs(5)).await;
1477                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
1478                counter.store(true, atomic::Ordering::Release);
1479            });
1480            if self.start_error {
1481                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1482            } else {
1483                Ok(())
1484            }
1485        }
1486
1487        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1488            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1489        }
1490    }
1491
1492    #[tokio::test(start_paused = true)]
1493    #[test_case(false; "error on stop")]
1494    #[test_case(true; "error on start")]
1495    async fn test_profiler_error(start_error: bool) {
1496        let (agent, mut rx) = make_mock_profiler();
1497        let counter = Arc::new(AtomicBool::new(false));
1498        let engine = StopErrorProfilerEngine {
1499            start_error,
1500            counter: counter.clone(),
1501        };
1502        let handle = agent.spawn_inner(engine).unwrap().detach_inner();
1503        assert!(rx.recv().await.is_none());
1504        // check that the "sleep 5" step in start_async_profiler succeeds
1505        for _ in 0..100 {
1506            tokio::time::sleep(Duration::from_secs(1)).await;
1507            if counter.load(atomic::Ordering::Acquire) {
1508                handle.await.unwrap(); // Check that the JoinHandle is done
1509                return;
1510            }
1511        }
1512        panic!("didn't read from file");
1513    }
1514
1515    #[test]
1516    fn test_profiler_options_to_args_string_default() {
1517        let opts = ProfilerOptions::default();
1518        let dummy_path = Path::new("/tmp/test.jfr");
1519        let args = opts.to_args_string(dummy_path);
1520        assert!(
1521            args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
1522            "Default args string not constructed correctly"
1523        );
1524        assert!(args.contains("file=/tmp/test.jfr"));
1525        assert!(!args.contains("nativemem="));
1526    }
1527
1528    #[test]
1529    fn test_profiler_options_to_args_string_with_native_mem() {
1530        let opts = ProfilerOptions {
1531            native_mem: Some("10m".to_string()),
1532            wall_clock_millis: None,
1533            cpu_interval: None,
1534        };
1535        let dummy_path = Path::new("/tmp/test.jfr");
1536        let args = opts.to_args_string(dummy_path);
1537        assert!(args.contains("nativemem=10m"));
1538    }
1539
1540    #[test]
1541    fn test_profiler_options_builder() {
1542        let opts = ProfilerOptionsBuilder::default()
1543            .with_native_mem_bytes(5000000)
1544            .build();
1545
1546        assert_eq!(opts.native_mem, Some("5000000".to_string()));
1547    }
1548
1549    #[test]
1550    fn test_profiler_options_builder_all_options() {
1551        let opts = ProfilerOptionsBuilder::default()
1552            .with_native_mem_bytes(5000000)
1553            .with_cpu_interval(Duration::from_secs(1))
1554            .with_wall_clock_interval(Duration::from_secs(10))
1555            .build();
1556
1557        let dummy_path = Path::new("/tmp/test.jfr");
1558        let args = opts.to_args_string(dummy_path);
1559        assert_eq!(
1560            args,
1561            "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
1562        );
1563    }
1564
1565    #[test]
1566    fn test_local_reporter_has_no_metadata() {
1567        // Check that with_local_reporter sets some configuration
1568        let reporter = ProfilerBuilder::default().with_local_reporter(".");
1569        assert_eq!(
1570            format!("{:?}", reporter.reporter),
1571            r#"Some(LocalReporter { directory: "." })"#
1572        );
1573        match reporter.agent_metadata {
1574            Some(AgentMetadata::NoMetadata) => {}
1575            bad => panic!("{bad:?}"),
1576        };
1577    }
1578}