omnivore_core/crawler/mod.rs

pub mod browser;
pub mod frontier;
pub mod politeness;
pub mod robots;
pub mod scheduler;
pub mod worker;

use crate::{CrawlConfig, CrawlResult, CrawlStats, Result};
use std::sync::Arc;
use tokio::sync::RwLock;
use url::Url;

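/// Coordinates a crawl: pulls URLs off the frontier, checks each against the
/// politeness engine, and hands permitted URLs to pooled worker tasks while
/// collecting results and statistics as they finish.
///
/// A minimal usage sketch; it assumes the crate is importable as
/// `omnivore_core` and that `CrawlConfig` implements `Default`, neither of
/// which is shown in this module:
///
/// ```ignore
/// use std::sync::Arc;
/// use omnivore_core::{crawler::Crawler, CrawlConfig};
///
/// # async fn run() -> omnivore_core::Result<()> {
/// // Hypothetical config construction; adjust to the real CrawlConfig API.
/// let crawler = Arc::new(Crawler::new(CrawlConfig::default()).await?);
/// let seed = url::Url::parse("https://example.com").expect("valid seed URL");
/// crawler.add_seed(seed).await?;
/// crawler.start().await?;
/// println!("fetched {} pages", crawler.get_stats().await.successful);
/// # Ok(())
/// # }
/// ```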
pub struct Crawler {
    config: Arc<CrawlConfig>,
    scheduler: scheduler::Scheduler,
    frontier: Arc<RwLock<frontier::Frontier>>,
    politeness_engine: Arc<politeness::PolitenessEngine>,
    stats: Arc<RwLock<CrawlStats>>,
    results: Arc<RwLock<Vec<CrawlResult>>>,
}

impl Crawler {
    pub async fn new(config: CrawlConfig) -> Result<Self> {
        let config = Arc::new(config);
        let scheduler = scheduler::Scheduler::new(config.max_workers);
        let frontier = Arc::new(RwLock::new(frontier::Frontier::new()));
        let politeness_engine =
            Arc::new(politeness::PolitenessEngine::new(config.politeness.clone()));
        let stats = Arc::new(RwLock::new(CrawlStats {
            total_urls: 0,
            successful: 0,
            failed: 0,
            in_progress: 0,
            average_response_time_ms: 0.0,
            start_time: chrono::Utc::now(),
            elapsed_time: std::time::Duration::from_secs(0),
        }));
        let results = Arc::new(RwLock::new(Vec::new()));

        Ok(Self {
            config,
            scheduler,
            frontier,
            politeness_engine,
            stats,
            results,
        })
    }

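    /// Queues a single seed URL in the frontier at depth 0.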
    pub async fn add_seed(&self, url: Url) -> Result<()> {
        let mut frontier = self.frontier.write().await;
        frontier.add(url, 0)?;
        Ok(())
    }

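    /// Queues a batch of seed URLs, all at depth 0.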
    pub async fn add_seeds(&self, urls: Vec<Url>) -> Result<()> {
        let mut frontier = self.frontier.write().await;
        for url in urls {
            frontier.add(url, 0)?;
        }
        Ok(())
    }

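    /// Runs the crawl loop until the frontier is empty and no fetches remain
    /// in flight.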
    pub async fn start(self: &Arc<Self>) -> Result<()> {
        let start_time = std::time::Instant::now();

        loop {
            let url_entry = {
                let mut frontier = self.frontier.write().await;
                frontier.get_next()
            };

            if let Some((url, depth)) = url_entry {
                if depth > self.config.max_depth {
                    continue;
                }

                let can_crawl = self.politeness_engine.can_crawl(&url).await;
                if !can_crawl {
                    // Not allowed yet (rate limit or robots rules): requeue
                    // the URL and back off briefly before trying the next one.
                    let mut frontier = self.frontier.write().await;
                    frontier.add(url, depth)?;
                    tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    continue;
                }

                // Mark the fetch as in flight *before* spawning the worker;
                // otherwise a fast worker could decrement `in_progress`
                // before this loop increments it, skewing the counter and
                // ending the crawl early.
                {
                    let mut stats = self.stats.write().await;
                    stats.in_progress += 1;
                    stats.total_urls += 1;
                }

                let config = self.config.clone();
                let frontier = self.frontier.clone();
                let politeness = self.politeness_engine.clone();
                let stats = self.stats.clone();
                let results = self.results.clone();

                self.scheduler
                    .spawn(async move {
                        let worker = worker::Worker::new(config.clone());
                        match worker.crawl(url.clone()).await {
                            Ok(result) => {
                                politeness.record_crawl(&url).await;

                                let mut results_guard = results.write().await;
                                results_guard.push(result.clone());

                                let mut stats = stats.write().await;
                                stats.successful += 1;
                                stats.in_progress -= 1;

                                // Feed newly discovered links back into the
                                // frontier one level deeper.
                                let mut frontier = frontier.write().await;
                                for link_str in result.links.iter() {
                                    if let Ok(link_url) = url::Url::parse(link_str) {
                                        let _ = frontier.add(link_url, depth + 1);
                                    }
                                }
                            }
                            Err(e) => {
                                let error_msg = format!("Failed to crawl {}: {}", url, e);
                                tracing::error!("{}", error_msg);

                                let error_entry = format!(
                                    "[{}] {}\n",
                                    chrono::Utc::now().to_rfc3339(),
                                    error_msg
                                );
                                if let Ok(mut file) = tokio::fs::OpenOptions::new()
                                    .create(true)
                                    .append(true)
                                    .open("error.log")
                                    .await
                                {
                                    use tokio::io::AsyncWriteExt;
                                    let _ = file.write_all(error_entry.as_bytes()).await;
                                }

                                let mut stats = stats.write().await;
                                stats.failed += 1;
                                stats.in_progress -= 1;
                            }
                        }
                    })
                    .await;
            } else {
                // Frontier is empty: exit once no fetches remain in flight,
                // otherwise wait for workers to return more links.
                let stats = self.stats.read().await;
                if stats.in_progress == 0 {
                    break;
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
            }

            let mut stats = self.stats.write().await;
            stats.elapsed_time = start_time.elapsed();
        }

        // Record the final elapsed time before returning.
        let mut stats = self.stats.write().await;
        stats.elapsed_time = start_time.elapsed();

        Ok(())
    }

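    /// Returns a snapshot of the current crawl statistics.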
    pub async fn get_stats(&self) -> CrawlStats {
        self.stats.read().await.clone()
    }

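    /// Returns a copy of all crawl results collected so far.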
    pub async fn get_results(&self) -> Vec<CrawlResult> {
        self.results.read().await.clone()
    }

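    /// Shuts down the scheduler, ending the crawl.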
    pub async fn stop(&self) {
        self.scheduler.shutdown().await;
    }
}