//! The core Crawler implementation for the `spider-lib` framework.
//!
//! This module defines the `Crawler` struct, which acts as the central orchestrator
//! for the web scraping process. It ties together the scheduler, downloader,
//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
//! manages the lifecycle of requests and items, handles concurrency, supports
//! checkpointing for fault tolerance, and collects statistics for monitoring.
//!
//! It utilizes a task-based asynchronous model, spawning distinct tasks for
//! handling initial requests, downloading web pages, parsing responses, and
//! processing scraped items.
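//!
//! At a high level, requests flow from the scheduler into the downloader task
//! (through the request middlewares), responses flow to the parser workers
//! (through the response middlewares, applied in reverse order), and each
//! spider parse output is split into follow-up requests, which are re-enqueued
//! on the scheduler, and items, which are sent through the item pipelines.
//!
//! A minimal usage sketch; the exact construction of the `Crawler` happens
//! elsewhere in the crate (e.g. through a builder) and is not shown in this
//! module, and `MySpider` stands in for any `Spider` implementation:
//!
//! ```ignore
//! // `crawler: Crawler<MySpider, _>` has already been configured and built.
//! let stats = crawler.get_stats(); // keep a handle; `start_crawl` consumes the crawler
//! crawler.start_crawl().await?;    // runs until the crawl is idle or Ctrl-C is received
//! // `stats` can still be inspected through the `StatCollector` API afterwards.
//! ```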

use crate::downloader::Downloader;
use crate::error::SpiderError;
use crate::item::{ParseOutput, ScrapedItem};
use crate::middleware::{Middleware, MiddlewareAction};
use crate::pipeline::Pipeline;
use crate::request::Request;
use crate::response::Response;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use crate::state::CrawlerState;
use crate::stats::StatCollector;
use anyhow::Result;
use futures_util::future::join_all;
use kanal::{AsyncReceiver, AsyncSender, bounded_async};

#[cfg(feature = "checkpoint")]
use crate::checkpoint::save_checkpoint;
#[cfg(feature = "checkpoint")]
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{debug, error, info, warn};

/// The central orchestrator for the web scraping process.
///
/// Handles requests, responses, and items, manages concurrency, supports
/// checkpointing, and collects statistics.
pub struct Crawler<S: Spider, C> {
    scheduler: Arc<Scheduler>,
    req_rx: AsyncReceiver<Request>,
    stats: Arc<StatCollector>,
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    spider: Arc<Mutex<S>>,
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    max_concurrent_downloads: usize,
    parser_workers: usize,
    max_concurrent_pipelines: usize,
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
}

impl<S, C> Crawler<S, C>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
    C: Send + Sync + 'static,
{
    /// Creates a new `Crawler` instance with the given components and configuration.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: Arc<Scheduler>,
        req_rx: AsyncReceiver<Request>,
        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
        spider: S,
        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
        max_concurrent_downloads: usize,
        parser_workers: usize,
        max_concurrent_pipelines: usize,
        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
        stats: Arc<StatCollector>,
    ) -> Self {
        Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            spider: Arc::new(Mutex::new(spider)),
            item_pipelines,
            max_concurrent_downloads,
            parser_workers,
            max_concurrent_pipelines,
            #[cfg(feature = "checkpoint")]
            checkpoint_path,
            #[cfg(feature = "checkpoint")]
            checkpoint_interval,
        }
    }

    /// Starts the crawl: orchestrates the scraping process, manages the spawned
    /// tasks, handles shutdown and checkpointing, and logs statistics.
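    ///
    /// Internally this spawns dedicated tasks for seeding the initial requests,
    /// downloading, parsing, and item processing, then waits until either Ctrl-C
    /// is received or the scheduler and crawler state report idle. At that point
    /// the channels are closed, all tasks are joined, a final checkpoint is saved
    /// (when the `checkpoint` feature is enabled and a path is configured), and
    /// the item pipelines are closed.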
    pub async fn start_crawl(self) -> Result<(), SpiderError> {
        info!("Crawler starting crawl");

        let Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            spider,
            item_pipelines,
            max_concurrent_downloads,
            parser_workers,
            max_concurrent_pipelines,
            #[cfg(feature = "checkpoint")]
            checkpoint_path,
            #[cfg(feature = "checkpoint")]
            checkpoint_interval,
        } = self;

        let state = CrawlerState::new();
        let pipelines = Arc::new(item_pipelines);
        let channel_capacity = max_concurrent_downloads * 2;

        let (res_tx, res_rx) = bounded_async(channel_capacity);
        let (item_tx, item_rx) = bounded_async(channel_capacity);

        let initial_requests_task =
            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());

        let downloader_task = spawn_downloader_task::<S, C>(
            scheduler.clone(),
            req_rx,
            downloader,
            Arc::new(Mutex::new(middlewares)),
            state.clone(),
            res_tx.clone(),
            max_concurrent_downloads,
            stats.clone(),
        );

        let parser_task = spawn_parser_task::<S>(
            scheduler.clone(),
            spider.clone(),
            state.clone(),
            res_rx,
            item_tx.clone(),
            parser_workers,
            stats.clone(),
        );

        let item_processor_task = spawn_item_processor_task::<S>(
            state.clone(),
            item_rx,
            pipelines.clone(),
            max_concurrent_pipelines,
            stats.clone(),
        );

        #[cfg(feature = "checkpoint")]
        if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
            let scheduler_clone = scheduler.clone();
            let pipelines_clone = pipelines.clone();
            let path_clone = path.clone();

            tokio::spawn(async move {
                let mut interval_timer = tokio::time::interval(interval);
                interval_timer.tick().await;
                loop {
                    tokio::select! {
                        _ = interval_timer.tick() => {
                            if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await &&
                                let Err(e) = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone).await {
                                    error!("Periodic checkpoint save failed: {}", e);
                            }
                        }
                    }
                }
            });
        }

        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                info!("Ctrl-C received, initiating graceful shutdown.");
            }
            _ = async {
                loop {
                    if scheduler.is_idle() && state.is_idle() {
                        // Re-check after a short delay so a momentary lull is not
                        // mistaken for the end of the crawl.
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        if scheduler.is_idle() && state.is_idle() {
                            break;
                        }
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            } => {
                info!("Crawl has become idle, initiating shutdown.");
            }
        }

        info!("Initiating actor shutdowns.");

        #[cfg(feature = "checkpoint")]
        let scheduler_checkpoint = scheduler.snapshot().await?;

        drop(res_tx);
        drop(item_tx);

        scheduler.shutdown().await?;

        item_processor_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Item processor task failed: {}", e)))?;

        parser_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Parser task failed: {}", e)))?;

        downloader_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Downloader task failed: {}", e)))?;

        initial_requests_task.await.map_err(|e| {
            SpiderError::GeneralError(format!("Initial requests task failed: {}", e))
        })?;

        #[cfg(feature = "checkpoint")]
        if let Some(path) = &checkpoint_path
            && let Err(e) = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines).await
        {
            error!("Final checkpoint save failed: {}", e);
        }

        // Close all pipelines
        info!("Closing item pipelines...");
        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
        join_all(closing_futures).await;

        info!("Crawl finished successfully.");
        Ok(())
    }

    /// Returns a cloned `Arc` to the `StatCollector` instance used by this crawler.
    ///
    /// This allows programmatic access to the collected statistics at any time during or after the crawl.
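    ///
    /// Because `start_crawl` consumes the crawler, callers that want to read
    /// statistics after (or while) the crawl runs should obtain this handle first.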
    pub fn get_stats(&self) -> Arc<StatCollector> {
        Arc::clone(&self.stats)
    }
}

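/// Spawns a task that asks the spider for its start requests, strips URL
/// fragments, and enqueues each request on the scheduler, updating the
/// `requests_enqueued` statistic.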
fn spawn_initial_requests_task<S>(
    scheduler: Arc<Scheduler>,
    spider: Arc<Mutex<S>>,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    tokio::spawn(async move {
        match spider.lock().await.start_requests() {
            Ok(requests) => {
                for mut req in requests {
                    req.url.set_fragment(None);
                    match scheduler.enqueue_request(req).await {
                        Ok(_) => {
                            stats.increment_requests_enqueued();
                        }
                        Err(e) => {
                            error!("Failed to enqueue initial request: {}", e);
                        }
                    }
                }
            }
            Err(e) => error!("Failed to create start requests: {}", e),
        }
    })
}

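/// Spawns the downloader actor: receives requests from the scheduler channel,
/// limits concurrency with a semaphore, runs each request through the request
/// middlewares, downloads it (unless a middleware returned a response early),
/// runs the response middlewares in reverse order, and forwards the final
/// response to the parser channel.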
#[allow(clippy::too_many_arguments)]
fn spawn_downloader_task<S, C>(
    scheduler: Arc<Scheduler>,
    req_rx: AsyncReceiver<Request>,
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    middlewares: Arc<Mutex<Vec<Box<dyn Middleware<C> + Send + Sync>>>>,
    state: Arc<CrawlerState>,
    res_tx: AsyncSender<Response>,
    max_concurrent_downloads: usize,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
    C: Send + Sync + 'static,
{
    let semaphore = Arc::new(Semaphore::new(max_concurrent_downloads));
    let mut tasks = JoinSet::new();

    tokio::spawn(async move {
        while let Ok(request) = req_rx.recv().await {
            let permit = match semaphore.clone().acquire_owned().await {
                Ok(permit) => permit,
                Err(_) => {
                    warn!("Semaphore closed, shutting down downloader actor.");
                    break;
                }
            };

            state.in_flight_requests.fetch_add(1, Ordering::SeqCst);
            let downloader_clone = Arc::clone(&downloader);
            let middlewares_clone = Arc::clone(&middlewares);
            let res_tx_clone = res_tx.clone();
            let state_clone = Arc::clone(&state);
            let scheduler_clone = Arc::clone(&scheduler);
            let stats_clone = Arc::clone(&stats);

            tasks.spawn(async move {
                let mut early_returned_response: Option<Response> = None;

                // Process request middlewares
                let mut processed_request_opt = Some(request);
                for mw in middlewares_clone.lock().await.iter_mut() {
                    let req_to_process = processed_request_opt.take().expect("Request should be present before middleware processing");
                    match mw.process_request(downloader_clone.client(), req_to_process).await {
                        Ok(MiddlewareAction::Continue(req)) => {
                            processed_request_opt = Some(req);
                        }
                        Ok(MiddlewareAction::Retry(req, delay)) => {
                            stats_clone.increment_requests_retried();
                            tokio::time::sleep(delay).await;
                            if scheduler_clone.enqueue_request(*req).await.is_err() {
                                error!("Failed to re-enqueue retried request.");
                            }
                            return;
                        }
                        Ok(MiddlewareAction::Drop) => {
                            stats_clone.increment_requests_dropped();
                            debug!("Request dropped by middleware.");
                            return;
                        }
                        Ok(MiddlewareAction::ReturnResponse(resp)) => {
                            early_returned_response = Some(resp);
                            break;
                        }
                        Err(e) => {
                            error!("Request middleware error: {:?}", e);
                            return;
                        }
                    }
                }

                // Download or use the early response.
                // If early_returned_response is Some, a middleware produced the response and consumed the request.
                // If it is None, processed_request_opt must still contain the request.
                let response = match early_returned_response {
                    Some(resp) => {
                        if resp.cached {
                            stats_clone.increment_responses_from_cache();
                        }
                        stats_clone.increment_requests_succeeded();
                        stats_clone.increment_responses_received();
                        stats_clone.record_response_status(resp.status.as_u16());
                        resp
                    },
                    None => {
                        let request_for_download = processed_request_opt.expect("Request must be available for download if not handled by middleware or early returned response");
                        stats_clone.increment_requests_sent();
                        match downloader_clone.download(request_for_download).await {
                            Ok(resp) => {
                                stats_clone.increment_requests_succeeded();
                                stats_clone.increment_responses_received();
                                stats_clone.record_response_status(resp.status.as_u16());
                                // `body` is `Bytes`, so its length is the number of bytes downloaded.
                                stats_clone.add_bytes_downloaded(resp.body.len());
                                resp
                            },
                            Err(e) => {
                                stats_clone.increment_requests_failed();
                                error!("Download error: {:?}", e);
                                return;
                            }
                        }
                    },
                };

                // Process response middlewares (in reverse registration order)
                let mut processed_response_opt = Some(response);
                for mw in middlewares_clone.lock().await.iter_mut().rev() {
                    let res_to_process = processed_response_opt.take().expect("Response should be present before middleware processing"); // Take ownership for the current middleware
                    match mw.process_response(res_to_process).await {
                        Ok(MiddlewareAction::Continue(res)) => {
                            processed_response_opt = Some(res); // Reassign for the next middleware
                        }
                        Ok(MiddlewareAction::Retry(req, delay)) => {
                            stats_clone.increment_requests_retried();
                            tokio::time::sleep(delay).await;
                            if scheduler_clone.enqueue_request(*req).await.is_err() {
                                error!("Failed to re-enqueue retried request.");
                            }
                            return;
                        }
                        Ok(MiddlewareAction::Drop) => {
                            stats_clone.increment_requests_dropped();
                            debug!("Response dropped by middleware.");
                            return;
                        }
                        Ok(MiddlewareAction::ReturnResponse(_)) => {
                            // The middleware has fully handled the response, so it is dropped
                            // from further processing by this chain.
                            debug!("ReturnResponse action encountered in process_response; this is unexpected and effectively drops the response for further processing.");
                            processed_response_opt = None;
                            break;
                        }
                        Err(e) => {
                            error!("Response middleware error: {:?}", e);
                            return;
                        }
                    }
                }

                // Send the final processed response, if it still exists
                if let Some(final_response) = processed_response_opt
                    && res_tx_clone.send(final_response).await.is_err() {
                    error!("Response channel closed, cannot send response to the parser.");
                }

                state_clone.in_flight_requests.fetch_sub(1, Ordering::SeqCst);
                drop(permit);
            });
        }
        while let Some(res) = tasks.join_next().await {
            if let Err(e) = res {
                error!("A download task failed: {:?}", e);
            }
        }
    })
}

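/// Spawns the parser actor: fans responses out to `parser_workers` worker
/// tasks, each of which calls the spider's `parse` method and hands the
/// resulting requests and items to `process_crawl_outputs`.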
fn spawn_parser_task<S>(
    scheduler: Arc<Scheduler>,
    spider: Arc<Mutex<S>>,
    state: Arc<CrawlerState>,
    res_rx: AsyncReceiver<Response>,
    item_tx: AsyncSender<S::Item>,
    parser_workers: usize,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    let mut tasks = JoinSet::new();
    let (internal_parse_tx, internal_parse_rx) = bounded_async::<Response>(parser_workers * 2);

    // Spawn N parsing worker tasks
    for _ in 0..parser_workers {
        let internal_parse_rx_clone = internal_parse_rx.clone();
        let spider_clone = Arc::clone(&spider);
        let scheduler_clone = Arc::clone(&scheduler);
        let item_tx_clone = item_tx.clone();
        let state_clone = Arc::clone(&state);
        let stats_clone = Arc::clone(&stats);

        tasks.spawn(async move {
            while let Ok(response) = internal_parse_rx_clone.recv().await {
                debug!("Parsing response from {}", response.url);
                match spider_clone.lock().await.parse(response).await {
                    Ok(outputs) => {
                        process_crawl_outputs::<S>(
                            outputs,
                            scheduler_clone.clone(),
                            item_tx_clone.clone(),
                            state_clone.clone(),
                            stats_clone.clone(),
                        )
                        .await;
                    }
                    Err(e) => error!("Spider parsing error: {:?}", e),
                }
                state_clone.parsing_responses.fetch_sub(1, Ordering::SeqCst);
            }
        });
    }

    tokio::spawn(async move {
        while let Ok(response) = res_rx.recv().await {
            state.parsing_responses.fetch_add(1, Ordering::SeqCst);
            if internal_parse_tx.send(response).await.is_err() {
                error!("Internal parse channel closed, cannot send response to parser worker.");
                state.parsing_responses.fetch_sub(1, Ordering::SeqCst);
            }
        }

        drop(internal_parse_tx);

        while let Some(res) = tasks.join_next().await {
            if let Err(e) = res {
                error!("A parsing worker task failed: {:?}", e);
            }
        }
    })
}

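/// Splits a spider's `ParseOutput` into follow-up requests (enqueued on the
/// scheduler) and scraped items (sent to the item-processing channel),
/// updating the relevant statistics and in-flight counters.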
async fn process_crawl_outputs<S>(
    outputs: ParseOutput<S::Item>,
    scheduler: Arc<Scheduler>,
    item_tx: AsyncSender<S::Item>,
    state: Arc<CrawlerState>,
    stats: Arc<StatCollector>,
) where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    let (items, requests) = outputs.into_parts();
    info!(
        "Processed {} requests and {} items from spider output.",
        requests.len(),
        items.len()
    );

    stats.increment_items_scraped();

    let mut request_error_total = 0;
    for request in requests {
        match scheduler.enqueue_request(request).await {
            Ok(_) => {
                // Stat: requests_enqueued
                stats.increment_requests_enqueued();
            }
            Err(_) => {
                request_error_total += 1;
            }
        }
    }
    if request_error_total > 0 {
        error!(
            "Failed to enqueue {} requests: scheduler channel closed.",
            request_error_total
        );
    }

    let mut item_error_total = 0;
    for item in items {
        state.processing_items.fetch_add(1, Ordering::SeqCst);
        if item_tx.send(item).await.is_err() {
            item_error_total += 1;
            state.processing_items.fetch_sub(1, Ordering::SeqCst);
        }
    }
    if item_error_total > 0 {
        error!(
            "Failed to send {} scraped items: channel closed.",
            item_error_total
        );
    }
}

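/// Spawns the item-processing actor: receives scraped items, limits pipeline
/// concurrency with a semaphore, and passes each item through the pipelines in
/// order, where a pipeline may transform the item (`Ok(Some)`) or drop it
/// (`Ok(None)` or an error).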
fn spawn_item_processor_task<S>(
    state: Arc<CrawlerState>,
    item_rx: AsyncReceiver<S::Item>,
    pipelines: Arc<Vec<Box<dyn Pipeline<S::Item>>>>,
    max_concurrent_pipelines: usize,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    let mut tasks = JoinSet::new();
    let semaphore = Arc::new(Semaphore::new(max_concurrent_pipelines));
    tokio::spawn(async move {
        while let Ok(item) = item_rx.recv().await {
            let permit = match semaphore.clone().acquire_owned().await {
                Ok(p) => p,
                Err(_) => {
                    warn!("Semaphore closed, shutting down item processor actor.");
                    break;
                }
            };

            let state_clone = Arc::clone(&state);
            let pipelines_clone = Arc::clone(&pipelines);
            let stats_clone = Arc::clone(&stats);
            tasks.spawn(async move {
                let mut item_to_process = Some(item);
                for pipeline in pipelines_clone.iter() {
                    if let Some(item) = item_to_process.take() {
                        match pipeline.process_item(item).await {
                            Ok(Some(next_item)) => item_to_process = Some(next_item),
                            Ok(None) => {
                                stats_clone.increment_items_dropped_by_pipeline();
                                break;
                            }
                            Err(e) => {
                                error!("Pipeline error for {}: {:?}", pipeline.name(), e);
                                stats_clone.increment_items_dropped_by_pipeline();
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                }
                // If item survived all pipelines, it's processed
                if item_to_process.is_some() {
                    stats_clone.increment_items_processed();
                }
                state_clone.processing_items.fetch_sub(1, Ordering::SeqCst);
                drop(permit);
            });
        }
        while let Some(res) = tasks.join_next().await {
            if let Err(e) = res {
                error!("An item processing task failed: {:?}", e);
            }
        }
    })
}