posthog_cli/commands/sourcemap/
upload.rs1use core::str;
2use std::collections::HashMap;
3use std::path::PathBuf;
4
5use anyhow::{bail, Context, Ok, Result};
6use reqwest::blocking::multipart::{Form, Part};
7use reqwest::blocking::Client;
8use serde::{Deserialize, Serialize};
9use sha2::Digest;
10use tracing::{info, warn};
11
12use crate::utils::auth::load_token;
13use crate::utils::posthog::capture_command_invoked;
14use crate::utils::release::{create_release, CreateReleaseResponse};
15use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};
16
17const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; #[derive(Debug, Deserialize)]
20struct StartUploadResponseData {
21 presigned_url: PresignedUrl,
22 symbol_set_id: String,
23}
24
25#[derive(Deserialize, Debug)]
26pub struct PresignedUrl {
27 pub url: String,
28 pub fields: HashMap<String, String>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32struct FinishUploadRequest {
33 pub content_hash: String,
34}
35
36pub fn upload(
37 host: Option<String>,
38 directory: &PathBuf,
39 project: Option<String>,
40 version: Option<String>,
41 delete_after: bool,
42) -> Result<()> {
43 let token = load_token().context("While starting upload command")?;
44 let host = token.get_host(host.as_deref());
45
46 let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id));
47
48 let base_url = format!(
49 "{}/api/environments/{}/error_tracking/symbol_sets",
50 host, token.env_id
51 );
52
53 let pairs = read_pairs(directory)?;
54 let sourcemap_paths = pairs
55 .iter()
56 .map(|pair| pair.sourcemap.path.clone())
57 .collect::<Vec<_>>();
58
59 let uploads = collect_uploads(pairs).context("While preparing files for upload")?;
60 info!("Found {} chunks to upload", uploads.len());
61
62 let release = create_release(
68 &host,
69 &token,
70 Some(directory.clone()),
71 Some(content_hash(uploads.iter().map(|upload| &upload.data))),
72 project,
73 version,
74 )
75 .context("While creating release")?;
76
77 upload_chunks(&base_url, &token.token, uploads, release.as_ref())?;
78
79 if delete_after {
80 delete_files(sourcemap_paths).context("While deleting sourcemaps")?;
81 }
82
83 let _ = capture_handle.join();
84
85 Ok(())
86}
87
88fn collect_uploads(pairs: Vec<SourcePair>) -> Result<Vec<ChunkUpload>> {
89 let uploads: Vec<ChunkUpload> = pairs
90 .into_iter()
91 .map(|pair| pair.into_chunk_upload())
92 .collect::<Result<Vec<ChunkUpload>>>()?;
93 Ok(uploads)
94}
95
96fn upload_chunks(
97 base_url: &str,
98 token: &str,
99 uploads: Vec<ChunkUpload>,
100 release: Option<&CreateReleaseResponse>,
101) -> Result<()> {
102 let client = reqwest::blocking::Client::new();
103 let release_id = release.map(|r| r.id.to_string());
104 for upload in uploads {
105 info!("Uploading chunk {}", upload.chunk_id);
106
107 let upload_size = upload.data.len();
108 if upload_size > MAX_FILE_SIZE {
109 warn!(
110 "Skipping chunk {} because the file size is too large ({})",
111 upload.chunk_id, upload_size
112 );
113 continue;
114 }
115
116 let upload_response =
117 start_upload(&client, base_url, token, &upload.chunk_id, &release_id)?;
118
119 let content_hash = content_hash([&upload.data]);
120
121 upload_to_s3(&client, upload_response.presigned_url, upload.data)?;
122
123 finish_upload(
124 &client,
125 base_url,
126 token,
127 upload_response.symbol_set_id,
128 content_hash,
129 )?;
130 }
131
132 Ok(())
133}
134
135fn start_upload(
136 client: &Client,
137 base_url: &str,
138 auth_token: &str,
139 chunk_id: &str,
140 release_id: &Option<String>,
141) -> Result<StartUploadResponseData> {
142 let start_upload_url: String = format!("{}{}", base_url, "/start_upload");
143
144 let mut params = vec![("chunk_id", chunk_id)];
145 if let Some(id) = release_id {
146 params.push(("release_id", id));
147 }
148
149 let res = client
150 .post(&start_upload_url)
151 .header("Authorization", format!("Bearer {}", auth_token))
152 .query(¶ms)
153 .send()
154 .context(format!("While starting upload to {}", start_upload_url))?;
155
156 if !res.status().is_success() {
157 bail!("Failed to start upload: {:?}", res);
158 }
159
160 let data: StartUploadResponseData = res.json()?;
161 Ok(data)
162}
163
164fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
165 let mut form = Form::new();
166
167 for (key, value) in presigned_url.fields {
168 form = form.text(key.clone(), value.clone());
169 }
170
171 let part = Part::bytes(data);
172 form = form.part("file", part);
173
174 let res = client
175 .post(&presigned_url.url)
176 .multipart(form)
177 .send()
178 .context(format!("While uploading chunk to {}", presigned_url.url))?;
179
180 if !res.status().is_success() {
181 bail!("Failed to upload chunk: {:?}", res);
182 }
183
184 Ok(())
185}
186
187fn finish_upload(
188 client: &Client,
189 base_url: &str,
190 auth_token: &str,
191 symbol_set_id: String,
192 content_hash: String,
193) -> Result<()> {
194 let finish_upload_url: String = format!("{}/{}/{}", base_url, symbol_set_id, "finish_upload");
195 let request = FinishUploadRequest { content_hash };
196
197 let res = client
198 .put(finish_upload_url)
199 .header("Authorization", format!("Bearer {}", auth_token))
200 .header("Content-Type", "application/json")
201 .json(&request)
202 .send()
203 .context(format!("While finishing upload to {}", base_url))?;
204
205 if !res.status().is_success() {
206 bail!("Failed to finish upload: {:?}", res);
207 }
208
209 Ok(())
210}
211
212fn content_hash<Iter, Item>(upload_data: Iter) -> String
213where
214 Iter: IntoIterator<Item = Item>,
215 Item: AsRef<[u8]>,
216{
217 let mut hasher = sha2::Sha512::new();
218 for data in upload_data {
219 hasher.update(data.as_ref());
220 }
221 format!("{:x}", hasher.finalize())
222}
223
224fn delete_files(paths: Vec<PathBuf>) -> Result<()> {
225 for path in paths {
227 if path.exists() {
228 std::fs::remove_file(&path).context(format!(
229 "Failed to delete sourcemaps file: {}",
230 path.display()
231 ))?;
232 }
233 }
234 Ok(())
235}