1use spider_util::error::SpiderError;
14use spider_util::item::ScrapedItem;
15use spider_middleware::middleware::Middleware;
16use spider_pipeline::pipeline::Pipeline;
17use spider_util::request::Request;
18use crate::scheduler::Scheduler;
19use crate::spider::Spider;
20use crate::Downloader;
21use crate::state::CrawlerState;
22use crate::stats::StatCollector;
23use anyhow::Result;
24use futures_util::future::join_all;
25use kanal::{AsyncReceiver, bounded_async};
26use tracing::{debug, error, info, trace, warn};
27
28#[cfg(feature = "checkpoint")]
29use crate::checkpoint::save_checkpoint;
30#[cfg(feature = "checkpoint")]
31use std::path::PathBuf;
32
33use std::sync::{
34 Arc,
35 atomic::Ordering,
36};
37use std::time::Duration;
38use tokio::sync::Mutex;
39
40#[cfg(feature = "cookie-store")]
41use tokio::sync::RwLock;
42
43#[cfg(feature = "cookie-store")]
44use cookie_store::CookieStore;
45
/// Core crawl engine: owns the scheduler, downloader, spider, middlewares
/// and item pipelines for one crawl run, and orchestrates the worker tasks
/// spawned by [`Crawler::start_crawl`].
///
/// `S` is the spider implementation; `C` is the client type used by the
/// downloader and middlewares.
pub struct Crawler<S: Spider, C> {
    // Request queue front-end shared with the worker tasks.
    scheduler: Arc<Scheduler>,
    // Receiving half of the request channel, consumed by the downloader task.
    req_rx: AsyncReceiver<Request>,
    // Shared crawl counters (requests enqueued/succeeded, items scraped).
    stats: Arc<StatCollector>,
    // Performs the actual fetches; trait object so clients are pluggable.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    // Request/response middlewares, applied in order by the downloader task.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    // The user's spider, behind an async mutex so the parser task and the
    // initial-requests task can share it.
    spider: Arc<Mutex<S>>,
    // Pipelines each scraped item is pushed through.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Concurrency limits for the downloader, parser and pipeline stages.
    max_concurrent_downloads: usize,
    parser_workers: usize,
    max_concurrent_pipelines: usize,
    // Lower bound for the internal channel capacity (may be raised
    // adaptively at crawl start based on the concurrency settings).
    channel_capacity: usize,
    // Where and how often to persist crawl checkpoints; both must be Some
    // for periodic checkpointing to run.
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
    // Shared cookie jar, persisted alongside checkpoints when enabled.
    #[cfg(feature = "cookie-store")]
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
66
67impl<S, C> Crawler<S, C>
68where
69 S: Spider + 'static,
70 S::Item: ScrapedItem,
71 C: Send + Sync + Clone + 'static,
72{
    /// Assembles a `Crawler` from its pre-constructed parts.
    ///
    /// Crate-internal: intended to be called by the crate's builder once
    /// all components are configured. The spider is wrapped in an async
    /// `Mutex` here so it can be shared between tasks.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: Arc<Scheduler>,
        req_rx: AsyncReceiver<Request>,
        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
        spider: S,
        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
        max_concurrent_downloads: usize,
        parser_workers: usize,
        max_concurrent_pipelines: usize,
        channel_capacity: usize,
        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
        stats: Arc<StatCollector>,
        #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
    ) -> Self {
        Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            spider: Arc::new(Mutex::new(spider)),
            item_pipelines,
            max_concurrent_downloads,
            parser_workers,
            max_concurrent_pipelines,
            channel_capacity,
            #[cfg(feature = "checkpoint")]
            checkpoint_path,
            #[cfg(feature = "checkpoint")]
            checkpoint_interval,
            #[cfg(feature = "cookie-store")]
            cookie_store,
        }
    }
111
112 pub async fn start_crawl(self) -> Result<(), SpiderError> {
114 info!(
115 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
116 self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
117 );
118
119 #[cfg(feature = "checkpoint")]
121 let Crawler {
122 scheduler,
123 req_rx,
124 stats,
125 downloader,
126 middlewares,
127 spider,
128 item_pipelines,
129 max_concurrent_downloads,
130 parser_workers,
131 max_concurrent_pipelines,
132 channel_capacity: _, checkpoint_path,
134 checkpoint_interval,
135 #[cfg(feature = "cookie-store")]
136 cookie_store: _,
137 } = self;
138
139 #[cfg(not(feature = "checkpoint"))]
140 let Crawler {
141 scheduler,
142 req_rx,
143 stats,
144 downloader,
145 middlewares,
146 spider,
147 item_pipelines,
148 max_concurrent_downloads,
149 parser_workers,
150 max_concurrent_pipelines,
151 channel_capacity: _, #[cfg(feature = "cookie-store")]
153 cookie_store: _,
154 } = self;
155
156 let state = CrawlerState::new();
157 let pipelines = Arc::new(item_pipelines);
158
159 let adaptive_channel_capacity = std::cmp::max(
160 self.max_concurrent_downloads * 3,
161 self.parser_workers * self.max_concurrent_pipelines * 2,
162 )
163 .max(self.channel_capacity);
164
165 trace!(
166 "Creating communication channels with capacity: {}",
167 adaptive_channel_capacity
168 );
169 let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
170 let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
171
172 trace!("Spawning initial requests task");
173 let initial_requests_task =
174 spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
175
176 trace!("Initializing middleware manager");
177 let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
178
179 trace!("Spawning downloader task");
180 let downloader_task = super::spawn_downloader_task::<S, C>(
181 scheduler.clone(),
182 req_rx,
183 downloader,
184 middlewares_manager,
185 state.clone(),
186 res_tx.clone(),
187 max_concurrent_downloads,
188 stats.clone(),
189 );
190
191 trace!("Spawning parser task");
192 let parser_task = super::spawn_parser_task::<S>(
193 scheduler.clone(),
194 spider.clone(),
195 state.clone(),
196 res_rx,
197 item_tx.clone(),
198 parser_workers,
199 stats.clone(),
200 );
201
202 trace!("Spawning item processor task");
203 let item_processor_task = super::spawn_item_processor_task::<S>(
204 state.clone(),
205 item_rx,
206 pipelines.clone(),
207 max_concurrent_pipelines,
208 stats.clone(),
209 );
210
211 #[cfg(feature = "checkpoint")]
212 {
213 if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
214 let scheduler_clone = scheduler.clone();
215 let pipelines_clone = pipelines.clone();
216 let path_clone = path.clone();
217
218 #[cfg(feature = "cookie-store")]
219 let cookie_store_clone = self.cookie_store.clone();
220
221 #[cfg(not(feature = "cookie-store"))]
222 let _cookie_store_clone = ();
223
224 trace!(
225 "Starting periodic checkpoint task with interval: {:?}",
226 interval
227 );
228 tokio::spawn(async move {
229 let mut interval_timer = tokio::time::interval(interval);
230 interval_timer.tick().await;
231 loop {
232 tokio::select! {
233 _ = interval_timer.tick() => {
234 trace!("Checkpoint timer ticked, creating snapshot");
235 if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
236 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
237
238 #[cfg(feature = "cookie-store")]
239 let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
240
241 #[cfg(not(feature = "cookie-store"))]
242 let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &()).await;
243
244 if let Err(e) = save_result {
245 error!("Periodic checkpoint save failed: {}", e);
246 } else {
247 debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
248 }
249 } else {
250 warn!("Failed to create scheduler snapshot for checkpoint");
251 }
252 }
253 }
254 }
255 });
256 }
257 }
258
259 tokio::select! {
260 _ = tokio::signal::ctrl_c() => {
261 info!("Ctrl-C received, initiating graceful shutdown.");
262 }
263 _ = async {
264 loop {
265 if scheduler.is_idle() && state.is_idle() {
266 tokio::time::sleep(Duration::from_millis(50)).await;
267 if scheduler.is_idle() && state.is_idle() {
268 break;
269 }
270 }
271 tokio::time::sleep(Duration::from_millis(100)).await;
272 }
273 } => {
274 info!("Crawl has become idle, initiating shutdown.");
275 }
276 };
277
278 trace!("Closing communication channels");
279 drop(res_tx);
280 drop(item_tx);
281
282 if let Err(e) = scheduler.shutdown().await {
283 error!("Error during scheduler shutdown: {}", e);
284 } else {
285 debug!("Scheduler shutdown initiated successfully");
286 }
287
288 let timeout_duration = Duration::from_secs(30); let mut task_set = tokio::task::JoinSet::new();
291 task_set.spawn(item_processor_task);
292 task_set.spawn(parser_task);
293 task_set.spawn(downloader_task);
294 task_set.spawn(initial_requests_task);
295
296 let remaining_results = tokio::time::timeout(timeout_duration, async {
297 let mut results = Vec::new();
298 while let Some(result) = task_set.join_next().await {
299 results.push(result);
300 }
301 results
302 })
303 .await;
304
305 let task_results = match remaining_results {
306 Ok(results) => {
307 trace!("All tasks completed during shutdown");
308 results
309 }
310 Err(_) => {
311 warn!(
312 "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
313 timeout_duration.as_secs()
314 );
315 task_set.abort_all();
316
317 tokio::time::sleep(Duration::from_millis(100)).await;
318
319 Vec::new()
320 }
321 };
322
323 for result in task_results {
324 if let Err(e) = result {
325 error!("Task failed during shutdown: {}", e);
326 } else {
327 trace!("Task completed successfully during shutdown");
328 }
329 }
330
331 #[cfg(feature = "checkpoint")]
332 {
333 if let Some(path) = &checkpoint_path {
334 debug!("Creating final checkpoint at {:?}", path);
335 let scheduler_checkpoint = scheduler.snapshot().await?;
336
337 #[cfg(feature = "cookie-store")]
338 let result = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &self.cookie_store).await;
339
340 #[cfg(not(feature = "cookie-store"))]
341 let result = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
342
343 if let Err(e) = result {
344 error!("Final checkpoint save failed: {}", e);
345 } else {
346 info!("Final checkpoint saved successfully to {:?}", path);
347 }
348 }
349 }
350
351 info!("Closing item pipelines...");
352 let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
353 join_all(closing_futures).await;
354 debug!("All item pipelines closed");
355
356 info!(
357 "Crawl finished successfully. Stats: requests_enqueued={}, requests_succeeded={}, items_scraped={}",
358 stats.requests_enqueued.load(Ordering::SeqCst),
359 stats.requests_succeeded.load(Ordering::SeqCst),
360 stats.items_scraped.load(Ordering::SeqCst)
361 );
362 Ok(())
363 }
364
365 pub fn get_stats(&self) -> Arc<StatCollector> {
369 Arc::clone(&self.stats)
370 }
371}
372
373fn spawn_initial_requests_task<S>(
374 scheduler: Arc<Scheduler>,
375 spider: Arc<Mutex<S>>,
376 stats: Arc<StatCollector>,
377) -> tokio::task::JoinHandle<()>
378where
379 S: Spider + 'static,
380 S::Item: ScrapedItem,
381{
382 tokio::spawn(async move {
383 match spider.lock().await.start_requests() {
384 Ok(requests) => {
385 for mut req in requests {
386 req.url.set_fragment(None);
387 match scheduler.enqueue_request(req).await {
388 Ok(_) => {
389 stats.increment_requests_enqueued();
390 }
391 Err(e) => {
392 error!("Failed to enqueue initial request: {}", e);
393 }
394 }
395 }
396 }
397 Err(e) => error!("Failed to create start requests: {}", e),
398 }
399 })
400}