1use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
7use reqwest::multipart::{Form, Part};
8use serde::de::DeserializeOwned;
9use serde_json::Value;
10use std::env;
11use std::time::Duration;
12use thiserror::Error;
13
14use crate::shared_client::{DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_POOL_SIZE};
15use crate::x402::X402Payer;
16
17const PAYMENT_REQUIRED_HEADER: &str = "PAYMENT-REQUIRED";
18const X_PAYMENT_REQUIRED_HEADER: &str = "X-PAYMENT-REQUIRED";
19const PAYMENT_SIGNATURE_HEADER: &str = "PAYMENT-SIGNATURE";
20
21#[derive(Debug, Clone)]
23pub struct HttpErrorDetail {
24 pub status: u16,
25 pub url: String,
26 pub message: String,
27 pub body_snippet: Option<String>,
28}
29
30impl std::fmt::Display for HttpErrorDetail {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 write!(f, "HTTP {} for {}: {}", self.status, self.url, self.message)?;
33 if let Some(ref snippet) = self.body_snippet {
34 let truncated: String = snippet.chars().take(200).collect();
35 write!(f, " | body[0:200]={}", truncated)?;
36 }
37 Ok(())
38 }
39}
40
41#[derive(Debug, Error)]
43pub enum HttpError {
44 #[error("request failed: {0} (is_connect={}, is_timeout={})", .0.is_connect(), .0.is_timeout())]
45 Request(#[from] reqwest::Error),
46
47 #[error("{0}")]
48 Response(HttpErrorDetail),
49
50 #[error("invalid url: {0}")]
51 InvalidUrl(String),
52
53 #[error("json parse error: {0}")]
54 JsonParse(String),
55}
56
57#[derive(Debug, Clone)]
59pub struct MultipartFile {
60 pub field: String,
61 pub filename: String,
62 pub bytes: Vec<u8>,
63 pub content_type: Option<String>,
64}
65
66impl MultipartFile {
67 pub fn new(
68 field: impl Into<String>,
69 filename: impl Into<String>,
70 bytes: Vec<u8>,
71 content_type: Option<String>,
72 ) -> Self {
73 Self {
74 field: field.into(),
75 filename: filename.into(),
76 bytes,
77 content_type,
78 }
79 }
80}
81
82impl HttpError {
83 pub fn from_response(status: u16, url: &str, body: Option<&str>) -> Self {
85 let body_snippet = body.map(|s| s.chars().take(4096).collect());
89 HttpError::Response(HttpErrorDetail {
90 status,
91 url: url.to_string(),
92 message: "request_failed".to_string(),
93 body_snippet,
94 })
95 }
96
97 pub fn status(&self) -> Option<u16> {
99 match self {
100 HttpError::Response(detail) => Some(detail.status),
101 HttpError::Request(e) => e.status().map(|s| s.as_u16()),
102 _ => None,
103 }
104 }
105}
106
107#[derive(Clone)]
118pub struct HttpClient {
119 client: reqwest::Client,
120 base_url: String,
121 #[allow(dead_code)]
122 api_key: String,
123 x402_payer: Option<X402Payer>,
124}
125
126impl HttpClient {
127 pub fn new(base_url: &str, api_key: &str, timeout_secs: u64) -> Result<Self, HttpError> {
141 let mut headers = HeaderMap::new();
142
143 if !api_key.is_empty() {
145 let auth_value = format!("Bearer {}", api_key);
146 headers.insert(
147 AUTHORIZATION,
148 HeaderValue::from_str(&auth_value)
149 .map_err(|_| HttpError::InvalidUrl("invalid api key characters".to_string()))?,
150 );
151 headers.insert(
152 "X-API-Key",
153 HeaderValue::from_str(api_key)
154 .map_err(|_| HttpError::InvalidUrl("invalid api key characters".to_string()))?,
155 );
156 }
157
158 if let Some(user_id) = env::var("SYNTH_USER_ID")
160 .ok()
161 .or_else(|| env::var("X_USER_ID").ok())
162 {
163 if let Ok(val) = HeaderValue::from_str(&user_id) {
164 headers.insert("X-User-ID", val);
165 }
166 }
167
168 if let Some(org_id) = env::var("SYNTH_ORG_ID")
169 .ok()
170 .or_else(|| env::var("X_ORG_ID").ok())
171 {
172 if let Ok(val) = HeaderValue::from_str(&org_id) {
173 headers.insert("X-Org-ID", val);
174 }
175 }
176
177 let client = reqwest::Client::builder()
178 .default_headers(headers)
179 .timeout(Duration::from_secs(timeout_secs))
180 .pool_max_idle_per_host(DEFAULT_POOL_SIZE)
181 .pool_idle_timeout(Duration::from_secs(90))
182 .connect_timeout(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS))
183 .tcp_keepalive(Duration::from_secs(60))
184 .tcp_nodelay(true)
185 .build()
186 .map_err(HttpError::Request)?;
187
188 let x402_payer = X402Payer::from_env();
189
190 Ok(Self {
191 client,
192 base_url: base_url.trim_end_matches('/').to_string(),
193 api_key: api_key.to_string(),
194 x402_payer,
195 })
196 }
197
198 pub(crate) fn api_key(&self) -> &str {
200 &self.api_key
201 }
202
203 fn abs_url(&self, path: &str) -> String {
205 if path.starts_with("http://") || path.starts_with("https://") {
206 return path.to_string();
207 }
208
209 let path = path.trim_start_matches('/');
210
211 if self.base_url.ends_with("/api") && path.starts_with("api/") {
213 return format!("{}/{}", self.base_url, &path[4..]);
214 }
215
216 format!("{}/{}", self.base_url, path)
217 }
218
219 pub async fn get<T: DeserializeOwned>(
226 &self,
227 path: &str,
228 params: Option<&[(&str, &str)]>,
229 ) -> Result<T, HttpError> {
230 let url = self.abs_url(path);
231 let mut req = self.client.get(&url);
232
233 if let Some(p) = params {
234 req = req.query(p);
235 }
236
237 let request = req.build().map_err(HttpError::Request)?;
238 let (status, _headers, body) = self.send_with_x402_retry(request).await?;
239 self.parse_json(status, &url, &body)
240 }
241
242 pub async fn get_bytes(
244 &self,
245 path: &str,
246 params: Option<&[(&str, &str)]>,
247 ) -> Result<Vec<u8>, HttpError> {
248 let url = self.abs_url(path);
249 let mut request = self.client.get(&url);
250 if let Some(params) = params {
251 request = request.query(params);
252 }
253 let request = request.build().map_err(HttpError::Request)?;
254 let (status, _headers, body) = self.send_with_x402_retry(request).await?;
255 if (200..300).contains(&status) {
256 return Ok(body.to_vec());
257 }
258 let text = String::from_utf8_lossy(&body);
259 Err(HttpError::from_response(
260 status,
261 &url,
262 if text.trim().is_empty() {
263 None
264 } else {
265 Some(&text)
266 },
267 ))
268 }
269
270 pub async fn get_json(
272 &self,
273 path: &str,
274 params: Option<&[(&str, &str)]>,
275 ) -> Result<Value, HttpError> {
276 self.get(path, params).await
277 }
278
279 pub async fn post_json<T: DeserializeOwned>(
286 &self,
287 path: &str,
288 body: &Value,
289 ) -> Result<T, HttpError> {
290 let url = self.abs_url(path);
291 let request = self
292 .client
293 .post(&url)
294 .json(body)
295 .build()
296 .map_err(HttpError::Request)?;
297 let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
298 self.parse_json(status, &url, &body_bytes)
299 }
300
301 pub async fn post_json_with_headers<T: DeserializeOwned>(
303 &self,
304 path: &str,
305 body: &Value,
306 extra_headers: Option<HeaderMap>,
307 ) -> Result<T, HttpError> {
308 let url = self.abs_url(path);
309 let mut request = self.client.post(&url).json(body);
310 if let Some(headers) = extra_headers {
311 request = request.headers(headers);
312 }
313 let request = request.build().map_err(HttpError::Request)?;
314 let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
315 self.parse_json(status, &url, &body_bytes)
316 }
317
318 pub async fn post_multipart<T: DeserializeOwned>(
326 &self,
327 path: &str,
328 data: &[(String, String)],
329 files: &[MultipartFile],
330 ) -> Result<T, HttpError> {
331 let url = self.abs_url(path);
332 let mut form = Form::new();
333 for (key, value) in data {
334 form = form.text(key.clone(), value.clone());
335 }
336 for file in files {
337 let part = Part::bytes(file.bytes.clone()).file_name(file.filename.clone());
338 let part = match &file.content_type {
339 Some(ct) => part.mime_str(ct).unwrap_or_else(|_| {
340 Part::bytes(file.bytes.clone()).file_name(file.filename.clone())
341 }),
342 None => part,
343 };
344 form = form.part(file.field.clone(), part);
345 }
346 let request = self
347 .client
348 .post(&url)
349 .multipart(form)
350 .build()
351 .map_err(HttpError::Request)?;
352 let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
353 self.parse_json(status, &url, &body_bytes)
354 }
355
356 pub async fn delete(&self, path: &str) -> Result<(), HttpError> {
362 let url = self.abs_url(path);
363 let request = self
364 .client
365 .delete(&url)
366 .build()
367 .map_err(HttpError::Request)?;
368 let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
369 if (200..300).contains(&status) {
370 return Ok(());
371 }
372 let text = String::from_utf8_lossy(&body_bytes);
373 Err(HttpError::from_response(
374 status,
375 &url,
376 if text.trim().is_empty() {
377 None
378 } else {
379 Some(&text)
380 },
381 ))
382 }
383
384 fn parse_json<T: DeserializeOwned>(
385 &self,
386 status: u16,
387 url: &str,
388 body: &[u8],
389 ) -> Result<T, HttpError> {
390 if !(200..300).contains(&status) {
391 let text = String::from_utf8_lossy(body);
392 return Err(HttpError::from_response(status, url, Some(&text)));
393 }
394
395 serde_json::from_slice(body).map_err(|e| {
396 let text = String::from_utf8_lossy(body);
397 HttpError::JsonParse(format!("{}: {}", e, &text[..text.len().min(100)]))
398 })
399 }
400
401 async fn send_with_x402_retry(
402 &self,
403 request: reqwest::Request,
404 ) -> Result<(u16, HeaderMap, bytes::Bytes), HttpError> {
405 let Some(first) = request.try_clone() else {
406 let resp = self.client.execute(request).await?;
408 let status = resp.status().as_u16();
409 let headers = resp.headers().clone();
410 let body = resp.bytes().await?;
411 return Ok((status, headers, body));
412 };
413
414 let resp = self.client.execute(first).await?;
415 let status = resp.status().as_u16();
416 let headers = resp.headers().clone();
417 let body = resp.bytes().await?;
418
419 if status != 402 {
420 return Ok((status, headers, body));
421 }
422
423 let Some(payer) = self.x402_payer.as_ref() else {
424 return Ok((status, headers, body));
425 };
426
427 let Some(payment_required_header) = extract_payment_required_header(&headers, &body) else {
428 return Ok((status, headers, body));
429 };
430
431 let Ok(payment_signature_header) =
432 payer.build_payment_signature_header(&payment_required_header)
433 else {
434 return Ok((status, headers, body));
435 };
436
437 let Some(mut retry) = request.try_clone() else {
438 return Ok((status, headers, body));
439 };
440
441 retry.headers_mut().insert(
442 PAYMENT_SIGNATURE_HEADER,
443 HeaderValue::from_str(&payment_signature_header).map_err(|_| {
444 HttpError::InvalidUrl("invalid x402 payment signature header".to_string())
445 })?,
446 );
447
448 let resp2 = self.client.execute(retry).await?;
449 let status2 = resp2.status().as_u16();
450 let headers2 = resp2.headers().clone();
451 let body2 = resp2.bytes().await?;
452 Ok((status2, headers2, body2))
453 }
454}
455
456fn extract_payment_required_header(headers: &HeaderMap, body: &[u8]) -> Option<String> {
457 let direct = headers
458 .get(PAYMENT_REQUIRED_HEADER)
459 .or_else(|| headers.get(X_PAYMENT_REQUIRED_HEADER))
460 .and_then(|v| v.to_str().ok())
461 .map(|v| v.to_string());
462 if direct.is_some() {
463 return direct;
464 }
465
466 let parsed = serde_json::from_slice::<serde_json::Value>(body).ok()?;
468 let detail = parsed.get("detail").unwrap_or(&parsed);
469 let x402 = detail.get("x402")?;
470 x402.get("payment_required_header")
471 .and_then(|v| v.as_str())
472 .map(|v| v.to_string())
473}
474
475#[cfg(test)]
476mod tests {
477 use super::*;
478 use crate::x402::{
479 decode_payment_signature_header, recover_payer_address_from_payment_payload,
480 PaymentRequired, PaymentRequirements, ResourceInfo,
481 };
482 use base64::Engine as _;
483 use bytes::Bytes;
484 use http_body_util::Full;
485 use hyper::service::service_fn;
486 use hyper::{Request, Response, StatusCode};
487 use hyper_util::rt::{TokioExecutor, TokioIo};
488 use hyper_util::server::conn::auto::Builder;
489 use std::sync::atomic::{AtomicUsize, Ordering};
490 use std::sync::Arc;
491 use tokio::net::TcpListener;
492
493 #[test]
494 fn test_abs_url_relative() {
495 let client = HttpClient::new("https://api.usesynth.ai", "test_key", 30).unwrap();
496 assert_eq!(
497 client.abs_url("/api/v1/jobs"),
498 "https://api.usesynth.ai/api/v1/jobs"
499 );
500 assert_eq!(
501 client.abs_url("api/v1/jobs"),
502 "https://api.usesynth.ai/api/v1/jobs"
503 );
504 }
505
506 #[test]
507 fn test_abs_url_absolute() {
508 let client = HttpClient::new("https://api.usesynth.ai", "test_key", 30).unwrap();
509 assert_eq!(
510 client.abs_url("https://other.com/path"),
511 "https://other.com/path"
512 );
513 }
514
515 #[test]
516 fn test_abs_url_api_prefix_dedup() {
517 let client = HttpClient::new("https://api.usesynth.ai/api", "test_key", 30).unwrap();
518 assert_eq!(
519 client.abs_url("api/v1/jobs"),
520 "https://api.usesynth.ai/api/v1/jobs"
521 );
522 }
523
524 #[test]
525 fn test_http_error_display() {
526 let err = HttpError::from_response(404, "https://api.example.com/test", Some("not found"));
527 let msg = format!("{}", err);
528 assert!(msg.contains("404"));
529 assert!(msg.contains("api.example.com"));
530 }
531
532 #[tokio::test]
533 async fn test_http_client_x402_auto_retry() {
534 std::env::set_var(
536 "SYNTH_X402_PRIVATE_KEY",
537 "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
538 );
539 let payer = X402Payer::from_env().unwrap();
540
541 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
543 let addr = listener.local_addr().unwrap();
544 let base_url = format!("http://{}", addr);
545
546 let pay_to = "0x1111111111111111111111111111111111111111".to_string();
548 let asset = "0x036CbD53842c5426634e7929541eC2318f3dCF7e".to_string();
549 let payment_required = PaymentRequired {
550 x402_version: 2,
551 error: Some("Payment required".to_string()),
552 resource: Some(ResourceInfo {
553 url: format!("{}/test", base_url),
554 description: Some("unit test".to_string()),
555 mime_type: Some("application/json".to_string()),
556 }),
557 accepts: vec![PaymentRequirements {
558 scheme: "exact".to_string(),
559 network: "eip155:84532".to_string(),
560 asset,
561 amount: "250000".to_string(),
562 pay_to,
563 max_timeout_seconds: 300,
564 extra: Some(serde_json::json!({"name": "USDC", "version": "2"})),
565 }],
566 extensions: None,
567 };
568 let required_header = base64::engine::general_purpose::STANDARD
569 .encode(serde_json::to_vec(&payment_required).unwrap());
570
571 let request_count = Arc::new(AtomicUsize::new(0));
572 let expected_payer = payer.address().to_string();
573
574 let required_header_clone = required_header.clone();
575 let request_count_clone = request_count.clone();
576 let expected_payer_clone = expected_payer.clone();
577
578 tokio::spawn(async move {
579 loop {
580 let (stream, _) = match listener.accept().await {
581 Ok(value) => value,
582 Err(_) => break,
583 };
584 let io = TokioIo::new(stream);
585 let required = required_header_clone.clone();
586 let count = request_count_clone.clone();
587 let expected = expected_payer_clone.clone();
588
589 tokio::spawn(async move {
590 let svc = service_fn(move |req: Request<hyper::body::Incoming>| {
591 let required = required.clone();
592 let count = count.clone();
593 let expected = expected.clone();
594
595 async move {
596 let n = count.fetch_add(1, Ordering::SeqCst);
597 if n == 0 {
598 let mut resp = Response::new(Full::new(Bytes::from_static(b"")));
599 *resp.status_mut() = StatusCode::PAYMENT_REQUIRED;
600 resp.headers_mut().insert(
601 PAYMENT_REQUIRED_HEADER,
602 HeaderValue::from_str(&required).unwrap(),
603 );
604 return Ok::<_, hyper::Error>(resp);
605 }
606
607 let sig_header = req
608 .headers()
609 .get(PAYMENT_SIGNATURE_HEADER)
610 .and_then(|v| v.to_str().ok())
611 .unwrap_or("");
612 assert!(!sig_header.trim().is_empty());
613
614 let payment_payload =
615 decode_payment_signature_header(sig_header).unwrap();
616 let recovered =
617 recover_payer_address_from_payment_payload(&payment_payload)
618 .unwrap();
619 assert_eq!(recovered, expected);
620
621 let body =
622 serde_json::to_vec(&serde_json::json!({"ok": true})).unwrap();
623 let mut resp = Response::new(Full::new(Bytes::from(body)));
624 *resp.status_mut() = StatusCode::OK;
625 Ok::<_, hyper::Error>(resp)
626 }
627 });
628
629 let _ = Builder::new(TokioExecutor::new())
630 .serve_connection(io, svc)
631 .await;
632 });
633 }
634 });
635
636 let client = HttpClient::new(&base_url, "test_key", 30).unwrap();
638
639 std::env::remove_var("SYNTH_X402_PRIVATE_KEY");
641
642 let result: Value = client.get_json("/test", None).await.unwrap();
643 assert_eq!(result.get("ok").and_then(|v| v.as_bool()), Some(true));
644 assert_eq!(request_count.load(Ordering::SeqCst), 2);
645 }
646}