Skip to main content

whatsapp_rust/
upload.rs

1use anyhow::{Result, anyhow};
2use base64::Engine;
3use serde::Deserialize;
4use wacore::download::MediaType;
5
6use crate::client::Client;
7use crate::http::{HttpRequest, HttpResponse};
8use crate::mediaconn::{MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS, is_media_auth_error};
9
/// Size (5 MiB) at or above which an upload first probes the server for an
/// existing or partially uploaded copy before sending any bytes.
/// Mirrors WA Web's `_checkIfAlreadyUploaded` flow.
const RESUMABLE_UPLOAD_THRESHOLD: usize = 5 << 20;
13
/// Result of checking if an upload already exists on the server.
///
/// Produced by `parse_upload_progress` from the response to the `?resume=1`
/// probe that precedes uploads of at least `RESUMABLE_UPLOAD_THRESHOLD` bytes.
enum UploadExistsResult {
    /// Upload is complete — server already has the file. Carries the URL and
    /// direct path reported by the server, so no bytes need to be sent.
    Complete { url: String, direct_path: String },
    /// Upload is partially done — resume from this byte offset.
    /// `parse_upload_progress` only yields offsets in `1..total_size`.
    Resume { byte_offset: u64 },
    /// No reusable previous upload found — start from scratch. Also used when
    /// the probe returns an error status or an unparseable/incomplete body.
    NotFound,
}
23
/// Server response for upload progress check (`?resume=1`).
///
/// Every field is optional so that any subset deserializes; interpreting the
/// combination is left to `parse_upload_progress`.
#[derive(Deserialize)]
struct UploadProgressResponse {
    /// URL of the stored file; only used when `resume` is `"complete"`.
    #[serde(default)]
    url: Option<String>,
    /// Direct path of the stored file; only used when `resume` is `"complete"`.
    #[serde(default)]
    direct_path: Option<String>,
    /// "complete" or a byte offset as string.
    #[serde(default)]
    resume: Option<String>,
}
35
36/// Parse an upload progress response into an `UploadExistsResult`.
37fn parse_upload_progress(resp: &HttpResponse, total_size: u64) -> UploadExistsResult {
38    if resp.status_code >= 400 {
39        return UploadExistsResult::NotFound;
40    }
41    let Ok(progress) = serde_json::from_slice::<UploadProgressResponse>(&resp.body) else {
42        return UploadExistsResult::NotFound;
43    };
44    match progress.resume.as_deref() {
45        Some("complete") => {
46            if let (Some(url), Some(direct_path)) = (progress.url, progress.direct_path) {
47                UploadExistsResult::Complete { url, direct_path }
48            } else {
49                UploadExistsResult::NotFound
50            }
51        }
52        Some(offset_str) => match offset_str.parse::<u64>() {
53            Ok(offset) if offset > 0 && offset < total_size => UploadExistsResult::Resume {
54                byte_offset: offset,
55            },
56            _ => UploadExistsResult::NotFound,
57        },
58        _ => UploadExistsResult::NotFound,
59    }
60}
61
62fn build_upload_request(
63    hostname: &str,
64    upload_path: &str,
65    auth: &str,
66    token: &str,
67    body: &[u8],
68    file_offset: Option<u64>,
69) -> HttpRequest {
70    let mut url = format!("https://{hostname}{upload_path}/{token}?auth={auth}&token={token}");
71    if let Some(offset) = file_offset {
72        url.push_str("&file_offset=");
73        url.push_str(itoa::Buffer::new().format(offset));
74    }
75
76    HttpRequest::post(url)
77        .with_header("Content-Type", "application/octet-stream")
78        .with_header("Origin", "https://web.whatsapp.com")
79        .with_body(body.to_vec())
80}
81
82fn build_resume_check_request(
83    hostname: &str,
84    upload_path: &str,
85    auth: &str,
86    token: &str,
87) -> HttpRequest {
88    let url = format!("https://{hostname}{upload_path}/{token}?auth={auth}&token={token}&resume=1");
89    HttpRequest::post(url).with_header("Origin", "https://web.whatsapp.com")
90}
91
92fn upload_error_from_response(response: HttpResponse) -> anyhow::Error {
93    match response.body_string() {
94        Ok(body) => anyhow!("Upload failed {} body={}", response.status_code, body),
95        Err(body_err) => anyhow!(
96            "Upload failed {} and failed to read response body: {}",
97            response.status_code,
98            body_err
99        ),
100    }
101}
102
103async fn upload_media_with_retry<
104    GetMediaConn,
105    GetMediaConnFut,
106    InvalidateMediaConn,
107    InvalidateMediaConnFut,
108    ExecuteRequest,
109    ExecuteRequestFut,
110>(
111    enc: &wacore::upload::EncryptedMedia,
112    media_type: MediaType,
113    file_length: u64,
114    media_key_timestamp: i64,
115    mut get_media_conn: GetMediaConn,
116    mut invalidate_media_conn: InvalidateMediaConn,
117    mut execute_request: ExecuteRequest,
118) -> Result<UploadResponse>
119where
120    GetMediaConn: FnMut(bool) -> GetMediaConnFut,
121    GetMediaConnFut: std::future::Future<Output = Result<crate::mediaconn::MediaConn>>,
122    InvalidateMediaConn: FnMut() -> InvalidateMediaConnFut,
123    InvalidateMediaConnFut: std::future::Future<Output = ()>,
124    ExecuteRequest: FnMut(HttpRequest) -> ExecuteRequestFut,
125    ExecuteRequestFut: std::future::Future<Output = Result<HttpResponse>>,
126{
127    let token = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(enc.file_enc_sha256);
128    let upload_path = media_type.upload_path();
129    let mut force_refresh = false;
130    let mut last_error: Option<anyhow::Error> = None;
131
132    for attempt in 0..=MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS {
133        let media_conn = get_media_conn(force_refresh).await?;
134        if media_conn.hosts.is_empty() {
135            return Err(anyhow!("No media hosts"));
136        }
137
138        let mut retry_with_fresh_auth = false;
139
140        for host in &media_conn.hosts {
141            // For large files, check if the upload already exists or can be resumed.
142            // Matches WA Web's _checkIfAlreadyUploaded / _getExistingOrUpload flow.
143            let mut upload_data: &[u8] = &enc.data_to_upload;
144            let mut file_offset: Option<u64> = None;
145
146            if enc.data_to_upload.len() >= RESUMABLE_UPLOAD_THRESHOLD {
147                let check_req = build_resume_check_request(
148                    &host.hostname,
149                    upload_path,
150                    &media_conn.auth,
151                    &token,
152                );
153                if let Ok(check_resp) = execute_request(check_req).await {
154                    let total = enc.data_to_upload.len() as u64;
155                    match parse_upload_progress(&check_resp, total) {
156                        UploadExistsResult::Complete { url, direct_path } => {
157                            return Ok(UploadResponse {
158                                url,
159                                direct_path,
160                                media_key: enc.media_key,
161                                file_enc_sha256: enc.file_enc_sha256,
162                                file_sha256: enc.file_sha256,
163                                file_length,
164                                media_key_timestamp,
165                            });
166                        }
167                        UploadExistsResult::Resume { byte_offset } => {
168                            let offset = byte_offset as usize;
169                            if offset >= enc.data_to_upload.len() {
170                                log::warn!(
171                                    "Server resume offset {offset} exceeds data length {}; uploading from start",
172                                    enc.data_to_upload.len()
173                                );
174                            } else {
175                                log::info!("Resuming upload from byte {byte_offset}/{total}");
176                                upload_data = &enc.data_to_upload[offset..];
177                                file_offset = Some(byte_offset);
178                            }
179                        }
180                        UploadExistsResult::NotFound => {}
181                    }
182                }
183                // Non-fatal: if check request itself fails, proceed with full upload
184            }
185
186            let request = build_upload_request(
187                &host.hostname,
188                upload_path,
189                &media_conn.auth,
190                &token,
191                upload_data,
192                file_offset,
193            );
194
195            let response = match execute_request(request).await {
196                Ok(response) => response,
197                Err(err) => {
198                    last_error = Some(err);
199                    continue;
200                }
201            };
202
203            if response.status_code < 400 {
204                let raw: RawUploadResponse = serde_json::from_slice(&response.body)?;
205                return Ok(UploadResponse {
206                    url: raw.url,
207                    direct_path: raw.direct_path,
208                    media_key: enc.media_key,
209                    file_enc_sha256: enc.file_enc_sha256,
210                    file_sha256: enc.file_sha256,
211                    file_length,
212                    media_key_timestamp,
213                });
214            }
215
216            let status_code = response.status_code;
217            let err = upload_error_from_response(response);
218
219            if is_media_auth_error(status_code) {
220                if attempt == 0 {
221                    invalidate_media_conn().await;
222                    force_refresh = true;
223                    retry_with_fresh_auth = true;
224                    break;
225                }
226
227                return Err(err);
228            }
229
230            last_error = Some(err);
231        }
232
233        if !retry_with_fresh_auth {
234            break;
235        }
236    }
237
238    Err(last_error.unwrap_or_else(|| anyhow!("Failed to upload to all available media hosts")))
239}
240
/// Metadata describing a successfully uploaded (encrypted) media file.
///
/// Combines what the media server returned (`url`, `direct_path`) with the key
/// material and hashes from the encryption step.
///
/// `Debug` is implemented by hand so that the secret `media_key` is redacted,
/// consistent with how `UploadOptions` redacts its key.
#[derive(Clone)]
pub struct UploadResponse {
    /// URL of the uploaded file as returned by the media server.
    pub url: String,
    /// Direct path of the uploaded file as returned by the media server.
    pub direct_path: String,
    /// Key the media was encrypted with. Treat as secret.
    pub media_key: [u8; 32],
    /// Hash reported by the encryption step as `file_enc_sha256` (presumably
    /// SHA-256 of the encrypted payload); its base64 form is the upload token.
    pub file_enc_sha256: [u8; 32],
    /// Hash reported by the encryption step as `file_sha256` (presumably
    /// SHA-256 of the plaintext — confirm against `wacore::upload`).
    pub file_sha256: [u8; 32],
    /// Length in bytes of the data passed to `Client::upload`, before encryption.
    pub file_length: u64,
    /// Unix timestamp (seconds) when the media key was generated.
    pub media_key_timestamp: i64,
}

impl std::fmt::Debug for UploadResponse {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UploadResponse")
            .field("url", &self.url)
            .field("direct_path", &self.direct_path)
            // Never print the key material; Debug output routinely ends up in logs.
            .field("media_key", &"<redacted>")
            .field("file_enc_sha256", &self.file_enc_sha256)
            .field("file_sha256", &self.file_sha256)
            .field("file_length", &self.file_length)
            .field("media_key_timestamp", &self.media_key_timestamp)
            .finish()
    }
}
252
253impl From<UploadResponse> for wacore::sticker_pack::MediaUploadInfo {
254    fn from(r: UploadResponse) -> Self {
255        Self::new(
256            r.direct_path,
257            r.media_key,
258            r.file_sha256,
259            r.file_enc_sha256,
260            r.file_length,
261            r.media_key_timestamp,
262        )
263    }
264}
265
266impl UploadResponse {
267    /// Convert crypto fields to `Vec<u8>` for protobuf message construction.
268    pub fn media_key_vec(&self) -> Vec<u8> {
269        self.media_key.to_vec()
270    }
271    pub fn file_sha256_vec(&self) -> Vec<u8> {
272        self.file_sha256.to_vec()
273    }
274    pub fn file_enc_sha256_vec(&self) -> Vec<u8> {
275        self.file_enc_sha256.to_vec()
276    }
277}
278
/// JSON body returned by the media server for a successful (status < 400) upload.
/// Both fields are required; a body missing either fails deserialization.
#[derive(Deserialize)]
struct RawUploadResponse {
    /// URL of the uploaded file.
    url: String,
    /// Direct path of the uploaded file.
    direct_path: String,
}
284
/// Options for [`Client::upload`].
///
/// Build with [`UploadOptions::new`] (or `Default`) and the `with_*` methods.
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot
/// construct it with a struct literal.
#[non_exhaustive]
#[derive(Default, Clone)]
pub struct UploadOptions {
    /// Reuse an existing media key instead of generating a fresh one.
    pub media_key: Option<[u8; 32]>,
}
291
292impl std::fmt::Debug for UploadOptions {
293    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294        f.debug_struct("UploadOptions")
295            .field("media_key", &self.media_key.as_ref().map(|_| "<redacted>"))
296            .finish()
297    }
298}
299
300impl UploadOptions {
301    pub fn new() -> Self {
302        Self::default()
303    }
304
305    pub fn with_media_key(mut self, key: [u8; 32]) -> Self {
306        self.media_key = Some(key);
307        self
308    }
309}
310
311impl Client {
312    /// Encrypts and uploads media to WhatsApp's CDN.
313    ///
314    /// Only needed for new or modified media. To forward existing media unchanged,
315    /// reuse the original message's CDN fields directly, no round-trip required.
316    pub async fn upload(
317        &self,
318        data: Vec<u8>,
319        media_type: MediaType,
320        options: UploadOptions,
321    ) -> Result<UploadResponse> {
322        let file_length = data.len() as u64;
323        let enc = wacore::runtime::blocking(&*self.runtime, move || {
324            wacore::upload::encrypt_media_with_key(&data, media_type, options.media_key.as_ref())
325        })
326        .await?;
327
328        upload_media_with_retry(
329            &enc,
330            media_type,
331            file_length,
332            wacore::time::now_secs(),
333            |force| async move { self.refresh_media_conn(force).await.map_err(Into::into) },
334            || async { self.invalidate_media_conn().await },
335            |request| async move { self.http_client.execute(request).await },
336        )
337        .await
338    }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mediaconn::{MediaConn, MediaConnHost};
    use async_lock::Mutex;
    use std::sync::Arc;
    use wacore::time::Instant;

    /// Builds a fresh `MediaConn` with the given auth token and host names.
    fn media_conn(auth: &str, hosts: &[&str]) -> MediaConn {
        MediaConn {
            auth: auth.to_string(),
            ttl: 60,
            auth_ttl: None,
            hosts: hosts
                .iter()
                .map(|hostname| MediaConnHost::new((*hostname).to_string()))
                .collect(),
            fetched_at: Instant::now(),
        }
    }

    #[tokio::test]
    async fn upload_retries_with_forced_media_conn_refresh_after_auth_error() {
        let enc = wacore::upload::encrypt_media(b"retry me", MediaType::Image)
            .expect("encryption should succeed");
        let first_conn = media_conn("stale-auth", &["cdn1.example.com"]);
        let refreshed_conn = media_conn("fresh-auth", &["cdn2.example.com"]);
        let refresh_calls = Arc::new(Mutex::new(Vec::new()));
        let invalidations = Arc::new(Mutex::new(0usize));
        let seen_urls = Arc::new(Mutex::new(Vec::new()));

        let result = upload_media_with_retry(
            &enc,
            MediaType::Image,
            8,
            0,
            {
                let refresh_calls = Arc::clone(&refresh_calls);
                move |force| {
                    let refresh_calls = Arc::clone(&refresh_calls);
                    let first_conn = first_conn.clone();
                    let refreshed_conn = refreshed_conn.clone();
                    async move {
                        refresh_calls.lock().await.push(force);
                        Ok(if force { refreshed_conn } else { first_conn })
                    }
                }
            },
            {
                let invalidations = Arc::clone(&invalidations);
                move || {
                    let invalidations = Arc::clone(&invalidations);
                    async move {
                        *invalidations.lock().await += 1;
                    }
                }
            },
            {
                let seen_urls = Arc::clone(&seen_urls);
                move |request| {
                    let seen_urls = Arc::clone(&seen_urls);
                    async move {
                        seen_urls.lock().await.push(request.url.clone());
                        if request.url.contains("stale-auth") {
                            Ok(HttpResponse {
                                status_code: 401,
                                body: b"expired".to_vec(),
                            })
                        } else {
                            Ok(HttpResponse {
                                status_code: 200,
                                body: br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/123"}"#.to_vec(),
                            })
                        }
                    }
                }
            },
        )
        .await
        .expect("upload should succeed after refreshing media auth");

        assert_eq!(*refresh_calls.lock().await, vec![false, true]);
        assert_eq!(*invalidations.lock().await, 1);

        let seen_urls = seen_urls.lock().await.clone();
        assert_eq!(seen_urls.len(), 2);
        assert!(seen_urls[0].contains("cdn1.example.com"));
        assert!(seen_urls[0].contains("auth=stale-auth"));
        assert!(seen_urls[1].contains("cdn2.example.com"));
        assert!(seen_urls[1].contains("auth=fresh-auth"));
        assert_eq!(result.direct_path, "/v/t62.7118-24/123");
        assert_eq!(result.url, "https://cdn2.example.com/file");
        assert_eq!(result.media_key_timestamp, 0);
    }

    #[tokio::test]
    async fn upload_fails_over_to_next_host_after_non_auth_error() {
        let enc = wacore::upload::encrypt_media(b"retry host", MediaType::Image)
            .expect("encryption should succeed");
        let conn = media_conn("shared-auth", &["cdn1.example.com", "cdn2.example.com"]);
        let seen_urls = Arc::new(Mutex::new(Vec::new()));

        let result = upload_media_with_retry(
            &enc,
            MediaType::Image,
            10,
            0,
            move |_force| {
                let conn = conn.clone();
                async move { Ok(conn) }
            },
            || async {},
            {
                let seen_urls = Arc::clone(&seen_urls);
                move |request| {
                    let seen_urls = Arc::clone(&seen_urls);
                    async move {
                        seen_urls.lock().await.push(request.url.clone());
                        if request.url.contains("cdn1.example.com") {
                            Ok(HttpResponse {
                                status_code: 500,
                                body: b"try another host".to_vec(),
                            })
                        } else {
                            Ok(HttpResponse {
                                status_code: 200,
                                body: br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/456"}"#.to_vec(),
                            })
                        }
                    }
                }
            },
        )
        .await
        .expect("upload should succeed on the second host");

        let seen_urls = seen_urls.lock().await.clone();
        assert_eq!(seen_urls.len(), 2);
        assert!(seen_urls[0].contains("cdn1.example.com"));
        assert!(seen_urls[1].contains("cdn2.example.com"));
        assert_eq!(result.direct_path, "/v/t62.7118-24/456");
        assert_eq!(result.media_key_timestamp, 0);
    }

    /// Pins how `?resume=1` probe answers are classified: only a complete answer
    /// with a location, or an offset strictly inside the payload, is reusable.
    #[test]
    fn parse_upload_progress_classifies_server_responses() {
        const TOTAL: u64 = 4096;
        let response = |status_code, body: &str| HttpResponse {
            status_code,
            body: body.as_bytes().to_vec(),
        };

        match parse_upload_progress(
            &response(
                200,
                r#"{"resume":"complete","url":"https://cdn.example.com/f","direct_path":"/v/t62/1"}"#,
            ),
            TOTAL,
        ) {
            UploadExistsResult::Complete { url, direct_path } => {
                assert_eq!(url, "https://cdn.example.com/f");
                assert_eq!(direct_path, "/v/t62/1");
            }
            _ => panic!("expected the upload to be reported as complete"),
        }

        assert!(matches!(
            parse_upload_progress(&response(200, r#"{"resume":"1024"}"#), TOTAL),
            UploadExistsResult::Resume { byte_offset: 1024 }
        ));

        for body in [
            // "complete" without a direct path cannot be reused.
            r#"{"resume":"complete","url":"https://cdn.example.com/f"}"#,
            // Nothing stored yet.
            r#"{"resume":"0"}"#,
            // Offset equal to the total size leaves nothing to resume.
            r#"{"resume":"4096"}"#,
            r#"{"resume":"not-a-number"}"#,
            r#"{}"#,
            "not json",
        ] {
            assert!(
                matches!(
                    parse_upload_progress(&response(200, body), TOTAL),
                    UploadExistsResult::NotFound
                ),
                "body {body:?} should not be treated as reusable"
            );
        }

        // Error statuses are never trusted, whatever the body says.
        assert!(matches!(
            parse_upload_progress(
                &response(404, r#"{"resume":"complete","url":"u","direct_path":"d"}"#),
                TOTAL
            ),
            UploadExistsResult::NotFound
        ));
    }
}