async_profiler_agent/reporter/
s3.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A reporter that uploads reports to an S3 bucket
5
6use async_trait::async_trait;
7use aws_config::SdkConfig;
8use chrono::SecondsFormat;
9use serde::Serialize;
10use std::io::Write;
11use std::time::SystemTime;
12use std::{fmt, io::Cursor};
13use thiserror::Error;
14use zip::result::ZipError;
15use zip::{ZipWriter, write::SimpleFileOptions};
16
17use crate::metadata::{AgentMetadata, ReportMetadata};
18
19use super::Reporter;
20
21/// Error reporting to S3
22#[derive(Error, Debug)]
23pub enum S3ReporterError {
24    /// I/O error creating zip file
25    #[error("io error creating zip file: {0}")]
26    ZipIoError(std::io::Error),
27    /// Error creating zip file
28    #[error("creating zip file: {0}")]
29    ZipError(#[from] ZipError),
30    /// Error sending data to S3
31    #[error("failed to send profile data directly to S3: {0}")]
32    SendProfileS3Data(Box<aws_sdk_s3::Error>),
33    /// Error joining Tokio task
34    #[error("tokio task: {0}")]
35    JoinError(#[from] tokio::task::JoinError),
36}
37
/// This is the format of the metadata JSON uploaded to S3.
///
/// It is serialized with `serde_json` and stored as `metadata.json` inside the uploaded
/// zip file, next to the JFR.
#[derive(Debug, Serialize)]
#[non_exhaustive]
pub struct MetadataJson {
    /// [`ReportMetadata`]'s `start`, in whole milliseconds.
    start: u64,
    /// [`ReportMetadata`]'s `end`, in whole milliseconds.
    end: u64,
    /// [`ReportMetadata`]'s `reporting_interval`, in whole milliseconds.
    reporting_interval: u64,
}
46
/// Mandatory parameters in order to configure an S3 reporter
///
/// Consumed by [`S3Reporter::new`].
pub struct S3ReporterConfig<'a> {
    /// The SDK config to get credentials from
    ///
    /// The reporter's S3 client is built from this config.
    pub sdk_config: &'a SdkConfig,
    /// The expected bucket owner account
    ///
    /// Sent as the expected bucket owner on every `PutObject` upload.
    pub bucket_owner: String,
    /// The bucket name
    pub bucket_name: String,
    /// The profiling group name, used in the file names within the bucket
    ///
    /// It is inserted as-is (not sanitized) into the object key
    /// `profile_{profiling_group_name}_...`.
    pub profiling_group_name: String,
}
58
59/// A reporter that uploads reports to an S3 bucket
60///
61/// The [`S3Reporter`] uploads each report in a `zip` file, that currently contains 2 files:
62/// 1. a [JFR] as `async_profiler_dump_0.jfr`
63/// 2. metadata as `metadata.json`, in format [`MetadataJson`].
64///
65/// The `zip` file is uploaded to the specified bucket under the path
66/// `profile_{profiling_group_name}_{machine}_{pid}_{time}.zip`, where `{machine}` is either
67/// `ec2_{ec2_instance_id}_`, `ecs_{cluster_arn}_{task_arn}`, or `unknown__` (or,
68/// for the deprecated [AgentMetadata::Other], `onprem__`).
69///
70/// [JFR]: https://docs.oracle.com/javacomponents/jmc-5-4/jfr-runtime-guide/about.htm
71pub struct S3Reporter {
72    s3_client: aws_sdk_s3::Client,
73    bucket_owner: String,
74    bucket_name: String,
75    profiling_group_name: String,
76}
77
78impl S3Reporter {
79    /// Create a new S3Reporter
80    pub fn new(config: S3ReporterConfig<'_>) -> Self {
81        let S3ReporterConfig {
82            sdk_config,
83            bucket_owner,
84            bucket_name,
85            profiling_group_name,
86        } = config;
87        let s3_client_config = aws_sdk_s3::config::Builder::from(sdk_config).build();
88        let s3_client = aws_sdk_s3::Client::from_conf(s3_client_config);
89
90        S3Reporter {
91            s3_client,
92            bucket_owner,
93            bucket_name,
94            profiling_group_name,
95        }
96    }
97
98    /// Makes a zip file, then uploads it.
99    pub async fn report_profiling_data(
100        &self,
101        jfr: Vec<u8>,
102        metadata_obj: &ReportMetadata<'_>,
103    ) -> Result<(), S3ReporterError> {
104        tracing::debug!("sending file to backend");
105
106        let metadata_json = MetadataJson {
107            start: metadata_obj.start.as_millis() as u64,
108            end: metadata_obj.end.as_millis() as u64,
109            reporting_interval: metadata_obj.reporting_interval.as_millis() as u64,
110        };
111
112        // Create a zip file.
113        let zip = tokio::task::spawn_blocking(move || {
114            add_files_to_zip("async_profiler_dump_0.jfr", &jfr, metadata_json)
115        })
116        .await??;
117
118        // Send zip file to the S3 pre-signed URL.
119        send_profile_data(
120            &self.s3_client,
121            self.bucket_owner.clone(),
122            self.bucket_name.clone(),
123            make_s3_file_name(
124                metadata_obj.instance,
125                &self.profiling_group_name,
126                SystemTime::now(),
127            ),
128            zip,
129        )
130        .await?;
131
132        Ok(())
133    }
134}
135
136fn make_s3_file_name(
137    metadata_obj: &AgentMetadata,
138    profiling_group_name: &str,
139    time: SystemTime,
140) -> String {
141    let machine = match metadata_obj {
142        AgentMetadata::Ec2AgentMetadata {
143            aws_account_id: _,
144            aws_region_id: _,
145            ec2_instance_id,
146            ..
147        } => {
148            let ec2_instance_id = ec2_instance_id.replace("/", "-").replace("_", "-");
149            format!("ec2_{ec2_instance_id}_")
150        }
151        AgentMetadata::FargateAgentMetadata {
152            aws_account_id: _,
153            aws_region_id: _,
154            ecs_task_arn,
155            ecs_cluster_arn: _,
156            ..
157        } => {
158            let task_arn = ecs_task_arn.replace("/", "-").replace("_", "-");
159            format!("ecs_{task_arn}_")
160        }
161        #[allow(deprecated)]
162        AgentMetadata::Other => "onprem__".to_string(),
163        AgentMetadata::NoMetadata => "unknown__".to_string(),
164    };
165    let time: chrono::DateTime<chrono::Utc> = time.into();
166    let time = time
167        .to_rfc3339_opts(SecondsFormat::Secs, true)
168        .replace(":", "-");
169    let pid = std::process::id();
170    format!("profile_{profiling_group_name}_{machine}_{pid}_{time}.zip")
171}
172
173#[async_trait]
174impl Reporter for S3Reporter {
175    async fn report(
176        &self,
177        jfr: Vec<u8>,
178        metadata: &ReportMetadata,
179    ) -> Result<(), Box<dyn std::error::Error + Send>> {
180        self.report_profiling_data(jfr, metadata)
181            .await
182            .map_err(|e| Box::new(e) as _)
183    }
184}
185
186impl fmt::Debug for S3Reporter {
187    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188        f.debug_struct("S3Reporter").finish()
189    }
190}
191
192fn add_files_to_zip(
193    jfr_filename: &str,
194    jfr_file: &[u8],
195    metadata_json: MetadataJson,
196) -> Result<Vec<u8>, S3ReporterError> {
197    tracing::debug!("creating zip file");
198
199    let file = Cursor::new(vec![]);
200    let mut zip = ZipWriter::new(file);
201    let metadata = serde_json::ser::to_string(&metadata_json).unwrap();
202    add_bytes_to_zip(&mut zip, jfr_filename, jfr_file).map_err(S3ReporterError::ZipIoError)?;
203    add_bytes_to_zip(&mut zip, "metadata.json", metadata.as_bytes())
204        .map_err(S3ReporterError::ZipIoError)?;
205    Ok(zip.finish()?.into_inner())
206}
207
208fn add_bytes_to_zip(
209    zip: &mut ZipWriter<Cursor<Vec<u8>>>,
210    filename: &str,
211    data: &[u8],
212) -> Result<(), std::io::Error> {
213    let options = SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
214    zip.start_file(filename, options)?;
215    zip.write_all(data)?;
216
217    Ok(())
218}
219
220async fn send_profile_data(
221    s3_client: &aws_sdk_s3::Client,
222    bucket_owner: String,
223    bucket_name: String,
224    object_name: String,
225    zip: Vec<u8>,
226) -> Result<(), S3ReporterError> {
227    tracing::debug!(message="uploading to s3", bucket_name=?bucket_name, object_name=?object_name);
228    // Make http call to upload JFR to S3.
229    s3_client
230        .put_object()
231        .expected_bucket_owner(bucket_owner)
232        .bucket(bucket_name)
233        .key(object_name)
234        .body(zip.into())
235        .content_type("application/zip")
236        .send()
237        .await
238        .map_err(|x| S3ReporterError::SendProfileS3Data(Box::new(x.into())))?;
239    Ok(())
240}
241
#[cfg(test)]
mod test {
    use std::{
        io::{self, Read},
        sync::{Arc, Mutex},
        time::SystemTime,
    };

    use aws_sdk_s3::operation::put_object::PutObjectOutput;
    use aws_smithy_mocks::{mock, mock_client};

    use test_case::test_case;

    use crate::{
        metadata::{AgentMetadata, DUMMY_METADATA},
        reporter::s3::{S3Reporter, S3ReporterError},
    };

    /// Asserts that `zip_file` holds exactly the JFR and `metadata.json` entries, and that
    /// the JFR entry's contents equal `expected_jfr`.
    fn assert_zip(zip_file: Vec<u8>, expected_jfr: &[u8]) {
        let mut zip = zip::ZipArchive::new(io::Cursor::new(&zip_file)).unwrap();
        let mut file_names: Vec<_> = zip.file_names().collect();
        file_names.sort();
        assert_eq!(
            file_names,
            vec!["async_profiler_dump_0.jfr", "metadata.json"]
        );

        let mut jfr = Vec::new();
        zip.by_name("async_profiler_dump_0.jfr")
            .unwrap()
            .read_to_end(&mut jfr)
            .unwrap();
        assert_eq!(jfr, expected_jfr);
    }

    #[test_case(#[allow(deprecated)] { AgentMetadata::Other }, "profile_pg_onprem___<pid>_<time>.zip"; "other")]
    #[test_case(AgentMetadata::NoMetadata, "profile_pg_unknown___<pid>_<time>.zip"; "no-metadata")]
    #[test_case(AgentMetadata::ec2_agent_metadata(
        "1".into(),
        "us-east-1".into(),
        "i-0".into()
    ).build(), "profile_pg_ec2_i-0__<pid>_<time>.zip"; "ec2")]
    #[test_case(AgentMetadata::fargate_agent_metadata(
        "1".into(),
        "us-east-1".into(),
        "arn:aws:ecs:us-east-1:123456789012:task/profiler-metadata-cluster/5261e761e0e2a3d92da3f02c8e5bab1f".into(),
        "arn:aws:ecs:us-east-1:123456789012:cluster/profiler-metadata-cluster".into(),
    ).build(), "profile_pg_ecs_arn:aws:ecs:us-east-1:123456789012:task-profiler-metadata-cluster-5261e761e0e2a3d92da3f02c8e5bab1f__<pid>_<time>.zip"; "ecs")]
    fn test_make_s3_file_name(metadata: AgentMetadata, expected: &str) {
        let file_name = super::make_s3_file_name(&metadata, "pg", SystemTime::UNIX_EPOCH);
        assert_eq!(
            file_name,
            expected
                .replace("<pid>", &std::process::id().to_string())
                .replace("<time>", "1970-01-01T00-00-00Z")
        );
    }

    #[tokio::test]
    async fn test_reporter() {
        let uploaded_file = Arc::new(Mutex::new(None));
        let uploaded_file_ = uploaded_file.clone();
        // Capture the uploaded body so the zip contents can be inspected afterwards.
        let put_object_rule = mock!(aws_sdk_s3::Client::put_object)
            .match_requests(move |req| {
                *uploaded_file_.lock().unwrap() = Some(req.body().bytes().unwrap().to_vec());
                true
            })
            .then_output(|| PutObjectOutput::builder().build());

        // Build the reporter directly around a mocked client that only knows this rule.
        let reporter = S3Reporter {
            s3_client: mock_client!(aws_sdk_s3, [&put_object_rule]),
            bucket_owner: "123456789012".into(),
            bucket_name: "123456789012-bucket".into(),
            profiling_group_name: "test-profiling-group".into(),
        };
        reporter
            .report_profiling_data(b"JFR".into(), &DUMMY_METADATA)
            .await
            .unwrap();
        assert_zip(uploaded_file.lock().unwrap().take().unwrap(), b"JFR");
    }

    #[tokio::test]
    async fn test_reporter_error() {
        let put_object_rule = mock!(aws_sdk_s3::Client::put_object).then_error(|| {
            aws_sdk_s3::operation::put_object::PutObjectError::unhandled(io::Error::new(
                io::ErrorKind::Other,
                "oh no",
            ))
        });

        // Build the reporter directly around a mocked client whose upload always fails.
        let reporter = S3Reporter {
            s3_client: mock_client!(aws_sdk_s3, [&put_object_rule]),
            bucket_owner: "123456789012".into(),
            bucket_name: "123456789012-bucket".into(),
            profiling_group_name: "test-profiling-group".into(),
        };
        let err = reporter
            .report_profiling_data(b"JFR".into(), &DUMMY_METADATA)
            .await
            .unwrap_err();
        assert!(
            matches!(err, S3ReporterError::SendProfileS3Data(_)),
            "unexpected error: {err}"
        );
    }
}