1use serde::{Deserialize, Serialize};
115use std::collections::HashMap;
116use wasm_bindgen::prelude::*;
117use wasm_bindgen_futures::JsFuture;
118use web_sys::{Headers, Request, RequestInit, RequestMode, Response};
119
120use oxigdal_core::error::{IoError, OxiGdalError, Result};
121use oxigdal_core::io::{ByteRange, DataSource};
122
123use crate::error::{FetchError, WasmError, WasmResult};
124
/// Number of retries allowed after the initial attempt when no policy is given.
pub const DEFAULT_MAX_RETRIES: u32 = 3;

/// Delay before the first retry, in milliseconds, when no policy is given.
pub const DEFAULT_RETRY_DELAY_MS: u64 = 1_000;

/// Request timeout, in milliseconds, used when the caller does not supply one.
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 30_000;

/// Suggested cap on requests in flight at once.
// Not referenced by the visible code, hence the lint exemption.
#[allow(dead_code)]
pub const DEFAULT_MAX_PARALLEL_REQUESTS: usize = 6;
137
/// HTTP data source backed by the browser `fetch` API.
///
/// The constructor probes the URL with a `HEAD` request and records the size
/// and range support reported by the server.
#[derive(Debug)]
pub struct FetchBackend {
    /// URL every request is issued against.
    url: String,
    /// Value of the `Content-Length` response header; `0` when absent or unparsable.
    size: u64,
    /// Whether the server answered the probe with `Accept-Ranges: bytes`.
    supports_range: bool,
}
145
146impl FetchBackend {
147 pub async fn new(url: String) -> Result<Self> {
151 let window = web_sys::window().ok_or_else(|| OxiGdalError::Internal {
153 message: "No window object available".to_string(),
154 })?;
155
156 let opts = RequestInit::new();
157 opts.set_method("HEAD");
158 opts.set_mode(RequestMode::Cors);
159
160 let request = Request::new_with_str_and_init(&url, &opts).map_err(|e| {
161 OxiGdalError::Io(IoError::Network {
162 message: format!("Failed to create request: {:?}", e),
163 })
164 })?;
165
166 let response = JsFuture::from(window.fetch_with_request(&request))
167 .await
168 .map_err(|e| {
169 OxiGdalError::Io(IoError::Network {
170 message: format!("Fetch failed: {:?}", e),
171 })
172 })?;
173
174 let response: Response = response.dyn_into().map_err(|_| OxiGdalError::Internal {
175 message: "Response is not a Response object".to_string(),
176 })?;
177
178 if !response.ok() {
179 return Err(OxiGdalError::Io(IoError::Http {
180 status: response.status(),
181 message: response.status_text(),
182 }));
183 }
184
185 let headers = response.headers();
187 let size = headers
188 .get("content-length")
189 .ok()
190 .flatten()
191 .and_then(|s| s.parse::<u64>().ok())
192 .unwrap_or(0);
193
194 let supports_range = headers
196 .get("accept-ranges")
197 .ok()
198 .flatten()
199 .map(|v| v.to_lowercase() == "bytes")
200 .unwrap_or(false);
201
202 Ok(Self {
203 url,
204 size,
205 supports_range,
206 })
207 }
208
209 #[must_use]
211 pub fn url(&self) -> &str {
212 &self.url
213 }
214
215 #[must_use]
217 pub const fn supports_range(&self) -> bool {
218 self.supports_range
219 }
220
221 async fn fetch_range_async(&self, range: ByteRange) -> Result<Vec<u8>> {
223 let window = web_sys::window().ok_or_else(|| OxiGdalError::Internal {
224 message: "No window object available".to_string(),
225 })?;
226
227 let opts = RequestInit::new();
228 opts.set_method("GET");
229 opts.set_mode(RequestMode::Cors);
230
231 let headers = Headers::new().map_err(|e| OxiGdalError::Internal {
232 message: format!("Failed to create headers: {:?}", e),
233 })?;
234
235 headers
236 .set("Range", &format!("bytes={}-{}", range.start, range.end - 1))
237 .map_err(|e| OxiGdalError::Internal {
238 message: format!("Failed to set Range header: {:?}", e),
239 })?;
240
241 opts.set_headers(&headers);
242
243 let request = Request::new_with_str_and_init(&self.url, &opts).map_err(|e| {
244 OxiGdalError::Io(IoError::Network {
245 message: format!("Failed to create request: {:?}", e),
246 })
247 })?;
248
249 let response = JsFuture::from(window.fetch_with_request(&request))
250 .await
251 .map_err(|e| {
252 OxiGdalError::Io(IoError::Network {
253 message: format!("Fetch failed: {:?}", e),
254 })
255 })?;
256
257 let response: Response = response.dyn_into().map_err(|_| OxiGdalError::Internal {
258 message: "Response is not a Response object".to_string(),
259 })?;
260
261 if !response.ok() && response.status() != 206 {
262 return Err(OxiGdalError::Io(IoError::Http {
263 status: response.status(),
264 message: response.status_text(),
265 }));
266 }
267
268 let array_buffer =
269 JsFuture::from(
270 response
271 .array_buffer()
272 .map_err(|e| OxiGdalError::Internal {
273 message: format!("Failed to get array buffer: {:?}", e),
274 })?,
275 )
276 .await
277 .map_err(|e| {
278 OxiGdalError::Io(IoError::Read {
279 message: format!("Failed to read response: {:?}", e),
280 })
281 })?;
282
283 let uint8_array = js_sys::Uint8Array::new(&array_buffer);
284 Ok(uint8_array.to_vec())
285 }
286}
287
288impl DataSource for FetchBackend {
289 fn size(&self) -> Result<u64> {
290 Ok(self.size)
291 }
292
293 fn read_range(&self, _range: ByteRange) -> Result<Vec<u8>> {
294 Err(OxiGdalError::NotSupported {
301 operation: "Synchronous read in WASM - use async methods".to_string(),
302 })
303 }
304
305 fn supports_range_requests(&self) -> bool {
306 self.supports_range
307 }
308}
309
310impl FetchBackend {
312 pub async fn read_range_async(&self, range: ByteRange) -> Result<Vec<u8>> {
314 self.fetch_range_async(range).await
315 }
316
317 pub async fn read_ranges_async(&self, ranges: &[ByteRange]) -> Result<Vec<Vec<u8>>> {
319 let mut results = Vec::with_capacity(ranges.len());
320 for range in ranges {
321 results.push(self.fetch_range_async(*range).await?);
322 }
323 Ok(results)
324 }
325}
326
327#[allow(dead_code)]
329pub struct AsyncFetchBackend {
330 inner: FetchBackend,
331 cache: std::collections::HashMap<(u64, u64), Vec<u8>>,
332}
333
334#[allow(dead_code)] impl AsyncFetchBackend {
336 pub async fn new(url: String) -> Result<Self> {
338 let inner = FetchBackend::new(url).await?;
339 Ok(Self {
340 inner,
341 cache: std::collections::HashMap::new(),
342 })
343 }
344
345 pub async fn prefetch(&mut self, range: ByteRange) -> Result<()> {
347 let data = self.inner.fetch_range_async(range).await?;
348 self.cache.insert((range.start, range.end), data);
349 Ok(())
350 }
351
352 pub async fn get_range(&mut self, range: ByteRange) -> Result<Vec<u8>> {
354 let key = (range.start, range.end);
355 if let Some(data) = self.cache.get(&key) {
356 return Ok(data.clone());
357 }
358
359 let data = self.inner.fetch_range_async(range).await?;
360 self.cache.insert(key, data.clone());
361 Ok(data)
362 }
363}
364
365#[allow(dead_code)]
367pub struct PrefetchedFetchBackend {
368 url: String,
369 size: u64,
370 data: Vec<u8>,
371}
372
373#[allow(dead_code)] impl PrefetchedFetchBackend {
375 pub async fn new(url: String) -> Result<Self> {
377 let backend = FetchBackend::new(url.clone()).await?;
378 let size = backend.size;
379
380 let data = backend
382 .fetch_range_async(ByteRange::from_offset_length(0, size))
383 .await?;
384
385 Ok(Self { url, size, data })
386 }
387
388 pub async fn with_header(url: String, header_size: u64) -> Result<Self> {
390 let backend = FetchBackend::new(url.clone()).await?;
391 let size = backend.size;
392
393 let data = backend
394 .fetch_range_async(ByteRange::from_offset_length(0, header_size))
395 .await?;
396
397 Ok(Self { url, size, data })
398 }
399}
400
401impl DataSource for PrefetchedFetchBackend {
402 fn size(&self) -> Result<u64> {
403 Ok(self.size)
404 }
405
406 fn read_range(&self, range: ByteRange) -> Result<Vec<u8>> {
407 if range.end as usize > self.data.len() {
408 return Err(OxiGdalError::Io(IoError::UnexpectedEof {
409 offset: range.start,
410 }));
411 }
412 Ok(self.data[range.start as usize..range.end as usize].to_vec())
413 }
414}
415
416#[derive(Debug, Clone, Copy)]
418pub struct RetryConfig {
419 pub max_retries: u32,
421 pub initial_delay_ms: u64,
423 pub backoff_multiplier: f64,
425 pub max_delay_ms: u64,
427}
428
429impl RetryConfig {
430 pub const fn new(max_retries: u32, initial_delay_ms: u64) -> Self {
432 Self {
433 max_retries,
434 initial_delay_ms,
435 backoff_multiplier: 2.0,
436 max_delay_ms: 60000,
437 }
438 }
439
440 pub const fn default_config() -> Self {
442 Self::new(DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY_MS)
443 }
444
445 pub fn delay_for_attempt(&self, attempt: u32) -> u64 {
447 let delay =
448 (self.initial_delay_ms as f64 * self.backoff_multiplier.powi(attempt as i32)) as u64;
449 delay.min(self.max_delay_ms)
450 }
451}
452
453impl Default for RetryConfig {
454 fn default() -> Self {
455 Self::default_config()
456 }
457}
458
/// Counters describing the requests a backend has issued.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct FetchStats {
    /// Number of request attempts, retries included.
    pub total_requests: u64,
    /// Number of attempts that returned data.
    pub successful_requests: u64,
    /// Number of fetches abandoned after the last permitted attempt.
    pub failed_requests: u64,
    /// Number of fetches that succeeded only after at least one retry.
    pub retried_requests: u64,
    /// Total payload bytes returned by successful attempts.
    pub bytes_fetched: u64,
    /// Accumulated wall-clock time of successful fetches, in milliseconds.
    pub total_time_ms: f64,
}
475
476impl FetchStats {
477 pub const fn new() -> Self {
479 Self {
480 total_requests: 0,
481 successful_requests: 0,
482 failed_requests: 0,
483 retried_requests: 0,
484 bytes_fetched: 0,
485 total_time_ms: 0.0,
486 }
487 }
488
489 pub fn success_rate(&self) -> f64 {
491 if self.total_requests == 0 {
492 0.0
493 } else {
494 self.successful_requests as f64 / self.total_requests as f64
495 }
496 }
497
498 pub fn average_request_time_ms(&self) -> f64 {
500 if self.total_requests == 0 {
501 0.0
502 } else {
503 self.total_time_ms / self.total_requests as f64
504 }
505 }
506
507 pub fn average_throughput_bps(&self) -> f64 {
509 if self.total_time_ms == 0.0 {
510 0.0
511 } else {
512 (self.bytes_fetched as f64 / self.total_time_ms) * 1000.0
513 }
514 }
515}
516
517impl Default for FetchStats {
518 fn default() -> Self {
519 Self::new()
520 }
521}
522
/// Fetch-based reader that adds retries with backoff and request statistics.
pub struct EnhancedFetchBackend {
    /// URL every request is issued against.
    url: String,
    /// Size from the probe's `Content-Length` header; `0` when absent or unparsable.
    size: u64,
    /// Whether the probe response carried `Accept-Ranges: bytes`.
    supports_range: bool,
    /// Retry policy applied to the probe and to range fetches.
    retry_config: RetryConfig,
    /// Counters updated by the retrying fetch method.
    stats: FetchStats,
    /// Timeout handed to the constructor; stored but not read by the visible code.
    #[allow(dead_code)]
    timeout_ms: u64,
}
539
540impl EnhancedFetchBackend {
541 pub async fn new(url: String) -> WasmResult<Self> {
543 Self::with_config(url, RetryConfig::default(), DEFAULT_REQUEST_TIMEOUT_MS).await
544 }
545
546 pub async fn with_config(
548 url: String,
549 retry_config: RetryConfig,
550 timeout_ms: u64,
551 ) -> WasmResult<Self> {
552 let (size, supports_range) = Self::probe_url(&url, &retry_config).await?;
553
554 Ok(Self {
555 url,
556 size,
557 supports_range,
558 retry_config,
559 stats: FetchStats::new(),
560 timeout_ms,
561 })
562 }
563
564 async fn probe_url(url: &str, retry_config: &RetryConfig) -> WasmResult<(u64, bool)> {
566 for attempt in 0..=retry_config.max_retries {
567 match Self::head_request(url).await {
568 Ok(result) => return Ok(result),
569 Err(e) => {
570 if attempt < retry_config.max_retries {
571 let delay = retry_config.delay_for_attempt(attempt);
572 Self::sleep_ms(delay).await;
573 } else {
574 return Err(e);
575 }
576 }
577 }
578 }
579
580 Err(WasmError::Fetch(FetchError::RetryLimitExceeded {
581 url: url.to_string(),
582 attempts: retry_config.max_retries + 1,
583 }))
584 }
585
586 async fn head_request(url: &str) -> WasmResult<(u64, bool)> {
588 let window = web_sys::window().ok_or_else(|| {
589 WasmError::Fetch(FetchError::NetworkFailure {
590 url: url.to_string(),
591 message: "No window object available".to_string(),
592 })
593 })?;
594
595 let opts = RequestInit::new();
596 opts.set_method("HEAD");
597 opts.set_mode(RequestMode::Cors);
598
599 let request = Request::new_with_str_and_init(url, &opts).map_err(|e| {
600 WasmError::Fetch(FetchError::NetworkFailure {
601 url: url.to_string(),
602 message: format!("Failed to create request: {e:?}"),
603 })
604 })?;
605
606 let response = JsFuture::from(window.fetch_with_request(&request))
607 .await
608 .map_err(|e| {
609 WasmError::Fetch(FetchError::NetworkFailure {
610 url: url.to_string(),
611 message: format!("Fetch failed: {e:?}"),
612 })
613 })?;
614
615 let response: Response = response.dyn_into().map_err(|_| {
616 WasmError::Fetch(FetchError::ParseError {
617 expected: "Response".to_string(),
618 message: "Not a Response object".to_string(),
619 })
620 })?;
621
622 if !response.ok() {
623 return Err(WasmError::Fetch(FetchError::HttpError {
624 status: response.status(),
625 status_text: response.status_text(),
626 url: url.to_string(),
627 }));
628 }
629
630 let headers = response.headers();
631 let size = headers
632 .get("content-length")
633 .ok()
634 .flatten()
635 .and_then(|s| s.parse::<u64>().ok())
636 .unwrap_or(0);
637
638 let supports_range = headers
639 .get("accept-ranges")
640 .ok()
641 .flatten()
642 .map(|v| v.to_lowercase() == "bytes")
643 .unwrap_or(false);
644
645 Ok((size, supports_range))
646 }
647
648 async fn sleep_ms(ms: u64) {
650 let promise = js_sys::Promise::new(&mut |resolve, _reject| {
651 let window = web_sys::window().expect("Window exists");
652 let _ =
653 window.set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, ms as i32);
654 });
655
656 let _ = JsFuture::from(promise).await;
657 }
658
659 pub async fn fetch_range_with_retry(&mut self, range: ByteRange) -> WasmResult<Vec<u8>> {
661 let start_time = self.current_time_ms();
662
663 for attempt in 0..=self.retry_config.max_retries {
664 self.stats.total_requests += 1;
665
666 match self.fetch_range_once(range).await {
667 Ok(data) => {
668 let elapsed = self.current_time_ms() - start_time;
669 self.stats.successful_requests += 1;
670 self.stats.bytes_fetched += data.len() as u64;
671 self.stats.total_time_ms += elapsed;
672
673 if attempt > 0 {
674 self.stats.retried_requests += 1;
675 }
676
677 return Ok(data);
678 }
679 Err(e) => {
680 if attempt < self.retry_config.max_retries {
681 let delay = self.retry_config.delay_for_attempt(attempt);
682 Self::sleep_ms(delay).await;
683 } else {
684 self.stats.failed_requests += 1;
685 return Err(e);
686 }
687 }
688 }
689 }
690
691 Err(WasmError::Fetch(FetchError::RetryLimitExceeded {
692 url: self.url.clone(),
693 attempts: self.retry_config.max_retries + 1,
694 }))
695 }
696
697 async fn fetch_range_once(&self, range: ByteRange) -> WasmResult<Vec<u8>> {
699 let window = web_sys::window().ok_or_else(|| {
700 WasmError::Fetch(FetchError::NetworkFailure {
701 url: self.url.clone(),
702 message: "No window object available".to_string(),
703 })
704 })?;
705
706 let opts = RequestInit::new();
707 opts.set_method("GET");
708 opts.set_mode(RequestMode::Cors);
709
710 let headers = Headers::new().map_err(|e| {
711 WasmError::Fetch(FetchError::NetworkFailure {
712 url: self.url.clone(),
713 message: format!("Failed to create headers: {e:?}"),
714 })
715 })?;
716
717 headers
718 .set("Range", &format!("bytes={}-{}", range.start, range.end - 1))
719 .map_err(|e| {
720 WasmError::Fetch(FetchError::NetworkFailure {
721 url: self.url.clone(),
722 message: format!("Failed to set Range header: {e:?}"),
723 })
724 })?;
725
726 opts.set_headers(&headers);
727
728 let request = Request::new_with_str_and_init(&self.url, &opts).map_err(|e| {
729 WasmError::Fetch(FetchError::NetworkFailure {
730 url: self.url.clone(),
731 message: format!("Failed to create request: {e:?}"),
732 })
733 })?;
734
735 let response = JsFuture::from(window.fetch_with_request(&request))
736 .await
737 .map_err(|e| {
738 WasmError::Fetch(FetchError::NetworkFailure {
739 url: self.url.clone(),
740 message: format!("Fetch failed: {e:?}"),
741 })
742 })?;
743
744 let response: Response = response.dyn_into().map_err(|_| {
745 WasmError::Fetch(FetchError::ParseError {
746 expected: "Response".to_string(),
747 message: "Not a Response object".to_string(),
748 })
749 })?;
750
751 if !response.ok() && response.status() != 206 {
752 return Err(WasmError::Fetch(FetchError::HttpError {
753 status: response.status(),
754 status_text: response.status_text(),
755 url: self.url.clone(),
756 }));
757 }
758
759 let array_buffer = JsFuture::from(response.array_buffer().map_err(|e| {
760 WasmError::Fetch(FetchError::NetworkFailure {
761 url: self.url.clone(),
762 message: format!("Failed to get array buffer: {e:?}"),
763 })
764 })?)
765 .await
766 .map_err(|e| {
767 WasmError::Fetch(FetchError::NetworkFailure {
768 url: self.url.clone(),
769 message: format!("Failed to read response: {e:?}"),
770 })
771 })?;
772
773 let uint8_array = js_sys::Uint8Array::new(&array_buffer);
774 let data = uint8_array.to_vec();
775
776 let expected_size = (range.end - range.start) as usize;
778 if data.len() != expected_size {
779 return Err(WasmError::Fetch(FetchError::InvalidSize {
780 expected: expected_size as u64,
781 actual: data.len() as u64,
782 }));
783 }
784
785 Ok(data)
786 }
787
788 pub async fn fetch_ranges_parallel(
790 &mut self,
791 ranges: &[ByteRange],
792 max_parallel: usize,
793 ) -> WasmResult<Vec<Vec<u8>>> {
794 let mut results = Vec::with_capacity(ranges.len());
795 let mut pending = Vec::new();
796
797 for (i, &range) in ranges.iter().enumerate() {
798 pending.push((i, range));
799
800 if pending.len() >= max_parallel || i == ranges.len() - 1 {
801 let mut batch_results = Vec::new();
803 for (_idx, range) in &pending {
804 let data = self.fetch_range_with_retry(*range).await?;
805 batch_results.push(data);
806 }
807
808 results.extend(batch_results);
809 pending.clear();
810 }
811 }
812
813 Ok(results)
814 }
815
816 fn current_time_ms(&self) -> f64 {
818 js_sys::Date::now()
819 }
820
821 pub const fn stats(&self) -> &FetchStats {
823 &self.stats
824 }
825
826 pub fn url(&self) -> &str {
828 &self.url
829 }
830
831 pub const fn size(&self) -> u64 {
833 self.size
834 }
835
836 pub const fn supports_range(&self) -> bool {
838 self.supports_range
839 }
840}
841
842#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
844pub enum RequestPriority {
845 Low,
847 Normal,
849 High,
851 Critical,
853}
854
855#[derive(Debug, Clone)]
857pub struct PrioritizedRequest {
858 pub range: ByteRange,
860 pub priority: RequestPriority,
862 pub id: u64,
864}
865
866impl PrioritizedRequest {
867 pub const fn new(range: ByteRange, priority: RequestPriority, id: u64) -> Self {
869 Self {
870 range,
871 priority,
872 id,
873 }
874 }
875}
876
877impl PartialEq for PrioritizedRequest {
878 fn eq(&self, other: &Self) -> bool {
879 self.id == other.id
880 }
881}
882
883impl Eq for PrioritizedRequest {}
884
885impl PartialOrd for PrioritizedRequest {
886 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
887 Some(self.cmp(other))
888 }
889}
890
891impl Ord for PrioritizedRequest {
892 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
893 self.priority.cmp(&other.priority)
895 }
896}
897
898pub struct RequestQueue {
900 requests: Vec<PrioritizedRequest>,
902 next_id: u64,
904 completed: HashMap<u64, Vec<u8>>,
906}
907
908impl RequestQueue {
909 pub fn new() -> Self {
911 Self {
912 requests: Vec::new(),
913 next_id: 0,
914 completed: HashMap::new(),
915 }
916 }
917
918 pub fn add(&mut self, range: ByteRange, priority: RequestPriority) -> u64 {
920 let id = self.next_id;
921 self.next_id += 1;
922
923 let request = PrioritizedRequest::new(range, priority, id);
924 self.requests.push(request);
925 self.requests.sort();
926
927 id
928 }
929
930 pub fn next(&mut self) -> Option<PrioritizedRequest> {
932 self.requests.pop()
933 }
934
935 pub fn complete(&mut self, id: u64, data: Vec<u8>) {
937 self.completed.insert(id, data);
938 }
939
940 pub fn get_completed(&self, id: u64) -> Option<&Vec<u8>> {
942 self.completed.get(&id)
943 }
944
945 pub fn pending_count(&self) -> usize {
947 self.requests.len()
948 }
949
950 pub fn clear_completed(&mut self) {
952 self.completed.clear();
953 }
954}
955
956impl Default for RequestQueue {
957 fn default() -> Self {
958 Self::new()
959 }
960}
961
// Unit tests for the parts of this module that need no browser environment.
#[cfg(test)]
mod tests {
    use super::*;

    /// With the multiplier of 2 set by `RetryConfig::new`, the delay doubles
    /// on every attempt.
    #[test]
    fn test_retry_config() {
        let config = RetryConfig::new(3, 1000);
        assert_eq!(config.delay_for_attempt(0), 1000);
        assert_eq!(config.delay_for_attempt(1), 2000);
        assert_eq!(config.delay_for_attempt(2), 4000);
    }

    /// The derived ratios follow directly from the raw counters.
    #[test]
    fn test_fetch_stats() {
        let mut stats = FetchStats::new();
        stats.total_requests = 10;
        stats.successful_requests = 8;
        stats.bytes_fetched = 1000;
        stats.total_time_ms = 100.0;

        assert_eq!(stats.success_rate(), 0.8);
        assert_eq!(stats.average_request_time_ms(), 10.0);
        // 1000 bytes in 100 ms is 10000 bytes per second.
        assert_eq!(stats.average_throughput_bps(), 10000.0);
    }

    /// A request with a higher priority compares greater.
    #[test]
    fn test_request_priority() {
        let low = PrioritizedRequest::new(
            ByteRange::from_offset_length(0, 100),
            RequestPriority::Low,
            1,
        );
        let high = PrioritizedRequest::new(
            ByteRange::from_offset_length(0, 100),
            RequestPriority::High,
            2,
        );

        assert!(high > low);
    }

    /// The queue hands out the high-priority request first and keeps
    /// completed results retrievable by id.
    #[test]
    fn test_request_queue() {
        let mut queue = RequestQueue::new();

        let _id1 = queue.add(ByteRange::from_offset_length(0, 100), RequestPriority::Low);
        let id2 = queue.add(
            ByteRange::from_offset_length(100, 100),
            RequestPriority::High,
        );

        // The later, higher-priority request is served before the earlier one.
        let next = queue.next().expect("Should have request");
        assert_eq!(next.id, id2);

        queue.complete(id2, vec![1, 2, 3]);
        assert!(queue.get_completed(id2).is_some());

        // Only the low-priority request is still pending.
        assert_eq!(queue.pending_count(), 1);
    }

}