Skip to main content

whatsapp_rust/
upload.rs

1use anyhow::{Result, anyhow};
2use base64::Engine;
3use serde::Deserialize;
4use wacore::download::MediaType;
5
6use crate::client::Client;
7use crate::http::{HttpRequest, HttpResponse};
8use crate::mediaconn::{MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS, is_media_auth_error};
9
/// Minimum encrypted payload size (5 MiB) at which an upload first asks the
/// server whether the blob already exists or was partially uploaded.
/// Mirrors WA Web's `_checkIfAlreadyUploaded` flow.
const RESUMABLE_UPLOAD_THRESHOLD: usize = 5 << 20;
13
/// Outcome of asking the server whether an upload already exists.
///
/// Derives `Debug`, `PartialEq` and `Eq` so outcomes can be logged and
/// compared directly in assertions.
#[derive(Debug, PartialEq, Eq)]
enum UploadExistsResult {
    /// The server already holds the complete file; no bytes need to be sent.
    Complete { url: String, direct_path: String },
    /// Part of the file is already on the server; continue from this byte offset.
    Resume { byte_offset: u64 },
    /// Nothing usable was reported; upload the whole file.
    NotFound,
}
23
24/// Server response for upload progress check (`?resume=1`).
25#[derive(Deserialize)]
26struct UploadProgressResponse {
27    #[serde(default)]
28    url: Option<String>,
29    #[serde(default)]
30    direct_path: Option<String>,
31    /// "complete" or a byte offset as string.
32    #[serde(default)]
33    resume: Option<String>,
34}
35
36/// Parse an upload progress response into an `UploadExistsResult`.
37fn parse_upload_progress(resp: &HttpResponse, total_size: u64) -> UploadExistsResult {
38    if resp.status_code >= 400 {
39        return UploadExistsResult::NotFound;
40    }
41    let Ok(progress) = serde_json::from_slice::<UploadProgressResponse>(&resp.body) else {
42        return UploadExistsResult::NotFound;
43    };
44    match progress.resume.as_deref() {
45        Some("complete") => {
46            if let (Some(url), Some(direct_path)) = (progress.url, progress.direct_path) {
47                UploadExistsResult::Complete { url, direct_path }
48            } else {
49                UploadExistsResult::NotFound
50            }
51        }
52        Some(offset_str) => match offset_str.parse::<u64>() {
53            Ok(offset) if offset > 0 && offset < total_size => UploadExistsResult::Resume {
54                byte_offset: offset,
55            },
56            _ => UploadExistsResult::NotFound,
57        },
58        _ => UploadExistsResult::NotFound,
59    }
60}
61
62fn build_upload_request(
63    hostname: &str,
64    mms_type: &str,
65    auth: &str,
66    token: &str,
67    body: &[u8],
68    file_offset: Option<u64>,
69) -> HttpRequest {
70    let mut url = format!("https://{hostname}/mms/{mms_type}/{token}?auth={auth}&token={token}");
71    if let Some(offset) = file_offset {
72        url.push_str(&format!("&file_offset={offset}"));
73    }
74
75    HttpRequest::post(url)
76        .with_header("Content-Type", "application/octet-stream")
77        .with_header("Origin", "https://web.whatsapp.com")
78        .with_body(body.to_vec())
79}
80
81fn build_resume_check_request(
82    hostname: &str,
83    mms_type: &str,
84    auth: &str,
85    token: &str,
86) -> HttpRequest {
87    let url =
88        format!("https://{hostname}/mms/{mms_type}/{token}?auth={auth}&token={token}&resume=1");
89    HttpRequest::post(url).with_header("Origin", "https://web.whatsapp.com")
90}
91
92fn upload_error_from_response(response: HttpResponse) -> anyhow::Error {
93    match response.body_string() {
94        Ok(body) => anyhow!("Upload failed {} body={}", response.status_code, body),
95        Err(body_err) => anyhow!(
96            "Upload failed {} and failed to read response body: {}",
97            response.status_code,
98            body_err
99        ),
100    }
101}
102
103async fn upload_media_with_retry<
104    GetMediaConn,
105    GetMediaConnFut,
106    InvalidateMediaConn,
107    InvalidateMediaConnFut,
108    ExecuteRequest,
109    ExecuteRequestFut,
110>(
111    enc: &wacore::upload::EncryptedMedia,
112    media_type: MediaType,
113    file_length: u64,
114    mut get_media_conn: GetMediaConn,
115    mut invalidate_media_conn: InvalidateMediaConn,
116    mut execute_request: ExecuteRequest,
117) -> Result<UploadResponse>
118where
119    GetMediaConn: FnMut(bool) -> GetMediaConnFut,
120    GetMediaConnFut: std::future::Future<Output = Result<crate::mediaconn::MediaConn>>,
121    InvalidateMediaConn: FnMut() -> InvalidateMediaConnFut,
122    InvalidateMediaConnFut: std::future::Future<Output = ()>,
123    ExecuteRequest: FnMut(HttpRequest) -> ExecuteRequestFut,
124    ExecuteRequestFut: std::future::Future<Output = Result<HttpResponse>>,
125{
126    let token = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(enc.file_enc_sha256);
127    let mms_type = media_type.mms_type();
128    let mut force_refresh = false;
129    let mut last_error: Option<anyhow::Error> = None;
130
131    for attempt in 0..=MEDIA_AUTH_REFRESH_RETRY_ATTEMPTS {
132        let media_conn = get_media_conn(force_refresh).await?;
133        if media_conn.hosts.is_empty() {
134            return Err(anyhow!("No media hosts"));
135        }
136
137        let mut retry_with_fresh_auth = false;
138
139        for host in &media_conn.hosts {
140            // For large files, check if the upload already exists or can be resumed.
141            // Matches WA Web's _checkIfAlreadyUploaded / _getExistingOrUpload flow.
142            let mut upload_data: &[u8] = &enc.data_to_upload;
143            let mut file_offset: Option<u64> = None;
144
145            if enc.data_to_upload.len() >= RESUMABLE_UPLOAD_THRESHOLD {
146                let check_req =
147                    build_resume_check_request(&host.hostname, mms_type, &media_conn.auth, &token);
148                if let Ok(check_resp) = execute_request(check_req).await {
149                    let total = enc.data_to_upload.len() as u64;
150                    match parse_upload_progress(&check_resp, total) {
151                        UploadExistsResult::Complete { url, direct_path } => {
152                            return Ok(UploadResponse {
153                                url,
154                                direct_path,
155                                media_key: enc.media_key.to_vec(),
156                                file_enc_sha256: enc.file_enc_sha256.to_vec(),
157                                file_sha256: enc.file_sha256.to_vec(),
158                                file_length,
159                            });
160                        }
161                        UploadExistsResult::Resume { byte_offset } => {
162                            log::info!("Resuming upload from byte {byte_offset}/{total}");
163                            upload_data = &enc.data_to_upload[byte_offset as usize..];
164                            file_offset = Some(byte_offset);
165                        }
166                        UploadExistsResult::NotFound => {}
167                    }
168                }
169                // Non-fatal: if check request itself fails, proceed with full upload
170            }
171
172            let request = build_upload_request(
173                &host.hostname,
174                mms_type,
175                &media_conn.auth,
176                &token,
177                upload_data,
178                file_offset,
179            );
180
181            let response = match execute_request(request).await {
182                Ok(response) => response,
183                Err(err) => {
184                    last_error = Some(err);
185                    continue;
186                }
187            };
188
189            if response.status_code < 400 {
190                let raw: RawUploadResponse = serde_json::from_slice(&response.body)?;
191                return Ok(UploadResponse {
192                    url: raw.url,
193                    direct_path: raw.direct_path,
194                    media_key: enc.media_key.to_vec(),
195                    file_enc_sha256: enc.file_enc_sha256.to_vec(),
196                    file_sha256: enc.file_sha256.to_vec(),
197                    file_length,
198                });
199            }
200
201            let status_code = response.status_code;
202            let err = upload_error_from_response(response);
203
204            if is_media_auth_error(status_code) {
205                if attempt == 0 {
206                    invalidate_media_conn().await;
207                    force_refresh = true;
208                    retry_with_fresh_auth = true;
209                    break;
210                }
211
212                return Err(err);
213            }
214
215            last_error = Some(err);
216        }
217
218        if !retry_with_fresh_auth {
219            break;
220        }
221    }
222
223    Err(last_error.unwrap_or_else(|| anyhow!("Failed to upload to all available media hosts")))
224}
225
/// Result of a successful media upload.
///
/// Derives `PartialEq`/`Eq` so responses can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UploadResponse {
    /// URL reported by the server for the uploaded file.
    pub url: String,
    /// Direct path reported by the server for the uploaded file.
    pub direct_path: String,
    /// Media key produced when the payload was encrypted.
    pub media_key: Vec<u8>,
    /// SHA-256 of the encrypted payload.
    pub file_enc_sha256: Vec<u8>,
    /// SHA-256 of the original payload.
    pub file_sha256: Vec<u8>,
    /// File length as supplied by the uploader.
    pub file_length: u64,
}
235
236#[derive(Deserialize)]
237struct RawUploadResponse {
238    url: String,
239    direct_path: String,
240}
241
242impl Client {
243    pub async fn upload(&self, data: Vec<u8>, media_type: MediaType) -> Result<UploadResponse> {
244        let enc = wacore::runtime::blocking(&*self.runtime, {
245            let data = data.clone();
246            move || wacore::upload::encrypt_media(&data, media_type)
247        })
248        .await?;
249
250        upload_media_with_retry(
251            &enc,
252            media_type,
253            data.len() as u64,
254            |force| async move { self.refresh_media_conn(force).await.map_err(Into::into) },
255            || async { self.invalidate_media_conn().await },
256            |request| async move { self.http_client.execute(request).await },
257        )
258        .await
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mediaconn::{MediaConn, MediaConnHost};
    use async_lock::Mutex;
    use std::sync::Arc;
    use wacore::time::Instant;

    /// Fixture: a media connection with the given auth string and host names.
    fn media_conn(auth: &str, hosts: &[&str]) -> MediaConn {
        MediaConn {
            auth: auth.to_owned(),
            ttl: 60,
            auth_ttl: None,
            hosts: hosts
                .iter()
                .copied()
                .map(|name| MediaConnHost::new(name.to_owned()))
                .collect(),
            fetched_at: Instant::now(),
        }
    }

    /// A 401 on the first connection must trigger one invalidation and one
    /// forced refresh, after which the upload succeeds on the new host.
    #[tokio::test]
    async fn upload_retries_with_forced_media_conn_refresh_after_auth_error() {
        let enc = wacore::upload::encrypt_media(b"retry me", MediaType::Image)
            .expect("encryption should succeed");
        let stale_conn = media_conn("stale-auth", &["cdn1.example.com"]);
        let fresh_conn = media_conn("fresh-auth", &["cdn2.example.com"]);
        let refresh_log = Arc::new(Mutex::new(Vec::new()));
        let invalidation_count = Arc::new(Mutex::new(0usize));
        let requested_urls = Arc::new(Mutex::new(Vec::new()));

        // Hands out the stale connection unless a forced refresh is requested.
        let get_conn = {
            let refresh_log = Arc::clone(&refresh_log);
            move |force: bool| {
                let refresh_log = Arc::clone(&refresh_log);
                let conn = if force {
                    fresh_conn.clone()
                } else {
                    stale_conn.clone()
                };
                async move {
                    refresh_log.lock().await.push(force);
                    Ok::<_, anyhow::Error>(conn)
                }
            }
        };

        let invalidate = {
            let invalidation_count = Arc::clone(&invalidation_count);
            move || {
                let invalidation_count = Arc::clone(&invalidation_count);
                async move {
                    *invalidation_count.lock().await += 1;
                }
            }
        };

        // Rejects any request carrying the stale auth, accepts everything else.
        let execute = {
            let requested_urls = Arc::clone(&requested_urls);
            move |request: HttpRequest| {
                let requested_urls = Arc::clone(&requested_urls);
                async move {
                    let auth_is_stale = request.url.contains("stale-auth");
                    requested_urls.lock().await.push(request.url.clone());
                    let response = if auth_is_stale {
                        HttpResponse {
                            status_code: 401,
                            body: b"expired".to_vec(),
                        }
                    } else {
                        HttpResponse {
                            status_code: 200,
                            body: br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/123"}"#.to_vec(),
                        }
                    };
                    Ok::<_, anyhow::Error>(response)
                }
            }
        };

        let result = upload_media_with_retry(
            &enc,
            MediaType::Image,
            8,
            get_conn,
            invalidate,
            execute,
        )
        .await
        .expect("upload should succeed after refreshing media auth");

        assert_eq!(result.url, "https://cdn2.example.com/file");
        assert_eq!(result.direct_path, "/v/t62.7118-24/123");
        assert_eq!(*refresh_log.lock().await, vec![false, true]);
        assert_eq!(*invalidation_count.lock().await, 1);

        let urls = requested_urls.lock().await.clone();
        assert_eq!(urls.len(), 2);
        let expected = [
            ("cdn1.example.com", "auth=stale-auth"),
            ("cdn2.example.com", "auth=fresh-auth"),
        ];
        for (url, (host, auth)) in urls.iter().zip(expected) {
            assert!(url.contains(host));
            assert!(url.contains(auth));
        }
    }

    /// A non-auth failure (500) on the first host must fall through to the
    /// second host of the same media connection.
    #[tokio::test]
    async fn upload_fails_over_to_next_host_after_non_auth_error() {
        let enc = wacore::upload::encrypt_media(b"retry host", MediaType::Image)
            .expect("encryption should succeed");
        let shared_conn = media_conn("shared-auth", &["cdn1.example.com", "cdn2.example.com"]);
        let requested_urls = Arc::new(Mutex::new(Vec::new()));

        let get_conn = move |_force: bool| {
            let conn = shared_conn.clone();
            async move { Ok::<_, anyhow::Error>(conn) }
        };

        // First host always fails with a server error; any other host succeeds.
        let execute = {
            let requested_urls = Arc::clone(&requested_urls);
            move |request: HttpRequest| {
                let requested_urls = Arc::clone(&requested_urls);
                async move {
                    let is_first_host = request.url.contains("cdn1.example.com");
                    requested_urls.lock().await.push(request.url.clone());
                    let response = if is_first_host {
                        HttpResponse {
                            status_code: 500,
                            body: b"try another host".to_vec(),
                        }
                    } else {
                        HttpResponse {
                            status_code: 200,
                            body: br#"{"url":"https://cdn2.example.com/file","direct_path":"/v/t62.7118-24/456"}"#.to_vec(),
                        }
                    };
                    Ok::<_, anyhow::Error>(response)
                }
            }
        };

        let result = upload_media_with_retry(
            &enc,
            MediaType::Image,
            10,
            get_conn,
            || async {},
            execute,
        )
        .await
        .expect("upload should succeed on the second host");

        assert_eq!(result.direct_path, "/v/t62.7118-24/456");

        let urls = requested_urls.lock().await.clone();
        assert_eq!(urls.len(), 2);
        for (url, host) in urls.iter().zip(["cdn1.example.com", "cdn2.example.com"]) {
            assert!(url.contains(host));
        }
    }
}
401}