posthog_cli/api/
symbol_sets.rs

1use anyhow::{anyhow, Context, Result};
2use rayon::iter::{IntoParallelIterator, ParallelIterator};
3use reqwest::blocking::multipart::{Form, Part};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use tracing::{info, warn};
7
8use crate::{
9    invocation_context::context,
10    utils::{files::content_hash, raise_for_err},
11};
12
/// Largest symbol set payload, in bytes, that `upload` will send (100 MiB).
/// Sets whose data exceeds this are skipped with a warning.
const MAX_FILE_SIZE: usize = 100 << 20;
14
/// A symbol set queued for upload.
#[derive(Clone, Debug)]
pub struct SymbolSetUpload {
    /// Chunk identifier; server responses are matched back to local uploads by this value.
    pub chunk_id: String,
    /// Optional release identifier, forwarded unchanged in the create request.
    pub release_id: Option<String>,

    /// Raw bytes of the symbol set; these are hashed and sent as the upload body.
    pub data: Vec<u8>,
}
22
23#[derive(Debug, Serialize, Deserialize, Clone)]
24struct StartUploadResponseData {
25    presigned_url: PresignedUrl,
26    symbol_set_id: String,
27}
28
29#[derive(Serialize, Deserialize, Debug, Clone)]
30pub struct PresignedUrl {
31    pub url: String,
32    pub fields: HashMap<String, String>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36struct BulkUploadStartRequest {
37    symbol_sets: Vec<CreateSymbolSetRequest>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41struct BulkUploadStartResponse {
42    id_map: HashMap<String, StartUploadResponseData>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46struct BulkUploadFinishRequest {
47    content_hashes: HashMap<String, String>,
48}
49
50pub fn upload(input_sets: &[SymbolSetUpload], batch_size: usize) -> Result<()> {
51    let upload_requests: Vec<_> = input_sets
52        .iter()
53        .filter(|s| {
54            if s.data.len() > MAX_FILE_SIZE {
55                warn!(
56                    "Skipping symbol set with id: {}, file too large",
57                    s.chunk_id
58                );
59            }
60            s.data.len() <= MAX_FILE_SIZE
61        })
62        .collect();
63
64    for (i, batch) in upload_requests.chunks(batch_size).enumerate() {
65        info!("Starting upload of batch {i}, {} symbol sets", batch.len());
66        let start_response = start_upload(batch)?;
67
68        let id_map: HashMap<_, _> = batch.iter().map(|u| (u.chunk_id.as_str(), u)).collect();
69
70        info!(
71            "Server returned {} upload keys ({} skipped as already present)",
72            start_response.id_map.len(),
73            batch.len() - start_response.id_map.len()
74        );
75
76        let res: Result<HashMap<String, String>> = start_response
77            .id_map
78            .into_par_iter()
79            .map(|(chunk_id, data)| {
80                info!("Uploading chunk {}", chunk_id);
81                let upload = id_map.get(chunk_id.as_str()).ok_or(anyhow!(
82                    "Got a chunk ID back from posthog that we didn't expect!"
83                ))?;
84
85                let content_hash = content_hash([&upload.data]);
86                upload_to_s3(data.presigned_url.clone(), &upload.data)?;
87                Ok((data.symbol_set_id, content_hash))
88            })
89            .collect();
90
91        let content_hashes = res?;
92
93        finish_upload(content_hashes)?;
94    }
95
96    Ok(())
97}
98
99fn start_upload(symbol_sets: &[&SymbolSetUpload]) -> Result<BulkUploadStartResponse> {
100    let base_url = format!(
101        "{}/api/environments/{}/error_tracking/symbol_sets",
102        context().token.get_host(),
103        context().token.env_id
104    );
105    let client = &context().client;
106    let auth_token = &context().token.token;
107
108    let start_upload_url: String = format!("{}{}", base_url, "/bulk_start_upload");
109
110    let request = BulkUploadStartRequest {
111        symbol_sets: symbol_sets
112            .iter()
113            .map(|s| CreateSymbolSetRequest::new(s))
114            .collect(),
115    };
116
117    let res = client
118        .post(&start_upload_url)
119        .header("Authorization", format!("Bearer {auth_token}"))
120        .json(&request)
121        .send()
122        .context(format!("While starting upload to {start_upload_url}"))?;
123
124    let res = raise_for_err(res)?;
125
126    Ok(res.json()?)
127}
128
129fn upload_to_s3(presigned_url: PresignedUrl, data: &[u8]) -> Result<()> {
130    let client = &context().client;
131    let mut last_err = None;
132    let mut delay = std::time::Duration::from_millis(500);
133    for attempt in 1..=3 {
134        let mut form = Form::new();
135        for (key, value) in &presigned_url.fields {
136            form = form.text(key.clone(), value.clone());
137        }
138        let part = Part::bytes(data.to_vec());
139        form = form.part("file", part);
140
141        let res = client.post(&presigned_url.url).multipart(form).send();
142
143        match res {
144            Result::Ok(resp) => {
145                last_err = raise_for_err(resp).err();
146                if last_err.is_none() {
147                    return Ok(());
148                }
149            }
150            Result::Err(e) => {
151                last_err = Some(anyhow!("Failed to upload chunk: {e:?}"));
152            }
153        }
154        if attempt < 3 {
155            warn!("Upload attempt {attempt} failed, retrying in {delay:?}...",);
156            std::thread::sleep(delay);
157            delay *= 2;
158        }
159    }
160    Err(last_err.unwrap_or_else(|| anyhow!("Unknown error during upload")))
161}
162
163fn finish_upload(content_hashes: HashMap<String, String>) -> Result<()> {
164    let base_url = format!(
165        "{}/api/environments/{}/error_tracking/symbol_sets",
166        context().token.get_host(),
167        context().token.env_id
168    );
169    let client = &context().client;
170    let auth_token = &context().token.token;
171
172    let finish_upload_url: String = format!("{}/{}", base_url, "bulk_finish_upload");
173    let request = BulkUploadFinishRequest { content_hashes };
174
175    let res = client
176        .post(finish_upload_url)
177        .header("Authorization", format!("Bearer {auth_token}"))
178        .header("Content-Type", "application/json")
179        .json(&request)
180        .send()
181        .context(format!("While finishing upload to {base_url}"))?;
182
183    raise_for_err(res)?;
184
185    Ok(())
186}
187
188impl SymbolSetUpload {
189    pub fn cheap_clone(&self) -> Self {
190        Self {
191            chunk_id: self.chunk_id.clone(),
192            release_id: self.release_id.clone(),
193            data: vec![],
194        }
195    }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199struct CreateSymbolSetRequest {
200    chunk_id: String,
201    release_id: Option<String>,
202    content_hash: String,
203}
204
205impl CreateSymbolSetRequest {
206    pub fn new(inner: &SymbolSetUpload) -> Self {
207        Self {
208            chunk_id: inner.chunk_id.clone(),
209            release_id: inner.release_id.clone(),
210            content_hash: content_hash([&inner.data]),
211        }
212    }
213}