file_api/
http_reader.rs

1use hyper::StatusCode;
2
3use hyperx::{
4    header::{ByteRangeSpec, ByteRangeSpec::FromTo, ContentRangeSpec, Range::Bytes},
5    Headers,
6};
7
8use reqwest;
9use reqwest::{header, Client};
10
11use buffer::Buffer;
12use reader::Reader;
13
14use std::cmp;
15use std::io::{Error, ErrorKind, Read, Seek, SeekFrom};
16use std::str::FromStr;
17use std::time::Instant;
18
19fn get_head(filename: &str) -> Result<reqwest::Response, reqwest::Error> {
20    if filename.contains(".amazonaws.com") {
21        let range = vec![FromTo(0, 0)];
22
23        let mut headers = Headers::new();
24        headers.set(Bytes(range));
25        let client = reqwest::Client::builder()
26            .default_headers(headers.into())
27            .build()
28            .unwrap();
29
30        client.get(filename).send()
31    } else {
32        let client = Client::new();
33        client.head(filename).send()
34    }
35}
36
/// Result of one ranged download: the bytes received plus the size reported
/// for the whole remote file.
#[derive(Debug)]
struct ResponseData {
    /// Raw bytes of the response body.
    body_data: Vec<u8>,
    /// Total length taken from the `Content-Range` header (`0` when the
    /// header does not state it).
    file_size: u64,
}
42
43fn get_data(filename: &str, range: Vec<ByteRangeSpec>) -> Result<ResponseData, String> {
44    let mut headers = Headers::new();
45    headers.set(Bytes(range));
46    let client = reqwest::Client::builder()
47        .default_headers(headers.into())
48        .build()
49        .unwrap();
50
51    let mut response = match client.get(filename).send() {
52        Ok(content) => content,
53        Err(_msg) => return Err("bad request".to_string()),
54    };
55
56    let status = response.status();
57
58    if !(status == StatusCode::OK || status == StatusCode::PARTIAL_CONTENT) {
59        error!("ERROR {:?}", response);
60        return Err("bad response status".to_string());
61    }
62
63    let mut body: Vec<u8> = vec![];
64    let _result = response.copy_to(&mut body);
65
66    let file_size = match get_content_range(&response) {
67        Ok(length) => length.unwrap_or(0),
68        Err(_msg) => return Err("bad response header".to_string()),
69    };
70
71    Ok(ResponseData {
72        body_data: body,
73        file_size,
74    })
75}
76
77fn get_content_range(response: &reqwest::Response) -> Result<Option<u64>, String> {
78    if let Some(content_range) = response.headers().get(header::CONTENT_RANGE) {
79        let content_range_str = content_range
80            .to_str()
81            .map_err(|msg| format!("Error serializing header value to str: {}", msg))?;
82
83        match ContentRangeSpec::from_str(content_range_str)
84            .map_err(|msg| format!("Error parsing content range from str: {}", msg))?
85        {
86            ContentRangeSpec::Bytes {
87                instance_length: length,
88                ..
89            } => Ok(length),
90            ContentRangeSpec::Unregistered { .. } => {
91                Err("Unregistered, actually unsupported".to_string())
92            }
93        }
94    } else {
95        Err("Missing content_range".to_string())
96    }
97}
98
99#[derive(Debug)]
100pub struct HttpReader {
101    pub filename: String,
102    pub file_size: Option<u64>,
103    pub position: u64,
104    pub buffer: Buffer,
105}
106
107pub fn exists(filename: &str) -> bool {
108    match get_head(filename) {
109        Ok(resp) => resp.status().is_success(),
110        Err(_msg) => false,
111    }
112}
113
114fn get_data_range(position: u64, size: usize, max_end_position: Option<u64>) -> Vec<ByteRangeSpec> {
115    let start = position;
116    let end = match (position, size) {
117        (0, 0) => 0,
118        (_, _) => match max_end_position {
119            Some(max) => {
120                let max_size = max - position;
121                position + cmp::min((size - 1) as u64, max_size)
122            }
123            None => position + (size - 1) as u64,
124        },
125    };
126
127    vec![FromTo(start, end)]
128}
129
130fn load_data(reader: &mut HttpReader, size: usize) -> Result<Option<Vec<u8>>, String> {
131    let start = Instant::now();
132    info!("make HTTP request with request {:?} bytes", size);
133
134    let position = match reader.buffer.size {
135        Some(_) => reader.buffer.position,
136        None => reader.position,
137    };
138
139    if let Some(total_file_size) = reader.file_size {
140        if position >= total_file_size {
141            info!(
142                "request range out of range: {} > {}",
143                position, total_file_size
144            );
145            return Ok(None);
146        }
147    }
148
149    let range = get_data_range(position, size, reader.buffer.max_end_position);
150    let response = match get_data(&reader.filename, range) {
151        Ok(data) => data,
152        Err(msg) => return Err(msg),
153    };
154
155    let elapsed = start.elapsed();
156    if elapsed.as_secs() > 0 {
157        warn!("Request duration {} seconds", elapsed.as_secs());
158    }
159
160    let new_position = position + response.body_data.len() as u64;
161    match reader.buffer.size {
162        Some(_) => {
163            reader.buffer.position = new_position;
164        }
165        None => {
166            reader.position = new_position;
167        }
168    };
169    Ok(Some(response.body_data))
170}
171
172impl Reader for HttpReader {
173    fn new() -> HttpReader {
174        HttpReader {
175            filename: "".to_string(),
176            file_size: None,
177            position: 0,
178            buffer: Buffer {
179                size: None,
180                position: 0,
181                max_end_position: None,
182                buffer: vec![],
183            },
184        }
185    }
186
187    fn open(&mut self, filename: &str) -> Result<(), String> {
188        self.filename = filename.to_string();
189
190        match get_head(filename) {
191            Err(msg) => Err(msg.to_string()),
192            Ok(response) => {
193                let content_length = match get_content_range(&response) {
194                    Ok(length) => length,
195                    _ => response.content_length(),
196                };
197
198                self.file_size = content_length;
199                Ok(())
200            }
201        }
202    }
203
204    fn get_position(&mut self) -> Result<u64, String> {
205        Ok(self.position)
206    }
207
208    fn get_cache_size(&self) -> Option<usize> {
209        self.buffer.size
210    }
211
212    fn set_cache_size(&mut self, cache_size: Option<usize>) {
213        self.buffer.size = cache_size;
214    }
215
216    fn get_max_end_position(&self) -> Option<u64> {
217        self.buffer.max_end_position
218    }
219
220    fn set_max_end_position(&mut self, max_end_position: Option<u64>) {
221        self.buffer.max_end_position = max_end_position;
222    }
223
224    fn get_size(&mut self) -> Result<u64, String> {
225        match self.file_size {
226            Some(length) => Ok(length),
227            None => Err("No length detected".to_string()),
228        }
229    }
230}
231
232impl Read for HttpReader {
233    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
234        if self.buffer.get_cached_size() >= buf.len() {
235            self.position += buf.len() as u64;
236            if self.buffer.get_data(buf) {
237                Ok(buf.len())
238            } else {
239                Err(Error::new(
240                    ErrorKind::Other,
241                    "unable to read data from HTTP cache",
242                ))
243            }
244        } else if let Some(buffer_size) = self.buffer.size {
245            let some_data =
246                load_data(self, buffer_size).map_err(|msg| Error::new(ErrorKind::Other, msg))?;
247
248            if let Some(data) = some_data {
249                self.buffer.append_data(&data.to_vec());
250                self.position += buf.len() as u64;
251                if self.buffer.get_data(buf) {
252                    Ok(buf.len())
253                } else {
254                    Err(Error::new(
255                        ErrorKind::Other,
256                        "unable to read data from HTTP cache",
257                    ))
258                }
259            } else {
260                Ok(0)
261            }
262        } else {
263            let some_data =
264                load_data(self, buf.len()).map_err(|msg| Error::new(ErrorKind::Other, msg))?;
265
266            if let Some(data) = some_data {
267                if data.len() >= buf.len() {
268                    buf.clone_from_slice(&data);
269                    Ok(data.len())
270                } else {
271                    Ok(0)
272                }
273            } else {
274                Ok(0)
275            }
276        }
277    }
278}
279
280impl Seek for HttpReader {
281    fn seek(&mut self, seek_from: SeekFrom) -> Result<u64, Error> {
282        match seek_from {
283            SeekFrom::Current(offset) => {
284                self.position += offset as u64;
285                if self.buffer.size.is_some() {
286                    if offset > 0 && self.buffer.get_cached_size() > offset as usize {
287                        let mut skipped_data = vec![];
288                        skipped_data.resize(offset as usize, 0);
289                        let _skiped_data = self.buffer.get_data(&mut skipped_data);
290                    } else {
291                        self.buffer.reset();
292                    }
293                }
294            }
295            SeekFrom::Start(offset) => {
296                self.buffer.reset();
297                self.position = offset;
298                if self.buffer.size.is_some() {
299                    self.buffer.position = self.position;
300                }
301            }
302            SeekFrom::End(offset) => {
303                self.buffer.reset();
304                match self.file_size {
305                    Some(size) => {
306                        self.position = size - offset as u64;
307                        if self.buffer.size.is_some() {
308                            self.buffer.position = self.position;
309                        }
310                    }
311                    None => return Err(Error::new(ErrorKind::Other, "Missing file size")),
312                }
313            }
314        }
315        Ok(self.position)
316    }
317}