Skip to main content

threads_rs/
http.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use chrono::{DateTime, Utc};
6use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, USER_AGENT};
7use serde::de::DeserializeOwned;
8
9use crate::constants::{BASE_API_URL, VERSION};
10use crate::error::{self, Error};
11use crate::rate_limit::{RateLimitInfo, RateLimiter};
12
13/// Body for HTTP requests.
14#[derive(Debug)]
15pub enum RequestBody {
16    /// JSON request body.
17    Json(serde_json::Value),
18    /// URL-encoded form body.
19    Form(HashMap<String, String>),
20}
21
22/// Options for an HTTP request.
23#[derive(Debug)]
24pub struct RequestOptions {
25    /// HTTP method.
26    pub method: reqwest::Method,
27    /// API endpoint path.
28    pub path: String,
29    /// URL query parameters.
30    pub query_params: HashMap<String, String>,
31    /// Optional request body.
32    pub body: Option<RequestBody>,
33    /// Additional HTTP headers.
34    pub headers: HashMap<String, String>,
35}
36
37/// Response wrapper with metadata.
38#[derive(Debug)]
39pub struct Response {
40    /// HTTP status code.
41    pub status_code: u16,
42    /// Raw response body bytes.
43    pub body: Vec<u8>,
44    /// Facebook request ID from `x-fb-request-id` header.
45    pub request_id: String,
46    /// Rate limit info parsed from response headers.
47    pub rate_limit: Option<RateLimitInfo>,
48    /// Round-trip request duration.
49    pub duration: Duration,
50}
51
52impl Response {
53    /// Deserialize the response body as JSON.
54    pub fn json<T: DeserializeOwned>(&self) -> crate::Result<T> {
55        if self.body.is_empty() {
56            return Err(error::new_api_error(
57                self.status_code,
58                "Empty response",
59                "Received empty response body",
60                &self.request_id,
61            ));
62        }
63
64        let text = String::from_utf8_lossy(&self.body);
65        let trimmed = text.trim();
66
67        if trimmed.is_empty() {
68            return Err(error::new_api_error(
69                self.status_code,
70                "Empty response",
71                "Received whitespace-only response",
72                &self.request_id,
73            ));
74        }
75
76        if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
77            return Err(error::new_api_error(
78                self.status_code,
79                "Invalid JSON response",
80                &format!(
81                    "Received non-JSON response: {}",
82                    &trimmed[..trimmed
83                        .char_indices()
84                        .map(|(i, c)| i + c.len_utf8())
85                        .take_while(|&end| end <= 200)
86                        .last()
87                        .unwrap_or(trimmed.len().min(200))]
88                ),
89                &self.request_id,
90            ));
91        }
92
93        serde_json::from_slice(&self.body).map_err(Error::from)
94    }
95}
96
/// Backoff policy applied when a request is retried.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Number of retries allowed after the initial attempt.
    pub max_retries: u32,
    /// Pause before the first retry.
    pub initial_delay: Duration,
    /// Upper bound for the pause between retries.
    pub max_delay: Duration,
    /// Factor the pause is multiplied by after each retry.
    pub backoff_factor: f64,
}

impl Default for RetryConfig {
    /// Three retries, starting with a one-second pause that doubles each
    /// time and is capped at thirty seconds.
    fn default() -> Self {
        let initial_delay = Duration::from_secs(1);
        let max_delay = Duration::from_secs(30);

        RetryConfig {
            max_retries: 3,
            initial_delay,
            max_delay,
            backoff_factor: 2.0,
        }
    }
}
120
121/// HTTP client with retry logic, rate limiting, and error handling.
122pub struct HttpClient {
123    client: reqwest::Client,
124    retry_config: RetryConfig,
125    rate_limiter: Option<Arc<RateLimiter>>,
126    base_url: String,
127    user_agent: String,
128}
129
130impl HttpClient {
131    /// Create a new HTTP client.
132    pub fn new(
133        timeout: Duration,
134        retry_config: RetryConfig,
135        rate_limiter: Option<Arc<RateLimiter>>,
136        base_url: Option<&str>,
137        user_agent: Option<&str>,
138    ) -> crate::Result<Self> {
139        let ua = user_agent
140            .map(|s| s.to_owned())
141            .unwrap_or_else(|| format!("threads-rs/{}", VERSION));
142
143        let client = reqwest::Client::builder().timeout(timeout).build()?;
144
145        Ok(Self {
146            client,
147            retry_config,
148            rate_limiter,
149            base_url: base_url.unwrap_or(BASE_API_URL).to_owned(),
150            user_agent: ua,
151        })
152    }
153
154    /// Execute an HTTP request with retry logic and rate-limit awareness.
155    pub async fn do_request(
156        &self,
157        opts: &RequestOptions,
158        access_token: &str,
159    ) -> crate::Result<Response> {
160        let mut last_err: Option<Error> = None;
161        let mut delay = self.retry_config.initial_delay;
162
163        for attempt in 0..=self.retry_config.max_retries {
164            // Check rate limiter before each attempt
165            if let Some(ref rl) = self.rate_limiter {
166                if rl.should_wait().await {
167                    rl.wait().await?;
168                }
169            }
170
171            if attempt > 0 {
172                tokio::time::sleep(delay).await;
173                delay =
174                    Duration::from_secs_f64(delay.as_secs_f64() * self.retry_config.backoff_factor);
175                if delay > self.retry_config.max_delay {
176                    delay = self.retry_config.max_delay;
177                }
178            }
179
180            match self.execute_request(opts, access_token).await {
181                Ok(resp) => {
182                    if let (Some(rl), Some(info)) = (&self.rate_limiter, &resp.rate_limit) {
183                        rl.update_from_headers(info).await;
184                    }
185                    return Ok(resp);
186                }
187                Err(err) => {
188                    if !self.is_retryable_error(&err) {
189                        return Err(err);
190                    }
191                    tracing::warn!(
192                        attempt = attempt + 1,
193                        max = self.retry_config.max_retries + 1,
194                        error = %err,
195                        "Retrying HTTP request"
196                    );
197                    last_err = Some(err);
198                }
199            }
200        }
201
202        Err(last_err.unwrap_or_else(|| {
203            error::new_network_error(0, "Request failed after retries", "", false)
204        }))
205    }
206
207    /// Execute a single HTTP request.
208    async fn execute_request(
209        &self,
210        opts: &RequestOptions,
211        access_token: &str,
212    ) -> crate::Result<Response> {
213        let start = Instant::now();
214
215        let url = format!("{}{}", self.base_url, opts.path);
216
217        let mut req = self.client.request(opts.method.clone(), &url);
218
219        // Query parameters
220        if !opts.query_params.is_empty() {
221            req = req.query(
222                &opts
223                    .query_params
224                    .iter()
225                    .collect::<Vec<(&String, &String)>>(),
226            );
227        }
228
229        // Standard headers
230        req = req.header(USER_AGENT, &self.user_agent);
231        if !access_token.is_empty() {
232            req = req.header(AUTHORIZATION, format!("Bearer {}", access_token));
233        }
234
235        // Custom headers
236        for (key, value) in &opts.headers {
237            req = req.header(key.as_str(), value.as_str());
238        }
239
240        // Body
241        if let Some(ref body) = opts.body {
242            match body {
243                RequestBody::Json(val) => {
244                    req = req.header(CONTENT_TYPE, "application/json");
245                    req = req.json(val);
246                }
247                RequestBody::Form(params) => {
248                    req = req.form(params);
249                }
250            }
251        }
252
253        tracing::debug!(method = %opts.method, path = %opts.path, "HTTP request");
254
255        // Execute
256        let http_resp = req.send().await.map_err(|e| self.wrap_network_error(e))?;
257        let status = http_resp.status().as_u16();
258        let request_id = http_resp
259            .headers()
260            .get("x-fb-request-id")
261            .and_then(|v| v.to_str().ok())
262            .unwrap_or("")
263            .to_owned();
264        let rate_limit = Self::parse_rate_limit_headers(http_resp.headers());
265
266        let body = http_resp
267            .bytes()
268            .await
269            .map_err(|e| {
270                error::new_network_error(0, "Failed to read response body", &e.to_string(), false)
271            })?
272            .to_vec();
273
274        let resp = Response {
275            status_code: status,
276            body,
277            request_id,
278            rate_limit,
279            duration: start.elapsed(),
280        };
281
282        tracing::debug!(
283            status = resp.status_code,
284            duration_ms = resp.duration.as_millis() as u64,
285            request_id = %resp.request_id,
286            "HTTP response"
287        );
288
289        if status >= 400 {
290            return Err(self.create_error_from_response(&resp).await);
291        }
292
293        Ok(resp)
294    }
295
296    /// Convenience: GET request.
297    pub async fn get(
298        &self,
299        path: &str,
300        query_params: HashMap<String, String>,
301        access_token: &str,
302    ) -> crate::Result<Response> {
303        self.do_request(
304            &RequestOptions {
305                method: reqwest::Method::GET,
306                path: path.to_owned(),
307                query_params,
308                body: None,
309                headers: HashMap::new(),
310            },
311            access_token,
312        )
313        .await
314    }
315
316    /// Convenience: POST request with optional body.
317    pub async fn post(
318        &self,
319        path: &str,
320        body: Option<RequestBody>,
321        access_token: &str,
322    ) -> crate::Result<Response> {
323        self.do_request(
324            &RequestOptions {
325                method: reqwest::Method::POST,
326                path: path.to_owned(),
327                query_params: HashMap::new(),
328                body,
329                headers: HashMap::new(),
330            },
331            access_token,
332        )
333        .await
334    }
335
336    /// Convenience: DELETE request.
337    pub async fn delete(&self, path: &str, access_token: &str) -> crate::Result<Response> {
338        self.do_request(
339            &RequestOptions {
340                method: reqwest::Method::DELETE,
341                path: path.to_owned(),
342                query_params: HashMap::new(),
343                body: None,
344                headers: HashMap::new(),
345            },
346            access_token,
347        )
348        .await
349    }
350
351    /// Parse rate limit info from response headers.
352    fn parse_rate_limit_headers(headers: &HeaderMap) -> Option<RateLimitInfo> {
353        let limit_header = headers
354            .get("x-ratelimit-limit")
355            .and_then(|v| v.to_str().ok())
356            .and_then(|v| v.parse::<u32>().ok());
357
358        let remaining_header = headers
359            .get("x-ratelimit-remaining")
360            .and_then(|v| v.to_str().ok())
361            .and_then(|v| v.parse::<u32>().ok());
362
363        let reset_header = headers
364            .get("x-ratelimit-reset")
365            .and_then(|v| v.to_str().ok())
366            .and_then(|v| v.parse::<i64>().ok())
367            .and_then(|ts| DateTime::from_timestamp(ts, 0));
368
369        let retry_after = headers
370            .get("retry-after")
371            .and_then(|v| v.to_str().ok())
372            .and_then(|v| v.parse::<u64>().ok())
373            .map(Duration::from_secs);
374
375        // Only return info if at least one rate limit header was present
376        if limit_header.is_none()
377            && remaining_header.is_none()
378            && reset_header.is_none()
379            && retry_after.is_none()
380        {
381            return None;
382        }
383
384        Some(RateLimitInfo {
385            limit: limit_header.unwrap_or(0),
386            remaining: remaining_header.unwrap_or(0),
387            reset: reset_header.unwrap_or(DateTime::UNIX_EPOCH),
388            retry_after,
389        })
390    }
391
392    /// Create a typed error from an API error response.
393    async fn create_error_from_response(&self, resp: &Response) -> Error {
394        #[derive(serde::Deserialize, Default)]
395        struct ApiErrorResponse {
396            #[serde(default)]
397            error: ApiErrorBody,
398        }
399
400        #[derive(serde::Deserialize, Default)]
401        struct ApiErrorBody {
402            #[serde(default)]
403            message: String,
404            #[serde(default)]
405            code: u16,
406            #[serde(default)]
407            is_transient: bool,
408            #[serde(default)]
409            error_subcode: u16,
410        }
411
412        let mut message = format!("HTTP {}", resp.status_code);
413        let mut error_code = resp.status_code;
414        let mut is_transient = false;
415        let mut error_subcode: u16 = 0;
416
417        if !resp.body.is_empty() {
418            if let Ok(api_err) = serde_json::from_slice::<ApiErrorResponse>(&resp.body) {
419                if !api_err.error.message.is_empty() {
420                    message = api_err.error.message;
421                    is_transient = api_err.error.is_transient;
422                    error_subcode = api_err.error.error_subcode;
423                    if api_err.error.code != 0 {
424                        error_code = api_err.error.code;
425                    }
426                }
427            }
428        }
429
430        let details = String::from_utf8_lossy(&resp.body);
431        let details = if details.len() > 500 {
432            let end = details
433                .char_indices()
434                .map(|(i, c)| i + c.len_utf8())
435                .take_while(|&end| end <= 500)
436                .last()
437                .unwrap_or(details.len().min(500));
438            format!("{}...", &details[..end])
439        } else {
440            details.into_owned()
441        };
442
443        let mut err = match resp.status_code {
444            401 | 403 => error::new_authentication_error(error_code, &message, &details),
445            429 => {
446                let retry_after = resp
447                    .rate_limit
448                    .as_ref()
449                    .and_then(|rl| rl.retry_after)
450                    .filter(|d| !d.is_zero())
451                    .unwrap_or(Duration::from_secs(60));
452
453                if let Some(ref rl) = self.rate_limiter {
454                    let reset_time = resp
455                        .rate_limit
456                        .as_ref()
457                        .map(|rl| rl.reset)
458                        .filter(|t| *t > Utc::now())
459                        .unwrap_or_else(|| {
460                            Utc::now()
461                                + chrono::Duration::from_std(retry_after)
462                                    .unwrap_or(chrono::Duration::seconds(60))
463                        });
464                    rl.mark_rate_limited(reset_time).await;
465                }
466
467                error::new_rate_limit_error(error_code, &message, &details, retry_after)
468            }
469            400 | 422 => error::new_validation_error(error_code, &message, &details, ""),
470            _ => error::new_api_error(error_code, &message, &details, &resp.request_id),
471        };
472
473        error::set_error_metadata(&mut err, is_transient, resp.status_code, error_subcode);
474
475        err
476    }
477
478    /// Check if an error should trigger a retry.
479    fn is_retryable_error(&self, err: &Error) -> bool {
480        if err.is_retryable() {
481            return true;
482        }
483        // Also retry 5xx server errors
484        if let Some(fields) = error::extract_base_fields(err) {
485            if fields.http_status_code >= 500 && fields.http_status_code < 600 {
486                return true;
487            }
488        }
489        false
490    }
491
492    /// Wrap a reqwest error into our typed errors.
493    fn wrap_network_error(&self, err: reqwest::Error) -> Error {
494        if err.is_timeout() {
495            return error::new_network_error_with_cause(
496                0,
497                "Request timeout",
498                &err.to_string(),
499                true,
500                Some(err),
501            );
502        }
503        if err.is_connect() {
504            return error::new_network_error_with_cause(
505                0,
506                "Connection error",
507                &err.to_string(),
508                true,
509                Some(err),
510            );
511        }
512        error::new_network_error_with_cause(0, "Network error", &err.to_string(), false, Some(err))
513    }
514}
515
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 200 response carrying `body` and no rate-limit information.
    fn response_with_body(body: &[u8]) -> Response {
        Response {
            status_code: 200,
            body: body.to_vec(),
            request_id: "test".to_owned(),
            rate_limit: None,
            duration: Duration::ZERO,
        }
    }

    #[test]
    fn test_retry_config_default() {
        let cfg = RetryConfig::default();
        assert_eq!(cfg.max_retries, 3);
        assert_eq!(cfg.initial_delay, Duration::from_secs(1));
        assert_eq!(cfg.max_delay, Duration::from_secs(30));
        assert!((cfg.backoff_factor - 2.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_response_json_empty_body() {
        let resp = response_with_body(b"");
        let result: Result<serde_json::Value, _> = resp.json();
        assert!(result.is_err());
    }

    #[test]
    fn test_response_json_whitespace_only() {
        let resp = response_with_body(b"  \n\t ");
        let result: Result<serde_json::Value, _> = resp.json();
        assert!(result.is_err());
    }

    #[test]
    fn test_response_json_valid() {
        let resp = response_with_body(br#"{"id":"123"}"#);
        let val: serde_json::Value = resp.json().unwrap();
        assert_eq!(val["id"], "123");
    }

    #[test]
    fn test_response_json_valid_array() {
        let resp = response_with_body(b" [1, 2] ");
        let val: serde_json::Value = resp.json().unwrap();
        assert_eq!(val[1], 2);
    }

    #[test]
    fn test_response_json_non_json() {
        let resp = response_with_body(b"not json at all");
        let result: Result<serde_json::Value, _> = resp.json();
        assert!(result.is_err());
    }

    #[test]
    fn test_response_json_non_json_multibyte_preview() {
        // 100 three-byte characters: byte 200 falls inside a character, so
        // the preview must be cut on a boundary instead of panicking.
        let resp = response_with_body("\u{65e5}".repeat(100).as_bytes());
        let result: Result<serde_json::Value, _> = resp.json();
        assert!(result.is_err());
    }

    #[test]
    fn test_parse_rate_limit_headers_empty() {
        let headers = HeaderMap::new();
        assert!(HttpClient::parse_rate_limit_headers(&headers).is_none());
    }

    #[test]
    fn test_parse_rate_limit_headers_present() {
        let mut headers = HeaderMap::new();
        headers.insert("x-ratelimit-limit", "100".parse().unwrap());
        headers.insert("x-ratelimit-remaining", "42".parse().unwrap());
        headers.insert("x-ratelimit-reset", "1700000000".parse().unwrap());
        headers.insert("retry-after", "60".parse().unwrap());

        let info = HttpClient::parse_rate_limit_headers(&headers).unwrap();
        assert_eq!(info.limit, 100);
        assert_eq!(info.remaining, 42);
        assert_eq!(
            info.reset,
            DateTime::from_timestamp(1_700_000_000, 0).unwrap()
        );
        assert_eq!(info.retry_after, Some(Duration::from_secs(60)));
    }

    #[test]
    fn test_parse_rate_limit_headers_partial() {
        let mut headers = HeaderMap::new();
        headers.insert("retry-after", "5".parse().unwrap());

        // Absent values fall back to zero counters and the Unix epoch.
        let info = HttpClient::parse_rate_limit_headers(&headers).unwrap();
        assert_eq!(info.limit, 0);
        assert_eq!(info.remaining, 0);
        assert_eq!(info.reset, DateTime::UNIX_EPOCH);
        assert_eq!(info.retry_after, Some(Duration::from_secs(5)));
    }

    #[test]
    fn test_parse_rate_limit_headers_unparseable() {
        let mut headers = HeaderMap::new();
        headers.insert("x-ratelimit-limit", "abc".parse().unwrap());

        // A value that fails to parse counts as absent.
        assert!(HttpClient::parse_rate_limit_headers(&headers).is_none());
    }

    #[tokio::test]
    async fn test_http_client_new() {
        let client = HttpClient::new(
            Duration::from_secs(30),
            RetryConfig::default(),
            None,
            None,
            None,
        )
        .unwrap();
        assert_eq!(client.base_url, BASE_API_URL);
        assert!(client.user_agent.starts_with("threads-rs/"));
    }

    #[tokio::test]
    async fn test_http_client_new_overrides() {
        let client = HttpClient::new(
            Duration::from_secs(30),
            RetryConfig::default(),
            None,
            Some("http://localhost:1234"),
            Some("custom-agent/1.0"),
        )
        .unwrap();
        assert_eq!(client.base_url, "http://localhost:1234");
        assert_eq!(client.user_agent, "custom-agent/1.0");
    }
}