use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use futures_util::stream::{Stream, StreamExt};
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE, USER_AGENT};
use reqwest::{Method, Response, Url};

use crate::config::crawler::CrawlerConfig;
use crate::config::extraction::ExtractionConfig;
use crate::config::scrape::ScrapeConfig;
use crate::config::screenshot::ScreenshotConfig;
use crate::enums::HttpMethod;
use crate::error::{from_response, parse_retry_after, ApiError, ScrapflyError};
use crate::monitoring::{
    CloudBrowserMonitoringOptions, MonitoringDataFormat, MonitoringMetricsOptions,
    MonitoringTargetMetricsOptions,
};
use crate::result::account::{AccountData, VerifyApiKeyResult};
use crate::result::classify::{ClassifyRequest, ClassifyResult};
use crate::result::crawler::{
    CrawlerArtifact, CrawlerArtifactType, CrawlerContents, CrawlerStartResponse, CrawlerStatus,
    CrawlerUrls,
};
use crate::result::extraction::ExtractionResult;
use crate::result::scrape::{ResultData, ScrapeResult};
use crate::result::screenshot::{ScreenshotMetadata, ScreenshotResult};

const DEFAULT_HOST: &str = "https://api.scrapfly.io";
const DEFAULT_CLOUD_BROWSER_HOST: &str = "https://browser.scrapfly.io";
const SDK_USER_AGENT: &str = "Scrapfly-Rust-SDK";
const DEFAULT_RETRIES: usize = 3;
const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(1);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(150);

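/// Callback invoked just before each outgoing HTTP request, receiving the
/// method, the final URL (including query parameters), and the headers.
/// Useful for logging or metrics; it must be `Send + Sync` because clients
/// may be shared across tasks.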
pub type OnRequest = Arc<dyn Fn(&Method, &Url, &HeaderMap) + Send + Sync>;

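/// Asynchronous Scrapfly API client. Cheap to clone (the underlying
/// `reqwest::Client` is internally reference-counted) and safe to share
/// across tasks.
///
/// A minimal construction sketch (the import path `scrapfly` is an
/// assumption; substitute the actual crate name):
///
/// ```ignore
/// use scrapfly::Client; // hypothetical crate path
///
/// let client = Client::builder()
///     .api_key("my-api-key")
///     .build()
///     .expect("valid client configuration");
/// assert_eq!(client.host(), "https://api.scrapfly.io");
/// ```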
#[derive(Clone)]
pub struct Client {
    http: reqwest::Client,
    key: String,
    host: String,
    cloud_browser_host: String,
    on_request: Option<OnRequest>,
}

// Manual Debug impl so the API key is never printed.
impl std::fmt::Debug for Client {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Client")
            .field("host", &self.host)
            .field("cloud_browser_host", &self.cloud_browser_host)
            .finish()
    }
}

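/// Builder for [`Client`]. Only `api_key` is required; every other field
/// falls back to a default (host, cloud-browser host, a 150-second timeout,
/// and a stock `reqwest::Client`).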
#[derive(Default)]
pub struct ClientBuilder {
    api_key: Option<String>,
    host: Option<String>,
    cloud_browser_host: Option<String>,
    timeout: Option<Duration>,
    danger_accept_invalid_certs: bool,
    http_client: Option<reqwest::Client>,
    on_request: Option<OnRequest>,
}

impl ClientBuilder {
    /// Sets the API key (required).
    pub fn api_key(mut self, key: impl Into<String>) -> Self {
        self.api_key = Some(key.into());
        self
    }

    /// Overrides the API host (defaults to `https://api.scrapfly.io`).
    pub fn host(mut self, host: impl Into<String>) -> Self {
        self.host = Some(host.into());
        self
    }

    /// Overrides the cloud-browser host (defaults to `https://browser.scrapfly.io`).
    pub fn cloud_browser_host(mut self, host: impl Into<String>) -> Self {
        self.cloud_browser_host = Some(host.into());
        self
    }

    /// Sets the request timeout (defaults to 150 seconds).
    pub fn timeout(mut self, t: Duration) -> Self {
        self.timeout = Some(t);
        self
    }

    /// Disables TLS certificate verification. Dangerous; intended for testing.
    pub fn danger_accept_invalid_certs(mut self, v: bool) -> Self {
        self.danger_accept_invalid_certs = v;
        self
    }

    /// Supplies a pre-configured `reqwest::Client`, bypassing the timeout
    /// and TLS options above.
    pub fn http_client(mut self, client: reqwest::Client) -> Self {
        self.http_client = Some(client);
        self
    }

    /// Registers a callback invoked before every outgoing request.
    pub fn on_request(mut self, cb: OnRequest) -> Self {
        self.on_request = Some(cb);
        self
    }

    /// Builds the [`Client`], failing with [`ScrapflyError::BadApiKey`] when
    /// the key is missing or empty.
    pub fn build(self) -> Result<Client, ScrapflyError> {
        let key = self.api_key.ok_or(ScrapflyError::BadApiKey)?;
        if key.is_empty() {
            return Err(ScrapflyError::BadApiKey);
        }

        let http = if let Some(c) = self.http_client {
            c
        } else {
            let mut builder = reqwest::Client::builder()
                .timeout(self.timeout.unwrap_or(DEFAULT_TIMEOUT))
                .user_agent(SDK_USER_AGENT);
            if self.danger_accept_invalid_certs {
                builder = builder.danger_accept_invalid_certs(true);
            }
            builder.build().map_err(ScrapflyError::Transport)?
        };

        Ok(Client {
            http,
            key,
            host: self.host.unwrap_or_else(|| DEFAULT_HOST.to_string()),
            cloud_browser_host: self
                .cloud_browser_host
                .unwrap_or_else(|| DEFAULT_CLOUD_BROWSER_HOST.to_string()),
            on_request: self.on_request,
        })
    }
}

impl Client {
    /// Returns a new [`ClientBuilder`].
    pub fn builder() -> ClientBuilder {
        ClientBuilder::default()
    }

    /// Returns the configured API key.
    pub fn api_key(&self) -> &str {
        &self.key
    }

    /// Returns the API host in use.
    pub fn host(&self) -> &str {
        &self.host
    }

    /// Returns the cloud-browser host in use.
    pub fn cloud_browser_host(&self) -> &str {
        &self.cloud_browser_host
    }

    /// Crate-visible wrapper around `build_url`.
    pub(crate) fn build_url_public(
        &self,
        path: &str,
        query: &[(String, String)],
    ) -> Result<Url, ScrapflyError> {
        self.build_url(path, query)
    }

    /// Crate-visible wrapper around `send_simple`.
    pub(crate) async fn send_simple_public(
        &self,
        method: Method,
        url: Url,
        headers: Option<HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> Result<Response, ScrapflyError> {
        self.send_simple(method, url, headers, body).await
    }

    fn build_url(&self, path: &str, query: &[(String, String)]) -> Result<Url, ScrapflyError> {
        let mut u = Url::parse(&format!("{}{}", self.host, path))
            .map_err(|e| ScrapflyError::Config(format!("invalid url: {}", e)))?;
        {
            let mut pairs = u.query_pairs_mut();
            pairs.append_pair("key", &self.key);
            for (k, v) in query {
                pairs.append_pair(k, v);
            }
        }
        Ok(u)
    }

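    /// Checks whether the configured API key is accepted by the API
    /// (`GET /account`), reporting only whether the response status was a
    /// success.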
    pub async fn verify_api_key(&self) -> Result<VerifyApiKeyResult, ScrapflyError> {
        let url = self.build_url("/account", &[])?;
        let resp = self.send_simple(Method::GET, url, None, None).await?;
        Ok(VerifyApiKeyResult {
            valid: resp.status().is_success(),
        })
    }

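    /// Fetches account details (`GET /account`) for the configured API key.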
    pub async fn account(&self) -> Result<AccountData, ScrapflyError> {
        let url = self.build_url("/account", &[])?;
        let resp = self.send_simple(Method::GET, url, None, None).await?;
        let (status, _headers, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, false));
        }
        Ok(serde_json::from_slice(&body)?)
    }

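    /// Classifies a scraped response via `POST /classify`. The request must
    /// carry a non-empty `url` and a `status_code` in `[100, 599]`; anything
    /// else is rejected locally before a network call is made.
    ///
    /// A usage sketch (assumes `ClassifyRequest` implements `Default`, which
    /// this module does not itself guarantee):
    ///
    /// ```ignore
    /// let req = ClassifyRequest {
    ///     url: "https://example.com/".into(),
    ///     status_code: 403,
    ///     ..Default::default() // hypothetical: remaining fields left at defaults
    /// };
    /// let result = client.classify(&req).await?;
    /// ```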
    pub async fn classify(&self, req: &ClassifyRequest) -> Result<ClassifyResult, ScrapflyError> {
        if req.url.is_empty() {
            return Err(ScrapflyError::Config("classify: url is required".into()));
        }
        if !(100..=599).contains(&req.status_code) {
            return Err(ScrapflyError::Config(
                "classify: status_code must be in [100, 599]".into(),
            ));
        }

        let url = self.build_url("/classify", &[])?;
        let body = serde_json::to_vec(req)
            .map_err(|e| ScrapflyError::Config(format!("marshal classify request: {}", e)))?;

        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));

        let resp = self
            .send_simple(Method::POST, url, Some(headers), Some(body))
            .await?;
        let (status, _headers, bytes) = read_response(resp).await?;
        if status >= 400 {
            return Err(from_response(status, &bytes, 0, false));
        }
        let out: ClassifyResult = serde_json::from_slice(&bytes)
            .map_err(|e| ScrapflyError::Config(format!("decode classify response: {}", e)))?;
        Ok(out)
    }

    /// Builds the query pairs shared by the monitoring metrics endpoints.
    fn build_metrics_pairs(opts: &MonitoringMetricsOptions) -> Vec<(String, String)> {
        let mut pairs: Vec<(String, String)> = Vec::new();
        let format = opts.format.unwrap_or(MonitoringDataFormat::Structured);
        pairs.push(("format".into(), format.as_str().into()));
        if let Some(p) = opts.period {
            pairs.push(("period".into(), p.as_str().into()));
        }
        if let Some(ref aggs) = opts.aggregation {
            if !aggs.is_empty() {
                let joined = aggs
                    .iter()
                    .map(|a| a.as_str())
                    .collect::<Vec<_>>()
                    .join(",");
                pairs.push(("aggregation".into(), joined));
            }
        }
        if opts.include_webhook {
            pairs.push(("include_webhook".into(), "true".into()));
        }
        pairs
    }

    /// Builds and validates the query pairs for the per-target monitoring
    /// endpoints: `domain` is required, and `start`/`end` must be given
    /// together; otherwise a period is used (defaulting to the last 24h).
    fn build_target_pairs(
        opts: &MonitoringTargetMetricsOptions,
    ) -> Result<Vec<(String, String)>, ScrapflyError> {
        if opts.domain.is_empty() {
            return Err(ScrapflyError::Config(
                "monitoring target metrics: domain is required".into(),
            ));
        }
        if opts.start.is_some() != opts.end.is_some() {
            return Err(ScrapflyError::Config(
                "monitoring target metrics: start and end must be provided together".into(),
            ));
        }
        let mut pairs: Vec<(String, String)> = Vec::new();
        pairs.push(("domain".into(), opts.domain.clone()));
        pairs.push(("group_subdomain".into(), opts.group_subdomain.to_string()));
        match (&opts.start, &opts.end) {
            (Some(s), Some(e)) => {
                pairs.push(("start".into(), s.clone()));
                pairs.push(("end".into(), e.clone()));
            }
            _ => {
                let period = opts
                    .period
                    .unwrap_or(crate::monitoring::MonitoringPeriod::Last24h);
                pairs.push(("period".into(), period.as_str().into()));
            }
        }
        if opts.include_webhook {
            pairs.push(("include_webhook".into(), "true".into()));
        }
        Ok(pairs)
    }

    /// Shared GET-and-decode helper for all monitoring endpoints.
    async fn fetch_monitoring_json(
        &self,
        path: &str,
        pairs: &[(String, String)],
    ) -> Result<serde_json::Value, ScrapflyError> {
        let url = self.build_url(path, pairs)?;
        let resp = self.send_simple(Method::GET, url, None, None).await?;
        let (status, _headers, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, false));
        }
        Ok(serde_json::from_slice(&body)?)
    }

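    /// Fetches account-wide scrape monitoring metrics
    /// (`GET /scrape/monitoring/metrics`). The `format` defaults to
    /// [`MonitoringDataFormat::Structured`] when unset.
    ///
    /// A call sketch (assumes `MonitoringMetricsOptions` implements
    /// `Default`; adjust if it does not):
    ///
    /// ```ignore
    /// let opts = MonitoringMetricsOptions {
    ///     include_webhook: true,
    ///     ..Default::default() // hypothetical
    /// };
    /// let metrics: serde_json::Value = client.get_monitoring_metrics(&opts).await?;
    /// ```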
    pub async fn get_monitoring_metrics(
        &self,
        opts: &MonitoringMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        self.fetch_monitoring_json(
            "/scrape/monitoring/metrics",
            &Self::build_metrics_pairs(opts),
        )
        .await
    }

    /// Fetches scrape monitoring metrics for a single target domain.
    pub async fn get_monitoring_target_metrics(
        &self,
        opts: &MonitoringTargetMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        let pairs = Self::build_target_pairs(opts)?;
        self.fetch_monitoring_json("/scrape/monitoring/metrics/target", &pairs)
            .await
    }

    /// Fetches account-wide screenshot monitoring metrics.
    pub async fn get_screenshot_monitoring_metrics(
        &self,
        opts: &MonitoringMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        self.fetch_monitoring_json(
            "/screenshot/monitoring/metrics",
            &Self::build_metrics_pairs(opts),
        )
        .await
    }

    /// Fetches screenshot monitoring metrics for a single target domain.
    pub async fn get_screenshot_monitoring_target_metrics(
        &self,
        opts: &MonitoringTargetMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        let pairs = Self::build_target_pairs(opts)?;
        self.fetch_monitoring_json("/screenshot/monitoring/metrics/target", &pairs)
            .await
    }

    /// Fetches account-wide extraction monitoring metrics.
    pub async fn get_extraction_monitoring_metrics(
        &self,
        opts: &MonitoringMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        self.fetch_monitoring_json(
            "/extraction/monitoring/metrics",
            &Self::build_metrics_pairs(opts),
        )
        .await
    }

    /// Fetches extraction monitoring metrics for a single target domain.
    pub async fn get_extraction_monitoring_target_metrics(
        &self,
        opts: &MonitoringTargetMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        let pairs = Self::build_target_pairs(opts)?;
        self.fetch_monitoring_json("/extraction/monitoring/metrics/target", &pairs)
            .await
    }

    /// Fetches account-wide crawler monitoring metrics.
    pub async fn get_crawler_monitoring_metrics(
        &self,
        opts: &MonitoringMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        self.fetch_monitoring_json("/crawl/monitoring/metrics", &Self::build_metrics_pairs(opts))
            .await
    }

    /// Fetches crawler monitoring metrics for a single target domain.
    pub async fn get_crawler_monitoring_target_metrics(
        &self,
        opts: &MonitoringTargetMetricsOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        let pairs = Self::build_target_pairs(opts)?;
        self.fetch_monitoring_json("/crawl/monitoring/metrics/target", &pairs)
            .await
    }

    /// Fetches cloud-browser monitoring metrics.
    pub async fn get_browser_monitoring_metrics(
        &self,
        opts: &CloudBrowserMonitoringOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        let pairs = Self::build_browser_pairs(opts)?;
        self.fetch_monitoring_json("/browser/monitoring/metrics", &pairs)
            .await
    }

    /// Fetches cloud-browser monitoring metrics as a time series.
    pub async fn get_browser_monitoring_timeseries(
        &self,
        opts: &CloudBrowserMonitoringOptions,
    ) -> Result<serde_json::Value, ScrapflyError> {
        let pairs = Self::build_browser_pairs(opts)?;
        self.fetch_monitoring_json("/browser/monitoring/metrics/timeseries", &pairs)
            .await
    }

    /// Builds and validates the query pairs for the cloud-browser
    /// monitoring endpoints (`start`/`end` must be provided together).
    fn build_browser_pairs(
        opts: &CloudBrowserMonitoringOptions,
    ) -> Result<Vec<(String, String)>, ScrapflyError> {
        if opts.start.is_some() != opts.end.is_some() {
            return Err(ScrapflyError::Config(
                "cloud browser monitoring: start and end must be provided together".into(),
            ));
        }
        let mut pairs: Vec<(String, String)> = Vec::new();
        match (&opts.start, &opts.end) {
            (Some(s), Some(e)) => {
                pairs.push(("start".into(), s.clone()));
                pairs.push(("end".into(), e.clone()));
            }
            _ => {
                if let Some(p) = opts.period {
                    pairs.push(("period".into(), p.as_str().into()));
                }
            }
        }
        if let Some(ref pool) = opts.proxy_pool {
            pairs.push(("proxy_pool".into(), pool.clone()));
        }
        Ok(pairs)
    }

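    /// Performs a single scrape (`GET /scrape` by default; the HTTP method
    /// can be overridden via [`ScrapeConfig`]). Non-200 API responses,
    /// upstream failures reported in the JSON body, and oversized
    /// `clob`/`blob` payloads (fetched transparently from their side-channel
    /// URL) are all handled here.
    ///
    /// A sketch taking a pre-built config (how `ScrapeConfig` is constructed
    /// is outside this module):
    ///
    /// ```ignore
    /// async fn fetch(client: &Client, cfg: &ScrapeConfig) -> Result<String, ScrapflyError> {
    ///     let res = client.scrape(cfg).await?;
    ///     Ok(res.result.content) // the scraped page body
    /// }
    /// ```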
    pub async fn scrape(&self, config: &ScrapeConfig) -> Result<ScrapeResult, ScrapflyError> {
        let pairs = config.to_query_pairs()?;
        let url = self.build_url("/scrape", &pairs)?;
        let method = match config.method {
            Some(m) => Method::from_bytes(m.as_str().as_bytes())
                .map_err(|e| ScrapflyError::Config(format!("invalid method: {}", e)))?,
            None => Method::GET,
        };
        let mut headers = HeaderMap::new();
        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
        let body = config.body.clone();
        let resp = self
            .send_with_retry(method, url, Some(headers), body.map(|b| b.into_bytes()))
            .await?;
        let (status, _h, body_bytes) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body_bytes, 0, false));
        }
        // HEAD scrapes legitimately return an empty body; synthesize a
        // minimal successful result instead of failing to deserialize.
        if matches!(config.method, Some(HttpMethod::Head)) && body_bytes.is_empty() {
            return Ok(ScrapeResult {
                uuid: String::new(),
                config: serde_json::Value::Null,
                context: serde_json::Value::Null,
                result: ResultData {
                    status_code: 200,
                    success: true,
                    ..Default::default()
                },
            });
        }
        let mut result: ScrapeResult = serde_json::from_slice(&body_bytes)?;
        if !result.result.success {
            // Map the in-body error (or, failing that, the upstream status
            // code) onto the client/server/API error variants.
            let (err_code, err_message, err_doc) = match &result.result.error {
                Some(e) => (e.code.clone(), e.message.clone(), e.doc_url.clone()),
                None => (
                    result.result.status.clone(),
                    format!(
                        "scrape failed with status_code={}",
                        result.result.status_code
                    ),
                    String::new(),
                ),
            };
            let api_err = ApiError {
                code: err_code,
                message: err_message,
                http_status: result.result.status_code,
                documentation_url: err_doc,
                hint: String::new(),
                retry_after_ms: 0,
            };
            let sc = result.result.status_code;
            if (400..500).contains(&sc) {
                return Err(ScrapflyError::UpstreamClient(api_err));
            }
            if (500..600).contains(&sc) {
                return Err(ScrapflyError::UpstreamServer(api_err));
            }
            return Err(ScrapflyError::Api(api_err));
        }
        // Large responses are stored out of band as clob/blob objects;
        // fetch them transparently so callers always see inline content.
        // (success is guaranteed here, so only the status needs checking.)
        if result.result.status == "DONE" {
            let fmt = result.result.format.as_str();
            if fmt == "clob" || fmt == "blob" {
                let (new_content, new_format) =
                    self.fetch_large_object(&result.result.content, fmt).await?;
                result.result.content = new_content;
                result.result.format = new_format;
            }
        }
        Ok(result)
    }

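    /// Downloads a `clob`/`blob` payload that the scrape API stored out of
    /// band, returning the content together with its normalized format
    /// (`"text"` for clob, `"binary"` for blob).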
    async fn fetch_large_object(
        &self,
        content_url: &str,
        format: &str,
    ) -> Result<(String, String), ScrapflyError> {
        let mut url = Url::parse(content_url)
            .map_err(|e| ScrapflyError::Config(format!("invalid large-object url: {}", e)))?;
        {
            // Rebuild the query string, dropping any pre-existing `key`
            // parameter so that our API key is the only one sent.
            let existing: Vec<(String, String)> = url
                .query_pairs()
                .filter(|(k, _)| k != "key")
                .map(|(k, v)| (k.into_owned(), v.into_owned()))
                .collect();
            let mut qs = url.query_pairs_mut();
            qs.clear();
            for (k, v) in existing {
                qs.append_pair(&k, &v);
            }
            qs.append_pair("key", self.api_key());
        }
        let mut headers = HeaderMap::new();
        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
        let resp = self
            .send_with_retry(Method::GET, url, Some(headers), None)
            .await?;
        let (status, _h, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, false));
        }
        let new_format = match format {
            "clob" => "text",
            "blob" => "binary",
            _ => {
                return Err(ScrapflyError::Config(format!(
                    "unsupported large-object format: {}",
                    format
                )))
            }
        };
        // Blob payloads are binary but stored in a String field, so this
        // conversion is lossy for non-UTF-8 bytes.
        let content = String::from_utf8_lossy(&body).into_owned();
        Ok((content, new_format.to_string()))
    }

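    /// Runs many scrapes concurrently, yielding results in completion order
    /// (not submission order). A `concurrency_limit` of `0` falls back to 5.
    ///
    /// A consumption sketch (requires `futures_util::StreamExt` for `next`):
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    ///
    /// let mut stream = client.concurrent_scrape(configs, 10);
    /// while let Some(outcome) = stream.next().await {
    ///     match outcome {
    ///         Ok(res) => println!("scraped {} bytes", res.result.content.len()),
    ///         Err(e) => eprintln!("scrape failed: {}", e),
    ///     }
    /// }
    /// ```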
    pub fn concurrent_scrape<'a, I>(
        &'a self,
        configs: I,
        concurrency_limit: usize,
    ) -> impl Stream<Item = Result<ScrapeResult, ScrapflyError>> + 'a
    where
        I: IntoIterator<Item = ScrapeConfig> + 'a,
        <I as IntoIterator>::IntoIter: 'a,
    {
        // Treat 0 as "unspecified" and fall back to 5 concurrent scrapes.
        let limit = if concurrency_limit == 0 {
            5
        } else {
            concurrency_limit
        };
        futures_util::stream::iter(
            configs
                .into_iter()
                .map(move |cfg| async move { self.scrape(&cfg).await }),
        )
        .buffer_unordered(limit)
    }

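    /// Submits up to 100 scrape configs in one `POST /scrape/batch` call and
    /// streams back `(correlation_id, outcome)` pairs as multipart parts
    /// arrive. Every config must carry a unique `correlation_id`, since that
    /// is the only way streamed parts are matched back to their config.
    ///
    /// A consumption sketch (requires `futures_util::StreamExt`; the
    /// `crate::batch` path is as seen from inside this crate):
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    /// use crate::batch::BatchOutcome;
    ///
    /// let mut stream = client.scrape_batch(&configs).await?;
    /// while let Some((id, outcome)) = stream.next().await {
    ///     match outcome {
    ///         BatchOutcome::Scrape(res) => println!("{id}: {}", res.result.status_code),
    ///         BatchOutcome::Proxified(_) => println!("{id}: proxified response"),
    ///         BatchOutcome::Err(e) => eprintln!("{id}: {e}"),
    ///     }
    /// }
    /// ```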
    pub async fn scrape_batch(
        &self,
        configs: &[ScrapeConfig],
    ) -> Result<impl Stream<Item = (String, crate::batch::BatchOutcome)>, ScrapflyError> {
        self.scrape_batch_with_options(configs, crate::batch::BatchOptions::default())
            .await
    }

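    /// Like [`Client::scrape_batch`], but lets the caller choose the
    /// response format and other batch options.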
    pub async fn scrape_batch_with_options(
        &self,
        configs: &[ScrapeConfig],
        opts: crate::batch::BatchOptions,
    ) -> Result<impl Stream<Item = (String, crate::batch::BatchOutcome)>, ScrapflyError> {
        use crate::batch::{
            build_proxified_response, decode_part_body, parts_from_response, BatchOutcome,
        };

        if configs.is_empty() {
            return Err(ScrapflyError::Config(
                "scrape_batch: configs is empty".into(),
            ));
        }
        if configs.len() > 100 {
            return Err(ScrapflyError::Config(format!(
                "scrape_batch: max 100 configs per batch (got {})",
                configs.len()
            )));
        }

        let mut seen: HashMap<String, usize> = HashMap::new();
        let mut body_configs: Vec<HashMap<String, String>> = Vec::with_capacity(configs.len());
        for (i, cfg) in configs.iter().enumerate() {
            let correlation_id = cfg.correlation_id.clone().ok_or_else(|| {
                ScrapflyError::Config(format!(
                    "scrape_batch: configs[{}] is missing correlation_id (required for matching streamed parts)",
                    i
                ))
            })?;
            if let Some(prev) = seen.get(&correlation_id) {
                return Err(ScrapflyError::Config(format!(
                    "scrape_batch: correlation_id {:?} reused by configs[{}] and configs[{}]",
                    correlation_id, prev, i
                )));
            }
            seen.insert(correlation_id.clone(), i);

            // The API key travels once in the URL, not per config.
            let pairs = cfg.to_query_pairs()?;
            let mut entry: HashMap<String, String> = HashMap::with_capacity(pairs.len());
            for (k, v) in pairs {
                if k == "key" {
                    continue;
                }
                entry.insert(k, v);
            }
            body_configs.push(entry);
        }

        let body = serde_json::json!({ "configs": body_configs });
        let body_bytes = serde_json::to_vec(&body)?;

        let mut url = Url::parse(&self.host)
            .map_err(|e| ScrapflyError::Config(format!("invalid host: {}", e)))?;
        url.set_path("/scrape/batch");
        url.query_pairs_mut().append_pair("key", &self.key);

        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
        headers.insert(
            ACCEPT,
            HeaderValue::from_static(opts.format.accept_header()),
        );
        headers.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));

        let method = Method::POST;
        if let Some(cb) = &self.on_request {
            cb(&method, &url, &headers);
        }

        let resp = self
            .http
            .request(method, url)
            .headers(headers)
            .body(body_bytes)
            .send()
            .await
            .map_err(|e| ScrapflyError::Config(format!("scrape_batch: send: {}", e)))?;

        let status = resp.status().as_u16();
        if status != 200 {
            let body_bytes = resp.bytes().await.unwrap_or_default();
            return Err(from_response(status, &body_bytes, 0, false));
        }

        let parts_stream = parts_from_response(resp)?;
        Ok(parts_stream.map(|part_r| match part_r {
            Ok(part) => {
                let correlation_id = part
                    .headers
                    .get("x-scrapfly-correlation-id")
                    .cloned()
                    .unwrap_or_default();

                // Parts flagged as proxified carry the raw upstream response
                // instead of a JSON-encoded ScrapeResult.
                if part
                    .headers
                    .get("x-scrapfly-proxified")
                    .map(|v| v == "true")
                    .unwrap_or(false)
                {
                    let prox = build_proxified_response(part);
                    return (correlation_id, BatchOutcome::Proxified(prox));
                }

                match decode_part_body::<ScrapeResult>(&part) {
                    Ok(r) => (correlation_id, BatchOutcome::Scrape(r)),
                    Err(e) => (correlation_id, BatchOutcome::Err(e)),
                }
            }
            Err(e) => (String::new(), BatchOutcome::Err(e)),
        }))
    }

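    /// Performs a scrape in proxified mode, returning the raw upstream
    /// [`reqwest::Response`] instead of a parsed [`ScrapeResult`]. In this
    /// mode rejections are signalled through `x-scrapfly-reject-*` headers
    /// rather than the body and are converted into [`ScrapflyError::Api`]
    /// here; the `retry-after` header (in seconds) is surfaced as
    /// `retry_after_ms`.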
    pub async fn scrape_proxified(
        &self,
        config: &ScrapeConfig,
    ) -> Result<reqwest::Response, ScrapflyError> {
        let mut cfg = config.clone();
        cfg.proxified_response = true;
        let pairs = cfg.to_query_pairs()?;
        let url = self.build_url("/scrape", &pairs)?;
        let method = match cfg.method {
            Some(m) => Method::from_bytes(m.as_str().as_bytes())
                .map_err(|e| ScrapflyError::Config(format!("invalid method: {}", e)))?,
            None => Method::GET,
        };
        let body = cfg.body.clone();
        let resp = self
            .send_with_retry(method, url, None, body.map(|b| b.into_bytes()))
            .await?;
        // In proxified mode, rejections are reported through the
        // x-scrapfly-reject-* response headers rather than the body.
        if let Some(reject_code) = resp.headers().get("x-scrapfly-reject-code") {
            let code = reject_code.to_str().unwrap_or("").to_string();
            let desc = resp
                .headers()
                .get("x-scrapfly-reject-description")
                .and_then(|v| v.to_str().ok())
                .unwrap_or("")
                .to_string();
            let retryable = resp
                .headers()
                .get("x-scrapfly-reject-retryable")
                .and_then(|v| v.to_str().ok())
                .unwrap_or("false")
                == "true";
            // `retry-after` is in seconds; surface it as milliseconds.
            let retry_after_ms: u64 = if retryable {
                resp.headers()
                    .get("retry-after")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|v| v.parse::<u64>().ok())
                    .unwrap_or(0)
                    * 1000
            } else {
                0
            };
            let status = resp.status().as_u16();
            let doc = resp
                .headers()
                .get("x-scrapfly-reject-doc")
                .and_then(|v| v.to_str().ok())
                .unwrap_or("")
                .to_string();
            return Err(ScrapflyError::Api(crate::error::ApiError {
                code,
                message: format!("Proxified scrape error: {}", desc),
                http_status: status,
                documentation_url: doc,
                hint: String::new(),
                retry_after_ms,
            }));
        }
        Ok(resp)
    }

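    /// Captures a screenshot (`GET /screenshot`) and returns the raw image
    /// bytes plus metadata derived from the response headers (file extension
    /// from `Content-Type`, upstream status code and URL from the
    /// `x-scrapfly-upstream-*` headers).
    ///
    /// A sketch that writes the image to disk (config construction is
    /// outside this module):
    ///
    /// ```ignore
    /// let shot = client.screenshot(&cfg).await?;
    /// let path = format!("page.{}", shot.metadata.extension_name);
    /// std::fs::write(&path, &shot.image)?;
    /// ```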
    pub async fn screenshot(
        &self,
        config: &ScreenshotConfig,
    ) -> Result<ScreenshotResult, ScrapflyError> {
        let pairs = config.to_query_pairs()?;
        let url = self.build_url("/screenshot", &pairs)?;
        let resp = self.send_with_retry(Method::GET, url, None, None).await?;
        let (status, headers, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, false));
        }
        // Derive the file extension from the Content-Type subtype,
        // e.g. "image/png" -> "png".
        let content_type = headers
            .get(CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("application/octet-stream");
        let ext = content_type
            .split('/')
            .nth(1)
            .and_then(|s| s.split(';').next())
            .unwrap_or("bin")
            .to_string();
        let upstream_status_code: u16 = headers
            .get("x-scrapfly-upstream-http-code")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse().ok())
            .unwrap_or(0);
        let upstream_url = headers
            .get("x-scrapfly-upstream-url")
            .and_then(|v| v.to_str().ok())
            .unwrap_or("")
            .to_string();
        Ok(ScreenshotResult {
            image: body,
            metadata: ScreenshotMetadata {
                extension_name: ext,
                upstream_status_code,
                upstream_url,
            },
        })
    }

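    /// Runs the extraction API (`POST /extraction`) against the document in
    /// `config.body`, forwarding the configured `Content-Type` and, when
    /// set, the document compression format as a `Content-Encoding` header.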
    pub async fn extract(
        &self,
        config: &ExtractionConfig,
    ) -> Result<ExtractionResult, ScrapflyError> {
        let pairs = config.to_query_pairs()?;
        let url = self.build_url("/extraction", &pairs)?;
        let mut headers = HeaderMap::new();
        headers.insert(
            CONTENT_TYPE,
            HeaderValue::from_str(&config.content_type)
                .map_err(|e| ScrapflyError::Config(format!("invalid content-type: {}", e)))?,
        );
        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
        if let Some(fmt) = config.document_compression_format {
            headers.insert(
                "content-encoding",
                HeaderValue::from_str(fmt.as_str())
                    .map_err(|e| ScrapflyError::Config(format!("invalid encoding: {}", e)))?,
            );
        }
        let resp = self
            .send_with_retry(Method::POST, url, Some(headers), Some(config.body.clone()))
            .await?;
        let (status, _h, body_bytes) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body_bytes, 0, false));
        }
        Ok(serde_json::from_slice(&body_bytes)?)
    }

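    /// Starts a crawl job (`POST /crawl`) and returns its handle. A response
    /// without a `crawler_uuid` is treated as malformed.
    ///
    /// A start-then-poll sketch (assumes `CrawlerConfig` is built elsewhere):
    ///
    /// ```ignore
    /// let started = client.start_crawl(&crawl_cfg).await?;
    /// let status = client.crawl_status(&started.crawler_uuid).await?;
    /// // poll crawl_status until the returned CrawlerStatus reports completion
    /// ```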
    pub async fn start_crawl(
        &self,
        config: &CrawlerConfig,
    ) -> Result<CrawlerStartResponse, ScrapflyError> {
        let body = config.to_json_body()?;
        let url = self.build_url("/crawl", &[])?;
        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
        let resp = self
            .send_with_retry(Method::POST, url, Some(headers), Some(body))
            .await?;
        let (status, _h, body_bytes) = read_response(resp).await?;
        if status != 200 && status != 201 {
            return Err(from_response(status, &body_bytes, 0, true));
        }
        let parsed: CrawlerStartResponse = serde_json::from_slice(&body_bytes)?;
        if parsed.crawler_uuid.is_empty() {
            return Err(ScrapflyError::UnexpectedResponseFormat(
                "crawler start response missing crawler_uuid".into(),
            ));
        }
        Ok(parsed)
    }

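    /// Fetches the current status of a crawl (`GET /crawl/{uuid}/status`).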
    pub async fn crawl_status(&self, uuid: &str) -> Result<CrawlerStatus, ScrapflyError> {
        if uuid.is_empty() {
            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
        }
        let url = self.build_url(&format!("/crawl/{}/status", uuid), &[])?;
        let resp = self.send_with_retry(Method::GET, url, None, None).await?;
        let (status, _h, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, true));
        }
        Ok(serde_json::from_slice(&body)?)
    }

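    /// Lists URLs discovered by a crawl (`GET /crawl/{uuid}/urls`), parsed
    /// from the plain-text response. `page` and `per_page` values of `0`
    /// default to `1` and `100`; the optional `status_filter`
    /// (e.g. `"visited"`) also serves as the status hint when parsing.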
    pub async fn crawl_urls(
        &self,
        uuid: &str,
        status_filter: Option<&str>,
        page: u32,
        per_page: u32,
    ) -> Result<CrawlerUrls, ScrapflyError> {
        if uuid.is_empty() {
            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
        }
        let page = if page == 0 { 1 } else { page };
        let per_page = if per_page == 0 { 100 } else { per_page };
        let status_hint = status_filter.unwrap_or("visited");
        let mut pairs: Vec<(String, String)> = vec![
            ("page".into(), page.to_string()),
            ("per_page".into(), per_page.to_string()),
        ];
        if let Some(s) = status_filter {
            pairs.push(("status".into(), s.to_string()));
        }
        let url = self.build_url(&format!("/crawl/{}/urls", uuid), &pairs)?;
        let mut headers = HeaderMap::new();
        headers.insert(
            ACCEPT,
            HeaderValue::from_static("text/plain, application/json"),
        );
        let resp = self
            .send_with_retry(Method::GET, url, Some(headers), None)
            .await?;
        let (status, resp_headers, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, true));
        }
        let ct = resp_headers
            .get(CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        if ct.contains("application/json") {
            return Err(ScrapflyError::UnexpectedResponseFormat(format!(
                "GET /crawl/{}/urls returned JSON on a 200 response (expected text/plain)",
                uuid
            )));
        }
        let body_str = std::str::from_utf8(&body)
            .map_err(|e| ScrapflyError::UnexpectedResponseFormat(format!("invalid utf8: {}", e)))?;
        Ok(CrawlerUrls::from_text(body_str, status_hint, page, per_page))
    }

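    /// Fetches crawled page contents in a single format as JSON
    /// (`GET /crawl/{uuid}/contents`), with optional `limit`/`offset`
    /// pagination.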
    pub async fn crawl_contents_json(
        &self,
        uuid: &str,
        format: crate::enums::CrawlerContentFormat,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> Result<CrawlerContents, ScrapflyError> {
        if uuid.is_empty() {
            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
        }
        let mut pairs: Vec<(String, String)> = vec![("formats".into(), format.as_str().into())];
        if let Some(l) = limit {
            pairs.push(("limit".into(), l.to_string()));
        }
        if let Some(o) = offset {
            pairs.push(("offset".into(), o.to_string()));
        }
        let url = self.build_url(&format!("/crawl/{}/contents", uuid), &pairs)?;
        let mut headers = HeaderMap::new();
        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
        let resp = self
            .send_with_retry(Method::GET, url, Some(headers), None)
            .await?;
        let (status, resp_headers, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, true));
        }
        let ct = resp_headers
            .get(CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        if !ct.contains("application/json") {
            return Err(ScrapflyError::UnexpectedResponseFormat(format!(
                "expected JSON, got Content-Type={}",
                ct
            )));
        }
        Ok(serde_json::from_slice(&body)?)
    }

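    /// Fetches the content of a single crawled URL as plain text
    /// (`GET /crawl/{uuid}/contents?plain=true`), in one chosen format.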
    pub async fn crawl_contents_plain(
        &self,
        uuid: &str,
        target_url: &str,
        format: crate::enums::CrawlerContentFormat,
    ) -> Result<String, ScrapflyError> {
        if uuid.is_empty() {
            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
        }
        if target_url.is_empty() {
            return Err(ScrapflyError::Config(
                "plain mode requires a single url argument".into(),
            ));
        }
        let pairs: Vec<(String, String)> = vec![
            ("formats".into(), format.as_str().into()),
            ("url".into(), target_url.into()),
            ("plain".into(), "true".into()),
        ];
        let url = self.build_url(&format!("/crawl/{}/contents", uuid), &pairs)?;
        let mut headers = HeaderMap::new();
        headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
        let resp = self
            .send_with_retry(Method::GET, url, Some(headers), None)
            .await?;
        let (status, _h, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, true));
        }
        Ok(String::from_utf8_lossy(&body).into_owned())
    }

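    /// Fetches contents for up to 100 crawled URLs in one request
    /// (`POST /crawl/{uuid}/contents/batch`), returning a nested map of
    /// `url -> format -> body` parsed from the `multipart/related` response.
    ///
    /// A usage sketch (`CrawlerContentFormat::Markdown` is an assumed
    /// variant; substitute a real one):
    ///
    /// ```ignore
    /// let urls = vec!["https://example.com/".to_string()];
    /// let contents = client
    ///     .crawl_contents_batch(&uuid, &urls, &[CrawlerContentFormat::Markdown])
    ///     .await?;
    /// for (url, by_format) in &contents {
    ///     for (format, body) in by_format {
    ///         println!("{url} [{format}]: {} bytes", body.len());
    ///     }
    /// }
    /// ```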
    pub async fn crawl_contents_batch(
        &self,
        uuid: &str,
        urls: &[String],
        formats: &[crate::enums::CrawlerContentFormat],
    ) -> Result<
        std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>,
        ScrapflyError,
    > {
        if uuid.is_empty() {
            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
        }
        if urls.is_empty() {
            return Err(ScrapflyError::Config("at least one URL is required".into()));
        }
        if urls.len() > 100 {
            return Err(ScrapflyError::Config(format!(
                "batch is limited to 100 URLs per request, got {}",
                urls.len()
            )));
        }
        if formats.is_empty() {
            return Err(ScrapflyError::Config(
                "at least one format is required".into(),
            ));
        }
        let format_strs: Vec<&'static str> = formats.iter().map(|f| f.as_str()).collect();
        let pairs: Vec<(String, String)> = vec![("formats".into(), format_strs.join(","))];
        let url = self.build_url(&format!("/crawl/{}/contents/batch", uuid), &pairs)?;
        // The request body is one URL per line, as plain text.
        let body = urls.join("\n").into_bytes();
        let mut headers = HeaderMap::new();
        headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
        headers.insert(
            ACCEPT,
            HeaderValue::from_static("multipart/related, application/json"),
        );
        let resp = self
            .send_with_retry(Method::POST, url, Some(headers), Some(body))
            .await?;
        let (status, resp_headers, body_bytes) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body_bytes, 0, true));
        }
        let ct = resp_headers
            .get(CONTENT_TYPE)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        if ct.contains("application/json") {
            return Err(ScrapflyError::UnexpectedResponseFormat(
                "CrawlContentsBatch expected multipart/related, got JSON".into(),
            ));
        }
        // Non-UTF-8 multipart bodies are treated as empty rather than erroring.
        parse_multipart_related(
            std::str::from_utf8(&body_bytes).unwrap_or(""),
            ct,
            &format_strs,
        )
    }

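    /// Cancels a running crawl (`POST /crawl/{uuid}/cancel`); both `200` and
    /// `202` responses count as success.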
    pub async fn crawl_cancel(&self, uuid: &str) -> Result<(), ScrapflyError> {
        if uuid.is_empty() {
            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
        }
        let url = self.build_url(&format!("/crawl/{}/cancel", uuid), &[])?;
        let mut headers = HeaderMap::new();
        headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
        let resp = self
            .send_with_retry(Method::POST, url, Some(headers), None)
            .await?;
        let (status, _h, body) = read_response(resp).await?;
        if status != 200 && status != 202 {
            return Err(from_response(status, &body, 0, true));
        }
        Ok(())
    }

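    /// Downloads a crawl artifact (`GET /crawl/{uuid}/artifact?type=...`),
    /// choosing an `Accept` header appropriate to the artifact type.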
    pub async fn crawl_artifact(
        &self,
        uuid: &str,
        artifact_type: CrawlerArtifactType,
    ) -> Result<CrawlerArtifact, ScrapflyError> {
        if uuid.is_empty() {
            return Err(ScrapflyError::Config("uuid cannot be empty".into()));
        }
        let pairs: Vec<(String, String)> = vec![("type".into(), artifact_type.as_str().into())];
        let url = self.build_url(&format!("/crawl/{}/artifact", uuid), &pairs)?;
        let mut headers = HeaderMap::new();
        // HAR artifacts are JSON documents; WARC artifacts are gzip archives.
        let accept = match artifact_type {
            CrawlerArtifactType::Har => "application/json, application/octet-stream",
            CrawlerArtifactType::Warc => {
                "application/gzip, application/octet-stream, application/json"
            }
        };
        headers.insert(ACCEPT, HeaderValue::from_static(accept));
        let resp = self
            .send_with_retry(Method::GET, url, Some(headers), None)
            .await?;
        let (status, _h, body) = read_response(resp).await?;
        if status != 200 {
            return Err(from_response(status, &body, 0, true));
        }
        Ok(CrawlerArtifact {
            artifact_type,
            data: body,
        })
    }

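    /// Sends a request, retrying up to [`DEFAULT_RETRIES`] times on
    /// transport errors and 5xx responses, sleeping [`DEFAULT_RETRY_DELAY`]
    /// between attempts. Non-5xx responses are returned to the caller
    /// untouched.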
    pub(crate) async fn send_with_retry(
        &self,
        method: Method,
        url: Url,
        headers: Option<HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> Result<Response, ScrapflyError> {
        let mut last_err: Option<ScrapflyError> = None;
        for attempt in 0..DEFAULT_RETRIES {
            let mut req = self.http.request(method.clone(), url.clone());
            let mut hmap = headers.clone().unwrap_or_default();
            if !hmap.contains_key(USER_AGENT) {
                hmap.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
            }
            if let Some(cb) = &self.on_request {
                cb(&method, &url, &hmap);
            }
            req = req.headers(hmap);
            if let Some(b) = &body {
                req = req.body(b.clone());
            }
            match req.send().await {
                Ok(resp) => {
                    let status = resp.status().as_u16();
                    if (500..600).contains(&status) && attempt + 1 < DEFAULT_RETRIES {
                        last_err = Some(ScrapflyError::ApiServer(crate::error::ApiError {
                            message: "server error".into(),
                            http_status: status,
                            ..Default::default()
                        }));
                        tokio::time::sleep(DEFAULT_RETRY_DELAY).await;
                        continue;
                    }
                    return Ok(resp);
                }
                Err(e) => {
                    last_err = Some(ScrapflyError::Transport(e));
                    if attempt + 1 < DEFAULT_RETRIES {
                        tokio::time::sleep(DEFAULT_RETRY_DELAY).await;
                        continue;
                    }
                }
            }
        }
        Err(last_err.unwrap_or_else(|| ScrapflyError::Config("retry loop exhausted".into())))
    }

    async fn send_simple(
        &self,
        method: Method,
        url: Url,
        headers: Option<HeaderMap>,
        body: Option<Vec<u8>>,
    ) -> Result<Response, ScrapflyError> {
        let mut req = self.http.request(method.clone(), url.clone());
        let mut hmap = headers.unwrap_or_default();
        if !hmap.contains_key(USER_AGENT) {
            hmap.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
        }
        if let Some(cb) = &self.on_request {
            cb(&method, &url, &hmap);
        }
        req = req.headers(hmap);
        if let Some(b) = body {
            req = req.body(b);
        }
        req.send().await.map_err(ScrapflyError::Transport)
    }
}

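/// Buffers a response into `(status, headers, body)`.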
async fn read_response(resp: Response) -> Result<(u16, HeaderMap, bytes::Bytes), ScrapflyError> {
    let status = resp.status().as_u16();
    let headers = resp.headers().clone();
    let body = resp.bytes().await.map_err(ScrapflyError::Transport)?;
    // NOTE: the retry-after value is parsed but currently discarded here;
    // callers that need it re-read the header themselves.
    let _ = parse_retry_after(headers.get("retry-after").and_then(|v| v.to_str().ok()));
    Ok((status, headers, body))
}

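/// Minimal parser for `multipart/related` crawl-content responses: splits
/// on the boundary from `Content-Type`, reads `Content-Location` (the page
/// URL) and `Content-Type` (mapped to a format name) from each part's
/// headers, and groups part bodies as `url -> format -> body`.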
fn parse_multipart_related(
    body: &str,
    content_type: &str,
    formats: &[&str],
) -> Result<
    std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>,
    ScrapflyError,
> {
    // Extract the boundary parameter from the Content-Type header.
    let mut boundary = String::new();
    for part in content_type.split(';') {
        let p = part.trim();
        if let Some(stripped) = p.strip_prefix("boundary=") {
            boundary = stripped.trim_matches('"').to_string();
            break;
        }
    }
    if boundary.is_empty() {
        return Err(ScrapflyError::UnexpectedResponseFormat(format!(
            "multipart response has no boundary in Content-Type: {}",
            content_type
        )));
    }
    let delimiter = format!("--{}", boundary);
    let mut result: std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>> =
        std::collections::BTreeMap::new();
    let segments: Vec<&str> = body.split(delimiter.as_str()).collect();
    for segment in segments.iter().skip(1) {
        let mut seg = *segment;
        seg = seg.trim_start_matches("\r\n").trim_start_matches('\n');
        // "--" right after the delimiter marks the closing boundary.
        if seg.starts_with("--") {
            break;
        }
        seg = seg.trim_end_matches("\r\n").trim_end_matches('\n');
        // Split each part into its headers and body (CRLF or LF separators).
        let (headers_raw, part_body) = if let Some(idx) = seg.find("\r\n\r\n") {
            (&seg[..idx], &seg[idx + 4..])
        } else if let Some(idx) = seg.find("\n\n") {
            (&seg[..idx], &seg[idx + 2..])
        } else {
            continue;
        };
        let mut part_url = String::new();
        let mut part_format = String::new();
        for line in headers_raw.split('\n') {
            let line = line.trim_end_matches('\r');
            if let Some(colon) = line.find(':') {
                let name = line[..colon].trim().to_ascii_lowercase();
                let value = line[colon + 1..].trim().to_string();
                match name.as_str() {
                    "content-location" => part_url = value,
                    "content-type" => part_format = infer_format_from_content_type(&value),
                    _ => {}
                }
            }
        }
        if part_url.is_empty() {
            continue;
        }
        // Fall back to the first requested format when the part's
        // Content-Type did not map to a known format.
        if part_format.is_empty() {
            part_format = formats.first().copied().unwrap_or("html").to_string();
        }
        result
            .entry(part_url)
            .or_default()
            .insert(part_format, part_body.to_string());
    }
    Ok(result)
}

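/// Maps a MIME type to the crawler's format name; unknown types map to `""`.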
fn infer_format_from_content_type(ct: &str) -> String {
    let lc = ct
        .split(';')
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();
    match lc.as_str() {
        "text/html" => "html".into(),
        "text/markdown" => "markdown".into(),
        "text/plain" => "text".into(),
        "application/json" => "json".into(),
        _ => String::new(),
    }
}