// spider_core/engine/crawler.rs

1//! The concrete crawler runtime.
2
3use crate::Downloader;
4use crate::config::CrawlerConfig;
5use crate::engine::CrawlerContext;
6use crate::scheduler::Scheduler;
7use crate::spider::Spider;
8use crate::state::CrawlerState;
9use crate::stats::StatCollector;
10use anyhow::Result;
11#[cfg(feature = "live-stats")]
12use crossterm::{
13    cursor::{Hide, MoveToColumn, MoveUp, Show},
14    execute, queue,
15    terminal::{Clear, ClearType, size},
16};
17use futures_util::future::join_all;
18use kanal::{AsyncReceiver, bounded_async};
19use log::{debug, error, info, trace, warn};
20use spider_middleware::middleware::Middleware;
21use spider_pipeline::pipeline::Pipeline;
22use spider_util::error::SpiderError;
23use spider_util::item::ScrapedItem;
24use spider_util::request::Request;
25#[cfg(feature = "live-stats")]
26use unicode_width::UnicodeWidthChar;
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use crate::config::CheckpointConfig;
32
33#[cfg(feature = "live-stats")]
34use std::io::{IsTerminal, Write};
35use std::sync::Arc;
36use std::time::Duration;
37
38#[cfg(feature = "cookie-store")]
39use tokio::sync::RwLock;
40#[cfg(feature = "live-stats")]
41use tokio::sync::oneshot;
42#[cfg(feature = "live-stats")]
43use tokio::time::MissedTickBehavior;
44
45#[cfg(feature = "cookie-store")]
46use cookie_store::CookieStore;
47
/// Why the main crawl loop exited; selects the shutdown path taken afterwards.
enum RunOutcome {
    /// Ctrl-C was received: the scheduler is shut down and remaining tasks get
    /// a grace period before being aborted.
    Interrupted,
    /// Scheduler and in-flight work drained naturally: tasks are awaited to
    /// completion.
    Idle,
    /// The configured item limit was hit: producer tasks are aborted
    /// immediately and only the item processor gets a grace period.
    ItemLimitReached,
}
53
/// The running crawler instance.
///
/// Owns every component needed for a crawl; consumed by `start_crawl`.
pub struct Crawler<S: Spider, C> {
    /// Shared request scheduler; also drives idle detection and shutdown.
    scheduler: Arc<Scheduler>,
    /// Receiving end of the request channel, consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    /// Shared runtime statistics collector.
    stats: Arc<StatCollector>,
    /// Downloader implementation used to fetch scheduled requests.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    /// Middlewares handed to the downloader task.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    /// The user-provided spider.
    spider: Arc<S>,
    /// Spider-defined shared state, created via `S::State::default()`.
    spider_state: Arc<S::State>,
    /// Item pipelines run over scraped items and closed at shutdown.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Runtime tuning knobs (concurrency, channel sizes, limits).
    config: CrawlerConfig,
    /// Checkpoint path/interval settings.
    #[cfg(feature = "checkpoint")]
    checkpoint_config: CheckpointConfig,
    /// Shared cookie jar; persisted into checkpoints when both the
    /// `cookie-store` and `checkpoint` features are enabled.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
70
71impl<S, C> Crawler<S, C>
72where
73    S: Spider + 'static,
74    S::Item: ScrapedItem,
75    C: Send + Sync + Clone + 'static,
76{
77    #[allow(clippy::too_many_arguments)]
78    pub(crate) fn new(
79        scheduler: Arc<Scheduler>,
80        req_rx: AsyncReceiver<Request>,
81        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
82        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
83        spider: S,
84        pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
85        config: CrawlerConfig,
86        #[cfg(feature = "checkpoint")] checkpoint_config: CheckpointConfig,
87        stats: Arc<StatCollector>,
88        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
89    ) -> Self {
90        Crawler {
91            scheduler,
92            req_rx,
93            stats,
94            downloader,
95            middlewares,
96            spider: Arc::new(spider),
97            spider_state: Arc::new(S::State::default()),
98            pipelines,
99            config,
100            #[cfg(feature = "checkpoint")]
101            checkpoint_config,
102            #[cfg(feature = "cookie-store")]
103            cookie_store,
104        }
105    }
106
107    pub async fn start_crawl(self) -> Result<(), SpiderError> {
108        info!(
109            "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}, item_limit={:?}",
110            self.config.max_concurrent_downloads,
111            self.config.parser_workers,
112            self.config.max_concurrent_pipelines,
113            self.config.item_limit
114        );
115
116        let Crawler {
117            scheduler,
118            req_rx,
119            stats,
120            downloader,
121            middlewares,
122            spider,
123            spider_state,
124            pipelines,
125            config,
126            #[cfg(feature = "checkpoint")]
127            checkpoint_config,
128            #[cfg(feature = "cookie-store")]
129                cookie_store: _,
130        } = self;
131
132        let state = CrawlerState::new();
133        let pipelines = Arc::new(pipelines);
134
135        // Create aggregated context for efficient sharing across tasks
136        let ctx = CrawlerContext::new(
137            Arc::clone(&scheduler),
138            Arc::clone(&stats),
139            Arc::clone(&spider),
140            Arc::clone(&spider_state),
141            Arc::clone(&pipelines),
142        );
143
144        let channel_capacity = std::cmp::max(
145            config.max_concurrent_downloads * 3,
146            config.parser_workers * config.max_concurrent_pipelines * 2,
147        )
148        .max(config.channel_capacity);
149
150        trace!(
151            "Creating communication channels with capacity: {}",
152            channel_capacity
153        );
154        let (res_tx, res_rx) = bounded_async(channel_capacity);
155        let (item_tx, item_rx) = bounded_async(channel_capacity);
156
157        info!("Starting initial request bootstrap task");
158        let init_task = spawn_init_task(ctx.clone());
159
160        debug!("Initializing middleware manager");
161        let middlewares = super::SharedMiddlewareManager::new(middlewares);
162
163        info!("Starting downloader task");
164        let downloader_handle = super::spawn_downloader_task::<S, C>(
165            Arc::clone(&ctx.scheduler),
166            req_rx,
167            downloader,
168            middlewares,
169            state.clone(),
170            res_tx.clone(),
171            config.max_concurrent_downloads,
172            config.response_backpressure_threshold.max(1),
173            config.retry_release_permit,
174            Arc::clone(&ctx.stats),
175        );
176
177        info!("Starting parser task");
178        let parser_handle = super::spawn_parser_task::<S>(
179            Arc::clone(&ctx.scheduler),
180            Arc::clone(&ctx.spider),
181            Arc::clone(&ctx.spider_state),
182            state.clone(),
183            res_rx,
184            item_tx.clone(),
185            config.parser_workers,
186            config.discovery.clone(),
187            config.output_batch_size.max(1),
188            config.item_backpressure_threshold.max(1),
189            config.item_limit,
190            Arc::clone(&ctx.stats),
191        );
192
193        info!("Starting item processor task");
194        let processor_handle = super::spawn_item_processor_task::<S>(
195            state.clone(),
196            item_rx,
197            Arc::clone(&ctx.pipelines),
198            config.max_concurrent_pipelines,
199            Arc::clone(&ctx.stats),
200        );
201
202        let backlog_stats = Arc::clone(&ctx.stats);
203        let backlog_scheduler = Arc::clone(&ctx.scheduler);
204        let backlog_state = state.clone();
205        let backlog_task = tokio::spawn(async move {
206            loop {
207                backlog_stats.update_runtime_backlog(
208                    backlog_scheduler.pending_count(),
209                    backlog_state
210                        .parsing_responses
211                        .load(std::sync::atomic::Ordering::Acquire),
212                    backlog_state
213                        .processing_items
214                        .load(std::sync::atomic::Ordering::Acquire),
215                );
216                tokio::time::sleep(Duration::from_millis(100)).await;
217            }
218        });
219
220        #[cfg(feature = "live-stats")]
221        let mut live_stats_task: Option<(
222            oneshot::Sender<()>,
223            tokio::task::JoinHandle<()>,
224        )> = if config.live_stats && std::io::stdout().is_terminal() {
225            let (stop_tx, stop_rx) = oneshot::channel();
226            let stats_for_live = Arc::clone(&ctx.stats);
227            let interval = config.live_stats_interval;
228            let handle = tokio::spawn(async move {
229                run_live_stats(stats_for_live, interval, stop_rx).await;
230            });
231            Some((stop_tx, handle))
232        } else {
233            None
234        };
235        #[cfg(not(feature = "live-stats"))]
236        let mut live_stats_task: Option<((), ())> = None;
237
238        #[cfg(feature = "checkpoint")]
239        {
240            if let (Some(path), Some(interval)) =
241                (&checkpoint_config.path, checkpoint_config.interval)
242            {
243                let scheduler_cp = Arc::clone(&ctx.scheduler);
244                let pipelines_cp = Arc::clone(&ctx.pipelines);
245                let path_cp = path.clone();
246
247                #[cfg(feature = "cookie-store")]
248                let cookie_store_cp = self.cookie_store.clone();
249
250                #[cfg(not(feature = "cookie-store"))]
251                let _cookie_store_cp = ();
252
253                info!(
254                    "Starting periodic checkpoint task with interval: {:?}",
255                    interval
256                );
257                tokio::spawn(async move {
258                    let mut interval_timer = tokio::time::interval(interval);
259                    interval_timer.tick().await;
260                    loop {
261                        tokio::select! {
262                            _ = interval_timer.tick() => {
263                                trace!("Checkpoint timer ticked, creating snapshot");
264                                if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
265                                    debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
266
267                                    #[cfg(feature = "cookie-store")]
268                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
269
270                                    #[cfg(not(feature = "cookie-store"))]
271                                    let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
272
273                                    if let Err(e) = save_result {
274                                        error!("Periodic checkpoint save failed: {}", e);
275                                    } else {
276                                        debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
277                                    }
278                                } else {
279                                    warn!("Failed to create scheduler snapshot for checkpoint");
280                                }
281                            }
282                        }
283                    }
284                });
285            }
286        }
287
288        let outcome = tokio::select! {
289            _ = tokio::signal::ctrl_c() => {
290                info!("Ctrl-C received, initiating graceful shutdown.");
291                if let Err(e) = scheduler.shutdown().await {
292                    error!("Error during scheduler shutdown: {}", e);
293                } else {
294                    info!("Scheduler shutdown initiated successfully");
295                }
296                RunOutcome::Interrupted
297            }
298            _ = async {
299                loop {
300                    if state.item_limit_reached.load(std::sync::atomic::Ordering::SeqCst) {
301                        break;
302                    }
303                    if scheduler.is_idle() && state.is_idle() {
304                        tokio::time::sleep(Duration::from_millis(25)).await;
305                        if state.item_limit_reached.load(std::sync::atomic::Ordering::SeqCst) {
306                            break;
307                        }
308                        if scheduler.is_idle() && state.is_idle() {
309                            break;
310                        }
311                    }
312                    tokio::time::sleep(Duration::from_millis(25)).await;
313                }
314            } => {
315                if state.item_limit_reached.load(std::sync::atomic::Ordering::SeqCst) {
316                    info!("Item limit reached, initiating fast shutdown.");
317                    RunOutcome::ItemLimitReached
318                } else {
319                    info!("Crawl has become idle, initiating shutdown.");
320                    RunOutcome::Idle
321                }
322            }
323        };
324
325        trace!("Closing communication channels");
326        drop(res_tx);
327        drop(item_tx);
328        backlog_task.abort();
329
330        if matches!(outcome, RunOutcome::Idle)
331            && !scheduler
332                .is_shutting_down
333                .load(std::sync::atomic::Ordering::SeqCst)
334        {
335            if let Err(e) = scheduler.shutdown().await {
336                error!("Error during scheduler shutdown: {}", e);
337            } else {
338                info!("Scheduler shutdown initiated successfully");
339            }
340        }
341
342        let mut results = Vec::new();
343
344        if matches!(outcome, RunOutcome::ItemLimitReached) {
345            downloader_handle.abort();
346            parser_handle.abort();
347            init_task.abort();
348
349            for handle in [downloader_handle, parser_handle, init_task] {
350                match handle.await {
351                    Ok(_) => {}
352                    Err(join_err) if join_err.is_cancelled() => {}
353                    Err(join_err) => error!("Task failed during fast shutdown: {}", join_err),
354                }
355            }
356
357            let grace_period = config.shutdown_grace_period;
358            match tokio::time::timeout(grace_period, processor_handle).await {
359                Ok(Ok(())) => {}
360                Ok(Err(join_err)) if join_err.is_cancelled() => {}
361                Ok(Err(join_err)) => {
362                    error!("Item processor failed during fast shutdown: {}", join_err)
363                }
364                Err(_) => {
365                    warn!(
366                        "Item processor did not finish within shutdown grace period ({}s) after item limit; aborting it.",
367                        grace_period.as_secs()
368                    );
369                }
370            }
371        } else {
372            let mut tasks = tokio::task::JoinSet::new();
373            tasks.spawn(processor_handle);
374            tasks.spawn(parser_handle);
375            tasks.spawn(downloader_handle);
376            tasks.spawn(init_task);
377            let mut remaining_tasks = 4usize;
378
379            if matches!(outcome, RunOutcome::Interrupted) {
380                let grace_period = config.shutdown_grace_period;
381                let shutdown_deadline = tokio::time::sleep(grace_period);
382                tokio::pin!(shutdown_deadline);
383
384                while remaining_tasks > 0 {
385                    tokio::select! {
386                        result = tasks.join_next() => {
387                            match result {
388                                Some(result) => {
389                                    results.push(result);
390                                    remaining_tasks = remaining_tasks.saturating_sub(1);
391                                }
392                                None => break,
393                            }
394                        }
395                        _ = tokio::signal::ctrl_c() => {
396                            warn!("Second Ctrl-C received, aborting remaining tasks immediately.");
397                            tasks.abort_all();
398                            tokio::time::sleep(Duration::from_millis(25)).await;
399                            break;
400                        }
401                        _ = &mut shutdown_deadline => {
402                            warn!(
403                                "Tasks did not complete within shutdown grace period ({}s), aborting remaining tasks and continuing with shutdown...",
404                                grace_period.as_secs()
405                            );
406                            tasks.abort_all();
407                            tokio::time::sleep(Duration::from_millis(25)).await;
408                            break;
409                        }
410                    }
411                }
412            } else {
413                while let Some(result) = tasks.join_next().await {
414                    results.push(result);
415                }
416                trace!("All tasks completed during shutdown");
417            }
418
419            for result in results {
420                if let Err(e) = result {
421                    error!("Task failed during shutdown: {}", e);
422                } else {
423                    trace!("Task completed successfully during shutdown");
424                }
425            }
426        }
427
428        #[cfg(feature = "live-stats")]
429        if let Some((stop_tx, handle)) = live_stats_task.take() {
430            let _ = stop_tx.send(());
431            let _ = handle.await;
432        }
433        #[cfg(not(feature = "live-stats"))]
434        let _ = live_stats_task.take();
435
436        #[cfg(feature = "checkpoint")]
437        {
438            if let Some(path) = &checkpoint_config.path {
439                debug!("Creating final checkpoint at {:?}", path);
440                let scheduler_checkpoint = scheduler.snapshot().await?;
441
442                #[cfg(feature = "cookie-store")]
443                let result = save_checkpoint::<S>(
444                    path,
445                    scheduler_checkpoint,
446                    &pipelines,
447                    &self.cookie_store,
448                )
449                .await;
450
451                #[cfg(not(feature = "cookie-store"))]
452                let result =
453                    save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
454
455                if let Err(e) = result {
456                    error!("Final checkpoint save failed: {}", e);
457                } else {
458                    info!("Final checkpoint saved successfully to {:?}", path);
459                }
460            }
461        }
462
463        info!("Closing item pipelines...");
464        let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
465        join_all(futures).await;
466        info!("All item pipelines closed");
467
468        if state
469            .item_limit_reached
470            .load(std::sync::atomic::Ordering::SeqCst)
471        {
472            let skipped_requests = state
473                .shutdown_skipped_requests
474                .load(std::sync::atomic::Ordering::Acquire);
475            let dropped_items = state
476                .shutdown_dropped_items
477                .load(std::sync::atomic::Ordering::Acquire);
478            let skipped_visited_marks = state
479                .shutdown_skipped_visited_marks
480                .load(std::sync::atomic::Ordering::Acquire);
481
482            if skipped_requests > 0 || dropped_items > 0 || skipped_visited_marks > 0 {
483                info!(
484                    "Item-limit shutdown summary: skipped {} follow-up requests, dropped {} scraped items, skipped {} visited-mark updates while draining in-flight work.",
485                    skipped_requests, dropped_items, skipped_visited_marks,
486                );
487            }
488        }
489
490        if config.live_stats {
491            println!("{}\n", stats.to_live_report_string());
492        } else {
493            info!("Crawl finished successfully\n{}", stats);
494        }
495        Ok(())
496    }
497
498    /// Returns a shared handle to crawler runtime statistics.
499    pub fn stats(&self) -> Arc<StatCollector> {
500        Arc::clone(&self.stats)
501    }
502
503    /// Returns a reference to the spider state.
504    pub fn state(&self) -> &S::State {
505        &self.spider_state
506    }
507
508    /// Returns an Arc clone of the spider state.
509    pub fn state_arc(&self) -> Arc<S::State> {
510        Arc::clone(&self.spider_state)
511    }
512}
513
514fn spawn_init_task<S, I>(ctx: CrawlerContext<S, I>) -> tokio::task::JoinHandle<()>
515where
516    S: Spider<Item = I> + 'static,
517    I: ScrapedItem,
518{
519    tokio::spawn(async move {
520        let mut enqueued = 0usize;
521        let mut skipped = 0usize;
522        let mut failed = 0usize;
523        match ctx.spider.start_requests() {
524            Ok(source) => match source.into_iter() {
525                Ok(requests) => {
526                    for req_res in requests {
527                        let mut req = match req_res {
528                            Ok(req) => req,
529                            Err(e) => {
530                                warn!("Skipping invalid start URL entry: {}", e);
531                                skipped += 1;
532                                continue;
533                            }
534                        };
535
536                        req.url.set_fragment(None);
537                        match ctx.scheduler.enqueue_request(req).await {
538                            Ok(_) => {
539                                ctx.stats.increment_requests_enqueued();
540                                enqueued += 1;
541                            }
542                            Err(e) => {
543                                error!("Failed to enqueue initial request: {}", e);
544                                failed += 1;
545                            }
546                        }
547                    }
548                    info!(
549                        "Initial request bootstrap finished: {} enqueued, {} skipped, {} failed",
550                        enqueued, skipped, failed
551                    );
552                }
553                Err(e) => error!("Failed to resolve start request source: {}", e),
554            },
555            Err(e) => error!("Failed to create start request source: {}", e),
556        }
557    })
558}
/// In-place terminal renderer for the periodic live statistics report.
#[cfg(feature = "live-stats")]
struct LiveStatsRenderer {
    // Lines drawn by the previous frame; used to move the cursor back up and
    // clear stale output before drawing the next frame.
    previous_lines: Vec<String>,
}
563
#[cfg(feature = "live-stats")]
impl LiveStatsRenderer {
    /// Hides the cursor and emits one blank line so the first render has a
    /// row to overwrite. Terminal-command errors are deliberately ignored:
    /// live stats are best-effort output.
    fn new() -> Self {
        let mut out = std::io::stdout();
        let _ = execute!(out, Hide);
        let _ = writeln!(out);
        let _ = out.flush();
        Self {
            previous_lines: Vec::new(),
        }
    }

    /// Redraws `content` over the previously rendered frame: moves the cursor
    /// up over the old frame, then clears and rewrites every line up to the
    /// taller of the two frames so shrinking output leaves no residue.
    ///
    /// NOTE(review): when the new frame is shorter than the old one, the
    /// cursor ends on the last *cleared* row rather than the last *written*
    /// row, while `previous_lines` records only the new frame's height —
    /// verify the next `MoveUp` still lands on the frame's top line.
    fn render(&mut self, content: &str) {
        let mut out = std::io::stdout();
        let terminal_width = Self::terminal_width();
        let next_lines: Vec<String> = content
            .lines()
            .map(|line| Self::fit_line(line, terminal_width))
            .collect();
        let previous_len = self.previous_lines.len();
        let next_len = next_lines.len();
        let max_len = previous_len.max(next_len);

        // Move back to the first line of the previous frame (the cursor sits
        // on its last line, hence `previous_len - 1`).
        if previous_len > 1 {
            let _ = queue!(out, MoveUp((previous_len - 1) as u16));
        }
        let _ = queue!(out, MoveToColumn(0));

        for line_idx in 0..max_len {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));

            // Rows beyond the new frame are cleared but left empty.
            if let Some(line) = next_lines.get(line_idx) {
                let _ = write!(out, "{}", line);
            }

            if line_idx + 1 < max_len {
                let _ = writeln!(out);
            }
        }

        let _ = out.flush();
        self.previous_lines = next_lines;
    }

    /// Current terminal width in columns; `usize::MAX` acts as the
    /// "unknown width, don't truncate" sentinel when `size()` fails.
    fn terminal_width() -> usize {
        size()
            .map(|(width, _)| usize::from(width.max(1)))
            .unwrap_or(usize::MAX)
    }

    /// Fits one report line to the terminal: the "current  : " line (which
    /// carries a URL) gets an ellipsis, everything else is hard-truncated.
    fn fit_line(line: &str, width: usize) -> String {
        let safe_width = Self::safe_terminal_width(width);

        if line.starts_with("current  : ") {
            return Self::truncate_with_ellipsis(line, safe_width);
        }

        Self::truncate_to_width(line, safe_width)
    }

    /// Reserves one column so writing never touches the terminal's last cell
    /// (which can trigger an unwanted line wrap). The `usize::MAX` sentinel
    /// passes through unchanged.
    fn safe_terminal_width(width: usize) -> usize {
        if width == usize::MAX {
            return width;
        }

        width.saturating_sub(1)
    }

    /// Truncates `line` so its display width (per `UnicodeWidthChar`) does
    /// not exceed `width`, never splitting inside a wide character.
    ///
    /// NOTE(review): `width == 0` returns the line untouched — presumably a
    /// deliberate "no usable width, don't bother" escape hatch; confirm.
    fn truncate_to_width(line: &str, width: usize) -> String {
        if width == usize::MAX || width == 0 {
            return line.to_owned();
        }

        let mut visible_width = 0;
        let mut truncated = String::new();

        for ch in line.chars() {
            // Zero-width characters (combining marks, etc.) cost nothing.
            let ch_width = ch.width().unwrap_or(0);
            if visible_width + ch_width > width {
                break;
            }

            truncated.push(ch);
            visible_width += ch_width;
        }

        truncated
    }

    /// Like `truncate_to_width` but appends "..." when the line is cut;
    /// widths of 3 or fewer columns degrade to all dots.
    fn truncate_with_ellipsis(line: &str, width: usize) -> String {
        if width == usize::MAX || width == 0 {
            return line.to_owned();
        }

        if Self::display_width(line) <= width {
            return line.to_owned();
        }

        if width <= 3 {
            return ".".repeat(width);
        }

        // Leave 3 columns for the ellipsis itself.
        let visible = Self::truncate_to_width(line, width - 3);
        format!("{visible}...")
    }

    /// Total display width of `line` in terminal columns.
    fn display_width(line: &str) -> usize {
        line.chars().map(|ch| ch.width().unwrap_or(0)).sum()
    }

    /// Erases the rendered frame and restores the cursor; consumes the
    /// renderer since the terminal state is reset for normal output.
    fn finish(self) {
        let mut out = std::io::stdout();
        self.clear_previous(&mut out);
        let _ = execute!(out, MoveToColumn(0), Clear(ClearType::CurrentLine), Show);
        let _ = out.flush();
    }

    /// Clears every line of the previous frame, leaving the cursor back on
    /// the frame's first line.
    fn clear_previous(&self, out: &mut std::io::Stdout) {
        if self.previous_lines.is_empty() {
            return;
        }
        let lines = self.previous_lines.len();
        let _ = queue!(out, MoveToColumn(0));
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
        for line_idx in 0..lines {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));
            if line_idx + 1 < lines {
                let _ = writeln!(out);
            }
        }
        // Clearing walked the cursor down; return to the top line again.
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
    }
}
701
/// Drives the live statistics display: re-renders the report every `interval`
/// until a stop signal arrives, then restores the terminal.
#[cfg(feature = "live-stats")]
async fn run_live_stats(
    stats: Arc<StatCollector>,
    interval: Duration,
    mut stop_rx: oneshot::Receiver<()>,
) {
    let mut renderer = LiveStatsRenderer::new();
    // Skip (rather than burst) ticks missed while a render was in progress.
    let mut ticker = tokio::time::interval(interval);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = &mut stop_rx => break,
            _ = ticker.tick() => renderer.render(&stats.to_live_report_string()),
        }
    }

    renderer.finish();
}