// spider_core/engine/crawler.rs
1//! The core Crawler implementation for the `spider-lib` framework.
2//!
3//! This module defines the `Crawler` struct, which acts as the central orchestrator
4//! for the web scraping process. It ties together the scheduler, downloader,
5//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
6//! manages the lifecycle of requests and items, handles concurrency, supports
7//! checkpointing for fault tolerance, and collects statistics for monitoring.
8//!
9//! It utilizes a task-based asynchronous model, spawning distinct tasks for
10//! handling initial requests, downloading web pages, parsing responses, and
11//! processing scraped items.
12
13use crate::Downloader;
14use crate::config::CrawlerConfig;
15use crate::engine::CrawlerContext;
16use crate::scheduler::Scheduler;
17use crate::spider::Spider;
18use crate::state::CrawlerState;
19use crate::stats::StatCollector;
20use anyhow::Result;
21use futures_util::future::join_all;
22use kanal::{AsyncReceiver, bounded_async};
23use log::{debug, error, info, trace, warn};
24use spider_middleware::middleware::Middleware;
25use spider_pipeline::pipeline::Pipeline;
26use spider_util::error::SpiderError;
27use spider_util::item::ScrapedItem;
28use spider_util::request::Request;
29
30#[cfg(feature = "checkpoint")]
31use crate::checkpoint::save_checkpoint;
32#[cfg(feature = "checkpoint")]
33use crate::config::CheckpointConfig;
34
35use std::sync::Arc;
36use std::time::Duration;
37
38#[cfg(feature = "cookie-store")]
39use tokio::sync::RwLock;
40
41#[cfg(feature = "cookie-store")]
42use cookie_store::CookieStore;
43
/// The central orchestrator for the web scraping process, handling requests, responses, items, concurrency, checkpointing, and statistics collection.
pub struct Crawler<S: Spider, C> {
    // Shared request scheduler; also consulted for idleness during shutdown.
    scheduler: Arc<Scheduler>,
    // Receiving end of the request channel consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    // Shared runtime statistics collector.
    stats: Arc<StatCollector>,
    // Downloader implementation; `C` is the underlying client type.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    // Middlewares handed to the downloader task via the middleware manager.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    // The user-provided spider driving the crawl.
    spider: Arc<S>,
    // Spider-specific shared state, initialized to `S::State::default()`.
    spider_state: Arc<S::State>,
    // Item pipelines that process scraped items.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Concurrency and channel-capacity tuning for the crawl.
    config: CrawlerConfig,
    // Where and how often to persist crawl checkpoints.
    #[cfg(feature = "checkpoint")]
    checkpoint_config: CheckpointConfig,
    // Shared cookie store, saved alongside checkpoints when enabled.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
60
61impl<S, C> Crawler<S, C>
62where
63    S: Spider + 'static,
64    S::Item: ScrapedItem,
65    C: Send + Sync + Clone + 'static,
66{
67    #[allow(clippy::too_many_arguments)]
68    pub(crate) fn new(
69        scheduler: Arc<Scheduler>,
70        req_rx: AsyncReceiver<Request>,
71        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
72        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
73        spider: S,
74        pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
75        config: CrawlerConfig,
76        #[cfg(feature = "checkpoint")] checkpoint_config: CheckpointConfig,
77        stats: Arc<StatCollector>,
78        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
79    ) -> Self {
80        Crawler {
81            scheduler,
82            req_rx,
83            stats,
84            downloader,
85            middlewares,
86            spider: Arc::new(spider),
87            spider_state: Arc::new(S::State::default()),
88            pipelines,
89            config,
90            #[cfg(feature = "checkpoint")]
91            checkpoint_config,
92            #[cfg(feature = "cookie-store")]
93            cookie_store,
94        }
95    }
96
97    pub async fn start_crawl(self) -> Result<(), SpiderError> {
98        info!(
99            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
100            self.config.max_concurrent_downloads,
101            self.config.parser_workers,
102            self.config.max_concurrent_pipelines
103        );
104
105        let Crawler {
106            scheduler,
107            req_rx,
108            stats,
109            downloader,
110            middlewares,
111            spider,
112            spider_state,
113            pipelines,
114            config,
115            #[cfg(feature = "checkpoint")]
116            checkpoint_config,
117            #[cfg(feature = "cookie-store")]
118                cookie_store: _,
119        } = self;
120
121        let state = CrawlerState::new();
122        let pipelines = Arc::new(pipelines);
123
124        // Create aggregated context for efficient sharing across tasks
125        let ctx = CrawlerContext::new(
126            Arc::clone(&scheduler),
127            Arc::clone(&stats),
128            Arc::clone(&spider),
129            Arc::clone(&spider_state),
130            Arc::clone(&pipelines),
131        );
132
133        let channel_capacity = std::cmp::max(
134            config.max_concurrent_downloads * 3,
135            config.parser_workers * config.max_concurrent_pipelines * 2,
136        )
137        .max(config.channel_capacity);
138
139        trace!(
140            "Creating communication channels with capacity: {}",
141            channel_capacity
142        );
143        let (res_tx, res_rx) = bounded_async(channel_capacity);
144        let (item_tx, item_rx) = bounded_async(channel_capacity);
145
146        trace!("Spawning initial requests task");
147        let init_task = spawn_init_task(ctx.clone());
148
149        trace!("Initializing middleware manager");
150        let middlewares = super::SharedMiddlewareManager::new(middlewares);
151
152        trace!("Spawning downloader task");
153        let downloader_handle = super::spawn_downloader_task::<S, C>(
154            Arc::clone(&ctx.scheduler),
155            req_rx,
156            downloader,
157            middlewares,
158            state.clone(),
159            res_tx.clone(),
160            config.max_concurrent_downloads,
161            Arc::clone(&ctx.stats),
162        );
163
164        trace!("Spawning parser task");
165        let parser_handle = super::spawn_parser_task::<S>(
166            Arc::clone(&ctx.scheduler),
167            Arc::clone(&ctx.spider),
168            Arc::clone(&ctx.spider_state),
169            state.clone(),
170            res_rx,
171            item_tx.clone(),
172            config.parser_workers,
173            Arc::clone(&ctx.stats),
174        );
175
176        trace!("Spawning item processor task");
177        let processor_handle = super::spawn_item_processor_task::<S>(
178            state.clone(),
179            item_rx,
180            Arc::clone(&ctx.pipelines),
181            config.max_concurrent_pipelines,
182            Arc::clone(&ctx.stats),
183        );
184
185        #[cfg(feature = "checkpoint")]
186        {
187            if let (Some(path), Some(interval)) =
188                (&checkpoint_config.path, checkpoint_config.interval)
189            {
190                let scheduler_cp = Arc::clone(&ctx.scheduler);
191                let pipelines_cp = Arc::clone(&ctx.pipelines);
192                let path_cp = path.clone();
193
194                #[cfg(feature = "cookie-store")]
195                let cookie_store_cp = self.cookie_store.clone();
196
197                #[cfg(not(feature = "cookie-store"))]
198                let _cookie_store_cp = ();
199
200                trace!(
201                    "Starting periodic checkpoint task with interval: {:?}",
202                    interval
203                );
204                tokio::spawn(async move {
205                    let mut interval_timer = tokio::time::interval(interval);
206                    interval_timer.tick().await;
207                    loop {
208                        tokio::select! {
209                            _ = interval_timer.tick() => {
210                                trace!("Checkpoint timer ticked, creating snapshot");
211                                if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
212                                    debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
213
214                                    #[cfg(feature = "cookie-store")]
215                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
216
217                                    #[cfg(not(feature = "cookie-store"))]
218                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
219
220                                    if let Err(e) = save_result {
221                                        error!("Periodic checkpoint save failed: {}", e);
222                                    } else {
223                                        debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
224                                    }
225                                } else {
226                                    warn!("Failed to create scheduler snapshot for checkpoint");
227                                }
228                            }
229                        }
230                    }
231                });
232            }
233        }
234
235        tokio::select! {
236            _ = tokio::signal::ctrl_c() => {
237                info!("Ctrl-C received, initiating graceful shutdown.");
238            }
239            _ = async {
240                loop {
241                    if scheduler.is_idle() && state.is_idle() {
242                        tokio::time::sleep(Duration::from_millis(50)).await;
243                        if scheduler.is_idle() && state.is_idle() {
244                            break;
245                        }
246                    }
247                    tokio::time::sleep(Duration::from_millis(100)).await;
248                }
249            } => {
250                info!("Crawl has become idle, initiating shutdown.");
251            }
252        };
253
254        trace!("Closing communication channels");
255        drop(res_tx);
256        drop(item_tx);
257
258        if let Err(e) = scheduler.shutdown().await {
259            error!("Error during scheduler shutdown: {}", e);
260        } else {
261            debug!("Scheduler shutdown initiated successfully");
262        }
263
264        let timeout_duration = Duration::from_secs(30);
265
266        let mut tasks = tokio::task::JoinSet::new();
267        tasks.spawn(processor_handle);
268        tasks.spawn(parser_handle);
269        tasks.spawn(downloader_handle);
270        tasks.spawn(init_task);
271
272        let results = tokio::time::timeout(timeout_duration, async {
273            let mut results = Vec::new();
274            while let Some(result) = tasks.join_next().await {
275                results.push(result);
276            }
277            results
278        })
279        .await;
280
281        let results = match results {
282            Ok(results) => {
283                trace!("All tasks completed during shutdown");
284                results
285            }
286            Err(_) => {
287                warn!(
288                    "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
289                    timeout_duration.as_secs()
290                );
291                tasks.abort_all();
292
293                tokio::time::sleep(Duration::from_millis(100)).await;
294
295                Vec::new()
296            }
297        };
298
299        for result in results {
300            if let Err(e) = result {
301                error!("Task failed during shutdown: {}", e);
302            } else {
303                trace!("Task completed successfully during shutdown");
304            }
305        }
306
307        #[cfg(feature = "checkpoint")]
308        {
309            if let Some(path) = &checkpoint_config.path {
310                debug!("Creating final checkpoint at {:?}", path);
311                let scheduler_checkpoint = scheduler.snapshot().await?;
312
313                #[cfg(feature = "cookie-store")]
314                let result = save_checkpoint::<S>(
315                    path,
316                    scheduler_checkpoint,
317                    &pipelines,
318                    &self.cookie_store,
319                )
320                .await;
321
322                #[cfg(not(feature = "cookie-store"))]
323                let result =
324                    save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
325
326                if let Err(e) = result {
327                    error!("Final checkpoint save failed: {}", e);
328                } else {
329                    info!("Final checkpoint saved successfully to {:?}", path);
330                }
331            }
332        }
333
334        info!("Closing item pipelines...");
335        let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
336        join_all(futures).await;
337        debug!("All item pipelines closed");
338
339        info!("Crawl finished successfully\n{}", stats);
340        Ok(())
341    }
342
343    /// Returns a shared handle to crawler runtime statistics.
344    pub fn stats(&self) -> Arc<StatCollector> {
345        Arc::clone(&self.stats)
346    }
347
348    /// Returns a reference to the spider state.
349    pub fn state(&self) -> &S::State {
350        &self.spider_state
351    }
352
353    /// Returns an Arc clone of the spider state.
354    pub fn state_arc(&self) -> Arc<S::State> {
355        Arc::clone(&self.spider_state)
356    }
357}
358
359fn spawn_init_task<S, I>(ctx: CrawlerContext<S, I>) -> tokio::task::JoinHandle<()>
360where
361    S: Spider<Item = I> + 'static,
362    I: ScrapedItem,
363{
364    tokio::spawn(async move {
365        match ctx.spider.start_requests() {
366            Ok(requests) => {
367                for mut req in requests {
368                    req.url.set_fragment(None);
369                    match ctx.scheduler.enqueue_request(req).await {
370                        Ok(_) => {
371                            ctx.stats.increment_requests_enqueued();
372                        }
373                        Err(e) => {
374                            error!("Failed to enqueue initial request: {}", e);
375                        }
376                    }
377                }
378            }
379            Err(e) => error!("Failed to create start requests: {}", e),
380        }
381    })
382}