use super::error::FetchError;
use super::source::{BookSource, Charset, Method, RateLimit, Retry};
use async_trait::async_trait;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Paces requests so that consecutive request starts are at least `interval` apart,
/// i.e. roughly `max_count` requests per `per_ms` milliseconds (see [`RateLimit`]).
///
/// Callers reserve the next free start slot under a short-lived lock and then sleep
/// outside the lock until that slot arrives, so concurrent callers line up one
/// `interval` apart instead of firing together.
struct RateLimiter {
    /// Minimum spacing between the start times of two consecutive requests.
    interval: Duration,
    /// Start slot reserved by the most recent caller. It may lie in the future while
    /// that caller is still sleeping; `None` until the first `acquire`.
    last: Mutex<Option<Instant>>,
}

impl RateLimiter {
    /// Builds a limiter from the source's rate-limit config.
    ///
    /// Returns `None` (no limiting) when either `max_count` or `per_ms` is zero.
    fn from_config(rl: &RateLimit) -> Option<Self> {
        if rl.max_count == 0 || rl.per_ms == 0 {
            return None;
        }
        // Divide at nanosecond precision. Integer division of whole milliseconds
        // truncates the interval, letting slightly more than `max_count` requests
        // through per window. It also yields a zero interval (no limiting at all)
        // whenever `per_ms < max_count`.
        let nanos = Duration::from_millis(rl.per_ms).as_nanos() / u128::from(rl.max_count);
        // Clamped to `u64::MAX` first, so the `as` cast cannot truncate.
        let nanos = nanos.min(u128::from(u64::MAX)) as u64;
        Some(Self {
            interval: Duration::from_nanos(nanos),
            last: Mutex::new(None),
        })
    }

    /// Waits until the caller may start its request.
    ///
    /// The slot handed out is `max(now, previous slot + interval)`. Basing it on the
    /// previously *reserved* slot (not on the current time) keeps three or more
    /// concurrent callers spaced apart, because each one queues behind the last
    /// reservation.
    async fn acquire(&self) {
        let wait = {
            // The critical section only stores a complete `Option<Instant>`, so a
            // poisoned lock cannot hold inconsistent state; recover instead of panicking.
            let mut last = self.last.lock().unwrap_or_else(|e| e.into_inner());
            let now = Instant::now();
            let slot = match *last {
                Some(prev) => (prev + self.interval).max(now),
                None => now,
            };
            *last = Some(slot);
            slot.saturating_duration_since(now)
        }; // Guard is dropped here: a std `Mutex` guard must not be held across `.await`.
        if !wait.is_zero() {
            tokio::time::sleep(wait).await;
        }
    }
}
/// Reports whether a response looks like a Cloudflare (anti-bot) challenge page
/// rather than real content.
///
/// A `cf-mitigated: challenge` header is decisive on its own. Otherwise the response
/// must have status 403 or 503 *and* a body containing at least one known
/// challenge-page marker.
pub fn is_challenge(status: u16, cf_mitigated: Option<&str>, body: &str) -> bool {
    const BODY_MARKERS: [&str; 3] = [
        "_cf_chl_opt",
        "/cdn-cgi/challenge-platform/",
        "<title>Just a moment",
    ];

    if let Some("challenge") = cf_mitigated {
        return true;
    }
    let blocking_status = status == 403 || status == 503;
    if !blocking_status {
        return false;
    }
    BODY_MARKERS.iter().any(|marker| body.contains(marker))
}
#[derive(Debug, Clone, Default)]
pub struct FetchRequest {
pub url: String,
pub method: Method,
pub body: Option<String>,
pub headers: HashMap<String, String>,
}
impl FetchRequest {
pub fn get(url: impl Into<String>) -> Self {
Self {
url: url.into(),
..Default::default()
}
}
}
/// Asynchronously performs a [`FetchRequest`] and returns the response body as text.
///
/// Implementations must be `Send + Sync`, so one instance can be shared between threads.
#[async_trait]
pub trait Fetcher: Send + Sync {
/// Sends `req` and returns the decoded response body, or a [`FetchError`] on failure.
async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError>;
}
/// [`Fetcher`] backed by a `reqwest::Client`. It is configured from a [`BookSource`]'s
/// HTTP settings: default headers, static cookies, timeout, charset, retry and rate limit.
pub struct ReqwestFetcher {
    /// Client carrying the source's default headers, cookie store and timeout.
    client: reqwest::Client,
    /// Source base URL without a trailing `/`; relative request URLs are joined onto it.
    base: String,
    /// Charset used to decode response bodies.
    charset: Charset,
    /// Retry policy; `None` means a single attempt.
    retry: Option<Retry>,
    /// Request pacing; `None` when the source has no (or a zero) rate limit.
    limiter: Option<RateLimiter>,
}

impl ReqwestFetcher {
    /// Builds a fetcher for `source`.
    ///
    /// Configured headers and cookies become default headers on every request. The
    /// client also enables reqwest's cookie store.
    ///
    /// # Errors
    ///
    /// Returns [`FetchError::Header`] if a configured header name or value, or the
    /// combined cookie string, is not a valid HTTP header. It also propagates any error
    /// from building the `reqwest` client.
    pub fn new(source: &BookSource) -> Result<Self, FetchError> {
        let http = &source.http;
        let mut headers = HeaderMap::new();
        for (k, v) in &http.headers {
            let name = HeaderName::from_bytes(k.as_bytes())
                .map_err(|e| FetchError::Header(e.to_string()))?;
            let val = HeaderValue::from_str(v).map_err(|e| FetchError::Header(e.to_string()))?;
            headers.insert(name, val);
        }
        if !http.cookies.is_empty() {
            let cookie = http
                .cookies
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>()
                .join("; ");
            let val =
                HeaderValue::from_str(&cookie).map_err(|e| FetchError::Header(e.to_string()))?;
            // Replaces any `Cookie` entry that came from `http.headers` above.
            // NOTE(review): reqwest appears to consult its cookie store only when a request
            // has no `Cookie` header yet. This static default header may therefore prevent
            // cookies received via `Set-Cookie` from ever being sent back. Confirm against
            // the reqwest version in use. Seeding a `reqwest::cookie::Jar` and passing it to
            // `ClientBuilder::cookie_provider` would keep both working.
            headers.insert(reqwest::header::COOKIE, val);
        }
        let mut builder = reqwest::Client::builder()
            .cookie_store(true)
            .default_headers(headers);
        if let Some(ms) = http.timeout {
            builder = builder.timeout(Duration::from_millis(ms));
        }
        let client = builder.build()?;
        Ok(Self {
            client,
            base: source.url.trim_end_matches('/').to_string(),
            charset: http.charset,
            retry: http.retry.clone(),
            limiter: http.rate_limit.as_ref().and_then(RateLimiter::from_config),
        })
    }

    /// Sends one request to the already-resolved `url` and decodes the body with the
    /// source charset.
    ///
    /// The body is read and checked for an anti-bot challenge *before* the HTTP status
    /// is turned into an error. A 403/503 challenge page therefore surfaces as
    /// [`FetchError::Challenged`] rather than a generic [`FetchError::Http`].
    async fn send_once(&self, url: &str, req: &FetchRequest) -> Result<String, FetchError> {
        let mut builder = match req.method {
            Method::Get => self.client.get(url),
            Method::Post => self.client.post(url),
        };
        for (k, v) in &req.headers {
            builder = builder.header(k, v);
        }
        if let Some(body) = &req.body {
            builder = builder.body(body.clone());
        }
        let resp = builder.send().await?;
        let status = resp.status();
        let cf_mitigated = resp
            .headers()
            .get("cf-mitigated")
            .and_then(|v| v.to_str().ok())
            .map(str::to_owned);
        // Capture the status error now: `bytes()` consumes the response.
        let status_err = resp.error_for_status_ref().err();
        let bytes = resp.bytes().await?;
        let text = self.decode(&bytes);
        if is_challenge(status.as_u16(), cf_mitigated.as_deref(), &text) {
            return Err(FetchError::Challenged(format!(
                "Cloudflare/反爬挑战 @ {url}"
            )));
        }
        if let Some(e) = status_err {
            return Err(FetchError::Http(e));
        }
        Ok(text)
    }

    /// Turns a possibly relative request URL into an absolute one.
    ///
    /// - `http://…` / `https://…` are returned unchanged.
    /// - Protocol-relative `//host/path` reuses the scheme of the base URL
    ///   (falling back to `https` if the base has none).
    /// - Anything else is treated as a path under the base URL; one leading `/`
    ///   is dropped.
    pub(crate) fn resolve(&self, url: &str) -> String {
        if url.starts_with("http://") || url.starts_with("https://") {
            return url.to_string();
        }
        if let Some(host_and_path) = url.strip_prefix("//") {
            // Without this branch `//host/p` would be joined as `base//host/p`.
            let scheme = self.base.split_once("://").map_or("https", |(s, _)| s);
            return format!("{scheme}://{host_and_path}");
        }
        let path = url.strip_prefix('/').unwrap_or(url);
        format!("{}/{}", self.base, path)
    }

    /// Decodes a response body according to the configured charset.
    ///
    /// Malformed sequences become U+FFFD. `Charset::Auto` tries UTF-8 first and
    /// re-decodes as GBK if that produced any malformed sequence. `Encoding::decode`
    /// also sniffs and strips a byte-order mark.
    fn decode(&self, bytes: &[u8]) -> String {
        use encoding_rs::{BIG5, GB18030, GBK, UTF_8};
        match self.charset {
            Charset::Utf8 => UTF_8.decode(bytes).0.into_owned(),
            Charset::Gbk => GBK.decode(bytes).0.into_owned(),
            Charset::Gb18030 => GB18030.decode(bytes).0.into_owned(),
            Charset::Big5 => BIG5.decode(bytes).0.into_owned(),
            Charset::Auto => {
                let (text, _, had_err) = UTF_8.decode(bytes);
                if had_err {
                    GBK.decode(bytes).0.into_owned()
                } else {
                    text.into_owned()
                }
            }
        }
    }
}
#[async_trait]
impl Fetcher for ReqwestFetcher {
    /// Resolves `req.url` against the source base URL and sends it, retrying failures
    /// up to `retry.max` extra times with a fixed `retry.backoff_ms` pause between
    /// attempts.
    ///
    /// Every attempt, including retries, first waits on the rate limiter, so retries
    /// cannot exceed the configured request rate. [`FetchError::Challenged`] is
    /// returned immediately without retrying.
    async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError> {
        let url = self.resolve(&req.url);
        let max = self.retry.as_ref().map(|r| r.max).unwrap_or(0);
        let backoff = self.retry.as_ref().map(|r| r.backoff_ms).unwrap_or(0);
        let mut attempt = 0u32;
        loop {
            // Acquire per attempt: a retry is a real request to the site too.
            if let Some(limiter) = &self.limiter {
                limiter.acquire().await;
            }
            match self.send_once(&url, &req).await {
                Ok(text) => return Ok(text),
                Err(e) => {
                    if matches!(e, FetchError::Challenged(_)) || attempt >= max {
                        return Err(e);
                    }
                    attempt += 1;
                    if backoff > 0 {
                        tokio::time::sleep(Duration::from_millis(backoff)).await;
                    }
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::is_challenge;

    // Trimmed-down Cloudflare managed-challenge page containing all three body markers.
    const CHALLENGE_HTML: &str = r#"<html><head><title>Just a moment...</title></head>
<body><script>window._cf_chl_opt={cType:'managed'};
a.src='/cdn-cgi/challenge-platform/h/g/orchestrate/chl_page/v1';</script></body></html>"#;
    // Ordinary search-results page with no challenge markers.
    const NORMAL_HTML: &str =
        r#"<html><head><title>蛊真人 搜索结果</title></head><body>正文</body></html>"#;

    #[test]
    fn cf_mitigated_header_is_challenge() {
        // The header alone is decisive, regardless of status or body.
        assert!(is_challenge(200, Some("challenge"), NORMAL_HTML));
    }

    #[test]
    fn other_cf_mitigated_values_are_not_challenge() {
        assert!(!is_challenge(200, Some("block"), NORMAL_HTML));
    }

    #[test]
    fn challenge_body_with_403_or_503_is_challenge() {
        assert!(is_challenge(403, None, CHALLENGE_HTML));
        assert!(is_challenge(503, None, CHALLENGE_HTML));
    }

    #[test]
    fn each_body_marker_alone_is_sufficient() {
        assert!(is_challenge(403, None, "x _cf_chl_opt x"));
        assert!(is_challenge(403, None, "x /cdn-cgi/challenge-platform/ x"));
        assert!(is_challenge(503, None, "<title>Just a moment</title>"));
    }

    #[test]
    fn normal_200_page_is_not_challenge() {
        assert!(!is_challenge(200, None, NORMAL_HTML));
    }

    #[test]
    fn challenge_markers_without_bad_status_not_challenge() {
        assert!(!is_challenge(200, None, CHALLENGE_HTML));
        assert!(!is_challenge(404, None, CHALLENGE_HTML));
    }

    #[test]
    fn bad_status_without_challenge_markers_not_challenge() {
        assert!(!is_challenge(403, None, NORMAL_HTML));
        assert!(!is_challenge(503, None, NORMAL_HTML));
    }
}