async_profiler_agent/profiler.rs
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

//! A profiler that periodically uploads profiling samples of your program to a [Reporter]

use crate::{
    asprof::{self, AsProfError},
    metadata::{AgentMetadata, ReportMetadata},
    reporter::{Reporter, local::LocalReporter},
};
use std::{
    fs::File,
    io, mem,
    path::{Path, PathBuf},
    time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
};
use thiserror::Error;

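/// A pair of anonymous temporary files used as a double buffer for JFR data:
/// async-profiler writes into the `active` file while previously collected
/// data is read back from the `inactive` file and reported, and
/// [`JfrFile::swap`] exchanges the two roles on every reporting tick.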
struct JfrFile {
    active: std::fs::File,
    inactive: std::fs::File,
}

impl JfrFile {
    #[cfg(target_os = "linux")]
    fn new() -> Result<Self, io::Error> {
        Ok(Self {
            active: tempfile::tempfile()?,
            inactive: tempfile::tempfile()?,
        })
    }

    #[cfg(not(target_os = "linux"))]
    fn new() -> Result<Self, io::Error> {
        Err(io::Error::other(
            "async-profiler is only supported on Linux",
        ))
    }

    fn swap(&mut self) {
        std::mem::swap(&mut self.active, &mut self.inactive);
    }

    #[cfg(target_os = "linux")]
    fn file_path(file: &std::fs::File) -> PathBuf {
        use std::os::fd::AsRawFd;

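        // Refer to the already-open anonymous temp file through procfs, so the
        // file can be handed to async-profiler (which expects a path) without
        // ever creating a name for it in the filesystem.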
        format!("/proc/self/fd/{}", file.as_raw_fd()).into()
    }

    #[cfg(not(target_os = "linux"))]
    fn file_path(_file: &std::fs::File) -> PathBuf {
        unimplemented!()
    }

    fn active_path(&self) -> PathBuf {
        Self::file_path(&self.active)
    }

    fn inactive_path(&self) -> PathBuf {
        Self::file_path(&self.inactive)
    }

    fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
        // Empty the file, or create it for the first time if the profiler hasn't
        // started yet.
        File::create(Self::file_path(&self.inactive))?;
        tracing::debug!(message = "emptied the file");
        Ok(())
    }
}

/// Options for configuring the async-profiler behavior.
/// Currently supports:
/// - Native memory allocation tracking
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct ProfilerOptions {
    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    /// For example, `"10m"` will sample an allocation for every
    /// 10 megabytes of memory allocated. Passing `"0"` will sample
    /// all allocations.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    pub native_mem: Option<String>,
    cpu_interval: Option<u128>,
    wall_clock_millis: Option<u128>,
}

const DEFAULT_CPU_INTERVAL_NANOS: u128 = 100_000_000;
const DEFAULT_WALL_CLOCK_INTERVAL_MILLIS: u128 = 1_000;

impl ProfilerOptions {
    /// Convert the profiler options to a string of arguments for the async-profiler.
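    ///
    /// ### Example
    ///
    /// For illustration, with the default CPU and wall-clock intervals and a
    /// 10-megabyte native memory sampling interval, the produced argument
    /// string looks like this:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::ProfilerOptionsBuilder;
    /// # use std::path::Path;
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
    /// let args = opts.to_args_string(Path::new("/tmp/profile.jfr"));
    /// assert_eq!(
    ///     args,
    ///     "start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf,file=/tmp/profile.jfr,nativemem=10m"
    /// );
    /// ```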
    pub fn to_args_string(&self, jfr_file_path: &std::path::Path) -> String {
        let mut args = format!(
            "start,event=cpu,interval={},wall={}ms,jfr,cstack=dwarf,file={}",
            self.cpu_interval.unwrap_or(DEFAULT_CPU_INTERVAL_NANOS),
            self.wall_clock_millis
                .unwrap_or(DEFAULT_WALL_CLOCK_INTERVAL_MILLIS),
            jfr_file_path.display()
        );
        if let Some(ref native_mem) = self.native_mem {
            args.push_str(&format!(",nativemem={native_mem}"));
        }
        args
    }
}

/// Builder for [`ProfilerOptions`].
#[derive(Debug, Default)]
pub struct ProfilerOptionsBuilder {
    native_mem: Option<String>,
    cpu_interval: Option<u128>,
    wall_clock_millis: Option<u128>,
}

impl ProfilerOptionsBuilder {
    /// Same as [ProfilerOptionsBuilder::with_native_mem_bytes], but passes
    /// the string directly to async-profiler.
    ///
    /// The value is the interval in bytes or in other units,
    /// if followed by k (kilobytes), m (megabytes), or g (gigabytes).
    ///
    /// Prefer using [ProfilerOptionsBuilder::with_native_mem_bytes], since it's
    /// type-checked.
    ///
    /// ### Examples
    ///
    /// This will sample an allocation for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_native_mem(mut self, native_mem_interval: String) -> Self {
        self.native_mem = Some(native_mem_interval);
        self
    }

    /// If set, the profiler will collect information about
    /// native memory allocations.
    ///
    /// The argument is the sampling interval in bytes: the profiler will
    /// sample approximately one allocation for every `native_mem_interval`
    /// bytes allocated.
    ///
    /// See [ProfilingModes in the async-profiler docs] for more details.
    ///
    /// [ProfilingModes in the async-profiler docs]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilingModes.md#native-memory-leaks
    ///
    /// ### Examples
    ///
    /// This will sample an allocation for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(10_000_000).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// This will sample every allocation (potentially slow):
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem_bytes(0).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_native_mem_bytes(mut self, native_mem_interval: usize) -> Self {
        self.native_mem = Some(native_mem_interval.to_string());
        self
    }

    /// Sets the interval at which the profiler will collect
    /// CPU-time samples, via the [async-profiler `interval` option].
    ///
    /// CPU-time samples (JFR `jdk.ExecutionSample`) sample only threads that
    /// are currently running on a CPU, not threads that are sleeping.
    ///
    /// It can use a higher frequency than wall-clock sampling since the
    /// number of threads that are running on a CPU at a given time is
    /// naturally limited by the number of CPUs, while the number of sleeping
    /// threads can be much larger.
    ///
    /// The default is to take a CPU-time sample every 100 milliseconds.
    ///
    /// The async-profiler agent collects both CPU time and wall-clock time
    /// samples, so this function should normally be used along with
    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
    ///
    /// [async-profiler `interval` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
    ///
    /// ### Examples
    ///
    /// This will take a sample every 10 milliseconds of CPU time (when running)
    /// and every 100 wall-clock milliseconds (running or sleeping):
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default()
    ///     .with_cpu_interval(Duration::from_millis(10))
    ///     .with_wall_clock_interval(Duration::from_millis(100))
    ///     .build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_cpu_interval(mut self, cpu_interval: Duration) -> Self {
        self.cpu_interval = Some(cpu_interval.as_nanos());
        self
    }

    /// Sets the interval at which the profiler will collect
    /// wall-clock samples, via the [async-profiler `wall` option].
    ///
    /// Wall-clock samples (JFR `profiler.WallClockSample`) sample threads
    /// whether they are sleeping or running, and can therefore be
    /// very useful for finding threads that are blocked, for example
    /// on a synchronous lock or a slow system call.
    ///
    /// When using Tokio, since tasks are not threads, tasks that are not
    /// currently running will not be picked up by a wall-clock sample. However,
    /// wall-clock sampling is still very useful with Tokio, since it is what
    /// lets you catch tasks that are blocking a thread by waiting on
    /// synchronous operations.
    ///
    /// The default is to take a wall-clock sample every second.
    ///
    /// The async-profiler agent collects both CPU time and wall-clock time
    /// samples, so this function should normally be used along with
    /// [ProfilerOptionsBuilder::with_cpu_interval].
    ///
    /// [async-profiler `wall` option]: https://github.com/async-profiler/async-profiler/blob/v4.0/docs/ProfilerOptions.md#options-applicable-to-any-output-format
    ///
    /// ### Examples
    ///
    /// This will take a sample every 10 milliseconds of CPU time (when running)
    /// and every 10 wall-clock milliseconds (running or sleeping):
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # use std::time::Duration;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default()
    ///     .with_cpu_interval(Duration::from_millis(10))
    ///     .with_wall_clock_interval(Duration::from_millis(10))
    ///     .build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_wall_clock_interval(mut self, wall_clock: Duration) -> Self {
        self.wall_clock_millis = Some(wall_clock.as_millis());
        self
    }

    /// Build the [`ProfilerOptions`] from the builder.
    pub fn build(self) -> ProfilerOptions {
        ProfilerOptions {
            native_mem: self.native_mem,
            wall_clock_millis: self.wall_clock_millis,
            cpu_interval: self.cpu_interval,
        }
    }
}

/// Builds a [`Profiler`], panicking if any required fields were not set by the
/// time `build` is called.
#[derive(Debug, Default)]
pub struct ProfilerBuilder {
    reporting_interval: Option<Duration>,
    reporter: Option<Box<dyn Reporter + Send + Sync>>,
    agent_metadata: Option<AgentMetadata>,
    profiler_options: Option<ProfilerOptions>,
}

impl ProfilerBuilder {
    /// Sets the reporting interval (default: 30 seconds).
    ///
    /// This is the interval at which samples are *reported* to the backend,
    /// and is unrelated to the interval at which the application
    /// is *sampled* by async-profiler, which is controlled by
    /// [ProfilerOptionsBuilder::with_cpu_interval] and
    /// [ProfilerOptionsBuilder::with_wall_clock_interval].
    ///
    /// Most users should not change this setting.
    ///
    /// ## Example
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// # use std::path::PathBuf;
    /// # use std::time::Duration;
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # let path = PathBuf::from(".");
    /// let agent = ProfilerBuilder::default()
    ///     .with_local_reporter(path)
    ///     .with_reporting_interval(Duration::from_secs(15))
    ///     .build()
    ///     .spawn_controllable()?;
    /// // .. your program goes here
    /// agent.stop().await; // make sure the last profile is flushed
    /// # Ok::<_, SpawnError>(())
    /// # }
    /// ```
    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
        self.reporting_interval = Some(i);
        self
    }

    /// Sets the [`Reporter`], which is used to upload the collected profiling
    /// data. Common reporters are [`LocalReporter`], and, with the `s3-no-defaults`
    /// feature enabled,
    #[cfg_attr(not(feature = "s3-no-defaults"), doc = "`S3Reporter`.")]
    #[cfg_attr(feature = "s3-no-defaults", doc = "[`S3Reporter`].")]
    /// It is also possible to write your own [`Reporter`].
    ///
    /// It's normally easier to use [`LocalReporter`] directly via
    /// [`ProfilerBuilder::with_local_reporter`].
    ///
    /// If you want to output to multiple reporters, you can use
    /// [`MultiReporter`].
    ///
    /// [`LocalReporter`]: crate::reporter::local::LocalReporter
    /// [`MultiReporter`]: crate::reporter::multi::MultiReporter
    #[cfg_attr(
        feature = "s3-no-defaults",
        doc = "[`S3Reporter`]: crate::reporter::s3::S3Reporter"
    )]
    ///
    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example.md"))]
    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
        self.reporter = Some(Box::new(r));
        self
    }

    /// Sets the profiler to use [LocalReporter], which will write `.jfr` files to `path`,
    /// and disables metadata auto-detection (see [`ProfilerBuilder::with_custom_agent_metadata`])
    /// since the [LocalReporter] does not need that.
    ///
    /// This is useful for testing, since metadata auto-detection currently only works
    /// on [Amazon EC2] or [Amazon Fargate] instances.
    ///
    /// The local reporter should normally not be used in production, since it will
    /// not clean up JFR files. Instead, you can use a pre-existing [`Reporter`]
    /// or write your own (see [`ProfilerBuilder::with_reporter`]).
    ///
    /// [Amazon EC2]: https://aws.amazon.com/ec2
    /// [Amazon Fargate]: https://aws.amazon.com/fargate
    ///
    /// ## Example
    ///
    /// This will write profiles as `.jfr` files to `./path-to-profiles`:
    ///
    /// ```no_run
    /// # use std::path::PathBuf;
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # use async_profiler_agent::reporter::local::LocalReporter;
    /// # use async_profiler_agent::metadata::AgentMetadata;
    /// let path = PathBuf::from("./path-to-profiles");
    /// let agent = ProfilerBuilder::default()
    ///     .with_local_reporter(path)
    ///     .build()
    ///     .spawn()?;
    /// # Ok::<_, SpawnError>(())
    /// ```
    pub fn with_local_reporter(mut self, path: impl Into<PathBuf>) -> ProfilerBuilder {
        self.reporter = Some(Box::new(LocalReporter::new(path.into())));
        self.with_custom_agent_metadata(AgentMetadata::NoMetadata)
    }

    /// Provide custom agent metadata.
    ///
    /// The async-profiler Rust agent sends metadata to the [Reporter] with
    /// the identity of the current host and process, which is normally
    /// transmitted as `metadata.json` within the generated `.zip` file,
    /// using the schema format [`reporter::s3::MetadataJson`].
    ///
    /// That metadata can later be used by tooling to sort
    /// profiling reports by host.
    ///
    /// The async-profiler Rust agent will by default try to fetch the metadata
    /// using [IMDS] when running on [Amazon EC2] or [Amazon Fargate], and
    /// will error if it's unable to find it. If you are running the
    /// async-profiler agent on any other form of compute,
    /// you will need to create and attach your own metadata
    /// by calling this function.
    ///
    #[cfg_attr(feature = "s3-no-defaults", doc = include_str!("s3-example-custom-metadata.md"))]
    /// [`reporter::s3::MetadataJson`]: crate::reporter::s3::MetadataJson
    /// [Amazon EC2]: https://aws.amazon.com/ec2
    /// [Amazon Fargate]: https://aws.amazon.com/fargate
    /// [IMDS]: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html
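    ///
    /// ### Example
    ///
    /// A minimal sketch that disables metadata auto-detection by attaching
    /// [AgentMetadata::NoMetadata] explicitly (this is the same thing
    /// [`ProfilerBuilder::with_local_reporter`] already does for you):
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::ProfilerBuilder;
    /// # use async_profiler_agent::metadata::AgentMetadata;
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .with_custom_agent_metadata(AgentMetadata::NoMetadata)
    ///     .build();
    /// ```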
    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
        self.agent_metadata = Some(j);
        self
    }

    /// Provide custom profiler options.
    ///
    /// ### Example
    ///
    /// This will sample an allocation for every 10 megabytes allocated:
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, ProfilerOptionsBuilder};
    /// # use async_profiler_agent::profiler::SpawnError;
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let opts = ProfilerOptionsBuilder::default().with_native_mem("10m".into()).build();
    /// let profiler = ProfilerBuilder::default()
    ///     .with_profiler_options(opts)
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// let profiler = profiler.spawn_controllable()?;
    /// // ... your program goes here
    /// profiler.stop().await; // make sure the last profile is flushed
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_profiler_options(mut self, c: ProfilerOptions) -> ProfilerBuilder {
        self.profiler_options = Some(c);
        self
    }

    /// Turn this builder into a profiler!
    ///
    /// Panics if no [`Reporter`] was configured; see [`ProfilerBuilder::with_reporter`]
    /// and [`ProfilerBuilder::with_local_reporter`].
    pub fn build(self) -> Profiler {
        Profiler {
            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
            reporter: self.reporter.expect("reporter is required"),
            agent_metadata: self.agent_metadata,
            profiler_options: self.profiler_options.unwrap_or_default(),
        }
    }
}

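/// Whether async-profiler is currently writing into the active JFR file.
///
/// `Starting` marks the window in which the start command has been issued but
/// not yet confirmed; the drop glue treats this state conservatively and leaks
/// the JFR files instead of deleting them (see [`ProfilerState`]'s `Drop` impl).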
enum Status {
    Idle,
    Starting,
    Running(SystemTime),
}

/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
/// of the state of the profiler. The primary benefit of this is RAII: when
/// this type drops, it will stop the profiler if it's running.
struct ProfilerState<E: ProfilerEngine> {
    // this is only None in the destructor when stopping the async-profiler fails
    jfr_file: Option<JfrFile>,
    asprof: E,
    status: Status,
    profiler_options: ProfilerOptions,
}

impl<E: ProfilerEngine> ProfilerState<E> {
    fn new(asprof: E, profiler_options: ProfilerOptions) -> Result<Self, io::Error> {
        Ok(Self {
            jfr_file: Some(JfrFile::new()?),
            asprof,
            status: Status::Idle,
            profiler_options,
        })
    }

    fn jfr_file_mut(&mut self) -> &mut JfrFile {
        self.jfr_file.as_mut().unwrap()
    }

    async fn start(&mut self) -> Result<(), AsProfError> {
        let active = self.jfr_file.as_ref().unwrap().active_path();
        // From this point on the profiler might have started writing to the file,
        // so the drop glue must leak the JFR files rather than delete them if
        // stopping fails (see the Drop impl below).
        self.status = Status::Starting;
        E::start_async_profiler(&self.asprof, &active, &self.profiler_options)?;
        self.status = Status::Running(SystemTime::now());
        Ok(())
    }

    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
        E::stop_async_profiler()?;
        let status = std::mem::replace(&mut self.status, Status::Idle);
        Ok(match status {
            Status::Idle | Status::Starting => None,
            Status::Running(since) => Some(since),
        })
    }

    fn is_started(&self) -> bool {
        matches!(self.status, Status::Running(_))
    }
}

impl<E: ProfilerEngine> Drop for ProfilerState<E> {
    fn drop(&mut self) {
        match self.status {
            Status::Running(_) => {
                if let Err(err) = self.stop() {
                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
                    // to avoid symlink races
                    std::mem::forget(self.jfr_file.take());
                    // XXX: Rust defines leaking resources during drop as safe.
                    tracing::warn!(?err, "unable to stop profiler during drop glue");
                }
            }
            Status::Idle => {}
            Status::Starting => {
                // SECURITY: avoid removing the JFR file if stopping the profiler fails,
                // to avoid symlink races
                std::mem::forget(self.jfr_file.take());
            }
        }
    }
}

pub(crate) trait ProfilerEngine: Send + Sync + 'static {
    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
    fn start_async_profiler(
        &self,
        jfr_file_path: &Path,
        options: &ProfilerOptions,
    ) -> Result<(), asprof::AsProfError>;
    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}

#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
    #[error(transparent)]
    AsProf(#[from] AsProfError),
    #[error(transparent)]
    #[cfg(feature = "aws-metadata-no-defaults")]
    Metadata(#[from] crate::metadata::aws::AwsProfilerMetadataError),
    #[error("reporter: {0}")]
    Reporter(Box<dyn std::error::Error + Send>),
    #[error("broken clock: {0}")]
    BrokenClock(#[from] SystemTimeError),
    #[error("jfr read error: {0}")]
    JfrRead(io::Error),
    #[error("empty inactive file error: {0}")]
    EmptyInactiveFile(io::Error),
}

#[derive(Debug, Error)]
#[non_exhaustive]
/// An error that happened spawning a profiler
pub enum SpawnError {
    /// Error from async-profiler
    #[error(transparent)]
    AsProf(#[from] asprof::AsProfError),
    /// Error writing to a tempfile
    #[error("tempfile error")]
    TempFile(#[source] io::Error),
}

#[derive(Debug, Error)]
#[non_exhaustive]
/// An error from [`Profiler::spawn_thread`]
pub enum SpawnThreadError {
    /// Error from async-profiler
    #[error(transparent)]
    AsProf(#[from] SpawnError),
    /// Error constructing Tokio runtime
    #[error("constructing Tokio runtime")]
    ConstructRt(#[source] io::Error),
}

// No control messages currently; the channel below is only used so that
// dropping the sender signals the profiler task to flush and stop.
enum Control {}

/// A handle to a running profiler
///
/// Currently just allows for stopping the profiler.
///
/// Dropping this handle will request that the profiler stop.
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfiler {
    stop_channel: tokio::sync::oneshot::Sender<Control>,
    join_handle: tokio::task::JoinHandle<()>,
}

impl RunningProfiler {
    /// Request that the current profiler stops and wait until it exits.
    ///
    /// This will cause the currently-pending profile information to be flushed.
    ///
    /// After this function returns, it is correct and safe to [spawn] a new
    /// [Profiler], possibly with a different configuration. Therefore,
    /// this function can be used to "reconfigure" a profiler by stopping
    /// it and then starting a new one with a different configuration.
    ///
    /// [spawn]: Profiler::spawn_controllable
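    ///
    /// ### Example
    ///
    /// A minimal sketch of reconfiguring at runtime by stopping the current
    /// profiler and spawning a new one (the paths here are only placeholders):
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build()
    ///     .spawn_controllable()?;
    /// // ... later, stop the running profiler (flushing the last profile) ...
    /// profiler.stop().await;
    /// // ... and spawn a freshly-configured one.
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/other-profiles")
    ///     .build()
    ///     .spawn_controllable()?;
    /// # drop(profiler);
    /// # Ok(())
    /// # }
    /// ```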
    pub async fn stop(self) {
        drop(self.stop_channel);
        let _ = self.join_handle.await;
    }

    /// Like [Self::detach], but returns a JoinHandle. This is currently not a public API.
    fn detach_inner(self) -> tokio::task::JoinHandle<()> {
        tokio::task::spawn(async move {
            // move the control channel to the spawned task. this way, it will be dropped
            // just when the task is aborted.
            let _abort_channel = self.stop_channel;
            self.join_handle.await.ok();
        })
    }

    /// Detach this profiler. This will prevent the profiler from being stopped
    /// when this handle is dropped. You should call this (or [Profiler::spawn]
    /// instead of [Profiler::spawn_controllable], which does the same thing)
    /// if you don't intend to reconfigure your profiler at runtime.
    pub fn detach(self) {
        self.detach_inner();
    }

    /// Spawns a thread (via `spawn_fn`) that drives the given Tokio runtime until
    /// the profiler task finishes, and returns a [RunningProfilerThread] attached to it.
    fn spawn_attached(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) -> RunningProfilerThread {
        RunningProfilerThread {
            stop_channel: self.stop_channel,
            join_handle: spawn_fn(Box::new(move || {
                let _ = runtime.block_on(self.join_handle);
            })),
        }
    }

    /// Spawns a thread (via `spawn_fn`) that drives the given Tokio runtime until
    /// the profiler task finishes, keeping the profiler detached (the stop channel
    /// is moved into that thread).
    fn spawn_detached(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) {
        spawn_fn(Box::new(move || {
            let _stop_channel = self.stop_channel;
            let _ = runtime.block_on(self.join_handle);
        }));
    }
}

/// A handle to a running profiler, running on a separate thread.
///
/// Currently just allows for stopping the profiler.
///
/// Dropping this handle will request that the profiler stop.
#[must_use = "dropping this stops the profiler, call .detach() to detach"]
pub struct RunningProfilerThread {
    stop_channel: tokio::sync::oneshot::Sender<Control>,
    join_handle: std::thread::JoinHandle<()>,
}

impl RunningProfilerThread {
    /// Request that the current profiler stops and wait until it exits.
    ///
    /// This will cause the currently-pending profile information to be flushed.
    ///
    /// After this function returns, it is correct and safe to [spawn] a new
    /// [Profiler], possibly with a different configuration. Therefore,
    /// this function can be used to "reconfigure" a profiler by stopping
    /// it and then starting a new one with a different configuration.
    ///
    /// [spawn]: Profiler::spawn_controllable
    pub fn stop(self) {
        drop(self.stop_channel);
        let _ = self.join_handle.join();
    }
}

/// Rust profiler based on [async-profiler].
///
/// Spawning a profiler can be done either in an attached (controllable)
/// mode, which allows for stopping the profiler (and, in fact, stops
/// it when the relevant handle is dropped), or in detached mode,
/// in which the profiler keeps running forever. Applications that can
/// shut down the profiler at run-time, for example applications that
/// support reconfiguration of a running profiler, generally want to use
/// controllable mode. Other applications (most of them) should use
/// detached mode.
///
/// In addition, the profiler can either be spawned into the current Tokio
/// runtime, or into a new one. Normally, applications should spawn
/// the profiler into their own Tokio runtime, but applications that
/// don't have a default Tokio runtime should spawn it into a
/// separate, dedicated one.
///
/// This leaves 4 functions:
/// 1. [Self::spawn] - detached, same runtime
/// 2. [Self::spawn_thread_to_runtime] - detached, different runtime
/// 3. [Self::spawn_controllable] - controllable, same runtime
/// 4. [Self::spawn_controllable_thread_to_runtime] - controllable, different runtime
///
/// In addition, there's a helper function that just spawns the profiler
/// to a new runtime in a new thread, for applications that don't have
/// a Tokio runtime and don't need complex control:
///
/// 5. [Self::spawn_thread] - detached, new runtime in a new thread
///
/// [async-profiler]: https://github.com/async-profiler/async-profiler
pub struct Profiler {
    reporting_interval: Duration,
    reporter: Box<dyn Reporter + Send + Sync>,
    agent_metadata: Option<AgentMetadata>,
    profiler_options: ProfilerOptions,
}

impl Profiler {
    /// Start profiling. The profiler will run in a Tokio task at the configured interval.
    ///
    /// This is the same as calling [Profiler::spawn_controllable] followed by
    /// [RunningProfiler::detach], except it returns a [JoinHandle].
    ///
    /// The returned [JoinHandle] can be used to detect if the profiler has exited
    /// due to a fatal error.
    ///
    /// This function will fail if it is unable to start async-profiler, for example
    /// if it can't find or load `libasyncProfiler.so`.
    ///
    /// [JoinHandle]: tokio::task::JoinHandle
    ///
    /// ### Uploading the last sample
    ///
    /// When you return from the Tokio `main`, the agent will terminate without waiting
    /// for the last profiling JFR to be uploaded. Especially if you have a
    /// short-running program, if you want to ensure the last profiling JFR
    /// is uploaded, you should use [Profiler::spawn_controllable] and
    /// [RunningProfiler::stop], which allows waiting for the upload
    /// to finish.
    ///
    /// If you do not care about losing the last sample, it is fine to directly
    /// return from the Tokio `main` without stopping the profiler.
    ///
    /// ### Tokio Runtime
    ///
    /// This function must be run within a Tokio runtime, otherwise it will panic. If
    /// your application does not have a `main` Tokio runtime, see
    /// [Profiler::spawn_thread].
    ///
    /// ### Example
    ///
    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
    ///
    /// ```
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    /// # if false { // don't spawn the profiler in doctests
    /// profiler.spawn()?;
    /// # }
    /// # Ok(())
    /// # }
    /// ```
    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
        self.spawn_controllable().map(RunningProfiler::detach_inner)
    }

    /// Like [Self::spawn], but instead of spawning within the current Tokio
    /// runtime, spawns within a Tokio runtime you provide and then runs a thread that calls
    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
    ///
    /// If your configuration is standard, use [Profiler::spawn_thread].
    ///
    /// If you want to be able to stop the resulting profiler, use
    /// [Profiler::spawn_controllable_thread_to_runtime].
    ///
    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
    /// allow for configuring thread properties, for example thread names).
    ///
    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
    ///
    /// ### Uploading the last sample
    ///
    /// When you return from `main`, the agent will terminate without waiting
    /// for the last profiling JFR to be uploaded. Especially if you have a
    /// short-running program, if you want to ensure the last profiling JFR
    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
    /// and [RunningProfilerThread::stop], which allows waiting for the upload
    /// to finish.
    ///
    /// If you do not care about losing the last sample, it is fine to directly
    /// return from the Tokio `main` without stopping the profiler.
    ///
    /// ### Example
    ///
    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// let rt = tokio::runtime::Builder::new_current_thread()
    ///     .enable_all()
    ///     .build()?;
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    ///
    /// profiler.spawn_thread_to_runtime(
    ///     rt,
    ///     |t| {
    ///         std::thread::Builder::new()
    ///             .name("asprof-agent".to_owned())
    ///             .spawn(t)
    ///             .expect("thread name contains nuls")
    ///     }
    /// )?;
    /// # Ok::<_, anyhow::Error>(())
    /// ```
    pub fn spawn_thread_to_runtime(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) -> Result<(), SpawnError> {
        self.spawn_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
    }

    /// Like [Self::spawn], but instead of spawning within the current Tokio
    /// runtime, spawns within a new Tokio runtime and then runs a thread that calls
    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime, setting up the runtime
    /// by itself.
    ///
    /// If your configuration is less standard, use [Profiler::spawn_thread_to_runtime]. Calling
    /// [Profiler::spawn_thread] is equivalent to calling [Profiler::spawn_thread_to_runtime]
    /// with the following:
    /// 1. a current thread runtime with background worker threads (these exist
    ///    for blocking IO) named "asprof-worker"
    /// 2. a controller thread (the "main" thread of the runtime) named "asprof-agent"
    ///
    /// If you want to be able to stop the resulting profiler, use
    /// [Profiler::spawn_controllable_thread_to_runtime].
    ///
    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
    ///
    /// ### Uploading the last sample
    ///
    /// When you return from `main`, the agent will terminate without waiting
    /// for the last profiling JFR to be uploaded. Especially if you have a
    /// short-running program, if you want to ensure the last profiling JFR
    /// is uploaded, you should use [Profiler::spawn_controllable_thread_to_runtime]
    /// and [RunningProfilerThread::stop], which allows waiting for the upload
    /// to finish.
    ///
    /// If you do not care about losing the last sample, it is fine to directly
    /// return from the Tokio `main` without stopping the profiler.
    ///
    /// ### Example
    ///
    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # use async_profiler_agent::reporter::local::LocalReporter;
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    ///
    /// profiler.spawn_thread()?;
    /// # Ok::<_, anyhow::Error>(())
    /// ```
    pub fn spawn_thread(self) -> Result<(), SpawnThreadError> {
        // using "asprof" in thread name to deal with 15 character + \0 length limit
        let rt = tokio::runtime::Builder::new_current_thread()
            .thread_name("asprof-worker".to_owned())
            .enable_all()
            .build()
            .map_err(SpawnThreadError::ConstructRt)?;
        let builder = std::thread::Builder::new().name("asprof-agent".to_owned());
        self.spawn_thread_to_runtime(rt, |t| builder.spawn(t).expect("thread name contains nuls"))
            .map_err(SpawnThreadError::AsProf)
    }

    fn spawn_thread_inner<E: ProfilerEngine>(
        self,
        asprof: E,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) -> Result<(), SpawnError> {
        let handle: RunningProfiler = runtime.block_on(async move { self.spawn_inner(asprof) })?;
        handle.spawn_detached(runtime, spawn_fn);
        Ok(())
    }

    /// Like [Self::spawn], but returns a [RunningProfiler] that allows for controlling
    /// (currently only stopping) the profiler.
    ///
    /// This allows for changing the configuration of the profiler at runtime, by
    /// stopping it and then starting a new Profiler with a new configuration. It
    /// also allows for stopping profiling in case the profiler is suspected to
    /// cause operational issues.
    ///
    /// Dropping the returned [RunningProfiler] will cause the profiler to quit,
    /// so if your application doesn't need to change the profiler's configuration at runtime,
    /// it will be easier to use [Profiler::spawn].
    ///
    /// This function will fail if it is unable to start async-profiler, for example
    /// if it can't find or load `libasyncProfiler.so`.
    ///
    /// ### Uploading the last sample
    ///
    /// When you return from the Tokio `main`, the agent will terminate without waiting
    /// for the last profiling JFR to be uploaded. Especially if you have a
    /// short-running program, if you want to ensure the last profiling JFR
    /// is uploaded, you should use [RunningProfiler::stop], which allows waiting for
    /// the upload to finish.
    ///
    /// If you do not care about losing the last sample, it is fine to directly
    /// return from the Tokio `main` without stopping the profiler.
    ///
    /// ### Tokio Runtime
    ///
    /// This function must be run within a Tokio runtime, otherwise it will panic. If
    /// your application does not have a `main` Tokio runtime, see
    /// [Profiler::spawn_controllable_thread_to_runtime].
    ///
    /// ### Example
    ///
    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), SpawnError> {
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    ///
    /// let profiler = profiler.spawn_controllable()?;
    ///
    /// // [insert your signaling/monitoring mechanism to have a request to disable
    /// // profiling in case of a problem]
    /// let got_request_to_disable_profiling = async move {
    ///     // ...
    /// # false
    /// };
    /// // spawn a task that will disable profiling if requested
    /// tokio::task::spawn(async move {
    ///     if got_request_to_disable_profiling.await {
    ///         profiler.stop().await;
    ///     }
    /// });
    /// # Ok(())
    /// # }
    /// ```
    pub fn spawn_controllable(self) -> Result<RunningProfiler, SpawnError> {
        self.spawn_inner(asprof::AsProf::builder().build())
    }

    /// Like [Self::spawn_controllable], but instead of spawning within the current Tokio
    /// runtime, spawns within a Tokio runtime you provide and then runs a thread that calls
    /// [block_on](tokio::runtime::Runtime::block_on) on that runtime.
    ///
    /// `spawn_fn` should be [`std::thread::spawn`], or some function that behaves like it (to
    /// allow for configuring thread properties, for example thread names).
    ///
    /// This is to be used when your program does not have a "main" Tokio runtime already set up.
    ///
    /// ### Uploading the last sample
    ///
    /// When you return from `main`, the agent will terminate without waiting
    /// for the last profiling JFR to be uploaded. Especially if you have a
    /// short-running program, if you want to ensure the last profiling JFR
    /// is uploaded, you should use [RunningProfilerThread::stop], which allows waiting
    /// for the upload to finish.
    ///
    /// If you do not care about losing the last sample, it is fine to directly
    /// return from the Tokio `main` without stopping the profiler.
    ///
    /// ### Example
    ///
    /// This example uses [ProfilerBuilder::with_local_reporter] which reports the profiles to
    /// a directory. It works with any other [Reporter] using [ProfilerBuilder::with_reporter].
    ///
    /// ```no_run
    /// # use async_profiler_agent::profiler::{ProfilerBuilder, SpawnError};
    /// let rt = tokio::runtime::Builder::new_current_thread()
    ///     .enable_all()
    ///     .build()?;
    /// let profiler = ProfilerBuilder::default()
    ///     .with_local_reporter("/tmp/profiles")
    ///     .build();
    ///
    /// let profiler = profiler.spawn_controllable_thread_to_runtime(
    ///     rt,
    ///     |t| {
    ///         std::thread::Builder::new()
    ///             .name("asprof-agent".to_owned())
    ///             .spawn(t)
    ///             .expect("thread name contains nuls")
    ///     }
    /// )?;
    ///
    /// # fn got_request_to_disable_profiling() -> bool { false }
    /// // spawn a task that will disable profiling if requested
    /// std::thread::spawn(move || {
    ///     if got_request_to_disable_profiling() {
    ///         profiler.stop();
    ///     }
    /// });
    /// # Ok::<_, anyhow::Error>(())
    /// ```
    pub fn spawn_controllable_thread_to_runtime(
        self,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) -> Result<RunningProfilerThread, SpawnError> {
        self.spawn_controllable_thread_inner(asprof::AsProf::builder().build(), runtime, spawn_fn)
    }

    fn spawn_controllable_thread_inner<E: ProfilerEngine>(
        self,
        asprof: E,
        runtime: tokio::runtime::Runtime,
        spawn_fn: impl FnOnce(Box<dyn FnOnce() + Send>) -> std::thread::JoinHandle<()>,
    ) -> Result<RunningProfilerThread, SpawnError> {
        let handle = runtime.block_on(async move { self.spawn_inner(asprof) })?;
        Ok(handle.spawn_attached(runtime, spawn_fn))
    }

    fn spawn_inner<E: ProfilerEngine>(self, asprof: E) -> Result<RunningProfiler, SpawnError> {
        struct LogOnDrop;
        impl Drop for LogOnDrop {
            fn drop(&mut self) {
                // Tokio will call destructors during runtime shutdown. Have something that will
                // emit a log in that case
                tracing::info!(
                    "unable to upload the last jfr due to Tokio runtime shutdown. \
                    Add a call to `RunningProfiler::stop` to wait for jfr upload to finish."
                );
            }
        }

        // Initialize async profiler - needs to be done once.
        E::init_async_profiler()?;
        tracing::info!("successfully initialized async profiler.");

        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
        let (stop_channel, mut stop_rx) = tokio::sync::oneshot::channel();

        // Get profiles at the configured interval rate.
        let join_handle = tokio::spawn(async move {
            let mut state = match ProfilerState::new(asprof, self.profiler_options) {
                Ok(state) => state,
                Err(err) => {
                    tracing::error!(?err, "unable to create profiler state");
                    return;
                }
            };

            // Lazily-loaded if not specified up front.
            let mut agent_metadata = self.agent_metadata;
            let mut done = false;

            let guard = LogOnDrop;
            while !done {
                // Wait until a timer or exit event
                tokio::select! {
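                    // `biased` polls the branches in declaration order, so a
                    // pending stop request is always observed before another
                    // sampling tick.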
                    biased;

                    r = &mut stop_rx, if !stop_rx.is_terminated() => {
                        match r {
                            Err(_) => {
                                tracing::info!("profiler stop requested, doing a final tick");
                                done = true;
                            }
                        }
                    }
                    _ = sampling_ticker.tick() => {
                        tracing::debug!("profiler timer woke up");
                    }
                }

                if let Err(err) = profiler_tick(
                    &mut state,
                    &mut agent_metadata,
                    &*self.reporter,
                    self.reporting_interval,
                )
                .await
                {
                    match &err {
                        TickError::Reporter(_) => {
                            // don't stop on IO errors
                            tracing::error!(?err, "error during profiling, continuing");
                        }
                        _stop => {
                            tracing::error!(?err, "error during profiling, stopping");
                            break;
                        }
                    }
                }
            }

            // Normal exit: disarm the log-on-drop guard so we don't warn about
            // a lost JFR when the task finishes cleanly.
            mem::forget(guard);
            tracing::info!("profiling task finished");
        });

        Ok(RunningProfiler {
            stop_channel,
            join_handle,
        })
    }
}

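/// A single reporting tick.
///
/// The first tick only starts the profiler. Every subsequent tick stops it,
/// swaps the active/inactive JFR files, immediately restarts profiling into
/// the fresh file, and then uploads the contents of the now-inactive file
/// (together with the report metadata) through the given [Reporter].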
async fn profiler_tick<E: ProfilerEngine>(
    state: &mut ProfilerState<E>,
    agent_metadata: &mut Option<AgentMetadata>,
    reporter: &(dyn Reporter + Send + Sync),
    reporting_interval: Duration,
) -> Result<(), TickError> {
    if !state.is_started() {
        state.start().await?;
        return Ok(());
    }

    let Some(start) = state.stop()? else {
        tracing::warn!("stopped the profiler but it wasn't running?");
        return Ok(());
    };
    let start = start.duration_since(UNIX_EPOCH)?;
    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;

    // Start it up immediately, writing to the "other" file, so that we keep
    // profiling the application while we're reporting data.
    state
        .jfr_file_mut()
        .empty_inactive_file()
        .map_err(TickError::EmptyInactiveFile)?;
    state.jfr_file_mut().swap();
    state.start().await?;

    // Lazily load the agent metadata if it was not provided in
    // the constructor. See the struct comments for why this is.
    // This code runs at most once.
    if agent_metadata.is_none() {
        #[cfg(feature = "aws-metadata-no-defaults")]
        let md = crate::metadata::aws::load_agent_metadata().await?;
        #[cfg(not(feature = "aws-metadata-no-defaults"))]
        let md = crate::metadata::AgentMetadata::NoMetadata;
        tracing::debug!("loaded metadata");
        agent_metadata.replace(md);
    }

    let report_metadata = ReportMetadata {
        instance: agent_metadata.as_ref().unwrap(),
        start,
        end,
        reporting_interval,
    };

    let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
        .await
        .map_err(TickError::JfrRead)?;

    reporter
        .report(jfr, &report_metadata)
        .await
        .map_err(TickError::Reporter)?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{self, AtomicBool, AtomicU32};

    use test_case::test_case;

    use super::*;

    #[test]
    fn test_jfr_file_drop() {
        let mut jfr = JfrFile::new().unwrap();

        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
        jfr.swap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
        jfr.empty_inactive_file().unwrap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
    }

    struct MockProfilerEngine {
        counter: AtomicU32,
    }
    impl ProfilerEngine for MockProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(
            &self,
            jfr_file_path: &Path,
            _options: &ProfilerOptions,
        ) -> Result<(), asprof::AsProfError> {
            let contents = format!(
                "JFR{}",
                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
            );
            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
            Ok(())
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }
    }

    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
    impl std::fmt::Debug for MockReporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("MockReporter").finish()
        }
    }

    #[async_trait::async_trait]
    impl Reporter for MockReporter {
        async fn report(
            &self,
            jfr: Vec<u8>,
            metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            self.0
                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
                .await
                .unwrap();
            Ok(())
        }
    }

    fn make_mock_profiler() -> (
        Profiler,
        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let agent = ProfilerBuilder::default()
            .with_reporter(MockReporter(tx))
            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
                aws_account_id: "0".into(),
                aws_region_id: "us-east-1".into(),
                ec2_instance_id: "i-fake".into(),
                ec2_instance_type: "t3.micro".into(),
            })
            .build();
        (agent, rx)
    }

    #[tokio::test(start_paused = true)]
    async fn test_profiler_agent() {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
            ec2_instance_type: "t3.micro".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap()
            .detach();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
    }

    #[test_case(false; "uncontrollable")]
    #[test_case(true; "controllable")]
    fn test_profiler_local_rt(controllable: bool) {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
            ec2_instance_type: "t3.micro".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .start_paused(true)
            .build()
            .unwrap();
        // spawn the profiler, doing this before spawning a thread to allow
        // capturing errors from `spawn`
        let handle = if controllable {
            Some(
                agent
                    .spawn_controllable_thread_inner::<MockProfilerEngine>(
                        MockProfilerEngine {
                            counter: AtomicU32::new(0),
                        },
                        rt,
                        std::thread::spawn,
                    )
                    .unwrap(),
            )
        } else {
            agent
                .spawn_thread_inner::<MockProfilerEngine>(
                    MockProfilerEngine {
                        counter: AtomicU32::new(0),
                    },
                    rt,
                    std::thread::spawn,
                )
                .unwrap();
            None
        };

        let (jfr, md) = rx.blocking_recv().unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.blocking_recv().unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);

        if let Some(handle) = handle {
            let drain_thread =
                std::thread::spawn(move || while rx.blocking_recv().is_some() {});
            // request a stop
            handle.stop();
            // the drain thread should be done
            drain_thread.join().unwrap();
        }
    }

    enum StopKind {
        Deliberate,
        Drop,
        Abort,
    }

    #[tokio::test(start_paused = true)]
    #[test_case(StopKind::Deliberate; "deliberate stop")]
    #[test_case(StopKind::Drop; "drop stop")]
    #[test_case(StopKind::Abort; "abort stop")]
    async fn test_profiler_stop(stop_kind: StopKind) {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
            ec2_instance_type: "t3.micro".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        let profiler_ref = agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
        // check that stop is faster than an interval and returns an "immediate" next jfr
        match stop_kind {
            StopKind::Drop => drop(profiler_ref),
            StopKind::Deliberate => {
                tokio::time::timeout(Duration::from_millis(1), profiler_ref.stop())
                    .await
                    .unwrap();
            }
            StopKind::Abort => {
                // You can call `abort` on the JoinHandle. Make sure that is not buggy.
                profiler_ref.detach_inner().abort();
            }
        }
        // check that we get the next JFR "quickly", and that nothing more arrives after it.
        let (jfr, md) = tokio::time::timeout(Duration::from_millis(1), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(jfr, "JFR2");
        assert_eq!(e_md, md);
        assert!(rx.recv().await.is_none());
    }

    // simulate a badly-behaved profiler that errors on start/stop and then
    // tries to access the JFR file
    struct StopErrorProfilerEngine {
        start_error: bool,
        counter: Arc<AtomicBool>,
    }
    impl ProfilerEngine for StopErrorProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(
            &self,
            jfr_file_path: &Path,
            _options: &ProfilerOptions,
        ) -> Result<(), asprof::AsProfError> {
            let jfr_file_path = jfr_file_path.to_owned();
            std::fs::write(&jfr_file_path, "JFR").unwrap();
            let counter = self.counter.clone();
            tokio::task::spawn(async move {
                tokio::time::sleep(Duration::from_secs(5)).await;
                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
                counter.store(true, atomic::Ordering::Release);
            });
            if self.start_error {
                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
            } else {
                Ok(())
            }
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
        }
    }

    #[tokio::test(start_paused = true)]
    #[test_case(false; "error on stop")]
    #[test_case(true; "error on start")]
    async fn test_profiler_error(start_error: bool) {
        let (agent, mut rx) = make_mock_profiler();
        let counter = Arc::new(AtomicBool::new(false));
        let engine = StopErrorProfilerEngine {
            start_error,
            counter: counter.clone(),
        };
        let handle = agent.spawn_inner(engine).unwrap().detach_inner();
        assert!(rx.recv().await.is_none());
        // check that the "sleep 5" step in start_async_profiler succeeds
        for _ in 0..100 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if counter.load(atomic::Ordering::Acquire) {
                handle.await.unwrap(); // Check that the JoinHandle is done
                return;
            }
        }
        panic!("didn't read from file");
    }

    #[test]
    fn test_profiler_options_to_args_string_default() {
        let opts = ProfilerOptions::default();
        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert!(
            args.contains("start,event=cpu,interval=100000000,wall=1000ms,jfr,cstack=dwarf"),
            "Default args string not constructed correctly"
        );
        assert!(args.contains("file=/tmp/test.jfr"));
        assert!(!args.contains("nativemem="));
    }

    #[test]
    fn test_profiler_options_to_args_string_with_native_mem() {
        let opts = ProfilerOptions {
            native_mem: Some("10m".to_string()),
            wall_clock_millis: None,
            cpu_interval: None,
        };
        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert!(args.contains("nativemem=10m"));
    }

    #[test]
    fn test_profiler_options_builder() {
        let opts = ProfilerOptionsBuilder::default()
            .with_native_mem_bytes(5000000)
            .build();

        assert_eq!(opts.native_mem, Some("5000000".to_string()));
    }

    #[test]
    fn test_profiler_options_builder_all_options() {
        let opts = ProfilerOptionsBuilder::default()
            .with_native_mem_bytes(5000000)
            .with_cpu_interval(Duration::from_secs(1))
            .with_wall_clock_interval(Duration::from_secs(10))
            .build();

        let dummy_path = Path::new("/tmp/test.jfr");
        let args = opts.to_args_string(dummy_path);
        assert_eq!(
            args,
            "start,event=cpu,interval=1000000000,wall=10000ms,jfr,cstack=dwarf,file=/tmp/test.jfr,nativemem=5000000"
        );
    }

    #[test]
    fn test_local_reporter_has_no_metadata() {
        // Check that with_local_reporter sets some configuration
        let reporter = ProfilerBuilder::default().with_local_reporter(".");
        assert_eq!(
            format!("{:?}", reporter.reporter),
            r#"Some(LocalReporter { directory: "." })"#
        );
        match reporter.agent_metadata {
            Some(AgentMetadata::NoMetadata) => {}
            bad => panic!("{bad:?}"),
        };
    }
}
1578}