1use std::path::{Path, PathBuf};
26use std::time::Duration;
27
/// Tunable limits and switches for a crawl run.
///
/// The `Default` implementation sizes most limits from the detected CPU
/// count, the `with_*` methods override single fields, and `validate`
/// performs zero/empty sanity checks.
#[derive(Debug, Clone)]
pub struct CrawlerConfig {
    /// Limit on downloads in flight at once; `validate` rejects `0`.
    pub max_concurrent_downloads: usize,
    /// Limit on pending requests; `validate` rejects `0`.
    pub max_pending_requests: usize,
    /// Number of parser workers; `validate` rejects `0`.
    pub parser_workers: usize,
    /// Limit on pipelines running at once; `validate` rejects `0`.
    pub max_concurrent_pipelines: usize,
    /// Capacity used for channels.
    // NOTE(review): which channels are sized from this value is not visible
    // in this file — confirm against the code that reads it.
    pub channel_capacity: usize,
    /// Size of one output batch; `validate` rejects `0`.
    pub output_batch_size: usize,
    /// Backpressure threshold for responses; `validate` rejects `0`.
    pub response_backpressure_threshold: usize,
    /// Backpressure threshold for items; `validate` rejects `0`.
    pub item_backpressure_threshold: usize,
    /// Retry-related switch.
    // NOTE(review): presumably decides whether a permit is released while a
    // request waits for a retry — confirm against the code that reads it.
    pub retry_release_permit: bool,
    /// Switch for live statistics.
    pub live_stats: bool,
    /// Interval used for live statistics; `validate` rejects a zero duration.
    pub live_stats_interval: Duration,
    /// Optional field names for the live-stats preview; `validate` rejects
    /// `Some` holding an empty list.
    pub live_stats_preview_fields: Option<Vec<String>>,
    /// Grace period used at shutdown; `validate` rejects a zero duration.
    pub shutdown_grace_period: Duration,
    /// Optional item limit; `validate` rejects `Some(0)`.
    // NOTE(review): presumably the crawl stops after this many items — confirm.
    pub item_limit: Option<usize>,
}
60
61impl Default for CrawlerConfig {
62 fn default() -> Self {
63 let cpu_count = num_cpus::get();
64 let max_concurrent_downloads = (cpu_count * 4).clamp(8, 128);
65 let max_pending_requests = (max_concurrent_downloads * 8).clamp(64, 4096);
66 let parser_workers = (cpu_count * 2).clamp(4, 32);
67 let max_concurrent_pipelines = (cpu_count * 2).clamp(4, 16);
68 let channel_capacity = (max_pending_requests / 2).clamp(512, 4096);
69 CrawlerConfig {
70 max_concurrent_downloads,
71 max_pending_requests,
72 parser_workers,
73 max_concurrent_pipelines,
74 channel_capacity,
75 output_batch_size: 64,
76 response_backpressure_threshold: (max_concurrent_downloads * 6).min(channel_capacity),
77 item_backpressure_threshold: (parser_workers * 6).min(channel_capacity),
78 retry_release_permit: true,
79 live_stats: false,
80 live_stats_interval: Duration::from_millis(50),
81 live_stats_preview_fields: None,
82 shutdown_grace_period: Duration::from_secs(5),
83 item_limit: None,
84 }
85 }
86}
87
88impl CrawlerConfig {
89 pub fn new() -> Self {
91 Self::default()
92 }
93
94 pub fn with_max_concurrent_downloads(mut self, limit: usize) -> Self {
96 self.max_concurrent_downloads = limit;
97 self
98 }
99
100 pub fn with_max_pending_requests(mut self, limit: usize) -> Self {
102 self.max_pending_requests = limit;
103 self
104 }
105
106 pub fn with_parser_workers(mut self, count: usize) -> Self {
108 self.parser_workers = count;
109 self
110 }
111
112 pub fn with_max_concurrent_pipelines(mut self, limit: usize) -> Self {
114 self.max_concurrent_pipelines = limit;
115 self
116 }
117
118 pub fn with_channel_capacity(mut self, capacity: usize) -> Self {
120 self.channel_capacity = capacity;
121 self
122 }
123
124 pub fn with_output_batch_size(mut self, batch_size: usize) -> Self {
126 self.output_batch_size = batch_size;
127 self
128 }
129
130 pub fn with_response_backpressure_threshold(mut self, threshold: usize) -> Self {
132 self.response_backpressure_threshold = threshold;
133 self
134 }
135
136 pub fn with_item_backpressure_threshold(mut self, threshold: usize) -> Self {
138 self.item_backpressure_threshold = threshold;
139 self
140 }
141
142 pub fn with_retry_release_permit(mut self, enabled: bool) -> Self {
144 self.retry_release_permit = enabled;
145 self
146 }
147
148 pub fn with_live_stats(mut self, enabled: bool) -> Self {
150 self.live_stats = enabled;
151 self
152 }
153
154 pub fn with_live_stats_interval(mut self, interval: Duration) -> Self {
156 self.live_stats_interval = interval;
157 self
158 }
159
160 pub fn with_live_stats_preview_fields(
168 mut self,
169 fields: impl IntoIterator<Item = impl Into<String>>,
170 ) -> Self {
171 self.live_stats_preview_fields = Some(fields.into_iter().map(Into::into).collect());
172 self
173 }
174
175 pub fn with_shutdown_grace_period(mut self, grace_period: Duration) -> Self {
177 self.shutdown_grace_period = grace_period;
178 self
179 }
180
181 pub fn with_item_limit(mut self, limit: usize) -> Self {
183 self.item_limit = Some(limit);
184 self
185 }
186
187 pub fn validate(&self) -> Result<(), String> {
189 if self.max_concurrent_downloads == 0 {
190 return Err("max_concurrent_downloads must be greater than 0".to_string());
191 }
192 if self.max_pending_requests == 0 {
193 return Err("max_pending_requests must be greater than 0".to_string());
194 }
195 if self.parser_workers == 0 {
196 return Err("parser_workers must be greater than 0".to_string());
197 }
198 if self.max_concurrent_pipelines == 0 {
199 return Err("max_concurrent_pipelines must be greater than 0".to_string());
200 }
201 if self.output_batch_size == 0 {
202 return Err("output_batch_size must be greater than 0".to_string());
203 }
204 if self.response_backpressure_threshold == 0 {
205 return Err("response_backpressure_threshold must be greater than 0".to_string());
206 }
207 if self.item_backpressure_threshold == 0 {
208 return Err("item_backpressure_threshold must be greater than 0".to_string());
209 }
210 if self.live_stats_interval.is_zero() {
211 return Err("live_stats_interval must be greater than 0".to_string());
212 }
213 if matches!(self.live_stats_preview_fields.as_ref(), Some(fields) if fields.is_empty()) {
214 return Err("live_stats_preview_fields must not be empty".to_string());
215 }
216 if self.shutdown_grace_period.is_zero() {
217 return Err("shutdown_grace_period must be greater than 0".to_string());
218 }
219 if matches!(self.item_limit, Some(0)) {
220 return Err("item_limit must be greater than 0".to_string());
221 }
222 Ok(())
223 }
224}
225
/// Checkpoint settings.
///
/// The derived `Default` leaves both fields as `None`.
#[derive(Debug, Clone, Default)]
pub struct CheckpointConfig {
    /// Checkpoint path; `is_enabled` reports `true` exactly when this is `Some`.
    pub path: Option<PathBuf>,
    /// Optional checkpoint interval.
    // NOTE(review): presumably the time between checkpoint writes — confirm
    // against the code that reads it.
    pub interval: Option<Duration>,
}
237
238impl CheckpointConfig {
239 pub fn new() -> Self {
241 Self::default()
242 }
243
244 pub fn builder() -> CheckpointConfigBuilder {
246 CheckpointConfigBuilder::default()
247 }
248
249 pub fn with_path<P: AsRef<Path>>(mut self, path: P) -> Self {
251 self.path = Some(path.as_ref().to_path_buf());
252 self
253 }
254
255 pub fn with_interval(mut self, interval: Duration) -> Self {
257 self.interval = Some(interval);
258 self
259 }
260
261 pub fn is_enabled(&self) -> bool {
263 self.path.is_some()
264 }
265}
266
/// Builder that accumulates the fields of a `CheckpointConfig`.
///
/// Both fields start as `None` through the derived `Default`.
#[derive(Debug, Default)]
pub struct CheckpointConfigBuilder {
    /// Path to hand over to the built configuration, if one was supplied.
    path: Option<PathBuf>,
    /// Interval to hand over to the built configuration, if one was supplied.
    interval: Option<Duration>,
}
273
274impl CheckpointConfigBuilder {
275 pub fn new() -> Self {
277 Self::default()
278 }
279
280 pub fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
282 self.path = Some(path.as_ref().to_path_buf());
283 self
284 }
285
286 pub fn interval(mut self, interval: Duration) -> Self {
288 self.interval = Some(interval);
289 self
290 }
291
292 pub fn build(self) -> CheckpointConfig {
294 CheckpointConfig {
295 path: self.path,
296 interval: self.interval,
297 }
298 }
299}
300
/// Parser settings.
#[derive(Debug, Clone)]
pub struct ParserConfig {
    /// Number of parser workers.
    pub worker_count: usize,
    /// Capacity of the parser queue.
    // NOTE(review): which queue this sizes is not visible in this file — confirm.
    pub queue_capacity: usize,
}
311
312impl Default for ParserConfig {
313 fn default() -> Self {
314 ParserConfig {
315 worker_count: num_cpus::get().clamp(4, 16),
316 queue_capacity: 100,
317 }
318 }
319}
320
321impl ParserConfig {
322 pub fn new() -> Self {
324 Self::default()
325 }
326
327 pub fn with_worker_count(mut self, count: usize) -> Self {
329 self.worker_count = count;
330 self
331 }
332
333 pub fn with_queue_capacity(mut self, capacity: usize) -> Self {
335 self.queue_capacity = capacity;
336 self
337 }
338}
339
/// Downloader settings.
#[derive(Debug, Clone)]
pub struct DownloaderConfig {
    /// Limit on concurrent downloads.
    pub max_concurrent: usize,
    /// Backpressure threshold; the `Default` implementation sets it to
    /// twice `max_concurrent`.
    pub backpressure_threshold: usize,
}
350
351impl Default for DownloaderConfig {
352 fn default() -> Self {
353 let max_concurrent = num_cpus::get().max(16);
354 DownloaderConfig {
355 max_concurrent,
356 backpressure_threshold: max_concurrent * 2,
357 }
358 }
359}
360
361impl DownloaderConfig {
362 pub fn new() -> Self {
364 Self::default()
365 }
366
367 pub fn with_max_concurrent(mut self, limit: usize) -> Self {
369 self.max_concurrent = limit;
370 self
371 }
372
373 pub fn with_backpressure_threshold(mut self, threshold: usize) -> Self {
375 self.backpressure_threshold = threshold;
376 self
377 }
378}
379
/// Item-processor settings.
#[derive(Debug, Clone)]
pub struct ItemProcessorConfig {
    /// Limit on concurrent item processing.
    pub max_concurrent: usize,
}
388
389impl Default for ItemProcessorConfig {
390 fn default() -> Self {
391 ItemProcessorConfig {
392 max_concurrent: num_cpus::get().min(8),
393 }
394 }
395}
396
397impl ItemProcessorConfig {
398 pub fn new() -> Self {
400 Self::default()
401 }
402
403 pub fn with_max_concurrent(mut self, limit: usize) -> Self {
405 self.max_concurrent = limit;
406 self
407 }
408}