Skip to main content

async_profiler_agent/
profiler.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! A profiler that periodically uploads profiling samples of your program to a [Reporter].
5
6use crate::{
7    asprof::{self, AsProfError},
8    metadata::{AgentMetadata, ReportMetadata},
9    reporter::{Reporter, local::LocalReporter},
10};
11use std::{
12    fs::File,
13    io,
14    path::{Path, PathBuf},
15    time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
19struct JfrFile {
20    active: std::fs::File,
21    inactive: std::fs::File,
22}
23
24impl JfrFile {
25    #[cfg(target_os = "linux")]
26    fn new() -> Result<Self, io::Error> {
27        Ok(Self {
28            active: tempfile::tempfile().unwrap(),
29            inactive: tempfile::tempfile().unwrap(),
30        })
31    }
32
33    #[cfg(not(target_os = "linux"))]
34    fn new() -> Result<Self, io::Error> {
35        Err(io::Error::other(
36            "async-profiler is only supported on Linux",
37        ))
38    }
39
40    fn swap(&mut self) {
41        std::mem::swap(&mut self.active, &mut self.inactive);
42    }
43
44    #[cfg(target_os = "linux")]
45    fn file_path(file: &std::fs::File) -> PathBuf {
46        use std::os::fd::AsRawFd;
47
48        format!("/proc/self/fd/{}", file.as_raw_fd()).into()
49    }
50
51    #[cfg(not(target_os = "linux"))]
52    fn file_path(_file: &std::fs::File) -> PathBuf {
53        unimplemented!()
54    }
55
56    fn active_path(&self) -> PathBuf {
57        Self::file_path(&self.active)
58    }
59
60    fn inactive_path(&self) -> PathBuf {
61        Self::file_path(&self.inactive)
62    }
63
64    fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
65        // Empty the file, or create it for the first time if the profiler hasn't
66        // started yet.
67        File::create(Self::file_path(&self.inactive))?;
68        tracing::debug!(message = "emptied the file");
69        Ok(())
70    }
71}
72
/// Options for configuring the async-profiler behavior.
///
/// Currently supports:
/// - Native memory allocation tracking ([`ProfilerOptions::native_mem`])
/// - The CPU-time sampling interval
///   ([`ProfilerOptionsBuilder::with_cpu_interval`])
/// - The wall-clock sampling interval
///   ([`ProfilerOptionsBuilder::with_wall_clock_interval`])
///
/// Build a value with [`ProfilerOptionsBuilder`]. Options that are left unset
/// fall back to the agent's defaults.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    /// For example, `"10m"` will sample an allocation for every
    /// 10 megabytes of memory allocated. Passing `"0"` will sample
    /// all allocations.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    // CPU-time sampling interval in nanoseconds; `None` means
    // `DEFAULT_CPU_INTERVAL_NANOS`.
    cpu_interval: Option<u128>,
    // Wall-clock sampling interval in whole milliseconds; `None` means
    // `DEFAULT_WALL_CLOCK_INTERVAL_MILLIS`.
    wall_clock_millis: Option<u128>,
}

/// Default CPU-time sampling interval: 100 milliseconds, expressed in nanoseconds.
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
/// Default wall-clock sampling interval: 1 second, expressed in milliseconds.
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;

impl ProfilerOptions {
    /// Convert the profiler options to a string of arguments for the async-profiler.
    ///
    /// The result always starts the `cpu` event with wall-clock sampling,
    /// JFR output written to `jfr_file_path`, and `cstack=dwarf`.
    /// `,nativemem=<interval>` is appended only when
    /// [`ProfilerOptions::native_mem`] is set.
    pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
        let mut args = format!(
            "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
            self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS),
            self.wall_clock_millis
                .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS),
            jfr_file_path.display()
        );
        if let Some(native_mem) = &self.native_mem {
            // Append in place instead of formatting a temporary `String`.
            args.push_str(",nativemem=");
            args.push_str(native_mem);
        }
        args
    }
}

/// Builder for [`ProfilerOptions`].
#[derive(Debug, Default)]
pub struct ProfilerOptionsBuilder {
    native_mem: Option<String>,
    cpu_interval: Option<u128>,
    wall_clock_millis: Option<u128>,
}

impl ProfilerOptionsBuilder {
    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass
    /// the string input directly to async_profiler.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    ///
    /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
    /// type-checked.
    ///
    /// ### Examples
    ///
    /// This will sample allocations for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
        self.native_mem = Some(native_mem_interval);
        self
    }

    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The argument is the sampling interval in bytes: the profiler will
    /// sample roughly one allocation for every that many bytes allocated.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    ///
    /// ### Examples
    ///
    /// This will sample allocations for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// This will sample every allocation (potentially slow):
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
        self.native_mem = Some(native_mem_interval.to_string());
        self
    }

    /// Sets the interval in which the profiler will collect
    /// CPU-time samples, via the [async-profiler `interval` option].
    ///
    /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
    /// are currently running on a CPU, not threads that are sleeping.
    ///
    /// It can use a higher frequency than wall-clock sampling since the
    /// number of the threads that are running on a CPU at a given time is
    /// naturally limited by the number of CPUs, while the number of sleeping
    /// threads can be much larger.
    ///
    /// The default is to do a CPU-time sample every 100 milliseconds.
    ///
    /// The async-profiler agent collects both CPU time and wall-clock time
    /// samples, so this function should normally be used along with
    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
    ///
    /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
    ///
    /// ### Examples
    ///
    /// This will take a CPU-time sample every 10 milliseconds (of threads
    /// that are running) and a wall-clock sample every 100 milliseconds (of
    /// threads that are running or sleeping):
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default()
    ///     .with_cpu_interval(Duration::from_millis(10))
    ///     .with_wall_clock_interval(Duration::from_millis(100))
    ///     .build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
        self.cpu_interval = Some(cpu_interval.as_nanos());
        self
    }

    /// Sets the interval in which the profiler will collect
    /// wall-clock samples, via the [async-profiler `wall` option].
    ///
    /// The interval is passed to async-profiler in whole milliseconds, so
    /// any sub-millisecond part of `wall_clock` is truncated.
    ///
    /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
    /// whether they are sleeping or running, and can therefore be
    /// very useful for finding threads that are blocked, for example
    /// on a synchronous lock or a slow system call.
    ///
    /// When using Tokio, since tasks are not threads, tasks that are not
    /// currently running will not be sampled by a wall clock sample. However,
    /// a wall clock sample is still very useful in Tokio, since it is what
    /// you want to catch tasks that are blocking a thread by waiting on
    /// synchronous operations.
    ///
    /// The default is to do a wall-clock sample every second.
    ///
    /// The async-profiler agent collects both CPU time and wall-clock time
    /// samples, so this function should normally be used along with
    /// [ProfilerOptionsBuilder::with_cpu_interval].
    ///
    /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
    ///
    /// ### Examples
    ///
    /// This will take a CPU-time sample every 10 milliseconds (of threads
    /// that are running) and a wall-clock sample every 100 milliseconds (of
    /// threads that are running or sleeping):
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default()
    ///     .with_cpu_interval(Duration::from_millis(10))
    ///     .with_wall_clock_interval(Duration::from_millis(100))
    ///     .build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
        self.wall_clock_millis = Some(wall_clock.as_millis());
        self
    }

    /// Build the [`ProfilerOptions`] from the builder.
    pub fn build(self) -> ProfilerOptions {
        ProfilerOptions {
            native_mem: self.native_mem,
            wall_clock_millis: self.wall_clock_millis,
            cpu_interval: self.cpu_interval,
        }
    }
}
332
333/// Builds a [`Profiler`], panicking if any required fields were not set by the
334/// time `build` is called.
335#[derive(Debug, Default)]
336pub struct ProfilerBuilder {
337    reporting_interval: Option<Duration>,
338    reporter: Option<Box<dyn Reporter + Send + Sync>>,
339    agent_metadata: Option<AgentMetadata>,
340    profiler_options: Option<ProfilerOptions>,
341}
342
343impl ProfilerBuilder {
344    /// Sets the reporting interval (default: 30 seconds).
345    ///
346    /// This is the interval that samples are *reported* to the backend,
347    /// and is unrelated to the interval at which the application
348    /// is *sampled* by async profiler, which is controlled by
349    /// [ProfilerOptionsBuilder::with_cpu_interval] and
350    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
351    ///
352    /// Most users should not change this setting.
353    ///
354    /// ## Example
355    ///
356    /// ```no_run
357    /// # use async_profiler_agent::profiler::SpawnError;
358    /// # #[tokio::main]
359    /// # async fn main() -> Result<(), SpawnError> {
360    /// # use std::path::PathBuf;
361    /// # use std::time::Duration;
362    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
363    /// # let path = PathBuf::from(".");
364    /// let agent = ProfilerBuilder::default()
365    ///     .with_local_reporter(path)
366    ///     .with_reporting_interval(Duration::from_secs(15))
367    ///     .build()
368    ///     .spawn_controllable()?;
369    /// // .. your program goes here
370    /// agent.stop().await; // make sure the last profile is flushed
371    /// # Ok::<_, SpawnError>(())
372    /// # }
373    /// ```
374    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
375        self.reporting_interval = Some(i);
376        self
377    }
378
379    /// Sets the [`Reporter`], which is used to upload the collected profiling
380    /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
381    /// feature enabled,
382    #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
383    #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
384    /// It is also possible to write your own [`Reporter`].
385    ///
386    /// It's normally easier to use [`LocalReporter`] directly via
387    /// [`ProfilerBuilder::with_local_reporter`].
388    ///
389    /// If you want to output to multiple reporters, you can use
390    /// [`MultiReporter`].
391    ///
392    /// [`LocalReporter`]: crate::reporter::local::LocalReporter
393    /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
394    #[cfg_attr(
395        feature = "s3-no-defaults",
396        doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
397    )]
398    ///
399    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
400    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
401        self.reporter = Some(Box::new(r));
402        self
403    }
404
405    /// Sets the profiler to ues [LocalReporter], which will write `.jfr` files to `path`,
406    /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
407    /// since the [LocalReporter] does not need that.
408    ///
409    /// This is useful for testing, since metadata auto-detection currently only works
410    /// on [Amazon EC2] or [Amazon Fargate] instances.
411    ///
412    /// The local reporter should normally not be used in production, since it will
413    /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
414    /// or write your own (see [`ProfilerBuilder::with_reporter`]).
415    ///
416    /// [Amazon EC2]: https://aws.amazon.com/ec2
417    /// [Amazon Fargate]: https://aws.amazon.com/fargate
418    ///
419    /// ## Example
420    ///
421    /// This will write profiles as `.jfr` files to `./path-to-profiles`:
422    ///
423    /// ```no_run
424    /// # use std::path::PathBuf;
425    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
426    /// # use async_profiler_agent::reporter::local::LocalReporter;
427    /// # use async_profiler_agent::metadata::AgentMetadata;
428    /// let path = PathBuf::from("./path-to-profiles");
429    /// let agent = ProfilerBuilder::default()
430    ///     .with_local_reporter(path)
431    ///     .build()
432    ///     .spawn()?;
433    /// # Ok::<_, SpawnError>(())
434    /// ```
435    pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
436        self.reporter = Some(Box::new(LocalReporter::new(path.into())));
437        self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
438    }
439
440    /// Provide custom agent metadata.
441    ///
442    /// The async-profiler Rust agent sends metadata to the [Reporter] with
443    /// the identity of the current host and process, which is normally
444    /// transmitted as `metadata.json` within the generated `.zip` file,
445    /// using the schema format [`reporter::s3::MetadataJson`].
446    ///
447    /// That metadata can later be used by tooling to be able to sort
448    /// profiling reports by host.
449    ///
450    /// async-profiler Rust agent will by default try to fetch the metadata
451    /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
452    /// will error if it's unable to find it. If you are running the
453    /// async-profiler agent on any other form of compute,
454    /// you will need to create and attach your own metadata
455    /// by calling this function.
456    ///
457    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
458    /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
459    /// [Amazon EC2]: https://aws.amazon.com/ec2
460    /// [Amazon Fargate]: https://aws.amazon.com/fargate
461    /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
462    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
463        self.agent_metadata = Some(j);
464        self
465    }
466
467    /// Provide custom profiler options.
468    ///
469    /// ### Example
470    ///
471    /// This will sample allocations for every 10 megabytes allocated:
472    ///
473    /// ```
474    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
475    /// # use async_profiler_agent::profiler::SpawnError;
476    /// # #[tokio::main]
477    /// # async fn main() -> Result<(), SpawnError> {
478    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
479    /// let profiler = ProfilerBuilder::default()
480    ///     .with_profiler_options(opts)
481    ///     .with_local_reporter("/tmp/profiles")
482    ///     .build();
483    /// # if false { // don't spawn the profiler in doctests
484    /// let profiler = profiler.spawn_controllable()?;
485    /// // ... your program goes here
486    /// profiler.stop().await; // make sure the last profile is flushed
487    /// # }
488    /// # Ok(())
489    /// # }
490    /// ```
491    pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
492        self.profiler_options = Some(c);
493        self
494    }
495
496    /// Turn this builder into a profiler!
497    pub fn build(self) -> Profiler {
498        Profiler {
499            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
500            reporter: self.reporter.expect("reporter is required"),
501            agent_metadata: self.agent_metadata,
502            profiler_options: self.profiler_options.unwrap_or_default(),
503        }
504    }
505}
506
507enum Status {
508    Idle,
509    Starting,
510    Running(SystemTime),
511}
512
513/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
514/// of the state of the profiler. The primary benefit of this is RAII - when
515/// this type drops, it will stop the profiler if it's running.
516struct ProfilerState<E: ProfilerEngine> {
517    // this is only None in the destructor when stopping the async-profiler fails
518    jfr_file: Option<JfrFile>,
519    asprof: E,
520    status: Status,
521    profiler_options: ProfilerOptions,
522}
523
524impl<E: ProfilerEngine> ProfilerState<E> {
525    fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
526        Ok(Self {
527            jfr_file: Some(JfrFile::new()?),
528            asprof,
529            status: Status::Idle,
530            profiler_options,
531        })
532    }
533
534    fn jfr_file_mut(&mut self) -> &mut JfrFile {
535        self.jfr_file.as_mut().unwrap()
536    }
537
538    async fn start(&mut self) -> Result<(), AsProfError> {
539        let active = self.jfr_file.as_ref().unwrap().active_path();
540        // drop guard - make sure the files are leaked if the profiler might have started
541        self.status = Status::Starting;
542        E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
543        self.status = Status::Running(SystemTime::now());
544        Ok(())
545    }
546
547    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
548        E::stop_async_profiler()?;
549        let status = std::mem::replace(&mut self.status, Status::Idle);
550        Ok(match status {
551            Status::Idle | Status::Starting => None,
552            Status::Running(since) => Some(since),
553        })
554    }
555
556    fn is_started(&self) -> bool {
557        matches!(self.status, Status::Running(_))
558    }
559}
560
561impl<E: ProfilerEngine> Drop for ProfilerState<E> {
562    fn drop(&mut self) {
563        match self.status {
564            Status::Running(_) => {
565                if let Err(err) = self.stop() {
566                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
567                    // to avoid symlink races
568                    std::mem::forget(self.jfr_file.take());
569                    // XXX: Rust defines leaking resources during drop as safe.
570                    tracing::warn!(?err, "unable to stop profiler during drop glue");
571                }
572            }
573            Status::Idle => {}
574            Status::Starting => {
575                // SECURITY: avoid removing the JFR file if stopping the profiler fails,
576                // to avoid symlink races
577                std::mem::forget(self.jfr_file.take());
578            }
579        }
580    }
581}
582
583pub(crate) trait ProfilerEngine: Send + Sync + 'static {
584    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
585    fn start_async_profiler(
586        &self,
587        jfr_file_path: &Path,
588        options: &ProfilerOptions,
589    ) -> Result<(), asprof::AsProfError>;
590    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
591}
592
593/// Holds the profiler task state and performs a final synchronous report
594/// when the task is cancelled (e.g. Tokio runtime shutdown) before a
595/// graceful stop. The local reporter will flush its contents on drop.
596/// For other reporters, you must call `RunningProfiler::stop().await`
597/// to ensure the last sample is uploaded.
598struct ProfilerTaskState<E: ProfilerEngine> {
599    state: ProfilerState<E>,
600    reporter: Box<dyn Reporter + Send + Sync>,
601    agent_metadata: Option<AgentMetadata>,
602    reporting_interval: Duration,
603    completed_normally: bool,
604}
605
606impl<E: ProfilerEngine> ProfilerTaskState<E> {
607    fn try_final_report(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
608        let start = self.state.stop()?.ok_or("profiler was not running")?;
609        let jfr_file = self.state.jfr_file.as_ref().ok_or("jfr file missing")?;
610        let jfr_path = jfr_file.active_path();
611        if jfr_path.metadata()?.len() == 0 {
612            return Ok(());
613        }
614        let metadata = ReportMetadata {
615            instance: self
616                .agent_metadata
617                .as_ref()
618                .unwrap_or(&AgentMetadata::NoMetadata),
619            start: start.duration_since(UNIX_EPOCH)?,
620            end: SystemTime::now().duration_since(UNIX_EPOCH)?,
621            reporting_interval: self.reporting_interval,
622        };
623        self.reporter
624            .report_blocking(&jfr_path, &metadata)
625            .map_err(|e| e.to_string())?;
626        Ok(())
627    }
628}
629
630impl<E: ProfilerEngine> Drop for ProfilerTaskState<E> {
631    fn drop(&mut self) {
632        if self.completed_normally || !self.state.is_started() {
633            return;
634        }
635        tracing::info!("profiler task cancelled, attempting final report on drop");
636        if let Err(err) = self.try_final_report() {
637            tracing::warn!(?err, "failed to report on drop");
638        }
639    }
640}
641
642#[derive(Debug, Error)]
643#[non_exhaustive]
644enum TickError {
645    #[error(transparent)]
646    AsProf(#[from] AsProfError),
647    #[error(transparent)]
648    #[cfg(feature = "aws-metadata-no-defaults")]
649    Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
650    #[error("reporter: {0}")]
651    Reporter(Box<dyn std::error::Error + Send>),
652    #[error("broken clock: {0}")]
653    BrokenClock(#[from] SystemTimeError),
654    #[error("jfr read error: {0}")]
655    JfrRead(io::Error),
656    #[error("empty inactive file error: {0}")]
657    EmptyInactiveFile(io::Error),
658}
659
660#[derive(Debug, Error)]
661#[non_exhaustive]
662/// An error that happened spawning a profiler
663pub enum SpawnError {
664    /// Error from async-profiler
665    #[error(transparent)]
666    AsProf(#[from] asprof::AsProfError),
667    /// Error writing to a tempfile
668    #[error("tempfile error")]
669    TempFile(#[source] io::Error),
670}
671
672#[derive(Debug, Error)]
673#[non_exhaustive]
674/// An error from [`Profiler::spawn_thread`]
675pub enum SpawnThreadError {
676    /// Error from async-profiler
677    #[error(transparent)]
678    AsProf(#[from] SpawnError),
679    /// Error constructing Tokio runtime
680    #[error("constructing Tokio runtime")]
681    ConstructRt(#[source] io::Error),
682}
683
684// no control messages currently
685enum Control {}
686
687/// A handle to a running profiler
688///
689/// Currently just allows for stopping the profiler.
690///
691/// Dropping this handle will request that the profiler will stop.
692#[must_use = "dropping this stops the profiler, call .detach() to detach"]
693pub struct RunningProfiler {
694    stop_channel: tokio::sync::oneshot::Sender<Control>,
695    join_handle: tokio::task::JoinHandle<()>,
696}
697
698impl RunningProfiler {
699    /// Request that the current profiler stops and wait until it exits.
700    ///
701    /// This will cause the currently-pending profile information to be flushed.
702    ///
703    /// After this function returns, it is correct and safe to [spawn] a new
704    /// [Profiler], possibly with a different configuration. Therefore,
705    /// this function can be used to "reconfigure" a profiler by stopping
706    /// it and then starting a new one with a different configuration.
707    ///
708    /// [spawn]: Profiler::spawn_controllable
709    pub async fn stop(self) {
710        drop(self.stop_channel);
711        let _ = self.join_handle.await;
712    }
713
714    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
715    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
716        tokio::task::spawn(async move {
717            // move the control channel to the spawned task. this way, it will be dropped
718            // just when the task is aborted.
719            let _abort_channel = self.stop_channel;
720            self.join_handle.await.ok();
721        })
722    }
723
724    /// Detach this profiler. This will prevent the profiler from being stopped
725    /// when this handle is dropped. You should call this (or [Profiler::spawn]
726    /// instead of [Profiler::spawn_controllable], which does the same thing)
727    /// if you don't intend to reconfigure your profiler at runtime.
728    pub fn detach(self) {
729        self.detach_inner();
730    }
731
732    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
733    /// and returns a [RunningProfilerThread] attached to it.
734    fn spawn_attached(
735        self,
736        runtime: tokio::runtime::Runtime,
737        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
738    ) -> RunningProfilerThread {
739        RunningProfilerThread {
740            stop_channel: self.stop_channel,
741            join_handle: spawn_fn(Box::new(move || {
742                let _ = runtime.block_on(self.join_handle);
743            })),
744        }
745    }
746
747    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
748    /// and detaches it.
749    fn spawn_detached(
750        self,
751        runtime: tokio::runtime::Runtime,
752        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
753    ) {
754        spawn_fn(Box::new(move || {
755            let _stop_channel = self.stop_channel;
756            let _ = runtime.block_on(self.join_handle);
757        }));
758    }
759}
760
761/// A handle to a running profiler, running on a separate thread.
762///
763/// Currently just allows for stopping the profiler.
764///
765/// Dropping this handle will request that the profiler will stop.
766#[must_use = "dropping this stops the profiler, call .detach() to detach"]
767pub struct RunningProfilerThread {
768    stop_channel: tokio::sync::oneshot::Sender<Control>,
769    join_handle: std::thread::JoinHandle<()>,
770}
771
772impl RunningProfilerThread {
773    /// Request that the current profiler stops and wait until it exits.
774    ///
775    /// This will cause the currently-pending profile information to be flushed.
776    ///
777    /// After this function returns, it is correct and safe to [spawn] a new
778    /// [Profiler], possibly with a different configuration. Therefore,
779    /// this function can be used to "reconfigure" a profiler by stopping
780    /// it and then starting a new one with a different configuration.
781    ///
782    /// [spawn]: Profiler::spawn_controllable
783    pub fn stop(self) {
784        drop(self.stop_channel);
785        let _ = self.join_handle.join();
786    }
787}
788
789/// Rust profiler based on [async-profiler].
790///
791/// Spawning a profiler can be done either in an attached (controllable)
792/// mode, which allows for stopping the profiler (and, in fact, stops
793/// it when the relevant handle is dropped), or in detached mode,
794/// in which the profiler keeps running forever. Applications that can
795/// shut down the profiler at run-time, for example applications that
796/// support reconfiguration of a running profiler, generally want to use
797/// controllable mode. Other applications (most of them) should use
798/// detached mode.
799///
800/// In addition, the profiler can either be spawned into the current Tokio
801/// runtime, or into a new one. Normally, applications should spawn
802/// the profiler into their own Tokio runtime, but applications that
803/// don't have a default Tokio runtime should spawn it into a
804/// different one
805///
806/// This leaves 4 functions:
807/// 1. [Self::spawn] - detached, same runtime
808/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
809/// 3. [Self::spawn_controllable] - controllable, same runtime
810/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
811///
812/// In addition, there's a helper function that just spawns the profiler
813/// to a new runtime in a new thread, for applications that don't have
814/// a Tokio runtime and don't need complex control:
815///
816/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
817///
818/// [async-profiler]: https://github.com/async-profiler/async-profiler
819pub struct Profiler {
820    reporting_interval: Duration,
821    reporter: Box<dyn Reporter + Send + Sync>,
822    agent_metadata: Option<AgentMetadata>,
823    profiler_options: ProfilerOptions,
824}
825
826impl Profiler {
827    /// Start profiling. The profiler will run in a tokio task at the configured interval.
828    ///
829    /// This is the same as calling [Profiler::spawn_controllable] followed by
830    /// [RunningProfiler::detach], except it returns a [JoinHandle].
831    ///
832    /// The returned [JoinHandle] can be used to detect if the profiler has exited
833    /// due to a fatal error.
834    ///
835    /// This function will fail if it is unable to start async-profiler, for example
836    /// if it can't find or load `libasyncProfiler.so`.
837    ///
838    /// [JoinHandle]: tokio::task::JoinHandle
839    ///
840    /// ### Uploading the last sample
841    ///
842    /// When you return from the Tokio `main`, the agent will terminate without waiting
843    /// for the last profiling JFR to be uploaded. Especially if you have a
844    /// short-running program, if you want to ensure the last profiling JFR
845    /// is uploaded, you should use [Profiler::spawn_controllable] and
846    /// [RunningProfiler::stop] , which allows waiting for the upload
847    /// to finish.
848    ///
849    /// If you do not care about losing the last sample, it is fine to directly
850    /// return from the Tokio `main` without stopping the profiler.
851    ///
852    /// ### Tokio Runtime
853    ///
854    /// This function must be run within a Tokio runtime, otherwise it will panic. If
855    /// your application does not have a `main` Tokio runtime, see
856    /// [Profiler::spawn_thread].
857    ///
858    /// ### Example
859    ///
860    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
861    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
862    ///
863    /// ```
864    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
865    /// # #[tokio::main]
866    /// # async fn main() -> Result<(), SpawnError> {
867    /// let profiler = ProfilerBuilder::default()
868    ///    .with_local_reporter("/tmp/profiles")
869    ///    .build();
870    /// # if false { // don't spawn the profiler in doctests
871    /// profiler.spawn()?;
872    /// # }
873    /// # Ok(())
874    /// # }
875    /// ```
876    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
877        self.spawn_controllable().map(RunningProfiler::detach_inner)
878    }
879
880    /// Like [Self::spawn], but instead of spawning within the current Tokio
881    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
882    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
883    ///
884    /// If your configuration is standard, use [Profiler::spawn_thread].
885    ///
886    /// If you want to be able to stop the resulting profiler, use
887    /// [Profiler::spawn_controllable_thread_to_runtime].
888    ///
889    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
890    /// allow for configuring thread properties, for example thread names).
891    ///
892    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
893    ///
894    /// ### Uploading the last sample
895    ///
896    /// When you return from `main`, the agent will terminate without waiting
897    /// for the last profiling JFR to be uploaded. Especially if you have a
898    /// short-running program, if you want to ensure the last profiling JFR
899    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
900    /// and [RunningProfilerThread::stop], which allows waiting for the upload
901    /// to finish.
902    ///
903    /// If you do not care about losing the last sample, it is fine to directly
904    /// return from the Tokio `main` without stopping the profiler.
905    ///
906    /// ### Example
907    ///
908    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
909    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
910    ///
911    /// ```no_run
912    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
913    /// let rt = tokio::runtime::Builder::new_current_thread()
914    ///     .enable_all()
915    ///     .build()?;
916    /// let profiler = ProfilerBuilder::default()
917    ///    .with_local_reporter("/tmp/profiles")
918    ///    .build();
919    ///
920    /// profiler.spawn_thread_to_runtime(
921    ///     rt,
922    ///     |t| {
923    ///         std::thread::Builder::new()
924    ///             .name("asprof-agent".to_owned())
925    ///             .spawn(t)
926    ///             .expect("thread name contains nuls")
927    ///     }
928    /// )?;
929    /// # Ok::<_, anyhow::Error>(())
930    /// ```
931    pub fn spawn_thread_to_runtime(
932        self,
933        runtime: tokio::runtime::Runtime,
934        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
935    ) -> Result<(), SpawnError> {
936        self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
937    }
938
939    /// Like [Self::spawn], but instead of spawning within the current Tokio
940    /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
941    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
942    /// by itself.
943    ///
944    /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
945    /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
946    /// with the following:
947    /// 1. a current thread runtime with background worker threads (these exist
948    ///    for blocking IO) named "asprof-worker"
949    /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
950    ///
951    /// If you want to be able to stop the resulting profiler, use
952    /// [Profiler::spawn_controllable_thread_to_runtime].
953    ///
954    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
955    ///
956    /// ### Uploading the last sample
957    ///
958    /// When you return from `main`, the agent will terminate without waiting
959    /// for the last profiling JFR to be uploaded. Especially if you have a
960    /// short-running program, if you want to ensure the last profiling JFR
961    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
962    /// and [RunningProfilerThread::stop], which allows waiting for the upload
963    /// to finish.
964    ///
965    /// If you do not care about losing the last sample, it is fine to directly
966    /// return from the Tokio `main` without stopping the profiler.
967    ///
968    /// ### Example
969    ///
970    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
971    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
972    ///
973    /// ```no_run
974    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
975    /// # use async_profiler_agent::reporter::local::LocalReporter;
976    /// let profiler = ProfilerBuilder::default()
977    ///    .with_local_reporter("/tmp/profiles")
978    ///    .build();
979    ///
980    /// profiler.spawn_thread()?;
981    /// # Ok::<_, anyhow::Error>(())
982    /// ```
983    pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
984        // using "asprof" in thread name to deal with 15 character + \0 length limit
985        let rt = tokio::runtime::Builder::new_current_thread()
986            .thread_name("asprof-worker".to_owned())
987            .enable_all()
988            .build()
989            .map_err(SpawnThreadError::ConstructRt)?;
990        let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
991        self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
992            .map_err(SpawnThreadError::AsProf)
993    }
994
995    fn spawn_thread_inner<E: ProfilerEngine>(
996        self,
997        asprof: E,
998        runtime: tokio::runtime::Runtime,
999        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1000    ) -> Result<(), SpawnError> {
1001        let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1002        handle.spawn_detached(runtime, spawn_fn);
1003        Ok(())
1004    }
1005
1006    /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
1007    /// (currently only stopping) the profiler.
1008    ///
1009    /// This allows for changing the configuration of the profiler at runtime, by
1010    /// stopping it and then starting a new Profiler with a new configuration. It
1011    /// also allows for stopping profiling in case the profiler is suspected to
1012    /// cause operational issues.
1013    ///
1014    /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
1015    /// so if your application doen't need to change the profiler's configuration at runtime,
1016    /// it will be easier to use [Profiler::spawn].
1017    ///
1018    /// This function will fail if it is unable to start async-profiler, for example
1019    /// if it can't find or load `libasyncProfiler.so`.
1020    ///
1021    /// ### Uploading the last sample
1022    ///
1023    /// When you return from the Tokio `main`, the agent will terminate without waiting
1024    /// for the last profiling JFR to be uploaded. Especially if you have a
1025    /// short-running program, if you want to ensure the last profiling JFR
1026    /// is uploaded, you should use [RunningProfiler::stop], which allows waiting for
1027    /// the upload to finish.
1028    ///
1029    /// If you do not care about losing the last sample, it is fine to directly
1030    /// return from the Tokio `main` without stopping the profiler.
1031    ///
1032    /// ### Tokio Runtime
1033    ///
1034    /// This function must be run within a Tokio runtime, otherwise it will panic. If
1035    /// your application does not have a `main` Tokio runtime, see
1036    /// [Profiler::spawn_controllable_thread_to_runtime].
1037    ///
1038    /// ### Example
1039    ///
1040    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1041    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1042    ///
1043    /// ```no_run
1044    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1045    /// # #[tokio::main]
1046    /// # async fn main() -> Result<(), SpawnError> {
1047    /// let profiler = ProfilerBuilder::default()
1048    ///    .with_local_reporter("/tmp/profiles")
1049    ///    .build();
1050    ///
1051    /// let profiler = profiler.spawn_controllable()?;
1052    ///
1053    /// // [insert your signaling/monitoring mechanism to have a request to disable
1054    /// // profiling in case of a problem]
1055    /// let got_request_to_disable_profiling = async move {
1056    ///     // ...
1057    /// #   false
1058    /// };
1059    /// // spawn a task that will disable profiling if requested
1060    /// tokio::task::spawn(async move {
1061    ///     if got_request_to_disable_profiling.await {
1062    ///         profiler.stop().await;
1063    ///     }
1064    /// });
1065    /// # Ok(())
1066    /// # }
1067    /// ```
1068    pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
1069        self.spawn_inner(asprof::AsProf::builder().build())
1070    }
1071
1072    /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
1073    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
1074    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
1075    ///
1076    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
1077    /// allow for configuring thread properties, for example thread names).
1078    ///
1079    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
1080    ///
1081    /// ### Uploading the last sample
1082    ///
1083    /// When you return from `main`, the agent will terminate without waiting
1084    /// for the last profiling JFR to be uploaded. Especially if you have a
1085    /// short-running program, if you want to ensure the last profiling JFR
1086    /// is uploaded, you should use [RunningProfilerThread::stop], which allows waiting
1087    /// for the upload to finish.
1088    ///
1089    /// If you do not care about losing the last sample, it is fine to directly
1090    /// return from the Tokio `main` without stopping the profiler.
1091    ///
1092    /// ### Example
1093    ///
1094    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1095    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1096    ///
1097    /// ```no_run
1098    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1099    /// let rt = tokio::runtime::Builder::new_current_thread()
1100    ///     .enable_all()
1101    ///     .build()?;
1102    /// let profiler = ProfilerBuilder::default()
1103    ///    .with_local_reporter("/tmp/profiles")
1104    ///    .build();
1105    ///
1106    /// let profiler = profiler.spawn_controllable_thread_to_runtime(
1107    ///     rt,
1108    ///     |t| {
1109    ///         std::thread::Builder::new()
1110    ///             .name("asprof-agent".to_owned())
1111    ///             .spawn(t)
1112    ///             .expect("thread name contains nuls")
1113    ///     }
1114    /// )?;
1115    ///
1116    /// # fn got_request_to_disable_profiling() -> bool { false }
1117    /// // spawn a task that will disable profiling if requested
1118    /// std::thread::spawn(move || {
1119    ///     if got_request_to_disable_profiling() {
1120    ///         profiler.stop();
1121    ///     }
1122    /// });
1123    /// # Ok::<_, anyhow::Error>(())
1124    /// ```
1125    pub fn spawn_controllable_thread_to_runtime(
1126        self,
1127        runtime: tokio::runtime::Runtime,
1128        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1129    ) -> Result<RunningProfilerThread, SpawnError> {
1130        self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1131    }
1132
1133    fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1134        self,
1135        asprof: E,
1136        runtime: tokio::runtime::Runtime,
1137        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1138    ) -> Result<RunningProfilerThread, SpawnError> {
1139        let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1140        Ok(handle.spawn_attached(runtime, spawn_fn))
1141    }
1142
1143    fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
1144        // Initialize async profiler - needs to be done once.
1145        E::init_async_profiler()?;
1146        tracing::info!("successfully initialized async profiler.");
1147
1148        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
1149        let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();
1150
1151        // Get profiles at the configured interval rate.
1152        let join_handle = tokio::spawn(async move {
1153            let state = match ProfilerState::new(asprof, self.profiler_options) {
1154                Ok(state) => state,
1155                Err(err) => {
1156                    tracing::error!(?err, "unable to create profiler state");
1157                    return;
1158                }
1159            };
1160
1161            let mut task = ProfilerTaskState {
1162                state,
1163                reporter: self.reporter,
1164                agent_metadata: self.agent_metadata,
1165                reporting_interval: self.reporting_interval,
1166                completed_normally: false,
1167            };
1168
1169            let mut done = false;
1170            while !done {
1171                // Wait until a timer or exit event
1172                tokio::select! {
1173                    biased;
1174
1175                    r = &mut stop_rx, if !stop_rx.is_terminated() => {
1176                        match r {
1177                            Err(_) => {
1178                                tracing::info!("profiler stop requested, doing a final tick");
1179                                done = true;
1180                            }
1181                        }
1182                    }
1183                    _ = sampling_ticker.tick() => {
1184                        tracing::debug!("profiler timer woke up");
1185                    }
1186                }
1187
1188                if let Err(err) = profiler_tick(
1189                    &mut task.state,
1190                    &mut task.agent_metadata,
1191                    &*task.reporter,
1192                    task.reporting_interval,
1193                )
1194                .await
1195                {
1196                    match &err {
1197                        TickError::Reporter(_) => {
1198                            // don't stop on IO errors
1199                            tracing::error!(?err, "error during profiling, continuing");
1200                        }
1201                        _stop => {
1202                            tracing::error!(?err, "error during profiling, stopping");
1203                            break;
1204                        }
1205                    }
1206                }
1207            }
1208
1209            task.completed_normally = true;
1210            tracing::info!("profiling task finished");
1211        });
1212
1213        Ok(RunningProfiler {
1214            stop_channel,
1215            join_handle,
1216        })
1217    }
1218}
1219
1220async fn profiler_tick<E: ProfilerEngine>(
1221    state: &mut ProfilerState<E>,
1222    agent_metadata: &mut Option<AgentMetadata>,
1223    reporter: &(dyn Reporter + Send + Sync),
1224    reporting_interval: Duration,
1225) -> Result<(), TickError> {
1226    if !state.is_started() {
1227        state.start().await?;
1228        return Ok(());
1229    }
1230
1231    let Some(start) = state.stop()? else {
1232        tracing::warn!("stopped the profiler but it wasn't running?");
1233        return Ok(());
1234    };
1235    let start = start.duration_since(UNIX_EPOCH)?;
1236    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
1237
1238    // Start it up immediately, writing to the "other" file, so that we keep
1239    // profiling the application while we're reporting data.
1240    state
1241        .jfr_file_mut()
1242        .empty_inactive_file()
1243        .map_err(TickError::EmptyInactiveFile)?;
1244    state.jfr_file_mut().swap();
1245    state.start().await?;
1246
1247    // Lazily load the agent metadata if it was not provided in
1248    // the constructor. See the struct comments for why this is.
1249    // This code runs at most once.
1250    if agent_metadata.is_none() {
1251        #[cfg(feature = "aws-metadata-no-defaults")]
1252        let md = crate::metadata::aws::load_agent_metadata().await?;
1253        #[cfg(not(feature = "aws-metadata-no-defaults"))]
1254        let md = crate::metadata::AgentMetadata::NoMetadata;
1255        tracing::debug!("loaded metadata");
1256        agent_metadata.replace(md);
1257    }
1258
1259    let report_metadata = ReportMetadata {
1260        instance: agent_metadata.as_ref().unwrap(),
1261        start,
1262        end,
1263        reporting_interval,
1264    };
1265
1266    let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
1267        .await
1268        .map_err(TickError::JfrRead)?;
1269
1270    reporter
1271        .report(jfr, &report_metadata)
1272        .await
1273        .map_err(TickError::Reporter)?;
1274
1275    Ok(())
1276}
1277
1278#[cfg(test)]
1279mod tests {
1280    use std::sync::Arc;
1281    use std::sync::atomic::{self, AtomicBool, AtomicU32};
1282
1283    use test_case::test_case;
1284
1285    use super::*;
1286
1287    #[test]
1288    fn test_jfr_file_drop() {
1289        let mut jfr = JfrFile::new().unwrap();
1290
1291        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
1292        jfr.swap();
1293        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
1294        jfr.empty_inactive_file().unwrap();
1295        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
1296    }
1297
1298    struct MockProfilerEngine {
1299        counter: AtomicU32,
1300    }
1301    impl ProfilerEngine for MockProfilerEngine {
1302        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1303            Ok(())
1304        }
1305
1306        fn start_async_profiler(
1307            &self,
1308            jfr_file_path: &Path,
1309            _options: &ProfilerOptions,
1310        ) -> Result<(), asprof::AsProfError> {
1311            let contents = format!(
1312                "JFR{}",
1313                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
1314            );
1315            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
1316            Ok(())
1317        }
1318
1319        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1320            Ok(())
1321        }
1322    }
1323
1324    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
1325    impl std::fmt::Debug for MockReporter {
1326        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1327            f.debug_struct("MockReporter").finish()
1328        }
1329    }
1330
1331    #[async_trait::async_trait]
1332    impl Reporter for MockReporter {
1333        async fn report(
1334            &self,
1335            jfr: Vec<u8>,
1336            metadata: &ReportMetadata,
1337        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1338            self.0
1339                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
1340                .await
1341                .unwrap();
1342            Ok(())
1343        }
1344    }
1345
1346    fn make_mock_profiler() -> (
1347        Profiler,
1348        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
1349    ) {
1350        let (tx, rx) = tokio::sync::mpsc::channel(1);
1351        let agent = ProfilerBuilder::default()
1352            .with_reporter(MockReporter(tx))
1353            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
1354                aws_account_id: "0".into(),
1355                aws_region_id: "us-east-1".into(),
1356                ec2_instance_id: "i-fake".into(),
1357                ec2_instance_type: "t3.micro".into(),
1358            })
1359            .build();
1360        (agent, rx)
1361    }
1362
1363    #[tokio::test(start_paused = true)]
1364    async fn test_profiler_agent() {
1365        let e_md = AgentMetadata::Ec2AgentMetadata {
1366            aws_account_id: "0".into(),
1367            aws_region_id: "us-east-1".into(),
1368            ec2_instance_id: "i-fake".into(),
1369            ec2_instance_type: "t3.micro".into(),
1370        };
1371        let (agent, mut rx) = make_mock_profiler();
1372        agent
1373            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1374                counter: AtomicU32::new(0),
1375            })
1376            .unwrap()
1377            .detach();
1378        let (jfr, md) = rx.recv().await.unwrap();
1379        assert_eq!(jfr, "JFR0");
1380        assert_eq!(e_md, md);
1381        let (jfr, md) = rx.recv().await.unwrap();
1382        assert_eq!(jfr, "JFR1");
1383        assert_eq!(e_md, md);
1384    }
1385
1386    #[test_case(false; "uncontrollable")]
1387    #[test_case(true; "controllable")]
1388    fn test_profiler_local_rt(controllable: bool) {
1389        let e_md = AgentMetadata::Ec2AgentMetadata {
1390            aws_account_id: "0".into(),
1391            aws_region_id: "us-east-1".into(),
1392            ec2_instance_id: "i-fake".into(),
1393            ec2_instance_type: "t3.micro".into(),
1394        };
1395        let (agent, mut rx) = make_mock_profiler();
1396        let rt = tokio::runtime::Builder::new_current_thread()
1397            .enable_all()
1398            .start_paused(true)
1399            .build()
1400            .unwrap();
1401        // spawn the profiler, doing this before spawning a thread to allow
1402        // capturing errors from `spawn`
1403        let handle = if controllable {
1404            Some(
1405                agent
1406                    .spawn_controllable_thread_inner::<MockProfilerEngine>(
1407                        MockProfilerEngine {
1408                            counter: AtomicU32::new(0),
1409                        },
1410                        rt,
1411                        std::thread::spawn,
1412                    )
1413                    .unwrap(),
1414            )
1415        } else {
1416            agent
1417                .spawn_thread_inner::<MockProfilerEngine>(
1418                    MockProfilerEngine {
1419                        counter: AtomicU32::new(0),
1420                    },
1421                    rt,
1422                    std::thread::spawn,
1423                )
1424                .unwrap();
1425            None
1426        };
1427
1428        let (jfr, md) = rx.blocking_recv().unwrap();
1429        assert_eq!(jfr, "JFR0");
1430        assert_eq!(e_md, md);
1431        let (jfr, md) = rx.blocking_recv().unwrap();
1432        assert_eq!(jfr, "JFR1");
1433        assert_eq!(e_md, md);
1434
1435        if let Some(handle) = handle {
1436            let drain_thread =
1437                std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
1438            // request a stop
1439            handle.stop();
1440            // the drain thread should be done
1441            drain_thread.join().unwrap();
1442        }
1443    }
1444
1445    enum StopKind {
1446        Delibrate,
1447        Drop,
1448        Abort,
1449    }
1450
1451    #[tokio::test(start_paused = true)]
1452    #[test_case(StopKind::Delibrate; "deliberate stop")]
1453    #[test_case(StopKind::Drop; "drop stop")]
1454    #[test_case(StopKind::Abort; "abort stop")]
1455    async fn test_profiler_stop(stop_kind: StopKind) {
1456        let e_md = AgentMetadata::Ec2AgentMetadata {
1457            aws_account_id: "0".into(),
1458            aws_region_id: "us-east-1".into(),
1459            ec2_instance_id: "i-fake".into(),
1460            ec2_instance_type: "t3.micro".into(),
1461        };
1462        let (agent, mut rx) = make_mock_profiler();
1463        let profiler_ref = agent
1464            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1465                counter: AtomicU32::new(0),
1466            })
1467            .unwrap();
1468        let (jfr, md) = rx.recv().await.unwrap();
1469        assert_eq!(jfr, "JFR0");
1470        assert_eq!(e_md, md);
1471        let (jfr, md) = rx.recv().await.unwrap();
1472        assert_eq!(jfr, "JFR1");
1473        assert_eq!(e_md, md);
1474        // check that stop is faster than an interval and returns an "immediate" next jfr
1475        match stop_kind {
1476            StopKind::Drop => drop(profiler_ref),
1477            StopKind::Delibrate => {
1478                tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
1479                    .await
1480                    .unwrap();
1481            }
1482            StopKind::Abort => {
1483                // You can call Abort on the JoinHandle. make sure that is not buggy.
1484                profiler_ref.detach_inner().abort();
1485            }
1486        }
1487        // check that we get the next JFR "quickly", and the JFR after that is empty.
1488        let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
1489            .await
1490            .unwrap()
1491            .unwrap();
1492        assert_eq!(jfr, "JFR2");
1493        assert_eq!(e_md, md);
1494        assert!(rx.recv().await.is_none());
1495    }
1496
1497    // simulate a badly-behaved profiler that errors on start/stop and then
1498    // tries to access the JFR file
1499    struct StopErrorProfilerEngine {
1500        start_error: bool,
1501        counter: Arc<AtomicBool>,
1502    }
1503    impl ProfilerEngine for StopErrorProfilerEngine {
1504        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1505            Ok(())
1506        }
1507
1508        fn start_async_profiler(
1509            &self,
1510            jfr_file_path: &Path,
1511            _options: &ProfilerOptions,
1512        ) -> Result<(), asprof::AsProfError> {
1513            let jfr_file_path = jfr_file_path.to_owned();
1514            std::fs::write(&jfr_file_path, "JFR").unwrap();
1515            let counter = self.counter.clone();
1516            tokio::task::spawn(async move {
1517                tokio::time::sleep(Duration::from_secs(5)).await;
1518                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
1519                counter.store(true, atomic::Ordering::Release);
1520            });
1521            if self.start_error {
1522                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1523            } else {
1524                Ok(())
1525            }
1526        }
1527
1528        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1529            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1530        }
1531    }
1532
1533    #[tokio::test(start_paused = true)]
1534    #[test_case(false; "error on stop")]
1535    #[test_case(true; "error on start")]
1536    async fn test_profiler_error(start_error: bool) {
1537        let (agent, mut rx) = make_mock_profiler();
1538        let counter = Arc::new(AtomicBool::new(false));
1539        let engine = StopErrorProfilerEngine {
1540            start_error,
1541            counter: counter.clone(),
1542        };
1543        let handle = agent.spawn_inner(engine).unwrap().detach_inner();
1544        assert!(rx.recv().await.is_none());
1545        // check that the "sleep 5" step in start_async_profiler succeeds
1546        for _ in 0..100 {
1547            tokio::time::sleep(Duration::from_secs(1)).await;
1548            if counter.load(atomic::Ordering::Acquire) {
1549                handle.await.unwrap(); // Check that the JoinHandle is done
1550                return;
1551            }
1552        }
1553        panic!("didn't read from file");
1554    }
1555
1556    #[test]
1557    fn test_profiler_options_to_args_string_default() {
1558        let opts = ProfilerOptions::default();
1559        let dummy_path = Path::new("/tmp/test.jfr");
1560        let args = opts.to_args_string(dummy_path);
1561        assert!(
1562            args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
1563            "Default args string not constructed correctly"
1564        );
1565        assert!(args.contains("file=/tmp/test.jfr"));
1566        assert!(!args.contains("nativemem="));
1567    }
1568
1569    #[test]
1570    fn test_profiler_options_to_args_string_with_native_mem() {
1571        let opts = ProfilerOptions {
1572            native_mem: Some("10m".to_string()),
1573            wall_clock_millis: None,
1574            cpu_interval: None,
1575        };
1576        let dummy_path = Path::new("/tmp/test.jfr");
1577        let args = opts.to_args_string(dummy_path);
1578        assert!(args.contains("nativemem=10m"));
1579    }
1580
1581    #[test]
1582    fn test_profiler_options_builder() {
1583        let opts = ProfilerOptionsBuilder::default()
1584            .with_native_mem_bytes(5000000)
1585            .build();
1586
1587        assert_eq!(opts.native_mem, Some("5000000".to_string()));
1588    }
1589
1590    #[test]
1591    fn test_profiler_options_builder_all_options() {
1592        let opts = ProfilerOptionsBuilder::default()
1593            .with_native_mem_bytes(5000000)
1594            .with_cpu_interval(Duration::from_secs(1))
1595            .with_wall_clock_interval(Duration::from_secs(10))
1596            .build();
1597
1598        let dummy_path = Path::new("/tmp/test.jfr");
1599        let args = opts.to_args_string(dummy_path);
1600        assert_eq!(
1601            args,
1602            "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
1603        );
1604    }
1605
1606    #[test]
1607    fn test_local_reporter_has_no_metadata() {
1608        // Check that with_local_reporter sets some configuration
1609        let reporter = ProfilerBuilder::default().with_local_reporter(".");
1610        assert_eq!(
1611            format!("{:?}", reporter.reporter),
1612            r#"Some(LocalReporter { directory: "." })"#
1613        );
1614        match reporter.agent_metadata {
1615            Some(AgentMetadata::NoMetadata) => {}
1616            bad => panic!("{bad:?}"),
1617        };
1618    }
1619
1620    /// A reporter that tracks both async and blocking reports separately.
1621    struct BlockingMockReporter {
1622        async_tx: tokio::sync::mpsc::Sender<String>,
1623        blocking_reports: Arc<std::sync::Mutex<Vec<String>>>,
1624    }
1625    impl std::fmt::Debug for BlockingMockReporter {
1626        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1627            f.debug_struct("BlockingMockReporter").finish()
1628        }
1629    }
1630
1631    #[async_trait::async_trait]
1632    impl Reporter for BlockingMockReporter {
1633        async fn report(
1634            &self,
1635            jfr: Vec<u8>,
1636            _metadata: &ReportMetadata,
1637        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1638            self.async_tx
1639                .send(String::from_utf8(jfr).unwrap())
1640                .await
1641                .unwrap();
1642            Ok(())
1643        }
1644
1645        fn report_blocking(
1646            &self,
1647            jfr_path: &Path,
1648            _metadata: &ReportMetadata,
1649        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1650            let jfr = std::fs::read(jfr_path).map_err(|e| Box::new(e) as _)?;
1651            self.blocking_reports
1652                .lock()
1653                .unwrap()
1654                .push(String::from_utf8(jfr).unwrap());
1655            Ok(())
1656        }
1657    }
1658
1659    /// Simulates a runtime shutdown while the profiler is running.
1660    /// The profiler should call report_blocking on drop to flush the
1661    /// last sample.
1662    #[test]
1663    fn test_profiler_report_on_drop() {
1664        let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new()));
1665
1666        let rt = tokio::runtime::Builder::new_current_thread()
1667            .enable_all()
1668            .start_paused(true)
1669            .build()
1670            .unwrap();
1671
1672        let reports_clone = blocking_reports.clone();
1673        rt.block_on(async {
1674            let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<String>(10);
1675            let agent = ProfilerBuilder::default()
1676                .with_reporter(BlockingMockReporter {
1677                    async_tx,
1678                    blocking_reports: reports_clone,
1679                })
1680                .with_custom_agent_metadata(AgentMetadata::NoMetadata)
1681                .build();
1682            // Detach so the stop channel doesn't trigger a graceful stop
1683            // when the block_on future returns.
1684            agent
1685                .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1686                    counter: AtomicU32::new(0),
1687                })
1688                .unwrap()
1689                .detach();
1690
1691            // Wait for first async report to confirm profiler is running
1692            let jfr = async_rx.recv().await.unwrap();
1693            assert_eq!(jfr, "JFR0");
1694            // Return without stopping — runtime drop will cancel the task.
1695        });
1696
1697        // Runtime shutdown cancels all tasks, triggering ProfilerTaskState::Drop.
1698        drop(rt);
1699
1700        let reports = blocking_reports.lock().unwrap();
1701        assert_eq!(
1702            reports.len(),
1703            1,
1704            "expected exactly one blocking report on drop"
1705        );
1706        assert_eq!(reports[0], "JFR1");
1707    }
1708}