mod error;
mod sparse_range;
use futures::{FutureExt, Stream, StreamExt};
use http_content_range::{ContentRange, ContentRangeBytes};
use memmap2::MmapMut;
use reqwest::header::HeaderMap;
use reqwest::{Response, Url};
use sparse_range::SparseRange;
use std::{
io::{self, SeekFrom},
ops::Range,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncSeek, ReadBuf},
sync::watch::Sender,
sync::{watch, Mutex},
};
use tokio_stream::wrappers::WatchStream;
use tokio_util::sync::PollSender;
use tracing::{info_span, Instrument};
pub use error::AsyncHttpRangeReaderError;
#[derive(Debug)]
pub struct AsyncHttpRangeReader {
inner: Mutex<Inner>,
len: u64,
}
#[derive(Default, Clone, Debug)]
struct StreamerState {
resident_range: SparseRange,
requested_ranges: Vec<Range<u64>>,
error: Option<AsyncHttpRangeReaderError>,
}
#[derive(Debug)]
struct Inner {
data: &'static [u8],
pos: u64,
requested_range: SparseRange,
streamer_state: StreamerState,
streamer_state_rx: WatchStream<StreamerState>,
request_tx: tokio::sync::mpsc::Sender<Range<u64>>,
poll_request_tx: Option<PollSender<Range<u64>>>,
}
pub enum CheckSupportMethod {
NegativeRangeRequest(u64),
Head,
}
fn error_for_status(response: reqwest::Response) -> reqwest_middleware::Result<Response> {
response
.error_for_status()
.map_err(reqwest_middleware::Error::Reqwest)
}
impl AsyncHttpRangeReader {
pub async fn new(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
url: Url,
check_method: CheckSupportMethod,
extra_headers: HeaderMap,
) -> Result<(Self, HeaderMap), AsyncHttpRangeReaderError> {
let client = client.into();
match check_method {
CheckSupportMethod::NegativeRangeRequest(initial_chunk_size) => {
let response = Self::initial_tail_request(
client.clone(),
url.clone(),
initial_chunk_size,
HeaderMap::default(),
)
.await?;
let response_headers = response.headers().clone();
let self_ = Self::from_range_response(client, response, url, extra_headers).await?;
Ok((self_, response_headers))
}
CheckSupportMethod::Head => {
let response =
Self::initial_head_request(client.clone(), url.clone(), HeaderMap::default())
.await?;
let response_headers = response.headers().clone();
let self_ = Self::from_head_response(client, response, url, extra_headers).await?;
Ok((self_, response_headers))
}
}
}
pub async fn initial_tail_request(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
url: reqwest::Url,
initial_chunk_size: u64,
extra_headers: HeaderMap,
) -> Result<Response, AsyncHttpRangeReaderError> {
let client = client.into();
let tail_response = client
.get(url)
.header(
reqwest::header::RANGE,
format!("bytes=-{initial_chunk_size}"),
)
.headers(extra_headers)
.send()
.await
.and_then(error_for_status)
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::HttpError)?;
Ok(tail_response)
}
#[deprecated(note = "use `from_range_response` instead")]
pub async fn from_tail_response(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
tail_request_response: Response,
url: Url,
extra_headers: HeaderMap,
) -> Result<Self, AsyncHttpRangeReaderError> {
Self::from_range_response(client, tail_request_response, url, extra_headers).await
}
pub async fn from_range_response(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
response: Response,
url: Url,
extra_headers: HeaderMap,
) -> Result<Self, AsyncHttpRangeReaderError> {
let client = client.into();
let content_range_header = response
.headers()
.get(reqwest::header::CONTENT_RANGE)
.ok_or(AsyncHttpRangeReaderError::ContentRangeMissing)?
.to_str()
.map_err(|_err| AsyncHttpRangeReaderError::ContentRangeMissing)?;
let content_range = ContentRange::parse(content_range_header).ok_or_else(|| {
AsyncHttpRangeReaderError::ContentRangeParser(content_range_header.to_string())
})?;
let (start, end_inclusive, complete_length) = match content_range {
ContentRange::Bytes(ContentRangeBytes {
first_byte,
last_byte,
complete_length,
}) => (first_byte, last_byte, complete_length),
_ => return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported),
};
let memory_map = memmap2::MmapOptions::new()
.len(complete_length as usize)
.map_anon()
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::MemoryMapError)?;
let memory_map_slice =
unsafe { std::slice::from_raw_parts(memory_map.as_ptr(), memory_map.len()) };
let requested_range = SparseRange::from_range(start..end_inclusive + 1);
let (request_tx, request_rx) = tokio::sync::mpsc::channel(10);
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client,
url,
extra_headers,
Some((response, start, end_inclusive + 1)),
memory_map,
state_tx,
request_rx,
));
let mut streamer_state = StreamerState::default();
streamer_state
.requested_ranges
.push(start..end_inclusive + 1);
let reader = Self {
len: memory_map_slice.len() as u64,
inner: Mutex::new(Inner {
data: memory_map_slice,
pos: 0,
requested_range,
streamer_state,
streamer_state_rx: WatchStream::new(state_rx),
request_tx,
poll_request_tx: None,
}),
};
Ok(reader)
}
pub async fn initial_head_request(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
url: reqwest::Url,
extra_headers: HeaderMap,
) -> Result<Response, AsyncHttpRangeReaderError> {
let client = client.into();
let head_response = client
.head(url.clone())
.headers(extra_headers)
.send()
.await
.and_then(error_for_status)
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::HttpError)?;
Ok(head_response)
}
pub async fn from_head_response(
client: impl Into<reqwest_middleware::ClientWithMiddleware>,
head_response: Response,
url: Url,
extra_headers: HeaderMap,
) -> Result<Self, AsyncHttpRangeReaderError> {
let client = client.into();
if head_response
.headers()
.get(reqwest::header::ACCEPT_RANGES)
.and_then(|h| h.to_str().ok())
!= Some("bytes")
{
return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported);
}
let content_length: u64 = head_response
.headers()
.get(reqwest::header::CONTENT_LENGTH)
.ok_or(AsyncHttpRangeReaderError::ContentLengthMissing)?
.to_str()
.map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?
.parse()
.map_err(|_err| AsyncHttpRangeReaderError::ContentLengthMissing)?;
let memory_map = memmap2::MmapOptions::new()
.len(content_length as _)
.map_anon()
.map_err(Arc::new)
.map_err(AsyncHttpRangeReaderError::MemoryMapError)?;
let memory_map_slice =
unsafe { std::slice::from_raw_parts(memory_map.as_ptr(), memory_map.len()) };
let requested_range = SparseRange::default();
let (request_tx, request_rx) = tokio::sync::mpsc::channel(10);
let (state_tx, state_rx) = watch::channel(StreamerState::default());
tokio::spawn(run_streamer(
client,
url,
extra_headers,
None,
memory_map,
state_tx,
request_rx,
));
let streamer_state = StreamerState::default();
let reader = Self {
len: memory_map_slice.len() as u64,
inner: Mutex::new(Inner {
data: memory_map_slice,
pos: 0,
requested_range,
streamer_state,
streamer_state_rx: WatchStream::new(state_rx),
request_tx,
poll_request_tx: None,
}),
};
Ok(reader)
}
pub async fn requested_ranges(&self) -> Vec<Range<u64>> {
let mut inner = self.inner.lock().await;
if let Some(Some(new_state)) = inner.streamer_state_rx.next().now_or_never() {
inner.streamer_state = new_state;
}
inner.streamer_state.requested_ranges.clone()
}
pub async fn prefetch(&mut self, bytes: Range<u64>) {
let inner = self.inner.get_mut();
let range = bytes.start..(bytes.end.min(inner.data.len() as u64));
if range.start >= range.end {
return;
}
let inner = self.inner.get_mut();
if let Some((new_range, _)) = inner.requested_range.cover(range.clone()) {
let _ = inner.request_tx.send(range).await;
inner.requested_range = new_range;
}
}
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> u64 {
self.len
}
}
#[tracing::instrument(name = "fetch_ranges", skip_all, fields(url))]
async fn run_streamer(
client: reqwest_middleware::ClientWithMiddleware,
url: Url,
extra_headers: HeaderMap,
response: Option<(Response, u64, u64)>,
mut memory_map: MmapMut,
mut state_tx: Sender<StreamerState>,
mut request_rx: tokio::sync::mpsc::Receiver<Range<u64>>,
) {
let mut state = StreamerState::default();
if let Some((response, start, end_exclusive)) = response {
state.requested_ranges.push(start..end_exclusive);
if !stream_response(
response,
start,
end_exclusive,
&mut memory_map,
&mut state_tx,
&mut state,
)
.await
{
return;
}
}
'outer: loop {
let range = match request_rx.recv().await {
Some(range) => range,
None => {
break 'outer;
}
};
let uncovered_ranges = match state.resident_range.cover(range) {
None => continue,
Some((_, uncovered_ranges)) => uncovered_ranges,
};
for range in uncovered_ranges {
state
.requested_ranges
.push(*range.start()..*range.end() + 1);
let range_string = format!("bytes={}-{}", range.start(), range.end());
let span = info_span!("fetch_range", range = range_string.as_str());
let response = match client
.get(url.clone())
.header(reqwest::header::RANGE, range_string)
.headers(extra_headers.clone())
.send()
.instrument(span)
.await
.and_then(error_for_status)
.map_err(std::io::Error::other)
{
Err(e) => {
state.error = Some(e.into());
let _ = state_tx.send(state);
break 'outer;
}
Ok(response) => response,
};
if let Err(err) =
validate_content_range(&response, *range.start(), *range.end(), memory_map.len())
{
state.error = Some(err);
let _ = state_tx.send(state);
break 'outer;
}
if response.status() != reqwest::StatusCode::PARTIAL_CONTENT {
state.error = Some(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported);
let _ = state_tx.send(state);
break 'outer;
}
if !stream_response(
response,
*range.start(),
*range.end() + 1,
&mut memory_map,
&mut state_tx,
&mut state,
)
.await
{
break 'outer;
}
}
}
}
fn validate_content_range(
response: &Response,
expected_start: u64,
expected_end_inclusive: u64,
expected_complete_length: usize,
) -> Result<(), AsyncHttpRangeReaderError> {
let content_range_header = response
.headers()
.get(reqwest::header::CONTENT_RANGE)
.ok_or(AsyncHttpRangeReaderError::ContentRangeMissing)?
.to_str()
.map_err(|_err| AsyncHttpRangeReaderError::ContentRangeMissing)?;
let content_range = ContentRange::parse(content_range_header).ok_or_else(|| {
AsyncHttpRangeReaderError::ContentRangeParser(content_range_header.to_string())
})?;
let (actual_start, actual_end_inclusive, actual_complete_length) = match content_range {
ContentRange::Bytes(ContentRangeBytes {
first_byte,
last_byte,
complete_length,
}) => (first_byte, last_byte, complete_length),
_ => return Err(AsyncHttpRangeReaderError::HttpRangeRequestUnsupported),
};
if expected_start != actual_start
|| expected_end_inclusive != actual_end_inclusive
|| expected_complete_length as u64 != actual_complete_length
{
return Err(AsyncHttpRangeReaderError::RangeMismatch {
expected_start,
expected_end_inclusive,
expected_complete_length,
actual_start,
actual_end_inclusive,
actual_complete_length,
});
}
Ok(())
}
async fn stream_response(
tail_request_response: Response,
start: u64,
end_exclusive: u64,
memory_map: &mut MmapMut,
state_tx: &mut Sender<StreamerState>,
state: &mut StreamerState,
) -> bool {
assert!(
(end_exclusive as usize) <= memory_map.len(),
"end is outside of memory map {} > {}",
end_exclusive,
memory_map.len()
);
let mut offset = start;
let mut byte_stream = tail_request_response.bytes_stream();
while let Some(bytes) = byte_stream.next().await {
let bytes = match bytes {
Err(e) => {
state.error = Some(e.into());
let _ = state_tx.send(state.clone());
return false;
}
Ok(bytes) => bytes,
};
let byte_range = offset..offset + bytes.len() as u64;
offset += bytes.len() as u64;
if offset > end_exclusive {
state.error = Some(AsyncHttpRangeReaderError::ResponseTooLong {
expected: end_exclusive - start,
});
let _ = state_tx.send(state.clone());
return false;
}
memory_map[byte_range.start as usize..byte_range.end as usize]
.copy_from_slice(bytes.as_ref());
state.resident_range.update(byte_range);
if state_tx.send(state.clone()).is_err() {
return false;
}
}
if offset != end_exclusive {
state.error = Some(AsyncHttpRangeReaderError::ResponseTooShort {
expected: end_exclusive - start,
actual: offset - start,
});
let _ = state_tx.send(state.clone());
return false;
}
true
}
impl AsyncSeek for AsyncHttpRangeReader {
fn start_seek(self: Pin<&mut Self>, position: SeekFrom) -> io::Result<()> {
let me = self.get_mut();
let inner = me.inner.get_mut();
inner.pos = match position {
SeekFrom::Start(pos) => pos,
SeekFrom::End(relative) => (inner.data.len() as i64).saturating_add(relative) as u64,
SeekFrom::Current(relative) => (inner.pos as i64).saturating_add(relative) as u64,
};
Ok(())
}
fn poll_complete(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
let inner = self.inner.get_mut();
Poll::Ready(Ok(inner.pos))
}
}
impl AsyncRead for AsyncHttpRangeReader {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let me = self.get_mut();
let inner = me.inner.get_mut();
if let Some(e) = inner.streamer_state.error.as_ref() {
return Poll::Ready(Err(io::Error::other(e.clone())));
}
let range = inner.pos..(inner.pos + buf.remaining() as u64).min(inner.data.len() as u64);
if range.start >= range.end {
return Poll::Ready(Ok(()));
}
while !inner.requested_range.is_covered(range.clone()) {
if let Some(mut poll) = inner.poll_request_tx.take() {
match poll.poll_reserve(cx) {
Poll::Ready(_) => {
let _ = poll.send_item(range.clone());
inner.requested_range.update(range.clone());
break;
}
Poll::Pending => {
inner.poll_request_tx = Some(poll);
return Poll::Pending;
}
}
}
inner.poll_request_tx = Some(PollSender::new(inner.request_tx.clone()));
}
if let Some(mut poll) = inner.poll_request_tx.take() {
poll.abort_send();
}
loop {
if inner
.streamer_state
.resident_range
.is_covered(range.clone())
{
let len = (range.end - range.start) as usize;
buf.initialize_unfilled_to(len)
.copy_from_slice(&inner.data[range.start as usize..range.end as usize]);
buf.advance(len);
inner.pos += len as u64;
return Poll::Ready(Ok(()));
}
match ready!(Pin::new(&mut inner.streamer_state_rx).poll_next(cx)) {
None => unreachable!(),
Some(state) => {
inner.streamer_state = state;
if let Some(e) = inner.streamer_state.error.as_ref() {
return Poll::Ready(Err(io::Error::other(e.clone())));
}
}
}
}
}
}
#[cfg(test)]
mod static_directory_server;
#[cfg(test)]
mod test {
use super::*;
use crate::static_directory_server::StaticDirectoryServer;
use assert_matches::assert_matches;
use async_zip::tokio::read::seek::ZipFileReader;
use axum::body::Body;
use axum::extract::Request;
use axum::response::IntoResponse;
use futures::AsyncReadExt;
use reqwest::header;
use reqwest::Method;
use reqwest::{Client, StatusCode};
use rstest::*;
use std::path::Path;
use tokio::io::AsyncReadExt as _;
#[rstest]
#[case(CheckSupportMethod::Head)]
#[case(CheckSupportMethod::NegativeRangeRequest(8192))]
#[tokio::test]
async fn async_range_reader_zip(#[case] check_method: CheckSupportMethod) {
let path = Path::new(&std::env::var("CARGO_MANIFEST_DIR").unwrap()).join("test-data");
let server = StaticDirectoryServer::new(&path)
.await
.expect("could not initialize server");
let filepath = path.join("andes-1.8.3-pyhd8ed1ab_0.conda");
assert!(
filepath.exists(),
"The conda package is not there yet. Did you run `git lfs pull`?"
);
let file_size = std::fs::metadata(&filepath).unwrap().len();
assert_eq!(
file_size, 2_463_995,
"The conda package is not there yet. Did you run `git lfs pull`?"
);
let (mut range, _) = AsyncHttpRangeReader::new(
Client::new(),
server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(),
check_method,
HeaderMap::default(),
)
.await
.expect("Could not download range - did you run `git lfs pull`?");
range.prefetch(range.len() - 8192..range.len()).await;
assert_eq!(range.len(), file_size);
let mut reader = ZipFileReader::with_tokio(tokio::io::BufReader::with_capacity(0, range))
.await
.unwrap();
assert_eq!(
reader
.file()
.entries()
.iter()
.map(|e| e.filename().as_str().unwrap_or(""))
.collect::<Vec<_>>(),
vec![
"metadata.json",
"info-andes-1.8.3-pyhd8ed1ab_0.tar.zst",
"pkg-andes-1.8.3-pyhd8ed1ab_0.tar.zst",
]
);
let request_ranges = reader
.inner_mut()
.get_mut()
.get_mut()
.requested_ranges()
.await;
assert_eq!(request_ranges.len(), 1);
assert_eq!(
request_ranges[0].end - request_ranges[0].start,
8192,
"first request should be the size of the initial chunk size"
);
assert_eq!(
request_ranges[0].end, file_size,
"first request should be at the end"
);
let entry = reader.file().entries().first().unwrap();
let offset = entry.header_offset();
let size = entry.compressed_size() + 30 + entry.filename().as_bytes().len() as u64;
let buffer_size = 8192;
let size = ((size + buffer_size - 1) / buffer_size) * buffer_size;
reader
.inner_mut()
.get_mut()
.get_mut()
.prefetch(offset..offset + size as u64)
.await;
let mut contents = String::new();
reader
.reader_with_entry(0)
.await
.unwrap()
.read_to_string(&mut contents)
.await
.unwrap();
let request_ranges = reader
.inner_mut()
.get_mut()
.get_mut()
.requested_ranges()
.await;
assert_eq!(contents, r#"{"conda_pkg_format_version": 2}"#);
assert_eq!(request_ranges.len(), 2);
assert_eq!(
request_ranges[1],
0..size,
"expected only two range requests"
);
}
#[rstest]
#[case(CheckSupportMethod::Head)]
#[case(CheckSupportMethod::NegativeRangeRequest(8192))]
#[tokio::test]
async fn async_range_reader(#[case] check_method: CheckSupportMethod) {
let path = Path::new(&std::env::var("CARGO_MANIFEST_DIR").unwrap()).join("test-data");
let server = StaticDirectoryServer::new(&path)
.await
.expect("could not initialize server");
let (mut range, _) = AsyncHttpRangeReader::new(
Client::new(),
server.url().join("andes-1.8.3-pyhd8ed1ab_0.conda").unwrap(),
check_method,
HeaderMap::default(),
)
.await
.expect("bla");
let mut file = tokio::fs::File::open(path.join("andes-1.8.3-pyhd8ed1ab_0.conda"))
.await
.unwrap();
let mut range_read = vec![0; 64 * 1024];
let mut file_read = vec![0; 64 * 1024];
loop {
let range_read_bytes = range.read(&mut range_read).await.unwrap();
let file_read_bytes = file
.read_exact(&mut file_read[0..range_read_bytes])
.await
.unwrap();
assert_eq!(range_read_bytes, file_read_bytes);
assert_eq!(
range_read[0..range_read_bytes],
file_read[0..file_read_bytes]
);
if file_read_bytes == 0 && range_read_bytes == 0 {
break;
}
}
}
#[tokio::test]
async fn test_not_found() {
let server = StaticDirectoryServer::new(Path::new(env!("CARGO_MANIFEST_DIR")))
.await
.expect("could not initialize server");
let err = AsyncHttpRangeReader::new(
Client::new(),
server.url().join("not-found").unwrap(),
CheckSupportMethod::Head,
HeaderMap::default(),
)
.await
.expect_err("expected an error");
assert_matches!(
err, AsyncHttpRangeReaderError::HttpError(err) if err.status() == Some(StatusCode::NOT_FOUND)
);
}
async fn spawn_mismatch_server(
head_content_length: usize,
pretend_size: usize,
actual_size: usize,
) -> Url {
let app =
axum::Router::new().fallback(async move |request: Request| match *request.method() {
Method::HEAD => {
let headers = [
(header::CONTENT_LENGTH, head_content_length.to_string()),
(header::ACCEPT_RANGES, "bytes".to_string()),
];
(StatusCode::OK, headers).into_response()
}
Method::GET => {
let range_header = request
.headers()
.get(header::RANGE)
.unwrap()
.to_str()
.unwrap()
.to_string();
let range_spec = range_header.strip_prefix("bytes=").unwrap();
let (start_str, _end_str) = range_spec.split_once('-').unwrap();
let start = start_str.parse::<usize>().unwrap();
let end = start + pretend_size - 1;
axum::response::Response::builder()
.status(StatusCode::PARTIAL_CONTENT)
.header(
header::CONTENT_RANGE,
format!("bytes {start}-{end}/{head_content_length}"),
)
.body(Body::from(vec![1u8; actual_size]))
.unwrap()
.into_response()
}
_ => StatusCode::METHOD_NOT_ALLOWED.into_response(),
});
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, app.into_make_service())
.await
.unwrap();
});
Url::parse(&format!("http://localhost:{}/file", local_addr.port())).unwrap()
}
#[tokio::test]
async fn test_content_length_response_beyond_content_length() {
fn into_range_error(err: std::io::Error) -> AsyncHttpRangeReaderError {
err.into_inner()
.unwrap()
.downcast::<AsyncHttpRangeReaderError>()
.map(|e| *e)
.unwrap()
}
let cases: Vec<(usize, usize, usize, Option<AsyncHttpRangeReaderError>)> = vec![
(512, 512, 512, None),
(
512,
512,
1024,
Some(AsyncHttpRangeReaderError::ResponseTooLong { expected: 512 }),
),
(
512,
1024,
1024,
Some(AsyncHttpRangeReaderError::ContentRangeParser(
"bytes 0-1023/512".to_string(),
)),
),
(
512,
1024,
512,
Some(AsyncHttpRangeReaderError::ContentRangeParser(
"bytes 0-1023/512".to_string(),
)),
),
(1024, 512, 512, None),
(
1024,
512,
1024,
Some(AsyncHttpRangeReaderError::ResponseTooLong { expected: 512 }),
),
(
1024,
1024,
1024,
Some(AsyncHttpRangeReaderError::RangeMismatch {
expected_start: 0,
expected_end_inclusive: 511,
expected_complete_length: 1024,
actual_start: 0,
actual_end_inclusive: 1023,
actual_complete_length: 1024,
}),
),
(
1024,
1024,
512,
Some(AsyncHttpRangeReaderError::RangeMismatch {
expected_start: 0,
expected_end_inclusive: 511,
expected_complete_length: 1024,
actual_start: 0,
actual_end_inclusive: 1023,
actual_complete_length: 1024,
}),
),
];
for (head_content_length, range_header_length, range_actual_length, expected_error) in cases
{
let url = spawn_mismatch_server(
head_content_length,
range_header_length,
range_actual_length,
)
.await;
let (mut reader, _) = AsyncHttpRangeReader::new(
Client::new(),
url,
CheckSupportMethod::Head,
HeaderMap::default(),
)
.await
.unwrap();
assert_eq!(reader.len(), head_content_length as u64);
reader.prefetch(0..512).await;
let mut buf = vec![0u8; 512];
let result = reader.read(&mut buf).await;
let label =
format!("{head_content_length} {range_header_length} {range_actual_length}");
match expected_error {
None => {
assert_matches!(result, Ok(_), "{label}");
}
Some(expected) => {
assert_eq!(
into_range_error(result.unwrap_err()).to_string(),
expected.to_string(),
"{label}"
);
}
}
}
}
}