pub mod shapes;
use backon::ExponentialBuilder;
use backon::Retryable;
use reqwest::Client;
use reqwest::{Error, Response};
use serde::Serialize;
pub use shapes::{request::*, response::*};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::OnceLock;
use tokio_stream::StreamExt;
/// Rate-limit counters reported by the API through `RateLimit-*` response headers.
///
/// Every counter is its own atomic, so the values can be refreshed through a shared
/// `&self` reference without any locking.
#[derive(Default, Debug)]
pub struct RateLimitInfo {
    /// Last value seen in the `RateLimit-Limit` header (0 until one is seen).
    pub limit: AtomicU32,
    /// Last value seen in the `RateLimit-Remaining` header (0 until one is seen).
    pub remaining: AtomicU32,
    /// Last value seen in the `RateLimit-Reset` header (0 until one is seen).
    pub reset_seconds: AtomicU32,
}

impl RateLimitInfo {
    /// Returns `(limit, remaining, reset_seconds)` as currently stored.
    ///
    /// The three loads are independent, so while another task is updating the
    /// counters the tuple may mix values from different responses.
    pub fn snapshot(&self) -> (u32, u32, u32) {
        let load = |counter: &AtomicU32| counter.load(Ordering::Relaxed);
        let limit = load(&self.limit);
        let remaining = load(&self.remaining);
        let reset = load(&self.reset_seconds);
        (limit, remaining, reset)
    }
}
/// Process-wide cache for the resolved API base URL.
static API_URL: OnceLock<String> = OnceLock::new();

/// Returns the Spider API base URL.
///
/// On the first call this reads `SPIDER_API_URL`, falling back to
/// `https://api.spider.cloud` when the variable is unset or not valid Unicode.
/// The value is then cached for the rest of the process.
pub fn get_api_url() -> &'static str {
    let base = API_URL.get_or_init(|| match std::env::var("SPIDER_API_URL") {
        Ok(custom) => custom,
        Err(_) => String::from("https://api.spider.cloud"),
    });
    base.as_str()
}
/// Client for the Spider HTTP API.
///
/// Holds the bearer token, the underlying `reqwest` client, and rate-limit
/// counters refreshed from response headers.
///
/// `Debug` is implemented by hand so the API key is never written to logs.
#[derive(Default)]
pub struct Spider {
    /// API key sent as `Authorization: Bearer <key>` on every request.
    pub api_key: String,
    /// HTTP client used for all requests.
    pub client: Client,
    /// Rate-limit counters updated from response headers.
    pub rate_limit: RateLimitInfo,
}

impl std::fmt::Debug for Spider {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The key is a secret credential. A derived `Debug` would print it verbatim
        // wherever the client is formatted with `{:?}` (logs, panics, tracing spans).
        f.debug_struct("Spider")
            .field("api_key", &"<redacted>")
            .field("client", &self.client)
            .field("rate_limit", &self.rate_limit)
            .finish()
    }
}
/// Decodes the whole response body as a single JSON document.
///
/// # Errors
///
/// Returns the `reqwest::Error` if the body cannot be read or is not valid JSON.
pub async fn handle_json(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    let document: serde_json::Value = res.json().await?;
    Ok(document)
}
/// Decodes a line-delimited JSON body into a JSON array with one element per line.
///
/// Blank or malformed lines are skipped rather than failing the whole response.
///
/// # Errors
///
/// Returns the `reqwest::Error` only if the body cannot be read.
pub async fn handle_jsonl(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    let body = res.text().await?;
    let mut records = Vec::new();
    for line in body.lines() {
        if let Ok(record) = serde_json::from_str::<serde_json::Value>(line) {
            records.push(record);
        }
    }
    Ok(serde_json::Value::Array(records))
}
#[cfg(feature = "csv")]
/// Decodes a CSV body (with a header row) into a JSON array of `{column: value}` objects.
///
/// Rows that fail to deserialize are dropped. If the rows cannot be converted to
/// JSON, the raw body is returned as a JSON string instead.
///
/// # Errors
///
/// Returns the `reqwest::Error` only if the body cannot be read.
pub async fn handle_csv(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    use std::collections::HashMap;
    let body = res.text().await?;
    let rows = {
        let mut reader = csv::Reader::from_reader(body.as_bytes());
        let mut rows: Vec<HashMap<String, String>> = Vec::new();
        for row in reader.deserialize::<HashMap<String, String>>() {
            if let Ok(row) = row {
                rows.push(row);
            }
        }
        rows
    };
    match serde_json::to_value(rows) {
        Ok(value) => Ok(value),
        Err(_) => Ok(serde_json::Value::String(body)),
    }
}
#[cfg(not(feature = "csv"))]
/// Fallback used when the `csv` feature is disabled: the body is returned as text.
pub async fn handle_csv(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    let as_text = handle_text(res);
    as_text.await
}
/// Returns the response body verbatim, wrapped in a JSON string value.
///
/// # Errors
///
/// Returns the `reqwest::Error` if the body cannot be read, for example when the
/// connection drops mid-transfer. This matches the other `handle_*` functions.
pub async fn handle_text(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    // Propagate read failures instead of substituting "", so a broken transfer is
    // not mistaken for a successful, empty response.
    let body = res.text().await?;
    Ok(serde_json::Value::String(body))
}
#[cfg(feature = "csv")]
/// Attempts to decode an XML body into a JSON value with `quick_xml`.
///
/// If decoding fails, the raw body is returned as a JSON string instead.
///
/// # Errors
///
/// Returns the `reqwest::Error` only if the body cannot be read.
pub async fn handle_xml(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    // NOTE(review): XML decoding is gated on the `csv` feature rather than a dedicated
    // one, presumably because that feature also pulls in `quick_xml`. Confirm in Cargo.toml.
    let body = res.text().await?;
    let decoded = quick_xml::de::from_str::<serde_json::Value>(&body);
    Ok(decoded.unwrap_or_else(|_| serde_json::Value::String(body)))
}
#[cfg(not(feature = "csv"))]
/// Fallback used when the `csv` feature is disabled: XML is not decoded, and the
/// body is returned as text.
pub async fn handle_xml(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    let as_text = handle_text(res);
    as_text.await
}
/// Decodes a response body according to its `Content-Type` header.
///
/// The header is lower-cased and matched by substring, in this order:
/// 1. `jsonl` or `ndjson`: [`handle_jsonl`] (one array element per line)
/// 2. any other `json`: [`handle_json`]
/// 3. `csv`: [`handle_csv`]
/// 4. `xml`: [`handle_xml`]
/// 5. anything else, including a missing or non-ASCII header: [`handle_text`]
///
/// # Errors
///
/// Propagates whatever error the selected handler returns.
pub async fn parse_response(res: reqwest::Response) -> Result<serde_json::Value, reqwest::Error> {
    let content_type = res
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or_default()
        .to_ascii_lowercase();
    // Line-delimited JSON must be tested first. `application/x-ndjson` also contains
    // "json" and would otherwise reach the single-document parser and fail to decode.
    if content_type.contains("jsonl") || content_type.contains("ndjson") {
        handle_jsonl(res).await
    } else if content_type.contains("json") {
        handle_json(res).await
    } else if content_type.contains("csv") {
        handle_csv(res).await
    } else if content_type.contains("xml") {
        handle_xml(res).await
    } else {
        handle_text(res).await
    }
}
impl Spider {
pub fn new(api_key: Option<String>) -> Result<Self, &'static str> {
let api_key = api_key.or_else(|| std::env::var("SPIDER_API_KEY").ok());
match api_key {
Some(key) => Ok(Self {
api_key: key,
client: Client::new(),
rate_limit: RateLimitInfo::default(),
}),
None => Err("No API key provided"),
}
}
pub fn new_with_client(api_key: Option<String>, client: Client) -> Result<Self, &'static str> {
let api_key = api_key.or_else(|| std::env::var("SPIDER_API_KEY").ok());
match api_key {
Some(key) => Ok(Self {
api_key: key,
client,
rate_limit: RateLimitInfo::default(),
}),
None => Err("No API key provided"),
}
}
fn update_rate_limit(&self, headers: &reqwest::header::HeaderMap) {
if let Some(v) = headers.get("RateLimit-Limit").and_then(|v| v.to_str().ok()) {
if let Ok(n) = v.parse::<u32>() {
self.rate_limit.limit.store(n, Ordering::Relaxed);
}
}
if let Some(v) = headers
.get("RateLimit-Remaining")
.and_then(|v| v.to_str().ok())
{
if let Ok(n) = v.parse::<u32>() {
self.rate_limit.remaining.store(n, Ordering::Relaxed);
}
}
if let Some(v) = headers
.get("RateLimit-Reset")
.and_then(|v| v.to_str().ok())
{
if let Ok(n) = v.parse::<u32>() {
self.rate_limit.reset_seconds.store(n, Ordering::Relaxed);
}
}
}
async fn api_post_base(
&self,
endpoint: &str,
data: impl Serialize + Sized + std::fmt::Debug,
content_type: &str,
) -> Result<Response, Error> {
let url: String = format!("{}/{}", get_api_url(), endpoint);
let resp = self
.client
.post(&url)
.header(
"User-Agent",
format!("Spider-Client/{}", env!("CARGO_PKG_VERSION")),
)
.header("Content-Type", content_type)
.header("Authorization", format!("Bearer {}", self.api_key))
.json(&data)
.send()
.await?;
self.update_rate_limit(resp.headers());
if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
let retry_after = resp
.headers()
.get("Retry-After")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(1);
tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await;
return resp.error_for_status();
}
Ok(resp)
}
pub async fn api_post(
&self,
endpoint: &str,
data: impl Serialize + std::fmt::Debug + Clone + Send + Sync,
content_type: &str,
) -> Result<Response, Error> {
let fetch = || async {
self.api_post_base(endpoint, data.to_owned(), content_type)
.await
};
fetch
.retry(ExponentialBuilder::default().with_max_times(5))
.when(|err: &reqwest::Error| {
if let Some(status) = err.status() {
status.is_server_error()
|| status == reqwest::StatusCode::TOO_MANY_REQUESTS
} else {
err.is_timeout()
}
})
.await
}
async fn api_get_base<T: Serialize>(
&self,
endpoint: &str,
query_params: Option<&T>,
) -> Result<serde_json::Value, reqwest::Error> {
let url = format!("{}/{}", get_api_url(), endpoint);
let res = self
.client
.get(&url)
.query(&query_params)
.header(
"User-Agent",
format!("Spider-Client/{}", env!("CARGO_PKG_VERSION")),
)
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", self.api_key))
.send()
.await?;
self.update_rate_limit(res.headers());
if res.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
let retry_after = res
.headers()
.get("Retry-After")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(1);
tokio::time::sleep(std::time::Duration::from_secs(retry_after)).await;
return Err(res.error_for_status().unwrap_err());
}
parse_response(res).await
}
pub async fn api_get<T: Serialize>(
&self,
endpoint: &str,
query_params: Option<&T>,
) -> Result<serde_json::Value, reqwest::Error> {
let fetch = || async { self.api_get_base(endpoint, query_params.to_owned()).await };
fetch
.retry(ExponentialBuilder::default().with_max_times(5))
.when(|err: &reqwest::Error| {
if let Some(status) = err.status() {
status.is_server_error()
|| status == reqwest::StatusCode::TOO_MANY_REQUESTS
} else {
err.is_timeout()
}
})
.await
}
async fn api_delete_base(
&self,
endpoint: &str,
params: Option<HashMap<String, serde_json::Value>>,
) -> Result<Response, Error> {
let url = format!("{}/v1/{}", get_api_url(), endpoint);
let request_builder = self
.client
.delete(&url)
.header(
"User-Agent",
format!("Spider-Client/{}", env!("CARGO_PKG_VERSION")),
)
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", self.api_key));
let request_builder = if let Some(params) = params {
request_builder.json(¶ms)
} else {
request_builder
};
request_builder.send().await
}
pub async fn api_delete(
&self,
endpoint: &str,
params: Option<HashMap<String, serde_json::Value>>,
) -> Result<Response, Error> {
let fetch = || async { self.api_delete_base(endpoint, params.to_owned()).await };
fetch
.retry(ExponentialBuilder::default().with_max_times(5))
.when(|err: &reqwest::Error| {
if let Some(status) = err.status() {
status.is_server_error()
} else {
err.is_timeout()
}
})
.await
}
pub async fn scrape_url(
&self,
url: &str,
params: Option<RequestParams>,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let mut data = HashMap::new();
if let Ok(params) = serde_json::to_value(params) {
if let Some(ref p) = params.as_object() {
data.extend(p.iter().map(|(k, v)| (k.to_string(), v.clone())));
}
}
if !url.is_empty() {
data.insert(
"url".to_string(),
serde_json::Value::String(url.to_string()),
);
}
let res = self.api_post("scrape", data, content_type).await?;
parse_response(res).await
}
pub async fn multi_scrape_url(
&self,
params: Option<Vec<RequestParams>>,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let mut data = HashMap::new();
if let Ok(mut params) = serde_json::to_value(params) {
if let Some(obj) = params.as_object_mut() {
obj.insert("limit".to_string(), serde_json::Value::Number(1.into()));
data.extend(obj.iter().map(|(k, v)| (k.clone(), v.clone())));
}
}
let res = self.api_post("scrape", data, content_type).await?;
parse_response(res).await
}
pub async fn crawl_url(
&self,
url: &str,
params: Option<RequestParams>,
stream: bool,
content_type: &str,
callback: Option<impl Fn(serde_json::Value) + Send>,
) -> Result<serde_json::Value, reqwest::Error> {
use tokio_util::codec::{FramedRead, LinesCodec};
let mut data = HashMap::new();
if let Ok(params) = serde_json::to_value(params) {
if let Some(ref p) = params.as_object() {
data.extend(p.iter().map(|(k, v)| (k.to_string(), v.clone())));
}
}
data.insert("url".into(), serde_json::Value::String(url.to_string()));
let res = self.api_post("crawl", data, content_type).await?;
if stream {
if let Some(callback) = callback {
let stream = res.bytes_stream();
let stream_reader = tokio_util::io::StreamReader::new(
stream
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
);
let mut lines = FramedRead::new(stream_reader, LinesCodec::new());
while let Some(line_result) = lines.next().await {
match line_result {
Ok(line) => match serde_json::from_str::<serde_json::Value>(&line) {
Ok(value) => {
callback(value);
}
Err(_e) => {
continue;
}
},
Err(_e) => return Ok(serde_json::Value::Null),
}
}
Ok(serde_json::Value::Null)
} else {
Ok(serde_json::Value::Null)
}
} else {
parse_response(res).await
}
}
pub async fn multi_crawl_url(
&self,
params: Option<Vec<RequestParams>>,
stream: bool,
content_type: &str,
callback: Option<impl Fn(serde_json::Value) + Send>,
) -> Result<serde_json::Value, reqwest::Error> {
use tokio_util::codec::{FramedRead, LinesCodec};
let res = self.api_post("crawl", params, content_type).await?;
if stream {
if let Some(callback) = callback {
let stream = res.bytes_stream();
let stream_reader = tokio_util::io::StreamReader::new(
stream
.map(|r| r.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
);
let mut lines = FramedRead::new(stream_reader, LinesCodec::new());
while let Some(line_result) = lines.next().await {
match line_result {
Ok(line) => match serde_json::from_str::<serde_json::Value>(&line) {
Ok(value) => {
callback(value);
}
Err(_e) => {
continue;
}
},
Err(_e) => return Ok(serde_json::Value::Null),
}
}
Ok(serde_json::Value::Null)
} else {
Ok(serde_json::Value::Null)
}
} else {
parse_response(res).await
}
}
pub async fn links(
&self,
url: &str,
params: Option<RequestParams>,
_stream: bool,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let mut data = HashMap::new();
if let Ok(params) = serde_json::to_value(params) {
if let Some(ref p) = params.as_object() {
data.extend(p.iter().map(|(k, v)| (k.to_string(), v.clone())));
}
}
data.insert("url".into(), serde_json::Value::String(url.to_string()));
let res = self.api_post("links", data, content_type).await?;
parse_response(res).await
}
pub async fn multi_links(
&self,
params: Option<Vec<RequestParams>>,
_stream: bool,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let res = self.api_post("links", params, content_type).await?;
parse_response(res).await
}
pub async fn screenshot(
&self,
url: &str,
params: Option<RequestParams>,
_stream: bool,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let mut data = HashMap::new();
if let Ok(params) = serde_json::to_value(params) {
if let Some(ref p) = params.as_object() {
data.extend(p.iter().map(|(k, v)| (k.to_string(), v.clone())));
}
}
data.insert("url".into(), serde_json::Value::String(url.to_string()));
let res = self.api_post("screenshot", data, content_type).await?;
parse_response(res).await
}
pub async fn multi_screenshot(
&self,
params: Option<Vec<RequestParams>>,
_stream: bool,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let res = self.api_post("screenshot", params, content_type).await?;
parse_response(res).await
}
pub async fn search(
&self,
q: &str,
params: Option<SearchRequestParams>,
_stream: bool,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let body = match params {
Some(mut params) => {
params.search = q.to_string();
params
}
_ => {
let mut params = SearchRequestParams::default();
params.search = q.to_string();
params
}
};
let res = self.api_post("search", body, content_type).await?;
parse_response(res).await
}
pub async fn multi_search(
&self,
params: Option<Vec<SearchRequestParams>>,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let res = self.api_post("search", params, content_type).await?;
parse_response(res).await
}
pub async fn unblock_url(
&self,
url: &str,
params: Option<RequestParams>,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let mut data = HashMap::new();
if let Ok(params) = serde_json::to_value(params) {
if let Some(ref p) = params.as_object() {
data.extend(p.iter().map(|(k, v)| (k.to_string(), v.clone())));
}
}
if !url.is_empty() {
data.insert(
"url".to_string(),
serde_json::Value::String(url.to_string()),
);
}
let res = self.api_post("unblocker", data, content_type).await?;
parse_response(res).await
}
pub async fn multi_unblock_url(
&self,
params: Option<Vec<RequestParams>>,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let mut data = HashMap::new();
if let Ok(mut params) = serde_json::to_value(params) {
if let Some(obj) = params.as_object_mut() {
obj.insert("limit".to_string(), serde_json::Value::Number(1.into()));
data.extend(obj.iter().map(|(k, v)| (k.clone(), v.clone())));
}
}
let res = self.api_post("unblocker", data, content_type).await?;
parse_response(res).await
}
pub async fn transform(
&self,
data: Vec<HashMap<&str, &str>>,
params: Option<TransformParams>,
_stream: bool,
content_type: &str,
) -> Result<serde_json::Value, reqwest::Error> {
let mut payload = HashMap::new();
if let Ok(params) = serde_json::to_value(params) {
if let Some(ref p) = params.as_object() {
payload.extend(p.iter().map(|(k, v)| (k.to_string(), v.clone())));
}
}
if let Ok(d) = serde_json::to_value(data) {
payload.insert("data".into(), d);
}
let res = self.api_post("transform", payload, content_type).await?;
parse_response(res).await
}
pub async fn get_credits(&self) -> Result<serde_json::Value, reqwest::Error> {
self.api_get::<serde_json::Value>("data/credits", None)
.await
}
pub async fn data_post(
&self,
table: &str,
data: Option<RequestParams>,
) -> Result<serde_json::Value, reqwest::Error> {
let res = self
.api_post(&format!("data/{}", table), data, "application/json")
.await?;
parse_response(res).await
}
pub async fn data_get(
&self,
table: &str,
params: Option<RequestParams>,
) -> Result<serde_json::Value, reqwest::Error> {
let mut payload = HashMap::new();
if let Some(params) = params {
if let Ok(p) = serde_json::to_value(params) {
if let Some(o) = p.as_object() {
payload.extend(o.iter().map(|(k, v)| (k.as_str(), v.clone())));
}
}
}
let res = self
.api_get::<serde_json::Value>(&format!("data/{}", table), None)
.await?;
Ok(res)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use dotenv::dotenv;
    use lazy_static::lazy_static;
    use reqwest::ClientBuilder;

    lazy_static! {
        // Shared client for the live-API tests below. Building it requires
        // `SPIDER_API_KEY`, which may come from a `.env` file via `dotenv`.
        static ref SPIDER_CLIENT: Spider = {
            dotenv().ok();
            let client = ClientBuilder::new();
            let client = client.user_agent("SpiderBot").build().unwrap();
            Spider::new_with_client(None, client).expect("client to build")
        };
    }

    // ---- Offline tests: no network or API key required. ----

    #[test]
    fn rate_limit_snapshot_starts_at_zero() {
        assert_eq!(RateLimitInfo::default().snapshot(), (0, 0, 0));
    }

    #[test]
    fn update_rate_limit_reads_headers_and_ignores_garbage() {
        use reqwest::header::{HeaderMap, HeaderValue};
        let spider = Spider::default();
        let mut headers = HeaderMap::new();
        headers.insert("ratelimit-limit", HeaderValue::from_static("100"));
        headers.insert("ratelimit-remaining", HeaderValue::from_static("42"));
        headers.insert("ratelimit-reset", HeaderValue::from_static("soon"));
        spider.update_rate_limit(&headers);
        // The unparseable reset header leaves its counter at the default 0.
        assert_eq!(spider.rate_limit.snapshot(), (100, 42, 0));
    }

    // ---- Live-API tests: ignored by default because they need network access and
    // a valid key. Run them with `cargo test -- --ignored`. ----

    #[tokio::test]
    #[ignore]
    async fn test_scrape_url() {
        let response = SPIDER_CLIENT
            .scrape_url("https://example.com", None, "application/json")
            .await;
        assert!(response.is_ok());
    }

    #[tokio::test]
    #[ignore]
    async fn test_crawl_url() {
        let response = SPIDER_CLIENT
            .crawl_url(
                "https://example.com",
                None,
                false,
                "application/json",
                None::<fn(serde_json::Value)>,
            )
            .await;
        assert!(response.is_ok());
    }

    #[tokio::test]
    #[ignore]
    async fn test_links() {
        let response: Result<serde_json::Value, Error> = SPIDER_CLIENT
            .links("https://example.com", None, false, "application/json")
            .await;
        assert!(response.is_ok());
    }

    #[tokio::test]
    #[ignore]
    async fn test_screenshot() {
        let mut params = RequestParams::default();
        params.limit = Some(1);
        let response = SPIDER_CLIENT
            .screenshot(
                "https://example.com",
                Some(params),
                false,
                "application/json",
            )
            .await;
        assert!(response.is_ok());
    }

    #[tokio::test]
    #[ignore]
    async fn test_transform() {
        let data = vec![HashMap::from([(
            "<html><body><h1>Transformation</h1></body></html>".into(),
            "".into(),
        )])];
        let response = SPIDER_CLIENT
            .transform(data, None, false, "application/json")
            .await;
        assert!(response.is_ok());
    }

    #[tokio::test]
    #[ignore]
    async fn test_get_credits() {
        let response = SPIDER_CLIENT.get_credits().await;
        assert!(response.is_ok());
    }
}