halldyll_core/
orchestrator.rs

1//! Orchestrator - Main scraper orchestration with full component integration
2
3use std::sync::Arc;
4use url::Url;
5
6use crate::crawl::{CanonicalResolver, ContentDedup, CrawlEntry, Frontier, UrlDedup, UrlNormalizer};
7use crate::fetch::{FetchResponse, HttpClient};
8use crate::observe::{MetricsCollector, StructuredLogger};
9use crate::parse::{
10    AudioExtractor, HtmlParser, ImageExtractor, JsonLdExtractor, LinkExtractor,
11    MetadataExtractor, OpenGraphExtractor, TextExtractor, VideoExtractor,
12};
13use crate::politeness::{DomainThrottler, RobotsChecker};
14use crate::render::{BrowserPool, RenderChecker, RenderDecision};
15use crate::security::{DomainAllowlist, IpBlocker, ResourceLimits};
16use crate::storage::{NormalizedStore, RawSnapshot, SnapshotStore};
17use crate::types::document::RobotsDirectives;
18use crate::types::error::Result;
19use crate::types::{Config, Document, Error};
20
/// Scrape result with additional metadata
///
/// Returned by `Orchestrator::scrape`; bundles the extracted document with
/// the side information the crawl loop needs.
#[derive(Debug)]
pub struct ScrapeResult {
    /// Extracted document
    pub document: Document,
    /// Whether content was a duplicate, i.e. the content dedup had already
    /// seen the fetched body. The document is still returned in that case.
    pub is_duplicate: bool,
    /// Whether JS rendering was used. `true` only when the browser pool
    /// produced the HTML; `false` when the static body was used, including
    /// the fallback after a failed render.
    pub js_rendered: bool,
    /// Canonical URL as reported by the canonical resolver; `None` when
    /// canonical handling is disabled or nothing was resolved.
    /// NOTE(review): presumably only set when different from the source URL —
    /// confirm against `CanonicalResolver`.
    pub canonical_url: Option<Url>,
    /// Links discovered (for crawling): normalized out-links that were not
    /// already seen and that pass the domain allowlist.
    pub discovered_links: Vec<Url>,
}
35
/// Main orchestrator
///
/// Owns every component of the scraping pipeline (fetching, politeness,
/// security checks, parsing support, storage and observability) and wires
/// them together in `scrape` and `crawl`.
pub struct Orchestrator {
    /// Configuration (shared behind an `Arc`)
    config: Arc<Config>,
    /// HTTP client used for page and robots.txt requests
    client: HttpClient,
    /// URL normalizer applied to input URLs, seeds and discovered links
    normalizer: UrlNormalizer,
    /// Crawl frontier: the queue of entries consumed by `crawl`
    frontier: Frontier,
    /// URL dedup: tracks which normalized / canonical URLs were already seen
    url_dedup: UrlDedup,
    /// Content dedup: tracks which response bodies were already seen
    content_dedup: ContentDedup,
    /// Canonical resolver: extracts the canonical URL from fetched HTML
    canonical_resolver: CanonicalResolver,
    /// robots.txt checker (cache, allow rules and crawl delay)
    robots_checker: RobotsChecker,
    /// Domain throttler: acquired before and released after each page fetch
    throttler: DomainThrottler,
    /// Render checker: decides whether a page needs JS rendering
    render_checker: RenderChecker,
    /// Browser pool for JS rendering
    browser_pool: BrowserPool,
    /// Domain allowlist consulted for input URLs and discovered links
    allowlist: DomainAllowlist,
    /// IP blocker consulted during URL validation
    ip_blocker: IpBlocker,
    /// Resource limits (response size, parse time)
    limits: ResourceLimits,
    /// Snapshot store for raw responses
    snapshot_store: SnapshotStore,
    /// Document store for normalized documents
    document_store: NormalizedStore,
    /// Metrics collector
    metrics: MetricsCollector,
    /// Logger
    logger: StructuredLogger,
}
75
76impl Orchestrator {
77    /// Create a new orchestrator
78    pub fn new(config: Config) -> Result<Self> {
79        let client = HttpClient::new(config.clone())?;
80
81        let frontier = Frontier::new(
82            config.crawl.strategy.clone(),
83            config.crawl.max_depth,
84            config.crawl.max_urls_per_domain,
85            config.crawl.max_urls_total,
86        );
87
88        let throttler = DomainThrottler::new(
89            config.politeness.default_delay_ms,
90            config.politeness.max_concurrent_per_domain as usize,
91            config.politeness.max_concurrent_total as usize,
92            config.politeness.adaptive_delay,
93            config.politeness.rate_limit_pause_ms,
94        );
95
96        let robots_checker = RobotsChecker::new(
97            &config.fetch.user_agent,
98            config.politeness.robots_cache_ttl_secs,
99        );
100
101        let logger = StructuredLogger::new();
102
103        Ok(Self {
104            config: Arc::new(config),
105            client,
106            normalizer: UrlNormalizer::default(),
107            frontier,
108            url_dedup: UrlDedup::new(),
109            content_dedup: ContentDedup::default(),
110            canonical_resolver: CanonicalResolver::default(),
111            robots_checker,
112            throttler,
113            render_checker: RenderChecker::default(),
114            browser_pool: BrowserPool::default(),
115            allowlist: DomainAllowlist::new(),
116            ip_blocker: IpBlocker::default(),
117            limits: ResourceLimits::default(),
118            snapshot_store: SnapshotStore::default(),
119            document_store: NormalizedStore::default(),
120            metrics: MetricsCollector::new(),
121            logger,
122        })
123    }
124
125    /// Configure the browser pool for JS rendering
126    pub fn with_browser_pool(mut self, pool: BrowserPool) -> Self {
127        self.browser_pool = pool;
128        self
129    }
130
131    /// Scrape a single URL with full pipeline
132    pub async fn scrape(&self, url: &Url) -> Result<ScrapeResult> {
133        let start = std::time::Instant::now();
134
135        // 1. Normalize URL
136        let normalized_url = self.normalizer.normalize(url);
137        self.logger.log_request_url(&normalized_url, "GET");
138
139        // 2. Check URL deduplication
140        if !self.url_dedup.check_and_mark(normalized_url.as_str()) {
141            self.logger.log_info(&format!("URL already seen: {}", normalized_url));
142            return Err(Error::DuplicateUrl(normalized_url.to_string()));
143        }
144
145        // 3. Security validation
146        self.validate_url(&normalized_url)?;
147
148        // 4. Check robots.txt
149        if self.config.politeness.respect_robots_txt {
150            self.check_robots(&normalized_url).await?;
151        }
152
153        // 5. Throttling
154        let crawl_delay = self.robots_checker.get_crawl_delay(&normalized_url);
155        self.throttler.acquire(&normalized_url, crawl_delay).await;
156
157        // 6. Fetch
158        self.metrics
159            .record_request(normalized_url.host_str().unwrap_or("unknown"));
160        let response = self.fetch(&normalized_url).await?;
161        let duration_ms = start.elapsed().as_millis() as u64;
162
163        // 7. Release throttle and record metrics
164        self.throttler
165            .release(&normalized_url, duration_ms, response.is_rate_limited());
166
167        if response.is_success() {
168            self.metrics.record_success(
169                normalized_url.host_str().unwrap_or("unknown"),
170                response.body_size(),
171                duration_ms,
172            );
173        } else {
174            self.metrics
175                .record_failure(normalized_url.host_str().unwrap_or("unknown"), duration_ms);
176            return Err(Error::Http(response.status.as_u16()));
177        }
178
179        // 8. Check resource limits
180        self.limits.check_response_size(response.body_size())?;
181
182        // 9. Store raw snapshot
183        let snapshot = RawSnapshot::from_response(
184            normalized_url.clone(),
185            response.status.as_u16(),
186            response.headers_map(),
187            response.body.to_vec(),
188        );
189        self.snapshot_store.store(snapshot);
190
191        // 10. Get HTML content
192        let html = response
193            .text()
194            .map_err(|_| Error::Parse("Invalid UTF-8".to_string()))?;
195
196        // 11. Check content deduplication
197        let is_duplicate = !self.content_dedup.check_and_mark(&html);
198        if is_duplicate && self.config.crawl.dedup_content {
199            self.logger
200                .log_info(&format!("Content duplicate: {}", normalized_url));
201        }
202
203        // 12. Check if JS rendering needed
204        let render_decision = self.render_checker.check(&html);
205        let (final_html, js_rendered) = if render_decision != RenderDecision::Static {
206            // Try browser rendering if available
207            match self
208                .browser_pool
209                .render(&normalized_url, None)
210                .await
211            {
212                Ok(browser_response) => (browser_response.html, true),
213                Err(_) => {
214                    // Fall back to static HTML
215                    self.logger.log_warn(&format!(
216                        "JS rendering needed but browser unavailable: {}",
217                        normalized_url
218                    ));
219                    (html, false)
220                }
221            }
222        } else {
223            (html, false)
224        };
225
226        // 13. Parse and extract
227        let mut document = self
228            .process_response(&normalized_url, &response, &final_html)
229            .await?;
230
231        // 14. Resolve canonical URL
232        let canonical_url = if self.config.crawl.respect_canonicals {
233            self.canonical_resolver
234                .resolve_from_html(&final_html, &response.final_url)
235        } else {
236            None
237        };
238
239        if let Some(ref canonical) = canonical_url {
240            document.canonical_url = Some(canonical.clone());
241            // Mark canonical as seen to avoid re-crawling
242            self.url_dedup.mark_seen(canonical.as_str());
243        }
244
245        // 15. Extract discovered links for crawling
246        let discovered_links: Vec<Url> = document
247            .out_links
248            .iter()
249            .filter_map(|link| {
250                let url = self.normalizer.normalize(&link.url);
251                // Only include if not seen and allowed
252                if self.url_dedup.is_duplicate(url.as_str()) {
253                    None
254                } else if !self.allowlist.is_allowed(&url) {
255                    None
256                } else {
257                    Some(url)
258                }
259            })
260            .collect();
261
262        // 16. Store normalized document
263        self.document_store.store(document.clone());
264        self.metrics.record_document();
265
266        self.logger.log_response_parts(
267            &normalized_url,
268            response.status.as_u16(),
269            response.body_size(),
270            duration_ms,
271        );
272
273        Ok(ScrapeResult {
274            document,
275            is_duplicate,
276            js_rendered,
277            canonical_url,
278            discovered_links,
279        })
280    }
281
282    /// Crawl starting from seed URLs
283    pub async fn crawl(&self, seeds: Vec<Url>, max_pages: Option<usize>) -> Result<Vec<Document>> {
284        // Add seeds to frontier
285        for seed in seeds {
286            let normalized = self.normalizer.normalize(&seed);
287            self.frontier.push(CrawlEntry::new(normalized, 0, 100));
288        }
289
290        let mut documents = Vec::new();
291        let max = max_pages.unwrap_or(usize::MAX);
292
293        while let Some(entry) = self.frontier.pop() {
294            if documents.len() >= max {
295                break;
296            }
297
298            match self.scrape(&entry.url).await {
299                Ok(result) => {
300                    // Add discovered links to frontier
301                    for link in result.discovered_links {
302                        self.frontier.push(CrawlEntry::new(
303                            link,
304                            entry.depth + 1,
305                            50, // Lower priority for deeper pages
306                        ));
307                    }
308                    documents.push(result.document);
309                }
310                Err(e) => {
311                    self.logger.log_error_parts(&entry.url, &e.to_string(), true);
312                }
313            }
314        }
315
316        Ok(documents)
317    }
318
319    /// Validate a URL before scraping
320    fn validate_url(&self, url: &Url) -> Result<()> {
321        // Check scheme
322        if !url.scheme().starts_with("http") {
323            return Err(Error::Config(format!(
324                "Unsupported scheme: {}",
325                url.scheme()
326            )));
327        }
328
329        // Check allowlist
330        if !self.allowlist.is_allowed(url) {
331            return Err(Error::DomainNotAllowed(
332                url.host_str().unwrap_or("").to_string(),
333            ));
334        }
335
336        // Check blocked IPs
337        if self.ip_blocker.is_url_hostname_blocked(url) {
338            return Err(Error::IpBlocked(
339                url.host_str().unwrap_or("").to_string(),
340            ));
341        }
342
343        Ok(())
344    }
345
346    /// Check robots.txt
347    async fn check_robots(&self, url: &Url) -> Result<()> {
348        // Check cache first
349        if self.robots_checker.cache().get(url).is_none() {
350            // Fetch robots.txt
351            if let Some(robots_url) = RobotsChecker::robots_url(url) {
352                if let Ok(response) = self.fetch(&robots_url).await {
353                    if response.is_success() {
354                        if let Ok(text) = response.text() {
355                            self.robots_checker.cache_robots(url, &text);
356                        }
357                    }
358                }
359            }
360        }
361
362        if !self.robots_checker.is_allowed(url, None) {
363            return Err(Error::RobotsBlocked(url.to_string()));
364        }
365
366        Ok(())
367    }
368
369    /// Fetch a URL
370    async fn fetch(&self, url: &Url) -> Result<FetchResponse> {
371        let request = crate::fetch::RequestBuilder::new(url.clone()).with_config(&self.config);
372
373        let (url, headers) = request.build();
374
375        let response = self
376            .client
377            .client()
378            .get(url.as_str())
379            .headers(headers)
380            .send()
381            .await?;
382
383        let status = response.status();
384        let headers = response.headers().clone();
385        let final_url = response.url().clone();
386        let body = response.bytes().await?;
387
388        Ok(FetchResponse::new(final_url, status, headers, body))
389    }
390
391    /// Process an HTTP response
392    async fn process_response(
393        &self,
394        source_url: &Url,
395        response: &FetchResponse,
396        html: &str,
397    ) -> Result<Document> {
398        let parse_start = std::time::Instant::now();
399
400        // Create base document
401        let mut document = Document::new(source_url.clone(), response.final_url.clone());
402        document.status_code = response.status.as_u16();
403        document.content_type = response.content_type();
404        document.content_length = Some(response.body_size());
405
406        // Provenance
407        document.provenance.response_headers = response.headers_map();
408        document.provenance.etag = response.etag();
409        document.provenance.last_modified = response.last_modified();
410        document.provenance.cache_control = response.cache_control();
411        document.provenance.content_hash =
412            Some(crate::crawl::dedup::ContentDedup::hash_content(html));
413        document.provenance.timings = response.timings.clone();
414
415        // Check parsing time limit
416        let check_time = || {
417            let elapsed = parse_start.elapsed().as_millis() as u64;
418            self.limits.check_parse_time(elapsed)
419        };
420
421        // Parse HTML
422        let _parser = HtmlParser::parse(html);
423
424        // Metadata
425        let metadata = MetadataExtractor::new().extract(html);
426        document.title = metadata.title;
427        document.language = metadata.language;
428
429        check_time()?;
430
431        // Main text
432        let text_extractor = TextExtractor::default()
433            .with_chunking(self.config.parse.segment_text, self.config.parse.chunk_size);
434        let extracted = text_extractor.extract(html);
435        document.main_text = extracted.full_text;
436        document.provenance.text_hash = Some(crate::crawl::dedup::ContentDedup::hash_content(
437            &document.main_text,
438        ));
439
440        check_time()?;
441
442        // Links
443        if self.config.parse.extract_links {
444            let link_extractor = LinkExtractor::default();
445            document.out_links = link_extractor.extract(html, &response.final_url);
446        }
447
448        check_time()?;
449
450        // Structured data
451        if self.config.parse.extract_json_ld {
452            let jsonld = JsonLdExtractor::new().extract(html);
453            document.structured_data.json_ld = jsonld;
454        }
455
456        if self.config.parse.extract_open_graph {
457            let og = OpenGraphExtractor::new().extract(html);
458            document.structured_data.open_graph = og;
459        }
460
461        check_time()?;
462
463        // Robots directives
464        document.structured_data.robots_directives = self.parse_robots_directives(html, response);
465
466        // Assets
467        if self.config.parse.extract_images {
468            let img_extractor = ImageExtractor::default()
469                .with_options(self.config.parse.resolve_lazy_loading, false);
470            document.assets.images = img_extractor.extract(html, &response.final_url);
471        }
472
473        if self.config.parse.extract_videos {
474            let video_extractor = VideoExtractor::new();
475            document.assets.videos = video_extractor.extract(html, &response.final_url);
476        }
477
478        if self.config.parse.extract_audios {
479            let audio_extractor = AudioExtractor::new();
480            document.assets.audios = audio_extractor.extract(html, &response.final_url);
481        }
482
483        Ok(document)
484    }
485
486    /// Parse robots directives
487    fn parse_robots_directives(&self, html: &str, response: &FetchResponse) -> RobotsDirectives {
488        let parser = HtmlParser::parse(html);
489
490        let meta_robots = parser.attr(r#"meta[name="robots"]"#, "content");
491        let x_robots_tag = response.x_robots_tag();
492
493        let combined = format!(
494            "{} {}",
495            meta_robots.as_deref().unwrap_or(""),
496            x_robots_tag.as_deref().unwrap_or("")
497        )
498        .to_lowercase();
499
500        RobotsDirectives {
501            meta_robots,
502            x_robots_tag,
503            index: !combined.contains("noindex"),
504            follow: !combined.contains("nofollow"),
505            archive: !combined.contains("noarchive"),
506            snippet: !combined.contains("nosnippet"),
507            image_index: !combined.contains("noimageindex"),
508        }
509    }
510
    /// Access to metrics
    ///
    /// Borrows the collector that `scrape` records requests, successes,
    /// failures and stored documents into.
    pub fn metrics(&self) -> &MetricsCollector {
        &self.metrics
    }

    /// Access to document store
    ///
    /// Borrows the store that receives every document produced by `scrape`.
    pub fn documents(&self) -> &NormalizedStore {
        &self.document_store
    }

    /// Access to snapshot store
    ///
    /// Borrows the store that receives the raw snapshot of each successful
    /// response.
    pub fn snapshots(&self) -> &SnapshotStore {
        &self.snapshot_store
    }

    /// Access to frontier
    ///
    /// Borrows the queue that `crawl` pushes seeds and discovered links into.
    pub fn frontier(&self) -> &Frontier {
        &self.frontier
    }

    /// Access to URL dedup
    ///
    /// Borrows the tracker of already-seen normalized and canonical URLs.
    pub fn url_dedup(&self) -> &UrlDedup {
        &self.url_dedup
    }

    /// Access to content dedup
    ///
    /// Borrows the tracker of already-seen response bodies.
    pub fn content_dedup(&self) -> &ContentDedup {
        &self.content_dedup
    }
540
    /// Configure the allowlist
    ///
    /// Mutable access to the domain allowlist consulted by URL validation and
    /// by link discovery.
    pub fn allowlist_mut(&mut self) -> &mut DomainAllowlist {
        &mut self.allowlist
    }

    /// Configure IP blocker
    ///
    /// Mutable access to the IP blocker consulted by URL validation.
    pub fn ip_blocker_mut(&mut self) -> &mut IpBlocker {
        &mut self.ip_blocker
    }

    /// Configure resource limits
    ///
    /// Mutable access to the limits applied to response size and parse time.
    pub fn limits_mut(&mut self) -> &mut ResourceLimits {
        &mut self.limits
    }
555
    /// Get configuration
    ///
    /// Borrows the configuration held behind the internal `Arc`.
    pub fn config(&self) -> &Config {
        &self.config
    }

    /// Get logger
    ///
    /// Borrows the structured logger used by the pipeline.
    pub fn logger(&self) -> &StructuredLogger {
        &self.logger
    }
565}