Skip to main content

parse_book_source/
fetch.rs

1//! 取页端口(Ports & Adapters)。`trait Fetcher` 抽象「拿一个 URL 的解码后正文」,
2//! 默认实现 [`ReqwestFetcher`];反爬后端(wreq / FlareSolverr)可作为另一个 `Fetcher`
3//! 适配器接入而不动引擎(见 design D8/D10)。
4
5use super::error::FetchError;
6use super::source::{BookSource, Charset, Method, RateLimit, Retry};
7use async_trait::async_trait;
8use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
9use std::collections::HashMap;
10use std::sync::Mutex;
11use std::time::{Duration, Instant};
12
13/// 简单令牌桶式限速器:保证两次请求间隔 >= `interval`。
14/// 锁仅用于读改时间戳,在 `.await` 前释放(不跨 await 持锁,符合 design D10)。
15struct RateLimiter {
16    interval: Duration,
17    last: Mutex<Option<Instant>>,
18}
19
20impl RateLimiter {
21    fn from_config(rl: &RateLimit) -> Option<Self> {
22        if rl.max_count == 0 || rl.per_ms == 0 {
23            return None;
24        }
25        Some(Self {
26            interval: Duration::from_millis(rl.per_ms / rl.max_count),
27            last: Mutex::new(None),
28        })
29    }
30
31    async fn acquire(&self) {
32        let wait = {
33            let mut last = self.last.lock().expect("rate limiter mutex poisoned");
34            let now = Instant::now();
35            let wait = match *last {
36                Some(prev) => self
37                    .interval
38                    .checked_sub(now.duration_since(prev))
39                    .unwrap_or(Duration::ZERO),
40                None => Duration::ZERO,
41            };
42            // 预约下一次可用时刻(即便本次需等待),使并发请求依次错开。
43            *last = Some(now + wait);
44            wait
45        }; // 锁在此释放,sleep 不持锁
46        if !wait.is_zero() {
47            tokio::time::sleep(wait).await;
48        }
49    }
50}
51
52/// 判定一次响应是否为反爬挑战(纯函数,便于离线测试)。
53///
54/// 命中任一即视为挑战页(而非真实内容):
55/// ① 响应头 `cf-mitigated: challenge`(最干净、机器可读);
56/// ② HTTP 403/503 且 body 含 Cloudflare 挑战脚本特征
57///    (`_cf_chl_opt` / `/cdn-cgi/challenge-platform/` / `<title>Just a moment`)。
58pub fn is_challenge(status: u16, cf_mitigated: Option<&str>, body: &str) -> bool {
59    if cf_mitigated == Some("challenge") {
60        return true;
61    }
62    matches!(status, 403 | 503)
63        && (body.contains("_cf_chl_opt")
64            || body.contains("/cdn-cgi/challenge-platform/")
65            || body.contains("<title>Just a moment"))
66}
67
68/// 一次取页请求(URL 已是最终待请求地址或相对路径)。
69#[derive(Debug, Clone, Default)]
70pub struct FetchRequest {
71    pub url: String,
72    pub method: Method,
73    pub body: Option<String>,
74    pub headers: HashMap<String, String>,
75}
76
77impl FetchRequest {
78    /// 便捷构造一个 GET 请求。
79    pub fn get(url: impl Into<String>) -> Self {
80        Self {
81            url: url.into(),
82            ..Default::default()
83        }
84    }
85}
86
87/// 取页抽象。实现者负责发请求 + 按目标站字符集解码为文本。
88#[async_trait]
89pub trait Fetcher: Send + Sync {
90    /// 取一个页面的解码后文本。
91    async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError>;
92}
93
94/// 基于 reqwest + rustls + cookie_store 的默认取页实现(含限速与重试)。
95pub struct ReqwestFetcher {
96    client: reqwest::Client,
97    base: String,
98    charset: Charset,
99    retry: Option<Retry>,
100    limiter: Option<RateLimiter>,
101}
102
103impl ReqwestFetcher {
104    /// 依据书源的 `http` 配置构建客户端(默认头、静态 cookie、超时)。
105    pub fn new(source: &BookSource) -> Result<Self, FetchError> {
106        let http = &source.http;
107        let mut headers = HeaderMap::new();
108        for (k, v) in &http.headers {
109            let name = HeaderName::from_bytes(k.as_bytes())
110                .map_err(|e| FetchError::Header(e.to_string()))?;
111            let val = HeaderValue::from_str(v).map_err(|e| FetchError::Header(e.to_string()))?;
112            headers.insert(name, val);
113        }
114        // 静态 cookie 合成为 Cookie 头(会话 cookie 仍由 cookie_store 自动累积)。
115        if !http.cookies.is_empty() {
116            let cookie = http
117                .cookies
118                .iter()
119                .map(|(k, v)| format!("{k}={v}"))
120                .collect::<Vec<_>>()
121                .join("; ");
122            let val =
123                HeaderValue::from_str(&cookie).map_err(|e| FetchError::Header(e.to_string()))?;
124            headers.insert(reqwest::header::COOKIE, val);
125        }
126
127        let mut builder = reqwest::Client::builder()
128            .cookie_store(true)
129            .default_headers(headers);
130        if let Some(ms) = http.timeout {
131            builder = builder.timeout(Duration::from_millis(ms));
132        }
133        let client = builder.build()?;
134
135        Ok(Self {
136            client,
137            base: source.url.trim_end_matches('/').to_string(),
138            charset: http.charset,
139            retry: http.retry.clone(),
140            limiter: http.rate_limit.as_ref().and_then(RateLimiter::from_config),
141        })
142    }
143
144    /// 发起一次请求并解码(单次,不含重试)。
145    async fn send_once(&self, url: &str, req: &FetchRequest) -> Result<String, FetchError> {
146        let mut builder = match req.method {
147            Method::Get => self.client.get(url),
148            Method::Post => self.client.post(url),
149        };
150        for (k, v) in &req.headers {
151            builder = builder.header(k, v);
152        }
153        if let Some(body) = &req.body {
154            builder = builder.body(body.clone());
155        }
156        let resp = builder.send().await?;
157        let status = resp.status();
158        // 反爬信号:`cf-mitigated: challenge` 头(最干净、机器可读)。
159        let cf_mitigated = resp
160            .headers()
161            .get("cf-mitigated")
162            .and_then(|v| v.to_str().ok())
163            .map(str::to_owned);
164        // 先取出 HTTP 状态错误(error_for_status_ref 不消费 body),
165        // 再读 body 以便识别挑战页特征(挑战常以 403 返回)。
166        let status_err = resp.error_for_status_ref().err();
167        let bytes = resp.bytes().await?;
168        let text = self.decode(&bytes);
169        if is_challenge(status.as_u16(), cf_mitigated.as_deref(), &text) {
170            return Err(FetchError::Challenged(format!(
171                "Cloudflare/反爬挑战 @ {url}"
172            )));
173        }
174        if let Some(e) = status_err {
175            return Err(FetchError::Http(e));
176        }
177        Ok(text)
178    }
179
180    /// 把相对路径解析为绝对 URL(`http(s)` 开头则原样返回)。
181    pub(crate) fn resolve(&self, url: &str) -> String {
182        if url.starts_with("http://") || url.starts_with("https://") {
183            url.to_string()
184        } else if let Some(rest) = url.strip_prefix('/') {
185            format!("{}/{}", self.base, rest)
186        } else {
187            format!("{}/{}", self.base, url)
188        }
189    }
190
191    /// 按 charset 把响应字节解码为文本。
192    fn decode(&self, bytes: &[u8]) -> String {
193        use encoding_rs::{BIG5, GB18030, GBK, UTF_8};
194        match self.charset {
195            Charset::Utf8 => UTF_8.decode(bytes).0.into_owned(),
196            Charset::Gbk => GBK.decode(bytes).0.into_owned(),
197            Charset::Gb18030 => GB18030.decode(bytes).0.into_owned(),
198            Charset::Big5 => BIG5.decode(bytes).0.into_owned(),
199            Charset::Auto => {
200                let (text, _, had_err) = UTF_8.decode(bytes);
201                if had_err {
202                    GBK.decode(bytes).0.into_owned()
203                } else {
204                    text.into_owned()
205                }
206            }
207        }
208    }
209}
210
211#[async_trait]
212impl Fetcher for ReqwestFetcher {
213    async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError> {
214        // 限速(如配置):错开请求间隔。
215        if let Some(limiter) = &self.limiter {
216            limiter.acquire().await;
217        }
218        let url = self.resolve(&req.url);
219
220        // 重试:失败后按 backoff 退避,最多重试 retry.max 次。
221        let max = self.retry.as_ref().map(|r| r.max).unwrap_or(0);
222        let backoff = self.retry.as_ref().map(|r| r.backoff_ms).unwrap_or(0);
223        let mut attempt = 0u32;
224        loop {
225            match self.send_once(&url, &req).await {
226                Ok(text) => return Ok(text),
227                Err(e) => {
228                    // 反爬挑战重试无意义(仍会被挑战),直接返回交上层升级/降级。
229                    if matches!(e, FetchError::Challenged(_)) || attempt >= max {
230                        return Err(e);
231                    }
232                    attempt += 1;
233                    if backoff > 0 {
234                        tokio::time::sleep(Duration::from_millis(backoff)).await;
235                    }
236                }
237            }
238        }
239    }
240}
241
242#[cfg(test)]
243mod tests {
244    use super::is_challenge;
245
246    /// Cloudflare 托管挑战页的最小特征(取自实测 bilixs 响应)。
247    const CHALLENGE_HTML: &str = r#"<html><head><title>Just a moment...</title></head>
248        <body><script>window._cf_chl_opt={cType:'managed'};
249        a.src='/cdn-cgi/challenge-platform/h/g/orchestrate/chl_page/v1';</script></body></html>"#;
250
251    const NORMAL_HTML: &str =
252        r#"<html><head><title>蛊真人 搜索结果</title></head><body>正文</body></html>"#;
253
254    #[test]
255    fn cf_mitigated_header_is_challenge() {
256        // 即便状态 200,带 cf-mitigated: challenge 头也判为挑战。
257        assert!(is_challenge(200, Some("challenge"), NORMAL_HTML));
258    }
259
260    #[test]
261    fn challenge_body_with_403_is_challenge() {
262        assert!(is_challenge(403, None, CHALLENGE_HTML));
263        assert!(is_challenge(503, None, CHALLENGE_HTML));
264    }
265
266    #[test]
267    fn normal_200_page_is_not_challenge() {
268        assert!(!is_challenge(200, None, NORMAL_HTML));
269        // 仅有挑战特征但状态 200(无 cf-mitigated)不误判,避免正文含 cdn-cgi 字样被冤枉。
270        assert!(!is_challenge(200, None, CHALLENGE_HTML));
271    }
272
273    #[test]
274    fn challenge_markers_without_bad_status_not_challenge() {
275        // 403 但 body 无挑战特征 → 不是挑战(交由普通 HTTP 错误处理)。
276        assert!(!is_challenge(403, None, NORMAL_HTML));
277    }
278}