async_profiler_agent/reporter/
s3.rs

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
3
4use async_trait::async_trait;
5use aws_config::SdkConfig;
6use chrono::SecondsFormat;
7use serde::Serialize;
8use std::io::Write;
9use std::time::SystemTime;
10use std::{fmt, io::Cursor};
11use thiserror::Error;
12use zip::result::ZipError;
13use zip::{write::SimpleFileOptions, ZipWriter};
14
15use crate::metadata::{AgentMetadata, ReportMetadata};
16
17use super::Reporter;
18
19#[derive(Error, Debug)]
20pub enum S3ReporterError {
21    #[error("io error creating zip file: {0}")]
22    ZipIoError(std::io::Error),
23    #[error("creating zip file: {0}")]
24    ZipError(#[from] ZipError),
25    #[error("failed to send profile data directly to S3: {0}")]
26    SendProfileS3Data(Box<aws_sdk_s3::Error>),
27    #[error("tokio task: {0}")]
28    JoinError(#[from] tokio::task::JoinError),
29}
30
31#[derive(Debug, Serialize)]
32pub struct MetadataJson {
33    start: u64,
34    end: u64,
35    reporting_interval: u64,
36}
37
38/// Mandatory parameters in order to configure an S3 reporter
39pub struct S3ReporterConfig<'a> {
40    /// The SDK config to get credentials from
41    pub sdk_config: &'a SdkConfig,
42    /// The expected bucket owner account
43    pub bucket_owner: String,
44    /// The bucket name
45    pub bucket_name: String,
46    /// The profiling group name, used in the file names within the bucket
47    pub profiling_group_name: String,
48}
49
50/// A reporter for S3.
51pub struct S3Reporter {
52    s3_client: aws_sdk_s3::Client,
53    bucket_owner: String,
54    bucket_name: String,
55    profiling_group_name: String,
56}
57
58impl S3Reporter {
59    /// Makes a new one.
60    pub fn new(config: S3ReporterConfig<'_>) -> Self {
61        let S3ReporterConfig {
62            sdk_config,
63            bucket_owner,
64            bucket_name,
65            profiling_group_name,
66        } = config;
67        let s3_client_config = aws_sdk_s3::config::Builder::from(sdk_config).build();
68        let s3_client = aws_sdk_s3::Client::from_conf(s3_client_config);
69
70        S3Reporter {
71            s3_client,
72            bucket_owner,
73            bucket_name,
74            profiling_group_name,
75        }
76    }
77
78    /// Makes a zip file, then uploads it.
79    pub async fn report_profiling_data(
80        &self,
81        jfr: Vec<u8>,
82        metadata_obj: &ReportMetadata<'_>,
83    ) -> Result<(), S3ReporterError> {
84        tracing::debug!("sending file to backend");
85
86        let metadata_json = MetadataJson {
87            start: metadata_obj.start.as_millis() as u64,
88            end: metadata_obj.end.as_millis() as u64,
89            reporting_interval: metadata_obj.reporting_interval.as_millis() as u64,
90        };
91
92        // Create a zip file.
93        let zip = tokio::task::spawn_blocking(move || {
94            add_files_to_zip("async_profiler_dump_0.jfr", &jfr, metadata_json)
95        })
96        .await??;
97
98        // Send zip file to the S3 pre-signed URL.
99        send_profile_data(
100            &self.s3_client,
101            self.bucket_owner.clone(),
102            self.bucket_name.clone(),
103            make_s3_file_name(metadata_obj.instance, &self.profiling_group_name),
104            zip,
105        )
106        .await?;
107
108        Ok(())
109    }
110}
111
112fn make_s3_file_name(metadata_obj: &AgentMetadata, profiling_group_name: &str) -> String {
113    let machine = match metadata_obj {
114        AgentMetadata::Ec2AgentMetadata {
115            aws_account_id: _,
116            aws_region_id: _,
117            ec2_instance_id,
118        } => {
119            let ec2_instance_id = ec2_instance_id.replace("/", "-").replace("_", "-");
120            format!("ec2_{ec2_instance_id}_")
121        }
122        AgentMetadata::FargateAgentMetadata {
123            aws_account_id: _,
124            aws_region_id: _,
125            ecs_task_arn,
126            ecs_cluster_arn,
127        } => {
128            let task_arn = ecs_task_arn.replace("/", "-").replace("_", "-");
129            let cluster_arn = ecs_cluster_arn.replace("/", "-").replace("_", "-");
130            format!("ecs_{cluster_arn}_{task_arn}")
131        }
132        AgentMetadata::Other => "onprem__".to_string(),
133    };
134    let time: chrono::DateTime<chrono::Utc> = SystemTime::now().into();
135    let time = time
136        .to_rfc3339_opts(SecondsFormat::Secs, true)
137        .replace(":", "-");
138    let pid = std::process::id();
139    format!("profile_{profiling_group_name}_{machine}_{pid}_{time}.zip")
140}
141
142#[async_trait]
143impl Reporter for S3Reporter {
144    async fn report(
145        &self,
146        jfr: Vec<u8>,
147        metadata: &ReportMetadata,
148    ) -> Result<(), Box<dyn std::error::Error + Send>> {
149        self.report_profiling_data(jfr, metadata)
150            .await
151            .map_err(|e| Box::new(e) as _)
152    }
153}
154
155impl fmt::Debug for S3Reporter {
156    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157        f.debug_struct("S3Reporter").finish()
158    }
159}
160
161fn add_files_to_zip(
162    jfr_filename: &str,
163    jfr_file: &[u8],
164    metadata_json: MetadataJson,
165) -> Result<Vec<u8>, S3ReporterError> {
166    tracing::debug!("creating zip file");
167
168    let file = Cursor::new(vec![]);
169    let mut zip = ZipWriter::new(file);
170    let metadata = serde_json::ser::to_string(&metadata_json).unwrap();
171    add_bytes_to_zip(&mut zip, jfr_filename, jfr_file).map_err(S3ReporterError::ZipIoError)?;
172    add_bytes_to_zip(&mut zip, "metadata.json", metadata.as_bytes())
173        .map_err(S3ReporterError::ZipIoError)?;
174    Ok(zip.finish()?.into_inner())
175}
176
177fn add_bytes_to_zip(
178    zip: &mut ZipWriter<Cursor<Vec<u8>>>,
179    filename: &str,
180    data: &[u8],
181) -> Result<(), std::io::Error> {
182    let options = SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
183    zip.start_file(filename, options)?;
184    zip.write_all(data)?;
185
186    Ok(())
187}
188
189async fn send_profile_data(
190    s3_client: &aws_sdk_s3::Client,
191    bucket_owner: String,
192    bucket_name: String,
193    object_name: String,
194    zip: Vec<u8>,
195) -> Result<(), S3ReporterError> {
196    tracing::debug!(message="uploading to s3", bucket_name=?bucket_name, object_name=?object_name);
197    // Make http call to upload JFR to S3.
198    s3_client
199        .put_object()
200        .expected_bucket_owner(bucket_owner)
201        .bucket(bucket_name)
202        .key(object_name)
203        .body(zip.into())
204        .content_type("application/zip")
205        .send()
206        .await
207        .map_err(|x| S3ReporterError::SendProfileS3Data(Box::new(x.into())))?;
208    Ok(())
209}