posthog_cli/commands/sourcemap/upload.rs

use core::str;
use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::{anyhow, bail, Context, Ok, Result};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reqwest::blocking::multipart::{Form, Part};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tracing::{info, warn};

use crate::commands::UploadArgs;
use crate::utils::auth::load_token;
use crate::utils::posthog::capture_command_invoked;
use crate::utils::release::{create_release, CreateReleaseResponse};
use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};

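/// Maximum size of a single chunk upload (100 MiB); larger chunks are skipped with a warning.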
const MAX_FILE_SIZE: usize = 100 * 1024 * 1024;

#[derive(Debug, Serialize, Deserialize, Clone)]
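/// Per-chunk entry returned by `bulk_start_upload`: where to upload the chunk
/// and the id of the symbol set record it will back.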
struct StartUploadResponseData {
    presigned_url: PresignedUrl,
    symbol_set_id: String,
}

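/// A presigned POST target: the URL to upload to, plus the form fields that
/// must be sent alongside the file part.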
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PresignedUrl {
    pub url: String,
    pub fields: HashMap<String, String>,
}

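/// Request body for `bulk_start_upload`: the chunk ids to upload and the
/// release to associate them with, if any.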
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartRequest {
    release_id: Option<String>,
    chunk_ids: Vec<String>,
}

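/// Response body for `bulk_start_upload`, keyed by chunk id.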
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartResponse {
    id_map: HashMap<String, StartUploadResponseData>,
}

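/// Request body for `bulk_finish_upload`, mapping each symbol set id to the
/// SHA-512 hash of the content uploaded for it.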
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadFinishRequest {
    content_hashes: HashMap<String, String>,
}

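/// Runs the `sourcemap upload` command: reads source/sourcemap pairs from
/// `directory`, creates a release, uploads every chunk in batches, and
/// optionally deletes the local sourcemaps afterwards.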
pub fn upload(host: Option<String>, args: UploadArgs) -> Result<()> {
    let UploadArgs {
        directory,
        project,
        ignore,
        version,
        delete_after,
        skip_ssl_verification,
        batch_size,
    } = args;

    let token = load_token().context("While starting upload command")?;
    let host = token.get_host(host.as_deref());

    let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id));

    let base_url = format!(
        "{}/api/environments/{}/error_tracking/symbol_sets",
        host, token.env_id
    );

    let pairs = read_pairs(&directory, &ignore)?;
    let sourcemap_paths = pairs
        .iter()
        .map(|pair| pair.sourcemap.path.clone())
        .collect::<Vec<_>>();
    info!("Found {} chunks to upload", pairs.len());

    let uploads = collect_uploads(pairs).context("While preparing files for upload")?;
    let release = create_release(
        &host,
        &token,
        Some(directory.clone()),
        Some(content_hash(uploads.iter().map(|upload| &upload.data))),
        project,
        version,
        skip_ssl_verification,
    )
    .context("While creating release")?;

    let batched_uploads = into_batches(uploads, batch_size);

    for batch_upload in batched_uploads {
        upload_chunks(
            &base_url,
            &token.token,
            batch_upload,
            release.as_ref(),
            skip_ssl_verification,
        )?;
    }

    if delete_after {
        delete_files(sourcemap_paths).context("While deleting sourcemaps")?;
    }

    let _ = capture_handle.join();

    Ok(())
}

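/// Splits `array` into consecutive batches of at most `batch_size` items.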
fn into_batches<T>(mut array: Vec<T>, batch_size: usize) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    while !array.is_empty() {
        let take = array.len().min(batch_size);
        let chunk: Vec<_> = array.drain(..take).collect();
        batches.push(chunk);
    }
    batches
}

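/// Converts each `SourcePair` into a `ChunkUpload`, failing fast on the first
/// pair that cannot be converted.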
fn collect_uploads(pairs: Vec<SourcePair>) -> Result<Vec<ChunkUpload>> {
    let uploads: Vec<ChunkUpload> = pairs
        .into_iter()
        .map(|pair| pair.into_chunk_upload())
        .collect::<Result<Vec<ChunkUpload>>>()?;
    Ok(uploads)
}

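/// Uploads a single batch: filters out oversized chunks, asks the server for
/// presigned URLs, pushes each chunk to S3 in parallel, and reports the
/// resulting content hashes back.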
fn upload_chunks(
    base_url: &str,
    token: &str,
    uploads: Vec<ChunkUpload>,
    release: Option<&CreateReleaseResponse>,
    skip_ssl_verification: bool,
) -> Result<()> {
    let client = Client::builder()
        .danger_accept_invalid_certs(skip_ssl_verification)
        .build()?;

    let release_id = release.map(|r| r.id.to_string());
    let chunk_ids = uploads
        .iter()
        .filter(|u| {
            if u.data.len() > MAX_FILE_SIZE {
                warn!(
                    "Skipping chunk {} because it is too large ({} bytes; the limit is {} bytes)",
                    u.chunk_id,
                    u.data.len(),
                    MAX_FILE_SIZE
                );
                false
            } else {
                true
            }
        })
        .map(|u| u.chunk_id.clone())
        .collect::<Vec<String>>();

    let start_response = start_upload(&client, base_url, token, chunk_ids, &release_id)?;

    let id_map: HashMap<_, _> = uploads
        .into_iter()
        .map(|u| (u.chunk_id.clone(), u))
        .collect();

    let res: Result<HashMap<String, String>> = start_response
        .id_map
        .into_par_iter()
        .map(|(chunk_id, data)| {
            info!("Uploading chunk {}", chunk_id);
            let upload = id_map.get(&chunk_id).ok_or_else(|| {
                anyhow!("Got a chunk ID back from PostHog that we didn't expect!")
            })?;

            let content_hash = content_hash([&upload.data]);
            upload_to_s3(&client, data.presigned_url.clone(), &upload.data)?;
            Ok((data.symbol_set_id, content_hash))
        })
        .collect();

    let content_hashes = res?;

    finish_upload(&client, base_url, token, content_hashes)?;

    Ok(())
}

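/// Calls the `bulk_start_upload` endpoint to obtain a presigned URL and
/// symbol set id for each chunk in the batch.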
fn start_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    chunk_ids: Vec<String>,
    release_id: &Option<String>,
) -> Result<BulkUploadStartResponse> {
    let start_upload_url = format!("{base_url}/bulk_start_upload");

    let request = BulkUploadStartRequest {
        chunk_ids,
        release_id: release_id.clone(),
    };

    let res = client
        .post(&start_upload_url)
        .header("Authorization", format!("Bearer {auth_token}"))
        .json(&request)
        .send()
        .context(format!("While starting upload to {start_upload_url}"))?;

    if !res.status().is_success() {
        bail!("Failed to start upload: {:?}", res);
    }

    Ok(res.json()?)
}

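/// Posts the chunk data to the presigned URL as a multipart form, retrying up
/// to three times with exponential backoff.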
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: &[u8]) -> Result<()> {
    let mut last_err = None;
    let mut delay = std::time::Duration::from_millis(500);
    for attempt in 1..=3 {
        let mut form = Form::new();
        for (key, value) in &presigned_url.fields {
            form = form.text(key.clone(), value.clone());
        }
        let part = Part::bytes(data.to_vec());
        form = form.part("file", part);

        let res = client.post(&presigned_url.url).multipart(form).send();

        match res {
            Result::Ok(resp) if resp.status().is_success() => {
                return Ok(());
            }
            Result::Ok(resp) => {
                last_err = Some(anyhow!("Failed to upload chunk: {:?}", resp));
            }
            Result::Err(e) => {
                last_err = Some(anyhow!("Failed to upload chunk: {}", e));
            }
        }
        if attempt < 3 {
            warn!(
                "Upload attempt {} failed, retrying in {:?}...",
                attempt, delay
            );
            std::thread::sleep(delay);
            delay *= 2;
        }
    }
    Err(last_err.unwrap_or_else(|| anyhow!("Unknown error during upload")))
}

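/// Calls the `bulk_finish_upload` endpoint, reporting the content hash of
/// each uploaded symbol set.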
fn finish_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    content_hashes: HashMap<String, String>,
) -> Result<()> {
    let finish_upload_url = format!("{base_url}/bulk_finish_upload");
    let request = BulkUploadFinishRequest { content_hashes };

    let res = client
        .post(finish_upload_url)
        .header("Authorization", format!("Bearer {auth_token}"))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .context(format!("While finishing upload to {base_url}"))?;

    if !res.status().is_success() {
        bail!("Failed to finish upload: {:?}", res);
    }

    Ok(())
}

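/// Returns the hex-encoded SHA-512 digest of the concatenated inputs.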
fn content_hash<Iter, Item>(upload_data: Iter) -> String
where
    Iter: IntoIterator<Item = Item>,
    Item: AsRef<[u8]>,
{
    let mut hasher = sha2::Sha512::new();
    for data in upload_data {
        hasher.update(data.as_ref());
    }
    format!("{:x}", hasher.finalize())
}

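/// Deletes the given files from disk, skipping paths that no longer exist.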
fn delete_files(paths: Vec<PathBuf>) -> Result<()> {
    for path in paths {
        if path.exists() {
            std::fs::remove_file(&path).context(format!(
                "Failed to delete sourcemap file: {}",
                path.display()
            ))?;
        }
    }
    Ok(())
}