posthog_cli/commands/sourcemap/upload.rs

use core::str;
use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::{bail, Context, Ok, Result};
use reqwest::blocking::multipart::{Form, Part};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tracing::{info, warn};

use crate::utils::auth::load_token;
use crate::utils::posthog::capture_command_invoked;
use crate::utils::release::{create_release, CreateReleaseResponse};
use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};

const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; // 100 MB

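/// Response from the `start_upload` endpoint: a presigned URL to send the chunk
/// to, plus the ID of the symbol set record created for it.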
#[derive(Debug, Deserialize)]
struct StartUploadResponseData {
    presigned_url: PresignedUrl,
    symbol_set_id: String,
}

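/// A presigned upload target: the URL to POST the multipart form to, and the
/// extra form fields the storage backend requires alongside the file.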
#[derive(Deserialize, Debug)]
pub struct PresignedUrl {
    pub url: String,
    pub fields: HashMap<String, String>,
}

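/// Body of the `finish_upload` request, carrying the SHA-512 hex digest of the
/// uploaded chunk data.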
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FinishUploadRequest {
    pub content_hash: String,
}

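/// Entry point for `sourcemap upload`: finds minified-source/sourcemap pairs under
/// `directory`, creates a release when possible, uploads each chunk to the error
/// tracking symbol set API, and optionally deletes the local sourcemaps afterwards.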
pub fn upload(
    host: Option<String>,
    directory: &PathBuf,
    project: Option<String>,
    version: Option<String>,
    delete_after: bool,
) -> Result<()> {
    let token = load_token().context("While starting upload command")?;
    let host = token.get_host(host.as_deref());

    let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id));

    let base_url = format!(
        "{}/api/environments/{}/error_tracking/symbol_sets",
        host, token.env_id
    );

    let pairs = read_pairs(directory)?;
    let sourcemap_paths = pairs
        .iter()
        .map(|pair| pair.sourcemap.path.clone())
        .collect::<Vec<_>>();

    let uploads = collect_uploads(pairs).context("While preparing files for upload")?;
    info!("Found {} chunks to upload", uploads.len());

    // See if we have enough information to create a release object
    // TODO - The use of a hash_id here means repeated attempts to upload the same data will fail.
    //        We could relax this, such that we instead replace the existing release with the new one,
    //        or we could even just allow adding new chunks to an existing release, but for now I'm
    //        leaving it like this... Reviewers, let's chat about the right approach here
    let release = create_release(
        &host,
        &token,
        Some(directory.clone()),
        Some(content_hash(uploads.iter().map(|upload| &upload.data))),
        project,
        version,
    )
    .context("While creating release")?;

    upload_chunks(&base_url, &token.token, uploads, release.as_ref())?;

    if delete_after {
        delete_files(sourcemap_paths).context("While deleting sourcemaps")?;
    }

    let _ = capture_handle.join();

    Ok(())
}

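/// Converts each discovered source pair into a `ChunkUpload`, failing on the first
/// pair that cannot be converted.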
fn collect_uploads(pairs: Vec<SourcePair>) -> Result<Vec<ChunkUpload>> {
    let uploads: Vec<ChunkUpload> = pairs
        .into_iter()
        .map(|pair| pair.into_chunk_upload())
        .collect::<Result<Vec<ChunkUpload>>>()?;
    Ok(uploads)
}

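/// Runs the three-step upload for every chunk: start the upload via the API, send
/// the bytes to the returned presigned URL, then confirm with the content hash.
/// Chunks larger than `MAX_FILE_SIZE` are skipped with a warning.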
fn upload_chunks(
    base_url: &str,
    token: &str,
    uploads: Vec<ChunkUpload>,
    release: Option<&CreateReleaseResponse>,
) -> Result<()> {
    let client = Client::new();
    let release_id = release.map(|r| r.id.to_string());
    for upload in uploads {
        info!("Uploading chunk {}", upload.chunk_id);

        let upload_size = upload.data.len();
        if upload_size > MAX_FILE_SIZE {
            warn!(
                "Skipping chunk {} because the file size is too large ({})",
                upload.chunk_id, upload_size
            );
            continue;
        }

        let upload_response =
            start_upload(&client, base_url, token, &upload.chunk_id, &release_id)?;

        let content_hash = content_hash([&upload.data]);

        upload_to_s3(&client, upload_response.presigned_url, upload.data)?;

        finish_upload(
            &client,
            base_url,
            token,
            upload_response.symbol_set_id,
            content_hash,
        )?;
    }

    Ok(())
}

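/// POSTs to `{base_url}/start_upload` to obtain a presigned URL and symbol set ID
/// for the given chunk, attaching the release ID as a query parameter when present.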
fn start_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    chunk_id: &str,
    release_id: &Option<String>,
) -> Result<StartUploadResponseData> {
    let start_upload_url = format!("{}/start_upload", base_url);

    let mut params = vec![("chunk_id", chunk_id)];
    if let Some(id) = release_id {
        params.push(("release_id", id));
    }

    let res = client
        .post(&start_upload_url)
        .header("Authorization", format!("Bearer {}", auth_token))
        .query(&params)
        .send()
        .context(format!("While starting upload to {}", start_upload_url))?;

    if !res.status().is_success() {
        bail!("Failed to start upload: {:?}", res);
    }

    let data: StartUploadResponseData = res.json()?;
    Ok(data)
}

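/// Sends the chunk bytes to the presigned URL as a multipart form, adding the
/// presigned fields before the file part, as S3-style presigned POSTs expect.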
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
    let mut form = Form::new();

    for (key, value) in presigned_url.fields {
        form = form.text(key, value);
    }

    let part = Part::bytes(data);
    form = form.part("file", part);

    let res = client
        .post(&presigned_url.url)
        .multipart(form)
        .send()
        .context(format!("While uploading chunk to {}", presigned_url.url))?;

    if !res.status().is_success() {
        bail!("Failed to upload chunk: {:?}", res);
    }

    Ok(())
}

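/// PUTs to `{base_url}/{symbol_set_id}/finish_upload` with the content hash to
/// mark the upload as complete.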
fn finish_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    symbol_set_id: String,
    content_hash: String,
) -> Result<()> {
    let finish_upload_url = format!("{}/{}/finish_upload", base_url, symbol_set_id);
    let request = FinishUploadRequest { content_hash };

    let res = client
        .put(finish_upload_url)
        .header("Authorization", format!("Bearer {}", auth_token))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .context(format!("While finishing upload to {}", base_url))?;

    if !res.status().is_success() {
        bail!("Failed to finish upload: {:?}", res);
    }

    Ok(())
}

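/// Hashes the given byte buffers with SHA-512 in iteration order and returns the
/// digest as a lowercase hex string.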
fn content_hash<Iter, Item>(upload_data: Iter) -> String
where
    Iter: IntoIterator<Item = Item>,
    Item: AsRef<[u8]>,
{
    let mut hasher = sha2::Sha512::new();
    for data in upload_data {
        hasher.update(data.as_ref());
    }
    format!("{:x}", hasher.finalize())
}

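/// Removes the given files from disk, skipping any path that no longer exists.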
fn delete_files(paths: Vec<PathBuf>) -> Result<()> {
    // Delete the local sourcemap files discovered in each source pair
    for path in paths {
        if path.exists() {
            std::fs::remove_file(&path).context(format!(
                "Failed to delete sourcemap file: {}",
                path.display()
            ))?;
        }
    }
    Ok(())
}