1use crate::Downloader;
14use crate::scheduler::Scheduler;
15use crate::spider::Spider;
16use crate::state::CrawlerState;
17use crate::stats::StatCollector;
18use anyhow::Result;
19use futures_util::future::join_all;
20use kanal::{AsyncReceiver, bounded_async};
21use spider_middleware::middleware::Middleware;
22use spider_pipeline::pipeline::Pipeline;
23use spider_util::error::SpiderError;
24use spider_util::item::ScrapedItem;
25use spider_util::request::Request;
26use tracing::{debug, error, info, trace, warn};
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use std::path::PathBuf;
32
33use std::sync::{Arc, atomic::Ordering};
34use std::time::Duration;
35use tokio::sync::Mutex;
36
37#[cfg(feature = "cookie-store")]
38use tokio::sync::RwLock;
39
40#[cfg(feature = "cookie-store")]
41use cookie_store::CookieStore;
42
/// Core crawl engine: owns the scheduler, downloader, middleware and
/// pipeline chains, plus the user spider, and drives the
/// download → parse → pipeline flow until the crawl is idle.
///
/// `S` is the user-provided spider; `C` is the HTTP client type used by
/// the downloader and the middleware chain.
pub struct Crawler<S: Spider, C> {
    /// Shared request scheduler (queueing/dispatch lives here).
    scheduler: Arc<Scheduler>,
    /// Receiving half of the request channel fed by the scheduler.
    req_rx: AsyncReceiver<Request>,
    /// Shared crawl statistics (atomic counters).
    stats: Arc<StatCollector>,
    /// Downloader implementation used to fetch each request.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    /// Middleware chain applied around every download.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    /// The spider, behind an async mutex so worker tasks can share it.
    spider: Arc<Mutex<S>>,
    /// Pipelines that every scraped item is pushed through.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Upper bound on in-flight downloads.
    max_concurrent_downloads: usize,
    /// Number of parser worker tasks.
    parser_workers: usize,
    /// Upper bound on items processed through pipelines concurrently.
    max_concurrent_pipelines: usize,
    /// User-configured floor for internal channel capacity; the crawl may
    /// raise it adaptively based on the concurrency settings above.
    channel_capacity: usize,
    /// Destination for periodic/final checkpoints, when enabled.
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    /// Period of the checkpoint timer task; `None` disables periodic saves.
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
    /// Shared cookie jar; persisted with checkpoints when both features are on.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
63
64impl<S, C> Crawler<S, C>
65where
66 S: Spider + 'static,
67 S::Item: ScrapedItem,
68 C: Send + Sync + Clone + 'static,
69{
70 #[allow(clippy::too_many_arguments)]
72 pub(crate) fn new(
73 scheduler: Arc<Scheduler>,
74 req_rx: AsyncReceiver<Request>,
75 downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
76 middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
77 spider: S,
78 item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
79 max_concurrent_downloads: usize,
80 parser_workers: usize,
81 max_concurrent_pipelines: usize,
82 channel_capacity: usize,
83 #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
84 #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
85 stats: Arc<StatCollector>,
86 #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
87 ) -> Self {
88 Crawler {
89 scheduler,
90 req_rx,
91 stats,
92 downloader,
93 middlewares,
94 spider: Arc::new(Mutex::new(spider)),
95 item_pipelines,
96 max_concurrent_downloads,
97 parser_workers,
98 max_concurrent_pipelines,
99 channel_capacity,
100 #[cfg(feature = "checkpoint")]
101 checkpoint_path,
102 #[cfg(feature = "checkpoint")]
103 checkpoint_interval,
104 #[cfg(feature = "cookie-store")]
105 cookie_store,
106 }
107 }
108
109 pub async fn start_crawl(self) -> Result<(), SpiderError> {
111 info!(
112 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
113 self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
114 );
115
116 #[cfg(feature = "checkpoint")]
118 let Crawler {
119 scheduler,
120 req_rx,
121 stats,
122 downloader,
123 middlewares,
124 spider,
125 item_pipelines,
126 max_concurrent_downloads,
127 parser_workers,
128 max_concurrent_pipelines,
129 channel_capacity: _,
130 checkpoint_path,
131 checkpoint_interval,
132 #[cfg(feature = "cookie-store")]
133 cookie_store: _,
134 } = self;
135
136 #[cfg(not(feature = "checkpoint"))]
137 let Crawler {
138 scheduler,
139 req_rx,
140 stats,
141 downloader,
142 middlewares,
143 spider,
144 item_pipelines,
145 max_concurrent_downloads,
146 parser_workers,
147 max_concurrent_pipelines,
148 channel_capacity: _,
149 #[cfg(feature = "cookie-store")]
150 cookie_store: _,
151 } = self;
152
153 let state = CrawlerState::new();
154 let pipelines = Arc::new(item_pipelines);
155
156 let adaptive_channel_capacity = std::cmp::max(
157 self.max_concurrent_downloads * 3,
158 self.parser_workers * self.max_concurrent_pipelines * 2,
159 )
160 .max(self.channel_capacity);
161
162 trace!(
163 "Creating communication channels with capacity: {}",
164 adaptive_channel_capacity
165 );
166 let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
167 let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
168
169 trace!("Spawning initial requests task");
170 let initial_requests_task =
171 spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
172
173 trace!("Initializing middleware manager");
174 let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
175
176 trace!("Spawning downloader task");
177 let downloader_task = super::spawn_downloader_task::<S, C>(
178 scheduler.clone(),
179 req_rx,
180 downloader,
181 middlewares_manager,
182 state.clone(),
183 res_tx.clone(),
184 max_concurrent_downloads,
185 stats.clone(),
186 );
187
188 trace!("Spawning parser task");
189 let parser_task = super::spawn_parser_task::<S>(
190 scheduler.clone(),
191 spider.clone(),
192 state.clone(),
193 res_rx,
194 item_tx.clone(),
195 parser_workers,
196 stats.clone(),
197 );
198
199 trace!("Spawning item processor task");
200 let item_processor_task = super::spawn_item_processor_task::<S>(
201 state.clone(),
202 item_rx,
203 pipelines.clone(),
204 max_concurrent_pipelines,
205 stats.clone(),
206 );
207
208 #[cfg(feature = "checkpoint")]
209 {
210 if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
211 let scheduler_clone = scheduler.clone();
212 let pipelines_clone = pipelines.clone();
213 let path_clone = path.clone();
214
215 #[cfg(feature = "cookie-store")]
216 let cookie_store_clone = self.cookie_store.clone();
217
218 #[cfg(not(feature = "cookie-store"))]
219 let _cookie_store_clone = ();
220
221 trace!(
222 "Starting periodic checkpoint task with interval: {:?}",
223 interval
224 );
225 tokio::spawn(async move {
226 let mut interval_timer = tokio::time::interval(interval);
227 interval_timer.tick().await;
228 loop {
229 tokio::select! {
230 _ = interval_timer.tick() => {
231 trace!("Checkpoint timer ticked, creating snapshot");
232 if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
233 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
234
235 #[cfg(feature = "cookie-store")]
236 let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
237
238 #[cfg(not(feature = "cookie-store"))]
239 let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &()).await;
240
241 if let Err(e) = save_result {
242 error!("Periodic checkpoint save failed: {}", e);
243 } else {
244 debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
245 }
246 } else {
247 warn!("Failed to create scheduler snapshot for checkpoint");
248 }
249 }
250 }
251 }
252 });
253 }
254 }
255
256 tokio::select! {
257 _ = tokio::signal::ctrl_c() => {
258 info!("Ctrl-C received, initiating graceful shutdown.");
259 }
260 _ = async {
261 loop {
262 if scheduler.is_idle() && state.is_idle() {
263 tokio::time::sleep(Duration::from_millis(50)).await;
264 if scheduler.is_idle() && state.is_idle() {
265 break;
266 }
267 }
268 tokio::time::sleep(Duration::from_millis(100)).await;
269 }
270 } => {
271 info!("Crawl has become idle, initiating shutdown.");
272 }
273 };
274
275 trace!("Closing communication channels");
276 drop(res_tx);
277 drop(item_tx);
278
279 if let Err(e) = scheduler.shutdown().await {
280 error!("Error during scheduler shutdown: {}", e);
281 } else {
282 debug!("Scheduler shutdown initiated successfully");
283 }
284
285 let timeout_duration = Duration::from_secs(30);
286
287 let mut task_set = tokio::task::JoinSet::new();
288 task_set.spawn(item_processor_task);
289 task_set.spawn(parser_task);
290 task_set.spawn(downloader_task);
291 task_set.spawn(initial_requests_task);
292
293 let remaining_results = tokio::time::timeout(timeout_duration, async {
294 let mut results = Vec::new();
295 while let Some(result) = task_set.join_next().await {
296 results.push(result);
297 }
298 results
299 })
300 .await;
301
302 let task_results = match remaining_results {
303 Ok(results) => {
304 trace!("All tasks completed during shutdown");
305 results
306 }
307 Err(_) => {
308 warn!(
309 "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
310 timeout_duration.as_secs()
311 );
312 task_set.abort_all();
313
314 tokio::time::sleep(Duration::from_millis(100)).await;
315
316 Vec::new()
317 }
318 };
319
320 for result in task_results {
321 if let Err(e) = result {
322 error!("Task failed during shutdown: {}", e);
323 } else {
324 trace!("Task completed successfully during shutdown");
325 }
326 }
327
328 #[cfg(feature = "checkpoint")]
329 {
330 if let Some(path) = &checkpoint_path {
331 debug!("Creating final checkpoint at {:?}", path);
332 let scheduler_checkpoint = scheduler.snapshot().await?;
333
334 #[cfg(feature = "cookie-store")]
335 let result = save_checkpoint::<S>(
336 path,
337 scheduler_checkpoint,
338 &pipelines,
339 &self.cookie_store,
340 )
341 .await;
342
343 #[cfg(not(feature = "cookie-store"))]
344 let result =
345 save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
346
347 if let Err(e) = result {
348 error!("Final checkpoint save failed: {}", e);
349 } else {
350 info!("Final checkpoint saved successfully to {:?}", path);
351 }
352 }
353 }
354
355 info!("Closing item pipelines...");
356 let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
357 join_all(closing_futures).await;
358 debug!("All item pipelines closed");
359
360 info!(
361 "Crawl finished successfully. Stats: requests_enqueued={}, requests_succeeded={}, items_scraped={}",
362 stats.requests_enqueued.load(Ordering::SeqCst),
363 stats.requests_succeeded.load(Ordering::SeqCst),
364 stats.items_scraped.load(Ordering::SeqCst)
365 );
366 Ok(())
367 }
368
369 pub fn get_stats(&self) -> Arc<StatCollector> {
373 Arc::clone(&self.stats)
374 }
375}
376
377fn spawn_initial_requests_task<S>(
378 scheduler: Arc<Scheduler>,
379 spider: Arc<Mutex<S>>,
380 stats: Arc<StatCollector>,
381) -> tokio::task::JoinHandle<()>
382where
383 S: Spider + 'static,
384 S::Item: ScrapedItem,
385{
386 tokio::spawn(async move {
387 match spider.lock().await.start_requests() {
388 Ok(requests) => {
389 for mut req in requests {
390 req.url.set_fragment(None);
391 match scheduler.enqueue_request(req).await {
392 Ok(_) => {
393 stats.increment_requests_enqueued();
394 }
395 Err(e) => {
396 error!("Failed to enqueue initial request: {}", e);
397 }
398 }
399 }
400 }
401 Err(e) => error!("Failed to create start requests: {}", e),
402 }
403 })
404}