1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.0")]
#![feature(type_alias_impl_trait)]
#![warn(
trivial_numeric_casts,
unused_import_braces,
unused_qualifications,
unused_results,
unreachable_pub,
clippy::pedantic,
)]
#![allow(
clippy::doc_markdown,
clippy::inline_always,
clippy::missing_errors_doc
)]
#![deny(unsafe_code)]
mod commoncrawl;
mod parser;
use async_compression::futures::bufread::GzipDecoder;
use futures::{io::BufReader, AsyncBufReadExt, FutureExt, StreamExt, TryStreamExt};
use reqwest_resume::ClientExt;
use serde_closure::FnMut;
use std::{io, time};
use amadeus_core::{
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::DistParStream, Source
};
use amadeus_types::Webpage;
use commoncrawl::WarcParser;
/// A [`Source`] over the WARC archives of one Common Crawl crawl.
///
/// It holds the absolute URL of every WARC file listed in the crawl's
/// `warc.paths.gz` manifest. Build one with [`CommonCrawl::new`].
#[derive(Debug, Clone)]
pub struct CommonCrawl {
	/// Absolute URLs of the crawl's WARC files, in manifest order.
	urls: Vec<String>,
}
impl CommonCrawl {
	/// Lists the WARC files of the Common Crawl crawl identified by `id`.
	///
	/// The method downloads `crawl-data/{id}/warc.paths.gz` and decompresses it,
	/// decoding every gzip member rather than only the first. Each line of the
	/// manifest is a path relative to the bucket; it is turned into an absolute
	/// HTTPS URL.
	///
	/// # Errors
	///
	/// Returns the [`reqwest::Error`] if the HTTP client cannot be built or the
	/// request for the manifest fails.
	///
	/// # Panics
	///
	/// Panics in two cases:
	/// - the manifest URL built from `id` is not a valid URL;
	/// - reading or decompressing the manifest body fails part-way through.
	///
	/// Those failures are `io::Error`s, which the `reqwest::Error` return type
	/// cannot carry.
	pub async fn new(id: &str) -> Result<Self, reqwest::Error> {
		let manifest_url = format!(
			"https://commoncrawl.s3.amazonaws.com/crawl-data/{}/warc.paths.gz",
			id
		);
		// If the client cannot be built, return the error instead of panicking.
		// The 120 s timeout is reqwest's per-request timeout.
		let client = reqwest::ClientBuilder::new()
			.timeout(time::Duration::new(120, 0))
			.build()?
			.resumable();
		let response = client
			.get(
				manifest_url
					.parse()
					.expect("manifest URL built from the crawl id is not a valid URL"),
			)
			.send()
			.await?;
		let body = response
			.bytes_stream()
			.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
		let mut body = GzipDecoder::new(BufReader::new(body.into_async_read()));
		// Decode every gzip member in the body, not just the first one.
		body.multiple_members(true);
		// This stream is consumed locally, so a plain closure is enough. A
		// serialisable `FnMut!` closure is not needed here.
		let urls = BufReader::new(body)
			.lines()
			.map(|path| {
				// Fetch the WARC files over HTTPS too, like the manifest above.
				format!(
					"https://commoncrawl.s3.amazonaws.com/{}",
					path.expect("failed to read or decompress warc.paths.gz")
				)
			})
			.collect()
			.await;
		Ok(Self { urls })
	}
}
impl Source for CommonCrawl {
	type Item = Webpage<'static>;
	type Error = io::Error;

	// Under `cfg(doc)` the stream types are nameable wrapper types instead of
	// opaque `impl Trait` aliases.
	#[cfg(not(doc))]
	type ParStream =
		impl amadeus_core::par_stream::ParallelStream<Item = Result<Self::Item, Self::Error>>;
	#[cfg(doc)]
	type ParStream =
		DistParStream<amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>>;
	#[cfg(not(doc))]
	type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;
	#[cfg(doc)]
	type DistStream = amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>;

	/// Builds the parallel stream by wrapping [`Self::dist_stream`] in a `DistParStream`.
	fn par_stream(self) -> Self::ParStream {
		DistParStream::new(self.dist_stream())
	}

	/// Streams the pages of every WARC file listed in `self.urls`.
	///
	/// Each file is downloaded with `reqwest_resume`, gunzipped (all gzip
	/// members) and handed to `WarcParser`. Some per-file failures are yielded
	/// as a single `Err(io::Error)` item for that file instead of panicking the
	/// worker:
	/// - the URL does not parse (`ErrorKind::InvalidInput`);
	/// - the download request fails (`ErrorKind::Other`).
	///
	/// Errors from later in decoding are whatever `WarcParser` yields.
	// `ret` is rebound only under `cfg(doc)`, so the let-and-return is deliberate.
	#[allow(clippy::let_and_return)]
	fn dist_stream(self) -> Self::DistStream {
		let ret = self
			.urls
			.into_dist_stream()
			.flat_map(FnMut!(|url: String| async move {
				let response = match url.parse() {
					Ok(parsed) => reqwest_resume::get(parsed)
						.await
						.map_err(|e| io::Error::new(io::ErrorKind::Other, e)),
					Err(e) => Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
				};
				match response {
					Ok(response) => {
						let body = response
							.bytes_stream()
							.map_err(|e| io::Error::new(io::ErrorKind::Other, e));
						let body = BufReader::new(body.into_async_read());
						let mut body = GzipDecoder::new(body);
						body.multiple_members(true);
						WarcParser::new(body).right_stream()
					}
					// Report the failed file as one error item; the rest of the
					// crawl keeps streaming.
					Err(e) => futures::stream::once(futures::future::ready(Err(e))).left_stream(),
				}
			}
			.flatten_stream()));
		#[cfg(doc)]
		let ret = amadeus_core::util::ImplDistributedStream::new(ret);
		ret
	}
}