use crate::Downloader;
use crate::ReqwestClientDownloader;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use num_cpus;
use spider_middleware::middleware::Middleware;
use spider_pipeline::pipeline::Pipeline;
use spider_util::error::SpiderError;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use super::Crawler;
use crate::stats::StatCollector;
#[cfg(feature = "checkpoint")]
use log::{debug, warn};

#[cfg(feature = "checkpoint")]
use crate::SchedulerCheckpoint;
#[cfg(feature = "checkpoint")]
use rmp_serde;
#[cfg(feature = "checkpoint")]
use std::fs;

/// Concurrency and buffering settings for a crawl.
pub struct CrawlerConfig {
    /// Maximum number of downloads that may be in flight at once.
    pub max_concurrent_downloads: usize,
    /// Number of worker tasks parsing downloaded responses.
    pub parser_workers: usize,
    /// Maximum number of items processed by pipelines concurrently.
    pub max_concurrent_pipelines: usize,
    /// Capacity of the internal channels connecting the crawl stages.
    pub channel_capacity: usize,
}

impl Default for CrawlerConfig {
    fn default() -> Self {
        CrawlerConfig {
            // At least 16, more on machines with many cores.
            max_concurrent_downloads: num_cpus::get().max(16),
            // One per core, bounded to the range 4..=16.
            parser_workers: num_cpus::get().clamp(4, 16),
            // One per core, but never more than 8.
            max_concurrent_pipelines: num_cpus::get().min(8),
            channel_capacity: 1000,
        }
    }
}
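
// For example, on an 8-core machine these defaults evaluate to
// max_concurrent_downloads = 16 (8.max(16)), parser_workers = 8
// (8.clamp(4, 16)), and max_concurrent_pipelines = 8 (8.min(8)).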

/// Builder for assembling a [`Crawler`] from a spider, a downloader,
/// middlewares, and item pipelines.
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    crawler_config: CrawlerConfig,
    downloader: D,
    spider: Option<S>,
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    checkpoint_path: Option<PathBuf>,
    checkpoint_interval: Option<Duration>,
    _phantom: PhantomData<S>,
}

impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
    fn default() -> Self {
        Self {
            crawler_config: CrawlerConfig::default(),
            downloader: ReqwestClientDownloader::default(),
            spider: None,
            middlewares: Vec::new(),
            item_pipelines: Vec::new(),
            checkpoint_path: None,
            checkpoint_interval: None,
            _phantom: PhantomData,
        }
    }
}

impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
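    /// Creates a builder for the given spider, using the default
    /// reqwest-backed downloader.
    ///
    /// A minimal usage sketch; `MySpider` stands in for any user-defined
    /// [`Spider`] implementation:
    ///
    /// ```ignore
    /// let crawler = CrawlerBuilder::new(MySpider::default())
    ///     .max_concurrent_downloads(32)
    ///     .build()
    ///     .await?;
    /// ```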
    pub fn new(spider: S) -> Self {
        Self {
            spider: Some(spider),
            ..Default::default()
        }
    }
}

impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
    /// Sets the maximum number of concurrent downloads.
    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
        self.crawler_config.max_concurrent_downloads = limit;
        self
    }

    /// Sets the number of parser worker tasks.
    pub fn max_parser_workers(mut self, limit: usize) -> Self {
        self.crawler_config.parser_workers = limit;
        self
    }

    /// Sets the maximum number of concurrently running pipelines.
    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
        self.crawler_config.max_concurrent_pipelines = limit;
        self
    }

    /// Sets the capacity of the internal channels between crawl stages.
    pub fn channel_capacity(mut self, capacity: usize) -> Self {
        self.crawler_config.channel_capacity = capacity;
        self
    }

    /// Replaces the downloader used to fetch requests.
    pub fn downloader(mut self, downloader: D) -> Self {
        self.downloader = downloader;
        self
    }

    /// Appends a middleware to the request/response processing chain.
    pub fn add_middleware<M>(mut self, middleware: M) -> Self
    where
        M: Middleware<D::Client> + Send + Sync + 'static,
    {
        self.middlewares.push(Box::new(middleware));
        self
    }

    /// Appends a pipeline that receives scraped items.
    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
    where
        P: Pipeline<S::Item> + 'static,
    {
        self.item_pipelines.push(Box::new(pipeline));
        self
    }
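    /// Sets the file path used to write crawl checkpoints and to restore
    /// state from on startup (only takes effect with the `checkpoint`
    /// feature enabled).
    ///
    /// A minimal sketch; the file name is illustrative and `spider` is any
    /// [`Spider`] value:
    ///
    /// ```ignore
    /// let builder = CrawlerBuilder::new(spider)
    ///     .with_checkpoint_path("crawl.checkpoint")
    ///     .with_checkpoint_interval(Duration::from_secs(60));
    /// ```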
    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.checkpoint_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Sets how often a checkpoint is written during the crawl.
    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
        self.checkpoint_interval = Some(interval);
        self
    }
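    /// Consumes the builder, validates the configuration, and constructs
    /// the [`Crawler`]. Depending on the enabled `checkpoint` and
    /// `cookie-store` features, previously saved scheduler, pipeline, and
    /// cookie state is first restored from the checkpoint file.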
    #[allow(unused_variables)]
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        let spider = self.validate_and_get_spider()?;
        self.ensure_default_pipeline();

        // Checkpointing and cookie persistence both enabled: restore
        // scheduler state and cookies from the checkpoint.
        #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
        {
            let (initial_scheduler_state, loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                self.checkpoint_path.take(),
                self.checkpoint_interval,
                stats,
                Arc::new(tokio::sync::RwLock::new(
                    loaded_cookie_store.unwrap_or_default(),
                )),
            );
            Ok(crawler)
        }

        // Checkpointing without cookie persistence: restore scheduler and
        // pipeline state only.
        #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
        {
            let (initial_scheduler_state, _loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                self.checkpoint_path.take(),
                self.checkpoint_interval,
                stats,
            );
            Ok(crawler)
        }

        // Cookie persistence without checkpointing: fresh scheduler, default
        // cookie store.
        #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
        {
            let (_initial_scheduler_state, loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                stats,
                Arc::new(tokio::sync::RwLock::new(
                    loaded_cookie_store.unwrap_or_default(),
                )),
            );
            Ok(crawler)
        }

        // Neither feature enabled: plain construction.
        #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
        {
            let (_initial_scheduler_state, _loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                stats,
            );
            Ok(crawler)
        }
    }
    /// Loads the checkpoint from `checkpoint_path` (when set), restores each
    /// pipeline's saved state by name, and returns the recorded scheduler
    /// state and cookie store. Read or decode failures are logged as
    /// warnings and treated as "no checkpoint".
    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<SchedulerCheckpoint>, Option<crate::CookieStore>), SpiderError> {
        let mut initial_scheduler_state = None;
        let mut loaded_pipelines_state = None;
        let mut loaded_cookie_store = None;

        if let Some(path) = &self.checkpoint_path {
            debug!("Attempting to load checkpoint from {:?}", path);
            match fs::read(path) {
                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
                    Ok(checkpoint) => {
                        initial_scheduler_state = Some(checkpoint.scheduler);
                        loaded_pipelines_state = Some(checkpoint.pipelines);
                        loaded_cookie_store = Some(checkpoint.cookie_store);
                    }
                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
                },
                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
            }
        }

        if let Some(pipeline_states) = loaded_pipelines_state {
            for (name, state) in pipeline_states {
                if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
                    pipeline.restore_state(state).await?;
                } else {
                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
                }
            }
        }

        Ok((initial_scheduler_state, loaded_cookie_store))
    }

    /// Variant without the `cookie-store` feature: restores scheduler and
    /// pipeline state but no cookies.
    #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<SchedulerCheckpoint>, Option<()>), SpiderError> {
        let mut initial_scheduler_state = None;
        let mut loaded_pipelines_state = None;

        if let Some(path) = &self.checkpoint_path {
            debug!("Attempting to load checkpoint from {:?}", path);
            match fs::read(path) {
                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
                    Ok(checkpoint) => {
                        initial_scheduler_state = Some(checkpoint.scheduler);
                        loaded_pipelines_state = Some(checkpoint.pipelines);
                    }
                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
                },
                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
            }
        }

        if let Some(pipeline_states) = loaded_pipelines_state {
            for (name, state) in pipeline_states {
                if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
                    pipeline.restore_state(state).await?;
                } else {
                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
                }
            }
        }

        Ok((initial_scheduler_state, None))
    }

    /// No-op stand-in when the `checkpoint` feature is disabled.
    #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<()>, Option<()>), SpiderError> {
        Ok((None, None))
    }

    /// Without checkpointing there is nothing to restore; returns a fresh
    /// default cookie store instead.
    #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
        Ok((None, Some(crate::CookieStore::default())))
    }

    /// Checks the configuration for invalid values and takes the spider out
    /// of the builder.
    fn validate_and_get_spider(&mut self) -> Result<S, SpiderError> {
        if self.crawler_config.max_concurrent_downloads == 0 {
            return Err(SpiderError::ConfigurationError(
                "max_concurrent_downloads must be greater than 0.".to_string(),
            ));
        }
        if self.crawler_config.parser_workers == 0 {
            return Err(SpiderError::ConfigurationError(
                "parser_workers must be greater than 0.".to_string(),
            ));
        }
        self.spider.take().ok_or_else(|| {
            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
        })
    }

    /// Falls back to a [`ConsolePipeline`] when no pipelines were added, so
    /// scraped items remain visible.
    fn ensure_default_pipeline(&mut self) {
        if self.item_pipelines.is_empty() {
            use spider_pipeline::console::ConsolePipeline;
            self.item_pipelines.push(Box::new(ConsolePipeline::new()));
        }
    }
}
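
// An illustrative sanity check of the derived defaults (a sketch, not part
// of the builder's API); the assertions hold for any core count because of
// the max/clamp/min bounds used in `CrawlerConfig::default`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config_is_within_bounds() {
        let cfg = CrawlerConfig::default();
        assert!(cfg.max_concurrent_downloads >= 16);
        assert!((4..=16).contains(&cfg.parser_workers));
        assert!(cfg.max_concurrent_pipelines <= 8);
        assert_eq!(cfg.channel_capacity, 1000);
    }
}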