async_profiler_agent/reporter/
s3.rs1use async_trait::async_trait;
7use aws_config::SdkConfig;
8use chrono::SecondsFormat;
9use serde::Serialize;
10use std::io::Write;
11use std::time::SystemTime;
12use std::{fmt, io::Cursor};
13use thiserror::Error;
14use zip::result::ZipError;
15use zip::{write::SimpleFileOptions, ZipWriter};
16
17use crate::metadata::{AgentMetadata, ReportMetadata};
18
19use super::Reporter;
20
21#[derive(Error, Debug)]
23pub enum S3ReporterError {
24 #[error("io error creating zip file: {0}")]
26 ZipIoError(std::io::Error),
27 #[error("creating zip file: {0}")]
29 ZipError(#[from] ZipError),
30 #[error("failed to send profile data directly to S3: {0}")]
32 SendProfileS3Data(Box<aws_sdk_s3::Error>),
33 #[error("tokio task: {0}")]
35 JoinError(#[from] tokio::task::JoinError),
36}
37
38#[derive(Debug, Serialize)]
40#[non_exhaustive]
41pub struct MetadataJson {
42 start: u64,
43 end: u64,
44 reporting_interval: u64,
45}
46
47pub struct S3ReporterConfig<'a> {
49 pub sdk_config: &'a SdkConfig,
51 pub bucket_owner: String,
53 pub bucket_name: String,
55 pub profiling_group_name: String,
57}
58
59pub struct S3Reporter {
61 s3_client: aws_sdk_s3::Client,
62 bucket_owner: String,
63 bucket_name: String,
64 profiling_group_name: String,
65}
66
67impl S3Reporter {
68 pub fn new(config: S3ReporterConfig<'_>) -> Self {
70 let S3ReporterConfig {
71 sdk_config,
72 bucket_owner,
73 bucket_name,
74 profiling_group_name,
75 } = config;
76 let s3_client_config = aws_sdk_s3::config::Builder::from(sdk_config).build();
77 let s3_client = aws_sdk_s3::Client::from_conf(s3_client_config);
78
79 S3Reporter {
80 s3_client,
81 bucket_owner,
82 bucket_name,
83 profiling_group_name,
84 }
85 }
86
87 pub async fn report_profiling_data(
89 &self,
90 jfr: Vec<u8>,
91 metadata_obj: &ReportMetadata<'_>,
92 ) -> Result<(), S3ReporterError> {
93 tracing::debug!("sending file to backend");
94
95 let metadata_json = MetadataJson {
96 start: metadata_obj.start.as_millis() as u64,
97 end: metadata_obj.end.as_millis() as u64,
98 reporting_interval: metadata_obj.reporting_interval.as_millis() as u64,
99 };
100
101 let zip = tokio::task::spawn_blocking(move || {
103 add_files_to_zip("async_profiler_dump_0.jfr", &jfr, metadata_json)
104 })
105 .await??;
106
107 send_profile_data(
109 &self.s3_client,
110 self.bucket_owner.clone(),
111 self.bucket_name.clone(),
112 make_s3_file_name(
113 metadata_obj.instance,
114 &self.profiling_group_name,
115 SystemTime::now(),
116 ),
117 zip,
118 )
119 .await?;
120
121 Ok(())
122 }
123}
124
125fn make_s3_file_name(
126 metadata_obj: &AgentMetadata,
127 profiling_group_name: &str,
128 time: SystemTime,
129) -> String {
130 let machine = match metadata_obj {
131 AgentMetadata::Ec2AgentMetadata {
132 aws_account_id: _,
133 aws_region_id: _,
134 ec2_instance_id,
135 } => {
136 let ec2_instance_id = ec2_instance_id.replace("/", "-").replace("_", "-");
137 format!("ec2_{ec2_instance_id}_")
138 }
139 AgentMetadata::FargateAgentMetadata {
140 aws_account_id: _,
141 aws_region_id: _,
142 ecs_task_arn,
143 ecs_cluster_arn: _,
144 } => {
145 let task_arn = ecs_task_arn.replace("/", "-").replace("_", "-");
146 format!("ecs_{task_arn}_")
147 }
148 #[allow(deprecated)]
149 AgentMetadata::Other => "onprem__".to_string(),
150 AgentMetadata::NoMetadata => "unknown__".to_string(),
151 };
152 let time: chrono::DateTime<chrono::Utc> = time.into();
153 let time = time
154 .to_rfc3339_opts(SecondsFormat::Secs, true)
155 .replace(":", "-");
156 let pid = std::process::id();
157 format!("profile_{profiling_group_name}_{machine}_{pid}_{time}.zip")
158}
159
160#[async_trait]
161impl Reporter for S3Reporter {
162 async fn report(
163 &self,
164 jfr: Vec<u8>,
165 metadata: &ReportMetadata,
166 ) -> Result<(), Box<dyn std::error::Error + Send>> {
167 self.report_profiling_data(jfr, metadata)
168 .await
169 .map_err(|e| Box::new(e) as _)
170 }
171}
172
173impl fmt::Debug for S3Reporter {
174 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
175 f.debug_struct("S3Reporter").finish()
176 }
177}
178
179fn add_files_to_zip(
180 jfr_filename: &str,
181 jfr_file: &[u8],
182 metadata_json: MetadataJson,
183) -> Result<Vec<u8>, S3ReporterError> {
184 tracing::debug!("creating zip file");
185
186 let file = Cursor::new(vec![]);
187 let mut zip = ZipWriter::new(file);
188 let metadata = serde_json::ser::to_string(&metadata_json).unwrap();
189 add_bytes_to_zip(&mut zip, jfr_filename, jfr_file).map_err(S3ReporterError::ZipIoError)?;
190 add_bytes_to_zip(&mut zip, "metadata.json", metadata.as_bytes())
191 .map_err(S3ReporterError::ZipIoError)?;
192 Ok(zip.finish()?.into_inner())
193}
194
195fn add_bytes_to_zip(
196 zip: &mut ZipWriter<Cursor<Vec<u8>>>,
197 filename: &str,
198 data: &[u8],
199) -> Result<(), std::io::Error> {
200 let options = SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
201 zip.start_file(filename, options)?;
202 zip.write_all(data)?;
203
204 Ok(())
205}
206
207async fn send_profile_data(
208 s3_client: &aws_sdk_s3::Client,
209 bucket_owner: String,
210 bucket_name: String,
211 object_name: String,
212 zip: Vec<u8>,
213) -> Result<(), S3ReporterError> {
214 tracing::debug!(message="uploading to s3", bucket_name=?bucket_name, object_name=?object_name);
215 s3_client
217 .put_object()
218 .expected_bucket_owner(bucket_owner)
219 .bucket(bucket_name)
220 .key(object_name)
221 .body(zip.into())
222 .content_type("application/zip")
223 .send()
224 .await
225 .map_err(|x| S3ReporterError::SendProfileS3Data(Box::new(x.into())))?;
226 Ok(())
227}
228
#[cfg(test)]
mod test {
    use std::{
        io::{self, Read},
        sync::{Arc, Mutex},
        time::SystemTime,
    };

    use aws_sdk_s3::operation::put_object::PutObjectOutput;
    use aws_smithy_mocks::{mock, mock_client};

    use test_case::test_case;

    use crate::{
        metadata::{AgentMetadata, DUMMY_METADATA},
        reporter::s3::{S3Reporter, S3ReporterError},
    };

    /// Asserts that `zip_file` is a zip archive holding exactly the JFR dump and
    /// `metadata.json`, that the JFR entry contains `expected_jfr`, and that
    /// `metadata.json` carries numeric `start`, `end` and `reporting_interval`.
    fn assert_zip(zip_file: Vec<u8>, expected_jfr: &[u8]) {
        let mut zip = zip::ZipArchive::new(io::Cursor::new(&zip_file)).unwrap();
        {
            let mut file_names: Vec<_> = zip.file_names().collect();
            file_names.sort();
            assert_eq!(
                file_names,
                vec!["async_profiler_dump_0.jfr", "metadata.json"]
            );
        }

        let mut jfr = Vec::new();
        zip.by_name("async_profiler_dump_0.jfr")
            .unwrap()
            .read_to_end(&mut jfr)
            .unwrap();
        assert_eq!(jfr, expected_jfr);

        let mut metadata = String::new();
        zip.by_name("metadata.json")
            .unwrap()
            .read_to_string(&mut metadata)
            .unwrap();
        let metadata: serde_json::Value = serde_json::from_str(&metadata).unwrap();
        for key in ["start", "end", "reporting_interval"] {
            assert!(
                metadata[key].is_u64(),
                "metadata.json missing numeric `{key}`: {metadata}"
            );
        }
    }

    #[test_case(#[allow(deprecated)] { AgentMetadata::Other }, "profile_pg_onprem___<pid>_<time>.zip"; "other")]
    #[test_case(AgentMetadata::NoMetadata, "profile_pg_unknown___<pid>_<time>.zip"; "no-metadata")]
    #[test_case(AgentMetadata::Ec2AgentMetadata {
        aws_account_id: "1".into(),
        aws_region_id: "us-east-1".into(),
        ec2_instance_id: "i-0".into()
    }, "profile_pg_ec2_i-0__<pid>_<time>.zip"; "ec2")]
    #[test_case(AgentMetadata::FargateAgentMetadata {
        aws_account_id: "1".into(),
        aws_region_id: "us-east-1".into(),
        ecs_task_arn: "arn:aws:ecs:us-east-1:123456789012:task/profiler-metadata-cluster/5261e761e0e2a3d92da3f02c8e5bab1f".into(),
        ecs_cluster_arn: "arn:aws:ecs:us-east-1:123456789012:cluster/profiler-metadata-cluster".into()
    }, "profile_pg_ecs_arn:aws:ecs:us-east-1:123456789012:task-profiler-metadata-cluster-5261e761e0e2a3d92da3f02c8e5bab1f__<pid>_<time>.zip"; "ecs")]
    fn test_make_s3_file_name(metadata: AgentMetadata, expected: &str) {
        let file_name = super::make_s3_file_name(&metadata, "pg", SystemTime::UNIX_EPOCH);
        assert_eq!(
            file_name,
            expected
                .replace("<pid>", &std::process::id().to_string())
                .replace("<time>", "1970-01-01T00-00-00Z")
        );
    }

    #[tokio::test]
    async fn test_reporter() {
        let uploaded_file = Arc::new(Mutex::new(None));
        let uploaded_file_ = uploaded_file.clone();
        let put_object_rule = mock!(aws_sdk_s3::Client::put_object)
            .match_requests(move |req| {
                *uploaded_file_.lock().unwrap() = Some(req.body().bytes().unwrap().to_vec());
                true
            })
            .then_output(|| PutObjectOutput::builder().build());

        let reporter = S3Reporter {
            s3_client: mock_client!(aws_sdk_s3, [&put_object_rule]),
            bucket_owner: "123456789012".into(),
            bucket_name: "123456789012-bucket".into(),
            profiling_group_name: "test-profiling-group".into(),
        };
        reporter
            .report_profiling_data(b"JFR".into(), &DUMMY_METADATA)
            .await
            .unwrap();
        assert_zip(uploaded_file.lock().unwrap().take().unwrap(), b"JFR");
    }

    #[tokio::test]
    async fn test_reporter_error() {
        let put_object_rule = mock!(aws_sdk_s3::Client::put_object).then_error(|| {
            aws_sdk_s3::operation::put_object::PutObjectError::unhandled(io::Error::new(
                io::ErrorKind::Other,
                "oh no",
            ))
        });

        let reporter = S3Reporter {
            s3_client: mock_client!(aws_sdk_s3, [&put_object_rule]),
            bucket_owner: "123456789012".into(),
            bucket_name: "123456789012-bucket".into(),
            profiling_group_name: "test-profiling-group".into(),
        };
        let err = reporter
            .report_profiling_data(b"JFR".into(), &DUMMY_METADATA)
            .await
            .unwrap_err();
        // The failure must come from the upload, not from building the archive.
        assert!(
            matches!(err, S3ReporterError::SendProfileS3Data(_)),
            "unexpected error: {err:?}"
        );
    }
}