use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use crate::error::{WasmError, WasmResult};
use crate::fetch::RequestPriority;
use crate::tile::TileCoord;
/// Quality tier used when streaming tiles.
///
/// Each tier maps to a numeric factor via `multiplier()` and to a resolution
/// value via `resolution()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum StreamingQuality {
/// Lowest tier (multiplier 0.5, resolution 128).
Low,
/// Middle tier (multiplier 1.0, resolution 256).
Medium,
/// Highest tier (multiplier 2.0, resolution 512).
High,
/// Tier selected at runtime; reports the same multiplier and resolution as
/// `Medium` until it is replaced by a concrete tier.
Adaptive,
}
impl StreamingQuality {
    /// Numeric scale factor associated with this tier.
    ///
    /// `Adaptive` has no factor of its own and reports the `Medium` value.
    pub const fn multiplier(&self) -> f64 {
        match *self {
            Self::Low => 0.5,
            Self::High => 2.0,
            Self::Medium | Self::Adaptive => 1.0,
        }
    }

    /// Resolution value associated with this tier (presumably the tile edge
    /// length in pixels — confirm against the tile pipeline).
    ///
    /// `Adaptive` reports the `Medium` value.
    pub const fn resolution(&self) -> u32 {
        match *self {
            Self::Low => 128,
            Self::High => 512,
            Self::Medium | Self::Adaptive => 256,
        }
    }
}
/// Rolling-window estimator of transfer throughput.
///
/// Keeps the byte counts and durations of the most recent transfers (at most
/// `max_samples` of them) and derives an average rate in **bytes per second**
/// from the window totals.
#[derive(Debug, Clone)]
pub struct BandwidthEstimator {
    /// Byte counts of the retained transfers, oldest first.
    transfer_sizes: VecDeque<usize>,
    /// Durations in milliseconds, index-aligned with `transfer_sizes`.
    transfer_times: VecDeque<f64>,
    /// Maximum number of samples kept in the window.
    max_samples: usize,
    /// Current estimate in bytes per second.
    estimated_bandwidth: f64,
}

impl BandwidthEstimator {
    /// Creates an estimator with a 20-sample window and an initial estimate
    /// of 1,000,000 bytes per second.
    pub fn new() -> Self {
        Self {
            transfer_sizes: VecDeque::new(),
            transfer_times: VecDeque::new(),
            max_samples: 20,
            estimated_bandwidth: 1_000_000.0,
        }
    }

    /// Records a transfer of `bytes` bytes that took `time_ms` milliseconds
    /// and refreshes the estimate.
    ///
    /// Samples whose duration is NaN, infinite or negative are ignored: a
    /// single such value would otherwise make the window's total time
    /// meaningless and freeze or distort the estimate until it aged out.
    pub fn record_transfer(&mut self, bytes: usize, time_ms: f64) {
        if !time_ms.is_finite() || time_ms < 0.0 {
            return;
        }
        self.transfer_sizes.push_back(bytes);
        self.transfer_times.push_back(time_ms);
        if self.transfer_sizes.len() > self.max_samples {
            // Drop the oldest sample from both queues to keep them aligned.
            self.transfer_sizes.pop_front();
            self.transfer_times.pop_front();
        }
        self.update_estimate();
    }

    /// Alias for [`record_transfer`](Self::record_transfer).
    pub fn record_download(&mut self, bytes: usize, time_ms: f64) {
        self.record_transfer(bytes, time_ms);
    }

    /// Current estimate in bytes per second.
    pub const fn estimate(&self) -> f64 {
        self.estimated_bandwidth
    }

    /// Recomputes the estimate as `total bytes / total time` over the window.
    ///
    /// The previous estimate is kept when the window is empty or its total
    /// duration is not positive.
    fn update_estimate(&mut self) {
        if self.transfer_sizes.is_empty() || self.transfer_times.is_empty() {
            return;
        }
        // Accumulate in f64: a `usize` sum can overflow, most easily where
        // `usize` is 32 bits (e.g. wasm32) and the window holds large samples.
        let total_bytes: f64 = self.transfer_sizes.iter().map(|&size| size as f64).sum();
        let total_time: f64 = self.transfer_times.iter().sum();
        if total_time > 0.0 {
            // bytes per millisecond -> bytes per second
            self.estimated_bandwidth = (total_bytes / total_time) * 1000.0;
        }
    }

    /// Same value as [`estimate`](Self::estimate).
    ///
    /// Despite the `bps` suffix the unit is bytes (not bits) per second.
    pub const fn bandwidth_bps(&self) -> f64 {
        self.estimated_bandwidth
    }

    /// Current estimate converted to megabits per second.
    pub fn bandwidth_mbps(&self) -> f64 {
        (self.estimated_bandwidth * 8.0) / 1_000_000.0
    }

    /// Estimated time in milliseconds to download `bytes` bytes at the
    /// current rate, or `f64::MAX` when the estimate is not positive.
    pub fn estimate_download_time(&self, bytes: usize) -> f64 {
        if self.estimated_bandwidth > 0.0 {
            (bytes as f64 / self.estimated_bandwidth) * 1000.0
        } else {
            f64::MAX
        }
    }

    /// Returns `true` when the current estimate meets the threshold for
    /// `quality`. `Adaptive` has a threshold of zero.
    ///
    /// NOTE(review): the thresholds are compared directly against the
    /// bytes-per-second estimate; confirm they were not meant as bits per
    /// second.
    pub fn is_sufficient_for_quality(&self, quality: StreamingQuality) -> bool {
        let required_bps = match quality {
            StreamingQuality::Low => 500_000.0,
            StreamingQuality::Medium => 1_000_000.0,
            StreamingQuality::High => 5_000_000.0,
            StreamingQuality::Adaptive => 0.0,
        };
        self.estimated_bandwidth >= required_bps
    }

    /// Suggests a tier from the current rate: below 2 Mbit/s -> `Low`, below
    /// 10 Mbit/s -> `Medium`, otherwise `High`. Never returns `Adaptive`.
    pub fn suggest_quality(&self) -> StreamingQuality {
        let mbps = self.bandwidth_mbps();
        if mbps < 2.0 {
            StreamingQuality::Low
        } else if mbps < 10.0 {
            StreamingQuality::Medium
        } else {
            StreamingQuality::High
        }
    }
}

impl Default for BandwidthEstimator {
    fn default() -> Self {
        Self::new()
    }
}
/// Chooses a streaming quality from bandwidth samples, with hysteresis so a
/// single outlier does not flip the tier.
#[derive(Debug, Clone)]
pub struct QualityAdapter {
    /// Rolling estimator fed by `update_bandwidth`.
    estimator: BandwidthEstimator,
    /// Tier currently in effect.
    current_quality: StreamingQuality,
    /// Number of consecutive updates whose suggestion differed from
    /// `current_quality`.
    hysteresis_count: usize,
    /// Count at which the tier is switched.
    hysteresis_threshold: usize,
}

impl QualityAdapter {
    /// Creates an adapter starting at `Medium` that switches tier after three
    /// consecutive differing suggestions.
    pub fn new() -> Self {
        Self {
            estimator: BandwidthEstimator::new(),
            current_quality: StreamingQuality::Medium,
            hysteresis_count: 0,
            hysteresis_threshold: 3,
        }
    }

    /// Feeds one bandwidth sample, given in bits per second, and re-evaluates
    /// the tier.
    ///
    /// The sample is recorded as the equivalent number of bytes transferred
    /// in one second. Values are limited to `0..=8_000_000_000` bit/s, which
    /// keeps the byte count (at most 1e9) representable in a 32-bit `usize`.
    /// NaN samples are ignored: `f64::min` would otherwise turn NaN into the
    /// upper cap and push the adapter towards `High`.
    ///
    /// `_timestamp` is currently unused.
    pub fn update_bandwidth(&mut self, bandwidth_bps: f64, _timestamp: f64) {
        const MAX_BITS_PER_SECOND: f64 = 8_000_000_000.0;

        if bandwidth_bps.is_nan() {
            return;
        }
        let capped = bandwidth_bps.clamp(0.0, MAX_BITS_PER_SECOND);
        let bytes = (capped / 8.0) as usize;
        self.estimator.record_transfer(bytes, 1000.0);
        self.update_quality();
    }

    /// Tier currently in effect.
    pub const fn current_quality(&self) -> StreamingQuality {
        self.current_quality
    }

    /// Applies the hysteresis rule to the estimator's current suggestion.
    fn update_quality(&mut self) {
        let suggested = self.estimator.suggest_quality();
        if suggested == self.current_quality {
            self.hysteresis_count = 0;
            return;
        }
        self.hysteresis_count += 1;
        if self.hysteresis_count >= self.hysteresis_threshold {
            self.current_quality = suggested;
            self.hysteresis_count = 0;
        }
    }
}

impl Default for QualityAdapter {
    fn default() -> Self {
        Self::new()
    }
}
/// Byte-bounded tile cache with least-recently-used eviction.
///
/// `access_order` lists the cached coordinates from least to most recently
/// used; `add` and `get` move a coordinate to the back, eviction pops from
/// the front.
#[derive(Debug, Clone)]
pub struct StreamBuffer {
    /// Cached tile payloads.
    tiles: HashMap<TileCoord, Vec<u8>>,
    /// Capacity limit in bytes.
    max_size: usize,
    /// Sum of the payload lengths currently held.
    current_size: usize,
    /// Cached coordinates, least recently used first.
    access_order: VecDeque<TileCoord>,
}

impl StreamBuffer {
    /// Creates an empty buffer that holds at most `max_size` payload bytes.
    pub fn new(max_size: usize) -> Self {
        Self {
            tiles: HashMap::new(),
            max_size,
            current_size: 0,
            access_order: VecDeque::new(),
        }
    }

    /// Stores `data` under `coord`, replacing any previous payload and
    /// evicting least-recently-used tiles until the new payload fits.
    ///
    /// # Errors
    ///
    /// Returns `WasmError::OutOfMemory` when `data` alone exceeds the buffer
    /// capacity. The buffer is left untouched in that case.
    pub fn add(&mut self, coord: TileCoord, data: Vec<u8>) -> WasmResult<()> {
        let data_size = data.len();

        // Reject payloads that can never fit *before* evicting anything, so a
        // hopeless insert does not flush the whole cache.
        if data_size > self.max_size {
            return Err(WasmError::OutOfMemory {
                requested: data_size,
                available: Some(self.max_size),
            });
        }

        // Drop the stale copy first so its bytes are not counted against the
        // new payload and unrelated tiles are not evicted needlessly.
        self.remove_entry(&coord);

        // Written as a subtraction so the comparison cannot overflow.
        while data_size > self.max_size.saturating_sub(self.current_size)
            && !self.access_order.is_empty()
        {
            self.evict_oldest()?;
        }

        self.tiles.insert(coord, data);
        self.current_size += data_size;
        self.access_order.push_back(coord);
        Ok(())
    }

    /// Returns the payload stored under `coord`, marking it as most recently
    /// used, or `None` when the tile is not cached.
    pub fn get(&mut self, coord: &TileCoord) -> Option<&[u8]> {
        let data = self.tiles.get(coord)?;
        if let Some(pos) = self.access_order.iter().position(|c| c == coord) {
            self.access_order.remove(pos);
            self.access_order.push_back(*coord);
        }
        Some(data.as_slice())
    }

    /// Removes `coord` from the map, the size accounting and the access
    /// order. Does nothing when the tile is not cached.
    fn remove_entry(&mut self, coord: &TileCoord) {
        if let Some(old_data) = self.tiles.remove(coord) {
            self.current_size -= old_data.len();
            if let Some(pos) = self.access_order.iter().position(|c| c == coord) {
                self.access_order.remove(pos);
            }
        }
    }

    /// Evicts the least recently used tile, if any.
    fn evict_oldest(&mut self) -> WasmResult<()> {
        if let Some(coord) = self.access_order.pop_front() {
            if let Some(data) = self.tiles.remove(&coord) {
                self.current_size -= data.len();
            }
        }
        Ok(())
    }

    /// Returns `true` when a payload is cached for `coord`. Does not affect
    /// the access order.
    pub fn contains(&self, coord: &TileCoord) -> bool {
        self.tiles.contains_key(coord)
    }

    /// Removes every tile and resets the size accounting.
    pub fn clear(&mut self) {
        self.tiles.clear();
        self.access_order.clear();
        self.current_size = 0;
    }

    /// Returns a snapshot of the buffer's occupancy.
    ///
    /// `utilization` is `current_size / max_size`, or `0.0` for a
    /// zero-capacity buffer (avoids a NaN from `0 / 0`).
    pub fn stats(&self) -> StreamBufferStats {
        let utilization = if self.max_size == 0 {
            0.0
        } else {
            self.current_size as f64 / self.max_size as f64
        };
        StreamBufferStats {
            tile_count: self.tiles.len(),
            current_size: self.current_size,
            max_size: self.max_size,
            utilization,
        }
    }
}
/// Occupancy snapshot of a `StreamBuffer`, as returned by `StreamBuffer::stats`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct StreamBufferStats {
/// Number of tiles held by the buffer.
pub tile_count: usize,
/// Total payload bytes held by the buffer.
pub current_size: usize,
/// Capacity limit of the buffer in bytes.
pub max_size: usize,
/// `current_size` divided by `max_size`.
pub utilization: f64,
}
/// Tile loading strategy selector.
///
/// `TileStreamer` stores a value of this type (see `set_strategy`).
/// NOTE(review): nothing in the visible part of this file reads the stored
/// strategy, so the descriptions below are inferred from the variant names
/// only — confirm against the code that consumes them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoadStrategy {
/// Presumably: load the tiles closest to the viewport first.
Nearest,
/// Presumably: load tiles in a spiral outward from the viewport centre.
Spiral,
/// Presumably: load tiles in order of computed importance.
Importance,
/// Presumably: pick a strategy at runtime.
Adaptive,
}
/// Scores tiles by how close their centre lies to the viewport centre.
pub struct ImportanceCalculator {
    /// Viewport centre as `(x, y)`, in the same units as tile centres
    /// (tile index times 256).
    viewport_center: (f64, f64),
    /// Viewport extent as `(width, height)` in the same units.
    viewport_size: (f64, f64),
}

impl ImportanceCalculator {
    /// Creates a calculator for the given viewport centre and size.
    pub const fn new(viewport_center: (f64, f64), viewport_size: (f64, f64)) -> Self {
        Self {
            viewport_center,
            viewport_size,
        }
    }

    /// Returns an importance score in `0.0..=1.0` for `coord`.
    ///
    /// The tile centre is taken as `(index + 0.5) * 256`. Its distance to the
    /// viewport centre is divided by the viewport diagonal, clamped to `1.0`,
    /// and subtracted from `1.0`. A viewport with a non-positive diagonal
    /// scores every tile `1.0`.
    pub fn calculate(&self, coord: &TileCoord) -> f64 {
        let (center_x, center_y) = self.viewport_center;
        let (width, height) = self.viewport_size;

        let dx = (f64::from(coord.x) + 0.5) * 256.0 - center_x;
        let dy = (f64::from(coord.y) + 0.5) * 256.0 - center_y;
        let distance = (dx * dx + dy * dy).sqrt();
        let max_distance = (width * width + height * height).sqrt();

        if max_distance > 0.0 {
            1.0 - (distance / max_distance).min(1.0)
        } else {
            // Degenerate viewport: normalised distance is treated as zero.
            1.0
        }
    }

    /// Sorts `tiles` in place from most to least important.
    ///
    /// The sort is stable; scores that cannot be ordered compare as equal.
    pub fn sort_by_importance(&self, tiles: &mut Vec<TileCoord>) {
        tiles.sort_by(|lhs, rhs| {
            let lhs_score = self.calculate(lhs);
            let rhs_score = self.calculate(rhs);
            // Reversed operands give descending order.
            match rhs_score.partial_cmp(&lhs_score) {
                Some(ordering) => ordering,
                None => std::cmp::Ordering::Equal,
            }
        });
    }
}
/// Coordinates tile requests, completed downloads, buffering and quality
/// selection for one tile set.
pub struct TileStreamer {
    /// Cache of downloaded tile payloads.
    buffer: StreamBuffer,
    /// Throughput estimator fed by completed downloads.
    bandwidth: BandwidthEstimator,
    /// Quality reported by `current_quality`. In adaptive mode this is
    /// `Adaptive` until the first tile completes and the latest bandwidth
    /// suggestion afterwards.
    quality: StreamingQuality,
    /// `true` while the caller-selected quality is `Adaptive`. Tracked
    /// separately because `quality` is overwritten with concrete suggestions.
    adaptive: bool,
    /// Loading strategy selected by the caller.
    strategy: LoadStrategy,
    /// Requested tiles that have not completed yet, with request timestamps.
    pending: HashMap<TileCoord, f64>,
    /// Completed tiles with their completion timestamps.
    completed: HashMap<TileCoord, f64>,
}

impl TileStreamer {
    /// Creates a streamer whose buffer holds `buffer_size_mb` MiB, starting
    /// in adaptive quality mode with the adaptive load strategy.
    ///
    /// The byte capacity saturates at `usize::MAX` instead of overflowing.
    pub fn new(buffer_size_mb: usize) -> Self {
        Self {
            buffer: StreamBuffer::new(buffer_size_mb.saturating_mul(1024 * 1024)),
            bandwidth: BandwidthEstimator::new(),
            quality: StreamingQuality::Adaptive,
            adaptive: true,
            strategy: LoadStrategy::Adaptive,
            pending: HashMap::new(),
            completed: HashMap::new(),
        }
    }

    /// Selects the streaming quality. Passing `Adaptive` enables automatic
    /// selection; any other value pins the quality.
    pub fn set_quality(&mut self, quality: StreamingQuality) {
        self.adaptive = matches!(quality, StreamingQuality::Adaptive);
        self.quality = quality;
    }

    /// Selects the load strategy.
    pub fn set_strategy(&mut self, strategy: LoadStrategy) {
        self.strategy = strategy;
    }

    /// Marks `coord` as pending unless it is already buffered or pending.
    pub fn request_tile(&mut self, coord: TileCoord, timestamp: f64) {
        if !self.buffer.contains(&coord) && !self.pending.contains_key(&coord) {
            self.pending.insert(coord, timestamp);
        }
    }

    /// Registers a finished download: clears the pending entry, records the
    /// completion time and the transfer sample, and stores `data` in the
    /// buffer.
    ///
    /// In adaptive mode the quality is then set to the estimator's current
    /// suggestion. This happens on every completion, not only the first.
    ///
    /// # Errors
    ///
    /// Propagates the error from `StreamBuffer::add`. The tile has already
    /// been moved from pending to completed and its transfer sample recorded
    /// at that point.
    pub fn complete_tile(
        &mut self,
        coord: TileCoord,
        data: Vec<u8>,
        load_time_ms: f64,
        timestamp: f64,
    ) -> WasmResult<()> {
        self.pending.remove(&coord);
        self.completed.insert(coord, timestamp);
        self.bandwidth.record_transfer(data.len(), load_time_ms);
        self.buffer.add(coord, data)?;
        if self.adaptive {
            self.quality = self.bandwidth.suggest_quality();
        }
        Ok(())
    }

    /// Returns the buffered payload for `coord`, if any.
    pub fn get_tile(&mut self, coord: &TileCoord) -> Option<&[u8]> {
        self.buffer.get(coord)
    }

    /// Number of requested tiles that have not completed yet.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Quality currently in effect (see the `quality` field for adaptive
    /// mode).
    pub const fn current_quality(&self) -> StreamingQuality {
        self.quality
    }

    /// Returns a snapshot of buffer, bandwidth and queue statistics.
    pub fn stats(&self) -> StreamingStats {
        StreamingStats {
            buffer: self.buffer.stats(),
            bandwidth_mbps: self.bandwidth.bandwidth_mbps(),
            pending_tiles: self.pending.len(),
            completed_tiles: self.completed.len(),
            current_quality: self.quality,
        }
    }

    /// Drops all buffered tiles and all pending/completed bookkeeping. The
    /// bandwidth estimate and quality settings are kept.
    pub fn clear(&mut self) {
        self.buffer.clear();
        self.pending.clear();
        self.completed.clear();
    }
}
/// Statistics snapshot of a `TileStreamer`, as returned by `TileStreamer::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamingStats {
/// Occupancy of the streamer's tile buffer.
pub buffer: StreamBufferStats,
/// Estimated bandwidth in megabits per second.
pub bandwidth_mbps: f64,
/// Number of requested tiles that have not completed yet.
pub pending_tiles: usize,
/// Number of tiles recorded as completed.
pub completed_tiles: usize,
/// Streaming quality in effect when the snapshot was taken.
pub current_quality: StreamingQuality,
}
/// A set of independent `TileStreamer`s keyed by resolution.
pub struct MultiResolutionStreamer {
    /// One streamer per registered resolution.
    streamers: HashMap<u32, TileStreamer>,
}

impl MultiResolutionStreamer {
    /// Creates a streamer set with no registered resolutions.
    pub fn new() -> Self {
        let streamers = HashMap::new();
        Self { streamers }
    }

    /// Registers `resolution` with a fresh streamer of `buffer_size_mb` MiB,
    /// replacing any streamer previously registered for it.
    pub fn add_resolution(&mut self, resolution: u32, buffer_size_mb: usize) {
        let streamer = TileStreamer::new(buffer_size_mb);
        self.streamers.insert(resolution, streamer);
    }

    /// Forwards a tile request to the streamer for `resolution`. Requests for
    /// unregistered resolutions are silently dropped.
    pub fn request_tile(&mut self, resolution: u32, coord: TileCoord, timestamp: f64) {
        match self.streamers.get_mut(&resolution) {
            Some(streamer) => streamer.request_tile(coord, timestamp),
            None => {}
        }
    }

    /// Returns the buffered payload for `coord` at `resolution`, or `None`
    /// when the resolution is unregistered or the tile is not buffered.
    pub fn get_tile(&mut self, resolution: u32, coord: &TileCoord) -> Option<&[u8]> {
        let streamer = self.streamers.get_mut(&resolution)?;
        streamer.get_tile(coord)
    }

    /// Collects a statistics snapshot from every registered streamer.
    pub fn all_stats(&self) -> HashMap<u32, StreamingStats> {
        let mut snapshots = HashMap::with_capacity(self.streamers.len());
        for (resolution, streamer) in &self.streamers {
            snapshots.insert(*resolution, streamer.stats());
        }
        snapshots
    }
}

impl Default for MultiResolutionStreamer {
    fn default() -> Self {
        Self::new()
    }
}
/// Priority queue of tiles to prefetch, with a cap on concurrent fetches.
pub struct PrefetchScheduler {
    /// Waiting tiles, highest priority first; equal priorities keep their
    /// scheduling order.
    queue: VecDeque<(TileCoord, RequestPriority)>,
    /// Maximum number of tiles that may be active at once.
    max_concurrent: usize,
    /// Tiles handed out by `next` and not yet completed, with start timestamps.
    active: HashMap<TileCoord, f64>,
}

impl PrefetchScheduler {
    /// Creates an empty scheduler allowing `max_concurrent` active tiles.
    pub fn new(max_concurrent: usize) -> Self {
        Self {
            queue: VecDeque::new(),
            max_concurrent,
            active: HashMap::new(),
        }
    }

    /// Queues `coord` with `priority`, replacing any waiting entry for the
    /// same tile. The entry is placed ahead of the first waiting entry with a
    /// strictly lower priority, or at the end if there is none.
    pub fn schedule(&mut self, coord: TileCoord, priority: RequestPriority) {
        self.queue.retain(|entry| entry.0 != coord);
        let insert_at = match self.queue.iter().position(|entry| entry.1 < priority) {
            Some(index) => index,
            None => self.queue.len(),
        };
        self.queue.insert(insert_at, (coord, priority));
    }

    /// Hands out the next waiting tile and marks it active at `timestamp`.
    ///
    /// Returns `None` when the concurrency limit is reached or nothing is
    /// waiting.
    pub fn next(&mut self, timestamp: f64) -> Option<TileCoord> {
        if self.active.len() >= self.max_concurrent {
            return None;
        }
        let (coord, _) = self.queue.pop_front()?;
        self.active.insert(coord, timestamp);
        Some(coord)
    }

    /// Marks `coord` as no longer active.
    pub fn complete(&mut self, coord: TileCoord) {
        self.active.remove(&coord);
    }

    /// Number of tiles waiting in the queue.
    pub fn pending_count(&self) -> usize {
        self.queue.len()
    }

    /// Number of tiles currently active.
    pub fn active_count(&self) -> usize {
        self.active.len()
    }

    /// Drops every waiting and active entry.
    pub fn clear(&mut self) {
        self.queue.clear();
        self.active.clear();
    }

    /// Lists the tile coordinates covering the viewport bounds; nothing is
    /// queued by this call.
    ///
    /// Bounds are read as `(min_x, min_y, max_x, max_y)` and divided by the
    /// viewport width/height to obtain indices; the upper indices are capped
    /// at 100. Tiles are emitted column by column.
    ///
    /// NOTE(review): `_level` is ignored and every coordinate is created with
    /// level 0; dividing by the viewport size rather than a tile size also
    /// looks unusual — confirm both against `Viewport` and the callers.
    pub fn schedule_prefetch(&self, viewport: &crate::Viewport, _level: usize) -> Vec<TileCoord> {
        let bounds = viewport.bounds();
        let span_x = viewport.width as f64;
        let span_y = viewport.height as f64;

        let first_col = (bounds.0.max(0.0) / span_x) as u32;
        let first_row = (bounds.1.max(0.0) / span_y) as u32;
        let last_col = ((bounds.2 / span_x).ceil() as u32).min(100);
        let last_row = ((bounds.3 / span_y).ceil() as u32).min(100);

        (first_col..=last_col)
            .flat_map(|x| (first_row..=last_row).map(move |y| TileCoord::new(0, x, y)))
            .collect()
    }
}
/// Tracks which tiles have been loaded and orders tiles for progressive
/// loading.
#[derive(Debug, Clone)]
pub struct ProgressiveLoader {
    /// Coordinates marked as loaded, in first-marked order, without
    /// duplicates.
    loaded: Vec<TileCoord>,
}

impl ProgressiveLoader {
    /// Creates a loader with no tiles marked as loaded.
    pub const fn new() -> Self {
        Self { loaded: Vec::new() }
    }

    /// Returns a copy of `tiles` sorted by ascending `level`. The sort is
    /// stable, so tiles on the same level keep their input order.
    pub fn prioritize_tiles(&self, tiles: &[TileCoord]) -> Vec<TileCoord> {
        let mut ordered = Vec::with_capacity(tiles.len());
        ordered.extend_from_slice(tiles);
        ordered.sort_by(|lhs, rhs| lhs.level.cmp(&rhs.level));
        ordered
    }

    /// Marks `coord` as loaded; marking it again has no effect.
    pub fn mark_loaded(&mut self, coord: TileCoord) {
        if self.is_loaded(&coord) {
            return;
        }
        self.loaded.push(coord);
    }

    /// Returns `true` when `coord` has been marked as loaded.
    pub fn is_loaded(&self, coord: &TileCoord) -> bool {
        self.loaded.iter().any(|seen| seen == coord)
    }

    /// Forgets every loaded tile.
    pub fn clear(&mut self) {
        self.loaded.clear();
    }
}

impl Default for ProgressiveLoader {
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Two samples at 1 MB/s each must average to roughly 1 MB/s.
    #[test]
    fn test_bandwidth_estimator() {
        let mut estimator = BandwidthEstimator::new();
        estimator.record_transfer(1_000_000, 1000.0);
        estimator.record_transfer(2_000_000, 2000.0);
        let bps = estimator.bandwidth_bps();
        assert!(bps > 900_000.0 && bps < 1_100_000.0);
    }

    // 0.8 Mbit/s suggests `Low`; adding a 10 MB/s sample lifts the window
    // average far enough to suggest `High`.
    #[test]
    fn test_bandwidth_quality_suggestion() {
        let mut estimator = BandwidthEstimator::new();
        estimator.record_transfer(100_000, 1000.0);
        assert_eq!(estimator.suggest_quality(), StreamingQuality::Low);
        estimator.record_transfer(10_000_000, 1000.0);
        assert_eq!(estimator.suggest_quality(), StreamingQuality::High);
    }

    // A stored payload can be read back unchanged.
    #[test]
    fn test_stream_buffer() {
        let mut buffer = StreamBuffer::new(1000);
        let coord = TileCoord::new(0, 0, 0);
        let data = vec![1, 2, 3, 4, 5];
        buffer.add(coord, data.clone()).expect("Add failed");
        assert!(buffer.contains(&coord));
        let retrieved = buffer.get(&coord).expect("Get failed");
        assert_eq!(retrieved, &data[..]);
    }

    // Two 15-byte tiles do not fit in 20 bytes: the older one is evicted.
    #[test]
    fn test_stream_buffer_eviction() {
        let mut buffer = StreamBuffer::new(20);
        let coord1 = TileCoord::new(0, 0, 0);
        let coord2 = TileCoord::new(0, 1, 0);
        buffer.add(coord1, vec![0; 15]).expect("Add 1");
        buffer.add(coord2, vec![0; 15]).expect("Add 2");
        assert!(!buffer.contains(&coord1));
        assert!(buffer.contains(&coord2));
    }

    // A tile near the viewport centre scores higher than a distant one.
    #[test]
    fn test_importance_calculator() {
        let calc = ImportanceCalculator::new((500.0, 500.0), (1000.0, 1000.0));
        let center_tile = TileCoord::new(0, 2, 2);
        let center_imp = calc.calculate(&center_tile);
        assert!(center_imp > 0.8);
        let far_tile = TileCoord::new(0, 10, 10);
        let far_imp = calc.calculate(&far_tile);
        assert!(far_imp < center_imp);
    }

    // Request -> complete moves a tile from pending into the buffer.
    #[test]
    fn test_tile_streamer() {
        let mut streamer = TileStreamer::new(10);
        let coord = TileCoord::new(0, 0, 0);
        streamer.request_tile(coord, 0.0);
        assert_eq!(streamer.pending_count(), 1);
        let data = vec![0u8; 1000];
        streamer
            .complete_tile(coord, data, 10.0, 1.0)
            .expect("Complete failed");
        assert_eq!(streamer.pending_count(), 0);
        assert!(streamer.get_tile(&coord).is_some());
    }

    // The high-priority tile is handed out before the low-priority one.
    #[test]
    fn test_prefetch_scheduler() {
        let mut scheduler = PrefetchScheduler::new(2);
        scheduler.schedule(TileCoord::new(0, 0, 0), RequestPriority::Low);
        scheduler.schedule(TileCoord::new(0, 1, 0), RequestPriority::High);
        let next = scheduler.next(0.0).expect("Should have tile");
        assert_eq!(next, TileCoord::new(0, 1, 0));
        assert_eq!(scheduler.active_count(), 1);
        assert_eq!(scheduler.pending_count(), 1);
    }
}