use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::JsFuture;
use web_sys::{Headers, Request, RequestInit, RequestMode, Response};
use oxigdal_core::error::{IoError, OxiGdalError, Result};
use oxigdal_core::io::{ByteRange, DataSource};
use crate::error::{FetchError, WasmError, WasmResult};
pub const DEFAULT_MAX_RETRIES: u32 = 3;
pub const DEFAULT_RETRY_DELAY_MS: u64 = 1000;
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30000;
#[allow(dead_code)]
pub const DEFAULT_MAX_PARALLEL_REQUESTS: usize = 6;
#[derive(Debug)]
pub struct FetchBackend {
url: String,
size: u64,
supports_range: bool,
}
impl FetchBackend {
pub async fn new(url: String) -> Result<Self> {
let window = web_sys::window().ok_or_else(|| OxiGdalError::Internal {
message: "No window object available".to_string(),
})?;
let opts = RequestInit::new();
opts.set_method("HEAD");
opts.set_mode(RequestMode::Cors);
let request = Request::new_with_str_and_init(&url, &opts).map_err(|e| {
OxiGdalError::Io(IoError::Network {
message: format!("Failed to create request: {:?}", e),
})
})?;
let response = JsFuture::from(window.fetch_with_request(&request))
.await
.map_err(|e| {
OxiGdalError::Io(IoError::Network {
message: format!("Fetch failed: {:?}", e),
})
})?;
let response: Response = response.dyn_into().map_err(|_| OxiGdalError::Internal {
message: "Response is not a Response object".to_string(),
})?;
if !response.ok() {
return Err(OxiGdalError::Io(IoError::Http {
status: response.status(),
message: response.status_text(),
}));
}
let headers = response.headers();
let size = headers
.get("content-length")
.ok()
.flatten()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let supports_range = headers
.get("accept-ranges")
.ok()
.flatten()
.map(|v| v.to_lowercase() == "bytes")
.unwrap_or(false);
Ok(Self {
url,
size,
supports_range,
})
}
#[must_use]
pub fn url(&self) -> &str {
&self.url
}
#[must_use]
pub const fn supports_range(&self) -> bool {
self.supports_range
}
async fn fetch_range_async(&self, range: ByteRange) -> Result<Vec<u8>> {
let window = web_sys::window().ok_or_else(|| OxiGdalError::Internal {
message: "No window object available".to_string(),
})?;
let opts = RequestInit::new();
opts.set_method("GET");
opts.set_mode(RequestMode::Cors);
let headers = Headers::new().map_err(|e| OxiGdalError::Internal {
message: format!("Failed to create headers: {:?}", e),
})?;
headers
.set("Range", &format!("bytes={}-{}", range.start, range.end - 1))
.map_err(|e| OxiGdalError::Internal {
message: format!("Failed to set Range header: {:?}", e),
})?;
opts.set_headers(&headers);
let request = Request::new_with_str_and_init(&self.url, &opts).map_err(|e| {
OxiGdalError::Io(IoError::Network {
message: format!("Failed to create request: {:?}", e),
})
})?;
let response = JsFuture::from(window.fetch_with_request(&request))
.await
.map_err(|e| {
OxiGdalError::Io(IoError::Network {
message: format!("Fetch failed: {:?}", e),
})
})?;
let response: Response = response.dyn_into().map_err(|_| OxiGdalError::Internal {
message: "Response is not a Response object".to_string(),
})?;
if !response.ok() && response.status() != 206 {
return Err(OxiGdalError::Io(IoError::Http {
status: response.status(),
message: response.status_text(),
}));
}
let array_buffer =
JsFuture::from(
response
.array_buffer()
.map_err(|e| OxiGdalError::Internal {
message: format!("Failed to get array buffer: {:?}", e),
})?,
)
.await
.map_err(|e| {
OxiGdalError::Io(IoError::Read {
message: format!("Failed to read response: {:?}", e),
})
})?;
let uint8_array = js_sys::Uint8Array::new(&array_buffer);
Ok(uint8_array.to_vec())
}
}
impl DataSource for FetchBackend {
fn size(&self) -> Result<u64> {
Ok(self.size)
}
fn read_range(&self, _range: ByteRange) -> Result<Vec<u8>> {
Err(OxiGdalError::NotSupported {
operation: "Synchronous read in WASM - use async methods".to_string(),
})
}
fn supports_range_requests(&self) -> bool {
self.supports_range
}
}
impl FetchBackend {
pub async fn read_range_async(&self, range: ByteRange) -> Result<Vec<u8>> {
self.fetch_range_async(range).await
}
pub async fn read_ranges_async(&self, ranges: &[ByteRange]) -> Result<Vec<Vec<u8>>> {
let mut results = Vec::with_capacity(ranges.len());
for range in ranges {
results.push(self.fetch_range_async(*range).await?);
}
Ok(results)
}
}
#[allow(dead_code)]
pub struct AsyncFetchBackend {
inner: FetchBackend,
cache: std::collections::HashMap<(u64, u64), Vec<u8>>,
}
#[allow(dead_code)] impl AsyncFetchBackend {
pub async fn new(url: String) -> Result<Self> {
let inner = FetchBackend::new(url).await?;
Ok(Self {
inner,
cache: std::collections::HashMap::new(),
})
}
pub async fn prefetch(&mut self, range: ByteRange) -> Result<()> {
let data = self.inner.fetch_range_async(range).await?;
self.cache.insert((range.start, range.end), data);
Ok(())
}
pub async fn get_range(&mut self, range: ByteRange) -> Result<Vec<u8>> {
let key = (range.start, range.end);
if let Some(data) = self.cache.get(&key) {
return Ok(data.clone());
}
let data = self.inner.fetch_range_async(range).await?;
self.cache.insert(key, data.clone());
Ok(data)
}
}
#[allow(dead_code)]
pub struct PrefetchedFetchBackend {
url: String,
size: u64,
data: Vec<u8>,
}
#[allow(dead_code)] impl PrefetchedFetchBackend {
pub async fn new(url: String) -> Result<Self> {
let backend = FetchBackend::new(url.clone()).await?;
let size = backend.size;
let data = backend
.fetch_range_async(ByteRange::from_offset_length(0, size))
.await?;
Ok(Self { url, size, data })
}
pub async fn with_header(url: String, header_size: u64) -> Result<Self> {
let backend = FetchBackend::new(url.clone()).await?;
let size = backend.size;
let data = backend
.fetch_range_async(ByteRange::from_offset_length(0, header_size))
.await?;
Ok(Self { url, size, data })
}
}
impl DataSource for PrefetchedFetchBackend {
fn size(&self) -> Result<u64> {
Ok(self.size)
}
fn read_range(&self, range: ByteRange) -> Result<Vec<u8>> {
if range.end as usize > self.data.len() {
return Err(OxiGdalError::Io(IoError::UnexpectedEof {
offset: range.start,
}));
}
Ok(self.data[range.start as usize..range.end as usize].to_vec())
}
}
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
pub max_retries: u32,
pub initial_delay_ms: u64,
pub backoff_multiplier: f64,
pub max_delay_ms: u64,
}
impl RetryConfig {
pub const fn new(max_retries: u32, initial_delay_ms: u64) -> Self {
Self {
max_retries,
initial_delay_ms,
backoff_multiplier: 2.0,
max_delay_ms: 60000,
}
}
pub const fn default_config() -> Self {
Self::new(DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY_MS)
}
pub fn delay_for_attempt(&self, attempt: u32) -> u64 {
let delay =
(self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32)) as u64;
delay.min(self.max_delay_ms)
}
}
impl Default for RetryConfig {
fn default() -> Self {
Self::default_config()
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct FetchStats {
pub total_requests: u64,
pub successful_requests: u64,
pub failed_requests: u64,
pub retried_requests: u64,
pub bytes_fetched: u64,
pub total_time_ms: f64,
}
impl FetchStats {
pub const fn new() -> Self {
Self {
total_requests: 0,
successful_requests: 0,
failed_requests: 0,
retried_requests: 0,
bytes_fetched: 0,
total_time_ms: 0.0,
}
}
pub fn success_rate(&self) -> f64 {
if self.total_requests == 0 {
0.0
} else {
self.successful_requests as f64 / self.total_requests as f64
}
}
pub fn average_request_time_ms(&self) -> f64 {
if self.total_requests == 0 {
0.0
} else {
self.total_time_ms / self.total_requests as f64
}
}
pub fn average_throughput_bps(&self) -> f64 {
if self.total_time_ms == 0.0 {
0.0
} else {
(self.bytes_fetched as f64 / self.total_time_ms) * 1000.0
}
}
}
impl Default for FetchStats {
fn default() -> Self {
Self::new()
}
}
pub struct EnhancedFetchBackend {
url: String,
size: u64,
supports_range: bool,
retry_config: RetryConfig,
stats: FetchStats,
#[allow(dead_code)]
timeout_ms: u64,
}
impl EnhancedFetchBackend {
pub async fn new(url: String) -> WasmResult<Self> {
Self::with_config(url, RetryConfig::default(), DEFAULT_REQUEST_TIMEOUT_MS).await
}
pub async fn with_config(
url: String,
retry_config: RetryConfig,
timeout_ms: u64,
) -> WasmResult<Self> {
let (size, supports_range) = Self::probe_url(&url, &retry_config).await?;
Ok(Self {
url,
size,
supports_range,
retry_config,
stats: FetchStats::new(),
timeout_ms,
})
}
async fn probe_url(url: &str, retry_config: &RetryConfig) -> WasmResult<(u64, bool)> {
for attempt in 0..=retry_config.max_retries {
match Self::head_request(url).await {
Ok(result) => return Ok(result),
Err(e) => {
if attempt < retry_config.max_retries {
let delay = retry_config.delay_for_attempt(attempt);
Self::sleep_ms(delay).await;
} else {
return Err(e);
}
}
}
}
Err(WasmError::Fetch(FetchError::RetryLimitExceeded {
url: url.to_string(),
attempts: retry_config.max_retries + 1,
}))
}
async fn head_request(url: &str) -> WasmResult<(u64, bool)> {
let window = web_sys::window().ok_or_else(|| {
WasmError::Fetch(FetchError::NetworkFailure {
url: url.to_string(),
message: "No window object available".to_string(),
})
})?;
let opts = RequestInit::new();
opts.set_method("HEAD");
opts.set_mode(RequestMode::Cors);
let request = Request::new_with_str_and_init(url, &opts).map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: url.to_string(),
message: format!("Failed to create request: {e:?}"),
})
})?;
let response = JsFuture::from(window.fetch_with_request(&request))
.await
.map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: url.to_string(),
message: format!("Fetch failed: {e:?}"),
})
})?;
let response: Response = response.dyn_into().map_err(|_| {
WasmError::Fetch(FetchError::ParseError {
expected: "Response".to_string(),
message: "Not a Response object".to_string(),
})
})?;
if !response.ok() {
return Err(WasmError::Fetch(FetchError::HttpError {
status: response.status(),
status_text: response.status_text(),
url: url.to_string(),
}));
}
let headers = response.headers();
let size = headers
.get("content-length")
.ok()
.flatten()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(0);
let supports_range = headers
.get("accept-ranges")
.ok()
.flatten()
.map(|v| v.to_lowercase() == "bytes")
.unwrap_or(false);
Ok((size, supports_range))
}
async fn sleep_ms(ms: u64) {
let promise = js_sys::Promise::new(&mut |resolve, _reject| {
let window = web_sys::window().expect("Window exists");
let _ =
window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, ms as i32);
});
let _ = JsFuture::from(promise).await;
}
pub async fn fetch_range_with_retry(&mut self, range: ByteRange) -> WasmResult<Vec<u8>> {
let start_time = self.current_time_ms();
for attempt in 0..=self.retry_config.max_retries {
self.stats.total_requests += 1;
match self.fetch_range_once(range).await {
Ok(data) => {
let elapsed = self.current_time_ms() - start_time;
self.stats.successful_requests += 1;
self.stats.bytes_fetched += data.len() as u64;
self.stats.total_time_ms += elapsed;
if attempt > 0 {
self.stats.retried_requests += 1;
}
return Ok(data);
}
Err(e) => {
if attempt < self.retry_config.max_retries {
let delay = self.retry_config.delay_for_attempt(attempt);
Self::sleep_ms(delay).await;
} else {
self.stats.failed_requests += 1;
return Err(e);
}
}
}
}
Err(WasmError::Fetch(FetchError::RetryLimitExceeded {
url: self.url.clone(),
attempts: self.retry_config.max_retries + 1,
}))
}
async fn fetch_range_once(&self, range: ByteRange) -> WasmResult<Vec<u8>> {
let window = web_sys::window().ok_or_else(|| {
WasmError::Fetch(FetchError::NetworkFailure {
url: self.url.clone(),
message: "No window object available".to_string(),
})
})?;
let opts = RequestInit::new();
opts.set_method("GET");
opts.set_mode(RequestMode::Cors);
let headers = Headers::new().map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: self.url.clone(),
message: format!("Failed to create headers: {e:?}"),
})
})?;
headers
.set("Range", &format!("bytes={}-{}", range.start, range.end - 1))
.map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: self.url.clone(),
message: format!("Failed to set Range header: {e:?}"),
})
})?;
opts.set_headers(&headers);
let request = Request::new_with_str_and_init(&self.url, &opts).map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: self.url.clone(),
message: format!("Failed to create request: {e:?}"),
})
})?;
let response = JsFuture::from(window.fetch_with_request(&request))
.await
.map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: self.url.clone(),
message: format!("Fetch failed: {e:?}"),
})
})?;
let response: Response = response.dyn_into().map_err(|_| {
WasmError::Fetch(FetchError::ParseError {
expected: "Response".to_string(),
message: "Not a Response object".to_string(),
})
})?;
if !response.ok() && response.status() != 206 {
return Err(WasmError::Fetch(FetchError::HttpError {
status: response.status(),
status_text: response.status_text(),
url: self.url.clone(),
}));
}
let array_buffer = JsFuture::from(response.array_buffer().map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: self.url.clone(),
message: format!("Failed to get array buffer: {e:?}"),
})
})?)
.await
.map_err(|e| {
WasmError::Fetch(FetchError::NetworkFailure {
url: self.url.clone(),
message: format!("Failed to read response: {e:?}"),
})
})?;
let uint8_array = js_sys::Uint8Array::new(&array_buffer);
let data = uint8_array.to_vec();
let expected_size = (range.end - range.start) as usize;
if data.len() != expected_size {
return Err(WasmError::Fetch(FetchError::InvalidSize {
expected: expected_size as u64,
actual: data.len() as u64,
}));
}
Ok(data)
}
pub async fn fetch_ranges_parallel(
&mut self,
ranges: &[ByteRange],
max_parallel: usize,
) -> WasmResult<Vec<Vec<u8>>> {
let mut results = Vec::with_capacity(ranges.len());
let mut pending = Vec::new();
for (i, &range) in ranges.iter().enumerate() {
pending.push((i, range));
if pending.len() >= max_parallel || i == ranges.len() - 1 {
let mut batch_results = Vec::new();
for (_idx, range) in &pending {
let data = self.fetch_range_with_retry(*range).await?;
batch_results.push(data);
}
results.extend(batch_results);
pending.clear();
}
}
Ok(results)
}
fn current_time_ms(&self) -> f64 {
js_sys::Date::now()
}
pub const fn stats(&self) -> &FetchStats {
&self.stats
}
pub fn url(&self) -> &str {
&self.url
}
pub const fn size(&self) -> u64 {
self.size
}
pub const fn supports_range(&self) -> bool {
self.supports_range
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum RequestPriority {
Low,
Normal,
High,
Critical,
}
#[derive(Debug, Clone)]
pub struct PrioritizedRequest {
pub range: ByteRange,
pub priority: RequestPriority,
pub id: u64,
}
impl PrioritizedRequest {
pub const fn new(range: ByteRange, priority: RequestPriority, id: u64) -> Self {
Self {
range,
priority,
id,
}
}
}
impl PartialEq for PrioritizedRequest {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for PrioritizedRequest {}
impl PartialOrd for PrioritizedRequest {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PrioritizedRequest {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.priority.cmp(&other.priority)
}
}
pub struct RequestQueue {
requests: Vec<PrioritizedRequest>,
next_id: u64,
completed: HashMap<u64, Vec<u8>>,
}
impl RequestQueue {
pub fn new() -> Self {
Self {
requests: Vec::new(),
next_id: 0,
completed: HashMap::new(),
}
}
pub fn add(&mut self, range: ByteRange, priority: RequestPriority) -> u64 {
let id = self.next_id;
self.next_id += 1;
let request = PrioritizedRequest::new(range, priority, id);
self.requests.push(request);
self.requests.sort();
id
}
pub fn next(&mut self) -> Option<PrioritizedRequest> {
self.requests.pop()
}
pub fn complete(&mut self, id: u64, data: Vec<u8>) {
self.completed.insert(id, data);
}
pub fn get_completed(&self, id: u64) -> Option<&Vec<u8>> {
self.completed.get(&id)
}
pub fn pending_count(&self) -> usize {
self.requests.len()
}
pub fn clear_completed(&mut self) {
self.completed.clear();
}
}
impl Default for RequestQueue {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_retry_config() {
let config = RetryConfig::new(3, 1000);
assert_eq!(config.delay_for_attempt(0), 1000);
assert_eq!(config.delay_for_attempt(1), 2000);
assert_eq!(config.delay_for_attempt(2), 4000);
}
#[test]
fn test_fetch_stats() {
let mut stats = FetchStats::new();
stats.total_requests = 10;
stats.successful_requests = 8;
stats.bytes_fetched = 1000;
stats.total_time_ms = 100.0;
assert_eq!(stats.success_rate(), 0.8);
assert_eq!(stats.average_request_time_ms(), 10.0);
assert_eq!(stats.average_throughput_bps(), 10000.0);
}
#[test]
fn test_request_priority() {
let low = PrioritizedRequest::new(
ByteRange::from_offset_length(0, 100),
RequestPriority::Low,
1,
);
let high = PrioritizedRequest::new(
ByteRange::from_offset_length(0, 100),
RequestPriority::High,
2,
);
assert!(high > low);
}
#[test]
fn test_request_queue() {
let mut queue = RequestQueue::new();
let _id1 = queue.add(ByteRange::from_offset_length(0, 100), RequestPriority::Low);
let id2 = queue.add(
ByteRange::from_offset_length(100, 100),
RequestPriority::High,
);
let next = queue.next().expect("Should have request");
assert_eq!(next.id, id2);
queue.complete(id2, vec![1, 2, 3]);
assert!(queue.get_completed(id2).is_some());
assert_eq!(queue.pending_count(), 1);
}
}