Skip to main content

spider_core/
builder.rs

1//! # Builder Module
2//!
3//! Provides the [`CrawlerBuilder`], a fluent API for constructing and configuring
4//! [`Crawler`](crate::Crawler) instances with customizable settings and components.
5//!
6//! ## Overview
7//!
8//! The [`CrawlerBuilder`] simplifies the process of assembling various `spider-core`
9//! components into a fully configured web crawler. It provides a flexible,
10//! ergonomic interface for setting up all aspects of the crawling process.
11//!
12//! ## Key Features
13//!
14//! - **Concurrency Configuration**: Control the number of concurrent downloads,
15//!   parsing workers, and pipeline processors
16//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
17//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state
18//!   (requires `checkpoint` feature)
19//! - **Statistics Integration**: Initialize and connect the [`StatCollector`](crate::stats::StatCollector)
20//! - **Default Handling**: Automatic addition of essential middlewares when needed
21//!
22//! ## Example
23//!
24//! ```rust,ignore
25//! use spider_core::CrawlerBuilder;
26//! use spider_middleware::rate_limit::RateLimitMiddleware;
27//! use spider_pipeline::console::ConsolePipeline;
28//! use spider_util::error::SpiderError;
29//!
30//! async fn setup_crawler() -> Result<(), SpiderError> {
31//!     let crawler = CrawlerBuilder::new(MySpider)
32//!         .max_concurrent_downloads(10)
33//!         .max_parser_workers(4)
34//!         .add_middleware(RateLimitMiddleware::default())
35//!         .add_pipeline(ConsolePipeline::new())
36//!         .with_checkpoint_path("./crawl.checkpoint")
37//!         .build()
38//!         .await?;
39//!
40//!     crawler.start_crawl().await
41//! }
42//! ```
43
44use crate::Downloader;
45use crate::ReqwestClientDownloader;
46use crate::scheduler::Scheduler;
47use crate::spider::Spider;
48use num_cpus;
49use spider_middleware::middleware::Middleware;
50use spider_pipeline::pipeline::Pipeline;
51use spider_util::error::SpiderError;
52use std::marker::PhantomData;
53use std::path::{Path, PathBuf};
54use std::sync::Arc;
55use std::time::Duration;
56
57use super::Crawler;
58use crate::stats::StatCollector;
59#[cfg(feature = "checkpoint")]
60use log::{debug, warn};
61
62#[cfg(feature = "checkpoint")]
63use crate::SchedulerCheckpoint;
64#[cfg(feature = "checkpoint")]
65use rmp_serde;
66#[cfg(feature = "checkpoint")]
67use std::fs;
68
/// Tunable concurrency parameters for a [`Crawler`].
///
/// Groups the knobs that determine how much parallelism the crawler
/// uses at each stage (download, parse, item processing) and how large
/// the internal buffers between stages are.
pub struct CrawlerConfig {
    /// Upper bound on simultaneously in-flight downloads.
    pub max_concurrent_downloads: usize,
    /// Number of worker tasks that parse downloaded responses.
    pub parser_workers: usize,
    /// Upper bound on items being processed by pipelines at once.
    pub max_concurrent_pipelines: usize,
    /// Buffer size of the channels connecting crawler components.
    pub channel_capacity: usize,
}
83
84impl Default for CrawlerConfig {
85    fn default() -> Self {
86        CrawlerConfig {
87            max_concurrent_downloads: num_cpus::get().max(16),
88            parser_workers: num_cpus::get().clamp(4, 16),
89            max_concurrent_pipelines: num_cpus::get().min(8),
90            channel_capacity: 1000,
91        }
92    }
93}
94
95/// A fluent builder for constructing [`Crawler`] instances.
96///
97/// `CrawlerBuilder` provides a chainable API for configuring all aspects
98/// of a web crawler, including concurrency settings, middleware, pipelines,
99/// and checkpoint options.
100///
101/// ## Type Parameters
102///
103/// - `S`: The [`Spider`] implementation type
104/// - `D`: The [`Downloader`] implementation type
105///
106/// ## Example
107///
108/// ```rust,ignore
109/// # use spider_core::{CrawlerBuilder, Spider};
110/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
111/// # struct MySpider;
112/// # #[async_trait::async_trait]
113/// # impl Spider for MySpider {
114/// #     type Item = String;
115/// #     type State = ();
116/// #     fn start_urls(&self) -> Vec<&'static str> { vec![] }
117/// #     async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
118/// # }
119/// let builder = CrawlerBuilder::new(MySpider)
120///     .max_concurrent_downloads(8)
121///     .max_parser_workers(4);
122/// ```
123pub struct CrawlerBuilder<S: Spider, D>
124where
125    D: Downloader,
126{
127    config: CrawlerConfig,
128    downloader: D,
129    spider: Option<S>,
130    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
131    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
132    checkpoint_path: Option<PathBuf>,
133    checkpoint_interval: Option<Duration>,
134    _phantom: PhantomData<S>,
135}
136
137impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
138    fn default() -> Self {
139        Self {
140            config: CrawlerConfig::default(),
141            downloader: ReqwestClientDownloader::default(),
142            spider: None,
143            middlewares: Vec::new(),
144            pipelines: Vec::new(),
145            checkpoint_path: None,
146            checkpoint_interval: None,
147            _phantom: PhantomData,
148        }
149    }
150}
151
152impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
153    /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
154    ///
155    /// ## Example
156    ///
157    /// ```rust,ignore
158    /// let crawler = CrawlerBuilder::new(MySpider)
159    ///     .build()
160    ///     .await?;
161    /// ```
162    pub fn new(spider: S) -> Self {
163        Self {
164            spider: Some(spider),
165            ..Default::default()
166        }
167    }
168}
169
170impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
171    /// Sets the maximum number of concurrent downloads.
172    ///
173    /// This controls how many HTTP requests can be in-flight simultaneously.
174    /// Higher values increase throughput but may overwhelm target servers.
175    ///
176    /// ## Default
177    ///
178    /// Defaults to the number of CPU cores, with a minimum of 16.
179    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
180        self.config.max_concurrent_downloads = limit;
181        self
182    }
183
184    /// Sets the number of worker tasks dedicated to parsing responses.
185    ///
186    /// Parser workers process HTTP responses concurrently, calling the
187    /// spider's [`parse`](Spider::parse) method to extract items and
188    /// discover new URLs.
189    ///
190    /// ## Default
191    ///
192    /// Defaults to the number of CPU cores, clamped between 4 and 16.
193    pub fn max_parser_workers(mut self, limit: usize) -> Self {
194        self.config.parser_workers = limit;
195        self
196    }
197
198    /// Sets the maximum number of concurrent item processing pipelines.
199    ///
200    /// This controls how many items can be processed by pipelines simultaneously.
201    ///
202    /// ## Default
203    ///
204    /// Defaults to the number of CPU cores, with a maximum of 8.
205    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
206        self.config.max_concurrent_pipelines = limit;
207        self
208    }
209
210    /// Sets the capacity of internal communication channels.
211    ///
212    /// This controls the buffer size for channels between the downloader,
213    /// parser, and pipeline components. Higher values can improve throughput
214    /// at the cost of increased memory usage.
215    ///
216    /// ## Default
217    ///
218    /// Defaults to 1000.
219    pub fn channel_capacity(mut self, capacity: usize) -> Self {
220        self.config.channel_capacity = capacity;
221        self
222    }
223
224    /// Sets a custom downloader implementation.
225    ///
226    /// Use this method to provide a custom [`Downloader`] implementation
227    /// instead of the default [`ReqwestClientDownloader`].
228    pub fn downloader(mut self, downloader: D) -> Self {
229        self.downloader = downloader;
230        self
231    }
232
233    /// Adds a middleware to the crawler's middleware stack.
234    ///
235    /// Middlewares intercept and modify requests before they are sent and
236    /// responses after they are received. They are executed in the order
237    /// they are added.
238    ///
239    /// ## Example
240    ///
241    /// ```rust,ignore
242    /// let crawler = CrawlerBuilder::new(MySpider)
243    ///     .add_middleware(RateLimitMiddleware::default())
244    ///     .add_middleware(RetryMiddleware::new())
245    ///     .build()
246    ///     .await?;
247    /// ```
248    pub fn add_middleware<M>(mut self, middleware: M) -> Self
249    where
250        M: Middleware<D::Client> + Send + Sync + 'static,
251    {
252        self.middlewares.push(Box::new(middleware));
253        self
254    }
255
256    /// Adds a pipeline to the crawler's pipeline stack.
257    ///
258    /// Pipelines process scraped items after they are extracted by the spider.
259    /// They can be used for validation, transformation, deduplication, or
260    /// storage (e.g., writing to files or databases).
261    ///
262    /// ## Example
263    ///
264    /// ```rust,ignore
265    /// let crawler = CrawlerBuilder::new(MySpider)
266    ///     .add_pipeline(ConsolePipeline::new())
267    ///     .add_pipeline(JsonPipeline::new("output.json")?)
268    ///     .build()
269    ///     .await?;
270    /// ```
271    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
272    where
273        P: Pipeline<S::Item> + 'static,
274    {
275        self.pipelines.push(Box::new(pipeline));
276        self
277    }
278
279    /// Sets the path for saving and loading checkpoints.
280    ///
281    /// When enabled, the crawler periodically saves its state to this file,
282    /// allowing crawls to be resumed after interruption.
283    ///
284    /// Requires the `checkpoint` feature to be enabled.
285    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
286        self.checkpoint_path = Some(path.as_ref().to_path_buf());
287        self
288    }
289
290    /// Sets the interval between automatic checkpoint saves.
291    ///
292    /// When enabled, the crawler saves its state at this interval.
293    /// Shorter intervals provide more frequent recovery points but may
294    /// impact performance.
295    ///
296    /// Requires the `checkpoint` feature to be enabled.
297    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
298        self.checkpoint_interval = Some(interval);
299        self
300    }
301
302    /// Builds the [`Crawler`] instance.
303    ///
304    /// This method finalizes the crawler configuration and initializes all
305    /// components. It performs validation and sets up default values where
306    /// necessary.
307    ///
308    /// # Errors
309    ///
310    /// Returns a [`SpiderError::ConfigurationError`] if:
311    /// - `max_concurrent_downloads` is 0
312    /// - `parser_workers` is 0
313    /// - No spider was provided to the builder
314    ///
315    /// # Example
316    ///
317    /// ```rust,ignore
318    /// let crawler = CrawlerBuilder::new(MySpider)
319    ///     .max_concurrent_downloads(10)
320    ///     .build()
321    ///     .await?;
322    /// ```
323    #[allow(unused_variables)]
324    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
325    where
326        D: Downloader + Send + Sync + 'static,
327        D::Client: Send + Sync + Clone,
328        S::Item: Send + Sync + 'static,
329    {
330        let spider = self.take_spider()?;
331        self.init_default_pipeline();
332
333        #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
334        {
335            let (scheduler_state, cookie_store) = self.restore_checkpoint().await?;
336            let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
337            let downloader_arc = Arc::new(self.downloader);
338            let stats = Arc::new(StatCollector::new());
339            let crawler = Crawler::new(
340                scheduler_arc,
341                req_rx,
342                downloader_arc,
343                self.middlewares,
344                spider,
345                self.pipelines,
346                self.config.max_concurrent_downloads,
347                self.config.parser_workers,
348                self.config.max_concurrent_pipelines,
349                self.config.channel_capacity,
350                self.checkpoint_path.take(),
351                self.checkpoint_interval,
352                stats,
353                Arc::new(tokio::sync::RwLock::new(
354                    cookie_store.unwrap_or_default(),
355                )),
356            );
357            Ok(crawler)
358        }
359
360        #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
361        {
362            let (scheduler_state, _cookie_store) = self.restore_checkpoint().await?;
363            let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
364            let downloader_arc = Arc::new(self.downloader);
365            let stats = Arc::new(StatCollector::new());
366            let crawler = Crawler::new(
367                scheduler_arc,
368                req_rx,
369                downloader_arc,
370                self.middlewares,
371                spider,
372                self.pipelines,
373                self.config.max_concurrent_downloads,
374                self.config.parser_workers,
375                self.config.max_concurrent_pipelines,
376                self.config.channel_capacity,
377                self.checkpoint_path.take(),
378                self.checkpoint_interval,
379                stats,
380            );
381            Ok(crawler)
382        }
383
384        #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
385        {
386            let (_scheduler_state, cookie_store) = self.restore_checkpoint().await?;
387            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
388            let downloader_arc = Arc::new(self.downloader);
389            let stats = Arc::new(StatCollector::new());
390            let crawler = Crawler::new(
391                scheduler_arc,
392                req_rx,
393                downloader_arc,
394                self.middlewares,
395                spider,
396                self.pipelines,
397                self.config.max_concurrent_downloads,
398                self.config.parser_workers,
399                self.config.max_concurrent_pipelines,
400                self.config.channel_capacity,
401                stats,
402                Arc::new(tokio::sync::RwLock::new(
403                    cookie_store.unwrap_or_default(),
404                )),
405            );
406            Ok(crawler)
407        }
408
409        #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
410        {
411            let (_scheduler_state, _cookie_store) = self.restore_checkpoint().await?;
412            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
413            let downloader_arc = Arc::new(self.downloader);
414            let stats = Arc::new(StatCollector::new());
415            let crawler = Crawler::new(
416                scheduler_arc,
417                req_rx,
418                downloader_arc,
419                self.middlewares,
420                spider,
421                self.pipelines,
422                self.config.max_concurrent_downloads,
423                self.config.parser_workers,
424                self.config.max_concurrent_pipelines,
425                self.config.channel_capacity,
426                stats,
427            );
428            Ok(crawler)
429        }
430    }
431
432    /// Restores checkpoint state from disk.
433    ///
434    /// This internal method loads a previously saved checkpoint file and
435    /// restores the scheduler state, pipeline states, and cookie store.
436    ///
437    /// # Returns
438    ///
439    /// Returns a tuple of `(Option<SchedulerCheckpoint>, Option<CookieStore>)`.
440    /// If no checkpoint path is configured or the file doesn't exist, returns
441    /// `None` values.
442    ///
443    /// # Errors
444    ///
445    /// Returns a [`SpiderError`] if pipeline state restoration fails.
446    /// Note: Checkpoint file read/deserialization errors are logged as warnings
447    /// but do not fail the operation—the crawl proceeds without checkpoint data.
448    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
449    async fn restore_checkpoint(
450        &mut self,
451    ) -> Result<(Option<SchedulerCheckpoint>, Option<crate::CookieStore>), SpiderError> {
452        let mut scheduler_state = None;
453        let mut pipeline_states = None;
454        let mut cookie_store = None;
455
456        if let Some(path) = &self.checkpoint_path {
457            debug!("Attempting to load checkpoint from {:?}", path);
458            match fs::read(path) {
459                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
460                    Ok(checkpoint) => {
461                        scheduler_state = Some(checkpoint.scheduler);
462                        pipeline_states = Some(checkpoint.pipelines);
463                        cookie_store = Some(checkpoint.cookie_store);
464                    }
465                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
466                },
467                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
468            }
469        }
470
471        if let Some(states) = pipeline_states {
472            for (name, state) in states {
473                if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
474                    pipeline.restore_state(state).await?;
475                } else {
476                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
477                }
478            }
479        }
480
481        Ok((scheduler_state, cookie_store))
482    }
483
484    /// Restores checkpoint state from disk (without cookie store support).
485    ///
486    /// This internal method loads a previously saved checkpoint file and
487    /// restores the scheduler state and pipeline states.
488    ///
489    /// # Returns
490    ///
491    /// Returns a tuple of `(Option<SchedulerCheckpoint>, Option<()>)`.
492    /// If no checkpoint path is configured or the file doesn't exist, returns
493    /// `None` values.
494    ///
495    /// # Errors
496    ///
497    /// Returns a [`SpiderError`] if pipeline state restoration fails.
498    /// Note: Checkpoint file read/deserialization errors are logged as warnings
499    /// but do not fail the operation—the crawl proceeds without checkpoint data.
500    #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
501    async fn restore_checkpoint(
502        &mut self,
503    ) -> Result<(Option<SchedulerCheckpoint>, Option<()>), SpiderError> {
504        let mut scheduler_state = None;
505        let mut pipeline_states = None;
506
507        if let Some(path) = &self.checkpoint_path {
508            debug!("Attempting to load checkpoint from {:?}", path);
509            match fs::read(path) {
510                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
511                    Ok(checkpoint) => {
512                        scheduler_state = Some(checkpoint.scheduler);
513                        pipeline_states = Some(checkpoint.pipelines);
514                    }
515                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
516                },
517                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
518            }
519        }
520
521        if let Some(states) = pipeline_states {
522            for (name, state) in states {
523                if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
524                    pipeline.restore_state(state).await?;
525                } else {
526                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
527                }
528            }
529        }
530
531        Ok((scheduler_state, None))
532    }
533
534    /// No-op checkpoint restoration when checkpoint feature is disabled.
535    #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
536    async fn restore_checkpoint(&mut self) -> Result<(Option<()>, Option<()>), SpiderError> {
537        Ok((None, None))
538    }
539
540    /// Returns cookie store default when only cookie-store feature is enabled.
541    #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
542    async fn restore_checkpoint(&mut self) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
543        Ok((None, Some(crate::CookieStore::default())))
544    }
545
546    /// Extracts the spider from the builder.
547    ///
548    /// # Errors
549    ///
550    /// Returns a [`SpiderError::ConfigurationError`] if:
551    /// - `max_concurrent_downloads` is 0
552    /// - `parser_workers` is 0
553    /// - No spider was provided
554    fn take_spider(&mut self) -> Result<S, SpiderError> {
555        if self.config.max_concurrent_downloads == 0 {
556            return Err(SpiderError::ConfigurationError(
557                "max_concurrent_downloads must be greater than 0.".to_string(),
558            ));
559        }
560        if self.config.parser_workers == 0 {
561            return Err(SpiderError::ConfigurationError(
562                "parser_workers must be greater than 0.".to_string(),
563            ));
564        }
565        self.spider.take().ok_or_else(|| {
566            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
567        })
568    }
569
570    /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
571    ///
572    /// This ensures that scraped items are always output somewhere, even if
573    /// no explicit pipelines are configured by the user.
574    fn init_default_pipeline(&mut self) {
575        if self.pipelines.is_empty() {
576            use spider_pipeline::console::ConsolePipeline;
577            self.pipelines.push(Box::new(ConsolePipeline::new()));
578        }
579    }
580}
581