1//! Builder API for assembling a [`Crawler`].
2//!
3//! [`CrawlerBuilder`] is where runtime composition happens: concurrency,
4//! downloader selection, middleware, pipelines, item limits, logging, and
5//! optional checkpointing all start here.
6//!
7//! ## Example
8//!
9//! ```rust,ignore
10//! use spider_core::CrawlerBuilder;
11//! use spider_middleware::rate_limit::RateLimitMiddleware;
12//! use spider_pipeline::console::ConsolePipeline;
13//! use spider_util::error::SpiderError;
14//!
15//! async fn setup_crawler() -> Result<(), SpiderError> {
16//!     let crawler = CrawlerBuilder::new(MySpider)
17//!         .max_concurrent_downloads(10)
18//!         .max_parser_workers(4)
19//!         .add_middleware(RateLimitMiddleware::default())
20//!         .add_pipeline(ConsolePipeline::new())
21//!         .with_checkpoint_path("./crawl.checkpoint")
22//!         .build()
23//!         .await?;
24//!
25//!     crawler.start_crawl().await
26//! }
27//! ```
28
29use crate::Downloader;
30use crate::ReqwestClientDownloader;
31use crate::config::{CheckpointConfig, CrawlerConfig};
32use crate::scheduler::Scheduler;
33use crate::spider::Spider;
34use spider_middleware::middleware::Middleware;
35use spider_pipeline::pipeline::Pipeline;
36
/// State recovered from a checkpoint file at build time. Both halves are
/// `None` when no checkpoint path is configured or loading failed.
#[cfg(feature = "checkpoint")]
type RestoreResult = (
    // Scheduler frontier state used to seed the new `Scheduler`.
    Option<crate::SchedulerCheckpoint>,
    // Pipeline-name -> serialized state, fed to `Pipeline::restore_state`.
    Option<std::collections::HashMap<String, serde_json::Value>>,
);
42use spider_util::error::SpiderError;
43use std::marker::PhantomData;
44use std::path::Path;
45use std::sync::Arc;
46use std::time::Duration;
47
48use super::Crawler;
49use crate::stats::StatCollector;
50use log::LevelFilter;
51#[cfg(feature = "checkpoint")]
52use log::{info, warn};
53
54#[cfg(feature = "checkpoint")]
55use rmp_serde;
56#[cfg(feature = "checkpoint")]
57use std::fs;
58
59/// A fluent builder for constructing [`Crawler`] instances.
60///
61/// ## Type Parameters
62///
63/// - `S`: The [`Spider`] implementation type
64/// - `D`: The [`Downloader`] implementation type
65///
66/// ## Example
67///
68/// ```rust,ignore
69/// # use spider_core::{CrawlerBuilder, Spider};
70/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
71/// # struct MySpider;
72/// # #[async_trait::async_trait]
73/// # impl Spider for MySpider {
74/// #     type Item = String;
75/// #     type State = ();
76/// #     fn start_requests(&self) -> Result<spider_core::spider::StartRequests<'_>, SpiderError> {
77/// #         Ok(spider_core::spider::StartRequests::Iter(Box::new(std::iter::empty())))
78/// #     }
79/// #     async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
80/// # }
81/// let builder = CrawlerBuilder::new(MySpider)
82///     .max_concurrent_downloads(8)
83///     .max_pending_requests(16)
84///     .max_parser_workers(4);
85/// ```
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    // Runtime tuning knobs: concurrency, channels, limits, live stats, etc.
    config: CrawlerConfig,
    // Checkpoint persistence settings (file path + save interval).
    checkpoint_config: CheckpointConfig,
    // Transport used to execute requests; ReqwestClientDownloader by default.
    downloader: D,
    // The spider; `Option` so `take_spider` can move it out during `build`.
    spider: Option<S>,
    // Request/response interceptors, executed in insertion order.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    // Item processors; a ConsolePipeline is installed at build time if empty.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Log level for spider-* crates; None leaves logging uninitialized.
    log_level: Option<LevelFilter>,
    // NOTE(review): looks redundant — `spider: Option<S>` already uses `S`.
    // Confirm no semver/derive reliance before removing.
    _phantom: PhantomData<S>,
}
99
100impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
101    fn default() -> Self {
102        Self {
103            config: CrawlerConfig::default(),
104            checkpoint_config: CheckpointConfig::default(),
105            downloader: ReqwestClientDownloader::default(),
106            spider: None,
107            middlewares: Vec::new(),
108            pipelines: Vec::new(),
109            log_level: None,
110            _phantom: PhantomData,
111        }
112    }
113}
114
115impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
116    /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
117    ///
118    /// ## Example
119    ///
120    /// ```rust,ignore
121    /// let crawler = CrawlerBuilder::new(MySpider)
122    ///     .build()
123    ///     .await?;
124    /// ```
125    pub fn new(spider: S) -> Self {
126        Self {
127            spider: Some(spider),
128            ..Default::default()
129        }
130    }
131}
132
impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
    /// Reads and deserializes a [`crate::Checkpoint`] (MessagePack via
    /// `rmp_serde`) from `path`.
    ///
    /// Loading is best-effort: an unreadable file or a deserialization
    /// failure is logged as a warning and yields `None` rather than an error.
    #[cfg(feature = "checkpoint")]
    fn load_checkpoint_from_path(&self, path: &std::path::Path) -> Option<crate::Checkpoint> {
        let bytes = match fs::read(path) {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!("Failed to read checkpoint file {:?}: {}", path, e);
                return None;
            }
        };
        match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
            Ok(checkpoint) => Some(checkpoint),
            Err(e) => {
                warn!("Failed to deserialize checkpoint from {:?}: {}", path, e);
                None
            }
        }
    }
150
151    /// Sets the maximum number of concurrent downloads.
152    ///
153    /// This controls how many HTTP requests can be in-flight simultaneously.
154    /// Higher values increase throughput but may overwhelm target servers.
155    ///
156    /// ## Default
157    ///
158    /// Defaults to twice the number of CPU cores, clamped between 4 and 64.
159    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
160        self.config.max_concurrent_downloads = limit;
161        self
162    }
163
164    /// Sets the maximum number of outstanding requests tracked by the scheduler.
165    ///
166    /// This includes queued requests plus requests already handed off for download.
167    /// Lower values keep the frontier tighter and reduce internal request buildup.
168    pub fn max_pending_requests(mut self, limit: usize) -> Self {
169        self.config.max_pending_requests = limit;
170        self
171    }
172
173    /// Sets the number of worker tasks dedicated to parsing responses.
174    ///
175    /// Parser workers process HTTP responses concurrently, calling the
176    /// spider's [`parse`](Spider::parse) method to extract items and
177    /// discover new URLs.
178    ///
179    /// ## Default
180    ///
181    /// Defaults to the number of CPU cores, clamped between 4 and 16.
182    pub fn max_parser_workers(mut self, limit: usize) -> Self {
183        self.config.parser_workers = limit;
184        self
185    }
186
187    /// Sets the maximum number of concurrent item processing pipelines.
188    ///
189    /// This controls how many items can be processed by pipelines simultaneously.
190    ///
191    /// ## Default
192    ///
193    /// Defaults to the number of CPU cores, with a maximum of 8.
194    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
195        self.config.max_concurrent_pipelines = limit;
196        self
197    }
198
199    /// Sets the capacity of internal communication channels.
200    ///
201    /// This controls the buffer size for channels between the downloader,
202    /// parser, and pipeline components. Higher values can improve throughput
203    /// at the cost of increased memory usage.
204    ///
205    /// ## Default
206    ///
207    /// Defaults to 1000.
208    pub fn channel_capacity(mut self, capacity: usize) -> Self {
209        self.config.channel_capacity = capacity;
210        self
211    }
212
213    /// Sets the parser output batch size.
214    ///
215    /// Larger batches can reduce coordination overhead when pages emit many
216    /// items or follow-up requests, while smaller batches tend to improve
217    /// latency and memory locality.
218    pub fn output_batch_size(mut self, batch_size: usize) -> Self {
219        self.config.output_batch_size = batch_size;
220        self
221    }
222
223    /// Sets the downloader response-channel backpressure threshold.
224    ///
225    /// When the downloader-to-parser channel reaches this threshold, the
226    /// runtime starts applying backpressure so downloaded responses do not pile
227    /// up unboundedly in memory.
228    pub fn response_backpressure_threshold(mut self, threshold: usize) -> Self {
229        self.config.response_backpressure_threshold = threshold;
230        self
231    }
232
233    /// Sets the parser item-channel backpressure threshold.
234    ///
235    /// This primarily matters when parsing is faster than downstream pipeline
236    /// processing. Lower thresholds keep memory tighter; higher thresholds let
237    /// parsers run further ahead.
238    pub fn item_backpressure_threshold(mut self, threshold: usize) -> Self {
239        self.config.item_backpressure_threshold = threshold;
240        self
241    }
242
243    /// Controls whether retries release downloader permits before waiting.
244    ///
245    /// Enabling this is usually better for throughput because a sleeping retry
246    /// does not occupy scarce downloader concurrency. Disabling it can be
247    /// useful when you want retries to count fully against download capacity.
248    pub fn retry_release_permit(mut self, enabled: bool) -> Self {
249        self.config.retry_release_permit = enabled;
250        self
251    }
252
253    /// Enables or disables live, in-place statistics updates on terminal stdout.
254    ///
255    /// When enabled, spider-* logs are forced to `LevelFilter::Off` during build
256    /// to avoid interleaving with the live terminal renderer.
257    pub fn live_stats(mut self, enabled: bool) -> Self {
258        self.config.live_stats = enabled;
259        self
260    }
261
262    /// Sets the refresh interval for live statistics updates.
263    ///
264    /// Shorter intervals make the terminal view feel more responsive, while
265    /// longer intervals reduce redraw overhead.
266    pub fn live_stats_interval(mut self, interval: Duration) -> Self {
267        self.config.live_stats_interval = interval;
268        self
269    }
270
271    /// Sets which scraped item fields should be shown in live stats preview.
272    ///
273    /// Field names support dot notation for nested JSON objects such as
274    /// `title`, `source_url`, or `metadata.Japanese`.
275    ///
276    /// You can also set aliases with `label=path`, for example
277    /// `url=source_url` or `jp=metadata.Japanese`.
278    pub fn live_stats_preview_fields(
279        mut self,
280        fields: impl IntoIterator<Item = impl Into<String>>,
281    ) -> Self {
282        self.config.live_stats_preview_fields = Some(fields.into_iter().map(Into::into).collect());
283        self
284    }
285
286    /// Sets the maximum grace period for crawler shutdown before forcing task abort.
287    ///
288    /// This gives pipelines, checkpoint writes, and other in-flight work time
289    /// to finish cleanly after shutdown begins.
290    pub fn shutdown_grace_period(mut self, grace_period: Duration) -> Self {
291        self.config.shutdown_grace_period = grace_period;
292        self
293    }
294
295    /// Stops the crawl after `limit` scraped items have been admitted for processing.
296    ///
297    /// This is especially useful for smoke runs, local previews, and
298    /// documentation examples where you want predictable bounded work.
299    pub fn limit(mut self, limit: usize) -> Self {
300        self.config.item_limit = Some(limit);
301        self
302    }
303
304    /// Sets a custom downloader implementation.
305    ///
306    /// Use this method to provide a custom [`Downloader`] implementation
307    /// instead of the default [`ReqwestClientDownloader`].
308    ///
309    /// Reach for this when transport behavior itself needs to change, such as
310    /// request signing, alternate HTTP stacks, downloader-level tracing, or
311    /// protocol-specific request execution.
312    pub fn downloader(mut self, downloader: D) -> Self {
313        self.downloader = downloader;
314        self
315    }
316
317    /// Adds a middleware to the crawler's middleware stack.
318    ///
319    /// Middlewares intercept and modify requests before they are sent and
320    /// responses after they are received. They are executed in the order
321    /// they are added.
322    ///
323    /// Middleware is the right layer for cross-cutting HTTP behavior such as
324    /// retry policy, rate limiting, cookies, user-agent management, cache
325    /// lookup, or `robots.txt` enforcement.
326    ///
327    /// ## Example
328    ///
329    /// ```rust,ignore
330    /// let crawler = CrawlerBuilder::new(MySpider)
331    ///     .add_middleware(RateLimitMiddleware::default())
332    ///     .add_middleware(RetryMiddleware::new())
333    ///     .build()
334    ///     .await?;
335    /// ```
336    pub fn add_middleware<M>(mut self, middleware: M) -> Self
337    where
338        M: Middleware<D::Client> + Send + Sync + 'static,
339    {
340        self.middlewares.push(Box::new(middleware));
341        self
342    }
343
344    /// Adds a pipeline to the crawler's pipeline stack.
345    ///
346    /// Pipelines process scraped items after they are extracted by the spider.
347    /// They can be used for validation, transformation, deduplication, or
348    /// storage (e.g., writing to files or databases).
349    ///
350    /// Pipelines are ordered. A common pattern is transform first, validate
351    /// second, deduplicate next, and write to outputs last.
352    ///
353    /// ## Example
354    ///
355    /// ```rust,ignore
356    /// let crawler = CrawlerBuilder::new(MySpider)
357    ///     .add_pipeline(ConsolePipeline::new())
358    ///     .add_pipeline(JsonPipeline::new("output.json")?)
359    ///     .build()
360    ///     .await?;
361    /// ```
362    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
363    where
364        P: Pipeline<S::Item> + 'static,
365    {
366        self.pipelines.push(Box::new(pipeline));
367        self
368    }
369
370    /// Sets the log level for `spider-*` library crates.
371    ///
372    /// This configures the logging level specifically for the spider-lib ecosystem
373    /// (spider-core, spider-middleware, spider-pipeline, spider-util, spider-downloader).
374    /// Logs from other dependencies (e.g., reqwest, tokio) will not be affected.
375    ///
376    /// ## Log Levels
377    ///
378    /// - `LevelFilter::Error` - Only error messages
379    /// - `LevelFilter::Warn` - Warnings and errors
380    /// - `LevelFilter::Info` - Informational messages, warnings, and errors
381    /// - `LevelFilter::Debug` - Debug messages and above
382    /// - `LevelFilter::Trace` - All messages including trace
383    ///
384    /// ## Example
385    ///
386    /// ```rust,ignore
387    /// use log::LevelFilter;
388    ///
389    /// let crawler = CrawlerBuilder::new(MySpider)
390    ///     .log_level(LevelFilter::Debug)
391    ///     .build()
392    ///     .await?;
393    /// ```
394    pub fn log_level(mut self, level: LevelFilter) -> Self {
395        self.log_level = Some(level);
396        self
397    }
398
399    /// Sets the path for saving and loading checkpoints.
400    ///
401    /// When enabled, the crawler periodically saves its state to this file,
402    /// allowing crawls to be resumed after interruption.
403    ///
404    /// Requires the `checkpoint` feature to be enabled.
405    ///
406    /// If a checkpoint file already exists at build time, the builder will
407    /// attempt to restore scheduler and pipeline state from it.
408    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
409        self.checkpoint_config.path = Some(path.as_ref().to_path_buf());
410        self
411    }
412
413    /// Sets the interval between automatic checkpoint saves.
414    ///
415    /// When enabled, the crawler saves its state at this interval.
416    /// Shorter intervals provide more frequent recovery points but may
417    /// impact performance.
418    ///
419    /// Requires the `checkpoint` feature to be enabled.
420    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
421        self.checkpoint_config.interval = Some(interval);
422        self
423    }
424
    /// Builds the [`Crawler`] instance.
    ///
    /// This method finalizes the crawler configuration and initializes all
    /// components. It performs validation and sets up default values where
    /// necessary.
    ///
    /// Build time is where the runtime:
    /// - validates concurrency and channel settings
    /// - initializes logging and live-stats behavior
    /// - restores checkpoint state if configured
    /// - constructs the scheduler and runtime handles
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError::ConfigurationError`] if:
    /// - `max_concurrent_downloads` is 0
    /// - `max_pending_requests` is 0
    /// - `parser_workers` is 0
    /// - No spider was provided to the builder
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let crawler = CrawlerBuilder::new(MySpider)
    ///     .max_concurrent_downloads(10)
    ///     .build()
    ///     .await?;
    /// ```
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        // take_spider also fail-fast validates the zero-value settings
        // before any component is constructed.
        let spider = self.take_spider()?;
        // Fall back to a ConsolePipeline so items are never silently dropped.
        self.init_default_pipeline();

        // Live stats redraw on stdout and should not interleave with spider-* logs.
        // Force spider-* logs to Off when live stats mode is enabled.
        let effective_log_level = if self.config.live_stats {
            Some(LevelFilter::Off)
        } else {
            self.log_level
        };

        // Initialize logging for spider-* crates if an effective log level is configured.
        if let Some(level) = effective_log_level {
            self.init_logging(level);
        }

        // Validate config
        self.config
            .validate()
            .map_err(SpiderError::ConfigurationError)?;

        // Restore checkpoint and get scheduler state
        #[cfg(feature = "checkpoint")]
        let (scheduler_state, pipeline_states) = self.restore_checkpoint()?;
        #[cfg(not(feature = "checkpoint"))]
        let scheduler_state: Option<()> = None;

        // Restore pipeline states if checkpoint was loaded. Pipelines are
        // matched by `name()`; states for unknown pipelines are skipped with
        // a warning rather than failing the build.
        #[cfg(feature = "checkpoint")]
        {
            if let Some(states) = pipeline_states {
                for (name, state) in states {
                    if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
                        pipeline.restore_state(state).await?;
                    } else {
                        warn!("Checkpoint contains state for unknown pipeline: {}", name);
                    }
                }
            }
        }

        // Get cookie store if feature is enabled
        #[cfg(feature = "cookie-store")]
        let cookie_store = {
            #[cfg(feature = "checkpoint")]
            {
                // NOTE(review): restore_cookie_store re-reads and
                // re-deserializes the same checkpoint file already loaded by
                // restore_checkpoint above — consider loading it once.
                let (_, cookie_store) = self.restore_cookie_store().await?;
                cookie_store
            }
            #[cfg(not(feature = "checkpoint"))]
            {
                Some(crate::CookieStore::default())
            }
        };

        // Create scheduler with or without checkpoint state
        let (scheduler_arc, req_rx) =
            Scheduler::new(scheduler_state, self.config.max_pending_requests);
        let downloader_arc = Arc::new(self.downloader);
        let stats = Arc::new(StatCollector::new(
            self.config.live_stats_preview_fields.clone(),
        ));

        // Build crawler with or without cookie store based on feature flag
        #[cfg(feature = "cookie-store")]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
            Arc::new(tokio::sync::RwLock::new(cookie_store.unwrap_or_default())),
        );

        #[cfg(not(feature = "cookie-store"))]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
        );

        Ok(crawler)
    }
553
554    /// Restores checkpoint state from disk (checkpoint feature only).
555    ///
556    /// This internal method loads a previously saved checkpoint file and
557    /// restores the scheduler state and pipeline states.
558    ///
559    /// # Returns
560    ///
561    /// Returns a tuple of `(SchedulerCheckpoint, Option<HashMap<String, Value>>)`.
562    /// If no checkpoint path is configured or the file doesn't exist, returns
563    /// default values.
564    ///
565    /// # Errors
566    ///
567    /// Returns a [`SpiderError`] if deserialization fails.
568    /// Note: Checkpoint file read/deserialization errors are logged as warnings
569    /// but do not fail the operation—the crawl proceeds without checkpoint data.
570    #[cfg(feature = "checkpoint")]
571    fn restore_checkpoint(&mut self) -> Result<RestoreResult, SpiderError> {
572        let mut scheduler_state = None;
573        let mut pipeline_states = None;
574
575        if let Some(path) = &self.checkpoint_config.path {
576            info!("Attempting to restore checkpoint from {:?}", path);
577            if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
578                scheduler_state = Some(checkpoint.scheduler);
579                pipeline_states = Some(checkpoint.pipelines);
580            }
581        }
582
583        Ok((scheduler_state, pipeline_states))
584    }
585
586    /// Restores cookie store from checkpoint (checkpoint + cookie-store features).
587    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
588    async fn restore_cookie_store(
589        &mut self,
590    ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
591        let mut cookie_store = None;
592
593        if let Some(path) = &self.checkpoint_config.path {
594            info!(
595                "Attempting to restore cookie store from checkpoint {:?}",
596                path
597            );
598            if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
599                cookie_store = Some(checkpoint.cookie_store);
600            }
601        }
602
603        Ok((None, cookie_store))
604    }
605
606    /// Extracts the spider from the builder.
607    ///
608    /// # Errors
609    ///
610    /// Returns a [`SpiderError::ConfigurationError`] if:
611    /// - `max_concurrent_downloads` is 0
612    /// - `parser_workers` is 0
613    /// - No spider was provided
614    fn take_spider(&mut self) -> Result<S, SpiderError> {
615        if self.config.max_concurrent_downloads == 0 {
616            return Err(SpiderError::ConfigurationError(
617                "max_concurrent_downloads must be greater than 0.".to_string(),
618            ));
619        }
620        if self.config.max_pending_requests == 0 {
621            return Err(SpiderError::ConfigurationError(
622                "max_pending_requests must be greater than 0.".to_string(),
623            ));
624        }
625        if self.config.parser_workers == 0 {
626            return Err(SpiderError::ConfigurationError(
627                "parser_workers must be greater than 0.".to_string(),
628            ));
629        }
630        self.spider.take().ok_or_else(|| {
631            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
632        })
633    }
634
635    /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
636    ///
637    /// This ensures that scraped items are always output somewhere, even if
638    /// no explicit pipelines are configured by the user.
639    fn init_default_pipeline(&mut self) {
640        if self.pipelines.is_empty() {
641            use spider_pipeline::console::ConsolePipeline;
642            self.pipelines.push(Box::new(ConsolePipeline::new()));
643        }
644    }
645
646    /// Initializes logging for spider-* crates only.
647    ///
648    /// This sets up env_logger with a filter that only enables logging for
649    /// crates within the spider-lib ecosystem.
650    fn init_logging(&self, level: LevelFilter) {
651        use env_logger::{Builder, Env};
652
653        let mut builder = Builder::from_env(Env::default().default_filter_or("off"));
654
655        // Set filter specifically for spider-* crates
656        builder.filter_module("spider_core", level);
657        builder.filter_module("spider_middleware", level);
658        builder.filter_module("spider_pipeline", level);
659        builder.filter_module("spider_util", level);
660        builder.filter_module("spider_downloader", level);
661        builder.filter_module("spider_macro", level);
662
663        builder.init();
664    }
665}