1use spider_util::error::SpiderError;
14use spider_util::item::ScrapedItem;
15use spider_middleware::middleware::Middleware;
16use spider_pipeline::pipeline::Pipeline;
17use spider_util::request::Request;
18use crate::scheduler::Scheduler;
19use crate::spider::Spider;
20use crate::Downloader;
21use crate::state::CrawlerState;
22use crate::stats::StatCollector;
23use anyhow::Result;
24use futures_util::future::join_all;
25use kanal::{AsyncReceiver, bounded_async};
26use tracing::{debug, error, info, trace, warn};
27
28use crate::checkpoint::save_checkpoint;
29use std::path::PathBuf;
30use std::sync::{
31 Arc,
32 atomic::Ordering,
33};
34use std::time::Duration;
35use tokio::sync::{Mutex, RwLock};
36
37use cookie_store::CookieStore;
38
/// Core crawl orchestrator: owns the scheduler, downloader, spider, and
/// pipelines, and drives the full request → download → parse → pipeline flow
/// when [`Crawler::start_crawl`] is invoked.
///
/// `S` is the user-provided spider; `C` is the HTTP client type used by the
/// downloader and middlewares.
pub struct Crawler<S: Spider, C> {
    // Shared request scheduler; also consulted for idle detection and snapshots.
    scheduler: Arc<Scheduler>,
    // Receiving end of the request channel fed by the scheduler.
    req_rx: AsyncReceiver<Request>,
    // Shared counters (requests enqueued/succeeded, items scraped, ...).
    stats: Arc<StatCollector>,
    // Downloader implementation, shared across download workers.
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    // Request/response middlewares, applied in order by the middleware manager.
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    // The spider, behind a mutex so parser workers can share it.
    spider: Arc<Mutex<S>>,
    // Item pipelines; each scraped item is passed through all of them.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Concurrency limit for simultaneous downloads.
    max_concurrent_downloads: usize,
    // Number of parser worker tasks.
    parser_workers: usize,
    // Concurrency limit for pipeline item processing.
    max_concurrent_pipelines: usize,
    // User-requested minimum channel capacity (a larger adaptive value may be used).
    channel_capacity: usize,
    // Where to persist checkpoints; `None` disables checkpointing.
    checkpoint_path: Option<PathBuf>,
    // How often to save periodic checkpoints; `None` disables the periodic task.
    checkpoint_interval: Option<Duration>,
    // Shared cookie store, persisted as part of checkpoints.
    pub cookie_store: Arc<RwLock<CookieStore>>,
}
56
57impl<S, C> Crawler<S, C>
58where
59 S: Spider + 'static,
60 S::Item: ScrapedItem,
61 C: Send + Sync + Clone + 'static,
62{
63 #[allow(clippy::too_many_arguments)]
65 pub(crate) fn new(
66 scheduler: Arc<Scheduler>,
67 req_rx: AsyncReceiver<Request>,
68 downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
69 middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
70 spider: S,
71 item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
72 max_concurrent_downloads: usize,
73 parser_workers: usize,
74 max_concurrent_pipelines: usize,
75 channel_capacity: usize,
76 checkpoint_path: Option<PathBuf>,
77 checkpoint_interval: Option<Duration>,
78 stats: Arc<StatCollector>,
79 cookie_store: Arc<tokio::sync::RwLock<CookieStore>>,
80 ) -> Self {
81 Crawler {
82 scheduler,
83 req_rx,
84 stats,
85 downloader,
86 middlewares,
87 spider: Arc::new(Mutex::new(spider)),
88 item_pipelines,
89 max_concurrent_downloads,
90 parser_workers,
91 max_concurrent_pipelines,
92 channel_capacity,
93 checkpoint_path,
94 checkpoint_interval,
95 cookie_store,
96 }
97 }
98
99 pub async fn start_crawl(self) -> Result<(), SpiderError> {
101 info!(
102 "Crawler starting crawl with configuration: max_concurrent_downloads={}, parser_workers={}, max_concurrent_pipelines={}",
103 self.max_concurrent_downloads, self.parser_workers, self.max_concurrent_pipelines
104 );
105
106 let Crawler {
107 scheduler,
108 req_rx,
109 stats,
110 downloader,
111 middlewares,
112 spider,
113 item_pipelines,
114 max_concurrent_downloads,
115 parser_workers,
116 max_concurrent_pipelines,
117 channel_capacity: _, checkpoint_path,
119 checkpoint_interval,
120 cookie_store,
121 } = self;
122
123 let state = CrawlerState::new();
124 let pipelines = Arc::new(item_pipelines);
125
126 let adaptive_channel_capacity = std::cmp::max(
127 self.max_concurrent_downloads * 3,
128 self.parser_workers * self.max_concurrent_pipelines * 2,
129 )
130 .max(self.channel_capacity);
131
132 trace!(
133 "Creating communication channels with capacity: {}",
134 adaptive_channel_capacity
135 );
136 let (res_tx, res_rx) = bounded_async(adaptive_channel_capacity);
137 let (item_tx, item_rx) = bounded_async(adaptive_channel_capacity);
138
139 trace!("Spawning initial requests task");
140 let initial_requests_task =
141 spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());
142
143 trace!("Initializing middleware manager");
144 let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);
145
146 trace!("Spawning downloader task");
147 let downloader_task = super::spawn_downloader_task::<S, C>(
148 scheduler.clone(),
149 req_rx,
150 downloader,
151 middlewares_manager,
152 state.clone(),
153 res_tx.clone(),
154 max_concurrent_downloads,
155 stats.clone(),
156 );
157
158 trace!("Spawning parser task");
159 let parser_task = super::spawn_parser_task::<S>(
160 scheduler.clone(),
161 spider.clone(),
162 state.clone(),
163 res_rx,
164 item_tx.clone(),
165 parser_workers,
166 stats.clone(),
167 );
168
169 trace!("Spawning item processor task");
170 let item_processor_task = super::spawn_item_processor_task::<S>(
171 state.clone(),
172 item_rx,
173 pipelines.clone(),
174 max_concurrent_pipelines,
175 stats.clone(),
176 );
177
178 if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
179 let scheduler_clone = scheduler.clone();
180 let pipelines_clone = pipelines.clone();
181 let path_clone = path.clone();
182 let cookie_store_clone = cookie_store.clone();
183
184 trace!(
185 "Starting periodic checkpoint task with interval: {:?}",
186 interval
187 );
188 tokio::spawn(async move {
189 let mut interval_timer = tokio::time::interval(interval);
190 interval_timer.tick().await;
191 loop {
192 tokio::select! {
193 _ = interval_timer.tick() => {
194 trace!("Checkpoint timer ticked, creating snapshot");
195 if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
196 debug!("Scheduler snapshot created, saving checkpoint to {:?}", path_clone);
197 let save_result = save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone, &cookie_store_clone).await;
198
199 if let Err(e) = save_result {
200 error!("Periodic checkpoint save failed: {}", e);
201 } else {
202 debug!("Periodic checkpoint saved successfully to {:?}", path_clone);
203 }
204 } else {
205 warn!("Failed to create scheduler snapshot for checkpoint");
206 }
207 }
208 }
209 }
210 });
211 }
212
213 tokio::select! {
214 _ = tokio::signal::ctrl_c() => {
215 info!("Ctrl-C received, initiating graceful shutdown.");
216 }
217 _ = async {
218 loop {
219 if scheduler.is_idle() && state.is_idle() {
220 tokio::time::sleep(Duration::from_millis(50)).await;
221 if scheduler.is_idle() && state.is_idle() {
222 break;
223 }
224 }
225 tokio::time::sleep(Duration::from_millis(100)).await;
226 }
227 } => {
228 info!("Crawl has become idle, initiating shutdown.");
229 }
230 };
231
232 trace!("Closing communication channels");
233 drop(res_tx);
234 drop(item_tx);
235
236 if let Err(e) = scheduler.shutdown().await {
237 error!("Error during scheduler shutdown: {}", e);
238 } else {
239 debug!("Scheduler shutdown initiated successfully");
240 }
241
242 let timeout_duration = Duration::from_secs(30); let mut task_set = tokio::task::JoinSet::new();
245 task_set.spawn(item_processor_task);
246 task_set.spawn(parser_task);
247 task_set.spawn(downloader_task);
248 task_set.spawn(initial_requests_task);
249
250 let remaining_results = tokio::time::timeout(timeout_duration, async {
251 let mut results = Vec::new();
252 while let Some(result) = task_set.join_next().await {
253 results.push(result);
254 }
255 results
256 })
257 .await;
258
259 let task_results = match remaining_results {
260 Ok(results) => {
261 trace!("All tasks completed during shutdown");
262 results
263 }
264 Err(_) => {
265 warn!(
266 "Tasks did not complete within timeout ({}s), aborting remaining tasks and continuing with shutdown...",
267 timeout_duration.as_secs()
268 );
269 task_set.abort_all();
270
271 tokio::time::sleep(Duration::from_millis(100)).await;
272
273 Vec::new()
274 }
275 };
276
277 for result in task_results {
278 if let Err(e) = result {
279 error!("Task failed during shutdown: {}", e);
280 } else {
281 trace!("Task completed successfully during shutdown");
282 }
283 }
284
285 if let Some(path) = &checkpoint_path {
286 debug!("Creating final checkpoint at {:?}", path);
287 let scheduler_checkpoint = scheduler.snapshot().await?;
288 let result =
289 save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &cookie_store).await;
290
291 if let Err(e) = result {
292 error!("Final checkpoint save failed: {}", e);
293 } else {
294 info!("Final checkpoint saved successfully to {:?}", path);
295 }
296 }
297
298 info!("Closing item pipelines...");
299 let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
300 join_all(closing_futures).await;
301 debug!("All item pipelines closed");
302
303 info!(
304 "Crawl finished successfully. Stats: requests_enqueued={}, requests_succeeded={}, items_scraped={}",
305 stats.requests_enqueued.load(Ordering::SeqCst),
306 stats.requests_succeeded.load(Ordering::SeqCst),
307 stats.items_scraped.load(Ordering::SeqCst)
308 );
309 Ok(())
310 }
311
312 pub fn get_stats(&self) -> Arc<StatCollector> {
316 Arc::clone(&self.stats)
317 }
318}
319
320fn spawn_initial_requests_task<S>(
321 scheduler: Arc<Scheduler>,
322 spider: Arc<Mutex<S>>,
323 stats: Arc<StatCollector>,
324) -> tokio::task::JoinHandle<()>
325where
326 S: Spider + 'static,
327 S::Item: ScrapedItem,
328{
329 tokio::spawn(async move {
330 match spider.lock().await.start_requests() {
331 Ok(requests) => {
332 for mut req in requests {
333 req.url.set_fragment(None);
334 match scheduler.enqueue_request(req).await {
335 Ok(_) => {
336 stats.increment_requests_enqueued();
337 }
338 Err(e) => {
339 error!("Failed to enqueue initial request: {}", e);
340 }
341 }
342 }
343 }
344 Err(e) => error!("Failed to create start requests: {}", e),
345 }
346 })
347}