// chunkshop/sources/http.rs

//! HTTP source with depth-bounded crawl + ETag/Last-Modified incremental sync.
//!
//! Mirrors `python/src/chunkshop/sources/http.py` (RM-B Task 4 / Python
//! commit `fcbad65`). Behavior summary:
//!
//! - `crawl_depth = 0` (default): fetch only the listed URLs (plus any sitemap
//!   entries).
//! - `crawl_depth >= 1`: breadth-first crawl from the seeds. The same-host
//!   filter is on by default; set `allow_external` to follow off-host links.
//! - Conditional GETs via `If-None-Match` / `If-Modified-Since`; a 304 response
//!   skips the page.
//! - Cursor shape: `{ url: { etag, last_modified } }`. `cursor_from` returns a
//!   per-document delta, which the consumer merges into its running cursor
//!   (same shape as the S3 source).
//! - A polite delay is enforced between requests, and robots.txt is honored
//!   (fetched once per host and cached).
//! - Non-2xx responses are logged and skipped rather than raised (Python parity).

17use anyhow::{Context, Result};
18use regex::Regex;
19use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE, ETAG, LAST_MODIFIED};
20use reqwest::StatusCode;
21use serde::{Deserialize, Serialize};
22use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
23use std::future::Future;
24use std::sync::Mutex;
25use std::time::{Duration, Instant};
26
27use crate::config::HttpSourceConfig;
28use crate::sources::base::{Document, IncrementalSource};
29
30/// Per-URL cursor entry for `HttpSource::iter_changes_since`.
31///
32/// `etag` is sent as `If-None-Match`; `last_modified` as `If-Modified-Since`.
33/// A 304 Not Modified response skips emitting the doc.
34#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
35pub struct HttpUrlCursor {
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub etag: Option<String>,
38    #[serde(default, skip_serializing_if = "Option::is_none")]
39    pub last_modified: Option<String>,
40}
41
/// Non-`text/*` content types that the fetcher still accepts as text (mirrors
/// Python `_TEXTY_MIMES`). A response whose root MIME type is neither under
/// `text/` nor listed here is skipped with a warning.
const TEXTY_MIMES: &[&str] = &[
    // Structured data formats.
    "application/json",
    "application/xml",
    // Markup and prose.
    "text/html",
    "text/markdown",
    "text/plain",
    "text/xml",
];
52
/// Allow/Disallow path prefixes taken from the single `robots.txt` group that
/// applies to our user agent; consulted by `can_fetch`.
#[derive(Debug, Default, Clone)]
struct RobotsRules {
    /// `Disallow:` values, in the order they appeared in the file.
    disallow: Vec<String>,
    /// `Allow:` values, in the order they appeared in the file.
    allow: Vec<String>,
}
59
60impl RobotsRules {
61    fn parse(body: &str, ua: &str) -> Self {
62        let mut groups: HashMap<String, RobotsRules> = HashMap::new();
63        let mut current_uas: Vec<String> = Vec::new();
64        for raw_line in body.lines() {
65            let line = raw_line.trim();
66            if line.is_empty() || line.starts_with('#') {
67                continue;
68            }
69            let (key, value) = match line.split_once(':') {
70                Some((k, v)) => (k.trim().to_lowercase(), v.trim().to_string()),
71                None => continue,
72            };
73            let value = value.split('#').next().unwrap_or("").trim().to_string();
74            match key.as_str() {
75                "user-agent" => {
76                    current_uas.push(value.to_lowercase());
77                }
78                "disallow" => {
79                    for u in &current_uas {
80                        groups
81                            .entry(u.clone())
82                            .or_default()
83                            .disallow
84                            .push(value.clone());
85                    }
86                }
87                "allow" => {
88                    for u in &current_uas {
89                        groups
90                            .entry(u.clone())
91                            .or_default()
92                            .allow
93                            .push(value.clone());
94                    }
95                }
96                _ => {}
97            }
98        }
99        let ua_lower = ua.to_lowercase();
100        let mut best: Option<&RobotsRules> = None;
101        for (rules_ua, rules) in &groups {
102            if rules_ua == "*" {
103                best.get_or_insert(rules);
104            } else if ua_lower.contains(rules_ua.as_str()) {
105                best = Some(rules);
106            }
107        }
108        best.cloned().unwrap_or_default()
109    }
110
111    fn can_fetch(&self, path: &str) -> bool {
112        let allow_match = self
113            .allow
114            .iter()
115            .filter(|p| !p.is_empty() && path.starts_with(p.as_str()))
116            .map(|p| p.len())
117            .max()
118            .unwrap_or(0);
119        let disallow_match = self
120            .disallow
121            .iter()
122            .filter(|p| !p.is_empty() && path.starts_with(p.as_str()))
123            .map(|p| p.len())
124            .max()
125            .unwrap_or(0);
126        if disallow_match == 0 {
127            return true;
128        }
129        allow_match >= disallow_match
130    }
131}
132
133pub struct HttpSource {
134    cfg: HttpSourceConfig,
135    last_request_ts: Mutex<Option<Instant>>,
136    robots_cache: Mutex<HashMap<String, Option<RobotsRules>>>,
137}
138
139impl HttpSource {
140    pub fn new(cfg: HttpSourceConfig) -> Self {
141        Self {
142            cfg,
143            last_request_ts: Mutex::new(None),
144            robots_cache: Mutex::new(HashMap::new()),
145        }
146    }
147
148    fn build_client(&self) -> Result<reqwest::Client> {
149        let mut headers = HeaderMap::new();
150        if let Ok(ua) = HeaderValue::from_str(&self.cfg.user_agent) {
151            headers.insert(reqwest::header::USER_AGENT, ua);
152        }
153        reqwest::Client::builder()
154            .default_headers(headers)
155            .timeout(Duration::from_secs(30))
156            .redirect(reqwest::redirect::Policy::default())
157            .build()
158            .context("build reqwest client")
159    }
160
161    async fn polite_wait(&self) {
162        let delay = self.cfg.request_delay_seconds;
163        if delay <= 0.0 {
164            return;
165        }
166        let wait = {
167            let mut guard = self.last_request_ts.lock().unwrap();
168            let now = Instant::now();
169            let wait = match *guard {
170                Some(last) => {
171                    let elapsed = now.duration_since(last).as_secs_f64();
172                    if elapsed < delay {
173                        Some(Duration::from_secs_f64(delay - elapsed))
174                    } else {
175                        None
176                    }
177                }
178                None => None,
179            };
180            *guard = Some(Instant::now());
181            wait
182        };
183        if let Some(d) = wait {
184            tokio::time::sleep(d).await;
185            *self.last_request_ts.lock().unwrap() = Some(Instant::now());
186        }
187    }
188
189    async fn robots_for(&self, client: &reqwest::Client, url: &str) -> Option<RobotsRules> {
190        if !self.cfg.respect_robots {
191            return None;
192        }
193        let parsed = reqwest::Url::parse(url).ok()?;
194        let host = parsed.host_str()?;
195        let port_part = parsed.port().map(|p| format!(":{p}")).unwrap_or_default();
196        let host_key = format!("{}://{}{}", parsed.scheme(), host, port_part);
197
198        {
199            let cache = self.robots_cache.lock().unwrap();
200            if let Some(cached) = cache.get(&host_key) {
201                return cached.clone();
202            }
203        }
204
205        let robots_url = format!("{host_key}/robots.txt");
206        let fetched = self.fetch_robots_body(client, &robots_url).await;
207        let parsed_rules = fetched.map(|body| RobotsRules::parse(&body, &self.cfg.user_agent));
208
209        self.robots_cache
210            .lock()
211            .unwrap()
212            .insert(host_key, parsed_rules.clone());
213        parsed_rules
214    }
215
216    async fn fetch_robots_body(&self, client: &reqwest::Client, url: &str) -> Option<String> {
217        self.polite_wait().await;
218        let resp = client.get(url).send().await.ok()?;
219        if !resp.status().is_success() {
220            return None;
221        }
222        resp.text().await.ok()
223    }
224
225    fn robots_allows(rules: &Option<RobotsRules>, url: &str) -> bool {
226        match rules {
227            None => true,
228            Some(r) => match reqwest::Url::parse(url) {
229                Ok(u) => r.can_fetch(u.path()),
230                Err(_) => true,
231            },
232        }
233    }
234
235    async fn request(
236        &self,
237        client: &reqwest::Client,
238        url: &str,
239        cursor_entry: Option<&HttpUrlCursor>,
240    ) -> Option<(StatusCode, HeaderMap, String)> {
241        self.polite_wait().await;
242        let mut req = client.get(url);
243        if let Some(ce) = cursor_entry {
244            if let Some(etag) = &ce.etag {
245                if let Ok(v) = HeaderValue::from_str(etag) {
246                    req = req.header(reqwest::header::IF_NONE_MATCH, v);
247                }
248            }
249            if let Some(lm) = &ce.last_modified {
250                if let Ok(v) = HeaderValue::from_str(lm) {
251                    req = req.header(reqwest::header::IF_MODIFIED_SINCE, v);
252                }
253            }
254        }
255        let resp = match req.send().await {
256            Ok(r) => r,
257            Err(err) => {
258                tracing::warn!(target: "chunkshop::http", "GET {url} failed: {err}");
259                return None;
260            }
261        };
262        let status = resp.status();
263        let headers = resp.headers().clone();
264        let body = match resp.text().await {
265            Ok(t) => t,
266            Err(err) => {
267                tracing::warn!(target: "chunkshop::http", "reading body of {url}: {err}");
268                return None;
269            }
270        };
271        Some((status, headers, body))
272    }
273
274    async fn fetch_one(
275        &self,
276        client: &reqwest::Client,
277        url: &str,
278        cursor_entry: Option<&HttpUrlCursor>,
279        robots: &Option<RobotsRules>,
280    ) -> (Option<Document>, Vec<String>) {
281        if !Self::robots_allows(robots, url) {
282            tracing::info!(target: "chunkshop::http", "robots.txt disallows {url}; skipping");
283            return (None, Vec::new());
284        }
285        let (status, headers, body) = match self.request(client, url, cursor_entry).await {
286            Some(t) => t,
287            None => return (None, Vec::new()),
288        };
289        if status == StatusCode::NOT_MODIFIED {
290            return (None, Vec::new());
291        }
292        if !status.is_success() {
293            tracing::warn!(target: "chunkshop::http", "GET {url}: status {status}; skipping");
294            return (None, Vec::new());
295        }
296        let ctype_full = headers
297            .get(CONTENT_TYPE)
298            .and_then(|v| v.to_str().ok())
299            .unwrap_or("")
300            .to_string();
301        let mime = content_type_root(&ctype_full);
302        let etag = headers
303            .get(ETAG)
304            .and_then(|v| v.to_str().ok())
305            .map(|s| s.to_string());
306        let last_modified = headers
307            .get(LAST_MODIFIED)
308            .and_then(|v| v.to_str().ok())
309            .map(|s| s.to_string());
310
311        if !mime.starts_with("text/") && !TEXTY_MIMES.contains(&mime.as_str()) {
312            tracing::warn!(
313                target: "chunkshop::http",
314                "Skipping binary content {mime} for {url} (use the files source for binaries)"
315            );
316            return (None, Vec::new());
317        }
318
319        let (content, title, links) = if mime == "text/html" {
320            let title = extract_title(&body);
321            let links = extract_links(&body, url);
322            let stripped = strip_html_to_text(&body);
323            (stripped, title, links)
324        } else {
325            (body, None, Vec::new())
326        };
327
328        let metadata = serde_json::json!({
329            "url": url,
330            "status_code": status.as_u16(),
331            "content_type": ctype_full,
332            "etag": etag,
333            "last_modified": last_modified,
334        });
335        let doc = Document {
336            id: url.to_string(),
337            content,
338            title,
339            metadata,
340            fingerprint: etag,
341        };
342        (Some(doc), links)
343    }
344
345    async fn seed_urls(&self, client: &reqwest::Client) -> Vec<String> {
346        let mut seen: HashSet<String> = HashSet::new();
347        let mut out: Vec<String> = Vec::new();
348        for u in &self.cfg.urls {
349            let n = normalize_url(u);
350            if seen.insert(n) {
351                out.push(u.clone());
352            }
353        }
354        if let Some(sm) = &self.cfg.sitemap {
355            if let Some((status, _, body)) = self.request(client, sm, None).await {
356                if status.is_success() {
357                    for u in parse_sitemap(&body) {
358                        let n = normalize_url(&u);
359                        if seen.insert(n) {
360                            out.push(u);
361                        }
362                    }
363                } else {
364                    tracing::warn!(target: "chunkshop::http", "sitemap fetch {sm} status {status}");
365                }
366            } else {
367                tracing::warn!(target: "chunkshop::http", "sitemap fetch {sm} failed");
368            }
369        }
370        out
371    }
372
373    async fn crawl(&self, cursor: &BTreeMap<String, HttpUrlCursor>) -> Result<Vec<Document>> {
374        let client = self.build_client()?;
375        let seeds = self.seed_urls(&client).await;
376        let seed_hosts: HashSet<String> = seeds
377            .iter()
378            .filter_map(|s| reqwest::Url::parse(s).ok())
379            .filter_map(|u| u.host_str().map(|h| h.to_lowercase()))
380            .collect();
381
382        let mut visited: HashSet<String> = HashSet::new();
383        let mut frontier: VecDeque<(String, u32)> = VecDeque::new();
384        for s in &seeds {
385            frontier.push_back((s.clone(), self.cfg.crawl_depth));
386        }
387        let mut emitted: u64 = 0;
388        let mut out: Vec<Document> = Vec::new();
389
390        while let Some((url, depth_left)) = frontier.pop_front() {
391            if emitted >= self.cfg.max_pages {
392                break;
393            }
394            let norm = normalize_url(&url);
395            if !visited.insert(norm.clone()) {
396                continue;
397            }
398            if !self.cfg.allow_external {
399                if let Some(host) = reqwest::Url::parse(&url)
400                    .ok()
401                    .and_then(|u| u.host_str().map(|h| h.to_lowercase()))
402                {
403                    if !seed_hosts.contains(&host) {
404                        continue;
405                    }
406                }
407            }
408
409            let robots = self.robots_for(&client, &url).await;
410            let ce = cursor.get(&url).cloned();
411            let (doc, links) = self.fetch_one(&client, &url, ce.as_ref(), &robots).await;
412            if let Some(d) = doc {
413                emitted += 1;
414                out.push(d);
415                if emitted >= self.cfg.max_pages {
416                    break;
417                }
418            }
419            if depth_left > 0 && !links.is_empty() {
420                for link in links {
421                    let ln = normalize_url(&link);
422                    if visited.contains(&ln) {
423                        continue;
424                    }
425                    if !self.cfg.allow_external {
426                        if let (Some(lh), Some(uh)) = (host_of(&link), host_of(&url)) {
427                            if !seed_hosts.contains(&lh) && lh != uh {
428                                continue;
429                            }
430                        }
431                    }
432                    frontier.push_back((link, depth_left - 1));
433                }
434            }
435        }
436        Ok(out)
437    }
438
439    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
440        let empty = BTreeMap::new();
441        self.crawl(&empty).await
442    }
443}
444
445impl IncrementalSource for HttpSource {
446    type Cursor = BTreeMap<String, HttpUrlCursor>;
447
448    fn empty_cursor(&self) -> Self::Cursor {
449        BTreeMap::new()
450    }
451
452    fn iter_changes_since(
453        &self,
454        cursor: &Self::Cursor,
455    ) -> impl Future<Output = Result<Vec<Document>>> + Send {
456        let cursor = cursor.clone();
457        async move { self.crawl(&cursor).await }
458    }
459
460    fn cursor_from(&self, last_document: &Document) -> Self::Cursor {
461        let url = last_document
462            .metadata
463            .get("url")
464            .and_then(|v| v.as_str())
465            .map(|s| s.to_string())
466            .unwrap_or_else(|| last_document.id.clone());
467        let etag = last_document.fingerprint.clone();
468        let last_modified = last_document
469            .metadata
470            .get("last_modified")
471            .and_then(|v| v.as_str())
472            .map(|s| s.to_string());
473        let mut delta = BTreeMap::new();
474        delta.insert(
475            url,
476            HttpUrlCursor {
477                etag,
478                last_modified,
479            },
480        );
481        delta
482    }
483}
484
485// ----- helpers -----------------------------------------------------------
486
487fn normalize_url(url: &str) -> String {
488    match reqwest::Url::parse(url) {
489        Ok(mut u) => {
490            u.set_fragment(None);
491            if u.path().is_empty() {
492                u.set_path("/");
493            }
494            let scheme = u.scheme().to_lowercase();
495            let host = u.host_str().map(|s| s.to_lowercase()).unwrap_or_default();
496            let port = u.port().map(|p| format!(":{p}")).unwrap_or_default();
497            let path = u.path();
498            let query = u.query().map(|q| format!("?{q}")).unwrap_or_default();
499            format!("{scheme}://{host}{port}{path}{query}")
500        }
501        Err(_) => url.to_string(),
502    }
503}
504
505fn host_of(url: &str) -> Option<String> {
506    reqwest::Url::parse(url)
507        .ok()
508        .and_then(|u| u.host_str().map(|h| h.to_lowercase()))
509}
510
/// Lowercased MIME type of a `Content-Type` value, with parameters such as
/// `; charset=...` removed. An empty header yields an empty string.
fn content_type_root(ctype: &str) -> String {
    let root = match ctype.find(';') {
        Some(idx) => &ctype[..idx],
        None => ctype,
    };
    root.trim().to_lowercase()
}
514
515fn extract_title(body: &str) -> Option<String> {
516    let re = Regex::new(r"(?is)<title[^>]*>(.*?)</title>").ok()?;
517    let captures = re.captures(body)?;
518    let raw = captures.get(1)?.as_str().trim();
519    if raw.is_empty() {
520        None
521    } else {
522        Some(raw.to_string())
523    }
524}
525
526fn strip_html_to_text(body: &str) -> String {
527    let no_script = Regex::new(r"(?is)<script[^>]*>.*?</script>")
528        .map(|r| r.replace_all(body, "").to_string())
529        .unwrap_or_else(|_| body.to_string());
530    let no_style = Regex::new(r"(?is)<style[^>]*>.*?</style>")
531        .map(|r| r.replace_all(&no_script, "").to_string())
532        .unwrap_or(no_script);
533    let no_tags = Regex::new(r"(?is)<[^>]+>")
534        .map(|r| r.replace_all(&no_style, " ").to_string())
535        .unwrap_or(no_style);
536    no_tags.split_whitespace().collect::<Vec<_>>().join(" ")
537}
538
539fn extract_links(body: &str, page_url: &str) -> Vec<String> {
540    let re = match Regex::new(r#"(?is)<a\b[^>]*\bhref\s*=\s*["']([^"']+)["']"#) {
541        Ok(r) => r,
542        Err(_) => return Vec::new(),
543    };
544    let base = match reqwest::Url::parse(page_url) {
545        Ok(b) => b,
546        Err(_) => return Vec::new(),
547    };
548    let mut out = Vec::new();
549    for caps in re.captures_iter(body) {
550        let href = caps.get(1).map(|m| m.as_str().trim()).unwrap_or("");
551        if href.is_empty() {
552            continue;
553        }
554        let lower = href.to_lowercase();
555        if lower.starts_with("mailto:")
556            || lower.starts_with("javascript:")
557            || lower.starts_with("tel:")
558            || lower.starts_with('#')
559        {
560            continue;
561        }
562        if let Ok(resolved) = base.join(href) {
563            out.push(resolved.to_string());
564        }
565    }
566    out
567}
568
569fn parse_sitemap(body: &str) -> Vec<String> {
570    let re = match Regex::new(r"(?is)<loc>(.*?)</loc>") {
571        Ok(r) => r,
572        Err(_) => return Vec::new(),
573    };
574    re.captures_iter(body)
575        .filter_map(|c| c.get(1).map(|m| m.as_str().trim().to_string()))
576        .filter(|s| !s.is_empty())
577        .collect()
578}