1use anyhow::{Context, Result};
18use regex::Regex;
19use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE, ETAG, LAST_MODIFIED};
20use reqwest::StatusCode;
21use serde::{Deserialize, Serialize};
22use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
23use std::future::Future;
24use std::sync::Mutex;
25use std::time::{Duration, Instant};
26
27use crate::config::HttpSourceConfig;
28use crate::sources::base::{Document, IncrementalSource};
29
30#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
35pub struct HttpUrlCursor {
36 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub etag: Option<String>,
38 #[serde(default, skip_serializing_if = "Option::is_none")]
39 pub last_modified: Option<String>,
40}
41
/// Media types (parameters stripped, lower-cased) treated as text.
///
/// This list supplements the `text/` prefix check done in `fetch_one`. The
/// `application/...` entries are the ones that widen what is accepted.
const TEXTY_MIMES: &[&str] = &[
    // markup and prose
    "text/html",
    "text/plain",
    "text/markdown",
    // structured data
    "application/json",
    "application/xml",
    "text/xml",
];
52
/// Allow/Disallow patterns from the robots.txt group that applies to one user agent.
#[derive(Debug, Default, Clone)]
struct RobotsRules {
    /// `Disallow:` values in file order. An empty value disallows nothing.
    disallow: Vec<String>,
    /// `Allow:` values in file order.
    allow: Vec<String>,
}

impl RobotsRules {
    /// Parses a robots.txt `body` and returns the rules that apply to `ua`.
    ///
    /// Grouping follows RFC 9309. Consecutive `User-agent:` lines share the rule
    /// lines that follow them. A `User-agent:` line that comes after a rule line
    /// starts a new group. Directive names are case-insensitive, and text after
    /// `#` is a comment.
    ///
    /// Selection works as follows. Among the groups whose lower-cased agent token
    /// occurs in the lower-cased `ua`, the longest token wins; ties are broken by
    /// name, so the result never depends on hash order. If no token matches, the
    /// `*` group applies. If there is no `*` group either, the rules are empty and
    /// everything is allowed.
    fn parse(body: &str, ua: &str) -> Self {
        let mut groups: HashMap<String, RobotsRules> = HashMap::new();
        let mut current_uas: Vec<String> = Vec::new();
        // Set once the current group has received an Allow/Disallow line. The
        // next User-agent line then opens a new group instead of extending this
        // one, so later groups' rules do not leak into earlier agents.
        let mut group_has_rules = false;

        for raw_line in body.lines() {
            let line = raw_line.split('#').next().unwrap_or("").trim();
            let Some((key, value)) = line.split_once(':') else {
                continue;
            };
            let key = key.trim().to_ascii_lowercase();
            let value = value.trim();
            match key.as_str() {
                "user-agent" => {
                    if group_has_rules {
                        current_uas.clear();
                        group_has_rules = false;
                    }
                    // An empty token would match every agent via `contains("")`.
                    if !value.is_empty() {
                        current_uas.push(value.to_lowercase());
                    }
                }
                "allow" | "disallow" => {
                    group_has_rules = true;
                    for agent in &current_uas {
                        let rules = groups.entry(agent.clone()).or_default();
                        let list = if key == "allow" {
                            &mut rules.allow
                        } else {
                            &mut rules.disallow
                        };
                        list.push(value.to_string());
                    }
                }
                _ => {}
            }
        }

        let ua_lower = ua.to_lowercase();
        let specific = groups
            .iter()
            .filter(|(agent, _)| agent.as_str() != "*" && ua_lower.contains(agent.as_str()))
            .max_by(|(a, _), (b, _)| a.len().cmp(&b.len()).then_with(|| a.cmp(b)))
            .map(|(_, rules)| rules);
        specific
            .or_else(|| groups.get("*"))
            .cloned()
            .unwrap_or_default()
    }

    /// Returns whether `path` (optionally followed by `?query`) may be fetched.
    ///
    /// The most specific rule wins, where specificity is the length of the
    /// matching pattern. On a tie, Allow beats Disallow. Empty patterns never match.
    fn can_fetch(&self, path: &str) -> bool {
        let disallow_len = Self::longest_match(&self.disallow, path);
        disallow_len == 0 || Self::longest_match(&self.allow, path) >= disallow_len
    }

    /// Length of the longest non-empty pattern in `patterns` that matches `path`,
    /// or 0 if none matches.
    fn longest_match(patterns: &[String], path: &str) -> usize {
        patterns
            .iter()
            .filter(|p| !p.is_empty() && Self::rule_matches(p.as_str(), path))
            .map(String::len)
            .max()
            .unwrap_or(0)
    }

    /// Whether a robots.txt `pattern` matches `path` (RFC 9309 section 2.2.3).
    ///
    /// `*` matches any character sequence and a trailing `$` anchors the end.
    /// Without a trailing `$`, the pattern only has to match a prefix of `path`.
    fn rule_matches(pattern: &str, path: &str) -> bool {
        let (pattern, anchored) = match pattern.strip_suffix('$') {
            Some(body) => (body, true),
            None => (pattern, false),
        };
        let mut pieces = pattern.split('*');
        // The text before the first `*` must match at the very start of the path.
        let Some(mut rest) = path.strip_prefix(pieces.next().unwrap_or("")) else {
            return false;
        };
        let pieces: Vec<&str> = pieces.collect();
        let Some((&last, middle)) = pieces.split_last() else {
            // No `*` at all: a plain prefix match, or an exact match when anchored.
            return !anchored || rest.is_empty();
        };
        // Greedy leftmost matching of each literal piece is sufficient when the
        // only wildcard is `*`.
        for &piece in middle {
            match rest.find(piece) {
                Some(at) => rest = &rest[at + piece.len()..],
                None => return false,
            }
        }
        if anchored {
            rest.ends_with(last)
        } else {
            rest.contains(last)
        }
    }
}
132
133pub struct HttpSource {
134 cfg: HttpSourceConfig,
135 last_request_ts: Mutex<Option<Instant>>,
136 robots_cache: Mutex<HashMap<String, Option<RobotsRules>>>,
137}
138
139impl HttpSource {
140 pub fn new(cfg: HttpSourceConfig) -> Self {
141 Self {
142 cfg,
143 last_request_ts: Mutex::new(None),
144 robots_cache: Mutex::new(HashMap::new()),
145 }
146 }
147
148 fn build_client(&self) -> Result<reqwest::Client> {
149 let mut headers = HeaderMap::new();
150 if let Ok(ua) = HeaderValue::from_str(&self.cfg.user_agent) {
151 headers.insert(reqwest::header::USER_AGENT, ua);
152 }
153 reqwest::Client::builder()
154 .default_headers(headers)
155 .timeout(Duration::from_secs(30))
156 .redirect(reqwest::redirect::Policy::default())
157 .build()
158 .context("build reqwest client")
159 }
160
161 async fn polite_wait(&self) {
162 let delay = self.cfg.request_delay_seconds;
163 if delay <= 0.0 {
164 return;
165 }
166 let wait = {
167 let mut guard = self.last_request_ts.lock().unwrap();
168 let now = Instant::now();
169 let wait = match *guard {
170 Some(last) => {
171 let elapsed = now.duration_since(last).as_secs_f64();
172 if elapsed < delay {
173 Some(Duration::from_secs_f64(delay - elapsed))
174 } else {
175 None
176 }
177 }
178 None => None,
179 };
180 *guard = Some(Instant::now());
181 wait
182 };
183 if let Some(d) = wait {
184 tokio::time::sleep(d).await;
185 *self.last_request_ts.lock().unwrap() = Some(Instant::now());
186 }
187 }
188
189 async fn robots_for(&self, client: &reqwest::Client, url: &str) -> Option<RobotsRules> {
190 if !self.cfg.respect_robots {
191 return None;
192 }
193 let parsed = reqwest::Url::parse(url).ok()?;
194 let host = parsed.host_str()?;
195 let port_part = parsed.port().map(|p| format!(":{p}")).unwrap_or_default();
196 let host_key = format!("{}://{}{}", parsed.scheme(), host, port_part);
197
198 {
199 let cache = self.robots_cache.lock().unwrap();
200 if let Some(cached) = cache.get(&host_key) {
201 return cached.clone();
202 }
203 }
204
205 let robots_url = format!("{host_key}/robots.txt");
206 let fetched = self.fetch_robots_body(client, &robots_url).await;
207 let parsed_rules = fetched.map(|body| RobotsRules::parse(&body, &self.cfg.user_agent));
208
209 self.robots_cache
210 .lock()
211 .unwrap()
212 .insert(host_key, parsed_rules.clone());
213 parsed_rules
214 }
215
216 async fn fetch_robots_body(&self, client: &reqwest::Client, url: &str) -> Option<String> {
217 self.polite_wait().await;
218 let resp = client.get(url).send().await.ok()?;
219 if !resp.status().is_success() {
220 return None;
221 }
222 resp.text().await.ok()
223 }
224
225 fn robots_allows(rules: &Option<RobotsRules>, url: &str) -> bool {
226 match rules {
227 None => true,
228 Some(r) => match reqwest::Url::parse(url) {
229 Ok(u) => r.can_fetch(u.path()),
230 Err(_) => true,
231 },
232 }
233 }
234
235 async fn request(
236 &self,
237 client: &reqwest::Client,
238 url: &str,
239 cursor_entry: Option<&HttpUrlCursor>,
240 ) -> Option<(StatusCode, HeaderMap, String)> {
241 self.polite_wait().await;
242 let mut req = client.get(url);
243 if let Some(ce) = cursor_entry {
244 if let Some(etag) = &ce.etag {
245 if let Ok(v) = HeaderValue::from_str(etag) {
246 req = req.header(reqwest::header::IF_NONE_MATCH, v);
247 }
248 }
249 if let Some(lm) = &ce.last_modified {
250 if let Ok(v) = HeaderValue::from_str(lm) {
251 req = req.header(reqwest::header::IF_MODIFIED_SINCE, v);
252 }
253 }
254 }
255 let resp = match req.send().await {
256 Ok(r) => r,
257 Err(err) => {
258 tracing::warn!(target: "chunkshop::http", "GET {url} failed: {err}");
259 return None;
260 }
261 };
262 let status = resp.status();
263 let headers = resp.headers().clone();
264 let body = match resp.text().await {
265 Ok(t) => t,
266 Err(err) => {
267 tracing::warn!(target: "chunkshop::http", "reading body of {url}: {err}");
268 return None;
269 }
270 };
271 Some((status, headers, body))
272 }
273
274 async fn fetch_one(
275 &self,
276 client: &reqwest::Client,
277 url: &str,
278 cursor_entry: Option<&HttpUrlCursor>,
279 robots: &Option<RobotsRules>,
280 ) -> (Option<Document>, Vec<String>) {
281 if !Self::robots_allows(robots, url) {
282 tracing::info!(target: "chunkshop::http", "robots.txt disallows {url}; skipping");
283 return (None, Vec::new());
284 }
285 let (status, headers, body) = match self.request(client, url, cursor_entry).await {
286 Some(t) => t,
287 None => return (None, Vec::new()),
288 };
289 if status == StatusCode::NOT_MODIFIED {
290 return (None, Vec::new());
291 }
292 if !status.is_success() {
293 tracing::warn!(target: "chunkshop::http", "GET {url}: status {status}; skipping");
294 return (None, Vec::new());
295 }
296 let ctype_full = headers
297 .get(CONTENT_TYPE)
298 .and_then(|v| v.to_str().ok())
299 .unwrap_or("")
300 .to_string();
301 let mime = content_type_root(&ctype_full);
302 let etag = headers
303 .get(ETAG)
304 .and_then(|v| v.to_str().ok())
305 .map(|s| s.to_string());
306 let last_modified = headers
307 .get(LAST_MODIFIED)
308 .and_then(|v| v.to_str().ok())
309 .map(|s| s.to_string());
310
311 if !mime.starts_with("text/") && !TEXTY_MIMES.contains(&mime.as_str()) {
312 tracing::warn!(
313 target: "chunkshop::http",
314 "Skipping binary content {mime} for {url} (use the files source for binaries)"
315 );
316 return (None, Vec::new());
317 }
318
319 let (content, title, links) = if mime == "text/html" {
320 let title = extract_title(&body);
321 let links = extract_links(&body, url);
322 let stripped = strip_html_to_text(&body);
323 (stripped, title, links)
324 } else {
325 (body, None, Vec::new())
326 };
327
328 let metadata = serde_json::json!({
329 "url": url,
330 "status_code": status.as_u16(),
331 "content_type": ctype_full,
332 "etag": etag,
333 "last_modified": last_modified,
334 });
335 let doc = Document {
336 id: url.to_string(),
337 content,
338 title,
339 metadata,
340 fingerprint: etag,
341 };
342 (Some(doc), links)
343 }
344
345 async fn seed_urls(&self, client: &reqwest::Client) -> Vec<String> {
346 let mut seen: HashSet<String> = HashSet::new();
347 let mut out: Vec<String> = Vec::new();
348 for u in &self.cfg.urls {
349 let n = normalize_url(u);
350 if seen.insert(n) {
351 out.push(u.clone());
352 }
353 }
354 if let Some(sm) = &self.cfg.sitemap {
355 if let Some((status, _, body)) = self.request(client, sm, None).await {
356 if status.is_success() {
357 for u in parse_sitemap(&body) {
358 let n = normalize_url(&u);
359 if seen.insert(n) {
360 out.push(u);
361 }
362 }
363 } else {
364 tracing::warn!(target: "chunkshop::http", "sitemap fetch {sm} status {status}");
365 }
366 } else {
367 tracing::warn!(target: "chunkshop::http", "sitemap fetch {sm} failed");
368 }
369 }
370 out
371 }
372
373 async fn crawl(&self, cursor: &BTreeMap<String, HttpUrlCursor>) -> Result<Vec<Document>> {
374 let client = self.build_client()?;
375 let seeds = self.seed_urls(&client).await;
376 let seed_hosts: HashSet<String> = seeds
377 .iter()
378 .filter_map(|s| reqwest::Url::parse(s).ok())
379 .filter_map(|u| u.host_str().map(|h| h.to_lowercase()))
380 .collect();
381
382 let mut visited: HashSet<String> = HashSet::new();
383 let mut frontier: VecDeque<(String, u32)> = VecDeque::new();
384 for s in &seeds {
385 frontier.push_back((s.clone(), self.cfg.crawl_depth));
386 }
387 let mut emitted: u64 = 0;
388 let mut out: Vec<Document> = Vec::new();
389
390 while let Some((url, depth_left)) = frontier.pop_front() {
391 if emitted >= self.cfg.max_pages {
392 break;
393 }
394 let norm = normalize_url(&url);
395 if !visited.insert(norm.clone()) {
396 continue;
397 }
398 if !self.cfg.allow_external {
399 if let Some(host) = reqwest::Url::parse(&url)
400 .ok()
401 .and_then(|u| u.host_str().map(|h| h.to_lowercase()))
402 {
403 if !seed_hosts.contains(&host) {
404 continue;
405 }
406 }
407 }
408
409 let robots = self.robots_for(&client, &url).await;
410 let ce = cursor.get(&url).cloned();
411 let (doc, links) = self.fetch_one(&client, &url, ce.as_ref(), &robots).await;
412 if let Some(d) = doc {
413 emitted += 1;
414 out.push(d);
415 if emitted >= self.cfg.max_pages {
416 break;
417 }
418 }
419 if depth_left > 0 && !links.is_empty() {
420 for link in links {
421 let ln = normalize_url(&link);
422 if visited.contains(&ln) {
423 continue;
424 }
425 if !self.cfg.allow_external {
426 if let (Some(lh), Some(uh)) = (host_of(&link), host_of(&url)) {
427 if !seed_hosts.contains(&lh) && lh != uh {
428 continue;
429 }
430 }
431 }
432 frontier.push_back((link, depth_left - 1));
433 }
434 }
435 }
436 Ok(out)
437 }
438
439 pub async fn iter_documents(&self) -> Result<Vec<Document>> {
440 let empty = BTreeMap::new();
441 self.crawl(&empty).await
442 }
443}
444
445impl IncrementalSource for HttpSource {
446 type Cursor = BTreeMap<String, HttpUrlCursor>;
447
448 fn empty_cursor(&self) -> Self::Cursor {
449 BTreeMap::new()
450 }
451
452 fn iter_changes_since(
453 &self,
454 cursor: &Self::Cursor,
455 ) -> impl Future<Output = Result<Vec<Document>>> + Send {
456 let cursor = cursor.clone();
457 async move { self.crawl(&cursor).await }
458 }
459
460 fn cursor_from(&self, last_document: &Document) -> Self::Cursor {
461 let url = last_document
462 .metadata
463 .get("url")
464 .and_then(|v| v.as_str())
465 .map(|s| s.to_string())
466 .unwrap_or_else(|| last_document.id.clone());
467 let etag = last_document.fingerprint.clone();
468 let last_modified = last_document
469 .metadata
470 .get("last_modified")
471 .and_then(|v| v.as_str())
472 .map(|s| s.to_string());
473 let mut delta = BTreeMap::new();
474 delta.insert(
475 url,
476 HttpUrlCursor {
477 etag,
478 last_modified,
479 },
480 );
481 delta
482 }
483}
484
485fn normalize_url(url: &str) -> String {
488 match reqwest::Url::parse(url) {
489 Ok(mut u) => {
490 u.set_fragment(None);
491 if u.path().is_empty() {
492 u.set_path("/");
493 }
494 let scheme = u.scheme().to_lowercase();
495 let host = u.host_str().map(|s| s.to_lowercase()).unwrap_or_default();
496 let port = u.port().map(|p| format!(":{p}")).unwrap_or_default();
497 let path = u.path();
498 let query = u.query().map(|q| format!("?{q}")).unwrap_or_default();
499 format!("{scheme}://{host}{port}{path}{query}")
500 }
501 Err(_) => url.to_string(),
502 }
503}
504
505fn host_of(url: &str) -> Option<String> {
506 reqwest::Url::parse(url)
507 .ok()
508 .and_then(|u| u.host_str().map(|h| h.to_lowercase()))
509}
510
/// Media type of a `Content-Type` value with parameters removed, trimmed and
/// lower-cased (e.g. `"Text/HTML; charset=utf-8"` becomes `"text/html"`).
fn content_type_root(ctype: &str) -> String {
    let media_type = match ctype.split_once(';') {
        Some((head, _params)) => head,
        None => ctype,
    };
    media_type.trim().to_lowercase()
}
514
515fn extract_title(body: &str) -> Option<String> {
516 let re = Regex::new(r"(?is)<title[^>]*>(.*?)</title>").ok()?;
517 let captures = re.captures(body)?;
518 let raw = captures.get(1)?.as_str().trim();
519 if raw.is_empty() {
520 None
521 } else {
522 Some(raw.to_string())
523 }
524}
525
526fn strip_html_to_text(body: &str) -> String {
527 let no_script = Regex::new(r"(?is)<script[^>]*>.*?</script>")
528 .map(|r| r.replace_all(body, "").to_string())
529 .unwrap_or_else(|_| body.to_string());
530 let no_style = Regex::new(r"(?is)<style[^>]*>.*?</style>")
531 .map(|r| r.replace_all(&no_script, "").to_string())
532 .unwrap_or(no_script);
533 let no_tags = Regex::new(r"(?is)<[^>]+>")
534 .map(|r| r.replace_all(&no_style, " ").to_string())
535 .unwrap_or(no_style);
536 no_tags.split_whitespace().collect::<Vec<_>>().join(" ")
537}
538
539fn extract_links(body: &str, page_url: &str) -> Vec<String> {
540 let re = match Regex::new(r#"(?is)<a\b[^>]*\bhref\s*=\s*["']([^"']+)["']"#) {
541 Ok(r) => r,
542 Err(_) => return Vec::new(),
543 };
544 let base = match reqwest::Url::parse(page_url) {
545 Ok(b) => b,
546 Err(_) => return Vec::new(),
547 };
548 let mut out = Vec::new();
549 for caps in re.captures_iter(body) {
550 let href = caps.get(1).map(|m| m.as_str().trim()).unwrap_or("");
551 if href.is_empty() {
552 continue;
553 }
554 let lower = href.to_lowercase();
555 if lower.starts_with("mailto:")
556 || lower.starts_with("javascript:")
557 || lower.starts_with("tel:")
558 || lower.starts_with('#')
559 {
560 continue;
561 }
562 if let Ok(resolved) = base.join(href) {
563 out.push(resolved.to_string());
564 }
565 }
566 out
567}
568
569fn parse_sitemap(body: &str) -> Vec<String> {
570 let re = match Regex::new(r"(?is)<loc>(.*?)</loc>") {
571 Ok(r) => r,
572 Err(_) => return Vec::new(),
573 };
574 re.captures_iter(body)
575 .filter_map(|c| c.get(1).map(|m| m.as_str().trim().to_string()))
576 .filter(|s| !s.is_empty())
577 .collect()
578}