1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use chrono::{DateTime, Utc};
6use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap, USER_AGENT};
7use serde::de::DeserializeOwned;
8
9use crate::constants::{BASE_API_URL, VERSION};
10use crate::error::{self, Error};
11use crate::rate_limit::{RateLimitInfo, RateLimiter};
12
13#[derive(Debug)]
15pub enum RequestBody {
16 Json(serde_json::Value),
18 Form(HashMap<String, String>),
20}
21
22#[derive(Debug)]
24pub struct RequestOptions {
25 pub method: reqwest::Method,
27 pub path: String,
29 pub query_params: HashMap<String, String>,
31 pub body: Option<RequestBody>,
33 pub headers: HashMap<String, String>,
35}
36
37#[derive(Debug)]
39pub struct Response {
40 pub status_code: u16,
42 pub body: Vec<u8>,
44 pub request_id: String,
46 pub rate_limit: Option<RateLimitInfo>,
48 pub duration: Duration,
50}
51
52impl Response {
53 pub fn json<T: DeserializeOwned>(&self) -> crate::Result<T> {
55 if self.body.is_empty() {
56 return Err(error::new_api_error(
57 self.status_code,
58 "Empty response",
59 "Received empty response body",
60 &self.request_id,
61 ));
62 }
63
64 let text = String::from_utf8_lossy(&self.body);
65 let trimmed = text.trim();
66
67 if trimmed.is_empty() {
68 return Err(error::new_api_error(
69 self.status_code,
70 "Empty response",
71 "Received whitespace-only response",
72 &self.request_id,
73 ));
74 }
75
76 if !trimmed.starts_with('{') && !trimmed.starts_with('[') {
77 return Err(error::new_api_error(
78 self.status_code,
79 "Invalid JSON response",
80 &format!(
81 "Received non-JSON response: {}",
82 &trimmed[..trimmed
83 .char_indices()
84 .map(|(i, c)| i + c.len_utf8())
85 .take_while(|&end| end <= 200)
86 .last()
87 .unwrap_or(trimmed.len().min(200))]
88 ),
89 &self.request_id,
90 ));
91 }
92
93 serde_json::from_slice(&self.body).map_err(Error::from)
94 }
95}
96
97#[derive(Debug, Clone)]
99pub struct RetryConfig {
100 pub max_retries: u32,
102 pub initial_delay: Duration,
104 pub max_delay: Duration,
106 pub backoff_factor: f64,
108}
109
110impl Default for RetryConfig {
111 fn default() -> Self {
112 Self {
113 max_retries: 3,
114 initial_delay: Duration::from_secs(1),
115 max_delay: Duration::from_secs(30),
116 backoff_factor: 2.0,
117 }
118 }
119}
120
121pub struct HttpClient {
123 client: reqwest::Client,
124 retry_config: RetryConfig,
125 rate_limiter: Option<Arc<RateLimiter>>,
126 base_url: String,
127 user_agent: String,
128}
129
130impl HttpClient {
131 pub fn new(
133 timeout: Duration,
134 retry_config: RetryConfig,
135 rate_limiter: Option<Arc<RateLimiter>>,
136 base_url: Option<&str>,
137 user_agent: Option<&str>,
138 ) -> crate::Result<Self> {
139 let ua = user_agent
140 .map(|s| s.to_owned())
141 .unwrap_or_else(|| format!("threads-rs/{}", VERSION));
142
143 let client = reqwest::Client::builder().timeout(timeout).build()?;
144
145 Ok(Self {
146 client,
147 retry_config,
148 rate_limiter,
149 base_url: base_url.unwrap_or(BASE_API_URL).to_owned(),
150 user_agent: ua,
151 })
152 }
153
154 pub async fn do_request(
156 &self,
157 opts: &RequestOptions,
158 access_token: &str,
159 ) -> crate::Result<Response> {
160 let mut last_err: Option<Error> = None;
161 let mut delay = self.retry_config.initial_delay;
162
163 for attempt in 0..=self.retry_config.max_retries {
164 if let Some(ref rl) = self.rate_limiter {
166 if rl.should_wait().await {
167 rl.wait().await?;
168 }
169 }
170
171 if attempt > 0 {
172 tokio::time::sleep(delay).await;
173 delay =
174 Duration::from_secs_f64(delay.as_secs_f64() * self.retry_config.backoff_factor);
175 if delay > self.retry_config.max_delay {
176 delay = self.retry_config.max_delay;
177 }
178 }
179
180 match self.execute_request(opts, access_token).await {
181 Ok(resp) => {
182 if let (Some(rl), Some(info)) = (&self.rate_limiter, &resp.rate_limit) {
183 rl.update_from_headers(info).await;
184 }
185 return Ok(resp);
186 }
187 Err(err) => {
188 if !self.is_retryable_error(&err) {
189 return Err(err);
190 }
191 tracing::warn!(
192 attempt = attempt + 1,
193 max = self.retry_config.max_retries + 1,
194 error = %err,
195 "Retrying HTTP request"
196 );
197 last_err = Some(err);
198 }
199 }
200 }
201
202 Err(last_err.unwrap_or_else(|| {
203 error::new_network_error(0, "Request failed after retries", "", false)
204 }))
205 }
206
207 async fn execute_request(
209 &self,
210 opts: &RequestOptions,
211 access_token: &str,
212 ) -> crate::Result<Response> {
213 let start = Instant::now();
214
215 let url = format!("{}{}", self.base_url, opts.path);
216
217 let mut req = self.client.request(opts.method.clone(), &url);
218
219 if !opts.query_params.is_empty() {
221 req = req.query(
222 &opts
223 .query_params
224 .iter()
225 .collect::<Vec<(&String, &String)>>(),
226 );
227 }
228
229 req = req.header(USER_AGENT, &self.user_agent);
231 if !access_token.is_empty() {
232 req = req.header(AUTHORIZATION, format!("Bearer {}", access_token));
233 }
234
235 for (key, value) in &opts.headers {
237 req = req.header(key.as_str(), value.as_str());
238 }
239
240 if let Some(ref body) = opts.body {
242 match body {
243 RequestBody::Json(val) => {
244 req = req.header(CONTENT_TYPE, "application/json");
245 req = req.json(val);
246 }
247 RequestBody::Form(params) => {
248 req = req.form(params);
249 }
250 }
251 }
252
253 tracing::debug!(method = %opts.method, path = %opts.path, "HTTP request");
254
255 let http_resp = req.send().await.map_err(|e| self.wrap_network_error(e))?;
257 let status = http_resp.status().as_u16();
258 let request_id = http_resp
259 .headers()
260 .get("x-fb-request-id")
261 .and_then(|v| v.to_str().ok())
262 .unwrap_or("")
263 .to_owned();
264 let rate_limit = Self::parse_rate_limit_headers(http_resp.headers());
265
266 let body = http_resp
267 .bytes()
268 .await
269 .map_err(|e| {
270 error::new_network_error(0, "Failed to read response body", &e.to_string(), false)
271 })?
272 .to_vec();
273
274 let resp = Response {
275 status_code: status,
276 body,
277 request_id,
278 rate_limit,
279 duration: start.elapsed(),
280 };
281
282 tracing::debug!(
283 status = resp.status_code,
284 duration_ms = resp.duration.as_millis() as u64,
285 request_id = %resp.request_id,
286 "HTTP response"
287 );
288
289 if status >= 400 {
290 return Err(self.create_error_from_response(&resp).await);
291 }
292
293 Ok(resp)
294 }
295
296 pub async fn get(
298 &self,
299 path: &str,
300 query_params: HashMap<String, String>,
301 access_token: &str,
302 ) -> crate::Result<Response> {
303 self.do_request(
304 &RequestOptions {
305 method: reqwest::Method::GET,
306 path: path.to_owned(),
307 query_params,
308 body: None,
309 headers: HashMap::new(),
310 },
311 access_token,
312 )
313 .await
314 }
315
316 pub async fn post(
318 &self,
319 path: &str,
320 body: Option<RequestBody>,
321 access_token: &str,
322 ) -> crate::Result<Response> {
323 self.do_request(
324 &RequestOptions {
325 method: reqwest::Method::POST,
326 path: path.to_owned(),
327 query_params: HashMap::new(),
328 body,
329 headers: HashMap::new(),
330 },
331 access_token,
332 )
333 .await
334 }
335
336 pub async fn delete(&self, path: &str, access_token: &str) -> crate::Result<Response> {
338 self.do_request(
339 &RequestOptions {
340 method: reqwest::Method::DELETE,
341 path: path.to_owned(),
342 query_params: HashMap::new(),
343 body: None,
344 headers: HashMap::new(),
345 },
346 access_token,
347 )
348 .await
349 }
350
351 fn parse_rate_limit_headers(headers: &HeaderMap) -> Option<RateLimitInfo> {
353 let limit_header = headers
354 .get("x-ratelimit-limit")
355 .and_then(|v| v.to_str().ok())
356 .and_then(|v| v.parse::<u32>().ok());
357
358 let remaining_header = headers
359 .get("x-ratelimit-remaining")
360 .and_then(|v| v.to_str().ok())
361 .and_then(|v| v.parse::<u32>().ok());
362
363 let reset_header = headers
364 .get("x-ratelimit-reset")
365 .and_then(|v| v.to_str().ok())
366 .and_then(|v| v.parse::<i64>().ok())
367 .and_then(|ts| DateTime::from_timestamp(ts, 0));
368
369 let retry_after = headers
370 .get("retry-after")
371 .and_then(|v| v.to_str().ok())
372 .and_then(|v| v.parse::<u64>().ok())
373 .map(Duration::from_secs);
374
375 if limit_header.is_none()
377 && remaining_header.is_none()
378 && reset_header.is_none()
379 && retry_after.is_none()
380 {
381 return None;
382 }
383
384 Some(RateLimitInfo {
385 limit: limit_header.unwrap_or(0),
386 remaining: remaining_header.unwrap_or(0),
387 reset: reset_header.unwrap_or(DateTime::UNIX_EPOCH),
388 retry_after,
389 })
390 }
391
392 async fn create_error_from_response(&self, resp: &Response) -> Error {
394 #[derive(serde::Deserialize, Default)]
395 struct ApiErrorResponse {
396 #[serde(default)]
397 error: ApiErrorBody,
398 }
399
400 #[derive(serde::Deserialize, Default)]
401 struct ApiErrorBody {
402 #[serde(default)]
403 message: String,
404 #[serde(default)]
405 code: u16,
406 #[serde(default)]
407 is_transient: bool,
408 #[serde(default)]
409 error_subcode: u16,
410 }
411
412 let mut message = format!("HTTP {}", resp.status_code);
413 let mut error_code = resp.status_code;
414 let mut is_transient = false;
415 let mut error_subcode: u16 = 0;
416
417 if !resp.body.is_empty() {
418 if let Ok(api_err) = serde_json::from_slice::<ApiErrorResponse>(&resp.body) {
419 if !api_err.error.message.is_empty() {
420 message = api_err.error.message;
421 is_transient = api_err.error.is_transient;
422 error_subcode = api_err.error.error_subcode;
423 if api_err.error.code != 0 {
424 error_code = api_err.error.code;
425 }
426 }
427 }
428 }
429
430 let details = String::from_utf8_lossy(&resp.body);
431 let details = if details.len() > 500 {
432 let end = details
433 .char_indices()
434 .map(|(i, c)| i + c.len_utf8())
435 .take_while(|&end| end <= 500)
436 .last()
437 .unwrap_or(details.len().min(500));
438 format!("{}...", &details[..end])
439 } else {
440 details.into_owned()
441 };
442
443 let mut err = match resp.status_code {
444 401 | 403 => error::new_authentication_error(error_code, &message, &details),
445 429 => {
446 let retry_after = resp
447 .rate_limit
448 .as_ref()
449 .and_then(|rl| rl.retry_after)
450 .filter(|d| !d.is_zero())
451 .unwrap_or(Duration::from_secs(60));
452
453 if let Some(ref rl) = self.rate_limiter {
454 let reset_time = resp
455 .rate_limit
456 .as_ref()
457 .map(|rl| rl.reset)
458 .filter(|t| *t > Utc::now())
459 .unwrap_or_else(|| {
460 Utc::now()
461 + chrono::Duration::from_std(retry_after)
462 .unwrap_or(chrono::Duration::seconds(60))
463 });
464 rl.mark_rate_limited(reset_time).await;
465 }
466
467 error::new_rate_limit_error(error_code, &message, &details, retry_after)
468 }
469 400 | 422 => error::new_validation_error(error_code, &message, &details, ""),
470 _ => error::new_api_error(error_code, &message, &details, &resp.request_id),
471 };
472
473 error::set_error_metadata(&mut err, is_transient, resp.status_code, error_subcode);
474
475 err
476 }
477
478 fn is_retryable_error(&self, err: &Error) -> bool {
480 if err.is_retryable() {
481 return true;
482 }
483 if let Some(fields) = error::extract_base_fields(err) {
485 if fields.http_status_code >= 500 && fields.http_status_code < 600 {
486 return true;
487 }
488 }
489 false
490 }
491
492 fn wrap_network_error(&self, err: reqwest::Error) -> Error {
494 if err.is_timeout() {
495 return error::new_network_error_with_cause(
496 0,
497 "Request timeout",
498 &err.to_string(),
499 true,
500 Some(err),
501 );
502 }
503 if err.is_connect() {
504 return error::new_network_error_with_cause(
505 0,
506 "Connection error",
507 &err.to_string(),
508 true,
509 Some(err),
510 );
511 }
512 error::new_network_error_with_cause(0, "Network error", &err.to_string(), false, Some(err))
513 }
514}
515
516#[cfg(test)]
517mod tests {
518 use super::*;
519
520 #[test]
521 fn test_retry_config_default() {
522 let cfg = RetryConfig::default();
523 assert_eq!(cfg.max_retries, 3);
524 assert_eq!(cfg.initial_delay, Duration::from_secs(1));
525 assert_eq!(cfg.max_delay, Duration::from_secs(30));
526 assert!((cfg.backoff_factor - 2.0).abs() < f64::EPSILON);
527 }
528
529 #[test]
530 fn test_response_json_empty_body() {
531 let resp = Response {
532 status_code: 200,
533 body: vec![],
534 request_id: "test".to_owned(),
535 rate_limit: None,
536 duration: Duration::ZERO,
537 };
538 let result: Result<serde_json::Value, _> = resp.json();
539 assert!(result.is_err());
540 }
541
542 #[test]
543 fn test_response_json_valid() {
544 let resp = Response {
545 status_code: 200,
546 body: br#"{"id":"123"}"#.to_vec(),
547 request_id: "test".to_owned(),
548 rate_limit: None,
549 duration: Duration::ZERO,
550 };
551 let val: serde_json::Value = resp.json().unwrap();
552 assert_eq!(val["id"], "123");
553 }
554
555 #[test]
556 fn test_response_json_non_json() {
557 let resp = Response {
558 status_code: 200,
559 body: b"not json at all".to_vec(),
560 request_id: "test".to_owned(),
561 rate_limit: None,
562 duration: Duration::ZERO,
563 };
564 let result: Result<serde_json::Value, _> = resp.json();
565 assert!(result.is_err());
566 }
567
568 #[test]
569 fn test_parse_rate_limit_headers_empty() {
570 let headers = HeaderMap::new();
571 assert!(HttpClient::parse_rate_limit_headers(&headers).is_none());
572 }
573
574 #[test]
575 fn test_parse_rate_limit_headers_present() {
576 let mut headers = HeaderMap::new();
577 headers.insert("x-ratelimit-limit", "100".parse().unwrap());
578 headers.insert("x-ratelimit-remaining", "42".parse().unwrap());
579 headers.insert("x-ratelimit-reset", "1700000000".parse().unwrap());
580 headers.insert("retry-after", "60".parse().unwrap());
581
582 let info = HttpClient::parse_rate_limit_headers(&headers).unwrap();
583 assert_eq!(info.limit, 100);
584 assert_eq!(info.remaining, 42);
585 assert_eq!(info.retry_after, Some(Duration::from_secs(60)));
586 }
587
588 #[tokio::test]
589 async fn test_http_client_new() {
590 let client = HttpClient::new(
591 Duration::from_secs(30),
592 RetryConfig::default(),
593 None,
594 None,
595 None,
596 )
597 .unwrap();
598 assert_eq!(client.base_url, BASE_API_URL);
599 assert!(client.user_agent.starts_with("threads-rs/"));
600 }
601}