posthog_cli/commands/sourcemap/upload.rs

use core::str;
use std::collections::HashMap;
use std::path::PathBuf;

use anyhow::{anyhow, bail, Context, Ok, Result};
use reqwest::blocking::multipart::{Form, Part};
use reqwest::blocking::Client;
use serde::{Deserialize, Serialize};
use sha2::Digest;
use tracing::{info, warn};

use crate::utils::auth::load_token;
use crate::utils::posthog::capture_command_invoked;
use crate::utils::release::{create_release, CreateReleaseResponse};
use crate::utils::sourcemaps::{read_pairs, ChunkUpload, SourcePair};

const MAX_FILE_SIZE: usize = 100 * 1024 * 1024; // 100 MB
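/// Per-chunk data returned by the bulk start endpoint: the presigned URL to
/// upload the chunk's contents to, and the ID of the symbol set it will be
/// attached to.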
#[derive(Debug, Serialize, Deserialize, Clone)]
struct StartUploadResponseData {
    presigned_url: PresignedUrl,
    symbol_set_id: String,
}
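/// A presigned upload target: the URL to POST to, plus the form fields that
/// must be included alongside the file part.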
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PresignedUrl {
    pub url: String,
    pub fields: HashMap<String, String>,
}
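/// Request body for the `bulk_start_upload` endpoint: the chunk IDs we intend
/// to upload and, optionally, the release to associate them with.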
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartRequest {
    release_id: Option<String>,
    chunk_ids: Vec<String>,
}
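/// Response body for `bulk_start_upload`, mapping each chunk ID to its upload
/// details.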
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartResponse {
    id_map: HashMap<String, StartUploadResponseData>,
}
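/// Request body for the `bulk_finish_upload` endpoint, mapping symbol set IDs
/// to the content hashes of the chunks uploaded for them.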
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadFinishRequest {
    content_hashes: HashMap<String, String>,
}
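/// Entry point for the `sourcemap upload` command: reads source/sourcemap
/// pairs from `directory`, optionally creates a release, uploads each chunk
/// via a presigned URL, and deletes the local sourcemaps afterwards if
/// `delete_after` is set.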
pub fn upload(
    host: Option<String>,
    directory: &PathBuf,
    project: Option<String>,
    version: Option<String>,
    delete_after: bool,
) -> Result<()> {
    let token = load_token().context("While starting upload command")?;
    let host = token.get_host(host.as_deref());

    let capture_handle = capture_command_invoked("sourcemap_upload", Some(&token.env_id));

    let base_url = format!(
        "{}/api/environments/{}/error_tracking/symbol_sets",
        host, token.env_id
    );

    let pairs = read_pairs(directory)?;
    let sourcemap_paths = pairs
        .iter()
        .map(|pair| pair.sourcemap.path.clone())
        .collect::<Vec<_>>();

    let uploads = collect_uploads(pairs).context("While preparing files for upload")?;
    info!("Found {} chunks to upload", uploads.len());

    // See if we have enough information to create a release object
    // TODO - The use of a hash_id here means repeated attempts to upload the same data will fail.
    //        We could relax this, such that we instead replace the existing release with the new one,
    //        or we could even just allow adding new chunks to an existing release, but for now I'm
    //        leaving it like this... Reviewers, let's chat about the right approach here
    let release = create_release(
        &host,
        &token,
        Some(directory.clone()),
        Some(content_hash(uploads.iter().map(|upload| &upload.data))),
        project,
        version,
    )
    .context("While creating release")?;

    upload_chunks(&base_url, &token.token, uploads, release.as_ref())?;

    if delete_after {
        delete_files(sourcemap_paths).context("While deleting sourcemaps")?;
    }

    let _ = capture_handle.join();

    Ok(())
}
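/// Converts each source/sourcemap pair into a `ChunkUpload`, failing if any
/// pair cannot be converted.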
fn collect_uploads(pairs: Vec<SourcePair>) -> Result<Vec<ChunkUpload>> {
    let uploads: Vec<ChunkUpload> = pairs
        .into_iter()
        .map(|pair| pair.into_chunk_upload())
        .collect::<Result<Vec<ChunkUpload>>>()?;
    Ok(uploads)
}
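/// Drives the bulk upload: starts the upload (skipping chunks over
/// `MAX_FILE_SIZE`), posts each chunk to its presigned URL, then reports the
/// resulting content hashes to finish the upload.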
fn upload_chunks(
    base_url: &str,
    token: &str,
    uploads: Vec<ChunkUpload>,
    release: Option<&CreateReleaseResponse>,
) -> Result<()> {
    let client = reqwest::blocking::Client::new();
    let release_id = release.map(|r| r.id.to_string());
    let chunk_ids = uploads
        .iter()
        .filter(|u| {
            if u.data.len() > MAX_FILE_SIZE {
                warn!(
                    "Skipping chunk {} because the file size is too large ({})",
                    u.chunk_id,
                    u.data.len()
                );
                false
            } else {
                true
            }
        })
        .map(|u| u.chunk_id.clone())
        .collect::<Vec<String>>();

    let start_response = start_upload(&client, base_url, token, chunk_ids, &release_id)?;

    let mut id_map: HashMap<_, _> = uploads
        .into_iter()
        .map(|u| (u.chunk_id.clone(), u))
        .collect();

    let mut content_hashes = HashMap::new();

    for (chunk_id, data) in start_response.id_map.into_iter() {
        info!("Uploading chunk {}", chunk_id);
        let upload = id_map.remove(&chunk_id).ok_or(anyhow!(
            "Got a chunk ID back from posthog that we didn't expect!"
        ))?;

        let content_hash = content_hash([&upload.data]);

        upload_to_s3(&client, data.presigned_url.clone(), upload.data)?;

        content_hashes.insert(data.symbol_set_id.clone(), content_hash);
    }

    finish_upload(&client, base_url, token, content_hashes)?;

    Ok(())
}
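/// Calls the `bulk_start_upload` endpoint to obtain a presigned upload URL and
/// symbol set ID for each chunk.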
fn start_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    chunk_ids: Vec<String>,
    release_id: &Option<String>,
) -> Result<BulkUploadStartResponse> {
    let start_upload_url: String = format!("{}{}", base_url, "/bulk_start_upload");

    let request = BulkUploadStartRequest {
        chunk_ids,
        release_id: release_id.clone(),
    };

    let res = client
        .post(&start_upload_url)
        .header("Authorization", format!("Bearer {}", auth_token))
        .json(&request)
        .send()
        .context(format!("While starting upload to {}", start_upload_url))?;

    if !res.status().is_success() {
        bail!("Failed to start upload: {:?}", res);
    }

    Ok(res.json()?)
}
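/// Uploads a single chunk to its presigned URL as a multipart form, copying
/// the presigned fields into the form alongside the file part.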
fn upload_to_s3(client: &Client, presigned_url: PresignedUrl, data: Vec<u8>) -> Result<()> {
    let mut form = Form::new();

    for (key, value) in presigned_url.fields {
        form = form.text(key.clone(), value.clone());
    }

    let part = Part::bytes(data);
    form = form.part("file", part);

    let res = client
        .post(&presigned_url.url)
        .multipart(form)
        .send()
        .context(format!("While uploading chunk to {}", presigned_url.url))?;

    if !res.status().is_success() {
        bail!("Failed to upload chunk: {:?}", res);
    }

    Ok(())
}
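/// Calls the `bulk_finish_upload` endpoint with the content hash of each
/// uploaded chunk.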
fn finish_upload(
    client: &Client,
    base_url: &str,
    auth_token: &str,
    content_hashes: HashMap<String, String>,
) -> Result<()> {
    let finish_upload_url: String = format!("{}/{}", base_url, "bulk_finish_upload");
    let request = BulkUploadFinishRequest { content_hashes };

    let res = client
        .post(finish_upload_url)
        .header("Authorization", format!("Bearer {}", auth_token))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .context(format!("While finishing upload to {}", base_url))?;

    if !res.status().is_success() {
        bail!("Failed to finish upload: {:?}", res);
    }

    Ok(())
}
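/// Returns the hex-encoded SHA-512 hash of the given byte slices, hashed in
/// iteration order.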
fn content_hash<Iter, Item>(upload_data: Iter) -> String
where
    Iter: IntoIterator<Item = Item>,
    Item: AsRef<[u8]>,
{
    let mut hasher = sha2::Sha512::new();
    for data in upload_data {
        hasher.update(data.as_ref());
    }
    format!("{:x}", hasher.finalize())
}
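/// Deletes the given sourcemap files from disk, skipping any that no longer
/// exist.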
fn delete_files(paths: Vec<PathBuf>) -> Result<()> {
    // Delete the local sourcemap files from each source pair
    for path in paths {
        if path.exists() {
            std::fs::remove_file(&path).context(format!(
                "Failed to delete sourcemap file: {}",
                path.display()
            ))?;
        }
    }
    Ok(())
}