// spider_lib/crawler/core.rs
//! The core Crawler implementation for the `spider-lib` framework.
//!
//! This module defines the `Crawler` struct, which acts as the central orchestrator
//! for the web scraping process. It ties together the scheduler, downloader,
//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
//! manages the lifecycle of requests and items, handles concurrency, supports
//! checkpointing for fault tolerance, and collects statistics for monitoring.
//!
//! It utilizes a task-based asynchronous model, spawning distinct tasks for
//! handling initial requests, downloading web pages, parsing responses, and
//! processing scraped items.

13use crate::downloader::Downloader;
14use crate::error::SpiderError;
15use crate::item::ScrapedItem;
16use crate::middleware::Middleware;
17use crate::pipeline::Pipeline;
18use crate::request::Request;
19use crate::scheduler::Scheduler;
20use crate::spider::Spider;
21use crate::state::CrawlerState;
22use crate::stats::StatCollector;
23use anyhow::Result;
24use futures_util::future::join_all;
25use kanal::{AsyncReceiver, bounded_async};
26use tracing::{error, info};
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use std::path::PathBuf;
32use std::sync::Arc;
33use std::time::Duration;
34use tokio::sync::Mutex;
35
36#[cfg(feature = "middleware-cookies")]
37use cookie_store::CookieStore;
38
/// The central orchestrator for the web scraping process, handling requests, responses, items, concurrency, checkpointing, and statistics collection.
pub struct Crawler<S: Spider, C> {
    /// Shared scheduler that queues outgoing requests for the downloader.
    scheduler: Arc<Scheduler>,
    /// Receiving end of the request channel consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    /// Shared counters for crawl statistics (e.g. requests enqueued).
    stats: Arc<StatCollector>,
    /// Downloader that fetches responses; `C` is its underlying client type.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    /// Middlewares applied around downloads, moved into the shared
    /// middleware manager when the crawl starts.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    /// The user-provided spider, mutex-guarded so the start-request and
    /// parser tasks can share it.
    spider: Arc<Mutex<S>>,
    /// Pipelines that process each scraped item; closed when the crawl ends.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Maximum number of downloads in flight at once.
    max_concurrent_downloads: usize,
    /// Number of parser worker tasks to spawn.
    parser_workers: usize,
    /// Maximum number of items processed by pipelines concurrently.
    max_concurrent_pipelines: usize,
    /// File the crawl state is checkpointed to; `None` disables checkpointing.
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    /// Period between automatic checkpoint saves; `None` disables the timer
    /// (a final checkpoint is still written when `checkpoint_path` is set).
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
    /// Shared cookie store, included in checkpoint saves when both the
    /// `checkpoint` and `middleware-cookies` features are enabled.
    #[cfg(feature = "middleware-cookies")]
    pub cookie_store: Arc<Mutex<CookieStore>>,
}
58
59impl<S, C> Crawler<S, C>
60where
61    S: Spider + 'static,
62    S::Item: ScrapedItem,
63    C: Send + Sync + Clone + 'static,
64{
65    /// Creates a new `Crawler` instance with the given components and configuration.
66    #[allow(clippy::too_many_arguments)]
67    pub(crate) fn new(
68        scheduler: Arc<Scheduler>,
69        req_rx: AsyncReceiver<Request>,
70        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
71        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
72        spider: S,
73        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
74        max_concurrent_downloads: usize,
75        parser_workers: usize,
76        max_concurrent_pipelines: usize,
77        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
78        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
79        stats: Arc<StatCollector>,
80        #[cfg(feature = "middleware-cookies")] cookie_store: Arc<Mutex<CookieStore>>,
81    ) -> Self {
82        Crawler {
83            scheduler,
84            req_rx,
85            stats,
86            downloader,
87            middlewares,
88            spider: Arc::new(Mutex::new(spider)),
89            item_pipelines,
90            max_concurrent_downloads,
91            parser_workers,
92            max_concurrent_pipelines,
93            #[cfg(feature = "checkpoint")]
94            checkpoint_path,
95            #[cfg(feature = "checkpoint")]
96            checkpoint_interval,
97            #[cfg(feature = "middleware-cookies")]
98            cookie_store,
99        }
100    }
101
102    /// Starts the crawl, orchestrating the scraping process, managing tasks, handling shutdown, checkpointing, and logging statistics.
103    pub async fn start_crawl(self) -> Result<(), SpiderError> {
104        info!("Crawler starting crawl");
105
106        let Crawler {
107            scheduler,
108            req_rx,
109            stats,
110            downloader,
111            middlewares,
112            spider,
113            item_pipelines,
114            max_concurrent_downloads,
115            parser_workers,
116            max_concurrent_pipelines,
117            #[cfg(feature = "checkpoint")]
118            checkpoint_path,
119            #[cfg(feature = "checkpoint")]
120            checkpoint_interval,
121            #[cfg(feature = "middleware-cookies")]
122            cookie_store,
123        } = self;
124
125        let state = CrawlerState::new();
126        let pipelines = Arc::new(item_pipelines);
127        // Calculate channel capacity based on concurrency settings
128        let channel_capacity = std::cmp::max(
129            max_concurrent_downloads * 2,
130            parser_workers * max_concurrent_pipelines,
131        )
132        .max(100);
133
134        let (res_tx, res_rx) = bounded_async(channel_capacity);
135        let (item_tx, item_rx) = bounded_async(channel_capacity);
136
137        let initial_requests_task =
138            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
139
140        let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
141
142        let downloader_task = super::spawn_downloader_task::<S, C>(
143            scheduler.clone(),
144            req_rx,
145            downloader,
146            middlewares_manager,
147            state.clone(),
148            res_tx.clone(),
149            max_concurrent_downloads,
150            stats.clone(),
151        );
152
153        let parser_task = super::spawn_parser_task::<S>(
154            scheduler.clone(),
155            spider.clone(),
156            state.clone(),
157            res_rx,
158            item_tx.clone(),
159            parser_workers,
160            stats.clone(),
161        );
162
163        let item_processor_task = super::spawn_item_processor_task::<S>(
164            state.clone(),
165            item_rx,
166            pipelines.clone(),
167            max_concurrent_pipelines,
168            stats.clone(),
169        );
170
171        #[cfg(feature = "checkpoint")]
172        if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
173            let scheduler_clone = scheduler.clone();
174            let pipelines_clone = pipelines.clone();
175            let path_clone = path.clone();
176            #[cfg(feature = "middleware-cookies")]
177            let cookie_store_clone = cookie_store.clone();
178
179            tokio::spawn(async move {
180                let mut interval_timer = tokio::time::interval(interval);
181                interval_timer.tick().await;
182                loop {
183                    tokio::select! {
184                        _ = interval_timer.tick() => {
185                            if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
186                                #[cfg(not(feature = "middleware-cookies"))]
187                                let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone).await;
188                                #[cfg(feature = "middleware-cookies")]
189                                let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
190
191                                if let Err(e) = save_result {
192                                    error!("Periodic checkpoint save failed: {}", e);
193                                }
194                            }
195                        }
196                    }
197                }
198            });
199        }
200
201        tokio::select! {
202            _ = tokio::signal::ctrl_c() => {
203                info!("Ctrl-C received, initiating graceful shutdown.");
204            }
205            _ = async {
206                loop {
207                    if scheduler.is_idle() && state.is_idle() {
208                        tokio::time::sleep(Duration::from_millis(50)).await;
209                        if scheduler.is_idle() && state.is_idle() {
210                            break;
211                        }
212                    }
213                    tokio::time::sleep(Duration::from_millis(100)).await;
214                }
215            } => {
216                info!("Crawl has become idle, initiating shutdown.");
217            }
218        }
219
220        info!("Initiating actor shutdowns.");
221
222        drop(res_tx);
223        drop(item_tx);
224
225        scheduler.shutdown().await?;
226
227        item_processor_task
228            .await
229            .map_err(|e| SpiderError::GeneralError(format!("Item processor task failed: {}", e)))?;
230
231        parser_task
232            .await
233            .map_err(|e| SpiderError::GeneralError(format!("Parser task failed: {}", e)))?;
234
235        downloader_task
236            .await
237            .map_err(|e| SpiderError::GeneralError(format!("Downloader task failed: {}", e)))?;
238
239        initial_requests_task.await.map_err(|e| {
240            SpiderError::GeneralError(format!("Initial requests task failed: {}", e))
241        })?;
242
243        #[cfg(feature = "checkpoint")]
244        if let Some(path) = &checkpoint_path {
245            let scheduler_checkpoint = scheduler.snapshot().await?;
246            #[cfg(not(feature = "middleware-cookies"))]
247            let result = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines).await;
248            #[cfg(feature = "middleware-cookies")]
249            let result =
250                save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &cookie_store).await;
251
252            if let Err(e) = result {
253                error!("Final checkpoint save failed: {}", e);
254            }
255        }
256
257        info!("Closing item pipelines...");
258        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
259        join_all(closing_futures).await;
260
261        info!("Crawl finished successfully.");
262        Ok(())
263    }
264
265    /// Returns a cloned Arc to the `StatCollector` instance used by this crawler.
266    ///
267    /// This allows programmatic access to the collected statistics at any time during or after the crawl.
268    pub fn get_stats(&self) -> Arc<StatCollector> {
269        Arc::clone(&self.stats)
270    }
271}
272
273fn spawn_initial_requests_task<S>(
274    scheduler: Arc<Scheduler>,
275    spider: Arc<Mutex<S>>,
276    stats: Arc<StatCollector>,
277) -> tokio::task::JoinHandle<()>
278where
279    S: Spider + 'static,
280    S::Item: ScrapedItem,
281{
282    tokio::spawn(async move {
283        match spider.lock().await.start_requests() {
284            Ok(requests) => {
285                for mut req in requests {
286                    req.url.set_fragment(None);
287                    match scheduler.enqueue_request(req).await {
288                        Ok(_) => {
289                            stats.increment_requests_enqueued();
290                        }
291                        Err(e) => {
292                            error!("Failed to enqueue initial request: {}", e);
293                        }
294                    }
295                }
296            }
297            Err(e) => error!("Failed to create start requests: {}", e),
298        }
299    })
300}