spider_core/builder.rs
1//! Builder API for assembling a [`Crawler`].
2//!
3//! [`CrawlerBuilder`] is where runtime composition happens: concurrency,
4//! downloader selection, middleware, pipelines, item limits, logging, and
5//! optional checkpointing all start here.
6//!
7//! ## Example
8//!
9//! ```rust,ignore
10//! use spider_core::CrawlerBuilder;
11//! use spider_middleware::rate_limit::RateLimitMiddleware;
12//! use spider_pipeline::console::ConsolePipeline;
13//! use spider_util::error::SpiderError;
14//!
15//! async fn setup_crawler() -> Result<(), SpiderError> {
16//! let crawler = CrawlerBuilder::new(MySpider)
17//! .max_concurrent_downloads(10)
18//! .max_parser_workers(4)
19//! .add_middleware(RateLimitMiddleware::default())
20//! .add_pipeline(ConsolePipeline::new())
21//! .with_checkpoint_path("./crawl.checkpoint")
22//! .build()
23//! .await?;
24//!
25//! crawler.start_crawl().await
26//! }
27//! ```
28
29use crate::Downloader;
30use crate::ReqwestClientDownloader;
31use crate::config::{CheckpointConfig, CrawlerConfig};
32use crate::scheduler::Scheduler;
33use crate::spider::Spider;
34use spider_middleware::middleware::Middleware;
35use spider_pipeline::pipeline::Pipeline;
36
/// State recovered from a checkpoint file by [`CrawlerBuilder::restore_checkpoint`]:
/// the scheduler frontier snapshot plus per-pipeline serialized state keyed by
/// pipeline name (matched against `Pipeline::name()` during `build`).
#[cfg(feature = "checkpoint")]
type RestoreResult = (
    Option<crate::SchedulerCheckpoint>,
    Option<std::collections::HashMap<String, serde_json::Value>>,
);
42use spider_util::error::SpiderError;
43use std::marker::PhantomData;
44use std::path::Path;
45use std::sync::Arc;
46use std::time::Duration;
47
48use super::Crawler;
49use crate::stats::StatCollector;
50use log::LevelFilter;
51#[cfg(feature = "checkpoint")]
52use log::{info, warn};
53
54#[cfg(feature = "checkpoint")]
55use rmp_serde;
56#[cfg(feature = "checkpoint")]
57use std::fs;
58
/// A fluent builder for constructing [`Crawler`] instances.
///
/// ## Type Parameters
///
/// - `S`: The [`Spider`] implementation type
/// - `D`: The [`Downloader`] implementation type
///
/// ## Example
///
/// ```rust,ignore
/// # use spider_core::{CrawlerBuilder, Spider};
/// # use spider_util::{response::Response, error::SpiderError, item::ParseOutput};
/// # struct MySpider;
/// # #[async_trait::async_trait]
/// # impl Spider for MySpider {
/// #     type Item = String;
/// #     type State = ();
/// #     fn start_requests(&self) -> Result<spider_core::spider::StartRequests<'_>, SpiderError> {
/// #         Ok(spider_core::spider::StartRequests::Iter(Box::new(std::iter::empty())))
/// #     }
/// #     async fn parse(&self, response: Response, state: &Self::State) -> Result<ParseOutput<Self::Item>, SpiderError> { todo!() }
/// # }
/// let builder = CrawlerBuilder::new(MySpider)
///     .max_concurrent_downloads(8)
///     .max_pending_requests(16)
///     .max_parser_workers(4);
/// ```
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    // Runtime tuning knobs: concurrency, channel sizes, limits, live stats.
    config: CrawlerConfig,
    // Checkpoint persistence settings (file path and save interval).
    checkpoint_config: CheckpointConfig,
    // Transport used to execute HTTP requests.
    downloader: D,
    // Moved out by `build()` via `take_spider`; `None` afterwards.
    spider: Option<S>,
    // Request/response interceptors, executed in insertion order.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    // Item processors, executed in insertion order.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Log level applied to spider-* crates at build time; `None` = untouched.
    log_level: Option<LevelFilter>,
    // NOTE(review): likely redundant — `spider: Option<S>` already uses `S`.
    _phantom: PhantomData<S>,
}
99
100impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
101 fn default() -> Self {
102 Self {
103 config: CrawlerConfig::default(),
104 checkpoint_config: CheckpointConfig::default(),
105 downloader: ReqwestClientDownloader::default(),
106 spider: None,
107 middlewares: Vec::new(),
108 pipelines: Vec::new(),
109 log_level: None,
110 _phantom: PhantomData,
111 }
112 }
113}
114
115impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
116 /// Creates a new `CrawlerBuilder` for a given spider with the default [`ReqwestClientDownloader`].
117 ///
118 /// ## Example
119 ///
120 /// ```rust,ignore
121 /// let crawler = CrawlerBuilder::new(MySpider)
122 /// .build()
123 /// .await?;
124 /// ```
125 pub fn new(spider: S) -> Self {
126 Self {
127 spider: Some(spider),
128 ..Default::default()
129 }
130 }
131}
132
133impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
134 #[cfg(feature = "checkpoint")]
135 fn load_checkpoint_from_path(&self, path: &std::path::Path) -> Option<crate::Checkpoint> {
136 match fs::read(path) {
137 Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
138 Ok(checkpoint) => Some(checkpoint),
139 Err(e) => {
140 warn!("Failed to deserialize checkpoint from {:?}: {}", path, e);
141 None
142 }
143 },
144 Err(e) => {
145 warn!("Failed to read checkpoint file {:?}: {}", path, e);
146 None
147 }
148 }
149 }
150
151 /// Sets the maximum number of concurrent downloads.
152 ///
153 /// This controls how many HTTP requests can be in-flight simultaneously.
154 /// Higher values increase throughput but may overwhelm target servers.
155 ///
156 /// ## Default
157 ///
158 /// Defaults to twice the number of CPU cores, clamped between 4 and 64.
159 pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
160 self.config.max_concurrent_downloads = limit;
161 self
162 }
163
164 /// Sets the maximum number of outstanding requests tracked by the scheduler.
165 ///
166 /// This includes queued requests plus requests already handed off for download.
167 /// Lower values keep the frontier tighter and reduce internal request buildup.
168 pub fn max_pending_requests(mut self, limit: usize) -> Self {
169 self.config.max_pending_requests = limit;
170 self
171 }
172
173 /// Sets the number of worker tasks dedicated to parsing responses.
174 ///
175 /// Parser workers process HTTP responses concurrently, calling the
176 /// spider's [`parse`](Spider::parse) method to extract items and
177 /// discover new URLs.
178 ///
179 /// ## Default
180 ///
181 /// Defaults to the number of CPU cores, clamped between 4 and 16.
182 pub fn max_parser_workers(mut self, limit: usize) -> Self {
183 self.config.parser_workers = limit;
184 self
185 }
186
187 /// Sets the maximum number of concurrent item processing pipelines.
188 ///
189 /// This controls how many items can be processed by pipelines simultaneously.
190 ///
191 /// ## Default
192 ///
193 /// Defaults to the number of CPU cores, with a maximum of 8.
194 pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
195 self.config.max_concurrent_pipelines = limit;
196 self
197 }
198
199 /// Sets the capacity of internal communication channels.
200 ///
201 /// This controls the buffer size for channels between the downloader,
202 /// parser, and pipeline components. Higher values can improve throughput
203 /// at the cost of increased memory usage.
204 ///
205 /// ## Default
206 ///
207 /// Defaults to 1000.
208 pub fn channel_capacity(mut self, capacity: usize) -> Self {
209 self.config.channel_capacity = capacity;
210 self
211 }
212
213 /// Sets the parser output batch size.
214 ///
215 /// Larger batches can reduce coordination overhead when pages emit many
216 /// items or follow-up requests, while smaller batches tend to improve
217 /// latency and memory locality.
218 pub fn output_batch_size(mut self, batch_size: usize) -> Self {
219 self.config.output_batch_size = batch_size;
220 self
221 }
222
223 /// Sets the downloader response-channel backpressure threshold.
224 ///
225 /// When the downloader-to-parser channel reaches this threshold, the
226 /// runtime starts applying backpressure so downloaded responses do not pile
227 /// up unboundedly in memory.
228 pub fn response_backpressure_threshold(mut self, threshold: usize) -> Self {
229 self.config.response_backpressure_threshold = threshold;
230 self
231 }
232
233 /// Sets the parser item-channel backpressure threshold.
234 ///
235 /// This primarily matters when parsing is faster than downstream pipeline
236 /// processing. Lower thresholds keep memory tighter; higher thresholds let
237 /// parsers run further ahead.
238 pub fn item_backpressure_threshold(mut self, threshold: usize) -> Self {
239 self.config.item_backpressure_threshold = threshold;
240 self
241 }
242
243 /// Controls whether retries release downloader permits before waiting.
244 ///
245 /// Enabling this is usually better for throughput because a sleeping retry
246 /// does not occupy scarce downloader concurrency. Disabling it can be
247 /// useful when you want retries to count fully against download capacity.
248 pub fn retry_release_permit(mut self, enabled: bool) -> Self {
249 self.config.retry_release_permit = enabled;
250 self
251 }
252
253 /// Enables or disables live, in-place statistics updates on terminal stdout.
254 ///
255 /// When enabled, spider-* logs are forced to `LevelFilter::Off` during build
256 /// to avoid interleaving with the live terminal renderer.
257 pub fn live_stats(mut self, enabled: bool) -> Self {
258 self.config.live_stats = enabled;
259 self
260 }
261
262 /// Sets the refresh interval for live statistics updates.
263 ///
264 /// Shorter intervals make the terminal view feel more responsive, while
265 /// longer intervals reduce redraw overhead.
266 pub fn live_stats_interval(mut self, interval: Duration) -> Self {
267 self.config.live_stats_interval = interval;
268 self
269 }
270
271 /// Sets which scraped item fields should be shown in live stats preview.
272 ///
273 /// Field names support dot notation for nested JSON objects such as
274 /// `title`, `source_url`, or `metadata.Japanese`.
275 ///
276 /// You can also set aliases with `label=path`, for example
277 /// `url=source_url` or `jp=metadata.Japanese`.
278 pub fn live_stats_preview_fields(
279 mut self,
280 fields: impl IntoIterator<Item = impl Into<String>>,
281 ) -> Self {
282 self.config.live_stats_preview_fields = Some(fields.into_iter().map(Into::into).collect());
283 self
284 }
285
286 /// Sets the maximum grace period for crawler shutdown before forcing task abort.
287 ///
288 /// This gives pipelines, checkpoint writes, and other in-flight work time
289 /// to finish cleanly after shutdown begins.
290 pub fn shutdown_grace_period(mut self, grace_period: Duration) -> Self {
291 self.config.shutdown_grace_period = grace_period;
292 self
293 }
294
295 /// Stops the crawl after `limit` scraped items have been admitted for processing.
296 ///
297 /// This is especially useful for smoke runs, local previews, and
298 /// documentation examples where you want predictable bounded work.
299 pub fn limit(mut self, limit: usize) -> Self {
300 self.config.item_limit = Some(limit);
301 self
302 }
303
304 /// Sets a custom downloader implementation.
305 ///
306 /// Use this method to provide a custom [`Downloader`] implementation
307 /// instead of the default [`ReqwestClientDownloader`].
308 ///
309 /// Reach for this when transport behavior itself needs to change, such as
310 /// request signing, alternate HTTP stacks, downloader-level tracing, or
311 /// protocol-specific request execution.
312 pub fn downloader(mut self, downloader: D) -> Self {
313 self.downloader = downloader;
314 self
315 }
316
317 /// Adds a middleware to the crawler's middleware stack.
318 ///
319 /// Middlewares intercept and modify requests before they are sent and
320 /// responses after they are received. They are executed in the order
321 /// they are added.
322 ///
323 /// Middleware is the right layer for cross-cutting HTTP behavior such as
324 /// retry policy, rate limiting, cookies, user-agent management, cache
325 /// lookup, or `robots.txt` enforcement.
326 ///
327 /// ## Example
328 ///
329 /// ```rust,ignore
330 /// let crawler = CrawlerBuilder::new(MySpider)
331 /// .add_middleware(RateLimitMiddleware::default())
332 /// .add_middleware(RetryMiddleware::new())
333 /// .build()
334 /// .await?;
335 /// ```
336 pub fn add_middleware<M>(mut self, middleware: M) -> Self
337 where
338 M: Middleware<D::Client> + Send + Sync + 'static,
339 {
340 self.middlewares.push(Box::new(middleware));
341 self
342 }
343
344 /// Adds a pipeline to the crawler's pipeline stack.
345 ///
346 /// Pipelines process scraped items after they are extracted by the spider.
347 /// They can be used for validation, transformation, deduplication, or
348 /// storage (e.g., writing to files or databases).
349 ///
350 /// Pipelines are ordered. A common pattern is transform first, validate
351 /// second, deduplicate next, and write to outputs last.
352 ///
353 /// ## Example
354 ///
355 /// ```rust,ignore
356 /// let crawler = CrawlerBuilder::new(MySpider)
357 /// .add_pipeline(ConsolePipeline::new())
358 /// .add_pipeline(JsonPipeline::new("output.json")?)
359 /// .build()
360 /// .await?;
361 /// ```
362 pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
363 where
364 P: Pipeline<S::Item> + 'static,
365 {
366 self.pipelines.push(Box::new(pipeline));
367 self
368 }
369
370 /// Sets the log level for `spider-*` library crates.
371 ///
372 /// This configures the logging level specifically for the spider-lib ecosystem
373 /// (spider-core, spider-middleware, spider-pipeline, spider-util, spider-downloader).
374 /// Logs from other dependencies (e.g., reqwest, tokio) will not be affected.
375 ///
376 /// ## Log Levels
377 ///
378 /// - `LevelFilter::Error` - Only error messages
379 /// - `LevelFilter::Warn` - Warnings and errors
380 /// - `LevelFilter::Info` - Informational messages, warnings, and errors
381 /// - `LevelFilter::Debug` - Debug messages and above
382 /// - `LevelFilter::Trace` - All messages including trace
383 ///
384 /// ## Example
385 ///
386 /// ```rust,ignore
387 /// use log::LevelFilter;
388 ///
389 /// let crawler = CrawlerBuilder::new(MySpider)
390 /// .log_level(LevelFilter::Debug)
391 /// .build()
392 /// .await?;
393 /// ```
394 pub fn log_level(mut self, level: LevelFilter) -> Self {
395 self.log_level = Some(level);
396 self
397 }
398
399 /// Sets the path for saving and loading checkpoints.
400 ///
401 /// When enabled, the crawler periodically saves its state to this file,
402 /// allowing crawls to be resumed after interruption.
403 ///
404 /// Requires the `checkpoint` feature to be enabled.
405 ///
406 /// If a checkpoint file already exists at build time, the builder will
407 /// attempt to restore scheduler and pipeline state from it.
408 pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
409 self.checkpoint_config.path = Some(path.as_ref().to_path_buf());
410 self
411 }
412
413 /// Sets the interval between automatic checkpoint saves.
414 ///
415 /// When enabled, the crawler saves its state at this interval.
416 /// Shorter intervals provide more frequent recovery points but may
417 /// impact performance.
418 ///
419 /// Requires the `checkpoint` feature to be enabled.
420 pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
421 self.checkpoint_config.interval = Some(interval);
422 self
423 }
424
    /// Builds the [`Crawler`] instance.
    ///
    /// This method finalizes the crawler configuration and initializes all
    /// components. It performs validation and sets up default values where
    /// necessary.
    ///
    /// Build time is where the runtime:
    /// - validates concurrency and channel settings
    /// - initializes logging and live-stats behavior
    /// - restores checkpoint state if configured
    /// - constructs the scheduler and runtime handles
    ///
    /// # Errors
    ///
    /// Returns a [`SpiderError::ConfigurationError`] if:
    /// - `max_concurrent_downloads` is 0
    /// - `max_pending_requests` is 0
    /// - `parser_workers` is 0
    /// - No spider was provided to the builder
    /// - `CrawlerConfig::validate` rejects the configuration
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let crawler = CrawlerBuilder::new(MySpider)
    ///     .max_concurrent_downloads(10)
    ///     .build()
    ///     .await?;
    /// ```
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        // take_spider also rejects zero-valued concurrency settings before the
        // spider is moved out of the builder.
        let spider = self.take_spider()?;
        // Guarantee at least one pipeline (ConsolePipeline) so items always go somewhere.
        self.init_default_pipeline();

        // Live stats redraw on stdout and should not interleave with spider-* logs.
        // Force spider-* logs to Off when live stats mode is enabled.
        let effective_log_level = if self.config.live_stats {
            Some(LevelFilter::Off)
        } else {
            self.log_level
        };

        // Initialize logging for spider-* crates if an effective log level is configured.
        if let Some(level) = effective_log_level {
            self.init_logging(level);
        }

        // Validate config
        self.config
            .validate()
            .map_err(SpiderError::ConfigurationError)?;

        // Restore checkpoint and get scheduler state
        #[cfg(feature = "checkpoint")]
        let (scheduler_state, pipeline_states) = self.restore_checkpoint()?;
        // Without the checkpoint feature there is nothing to restore; the
        // placeholder type keeps Scheduler::new callable in both builds.
        #[cfg(not(feature = "checkpoint"))]
        let scheduler_state: Option<()> = None;

        // Restore pipeline states if checkpoint was loaded. Each saved state is
        // matched to a configured pipeline by `Pipeline::name()`; unmatched
        // entries are logged and skipped rather than failing the build.
        #[cfg(feature = "checkpoint")]
        {
            if let Some(states) = pipeline_states {
                for (name, state) in states {
                    if let Some(pipeline) = self.pipelines.iter().find(|p| p.name() == name) {
                        pipeline.restore_state(state).await?;
                    } else {
                        warn!("Checkpoint contains state for unknown pipeline: {}", name);
                    }
                }
            }
        }

        // Get cookie store if feature is enabled: from the checkpoint when both
        // features are on, otherwise a fresh default store.
        #[cfg(feature = "cookie-store")]
        let cookie_store = {
            #[cfg(feature = "checkpoint")]
            {
                let (_, cookie_store) = self.restore_cookie_store().await?;
                cookie_store
            }
            #[cfg(not(feature = "checkpoint"))]
            {
                Some(crate::CookieStore::default())
            }
        };

        // Create scheduler with or without checkpoint state
        let (scheduler_arc, req_rx) =
            Scheduler::new(scheduler_state, self.config.max_pending_requests);
        let downloader_arc = Arc::new(self.downloader);
        let stats = Arc::new(StatCollector::new(
            self.config.live_stats_preview_fields.clone(),
        ));

        // Build crawler with or without cookie store based on feature flag
        #[cfg(feature = "cookie-store")]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
            Arc::new(tokio::sync::RwLock::new(cookie_store.unwrap_or_default())),
        );

        #[cfg(not(feature = "cookie-store"))]
        let crawler = Crawler::new(
            scheduler_arc,
            req_rx,
            downloader_arc,
            self.middlewares,
            spider,
            self.pipelines,
            self.config,
            #[cfg(feature = "checkpoint")]
            self.checkpoint_config,
            stats,
        );

        Ok(crawler)
    }
553
554 /// Restores checkpoint state from disk (checkpoint feature only).
555 ///
556 /// This internal method loads a previously saved checkpoint file and
557 /// restores the scheduler state and pipeline states.
558 ///
559 /// # Returns
560 ///
561 /// Returns a tuple of `(SchedulerCheckpoint, Option<HashMap<String, Value>>)`.
562 /// If no checkpoint path is configured or the file doesn't exist, returns
563 /// default values.
564 ///
565 /// # Errors
566 ///
567 /// Returns a [`SpiderError`] if deserialization fails.
568 /// Note: Checkpoint file read/deserialization errors are logged as warnings
569 /// but do not fail the operation—the crawl proceeds without checkpoint data.
570 #[cfg(feature = "checkpoint")]
571 fn restore_checkpoint(&mut self) -> Result<RestoreResult, SpiderError> {
572 let mut scheduler_state = None;
573 let mut pipeline_states = None;
574
575 if let Some(path) = &self.checkpoint_config.path {
576 info!("Attempting to restore checkpoint from {:?}", path);
577 if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
578 scheduler_state = Some(checkpoint.scheduler);
579 pipeline_states = Some(checkpoint.pipelines);
580 }
581 }
582
583 Ok((scheduler_state, pipeline_states))
584 }
585
586 /// Restores cookie store from checkpoint (checkpoint + cookie-store features).
587 #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
588 async fn restore_cookie_store(
589 &mut self,
590 ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
591 let mut cookie_store = None;
592
593 if let Some(path) = &self.checkpoint_config.path {
594 info!(
595 "Attempting to restore cookie store from checkpoint {:?}",
596 path
597 );
598 if let Some(checkpoint) = self.load_checkpoint_from_path(path) {
599 cookie_store = Some(checkpoint.cookie_store);
600 }
601 }
602
603 Ok((None, cookie_store))
604 }
605
606 /// Extracts the spider from the builder.
607 ///
608 /// # Errors
609 ///
610 /// Returns a [`SpiderError::ConfigurationError`] if:
611 /// - `max_concurrent_downloads` is 0
612 /// - `parser_workers` is 0
613 /// - No spider was provided
614 fn take_spider(&mut self) -> Result<S, SpiderError> {
615 if self.config.max_concurrent_downloads == 0 {
616 return Err(SpiderError::ConfigurationError(
617 "max_concurrent_downloads must be greater than 0.".to_string(),
618 ));
619 }
620 if self.config.max_pending_requests == 0 {
621 return Err(SpiderError::ConfigurationError(
622 "max_pending_requests must be greater than 0.".to_string(),
623 ));
624 }
625 if self.config.parser_workers == 0 {
626 return Err(SpiderError::ConfigurationError(
627 "parser_workers must be greater than 0.".to_string(),
628 ));
629 }
630 self.spider.take().ok_or_else(|| {
631 SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
632 })
633 }
634
635 /// Initializes the pipeline stack with a default [`ConsolePipeline`] if empty.
636 ///
637 /// This ensures that scraped items are always output somewhere, even if
638 /// no explicit pipelines are configured by the user.
639 fn init_default_pipeline(&mut self) {
640 if self.pipelines.is_empty() {
641 use spider_pipeline::console::ConsolePipeline;
642 self.pipelines.push(Box::new(ConsolePipeline::new()));
643 }
644 }
645
646 /// Initializes logging for spider-* crates only.
647 ///
648 /// This sets up env_logger with a filter that only enables logging for
649 /// crates within the spider-lib ecosystem.
650 fn init_logging(&self, level: LevelFilter) {
651 use env_logger::{Builder, Env};
652
653 let mut builder = Builder::from_env(Env::default().default_filter_or("off"));
654
655 // Set filter specifically for spider-* crates
656 builder.filter_module("spider_core", level);
657 builder.filter_module("spider_middleware", level);
658 builder.filter_module("spider_pipeline", level);
659 builder.filter_module("spider_util", level);
660 builder.filter_module("spider_downloader", level);
661 builder.filter_module("spider_macro", level);
662
663 builder.init();
664 }
665}