Skip to main content

scrapfly_sdk/
client.rs

1//! HTTP client for the Scrapfly API.
2//!
3//! Built on `reqwest` with `rustls`. Single shared [`reqwest::Client`]
4//! re-used across every call.
5
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::Duration;
9
10use futures_util::stream::{Stream, StreamExt};
11use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE, USER_AGENT};
12use reqwest::{Method, Response, Url};
13
14use crate::config::crawler::CrawlerConfig;
15use crate::config::extraction::ExtractionConfig;
16use crate::config::scrape::ScrapeConfig;
17use crate::config::screenshot::ScreenshotConfig;
18use crate::enums::HttpMethod;
19use crate::error::{from_response, parse_retry_after, ApiError, ScrapflyError};
20use crate::monitoring::{
21    CloudBrowserMonitoringOptions, MonitoringDataFormat, MonitoringMetricsOptions,
22    MonitoringTargetMetricsOptions,
23};
24use crate::result::account::{AccountData, VerifyApiKeyResult};
25use crate::result::classify::{ClassifyRequest, ClassifyResult};
26use crate::result::crawler::{
27    CrawlerArtifact, CrawlerArtifactType, CrawlerContents, CrawlerStartResponse, CrawlerStatus,
28    CrawlerUrls,
29};
30use crate::result::extraction::ExtractionResult;
31use crate::result::scrape::{ResultData, ScrapeResult};
32use crate::result::screenshot::{ScreenshotMetadata, ScreenshotResult};
33
/// Base URL of the Scrapfly API; used when the builder receives no `host` override.
const DEFAULT_HOST: &str = "https://api.scrapfly.io";
/// Base URL of the Cloud Browser service; used when the builder receives no
/// `cloud_browser_host` override.
const DEFAULT_CLOUD_BROWSER_HOST: &str = "https://browser.scrapfly.io";
/// `User-Agent` value the SDK sends on its requests.
const SDK_USER_AGENT: &str = "Scrapfly-Rust-SDK";
// Retry defaults. NOTE(review): the code consuming these two values is not
// visible in this part of the file — presumably the retrying send helper.
const DEFAULT_RETRIES: usize = 3;
const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(1);
/// HTTP timeout applied when the builder receives no `timeout` override.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(150);
40
/// Request-inspection callback. Fires right before `send()`.
///
/// Receives the outgoing HTTP method, the fully built URL and the request
/// headers. Used by the integration harness to record the outgoing
/// method/URL/headers without wrapping the `reqwest::Client` in a
/// middleware layer. Stored behind an `Arc`, so it is shared by every
/// clone of the [`Client`].
pub type OnRequest = Arc<dyn Fn(&Method, &Url, &HeaderMap) + Send + Sync>;
46
/// Scrapfly API client. Cheap to `Clone` (the inner `reqwest::Client` is
/// `Arc`'d so all clones share one connection pool).
///
/// Construct one through [`Client::builder`].
#[derive(Clone)]
pub struct Client {
    // Underlying HTTP client used for every request.
    http: reqwest::Client,
    // API key; appended to request URLs as the `key` query parameter.
    key: String,
    // Base URL that API paths are joined onto.
    host: String,
    // Base URL of the Cloud Browser service.
    cloud_browser_host: String,
    // Optional hook invoked with method/URL/headers right before a request is sent.
    on_request: Option<OnRequest>,
}
57
58impl std::fmt::Debug for Client {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("Client")
61            .field("host", &self.host)
62            .field("cloud_browser_host", &self.cloud_browser_host)
63            .finish()
64    }
65}
66
/// Builder for [`Client`].
///
/// Every field starts unset (`Default`); only the API key is mandatory when
/// [`ClientBuilder::build`] is called.
#[derive(Default)]
pub struct ClientBuilder {
    // Required; `build` fails when missing or empty.
    api_key: Option<String>,
    // Overrides `DEFAULT_HOST` when set.
    host: Option<String>,
    // Overrides `DEFAULT_CLOUD_BROWSER_HOST` when set.
    cloud_browser_host: Option<String>,
    // Overrides `DEFAULT_TIMEOUT`; ignored when `http_client` is injected.
    timeout: Option<Duration>,
    // Disables TLS certificate verification; ignored when `http_client` is injected.
    danger_accept_invalid_certs: bool,
    // Pre-built HTTP client used verbatim instead of constructing one.
    http_client: Option<reqwest::Client>,
    // Optional pre-send request callback handed to the built client.
    on_request: Option<OnRequest>,
}
78
79impl ClientBuilder {
80    /// Set the API key (required).
81    pub fn api_key(mut self, key: impl Into<String>) -> Self {
82        self.api_key = Some(key.into());
83        self
84    }
85    /// Override the API host.
86    pub fn host(mut self, host: impl Into<String>) -> Self {
87        self.host = Some(host.into());
88        self
89    }
90    /// Override the Cloud Browser host (`https://browser.scrapfly.io`).
91    pub fn cloud_browser_host(mut self, host: impl Into<String>) -> Self {
92        self.cloud_browser_host = Some(host.into());
93        self
94    }
95    /// Override the HTTP timeout (default 150s).
96    pub fn timeout(mut self, t: Duration) -> Self {
97        self.timeout = Some(t);
98        self
99    }
100    /// Accept invalid TLS certificates (tests / self-signed dev hosts).
101    pub fn danger_accept_invalid_certs(mut self, v: bool) -> Self {
102        self.danger_accept_invalid_certs = v;
103        self
104    }
105    /// Inject a pre-built `reqwest::Client`. Bypasses the timeout /
106    /// TLS-verify options.
107    pub fn http_client(mut self, client: reqwest::Client) -> Self {
108        self.http_client = Some(client);
109        self
110    }
111    /// Install a pre-send request callback (used by the integration runner
112    /// to capture SDK-layer attribution without installing middleware).
113    pub fn on_request(mut self, cb: OnRequest) -> Self {
114        self.on_request = Some(cb);
115        self
116    }
117    /// Build the client.
118    pub fn build(self) -> Result<Client, ScrapflyError> {
119        let key = self.api_key.ok_or(ScrapflyError::BadApiKey)?;
120        if key.is_empty() {
121            return Err(ScrapflyError::BadApiKey);
122        }
123
124        let http = if let Some(c) = self.http_client {
125            c
126        } else {
127            let mut builder = reqwest::Client::builder()
128                .timeout(self.timeout.unwrap_or(DEFAULT_TIMEOUT))
129                .user_agent(SDK_USER_AGENT);
130            if self.danger_accept_invalid_certs {
131                builder = builder.danger_accept_invalid_certs(true);
132            }
133            builder.build().map_err(ScrapflyError::Transport)?
134        };
135
136        Ok(Client {
137            http,
138            key,
139            host: self.host.unwrap_or_else(|| DEFAULT_HOST.to_string()),
140            cloud_browser_host: self
141                .cloud_browser_host
142                .unwrap_or_else(|| DEFAULT_CLOUD_BROWSER_HOST.to_string()),
143            on_request: self.on_request,
144        })
145    }
146}
147
148impl Client {
149    /// Start a new [`ClientBuilder`].
150    pub fn builder() -> ClientBuilder {
151        ClientBuilder::default()
152    }
153
154    /// Return the configured API key.
155    pub fn api_key(&self) -> &str {
156        &self.key
157    }
158
159    /// Return the configured API host.
160    pub fn host(&self) -> &str {
161        &self.host
162    }
163
164    /// Return the configured Cloud Browser host.
165    pub fn cloud_browser_host(&self) -> &str {
166        &self.cloud_browser_host
167    }
168
169    /// Build a URL by joining `path` onto the configured host.
170    /// Crate-internal shim over `build_url`, used by `schedule.rs` to share
171    /// the same auth + host wiring as the rest of the SDK without exposing
172    /// the helper publicly.
173    pub(crate) fn build_url_public(
174        &self,
175        path: &str,
176        query: &[(String, String)],
177    ) -> Result<Url, ScrapflyError> {
178        self.build_url(path, query)
179    }
180
181    /// Crate-internal shim over `send_simple` — same rationale as
182    /// `build_url_public`.
183    pub(crate) async fn send_simple_public(
184        &self,
185        method: Method,
186        url: Url,
187        headers: Option<HeaderMap>,
188        body: Option<Vec<u8>>,
189    ) -> Result<Response, ScrapflyError> {
190        self.send_simple(method, url, headers, body).await
191    }
192
193    fn build_url(&self, path: &str, query: &[(String, String)]) -> Result<Url, ScrapflyError> {
194        let mut u = Url::parse(&format!("{}{}", self.host, path))
195            .map_err(|e| ScrapflyError::Config(format!("invalid url: {}", e)))?;
196        {
197            let mut pairs = u.query_pairs_mut();
198            pairs.append_pair("key", &self.key);
199            for (k, v) in query {
200                pairs.append_pair(k, v);
201            }
202        }
203        Ok(u)
204    }
205
206    /// Verify the API key by hitting `/account`.
207    pub async fn verify_api_key(&self) -> Result<VerifyApiKeyResult, ScrapflyError> {
208        let url = self.build_url("/account", &[])?;
209        let resp = self.send_simple(Method::GET, url, None, None).await?;
210        Ok(VerifyApiKeyResult {
211            valid: resp.status().is_success(),
212        })
213    }
214
215    /// Fetch account info.
216    pub async fn account(&self) -> Result<AccountData, ScrapflyError> {
217        let url = self.build_url("/account", &[])?;
218        let resp = self.send_simple(Method::GET, url, None, None).await?;
219        let (status, _headers, body) = read_response(resp).await?;
220        if status != 200 {
221            return Err(from_response(status, &body, 0, false));
222        }
223        Ok(serde_json::from_slice(&body)?)
224    }
225
226    /// Classify an already-fetched HTTP response for anti-bot blocking.
227    ///
228    /// Runs the same detection pipeline used by every live Scrapfly scrape
229    /// against a response you already have (from your own proxy, cache, etc).
230    /// 1 API credit per successful call. See
231    /// <https://scrapfly.io/docs/scrape-api/classify>.
232    pub async fn classify(&self, req: &ClassifyRequest) -> Result<ClassifyResult, ScrapflyError> {
233        if req.url.is_empty() {
234            return Err(ScrapflyError::Config("classify: url is required".into()));
235        }
236        if !(100..=599).contains(&req.status_code) {
237            return Err(ScrapflyError::Config(
238                "classify: status_code must be in [100, 599]".into(),
239            ));
240        }
241
242        let url = self.build_url("/classify", &[])?;
243        let body = serde_json::to_vec(req)
244            .map_err(|e| ScrapflyError::Config(format!("marshal classify request: {}", e)))?;
245
246        let mut headers = HeaderMap::new();
247        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
248        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
249
250        let resp = self
251            .send_simple(Method::POST, url, Some(headers), Some(body))
252            .await?;
253        let (status, _headers, bytes) = read_response(resp).await?;
254        if status >= 400 {
255            return Err(from_response(status, &bytes, 0, false));
256        }
257        let out: ClassifyResult = serde_json::from_slice(&bytes)
258            .map_err(|e| ScrapflyError::Config(format!("decode classify response: {}", e)))?;
259        Ok(out)
260    }
261
262    // ── Monitoring API (Enterprise+ plan only) ──────────────────────
263    // The Monitoring API exposes per-product aggregates and per-target
264    // timeseries. Web Scraping / Screenshot / Extraction / Crawler share
265    // one shape (request-based) but live under different URL prefixes;
266    // Cloud Browser is session-based and exposes a distinct shape.
267    // See <https://scrapfly.io/docs/monitoring#api>.
268
269    fn build_metrics_pairs(opts: &MonitoringMetricsOptions) -> Vec<(String, String)> {
270        let mut pairs: Vec<(String, String)> = Vec::new();
271        let format = opts.format.unwrap_or(MonitoringDataFormat::Structured);
272        pairs.push(("format".into(), format.as_str().into()));
273        if let Some(p) = opts.period {
274            pairs.push(("period".into(), p.as_str().into()));
275        }
276        if let Some(ref aggs) = opts.aggregation {
277            if !aggs.is_empty() {
278                let joined = aggs
279                    .iter()
280                    .map(|a| a.as_str())
281                    .collect::<Vec<_>>()
282                    .join(",");
283                pairs.push(("aggregation".into(), joined));
284            }
285        }
286        if opts.include_webhook {
287            pairs.push(("include_webhook".into(), "true".into()));
288        }
289        pairs
290    }
291
292    fn build_target_pairs(
293        opts: &MonitoringTargetMetricsOptions,
294    ) -> Result<Vec<(String, String)>, ScrapflyError> {
295        if opts.domain.is_empty() {
296            return Err(ScrapflyError::Config(
297                "monitoring target metrics: domain is required".into(),
298            ));
299        }
300        if opts.start.is_some() != opts.end.is_some() {
301            return Err(ScrapflyError::Config(
302                "monitoring target metrics: start and end must be provided together".into(),
303            ));
304        }
305        let mut pairs: Vec<(String, String)> = Vec::new();
306        pairs.push(("domain".into(), opts.domain.clone()));
307        pairs.push(("group_subdomain".into(), opts.group_subdomain.to_string()));
308        match (&opts.start, &opts.end) {
309            (Some(s), Some(e)) => {
310                pairs.push(("start".into(), s.clone()));
311                pairs.push(("end".into(), e.clone()));
312            }
313            _ => {
314                let period = opts
315                    .period
316                    .unwrap_or(crate::monitoring::MonitoringPeriod::Last24h);
317                pairs.push(("period".into(), period.as_str().into()));
318            }
319        }
320        if opts.include_webhook {
321            pairs.push(("include_webhook".into(), "true".into()));
322        }
323        Ok(pairs)
324    }
325
326    async fn fetch_monitoring_json(
327        &self,
328        path: &str,
329        pairs: &[(String, String)],
330    ) -> Result<serde_json::Value, ScrapflyError> {
331        let url = self.build_url(path, pairs)?;
332        let resp = self.send_simple(Method::GET, url, None, None).await?;
333        let (status, _headers, body) = read_response(resp).await?;
334        if status != 200 {
335            return Err(from_response(status, &body, 0, false));
336        }
337        Ok(serde_json::from_slice(&body)?)
338    }
339
340    // ── Web Scraping API ─────────────────────────────────────────────
341
342    /// Fetch aggregate monitoring metrics for the Web Scraping API.
343    pub async fn get_monitoring_metrics(
344        &self,
345        opts: &MonitoringMetricsOptions,
346    ) -> Result<serde_json::Value, ScrapflyError> {
347        self.fetch_monitoring_json(
348            "/scrape/monitoring/metrics",
349            &Self::build_metrics_pairs(opts),
350        )
351        .await
352    }
353
354    /// Fetch per-target monitoring metrics for the Web Scraping API.
355    pub async fn get_monitoring_target_metrics(
356        &self,
357        opts: &MonitoringTargetMetricsOptions,
358    ) -> Result<serde_json::Value, ScrapflyError> {
359        let pairs = Self::build_target_pairs(opts)?;
360        self.fetch_monitoring_json("/scrape/monitoring/metrics/target", &pairs)
361            .await
362    }
363
364    // ── Screenshot API ───────────────────────────────────────────────
365
366    /// Fetch aggregate monitoring metrics for the Screenshot API.
367    pub async fn get_screenshot_monitoring_metrics(
368        &self,
369        opts: &MonitoringMetricsOptions,
370    ) -> Result<serde_json::Value, ScrapflyError> {
371        self.fetch_monitoring_json(
372            "/screenshot/monitoring/metrics",
373            &Self::build_metrics_pairs(opts),
374        )
375        .await
376    }
377
378    /// Fetch per-target monitoring metrics for the Screenshot API.
379    pub async fn get_screenshot_monitoring_target_metrics(
380        &self,
381        opts: &MonitoringTargetMetricsOptions,
382    ) -> Result<serde_json::Value, ScrapflyError> {
383        let pairs = Self::build_target_pairs(opts)?;
384        self.fetch_monitoring_json("/screenshot/monitoring/metrics/target", &pairs)
385            .await
386    }
387
388    // ── Extraction API ───────────────────────────────────────────────
389
390    /// Fetch aggregate monitoring metrics for the Extraction API.
391    pub async fn get_extraction_monitoring_metrics(
392        &self,
393        opts: &MonitoringMetricsOptions,
394    ) -> Result<serde_json::Value, ScrapflyError> {
395        self.fetch_monitoring_json(
396            "/extraction/monitoring/metrics",
397            &Self::build_metrics_pairs(opts),
398        )
399        .await
400    }
401
402    /// Fetch per-target monitoring metrics for the Extraction API.
403    pub async fn get_extraction_monitoring_target_metrics(
404        &self,
405        opts: &MonitoringTargetMetricsOptions,
406    ) -> Result<serde_json::Value, ScrapflyError> {
407        let pairs = Self::build_target_pairs(opts)?;
408        self.fetch_monitoring_json("/extraction/monitoring/metrics/target", &pairs)
409            .await
410    }
411
412    // ── Crawler API ──────────────────────────────────────────────────
413
414    /// Fetch aggregate monitoring metrics for the Crawler API.
415    pub async fn get_crawler_monitoring_metrics(
416        &self,
417        opts: &MonitoringMetricsOptions,
418    ) -> Result<serde_json::Value, ScrapflyError> {
419        self.fetch_monitoring_json(
420            "/crawl/monitoring/metrics",
421            &Self::build_metrics_pairs(opts),
422        )
423        .await
424    }
425
426    /// Fetch per-target monitoring metrics for the Crawler API.
427    pub async fn get_crawler_monitoring_target_metrics(
428        &self,
429        opts: &MonitoringTargetMetricsOptions,
430    ) -> Result<serde_json::Value, ScrapflyError> {
431        let pairs = Self::build_target_pairs(opts)?;
432        self.fetch_monitoring_json("/crawl/monitoring/metrics/target", &pairs)
433            .await
434    }
435
436    // ── Cloud Browser API (session-based, distinct shape) ────────────
437
438    /// Fetch aggregate monitoring metrics for the Cloud Browser API.
439    pub async fn get_browser_monitoring_metrics(
440        &self,
441        opts: &CloudBrowserMonitoringOptions,
442    ) -> Result<serde_json::Value, ScrapflyError> {
443        let pairs = Self::build_browser_pairs(opts)?;
444        self.fetch_monitoring_json("/browser/monitoring/metrics", &pairs)
445            .await
446    }
447
448    /// Fetch monitoring time-series for the Cloud Browser API.
449    pub async fn get_browser_monitoring_timeseries(
450        &self,
451        opts: &CloudBrowserMonitoringOptions,
452    ) -> Result<serde_json::Value, ScrapflyError> {
453        let pairs = Self::build_browser_pairs(opts)?;
454        self.fetch_monitoring_json("/browser/monitoring/metrics/timeseries", &pairs)
455            .await
456    }
457
458    fn build_browser_pairs(
459        opts: &CloudBrowserMonitoringOptions,
460    ) -> Result<Vec<(String, String)>, ScrapflyError> {
461        if opts.start.is_some() != opts.end.is_some() {
462            return Err(ScrapflyError::Config(
463                "cloud browser monitoring: start and end must be provided together".into(),
464            ));
465        }
466        let mut pairs: Vec<(String, String)> = Vec::new();
467        match (&opts.start, &opts.end) {
468            (Some(s), Some(e)) => {
469                pairs.push(("start".into(), s.clone()));
470                pairs.push(("end".into(), e.clone()));
471            }
472            _ => {
473                if let Some(p) = opts.period {
474                    pairs.push(("period".into(), p.as_str().into()));
475                }
476            }
477        }
478        if let Some(ref pool) = opts.proxy_pool {
479            pairs.push(("proxy_pool".into(), pool.clone()));
480        }
481        Ok(pairs)
482    }
483
484    /// Scrape a URL.
485    pub async fn scrape(&self, config: &ScrapeConfig) -> Result<ScrapeResult, ScrapflyError> {
486        let pairs = config.to_query_pairs()?;
487        let url = self.build_url("/scrape", &pairs)?;
488        let method = match config.method {
489            Some(m) => Method::from_bytes(m.as_str().as_bytes())
490                .map_err(|e| ScrapflyError::Config(format!("invalid method: {}", e)))?,
491            None => Method::GET,
492        };
493        let mut headers = HeaderMap::new();
494        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
495        let body = config.body.clone();
496        let resp = self
497            .send_with_retry(method, url, Some(headers), body.map(|b| b.into_bytes()))
498            .await?;
499        let (status, _h, body_bytes) = read_response(resp).await?;
500        if status != 200 {
501            return Err(from_response(status, &body_bytes, 0, false));
502        }
503        // HEAD has no body per HTTP spec, so the Scrapfly API returns a 200
504        // with an empty body — there's no JSON envelope to parse. Synthesize
505        // a minimal ScrapeResult so callers still get a typed response with
506        // status_code=200 and an empty content string. Matches Python SDK
507        // behavior, which tolerates an empty body_handler read on HEAD.
508        if matches!(config.method, Some(HttpMethod::Head)) && body_bytes.is_empty() {
509            return Ok(ScrapeResult {
510                uuid: String::new(),
511                config: serde_json::Value::Null,
512                context: serde_json::Value::Null,
513                result: ResultData {
514                    status_code: 200,
515                    success: true,
516                    ..Default::default()
517                },
518            });
519        }
520        let mut result: ScrapeResult = serde_json::from_slice(&body_bytes)?;
521        // Upstream failure handling: the Scrapfly API call itself may succeed
522        // (HTTP 200) while the *target* site returned a failure. In that case
523        // result.result.success is false and we must surface it as an error
524        // variant so callers can `match` on it. Mirrors the Go SDK behavior
525        // in `sdk/go/client.go::checkResult` (4xx → UpstreamClient,
526        // 5xx → UpstreamServer).
527        if !result.result.success {
528            let (err_code, err_message, err_doc) = match &result.result.error {
529                Some(e) => (e.code.clone(), e.message.clone(), e.doc_url.clone()),
530                None => (
531                    result.result.status.clone(),
532                    format!(
533                        "scrape failed with status_code={}",
534                        result.result.status_code
535                    ),
536                    String::new(),
537                ),
538            };
539            let api_err = ApiError {
540                code: err_code,
541                message: err_message,
542                http_status: result.result.status_code,
543                documentation_url: err_doc,
544                hint: String::new(),
545                retry_after_ms: 0,
546            };
547            let sc = result.result.status_code;
548            if (400..500).contains(&sc) {
549                return Err(ScrapflyError::UpstreamClient(api_err));
550            }
551            if (500..600).contains(&sc) {
552                return Err(ScrapflyError::UpstreamServer(api_err));
553            }
554            // Unknown status code (e.g. 0, timeouts) — fall through to generic
555            // Api error rather than silently returning a failed result.
556            return Err(ScrapflyError::Api(api_err));
557        }
558        // Transparent large-object handling: when a scrape response is too
559        // large, the engine offloads the body to a signed URL and sets
560        // `format=clob|blob`, stashing the URL in `content`. The SDK must
561        // auto-fetch and surface the final bytes + a user-friendly format
562        // marker (clob→text, blob→binary). Mirrors `sdk/go/client.go::handleLargeObjects`.
563        if result.result.success && result.result.status == "DONE" {
564            let fmt = result.result.format.as_str();
565            if fmt == "clob" || fmt == "blob" {
566                let (new_content, new_format) =
567                    self.fetch_large_object(&result.result.content, fmt).await?;
568                result.result.content = new_content;
569                result.result.format = new_format;
570            }
571        }
572        Ok(result)
573    }
574
575    /// Fetch an offloaded large-object body from its signed URL, re-attaching
576    /// the API key as a query param. Returns `(content, format)`:
577    /// `clob → ("…text…", "text")`, `blob → ("…bytes as lossy utf8…", "binary")`.
578    async fn fetch_large_object(
579        &self,
580        content_url: &str,
581        format: &str,
582    ) -> Result<(String, String), ScrapflyError> {
583        let mut url = Url::parse(content_url)
584            .map_err(|e| ScrapflyError::Config(format!("invalid large-object url: {}", e)))?;
585        // Append the API key without clobbering existing query params.
586        {
587            let existing: Vec<(String, String)> = url
588                .query_pairs()
589                .filter(|(k, _)| k != "key")
590                .map(|(k, v)| (k.into_owned(), v.into_owned()))
591                .collect();
592            let mut qs = url.query_pairs_mut();
593            qs.clear();
594            for (k, v) in existing {
595                qs.append_pair(&k, &v);
596            }
597            qs.append_pair("key", self.api_key());
598        }
599        let mut headers = HeaderMap::new();
600        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
601        let resp = self
602            .send_with_retry(Method::GET, url, Some(headers), None)
603            .await?;
604        let (status, _h, body) = read_response(resp).await?;
605        if status != 200 {
606            return Err(from_response(status, &body, 0, false));
607        }
608        let new_format = match format {
609            "clob" => "text",
610            "blob" => "binary",
611            _ => {
612                return Err(ScrapflyError::Config(format!(
613                    "unsupported large-object format: {}",
614                    format
615                )))
616            }
617        };
618        // For blob (binary PDF, image, etc.) we use from_utf8_lossy to
619        // preserve the raw bytes in the `content` string field, matching
620        // the Go/Python SDKs' behavior.
621        let content = String::from_utf8_lossy(&body).into_owned();
622        Ok((content, new_format.to_string()))
623    }
624
625    /// Concurrent-scrape stream. Emits results in completion order.
626    pub fn concurrent_scrape<'a, I>(
627        &'a self,
628        configs: I,
629        concurrency_limit: usize,
630    ) -> impl Stream<Item = Result<ScrapeResult, ScrapflyError>> + 'a
631    where
632        I: IntoIterator<Item = ScrapeConfig> + 'a,
633        <I as IntoIterator>::IntoIter: 'a,
634    {
635        let limit = if concurrency_limit == 0 {
636            5
637        } else {
638            concurrency_limit
639        };
640        futures_util::stream::iter(
641            configs
642                .into_iter()
643                .map(move |cfg| async move { self.scrape(&cfg).await }),
644        )
645        .buffer_unordered(limit)
646    }
647
648    /// POST /scrape/batch: scrape up to 100 URLs and stream results
649    /// back as each scrape completes. Returns an async stream where
650    /// each item is `(correlation_id, Result<ScrapeResult, ScrapflyError>)`.
651    ///
652    /// Results arrive OUT OF ORDER — whichever scrape finishes first
653    /// is yielded first. Every `ScrapeConfig` MUST carry a unique
654    /// `correlation_id`; missing / duplicate values are caught
655    /// client-side before the request is sent.
656    ///
657    /// Batch-level failures (plan gate, insufficient concurrency,
658    /// validation) surface as the outer `Err(ScrapflyError)` returned
659    /// from the `await` — the stream is only created after the
660    /// batch request succeeds.
661    pub async fn scrape_batch(
662        &self,
663        configs: &[ScrapeConfig],
664    ) -> Result<impl Stream<Item = (String, crate::batch::BatchOutcome)>, ScrapflyError> {
665        self.scrape_batch_with_options(configs, crate::batch::BatchOptions::default())
666            .await
667    }
668
669    /// Like `scrape_batch` but with explicit `BatchOptions`
670    /// (msgpack wire format, etc.).
671    pub async fn scrape_batch_with_options(
672        &self,
673        configs: &[ScrapeConfig],
674        opts: crate::batch::BatchOptions,
675    ) -> Result<impl Stream<Item = (String, crate::batch::BatchOutcome)>, ScrapflyError> {
676        use crate::batch::{
677            build_proxified_response, decode_part_body, parts_from_response, BatchOutcome,
678        };
679
680        if configs.is_empty() {
681            return Err(ScrapflyError::Config(
682                "scrape_batch: configs is empty".into(),
683            ));
684        }
685
686        if configs.len() > 100 {
687            return Err(ScrapflyError::Config(format!(
688                "scrape_batch: max 100 configs per batch (got {})",
689                configs.len()
690            )));
691        }
692
693        let mut seen: HashMap<String, usize> = HashMap::new();
694        let mut body_configs: Vec<HashMap<String, String>> = Vec::with_capacity(configs.len());
695
696        for (i, cfg) in configs.iter().enumerate() {
697            let correlation_id = cfg.correlation_id.clone().ok_or_else(|| {
698                ScrapflyError::Config(format!(
699                    "scrape_batch: configs[{}] is missing correlation_id (required for matching streamed parts)",
700                    i
701                ))
702            })?;
703
704            if let Some(prev) = seen.get(&correlation_id) {
705                return Err(ScrapflyError::Config(format!(
706                    "scrape_batch: correlation_id {:?} reused by configs[{}] and configs[{}]",
707                    correlation_id, prev, i
708                )));
709            }
710
711            seen.insert(correlation_id.clone(), i);
712
713            let pairs = cfg.to_query_pairs()?;
714            let mut entry: HashMap<String, String> = HashMap::with_capacity(pairs.len());
715
716            for (k, v) in pairs {
717                if k == "key" {
718                    continue;
719                }
720
721                entry.insert(k, v);
722            }
723
724            body_configs.push(entry);
725        }
726
727        let body = serde_json::json!({ "configs": body_configs });
728        let body_bytes = serde_json::to_vec(&body)?;
729
730        let mut url = Url::parse(&self.host)
731            .map_err(|e| ScrapflyError::Config(format!("invalid host: {}", e)))?;
732        url.set_path("/scrape/batch");
733        url.query_pairs_mut().append_pair("key", &self.key);
734
735        let mut headers = HeaderMap::new();
736        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
737        headers.insert(
738            ACCEPT,
739            HeaderValue::from_static(opts.format.accept_header()),
740        );
741        headers.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
742
743        let method = Method::POST;
744
745        if let Some(cb) = &self.on_request {
746            cb(&method, &url, &headers);
747        }
748
749        let resp = self
750            .http
751            .request(method, url)
752            .headers(headers)
753            .body(body_bytes)
754            .send()
755            .await
756            .map_err(|e| ScrapflyError::Config(format!("scrape_batch: send: {}", e)))?;
757
758        let status = resp.status().as_u16();
759
760        if status != 200 {
761            let body_bytes = resp.bytes().await.unwrap_or_default();
762
763            return Err(from_response(status, &body_bytes, 0, false));
764        }
765
766        let parts_stream = parts_from_response(resp)?;
767
768        Ok(parts_stream.map(|part_r| match part_r {
769            Ok(part) => {
770                let correlation_id = part
771                    .headers
772                    .get("x-scrapfly-correlation-id")
773                    .cloned()
774                    .unwrap_or_default();
775
776                // Proxified-response parts: the part body is the raw
777                // upstream bytes, not a JSON envelope. Surface as a
778                // BatchProxifiedResponse rather than attempting to
779                // decode the body as JSON.
780                if part
781                    .headers
782                    .get("x-scrapfly-proxified")
783                    .map(|v| v == "true")
784                    .unwrap_or(false)
785                {
786                    let prox = build_proxified_response(part);
787                    return (correlation_id, BatchOutcome::Proxified(prox));
788                }
789
790                match decode_part_body::<ScrapeResult>(&part) {
791                    Ok(r) => (correlation_id, BatchOutcome::Scrape(r)),
792                    Err(e) => (correlation_id, BatchOutcome::Err(e)),
793                }
794            }
795            Err(e) => (String::new(), BatchOutcome::Err(e)),
796        }))
797    }
798
799    /// Scrape a URL with `proxified_response=true`, returning the raw
800    /// upstream `reqwest::Response` (target's status, headers, body).
801    ///
802    /// Unlike [`scrape()`], no JSON parsing occurs — the response body is
803    /// the target page's raw content. Scrapfly metadata is available on
804    /// the `X-Scrapfly-*` response headers (`Api-Cost`, `Content-Format`,
805    /// `Log`, etc.).
806    ///
807    /// Automatically forces `proxified_response=true` regardless of the
808    /// config's field value.
809    pub async fn scrape_proxified(
810        &self,
811        config: &ScrapeConfig,
812    ) -> Result<reqwest::Response, ScrapflyError> {
813        let mut cfg = config.clone();
814        cfg.proxified_response = true;
815        let pairs = cfg.to_query_pairs()?;
816        let url = self.build_url("/scrape", &pairs)?;
817        let method = match cfg.method {
818            Some(m) => Method::from_bytes(m.as_str().as_bytes())
819                .map_err(|e| ScrapflyError::Config(format!("invalid method: {}", e)))?,
820            None => Method::GET,
821        };
822        let body = cfg.body.clone();
823        let resp = self
824            .send_with_retry(method, url, None, body.map(|b| b.into_bytes()))
825            .await?;
826        // Error restoration: if X-Scrapfly-Reject-Code is present, the
827        // scrape failed. Return a typed error so callers get the same
828        // interface as non-proxified mode.
829        if let Some(reject_code) = resp.headers().get("x-scrapfly-reject-code") {
830            let code = reject_code.to_str().unwrap_or("").to_string();
831            let desc = resp
832                .headers()
833                .get("x-scrapfly-reject-description")
834                .and_then(|v| v.to_str().ok())
835                .unwrap_or("")
836                .to_string();
837            let retryable = resp
838                .headers()
839                .get("x-scrapfly-reject-retryable")
840                .and_then(|v| v.to_str().ok())
841                .unwrap_or("false")
842                == "true";
843            let retry_after_ms: u64 = if retryable {
844                resp.headers()
845                    .get("retry-after")
846                    .and_then(|v| v.to_str().ok())
847                    .and_then(|v| v.parse::<u64>().ok())
848                    .unwrap_or(0)
849                    * 1000 // Retry-After header is in seconds
850            } else {
851                0
852            };
853            let status = resp.status().as_u16();
854            let doc = resp
855                .headers()
856                .get("x-scrapfly-reject-doc")
857                .and_then(|v| v.to_str().ok())
858                .unwrap_or("")
859                .to_string();
860            return Err(ScrapflyError::Api(crate::error::ApiError {
861                code,
862                message: format!("Proxified scrape error: {}", desc),
863                http_status: status,
864                documentation_url: doc,
865                hint: String::new(),
866                retry_after_ms,
867            }));
868        }
869        Ok(resp)
870    }
871
872    /// Screenshot a URL.
873    pub async fn screenshot(
874        &self,
875        config: &ScreenshotConfig,
876    ) -> Result<ScreenshotResult, ScrapflyError> {
877        let pairs = config.to_query_pairs()?;
878        let url = self.build_url("/screenshot", &pairs)?;
879        let resp = self.send_with_retry(Method::GET, url, None, None).await?;
880        let (status, headers, body) = read_response(resp).await?;
881        if status != 200 {
882            return Err(from_response(status, &body, 0, false));
883        }
884        let content_type = headers
885            .get(CONTENT_TYPE)
886            .and_then(|v| v.to_str().ok())
887            .unwrap_or("application/octet-stream");
888        let ext = content_type
889            .split('/')
890            .nth(1)
891            .and_then(|s| s.split(';').next())
892            .unwrap_or("bin")
893            .to_string();
894        let upstream_status_code: u16 = headers
895            .get("x-scrapfly-upstream-http-code")
896            .and_then(|v| v.to_str().ok())
897            .and_then(|s| s.parse().ok())
898            .unwrap_or(0);
899        let upstream_url = headers
900            .get("x-scrapfly-upstream-url")
901            .and_then(|v| v.to_str().ok())
902            .unwrap_or("")
903            .to_string();
904        Ok(ScreenshotResult {
905            image: body,
906            metadata: ScreenshotMetadata {
907                extension_name: ext,
908                upstream_status_code,
909                upstream_url,
910            },
911        })
912    }
913
914    /// Run AI extraction on a document.
915    pub async fn extract(
916        &self,
917        config: &ExtractionConfig,
918    ) -> Result<ExtractionResult, ScrapflyError> {
919        let pairs = config.to_query_pairs()?;
920        let url = self.build_url("/extraction", &pairs)?;
921        let mut headers = HeaderMap::new();
922        headers.insert(
923            CONTENT_TYPE,
924            HeaderValue::from_str(&config.content_type)
925                .map_err(|e| ScrapflyError::Config(format!("invalid content-type: {}", e)))?,
926        );
927        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
928        if let Some(fmt) = config.document_compression_format {
929            headers.insert(
930                "content-encoding",
931                HeaderValue::from_str(fmt.as_str())
932                    .map_err(|e| ScrapflyError::Config(format!("invalid encoding: {}", e)))?,
933            );
934        }
935        let resp = self
936            .send_with_retry(Method::POST, url, Some(headers), Some(config.body.clone()))
937            .await?;
938        let (status, _h, body_bytes) = read_response(resp).await?;
939        if status != 200 {
940            return Err(from_response(status, &body_bytes, 0, false));
941        }
942        Ok(serde_json::from_slice(&body_bytes)?)
943    }
944
945    // ==============================================================================
946    // Crawler methods
947    // ==============================================================================
948
949    /// Schedule a new crawler job.
950    pub async fn start_crawl(
951        &self,
952        config: &CrawlerConfig,
953    ) -> Result<CrawlerStartResponse, ScrapflyError> {
954        let body = config.to_json_body()?;
955        let url = self.build_url("/crawl", &[])?;
956        let mut headers = HeaderMap::new();
957        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
958        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
959        let resp = self
960            .send_with_retry(Method::POST, url, Some(headers), Some(body))
961            .await?;
962        let (status, _h, body_bytes) = read_response(resp).await?;
963        if status != 200 && status != 201 {
964            return Err(from_response(status, &body_bytes, 0, true));
965        }
966        let parsed: CrawlerStartResponse = serde_json::from_slice(&body_bytes)?;
967        if parsed.crawler_uuid.is_empty() {
968            return Err(ScrapflyError::UnexpectedResponseFormat(
969                "crawler start response missing crawler_uuid".into(),
970            ));
971        }
972        Ok(parsed)
973    }
974
975    /// Fetch crawler status.
976    pub async fn crawl_status(&self, uuid: &str) -> Result<CrawlerStatus, ScrapflyError> {
977        if uuid.is_empty() {
978            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
979        }
980        let url = self.build_url(&format!("/crawl/{}/status", uuid), &[])?;
981        let resp = self.send_with_retry(Method::GET, url, None, None).await?;
982        let (status, _h, body) = read_response(resp).await?;
983        if status != 200 {
984            return Err(from_response(status, &body, 0, true));
985        }
986        Ok(serde_json::from_slice(&body)?)
987    }
988
989    /// List crawled URLs (streaming text endpoint).
990    pub async fn crawl_urls(
991        &self,
992        uuid: &str,
993        status_filter: Option<&str>,
994        page: u32,
995        per_page: u32,
996    ) -> Result<CrawlerUrls, ScrapflyError> {
997        if uuid.is_empty() {
998            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
999        }
1000        let page = if page == 0 { 1 } else { page };
1001        let per_page = if per_page == 0 { 100 } else { per_page };
1002        let status_hint = status_filter.unwrap_or("visited");
1003        let mut pairs: Vec<(String, String)> = vec![
1004            ("page".into(), page.to_string()),
1005            ("per_page".into(), per_page.to_string()),
1006        ];
1007        if let Some(s) = status_filter {
1008            pairs.push(("status".into(), s.to_string()));
1009        }
1010        let url = self.build_url(&format!("/crawl/{}/urls", uuid), &pairs)?;
1011        let mut headers = HeaderMap::new();
1012        headers.insert(
1013            ACCEPT,
1014            HeaderValue::from_static("text/plain, application/json"),
1015        );
1016        let resp = self
1017            .send_with_retry(Method::GET, url, Some(headers), None)
1018            .await?;
1019        let (status, resp_headers, body) = read_response(resp).await?;
1020        if status != 200 {
1021            return Err(from_response(status, &body, 0, true));
1022        }
1023        let ct = resp_headers
1024            .get(CONTENT_TYPE)
1025            .and_then(|v| v.to_str().ok())
1026            .unwrap_or("");
1027        if ct.contains("application/json") {
1028            return Err(ScrapflyError::UnexpectedResponseFormat(format!(
1029                "GET /crawl/{}/urls returned JSON on a 200 response (expected text/plain)",
1030                uuid
1031            )));
1032        }
1033        let body_str = std::str::from_utf8(&body)
1034            .map_err(|e| ScrapflyError::UnexpectedResponseFormat(format!("invalid utf8: {}", e)))?;
1035        Ok(CrawlerUrls::from_text(
1036            body_str,
1037            status_hint,
1038            page,
1039            per_page,
1040        ))
1041    }
1042
1043    /// Bulk `GET /crawl/{uuid}/contents` in JSON mode.
1044    pub async fn crawl_contents_json(
1045        &self,
1046        uuid: &str,
1047        format: crate::enums::CrawlerContentFormat,
1048        limit: Option<u32>,
1049        offset: Option<u32>,
1050    ) -> Result<CrawlerContents, ScrapflyError> {
1051        if uuid.is_empty() {
1052            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
1053        }
1054        let mut pairs: Vec<(String, String)> = vec![("formats".into(), format.as_str().into())];
1055        if let Some(l) = limit {
1056            pairs.push(("limit".into(), l.to_string()));
1057        }
1058        if let Some(o) = offset {
1059            pairs.push(("offset".into(), o.to_string()));
1060        }
1061        let url = self.build_url(&format!("/crawl/{}/contents", uuid), &pairs)?;
1062        let mut headers = HeaderMap::new();
1063        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
1064        let resp = self
1065            .send_with_retry(Method::GET, url, Some(headers), None)
1066            .await?;
1067        let (status, resp_headers, body) = read_response(resp).await?;
1068        if status != 200 {
1069            return Err(from_response(status, &body, 0, true));
1070        }
1071        let ct = resp_headers
1072            .get(CONTENT_TYPE)
1073            .and_then(|v| v.to_str().ok())
1074            .unwrap_or("");
1075        if !ct.contains("application/json") {
1076            return Err(ScrapflyError::UnexpectedResponseFormat(format!(
1077                "expected JSON, got Content-Type={}",
1078                ct
1079            )));
1080        }
1081        Ok(serde_json::from_slice(&body)?)
1082    }
1083
1084    /// Plain single-URL `GET /crawl/{uuid}/contents?plain=true`.
1085    pub async fn crawl_contents_plain(
1086        &self,
1087        uuid: &str,
1088        target_url: &str,
1089        format: crate::enums::CrawlerContentFormat,
1090    ) -> Result<String, ScrapflyError> {
1091        if uuid.is_empty() {
1092            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
1093        }
1094        if target_url.is_empty() {
1095            return Err(ScrapflyError::Config(
1096                "plain mode requires a single url argument".into(),
1097            ));
1098        }
1099        let pairs: Vec<(String, String)> = vec![
1100            ("formats".into(), format.as_str().into()),
1101            ("url".into(), target_url.into()),
1102            ("plain".into(), "true".into()),
1103        ];
1104        let url = self.build_url(&format!("/crawl/{}/contents", uuid), &pairs)?;
1105        let mut headers = HeaderMap::new();
1106        headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
1107        let resp = self
1108            .send_with_retry(Method::GET, url, Some(headers), None)
1109            .await?;
1110        let (status, _h, body) = read_response(resp).await?;
1111        if status != 200 {
1112            return Err(from_response(status, &body, 0, true));
1113        }
1114        Ok(String::from_utf8_lossy(&body).into_owned())
1115    }
1116
1117    /// Bulk-batch `POST /crawl/{uuid}/contents/batch`.
1118    /// Returns `url → format → content` (multipart/related response).
1119    pub async fn crawl_contents_batch(
1120        &self,
1121        uuid: &str,
1122        urls: &[String],
1123        formats: &[crate::enums::CrawlerContentFormat],
1124    ) -> Result<
1125        std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>,
1126        ScrapflyError,
1127    > {
1128        if uuid.is_empty() {
1129            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
1130        }
1131        if urls.is_empty() {
1132            return Err(ScrapflyError::Config("at least one URL is required".into()));
1133        }
1134        if urls.len() > 100 {
1135            return Err(ScrapflyError::Config(format!(
1136                "batch is limited to 100 URLs per request, got {}",
1137                urls.len()
1138            )));
1139        }
1140        if formats.is_empty() {
1141            return Err(ScrapflyError::Config(
1142                "at least one format is required".into(),
1143            ));
1144        }
1145        let format_strs: Vec<&'static str> = formats.iter().map(|f| f.as_str()).collect();
1146        let pairs: Vec<(String, String)> = vec![("formats".into(), format_strs.join(","))];
1147        let url = self.build_url(&format!("/crawl/{}/contents/batch", uuid), &pairs)?;
1148        let body = urls.join("\n").into_bytes();
1149        let mut headers = HeaderMap::new();
1150        headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
1151        headers.insert(
1152            ACCEPT,
1153            HeaderValue::from_static("multipart/related, application/json"),
1154        );
1155        let resp = self
1156            .send_with_retry(Method::POST, url, Some(headers), Some(body))
1157            .await?;
1158        let (status, resp_headers, body_bytes) = read_response(resp).await?;
1159        if status != 200 {
1160            return Err(from_response(status, &body_bytes, 0, true));
1161        }
1162        let ct = resp_headers
1163            .get(CONTENT_TYPE)
1164            .and_then(|v| v.to_str().ok())
1165            .unwrap_or("");
1166        if ct.contains("application/json") {
1167            return Err(ScrapflyError::UnexpectedResponseFormat(
1168                "CrawlContentsBatch expected multipart/related, got JSON".into(),
1169            ));
1170        }
1171        parse_multipart_related(
1172            std::str::from_utf8(&body_bytes).unwrap_or(""),
1173            ct,
1174            &format_strs,
1175        )
1176    }
1177
1178    /// Cancel a crawler job.
1179    pub async fn crawl_cancel(&self, uuid: &str) -> Result<(), ScrapflyError> {
1180        if uuid.is_empty() {
1181            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
1182        }
1183        let url = self.build_url(&format!("/crawl/{}/cancel", uuid), &[])?;
1184        let mut headers = HeaderMap::new();
1185        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
1186        let resp = self
1187            .send_with_retry(Method::POST, url, Some(headers), None)
1188            .await?;
1189        let (status, _h, body) = read_response(resp).await?;
1190        if status != 200 && status != 202 {
1191            return Err(from_response(status, &body, 0, true));
1192        }
1193        Ok(())
1194    }
1195
1196    /// Download a crawler artifact (WARC or HAR).
1197    pub async fn crawl_artifact(
1198        &self,
1199        uuid: &str,
1200        artifact_type: CrawlerArtifactType,
1201    ) -> Result<CrawlerArtifact, ScrapflyError> {
1202        if uuid.is_empty() {
1203            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
1204        }
1205        let pairs: Vec<(String, String)> = vec![("type".into(), artifact_type.as_str().into())];
1206        let url = self.build_url(&format!("/crawl/{}/artifact", uuid), &pairs)?;
1207        let mut headers = HeaderMap::new();
1208        // HAR is plain JSON — asking for `application/gzip` makes the server
1209        // gzip-wrap it, and reqwest can't auto-decode it without a matching
1210        // `Content-Encoding` header. Match `sdk/go/crawler.go::CrawlArtifact`
1211        // which sends different Accept per artifact type.
1212        let accept = match artifact_type {
1213            CrawlerArtifactType::Har => "application/json, application/octet-stream",
1214            CrawlerArtifactType::Warc => {
1215                "application/gzip, application/octet-stream, application/json"
1216            }
1217        };
1218        headers.insert(ACCEPT, HeaderValue::from_static(accept));
1219        let resp = self
1220            .send_with_retry(Method::GET, url, Some(headers), None)
1221            .await?;
1222        let (status, _h, body) = read_response(resp).await?;
1223        if status != 200 {
1224            return Err(from_response(status, &body, 0, true));
1225        }
1226        Ok(CrawlerArtifact {
1227            artifact_type,
1228            data: body,
1229        })
1230    }
1231
1232    // ==============================================================================
1233    // Cloud browser methods (implementations in cloud_browser.rs)
1234    // ==============================================================================
1235
1236    /// Fire a request through the retry loop.
1237    pub(crate) async fn send_with_retry(
1238        &self,
1239        method: Method,
1240        url: Url,
1241        headers: Option<HeaderMap>,
1242        body: Option<Vec<u8>>,
1243    ) -> Result<Response, ScrapflyError> {
1244        let mut last_err: Option<ScrapflyError> = None;
1245        for attempt in 0..DEFAULT_RETRIES {
1246            let mut req = self.http.request(method.clone(), url.clone());
1247            let mut hmap = headers.clone().unwrap_or_default();
1248            if !hmap.contains_key(USER_AGENT) {
1249                hmap.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
1250            }
1251            if let Some(cb) = &self.on_request {
1252                cb(&method, &url, &hmap);
1253            }
1254            req = req.headers(hmap);
1255            if let Some(b) = &body {
1256                req = req.body(b.clone());
1257            }
1258            match req.send().await {
1259                Ok(resp) => {
1260                    let status = resp.status().as_u16();
1261                    if (500..600).contains(&status) && attempt + 1 < DEFAULT_RETRIES {
1262                        last_err = Some(ScrapflyError::ApiServer(crate::error::ApiError {
1263                            message: "server error".into(),
1264                            http_status: status,
1265                            ..Default::default()
1266                        }));
1267                        tokio::time::sleep(DEFAULT_RETRY_DELAY).await;
1268                        continue;
1269                    }
1270                    return Ok(resp);
1271                }
1272                Err(e) => {
1273                    last_err = Some(ScrapflyError::Transport(e));
1274                    if attempt + 1 < DEFAULT_RETRIES {
1275                        tokio::time::sleep(DEFAULT_RETRY_DELAY).await;
1276                        continue;
1277                    }
1278                }
1279            }
1280        }
1281        Err(last_err.unwrap_or_else(|| ScrapflyError::Config("retry loop exhausted".into())))
1282    }
1283
1284    /// Single-shot send, no retry (for `verify_api_key`/`account` style calls).
1285    async fn send_simple(
1286        &self,
1287        method: Method,
1288        url: Url,
1289        headers: Option<HeaderMap>,
1290        body: Option<Vec<u8>>,
1291    ) -> Result<Response, ScrapflyError> {
1292        let mut req = self.http.request(method.clone(), url.clone());
1293        let mut hmap = headers.unwrap_or_default();
1294        if !hmap.contains_key(USER_AGENT) {
1295            hmap.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
1296        }
1297        if let Some(cb) = &self.on_request {
1298            cb(&method, &url, &hmap);
1299        }
1300        req = req.headers(hmap);
1301        if let Some(b) = body {
1302            req = req.body(b);
1303        }
1304        req.send().await.map_err(ScrapflyError::Transport)
1305    }
1306}
1307
1308/// Drain a response into (status, headers, body bytes) and propagate
1309/// `Retry-After` into the retry-ms field when present.
1310async fn read_response(resp: Response) -> Result<(u16, HeaderMap, bytes::Bytes), ScrapflyError> {
1311    let status = resp.status().as_u16();
1312    let headers = resp.headers().clone();
1313    let body = resp.bytes().await.map_err(ScrapflyError::Transport)?;
1314    let _ = parse_retry_after(headers.get("retry-after").and_then(|v| v.to_str().ok()));
1315    Ok((status, headers, body))
1316}
1317
1318/// Minimal RFC 2387 multipart/related parser — ported from
1319/// `sdk/go/crawler.go::parseMultipartRelated`.
1320fn parse_multipart_related(
1321    body: &str,
1322    content_type: &str,
1323    formats: &[&str],
1324) -> Result<
1325    std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>,
1326    ScrapflyError,
1327> {
1328    let mut boundary = String::new();
1329    for part in content_type.split(';') {
1330        let p = part.trim();
1331        if let Some(stripped) = p.strip_prefix("boundary=") {
1332            boundary = stripped.trim_matches('"').to_string();
1333            break;
1334        }
1335    }
1336    if boundary.is_empty() {
1337        return Err(ScrapflyError::UnexpectedResponseFormat(format!(
1338            "multipart response has no boundary in Content-Type: {}",
1339            content_type
1340        )));
1341    }
1342    let delimiter = format!("--{}", boundary);
1343    let mut result: std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>> =
1344        std::collections::BTreeMap::new();
1345    let segments: Vec<&str> = body.split(&delimiter as &str).collect();
1346    for segment in segments.iter().skip(1) {
1347        let mut seg = *segment;
1348        seg = seg.trim_start_matches("\r\n").trim_start_matches('\n');
1349        if seg.starts_with("--") {
1350            break;
1351        }
1352        seg = seg.trim_end_matches("\r\n").trim_end_matches('\n');
1353        let (headers_raw, part_body) = if let Some(idx) = seg.find("\r\n\r\n") {
1354            (&seg[..idx], &seg[idx + 4..])
1355        } else if let Some(idx) = seg.find("\n\n") {
1356            (&seg[..idx], &seg[idx + 2..])
1357        } else {
1358            continue;
1359        };
1360        let mut part_url = String::new();
1361        let mut part_format = String::new();
1362        for line in headers_raw.split('\n') {
1363            let line = line.trim_end_matches('\r');
1364            if let Some(colon) = line.find(':') {
1365                let name = line[..colon].trim().to_ascii_lowercase();
1366                let value = line[colon + 1..].trim().to_string();
1367                match name.as_str() {
1368                    "content-location" => part_url = value,
1369                    "content-type" => part_format = infer_format_from_content_type(&value),
1370                    _ => {}
1371                }
1372            }
1373        }
1374        if part_url.is_empty() {
1375            continue;
1376        }
1377        if part_format.is_empty() {
1378            part_format = formats.first().copied().unwrap_or("html").to_string();
1379        }
1380        result
1381            .entry(part_url)
1382            .or_default()
1383            .insert(part_format, part_body.to_string());
1384    }
1385    Ok(result)
1386}
1387
/// Map a `Content-Type` value to a crawler content-format name.
///
/// Only the media type is considered: everything from the first `;` on
/// is ignored, surrounding whitespace is trimmed, and the comparison is
/// ASCII case-insensitive. Recognised media types are `text/html`
/// (`"html"`), `text/markdown` (`"markdown"`), `text/plain` (`"text"`)
/// and `application/json` (`"json"`); anything else yields an empty
/// string.
fn infer_format_from_content_type(ct: &str) -> String {
    const FORMAT_BY_MEDIA_TYPE: [(&str, &str); 4] = [
        ("text/html", "html"),
        ("text/markdown", "markdown"),
        ("text/plain", "text"),
        ("application/json", "json"),
    ];

    let media_type = match ct.find(';') {
        Some(end) => &ct[..end],
        None => ct,
    }
    .trim();

    FORMAT_BY_MEDIA_TYPE
        .iter()
        .find(|(known, _)| media_type.eq_ignore_ascii_case(known))
        .map(|(_, format)| (*format).to_string())
        .unwrap_or_default()
}