// spider_core/builder.rs
//! # Builder Module
//!
//! Provides the [`CrawlerBuilder`], a fluent API for constructing and configuring
//! [`Crawler`](crate::Crawler) instances with customizable settings and components.
//!
//! ## Overview
//!
//! The [`CrawlerBuilder`] simplifies the process of assembling various `spider-core`
//! components into a fully configured web crawler. It provides a flexible,
//! ergonomic interface for setting up all aspects of the crawling process.
//!
//! ## Key Features
//!
//! - **Concurrency Configuration**: Control the number of concurrent downloads,
//!   parsing workers, and pipeline processors
//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state
//!   (requires `checkpoint` feature)
//! - **Statistics Integration**: Initialize and connect the [`StatCollector`](crate::stats::StatCollector)
//! - **Default Handling**: Automatic addition of essential middlewares when needed
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::CrawlerBuilder;
//! use spider_middleware::rate_limit::RateLimitMiddleware;
//! use spider_pipeline::console::ConsolePipeline;
//! use spider_util::error::SpiderError;
//!
//! async fn setup_crawler() -> Result<(), SpiderError> {
//!     let crawler = CrawlerBuilder::new(MySpider)
//!         .max_concurrent_downloads(10)
//!         .max_parser_workers(4)
//!         .add_middleware(RateLimitMiddleware::default())
//!         .add_pipeline(ConsolePipeline::new())
//!         .with_checkpoint_path("./crawl.checkpoint")
//!         .build()
//!         .await?;
//!
//!     crawler.start_crawl().await
//! }
//! ```

44use crate::Downloader;
45use crate::ReqwestClientDownloader;
46use crate::config::{CheckpointConfig, CrawlerConfig};
47use crate::scheduler::Scheduler;
48use crate::spider::Spider;
49use spider_middleware::middleware::Middleware;
50use spider_pipeline::pipeline::Pipeline;
51
#[cfg(feature = "checkpoint")]
/// Payload recovered from a checkpoint file, as produced by
/// `CrawlerBuilder::restore_checkpoint`: the optional scheduler snapshot and
/// the optional map of pipeline name -> serialized pipeline state.
type RestoreResult = (
    Option<crate::SchedulerCheckpoint>,
    Option<std::collections::HashMap<String, serde_json::Value>>,
);
57use spider_util::error::SpiderError;
58use std::marker::PhantomData;
59use std::path::Path;
60use std::sync::Arc;
61use std::time::Duration;
62
63use super::Crawler;
64use crate::stats::StatCollector;
65use log::LevelFilter;
66#[cfg(feature = "checkpoint")]
67use log::{debug, warn};
68
69#[cfg(feature = "checkpoint")]
70use rmp_serde;
71#[cfg(feature = "checkpoint")]
72use std::fs;
73
/// A fluent builder for constructing [`Crawler`] instances.
///
/// `CrawlerBuilder` provides a chainable API for configuring all aspects
/// of a web crawler, including concurrency settings, middleware, pipelines,
/// and checkpoint options.
///
/// ## Type Parameters
///
/// - `S`: The [`Spider`] implementation type
/// - `D`: The [`Downloader`] implementation type
///
/// ## Example
///
/// ```rust,ignore
/// # use spider_core::{CrawlerBuilder, Spider};
/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
/// # struct MySpider;
/// # #[async_trait::async_trait]
/// # impl Spider for MySpider {
/// #     type Item = String;
/// #     type State = ();
/// #     fn start_urls(&self) -> Vec<&'static str> { vec![] }
/// #     async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
/// # }
/// let builder = CrawlerBuilder::new(MySpider)
///     .max_concurrent_downloads(8)
///     .max_parser_workers(4);
/// ```
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    // Core crawler settings (concurrency limits, channel capacity, live stats).
    config: CrawlerConfig,
    // Checkpoint persistence settings (file path and save interval).
    checkpoint_config: CheckpointConfig,
    // Downloader used to fetch HTTP responses.
    downloader: D,
    // Spider to run; `None` until supplied, moved out during `build`.
    spider: Option<S>,
    // Middlewares, kept in insertion order.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    // Item pipelines, kept in insertion order.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Log level override for spider-* crates, applied during `build`.
    log_level: Option<LevelFilter>,
    // NOTE(review): `spider: Option<S>` already uses `S`, so this marker looks
    // redundant — kept because removing a field would change construction sites.
    _phantom: PhantomData<S>,
}
115
116impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
117 fn default() -> Self {
118 Self {
119 config: CrawlerConfig::default(),
120 checkpoint_config: CheckpointConfig::default(),
121 downloader: ReqwestClientDownloader::default(),
122 spider: None,
123 middlewares: Vec::new(),
124 pipelines: Vec::new(),
125 log_level: None,
126 _phantom: PhantomData,
127 }
128 }
129}
130
131impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
132 /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
133 ///
134 /// ## Example
135 ///
136 /// ```rust,ignore
137 /// let crawler = CrawlerBuilder::new(MySpider)
138 /// .build()
139 /// .await?;
140 /// ```
141 pub fn new(spider: S) -> Self {
142 Self {
143 spider: Some(spider),
144 ..Default::default()
145 }
146 }
147}
148
149impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
150 #[cfg(feature = "checkpoint")]
151 fn load_checkpoint_from_path(&self, path: &std::path::Path) -> Option<crate::Checkpoint> {
152 match fs::read(path) {
153 Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
154 Ok(checkpoint) => Some(checkpoint),
155 Err(e) => {
156 warn!("Failed to deserialize checkpoint from {:?}: {}", path, e);
157 None
158 }
159 },
160 Err(e) => {
161 warn!("Failed to read checkpoint file {:?}: {}", path, e);
162 None
163 }
164 }
165 }
166
167 /// Sets the maximum number of concurrent downloads.
168 ///
169 /// This controls how many HTTP requests can be in-flight simultaneously.
170 /// Higher values increase throughput but may overwhelm target servers.
171 ///
172 /// ## Default
173 ///
174 /// Defaults to the number of CPU cores, with a minimum of 16.
175 pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
176 self.config.max_concurrent_downloads = limit;
177 self
178 }
179
180 /// Sets the number of worker tasks dedicated to parsing responses.
181 ///
182 /// Parser workers process HTTP responses concurrently, calling the
183 /// spider's [`parse`](Spider::parse) method to extract items and
184 /// discover new URLs.
185 ///
186 /// ## Default
187 ///
188 /// Defaults to the number of CPU cores, clamped between 4 and 16.
189 pub fn max_parser_workers(mut self, limit: usize) -> Self {
190 self.config.parser_workers = limit;
191 self
192 }
193
194 /// Sets the maximum number of concurrent item processing pipelines.
195 ///
196 /// This controls how many items can be processed by pipelines simultaneously.
197 ///
198 /// ## Default
199 ///
200 /// Defaults to the number of CPU cores, with a maximum of 8.
201 pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
202 self.config.max_concurrent_pipelines = limit;
203 self
204 }
205
206 /// Sets the capacity of internal communication channels.
207 ///
208 /// This controls the buffer size for channels between the downloader,
209 /// parser, and pipeline components. Higher values can improve throughput
210 /// at the cost of increased memory usage.
211 ///
212 /// ## Default
213 ///
214 /// Defaults to 1000.
215 pub fn channel_capacity(mut self, capacity: usize) -> Self {
216 self.config.channel_capacity = capacity;
217 self
218 }
219
220 /// Enables or disables live, in-place statistics updates on terminal stdout.
221 ///
222 /// When enabled, spider-* logs are forced to `LevelFilter::Off` during build
223 /// to avoid interleaving with the live terminal renderer.
224 pub fn live_stats(mut self, enabled: bool) -> Self {
225 self.config.live_stats = enabled;
226 self
227 }
228
229 /// Sets the refresh interval for live statistics updates.
230 pub fn live_stats_interval(mut self, interval: Duration) -> Self {
231 self.config.live_stats_interval = interval;
232 self
233 }
234
235 /// Sets a custom downloader implementation.
236 ///
237 /// Use this method to provide a custom [`Downloader`] implementation
238 /// instead of the default [`ReqwestClientDownloader`].
239 pub fn downloader(mut self, downloader: D) -> Self {
240 self.downloader = downloader;
241 self
242 }
243
244 /// Adds a middleware to the crawler's middleware stack.
245 ///
246 /// Middlewares intercept and modify requests before they are sent and
247 /// responses after they are received. They are executed in the order
248 /// they are added.
249 ///
250 /// ## Example
251 ///
252 /// ```rust,ignore
253 /// let crawler = CrawlerBuilder::new(MySpider)
254 /// .add_middleware(RateLimitMiddleware::default())
255 /// .add_middleware(RetryMiddleware::new())
256 /// .build()
257 /// .await?;
258 /// ```
259 pub fn add_middleware<M>(mut self, middleware: M) -> Self
260 where
261 M: Middleware<D::Client> + Send + Sync + 'static,
262 {
263 self.middlewares.push(Box::new(middleware));
264 self
265 }
266
267 /// Adds a pipeline to the crawler's pipeline stack.
268 ///
269 /// Pipelines process scraped items after they are extracted by the spider.
270 /// They can be used for validation, transformation, deduplication, or
271 /// storage (e.g., writing to files or databases).
272 ///
273 /// ## Example
274 ///
275 /// ```rust,ignore
276 /// let crawler = CrawlerBuilder::new(MySpider)
277 /// .add_pipeline(ConsolePipeline::new())
278 /// .add_pipeline(JsonPipeline::new("output.json")?)
279 /// .build()
280 /// .await?;
281 /// ```
282 pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
283 where
284 P: Pipeline<S::Item> + 'static,
285 {
286 self.pipelines.push(Box::new(pipeline));
287 self
288 }
289
290 /// Sets the log level for `spider-*` library crates.
291 ///
292 /// This configures the logging level specifically for the spider-lib ecosystem
293 /// (spider-core, spider-middleware, spider-pipeline, spider-util, spider-downloader).
294 /// Logs from other dependencies (e.g., reqwest, tokio) will not be affected.
295 ///
296 /// ## Log Levels
297 ///
298 /// - `LevelFilter::Error` - Only error messages
299 /// - `LevelFilter::Warn` - Warnings and errors
300 /// - `LevelFilter::Info` - Informational messages, warnings, and errors
301 /// - `LevelFilter::Debug` - Debug messages and above
302 /// - `LevelFilter::Trace` - All messages including trace
303 ///
304 /// ## Example
305 ///
306 /// ```rust,ignore
307 /// use log::LevelFilter;
308 ///
309 /// let crawler = CrawlerBuilder::new(MySpider)
310 /// .log_level(LevelFilter::Debug)
311 /// .build()
312 /// .await?;
313 /// ```
314 pub fn log_level(mut self, level: LevelFilter) -> Self {
315 self.log_level = Some(level);
316 self
317 }
318
319 /// Sets the path for saving and loading checkpoints.
320 ///
321 /// When enabled, the crawler periodically saves its state to this file,
322 /// allowing crawls to be resumed after interruption.
323 ///
324 /// Requires the `checkpoint` feature to be enabled.
325 pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
326 self.checkpoint_config.path = Some(path.as_ref().to_path_buf());
327 self
328 }
329
330 /// Sets the interval between automatic checkpoint saves.
331 ///
332 /// When enabled, the crawler saves its state at this interval.
333 /// Shorter intervals provide more frequent recovery points but may
334 /// impact performance.
335 ///
336 /// Requires the `checkpoint` feature to be enabled.
337 pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
338 self.checkpoint_config.interval = Some(interval);
339 self
340 }
341
    /// Builds the [`Crawler`] instance.
    ///
    /// Finalizes the configuration, wires together the scheduler, downloader,
    /// middlewares, pipelines, and statistics collector, and — when the
    /// relevant cargo features are enabled — restores checkpoint and
    /// cookie-store state from disk.
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError::ConfigurationError`] if:
    /// - `max_concurrent_downloads` is 0
    /// - `parser_workers` is 0
    /// - No spider was provided to the builder
    /// - `CrawlerConfig::validate` rejects the configuration
    ///
    /// With the `checkpoint` feature it may also propagate errors from
    /// `Pipeline::restore_state`.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let crawler = CrawlerBuilder::new(MySpider)
    ///     .max_concurrent_downloads(10)
    ///     .build()
    ///     .await?;
    /// ```
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        // Fails fast on zero concurrency limits or a missing spider, and
        // moves the spider out of the builder.
        let spider = self.take_spider()?;
        // Guarantee at least one pipeline (a ConsolePipeline) is installed.
        self.init_default_pipeline();

        // Live stats redraw on stdout and should not interleave with spider-* logs.
        // Force spider-* logs to Off when live stats mode is enabled.
        let effective_log_level = if self.config.live_stats {
            Some(LevelFilter::Off)
        } else {
            self.log_level
        };

        // Initialize logging for spider-* crates if an effective log level is configured.
        if let Some(level) = effective_log_level {
            self.init_logging(level);
        }

        // Validate config
        self.config
            .validate()
            .map_err(SpiderError::ConfigurationError)?;

        // Restore checkpoint and get scheduler state
        #[cfg(feature = "checkpoint")]
        let (scheduler_state, pipeline_states) = self.restore_checkpoint()?;
        // Without the checkpoint feature the scheduler starts from scratch.
        #[cfg(not(feature = "checkpoint"))]
        let scheduler_state: Option<()> = None;

        // Restore pipeline states if checkpoint was loaded; saved states are
        // matched to the registered pipelines by `Pipeline::name()`.
        #[cfg(feature = "checkpoint")]
        {
            if let Some(states) = pipeline_states {
                for (name, state) in states {
                    if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
                        pipeline.restore_state(state).await?;
                    } else {
                        warn!("Checkpoint contains state for unknown pipeline: {}", name);
                    }
                }
            }
        }

        // Get cookie store if feature is enabled.
        // NOTE(review): with both `cookie-store` and `checkpoint` enabled this
        // re-reads and re-deserializes the checkpoint file a second time (see
        // `restore_cookie_store`); consider loading the checkpoint once and
        // sharing it between the two restore steps.
        #[cfg(feature = "cookie-store")]
        let cookie_store = {
            #[cfg(feature = "checkpoint")]
            {
                let (_, cookie_store) = self.restore_cookie_store().await?;
                cookie_store
            }
            #[cfg(not(feature = "checkpoint"))]
            {
                Some(crate::CookieStore::default())
            }
        };

        // Create scheduler with or without checkpoint state
        let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
        let downloader_arc = Arc::new(self.downloader);
        let stats = Arc::new(StatCollector::new());

        // Build crawler with or without cookie store based on feature flag
        #[cfg(feature = "cookie-store")]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
            // A missing cookie store (load failure or no checkpoint data)
            // falls back to the default, empty store.
            Arc::new(tokio::sync::RwLock::new(cookie_store.unwrap_or_default())),
        );

        #[cfg(not(feature = "cookie-store"))]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
        );

        Ok(crawler)
    }
461
462 /// Restores checkpoint state from disk (checkpoint feature only).
463 ///
464 /// This internal method loads a previously saved checkpoint file and
465 /// restores the scheduler state and pipeline states.
466 ///
467 /// # Returns
468 ///
469 /// Returns a tuple of `(SchedulerCheckpoint, Option<HashMap<String, Value>>)`.
470 /// If no checkpoint path is configured or the file doesn't exist, returns
471 /// default values.
472 ///
473 /// # Errors
474 ///
475 /// Returns a [`SpiderError`] if deserialization fails.
476 /// Note: Checkpoint file read/deserialization errors are logged as warnings
477 /// but do not fail the operation—the crawl proceeds without checkpoint data.
478 #[cfg(feature = "checkpoint")]
479 fn restore_checkpoint(&mut self) -> Result<RestoreResult, SpiderError> {
480 let mut scheduler_state = None;
481 let mut pipeline_states = None;
482
483 if let Some(path) = &self.checkpoint_config.path {
484 debug!("Attempting to load checkpoint from {:?}", path);
485 if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
486 scheduler_state = Some(checkpoint.scheduler);
487 pipeline_states = Some(checkpoint.pipelines);
488 }
489 }
490
491 Ok((scheduler_state, pipeline_states))
492 }
493
494 /// Restores cookie store from checkpoint (checkpoint + cookie-store features).
495 #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
496 async fn restore_cookie_store(
497 &mut self,
498 ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
499 let mut cookie_store = None;
500
501 if let Some(path) = &self.checkpoint_config.path {
502 debug!("Attempting to load cookie store from checkpoint {:?}", path);
503 if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
504 cookie_store = Some(checkpoint.cookie_store);
505 }
506 }
507
508 Ok((None, cookie_store))
509 }
510
511 /// Extracts the spider from the builder.
512 ///
513 /// # Errors
514 ///
515 /// Returns a [`SpiderError::ConfigurationError`] if:
516 /// - `max_concurrent_downloads` is 0
517 /// - `parser_workers` is 0
518 /// - No spider was provided
519 fn take_spider(&mut self) -> Result<S, SpiderError> {
520 if self.config.max_concurrent_downloads == 0 {
521 return Err(SpiderError::ConfigurationError(
522 "max_concurrent_downloads must be greater than 0.".to_string(),
523 ));
524 }
525 if self.config.parser_workers == 0 {
526 return Err(SpiderError::ConfigurationError(
527 "parser_workers must be greater than 0.".to_string(),
528 ));
529 }
530 self.spider.take().ok_or_else(|| {
531 SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
532 })
533 }
534
535 /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
536 ///
537 /// This ensures that scraped items are always output somewhere, even if
538 /// no explicit pipelines are configured by the user.
539 fn init_default_pipeline(&mut self) {
540 if self.pipelines.is_empty() {
541 use spider_pipeline::console::ConsolePipeline;
542 self.pipelines.push(Box::new(ConsolePipeline::new()));
543 }
544 }
545
546 /// Initializes logging for spider-* crates only.
547 ///
548 /// This sets up env_logger with a filter that only enables logging for
549 /// crates within the spider-lib ecosystem.
550 fn init_logging(&self, level: LevelFilter) {
551 use env_logger::{Builder, Env};
552
553 let mut builder = Builder::from_env(Env::default().default_filter_or("off"));
554
555 // Set filter specifically for spider-* crates
556 builder.filter_module("spider_core", level);
557 builder.filter_module("spider_middleware", level);
558 builder.filter_module("spider_pipeline", level);
559 builder.filter_module("spider_util", level);
560 builder.filter_module("spider_downloader", level);
561 builder.filter_module("spider_macro", level);
562
563 builder.init();
564 }
565}