// spider_core/engine/crawler.rs
//! The concrete crawler runtime.

use crate::Downloader;
use crate::config::CrawlerConfig;
use crate::engine::CrawlerContext;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use crate::state::CrawlerState;
use crate::stats::StatCollector;
use anyhow::Result;
#[cfg(feature = "live-stats")]
use crossterm::{
    cursor::{Hide, MoveToColumn, MoveUp, Show},
    execute, queue,
    terminal::{Clear, ClearType, size},
};
use futures_util::future::join_all;
use kanal::{AsyncReceiver, bounded_async};
use log::{debug, error, info, trace, warn};
use spider_middleware::middleware::Middleware;
use spider_pipeline::pipeline::Pipeline;
use spider_util::error::SpiderError;
use spider_util::item::ScrapedItem;
use spider_util::request::Request;
#[cfg(feature = "live-stats")]
use unicode_width::UnicodeWidthChar;

#[cfg(feature = "checkpoint")]
use crate::checkpoint::save_checkpoint;
#[cfg(feature = "checkpoint")]
use crate::config::CheckpointConfig;

#[cfg(feature = "live-stats")]
use std::io::{IsTerminal, Write};
use std::sync::Arc;
use std::time::Duration;

#[cfg(feature = "cookie-store")]
use tokio::sync::RwLock;
#[cfg(feature = "live-stats")]
use tokio::sync::oneshot;
#[cfg(feature = "live-stats")]
use tokio::time::MissedTickBehavior;

#[cfg(feature = "cookie-store")]
use cookie_store::CookieStore;
/// The running crawler instance.
///
/// Owns every component wired together before the crawl starts; it is
/// consumed by [`Crawler::start_crawl`], which drives the crawl to
/// completion.
pub struct Crawler<S: Spider, C> {
    /// Shared request scheduler; also used for idle detection and shutdown.
    scheduler: Arc<Scheduler>,
    /// Receiving end of the request channel handed to the downloader task.
    req_rx: AsyncReceiver<Request>,
    /// Shared runtime statistics collector.
    stats: Arc<StatCollector>,
    /// Downloader implementation, generic over its HTTP client type `C`.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    /// Middlewares passed to the downloader task at startup.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    /// The user-provided spider.
    spider: Arc<S>,
    /// Spider-defined shared state, initialized to `S::State::default()`.
    spider_state: Arc<S::State>,
    /// Item pipelines run by the item-processor task.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Runtime tuning knobs (concurrency, backpressure thresholds, limits).
    config: CrawlerConfig,
    /// Checkpoint path/interval settings.
    #[cfg(feature = "checkpoint")]
    checkpoint_config: CheckpointConfig,
    /// Shared cookie jar; included in checkpoints when both features are on.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}

impl<S, C> Crawler<S, C>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
    C: Send + Sync + Clone + 'static,
{
    /// Assembles a `Crawler` from pre-built components.
    ///
    /// Crate-private: callers inside the crate (presumably a builder —
    /// note `pub(crate)`) are expected to prepare the scheduler, channel,
    /// middlewares and pipelines before invoking this. The spider is wrapped
    /// in an `Arc` here and its shared state is initialized to
    /// `S::State::default()`.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: Arc<Scheduler>,
        req_rx: AsyncReceiver<Request>,
        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
        spider: S,
        pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
        config: CrawlerConfig,
        #[cfg(feature = "checkpoint")] checkpoint_config: CheckpointConfig,
        stats: Arc<StatCollector>,
        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
    ) -> Self {
        Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            // Spider and state are shared across async tasks, hence the Arcs.
            spider: Arc::new(spider),
            spider_state: Arc::new(S::State::default()),
            pipelines,
            config,
            #[cfg(feature = "checkpoint")]
            checkpoint_config,
            #[cfg(feature = "cookie-store")]
            cookie_store,
        }
    }
100
101    pub async fn start_crawl(self) -> Result<(), SpiderError> {
102        info!(
103            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}, item_limit={:?}",
104            self.config.max_concurrent_downloads,
105            self.config.parser_workers,
106            self.config.max_concurrent_pipelines,
107            self.config.item_limit
108        );
109
110        let Crawler {
111            scheduler,
112            req_rx,
113            stats,
114            downloader,
115            middlewares,
116            spider,
117            spider_state,
118            pipelines,
119            config,
120            #[cfg(feature = "checkpoint")]
121            checkpoint_config,
122            #[cfg(feature = "cookie-store")]
123                cookie_store: _,
124        } = self;
125
126        let state = CrawlerState::new();
127        let pipelines = Arc::new(pipelines);
128
129        // Create aggregated context for efficient sharing across tasks
130        let ctx = CrawlerContext::new(
131            Arc::clone(&scheduler),
132            Arc::clone(&stats),
133            Arc::clone(&spider),
134            Arc::clone(&spider_state),
135            Arc::clone(&pipelines),
136        );
137
138        let channel_capacity = std::cmp::max(
139            config.max_concurrent_downloads * 3,
140            config.parser_workers * config.max_concurrent_pipelines * 2,
141        )
142        .max(config.channel_capacity);
143
144        trace!(
145            "Creating communication channels with capacity: {}",
146            channel_capacity
147        );
148        let (res_tx, res_rx) = bounded_async(channel_capacity);
149        let (item_tx, item_rx) = bounded_async(channel_capacity);
150
151        info!("Starting initial request bootstrap task");
152        let init_task = spawn_init_task(ctx.clone());
153
154        debug!("Initializing middleware manager");
155        let middlewares = super::SharedMiddlewareManager::new(middlewares);
156
157        info!("Starting downloader task");
158        let downloader_handle = super::spawn_downloader_task::<S, C>(
159            Arc::clone(&ctx.scheduler),
160            req_rx,
161            downloader,
162            middlewares,
163            state.clone(),
164            res_tx.clone(),
165            config.max_concurrent_downloads,
166            config.response_backpressure_threshold.max(1),
167            config.retry_release_permit,
168            Arc::clone(&ctx.stats),
169        );
170
171        info!("Starting parser task");
172        let parser_handle = super::spawn_parser_task::<S>(
173            Arc::clone(&ctx.scheduler),
174            Arc::clone(&ctx.spider),
175            Arc::clone(&ctx.spider_state),
176            state.clone(),
177            res_rx,
178            item_tx.clone(),
179            config.parser_workers,
180            config.output_batch_size.max(1),
181            config.item_backpressure_threshold.max(1),
182            config.item_limit,
183            Arc::clone(&ctx.stats),
184        );
185
186        info!("Starting item processor task");
187        let processor_handle = super::spawn_item_processor_task::<S>(
188            state.clone(),
189            item_rx,
190            Arc::clone(&ctx.pipelines),
191            config.max_concurrent_pipelines,
192            Arc::clone(&ctx.stats),
193        );
194
195        #[cfg(feature = "live-stats")]
196        let mut live_stats_task: Option<(
197            oneshot::Sender<()>,
198            tokio::task::JoinHandle<()>,
199        )> = if config.live_stats && std::io::stdout().is_terminal() {
200            let (stop_tx, stop_rx) = oneshot::channel();
201            let stats_for_live = Arc::clone(&ctx.stats);
202            let interval = config.live_stats_interval;
203            let handle = tokio::spawn(async move {
204                run_live_stats(stats_for_live, interval, stop_rx).await;
205            });
206            Some((stop_tx, handle))
207        } else {
208            None
209        };
210        #[cfg(not(feature = "live-stats"))]
211        let mut live_stats_task: Option<((), ())> = None;
212
213        #[cfg(feature = "checkpoint")]
214        {
215            if let (Some(path), Some(interval)) =
216                (&checkpoint_config.path, checkpoint_config.interval)
217            {
218                let scheduler_cp = Arc::clone(&ctx.scheduler);
219                let pipelines_cp = Arc::clone(&ctx.pipelines);
220                let path_cp = path.clone();
221
222                #[cfg(feature = "cookie-store")]
223                let cookie_store_cp = self.cookie_store.clone();
224
225                #[cfg(not(feature = "cookie-store"))]
226                let _cookie_store_cp = ();
227
228                info!(
229                    "Starting periodic checkpoint task with interval: {:?}",
230                    interval
231                );
232                tokio::spawn(async move {
233                    let mut interval_timer = tokio::time::interval(interval);
234                    interval_timer.tick().await;
235                    loop {
236                        tokio::select! {
237                            _ = interval_timer.tick() => {
238                                trace!("Checkpoint timer ticked, creating snapshot");
239                                if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
240                                    debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
241
242                                    #[cfg(feature = "cookie-store")]
243                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
244
245                                    #[cfg(not(feature = "cookie-store"))]
246                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
247
248                                    if let Err(e) = save_result {
249                                        error!("Periodic checkpoint save failed: {}", e);
250                                    } else {
251                                        debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
252                                    }
253                                } else {
254                                    warn!("Failed to create scheduler snapshot for checkpoint");
255                                }
256                            }
257                        }
258                    }
259                });
260            }
261        }
262
263        let interrupted = tokio::select! {
264            _ = tokio::signal::ctrl_c() => {
265                info!("Ctrl-C received, initiating graceful shutdown.");
266                if let Err(e) = scheduler.shutdown().await {
267                    error!("Error during scheduler shutdown: {}", e);
268                } else {
269                    info!("Scheduler shutdown initiated successfully");
270                }
271                true
272            }
273            _ = async {
274                loop {
275                    if scheduler.is_idle() && state.is_idle() {
276                        tokio::time::sleep(Duration::from_millis(25)).await;
277                        if scheduler.is_idle() && state.is_idle() {
278                            break;
279                        }
280                    }
281                    tokio::time::sleep(Duration::from_millis(25)).await;
282                }
283            } => {
284                info!("Crawl has become idle, initiating shutdown.");
285                false
286            }
287        };
288
289        trace!("Closing communication channels");
290        drop(res_tx);
291        drop(item_tx);
292
293        if !interrupted {
294            if let Err(e) = scheduler.shutdown().await {
295                error!("Error during scheduler shutdown: {}", e);
296            } else {
297                info!("Scheduler shutdown initiated successfully");
298            }
299        }
300
301        let mut tasks = tokio::task::JoinSet::new();
302        tasks.spawn(processor_handle);
303        tasks.spawn(parser_handle);
304        tasks.spawn(downloader_handle);
305        tasks.spawn(init_task);
306        let mut results = Vec::new();
307        let mut remaining_tasks = 4usize;
308
309        if interrupted {
310            let grace_period = config.shutdown_grace_period;
311            let shutdown_deadline = tokio::time::sleep(grace_period);
312            tokio::pin!(shutdown_deadline);
313
314            while remaining_tasks > 0 {
315                tokio::select! {
316                    result = tasks.join_next() => {
317                        match result {
318                            Some(result) => {
319                                results.push(result);
320                                remaining_tasks = remaining_tasks.saturating_sub(1);
321                            }
322                            None => break,
323                        }
324                    }
325                    _ = tokio::signal::ctrl_c() => {
326                        warn!("Second Ctrl-C received, aborting remaining tasks immediately.");
327                        tasks.abort_all();
328                        tokio::time::sleep(Duration::from_millis(25)).await;
329                        break;
330                    }
331                    _ = &mut shutdown_deadline => {
332                        warn!(
333                            "Tasks did not complete within shutdown grace period ({}s), aborting remaining tasks and continuing with shutdown...",
334                            grace_period.as_secs()
335                        );
336                        tasks.abort_all();
337                        tokio::time::sleep(Duration::from_millis(25)).await;
338                        break;
339                    }
340                }
341            }
342        } else {
343            while let Some(result) = tasks.join_next().await {
344                results.push(result);
345            }
346            trace!("All tasks completed during shutdown");
347        }
348
349        for result in results {
350            if let Err(e) = result {
351                error!("Task failed during shutdown: {}", e);
352            } else {
353                trace!("Task completed successfully during shutdown");
354            }
355        }
356
357        #[cfg(feature = "live-stats")]
358        if let Some((stop_tx, handle)) = live_stats_task.take() {
359            let _ = stop_tx.send(());
360            let _ = handle.await;
361        }
362        #[cfg(not(feature = "live-stats"))]
363        let _ = live_stats_task.take();
364
365        #[cfg(feature = "checkpoint")]
366        {
367            if let Some(path) = &checkpoint_config.path {
368                debug!("Creating final checkpoint at {:?}", path);
369                let scheduler_checkpoint = scheduler.snapshot().await?;
370
371                #[cfg(feature = "cookie-store")]
372                let result = save_checkpoint::<S>(
373                    path,
374                    scheduler_checkpoint,
375                    &pipelines,
376                    &self.cookie_store,
377                )
378                .await;
379
380                #[cfg(not(feature = "cookie-store"))]
381                let result =
382                    save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
383
384                if let Err(e) = result {
385                    error!("Final checkpoint save failed: {}", e);
386                } else {
387                    info!("Final checkpoint saved successfully to {:?}", path);
388                }
389            }
390        }
391
392        info!("Closing item pipelines...");
393        let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
394        join_all(futures).await;
395        info!("All item pipelines closed");
396
397        if config.live_stats {
398            println!("{}\n", stats.to_live_report_string());
399        } else {
400            info!("Crawl finished successfully\n{}", stats);
401        }
402        Ok(())
403    }
404
405    /// Returns a shared handle to crawler runtime statistics.
406    pub fn stats(&self) -> Arc<StatCollector> {
407        Arc::clone(&self.stats)
408    }
409
410    /// Returns a reference to the spider state.
411    pub fn state(&self) -> &S::State {
412        &self.spider_state
413    }
414
415    /// Returns an Arc clone of the spider state.
416    pub fn state_arc(&self) -> Arc<S::State> {
417        Arc::clone(&self.spider_state)
418    }
}

421fn spawn_init_task<S, I>(ctx: CrawlerContext<S, I>) -> tokio::task::JoinHandle<()>
422where
423    S: Spider<Item = I> + 'static,
424    I: ScrapedItem,
425{
426    tokio::spawn(async move {
427        let mut enqueued = 0usize;
428        let mut skipped = 0usize;
429        let mut failed = 0usize;
430        match ctx.spider.start_requests() {
431            Ok(source) => match source.into_iter() {
432                Ok(requests) => {
433                    for req_res in requests {
434                        let mut req = match req_res {
435                            Ok(req) => req,
436                            Err(e) => {
437                                warn!("Skipping invalid start URL entry: {}", e);
438                                skipped += 1;
439                                continue;
440                            }
441                        };
442
443                        req.url.set_fragment(None);
444                        match ctx.scheduler.enqueue_request(req).await {
445                            Ok(_) => {
446                                ctx.stats.increment_requests_enqueued();
447                                enqueued += 1;
448                            }
449                            Err(e) => {
450                                error!("Failed to enqueue initial request: {}", e);
451                                failed += 1;
452                            }
453                        }
454                    }
455                    info!(
456                        "Initial request bootstrap finished: {} enqueued, {} skipped, {} failed",
457                        enqueued, skipped, failed
458                    );
459                }
460                Err(e) => error!("Failed to resolve start request source: {}", e),
461            },
462            Err(e) => error!("Failed to create start request source: {}", e),
463        }
464    })
465}
/// Renders the live statistics report in place on the terminal.
#[cfg(feature = "live-stats")]
struct LiveStatsRenderer {
    /// Lines emitted by the previous `render` call; used to know how far to
    /// move the cursor up and how many rows to clear on the next repaint.
    previous_lines: Vec<String>,
}
470
#[cfg(feature = "live-stats")]
impl LiveStatsRenderer {
    /// Prepares the terminal for in-place rendering: hides the cursor and
    /// emits one blank line so the first `render` has a row to start on.
    /// All terminal errors here and below are deliberately ignored
    /// (`let _`): stats display must never break the crawl.
    fn new() -> Self {
        let mut out = std::io::stdout();
        let _ = execute!(out, Hide);
        let _ = writeln!(out);
        let _ = out.flush();
        Self {
            previous_lines: Vec::new(),
        }
    }

    /// Repaints `content` over the previously rendered block.
    ///
    /// Moves the cursor up to the first previously drawn row, then clears
    /// and rewrites `max(old, new)` rows so a shrinking report leaves no
    /// stale lines behind.
    fn render(&mut self, content: &str) {
        let mut out = std::io::stdout();
        let terminal_width = Self::terminal_width();
        // Pre-trim every line to the terminal width so implicit wrapping
        // can never make the block taller than `previous_lines` records.
        let next_lines: Vec<String> = content
            .lines()
            .map(|line| Self::fit_line(line, terminal_width))
            .collect();
        let previous_len = self.previous_lines.len();
        let next_len = next_lines.len();
        let max_len = previous_len.max(next_len);

        // The cursor sits on the last drawn row; move to the block's top.
        if previous_len > 1 {
            let _ = queue!(out, MoveUp((previous_len - 1) as u16));
        }
        let _ = queue!(out, MoveToColumn(0));

        for line_idx in 0..max_len {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));

            if let Some(line) = next_lines.get(line_idx) {
                let _ = write!(out, "{}", line);
            }

            // No newline after the final row: the cursor stays on the last
            // drawn line, which the next `render` call relies on.
            if line_idx + 1 < max_len {
                let _ = writeln!(out);
            }
        }

        let _ = out.flush();
        self.previous_lines = next_lines;
    }

    /// Current terminal width in columns, or `usize::MAX` ("unlimited")
    /// when the size cannot be queried.
    fn terminal_width() -> usize {
        size()
            .map(|(width, _)| usize::from(width.max(1)))
            .unwrap_or(usize::MAX)
    }

    /// Fits one report line into `width` columns. Lines starting with
    /// `current  : ` (presumably the in-flight URL — verify against the
    /// report format) get an ellipsis; all other lines are hard-cut.
    fn fit_line(line: &str, width: usize) -> String {
        let safe_width = Self::safe_terminal_width(width);

        if line.starts_with("current  : ") {
            return Self::truncate_with_ellipsis(line, safe_width);
        }

        Self::truncate_to_width(line, safe_width)
    }

    /// Reserves one column so writing the last cell never triggers the
    /// terminal's implicit line wrap; "unlimited" width stays unlimited.
    fn safe_terminal_width(width: usize) -> usize {
        if width == usize::MAX {
            return width;
        }

        width.saturating_sub(1)
    }

    /// Hard-truncates `line` to at most `width` display columns, counting
    /// wide characters via their Unicode display width.
    /// Widths of `usize::MAX` and `0` are treated as "do not truncate".
    fn truncate_to_width(line: &str, width: usize) -> String {
        if width == usize::MAX || width == 0 {
            return line.to_owned();
        }

        let mut visible_width = 0;
        let mut truncated = String::new();

        for ch in line.chars() {
            let ch_width = ch.width().unwrap_or(0);
            if visible_width + ch_width > width {
                break;
            }

            truncated.push(ch);
            visible_width += ch_width;
        }

        truncated
    }

    /// Like `truncate_to_width` but appends `...` when the line was cut;
    /// widths of 3 columns or fewer degenerate to all dots.
    fn truncate_with_ellipsis(line: &str, width: usize) -> String {
        if width == usize::MAX || width == 0 {
            return line.to_owned();
        }

        if Self::display_width(line) <= width {
            return line.to_owned();
        }

        if width <= 3 {
            return ".".repeat(width);
        }

        let visible = Self::truncate_to_width(line, width - 3);
        format!("{visible}...")
    }

    /// Total display width of `line` in terminal columns.
    fn display_width(line: &str) -> usize {
        line.chars().map(|ch| ch.width().unwrap_or(0)).sum()
    }

    /// Clears the rendered block and restores the cursor; called once when
    /// live rendering stops so the final report can print normally.
    fn finish(self) {
        let mut out = std::io::stdout();
        self.clear_previous(&mut out);
        let _ = execute!(out, MoveToColumn(0), Clear(ClearType::CurrentLine), Show);
        let _ = out.flush();
    }

    /// Erases every previously drawn row, leaving the cursor back on the
    /// block's first row.
    fn clear_previous(&self, out: &mut std::io::Stdout) {
        if self.previous_lines.is_empty() {
            return;
        }
        let lines = self.previous_lines.len();
        let _ = queue!(out, MoveToColumn(0));
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
        for line_idx in 0..lines {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));
            if line_idx + 1 < lines {
                let _ = writeln!(out);
            }
        }
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
    }
}
608
/// Periodically repaints the live statistics report until the stop signal
/// fires, then tears the renderer down and restores the terminal.
#[cfg(feature = "live-stats")]
async fn run_live_stats(
    stats: Arc<StatCollector>,
    interval: Duration,
    mut stop_rx: oneshot::Receiver<()>,
) {
    let mut display = LiveStatsRenderer::new();
    let mut tick = tokio::time::interval(interval);
    // If rendering falls behind, skip the backlog rather than bursting
    // several repaints at once.
    tick.set_missed_tick_behavior(MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = &mut stop_rx => break,
            _ = tick.tick() => display.render(&stats.to_live_report_string()),
        }
    }

    display.finish();
}
631}