// spider_core/builder.rs
1//! # Builder Module
2//!
3//! Provides the [`CrawlerBuilder`], a fluent API for constructing and configuring
4//! [`Crawler`](crate::Crawler) instances with customizable settings and components.
5//!
6//! ## Overview
7//!
8//! The [`CrawlerBuilder`] simplifies the process of assembling various `spider-core`
9//! components into a fully configured web crawler. It provides a flexible,
10//! ergonomic interface for setting up all aspects of the crawling process.
11//!
12//! ## Key Features
13//!
14//! - **Concurrency Configuration**: Control the number of concurrent downloads,
15//! parsing workers, and pipeline processors
16//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
17//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state
18//! (requires `checkpoint` feature)
19//! - **Statistics Integration**: Initialize and connect the [`StatCollector`](crate::stats::StatCollector)
20//! - **Default Handling**: Automatic addition of essential middlewares when needed
21//!
22//! ## Example
23//!
24//! ```rust,ignore
25//! use spider_core::CrawlerBuilder;
26//! use spider_middleware::rate_limit::RateLimitMiddleware;
27//! use spider_pipeline::console::ConsolePipeline;
28//! use spider_util::error::SpiderError;
29//!
30//! async fn setup_crawler() -> Result<(), SpiderError> {
31//! let crawler = CrawlerBuilder::new(MySpider)
32//! .max_concurrent_downloads(10)
33//! .max_parser_workers(4)
34//! .add_middleware(RateLimitMiddleware::default())
35//! .add_pipeline(ConsolePipeline::new())
36//! .with_checkpoint_path("./crawl.checkpoint")
37//! .build()
38//! .await?;
39//!
40//! crawler.start_crawl().await
41//! }
42//! ```
43
44use crate::Downloader;
45use crate::ReqwestClientDownloader;
46use crate::scheduler::Scheduler;
47use crate::spider::Spider;
48use num_cpus;
49use spider_middleware::middleware::Middleware;
50use spider_pipeline::pipeline::Pipeline;
51use spider_util::error::SpiderError;
52use std::marker::PhantomData;
53use std::path::{Path, PathBuf};
54use std::sync::Arc;
55use std::time::Duration;
56
57use super::Crawler;
58use crate::stats::StatCollector;
59#[cfg(feature = "checkpoint")]
60use log::{debug, warn};
61
62#[cfg(feature = "checkpoint")]
63use crate::SchedulerCheckpoint;
64#[cfg(feature = "checkpoint")]
65use rmp_serde;
66#[cfg(feature = "checkpoint")]
67use std::fs;
68
/// Configuration for the crawler's concurrency settings.
///
/// This struct holds tunable parameters that control the parallelism
/// and throughput of the crawler. CPU-derived defaults are provided by
/// its [`Default`] implementation, and the [`CrawlerBuilder`] setter
/// methods write directly into these fields.
pub struct CrawlerConfig {
    /// The maximum number of concurrent downloads (in-flight HTTP requests).
    pub max_concurrent_downloads: usize,
    /// The number of workers dedicated to parsing responses.
    pub parser_workers: usize,
    /// The maximum number of concurrent item processing pipelines.
    pub max_concurrent_pipelines: usize,
    /// The capacity of communication channels between components
    /// (downloader, parser, and pipeline stages).
    pub channel_capacity: usize,
}
83
84impl Default for CrawlerConfig {
85 fn default() -> Self {
86 CrawlerConfig {
87 max_concurrent_downloads: num_cpus::get().max(16),
88 parser_workers: num_cpus::get().clamp(4, 16),
89 max_concurrent_pipelines: num_cpus::get().min(8),
90 channel_capacity: 1000,
91 }
92 }
93}
94
/// A fluent builder for constructing [`Crawler`] instances.
///
/// `CrawlerBuilder` provides a chainable API for configuring all aspects
/// of a web crawler, including concurrency settings, middleware, pipelines,
/// and checkpoint options.
///
/// ## Type Parameters
///
/// - `S`: The [`Spider`] implementation type
/// - `D`: The [`Downloader`] implementation type
///
/// ## Example
///
/// ```rust,ignore
/// # use spider_core::{CrawlerBuilder, Spider};
/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
/// # struct MySpider;
/// # #[async_trait::async_trait]
/// # impl Spider for MySpider {
/// #     type Item = String;
/// #     type State = ();
/// #     fn start_urls(&self) -> Vec<&'static str> { vec![] }
/// #     async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
/// # }
/// let builder = CrawlerBuilder::new(MySpider)
///     .max_concurrent_downloads(8)
///     .max_parser_workers(4);
/// ```
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    /// Concurrency and channel tuning knobs; see [`CrawlerConfig`].
    config: CrawlerConfig,
    /// The downloader used to fetch responses.
    downloader: D,
    /// The spider to run; `None` until one is supplied, consumed by `build`.
    spider: Option<S>,
    /// Request/response middlewares, executed in insertion order.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    /// Item pipelines, executed in insertion order.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Where checkpoint state is saved/loaded (`checkpoint` feature).
    checkpoint_path: Option<PathBuf>,
    /// How often checkpoints are written (`checkpoint` feature).
    checkpoint_interval: Option<Duration>,
    /// NOTE(review): `S` already appears in `spider` and `pipelines`, so this
    /// marker looks redundant — confirm before removing (removal would also
    /// touch the `Default` impl and `new`).
    _phantom: PhantomData<S>,
}
136
137impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
138 fn default() -> Self {
139 Self {
140 config: CrawlerConfig::default(),
141 downloader: ReqwestClientDownloader::default(),
142 spider: None,
143 middlewares: Vec::new(),
144 pipelines: Vec::new(),
145 checkpoint_path: None,
146 checkpoint_interval: None,
147 _phantom: PhantomData,
148 }
149 }
150}
151
152impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
153 /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
154 ///
155 /// ## Example
156 ///
157 /// ```rust,ignore
158 /// let crawler = CrawlerBuilder::new(MySpider)
159 /// .build()
160 /// .await?;
161 /// ```
162 pub fn new(spider: S) -> Self {
163 Self {
164 spider: Some(spider),
165 ..Default::default()
166 }
167 }
168}
169
170impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
171 /// Sets the maximum number of concurrent downloads.
172 ///
173 /// This controls how many HTTP requests can be in-flight simultaneously.
174 /// Higher values increase throughput but may overwhelm target servers.
175 ///
176 /// ## Default
177 ///
178 /// Defaults to the number of CPU cores, with a minimum of 16.
179 pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
180 self.config.max_concurrent_downloads = limit;
181 self
182 }
183
184 /// Sets the number of worker tasks dedicated to parsing responses.
185 ///
186 /// Parser workers process HTTP responses concurrently, calling the
187 /// spider's [`parse`](Spider::parse) method to extract items and
188 /// discover new URLs.
189 ///
190 /// ## Default
191 ///
192 /// Defaults to the number of CPU cores, clamped between 4 and 16.
193 pub fn max_parser_workers(mut self, limit: usize) -> Self {
194 self.config.parser_workers = limit;
195 self
196 }
197
198 /// Sets the maximum number of concurrent item processing pipelines.
199 ///
200 /// This controls how many items can be processed by pipelines simultaneously.
201 ///
202 /// ## Default
203 ///
204 /// Defaults to the number of CPU cores, with a maximum of 8.
205 pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
206 self.config.max_concurrent_pipelines = limit;
207 self
208 }
209
210 /// Sets the capacity of internal communication channels.
211 ///
212 /// This controls the buffer size for channels between the downloader,
213 /// parser, and pipeline components. Higher values can improve throughput
214 /// at the cost of increased memory usage.
215 ///
216 /// ## Default
217 ///
218 /// Defaults to 1000.
219 pub fn channel_capacity(mut self, capacity: usize) -> Self {
220 self.config.channel_capacity = capacity;
221 self
222 }
223
224 /// Sets a custom downloader implementation.
225 ///
226 /// Use this method to provide a custom [`Downloader`] implementation
227 /// instead of the default [`ReqwestClientDownloader`].
228 pub fn downloader(mut self, downloader: D) -> Self {
229 self.downloader = downloader;
230 self
231 }
232
233 /// Adds a middleware to the crawler's middleware stack.
234 ///
235 /// Middlewares intercept and modify requests before they are sent and
236 /// responses after they are received. They are executed in the order
237 /// they are added.
238 ///
239 /// ## Example
240 ///
241 /// ```rust,ignore
242 /// let crawler = CrawlerBuilder::new(MySpider)
243 /// .add_middleware(RateLimitMiddleware::default())
244 /// .add_middleware(RetryMiddleware::new())
245 /// .build()
246 /// .await?;
247 /// ```
248 pub fn add_middleware<M>(mut self, middleware: M) -> Self
249 where
250 M: Middleware<D::Client> + Send + Sync + 'static,
251 {
252 self.middlewares.push(Box::new(middleware));
253 self
254 }
255
256 /// Adds a pipeline to the crawler's pipeline stack.
257 ///
258 /// Pipelines process scraped items after they are extracted by the spider.
259 /// They can be used for validation, transformation, deduplication, or
260 /// storage (e.g., writing to files or databases).
261 ///
262 /// ## Example
263 ///
264 /// ```rust,ignore
265 /// let crawler = CrawlerBuilder::new(MySpider)
266 /// .add_pipeline(ConsolePipeline::new())
267 /// .add_pipeline(JsonPipeline::new("output.json")?)
268 /// .build()
269 /// .await?;
270 /// ```
271 pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
272 where
273 P: Pipeline<S::Item> + 'static,
274 {
275 self.pipelines.push(Box::new(pipeline));
276 self
277 }
278
279 /// Sets the path for saving and loading checkpoints.
280 ///
281 /// When enabled, the crawler periodically saves its state to this file,
282 /// allowing crawls to be resumed after interruption.
283 ///
284 /// Requires the `checkpoint` feature to be enabled.
285 pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
286 self.checkpoint_path = Some(path.as_ref().to_path_buf());
287 self
288 }
289
290 /// Sets the interval between automatic checkpoint saves.
291 ///
292 /// When enabled, the crawler saves its state at this interval.
293 /// Shorter intervals provide more frequent recovery points but may
294 /// impact performance.
295 ///
296 /// Requires the `checkpoint` feature to be enabled.
297 pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
298 self.checkpoint_interval = Some(interval);
299 self
300 }
301
    /// Builds the [`Crawler`] instance.
    ///
    /// This method finalizes the crawler configuration and initializes all
    /// components. It performs validation (via `take_spider`) and sets up
    /// default values where necessary (via `init_default_pipeline`).
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError::ConfigurationError`] if:
    /// - `max_concurrent_downloads` is 0
    /// - `parser_workers` is 0
    /// - No spider was provided to the builder
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let crawler = CrawlerBuilder::new(MySpider)
    ///     .max_concurrent_downloads(10)
    ///     .build()
    ///     .await?;
    /// ```
    // Exactly one of the four cfg blocks below is compiled in, one per
    // combination of the `checkpoint` and `cookie-store` features. Each
    // calls a `Crawler::new` with a matching (presumably cfg-gated)
    // signature — keep the argument lists in sync with `Crawler::new`.
    #[allow(unused_variables)]
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        // Validate config and extract the spider; fall back to a console
        // pipeline if the user registered none.
        let spider = self.take_spider()?;
        self.init_default_pipeline();

        // Both checkpointing and cookie persistence: restore scheduler state
        // and cookies from disk, pass both through to the crawler.
        #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
        {
            let (scheduler_state, cookie_store) = self.restore_checkpoint().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.pipelines,
                self.config.max_concurrent_downloads,
                self.config.parser_workers,
                self.config.max_concurrent_pipelines,
                self.config.channel_capacity,
                self.checkpoint_path.take(),
                self.checkpoint_interval,
                stats,
                // Start from an empty cookie store when the checkpoint had none.
                Arc::new(tokio::sync::RwLock::new(
                    cookie_store.unwrap_or_default(),
                )),
            );
            Ok(crawler)
        }

        // Checkpointing without cookie persistence: restore scheduler state
        // only; the cookie half of the tuple is always `None` here.
        #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
        {
            let (scheduler_state, _cookie_store) = self.restore_checkpoint().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.pipelines,
                self.config.max_concurrent_downloads,
                self.config.parser_workers,
                self.config.max_concurrent_pipelines,
                self.config.channel_capacity,
                self.checkpoint_path.take(),
                self.checkpoint_interval,
                stats,
            );
            Ok(crawler)
        }

        // Cookie persistence without checkpointing: restore_checkpoint is a
        // stub yielding a default cookie store; no scheduler state exists, so
        // the scheduler starts fresh (`None::<()>` — unit stands in for the
        // absent checkpoint type).
        #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
        {
            let (_scheduler_state, cookie_store) = self.restore_checkpoint().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.pipelines,
                self.config.max_concurrent_downloads,
                self.config.parser_workers,
                self.config.max_concurrent_pipelines,
                self.config.channel_capacity,
                stats,
                Arc::new(tokio::sync::RwLock::new(
                    cookie_store.unwrap_or_default(),
                )),
            );
            Ok(crawler)
        }

        // Neither feature: restore_checkpoint is a no-op and the crawler is
        // built with the minimal argument list.
        #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
        {
            let (_scheduler_state, _cookie_store) = self.restore_checkpoint().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.pipelines,
                self.config.max_concurrent_downloads,
                self.config.parser_workers,
                self.config.max_concurrent_pipelines,
                self.config.channel_capacity,
                stats,
            );
            Ok(crawler)
        }
    }
431
    /// Restores checkpoint state from disk.
    ///
    /// This internal method loads a previously saved checkpoint file and
    /// restores the scheduler state, pipeline states, and cookie store.
    ///
    /// # Returns
    ///
    /// Returns a tuple of `(Option<SchedulerCheckpoint>, Option<CookieStore>)`.
    /// If no checkpoint path is configured or the file doesn't exist, returns
    /// `None` values.
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError`] if pipeline state restoration fails.
    /// Note: Checkpoint file read/deserialization errors are logged as warnings
    /// but do not fail the operation—the crawl proceeds without checkpoint data.
    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
    async fn restore_checkpoint(
        &mut self,
    ) -> Result<(Option<SchedulerCheckpoint>, Option<crate::CookieStore>), SpiderError> {
        let mut scheduler_state = None;
        let mut pipeline_states = None;
        let mut cookie_store = None;

        if let Some(path) = &self.checkpoint_path {
            debug!("Attempting to load checkpoint from {:?}", path);
            // Missing or corrupt checkpoints are deliberately non-fatal:
            // warn and start the crawl from scratch instead of erroring.
            match fs::read(path) {
                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
                    Ok(checkpoint) => {
                        scheduler_state = Some(checkpoint.scheduler);
                        pipeline_states = Some(checkpoint.pipelines);
                        cookie_store = Some(checkpoint.cookie_store);
                    }
                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
                },
                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
            }
        }

        // Hand each saved pipeline state back to the pipeline with the
        // matching name; unlike file errors, a failed restore_state aborts.
        if let Some(states) = pipeline_states {
            for (name, state) in states {
                if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
                    pipeline.restore_state(state).await?;
                } else {
                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
                }
            }
        }

        Ok((scheduler_state, cookie_store))
    }
483
484 /// Restores checkpoint state from disk (without cookie store support).
485 ///
486 /// This internal method loads a previously saved checkpoint file and
487 /// restores the scheduler state and pipeline states.
488 ///
489 /// # Returns
490 ///
491 /// Returns a tuple of `(Option<SchedulerCheckpoint>, Option<()>)`.
492 /// If no checkpoint path is configured or the file doesn't exist, returns
493 /// `None` values.
494 ///
495 /// # Errors
496 ///
497 /// Returns a [`SpiderError`] if pipeline state restoration fails.
498 /// Note: Checkpoint file read/deserialization errors are logged as warnings
499 /// but do not fail the operation—the crawl proceeds without checkpoint data.
500 #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
501 async fn restore_checkpoint(
502 &mut self,
503 ) -> Result<(Option<SchedulerCheckpoint>, Option<()>), SpiderError> {
504 let mut scheduler_state = None;
505 let mut pipeline_states = None;
506
507 if let Some(path) = &self.checkpoint_path {
508 debug!("Attempting to load checkpoint from {:?}", path);
509 match fs::read(path) {
510 Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
511 Ok(checkpoint) => {
512 scheduler_state = Some(checkpoint.scheduler);
513 pipeline_states = Some(checkpoint.pipelines);
514 }
515 Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
516 },
517 Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
518 }
519 }
520
521 if let Some(states) = pipeline_states {
522 for (name, state) in states {
523 if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
524 pipeline.restore_state(state).await?;
525 } else {
526 warn!("Checkpoint contains state for unknown pipeline: {}", name);
527 }
528 }
529 }
530
531 Ok((scheduler_state, None))
532 }
533
534 /// No-op checkpoint restoration when checkpoint feature is disabled.
535 #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
536 async fn restore_checkpoint(&mut self) -> Result<(Option<()>, Option<()>), SpiderError> {
537 Ok((None, None))
538 }
539
540 /// Returns cookie store default when only cookie-store feature is enabled.
541 #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
542 async fn restore_checkpoint(&mut self) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
543 Ok((None, Some(crate::CookieStore::default())))
544 }
545
546 /// Extracts the spider from the builder.
547 ///
548 /// # Errors
549 ///
550 /// Returns a [`SpiderError::ConfigurationError`] if:
551 /// - `max_concurrent_downloads` is 0
552 /// - `parser_workers` is 0
553 /// - No spider was provided
554 fn take_spider(&mut self) -> Result<S, SpiderError> {
555 if self.config.max_concurrent_downloads == 0 {
556 return Err(SpiderError::ConfigurationError(
557 "max_concurrent_downloads must be greater than 0.".to_string(),
558 ));
559 }
560 if self.config.parser_workers == 0 {
561 return Err(SpiderError::ConfigurationError(
562 "parser_workers must be greater than 0.".to_string(),
563 ));
564 }
565 self.spider.take().ok_or_else(|| {
566 SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
567 })
568 }
569
570 /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
571 ///
572 /// This ensures that scraped items are always output somewhere, even if
573 /// no explicit pipelines are configured by the user.
574 fn init_default_pipeline(&mut self) {
575 if self.pipelines.is_empty() {
576 use spider_pipeline::console::ConsolePipeline;
577 self.pipelines.push(Box::new(ConsolePipeline::new()));
578 }
579 }
580}
581