1use crate::error::Result;
2use bytes::{BufMut, BytesMut};
3use read_logger::{Level, ReadStatsLogger};
4use std::cmp::{max, min};
5use std::str::{self, FromStr};
6
7struct HttpRangeBuffer {
9 buf: BytesMut,
10 min_req_size: usize,
11 offset: usize,
13 head: usize,
15 read_stats: ReadStatsLogger,
16 http_stats: ReadStatsLogger,
17}
18
19impl HttpRangeBuffer {
20 pub fn new() -> Self {
21 HttpRangeBuffer {
22 buf: BytesMut::new(),
23 min_req_size: 1024,
24 offset: 0,
25 head: 0,
26 read_stats: ReadStatsLogger::new(Level::Trace, "read"),
27 http_stats: ReadStatsLogger::new(Level::Debug, "http-range"),
28 }
29 }
30
31 fn tail(&self) -> usize {
32 self.head + self.buf.len()
33 }
34
35 fn get_request_range(&mut self, begin: usize, length: usize) -> Option<(usize, usize)> {
36 self.read_stats.log(begin, length, length);
48 if begin + length > self.tail() || begin < self.head {
50 if begin > self.head && begin < self.tail() {
52 let _ = self.buf.split_to(begin - self.head);
53 self.head = begin;
54 } else if begin >= self.tail() || begin < self.head {
55 self.buf.clear();
56 self.head = begin;
57 }
58
59 let range_begin = max(begin, self.tail());
61 let range_length = max(begin + length - range_begin, self.min_req_size);
62 Some((range_begin, range_length))
63 } else {
64 None
65 }
66 }
67
68 fn range(&mut self, begin: usize, length: usize) -> String {
69 let end = (begin + length).saturating_sub(1);
70 format!("bytes={begin}-{end}")
71 }
72}
73
74pub(crate) mod nonblocking {
75 use super::*;
76 use crate::range_client::AsyncHttpRangeClient;
77
78 pub struct AsyncBufferedHttpRangeClient<T: AsyncHttpRangeClient> {
80 http_client: T,
81 url: String,
82 buffer: HttpRangeBuffer,
83 }
84
85 impl<T: AsyncHttpRangeClient> AsyncBufferedHttpRangeClient<T> {
86 pub fn with(http_client: T, url: &str) -> AsyncBufferedHttpRangeClient<T> {
87 AsyncBufferedHttpRangeClient {
88 http_client,
89 url: url.to_string(),
90 buffer: HttpRangeBuffer::new(),
91 }
92 }
93
94 pub fn set_min_req_size(&mut self, size: usize) {
96 self.buffer.min_req_size = size;
97 }
98
99 pub fn min_req_size(&mut self, size: usize) -> &mut Self {
101 self.set_min_req_size(size);
102 self
103 }
104
105 pub async fn get_range(&mut self, begin: usize, length: usize) -> Result<&[u8]> {
107 let slice_len = if let Some((range_begin, range_length)) =
108 self.buffer.get_request_range(begin, length)
109 {
110 self.buffer
111 .http_stats
112 .log(range_begin, range_length, length);
113 let range = self.buffer.range(range_begin, range_length);
114 let bytes = self.http_client.get_range(&self.url, &range).await?;
115 let eff_len = bytes.len();
116 self.buffer.buf.put(bytes);
117 min(range_begin - begin + eff_len, length)
118 } else {
119 length
120 };
121 self.buffer.offset = begin + slice_len;
122 let lower = begin - self.buffer.head;
124 Ok(&self.buffer.buf[lower..lower + slice_len])
125 }
126
127 pub async fn get_bytes(&mut self, length: usize) -> Result<&[u8]> {
129 self.get_range(self.buffer.offset, length).await
130 }
131
132 pub async fn head_response_header(&self, header: &str) -> Result<Option<String>> {
134 self.http_client
135 .head_response_header(&self.url, header)
136 .await
137 }
138 }
139}
140
141pub(crate) mod sync {
142 use super::*;
143 use crate::range_client::SyncHttpRangeClient;
144 use crate::HttpError;
145 use bytes::Buf;
146 use std::io::{BufRead, Read, Seek, SeekFrom};
147
148 pub struct SyncBufferedHttpRangeClient<T: SyncHttpRangeClient> {
150 http_client: T,
151 url: String,
152 buffer: HttpRangeBuffer,
153 length_info: Option<Option<u64>>,
154 }
155
156 impl<T: SyncHttpRangeClient> SyncBufferedHttpRangeClient<T> {
157 pub fn with(http_client: T, url: &str) -> SyncBufferedHttpRangeClient<T> {
158 SyncBufferedHttpRangeClient {
159 http_client,
160 url: url.to_string(),
161 buffer: HttpRangeBuffer::new(),
162 length_info: None,
163 }
164 }
165
166 pub fn set_min_req_size(&mut self, size: usize) {
168 self.buffer.min_req_size = size;
169 }
170
171 pub fn min_req_size(&mut self, size: usize) -> &mut Self {
173 self.set_min_req_size(size);
174 self
175 }
176
177 pub fn get_range(&mut self, begin: usize, length: usize) -> Result<&[u8]> {
179 let slice_len = if let Some((range_begin, range_length)) =
180 self.buffer.get_request_range(begin, length)
181 {
182 self.buffer.http_stats.log(begin, range_length, length);
183 let range = self.buffer.range(range_begin, range_length);
184 let bytes = self.http_client.get_range(&self.url, &range)?;
185 let eff_len = bytes.len();
186 self.buffer.buf.put(bytes);
187 min(range_begin - begin + eff_len, length)
188 } else {
189 length
190 };
191 self.buffer.offset = begin + slice_len;
192 let lower = begin - self.buffer.head;
194 Ok(&self.buffer.buf[lower..lower + slice_len])
195 }
196
197 pub fn get_bytes(&mut self, length: usize) -> Result<&[u8]> {
199 self.get_range(self.buffer.offset, length)
200 }
201
202 pub fn head_response_header(&self, header: &str) -> Result<Option<String>> {
204 self.http_client.head_response_header(&self.url, header)
205 }
206
207 pub fn get_content_length(&mut self) -> Result<Option<u64>> {
209 let header_val = self.head_response_header("content-length")?;
210 let length_info = if let Some(val) = header_val {
211 let length = u64::from_str(&val).map_err(|_| {
212 HttpError::HttpError("Invalid content-length received".to_string())
213 })?;
214 Some(length)
215 } else {
216 None
217 };
218 self.length_info = Some(length_info);
219 Ok(length_info)
220 }
221 }
222
223 impl<T: SyncHttpRangeClient> Read for SyncBufferedHttpRangeClient<T> {
224 fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
225 let length = buf.len();
226 let mut bytes = self.get_bytes(length).map_err(|e| match e {
227 HttpError::HttpStatus(416) => {
228 std::io::Error::from(std::io::ErrorKind::UnexpectedEof)
229 }
230 e => std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
231 })?;
232 bytes.copy_to_slice(&mut buf[0..bytes.len()]);
233 Ok(length)
234 }
235 }
236
237 impl<T: SyncHttpRangeClient> BufRead for SyncBufferedHttpRangeClient<T> {
238 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
239 if self.buffer.offset >= self.buffer.tail() || self.buffer.offset < self.buffer.head {
240 let res = self.get_bytes(self.buffer.min_req_size);
241 if let Some(HttpError::HttpStatus(416)) = res.as_ref().err() {
242 return Ok(&[]);
244 }
245 res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
246 self.buffer.offset = self.buffer.head;
247 }
248 Ok(&self.buffer.buf[..])
249 }
250
251 fn consume(&mut self, amt: usize) {
252 self.buffer.offset += amt;
253 }
254 }
255
256 impl<T: SyncHttpRangeClient> Seek for SyncBufferedHttpRangeClient<T> {
257 fn seek(&mut self, pos: SeekFrom) -> std::result::Result<u64, std::io::Error> {
258 match pos {
259 SeekFrom::Start(p) => {
260 self.buffer.offset = p as usize;
261 Ok(p)
262 }
263 SeekFrom::End(p) => {
264 if self.length_info.is_none() {
265 let _ = self.get_content_length().map_err(|e| {
267 std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
268 })?;
269 }
270 if let Some(Some(length)) = self.length_info {
271 self.buffer.offset = length.saturating_add_signed(p) as usize;
272 Ok(self.buffer.offset as u64)
273 } else {
274 Err(std::io::Error::new(
275 std::io::ErrorKind::Other,
276 "SeekFrom::End failed - no content-length received",
277 ))
278 }
279 }
280 SeekFrom::Current(p) => {
281 self.buffer.offset = self.buffer.offset.saturating_add_signed(p as isize);
282 Ok(self.buffer.offset as u64)
283 }
284 }
285 }
286 }
287}
288
289#[cfg(test)]
290#[cfg(feature = "reqwest-async")]
291mod test_async {
292 use crate::{AsyncBufferedHttpRangeClient, BufferedHttpRangeClient, Result};
293
294 fn init_logger() {
295 let _ = env_logger::builder().is_test(true).try_init();
296 }
297
298 #[tokio::test]
299 async fn http_read_async() -> Result<()> {
300 init_logger();
301 let mut client =
302 BufferedHttpRangeClient::new("https://flatgeobuf.org/test/data/countries.fgb");
303 let bytes = client.min_req_size(256).get_range(0, 3).await?;
304 assert_eq!(bytes, b"fgb");
305 let version = client.get_bytes(1).await?;
306 assert_eq!(version, [3]);
307 Ok(())
308 }
309
310 #[tokio::test]
311 async fn read_over_min_req_size() -> Result<()> {
312 let mut client =
313 BufferedHttpRangeClient::new("https://flatgeobuf.org/test/data/countries.fgb");
314 let bytes = client.min_req_size(4).get_range(0, 8).await?;
315 assert_eq!(bytes, [b'f', b'g', b'b', 3, b'f', b'g', b'b', 0]);
316 Ok(())
317 }
318
319 #[tokio::test]
320 async fn zero_range() -> Result<()> {
321 init_logger();
322 let mut client =
323 BufferedHttpRangeClient::new("https://flatgeobuf.org/test/data/countries.fgb");
324 let bytes = client.get_range(100, 0).await?;
325 assert_eq!(bytes, []);
326 Ok(())
327 }
328
329 #[tokio::test]
330 async fn after_end() -> Result<()> {
331 init_logger();
332 let mut client =
334 BufferedHttpRangeClient::new("https://flatgeobuf.org/test/data/countries.fgb");
335 let bytes = client.get_range(205670, 10).await?;
336 assert_eq!(bytes, [78, 192, 205, 204, 204, 204, 204, 236, 73, 192]);
337
338 let bytes = client.get_bytes(10).await;
339 assert_eq!(&bytes.unwrap_err().to_string(), "http status 416");
340
341 let bytes = client.get_range(205670, 20).await?;
342 assert_eq!(bytes, [78, 192, 205, 204, 204, 204, 204, 236, 73, 192]);
343
344 Ok(())
345 }
346
347 #[tokio::test]
348 async fn buffer_overlap() -> Result<()> {
349 init_logger();
350 let mut client =
351 BufferedHttpRangeClient::new("https://flatgeobuf.org/test/data/countries.fgb");
352 let bytes = client.min_req_size(4).get_range(0, 3).await?;
353 assert_eq!(bytes, [b'f', b'g', b'b']);
354 let bytes = client.get_range(3, 4).await?;
355 assert_eq!(bytes, [3, b'f', b'g', b'b']);
356 let bytes = client.get_bytes(1).await?;
357 assert_eq!(bytes, [0]);
358 Ok(())
359 }
360
361 #[tokio::test]
362 async fn custom_headers() -> Result<()> {
363 init_logger();
364 let http_client = reqwest::Client::builder()
365 .user_agent("rust-client")
366 .build()
367 .unwrap();
368 let mut client = AsyncBufferedHttpRangeClient::with(
369 http_client,
370 "https://flatgeobuf.org/test/data/countries.fgb",
371 );
372 let bytes = client.min_req_size(256).get_range(0, 3).await?;
373 assert_eq!(bytes, b"fgb");
374 Ok(())
375 }
376}
377
378#[cfg(test)]
379#[cfg(any(feature = "reqwest-sync", feature = "ureq-sync"))]
380mod test_sync {
381 #[cfg(feature = "reqwest-sync")]
382 use crate::HttpReader;
383 use crate::Result;
384 #[cfg(all(feature = "ureq-sync", not(feature = "reqwest-sync")))]
385 use crate::UreqHttpReader as HttpReader;
386 use std::io::{BufRead, Read, Seek, SeekFrom};
387
388 fn init_logger() {
389 let _ = env_logger::builder().is_test(true).try_init();
390 }
391
392 #[test]
393 fn http_read_sync() -> Result<()> {
394 init_logger();
395 let mut client = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
396 let bytes = client.min_req_size(256).get_range(0, 3)?;
397 assert_eq!(bytes, b"fgb");
398
399 let version = client.get_bytes(1)?;
400 assert_eq!(version, [3]);
401
402 let bytes = client.get_bytes(3)?;
403 assert_eq!(bytes, b"fgb");
404 Ok(())
405 }
406
407 #[test]
408 fn http_read_sync_zero_range() -> Result<()> {
409 init_logger();
410 let mut client = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
411 let bytes = client.min_req_size(256).get_range(0, 0)?;
412 assert_eq!(bytes, []);
413 Ok(())
414 }
415
416 #[test]
417 fn io_read() -> std::io::Result<()> {
418 init_logger();
419 let mut reader = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
420 reader.seek(SeekFrom::Start(3)).ok();
421 let mut version = [0; 1];
422 reader.min_req_size(256).read_exact(&mut version)?;
423 assert_eq!(version, [3]);
424
425 let mut bytes = [0; 3];
426 reader.read_exact(&mut bytes)?;
427 assert_eq!(&bytes, b"fgb");
428 Ok(())
429 }
430
431 #[test]
432 fn io_read_over_min_req_size() -> std::io::Result<()> {
433 init_logger();
434 let mut reader = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
435 let mut bytes = [0; 8];
436 reader.min_req_size(4).read_exact(&mut bytes)?;
437 assert_eq!(bytes, [b'f', b'g', b'b', 3, b'f', b'g', b'b', 0]);
438 Ok(())
439 }
440
441 #[test]
442 fn io_read_non_exact() -> std::io::Result<()> {
443 init_logger();
444 let mut reader = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
445 let mut bytes = [0; 8];
446 reader.min_req_size(4).read(&mut bytes)?;
448 assert_eq!(bytes, [b'f', b'g', b'b', 3, b'f', b'g', b'b', 0]);
449 Ok(())
450 }
451
452 #[test]
453 fn after_end() -> std::io::Result<()> {
454 init_logger();
455 let mut reader = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
457 reader.seek(SeekFrom::Start(205670)).ok();
458 let mut bytes = [0; 10];
459 reader.read_exact(&mut bytes)?;
460 assert_eq!(bytes, [78, 192, 205, 204, 204, 204, 204, 236, 73, 192]);
461
462 let result = reader.read_exact(&mut bytes);
463 assert_eq!(result.unwrap_err().to_string(), "unexpected end of file");
464
465 reader.seek(SeekFrom::Start(205670)).ok();
466 let mut bytes = [0; 20];
467 reader.read_exact(&mut bytes)?;
468 assert_eq!(
469 bytes,
470 [78, 192, 205, 204, 204, 204, 204, 236, 73, 192, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
471 );
472
473 Ok(())
474 }
475
476 #[test]
477 fn seek_current() -> std::io::Result<()> {
478 init_logger();
479 let mut reader = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
480 let mut bytes = [0; 8];
481 reader.read(&mut bytes)?;
482
483 assert_eq!(reader.seek(SeekFrom::Current(0))?, 8);
484
485 reader.seek(SeekFrom::Current(-8))?;
486 reader.read(&mut bytes)?;
487 assert_eq!(bytes, [b'f', b'g', b'b', 3, b'f', b'g', b'b', 0]);
488
489 Ok(())
490 }
491
492 #[test]
493 fn seek_end() -> std::io::Result<()> {
494 init_logger();
495 let mut reader = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
496
497 let size = reader.seek(SeekFrom::End(0))?;
498 assert_eq!(size, 205680);
499
500 reader.seek(SeekFrom::End(-205680))?;
501
502 let mut bytes = [0; 8];
503 reader.read(&mut bytes)?;
504 assert_eq!(bytes, [b'f', b'g', b'b', 3, b'f', b'g', b'b', 0]);
505
506 Ok(())
507 }
508
509 #[test]
510 fn bufread() -> std::io::Result<()> {
511 init_logger();
512 let mut reader = HttpReader::new("https://flatgeobuf.org/test/data/countries.fgb");
513 reader.set_min_req_size(5);
514
515 let mut bytes = vec![];
516 let num_bytes = reader.read_until(0, &mut bytes).unwrap();
517 assert_eq!(num_bytes, 8);
518 assert_eq!(bytes, [b'f', b'g', b'b', 3, b'f', b'g', b'b', 0]);
519
520 reader.seek(SeekFrom::Start(0)).ok();
522 let mut bytes = vec![];
523 let _num_bytes = reader.read_until(3, &mut bytes).unwrap();
524 assert_eq!(bytes, [b'f', b'g', b'b', 3]);
525 let mut bytes = [0; 2];
526 reader.read(&mut bytes)?;
527 assert_eq!(bytes, [b'f', b'g']);
528 let mut bytes = vec![];
529 let _num_bytes = reader.read_until(0, &mut bytes).unwrap();
530 assert_eq!(bytes, [b'f', b'g', b'b', 0]);
531 reader.seek(SeekFrom::Start(205680 - 8)).ok();
536 let mut bytes = vec![];
537 let num_bytes = reader.read_until(0, &mut bytes).unwrap();
538 assert_eq!(num_bytes, 8);
539 assert_eq!(bytes, [205, 204, 204, 204, 204, 236, 73, 192]);
540
541 reader.seek(SeekFrom::Start(205680 - 5)).ok();
542 let mut bytes = vec![];
543 let num_bytes = reader.read_until(0, &mut bytes).unwrap();
544 assert_eq!(num_bytes, 5);
545 assert_eq!(bytes, [204, 204, 236, 73, 192]);
546
547 Ok(())
548 }
549
550 #[test]
551 fn remote_png() -> std::io::Result<()> {
552 init_logger();
553 let mut reader =
554 HttpReader::new("https://www.rust-lang.org/static/images/favicon-32x32.png");
555 reader.seek(SeekFrom::Start(1)).ok();
556 let mut bytes = [0; 3];
557 reader.read_exact(&mut bytes)?;
558 assert_eq!(&bytes, b"PNG");
559 Ok(())
560 }
561}