scrapling_spider/spider.rs
1//! The `Spider` trait and `CrawlerEngine` -- the heart of scrapling-spider.
2//!
3//! This module contains the two most important types in the crate:
4//!
5//! - **[`Spider`]** -- a trait that defines the *what* of a crawl: which URLs to
6//! start from, how to parse responses, which domains to stay within, and
7//! various tuning knobs (concurrency, download delay, retry limits, etc.).
8//!
9//! - **[`CrawlerEngine`]** -- a struct that implements the *how*. It owns the
10//! scheduler, session manager, robots.txt manager, response cache, and
11//! checkpoint manager, and runs the main crawl loop. You create an engine with
12//! [`CrawlerEngine::new`], then call [`CrawlerEngine::crawl`] to start
13//! fetching.
14//!
15//! ## Crawl loop overview
16//!
17//! 1. The engine checks for a checkpoint on disk; if one exists it restores the
18//! pending queue and seen set from the snapshot.
19//! 2. If not resuming, it calls [`Spider::start_requests`] and enqueues them.
20//! 3. In a loop, it dequeues the highest-priority request, checks domain
21//! restrictions and robots.txt, looks up the dev-mode cache, fetches the URL
22//! via the session manager, handles blocked-response retries, invokes the
23//! request's callback (or the spider's `parse`), collects scraped items, and
24//! enqueues any follow-up requests.
25//! 4. The loop ends when the queue is empty, or when a pause/force-stop signal
26//! is received. On pause, a checkpoint is saved; on normal completion, the
27//! checkpoint file is cleaned up.
28
29use std::collections::HashSet;
30use std::path::PathBuf;
31use std::time::Instant;
32
33use tracing::{debug, error, info, warn};
34
35use scrapling_fetch::Response;
36
37use crate::cache::ResponseCacheManager;
38use crate::checkpoint::{CheckpointData, CheckpointManager};
39use crate::error::{Result, SpiderError};
40use crate::request::{Request, SpiderOutput};
41use crate::result::{CrawlStats, ItemList};
42use crate::robotstxt::RobotsTxtManager;
43use crate::scheduler::Scheduler;
44use crate::session::{Session, SessionManager};
45
/// HTTP status codes that the default [`Spider::is_blocked`] treats as
/// "blocked".
///
/// When a response is flagged as blocked, the engine re-enqueues the request
/// (priority lowered by one, `dont_filter` enabled) as long as the request's
/// retry count is below [`Spider::max_blocked_retries`].
const BLOCKED_CODES: &[u16] = &[401, 403, 407, 429, 444, 500, 502, 503, 504];
52
53/// The Spider trait -- implement this to define a web crawler.
54///
55/// At minimum, implement [`name`](Spider::name), [`start_urls`](Spider::start_urls),
56/// and [`parse`](Spider::parse). Everything else has sensible defaults: one
57/// default HTTP session, concurrency of 4, no download delay, no domain
58/// restrictions, and robots.txt ignored.
59///
60/// Override the `configure_sessions`, `allowed_domains`, `on_scraped_item`, and
61/// other hook methods to customize behavior without touching the engine.
62pub trait Spider {
63 /// Returns the unique name of this spider. Used in log messages and as a
64 /// human-readable identifier. There are no uniqueness constraints enforced
65 /// at runtime, but you should keep names distinct to avoid confusion in logs.
66 fn name(&self) -> &str;
67
68 /// Returns the initial URLs to crawl. These are converted into
69 /// [`Request`] objects by [`start_requests`](Spider::start_requests) and
70 /// enqueued into the scheduler at the beginning of the crawl.
71 fn start_urls(&self) -> Vec<String>;
72
73 /// Parses a response and returns scraped items and/or follow-up requests.
74 ///
75 /// This is the core of your spider: extract data from the response body and
76 /// return [`SpiderOutput::Item`] values, and/or discover new links and
77 /// return [`SpiderOutput::FollowRequest`] values to continue crawling. If a
78 /// request has a per-request callback attached, that callback is called
79 /// instead of this method.
80 fn parse(&self, response: Response) -> Vec<SpiderOutput>;
81
82 /// Returns the set of allowed domains. Requests targeting domains not in
83 /// this set are silently dropped and counted in `offsite_requests_count`.
84 /// An empty set (the default) means all domains are allowed. Include only
85 /// the base domain (e.g., `"example.com"`); subdomains are matched
86 /// automatically.
87 fn allowed_domains(&self) -> HashSet<String> {
88 HashSet::new()
89 }
90 /// Returns the maximum number of concurrent requests the engine will
91 /// dispatch at any given time. Increase this to speed up crawls on sites
92 /// that can handle the load; decrease it to be more polite. The default
93 /// is 4.
94 fn concurrent_requests(&self) -> u32 {
95 4
96 }
97 /// Returns the maximum number of concurrent requests per domain. Set this
98 /// to limit the load on any single host while still allowing high overall
99 /// concurrency across multiple domains. The default is 0, meaning unlimited
100 /// (constrained only by `concurrent_requests`).
101 fn concurrent_requests_per_domain(&self) -> u32 {
102 0
103 }
104 /// Returns the delay in seconds between consecutive requests. The engine
105 /// sleeps for this duration after dispatching each request. Use this to
106 /// throttle your crawl and avoid overloading target servers. The default
107 /// is 0.0 (no delay).
108 fn download_delay(&self) -> f64 {
109 0.0
110 }
111 /// Returns the maximum number of retries for responses flagged as
112 /// "blocked" by [`is_blocked`](Spider::is_blocked). After this many
113 /// retries, the engine gives up and logs a warning. Each retry is enqueued
114 /// with lower priority and `dont_filter` set to `true`. The default is 3.
115 fn max_blocked_retries(&self) -> u32 {
116 3
117 }
118 /// Returns whether the engine should obey robots.txt rules. When `true`,
119 /// the engine fetches and caches each domain's `robots.txt` and skips URLs
120 /// that are disallowed. The default is `false` (robots.txt is ignored).
121 fn robots_txt_obey(&self) -> bool {
122 false
123 }
124 /// Returns whether development mode is enabled. When `true`, the engine
125 /// caches every response to disk and serves cached responses on subsequent
126 /// runs, eliminating network I/O while you iterate on parse logic. Not
127 /// intended for production. The default is `false`.
128 fn development_mode(&self) -> bool {
129 false
130 }
131 /// Returns the directory used for the development response cache. If
132 /// `None` (the default), the engine falls back to `.scrapling_cache` in
133 /// the current working directory. Only relevant when
134 /// [`development_mode`](Spider::development_mode) is `true`.
135 fn development_cache_dir(&self) -> Option<PathBuf> {
136 None
137 }
138 /// Returns whether to include session kwargs (method, body, etc.) in
139 /// request fingerprints. Enable this when your spider makes POST requests
140 /// or passes different parameters to the same URL, so that each unique
141 /// combination is treated as a distinct request. The default is `false`.
142 fn fp_include_kwargs(&self) -> bool {
143 false
144 }
145 /// Returns whether to keep URL fragments (the `#section` part) in request
146 /// fingerprints. By default fragments are stripped, so `page#a` and
147 /// `page#b` are treated as the same URL. Set to `true` if fragment
148 /// differences are meaningful for your target site.
149 fn fp_keep_fragments(&self) -> bool {
150 false
151 }
152 /// Returns whether to include HTTP headers in request fingerprints. Enable
153 /// this when you send different headers to the same URL and want each
154 /// header combination to be treated as a distinct request. The default is
155 /// `false`.
156 fn fp_include_headers(&self) -> bool {
157 false
158 }
159
160 /// Configures the session manager with fetcher sessions for this spider.
161 ///
162 /// The default implementation registers a single stateless `Fetcher` under
163 /// the ID `"default"`. Override this to add authenticated sessions,
164 /// proxy-routed fetchers, or browser-based sessions.
165 fn configure_sessions(&self, manager: &mut SessionManager) {
166 let fetcher = scrapling_fetch::Fetcher::new();
167 let _ = manager.add("default", Session::Fetcher(fetcher), true);
168 }
169
170 /// Builds the initial set of requests from the start URLs. The default
171 /// implementation wraps each URL from [`start_urls`](Spider::start_urls)
172 /// in a plain [`Request`]. Override this if you need custom priorities,
173 /// metadata, or callbacks on your initial requests.
174 fn start_requests(&self) -> Vec<Request> {
175 self.start_urls().into_iter().map(Request::new).collect()
176 }
177
178 /// Called once when the crawl starts. The `resuming` flag is `true` if the
179 /// engine is restoring state from a checkpoint rather than starting fresh.
180 /// Use this hook for one-time setup like initializing external connections
181 /// or logging the crawl start.
182 fn on_start(&self, _resuming: bool) {}
183
184 /// Called once when the crawl finishes or is paused. Use this for teardown
185 /// tasks like flushing buffers or closing database connections.
186 fn on_close(&self) {}
187
188 /// Called when a request fails with an error (network timeout, DNS failure,
189 /// etc.). Override this to implement custom error handling such as logging
190 /// to an external service or adding the URL to a retry list.
191 fn on_error(&self, _request: &Request, _error: &SpiderError) {}
192
193 /// Called for each scraped item before it is added to the
194 /// [`ItemList`](crate::result::ItemList). Return `Some(item)` to keep the
195 /// item (optionally transforming it), or `None` to drop it. Dropped items
196 /// are counted in [`CrawlStats::items_dropped`](crate::result::CrawlStats::items_dropped).
197 fn on_scraped_item(&self, item: serde_json::Value) -> Option<serde_json::Value> {
198 Some(item)
199 }
200
201 /// Returns `true` if the response indicates the request was blocked by the
202 /// server. The default implementation checks the status code against a
203 /// built-in list of codes commonly associated with rate limiting and access
204 /// denial (401, 403, 407, 429, 444, 500, 502, 503, 504). Override this to
205 /// add site-specific detection (e.g., checking for CAPTCHA pages in the
206 /// response body).
207 fn is_blocked(&self, response: &Response) -> bool {
208 BLOCKED_CODES.contains(&response.status)
209 }
210}
211
/// The crawler engine -- orchestrates the entire crawl loop.
///
/// `CrawlerEngine` is the runtime counterpart to the [`Spider`] trait. It owns
/// all the infrastructure (scheduler, sessions, cache, checkpoint, robots.txt)
/// and drives the fetch-parse-enqueue cycle. Create one with [`new`](CrawlerEngine::new),
/// then call [`crawl`](CrawlerEngine::crawl) to start processing.
///
/// The engine supports graceful pause via [`request_pause`](CrawlerEngine::request_pause):
/// the first call initiates a graceful wind-down (waiting for in-flight
/// requests to finish), and a second call triggers an immediate force stop.
pub struct CrawlerEngine<'a> {
    /// Spider whose settings and hooks drive the crawl.
    spider: &'a dyn Spider,
    /// Sessions registered by `Spider::configure_sessions` during `new`.
    session_manager: SessionManager,
    /// Queue of pending requests.
    scheduler: Scheduler,
    /// Statistics for the current run; replaced at the start of every `crawl`.
    stats: CrawlStats,
    /// Present only when `Spider::robots_txt_obey` returned `true` at
    /// construction.
    robots_manager: Option<RobotsTxtManager>,
    /// Present only when `Spider::development_mode` returned `true` at
    /// construction.
    cache_manager: Option<ResponseCacheManager>,
    /// Present only when a `crawldir` was passed to `new`.
    checkpoint_manager: Option<CheckpointManager>,
    /// Items accepted by `Spider::on_scraped_item`; replaced at the start of
    /// every `crawl`.
    items: ItemList,
    /// Copy of `Spider::allowed_domains` taken at construction; empty means
    /// every domain is allowed.
    allowed_domains: HashSet<String>,
    /// Number of requests currently being processed.
    active_tasks: u32,
    /// Set by the first `request_pause` call; cleared when `crawl` starts.
    pause_requested: bool,
    /// Set by a second `request_pause` call; cleared when `crawl` starts.
    force_stop: bool,
    /// Whether the crawl is currently in a paused state.
    pub paused: bool,
    /// When the last checkpoint save was attempted (or when `crawl` started).
    last_checkpoint_time: Instant,
    /// Sending half of the channel created by `stream`, if any.
    item_sender: Option<tokio::sync::mpsc::UnboundedSender<serde_json::Value>>,
}
240
241impl<'a> CrawlerEngine<'a> {
242 /// Creates a new crawler engine for the given spider with optional
243 /// checkpoint support.
244 ///
245 /// Pass a `crawldir` path to enable pause/resume checkpointing, or `None`
246 /// to disable it. `interval_secs` controls how often auto-checkpoints are
247 /// saved during the crawl (0.0 disables periodic saves). The spider's
248 /// `configure_sessions` method is called immediately to populate the
249 /// session manager; an error is returned if no sessions are registered.
250 pub fn new(
251 spider: &'a dyn Spider,
252 crawldir: Option<PathBuf>,
253 interval_secs: f64,
254 ) -> Result<Self> {
255 let mut session_manager = SessionManager::new();
256 spider.configure_sessions(&mut session_manager);
257
258 if session_manager.is_empty() {
259 return Err(SpiderError::Session("no sessions configured".into()));
260 }
261
262 let scheduler = Scheduler::new(
263 spider.fp_include_kwargs(),
264 spider.fp_include_headers(),
265 spider.fp_keep_fragments(),
266 );
267
268 let robots_manager = if spider.robots_txt_obey() {
269 Some(RobotsTxtManager::new())
270 } else {
271 None
272 };
273
274 let cache_manager = if spider.development_mode() {
275 let dir = spider
276 .development_cache_dir()
277 .unwrap_or_else(|| PathBuf::from(".scrapling_cache"));
278 Some(ResponseCacheManager::new(dir))
279 } else {
280 None
281 };
282
283 let checkpoint_manager = crawldir
284 .map(|dir| CheckpointManager::new(dir, interval_secs))
285 .transpose()?;
286
287 Ok(Self {
288 spider,
289 session_manager,
290 scheduler,
291 stats: CrawlStats::default(),
292 robots_manager,
293 cache_manager,
294 checkpoint_manager,
295 items: ItemList::new(),
296 allowed_domains: spider.allowed_domains(),
297 active_tasks: 0,
298 pause_requested: false,
299 force_stop: false,
300 paused: false,
301 last_checkpoint_time: Instant::now(),
302 item_sender: None,
303 })
304 }
305
306 fn is_domain_allowed(&self, request: &Request) -> bool {
307 if self.allowed_domains.is_empty() {
308 return true;
309 }
310 let domain = request.domain();
311 self.allowed_domains
312 .iter()
313 .any(|allowed| domain == *allowed || domain.ends_with(&format!(".{allowed}")))
314 }
315
316 /// Requests a graceful pause of the crawl. On the first call, the engine
317 /// waits for all in-flight requests to finish before saving a checkpoint
318 /// and exiting the loop. Calling this a second time triggers a force stop
319 /// that abandons in-flight requests immediately.
320 pub fn request_pause(&mut self) {
321 if self.pause_requested {
322 self.force_stop = true;
323 warn!("force stop requested");
324 } else {
325 self.pause_requested = true;
326 info!("graceful pause requested");
327 }
328 }
329
330 /// Runs the main crawl loop and returns aggregate statistics when finished.
331 ///
332 /// This is the primary entry point for executing a crawl. The method blocks
333 /// (asynchronously) until the scheduler is empty and all tasks are done, or
334 /// until a pause/force-stop is requested. On success it returns the final
335 /// [`CrawlStats`]; check `self.paused` to determine whether the crawl
336 /// completed or was interrupted.
337 pub async fn crawl(&mut self) -> Result<CrawlStats> {
338 let start = std::time::SystemTime::now()
339 .duration_since(std::time::UNIX_EPOCH)
340 .unwrap()
341 .as_secs_f64();
342
343 self.stats = CrawlStats {
344 start_time: start,
345 concurrent_requests: self.spider.concurrent_requests(),
346 concurrent_requests_per_domain: self.spider.concurrent_requests_per_domain(),
347 download_delay: self.spider.download_delay(),
348 ..Default::default()
349 };
350 self.items = ItemList::new();
351 self.pause_requested = false;
352 self.force_stop = false;
353 self.paused = false;
354 self.last_checkpoint_time = Instant::now();
355
356 // Attempt checkpoint restore
357 let mut resuming = false;
358 if let Some(ref cm) = self.checkpoint_manager {
359 if let Ok(Some(cp)) = cm.load() {
360 info!(
361 urls = cp.request_urls.len(),
362 seen = cp.seen_fingerprints.len(),
363 "restoring from checkpoint"
364 );
365 for url in &cp.request_urls {
366 let req = Request::new(url.clone());
367 self.scheduler.enqueue(req);
368 }
369 resuming = true;
370 }
371 }
372
373 self.spider.on_start(resuming);
374
375 // Prefetch robots.txt
376 if let Some(ref mut rm) = self.robots_manager {
377 let urls = self.spider.start_urls();
378 let sid = self
379 .session_manager
380 .default_session_id()
381 .unwrap_or("default")
382 .to_owned();
383 rm.prefetch(&urls, &sid, &self.session_manager).await;
384 }
385
386 // Enqueue start requests
387 if !resuming {
388 let requests = self.spider.start_requests();
389 for req in requests {
390 if self.is_domain_allowed(&req) {
391 self.scheduler.enqueue(req);
392 } else {
393 self.stats.offsite_requests_count += 1;
394 }
395 }
396 }
397
398 // Main crawl loop
399 let max_concurrent = self.spider.concurrent_requests();
400 let delay = self.spider.download_delay();
401
402 loop {
403 if self.pause_requested {
404 if self.active_tasks == 0 || self.force_stop {
405 self.save_checkpoint();
406 self.paused = true;
407 break;
408 }
409 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
410 continue;
411 }
412
413 // Periodic checkpoint
414 if self.should_checkpoint() {
415 self.save_checkpoint();
416 }
417
418 if self.scheduler.is_empty() && self.active_tasks == 0 {
419 break;
420 }
421
422 if self.scheduler.is_empty() {
423 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
424 continue;
425 }
426
427 if self.active_tasks >= max_concurrent {
428 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
429 continue;
430 }
431
432 if let Some(request) = self.scheduler.dequeue() {
433 self.active_tasks += 1;
434
435 if delay > 0.0 {
436 tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
437 }
438
439 self.process_request(request).await;
440 self.active_tasks -= 1;
441 }
442 }
443
444 self.spider.on_close();
445
446 if !self.paused {
447 if let Some(ref cm) = self.checkpoint_manager {
448 let _ = cm.cleanup();
449 }
450 }
451
452 let end = std::time::SystemTime::now()
453 .duration_since(std::time::UNIX_EPOCH)
454 .unwrap()
455 .as_secs_f64();
456 self.stats.end_time = end;
457
458 info!(
459 items = self.items.len(),
460 requests = self.stats.requests_count,
461 elapsed = format!("{:.2}s", self.stats.elapsed_seconds()),
462 "crawl complete"
463 );
464
465 Ok(self.stats.clone())
466 }
467
468 async fn process_request(&mut self, request: Request) {
469 // Robots.txt check
470 if let Some(ref mut rm) = self.robots_manager {
471 let sid = if request.sid.is_empty() {
472 self.session_manager
473 .default_session_id()
474 .unwrap_or("default")
475 .to_owned()
476 } else {
477 request.sid.clone()
478 };
479 if !rm
480 .can_fetch(&request.url, &sid, &self.session_manager)
481 .await
482 {
483 self.stats.robots_disallowed_count += 1;
484 debug!(url = %request.url, "disallowed by robots.txt");
485 return;
486 }
487 }
488
489 // Cache check
490 if let Some(ref cm) = self.cache_manager {
491 if let Some(fp) = request.fingerprint() {
492 if let Some(cached) = cm.get(fp) {
493 self.stats.cache_hits += 1;
494 self.run_callbacks(&request, cached).await;
495 return;
496 }
497 self.stats.cache_misses += 1;
498 }
499 }
500
501 // Fetch
502 let sid = if request.sid.is_empty() {
503 self.session_manager
504 .default_session_id()
505 .unwrap_or("default")
506 .to_owned()
507 } else {
508 request.sid.clone()
509 };
510
511 self.stats.increment_requests_count(&sid);
512
513 let response = match self.session_manager.fetch(&request).await {
514 Ok(resp) => resp,
515 Err(e) => {
516 self.stats.failed_requests_count += 1;
517 error!(url = %request.url, error = %e, "fetch failed");
518 self.spider.on_error(&request, &e);
519 return;
520 }
521 };
522
523 self.stats.increment_status(response.status);
524 self.stats
525 .increment_response_bytes(&request.domain(), response.body.len() as u64);
526
527 // Cache response
528 if let Some(ref cm) = self.cache_manager {
529 if let Some(fp) = request.fingerprint() {
530 let _ = cm.put(fp, &response, "GET");
531 }
532 }
533
534 // Blocked check
535 if self.spider.is_blocked(&response) {
536 self.stats.blocked_requests_count += 1;
537 if request.retry_count < self.spider.max_blocked_retries() {
538 let mut retry = request.copy_without_callback();
539 retry.retry_count += 1;
540 retry.priority -= 1;
541 retry.dont_filter = true;
542 debug!(url = %retry.url, retry = retry.retry_count, "retrying blocked request");
543 self.scheduler.enqueue(retry);
544 } else {
545 warn!(url = %request.url, "max blocked retries exceeded");
546 }
547 return;
548 }
549
550 self.run_callbacks(&request, response).await;
551 }
552
553 async fn run_callbacks(&mut self, request: &Request, response: Response) {
554 let outputs = if let Some(ref callback) = request.callback {
555 callback(response)
556 } else {
557 self.spider.parse(response)
558 };
559
560 for output in outputs {
561 match output {
562 SpiderOutput::Item(item) => {
563 if let Some(processed) = self.spider.on_scraped_item(item) {
564 self.stats.items_scraped += 1;
565 if let Some(ref tx) = self.item_sender {
566 let _ = tx.send(processed.clone());
567 }
568 self.items.push(processed);
569 } else {
570 self.stats.items_dropped += 1;
571 }
572 }
573 SpiderOutput::FollowRequest(req) => {
574 if self.is_domain_allowed(&req) {
575 self.scheduler.enqueue(req);
576 } else {
577 self.stats.offsite_requests_count += 1;
578 }
579 }
580 }
581 }
582 }
583
584 fn should_checkpoint(&self) -> bool {
585 let Some(ref cm) = self.checkpoint_manager else {
586 return false;
587 };
588 if cm.interval_secs == 0.0 {
589 return false;
590 }
591 self.last_checkpoint_time.elapsed().as_secs_f64() >= cm.interval_secs
592 }
593
594 fn save_checkpoint(&mut self) {
595 let Some(ref cm) = self.checkpoint_manager else {
596 return;
597 };
598
599 let (requests, seen) = self.scheduler.snapshot();
600 let data = CheckpointData {
601 request_urls: requests.iter().map(|r| r.url.clone()).collect(),
602 seen_fingerprints: seen.iter().cloned().collect(),
603 };
604
605 if let Err(e) = cm.save(&data) {
606 error!(error = %e, "failed to save checkpoint");
607 }
608 self.last_checkpoint_time = Instant::now();
609 }
610
    /// Returns a reference to the items collected by the most recent crawl.
    /// The list is replaced with an empty one each time
    /// [`crawl`](CrawlerEngine::crawl) starts.
    pub fn items(&self) -> &ItemList {
        &self.items
    }
616
    /// Returns a reference to the statistics of the most recent crawl. The
    /// statistics are re-initialized each time
    /// [`crawl`](CrawlerEngine::crawl) starts; `crawl` also returns a clone
    /// of them when it finishes.
    pub fn stats(&self) -> &CrawlStats {
        &self.stats
    }
623
624 /// Creates a streaming receiver that yields items as they are scraped.
625 ///
626 /// Call this before [`crawl()`](CrawlerEngine::crawl) to get an unbounded
627 /// receiver. Each item passes through `on_scraped_item()` and is sent
628 /// to both the receiver and the internal `ItemList`.
629 ///
630 /// This is the Rust equivalent of Python's `async for item in spider.stream()`.
631 ///
632 /// # Example
633 ///
634 /// ```rust,ignore
635 /// let mut engine = CrawlerEngine::new(&spider, None, 0.0)?;
636 /// let mut rx = engine.stream();
637 ///
638 /// // Spawn the crawl in the background
639 /// let crawl_handle = tokio::spawn(async move {
640 /// engine.crawl().await
641 /// });
642 ///
643 /// // Process items as they arrive
644 /// while let Some(item) = rx.recv().await {
645 /// println!("Got item: {}", item);
646 /// }
647 ///
648 /// let stats = crawl_handle.await??;
649 /// ```
650 pub fn stream(&mut self) -> tokio::sync::mpsc::UnboundedReceiver<serde_json::Value> {
651 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
652 self.item_sender = Some(tx);
653 rx
654 }
655}