posthog_cli/api/
symbol_sets.rs1use anyhow::{anyhow, Context, Result};
2use rayon::iter::{IntoParallelIterator, ParallelIterator};
3use reqwest::blocking::multipart::{Form, Part};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use tracing::{info, warn};
7
8use crate::{
9 invocation_context::context,
10 utils::{files::content_hash, raise_for_err},
11};
12
/// Largest symbol set payload, in bytes, that `upload` will send (100 MiB).
/// Anything bigger is skipped with a warning.
const MAX_FILE_SIZE: usize = 100 * 1024 * 1024;

/// A single symbol set queued for upload.
#[derive(Clone, Debug)]
pub struct SymbolSetUpload {
    /// Identifier used to match this upload against the entries the server
    /// returns when an upload is started.
    pub chunk_id: String,
    /// Optional release identifier forwarded to the server in the start request.
    pub release_id: Option<String>,

    /// Raw bytes of the symbol set; this is what gets hashed and uploaded.
    pub data: Vec<u8>,
}
22
23#[derive(Debug, Serialize, Deserialize, Clone)]
24struct StartUploadResponseData {
25 presigned_url: PresignedUrl,
26 symbol_set_id: String,
27}
28
29#[derive(Serialize, Deserialize, Debug, Clone)]
30pub struct PresignedUrl {
31 pub url: String,
32 pub fields: HashMap<String, String>,
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
36struct BulkUploadStartRequest {
37 symbol_sets: Vec<CreateSymbolSetRequest>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41struct BulkUploadStartResponse {
42 id_map: HashMap<String, StartUploadResponseData>,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46struct BulkUploadFinishRequest {
47 content_hashes: HashMap<String, String>,
48}
49
50pub fn upload(input_sets: &[SymbolSetUpload], batch_size: usize) -> Result<()> {
51 let upload_requests: Vec<_> = input_sets
52 .iter()
53 .filter(|s| {
54 if s.data.len() > MAX_FILE_SIZE {
55 warn!(
56 "Skipping symbol set with id: {}, file too large",
57 s.chunk_id
58 );
59 }
60 s.data.len() <= MAX_FILE_SIZE
61 })
62 .collect();
63
64 for (i, batch) in upload_requests.chunks(batch_size).enumerate() {
65 info!("Starting upload of batch {i}, {} symbol sets", batch.len());
66 let start_response = start_upload(batch)?;
67
68 let id_map: HashMap<_, _> = batch
69 .iter()
70 .map(|u| (u.chunk_id.as_str(), u))
71 .collect();
72
73 info!(
74 "Server returned {} upload keys ({} skipped as already present)",
75 start_response.id_map.len(),
76 batch.len() - start_response.id_map.len()
77 );
78
79 let res: Result<HashMap<String, String>> = start_response
80 .id_map
81 .into_par_iter()
82 .map(|(chunk_id, data)| {
83 info!("Uploading chunk {}", chunk_id);
84 let upload = id_map.get(chunk_id.as_str()).ok_or(anyhow!(
85 "Got a chunk ID back from posthog that we didn't expect!"
86 ))?;
87
88 let content_hash = content_hash([&upload.data]);
89 upload_to_s3(data.presigned_url.clone(), &upload.data)?;
90 Ok((data.symbol_set_id, content_hash))
91 })
92 .collect();
93
94 let content_hashes = res?;
95
96 finish_upload(content_hashes)?;
97 }
98
99 Ok(())
100}
101
102fn start_upload<'a>(symbol_sets: &[&SymbolSetUpload]) -> Result<BulkUploadStartResponse> {
103 let base_url = format!(
104 "{}/api/environments/{}/error_tracking/symbol_sets",
105 context().token.get_host(),
106 context().token.env_id
107 );
108 let client = &context().client;
109 let auth_token = &context().token.token;
110
111 let start_upload_url: String = format!("{}{}", base_url, "/bulk_start_upload");
112
113 let request = BulkUploadStartRequest {
114 symbol_sets: symbol_sets
115 .iter()
116 .map(|s| CreateSymbolSetRequest::new(s))
117 .collect(),
118 };
119
120 let res = client
121 .post(&start_upload_url)
122 .header("Authorization", format!("Bearer {auth_token}"))
123 .json(&request)
124 .send()
125 .context(format!("While starting upload to {start_upload_url}"))?;
126
127 let res = raise_for_err(res)?;
128
129 Ok(res.json()?)
130}
131
132fn upload_to_s3(presigned_url: PresignedUrl, data: &[u8]) -> Result<()> {
133 let client = &context().client;
134 let mut last_err = None;
135 let mut delay = std::time::Duration::from_millis(500);
136 for attempt in 1..=3 {
137 let mut form = Form::new();
138 for (key, value) in &presigned_url.fields {
139 form = form.text(key.clone(), value.clone());
140 }
141 let part = Part::bytes(data.to_vec());
142 form = form.part("file", part);
143
144 let res = client.post(&presigned_url.url).multipart(form).send();
145
146 match res {
147 Result::Ok(resp) => {
148 last_err = raise_for_err(resp).err();
149 if last_err.is_none() {
150 return Ok(());
151 }
152 }
153 Result::Err(e) => {
154 last_err = Some(anyhow!("Failed to upload chunk: {e}"));
155 }
156 }
157 if attempt < 3 {
158 warn!(
159 "Upload attempt {} failed, retrying in {:?}...",
160 attempt, delay
161 );
162 std::thread::sleep(delay);
163 delay *= 2;
164 }
165 }
166 Err(last_err.unwrap_or_else(|| anyhow!("Unknown error during upload")))
167}
168
169fn finish_upload(content_hashes: HashMap<String, String>) -> Result<()> {
170 let base_url = format!(
171 "{}/api/environments/{}/error_tracking/symbol_sets",
172 context().token.get_host(),
173 context().token.env_id
174 );
175 let client = &context().client;
176 let auth_token = &context().token.token;
177
178 let finish_upload_url: String = format!("{}/{}", base_url, "bulk_finish_upload");
179 let request = BulkUploadFinishRequest { content_hashes };
180
181 let res = client
182 .post(finish_upload_url)
183 .header("Authorization", format!("Bearer {auth_token}"))
184 .header("Content-Type", "application/json")
185 .json(&request)
186 .send()
187 .context(format!("While finishing upload to {base_url}"))?;
188
189 raise_for_err(res)?;
190
191 Ok(())
192}
193
194impl SymbolSetUpload {
195 pub fn cheap_clone(&self) -> Self {
196 Self {
197 chunk_id: self.chunk_id.clone(),
198 release_id: self.release_id.clone(),
199 data: vec![],
200 }
201 }
202}
203
204#[derive(Debug, Clone, Serialize, Deserialize)]
205struct CreateSymbolSetRequest {
206 chunk_id: String,
207 release_id: Option<String>,
208 content_hash: String,
209}
210
211impl CreateSymbolSetRequest {
212 pub fn new(inner: &SymbolSetUpload) -> Self {
213 Self {
214 chunk_id: inner.chunk_id.clone(),
215 release_id: inner.release_id.clone(),
216 content_hash: content_hash([&inner.data]),
217 }
218 }
219}