use std::sync::Arc;
use url::Url;

use crate::crawl::{CanonicalResolver, ContentDedup, CrawlEntry, Frontier, UrlDedup, UrlNormalizer};
use crate::fetch::{FetchResponse, HttpClient};
use crate::observe::{MetricsCollector, StructuredLogger};
use crate::parse::{
    AudioExtractor, HtmlParser, ImageExtractor, JsonLdExtractor, LinkExtractor,
    MetadataExtractor, OpenGraphExtractor, TextExtractor, VideoExtractor,
};
use crate::politeness::{DomainThrottler, RobotsChecker};
use crate::render::{BrowserPool, RenderChecker, RenderDecision};
use crate::security::{DomainAllowlist, IpBlocker, ResourceLimits};
use crate::storage::{NormalizedStore, RawSnapshot, SnapshotStore};
use crate::types::document::RobotsDirectives;
use crate::types::error::Result;
use crate::types::{Config, Document, Error};

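/// Outcome of scraping a single URL: the normalized [`Document`], whether the
/// body was a content-level duplicate, whether a headless browser was used to
/// render it, the resolved canonical URL (if any), and the deduplicated,
/// allowlist-filtered links discovered on the page.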
#[derive(Debug)]
pub struct ScrapeResult {
    pub document: Document,
    pub is_duplicate: bool,
    pub js_rendered: bool,
    pub canonical_url: Option<Url>,
    pub discovered_links: Vec<Url>,
}

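/// Central coordinator for the scraping pipeline. It wires together URL
/// normalization, deduplication, politeness (robots.txt and throttling),
/// fetching, optional JS rendering, parsing, security checks, storage, and
/// metrics/logging behind two entry points: [`scrape`](Self::scrape) for a
/// single URL and [`crawl`](Self::crawl) for a frontier-driven traversal.
///
/// A minimal usage sketch (assumes an async runtime such as `tokio` and that
/// a `Config` value is already available, e.g. via `Config::default()` if
/// that is implemented; adapt to however configuration is actually built):
///
/// ```ignore
/// let orchestrator = Orchestrator::new(Config::default())?;
/// let url = Url::parse("https://example.com/")?;
/// let result = orchestrator.scrape(&url).await?;
/// println!("discovered {} links", result.discovered_links.len());
/// ```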
pub struct Orchestrator {
    config: Arc<Config>,
    client: HttpClient,
    normalizer: UrlNormalizer,
    frontier: Frontier,
    url_dedup: UrlDedup,
    content_dedup: ContentDedup,
    canonical_resolver: CanonicalResolver,
    robots_checker: RobotsChecker,
    throttler: DomainThrottler,
    render_checker: RenderChecker,
    browser_pool: BrowserPool,
    allowlist: DomainAllowlist,
    ip_blocker: IpBlocker,
    limits: ResourceLimits,
    snapshot_store: SnapshotStore,
    document_store: NormalizedStore,
    metrics: MetricsCollector,
    logger: StructuredLogger,
}

impl Orchestrator {
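    /// Builds an orchestrator from `config`, constructing the HTTP client,
    /// frontier, domain throttler, and robots.txt checker from the relevant
    /// config sections; the remaining components (browser pool, stores,
    /// dedup, security) start from defaults and can be swapped or tuned
    /// afterwards.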
    pub fn new(config: Config) -> Result<Self> {
        let client = HttpClient::new(config.clone())?;

        let frontier = Frontier::new(
            config.crawl.strategy.clone(),
            config.crawl.max_depth,
            config.crawl.max_urls_per_domain,
            config.crawl.max_urls_total,
        );

        let throttler = DomainThrottler::new(
            config.politeness.default_delay_ms,
            config.politeness.max_concurrent_per_domain as usize,
            config.politeness.max_concurrent_total as usize,
            config.politeness.adaptive_delay,
            config.politeness.rate_limit_pause_ms,
        );

        let robots_checker = RobotsChecker::new(
            &config.fetch.user_agent,
            config.politeness.robots_cache_ttl_secs,
        );

        let logger = StructuredLogger::new();

        Ok(Self {
            config: Arc::new(config),
            client,
            normalizer: UrlNormalizer::default(),
            frontier,
            url_dedup: UrlDedup::new(),
            content_dedup: ContentDedup::default(),
            canonical_resolver: CanonicalResolver::default(),
            robots_checker,
            throttler,
            render_checker: RenderChecker::default(),
            browser_pool: BrowserPool::default(),
            allowlist: DomainAllowlist::new(),
            ip_blocker: IpBlocker::default(),
            limits: ResourceLimits::default(),
            snapshot_store: SnapshotStore::default(),
            document_store: NormalizedStore::default(),
            metrics: MetricsCollector::new(),
            logger,
        })
    }

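    /// Replaces the default browser pool used for JS rendering and returns
    /// `self` for chaining.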
    pub fn with_browser_pool(mut self, pool: BrowserPool) -> Self {
        self.browser_pool = pool;
        self
    }

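    /// Scrapes a single URL end to end: normalizes and deduplicates the URL,
    /// applies security and robots.txt checks, throttles per domain, fetches
    /// the page, stores a raw snapshot, optionally renders it with a browser
    /// when static HTML looks insufficient, parses it into a [`Document`],
    /// and returns the result together with newly discovered links.
    ///
    /// Returns an error for duplicate or disallowed URLs, robots.txt denials,
    /// and non-success HTTP statuses.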
    pub async fn scrape(&self, url: &Url) -> Result<ScrapeResult> {
        let start = std::time::Instant::now();

        let normalized_url = self.normalizer.normalize(url);
        self.logger.log_request_url(&normalized_url, "GET");

        if !self.url_dedup.check_and_mark(normalized_url.as_str()) {
            self.logger.log_info(&format!("URL already seen: {}", normalized_url));
            return Err(Error::DuplicateUrl(normalized_url.to_string()));
        }

        self.validate_url(&normalized_url)?;

        if self.config.politeness.respect_robots_txt {
            self.check_robots(&normalized_url).await?;
        }

        let crawl_delay = self.robots_checker.get_crawl_delay(&normalized_url);
        self.throttler.acquire(&normalized_url, crawl_delay).await;

        self.metrics
            .record_request(normalized_url.host_str().unwrap_or("unknown"));
        let response = self.fetch(&normalized_url).await?;
        let duration_ms = start.elapsed().as_millis() as u64;

        self.throttler
            .release(&normalized_url, duration_ms, response.is_rate_limited());

        if response.is_success() {
            self.metrics.record_success(
                normalized_url.host_str().unwrap_or("unknown"),
                response.body_size(),
                duration_ms,
            );
        } else {
            self.metrics
                .record_failure(normalized_url.host_str().unwrap_or("unknown"), duration_ms);
            return Err(Error::Http(response.status.as_u16()));
        }

        self.limits.check_response_size(response.body_size())?;

        let snapshot = RawSnapshot::from_response(
            normalized_url.clone(),
            response.status.as_u16(),
            response.headers_map(),
            response.body.to_vec(),
        );
        self.snapshot_store.store(snapshot);

        let html = response
            .text()
            .map_err(|_| Error::Parse("Invalid UTF-8".to_string()))?;

        let is_duplicate = !self.content_dedup.check_and_mark(&html);
        if is_duplicate && self.config.crawl.dedup_content {
            self.logger
                .log_info(&format!("Content duplicate: {}", normalized_url));
        }

        let render_decision = self.render_checker.check(&html);
        let (final_html, js_rendered) = if render_decision != RenderDecision::Static {
            match self.browser_pool.render(&normalized_url, None).await {
                Ok(browser_response) => (browser_response.html, true),
                Err(_) => {
                    self.logger.log_warn(&format!(
                        "JS rendering needed but browser unavailable: {}",
                        normalized_url
                    ));
                    (html, false)
                }
            }
        } else {
            (html, false)
        };

        let mut document = self
            .process_response(&normalized_url, &response, &final_html)
            .await?;

        let canonical_url = if self.config.crawl.respect_canonicals {
            self.canonical_resolver
                .resolve_from_html(&final_html, &response.final_url)
        } else {
            None
        };

        if let Some(ref canonical) = canonical_url {
            document.canonical_url = Some(canonical.clone());
            self.url_dedup.mark_seen(canonical.as_str());
        }

        let discovered_links: Vec<Url> = document
            .out_links
            .iter()
            .filter_map(|link| {
                let url = self.normalizer.normalize(&link.url);
                if self.url_dedup.is_duplicate(url.as_str()) {
                    None
                } else if !self.allowlist.is_allowed(&url) {
                    None
                } else {
                    Some(url)
                }
            })
            .collect();

        self.document_store.store(document.clone());
        self.metrics.record_document();

        self.logger.log_response_parts(
            &normalized_url,
            response.status.as_u16(),
            response.body_size(),
            duration_ms,
        );

        Ok(ScrapeResult {
            document,
            is_duplicate,
            js_rendered,
            canonical_url,
            discovered_links,
        })
    }

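    /// Crawls from the given seed URLs until the frontier is exhausted or
    /// `max_pages` documents have been collected. Seeds are pushed at depth 0
    /// with priority 100; links discovered by each successful scrape are
    /// pushed one level deeper with priority 50. Individual scrape failures
    /// are logged and skipped rather than aborting the crawl.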
    pub async fn crawl(&self, seeds: Vec<Url>, max_pages: Option<usize>) -> Result<Vec<Document>> {
        for seed in seeds {
            let normalized = self.normalizer.normalize(&seed);
            self.frontier.push(CrawlEntry::new(normalized, 0, 100));
        }

        let mut documents = Vec::new();
        let max = max_pages.unwrap_or(usize::MAX);

        while let Some(entry) = self.frontier.pop() {
            if documents.len() >= max {
                break;
            }

            match self.scrape(&entry.url).await {
                Ok(result) => {
                    for link in result.discovered_links {
                        self.frontier
                            .push(CrawlEntry::new(link, entry.depth + 1, 50));
                    }
                    documents.push(result.document);
                }
                Err(e) => {
                    self.logger.log_error_parts(&entry.url, &e.to_string(), true);
                }
            }
        }

        Ok(documents)
    }

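    /// Rejects URLs with a non-HTTP(S) scheme, hosts outside the domain
    /// allowlist, and hostnames blocked by the IP blocker.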
    fn validate_url(&self, url: &Url) -> Result<()> {
        if !matches!(url.scheme(), "http" | "https") {
            return Err(Error::Config(format!(
                "Unsupported scheme: {}",
                url.scheme()
            )));
        }

        if !self.allowlist.is_allowed(url) {
            return Err(Error::DomainNotAllowed(
                url.host_str().unwrap_or("").to_string(),
            ));
        }

        if self.ip_blocker.is_url_hostname_blocked(url) {
            return Err(Error::IpBlocked(
                url.host_str().unwrap_or("").to_string(),
            ));
        }

        Ok(())
    }

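    /// Ensures robots.txt rules for the URL's host are cached, fetching and
    /// caching them on a miss, and returns [`Error::RobotsBlocked`] if the
    /// URL is disallowed.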
    async fn check_robots(&self, url: &Url) -> Result<()> {
        if self.robots_checker.cache().get(url).is_none() {
            if let Some(robots_url) = RobotsChecker::robots_url(url) {
                if let Ok(response) = self.fetch(&robots_url).await {
                    if response.is_success() {
                        if let Ok(text) = response.text() {
                            self.robots_checker.cache_robots(url, &text);
                        }
                    }
                }
            }
        }

        if !self.robots_checker.is_allowed(url, None) {
            return Err(Error::RobotsBlocked(url.to_string()));
        }

        Ok(())
    }

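    /// Performs a single GET request with the configured headers and collects
    /// the status, response headers, final URL reported by the client, and
    /// body into a [`FetchResponse`].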
    async fn fetch(&self, url: &Url) -> Result<FetchResponse> {
        let request = crate::fetch::RequestBuilder::new(url.clone()).with_config(&self.config);

        let (url, headers) = request.build();

        let response = self
            .client
            .client()
            .get(url.as_str())
            .headers(headers)
            .send()
            .await?;

        let status = response.status();
        let headers = response.headers().clone();
        let final_url = response.url().clone();
        let body = response.bytes().await?;

        Ok(FetchResponse::new(final_url, status, headers, body))
    }

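    /// Builds a [`Document`] from the fetched response and (possibly rendered)
    /// HTML: records provenance (headers, hashes, timings), then runs the
    /// configured extractors for metadata, text, links, structured data,
    /// robots directives, and media assets, enforcing the parse-time limit
    /// between stages.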
    async fn process_response(
        &self,
        source_url: &Url,
        response: &FetchResponse,
        html: &str,
    ) -> Result<Document> {
        let parse_start = std::time::Instant::now();

        let mut document = Document::new(source_url.clone(), response.final_url.clone());
        document.status_code = response.status.as_u16();
        document.content_type = response.content_type();
        document.content_length = Some(response.body_size());

        document.provenance.response_headers = response.headers_map();
        document.provenance.etag = response.etag();
        document.provenance.last_modified = response.last_modified();
        document.provenance.cache_control = response.cache_control();
        document.provenance.content_hash =
            Some(crate::crawl::dedup::ContentDedup::hash_content(html));
        document.provenance.timings = response.timings.clone();

        let check_time = || {
            let elapsed = parse_start.elapsed().as_millis() as u64;
            self.limits.check_parse_time(elapsed)
        };

        let _parser = HtmlParser::parse(html);

        let metadata = MetadataExtractor::new().extract(html);
        document.title = metadata.title;
        document.language = metadata.language;

        check_time()?;

        let text_extractor = TextExtractor::default()
            .with_chunking(self.config.parse.segment_text, self.config.parse.chunk_size);
        let extracted = text_extractor.extract(html);
        document.main_text = extracted.full_text;
        document.provenance.text_hash = Some(crate::crawl::dedup::ContentDedup::hash_content(
            &document.main_text,
        ));

        check_time()?;

        if self.config.parse.extract_links {
            let link_extractor = LinkExtractor::default();
            document.out_links = link_extractor.extract(html, &response.final_url);
        }

        check_time()?;

        if self.config.parse.extract_json_ld {
            let jsonld = JsonLdExtractor::new().extract(html);
            document.structured_data.json_ld = jsonld;
        }

        if self.config.parse.extract_open_graph {
            let og = OpenGraphExtractor::new().extract(html);
            document.structured_data.open_graph = og;
        }

        check_time()?;

        document.structured_data.robots_directives = self.parse_robots_directives(html, response);

        if self.config.parse.extract_images {
            let img_extractor = ImageExtractor::default()
                .with_options(self.config.parse.resolve_lazy_loading, false);
            document.assets.images = img_extractor.extract(html, &response.final_url);
        }

        if self.config.parse.extract_videos {
            let video_extractor = VideoExtractor::new();
            document.assets.videos = video_extractor.extract(html, &response.final_url);
        }

        if self.config.parse.extract_audios {
            let audio_extractor = AudioExtractor::new();
            document.assets.audios = audio_extractor.extract(html, &response.final_url);
        }

        Ok(document)
    }

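    /// Combines the `<meta name="robots">` tag and the `X-Robots-Tag` header
    /// into a single set of directives; a directive such as `noindex` in
    /// either source disables the corresponding flag.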
    fn parse_robots_directives(&self, html: &str, response: &FetchResponse) -> RobotsDirectives {
        let parser = HtmlParser::parse(html);

        let meta_robots = parser.attr(r#"meta[name="robots"]"#, "content");
        let x_robots_tag = response.x_robots_tag();

        let combined = format!(
            "{} {}",
            meta_robots.as_deref().unwrap_or(""),
            x_robots_tag.as_deref().unwrap_or("")
        )
        .to_lowercase();

        RobotsDirectives {
            meta_robots,
            x_robots_tag,
            index: !combined.contains("noindex"),
            follow: !combined.contains("nofollow"),
            archive: !combined.contains("noarchive"),
            snippet: !combined.contains("nosnippet"),
            image_index: !combined.contains("noimageindex"),
        }
    }

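    /// Returns the crawl metrics collector.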
    pub fn metrics(&self) -> &MetricsCollector {
        &self.metrics
    }

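    /// Returns the store of normalized documents.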
    pub fn documents(&self) -> &NormalizedStore {
        &self.document_store
    }

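    /// Returns the store of raw response snapshots.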
    pub fn snapshots(&self) -> &SnapshotStore {
        &self.snapshot_store
    }

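    /// Returns the crawl frontier.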
    pub fn frontier(&self) -> &Frontier {
        &self.frontier
    }

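    /// Returns the URL-level deduplicator.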
    pub fn url_dedup(&self) -> &UrlDedup {
        &self.url_dedup
    }

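    /// Returns the content-level deduplicator.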
    pub fn content_dedup(&self) -> &ContentDedup {
        &self.content_dedup
    }

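    /// Returns a mutable reference to the domain allowlist.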
    pub fn allowlist_mut(&mut self) -> &mut DomainAllowlist {
        &mut self.allowlist
    }

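    /// Returns a mutable reference to the IP blocker.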
    pub fn ip_blocker_mut(&mut self) -> &mut IpBlocker {
        &mut self.ip_blocker
    }

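    /// Returns a mutable reference to the resource limits.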
    pub fn limits_mut(&mut self) -> &mut ResourceLimits {
        &mut self.limits
    }

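    /// Returns the active configuration.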
    pub fn config(&self) -> &Config {
        &self.config
    }

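    /// Returns the structured logger.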
    pub fn logger(&self) -> &StructuredLogger {
        &self.logger
    }
}