// spider_core/builder.rs
//! # Builder Module
//!
//! Provides the `CrawlerBuilder`, a fluent API for constructing and configuring
//! `Crawler` instances with customizable settings and components.
//!
//! ## Overview
//!
//! The `CrawlerBuilder` simplifies the process of assembling various `spider-core`
//! components into a fully configured web crawler. It provides a flexible,
//! ergonomic interface for setting up all aspects of the crawling process.
//!
//! ## Key Features
//!
//! - **Concurrency Configuration**: Control the number of concurrent downloads,
//!   parsing workers, and pipeline processors
//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state (feature: `core-checkpoint`)
//! - **Statistics Integration**: Initialize and connect the `StatCollector`
//! - **Default Handling**: Automatic addition of essential middlewares when needed
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::CrawlerBuilder;
//! use spider_middleware::rate_limit::RateLimitMiddleware;
//! use spider_pipeline::console_writer::ConsoleWriterPipeline;
//!
//! async fn setup_crawler() -> Result<(), SpiderError> {
//!     let crawler = CrawlerBuilder::new(MySpider)
//!         .max_concurrent_downloads(10)
//!         .max_parser_workers(4)
//!         .add_middleware(RateLimitMiddleware::default())
//!         .add_pipeline(ConsoleWriterPipeline::new())
//!         .with_checkpoint_path("./crawl.checkpoint")
//!         .build()
//!         .await?;
//!
//!     crawler.start_crawl().await
//! }
//! ```

42use crate::Downloader;
43use crate::ReqwestClientDownloader;
44use crate::scheduler::Scheduler;
45use crate::spider::Spider;
46use num_cpus;
47use spider_middleware::middleware::Middleware;
48use spider_pipeline::pipeline::Pipeline;
49use spider_util::error::SpiderError;
50use std::marker::PhantomData;
51use std::path::{Path, PathBuf};
52use std::sync::Arc;
53use std::time::Duration;
54
55use super::Crawler;
56use crate::stats::StatCollector;
57#[cfg(feature = "checkpoint")]
58use log::{debug, warn};
59
60#[cfg(feature = "checkpoint")]
61use crate::SchedulerCheckpoint;
62#[cfg(feature = "checkpoint")]
63use rmp_serde;
64#[cfg(feature = "checkpoint")]
65use std::fs;
66
/// Configuration for the crawler's concurrency settings.
///
/// A populated instance is held by [`CrawlerBuilder`] and forwarded to the
/// `Crawler` at build time; use the builder's setter methods rather than
/// constructing this directly.
pub struct CrawlerConfig {
    /// The maximum number of concurrent downloads.
    pub max_concurrent_downloads: usize,
    /// The number of workers dedicated to parsing responses.
    pub parser_workers: usize,
    /// The maximum number of concurrent item processing pipelines.
    pub max_concurrent_pipelines: usize,
    /// The capacity of communication channels between components.
    pub channel_capacity: usize,
}
78
79impl Default for CrawlerConfig {
80    fn default() -> Self {
81        CrawlerConfig {
82            max_concurrent_downloads: num_cpus::get().max(16),
83            parser_workers: num_cpus::get().clamp(4, 16),
84            max_concurrent_pipelines: num_cpus::get().min(8),
85            channel_capacity: 1000,
86        }
87    }
88}
89
90pub struct CrawlerBuilder<S: Spider, D>
91where
92    D: Downloader,
93{
94    crawler_config: CrawlerConfig,
95    downloader: D,
96    spider: Option<S>,
97    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
98    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
99    checkpoint_path: Option<PathBuf>,
100    checkpoint_interval: Option<Duration>,
101    _phantom: PhantomData<S>,
102}
103
104impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
105    fn default() -> Self {
106        Self {
107            crawler_config: CrawlerConfig::default(),
108            downloader: ReqwestClientDownloader::default(),
109            spider: None,
110            middlewares: Vec::new(),
111            item_pipelines: Vec::new(),
112            checkpoint_path: None,
113            checkpoint_interval: None,
114            _phantom: PhantomData,
115        }
116    }
117}
118
119impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
120    /// Creates a new `CrawlerBuilder` for a given spider with the default ReqwestClientDownloader.
121    pub fn new(spider: S) -> Self {
122        Self {
123            spider: Some(spider),
124            ..Default::default()
125        }
126    }
127}
128
129impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
130    /// Sets the maximum number of concurrent downloads.
131    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
132        self.crawler_config.max_concurrent_downloads = limit;
133        self
134    }
135
136    /// Sets the maximum number of concurrent parser workers.
137    pub fn max_parser_workers(mut self, limit: usize) -> Self {
138        self.crawler_config.parser_workers = limit;
139        self
140    }
141
142    /// Sets the maximum number of concurrent pipelines.
143    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
144        self.crawler_config.max_concurrent_pipelines = limit;
145        self
146    }
147
148    /// Sets the capacity of communication channels between components.
149    pub fn channel_capacity(mut self, capacity: usize) -> Self {
150        self.crawler_config.channel_capacity = capacity;
151        self
152    }
153
154    /// Sets a custom downloader for the crawler.
155    pub fn downloader(mut self, downloader: D) -> Self {
156        self.downloader = downloader;
157        self
158    }
159
160    /// Adds a middleware to the crawler.
161    pub fn add_middleware<M>(mut self, middleware: M) -> Self
162    where
163        M: Middleware<D::Client> + Send + Sync + 'static,
164    {
165        self.middlewares.push(Box::new(middleware));
166        self
167    }
168
169    /// Adds an item pipeline to the crawler.
170    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
171    where
172        P: Pipeline<S::Item> + 'static,
173    {
174        self.item_pipelines.push(Box::new(pipeline));
175        self
176    }
177
178    /// Enables checkpointing and sets the path for the checkpoint file.
179    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
180        self.checkpoint_path = Some(path.as_ref().to_path_buf());
181        self
182    }
183
184    /// Sets the interval for periodic checkpointing.
185    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
186        self.checkpoint_interval = Some(interval);
187        self
188    }
189
190    /// Builds the `Crawler` instance, initializing and passing the `StatCollector` along with other configured components.
191    #[allow(unused_variables)]
192    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
193    where
194        D: Downloader + Send + Sync + 'static,
195        D::Client: Send + Sync + Clone,
196        S::Item: Send + Sync + 'static,
197    {
198        let spider = self.validate_and_get_spider()?;
199        self.ensure_default_pipeline();
200
201        #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
202        {
203            let (initial_scheduler_state, loaded_cookie_store) =
204                self.load_and_restore_checkpoint_state().await?;
205            let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
206            let downloader_arc = Arc::new(self.downloader);
207            let stats = Arc::new(StatCollector::new());
208            let crawler = Crawler::new(
209                scheduler_arc,
210                req_rx,
211                downloader_arc,
212                self.middlewares,
213                spider,
214                self.item_pipelines,
215                self.crawler_config.max_concurrent_downloads,
216                self.crawler_config.parser_workers,
217                self.crawler_config.max_concurrent_pipelines,
218                self.crawler_config.channel_capacity,
219                self.checkpoint_path.take(),
220                self.checkpoint_interval,
221                stats,
222                Arc::new(tokio::sync::RwLock::new(
223                    loaded_cookie_store.unwrap_or_default(),
224                )),
225            );
226            Ok(crawler)
227        }
228
229        #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
230        {
231            let (initial_scheduler_state, _loaded_cookie_store) =
232                self.load_and_restore_checkpoint_state().await?;
233            let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
234            let downloader_arc = Arc::new(self.downloader);
235            let stats = Arc::new(StatCollector::new());
236            let crawler = Crawler::new(
237                scheduler_arc,
238                req_rx,
239                downloader_arc,
240                self.middlewares,
241                spider,
242                self.item_pipelines,
243                self.crawler_config.max_concurrent_downloads,
244                self.crawler_config.parser_workers,
245                self.crawler_config.max_concurrent_pipelines,
246                self.crawler_config.channel_capacity,
247                self.checkpoint_path.take(),
248                self.checkpoint_interval,
249                stats,
250            );
251            return Ok(crawler);
252        }
253
254        #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
255        {
256            let (_initial_scheduler_state, loaded_cookie_store) =
257                self.load_and_restore_checkpoint_state().await?;
258            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
259            let downloader_arc = Arc::new(self.downloader);
260            let stats = Arc::new(StatCollector::new());
261            let crawler = Crawler::new(
262                scheduler_arc,
263                req_rx,
264                downloader_arc,
265                self.middlewares,
266                spider,
267                self.item_pipelines,
268                self.crawler_config.max_concurrent_downloads,
269                self.crawler_config.parser_workers,
270                self.crawler_config.max_concurrent_pipelines,
271                self.crawler_config.channel_capacity,
272                stats,
273                Arc::new(tokio::sync::RwLock::new(
274                    loaded_cookie_store.unwrap_or_default(),
275                )),
276            );
277            Ok(crawler)
278        }
279
280        #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
281        {
282            let (_initial_scheduler_state, _loaded_cookie_store) =
283                self.load_and_restore_checkpoint_state().await?;
284            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
285            let downloader_arc = Arc::new(self.downloader);
286            let stats = Arc::new(StatCollector::new());
287            let crawler = Crawler::new(
288                scheduler_arc,
289                req_rx,
290                downloader_arc,
291                self.middlewares,
292                spider,
293                self.item_pipelines,
294                self.crawler_config.max_concurrent_downloads,
295                self.crawler_config.parser_workers,
296                self.crawler_config.max_concurrent_pipelines,
297                self.crawler_config.channel_capacity,
298                stats,
299            );
300            Ok(crawler)
301        }
302    }
303
304    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
305    async fn load_and_restore_checkpoint_state(
306        &mut self,
307    ) -> Result<(Option<SchedulerCheckpoint>, Option<crate::CookieStore>), SpiderError> {
308        let mut initial_scheduler_state = None;
309        let mut loaded_pipelines_state = None;
310        let mut loaded_cookie_store = None;
311
312        if let Some(path) = &self.checkpoint_path {
313            debug!("Attempting to load checkpoint from {:?}", path);
314            match fs::read(path) {
315                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
316                    Ok(checkpoint) => {
317                        initial_scheduler_state = Some(checkpoint.scheduler);
318                        loaded_pipelines_state = Some(checkpoint.pipelines);
319
320                        loaded_cookie_store = Some(checkpoint.cookie_store);
321                    }
322                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
323                },
324                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
325            }
326        }
327
328        if let Some(pipeline_states) = loaded_pipelines_state {
329            for (name, state) in pipeline_states {
330                if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
331                    pipeline.restore_state(state).await?;
332                } else {
333                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
334                }
335            }
336        }
337
338        Ok((initial_scheduler_state, loaded_cookie_store))
339    }
340
341    #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
342    async fn load_and_restore_checkpoint_state(
343        &mut self,
344    ) -> Result<(Option<SchedulerCheckpoint>, Option<()>), SpiderError> {
345        let mut initial_scheduler_state = None;
346        let mut loaded_pipelines_state = None;
347
348        if let Some(path) = &self.checkpoint_path {
349            debug!("Attempting to load checkpoint from {:?}", path);
350            match fs::read(path) {
351                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
352                    Ok(checkpoint) => {
353                        initial_scheduler_state = Some(checkpoint.scheduler);
354                        loaded_pipelines_state = Some(checkpoint.pipelines);
355                    }
356                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
357                },
358                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
359            }
360        }
361
362        if let Some(pipeline_states) = loaded_pipelines_state {
363            for (name, state) in pipeline_states {
364                if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
365                    pipeline.restore_state(state).await?;
366                } else {
367                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
368                }
369            }
370        }
371
372        Ok((initial_scheduler_state, None))
373    }
374
375    #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
376    async fn load_and_restore_checkpoint_state(
377        &mut self,
378    ) -> Result<(Option<()>, Option<()>), SpiderError> {
379        // When both checkpoint and cookie-store features are disabled, return None for both values
380        Ok((None, None))
381    }
382
383    #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
384    async fn load_and_restore_checkpoint_state(
385        &mut self,
386    ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
387        // When checkpoint is disabled but cookie-store is enabled, initialize default cookie store
388        Ok((None, Some(crate::CookieStore::default())))
389    }
390
391    fn validate_and_get_spider(&mut self) -> Result<S, SpiderError> {
392        if self.crawler_config.max_concurrent_downloads == 0 {
393            return Err(SpiderError::ConfigurationError(
394                "max_concurrent_downloads must be greater than 0.".to_string(),
395            ));
396        }
397        if self.crawler_config.parser_workers == 0 {
398            return Err(SpiderError::ConfigurationError(
399                "parser_workers must be greater than 0.".to_string(),
400            ));
401        }
402        self.spider.take().ok_or_else(|| {
403            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
404        })
405    }
406
407    fn ensure_default_pipeline(&mut self) {
408        if self.item_pipelines.is_empty() {
409            use spider_pipeline::console::ConsolePipeline;
410            self.item_pipelines.push(Box::new(ConsolePipeline::new()));
411        }
412    }
413}
414