1use crate::Downloader;
4use crate::config::CrawlerConfig;
5use crate::engine::CrawlerContext;
6use crate::scheduler::Scheduler;
7use crate::spider::Spider;
8use crate::state::CrawlerState;
9use crate::stats::StatCollector;
10use anyhow::Result;
11#[cfg(feature = "live-stats")]
12use crossterm::{
13 cursor::{Hide, MoveToColumn, MoveUp, Show},
14 execute, queue,
15 terminal::{Clear, ClearType, size},
16};
17use futures_util::future::join_all;
18use kanal::{AsyncReceiver, bounded_async};
19use log::{debug, error, info, trace, warn};
20use spider_middleware::middleware::Middleware;
21use spider_pipeline::pipeline::Pipeline;
22use spider_util::error::SpiderError;
23use spider_util::item::ScrapedItem;
24use spider_util::request::Request;
25#[cfg(feature = "live-stats")]
26use unicode_width::UnicodeWidthChar;
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use crate::config::CheckpointConfig;
32
33#[cfg(feature = "live-stats")]
34use std::io::{IsTerminal, Write};
35use std::sync::Arc;
36use std::time::Duration;
37
38#[cfg(feature = "cookie-store")]
39use tokio::sync::RwLock;
40#[cfg(feature = "live-stats")]
41use tokio::sync::oneshot;
42#[cfg(feature = "live-stats")]
43use tokio::time::MissedTickBehavior;
44
45#[cfg(feature = "cookie-store")]
46use cookie_store::CookieStore;
47
/// Why the main crawl loop stopped; selects the shutdown strategy in
/// `start_crawl` (graceful drain, fast abort, or plain join).
enum RunOutcome {
    /// Ctrl-C was received; remaining tasks are drained under a grace period.
    Interrupted,
    /// Scheduler and in-flight work both went quiet; normal completion.
    Idle,
    /// The configured item limit was hit; producer tasks are aborted and
    /// only the item processor is given time to finish.
    ItemLimitReached,
}
53
/// Orchestrates a crawl: owns the scheduler, downloader, middlewares,
/// spider, and pipelines, and drives them to completion via `start_crawl`.
pub struct Crawler<S: Spider, C> {
    /// Request scheduler shared with the worker tasks; its idle/shutdown
    /// state gates the main run loop.
    scheduler: Arc<Scheduler>,
    /// Receiving end of the request channel, consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    /// Shared crawl statistics collector.
    stats: Arc<StatCollector>,
    /// Downloader implementation used to fetch requests.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    /// Middlewares, later wrapped in a `SharedMiddlewareManager` and handed
    /// to the downloader task.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    /// The user-provided spider.
    spider: Arc<S>,
    /// Spider state, created via `S::State::default()` and shared with tasks.
    spider_state: Arc<S::State>,
    /// Item pipelines; closed after the crawl finishes.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    config: CrawlerConfig,
    /// Checkpoint path/interval settings (periodic and final snapshots).
    #[cfg(feature = "checkpoint")]
    checkpoint_config: CheckpointConfig,
    /// Shared cookie store, persisted alongside checkpoints.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
70
71impl<S, C> Crawler<S, C>
72where
73 S: Spider + 'static,
74 S::Item: ScrapedItem,
75 C: Send + Sync + Clone + 'static,
76{
77 #[allow(clippy::too_many_arguments)]
78 pub(crate) fn new(
79 scheduler: Arc<Scheduler>,
80 req_rx: AsyncReceiver<Request>,
81 downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
82 middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
83 spider: S,
84 pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
85 config: CrawlerConfig,
86 #[cfg(feature = "checkpoint")] checkpoint_config: CheckpointConfig,
87 stats: Arc<StatCollector>,
88 #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
89 ) -> Self {
90 Crawler {
91 scheduler,
92 req_rx,
93 stats,
94 downloader,
95 middlewares,
96 spider: Arc::new(spider),
97 spider_state: Arc::new(S::State::default()),
98 pipelines,
99 config,
100 #[cfg(feature = "checkpoint")]
101 checkpoint_config,
102 #[cfg(feature = "cookie-store")]
103 cookie_store,
104 }
105 }
106
107 pub async fn start_crawl(self) -> Result<(), SpiderError> {
108 info!(
109 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}, item_limit={:?}",
110 self.config.max_concurrent_downloads,
111 self.config.parser_workers,
112 self.config.max_concurrent_pipelines,
113 self.config.item_limit
114 );
115
116 let Crawler {
117 scheduler,
118 req_rx,
119 stats,
120 downloader,
121 middlewares,
122 spider,
123 spider_state,
124 pipelines,
125 config,
126 #[cfg(feature = "checkpoint")]
127 checkpoint_config,
128 #[cfg(feature = "cookie-store")]
129 cookie_store: _,
130 } = self;
131
132 let state = CrawlerState::new();
133 let pipelines = Arc::new(pipelines);
134
135 let ctx = CrawlerContext::new(
137 Arc::clone(&scheduler),
138 Arc::clone(&stats),
139 Arc::clone(&spider),
140 Arc::clone(&spider_state),
141 Arc::clone(&pipelines),
142 );
143
144 let channel_capacity = std::cmp::max(
145 config.max_concurrent_downloads * 3,
146 config.parser_workers * config.max_concurrent_pipelines * 2,
147 )
148 .max(config.channel_capacity);
149
150 trace!(
151 "Creating communication channels with capacity: {}",
152 channel_capacity
153 );
154 let (res_tx, res_rx) = bounded_async(channel_capacity);
155 let (item_tx, item_rx) = bounded_async(channel_capacity);
156
157 info!("Starting initial request bootstrap task");
158 let init_task = spawn_init_task(ctx.clone());
159
160 debug!("Initializing middleware manager");
161 let middlewares = super::SharedMiddlewareManager::new(middlewares);
162
163 info!("Starting downloader task");
164 let downloader_handle = super::spawn_downloader_task::<S, C>(
165 Arc::clone(&ctx.scheduler),
166 req_rx,
167 downloader,
168 middlewares,
169 state.clone(),
170 res_tx.clone(),
171 config.max_concurrent_downloads,
172 config.response_backpressure_threshold.max(1),
173 config.retry_release_permit,
174 Arc::clone(&ctx.stats),
175 );
176
177 info!("Starting parser task");
178 let parser_handle = super::spawn_parser_task::<S>(
179 Arc::clone(&ctx.scheduler),
180 Arc::clone(&ctx.spider),
181 Arc::clone(&ctx.spider_state),
182 state.clone(),
183 res_rx,
184 item_tx.clone(),
185 config.parser_workers,
186 config.discovery.clone(),
187 config.output_batch_size.max(1),
188 config.item_backpressure_threshold.max(1),
189 config.item_limit,
190 Arc::clone(&ctx.stats),
191 );
192
193 info!("Starting item processor task");
194 let processor_handle = super::spawn_item_processor_task::<S>(
195 state.clone(),
196 item_rx,
197 Arc::clone(&ctx.pipelines),
198 config.max_concurrent_pipelines,
199 Arc::clone(&ctx.stats),
200 );
201
202 let backlog_stats = Arc::clone(&ctx.stats);
203 let backlog_scheduler = Arc::clone(&ctx.scheduler);
204 let backlog_state = state.clone();
205 let backlog_task = tokio::spawn(async move {
206 loop {
207 backlog_stats.update_runtime_backlog(
208 backlog_scheduler.pending_count(),
209 backlog_state
210 .parsing_responses
211 .load(std::sync::atomic::Ordering::Acquire),
212 backlog_state
213 .processing_items
214 .load(std::sync::atomic::Ordering::Acquire),
215 );
216 tokio::time::sleep(Duration::from_millis(100)).await;
217 }
218 });
219
220 #[cfg(feature = "live-stats")]
221 let mut live_stats_task: Option<(
222 oneshot::Sender<()>,
223 tokio::task::JoinHandle<()>,
224 )> = if config.live_stats && std::io::stdout().is_terminal() {
225 let (stop_tx, stop_rx) = oneshot::channel();
226 let stats_for_live = Arc::clone(&ctx.stats);
227 let interval = config.live_stats_interval;
228 let handle = tokio::spawn(async move {
229 run_live_stats(stats_for_live, interval, stop_rx).await;
230 });
231 Some((stop_tx, handle))
232 } else {
233 None
234 };
235 #[cfg(not(feature = "live-stats"))]
236 let mut live_stats_task: Option<((), ())> = None;
237
238 #[cfg(feature = "checkpoint")]
239 {
240 if let (Some(path), Some(interval)) =
241 (&checkpoint_config.path, checkpoint_config.interval)
242 {
243 let scheduler_cp = Arc::clone(&ctx.scheduler);
244 let pipelines_cp = Arc::clone(&ctx.pipelines);
245 let path_cp = path.clone();
246
247 #[cfg(feature = "cookie-store")]
248 let cookie_store_cp = self.cookie_store.clone();
249
250 #[cfg(not(feature = "cookie-store"))]
251 let _cookie_store_cp = ();
252
253 info!(
254 "Starting periodic checkpoint task with interval: {:?}",
255 interval
256 );
257 tokio::spawn(async move {
258 let mut interval_timer = tokio::time::interval(interval);
259 interval_timer.tick().await;
260 loop {
261 tokio::select! {
262 _ = interval_timer.tick() => {
263 trace!("Checkpoint timer ticked, creating snapshot");
264 if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
265 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
266
267 #[cfg(feature = "cookie-store")]
268 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
269
270 #[cfg(not(feature = "cookie-store"))]
271 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
272
273 if let Err(e) = save_result {
274 error!("Periodic checkpoint save failed: {}", e);
275 } else {
276 debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
277 }
278 } else {
279 warn!("Failed to create scheduler snapshot for checkpoint");
280 }
281 }
282 }
283 }
284 });
285 }
286 }
287
288 let outcome = tokio::select! {
289 _ = tokio::signal::ctrl_c() => {
290 info!("Ctrl-C received, initiating graceful shutdown.");
291 if let Err(e) = scheduler.shutdown().await {
292 error!("Error during scheduler shutdown: {}", e);
293 } else {
294 info!("Scheduler shutdown initiated successfully");
295 }
296 RunOutcome::Interrupted
297 }
298 _ = async {
299 loop {
300 if state.item_limit_reached.load(std::sync::atomic::Ordering::SeqCst) {
301 break;
302 }
303 if scheduler.is_idle() && state.is_idle() {
304 tokio::time::sleep(Duration::from_millis(25)).await;
305 if state.item_limit_reached.load(std::sync::atomic::Ordering::SeqCst) {
306 break;
307 }
308 if scheduler.is_idle() && state.is_idle() {
309 break;
310 }
311 }
312 tokio::time::sleep(Duration::from_millis(25)).await;
313 }
314 } => {
315 if state.item_limit_reached.load(std::sync::atomic::Ordering::SeqCst) {
316 info!("Item limit reached, initiating fast shutdown.");
317 RunOutcome::ItemLimitReached
318 } else {
319 info!("Crawl has become idle, initiating shutdown.");
320 RunOutcome::Idle
321 }
322 }
323 };
324
325 trace!("Closing communication channels");
326 drop(res_tx);
327 drop(item_tx);
328 backlog_task.abort();
329
330 if matches!(outcome, RunOutcome::Idle)
331 && !scheduler
332 .is_shutting_down
333 .load(std::sync::atomic::Ordering::SeqCst)
334 {
335 if let Err(e) = scheduler.shutdown().await {
336 error!("Error during scheduler shutdown: {}", e);
337 } else {
338 info!("Scheduler shutdown initiated successfully");
339 }
340 }
341
342 let mut results = Vec::new();
343
344 if matches!(outcome, RunOutcome::ItemLimitReached) {
345 downloader_handle.abort();
346 parser_handle.abort();
347 init_task.abort();
348
349 for handle in [downloader_handle, parser_handle, init_task] {
350 match handle.await {
351 Ok(_) => {}
352 Err(join_err) if join_err.is_cancelled() => {}
353 Err(join_err) => error!("Task failed during fast shutdown: {}", join_err),
354 }
355 }
356
357 let grace_period = config.shutdown_grace_period;
358 match tokio::time::timeout(grace_period, processor_handle).await {
359 Ok(Ok(())) => {}
360 Ok(Err(join_err)) if join_err.is_cancelled() => {}
361 Ok(Err(join_err)) => {
362 error!("Item processor failed during fast shutdown: {}", join_err)
363 }
364 Err(_) => {
365 warn!(
366 "Item processor did not finish within shutdown grace period ({}s) after item limit; aborting it.",
367 grace_period.as_secs()
368 );
369 }
370 }
371 } else {
372 let mut tasks = tokio::task::JoinSet::new();
373 tasks.spawn(processor_handle);
374 tasks.spawn(parser_handle);
375 tasks.spawn(downloader_handle);
376 tasks.spawn(init_task);
377 let mut remaining_tasks = 4usize;
378
379 if matches!(outcome, RunOutcome::Interrupted) {
380 let grace_period = config.shutdown_grace_period;
381 let shutdown_deadline = tokio::time::sleep(grace_period);
382 tokio::pin!(shutdown_deadline);
383
384 while remaining_tasks > 0 {
385 tokio::select! {
386 result = tasks.join_next() => {
387 match result {
388 Some(result) => {
389 results.push(result);
390 remaining_tasks = remaining_tasks.saturating_sub(1);
391 }
392 None => break,
393 }
394 }
395 _ = tokio::signal::ctrl_c() => {
396 warn!("Second Ctrl-C received, aborting remaining tasks immediately.");
397 tasks.abort_all();
398 tokio::time::sleep(Duration::from_millis(25)).await;
399 break;
400 }
401 _ = &mut shutdown_deadline => {
402 warn!(
403 "Tasks did not complete within shutdown grace period ({}s), aborting remaining tasks and continuing with shutdown...",
404 grace_period.as_secs()
405 );
406 tasks.abort_all();
407 tokio::time::sleep(Duration::from_millis(25)).await;
408 break;
409 }
410 }
411 }
412 } else {
413 while let Some(result) = tasks.join_next().await {
414 results.push(result);
415 }
416 trace!("All tasks completed during shutdown");
417 }
418
419 for result in results {
420 if let Err(e) = result {
421 error!("Task failed during shutdown: {}", e);
422 } else {
423 trace!("Task completed successfully during shutdown");
424 }
425 }
426 }
427
428 #[cfg(feature = "live-stats")]
429 if let Some((stop_tx, handle)) = live_stats_task.take() {
430 let _ = stop_tx.send(());
431 let _ = handle.await;
432 }
433 #[cfg(not(feature = "live-stats"))]
434 let _ = live_stats_task.take();
435
436 #[cfg(feature = "checkpoint")]
437 {
438 if let Some(path) = &checkpoint_config.path {
439 debug!("Creating final checkpoint at {:?}", path);
440 let scheduler_checkpoint = scheduler.snapshot().await?;
441
442 #[cfg(feature = "cookie-store")]
443 let result = save_checkpoint::<S>(
444 path,
445 scheduler_checkpoint,
446 &pipelines,
447 &self.cookie_store,
448 )
449 .await;
450
451 #[cfg(not(feature = "cookie-store"))]
452 let result =
453 save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
454
455 if let Err(e) = result {
456 error!("Final checkpoint save failed: {}", e);
457 } else {
458 info!("Final checkpoint saved successfully to {:?}", path);
459 }
460 }
461 }
462
463 info!("Closing item pipelines...");
464 let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
465 join_all(futures).await;
466 info!("All item pipelines closed");
467
468 if state
469 .item_limit_reached
470 .load(std::sync::atomic::Ordering::SeqCst)
471 {
472 let skipped_requests = state
473 .shutdown_skipped_requests
474 .load(std::sync::atomic::Ordering::Acquire);
475 let dropped_items = state
476 .shutdown_dropped_items
477 .load(std::sync::atomic::Ordering::Acquire);
478 let skipped_visited_marks = state
479 .shutdown_skipped_visited_marks
480 .load(std::sync::atomic::Ordering::Acquire);
481
482 if skipped_requests > 0 || dropped_items > 0 || skipped_visited_marks > 0 {
483 info!(
484 "Item-limit shutdown summary: skipped {} follow-up requests, dropped {} scraped items, skipped {} visited-mark updates while draining in-flight work.",
485 skipped_requests, dropped_items, skipped_visited_marks,
486 );
487 }
488 }
489
490 if config.live_stats {
491 println!("{}\n", stats.to_live_report_string());
492 } else {
493 info!("Crawl finished successfully\n{}", stats);
494 }
495 Ok(())
496 }
497
498 pub fn stats(&self) -> Arc<StatCollector> {
500 Arc::clone(&self.stats)
501 }
502
503 pub fn state(&self) -> &S::State {
505 &self.spider_state
506 }
507
508 pub fn state_arc(&self) -> Arc<S::State> {
510 Arc::clone(&self.spider_state)
511 }
512}
513
514fn spawn_init_task<S, I>(ctx: CrawlerContext<S, I>) -> tokio::task::JoinHandle<()>
515where
516 S: Spider<Item = I> + 'static,
517 I: ScrapedItem,
518{
519 tokio::spawn(async move {
520 let mut enqueued = 0usize;
521 let mut skipped = 0usize;
522 let mut failed = 0usize;
523 match ctx.spider.start_requests() {
524 Ok(source) => match source.into_iter() {
525 Ok(requests) => {
526 for req_res in requests {
527 let mut req = match req_res {
528 Ok(req) => req,
529 Err(e) => {
530 warn!("Skipping invalid start URL entry: {}", e);
531 skipped += 1;
532 continue;
533 }
534 };
535
536 req.url.set_fragment(None);
537 match ctx.scheduler.enqueue_request(req).await {
538 Ok(_) => {
539 ctx.stats.increment_requests_enqueued();
540 enqueued += 1;
541 }
542 Err(e) => {
543 error!("Failed to enqueue initial request: {}", e);
544 failed += 1;
545 }
546 }
547 }
548 info!(
549 "Initial request bootstrap finished: {} enqueued, {} skipped, {} failed",
550 enqueued, skipped, failed
551 );
552 }
553 Err(e) => error!("Failed to resolve start request source: {}", e),
554 },
555 Err(e) => error!("Failed to create start request source: {}", e),
556 }
557 })
558}
#[cfg(feature = "live-stats")]
/// Redraws the live stats report in place over the previous frame.
struct LiveStatsRenderer {
    /// Lines of the last rendered frame (already width-truncated); used to
    /// know how far to move the cursor up and how many lines to clear.
    previous_lines: Vec<String>,
}
563
#[cfg(feature = "live-stats")]
impl LiveStatsRenderer {
    /// Hides the cursor and emits one blank line so the first `render` has a
    /// line to draw over.
    fn new() -> Self {
        let mut out = std::io::stdout();
        let _ = execute!(out, Hide);
        let _ = writeln!(out);
        let _ = out.flush();
        Self {
            previous_lines: Vec::new(),
        }
    }

    /// Redraws `content` over the previous frame in place: moves the cursor
    /// back to the frame's first line, then clears and rewrites each line.
    /// Iterating to `max_len` ensures leftover lines from a taller previous
    /// frame get cleared too.
    fn render(&mut self, content: &str) {
        let mut out = std::io::stdout();
        let terminal_width = Self::terminal_width();
        let next_lines: Vec<String> = content
            .lines()
            .map(|line| Self::fit_line(line, terminal_width))
            .collect();
        let previous_len = self.previous_lines.len();
        let next_len = next_lines.len();
        let max_len = previous_len.max(next_len);

        // Cursor sits on the last line of the previous frame, so moving up
        // (len - 1) rows lands on its first line.
        if previous_len > 1 {
            let _ = queue!(out, MoveUp((previous_len - 1) as u16));
        }
        let _ = queue!(out, MoveToColumn(0));

        for line_idx in 0..max_len {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));

            if let Some(line) = next_lines.get(line_idx) {
                let _ = write!(out, "{}", line);
            }

            // No trailing newline after the last line: the cursor must stay
            // on the frame's final row for the next redraw.
            if line_idx + 1 < max_len {
                let _ = writeln!(out);
            }
        }

        let _ = out.flush();
        self.previous_lines = next_lines;
    }

    /// Current terminal width in columns; `usize::MAX` is the sentinel for
    /// "unknown" (size query failed), which downstream treats as unbounded.
    fn terminal_width() -> usize {
        size()
            .map(|(width, _)| usize::from(width.max(1)))
            .unwrap_or(usize::MAX)
    }

    /// Fits one report line to the terminal: the URL-bearing "current : "
    /// line is truncated with an ellipsis, everything else is hard-cut.
    fn fit_line(line: &str, width: usize) -> String {
        let safe_width = Self::safe_terminal_width(width);

        if line.starts_with("current : ") {
            return Self::truncate_with_ellipsis(line, safe_width);
        }

        Self::truncate_to_width(line, safe_width)
    }

    /// Reserves one column of the reported width — presumably to avoid the
    /// terminal auto-wrapping when the last column is written (TODO confirm).
    /// The `usize::MAX` "unknown width" sentinel is passed through untouched.
    fn safe_terminal_width(width: usize) -> usize {
        if width == usize::MAX {
            return width;
        }

        width.saturating_sub(1)
    }

    /// Cuts `line` at `width` display cells (via `UnicodeWidthChar`, so
    /// wide/zero-width characters count correctly). Both `usize::MAX` and 0
    /// are treated as "don't truncate" and return the line unchanged.
    fn truncate_to_width(line: &str, width: usize) -> String {
        if width == usize::MAX || width == 0 {
            return line.to_owned();
        }

        let mut visible_width = 0;
        let mut truncated = String::new();

        for ch in line.chars() {
            let ch_width = ch.width().unwrap_or(0);
            if visible_width + ch_width > width {
                break;
            }

            truncated.push(ch);
            visible_width += ch_width;
        }

        truncated
    }

    /// Like `truncate_to_width` but appends "..." when the line is cut.
    /// Widths of 3 or fewer cells can't fit content plus the ellipsis, so
    /// they degrade to dots only.
    fn truncate_with_ellipsis(line: &str, width: usize) -> String {
        if width == usize::MAX || width == 0 {
            return line.to_owned();
        }

        if Self::display_width(line) <= width {
            return line.to_owned();
        }

        if width <= 3 {
            return ".".repeat(width);
        }

        let visible = Self::truncate_to_width(line, width - 3);
        format!("{visible}...")
    }

    /// Total display width of `line` in terminal cells.
    fn display_width(line: &str) -> usize {
        line.chars().map(|ch| ch.width().unwrap_or(0)).sum()
    }

    /// Erases the final frame and restores the cursor; called once when the
    /// live-stats loop ends.
    fn finish(self) {
        let mut out = std::io::stdout();
        self.clear_previous(&mut out);
        let _ = execute!(out, MoveToColumn(0), Clear(ClearType::CurrentLine), Show);
        let _ = out.flush();
    }

    /// Clears every line of the previous frame, then moves the cursor back
    /// to the frame's first row so subsequent output starts there.
    fn clear_previous(&self, out: &mut std::io::Stdout) {
        if self.previous_lines.is_empty() {
            return;
        }
        let lines = self.previous_lines.len();
        let _ = queue!(out, MoveToColumn(0));
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
        for line_idx in 0..lines {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));
            if line_idx + 1 < lines {
                let _ = writeln!(out);
            }
        }
        // Writing the clears walked the cursor back down; return to the top.
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
    }
}
701
#[cfg(feature = "live-stats")]
/// Renders the live stats report on a fixed cadence until `stop_rx` fires,
/// then tears the in-place display down.
async fn run_live_stats(
    stats: Arc<StatCollector>,
    interval: Duration,
    mut stop_rx: oneshot::Receiver<()>,
) {
    // Skip (rather than burst) ticks missed while rendering was slow.
    let mut redraw_timer = tokio::time::interval(interval);
    redraw_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    let mut renderer = LiveStatsRenderer::new();

    loop {
        tokio::select! {
            _ = &mut stop_rx => break,
            _ = redraw_timer.tick() => renderer.render(&stats.to_live_report_string()),
        }
    }

    // Clears the stats frame and restores the cursor.
    renderer.finish();
}