use async_trait::async_trait;
use instant::Instant;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::io;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use super::{Directory, FileSlice, LazyFileHandle, OwnedBytes, RangeReadFn};
/// One completed HTTP transfer, as recorded by the directory's stats tracker.
#[derive(Debug, Clone, serde::Serialize)]
pub struct NetworkOp {
    /// URL that was requested.
    pub url: String,
    /// Number of response-body bytes received for this request.
    pub bytes: u64,
    /// Time from just before the request was sent until the body was fully read, in milliseconds.
    pub duration_ms: u64,
    /// Requested byte range as `(start, end)` with `end` exclusive, or `None` for a
    /// whole-file fetch.
    pub range: Option<(u64, u64)>,
}
/// Snapshot of the traffic recorded by an `HttpDirectory`.
///
/// Only requests whose body was read successfully are recorded; `HEAD` requests made to
/// discover file sizes are not counted.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HttpStats {
    /// Number of recorded requests since creation or the last reset.
    pub total_requests: u64,
    /// Sum of response-body bytes over all recorded requests.
    pub total_bytes: u64,
    /// Every recorded request, in the order it was recorded.
    pub operations: Vec<NetworkOp>,
}
/// Thread-shareable accumulator behind [`HttpStats`].
///
/// The counters are atomics and the operation log sits behind an `RwLock`, so recording
/// only needs `&self`. Nothing here bounds the log: it grows by one entry per recorded request
/// until `reset` is called.
struct StatsTracker {
    // Count of recorded requests.
    total_requests: AtomicU64,
    // Running sum of recorded body sizes.
    total_bytes: AtomicU64,
    // Per-request details, appended in recording order.
    operations: RwLock<Vec<NetworkOp>>,
}
impl StatsTracker {
    /// Creates a tracker with both counters at zero and an empty operation log.
    fn new() -> Self {
        let operations = RwLock::new(Vec::new());
        Self {
            total_requests: AtomicU64::new(0),
            total_bytes: AtomicU64::new(0),
            operations,
        }
    }

    /// Accounts for one completed request.
    ///
    /// Bumps the request counter, adds `bytes` to the byte total, and appends a
    /// [`NetworkOp`] describing the transfer to the log.
    fn record(&self, url: String, bytes: u64, duration_ms: u64, range: Option<(u64, u64)>) {
        let entry = NetworkOp {
            url,
            bytes,
            duration_ms,
            range,
        };
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.total_bytes.fetch_add(bytes, Ordering::Relaxed);
        let mut log = self.operations.write();
        log.push(entry);
    }

    /// Copies the current counters and log into an [`HttpStats`].
    ///
    /// The three values are read one after another, not under a single lock. A concurrent
    /// `record` may therefore show up in some of them but not the others.
    fn get_stats(&self) -> HttpStats {
        let total_requests = self.total_requests.load(Ordering::Relaxed);
        let total_bytes = self.total_bytes.load(Ordering::Relaxed);
        let operations = self.operations.read().to_vec();
        HttpStats {
            total_requests,
            total_bytes,
            operations,
        }
    }

    /// Zeroes both counters and empties the operation log.
    fn reset(&self) {
        for counter in [&self.total_requests, &self.total_bytes] {
            counter.store(0, Ordering::Relaxed);
        }
        self.operations.write().clear();
    }
}
/// A [`Directory`] whose files are fetched over HTTP from URLs built by appending the file
/// path to `base_url`.
pub struct HttpDirectory {
    // Prefix that relative file paths are appended to when building request URLs.
    base_url: String,
    // Client used for every request, including those issued later by lazy file handles.
    client: reqwest::Client,
    // Whole-file bodies downloaded by `open_read`, keyed by path. Nothing in this file evicts
    // entries, so memory grows with every distinct file opened this way.
    cache: RwLock<HashMap<PathBuf, Arc<Vec<u8>>>>,
    // Behind an `Arc` so lazy read closures can record their range requests too.
    stats: Arc<StatsTracker>,
}
impl HttpDirectory {
pub fn new(base_url: impl Into<String>) -> Self {
Self {
base_url: base_url.into(),
client: reqwest::Client::new(),
cache: RwLock::new(HashMap::new()),
stats: Arc::new(StatsTracker::new()),
}
}
pub fn with_client(base_url: impl Into<String>, client: reqwest::Client) -> Self {
Self {
base_url: base_url.into(),
client,
cache: RwLock::new(HashMap::new()),
stats: Arc::new(StatsTracker::new()),
}
}
pub fn http_stats(&self) -> HttpStats {
self.stats.get_stats()
}
pub fn reset_stats(&self) {
self.stats.reset()
}
fn url_for(&self, path: &Path) -> String {
format!("{}/{}", self.base_url, path.display())
}
async fn fetch_bytes(&self, url: &str) -> io::Result<Vec<u8>> {
let start_time = Instant::now();
let response = self
.client
.get(url)
.send()
.await
.map_err(|e| io::Error::other(e.to_string()))?;
if !response.status().is_success() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("HTTP {}: {}", response.status(), url),
));
}
let bytes = response
.bytes()
.await
.map(|b| b.to_vec())
.map_err(|e| io::Error::other(e.to_string()))?;
let duration_ms = start_time.elapsed().as_millis() as u64;
self.stats
.record(url.to_string(), bytes.len() as u64, duration_ms, None);
Ok(bytes)
}
async fn fetch_range(&self, url: &str, range: Range<u64>) -> io::Result<Vec<u8>> {
let start_time = Instant::now();
let range_header = format!("bytes={}-{}", range.start, range.end - 1);
let response = self
.client
.get(url)
.header("Range", range_header)
.send()
.await
.map_err(|e| io::Error::other(e.to_string()))?;
if !response.status().is_success() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("HTTP {}: {}", response.status(), url),
));
}
let bytes = response
.bytes()
.await
.map(|b| b.to_vec())
.map_err(|e| io::Error::other(e.to_string()))?;
let duration_ms = start_time.elapsed().as_millis() as u64;
self.stats.record(
url.to_string(),
bytes.len() as u64,
duration_ms,
Some((range.start, range.end)),
);
Ok(bytes)
}
async fn head_content_length(&self, url: &str) -> io::Result<u64> {
let response = self
.client
.head(url)
.send()
.await
.map_err(|e| io::Error::other(e.to_string()))?;
if !response.status().is_success() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("HTTP {}: {}", response.status(), url),
));
}
response
.headers()
.get("content-length")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok())
.ok_or_else(|| io::Error::other("No Content-Length header"))
}
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Directory for HttpDirectory {
async fn exists(&self, path: &Path) -> io::Result<bool> {
if self.cache.read().contains_key(path) {
return Ok(true);
}
Ok(true)
}
async fn file_size(&self, path: &Path) -> io::Result<u64> {
if let Some(data) = self.cache.read().get(path) {
return Ok(data.len() as u64);
}
let url = self.url_for(path);
self.head_content_length(&url).await
}
async fn open_read(&self, path: &Path) -> io::Result<FileSlice> {
if let Some(data) = self.cache.read().get(path) {
return Ok(FileSlice::new(OwnedBytes::new(data.as_ref().clone())));
}
let url = self.url_for(path);
let data = self.fetch_bytes(&url).await?;
let data = Arc::new(data);
self.cache
.write()
.insert(path.to_path_buf(), Arc::clone(&data));
Ok(FileSlice::new(OwnedBytes::new(data.as_ref().clone())))
}
async fn read_range(&self, path: &Path, range: Range<u64>) -> io::Result<OwnedBytes> {
if let Some(data) = self.cache.read().get(path) {
let start = range.start as usize;
let end = range.end as usize;
if end <= data.len() {
return Ok(OwnedBytes::new(data[start..end].to_vec()));
}
}
let url = self.url_for(path);
let data = self.fetch_range(&url, range).await?;
Ok(OwnedBytes::new(data))
}
async fn list_files(&self, _prefix: &Path) -> io::Result<Vec<PathBuf>> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"HTTP directory does not support file listing",
))
}
async fn open_lazy(&self, path: &Path) -> io::Result<LazyFileHandle> {
let file_size = self.file_size(path).await?;
let url = self.url_for(path);
let client = self.client.clone();
let stats = Arc::clone(&self.stats);
let read_fn: RangeReadFn = Arc::new(move |range: Range<u64>| {
let url = url.clone();
let client = client.clone();
let stats = Arc::clone(&stats);
Box::pin(async move {
let start_time = Instant::now();
let range_header = format!("bytes={}-{}", range.start, range.end - 1);
let response = client
.get(&url)
.header("Range", range_header)
.send()
.await
.map_err(|e| io::Error::other(e.to_string()))?;
if !response.status().is_success() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("HTTP {}", response.status()),
));
}
let bytes = response
.bytes()
.await
.map_err(|e| io::Error::other(e.to_string()))?;
let duration_ms = start_time.elapsed().as_millis() as u64;
stats.record(
url.clone(),
bytes.len() as u64,
duration_ms,
Some((range.start, range.end)),
);
Ok(OwnedBytes::new(bytes.to_vec()))
})
});
Ok(LazyFileHandle::new(file_size, read_fn))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A relative path is appended to the base URL after a single `/`.
    #[test]
    fn test_url_construction() {
        let expected = "http://localhost:8080/index/segment.bin";
        let directory = HttpDirectory::new("http://localhost:8080");
        let actual = directory.url_for(Path::new("index/segment.bin"));
        assert_eq!(actual, expected);
    }
}