libnetrunner/mod.rs

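//! Crawl the URLs described by a lens into local web archives: bootstrap the
//! URL list, fetch each page with per-domain rate limiting, and optionally
//! package the results into WARC and parsed-text archives.
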
use anyhow::Result;
use bootstrap::Bootstrapper;
use governor::{Quota, RateLimiter};
use nonzero_ext::nonzero;
use parser::ParseResult;
use reqwest::Client;
use spyglass_lens::LensConfig;
use std::collections::HashSet;
use std::num::NonZeroU32;
use std::path::{Path, PathBuf};
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};
use url::Url;

pub mod archive;
pub mod bootstrap;
mod cache;
mod cdx;
pub mod crawler;
pub mod parser;
pub mod s3;
pub mod site;
pub mod validator;

use crate::crawler::{handle_crawl, http_client};
use archive::{create_archives, ArchiveFiles, ArchiveRecord};

static APP_USER_AGENT: &str = concat!("netrunner", "/", env!("CARGO_PKG_VERSION"));

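/// Options for a [`Netrunner::crawl`] run.
///
/// A construction sketch (hypothetical caller, mirroring the defaults below):
/// ```ignore
/// let opts = CrawlOpts {
///     create_warc: true,
///     ..Default::default() // keeps requests_per_second at 2
/// };
/// ```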
pub struct CrawlOpts {
    pub create_warc: bool,
    pub requests_per_second: u32,
}

impl Default for CrawlOpts {
    fn default() -> Self {
        Self {
            create_warc: false,
            requests_per_second: 2,
        }
    }
}

pub fn cache_storage_path(lens: &LensConfig) -> PathBuf {
    let storage = Path::new("archives").join(&lens.name);
    if !storage.exists() {
        // No point in continuing if we're unable to create this directory
        std::fs::create_dir_all(&storage).expect("Unable to create crawl folder");
    }

    storage
}

pub fn tmp_storage_path(lens: &LensConfig) -> PathBuf {
    let storage = Path::new("tmp").join(&lens.name);
    if !storage.exists() {
        // No point in continuing if we're unable to create this directory
        std::fs::create_dir_all(&storage).expect("Unable to create crawl folder");
    }

    storage
}

#[derive(Clone)]
pub struct NetrunnerState {
    pub has_urls: bool,
}

#[derive(Clone)]
pub struct Netrunner {
    bootstrapper: Bootstrapper,
    client: Client,
    lens: LensConfig,
    // Where the cached web archive will be stored
    pub storage: PathBuf,
    pub state: NetrunnerState,
}

impl Netrunner {
    pub fn new(lens: LensConfig) -> Self {
        let storage = cache_storage_path(&lens);
        let state = NetrunnerState {
            has_urls: storage.join("urls.txt").exists(),
        };
        let client = http_client();

        Netrunner {
            bootstrapper: Bootstrapper::new(&client),
            client,
            lens,
            storage,
            state,
        }
    }

    pub fn url_txt_path(&self) -> PathBuf {
        self.storage.join("urls.txt")
    }

    pub async fn get_urls(&mut self) -> Vec<String> {
        match self.bootstrapper.find_urls(&self.lens).await {
            Ok(urls) => urls,
            Err(err) => {
                log::warn!("Unable to get_urls: {err}");
                Vec::new()
            }
        }
    }

    /// Kick off a crawl for the URLs represented by `lens`.
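    ///
    /// A usage sketch (assumes a `LensConfig` loaded elsewhere; not run as a
    /// doctest):
    /// ```ignore
    /// let mut runner = Netrunner::new(lens);
    /// let opts = CrawlOpts { create_warc: true, ..Default::default() };
    /// if let Some(archives) = runner.crawl(opts).await? {
    ///     println!("WARC written to {}", archives.warc.display());
    /// }
    /// ```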
    pub async fn crawl(&mut self, opts: CrawlOpts) -> Result<Option<ArchiveFiles>> {
        let crawl_queue = if !self.state.has_urls {
            self.bootstrapper.find_urls(&self.lens).await?
        } else {
            log::info!("Already collected URLs, skipping");
            // Load URLs from file
            let file = std::fs::read_to_string(self.url_txt_path())?;
            file.lines().map(|x| x.to_string()).collect::<Vec<String>>()
        };

        if opts.create_warc {
            // CRAWL BABY CRAWL
            // Use the configured per-domain rate limit, falling back to
            // 2 requests per second if an invalid (zero) value is given.
            let quota = Quota::per_second(
                NonZeroU32::new(opts.requests_per_second).unwrap_or(nonzero!(2u32)),
            );
            let tmp_storage = tmp_storage_path(&self.lens);
            self.crawl_loop(&crawl_queue, &tmp_storage, quota).await?;
            let archives =
                create_archives(&self.storage, &self.cached_records(&tmp_storage)).await?;
            return Ok(Some(archives));
        }

        Ok(None)
    }

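    /// Crawl a single `url`, pairing each cached record with parsed text for
    /// successful (2xx) responses.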
    pub async fn crawl_url(
        &mut self,
        url: String,
    ) -> Result<Vec<(ArchiveRecord, Option<ParseResult>)>> {
        let quota = Quota::per_second(nonzero!(2u32));
        let tmp_storage = tmp_storage_path(&self.lens);
        self.crawl_loop(&[url], &tmp_storage, quota).await?;
        let archived = self.cached_records(&tmp_storage);

        let mut records = Vec::new();
        for (_, path) in archived {
            if let Ok(Ok(rec)) =
                std::fs::read_to_string(path).map(|s| ron::from_str::<ArchiveRecord>(&s))
            {
                if (200..=299).contains(&rec.status) {
                    let parsed = crate::parser::html::html_to_text(&rec.url, &rec.content);
                    records.push((rec, Some(parsed)));
                } else {
                    records.push((rec, None));
                }
            }
        }

        Ok(records)
    }

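    /// Remove the lens's tmp storage directory and everything in it.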
    pub fn clear_cache(&self) -> Result<(), std::io::Error> {
        std::fs::remove_dir_all(tmp_storage_path(&self.lens))
    }

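    /// Scan `tmp_storage` for serialized [`ArchiveRecord`]s, keeping one file
    /// per URL. Unreadable files and duplicate URLs are deleted from disk.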
    fn cached_records(&self, tmp_storage: &Path) -> Vec<(String, PathBuf)> {
        let paths = std::fs::read_dir(tmp_storage).expect("unable to read tmp storage dir");

        let mut existing = HashSet::new();
        let mut to_remove = Vec::new();
        let mut recs = Vec::new();
        for path in paths.flatten() {
            match std::fs::read_to_string(path.path()) {
                Ok(contents) => {
                    if let Ok(record) = ron::from_str::<ArchiveRecord>(&contents) {
                        let url = record.url;
                        if existing.contains(&url) {
                            to_remove.push(path.path());
                        } else {
                            existing.insert(url.clone());
                            recs.push((url, path.path()));
                        }
                    }
                }
                Err(_) => {
                    let _ = std::fs::remove_file(path.path());
                }
            }
        }

        log::info!("Removing {} duplicate caches", to_remove.len());
        for path in to_remove {
            let _ = std::fs::remove_file(path);
        }

        recs
    }

    /// Web Archive (WARC) file format definition: https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1
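    ///
    /// Walks `crawl_queue` sequentially, skipping URLs already cached in
    /// `tmp_storage`; the keyed rate limiter passed to `handle_crawl` enforces
    /// `quota` per domain.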
    async fn crawl_loop(
        &mut self,
        crawl_queue: &[String],
        tmp_storage: &Path,
        quota: Quota,
    ) -> anyhow::Result<()> {
        let lim = Arc::new(RateLimiter::<String, _, _>::keyed(quota));
        let progress = Arc::new(AtomicUsize::new(0));
        let total = crawl_queue.len();
        let mut already_crawled: HashSet<String> = HashSet::new();

        // Before we begin, check to see if we've already crawled anything
        let recs = self.cached_records(tmp_storage);
        log::debug!("found {} crawls in cache", recs.len());
        for (url, _) in recs {
            already_crawled.insert(url);
        }

        log::info!(
            "beginning crawl, already crawled {} urls",
            already_crawled.len()
        );
        progress.store(already_crawled.len(), Ordering::SeqCst);

        // Crawl through everything, one request at a time.
        for url in crawl_queue.iter().filter_map(|url| Url::parse(url).ok()) {
            if already_crawled.contains(&url.to_string()) {
                log::info!("-> skipping {}, already crawled", url);
                continue;
            }

            if let Err(err) =
                handle_crawl(&self.client, Some(tmp_storage.to_path_buf()), lim.clone(), &url)
                    .await
            {
                log::warn!("Unable to crawl {} - {err}", &url);
            }

            let old_val = progress.fetch_add(1, Ordering::SeqCst);
            if old_val % 100 == 0 {
                log::info!("progress: {} / {}", old_val, total)
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use spyglass_lens::LensConfig;
    use std::io;
    use std::path::Path;
    use tracing_log::LogTracer;
    use tracing_subscriber::{fmt, layer::SubscriberExt, EnvFilter};

    use crate::{parser::ParseResult, validator::validate_lens, CrawlOpts, Netrunner};

    #[tokio::test]
    #[ignore]
    async fn test_crawl() {
        // Set up some nice console logging for tests
        let subscriber = tracing_subscriber::registry()
            .with(
                EnvFilter::from_default_env()
                    .add_directive(tracing::Level::INFO.into())
                    .add_directive("libnetrunner=TRACE".parse().expect("invalid log filter")),
            )
            .with(fmt::Layer::new().with_ansi(false).with_writer(io::stdout));
        tracing::subscriber::set_global_default(subscriber)
            .expect("Unable to set a global subscriber");
        LogTracer::init().expect("Unable to initialize logger");

        let lens_file = "fixtures/test.ron";
        let lens = LensConfig::from_path(Path::new(&lens_file).to_path_buf())
            .expect("Unable to load lens file");

        // Test crawling logic
        let mut netrunner = Netrunner::new(lens.clone());
        let archives = netrunner
            .crawl(CrawlOpts {
                create_warc: true,
                ..Default::default()
            })
            .await
            .expect("Unable to crawl");

        // Validate that the archives created are readable.
        if let Some(archives) = archives {
            assert!(archives.warc.exists());
            assert!(archives.parsed.exists());

            let reader =
                ParseResult::iter_from_gz(&archives.parsed).expect("Unable to read parsed archive");

            assert_eq!(reader.count(), 1);
        }

        // Test validation logic
        if let Err(err) = validate_lens(&lens) {
            panic!("Failed validation: {err}");
        }
    }
}