use crate::downloader::Downloader;
use crate::error::SpiderError;
use crate::item::ScrapedItem;
use crate::middleware::Middleware;
use crate::pipeline::Pipeline;
use crate::request::Request;
use crate::scheduler::Scheduler;
use crate::spider::Spider;
use crate::state::CrawlerState;
use crate::stats::StatCollector;
use anyhow::Result;
use futures_util::future::join_all;
use kanal::{AsyncReceiver, bounded_async};
use tracing::{error, info};

#[cfg(feature = "checkpoint")]
use crate::checkpoint::save_checkpoint;
#[cfg(feature = "checkpoint")]
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

#[cfg(feature = "middleware-cookies")]
use cookie_store::CookieStore;

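/// Core crawl engine: owns the scheduler, downloader, spider, and item
/// pipelines, and drives them as a set of concurrent actor tasks connected
/// by bounded channels.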
pub struct Crawler<S: Spider, C> {
    scheduler: Arc<Scheduler>,
    req_rx: AsyncReceiver<Request>,
    stats: Arc<StatCollector>,
    downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
    middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
    spider: Arc<Mutex<S>>,
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    max_concurrent_downloads: usize,
    parser_workers: usize,
    max_concurrent_pipelines: usize,
    #[cfg(feature = "checkpoint")]
    checkpoint_path: Option<PathBuf>,
    #[cfg(feature = "checkpoint")]
    checkpoint_interval: Option<Duration>,
    #[cfg(feature = "middleware-cookies")]
    pub cookie_store: Arc<Mutex<CookieStore>>,
}

impl<S, C> Crawler<S, C>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
    C: Send + Sync + Clone + 'static,
{
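    /// Assembles a `Crawler` from its pre-built components; crate-private
    /// because construction is driven from inside the crate.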
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        scheduler: Arc<Scheduler>,
        req_rx: AsyncReceiver<Request>,
        downloader: Arc<dyn Downloader<Client = C> + Send + Sync>,
        middlewares: Vec<Box<dyn Middleware<C> + Send + Sync>>,
        spider: S,
        item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
        max_concurrent_downloads: usize,
        parser_workers: usize,
        max_concurrent_pipelines: usize,
        #[cfg(feature = "checkpoint")] checkpoint_path: Option<PathBuf>,
        #[cfg(feature = "checkpoint")] checkpoint_interval: Option<Duration>,
        stats: Arc<StatCollector>,
        #[cfg(feature = "middleware-cookies")] cookie_store: Arc<Mutex<CookieStore>>,
    ) -> Self {
        Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            spider: Arc::new(Mutex::new(spider)),
            item_pipelines,
            max_concurrent_downloads,
            parser_workers,
            max_concurrent_pipelines,
            #[cfg(feature = "checkpoint")]
            checkpoint_path,
            #[cfg(feature = "checkpoint")]
            checkpoint_interval,
            #[cfg(feature = "middleware-cookies")]
            cookie_store,
        }
    }

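    /// Runs the crawl to completion: seeds the scheduler with the spider's
    /// start requests, spawns the downloader, parser, and item-processor
    /// actors, then shuts everything down once the crawl goes idle or
    /// Ctrl-C is received.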
    pub async fn start_crawl(self) -> Result<(), SpiderError> {
        info!("Crawler starting crawl");

        let Crawler {
            scheduler,
            req_rx,
            stats,
            downloader,
            middlewares,
            spider,
            item_pipelines,
            max_concurrent_downloads,
            parser_workers,
            max_concurrent_pipelines,
            #[cfg(feature = "checkpoint")]
            checkpoint_path,
            #[cfg(feature = "checkpoint")]
            checkpoint_interval,
            #[cfg(feature = "middleware-cookies")]
            cookie_store,
        } = self;

        let state = CrawlerState::new();
        let pipelines = Arc::new(item_pipelines);
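        // Size the inter-stage channels to the larger of the download and
        // pipeline fan-out, with a floor of 100, so a slow stage applies
        // backpressure without starving the others.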
        let channel_capacity = std::cmp::max(
            max_concurrent_downloads * 2,
            parser_workers * max_concurrent_pipelines,
        )
        .max(100);

        let (res_tx, res_rx) = bounded_async(channel_capacity);
        let (item_tx, item_rx) = bounded_async(channel_capacity);

        let initial_requests_task =
            spawn_initial_requests_task::<S>(scheduler.clone(), spider.clone(), stats.clone());

        let middlewares_manager = super::SharedMiddlewareManager::new(middlewares);

        let downloader_task = super::spawn_downloader_task::<S, C>(
            scheduler.clone(),
            req_rx,
            downloader,
            middlewares_manager,
            state.clone(),
            res_tx.clone(),
            max_concurrent_downloads,
            stats.clone(),
        );

        let parser_task = super::spawn_parser_task::<S>(
            scheduler.clone(),
            spider.clone(),
            state.clone(),
            res_rx,
            item_tx.clone(),
            parser_workers,
            stats.clone(),
        );

        let item_processor_task = super::spawn_item_processor_task::<S>(
            state.clone(),
            item_rx,
            pipelines.clone(),
            max_concurrent_pipelines,
            stats.clone(),
        );

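        // With checkpointing enabled and both a path and an interval
        // configured, spawn a background task that periodically snapshots
        // the scheduler (and the cookie store, if that feature is on).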
        #[cfg(feature = "checkpoint")]
        if let (Some(path), Some(interval)) = (&checkpoint_path, checkpoint_interval) {
            let scheduler_clone = scheduler.clone();
            let pipelines_clone = pipelines.clone();
            let path_clone = path.clone();
            #[cfg(feature = "middleware-cookies")]
            let cookie_store_clone = cookie_store.clone();

            tokio::spawn(async move {
                let mut interval_timer = tokio::time::interval(interval);
                // Consume the immediate first tick so the first snapshot
                // happens one full interval after startup.
                interval_timer.tick().await;
                loop {
                    interval_timer.tick().await;
                    if let Ok(scheduler_checkpoint) = scheduler_clone.snapshot().await {
                        #[cfg(not(feature = "middleware-cookies"))]
                        let save_result =
                            save_checkpoint::<S>(&path_clone, scheduler_checkpoint, &pipelines_clone)
                                .await;
                        #[cfg(feature = "middleware-cookies")]
                        let save_result = save_checkpoint::<S>(
                            &path_clone,
                            scheduler_checkpoint,
                            &pipelines_clone,
                            &cookie_store_clone,
                        )
                        .await;

                        if let Err(e) = save_result {
                            error!("Periodic checkpoint save failed: {}", e);
                        }
                    }
                }
            });
        }

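        // Run until either the user interrupts with Ctrl-C or the crawl
        // drains naturally. Idleness is sampled twice, 50ms apart, to avoid
        // shutting down during a momentary gap while work is still in flight.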
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                info!("Ctrl-C received, initiating graceful shutdown.");
            }
            _ = async {
                loop {
                    if scheduler.is_idle() && state.is_idle() {
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        if scheduler.is_idle() && state.is_idle() {
                            break;
                        }
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            } => {
                info!("Crawl has become idle, initiating shutdown.");
            }
        }

        info!("Initiating actor shutdowns.");

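        // Dropping the sender halves closes the response and item channels,
        // letting each downstream actor drain its receiver and exit.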
        drop(res_tx);
        drop(item_tx);

        scheduler.shutdown().await?;

        item_processor_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Item processor task failed: {}", e)))?;

        parser_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Parser task failed: {}", e)))?;

        downloader_task
            .await
            .map_err(|e| SpiderError::GeneralError(format!("Downloader task failed: {}", e)))?;

        initial_requests_task.await.map_err(|e| {
            SpiderError::GeneralError(format!("Initial requests task failed: {}", e))
        })?;

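        // Take one final snapshot now that every actor has stopped, so the
        // checkpoint reflects the true end-of-crawl state.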
        #[cfg(feature = "checkpoint")]
        if let Some(path) = &checkpoint_path {
            let scheduler_checkpoint = scheduler.snapshot().await?;
            #[cfg(not(feature = "middleware-cookies"))]
            let result = save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines).await;
            #[cfg(feature = "middleware-cookies")]
            let result =
                save_checkpoint::<S>(path, scheduler_checkpoint, &pipelines, &cookie_store).await;

            if let Err(e) = result {
                error!("Final checkpoint save failed: {}", e);
            }
        }

        info!("Closing item pipelines...");
        let closing_futures: Vec<_> = pipelines.iter().map(|p| p.close()).collect();
        join_all(closing_futures).await;

        info!("Crawl finished successfully.");
        Ok(())
    }

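    /// Returns a shared handle to the crawler's statistics collector.
    /// Since `start_crawl` consumes the crawler, grab this handle before
    /// starting if you want to observe stats during the crawl.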
    pub fn get_stats(&self) -> Arc<StatCollector> {
        Arc::clone(&self.stats)
    }
}

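/// Spawns a task that asks the spider for its start requests and feeds them
/// into the scheduler, counting each successful enqueue in the stats.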
fn spawn_initial_requests_task<S>(
    scheduler: Arc<Scheduler>,
    spider: Arc<Mutex<S>>,
    stats: Arc<StatCollector>,
) -> tokio::task::JoinHandle<()>
where
    S: Spider + 'static,
    S::Item: ScrapedItem,
{
    tokio::spawn(async move {
        match spider.lock().await.start_requests() {
            Ok(requests) => {
                for mut req in requests {
                    // Drop the URL fragment: it is client-side only, so URLs
                    // differing only by fragment point at the same resource.
                    req.url.set_fragment(None);
                    match scheduler.enqueue_request(req).await {
                        Ok(_) => {
                            stats.increment_requests_enqueued();
                        }
                        Err(e) => {
                            error!("Failed to enqueue initial request: {}", e);
                        }
                    }
                }
            }
            Err(e) => error!("Failed to create start requests: {}", e),
        }
    })
}