1use super::{
2 item,
3 util::{retry_future, Retryable},
4 Item,
5};
6use futures::{Stream, TryStreamExt};
7use reqwest::Client;
8use std::io::{BufReader, Read};
9use std::time::Duration;
10use thiserror::Error;
11use tryhard::RetryPolicy;
12
/// Keepalive duration, in seconds, passed to the HTTP client's `tcp_keepalive` setting.
const TCP_KEEPALIVE_SECS: u64 = 20;

/// CDX search endpoint used when an `IndexClient` is created via `Default`.
const DEFAULT_CDX_BASE: &str = "http://web.archive.org/cdx/search/cdx";

/// Query-string suffix appended to every search: JSON output restricted to six fields, in the
/// positional order the row decoder reads them.
const CDX_OPTIONS: &str = concat!(
    "&output=json",
    "&fl=original,timestamp,digest,mimetype,length,statuscode",
);

/// Whole response body that marks a blocked-site query; searches compare the body against it
/// for exact equality (including the trailing newline).
const BLOCKED_SITE_ERROR_MESSAGE: &str = concat!(
    "org.archive.util.io.RuntimeIOException: ",
    "org.archive.wayback.exception.AdministrativeAccessControlException: ",
    "Blocked Site Error\n",
);
18
19#[derive(Error, Debug)]
20pub enum Error {
21 #[error("Item parsing error: {0}")]
22 ItemParsingError(#[from] item::Error),
23 #[error("HTTP client error: {0}")]
24 HttpClientError(#[from] reqwest::Error),
25 #[error("JSON decoding error: {0}")]
26 JsonError(#[from] serde_json::Error),
27 #[error("Blocked query: {0}")]
28 BlockedQuery(String),
29}
30
31impl Retryable for Error {
32 fn max_retries() -> u32 {
33 7
34 }
35
36 fn log_level() -> Option<log::Level> {
37 Some(log::Level::Warn)
38 }
39
40 fn default_initial_delay() -> Duration {
41 Duration::from_millis(250)
42 }
43
44 fn custom_retry_policy(&self) -> Option<RetryPolicy> {
45 match self {
46 Error::HttpClientError(_) => Some(RetryPolicy::Delay(Duration::from_secs(30))),
47 Error::JsonError(_) => Some(RetryPolicy::Delay(Duration::from_secs(30))),
50 _ => Some(RetryPolicy::Break),
51 }
52 }
53}
54
55pub struct IndexClient {
56 base: String,
57 underlying: Client,
58}
59
60impl IndexClient {
61 pub fn new(base: String) -> Result<Self, Error> {
62 Ok(Self {
63 base,
64 underlying: Client::builder()
65 .tcp_keepalive(Some(Duration::from_secs(TCP_KEEPALIVE_SECS)))
66 .build()?,
67 })
68 }
69
70 fn decode_rows(rows: Vec<Vec<String>>) -> Result<Vec<Item>, Error> {
71 rows.into_iter()
72 .skip(1)
73 .map(|row| {
74 Item::parse_optional_record(
75 row.first().map(|v| v.as_str()),
76 row.get(1).map(|v| v.as_str()),
77 row.get(2).map(|v| v.as_str()),
78 row.get(3).map(|v| v.as_str()),
79 row.get(4).map(|v| v.as_str()),
80 row.get(5).map(|v| v.as_str()),
81 )
82 .map_err(From::from)
83 })
84 .collect()
85 }
86
87 pub fn load_json<R: Read>(reader: R) -> Result<Vec<Item>, Error> {
88 let buffered = BufReader::new(reader);
89
90 let rows = serde_json::from_reader::<BufReader<R>, Vec<Vec<String>>>(buffered)?;
91
92 Self::decode_rows(rows)
93 }
94
95 pub fn stream_search<'a>(
96 &'a self,
97 query: &'a str,
98 limit: usize,
99 ) -> impl Stream<Item = Result<Item, Error>> + 'a {
100 futures::stream::try_unfold(Some(None), move |resume_key| async move {
101 let next = match resume_key {
102 Some(key) => {
103 let (items, resume_key) =
104 retry_future(|| self.search_with_resume_key(query, limit, &key)).await?;
105
106 log::info!("Resume key: {:?}", resume_key);
107
108 Some((items, resume_key.map(Some)))
109 }
110 None => None,
111 };
112
113 let result: Result<_, Error> = Ok(next);
114 result
115 })
116 .map_ok(|items| futures::stream::iter(items.into_iter().map(Ok)))
117 .try_flatten()
118 }
119
120 async fn search_with_resume_key(
121 &self,
122 query: &str,
123 limit: usize,
124 resume_key: &Option<String>,
125 ) -> Result<(Vec<Item>, Option<String>), Error> {
126 let resume_key_param = resume_key
127 .as_ref()
128 .map(|key| format!("&resumeKey={}", key))
129 .unwrap_or_default();
130 let query_url = format!(
131 "{}?url={}{}&limit={}&showResumeKey=true{}",
132 self.base, query, resume_key_param, limit, CDX_OPTIONS
133 );
134 log::info!("Search URL: {}", query_url);
135 let contents = self.underlying.get(&query_url).send().await?.text().await?;
136
137 if contents == BLOCKED_SITE_ERROR_MESSAGE {
138 Err(Error::BlockedQuery(query.to_string()))
139 } else {
140 let mut rows = serde_json::from_str::<Vec<Vec<String>>>(&contents)?;
141 let len = rows.len();
142 let next_resume_key = if len >= 2 && rows[len - 2].is_empty() {
143 let mut last = rows.remove(len - 1);
144 rows.remove(len - 2);
145 Some(last.remove(0))
146 } else {
147 None
148 };
149 log::info!("Rows received {}", rows.len());
150
151 Self::decode_rows(rows).map(|items| (items, next_resume_key))
152 }
153 }
154
155 pub async fn search(
156 &self,
157 query: &str,
158 timestamp: Option<&str>,
159 digest: Option<&str>,
160 ) -> Result<Vec<Item>, Error> {
161 let mut filter = String::new();
162
163 if let Some(value) = timestamp {
164 filter.push_str(&format!("&filter=timestamp:{}", value));
165 }
166
167 if let Some(value) = digest {
168 filter.push_str(&format!("&filter=digest:{}", value));
169 }
170
171 let query_url = format!("{}?url={}{}{}", self.base, query, filter, CDX_OPTIONS);
172 let contents = self.underlying.get(&query_url).send().await?.text().await?;
173
174 if contents == BLOCKED_SITE_ERROR_MESSAGE {
175 Err(Error::BlockedQuery(query.to_string()))
176 } else {
177 let rows = serde_json::from_str(&contents)?;
178 Self::decode_rows(rows)
179 }
180 }
181}
182
183impl Default for IndexClient {
184 fn default() -> Self {
185 Self::new(DEFAULT_CDX_BASE.to_string()).unwrap()
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::IndexClient;
    use std::fs::File;

    /// Decodes the bundled CDX JSON fixture and checks how many items it yields.
    #[test]
    fn load_json() {
        let items = File::open("examples/wayback/cdx-result.json")
            .map(IndexClient::load_json)
            .unwrap()
            .unwrap();

        assert_eq!(37, items.len());
    }
}