use super::{DataReaderTrait, network_reader::NetworkReader};
use crate::{Blob, ByteRange};
use anyhow::{Result, anyhow, bail};
use async_trait::async_trait;
use percent_encoding::percent_decode_str;
use regex::{Regex, RegexBuilder};
use reqwest::{Client, RequestBuilder, StatusCode, Url};
use std::{
fmt, str,
sync::{LazyLock, atomic::AtomicU64},
time::Duration,
};
use tokio::time::sleep;
/// Reads byte ranges or whole resources from an `http`/`https` URL.
///
/// Credentials embedded in the source URL are split off into `username`/`password`
/// and used for HTTP basic auth; `url` and `name` are stored without them.
pub struct DataReaderHttp {
	/// HTTP client used for every request made by this reader.
	client: Client,
	/// Human-readable identifier: the credential-free URL rendered as a string.
	name: String,
	/// Resource URL with username and password removed.
	url: Url,
	/// Percent-decoded basic-auth user name, if the source URL had one.
	username: Option<String>,
	/// Percent-decoded basic-auth password, if the source URL had one.
	password: Option<String>,
	/// Per-request byte limit exposed through `NetworkReader::max_request_bytes`;
	/// initialised to `u64::MAX`. NOTE(review): presumably adjusted by the shared
	/// network-reader logic — confirm against `network_reader`.
	max_request_bytes: AtomicU64,
}
impl fmt::Debug for DataReaderHttp {
	/// Formats the reader without exposing credentials: only the stored
	/// (credential-free) URL and whether any username or password is configured.
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		// A URL such as `https://:token@host/` carries only a password, so both
		// fields must be checked to report credentials accurately.
		let has_credentials = self.username.is_some() || self.password.is_some();
		f.debug_struct("DataReaderHttp")
			.field("url", &self.url.as_str())
			.field("has_credentials", &has_credentials)
			.finish()
	}
}
impl TryFrom<&Url> for DataReaderHttp {
	type Error = anyhow::Error;

	/// Builds an HTTP(S) reader for `url`.
	///
	/// Userinfo embedded in the URL (`user:pass@host`) is percent-decoded and kept
	/// separately for basic auth. It is then removed from the stored URL and from the
	/// reader's name, so credentials do not appear in logs or error messages.
	///
	/// # Errors
	/// Fails if the username or password is not valid percent-encoded UTF-8, if the
	/// credentials cannot be removed, if the scheme is not `http`/`https`, or if the
	/// HTTP client cannot be built.
	fn try_from(url: &Url) -> Result<DataReaderHttp> {
		let mut url = url.clone();

		// Shared decoder so username and password failures are reported the same way.
		let decode = |raw: &str, what: &str| -> Result<String> {
			percent_decode_str(raw)
				.decode_utf8()
				.map(|v| v.into_owned())
				.map_err(|e| anyhow!("failed to decode {what}: {e}"))
		};
		let username = match url.username() {
			"" => None,
			raw => Some(decode(raw, "username")?),
		};
		let password = url.password().map(|raw| decode(raw, "password")).transpose()?;

		// Only touch the userinfo when it is actually present. The setters fail on URLs
		// that cannot carry credentials at all (e.g. `file:` URLs). Calling them
		// unconditionally would mask the more useful "unsupported URL scheme" error below.
		if !url.username().is_empty() {
			url.set_username("").map_err(|_| anyhow!("failed to set username"))?;
		}
		if url.password().is_some() {
			url.set_password(None).map_err(|_| anyhow!("failed to set password"))?;
		}

		// Checked after stripping credentials so the error message cannot leak them.
		match url.scheme() {
			"http" | "https" => (),
			other => bail!("unsupported URL scheme '{other}' in '{url}', expected 'http' or 'https'"),
		}

		let client = Client::builder()
			.connect_timeout(Duration::from_secs(30))
			.tcp_keepalive(Duration::from_secs(60))
			.use_rustls_tls()
			.build()?;

		Ok(DataReaderHttp {
			client,
			name: url.to_string(),
			url,
			username,
			password,
			max_request_bytes: AtomicU64::new(u64::MAX),
		})
	}
}
impl DataReaderHttp {
	/// Attaches HTTP basic-auth credentials to `builder` when the source URL had any.
	///
	/// A URL may carry a password with an empty username (`https://:token@host/`).
	/// That password is still sent, with an empty user name, instead of being dropped.
	fn apply_auth(&self, builder: RequestBuilder) -> RequestBuilder {
		match (&self.username, &self.password) {
			(None, None) => builder,
			(username, password) => builder.basic_auth(username.as_deref().unwrap_or(""), password.as_deref()),
		}
	}
}
/// Retries allowed after the first attempt, so at most `MAX_RETRIES + 1` requests.
const MAX_RETRIES: u32 = 2;
/// Delay before retry number `exp + 1`, doubling each time: 1 s, 2 s, 4 s, …
#[cfg(not(test))]
const BACKOFF: fn(u32) -> Duration = |exp| Duration::from_secs(1u64 << exp);
/// Same doubling schedule, in milliseconds, so tests do not sleep for seconds.
#[cfg(test)]
const BACKOFF: fn(u32) -> Duration = |exp| Duration::from_millis(1u64 << exp);
/// Returns whether a reqwest failure looks transient enough to retry: failures to
/// connect, timeouts, and errors while receiving the response body.
fn is_retryable_error(err: &reqwest::Error) -> bool {
	let transport_failed = err.is_timeout() || err.is_connect();
	transport_failed || err.is_body()
}
impl DataReaderHttp {
	/// Parses a `Content-Range` value (`bytes <start>-<end>/<total>`) into its
	/// inclusive `(start, end)` byte positions.
	///
	/// `<total>` may be `*`, which RFC 9110 allows when the complete length is unknown.
	fn parse_content_range(content_range: &str) -> Result<(u64, u64)> {
		static RE_RANGE: LazyLock<Regex> = LazyLock::new(|| {
			RegexBuilder::new(r"^bytes (\d+)-(\d+)/(?:\d+|\*)$")
				.case_insensitive(true)
				.build()
				.expect("valid regex literal")
		});
		let caps = RE_RANGE.captures(content_range).ok_or_else(|| {
			anyhow!("unexpected Content-Range format: '{content_range}', expected 'bytes <start>-<end>/<total>'")
		})?;
		Ok((caps[1].parse()?, caps[2].parse()?))
	}

	/// Fetches exactly `range` with a ranged GET, retrying transient failures.
	///
	/// Transport errors accepted by `is_retryable_error` and 5xx responses are retried
	/// up to `MAX_RETRIES` times, waiting `BACKOFF(n)` between attempts. The response
	/// must meet three conditions:
	/// - its status is `206 Partial Content`;
	/// - its `Content-Range` matches the request;
	/// - its body is exactly `range.length` bytes.
	///
	/// A zero-length range returns an empty blob without sending any request.
	///
	/// # Errors
	/// Fails in any of these cases:
	/// - the range end does not fit in `u64`;
	/// - a transport error is non-retryable, or retries are exhausted;
	/// - the status is anything other than 206;
	/// - the response does not match the requested range.
	async fn try_read_range_impl(&self, range: &ByteRange) -> Result<Blob> {
		let url = &self.url;
		let len = range.length;
		if len == 0 {
			// `bytes=N-(N-1)` is not a valid range (and underflows for N = 0): nothing to fetch.
			let empty: &[u8] = &[];
			return Ok(Blob::from(empty));
		}
		// Index of the last requested byte; HTTP byte ranges are inclusive on both ends.
		let last_byte = range
			.offset
			.checked_add(len - 1)
			.ok_or_else(|| anyhow!("could not read {range} ({len} bytes) from '{url}': range end overflows u64"))?;
		let request_range = format!("bytes={}-{last_byte}", range.offset);
		let total_attempts = MAX_RETRIES + 1;

		for attempt in 0..=MAX_RETRIES {
			let attempt_label = format!("attempt {}/{total_attempts}", attempt + 1);
			if attempt > 0 {
				let backoff = BACKOFF(attempt - 1);
				log::warn!("HTTP read {range} from '{url}': retrying ({attempt_label}, waiting {backoff:?})");
				sleep(backoff).await;
			}

			let response = match self
				.apply_auth(self.client.get(self.url.clone()))
				.header("range", &request_range)
				.send()
				.await
			{
				Ok(r) => r,
				Err(e) if is_retryable_error(&e) && attempt < MAX_RETRIES => {
					log::warn!("HTTP read {range} from '{url}': {e} ({attempt_label}), will retry");
					continue;
				}
				Err(e) => {
					bail!("could not read {range} ({len} bytes) from '{url}': {e} — gave up after {total_attempts} attempts")
				}
			};

			let status = response.status();
			if status.is_server_error() && attempt < MAX_RETRIES {
				log::warn!("HTTP read {range} from '{url}': server returned {status} ({attempt_label}), will retry");
				continue;
			}
			if status != StatusCode::PARTIAL_CONTENT {
				if status.is_server_error() {
					bail!(
						"could not read {range} ({len} bytes) from '{url}': server returned {status} — gave up after {total_attempts} attempts"
					);
				}
				bail!("could not read {range} ({len} bytes) from '{url}': expected HTTP 206, got {status}");
			}

			let content_range = response
				.headers()
				.get("content-range")
				.ok_or_else(|| anyhow!("response is missing Content-Range header"))?
				.to_str()?;
			let (content_range_start, content_range_end) = Self::parse_content_range(content_range)?;
			if content_range_start != range.offset {
				bail!(
					"Content-Range start mismatch: expected {}, got {content_range_start}",
					range.offset
				);
			}
			if content_range_end != last_byte {
				bail!("Content-Range end mismatch: expected {last_byte}, got {content_range_end}");
			}

			let bytes = match response.bytes().await {
				Ok(b) => b,
				Err(e) if is_retryable_error(&e) && attempt < MAX_RETRIES => {
					log::warn!("HTTP read {range} from '{url}': error reading body: {e} ({attempt_label}), will retry");
					continue;
				}
				Err(e) => bail!(
					"could not read {range} ({len} bytes) from '{url}': error reading body: {e} — gave up after {total_attempts} attempts"
				),
			};
			// A matching Content-Range does not guarantee the body was delivered in full.
			// usize -> u64 is lossless on every target Rust supports.
			let received = bytes.len() as u64;
			if received != len {
				bail!("could not read {range} ({len} bytes) from '{url}': response body has {received} bytes");
			}
			return Ok(Blob::from(&*bytes));
		}
		// Unreachable in practice (the last attempt always returns or bails) but required
		// for the loop's type.
		bail!("could not read {range} ({len} bytes) from '{url}' — gave up after {total_attempts} attempts")
	}
}
#[async_trait]
impl NetworkReader for DataReaderHttp {
async fn try_read_range(&self, range: &ByteRange) -> Result<Blob> {
self.try_read_range_impl(range).await
}
fn max_request_bytes(&self) -> &AtomicU64 {
&self.max_request_bytes
}
}
#[async_trait]
impl DataReaderTrait for DataReaderHttp {
async fn read_range(&self, range: &ByteRange) -> Result<Blob> {
self.network_read_range(range).await
}
async fn read_all(&self) -> Result<Blob> {
let total_attempts = MAX_RETRIES + 1;
let url = &self.url;
for attempt in 0..=MAX_RETRIES {
let attempt_label = format!("attempt {}/{total_attempts}", attempt + 1);
if attempt > 0 {
let backoff = BACKOFF(attempt - 1);
log::warn!("HTTP read from '{url}': retrying ({attempt_label}, waiting {backoff:?})");
sleep(backoff).await;
}
let response = match self.apply_auth(self.client.get(self.url.clone())).send().await {
Ok(r) => r,
Err(e) if is_retryable_error(&e) && attempt < MAX_RETRIES => {
log::warn!("HTTP read from '{url}': {e} ({attempt_label}), will retry");
continue;
}
Err(e) => bail!("could not read from '{url}': {e} — gave up after {total_attempts} attempts"),
};
let status = response.status();
if status.is_server_error() && attempt < MAX_RETRIES {
log::warn!("HTTP read from '{url}': server returned {status} ({attempt_label}), will retry");
continue;
}
if !status.is_success() {
if status.is_server_error() {
bail!("could not read from '{url}': server returned {status} — gave up after {total_attempts} attempts");
}
bail!("could not read from '{url}': server returned {status}");
}
let bytes = match response.bytes().await {
Ok(b) => b,
Err(e) if is_retryable_error(&e) && attempt < MAX_RETRIES => {
log::warn!("HTTP read from '{url}': error reading body: {e} ({attempt_label}), will retry");
continue;
}
Err(e) => {
bail!("could not read from '{url}': error reading body: {e} — gave up after {total_attempts} attempts")
}
};
return Ok(Blob::from(&*bytes));
}
bail!("could not read from '{url}' — gave up after {total_attempts} attempts")
}
fn name(&self) -> &str {
&self.name
}
}
#[cfg(test)]
mod tests {
	use super::*;

	/// Parses a URL literal that the test knows to be valid.
	fn parse_url(s: &str) -> Url {
		Url::parse(s).expect("test URL literal must parse")
	}

	#[test]
	fn new() {
		assert!(DataReaderHttp::try_from(&parse_url("https://www.example.com")).is_ok());
		assert!(DataReaderHttp::try_from(&parse_url("ftp://www.example.com")).is_err());
	}

	/// Reads `length` bytes at `offset` from `url` and asserts they decode to `expected`.
	async fn read_range_helper(url: &str, offset: u64, length: u64, expected: &str) -> Result<()> {
		let reader = DataReaderHttp::try_from(&parse_url(url))?;
		let blob = reader.read_range(&ByteRange { offset, length }).await?;
		assert_eq!(str::from_utf8(blob.as_slice())?, expected);
		Ok(())
	}

	// Network-dependent: fetches a file from GitHub.
	#[tokio::test]
	async fn read_range_git() {
		read_range_helper(
			"https://raw.githubusercontent.com/versatiles-org/versatiles-rs/refs/heads/main/testdata/berlin.mbtiles",
			7,
			8,
			"format 3",
		)
		.await
		.unwrap();
	}

	// Network-dependent. The helper must return an error: a successful read would hit
	// its assert_eq! (110 bytes vs. "plingplong") and panic instead.
	#[tokio::test]
	async fn read_range_google() {
		let outcome = read_range_helper("https://google.com/", 100, 110, "plingplong").await;
		assert!(outcome.is_err());
	}

	#[test]
	fn get_name() -> Result<()> {
		const URL: &str = "https://www.example.com/";
		let reader = DataReaderHttp::try_from(&parse_url(URL))?;
		assert_eq!(reader.name(), URL);
		Ok(())
	}

	#[test]
	fn from_url_with_credentials() -> Result<()> {
		let reader = DataReaderHttp::try_from(&parse_url("https://user:p%40ss@example.com/data.bin"))?;
		assert_eq!(reader.username.as_deref(), Some("user"));
		assert_eq!(reader.password.as_deref(), Some("p@ss"));
		assert_eq!(reader.name(), "https://example.com/data.bin");
		assert_eq!(reader.url.username(), "");
		assert_eq!(reader.url.password(), None);
		Ok(())
	}

	#[test]
	fn from_url_without_credentials() -> Result<()> {
		let reader = DataReaderHttp::try_from(&parse_url("https://example.com/data.bin"))?;
		assert_eq!(reader.username, None);
		assert_eq!(reader.password, None);
		assert_eq!(reader.name(), "https://example.com/data.bin");
		Ok(())
	}

	#[test]
	fn debug_impl_hides_credentials() -> Result<()> {
		let with_creds = format!(
			"{:?}",
			DataReaderHttp::try_from(&parse_url("https://user:pass@example.com/"))?
		);
		assert!(with_creds.contains("has_credentials: true"));
		assert!(!with_creds.contains("pass"));

		let no_creds = format!("{:?}", DataReaderHttp::try_from(&parse_url("https://example.com/"))?);
		assert!(no_creds.contains("has_credentials: false"));
		Ok(())
	}

	#[test]
	fn from_url_rejects_unsupported_scheme() {
		let err = DataReaderHttp::try_from(&parse_url("ftp://example.com/")).unwrap_err();
		assert!(err.to_string().contains("unsupported URL scheme"));
	}
}