// spider_core/builder.rs
//! # Builder Module
//!
//! Provides the [`CrawlerBuilder`], a fluent API for constructing and configuring
//! [`Crawler`](crate::Crawler) instances with customizable settings and components.
//!
//! ## Overview
//!
//! The [`CrawlerBuilder`] simplifies the process of assembling various `spider-core`
//! components into a fully configured web crawler. It provides a flexible,
//! ergonomic interface for setting up all aspects of the crawling process.
//!
//! ## Key Features
//!
//! - **Concurrency Configuration**: Control the number of concurrent downloads,
//!   parsing workers, and pipeline processors
//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state
//!   (requires `checkpoint` feature)
//! - **Statistics Integration**: Initialize and connect the [`StatCollector`](crate::stats::StatCollector)
//! - **Default Handling**: Automatic addition of essential middlewares when needed
//!
//! ## Example
//!
//! ```rust,ignore
//! use spider_core::CrawlerBuilder;
//! use spider_middleware::rate_limit::RateLimitMiddleware;
//! use spider_pipeline::console::ConsolePipeline;
//! use spider_util::error::SpiderError;
//!
//! async fn setup_crawler() -> Result<(), SpiderError> {
//!     let crawler = CrawlerBuilder::new(MySpider)
//!         .max_concurrent_downloads(10)
//!         .max_parser_workers(4)
//!         .add_middleware(RateLimitMiddleware::default())
//!         .add_pipeline(ConsolePipeline::new())
//!         .with_checkpoint_path("./crawl.checkpoint")
//!         .build()
//!         .await?;
//!
//!     crawler.start_crawl().await
//! }
//! ```

44use crate::Downloader;
45use crate::ReqwestClientDownloader;
46use crate::config::{CheckpointConfig, CrawlerConfig};
47use crate::scheduler::Scheduler;
48use crate::spider::Spider;
49use spider_middleware::middleware::Middleware;
50use spider_pipeline::pipeline::Pipeline;
51
/// Payload returned by `CrawlerBuilder::restore_checkpoint`: the scheduler's
/// saved state (if any) and each pipeline's saved state keyed by pipeline name.
#[cfg(feature = "checkpoint")]
type RestoreResult = (
    Option<crate::SchedulerCheckpoint>,
    Option<std::collections::HashMap<String, serde_json::Value>>,
);
57use spider_util::error::SpiderError;
58use std::marker::PhantomData;
59use std::path::Path;
60use std::sync::Arc;
61use std::time::Duration;
62
63use super::Crawler;
64use crate::stats::StatCollector;
65use log::LevelFilter;
66#[cfg(feature = "checkpoint")]
67use log::{debug, warn};
68
69#[cfg(feature = "checkpoint")]
70use rmp_serde;
71#[cfg(feature = "checkpoint")]
72use std::fs;
73
/// A fluent builder for constructing [`Crawler`] instances.
///
/// `CrawlerBuilder` provides a chainable API for configuring all aspects
/// of a web crawler, including concurrency settings, middleware, pipelines,
/// and checkpoint options.
///
/// ## Type Parameters
///
/// - `S`: The [`Spider`] implementation type
/// - `D`: The [`Downloader`] implementation type
///
/// ## Example
///
/// ```rust,ignore
/// # use spider_core::{CrawlerBuilder, Spider};
/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
/// # struct MySpider;
/// # #[async_trait::async_trait]
/// # impl Spider for MySpider {
/// #     type Item = String;
/// #     type State = ();
/// #     fn start_urls(&self) -> Vec<&'static str> { vec![] }
/// #     async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
/// # }
/// let builder = CrawlerBuilder::new(MySpider)
///     .max_concurrent_downloads(8)
///     .max_parser_workers(4);
/// ```
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    /// Core crawler settings (concurrency limits, channel capacity, ...).
    config: CrawlerConfig,
    /// Checkpoint path/interval settings; consumed by `build` when the
    /// `checkpoint` feature is enabled.
    checkpoint_config: CheckpointConfig,
    /// Downloader used to fetch responses.
    downloader: D,
    /// Spider under construction; `None` until supplied, required by `build`.
    spider: Option<S>,
    /// Middlewares, executed in registration order.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    /// Item pipelines; `build` adds a default `ConsolePipeline` if empty.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Log level for spider-* crates; `None` leaves logging untouched.
    log_level: Option<LevelFilter>,
    // NOTE(review): `S` already appears in `spider` and `pipelines`, so this
    // marker looks redundant — confirm before removing (removal would touch
    // every struct-literal construction site).
    _phantom: PhantomData<S>,
}
115
impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
    /// Builder with default config, a default reqwest-backed downloader, and
    /// no spider, middlewares, or pipelines registered.
    fn default() -> Self {
        Self {
            config: CrawlerConfig::default(),
            checkpoint_config: CheckpointConfig::default(),
            downloader: ReqwestClientDownloader::default(),
            // No spider yet: `build` fails unless one is supplied (see `new`).
            spider: None,
            middlewares: Vec::new(),
            pipelines: Vec::new(),
            // Logging stays untouched unless `log_level` is called.
            log_level: None,
            _phantom: PhantomData,
        }
    }
}
130
131impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
132    /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
133    ///
134    /// ## Example
135    ///
136    /// ```rust,ignore
137    /// let crawler = CrawlerBuilder::new(MySpider)
138    ///     .build()
139    ///     .await?;
140    /// ```
141    pub fn new(spider: S) -> Self {
142        Self {
143            spider: Some(spider),
144            ..Default::default()
145        }
146    }
147}
148
149impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
150    #[cfg(feature = "checkpoint")]
151    fn load_checkpoint_from_path(&self, path: &std::path::Path) -> Option<crate::Checkpoint> {
152        match fs::read(path) {
153            Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
154                Ok(checkpoint) => Some(checkpoint),
155                Err(e) => {
156                    warn!("Failed to deserialize checkpoint from {:?}: {}", path, e);
157                    None
158                }
159            },
160            Err(e) => {
161                warn!("Failed to read checkpoint file {:?}: {}", path, e);
162                None
163            }
164        }
165    }
166
167    /// Sets the maximum number of concurrent downloads.
168    ///
169    /// This controls how many HTTP requests can be in-flight simultaneously.
170    /// Higher values increase throughput but may overwhelm target servers.
171    ///
172    /// ## Default
173    ///
174    /// Defaults to the number of CPU cores, with a minimum of 16.
175    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
176        self.config.max_concurrent_downloads = limit;
177        self
178    }
179
180    /// Sets the number of worker tasks dedicated to parsing responses.
181    ///
182    /// Parser workers process HTTP responses concurrently, calling the
183    /// spider's [`parse`](Spider::parse) method to extract items and
184    /// discover new URLs.
185    ///
186    /// ## Default
187    ///
188    /// Defaults to the number of CPU cores, clamped between 4 and 16.
189    pub fn max_parser_workers(mut self, limit: usize) -> Self {
190        self.config.parser_workers = limit;
191        self
192    }
193
194    /// Sets the maximum number of concurrent item processing pipelines.
195    ///
196    /// This controls how many items can be processed by pipelines simultaneously.
197    ///
198    /// ## Default
199    ///
200    /// Defaults to the number of CPU cores, with a maximum of 8.
201    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
202        self.config.max_concurrent_pipelines = limit;
203        self
204    }
205
206    /// Sets the capacity of internal communication channels.
207    ///
208    /// This controls the buffer size for channels between the downloader,
209    /// parser, and pipeline components. Higher values can improve throughput
210    /// at the cost of increased memory usage.
211    ///
212    /// ## Default
213    ///
214    /// Defaults to 1000.
215    pub fn channel_capacity(mut self, capacity: usize) -> Self {
216        self.config.channel_capacity = capacity;
217        self
218    }
219
220    /// Sets a custom downloader implementation.
221    ///
222    /// Use this method to provide a custom [`Downloader`] implementation
223    /// instead of the default [`ReqwestClientDownloader`].
224    pub fn downloader(mut self, downloader: D) -> Self {
225        self.downloader = downloader;
226        self
227    }
228
229    /// Adds a middleware to the crawler's middleware stack.
230    ///
231    /// Middlewares intercept and modify requests before they are sent and
232    /// responses after they are received. They are executed in the order
233    /// they are added.
234    ///
235    /// ## Example
236    ///
237    /// ```rust,ignore
238    /// let crawler = CrawlerBuilder::new(MySpider)
239    ///     .add_middleware(RateLimitMiddleware::default())
240    ///     .add_middleware(RetryMiddleware::new())
241    ///     .build()
242    ///     .await?;
243    /// ```
244    pub fn add_middleware<M>(mut self, middleware: M) -> Self
245    where
246        M: Middleware<D::Client> + Send + Sync + 'static,
247    {
248        self.middlewares.push(Box::new(middleware));
249        self
250    }
251
252    /// Adds a pipeline to the crawler's pipeline stack.
253    ///
254    /// Pipelines process scraped items after they are extracted by the spider.
255    /// They can be used for validation, transformation, deduplication, or
256    /// storage (e.g., writing to files or databases).
257    ///
258    /// ## Example
259    ///
260    /// ```rust,ignore
261    /// let crawler = CrawlerBuilder::new(MySpider)
262    ///     .add_pipeline(ConsolePipeline::new())
263    ///     .add_pipeline(JsonPipeline::new("output.json")?)
264    ///     .build()
265    ///     .await?;
266    /// ```
267    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
268    where
269        P: Pipeline<S::Item> + 'static,
270    {
271        self.pipelines.push(Box::new(pipeline));
272        self
273    }
274
275    /// Sets the log level for `spider-*` library crates.
276    ///
277    /// This configures the logging level specifically for the spider-lib ecosystem
278    /// (spider-core, spider-middleware, spider-pipeline, spider-util, spider-downloader).
279    /// Logs from other dependencies (e.g., reqwest, tokio) will not be affected.
280    ///
281    /// ## Log Levels
282    ///
283    /// - `LevelFilter::Error` - Only error messages
284    /// - `LevelFilter::Warn` - Warnings and errors
285    /// - `LevelFilter::Info` - Informational messages, warnings, and errors
286    /// - `LevelFilter::Debug` - Debug messages and above
287    /// - `LevelFilter::Trace` - All messages including trace
288    ///
289    /// ## Example
290    ///
291    /// ```rust,ignore
292    /// use log::LevelFilter;
293    ///
294    /// let crawler = CrawlerBuilder::new(MySpider)
295    ///     .log_level(LevelFilter::Debug)
296    ///     .build()
297    ///     .await?;
298    /// ```
299    pub fn log_level(mut self, level: LevelFilter) -> Self {
300        self.log_level = Some(level);
301        self
302    }
303
304    /// Sets the path for saving and loading checkpoints.
305    ///
306    /// When enabled, the crawler periodically saves its state to this file,
307    /// allowing crawls to be resumed after interruption.
308    ///
309    /// Requires the `checkpoint` feature to be enabled.
310    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
311        self.checkpoint_config.path = Some(path.as_ref().to_path_buf());
312        self
313    }
314
315    /// Sets the interval between automatic checkpoint saves.
316    ///
317    /// When enabled, the crawler saves its state at this interval.
318    /// Shorter intervals provide more frequent recovery points but may
319    /// impact performance.
320    ///
321    /// Requires the `checkpoint` feature to be enabled.
322    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
323        self.checkpoint_config.interval = Some(interval);
324        self
325    }
326
    /// Builds the [`Crawler`] instance.
    ///
    /// This method finalizes the crawler configuration and initializes all
    /// components. It performs validation, installs a default pipeline if
    /// none was added, optionally initializes logging, restores checkpoint
    /// and cookie state (feature-gated), and wires everything together.
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError::ConfigurationError`] if:
    /// - `max_concurrent_downloads` is 0
    /// - `parser_workers` is 0
    /// - No spider was provided to the builder
    ///
    /// With the `checkpoint` feature, errors from a pipeline's
    /// `restore_state` are propagated as well.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let crawler = CrawlerBuilder::new(MySpider)
    ///     .max_concurrent_downloads(10)
    ///     .build()
    ///     .await?;
    /// ```
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        // take_spider also rejects zero download/parser limits before the
        // full config validation below.
        let spider = self.take_spider()?;
        self.init_default_pipeline();

        // Initialize logging for spider-* crates if log level is configured
        if let Some(level) = self.log_level {
            self.init_logging(level);
        }

        // Validate config
        self.config
            .validate()
            .map_err(SpiderError::ConfigurationError)?;

        // Restore checkpoint and get scheduler state
        #[cfg(feature = "checkpoint")]
        let (scheduler_state, pipeline_states) = self.restore_checkpoint()?;
        // Without the feature, a typed None keeps Scheduler::new happy.
        #[cfg(not(feature = "checkpoint"))]
        let scheduler_state: Option<()> = None;

        // Restore pipeline states if checkpoint was loaded; states for
        // pipelines that are no longer registered are skipped with a warning.
        #[cfg(feature = "checkpoint")]
        {
            if let Some(states) = pipeline_states {
                for (name, state) in states {
                    if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
                        pipeline.restore_state(state).await?;
                    } else {
                        warn!("Checkpoint contains state for unknown pipeline: {}", name);
                    }
                }
            }
        }

        // Get cookie store if feature is enabled.
        // NOTE(review): with both features on, restore_cookie_store re-reads
        // the same checkpoint file already loaded by restore_checkpoint —
        // consider loading it once and sharing the result.
        #[cfg(feature = "cookie-store")]
        let cookie_store = {
            #[cfg(feature = "checkpoint")]
            {
                let (_, cookie_store) = self.restore_cookie_store().await?;
                cookie_store
            }
            #[cfg(not(feature = "checkpoint"))]
            {
                Some(crate::CookieStore::default())
            }
        };

        // Create scheduler with or without checkpoint state
        let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
        let downloader_arc = Arc::new(self.downloader);
        let stats = Arc::new(StatCollector::new());

        // Build crawler with or without cookie store based on feature flag
        #[cfg(feature = "cookie-store")]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
            // Fall back to an empty store if no checkpoint provided one.
            Arc::new(tokio::sync::RwLock::new(cookie_store.unwrap_or_default())),
        );

        #[cfg(not(feature = "cookie-store"))]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
        );

        Ok(crawler)
    }
438
    /// Restores checkpoint state from disk (checkpoint feature only).
    ///
    /// This internal method loads a previously saved checkpoint file and
    /// extracts the scheduler state and per-pipeline states from it.
    ///
    /// # Returns
    ///
    /// A tuple `(Option<SchedulerCheckpoint>, Option<HashMap<String, Value>>)`.
    /// Both elements are `None` when no checkpoint path is configured, or
    /// when the file cannot be read or deserialized.
    ///
    /// # Errors
    ///
    /// Currently never fails: read/deserialization problems are logged as
    /// warnings by `load_checkpoint_from_path` and the crawl proceeds without
    /// checkpoint data. The `Result` return type is kept so future versions
    /// can surface hard errors without an API break.
    #[cfg(feature = "checkpoint")]
    fn restore_checkpoint(&mut self) -> Result<RestoreResult, SpiderError> {
        let mut scheduler_state = None;
        let mut pipeline_states = None;

        if let Some(path) = &self.checkpoint_config.path {
            debug!("Attempting to load checkpoint from {:?}", path);
            if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
                scheduler_state = Some(checkpoint.scheduler);
                pipeline_states = Some(checkpoint.pipelines);
            }
        }

        Ok((scheduler_state, pipeline_states))
    }
470
471    /// Restores cookie store from checkpoint (checkpoint + cookie-store features).
472    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
473    async fn restore_cookie_store(
474        &mut self,
475    ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
476        let mut cookie_store = None;
477
478        if let Some(path) = &self.checkpoint_config.path {
479            debug!("Attempting to load cookie store from checkpoint {:?}", path);
480            if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
481                cookie_store = Some(checkpoint.cookie_store);
482            }
483        }
484
485        Ok((None, cookie_store))
486    }
487
488    /// Extracts the spider from the builder.
489    ///
490    /// # Errors
491    ///
492    /// Returns a [`SpiderError::ConfigurationError`] if:
493    /// - `max_concurrent_downloads` is 0
494    /// - `parser_workers` is 0
495    /// - No spider was provided
496    fn take_spider(&mut self) -> Result<S, SpiderError> {
497        if self.config.max_concurrent_downloads == 0 {
498            return Err(SpiderError::ConfigurationError(
499                "max_concurrent_downloads must be greater than 0.".to_string(),
500            ));
501        }
502        if self.config.parser_workers == 0 {
503            return Err(SpiderError::ConfigurationError(
504                "parser_workers must be greater than 0.".to_string(),
505            ));
506        }
507        self.spider.take().ok_or_else(|| {
508            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
509        })
510    }
511
512    /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
513    ///
514    /// This ensures that scraped items are always output somewhere, even if
515    /// no explicit pipelines are configured by the user.
516    fn init_default_pipeline(&mut self) {
517        if self.pipelines.is_empty() {
518            use spider_pipeline::console::ConsolePipeline;
519            self.pipelines.push(Box::new(ConsolePipeline::new()));
520        }
521    }
522
523    /// Initializes logging for spider-* crates only.
524    ///
525    /// This sets up env_logger with a filter that only enables logging for
526    /// crates within the spider-lib ecosystem.
527    fn init_logging(&self, level: LevelFilter) {
528        use env_logger::{Builder, Env};
529
530        let mut builder = Builder::from_env(Env::default().default_filter_or("off"));
531
532        // Set filter specifically for spider-* crates
533        builder.filter_module("spider_core", level);
534        builder.filter_module("spider_middleware", level);
535        builder.filter_module("spider_pipeline", level);
536        builder.filter_module("spider_util", level);
537        builder.filter_module("spider_downloader", level);
538        builder.filter_module("spider_macro", level);
539
540        builder.init();
541    }
542}