1use crate::Downloader;
14use crate::scheduler::Scheduler;
15use crate::spider::Spider;
16use crate::state::CrawlerState;
17use crate::stats::StatCollector;
18use crate::engine::CrawlerContext;
19use anyhow::Result;
20use futures_util::future::join_all;
21use kanal::{AsyncReceiver, bounded_async};
22use spider_middleware::middleware::Middleware;
23use spider_pipeline::pipeline::Pipeline;
24use spider_util::error::SpiderError;
25use spider_util::item::ScrapedItem;
26use spider_util::request::Request;
27use log::{debug, error, info, trace, warn};
28
29#[cfg(feature = "checkpoint")]
30use crate::checkpoint::save_checkpoint;
31#[cfg(feature = "checkpoint")]
32use std::path::PathBuf;
33
34use std::sync::Arc;
35use std::time::Duration;
36
37#[cfg(feature = "cookie-store")]
38use tokio::sync::RwLock;
39
40#[cfg(feature = "cookie-store")]
41use cookie_store::CookieStore;
42
/// Orchestrates a single crawl: owns the scheduler, downloader, spider,
/// middlewares and item pipelines, and wires them together with bounded
/// channels when [`Crawler::start_crawl`] is invoked.
///
/// `S` is the spider implementation; `C` is the HTTP client type shared by
/// the downloader and the middlewares.
pub struct Crawler<S: Spider, C> {
    /// Decides which request is dispatched next; shared with worker tasks.
    scheduler: Arc<Scheduler>,
    /// Receiving end of the request channel fed by the scheduler.
    req_rx: AsyncReceiver<Request>,
    /// Crawl-wide counters (requests enqueued, items scraped, …).
    stats: Arc<StatCollector>,
    /// Performs the actual HTTP fetches.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    /// Request/response middlewares, applied in order around each download.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    /// The user-provided spider; shared with the parser workers.
    spider: Arc<S>,
    /// Spider-defined shared state, default-initialized in `new`.
    spider_state: Arc<S::State>,
    /// Item pipelines run on every scraped item.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    /// Upper bound on simultaneous in-flight downloads.
    max_concurrent_downloads: usize,
    /// Number of parser worker tasks.
    parser_workers: usize,
    /// Upper bound on simultaneous pipeline executions.
    max_concurrent_pipelines: usize,
    /// User-configured lower bound for the internal channel capacities.
    channel_capacity: usize,
    /// Where to write crawl checkpoints, if enabled.
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    /// How often to write periodic checkpoints; `None` disables them.
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
    /// Shared cookie jar, persisted into checkpoints when both features are on.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
64
65impl<S, C> Crawler<S, C>
66where
67 S: Spider + 'static,
68 S::Item: ScrapedItem,
69 C: Send + Sync + Clone + 'static,
70{
71 #[allow(clippy::too_many_arguments)]
72 pub(crate) fn new(
73 scheduler: Arc<Scheduler>,
74 req_rx: AsyncReceiver<Request>,
75 downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
76 middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
77 spider: S,
78 pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
79 max_concurrent_downloads: usize,
80 parser_workers: usize,
81 max_concurrent_pipelines: usize,
82 channel_capacity: usize,
83 #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
84 #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
85 stats: Arc<StatCollector>,
86 #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
87 ) -> Self {
88 Crawler {
89 scheduler,
90 req_rx,
91 stats,
92 downloader,
93 middlewares,
94 spider: Arc::new(spider),
95 spider_state: Arc::new(S::State::default()),
96 pipelines,
97 max_concurrent_downloads,
98 parser_workers,
99 max_concurrent_pipelines,
100 channel_capacity,
101 #[cfg(feature = "checkpoint")]
102 checkpoint_path,
103 #[cfg(feature = "checkpoint")]
104 checkpoint_interval,
105 #[cfg(feature = "cookie-store")]
106 cookie_store,
107 }
108 }
109
110 pub async fn start_crawl(self) -> Result<(), SpiderError> {
111 info!(
112 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
113 self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
114 );
115
116 #[cfg(feature = "checkpoint")]
117 let Crawler {
118 scheduler,
119 req_rx,
120 stats,
121 downloader,
122 middlewares,
123 spider,
124 spider_state,
125 pipelines,
126 max_concurrent_downloads,
127 parser_workers,
128 max_concurrent_pipelines,
129 channel_capacity: _,
130 checkpoint_path,
131 checkpoint_interval,
132 #[cfg(feature = "cookie-store")]
133 cookie_store: _,
134 } = self;
135
136 #[cfg(not(feature = "checkpoint"))]
137 let Crawler {
138 scheduler,
139 req_rx,
140 stats,
141 downloader,
142 middlewares,
143 spider,
144 spider_state,
145 pipelines,
146 max_concurrent_downloads,
147 parser_workers,
148 max_concurrent_pipelines,
149 channel_capacity: _,
150 #[cfg(feature = "cookie-store")]
151 cookie_store: _,
152 } = self;
153
154 let state = CrawlerState::new();
155 let pipelines = Arc::new(pipelines);
156
157 let ctx = CrawlerContext::new(
159 Arc::clone(&scheduler),
160 Arc::clone(&stats),
161 Arc::clone(&spider),
162 Arc::clone(&spider_state),
163 Arc::clone(&pipelines),
164 );
165
166 let channel_capacity = std::cmp::max(
167 self.max_concurrent_downloads * 3,
168 self.parser_workers * self.max_concurrent_pipelines * 2,
169 )
170 .max(self.channel_capacity);
171
172 trace!("Creating communication channels with capacity: {}", channel_capacity);
173 let (res_tx, res_rx) = bounded_async(channel_capacity);
174 let (item_tx, item_rx) = bounded_async(channel_capacity);
175
176 trace!("Spawning initial requests task");
177 let init_task = spawn_init_task(ctx.clone());
178
179 trace!("Initializing middleware manager");
180 let middlewares = super::SharedMiddlewareManager::new(middlewares);
181
182 trace!("Spawning downloader task");
183 let downloader = super::spawn_downloader_task::<S, C>(
184 Arc::clone(&ctx.scheduler),
185 req_rx,
186 downloader,
187 middlewares,
188 state.clone(),
189 res_tx.clone(),
190 max_concurrent_downloads,
191 Arc::clone(&ctx.stats),
192 );
193
194 trace!("Spawning parser task");
195 let parser = super::spawn_parser_task::<S>(
196 Arc::clone(&ctx.scheduler),
197 Arc::clone(&ctx.spider),
198 Arc::clone(&ctx.spider_state),
199 state.clone(),
200 res_rx,
201 item_tx.clone(),
202 parser_workers,
203 Arc::clone(&ctx.stats),
204 );
205
206 trace!("Spawning item processor task");
207 let processor = super::spawn_item_processor_task::<S>(
208 state.clone(),
209 item_rx,
210 Arc::clone(&ctx.pipelines),
211 max_concurrent_pipelines,
212 Arc::clone(&ctx.stats),
213 );
214
215 #[cfg(feature = "checkpoint")]
216 {
217 if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
218 let scheduler_cp = Arc::clone(&ctx.scheduler);
219 let pipelines_cp = Arc::clone(&ctx.pipelines);
220 let path_cp = path.clone();
221
222 #[cfg(feature = "cookie-store")]
223 let cookie_store_cp = self.cookie_store.clone();
224
225 #[cfg(not(feature = "cookie-store"))]
226 let _cookie_store_cp = ();
227
228 trace!("Starting periodic checkpoint task with interval: {:?}", interval);
229 tokio::spawn(async move {
230 let mut interval_timer = tokio::time::interval(interval);
231 interval_timer.tick().await;
232 loop {
233 tokio::select! {
234 _ = interval_timer.tick() => {
235 trace!("Checkpoint timer ticked, creating snapshot");
236 if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
237 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
238
239 #[cfg(feature = "cookie-store")]
240 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
241
242 #[cfg(not(feature = "cookie-store"))]
243 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
244
245 if let Err(e) = save_result {
246 error!("Periodic checkpoint save failed: {}", e);
247 } else {
248 debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
249 }
250 } else {
251 warn!("Failed to create scheduler snapshot for checkpoint");
252 }
253 }
254 }
255 }
256 });
257 }
258 }
259
260 tokio::select! {
261 _ = tokio::signal::ctrl_c() => {
262 info!("Ctrl-C received, initiating graceful shutdown.");
263 }
264 _ = async {
265 loop {
266 if scheduler.is_idle() && state.is_idle() {
267 tokio::time::sleep(Duration::from_millis(50)).await;
268 if scheduler.is_idle() && state.is_idle() {
269 break;
270 }
271 }
272 tokio::time::sleep(Duration::from_millis(100)).await;
273 }
274 } => {
275 info!("Crawl has become idle, initiating shutdown.");
276 }
277 };
278
279 trace!("Closing communication channels");
280 drop(res_tx);
281 drop(item_tx);
282
283 if let Err(e) = scheduler.shutdown().await {
284 error!("Error during scheduler shutdown: {}", e);
285 } else {
286 debug!("Scheduler shutdown initiated successfully");
287 }
288
289 let timeout_duration = Duration::from_secs(30);
290
291 let mut tasks = tokio::task::JoinSet::new();
292 tasks.spawn(processor);
293 tasks.spawn(parser);
294 tasks.spawn(downloader);
295 tasks.spawn(init_task);
296
297 let results = tokio::time::timeout(timeout_duration, async {
298 let mut results = Vec::new();
299 while let Some(result) = tasks.join_next().await {
300 results.push(result);
301 }
302 results
303 })
304 .await;
305
306 let results = match results {
307 Ok(results) => {
308 trace!("All tasks completed during shutdown");
309 results
310 }
311 Err(_) => {
312 warn!(
313 "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
314 timeout_duration.as_secs()
315 );
316 tasks.abort_all();
317
318 tokio::time::sleep(Duration::from_millis(100)).await;
319
320 Vec::new()
321 }
322 };
323
324 for result in results {
325 if let Err(e) = result {
326 error!("Task failed during shutdown: {}", e);
327 } else {
328 trace!("Task completed successfully during shutdown");
329 }
330 }
331
332 #[cfg(feature = "checkpoint")]
333 {
334 if let Some(path) = &checkpoint_path {
335 debug!("Creating final checkpoint at {:?}", path);
336 let scheduler_checkpoint = scheduler.snapshot().await?;
337
338 #[cfg(feature = "cookie-store")]
339 let result = save_checkpoint::<S>(
340 path,
341 scheduler_checkpoint,
342 &pipelines,
343 &self.cookie_store,
344 )
345 .await;
346
347 #[cfg(not(feature = "cookie-store"))]
348 let result =
349 save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
350
351 if let Err(e) = result {
352 error!("Final checkpoint save failed: {}", e);
353 } else {
354 info!("Final checkpoint saved successfully to {:?}", path);
355 }
356 }
357 }
358
359 info!("Closing item pipelines...");
360 let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
361 join_all(futures).await;
362 debug!("All item pipelines closed");
363
364 info!("Crawl finished successfully\n{}", stats);
365 Ok(())
366 }
367
368 pub fn stats(&self) -> Arc<StatCollector> {
369 Arc::clone(&self.stats)
370 }
371
372 pub fn state(&self) -> &S::State {
374 &self.spider_state
375 }
376
377 pub fn state_arc(&self) -> Arc<S::State> {
379 Arc::clone(&self.spider_state)
380 }
381}
382
383fn spawn_init_task<S, I>(
384 ctx: CrawlerContext<S, I>,
385) -> tokio::task::JoinHandle<()>
386where
387 S: Spider<Item = I> + 'static,
388 I: ScrapedItem,
389{
390 tokio::spawn(async move {
391 match ctx.spider.start_requests() {
392 Ok(requests) => {
393 for mut req in requests {
394 req.url.set_fragment(None);
395 match ctx.scheduler.enqueue_request(req).await {
396 Ok(_) => {
397 ctx.stats.increment_requests_enqueued();
398 }
399 Err(e) => {
400 error!("Failed to enqueue initial request: {}", e);
401 }
402 }
403 }
404 }
405 Err(e) => error!("Failed to create start requests: {}", e),
406 }
407 })
408}