// spider_core/engine/crawler.rs
//! The core Crawler implementation for the `spider-lib` framework.
//!
//! This module defines the `Crawler` struct, which acts as the central orchestrator
//! for the web scraping process. It ties together the scheduler, downloader,
//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
//! manages the lifecycle of requests and items, handles concurrency, supports
//! checkpointing for fault tolerance, and collects statistics for monitoring.
//!
//! It utilizes a task-based asynchronous model, spawning distinct tasks for
//! handling initial requests, downloading web pages, parsing responses, and
//! processing scraped items.

13use crate::Downloader;
14use crate::scheduler::Scheduler;
15use crate::spider::Spider;
16use crate::state::CrawlerState;
17use crate::stats::StatCollector;
18use crate::engine::CrawlerContext;
19use anyhow::Result;
20use futures_util::future::join_all;
21use kanal::{AsyncReceiver, bounded_async};
22use spider_middleware::middleware::Middleware;
23use spider_pipeline::pipeline::Pipeline;
24use spider_util::error::SpiderError;
25use spider_util::item::ScrapedItem;
26use spider_util::request::Request;
27use log::{debug, error, info, trace, warn};
28
29#[cfg(feature = "checkpoint")]
30use crate::checkpoint::save_checkpoint;
31#[cfg(feature = "checkpoint")]
32use std::path::PathBuf;
33
34use std::sync::Arc;
35use std::time::Duration;
36
37#[cfg(feature = "cookie-store")]
38use tokio::sync::RwLock;
39
40#[cfg(feature = "cookie-store")]
41use cookie_store::CookieStore;
42
43/// The central orchestrator for the web scraping process, handling requests, responses, items, concurrency, checkpointing, and statistics collection.
44pub struct Crawler<S: Spider, C> {
45    scheduler: Arc<Scheduler>,
46    req_rx: AsyncReceiver<Request>,
47    stats: Arc<StatCollector>,
48    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
49    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
50    spider: Arc<S>,
51    spider_state: Arc<S::State>,
52    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
53    max_concurrent_downloads: usize,
54    parser_workers: usize,
55    max_concurrent_pipelines: usize,
56    channel_capacity: usize,
57    #[cfg(feature = "checkpoint")]
58    checkpoint_path: Option<PathBuf>,
59    #[cfg(feature = "checkpoint")]
60    checkpoint_interval: Option<Duration>,
61    #[cfg(feature = "cookie-store")]
62    pub cookie_store: Arc<RwLock<CookieStore>>,
63}
64
65impl<S, C> Crawler<S, C>
66where
67    S: Spider + 'static,
68    S::Item: ScrapedItem,
69    C: Send + Sync + Clone + 'static,
70{
71    #[allow(clippy::too_many_arguments)]
72    pub(crate) fn new(
73        scheduler: Arc<Scheduler>,
74        req_rx: AsyncReceiver<Request>,
75        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
76        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
77        spider: S,
78        pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
79        max_concurrent_downloads: usize,
80        parser_workers: usize,
81        max_concurrent_pipelines: usize,
82        channel_capacity: usize,
83        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
84        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
85        stats: Arc<StatCollector>,
86        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
87    ) -> Self {
88        Crawler {
89            scheduler,
90            req_rx,
91            stats,
92            downloader,
93            middlewares,
94            spider: Arc::new(spider),
95            spider_state: Arc::new(S::State::default()),
96            pipelines,
97            max_concurrent_downloads,
98            parser_workers,
99            max_concurrent_pipelines,
100            channel_capacity,
101            #[cfg(feature = "checkpoint")]
102            checkpoint_path,
103            #[cfg(feature = "checkpoint")]
104            checkpoint_interval,
105            #[cfg(feature = "cookie-store")]
106            cookie_store,
107        }
108    }
109
110    pub async fn start_crawl(self) -> Result<(), SpiderError> {
111        info!(
112            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
113            self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
114        );
115
116        #[cfg(feature = "checkpoint")]
117        let Crawler {
118            scheduler,
119            req_rx,
120            stats,
121            downloader,
122            middlewares,
123            spider,
124            spider_state,
125            pipelines,
126            max_concurrent_downloads,
127            parser_workers,
128            max_concurrent_pipelines,
129            channel_capacity: _,
130            checkpoint_path,
131            checkpoint_interval,
132            #[cfg(feature = "cookie-store")]
133                cookie_store: _,
134        } = self;
135
136        #[cfg(not(feature = "checkpoint"))]
137        let Crawler {
138            scheduler,
139            req_rx,
140            stats,
141            downloader,
142            middlewares,
143            spider,
144            spider_state,
145            pipelines,
146            max_concurrent_downloads,
147            parser_workers,
148            max_concurrent_pipelines,
149            channel_capacity: _,
150            #[cfg(feature = "cookie-store")]
151                cookie_store: _,
152        } = self;
153
154        let state = CrawlerState::new();
155        let pipelines = Arc::new(pipelines);
156
157        // Create aggregated context for efficient sharing across tasks
158        let ctx = CrawlerContext::new(
159            Arc::clone(&scheduler),
160            Arc::clone(&stats),
161            Arc::clone(&spider),
162            Arc::clone(&spider_state),
163            Arc::clone(&pipelines),
164        );
165
166        let channel_capacity = std::cmp::max(
167            self.max_concurrent_downloads * 3,
168            self.parser_workers * self.max_concurrent_pipelines * 2,
169        )
170        .max(self.channel_capacity);
171
172        trace!("Creating communication channels with capacity: {}", channel_capacity);
173        let (res_tx, res_rx) = bounded_async(channel_capacity);
174        let (item_tx, item_rx) = bounded_async(channel_capacity);
175
176        trace!("Spawning initial requests task");
177        let init_task = spawn_init_task(ctx.clone());
178
179        trace!("Initializing middleware manager");
180        let middlewares = super::SharedMiddlewareManager::new(middlewares);
181
182        trace!("Spawning downloader task");
183        let downloader = super::spawn_downloader_task::<S, C>(
184            Arc::clone(&ctx.scheduler),
185            req_rx,
186            downloader,
187            middlewares,
188            state.clone(),
189            res_tx.clone(),
190            max_concurrent_downloads,
191            Arc::clone(&ctx.stats),
192        );
193
194        trace!("Spawning parser task");
195        let parser = super::spawn_parser_task::<S>(
196            Arc::clone(&ctx.scheduler),
197            Arc::clone(&ctx.spider),
198            Arc::clone(&ctx.spider_state),
199            state.clone(),
200            res_rx,
201            item_tx.clone(),
202            parser_workers,
203            Arc::clone(&ctx.stats),
204        );
205
206        trace!("Spawning item processor task");
207        let processor = super::spawn_item_processor_task::<S>(
208            state.clone(),
209            item_rx,
210            Arc::clone(&ctx.pipelines),
211            max_concurrent_pipelines,
212            Arc::clone(&ctx.stats),
213        );
214
215        #[cfg(feature = "checkpoint")]
216        {
217            if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
218                let scheduler_cp = Arc::clone(&ctx.scheduler);
219                let pipelines_cp = Arc::clone(&ctx.pipelines);
220                let path_cp = path.clone();
221
222                #[cfg(feature = "cookie-store")]
223                let cookie_store_cp = self.cookie_store.clone();
224
225                #[cfg(not(feature = "cookie-store"))]
226                let _cookie_store_cp = ();
227
228                trace!("Starting periodic checkpoint task with interval: {:?}", interval);
229                tokio::spawn(async move {
230                    let mut interval_timer = tokio::time::interval(interval);
231                    interval_timer.tick().await;
232                    loop {
233                        tokio::select! {
234                            _ = interval_timer.tick() => {
235                                trace!("Checkpoint timer ticked, creating snapshot");
236                                if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
237                                    debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
238
239                                    #[cfg(feature = "cookie-store")]
240                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
241
242                                    #[cfg(not(feature = "cookie-store"))]
243                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
244
245                                    if let Err(e) = save_result {
246                                        error!("Periodic checkpoint save failed: {}", e);
247                                    } else {
248                                        debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
249                                    }
250                                } else {
251                                    warn!("Failed to create scheduler snapshot for checkpoint");
252                                }
253                            }
254                        }
255                    }
256                });
257            }
258        }
259
260        tokio::select! {
261            _ = tokio::signal::ctrl_c() => {
262                info!("Ctrl-C received, initiating graceful shutdown.");
263            }
264            _ = async {
265                loop {
266                    if scheduler.is_idle() && state.is_idle() {
267                        tokio::time::sleep(Duration::from_millis(50)).await;
268                        if scheduler.is_idle() && state.is_idle() {
269                            break;
270                        }
271                    }
272                    tokio::time::sleep(Duration::from_millis(100)).await;
273                }
274            } => {
275                info!("Crawl has become idle, initiating shutdown.");
276            }
277        };
278
279        trace!("Closing communication channels");
280        drop(res_tx);
281        drop(item_tx);
282
283        if let Err(e) = scheduler.shutdown().await {
284            error!("Error during scheduler shutdown: {}", e);
285        } else {
286            debug!("Scheduler shutdown initiated successfully");
287        }
288
289        let timeout_duration = Duration::from_secs(30);
290
291        let mut tasks = tokio::task::JoinSet::new();
292        tasks.spawn(processor);
293        tasks.spawn(parser);
294        tasks.spawn(downloader);
295        tasks.spawn(init_task);
296
297        let results = tokio::time::timeout(timeout_duration, async {
298            let mut results = Vec::new();
299            while let Some(result) = tasks.join_next().await {
300                results.push(result);
301            }
302            results
303        })
304        .await;
305
306        let results = match results {
307            Ok(results) => {
308                trace!("All tasks completed during shutdown");
309                results
310            }
311            Err(_) => {
312                warn!(
313                    "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
314                    timeout_duration.as_secs()
315                );
316                tasks.abort_all();
317
318                tokio::time::sleep(Duration::from_millis(100)).await;
319
320                Vec::new()
321            }
322        };
323
324        for result in results {
325            if let Err(e) = result {
326                error!("Task failed during shutdown: {}", e);
327            } else {
328                trace!("Task completed successfully during shutdown");
329            }
330        }
331
332        #[cfg(feature = "checkpoint")]
333        {
334            if let Some(path) = &checkpoint_path {
335                debug!("Creating final checkpoint at {:?}", path);
336                let scheduler_checkpoint = scheduler.snapshot().await?;
337
338                #[cfg(feature = "cookie-store")]
339                let result = save_checkpoint::<S>(
340                    path,
341                    scheduler_checkpoint,
342                    &pipelines,
343                    &self.cookie_store,
344                )
345                .await;
346
347                #[cfg(not(feature = "cookie-store"))]
348                let result =
349                    save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
350
351                if let Err(e) = result {
352                    error!("Final checkpoint save failed: {}", e);
353                } else {
354                    info!("Final checkpoint saved successfully to {:?}", path);
355                }
356            }
357        }
358
359        info!("Closing item pipelines...");
360        let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
361        join_all(futures).await;
362        debug!("All item pipelines closed");
363
364        info!("Crawl finished successfully\n{}", stats);
365        Ok(())
366    }
367
368    pub fn stats(&self) -> Arc<StatCollector> {
369        Arc::clone(&self.stats)
370    }
371
372    /// Returns a reference to the spider state.
373    pub fn state(&self) -> &S::State {
374        &self.spider_state
375    }
376
377    /// Returns an Arc clone of the spider state.
378    pub fn state_arc(&self) -> Arc<S::State> {
379        Arc::clone(&self.spider_state)
380    }
381}
382
383fn spawn_init_task<S, I>(
384    ctx: CrawlerContext<S, I>,
385) -> tokio::task::JoinHandle<()>
386where
387    S: Spider<Item = I> + 'static,
388    I: ScrapedItem,
389{
390    tokio::spawn(async move {
391        match ctx.spider.start_requests() {
392            Ok(requests) => {
393                for mut req in requests {
394                    req.url.set_fragment(None);
395                    match ctx.scheduler.enqueue_request(req).await {
396                        Ok(_) => {
397                            ctx.stats.increment_requests_enqueued();
398                        }
399                        Err(e) => {
400                            error!("Failed to enqueue initial request: {}", e);
401                        }
402                    }
403                }
404            }
405            Err(e) => error!("Failed to create start requests: {}", e),
406        }
407    })
408}