wayback_rs/
cdx.rs

1use super::{
2    item,
3    util::{retry_future, Retryable},
4    Item,
5};
6use futures::{Stream, TryStreamExt};
7use reqwest::Client;
8use std::io::{BufReader, Read};
9use std::time::Duration;
10use thiserror::Error;
11use tryhard::RetryPolicy;
12
/// TCP keepalive interval, in seconds, configured on the HTTP client built by `IndexClient::new`.
13const TCP_KEEPALIVE_SECS: u64 = 20;
/// CDX search endpoint used by `IndexClient::default`.
14const DEFAULT_CDX_BASE: &str = "http://web.archive.org/cdx/search/cdx";
/// Query-string suffix appended to every search URL. It requests JSON output and six fields;
/// `decode_rows` reads the columns positionally in this same order.
15const CDX_OPTIONS: &str = "&output=json&fl=original,timestamp,digest,mimetype,length,statuscode";
/// Response body that marks a blocked query. The comparison is an exact string match
/// (trailing newline included); a match is reported as `Error::BlockedQuery` instead of
/// being parsed as JSON.
16const BLOCKED_SITE_ERROR_MESSAGE: &str =
17        "org.archive.util.io.RuntimeIOException: org.archive.wayback.exception.AdministrativeAccessControlException: Blocked Site Error\n";
18
/// Errors produced while querying the CDX index or decoding its results.
19#[derive(Error, Debug)]
20pub enum Error {
    /// A row could not be converted into an `Item`.
21    #[error("Item parsing error: {0}")]
22    ItemParsingError(#[from] item::Error),
    /// Building the HTTP client, sending a request, or reading a response body failed.
23    #[error("HTTP client error: {0}")]
24    HttpClientError(#[from] reqwest::Error),
    /// The input could not be parsed as a JSON array of string arrays.
25    #[error("JSON decoding error: {0}")]
26    JsonError(#[from] serde_json::Error),
    /// The server answered with the blocked-site message; carries the query that was sent.
27    #[error("Blocked query: {0}")]
28    BlockedQuery(String),
29}
30
31impl Retryable for Error {
32    fn max_retries() -> u32 {
33        7
34    }
35
36    fn log_level() -> Option<log::Level> {
37        Some(log::Level::Warn)
38    }
39
40    fn default_initial_delay() -> Duration {
41        Duration::from_millis(250)
42    }
43
44    fn custom_retry_policy(&self) -> Option<RetryPolicy> {
45        match self {
46            Error::HttpClientError(_) => Some(RetryPolicy::Delay(Duration::from_secs(30))),
47            // The CDX server occasionally returns an empty body that results in a JSON parsing
48            // failure.
49            Error::JsonError(_) => Some(RetryPolicy::Delay(Duration::from_secs(30))),
50            _ => Some(RetryPolicy::Break),
51        }
52    }
53}
54
55pub struct IndexClient {
56    base: String,
57    underlying: Client,
58}
59
60impl IndexClient {
61    pub fn new(base: String) -> Result<Self, Error> {
62        Ok(Self {
63            base,
64            underlying: Client::builder()
65                .tcp_keepalive(Some(Duration::from_secs(TCP_KEEPALIVE_SECS)))
66                .build()?,
67        })
68    }
69
70    fn decode_rows(rows: Vec<Vec<String>>) -> Result<Vec<Item>, Error> {
71        rows.into_iter()
72            .skip(1)
73            .map(|row| {
74                Item::parse_optional_record(
75                    row.first().map(|v| v.as_str()),
76                    row.get(1).map(|v| v.as_str()),
77                    row.get(2).map(|v| v.as_str()),
78                    row.get(3).map(|v| v.as_str()),
79                    row.get(4).map(|v| v.as_str()),
80                    row.get(5).map(|v| v.as_str()),
81                )
82                .map_err(From::from)
83            })
84            .collect()
85    }
86
87    pub fn load_json<R: Read>(reader: R) -> Result<Vec<Item>, Error> {
88        let buffered = BufReader::new(reader);
89
90        let rows = serde_json::from_reader::<BufReader<R>, Vec<Vec<String>>>(buffered)?;
91
92        Self::decode_rows(rows)
93    }
94
95    pub fn stream_search<'a>(
96        &'a self,
97        query: &'a str,
98        limit: usize,
99    ) -> impl Stream<Item = Result<Item, Error>> + 'a {
100        futures::stream::try_unfold(Some(None), move |resume_key| async move {
101            let next = match resume_key {
102                Some(key) => {
103                    let (items, resume_key) =
104                        retry_future(|| self.search_with_resume_key(query, limit, &key)).await?;
105
106                    log::info!("Resume key: {:?}", resume_key);
107
108                    Some((items, resume_key.map(Some)))
109                }
110                None => None,
111            };
112
113            let result: Result<_, Error> = Ok(next);
114            result
115        })
116        .map_ok(|items| futures::stream::iter(items.into_iter().map(Ok)))
117        .try_flatten()
118    }
119
120    async fn search_with_resume_key(
121        &self,
122        query: &str,
123        limit: usize,
124        resume_key: &Option<String>,
125    ) -> Result<(Vec<Item>, Option<String>), Error> {
126        let resume_key_param = resume_key
127            .as_ref()
128            .map(|key| format!("&resumeKey={}", key))
129            .unwrap_or_default();
130        let query_url = format!(
131            "{}?url={}{}&limit={}&showResumeKey=true{}",
132            self.base, query, resume_key_param, limit, CDX_OPTIONS
133        );
134        log::info!("Search URL: {}", query_url);
135        let contents = self.underlying.get(&query_url).send().await?.text().await?;
136
137        if contents == BLOCKED_SITE_ERROR_MESSAGE {
138            Err(Error::BlockedQuery(query.to_string()))
139        } else {
140            let mut rows = serde_json::from_str::<Vec<Vec<String>>>(&contents)?;
141            let len = rows.len();
142            let next_resume_key = if len >= 2 && rows[len - 2].is_empty() {
143                let mut last = rows.remove(len - 1);
144                rows.remove(len - 2);
145                Some(last.remove(0))
146            } else {
147                None
148            };
149            log::info!("Rows received {}", rows.len());
150
151            Self::decode_rows(rows).map(|items| (items, next_resume_key))
152        }
153    }
154
155    pub async fn search(
156        &self,
157        query: &str,
158        timestamp: Option<&str>,
159        digest: Option<&str>,
160    ) -> Result<Vec<Item>, Error> {
161        let mut filter = String::new();
162
163        if let Some(value) = timestamp {
164            filter.push_str(&format!("&filter=timestamp:{}", value));
165        }
166
167        if let Some(value) = digest {
168            filter.push_str(&format!("&filter=digest:{}", value));
169        }
170
171        let query_url = format!("{}?url={}{}{}", self.base, query, filter, CDX_OPTIONS);
172        let contents = self.underlying.get(&query_url).send().await?.text().await?;
173
174        if contents == BLOCKED_SITE_ERROR_MESSAGE {
175            Err(Error::BlockedQuery(query.to_string()))
176        } else {
177            let rows = serde_json::from_str(&contents)?;
178            Self::decode_rows(rows)
179        }
180    }
181}
182
183impl Default for IndexClient {
184    fn default() -> Self {
185        Self::new(DEFAULT_CDX_BASE.to_string()).unwrap()
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::IndexClient;
    use std::fs::File;

    /// Saved CDX JSON response used as the decoding fixture.
    const FIXTURE_PATH: &str = "examples/wayback/cdx-result.json";
    /// Number of items the fixture is expected to decode to.
    const EXPECTED_ITEM_COUNT: usize = 37;

    /// Decoding the saved response yields the expected number of items.
    #[test]
    fn load_json() {
        let fixture = File::open(FIXTURE_PATH).unwrap();
        let items = IndexClient::load_json(fixture).unwrap();

        assert_eq!(items.len(), EXPECTED_ITEM_COUNT);
    }
}