use std::sync::Arc;
use url::Url;
use crate::crawl::{CanonicalResolver, ContentDedup, CrawlEntry, Frontier, UrlDedup, UrlNormalizer};
use crate::fetch::{FetchResponse, HttpClient};
use crate::observe::{MetricsCollector, StructuredLogger};
use crate::parse::{
AudioExtractor, HtmlParser, ImageExtractor, JsonLdExtractor, LinkExtractor,
MetadataExtractor, OpenGraphExtractor, TextExtractor, VideoExtractor,
};
use crate::politeness::{DomainThrottler, RobotsChecker};
use crate::render::{BrowserPool, RenderChecker, RenderDecision};
use crate::security::{DomainAllowlist, IpBlocker, ResourceLimits};
use crate::storage::{NormalizedStore, RawSnapshot, SnapshotStore};
use crate::types::document::RobotsDirectives;
use crate::types::error::Result;
use crate::types::{Config, Document, Error};
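
/// Outcome of scraping a single URL: the parsed document plus crawl
/// metadata such as duplicate status, whether JS rendering ran, the
/// resolved canonical URL, and newly discovered out-links.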
#[derive(Debug)]
pub struct ScrapeResult {
pub document: Document,
pub is_duplicate: bool,
pub js_rendered: bool,
pub canonical_url: Option<Url>,
pub discovered_links: Vec<Url>,
}
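
/// Coordinates the full scrape pipeline: URL normalization and dedup,
/// politeness (robots.txt, per-domain throttling), fetching, optional
/// headless rendering, parsing, content dedup, storage, and metrics.
///
/// A minimal usage sketch; `your_crate` is a placeholder for this crate's
/// real name, and it assumes `Config` and `Orchestrator` are exported at
/// the crate root and that `Config` implements `Default`:
///
/// ```ignore
/// use url::Url;
/// use your_crate::{Config, Orchestrator};
///
/// async fn demo() -> your_crate::types::error::Result<()> {
///     let orchestrator = Orchestrator::new(Config::default())?;
///     let url = Url::parse("https://example.com/").expect("static URL is valid");
///     let result = orchestrator.scrape(&url).await?;
///     println!("discovered {} links", result.discovered_links.len());
///     Ok(())
/// }
/// ```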
pub struct Orchestrator {
config: Arc<Config>,
client: HttpClient,
normalizer: UrlNormalizer,
frontier: Frontier,
url_dedup: UrlDedup,
content_dedup: ContentDedup,
canonical_resolver: CanonicalResolver,
robots_checker: RobotsChecker,
throttler: DomainThrottler,
render_checker: RenderChecker,
browser_pool: BrowserPool,
allowlist: DomainAllowlist,
ip_blocker: IpBlocker,
limits: ResourceLimits,
snapshot_store: SnapshotStore,
document_store: NormalizedStore,
metrics: MetricsCollector,
logger: StructuredLogger,
}
impl Orchestrator {
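    /// Builds an orchestrator from `config`, wiring up the HTTP client,
    /// crawl frontier, domain throttler, and robots checker; the remaining
    /// components are created with their default settings.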
pub fn new(config: Config) -> Result<Self> {
let client = HttpClient::new(config.clone())?;
let frontier = Frontier::new(
config.crawl.strategy.clone(),
config.crawl.max_depth,
config.crawl.max_urls_per_domain,
config.crawl.max_urls_total,
);
let throttler = DomainThrottler::new(
config.politeness.default_delay_ms,
config.politeness.max_concurrent_per_domain as usize,
config.politeness.max_concurrent_total as usize,
config.politeness.adaptive_delay,
config.politeness.rate_limit_pause_ms,
);
let robots_checker = RobotsChecker::new(
&config.fetch.user_agent,
config.politeness.robots_cache_ttl_secs,
);
let logger = StructuredLogger::new();
Ok(Self {
config: Arc::new(config),
client,
normalizer: UrlNormalizer::default(),
frontier,
url_dedup: UrlDedup::new(),
content_dedup: ContentDedup::default(),
canonical_resolver: CanonicalResolver::default(),
robots_checker,
throttler,
render_checker: RenderChecker::default(),
browser_pool: BrowserPool::default(),
allowlist: DomainAllowlist::new(),
ip_blocker: IpBlocker::default(),
limits: ResourceLimits::default(),
snapshot_store: SnapshotStore::default(),
document_store: NormalizedStore::default(),
metrics: MetricsCollector::new(),
logger,
})
}
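
    /// Swaps in a configured browser pool for JS rendering. If a render
    /// attempt fails at scrape time, the pipeline falls back to the
    /// page's static HTML.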
pub fn with_browser_pool(mut self, pool: BrowserPool) -> Self {
self.browser_pool = pool;
self
}
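
    /// Scrapes a single URL end to end: normalize, dedup, validate, honor
    /// robots.txt and per-domain throttling, fetch, optionally render JS,
    /// parse, optionally resolve the canonical URL, and persist both the
    /// raw snapshot and the normalized document.
    ///
    /// Returns `Error::DuplicateUrl` for an already-seen URL and
    /// `Error::Http` for a non-success status.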
pub async fn scrape(&self, url: &Url) -> Result<ScrapeResult> {
let start = std::time::Instant::now();
let normalized_url = self.normalizer.normalize(url);
self.logger.log_request_url(&normalized_url, "GET");
        // Mark the URL as seen before any other checks so repeat requests
        // are rejected early, even if this attempt later fails.
        if !self.url_dedup.check_and_mark(normalized_url.as_str()) {
            self.logger.log_info(&format!("URL already seen: {}", normalized_url));
            return Err(Error::DuplicateUrl(normalized_url.to_string()));
        }
self.validate_url(&normalized_url)?;
if self.config.politeness.respect_robots_txt {
self.check_robots(&normalized_url).await?;
}
let crawl_delay = self.robots_checker.get_crawl_delay(&normalized_url);
self.throttler.acquire(&normalized_url, crawl_delay).await;
self.metrics
.record_request(normalized_url.host_str().unwrap_or("unknown"));
let response = self.fetch(&normalized_url).await?;
let duration_ms = start.elapsed().as_millis() as u64;
self.throttler
.release(&normalized_url, duration_ms, response.is_rate_limited());
if response.is_success() {
self.metrics.record_success(
normalized_url.host_str().unwrap_or("unknown"),
response.body_size(),
duration_ms,
);
} else {
self.metrics
.record_failure(normalized_url.host_str().unwrap_or("unknown"), duration_ms);
return Err(Error::Http(response.status.as_u16()));
}
self.limits.check_response_size(response.body_size())?;
let snapshot = RawSnapshot::from_response(
normalized_url.clone(),
response.status.as_u16(),
response.headers_map(),
response.body.to_vec(),
);
self.snapshot_store.store(snapshot);
let html = response
.text()
.map_err(|_| Error::Parse("Invalid UTF-8".to_string()))?;
        // Content dedup is advisory: duplicates are flagged on the result
        // rather than skipped, so callers decide how to handle them.
        let is_duplicate = !self.content_dedup.check_and_mark(&html);
        if is_duplicate && self.config.crawl.dedup_content {
            self.logger
                .log_info(&format!("Content duplicate: {}", normalized_url));
        }
        let render_decision = self.render_checker.check(&html);
        let (final_html, js_rendered) = if render_decision != RenderDecision::Static {
            // The page looks JS-dependent: try the browser pool, falling
            // back to the static HTML if rendering fails.
            match self.browser_pool.render(&normalized_url, None).await {
                Ok(browser_response) => (browser_response.html, true),
                Err(_) => {
                    self.logger.log_warn(&format!(
                        "JS rendering needed but browser unavailable: {}",
                        normalized_url
                    ));
                    (html, false)
                }
            }
        } else {
            (html, false)
        };
let mut document = self
.process_response(&normalized_url, &response, &final_html)
.await?;
let canonical_url = if self.config.crawl.respect_canonicals {
self.canonical_resolver
.resolve_from_html(&final_html, &response.final_url)
} else {
None
};
if let Some(ref canonical) = canonical_url {
document.canonical_url = Some(canonical.clone());
self.url_dedup.mark_seen(canonical.as_str());
}
let discovered_links: Vec<Url> = document
.out_links
.iter()
            .filter_map(|link| {
                let url = self.normalizer.normalize(&link.url);
                // Keep only links that are new and on allowed domains.
                let fresh = !self.url_dedup.is_duplicate(url.as_str());
                (fresh && self.allowlist.is_allowed(&url)).then_some(url)
            })
.collect();
self.document_store.store(document.clone());
self.metrics.record_document();
self.logger.log_response_parts(
&normalized_url,
response.status.as_u16(),
response.body_size(),
duration_ms,
);
Ok(ScrapeResult {
document,
is_duplicate,
js_rendered,
canonical_url,
discovered_links,
})
}
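
    /// Crawls from `seeds`, feeding each page's discovered links back into
    /// the frontier until it drains or `max_pages` documents have been
    /// collected. Per-URL failures are logged and skipped rather than
    /// aborting the crawl.
    ///
    /// A sketch of a bounded crawl, under the same assumptions as the
    /// example on [`Orchestrator`]:
    ///
    /// ```ignore
    /// let orchestrator = Orchestrator::new(Config::default())?;
    /// let seeds = vec![Url::parse("https://example.com/").unwrap()];
    /// let docs = orchestrator.crawl(seeds, Some(100)).await?;
    /// assert!(docs.len() <= 100);
    /// ```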
pub async fn crawl(&self, seeds: Vec<Url>, max_pages: Option<usize>) -> Result<Vec<Document>> {
        // Seeds enter the frontier at depth 0 with priority 100; links
        // discovered during the crawl are pushed at 50.
        for seed in seeds {
            let normalized = self.normalizer.normalize(&seed);
            self.frontier.push(CrawlEntry::new(normalized, 0, 100));
        }
let mut documents = Vec::new();
let max = max_pages.unwrap_or(usize::MAX);
while let Some(entry) = self.frontier.pop() {
if documents.len() >= max {
break;
}
match self.scrape(&entry.url).await {
Ok(result) => {
                    // Discovered links go one level deeper; they are pushed
                    // at priority 50, versus 100 for seeds.
                    for link in result.discovered_links {
                        self.frontier
                            .push(CrawlEntry::new(link, entry.depth + 1, 50));
                    }
documents.push(result.document);
}
Err(e) => {
self.logger.log_error_parts(&entry.url, &e.to_string(), true);
}
}
}
Ok(documents)
}
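
    /// Rejects URLs with non-HTTP(S) schemes, hosts outside the domain
    /// allowlist, and hostnames flagged by the IP blocker.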
fn validate_url(&self, url: &Url) -> Result<()> {
        // Only HTTP and HTTPS are fetchable; `Url::scheme()` is lowercase.
        if !matches!(url.scheme(), "http" | "https") {
            return Err(Error::Config(format!(
                "Unsupported scheme: {}",
                url.scheme()
            )));
        }
if !self.allowlist.is_allowed(url) {
return Err(Error::DomainNotAllowed(
url.host_str().unwrap_or("").to_string(),
));
}
if self.ip_blocker.is_url_hostname_blocked(url) {
return Err(Error::IpBlocked(
url.host_str().unwrap_or("").to_string(),
));
}
Ok(())
}
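
    /// Lazily fetches and caches robots.txt on first contact with a host,
    /// then rejects the URL if the checker reports it disallowed.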
async fn check_robots(&self, url: &Url) -> Result<()> {
if self.robots_checker.cache().get(url).is_none() {
if let Some(robots_url) = RobotsChecker::robots_url(url) {
if let Ok(response) = self.fetch(&robots_url).await {
if response.is_success() {
if let Ok(text) = response.text() {
self.robots_checker.cache_robots(url, &text);
}
}
}
}
}
if !self.robots_checker.is_allowed(url, None) {
return Err(Error::RobotsBlocked(url.to_string()));
}
Ok(())
}
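
    /// Issues a GET with the configured headers through the shared client;
    /// `final_url` captures the URL after any redirects the client follows.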
async fn fetch(&self, url: &Url) -> Result<FetchResponse> {
let request = crate::fetch::RequestBuilder::new(url.clone()).with_config(&self.config);
let (url, headers) = request.build();
let response = self
.client
.client()
.get(url.as_str())
.headers(headers)
.send()
.await?;
let status = response.status();
let headers = response.headers().clone();
let final_url = response.url().clone();
let body = response.bytes().await?;
Ok(FetchResponse::new(final_url, status, headers, body))
}
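
    /// Builds the normalized `Document` from a fetched response: provenance
    /// (headers, hashes, timings), metadata, main text, and the optional
    /// link, structured-data, and media extractions, re-checking the parse
    /// time budget between phases.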
async fn process_response(
&self,
source_url: &Url,
response: &FetchResponse,
html: &str,
) -> Result<Document> {
let parse_start = std::time::Instant::now();
let mut document = Document::new(source_url.clone(), response.final_url.clone());
document.status_code = response.status.as_u16();
document.content_type = response.content_type();
document.content_length = Some(response.body_size());
document.provenance.response_headers = response.headers_map();
document.provenance.etag = response.etag();
document.provenance.last_modified = response.last_modified();
document.provenance.cache_control = response.cache_control();
        document.provenance.content_hash = Some(ContentDedup::hash_content(html));
document.provenance.timings = response.timings.clone();
        // Enforce the parse-time budget between extraction phases.
        let check_time = || {
            let elapsed = parse_start.elapsed().as_millis() as u64;
            self.limits.check_parse_time(elapsed)
        };
let metadata = MetadataExtractor::new().extract(html);
document.title = metadata.title;
document.language = metadata.language;
check_time()?;
let text_extractor = TextExtractor::default()
.with_chunking(self.config.parse.segment_text, self.config.parse.chunk_size);
let extracted = text_extractor.extract(html);
document.main_text = extracted.full_text;
        document.provenance.text_hash = Some(ContentDedup::hash_content(&document.main_text));
check_time()?;
if self.config.parse.extract_links {
let link_extractor = LinkExtractor::default();
document.out_links = link_extractor.extract(html, &response.final_url);
}
check_time()?;
if self.config.parse.extract_json_ld {
let jsonld = JsonLdExtractor::new().extract(html);
document.structured_data.json_ld = jsonld;
}
if self.config.parse.extract_open_graph {
let og = OpenGraphExtractor::new().extract(html);
document.structured_data.open_graph = og;
}
check_time()?;
document.structured_data.robots_directives = self.parse_robots_directives(html, response);
if self.config.parse.extract_images {
let img_extractor = ImageExtractor::default()
.with_options(self.config.parse.resolve_lazy_loading, false);
document.assets.images = img_extractor.extract(html, &response.final_url);
}
if self.config.parse.extract_videos {
let video_extractor = VideoExtractor::new();
document.assets.videos = video_extractor.extract(html, &response.final_url);
}
if self.config.parse.extract_audios {
let audio_extractor = AudioExtractor::new();
document.assets.audios = audio_extractor.extract(html, &response.final_url);
}
Ok(document)
}
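
    /// Merges `<meta name="robots">` content with the `X-Robots-Tag` header;
    /// each directive stays permissive unless a matching `no*` token appears
    /// in either source.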
fn parse_robots_directives(&self, html: &str, response: &FetchResponse) -> RobotsDirectives {
let parser = HtmlParser::parse(html);
let meta_robots = parser.attr(r#"meta[name="robots"]"#, "content");
let x_robots_tag = response.x_robots_tag();
let combined = format!(
"{} {}",
meta_robots.as_deref().unwrap_or(""),
x_robots_tag.as_deref().unwrap_or("")
)
.to_lowercase();
RobotsDirectives {
meta_robots,
x_robots_tag,
index: !combined.contains("noindex"),
follow: !combined.contains("nofollow"),
archive: !combined.contains("noarchive"),
snippet: !combined.contains("nosnippet"),
image_index: !combined.contains("noimageindex"),
}
}
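
    // Read-only accessors over the orchestrator's shared state.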
pub fn metrics(&self) -> &MetricsCollector {
&self.metrics
}
pub fn documents(&self) -> &NormalizedStore {
&self.document_store
}
pub fn snapshots(&self) -> &SnapshotStore {
&self.snapshot_store
}
pub fn frontier(&self) -> &Frontier {
&self.frontier
}
pub fn url_dedup(&self) -> &UrlDedup {
&self.url_dedup
}
pub fn content_dedup(&self) -> &ContentDedup {
&self.content_dedup
}
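
    // Mutable accessors, used to configure security policy (allowlist,
    // IP blocking, resource limits) before a crawl.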
pub fn allowlist_mut(&mut self) -> &mut DomainAllowlist {
&mut self.allowlist
}
pub fn ip_blocker_mut(&mut self) -> &mut IpBlocker {
&mut self.ip_blocker
}
pub fn limits_mut(&mut self) -> &mut ResourceLimits {
&mut self.limits
}
pub fn config(&self) -> &Config {
&self.config
}
pub fn logger(&self) -> &StructuredLogger {
&self.logger
}
}