#![cfg(feature = "url")]
use std::collections::HashMap;
use std::io::{self, Read, Seek, SeekFrom};
use std::sync::Mutex;
use ureq::Agent;
/// Default number of bytes fetched per HTTP range request (64 KiB).
const DEFAULT_BLOCK_SIZE: u64 = 1 << 16;
/// One chunk of the remote resource, as returned by a single range request
/// issued from `RemoteReader::fetch_block`.
#[derive(Debug, Clone)]
struct CachedBlock
{
// Offset the block was requested from (also its cache key). It is written when
// the block is built but never read in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
offset: u64,
// Response body bytes for this block. Near the end of the file this can be
// shorter than the configured block size, because the requested range is
// clamped to the last byte.
data: Vec<u8>,
}
/// Random-access reader over a remote HTTP resource.
///
/// Implements `Read` and `Seek` by issuing HTTP `Range` requests and keeping the
/// fetched blocks in an in-memory cache.
pub struct RemoteReader
{
// URL used for every HEAD/GET request.
url: String,
// HTTP client shared by all requests made by this reader.
agent: Agent,
// Fetched blocks keyed by their block-aligned start offset. The `Mutex` lets
// `get_data_at`, which only takes `&self`, insert into the map.
// NOTE(review): no eviction is visible in this file, so the cache grows with
// every distinct block read.
cache: Mutex<HashMap<u64, CachedBlock>>,
// Current read position in bytes; advanced by `read`, replaced by `seek`.
pos: u64,
// Resource size from `Content-Length`. `new` always sets `Some`; when `None`,
// `get_file_size` falls back to a HEAD request on each call.
file_size: Option<u64>,
// Bytes requested per range fetch; blocks start at multiples of this value.
block_size: u64,
}
impl RemoteReader
{
    /// Opens `url` for random-access reading.
    ///
    /// Sends one HTTP `HEAD` request to learn the resource size from its
    /// `Content-Length` header. That size is stored and reused for the reader's
    /// lifetime. No body data is downloaded until the first read.
    ///
    /// # Errors
    ///
    /// Fails if the `HEAD` request fails or the response lacks a parseable
    /// `Content-Length` header.
    pub fn new(url: impl Into<String>) -> io::Result<Self>
    {
        let url = url.into();
        let agent = Agent::new_with_defaults();
        let file_size = Self::get_file_size_for_url(&agent, &url)?;
        Ok(Self {
            url,
            agent,
            cache: Mutex::new(HashMap::new()),
            pos: 0,
            file_size: Some(file_size),
            block_size: DEFAULT_BLOCK_SIZE,
        })
    }

    /// Sends an HTTP `HEAD` request for `url` and returns its `Content-Length`.
    ///
    /// # Errors
    ///
    /// - `ConnectionRefused` if the request fails.
    /// - `InvalidData` if the header is missing or is not an unsigned integer.
    fn get_file_size_for_url(agent: &Agent, url: &str) -> io::Result<u64>
    {
        let response = agent.head(url).call().map_err(|e| {
            io::Error::new(
                io::ErrorKind::ConnectionRefused,
                format!("HTTP HEAD request failed: {}", e),
            )
        })?;
        response
            .headers()
            .get("Content-Length")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.parse::<u64>().ok())
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Missing or invalid Content-Length header",
                )
            })
    }

    /// Sets how many bytes each range request fetches and caches.
    ///
    /// Blocks are aligned to multiples of this size. A size of `0` is clamped to
    /// `1`, because block arithmetic divides by the block size.
    pub fn with_block_size(mut self, size: u64) -> Self
    {
        self.block_size = size.max(1);
        self
    }

    /// Returns the resource size.
    ///
    /// Uses the stored value when known. Otherwise it issues a fresh `HEAD`
    /// request; that result is not stored.
    fn get_file_size(&self) -> io::Result<u64>
    {
        match self.file_size
        {
            Some(size) => Ok(size),
            None => Self::get_file_size_for_url(&self.agent, &self.url),
        }
    }

    /// Block size used for arithmetic. It is never zero, even for a reader
    /// whose `block_size` field was set to zero directly.
    fn block_len(&self) -> u64
    {
        self.block_size.max(1)
    }

    /// Rounds `pos` down to the start of the block containing it.
    fn block_start(&self, pos: u64) -> u64
    {
        let size = self.block_len();
        (pos / size) * size
    }

    /// Converts a poisoned cache mutex into an I/O error.
    fn poisoned<G>(_err: std::sync::PoisonError<G>) -> io::Error
    {
        io::Error::new(io::ErrorKind::Other, "Cache lock poisoned")
    }

    /// Downloads the block beginning at `offset` with an HTTP `Range` request.
    ///
    /// The requested range is clamped to the end of the resource. If the server
    /// ignores `Range` and answers `200 OK` with the whole resource, only the
    /// bytes belonging to this block are kept.
    ///
    /// # Errors
    ///
    /// - `UnexpectedEof` if `offset` is at or past the end of the resource.
    /// - `ConnectionRefused` if the request or the body read fails.
    /// - `InvalidData` for a status other than 200/206.
    fn fetch_block(&self, offset: u64) -> io::Result<CachedBlock>
    {
        let file_size = self.get_file_size()?;
        if offset >= file_size
        {
            return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Seek beyond end of file"));
        }
        // Inclusive last byte of the block, clamped to the final byte of the file.
        // `offset < file_size` here, so `file_size - 1` cannot underflow, and
        // `block_len() >= 1` guarantees `end >= offset`.
        let end = offset
            .saturating_add(self.block_len() - 1)
            .min(file_size - 1);
        let range = format!("bytes={}-{}", offset, end);
        let response = self
            .agent
            .get(&self.url)
            .header("Range", &range)
            .call()
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::ConnectionRefused,
                    format!("HTTP GET request failed: {}", e),
                )
            })?;
        let status = response.status();
        if status != 206 && status != 200
        {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Unexpected HTTP status: {}", status),
            ));
        }
        // NOTE(review): ureq's `read_to_vec` applies a default size limit, so a
        // `200` full-resource response for a large file may fail here.
        let mut data = response.into_body().read_to_vec().map_err(|e| {
            io::Error::new(
                io::ErrorKind::ConnectionRefused,
                format!("Failed to read response body: {}", e),
            )
        })?;
        if status == 200
        {
            // The server ignored `Range` and sent the resource from byte 0.
            // Trim it to this block so cached data matches its cache key.
            let start = usize::try_from(offset).map_or(data.len(), |o| o.min(data.len()));
            let len = usize::try_from(end - offset + 1).unwrap_or(usize::MAX);
            data.drain(..start);
            data.truncate(len);
        }
        Ok(CachedBlock { offset, data })
    }

    /// Returns the bytes from `offset` to the end of its (possibly cached) block.
    ///
    /// The block is fetched on a cache miss. The cache lock is not held during
    /// the network request.
    ///
    /// # Errors
    ///
    /// Propagates `fetch_block` errors. Returns `UnexpectedEof` if the server
    /// delivered fewer bytes than needed to reach `offset`.
    fn get_data_at(&self, offset: u64) -> io::Result<Vec<u8>>
    {
        let block_start = self.block_start(offset);
        let offset_in_block = offset - block_start;
        {
            let cache = self.cache.lock().map_err(Self::poisoned)?;
            if let Some(block) = cache.get(&block_start)
            {
                return Self::tail_from(block, offset_in_block);
            }
        }
        let block = self.fetch_block(block_start)?;
        let mut cache = self.cache.lock().map_err(Self::poisoned)?;
        let block = cache.entry(block_start).or_insert(block);
        Self::tail_from(block, offset_in_block)
    }

    /// Copies `block.data[offset_in_block..]`.
    ///
    /// Returns an error instead of panicking when the block is too short.
    fn tail_from(block: &CachedBlock, offset_in_block: u64) -> io::Result<Vec<u8>>
    {
        usize::try_from(offset_in_block)
            .ok()
            .and_then(|start| block.data.get(start..))
            .map(|tail| tail.to_vec())
            .ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Server returned fewer bytes than requested for block",
                )
            })
    }
}
impl Read for RemoteReader
{
    /// Reads up to `buf.len()` bytes starting at the current position.
    ///
    /// Each call copies from at most one block. A read can therefore return fewer
    /// bytes than requested even when more data remains. `Ok(0)` means end of file
    /// or an empty `buf`; an empty `buf` never triggers a network request.
    ///
    /// # Errors
    ///
    /// Propagates size and fetch errors. Returns `UnexpectedEof` if the server
    /// yields no bytes for a position before the end of the resource, rather
    /// than reporting a false end of file.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>
    {
        if buf.is_empty()
        {
            return Ok(0);
        }
        let file_size = self.get_file_size()?;
        if self.pos >= file_size
        {
            return Ok(0);
        }
        let remaining = file_size - self.pos;
        let to_read = usize::try_from(remaining).map_or(buf.len(), |r| r.min(buf.len()));
        let data = self.get_data_at(self.pos)?;
        if data.is_empty()
        {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "Remote returned no data before end of file",
            ));
        }
        let n = to_read.min(data.len());
        buf[..n].copy_from_slice(&data[..n]);
        self.pos += n as u64;
        Ok(n)
    }
}
impl Seek for RemoteReader
{
    /// Moves the read position and returns the new absolute offset.
    ///
    /// Seeking past the end is allowed; subsequent reads return `Ok(0)`. Only
    /// `SeekFrom::End` needs the resource size, so only it can trigger a `HEAD`
    /// request (when the size is unknown).
    ///
    /// # Errors
    ///
    /// - `InvalidInput` if the target would be negative or overflow `u64`. The
    ///   position is left unchanged in that case.
    /// - For `SeekFrom::End`, any error from looking up the resource size is
    ///   propagated as-is.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64>
    {
        let (base, delta) = match pos
        {
            SeekFrom::Start(n) =>
            {
                self.pos = n;
                return Ok(n);
            }
            SeekFrom::End(delta) => (self.get_file_size()?, delta),
            SeekFrom::Current(delta) => (self.pos, delta),
        };
        let target = base.checked_add_signed(delta).ok_or_else(|| {
            let msg = if delta < 0 { "Seek before file start" } else { "Seek overflow" };
            io::Error::new(io::ErrorKind::InvalidInput, msg)
        })?;
        self.pos = target;
        Ok(target)
    }
}
#[cfg(test)]
mod tests
{
    use super::*;

    /// Builds a reader for a placeholder URL by filling the struct directly.
    /// This skips the `HEAD` request that `RemoteReader::new` would send.
    fn offline_reader(block_size: u64) -> RemoteReader
    {
        RemoteReader {
            url: String::from("http://example.com/test"),
            agent: Agent::new_with_defaults(),
            cache: Mutex::new(HashMap::new()),
            pos: 0,
            file_size: None,
            block_size,
        }
    }

    #[test]
    fn test_block_start()
    {
        let reader = offline_reader(DEFAULT_BLOCK_SIZE);
        let cases = [(0, 0), (100, 0), (65536, 65536), (70000, 65536), (131072, 131072)];
        for &(pos, expected) in cases.iter()
        {
            assert_eq!(reader.block_start(pos), expected, "block_start({pos})");
        }
    }

    #[test]
    fn test_block_start_custom_size()
    {
        let reader = offline_reader(1024);
        let cases = [(0, 0), (100, 0), (1024, 1024), (2000, 1024)];
        for &(pos, expected) in cases.iter()
        {
            assert_eq!(reader.block_start(pos), expected, "block_start({pos})");
        }
    }
}