async_profiler_agent/
profiler.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{
5    asprof::{self, AsProfError},
6    metadata::{aws::AwsProfilerMetadataError, AgentMetadata, ReportMetadata},
7    reporter::Reporter,
8};
9use std::{
10    fs::File,
11    io,
12    path::{Path, PathBuf},
13    time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
14};
15use thiserror::Error;
16
17struct JfrFile {
18    active: std::fs::File,
19    inactive: std::fs::File,
20}
21
22impl JfrFile {
23    #[cfg(target_os = "linux")]
24    fn new() -> Result<Self, io::Error> {
25        Ok(Self {
26            active: tempfile::tempfile().unwrap(),
27            inactive: tempfile::tempfile().unwrap(),
28        })
29    }
30
31    #[cfg(not(target_os = "linux"))]
32    fn new() -> Result<Self, io::Error> {
33        io::Error::new(
34            io::ErrorKind::Other,
35            "async-profiler is only supported on Linux",
36        )
37    }
38
39    fn swap(&mut self) {
40        std::mem::swap(&mut self.active, &mut self.inactive);
41    }
42
43    #[cfg(target_os = "linux")]
44    fn file_path(file: &std::fs::File) -> PathBuf {
45        use std::os::fd::AsRawFd;
46
47        format!("/proc/self/fd/{}", file.as_raw_fd()).into()
48    }
49
50    #[cfg(not(target_os = "linux"))]
51    fn file_path(_file: &std::fs::File) -> PathBuf {
52        unimplemented!()
53    }
54
55    fn active_path(&self) -> PathBuf {
56        Self::file_path(&self.active)
57    }
58
59    fn inactive_path(&self) -> PathBuf {
60        Self::file_path(&self.inactive)
61    }
62
63    fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
64        // Empty the file, or create it for the first time if the profiler hasn't
65        // started yet.
66        File::create(Self::file_path(&self.inactive))?;
67        tracing::debug!(message = "emptied the file");
68        Ok(())
69    }
70}
71
72/// Builds a [`Profiler`], panicking if any required fields were not set by the
73/// time `build` is called.
74#[derive(Debug, Default)]
75pub struct ProfilerBuilder {
76    reporting_interval: Option<Duration>,
77    reporter: Option<Box<dyn Reporter + Send + Sync>>,
78    agent_metadata: Option<AgentMetadata>,
79}
80
81impl ProfilerBuilder {
82    /// Sets the reporting interval.
83    pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
84        self.reporting_interval = Some(i);
85        self
86    }
87
88    /// Sets the reporter.
89    pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
90        self.reporter = Some(Box::new(r));
91        self
92    }
93
94    /// Provide custom agent metadata.
95    pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
96        self.agent_metadata = Some(j);
97        self
98    }
99
100    /// Turn this builder into a profiler!
101    pub fn build(self) -> Profiler {
102        Profiler {
103            reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
104            reporter: self.reporter.expect("reporter is required"),
105            agent_metadata: self.agent_metadata,
106        }
107    }
108}
109
/// Lifecycle of the profiler as tracked by `ProfilerState`.
enum Status {
    /// The profiler is not running: the initial state, and the state after a
    /// successful stop.
    Idle,
    /// A start has been requested but not confirmed. `ProfilerState::start`
    /// sets this before calling the engine, and it stays set if the engine
    /// returns an error.
    Starting,
    /// The profiler is running; holds the `SystemTime` captured right after
    /// the engine reported a successful start.
    Running(SystemTime),
}
115
116/// This type provides wrapper APIs over [`asprof::AsProf`], to allow tracking
117/// of the state of the profiler. The primary benefit of this is RAII - when
118/// this type drops, it will stop the profiler if it's running.
119struct ProfilerState<E: ProfilerEngine> {
120    // this is only None in the destructor when stopping the async-profiler fails
121    jfr_file: Option<JfrFile>,
122    asprof: E,
123    status: Status,
124}
125
126impl<E: ProfilerEngine> ProfilerState<E> {
127    pub fn new(asprof: E) -> Result<Self, io::Error> {
128        Ok(Self {
129            jfr_file: Some(JfrFile::new()?),
130            asprof,
131            status: Status::Idle,
132        })
133    }
134
135    pub fn jfr_file_mut(&mut self) -> &mut JfrFile {
136        self.jfr_file.as_mut().unwrap()
137    }
138
139    async fn start(&mut self) -> Result<(), AsProfError> {
140        let active = self.jfr_file.as_ref().unwrap().active_path();
141        // drop guard - make sure the files are leaked if the profiler might have started
142        self.status = Status::Starting;
143        E::start_async_profiler(&self.asprof, &active)?;
144        self.status = Status::Running(SystemTime::now());
145        Ok(())
146    }
147
148    fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
149        E::stop_async_profiler()?;
150        let status = std::mem::replace(&mut self.status, Status::Idle);
151        Ok(match status {
152            Status::Idle | Status::Starting => None,
153            Status::Running(since) => Some(since),
154        })
155    }
156
157    fn is_started(&self) -> bool {
158        matches!(self.status, Status::Running(_))
159    }
160}
161
162impl<E: ProfilerEngine> Drop for ProfilerState<E> {
163    fn drop(&mut self) {
164        match self.status {
165            Status::Running(_) => {
166                if let Err(err) = self.stop() {
167                    // SECURITY: avoid removing the JFR file if stopping the profiler fails,
168                    // to avoid symlink races
169                    std::mem::forget(self.jfr_file.take());
170                    // XXX: Rust defines leaking resources during drop as safe.
171                    tracing::warn!(?err, "unable to stop profiler during drop glue");
172                }
173            }
174            Status::Idle => {}
175            Status::Starting => {
176                // SECURITY: avoid removing the JFR file if stopping the profiler fails,
177                // to avoid symlink races
178                std::mem::forget(self.jfr_file.take());
179            }
180        }
181    }
182}
183
184pub(crate) trait ProfilerEngine: Send + Sync + 'static {
185    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
186    fn start_async_profiler(&self, jfr_file_path: &Path) -> Result<(), asprof::AsProfError>;
187    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
188}
189
/// Errors that a single `profiler_tick` call can produce.
#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
    /// The profiler engine failed to start or stop.
    #[error(transparent)]
    AsProf(#[from] AsProfError),
    /// Loading the agent metadata failed.
    #[error(transparent)]
    Metadata(#[from] AwsProfilerMetadataError),
    /// The reporter returned an error. The profiling loop logs this variant
    /// and keeps running; every other variant stops the loop.
    #[error("reporter: {0}")]
    Reporter(Box<dyn std::error::Error + Send>),
    /// A timestamp could not be expressed relative to the Unix epoch.
    #[error("broken clock: {0}")]
    BrokenClock(#[from] SystemTimeError),
    /// Reading the finished JFR file failed.
    #[error("jfr read error: {0}")]
    JfrRead(io::Error),
    /// Truncating the inactive JFR file failed.
    #[error("empty inactive file error: {0}")]
    EmptyInactiveFile(io::Error),
}
206
207#[derive(Debug, Error)]
208#[non_exhaustive]
209pub enum SpawnError {
210    #[error(transparent)]
211    AsProf(#[from] asprof::AsProfError),
212    #[error("tempfile error: {0}")]
213    TempFile(io::Error),
214}
215
216/// Rust profiler based on [async-profiler].
217///
218/// [async-profiler]: https://github.com/async-profiler/async-profiler
219pub struct Profiler {
220    reporting_interval: Duration,
221    reporter: Box<dyn Reporter + Send + Sync>,
222    agent_metadata: Option<AgentMetadata>,
223}
224
225impl Profiler {
226    /// Start profiling. The profiler will run in a tokio task at the configured interval.
227    pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
228        self.spawn_inner(asprof::AsProf::builder().build())
229    }
230
231    fn spawn_inner<E: ProfilerEngine>(
232        self,
233        asprof: E,
234    ) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
235        // Initialize async profiler - needs to be done once.
236        E::init_async_profiler()?;
237        tracing::info!("successfully initialized async profiler.");
238
239        let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
240
241        // Get profiles at the configured interval rate.
242        Ok(tokio::spawn(async move {
243            let mut state = match ProfilerState::new(asprof) {
244                Ok(state) => state,
245                Err(err) => {
246                    tracing::error!(?err, "unable to create profiler state");
247                    return;
248                }
249            };
250
251            // Lazily-loaded if not specified up front.
252            let mut agent_metadata = self.agent_metadata;
253
254            loop {
255                // Start timer.
256                let _ = sampling_ticker.tick().await;
257                tracing::debug!("profiler timer woke up");
258
259                if let Err(err) = profiler_tick(
260                    &mut state,
261                    &mut agent_metadata,
262                    &*self.reporter,
263                    self.reporting_interval,
264                )
265                .await
266                {
267                    match &err {
268                        TickError::Reporter(_) => {
269                            // don't stop on IO errors
270                            tracing::error!(?err, "error during profiling, continuing");
271                        }
272                        _stop => {
273                            tracing::error!(?err, "error during profiling, stopping");
274                            break;
275                        }
276                    }
277                }
278            }
279
280            tracing::info!("profiling task finished");
281        }))
282    }
283}
284
285async fn profiler_tick<E: ProfilerEngine>(
286    state: &mut ProfilerState<E>,
287    agent_metadata: &mut Option<AgentMetadata>,
288    reporter: &(dyn Reporter + Send + Sync),
289    reporting_interval: Duration,
290) -> Result<(), TickError> {
291    if !state.is_started() {
292        state.start().await?;
293        return Ok(());
294    }
295
296    let Some(start) = state.stop()? else {
297        tracing::warn!("stopped the profiler but it wasn't running?");
298        return Ok(());
299    };
300    let start = start.duration_since(UNIX_EPOCH)?;
301    let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
302
303    // Start it up immediately, writing to the "other" file, so that we keep
304    // profiling the application while we're reporting data.
305    state
306        .jfr_file_mut()
307        .empty_inactive_file()
308        .map_err(TickError::EmptyInactiveFile)?;
309    state.jfr_file_mut().swap();
310    state.start().await?;
311
312    // Lazily load the agent metadata if it was not provided in
313    // the constructor. See the struct comments for why this is.
314    // This code runs at most once.
315    if agent_metadata.is_none() {
316        #[cfg(feature = "aws-metadata")]
317        let md = crate::metadata::aws::load_agent_metadata().await?;
318        #[cfg(not(feature = "aws-metadata"))]
319        let md = crate::metadata::AgentMetadata::Other;
320        tracing::debug!("loaded metadata");
321        agent_metadata.replace(md);
322    }
323
324    let report_metadata = ReportMetadata {
325        instance: agent_metadata.as_ref().unwrap(),
326        start,
327        end,
328        reporting_interval,
329    };
330
331    let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
332        .await
333        .map_err(TickError::JfrRead)?;
334
335    reporter
336        .report(jfr, &report_metadata)
337        .await
338        .map_err(TickError::Reporter)?;
339
340    Ok(())
341}
342
// Unit tests for the JFR file pair and the profiling loop, using mock
// engines and a mock reporter instead of the real async-profiler.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicBool, AtomicU32};
    use std::sync::Arc;

    use test_case::test_case;

    use super::*;

    // Data written through the active path must be readable through the
    // inactive path after a swap, and gone after `empty_inactive_file`.
    #[test]
    fn test_jfr_file_drop() {
        let mut jfr = JfrFile::new().unwrap();

        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
        jfr.swap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
        jfr.empty_inactive_file().unwrap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
    }

    // Engine that never fails; each start writes "JFR<n>" to the target file,
    // where <n> counts the starts so far.
    struct MockProfilerEngine {
        counter: AtomicU32,
    }
    impl ProfilerEngine for MockProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(&self, jfr_file_path: &Path) -> Result<(), asprof::AsProfError> {
            let contents = format!(
                "JFR{}",
                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
            );
            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
            Ok(())
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }
    }

    // Reporter that forwards every report (JFR contents as a string plus the
    // instance metadata) over a channel so the test can inspect it.
    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
    impl std::fmt::Debug for MockReporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("MockReporter").finish()
        }
    }

    #[async_trait::async_trait]
    impl Reporter for MockReporter {
        async fn report(
            &self,
            jfr: Vec<u8>,
            metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            self.0
                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
                .await
                .unwrap();
            Ok(())
        }
    }

    // Builds a profiler wired to a `MockReporter` with fixed EC2 metadata, and
    // returns it together with the receiving end of the report channel.
    fn make_mock_profiler() -> (
        Profiler,
        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let agent = ProfilerBuilder::default()
            .with_reporter(MockReporter(tx))
            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
                aws_account_id: "0".into(),
                aws_region_id: "us-east-1".into(),
                ec2_instance_id: "i-fake".into(),
            })
            .build();
        (agent, rx)
    }

    // Happy path: successive reports carry the contents written by successive
    // starts ("JFR0", then "JFR1") together with the configured metadata.
    #[tokio::test(start_paused = true)]
    async fn test_profiler_agent() {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
    }

    // simulate a badly-behaved profiler that errors on start/stop and then
    // tries to access the JFR file
    struct StopErrorProfilerEngine {
        // when true, `start_async_profiler` also returns an error
        start_error: bool,
        // set to true once the delayed read of the JFR file has succeeded
        counter: Arc<AtomicBool>,
    }
    impl ProfilerEngine for StopErrorProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(&self, jfr_file_path: &Path) -> Result<(), asprof::AsProfError> {
            let jfr_file_path = jfr_file_path.to_owned();
            std::fs::write(&jfr_file_path, "JFR").unwrap();
            let counter = self.counter.clone();
            // Read the file again 5 seconds later, after the profiling task
            // has already given up; the path must still be readable then.
            tokio::task::spawn(async move {
                tokio::time::sleep(Duration::from_secs(5)).await;
                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
                counter.store(true, atomic::Ordering::Release);
            });
            if self.start_error {
                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
            } else {
                Ok(())
            }
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
        }
    }

    // When starting or stopping fails, the profiling task ends without
    // reporting anything, and the JFR file stays accessible to the engine.
    #[tokio::test(start_paused = true)]
    #[test_case(false; "error on stop")]
    #[test_case(true; "error on start")]
    async fn test_profiler_error(start_error: bool) {
        let (agent, mut rx) = make_mock_profiler();
        let counter = Arc::new(AtomicBool::new(false));
        let engine = StopErrorProfilerEngine {
            start_error,
            counter: counter.clone(),
        };
        agent.spawn_inner(engine).unwrap();
        // `None` means the sender was dropped without ever sending a report.
        assert!(rx.recv().await.is_none());
        // check that the "sleep 5" step in start_async_profiler succeeds
        for _ in 0..100 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if counter.load(atomic::Ordering::Acquire) {
                return;
            }
        }
        panic!("didn't read from file");
    }
}
497}