// spider_core/builder.rs
1//! # Builder Module
2//!
3//! Provides the [`CrawlerBuilder`], a fluent API for constructing and configuring
4//! [`Crawler`](crate::Crawler) instances with customizable settings and components.
5//!
6//! ## Overview
7//!
8//! The [`CrawlerBuilder`] simplifies the process of assembling various `spider-core`
9//! components into a fully configured web crawler. It provides a flexible,
10//! ergonomic interface for setting up all aspects of the crawling process.
11//!
12//! ## Key Features
13//!
14//! - **Concurrency Configuration**: Control the number of concurrent downloads,
15//!   parsing workers, and pipeline processors
16//! - **Component Registration**: Attach custom downloaders, middlewares, and pipelines
17//! - **Checkpoint Management**: Configure automatic saving and loading of crawl state
18//!   (requires `checkpoint` feature)
19//! - **Statistics Integration**: Initialize and connect the [`StatCollector`](crate::stats::StatCollector)
20//! - **Default Handling**: Automatic addition of essential middlewares when needed
21//!
22//! ## Example
23//!
24//! ```rust,ignore
25//! use spider_core::CrawlerBuilder;
26//! use spider_middleware::rate_limit::RateLimitMiddleware;
27//! use spider_pipeline::console::ConsolePipeline;
28//! use spider_util::error::SpiderError;
29//!
30//! async fn setup_crawler() -> Result<(), SpiderError> {
31//!     let crawler = CrawlerBuilder::new(MySpider)
32//!         .max_concurrent_downloads(10)
33//!         .max_parser_workers(4)
34//!         .add_middleware(RateLimitMiddleware::default())
35//!         .add_pipeline(ConsolePipeline::new())
36//!         .with_checkpoint_path("./crawl.checkpoint")
37//!         .build()
38//!         .await?;
39//!
40//!     crawler.start_crawl().await
41//! }
42//! ```
43
44use crate::Downloader;
45use crate::ReqwestClientDownloader;
46use crate::config::{CheckpointConfig, CrawlerConfig};
47use crate::scheduler::Scheduler;
48use crate::spider::Spider;
49use spider_middleware::middleware::Middleware;
50use spider_pipeline::pipeline::Pipeline;
51
/// Payload recovered from a checkpoint file: the optional scheduler state
/// plus optional per-pipeline saved states keyed by pipeline name.
#[cfg(feature = "checkpoint")]
type RestoreResult = (
    Option<crate::SchedulerCheckpoint>,
    Option<std::collections::HashMap<String, serde_json::Value>>,
);
57use spider_util::error::SpiderError;
58use std::marker::PhantomData;
59use std::path::Path;
60use std::sync::Arc;
61use std::time::Duration;
62
63use super::Crawler;
64use crate::stats::StatCollector;
65use log::LevelFilter;
66#[cfg(feature = "checkpoint")]
67use log::{debug, warn};
68
69#[cfg(feature = "checkpoint")]
70use rmp_serde;
71#[cfg(feature = "checkpoint")]
72use std::fs;
73
/// A fluent builder for constructing [`Crawler`] instances.
///
/// `CrawlerBuilder` provides a chainable API for configuring all aspects
/// of a web crawler, including concurrency settings, middleware, pipelines,
/// and checkpoint options.
///
/// ## Type Parameters
///
/// - `S`: The [`Spider`] implementation type
/// - `D`: The [`Downloader`] implementation type
///
/// ## Example
///
/// ```rust,ignore
/// # use spider_core::{CrawlerBuilder, Spider};
/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
/// # struct MySpider;
/// # #[async_trait::async_trait]
/// # impl Spider for MySpider {
/// #     type Item = String;
/// #     type State = ();
/// #     fn start_urls(&self) -> Vec<&'static str> { vec![] }
/// #     async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
/// # }
/// let builder = CrawlerBuilder::new(MySpider)
///     .max_concurrent_downloads(8)
///     .max_parser_workers(4);
/// ```
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    /// Concurrency, channel, and live-stats settings applied to the crawler.
    config: CrawlerConfig,
    /// Checkpoint path and save interval (used when the `checkpoint` feature is on).
    checkpoint_config: CheckpointConfig,
    /// HTTP transport used to fetch pages.
    downloader: D,
    /// The spider to run; `None` until supplied, moved out during `build`.
    spider: Option<S>,
    /// Request/response middlewares, executed in insertion order.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    /// Item-processing pipelines; a `ConsolePipeline` is added at build time if empty.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Requested log level for spider-* crates; `None` leaves logging uninitialized.
    log_level: Option<LevelFilter>,
    // NOTE(review): `S` already appears in the `spider` and `pipelines` fields,
    // so this marker looks redundant — confirm before removing (the Default impl
    // initializes it).
    _phantom: PhantomData<S>,
}
115
116impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
117    fn default() -> Self {
118        Self {
119            config: CrawlerConfig::default(),
120            checkpoint_config: CheckpointConfig::default(),
121            downloader: ReqwestClientDownloader::default(),
122            spider: None,
123            middlewares: Vec::new(),
124            pipelines: Vec::new(),
125            log_level: None,
126            _phantom: PhantomData,
127        }
128    }
129}
130
131impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
132    /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
133    ///
134    /// ## Example
135    ///
136    /// ```rust,ignore
137    /// let crawler = CrawlerBuilder::new(MySpider)
138    ///     .build()
139    ///     .await?;
140    /// ```
141    pub fn new(spider: S) -> Self {
142        Self {
143            spider: Some(spider),
144            ..Default::default()
145        }
146    }
147}
148
149impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
150    #[cfg(feature = "checkpoint")]
151    fn load_checkpoint_from_path(&self, path: &std::path::Path) -> Option<crate::Checkpoint> {
152        match fs::read(path) {
153            Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
154                Ok(checkpoint) => Some(checkpoint),
155                Err(e) => {
156                    warn!("Failed to deserialize checkpoint from {:?}: {}", path, e);
157                    None
158                }
159            },
160            Err(e) => {
161                warn!("Failed to read checkpoint file {:?}: {}", path, e);
162                None
163            }
164        }
165    }
166
167    /// Sets the maximum number of concurrent downloads.
168    ///
169    /// This controls how many HTTP requests can be in-flight simultaneously.
170    /// Higher values increase throughput but may overwhelm target servers.
171    ///
172    /// ## Default
173    ///
174    /// Defaults to the number of CPU cores, with a minimum of 16.
175    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
176        self.config.max_concurrent_downloads = limit;
177        self
178    }
179
180    /// Sets the number of worker tasks dedicated to parsing responses.
181    ///
182    /// Parser workers process HTTP responses concurrently, calling the
183    /// spider's [`parse`](Spider::parse) method to extract items and
184    /// discover new URLs.
185    ///
186    /// ## Default
187    ///
188    /// Defaults to the number of CPU cores, clamped between 4 and 16.
189    pub fn max_parser_workers(mut self, limit: usize) -> Self {
190        self.config.parser_workers = limit;
191        self
192    }
193
194    /// Sets the maximum number of concurrent item processing pipelines.
195    ///
196    /// This controls how many items can be processed by pipelines simultaneously.
197    ///
198    /// ## Default
199    ///
200    /// Defaults to the number of CPU cores, with a maximum of 8.
201    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
202        self.config.max_concurrent_pipelines = limit;
203        self
204    }
205
206    /// Sets the capacity of internal communication channels.
207    ///
208    /// This controls the buffer size for channels between the downloader,
209    /// parser, and pipeline components. Higher values can improve throughput
210    /// at the cost of increased memory usage.
211    ///
212    /// ## Default
213    ///
214    /// Defaults to 1000.
215    pub fn channel_capacity(mut self, capacity: usize) -> Self {
216        self.config.channel_capacity = capacity;
217        self
218    }
219
220    /// Enables or disables live, in-place statistics updates on terminal stdout.
221    ///
222    /// When enabled, spider-* logs are forced to `LevelFilter::Off` during build
223    /// to avoid interleaving with the live terminal renderer.
224    pub fn live_stats(mut self, enabled: bool) -> Self {
225        self.config.live_stats = enabled;
226        self
227    }
228
229    /// Sets the refresh interval for live statistics updates.
230    pub fn live_stats_interval(mut self, interval: Duration) -> Self {
231        self.config.live_stats_interval = interval;
232        self
233    }
234
235    /// Sets a custom downloader implementation.
236    ///
237    /// Use this method to provide a custom [`Downloader`] implementation
238    /// instead of the default [`ReqwestClientDownloader`].
239    pub fn downloader(mut self, downloader: D) -> Self {
240        self.downloader = downloader;
241        self
242    }
243
244    /// Adds a middleware to the crawler's middleware stack.
245    ///
246    /// Middlewares intercept and modify requests before they are sent and
247    /// responses after they are received. They are executed in the order
248    /// they are added.
249    ///
250    /// ## Example
251    ///
252    /// ```rust,ignore
253    /// let crawler = CrawlerBuilder::new(MySpider)
254    ///     .add_middleware(RateLimitMiddleware::default())
255    ///     .add_middleware(RetryMiddleware::new())
256    ///     .build()
257    ///     .await?;
258    /// ```
259    pub fn add_middleware<M>(mut self, middleware: M) -> Self
260    where
261        M: Middleware<D::Client> + Send + Sync + 'static,
262    {
263        self.middlewares.push(Box::new(middleware));
264        self
265    }
266
267    /// Adds a pipeline to the crawler's pipeline stack.
268    ///
269    /// Pipelines process scraped items after they are extracted by the spider.
270    /// They can be used for validation, transformation, deduplication, or
271    /// storage (e.g., writing to files or databases).
272    ///
273    /// ## Example
274    ///
275    /// ```rust,ignore
276    /// let crawler = CrawlerBuilder::new(MySpider)
277    ///     .add_pipeline(ConsolePipeline::new())
278    ///     .add_pipeline(JsonPipeline::new("output.json")?)
279    ///     .build()
280    ///     .await?;
281    /// ```
282    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
283    where
284        P: Pipeline<S::Item> + 'static,
285    {
286        self.pipelines.push(Box::new(pipeline));
287        self
288    }
289
290    /// Sets the log level for `spider-*` library crates.
291    ///
292    /// This configures the logging level specifically for the spider-lib ecosystem
293    /// (spider-core, spider-middleware, spider-pipeline, spider-util, spider-downloader).
294    /// Logs from other dependencies (e.g., reqwest, tokio) will not be affected.
295    ///
296    /// ## Log Levels
297    ///
298    /// - `LevelFilter::Error` - Only error messages
299    /// - `LevelFilter::Warn` - Warnings and errors
300    /// - `LevelFilter::Info` - Informational messages, warnings, and errors
301    /// - `LevelFilter::Debug` - Debug messages and above
302    /// - `LevelFilter::Trace` - All messages including trace
303    ///
304    /// ## Example
305    ///
306    /// ```rust,ignore
307    /// use log::LevelFilter;
308    ///
309    /// let crawler = CrawlerBuilder::new(MySpider)
310    ///     .log_level(LevelFilter::Debug)
311    ///     .build()
312    ///     .await?;
313    /// ```
314    pub fn log_level(mut self, level: LevelFilter) -> Self {
315        self.log_level = Some(level);
316        self
317    }
318
319    /// Sets the path for saving and loading checkpoints.
320    ///
321    /// When enabled, the crawler periodically saves its state to this file,
322    /// allowing crawls to be resumed after interruption.
323    ///
324    /// Requires the `checkpoint` feature to be enabled.
325    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
326        self.checkpoint_config.path = Some(path.as_ref().to_path_buf());
327        self
328    }
329
330    /// Sets the interval between automatic checkpoint saves.
331    ///
332    /// When enabled, the crawler saves its state at this interval.
333    /// Shorter intervals provide more frequent recovery points but may
334    /// impact performance.
335    ///
336    /// Requires the `checkpoint` feature to be enabled.
337    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
338        self.checkpoint_config.interval = Some(interval);
339        self
340    }
341
    /// Builds the [`Crawler`] instance.
    ///
    /// This method finalizes the crawler configuration and initializes all
    /// components. It performs validation and sets up default values where
    /// necessary.
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError::ConfigurationError`] if:
    /// - `max_concurrent_downloads` is 0
    /// - `parser_workers` is 0
    /// - No spider was provided to the builder
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let crawler = CrawlerBuilder::new(MySpider)
    ///     .max_concurrent_downloads(10)
    ///     .build()
    ///     .await?;
    /// ```
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        // take_spider also performs the concurrency validation listed under # Errors.
        let spider = self.take_spider()?;
        // Guarantee at least one pipeline (ConsolePipeline) so items always have a sink.
        self.init_default_pipeline();

        // Live stats redraw on stdout and should not interleave with spider-* logs.
        // Force spider-* logs to Off when live stats mode is enabled.
        let effective_log_level = if self.config.live_stats {
            Some(LevelFilter::Off)
        } else {
            self.log_level
        };

        // Initialize logging for spider-* crates if an effective log level is configured.
        if let Some(level) = effective_log_level {
            self.init_logging(level);
        }

        // Validate config
        self.config
            .validate()
            .map_err(SpiderError::ConfigurationError)?;

        // Restore checkpoint and get scheduler state
        #[cfg(feature = "checkpoint")]
        let (scheduler_state, pipeline_states) = self.restore_checkpoint()?;
        #[cfg(not(feature = "checkpoint"))]
        let scheduler_state: Option<()> = None;

        // Restore pipeline states if checkpoint was loaded
        #[cfg(feature = "checkpoint")]
        {
            if let Some(states) = pipeline_states {
                for (name, state) in states {
                    // Saved states are matched to registered pipelines by name().
                    if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
                        pipeline.restore_state(state).await?;
                    } else {
                        warn!("Checkpoint contains state for unknown pipeline: {}", name);
                    }
                }
            }
        }

        // Get cookie store if feature is enabled
        #[cfg(feature = "cookie-store")]
        let cookie_store = {
            #[cfg(feature = "checkpoint")]
            {
                // NOTE(review): this re-reads and re-deserializes the checkpoint file
                // already loaded by restore_checkpoint() above — consider loading the
                // checkpoint once and sharing it between the two restore paths.
                let (_, cookie_store) = self.restore_cookie_store().await?;
                cookie_store
            }
            #[cfg(not(feature = "checkpoint"))]
            {
                Some(crate::CookieStore::default())
            }
        };

        // Create scheduler with or without checkpoint state
        let (scheduler_arc, req_rx) = Scheduler::new(scheduler_state);
        let downloader_arc = Arc::new(self.downloader);
        let stats = Arc::new(StatCollector::new());

        // Build crawler with or without cookie store based on feature flag
        #[cfg(feature = "cookie-store")]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
            Arc::new(tokio::sync::RwLock::new(cookie_store.unwrap_or_default())),
        );

        #[cfg(not(feature = "cookie-store"))]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
        );

        Ok(crawler)
    }
461
462    /// Restores checkpoint state from disk (checkpoint feature only).
463    ///
464    /// This internal method loads a previously saved checkpoint file and
465    /// restores the scheduler state and pipeline states.
466    ///
467    /// # Returns
468    ///
469    /// Returns a tuple of `(SchedulerCheckpoint, Option<HashMap<String, Value>>)`.
470    /// If no checkpoint path is configured or the file doesn't exist, returns
471    /// default values.
472    ///
473    /// # Errors
474    ///
475    /// Returns a [`SpiderError`] if deserialization fails.
476    /// Note: Checkpoint file read/deserialization errors are logged as warnings
477    /// but do not fail the operation—the crawl proceeds without checkpoint data.
478    #[cfg(feature = "checkpoint")]
479    fn restore_checkpoint(&mut self) -> Result<RestoreResult, SpiderError> {
480        let mut scheduler_state = None;
481        let mut pipeline_states = None;
482
483        if let Some(path) = &self.checkpoint_config.path {
484            debug!("Attempting to load checkpoint from {:?}", path);
485            if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
486                scheduler_state = Some(checkpoint.scheduler);
487                pipeline_states = Some(checkpoint.pipelines);
488            }
489        }
490
491        Ok((scheduler_state, pipeline_states))
492    }
493
494    /// Restores cookie store from checkpoint (checkpoint + cookie-store features).
495    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
496    async fn restore_cookie_store(
497        &mut self,
498    ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
499        let mut cookie_store = None;
500
501        if let Some(path) = &self.checkpoint_config.path {
502            debug!("Attempting to load cookie store from checkpoint {:?}", path);
503            if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
504                cookie_store = Some(checkpoint.cookie_store);
505            }
506        }
507
508        Ok((None, cookie_store))
509    }
510
511    /// Extracts the spider from the builder.
512    ///
513    /// # Errors
514    ///
515    /// Returns a [`SpiderError::ConfigurationError`] if:
516    /// - `max_concurrent_downloads` is 0
517    /// - `parser_workers` is 0
518    /// - No spider was provided
519    fn take_spider(&mut self) -> Result<S, SpiderError> {
520        if self.config.max_concurrent_downloads == 0 {
521            return Err(SpiderError::ConfigurationError(
522                "max_concurrent_downloads must be greater than 0.".to_string(),
523            ));
524        }
525        if self.config.parser_workers == 0 {
526            return Err(SpiderError::ConfigurationError(
527                "parser_workers must be greater than 0.".to_string(),
528            ));
529        }
530        self.spider.take().ok_or_else(|| {
531            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
532        })
533    }
534
535    /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
536    ///
537    /// This ensures that scraped items are always output somewhere, even if
538    /// no explicit pipelines are configured by the user.
539    fn init_default_pipeline(&mut self) {
540        if self.pipelines.is_empty() {
541            use spider_pipeline::console::ConsolePipeline;
542            self.pipelines.push(Box::new(ConsolePipeline::new()));
543        }
544    }
545
546    /// Initializes logging for spider-* crates only.
547    ///
548    /// This sets up env_logger with a filter that only enables logging for
549    /// crates within the spider-lib ecosystem.
550    fn init_logging(&self, level: LevelFilter) {
551        use env_logger::{Builder, Env};
552
553        let mut builder = Builder::from_env(Env::default().default_filter_or("off"));
554
555        // Set filter specifically for spider-* crates
556        builder.filter_module("spider_core", level);
557        builder.filter_module("spider_middleware", level);
558        builder.filter_module("spider_pipeline", level);
559        builder.filter_module("spider_util", level);
560        builder.filter_module("spider_downloader", level);
561        builder.filter_module("spider_macro", level);
562
563        builder.init();
564    }
565}