1use crate::Downloader;
14use crate::scheduler::Scheduler;
15use crate::spider::Spider;
16use crate::state::CrawlerState;
17use crate::stats::StatCollector;
18use anyhow::Result;
19use futures_util::future::join_all;
20use kanal::{AsyncReceiver, bounded_async};
21use spider_middleware::middleware::Middleware;
22use spider_pipeline::pipeline::Pipeline;
23use spider_util::error::SpiderError;
24use spider_util::item::ScrapedItem;
25use spider_util::request::Request;
26use log::{debug, error, info, trace, warn};
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use std::path::PathBuf;
32
33use std::sync::Arc;
34use std::time::Duration;
35
36#[cfg(feature = "cookie-store")]
37use tokio::sync::RwLock;
38
39#[cfg(feature = "cookie-store")]
40use cookie_store::CookieStore;
41
/// Orchestrates a crawl: owns the scheduler, downloader, middlewares,
/// spider and item pipelines, plus the concurrency configuration that
/// `start_crawl` uses to size its worker tasks and channels.
///
/// `S` is the user's spider implementation; `C` is the HTTP client type
/// shared by the downloader and its middlewares.
pub struct Crawler<S: Spider, C> {
    // Request queue / dedup front-end shared by all worker tasks.
    scheduler: Arc<Scheduler>,
    // Receiving half of the request channel fed by the scheduler.
    req_rx: AsyncReceiver<Request>,
    // Crawl-wide counters (requests enqueued, items scraped, ...).
    stats: Arc<StatCollector>,
    // Fetches responses for scheduled requests.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    // Request/response middlewares, applied in order around the downloader.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    // The user-provided spider; wrapped in Arc so parser workers can share it.
    spider: Arc<S>,
    // Spider-specific mutable state, initialized from `S::State::default()`.
    spider_state: Arc<S::State>,
    // Pipelines each scraped item is pushed through.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Upper bound on simultaneous in-flight downloads.
    max_concurrent_downloads: usize,
    // Number of parser worker tasks.
    parser_workers: usize,
    // Upper bound on items processed by pipelines concurrently.
    max_concurrent_pipelines: usize,
    // Minimum capacity for the internal response/item channels; may be
    // raised adaptively in `start_crawl`.
    channel_capacity: usize,
    // Where to persist periodic/final checkpoints (None disables them).
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    // How often to write periodic checkpoints (None disables the timer).
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
    // Shared cookie jar, persisted alongside checkpoints when enabled.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
63
64impl<S, C> Crawler<S, C>
65where
66 S: Spider + 'static,
67 S::Item: ScrapedItem,
68 C: Send + Sync + Clone + 'static,
69{
70 #[allow(clippy::too_many_arguments)]
72 pub(crate) fn new(
73 scheduler: Arc<Scheduler>,
74 req_rx: AsyncReceiver<Request>,
75 downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
76 middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
77 spider: S,
78 item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
79 max_concurrent_downloads: usize,
80 parser_workers: usize,
81 max_concurrent_pipelines: usize,
82 channel_capacity: usize,
83 #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
84 #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
85 stats: Arc<StatCollector>,
86 #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
87 ) -> Self {
88 Crawler {
89 scheduler,
90 req_rx,
91 stats,
92 downloader,
93 middlewares,
94 spider: Arc::new(spider),
95 spider_state: Arc::new(S::State::default()),
96 item_pipelines,
97 max_concurrent_downloads,
98 parser_workers,
99 max_concurrent_pipelines,
100 channel_capacity,
101 #[cfg(feature = "checkpoint")]
102 checkpoint_path,
103 #[cfg(feature = "checkpoint")]
104 checkpoint_interval,
105 #[cfg(feature = "cookie-store")]
106 cookie_store,
107 }
108 }
109
110 pub async fn start_crawl(self) -> Result<(), SpiderError> {
112 info!(
113 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
114 self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
115 );
116
117 #[cfg(feature = "checkpoint")]
119 let Crawler {
120 scheduler,
121 req_rx,
122 stats,
123 downloader,
124 middlewares,
125 spider,
126 spider_state,
127 item_pipelines,
128 max_concurrent_downloads,
129 parser_workers,
130 max_concurrent_pipelines,
131 channel_capacity: _,
132 checkpoint_path,
133 checkpoint_interval,
134 #[cfg(feature = "cookie-store")]
135 cookie_store: _,
136 } = self;
137
138 #[cfg(not(feature = "checkpoint"))]
139 let Crawler {
140 scheduler,
141 req_rx,
142 stats,
143 downloader,
144 middlewares,
145 spider,
146 spider_state,
147 item_pipelines,
148 max_concurrent_downloads,
149 parser_workers,
150 max_concurrent_pipelines,
151 channel_capacity: _,
152 #[cfg(feature = "cookie-store")]
153 cookie_store: _,
154 } = self;
155
156 let state = CrawlerState::new();
157 let pipelines = Arc::new(item_pipelines);
158
159 let adaptive_channel_capacity = std::cmp::max(
160 self.max_concurrent_downloads * 3,
161 self.parser_workers * self.max_concurrent_pipelines * 2,
162 )
163 .max(self.channel_capacity);
164
165 trace!(
166 "Creating communication channels with capacity: {}",
167 adaptive_channel_capacity
168 );
169 let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
170 let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
171
172 trace!("Spawning initial requests task");
173 let initial_requests_task =
174 spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
175
176 trace!("Initializing middleware manager");
177 let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
178
179 trace!("Spawning downloader task");
180 let downloader_task = super::spawn_downloader_task::<S, C>(
181 scheduler.clone(),
182 req_rx,
183 downloader,
184 middlewares_manager,
185 state.clone(),
186 res_tx.clone(),
187 max_concurrent_downloads,
188 stats.clone(),
189 );
190
191 trace!("Spawning parser task");
192 let parser_task = super::spawn_parser_task::<S>(
193 scheduler.clone(),
194 spider.clone(),
195 spider_state.clone(),
196 state.clone(),
197 res_rx,
198 item_tx.clone(),
199 parser_workers,
200 stats.clone(),
201 );
202
203 trace!("Spawning item processor task");
204 let item_processor_task = super::spawn_item_processor_task::<S>(
205 state.clone(),
206 item_rx,
207 pipelines.clone(),
208 max_concurrent_pipelines,
209 stats.clone(),
210 );
211
212 #[cfg(feature = "checkpoint")]
213 {
214 if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
215 let scheduler_clone = scheduler.clone();
216 let pipelines_clone = pipelines.clone();
217 let path_clone = path.clone();
218
219 #[cfg(feature = "cookie-store")]
220 let cookie_store_clone = self.cookie_store.clone();
221
222 #[cfg(not(feature = "cookie-store"))]
223 let _cookie_store_clone = ();
224
225 trace!(
226 "Starting periodic checkpoint task with interval: {:?}",
227 interval
228 );
229 tokio::spawn(async move {
230 let mut interval_timer = tokio::time::interval(interval);
231 interval_timer.tick().await;
232 loop {
233 tokio::select! {
234 _ = interval_timer.tick() => {
235 trace!("Checkpoint timer ticked, creating snapshot");
236 if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
237 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
238
239 #[cfg(feature = "cookie-store")]
240 let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
241
242 #[cfg(not(feature = "cookie-store"))]
243 let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &()).await;
244
245 if let Err(e) = save_result {
246 error!("Periodic checkpoint save failed: {}", e);
247 } else {
248 debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
249 }
250 } else {
251 warn!("Failed to create scheduler snapshot for checkpoint");
252 }
253 }
254 }
255 }
256 });
257 }
258 }
259
260 tokio::select! {
261 _ = tokio::signal::ctrl_c() => {
262 info!("Ctrl-C received, initiating graceful shutdown.");
263 }
264 _ = async {
265 loop {
266 if scheduler.is_idle() && state.is_idle() {
267 tokio::time::sleep(Duration::from_millis(50)).await;
268 if scheduler.is_idle() && state.is_idle() {
269 break;
270 }
271 }
272 tokio::time::sleep(Duration::from_millis(100)).await;
273 }
274 } => {
275 info!("Crawl has become idle, initiating shutdown.");
276 }
277 };
278
279 trace!("Closing communication channels");
280 drop(res_tx);
281 drop(item_tx);
282
283 if let Err(e) = scheduler.shutdown().await {
284 error!("Error during scheduler shutdown: {}", e);
285 } else {
286 debug!("Scheduler shutdown initiated successfully");
287 }
288
289 let timeout_duration = Duration::from_secs(30);
290
291 let mut task_set = tokio::task::JoinSet::new();
292 task_set.spawn(item_processor_task);
293 task_set.spawn(parser_task);
294 task_set.spawn(downloader_task);
295 task_set.spawn(initial_requests_task);
296
297 let remaining_results = tokio::time::timeout(timeout_duration, async {
298 let mut results = Vec::new();
299 while let Some(result) = task_set.join_next().await {
300 results.push(result);
301 }
302 results
303 })
304 .await;
305
306 let task_results = match remaining_results {
307 Ok(results) => {
308 trace!("All tasks completed during shutdown");
309 results
310 }
311 Err(_) => {
312 warn!(
313 "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
314 timeout_duration.as_secs()
315 );
316 task_set.abort_all();
317
318 tokio::time::sleep(Duration::from_millis(100)).await;
319
320 Vec::new()
321 }
322 };
323
324 for result in task_results {
325 if let Err(e) = result {
326 error!("Task failed during shutdown: {}", e);
327 } else {
328 trace!("Task completed successfully during shutdown");
329 }
330 }
331
332 #[cfg(feature = "checkpoint")]
333 {
334 if let Some(path) = &checkpoint_path {
335 debug!("Creating final checkpoint at {:?}", path);
336 let scheduler_checkpoint = scheduler.snapshot().await?;
337
338 #[cfg(feature = "cookie-store")]
339 let result = save_checkpoint::<S>(
340 path,
341 scheduler_checkpoint,
342 &pipelines,
343 &self.cookie_store,
344 )
345 .await;
346
347 #[cfg(not(feature = "cookie-store"))]
348 let result =
349 save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
350
351 if let Err(e) = result {
352 error!("Final checkpoint save failed: {}", e);
353 } else {
354 info!("Final checkpoint saved successfully to {:?}", path);
355 }
356 }
357 }
358
359 info!("Closing item pipelines...");
360 let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
361 join_all(closing_futures).await;
362 debug!("All item pipelines closed");
363
364 info!(
365 "Crawl finished successfully\n{}", stats
366 );
367 Ok(())
368 }
369
370 pub fn get_stats(&self) -> Arc<StatCollector> {
374 Arc::clone(&self.stats)
375 }
376}
377
378fn spawn_initial_requests_task<S>(
379 scheduler: Arc<Scheduler>,
380 spider: Arc<S>,
381 stats: Arc<StatCollector>,
382) -> tokio::task::JoinHandle<()>
383where
384 S: Spider + 'static,
385 S::Item: ScrapedItem,
386{
387 tokio::spawn(async move {
388 match spider.start_requests() {
389 Ok(requests) => {
390 for mut req in requests {
391 req.url.set_fragment(None);
392 match scheduler.enqueue_request(req).await {
393 Ok(_) => {
394 stats.increment_requests_enqueued();
395 }
396 Err(e) => {
397 error!("Failed to enqueue initial request: {}", e);
398 }
399 }
400 }
401 }
402 Err(e) => error!("Failed to create start requests: {}", e),
403 }
404 })
405}