1use crate::{
2 crawler::config::{CircuitBreaker, CrawlerConfig, MemoryMonitor},
3 crawler::filter::{is_same_domain, should_crawl_url},
4 crawler::pagination::{PaginationConfig, PaginationDetector},
5 crawler::prioritizer::{PrioritizedUrl, UrlPrioritizer},
6 crawler::rate_limiter::DomainRateLimiter,
7 engines::{http::HttpEngine, ScrapeEngine},
8 error::{Result, ScrapeError},
9 format,
10 types::{CrawlEvent, CrawlRequest, Document, ScrapeRequest},
11 utils::{normalize_url_string, robots},
12};
13use scraper::{Html, Selector};
14use std::collections::{BinaryHeap, HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18use url::Url;
19
20pub async fn crawl_website_stream(
22 request: CrawlRequest,
23 tx: mpsc::Sender<Result<CrawlEvent>>,
24) -> Result<()> {
25 info!("Starting streaming crawl from URL: {}", request.url);
26
27 let _base_url = Url::parse(&request.url)
29 .map_err(|e| ScrapeError::InvalidUrl(format!("Invalid base URL: {}", e)))?;
30
31 let normalized_base_url = normalize_url_string(&request.url)
33 .map_err(|e| ScrapeError::InvalidUrl(format!("Failed to normalize base URL: {}", e)))?;
34
35 let config = CrawlerConfig::default();
37
38 let circuit_breaker = CircuitBreaker::new(config.circuit_breaker_threshold);
40 let memory_monitor = MemoryMonitor::new(config.max_memory_mb, config.enable_memory_monitoring);
41
42 let url_prioritizer = UrlPrioritizer::new();
44
45 let mut visited = HashSet::new();
47 let mut queue = BinaryHeap::with_capacity(config.max_queue_size.min(1000));
48 let mut url_depths: HashMap<String, u32> = HashMap::new();
49 let mut success_count = 0usize;
50 let mut error_count = 0usize;
51
52 let base_prioritized_url = PrioritizedUrl::new(normalized_base_url.clone(), 0, &url_prioritizer);
54 queue.push(base_prioritized_url);
55 url_depths.insert(normalized_base_url, 0);
56
57 let robots_cache = check_robots_txt(&request.url, request.ignore_sitemap).await;
59
60 let engine = HttpEngine::new()?;
62
63 let pagination_config = PaginationConfig {
65 max_pages: request.max_pagination_pages.unwrap_or(50) as usize,
66 max_depth: request.max_depth as usize,
67 detect_circular: true,
68 };
69 let mut pagination_detector = PaginationDetector::new(pagination_config);
70 let detect_pagination = request.detect_pagination.unwrap_or(true);
71
72 let rate_limit = std::env::var("CRAWL_RATE_LIMIT_PER_SEC")
74 .ok()
75 .and_then(|v| v.parse().ok())
76 .unwrap_or(2);
77 let rate_limiter = Arc::new(DomainRateLimiter::new(rate_limit));
78
79 info!(
80 "Rate limiting enabled: {} requests/second per domain",
81 rate_limit
82 );
83
84 while let Some(prioritized_url) = queue.pop() {
86 let current_url = prioritized_url.url;
87 let current_depth = prioritized_url.depth;
88
89 if (success_count + error_count).is_multiple_of(10) {
91 if let Err(e) = memory_monitor.check_memory_limit() {
92 warn!("Memory limit check failed: {}", e);
93 let error_msg = e.to_string();
94 let _ = tx.send(Ok(CrawlEvent::Error {
95 url: current_url.clone(),
96 error: error_msg.clone(),
97 })).await;
98 return Err(ScrapeError::ResourceLimit(error_msg));
99 }
100 }
101
102 if success_count >= request.limit as usize {
104 info!("Reached crawl limit of {} pages", request.limit);
105 break;
106 }
107
108 if visited.contains(¤t_url) {
110 continue;
111 }
112
113 let domain = match Url::parse(¤t_url) {
115 Ok(parsed) => parsed.host_str().unwrap_or("unknown").to_string(),
116 Err(_) => "unknown".to_string(),
117 };
118
119 if config.enable_circuit_breaker && circuit_breaker.should_skip(&domain) {
121 warn!(
122 "Circuit breaker: Skipping domain {} due to excessive failures ({})",
123 domain,
124 circuit_breaker.get_failure_count(&domain)
125 );
126 visited.insert(current_url.clone());
127 continue;
128 }
129
130 if current_depth > request.max_depth {
132 debug!("Skipping URL due to depth limit: {}", current_url);
133 continue;
134 }
135
136 if !robots_cache {
138 match robots::is_allowed_default(¤t_url).await {
139 Ok(allowed) => {
140 if !allowed {
141 warn!("Robots.txt disallows crawling: {}", current_url);
142 visited.insert(current_url.clone());
143 continue;
144 }
145 }
146 Err(e) => {
147 warn!("Failed to check robots.txt for {}: {}", current_url, e);
148 }
149 }
150 }
151
152 if !should_crawl_url(¤t_url, &request.include_paths, &request.exclude_paths) {
154 debug!(
155 "URL filtered out by include/exclude patterns: {}",
156 current_url
157 );
158 visited.insert(current_url.clone());
159 continue;
160 }
161
162 visited.insert(current_url.clone());
164
165 if let Err(e) = rate_limiter.wait_for_permission(¤t_url).await {
167 warn!("Rate limiter error for {}: {}", current_url, e);
168 continue;
169 }
170
171 let status_event = CrawlEvent::Status {
173 pages_crawled: success_count,
174 queue_size: queue.len(),
175 current_url: Some(current_url.clone()),
176 };
177 if tx.send(Ok(status_event)).await.is_err() {
178 info!("Client disconnected, stopping crawl");
180 break;
181 }
182
183 info!(
185 "Crawling URL: {} (depth: {}, total: {})",
186 current_url,
187 current_depth,
188 success_count
189 );
190
191 match scrape_url(¤t_url, &engine).await {
192 Ok((document, links, html)) => {
193 if config.enable_circuit_breaker {
195 circuit_breaker.record_success(&domain);
196 }
197
198 success_count += 1;
199
200 let doc_event = CrawlEvent::Document {
202 url: document.url.clone().unwrap_or_else(|| current_url.clone()),
203 title: document.title.clone(),
204 markdown: document.markdown.clone(),
205 metadata: Box::new(document.metadata.clone()),
206 };
207
208 if tx.send(Ok(doc_event)).await.is_err() {
209 info!("Client disconnected, stopping crawl");
211 break;
212 }
213
214 if current_depth < request.max_depth {
216 let pagination_links = if detect_pagination {
218 pagination_detector.detect_pagination(&html, ¤t_url)
219 } else {
220 Vec::new()
221 };
222
223 for link in &pagination_links {
225 let normalized_link = match normalize_url_string(link) {
226 Ok(url) => url,
227 Err(e) => {
228 debug!("Failed to normalize pagination link {}: {}", link, e);
229 continue;
230 }
231 };
232
233 if visited.contains(&normalized_link)
235 || url_depths.contains_key(&normalized_link)
236 {
237 continue;
238 }
239
240 if is_same_domain(&normalized_link, &request.url) {
242 if queue.len() >= config.max_queue_size {
244 warn!(
245 "Queue limit reached ({}/{}), skipping pagination link: {}",
246 queue.len(),
247 config.max_queue_size,
248 link
249 );
250 continue;
251 }
252
253 let prioritized_link = PrioritizedUrl::new(
255 normalized_link.clone(),
256 current_depth,
257 &url_prioritizer,
258 );
259 queue.push(prioritized_link);
260 url_depths.insert(normalized_link, current_depth);
261 debug!("Added pagination link to queue: {}", link);
262 }
263 }
264
265 for link in links {
267 let normalized_link = match normalize_url_string(&link) {
269 Ok(url) => url,
270 Err(e) => {
271 debug!("Failed to normalize link {}: {}", link, e);
272 continue;
273 }
274 };
275
276 if visited.contains(&normalized_link)
278 || url_depths.contains_key(&normalized_link)
279 {
280 continue;
281 }
282
283 if pagination_links.contains(&link) || pagination_links.contains(&normalized_link) {
285 continue;
286 }
287
288 let allow_link = if request.allow_external_links.unwrap_or(false) {
290 true
291 } else if request.allow_backward_links.unwrap_or(false) {
292 is_same_domain(&normalized_link, &request.url)
294 } else {
295 is_same_domain(&normalized_link, &request.url)
297 && is_forward_link(&normalized_link, ¤t_url)
298 };
299
300 if allow_link {
301 if queue.len() >= config.max_queue_size {
303 debug!(
304 "Queue limit reached ({}/{}), skipping link: {}",
305 queue.len(),
306 config.max_queue_size,
307 link
308 );
309 continue;
310 }
311
312 let backpressure_limit =
314 (config.max_queue_size * config.backpressure_threshold as usize) / 100;
315 if queue.len() >= backpressure_limit {
316 debug!(
317 "Queue at backpressure threshold ({}/{}), slowing link discovery",
318 queue.len(),
319 config.max_queue_size
320 );
321 if current_depth + 1 < request.max_depth {
324 let prioritized_link = PrioritizedUrl::new(
325 normalized_link.clone(),
326 current_depth + 1,
327 &url_prioritizer,
328 );
329 queue.push(prioritized_link);
330 url_depths.insert(normalized_link, current_depth + 1);
331 }
332 } else {
333 let prioritized_link = PrioritizedUrl::new(
334 normalized_link.clone(),
335 current_depth + 1,
336 &url_prioritizer,
337 );
338 queue.push(prioritized_link);
339 url_depths.insert(normalized_link, current_depth + 1);
340 }
341 }
342 }
343 }
344 }
345 Err(e) => {
346 warn!("Failed to scrape {}: {}", current_url, e);
347
348 if config.enable_circuit_breaker {
350 circuit_breaker.record_failure(&domain);
351 }
352
353 error_count += 1;
354
355 let error_event = CrawlEvent::Error {
357 url: current_url.clone(),
358 error: e.to_string(),
359 };
360
361 if tx.send(Ok(error_event)).await.is_err() {
362 info!("Client disconnected, stopping crawl");
364 break;
365 }
366 }
367 }
368 }
369
370 let complete_event = CrawlEvent::Complete {
372 total_pages: success_count + error_count,
373 success: success_count,
374 errors: error_count,
375 };
376
377 let _ = tx.send(Ok(complete_event)).await;
378
379 info!(
381 "Crawl stats - Queue size: {}, Circuit breaker failures: {}, Memory: {:.2}%",
382 queue.len(),
383 circuit_breaker.get_total_failures(),
384 memory_monitor.get_memory_percentage()
385 );
386
387 info!(
388 "Streaming crawl completed. Total pages crawled: {}, errors: {}",
389 success_count, error_count
390 );
391
392 Ok(())
393}
394
395async fn check_robots_txt(url: &str, ignore_sitemap: Option<bool>) -> bool {
397 if ignore_sitemap.unwrap_or(false) {
398 return true;
399 }
400
401 match robots::is_allowed_default(url).await {
402 Ok(allowed) => allowed,
403 Err(e) => {
404 warn!("Failed to check robots.txt: {}, allowing by default", e);
405 true
406 }
407 }
408}
409
410async fn scrape_url(url: &str, engine: &HttpEngine) -> Result<(Document, Vec<String>, String)> {
412 let scrape_request = ScrapeRequest {
414 url: url.to_string(),
415 formats: vec!["markdown".to_string(), "links".to_string()],
416 headers: HashMap::new(),
417 include_tags: vec![],
418 exclude_tags: vec![],
419 only_main_content: true,
420 timeout: 30000,
421 wait_for: 0,
422 remove_base64_images: true,
423 skip_tls_verification: false,
424 engine: "http".to_string(),
425 wait_for_selector: None,
426 actions: vec![],
427 screenshot: false,
428 screenshot_format: "png".to_string(),
429 };
430
431 let raw_result = engine.scrape(&scrape_request).await?;
433
434 let links = extract_links(&raw_result.html, url)?;
436
437 let html = raw_result.html.clone();
439
440 let document = format::process_scrape_result(raw_result, &scrape_request).await?;
442
443 Ok((document, links, html))
444}
445
446fn extract_links(html: &str, base_url: &str) -> Result<Vec<String>> {
448 let document = Html::parse_document(html);
449 let selector = Selector::parse("a[href]")
450 .map_err(|e| ScrapeError::Internal(format!("Failed to create link selector: {:?}", e)))?;
451
452 let base = Url::parse(base_url)
453 .map_err(|e| ScrapeError::InvalidUrl(format!("Invalid base URL: {}", e)))?;
454
455 let mut links = Vec::new();
456
457 for element in document.select(&selector) {
458 if let Some(href) = element.value().attr("href") {
459 if href.starts_with("javascript:")
461 || href.starts_with("mailto:")
462 || href.starts_with("tel:")
463 || href.starts_with('#')
464 {
465 continue;
466 }
467
468 match base.join(href) {
470 Ok(absolute_url) => {
471 let url_str = absolute_url.to_string();
472 let url_without_fragment = url_str.split('#').next().unwrap_or(&url_str);
474 links.push(url_without_fragment.to_string());
475 }
476 Err(_) => {
477 continue;
479 }
480 }
481 }
482 }
483
484 links.sort();
486 links.dedup();
487
488 Ok(links)
489}
490
491fn is_forward_link(link: &str, current: &str) -> bool {
493 let link_parsed = match Url::parse(link) {
494 Ok(u) => u,
495 Err(_) => return false,
496 };
497
498 let current_parsed = match Url::parse(current) {
499 Ok(u) => u,
500 Err(_) => return false,
501 };
502
503 let link_path = link_parsed.path();
504 let current_path = current_parsed.path();
505
506 link_path == current_path || link_path.starts_with(current_path)
510}