async_profiler_agent/reporter/
s3.rs1use async_trait::async_trait;
7use aws_config::SdkConfig;
8use chrono::SecondsFormat;
9use serde::Serialize;
10use std::io::Write;
11use std::time::SystemTime;
12use std::{fmt, io::Cursor};
13use thiserror::Error;
14use zip::result::ZipError;
15use zip::{ZipWriter, write::SimpleFileOptions};
16
17use crate::metadata::{AgentMetadata, ReportMetadata};
18
19use super::Reporter;
20
21#[derive(Error, Debug)]
23pub enum S3ReporterError {
24 #[error("io error creating zip file: {0}")]
26 ZipIoError(std::io::Error),
27 #[error("creating zip file: {0}")]
29 ZipError(#[from] ZipError),
30 #[error("failed to send profile data directly to S3: {0}")]
32 SendProfileS3Data(Box<aws_sdk_s3::Error>),
33 #[error("tokio task: {0}")]
35 JoinError(#[from] tokio::task::JoinError),
36}
37
38#[derive(Debug, Serialize)]
40#[non_exhaustive]
41pub struct MetadataJson {
42 start: u64,
43 end: u64,
44 reporting_interval: u64,
45}
46
47pub struct S3ReporterConfig<'a> {
49 pub sdk_config: &'a SdkConfig,
51 pub bucket_owner: String,
53 pub bucket_name: String,
55 pub profiling_group_name: String,
57}
58
59pub struct S3Reporter {
72 s3_client: aws_sdk_s3::Client,
73 bucket_owner: String,
74 bucket_name: String,
75 profiling_group_name: String,
76}
77
78impl S3Reporter {
79 pub fn new(config: S3ReporterConfig<'_>) -> Self {
81 let S3ReporterConfig {
82 sdk_config,
83 bucket_owner,
84 bucket_name,
85 profiling_group_name,
86 } = config;
87 let s3_client_config = aws_sdk_s3::config::Builder::from(sdk_config).build();
88 let s3_client = aws_sdk_s3::Client::from_conf(s3_client_config);
89
90 S3Reporter {
91 s3_client,
92 bucket_owner,
93 bucket_name,
94 profiling_group_name,
95 }
96 }
97
98 pub async fn report_profiling_data(
100 &self,
101 jfr: Vec<u8>,
102 metadata_obj: &ReportMetadata<'_>,
103 ) -> Result<(), S3ReporterError> {
104 tracing::debug!("sending file to backend");
105
106 let metadata_json = MetadataJson {
107 start: metadata_obj.start.as_millis() as u64,
108 end: metadata_obj.end.as_millis() as u64,
109 reporting_interval: metadata_obj.reporting_interval.as_millis() as u64,
110 };
111
112 let zip = tokio::task::spawn_blocking(move || {
114 add_files_to_zip("async_profiler_dump_0.jfr", &jfr, metadata_json)
115 })
116 .await??;
117
118 send_profile_data(
120 &self.s3_client,
121 self.bucket_owner.clone(),
122 self.bucket_name.clone(),
123 make_s3_file_name(
124 metadata_obj.instance,
125 &self.profiling_group_name,
126 SystemTime::now(),
127 ),
128 zip,
129 )
130 .await?;
131
132 Ok(())
133 }
134}
135
136fn make_s3_file_name(
137 metadata_obj: &AgentMetadata,
138 profiling_group_name: &str,
139 time: SystemTime,
140) -> String {
141 let machine = match metadata_obj {
142 AgentMetadata::Ec2AgentMetadata {
143 aws_account_id: _,
144 aws_region_id: _,
145 ec2_instance_id,
146 ..
147 } => {
148 let ec2_instance_id = ec2_instance_id.replace("/", "-").replace("_", "-");
149 format!("ec2_{ec2_instance_id}_")
150 }
151 AgentMetadata::FargateAgentMetadata {
152 aws_account_id: _,
153 aws_region_id: _,
154 ecs_task_arn,
155 ecs_cluster_arn: _,
156 ..
157 } => {
158 let task_arn = ecs_task_arn.replace("/", "-").replace("_", "-");
159 format!("ecs_{task_arn}_")
160 }
161 #[allow(deprecated)]
162 AgentMetadata::Other => "onprem__".to_string(),
163 AgentMetadata::NoMetadata => "unknown__".to_string(),
164 };
165 let time: chrono::DateTime<chrono::Utc> = time.into();
166 let time = time
167 .to_rfc3339_opts(SecondsFormat::Secs, true)
168 .replace(":", "-");
169 let pid = std::process::id();
170 format!("profile_{profiling_group_name}_{machine}_{pid}_{time}.zip")
171}
172
173#[async_trait]
174impl Reporter for S3Reporter {
175 async fn report(
176 &self,
177 jfr: Vec<u8>,
178 metadata: &ReportMetadata,
179 ) -> Result<(), Box<dyn std::error::Error + Send>> {
180 self.report_profiling_data(jfr, metadata)
181 .await
182 .map_err(|e| Box::new(e) as _)
183 }
184}
185
186impl fmt::Debug for S3Reporter {
187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188 f.debug_struct("S3Reporter").finish()
189 }
190}
191
192fn add_files_to_zip(
193 jfr_filename: &str,
194 jfr_file: &[u8],
195 metadata_json: MetadataJson,
196) -> Result<Vec<u8>, S3ReporterError> {
197 tracing::debug!("creating zip file");
198
199 let file = Cursor::new(vec![]);
200 let mut zip = ZipWriter::new(file);
201 let metadata = serde_json::ser::to_string(&metadata_json).unwrap();
202 add_bytes_to_zip(&mut zip, jfr_filename, jfr_file).map_err(S3ReporterError::ZipIoError)?;
203 add_bytes_to_zip(&mut zip, "metadata.json", metadata.as_bytes())
204 .map_err(S3ReporterError::ZipIoError)?;
205 Ok(zip.finish()?.into_inner())
206}
207
208fn add_bytes_to_zip(
209 zip: &mut ZipWriter<Cursor<Vec<u8>>>,
210 filename: &str,
211 data: &[u8],
212) -> Result<(), std::io::Error> {
213 let options = SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
214 zip.start_file(filename, options)?;
215 zip.write_all(data)?;
216
217 Ok(())
218}
219
220async fn send_profile_data(
221 s3_client: &aws_sdk_s3::Client,
222 bucket_owner: String,
223 bucket_name: String,
224 object_name: String,
225 zip: Vec<u8>,
226) -> Result<(), S3ReporterError> {
227 tracing::debug!(message="uploading to s3", bucket_name=?bucket_name, object_name=?object_name);
228 s3_client
230 .put_object()
231 .expected_bucket_owner(bucket_owner)
232 .bucket(bucket_name)
233 .key(object_name)
234 .body(zip.into())
235 .content_type("application/zip")
236 .send()
237 .await
238 .map_err(|x| S3ReporterError::SendProfileS3Data(Box::new(x.into())))?;
239 Ok(())
240}
241
#[cfg(test)]
mod test {
    use std::{
        io,
        sync::{Arc, Mutex},
        time::SystemTime,
    };

    use aws_sdk_s3::operation::put_object::PutObjectOutput;
    use aws_smithy_mocks::{mock, mock_client};

    use test_case::test_case;

    use crate::{
        metadata::{AgentMetadata, DUMMY_METADATA},
        reporter::s3::S3Reporter,
    };

    /// Builds a reporter around `s3_client` with fixed bucket and profiling-group values.
    fn reporter_with(s3_client: aws_sdk_s3::Client) -> S3Reporter {
        S3Reporter {
            s3_client,
            bucket_owner: "123456789012".into(),
            bucket_name: "123456789012-bucket".into(),
            profiling_group_name: "test-profiling-group".into(),
        }
    }

    /// Asserts that `zip_file` is a readable zip archive containing exactly the JFR dump and the
    /// metadata entry.
    fn assert_zip(zip_file: Vec<u8>) {
        let archive = zip::ZipArchive::new(io::Cursor::new(&zip_file)).unwrap();
        let mut entries = archive.file_names().collect::<Vec<_>>();
        entries.sort();
        assert_eq!(
            entries,
            vec!["async_profiler_dump_0.jfr", "metadata.json"]
        );
    }

    /// Checks the generated object key for every metadata variant.
    #[test_case(#[allow(deprecated)] { AgentMetadata::Other }, "profile_pg_onprem___<pid>_<time>.zip"; "other")]
    #[test_case(AgentMetadata::NoMetadata, "profile_pg_unknown___<pid>_<time>.zip"; "no-metadata")]
    #[test_case(AgentMetadata::ec2_agent_metadata(
        "1".into(),
        "us-east-1".into(),
        "i-0".into()
    ).build(), "profile_pg_ec2_i-0__<pid>_<time>.zip"; "ec2")]
    #[test_case(AgentMetadata::fargate_agent_metadata(
        "1".into(),
        "us-east-1".into(),
        "arn:aws:ecs:us-east-1:123456789012:task/profiler-metadata-cluster/5261e761e0e2a3d92da3f02c8e5bab1f".into(),
        "arn:aws:ecs:us-east-1:123456789012:cluster/profiler-metadata-cluster".into(),
    ).build(), "profile_pg_ecs_arn:aws:ecs:us-east-1:123456789012:task-profiler-metadata-cluster-5261e761e0e2a3d92da3f02c8e5bab1f__<pid>_<time>.zip"; "ecs")]
    fn test_make_s3_file_name(metadata: AgentMetadata, expected: &str) {
        let want = expected
            .replace("<pid>", &std::process::id().to_string())
            .replace("<time>", "1970-01-01T00-00-00Z");
        let got = super::make_s3_file_name(&metadata, "pg", SystemTime::UNIX_EPOCH);
        assert_eq!(got, want);
    }

    /// A successful upload sends a zip body holding both expected entries.
    #[tokio::test]
    async fn test_reporter() {
        let captured = Arc::new(Mutex::new(None));
        let sink = Arc::clone(&captured);
        let put_object_rule = mock!(aws_sdk_s3::Client::put_object)
            .match_requests(move |req| {
                // Record the request body so the archive can be inspected after the call.
                *sink.lock().unwrap() = Some(req.body().bytes().unwrap().to_vec());
                true
            })
            .then_output(|| PutObjectOutput::builder().build());

        let reporter = reporter_with(mock_client!(aws_sdk_s3, [&put_object_rule]));
        reporter
            .report_profiling_data(b"JFR".to_vec(), &DUMMY_METADATA)
            .await
            .unwrap();

        let body = captured.lock().unwrap().take().unwrap();
        assert_zip(body);
    }

    /// A failing `PutObject` call is surfaced as an error by the reporter.
    #[tokio::test]
    async fn test_reporter_error() {
        let put_object_rule = mock!(aws_sdk_s3::Client::put_object).then_error(|| {
            aws_sdk_s3::operation::put_object::PutObjectError::unhandled(io::Error::new(
                io::ErrorKind::Other,
                "oh no",
            ))
        });

        let reporter = reporter_with(mock_client!(aws_sdk_s3, [&put_object_rule]));
        reporter
            .report_profiling_data(b"JFR".to_vec(), &DUMMY_METADATA)
            .await
            .unwrap_err();
    }
}