use crate::io::{HttpClient, HttpRequest};
use crate::terrain::elevation_source::{
ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
};
use crate::terrain::error::TerrainError;
use rustial_math::{ElevationGrid, TileId};
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
pub struct QuantizedMeshSource {
url_template: String,
client: Box<dyn HttpClient>,
pending: Mutex<HashMap<String, TileId>>,
queued: Mutex<VecDeque<(String, TileId)>>,
grid_size: u32,
max_concurrent: usize,
diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
}
impl std::fmt::Debug for QuantizedMeshSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QuantizedMeshSource")
.field("url_template", &self.url_template)
.field("grid_size", &self.grid_size)
.field("max_concurrent", &self.max_concurrent)
.finish()
}
}
impl QuantizedMeshSource {
pub fn new(
url_template: impl Into<String>,
client: Box<dyn HttpClient>,
grid_size: u32,
) -> Self {
Self {
url_template: url_template.into(),
client,
pending: Mutex::new(HashMap::new()),
queued: Mutex::new(VecDeque::new()),
grid_size: grid_size.max(2),
max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
}
}
pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
self.max_concurrent = max_concurrent.max(1);
self
}
fn tile_url(&self, id: &TileId) -> String {
self.url_template
.replace("{z}", &id.zoom.to_string())
.replace("{x}", &id.x.to_string())
.replace("{y}", &id.y.to_string())
}
fn parse_quantized_mesh(
&self,
id: TileId,
bytes: &[u8],
) -> Result<ElevationGrid, TerrainError> {
const HEADER_LEN: usize = 88;
if bytes.len() < HEADER_LEN + 4 {
return Err(TerrainError::Decode("quantized-mesh tile too short".into()));
}
let mut cursor = 0usize;
cursor += HEADER_LEN;
let vertex_count = read_u32_le(bytes, &mut cursor)? as usize;
if vertex_count == 0 {
return Err(TerrainError::Decode(
"quantized-mesh has zero vertices".into(),
));
}
let mut u = vec![0i32; vertex_count];
let mut v = vec![0i32; vertex_count];
let mut h = vec![0i32; vertex_count];
decode_delta_zigzag_stream(bytes, &mut cursor, &mut u)?;
decode_delta_zigzag_stream(bytes, &mut cursor, &mut v)?;
decode_delta_zigzag_stream(bytes, &mut cursor, &mut h)?;
let min_h = read_f32_le_at(bytes, 24)?;
let max_h = read_f32_le_at(bytes, 28)?;
let height_span = max_h - min_h;
let vertices: Vec<(f32, f32, f32)> = (0..vertex_count)
.map(|i| {
let uu = (u[i].clamp(0, 32767) as f32) / 32767.0;
let vv = (v[i].clamp(0, 32767) as f32) / 32767.0;
let hh = (h[i].clamp(0, 32767) as f32) / 32767.0;
let elev = min_h + hh * height_span;
(uu, vv, elev)
})
.collect();
let n = self.grid_size;
let mut data = Vec::with_capacity((n * n) as usize);
for gy in 0..n {
let tv = gy as f32 / (n - 1) as f32;
for gx in 0..n {
let tu = gx as f32 / (n - 1) as f32;
let mut best_d2 = f32::MAX;
let mut best_h = 0.0f32;
for &(uu, vv, hh) in &vertices {
let du = uu - tu;
let dv = vv - tv;
let d2 = du * du + dv * dv;
if d2 < best_d2 {
best_d2 = d2;
best_h = hh;
}
}
data.push(best_h);
}
}
ElevationGrid::from_data(id, n, n, data)
.ok_or_else(|| TerrainError::Decode("failed to build elevation grid".into()))
}
fn flush_queued(&self) {
let mut pending = match self.pending.lock() {
Ok(p) => p,
Err(_) => return,
};
let mut queued = match self.queued.lock() {
Ok(q) => q,
Err(_) => return,
};
while pending.len() < self.max_concurrent {
let Some((url, id)) = queued.pop_front() else {
break;
};
pending.insert(url.clone(), id);
self.client.send(HttpRequest::get(url));
}
}
}
impl ElevationSource for QuantizedMeshSource {
fn request(&self, id: TileId) {
let url = self.tile_url(&id);
let mut pending = match self.pending.lock() {
Ok(p) => p,
Err(_) => return,
};
if pending.values().any(|existing| *existing == id) {
return;
}
let mut queued = match self.queued.lock() {
Ok(q) => q,
Err(_) => return,
};
if queued.iter().any(|(_, existing)| *existing == id) {
return;
}
if pending.len() < self.max_concurrent {
pending.insert(url.clone(), id);
drop(queued);
drop(pending);
self.client.send(HttpRequest::get(url));
} else {
queued.push_back((url, id));
}
}
fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
let responses = self.client.poll();
if responses.is_empty() {
return Vec::new();
}
let mut pending = match self.pending.lock() {
Ok(p) => p,
Err(_) => return Vec::new(),
};
let mut out = Vec::with_capacity(responses.len());
for (url, result) in responses {
let id = match pending.remove(&url) {
Some(id) => id,
None => {
if let Ok(mut diagnostics) = self.diagnostics.lock() {
diagnostics.ignored_completed_responses += 1;
}
continue;
}
};
match result {
Ok(resp) if resp.is_success() => {
let parsed = self.parse_quantized_mesh(id, &resp.body);
if let Err(err) = &parsed {
if let Ok(mut diagnostics) = self.diagnostics.lock() {
match err {
TerrainError::Decode(_) => diagnostics.decode_failures += 1,
TerrainError::Network(_) => diagnostics.network_failures += 1,
TerrainError::UnsupportedFormat(_) => {
diagnostics.unsupported_format_failures += 1
}
TerrainError::Other(_) => diagnostics.other_failures += 1,
}
}
}
out.push((id, parsed));
}
Ok(resp) => {
if let Ok(mut diagnostics) = self.diagnostics.lock() {
diagnostics.network_failures += 1;
}
out.push((
id,
Err(TerrainError::Network(format!("HTTP {}", resp.status))),
));
}
Err(e) => {
if let Ok(mut diagnostics) = self.diagnostics.lock() {
diagnostics.network_failures += 1;
}
out.push((id, Err(TerrainError::Network(e))))
}
}
}
drop(pending);
self.flush_queued();
out
}
fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
let failures = self
.diagnostics
.lock()
.map(|d| d.clone())
.unwrap_or_default();
Some(ElevationSourceDiagnostics {
queued_requests: queued,
in_flight_requests: pending,
max_concurrent_requests: self.max_concurrent,
known_requests: queued + pending,
cancelled_in_flight_requests: 0,
failure_diagnostics: failures,
})
}
fn cancel(&self, id: TileId) -> bool {
if let Ok(mut queued) = self.queued.lock() {
let before = queued.len();
queued.retain(|(_, queued_id)| *queued_id != id);
return queued.len() != before;
}
false
}
}
fn read_u16_le(bytes: &[u8], cursor: &mut usize) -> Result<u16, TerrainError> {
if *cursor + 2 > bytes.len() {
return Err(TerrainError::Decode("unexpected EOF (u16)".into()));
}
let v = u16::from_le_bytes([bytes[*cursor], bytes[*cursor + 1]]);
*cursor += 2;
Ok(v)
}
fn read_u32_le(bytes: &[u8], cursor: &mut usize) -> Result<u32, TerrainError> {
if *cursor + 4 > bytes.len() {
return Err(TerrainError::Decode("unexpected EOF (u32)".into()));
}
let v = u32::from_le_bytes([
bytes[*cursor],
bytes[*cursor + 1],
bytes[*cursor + 2],
bytes[*cursor + 3],
]);
*cursor += 4;
Ok(v)
}
fn read_f32_le_at(bytes: &[u8], offset: usize) -> Result<f32, TerrainError> {
if offset + 4 > bytes.len() {
return Err(TerrainError::Decode("unexpected EOF (f32)".into()));
}
Ok(f32::from_le_bytes([
bytes[offset],
bytes[offset + 1],
bytes[offset + 2],
bytes[offset + 3],
]))
}
fn zigzag_decode(v: u16) -> i32 {
((v >> 1) as i32) ^ -((v & 1) as i32)
}
fn decode_delta_zigzag_stream(
bytes: &[u8],
cursor: &mut usize,
out: &mut [i32],
) -> Result<(), TerrainError> {
let mut acc = 0i32;
for item in out.iter_mut() {
let enc = read_u16_le(bytes, cursor)?;
let delta = zigzag_decode(enc);
acc += delta;
*item = acc;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::io::{HttpClient, HttpRequest as Req, HttpResponse};
use std::sync::Mutex as StdMutex;
struct MockClient {
sent: StdMutex<Vec<String>>,
responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
}
impl MockClient {
fn new() -> Self {
Self {
sent: StdMutex::new(Vec::new()),
responses: StdMutex::new(Vec::new()),
}
}
}
impl HttpClient for MockClient {
fn send(&self, request: Req) {
self.sent.lock().unwrap().push(request.url);
}
fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
std::mem::take(&mut *self.responses.lock().unwrap())
}
}
fn zigzag_encode(v: i32) -> u16 {
((v << 1) ^ (v >> 31)) as u16
}
fn make_synthetic_quantized_mesh() -> Vec<u8> {
let mut bytes = vec![0u8; 88];
bytes[24..28].copy_from_slice(&0.0f32.to_le_bytes());
bytes[28..32].copy_from_slice(&100.0f32.to_le_bytes());
let vertex_count = 4u32;
bytes.extend_from_slice(&vertex_count.to_le_bytes());
let u_abs = [0i32, 32767, 0, 32767];
let v_abs = [0i32, 0, 32767, 32767];
let h_abs = [0i32, 8192, 16384, 32767];
for stream in [&u_abs, &v_abs, &h_abs] {
let mut prev = 0i32;
for &cur in stream {
let delta = cur - prev;
prev = cur;
bytes.extend_from_slice(&zigzag_encode(delta).to_le_bytes());
}
}
bytes
}
#[test]
fn parse_quantized_mesh_to_grid() {
let client = MockClient::new();
let source = QuantizedMeshSource::new(
"https://example.com/{z}/{x}/{y}.terrain",
Box::new(client),
2,
);
let grid = source
.parse_quantized_mesh(TileId::new(0, 0, 0), &make_synthetic_quantized_mesh())
.expect("parse");
assert_eq!(grid.width, 2);
assert_eq!(grid.height, 2);
assert_eq!(grid.data.len(), 4);
assert!(grid.max_elev > grid.min_elev);
}
#[test]
fn poll_returns_decoded_grid() {
let client = MockClient::new();
client.responses.lock().unwrap().push((
"https://example.com/0/0/0.terrain".into(),
Ok(HttpResponse {
status: 200,
body: make_synthetic_quantized_mesh(),
headers: vec![],
}),
));
let source = QuantizedMeshSource::new(
"https://example.com/{z}/{x}/{y}.terrain",
Box::new(client),
2,
);
source.request(TileId::new(0, 0, 0));
let out = source.poll();
assert_eq!(out.len(), 1);
assert!(out[0].1.is_ok());
}
#[test]
fn invalid_bytes_error() {
let client = MockClient::new();
let source = QuantizedMeshSource::new(
"https://example.com/{z}/{x}/{y}.terrain",
Box::new(client),
2,
);
let err = source
.parse_quantized_mesh(TileId::new(0, 0, 0), &[1, 2, 3])
.unwrap_err();
assert!(matches!(err, TerrainError::Decode(_)));
}
#[test]
fn respects_concurrency_cap_and_queues_excess_requests() {
let client = MockClient::new();
let source = QuantizedMeshSource::new(
"https://example.com/{z}/{x}/{y}.terrain",
Box::new(client),
2,
)
.with_max_concurrent_requests(1);
source.request(TileId::new(1, 0, 0));
source.request(TileId::new(1, 0, 1));
let diagnostics = source.diagnostics().expect("terrain diagnostics");
assert_eq!(diagnostics.in_flight_requests, 1);
assert_eq!(diagnostics.queued_requests, 1);
assert_eq!(diagnostics.max_concurrent_requests, 1);
assert_eq!(diagnostics.known_requests, 2);
}
#[test]
fn cancel_removes_queued_request_only() {
let client = MockClient::new();
let source = QuantizedMeshSource::new(
"https://example.com/{z}/{x}/{y}.terrain",
Box::new(client),
2,
)
.with_max_concurrent_requests(1);
let a = TileId::new(1, 0, 0);
let b = TileId::new(1, 0, 1);
source.request(a);
source.request(b);
assert!(!source.cancel(a), "in-flight request should not cancel");
assert!(source.cancel(b), "queued request should cancel");
let diagnostics = source.diagnostics().expect("terrain diagnostics");
assert_eq!(diagnostics.in_flight_requests, 1);
assert_eq!(diagnostics.queued_requests, 0);
assert_eq!(diagnostics.known_requests, 1);
}
}