Skip to main content

async_profiler_agent/
profiler.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
5
6use crate::{
7    asprof::{self, AsProfError},
8    metadata::{AgentMetadata, ReportMetadata},
9    reporter::{Reporter, local::LocalReporter},
10};
11use std::{
12    fs::File,
13    io,
14    path::{Path, PathBuf},
15    time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
19struct JfrFile {
20    active: std::fs::File,
21    inactive: std::fs::File,
22}
23
24impl JfrFile {
25    #[cfg(target_os = "linux")]
26    fn new() -> Result<Self, io::Error> {
27        Ok(Self {
28            active: tempfile::tempfile()?,
29            inactive: tempfile::tempfile()?,
30        })
31    }
32
33    #[cfg(not(target_os = "linux"))]
34    fn new() -> Result<Self, io::Error> {
35        Err(io::Error::other(
36            "async-profiler is only supported on Linux",
37        ))
38    }
39
40    fn swap(&mut self) {
41        std::mem::swap(&mut self.active, &mut self.inactive);
42    }
43
44    #[cfg(target_os = "linux")]
45    fn file_path(file: &std::fs::File) -> PathBuf {
46        use std::os::fd::AsRawFd;
47
48        format!("/proc/self/fd/{}", file.as_raw_fd()).into()
49    }
50
51    #[cfg(not(target_os = "linux"))]
52    fn file_path(_file: &std::fs::File) -> PathBuf {
53        unimplemented!()
54    }
55
56    fn active_path(&self) -> PathBuf {
57        Self::file_path(&self.active)
58    }
59
60    fn inactive_path(&self) -> PathBuf {
61        Self::file_path(&self.inactive)
62    }
63
64    fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
65        // Empty the file, or create it for the first time if the profiler hasn't
66        // started yet.
67        File::create(Self::file_path(&self.inactive))?;
68        tracing::debug!(message = "emptied the file");
69        Ok(())
70    }
71}
72
/// Options for configuring the async-profiler behavior.
///
/// Currently supports:
/// - CPU-time sampling interval (see [`ProfilerOptionsBuilder::with_cpu_interval`])
/// - Wall-clock sampling interval (see [`ProfilerOptionsBuilder::with_wall_clock_interval`])
/// - Native memory allocation tracking (see [`ProfilerOptionsBuilder::with_native_mem_bytes`])
///
/// Build it with [`ProfilerOptionsBuilder`]. [`ProfilerOptions::default`] uses
/// the agent's default sampling intervals and disables native memory tracking.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    /// For example, `"10m"` will sample an allocation for every
    /// 10 megabytes of memory allocated. Passing `"0"` will sample
    /// all allocations.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    // CPU-time sampling interval in nanoseconds; `None` means
    // `DEFAULT_CPU_INTERVAL_NANOS`.
    cpu_interval: Option<u128>,
    // Wall-clock sampling interval in whole milliseconds; `None` means
    // `DEFAULT_WALL_CLOCK_INTERVAL_MILLIS`.
    wall_clock_millis: Option<u128>,
}

/// Default CPU-time sampling interval, in nanoseconds (100 milliseconds).
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
/// Default wall-clock sampling interval, in milliseconds (1 second).
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;

impl ProfilerOptions {
    /// Convert the profiler options to a string of arguments for the async-profiler.
    ///
    /// The result is a `start` command that records CPU samples at the
    /// configured interval (in nanoseconds) and wall-clock samples (in
    /// milliseconds), and writes JFR output with DWARF stack walking to
    /// `jfr_file_path`. With default options and the path `/p` it is:
    ///
    /// `start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf,file=/p`
    ///
    /// When native memory tracking is enabled, `,nativemem=<interval>` is
    /// appended. The interval is passed through verbatim.
    pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
        use std::fmt::Write as _;

        let cpu_interval = self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS);
        let wall_millis = self
            .wall_clock_millis
            .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS);
        let mut args = format!(
            "start,event=cpu,interval={cpu_interval},wall={wall_millis}ms,jfr,cstack=dwarf,file={}",
            jfr_file_path.display()
        );
        if let Some(native_mem) = &self.native_mem {
            // Formatting into a `String` cannot fail.
            let _ = write!(args, ",nativemem={native_mem}");
        }
        args
    }
}
114}
115
116/// Builder for [`ProfilerOptions`].
117#[derive(Debug, Default)]
118pub struct ProfilerOptionsBuilder {
119    native_mem: Option<String>,
120    cpu_interval: Option<u128>,
121    wall_clock_millis: Option<u128>,
122}
123
124impl ProfilerOptionsBuilder {
125    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass
126    /// the string input directly to async_profiler.
127    ///
128    /// The value is the interval in bytes or in other units,
129    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
130    ///
131    /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
132    /// type-checked.
133    ///
134    /// ### Examples
135    ///
136    /// This will sample allocations for every 10 megabytes allocated:
137    ///
138    /// ```
139    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
140    /// # use async_profiler_agent::profiler::SpawnError;
141    /// # #[tokio::main]
142    /// # async fn main() -> Result<(), SpawnError> {
143    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
144    /// let profiler = ProfilerBuilder::default()
145    ///     .with_profiler_options(opts)
146    ///     .with_local_reporter("/tmp/profiles")
147    ///     .build();
148    /// # if false { // don't spawn the profiler in doctests
149    /// let profiler = profiler.spawn_controllable()?;
150    /// // ... your program goes here
151    /// profiler.stop().await; // make sure the last profile is flushed
152    /// # }
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
157        self.native_mem = Some(native_mem_interval);
158        self
159    }
160
161    /// If set, the profiler will collect information about
162    /// native memory allocations.
163    ///
164    /// The argument passed is the profiling interval - the profiler will
165    /// sample allocations every about that many bytes.
166    ///
167    /// See [ProfilingModes in the async-profiler docs] for more details.
168    ///
169    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
170    ///
171    /// ### Examples
172    ///
173    /// This will sample allocations for every 10 megabytes allocated:
174    ///
175    /// ```
176    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
177    /// # use async_profiler_agent::profiler::SpawnError;
178    /// # #[tokio::main]
179    /// # async fn main() -> Result<(), SpawnError> {
180    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
181    /// let profiler = ProfilerBuilder::default()
182    ///     .with_profiler_options(opts)
183    ///     .with_local_reporter("/tmp/profiles")
184    ///     .build();
185    /// # if false { // don't spawn the profiler in doctests
186    /// let profiler = profiler.spawn_controllable()?;
187    /// // ... your program goes here
188    /// profiler.stop().await; // make sure the last profile is flushed
189    /// # }
190    /// # Ok(())
191    /// # }
192    /// ```
193    ///
194    /// This will sample every allocation (potentially slow):
195    /// ```
196    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
197    /// # use async_profiler_agent::profiler::SpawnError;
198    /// # #[tokio::main]
199    /// # async fn main() -> Result<(), SpawnError> {
200    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
201    /// let profiler = ProfilerBuilder::default()
202    ///     .with_profiler_options(opts)
203    ///     .with_local_reporter("/tmp/profiles")
204    ///     .build();
205    /// # if false { // don't spawn the profiler in doctests
206    /// let profiler = profiler.spawn_controllable()?;
207    /// // ... your program goes here
208    /// profiler.stop().await; // make sure the last profile is flushed
209    /// # }
210    /// # Ok(())
211    /// # }
212    /// ```
213    pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
214        self.native_mem = Some(native_mem_interval.to_string());
215        self
216    }
217
218    /// Sets the interval in which the profiler will collect
219    /// CPU-time samples, via the [async-profiler `interval` option].
220    ///
221    /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
222    /// are currently running on a CPU, not threads that are sleeping.
223    ///
224    /// It can use a higher frequency than wall-clock sampling since the
225    /// number of the threads that are running on a CPU at a given time is
226    /// naturally limited by the number of CPUs, while the number of sleeping
227    /// threads can be much larger.
228    ///
229    /// The default is to do a CPU-time sample every 100 milliseconds.
230    ///
231    /// The async-profiler agent collects both CPU time and wall-clock time
232    /// samples, so this function should normally be used along with
233    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
234    ///
235    /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
236    ///
237    /// ### Examples
238    ///
239    /// This will sample allocations for every 10 CPU milliseconds (when running)
240    /// and 100 wall-clock milliseconds (running or sleeping):
241    ///
242    /// ```
243    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
244    /// # use async_profiler_agent::profiler::SpawnError;
245    /// # use std::time::Duration;
246    /// # #[tokio::main]
247    /// # async fn main() -> Result<(), SpawnError> {
248    /// let opts = ProfilerOptionsBuilder::default()
249    ///     .with_cpu_interval(Duration::from_millis(10))
250    ///     .with_wall_clock_interval(Duration::from_millis(100))
251    ///     .build();
252    /// let profiler = ProfilerBuilder::default()
253    ///     .with_profiler_options(opts)
254    ///     .with_local_reporter("/tmp/profiles")
255    ///     .build();
256    /// # if false { // don't spawn the profiler in doctests
257    /// let profiler = profiler.spawn_controllable()?;
258    /// // ... your program goes here
259    /// profiler.stop().await; // make sure the last profile is flushed
260    /// # }
261    /// # Ok(())
262    /// # }
263    /// ```
264    pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
265        self.cpu_interval = Some(cpu_interval.as_nanos());
266        self
267    }
268
269    /// Sets the interval, in milliseconds, in which the profiler will collect
270    /// wall-clock samples, via the [async-profiler `wall` option].
271    ///
272    /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
273    /// whether they are sleeping or running, and can therefore be
274    /// very useful for finding threads that are blocked, for example
275    /// on a synchronous lock or a slow system call.
276    ///
277    /// When using Tokio, since tasks are not threads, tasks that are not
278    /// currently running will not be sampled by a wall clock sample. However,
279    /// a wall clock sample is still very useful in Tokio, since it is what
280    /// you want to catch tasks that are blocking a thread by waiting on
281    /// synchronous operations.
282    ///
283    /// The default is to do a wall-clock sample every second.
284    ///
285    /// The async-profiler agent collects both CPU time and wall-clock time
286    /// samples, so this function should normally be used along with
287    /// [ProfilerOptionsBuilder::with_cpu_interval].
288    ///
289    /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
290    ///
291    /// ### Examples
292    ///
293    /// This will sample allocations for every 10 CPU milliseconds (when running)
294    /// and 100 wall-clock milliseconds (running or sleeping):
295    ///
296    /// ```
297    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
298    /// # use async_profiler_agent::profiler::SpawnError;
299    /// # use std::time::Duration;
300    /// # #[tokio::main]
301    /// # async fn main() -> Result<(), SpawnError> {
302    /// let opts = ProfilerOptionsBuilder::default()
303    ///     .with_cpu_interval(Duration::from_millis(10))
304    ///     .with_wall_clock_interval(Duration::from_millis(10))
305    ///     .build();
306    /// let profiler = ProfilerBuilder::default()
307    ///     .with_profiler_options(opts)
308    ///     .with_local_reporter("/tmp/profiles")
309    ///     .build();
310    /// # if false { // don't spawn the profiler in doctests
311    /// let profiler = profiler.spawn_controllable()?;
312    /// // ... your program goes here
313    /// profiler.stop().await; // make sure the last profile is flushed
314    /// # }
315    /// # Ok(())
316    /// # }
317    /// ```
318    pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
319        self.wall_clock_millis = Some(wall_clock.as_millis());
320        self
321    }
322
323    /// Build the [`ProfilerOptions`] from the builder.
324    pub fn build(self) -> ProfilerOptions {
325        ProfilerOptions {
326            native_mem: self.native_mem,
327            wall_clock_millis: self.wall_clock_millis,
328            cpu_interval: self.cpu_interval,
329        }
330    }
331}
332
333/// Builds a [`Profiler`], panicking if any required fields were not set by the
334/// time `build` is called.
335#[derive(Debug, Default)]
336pub struct ProfilerBuilder {
337    reporting_interval: Option<Duration>,
338    reporter: Option<Box<dyn Reporter + Send + Sync>>,
339    agent_metadata: Option<AgentMetadata>,
340    profiler_options: Option<ProfilerOptions>,
341}
342
343impl ProfilerBuilder {
344    /// Sets the reporting interval (default: 30 seconds).
345    ///
346    /// This is the interval that samples are *reported* to the backend,
347    /// and is unrelated to the interval at which the application
348    /// is *sampled* by async profiler, which is controlled by
349    /// [ProfilerOptionsBuilder::with_cpu_interval] and
350    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
351    ///
352    /// Most users should not change this setting.
353    ///
354    /// ## Example
355    ///
356    /// ```no_run
357    /// # use async_profiler_agent::profiler::SpawnError;
358    /// # #[tokio::main]
359    /// # async fn main() -> Result<(), SpawnError> {
360    /// # use std::path::PathBuf;
361    /// # use std::time::Duration;
362    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
363    /// # let path = PathBuf::from(".");
364    /// let agent = ProfilerBuilder::default()
365    ///     .with_local_reporter(path)
366    ///     .with_reporting_interval(Duration::from_secs(15))
367    ///     .build()
368    ///     .spawn_controllable()?;
369    /// // .. your program goes here
370    /// agent.stop().await; // make sure the last profile is flushed
371    /// # Ok::<_, SpawnError>(())
372    /// # }
373    /// ```
374    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
375        self.reporting_interval = Some(i);
376        self
377    }
378
379    /// Sets the [`Reporter`], which is used to upload the collected profiling
380    /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
381    /// feature enabled,
382    #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
383    #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
384    /// It is also possible to write your own [`Reporter`].
385    ///
386    /// It's normally easier to use [`LocalReporter`] directly via
387    /// [`ProfilerBuilder::with_local_reporter`].
388    ///
389    /// If you want to output to multiple reporters, you can use
390    /// [`MultiReporter`].
391    ///
392    /// [`LocalReporter`]: crate::reporter::local::LocalReporter
393    /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
394    #[cfg_attr(
395        feature = "s3-no-defaults",
396        doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
397    )]
398    ///
399    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
400    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
401        self.reporter = Some(Box::new(r));
402        self
403    }
404
405    /// Sets the profiler to ues [LocalReporter], which will write `.jfr` files to `path`,
406    /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
407    /// since the [LocalReporter] does not need that.
408    ///
409    /// This is useful for testing, since metadata auto-detection currently only works
410    /// on [Amazon EC2] or [Amazon Fargate] instances.
411    ///
412    /// The local reporter should normally not be used in production, since it will
413    /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
414    /// or write your own (see [`ProfilerBuilder::with_reporter`]).
415    ///
416    /// [Amazon EC2]: https://aws.amazon.com/ec2
417    /// [Amazon Fargate]: https://aws.amazon.com/fargate
418    ///
419    /// ## Example
420    ///
421    /// This will write profiles as `.jfr` files to `./path-to-profiles`:
422    ///
423    /// ```no_run
424    /// # use std::path::PathBuf;
425    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
426    /// # use async_profiler_agent::reporter::local::LocalReporter;
427    /// # use async_profiler_agent::metadata::AgentMetadata;
428    /// let path = PathBuf::from("./path-to-profiles");
429    /// let agent = ProfilerBuilder::default()
430    ///     .with_local_reporter(path)
431    ///     .build()
432    ///     .spawn()?;
433    /// # Ok::<_, SpawnError>(())
434    /// ```
435    pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
436        self.reporter = Some(Box::new(LocalReporter::new(path.into())));
437        self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
438    }
439
440    /// Provide custom agent metadata.
441    ///
442    /// The async-profiler Rust agent sends metadata to the [Reporter] with
443    /// the identity of the current host and process, which is normally
444    /// transmitted as `metadata.json` within the generated `.zip` file,
445    /// using the schema format [`reporter::s3::MetadataJson`].
446    ///
447    /// That metadata can later be used by tooling to be able to sort
448    /// profiling reports by host.
449    ///
450    /// async-profiler Rust agent will by default try to fetch the metadata
451    /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
452    /// will error if it's unable to find it. If you are running the
453    /// async-profiler agent on any other form of compute,
454    /// you will need to create and attach your own metadata
455    /// by calling this function.
456    ///
457    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
458    /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
459    /// [Amazon EC2]: https://aws.amazon.com/ec2
460    /// [Amazon Fargate]: https://aws.amazon.com/fargate
461    /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
462    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
463        self.agent_metadata = Some(j);
464        self
465    }
466
467    /// Provide custom profiler options.
468    ///
469    /// ### Example
470    ///
471    /// This will sample allocations for every 10 megabytes allocated:
472    ///
473    /// ```
474    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
475    /// # use async_profiler_agent::profiler::SpawnError;
476    /// # #[tokio::main]
477    /// # async fn main() -> Result<(), SpawnError> {
478    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
479    /// let profiler = ProfilerBuilder::default()
480    ///     .with_profiler_options(opts)
481    ///     .with_local_reporter("/tmp/profiles")
482    ///     .build();
483    /// # if false { // don't spawn the profiler in doctests
484    /// let profiler = profiler.spawn_controllable()?;
485    /// // ... your program goes here
486    /// profiler.stop().await; // make sure the last profile is flushed
487    /// # }
488    /// # Ok(())
489    /// # }
490    /// ```
491    pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
492        self.profiler_options = Some(c);
493        self
494    }
495
496    /// Turn this builder into a profiler!
497    pub fn build(self) -> Profiler {
498        Profiler {
499            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
500            reporter: self.reporter.expect("reporter is required"),
501            agent_metadata: self.agent_metadata,
502            profiler_options: self.profiler_options.unwrap_or_default(),
503        }
504    }
505}
506
/// Lifecycle of async-profiler as tracked by `ProfilerState`.
enum Status {
    /// async-profiler has not been started, or has been stopped.
    Idle,
    /// A start was attempted but did not report success. The profiler may or
    /// may not be running, so its output file must be treated as in use.
    Starting,
    /// async-profiler started successfully at the contained wall-clock time.
    Running(SystemTime),
}
512
513/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
514/// of the state of the profiler. The primary benefit of this is RAII - when
515/// this type drops, it will stop the profiler if it's running.
516struct ProfilerState<E: ProfilerEngine> {
517    // this is only None in the destructor when stopping the async-profiler fails
518    jfr_file: Option<JfrFile>,
519    asprof: E,
520    status: Status,
521    profiler_options: ProfilerOptions,
522}
523
524impl<E: ProfilerEngine> ProfilerState<E> {
525    fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
526        Ok(Self {
527            jfr_file: Some(JfrFile::new()?),
528            asprof,
529            status: Status::Idle,
530            profiler_options,
531        })
532    }
533
534    fn start(&mut self) -> Result<(), AsProfError> {
535        let jfr_file = self
536            .jfr_file
537            .as_ref()
538            .ok_or_else(|| io::Error::other("jfr file missing (dropped during stop failure?)"))?;
539        let active = jfr_file.active_path();
540        // drop guard - make sure the files are leaked if the profiler might have started
541        self.status = Status::Starting;
542        E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
543        self.status = Status::Running(SystemTime::now());
544        Ok(())
545    }
546
547    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
548        E::stop_async_profiler()?;
549        let status = std::mem::replace(&mut self.status, Status::Idle);
550        Ok(match status {
551            Status::Idle | Status::Starting => None,
552            Status::Running(since) => Some(since),
553        })
554    }
555
556    fn is_started(&self) -> bool {
557        matches!(self.status, Status::Running(_))
558    }
559}
560
561impl<E: ProfilerEngine> Drop for ProfilerState<E> {
562    fn drop(&mut self) {
563        match self.status {
564            Status::Running(_) => {
565                // In Drop, we can't use spawn_blocking, so we call the blocking operation
566                // directly. We skip the status reset that self.stop() would do since the
567                // struct is being dropped.
568                if let Err(err) = E::stop_async_profiler() {
569                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
570                    // to avoid symlink races
571                    std::mem::forget(self.jfr_file.take());
572                    // XXX: Rust defines leaking resources during drop as safe.
573                    tracing::warn!(?err, "unable to stop profiler during drop glue");
574                }
575            }
576            Status::Idle => {}
577            Status::Starting => {
578                // SECURITY: avoid removing the JFR file if stopping the profiler fails,
579                // to avoid symlink races
580                std::mem::forget(self.jfr_file.take());
581            }
582        }
583    }
584}
585
586pub(crate) trait ProfilerEngine: Send + Sync + 'static {
587    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
588    fn start_async_profiler(
589        &self,
590        jfr_file_path: &Path,
591        options: &ProfilerOptions,
592    ) -> Result<(), asprof::AsProfError>;
593    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
594}
595
596/// Holds the profiler task state and performs a final synchronous report
597/// when the task is cancelled (e.g. Tokio runtime shutdown) before a
598/// graceful stop. The local reporter will flush its contents on drop.
599/// For other reporters, you must call `RunningProfiler::stop().await`
600/// to ensure the last sample is uploaded.
601struct ProfilerTaskState<E: ProfilerEngine> {
602    // Option so profiler_tick can .take() the state to move it into spawn_blocking
603    state: Option<ProfilerState<E>>,
604    reporter: Box<dyn Reporter + Send + Sync>,
605    agent_metadata: Option<AgentMetadata>,
606    reporting_interval: Duration,
607    completed_normally: bool,
608}
609
610impl<E: ProfilerEngine> ProfilerTaskState<E> {
611    fn try_final_report(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
612        let state = self.state.as_mut().ok_or("profiler state missing")?;
613        let start = state.stop()?.ok_or("profiler was not running")?;
614        let jfr_file = state.jfr_file.as_ref().ok_or("jfr file missing")?;
615        let jfr_path = jfr_file.active_path();
616        if jfr_path.metadata()?.len() == 0 {
617            return Ok(());
618        }
619        let metadata = ReportMetadata {
620            instance: self
621                .agent_metadata
622                .as_ref()
623                .unwrap_or(&AgentMetadata::NoMetadata),
624            start: start.duration_since(UNIX_EPOCH)?,
625            end: SystemTime::now().duration_since(UNIX_EPOCH)?,
626            reporting_interval: self.reporting_interval,
627        };
628        self.reporter
629            .report_blocking(&jfr_path, &metadata)
630            .map_err(|e| e.to_string())?;
631        Ok(())
632    }
633}
634
635impl<E: ProfilerEngine> Drop for ProfilerTaskState<E> {
636    fn drop(&mut self) {
637        let is_started = self.state.as_ref().is_some_and(|s| s.is_started());
638        if self.completed_normally || !is_started {
639            return;
640        }
641        tracing::info!("profiler task cancelled, attempting final report on drop");
642        if let Err(err) = self.try_final_report() {
643            tracing::warn!(?err, "failed to report on drop");
644        }
645    }
646}
647
648#[derive(Debug, Error)]
649#[non_exhaustive]
650enum TickError {
651    #[error(transparent)]
652    AsProf(#[from] AsProfError),
653    #[error(transparent)]
654    #[cfg(feature = "aws-metadata-no-defaults")]
655    Metadata(Box<crate::metadata::aws::AwsProfilerMetadataError>),
656    #[error("reporter: {0}")]
657    Reporter(Box<dyn std::error::Error + Send>),
658    #[error("broken clock: {0}")]
659    BrokenClock(#[from] SystemTimeError),
660    #[error("jfr read error: {0}")]
661    JfrRead(io::Error),
662    #[error("empty inactive file error: {0}")]
663    EmptyInactiveFile(io::Error),
664    #[error("jfr file missing (dropped during stop failure?)")]
665    JfrFileMissing,
666    #[error("profiler state missing (previous tick panicked?)")]
667    StateMissing,
668    #[error("spawn_blocking task failed: {0}")]
669    SpawnBlocking(tokio::task::JoinError),
670}
671
672#[cfg(feature = "aws-metadata-no-defaults")]
673impl From<crate::metadata::aws::AwsProfilerMetadataError> for TickError {
674    fn from(err: crate::metadata::aws::AwsProfilerMetadataError) -> Self {
675        TickError::Metadata(Box::new(err))
676    }
677}
678
679#[derive(Debug, Error)]
680#[non_exhaustive]
681/// An error that happened spawning a profiler
682pub enum SpawnError {
683    /// Error from async-profiler
684    #[error(transparent)]
685    AsProf(#[from] asprof::AsProfError),
686    /// Error writing to a tempfile
687    #[error("tempfile error")]
688    TempFile(#[source] io::Error),
689}
690
691#[derive(Debug, Error)]
692#[non_exhaustive]
693/// An error from [`Profiler::spawn_thread`]
694pub enum SpawnThreadError {
695    /// Error from async-profiler
696    #[error(transparent)]
697    AsProf(#[from] SpawnError),
698    /// Error constructing Tokio runtime
699    #[error("constructing Tokio runtime")]
700    ConstructRt(#[source] io::Error),
701}
702
/// Control messages for the profiler task. There are none yet: the enum is
/// uninhabited, so nothing can ever be sent over a channel carrying it. The
/// channel is used only for its close signal (dropping the sender).
enum Control {}
705
706/// A handle to a running profiler
707///
708/// Currently just allows for stopping the profiler.
709///
710/// Dropping this handle will request that the profiler will stop.
711#[must_use = "dropping this stops the profiler, call .detach() to detach"]
712pub struct RunningProfiler {
713    stop_channel: tokio::sync::oneshot::Sender<Control>,
714    join_handle: tokio::task::JoinHandle<()>,
715}
716
717impl RunningProfiler {
718    /// Request that the current profiler stops and wait until it exits.
719    ///
720    /// This will cause the currently-pending profile information to be flushed.
721    ///
722    /// After this function returns, it is correct and safe to [spawn] a new
723    /// [Profiler], possibly with a different configuration. Therefore,
724    /// this function can be used to "reconfigure" a profiler by stopping
725    /// it and then starting a new one with a different configuration.
726    ///
727    /// [spawn]: Profiler::spawn_controllable
728    pub async fn stop(self) {
729        drop(self.stop_channel);
730        let _ = self.join_handle.await;
731    }
732
733    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
734    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
735        tokio::task::spawn(async move {
736            // move the control channel to the spawned task. this way, it will be dropped
737            // just when the task is aborted.
738            let _abort_channel = self.stop_channel;
739            self.join_handle.await.ok();
740        })
741    }
742
743    /// Detach this profiler. This will prevent the profiler from being stopped
744    /// when this handle is dropped. You should call this (or [Profiler::spawn]
745    /// instead of [Profiler::spawn_controllable], which does the same thing)
746    /// if you don't intend to reconfigure your profiler at runtime.
747    pub fn detach(self) {
748        self.detach_inner();
749    }
750
751    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
752    /// and returns a [RunningProfilerThread] attached to it.
753    fn spawn_attached(
754        self,
755        runtime: tokio::runtime::Runtime,
756        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
757    ) -> RunningProfilerThread {
758        RunningProfilerThread {
759            stop_channel: self.stop_channel,
760            join_handle: spawn_fn(Box::new(move || {
761                let _ = runtime.block_on(self.join_handle);
762            })),
763        }
764    }
765
766    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
767    /// and detaches it.
768    fn spawn_detached(
769        self,
770        runtime: tokio::runtime::Runtime,
771        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
772    ) {
773        spawn_fn(Box::new(move || {
774            let _stop_channel = self.stop_channel;
775            let _ = runtime.block_on(self.join_handle);
776        }));
777    }
778}
779
780/// A handle to a running profiler, running on a separate thread.
781///
782/// Currently just allows for stopping the profiler.
783///
784/// Dropping this handle will request that the profiler will stop.
785#[must_use = "dropping this stops the profiler, call .detach() to detach"]
786pub struct RunningProfilerThread {
787    stop_channel: tokio::sync::oneshot::Sender<Control>,
788    join_handle: std::thread::JoinHandle<()>,
789}
790
791impl RunningProfilerThread {
792    /// Request that the current profiler stops and wait until it exits.
793    ///
794    /// This will cause the currently-pending profile information to be flushed.
795    ///
796    /// After this function returns, it is correct and safe to [spawn] a new
797    /// [Profiler], possibly with a different configuration. Therefore,
798    /// this function can be used to "reconfigure" a profiler by stopping
799    /// it and then starting a new one with a different configuration.
800    ///
801    /// [spawn]: Profiler::spawn_controllable
802    pub fn stop(self) {
803        drop(self.stop_channel);
804        let _ = self.join_handle.join();
805    }
806}
807
808/// Rust profiler based on [async-profiler].
809///
810/// Spawning a profiler can be done either in an attached (controllable)
811/// mode, which allows for stopping the profiler (and, in fact, stops
812/// it when the relevant handle is dropped), or in detached mode,
813/// in which the profiler keeps running forever. Applications that can
814/// shut down the profiler at run-time, for example applications that
815/// support reconfiguration of a running profiler, generally want to use
816/// controllable mode. Other applications (most of them) should use
817/// detached mode.
818///
819/// In addition, the profiler can either be spawned into the current Tokio
820/// runtime, or into a new one. Normally, applications should spawn
821/// the profiler into their own Tokio runtime, but applications that
822/// don't have a default Tokio runtime should spawn it into a
823/// different one
824///
825/// This leaves 4 functions:
826/// 1. [Self::spawn] - detached, same runtime
827/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
828/// 3. [Self::spawn_controllable] - controllable, same runtime
829/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
830///
831/// In addition, there's a helper function that just spawns the profiler
832/// to a new runtime in a new thread, for applications that don't have
833/// a Tokio runtime and don't need complex control:
834///
835/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
836///
837/// [async-profiler]: https://github.com/async-profiler/async-profiler
838pub struct Profiler {
839    reporting_interval: Duration,
840    reporter: Box<dyn Reporter + Send + Sync>,
841    agent_metadata: Option<AgentMetadata>,
842    profiler_options: ProfilerOptions,
843}
844
845impl Profiler {
846    /// Start profiling. The profiler will run in a tokio task at the configured interval.
847    ///
848    /// This is the same as calling [Profiler::spawn_controllable] followed by
849    /// [RunningProfiler::detach], except it returns a [JoinHandle].
850    ///
851    /// The returned [JoinHandle] can be used to detect if the profiler has exited
852    /// due to a fatal error.
853    ///
854    /// This function will fail if it is unable to start async-profiler, for example
855    /// if it can't find or load `libasyncProfiler.so`.
856    ///
857    /// [JoinHandle]: tokio::task::JoinHandle
858    ///
859    /// ### Uploading the last sample
860    ///
861    /// When you return from the Tokio `main`, the agent will terminate without waiting
862    /// for the last profiling JFR to be uploaded. Especially if you have a
863    /// short-running program, if you want to ensure the last profiling JFR
864    /// is uploaded, you should use [Profiler::spawn_controllable] and
865    /// [RunningProfiler::stop] , which allows waiting for the upload
866    /// to finish.
867    ///
868    /// If you do not care about losing the last sample, it is fine to directly
869    /// return from the Tokio `main` without stopping the profiler.
870    ///
871    /// ### Tokio Runtime
872    ///
873    /// This function must be run within a Tokio runtime, otherwise it will panic. If
874    /// your application does not have a `main` Tokio runtime, see
875    /// [Profiler::spawn_thread].
876    ///
877    /// ### Example
878    ///
879    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
880    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
881    ///
882    /// ```
883    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
884    /// # #[tokio::main]
885    /// # async fn main() -> Result<(), SpawnError> {
886    /// let profiler = ProfilerBuilder::default()
887    ///    .with_local_reporter("/tmp/profiles")
888    ///    .build();
889    /// # if false { // don't spawn the profiler in doctests
890    /// profiler.spawn()?;
891    /// # }
892    /// # Ok(())
893    /// # }
894    /// ```
895    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
896        self.spawn_controllable().map(RunningProfiler::detach_inner)
897    }
898
899    /// Like [Self::spawn], but instead of spawning within the current Tokio
900    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
901    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
902    ///
903    /// If your configuration is standard, use [Profiler::spawn_thread].
904    ///
905    /// If you want to be able to stop the resulting profiler, use
906    /// [Profiler::spawn_controllable_thread_to_runtime].
907    ///
908    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
909    /// allow for configuring thread properties, for example thread names).
910    ///
911    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
912    ///
913    /// ### Uploading the last sample
914    ///
915    /// When you return from `main`, the agent will terminate without waiting
916    /// for the last profiling JFR to be uploaded. Especially if you have a
917    /// short-running program, if you want to ensure the last profiling JFR
918    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
919    /// and [RunningProfilerThread::stop], which allows waiting for the upload
920    /// to finish.
921    ///
922    /// If you do not care about losing the last sample, it is fine to directly
923    /// return from the Tokio `main` without stopping the profiler.
924    ///
925    /// ### Example
926    ///
927    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
928    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
929    ///
930    /// ```no_run
931    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
932    /// let rt = tokio::runtime::Builder::new_current_thread()
933    ///     .enable_all()
934    ///     .build()?;
935    /// let profiler = ProfilerBuilder::default()
936    ///    .with_local_reporter("/tmp/profiles")
937    ///    .build();
938    ///
939    /// profiler.spawn_thread_to_runtime(
940    ///     rt,
941    ///     |t| {
942    ///         std::thread::Builder::new()
943    ///             .name("asprof-agent".to_owned())
944    ///             .spawn(t)
945    ///             .expect("thread name contains nuls")
946    ///     }
947    /// )?;
948    /// # Ok::<_, anyhow::Error>(())
949    /// ```
950    pub fn spawn_thread_to_runtime(
951        self,
952        runtime: tokio::runtime::Runtime,
953        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
954    ) -> Result<(), SpawnError> {
955        self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
956    }
957
958    /// Like [Self::spawn], but instead of spawning within the current Tokio
959    /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
960    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
961    /// by itself.
962    ///
963    /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
964    /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
965    /// with the following:
966    /// 1. a current thread runtime with background worker threads (these exist
967    ///    for blocking IO) named "asprof-worker"
968    /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
969    ///
970    /// If you want to be able to stop the resulting profiler, use
971    /// [Profiler::spawn_controllable_thread_to_runtime].
972    ///
973    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
974    ///
975    /// ### Uploading the last sample
976    ///
977    /// When you return from `main`, the agent will terminate without waiting
978    /// for the last profiling JFR to be uploaded. Especially if you have a
979    /// short-running program, if you want to ensure the last profiling JFR
980    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
981    /// and [RunningProfilerThread::stop], which allows waiting for the upload
982    /// to finish.
983    ///
984    /// If you do not care about losing the last sample, it is fine to directly
985    /// return from the Tokio `main` without stopping the profiler.
986    ///
987    /// ### Example
988    ///
989    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
990    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
991    ///
992    /// ```no_run
993    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
994    /// # use async_profiler_agent::reporter::local::LocalReporter;
995    /// let profiler = ProfilerBuilder::default()
996    ///    .with_local_reporter("/tmp/profiles")
997    ///    .build();
998    ///
999    /// profiler.spawn_thread()?;
1000    /// # Ok::<_, anyhow::Error>(())
1001    /// ```
1002    pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
1003        // using "asprof" in thread name to deal with 15 character + \0 length limit
1004        let rt = tokio::runtime::Builder::new_current_thread()
1005            .thread_name("asprof-worker".to_owned())
1006            .enable_all()
1007            .build()
1008            .map_err(SpawnThreadError::ConstructRt)?;
1009        let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
1010        self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
1011            .map_err(SpawnThreadError::AsProf)
1012    }
1013
1014    fn spawn_thread_inner<E: ProfilerEngine>(
1015        self,
1016        asprof: E,
1017        runtime: tokio::runtime::Runtime,
1018        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1019    ) -> Result<(), SpawnError> {
1020        let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1021        handle.spawn_detached(runtime, spawn_fn);
1022        Ok(())
1023    }
1024
1025    /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
1026    /// (currently only stopping) the profiler.
1027    ///
1028    /// This allows for changing the configuration of the profiler at runtime, by
1029    /// stopping it and then starting a new Profiler with a new configuration. It
1030    /// also allows for stopping profiling in case the profiler is suspected to
1031    /// cause operational issues.
1032    ///
1033    /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
1034    /// so if your application doen't need to change the profiler's configuration at runtime,
1035    /// it will be easier to use [Profiler::spawn].
1036    ///
1037    /// This function will fail if it is unable to start async-profiler, for example
1038    /// if it can't find or load `libasyncProfiler.so`.
1039    ///
1040    /// ### Uploading the last sample
1041    ///
1042    /// When you return from the Tokio `main`, the agent will terminate without waiting
1043    /// for the last profiling JFR to be uploaded. Especially if you have a
1044    /// short-running program, if you want to ensure the last profiling JFR
1045    /// is uploaded, you should use [RunningProfiler::stop], which allows waiting for
1046    /// the upload to finish.
1047    ///
1048    /// If you do not care about losing the last sample, it is fine to directly
1049    /// return from the Tokio `main` without stopping the profiler.
1050    ///
1051    /// ### Tokio Runtime
1052    ///
1053    /// This function must be run within a Tokio runtime, otherwise it will panic. If
1054    /// your application does not have a `main` Tokio runtime, see
1055    /// [Profiler::spawn_controllable_thread_to_runtime].
1056    ///
1057    /// ### Example
1058    ///
1059    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1060    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1061    ///
1062    /// ```no_run
1063    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1064    /// # #[tokio::main]
1065    /// # async fn main() -> Result<(), SpawnError> {
1066    /// let profiler = ProfilerBuilder::default()
1067    ///    .with_local_reporter("/tmp/profiles")
1068    ///    .build();
1069    ///
1070    /// let profiler = profiler.spawn_controllable()?;
1071    ///
1072    /// // [insert your signaling/monitoring mechanism to have a request to disable
1073    /// // profiling in case of a problem]
1074    /// let got_request_to_disable_profiling = async move {
1075    ///     // ...
1076    /// #   false
1077    /// };
1078    /// // spawn a task that will disable profiling if requested
1079    /// tokio::task::spawn(async move {
1080    ///     if got_request_to_disable_profiling.await {
1081    ///         profiler.stop().await;
1082    ///     }
1083    /// });
1084    /// # Ok(())
1085    /// # }
1086    /// ```
1087    pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
1088        self.spawn_inner(asprof::AsProf::builder().build())
1089    }
1090
1091    /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
1092    /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
1093    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
1094    ///
1095    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
1096    /// allow for configuring thread properties, for example thread names).
1097    ///
1098    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
1099    ///
1100    /// ### Uploading the last sample
1101    ///
1102    /// When you return from `main`, the agent will terminate without waiting
1103    /// for the last profiling JFR to be uploaded. Especially if you have a
1104    /// short-running program, if you want to ensure the last profiling JFR
1105    /// is uploaded, you should use [RunningProfilerThread::stop], which allows waiting
1106    /// for the upload to finish.
1107    ///
1108    /// If you do not care about losing the last sample, it is fine to directly
1109    /// return from the Tokio `main` without stopping the profiler.
1110    ///
1111    /// ### Example
1112    ///
1113    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1114    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1115    ///
1116    /// ```no_run
1117    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1118    /// let rt = tokio::runtime::Builder::new_current_thread()
1119    ///     .enable_all()
1120    ///     .build()?;
1121    /// let profiler = ProfilerBuilder::default()
1122    ///    .with_local_reporter("/tmp/profiles")
1123    ///    .build();
1124    ///
1125    /// let profiler = profiler.spawn_controllable_thread_to_runtime(
1126    ///     rt,
1127    ///     |t| {
1128    ///         std::thread::Builder::new()
1129    ///             .name("asprof-agent".to_owned())
1130    ///             .spawn(t)
1131    ///             .expect("thread name contains nuls")
1132    ///     }
1133    /// )?;
1134    ///
1135    /// # fn got_request_to_disable_profiling() -> bool { false }
1136    /// // spawn a task that will disable profiling if requested
1137    /// std::thread::spawn(move || {
1138    ///     if got_request_to_disable_profiling() {
1139    ///         profiler.stop();
1140    ///     }
1141    /// });
1142    /// # Ok::<_, anyhow::Error>(())
1143    /// ```
1144    pub fn spawn_controllable_thread_to_runtime(
1145        self,
1146        runtime: tokio::runtime::Runtime,
1147        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1148    ) -> Result<RunningProfilerThread, SpawnError> {
1149        self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1150    }
1151
1152    fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1153        self,
1154        asprof: E,
1155        runtime: tokio::runtime::Runtime,
1156        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1157    ) -> Result<RunningProfilerThread, SpawnError> {
1158        let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1159        Ok(handle.spawn_attached(runtime, spawn_fn))
1160    }
1161
    /// Shared implementation behind every `spawn*` function: initializes
    /// async-profiler through `E` and spawns the profiling loop as a Tokio task.
    ///
    /// Must be called from within a Tokio runtime context, since it uses
    /// [`tokio::spawn`] and [`tokio::time::interval`] (the public wrappers
    /// document that calling them outside a runtime panics).
    ///
    /// The loop ticks every `reporting_interval`. Each tick calls
    /// [`profiler_tick`], which starts async-profiler if it isn't running yet,
    /// and otherwise rotates the JFR files and reports the previous interval's
    /// JFR. Closing the returned [RunningProfiler]'s control channel causes one
    /// final tick before the task exits. Reporter errors are logged and the loop
    /// continues; any other tick error ends the loop.
    ///
    /// # Errors
    ///
    /// Returns an error if `E::init_async_profiler` fails. A failure to create
    /// the `ProfilerState` happens inside the spawned task and is only logged.
    fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
        // Initialize async profiler - needs to be done once.
        E::init_async_profiler()?;
        tracing::info!("successfully initialized async profiler.");

        // A tokio interval's first tick completes immediately, so the first loop
        // iteration runs right away (and only starts the profiler).
        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
        // The sender goes into the returned handle; dropping it is the stop signal.
        let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();

        // Get profiles at the configured interval rate.
        let join_handle = tokio::spawn(async move {
            let state = match ProfilerState::new(asprof, self.profiler_options) {
                Ok(state) => state,
                Err(err) => {
                    tracing::error!(?err, "unable to create profiler state");
                    return;
                }
            };

            // NOTE(review): `completed_normally` starts false and is only set after the
            // loop exits; presumably `ProfilerTaskState` inspects it when dropped (e.g.
            // if this task is aborted) - confirm against its definition.
            let mut task = ProfilerTaskState {
                state: Some(state),
                reporter: self.reporter,
                agent_metadata: self.agent_metadata,
                reporting_interval: self.reporting_interval,
                completed_normally: false,
            };

            let mut done = false;
            while !done {
                // Wait until a timer or exit event
                // `biased;` polls the stop branch before the timer, so a pending stop
                // request wins over a simultaneous tick. The `is_terminated`
                // precondition disables the stop branch once the receiver has completed,
                // because a completed receiver must not be polled again.
                tokio::select! {
                    biased;

                    r = &mut stop_rx, if !stop_rx.is_terminated() => {
                        // NOTE(review): only `Err` is matched, which presumably relies on
                        // `Control` being uninhabited, so the channel can only ever signal
                        // by being closed - confirm.
                        match r {
                            Err(_) => {
                                tracing::info!("profiler stop requested, doing a final tick");
                                // Don't exit yet: fall through to one last tick so the
                                // pending JFR is reported before the loop ends.
                                done = true;
                            }
                        }
                    }
                    _ = sampling_ticker.tick() => {
                        tracing::debug!("profiler timer woke up");
                    }
                }

                // Awaited outside `select!`: `profiler_tick` is not cancel-safe.
                if let Err(err) = profiler_tick(
                    &mut task.state,
                    &mut task.agent_metadata,
                    task.reporter.as_ref(),
                    task.reporting_interval,
                )
                .await
                {
                    match &err {
                        TickError::Reporter(_) => {
                            // don't stop on IO errors
                            tracing::error!(?err, "error during profiling, continuing");
                        }
                        _stop => {
                            // Any other error ends the loop. NOTE(review): `completed_normally`
                            // is still set to true below in this case - confirm intended.
                            tracing::error!(?err, "error during profiling, stopping");
                            break;
                        }
                    }
                }
            }

            task.completed_normally = true;
            tracing::info!("profiling task finished");
        });

        Ok(RunningProfiler {
            stop_channel,
            join_handle,
        })
    }
1237}
1238
/// Result of a successful stop/rotate/restart cycle of the profiler.
///
/// Produced by the blocking half of a tick ([`tick_blocking`]) and consumed by
/// the async reporting half ([`profiler_tick`]).
struct TickCycleInfo {
    /// Start time of the profiling run that was just stopped, as returned by
    /// `ProfilerState::stop`.
    start_time: SystemTime,
    /// Path of the now-inactive JFR file that holds that run's samples.
    inactive_path: PathBuf,
}
1245
1246/// The synchronous (blocking) portion of a profiler tick.
1247///
1248/// Takes owned [`ProfilerState`] and always returns it alongside the result,
1249/// so the caller can restore it regardless of success or failure.
1250fn tick_blocking<E: ProfilerEngine>(
1251    mut state: ProfilerState<E>,
1252) -> (ProfilerState<E>, Result<Option<TickCycleInfo>, TickError>) {
1253    if !state.is_started() {
1254        let result = state.start().map(|()| None).map_err(TickError::from);
1255        return (state, result);
1256    }
1257
1258    let start_time = match state.stop() {
1259        Err(e) => return (state, Err(e.into())),
1260        Ok(None) => {
1261            tracing::warn!("stopped the profiler but it wasn't running?");
1262            return (state, Ok(None));
1263        }
1264        Ok(Some(t)) => t,
1265    };
1266
1267    let jfr_file = match state.jfr_file.as_mut() {
1268        Some(f) => f,
1269        None => {
1270            return (state, Err(TickError::JfrFileMissing));
1271        }
1272    };
1273
1274    if let Err(e) = jfr_file.empty_inactive_file() {
1275        return (state, Err(TickError::EmptyInactiveFile(e)));
1276    }
1277    jfr_file.swap();
1278    let inactive_path = jfr_file.inactive_path();
1279
1280    if let Err(e) = state.start() {
1281        return (state, Err(e.into()));
1282    }
1283
1284    (
1285        state,
1286        Ok(Some(TickCycleInfo {
1287            start_time,
1288            inactive_path,
1289        })),
1290    )
1291}
1292
1293/// # Cancel safety
1294///
1295/// This function is **not** cancel-safe. It moves `ProfilerState` out of `state_holder` via
1296/// `.take()` before awaiting `spawn_blocking`. If the future is dropped (e.g. in a `select!`
1297/// loop) before the state is restored, the profiler state is permanently lost.
1298async fn profiler_tick<E: ProfilerEngine>(
1299    state_holder: &mut Option<ProfilerState<E>>,
1300    agent_metadata: &mut Option<AgentMetadata>,
1301    reporter: &(dyn Reporter + Send + Sync),
1302    reporting_interval: Duration,
1303) -> Result<(), TickError> {
1304    let state = state_holder.take().ok_or(TickError::StateMissing)?;
1305
1306    let (state, result) = tokio::task::spawn_blocking(move || tick_blocking(state))
1307        .await
1308        .map_err(TickError::SpawnBlocking)?;
1309    *state_holder = Some(state);
1310
1311    let Some(info) = result? else {
1312        return Ok(());
1313    };
1314
1315    let start = info.start_time.duration_since(UNIX_EPOCH)?;
1316    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
1317
1318    // Lazily load the agent metadata if it was not provided in
1319    // the constructor. See the struct comments for why this is.
1320    // This code runs at most once.
1321    let instance = match agent_metadata.as_ref() {
1322        Some(md) => md,
1323        None => {
1324            #[cfg(feature = "aws-metadata-no-defaults")]
1325            let md = crate::metadata::aws::load_agent_metadata().await?;
1326            #[cfg(not(feature = "aws-metadata-no-defaults"))]
1327            let md = crate::metadata::AgentMetadata::NoMetadata;
1328            tracing::debug!("loaded metadata");
1329            agent_metadata.insert(md)
1330        }
1331    };
1332
1333    let report_metadata = ReportMetadata {
1334        instance,
1335        start,
1336        end,
1337        reporting_interval,
1338    };
1339
1340    let jfr = tokio::fs::read(&info.inactive_path)
1341        .await
1342        .map_err(TickError::JfrRead)?;
1343
1344    reporter
1345        .report(jfr, &report_metadata)
1346        .await
1347        .map_err(TickError::Reporter)?;
1348
1349    Ok(())
1350}
1351
1352#[cfg(test)]
1353mod tests {
1354    use std::sync::Arc;
1355    use std::sync::atomic::{self, AtomicBool, AtomicU32};
1356
1357    use test_case::test_case;
1358
1359    use super::*;
1360
1361    #[test]
1362    fn test_jfr_file_drop() {
1363        let mut jfr = JfrFile::new().unwrap();
1364
1365        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
1366        jfr.swap();
1367        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
1368        jfr.empty_inactive_file().unwrap();
1369        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
1370    }
1371
1372    struct MockProfilerEngine {
1373        counter: AtomicU32,
1374    }
1375    impl ProfilerEngine for MockProfilerEngine {
1376        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1377            Ok(())
1378        }
1379
1380        fn start_async_profiler(
1381            &self,
1382            jfr_file_path: &Path,
1383            _options: &ProfilerOptions,
1384        ) -> Result<(), asprof::AsProfError> {
1385            let contents = format!(
1386                "JFR{}",
1387                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
1388            );
1389            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
1390            Ok(())
1391        }
1392
1393        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1394            Ok(())
1395        }
1396    }
1397
1398    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
1399    impl std::fmt::Debug for MockReporter {
1400        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1401            f.debug_struct("MockReporter").finish()
1402        }
1403    }
1404
1405    #[async_trait::async_trait]
1406    impl Reporter for MockReporter {
1407        async fn report(
1408            &self,
1409            jfr: Vec<u8>,
1410            metadata: &ReportMetadata,
1411        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1412            self.0
1413                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
1414                .await
1415                .unwrap();
1416            Ok(())
1417        }
1418    }
1419
1420    fn make_mock_profiler() -> (
1421        Profiler,
1422        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
1423    ) {
1424        let (tx, rx) = tokio::sync::mpsc::channel(1);
1425        let agent = ProfilerBuilder::default()
1426            .with_reporter(MockReporter(tx))
1427            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
1428                aws_account_id: "0".into(),
1429                aws_region_id: "us-east-1".into(),
1430                ec2_instance_id: "i-fake".into(),
1431                ec2_instance_type: "t3.micro".into(),
1432            })
1433            .build();
1434        (agent, rx)
1435    }
1436
1437    #[tokio::test(start_paused = true)]
1438    async fn test_profiler_agent() {
1439        let e_md = AgentMetadata::Ec2AgentMetadata {
1440            aws_account_id: "0".into(),
1441            aws_region_id: "us-east-1".into(),
1442            ec2_instance_id: "i-fake".into(),
1443            ec2_instance_type: "t3.micro".into(),
1444        };
1445        let (agent, mut rx) = make_mock_profiler();
1446        agent
1447            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1448                counter: AtomicU32::new(0),
1449            })
1450            .unwrap()
1451            .detach();
1452        let (jfr, md) = rx.recv().await.unwrap();
1453        assert_eq!(jfr, "JFR0");
1454        assert_eq!(e_md, md);
1455        let (jfr, md) = rx.recv().await.unwrap();
1456        assert_eq!(jfr, "JFR1");
1457        assert_eq!(e_md, md);
1458    }
1459
1460    #[test_case(false; "uncontrollable")]
1461    #[test_case(true; "controllable")]
1462    fn test_profiler_local_rt(controllable: bool) {
1463        let e_md = AgentMetadata::Ec2AgentMetadata {
1464            aws_account_id: "0".into(),
1465            aws_region_id: "us-east-1".into(),
1466            ec2_instance_id: "i-fake".into(),
1467            ec2_instance_type: "t3.micro".into(),
1468        };
1469        let (agent, mut rx) = make_mock_profiler();
1470        let rt = tokio::runtime::Builder::new_current_thread()
1471            .enable_all()
1472            .start_paused(true)
1473            .build()
1474            .unwrap();
1475        // spawn the profiler, doing this before spawning a thread to allow
1476        // capturing errors from `spawn`
1477        let handle = if controllable {
1478            Some(
1479                agent
1480                    .spawn_controllable_thread_inner::<MockProfilerEngine>(
1481                        MockProfilerEngine {
1482                            counter: AtomicU32::new(0),
1483                        },
1484                        rt,
1485                        std::thread::spawn,
1486                    )
1487                    .unwrap(),
1488            )
1489        } else {
1490            agent
1491                .spawn_thread_inner::<MockProfilerEngine>(
1492                    MockProfilerEngine {
1493                        counter: AtomicU32::new(0),
1494                    },
1495                    rt,
1496                    std::thread::spawn,
1497                )
1498                .unwrap();
1499            None
1500        };
1501
1502        let (jfr, md) = rx.blocking_recv().unwrap();
1503        assert_eq!(jfr, "JFR0");
1504        assert_eq!(e_md, md);
1505        let (jfr, md) = rx.blocking_recv().unwrap();
1506        assert_eq!(jfr, "JFR1");
1507        assert_eq!(e_md, md);
1508
1509        if let Some(handle) = handle {
1510            let drain_thread =
1511                std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
1512            // request a stop
1513            handle.stop();
1514            // the drain thread should be done
1515            drain_thread.join().unwrap();
1516        }
1517    }
1518
1519    enum StopKind {
1520        Delibrate,
1521        Drop,
1522        Abort,
1523    }
1524
1525    #[tokio::test(start_paused = true)]
1526    #[test_case(StopKind::Delibrate; "deliberate stop")]
1527    #[test_case(StopKind::Drop; "drop stop")]
1528    #[test_case(StopKind::Abort; "abort stop")]
1529    async fn test_profiler_stop(stop_kind: StopKind) {
1530        let e_md = AgentMetadata::Ec2AgentMetadata {
1531            aws_account_id: "0".into(),
1532            aws_region_id: "us-east-1".into(),
1533            ec2_instance_id: "i-fake".into(),
1534            ec2_instance_type: "t3.micro".into(),
1535        };
1536        let (agent, mut rx) = make_mock_profiler();
1537        let profiler_ref = agent
1538            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1539                counter: AtomicU32::new(0),
1540            })
1541            .unwrap();
1542        let (jfr, md) = rx.recv().await.unwrap();
1543        assert_eq!(jfr, "JFR0");
1544        assert_eq!(e_md, md);
1545        let (jfr, md) = rx.recv().await.unwrap();
1546        assert_eq!(jfr, "JFR1");
1547        assert_eq!(e_md, md);
1548        // check that stop is faster than an interval and returns an "immediate" next jfr
1549        match stop_kind {
1550            StopKind::Drop => drop(profiler_ref),
1551            StopKind::Delibrate => {
1552                tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
1553                    .await
1554                    .unwrap();
1555            }
1556            StopKind::Abort => {
1557                // You can call Abort on the JoinHandle. make sure that is not buggy.
1558                profiler_ref.detach_inner().abort();
1559            }
1560        }
1561        // check that we get the next JFR "quickly", and the JFR after that is empty.
1562        let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
1563            .await
1564            .unwrap()
1565            .unwrap();
1566        assert_eq!(jfr, "JFR2");
1567        assert_eq!(e_md, md);
1568        assert!(rx.recv().await.is_none());
1569    }
1570
1571    // simulate a badly-behaved profiler that errors on start/stop and then
1572    // tries to access the JFR file
1573    struct StopErrorProfilerEngine {
1574        start_error: bool,
1575        counter: Arc<AtomicBool>,
1576    }
1577    impl ProfilerEngine for StopErrorProfilerEngine {
1578        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1579            Ok(())
1580        }
1581
1582        fn start_async_profiler(
1583            &self,
1584            jfr_file_path: &Path,
1585            _options: &ProfilerOptions,
1586        ) -> Result<(), asprof::AsProfError> {
1587            let jfr_file_path = jfr_file_path.to_owned();
1588            std::fs::write(&jfr_file_path, "JFR").unwrap();
1589            let counter = self.counter.clone();
1590            tokio::task::spawn(async move {
1591                tokio::time::sleep(Duration::from_secs(5)).await;
1592                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
1593                counter.store(true, atomic::Ordering::Release);
1594            });
1595            if self.start_error {
1596                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1597            } else {
1598                Ok(())
1599            }
1600        }
1601
1602        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1603            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
1604        }
1605    }
1606
1607    #[tokio::test(start_paused = true)]
1608    #[test_case(false; "error on stop")]
1609    #[test_case(true; "error on start")]
1610    async fn test_profiler_error(start_error: bool) {
1611        let (agent, mut rx) = make_mock_profiler();
1612        let counter = Arc::new(AtomicBool::new(false));
1613        let engine = StopErrorProfilerEngine {
1614            start_error,
1615            counter: counter.clone(),
1616        };
1617        let handle = agent.spawn_inner(engine).unwrap().detach_inner();
1618        assert!(rx.recv().await.is_none());
1619        // check that the "sleep 5" step in start_async_profiler succeeds
1620        for _ in 0..100 {
1621            tokio::time::sleep(Duration::from_secs(1)).await;
1622            if counter.load(atomic::Ordering::Acquire) {
1623                handle.await.unwrap(); // Check that the JoinHandle is done
1624                return;
1625            }
1626        }
1627        panic!("didn't read from file");
1628    }
1629
1630    #[test]
1631    fn test_profiler_options_to_args_string_default() {
1632        let opts = ProfilerOptions::default();
1633        let dummy_path = Path::new("/tmp/test.jfr");
1634        let args = opts.to_args_string(dummy_path);
1635        assert!(
1636            args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
1637            "Default args string not constructed correctly"
1638        );
1639        assert!(args.contains("file=/tmp/test.jfr"));
1640        assert!(!args.contains("nativemem="));
1641    }
1642
1643    #[test]
1644    fn test_profiler_options_to_args_string_with_native_mem() {
1645        let opts = ProfilerOptions {
1646            native_mem: Some("10m".to_string()),
1647            wall_clock_millis: None,
1648            cpu_interval: None,
1649        };
1650        let dummy_path = Path::new("/tmp/test.jfr");
1651        let args = opts.to_args_string(dummy_path);
1652        assert!(args.contains("nativemem=10m"));
1653    }
1654
1655    #[test]
1656    fn test_profiler_options_builder() {
1657        let opts = ProfilerOptionsBuilder::default()
1658            .with_native_mem_bytes(5000000)
1659            .build();
1660
1661        assert_eq!(opts.native_mem, Some("5000000".to_string()));
1662    }
1663
1664    #[test]
1665    fn test_profiler_options_builder_all_options() {
1666        let opts = ProfilerOptionsBuilder::default()
1667            .with_native_mem_bytes(5000000)
1668            .with_cpu_interval(Duration::from_secs(1))
1669            .with_wall_clock_interval(Duration::from_secs(10))
1670            .build();
1671
1672        let dummy_path = Path::new("/tmp/test.jfr");
1673        let args = opts.to_args_string(dummy_path);
1674        assert_eq!(
1675            args,
1676            "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
1677        );
1678    }
1679
1680    #[test]
1681    fn test_local_reporter_has_no_metadata() {
1682        // Check that with_local_reporter sets some configuration
1683        let reporter = ProfilerBuilder::default().with_local_reporter(".");
1684        assert_eq!(
1685            format!("{:?}", reporter.reporter),
1686            r#"Some(LocalReporter { directory: "." })"#
1687        );
1688        match reporter.agent_metadata {
1689            Some(AgentMetadata::NoMetadata) => {}
1690            bad => panic!("{bad:?}"),
1691        };
1692    }
1693
1694    /// A reporter that tracks both async and blocking reports separately.
1695    struct BlockingMockReporter {
1696        async_tx: tokio::sync::mpsc::Sender<String>,
1697        blocking_reports: Arc<std::sync::Mutex<Vec<String>>>,
1698    }
1699    impl std::fmt::Debug for BlockingMockReporter {
1700        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1701            f.debug_struct("BlockingMockReporter").finish()
1702        }
1703    }
1704
1705    #[async_trait::async_trait]
1706    impl Reporter for BlockingMockReporter {
1707        async fn report(
1708            &self,
1709            jfr: Vec<u8>,
1710            _metadata: &ReportMetadata,
1711        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1712            self.async_tx
1713                .send(String::from_utf8(jfr).unwrap())
1714                .await
1715                .unwrap();
1716            Ok(())
1717        }
1718
1719        fn report_blocking(
1720            &self,
1721            jfr_path: &Path,
1722            _metadata: &ReportMetadata,
1723        ) -> Result<(), Box<dyn std::error::Error + Send>> {
1724            let jfr = std::fs::read(jfr_path).map_err(|e| Box::new(e) as _)?;
1725            self.blocking_reports
1726                .lock()
1727                .unwrap()
1728                .push(String::from_utf8(jfr).unwrap());
1729            Ok(())
1730        }
1731    }
1732
1733    /// Simulates a runtime shutdown while the profiler is running.
1734    /// The profiler should call report_blocking on drop to flush the
1735    /// last sample.
1736    #[test]
1737    fn test_profiler_report_on_drop() {
1738        let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new()));
1739
1740        let rt = tokio::runtime::Builder::new_current_thread()
1741            .enable_all()
1742            .start_paused(true)
1743            .build()
1744            .unwrap();
1745
1746        let reports_clone = blocking_reports.clone();
1747        rt.block_on(async {
1748            let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<String>(10);
1749            let agent = ProfilerBuilder::default()
1750                .with_reporter(BlockingMockReporter {
1751                    async_tx,
1752                    blocking_reports: reports_clone,
1753                })
1754                .with_custom_agent_metadata(AgentMetadata::NoMetadata)
1755                .build();
1756            // Detach so the stop channel doesn't trigger a graceful stop
1757            // when the block_on future returns.
1758            agent
1759                .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1760                    counter: AtomicU32::new(0),
1761                })
1762                .unwrap()
1763                .detach();
1764
1765            // Wait for first async report to confirm profiler is running
1766            let jfr = async_rx.recv().await.unwrap();
1767            assert_eq!(jfr, "JFR0");
1768            // Return without stopping — runtime drop will cancel the task.
1769        });
1770
1771        // Runtime shutdown cancels all tasks, triggering ProfilerTaskState::Drop.
1772        drop(rt);
1773
1774        let reports = blocking_reports.lock().unwrap();
1775        assert_eq!(
1776            reports.len(),
1777            1,
1778            "expected exactly one blocking report on drop"
1779        );
1780        assert_eq!(reports[0], "JFR1");
1781    }
1782}