1use anyhow::{Result, anyhow};
2use base64::Engine;
3use serde::Deserialize;
4use wacore::download::MediaType;
5
6use crate::client::Client;
7use crate::http::{HttpRequest, HttpResponse};
8use crate::mediaconn::{MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS, is_media_auth_error};
9
/// Encrypted payload size in bytes (5 MiB) from which the uploader first asks
/// the media host for resumable-upload progress before sending data.
const RESUMABLE_UPLOAD_THRESHOLD: usize = 5 << 20;
13
/// Outcome of asking a media host about the state of a previous upload.
enum UploadExistsResult {
    /// The host reported the upload as complete and supplied both locations.
    Complete {
        url: String,
        direct_path: String,
    },
    /// The host holds part of the payload; sending may continue at `byte_offset`.
    Resume {
        byte_offset: u64,
    },
    /// The host reported nothing usable, so the whole payload must be sent.
    NotFound,
}
23
24#[derive(Deserialize)]
26struct UploadProgressResponse {
27 #[serde(default)]
28 url: Option<String>,
29 #[serde(default)]
30 direct_path: Option<String>,
31 #[serde(default)]
33 resume: Option<String>,
34}
35
36fn parse_upload_progress(resp: &HttpResponse, total_size: u64) -> UploadExistsResult {
38 if resp.status_code >= 400 {
39 return UploadExistsResult::NotFound;
40 }
41 let Ok(progress) = serde_json::from_slice::<UploadProgressResponse>(&resp.body) else {
42 return UploadExistsResult::NotFound;
43 };
44 match progress.resume.as_deref() {
45 Some("complete") => {
46 if let (Some(url), Some(direct_path)) = (progress.url, progress.direct_path) {
47 UploadExistsResult::Complete { url, direct_path }
48 } else {
49 UploadExistsResult::NotFound
50 }
51 }
52 Some(offset_str) => match offset_str.parse::<u64>() {
53 Ok(offset) if offset > 0 && offset < total_size => UploadExistsResult::Resume {
54 byte_offset: offset,
55 },
56 _ => UploadExistsResult::NotFound,
57 },
58 _ => UploadExistsResult::NotFound,
59 }
60}
61
62fn build_upload_request(
63 hostname: &str,
64 upload_path: &str,
65 auth: &str,
66 token: &str,
67 body: &[u8],
68 file_offset: Option<u64>,
69) -> HttpRequest {
70 let mut url = format!("https://{hostname}{upload_path}/{token}?auth={auth}&token={token}");
71 if let Some(offset) = file_offset {
72 url.push_str("&file_offset=");
73 url.push_str(itoa::Buffer::new().format(offset));
74 }
75
76 HttpRequest::post(url)
77 .with_header("Content-Type", "application/octet-stream")
78 .with_header("Origin", "https://web.whatsapp.com")
79 .with_body(body.to_vec())
80}
81
82fn build_resume_check_request(
83 hostname: &str,
84 upload_path: &str,
85 auth: &str,
86 token: &str,
87) -> HttpRequest {
88 let url = format!("https://{hostname}{upload_path}/{token}?auth={auth}&token={token}&resume=1");
89 HttpRequest::post(url).with_header("Origin", "https://web.whatsapp.com")
90}
91
92fn upload_error_from_response(response: HttpResponse) -> anyhow::Error {
93 match response.body_string() {
94 Ok(body) => anyhow!("Upload failed {} body={}", response.status_code, body),
95 Err(body_err) => anyhow!(
96 "Upload failed {} and failed to read response body: {}",
97 response.status_code,
98 body_err
99 ),
100 }
101}
102
103async fn upload_media_with_retry<
104 GetMediaConn,
105 GetMediaConnFut,
106 InvalidateMediaConn,
107 InvalidateMediaConnFut,
108 ExecuteRequest,
109 ExecuteRequestFut,
110>(
111 enc: &wacore::upload::EncryptedMedia,
112 media_type: MediaType,
113 file_length: u64,
114 media_key_timestamp: i64,
115 mut get_media_conn: GetMediaConn,
116 mut invalidate_media_conn: InvalidateMediaConn,
117 mut execute_request: ExecuteRequest,
118) -> Result<UploadResponse>
119where
120 GetMediaConn: FnMut(bool) -> GetMediaConnFut,
121 GetMediaConnFut: std::future::Future<Output = Result<crate::mediaconn::MediaConn>>,
122 InvalidateMediaConn: FnMut() -> InvalidateMediaConnFut,
123 InvalidateMediaConnFut: std::future::Future<Output = ()>,
124 ExecuteRequest: FnMut(HttpRequest) -> ExecuteRequestFut,
125 ExecuteRequestFut: std::future::Future<Output = Result<HttpResponse>>,
126{
127 let token = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(enc.file_enc_sha256);
128 let upload_path = media_type.upload_path();
129 let mut force_refresh = false;
130 let mut last_error: Option<anyhow::Error> = None;
131
132 for attempt in 0..=MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS {
133 let media_conn = get_media_conn(force_refresh).await?;
134 if media_conn.hosts.is_empty() {
135 return Err(anyhow!("No media hosts"));
136 }
137
138 let mut retry_with_fresh_auth = false;
139
140 for host in &media_conn.hosts {
141 let mut upload_data: &[u8] = &enc.data_to_upload;
144 let mut file_offset: Option<u64> = None;
145
146 if enc.data_to_upload.len() >= RESUMABLE_UPLOAD_THRESHOLD {
147 let check_req = build_resume_check_request(
148 &host.hostname,
149 upload_path,
150 &media_conn.auth,
151 &token,
152 );
153 if let Ok(check_resp) = execute_request(check_req).await {
154 let total = enc.data_to_upload.len() as u64;
155 match parse_upload_progress(&check_resp, total) {
156 UploadExistsResult::Complete { url, direct_path } => {
157 return Ok(UploadResponse {
158 url,
159 direct_path,
160 media_key: enc.media_key,
161 file_enc_sha256: enc.file_enc_sha256,
162 file_sha256: enc.file_sha256,
163 file_length,
164 media_key_timestamp,
165 });
166 }
167 UploadExistsResult::Resume { byte_offset } => {
168 let offset = byte_offset as usize;
169 if offset >= enc.data_to_upload.len() {
170 log::warn!(
171 "Server resume offset {offset} exceeds data length {}; uploading from start",
172 enc.data_to_upload.len()
173 );
174 } else {
175 log::info!("Resuming upload from byte {byte_offset}/{total}");
176 upload_data = &enc.data_to_upload[offset..];
177 file_offset = Some(byte_offset);
178 }
179 }
180 UploadExistsResult::NotFound => {}
181 }
182 }
183 }
185
186 let request = build_upload_request(
187 &host.hostname,
188 upload_path,
189 &media_conn.auth,
190 &token,
191 upload_data,
192 file_offset,
193 );
194
195 let response = match execute_request(request).await {
196 Ok(response) => response,
197 Err(err) => {
198 last_error = Some(err);
199 continue;
200 }
201 };
202
203 if response.status_code < 400 {
204 let raw: RawUploadResponse = serde_json::from_slice(&response.body)?;
205 return Ok(UploadResponse {
206 url: raw.url,
207 direct_path: raw.direct_path,
208 media_key: enc.media_key,
209 file_enc_sha256: enc.file_enc_sha256,
210 file_sha256: enc.file_sha256,
211 file_length,
212 media_key_timestamp,
213 });
214 }
215
216 let status_code = response.status_code;
217 let err = upload_error_from_response(response);
218
219 if is_media_auth_error(status_code) {
220 if attempt == 0 {
221 invalidate_media_conn().await;
222 force_refresh = true;
223 retry_with_fresh_auth = true;
224 break;
225 }
226
227 return Err(err);
228 }
229
230 last_error = Some(err);
231 }
232
233 if !retry_with_fresh_auth {
234 break;
235 }
236 }
237
238 Err(last_error.unwrap_or_else(|| anyhow!("Failed to upload to all available media hosts")))
239}
240
/// Everything known about a finished media upload: where the host stored it
/// plus the key, hashes, length and timestamp that were supplied locally.
#[derive(Clone)]
pub struct UploadResponse {
    /// URL of the uploaded file as reported by the media host.
    pub url: String,
    /// Direct path of the uploaded file as reported by the media host.
    pub direct_path: String,
    /// Media key copied from the encrypted payload. Treat as a secret.
    pub media_key: [u8; 32],
    /// SHA-256 hash copied from the encrypted payload's `file_enc_sha256`.
    pub file_enc_sha256: [u8; 32],
    /// SHA-256 hash copied from the encrypted payload's `file_sha256`.
    pub file_sha256: [u8; 32],
    /// File length supplied by the caller of the upload.
    pub file_length: u64,
    /// Media key timestamp supplied by the caller of the upload.
    pub media_key_timestamp: i64,
}

// `Debug` is written by hand so the media key never ends up in logs or panic
// messages, matching the redaction `UploadOptions` applies to its own key.
impl std::fmt::Debug for UploadResponse {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadResponse")
            .field("url", &self.url)
            .field("direct_path", &self.direct_path)
            .field("media_key", &"<redacted>")
            .field("file_enc_sha256", &self.file_enc_sha256)
            .field("file_sha256", &self.file_sha256)
            .field("file_length", &self.file_length)
            .field("media_key_timestamp", &self.media_key_timestamp)
            .finish()
    }
}
252
253impl From<UploadResponse> for wacore::sticker_pack::MediaUploadInfo {
254 fn from(r: UploadResponse) -> Self {
255 Self::new(
256 r.direct_path,
257 r.media_key,
258 r.file_sha256,
259 r.file_enc_sha256,
260 r.file_length,
261 r.media_key_timestamp,
262 )
263 }
264}
265
266impl UploadResponse {
267 pub fn media_key_vec(&self) -> Vec<u8> {
269 self.media_key.to_vec()
270 }
271 pub fn file_sha256_vec(&self) -> Vec<u8> {
272 self.file_sha256.to_vec()
273 }
274 pub fn file_enc_sha256_vec(&self) -> Vec<u8> {
275 self.file_enc_sha256.to_vec()
276 }
277}
278
279#[derive(Deserialize)]
280struct RawUploadResponse {
281 url: String,
282 direct_path: String,
283}
284
/// Optional settings for [`Client::upload`].
///
/// The type is `#[non_exhaustive]`: other crates build it through
/// [`UploadOptions::new`] or `Default` and the `with_*` methods.
#[derive(Clone, Default)]
#[non_exhaustive]
pub struct UploadOptions {
    /// Media key handed to the encryption step; `None` passes no key.
    pub media_key: Option<[u8; 32]>,
}
291
292impl std::fmt::Debug for UploadOptions {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 f.debug_struct("UploadOptions")
295 .field("media_key", &self.media_key.as_ref().map(|_| "<redacted>"))
296 .finish()
297 }
298}
299
300impl UploadOptions {
301 pub fn new() -> Self {
302 Self::default()
303 }
304
305 pub fn with_media_key(mut self, key: [u8; 32]) -> Self {
306 self.media_key = Some(key);
307 self
308 }
309}
310
311impl Client {
312 pub async fn upload(
317 &self,
318 data: Vec<u8>,
319 media_type: MediaType,
320 options: UploadOptions,
321 ) -> Result<UploadResponse> {
322 let file_length = data.len() as u64;
323 let enc = wacore::runtime::blocking(&*self.runtime, move || {
324 wacore::upload::encrypt_media_with_key(&data, media_type, options.media_key.as_ref())
325 })
326 .await?;
327
328 upload_media_with_retry(
329 &enc,
330 media_type,
331 file_length,
332 wacore::time::now_secs(),
333 |force| async move { self.refresh_media_conn(force).await.map_err(Into::into) },
334 || async { self.invalidate_media_conn().await },
335 |request| async move { self.http_client.execute(request).await },
336 )
337 .await
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mediaconn::{MediaConn, MediaConnHost};
    use async_lock::Mutex;
    use std::sync::Arc;
    use wacore::time::Instant;

    /// Builds a media connection with the given auth string and one host
    /// entry per name.
    fn media_conn(auth: &str, hosts: &[&str]) -> MediaConn {
        let hosts = hosts
            .iter()
            .map(|name| MediaConnHost::new(String::from(*name)))
            .collect();

        MediaConn {
            auth: String::from(auth),
            ttl: 60,
            auth_ttl: None,
            hosts,
            fetched_at: Instant::now(),
        }
    }

    /// A 401 on the first attempt must invalidate the media connection, fetch
    /// a forced refresh and retry against the refreshed host list.
    #[tokio::test]
    async fn upload_retries_with_forced_media_conn_refresh_after_auth_error() {
        const SUCCESS_BODY: &[u8] =
            br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/123"}"#;

        let enc = wacore::upload::encrypt_media(b"retry me", MediaType::Image)
            .expect("encryption should succeed");
        let stale_conn = media_conn("stale-auth", &["cdn1.example.com"]);
        let fresh_conn = media_conn("fresh-auth", &["cdn2.example.com"]);
        let force_flags = Arc::new(Mutex::new(Vec::new()));
        let invalidation_count = Arc::new(Mutex::new(0usize));
        let requested_urls = Arc::new(Mutex::new(Vec::new()));

        let result = upload_media_with_retry(
            &enc,
            MediaType::Image,
            8,
            0,
            {
                let force_flags = Arc::clone(&force_flags);
                move |force| {
                    let force_flags = Arc::clone(&force_flags);
                    // A forced fetch hands out the refreshed connection.
                    let conn = if force {
                        fresh_conn.clone()
                    } else {
                        stale_conn.clone()
                    };
                    async move {
                        force_flags.lock().await.push(force);
                        Ok(conn)
                    }
                }
            },
            {
                let invalidation_count = Arc::clone(&invalidation_count);
                move || {
                    let counter = Arc::clone(&invalidation_count);
                    async move {
                        let mut count = counter.lock().await;
                        *count += 1;
                    }
                }
            },
            {
                let requested_urls = Arc::clone(&requested_urls);
                move |request| {
                    let sink = Arc::clone(&requested_urls);
                    async move {
                        sink.lock().await.push(request.url.clone());
                        let (status_code, body) = if request.url.contains("stale-auth") {
                            (401, &b"expired"[..])
                        } else {
                            (200, SUCCESS_BODY)
                        };
                        Ok(HttpResponse {
                            status_code,
                            body: body.to_vec(),
                        })
                    }
                }
            },
        )
        .await
        .expect("upload should succeed after refreshing media auth");

        let flags = force_flags.lock().await.clone();
        assert_eq!(flags, [false, true]);
        let invalidations = *invalidation_count.lock().await;
        assert_eq!(invalidations, 1);

        let urls = requested_urls.lock().await.clone();
        let [first, second] = urls.as_slice() else {
            panic!("expected exactly two requests, saw {}", urls.len());
        };
        assert!(first.contains("cdn1.example.com"));
        assert!(first.contains("auth=stale-auth"));
        assert!(second.contains("cdn2.example.com"));
        assert!(second.contains("auth=fresh-auth"));
        assert_eq!(result.direct_path, "/v/t62.7118-24/123");
        assert_eq!(result.url, "https://cdn2.example.com/file");
        assert_eq!(result.media_key_timestamp, 0);
    }

    /// A non-auth error status on one host must fall through to the next host
    /// of the same media connection.
    #[tokio::test]
    async fn upload_fails_over_to_next_host_after_non_auth_error() {
        const SUCCESS_BODY: &[u8] =
            br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/456"}"#;

        let enc = wacore::upload::encrypt_media(b"retry host", MediaType::Image)
            .expect("encryption should succeed");
        let shared_conn = media_conn("shared-auth", &["cdn1.example.com", "cdn2.example.com"]);
        let requested_urls = Arc::new(Mutex::new(Vec::new()));

        let result = upload_media_with_retry(
            &enc,
            MediaType::Image,
            10,
            0,
            move |_force| {
                let conn = shared_conn.clone();
                async move { Ok(conn) }
            },
            || async {},
            {
                let requested_urls = Arc::clone(&requested_urls);
                move |request| {
                    let sink = Arc::clone(&requested_urls);
                    async move {
                        sink.lock().await.push(request.url.clone());
                        let (status_code, body) = if request.url.contains("cdn1.example.com") {
                            (500, &b"try another host"[..])
                        } else {
                            (200, SUCCESS_BODY)
                        };
                        Ok(HttpResponse {
                            status_code,
                            body: body.to_vec(),
                        })
                    }
                }
            },
        )
        .await
        .expect("upload should succeed on the second host");

        let urls = requested_urls.lock().await.clone();
        let [first, second] = urls.as_slice() else {
            panic!("expected exactly two requests, saw {}", urls.len());
        };
        assert!(first.contains("cdn1.example.com"));
        assert!(second.contains("cdn2.example.com"));
        assert_eq!(result.direct_path, "/v/t62.7118-24/456");
        assert_eq!(result.media_key_timestamp, 0);
    }
}