Skip to main content

synth_ai_core/
http.rs

1//! HTTP client for Synth API calls.
2//!
3//! This module provides an async HTTP client with Bearer authentication,
4//! optional dev headers (X-User-ID, X-Org-ID), and proper error handling.
5
6use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
7use reqwest::multipart::{Form, Part};
8use serde::de::DeserializeOwned;
9use serde_json::Value;
10use std::env;
11use std::time::Duration;
12use thiserror::Error;
13
14use crate::shared_client::{DEFAULT_CONNECT_TIMEOUT_SECS, DEFAULT_POOL_SIZE};
15use crate::x402::X402Payer;
16
17const PAYMENT_REQUIRED_HEADER: &str = "PAYMENT-REQUIRED";
18const X_PAYMENT_REQUIRED_HEADER: &str = "X-PAYMENT-REQUIRED";
19const PAYMENT_SIGNATURE_HEADER: &str = "PAYMENT-SIGNATURE";
20
21/// HTTP error details.
22#[derive(Debug, Clone)]
23pub struct HttpErrorDetail {
24    pub status: u16,
25    pub url: String,
26    pub message: String,
27    pub body_snippet: Option<String>,
28}
29
30impl std::fmt::Display for HttpErrorDetail {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        write!(f, "HTTP {} for {}: {}", self.status, self.url, self.message)?;
33        if let Some(ref snippet) = self.body_snippet {
34            let truncated: String = snippet.chars().take(200).collect();
35            write!(f, " | body[0:200]={}", truncated)?;
36        }
37        Ok(())
38    }
39}
40
41/// HTTP client errors.
42#[derive(Debug, Error)]
43pub enum HttpError {
44    #[error("request failed: {0} (is_connect={}, is_timeout={})", .0.is_connect(), .0.is_timeout())]
45    Request(#[from] reqwest::Error),
46
47    #[error("{0}")]
48    Response(HttpErrorDetail),
49
50    #[error("invalid url: {0}")]
51    InvalidUrl(String),
52
53    #[error("json parse error: {0}")]
54    JsonParse(String),
55}
56
57/// Multipart file payload.
58#[derive(Debug, Clone)]
59pub struct MultipartFile {
60    pub field: String,
61    pub filename: String,
62    pub bytes: Vec<u8>,
63    pub content_type: Option<String>,
64}
65
66impl MultipartFile {
67    pub fn new(
68        field: impl Into<String>,
69        filename: impl Into<String>,
70        bytes: Vec<u8>,
71        content_type: Option<String>,
72    ) -> Self {
73        Self {
74            field: field.into(),
75            filename: filename.into(),
76            bytes,
77            content_type,
78        }
79    }
80}
81
82impl HttpError {
83    /// Create an HTTP error from a response.
84    pub fn from_response(status: u16, url: &str, body: Option<&str>) -> Self {
85        // Keep enough body to preserve structured JSON error payloads.
86        // Display paths still truncate to 200 chars, but parsers (e.g. rate-limit
87        // detail extraction) need the full object to avoid fallback placeholders.
88        let body_snippet = body.map(|s| s.chars().take(4096).collect());
89        HttpError::Response(HttpErrorDetail {
90            status,
91            url: url.to_string(),
92            message: "request_failed".to_string(),
93            body_snippet,
94        })
95    }
96
97    /// Get the HTTP status code, if available.
98    pub fn status(&self) -> Option<u16> {
99        match self {
100            HttpError::Response(detail) => Some(detail.status),
101            HttpError::Request(e) => e.status().map(|s| s.as_u16()),
102            _ => None,
103        }
104    }
105}
106
107/// Async HTTP client for Synth API.
108///
109/// Provides Bearer token authentication and automatic JSON handling.
110///
111/// # Example
112///
113/// ```ignore
114/// let client = HttpClient::new("https://api.usesynth.ai", "sk_live_...", 30)?;
115/// let result: Value = client.get("/api/v1/jobs", None).await?;
116/// ```
117#[derive(Clone)]
118pub struct HttpClient {
119    client: reqwest::Client,
120    base_url: String,
121    #[allow(dead_code)]
122    api_key: String,
123    x402_payer: Option<X402Payer>,
124}
125
126impl HttpClient {
127    /// Create a new HTTP client.
128    ///
129    /// # Arguments
130    ///
131    /// * `base_url` - Base URL for the API (without trailing slash)
132    /// * `api_key` - API key for Bearer authentication
133    /// * `timeout_secs` - Request timeout in seconds
134    ///
135    /// # Environment Variables
136    ///
137    /// Optional headers from environment:
138    /// - `SYNTH_USER_ID` or `X_USER_ID` → `X-User-ID` header
139    /// - `SYNTH_ORG_ID` or `X_ORG_ID` → `X-Org-ID` header
140    pub fn new(base_url: &str, api_key: &str, timeout_secs: u64) -> Result<Self, HttpError> {
141        let mut headers = HeaderMap::new();
142
143        // Only add auth headers if api_key is non-empty
144        if !api_key.is_empty() {
145            let auth_value = format!("Bearer {}", api_key);
146            headers.insert(
147                AUTHORIZATION,
148                HeaderValue::from_str(&auth_value)
149                    .map_err(|_| HttpError::InvalidUrl("invalid api key characters".to_string()))?,
150            );
151            headers.insert(
152                "X-API-Key",
153                HeaderValue::from_str(api_key)
154                    .map_err(|_| HttpError::InvalidUrl("invalid api key characters".to_string()))?,
155            );
156        }
157
158        // Optional dev headers
159        if let Some(user_id) = env::var("SYNTH_USER_ID")
160            .ok()
161            .or_else(|| env::var("X_USER_ID").ok())
162        {
163            if let Ok(val) = HeaderValue::from_str(&user_id) {
164                headers.insert("X-User-ID", val);
165            }
166        }
167
168        if let Some(org_id) = env::var("SYNTH_ORG_ID")
169            .ok()
170            .or_else(|| env::var("X_ORG_ID").ok())
171        {
172            if let Ok(val) = HeaderValue::from_str(&org_id) {
173                headers.insert("X-Org-ID", val);
174            }
175        }
176
177        let client = reqwest::Client::builder()
178            .default_headers(headers)
179            .timeout(Duration::from_secs(timeout_secs))
180            .pool_max_idle_per_host(DEFAULT_POOL_SIZE)
181            .pool_idle_timeout(Duration::from_secs(90))
182            .connect_timeout(Duration::from_secs(DEFAULT_CONNECT_TIMEOUT_SECS))
183            .tcp_keepalive(Duration::from_secs(60))
184            .tcp_nodelay(true)
185            .build()
186            .map_err(HttpError::Request)?;
187
188        let x402_payer = X402Payer::from_env();
189
190        Ok(Self {
191            client,
192            base_url: base_url.trim_end_matches('/').to_string(),
193            api_key: api_key.to_string(),
194            x402_payer,
195        })
196    }
197
198    /// Get the API key used by this client.
199    pub(crate) fn api_key(&self) -> &str {
200        &self.api_key
201    }
202
203    /// Convert a relative path to an absolute URL.
204    fn abs_url(&self, path: &str) -> String {
205        if path.starts_with("http://") || path.starts_with("https://") {
206            return path.to_string();
207        }
208
209        let path = path.trim_start_matches('/');
210
211        // Handle /api prefix duplication
212        if self.base_url.ends_with("/api") && path.starts_with("api/") {
213            return format!("{}/{}", self.base_url, &path[4..]);
214        }
215
216        format!("{}/{}", self.base_url, path)
217    }
218
219    /// Make a GET request.
220    ///
221    /// # Arguments
222    ///
223    /// * `path` - API path (relative or absolute)
224    /// * `params` - Optional query parameters
225    pub async fn get<T: DeserializeOwned>(
226        &self,
227        path: &str,
228        params: Option<&[(&str, &str)]>,
229    ) -> Result<T, HttpError> {
230        let url = self.abs_url(path);
231        let mut req = self.client.get(&url);
232
233        if let Some(p) = params {
234            req = req.query(p);
235        }
236
237        let request = req.build().map_err(HttpError::Request)?;
238        let (status, _headers, body) = self.send_with_x402_retry(request).await?;
239        self.parse_json(status, &url, &body)
240    }
241
242    /// Make a GET request and return raw bytes.
243    pub async fn get_bytes(
244        &self,
245        path: &str,
246        params: Option<&[(&str, &str)]>,
247    ) -> Result<Vec<u8>, HttpError> {
248        let url = self.abs_url(path);
249        let mut request = self.client.get(&url);
250        if let Some(params) = params {
251            request = request.query(params);
252        }
253        let request = request.build().map_err(HttpError::Request)?;
254        let (status, _headers, body) = self.send_with_x402_retry(request).await?;
255        if (200..300).contains(&status) {
256            return Ok(body.to_vec());
257        }
258        let text = String::from_utf8_lossy(&body);
259        Err(HttpError::from_response(
260            status,
261            &url,
262            if text.trim().is_empty() {
263                None
264            } else {
265                Some(&text)
266            },
267        ))
268    }
269
270    /// Make a GET request returning raw JSON Value.
271    pub async fn get_json(
272        &self,
273        path: &str,
274        params: Option<&[(&str, &str)]>,
275    ) -> Result<Value, HttpError> {
276        self.get(path, params).await
277    }
278
279    /// Make a POST request with JSON body.
280    ///
281    /// # Arguments
282    ///
283    /// * `path` - API path
284    /// * `body` - JSON body to send
285    pub async fn post_json<T: DeserializeOwned>(
286        &self,
287        path: &str,
288        body: &Value,
289    ) -> Result<T, HttpError> {
290        let url = self.abs_url(path);
291        let request = self
292            .client
293            .post(&url)
294            .json(body)
295            .build()
296            .map_err(HttpError::Request)?;
297        let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
298        self.parse_json(status, &url, &body_bytes)
299    }
300
301    /// Make a POST request with JSON body and extra headers.
302    pub async fn post_json_with_headers<T: DeserializeOwned>(
303        &self,
304        path: &str,
305        body: &Value,
306        extra_headers: Option<HeaderMap>,
307    ) -> Result<T, HttpError> {
308        let url = self.abs_url(path);
309        let mut request = self.client.post(&url).json(body);
310        if let Some(headers) = extra_headers {
311            request = request.headers(headers);
312        }
313        let request = request.build().map_err(HttpError::Request)?;
314        let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
315        self.parse_json(status, &url, &body_bytes)
316    }
317
318    /// Make a POST request with multipart form data.
319    ///
320    /// # Arguments
321    ///
322    /// * `path` - API path
323    /// * `data` - Form fields
324    /// * `files` - File parts
325    pub async fn post_multipart<T: DeserializeOwned>(
326        &self,
327        path: &str,
328        data: &[(String, String)],
329        files: &[MultipartFile],
330    ) -> Result<T, HttpError> {
331        let url = self.abs_url(path);
332        let mut form = Form::new();
333        for (key, value) in data {
334            form = form.text(key.clone(), value.clone());
335        }
336        for file in files {
337            let part = Part::bytes(file.bytes.clone()).file_name(file.filename.clone());
338            let part = match &file.content_type {
339                Some(ct) => part.mime_str(ct).unwrap_or_else(|_| {
340                    Part::bytes(file.bytes.clone()).file_name(file.filename.clone())
341                }),
342                None => part,
343            };
344            form = form.part(file.field.clone(), part);
345        }
346        let request = self
347            .client
348            .post(&url)
349            .multipart(form)
350            .build()
351            .map_err(HttpError::Request)?;
352        let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
353        self.parse_json(status, &url, &body_bytes)
354    }
355
356    /// Make a DELETE request.
357    ///
358    /// # Arguments
359    ///
360    /// * `path` - API path
361    pub async fn delete(&self, path: &str) -> Result<(), HttpError> {
362        let url = self.abs_url(path);
363        let request = self
364            .client
365            .delete(&url)
366            .build()
367            .map_err(HttpError::Request)?;
368        let (status, _headers, body_bytes) = self.send_with_x402_retry(request).await?;
369        if (200..300).contains(&status) {
370            return Ok(());
371        }
372        let text = String::from_utf8_lossy(&body_bytes);
373        Err(HttpError::from_response(
374            status,
375            &url,
376            if text.trim().is_empty() {
377                None
378            } else {
379                Some(&text)
380            },
381        ))
382    }
383
384    fn parse_json<T: DeserializeOwned>(
385        &self,
386        status: u16,
387        url: &str,
388        body: &[u8],
389    ) -> Result<T, HttpError> {
390        if !(200..300).contains(&status) {
391            let text = String::from_utf8_lossy(body);
392            return Err(HttpError::from_response(status, url, Some(&text)));
393        }
394
395        serde_json::from_slice(body).map_err(|e| {
396            let text = String::from_utf8_lossy(body);
397            HttpError::JsonParse(format!("{}: {}", e, &text[..text.len().min(100)]))
398        })
399    }
400
401    async fn send_with_x402_retry(
402        &self,
403        request: reqwest::Request,
404    ) -> Result<(u16, HeaderMap, bytes::Bytes), HttpError> {
405        let Some(first) = request.try_clone() else {
406            // Can't retry x402 anyway if we can't clone.
407            let resp = self.client.execute(request).await?;
408            let status = resp.status().as_u16();
409            let headers = resp.headers().clone();
410            let body = resp.bytes().await?;
411            return Ok((status, headers, body));
412        };
413
414        let resp = self.client.execute(first).await?;
415        let status = resp.status().as_u16();
416        let headers = resp.headers().clone();
417        let body = resp.bytes().await?;
418
419        if status != 402 {
420            return Ok((status, headers, body));
421        }
422
423        let Some(payer) = self.x402_payer.as_ref() else {
424            return Ok((status, headers, body));
425        };
426
427        let Some(payment_required_header) = extract_payment_required_header(&headers, &body) else {
428            return Ok((status, headers, body));
429        };
430
431        let Ok(payment_signature_header) =
432            payer.build_payment_signature_header(&payment_required_header)
433        else {
434            return Ok((status, headers, body));
435        };
436
437        let Some(mut retry) = request.try_clone() else {
438            return Ok((status, headers, body));
439        };
440
441        retry.headers_mut().insert(
442            PAYMENT_SIGNATURE_HEADER,
443            HeaderValue::from_str(&payment_signature_header).map_err(|_| {
444                HttpError::InvalidUrl("invalid x402 payment signature header".to_string())
445            })?,
446        );
447
448        let resp2 = self.client.execute(retry).await?;
449        let status2 = resp2.status().as_u16();
450        let headers2 = resp2.headers().clone();
451        let body2 = resp2.bytes().await?;
452        Ok((status2, headers2, body2))
453    }
454}
455
456fn extract_payment_required_header(headers: &HeaderMap, body: &[u8]) -> Option<String> {
457    let direct = headers
458        .get(PAYMENT_REQUIRED_HEADER)
459        .or_else(|| headers.get(X_PAYMENT_REQUIRED_HEADER))
460        .and_then(|v| v.to_str().ok())
461        .map(|v| v.to_string());
462    if direct.is_some() {
463        return direct;
464    }
465
466    // Fallback: some servers also place it in the JSON body under detail.x402.payment_required_header.
467    let parsed = serde_json::from_slice::<serde_json::Value>(body).ok()?;
468    let detail = parsed.get("detail").unwrap_or(&parsed);
469    let x402 = detail.get("x402")?;
470    x402.get("payment_required_header")
471        .and_then(|v| v.as_str())
472        .map(|v| v.to_string())
473}
474
475#[cfg(test)]
476mod tests {
477    use super::*;
478    use crate::x402::{
479        decode_payment_signature_header, recover_payer_address_from_payment_payload,
480        PaymentRequired, PaymentRequirements, ResourceInfo,
481    };
482    use base64::Engine as _;
483    use bytes::Bytes;
484    use http_body_util::Full;
485    use hyper::service::service_fn;
486    use hyper::{Request, Response, StatusCode};
487    use hyper_util::rt::{TokioExecutor, TokioIo};
488    use hyper_util::server::conn::auto::Builder;
489    use std::sync::atomic::{AtomicUsize, Ordering};
490    use std::sync::Arc;
491    use tokio::net::TcpListener;
492
493    #[test]
494    fn test_abs_url_relative() {
495        let client = HttpClient::new("https://api.usesynth.ai", "test_key", 30).unwrap();
496        assert_eq!(
497            client.abs_url("/api/v1/jobs"),
498            "https://api.usesynth.ai/api/v1/jobs"
499        );
500        assert_eq!(
501            client.abs_url("api/v1/jobs"),
502            "https://api.usesynth.ai/api/v1/jobs"
503        );
504    }
505
506    #[test]
507    fn test_abs_url_absolute() {
508        let client = HttpClient::new("https://api.usesynth.ai", "test_key", 30).unwrap();
509        assert_eq!(
510            client.abs_url("https://other.com/path"),
511            "https://other.com/path"
512        );
513    }
514
515    #[test]
516    fn test_abs_url_api_prefix_dedup() {
517        let client = HttpClient::new("https://api.usesynth.ai/api", "test_key", 30).unwrap();
518        assert_eq!(
519            client.abs_url("api/v1/jobs"),
520            "https://api.usesynth.ai/api/v1/jobs"
521        );
522    }
523
524    #[test]
525    fn test_http_error_display() {
526        let err = HttpError::from_response(404, "https://api.example.com/test", Some("not found"));
527        let msg = format!("{}", err);
528        assert!(msg.contains("404"));
529        assert!(msg.contains("api.example.com"));
530    }
531
532    #[tokio::test]
533    async fn test_http_client_x402_auto_retry() {
534        // Configure a local payer (client-side) private key.
535        std::env::set_var(
536            "SYNTH_X402_PRIVATE_KEY",
537            "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
538        );
539        let payer = X402Payer::from_env().unwrap();
540
541        // Start a tiny HTTP server that 402s once, then expects PAYMENT-SIGNATURE.
542        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
543        let addr = listener.local_addr().unwrap();
544        let base_url = format!("http://{}", addr);
545
546        // Build PAYMENT-REQUIRED header payload.
547        let pay_to = "0x1111111111111111111111111111111111111111".to_string();
548        let asset = "0x036CbD53842c5426634e7929541eC2318f3dCF7e".to_string();
549        let payment_required = PaymentRequired {
550            x402_version: 2,
551            error: Some("Payment required".to_string()),
552            resource: Some(ResourceInfo {
553                url: format!("{}/test", base_url),
554                description: Some("unit test".to_string()),
555                mime_type: Some("application/json".to_string()),
556            }),
557            accepts: vec![PaymentRequirements {
558                scheme: "exact".to_string(),
559                network: "eip155:84532".to_string(),
560                asset,
561                amount: "250000".to_string(),
562                pay_to,
563                max_timeout_seconds: 300,
564                extra: Some(serde_json::json!({"name": "USDC", "version": "2"})),
565            }],
566            extensions: None,
567        };
568        let required_header = base64::engine::general_purpose::STANDARD
569            .encode(serde_json::to_vec(&payment_required).unwrap());
570
571        let request_count = Arc::new(AtomicUsize::new(0));
572        let expected_payer = payer.address().to_string();
573
574        let required_header_clone = required_header.clone();
575        let request_count_clone = request_count.clone();
576        let expected_payer_clone = expected_payer.clone();
577
578        tokio::spawn(async move {
579            loop {
580                let (stream, _) = match listener.accept().await {
581                    Ok(value) => value,
582                    Err(_) => break,
583                };
584                let io = TokioIo::new(stream);
585                let required = required_header_clone.clone();
586                let count = request_count_clone.clone();
587                let expected = expected_payer_clone.clone();
588
589                tokio::spawn(async move {
590                    let svc = service_fn(move |req: Request<hyper::body::Incoming>| {
591                        let required = required.clone();
592                        let count = count.clone();
593                        let expected = expected.clone();
594
595                        async move {
596                            let n = count.fetch_add(1, Ordering::SeqCst);
597                            if n == 0 {
598                                let mut resp = Response::new(Full::new(Bytes::from_static(b"")));
599                                *resp.status_mut() = StatusCode::PAYMENT_REQUIRED;
600                                resp.headers_mut().insert(
601                                    PAYMENT_REQUIRED_HEADER,
602                                    HeaderValue::from_str(&required).unwrap(),
603                                );
604                                return Ok::<_, hyper::Error>(resp);
605                            }
606
607                            let sig_header = req
608                                .headers()
609                                .get(PAYMENT_SIGNATURE_HEADER)
610                                .and_then(|v| v.to_str().ok())
611                                .unwrap_or("");
612                            assert!(!sig_header.trim().is_empty());
613
614                            let payment_payload =
615                                decode_payment_signature_header(sig_header).unwrap();
616                            let recovered =
617                                recover_payer_address_from_payment_payload(&payment_payload)
618                                    .unwrap();
619                            assert_eq!(recovered, expected);
620
621                            let body =
622                                serde_json::to_vec(&serde_json::json!({"ok": true})).unwrap();
623                            let mut resp = Response::new(Full::new(Bytes::from(body)));
624                            *resp.status_mut() = StatusCode::OK;
625                            Ok::<_, hyper::Error>(resp)
626                        }
627                    });
628
629                    let _ = Builder::new(TokioExecutor::new())
630                        .serve_connection(io, svc)
631                        .await;
632                });
633            }
634        });
635
636        // Client should transparently retry and succeed.
637        let client = HttpClient::new(&base_url, "test_key", 30).unwrap();
638
639        // Remove env var immediately to avoid leaking into other tests.
640        std::env::remove_var("SYNTH_X402_PRIVATE_KEY");
641
642        let result: Value = client.get_json("/test", None).await.unwrap();
643        assert_eq!(result.get("ok").and_then(|v| v.as_bool()), Some(true));
644        assert_eq!(request_count.load(Ordering::SeqCst), 2);
645    }
646}