1use hyper::StatusCode;
2
3use hyperx::{
4 header::{ByteRangeSpec, ByteRangeSpec::FromTo, ContentRangeSpec, Range::Bytes},
5 Headers,
6};
7
8use reqwest;
9use reqwest::{header, Client};
10
11use buffer::Buffer;
12use reader::Reader;
13
14use std::cmp;
15use std::io::{Error, ErrorKind, Read, Seek, SeekFrom};
16use std::str::FromStr;
17use std::time::Instant;
18
19fn get_head(filename: &str) -> Result<reqwest::Response, reqwest::Error> {
20 if filename.contains(".amazonaws.com") {
21 let range = vec![FromTo(0, 0)];
22
23 let mut headers = Headers::new();
24 headers.set(Bytes(range));
25 let client = reqwest::Client::builder()
26 .default_headers(headers.into())
27 .build()
28 .unwrap();
29
30 client.get(filename).send()
31 } else {
32 let client = Client::new();
33 client.head(filename).send()
34 }
35}
36
/// Payload and size information extracted from one ranged HTTP response.
#[derive(Debug)]
struct ResponseData {
    /// Raw bytes copied from the response body.
    body_data: Vec<u8>,
    /// Instance length announced by the `Content-Range` header, `0` when it is not specified.
    file_size: u64,
}
42
43fn get_data(filename: &str, range: Vec<ByteRangeSpec>) -> Result<ResponseData, String> {
44 let mut headers = Headers::new();
45 headers.set(Bytes(range));
46 let client = reqwest::Client::builder()
47 .default_headers(headers.into())
48 .build()
49 .unwrap();
50
51 let mut response = match client.get(filename).send() {
52 Ok(content) => content,
53 Err(_msg) => return Err("bad request".to_string()),
54 };
55
56 let status = response.status();
57
58 if !(status == StatusCode::OK || status == StatusCode::PARTIAL_CONTENT) {
59 error!("ERROR {:?}", response);
60 return Err("bad response status".to_string());
61 }
62
63 let mut body: Vec<u8> = vec![];
64 let _result = response.copy_to(&mut body);
65
66 let file_size = match get_content_range(&response) {
67 Ok(length) => length.unwrap_or(0),
68 Err(_msg) => return Err("bad response header".to_string()),
69 };
70
71 Ok(ResponseData {
72 body_data: body,
73 file_size,
74 })
75}
76
77fn get_content_range(response: &reqwest::Response) -> Result<Option<u64>, String> {
78 if let Some(content_range) = response.headers().get(header::CONTENT_RANGE) {
79 let content_range_str = content_range
80 .to_str()
81 .map_err(|msg| format!("Error serializing header value to str: {}", msg))?;
82
83 match ContentRangeSpec::from_str(content_range_str)
84 .map_err(|msg| format!("Error parsing content range from str: {}", msg))?
85 {
86 ContentRangeSpec::Bytes {
87 instance_length: length,
88 ..
89 } => Ok(length),
90 ContentRangeSpec::Unregistered { .. } => {
91 Err("Unregistered, actually unsupported".to_string())
92 }
93 }
94 } else {
95 Err("Missing content_range".to_string())
96 }
97}
98
99#[derive(Debug)]
100pub struct HttpReader {
101 pub filename: String,
102 pub file_size: Option<u64>,
103 pub position: u64,
104 pub buffer: Buffer,
105}
106
107pub fn exists(filename: &str) -> bool {
108 match get_head(filename) {
109 Ok(resp) => resp.status().is_success(),
110 Err(_msg) => false,
111 }
112}
113
114fn get_data_range(position: u64, size: usize, max_end_position: Option<u64>) -> Vec<ByteRangeSpec> {
115 let start = position;
116 let end = match (position, size) {
117 (0, 0) => 0,
118 (_, _) => match max_end_position {
119 Some(max) => {
120 let max_size = max - position;
121 position + cmp::min((size - 1) as u64, max_size)
122 }
123 None => position + (size - 1) as u64,
124 },
125 };
126
127 vec![FromTo(start, end)]
128}
129
130fn load_data(reader: &mut HttpReader, size: usize) -> Result<Option<Vec<u8>>, String> {
131 let start = Instant::now();
132 info!("make HTTP request with request {:?} bytes", size);
133
134 let position = match reader.buffer.size {
135 Some(_) => reader.buffer.position,
136 None => reader.position,
137 };
138
139 if let Some(total_file_size) = reader.file_size {
140 if position >= total_file_size {
141 info!(
142 "request range out of range: {} > {}",
143 position, total_file_size
144 );
145 return Ok(None);
146 }
147 }
148
149 let range = get_data_range(position, size, reader.buffer.max_end_position);
150 let response = match get_data(&reader.filename, range) {
151 Ok(data) => data,
152 Err(msg) => return Err(msg),
153 };
154
155 let elapsed = start.elapsed();
156 if elapsed.as_secs() > 0 {
157 warn!("Request duration {} seconds", elapsed.as_secs());
158 }
159
160 let new_position = position + response.body_data.len() as u64;
161 match reader.buffer.size {
162 Some(_) => {
163 reader.buffer.position = new_position;
164 }
165 None => {
166 reader.position = new_position;
167 }
168 };
169 Ok(Some(response.body_data))
170}
171
172impl Reader for HttpReader {
173 fn new() -> HttpReader {
174 HttpReader {
175 filename: "".to_string(),
176 file_size: None,
177 position: 0,
178 buffer: Buffer {
179 size: None,
180 position: 0,
181 max_end_position: None,
182 buffer: vec![],
183 },
184 }
185 }
186
187 fn open(&mut self, filename: &str) -> Result<(), String> {
188 self.filename = filename.to_string();
189
190 match get_head(filename) {
191 Err(msg) => Err(msg.to_string()),
192 Ok(response) => {
193 let content_length = match get_content_range(&response) {
194 Ok(length) => length,
195 _ => response.content_length(),
196 };
197
198 self.file_size = content_length;
199 Ok(())
200 }
201 }
202 }
203
204 fn get_position(&mut self) -> Result<u64, String> {
205 Ok(self.position)
206 }
207
208 fn get_cache_size(&self) -> Option<usize> {
209 self.buffer.size
210 }
211
212 fn set_cache_size(&mut self, cache_size: Option<usize>) {
213 self.buffer.size = cache_size;
214 }
215
216 fn get_max_end_position(&self) -> Option<u64> {
217 self.buffer.max_end_position
218 }
219
220 fn set_max_end_position(&mut self, max_end_position: Option<u64>) {
221 self.buffer.max_end_position = max_end_position;
222 }
223
224 fn get_size(&mut self) -> Result<u64, String> {
225 match self.file_size {
226 Some(length) => Ok(length),
227 None => Err("No length detected".to_string()),
228 }
229 }
230}
231
232impl Read for HttpReader {
233 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
234 if self.buffer.get_cached_size() >= buf.len() {
235 self.position += buf.len() as u64;
236 if self.buffer.get_data(buf) {
237 Ok(buf.len())
238 } else {
239 Err(Error::new(
240 ErrorKind::Other,
241 "unable to read data from HTTP cache",
242 ))
243 }
244 } else if let Some(buffer_size) = self.buffer.size {
245 let some_data =
246 load_data(self, buffer_size).map_err(|msg| Error::new(ErrorKind::Other, msg))?;
247
248 if let Some(data) = some_data {
249 self.buffer.append_data(&data.to_vec());
250 self.position += buf.len() as u64;
251 if self.buffer.get_data(buf) {
252 Ok(buf.len())
253 } else {
254 Err(Error::new(
255 ErrorKind::Other,
256 "unable to read data from HTTP cache",
257 ))
258 }
259 } else {
260 Ok(0)
261 }
262 } else {
263 let some_data =
264 load_data(self, buf.len()).map_err(|msg| Error::new(ErrorKind::Other, msg))?;
265
266 if let Some(data) = some_data {
267 if data.len() >= buf.len() {
268 buf.clone_from_slice(&data);
269 Ok(data.len())
270 } else {
271 Ok(0)
272 }
273 } else {
274 Ok(0)
275 }
276 }
277 }
278}
279
280impl Seek for HttpReader {
281 fn seek(&mut self, seek_from: SeekFrom) -> Result<u64, Error> {
282 match seek_from {
283 SeekFrom::Current(offset) => {
284 self.position += offset as u64;
285 if self.buffer.size.is_some() {
286 if offset > 0 && self.buffer.get_cached_size() > offset as usize {
287 let mut skipped_data = vec![];
288 skipped_data.resize(offset as usize, 0);
289 let _skiped_data = self.buffer.get_data(&mut skipped_data);
290 } else {
291 self.buffer.reset();
292 }
293 }
294 }
295 SeekFrom::Start(offset) => {
296 self.buffer.reset();
297 self.position = offset;
298 if self.buffer.size.is_some() {
299 self.buffer.position = self.position;
300 }
301 }
302 SeekFrom::End(offset) => {
303 self.buffer.reset();
304 match self.file_size {
305 Some(size) => {
306 self.position = size - offset as u64;
307 if self.buffer.size.is_some() {
308 self.buffer.position = self.position;
309 }
310 }
311 None => return Err(Error::new(ErrorKind::Other, "Missing file size")),
312 }
313 }
314 }
315 Ok(self.position)
316 }
317}