1use crate::Downloader;
14use crate::config::CrawlerConfig;
15use crate::engine::CrawlerContext;
16use crate::scheduler::Scheduler;
17use crate::spider::Spider;
18use crate::state::CrawlerState;
19use crate::stats::StatCollector;
20use anyhow::Result;
21use futures_util::future::join_all;
22use kanal::{AsyncReceiver, bounded_async};
23use log::{debug, error, info, trace, warn};
24use spider_middleware::middleware::Middleware;
25use spider_pipeline::pipeline::Pipeline;
26use spider_util::error::SpiderError;
27use spider_util::item::ScrapedItem;
28use spider_util::request::Request;
29
30#[cfg(feature = "checkpoint")]
31use crate::checkpoint::save_checkpoint;
32#[cfg(feature = "checkpoint")]
33use crate::config::CheckpointConfig;
34
35use std::sync::Arc;
36use std::time::Duration;
37
38#[cfg(feature = "cookie-store")]
39use tokio::sync::RwLock;
40
41#[cfg(feature = "cookie-store")]
42use cookie_store::CookieStore;
43
/// Orchestrates a single crawl run.
///
/// Owns every component of the pipeline — scheduler, downloader, middleware
/// chain, spider, and item pipelines — and drives them to completion from
/// `start_crawl`, which consumes the crawler.
///
/// Type parameters:
/// - `S`: the spider implementation producing requests and items.
/// - `C`: the HTTP client type shared by the downloader and middlewares.
pub struct Crawler<S: Spider, C> {
    // Request frontier; cloned into the worker tasks spawned by `start_crawl`.
    scheduler: Arc<Scheduler>,
    // Receiving half of the request channel, handed to the downloader task.
    req_rx: AsyncReceiver<Request>,
    // Crawl-wide counters; also exposed via `stats()`.
    stats: Arc<StatCollector>,
    // Type-erased downloader; `C` fixes the client type it exposes to middleware.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    // Ordered middleware chain, wrapped into a SharedMiddlewareManager at crawl start.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    spider: Arc<S>,
    // Per-run spider state, created via `S::State::default()` in `new`.
    spider_state: Arc<S::State>,
    // Item pipelines; closed (via `close()`) at the end of the crawl.
    pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    config: CrawlerConfig,
    #[cfg(feature = "checkpoint")]
    // Where/how often to persist crawl snapshots; both fields optional.
    checkpoint_config: CheckpointConfig,
    #[cfg(feature = "cookie-store")]
    // Shared cookie jar; persisted alongside checkpoints when both features are on.
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
60
61impl<S, C> Crawler<S, C>
62where
63 S: Spider + 'static,
64 S::Item: ScrapedItem,
65 C: Send + Sync + Clone + 'static,
66{
67 #[allow(clippy::too_many_arguments)]
68 pub(crate) fn new(
69 scheduler: Arc<Scheduler>,
70 req_rx: AsyncReceiver<Request>,
71 downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
72 middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
73 spider: S,
74 pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
75 config: CrawlerConfig,
76 #[cfg(feature = "checkpoint")] checkpoint_config: CheckpointConfig,
77 stats: Arc<StatCollector>,
78 #[cfg(feature = "cookie-store")] cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
79 ) -> Self {
80 Crawler {
81 scheduler,
82 req_rx,
83 stats,
84 downloader,
85 middlewares,
86 spider: Arc::new(spider),
87 spider_state: Arc::new(S::State::default()),
88 pipelines,
89 config,
90 #[cfg(feature = "checkpoint")]
91 checkpoint_config,
92 #[cfg(feature = "cookie-store")]
93 cookie_store,
94 }
95 }
96
97 pub async fn start_crawl(self) -> Result<(), SpiderError> {
98 info!(
99 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
100 self.config.max_concurrent_downloads,
101 self.config.parser_workers,
102 self.config.max_concurrent_pipelines
103 );
104
105 let Crawler {
106 scheduler,
107 req_rx,
108 stats,
109 downloader,
110 middlewares,
111 spider,
112 spider_state,
113 pipelines,
114 config,
115 #[cfg(feature = "checkpoint")]
116 checkpoint_config,
117 #[cfg(feature = "cookie-store")]
118 cookie_store: _,
119 } = self;
120
121 let state = CrawlerState::new();
122 let pipelines = Arc::new(pipelines);
123
124 let ctx = CrawlerContext::new(
126 Arc::clone(&scheduler),
127 Arc::clone(&stats),
128 Arc::clone(&spider),
129 Arc::clone(&spider_state),
130 Arc::clone(&pipelines),
131 );
132
133 let channel_capacity = std::cmp::max(
134 config.max_concurrent_downloads * 3,
135 config.parser_workers * config.max_concurrent_pipelines * 2,
136 )
137 .max(config.channel_capacity);
138
139 trace!(
140 "Creating communication channels with capacity: {}",
141 channel_capacity
142 );
143 let (res_tx, res_rx) = bounded_async(channel_capacity);
144 let (item_tx, item_rx) = bounded_async(channel_capacity);
145
146 trace!("Spawning initial requests task");
147 let init_task = spawn_init_task(ctx.clone());
148
149 trace!("Initializing middleware manager");
150 let middlewares = super::SharedMiddlewareManager::new(middlewares);
151
152 trace!("Spawning downloader task");
153 let downloader_handle = super::spawn_downloader_task::<S, C>(
154 Arc::clone(&ctx.scheduler),
155 req_rx,
156 downloader,
157 middlewares,
158 state.clone(),
159 res_tx.clone(),
160 config.max_concurrent_downloads,
161 Arc::clone(&ctx.stats),
162 );
163
164 trace!("Spawning parser task");
165 let parser_handle = super::spawn_parser_task::<S>(
166 Arc::clone(&ctx.scheduler),
167 Arc::clone(&ctx.spider),
168 Arc::clone(&ctx.spider_state),
169 state.clone(),
170 res_rx,
171 item_tx.clone(),
172 config.parser_workers,
173 Arc::clone(&ctx.stats),
174 );
175
176 trace!("Spawning item processor task");
177 let processor_handle = super::spawn_item_processor_task::<S>(
178 state.clone(),
179 item_rx,
180 Arc::clone(&ctx.pipelines),
181 config.max_concurrent_pipelines,
182 Arc::clone(&ctx.stats),
183 );
184
185 #[cfg(feature = "checkpoint")]
186 {
187 if let (Some(path), Some(interval)) =
188 (&checkpoint_config.path, checkpoint_config.interval)
189 {
190 let scheduler_cp = Arc::clone(&ctx.scheduler);
191 let pipelines_cp = Arc::clone(&ctx.pipelines);
192 let path_cp = path.clone();
193
194 #[cfg(feature = "cookie-store")]
195 let cookie_store_cp = self.cookie_store.clone();
196
197 #[cfg(not(feature = "cookie-store"))]
198 let _cookie_store_cp = ();
199
200 trace!(
201 "Starting periodic checkpoint task with interval: {:?}",
202 interval
203 );
204 tokio::spawn(async move {
205 let mut interval_timer = tokio::time::interval(interval);
206 interval_timer.tick().await;
207 loop {
208 tokio::select! {
209 _ = interval_timer.tick() => {
210 trace!("Checkpoint timer ticked, creating snapshot");
211 if let Ok(scheduler_checkpoint) = scheduler_cp.snapshot().await {
212 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_cp);
213
214 #[cfg(feature = "cookie-store")]
215 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &cookie_store_cp).await;
216
217 #[cfg(not(feature = "cookie-store"))]
218 let save_result = save_checkpoint::<S>(&path_cp, scheduler_checkpoint, &pipelines_cp, &()).await;
219
220 if let Err(e) = save_result {
221 error!("Periodic checkpoint save failed: {}", e);
222 } else {
223 debug!("Periodic checkpoint saved successfully to {:?}", path_cp);
224 }
225 } else {
226 warn!("Failed to create scheduler snapshot for checkpoint");
227 }
228 }
229 }
230 }
231 });
232 }
233 }
234
235 tokio::select! {
236 _ = tokio::signal::ctrl_c() => {
237 info!("Ctrl-C received, initiating graceful shutdown.");
238 }
239 _ = async {
240 loop {
241 if scheduler.is_idle() && state.is_idle() {
242 tokio::time::sleep(Duration::from_millis(50)).await;
243 if scheduler.is_idle() && state.is_idle() {
244 break;
245 }
246 }
247 tokio::time::sleep(Duration::from_millis(100)).await;
248 }
249 } => {
250 info!("Crawl has become idle, initiating shutdown.");
251 }
252 };
253
254 trace!("Closing communication channels");
255 drop(res_tx);
256 drop(item_tx);
257
258 if let Err(e) = scheduler.shutdown().await {
259 error!("Error during scheduler shutdown: {}", e);
260 } else {
261 debug!("Scheduler shutdown initiated successfully");
262 }
263
264 let timeout_duration = Duration::from_secs(30);
265
266 let mut tasks = tokio::task::JoinSet::new();
267 tasks.spawn(processor_handle);
268 tasks.spawn(parser_handle);
269 tasks.spawn(downloader_handle);
270 tasks.spawn(init_task);
271
272 let results = tokio::time::timeout(timeout_duration, async {
273 let mut results = Vec::new();
274 while let Some(result) = tasks.join_next().await {
275 results.push(result);
276 }
277 results
278 })
279 .await;
280
281 let results = match results {
282 Ok(results) => {
283 trace!("All tasks completed during shutdown");
284 results
285 }
286 Err(_) => {
287 warn!(
288 "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
289 timeout_duration.as_secs()
290 );
291 tasks.abort_all();
292
293 tokio::time::sleep(Duration::from_millis(100)).await;
294
295 Vec::new()
296 }
297 };
298
299 for result in results {
300 if let Err(e) = result {
301 error!("Task failed during shutdown: {}", e);
302 } else {
303 trace!("Task completed successfully during shutdown");
304 }
305 }
306
307 #[cfg(feature = "checkpoint")]
308 {
309 if let Some(path) = &checkpoint_config.path {
310 debug!("Creating final checkpoint at {:?}", path);
311 let scheduler_checkpoint = scheduler.snapshot().await?;
312
313 #[cfg(feature = "cookie-store")]
314 let result = save_checkpoint::<S>(
315 path,
316 scheduler_checkpoint,
317 &pipelines,
318 &self.cookie_store,
319 )
320 .await;
321
322 #[cfg(not(feature = "cookie-store"))]
323 let result =
324 save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &()).await;
325
326 if let Err(e) = result {
327 error!("Final checkpoint save failed: {}", e);
328 } else {
329 info!("Final checkpoint saved successfully to {:?}", path);
330 }
331 }
332 }
333
334 info!("Closing item pipelines...");
335 let futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
336 join_all(futures).await;
337 debug!("All item pipelines closed");
338
339 info!("Crawl finished successfully\n{}", stats);
340 Ok(())
341 }
342
343 pub fn stats(&self) -> Arc<StatCollector> {
345 Arc::clone(&self.stats)
346 }
347
348 pub fn state(&self) -> &S::State {
350 &self.spider_state
351 }
352
353 pub fn state_arc(&self) -> Arc<S::State> {
355 Arc::clone(&self.spider_state)
356 }
357}
358
359fn spawn_init_task<S, I>(ctx: CrawlerContext<S, I>) -> tokio::task::JoinHandle<()>
360where
361 S: Spider<Item = I> + 'static,
362 I: ScrapedItem,
363{
364 tokio::spawn(async move {
365 match ctx.spider.start_requests() {
366 Ok(requests) => {
367 for mut req in requests {
368 req.url.set_fragment(None);
369 match ctx.scheduler.enqueue_request(req).await {
370 Ok(_) => {
371 ctx.stats.increment_requests_enqueued();
372 }
373 Err(e) => {
374 error!("Failed to enqueue initial request: {}", e);
375 }
376 }
377 }
378 }
379 Err(e) => error!("Failed to create start requests: {}", e),
380 }
381 })
382}