use crate::downloader::Downloader;
use crate::error::SpiderError;
use crate::item::{ParseOutput, ScrapedItem};
use crate::middleware::{Middleware, MiddlewareAction};
use crate::pipeline::Pipeline;
use crate::request::Request;
use crate::response::Response;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use crate::state::CrawlerState;
use crate::stats::StatCollector;
use anyhow::Result;
use futures_util::future::join_all;
use kanal::{AsyncReceiver, AsyncSender, bounded_async};

#[cfg(feature = "checkpoint")]
use crate::checkpoint::save_checkpoint;
#[cfg(feature = "checkpoint")]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;
use tracing::{debug, error, info, warn};

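/// Orchestrates a crawl: pulls requests from the [`Scheduler`], downloads them
/// through the middleware chain, hands responses to the [`Spider`] for parsing,
/// and feeds scraped items through the configured pipelines.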
pub struct Crawler<S: Spider, C> {
    scheduler: Arc<Scheduler>,
    req_rx: AsyncReceiver<Request>,
    stats: Arc<StatCollector>,
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    spider: Arc<Mutex<S>>,
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    max_concurrent_downloads: usize,
    parser_workers: usize,
    max_concurrent_pipelines: usize,
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
}

impl<S, C> Crawler<S, C>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
    C: Send + Sync + 'static,
{
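    /// Crate-internal constructor that assembles a crawler from pre-built components.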
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: Arc<Scheduler>,
        req_rx: AsyncReceiver<Request>,
        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
        spider: S,
        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
        max_concurrent_downloads: usize,
        parser_workers: usize,
        max_concurrent_pipelines: usize,
        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
        stats: Arc<StatCollector>,
    ) -> Self {
        Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            spider: Arc::new(Mutex::new(spider)),
            item_pipelines,
            max_concurrent_downloads,
            parser_workers,
            max_concurrent_pipelines,
            #[cfg(feature = "checkpoint")]
            checkpoint_path,
            #[cfg(feature = "checkpoint")]
            checkpoint_interval,
        }
    }

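    /// Runs the crawl to completion.
    ///
    /// Spawns the seed-request, downloader, parser, and item-processor actors,
    /// waits until either Ctrl-C is received or the scheduler and all in-flight
    /// work go idle, then shuts the actors down in order and closes the item
    /// pipelines.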
    pub async fn start_crawl(self) -> Result<(), SpiderError> {
        info!("Crawler starting crawl");

        let Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            spider,
            item_pipelines,
            max_concurrent_downloads,
            parser_workers,
            max_concurrent_pipelines,
            #[cfg(feature = "checkpoint")]
            checkpoint_path,
            #[cfg(feature = "checkpoint")]
            checkpoint_interval,
        } = self;

        let state = CrawlerState::new();
        let pipelines = Arc::new(item_pipelines);
        let channel_capacity = max_concurrent_downloads * 2;

        let (res_tx, res_rx) = bounded_async(channel_capacity);
        let (item_tx, item_rx) = bounded_async(channel_capacity);

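        // Spawn the actor tasks: seed requests, downloading, parsing, and item processing.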
        let initial_requests_task =
            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());

        let downloader_task = spawn_downloader_task::<S, C>(
            scheduler.clone(),
            req_rx,
            downloader,
            Arc::new(Mutex::new(middlewares)),
            state.clone(),
            res_tx.clone(),
            max_concurrent_downloads,
            stats.clone(),
        );

        let parser_task = spawn_parser_task::<S>(
            scheduler.clone(),
            spider.clone(),
            state.clone(),
            res_rx,
            item_tx.clone(),
            parser_workers,
            stats.clone(),
        );

        let item_processor_task = spawn_item_processor_task::<S>(
            state.clone(),
            item_rx,
            pipelines.clone(),
            max_concurrent_pipelines,
            stats.clone(),
        );

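        // With the `checkpoint` feature enabled, periodically snapshot the
        // scheduler and pipeline state to disk.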
        #[cfg(feature = "checkpoint")]
        if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
            let scheduler_clone = scheduler.clone();
            let pipelines_clone = pipelines.clone();
            let path_clone = path.clone();

            tokio::spawn(async move {
                let mut interval_timer = tokio::time::interval(interval);
                // The first tick completes immediately; consume it so the first
                // checkpoint is only written after a full interval has elapsed.
                interval_timer.tick().await;
                loop {
                    tokio::select! {
                        _ = interval_timer.tick() => {
                            if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await
                                && let Err(e) = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone).await
                            {
                                error!("Periodic checkpoint save failed: {}", e);
                            }
                        }
                    }
                }
            });
        }

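        // Run until Ctrl-C or until the crawl drains. Idleness is sampled twice,
        // with a short delay in between, so a momentary gap while work is handed
        // between actors does not trigger an early shutdown.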
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                info!("Ctrl-C received, initiating graceful shutdown.");
            }
            _ = async {
                loop {
                    if scheduler.is_idle() && state.is_idle() {
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        if scheduler.is_idle() && state.is_idle() {
                            break;
                        }
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            } => {
                info!("Crawl has become idle, initiating shutdown.");
            }
        }

        info!("Initiating actor shutdowns.");

        #[cfg(feature = "checkpoint")]
        let scheduler_checkpoint = scheduler.snapshot().await?;

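        // Drop our clones of the senders so the channels can fully close once the
        // upstream actors finish, letting the downstream actors drain and exit.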
        drop(res_tx);
        drop(item_tx);

        scheduler.shutdown().await?;

        item_processor_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Item processor task failed: {}", e)))?;

        parser_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Parser task failed: {}", e)))?;

        downloader_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Downloader task failed: {}", e)))?;

        initial_requests_task.await.map_err(|e| {
            SpiderError::GeneralError(format!("Initial requests task failed: {}", e))
        })?;

        #[cfg(feature = "checkpoint")]
        if let Some(path) = &checkpoint_path
            && let Err(e) = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines).await
        {
            error!("Final checkpoint save failed: {}", e);
        }

        info!("Closing item pipelines...");
        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
        join_all(closing_futures).await;

        info!("Crawl finished successfully.");
        Ok(())
    }

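    /// Returns a shared handle to the crawl's statistics collector.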
    pub fn get_stats(&self) -> Arc<StatCollector> {
        Arc::clone(&self.stats)
    }
}

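/// Spawns a task that asks the spider for its start requests, strips URL
/// fragments, and enqueues them on the scheduler.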
fn spawn_initial_requests_task<S>(
    scheduler: Arc<Scheduler>,
    spider: Arc<Mutex<S>>,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    tokio::spawn(async move {
        match spider.lock().await.start_requests() {
            Ok(requests) => {
                for mut req in requests {
                    req.url.set_fragment(None);
                    match scheduler.enqueue_request(req).await {
                        Ok(_) => {
                            stats.increment_requests_enqueued();
                        }
                        Err(e) => {
                            error!("Failed to enqueue initial request: {}", e);
                        }
                    }
                }
            }
            Err(e) => error!("Failed to create start requests: {}", e),
        }
    })
}

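/// Spawns the downloader actor: requests received from the scheduler are passed
/// through the request middleware chain, downloaded (bounded by a semaphore),
/// passed back through the middleware chain in reverse as responses, and
/// forwarded to the parser.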
#[allow(clippy::too_many_arguments)]
fn spawn_downloader_task<S, C>(
    scheduler: Arc<Scheduler>,
    req_rx: AsyncReceiver<Request>,
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    middlewares: Arc<Mutex<Vec<Box<dyn Middleware<C> + Send + Sync>>>>,
    state: Arc<CrawlerState>,
    res_tx: AsyncSender<Response>,
    max_concurrent_downloads: usize,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
    C: Send + Sync + 'static,
{
    let semaphore = Arc::new(Semaphore::new(max_concurrent_downloads));
    let mut tasks = JoinSet::new();

    tokio::spawn(async move {
        while let Ok(request) = req_rx.recv().await {
            let permit = match semaphore.clone().acquire_owned().await {
                Ok(permit) => permit,
                Err(_) => {
                    warn!("Semaphore closed, shutting down downloader actor.");
                    break;
                }
            };

            state.in_flight_requests.fetch_add(1, Ordering::SeqCst);
            let downloader_clone = Arc::clone(&downloader);
            let middlewares_clone = Arc::clone(&middlewares);
            let res_tx_clone = res_tx.clone();
            let state_clone = Arc::clone(&state);
            let scheduler_clone = Arc::clone(&scheduler);
            let stats_clone = Arc::clone(&stats);

            tasks.spawn(async move {
                let mut early_returned_response: Option<Response> = None;

                let mut processed_request_opt = Some(request);
                for mw in middlewares_clone.lock().await.iter_mut() {
                    let req_to_process = processed_request_opt
                        .take()
                        .expect("Request should be present before middleware processing");
                    match mw.process_request(downloader_clone.client(), req_to_process).await {
                        Ok(MiddlewareAction::Continue(req)) => {
                            processed_request_opt = Some(req);
                        }
                        Ok(MiddlewareAction::Retry(req, delay)) => {
                            stats_clone.increment_requests_retried();
                            tokio::time::sleep(delay).await;
                            if scheduler_clone.enqueue_request(*req).await.is_err() {
                                error!("Failed to re-enqueue retried request.");
                            }
                            // Balance the in-flight counter on every early exit so
                            // the idle check can still see the crawl drain.
                            state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                            return;
                        }
                        Ok(MiddlewareAction::Drop) => {
                            stats_clone.increment_requests_dropped();
                            debug!("Request dropped by middleware.");
                            state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                            return;
                        }
                        Ok(MiddlewareAction::ReturnResponse(resp)) => {
                            early_returned_response = Some(resp);
                            break;
                        }
                        Err(e) => {
                            error!("Request middleware error: {:?}", e);
                            state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                            return;
                        }
                    }
                }

                let response = match early_returned_response {
                    Some(resp) => {
                        if resp.cached {
                            stats_clone.increment_responses_from_cache();
                        }
                        stats_clone.increment_requests_succeeded();
                        stats_clone.increment_responses_received();
                        stats_clone.record_response_status(resp.status.as_u16());
                        resp
                    }
                    None => {
                        let request_for_download = processed_request_opt.expect(
                            "Request must be available for download when no middleware returned an early response",
                        );
                        stats_clone.increment_requests_sent();
                        match downloader_clone.download(request_for_download).await {
                            Ok(resp) => {
                                stats_clone.increment_requests_succeeded();
                                stats_clone.increment_responses_received();
                                stats_clone.record_response_status(resp.status.as_u16());
                                stats_clone.add_bytes_downloaded(resp.body.len());
                                resp
                            }
                            Err(e) => {
                                stats_clone.increment_requests_failed();
                                error!("Download error: {:?}", e);
                                state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                                return;
                            }
                        }
                    }
                };

                let mut processed_response_opt = Some(response);
                for mw in middlewares_clone.lock().await.iter_mut().rev() {
                    let res_to_process = processed_response_opt
                        .take()
                        .expect("Response should be present before middleware processing");
                    match mw.process_response(res_to_process).await {
                        Ok(MiddlewareAction::Continue(res)) => {
                            processed_response_opt = Some(res);
                        }
                        Ok(MiddlewareAction::Retry(req, delay)) => {
                            stats_clone.increment_requests_retried();
                            tokio::time::sleep(delay).await;
                            if scheduler_clone.enqueue_request(*req).await.is_err() {
                                error!("Failed to re-enqueue retried request.");
                            }
                            state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                            return;
                        }
                        Ok(MiddlewareAction::Drop) => {
                            stats_clone.increment_requests_dropped();
                            debug!("Response dropped by middleware.");
                            state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                            return;
                        }
                        Ok(MiddlewareAction::ReturnResponse(_)) => {
                            debug!(
                                "ReturnResponse action encountered in process_response; this is unexpected and effectively drops the response for further processing."
                            );
                            processed_response_opt = None;
                            break;
                        }
                        Err(e) => {
                            error!("Response middleware error: {:?}", e);
                            state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                            return;
                        }
                    }
                }

                if let Some(final_response) = processed_response_opt
                    && res_tx_clone.send(final_response).await.is_err()
                {
                    error!("Response channel closed, cannot send response to parser.");
                }

                state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                drop(permit);
            });
        }
        while let Some(res) = tasks.join_next().await {
            if let Err(e) = res {
                error!("A download task failed: {:?}", e);
            }
        }
    })
}

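/// Spawns a pool of parser workers plus a forwarding task that fans responses
/// out to them; each worker runs the spider's `parse` and hands its outputs to
/// `process_crawl_outputs`.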
fn spawn_parser_task<S>(
    scheduler: Arc<Scheduler>,
    spider: Arc<Mutex<S>>,
    state: Arc<CrawlerState>,
    res_rx: AsyncReceiver<Response>,
    item_tx: AsyncSender<S::Item>,
    parser_workers: usize,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    let mut tasks = JoinSet::new();
    let (internal_parse_tx, internal_parse_rx) = bounded_async::<Response>(parser_workers * 2);

    for _ in 0..parser_workers {
        let internal_parse_rx_clone = internal_parse_rx.clone();
        let spider_clone = Arc::clone(&spider);
        let scheduler_clone = Arc::clone(&scheduler);
        let item_tx_clone = item_tx.clone();
        let state_clone = Arc::clone(&state);
        let stats_clone = Arc::clone(&stats);

        tasks.spawn(async move {
            while let Ok(response) = internal_parse_rx_clone.recv().await {
                debug!("Parsing response from {}", response.url);
                match spider_clone.lock().await.parse(response).await {
                    Ok(outputs) => {
                        process_crawl_outputs::<S>(
                            outputs,
                            scheduler_clone.clone(),
                            item_tx_clone.clone(),
                            state_clone.clone(),
                            stats_clone.clone(),
                        )
                        .await;
                    }
                    Err(e) => error!("Spider parsing error: {:?}", e),
                }
                state_clone.parsing_responses.fetch_sub(1, Ordering::SeqCst);
            }
        });
    }

    tokio::spawn(async move {
        while let Ok(response) = res_rx.recv().await {
            state.parsing_responses.fetch_add(1, Ordering::SeqCst);
            if internal_parse_tx.send(response).await.is_err() {
                error!("Internal parse channel closed, cannot send response to parser worker.");
                state.parsing_responses.fetch_sub(1, Ordering::SeqCst);
            }
        }

        drop(internal_parse_tx);

        while let Some(res) = tasks.join_next().await {
            if let Err(e) = res {
                error!("A parsing worker task failed: {:?}", e);
            }
        }
    })
}

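/// Enqueues the follow-up requests and forwards the scraped items produced by a
/// single `parse` call, updating stats and the in-progress item counter.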
async fn process_crawl_outputs<S>(
    outputs: ParseOutput<S::Item>,
    scheduler: Arc<Scheduler>,
    item_tx: AsyncSender<S::Item>,
    state: Arc<CrawlerState>,
    stats: Arc<StatCollector>,
) where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    let (items, requests) = outputs.into_parts();
    info!(
        "Processed {} requests and {} items from spider output.",
        requests.len(),
        items.len()
    );

    stats.increment_items_scraped();

    let mut request_error_total = 0;
    for request in requests {
        match scheduler.enqueue_request(request).await {
            Ok(_) => {
                stats.increment_requests_enqueued();
            }
            Err(_) => {
                request_error_total += 1;
            }
        }
    }
    if request_error_total > 0 {
        error!(
            "Failed to enqueue {} requests: scheduler channel closed.",
            request_error_total
        );
    }

    let mut item_error_total = 0;
    for item in items {
        state.processing_items.fetch_add(1, Ordering::SeqCst);
        if item_tx.send(item).await.is_err() {
            item_error_total += 1;
            state.processing_items.fetch_sub(1, Ordering::SeqCst);
        }
    }
    if item_error_total > 0 {
        error!(
            "Failed to send {} scraped items: channel closed.",
            item_error_total
        );
    }
}

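/// Spawns the item-processor actor: each scraped item is run through the
/// pipelines in order (bounded by a semaphore) until every pipeline has seen it
/// or one of them drops it.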
fn spawn_item_processor_task<S>(
    state: Arc<CrawlerState>,
    item_rx: AsyncReceiver<S::Item>,
    pipelines: Arc<Vec<Box<dyn Pipeline<S::Item>>>>,
    max_concurrent_pipelines: usize,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    let mut tasks = JoinSet::new();
    let semaphore = Arc::new(Semaphore::new(max_concurrent_pipelines));
    tokio::spawn(async move {
        while let Ok(item) = item_rx.recv().await {
            let permit = match semaphore.clone().acquire_owned().await {
                Ok(p) => p,
                Err(_) => {
                    warn!("Semaphore closed, shutting down item processor actor.");
                    break;
                }
            };

            let state_clone = Arc::clone(&state);
            let pipelines_clone = Arc::clone(&pipelines);
            let stats_clone = Arc::clone(&stats);
            tasks.spawn(async move {
                let mut item_to_process = Some(item);
                for pipeline in pipelines_clone.iter() {
                    if let Some(item) = item_to_process.take() {
                        match pipeline.process_item(item).await {
                            Ok(Some(next_item)) => item_to_process = Some(next_item),
                            Ok(None) => {
                                stats_clone.increment_items_dropped_by_pipeline();
                                break;
                            }
                            Err(e) => {
                                error!("Pipeline error for {}: {:?}", pipeline.name(), e);
                                stats_clone.increment_items_dropped_by_pipeline();
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                }
                if item_to_process.is_some() {
                    stats_clone.increment_items_processed();
                }
                state_clone.processing_items.fetch_sub(1, Ordering::SeqCst);
                drop(permit);
            });
        }
        while let Some(res) = tasks.join_next().await {
            if let Err(e) = res {
                error!("An item processing task failed: {:?}", e);
            }
        }
    })
}