async_profiler_agent/profiler.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A profiler that periodically uploads profiling samples of your program to a [Reporter]
5
6use crate::{
7 asprof::{self, AsProfError},
8 metadata::{AgentMetadata, ReportMetadata},
9 reporter::{Reporter, local::LocalReporter},
10};
11use std::{
12 fs::File,
13 io,
14 path::{Path, PathBuf},
15 time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
16};
17use thiserror::Error;
18
/// Pair of anonymous temporary files used to double-buffer JFR output:
/// async-profiler writes into `active` while the previous recording in
/// `inactive` is reported and then truncated (see `swap` /
/// `empty_inactive_file`).
struct JfrFile {
    // File async-profiler is currently writing to.
    active: std::fs::File,
    // File holding the previously collected recording.
    inactive: std::fs::File,
}
23
24impl JfrFile {
25 #[cfg(target_os = "linux")]
26 fn new() -> Result<Self, io::Error> {
27 Ok(Self {
28 active: tempfile::tempfile().unwrap(),
29 inactive: tempfile::tempfile().unwrap(),
30 })
31 }
32
33 #[cfg(not(target_os = "linux"))]
34 fn new() -> Result<Self, io::Error> {
35 Err(io::Error::other(
36 "async-profiler is only supported on Linux",
37 ))
38 }
39
40 fn swap(&mut self) {
41 std::mem::swap(&mut self.active, &mut self.inactive);
42 }
43
44 #[cfg(target_os = "linux")]
45 fn file_path(file: &std::fs::File) -> PathBuf {
46 use std::os::fd::AsRawFd;
47
48 format!("/proc/self/fd/{}", file.as_raw_fd()).into()
49 }
50
51 #[cfg(not(target_os = "linux"))]
52 fn file_path(_file: &std::fs::File) -> PathBuf {
53 unimplemented!()
54 }
55
56 fn active_path(&self) -> PathBuf {
57 Self::file_path(&self.active)
58 }
59
60 fn inactive_path(&self) -> PathBuf {
61 Self::file_path(&self.inactive)
62 }
63
64 fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
65 // Empty the file, or create it for the first time if the profiler hasn't
66 // started yet.
67 File::create(Self::file_path(&self.inactive))?;
68 tracing::debug!(message = "emptied the file");
69 Ok(())
70 }
71}
72
/// Options for configuring the async-profiler behavior.
/// Currently supports:
/// - Native memory allocation tracking
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    /// For example, `"10m"` will sample an allocation for every
    /// 10 megabytes of memory allocated. Passing `"0"` will sample
    /// all allocations.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    // CPU sampling interval in nanoseconds (async-profiler `interval`
    // option); `None` means `DEFAULT_CPU_INTERVAL_NANOS` is used.
    cpu_interval: Option<u128>,
    // Wall-clock sampling interval in milliseconds (async-profiler `wall`
    // option); `None` means `DEFAULT_WALL_CLOCK_INTERVAL_MILLIS` is used.
    wall_clock_millis: Option<u128>,
}
95
// Default CPU sampling interval: 100ms, expressed in nanoseconds.
const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
// Default wall-clock sampling interval: 1 second, expressed in milliseconds.
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;
98
99impl ProfilerOptions {
100 /// Convert the profiler options to a string of arguments for the async-profiler.
101 pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
102 let mut args = format!(
103 "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
104 self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS),
105 self.wall_clock_millis
106 .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS),
107 jfr_file_path.display()
108 );
109 if let Some(ref native_mem) = self.native_mem {
110 args.push_str(&format!(",nativemem={native_mem}"));
111 }
112 args
113 }
114}
115
/// Builder for [`ProfilerOptions`].
#[derive(Debug, Default)]
pub struct ProfilerOptionsBuilder {
    // See [`ProfilerOptions::native_mem`].
    native_mem: Option<String>,
    // CPU sampling interval in nanoseconds; set by `with_cpu_interval`.
    cpu_interval: Option<u128>,
    // Wall-clock sampling interval in milliseconds; set by
    // `with_wall_clock_interval`.
    wall_clock_millis: Option<u128>,
}
123
impl ProfilerOptionsBuilder {
    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but pass
    /// the string input directly to async_profiler.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    ///
    /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
    /// type-checked.
    ///
    /// ### Examples
    ///
    /// This will sample allocations for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
        self.native_mem = Some(native_mem_interval);
        self
    }

    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The argument passed is the profiling interval - the profiler will
    /// sample allocations every about that many bytes.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    ///
    /// ### Examples
    ///
    /// This will sample allocations for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// This will sample every allocation (potentially slow):
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
        self.native_mem = Some(native_mem_interval.to_string());
        self
    }

    /// Sets the interval at which the profiler will collect
    /// CPU-time samples, via the [async-profiler `interval` option].
    ///
    /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
    /// are currently running on a CPU, not threads that are sleeping.
    ///
    /// It can use a higher frequency than wall-clock sampling since the
    /// number of the threads that are running on a CPU at a given time is
    /// naturally limited by the number of CPUs, while the number of sleeping
    /// threads can be much larger.
    ///
    /// The default is to do a CPU-time sample every 100 milliseconds.
    ///
    /// The async-profiler agent collects both CPU time and wall-clock time
    /// samples, so this function should normally be used along with
    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
    ///
    /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
    ///
    /// ### Examples
    ///
    /// This will take a sample every 10 CPU milliseconds (when running)
    /// and every 100 wall-clock milliseconds (running or sleeping):
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default()
    ///     .with_cpu_interval(Duration::from_millis(10))
    ///     .with_wall_clock_interval(Duration::from_millis(100))
    ///     .build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
        self.cpu_interval = Some(cpu_interval.as_nanos());
        self
    }

    /// Sets the interval at which the profiler will collect
    /// wall-clock samples, via the [async-profiler `wall` option].
    ///
    /// The duration is truncated to whole milliseconds before being passed
    /// to async-profiler.
    ///
    /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
    /// whether they are sleeping or running, and can therefore be
    /// very useful for finding threads that are blocked, for example
    /// on a synchronous lock or a slow system call.
    ///
    /// When using Tokio, since tasks are not threads, tasks that are not
    /// currently running will not be sampled by a wall clock sample. However,
    /// a wall clock sample is still very useful in Tokio, since it is what
    /// you want to catch tasks that are blocking a thread by waiting on
    /// synchronous operations.
    ///
    /// The default is to do a wall-clock sample every second.
    ///
    /// The async-profiler agent collects both CPU time and wall-clock time
    /// samples, so this function should normally be used along with
    /// [ProfilerOptionsBuilder::with_cpu_interval].
    ///
    /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
    ///
    /// ### Examples
    ///
    /// This will take a sample every 10 CPU milliseconds (when running)
    /// and every 10 wall-clock milliseconds (running or sleeping):
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default()
    ///     .with_cpu_interval(Duration::from_millis(10))
    ///     .with_wall_clock_interval(Duration::from_millis(10))
    ///     .build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
        self.wall_clock_millis = Some(wall_clock.as_millis());
        self
    }

    /// Build the [`ProfilerOptions`] from the builder.
    pub fn build(self) -> ProfilerOptions {
        ProfilerOptions {
            native_mem: self.native_mem,
            wall_clock_millis: self.wall_clock_millis,
            cpu_interval: self.cpu_interval,
        }
    }
}
332
/// Builds a [`Profiler`], panicking if any required fields were not set by the
/// time `build` is called.
#[derive(Debug, Default)]
pub struct ProfilerBuilder {
    // How often to report samples; defaults to 30 seconds in `build`.
    reporting_interval: Option<Duration>,
    // Destination for collected profiles; required (see `build`).
    reporter: Option<Box<dyn Reporter + Send + Sync>>,
    // Optional host/process identity attached to reports.
    agent_metadata: Option<AgentMetadata>,
    // Options forwarded to async-profiler; defaults used when unset.
    profiler_options: Option<ProfilerOptions>,
}
342
impl ProfilerBuilder {
    /// Sets the reporting interval (default: 30 seconds).
    ///
    /// This is the interval that samples are *reported* to the backend,
    /// and is unrelated to the interval at which the application
    /// is *sampled* by async profiler, which is controlled by
    /// [ProfilerOptionsBuilder::with_cpu_interval] and
    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
    ///
    /// Most users should not change this setting.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// # use std::path::PathBuf;
    /// # use std::time::Duration;
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # let path = PathBuf::from(".");
    /// let agent = ProfilerBuilder::default()
    ///     .with_local_reporter(path)
    ///     .with_reporting_interval(Duration::from_secs(15))
    ///     .build()
    ///     .spawn_controllable()?;
    /// // .. your program goes here
    /// agent.stop().await; // make sure the last profile is flushed
    /// # Ok::<_, SpawnError>(())
    /// # }
    /// ```
    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
        self.reporting_interval = Some(i);
        self
    }

    /// Sets the [`Reporter`], which is used to upload the collected profiling
    /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
    /// feature enabled,
    #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
    #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
    /// It is also possible to write your own [`Reporter`].
    ///
    /// It's normally easier to use [`LocalReporter`] directly via
    /// [`ProfilerBuilder::with_local_reporter`].
    ///
    /// If you want to output to multiple reporters, you can use
    /// [`MultiReporter`].
    ///
    /// [`LocalReporter`]: crate::reporter::local::LocalReporter
    /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
    #[cfg_attr(
        feature = "s3-no-defaults",
        doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
    )]
    ///
    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
        self.reporter = Some(Box::new(r));
        self
    }

    /// Sets the profiler to use [LocalReporter], which will write `.jfr` files to `path`,
    /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
    /// since the [LocalReporter] does not need that.
    ///
    /// This is useful for testing, since metadata auto-detection currently only works
    /// on [Amazon EC2] or [Amazon Fargate] instances.
    ///
    /// The local reporter should normally not be used in production, since it will
    /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
    /// or write your own (see [`ProfilerBuilder::with_reporter`]).
    ///
    /// [Amazon EC2]: https://aws.amazon.com/ec2
    /// [Amazon Fargate]: https://aws.amazon.com/fargate
    ///
    /// ## Example
    ///
    /// This will write profiles as `.jfr` files to `./path-to-profiles`:
    ///
    /// ```no_run
    /// # use std::path::PathBuf;
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # use async_profiler_agent::reporter::local::LocalReporter;
    /// # use async_profiler_agent::metadata::AgentMetadata;
    /// let path = PathBuf::from("./path-to-profiles");
    /// let agent = ProfilerBuilder::default()
    ///     .with_local_reporter(path)
    ///     .build()
    ///     .spawn()?;
    /// # Ok::<_, SpawnError>(())
    /// ```
    pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
        self.reporter = Some(Box::new(LocalReporter::new(path.into())));
        self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
    }

    /// Provide custom agent metadata.
    ///
    /// The async-profiler Rust agent sends metadata to the [Reporter] with
    /// the identity of the current host and process, which is normally
    /// transmitted as `metadata.json` within the generated `.zip` file,
    /// using the schema format [`reporter::s3::MetadataJson`].
    ///
    /// That metadata can later be used by tooling to be able to sort
    /// profiling reports by host.
    ///
    /// async-profiler Rust agent will by default try to fetch the metadata
    /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
    /// will error if it's unable to find it. If you are running the
    /// async-profiler agent on any other form of compute,
    /// you will need to create and attach your own metadata
    /// by calling this function.
    ///
    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
    /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
    /// [Amazon EC2]: https://aws.amazon.com/ec2
    /// [Amazon Fargate]: https://aws.amazon.com/fargate
    /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
        self.agent_metadata = Some(j);
        self
    }

    /// Provide custom profiler options.
    ///
    /// ### Example
    ///
    /// This will sample allocations for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
        self.profiler_options = Some(c);
        self
    }

    /// Turn this builder into a profiler!
    ///
    /// # Panics
    ///
    /// Panics if no reporter was configured (see
    /// [`ProfilerBuilder::with_reporter`] and
    /// [`ProfilerBuilder::with_local_reporter`]).
    pub fn build(self) -> Profiler {
        Profiler {
            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
            reporter: self.reporter.expect("reporter is required"),
            agent_metadata: self.agent_metadata,
            profiler_options: self.profiler_options.unwrap_or_default(),
        }
    }
}
506
/// Tracks whether async-profiler is currently recording.
enum Status {
    /// The profiler is not running.
    Idle,
    /// A start was requested but not yet confirmed. Used as a drop-guard
    /// marker: if we drop in this state, the JFR files are leaked rather
    /// than deleted (see the `Drop` impl for `ProfilerState`).
    Starting,
    /// The profiler is running; holds the time the recording started.
    Running(SystemTime),
}
512
/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
/// of the state of the profiler. The primary benefit of this is RAII - when
/// this type drops, it will stop the profiler if it's running.
struct ProfilerState<E: ProfilerEngine> {
    // this is only None in the destructor when stopping the async-profiler fails
    jfr_file: Option<JfrFile>,
    // Engine instance used to start the profiler.
    asprof: E,
    // Current profiler status; also acts as the drop-guard marker.
    status: Status,
    // Options forwarded to async-profiler on each start.
    profiler_options: ProfilerOptions,
}
523
impl<E: ProfilerEngine> ProfilerState<E> {
    /// Creates a new idle profiler state with a fresh pair of JFR temp files.
    fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
        Ok(Self {
            jfr_file: Some(JfrFile::new()?),
            asprof,
            status: Status::Idle,
            profiler_options,
        })
    }

    /// Mutable access to the JFR file pair.
    ///
    /// Panics if the files were taken, which only happens in the destructor;
    /// outside of `drop` the field is always `Some`.
    fn jfr_file_mut(&mut self) -> &mut JfrFile {
        self.jfr_file.as_mut().unwrap()
    }

    /// Starts async-profiler writing into the active JFR file, recording the
    /// start time on success.
    async fn start(&mut self) -> Result<(), AsProfError> {
        let active = self.jfr_file.as_ref().unwrap().active_path();
        // drop guard - make sure the files are leaked if the profiler might have started
        self.status = Status::Starting;
        E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
        self.status = Status::Running(SystemTime::now());
        Ok(())
    }

    /// Stops async-profiler and resets the status to idle.
    ///
    /// Returns the time the recording started, or `None` if the profiler
    /// was not in the `Running` state.
    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
        E::stop_async_profiler()?;
        let status = std::mem::replace(&mut self.status, Status::Idle);
        Ok(match status {
            Status::Idle | Status::Starting => None,
            Status::Running(since) => Some(since),
        })
    }

    /// True if the profiler is confirmed to be running.
    fn is_started(&self) -> bool {
        matches!(self.status, Status::Running(_))
    }
}
560
impl<E: ProfilerEngine> Drop for ProfilerState<E> {
    /// Best-effort stop of the profiler on drop (RAII).
    fn drop(&mut self) {
        match self.status {
            Status::Running(_) => {
                if let Err(err) = self.stop() {
                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
                    // to avoid symlink races
                    std::mem::forget(self.jfr_file.take());
                    // XXX: Rust defines leaking resources during drop as safe.
                    tracing::warn!(?err, "unable to stop profiler during drop glue");
                }
            }
            Status::Idle => {}
            Status::Starting => {
                // A start was requested but never confirmed, so the profiler
                // might still be writing into the files.
                // SECURITY: avoid removing the JFR file if stopping the profiler fails,
                // to avoid symlink races
                std::mem::forget(self.jfr_file.take());
            }
        }
    }
}
582
/// Abstraction over the async-profiler operations used by the profiler task.
pub(crate) trait ProfilerEngine: Send + Sync + 'static {
    /// Initializes the async-profiler library (associated function: takes no
    /// engine instance).
    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
    /// Starts profiling, writing JFR output to `jfr_file_path` with the
    /// given options.
    fn start_async_profiler(
        &self,
        jfr_file_path: &Path,
        options: &ProfilerOptions,
    ) -> Result<(), asprof::AsProfError>;
    /// Stops the currently-running profiler (associated function: takes no
    /// engine instance).
    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}
592
/// Holds the profiler task state and performs a final synchronous report
/// when the task is cancelled (e.g. Tokio runtime shutdown) before a
/// graceful stop. The local reporter will flush its contents on drop.
/// For other reporters, you must call `RunningProfiler::stop().await`
/// to ensure the last sample is uploaded.
struct ProfilerTaskState<E: ProfilerEngine> {
    // Profiler engine + JFR file state; stops the profiler when dropped.
    state: ProfilerState<E>,
    // Destination for collected profiles.
    reporter: Box<dyn Reporter + Send + Sync>,
    // Host/process identity attached to reports; `None` falls back to
    // `AgentMetadata::NoMetadata` in the final report.
    agent_metadata: Option<AgentMetadata>,
    // Interval between reports, recorded in the report metadata.
    reporting_interval: Duration,
    // Set when the task finished gracefully; suppresses the drop-time report.
    completed_normally: bool,
}
605
606impl<E: ProfilerEngine> ProfilerTaskState<E> {
607 fn try_final_report(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
608 let start = self.state.stop()?.ok_or("profiler was not running")?;
609 let jfr_file = self.state.jfr_file.as_ref().ok_or("jfr file missing")?;
610 let jfr_path = jfr_file.active_path();
611 if jfr_path.metadata()?.len() == 0 {
612 return Ok(());
613 }
614 let metadata = ReportMetadata {
615 instance: self
616 .agent_metadata
617 .as_ref()
618 .unwrap_or(&AgentMetadata::NoMetadata),
619 start: start.duration_since(UNIX_EPOCH)?,
620 end: SystemTime::now().duration_since(UNIX_EPOCH)?,
621 reporting_interval: self.reporting_interval,
622 };
623 self.reporter
624 .report_blocking(&jfr_path, &metadata)
625 .map_err(|e| e.to_string())?;
626 Ok(())
627 }
628}
629
630impl<E: ProfilerEngine> Drop for ProfilerTaskState<E> {
631 fn drop(&mut self) {
632 if self.completed_normally || !self.state.is_started() {
633 return;
634 }
635 tracing::info!("profiler task cancelled, attempting final report on drop");
636 if let Err(err) = self.try_final_report() {
637 tracing::warn!(?err, "failed to report on drop");
638 }
639 }
640}
641
/// Errors from a single iteration of the profiler's report loop (private).
#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
    /// Error from async-profiler itself.
    #[error(transparent)]
    AsProf(#[from] AsProfError),
    /// Error fetching AWS agent metadata.
    #[error(transparent)]
    #[cfg(feature = "aws-metadata-no-defaults")]
    Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
    /// Error returned by the configured [`Reporter`].
    #[error("reporter: {0}")]
    Reporter(Box<dyn std::error::Error + Send>),
    /// The system clock reported a time before the UNIX epoch.
    #[error("broken clock: {0}")]
    BrokenClock(#[from] SystemTimeError),
    /// I/O error reading the JFR file.
    #[error("jfr read error: {0}")]
    JfrRead(io::Error),
    /// I/O error truncating the inactive JFR file.
    #[error("empty inactive file error: {0}")]
    EmptyInactiveFile(io::Error),
}
659
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error that happened spawning a profiler
pub enum SpawnError {
    /// Error from async-profiler (e.g. failing to load or start it)
    #[error(transparent)]
    AsProf(#[from] asprof::AsProfError),
    /// Error creating or writing to the temporary JFR file
    #[error("tempfile error")]
    TempFile(#[source] io::Error),
}
671
#[derive(Debug, Error)]
#[non_exhaustive]
/// An error from [`Profiler::spawn_thread`]
pub enum SpawnThreadError {
    /// Error spawning the profiler (see [`SpawnError`])
    #[error(transparent)]
    AsProf(#[from] SpawnError),
    /// Error constructing the dedicated Tokio runtime
    #[error("constructing Tokio runtime")]
    ConstructRt(#[source] io::Error),
}
683
// Control messages for the profiler task. Currently empty: the oneshot
// channel is only used for its closed/dropped signal, which requests
// that the profiler stop.
enum Control {}
686
/// A handle to a running profiler
///
/// Currently just allows for stopping the profiler.
///
/// Dropping this handle will request that the profiler will stop.
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfiler {
    // Dropping this sender closes the channel, which signals the profiler
    // task to stop.
    stop_channel: tokio::sync::oneshot::Sender<Control>,
    // Handle to the profiler's Tokio task; awaited to observe shutdown.
    join_handle: tokio::task::JoinHandle<()>,
}
697
impl RunningProfiler {
    /// Request that the current profiler stops and wait until it exits.
    ///
    /// This will cause the currently-pending profile information to be flushed.
    ///
    /// After this function returns, it is correct and safe to [spawn] a new
    /// [Profiler], possibly with a different configuration. Therefore,
    /// this function can be used to "reconfigure" a profiler by stopping
    /// it and then starting a new one with a different configuration.
    ///
    /// [spawn]: Profiler::spawn_controllable
    pub async fn stop(self) {
        // Closing the control channel signals the profiler task to stop;
        // awaiting the join handle then waits for the final flush.
        drop(self.stop_channel);
        let _ = self.join_handle.await;
    }

    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn(async move {
            // move the control channel to the spawned task. this way, it will be dropped
            // just when the task is aborted.
            let _abort_channel = self.stop_channel;
            self.join_handle.await.ok();
        })
    }

    /// Detach this profiler. This will prevent the profiler from being stopped
    /// when this handle is dropped. You should call this (or [Profiler::spawn]
    /// instead of [Profiler::spawn_controllable], which does the same thing)
    /// if you don't intend to reconfigure your profiler at runtime.
    pub fn detach(self) {
        self.detach_inner();
    }

    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
    /// and returns a [RunningProfilerThread] attached to it.
    ///
    /// `spawn_fn` is a `std::thread::spawn`-alike, letting callers customize
    /// thread properties (e.g. the thread name).
    fn spawn_attached(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) -> RunningProfilerThread {
        RunningProfilerThread {
            // The stop channel stays on the returned handle, so dropping
            // the handle still requests a stop.
            stop_channel: self.stop_channel,
            join_handle: spawn_fn(Box::new(move || {
                let _ = runtime.block_on(self.join_handle);
            })),
        }
    }

    /// Spawns this [RunningProfiler] into a separate thread within a new Tokio runtime,
    /// and detaches it.
    ///
    /// The stop channel is moved into the thread, so it is only dropped
    /// when the profiler task itself finishes.
    fn spawn_detached(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) {
        spawn_fn(Box::new(move || {
            let _stop_channel = self.stop_channel;
            let _ = runtime.block_on(self.join_handle);
        }));
    }
}
760
/// A handle to a running profiler, running on a separate thread.
///
/// Currently just allows for stopping the profiler.
///
/// Dropping this handle will request that the profiler will stop.
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfilerThread {
    // Dropping this sender closes the channel, which signals the profiler
    // task to stop.
    stop_channel: tokio::sync::oneshot::Sender<Control>,
    // OS thread running the profiler's dedicated Tokio runtime.
    join_handle: std::thread::JoinHandle<()>,
}
771
impl RunningProfilerThread {
    /// Request that the current profiler stops and wait until it exits.
    ///
    /// This will cause the currently-pending profile information to be flushed.
    ///
    /// After this function returns, it is correct and safe to [spawn] a new
    /// [Profiler], possibly with a different configuration. Therefore,
    /// this function can be used to "reconfigure" a profiler by stopping
    /// it and then starting a new one with a different configuration.
    ///
    /// [spawn]: Profiler::spawn_controllable
    pub fn stop(self) {
        // Close the channel to request shutdown *before* joining, so the
        // profiler thread can observe the signal and exit.
        drop(self.stop_channel);
        let _ = self.join_handle.join();
    }
}
788
/// Rust profiler based on [async-profiler].
///
/// Spawning a profiler can be done either in an attached (controllable)
/// mode, which allows for stopping the profiler (and, in fact, stops
/// it when the relevant handle is dropped), or in detached mode,
/// in which the profiler keeps running forever. Applications that can
/// shut down the profiler at run-time, for example applications that
/// support reconfiguration of a running profiler, generally want to use
/// controllable mode. Other applications (most of them) should use
/// detached mode.
///
/// In addition, the profiler can either be spawned into the current Tokio
/// runtime, or into a new one. Normally, applications should spawn
/// the profiler into their own Tokio runtime, but applications that
/// don't have a default Tokio runtime should spawn it into a
/// different one.
///
/// This leaves 4 functions:
/// 1. [Self::spawn] - detached, same runtime
/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
/// 3. [Self::spawn_controllable] - controllable, same runtime
/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
///
/// In addition, there's a helper function that just spawns the profiler
/// to a new runtime in a new thread, for applications that don't have
/// a Tokio runtime and don't need complex control:
///
/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
///
/// [async-profiler]: https://github.com/async-profiler/async-profiler
pub struct Profiler {
    // How often collected samples are reported (default: 30 seconds).
    reporting_interval: Duration,
    // Destination for collected profiles.
    reporter: Box<dyn Reporter + Send + Sync>,
    // Host/process identity attached to reports; `None` means it will be
    // auto-detected (see `ProfilerBuilder::with_custom_agent_metadata`).
    agent_metadata: Option<AgentMetadata>,
    // Options forwarded to async-profiler.
    profiler_options: ProfilerOptions,
}
825
826impl Profiler {
    /// Start profiling. The profiler will run in a tokio task at the configured interval.
    ///
    /// This is the same as calling [Profiler::spawn_controllable] followed by
    /// [RunningProfiler::detach], except it returns a [JoinHandle].
    ///
    /// The returned [JoinHandle] can be used to detect if the profiler has exited
    /// due to a fatal error.
    ///
    /// This function will fail if it is unable to start async-profiler, for example
    /// if it can't find or load `libasyncProfiler.so`.
    ///
    /// [JoinHandle]: tokio::task::JoinHandle
    ///
    /// ### Uploading the last sample
    ///
    /// When you return from the Tokio `main`, the agent will terminate without waiting
    /// for the last profiling JFR to be uploaded. Especially if you have a
    /// short-running program, if you want to ensure the last profiling JFR
    /// is uploaded, you should use [Profiler::spawn_controllable] and
    /// [RunningProfiler::stop], which allows waiting for the upload
    /// to finish.
    ///
    /// If you do not care about losing the last sample, it is fine to directly
    /// return from the Tokio `main` without stopping the profiler.
    ///
    /// ### Tokio Runtime
    ///
    /// This function must be run within a Tokio runtime, otherwise it will panic. If
    /// your application does not have a `main` Tokio runtime, see
    /// [Profiler::spawn_thread].
    ///
    /// ### Example
    ///
    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// profiler.spawn()?;
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
        self.spawn_controllable().map(RunningProfiler::detach_inner)
    }
879
880 /// Like [Self::spawn], but instead of spawning within the current Tokio
881 /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
882 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
883 ///
884 /// If your configuration is standard, use [Profiler::spawn_thread].
885 ///
886 /// If you want to be able to stop the resulting profiler, use
887 /// [Profiler::spawn_controllable_thread_to_runtime].
888 ///
889 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
890 /// allow for configuring thread properties, for example thread names).
891 ///
892 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
893 ///
894 /// ### Uploading the last sample
895 ///
896 /// When you return from `main`, the agent will terminate without waiting
897 /// for the last profiling JFR to be uploaded. Especially if you have a
898 /// short-running program, if you want to ensure the last profiling JFR
899 /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
900 /// and [RunningProfilerThread::stop], which allows waiting for the upload
901 /// to finish.
902 ///
903 /// If you do not care about losing the last sample, it is fine to directly
904 /// return from the Tokio `main` without stopping the profiler.
905 ///
906 /// ### Example
907 ///
908 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
909 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
910 ///
911 /// ```no_run
912 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
913 /// let rt = tokio::runtime::Builder::new_current_thread()
914 /// .enable_all()
915 /// .build()?;
916 /// let profiler = ProfilerBuilder::default()
917 /// .with_local_reporter("/tmp/profiles")
918 /// .build();
919 ///
920 /// profiler.spawn_thread_to_runtime(
921 /// rt,
922 /// |t| {
923 /// std::thread::Builder::new()
924 /// .name("asprof-agent".to_owned())
925 /// .spawn(t)
926 /// .expect("thread name contains nuls")
927 /// }
928 /// )?;
929 /// # Ok::<_, anyhow::Error>(())
930 /// ```
931 pub fn spawn_thread_to_runtime(
932 self,
933 runtime: tokio::runtime::Runtime,
934 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
935 ) -> Result<(), SpawnError> {
936 self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
937 }
938
939 /// Like [Self::spawn], but instead of spawning within the current Tokio
940 /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
941 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
942 /// by itself.
943 ///
944 /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
945 /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
946 /// with the following:
947 /// 1. a current thread runtime with background worker threads (these exist
948 /// for blocking IO) named "asprof-worker"
949 /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
950 ///
951 /// If you want to be able to stop the resulting profiler, use
952 /// [Profiler::spawn_controllable_thread_to_runtime].
953 ///
954 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
955 ///
956 /// ### Uploading the last sample
957 ///
958 /// When you return from `main`, the agent will terminate without waiting
959 /// for the last profiling JFR to be uploaded. Especially if you have a
960 /// short-running program, if you want to ensure the last profiling JFR
961 /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
962 /// and [RunningProfilerThread::stop], which allows waiting for the upload
963 /// to finish.
964 ///
965 /// If you do not care about losing the last sample, it is fine to directly
966 /// return from the Tokio `main` without stopping the profiler.
967 ///
968 /// ### Example
969 ///
970 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
971 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
972 ///
973 /// ```no_run
974 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
975 /// # use async_profiler_agent::reporter::local::LocalReporter;
976 /// let profiler = ProfilerBuilder::default()
977 /// .with_local_reporter("/tmp/profiles")
978 /// .build();
979 ///
980 /// profiler.spawn_thread()?;
981 /// # Ok::<_, anyhow::Error>(())
982 /// ```
983 pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
984 // using "asprof" in thread name to deal with 15 character + \0 length limit
985 let rt = tokio::runtime::Builder::new_current_thread()
986 .thread_name("asprof-worker".to_owned())
987 .enable_all()
988 .build()
989 .map_err(SpawnThreadError::ConstructRt)?;
990 let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
991 self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
992 .map_err(SpawnThreadError::AsProf)
993 }
994
995 fn spawn_thread_inner<E: ProfilerEngine>(
996 self,
997 asprof: E,
998 runtime: tokio::runtime::Runtime,
999 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1000 ) -> Result<(), SpawnError> {
1001 let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1002 handle.spawn_detached(runtime, spawn_fn);
1003 Ok(())
1004 }
1005
1006 /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
1007 /// (currently only stopping) the profiler.
1008 ///
1009 /// This allows for changing the configuration of the profiler at runtime, by
1010 /// stopping it and then starting a new Profiler with a new configuration. It
1011 /// also allows for stopping profiling in case the profiler is suspected to
1012 /// cause operational issues.
1013 ///
1014 /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
1015 /// so if your application doen't need to change the profiler's configuration at runtime,
1016 /// it will be easier to use [Profiler::spawn].
1017 ///
1018 /// This function will fail if it is unable to start async-profiler, for example
1019 /// if it can't find or load `libasyncProfiler.so`.
1020 ///
1021 /// ### Uploading the last sample
1022 ///
1023 /// When you return from the Tokio `main`, the agent will terminate without waiting
1024 /// for the last profiling JFR to be uploaded. Especially if you have a
1025 /// short-running program, if you want to ensure the last profiling JFR
1026 /// is uploaded, you should use [RunningProfiler::stop], which allows waiting for
1027 /// the upload to finish.
1028 ///
1029 /// If you do not care about losing the last sample, it is fine to directly
1030 /// return from the Tokio `main` without stopping the profiler.
1031 ///
1032 /// ### Tokio Runtime
1033 ///
1034 /// This function must be run within a Tokio runtime, otherwise it will panic. If
1035 /// your application does not have a `main` Tokio runtime, see
1036 /// [Profiler::spawn_controllable_thread_to_runtime].
1037 ///
1038 /// ### Example
1039 ///
1040 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1041 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1042 ///
1043 /// ```no_run
1044 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1045 /// # #[tokio::main]
1046 /// # async fn main() -> Result<(), SpawnError> {
1047 /// let profiler = ProfilerBuilder::default()
1048 /// .with_local_reporter("/tmp/profiles")
1049 /// .build();
1050 ///
1051 /// let profiler = profiler.spawn_controllable()?;
1052 ///
1053 /// // [insert your signaling/monitoring mechanism to have a request to disable
1054 /// // profiling in case of a problem]
1055 /// let got_request_to_disable_profiling = async move {
1056 /// // ...
1057 /// # false
1058 /// };
1059 /// // spawn a task that will disable profiling if requested
1060 /// tokio::task::spawn(async move {
1061 /// if got_request_to_disable_profiling.await {
1062 /// profiler.stop().await;
1063 /// }
1064 /// });
1065 /// # Ok(())
1066 /// # }
1067 /// ```
1068 pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
1069 self.spawn_inner(asprof::AsProf::builder().build())
1070 }
1071
1072 /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
1073 /// runtime, spawns within a set Tokio runtime and then runs a thread that calls
1074 /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
1075 ///
1076 /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
1077 /// allow for configuring thread properties, for example thread names).
1078 ///
1079 /// This is to be used when your program does not have a "main" Tokio runtime already set up.
1080 ///
1081 /// ### Uploading the last sample
1082 ///
1083 /// When you return from `main`, the agent will terminate without waiting
1084 /// for the last profiling JFR to be uploaded. Especially if you have a
1085 /// short-running program, if you want to ensure the last profiling JFR
1086 /// is uploaded, you should use [RunningProfilerThread::stop], which allows waiting
1087 /// for the upload to finish.
1088 ///
1089 /// If you do not care about losing the last sample, it is fine to directly
1090 /// return from the Tokio `main` without stopping the profiler.
1091 ///
1092 /// ### Example
1093 ///
1094 /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
1095 /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
1096 ///
1097 /// ```no_run
1098 /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
1099 /// let rt = tokio::runtime::Builder::new_current_thread()
1100 /// .enable_all()
1101 /// .build()?;
1102 /// let profiler = ProfilerBuilder::default()
1103 /// .with_local_reporter("/tmp/profiles")
1104 /// .build();
1105 ///
1106 /// let profiler = profiler.spawn_controllable_thread_to_runtime(
1107 /// rt,
1108 /// |t| {
1109 /// std::thread::Builder::new()
1110 /// .name("asprof-agent".to_owned())
1111 /// .spawn(t)
1112 /// .expect("thread name contains nuls")
1113 /// }
1114 /// )?;
1115 ///
1116 /// # fn got_request_to_disable_profiling() -> bool { false }
1117 /// // spawn a task that will disable profiling if requested
1118 /// std::thread::spawn(move || {
1119 /// if got_request_to_disable_profiling() {
1120 /// profiler.stop();
1121 /// }
1122 /// });
1123 /// # Ok::<_, anyhow::Error>(())
1124 /// ```
1125 pub fn spawn_controllable_thread_to_runtime(
1126 self,
1127 runtime: tokio::runtime::Runtime,
1128 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1129 ) -> Result<RunningProfilerThread, SpawnError> {
1130 self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
1131 }
1132
1133 fn spawn_controllable_thread_inner<E: ProfilerEngine>(
1134 self,
1135 asprof: E,
1136 runtime: tokio::runtime::Runtime,
1137 spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
1138 ) -> Result<RunningProfilerThread, SpawnError> {
1139 let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
1140 Ok(handle.spawn_attached(runtime, spawn_fn))
1141 }
1142
1143 fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
1144 // Initialize async profiler - needs to be done once.
1145 E::init_async_profiler()?;
1146 tracing::info!("successfully initialized async profiler.");
1147
1148 let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
1149 let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();
1150
1151 // Get profiles at the configured interval rate.
1152 let join_handle = tokio::spawn(async move {
1153 let state = match ProfilerState::new(asprof, self.profiler_options) {
1154 Ok(state) => state,
1155 Err(err) => {
1156 tracing::error!(?err, "unable to create profiler state");
1157 return;
1158 }
1159 };
1160
1161 let mut task = ProfilerTaskState {
1162 state,
1163 reporter: self.reporter,
1164 agent_metadata: self.agent_metadata,
1165 reporting_interval: self.reporting_interval,
1166 completed_normally: false,
1167 };
1168
1169 let mut done = false;
1170 while !done {
1171 // Wait until a timer or exit event
1172 tokio::select! {
1173 biased;
1174
1175 r = &mut stop_rx, if !stop_rx.is_terminated() => {
1176 match r {
1177 Err(_) => {
1178 tracing::info!("profiler stop requested, doing a final tick");
1179 done = true;
1180 }
1181 }
1182 }
1183 _ = sampling_ticker.tick() => {
1184 tracing::debug!("profiler timer woke up");
1185 }
1186 }
1187
1188 if let Err(err) = profiler_tick(
1189 &mut task.state,
1190 &mut task.agent_metadata,
1191 &*task.reporter,
1192 task.reporting_interval,
1193 )
1194 .await
1195 {
1196 match &err {
1197 TickError::Reporter(_) => {
1198 // don't stop on IO errors
1199 tracing::error!(?err, "error during profiling, continuing");
1200 }
1201 _stop => {
1202 tracing::error!(?err, "error during profiling, stopping");
1203 break;
1204 }
1205 }
1206 }
1207 }
1208
1209 task.completed_normally = true;
1210 tracing::info!("profiling task finished");
1211 });
1212
1213 Ok(RunningProfiler {
1214 stop_channel,
1215 join_handle,
1216 })
1217 }
1218}
1219
/// Runs one profiling cycle: stop the profiler, swap the JFR double-buffer,
/// restart profiling into the fresh file, then report the just-finished JFR.
///
/// On the very first tick the profiler has not been started yet, so this only
/// starts it and returns without reporting anything.
///
/// # Errors
///
/// Returns [TickError] if starting/stopping the profiler, clock arithmetic,
/// reading the JFR file, or reporting fails. (The caller treats
/// [TickError::Reporter] as non-fatal and everything else as fatal.)
async fn profiler_tick<E: ProfilerEngine>(
    state: &mut ProfilerState<E>,
    agent_metadata: &mut Option<AgentMetadata>,
    reporter: &(dyn Reporter + Send + Sync),
    reporting_interval: Duration,
) -> Result<(), TickError> {
    // First tick: just start profiling; there is no data to report yet.
    if !state.is_started() {
        state.start().await?;
        return Ok(());
    }

    let Some(start) = state.stop()? else {
        tracing::warn!("stopped the profiler but it wasn't running?");
        return Ok(());
    };
    // Report timestamps as durations since the Unix epoch; `?` converts a
    // clock-went-backwards SystemTimeError into a TickError.
    let start = start.duration_since(UNIX_EPOCH)?;
    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;

    // Start it up immediately, writing to the "other" file, so that we keep
    // profiling the application while we're reporting data.
    state
        .jfr_file_mut()
        .empty_inactive_file()
        .map_err(TickError::EmptyInactiveFile)?;
    state.jfr_file_mut().swap();
    state.start().await?;

    // Lazily load the agent metadata if it was not provided in
    // the constructor. See the struct comments for why this is.
    // This code runs at most once.
    if agent_metadata.is_none() {
        #[cfg(feature = "aws-metadata-no-defaults")]
        let md = crate::metadata::aws::load_agent_metadata().await?;
        #[cfg(not(feature = "aws-metadata-no-defaults"))]
        let md = crate::metadata::AgentMetadata::NoMetadata;
        tracing::debug!("loaded metadata");
        agent_metadata.replace(md);
    }

    let report_metadata = ReportMetadata {
        // unwrap is fine: the block above guarantees `agent_metadata` is Some.
        instance: agent_metadata.as_ref().unwrap(),
        start,
        end,
        reporting_interval,
    };

    // After the swap above, the inactive file holds the sample that just
    // finished recording.
    let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
        .await
        .map_err(TickError::JfrRead)?;

    reporter
        .report(jfr, &report_metadata)
        .await
        .map_err(TickError::Reporter)?;

    Ok(())
}
1277
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{self, AtomicBool, AtomicU32};

    use test_case::test_case;

    use super::*;

    // Exercises the JfrFile double-buffer: writes through the active path,
    // swaps, and checks the data is visible (and then erasable) through the
    // inactive path.
    #[test]
    fn test_jfr_file_drop() {
        let mut jfr = JfrFile::new().unwrap();

        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
        jfr.swap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
        jfr.empty_inactive_file().unwrap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
    }

    // Mock engine that writes "JFR<n>" (n = number of starts so far) into the
    // JFR file each time the profiler is started, letting tests assert which
    // sample each report came from.
    struct MockProfilerEngine {
        counter: AtomicU32,
    }
    impl ProfilerEngine for MockProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(
            &self,
            jfr_file_path: &Path,
            _options: &ProfilerOptions,
        ) -> Result<(), asprof::AsProfError> {
            let contents = format!(
                "JFR{}",
                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
            );
            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
            Ok(())
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }
    }

    // Reporter that forwards each (JFR contents, metadata) pair to a channel
    // so tests can await and assert on individual reports.
    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
    impl std::fmt::Debug for MockReporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("MockReporter").finish()
        }
    }

    #[async_trait::async_trait]
    impl Reporter for MockReporter {
        async fn report(
            &self,
            jfr: Vec<u8>,
            metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            self.0
                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
                .await
                .unwrap();
            Ok(())
        }
    }

    // Builds a Profiler wired to a MockReporter plus the receiving end of its
    // report channel. Uses fixed EC2-style metadata so tests can compare it.
    fn make_mock_profiler() -> (
        Profiler,
        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let agent = ProfilerBuilder::default()
            .with_reporter(MockReporter(tx))
            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
                aws_account_id: "0".into(),
                aws_region_id: "us-east-1".into(),
                ec2_instance_id: "i-fake".into(),
                ec2_instance_type: "t3.micro".into(),
            })
            .build();
        (agent, rx)
    }

    // Happy path on the current (paused-time) runtime: consecutive reports
    // carry consecutive mock JFR payloads and the configured metadata.
    #[tokio::test(start_paused = true)]
    async fn test_profiler_agent() {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
            ec2_instance_type: "t3.micro".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap()
            .detach();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
    }

    // Same happy path, but driven through the thread-to-runtime entry points
    // (detached and controllable variants); the controllable variant also
    // verifies that stop() terminates the report stream.
    #[test_case(false; "uncontrollable")]
    #[test_case(true; "controllable")]
    fn test_profiler_local_rt(controllable: bool) {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
            ec2_instance_type: "t3.micro".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .start_paused(true)
            .build()
            .unwrap();
        // spawn the profiler, doing this before spawning a thread to allow
        // capturing errors from `spawn`
        let handle = if controllable {
            Some(
                agent
                    .spawn_controllable_thread_inner::<MockProfilerEngine>(
                        MockProfilerEngine {
                            counter: AtomicU32::new(0),
                        },
                        rt,
                        std::thread::spawn,
                    )
                    .unwrap(),
            )
        } else {
            agent
                .spawn_thread_inner::<MockProfilerEngine>(
                    MockProfilerEngine {
                        counter: AtomicU32::new(0),
                    },
                    rt,
                    std::thread::spawn,
                )
                .unwrap();
            None
        };

        let (jfr, md) = rx.blocking_recv().unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.blocking_recv().unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);

        if let Some(handle) = handle {
            // Drain remaining reports; the thread exits only when the
            // profiler closes the channel, proving stop() shut it down.
            let drain_thread =
                std::thread::spawn(move || while let Some(_) = rx.blocking_recv() {});
            // request a stop
            handle.stop();
            // the drain thread should be done
            drain_thread.join().unwrap();
        }
    }

    // The three ways a running profiler can be terminated.
    enum StopKind {
        Delibrate,
        Drop,
        Abort,
    }

    // Verifies that stopping (deliberately, by drop, or by aborting the
    // JoinHandle) triggers a prompt final tick and then closes the stream.
    #[tokio::test(start_paused = true)]
    #[test_case(StopKind::Delibrate; "deliberate stop")]
    #[test_case(StopKind::Drop; "drop stop")]
    #[test_case(StopKind::Abort; "abort stop")]
    async fn test_profiler_stop(stop_kind: StopKind) {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
            ec2_instance_type: "t3.micro".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        let profiler_ref = agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
        // check that stop is faster than an interval and returns an "immediate" next jfr
        match stop_kind {
            StopKind::Drop => drop(profiler_ref),
            StopKind::Delibrate => {
                tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
                    .await
                    .unwrap();
            }
            StopKind::Abort => {
                // You can call Abort on the JoinHandle. make sure that is not buggy.
                profiler_ref.detach_inner().abort();
            }
        }
        // check that we get the next JFR "quickly", and the JFR after that is empty.
        let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(jfr, "JFR2");
        assert_eq!(e_md, md);
        assert!(rx.recv().await.is_none());
    }

    // simulate a badly-behaved profiler that errors on start/stop and then
    // tries to access the JFR file
    struct StopErrorProfilerEngine {
        start_error: bool,
        counter: Arc<AtomicBool>,
    }
    impl ProfilerEngine for StopErrorProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(
            &self,
            jfr_file_path: &Path,
            _options: &ProfilerOptions,
        ) -> Result<(), asprof::AsProfError> {
            let jfr_file_path = jfr_file_path.to_owned();
            std::fs::write(&jfr_file_path, "JFR").unwrap();
            let counter = self.counter.clone();
            // Touch the JFR file again later to make sure the path stays
            // valid even after the profiler task has bailed out.
            tokio::task::spawn(async move {
                tokio::time::sleep(Duration::from_secs(5)).await;
                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
                counter.store(true, atomic::Ordering::Release);
            });
            if self.start_error {
                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
            } else {
                Ok(())
            }
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
        }
    }

    // A start or stop failure should end the profiler task (no reports) while
    // leaving the JFR file readable for in-flight users.
    #[tokio::test(start_paused = true)]
    #[test_case(false; "error on stop")]
    #[test_case(true; "error on start")]
    async fn test_profiler_error(start_error: bool) {
        let (agent, mut rx) = make_mock_profiler();
        let counter = Arc::new(AtomicBool::new(false));
        let engine = StopErrorProfilerEngine {
            start_error,
            counter: counter.clone(),
        };
        let handle = agent.spawn_inner(engine).unwrap().detach_inner();
        assert!(rx.recv().await.is_none());
        // check that the "sleep 5" step in start_async_profiler succeeds
        for _ in 0..100 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if counter.load(atomic::Ordering::Acquire) {
                handle.await.unwrap(); // Check that the JoinHandle is done
                return;
            }
        }
        panic!("didn't read from file");
    }

    #[test]
    fn test_profiler_options_to_args_string_default() {
        let opts = ProfilerOptions::default();
        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert!(
            args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
            "Default args string not constructed correctly"
        );
        assert!(args.contains("file=/tmp/test.jfr"));
        assert!(!args.contains("nativemem="));
    }

    #[test]
    fn test_profiler_options_to_args_string_with_native_mem() {
        let opts = ProfilerOptions {
            native_mem: Some("10m".to_string()),
            wall_clock_millis: None,
            cpu_interval: None,
        };
        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert!(args.contains("nativemem=10m"));
    }

    #[test]
    fn test_profiler_options_builder() {
        let opts = ProfilerOptionsBuilder::default()
            .with_native_mem_bytes(5000000)
            .build();

        assert_eq!(opts.native_mem, Some("5000000".to_string()));
    }

    #[test]
    fn test_profiler_options_builder_all_options() {
        let opts = ProfilerOptionsBuilder::default()
            .with_native_mem_bytes(5000000)
            .with_cpu_interval(Duration::from_secs(1))
            .with_wall_clock_interval(Duration::from_secs(10))
            .build();

        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert_eq!(
            args,
            "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
        );
    }

    #[test]
    fn test_local_reporter_has_no_metadata() {
        // Check that with_local_reporter sets some configuration
        let reporter = ProfilerBuilder::default().with_local_reporter(".");
        assert_eq!(
            format!("{:?}", reporter.reporter),
            r#"Some(LocalReporter { directory: "." })"#
        );
        match reporter.agent_metadata {
            Some(AgentMetadata::NoMetadata) => {}
            bad => panic!("{bad:?}"),
        };
    }

    /// A reporter that tracks both async and blocking reports separately.
    struct BlockingMockReporter {
        async_tx: tokio::sync::mpsc::Sender<String>,
        blocking_reports: Arc<std::sync::Mutex<Vec<String>>>,
    }
    impl std::fmt::Debug for BlockingMockReporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("BlockingMockReporter").finish()
        }
    }

    #[async_trait::async_trait]
    impl Reporter for BlockingMockReporter {
        async fn report(
            &self,
            jfr: Vec<u8>,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            self.async_tx
                .send(String::from_utf8(jfr).unwrap())
                .await
                .unwrap();
            Ok(())
        }

        fn report_blocking(
            &self,
            jfr_path: &Path,
            _metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            let jfr = std::fs::read(jfr_path).map_err(|e| Box::new(e) as _)?;
            self.blocking_reports
                .lock()
                .unwrap()
                .push(String::from_utf8(jfr).unwrap());
            Ok(())
        }
    }

    /// Simulates a runtime shutdown while the profiler is running.
    /// The profiler should call report_blocking on drop to flush the
    /// last sample.
    #[test]
    fn test_profiler_report_on_drop() {
        let blocking_reports = Arc::new(std::sync::Mutex::new(Vec::new()));

        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .start_paused(true)
            .build()
            .unwrap();

        let reports_clone = blocking_reports.clone();
        rt.block_on(async {
            let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<String>(10);
            let agent = ProfilerBuilder::default()
                .with_reporter(BlockingMockReporter {
                    async_tx,
                    blocking_reports: reports_clone,
                })
                .with_custom_agent_metadata(AgentMetadata::NoMetadata)
                .build();
            // Detach so the stop channel doesn't trigger a graceful stop
            // when the block_on future returns.
            agent
                .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                    counter: AtomicU32::new(0),
                })
                .unwrap()
                .detach();

            // Wait for first async report to confirm profiler is running
            let jfr = async_rx.recv().await.unwrap();
            assert_eq!(jfr, "JFR0");
            // Return without stopping — runtime drop will cancel the task.
        });

        // Runtime shutdown cancels all tasks, triggering ProfilerTaskState::Drop.
        drop(rt);

        let reports = blocking_reports.lock().unwrap();
        assert_eq!(
            reports.len(),
            1,
            "expected exactly one blocking report on drop"
        );
        assert_eq!(reports[0], "JFR1");
    }
}