// spider_core/builder.rs
1//! # Builder Module
2//!
3//! Provides the [`CrawlerBuilder`], a fluent API for constructing and configuring
4//! [`Crawler`](crate::Crawler) instances with customizable settings and components.
5//!
6//! ## Overview
7//!
8//! The [`CrawlerBuilder`] simplifies the process of assembling various `spider-core`
9//! components into a fully configured web crawler. It provides a flexible,
10//! ergonomic interface for setting up all aspects of the crawling process.
11//!
12//! ## Key Features
13//!
14//! - **Concurrency Configuration**: Control the number of concurrent downloads,
15//! parsing workers, and pipeline processors
16//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
17//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state
18//! (requires `checkpoint` feature)
19//! - **Statistics Integration**: Initialize and connect the [`StatCollector`](crate::stats::StatCollector)
20//! - **Default Handling**: Automatic addition of essential middlewares when needed
21//!
22//! ## Example
23//!
24//! ```rust,ignore
25//! use spider_core::CrawlerBuilder;
26//! use spider_middleware::rate_limit::RateLimitMiddleware;
27//! use spider_pipeline::console::ConsolePipeline;
28//! use spider_util::error::SpiderError;
29//!
30//! async fn setup_crawler() -> Result<(), SpiderError> {
31//! let crawler = CrawlerBuilder::new(MySpider)
32//! .max_concurrent_downloads(10)
33//! .max_parser_workers(4)
34//! .add_middleware(RateLimitMiddleware::default())
35//! .add_pipeline(ConsolePipeline::new())
36//! .with_checkpoint_path("./crawl.checkpoint")
37//! .build()
38//! .await?;
39//!
40//! crawler.start_crawl().await
41//! }
42//! ```
43
44use crate::Downloader;
45use crate::ReqwestClientDownloader;
46use crate::config::{CheckpointConfig, CrawlerConfig};
47use crate::scheduler::Scheduler;
48use crate::spider::Spider;
49use spider_middleware::middleware::Middleware;
50use spider_pipeline::pipeline::Pipeline;
51
/// Outcome of [`CrawlerBuilder::restore_checkpoint`]: the restored scheduler
/// state and the saved per-pipeline states (keyed by pipeline name).
/// Each side is `None` when no checkpoint was found or it could not be decoded.
#[cfg(feature = "checkpoint")]
type RestoreResult = (
    Option<crate::SchedulerCheckpoint>,
    Option<std::collections::HashMap<String, serde_json::Value>>,
);
57use spider_util::error::SpiderError;
58use std::marker::PhantomData;
59use std::path::Path;
60use std::sync::Arc;
61use std::time::Duration;
62
63use super::Crawler;
64use crate::stats::StatCollector;
65use log::LevelFilter;
66#[cfg(feature = "checkpoint")]
67use log::{debug, warn};
68
69#[cfg(feature = "checkpoint")]
70use rmp_serde;
71#[cfg(feature = "checkpoint")]
72use std::fs;
73
74/// A fluent builder for constructing [`Crawler`] instances.
75///
76/// `CrawlerBuilder` provides a chainable API for configuring all aspects
77/// of a web crawler, including concurrency settings, middleware, pipelines,
78/// and checkpoint options.
79///
80/// ## Type Parameters
81///
82/// - `S`: The [`Spider`] implementation type
83/// - `D`: The [`Downloader`] implementation type
84///
85/// ## Example
86///
87/// ```rust,ignore
88/// # use spider_core::{CrawlerBuilder, Spider};
89/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
90/// # struct MySpider;
91/// # #[async_trait::async_trait]
92/// # impl Spider for MySpider {
93/// # type Item = String;
94/// # type State = ();
95/// # fn start_urls(&self) -> Vec<&'static str> { vec![] }
96/// # async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
97/// # }
98/// let builder = CrawlerBuilder::new(MySpider)
99/// .max_concurrent_downloads(8)
100/// .max_parser_workers(4);
101/// ```
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    /// Concurrency, worker-count, and channel settings for the crawler.
    config: CrawlerConfig,
    /// Checkpoint path/interval settings (consumed when the `checkpoint` feature is on).
    checkpoint_config: CheckpointConfig,
    /// HTTP downloader used to fetch pages.
    downloader: D,
    /// The user's spider; `None` until supplied, moved out during `build`.
    spider: Option<S>,
    /// Request/response middlewares, run in the order they were added.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    /// Item pipelines that process every scraped item.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Log level for the `spider_*` crates; logging is left untouched when `None`.
    log_level: Option<LevelFilter>,
    /// Type marker for `S` (S also appears in `spider`/`pipelines`).
    _phantom: PhantomData<S>,
}
115
116impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
117 fn default() -> Self {
118 Self {
119 config: CrawlerConfig::default(),
120 checkpoint_config: CheckpointConfig::default(),
121 downloader: ReqwestClientDownloader::default(),
122 spider: None,
123 middlewares: Vec::new(),
124 pipelines: Vec::new(),
125 log_level: None,
126 _phantom: PhantomData,
127 }
128 }
129}
130
131impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
132 /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
133 ///
134 /// ## Example
135 ///
136 /// ```rust,ignore
137 /// let crawler = CrawlerBuilder::new(MySpider)
138 /// .build()
139 /// .await?;
140 /// ```
141 pub fn new(spider: S) -> Self {
142 Self {
143 spider: Some(spider),
144 ..Default::default()
145 }
146 }
147}
148
149impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
150 #[cfg(feature = "checkpoint")]
151 fn load_checkpoint_from_path(&self, path: &std::path::Path) -> Option<crate::Checkpoint> {
152 match fs::read(path) {
153 Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
154 Ok(checkpoint) => Some(checkpoint),
155 Err(e) => {
156 warn!("Failed to deserialize checkpoint from {:?}: {}", path, e);
157 None
158 }
159 },
160 Err(e) => {
161 warn!("Failed to read checkpoint file {:?}: {}", path, e);
162 None
163 }
164 }
165 }
166
167 /// Sets the maximum number of concurrent downloads.
168 ///
169 /// This controls how many HTTP requests can be in-flight simultaneously.
170 /// Higher values increase throughput but may overwhelm target servers.
171 ///
172 /// ## Default
173 ///
174 /// Defaults to the number of CPU cores, with a minimum of 16.
175 pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
176 self.config.max_concurrent_downloads = limit;
177 self
178 }
179
180 /// Sets the number of worker tasks dedicated to parsing responses.
181 ///
182 /// Parser workers process HTTP responses concurrently, calling the
183 /// spider's [`parse`](Spider::parse) method to extract items and
184 /// discover new URLs.
185 ///
186 /// ## Default
187 ///
188 /// Defaults to the number of CPU cores, clamped between 4 and 16.
189 pub fn max_parser_workers(mut self, limit: usize) -> Self {
190 self.config.parser_workers = limit;
191 self
192 }
193
194 /// Sets the maximum number of concurrent item processing pipelines.
195 ///
196 /// This controls how many items can be processed by pipelines simultaneously.
197 ///
198 /// ## Default
199 ///
200 /// Defaults to the number of CPU cores, with a maximum of 8.
201 pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
202 self.config.max_concurrent_pipelines = limit;
203 self
204 }
205
206 /// Sets the capacity of internal communication channels.
207 ///
208 /// This controls the buffer size for channels between the downloader,
209 /// parser, and pipeline components. Higher values can improve throughput
210 /// at the cost of increased memory usage.
211 ///
212 /// ## Default
213 ///
214 /// Defaults to 1000.
215 pub fn channel_capacity(mut self, capacity: usize) -> Self {
216 self.config.channel_capacity = capacity;
217 self
218 }
219
220 /// Sets a custom downloader implementation.
221 ///
222 /// Use this method to provide a custom [`Downloader`] implementation
223 /// instead of the default [`ReqwestClientDownloader`].
224 pub fn downloader(mut self, downloader: D) -> Self {
225 self.downloader = downloader;
226 self
227 }
228
229 /// Adds a middleware to the crawler's middleware stack.
230 ///
231 /// Middlewares intercept and modify requests before they are sent and
232 /// responses after they are received. They are executed in the order
233 /// they are added.
234 ///
235 /// ## Example
236 ///
237 /// ```rust,ignore
238 /// let crawler = CrawlerBuilder::new(MySpider)
239 /// .add_middleware(RateLimitMiddleware::default())
240 /// .add_middleware(RetryMiddleware::new())
241 /// .build()
242 /// .await?;
243 /// ```
244 pub fn add_middleware<M>(mut self, middleware: M) -> Self
245 where
246 M: Middleware<D::Client> + Send + Sync + 'static,
247 {
248 self.middlewares.push(Box::new(middleware));
249 self
250 }
251
252 /// Adds a pipeline to the crawler's pipeline stack.
253 ///
254 /// Pipelines process scraped items after they are extracted by the spider.
255 /// They can be used for validation, transformation, deduplication, or
256 /// storage (e.g., writing to files or databases).
257 ///
258 /// ## Example
259 ///
260 /// ```rust,ignore
261 /// let crawler = CrawlerBuilder::new(MySpider)
262 /// .add_pipeline(ConsolePipeline::new())
263 /// .add_pipeline(JsonPipeline::new("output.json")?)
264 /// .build()
265 /// .await?;
266 /// ```
267 pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
268 where
269 P: Pipeline<S::Item> + 'static,
270 {
271 self.pipelines.push(Box::new(pipeline));
272 self
273 }
274
275 /// Sets the log level for `spider-*` library crates.
276 ///
277 /// This configures the logging level specifically for the spider-lib ecosystem
278 /// (spider-core, spider-middleware, spider-pipeline, spider-util, spider-downloader).
279 /// Logs from other dependencies (e.g., reqwest, tokio) will not be affected.
280 ///
281 /// ## Log Levels
282 ///
283 /// - `LevelFilter::Error` - Only error messages
284 /// - `LevelFilter::Warn` - Warnings and errors
285 /// - `LevelFilter::Info` - Informational messages, warnings, and errors
286 /// - `LevelFilter::Debug` - Debug messages and above
287 /// - `LevelFilter::Trace` - All messages including trace
288 ///
289 /// ## Example
290 ///
291 /// ```rust,ignore
292 /// use log::LevelFilter;
293 ///
294 /// let crawler = CrawlerBuilder::new(MySpider)
295 /// .log_level(LevelFilter::Debug)
296 /// .build()
297 /// .await?;
298 /// ```
299 pub fn log_level(mut self, level: LevelFilter) -> Self {
300 self.log_level = Some(level);
301 self
302 }
303
304 /// Sets the path for saving and loading checkpoints.
305 ///
306 /// When enabled, the crawler periodically saves its state to this file,
307 /// allowing crawls to be resumed after interruption.
308 ///
309 /// Requires the `checkpoint` feature to be enabled.
310 pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
311 self.checkpoint_config.path = Some(path.as_ref().to_path_buf());
312 self
313 }
314
315 /// Sets the interval between automatic checkpoint saves.
316 ///
317 /// When enabled, the crawler saves its state at this interval.
318 /// Shorter intervals provide more frequent recovery points but may
319 /// impact performance.
320 ///
321 /// Requires the `checkpoint` feature to be enabled.
322 pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
323 self.checkpoint_config.interval = Some(interval);
324 self
325 }
326
    /// Builds the [`Crawler`] instance.
    ///
    /// This method finalizes the crawler configuration and initializes all
    /// components. It performs validation and sets up default values where
    /// necessary.
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError::ConfigurationError`] if:
    /// - `max_concurrent_downloads` is 0
    /// - `parser_workers` is 0
    /// - No spider was provided to the builder
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let crawler = CrawlerBuilder::new(MySpider)
    ///     .max_concurrent_downloads(10)
    ///     .build()
    ///     .await?;
    /// ```
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        // Checks the basic concurrency settings and takes ownership of the spider.
        let spider = self.take_spider()?;
        // Falls back to a ConsolePipeline when the user registered none.
        self.init_default_pipeline();

        // Initialize logging for spider-* crates if log level is configured
        if let Some(level) = self.log_level {
            self.init_logging(level);
        }

        // Validate config
        self.config
            .validate()
            .map_err(SpiderError::ConfigurationError)?;

        // Restore checkpoint and get scheduler state
        #[cfg(feature = "checkpoint")]
        let (scheduler_state, pipeline_states) = self.restore_checkpoint()?;
        #[cfg(not(feature = "checkpoint"))]
        let scheduler_state: Option<()> = None;

        // Restore pipeline states if checkpoint was loaded.
        // Saved states are matched to registered pipelines by `Pipeline::name()`.
        #[cfg(feature = "checkpoint")]
        {
            if let Some(states) = pipeline_states {
                for (name, state) in states {
                    if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
                        pipeline.restore_state(state).await?;
                    } else {
                        // State for a pipeline that is no longer registered is
                        // dropped with a warning rather than failing the build.
                        warn!("Checkpoint contains state for unknown pipeline: {}", name);
                    }
                }
            }
        }

        // Get cookie store if feature is enabled
        // NOTE(review): with both `checkpoint` and `cookie-store` enabled, this
        // re-reads and re-deserializes the same checkpoint file that
        // `restore_checkpoint` just loaded — consider loading the file once and
        // sharing the parsed `Checkpoint`.
        #[cfg(feature = "cookie-store")]
        let cookie_store = {
            #[cfg(feature = "checkpoint")]
            {
                let (_, cookie_store) = self.restore_cookie_store().await?;
                cookie_store
            }
            #[cfg(not(feature = "checkpoint"))]
            {
                Some(crate::CookieStore::default())
            }
        };

        // Create scheduler with or without checkpoint state
        let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
        let downloader_arc = Arc::new(self.downloader);
        let stats = Arc::new(StatCollector::new());

        // Build crawler with or without cookie store based on feature flag
        #[cfg(feature = "cookie-store")]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
            // A checkpoint without a cookie store falls back to a fresh one.
            Arc::new(tokio::sync::RwLock::new(cookie_store.unwrap_or_default())),
        );

        #[cfg(not(feature = "cookie-store"))]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
        );

        Ok(crawler)
    }
438
439 /// Restores checkpoint state from disk (checkpoint feature only).
440 ///
441 /// This internal method loads a previously saved checkpoint file and
442 /// restores the scheduler state and pipeline states.
443 ///
444 /// # Returns
445 ///
446 /// Returns a tuple of `(SchedulerCheckpoint, Option<HashMap<String, Value>>)`.
447 /// If no checkpoint path is configured or the file doesn't exist, returns
448 /// default values.
449 ///
450 /// # Errors
451 ///
452 /// Returns a [`SpiderError`] if deserialization fails.
453 /// Note: Checkpoint file read/deserialization errors are logged as warnings
454 /// but do not fail the operation—the crawl proceeds without checkpoint data.
455 #[cfg(feature = "checkpoint")]
456 fn restore_checkpoint(&mut self) -> Result<RestoreResult, SpiderError> {
457 let mut scheduler_state = None;
458 let mut pipeline_states = None;
459
460 if let Some(path) = &self.checkpoint_config.path {
461 debug!("Attempting to load checkpoint from {:?}", path);
462 if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
463 scheduler_state = Some(checkpoint.scheduler);
464 pipeline_states = Some(checkpoint.pipelines);
465 }
466 }
467
468 Ok((scheduler_state, pipeline_states))
469 }
470
471 /// Restores cookie store from checkpoint (checkpoint + cookie-store features).
472 #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
473 async fn restore_cookie_store(
474 &mut self,
475 ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
476 let mut cookie_store = None;
477
478 if let Some(path) = &self.checkpoint_config.path {
479 debug!("Attempting to load cookie store from checkpoint {:?}", path);
480 if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
481 cookie_store = Some(checkpoint.cookie_store);
482 }
483 }
484
485 Ok((None, cookie_store))
486 }
487
488 /// Extracts the spider from the builder.
489 ///
490 /// # Errors
491 ///
492 /// Returns a [`SpiderError::ConfigurationError`] if:
493 /// - `max_concurrent_downloads` is 0
494 /// - `parser_workers` is 0
495 /// - No spider was provided
496 fn take_spider(&mut self) -> Result<S, SpiderError> {
497 if self.config.max_concurrent_downloads == 0 {
498 return Err(SpiderError::ConfigurationError(
499 "max_concurrent_downloads must be greater than 0.".to_string(),
500 ));
501 }
502 if self.config.parser_workers == 0 {
503 return Err(SpiderError::ConfigurationError(
504 "parser_workers must be greater than 0.".to_string(),
505 ));
506 }
507 self.spider.take().ok_or_else(|| {
508 SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
509 })
510 }
511
512 /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
513 ///
514 /// This ensures that scraped items are always output somewhere, even if
515 /// no explicit pipelines are configured by the user.
516 fn init_default_pipeline(&mut self) {
517 if self.pipelines.is_empty() {
518 use spider_pipeline::console::ConsolePipeline;
519 self.pipelines.push(Box::new(ConsolePipeline::new()));
520 }
521 }
522
523 /// Initializes logging for spider-* crates only.
524 ///
525 /// This sets up env_logger with a filter that only enables logging for
526 /// crates within the spider-lib ecosystem.
527 fn init_logging(&self, level: LevelFilter) {
528 use env_logger::{Builder, Env};
529
530 let mut builder = Builder::from_env(Env::default().default_filter_or("off"));
531
532 // Set filter specifically for spider-* crates
533 builder.filter_module("spider_core", level);
534 builder.filter_module("spider_middleware", level);
535 builder.filter_module("spider_pipeline", level);
536 builder.filter_module("spider_util", level);
537 builder.filter_module("spider_downloader", level);
538 builder.filter_module("spider_macro", level);
539
540 builder.init();
541 }
542}