// spider_core/crawler/core.rs

//! The core Crawler implementation for the `spider-lib` framework.
//!
//! This module defines the `Crawler` struct, which acts as the central orchestrator
//! for the web scraping process. It ties together the scheduler, downloader,
//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
//! manages the lifecycle of requests and items, handles concurrency, supports
//! checkpointing for fault tolerance, and collects statistics for monitoring.
//!
//! It utilizes a task-based asynchronous model, spawning distinct tasks for
//! handling initial requests, downloading web pages, parsing responses, and
//! processing scraped items.

13use crate::Downloader;
14use crate::scheduler::Scheduler;
15use crate::spider::Spider;
16use crate::state::CrawlerState;
17use crate::stats::StatCollector;
18use anyhow::Result;
19use futures_util::future::join_all;
20use kanal::{AsyncReceiver, bounded_async};
21use spider_middleware::middleware::Middleware;
22use spider_pipeline::pipeline::Pipeline;
23use spider_util::error::SpiderError;
24use spider_util::item::ScrapedItem;
25use spider_util::request::Request;
26use log::{debug, error, info, trace, warn};
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use std::path::PathBuf;
32
33use std::sync::Arc;
34use std::time::Duration;
35
36#[cfg(feature = "cookie-store")]
37use tokio::sync::RwLock;
38
39#[cfg(feature = "cookie-store")]
40use cookie_store::CookieStore;
41
42/// The central orchestrator for the web scraping process, handling requests, responses, items, concurrency, checkpointing, and statistics collection.
43pub struct Crawler<S: Spider, C> {
44    scheduler: Arc<Scheduler>,
45    req_rx: AsyncReceiver<Request>,
46    stats: Arc<StatCollector>,
47    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
48    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
49    spider: Arc<S>,
50    spider_state: Arc<S::State>,
51    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
52    max_concurrent_downloads: usize,
53    parser_workers: usize,
54    max_concurrent_pipelines: usize,
55    channel_capacity: usize,
56    #[cfg(feature = "checkpoint")]
57    checkpoint_path: Option<PathBuf>,
58    #[cfg(feature = "checkpoint")]
59    checkpoint_interval: Option<Duration>,
60    #[cfg(feature = "cookie-store")]
61    pub cookie_store: Arc<RwLock<CookieStore>>,
62}
63
64impl<S, C> Crawler<S, C>
65where
66    S: Spider + 'static,
67    S::Item: ScrapedItem,
68    C: Send + Sync + Clone + 'static,
69{
70    /// Creates a new `Crawler` instance with the given components and configuration.
71    #[allow(clippy::too_many_arguments)]
72    pub(crate) fn new(
73        scheduler: Arc<Scheduler>,
74        req_rx: AsyncReceiver<Request>,
75        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
76        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
77        spider: S,
78        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
79        max_concurrent_downloads: usize,
80        parser_workers: usize,
81        max_concurrent_pipelines: usize,
82        channel_capacity: usize,
83        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
84        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
85        stats: Arc<StatCollector>,
86        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
87    ) -> Self {
88        Crawler {
89            scheduler,
90            req_rx,
91            stats,
92            downloader,
93            middlewares,
94            spider: Arc::new(spider),
95            spider_state: Arc::new(S::State::default()),
96            item_pipelines,
97            max_concurrent_downloads,
98            parser_workers,
99            max_concurrent_pipelines,
100            channel_capacity,
101            #[cfg(feature = "checkpoint")]
102            checkpoint_path,
103            #[cfg(feature = "checkpoint")]
104            checkpoint_interval,
105            #[cfg(feature = "cookie-store")]
106            cookie_store,
107        }
108    }
109
110    /// Starts the crawl, orchestrating the scraping process, managing tasks, handling shutdown, checkpointing, and logging statistics.
111    pub async fn start_crawl(self) -> Result<(), SpiderError> {
112        info!(
113            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
114            self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
115        );
116
117        // Handle conditional fields based on features
118        #[cfg(feature = "checkpoint")]
119        let Crawler {
120            scheduler,
121            req_rx,
122            stats,
123            downloader,
124            middlewares,
125            spider,
126            spider_state,
127            item_pipelines,
128            max_concurrent_downloads,
129            parser_workers,
130            max_concurrent_pipelines,
131            channel_capacity: _,
132            checkpoint_path,
133            checkpoint_interval,
134            #[cfg(feature = "cookie-store")]
135                cookie_store: _,
136        } = self;
137
138        #[cfg(not(feature = "checkpoint"))]
139        let Crawler {
140            scheduler,
141            req_rx,
142            stats,
143            downloader,
144            middlewares,
145            spider,
146            spider_state,
147            item_pipelines,
148            max_concurrent_downloads,
149            parser_workers,
150            max_concurrent_pipelines,
151            channel_capacity: _,
152            #[cfg(feature = "cookie-store")]
153                cookie_store: _,
154        } = self;
155
156        let state = CrawlerState::new();
157        let pipelines = Arc::new(item_pipelines);
158
159        let adaptive_channel_capacity = std::cmp::max(
160            self.max_concurrent_downloads * 3,
161            self.parser_workers * self.max_concurrent_pipelines * 2,
162        )
163        .max(self.channel_capacity);
164
165        trace!(
166            "Creating communication channels with capacity: {}",
167            adaptive_channel_capacity
168        );
169        let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
170        let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
171
172        trace!("Spawning initial requests task");
173        let initial_requests_task =
174            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
175
176        trace!("Initializing middleware manager");
177        let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
178
179        trace!("Spawning downloader task");
180        let downloader_task = super::spawn_downloader_task::<S, C>(
181            scheduler.clone(),
182            req_rx,
183            downloader,
184            middlewares_manager,
185            state.clone(),
186            res_tx.clone(),
187            max_concurrent_downloads,
188            stats.clone(),
189        );
190
191        trace!("Spawning parser task");
192        let parser_task = super::spawn_parser_task::<S>(
193            scheduler.clone(),
194            spider.clone(),
195            spider_state.clone(),
196            state.clone(),
197            res_rx,
198            item_tx.clone(),
199            parser_workers,
200            stats.clone(),
201        );
202
203        trace!("Spawning item processor task");
204        let item_processor_task = super::spawn_item_processor_task::<S>(
205            state.clone(),
206            item_rx,
207            pipelines.clone(),
208            max_concurrent_pipelines,
209            stats.clone(),
210        );
211
212        #[cfg(feature = "checkpoint")]
213        {
214            if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
215                let scheduler_clone = scheduler.clone();
216                let pipelines_clone = pipelines.clone();
217                let path_clone = path.clone();
218
219                #[cfg(feature = "cookie-store")]
220                let cookie_store_clone = self.cookie_store.clone();
221
222                #[cfg(not(feature = "cookie-store"))]
223                let _cookie_store_clone = ();
224
225                trace!(
226                    "Starting periodic checkpoint task with interval: {:?}",
227                    interval
228                );
229                tokio::spawn(async move {
230                    let mut interval_timer = tokio::time::interval(interval);
231                    interval_timer.tick().await;
232                    loop {
233                        tokio::select! {
234                            _ = interval_timer.tick() => {
235                                trace!("Checkpoint timer ticked, creating snapshot");
236                                if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
237                                    debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
238
239                                    #[cfg(feature = "cookie-store")]
240                                    let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
241
242                                    #[cfg(not(feature = "cookie-store"))]
243                                    let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &()).await;
244
245                                    if let Err(e) = save_result {
246                                        error!("Periodic checkpoint save failed: {}", e);
247                                    } else {
248                                        debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
249                                    }
250                                } else {
251                                    warn!("Failed to create scheduler snapshot for checkpoint");
252                                }
253                            }
254                        }
255                    }
256                });
257            }
258        }
259
260        tokio::select! {
261            _ = tokio::signal::ctrl_c() => {
262                info!("Ctrl-C received, initiating graceful shutdown.");
263            }
264            _ = async {
265                loop {
266                    if scheduler.is_idle() && state.is_idle() {
267                        tokio::time::sleep(Duration::from_millis(50)).await;
268                        if scheduler.is_idle() && state.is_idle() {
269                            break;
270                        }
271                    }
272                    tokio::time::sleep(Duration::from_millis(100)).await;
273                }
274            } => {
275                info!("Crawl has become idle, initiating shutdown.");
276            }
277        };
278
279        trace!("Closing communication channels");
280        drop(res_tx);
281        drop(item_tx);
282
283        if let Err(e) = scheduler.shutdown().await {
284            error!("Error during scheduler shutdown: {}", e);
285        } else {
286            debug!("Scheduler shutdown initiated successfully");
287        }
288
289        let timeout_duration = Duration::from_secs(30);
290
291        let mut task_set = tokio::task::JoinSet::new();
292        task_set.spawn(item_processor_task);
293        task_set.spawn(parser_task);
294        task_set.spawn(downloader_task);
295        task_set.spawn(initial_requests_task);
296
297        let remaining_results = tokio::time::timeout(timeout_duration, async {
298            let mut results = Vec::new();
299            while let Some(result) = task_set.join_next().await {
300                results.push(result);
301            }
302            results
303        })
304        .await;
305
306        let task_results = match remaining_results {
307            Ok(results) => {
308                trace!("All tasks completed during shutdown");
309                results
310            }
311            Err(_) => {
312                warn!(
313                    "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
314                    timeout_duration.as_secs()
315                );
316                task_set.abort_all();
317
318                tokio::time::sleep(Duration::from_millis(100)).await;
319
320                Vec::new()
321            }
322        };
323
324        for result in task_results {
325            if let Err(e) = result {
326                error!("Task failed during shutdown: {}", e);
327            } else {
328                trace!("Task completed successfully during shutdown");
329            }
330        }
331
332        #[cfg(feature = "checkpoint")]
333        {
334            if let Some(path) = &checkpoint_path {
335                debug!("Creating final checkpoint at {:?}", path);
336                let scheduler_checkpoint = scheduler.snapshot().await?;
337
338                #[cfg(feature = "cookie-store")]
339                let result = save_checkpoint::<S>(
340                    path,
341                    scheduler_checkpoint,
342                    &pipelines,
343                    &self.cookie_store,
344                )
345                .await;
346
347                #[cfg(not(feature = "cookie-store"))]
348                let result =
349                    save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
350
351                if let Err(e) = result {
352                    error!("Final checkpoint save failed: {}", e);
353                } else {
354                    info!("Final checkpoint saved successfully to {:?}", path);
355                }
356            }
357        }
358
359        info!("Closing item pipelines...");
360        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
361        join_all(closing_futures).await;
362        debug!("All item pipelines closed");
363
364        info!(
365            "Crawl finished successfully\n{}", stats
366        );
367        Ok(())
368    }
369
370    /// Returns a cloned Arc to the `StatCollector` instance used by this crawler.
371    ///
372    /// This allows programmatic access to the collected statistics at any time during or after the crawl.
373    pub fn get_stats(&self) -> Arc<StatCollector> {
374        Arc::clone(&self.stats)
375    }
376}
377
378fn spawn_initial_requests_task<S>(
379    scheduler: Arc<Scheduler>,
380    spider: Arc<S>,
381    stats: Arc<StatCollector>,
382) -> tokio::task::JoinHandle<()>
383where
384    S: Spider + 'static,
385    S::Item: ScrapedItem,
386{
387    tokio::spawn(async move {
388        match spider.start_requests() {
389            Ok(requests) => {
390                for mut req in requests {
391                    req.url.set_fragment(None);
392                    match scheduler.enqueue_request(req).await {
393                        Ok(_) => {
394                            stats.increment_requests_enqueued();
395                        }
396                        Err(e) => {
397                            error!("Failed to enqueue initial request: {}", e);
398                        }
399                    }
400                }
401            }
402            Err(e) => error!("Failed to create start requests: {}", e),
403        }
404    })
405}