Skip to main content

scrapling_spider/
spider.rs

//! The `Spider` trait and `CrawlerEngine` -- the heart of scrapling-spider.
//!
//! This module contains the two most important types in the crate:
//!
//! - **[`Spider`]** -- a trait that defines the *what* of a crawl: which URLs to
//!   start from, how to parse responses, which domains to stay within, and
//!   various tuning knobs (concurrency, download delay, retry limits, etc.).
//!
//! - **[`CrawlerEngine`]** -- a struct that implements the *how*. It owns the
//!   scheduler, session manager, robots.txt manager, response cache, and
//!   checkpoint manager, and runs the main crawl loop. You create an engine with
//!   [`CrawlerEngine::new`], then call [`CrawlerEngine::crawl`] to start
//!   fetching.
//!
//! ## Crawl loop overview
//!
//! 1. The engine checks for a checkpoint on disk; if one exists, it re-enqueues
//!    the pending request URLs recorded in the snapshot as plain requests. (The
//!    saved seen-fingerprint set is currently only reported in the log; it is
//!    not restored into the scheduler.)
//! 2. If not resuming, it calls [`Spider::start_requests`] and enqueues every
//!    request whose domain passes [`Spider::allowed_domains`].
//! 3. In a loop, it dequeues the highest-priority request, checks robots.txt
//!    (when enabled), looks up the dev-mode cache (when enabled), fetches the
//!    URL via the session manager, handles blocked-response retries, invokes
//!    the request's callback (or the spider's `parse`), collects scraped items,
//!    and enqueues any follow-up requests that pass the domain restrictions.
//! 4. The loop ends when the queue is empty and no request is in flight, or
//!    when a pause/force-stop signal is received. On pause, a checkpoint is
//!    saved; on normal completion, the checkpoint file is cleaned up.
28
29use std::collections::HashSet;
30use std::path::PathBuf;
31use std::time::Instant;
32
33use tracing::{debug, error, info, warn};
34
35use scrapling_fetch::Response;
36
37use crate::cache::ResponseCacheManager;
38use crate::checkpoint::{CheckpointData, CheckpointManager};
39use crate::error::{Result, SpiderError};
40use crate::request::{Request, SpiderOutput};
41use crate::result::{CrawlStats, ItemList};
42use crate::robotstxt::RobotsTxtManager;
43use crate::scheduler::Scheduler;
44use crate::session::{Session, SessionManager};
45
/// HTTP status codes that the default [`Spider::is_blocked`] treats as a
/// "blocked" response.
///
/// A response flagged as blocked is not parsed. Instead the engine re-enqueues
/// the request (one priority step lower, with `dont_filter` enabled) until
/// [`Spider::max_blocked_retries`] is reached. Spiders that override
/// `is_blocked` may ignore this list entirely.
const BLOCKED_CODES: &[u16] = &[
    401, // Unauthorized
    403, // Forbidden
    407, // Proxy Authentication Required
    429, // Too Many Requests
    444, // No Response (nginx closes the connection)
    500, // Internal Server Error
    502, // Bad Gateway
    503, // Service Unavailable
    504, // Gateway Timeout
];
52
53/// The Spider trait -- implement this to define a web crawler.
54///
55/// At minimum, implement [`name`](Spider::name), [`start_urls`](Spider::start_urls),
56/// and [`parse`](Spider::parse). Everything else has sensible defaults: one
57/// default HTTP session, concurrency of 4, no download delay, no domain
58/// restrictions, and robots.txt ignored.
59///
60/// Override the `configure_sessions`, `allowed_domains`, `on_scraped_item`, and
61/// other hook methods to customize behavior without touching the engine.
62pub trait Spider {
63    /// Returns the unique name of this spider. Used in log messages and as a
64    /// human-readable identifier. There are no uniqueness constraints enforced
65    /// at runtime, but you should keep names distinct to avoid confusion in logs.
66    fn name(&self) -> &str;
67
68    /// Returns the initial URLs to crawl. These are converted into
69    /// [`Request`] objects by [`start_requests`](Spider::start_requests) and
70    /// enqueued into the scheduler at the beginning of the crawl.
71    fn start_urls(&self) -> Vec<String>;
72
73    /// Parses a response and returns scraped items and/or follow-up requests.
74    ///
75    /// This is the core of your spider: extract data from the response body and
76    /// return [`SpiderOutput::Item`] values, and/or discover new links and
77    /// return [`SpiderOutput::FollowRequest`] values to continue crawling. If a
78    /// request has a per-request callback attached, that callback is called
79    /// instead of this method.
80    fn parse(&self, response: Response) -> Vec<SpiderOutput>;
81
82    /// Returns the set of allowed domains. Requests targeting domains not in
83    /// this set are silently dropped and counted in `offsite_requests_count`.
84    /// An empty set (the default) means all domains are allowed. Include only
85    /// the base domain (e.g., `"example.com"`); subdomains are matched
86    /// automatically.
87    fn allowed_domains(&self) -> HashSet<String> {
88        HashSet::new()
89    }
90    /// Returns the maximum number of concurrent requests the engine will
91    /// dispatch at any given time. Increase this to speed up crawls on sites
92    /// that can handle the load; decrease it to be more polite. The default
93    /// is 4.
94    fn concurrent_requests(&self) -> u32 {
95        4
96    }
97    /// Returns the maximum number of concurrent requests per domain. Set this
98    /// to limit the load on any single host while still allowing high overall
99    /// concurrency across multiple domains. The default is 0, meaning unlimited
100    /// (constrained only by `concurrent_requests`).
101    fn concurrent_requests_per_domain(&self) -> u32 {
102        0
103    }
104    /// Returns the delay in seconds between consecutive requests. The engine
105    /// sleeps for this duration after dispatching each request. Use this to
106    /// throttle your crawl and avoid overloading target servers. The default
107    /// is 0.0 (no delay).
108    fn download_delay(&self) -> f64 {
109        0.0
110    }
111    /// Returns the maximum number of retries for responses flagged as
112    /// "blocked" by [`is_blocked`](Spider::is_blocked). After this many
113    /// retries, the engine gives up and logs a warning. Each retry is enqueued
114    /// with lower priority and `dont_filter` set to `true`. The default is 3.
115    fn max_blocked_retries(&self) -> u32 {
116        3
117    }
118    /// Returns whether the engine should obey robots.txt rules. When `true`,
119    /// the engine fetches and caches each domain's `robots.txt` and skips URLs
120    /// that are disallowed. The default is `false` (robots.txt is ignored).
121    fn robots_txt_obey(&self) -> bool {
122        false
123    }
124    /// Returns whether development mode is enabled. When `true`, the engine
125    /// caches every response to disk and serves cached responses on subsequent
126    /// runs, eliminating network I/O while you iterate on parse logic. Not
127    /// intended for production. The default is `false`.
128    fn development_mode(&self) -> bool {
129        false
130    }
131    /// Returns the directory used for the development response cache. If
132    /// `None` (the default), the engine falls back to `.scrapling_cache` in
133    /// the current working directory. Only relevant when
134    /// [`development_mode`](Spider::development_mode) is `true`.
135    fn development_cache_dir(&self) -> Option<PathBuf> {
136        None
137    }
138    /// Returns whether to include session kwargs (method, body, etc.) in
139    /// request fingerprints. Enable this when your spider makes POST requests
140    /// or passes different parameters to the same URL, so that each unique
141    /// combination is treated as a distinct request. The default is `false`.
142    fn fp_include_kwargs(&self) -> bool {
143        false
144    }
145    /// Returns whether to keep URL fragments (the `#section` part) in request
146    /// fingerprints. By default fragments are stripped, so `page#a` and
147    /// `page#b` are treated as the same URL. Set to `true` if fragment
148    /// differences are meaningful for your target site.
149    fn fp_keep_fragments(&self) -> bool {
150        false
151    }
152    /// Returns whether to include HTTP headers in request fingerprints. Enable
153    /// this when you send different headers to the same URL and want each
154    /// header combination to be treated as a distinct request. The default is
155    /// `false`.
156    fn fp_include_headers(&self) -> bool {
157        false
158    }
159
160    /// Configures the session manager with fetcher sessions for this spider.
161    ///
162    /// The default implementation registers a single stateless `Fetcher` under
163    /// the ID `"default"`. Override this to add authenticated sessions,
164    /// proxy-routed fetchers, or browser-based sessions.
165    fn configure_sessions(&self, manager: &mut SessionManager) {
166        let fetcher = scrapling_fetch::Fetcher::new();
167        let _ = manager.add("default", Session::Fetcher(fetcher), true);
168    }
169
170    /// Builds the initial set of requests from the start URLs. The default
171    /// implementation wraps each URL from [`start_urls`](Spider::start_urls)
172    /// in a plain [`Request`]. Override this if you need custom priorities,
173    /// metadata, or callbacks on your initial requests.
174    fn start_requests(&self) -> Vec<Request> {
175        self.start_urls().into_iter().map(Request::new).collect()
176    }
177
178    /// Called once when the crawl starts. The `resuming` flag is `true` if the
179    /// engine is restoring state from a checkpoint rather than starting fresh.
180    /// Use this hook for one-time setup like initializing external connections
181    /// or logging the crawl start.
182    fn on_start(&self, _resuming: bool) {}
183
184    /// Called once when the crawl finishes or is paused. Use this for teardown
185    /// tasks like flushing buffers or closing database connections.
186    fn on_close(&self) {}
187
188    /// Called when a request fails with an error (network timeout, DNS failure,
189    /// etc.). Override this to implement custom error handling such as logging
190    /// to an external service or adding the URL to a retry list.
191    fn on_error(&self, _request: &Request, _error: &SpiderError) {}
192
193    /// Called for each scraped item before it is added to the
194    /// [`ItemList`](crate::result::ItemList). Return `Some(item)` to keep the
195    /// item (optionally transforming it), or `None` to drop it. Dropped items
196    /// are counted in [`CrawlStats::items_dropped`](crate::result::CrawlStats::items_dropped).
197    fn on_scraped_item(&self, item: serde_json::Value) -> Option<serde_json::Value> {
198        Some(item)
199    }
200
201    /// Returns `true` if the response indicates the request was blocked by the
202    /// server. The default implementation checks the status code against a
203    /// built-in list of codes commonly associated with rate limiting and access
204    /// denial (401, 403, 407, 429, 444, 500, 502, 503, 504). Override this to
205    /// add site-specific detection (e.g., checking for CAPTCHA pages in the
206    /// response body).
207    fn is_blocked(&self, response: &Response) -> bool {
208        BLOCKED_CODES.contains(&response.status)
209    }
210}
211
212/// The crawler engine -- orchestrates the entire crawl loop.
213///
214/// `CrawlerEngine` is the runtime counterpart to the [`Spider`] trait. It owns
215/// all the infrastructure (scheduler, sessions, cache, checkpoint, robots.txt)
216/// and drives the fetch-parse-enqueue cycle. Create one with [`new`](CrawlerEngine::new),
217/// then call [`crawl`](CrawlerEngine::crawl) to start processing.
218///
219/// The engine supports graceful pause via [`request_pause`](CrawlerEngine::request_pause):
220/// the first call initiates a graceful wind-down (waiting for in-flight
221/// requests to finish), and a second call triggers an immediate force stop.
222pub struct CrawlerEngine<'a> {
223    spider: &'a dyn Spider,
224    session_manager: SessionManager,
225    scheduler: Scheduler,
226    stats: CrawlStats,
227    robots_manager: Option<RobotsTxtManager>,
228    cache_manager: Option<ResponseCacheManager>,
229    checkpoint_manager: Option<CheckpointManager>,
230    items: ItemList,
231    allowed_domains: HashSet<String>,
232    active_tasks: u32,
233    pause_requested: bool,
234    force_stop: bool,
235    /// Whether the crawl is currently in a paused state.
236    pub paused: bool,
237    last_checkpoint_time: Instant,
238    item_sender: Option<tokio::sync::mpsc::UnboundedSender<serde_json::Value>>,
239}
240
241impl<'a> CrawlerEngine<'a> {
242    /// Creates a new crawler engine for the given spider with optional
243    /// checkpoint support.
244    ///
245    /// Pass a `crawldir` path to enable pause/resume checkpointing, or `None`
246    /// to disable it. `interval_secs` controls how often auto-checkpoints are
247    /// saved during the crawl (0.0 disables periodic saves). The spider's
248    /// `configure_sessions` method is called immediately to populate the
249    /// session manager; an error is returned if no sessions are registered.
250    pub fn new(
251        spider: &'a dyn Spider,
252        crawldir: Option<PathBuf>,
253        interval_secs: f64,
254    ) -> Result<Self> {
255        let mut session_manager = SessionManager::new();
256        spider.configure_sessions(&mut session_manager);
257
258        if session_manager.is_empty() {
259            return Err(SpiderError::Session("no sessions configured".into()));
260        }
261
262        let scheduler = Scheduler::new(
263            spider.fp_include_kwargs(),
264            spider.fp_include_headers(),
265            spider.fp_keep_fragments(),
266        );
267
268        let robots_manager = if spider.robots_txt_obey() {
269            Some(RobotsTxtManager::new())
270        } else {
271            None
272        };
273
274        let cache_manager = if spider.development_mode() {
275            let dir = spider
276                .development_cache_dir()
277                .unwrap_or_else(|| PathBuf::from(".scrapling_cache"));
278            Some(ResponseCacheManager::new(dir))
279        } else {
280            None
281        };
282
283        let checkpoint_manager = crawldir
284            .map(|dir| CheckpointManager::new(dir, interval_secs))
285            .transpose()?;
286
287        Ok(Self {
288            spider,
289            session_manager,
290            scheduler,
291            stats: CrawlStats::default(),
292            robots_manager,
293            cache_manager,
294            checkpoint_manager,
295            items: ItemList::new(),
296            allowed_domains: spider.allowed_domains(),
297            active_tasks: 0,
298            pause_requested: false,
299            force_stop: false,
300            paused: false,
301            last_checkpoint_time: Instant::now(),
302            item_sender: None,
303        })
304    }
305
306    fn is_domain_allowed(&self, request: &Request) -> bool {
307        if self.allowed_domains.is_empty() {
308            return true;
309        }
310        let domain = request.domain();
311        self.allowed_domains
312            .iter()
313            .any(|allowed| domain == *allowed || domain.ends_with(&format!(".{allowed}")))
314    }
315
316    /// Requests a graceful pause of the crawl. On the first call, the engine
317    /// waits for all in-flight requests to finish before saving a checkpoint
318    /// and exiting the loop. Calling this a second time triggers a force stop
319    /// that abandons in-flight requests immediately.
320    pub fn request_pause(&mut self) {
321        if self.pause_requested {
322            self.force_stop = true;
323            warn!("force stop requested");
324        } else {
325            self.pause_requested = true;
326            info!("graceful pause requested");
327        }
328    }
329
330    /// Runs the main crawl loop and returns aggregate statistics when finished.
331    ///
332    /// This is the primary entry point for executing a crawl. The method blocks
333    /// (asynchronously) until the scheduler is empty and all tasks are done, or
334    /// until a pause/force-stop is requested. On success it returns the final
335    /// [`CrawlStats`]; check `self.paused` to determine whether the crawl
336    /// completed or was interrupted.
337    pub async fn crawl(&mut self) -> Result<CrawlStats> {
338        let start = std::time::SystemTime::now()
339            .duration_since(std::time::UNIX_EPOCH)
340            .unwrap()
341            .as_secs_f64();
342
343        self.stats = CrawlStats {
344            start_time: start,
345            concurrent_requests: self.spider.concurrent_requests(),
346            concurrent_requests_per_domain: self.spider.concurrent_requests_per_domain(),
347            download_delay: self.spider.download_delay(),
348            ..Default::default()
349        };
350        self.items = ItemList::new();
351        self.pause_requested = false;
352        self.force_stop = false;
353        self.paused = false;
354        self.last_checkpoint_time = Instant::now();
355
356        // Attempt checkpoint restore
357        let mut resuming = false;
358        if let Some(ref cm) = self.checkpoint_manager {
359            if let Ok(Some(cp)) = cm.load() {
360                info!(
361                    urls = cp.request_urls.len(),
362                    seen = cp.seen_fingerprints.len(),
363                    "restoring from checkpoint"
364                );
365                for url in &cp.request_urls {
366                    let req = Request::new(url.clone());
367                    self.scheduler.enqueue(req);
368                }
369                resuming = true;
370            }
371        }
372
373        self.spider.on_start(resuming);
374
375        // Prefetch robots.txt
376        if let Some(ref mut rm) = self.robots_manager {
377            let urls = self.spider.start_urls();
378            let sid = self
379                .session_manager
380                .default_session_id()
381                .unwrap_or("default")
382                .to_owned();
383            rm.prefetch(&urls, &sid, &self.session_manager).await;
384        }
385
386        // Enqueue start requests
387        if !resuming {
388            let requests = self.spider.start_requests();
389            for req in requests {
390                if self.is_domain_allowed(&req) {
391                    self.scheduler.enqueue(req);
392                } else {
393                    self.stats.offsite_requests_count += 1;
394                }
395            }
396        }
397
398        // Main crawl loop
399        let max_concurrent = self.spider.concurrent_requests();
400        let delay = self.spider.download_delay();
401
402        loop {
403            if self.pause_requested {
404                if self.active_tasks == 0 || self.force_stop {
405                    self.save_checkpoint();
406                    self.paused = true;
407                    break;
408                }
409                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
410                continue;
411            }
412
413            // Periodic checkpoint
414            if self.should_checkpoint() {
415                self.save_checkpoint();
416            }
417
418            if self.scheduler.is_empty() && self.active_tasks == 0 {
419                break;
420            }
421
422            if self.scheduler.is_empty() {
423                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
424                continue;
425            }
426
427            if self.active_tasks >= max_concurrent {
428                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
429                continue;
430            }
431
432            if let Some(request) = self.scheduler.dequeue() {
433                self.active_tasks += 1;
434
435                if delay > 0.0 {
436                    tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
437                }
438
439                self.process_request(request).await;
440                self.active_tasks -= 1;
441            }
442        }
443
444        self.spider.on_close();
445
446        if !self.paused {
447            if let Some(ref cm) = self.checkpoint_manager {
448                let _ = cm.cleanup();
449            }
450        }
451
452        let end = std::time::SystemTime::now()
453            .duration_since(std::time::UNIX_EPOCH)
454            .unwrap()
455            .as_secs_f64();
456        self.stats.end_time = end;
457
458        info!(
459            items = self.items.len(),
460            requests = self.stats.requests_count,
461            elapsed = format!("{:.2}s", self.stats.elapsed_seconds()),
462            "crawl complete"
463        );
464
465        Ok(self.stats.clone())
466    }
467
468    async fn process_request(&mut self, request: Request) {
469        // Robots.txt check
470        if let Some(ref mut rm) = self.robots_manager {
471            let sid = if request.sid.is_empty() {
472                self.session_manager
473                    .default_session_id()
474                    .unwrap_or("default")
475                    .to_owned()
476            } else {
477                request.sid.clone()
478            };
479            if !rm
480                .can_fetch(&request.url, &sid, &self.session_manager)
481                .await
482            {
483                self.stats.robots_disallowed_count += 1;
484                debug!(url = %request.url, "disallowed by robots.txt");
485                return;
486            }
487        }
488
489        // Cache check
490        if let Some(ref cm) = self.cache_manager {
491            if let Some(fp) = request.fingerprint() {
492                if let Some(cached) = cm.get(fp) {
493                    self.stats.cache_hits += 1;
494                    self.run_callbacks(&request, cached).await;
495                    return;
496                }
497                self.stats.cache_misses += 1;
498            }
499        }
500
501        // Fetch
502        let sid = if request.sid.is_empty() {
503            self.session_manager
504                .default_session_id()
505                .unwrap_or("default")
506                .to_owned()
507        } else {
508            request.sid.clone()
509        };
510
511        self.stats.increment_requests_count(&sid);
512
513        let response = match self.session_manager.fetch(&request).await {
514            Ok(resp) => resp,
515            Err(e) => {
516                self.stats.failed_requests_count += 1;
517                error!(url = %request.url, error = %e, "fetch failed");
518                self.spider.on_error(&request, &e);
519                return;
520            }
521        };
522
523        self.stats.increment_status(response.status);
524        self.stats
525            .increment_response_bytes(&request.domain(), response.body.len() as u64);
526
527        // Cache response
528        if let Some(ref cm) = self.cache_manager {
529            if let Some(fp) = request.fingerprint() {
530                let _ = cm.put(fp, &response, "GET");
531            }
532        }
533
534        // Blocked check
535        if self.spider.is_blocked(&response) {
536            self.stats.blocked_requests_count += 1;
537            if request.retry_count < self.spider.max_blocked_retries() {
538                let mut retry = request.copy_without_callback();
539                retry.retry_count += 1;
540                retry.priority -= 1;
541                retry.dont_filter = true;
542                debug!(url = %retry.url, retry = retry.retry_count, "retrying blocked request");
543                self.scheduler.enqueue(retry);
544            } else {
545                warn!(url = %request.url, "max blocked retries exceeded");
546            }
547            return;
548        }
549
550        self.run_callbacks(&request, response).await;
551    }
552
553    async fn run_callbacks(&mut self, request: &Request, response: Response) {
554        let outputs = if let Some(ref callback) = request.callback {
555            callback(response)
556        } else {
557            self.spider.parse(response)
558        };
559
560        for output in outputs {
561            match output {
562                SpiderOutput::Item(item) => {
563                    if let Some(processed) = self.spider.on_scraped_item(item) {
564                        self.stats.items_scraped += 1;
565                        if let Some(ref tx) = self.item_sender {
566                            let _ = tx.send(processed.clone());
567                        }
568                        self.items.push(processed);
569                    } else {
570                        self.stats.items_dropped += 1;
571                    }
572                }
573                SpiderOutput::FollowRequest(req) => {
574                    if self.is_domain_allowed(&req) {
575                        self.scheduler.enqueue(req);
576                    } else {
577                        self.stats.offsite_requests_count += 1;
578                    }
579                }
580            }
581        }
582    }
583
584    fn should_checkpoint(&self) -> bool {
585        let Some(ref cm) = self.checkpoint_manager else {
586            return false;
587        };
588        if cm.interval_secs == 0.0 {
589            return false;
590        }
591        self.last_checkpoint_time.elapsed().as_secs_f64() >= cm.interval_secs
592    }
593
594    fn save_checkpoint(&mut self) {
595        let Some(ref cm) = self.checkpoint_manager else {
596            return;
597        };
598
599        let (requests, seen) = self.scheduler.snapshot();
600        let data = CheckpointData {
601            request_urls: requests.iter().map(|r| r.url.clone()).collect(),
602            seen_fingerprints: seen.iter().cloned().collect(),
603        };
604
605        if let Err(e) = cm.save(&data) {
606            error!(error = %e, "failed to save checkpoint");
607        }
608        self.last_checkpoint_time = Instant::now();
609    }
610
611    /// Returns a reference to the collected scraped items. You can call this
612    /// during or after the crawl to inspect what has been scraped so far.
613    pub fn items(&self) -> &ItemList {
614        &self.items
615    }
616
617    /// Returns a reference to the current crawl statistics. Like `items()`,
618    /// this is available both during and after the crawl for monitoring
619    /// progress.
620    pub fn stats(&self) -> &CrawlStats {
621        &self.stats
622    }
623
624    /// Creates a streaming receiver that yields items as they are scraped.
625    ///
626    /// Call this before [`crawl()`](CrawlerEngine::crawl) to get an unbounded
627    /// receiver. Each item passes through `on_scraped_item()` and is sent
628    /// to both the receiver and the internal `ItemList`.
629    ///
630    /// This is the Rust equivalent of Python's `async for item in spider.stream()`.
631    ///
632    /// # Example
633    ///
634    /// ```rust,ignore
635    /// let mut engine = CrawlerEngine::new(&spider, None, 0.0)?;
636    /// let mut rx = engine.stream();
637    ///
638    /// // Spawn the crawl in the background
639    /// let crawl_handle = tokio::spawn(async move {
640    ///     engine.crawl().await
641    /// });
642    ///
643    /// // Process items as they arrive
644    /// while let Some(item) = rx.recv().await {
645    ///     println!("Got item: {}", item);
646    /// }
647    ///
648    /// let stats = crawl_handle.await??;
649    /// ```
650    pub fn stream(&mut self) -> tokio::sync::mpsc::UnboundedReceiver<serde_json::Value> {
651        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
652        self.item_sender = Some(tx);
653        rx
654    }
655}