1use crate::{
5 asprof::{self, AsProfError},
6 metadata::{aws::AwsProfilerMetadataError, AgentMetadata, ReportMetadata},
7 reporter::Reporter,
8};
9use std::{
10 fs::File,
11 io,
12 path::{Path, PathBuf},
13 time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH},
14};
15use thiserror::Error;
16
17struct JfrFile {
18 active: std::fs::File,
19 inactive: std::fs::File,
20}
21
22impl JfrFile {
23 #[cfg(target_os = "linux")]
24 fn new() -> Result<Self, io::Error> {
25 Ok(Self {
26 active: tempfile::tempfile().unwrap(),
27 inactive: tempfile::tempfile().unwrap(),
28 })
29 }
30
31 #[cfg(not(target_os = "linux"))]
32 fn new() -> Result<Self, io::Error> {
33 io::Error::new(
34 io::ErrorKind::Other,
35 "async-profiler is only supported on Linux",
36 )
37 }
38
39 fn swap(&mut self) {
40 std::mem::swap(&mut self.active, &mut self.inactive);
41 }
42
43 #[cfg(target_os = "linux")]
44 fn file_path(file: &std::fs::File) -> PathBuf {
45 use std::os::fd::AsRawFd;
46
47 format!("/proc/self/fd/{}", file.as_raw_fd()).into()
48 }
49
50 #[cfg(not(target_os = "linux"))]
51 fn file_path(_file: &std::fs::File) -> PathBuf {
52 unimplemented!()
53 }
54
55 fn active_path(&self) -> PathBuf {
56 Self::file_path(&self.active)
57 }
58
59 fn inactive_path(&self) -> PathBuf {
60 Self::file_path(&self.inactive)
61 }
62
63 fn empty_inactive_file(&mut self) -> Result<(), io::Error> {
64 File::create(Self::file_path(&self.inactive))?;
67 tracing::debug!(message = "emptied the file");
68 Ok(())
69 }
70}
71
72#[derive(Debug, Default)]
75pub struct ProfilerBuilder {
76 reporting_interval: Option<Duration>,
77 reporter: Option<Box<dyn Reporter + Send + Sync>>,
78 agent_metadata: Option<AgentMetadata>,
79}
80
81impl ProfilerBuilder {
82 pub fn with_reporting_interval(mut self, i: Duration) -> ProfilerBuilder {
84 self.reporting_interval = Some(i);
85 self
86 }
87
88 pub fn with_reporter(mut self, r: impl Reporter + Send + Sync + 'static) -> ProfilerBuilder {
90 self.reporter = Some(Box::new(r));
91 self
92 }
93
94 pub fn with_custom_agent_metadata(mut self, j: AgentMetadata) -> ProfilerBuilder {
96 self.agent_metadata = Some(j);
97 self
98 }
99
100 pub fn build(self) -> Profiler {
102 Profiler {
103 reporting_interval: self.reporting_interval.unwrap_or(Duration::from_secs(30)),
104 reporter: self.reporter.expect("reporter is required"),
105 agent_metadata: self.agent_metadata,
106 }
107 }
108}
109
/// Lifecycle of the profiler engine as tracked by `ProfilerState`.
enum Status {
    /// No recording in progress.
    Idle,
    /// Set just before the engine's start call. It is replaced by `Running` only
    /// if that call succeeds, so it persists when starting fails.
    Starting,
    /// Recording; holds the wall-clock time at which the recording started.
    Running(SystemTime),
}
115
116struct ProfilerState<E: ProfilerEngine> {
120 jfr_file: Option<JfrFile>,
122 asprof: E,
123 status: Status,
124}
125
126impl<E: ProfilerEngine> ProfilerState<E> {
127 pub fn new(asprof: E) -> Result<Self, io::Error> {
128 Ok(Self {
129 jfr_file: Some(JfrFile::new()?),
130 asprof,
131 status: Status::Idle,
132 })
133 }
134
135 pub fn jfr_file_mut(&mut self) -> &mut JfrFile {
136 self.jfr_file.as_mut().unwrap()
137 }
138
139 async fn start(&mut self) -> Result<(), AsProfError> {
140 let active = self.jfr_file.as_ref().unwrap().active_path();
141 self.status = Status::Starting;
143 E::start_async_profiler(&self.asprof, &active)?;
144 self.status = Status::Running(SystemTime::now());
145 Ok(())
146 }
147
148 fn stop(&mut self) -> Result<Option<SystemTime>, AsProfError> {
149 E::stop_async_profiler()?;
150 let status = std::mem::replace(&mut self.status, Status::Idle);
151 Ok(match status {
152 Status::Idle | Status::Starting => None,
153 Status::Running(since) => Some(since),
154 })
155 }
156
157 fn is_started(&self) -> bool {
158 matches!(self.status, Status::Running(_))
159 }
160}
161
impl<E: ProfilerEngine> Drop for ProfilerState<E> {
    /// Attempts to stop a running engine when the state is dropped.
    ///
    /// If the engine cannot be confirmed stopped, the JFR files are leaked with
    /// `mem::forget` rather than closed. That covers a failed stop, and a start
    /// that never completed (status still `Starting`).
    fn drop(&mut self) {
        match self.status {
            Status::Running(_) => {
                if let Err(err) = self.stop() {
                    // Stop failed: leak the files so their descriptors are never closed.
                    // NOTE(review): presumably because the engine was handed a
                    // `/proc/self/fd/<n>` path and may still write to it; closing the
                    // fd could let that number be reused by an unrelated file. Confirm.
                    std::mem::forget(self.jfr_file.take());
                    tracing::warn!(?err, "unable to stop profiler during drop glue");
                }
            }
            Status::Idle => {}
            Status::Starting => {
                // The engine's start call did not succeed, so its state is
                // unknown. Leak the files for the same reason as the failed-stop case.
                std::mem::forget(self.jfr_file.take());
            }
        }
    }
}
183
/// Abstraction over the profiler backend.
///
/// `Profiler::spawn` uses `asprof::AsProf`; the tests in this file substitute mocks.
pub(crate) trait ProfilerEngine: Send + Sync + 'static {
    /// One-time initialization, called by `Profiler::spawn_inner` before the
    /// profiling task is spawned.
    fn init_async_profiler() -> Result<(), asprof::AsProfError>;
    /// Starts profiling. `jfr_file_path` is the path of the currently active JFR
    /// file, to which output is expected to go.
    fn start_async_profiler(&self, jfr_file_path: &Path) -> Result<(), asprof::AsProfError>;
    /// Stops the current profiling session.
    fn stop_async_profiler() -> Result<(), asprof::AsProfError>;
}
189
/// Errors from a single `profiler_tick`.
///
/// The profiling loop in `Profiler::spawn_inner` logs a `Reporter` error and
/// keeps going. Any other variant ends the loop.
#[derive(Debug, Error)]
#[non_exhaustive]
enum TickError {
    /// The profiler engine failed to start or stop.
    #[error(transparent)]
    AsProf(#[from] AsProfError),
    /// Loading agent metadata failed. In this file it comes from
    /// `load_agent_metadata` when the `aws-metadata` feature is enabled.
    #[error(transparent)]
    Metadata(#[from] AwsProfilerMetadataError),
    /// The reporter rejected a recording (non-fatal to the loop).
    #[error("reporter: {0}")]
    Reporter(Box<dyn std::error::Error + Send>),
    /// A timestamp was earlier than the Unix epoch.
    #[error("broken clock: {0}")]
    BrokenClock(#[from] SystemTimeError),
    /// Reading the finished (inactive) JFR file failed.
    #[error("jfr read error: {0}")]
    JfrRead(io::Error),
    /// Truncating the inactive JFR file before reuse failed.
    #[error("empty inactive file error: {0}")]
    EmptyInactiveFile(io::Error),
}
206
/// Errors returned by [`Profiler::spawn`].
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SpawnError {
    /// The profiler backend failed to initialize.
    #[error(transparent)]
    AsProf(#[from] asprof::AsProfError),
    /// An I/O error while handling the temporary JFR files (e.g. creating them).
    #[error("tempfile error: {0}")]
    TempFile(io::Error),
}
215
216pub struct Profiler {
220 reporting_interval: Duration,
221 reporter: Box<dyn Reporter + Send + Sync>,
222 agent_metadata: Option<AgentMetadata>,
223}
224
225impl Profiler {
226 pub fn spawn(self) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
228 self.spawn_inner(asprof::AsProf::builder().build())
229 }
230
231 fn spawn_inner<E: ProfilerEngine>(
232 self,
233 asprof: E,
234 ) -> Result<tokio::task::JoinHandle<()>, SpawnError> {
235 E::init_async_profiler()?;
237 tracing::info!("successfully initialized async profiler.");
238
239 let mut sampling_ticker = tokio::time::interval(self.reporting_interval);
240
241 Ok(tokio::spawn(async move {
243 let mut state = match ProfilerState::new(asprof) {
244 Ok(state) => state,
245 Err(err) => {
246 tracing::error!(?err, "unable to create profiler state");
247 return;
248 }
249 };
250
251 let mut agent_metadata = self.agent_metadata;
253
254 loop {
255 let _ = sampling_ticker.tick().await;
257 tracing::debug!("profiler timer woke up");
258
259 if let Err(err) = profiler_tick(
260 &mut state,
261 &mut agent_metadata,
262 &*self.reporter,
263 self.reporting_interval,
264 )
265 .await
266 {
267 match &err {
268 TickError::Reporter(_) => {
269 tracing::error!(?err, "error during profiling, continuing");
271 }
272 _stop => {
273 tracing::error!(?err, "error during profiling, stopping");
274 break;
275 }
276 }
277 }
278 }
279
280 tracing::info!("profiling task finished");
281 }))
282 }
283}
284
285async fn profiler_tick<E: ProfilerEngine>(
286 state: &mut ProfilerState<E>,
287 agent_metadata: &mut Option<AgentMetadata>,
288 reporter: &(dyn Reporter + Send + Sync),
289 reporting_interval: Duration,
290) -> Result<(), TickError> {
291 if !state.is_started() {
292 state.start().await?;
293 return Ok(());
294 }
295
296 let Some(start) = state.stop()? else {
297 tracing::warn!("stopped the profiler but it wasn't running?");
298 return Ok(());
299 };
300 let start = start.duration_since(UNIX_EPOCH)?;
301 let end = SystemTime::now().duration_since(UNIX_EPOCH)?;
302
303 state
306 .jfr_file_mut()
307 .empty_inactive_file()
308 .map_err(TickError::EmptyInactiveFile)?;
309 state.jfr_file_mut().swap();
310 state.start().await?;
311
312 if agent_metadata.is_none() {
316 #[cfg(feature = "aws-metadata")]
317 let md = crate::metadata::aws::load_agent_metadata().await?;
318 #[cfg(not(feature = "aws-metadata"))]
319 let md = crate::metadata::AgentMetadata::Other;
320 tracing::debug!("loaded metadata");
321 agent_metadata.replace(md);
322 }
323
324 let report_metadata = ReportMetadata {
325 instance: agent_metadata.as_ref().unwrap(),
326 start,
327 end,
328 reporting_interval,
329 };
330
331 let jfr = tokio::fs::read(state.jfr_file_mut().inactive_path())
332 .await
333 .map_err(TickError::JfrRead)?;
334
335 reporter
336 .report(jfr, &report_metadata)
337 .await
338 .map_err(TickError::Reporter)?;
339
340 Ok(())
341}
342
#[cfg(test)]
mod tests {
    use std::sync::atomic::{self, AtomicBool, AtomicU32};
    use std::sync::Arc;

    use test_case::test_case;

    use super::*;

    /// Exercises `JfrFile::swap` and `JfrFile::empty_inactive_file`.
    ///
    /// Data written to the active file is readable through the inactive path
    /// after a swap, and emptying truncates it to zero bytes.
    #[test]
    fn test_jfr_file_drop() {
        let mut jfr = JfrFile::new().unwrap();

        std::fs::write(jfr.active_path(), b"Hello, 2!").unwrap();
        jfr.swap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"Hello, 2!");
        jfr.empty_inactive_file().unwrap();
        assert_eq!(std::fs::read(jfr.inactive_path()).unwrap(), b"");
    }

    /// Engine mock: each start writes `JFR<n>` into the target file, where `n`
    /// increments on every start.
    struct MockProfilerEngine {
        counter: AtomicU32,
    }
    impl ProfilerEngine for MockProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(&self, jfr_file_path: &Path) -> Result<(), asprof::AsProfError> {
            let contents = format!(
                "JFR{}",
                self.counter.fetch_add(1, atomic::Ordering::Relaxed)
            );
            std::fs::write(jfr_file_path, contents.as_bytes()).unwrap();
            Ok(())
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }
    }

    /// Reporter mock: forwards each report's JFR bytes (decoded as UTF-8) and
    /// agent metadata over a channel.
    struct MockReporter(tokio::sync::mpsc::Sender<(String, AgentMetadata)>);
    impl std::fmt::Debug for MockReporter {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("MockReporter").finish()
        }
    }

    #[async_trait::async_trait]
    impl Reporter for MockReporter {
        async fn report(
            &self,
            jfr: Vec<u8>,
            metadata: &ReportMetadata,
        ) -> Result<(), Box<dyn std::error::Error + Send>> {
            self.0
                .send((String::from_utf8(jfr).unwrap(), metadata.instance.clone()))
                .await
                .unwrap();
            Ok(())
        }
    }

    /// Builds a `Profiler` wired to a `MockReporter` with fixed EC2 metadata.
    /// Returns it together with the receiving end of the reporter's channel.
    fn make_mock_profiler() -> (
        Profiler,
        tokio::sync::mpsc::Receiver<(String, AgentMetadata)>,
    ) {
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let agent = ProfilerBuilder::default()
            .with_reporter(MockReporter(tx))
            .with_custom_agent_metadata(AgentMetadata::Ec2AgentMetadata {
                aws_account_id: "0".into(),
                aws_region_id: "us-east-1".into(),
                ec2_instance_id: "i-fake".into(),
            })
            .build();
        (agent, rx)
    }

    /// With paused time, the first two reports contain the first two
    /// recordings ("JFR0", then "JFR1") and the custom metadata. Each report
    /// carries the recording made before the most recent restart.
    #[tokio::test(start_paused = true)]
    async fn test_profiler_agent() {
        let e_md = AgentMetadata::Ec2AgentMetadata {
            aws_account_id: "0".into(),
            aws_region_id: "us-east-1".into(),
            ec2_instance_id: "i-fake".into(),
        };
        let (agent, mut rx) = make_mock_profiler();
        agent
            .spawn_inner::<MockProfilerEngine>(MockProfilerEngine {
                counter: AtomicU32::new(0),
            })
            .unwrap();
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR0");
        assert_eq!(e_md, md);
        let (jfr, md) = rx.recv().await.unwrap();
        assert_eq!(jfr, "JFR1");
        assert_eq!(e_md, md);
    }

    /// Engine whose stop always fails, and whose start optionally fails.
    ///
    /// Each start writes "JFR" to the target path and spawns a task. After 5s
    /// that task asserts the path still reads "JFR", then sets `counter`.
    struct StopErrorProfilerEngine {
        start_error: bool,
        counter: Arc<AtomicBool>,
    }
    impl ProfilerEngine for StopErrorProfilerEngine {
        fn init_async_profiler() -> Result<(), asprof::AsProfError> {
            Ok(())
        }

        fn start_async_profiler(&self, jfr_file_path: &Path) -> Result<(), asprof::AsProfError> {
            let jfr_file_path = jfr_file_path.to_owned();
            std::fs::write(&jfr_file_path, "JFR").unwrap();
            let counter = self.counter.clone();
            tokio::task::spawn(async move {
                tokio::time::sleep(Duration::from_secs(5)).await;
                assert_eq!(std::fs::read_to_string(jfr_file_path).unwrap(), "JFR");
                counter.store(true, atomic::Ordering::Release);
            });
            if self.start_error {
                Err(asprof::AsProfError::AsyncProfilerError("error".into()))
            } else {
                Ok(())
            }
        }

        fn stop_async_profiler() -> Result<(), asprof::AsProfError> {
            Err(asprof::AsProfError::AsyncProfilerError("error".into()))
        }
    }

    /// On engine errors, the task ends without reporting: the channel closes,
    /// so `recv` yields `None`. The written JFR path must still be readable
    /// afterwards, as checked by the engine's delayed task. This exercises the
    /// file-leaking branches of `ProfilerState`'s `Drop`.
    #[tokio::test(start_paused = true)]
    #[test_case(false; "error on stop")]
    #[test_case(true; "error on start")]
    async fn test_profiler_error(start_error: bool) {
        let (agent, mut rx) = make_mock_profiler();
        let counter = Arc::new(AtomicBool::new(false));
        let engine = StopErrorProfilerEngine {
            start_error,
            counter: counter.clone(),
        };
        agent.spawn_inner(engine).unwrap();
        assert!(rx.recv().await.is_none());
        for _ in 0..100 {
            // Poll (in paused virtual time) until the engine's checker task has run.
            tokio::time::sleep(Duration::from_secs(1)).await;
            if counter.load(atomic::Ordering::Acquire) {
                return;
            }
        }
        panic!("didn't read from file");
    }
}