posthog_cli/commands/sourcemap/upload.rs

use core::str;
use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::{anyhow, bail, Context, Ok, Result};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reqwest::blocking::multipart::{Form, Part};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tracing::{info, warn};

use crate::commands::UploadArgs;
use crate::utils::auth::load_token;
use crate::utils::posthog::capture_command_invoked;
use crate::utils::release::{create_release, CreateReleaseResponse};
use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};

const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; // 100 MB

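/// Per-chunk data returned by the bulk start-upload endpoint: the presigned
/// S3 URL to upload to, and the symbol set the chunk corresponds to.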
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StartUploadResponseData {
    presigned_url: PresignedUrl,
    symbol_set_id: String,
}

/// A presigned S3 POST target: the URL plus the form fields that must
/// accompany the file part.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PresignedUrl {
    pub url: String,
    pub fields: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartRequest {
    release_id: Option<String>,
    chunk_ids: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartResponse {
    id_map: HashMap<String, StartUploadResponseData>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadFinishRequest {
    content_hashes: HashMap<String, String>,
}

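/// Entry point for the sourcemap upload command: finds source/sourcemap
/// pairs under `directory`, creates a release, and uploads the resulting
/// chunks to PostHog in batches.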
pub fn upload(host: Option<String>, args: UploadArgs) -> Result<()> {
    let UploadArgs {
        directory,
        project,
        ignore,
        version,
        delete_after,
        skip_ssl_verification,
        batch_size,
    } = args;

    let token = load_token().context("While starting upload command")?;
    let host = token.get_host(host.as_deref());

    let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id));

    let base_url = format!(
        "{}/api/environments/{}/error_tracking/symbol_sets",
        host, token.env_id
    );

    let pairs = read_pairs(&directory, &ignore)?;
    let sourcemap_paths = pairs
        .iter()
        .map(|pair| pair.sourcemap.path.clone())
        .collect::<Vec<_>>();
    info!("Found {} chunks to upload", pairs.len());

    let uploads = collect_uploads(pairs).context("While preparing files for upload")?;

    // Create a release object if we have enough information to do so.
    // TODO - The use of a hash_id here means repeated attempts to upload the same data will fail.
    //        We could relax this, such that we instead replace the existing release with the new one,
    //        or we could even just allow adding new chunks to an existing release, but for now I'm
    //        leaving it like this... Reviewers, let's chat about the right approach here.
    let release = create_release(
        &host,
        &token,
        Some(directory.clone()),
        Some(content_hash(uploads.iter().map(|upload| &upload.data))),
        project,
        version,
        skip_ssl_verification,
    )
    .context("While creating release")?;

    let batched_uploads = into_batches(uploads, batch_size);

    for batch_upload in batched_uploads {
        upload_chunks(
            &base_url,
            &token.token,
            batch_upload,
            release.as_ref(),
            skip_ssl_verification,
        )?;
    }

    if delete_after {
        delete_files(sourcemap_paths).context("While deleting sourcemaps")?;
    }

    let _ = capture_handle.join();

    Ok(())
}

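/// Splits `array` into consecutive batches of at most `batch_size` elements,
/// preserving order.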
fn into_batches<T>(mut array: Vec<T>, batch_size: usize) -> Vec<Vec<T>> {
    // Treat a batch size of zero as one; draining zero elements at a time
    // would never make progress and loop forever.
    let batch_size = batch_size.max(1);
    let mut batches = Vec::new();
    while !array.is_empty() {
        let take = array.len().min(batch_size);
        let chunk: Vec<_> = array.drain(..take).collect();
        batches.push(chunk);
    }
    batches
}

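/// Converts each source pair into the chunk payload that will be uploaded.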
fn collect_uploads(pairs: Vec<SourcePair>) -> Result<Vec<ChunkUpload>> {
    pairs
        .into_iter()
        .map(|pair| pair.into_chunk_upload())
        .collect()
}

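/// Uploads one batch of chunks: starts a bulk upload to obtain presigned
/// URLs, pushes each chunk to S3 in parallel, then reports the content
/// hashes back to PostHog.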
fn upload_chunks(
    base_url: &str,
    token: &str,
    uploads: Vec<ChunkUpload>,
    release: Option<&CreateReleaseResponse>,
    skip_ssl_verification: bool,
) -> Result<()> {
    let client = Client::builder()
        .danger_accept_invalid_certs(skip_ssl_verification)
        .build()?;

    let release_id = release.map(|r| r.id.to_string());

    // Skip any chunk that exceeds the size limit rather than failing the
    // whole batch.
    let chunk_ids = uploads
        .iter()
        .filter(|u| {
            if u.data.len() > MAX_FILE_SIZE {
                warn!(
                    "Skipping chunk {} because the file size is too large ({} bytes)",
                    u.chunk_id,
                    u.data.len()
                );
                false
            } else {
                true
            }
        })
        .map(|u| u.chunk_id.clone())
        .collect::<Vec<String>>();

    let start_response = start_upload(&client, base_url, token, chunk_ids, &release_id)?;

    let id_map: HashMap<_, _> = uploads
        .into_iter()
        .map(|u| (u.chunk_id.clone(), u))
        .collect();

    let res: Result<HashMap<String, String>> = start_response
        .id_map
        .into_par_iter()
        .map(|(chunk_id, data)| {
            info!("Uploading chunk {}", chunk_id);
            let upload = id_map.get(&chunk_id).ok_or_else(|| {
                anyhow!("Got a chunk ID back from PostHog that we didn't expect!")
            })?;

            let content_hash = content_hash([&upload.data]);
            upload_to_s3(&client, data.presigned_url.clone(), &upload.data)?;
            Ok((data.symbol_set_id, content_hash))
        })
        .collect();

    let content_hashes = res?;

    finish_upload(&client, base_url, token, content_hashes)?;

    Ok(())
}

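/// Calls the bulk_start_upload endpoint, exchanging chunk IDs (and an
/// optional release ID) for presigned upload URLs.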
fn start_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    chunk_ids: Vec<String>,
    release_id: &Option<String>,
) -> Result<BulkUploadStartResponse> {
    let start_upload_url = format!("{base_url}/bulk_start_upload");

    let request = BulkUploadStartRequest {
        chunk_ids,
        release_id: release_id.clone(),
    };

    let res = client
        .post(&start_upload_url)
        .header("Authorization", format!("Bearer {auth_token}"))
        .json(&request)
        .send()
        .context(format!("While starting upload to {start_upload_url}"))?;

    if !res.status().is_success() {
        bail!("Failed to start upload: {:?}", res);
    }

    Ok(res.json()?)
}

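/// POSTs a chunk to the presigned S3 URL, retrying up to three times with
/// exponential backoff.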
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: &[u8]) -> Result<()> {
    let mut last_err = None;
    let mut delay = std::time::Duration::from_millis(500);
    for attempt in 1..=3 {
        // The multipart form is consumed by `send`, so rebuild it on each
        // attempt. The presigned fields must precede the file part.
        let mut form = Form::new();
        for (key, value) in &presigned_url.fields {
            form = form.text(key.clone(), value.clone());
        }
        let part = Part::bytes(data.to_vec());
        form = form.part("file", part);

        let res = client.post(&presigned_url.url).multipart(form).send();

        match res {
            Result::Ok(resp) if resp.status().is_success() => {
                return Ok(());
            }
            Result::Ok(resp) => {
                last_err = Some(anyhow!("Failed to upload chunk: {:?}", resp));
            }
            Result::Err(e) => {
                last_err = Some(anyhow!("Failed to upload chunk: {}", e));
            }
        }
        if attempt < 3 {
            warn!(
                "Upload attempt {} failed, retrying in {:?}...",
                attempt, delay
            );
            std::thread::sleep(delay);
            delay *= 2;
        }
    }
    Err(last_err.unwrap_or_else(|| anyhow!("Unknown error during upload")))
}

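/// Calls the bulk_finish_upload endpoint to confirm the uploaded symbol
/// sets, keyed by symbol set ID, along with their content hashes.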
fn finish_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    content_hashes: HashMap<String, String>,
) -> Result<()> {
    let finish_upload_url = format!("{base_url}/bulk_finish_upload");
    let request = BulkUploadFinishRequest { content_hashes };

    let res = client
        .post(&finish_upload_url)
        .header("Authorization", format!("Bearer {auth_token}"))
        .json(&request)
        .send()
        .context(format!("While finishing upload to {finish_upload_url}"))?;

    if !res.status().is_success() {
        bail!("Failed to finish upload: {:?}", res);
    }

    Ok(())
}

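/// Computes a hex-encoded SHA-512 digest over the concatenated upload data.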
fn content_hash<Iter, Item>(upload_data: Iter) -> String
where
    Iter: IntoIterator<Item = Item>,
    Item: AsRef<[u8]>,
{
    let mut hasher = sha2::Sha512::new();
    for data in upload_data {
        hasher.update(data.as_ref());
    }
    format!("{:x}", hasher.finalize())
}

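/// Deletes the local sourcemap files from each source pair, skipping any
/// that no longer exist.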
fn delete_files(paths: Vec<PathBuf>) -> Result<()> {
    for path in paths {
        if path.exists() {
            std::fs::remove_file(&path).context(format!(
                "Failed to delete sourcemap file: {}",
                path.display()
            ))?;
        }
    }
    Ok(())
}