posthog_cli/api/symbol_sets.rs

use anyhow::{anyhow, Context, Result};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reqwest::blocking::multipart::{Form, Part};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{info, warn};

use crate::{
    invocation_context::context,
    utils::{files::content_hash, raise_for_err},
};

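/// Symbol sets larger than this (100 MiB) are skipped with a warning rather than uploaded.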
const MAX_FILE_SIZE: usize = 100 * 1024 * 1024;

#[derive(Debug, Clone)]
pub struct SymbolSetUpload {
    pub chunk_id: String,
    pub release_id: Option<String>,

    pub data: Vec<u8>,
}

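/// Per-chunk data returned by the bulk start endpoint: the presigned URL to
/// upload the file to, and the ID of the corresponding symbol set record.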
#[derive(Debug, Serialize, Deserialize, Clone)]
struct StartUploadResponseData {
    presigned_url: PresignedUrl,
    symbol_set_id: String,
}

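/// A presigned upload target: the URL to POST to, plus the form fields that
/// must accompany the file.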
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PresignedUrl {
    pub url: String,
    pub fields: HashMap<String, String>,
}

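/// Request body for the bulk start endpoint: one entry per symbol set in the batch.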
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartRequest {
    symbol_sets: Vec<CreateSymbolSetRequest>,
}

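/// Response from the bulk start endpoint, keyed by chunk ID. Chunks the server
/// already has are omitted from the map.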
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadStartResponse {
    id_map: HashMap<String, StartUploadResponseData>,
}

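/// Request body for the bulk finish endpoint: maps each symbol set ID to the
/// content hash of the file that was uploaded for it.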
#[derive(Debug, Clone, Serialize, Deserialize)]
struct BulkUploadFinishRequest {
    content_hashes: HashMap<String, String>,
}

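/// Upload a collection of symbol sets to PostHog in batches of `batch_size`.
///
/// Files larger than `MAX_FILE_SIZE` are skipped with a warning. For each batch,
/// the uploads are registered with the server, the returned presigned URLs are
/// uploaded to in parallel, and the batch is then marked as finished.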
pub fn upload(input_sets: &[SymbolSetUpload], batch_size: usize) -> Result<()> {
    let upload_requests: Vec<_> = input_sets
        .iter()
        .filter(|s| {
            if s.data.len() > MAX_FILE_SIZE {
                warn!(
                    "Skipping symbol set with id: {}, file too large",
                    s.chunk_id
                );
            }
            s.data.len() <= MAX_FILE_SIZE
        })
        .collect();

    for (i, batch) in upload_requests.chunks(batch_size).enumerate() {
        info!("Starting upload of batch {i}, {} symbol sets", batch.len());
        let start_response = start_upload(batch)?;

        let id_map: HashMap<_, _> = batch.iter().map(|u| (u.chunk_id.as_str(), u)).collect();

        info!(
            "Server returned {} upload keys ({} skipped as already present)",
            start_response.id_map.len(),
            batch.len() - start_response.id_map.len()
        );

        let res: Result<HashMap<String, String>> = start_response
            .id_map
            .into_par_iter()
            .map(|(chunk_id, data)| {
                info!("Uploading chunk {}", chunk_id);
                let upload = id_map.get(chunk_id.as_str()).ok_or(anyhow!(
                    "Got a chunk ID back from posthog that we didn't expect!"
                ))?;

                let content_hash = content_hash([&upload.data]);
                upload_to_s3(data.presigned_url.clone(), &upload.data)?;
                Ok((data.symbol_set_id, content_hash))
            })
            .collect();

        let content_hashes = res?;

        finish_upload(content_hashes)?;
    }

    Ok(())
}

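/// Call the `bulk_start_upload` endpoint to register a batch of symbol sets and
/// obtain a presigned upload URL for each chunk.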
fn start_upload(symbol_sets: &[&SymbolSetUpload]) -> Result<BulkUploadStartResponse> {
    let base_url = format!(
        "{}/api/environments/{}/error_tracking/symbol_sets",
        context().token.get_host(),
        context().token.env_id
    );
    let client = &context().client;
    let auth_token = &context().token.token;

    let start_upload_url: String = format!("{}{}", base_url, "/bulk_start_upload");

    let request = BulkUploadStartRequest {
        symbol_sets: symbol_sets
            .iter()
            .map(|s| CreateSymbolSetRequest::new(s))
            .collect(),
    };

    let res = client
        .post(&start_upload_url)
        .header("Authorization", format!("Bearer {auth_token}"))
        .json(&request)
        .send()
        .context(format!("While starting upload to {start_upload_url}"))?;

    let res = raise_for_err(res)?;

    Ok(res.json()?)
}

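/// POST the file data to the presigned URL as a multipart form, retrying up to
/// three times with exponential backoff.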
fn upload_to_s3(presigned_url: PresignedUrl, data: &[u8]) -> Result<()> {
    let client = &context().client;
    let mut last_err = None;
    let mut delay = std::time::Duration::from_millis(500);
    for attempt in 1..=3 {
        let mut form = Form::new();
        for (key, value) in &presigned_url.fields {
            form = form.text(key.clone(), value.clone());
        }
        let part = Part::bytes(data.to_vec());
        form = form.part("file", part);

        let res = client.post(&presigned_url.url).multipart(form).send();

        match res {
            Ok(resp) => {
                last_err = raise_for_err(resp).err();
                if last_err.is_none() {
                    return Ok(());
                }
            }
            Err(e) => {
                last_err = Some(anyhow!("Failed to upload chunk: {e:?}"));
            }
        }
        if attempt < 3 {
            warn!("Upload attempt {attempt} failed, retrying in {delay:?}...");
            std::thread::sleep(delay);
            delay *= 2;
        }
    }
    Err(last_err.unwrap_or_else(|| anyhow!("Unknown error during upload")))
}

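/// Call the `bulk_finish_upload` endpoint with the content hash of each file
/// that was uploaded.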
fn finish_upload(content_hashes: HashMap<String, String>) -> Result<()> {
    let base_url = format!(
        "{}/api/environments/{}/error_tracking/symbol_sets",
        context().token.get_host(),
        context().token.env_id
    );
    let client = &context().client;
    let auth_token = &context().token.token;

    let finish_upload_url: String = format!("{}/{}", base_url, "bulk_finish_upload");
    let request = BulkUploadFinishRequest { content_hashes };

    let res = client
        .post(finish_upload_url)
        .header("Authorization", format!("Bearer {auth_token}"))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .context(format!("While finishing upload to {base_url}"))?;

    raise_for_err(res)?;

    Ok(())
}

impl SymbolSetUpload {
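    /// Clone the identifying metadata while dropping the (potentially large) file contents.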
    pub fn cheap_clone(&self) -> Self {
        Self {
            chunk_id: self.chunk_id.clone(),
            release_id: self.release_id.clone(),
            data: vec![],
        }
    }
}

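/// Wire format for a single entry in `BulkUploadStartRequest`.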
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CreateSymbolSetRequest {
    chunk_id: String,
    release_id: Option<String>,
    content_hash: String,
}

impl CreateSymbolSetRequest {
    pub fn new(inner: &SymbolSetUpload) -> Self {
        Self {
            chunk_id: inner.chunk_id.clone(),
            release_id: inner.release_id.clone(),
            content_hash: content_hash([&inner.data]),
        }
    }
}