// spider_core/crawler/core.rs
//! The core Crawler implementation for the `spider-lib` framework.
//!
//! This module defines the `Crawler` struct, which acts as the central orchestrator
//! for the web scraping process. It ties together the scheduler, downloader,
//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
//! manages the lifecycle of requests and items, handles concurrency, supports
//! checkpointing for fault tolerance, and collects statistics for monitoring.
//!
//! It utilizes a task-based asynchronous model, spawning distinct tasks for
//! handling initial requests, downloading web pages, parsing responses, and
//! processing scraped items.

use crate::scheduler::Scheduler;
use crate::spider::Spider;
use crate::state::CrawlerState;
use crate::stats::StatCollector;
use crate::Downloader;

use anyhow::Result;
use futures_util::future::join_all;
use kanal::{bounded_async, AsyncReceiver};
use spider_middleware::middleware::Middleware;
use spider_pipeline::pipeline::Pipeline;
use spider_util::error::SpiderError;
use spider_util::item::ScrapedItem;
use spider_util::request::Request;
use tracing::{debug, error, info, trace, warn};

use std::sync::{atomic::Ordering, Arc};
use std::time::Duration;
use tokio::sync::Mutex;

#[cfg(feature = "checkpoint")]
use crate::checkpoint::save_checkpoint;
#[cfg(feature = "checkpoint")]
use std::path::PathBuf;

#[cfg(feature = "cookie-store")]
use cookie_store::CookieStore;
#[cfg(feature = "cookie-store")]
use tokio::sync::RwLock;

46/// The central orchestrator for the web scraping process, handling requests, responses, items, concurrency, checkpointing, and statistics collection.
47pub struct Crawler<S: Spider, C> {
48    scheduler: Arc<Scheduler>,
49    req_rx: AsyncReceiver<Request>,
50    stats: Arc<StatCollector>,
51    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
52    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
53    spider: Arc<Mutex<S>>,
54    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
55    max_concurrent_downloads: usize,
56    parser_workers: usize,
57    max_concurrent_pipelines: usize,
58    channel_capacity: usize,
59    #[cfg(feature = "checkpoint")]
60    checkpoint_path: Option<PathBuf>,
61    #[cfg(feature = "checkpoint")]
62    checkpoint_interval: Option<Duration>,
63    #[cfg(feature = "cookie-store")]
64    pub cookie_store: Arc<RwLock<CookieStore>>,
65}
66
67impl<S, C> Crawler<S, C>
68where
69    S: Spider + 'static,
70    S::Item: ScrapedItem,
71    C: Send + Sync + Clone + 'static,
72{
73    /// Creates a new `Crawler` instance with the given components and configuration.
74    #[allow(clippy::too_many_arguments)]
75    pub(crate) fn new(
76        scheduler: Arc<Scheduler>,
77        req_rx: AsyncReceiver<Request>,
78        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
79        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
80        spider: S,
81        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
82        max_concurrent_downloads: usize,
83        parser_workers: usize,
84        max_concurrent_pipelines: usize,
85        channel_capacity: usize,
86        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
87        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
88        stats: Arc<StatCollector>,
89        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
90    ) -> Self {
91        Crawler {
92            scheduler,
93            req_rx,
94            stats,
95            downloader,
96            middlewares,
97            spider: Arc::new(Mutex::new(spider)),
98            item_pipelines,
99            max_concurrent_downloads,
100            parser_workers,
101            max_concurrent_pipelines,
102            channel_capacity,
103            #[cfg(feature = "checkpoint")]
104            checkpoint_path,
105            #[cfg(feature = "checkpoint")]
106            checkpoint_interval,
107            #[cfg(feature = "cookie-store")]
108            cookie_store,
109        }
110    }
111
112    /// Starts the crawl, orchestrating the scraping process, managing tasks, handling shutdown, checkpointing, and logging statistics.
113    pub async fn start_crawl(self) -> Result<(), SpiderError> {
114        info!(
115            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
116            self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
117        );
118
119        // Handle conditional fields based on features
120        #[cfg(feature = "checkpoint")]
121        let Crawler {
122            scheduler,
123            req_rx,
124            stats,
125            downloader,
126            middlewares,
127            spider,
128            item_pipelines,
129            max_concurrent_downloads,
130            parser_workers,
131            max_concurrent_pipelines,
132            channel_capacity: _, // We don't need to use this value here
133            checkpoint_path,
134            checkpoint_interval,
135            #[cfg(feature = "cookie-store")]
136            cookie_store: _,
137        } = self;
138        
139        #[cfg(not(feature = "checkpoint"))]
140        let Crawler {
141            scheduler,
142            req_rx,
143            stats,
144            downloader,
145            middlewares,
146            spider,
147            item_pipelines,
148            max_concurrent_downloads,
149            parser_workers,
150            max_concurrent_pipelines,
151            channel_capacity: _, // We don't need to use this value here
152            #[cfg(feature = "cookie-store")]
153            cookie_store: _,
154        } = self;
155
156        let state = CrawlerState::new();
157        let pipelines = Arc::new(item_pipelines);
158
159        let adaptive_channel_capacity = std::cmp::max(
160            self.max_concurrent_downloads * 3,
161            self.parser_workers * self.max_concurrent_pipelines * 2,
162        )
163        .max(self.channel_capacity);
164
165        trace!(
166            "Creating communication channels with capacity: {}",
167            adaptive_channel_capacity
168        );
169        let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
170        let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
171
172        trace!("Spawning initial requests task");
173        let initial_requests_task =
174            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
175
176        trace!("Initializing middleware manager");
177        let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
178
179        trace!("Spawning downloader task");
180        let downloader_task = super::spawn_downloader_task::<S, C>(
181            scheduler.clone(),
182            req_rx,
183            downloader,
184            middlewares_manager,
185            state.clone(),
186            res_tx.clone(),
187            max_concurrent_downloads,
188            stats.clone(),
189        );
190
191        trace!("Spawning parser task");
192        let parser_task = super::spawn_parser_task::<S>(
193            scheduler.clone(),
194            spider.clone(),
195            state.clone(),
196            res_rx,
197            item_tx.clone(),
198            parser_workers,
199            stats.clone(),
200        );
201
202        trace!("Spawning item processor task");
203        let item_processor_task = super::spawn_item_processor_task::<S>(
204            state.clone(),
205            item_rx,
206            pipelines.clone(),
207            max_concurrent_pipelines,
208            stats.clone(),
209        );
210
211        #[cfg(feature = "checkpoint")]
212        {
213            if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
214                let scheduler_clone = scheduler.clone();
215                let pipelines_clone = pipelines.clone();
216                let path_clone = path.clone();
217                
218                #[cfg(feature = "cookie-store")]
219                let cookie_store_clone = self.cookie_store.clone();
220                
221                #[cfg(not(feature = "cookie-store"))]
222                let _cookie_store_clone = ();
223
224                trace!(
225                    "Starting periodic checkpoint task with interval: {:?}",
226                    interval
227                );
228                tokio::spawn(async move {
229                    let mut interval_timer = tokio::time::interval(interval);
230                    interval_timer.tick().await;
231                    loop {
232                        tokio::select! {
233                            _ = interval_timer.tick() => {
234                                trace!("Checkpoint timer ticked, creating snapshot");
235                                if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
236                                    debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
237                                    
238                                    #[cfg(feature = "cookie-store")]
239                                    let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
240                                    
241                                    #[cfg(not(feature = "cookie-store"))]
242                                    let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &()).await;
243
244                                    if let Err(e) = save_result {
245                                        error!("Periodic checkpoint save failed: {}", e);
246                                    } else {
247                                        debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
248                                    }
249                                } else {
250                                    warn!("Failed to create scheduler snapshot for checkpoint");
251                                }
252                            }
253                        }
254                    }
255                });
256            }
257        }
258
259        tokio::select! {
260            _ = tokio::signal::ctrl_c() => {
261                info!("Ctrl-C received, initiating graceful shutdown.");
262            }
263            _ = async {
264                loop {
265                    if scheduler.is_idle() && state.is_idle() {
266                        tokio::time::sleep(Duration::from_millis(50)).await;
267                        if scheduler.is_idle() && state.is_idle() {
268                            break;
269                        }
270                    }
271                    tokio::time::sleep(Duration::from_millis(100)).await;
272                }
273            } => {
274                info!("Crawl has become idle, initiating shutdown.");
275            }
276        };
277
278        trace!("Closing communication channels");
279        drop(res_tx);
280        drop(item_tx);
281
282        if let Err(e) = scheduler.shutdown().await {
283            error!("Error during scheduler shutdown: {}", e);
284        } else {
285            debug!("Scheduler shutdown initiated successfully");
286        }
287
288        let timeout_duration = Duration::from_secs(30); // Default timeout of 30 seconds
289
290        let mut task_set = tokio::task::JoinSet::new();
291        task_set.spawn(item_processor_task);
292        task_set.spawn(parser_task);
293        task_set.spawn(downloader_task);
294        task_set.spawn(initial_requests_task);
295
296        let remaining_results = tokio::time::timeout(timeout_duration, async {
297            let mut results = Vec::new();
298            while let Some(result) = task_set.join_next().await {
299                results.push(result);
300            }
301            results
302        })
303        .await;
304
305        let task_results = match remaining_results {
306            Ok(results) => {
307                trace!("All tasks completed during shutdown");
308                results
309            }
310            Err(_) => {
311                warn!(
312                    "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
313                    timeout_duration.as_secs()
314                );
315                task_set.abort_all();
316
317                tokio::time::sleep(Duration::from_millis(100)).await;
318
319                Vec::new()
320            }
321        };
322
323        for result in task_results {
324            if let Err(e) = result {
325                error!("Task failed during shutdown: {}", e);
326            } else {
327                trace!("Task completed successfully during shutdown");
328            }
329        }
330
331        #[cfg(feature = "checkpoint")]
332        {
333            if let Some(path) = &checkpoint_path {
334                debug!("Creating final checkpoint at {:?}", path);
335                let scheduler_checkpoint = scheduler.snapshot().await?;
336                
337                #[cfg(feature = "cookie-store")]
338                let result = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &self.cookie_store).await;
339                
340                #[cfg(not(feature = "cookie-store"))]
341                let result = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
342
343                if let Err(e) = result {
344                    error!("Final checkpoint save failed: {}", e);
345                } else {
346                    info!("Final checkpoint saved successfully to {:?}", path);
347                }
348            }
349        }
350
351        info!("Closing item pipelines...");
352        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
353        join_all(closing_futures).await;
354        debug!("All item pipelines closed");
355
356        info!(
357            "Crawl finished successfully. Stats: requests_enqueued={}, requests_succeeded={}, items_scraped={}",
358            stats.requests_enqueued.load(Ordering::SeqCst),
359            stats.requests_succeeded.load(Ordering::SeqCst),
360            stats.items_scraped.load(Ordering::SeqCst)
361        );
362        Ok(())
363    }
364
365    /// Returns a cloned Arc to the `StatCollector` instance used by this crawler.
366    ///
367    /// This allows programmatic access to the collected statistics at any time during or after the crawl.
368    pub fn get_stats(&self) -> Arc<StatCollector> {
369        Arc::clone(&self.stats)
370    }
371}
372
373fn spawn_initial_requests_task<S>(
374    scheduler: Arc<Scheduler>,
375    spider: Arc<Mutex<S>>,
376    stats: Arc<StatCollector>,
377) -> tokio::task::JoinHandle<()>
378where
379    S: Spider + 'static,
380    S::Item: ScrapedItem,
381{
382    tokio::spawn(async move {
383        match spider.lock().await.start_requests() {
384            Ok(requests) => {
385                for mut req in requests {
386                    req.url.set_fragment(None);
387                    match scheduler.enqueue_request(req).await {
388                        Ok(_) => {
389                            stats.increment_requests_enqueued();
390                        }
391                        Err(e) => {
392                            error!("Failed to enqueue initial request: {}", e);
393                        }
394                    }
395                }
396            }
397            Err(e) => error!("Failed to create start requests: {}", e),
398        }
399    })
400}