Skip to main content

parse_book_source/
fetch.rs

1//! 取页端口(Ports & Adapters)。`trait Fetcher` 抽象「拿一个 URL 的解码后正文」,
2//! 默认实现 [`ReqwestFetcher`];反爬后端(wreq / FlareSolverr)可作为另一个 `Fetcher`
3//! 适配器接入而不动引擎(见 design D8/D10)。
4
5use super::cookie::{merge_cookie_str, sanitize_header_value};
6use super::error::FetchError;
7use super::source::{BookSource, Charset, Method, RateLimit, Retry};
8use async_trait::async_trait;
9use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
10use std::collections::HashMap;
11use std::sync::Mutex;
12use std::time::{Duration, Instant};
13
14/// 简单令牌桶式限速器:保证两次请求间隔 >= `interval`。
15/// 锁仅用于读改时间戳,在 `.await` 前释放(不跨 await 持锁,符合 design D10)。
16struct RateLimiter {
17    interval: Duration,
18    last: Mutex<Option<Instant>>,
19}
20
21impl RateLimiter {
22    fn from_config(rl: &RateLimit) -> Option<Self> {
23        if rl.max_count == 0 || rl.per_ms == 0 {
24            return None;
25        }
26        Some(Self {
27            interval: Duration::from_millis(rl.per_ms / rl.max_count),
28            last: Mutex::new(None),
29        })
30    }
31
32    /// 解析 `concurrentRate` 字符串:`"N/ms"`(N 次每 ms)或纯毫秒间隔(`"1000"` = 每 1000ms 一次)。
33    fn from_rate_str(s: &str) -> Option<Self> {
34        let s = s.trim();
35        if s.is_empty() {
36            return None;
37        }
38        let (max_count, per_ms) = match s.split_once('/') {
39            Some((n, ms)) => (n.trim().parse().ok()?, ms.trim().parse().ok()?),
40            None => (1, s.parse().ok()?),
41        };
42        Self::from_config(&RateLimit { max_count, per_ms })
43    }
44
45    async fn acquire(&self) {
46        let wait = {
47            let mut last = self.last.lock().expect("rate limiter mutex poisoned");
48            let now = Instant::now();
49            let wait = match *last {
50                Some(prev) => self
51                    .interval
52                    .checked_sub(now.duration_since(prev))
53                    .unwrap_or(Duration::ZERO),
54                None => Duration::ZERO,
55            };
56            // 预约下一次可用时刻(即便本次需等待),使并发请求依次错开。
57            *last = Some(now + wait);
58            wait
59        }; // 锁在此释放,sleep 不持锁
60        if !wait.is_zero() {
61            tokio::time::sleep(wait).await;
62        }
63    }
64}
65
66/// 判定一次响应是否为反爬挑战(纯函数,便于离线测试)。
67///
68/// 命中任一即视为挑战页(而非真实内容):
69/// ① 响应头 `cf-mitigated: challenge`(最干净、机器可读);
70/// ② HTTP 403/503 且 body 含 Cloudflare 挑战脚本特征
71///    (`_cf_chl_opt` / `/cdn-cgi/challenge-platform/` / `<title>Just a moment`)。
72pub fn is_challenge(status: u16, cf_mitigated: Option<&str>, body: &str) -> bool {
73    if cf_mitigated == Some("challenge") {
74        return true;
75    }
76    matches!(status, 403 | 503)
77        && (body.contains("_cf_chl_opt")
78            || body.contains("/cdn-cgi/challenge-platform/")
79            || body.contains("<title>Just a moment"))
80}
81
82/// 一次取页请求(URL 已是最终待请求地址或相对路径)。
83#[derive(Debug, Clone, Default)]
84pub struct FetchRequest {
85    pub url: String,
86    pub method: Method,
87    pub body: Option<String>,
88    pub headers: HashMap<String, String>,
89}
90
91impl FetchRequest {
92    /// 便捷构造一个 GET 请求。
93    pub fn get(url: impl Into<String>) -> Self {
94        Self {
95            url: url.into(),
96            ..Default::default()
97        }
98    }
99}
100
101/// 一次取页的完整响应:解码后 body + HTTP 状态码 + 响应头。
102///
103/// 供 `net.connect` 读取 `Set-Cookie` / `Location` / 状态码等(`fetch` 只回 body)。
104/// 同名多值头(如多个 `Set-Cookie`)以 `\n` 连接。
105#[derive(Debug, Clone, Default)]
106pub struct FetchResponse {
107    pub body: String,
108    pub status: u16,
109    pub headers: HashMap<String, String>,
110}
111
112/// 取页抽象。实现者负责发请求 + 按目标站字符集解码为文本。
113#[async_trait]
114pub trait Fetcher: Send + Sync {
115    /// 取一个页面的解码后文本。
116    async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError>;
117
118    /// 取完整响应(body + 状态码 + 响应头)。默认仅回 body(状态 200、头为空);
119    /// 需要 headers/状态码的实现(如 [`ReqwestFetcher`])应覆盖本方法。
120    async fn fetch_full(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
121        let body = self.fetch(req).await?;
122        Ok(FetchResponse {
123            body,
124            status: 200,
125            headers: HashMap::new(),
126        })
127    }
128}
129
130/// 基于 reqwest + rustls + cookie_store 的默认取页实现(含限速与重试)。
131pub struct ReqwestFetcher {
132    client: reqwest::Client,
133    base: String,
134    charset: Charset,
135    retry: Option<Retry>,
136    limiter: Option<RateLimiter>,
137    /// 书源静态 `http.cookies` 合成串:除进 `default_headers` 外留存一份,
138    /// 供请求级 `Cookie` 出现时在 [`final_header_value`] 合并(reqwest 的 `default_headers`
139    /// 对同名请求级头是「整体替换」而非合并语义)。
140    static_cookie: Option<String>,
141}
142
143impl ReqwestFetcher {
144    /// 依据书源的 `http` 配置构建客户端(默认头、静态 cookie、超时)。
145    pub fn new(source: &BookSource) -> Result<Self, FetchError> {
146        let http = &source.http;
147        let mut headers = HeaderMap::new();
148        for (k, v) in &http.headers {
149            let name = HeaderName::from_bytes(k.as_bytes())
150                .map_err(|e| FetchError::Header(e.to_string()))?;
151            let val = HeaderValue::from_str(v).map_err(|e| FetchError::Header(e.to_string()))?;
152            headers.insert(name, val);
153        }
154        // 静态 cookie 合成为 Cookie 头(会话 cookie 仍由 cookie_store 自动累积);
155        // 原串同时留存于 self.static_cookie,供请求级 Cookie 出现时合并(见 final_header_value)。
156        let static_cookie = if http.cookies.is_empty() {
157            None
158        } else {
159            let cookie = http
160                .cookies
161                .iter()
162                .map(|(k, v)| format!("{k}={v}"))
163                .collect::<Vec<_>>()
164                .join("; ");
165            let val =
166                HeaderValue::from_str(&cookie).map_err(|e| FetchError::Header(e.to_string()))?;
167            headers.insert(reqwest::header::COOKIE, val);
168            Some(cookie)
169        };
170
171        let mut builder = reqwest::Client::builder()
172            .cookie_store(true)
173            .default_headers(headers);
174        if let Some(ms) = http.timeout {
175            builder = builder.timeout(Duration::from_millis(ms));
176        }
177        let client = builder.build()?;
178
179        Ok(Self {
180            client,
181            base: source.url.trim_end_matches('/').to_string(),
182            charset: http.charset,
183            retry: http.retry.clone(),
184            // 限速来源:优先 http.rateLimit,否则 concurrentRate("N/ms" 或间隔)。
185            limiter: http
186                .rate_limit
187                .as_ref()
188                .and_then(RateLimiter::from_config)
189                .or_else(|| RateLimiter::from_rate_str(&source.concurrent_rate)),
190            static_cookie,
191        })
192    }
193
194    /// 发起一次请求并解码(单次,不含重试),返回完整响应(body + 状态码 + 响应头)。
195    async fn send_once(&self, url: &str, req: &FetchRequest) -> Result<FetchResponse, FetchError> {
196        let mut builder = match req.method {
197            Method::Get => self.client.get(url),
198            Method::Post => self.client.post(url),
199        };
200        for (k, v) in &req.headers {
201            builder = builder.header(k, final_header_value(self.static_cookie.as_deref(), k, v));
202        }
203        if let Some(body) = &req.body {
204            builder = builder.body(body.clone());
205        }
206        let resp = builder.send().await?;
207        let status = resp.status();
208        // 收集响应头(同名多值以 `\n` 连接,保留多个 Set-Cookie)。
209        let mut headers = HashMap::new();
210        for (name, value) in resp.headers() {
211            if let Ok(v) = value.to_str() {
212                headers
213                    .entry(name.as_str().to_string())
214                    .and_modify(|e: &mut String| {
215                        e.push('\n');
216                        e.push_str(v);
217                    })
218                    .or_insert_with(|| v.to_string());
219            }
220        }
221        // 反爬信号:`cf-mitigated: challenge` 头(最干净、机器可读)。
222        let cf_mitigated = headers.get("cf-mitigated").cloned();
223        // 先取出 HTTP 状态错误(error_for_status_ref 不消费 body),
224        // 再读 body 以便识别挑战页特征(挑战常以 403 返回)。
225        let status_err = resp.error_for_status_ref().err();
226        let bytes = resp.bytes().await?;
227        let text = self.decode(&bytes);
228        if is_challenge(status.as_u16(), cf_mitigated.as_deref(), &text) {
229            return Err(FetchError::Challenged(format!(
230                "Cloudflare/反爬挑战 @ {url}"
231            )));
232        }
233        if let Some(e) = status_err {
234            return Err(FetchError::Http(e));
235        }
236        Ok(FetchResponse {
237            body: text,
238            status: status.as_u16(),
239            headers,
240        })
241    }
242
243    /// 限速 + resolve + 重试 的取页主循环,返回完整响应。
244    async fn fetch_full_inner(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
245        // 限速(如配置):错开请求间隔。
246        if let Some(limiter) = &self.limiter {
247            limiter.acquire().await;
248        }
249        let url = self.resolve(&req.url);
250
251        // 重试:失败后按 backoff 退避,最多重试 retry.max 次。
252        let max = self.retry.as_ref().map(|r| r.max).unwrap_or(0);
253        let backoff = self.retry.as_ref().map(|r| r.backoff_ms).unwrap_or(0);
254        let mut attempt = 0u32;
255        loop {
256            match self.send_once(&url, &req).await {
257                Ok(resp) => return Ok(resp),
258                Err(e) => {
259                    // 反爬挑战重试无意义(仍会被挑战),直接返回交上层升级/降级。
260                    if matches!(e, FetchError::Challenged(_)) || attempt >= max {
261                        return Err(e);
262                    }
263                    attempt += 1;
264                    if backoff > 0 {
265                        tokio::time::sleep(Duration::from_millis(backoff)).await;
266                    }
267                }
268            }
269        }
270    }
271
272    /// 把相对路径解析为绝对 URL(`http(s)` 开头则原样返回)。
273    pub(crate) fn resolve(&self, url: &str) -> String {
274        if url.starts_with("http://") || url.starts_with("https://") {
275            url.to_string()
276        } else if let Some(rest) = url.strip_prefix('/') {
277            format!("{}/{}", self.base, rest)
278        } else {
279            format!("{}/{}", self.base, url)
280        }
281    }
282
283    /// 按 charset 把响应字节解码为文本。
284    fn decode(&self, bytes: &[u8]) -> String {
285        use encoding_rs::{BIG5, GB18030, GBK, UTF_8};
286        match self.charset {
287            Charset::Utf8 => UTF_8.decode(bytes).0.into_owned(),
288            Charset::Gbk => GBK.decode(bytes).0.into_owned(),
289            Charset::Gb18030 => GB18030.decode(bytes).0.into_owned(),
290            Charset::Big5 => BIG5.decode(bytes).0.into_owned(),
291            Charset::Auto => {
292                let (text, _, had_err) = UTF_8.decode(bytes);
293                if had_err {
294                    GBK.decode(bytes).0.into_owned()
295                } else {
296                    text.into_owned()
297                }
298            }
299        }
300    }
301}
302
303/// 计算一个请求级 header 的最终出站值(纯函数,便于离线单测):
304///
305/// - 值一律剥 CR/LF——纵深防御:已落盘的脏 loginHeader/cookie(多 `Set-Cookie` 以 `\n` 连接)
306///   不致让 reqwest builder 构建失败、拖垮该书源全部请求;
307/// - `Cookie` 头(大小写不敏感)与书源静态 `http.cookies` 串合并:reqwest 对 `default_headers`
308///   是「请求级同名头存在时整体替换」语义,不合并的话登录/jar Cookie 一注入,静态的设备/风控
309///   cookie 就被整串顶掉。静态串为最低优先级基底(请求级同名 key 胜出)。
310///   注:服务端 `Max-Age=0` 删除与静态配置同名的 cookie 时,静态值会被「复活」——
311///   这是书源静态 cookie 的固有语义(Legado 亦始终发送书源配置 cookie),可接受。
312fn final_header_value(static_cookie: Option<&str>, key: &str, value: &str) -> String {
313    let value = sanitize_header_value(value);
314    match static_cookie {
315        Some(s) if key.eq_ignore_ascii_case("cookie") => merge_cookie_str(s, &value),
316        _ => value,
317    }
318}
319
320#[async_trait]
321impl Fetcher for ReqwestFetcher {
322    async fn fetch(&self, req: FetchRequest) -> Result<String, FetchError> {
323        self.fetch_full_inner(req).await.map(|r| r.body)
324    }
325
326    async fn fetch_full(&self, req: FetchRequest) -> Result<FetchResponse, FetchError> {
327        self.fetch_full_inner(req).await
328    }
329}
330
331#[cfg(test)]
332mod tests {
333    use super::{final_header_value, is_challenge};
334
335    /// Cloudflare 托管挑战页的最小特征(取自实测 bilixs 响应)。
336    const CHALLENGE_HTML: &str = r#"<html><head><title>Just a moment...</title></head>
337        <body><script>window._cf_chl_opt={cType:'managed'};
338        a.src='/cdn-cgi/challenge-platform/h/g/orchestrate/chl_page/v1';</script></body></html>"#;
339
340    const NORMAL_HTML: &str =
341        r#"<html><head><title>蛊真人 搜索结果</title></head><body>正文</body></html>"#;
342
343    #[test]
344    fn cf_mitigated_header_is_challenge() {
345        // 即便状态 200,带 cf-mitigated: challenge 头也判为挑战。
346        assert!(is_challenge(200, Some("challenge"), NORMAL_HTML));
347    }
348
349    #[test]
350    fn challenge_body_with_403_is_challenge() {
351        assert!(is_challenge(403, None, CHALLENGE_HTML));
352        assert!(is_challenge(503, None, CHALLENGE_HTML));
353    }
354
355    #[test]
356    fn normal_200_page_is_not_challenge() {
357        assert!(!is_challenge(200, None, NORMAL_HTML));
358        // 仅有挑战特征但状态 200(无 cf-mitigated)不误判,避免正文含 cdn-cgi 字样被冤枉。
359        assert!(!is_challenge(200, None, CHALLENGE_HTML));
360    }
361
362    #[test]
363    fn challenge_markers_without_bad_status_not_challenge() {
364        // 403 但 body 无挑战特征 → 不是挑战(交由普通 HTTP 错误处理)。
365        assert!(!is_challenge(403, None, NORMAL_HTML));
366    }
367
368    // ── 审查/correctness:请求级 Cookie 与静态 http.cookies 合并(default_headers 是替换语义)──
369    #[test]
370    fn final_header_value_merges_static_cookie_and_strips_crlf() {
371        // 请求级 Cookie 与静态基底合并(请求级同名 key 胜出),key 大小写不敏感。
372        assert_eq!(
373            final_header_value(Some("device=d1; sid=old"), "Cookie", "sid=new"),
374            "device=d1; sid=new"
375        );
376        assert_eq!(
377            final_header_value(Some("device=d1"), "cookie", "sid=1"),
378            "device=d1; sid=1"
379        );
380        // 非 Cookie 头不掺静态基底,但仍剥 CR/LF(纵深防御,脏落盘数据不致构建失败)。
381        assert_eq!(
382            final_header_value(Some("device=d1"), "Authorization", "Bearer\r\nT"),
383            "BearerT"
384        );
385        // 无静态 cookie:原样透传(剥 CR/LF)。
386        assert_eq!(final_header_value(None, "Cookie", "a=1\nb=2"), "a=1b=2");
387        assert_eq!(final_header_value(None, "X-Test", "42"), "42");
388    }
389}