use crate::Downloader;
use crate::ReqwestClientDownloader;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use num_cpus;
use spider_middleware::middleware::Middleware;
use spider_pipeline::pipeline::Pipeline;
use spider_util::error::SpiderError;
use std::marker::PhantomData;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use super::Crawler;
use crate::stats::StatCollector;
#[cfg(feature = "checkpoint")]
use tracing::{debug, warn};

#[cfg(feature = "checkpoint")]
use crate::SchedulerCheckpoint;
#[cfg(feature = "checkpoint")]
use rmp_serde;
#[cfg(feature = "checkpoint")]
use std::fs;

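/// Tunable concurrency and buffering limits for a crawl.
///
/// A minimal sketch of overriding one knob while keeping the other defaults
/// (illustrative; assumes `CrawlerConfig` is in scope):
///
/// ```ignore
/// let config = CrawlerConfig {
///     max_concurrent_downloads: 32,
///     ..CrawlerConfig::default()
/// };
/// ```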
pub struct CrawlerConfig {
    /// Maximum number of downloads that may run concurrently.
    pub max_concurrent_downloads: usize,
    /// Number of worker tasks that parse downloaded responses.
    pub parser_workers: usize,
    /// Maximum number of item pipelines that may run concurrently.
    pub max_concurrent_pipelines: usize,
    /// Capacity of the internal channels connecting the crawl stages.
    pub channel_capacity: usize,
}

impl Default for CrawlerConfig {
    fn default() -> Self {
        CrawlerConfig {
            // At least 16 concurrent downloads, more on machines with many cores.
            max_concurrent_downloads: num_cpus::get().max(16),
            // One parser worker per core, clamped to the range 4..=16.
            parser_workers: num_cpus::get().clamp(4, 16),
            // One pipeline per core, capped at 8.
            max_concurrent_pipelines: num_cpus::get().min(8),
            channel_capacity: 1000,
        }
    }
}

/// Builder for assembling a [`Crawler`] from a spider, a downloader,
/// middlewares, item pipelines, and optional checkpoint settings.
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    crawler_config: CrawlerConfig,
    downloader: D,
    spider: Option<S>,
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    checkpoint_path: Option<PathBuf>,
    checkpoint_interval: Option<Duration>,
    _phantom: PhantomData<S>,
}

impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
    fn default() -> Self {
        Self {
            crawler_config: CrawlerConfig::default(),
            downloader: ReqwestClientDownloader::default(),
            spider: None,
            middlewares: Vec::new(),
            item_pipelines: Vec::new(),
            checkpoint_path: None,
            checkpoint_interval: None,
            _phantom: PhantomData,
        }
    }
}

impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
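    /// Creates a builder for the given spider with the default
    /// [`ReqwestClientDownloader`] and a default [`CrawlerConfig`].
    ///
    /// A minimal usage sketch; `MySpider` is a hypothetical type implementing
    /// [`Spider`]:
    ///
    /// ```ignore
    /// let builder = CrawlerBuilder::new(MySpider::default());
    /// ```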
    pub fn new(spider: S) -> Self {
        Self {
            spider: Some(spider),
            ..Default::default()
        }
    }
}

impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
    /// Sets the maximum number of concurrent downloads.
    pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
        self.crawler_config.max_concurrent_downloads = limit;
        self
    }

    /// Sets the maximum number of parser worker tasks.
    pub fn max_parser_workers(mut self, limit: usize) -> Self {
        self.crawler_config.parser_workers = limit;
        self
    }

    /// Sets the maximum number of concurrently running item pipelines.
    pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
        self.crawler_config.max_concurrent_pipelines = limit;
        self
    }

    /// Sets the capacity of the internal channels connecting the crawl stages.
    pub fn channel_capacity(mut self, capacity: usize) -> Self {
        self.crawler_config.channel_capacity = capacity;
        self
    }

    /// Replaces the downloader used to fetch requests.
    pub fn downloader(mut self, downloader: D) -> Self {
        self.downloader = downloader;
        self
    }

    /// Adds a middleware to the crawler's middleware chain.
    pub fn add_middleware<M>(mut self, middleware: M) -> Self
    where
        M: Middleware<D::Client> + Send + Sync + 'static,
    {
        self.middlewares.push(Box::new(middleware));
        self
    }

    /// Adds a pipeline that processes scraped items.
    pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
    where
        P: Pipeline<S::Item> + 'static,
    {
        self.item_pipelines.push(Box::new(pipeline));
        self
    }

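    /// Sets the file path used to save and load crawl checkpoints.
    ///
    /// An illustrative pairing with an interval; the path and duration values
    /// here are made up, and checkpointing only takes effect with the
    /// `checkpoint` feature enabled:
    ///
    /// ```ignore
    /// let builder = CrawlerBuilder::new(MySpider::default())
    ///     .with_checkpoint_path("crawl.checkpoint")
    ///     .with_checkpoint_interval(Duration::from_secs(60));
    /// ```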
    pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.checkpoint_path = Some(path.as_ref().to_path_buf());
        self
    }

    /// Sets the interval at which checkpoints are saved during a crawl.
    pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
        self.checkpoint_interval = Some(interval);
        self
    }

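    /// Validates the configuration and assembles the [`Crawler`]. If no
    /// pipeline was added, scraped items are printed via
    /// `ConsoleWriterPipeline`.
    ///
    /// A usage sketch; `MySpider` is a hypothetical [`Spider`] implementation
    /// and the error is propagated with `?`:
    ///
    /// ```ignore
    /// let crawler = CrawlerBuilder::new(MySpider::default())
    ///     .max_concurrent_downloads(32)
    ///     .build()
    ///     .await?;
    /// ```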
    #[allow(unused_variables)]
    pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
    where
        D: Downloader + Send + Sync + 'static,
        D::Client: Send + Sync + Clone,
        S::Item: Send + Sync + 'static,
    {
        let spider = self.validate_and_get_spider()?;

        // Fall back to printing scraped items when no pipeline was registered.
        if self.item_pipelines.is_empty() {
            use spider_pipeline::console_writer::ConsoleWriterPipeline;
            self = self.add_pipeline(ConsoleWriterPipeline::new());
        }

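        // Exactly one of the four blocks below is compiled in, selected by
        // the `checkpoint` and `cookie-store` features; each builds the
        // `Crawler` with the argument set that matches those features.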
        #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
        {
            let (initial_scheduler_state, loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                self.checkpoint_path.take(),
                self.checkpoint_interval,
                stats,
                Arc::new(tokio::sync::RwLock::new(
                    loaded_cookie_store.unwrap_or_default(),
                )),
            );
            return Ok(crawler);
        }

        #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
        {
            let (initial_scheduler_state, _loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                self.checkpoint_path.take(),
                self.checkpoint_interval,
                stats,
            );
            return Ok(crawler);
        }

        #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
        {
            let (_initial_scheduler_state, loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                stats,
                Arc::new(tokio::sync::RwLock::new(
                    loaded_cookie_store.unwrap_or_default(),
                )),
            );
            return Ok(crawler);
        }

        #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
        {
            let (_initial_scheduler_state, _loaded_cookie_store) =
                self.load_and_restore_checkpoint_state().await?;
            let (scheduler_arc, req_rx) = Scheduler::new(None::<()>);
            let downloader_arc = Arc::new(self.downloader);
            let stats = Arc::new(StatCollector::new());
            let crawler = Crawler::new(
                scheduler_arc,
                req_rx,
                downloader_arc,
                self.middlewares,
                spider,
                self.item_pipelines,
                self.crawler_config.max_concurrent_downloads,
                self.crawler_config.parser_workers,
                self.crawler_config.max_concurrent_pipelines,
                self.crawler_config.channel_capacity,
                stats,
            );
            return Ok(crawler);
        }
    }

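    // Checkpoint files are MessagePack-encoded `crate::Checkpoint` values
    // (decoded with `rmp_serde`). Failures to read or decode the file are
    // logged as warnings and the crawl simply starts without prior state.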
    /// Loads a checkpoint from `checkpoint_path` (if one is set), restores
    /// any pipeline state it contains, and returns the scheduler state and
    /// cookie store with which the crawl should resume.
    #[cfg(all(feature = "checkpoint", feature = "cookie-store"))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<SchedulerCheckpoint>, Option<crate::CookieStore>), SpiderError> {
        let mut initial_scheduler_state = None;
        let mut loaded_pipelines_state = None;
        let mut loaded_cookie_store = None;

        if let Some(path) = &self.checkpoint_path {
            debug!("Attempting to load checkpoint from {:?}", path);
            match fs::read(path) {
                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
                    Ok(checkpoint) => {
                        initial_scheduler_state = Some(checkpoint.scheduler);
                        loaded_pipelines_state = Some(checkpoint.pipelines);
                        loaded_cookie_store = Some(checkpoint.cookie_store);
                    }
                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
                },
                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
            }
        }

        if let Some(pipeline_states) = loaded_pipelines_state {
            for (name, state) in pipeline_states {
                if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
                    pipeline.restore_state(state).await?;
                } else {
                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
                }
            }
        }

        Ok((initial_scheduler_state, loaded_cookie_store))
    }

    /// Same as above, but compiled without the `cookie-store` feature, so no
    /// cookie state is restored.
    #[cfg(all(feature = "checkpoint", not(feature = "cookie-store")))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<SchedulerCheckpoint>, Option<()>), SpiderError> {
        let mut initial_scheduler_state = None;
        let mut loaded_pipelines_state = None;

        if let Some(path) = &self.checkpoint_path {
            debug!("Attempting to load checkpoint from {:?}", path);
            match fs::read(path) {
                Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
                    Ok(checkpoint) => {
                        initial_scheduler_state = Some(checkpoint.scheduler);
                        loaded_pipelines_state = Some(checkpoint.pipelines);
                    }
                    Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
                },
                Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
            }
        }

        if let Some(pipeline_states) = loaded_pipelines_state {
            for (name, state) in pipeline_states {
                if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
                    pipeline.restore_state(state).await?;
                } else {
                    warn!("Checkpoint contains state for unknown pipeline: {}", name);
                }
            }
        }

        Ok((initial_scheduler_state, None))
    }

    /// No-op stub used when the `checkpoint` feature is disabled; there is
    /// nothing to load or restore.
    #[cfg(all(not(feature = "checkpoint"), not(feature = "cookie-store")))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<()>, Option<()>), SpiderError> {
        Ok((None, None))
    }

    /// Stub used when `checkpoint` is disabled but `cookie-store` is enabled;
    /// returns a fresh, empty cookie store.
    #[cfg(all(not(feature = "checkpoint"), feature = "cookie-store"))]
    async fn load_and_restore_checkpoint_state(
        &mut self,
    ) -> Result<(Option<()>, Option<crate::CookieStore>), SpiderError> {
        Ok((None, Some(crate::CookieStore::default())))
    }

    /// Checks the concurrency configuration and takes the spider out of the
    /// builder, returning a `ConfigurationError` if a limit is zero or no
    /// spider was provided.
    fn validate_and_get_spider(&mut self) -> Result<S, SpiderError> {
        if self.crawler_config.max_concurrent_downloads == 0 {
            return Err(SpiderError::ConfigurationError(
                "max_concurrent_downloads must be greater than 0.".to_string(),
            ));
        }
        if self.crawler_config.parser_workers == 0 {
            return Err(SpiderError::ConfigurationError(
                "parser_workers must be greater than 0.".to_string(),
            ));
        }
        self.spider.take().ok_or_else(|| {
            SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
        })
    }
}