use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use futures_util::stream::{Stream, StreamExt};
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, CONTENT_TYPE, USER_AGENT};
use reqwest::{Method, Response, Url};
use crate::config::crawler::CrawlerConfig;
use crate::config::extraction::ExtractionConfig;
use crate::config::scrape::ScrapeConfig;
use crate::config::screenshot::ScreenshotConfig;
use crate::enums::HttpMethod;
use crate::error::{from_response, parse_retry_after, ApiError, ScrapflyError};
use crate::monitoring::{
CloudBrowserMonitoringOptions, MonitoringDataFormat, MonitoringMetricsOptions,
MonitoringTargetMetricsOptions,
};
use crate::result::account::{AccountData, VerifyApiKeyResult};
use crate::result::classify::{ClassifyRequest, ClassifyResult};
use crate::result::crawler::{
CrawlerArtifact, CrawlerArtifactType, CrawlerContents, CrawlerStartResponse, CrawlerStatus,
CrawlerUrls,
};
use crate::result::extraction::ExtractionResult;
use crate::result::scrape::{ResultData, ScrapeResult};
use crate::result::screenshot::{ScreenshotMetadata, ScreenshotResult};
const DEFAULT_HOST: &str = "https://api.scrapfly.io";
const DEFAULT_CLOUD_BROWSER_HOST: &str = "https://browser.scrapfly.io";
const SDK_USER_AGENT: &str = "Scrapfly-Rust-SDK";
const DEFAULT_RETRIES: usize = 3;
const DEFAULT_RETRY_DELAY: Duration = Duration::from_secs(1);
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(150);
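/// Callback invoked immediately before each HTTP request is sent. It receives the
/// method, the fully built URL (including the `key` query parameter), and the
/// outgoing headers; useful for logging or metrics. It must be `Send + Sync`
/// because the client is cloneable and may be shared across tasks.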
pub type OnRequest = Arc<dyn Fn(&Method, &Url, &HeaderMap) + Send + Sync>;
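/// Asynchronous client for the Scrapfly API. It wraps a shared [`reqwest::Client`],
/// holds the API key, and exposes the scraping, screenshot, extraction, crawler, and
/// monitoring endpoints. Cloning is cheap: the underlying connection pool is shared
/// between clones.
///
/// A minimal construction sketch (the `scrapfly` crate path and the API key value are
/// assumptions; adjust them to your project):
///
/// ```ignore
/// use std::time::Duration;
/// use scrapfly::Client; // assumed crate path
///
/// fn make_client() -> Result<Client, scrapfly::ScrapflyError> {
///     Client::builder()
///         .api_key("your-api-key")
///         .timeout(Duration::from_secs(60)) // overrides the 150-second default
///         .build()
/// }
/// ```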
#[derive(Clone)]
pub struct Client {
http: reqwest::Client,
key: String,
host: String,
cloud_browser_host: String,
on_request: Option<OnRequest>,
}
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("host", &self.host)
.field("cloud_browser_host", &self.cloud_browser_host)
.finish()
}
}
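/// Builder for [`Client`]. Only `api_key` is mandatory; every other setting falls back
/// to a default: the public Scrapfly hosts, a 150-second request timeout, certificate
/// validation enabled, and a freshly built [`reqwest::Client`]. Supplying a custom
/// client via [`ClientBuilder::http_client`] bypasses the `timeout` and
/// `danger_accept_invalid_certs` settings.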
#[derive(Default)]
pub struct ClientBuilder {
api_key: Option<String>,
host: Option<String>,
cloud_browser_host: Option<String>,
timeout: Option<Duration>,
danger_accept_invalid_certs: bool,
http_client: Option<reqwest::Client>,
on_request: Option<OnRequest>,
}
impl ClientBuilder {
pub fn api_key(mut self, key: impl Into<String>) -> Self {
self.api_key = Some(key.into());
self
}
pub fn host(mut self, host: impl Into<String>) -> Self {
self.host = Some(host.into());
self
}
pub fn cloud_browser_host(mut self, host: impl Into<String>) -> Self {
self.cloud_browser_host = Some(host.into());
self
}
pub fn timeout(mut self, t: Duration) -> Self {
self.timeout = Some(t);
self
}
pub fn danger_accept_invalid_certs(mut self, v: bool) -> Self {
self.danger_accept_invalid_certs = v;
self
}
pub fn http_client(mut self, client: reqwest::Client) -> Self {
self.http_client = Some(client);
self
}
pub fn on_request(mut self, cb: OnRequest) -> Self {
self.on_request = Some(cb);
self
}
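    /// Finalizes the builder. Returns [`ScrapflyError::BadApiKey`] when the API key is
    /// missing or empty, and [`ScrapflyError::Transport`] when the underlying
    /// [`reqwest::Client`] cannot be constructed.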
pub fn build(self) -> Result<Client, ScrapflyError> {
let key = self.api_key.ok_or(ScrapflyError::BadApiKey)?;
if key.is_empty() {
return Err(ScrapflyError::BadApiKey);
}
let http = if let Some(c) = self.http_client {
c
} else {
let mut builder = reqwest::Client::builder()
.timeout(self.timeout.unwrap_or(DEFAULT_TIMEOUT))
.user_agent(SDK_USER_AGENT);
if self.danger_accept_invalid_certs {
builder = builder.danger_accept_invalid_certs(true);
}
builder.build().map_err(ScrapflyError::Transport)?
};
Ok(Client {
http,
key,
host: self.host.unwrap_or_else(|| DEFAULT_HOST.to_string()),
cloud_browser_host: self
.cloud_browser_host
.unwrap_or_else(|| DEFAULT_CLOUD_BROWSER_HOST.to_string()),
on_request: self.on_request,
})
}
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
pub fn api_key(&self) -> &str {
&self.key
}
pub fn host(&self) -> &str {
&self.host
}
pub fn cloud_browser_host(&self) -> &str {
&self.cloud_browser_host
}
pub(crate) fn build_url_public(
&self,
path: &str,
query: &[(String, String)],
) -> Result<Url, ScrapflyError> {
self.build_url(path, query)
}
pub(crate) async fn send_simple_public(
&self,
method: Method,
url: Url,
headers: Option<HeaderMap>,
body: Option<Vec<u8>>,
) -> Result<Response, ScrapflyError> {
self.send_simple(method, url, headers, body).await
}
fn build_url(&self, path: &str, query: &[(String, String)]) -> Result<Url, ScrapflyError> {
let mut u = Url::parse(&format!("{}{}", self.host, path))
.map_err(|e| ScrapflyError::Config(format!("invalid url: {}", e)))?;
{
let mut pairs = u.query_pairs_mut();
pairs.append_pair("key", &self.key);
for (k, v) in query {
pairs.append_pair(k, v);
}
}
Ok(u)
}
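    /// Checks whether the configured API key is accepted by calling `GET /account`.
    /// Only the HTTP status is inspected: a 2xx response yields `valid: true`, anything
    /// else `valid: false`. Transport failures still surface as errors.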
pub async fn verify_api_key(&self) -> Result<VerifyApiKeyResult, ScrapflyError> {
let url = self.build_url("/account", &[])?;
let resp = self.send_simple(Method::GET, url, None, None).await?;
Ok(VerifyApiKeyResult {
valid: resp.status().is_success(),
})
}
pub async fn account(&self) -> Result<AccountData, ScrapflyError> {
let url = self.build_url("/account", &[])?;
let resp = self.send_simple(Method::GET, url, None, None).await?;
let (status, _headers, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, false));
}
Ok(serde_json::from_slice(&body)?)
}
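    /// Classifies a scrape outcome via `POST /classify`. The request must carry a
    /// non-empty `url` and a `status_code` in the `100..=599` range; violations are
    /// rejected locally with [`ScrapflyError::Config`] before any request is sent.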
pub async fn classify(&self, req: &ClassifyRequest) -> Result<ClassifyResult, ScrapflyError> {
if req.url.is_empty() {
return Err(ScrapflyError::Config("classify: url is required".into()));
}
if !(100..=599).contains(&req.status_code) {
return Err(ScrapflyError::Config(
"classify: status_code must be in [100, 599]".into(),
));
}
let url = self.build_url("/classify", &[])?;
        let body = serde_json::to_vec(req)
            .map_err(|e| ScrapflyError::Config(format!("serialize classify request: {}", e)))?;
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let resp = self
.send_simple(Method::POST, url, Some(headers), Some(body))
.await?;
let (status, _headers, bytes) = read_response(resp).await?;
if status >= 400 {
return Err(from_response(status, &bytes, 0, false));
}
let out: ClassifyResult = serde_json::from_slice(&bytes)
.map_err(|e| ScrapflyError::Config(format!("decode classify response: {}", e)))?;
Ok(out)
}
fn build_metrics_pairs(opts: &MonitoringMetricsOptions) -> Vec<(String, String)> {
let mut pairs: Vec<(String, String)> = Vec::new();
let format = opts.format.unwrap_or(MonitoringDataFormat::Structured);
pairs.push(("format".into(), format.as_str().into()));
if let Some(p) = opts.period {
pairs.push(("period".into(), p.as_str().into()));
}
if let Some(ref aggs) = opts.aggregation {
if !aggs.is_empty() {
let joined = aggs
.iter()
.map(|a| a.as_str())
.collect::<Vec<_>>()
.join(",");
pairs.push(("aggregation".into(), joined));
}
}
if opts.include_webhook {
pairs.push(("include_webhook".into(), "true".into()));
}
pairs
}
fn build_target_pairs(
opts: &MonitoringTargetMetricsOptions,
) -> Result<Vec<(String, String)>, ScrapflyError> {
if opts.domain.is_empty() {
return Err(ScrapflyError::Config(
"monitoring target metrics: domain is required".into(),
));
}
if opts.start.is_some() != opts.end.is_some() {
return Err(ScrapflyError::Config(
"monitoring target metrics: start and end must be provided together".into(),
));
}
let mut pairs: Vec<(String, String)> = Vec::new();
pairs.push(("domain".into(), opts.domain.clone()));
pairs.push(("group_subdomain".into(), opts.group_subdomain.to_string()));
match (&opts.start, &opts.end) {
(Some(s), Some(e)) => {
pairs.push(("start".into(), s.clone()));
pairs.push(("end".into(), e.clone()));
}
_ => {
let period = opts
.period
.unwrap_or(crate::monitoring::MonitoringPeriod::Last24h);
pairs.push(("period".into(), period.as_str().into()));
}
}
if opts.include_webhook {
pairs.push(("include_webhook".into(), "true".into()));
}
Ok(pairs)
}
async fn fetch_monitoring_json(
&self,
path: &str,
pairs: &[(String, String)],
) -> Result<serde_json::Value, ScrapflyError> {
let url = self.build_url(path, pairs)?;
let resp = self.send_simple(Method::GET, url, None, None).await?;
let (status, _headers, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, false));
}
Ok(serde_json::from_slice(&body)?)
}
pub async fn get_monitoring_metrics(
&self,
opts: &MonitoringMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
self.fetch_monitoring_json(
"/scrape/monitoring/metrics",
&Self::build_metrics_pairs(opts),
)
.await
}
pub async fn get_monitoring_target_metrics(
&self,
opts: &MonitoringTargetMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
let pairs = Self::build_target_pairs(opts)?;
self.fetch_monitoring_json("/scrape/monitoring/metrics/target", &pairs)
.await
}
pub async fn get_screenshot_monitoring_metrics(
&self,
opts: &MonitoringMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
self.fetch_monitoring_json(
"/screenshot/monitoring/metrics",
&Self::build_metrics_pairs(opts),
)
.await
}
pub async fn get_screenshot_monitoring_target_metrics(
&self,
opts: &MonitoringTargetMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
let pairs = Self::build_target_pairs(opts)?;
self.fetch_monitoring_json("/screenshot/monitoring/metrics/target", &pairs)
.await
}
pub async fn get_extraction_monitoring_metrics(
&self,
opts: &MonitoringMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
self.fetch_monitoring_json(
"/extraction/monitoring/metrics",
&Self::build_metrics_pairs(opts),
)
.await
}
pub async fn get_extraction_monitoring_target_metrics(
&self,
opts: &MonitoringTargetMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
let pairs = Self::build_target_pairs(opts)?;
self.fetch_monitoring_json("/extraction/monitoring/metrics/target", &pairs)
.await
}
pub async fn get_crawler_monitoring_metrics(
&self,
opts: &MonitoringMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
self.fetch_monitoring_json(
"/crawl/monitoring/metrics",
&Self::build_metrics_pairs(opts),
)
.await
}
pub async fn get_crawler_monitoring_target_metrics(
&self,
opts: &MonitoringTargetMetricsOptions,
) -> Result<serde_json::Value, ScrapflyError> {
let pairs = Self::build_target_pairs(opts)?;
self.fetch_monitoring_json("/crawl/monitoring/metrics/target", &pairs)
.await
}
pub async fn get_browser_monitoring_metrics(
&self,
opts: &CloudBrowserMonitoringOptions,
) -> Result<serde_json::Value, ScrapflyError> {
let pairs = Self::build_browser_pairs(opts)?;
self.fetch_monitoring_json("/browser/monitoring/metrics", &pairs)
.await
}
pub async fn get_browser_monitoring_timeseries(
&self,
opts: &CloudBrowserMonitoringOptions,
) -> Result<serde_json::Value, ScrapflyError> {
let pairs = Self::build_browser_pairs(opts)?;
self.fetch_monitoring_json("/browser/monitoring/metrics/timeseries", &pairs)
.await
}
fn build_browser_pairs(
opts: &CloudBrowserMonitoringOptions,
) -> Result<Vec<(String, String)>, ScrapflyError> {
if opts.start.is_some() != opts.end.is_some() {
return Err(ScrapflyError::Config(
"cloud browser monitoring: start and end must be provided together".into(),
));
}
let mut pairs: Vec<(String, String)> = Vec::new();
match (&opts.start, &opts.end) {
(Some(s), Some(e)) => {
pairs.push(("start".into(), s.clone()));
pairs.push(("end".into(), e.clone()));
}
_ => {
if let Some(p) = opts.period {
pairs.push(("period".into(), p.as_str().into()));
}
}
}
if let Some(ref pool) = opts.proxy_pool {
pairs.push(("proxy_pool".into(), pool.clone()));
}
Ok(pairs)
}
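    /// Performs a single scrape against `/scrape`, using the HTTP method from the config
    /// (GET by default) and retrying transient failures up to 3 times with a 1-second
    /// pause. Upstream failures reported in the body are mapped to
    /// [`ScrapflyError::UpstreamClient`] / [`ScrapflyError::UpstreamServer`], and large
    /// `clob`/`blob` payloads are transparently resolved to their actual content before
    /// the result is returned.
    ///
    /// A usage sketch; `ScrapeConfig::new` is a hypothetical constructor, adjust it to
    /// however `ScrapeConfig` is actually built in this crate:
    ///
    /// ```ignore
    /// // `ScrapeConfig::new` is hypothetical here; only the `scrape` call is real.
    /// async fn fetch(client: &Client) -> Result<(), ScrapflyError> {
    ///     let cfg = ScrapeConfig::new("https://example.com");
    ///     let result = client.scrape(&cfg).await?;
    ///     println!("upstream status: {}", result.result.status_code);
    ///     Ok(())
    /// }
    /// ```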
pub async fn scrape(&self, config: &ScrapeConfig) -> Result<ScrapeResult, ScrapflyError> {
let pairs = config.to_query_pairs()?;
let url = self.build_url("/scrape", &pairs)?;
let method = match config.method {
Some(m) => Method::from_bytes(m.as_str().as_bytes())
.map_err(|e| ScrapflyError::Config(format!("invalid method: {}", e)))?,
None => Method::GET,
};
let mut headers = HeaderMap::new();
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let body = config.body.clone();
let resp = self
.send_with_retry(method, url, Some(headers), body.map(|b| b.into_bytes()))
.await?;
let (status, _h, body_bytes) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body_bytes, 0, false));
}
if matches!(config.method, Some(HttpMethod::Head)) && body_bytes.is_empty() {
return Ok(ScrapeResult {
uuid: String::new(),
config: serde_json::Value::Null,
context: serde_json::Value::Null,
result: ResultData {
status_code: 200,
success: true,
..Default::default()
},
});
}
let mut result: ScrapeResult = serde_json::from_slice(&body_bytes)?;
if !result.result.success {
let (err_code, err_message, err_doc) = match &result.result.error {
Some(e) => (e.code.clone(), e.message.clone(), e.doc_url.clone()),
None => (
result.result.status.clone(),
format!(
"scrape failed with status_code={}",
result.result.status_code
),
String::new(),
),
};
let api_err = ApiError {
code: err_code,
message: err_message,
http_status: result.result.status_code,
documentation_url: err_doc,
hint: String::new(),
retry_after_ms: 0,
};
let sc = result.result.status_code;
if (400..500).contains(&sc) {
return Err(ScrapflyError::UpstreamClient(api_err));
}
if (500..600).contains(&sc) {
return Err(ScrapflyError::UpstreamServer(api_err));
}
return Err(ScrapflyError::Api(api_err));
}
if result.result.success && result.result.status == "DONE" {
let fmt = result.result.format.as_str();
if fmt == "clob" || fmt == "blob" {
let (new_content, new_format) =
self.fetch_large_object(&result.result.content, fmt).await?;
result.result.content = new_content;
result.result.format = new_format;
}
}
Ok(result)
}
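    /// Resolves a large-object reference returned by the scrape API: when the result
    /// `format` is `clob` or `blob`, `content` holds a URL rather than the document
    /// itself. This helper re-fetches that URL (re-appending the API key) and returns
    /// the downloaded body together with the normalized format (`text` or `binary`).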
async fn fetch_large_object(
&self,
content_url: &str,
format: &str,
) -> Result<(String, String), ScrapflyError> {
let mut url = Url::parse(content_url)
.map_err(|e| ScrapflyError::Config(format!("invalid large-object url: {}", e)))?;
{
let existing: Vec<(String, String)> = url
.query_pairs()
.filter(|(k, _)| k != "key")
.map(|(k, v)| (k.into_owned(), v.into_owned()))
.collect();
let mut qs = url.query_pairs_mut();
qs.clear();
for (k, v) in existing {
qs.append_pair(&k, &v);
}
qs.append_pair("key", self.api_key());
}
let mut headers = HeaderMap::new();
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let resp = self
.send_with_retry(Method::GET, url, Some(headers), None)
.await?;
let (status, _h, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, false));
}
let new_format = match format {
"clob" => "text",
"blob" => "binary",
_ => {
return Err(ScrapflyError::Config(format!(
"unsupported large-object format: {}",
format
)))
}
};
let content = String::from_utf8_lossy(&body).into_owned();
Ok((content, new_format.to_string()))
}
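    /// Runs many scrapes concurrently and yields results as they complete, in completion
    /// order rather than input order. A `concurrency_limit` of 0 falls back to 5
    /// simultaneous requests.
    ///
    /// Sketch of draining the stream (config construction is elided; see
    /// [`Client::scrape`] for the per-request behaviour):
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    ///
    /// async fn run_all(client: &Client, configs: Vec<ScrapeConfig>) {
    ///     let mut stream = client.concurrent_scrape(configs, 10);
    ///     while let Some(outcome) = stream.next().await {
    ///         match outcome {
    ///             Ok(res) => println!("ok: {}", res.result.status_code),
    ///             Err(err) => eprintln!("failed: {err}"),
    ///         }
    ///     }
    /// }
    /// ```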
pub fn concurrent_scrape<'a, I>(
&'a self,
configs: I,
concurrency_limit: usize,
) -> impl Stream<Item = Result<ScrapeResult, ScrapflyError>> + 'a
where
I: IntoIterator<Item = ScrapeConfig> + 'a,
<I as IntoIterator>::IntoIter: 'a,
{
let limit = if concurrency_limit == 0 {
5
} else {
concurrency_limit
};
futures_util::stream::iter(
configs
.into_iter()
.map(move |cfg| async move { self.scrape(&cfg).await }),
)
.buffer_unordered(limit)
}
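    /// Convenience wrapper around [`Client::scrape_batch_with_options`] using
    /// [`crate::batch::BatchOptions::default`].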
pub async fn scrape_batch(
&self,
configs: &[ScrapeConfig],
) -> Result<impl Stream<Item = (String, crate::batch::BatchOutcome)>, ScrapflyError> {
self.scrape_batch_with_options(configs, crate::batch::BatchOptions::default())
.await
}
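    /// Submits up to 100 scrape configurations in one `POST /scrape/batch` call and
    /// streams results back as multipart parts. Every config must carry a unique
    /// `correlation_id`; parts are matched back to configs through the
    /// `x-scrapfly-correlation-id` header, and parts flagged as proxified are surfaced
    /// as [`crate::batch::BatchOutcome::Proxified`].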
pub async fn scrape_batch_with_options(
&self,
configs: &[ScrapeConfig],
opts: crate::batch::BatchOptions,
) -> Result<impl Stream<Item = (String, crate::batch::BatchOutcome)>, ScrapflyError> {
use crate::batch::{
build_proxified_response, decode_part_body, parts_from_response, BatchOutcome,
};
if configs.is_empty() {
return Err(ScrapflyError::Config(
"scrape_batch: configs is empty".into(),
));
}
if configs.len() > 100 {
return Err(ScrapflyError::Config(format!(
"scrape_batch: max 100 configs per batch (got {})",
configs.len()
)));
}
let mut seen: HashMap<String, usize> = HashMap::new();
let mut body_configs: Vec<HashMap<String, String>> = Vec::with_capacity(configs.len());
for (i, cfg) in configs.iter().enumerate() {
let correlation_id = cfg.correlation_id.clone().ok_or_else(|| {
ScrapflyError::Config(format!(
"scrape_batch: configs[{}] is missing correlation_id (required for matching streamed parts)",
i
))
})?;
if let Some(prev) = seen.get(&correlation_id) {
return Err(ScrapflyError::Config(format!(
"scrape_batch: correlation_id {:?} reused by configs[{}] and configs[{}]",
correlation_id, prev, i
)));
}
seen.insert(correlation_id.clone(), i);
let pairs = cfg.to_query_pairs()?;
let mut entry: HashMap<String, String> = HashMap::with_capacity(pairs.len());
for (k, v) in pairs {
if k == "key" {
continue;
}
entry.insert(k, v);
}
body_configs.push(entry);
}
let body = serde_json::json!({ "configs": body_configs });
let body_bytes = serde_json::to_vec(&body)?;
let mut url = Url::parse(&self.host)
.map_err(|e| ScrapflyError::Config(format!("invalid host: {}", e)))?;
url.set_path("/scrape/batch");
url.query_pairs_mut().append_pair("key", &self.key);
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
headers.insert(
ACCEPT,
HeaderValue::from_static(opts.format.accept_header()),
);
headers.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
let method = Method::POST;
if let Some(cb) = &self.on_request {
cb(&method, &url, &headers);
}
let resp = self
.http
.request(method, url)
.headers(headers)
.body(body_bytes)
.send()
.await
.map_err(|e| ScrapflyError::Config(format!("scrape_batch: send: {}", e)))?;
let status = resp.status().as_u16();
if status != 200 {
let body_bytes = resp.bytes().await.unwrap_or_default();
return Err(from_response(status, &body_bytes, 0, false));
}
let parts_stream = parts_from_response(resp)?;
Ok(parts_stream.map(|part_r| match part_r {
Ok(part) => {
let correlation_id = part
.headers
.get("x-scrapfly-correlation-id")
.cloned()
.unwrap_or_default();
if part
.headers
.get("x-scrapfly-proxified")
.map(|v| v == "true")
.unwrap_or(false)
{
let prox = build_proxified_response(part);
return (correlation_id, BatchOutcome::Proxified(prox));
}
match decode_part_body::<ScrapeResult>(&part) {
Ok(r) => (correlation_id, BatchOutcome::Scrape(r)),
Err(e) => (correlation_id, BatchOutcome::Err(e)),
}
}
Err(e) => (String::new(), BatchOutcome::Err(e)),
}))
}
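    /// Scrapes with `proxified_response` enabled and returns the raw
    /// [`reqwest::Response`] so the upstream body and headers can be consumed as-is.
    /// If Scrapfly rejects the request, the `x-scrapfly-reject-*` headers are turned
    /// into a [`ScrapflyError::Api`], including a millisecond retry hint derived from
    /// `Retry-After` when the rejection is marked retryable.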
pub async fn scrape_proxified(
&self,
config: &ScrapeConfig,
) -> Result<reqwest::Response, ScrapflyError> {
let mut cfg = config.clone();
cfg.proxified_response = true;
let pairs = cfg.to_query_pairs()?;
let url = self.build_url("/scrape", &pairs)?;
let method = match cfg.method {
Some(m) => Method::from_bytes(m.as_str().as_bytes())
.map_err(|e| ScrapflyError::Config(format!("invalid method: {}", e)))?,
None => Method::GET,
};
let body = cfg.body.clone();
let resp = self
.send_with_retry(method, url, None, body.map(|b| b.into_bytes()))
.await?;
if let Some(reject_code) = resp.headers().get("x-scrapfly-reject-code") {
let code = reject_code.to_str().unwrap_or("").to_string();
let desc = resp
.headers()
.get("x-scrapfly-reject-description")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
let retryable = resp
.headers()
.get("x-scrapfly-reject-retryable")
.and_then(|v| v.to_str().ok())
.unwrap_or("false")
== "true";
            let retry_after_ms: u64 = if retryable {
                resp.headers()
                    .get("retry-after")
                    .and_then(|v| v.to_str().ok())
                    .and_then(|v| v.parse::<u64>().ok())
                    .unwrap_or(0)
                    * 1000
            } else {
                0
            };
let status = resp.status().as_u16();
let doc = resp
.headers()
.get("x-scrapfly-reject-doc")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
return Err(ScrapflyError::Api(crate::error::ApiError {
code,
message: format!("Proxified scrape error: {}", desc),
http_status: status,
documentation_url: doc,
hint: String::new(),
retry_after_ms,
}));
}
Ok(resp)
}
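    /// Captures a screenshot via `GET /screenshot` and returns the raw image bytes
    /// together with metadata: a file extension inferred from the response
    /// `Content-Type`, plus the upstream status code and URL reported in the
    /// `x-scrapfly-upstream-*` headers.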
pub async fn screenshot(
&self,
config: &ScreenshotConfig,
) -> Result<ScreenshotResult, ScrapflyError> {
let pairs = config.to_query_pairs()?;
let url = self.build_url("/screenshot", &pairs)?;
let resp = self.send_with_retry(Method::GET, url, None, None).await?;
let (status, headers, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, false));
}
let content_type = headers
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("application/octet-stream");
let ext = content_type
.split('/')
.nth(1)
.and_then(|s| s.split(';').next())
.unwrap_or("bin")
.to_string();
let upstream_status_code: u16 = headers
.get("x-scrapfly-upstream-http-code")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok())
.unwrap_or(0);
let upstream_url = headers
.get("x-scrapfly-upstream-url")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
Ok(ScreenshotResult {
image: body,
metadata: ScreenshotMetadata {
extension_name: ext,
upstream_status_code,
upstream_url,
},
})
}
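    /// Runs the extraction API (`POST /extraction`) against the document in
    /// `config.body`. The document's `Content-Type` is forwarded as-is, and an optional
    /// compression format is advertised through the `Content-Encoding` header.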
pub async fn extract(
&self,
config: &ExtractionConfig,
) -> Result<ExtractionResult, ScrapflyError> {
let pairs = config.to_query_pairs()?;
let url = self.build_url("/extraction", &pairs)?;
let mut headers = HeaderMap::new();
headers.insert(
CONTENT_TYPE,
HeaderValue::from_str(&config.content_type)
.map_err(|e| ScrapflyError::Config(format!("invalid content-type: {}", e)))?,
);
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
if let Some(fmt) = config.document_compression_format {
headers.insert(
"content-encoding",
HeaderValue::from_str(fmt.as_str())
.map_err(|e| ScrapflyError::Config(format!("invalid encoding: {}", e)))?,
);
}
let resp = self
.send_with_retry(Method::POST, url, Some(headers), Some(config.body.clone()))
.await?;
let (status, _h, body_bytes) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body_bytes, 0, false));
}
Ok(serde_json::from_slice(&body_bytes)?)
}
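    /// Starts a crawl job via `POST /crawl` and returns its identifier. Both 200 and 201
    /// are accepted; a success response without a `crawler_uuid` is rejected as
    /// [`ScrapflyError::UnexpectedResponseFormat`].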
pub async fn start_crawl(
&self,
config: &CrawlerConfig,
) -> Result<CrawlerStartResponse, ScrapflyError> {
let body = config.to_json_body()?;
let url = self.build_url("/crawl", &[])?;
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let resp = self
.send_with_retry(Method::POST, url, Some(headers), Some(body))
.await?;
let (status, _h, body_bytes) = read_response(resp).await?;
if status != 200 && status != 201 {
return Err(from_response(status, &body_bytes, 0, true));
}
let parsed: CrawlerStartResponse = serde_json::from_slice(&body_bytes)?;
if parsed.crawler_uuid.is_empty() {
return Err(ScrapflyError::UnexpectedResponseFormat(
"crawler start response missing crawler_uuid".into(),
));
}
Ok(parsed)
}
pub async fn crawl_status(&self, uuid: &str) -> Result<CrawlerStatus, ScrapflyError> {
if uuid.is_empty() {
return Err(ScrapflyError::Config("uuid cannot be empty".into()));
}
let url = self.build_url(&format!("/crawl/{}/status", uuid), &[])?;
let resp = self.send_with_retry(Method::GET, url, None, None).await?;
let (status, _h, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, true));
}
Ok(serde_json::from_slice(&body)?)
}
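    /// Lists URLs discovered by a crawl. `page` and `per_page` default to 1 and 100 when
    /// passed as 0, and `status_filter` defaults to `visited` for labelling the parsed
    /// result (it is only sent to the API when explicitly provided). The endpoint is
    /// expected to answer in plain text; a JSON body on a 200 response is treated as an
    /// unexpected format.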
pub async fn crawl_urls(
&self,
uuid: &str,
status_filter: Option<&str>,
page: u32,
per_page: u32,
) -> Result<CrawlerUrls, ScrapflyError> {
if uuid.is_empty() {
return Err(ScrapflyError::Config("uuid cannot be empty".into()));
}
let page = if page == 0 { 1 } else { page };
let per_page = if per_page == 0 { 100 } else { per_page };
let status_hint = status_filter.unwrap_or("visited");
let mut pairs: Vec<(String, String)> = vec![
("page".into(), page.to_string()),
("per_page".into(), per_page.to_string()),
];
if let Some(s) = status_filter {
pairs.push(("status".into(), s.to_string()));
}
let url = self.build_url(&format!("/crawl/{}/urls", uuid), &pairs)?;
let mut headers = HeaderMap::new();
headers.insert(
ACCEPT,
HeaderValue::from_static("text/plain, application/json"),
);
let resp = self
.send_with_retry(Method::GET, url, Some(headers), None)
.await?;
let (status, resp_headers, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, true));
}
let ct = resp_headers
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if ct.contains("application/json") {
return Err(ScrapflyError::UnexpectedResponseFormat(format!(
"GET /crawl/{}/urls returned JSON on a 200 response (expected text/plain)",
uuid
)));
}
let body_str = std::str::from_utf8(&body)
.map_err(|e| ScrapflyError::UnexpectedResponseFormat(format!("invalid utf8: {}", e)))?;
Ok(CrawlerUrls::from_text(
body_str,
status_hint,
page,
per_page,
))
}
pub async fn crawl_contents_json(
&self,
uuid: &str,
format: crate::enums::CrawlerContentFormat,
limit: Option<u32>,
offset: Option<u32>,
) -> Result<CrawlerContents, ScrapflyError> {
if uuid.is_empty() {
return Err(ScrapflyError::Config("uuid cannot be empty".into()));
}
let mut pairs: Vec<(String, String)> = vec![("formats".into(), format.as_str().into())];
if let Some(l) = limit {
pairs.push(("limit".into(), l.to_string()));
}
if let Some(o) = offset {
pairs.push(("offset".into(), o.to_string()));
}
let url = self.build_url(&format!("/crawl/{}/contents", uuid), &pairs)?;
let mut headers = HeaderMap::new();
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let resp = self
.send_with_retry(Method::GET, url, Some(headers), None)
.await?;
let (status, resp_headers, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, true));
}
let ct = resp_headers
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if !ct.contains("application/json") {
return Err(ScrapflyError::UnexpectedResponseFormat(format!(
"expected JSON, got Content-Type={}",
ct
)));
}
Ok(serde_json::from_slice(&body)?)
}
pub async fn crawl_contents_plain(
&self,
uuid: &str,
target_url: &str,
format: crate::enums::CrawlerContentFormat,
) -> Result<String, ScrapflyError> {
if uuid.is_empty() {
return Err(ScrapflyError::Config("uuid cannot be empty".into()));
}
if target_url.is_empty() {
return Err(ScrapflyError::Config(
"plain mode requires a single url argument".into(),
));
}
let pairs: Vec<(String, String)> = vec![
("formats".into(), format.as_str().into()),
("url".into(), target_url.into()),
("plain".into(), "true".into()),
];
let url = self.build_url(&format!("/crawl/{}/contents", uuid), &pairs)?;
let mut headers = HeaderMap::new();
headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
let resp = self
.send_with_retry(Method::GET, url, Some(headers), None)
.await?;
let (status, _h, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, true));
}
Ok(String::from_utf8_lossy(&body).into_owned())
}
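    /// Fetches the content of up to 100 crawled URLs in one request. URLs are sent
    /// newline-separated as `text/plain`, the response is expected as
    /// `multipart/related`, and the returned map is keyed by URL and then by format
    /// (`html`, `markdown`, `text`, `json`), as inferred from each part's
    /// `Content-Type`.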
pub async fn crawl_contents_batch(
&self,
uuid: &str,
urls: &[String],
formats: &[crate::enums::CrawlerContentFormat],
) -> Result<
std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>,
ScrapflyError,
> {
if uuid.is_empty() {
return Err(ScrapflyError::Config("uuid cannot be empty".into()));
}
if urls.is_empty() {
return Err(ScrapflyError::Config("at least one URL is required".into()));
}
if urls.len() > 100 {
return Err(ScrapflyError::Config(format!(
"batch is limited to 100 URLs per request, got {}",
urls.len()
)));
}
if formats.is_empty() {
return Err(ScrapflyError::Config(
"at least one format is required".into(),
));
}
let format_strs: Vec<&'static str> = formats.iter().map(|f| f.as_str()).collect();
let pairs: Vec<(String, String)> = vec![("formats".into(), format_strs.join(","))];
let url = self.build_url(&format!("/crawl/{}/contents/batch", uuid), &pairs)?;
let body = urls.join("\n").into_bytes();
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("text/plain"));
headers.insert(
ACCEPT,
HeaderValue::from_static("multipart/related, application/json"),
);
let resp = self
.send_with_retry(Method::POST, url, Some(headers), Some(body))
.await?;
let (status, resp_headers, body_bytes) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body_bytes, 0, true));
}
let ct = resp_headers
.get(CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if ct.contains("application/json") {
            return Err(ScrapflyError::UnexpectedResponseFormat(
                "crawl_contents_batch expected multipart/related, got JSON".into(),
            ));
}
parse_multipart_related(
std::str::from_utf8(&body_bytes).unwrap_or(""),
ct,
&format_strs,
)
}
pub async fn crawl_cancel(&self, uuid: &str) -> Result<(), ScrapflyError> {
if uuid.is_empty() {
return Err(ScrapflyError::Config("uuid cannot be empty".into()));
}
let url = self.build_url(&format!("/crawl/{}/cancel", uuid), &[])?;
let mut headers = HeaderMap::new();
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let resp = self
.send_with_retry(Method::POST, url, Some(headers), None)
.await?;
let (status, _h, body) = read_response(resp).await?;
if status != 200 && status != 202 {
return Err(from_response(status, &body, 0, true));
}
Ok(())
}
pub async fn crawl_artifact(
&self,
uuid: &str,
artifact_type: CrawlerArtifactType,
) -> Result<CrawlerArtifact, ScrapflyError> {
if uuid.is_empty() {
return Err(ScrapflyError::Config("uuid cannot be empty".into()));
}
let pairs: Vec<(String, String)> = vec![("type".into(), artifact_type.as_str().into())];
let url = self.build_url(&format!("/crawl/{}/artifact", uuid), &pairs)?;
let mut headers = HeaderMap::new();
let accept = match artifact_type {
CrawlerArtifactType::Har => "application/json, application/octet-stream",
CrawlerArtifactType::Warc => {
"application/gzip, application/octet-stream, application/json"
}
};
headers.insert(ACCEPT, HeaderValue::from_static(accept));
let resp = self
.send_with_retry(Method::GET, url, Some(headers), None)
.await?;
let (status, _h, body) = read_response(resp).await?;
if status != 200 {
return Err(from_response(status, &body, 0, true));
}
Ok(CrawlerArtifact {
artifact_type,
data: body,
})
}
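    /// Sends a request with the SDK's default retry policy: up to 3 attempts, a fixed
    /// 1-second pause between them, and retries only on transport errors or 5xx
    /// responses. The final attempt's response is returned even when it is a 5xx, so
    /// callers can map the body to a structured error themselves.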
pub(crate) async fn send_with_retry(
&self,
method: Method,
url: Url,
headers: Option<HeaderMap>,
body: Option<Vec<u8>>,
) -> Result<Response, ScrapflyError> {
let mut last_err: Option<ScrapflyError> = None;
for attempt in 0..DEFAULT_RETRIES {
let mut req = self.http.request(method.clone(), url.clone());
let mut hmap = headers.clone().unwrap_or_default();
if !hmap.contains_key(USER_AGENT) {
hmap.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
}
if let Some(cb) = &self.on_request {
cb(&method, &url, &hmap);
}
req = req.headers(hmap);
if let Some(b) = &body {
req = req.body(b.clone());
}
match req.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
if (500..600).contains(&status) && attempt + 1 < DEFAULT_RETRIES {
last_err = Some(ScrapflyError::ApiServer(crate::error::ApiError {
message: "server error".into(),
http_status: status,
..Default::default()
}));
tokio::time::sleep(DEFAULT_RETRY_DELAY).await;
continue;
}
return Ok(resp);
}
Err(e) => {
last_err = Some(ScrapflyError::Transport(e));
if attempt + 1 < DEFAULT_RETRIES {
tokio::time::sleep(DEFAULT_RETRY_DELAY).await;
continue;
}
}
}
}
Err(last_err.unwrap_or_else(|| ScrapflyError::Config("retry loop exhausted".into())))
}
async fn send_simple(
&self,
method: Method,
url: Url,
headers: Option<HeaderMap>,
body: Option<Vec<u8>>,
) -> Result<Response, ScrapflyError> {
let mut req = self.http.request(method.clone(), url.clone());
let mut hmap = headers.unwrap_or_default();
if !hmap.contains_key(USER_AGENT) {
hmap.insert(USER_AGENT, HeaderValue::from_static(SDK_USER_AGENT));
}
if let Some(cb) = &self.on_request {
cb(&method, &url, &hmap);
}
req = req.headers(hmap);
if let Some(b) = body {
req = req.body(b);
}
req.send().await.map_err(ScrapflyError::Transport)
}
}
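/// Buffers a response into `(status, headers, body)`. The `Retry-After` header is
/// parsed but its value is currently discarded; callers pass `0` as the retry hint to
/// `from_response`.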
async fn read_response(resp: Response) -> Result<(u16, HeaderMap, bytes::Bytes), ScrapflyError> {
let status = resp.status().as_u16();
let headers = resp.headers().clone();
let body = resp.bytes().await.map_err(ScrapflyError::Transport)?;
let _ = parse_retry_after(headers.get("retry-after").and_then(|v| v.to_str().ok()));
Ok((status, headers, body))
}
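/// Splits a `multipart/related` body into per-URL, per-format documents. The boundary
/// is read from the `Content-Type` parameter, each part's URL comes from its
/// `Content-Location` header, and its format from its `Content-Type` (falling back to
/// the first requested format, or `html`). Parts without a `Content-Location` are
/// skipped.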
fn parse_multipart_related(
body: &str,
content_type: &str,
formats: &[&str],
) -> Result<
std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>,
ScrapflyError,
> {
let mut boundary = String::new();
for part in content_type.split(';') {
let p = part.trim();
if let Some(stripped) = p.strip_prefix("boundary=") {
boundary = stripped.trim_matches('"').to_string();
break;
}
}
if boundary.is_empty() {
return Err(ScrapflyError::UnexpectedResponseFormat(format!(
"multipart response has no boundary in Content-Type: {}",
content_type
)));
}
let delimiter = format!("--{}", boundary);
let mut result: std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>> =
std::collections::BTreeMap::new();
    let segments: Vec<&str> = body.split(delimiter.as_str()).collect();
for segment in segments.iter().skip(1) {
let mut seg = *segment;
seg = seg.trim_start_matches("\r\n").trim_start_matches('\n');
if seg.starts_with("--") {
break;
}
seg = seg.trim_end_matches("\r\n").trim_end_matches('\n');
let (headers_raw, part_body) = if let Some(idx) = seg.find("\r\n\r\n") {
(&seg[..idx], &seg[idx + 4..])
} else if let Some(idx) = seg.find("\n\n") {
(&seg[..idx], &seg[idx + 2..])
} else {
continue;
};
let mut part_url = String::new();
let mut part_format = String::new();
for line in headers_raw.split('\n') {
let line = line.trim_end_matches('\r');
if let Some(colon) = line.find(':') {
let name = line[..colon].trim().to_ascii_lowercase();
let value = line[colon + 1..].trim().to_string();
match name.as_str() {
"content-location" => part_url = value,
"content-type" => part_format = infer_format_from_content_type(&value),
_ => {}
}
}
}
if part_url.is_empty() {
continue;
}
if part_format.is_empty() {
part_format = formats.first().copied().unwrap_or("html").to_string();
}
result
.entry(part_url)
.or_default()
.insert(part_format, part_body.to_string());
}
Ok(result)
}
fn infer_format_from_content_type(ct: &str) -> String {
let lc = ct
.split(';')
.next()
.unwrap_or("")
.trim()
.to_ascii_lowercase();
match lc.as_str() {
"text/html" => "html".into(),
"text/markdown" => "markdown".into(),
"text/plain" => "text".into(),
"application/json" => "json".into(),
_ => String::new(),
}
}