1use anyhow::{Result, anyhow};
2use base64::Engine;
3use serde::Deserialize;
4use wacore::download::MediaType;
5
6use crate::client::Client;
7use crate::http::{HttpRequest, HttpResponse};
8use crate::mediaconn::{MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS, is_media_auth_error};
9
10const RESUMABLE_UPLOAD_THRESHOLD: usize = 5 * 1024 * 1024;
13
14enum UploadExistsResult {
16 Complete { url: String, direct_path: String },
18 Resume { byte_offset: u64 },
20 NotFound,
22}
23
24#[derive(Deserialize)]
26struct UploadProgressResponse {
27 #[serde(default)]
28 url: Option<String>,
29 #[serde(default)]
30 direct_path: Option<String>,
31 #[serde(default)]
33 resume: Option<String>,
34}
35
36fn parse_upload_progress(resp: &HttpResponse, total_size: u64) -> UploadExistsResult {
38 if resp.status_code >= 400 {
39 return UploadExistsResult::NotFound;
40 }
41 let Ok(progress) = serde_json::from_slice::<UploadProgressResponse>(&resp.body) else {
42 return UploadExistsResult::NotFound;
43 };
44 match progress.resume.as_deref() {
45 Some("complete") => {
46 if let (Some(url), Some(direct_path)) = (progress.url, progress.direct_path) {
47 UploadExistsResult::Complete { url, direct_path }
48 } else {
49 UploadExistsResult::NotFound
50 }
51 }
52 Some(offset_str) => match offset_str.parse::<u64>() {
53 Ok(offset) if offset > 0 && offset < total_size => UploadExistsResult::Resume {
54 byte_offset: offset,
55 },
56 _ => UploadExistsResult::NotFound,
57 },
58 _ => UploadExistsResult::NotFound,
59 }
60}
61
62fn build_upload_request(
63 hostname: &str,
64 mms_type: &str,
65 auth: &str,
66 token: &str,
67 body: &[u8],
68 file_offset: Option<u64>,
69) -> HttpRequest {
70 let mut url = format!("https://{hostname}/mms/{mms_type}/{token}?auth={auth}&token={token}");
71 if let Some(offset) = file_offset {
72 url.push_str(&format!("&file_offset={offset}"));
73 }
74
75 HttpRequest::post(url)
76 .with_header("Content-Type", "application/octet-stream")
77 .with_header("Origin", "https://web.whatsapp.com")
78 .with_body(body.to_vec())
79}
80
81fn build_resume_check_request(
82 hostname: &str,
83 mms_type: &str,
84 auth: &str,
85 token: &str,
86) -> HttpRequest {
87 let url =
88 format!("https://{hostname}/mms/{mms_type}/{token}?auth={auth}&token={token}&resume=1");
89 HttpRequest::post(url).with_header("Origin", "https://web.whatsapp.com")
90}
91
92fn upload_error_from_response(response: HttpResponse) -> anyhow::Error {
93 match response.body_string() {
94 Ok(body) => anyhow!("Upload failed {} body={}", response.status_code, body),
95 Err(body_err) => anyhow!(
96 "Upload failed {} and failed to read response body: {}",
97 response.status_code,
98 body_err
99 ),
100 }
101}
102
103async fn upload_media_with_retry<
104 GetMediaConn,
105 GetMediaConnFut,
106 InvalidateMediaConn,
107 InvalidateMediaConnFut,
108 ExecuteRequest,
109 ExecuteRequestFut,
110>(
111 enc: &wacore::upload::EncryptedMedia,
112 media_type: MediaType,
113 file_length: u64,
114 mut get_media_conn: GetMediaConn,
115 mut invalidate_media_conn: InvalidateMediaConn,
116 mut execute_request: ExecuteRequest,
117) -> Result<UploadResponse>
118where
119 GetMediaConn: FnMut(bool) -> GetMediaConnFut,
120 GetMediaConnFut: std::future::Future<Output = Result<crate::mediaconn::MediaConn>>,
121 InvalidateMediaConn: FnMut() -> InvalidateMediaConnFut,
122 InvalidateMediaConnFut: std::future::Future<Output = ()>,
123 ExecuteRequest: FnMut(HttpRequest) -> ExecuteRequestFut,
124 ExecuteRequestFut: std::future::Future<Output = Result<HttpResponse>>,
125{
126 let token = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(enc.file_enc_sha256);
127 let mms_type = media_type.mms_type();
128 let mut force_refresh = false;
129 let mut last_error: Option<anyhow::Error> = None;
130
131 for attempt in 0..=MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS {
132 let media_conn = get_media_conn(force_refresh).await?;
133 if media_conn.hosts.is_empty() {
134 return Err(anyhow!("No media hosts"));
135 }
136
137 let mut retry_with_fresh_auth = false;
138
139 for host in &media_conn.hosts {
140 let mut upload_data: &[u8] = &enc.data_to_upload;
143 let mut file_offset: Option<u64> = None;
144
145 if enc.data_to_upload.len() >= RESUMABLE_UPLOAD_THRESHOLD {
146 let check_req =
147 build_resume_check_request(&host.hostname, mms_type, &media_conn.auth, &token);
148 if let Ok(check_resp) = execute_request(check_req).await {
149 let total = enc.data_to_upload.len() as u64;
150 match parse_upload_progress(&check_resp, total) {
151 UploadExistsResult::Complete { url, direct_path } => {
152 return Ok(UploadResponse {
153 url,
154 direct_path,
155 media_key: enc.media_key.to_vec(),
156 file_enc_sha256: enc.file_enc_sha256.to_vec(),
157 file_sha256: enc.file_sha256.to_vec(),
158 file_length,
159 });
160 }
161 UploadExistsResult::Resume { byte_offset } => {
162 log::info!("Resuming upload from byte {byte_offset}/{total}");
163 upload_data = &enc.data_to_upload[byte_offset as usize..];
164 file_offset = Some(byte_offset);
165 }
166 UploadExistsResult::NotFound => {}
167 }
168 }
169 }
171
172 let request = build_upload_request(
173 &host.hostname,
174 mms_type,
175 &media_conn.auth,
176 &token,
177 upload_data,
178 file_offset,
179 );
180
181 let response = match execute_request(request).await {
182 Ok(response) => response,
183 Err(err) => {
184 last_error = Some(err);
185 continue;
186 }
187 };
188
189 if response.status_code < 400 {
190 let raw: RawUploadResponse = serde_json::from_slice(&response.body)?;
191 return Ok(UploadResponse {
192 url: raw.url,
193 direct_path: raw.direct_path,
194 media_key: enc.media_key.to_vec(),
195 file_enc_sha256: enc.file_enc_sha256.to_vec(),
196 file_sha256: enc.file_sha256.to_vec(),
197 file_length,
198 });
199 }
200
201 let status_code = response.status_code;
202 let err = upload_error_from_response(response);
203
204 if is_media_auth_error(status_code) {
205 if attempt == 0 {
206 invalidate_media_conn().await;
207 force_refresh = true;
208 retry_with_fresh_auth = true;
209 break;
210 }
211
212 return Err(err);
213 }
214
215 last_error = Some(err);
216 }
217
218 if !retry_with_fresh_auth {
219 break;
220 }
221 }
222
223 Err(last_error.unwrap_or_else(|| anyhow!("Failed to upload to all available media hosts")))
224}
225
226#[derive(Debug, Clone)]
227pub struct UploadResponse {
228 pub url: String,
229 pub direct_path: String,
230 pub media_key: Vec<u8>,
231 pub file_enc_sha256: Vec<u8>,
232 pub file_sha256: Vec<u8>,
233 pub file_length: u64,
234}
235
236#[derive(Deserialize)]
237struct RawUploadResponse {
238 url: String,
239 direct_path: String,
240}
241
242impl Client {
243 pub async fn upload(&self, data: Vec<u8>, media_type: MediaType) -> Result<UploadResponse> {
244 let enc = wacore::runtime::blocking(&*self.runtime, {
245 let data = data.clone();
246 move || wacore::upload::encrypt_media(&data, media_type)
247 })
248 .await?;
249
250 upload_media_with_retry(
251 &enc,
252 media_type,
253 data.len() as u64,
254 |force| async move { self.refresh_media_conn(force).await.map_err(Into::into) },
255 || async { self.invalidate_media_conn().await },
256 |request| async move { self.http_client.execute(request).await },
257 )
258 .await
259 }
260}
261
262#[cfg(test)]
263mod tests {
264 use super::*;
265 use crate::mediaconn::{MediaConn, MediaConnHost};
266 use async_lock::Mutex;
267 use std::sync::Arc;
268 use wacore::time::Instant;
269
270 fn media_conn(auth: &str, hosts: &[&str]) -> MediaConn {
271 MediaConn {
272 auth: auth.to_string(),
273 ttl: 60,
274 auth_ttl: None,
275 hosts: hosts
276 .iter()
277 .map(|hostname| MediaConnHost::new((*hostname).to_string()))
278 .collect(),
279 fetched_at: Instant::now(),
280 }
281 }
282
283 #[tokio::test]
284 async fn upload_retries_with_forced_media_conn_refresh_after_auth_error() {
285 let enc = wacore::upload::encrypt_media(b"retry me", MediaType::Image)
286 .expect("encryption should succeed");
287 let first_conn = media_conn("stale-auth", &["cdn1.example.com"]);
288 let refreshed_conn = media_conn("fresh-auth", &["cdn2.example.com"]);
289 let refresh_calls = Arc::new(Mutex::new(Vec::new()));
290 let invalidations = Arc::new(Mutex::new(0usize));
291 let seen_urls = Arc::new(Mutex::new(Vec::new()));
292
293 let result = upload_media_with_retry(
294 &enc,
295 MediaType::Image,
296 8,
297 {
298 let refresh_calls = Arc::clone(&refresh_calls);
299 move |force| {
300 let refresh_calls = Arc::clone(&refresh_calls);
301 let first_conn = first_conn.clone();
302 let refreshed_conn = refreshed_conn.clone();
303 async move {
304 refresh_calls.lock().await.push(force);
305 Ok(if force { refreshed_conn } else { first_conn })
306 }
307 }
308 },
309 {
310 let invalidations = Arc::clone(&invalidations);
311 move || {
312 let invalidations = Arc::clone(&invalidations);
313 async move {
314 *invalidations.lock().await += 1;
315 }
316 }
317 },
318 {
319 let seen_urls = Arc::clone(&seen_urls);
320 move |request| {
321 let seen_urls = Arc::clone(&seen_urls);
322 async move {
323 seen_urls.lock().await.push(request.url.clone());
324 if request.url.contains("stale-auth") {
325 Ok(HttpResponse {
326 status_code: 401,
327 body: b"expired".to_vec(),
328 })
329 } else {
330 Ok(HttpResponse {
331 status_code: 200,
332 body: br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/123"}"#.to_vec(),
333 })
334 }
335 }
336 }
337 },
338 )
339 .await
340 .expect("upload should succeed after refreshing media auth");
341
342 assert_eq!(*refresh_calls.lock().await, vec![false, true]);
343 assert_eq!(*invalidations.lock().await, 1);
344
345 let seen_urls = seen_urls.lock().await.clone();
346 assert_eq!(seen_urls.len(), 2);
347 assert!(seen_urls[0].contains("cdn1.example.com"));
348 assert!(seen_urls[0].contains("auth=stale-auth"));
349 assert!(seen_urls[1].contains("cdn2.example.com"));
350 assert!(seen_urls[1].contains("auth=fresh-auth"));
351 assert_eq!(result.direct_path, "/v/t62.7118-24/123");
352 assert_eq!(result.url, "https://cdn2.example.com/file");
353 }
354
355 #[tokio::test]
356 async fn upload_fails_over_to_next_host_after_non_auth_error() {
357 let enc = wacore::upload::encrypt_media(b"retry host", MediaType::Image)
358 .expect("encryption should succeed");
359 let conn = media_conn("shared-auth", &["cdn1.example.com", "cdn2.example.com"]);
360 let seen_urls = Arc::new(Mutex::new(Vec::new()));
361
362 let result = upload_media_with_retry(
363 &enc,
364 MediaType::Image,
365 10,
366 move |_force| {
367 let conn = conn.clone();
368 async move { Ok(conn) }
369 },
370 || async {},
371 {
372 let seen_urls = Arc::clone(&seen_urls);
373 move |request| {
374 let seen_urls = Arc::clone(&seen_urls);
375 async move {
376 seen_urls.lock().await.push(request.url.clone());
377 if request.url.contains("cdn1.example.com") {
378 Ok(HttpResponse {
379 status_code: 500,
380 body: b"try another host".to_vec(),
381 })
382 } else {
383 Ok(HttpResponse {
384 status_code: 200,
385 body: br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/456"}"#.to_vec(),
386 })
387 }
388 }
389 }
390 },
391 )
392 .await
393 .expect("upload should succeed on the second host");
394
395 let seen_urls = seen_urls.lock().await.clone();
396 assert_eq!(seen_urls.len(), 2);
397 assert!(seen_urls[0].contains("cdn1.example.com"));
398 assert!(seen_urls[1].contains("cdn2.example.com"));
399 assert_eq!(result.direct_path, "/v/t62.7118-24/456");
400 }
401}