use super::cookie::{merge_cookie_str, sanitize_header_value};
use super::error::FetchError;
use super::source::{BookSource, Charset, Method, RateLimit, Retry};
use async_trait::async_trait;
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Spaces request start times at least `interval` apart.
///
/// Start slots are reserved under a mutex, so concurrent callers are queued
/// one interval after another rather than all being released together.
struct RateLimiter {
    /// Minimum gap between two consecutive request start times.
    interval: Duration,
    /// Start time reserved by the most recent `acquire` call (possibly in the
    /// future while callers are queued); `None` until the first call.
    last: Mutex<Option<Instant>>,
}
impl RateLimiter {
    /// Builds a limiter allowing `max_count` requests per `per_ms` milliseconds.
    ///
    /// Returns `None` (no limiting) when either value is zero. The interval is
    /// computed by dividing a `Duration`, not integer milliseconds, so e.g.
    /// "10 per 5 ms" yields 500 µs instead of truncating to a zero interval
    /// (which would silently disable the limiter).
    fn from_config(rl: &RateLimit) -> Option<Self> {
        if rl.max_count == 0 || rl.per_ms == 0 {
            return None;
        }
        // std only offers `Duration / u32`; clamping the count to u32::MAX only
        // affects absurdly large configs and can never produce a zero divisor
        // because `max_count != 0` was checked above.
        let divisor = rl.max_count.min(u64::from(u32::MAX)) as u32;
        Some(Self {
            interval: Duration::from_millis(rl.per_ms) / divisor,
            last: Mutex::new(None),
        })
    }
    /// Parses a rate string of the form `"<count>/<ms>"`, or a bare `"<ms>"`
    /// meaning one request per `<ms>` milliseconds.
    ///
    /// Returns `None` for an empty/unparsable string or a zero count/period.
    fn from_rate_str(s: &str) -> Option<Self> {
        let s = s.trim();
        if s.is_empty() {
            return None;
        }
        let (max_count, per_ms) = match s.split_once('/') {
            Some((n, ms)) => (n.trim().parse().ok()?, ms.trim().parse().ok()?),
            None => (1, s.parse().ok()?),
        };
        Self::from_config(&RateLimit { max_count, per_ms })
    }
    /// Reserves the next free start slot and sleeps until it arrives.
    ///
    /// The slot is one `interval` after the previously reserved slot, or "now"
    /// if that slot is already in the past. Because the previous slot may lie in
    /// the future (other callers still waiting), the gap is measured from that
    /// reserved slot, not from the current time.
    async fn acquire(&self) {
        let wait = {
            // The std mutex guard is dropped at the end of this block, so it is
            // never held across the `.await` below.
            let mut last = self.last.lock().expect("rate limiter mutex poisoned");
            let now = Instant::now();
            let slot = match *last {
                Some(prev) => (prev + self.interval).max(now),
                None => now,
            };
            *last = Some(slot);
            slot.saturating_duration_since(now)
        };
        if !wait.is_zero() {
            tokio::time::sleep(wait).await;
        }
    }
}
/// Heuristically decides whether a response is an anti-bot (Cloudflare-style)
/// challenge page rather than real content.
///
/// A `cf-mitigated: challenge` header is conclusive on its own. Otherwise the
/// status must be 403 or 503 *and* the body must contain at least one known
/// challenge-page marker.
pub fn is_challenge(status: u16, cf_mitigated: Option<&str>, body: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "_cf_chl_opt",
        "/cdn-cgi/challenge-platform/",
        "<title>Just a moment",
    ];
    if let Some("challenge") = cf_mitigated {
        return true;
    }
    let suspicious_status = status == 403 || status == 503;
    suspicious_status && MARKERS.iter().any(|marker| body.contains(marker))
}
#[derive(Debug, Clone, Default)]
pub struct FetchRequest {
pub url: String,
pub method: Method,
pub body: Option<String>,
pub headers: HashMap<String, String>,
}
impl FetchRequest {
pub fn get(url: impl Into<String>) -> Self {
Self {
url: url.into(),
..Default::default()
}
}
}
/// Outcome of a completed request: decoded body, status code and headers.
#[derive(Clone, Debug, Default)]
pub struct FetchResponse {
    /// Response body decoded to text.
    pub body: String,
    /// HTTP status code.
    pub status: u16,
    /// Response headers by name. `ReqwestFetcher` fills this with lowercase
    /// names and joins repeated headers with `'\n'`.
    pub headers: HashMap<String, String>,
}
/// Something that can execute a [`FetchRequest`].
#[async_trait]
pub trait Fetcher: Send + Sync {
    /// Executes `req` and returns only the response body.
    async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError>;
    /// Executes `req` and returns body, status and headers.
    ///
    /// The default implementation delegates to [`Fetcher::fetch`]; since that
    /// only yields a body, it reports a synthetic status of `200` and no
    /// headers. Implementors with access to the real response should override it.
    async fn fetch_full(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
        self.fetch(req).await.map(|body| FetchResponse {
            status: 200,
            headers: HashMap::new(),
            body,
        })
    }
}
/// [`Fetcher`] backed by a `reqwest::Client` configured from a [`BookSource`].
pub struct ReqwestFetcher {
    /// Client carrying the source's default headers, static cookie and timeout.
    client: reqwest::Client,
    /// Source URL with trailing `/` removed; relative request URLs are joined onto it.
    base: String,
    /// Charset used to decode response bodies.
    charset: Charset,
    /// Retry policy; `None` means a single attempt.
    retry: Option<Retry>,
    /// Optional request pacing, applied before every attempt (including retries).
    limiter: Option<RateLimiter>,
    /// `k=v; ...` string built from the configured cookies; merged into any
    /// per-request `Cookie` header so that header does not drop it.
    static_cookie: Option<String>,
}
impl ReqwestFetcher {
    /// Builds a fetcher from `source.http`.
    ///
    /// Configured headers and cookies become client default headers. The rate
    /// limit comes from `http.rate_limit`, falling back to `source.concurrent_rate`.
    ///
    /// # Errors
    /// `FetchError::Header` for an invalid header name/value or cookie string,
    /// plus any error converted from `reqwest::ClientBuilder::build`.
    pub fn new(source: &BookSource) -> Result<Self, FetchError> {
        let http = &source.http;
        let mut headers = HeaderMap::new();
        for (k, v) in &http.headers {
            let name = HeaderName::from_bytes(k.as_bytes())
                .map_err(|e| FetchError::Header(e.to_string()))?;
            let val = HeaderValue::from_str(v).map_err(|e| FetchError::Header(e.to_string()))?;
            headers.insert(name, val);
        }
        // Configured cookies go out as a default `Cookie` header (replacing any
        // `Cookie` entry from `http.headers`) and are remembered for merging in
        // `send_once`.
        // NOTE(review): with `cookie_store(true)`, reqwest may skip its stored
        // cookies when a `Cookie` header is already present — verify that
        // server-set session cookies still get sent when static cookies exist.
        let static_cookie = if http.cookies.is_empty() {
            None
        } else {
            let cookie = http
                .cookies
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect::<Vec<_>>()
                .join("; ");
            let val =
                HeaderValue::from_str(&cookie).map_err(|e| FetchError::Header(e.to_string()))?;
            headers.insert(reqwest::header::COOKIE, val);
            Some(cookie)
        };
        let mut builder = reqwest::Client::builder()
            .cookie_store(true)
            .default_headers(headers);
        if let Some(ms) = http.timeout {
            builder = builder.timeout(Duration::from_millis(ms));
        }
        let client = builder.build()?;
        Ok(Self {
            client,
            base: source.url.trim_end_matches('/').to_string(),
            charset: http.charset,
            retry: http.retry.clone(),
            limiter: http
                .rate_limit
                .as_ref()
                .and_then(RateLimiter::from_config)
                .or_else(|| RateLimiter::from_rate_str(&source.concurrent_rate)),
            static_cookie,
        })
    }
    /// Sends exactly one request to the already-resolved `url` and decodes the body.
    ///
    /// Per-request headers go through `final_header_value`, which sanitizes the
    /// value and merges a `Cookie` header with the static cookie. Repeated response
    /// headers are joined with `'\n'`. Values that are not visible ASCII are skipped.
    ///
    /// # Errors
    /// `FetchError::Challenged` when the response looks like an anti-bot
    /// challenge. This check runs before the status check, so a 403/503
    /// challenge is reported as such. `FetchError::Http` is returned for any
    /// other 4xx/5xx status. Transport and body-read errors are converted via `?`.
    async fn send_once(&self, url: &str, req: &FetchRequest) -> Result<FetchResponse, FetchError> {
        let mut builder = match req.method {
            Method::Get => self.client.get(url),
            Method::Post => self.client.post(url),
        };
        for (k, v) in &req.headers {
            builder = builder.header(k, final_header_value(self.static_cookie.as_deref(), k, v));
        }
        if let Some(body) = &req.body {
            builder = builder.body(body.clone());
        }
        let resp = builder.send().await?;
        let status = resp.status();
        let mut headers: HashMap<String, String> = HashMap::new();
        for (name, value) in resp.headers() {
            if let Ok(v) = value.to_str() {
                headers
                    .entry(name.as_str().to_string())
                    .and_modify(|e| {
                        e.push('\n');
                        e.push_str(v);
                    })
                    .or_insert_with(|| v.to_string());
            }
        }
        // Capture the status error before `bytes()` consumes the response, but
        // report it only after the challenge check.
        let status_err = resp.error_for_status_ref().err();
        let bytes = resp.bytes().await?;
        let text = self.decode(&bytes);
        let cf_mitigated = headers.get("cf-mitigated").map(String::as_str);
        if is_challenge(status.as_u16(), cf_mitigated, &text) {
            return Err(FetchError::Challenged(format!(
                "Cloudflare/反爬挑战 @ {url}"
            )));
        }
        if let Some(e) = status_err {
            return Err(FetchError::Http(e));
        }
        Ok(FetchResponse {
            body: text,
            status: status.as_u16(),
            headers,
        })
    }
    /// Resolves `req.url` and sends it, retrying up to `retry.max` extra times.
    ///
    /// The rate limiter is consulted before *every* attempt. Previously it was
    /// consulted only once, so retries, especially with `backoff_ms == 0`,
    /// bypassed pacing entirely. `retry.backoff_ms` is slept between attempts.
    /// Challenges are never retried.
    async fn fetch_full_inner(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
        let url = self.resolve(&req.url);
        let max = self.retry.as_ref().map(|r| r.max).unwrap_or(0);
        let backoff = self.retry.as_ref().map(|r| r.backoff_ms).unwrap_or(0);
        let mut attempt = 0u32;
        loop {
            if let Some(limiter) = &self.limiter {
                limiter.acquire().await;
            }
            match self.send_once(&url, &req).await {
                Ok(resp) => return Ok(resp),
                Err(e) => {
                    // Retrying cannot clear a challenge, so give up immediately.
                    if matches!(e, FetchError::Challenged(_)) || attempt >= max {
                        return Err(e);
                    }
                    attempt += 1;
                    if backoff > 0 {
                        tokio::time::sleep(Duration::from_millis(backoff)).await;
                    }
                }
            }
        }
    }
    /// Turns `url` into an absolute URL.
    ///
    /// - `http://` / `https://` URLs pass through unchanged.
    /// - Protocol-relative `//host/path` URLs take the base URL's scheme
    ///   (defaulting to `https` if the base has none).
    /// - Anything else is joined onto `base` with exactly one `/`. So `/path` is
    ///   relative to the full base URL, not just its origin.
    pub(crate) fn resolve(&self, url: &str) -> String {
        if url.starts_with("http://") || url.starts_with("https://") {
            url.to_string()
        } else if url.starts_with("//") {
            // Previously this fell into the `/`-stripping branch and produced
            // `base//host/path`, which points at the wrong host.
            let scheme = self.base.split_once("://").map_or("https", |(s, _)| s);
            format!("{scheme}:{url}")
        } else {
            let rest = url.strip_prefix('/').unwrap_or(url);
            format!("{}/{}", self.base, rest)
        }
    }
    /// Decodes `bytes` with the configured charset.
    ///
    /// `Charset::Auto` decodes as UTF-8 and falls back to GBK when the UTF-8
    /// decode reports malformed sequences.
    fn decode(&self, bytes: &[u8]) -> String {
        use encoding_rs::{BIG5, GB18030, GBK, UTF_8};
        match self.charset {
            Charset::Utf8 => UTF_8.decode(bytes).0.into_owned(),
            Charset::Gbk => GBK.decode(bytes).0.into_owned(),
            Charset::Gb18030 => GB18030.decode(bytes).0.into_owned(),
            Charset::Big5 => BIG5.decode(bytes).0.into_owned(),
            Charset::Auto => {
                let (text, _, had_err) = UTF_8.decode(bytes);
                if had_err {
                    GBK.decode(bytes).0.into_owned()
                } else {
                    text.into_owned()
                }
            }
        }
    }
}
/// Computes the value actually sent for a per-request header.
///
/// The value always goes through `sanitize_header_value`. When the header is
/// `Cookie` (ASCII case-insensitive) and a static cookie is configured, the
/// result is merged with it via `merge_cookie_str`. The module tests show
/// CR/LF being stripped and per-request cookie pairs overriding static ones.
fn final_header_value(static_cookie: Option<&str>, key: &str, value: &str) -> String {
    let cleaned = sanitize_header_value(value);
    if !key.eq_ignore_ascii_case("cookie") {
        return cleaned;
    }
    match static_cookie {
        Some(base) => merge_cookie_str(base, &cleaned),
        None => cleaned,
    }
}
#[async_trait]
impl Fetcher for ReqwestFetcher {
    /// Runs the request through `fetch_full_inner` and keeps only the decoded body.
    async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError> {
        let response = self.fetch_full_inner(req).await?;
        Ok(response.body)
    }
    /// Runs the request through `fetch_full_inner`, returning the real status
    /// and headers instead of the trait's synthetic defaults.
    async fn fetch_full(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
        self.fetch_full_inner(req).await
    }
}
#[cfg(test)]
mod tests {
    use super::{final_header_value, is_challenge};
    /// Trimmed-down Cloudflare managed-challenge page containing all three body markers.
    const CHALLENGE_HTML: &str = r#"<html><head><title>Just a moment...</title></head>
<body><script>window._cf_chl_opt={cType:'managed'};
a.src='/cdn-cgi/challenge-platform/h/g/orchestrate/chl_page/v1';</script></body></html>"#;
    /// Ordinary content page with no challenge markers.
    const NORMAL_HTML: &str =
        r#"<html><head><title>蛊真人 搜索结果</title></head><body>正文</body></html>"#;
    #[test]
    fn cf_mitigated_header_is_challenge() {
        assert!(is_challenge(200, Some("challenge"), NORMAL_HTML));
    }
    #[test]
    fn cf_mitigated_other_values_are_not_challenge() {
        assert!(!is_challenge(200, Some("block"), NORMAL_HTML));
        assert!(!is_challenge(200, Some(""), NORMAL_HTML));
    }
    #[test]
    fn challenge_body_with_403_or_503_is_challenge() {
        assert!(is_challenge(403, None, CHALLENGE_HTML));
        assert!(is_challenge(503, None, CHALLENGE_HTML));
    }
    #[test]
    fn normal_200_page_is_not_challenge() {
        assert!(!is_challenge(200, None, NORMAL_HTML));
    }
    #[test]
    fn challenge_markers_without_bad_status_not_challenge() {
        // Markers alone are not enough: the status must also be 403 or 503.
        assert!(!is_challenge(200, None, CHALLENGE_HTML));
        assert!(!is_challenge(429, None, CHALLENGE_HTML));
    }
    #[test]
    fn bad_status_without_markers_is_not_challenge() {
        // A plain 403/503 is an ordinary HTTP error, not a challenge.
        assert!(!is_challenge(403, None, NORMAL_HTML));
        assert!(!is_challenge(503, None, NORMAL_HTML));
    }
    #[test]
    fn final_header_value_merges_static_cookie_and_strips_crlf() {
        assert_eq!(
            final_header_value(Some("device=d1; sid=old"), "Cookie", "sid=new"),
            "device=d1; sid=new"
        );
        assert_eq!(
            final_header_value(Some("device=d1"), "cookie", "sid=1"),
            "device=d1; sid=1"
        );
        assert_eq!(
            final_header_value(Some("device=d1"), "Authorization", "Bearer\r\nT"),
            "BearerT"
        );
        assert_eq!(final_header_value(None, "Cookie", "a=1\nb=2"), "a=1b=2");
        assert_eq!(final_header_value(None, "X-Test", "42"), "42");
    }
}