// spider_core/crawler/core.rs
//! The core Crawler implementation for the `spider-lib` framework.
//!
//! This module defines the `Crawler` struct, which acts as the central orchestrator
//! for the web scraping process. It ties together the scheduler, downloader,
//! middlewares, spiders, and item pipelines to execute a crawl. The crawler
//! manages the lifecycle of requests and items, handles concurrency, supports
//! checkpointing for fault tolerance, and collects statistics for monitoring.
//!
//! It utilizes a task-based asynchronous model, spawning distinct tasks for
//! handling initial requests, downloading web pages, parsing responses, and
//! processing scraped items.

13use spider_util::error::SpiderError;
14use spider_util::item::ScrapedItem;
15use spider_middleware::middleware::Middleware;
16use spider_pipeline::pipeline::Pipeline;
17use spider_util::request::Request;
18use crate::scheduler::Scheduler;
19use crate::spider::Spider;
20use crate::Downloader;
21use crate::state::CrawlerState;
22use crate::stats::StatCollector;
23use anyhow::Result;
24use futures_util::future::join_all;
25use kanal::{AsyncReceiver, bounded_async};
26use tracing::{debug, error, info, trace, warn};
27
28use crate::checkpoint::save_checkpoint;
29use std::path::PathBuf;
30use std::sync::{
31    Arc,
32    atomic::Ordering,
33};
34use std::time::Duration;
35use tokio::sync::{Mutex, RwLock};
36
37use cookie_store::CookieStore;
38
/// The central orchestrator for the web scraping process, handling requests, responses, items, concurrency, checkpointing, and statistics collection.
pub struct Crawler<S: Spider, C> {
    /// Shared scheduler: the source of truth for pending/seen requests.
    scheduler: Arc<Scheduler>,
    /// Receiving half of the request channel the scheduler feeds; consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    /// Shared counters (requests enqueued/succeeded, items scraped, ...) exposed via `get_stats`.
    stats: Arc<StatCollector>,
    /// Downloader implementation (trait object) that fetches responses with client type `C`.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    /// Request/response middlewares, applied in order by the middleware manager.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    /// The user-provided spider, behind a mutex because parser workers and the
    /// initial-requests task both need access.
    spider: Arc<Mutex<S>>,
    /// Pipelines that post-process every scraped item.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Upper bound on simultaneous in-flight downloads.
    max_concurrent_downloads: usize,
    /// Number of response-parsing worker tasks.
    parser_workers: usize,
    /// Upper bound on simultaneous pipeline executions.
    max_concurrent_pipelines: usize,
    /// Minimum capacity for the internal response/item channels; may be scaled
    /// up adaptively from the concurrency settings in `start_crawl`.
    channel_capacity: usize,
    /// Where to persist checkpoints; `None` disables checkpointing entirely.
    checkpoint_path: Option<PathBuf>,
    /// How often to write periodic checkpoints; `None` means only a final
    /// checkpoint (if a path is set).
    checkpoint_interval: Option<Duration>,
    /// Cookie jar shared with the downloader and persisted in checkpoints.
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
56
57impl<S, C> Crawler<S, C>
58where
59    S: Spider + 'static,
60    S::Item: ScrapedItem,
61    C: Send + Sync + Clone + 'static,
62{
63    /// Creates a new `Crawler` instance with the given components and configuration.
64    #[allow(clippy::too_many_arguments)]
65    pub(crate) fn new(
66        scheduler: Arc<Scheduler>,
67        req_rx: AsyncReceiver<Request>,
68        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
69        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
70        spider: S,
71        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
72        max_concurrent_downloads: usize,
73        parser_workers: usize,
74        max_concurrent_pipelines: usize,
75        channel_capacity: usize,
76        checkpoint_path: Option<PathBuf>,
77        checkpoint_interval: Option<Duration>,
78        stats: Arc<StatCollector>,
79        cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
80    ) -> Self {
81        Crawler {
82            scheduler,
83            req_rx,
84            stats,
85            downloader,
86            middlewares,
87            spider: Arc::new(Mutex::new(spider)),
88            item_pipelines,
89            max_concurrent_downloads,
90            parser_workers,
91            max_concurrent_pipelines,
92            channel_capacity,
93            checkpoint_path,
94            checkpoint_interval,
95            cookie_store,
96        }
97    }
98
99    /// Starts the crawl, orchestrating the scraping process, managing tasks, handling shutdown, checkpointing, and logging statistics.
100    pub async fn start_crawl(self) -> Result<(), SpiderError> {
101        info!(
102            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
103            self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
104        );
105
106        let Crawler {
107            scheduler,
108            req_rx,
109            stats,
110            downloader,
111            middlewares,
112            spider,
113            item_pipelines,
114            max_concurrent_downloads,
115            parser_workers,
116            max_concurrent_pipelines,
117            channel_capacity: _, // We don't need to use this value here
118            checkpoint_path,
119            checkpoint_interval,
120            cookie_store,
121        } = self;
122
123        let state = CrawlerState::new();
124        let pipelines = Arc::new(item_pipelines);
125
126        let adaptive_channel_capacity = std::cmp::max(
127            self.max_concurrent_downloads * 3,
128            self.parser_workers * self.max_concurrent_pipelines * 2,
129        )
130        .max(self.channel_capacity);
131
132        trace!(
133            "Creating communication channels with capacity: {}",
134            adaptive_channel_capacity
135        );
136        let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
137        let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
138
139        trace!("Spawning initial requests task");
140        let initial_requests_task =
141            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
142
143        trace!("Initializing middleware manager");
144        let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
145
146        trace!("Spawning downloader task");
147        let downloader_task = super::spawn_downloader_task::<S, C>(
148            scheduler.clone(),
149            req_rx,
150            downloader,
151            middlewares_manager,
152            state.clone(),
153            res_tx.clone(),
154            max_concurrent_downloads,
155            stats.clone(),
156        );
157
158        trace!("Spawning parser task");
159        let parser_task = super::spawn_parser_task::<S>(
160            scheduler.clone(),
161            spider.clone(),
162            state.clone(),
163            res_rx,
164            item_tx.clone(),
165            parser_workers,
166            stats.clone(),
167        );
168
169        trace!("Spawning item processor task");
170        let item_processor_task = super::spawn_item_processor_task::<S>(
171            state.clone(),
172            item_rx,
173            pipelines.clone(),
174            max_concurrent_pipelines,
175            stats.clone(),
176        );
177
178        if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
179            let scheduler_clone = scheduler.clone();
180            let pipelines_clone = pipelines.clone();
181            let path_clone = path.clone();
182            let cookie_store_clone = cookie_store.clone();
183
184            trace!(
185                "Starting periodic checkpoint task with interval: {:?}",
186                interval
187            );
188            tokio::spawn(async move {
189                let mut interval_timer = tokio::time::interval(interval);
190                interval_timer.tick().await;
191                loop {
192                    tokio::select! {
193                        _ = interval_timer.tick() => {
194                            trace!("Checkpoint timer ticked, creating snapshot");
195                            if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
196                                debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
197                                let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
198
199                                if let Err(e) = save_result {
200                                    error!("Periodic checkpoint save failed: {}", e);
201                                } else {
202                                    debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
203                                }
204                            } else {
205                                warn!("Failed to create scheduler snapshot for checkpoint");
206                            }
207                        }
208                    }
209                }
210            });
211        }
212
213        tokio::select! {
214            _ = tokio::signal::ctrl_c() => {
215                info!("Ctrl-C received, initiating graceful shutdown.");
216            }
217            _ = async {
218                loop {
219                    if scheduler.is_idle() && state.is_idle() {
220                        tokio::time::sleep(Duration::from_millis(50)).await;
221                        if scheduler.is_idle() && state.is_idle() {
222                            break;
223                        }
224                    }
225                    tokio::time::sleep(Duration::from_millis(100)).await;
226                }
227            } => {
228                info!("Crawl has become idle, initiating shutdown.");
229            }
230        };
231
232        trace!("Closing communication channels");
233        drop(res_tx);
234        drop(item_tx);
235
236        if let Err(e) = scheduler.shutdown().await {
237            error!("Error during scheduler shutdown: {}", e);
238        } else {
239            debug!("Scheduler shutdown initiated successfully");
240        }
241
242        let timeout_duration = Duration::from_secs(30); // Default timeout of 30 seconds
243
244        let mut task_set = tokio::task::JoinSet::new();
245        task_set.spawn(item_processor_task);
246        task_set.spawn(parser_task);
247        task_set.spawn(downloader_task);
248        task_set.spawn(initial_requests_task);
249
250        let remaining_results = tokio::time::timeout(timeout_duration, async {
251            let mut results = Vec::new();
252            while let Some(result) = task_set.join_next().await {
253                results.push(result);
254            }
255            results
256        })
257        .await;
258
259        let task_results = match remaining_results {
260            Ok(results) => {
261                trace!("All tasks completed during shutdown");
262                results
263            }
264            Err(_) => {
265                warn!(
266                    "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
267                    timeout_duration.as_secs()
268                );
269                task_set.abort_all();
270
271                tokio::time::sleep(Duration::from_millis(100)).await;
272
273                Vec::new()
274            }
275        };
276
277        for result in task_results {
278            if let Err(e) = result {
279                error!("Task failed during shutdown: {}", e);
280            } else {
281                trace!("Task completed successfully during shutdown");
282            }
283        }
284
285        if let Some(path) = &checkpoint_path {
286            debug!("Creating final checkpoint at {:?}", path);
287            let scheduler_checkpoint = scheduler.snapshot().await?;
288            let result =
289                save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &cookie_store).await;
290
291            if let Err(e) = result {
292                error!("Final checkpoint save failed: {}", e);
293            } else {
294                info!("Final checkpoint saved successfully to {:?}", path);
295            }
296        }
297
298        info!("Closing item pipelines...");
299        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
300        join_all(closing_futures).await;
301        debug!("All item pipelines closed");
302
303        info!(
304            "Crawl finished successfully. Stats: requests_enqueued={}, requests_succeeded={}, items_scraped={}",
305            stats.requests_enqueued.load(Ordering::SeqCst),
306            stats.requests_succeeded.load(Ordering::SeqCst),
307            stats.items_scraped.load(Ordering::SeqCst)
308        );
309        Ok(())
310    }
311
312    /// Returns a cloned Arc to the `StatCollector` instance used by this crawler.
313    ///
314    /// This allows programmatic access to the collected statistics at any time during or after the crawl.
315    pub fn get_stats(&self) -> Arc<StatCollector> {
316        Arc::clone(&self.stats)
317    }
318}
319
320fn spawn_initial_requests_task<S>(
321    scheduler: Arc<Scheduler>,
322    spider: Arc<Mutex<S>>,
323    stats: Arc<StatCollector>,
324) -> tokio::task::JoinHandle<()>
325where
326    S: Spider + 'static,
327    S::Item: ScrapedItem,
328{
329    tokio::spawn(async move {
330        match spider.lock().await.start_requests() {
331            Ok(requests) => {
332                for mut req in requests {
333                    req.url.set_fragment(None);
334                    match scheduler.enqueue_request(req).await {
335                        Ok(_) => {
336                            stats.increment_requests_enqueued();
337                        }
338                        Err(e) => {
339                            error!("Failed to enqueue initial request: {}", e);
340                        }
341                    }
342                }
343            }
344            Err(e) => error!("Failed to create start requests: {}", e),
345        }
346    })
347}