Skip to main content

async_http_range_reader/
lib.rs

//! This library provides the [`AsyncHttpRangeReader`] type.
//!
//! It allows streaming a file over HTTP while also allowing random access. The type implements
//! both [`AsyncRead`] and [`AsyncSeek`]. This is supported through the use of range requests:
//! each individual read requests a portion of the file using an HTTP range request.
//!
//! Issuing numerous small reads might turn out to be relatively slow because each read needs to
//! perform an HTTP request. To alleviate this issue [`AsyncHttpRangeReader::prefetch`] is provided.
//! Using this method you can *prefetch* a number of bytes which will be streamed in the
//! background. If a read operation is reading from already (pre)fetched ranges it will stream from
//! the internal cache instead.
//!
//! Internally the [`AsyncHttpRangeReader`] stores a memory map which allows sparsely reading the
//! data into memory without requiring the whole file to be resident in memory.
//!
//! The primary use-case for this library is to be able to sparsely stream a zip archive over HTTP,
//! but it is designed in a generic fashion.
18
19mod error;
20mod sparse_range;
21
22use futures::{FutureExt, Stream, StreamExt};
23use http_content_range::{ContentRange, ContentRangeBytes};
24use memmap2::MmapMut;
25use reqwest::header::HeaderMap;
26use reqwest::{Response, Url};
27use sparse_range::SparseRange;
28use std::{
29    io::{self, SeekFrom},
30    ops::Range,
31    pin::Pin,
32    sync::Arc,
33    task::{ready, Context, Poll},
34};
35use tokio::{
36    io::{AsyncRead, AsyncSeek, ReadBuf},
37    sync::watch::Sender,
38    sync::{watch, Mutex},
39};
40use tokio_stream::wrappers::WatchStream;
41use tokio_util::sync::PollSender;
42use tracing::{info_span, Instrument};
43
44pub use error::AsyncHttpRangeReaderError;
45
46/// An `AsyncRangeReader` enables reading from a file over HTTP using range requests.
47///
48/// See the [`crate`] level documentation for more information.
49///
50/// The general entrypoint is [`AsyncHttpRangeReader::new`]. Depending on the
51/// [`CheckSupportMethod`], this will either call [`AsyncHttpRangeReader::initial_tail_request`] or
52/// [`AsyncHttpRangeReader::initial_head_request`] to send the initial request and then
53/// [`AsyncHttpRangeReader::from_tail_response`] or [`AsyncHttpRangeReader::from_head_response`] to
54/// initialize the async reader. If you want to apply a caching layer, you can send the initial head
55/// (or tail) request yourself with your cache headers (e.g. through the
56/// [http-cache-semantics](https://docs.rs/http-cache-semantics) crate):
57///
58/// ```rust
59/// # use url::Url;
60/// # use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError};
61/// # use reqwest::header::HeaderMap;
62/// async fn get_reader_cached(
63///     url: Url,
64/// ) -> Result<Option<AsyncHttpRangeReader>, AsyncHttpRangeReaderError> {
65///     let etag = "63c550e8-5ae";
66///     let client = reqwest::Client::new();
67///     let response = client
68///         .head(url.clone())
69///         .header(reqwest::header::IF_NONE_MATCH, etag)
70///         .send()
71///         .await?;
72///     if response.status() == reqwest::StatusCode::NOT_MODIFIED {
73///         Ok(None)
74///     } else {
75///         let reader = AsyncHttpRangeReader::from_head_response(client, response, url, HeaderMap::default()).await?;
76///         Ok(Some(reader))
77///     }
78/// }
79/// ```
80#[derive(Debug)]
81pub struct AsyncHttpRangeReader {
82    inner: Mutex<Inner>,
83    len: u64,
84}
85
86#[derive(Default, Clone, Debug)]
87struct StreamerState {
88    resident_range: SparseRange,
89    requested_ranges: Vec<Range<u64>>,
90    error: Option<AsyncHttpRangeReaderError>,
91}
92
93#[derive(Debug)]
94struct Inner {
95    /// A read-only view on the memory mapped data. The `downloaded_range` indicates the regions of
96    /// memory that contain bytes that have been downloaded.
97    data: &'static [u8],
98
99    /// The current read position in the stream
100    pos: u64,
101
102    /// The range of bytes that have been requested for download
103    requested_range: SparseRange,
104
105    /// The range of bytes that have actually been downloaded to `data`.
106    streamer_state: StreamerState,
107
108    /// A channel receiver that holds the last downloaded range (or an error) from the background
109    /// task.
110    streamer_state_rx: WatchStream<StreamerState>,
111
112    /// A channel sender to send range requests to the background task
113    request_tx: tokio::sync::mpsc::Sender<Range<u64>>,
114
115    /// An optional object to reserve a slot in the `request_tx` sender. When in the process of
116    /// sending a requests this contains an actual value.
117    poll_request_tx: Option<PollSender<Range<u64>>>,
118}
119
/// How the initial request is made: either directly request N bytes from the end of the file or,
/// if the server doesn't support negative byte offsets, start with a HEAD request instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CheckSupportMethod {
    /// Perform a range request with a negative byte range. This will return the N bytes from the
    /// *end* of the file as well as the file size. This is especially useful to also immediately
    /// get some bytes from the end of the file.
    NegativeRangeRequest(u64),

    /// Perform a HEAD request to get the length of the file and check if the server supports range
    /// requests.
    Head,
}
133
134fn error_for_status(response: reqwest::Response) -> reqwest_middleware::Result<Response> {
135    response
136        .error_for_status()
137        .map_err(reqwest_middleware::Error::Reqwest)
138}
139
140impl AsyncHttpRangeReader {
141    /// Construct a new `AsyncHttpRangeReader`.
142    pub async fn new(
143        client: impl Into<reqwest_middleware::ClientWithMiddleware>,
144        url: Url,
145        check_method: CheckSupportMethod,
146        extra_headers: HeaderMap,
147    ) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
148        let client = client.into();
149        match check_method {
150            CheckSupportMethod::NegativeRangeRequest(initial_chunk_size) => {
151                let response = Self::initial_tail_request(
152                    client.clone(),
153                    url.clone(),
154                    initial_chunk_size,
155                    HeaderMap::default(),
156                )
157                .await?;
158                let response_headers = response.headers().clone();
159                let self_ = Self::from_tail_response(client, response, url, extra_headers).await?;
160                Ok((self_, response_headers))
161            }
162            CheckSupportMethod::Head => {
163                let response =
164                    Self::initial_head_request(client.clone(), url.clone(), HeaderMap::default())
165                        .await?;
166                let response_headers = response.headers().clone();
167                let self_ = Self::from_head_response(client, response, url, extra_headers).await?;
168                Ok((self_, response_headers))
169            }
170        }
171    }
172
173    /// Send an initial range request to determine if the remote accepts range
174    /// requests. This will return a number of bytes from the end of the stream. Use the
175    /// `initial_chunk_size` parameter to define how many bytes should be requested from the end.
176    pub async fn initial_tail_request(
177        client: impl Into<reqwest_middleware::ClientWithMiddleware>,
178        url: reqwest::Url,
179        initial_chunk_size: u64,
180        extra_headers: HeaderMap,
181    ) -> Result<Response, AsyncHttpRangeReaderError> {
182        let client = client.into();
183        let tail_response = client
184            .get(url)
185            .header(
186                reqwest::header::RANGE,
187                format!("bytes=-{initial_chunk_size}"),
188            )
189            .headers(extra_headers)
190            .send()
191            .await
192            .and_then(error_for_status)
193            .map_err(Arc::new)
194            .map_err(AsyncHttpRangeReaderError::HttpError)?;
195        Ok(tail_response)
196    }
197
198    /// Initialize the reader from [`AsyncHttpRangeReader::initial_tail_request`] (or a user
199    /// provided response that also has a range of bytes from the end as body)
200    pub async fn from_tail_response(
201        client: impl Into<reqwest_middleware::ClientWithMiddleware>,
202        tail_request_response: Response,
203        url: Url,
204        extra_headers: HeaderMap,
205    ) -> Result<Self, AsyncHttpRangeReaderError> {
206        let client = client.into();
207
208        // Get the size of the file from this initial request
209        let content_range_header = tail_request_response
210            .headers()
211            .get(reqwest::header::CONTENT_RANGE)
212            .ok_or(AsyncHttpRangeReaderError::ContentRangeMissing)?
213            .to_str()
214            .map_err(|_err| AsyncHttpRangeReaderError::ContentRangeMissing)?;
215        let content_range = ContentRange::parse(content_range_header).ok_or_else(|| {
216            AsyncHttpRangeReaderError::ContentRangeParser(content_range_header.to_string())
217        })?;
218        let (start, finish, complete_length) = match content_range {
219            ContentRange::Bytes(ContentRangeBytes {
220                first_byte,
221                last_byte,
222                complete_length,
223            }) => (first_byte, last_byte, complete_length),
224            _ => return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported),
225        };
226
227        // Allocate a memory map to hold the data
228        let memory_map = memmap2::MmapOptions::new()
229            .len(complete_length as usize)
230            .map_anon()
231            .map_err(Arc::new)
232            .map_err(AsyncHttpRangeReaderError::MemoryMapError)?;
233
234        // SAFETY: Get a read-only slice to the memory. This is safe because the memory map is never
235        // reallocated and we keep track of the initialized part.
236        let memory_map_slice =
237            unsafe { std::slice::from_raw_parts(memory_map.as_ptr(), memory_map.len()) };
238
239        let requested_range =
240            SparseRange::from_range(complete_length - (finish - start)..complete_length);
241
242        // adding more than 2 entries to the channel would block the sender. I assumed two would
243        // suffice because I would want to 1) prefetch a certain range and 2) read stuff via the
244        // AsyncRead implementation. Any extra would simply have to wait for one of these to
245        // succeed. I eventually used 10 because who cares.
246        let (request_tx, request_rx) = tokio::sync::mpsc::channel(10);
247        let (state_tx, state_rx) = watch::channel(StreamerState::default());
248        tokio::spawn(run_streamer(
249            client,
250            url,
251            extra_headers,
252            Some((tail_request_response, start)),
253            memory_map,
254            state_tx,
255            request_rx,
256        ));
257
258        // Configure the initial state of the streamer.
259        let mut streamer_state = StreamerState::default();
260        streamer_state
261            .requested_ranges
262            .push(complete_length - (finish - start)..complete_length);
263
264        let reader = Self {
265            len: memory_map_slice.len() as u64,
266            inner: Mutex::new(Inner {
267                data: memory_map_slice,
268                pos: 0,
269                requested_range,
270                streamer_state,
271                streamer_state_rx: WatchStream::new(state_rx),
272                request_tx,
273                poll_request_tx: None,
274            }),
275        };
276        Ok(reader)
277    }
278
279    /// Send an initial range request to determine if the remote accepts range
280    /// requests and get the content length
281    pub async fn initial_head_request(
282        client: impl Into<reqwest_middleware::ClientWithMiddleware>,
283        url: reqwest::Url,
284        extra_headers: HeaderMap,
285    ) -> Result<Response, AsyncHttpRangeReaderError> {
286        let client = client.into();
287
288        // Perform a HEAD request to get the content-length.
289        let head_response = client
290            .head(url.clone())
291            .headers(extra_headers)
292            .send()
293            .await
294            .and_then(error_for_status)
295            .map_err(Arc::new)
296            .map_err(AsyncHttpRangeReaderError::HttpError)?;
297        Ok(head_response)
298    }
299
300    /// Initialize the reader from [`AsyncHttpRangeReader::initial_head_request`] (or a user
301    /// provided response the)
302    pub async fn from_head_response(
303        client: impl Into<reqwest_middleware::ClientWithMiddleware>,
304        head_response: Response,
305        url: Url,
306        extra_headers: HeaderMap,
307    ) -> Result<Self, AsyncHttpRangeReaderError> {
308        let client = client.into();
309
310        // Are range requests supported?
311        if head_response
312            .headers()
313            .get(reqwest::header::ACCEPT_RANGES)
314            .and_then(|h| h.to_str().ok())
315            != Some("bytes")
316        {
317            return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported);
318        }
319
320        let content_length: u64 = head_response
321            .headers()
322            .get(reqwest::header::CONTENT_LENGTH)
323            .ok_or(AsyncHttpRangeReaderError::ContentLengthMissing)?
324            .to_str()
325            .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?
326            .parse()
327            .map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?;
328
329        // Allocate a memory map to hold the data
330        let memory_map = memmap2::MmapOptions::new()
331            .len(content_length as _)
332            .map_anon()
333            .map_err(Arc::new)
334            .map_err(AsyncHttpRangeReaderError::MemoryMapError)?;
335
336        // SAFETY: Get a read-only slice to the memory. This is safe because the memory map is never
337        // reallocated and we keep track of the initialized part.
338        let memory_map_slice =
339            unsafe { std::slice::from_raw_parts(memory_map.as_ptr(), memory_map.len()) };
340
341        let requested_range = SparseRange::default();
342
343        // adding more than 2 entries to the channel would block the sender. I assumed two would
344        // suffice because I would want to 1) prefetch a certain range and 2) read stuff via the
345        // AsyncRead implementation. Any extra would simply have to wait for one of these to
346        // succeed. I eventually used 10 because who cares.
347        let (request_tx, request_rx) = tokio::sync::mpsc::channel(10);
348        let (state_tx, state_rx) = watch::channel(StreamerState::default());
349        tokio::spawn(run_streamer(
350            client,
351            url,
352            extra_headers,
353            None,
354            memory_map,
355            state_tx,
356            request_rx,
357        ));
358
359        // Configure the initial state of the streamer.
360        let streamer_state = StreamerState::default();
361
362        let reader = Self {
363            len: memory_map_slice.len() as u64,
364            inner: Mutex::new(Inner {
365                data: memory_map_slice,
366                pos: 0,
367                requested_range,
368                streamer_state,
369                streamer_state_rx: WatchStream::new(state_rx),
370                request_tx,
371                poll_request_tx: None,
372            }),
373        };
374        Ok(reader)
375    }
376
377    /// Returns the ranges that this instance actually performed HTTP requests for.
378    pub async fn requested_ranges(&self) -> Vec<Range<u64>> {
379        let mut inner = self.inner.lock().await;
380        if let Some(Some(new_state)) = inner.streamer_state_rx.next().now_or_never() {
381            inner.streamer_state = new_state;
382        }
383        inner.streamer_state.requested_ranges.clone()
384    }
385
386    /// Prefetches a range of bytes from the remote. When specifying a large range this can
387    /// drastically reduce the number of requests required to the server.
388    pub async fn prefetch(&mut self, bytes: Range<u64>) {
389        let inner = self.inner.get_mut();
390
391        // Ensure the range is withing the file size and non-zero of length.
392        let range = bytes.start..(bytes.end.min(inner.data.len() as u64));
393        if range.start >= range.end {
394            return;
395        }
396
397        // Check if the range has been requested or not.
398        let inner = self.inner.get_mut();
399        if let Some((new_range, _)) = inner.requested_range.cover(range.clone()) {
400            let _ = inner.request_tx.send(range).await;
401            inner.requested_range = new_range;
402        }
403    }
404
405    /// Returns the length of the stream in bytes
406    #[allow(clippy::len_without_is_empty)]
407    pub fn len(&self) -> u64 {
408        self.len
409    }
410}
411
412/// A task that will download parts from the remote archive and "send" them to the frontend as they
413/// become available.
414#[tracing::instrument(name = "fetch_ranges", skip_all, fields(url))]
415async fn run_streamer(
416    client: reqwest_middleware::ClientWithMiddleware,
417    url: Url,
418    extra_headers: HeaderMap,
419    initial_tail_response: Option<(Response, u64)>,
420    mut memory_map: MmapMut,
421    mut state_tx: Sender<StreamerState>,
422    mut request_rx: tokio::sync::mpsc::Receiver<Range<u64>>,
423) {
424    let mut state = StreamerState::default();
425
426    if let Some((response, response_start)) = initial_tail_response {
427        // Add the initial range to the state
428        state
429            .requested_ranges
430            .push(response_start..memory_map.len() as u64);
431
432        // Stream the initial data in memory
433        if !stream_response(
434            response,
435            response_start,
436            &mut memory_map,
437            &mut state_tx,
438            &mut state,
439        )
440        .await
441        {
442            return;
443        }
444    }
445
446    // Listen for any new incoming requests
447    'outer: loop {
448        let range = match request_rx.recv().await {
449            Some(range) => range,
450            None => {
451                break 'outer;
452            }
453        };
454
455        // Determine the range that we need to cover
456        let uncovered_ranges = match state.resident_range.cover(range) {
457            None => continue,
458            Some((_, uncovered_ranges)) => uncovered_ranges,
459        };
460
461        // Download and stream each range.
462        for range in uncovered_ranges {
463            // Update the requested ranges
464            state
465                .requested_ranges
466                .push(*range.start()..*range.end() + 1);
467
468            // Execute the request
469            let range_string = format!("bytes={}-{}", range.start(), range.end());
470            let span = info_span!("fetch_range", range = range_string.as_str());
471            let response = match client
472                .get(url.clone())
473                .header(reqwest::header::RANGE, range_string)
474                .headers(extra_headers.clone())
475                .send()
476                .instrument(span)
477                .await
478                .and_then(error_for_status)
479                .map_err(std::io::Error::other)
480            {
481                Err(e) => {
482                    state.error = Some(e.into());
483                    let _ = state_tx.send(state);
484                    break 'outer;
485                }
486                Ok(response) => response,
487            };
488
489            // If the server returns a successful, but non-206 response (e.g., 200), then it
490            // doesn't support range requests (even if the `Accept-Ranges` header is set).
491            if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
492                state.error = Some(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported);
493                let _ = state_tx.send(state);
494                break 'outer;
495            }
496
497            if !stream_response(
498                response,
499                *range.start(),
500                &mut memory_map,
501                &mut state_tx,
502                &mut state,
503            )
504            .await
505            {
506                break 'outer;
507            }
508        }
509    }
510}
511
512/// Streams the data from the specified response to the memory map updating progress in between.
513/// Returns `true` if everything went fine, `false` if anything went wrong. The error state, if any,
514/// is stored in `state_tx` so the "frontend" will consume it.
515async fn stream_response(
516    tail_request_response: Response,
517    mut offset: u64,
518    memory_map: &mut MmapMut,
519    state_tx: &mut Sender<StreamerState>,
520    state: &mut StreamerState,
521) -> bool {
522    let mut byte_stream = tail_request_response.bytes_stream();
523    while let Some(bytes) = byte_stream.next().await {
524        let bytes = match bytes {
525            Err(e) => {
526                state.error = Some(e.into());
527                let _ = state_tx.send(state.clone());
528                return false;
529            }
530            Ok(bytes) => bytes,
531        };
532
533        // Determine the range of these bytes in the complete file
534        let byte_range = offset..offset + bytes.len() as u64;
535
536        // Update the offset
537        offset = byte_range.end;
538
539        // Copy the data from the stream to memory
540        memory_map[byte_range.start as usize..byte_range.end as usize]
541            .copy_from_slice(bytes.as_ref());
542
543        // Update the range of bytes that have been downloaded
544        state.resident_range.update(byte_range);
545
546        // Notify anyone that's listening that we have downloaded some extra data
547        if state_tx.send(state.clone()).is_err() {
548            // If we failed to set the state it means there is no receiver. In that case we should
549            // just exit.
550            return false;
551        }
552    }
553
554    true
555}
556
557impl AsyncSeek for AsyncHttpRangeReader {
558    fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
559        let me = self.get_mut();
560        let inner = me.inner.get_mut();
561
562        inner.pos = match position {
563            SeekFrom::Start(pos) => pos,
564            SeekFrom::End(relative) => (inner.data.len() as i64).saturating_add(relative) as u64,
565            SeekFrom::Current(relative) => (inner.pos as i64).saturating_add(relative) as u64,
566        };
567
568        Ok(())
569    }
570
571    fn poll_complete(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
572        let inner = self.inner.get_mut();
573        Poll::Ready(Ok(inner.pos))
574    }
575}
576
577impl AsyncRead for AsyncHttpRangeReader {
578    fn poll_read(
579        self: Pin<&mut Self>,
580        cx: &mut Context<'_>,
581        buf: &mut ReadBuf<'_>,
582    ) -> Poll<io::Result<()>> {
583        let me = self.get_mut();
584        let inner = me.inner.get_mut();
585
586        // If a previous error occurred we return that.
587        if let Some(e) = inner.streamer_state.error.as_ref() {
588            return Poll::Ready(Err(io::Error::other(e.clone())));
589        }
590
591        // Determine the range to be fetched
592        let range = inner.pos..(inner.pos + buf.remaining() as u64).min(inner.data.len() as u64);
593        if range.start >= range.end {
594            return Poll::Ready(Ok(()));
595        }
596
597        // Ensure we requested the required bytes
598        while !inner.requested_range.is_covered(range.clone()) {
599            // If there is an active range request wait for it to complete
600            if let Some(mut poll) = inner.poll_request_tx.take() {
601                match poll.poll_reserve(cx) {
602                    Poll::Ready(_) => {
603                        let _ = poll.send_item(range.clone());
604                        inner.requested_range.update(range.clone());
605                        break;
606                    }
607                    Poll::Pending => {
608                        inner.poll_request_tx = Some(poll);
609                        return Poll::Pending;
610                    }
611                }
612            }
613
614            // Request the range
615            inner.poll_request_tx = Some(PollSender::new(inner.request_tx.clone()));
616        }
617
618        // If there is still a request poll open but there is no need for a request, abort it.
619        if let Some(mut poll) = inner.poll_request_tx.take() {
620            poll.abort_send();
621        }
622
623        loop {
624            // Is the range already available?
625            if inner
626                .streamer_state
627                .resident_range
628                .is_covered(range.clone())
629            {
630                let len = (range.end - range.start) as usize;
631                buf.initialize_unfilled_to(len)
632                    .copy_from_slice(&inner.data[range.start as usize..range.end as usize]);
633                buf.advance(len);
634                inner.pos += len as u64;
635                return Poll::Ready(Ok(()));
636            }
637
638            // Otherwise wait for new data to come in
639            match ready!(Pin::new(&mut inner.streamer_state_rx).poll_next(cx)) {
640                None => unreachable!(),
641                Some(state) => {
642                    inner.streamer_state = state;
643                    if let Some(e) = inner.streamer_state.error.as_ref() {
644                        return Poll::Ready(Err(io::Error::other(e.clone())));
645                    }
646                }
647            }
648        }
649    }
650}
651
#[cfg(test)]
mod static_directory_server;

#[cfg(test)]
mod test {
    use super::*;
    use crate::static_directory_server::StaticDirectoryServer;
    use assert_matches::assert_matches;
    use async_zip::tokio::read::seek::ZipFileReader;
    use futures::AsyncReadExt;
    use reqwest::{Client, StatusCode};
    use rstest::*;
    use std::path::Path;
    use tokio::io::AsyncReadExt as _;

    #[rstest]
    #[case(CheckSupportMethod::Head)]
    #[case(CheckSupportMethod::NegativeRangeRequest(8192))]
    #[tokio::test]
    async fn async_range_reader_zip(#[case] check_method: CheckSupportMethod) {
        // Spawn a static file server
        let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
        let server = StaticDirectoryServer::new(&path)
            .await
            .expect("could not initialize server");

        // check that file is there and has the right size
        let filepath = path.join("andes-1.8.3-pyhd8ed1ab_0.conda");
        assert!(
            filepath.exists(),
            "The conda package is not there yet. Did you run `git lfs pull`?"
        );
        let file_size = std::fs::metadata(&filepath).unwrap().len();
        assert_eq!(
            file_size, 2_463_995,
            "The conda package is not there yet. Did you run `git lfs pull`?"
        );

        // Construct an AsyncHttpRangeReader
        let (mut range, _) = AsyncHttpRangeReader::new(
            Client::new(),
            server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(),
            check_method,
            HeaderMap::default(),
        )
        .await
        .expect("Could not download range - did you run `git lfs pull`?");

        // Make sure we have read the last couple of bytes
        range.prefetch(range.len() - 8192..range.len()).await;

        assert_eq!(range.len(), file_size);

        let mut reader = ZipFileReader::with_tokio(tokio::io::BufReader::with_capacity(0, range))
            .await
            .unwrap();

        assert_eq!(
            reader
                .file()
                .entries()
                .iter()
                .map(|e| e.filename().as_str().unwrap_or(""))
                .collect::<Vec<_>>(),
            vec![
                "metadata.json",
                "info-andes-1.8.3-pyhd8ed1ab_0.tar.zst",
                "pkg-andes-1.8.3-pyhd8ed1ab_0.tar.zst",
            ]
        );

        // Get the number of performed requests so far
        let request_ranges = reader
            .inner_mut()
            .get_mut()
            .get_mut()
            .requested_ranges()
            .await;
        assert_eq!(request_ranges.len(), 1);
        assert_eq!(
            request_ranges[0].end - request_ranges[0].start,
            8192,
            "first request should be the size of the initial chunk size"
        );
        assert_eq!(
            request_ranges[0].end, file_size,
            "first request should be at the end"
        );

        // Prefetch the data for the metadata.json file
        let entry = reader.file().entries().first().unwrap();
        let offset = entry.header_offset();
        // Get the size of the entry plus the header + size of the filename. We should also actually
        // include bytes for the extra fields but we don't have that information.
        let size = entry.compressed_size() + 30 + entry.filename().as_bytes().len() as u64;

        // The zip archive uses a BufReader which reads in chunks of 8192. To ensure we prefetch
        // enough data we round the size up to the nearest multiple of the buffer size.
        let buffer_size: u64 = 8192;
        let size = size.div_ceil(buffer_size) * buffer_size;

        // Fetch the bytes from the zip archive that contain the requested file.
        reader
            .inner_mut()
            .get_mut()
            .get_mut()
            .prefetch(offset..offset + size)
            .await;

        // Read the contents of the metadata.json file
        let mut contents = String::new();
        reader
            .reader_with_entry(0)
            .await
            .unwrap()
            .read_to_string(&mut contents)
            .await
            .unwrap();

        // Get the number of performed requests
        let request_ranges = reader
            .inner_mut()
            .get_mut()
            .get_mut()
            .requested_ranges()
            .await;

        assert_eq!(contents, r#"{"conda_pkg_format_version": 2}"#);
        assert_eq!(request_ranges.len(), 2, "expected only two range requests");
        assert_eq!(
            request_ranges[1],
            0..size,
            "second request should cover the prefetched metadata.json entry"
        );
    }

    #[rstest]
    #[case(CheckSupportMethod::Head)]
    #[case(CheckSupportMethod::NegativeRangeRequest(8192))]
    #[tokio::test]
    async fn async_range_reader(#[case] check_method: CheckSupportMethod) {
        // Spawn a static file server
        let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("test-data");
        let server = StaticDirectoryServer::new(&path)
            .await
            .expect("could not initialize server");

        // Construct an AsyncHttpRangeReader
        let (mut range, _) = AsyncHttpRangeReader::new(
            Client::new(),
            server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(),
            check_method,
            HeaderMap::default(),
        )
        .await
        .expect("could not construct the range reader");

        // Also open a simple file reader
        let mut file = tokio::fs::File::open(path.join("andes-1.8.3-pyhd8ed1ab_0.conda"))
            .await
            .unwrap();

        // Read until the end and make sure that the contents matches
        let mut range_read = vec![0; 64 * 1024];
        let mut file_read = vec![0; 64 * 1024];
        loop {
            // Read with the async reader
            let range_read_bytes = range.read(&mut range_read).await.unwrap();

            // Read directly from the file
            let file_read_bytes = file
                .read_exact(&mut file_read[0..range_read_bytes])
                .await
                .unwrap();

            assert_eq!(range_read_bytes, file_read_bytes);
            assert_eq!(
                range_read[0..range_read_bytes],
                file_read[0..file_read_bytes]
            );

            if file_read_bytes == 0 && range_read_bytes == 0 {
                break;
            }
        }
    }

    #[tokio::test]
    async fn test_not_found() {
        let server = StaticDirectoryServer::new(Path::new(env!("CARGO_MANIFEST_DIR")))
            .await
            .expect("could not initialize server");
        let err = AsyncHttpRangeReader::new(
            Client::new(),
            server.url().join("not-found").unwrap(),
            CheckSupportMethod::Head,
            HeaderMap::default(),
        )
        .await
        .expect_err("expected an error");

        assert_matches!(
            err, AsyncHttpRangeReaderError::HttpError(err) if err.status() == Some(StatusCode::NOT_FOUND)
        );
    }
}