// posthog_cli/api/symbol_sets.rs

1use anyhow::{anyhow, Context, Result};
2use rayon::iter::{IntoParallelIterator, ParallelIterator};
3use reqwest::blocking::multipart::{Form, Part};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use tracing::{info, warn};
7
8use crate::{
9    invocation_context::context,
10    utils::{files::content_hash, raise_for_err},
11};
12
/// Largest symbol set payload, in bytes, that `upload` will send (100 MiB).
/// Anything bigger is skipped with a warning instead of being uploaded.
const MAX_FILE_SIZE: usize = 100 << 20;
14
/// A symbol set queued for upload to PostHog.
#[derive(Clone, Debug)]
pub struct SymbolSetUpload {
    /// Identifier sent to the server and used to match its per-chunk upload handles.
    pub chunk_id: String,
    /// Release this symbol set belongs to, if any.
    pub release_id: Option<String>,

    /// Raw contents to upload; payloads larger than `MAX_FILE_SIZE` are skipped by `upload`.
    pub data: Vec<u8>,
}
22
23#[derive(Debug, Serialize, Deserialize, Clone)]
24struct StartUploadResponseData {
25    presigned_url: PresignedUrl,
26    symbol_set_id: String,
27}
28
29#[derive(Serialize, Deserialize, Debug, Clone)]
30pub struct PresignedUrl {
31    pub url: String,
32    pub fields: HashMap<String, String>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36struct BulkUploadStartRequest {
37    symbol_sets: Vec<CreateSymbolSetRequest>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41struct BulkUploadStartResponse {
42    id_map: HashMap<String, StartUploadResponseData>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46struct BulkUploadFinishRequest {
47    content_hashes: HashMap<String, String>,
48}
49
50pub fn upload(input_sets: &[SymbolSetUpload], batch_size: usize) -> Result<()> {
51    let upload_requests: Vec<_> = input_sets
52        .iter()
53        .filter(|s| {
54            if s.data.len() > MAX_FILE_SIZE {
55                warn!(
56                    "Skipping symbol set with id: {}, file too large",
57                    s.chunk_id
58                );
59            }
60            s.data.len() <= MAX_FILE_SIZE
61        })
62        .collect();
63
64    for (i, batch) in upload_requests.chunks(batch_size).enumerate() {
65        info!("Starting upload of batch {i}, {} symbol sets", batch.len());
66        let start_response = start_upload(batch)?;
67
68        let id_map: HashMap<_, _> = batch
69            .iter()
70            .map(|u| (u.chunk_id.as_str(), u))
71            .collect();
72
73        info!(
74            "Server returned {} upload keys ({} skipped as already present)",
75            start_response.id_map.len(),
76            batch.len() - start_response.id_map.len()
77        );
78
79        let res: Result<HashMap<String, String>> = start_response
80            .id_map
81            .into_par_iter()
82            .map(|(chunk_id, data)| {
83                info!("Uploading chunk {}", chunk_id);
84                let upload = id_map.get(chunk_id.as_str()).ok_or(anyhow!(
85                    "Got a chunk ID back from posthog that we didn't expect!"
86                ))?;
87
88                let content_hash = content_hash([&upload.data]);
89                upload_to_s3(data.presigned_url.clone(), &upload.data)?;
90                Ok((data.symbol_set_id, content_hash))
91            })
92            .collect();
93
94        let content_hashes = res?;
95
96        finish_upload(content_hashes)?;
97    }
98
99    Ok(())
100}
101
102fn start_upload<'a>(symbol_sets: &[&SymbolSetUpload]) -> Result<BulkUploadStartResponse> {
103    let base_url = format!(
104        "{}/api/environments/{}/error_tracking/symbol_sets",
105        context().token.get_host(),
106        context().token.env_id
107    );
108    let client = &context().client;
109    let auth_token = &context().token.token;
110
111    let start_upload_url: String = format!("{}{}", base_url, "/bulk_start_upload");
112
113    let request = BulkUploadStartRequest {
114        symbol_sets: symbol_sets
115            .iter()
116            .map(|s| CreateSymbolSetRequest::new(s))
117            .collect(),
118    };
119
120    let res = client
121        .post(&start_upload_url)
122        .header("Authorization", format!("Bearer {auth_token}"))
123        .json(&request)
124        .send()
125        .context(format!("While starting upload to {start_upload_url}"))?;
126
127    let res = raise_for_err(res)?;
128
129    Ok(res.json()?)
130}
131
132fn upload_to_s3(presigned_url: PresignedUrl, data: &[u8]) -> Result<()> {
133    let client = &context().client;
134    let mut last_err = None;
135    let mut delay = std::time::Duration::from_millis(500);
136    for attempt in 1..=3 {
137        let mut form = Form::new();
138        for (key, value) in &presigned_url.fields {
139            form = form.text(key.clone(), value.clone());
140        }
141        let part = Part::bytes(data.to_vec());
142        form = form.part("file", part);
143
144        let res = client.post(&presigned_url.url).multipart(form).send();
145
146        match res {
147            Result::Ok(resp) => {
148                last_err = raise_for_err(resp).err();
149                if last_err.is_none() {
150                    return Ok(());
151                }
152            }
153            Result::Err(e) => {
154                last_err = Some(anyhow!("Failed to upload chunk: {e}"));
155            }
156        }
157        if attempt < 3 {
158            warn!(
159                "Upload attempt {} failed, retrying in {:?}...",
160                attempt, delay
161            );
162            std::thread::sleep(delay);
163            delay *= 2;
164        }
165    }
166    Err(last_err.unwrap_or_else(|| anyhow!("Unknown error during upload")))
167}
168
169fn finish_upload(content_hashes: HashMap<String, String>) -> Result<()> {
170    let base_url = format!(
171        "{}/api/environments/{}/error_tracking/symbol_sets",
172        context().token.get_host(),
173        context().token.env_id
174    );
175    let client = &context().client;
176    let auth_token = &context().token.token;
177
178    let finish_upload_url: String = format!("{}/{}", base_url, "bulk_finish_upload");
179    let request = BulkUploadFinishRequest { content_hashes };
180
181    let res = client
182        .post(finish_upload_url)
183        .header("Authorization", format!("Bearer {auth_token}"))
184        .header("Content-Type", "application/json")
185        .json(&request)
186        .send()
187        .context(format!("While finishing upload to {base_url}"))?;
188
189    raise_for_err(res)?;
190
191    Ok(())
192}
193
194impl SymbolSetUpload {
195    pub fn cheap_clone(&self) -> Self {
196        Self {
197            chunk_id: self.chunk_id.clone(),
198            release_id: self.release_id.clone(),
199            data: vec![],
200        }
201    }
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize)]
205struct CreateSymbolSetRequest {
206    chunk_id: String,
207    release_id: Option<String>,
208    content_hash: String,
209}
210
211impl CreateSymbolSetRequest {
212    pub fn new(inner: &SymbolSetUpload) -> Self {
213        Self {
214            chunk_id: inner.chunk_id.clone(),
215            release_id: inner.release_id.clone(),
216            content_hash: content_hash([&inner.data]),
217        }
218    }
219}