async_profiler_agent/profiler.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
5
6use crate::{
7 asprof::{self, AsProfError},
8 metadata::{AgentMetadata, ReportMetadata},
9 reporter::{Reporter, local::LocalReporter},
10};
11use std::{
12 fs::File,
13 io,
14 path::{Path, PathBuf},
15 time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
19struct JfrFile {
20 active: std::fs::File,
21 inactive: std::fs::File,
22}
23
24impl JfrFile {
25 #[cfg(target_os = "linux")]
26 fn new() -> Result<Self, io::Error> {
27 Ok(Self {
28 active: tempfile::tempfile()?,
29 inactive: tempfile::tempfile()?,
30 })
31 }
32
33 #[cfg(not(target_os = "linux"))]
34 fn new() -> Result<Self, io::Error> {
35 Err(io::Error::other(
36 "async-profiler is only supported on Linux",
37 ))
38 }
39
40 fn swap(&mut self) {
41 std::mem::swap(&mut self.active, &mut self.inactive);
42 }
43
44 #[cfg(target_os = "linux")]
45 fn file_path(file: &std::fs::File) -> PathBuf {
46 use std::os::fd::AsRawFd;
47
48 format!("/proc/self/fd/{}", file.as_raw_fd()).into()
49 }
50
51 #[cfg(not(target_os = "linux"))]
52 fn file_path(_file: &std::fs::File) -> PathBuf {
53 unimplemented!()
54 }
55
56 fn active_path(&self) -> PathBuf {
57 Self::file_path(&self.active)
58 }
59
60 fn inactive_path(&self) -> PathBuf {
61 Self::file_path(&self.inactive)
62 }
63
64 fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
65 // Empty the file, or create it for the first time if the profiler hasn't
66 // started yet.
67 File::create(Self::file_path(&self.inactive))?;
68 tracing::debug!(message = "emptied the file");
69 Ok(())
70 }
71}
72
/// Options for configuring the async-profiler behavior.
///
/// Currently supports:
/// - Native memory allocation tracking
/// - The CPU-time sampling interval
/// - The wall-clock sampling interval
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    /// For example, `"10m"` will sample an allocation for every
    /// 10 megabytes of memory allocated. Passing `"0"` will sample
    /// all allocations.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    /// CPU-time sampling interval in nanoseconds; `None` falls back to
    /// `DEFAULT_CPU_INTERVAL_NANOS`.
    cpu_interval: Option<u128>,
    /// Wall-clock sampling interval in milliseconds; `None` falls back to
    /// `DEFAULT_WALL_CLOCK_INTERVAL_MILLIS`.
    wall_clock_millis: Option<u128>,
}

/// CPU-time sampling interval used when none is configured (100 milliseconds).
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
/// Wall-clock sampling interval used when none is configured (1 second).
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;

impl ProfilerOptions {
    /// Convert the profiler options to a string of arguments for the async-profiler.
    ///
    /// The result is a comma-separated `start,...` command that writes JFR
    /// output to `jfr_file_path`; a `nativemem=` argument is appended only when
    /// [`ProfilerOptions::native_mem`] is set.
    pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
        let cpu_nanos = self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS);
        let wall_millis = self
            .wall_clock_millis
            .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS);
        let base = format!(
            "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
            cpu_nanos,
            wall_millis,
            jfr_file_path.display()
        );
        match &self.native_mem {
            Some(native_mem) => base + &format!(",nativemem={native_mem}"),
            None => base,
        }
    }
}
115
116/// Builder for [`ProfilerOptions`].
117#[derive(Debug, Default)]
118pub struct ProfilerOptionsBuilder {
119 native_mem: Option<String>,
120 cpu_interval: Option<u128>,
121 wall_clock_millis: Option<u128>,
122}
123
124impl ProfilerOptionsBuilder {
125 /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass
126 /// the string input directly to async_profiler.
127 ///
128 /// The value is the interval in bytes or in other units,
129 /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
130 ///
131 /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
132 /// type-checked.
133 ///
134 /// ### Examples
135 ///
136 /// This will sample allocations for every 10 megabytes allocated:
137 ///
138 /// ```
139 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
140 /// # use async_profiler_agent::profiler::SpawnError;
141 /// # #[tokio::main]
142 /// # async fn main() -> Result<(), SpawnError> {
143 /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
144 /// let profiler = ProfilerBuilder::default()
145 /// .with_profiler_options(opts)
146 /// .with_local_reporter("/tmp/profiles")
147 /// .build();
148 /// # if false { // don't spawn the profiler in doctests
149 /// let profiler = profiler.spawn_controllable()?;
150 /// // ... your program goes here
151 /// profiler.stop().await; // make sure the last profile is flushed
152 /// # }
153 /// # Ok(())
154 /// # }
155 /// ```
156 pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
157 self.native_mem = Some(native_mem_interval);
158 self
159 }
160
161 /// If set, the profiler will collect information about
162 /// native memory allocations.
163 ///
164 /// The argument passed is the profiling interval - the profiler will
165 /// sample allocations every about that many bytes.
166 ///
167 /// See [ProfilingModes in the async-profiler docs] for more details.
168 ///
169 /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
170 ///
171 /// ### Examples
172 ///
173 /// This will sample allocations for every 10 megabytes allocated:
174 ///
175 /// ```
176 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
177 /// # use async_profiler_agent::profiler::SpawnError;
178 /// # #[tokio::main]
179 /// # async fn main() -> Result<(), SpawnError> {
180 /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
181 /// let profiler = ProfilerBuilder::default()
182 /// .with_profiler_options(opts)
183 /// .with_local_reporter("/tmp/profiles")
184 /// .build();
185 /// # if false { // don't spawn the profiler in doctests
186 /// let profiler = profiler.spawn_controllable()?;
187 /// // ... your program goes here
188 /// profiler.stop().await; // make sure the last profile is flushed
189 /// # }
190 /// # Ok(())
191 /// # }
192 /// ```
193 ///
194 /// This will sample every allocation (potentially slow):
195 /// ```
196 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
197 /// # use async_profiler_agent::profiler::SpawnError;
198 /// # #[tokio::main]
199 /// # async fn main() -> Result<(), SpawnError> {
200 /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
201 /// let profiler = ProfilerBuilder::default()
202 /// .with_profiler_options(opts)
203 /// .with_local_reporter("/tmp/profiles")
204 /// .build();
205 /// # if false { // don't spawn the profiler in doctests
206 /// let profiler = profiler.spawn_controllable()?;
207 /// // ... your program goes here
208 /// profiler.stop().await; // make sure the last profile is flushed
209 /// # }
210 /// # Ok(())
211 /// # }
212 /// ```
213 pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
214 self.native_mem = Some(native_mem_interval.to_string());
215 self
216 }
217
218 /// Sets the interval in which the profiler will collect
219 /// CPU-time samples, via the [async-profiler `interval` option].
220 ///
221 /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
222 /// are currently running on a CPU, not threads that are sleeping.
223 ///
224 /// It can use a higher frequency than wall-clock sampling since the
225 /// number of the threads that are running on a CPU at a given time is
226 /// naturally limited by the number of CPUs, while the number of sleeping
227 /// threads can be much larger.
228 ///
229 /// The default is to do a CPU-time sample every 100 milliseconds.
230 ///
231 /// The async-profiler agent collects both CPU time and wall-clock time
232 /// samples, so this function should normally be used along with
233 /// [ProfilerOptionsBuilder::with_wall_clock_interval].
234 ///
235 /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
236 ///
237 /// ### Examples
238 ///
239 /// This will sample allocations for every 10 CPU milliseconds (when running)
240 /// and 100 wall-clock milliseconds (running or sleeping):
241 ///
242 /// ```
243 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
244 /// # use async_profiler_agent::profiler::SpawnError;
245 /// # use std::time::Duration;
246 /// # #[tokio::main]
247 /// # async fn main() -> Result<(), SpawnError> {
248 /// let opts = ProfilerOptionsBuilder::default()
249 /// .with_cpu_interval(Duration::from_millis(10))
250 /// .with_wall_clock_interval(Duration::from_millis(100))
251 /// .build();
252 /// let profiler = ProfilerBuilder::default()
253 /// .with_profiler_options(opts)
254 /// .with_local_reporter("/tmp/profiles")
255 /// .build();
256 /// # if false { // don't spawn the profiler in doctests
257 /// let profiler = profiler.spawn_controllable()?;
258 /// // ... your program goes here
259 /// profiler.stop().await; // make sure the last profile is flushed
260 /// # }
261 /// # Ok(())
262 /// # }
263 /// ```
264 pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
265 self.cpu_interval = Some(cpu_interval.as_nanos());
266 self
267 }
268
269 /// Sets the interval, in milliseconds, in which the profiler will collect
270 /// wall-clock samples, via the [async-profiler `wall` option].
271 ///
272 /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
273 /// whether they are sleeping or running, and can therefore be
274 /// very useful for finding threads that are blocked, for example
275 /// on a synchronous lock or a slow system call.
276 ///
277 /// When using Tokio, since tasks are not threads, tasks that are not
278 /// currently running will not be sampled by a wall clock sample. However,
279 /// a wall clock sample is still very useful in Tokio, since it is what
280 /// you want to catch tasks that are blocking a thread by waiting on
281 /// synchronous operations.
282 ///
283 /// The default is to do a wall-clock sample every second.
284 ///
285 /// The async-profiler agent collects both CPU time and wall-clock time
286 /// samples, so this function should normally be used along with
287 /// [ProfilerOptionsBuilder::with_cpu_interval].
288 ///
289 /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
290 ///
291 /// ### Examples
292 ///
293 /// This will sample allocations for every 10 CPU milliseconds (when running)
294 /// and 100 wall-clock milliseconds (running or sleeping):
295 ///
296 /// ```
297 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
298 /// # use async_profiler_agent::profiler::SpawnError;
299 /// # use std::time::Duration;
300 /// # #[tokio::main]
301 /// # async fn main() -> Result<(), SpawnError> {
302 /// let opts = ProfilerOptionsBuilder::default()
303 /// .with_cpu_interval(Duration::from_millis(10))
304 /// .with_wall_clock_interval(Duration::from_millis(10))
305 /// .build();
306 /// let profiler = ProfilerBuilder::default()
307 /// .with_profiler_options(opts)
308 /// .with_local_reporter("/tmp/profiles")
309 /// .build();
310 /// # if false { // don't spawn the profiler in doctests
311 /// let profiler = profiler.spawn_controllable()?;
312 /// // ... your program goes here
313 /// profiler.stop().await; // make sure the last profile is flushed
314 /// # }
315 /// # Ok(())
316 /// # }
317 /// ```
318 pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
319 self.wall_clock_millis = Some(wall_clock.as_millis());
320 self
321 }
322
323 /// Build the [`ProfilerOptions`] from the builder.
324 pub fn build(self) -> ProfilerOptions {
325 ProfilerOptions {
326 native_mem: self.native_mem,
327 wall_clock_millis: self.wall_clock_millis,
328 cpu_interval: self.cpu_interval,
329 }
330 }
331}
332
333/// Builds a [`Profiler`], panicking if any required fields were not set by the
334/// time `build` is called.
335#[derive(Debug, Default)]
336pub struct ProfilerBuilder {
337 reporting_interval: Option<Duration>,
338 reporter: Option<Box<dyn Reporter + Send + Sync>>,
339 agent_metadata: Option<AgentMetadata>,
340 profiler_options: Option<ProfilerOptions>,
341}
342
343impl ProfilerBuilder {
344 /// Sets the reporting interval (default: 30 seconds).
345 ///
346 /// This is the interval that samples are *reported* to the backend,
347 /// and is unrelated to the interval at which the application
348 /// is *sampled* by async profiler, which is controlled by
349 /// [ProfilerOptionsBuilder::with_cpu_interval] and
350 /// [ProfilerOptionsBuilder::with_wall_clock_interval].
351 ///
352 /// Most users should not change this setting.
353 ///
354 /// ## Example
355 ///
356 /// ```no_run
357 /// # use async_profiler_agent::profiler::SpawnError;
358 /// # #[tokio::main]
359 /// # async fn main() -> Result<(), SpawnError> {
360 /// # use std::path::PathBuf;
361 /// # use std::time::Duration;
362 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
363 /// # let path = PathBuf::from(".");
364 /// let agent = ProfilerBuilder::default()
365 /// .with_local_reporter(path)
366 /// .with_reporting_interval(Duration::from_secs(15))
367 /// .build()
368 /// .spawn_controllable()?;
369 /// // .. your program goes here
370 /// agent.stop().await; // make sure the last profile is flushed
371 /// # Ok::<_, SpawnError>(())
372 /// # }
373 /// ```
374 pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
375 self.reporting_interval = Some(i);
376 self
377 }
378
379 /// Sets the [`Reporter`], which is used to upload the collected profiling
380 /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
381 /// feature enabled,
382 #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
383 #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
384 /// It is also possible to write your own [`Reporter`].
385 ///
386 /// It's normally easier to use [`LocalReporter`] directly via
387 /// [`ProfilerBuilder::with_local_reporter`].
388 ///
389 /// If you want to output to multiple reporters, you can use
390 /// [`MultiReporter`].
391 ///
392 /// [`LocalReporter`]: crate::reporter::local::LocalReporter
393 /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
394 #[cfg_attr(
395 feature = "s3-no-defaults",
396 doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
397 )]
398 ///
399 #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
400 pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
401 self.reporter = Some(Box::new(r));
402 self
403 }
404
405 /// Sets the profiler to ues [LocalReporter], which will write `.jfr` files to `path`,
406 /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
407 /// since the [LocalReporter] does not need that.
408 ///
409 /// This is useful for testing, since metadata auto-detection currently only works
410 /// on [Amazon EC2] or [Amazon Fargate] instances.
411 ///
412 /// The local reporter should normally not be used in production, since it will
413 /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
414 /// or write your own (see [`ProfilerBuilder::with_reporter`]).
415 ///
416 /// [Amazon EC2]: https://aws.amazon.com/ec2
417 /// [Amazon Fargate]: https://aws.amazon.com/fargate
418 ///
419 /// ## Example
420 ///
421 /// This will write profiles as `.jfr` files to `./path-to-profiles`:
422 ///
423 /// ```no_run
424 /// # use std::path::PathBuf;
425 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
426 /// # use async_profiler_agent::reporter::local::LocalReporter;
427 /// # use async_profiler_agent::metadata::AgentMetadata;
428 /// let path = PathBuf::from("./path-to-profiles");
429 /// let agent = ProfilerBuilder::default()
430 /// .with_local_reporter(path)
431 /// .build()
432 /// .spawn()?;
433 /// # Ok::<_, SpawnError>(())
434 /// ```
435 pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
436 self.reporter = Some(Box::new(LocalReporter::new(path.into())));
437 self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
438 }
439
440 /// Provide custom agent metadata.
441 ///
442 /// The async-profiler Rust agent sends metadata to the [Reporter] with
443 /// the identity of the current host and process, which is normally
444 /// transmitted as `metadata.json` within the generated `.zip` file,
445 /// using the schema format [`reporter::s3::MetadataJson`].
446 ///
447 /// That metadata can later be used by tooling to be able to sort
448 /// profiling reports by host.
449 ///
450 /// async-profiler Rust agent will by default try to fetch the metadata
451 /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
452 /// will error if it's unable to find it. If you are running the
453 /// async-profiler agent on any other form of compute,
454 /// you will need to create and attach your own metadata
455 /// by calling this function.
456 ///
457 #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
458 /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
459 /// [Amazon EC2]: https://aws.amazon.com/ec2
460 /// [Amazon Fargate]: https://aws.amazon.com/fargate
461 /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
462 pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
463 self.agent_metadata = Some(j);
464 self
465 }
466
467 /// Provide custom profiler options.
468 ///
469 /// ### Example
470 ///
471 /// This will sample allocations for every 10 megabytes allocated:
472 ///
473 /// ```
474 /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
475 /// # use async_profiler_agent::profiler::SpawnError;
476 /// # #[tokio::main]
477 /// # async fn main() -> Result<(), SpawnError> {
478 /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
479 /// let profiler = ProfilerBuilder::default()
480 /// .with_profiler_options(opts)
481 /// .with_local_reporter("/tmp/profiles")
482 /// .build();
483 /// # if false { // don't spawn the profiler in doctests
484 /// let profiler = profiler.spawn_controllable()?;
485 /// // ... your program goes here
486 /// profiler.stop().await; // make sure the last profile is flushed
487 /// # }
488 /// # Ok(())
489 /// # }
490 /// ```
491 pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
492 self.profiler_options = Some(c);
493 self
494 }
495
496 /// Turn this builder into a profiler!
497 pub fn build(self) -> Profiler {
498 Profiler {
499 reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
500 reporter: self.reporter.expect("reporter is required"),
501 agent_metadata: self.agent_metadata,
502 profiler_options: self.profiler_options.unwrap_or_default(),
503 }
504 }
505}
506
/// Lifecycle of the profiler as tracked by `ProfilerState`.
enum Status {
    /// Not running: the initial state, and the state after a successful stop.
    Idle,
    /// Set right before the engine is asked to start. It stays set if that
    /// call fails, because the profiler might have started anyway.
    Starting,
    /// The engine reported a successful start; holds the `SystemTime`
    /// captured right after it returned.
    Running(SystemTime),
}
512
513/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
514/// of the state of the profiler. The primary benefit of this is RAII - when
515/// this type drops, it will stop the profiler if it's running.
516struct ProfilerState<E: ProfilerEngine> {
517 // this is only None in the destructor when stopping the async-profiler fails
518 jfr_file: Option<JfrFile>,
519 asprof: E,
520 status: Status,
521 profiler_options: ProfilerOptions,
522}
523
524impl<E: ProfilerEngine> ProfilerState<E> {
525 fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
526 Ok(Self {
527 jfr_file: Some(JfrFile::new()?),
528 asprof,
529 status: Status::Idle,
530 profiler_options,
531 })
532 }
533
534 fn start(&mut self) -> Result<(), AsProfError> {
535 let jfr_file = self
536 .jfr_file
537 .as_ref()
538 .ok_or_else(|| io::Error::other("jfr file missing (dropped during stop failure?)"))?;
539 let active = jfr_file.active_path();
540 // drop guard - make sure the files are leaked if the profiler might have started
541 self.status = Status::Starting;
542 E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
543 self.status = Status::Running(SystemTime::now());
544 Ok(())
545 }
546
547 fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
548 E::stop_async_profiler()?;
549 let status = std::mem::replace(&mut self.status, Status::Idle);
550 Ok(match status {
551 Status::Idle | Status::Starting => None,
552 Status::Running(since) => Some(since),
553 })
554 }
555
556 fn is_started(&self) -> bool {
557 matches!(self.status, Status::Running(_))
558 }
559}
560
561impl<E: ProfilerEngine> Drop for ProfilerState<E> {
562 fn drop(&mut self) {
563 match self.status {
564 Status::Running(_) => {
565 // In Drop, we can't use spawn_blocking, so we call the blocking operation
566 // directly. We skip the status reset that self.stop() would do since the
567 // struct is being dropped.
568 if let Err(err) = E::stop_async_profiler() {
569 // SECURITY: avoid removing the JFR file if stopping the profiler fails,
570 // to avoid symlink races
571 std::mem::forget(self.jfr_file.take());
572 // XXX: Rust defines leaking resources during drop as safe.
573 tracing::warn!(?err, "unable to stop profiler during drop glue");
574 }
575 }
576 Status::Idle => {}
577 Status::Starting => {
578 // SECURITY: avoid removing the JFR file if stopping the profiler fails,
579 // to avoid symlink races
580 std::mem::forget(self.jfr_file.take());
581 }
582 }
583 }
584}
585
/// The async-profiler operations that `ProfilerState` is generic over.
///
/// NOTE(review): presumably implemented by `asprof::AsProf` and by test
/// doubles - confirm against the rest of the crate.
pub(crate) trait ProfilerEngine: Send + Sync + 'static {
    /// Initializes async-profiler. Takes no receiver, so it is called on the
    /// type rather than on an instance.
    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
    /// Starts profiling with the given options; `jfr_file_path` is the path
    /// of the JFR file the caller wants used.
    fn start_async_profiler(
        &self,
        jfr_file_path: &Path,
        options: &ProfilerOptions,
    ) -> Result<(), asprof::AsProfError>;
    /// Stops profiling. Takes no receiver, so it is called on the type rather
    /// than on an instance.
    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}
595
596/// Holds the profiler task state and performs a final synchronous report
597/// when the task is cancelled (e.g. Tokio runtime shutdown) before a
598/// graceful stop. The local reporter will flush its contents on drop.
599/// For other reporters, you must call `RunningProfiler::stop().await`
600/// to ensure the last sample is uploaded.
601struct ProfilerTaskState<E: ProfilerEngine> {
602 // Option so profiler_tick can .take() the state to move it into spawn_blocking
603 state: Option<ProfilerState<E>>,
604 reporter: Box<dyn Reporter + Send + Sync>,
605 agent_metadata: Option<AgentMetadata>,
606 reporting_interval: Duration,
607 completed_normally: bool,
608}
609
610impl<E: ProfilerEngine> ProfilerTaskState<E> {
611 fn try_final_report(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
612 let state = self.state.as_mut().ok_or("profiler state missing")?;
613 let start = state.stop()?.ok_or("profiler was not running")?;
614 let jfr_file = state.jfr_file.as_ref().ok_or("jfr file missing")?;
615 let jfr_path = jfr_file.active_path();
616 if jfr_path.metadata()?.len() == 0 {
617 return Ok(());
618 }
619 let metadata = ReportMetadata {
620 instance: self
621 .agent_metadata
622 .as_ref()
623 .unwrap_or(&AgentMetadata::NoMetadata),
624 start: start.duration_since(UNIX_EPOCH)?,
625 end: SystemTime::now().duration_since(UNIX_EPOCH)?,
626 reporting_interval: self.reporting_interval,
627 };
628 self.reporter
629 .report_blocking(&jfr_path, &metadata)
630 .map_err(|e| e.to_string())?;
631 Ok(())
632 }
633}
634
635impl<E: ProfilerEngine> Drop for ProfilerTaskState<E> {
636 fn drop(&mut self) {
637 let is_started = self.state.as_ref().is_some_and(|s| s.is_started());
638 if self.completed_normally || !is_started {
639 return;
640 }
641 tracing::info!("profiler task cancelled, attempting final report on drop");
642 if let Err(err) = self.try_final_report() {
643 tracing::warn!(?err, "failed to report on drop");
644 }
645 }
646}
647
/// The ways a single profiler tick can fail.
#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
    /// An error from async-profiler, displayed as-is.
    #[error(transparent)]
    AsProf(#[from] AsProfError),
    /// An AWS metadata error, displayed as-is. Boxed; only present with the
    /// `aws-metadata-no-defaults` feature.
    #[error(transparent)]
    #[cfg(feature = "aws-metadata-no-defaults")]
    Metadata(Box<crate::metadata::aws::AwsProfilerMetadataError>),
    /// An error returned by the reporter.
    #[error("reporter: {0}")]
    Reporter(Box<dyn std::error::Error + Send>),
    /// A `SystemTimeError` from computing a time difference.
    #[error("broken clock: {0}")]
    BrokenClock(#[from] SystemTimeError),
    /// An I/O error while reading the JFR file.
    #[error("jfr read error: {0}")]
    JfrRead(io::Error),
    /// An I/O error while emptying the inactive JFR file.
    #[error("empty inactive file error: {0}")]
    EmptyInactiveFile(io::Error),
    /// The JFR file handle is no longer present in the profiler state.
    #[error("jfr file missing (dropped during stop failure?)")]
    JfrFileMissing,
    /// The profiler state is no longer present in the task state.
    #[error("profiler state missing (previous tick panicked?)")]
    StateMissing,
    /// A `spawn_blocking` task could not be joined.
    #[error("spawn_blocking task failed: {0}")]
    SpawnBlocking(tokio::task::JoinError),
}
671
#[cfg(feature = "aws-metadata-no-defaults")]
impl From<crate::metadata::aws::AwsProfilerMetadataError> for TickError {
    /// Boxes the metadata error and wraps it in `TickError::Metadata`.
    fn from(source: crate::metadata::aws::AwsProfilerMetadataError) -> Self {
        let boxed = Box::new(source);
        Self::Metadata(boxed)
    }
}
678
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error that happened spawning a profiler
///
/// The enum is `#[non_exhaustive]`, so a `match` on it needs a wildcard arm.
pub enum SpawnError {
    /// Error from async-profiler
    #[error(transparent)]
    AsProf(#[from] asprof::AsProfError),
    /// Error writing to a tempfile
    ///
    /// The underlying `io::Error` is exposed as the error source.
    #[error("tempfile error")]
    TempFile(#[source] io::Error),
}
690
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error from [`Profiler::spawn_thread`]
pub enum SpawnThreadError {
    /// Error spawning the profiler; wraps any [`SpawnError`], not only
    /// errors from async-profiler itself
    #[error(transparent)]
    AsProf(#[from] SpawnError),
    /// Error constructing Tokio runtime
    ///
    /// The underlying `io::Error` is exposed as the error source.
    #[error("constructing Tokio runtime")]
    ConstructRt(#[source] io::Error),
}
702
// No control messages currently: the enum has no variants, so no value of
// this type can ever be sent. A channel carrying it can only signal by
// having its sender dropped.
enum Control {}
705
/// A handle to a running profiler
///
/// Currently just allows for stopping the profiler.
///
/// Dropping this handle will request that the profiler will stop.
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfiler {
    // Never carries a message (`Control` has no variants); dropping it is the
    // stop request.
    stop_channel: tokio::sync::oneshot::Sender<Control>,
    // Handle of the Tokio task the profiler runs in.
    join_handle: tokio::task::JoinHandle<()>,
}

impl RunningProfiler {
    /// Request that the current profiler stops and wait until it exits.
    ///
    /// This will cause the currently-pending profile information to be flushed.
    ///
    /// After this function returns, it is correct and safe to [spawn] a new
    /// [Profiler], possibly with a different configuration. Therefore,
    /// this function can be used to "reconfigure" a profiler by stopping
    /// it and then starting a new one with a different configuration.
    ///
    /// [spawn]: Profiler::spawn_controllable
    pub async fn stop(self) {
        // Dropping the sender is the stop request; then wait for the task.
        drop(self.stop_channel);
        // The join result (e.g. a task panic) is deliberately ignored.
        let _ = self.join_handle.await;
    }

    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn(async move {
            // move the control channel to the spawned task. this way, it will be dropped
            // just when the task is aborted.
            let _abort_channel = self.stop_channel;
            self.join_handle.await.ok();
        })
    }

    /// Detach this profiler. This will prevent the profiler from being stopped
    /// when this handle is dropped. You should call this (or [Profiler::spawn]
    /// instead of [Profiler::spawn_controllable], which does the same thing)
    /// if you don't intend to reconfigure your profiler at runtime.
    pub fn detach(self) {
        // The returned JoinHandle is discarded on purpose.
        self.detach_inner();
    }

    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
    /// and returns a [RunningProfilerThread] attached to it.
    ///
    /// `spawn_fn` is given the thread body and must return the handle of the
    /// thread it started; the body blocks on the profiler task using `runtime`.
    fn spawn_attached(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) -> RunningProfilerThread {
        RunningProfilerThread {
            // The stop channel stays with the returned handle, so the caller
            // keeps control over stopping.
            stop_channel: self.stop_channel,
            join_handle: spawn_fn(Box::new(move || {
                let _ = runtime.block_on(self.join_handle);
            })),
        }
    }

    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
    /// and detaches it.
    ///
    /// `spawn_fn` is given the thread body; the thread handle it returns is
    /// discarded.
    fn spawn_detached(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) {
        spawn_fn(Box::new(move || {
            // Keep the stop channel alive inside the thread so that it is not
            // dropped (which would request a stop) while the thread is blocked.
            let _stop_channel = self.stop_channel;
            let _ = runtime.block_on(self.join_handle);
        }));
    }
}
779
780/// A handle to a running profiler, running on a separate thread.
781///
782/// Currently just allows for stopping the profiler.
783///
784/// Dropping this handle will request that the profiler will stop.
785#[must_use = "dropping this stops the profiler, call .detach() to detach"]
786pub struct RunningProfilerThread {
787 stop_channel: tokio::sync::oneshot::Sender<Control>,
788 join_handle: std::thread::JoinHandle<()>,
789}
790
791impl RunningProfilerThread {
792 /// Request that the current profiler stops and wait until it exits.
793 ///
794 /// This will cause the currently-pending profile information to be flushed.
795 ///
796 /// After this function returns, it is correct and safe to [spawn] a new
797 /// [Profiler], possibly with a different configuration. Therefore,
798 /// this function can be used to "reconfigure" a profiler by stopping
799 /// it and then starting a new one with a different configuration.
800 ///
801 /// [spawn]: Profiler::spawn_controllable
802 pub fn stop(self) {
803 drop(self.stop_channel);
804 let _ = self.join_handle.join();
805 }
806}
807
/// Rust profiler based on [async-profiler].
///
/// Spawning a profiler can be done either in an attached (controllable)
/// mode, which allows for stopping the profiler (and, in fact, stops
/// it when the relevant handle is dropped), or in detached mode,
/// in which the profiler keeps running forever. Applications that can
/// shut down the profiler at run-time, for example applications that
/// support reconfiguration of a running profiler, generally want to use
/// controllable mode. Other applications (most of them) should use
/// detached mode.
///
/// In addition, the profiler can either be spawned into the current Tokio
/// runtime, or into a new one. Normally, applications should spawn
/// the profiler into their own Tokio runtime, but applications that
/// don't have a default Tokio runtime should spawn it into a
/// different one.
///
/// This leaves 4 functions:
/// 1. [Self::spawn] - detached, same runtime
/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
/// 3. [Self::spawn_controllable] - controllable, same runtime
/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
///
/// In addition, there's a helper function that just spawns the profiler
/// to a new runtime in a new thread, for applications that don't have
/// a Tokio runtime and don't need complex control:
///
/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
///
/// [async-profiler]: https://github.com/async-profiler/async-profiler
pub struct Profiler {
    // How often samples are reported; `ProfilerBuilder::build` defaults this
    // to 30 seconds.
    reporting_interval: Duration,
    // Destination for the collected profiles; required by the builder.
    reporter: Box<dyn Reporter + Send + Sync>,
    // Custom metadata supplied through the builder; None when none was given.
    agent_metadata: Option<AgentMetadata>,
    // Options passed to async-profiler when it is started.
    profiler_options: ProfilerOptions,
}
844
845impl Profiler {
846 /// Start profiling. The profiler will run in a tokio task at the configured interval.
847 ///
848 /// This is the same as calling [Profiler::spawn_controllable] followed by
849 /// [RunningProfiler::detach], except it returns a [JoinHandle].
850 ///
851 /// The returned [JoinHandle] can be used to detect if the profiler has exited
852 /// due to a fatal error.
853 ///
854 /// This function will fail if it is unable to start async-profiler, for example
855 /// if it can't find or load `libasyncProfiler.so`.
856 ///
857 /// [JoinHandle]: tokio::task::JoinHandle
858 ///
859 /// ### Uploading the last sample
860 ///
861 /// When you return from the Tokio `main`, the agent will terminate without waiting
862 /// for the last profiling JFR to be uploaded. Especially if you have a
863 /// short-running program, if you want to ensure the last profiling JFR
864 /// is uploaded, you should use [Profiler::spawn_controllable] and
    /// [RunningProfiler::stop], which allows waiting for the upload
866 /// to finish.
867 ///
868 /// If you do not care about losing the last sample, it is fine to directly
869 /// return from the Tokio `main` without stopping the profiler.
870 ///
871 /// ### Tokio Runtime
872 ///
873 /// This function must be run within a Tokio runtime, otherwise it will panic. If
874 /// your application does not have a `main` Tokio runtime, see
875 /// [Profiler::spawn_thread].
876 ///
877 /// ### Example
878 ///
879 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
880 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
881 ///
882 /// ```
883 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
884 /// # #[tokio::main]
885 /// # async fn main() -> Result<(), SpawnError> {
886 /// let profiler = ProfilerBuilder::default()
887 /// .with_local_reporter("/tmp/profiles")
888 /// .build();
889 /// # if false { // don't spawn the profiler in doctests
890 /// profiler.spawn()?;
891 /// # }
892 /// # Ok(())
893 /// # }
894 /// ```
895 pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
896 self.spawn_controllable().map(RunningProfiler::detach_inner)
897 }
898
899 /// Like [Self::spawn], but instead of spawning within the current Tokio
900 /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
901 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
902 ///
903 /// If your configuration is standard, use [Profiler::spawn_thread].
904 ///
905 /// If you want to be able to stop the resulting profiler, use
906 /// [Profiler::spawn_controllable_thread_to_runtime].
907 ///
908 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
909 /// allow for configuring thread properties, for example thread names).
910 ///
911 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
912 ///
913 /// ### Uploading the last sample
914 ///
915 /// When you return from `main`, the agent will terminate without waiting
916 /// for the last profiling JFR to be uploaded. Especially if you have a
917 /// short-running program, if you want to ensure the last profiling JFR
918 /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
919 /// and [RunningProfilerThread::stop], which allows waiting for the upload
920 /// to finish.
921 ///
922 /// If you do not care about losing the last sample, it is fine to directly
    /// return from `main` without stopping the profiler.
924 ///
925 /// ### Example
926 ///
927 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
928 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
929 ///
930 /// ```no_run
931 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
932 /// let rt = tokio::runtime::Builder::new_current_thread()
933 /// .enable_all()
934 /// .build()?;
935 /// let profiler = ProfilerBuilder::default()
936 /// .with_local_reporter("/tmp/profiles")
937 /// .build();
938 ///
939 /// profiler.spawn_thread_to_runtime(
940 /// rt,
941 /// |t| {
942 /// std::thread::Builder::new()
943 /// .name("asprof-agent".to_owned())
944 /// .spawn(t)
945 /// .expect("thread name contains nuls")
946 /// }
947 /// )?;
948 /// # Ok::<_, anyhow::Error>(())
949 /// ```
950 pub fn spawn_thread_to_runtime(
951 self,
952 runtime: tokio::runtime::Runtime,
953 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
954 ) -> Result<(), SpawnError> {
955 self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
956 }
957
958 /// Like [Self::spawn], but instead of spawning within the current Tokio
959 /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
960 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
961 /// by itself.
962 ///
963 /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
964 /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
965 /// with the following:
966 /// 1. a current thread runtime with background worker threads (these exist
967 /// for blocking IO) named "asprof-worker"
968 /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
969 ///
970 /// If you want to be able to stop the resulting profiler, use
971 /// [Profiler::spawn_controllable_thread_to_runtime].
972 ///
973 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
974 ///
975 /// ### Uploading the last sample
976 ///
977 /// When you return from `main`, the agent will terminate without waiting
978 /// for the last profiling JFR to be uploaded. Especially if you have a
979 /// short-running program, if you want to ensure the last profiling JFR
980 /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
981 /// and [RunningProfilerThread::stop], which allows waiting for the upload
982 /// to finish.
983 ///
984 /// If you do not care about losing the last sample, it is fine to directly
    /// return from `main` without stopping the profiler.
986 ///
987 /// ### Example
988 ///
989 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
990 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
991 ///
992 /// ```no_run
993 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
994 /// # use async_profiler_agent::reporter::local::LocalReporter;
995 /// let profiler = ProfilerBuilder::default()
996 /// .with_local_reporter("/tmp/profiles")
997 /// .build();
998 ///
999 /// profiler.spawn_thread()?;
1000 /// # Ok::<_, anyhow::Error>(())
1001 /// ```
1002 pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
1003 // using "asprof" in thread name to deal with 15 character + \0 length limit
1004 let rt = tokio::runtime::Builder::new_current_thread()
1005 .thread_name("asprof-worker".to_owned())
1006 .enable_all()
1007 .build()
1008 .map_err(SpawnThreadError::ConstructRt)?;
1009 let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
1010 self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
1011 .map_err(SpawnThreadError::AsProf)
1012 }
1013
1014 fn spawn_thread_inner<E: ProfilerEngine>(
1015 self,
1016 asprof: E,
1017 runtime: tokio::runtime::Runtime,
1018 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1019 ) -> Result<(), SpawnError> {
1020 let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1021 handle.spawn_detached(runtime, spawn_fn);
1022 Ok(())
1023 }
1024
1025 /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
1026 /// (currently only stopping) the profiler.
1027 ///
1028 /// This allows for changing the configuration of the profiler at runtime, by
1029 /// stopping it and then starting a new Profiler with a new configuration. It
1030 /// also allows for stopping profiling in case the profiler is suspected to
1031 /// cause operational issues.
1032 ///
1033 /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
    /// so if your application doesn't need to change the profiler's configuration at runtime,
1035 /// it will be easier to use [Profiler::spawn].
1036 ///
1037 /// This function will fail if it is unable to start async-profiler, for example
1038 /// if it can't find or load `libasyncProfiler.so`.
1039 ///
1040 /// ### Uploading the last sample
1041 ///
1042 /// When you return from the Tokio `main`, the agent will terminate without waiting
1043 /// for the last profiling JFR to be uploaded. Especially if you have a
1044 /// short-running program, if you want to ensure the last profiling JFR
1045 /// is uploaded, you should use [RunningProfiler::stop], which allows waiting for
1046 /// the upload to finish.
1047 ///
1048 /// If you do not care about losing the last sample, it is fine to directly
1049 /// return from the Tokio `main` without stopping the profiler.
1050 ///
1051 /// ### Tokio Runtime
1052 ///
1053 /// This function must be run within a Tokio runtime, otherwise it will panic. If
1054 /// your application does not have a `main` Tokio runtime, see
1055 /// [Profiler::spawn_controllable_thread_to_runtime].
1056 ///
1057 /// ### Example
1058 ///
1059 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1060 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1061 ///
1062 /// ```no_run
1063 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1064 /// # #[tokio::main]
1065 /// # async fn main() -> Result<(), SpawnError> {
1066 /// let profiler = ProfilerBuilder::default()
1067 /// .with_local_reporter("/tmp/profiles")
1068 /// .build();
1069 ///
1070 /// let profiler = profiler.spawn_controllable()?;
1071 ///
1072 /// // [insert your signaling/monitoring mechanism to have a request to disable
1073 /// // profiling in case of a problem]
1074 /// let got_request_to_disable_profiling = async move {
1075 /// // ...
1076 /// # false
1077 /// };
1078 /// // spawn a task that will disable profiling if requested
1079 /// tokio::task::spawn(async move {
1080 /// if got_request_to_disable_profiling.await {
1081 /// profiler.stop().await;
1082 /// }
1083 /// });
1084 /// # Ok(())
1085 /// # }
1086 /// ```
1087 pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
1088 self.spawn_inner(asprof::AsProf::builder().build())
1089 }
1090
1091 /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
1092 /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
1093 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
1094 ///
1095 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
1096 /// allow for configuring thread properties, for example thread names).
1097 ///
1098 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
1099 ///
1100 /// ### Uploading the last sample
1101 ///
1102 /// When you return from `main`, the agent will terminate without waiting
1103 /// for the last profiling JFR to be uploaded. Especially if you have a
1104 /// short-running program, if you want to ensure the last profiling JFR
1105 /// is uploaded, you should use [RunningProfilerThread::stop], which allows waiting
1106 /// for the upload to finish.
1107 ///
1108 /// If you do not care about losing the last sample, it is fine to directly
    /// return from `main` without stopping the profiler.
1110 ///
1111 /// ### Example
1112 ///
1113 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1114 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1115 ///
1116 /// ```no_run
1117 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1118 /// let rt = tokio::runtime::Builder::new_current_thread()
1119 /// .enable_all()
1120 /// .build()?;
1121 /// let profiler = ProfilerBuilder::default()
1122 /// .with_local_reporter("/tmp/profiles")
1123 /// .build();
1124 ///
1125 /// let profiler = profiler.spawn_controllable_thread_to_runtime(
1126 /// rt,
1127 /// |t| {
1128 /// std::thread::Builder::new()
1129 /// .name("asprof-agent".to_owned())
1130 /// .spawn(t)
1131 /// .expect("thread name contains nuls")
1132 /// }
1133 /// )?;
1134 ///
1135 /// # fn got_request_to_disable_profiling() -> bool { false }
1136 /// // spawn a task that will disable profiling if requested
1137 /// std::thread::spawn(move || {
1138 /// if got_request_to_disable_profiling() {
1139 /// profiler.stop();
1140 /// }
1141 /// });
1142 /// # Ok::<_, anyhow::Error>(())
1143 /// ```
1144 pub fn spawn_controllable_thread_to_runtime(
1145 self,
1146 runtime: tokio::runtime::Runtime,
1147 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1148 ) -> Result<RunningProfilerThread, SpawnError> {
1149 self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1150 }
1151
1152 fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1153 self,
1154 asprof: E,
1155 runtime: tokio::runtime::Runtime,
1156 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1157 ) -> Result<RunningProfilerThread, SpawnError> {
1158 let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1159 Ok(handle.spawn_attached(runtime, spawn_fn))
1160 }
1161
    /// Common implementation behind every `spawn*` entry point, generic over the engine so
    /// that tests can substitute a mock for async-profiler.
    ///
    /// Initializes the engine, then spawns a Tokio task that runs [`profiler_tick`] on
    /// every tick of the reporting interval and once more when a stop is requested.
    /// Because it calls [`tokio::spawn`] and [`tokio::time::interval`], it must be called
    /// from within a Tokio runtime.
    ///
    /// # Errors
    ///
    /// Fails only if `E::init_async_profiler` fails. A failure to create the
    /// `ProfilerState` happens inside the spawned task: it is logged and the task exits,
    /// but it is not reported through this function's return value.
    fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
        // Initialize async profiler - needs to be done once.
        E::init_async_profiler()?;
        tracing::info!("successfully initialized async profiler.");

        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
        // The sending half is returned to the caller inside `RunningProfiler`; the task
        // below keeps the receiving half and treats its completion as the stop request.
        let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();

        // Get profiles at the configured interval rate.
        let join_handle = tokio::spawn(async move {
            let state = match ProfilerState::new(asprof, self.profiler_options) {
                Ok(state) => state,
                Err(err) => {
                    tracing::error!(?err, "unable to create profiler state");
                    return;
                }
            };

            // NOTE(review): `completed_normally` is presumably read by
            // `ProfilerTaskState`'s drop logic (defined outside this function) to tell a
            // finished task from a cancelled one - confirm.
            let mut task = ProfilerTaskState {
                state: Some(state),
                reporter: self.reporter,
                agent_metadata: self.agent_metadata,
                reporting_interval: self.reporting_interval,
                completed_normally: false,
            };

            let mut done = false;
            while !done {
                // Wait until a timer or exit event
                tokio::select! {
                    // Poll the branches in declaration order, so a pending stop request
                    // takes precedence over a timer tick.
                    biased;

                    // The precondition disables this branch once the receiver has already
                    // completed, so it is never polled again after that.
                    r = &mut stop_rx, if !stop_rx.is_terminated() => {
                        match r {
                            // `Err` is the only outcome handled: the receiver completes
                            // this way when the sending half is dropped without a value.
                            Err(_) => {
                                tracing::info!("profiler stop requested, doing a final tick");
                                done = true;
                            }
                        }
                    }
                    _ = sampling_ticker.tick() => {
                        tracing::debug!("profiler timer woke up");
                    }
                }

                // Runs for both wake-up reasons, which is what makes the last iteration
                // after a stop request the "final tick".
                if let Err(err) = profiler_tick(
                    &mut task.state,
                    &mut task.agent_metadata,
                    task.reporter.as_ref(),
                    task.reporting_interval,
                )
                .await
                {
                    match &err {
                        TickError::Reporter(_) => {
                            // don't stop on IO errors
                            tracing::error!(?err, "error during profiling, continuing");
                        }
                        // Every other tick error ends the profiling loop.
                        _stop => {
                            tracing::error!(?err, "error during profiling, stopping");
                            break;
                        }
                    }
                }
            }

            // Reached both after a requested stop and after the `break` above.
            task.completed_normally = true;
            tracing::info!("profiling task finished");
        });

        Ok(RunningProfiler {
            stop_channel,
            join_handle,
        })
    }
1237}
1238
/// Information from a successful profiler stop-start cycle, used by the async
/// reporting phase that follows.
struct TickCycleInfo {
    /// Value returned by `ProfilerState::stop` for the cycle that just ended; it is
    /// reported as the start of the sample's time range.
    start_time: SystemTime,
    /// Path of the JFR file that was active during that cycle. It is taken after the
    /// swap, when that file has become the inactive one, and is read by the reporting
    /// phase.
    inactive_path: PathBuf,
}
1245
1246/// The synchronous (blocking) portion of a profiler tick.
1247///
1248/// Takes owned [`ProfilerState`] and always returns it alongside the result,
1249/// so the caller can restore it regardless of success or failure.
1250fn tick_blocking<E: ProfilerEngine>(
1251 mut state: ProfilerState<E>,
1252) -> (ProfilerState<E>, Result<Option<TickCycleInfo>, TickError>) {
1253 if !state.is_started() {
1254 let result = state.start().map(|()| None).map_err(TickError::from);
1255 return (state, result);
1256 }
1257
1258 let start_time = match state.stop() {
1259 Err(e) => return (state, Err(e.into())),
1260 Ok(None) => {
1261 tracing::warn!("stopped the profiler but it wasn't running?");
1262 return (state, Ok(None));
1263 }
1264 Ok(Some(t)) => t,
1265 };
1266
1267 let jfr_file = match state.jfr_file.as_mut() {
1268 Some(f) => f,
1269 None => {
1270 return (state, Err(TickError::JfrFileMissing));
1271 }
1272 };
1273
1274 if let Err(e) = jfr_file.empty_inactive_file() {
1275 return (state, Err(TickError::EmptyInactiveFile(e)));
1276 }
1277 jfr_file.swap();
1278 let inactive_path = jfr_file.inactive_path();
1279
1280 if let Err(e) = state.start() {
1281 return (state, Err(e.into()));
1282 }
1283
1284 (
1285 state,
1286 Ok(Some(TickCycleInfo {
1287 start_time,
1288 inactive_path,
1289 })),
1290 )
1291}
1292
/// Runs one profiler tick: the blocking stop/swap/start cycle on the blocking thread
/// pool, followed by reading and reporting the JFR that the cycle produced, if any.
///
/// `state_holder` must contain the profiler state on entry; otherwise the tick fails with
/// [`TickError::StateMissing`]. `agent_metadata` is filled in before the first report
/// when it is still `None`.
///
/// # Cancel safety
///
/// This function is **not** cancel-safe. It moves `ProfilerState` out of `state_holder` via
/// `.take()` before awaiting `spawn_blocking`. If the future is dropped (e.g. in a `select!`
/// loop) before the state is restored, the profiler state is permanently lost.
async fn profiler_tick<E: ProfilerEngine>(
    state_holder: &mut Option<ProfilerState<E>>,
    agent_metadata: &mut Option<AgentMetadata>,
    reporter: &(dyn Reporter + Send + Sync),
    reporting_interval: Duration,
) -> Result<(), TickError> {
    let state = state_holder.take().ok_or(TickError::StateMissing)?;

    // If joining the blocking task fails, the `?` returns before the state is put back,
    // so `state_holder` stays `None`.
    let (state, result) = tokio::task::spawn_blocking(move || tick_blocking(state))
        .await
        .map_err(TickError::SpawnBlocking)?;
    // Restore the state before looking at the tick result, so that a failed tick does
    // not lose it.
    *state_holder = Some(state);

    // `None` means there is nothing to report for this tick.
    let Some(info) = result? else {
        return Ok(());
    };

    let start = info.start_time.duration_since(UNIX_EPOCH)?;
    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;

    // Lazily load the agent metadata if it was not provided in
    // the constructor. See the struct comments for why this is.
    // This code runs at most once.
    let instance = match agent_metadata.as_ref() {
        Some(md) => md,
        None => {
            #[cfg(feature = "aws-metadata-no-defaults")]
            let md = crate::metadata::aws::load_agent_metadata().await?;
            #[cfg(not(feature = "aws-metadata-no-defaults"))]
            let md = crate::metadata::AgentMetadata::NoMetadata;
            tracing::debug!("loaded metadata");
            // Store the metadata so later ticks take the `Some` branch above.
            agent_metadata.insert(md)
        }
    };

    let report_metadata = ReportMetadata {
        instance,
        start,
        end,
        reporting_interval,
    };

    // The whole recording is read into memory and handed to the reporter by value.
    let jfr = tokio::fs::read(&info.inactive_path)
        .await
        .map_err(TickError::JfrRead)?;

    reporter
        .report(jfr, &report_metadata)
        .await
        .map_err(TickError::Reporter)?;

    Ok(())
}
1351
1352#[cfg(test)]
1353mod tests {
1354 use std::sync::Arc;
1355 use std::sync::atomic::{self, AtomicBool, AtomicU32};
1356
1357 use test_case::test_case;
1358
1359 use super::*;
1360
1361 #[test]
1362 fn test_jfr_file_drop() {
1363 let mut jfr = JfrFile::new().unwrap();
1364
1365 std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
1366 jfr.swap();
1367 assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
1368 jfr.empty_inactive_file().unwrap();
1369 assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
1370 }
1371
1372 struct MockProfilerEngine {
1373 counter: AtomicU32,
1374 }
1375 impl ProfilerEngine for MockProfilerEngine {
1376 fn init_async_profiler() -> Result<(), asprof::AsProfError> {
1377 Ok(())
1378 }
1379
1380 fn start_async_profiler(
1381 &self,
1382 jfr_file_path: &Path,
1383 _options: &ProfilerOptions,
1384 ) -> Result<(), asprof::AsProfError> {
1385 let contents = format!(
1386 "JFR{}",
1387 self.counter.fetch_add(1, atomic::Ordering::Relaxed)
1388 );
1389 std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
1390 Ok(())
1391 }
1392
1393 fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
1394 Ok(())
1395 }
1396 }
1397
1398 struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
1399 impl std::fmt::Debug for MockReporter {
1400 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1401 f.debug_struct("MockReporter").finish()
1402 }
1403 }
1404
1405 #[async_trait::async_trait]
1406 impl Reporter for MockReporter {
1407 async fn report(
1408 &self,
1409 jfr: Vec<u8>,
1410 metadata: &ReportMetadata,
1411 ) -> Result<(), Box<dyn std::error::Error + Send>> {
1412 self.0
1413 .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
1414 .await
1415 .unwrap();
1416 Ok(())
1417 }
1418 }
1419
1420 fn make_mock_profiler() -> (
1421 Profiler,
1422 tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
1423 ) {
1424 let (tx, rx) = tokio::sync::mpsc::channel(1);
1425 let agent = ProfilerBuilder::default()
1426 .with_reporter(MockReporter(tx))
1427 .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
1428 aws_account_id: "0".into(),
1429 aws_region_id: "us-east-1".into(),
1430 ec2_instance_id: "i-fake".into(),
1431 ec2_instance_type: "t3.micro".into(),
1432 })
1433 .build();
1434 (agent, rx)
1435 }
1436
1437 #[tokio::test(start_paused = true)]
1438 async fn test_profiler_agent() {
1439 let e_md = AgentMetadata::Ec2AgentMetadata {
1440 aws_account_id: "0".into(),
1441 aws_region_id: "us-east-1".into(),
1442 ec2_instance_id: "i-fake".into(),
1443 ec2_instance_type: "t3.micro".into(),
1444 };
1445 let (agent, mut rx) = make_mock_profiler();
1446 agent
1447 .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1448 counter: AtomicU32::new(0),
1449 })
1450 .unwrap()
1451 .detach();
1452 let (jfr, md) = rx.recv().await.unwrap();
1453 assert_eq!(jfr, "JFR0");
1454 assert_eq!(e_md, md);
1455 let (jfr, md) = rx.recv().await.unwrap();
1456 assert_eq!(jfr, "JFR1");
1457 assert_eq!(e_md, md);
1458 }
1459
1460 #[test_case(false; "uncontrollable")]
1461 #[test_case(true; "controllable")]
1462 fn test_profiler_local_rt(controllable: bool) {
1463 let e_md = AgentMetadata::Ec2AgentMetadata {
1464 aws_account_id: "0".into(),
1465 aws_region_id: "us-east-1".into(),
1466 ec2_instance_id: "i-fake".into(),
1467 ec2_instance_type: "t3.micro".into(),
1468 };
1469 let (agent, mut rx) = make_mock_profiler();
1470 let rt = tokio::runtime::Builder::new_current_thread()
1471 .enable_all()
1472 .start_paused(true)
1473 .build()
1474 .unwrap();
1475 // spawn the profiler, doing this before spawning a thread to allow
1476 // capturing errors from `spawn`
1477 let handle = if controllable {
1478 Some(
1479 agent
1480 .spawn_controllable_thread_inner::<MockProfilerEngine>(
1481 MockProfilerEngine {
1482 counter: AtomicU32::new(0),
1483 },
1484 rt,
1485 std::thread::spawn,
1486 )
1487 .unwrap(),
1488 )
1489 } else {
1490 agent
1491 .spawn_thread_inner::<MockProfilerEngine>(
1492 MockProfilerEngine {
1493 counter: AtomicU32::new(0),
1494 },
1495 rt,
1496 std::thread::spawn,
1497 )
1498 .unwrap();
1499 None
1500 };
1501
1502 let (jfr, md) = rx.blocking_recv().unwrap();
1503 assert_eq!(jfr, "JFR0");
1504 assert_eq!(e_md, md);
1505 let (jfr, md) = rx.blocking_recv().unwrap();
1506 assert_eq!(jfr, "JFR1");
1507 assert_eq!(e_md, md);
1508
1509 if let Some(handle) = handle {
1510 let drain_thread =
1511 std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
1512 // request a stop
1513 handle.stop();
1514 // the drain thread should be done
1515 drain_thread.join().unwrap();
1516 }
1517 }
1518
1519 enum StopKind {
1520 Delibrate,
1521 Drop,
1522 Abort,
1523 }
1524
1525 #[tokio::test(start_paused = true)]
1526 #[test_case(StopKind::Delibrate; "deliberate stop")]
1527 #[test_case(StopKind::Drop; "drop stop")]
1528 #[test_case(StopKind::Abort; "abort stop")]
1529 async fn test_profiler_stop(stop_kind: StopKind) {
1530 let e_md = AgentMetadata::Ec2AgentMetadata {
1531 aws_account_id: "0".into(),
1532 aws_region_id: "us-east-1".into(),
1533 ec2_instance_id: "i-fake".into(),
1534 ec2_instance_type: "t3.micro".into(),
1535 };
1536 let (agent, mut rx) = make_mock_profiler();
1537 let profiler_ref = agent
1538 .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
1539 counter: AtomicU32::new(0),
1540 })
1541 .unwrap();
1542 let (jfr, md) = rx.recv().await.unwrap();
1543 assert_eq!(jfr, "JFR0");
1544 assert_eq!(e_md, md);
1545 let (jfr, md) = rx.recv().await.unwrap();
1546 assert_eq!(jfr, "JFR1");
1547 assert_eq!(e_md, md);
1548 // check that stop is faster than an interval and returns an "immediate" next jfr
1549 match stop_kind {
1550 StopKind::Drop => drop(profiler_ref),
1551 StopKind::Delibrate => {
1552 tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
1553 .await
1554 .unwrap();
1555 }
1556 StopKind::Abort => {
1557 // You can call Abort on the JoinHandle. make sure that is not buggy.
1558 profiler_ref.detach_inner().abort();
1559 }
1560 }
1561 // check that we get the next JFR "quickly", and the JFR after that is empty.
1562 let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
1563 .await
1564 .unwrap()
1565 .unwrap();
1566 assert_eq!(jfr, "JFR2");
1567 assert_eq!(e_md, md);
1568 assert!(rx.recv().await.is_none());
1569 }
1570
    // Simulate a badly-behaved profiler that errors on start/stop and then
    // tries to access the JFR file.
    struct StopErrorProfilerEngine {
        // When true, `start_async_profiler` returns an error in addition to the stop
        // error that is always returned.
        start_error: bool,
        // Despite the name this is a flag: it is set to true once the delayed re-read of
        // the JFR file in `start_async_profiler` has succeeded.
        counter: Arc<AtomicBool>,
    }
    impl ProfilerEngine for StopErrorProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(
            &self,
            jfr_file_path: &Path,
            _options: &ProfilerOptions,
        ) -> Result<(), asprof::AsProfError> {
            let jfr_file_path = jfr_file_path.to_owned();
            std::fs::write(&jfr_file_path, "JFR").unwrap();
            let counter = self.counter.clone();
            // Five seconds later, check that the file can still be read through the same
            // path and still has the content written above, then raise the flag.
            tokio::task::spawn(async move {
                tokio::time::sleep(Duration::from_secs(5)).await;
                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
                counter.store(true, atomic::Ordering::Release);
            });
            // The delayed check is scheduled in both cases; only the result differs.
            if self.start_error {
                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
            } else {
                Ok(())
            }
        }

        // Stopping always fails.
        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
        }
    }
1606
1607 #[tokio::test(start_paused = true)]
1608 #[test_case(false; "error on stop")]
1609 #[test_case(true; "error on start")]
1610 async fn test_profiler_error(start_error: bool) {
1611 let (agent, mut rx) = make_mock_profiler();
1612 let counter = Arc::new(AtomicBool::new(false));
1613 let engine = StopErrorProfilerEngine {
1614 start_error,
1615 counter: counter.clone(),
1616 };
1617 let handle = agent.spawn_inner(engine).unwrap().detach_inner();
1618 assert!(rx.recv().await.is_none());
1619 // check that the "sleep 5" step in start_async_profiler succeeds
1620 for _ in 0..100 {
1621 tokio::time::sleep(Duration::from_secs(1)).await;
1622 if counter.load(atomic::Ordering::Acquire) {
1623 handle.await.unwrap(); // Check that the JoinHandle is done
1624 return;
1625 }
1626 }
1627 panic!("didn't read from file");
1628 }
1629
1630 #[test]
1631 fn test_profiler_options_to_args_string_default() {
1632 let opts = ProfilerOptions::default();
1633 let dummy_path = Path::new("/tmp/test.jfr");
1634 let args = opts.to_args_string(dummy_path);
1635 assert!(
1636 args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
1637 "Default args string not constructed correctly"
1638 );
1639 assert!(args.contains("file=/tmp/test.jfr"));
1640 assert!(!args.contains("nativemem="));
1641 }
1642
1643 #[test]
1644 fn test_profiler_options_to_args_string_with_native_mem() {
1645 let opts = ProfilerOptions {
1646 native_mem: Some("10m".to_string()),
1647 wall_clock_millis: None,
1648 cpu_interval: None,
1649 };
1650 let dummy_path = Path::new("/tmp/test.jfr");
1651 let args = opts.to_args_string(dummy_path);
1652 assert!(args.contains("nativemem=10m"));
1653 }
1654
1655 #[test]
1656 fn test_profiler_options_builder() {
1657 let opts = ProfilerOptionsBuilder::default()
1658 .with_native_mem_bytes(5000000)
1659 .build();
1660
1661 assert_eq!(opts.native_mem, Some("5000000".to_string()));
1662 }
1663
1664 #[test]
1665 fn test_profiler_options_builder_all_options() {
1666 let opts = ProfilerOptionsBuilder::default()
1667 .with_native_mem_bytes(5000000)
1668 .with_cpu_interval(Duration::from_secs(1))
1669 .with_wall_clock_interval(Duration::from_secs(10))
1670 .build();
1671
1672 let dummy_path = Path::new("/tmp/test.jfr");
1673 let args = opts.to_args_string(dummy_path);
1674 assert_eq!(
1675 args,
1676 "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
1677 );
1678 }
1679
1680 #[test]
1681 fn test_local_reporter_has_no_metadata() {
1682 // Check that with_local_reporter sets some configuration
1683 let reporter = ProfilerBuilder::default().with_local_reporter(".");
1684 assert_eq!(
1685 format!("{:?}", reporter.reporter),
1686 r#"Some(LocalReporter { directory: "." })"#
1687 );
1688 match reporter.agent_metadata {
1689 Some(AgentMetadata::NoMetadata) => {}
1690 bad => panic!("{bad:?}"),
1691 };
1692 }
1693
1694 /// A reporter that tracks both async and blocking reports separately.
1695 struct BlockingMockReporter {
1696 async_tx: tokio::sync::mpsc::Sender<String>,
1697 blocking_reports: Arc<std::sync::Mutex<Vec<String>>>,
1698 }
1699 impl std::fmt::Debug for BlockingMockReporter {
1700 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1701 f.debug_struct("BlockingMockReporter").finish()
1702 }
1703 }
1704
1705 #[async_trait::async_trait]
1706 impl Reporter for BlockingMockReporter {
1707 async fn report(
1708 &self,
1709 jfr: Vec<u8>,
1710 _metadata: &ReportMetadata,
1711 ) -> Result<(), Box<dyn std::error::Error + Send>> {
1712 self.async_tx
1713 .send(String::from_utf8(jfr).unwrap())
1714 .await
1715 .unwrap();
1716 Ok(())
1717 }
1718
1719 fn report_blocking(
1720 &self,
1721 jfr_path: &Path,
1722 _metadata: &ReportMetadata,
1723 ) -> Result<(), Box<dyn std::error::Error + Send>> {
1724 let jfr = std::fs::read(jfr_path).map_err(|e| Box::new(e) as _)?;
1725 self.blocking_reports
1726 .lock()
1727 .unwrap()
1728 .push(String::from_utf8(jfr).unwrap());
1729 Ok(())
1730 }
1731 }
1732
    /// Simulates a runtime shutdown while the profiler is running.
    /// The profiler should call report_blocking on drop to flush the
    /// last sample.
    #[test]
    fn test_profiler_report_on_drop() {
        // Shared with the reporter, so the blocking reports can be inspected after the
        // runtime is gone.
        let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new()));

        // A manually built runtime is used so that the test decides when it is dropped.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .start_paused(true)
            .build()
            .unwrap();

        let reports_clone = blocking_reports.clone();
        rt.block_on(async {
            let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<String>(10);
            let agent = ProfilerBuilder::default()
                .with_reporter(BlockingMockReporter {
                    async_tx,
                    blocking_reports: reports_clone,
                })
                .with_custom_agent_metadata(AgentMetadata::NoMetadata)
                .build();
            // Detach so the stop channel doesn't trigger a graceful stop
            // when the block_on future returns.
            agent
                .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                    counter: AtomicU32::new(0),
                })
                .unwrap()
                .detach();

            // Wait for the first async report to confirm the profiler is running.
            let jfr = async_rx.recv().await.unwrap();
            assert_eq!(jfr, "JFR0");
            // Return without stopping — runtime drop will cancel the task.
        });

        // Runtime shutdown cancels all tasks, triggering ProfilerTaskState::Drop.
        drop(rt);

        let reports = blocking_reports.lock().unwrap();
        assert_eq!(
            reports.len(),
            1,
            "expected exactly one blocking report on drop"
        );
        // "JFR1" is what the mock engine writes on its second start, i.e. the recording
        // that was still in progress when the runtime was dropped.
        assert_eq!(reports[0], "JFR1");
    }
1782}