async_profiler_agent/reporter/
s3.rs1use async_trait::async_trait;
5use aws_config::SdkConfig;
6use chrono::SecondsFormat;
7use serde::Serialize;
8use std::io::Write;
9use std::time::SystemTime;
10use std::{fmt, io::Cursor};
11use thiserror::Error;
12use zip::result::ZipError;
13use zip::{write::SimpleFileOptions, ZipWriter};
14
15use crate::metadata::{AgentMetadata, ReportMetadata};
16
17use super::Reporter;
18
19#[derive(Error, Debug)]
20pub enum S3ReporterError {
21 #[error("io error creating zip file: {0}")]
22 ZipIoError(std::io::Error),
23 #[error("creating zip file: {0}")]
24 ZipError(#[from] ZipError),
25 #[error("failed to send profile data directly to S3: {0}")]
26 SendProfileS3Data(Box<aws_sdk_s3::Error>),
27 #[error("tokio task: {0}")]
28 JoinError(#[from] tokio::task::JoinError),
29}
30
31#[derive(Debug, Serialize)]
32pub struct MetadataJson {
33 start: u64,
34 end: u64,
35 reporting_interval: u64,
36}
37
38pub struct S3ReporterConfig<'a> {
40 pub sdk_config: &'a SdkConfig,
42 pub bucket_owner: String,
44 pub bucket_name: String,
46 pub profiling_group_name: String,
48}
49
50pub struct S3Reporter {
52 s3_client: aws_sdk_s3::Client,
53 bucket_owner: String,
54 bucket_name: String,
55 profiling_group_name: String,
56}
57
58impl S3Reporter {
59 pub fn new(config: S3ReporterConfig<'_>) -> Self {
61 let S3ReporterConfig {
62 sdk_config,
63 bucket_owner,
64 bucket_name,
65 profiling_group_name,
66 } = config;
67 let s3_client_config = aws_sdk_s3::config::Builder::from(sdk_config).build();
68 let s3_client = aws_sdk_s3::Client::from_conf(s3_client_config);
69
70 S3Reporter {
71 s3_client,
72 bucket_owner,
73 bucket_name,
74 profiling_group_name,
75 }
76 }
77
78 pub async fn report_profiling_data(
80 &self,
81 jfr: Vec<u8>,
82 metadata_obj: &ReportMetadata<'_>,
83 ) -> Result<(), S3ReporterError> {
84 tracing::debug!("sending file to backend");
85
86 let metadata_json = MetadataJson {
87 start: metadata_obj.start.as_millis() as u64,
88 end: metadata_obj.end.as_millis() as u64,
89 reporting_interval: metadata_obj.reporting_interval.as_millis() as u64,
90 };
91
92 let zip = tokio::task::spawn_blocking(move || {
94 add_files_to_zip("async_profiler_dump_0.jfr", &jfr, metadata_json)
95 })
96 .await??;
97
98 send_profile_data(
100 &self.s3_client,
101 self.bucket_owner.clone(),
102 self.bucket_name.clone(),
103 make_s3_file_name(metadata_obj.instance, &self.profiling_group_name),
104 zip,
105 )
106 .await?;
107
108 Ok(())
109 }
110}
111
112fn make_s3_file_name(metadata_obj: &AgentMetadata, profiling_group_name: &str) -> String {
113 let machine = match metadata_obj {
114 AgentMetadata::Ec2AgentMetadata {
115 aws_account_id: _,
116 aws_region_id: _,
117 ec2_instance_id,
118 } => {
119 let ec2_instance_id = ec2_instance_id.replace("/", "-").replace("_", "-");
120 format!("ec2_{ec2_instance_id}_")
121 }
122 AgentMetadata::FargateAgentMetadata {
123 aws_account_id: _,
124 aws_region_id: _,
125 ecs_task_arn,
126 ecs_cluster_arn,
127 } => {
128 let task_arn = ecs_task_arn.replace("/", "-").replace("_", "-");
129 let cluster_arn = ecs_cluster_arn.replace("/", "-").replace("_", "-");
130 format!("ecs_{cluster_arn}_{task_arn}")
131 }
132 AgentMetadata::Other => "onprem__".to_string(),
133 };
134 let time: chrono::DateTime<chrono::Utc> = SystemTime::now().into();
135 let time = time
136 .to_rfc3339_opts(SecondsFormat::Secs, true)
137 .replace(":", "-");
138 let pid = std::process::id();
139 format!("profile_{profiling_group_name}_{machine}_{pid}_{time}.zip")
140}
141
142#[async_trait]
143impl Reporter for S3Reporter {
144 async fn report(
145 &self,
146 jfr: Vec<u8>,
147 metadata: &ReportMetadata,
148 ) -> Result<(), Box<dyn std::error::Error + Send>> {
149 self.report_profiling_data(jfr, metadata)
150 .await
151 .map_err(|e| Box::new(e) as _)
152 }
153}
154
155impl fmt::Debug for S3Reporter {
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 f.debug_struct("S3Reporter").finish()
158 }
159}
160
161fn add_files_to_zip(
162 jfr_filename: &str,
163 jfr_file: &[u8],
164 metadata_json: MetadataJson,
165) -> Result<Vec<u8>, S3ReporterError> {
166 tracing::debug!("creating zip file");
167
168 let file = Cursor::new(vec![]);
169 let mut zip = ZipWriter::new(file);
170 let metadata = serde_json::ser::to_string(&metadata_json).unwrap();
171 add_bytes_to_zip(&mut zip, jfr_filename, jfr_file).map_err(S3ReporterError::ZipIoError)?;
172 add_bytes_to_zip(&mut zip, "metadata.json", metadata.as_bytes())
173 .map_err(S3ReporterError::ZipIoError)?;
174 Ok(zip.finish()?.into_inner())
175}
176
177fn add_bytes_to_zip(
178 zip: &mut ZipWriter<Cursor<Vec<u8>>>,
179 filename: &str,
180 data: &[u8],
181) -> Result<(), std::io::Error> {
182 let options = SimpleFileOptions::default().compression_method(zip::CompressionMethod::Deflated);
183 zip.start_file(filename, options)?;
184 zip.write_all(data)?;
185
186 Ok(())
187}
188
189async fn send_profile_data(
190 s3_client: &aws_sdk_s3::Client,
191 bucket_owner: String,
192 bucket_name: String,
193 object_name: String,
194 zip: Vec<u8>,
195) -> Result<(), S3ReporterError> {
196 tracing::debug!(message="uploading to s3", bucket_name=?bucket_name, object_name=?object_name);
197 s3_client
199 .put_object()
200 .expected_bucket_owner(bucket_owner)
201 .bucket(bucket_name)
202 .key(object_name)
203 .body(zip.into())
204 .content_type("application/zip")
205 .send()
206 .await
207 .map_err(|x| S3ReporterError::SendProfileS3Data(Box::new(x.into())))?;
208 Ok(())
209}