// spider_core/crawler/core.rs
//! The core Crawler implementation for the `spider-lib` framework.
//!
//! This module defines the `Crawler` struct, which acts as the central orchestrator
//! for the web scraping process. It ties together the scheduler, downloader,
//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
//! manages the lifecycle of requests and items, handles concurrency, supports
//! checkpointing for fault tolerance, and collects statistics for monitoring.
//!
//! It utilizes a task-based asynchronous model, spawning distinct tasks for
//! handling initial requests, downloading web pages, parsing responses, and
//! processing scraped items.

use std::sync::{Arc, atomic::Ordering};
use std::time::Duration;

use anyhow::Result;
use futures_util::future::join_all;
use kanal::{AsyncReceiver, bounded_async};
use tokio::sync::Mutex;
use tracing::{debug, error, info, trace, warn};

use spider_middleware::middleware::Middleware;
use spider_pipeline::pipeline::Pipeline;
use spider_util::error::SpiderError;
use spider_util::item::ScrapedItem;
use spider_util::request::Request;

use crate::Downloader;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use crate::state::CrawlerState;
use crate::stats::StatCollector;

#[cfg(feature = "checkpoint")]
use crate::checkpoint::save_checkpoint;
#[cfg(feature = "checkpoint")]
use std::path::PathBuf;

#[cfg(feature = "cookie-store")]
use cookie_store::CookieStore;
#[cfg(feature = "cookie-store")]
use tokio::sync::RwLock;
/// The central orchestrator for the web scraping process, handling requests, responses, items, concurrency, checkpointing, and statistics collection.
pub struct Crawler<S: Spider, C> {
    // Shared request frontier; cloned into every worker task.
    scheduler: Arc<Scheduler>,
    // Receiving end of the request channel consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    // Atomic counters (requests enqueued/succeeded, items scraped, ...).
    stats: Arc<StatCollector>,
    // Fetches responses; `C` is the underlying HTTP client type.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    // Request/response middlewares, run by the SharedMiddlewareManager.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    // The user spider, behind an async mutex so parser workers can share it.
    spider: Arc<Mutex<S>>,
    // Pipelines each scraped item is pushed through.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Concurrency knobs for the downloader, parser, and pipeline stages.
    max_concurrent_downloads: usize,
    parser_workers: usize,
    max_concurrent_pipelines: usize,
    // Lower bound for the internal channel capacity (an adaptive value is
    // computed from the concurrency knobs and clamped to at least this).
    channel_capacity: usize,
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
    // Shared cookie jar; persisted as part of checkpoints when both the
    // `cookie-store` and `checkpoint` features are enabled.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
63
64impl<S, C> Crawler<S, C>
65where
66    S: Spider + 'static,
67    S::Item: ScrapedItem,
68    C: Send + Sync + Clone + 'static,
69{
70    /// Creates a new `Crawler` instance with the given components and configuration.
71    #[allow(clippy::too_many_arguments)]
72    pub(crate) fn new(
73        scheduler: Arc<Scheduler>,
74        req_rx: AsyncReceiver<Request>,
75        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
76        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
77        spider: S,
78        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
79        max_concurrent_downloads: usize,
80        parser_workers: usize,
81        max_concurrent_pipelines: usize,
82        channel_capacity: usize,
83        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
84        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
85        stats: Arc<StatCollector>,
86        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
87    ) -> Self {
88        Crawler {
89            scheduler,
90            req_rx,
91            stats,
92            downloader,
93            middlewares,
94            spider: Arc::new(Mutex::new(spider)),
95            item_pipelines,
96            max_concurrent_downloads,
97            parser_workers,
98            max_concurrent_pipelines,
99            channel_capacity,
100            #[cfg(feature = "checkpoint")]
101            checkpoint_path,
102            #[cfg(feature = "checkpoint")]
103            checkpoint_interval,
104            #[cfg(feature = "cookie-store")]
105            cookie_store,
106        }
107    }
108
109    /// Starts the crawl, orchestrating the scraping process, managing tasks, handling shutdown, checkpointing, and logging statistics.
110    pub async fn start_crawl(self) -> Result<(), SpiderError> {
111        info!(
112            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
113            self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
114        );
115
116        // Handle conditional fields based on features
117        #[cfg(feature = "checkpoint")]
118        let Crawler {
119            scheduler,
120            req_rx,
121            stats,
122            downloader,
123            middlewares,
124            spider,
125            item_pipelines,
126            max_concurrent_downloads,
127            parser_workers,
128            max_concurrent_pipelines,
129            channel_capacity: _,
130            checkpoint_path,
131            checkpoint_interval,
132            #[cfg(feature = "cookie-store")]
133                cookie_store: _,
134        } = self;
135
136        #[cfg(not(feature = "checkpoint"))]
137        let Crawler {
138            scheduler,
139            req_rx,
140            stats,
141            downloader,
142            middlewares,
143            spider,
144            item_pipelines,
145            max_concurrent_downloads,
146            parser_workers,
147            max_concurrent_pipelines,
148            channel_capacity: _,
149            #[cfg(feature = "cookie-store")]
150                cookie_store: _,
151        } = self;
152
153        let state = CrawlerState::new();
154        let pipelines = Arc::new(item_pipelines);
155
156        let adaptive_channel_capacity = std::cmp::max(
157            self.max_concurrent_downloads * 3,
158            self.parser_workers * self.max_concurrent_pipelines * 2,
159        )
160        .max(self.channel_capacity);
161
162        trace!(
163            "Creating communication channels with capacity: {}",
164            adaptive_channel_capacity
165        );
166        let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
167        let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
168
169        trace!("Spawning initial requests task");
170        let initial_requests_task =
171            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
172
173        trace!("Initializing middleware manager");
174        let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
175
176        trace!("Spawning downloader task");
177        let downloader_task = super::spawn_downloader_task::<S, C>(
178            scheduler.clone(),
179            req_rx,
180            downloader,
181            middlewares_manager,
182            state.clone(),
183            res_tx.clone(),
184            max_concurrent_downloads,
185            stats.clone(),
186        );
187
188        trace!("Spawning parser task");
189        let parser_task = super::spawn_parser_task::<S>(
190            scheduler.clone(),
191            spider.clone(),
192            state.clone(),
193            res_rx,
194            item_tx.clone(),
195            parser_workers,
196            stats.clone(),
197        );
198
199        trace!("Spawning item processor task");
200        let item_processor_task = super::spawn_item_processor_task::<S>(
201            state.clone(),
202            item_rx,
203            pipelines.clone(),
204            max_concurrent_pipelines,
205            stats.clone(),
206        );
207
208        #[cfg(feature = "checkpoint")]
209        {
210            if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
211                let scheduler_clone = scheduler.clone();
212                let pipelines_clone = pipelines.clone();
213                let path_clone = path.clone();
214
215                #[cfg(feature = "cookie-store")]
216                let cookie_store_clone = self.cookie_store.clone();
217
218                #[cfg(not(feature = "cookie-store"))]
219                let _cookie_store_clone = ();
220
221                trace!(
222                    "Starting periodic checkpoint task with interval: {:?}",
223                    interval
224                );
225                tokio::spawn(async move {
226                    let mut interval_timer = tokio::time::interval(interval);
227                    interval_timer.tick().await;
228                    loop {
229                        tokio::select! {
230                            _ = interval_timer.tick() => {
231                                trace!("Checkpoint timer ticked, creating snapshot");
232                                if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
233                                    debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
234
235                                    #[cfg(feature = "cookie-store")]
236                                    let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
237
238                                    #[cfg(not(feature = "cookie-store"))]
239                                    let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &()).await;
240
241                                    if let Err(e) = save_result {
242                                        error!("Periodic checkpoint save failed: {}", e);
243                                    } else {
244                                        debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
245                                    }
246                                } else {
247                                    warn!("Failed to create scheduler snapshot for checkpoint");
248                                }
249                            }
250                        }
251                    }
252                });
253            }
254        }
255
256        tokio::select! {
257            _ = tokio::signal::ctrl_c() => {
258                info!("Ctrl-C received, initiating graceful shutdown.");
259            }
260            _ = async {
261                loop {
262                    if scheduler.is_idle() && state.is_idle() {
263                        tokio::time::sleep(Duration::from_millis(50)).await;
264                        if scheduler.is_idle() && state.is_idle() {
265                            break;
266                        }
267                    }
268                    tokio::time::sleep(Duration::from_millis(100)).await;
269                }
270            } => {
271                info!("Crawl has become idle, initiating shutdown.");
272            }
273        };
274
275        trace!("Closing communication channels");
276        drop(res_tx);
277        drop(item_tx);
278
279        if let Err(e) = scheduler.shutdown().await {
280            error!("Error during scheduler shutdown: {}", e);
281        } else {
282            debug!("Scheduler shutdown initiated successfully");
283        }
284
285        let timeout_duration = Duration::from_secs(30);
286
287        let mut task_set = tokio::task::JoinSet::new();
288        task_set.spawn(item_processor_task);
289        task_set.spawn(parser_task);
290        task_set.spawn(downloader_task);
291        task_set.spawn(initial_requests_task);
292
293        let remaining_results = tokio::time::timeout(timeout_duration, async {
294            let mut results = Vec::new();
295            while let Some(result) = task_set.join_next().await {
296                results.push(result);
297            }
298            results
299        })
300        .await;
301
302        let task_results = match remaining_results {
303            Ok(results) => {
304                trace!("All tasks completed during shutdown");
305                results
306            }
307            Err(_) => {
308                warn!(
309                    "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
310                    timeout_duration.as_secs()
311                );
312                task_set.abort_all();
313
314                tokio::time::sleep(Duration::from_millis(100)).await;
315
316                Vec::new()
317            }
318        };
319
320        for result in task_results {
321            if let Err(e) = result {
322                error!("Task failed during shutdown: {}", e);
323            } else {
324                trace!("Task completed successfully during shutdown");
325            }
326        }
327
328        #[cfg(feature = "checkpoint")]
329        {
330            if let Some(path) = &checkpoint_path {
331                debug!("Creating final checkpoint at {:?}", path);
332                let scheduler_checkpoint = scheduler.snapshot().await?;
333
334                #[cfg(feature = "cookie-store")]
335                let result = save_checkpoint::<S>(
336                    path,
337                    scheduler_checkpoint,
338                    &pipelines,
339                    &self.cookie_store,
340                )
341                .await;
342
343                #[cfg(not(feature = "cookie-store"))]
344                let result =
345                    save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
346
347                if let Err(e) = result {
348                    error!("Final checkpoint save failed: {}", e);
349                } else {
350                    info!("Final checkpoint saved successfully to {:?}", path);
351                }
352            }
353        }
354
355        info!("Closing item pipelines...");
356        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
357        join_all(closing_futures).await;
358        debug!("All item pipelines closed");
359
360        info!(
361            "Crawl finished successfully. Stats: requests_enqueued={}, requests_succeeded={}, items_scraped={}",
362            stats.requests_enqueued.load(Ordering::SeqCst),
363            stats.requests_succeeded.load(Ordering::SeqCst),
364            stats.items_scraped.load(Ordering::SeqCst)
365        );
366        Ok(())
367    }
368
369    /// Returns a cloned Arc to the `StatCollector` instance used by this crawler.
370    ///
371    /// This allows programmatic access to the collected statistics at any time during or after the crawl.
372    pub fn get_stats(&self) -> Arc<StatCollector> {
373        Arc::clone(&self.stats)
374    }
375}
376
377fn spawn_initial_requests_task<S>(
378    scheduler: Arc<Scheduler>,
379    spider: Arc<Mutex<S>>,
380    stats: Arc<StatCollector>,
381) -> tokio::task::JoinHandle<()>
382where
383    S: Spider + 'static,
384    S::Item: ScrapedItem,
385{
386    tokio::spawn(async move {
387        match spider.lock().await.start_requests() {
388            Ok(requests) => {
389                for mut req in requests {
390                    req.url.set_fragment(None);
391                    match scheduler.enqueue_request(req).await {
392                        Ok(_) => {
393                            stats.increment_requests_enqueued();
394                        }
395                        Err(e) => {
396                            error!("Failed to enqueue initial request: {}", e);
397                        }
398                    }
399                }
400            }
401            Err(e) => error!("Failed to create start requests: {}", e),
402        }
403    })
404}