use std::collections::HashSet;
use std::path::PathBuf;
use std::time::Instant;
use tracing::{debug, error, info, warn};
use scrapling_fetch::Response;
use crate::cache::ResponseCacheManager;
use crate::checkpoint::{CheckpointData, CheckpointManager};
use crate::error::{Result, SpiderError};
use crate::request::{Request, SpiderOutput};
use crate::result::{CrawlStats, ItemList};
use crate::robotstxt::RobotsTxtManager;
use crate::scheduler::Scheduler;
use crate::session::{Session, SessionManager};
/// HTTP status codes that the default [`Spider::is_blocked`] implementation
/// treats as a blocked response.
const BLOCKED_CODES: &[u16] = &[401, 403, 407, 429, 444, 500, 502, 503, 504];
/// User-implemented description of a crawl: where to start, how to parse
/// responses, and how the engine should be configured.
///
/// Only [`Spider::name`], [`Spider::start_urls`] and [`Spider::parse`] are
/// required; every other method has a default.
pub trait Spider {
    /// Name of this spider.
    fn name(&self) -> &str;

    /// URLs the crawl starts from.
    fn start_urls(&self) -> Vec<String>;

    /// Turns a response into scraped items and/or follow-up requests.
    ///
    /// The engine calls this for every request that carries no callback of
    /// its own.
    fn parse(&self, response: Response) -> Vec<SpiderOutput>;

    /// Domains that requests may target; subdomains of a listed domain match
    /// too. The default empty set places no restriction on domains.
    fn allowed_domains(&self) -> HashSet<String> {
        HashSet::default()
    }

    /// Upper bound the engine applies to in-flight requests. Defaults to 4.
    fn concurrent_requests(&self) -> u32 {
        4
    }

    /// Per-domain concurrency setting; the engine records it in the crawl
    /// stats. Defaults to 0.
    fn concurrent_requests_per_domain(&self) -> u32 {
        0
    }

    /// Seconds the engine sleeps before processing each request. The default
    /// of `0.0` disables the delay.
    fn download_delay(&self) -> f64 {
        0.0
    }

    /// How many times a blocked request is re-queued before the engine gives
    /// up on it. Defaults to 3.
    fn max_blocked_retries(&self) -> u32 {
        3
    }

    /// Whether the engine should consult robots.txt before fetching.
    fn robots_txt_obey(&self) -> bool {
        false
    }

    /// Whether the engine should use the on-disk response cache.
    fn development_mode(&self) -> bool {
        false
    }

    /// Directory for the development response cache. When `None`, the engine
    /// falls back to `.scrapling_cache`.
    fn development_cache_dir(&self) -> Option<PathBuf> {
        None
    }

    /// First flag forwarded to `Scheduler::new`
    /// (presumably: include request kwargs in fingerprints — TODO confirm).
    fn fp_include_kwargs(&self) -> bool {
        false
    }

    /// Third flag forwarded to `Scheduler::new`
    /// (presumably: keep URL fragments in fingerprints — TODO confirm).
    fn fp_keep_fragments(&self) -> bool {
        false
    }

    /// Second flag forwarded to `Scheduler::new`
    /// (presumably: include headers in fingerprints — TODO confirm).
    fn fp_include_headers(&self) -> bool {
        false
    }

    /// Registers the sessions used for fetching.
    ///
    /// The default registers a single `Fetcher` session under the id
    /// `"default"`; the outcome of the registration is deliberately ignored.
    fn configure_sessions(&self, manager: &mut SessionManager) {
        let _ = manager.add(
            "default",
            Session::Fetcher(scrapling_fetch::Fetcher::new()),
            true,
        );
    }

    /// Initial requests of the crawl; by default one request per start URL.
    fn start_requests(&self) -> Vec<Request> {
        let urls = self.start_urls();
        urls.into_iter().map(Request::new).collect()
    }

    /// Hook invoked before the crawl loop starts. `_resuming` is `true` when
    /// the engine restored its queue from a checkpoint.
    fn on_start(&self, _resuming: bool) {}

    /// Hook invoked once the crawl loop has ended.
    fn on_close(&self) {}

    /// Hook invoked when fetching `_request` failed with `_error`.
    fn on_error(&self, _request: &Request, _error: &SpiderError) {}

    /// Post-processes a scraped item. Returning `None` drops the item; the
    /// default keeps it unchanged.
    fn on_scraped_item(&self, item: serde_json::Value) -> Option<serde_json::Value> {
        Some(item)
    }

    /// Decides whether `response` counts as blocked. The default checks the
    /// status code against `BLOCKED_CODES`.
    fn is_blocked(&self, response: &Response) -> bool {
        BLOCKED_CODES.iter().any(|&code| code == response.status)
    }
}
/// Drives a crawl for a single [`Spider`]: schedules requests, fetches them
/// through the configured sessions and collects the scraped items.
pub struct CrawlerEngine<'a> {
/// Spider whose configuration and callbacks drive the crawl.
spider: &'a dyn Spider,
/// Sessions registered by `Spider::configure_sessions`; used for fetching.
session_manager: SessionManager,
/// Queue of pending requests.
scheduler: Scheduler,
/// Counters and timings of the current crawl; reset at the start of `crawl`.
stats: CrawlStats,
/// Present only when `Spider::robots_txt_obey` returned `true`.
robots_manager: Option<RobotsTxtManager>,
/// Present only when `Spider::development_mode` returned `true`.
cache_manager: Option<ResponseCacheManager>,
/// Present only when a crawl directory was passed to `new`.
checkpoint_manager: Option<CheckpointManager>,
/// Items kept by `Spider::on_scraped_item` during the current crawl.
items: ItemList,
/// Domains requests may target; an empty set allows every domain.
allowed_domains: HashSet<String>,
/// Number of requests currently being processed.
active_tasks: u32,
/// Set by the first `request_pause` call.
pause_requested: bool,
/// Set by a further `request_pause` call while a pause is already pending.
force_stop: bool,
/// `true` when the crawl loop ended because of a pause.
pub paused: bool,
/// When the crawl started or a checkpoint was last saved.
last_checkpoint_time: Instant,
/// Sender installed by `stream`; every kept item is also sent through it.
item_sender: Option<tokio::sync::mpsc::UnboundedSender<serde_json::Value>>,
}
impl<'a> CrawlerEngine<'a> {
/// Builds an engine for `spider`.
///
/// `crawldir`, when given, enables checkpointing in that directory with
/// `interval_secs` passed on to the checkpoint manager.
///
/// # Errors
///
/// Returns `SpiderError::Session` when `Spider::configure_sessions` leaves
/// the session manager empty, and propagates any error from
/// `CheckpointManager::new`.
pub fn new(
    spider: &'a dyn Spider,
    crawldir: Option<PathBuf>,
    interval_secs: f64,
) -> Result<Self> {
    let mut sessions = SessionManager::new();
    spider.configure_sessions(&mut sessions);
    if sessions.is_empty() {
        return Err(SpiderError::Session("no sessions configured".into()));
    }

    let include_kwargs = spider.fp_include_kwargs();
    let include_headers = spider.fp_include_headers();
    let keep_fragments = spider.fp_keep_fragments();
    let scheduler = Scheduler::new(include_kwargs, include_headers, keep_fragments);

    // Optional components exist only when the spider opts in.
    let robots_manager = spider.robots_txt_obey().then(RobotsTxtManager::new);
    let cache_manager = spider.development_mode().then(|| {
        let cache_dir = spider
            .development_cache_dir()
            .unwrap_or_else(|| PathBuf::from(".scrapling_cache"));
        ResponseCacheManager::new(cache_dir)
    });
    let checkpoint_manager = match crawldir {
        Some(dir) => Some(CheckpointManager::new(dir, interval_secs)?),
        None => None,
    };
    let allowed_domains = spider.allowed_domains();

    Ok(Self {
        spider,
        session_manager: sessions,
        scheduler,
        stats: CrawlStats::default(),
        robots_manager,
        cache_manager,
        checkpoint_manager,
        items: ItemList::new(),
        allowed_domains,
        active_tasks: 0,
        pause_requested: false,
        force_stop: false,
        paused: false,
        last_checkpoint_time: Instant::now(),
        item_sender: None,
    })
}
/// Returns `true` when `request` targets an allowed domain.
///
/// An empty allow-list permits everything. Otherwise the request's domain
/// must equal a listed domain or be a subdomain of one, i.e. end with
/// `.<allowed>`.
fn is_domain_allowed(&self, request: &Request) -> bool {
    if self.allowed_domains.is_empty() {
        return true;
    }
    let domain = request.domain();
    self.allowed_domains.iter().any(|allowed| {
        domain == *allowed
            // Subdomain test without building a temporary ".{allowed}" string:
            // what remains after removing the suffix must end at a dot.
            || domain
                .strip_suffix(allowed.as_str())
                .map_or(false, |prefix| prefix.ends_with('.'))
    })
}
/// Asks the crawl to pause.
///
/// The first call requests a graceful pause; calling again while a pause is
/// already pending escalates to a forced stop.
pub fn request_pause(&mut self) {
    if !self.pause_requested {
        self.pause_requested = true;
        info!("graceful pause requested");
        return;
    }
    self.force_stop = true;
    warn!("force stop requested");
}
pub async fn crawl(&mut self) -> Result<CrawlStats> {
let start = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
self.stats = CrawlStats {
start_time: start,
concurrent_requests: self.spider.concurrent_requests(),
concurrent_requests_per_domain: self.spider.concurrent_requests_per_domain(),
download_delay: self.spider.download_delay(),
..Default::default()
};
self.items = ItemList::new();
self.pause_requested = false;
self.force_stop = false;
self.paused = false;
self.last_checkpoint_time = Instant::now();
let mut resuming = false;
if let Some(ref cm) = self.checkpoint_manager {
if let Ok(Some(cp)) = cm.load() {
info!(
urls = cp.request_urls.len(),
seen = cp.seen_fingerprints.len(),
"restoring from checkpoint"
);
for url in &cp.request_urls {
let req = Request::new(url.clone());
self.scheduler.enqueue(req);
}
resuming = true;
}
}
self.spider.on_start(resuming);
if let Some(ref mut rm) = self.robots_manager {
let urls = self.spider.start_urls();
let sid = self
.session_manager
.default_session_id()
.unwrap_or("default")
.to_owned();
rm.prefetch(&urls, &sid, &self.session_manager).await;
}
if !resuming {
let requests = self.spider.start_requests();
for req in requests {
if self.is_domain_allowed(&req) {
self.scheduler.enqueue(req);
} else {
self.stats.offsite_requests_count += 1;
}
}
}
let max_concurrent = self.spider.concurrent_requests();
let delay = self.spider.download_delay();
loop {
if self.pause_requested {
if self.active_tasks == 0 || self.force_stop {
self.save_checkpoint();
self.paused = true;
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
continue;
}
if self.should_checkpoint() {
self.save_checkpoint();
}
if self.scheduler.is_empty() && self.active_tasks == 0 {
break;
}
if self.scheduler.is_empty() {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
continue;
}
if self.active_tasks >= max_concurrent {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
continue;
}
if let Some(request) = self.scheduler.dequeue() {
self.active_tasks += 1;
if delay > 0.0 {
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
}
self.process_request(request).await;
self.active_tasks -= 1;
}
}
self.spider.on_close();
if !self.paused {
if let Some(ref cm) = self.checkpoint_manager {
let _ = cm.cleanup();
}
}
let end = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
self.stats.end_time = end;
info!(
items = self.items.len(),
requests = self.stats.requests_count,
elapsed = format!("{:.2}s", self.stats.elapsed_seconds()),
"crawl complete"
);
Ok(self.stats.clone())
}
/// Handles a single request end to end.
///
/// In order: robots.txt check (when enabled), development-cache lookup
/// (when enabled), fetch through the session manager, stats update,
/// blocked-response handling with re-queueing, cache store, and finally the
/// parse callbacks.
///
/// Blocked responses are never written to the cache: cache hits go straight
/// to the callbacks without a blocked check, so a cached block page would be
/// parsed as a normal response on the retry and on every later run.
async fn process_request(&mut self, request: Request) {
    // Session id used for robots.txt lookups and per-session stats: the
    // request's own id, or the default session when it has none.
    let sid = if request.sid.is_empty() {
        self.session_manager
            .default_session_id()
            .unwrap_or("default")
            .to_owned()
    } else {
        request.sid.clone()
    };

    if let Some(ref mut rm) = self.robots_manager {
        if !rm
            .can_fetch(&request.url, &sid, &self.session_manager)
            .await
        {
            self.stats.robots_disallowed_count += 1;
            debug!(url = %request.url, "disallowed by robots.txt");
            return;
        }
    }

    if let Some(ref cm) = self.cache_manager {
        if let Some(fp) = request.fingerprint() {
            if let Some(cached) = cm.get(fp) {
                self.stats.cache_hits += 1;
                self.run_callbacks(&request, cached).await;
                return;
            }
            self.stats.cache_misses += 1;
        }
    }

    self.stats.increment_requests_count(&sid);
    let response = match self.session_manager.fetch(&request).await {
        Ok(resp) => resp,
        Err(e) => {
            self.stats.failed_requests_count += 1;
            error!(url = %request.url, error = %e, "fetch failed");
            self.spider.on_error(&request, &e);
            return;
        }
    };
    self.stats.increment_status(response.status);
    self.stats
        .increment_response_bytes(&request.domain(), response.body.len() as u64);

    if self.spider.is_blocked(&response) {
        self.stats.blocked_requests_count += 1;
        if request.retry_count < self.spider.max_blocked_retries() {
            let mut retry = request.copy_without_callback();
            retry.retry_count += 1;
            retry.priority -= 1;
            // The URL was already seen, so the retry must bypass filtering.
            retry.dont_filter = true;
            debug!(url = %retry.url, retry = retry.retry_count, "retrying blocked request");
            self.scheduler.enqueue(retry);
        } else {
            warn!(url = %request.url, "max blocked retries exceeded");
        }
        return;
    }

    // Only responses that passed the blocked check are cached; the store is
    // best-effort.
    if let Some(ref cm) = self.cache_manager {
        if let Some(fp) = request.fingerprint() {
            let _ = cm.put(fp, &response, "GET");
        }
    }

    self.run_callbacks(&request, response).await;
}
async fn run_callbacks(&mut self, request: &Request, response: Response) {
let outputs = if let Some(ref callback) = request.callback {
callback(response)
} else {
self.spider.parse(response)
};
for output in outputs {
match output {
SpiderOutput::Item(item) => {
if let Some(processed) = self.spider.on_scraped_item(item) {
self.stats.items_scraped += 1;
if let Some(ref tx) = self.item_sender {
let _ = tx.send(processed.clone());
}
self.items.push(processed);
} else {
self.stats.items_dropped += 1;
}
}
SpiderOutput::FollowRequest(req) => {
if self.is_domain_allowed(&req) {
self.scheduler.enqueue(req);
} else {
self.stats.offsite_requests_count += 1;
}
}
}
}
}
fn should_checkpoint(&self) -> bool {
let Some(ref cm) = self.checkpoint_manager else {
return false;
};
if cm.interval_secs == 0.0 {
return false;
}
self.last_checkpoint_time.elapsed().as_secs_f64() >= cm.interval_secs
}
fn save_checkpoint(&mut self) {
let Some(ref cm) = self.checkpoint_manager else {
return;
};
let (requests, seen) = self.scheduler.snapshot();
let data = CheckpointData {
request_urls: requests.iter().map(|r| r.url.clone()).collect(),
seen_fingerprints: seen.iter().cloned().collect(),
};
if let Err(e) = cm.save(&data) {
error!(error = %e, "failed to save checkpoint");
}
self.last_checkpoint_time = Instant::now();
}
/// Items collected so far by the current (or most recent) crawl.
pub fn items(&self) -> &ItemList {
&self.items
}
/// Statistics of the current (or most recent) crawl.
pub fn stats(&self) -> &CrawlStats {
&self.stats
}
/// Installs an unbounded channel for scraped items and returns its
/// receiving end.
///
/// Every item kept by `Spider::on_scraped_item` is sent through the channel
/// in addition to being stored. Calling this again replaces the previously
/// installed sender.
pub fn stream(&mut self) -> tokio::sync::mpsc::UnboundedReceiver<serde_json::Value> {
    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
    self.item_sender = Some(sender);
    receiver
}
}