http_range_client/
buffered_range_client.rs

1use crate::error::Result;
2use bytes::{BufMut, BytesMut};
3use read_logger::{Level, ReadStatsLogger};
4use std::cmp::{max, min};
5use std::str::{self, FromStr};
6
7/// Buffer for Range request reader (https://developer.mozilla.org/en-US/docs/Web/HTTP/Range_requests)
8struct HttpRangeBuffer {
9    buf: BytesMut,
10    min_req_size: usize,
11    /// Current position for Read+Seek implementation
12    offset: usize,
13    /// Lower index of buffer relative to input stream
14    head: usize,
15    read_stats: ReadStatsLogger,
16    http_stats: ReadStatsLogger,
17}
18
19impl HttpRangeBuffer {
20    pub fn new() -> Self {
21        HttpRangeBuffer {
22            buf: BytesMut::new(),
23            min_req_size: 1024,
24            offset: 0,
25            head: 0,
26            read_stats: ReadStatsLogger::new(Level::Trace, "read"),
27            http_stats: ReadStatsLogger::new(Level::Debug, "http-range"),
28        }
29    }
30
31    fn tail(&self) -> usize {
32        self.head + self.buf.len()
33    }
34
35    fn get_request_range(&mut self, begin: usize, length: usize) -> Option<(usize, usize)> {
36        //
37        //            head  begin    tail
38        //       +------+-----+---+---+------------+
39        // File  |      |     |   |   |            |
40        //       +------+-----+---+---+------------+
41        // buf          |     |   |   |
42        //              +-----+---+---+
43        // Request            |   |
44        //                    +---+
45        //                    length
46
47        self.read_stats.log(begin, length, length);
48        // Download additional bytes if requested range is not in buffer
49        if begin + length > self.tail() || begin < self.head {
50            // Remove bytes before new begin
51            if begin > self.head && begin < self.tail() {
52                let _ = self.buf.split_to(begin - self.head);
53                self.head = begin;
54            } else if begin >= self.tail() || begin < self.head {
55                self.buf.clear();
56                self.head = begin;
57            }
58
59            // Read additional bytes into buffer
60            let range_begin = max(begin, self.tail());
61            let range_length = max(begin + length - range_begin, self.min_req_size);
62            Some((range_begin, range_length))
63        } else {
64            None
65        }
66    }
67
68    fn range(&mut self, begin: usize, length: usize) -> String {
69        let end = (begin + length).saturating_sub(1);
70        format!("bytes={begin}-{end}")
71    }
72}
73
74pub(crate) mod nonblocking {
75    use super::*;
76    use crate::range_client::AsyncHttpRangeClient;
77
78    /// HTTP client adapter for HTTP Range requests with a buffer optimized for sequential reading
79    pub struct AsyncBufferedHttpRangeClient<T: AsyncHttpRangeClient> {
80        http_client: T,
81        url: String,
82        buffer: HttpRangeBuffer,
83    }
84
85    impl<T: AsyncHttpRangeClient> AsyncBufferedHttpRangeClient<T> {
86        pub fn with(http_client: T, url: &str) -> AsyncBufferedHttpRangeClient<T> {
87            AsyncBufferedHttpRangeClient {
88                http_client,
89                url: url.to_string(),
90                buffer: HttpRangeBuffer::new(),
91            }
92        }
93
94        /// Set minimal request size.
95        pub fn set_min_req_size(&mut self, size: usize) {
96            self.buffer.min_req_size = size;
97        }
98
99        /// Set minimal request size.
100        pub fn min_req_size(&mut self, size: usize) -> &mut Self {
101            self.set_min_req_size(size);
102            self
103        }
104
105        /// Get `length` bytes with offset `begin`.
106        pub async fn get_range(&mut self, begin: usize, length: usize) -> Result<&[u8]> {
107            let slice_len = if let Some((range_begin, range_length)) =
108                self.buffer.get_request_range(begin, length)
109            {
110                self.buffer
111                    .http_stats
112                    .log(range_begin, range_length, length);
113                let range = self.buffer.range(range_begin, range_length);
114                let bytes = self.http_client.get_range(&self.url, &range).await?;
115                let eff_len = bytes.len();
116                self.buffer.buf.put(bytes);
117                min(range_begin - begin + eff_len, length)
118            } else {
119                length
120            };
121            self.buffer.offset = begin + slice_len;
122            // Return slice from buffer
123            let lower = begin - self.buffer.head;
124            Ok(&self.buffer.buf[lower..lower + slice_len])
125        }
126
127        /// Get `length` bytes from current offset.
128        pub async fn get_bytes(&mut self, length: usize) -> Result<&[u8]> {
129            self.get_range(self.buffer.offset, length).await
130        }
131
132        /// Send a HEAD request and return response header value
133        pub async fn head_response_header(&self, header: &str) -> Result<Option<String>> {
134            self.http_client
135                .head_response_header(&self.url, header)
136                .await
137        }
138    }
139}
140
141pub(crate) mod sync {
142    use super::*;
143    use crate::range_client::SyncHttpRangeClient;
144    use crate::HttpError;
145    use bytes::Buf;
146    use std::io::{BufRead, Read, Seek, SeekFrom};
147
148    /// HTTP client adapter for HTTP Range requests with a buffer optimized for sequential reading
149    pub struct SyncBufferedHttpRangeClient<T: SyncHttpRangeClient> {
150        http_client: T,
151        url: String,
152        buffer: HttpRangeBuffer,
153        length_info: Option<Option<u64>>,
154    }
155
156    impl<T: SyncHttpRangeClient> SyncBufferedHttpRangeClient<T> {
157        pub fn with(http_client: T, url: &str) -> SyncBufferedHttpRangeClient<T> {
158            SyncBufferedHttpRangeClient {
159                http_client,
160                url: url.to_string(),
161                buffer: HttpRangeBuffer::new(),
162                length_info: None,
163            }
164        }
165
166        /// Set minimal request size.
167        pub fn set_min_req_size(&mut self, size: usize) {
168            self.buffer.min_req_size = size;
169        }
170
171        /// Set minimal request size.
172        pub fn min_req_size(&mut self, size: usize) -> &mut Self {
173            self.set_min_req_size(size);
174            self
175        }
176
177        /// Get `length` bytes with offset `begin`.
178        pub fn get_range(&mut self, begin: usize, length: usize) -> Result<&[u8]> {
179            let slice_len = if let Some((range_begin, range_length)) =
180                self.buffer.get_request_range(begin, length)
181            {
182                self.buffer.http_stats.log(begin, range_length, length);
183                let range = self.buffer.range(range_begin, range_length);
184                let bytes = self.http_client.get_range(&self.url, &range)?;
185                let eff_len = bytes.len();
186                self.buffer.buf.put(bytes);
187                min(range_begin - begin + eff_len, length)
188            } else {
189                length
190            };
191            self.buffer.offset = begin + slice_len;
192            // Return slice from buffer
193            let lower = begin - self.buffer.head;
194            Ok(&self.buffer.buf[lower..lower + slice_len])
195        }
196
197        /// Get `length` bytes from current offset.
198        pub fn get_bytes(&mut self, length: usize) -> Result<&[u8]> {
199            self.get_range(self.buffer.offset, length)
200        }
201
202        /// Send a HEAD request and return response header value
203        pub fn head_response_header(&self, header: &str) -> Result<Option<String>> {
204            self.http_client.head_response_header(&self.url, header)
205        }
206
207        /// Send a HEAD request and get content-length
208        pub fn get_content_length(&mut self) -> Result<Option<u64>> {
209            let header_val = self.head_response_header("content-length")?;
210            let length_info = if let Some(val) = header_val {
211                let length = u64::from_str(&val).map_err(|_| {
212                    HttpError::HttpError("Invalid content-length received".to_string())
213                })?;
214                Some(length)
215            } else {
216                None
217            };
218            self.length_info = Some(length_info);
219            Ok(length_info)
220        }
221    }
222
223    impl<T: SyncHttpRangeClient> Read for SyncBufferedHttpRangeClient<T> {
224        fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
225            let length = buf.len();
226            let mut bytes = self.get_bytes(length).map_err(|e| match e {
227                HttpError::HttpStatus(416) => {
228                    std::io::Error::from(std::io::ErrorKind::UnexpectedEof)
229                }
230                e => std::io::Error::new(std::io::ErrorKind::Other, e.to_string()),
231            })?;
232            bytes.copy_to_slice(&mut buf[0..bytes.len()]);
233            Ok(length)
234        }
235    }
236
237    impl<T: SyncHttpRangeClient> BufRead for SyncBufferedHttpRangeClient<T> {
238        fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
239            if self.buffer.offset >= self.buffer.tail() || self.buffer.offset < self.buffer.head {
240                let res = self.get_bytes(self.buffer.min_req_size);
241                if let Some(HttpError::HttpStatus(416)) = res.as_ref().err() {
242                    // An empty buffer indicates that the stream has reached EOF
243                    return Ok(&[]);
244                }
245                res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?;
246                self.buffer.offset = self.buffer.head;
247            }
248            Ok(&self.buffer.buf[..])
249        }
250
251        fn consume(&mut self, amt: usize) {
252            self.buffer.offset += amt;
253        }
254    }
255
256    impl<T: SyncHttpRangeClient> Seek for SyncBufferedHttpRangeClient<T> {
257        fn seek(&mut self, pos: SeekFrom) -> std::result::Result<u64, std::io::Error> {
258            match pos {
259                SeekFrom::Start(p) => {
260                    self.buffer.offset = p as usize;
261                    Ok(p)
262                }
263                SeekFrom::End(p) => {
264                    if self.length_info.is_none() {
265                        // Read content-length with HEAD request
266                        let _ = self.get_content_length().map_err(|e| {
267                            std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
268                        })?;
269                    }
270                    if let Some(Some(length)) = self.length_info {
271                        self.buffer.offset = length.saturating_add_signed(p) as usize;
272                        Ok(self.buffer.offset as u64)
273                    } else {
274                        Err(std::io::Error::new(
275                            std::io::ErrorKind::Other,
276                            "SeekFrom::End failed - no content-length received",
277                        ))
278                    }
279                }
280                SeekFrom::Current(p) => {
281                    self.buffer.offset = self.buffer.offset.saturating_add_signed(p as isize);
282                    Ok(self.buffer.offset as u64)
283                }
284            }
285        }
286    }
287}
288
#[cfg(all(test, feature = "reqwest-async"))]
mod test_async {
    use crate::{AsyncBufferedHttpRangeClient, BufferedHttpRangeClient, Result};

    /// Remote fixture used by all tests of this module (network access required).
    const COUNTRIES_FGB: &str = "https://flatgeobuf.org/test/data/countries.fgb";
    /// The first eight bytes of the fixture.
    const FGB_MAGIC: [u8; 8] = *b"fgb\x03fgb\0";
    /// The last ten bytes of the fixture.
    const FGB_TAIL: [u8; 10] = [78, 192, 205, 204, 204, 204, 204, 236, 73, 192];

    /// Install the test logger; repeated calls are harmless.
    fn init_logger() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Reads the magic bytes and continues reading from the current offset.
    #[tokio::test]
    async fn http_read_async() -> Result<()> {
        init_logger();
        let mut client = BufferedHttpRangeClient::new(COUNTRIES_FGB);
        let magic = client.min_req_size(256).get_range(0, 3).await?;
        assert_eq!(magic, b"fgb");
        let version = client.get_bytes(1).await?;
        assert_eq!(version, [3]);
        Ok(())
    }

    /// A read that is longer than the minimal request size is served completely.
    #[tokio::test]
    async fn read_over_min_req_size() -> Result<()> {
        let mut client = BufferedHttpRangeClient::new(COUNTRIES_FGB);
        let header = client.min_req_size(4).get_range(0, 8).await?;
        assert_eq!(header, FGB_MAGIC);
        Ok(())
    }

    /// A zero-length read yields an empty slice.
    #[tokio::test]
    async fn zero_range() -> Result<()> {
        init_logger();
        let mut client = BufferedHttpRangeClient::new(COUNTRIES_FGB);
        let nothing = client.get_range(100, 0).await?;
        assert!(nothing.is_empty());
        Ok(())
    }

    /// Reads at and across the end of the file.
    #[tokio::test]
    async fn after_end() -> Result<()> {
        init_logger();
        // countries.fgb has 205680 bytes
        let mut client = BufferedHttpRangeClient::new(COUNTRIES_FGB);
        let last_bytes = client.get_range(205670, 10).await?;
        assert_eq!(last_bytes, FGB_TAIL);

        // Reading behind the end is answered with HTTP status 416
        let past_end = client.get_bytes(10).await;
        assert_eq!(&past_end.unwrap_err().to_string(), "http status 416");

        // A range crossing the end is cut off at the end of the file
        let truncated = client.get_range(205670, 20).await?;
        assert_eq!(truncated, FGB_TAIL);

        Ok(())
    }

    /// Consecutive reads that partly overlap the buffered window.
    #[tokio::test]
    async fn buffer_overlap() -> Result<()> {
        init_logger();
        let mut client = BufferedHttpRangeClient::new(COUNTRIES_FGB);
        let first = client.min_req_size(4).get_range(0, 3).await?;
        assert_eq!(first, b"fgb");
        let second = client.get_range(3, 4).await?;
        assert_eq!(second, b"\x03fgb");
        let third = client.get_bytes(1).await?;
        assert_eq!(third, [0]);
        Ok(())
    }

    /// The adapter works with a user-configured HTTP client.
    #[tokio::test]
    async fn custom_headers() -> Result<()> {
        init_logger();
        let builder = reqwest::Client::builder().user_agent("rust-client");
        let http_client = builder.build().unwrap();
        let mut client = AsyncBufferedHttpRangeClient::with(http_client, COUNTRIES_FGB);
        let magic = client.min_req_size(256).get_range(0, 3).await?;
        assert_eq!(magic, b"fgb");
        Ok(())
    }
}
377
#[cfg(all(test, any(feature = "reqwest-sync", feature = "ureq-sync")))]
mod test_sync {
    #[cfg(feature = "reqwest-sync")]
    use crate::HttpReader;
    use crate::Result;
    #[cfg(all(feature = "ureq-sync", not(feature = "reqwest-sync")))]
    use crate::UreqHttpReader as HttpReader;
    use std::io::{BufRead, Read, Seek, SeekFrom};

    /// Remote fixture used by most tests of this module (network access required).
    const COUNTRIES_FGB: &str = "https://flatgeobuf.org/test/data/countries.fgb";
    /// The first eight bytes of the fixture.
    const FGB_MAGIC: [u8; 8] = *b"fgb\x03fgb\0";

    /// Install the test logger; repeated calls are harmless.
    fn init_logger() {
        let _ = env_logger::builder().is_test(true).try_init();
    }

    /// Reads the magic bytes and continues reading from the current offset.
    #[test]
    fn http_read_sync() -> Result<()> {
        init_logger();
        let mut client = HttpReader::new(COUNTRIES_FGB);
        let bytes = client.min_req_size(256).get_range(0, 3)?;
        assert_eq!(bytes, b"fgb");

        let version = client.get_bytes(1)?;
        assert_eq!(version, [3]);

        let bytes = client.get_bytes(3)?;
        assert_eq!(bytes, b"fgb");
        Ok(())
    }

    /// A zero-length read yields an empty slice.
    #[test]
    fn http_read_sync_zero_range() -> Result<()> {
        init_logger();
        let mut client = HttpReader::new(COUNTRIES_FGB);
        let bytes = client.min_req_size(256).get_range(0, 0)?;
        assert!(bytes.is_empty());
        Ok(())
    }

    /// `Read::read_exact` after a seek.
    #[test]
    fn io_read() -> std::io::Result<()> {
        init_logger();
        let mut reader = HttpReader::new(COUNTRIES_FGB);
        reader.seek(SeekFrom::Start(3)).ok();
        let mut version = [0; 1];
        reader.min_req_size(256).read_exact(&mut version)?;
        assert_eq!(version, [3]);

        let mut bytes = [0; 3];
        reader.read_exact(&mut bytes)?;
        assert_eq!(&bytes, b"fgb");
        Ok(())
    }

    /// `read_exact` for more bytes than the minimal request size.
    #[test]
    fn io_read_over_min_req_size() -> std::io::Result<()> {
        init_logger();
        let mut reader = HttpReader::new(COUNTRIES_FGB);
        let mut bytes = [0; 8];
        reader.min_req_size(4).read_exact(&mut bytes)?;
        assert_eq!(bytes, FGB_MAGIC);
        Ok(())
    }

    /// A single `read` call for more bytes than the minimal request size.
    #[test]
    fn io_read_non_exact() -> std::io::Result<()> {
        init_logger();
        let mut reader = HttpReader::new(COUNTRIES_FGB);
        let mut bytes = [0; 8];
        // The minimal request size is only a lower bound: the request is sized to
        // cover all 8 bytes, so a single read fills the buffer
        let count = reader.min_req_size(4).read(&mut bytes)?;
        assert_eq!(count, 8);
        assert_eq!(bytes, FGB_MAGIC);
        Ok(())
    }

    /// Reads at and across the end of the file.
    #[test]
    fn after_end() -> std::io::Result<()> {
        init_logger();
        // countries.fgb has 205680 bytes
        let mut reader = HttpReader::new(COUNTRIES_FGB);
        reader.seek(SeekFrom::Start(205670)).ok();
        let mut bytes = [0; 10];
        reader.read_exact(&mut bytes)?;
        assert_eq!(bytes, [78, 192, 205, 204, 204, 204, 204, 236, 73, 192]);

        let result = reader.read_exact(&mut bytes);
        assert_eq!(result.unwrap_err().to_string(), "unexpected end of file");

        reader.seek(SeekFrom::Start(205670)).ok();
        let mut bytes = [0u8; 20];
        // Only 10 bytes exist behind this position. How many bytes `read` reports
        // beyond them is not pinned here, only that the existing ones arrive and
        // that the rest of the destination is left zeroed.
        let count = reader.read(&mut bytes)?;
        assert!(count >= 10);
        assert_eq!(
            &bytes[..10],
            &[78u8, 192, 205, 204, 204, 204, 204, 236, 73, 192]
        );
        assert!(bytes[10..].iter().all(|&b| b == 0));

        Ok(())
    }

    /// `SeekFrom::Current` reports and moves the current offset.
    #[test]
    fn seek_current() -> std::io::Result<()> {
        init_logger();
        let mut reader = HttpReader::new(COUNTRIES_FGB);
        let mut bytes = [0; 8];
        reader.read(&mut bytes)?;

        assert_eq!(reader.seek(SeekFrom::Current(0))?, 8);

        reader.seek(SeekFrom::Current(-8))?;
        reader.read(&mut bytes)?;
        assert_eq!(bytes, FGB_MAGIC);

        Ok(())
    }

    /// `SeekFrom::End` positions relative to the content length.
    #[test]
    fn seek_end() -> std::io::Result<()> {
        init_logger();
        let mut reader = HttpReader::new(COUNTRIES_FGB);

        let size = reader.seek(SeekFrom::End(0))?;
        assert_eq!(size, 205680);

        reader.seek(SeekFrom::End(-205680))?;

        let mut bytes = [0; 8];
        reader.read(&mut bytes)?;
        assert_eq!(bytes, FGB_MAGIC);

        Ok(())
    }

    /// `BufRead::read_until`, alone and mixed with `seek` and `read`.
    #[test]
    fn bufread() -> std::io::Result<()> {
        init_logger();
        let mut reader = HttpReader::new(COUNTRIES_FGB);
        reader.set_min_req_size(5);

        let mut bytes = vec![];
        let num_bytes = reader.read_until(0, &mut bytes).unwrap();
        assert_eq!(num_bytes, 8);
        assert_eq!(bytes, FGB_MAGIC);

        // Mix with seek+read
        reader.seek(SeekFrom::Start(0)).ok();
        let mut bytes = vec![];
        let _num_bytes = reader.read_until(3, &mut bytes).unwrap();
        assert_eq!(bytes, [b'f', b'g', b'b', 3]);
        let mut bytes = [0; 2];
        reader.read(&mut bytes)?;
        assert_eq!(bytes, [b'f', b'g']);
        let mut bytes = vec![];
        let _num_bytes = reader.read_until(0, &mut bytes).unwrap();
        // The cursor sits on `b` after the 2-byte read, so the result has to end with
        // `b`, 0. Whether the already read `f`, `g` are yielded again depends on
        // `fill_buf` honoring the cursor, which is deliberately not pinned here.
        assert!(bytes.ends_with(&[b'b', 0]));

        // Test EOF
        reader.seek(SeekFrom::Start(205680 - 8)).ok();
        let mut bytes = vec![];
        let num_bytes = reader.read_until(0, &mut bytes).unwrap();
        assert_eq!(num_bytes, 8);
        assert_eq!(bytes, [205, 204, 204, 204, 204, 236, 73, 192]);

        reader.seek(SeekFrom::Start(205680 - 5)).ok();
        let mut bytes = vec![];
        let num_bytes = reader.read_until(0, &mut bytes).unwrap();
        assert_eq!(num_bytes, 5);
        assert_eq!(bytes, [204, 204, 236, 73, 192]);

        Ok(())
    }

    /// Reads the PNG signature from another remote resource.
    #[test]
    fn remote_png() -> std::io::Result<()> {
        init_logger();
        let mut reader =
            HttpReader::new("https://www.rust-lang.org/static/images/favicon-32x32.png");
        reader.seek(SeekFrom::Start(1)).ok();
        let mut bytes = [0; 3];
        reader.read_exact(&mut bytes)?;
        assert_eq!(&bytes, b"PNG");
        Ok(())
    }
}