use anyhow::Result;
use bootstrap::Bootstrapper;
use governor::{Quota, RateLimiter};
use nonzero_ext::nonzero;
use parser::ParseResult;
use reqwest::Client;
use spyglass_lens::LensConfig;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use url::Url;

pub mod archive;
pub mod bootstrap;
mod cache;
mod cdx;
pub mod crawler;
pub mod parser;
pub mod s3;
pub mod site;
pub mod validator;

use crate::crawler::{handle_crawl, http_client};
use archive::{create_archives, ArchiveFiles, ArchiveRecord};

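/// User agent string, built from the crate name and version, intended to identify
/// the crawler in outgoing HTTP requests.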
static APP_USER_AGENT: &str = concat!("netrunner", "/", env!("CARGO_PKG_VERSION"));

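/// Options for a single crawl run: `create_warc` controls whether crawled pages
/// are archived (WARC + parsed records), and `requests_per_second` caps the
/// request rate used while crawling.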
pub struct CrawlOpts {
    pub create_warc: bool,
    pub requests_per_second: u32,
}

impl Default for CrawlOpts {
    fn default() -> Self {
        Self {
            create_warc: false,
            requests_per_second: 2,
        }
    }
}

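/// Returns the archive storage directory for a lens (`archives/<lens name>`),
/// creating it if it does not exist yet.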
pub fn cache_storage_path(lens: &LensConfig) -> PathBuf {
    let storage = Path::new("archives").join(&lens.name);
    if !storage.exists() {
        std::fs::create_dir_all(&storage).expect("Unable to create crawl folder");
    }

    storage
}

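/// Returns the temporary crawl storage directory for a lens (`tmp/<lens name>`),
/// creating it if it does not exist yet.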
pub fn tmp_storage_path(lens: &LensConfig) -> PathBuf {
    let storage = Path::new("tmp").join(&lens.name);
    if !storage.exists() {
        std::fs::create_dir_all(&storage).expect("Unable to create tmp folder");
    }

    storage
}

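/// Tracks whether a URL list (`urls.txt`) has already been collected for the lens.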
#[derive(Clone)]
pub struct NetrunnerState {
    pub has_urls: bool,
}

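/// Crawler for a single lens: bootstraps the URL list, fetches each page, and
/// archives the results under the lens's storage directory.
///
/// A minimal usage sketch (the `lens.ron` path is illustrative):
///
/// ```ignore
/// let lens = LensConfig::from_path("lens.ron".into()).expect("unable to load lens");
/// let mut runner = Netrunner::new(lens);
/// let archives = runner.crawl(CrawlOpts { create_warc: true, ..Default::default() }).await;
/// ```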
#[derive(Clone)]
pub struct Netrunner {
    bootstrapper: Bootstrapper,
    client: Client,
    lens: LensConfig,
    pub storage: PathBuf,
    pub state: NetrunnerState,
}

impl Netrunner {
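    /// Creates a crawler for `lens`, setting up its storage directory and HTTP client.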
    pub fn new(lens: LensConfig) -> Self {
        let storage = cache_storage_path(&lens);
        let state = NetrunnerState {
            has_urls: storage.join("urls.txt").exists(),
        };
        let client = http_client();

        Netrunner {
            bootstrapper: Bootstrapper::new(&client),
            client,
            lens,
            storage,
            state,
        }
    }

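    /// Path to the cached URL list (`urls.txt`) for this lens.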
    pub fn url_txt_path(&self) -> PathBuf {
        self.storage.join("urls.txt")
    }

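    /// Bootstraps the URL list for this lens, returning an empty list (and logging
    /// a warning) if bootstrapping fails.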
    pub async fn get_urls(&mut self) -> Vec<String> {
        match self.bootstrapper.find_urls(&self.lens).await {
            Ok(urls) => urls,
            Err(err) => {
                log::warn!("Unable to get_urls: {err}");
                Vec::new()
            }
        }
    }

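    /// Crawls the lens's URL list. When `opts.create_warc` is set, the crawled pages
    /// are written out as archives and the resulting `ArchiveFiles` are returned.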
    pub async fn crawl(&mut self, opts: CrawlOpts) -> Result<Option<ArchiveFiles>> {
        let crawl_queue = if !self.state.has_urls {
            self.bootstrapper.find_urls(&self.lens).await?
        } else {
            log::info!("Already collected URLs, skipping");
            let file = std::fs::read_to_string(self.url_txt_path())?;
            file.lines().map(|x| x.to_string()).collect::<Vec<String>>()
        };

        if opts.create_warc {
            // Fall back to 2 requests/sec if the configured rate is zero.
            let quota = Quota::per_second(
                std::num::NonZeroU32::new(opts.requests_per_second).unwrap_or(nonzero!(2u32)),
            );
            let tmp_storage = tmp_storage_path(&self.lens);
            self.crawl_loop(&crawl_queue, &tmp_storage, quota).await?;
            let archives =
                create_archives(&self.storage, &self.cached_records(&tmp_storage)).await?;
            return Ok(Some(archives));
        }

        Ok(None)
    }

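    /// Crawls a single URL and returns the cached records, attaching parsed text for
    /// successful (2xx) responses.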
    pub async fn crawl_url(
        &mut self,
        url: String,
    ) -> Result<Vec<(ArchiveRecord, Option<ParseResult>)>> {
        let quota = Quota::per_second(nonzero!(2u32));
        let tmp_storage = tmp_storage_path(&self.lens);
        self.crawl_loop(&[url], &tmp_storage, quota).await?;
        let archived = self.cached_records(&tmp_storage);

        let mut records = Vec::new();
        for (_, path) in archived {
            if let Ok(Ok(rec)) =
                std::fs::read_to_string(path).map(|s| ron::from_str::<ArchiveRecord>(&s))
            {
                if (200..=299).contains(&rec.status) {
                    let parsed = crate::parser::html::html_to_text(&rec.url, &rec.content);
                    records.push((rec, Some(parsed)));
                } else {
                    records.push((rec, None));
                }
            }
        }

        Ok(records)
    }

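    /// Removes the temporary crawl cache for this lens.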
    pub fn clear_cache(&self) -> Result<(), std::io::Error> {
        std::fs::remove_dir_all(tmp_storage_path(&self.lens))
    }

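    /// Scans the tmp storage directory for cached `ArchiveRecord`s, deleting
    /// unreadable files and duplicate entries for the same URL.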
    fn cached_records(&self, tmp_storage: &PathBuf) -> Vec<(String, PathBuf)> {
        let paths = std::fs::read_dir(tmp_storage).expect("unable to read tmp storage dir");

        let mut existing = HashSet::new();
        let mut to_remove = Vec::new();
        let mut recs = Vec::new();
        for path in paths.flatten() {
            match std::fs::read_to_string(path.path()) {
                Ok(contents) => {
                    if let Ok(record) = ron::from_str::<ArchiveRecord>(&contents) {
                        let url = record.url;
                        if existing.contains(&url) {
                            to_remove.push(path.path());
                        } else {
                            existing.insert(url.clone());
                            recs.push((url, path.path()));
                        }
                    }
                }
                Err(_) => {
                    let _ = std::fs::remove_file(path.path());
                }
            }
        }

        log::info!("Removing {} duplicate cache entries", to_remove.len());
        for path in to_remove {
            let _ = std::fs::remove_file(path);
        }

        recs
    }

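    /// Crawls every URL in `crawl_queue` under the given rate-limit `quota`,
    /// skipping URLs that already have a cached record in `tmp_storage`.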
    async fn crawl_loop(
        &mut self,
        crawl_queue: &[String],
        tmp_storage: &PathBuf,
        quota: Quota,
    ) -> anyhow::Result<()> {
        let lim = Arc::new(RateLimiter::<String, _, _>::keyed(quota));
        let progress = Arc::new(AtomicUsize::new(0));
        let total = crawl_queue.len();
        let mut already_crawled: HashSet<String> = HashSet::new();

        let recs = self.cached_records(tmp_storage);
        log::debug!("found {} crawls in cache", recs.len());
        for (url, _) in recs {
            already_crawled.insert(url);
        }

        log::info!(
            "beginning crawl, already crawled {} urls",
            already_crawled.len()
        );
        progress.store(already_crawled.len(), Ordering::SeqCst);

        for url in crawl_queue.iter().filter_map(|url| Url::parse(url).ok()) {
            if already_crawled.contains(&url.to_string()) {
                log::info!("-> skipping {}, already crawled", url);
                continue;
            }

            if let Err(err) =
                handle_crawl(&self.client, Some(tmp_storage.clone()), lim.clone(), &url).await
            {
                log::warn!("Unable to crawl {} - {err}", &url);
            }

            let old_val = progress.fetch_add(1, Ordering::SeqCst);
            if old_val % 100 == 0 {
                log::info!("progress: {} / {}", old_val, total)
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use spyglass_lens::LensConfig;
    use std::io;
    use std::path::Path;
    use tracing_log::LogTracer;
    use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter};

    use crate::{parser::ParseResult, validator::validate_lens, CrawlOpts, Netrunner};

    #[tokio::test]
    #[ignore]
    async fn test_crawl() {
        let subscriber = tracing_subscriber::registry()
            .with(
                EnvFilter::from_default_env()
                    .add_directive(tracing::Level::INFO.into())
                    .add_directive("libnetrunner=TRACE".parse().expect("invalid log filter")),
            )
            .with(fmt::Layer::new().with_ansi(false).with_writer(io::stdout));
        tracing::subscriber::set_global_default(subscriber)
            .expect("Unable to set a global subscriber");
        LogTracer::init().expect("Unable to initialize logger");

        let lens_file = "fixtures/test.ron";
        let lens = LensConfig::from_path(Path::new(&lens_file).to_path_buf())
            .expect("Unable to load lens file");

        let mut netrunner = Netrunner::new(lens.clone());
        let archives = netrunner
            .crawl(CrawlOpts {
                create_warc: true,
                ..Default::default()
            })
            .await
            .expect("Unable to crawl");

        if let Some(archives) = archives {
            assert!(archives.warc.exists());
            assert!(archives.parsed.exists());

            let reader =
                ParseResult::iter_from_gz(&archives.parsed).expect("Unable to read parsed archive");

            assert_eq!(reader.count(), 1);
        }

        if let Err(err) = validate_lens(&lens) {
            eprintln!("Failed validation: {err}");
            panic!("Failed");
        }
    }
}