// spider_core/builder.rs
//! # Builder Module
//!
//! Provides the `CrawlerBuilder`, a fluent API for constructing and configuring
//! `Crawler` instances with customizable settings and components.
//!
//! ## Overview
//!
//! The `CrawlerBuilder` simplifies the process of assembling various `spider-core`
//! components into a fully configured web crawler. It provides a flexible,
//! ergonomic interface for setting up all aspects of the crawling process.
//!
//! ## Key Features
//!
//! - **Concurrency Configuration**: Control the number of concurrent downloads,
//!   parsing workers, and pipeline processors
//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state
//! - **Statistics Integration**: Initialize and connect the `StatCollector`
//! - **Default Handling**: Automatic addition of essential middlewares when needed
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::CrawlerBuilder;
//! use spider_middleware::rate_limit::RateLimitMiddleware;
//! use spider_pipeline::console_writer::ConsoleWriterPipeline;
//!
//! async fn setup_crawler() -> Result<(), SpiderError> {
//!     let crawler = CrawlerBuilder::new(MySpider)
//!         .max_concurrent_downloads(10)
//!         .max_parser_workers(4)
//!         .add_middleware(RateLimitMiddleware::default())
//!         .add_pipeline(ConsoleWriterPipeline::new())
//!         .with_checkpoint_path("./crawl.checkpoint")
//!         .build()
//!         .await?;
//!
//!     crawler.start_crawl().await
//! }
//! ```

use crate::SchedulerCheckpoint;
use spider_util::error::SpiderError;
use spider_middleware::middleware::Middleware;
use spider_pipeline::pipeline::Pipeline;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use crate::Downloader;
use crate::ReqwestClientDownloader;
use num_cpus;
use std::fs;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, warn};

use super::Crawler;
use crate::stats::StatCollector;
60
/// Configuration for the crawler's concurrency settings.
///
/// All values must be greater than zero; `CrawlerBuilder::build` rejects
/// zero-valued settings with a configuration error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CrawlerConfig {
    /// The maximum number of concurrent downloads.
    pub max_concurrent_downloads: usize,
    /// The number of workers dedicated to parsing responses.
    pub parser_workers: usize,
    /// The maximum number of concurrent item processing pipelines.
    pub max_concurrent_pipelines: usize,
    /// The capacity of communication channels between components.
    pub channel_capacity: usize,
}
73impl Default for CrawlerConfig {
74    fn default() -> Self {
75        CrawlerConfig {
76            max_concurrent_downloads: 5,
77            parser_workers: num_cpus::get(),
78            max_concurrent_pipelines: 5,
79            channel_capacity: 200,
80        }
81    }
82}
/// Fluent builder that assembles a `Crawler` from a spider, a downloader,
/// middlewares, item pipelines, and concurrency/checkpoint settings.
///
/// `S` is the user-supplied spider type; `D` is the downloader, defaulting to
/// `ReqwestClientDownloader` via `Default`/`new`.
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    // Concurrency and channel-capacity settings handed to the crawler.
    crawler_config: CrawlerConfig,
    // Transport used to fetch requests.
    downloader: D,
    // The spider; `Some` after `new`, moved out by `build`.
    spider: Option<S>,
    // Middlewares applied in insertion order, parameterized by the downloader's client.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    // Item pipelines; `build` requires at least one.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Where to persist/restore crawl state; `None` disables checkpointing.
    checkpoint_path: Option<PathBuf>,
    // How often to checkpoint while crawling.
    checkpoint_interval: Option<Duration>,
    // NOTE(review): `S` already appears in `spider` and `item_pipelines`, so
    // this PhantomData looks redundant — confirm before removing.
    _phantom: PhantomData<S>,
}
98impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
99    fn default() -> Self {
100        Self {
101            crawler_config: CrawlerConfig::default(),
102            downloader: ReqwestClientDownloader::default(),
103            spider: None,
104            middlewares: Vec::new(),
105            item_pipelines: Vec::new(),
106            checkpoint_path: None,
107            checkpoint_interval: None,
108            _phantom: PhantomData,
109        }
110    }
111}
113impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
114    /// Creates a new `CrawlerBuilder` for a given spider with the default ReqwestClientDownloader.
115    pub fn new(spider: S) -> Self {
116        Self {
117            spider: Some(spider),
118            ..Default::default()
119        }
120    }
121}
123impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
124    /// Sets the maximum number of concurrent downloads.
125    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
126        self.crawler_config.max_concurrent_downloads = limit;
127        self
128    }
129
130    /// Sets the maximum number of concurrent parser workers.
131    pub fn max_parser_workers(mut self, limit: usize) -> Self {
132        self.crawler_config.parser_workers = limit;
133        self
134    }
135
136    /// Sets the maximum number of concurrent pipelines.
137    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
138        self.crawler_config.max_concurrent_pipelines = limit;
139        self
140    }
141
142    /// Sets the capacity of communication channels between components.
143    pub fn channel_capacity(mut self, capacity: usize) -> Self {
144        self.crawler_config.channel_capacity = capacity;
145        self
146    }
147
148    /// Sets a custom downloader for the crawler.
149    pub fn downloader(mut self, downloader: D) -> Self {
150        self.downloader = downloader;
151        self
152    }
153
154    /// Adds a middleware to the crawler.
155    pub fn add_middleware<M>(mut self, middleware: M) -> Self
156    where
157        M: Middleware<D::Client> + Send + Sync + 'static,
158    {
159        self.middlewares.push(Box::new(middleware));
160        self
161    }
162
163    /// Adds an item pipeline to the crawler.
164    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
165    where
166        P: Pipeline<S::Item> + 'static,
167    {
168        self.item_pipelines.push(Box::new(pipeline));
169        self
170    }
171
172    /// Enables checkpointing and sets the path for the checkpoint file.
173    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
174        self.checkpoint_path = Some(path.as_ref().to_path_buf());
175        self
176    }
177
178    /// Sets the interval for periodic checkpointing.
179    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
180        self.checkpoint_interval = Some(interval);
181        self
182    }
183
184    /// Builds the `Crawler` instance, initializing and passing the `StatCollector` along with other configured components.
185    #[allow(unused_variables)]
186    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
187    where
188        D: Downloader + Send + Sync + 'static,
189        D::Client: Send + Sync + Clone,
190    {
191        let spider = self.validate_and_get_spider()?;
192        
193        // Ensure there's at least one pipeline
194        if self.item_pipelines.is_empty() {
195            return Err(SpiderError::ConfigurationError(
196                "At least one pipeline must be added to the crawler.".to_string(),
197            ));
198        }
199
200        let (initial_scheduler_state, loaded_cookie_store) =
201            self.load_and_restore_checkpoint_state().await?;
202
203        let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
204
205        let downloader_arc = Arc::new(self.downloader);
206        let stats = Arc::new(StatCollector::new());
207
208        let crawler = Crawler::new(
209            scheduler_arc,
210            req_rx,
211            downloader_arc,
212            self.middlewares,
213            spider,
214            self.item_pipelines,
215            self.crawler_config.max_concurrent_downloads,
216            self.crawler_config.parser_workers,
217            self.crawler_config.max_concurrent_pipelines,
218            self.crawler_config.channel_capacity,
219            self.checkpoint_path.take(),
220            self.checkpoint_interval,
221            stats,
222            Arc::new(tokio::sync::RwLock::new(loaded_cookie_store.unwrap_or_default())),
223        );
224
225        Ok(crawler)
226    }
227
228    async fn load_and_restore_checkpoint_state(
229        &mut self,
230    ) -> Result<(Option<SchedulerCheckpoint>, Option<crate::CookieStore>), SpiderError> {
231        let mut initial_scheduler_state = None;
232        let mut loaded_pipelines_state = None;
233        let mut loaded_cookie_store = None;
234
235        if let Some(path) = &self.checkpoint_path {
236            debug!("Attempting to load checkpoint from {:?}", path);
237            match fs::read(path) {
238                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
239                    Ok(checkpoint) => {
240                        initial_scheduler_state = Some(checkpoint.scheduler);
241                        loaded_pipelines_state = Some(checkpoint.pipelines);
242                        
243                        loaded_cookie_store = Some(checkpoint.cookie_store);
244                    }
245                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
246                },
247                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
248            }
249        }
250
251        if let Some(pipeline_states) = loaded_pipelines_state {
252            for (name, state) in pipeline_states {
253                if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
254                    pipeline.restore_state(state).await?;
255                } else {
256                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
257                }
258            }
259        }
260
261        Ok((initial_scheduler_state, loaded_cookie_store))
262    }
263
264    fn validate_and_get_spider(&mut self) -> Result<S, SpiderError> {
265        if self.crawler_config.max_concurrent_downloads == 0 {
266            return Err(SpiderError::ConfigurationError(
267                "max_concurrent_downloads must be greater than 0.".to_string(),
268            ));
269        }
270        if self.crawler_config.parser_workers == 0 {
271            return Err(SpiderError::ConfigurationError(
272                "parser_workers must be greater than 0.".to_string(),
273            ));
274        }
275        self.spider.take().ok_or_else(|| {
276            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
277        })
278    }
279}