posthog_cli/commands/sourcemap/upload.rs

use core::str;
use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::{anyhow, bail, Context, Ok, Result};
use reqwest::blocking::multipart::{Form, Part};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tracing::{info, warn};

use crate::utils::auth::load_token;
use crate::utils::posthog::capture_command_invoked;
use crate::utils::release::{create_release, CreateReleaseResponse};
use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};

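/// Chunks larger than this are skipped with a warning rather than uploaded.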
const MAX_FILE_SIZE: usize = 100 * 1024 * 1024;

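/// Per-chunk response from `bulk_start_upload`: the presigned destination for
/// the chunk's bytes and the server-side id of its symbol set.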
#[derive(Debug, Serialize, Deserialize, Clone)]
struct StartUploadResponseData {
    presigned_url: PresignedUrl,
    symbol_set_id: String,
}

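/// A presigned POST target: the URL to post to, plus the policy fields the
/// storage backend expects to accompany the file in the multipart form.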
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PresignedUrl {
    pub url: String,
    pub fields: HashMap<String, String>,
}

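/// Request body for `bulk_start_upload`: the chunk ids to register, optionally
/// tied to a release.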
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartRequest {
    release_id: Option<String>,
    chunk_ids: Vec<String>,
}

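/// Maps each accepted chunk id to its upload destination.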
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartResponse {
    id_map: HashMap<String, StartUploadResponseData>,
}

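/// Maps each symbol set id to the sha512 hex digest of the bytes uploaded for
/// it, presumably letting the server verify what landed in storage.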
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadFinishRequest {
    content_hashes: HashMap<String, String>,
}

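/// Finds source/sourcemap pairs under `directory`, creates a release, uploads
/// each chunk to PostHog, and optionally deletes the local sourcemaps afterwards.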
pub fn upload(
    host: Option<String>,
    directory: &PathBuf,
    project: Option<String>,
    version: Option<String>,
    delete_after: bool,
) -> Result<()> {
    let token = load_token().context("While starting upload command")?;
    let host = token.get_host(host.as_deref());

    let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id));

    let base_url = format!(
        "{}/api/environments/{}/error_tracking/symbol_sets",
        host, token.env_id
    );

    let pairs = read_pairs(directory)?;
    let sourcemap_paths = pairs
        .iter()
        .map(|pair| pair.sourcemap.path.clone())
        .collect::<Vec<_>>();

    let uploads = collect_uploads(pairs).context("While preparing files for upload")?;
    info!("Found {} chunks to upload", uploads.len());

    let release = create_release(
        &host,
        &token,
        Some(directory.clone()),
        Some(content_hash(uploads.iter().map(|upload| &upload.data))),
        project,
        version,
    )
    .context("While creating release")?;

    upload_chunks(&base_url, &token.token, uploads, release.as_ref())?;

    if delete_after {
        delete_files(sourcemap_paths).context("While deleting sourcemaps")?;
    }

    let _ = capture_handle.join();

    Ok(())
}

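/// Converts each source/sourcemap pair into an uploadable chunk, failing if
/// any pair cannot be converted.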
fn collect_uploads(pairs: Vec<SourcePair>) -> Result<Vec<ChunkUpload>> {
    let uploads: Vec<ChunkUpload> = pairs
        .into_iter()
        .map(|pair| pair.into_chunk_upload())
        .collect::<Result<Vec<ChunkUpload>>>()?;
    Ok(uploads)
}

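/// Drives the three-step upload: register chunk ids with the PostHog API,
/// POST each chunk to its presigned URL, then confirm the content hashes.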
fn upload_chunks(
    base_url: &str,
    token: &str,
    uploads: Vec<ChunkUpload>,
    release: Option<&CreateReleaseResponse>,
) -> Result<()> {
    let client = Client::new();
    let release_id = release.map(|r| r.id.to_string());
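    // Filter out oversized chunks up front; they are never registered with the API.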
    let chunk_ids = uploads
        .iter()
        .filter(|u| {
            if u.data.len() > MAX_FILE_SIZE {
                warn!(
                    "Skipping chunk {} because it is too large ({} bytes)",
                    u.chunk_id,
                    u.data.len()
                );
                false
            } else {
                true
            }
        })
        .map(|u| u.chunk_id.clone())
        .collect::<Vec<String>>();

    let start_response = start_upload(&client, base_url, token, chunk_ids, &release_id)?;

    let mut id_map: HashMap<_, _> = uploads
        .into_iter()
        .map(|u| (u.chunk_id.clone(), u))
        .collect();

    let mut content_hashes = HashMap::new();

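    // Upload each chunk the server accepted, recording its hash against the
    // symbol set id so the batch can be confirmed below.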
    for (chunk_id, data) in start_response.id_map.into_iter() {
        info!("Uploading chunk {}", chunk_id);
        let upload = id_map
            .remove(&chunk_id)
            .ok_or_else(|| anyhow!("Got a chunk ID back from PostHog that we didn't expect!"))?;

        let content_hash = content_hash([&upload.data]);

        upload_to_s3(&client, data.presigned_url.clone(), upload.data)?;

        content_hashes.insert(data.symbol_set_id.clone(), content_hash);
    }

    finish_upload(&client, base_url, token, content_hashes)?;

    Ok(())
}

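/// Registers the chunk ids (and optional release) with PostHog, returning a
/// presigned upload destination per chunk.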
fn start_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    chunk_ids: Vec<String>,
    release_id: &Option<String>,
) -> Result<BulkUploadStartResponse> {
    let start_upload_url = format!("{}/bulk_start_upload", base_url);

    let request = BulkUploadStartRequest {
        chunk_ids,
        release_id: release_id.clone(),
    };

    let res = client
        .post(&start_upload_url)
        .header("Authorization", format!("Bearer {}", auth_token))
        .json(&request)
        .send()
        .context(format!("While starting upload to {}", start_upload_url))?;

    if !res.status().is_success() {
        bail!("Failed to start upload: {:?}", res);
    }

    Ok(res.json()?)
}

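/// Uploads one chunk via a presigned multipart POST.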
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
    let mut form = Form::new();

    for (key, value) in presigned_url.fields {
        form = form.text(key, value);
    }

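    // The presigned policy fields must precede the file: S3-style POST uploads
    // ignore any form field that comes after the file part.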
    let part = Part::bytes(data);
    form = form.part("file", part);

    let res = client
        .post(&presigned_url.url)
        .multipart(form)
        .send()
        .context(format!("While uploading chunk to {}", presigned_url.url))?;

    if !res.status().is_success() {
        bail!("Failed to upload chunk: {:?}", res);
    }

    Ok(())
}

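/// Tells PostHog the uploads are complete, passing the hash of each uploaded
/// chunk keyed by symbol set id.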
fn finish_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    content_hashes: HashMap<String, String>,
) -> Result<()> {
    let finish_upload_url = format!("{}/bulk_finish_upload", base_url);
    let request = BulkUploadFinishRequest { content_hashes };

    let res = client
        .post(finish_upload_url)
        .header("Authorization", format!("Bearer {}", auth_token))
        .json(&request)
        .send()
        .context(format!("While finishing upload to {}", base_url))?;

    if !res.status().is_success() {
        bail!("Failed to finish upload: {:?}", res);
    }

    Ok(())
}

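/// Returns the hex-encoded sha512 digest of the concatenated inputs.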
fn content_hash<Iter, Item>(upload_data: Iter) -> String
where
    Iter: IntoIterator<Item = Item>,
    Item: AsRef<[u8]>,
{
    let mut hasher = sha2::Sha512::new();
    for data in upload_data {
        hasher.update(data.as_ref());
    }
    format!("{:x}", hasher.finalize())
}

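/// Removes each of the given sourcemap files, skipping paths that no longer exist.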
fn delete_files(paths: Vec<PathBuf>) -> Result<()> {
    for path in paths {
        if path.exists() {
            std::fs::remove_file(&path).context(format!(
                "Failed to delete sourcemap file: {}",
                path.display()
            ))?;
        }
    }
    Ok(())
}