Skip to main content

essence/crawler/
streaming.rs

1use crate::{
2    crawler::config::{CircuitBreaker, CrawlerConfig, MemoryMonitor},
3    crawler::filter::{is_same_domain, should_crawl_url},
4    crawler::pagination::{PaginationConfig, PaginationDetector},
5    crawler::prioritizer::{PrioritizedUrl, UrlPrioritizer},
6    crawler::rate_limiter::DomainRateLimiter,
7    engines::{http::HttpEngine, ScrapeEngine},
8    error::{Result, ScrapeError},
9    format,
10    types::{CrawlEvent, CrawlRequest, Document, ScrapeRequest},
11    utils::{normalize_url_string, robots},
12};
13use scraper::{Html, Selector};
14use std::collections::{BinaryHeap, HashMap, HashSet};
15use std::sync::Arc;
16use tokio::sync::mpsc;
17use tracing::{debug, info, warn};
18use url::Url;
19
20/// Crawl a website and stream documents as they're scraped
21pub async fn crawl_website_stream(
22    request: CrawlRequest,
23    tx: mpsc::Sender<Result<CrawlEvent>>,
24) -> Result<()> {
25    info!("Starting streaming crawl from URL: {}", request.url);
26
27    // Parse and validate base URL
28    let _base_url = Url::parse(&request.url)
29        .map_err(|e| ScrapeError::InvalidUrl(format!("Invalid base URL: {}", e)))?;
30
31    // Normalize the base URL to prevent duplicates
32    let normalized_base_url = normalize_url_string(&request.url)
33        .map_err(|e| ScrapeError::InvalidUrl(format!("Failed to normalize base URL: {}", e)))?;
34
35    // Initialize crawler config with bounds
36    let config = CrawlerConfig::default();
37
38    // Initialize circuit breaker and memory monitor
39    let circuit_breaker = CircuitBreaker::new(config.circuit_breaker_threshold);
40    let memory_monitor = MemoryMonitor::new(config.max_memory_mb, config.enable_memory_monitoring);
41
42    // Initialize URL prioritizer for intelligent crawling
43    let url_prioritizer = UrlPrioritizer::new();
44
45    // Initialize data structures with capacity hints
46    let mut visited = HashSet::new();
47    let mut queue = BinaryHeap::with_capacity(config.max_queue_size.min(1000));
48    let mut url_depths: HashMap<String, u32> = HashMap::new();
49    let mut success_count = 0usize;
50    let mut error_count = 0usize;
51
52    // Add normalized base URL to priority queue
53    let base_prioritized_url = PrioritizedUrl::new(normalized_base_url.clone(), 0, &url_prioritizer);
54    queue.push(base_prioritized_url);
55    url_depths.insert(normalized_base_url, 0);
56
57    // Check robots.txt for the domain
58    let robots_cache = check_robots_txt(&request.url, request.ignore_sitemap).await;
59
60    // Create HTTP engine for scraping
61    let engine = HttpEngine::new()?;
62
63    // Initialize pagination detector with configuration
64    let pagination_config = PaginationConfig {
65        max_pages: request.max_pagination_pages.unwrap_or(50) as usize,
66        max_depth: request.max_depth as usize,
67        detect_circular: true,
68    };
69    let mut pagination_detector = PaginationDetector::new(pagination_config);
70    let detect_pagination = request.detect_pagination.unwrap_or(true);
71
72    // Create rate limiter
73    let rate_limit = std::env::var("CRAWL_RATE_LIMIT_PER_SEC")
74        .ok()
75        .and_then(|v| v.parse().ok())
76        .unwrap_or(2);
77    let rate_limiter = Arc::new(DomainRateLimiter::new(rate_limit));
78
79    info!(
80        "Rate limiting enabled: {} requests/second per domain",
81        rate_limit
82    );
83
84    // Priority-based crawl (BFS with intelligent ordering)
85    while let Some(prioritized_url) = queue.pop() {
86        let current_url = prioritized_url.url;
87        let current_depth = prioritized_url.depth;
88
89        // Check memory limit periodically (every 10 iterations)
90        if (success_count + error_count).is_multiple_of(10) {
91            if let Err(e) = memory_monitor.check_memory_limit() {
92                warn!("Memory limit check failed: {}", e);
93                let error_msg = e.to_string();
94                let _ = tx.send(Ok(CrawlEvent::Error {
95                    url: current_url.clone(),
96                    error: error_msg.clone(),
97                })).await;
98                return Err(ScrapeError::ResourceLimit(error_msg));
99            }
100        }
101
102        // Check if we've reached the document limit
103        if success_count >= request.limit as usize {
104            info!("Reached crawl limit of {} pages", request.limit);
105            break;
106        }
107
108        // Skip if already visited
109        if visited.contains(&current_url) {
110            continue;
111        }
112
113        // Extract domain for circuit breaker
114        let domain = match Url::parse(&current_url) {
115            Ok(parsed) => parsed.host_str().unwrap_or("unknown").to_string(),
116            Err(_) => "unknown".to_string(),
117        };
118
119        // Check circuit breaker
120        if config.enable_circuit_breaker && circuit_breaker.should_skip(&domain) {
121            warn!(
122                "Circuit breaker: Skipping domain {} due to excessive failures ({})",
123                domain,
124                circuit_breaker.get_failure_count(&domain)
125            );
126            visited.insert(current_url.clone());
127            continue;
128        }
129
130        // Skip if depth exceeds max_depth
131        if current_depth > request.max_depth {
132            debug!("Skipping URL due to depth limit: {}", current_url);
133            continue;
134        }
135
136        // Check robots.txt
137        if !robots_cache {
138            match robots::is_allowed_default(&current_url).await {
139                Ok(allowed) => {
140                    if !allowed {
141                        warn!("Robots.txt disallows crawling: {}", current_url);
142                        visited.insert(current_url.clone());
143                        continue;
144                    }
145                }
146                Err(e) => {
147                    warn!("Failed to check robots.txt for {}: {}", current_url, e);
148                }
149            }
150        }
151
152        // Check if URL should be crawled based on filters
153        if !should_crawl_url(&current_url, &request.include_paths, &request.exclude_paths) {
154            debug!(
155                "URL filtered out by include/exclude patterns: {}",
156                current_url
157            );
158            visited.insert(current_url.clone());
159            continue;
160        }
161
162        // Mark as visited (URL is already normalized from queue)
163        visited.insert(current_url.clone());
164
165        // Apply rate limiting before scraping
166        if let Err(e) = rate_limiter.wait_for_permission(&current_url).await {
167            warn!("Rate limiter error for {}: {}", current_url, e);
168            continue;
169        }
170
171        // Send status update
172        let status_event = CrawlEvent::Status {
173            pages_crawled: success_count,
174            queue_size: queue.len(),
175            current_url: Some(current_url.clone()),
176        };
177        if tx.send(Ok(status_event)).await.is_err() {
178            // Client disconnected
179            info!("Client disconnected, stopping crawl");
180            break;
181        }
182
183        // Scrape the URL
184        info!(
185            "Crawling URL: {} (depth: {}, total: {})",
186            current_url,
187            current_depth,
188            success_count
189        );
190
191        match scrape_url(&current_url, &engine).await {
192            Ok((document, links, html)) => {
193                // Record success in circuit breaker
194                if config.enable_circuit_breaker {
195                    circuit_breaker.record_success(&domain);
196                }
197
198                success_count += 1;
199
200                // Send document event immediately
201                let doc_event = CrawlEvent::Document {
202                    url: document.url.clone().unwrap_or_else(|| current_url.clone()),
203                    title: document.title.clone(),
204                    markdown: document.markdown.clone(),
205                    metadata: Box::new(document.metadata.clone()),
206                };
207
208                if tx.send(Ok(doc_event)).await.is_err() {
209                    // Client disconnected
210                    info!("Client disconnected, stopping crawl");
211                    break;
212                }
213
214                // Process discovered links if we haven't reached max depth
215                if current_depth < request.max_depth {
216                    // First, detect pagination links if enabled
217                    let pagination_links = if detect_pagination {
218                        pagination_detector.detect_pagination(&html, &current_url)
219                    } else {
220                        Vec::new()
221                    };
222
223                    // Add pagination links with priority (same depth as current)
224                    for link in &pagination_links {
225                        let normalized_link = match normalize_url_string(link) {
226                            Ok(url) => url,
227                            Err(e) => {
228                                debug!("Failed to normalize pagination link {}: {}", link, e);
229                                continue;
230                            }
231                        };
232
233                        // Skip if already visited or queued
234                        if visited.contains(&normalized_link)
235                            || url_depths.contains_key(&normalized_link)
236                        {
237                            continue;
238                        }
239
240                        // Check domain restrictions
241                        if is_same_domain(&normalized_link, &request.url) {
242                            // Check queue size limit before adding
243                            if queue.len() >= config.max_queue_size {
244                                warn!(
245                                    "Queue limit reached ({}/{}), skipping pagination link: {}",
246                                    queue.len(),
247                                    config.max_queue_size,
248                                    link
249                                );
250                                continue;
251                            }
252
253                            // Add pagination links with high priority (same depth as current)
254                            let prioritized_link = PrioritizedUrl::new(
255                                normalized_link.clone(),
256                                current_depth,
257                                &url_prioritizer,
258                            );
259                            queue.push(prioritized_link);
260                            url_depths.insert(normalized_link, current_depth);
261                            debug!("Added pagination link to queue: {}", link);
262                        }
263                    }
264
265                    // Then process regular links
266                    for link in links {
267                        // Normalize the link to prevent duplicates
268                        let normalized_link = match normalize_url_string(&link) {
269                            Ok(url) => url,
270                            Err(e) => {
271                                debug!("Failed to normalize link {}: {}", link, e);
272                                continue;
273                            }
274                        };
275
276                        // Skip if already visited or queued
277                        if visited.contains(&normalized_link)
278                            || url_depths.contains_key(&normalized_link)
279                        {
280                            continue;
281                        }
282
283                        // Skip if this is a pagination link (already processed)
284                        if pagination_links.contains(&link) || pagination_links.contains(&normalized_link) {
285                            continue;
286                        }
287
288                        // Check domain restrictions
289                        let allow_link = if request.allow_external_links.unwrap_or(false) {
290                            true
291                        } else if request.allow_backward_links.unwrap_or(false) {
292                            // Allow backward links means crawl entire domain
293                            is_same_domain(&normalized_link, &request.url)
294                        } else {
295                            // Only allow links that are "forward" (same path or deeper)
296                            is_same_domain(&normalized_link, &request.url)
297                                && is_forward_link(&normalized_link, &current_url)
298                        };
299
300                        if allow_link {
301                            // Check queue size limit before adding
302                            if queue.len() >= config.max_queue_size {
303                                debug!(
304                                    "Queue limit reached ({}/{}), skipping link: {}",
305                                    queue.len(),
306                                    config.max_queue_size,
307                                    link
308                                );
309                                continue;
310                            }
311
312                            // Apply backpressure: when queue is at threshold, skip secondary links
313                            let backpressure_limit =
314                                (config.max_queue_size * config.backpressure_threshold as usize) / 100;
315                            if queue.len() >= backpressure_limit {
316                                debug!(
317                                    "Queue at backpressure threshold ({}/{}), slowing link discovery",
318                                    queue.len(),
319                                    config.max_queue_size
320                                );
321                                // Only add if this is a direct child (depth + 1)
322                                // Skip secondary/deeper links
323                                if current_depth + 1 < request.max_depth {
324                                    let prioritized_link = PrioritizedUrl::new(
325                                        normalized_link.clone(),
326                                        current_depth + 1,
327                                        &url_prioritizer,
328                                    );
329                                    queue.push(prioritized_link);
330                                    url_depths.insert(normalized_link, current_depth + 1);
331                                }
332                            } else {
333                                let prioritized_link = PrioritizedUrl::new(
334                                    normalized_link.clone(),
335                                    current_depth + 1,
336                                    &url_prioritizer,
337                                );
338                                queue.push(prioritized_link);
339                                url_depths.insert(normalized_link, current_depth + 1);
340                            }
341                        }
342                    }
343                }
344            }
345            Err(e) => {
346                warn!("Failed to scrape {}: {}", current_url, e);
347
348                // Record failure in circuit breaker
349                if config.enable_circuit_breaker {
350                    circuit_breaker.record_failure(&domain);
351                }
352
353                error_count += 1;
354
355                // Send error event
356                let error_event = CrawlEvent::Error {
357                    url: current_url.clone(),
358                    error: e.to_string(),
359                };
360
361                if tx.send(Ok(error_event)).await.is_err() {
362                    // Client disconnected
363                    info!("Client disconnected, stopping crawl");
364                    break;
365                }
366            }
367        }
368    }
369
370    // Send final completion event
371    let complete_event = CrawlEvent::Complete {
372        total_pages: success_count + error_count,
373        success: success_count,
374        errors: error_count,
375    };
376
377    let _ = tx.send(Ok(complete_event)).await;
378
379    // Log final stats
380    info!(
381        "Crawl stats - Queue size: {}, Circuit breaker failures: {}, Memory: {:.2}%",
382        queue.len(),
383        circuit_breaker.get_total_failures(),
384        memory_monitor.get_memory_percentage()
385    );
386
387    info!(
388        "Streaming crawl completed. Total pages crawled: {}, errors: {}",
389        success_count, error_count
390    );
391
392    Ok(())
393}
394
395/// Check if robots.txt allows crawling
396async fn check_robots_txt(url: &str, ignore_sitemap: Option<bool>) -> bool {
397    if ignore_sitemap.unwrap_or(false) {
398        return true;
399    }
400
401    match robots::is_allowed_default(url).await {
402        Ok(allowed) => allowed,
403        Err(e) => {
404            warn!("Failed to check robots.txt: {}, allowing by default", e);
405            true
406        }
407    }
408}
409
410/// Scrape a single URL and extract links
411async fn scrape_url(url: &str, engine: &HttpEngine) -> Result<(Document, Vec<String>, String)> {
412    // Create a scrape request
413    let scrape_request = ScrapeRequest {
414        url: url.to_string(),
415        formats: vec!["markdown".to_string(), "links".to_string()],
416        headers: HashMap::new(),
417        include_tags: vec![],
418        exclude_tags: vec![],
419        only_main_content: true,
420        timeout: 30000,
421        wait_for: 0,
422        remove_base64_images: true,
423        skip_tls_verification: false,
424        engine: "http".to_string(),
425        wait_for_selector: None,
426        actions: vec![],
427        screenshot: false,
428        screenshot_format: "png".to_string(),
429    };
430
431    // Scrape the URL
432    let raw_result = engine.scrape(&scrape_request).await?;
433
434    // Extract links from HTML
435    let links = extract_links(&raw_result.html, url)?;
436
437    // Store HTML for pagination detection
438    let html = raw_result.html.clone();
439
440    // Process the result into a document
441    let document = format::process_scrape_result(raw_result, &scrape_request).await?;
442
443    Ok((document, links, html))
444}
445
446/// Extract all links from HTML
447fn extract_links(html: &str, base_url: &str) -> Result<Vec<String>> {
448    let document = Html::parse_document(html);
449    let selector = Selector::parse("a[href]")
450        .map_err(|e| ScrapeError::Internal(format!("Failed to create link selector: {:?}", e)))?;
451
452    let base = Url::parse(base_url)
453        .map_err(|e| ScrapeError::InvalidUrl(format!("Invalid base URL: {}", e)))?;
454
455    let mut links = Vec::new();
456
457    for element in document.select(&selector) {
458        if let Some(href) = element.value().attr("href") {
459            // Skip javascript:, mailto:, tel:, etc.
460            if href.starts_with("javascript:")
461                || href.starts_with("mailto:")
462                || href.starts_with("tel:")
463                || href.starts_with('#')
464            {
465                continue;
466            }
467
468            // Parse and resolve the URL
469            match base.join(href) {
470                Ok(absolute_url) => {
471                    let url_str = absolute_url.to_string();
472                    // Remove fragment
473                    let url_without_fragment = url_str.split('#').next().unwrap_or(&url_str);
474                    links.push(url_without_fragment.to_string());
475                }
476                Err(_) => {
477                    // Skip invalid URLs
478                    continue;
479                }
480            }
481        }
482    }
483
484    // Deduplicate
485    links.sort();
486    links.dedup();
487
488    Ok(links)
489}
490
491/// Check if a link is "forward" (same path or deeper)
492fn is_forward_link(link: &str, current: &str) -> bool {
493    let link_parsed = match Url::parse(link) {
494        Ok(u) => u,
495        Err(_) => return false,
496    };
497
498    let current_parsed = match Url::parse(current) {
499        Ok(u) => u,
500        Err(_) => return false,
501    };
502
503    let link_path = link_parsed.path();
504    let current_path = current_parsed.path();
505
506    // A link is forward if:
507    // 1. It has the same path as current, or
508    // 2. It's a subpath of current (starts with current path)
509    link_path == current_path || link_path.starts_with(current_path)
510}