1use crate::Downloader;
4use crate::config::CrawlerConfig;
5use crate::engine::CrawlerContext;
6use crate::scheduler::Scheduler;
7use crate::spider::Spider;
8use crate::state::CrawlerState;
9use crate::stats::StatCollector;
10use anyhow::Result;
11#[cfg(feature = "live-stats")]
12use crossterm::{
13 cursor::{Hide, MoveToColumn, MoveUp, Show},
14 execute, queue,
15 terminal::{Clear, ClearType, size},
16};
17use futures_util::future::join_all;
18use kanal::{AsyncReceiver, bounded_async};
19use log::{debug, error, info, trace, warn};
20use spider_middleware::middleware::Middleware;
21use spider_pipeline::pipeline::Pipeline;
22use spider_util::error::SpiderError;
23use spider_util::item::ScrapedItem;
24use spider_util::request::Request;
25#[cfg(feature = "live-stats")]
26use unicode_width::UnicodeWidthChar;
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use crate::config::CheckpointConfig;
32
33#[cfg(feature = "live-stats")]
34use std::io::{IsTerminal, Write};
35use std::sync::Arc;
36use std::time::Duration;
37
38#[cfg(feature = "cookie-store")]
39use tokio::sync::RwLock;
40#[cfg(feature = "live-stats")]
41use tokio::sync::oneshot;
42#[cfg(feature = "live-stats")]
43use tokio::time::MissedTickBehavior;
44
45#[cfg(feature = "cookie-store")]
46use cookie_store::CookieStore;
47
/// Orchestrates a crawl: owns the scheduler, downloader, middlewares,
/// spider, and item pipelines, and drives them to completion.
///
/// `S` is the spider implementation; `C` is the HTTP client type shared by
/// the downloader and the middlewares.
pub struct Crawler<S: Spider, C> {
    // Request frontier shared with the worker tasks.
    scheduler: Arc<Scheduler>,
    // Receiving end of the request channel consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    // Shared crawl counters/statistics.
    stats: Arc<StatCollector>,
    // Fetches responses; trait object so different client backends fit.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    // Applied around each download, in order.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    spider: Arc<S>,
    // Spider-defined shared state, created via `S::State::default()` in `new`.
    spider_state: Arc<S::State>,
    // Item pipelines run for every scraped item.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    config: CrawlerConfig,
    #[cfg(feature = "checkpoint")]
    checkpoint_config: CheckpointConfig,
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
64
65impl<S, C> Crawler<S, C>
66where
67 S: Spider + 'static,
68 S::Item: ScrapedItem,
69 C: Send + Sync + Clone + 'static,
70{
71 #[allow(clippy::too_many_arguments)]
72 pub(crate) fn new(
73 scheduler: Arc<Scheduler>,
74 req_rx: AsyncReceiver<Request>,
75 downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
76 middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
77 spider: S,
78 pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
79 config: CrawlerConfig,
80 #[cfg(feature = "checkpoint")] checkpoint_config: CheckpointConfig,
81 stats: Arc<StatCollector>,
82 #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
83 ) -> Self {
84 Crawler {
85 scheduler,
86 req_rx,
87 stats,
88 downloader,
89 middlewares,
90 spider: Arc::new(spider),
91 spider_state: Arc::new(S::State::default()),
92 pipelines,
93 config,
94 #[cfg(feature = "checkpoint")]
95 checkpoint_config,
96 #[cfg(feature = "cookie-store")]
97 cookie_store,
98 }
99 }
100
101 pub async fn start_crawl(self) -> Result<(), SpiderError> {
102 info!(
103 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}, item_limit={:?}",
104 self.config.max_concurrent_downloads,
105 self.config.parser_workers,
106 self.config.max_concurrent_pipelines,
107 self.config.item_limit
108 );
109
110 let Crawler {
111 scheduler,
112 req_rx,
113 stats,
114 downloader,
115 middlewares,
116 spider,
117 spider_state,
118 pipelines,
119 config,
120 #[cfg(feature = "checkpoint")]
121 checkpoint_config,
122 #[cfg(feature = "cookie-store")]
123 cookie_store: _,
124 } = self;
125
126 let state = CrawlerState::new();
127 let pipelines = Arc::new(pipelines);
128
129 let ctx = CrawlerContext::new(
131 Arc::clone(&scheduler),
132 Arc::clone(&stats),
133 Arc::clone(&spider),
134 Arc::clone(&spider_state),
135 Arc::clone(&pipelines),
136 );
137
138 let channel_capacity = std::cmp::max(
139 config.max_concurrent_downloads * 3,
140 config.parser_workers * config.max_concurrent_pipelines * 2,
141 )
142 .max(config.channel_capacity);
143
144 trace!(
145 "Creating communication channels with capacity: {}",
146 channel_capacity
147 );
148 let (res_tx, res_rx) = bounded_async(channel_capacity);
149 let (item_tx, item_rx) = bounded_async(channel_capacity);
150
151 info!("Starting initial request bootstrap task");
152 let init_task = spawn_init_task(ctx.clone());
153
154 debug!("Initializing middleware manager");
155 let middlewares = super::SharedMiddlewareManager::new(middlewares);
156
157 info!("Starting downloader task");
158 let downloader_handle = super::spawn_downloader_task::<S, C>(
159 Arc::clone(&ctx.scheduler),
160 req_rx,
161 downloader,
162 middlewares,
163 state.clone(),
164 res_tx.clone(),
165 config.max_concurrent_downloads,
166 config.response_backpressure_threshold.max(1),
167 config.retry_release_permit,
168 Arc::clone(&ctx.stats),
169 );
170
171 info!("Starting parser task");
172 let parser_handle = super::spawn_parser_task::<S>(
173 Arc::clone(&ctx.scheduler),
174 Arc::clone(&ctx.spider),
175 Arc::clone(&ctx.spider_state),
176 state.clone(),
177 res_rx,
178 item_tx.clone(),
179 config.parser_workers,
180 config.output_batch_size.max(1),
181 config.item_backpressure_threshold.max(1),
182 config.item_limit,
183 Arc::clone(&ctx.stats),
184 );
185
186 info!("Starting item processor task");
187 let processor_handle = super::spawn_item_processor_task::<S>(
188 state.clone(),
189 item_rx,
190 Arc::clone(&ctx.pipelines),
191 config.max_concurrent_pipelines,
192 Arc::clone(&ctx.stats),
193 );
194
195 #[cfg(feature = "live-stats")]
196 let mut live_stats_task: Option<(
197 oneshot::Sender<()>,
198 tokio::task::JoinHandle<()>,
199 )> = if config.live_stats && std::io::stdout().is_terminal() {
200 let (stop_tx, stop_rx) = oneshot::channel();
201 let stats_for_live = Arc::clone(&ctx.stats);
202 let interval = config.live_stats_interval;
203 let handle = tokio::spawn(async move {
204 run_live_stats(stats_for_live, interval, stop_rx).await;
205 });
206 Some((stop_tx, handle))
207 } else {
208 None
209 };
210 #[cfg(not(feature = "live-stats"))]
211 let mut live_stats_task: Option<((), ())> = None;
212
213 #[cfg(feature = "checkpoint")]
214 {
215 if let (Some(path), Some(interval)) =
216 (&checkpoint_config.path, checkpoint_config.interval)
217 {
218 let scheduler_cp = Arc::clone(&ctx.scheduler);
219 let pipelines_cp = Arc::clone(&ctx.pipelines);
220 let path_cp = path.clone();
221
222 #[cfg(feature = "cookie-store")]
223 let cookie_store_cp = self.cookie_store.clone();
224
225 #[cfg(not(feature = "cookie-store"))]
226 let _cookie_store_cp = ();
227
228 info!(
229 "Starting periodic checkpoint task with interval: {:?}",
230 interval
231 );
232 tokio::spawn(async move {
233 let mut interval_timer = tokio::time::interval(interval);
234 interval_timer.tick().await;
235 loop {
236 tokio::select! {
237 _ = interval_timer.tick() => {
238 trace!("Checkpoint timer ticked, creating snapshot");
239 if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
240 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
241
242 #[cfg(feature = "cookie-store")]
243 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
244
245 #[cfg(not(feature = "cookie-store"))]
246 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
247
248 if let Err(e) = save_result {
249 error!("Periodic checkpoint save failed: {}", e);
250 } else {
251 debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
252 }
253 } else {
254 warn!("Failed to create scheduler snapshot for checkpoint");
255 }
256 }
257 }
258 }
259 });
260 }
261 }
262
263 let interrupted = tokio::select! {
264 _ = tokio::signal::ctrl_c() => {
265 info!("Ctrl-C received, initiating graceful shutdown.");
266 if let Err(e) = scheduler.shutdown().await {
267 error!("Error during scheduler shutdown: {}", e);
268 } else {
269 info!("Scheduler shutdown initiated successfully");
270 }
271 true
272 }
273 _ = async {
274 loop {
275 if scheduler.is_idle() && state.is_idle() {
276 tokio::time::sleep(Duration::from_millis(25)).await;
277 if scheduler.is_idle() && state.is_idle() {
278 break;
279 }
280 }
281 tokio::time::sleep(Duration::from_millis(25)).await;
282 }
283 } => {
284 info!("Crawl has become idle, initiating shutdown.");
285 false
286 }
287 };
288
289 trace!("Closing communication channels");
290 drop(res_tx);
291 drop(item_tx);
292
293 if !interrupted {
294 if let Err(e) = scheduler.shutdown().await {
295 error!("Error during scheduler shutdown: {}", e);
296 } else {
297 info!("Scheduler shutdown initiated successfully");
298 }
299 }
300
301 let mut tasks = tokio::task::JoinSet::new();
302 tasks.spawn(processor_handle);
303 tasks.spawn(parser_handle);
304 tasks.spawn(downloader_handle);
305 tasks.spawn(init_task);
306 let mut results = Vec::new();
307 let mut remaining_tasks = 4usize;
308
309 if interrupted {
310 let grace_period = config.shutdown_grace_period;
311 let shutdown_deadline = tokio::time::sleep(grace_period);
312 tokio::pin!(shutdown_deadline);
313
314 while remaining_tasks > 0 {
315 tokio::select! {
316 result = tasks.join_next() => {
317 match result {
318 Some(result) => {
319 results.push(result);
320 remaining_tasks = remaining_tasks.saturating_sub(1);
321 }
322 None => break,
323 }
324 }
325 _ = tokio::signal::ctrl_c() => {
326 warn!("Second Ctrl-C received, aborting remaining tasks immediately.");
327 tasks.abort_all();
328 tokio::time::sleep(Duration::from_millis(25)).await;
329 break;
330 }
331 _ = &mut shutdown_deadline => {
332 warn!(
333 "Tasks did not complete within shutdown grace period ({}s), aborting remaining tasks and continuing with shutdown...",
334 grace_period.as_secs()
335 );
336 tasks.abort_all();
337 tokio::time::sleep(Duration::from_millis(25)).await;
338 break;
339 }
340 }
341 }
342 } else {
343 while let Some(result) = tasks.join_next().await {
344 results.push(result);
345 }
346 trace!("All tasks completed during shutdown");
347 }
348
349 for result in results {
350 if let Err(e) = result {
351 error!("Task failed during shutdown: {}", e);
352 } else {
353 trace!("Task completed successfully during shutdown");
354 }
355 }
356
357 #[cfg(feature = "live-stats")]
358 if let Some((stop_tx, handle)) = live_stats_task.take() {
359 let _ = stop_tx.send(());
360 let _ = handle.await;
361 }
362 #[cfg(not(feature = "live-stats"))]
363 let _ = live_stats_task.take();
364
365 #[cfg(feature = "checkpoint")]
366 {
367 if let Some(path) = &checkpoint_config.path {
368 debug!("Creating final checkpoint at {:?}", path);
369 let scheduler_checkpoint = scheduler.snapshot().await?;
370
371 #[cfg(feature = "cookie-store")]
372 let result = save_checkpoint::<S>(
373 path,
374 scheduler_checkpoint,
375 &pipelines,
376 &self.cookie_store,
377 )
378 .await;
379
380 #[cfg(not(feature = "cookie-store"))]
381 let result =
382 save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
383
384 if let Err(e) = result {
385 error!("Final checkpoint save failed: {}", e);
386 } else {
387 info!("Final checkpoint saved successfully to {:?}", path);
388 }
389 }
390 }
391
392 info!("Closing item pipelines...");
393 let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
394 join_all(futures).await;
395 info!("All item pipelines closed");
396
397 if config.live_stats {
398 println!("{}\n", stats.to_live_report_string());
399 } else {
400 info!("Crawl finished successfully\n{}", stats);
401 }
402 Ok(())
403 }
404
405 pub fn stats(&self) -> Arc<StatCollector> {
407 Arc::clone(&self.stats)
408 }
409
410 pub fn state(&self) -> &S::State {
412 &self.spider_state
413 }
414
415 pub fn state_arc(&self) -> Arc<S::State> {
417 Arc::clone(&self.spider_state)
418 }
419}
420
421fn spawn_init_task<S, I>(ctx: CrawlerContext<S, I>) -> tokio::task::JoinHandle<()>
422where
423 S: Spider<Item = I> + 'static,
424 I: ScrapedItem,
425{
426 tokio::spawn(async move {
427 let mut enqueued = 0usize;
428 let mut skipped = 0usize;
429 let mut failed = 0usize;
430 match ctx.spider.start_requests() {
431 Ok(source) => match source.into_iter() {
432 Ok(requests) => {
433 for req_res in requests {
434 let mut req = match req_res {
435 Ok(req) => req,
436 Err(e) => {
437 warn!("Skipping invalid start URL entry: {}", e);
438 skipped += 1;
439 continue;
440 }
441 };
442
443 req.url.set_fragment(None);
444 match ctx.scheduler.enqueue_request(req).await {
445 Ok(_) => {
446 ctx.stats.increment_requests_enqueued();
447 enqueued += 1;
448 }
449 Err(e) => {
450 error!("Failed to enqueue initial request: {}", e);
451 failed += 1;
452 }
453 }
454 }
455 info!(
456 "Initial request bootstrap finished: {} enqueued, {} skipped, {} failed",
457 enqueued, skipped, failed
458 );
459 }
460 Err(e) => error!("Failed to resolve start request source: {}", e),
461 },
462 Err(e) => error!("Failed to create start request source: {}", e),
463 }
464 })
465}
/// In-place terminal renderer for the periodic live statistics display.
#[cfg(feature = "live-stats")]
struct LiveStatsRenderer {
    // Lines drawn by the previous `render` call, already fitted to the
    // terminal width; used to know how far to move the cursor back up.
    previous_lines: Vec<String>,
}
470
471#[cfg(feature = "live-stats")]
472impl LiveStatsRenderer {
473 fn new() -> Self {
474 let mut out = std::io::stdout();
475 let _ = execute!(out, Hide);
476 let _ = writeln!(out);
477 let _ = out.flush();
478 Self {
479 previous_lines: Vec::new(),
480 }
481 }
482
    /// Redraws the multi-line stats block in place.
    ///
    /// Moves the cursor back to the first previously drawn row, then clears
    /// and rewrites each row; rows left over from a taller previous frame
    /// are cleared as well.
    fn render(&mut self, content: &str) {
        let mut out = std::io::stdout();
        let terminal_width = Self::terminal_width();
        // Fit every line to the terminal width up front so the cursor math
        // below is based on exactly what gets printed.
        let next_lines: Vec<String> = content
            .lines()
            .map(|line| Self::fit_line(line, terminal_width))
            .collect();
        let previous_len = self.previous_lines.len();
        let next_len = next_lines.len();
        let max_len = previous_len.max(next_len);

        // The cursor sits on the last drawn row; step back up to the first.
        if previous_len > 1 {
            let _ = queue!(out, MoveUp((previous_len - 1) as u16));
        }
        let _ = queue!(out, MoveToColumn(0));

        for line_idx in 0..max_len {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));

            if let Some(line) = next_lines.get(line_idx) {
                let _ = write!(out, "{}", line);
            }

            // No newline after the final row: this keeps the cursor on the
            // last drawn line for the next `render` call.
            if line_idx + 1 < max_len {
                let _ = writeln!(out);
            }
        }

        let _ = out.flush();
        self.previous_lines = next_lines;
    }
514
515 fn terminal_width() -> usize {
516 size()
517 .map(|(width, _)| usize::from(width.max(1)))
518 .unwrap_or(usize::MAX)
519 }
520
521 fn fit_line(line: &str, width: usize) -> String {
522 let safe_width = Self::safe_terminal_width(width);
523
524 if line.starts_with("current : ") {
525 return Self::truncate_with_ellipsis(line, safe_width);
526 }
527
528 Self::truncate_to_width(line, safe_width)
529 }
530
531 fn safe_terminal_width(width: usize) -> usize {
532 if width == usize::MAX {
533 return width;
534 }
535
536 width.saturating_sub(1)
537 }
538
539 fn truncate_to_width(line: &str, width: usize) -> String {
540 if width == usize::MAX || width == 0 {
541 return line.to_owned();
542 }
543
544 let mut visible_width = 0;
545 let mut truncated = String::new();
546
547 for ch in line.chars() {
548 let ch_width = ch.width().unwrap_or(0);
549 if visible_width + ch_width > width {
550 break;
551 }
552
553 truncated.push(ch);
554 visible_width += ch_width;
555 }
556
557 truncated
558 }
559
560 fn truncate_with_ellipsis(line: &str, width: usize) -> String {
561 if width == usize::MAX || width == 0 {
562 return line.to_owned();
563 }
564
565 if Self::display_width(line) <= width {
566 return line.to_owned();
567 }
568
569 if width <= 3 {
570 return ".".repeat(width);
571 }
572
573 let visible = Self::truncate_to_width(line, width - 3);
574 format!("{visible}...")
575 }
576
577 fn display_width(line: &str) -> usize {
578 line.chars().map(|ch| ch.width().unwrap_or(0)).sum()
579 }
580
581 fn finish(self) {
582 let mut out = std::io::stdout();
583 self.clear_previous(&mut out);
584 let _ = execute!(out, MoveToColumn(0), Clear(ClearType::CurrentLine), Show);
585 let _ = out.flush();
586 }
587
    /// Erases the block drawn by the previous `render` call.
    ///
    /// Walks up to the first drawn row, clears every row, and finally moves
    /// back up so the cursor ends on the first (now blank) row.
    fn clear_previous(&self, out: &mut std::io::Stdout) {
        if self.previous_lines.is_empty() {
            return;
        }
        let lines = self.previous_lines.len();
        let _ = queue!(out, MoveToColumn(0));
        // The cursor sits on the last drawn row; step up to the first one.
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
        for line_idx in 0..lines {
            let _ = queue!(out, MoveToColumn(0), Clear(ClearType::CurrentLine));
            // No newline after the final row, so the cursor stays on it.
            if line_idx + 1 < lines {
                let _ = writeln!(out);
            }
        }
        // Clearing advanced the cursor back down; return to the top row.
        if lines > 1 {
            let _ = queue!(out, MoveUp((lines - 1) as u16));
        }
    }
607}
608
/// Periodically redraws the live statistics block until `stop_rx` fires,
/// then tears the display down.
#[cfg(feature = "live-stats")]
async fn run_live_stats(
    stats: Arc<StatCollector>,
    interval: Duration,
    mut stop_rx: oneshot::Receiver<()>,
) {
    let mut renderer = LiveStatsRenderer::new();
    // Skip missed ticks instead of bursting to catch up after a stall.
    let mut ticker = tokio::time::interval(interval);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);

    loop {
        tokio::select! {
            _ = &mut stop_rx => break,
            _ = ticker.tick() => {
                renderer.render(&stats.to_live_report_string());
            }
        }
    }

    renderer.finish();
}