//! omnivore_core/crawler/mod.rs

1pub mod browser;
2pub mod frontier;
3pub mod politeness;
4pub mod robots;
5pub mod scheduler;
6pub mod worker;
7
8use crate::{CrawlConfig, CrawlResult, CrawlStats, Result};
9use std::sync::Arc;
10use tokio::sync::RwLock;
11use url::Url;
12
/// Asynchronous web crawler that coordinates a scheduler, a URL frontier,
/// and a politeness engine, collecting crawl results and aggregate stats.
pub struct Crawler {
    // Shared crawl configuration (provides `max_workers`, `max_depth`,
    // and the `politeness` settings used below).
    config: Arc<CrawlConfig>,
    // Spawns and bounds concurrent worker tasks.
    scheduler: scheduler::Scheduler,
    // Queue of (URL, depth) entries still to be crawled.
    frontier: Arc<RwLock<frontier::Frontier>>,
    // Gatekeeper consulted before each fetch (`can_crawl`) and notified
    // after each success (`record_crawl`).
    politeness_engine: Arc<politeness::PolitenessEngine>,
    // Aggregate counters updated by the crawl loop and by worker tasks.
    stats: Arc<RwLock<CrawlStats>>,
    // All successful crawl results, in completion order.
    results: Arc<RwLock<Vec<CrawlResult>>>,
}
21
22impl Crawler {
23    pub async fn new(config: CrawlConfig) -> Result<Self> {
24        let config = Arc::new(config);
25        let scheduler = scheduler::Scheduler::new(config.max_workers);
26        let frontier = Arc::new(RwLock::new(frontier::Frontier::new()));
27        let politeness_engine =
28            Arc::new(politeness::PolitenessEngine::new(config.politeness.clone()));
29        let stats = Arc::new(RwLock::new(CrawlStats {
30            total_urls: 0,
31            successful: 0,
32            failed: 0,
33            in_progress: 0,
34            average_response_time_ms: 0.0,
35            start_time: chrono::Utc::now(),
36            elapsed_time: std::time::Duration::from_secs(0),
37        }));
38        let results = Arc::new(RwLock::new(Vec::new()));
39
40        Ok(Self {
41            config,
42            scheduler,
43            frontier,
44            politeness_engine,
45            stats,
46            results,
47        })
48    }
49
50    pub async fn add_seed(&self, url: Url) -> Result<()> {
51        let mut frontier = self.frontier.write().await;
52        frontier.add(url, 0)?;
53        Ok(())
54    }
55
56    pub async fn add_seeds(&self, urls: Vec<Url>) -> Result<()> {
57        let mut frontier = self.frontier.write().await;
58        for url in urls {
59            frontier.add(url, 0)?;
60        }
61        Ok(())
62    }
63
64    pub async fn start(self: &Arc<Self>) -> Result<()> {
65        let start_time = std::time::Instant::now();
66
67        loop {
68            let url_entry = {
69                let mut frontier = self.frontier.write().await;
70                frontier.get_next()
71            };
72
73            if let Some((url, depth)) = url_entry {
74                if depth > self.config.max_depth {
75                    continue;
76                }
77
78                let can_crawl = self.politeness_engine.can_crawl(&url).await;
79                if !can_crawl {
80                    let mut frontier = self.frontier.write().await;
81                    frontier.add(url, depth)?;
82                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
83                    continue;
84                }
85
86                let config = self.config.clone();
87                let frontier = self.frontier.clone();
88                let politeness = self.politeness_engine.clone();
89                let stats = self.stats.clone();
90                let results = self.results.clone();
91
92                self.scheduler
93                    .spawn(async move {
94                        let worker = worker::Worker::new(config.clone());
95                        match worker.crawl(url.clone()).await {
96                            Ok(result) => {
97                                politeness.record_crawl(&url).await;
98
99                                // Store the crawl result
100                                let mut results_guard = results.write().await;
101                                results_guard.push(result.clone());
102
103                                let mut stats = stats.write().await;
104                                stats.successful += 1;
105                                stats.in_progress -= 1;
106
107                                let mut frontier = frontier.write().await;
108                                for link_str in result.links.iter() {
109                                    if let Ok(link_url) = url::Url::parse(link_str) {
110                                        let _ = frontier.add(link_url, depth + 1);
111                                    }
112                                }
113                            }
114                            Err(e) => {
115                                let error_msg = format!("Failed to crawl {}: {}", url, e);
116                                tracing::error!("{}", error_msg);
117                                
118                                // Write to error log file
119                                let error_entry = format!(
120                                    "[{}] {}\n",
121                                    chrono::Utc::now().to_rfc3339(),
122                                    error_msg
123                                );
124                                if let Ok(mut file) = tokio::fs::OpenOptions::new()
125                                    .create(true)
126                                    .append(true)
127                                    .open("error.log")
128                                    .await
129                                {
130                                    use tokio::io::AsyncWriteExt;
131                                    let _ = file.write_all(error_entry.as_bytes()).await;
132                                }
133                                
134                                let mut stats = stats.write().await;
135                                stats.failed += 1;
136                                stats.in_progress -= 1;
137                            }
138                        }
139                    })
140                    .await;
141
142                let mut stats = self.stats.write().await;
143                stats.in_progress += 1;
144                stats.total_urls += 1;
145            } else {
146                let stats = self.stats.read().await;
147                if stats.in_progress == 0 {
148                    break;
149                }
150                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
151            }
152
153            let mut stats = self.stats.write().await;
154            stats.elapsed_time = start_time.elapsed();
155        }
156
157        Ok(())
158    }
159
160    pub async fn get_stats(&self) -> CrawlStats {
161        self.stats.read().await.clone()
162    }
163
164    pub async fn get_results(&self) -> Vec<CrawlResult> {
165        self.results.read().await.clone()
166    }
167
    /// Stop the crawler by shutting down the scheduler.
    ///
    /// NOTE(review): this only delegates to `scheduler::shutdown`; whether
    /// in-flight workers are awaited or aborted depends on the scheduler's
    /// implementation — confirm against `scheduler.rs`.
    pub async fn stop(&self) {
        self.scheduler.shutdown().await;
    }
171}