1use crate::SchedulerCheckpoint;
43use spider_util::error::SpiderError;
44use spider_middleware::middleware::Middleware;
45use spider_pipeline::pipeline::Pipeline;
46use crate::scheduler::Scheduler;
47use crate::spider::Spider;
48use crate::Downloader;
49use crate::ReqwestClientDownloader;
50use num_cpus;
51use std::fs;
52use std::marker::PhantomData;
53use std::path::{Path, PathBuf};
54use std::sync::Arc;
55use std::time::Duration;
56use tracing::{debug, warn};
57
58use super::Crawler;
59use crate::stats::StatCollector;
60
/// Tuning knobs for a crawler's internal concurrency and buffering.
///
/// All values are forwarded unchanged to `Crawler::new` during
/// [`CrawlerBuilder::build`].
pub struct CrawlerConfig {
    /// Maximum number of downloads that may be in flight at once.
    pub max_concurrent_downloads: usize,
    /// Number of worker tasks dedicated to parsing responses.
    pub parser_workers: usize,
    /// Maximum number of pipeline executions running concurrently.
    pub max_concurrent_pipelines: usize,
    /// Capacity of the internal channels between crawler stages.
    pub channel_capacity: usize,
}
72
73impl Default for CrawlerConfig {
74 fn default() -> Self {
75 CrawlerConfig {
76 max_concurrent_downloads: 5,
77 parser_workers: num_cpus::get(),
78 max_concurrent_pipelines: 5,
79 channel_capacity: 200,
80 }
81 }
82}
83
/// Fluent builder that assembles a `Crawler` from a spider, a
/// downloader, middlewares, item pipelines, and tuning options.
///
/// Obtain one via [`CrawlerBuilder::new`], chain configuration calls,
/// then finish with [`CrawlerBuilder::build`].
pub struct CrawlerBuilder<S: Spider, D>
where
    D: Downloader,
{
    // Concurrency/buffering settings forwarded to the crawler.
    crawler_config: CrawlerConfig,
    // Fetches requests; defaults to `ReqwestClientDownloader`.
    downloader: D,
    // The spider to run; `build` fails if this is still `None`.
    spider: Option<S>,
    // Request/response middlewares, applied in insertion order.
    middlewares: Vec<Box<dyn Middleware<D::Client> + Send + Sync>>,
    // Item pipelines, applied in insertion order; at least one is required.
    item_pipelines: Vec<Box<dyn Pipeline<S::Item>>>,
    // Where checkpoints are read from / written to, if enabled.
    checkpoint_path: Option<PathBuf>,
    // How often a checkpoint is persisted while crawling, if enabled.
    checkpoint_interval: Option<Duration>,
    // NOTE(review): `spider: Option<S>` already uses `S`, so this marker
    // looks redundant — confirm before removing (construction sites would
    // need updating too).
    _phantom: PhantomData<S>,
}
97
98impl<S: Spider> Default for CrawlerBuilder<S, ReqwestClientDownloader> {
99 fn default() -> Self {
100 Self {
101 crawler_config: CrawlerConfig::default(),
102 downloader: ReqwestClientDownloader::default(),
103 spider: None,
104 middlewares: Vec::new(),
105 item_pipelines: Vec::new(),
106 checkpoint_path: None,
107 checkpoint_interval: None,
108 _phantom: PhantomData,
109 }
110 }
111}
112
113impl<S: Spider> CrawlerBuilder<S, ReqwestClientDownloader> {
114 pub fn new(spider: S) -> Self {
116 Self {
117 spider: Some(spider),
118 ..Default::default()
119 }
120 }
121}
122
123impl<S: Spider, D: Downloader> CrawlerBuilder<S, D> {
124 pub fn max_concurrent_downloads(mut self, limit: usize) -> Self {
126 self.crawler_config.max_concurrent_downloads = limit;
127 self
128 }
129
130 pub fn max_parser_workers(mut self, limit: usize) -> Self {
132 self.crawler_config.parser_workers = limit;
133 self
134 }
135
136 pub fn max_concurrent_pipelines(mut self, limit: usize) -> Self {
138 self.crawler_config.max_concurrent_pipelines = limit;
139 self
140 }
141
142 pub fn channel_capacity(mut self, capacity: usize) -> Self {
144 self.crawler_config.channel_capacity = capacity;
145 self
146 }
147
148 pub fn downloader(mut self, downloader: D) -> Self {
150 self.downloader = downloader;
151 self
152 }
153
154 pub fn add_middleware<M>(mut self, middleware: M) -> Self
156 where
157 M: Middleware<D::Client> + Send + Sync + 'static,
158 {
159 self.middlewares.push(Box::new(middleware));
160 self
161 }
162
163 pub fn add_pipeline<P>(mut self, pipeline: P) -> Self
165 where
166 P: Pipeline<S::Item> + 'static,
167 {
168 self.item_pipelines.push(Box::new(pipeline));
169 self
170 }
171
172 pub fn with_checkpoint_path<P: AsRef<Path>>(mut self, path: P) -> Self {
174 self.checkpoint_path = Some(path.as_ref().to_path_buf());
175 self
176 }
177
178 pub fn with_checkpoint_interval(mut self, interval: Duration) -> Self {
180 self.checkpoint_interval = Some(interval);
181 self
182 }
183
184 #[allow(unused_variables)]
186 pub async fn build(mut self) -> Result<Crawler<S, D::Client>, SpiderError>
187 where
188 D: Downloader + Send + Sync + 'static,
189 D::Client: Send + Sync + Clone,
190 {
191 let spider = self.validate_and_get_spider()?;
192
193 if self.item_pipelines.is_empty() {
195 return Err(SpiderError::ConfigurationError(
196 "At least one pipeline must be added to the crawler.".to_string(),
197 ));
198 }
199
200 let (initial_scheduler_state, loaded_cookie_store) =
201 self.load_and_restore_checkpoint_state().await?;
202
203 let (scheduler_arc, req_rx) = Scheduler::new(initial_scheduler_state);
204
205 let downloader_arc = Arc::new(self.downloader);
206 let stats = Arc::new(StatCollector::new());
207
208 let crawler = Crawler::new(
209 scheduler_arc,
210 req_rx,
211 downloader_arc,
212 self.middlewares,
213 spider,
214 self.item_pipelines,
215 self.crawler_config.max_concurrent_downloads,
216 self.crawler_config.parser_workers,
217 self.crawler_config.max_concurrent_pipelines,
218 self.crawler_config.channel_capacity,
219 self.checkpoint_path.take(),
220 self.checkpoint_interval,
221 stats,
222 Arc::new(tokio::sync::RwLock::new(loaded_cookie_store.unwrap_or_default())),
223 );
224
225 Ok(crawler)
226 }
227
228 async fn load_and_restore_checkpoint_state(
229 &mut self,
230 ) -> Result<(Option<SchedulerCheckpoint>, Option<crate::CookieStore>), SpiderError> {
231 let mut initial_scheduler_state = None;
232 let mut loaded_pipelines_state = None;
233 let mut loaded_cookie_store = None;
234
235 if let Some(path) = &self.checkpoint_path {
236 debug!("Attempting to load checkpoint from {:?}", path);
237 match fs::read(path) {
238 Ok(bytes) => match rmp_serde::from_slice::<crate::Checkpoint>(&bytes) {
239 Ok(checkpoint) => {
240 initial_scheduler_state = Some(checkpoint.scheduler);
241 loaded_pipelines_state = Some(checkpoint.pipelines);
242
243 loaded_cookie_store = Some(checkpoint.cookie_store);
244 }
245 Err(e) => warn!("Failed to deserialize checkpoint from {:?}: {}", path, e),
246 },
247 Err(e) => warn!("Failed to read checkpoint file {:?}: {}", path, e),
248 }
249 }
250
251 if let Some(pipeline_states) = loaded_pipelines_state {
252 for (name, state) in pipeline_states {
253 if let Some(pipeline) = self.item_pipelines.iter().find(|p| p.name() == name) {
254 pipeline.restore_state(state).await?;
255 } else {
256 warn!("Checkpoint contains state for unknown pipeline: {}", name);
257 }
258 }
259 }
260
261 Ok((initial_scheduler_state, loaded_cookie_store))
262 }
263
264 fn validate_and_get_spider(&mut self) -> Result<S, SpiderError> {
265 if self.crawler_config.max_concurrent_downloads == 0 {
266 return Err(SpiderError::ConfigurationError(
267 "max_concurrent_downloads must be greater than 0.".to_string(),
268 ));
269 }
270 if self.crawler_config.parser_workers == 0 {
271 return Err(SpiderError::ConfigurationError(
272 "parser_workers must be greater than 0.".to_string(),
273 ));
274 }
275 self.spider.take().ok_or_else(|| {
276 SpiderError::ConfigurationError("Crawler must have a spider.".to_string())
277 })
278 }
279}