1use super::cookie::{merge_cookie_str, sanitize_header_value};
6use super::error::FetchError;
7use super::source::{BookSource, Charset, Method, RateLimit, Retry};
8use async_trait::async_trait;
9use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
10use std::collections::HashMap;
11use std::sync::Mutex;
12use std::time::{Duration, Instant};
13
/// Spaces out outgoing requests so that consecutive request starts are at
/// least `interval` apart.
struct RateLimiter {
    /// Minimum spacing between two request starts.
    interval: Duration,
    /// Start time granted to the most recent caller of `acquire`;
    /// `None` until the limiter has been used for the first time.
    last: Mutex<Option<Instant>>,
}
20
21impl RateLimiter {
22 fn from_config(rl: &RateLimit) -> Option<Self> {
23 if rl.max_count == 0 || rl.per_ms == 0 {
24 return None;
25 }
26 Some(Self {
27 interval: Duration::from_millis(rl.per_ms / rl.max_count),
28 last: Mutex::new(None),
29 })
30 }
31
32 fn from_rate_str(s: &str) -> Option<Self> {
34 let s = s.trim();
35 if s.is_empty() {
36 return None;
37 }
38 let (max_count, per_ms) = match s.split_once('/') {
39 Some((n, ms)) => (n.trim().parse().ok()?, ms.trim().parse().ok()?),
40 None => (1, s.parse().ok()?),
41 };
42 Self::from_config(&RateLimit { max_count, per_ms })
43 }
44
45 async fn acquire(&self) {
46 let wait = {
47 let mut last = self.last.lock().expect("rate limiter mutex poisoned");
48 let now = Instant::now();
49 let wait = match *last {
50 Some(prev) => self
51 .interval
52 .checked_sub(now.duration_since(prev))
53 .unwrap_or(Duration::ZERO),
54 None => Duration::ZERO,
55 };
56 *last = Some(now + wait);
58 wait
59 }; if !wait.is_zero() {
61 tokio::time::sleep(wait).await;
62 }
63 }
64}
65
/// Decides whether a response is an anti-bot challenge page rather than real
/// content.
///
/// A response counts as a challenge when either
/// * the `cf-mitigated` header value is exactly `"challenge"` (any status), or
/// * the status is 403 or 503 **and** the body contains one of the known
///   challenge-page markers.
///
/// Challenge markers on any other status, or a bare 403/503 without markers,
/// are not treated as a challenge.
pub fn is_challenge(status: u16, cf_mitigated: Option<&str>, body: &str) -> bool {
    const BODY_MARKERS: [&str; 3] = [
        "_cf_chl_opt",
        "/cdn-cgi/challenge-platform/",
        "<title>Just a moment",
    ];

    let flagged_by_header = matches!(cf_mitigated, Some("challenge"));
    let blocking_status = status == 403 || status == 503;

    flagged_by_header
        || (blocking_status && BODY_MARKERS.iter().any(|marker| body.contains(*marker)))
}
81
82#[derive(Debug, Clone, Default)]
84pub struct FetchRequest {
85 pub url: String,
86 pub method: Method,
87 pub body: Option<String>,
88 pub headers: HashMap<String, String>,
89}
90
91impl FetchRequest {
92 pub fn get(url: impl Into<String>) -> Self {
94 Self {
95 url: url.into(),
96 ..Default::default()
97 }
98 }
99}
100
/// Outcome of a fetch: decoded body text plus response metadata.
#[derive(Debug, Clone, Default)]
pub struct FetchResponse {
    /// Response body decoded to text.
    pub body: String,
    /// Numeric HTTP status code.
    pub status: u16,
    /// Response headers keyed by header name. `ReqwestFetcher` joins the
    /// values of a repeated header with `'\n'`.
    pub headers: HashMap<String, String>,
}
111
112#[async_trait]
114pub trait Fetcher: Send + Sync {
115 async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError>;
117
118 async fn fetch_full(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
121 let body = self.fetch(req).await?;
122 Ok(FetchResponse {
123 body,
124 status: 200,
125 headers: HashMap::new(),
126 })
127 }
128}
129
130pub struct ReqwestFetcher {
132 client: reqwest::Client,
133 base: String,
134 charset: Charset,
135 retry: Option<Retry>,
136 limiter: Option<RateLimiter>,
137 static_cookie: Option<String>,
141}
142
143impl ReqwestFetcher {
144 pub fn new(source: &BookSource) -> Result<Self, FetchError> {
146 let http = &source.http;
147 let mut headers = HeaderMap::new();
148 for (k, v) in &http.headers {
149 let name = HeaderName::from_bytes(k.as_bytes())
150 .map_err(|e| FetchError::Header(e.to_string()))?;
151 let val = HeaderValue::from_str(v).map_err(|e| FetchError::Header(e.to_string()))?;
152 headers.insert(name, val);
153 }
154 let static_cookie = if http.cookies.is_empty() {
157 None
158 } else {
159 let cookie = http
160 .cookies
161 .iter()
162 .map(|(k, v)| format!("{k}={v}"))
163 .collect::<Vec<_>>()
164 .join("; ");
165 let val =
166 HeaderValue::from_str(&cookie).map_err(|e| FetchError::Header(e.to_string()))?;
167 headers.insert(reqwest::header::COOKIE, val);
168 Some(cookie)
169 };
170
171 let mut builder = reqwest::Client::builder()
172 .cookie_store(true)
173 .default_headers(headers);
174 if let Some(ms) = http.timeout {
175 builder = builder.timeout(Duration::from_millis(ms));
176 }
177 let client = builder.build()?;
178
179 Ok(Self {
180 client,
181 base: source.url.trim_end_matches('/').to_string(),
182 charset: http.charset,
183 retry: http.retry.clone(),
184 limiter: http
186 .rate_limit
187 .as_ref()
188 .and_then(RateLimiter::from_config)
189 .or_else(|| RateLimiter::from_rate_str(&source.concurrent_rate)),
190 static_cookie,
191 })
192 }
193
194 async fn send_once(&self, url: &str, req: &FetchRequest) -> Result<FetchResponse, FetchError> {
196 let mut builder = match req.method {
197 Method::Get => self.client.get(url),
198 Method::Post => self.client.post(url),
199 };
200 for (k, v) in &req.headers {
201 builder = builder.header(k, final_header_value(self.static_cookie.as_deref(), k, v));
202 }
203 if let Some(body) = &req.body {
204 builder = builder.body(body.clone());
205 }
206 let resp = builder.send().await?;
207 let status = resp.status();
208 let mut headers = HashMap::new();
210 for (name, value) in resp.headers() {
211 if let Ok(v) = value.to_str() {
212 headers
213 .entry(name.as_str().to_string())
214 .and_modify(|e: &mut String| {
215 e.push('\n');
216 e.push_str(v);
217 })
218 .or_insert_with(|| v.to_string());
219 }
220 }
221 let cf_mitigated = headers.get("cf-mitigated").cloned();
223 let status_err = resp.error_for_status_ref().err();
226 let bytes = resp.bytes().await?;
227 let text = self.decode(&bytes);
228 if is_challenge(status.as_u16(), cf_mitigated.as_deref(), &text) {
229 return Err(FetchError::Challenged(format!(
230 "Cloudflare/反爬挑战 @ {url}"
231 )));
232 }
233 if let Some(e) = status_err {
234 return Err(FetchError::Http(e));
235 }
236 Ok(FetchResponse {
237 body: text,
238 status: status.as_u16(),
239 headers,
240 })
241 }
242
243 async fn fetch_full_inner(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
245 if let Some(limiter) = &self.limiter {
247 limiter.acquire().await;
248 }
249 let url = self.resolve(&req.url);
250
251 let max = self.retry.as_ref().map(|r| r.max).unwrap_or(0);
253 let backoff = self.retry.as_ref().map(|r| r.backoff_ms).unwrap_or(0);
254 let mut attempt = 0u32;
255 loop {
256 match self.send_once(&url, &req).await {
257 Ok(resp) => return Ok(resp),
258 Err(e) => {
259 if matches!(e, FetchError::Challenged(_)) || attempt >= max {
261 return Err(e);
262 }
263 attempt += 1;
264 if backoff > 0 {
265 tokio::time::sleep(Duration::from_millis(backoff)).await;
266 }
267 }
268 }
269 }
270 }
271
272 pub(crate) fn resolve(&self, url: &str) -> String {
274 if url.starts_with("http://") || url.starts_with("https://") {
275 url.to_string()
276 } else if let Some(rest) = url.strip_prefix('/') {
277 format!("{}/{}", self.base, rest)
278 } else {
279 format!("{}/{}", self.base, url)
280 }
281 }
282
283 fn decode(&self, bytes: &[u8]) -> String {
285 use encoding_rs::{BIG5, GB18030, GBK, UTF_8};
286 match self.charset {
287 Charset::Utf8 => UTF_8.decode(bytes).0.into_owned(),
288 Charset::Gbk => GBK.decode(bytes).0.into_owned(),
289 Charset::Gb18030 => GB18030.decode(bytes).0.into_owned(),
290 Charset::Big5 => BIG5.decode(bytes).0.into_owned(),
291 Charset::Auto => {
292 let (text, _, had_err) = UTF_8.decode(bytes);
293 if had_err {
294 GBK.decode(bytes).0.into_owned()
295 } else {
296 text.into_owned()
297 }
298 }
299 }
300 }
301}
302
303fn final_header_value(static_cookie: Option<&str>, key: &str, value: &str) -> String {
313 let value = sanitize_header_value(value);
314 match static_cookie {
315 Some(s) if key.eq_ignore_ascii_case("cookie") => merge_cookie_str(s, &value),
316 _ => value,
317 }
318}
319
320#[async_trait]
321impl Fetcher for ReqwestFetcher {
322 async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError> {
323 self.fetch_full_inner(req).await.map(|r| r.body)
324 }
325
326 async fn fetch_full(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
327 self.fetch_full_inner(req).await
328 }
329}
330
#[cfg(test)]
mod tests {
    use super::{final_header_value, is_challenge};

    /// Minimal page carrying all three challenge markers.
    const CHALLENGE_HTML: &str = r#"<html><head><title>Just a moment...</title></head>
    <body><script>window._cf_chl_opt={cType:'managed'};
    a.src='/cdn-cgi/challenge-platform/h/g/orchestrate/chl_page/v1';</script></body></html>"#;

    /// Ordinary content page without any challenge marker.
    const NORMAL_HTML: &str =
        r#"<html><head><title>蛊真人 搜索结果</title></head><body>正文</body></html>"#;

    #[test]
    fn cf_mitigated_header_is_challenge() {
        // The header alone decides, regardless of status and body.
        assert!(is_challenge(200, Some("challenge"), NORMAL_HTML));
    }

    #[test]
    fn challenge_body_with_403_is_challenge() {
        assert!(is_challenge(403, None, CHALLENGE_HTML));
        assert!(is_challenge(503, None, CHALLENGE_HTML));
    }

    #[test]
    fn normal_200_page_is_not_challenge() {
        assert!(!is_challenge(200, None, NORMAL_HTML));
    }

    #[test]
    fn challenge_markers_without_bad_status_not_challenge() {
        // Markers on a successful response (e.g. a page quoting them) must
        // not be mistaken for a challenge.
        assert!(!is_challenge(200, None, CHALLENGE_HTML));
    }

    #[test]
    fn bad_status_without_challenge_markers_not_challenge() {
        // A plain 403/503 is an ordinary HTTP error, not a challenge.
        assert!(!is_challenge(403, None, NORMAL_HTML));
        assert!(!is_challenge(503, None, NORMAL_HTML));
    }

    #[test]
    fn final_header_value_merges_static_cookie_and_strips_crlf() {
        // Request cookie overrides the static cookie of the same name.
        assert_eq!(
            final_header_value(Some("device=d1; sid=old"), "Cookie", "sid=new"),
            "device=d1; sid=new"
        );
        // Header name is matched case-insensitively.
        assert_eq!(
            final_header_value(Some("device=d1"), "cookie", "sid=1"),
            "device=d1; sid=1"
        );
        // Non-cookie headers are only sanitized, never merged.
        assert_eq!(
            final_header_value(Some("device=d1"), "Authorization", "Bearer\r\nT"),
            "BearerT"
        );
        // Without a static cookie the Cookie header is only sanitized.
        assert_eq!(final_header_value(None, "Cookie", "a=1\nb=2"), "a=1b=2");
        assert_eq!(final_header_value(None, "X-Test", "42"), "42");
    }
}