1use crate::io::{HttpClient, HttpRequest};
9use crate::terrain::elevation_source::{
10 ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
11};
12use crate::terrain::error::TerrainError;
13use rustial_math::{ElevationGrid, TileId};
14use std::collections::{HashMap, VecDeque};
15use std::sync::Mutex;
16
/// Number of tile requests a [`QuantizedMeshSource`] keeps in flight at once
/// unless overridden with `with_max_concurrent_requests`.
const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
18
19pub struct QuantizedMeshSource {
21 url_template: String,
22 client: Box<dyn HttpClient>,
23 pending: Mutex<HashMap<String, TileId>>,
24 queued: Mutex<VecDeque<(String, TileId)>>,
25 grid_size: u32,
26 max_concurrent: usize,
27 diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
28}
29
30impl std::fmt::Debug for QuantizedMeshSource {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("QuantizedMeshSource")
33 .field("url_template", &self.url_template)
34 .field("grid_size", &self.grid_size)
35 .field("max_concurrent", &self.max_concurrent)
36 .finish()
37 }
38}
39
40impl QuantizedMeshSource {
41 pub fn new(
46 url_template: impl Into<String>,
47 client: Box<dyn HttpClient>,
48 grid_size: u32,
49 ) -> Self {
50 Self {
51 url_template: url_template.into(),
52 client,
53 pending: Mutex::new(HashMap::new()),
54 queued: Mutex::new(VecDeque::new()),
55 grid_size: grid_size.max(2),
56 max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
57 diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
58 }
59 }
60
61 pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
63 self.max_concurrent = max_concurrent.max(1);
64 self
65 }
66
67 fn tile_url(&self, id: &TileId) -> String {
68 self.url_template
69 .replace("{z}", &id.zoom.to_string())
70 .replace("{x}", &id.x.to_string())
71 .replace("{y}", &id.y.to_string())
72 }
73
74 fn parse_quantized_mesh(
75 &self,
76 id: TileId,
77 bytes: &[u8],
78 ) -> Result<ElevationGrid, TerrainError> {
79 const HEADER_LEN: usize = 88;
82 if bytes.len() < HEADER_LEN + 4 {
83 return Err(TerrainError::Decode("quantized-mesh tile too short".into()));
84 }
85
86 let mut cursor = 0usize;
87
88 cursor += HEADER_LEN;
91
92 let vertex_count = read_u32_le(bytes, &mut cursor)? as usize;
93 if vertex_count == 0 {
94 return Err(TerrainError::Decode(
95 "quantized-mesh has zero vertices".into(),
96 ));
97 }
98
99 let mut u = vec![0i32; vertex_count];
100 let mut v = vec![0i32; vertex_count];
101 let mut h = vec![0i32; vertex_count];
102
103 decode_delta_zigzag_stream(bytes, &mut cursor, &mut u)?;
104 decode_delta_zigzag_stream(bytes, &mut cursor, &mut v)?;
105 decode_delta_zigzag_stream(bytes, &mut cursor, &mut h)?;
106
107 let min_h = read_f32_le_at(bytes, 24)?;
110 let max_h = read_f32_le_at(bytes, 28)?;
111 let height_span = max_h - min_h;
112
113 let vertices: Vec<(f32, f32, f32)> = (0..vertex_count)
114 .map(|i| {
115 let uu = (u[i].clamp(0, 32767) as f32) / 32767.0;
116 let vv = (v[i].clamp(0, 32767) as f32) / 32767.0;
117 let hh = (h[i].clamp(0, 32767) as f32) / 32767.0;
118 let elev = min_h + hh * height_span;
119 (uu, vv, elev)
120 })
121 .collect();
122
123 let n = self.grid_size;
125 let mut data = Vec::with_capacity((n * n) as usize);
126 for gy in 0..n {
127 let tv = gy as f32 / (n - 1) as f32;
128 for gx in 0..n {
129 let tu = gx as f32 / (n - 1) as f32;
130 let mut best_d2 = f32::MAX;
131 let mut best_h = 0.0f32;
132 for &(uu, vv, hh) in &vertices {
133 let du = uu - tu;
134 let dv = vv - tv;
135 let d2 = du * du + dv * dv;
136 if d2 < best_d2 {
137 best_d2 = d2;
138 best_h = hh;
139 }
140 }
141 data.push(best_h);
142 }
143 }
144
145 ElevationGrid::from_data(id, n, n, data)
146 .ok_or_else(|| TerrainError::Decode("failed to build elevation grid".into()))
147 }
148
149 fn flush_queued(&self) {
150 let mut pending = match self.pending.lock() {
151 Ok(p) => p,
152 Err(_) => return,
153 };
154 let mut queued = match self.queued.lock() {
155 Ok(q) => q,
156 Err(_) => return,
157 };
158
159 while pending.len() < self.max_concurrent {
160 let Some((url, id)) = queued.pop_front() else {
161 break;
162 };
163 pending.insert(url.clone(), id);
164 self.client.send(HttpRequest::get(url));
165 }
166 }
167}
168
169impl ElevationSource for QuantizedMeshSource {
170 fn request(&self, id: TileId) {
171 let url = self.tile_url(&id);
172 let mut pending = match self.pending.lock() {
173 Ok(p) => p,
174 Err(_) => return,
175 };
176 if pending.values().any(|existing| *existing == id) {
177 return;
178 }
179 let mut queued = match self.queued.lock() {
180 Ok(q) => q,
181 Err(_) => return,
182 };
183 if queued.iter().any(|(_, existing)| *existing == id) {
184 return;
185 }
186
187 if pending.len() < self.max_concurrent {
188 pending.insert(url.clone(), id);
189 drop(queued);
190 drop(pending);
191 self.client.send(HttpRequest::get(url));
192 } else {
193 queued.push_back((url, id));
194 }
195 }
196
197 fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
198 let responses = self.client.poll();
199 if responses.is_empty() {
200 return Vec::new();
201 }
202
203 let mut pending = match self.pending.lock() {
204 Ok(p) => p,
205 Err(_) => return Vec::new(),
206 };
207
208 let mut out = Vec::with_capacity(responses.len());
209 for (url, result) in responses {
210 let id = match pending.remove(&url) {
211 Some(id) => id,
212 None => {
213 if let Ok(mut diagnostics) = self.diagnostics.lock() {
214 diagnostics.ignored_completed_responses += 1;
215 }
216 continue;
217 }
218 };
219
220 match result {
221 Ok(resp) if resp.is_success() => {
222 let parsed = self.parse_quantized_mesh(id, &resp.body);
223 if let Err(err) = &parsed {
224 if let Ok(mut diagnostics) = self.diagnostics.lock() {
225 match err {
226 TerrainError::Decode(_) => diagnostics.decode_failures += 1,
227 TerrainError::Network(_) => diagnostics.network_failures += 1,
228 TerrainError::UnsupportedFormat(_) => {
229 diagnostics.unsupported_format_failures += 1
230 }
231 TerrainError::Other(_) => diagnostics.other_failures += 1,
232 }
233 }
234 }
235 out.push((id, parsed));
236 }
237 Ok(resp) => {
238 if let Ok(mut diagnostics) = self.diagnostics.lock() {
239 diagnostics.network_failures += 1;
240 }
241 out.push((
242 id,
243 Err(TerrainError::Network(format!("HTTP {}", resp.status))),
244 ));
245 }
246 Err(e) => {
247 if let Ok(mut diagnostics) = self.diagnostics.lock() {
248 diagnostics.network_failures += 1;
249 }
250 out.push((id, Err(TerrainError::Network(e))))
251 }
252 }
253 }
254 drop(pending);
255 self.flush_queued();
256 out
257 }
258
259 fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
260 let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
261 let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
262 let failures = self
263 .diagnostics
264 .lock()
265 .map(|d| d.clone())
266 .unwrap_or_default();
267 Some(ElevationSourceDiagnostics {
268 queued_requests: queued,
269 in_flight_requests: pending,
270 max_concurrent_requests: self.max_concurrent,
271 known_requests: queued + pending,
272 cancelled_in_flight_requests: 0,
273 failure_diagnostics: failures,
274 })
275 }
276
277 fn cancel(&self, id: TileId) -> bool {
278 if let Ok(mut queued) = self.queued.lock() {
279 let before = queued.len();
280 queued.retain(|(_, queued_id)| *queued_id != id);
281 return queued.len() != before;
282 }
283 false
284 }
285}
286
287fn read_u16_le(bytes: &[u8], cursor: &mut usize) -> Result<u16, TerrainError> {
288 if *cursor + 2 > bytes.len() {
289 return Err(TerrainError::Decode("unexpected EOF (u16)".into()));
290 }
291 let v = u16::from_le_bytes([bytes[*cursor], bytes[*cursor + 1]]);
292 *cursor += 2;
293 Ok(v)
294}
295
296fn read_u32_le(bytes: &[u8], cursor: &mut usize) -> Result<u32, TerrainError> {
297 if *cursor + 4 > bytes.len() {
298 return Err(TerrainError::Decode("unexpected EOF (u32)".into()));
299 }
300 let v = u32::from_le_bytes([
301 bytes[*cursor],
302 bytes[*cursor + 1],
303 bytes[*cursor + 2],
304 bytes[*cursor + 3],
305 ]);
306 *cursor += 4;
307 Ok(v)
308}
309
310fn read_f32_le_at(bytes: &[u8], offset: usize) -> Result<f32, TerrainError> {
311 if offset + 4 > bytes.len() {
312 return Err(TerrainError::Decode("unexpected EOF (f32)".into()));
313 }
314 Ok(f32::from_le_bytes([
315 bytes[offset],
316 bytes[offset + 1],
317 bytes[offset + 2],
318 bytes[offset + 3],
319 ]))
320}
321
/// Undoes zig-zag encoding: even inputs map to `v / 2`, odd inputs map to
/// `-(v / 2) - 1`.
fn zigzag_decode(v: u16) -> i32 {
    let magnitude = i32::from(v >> 1);
    if v & 1 == 0 {
        magnitude
    } else {
        // XOR with -1 (all ones) is a bitwise NOT.
        !magnitude
    }
}
325
326fn decode_delta_zigzag_stream(
327 bytes: &[u8],
328 cursor: &mut usize,
329 out: &mut [i32],
330) -> Result<(), TerrainError> {
331 let mut acc = 0i32;
332 for item in out.iter_mut() {
333 let enc = read_u16_le(bytes, cursor)?;
334 let delta = zigzag_decode(enc);
335 acc += delta;
336 *item = acc;
337 }
338 Ok(())
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpClient, HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// URL template shared by every test source.
    const TEMPLATE: &str = "https://example.com/{z}/{x}/{y}.terrain";

    /// In-memory client: records the URLs it was asked to send and hands
    /// back whatever responses a test pre-loaded.
    struct MockClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
    }

    impl MockClient {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
                responses: StdMutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for MockClient {
        fn send(&self, request: Req) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            // Drain: each queued response is delivered exactly once.
            std::mem::take(&mut *self.responses.lock().unwrap())
        }
    }

    /// Builds a 2x2-grid source over `client` using [`TEMPLATE`].
    fn source_over(client: MockClient) -> QuantizedMeshSource {
        QuantizedMeshSource::new(TEMPLATE, Box::new(client), 2)
    }

    fn zigzag_encode(v: i32) -> u16 {
        ((v << 1) ^ (v >> 31)) as u16
    }

    /// Four-vertex tile: one vertex per corner, heights spanning 0..=100.
    fn make_synthetic_quantized_mesh() -> Vec<u8> {
        let mut tile = vec![0u8; 88];
        tile[24..28].copy_from_slice(&0.0f32.to_le_bytes());
        tile[28..32].copy_from_slice(&100.0f32.to_le_bytes());
        tile.extend_from_slice(&4u32.to_le_bytes());

        // Absolute u, v and height values, in stream order.
        let streams: [[i32; 4]; 3] = [
            [0, 32767, 0, 32767],
            [0, 0, 32767, 32767],
            [0, 8192, 16384, 32767],
        ];
        for stream in &streams {
            let mut previous = 0i32;
            for &value in stream {
                let delta = value - previous;
                previous = value;
                tile.extend_from_slice(&zigzag_encode(delta).to_le_bytes());
            }
        }

        tile
    }

    #[test]
    fn parse_quantized_mesh_to_grid() {
        let source = source_over(MockClient::new());
        let tile = make_synthetic_quantized_mesh();

        let grid = source
            .parse_quantized_mesh(TileId::new(0, 0, 0), &tile)
            .expect("parse");

        assert_eq!(grid.width, 2);
        assert_eq!(grid.height, 2);
        assert_eq!(grid.data.len(), 4);
        assert!(grid.max_elev > grid.min_elev);
    }

    #[test]
    fn poll_returns_decoded_grid() {
        let client = MockClient::new();
        let response = HttpResponse {
            status: 200,
            body: make_synthetic_quantized_mesh(),
            headers: vec![],
        };
        client
            .responses
            .lock()
            .unwrap()
            .push(("https://example.com/0/0/0.terrain".into(), Ok(response)));

        let source = source_over(client);
        source.request(TileId::new(0, 0, 0));

        let out = source.poll();
        assert_eq!(out.len(), 1);
        assert!(out[0].1.is_ok());
    }

    #[test]
    fn invalid_bytes_error() {
        let source = source_over(MockClient::new());

        let err = source
            .parse_quantized_mesh(TileId::new(0, 0, 0), &[1, 2, 3])
            .unwrap_err();

        assert!(matches!(err, TerrainError::Decode(_)));
    }

    #[test]
    fn respects_concurrency_cap_and_queues_excess_requests() {
        let source = source_over(MockClient::new()).with_max_concurrent_requests(1);

        source.request(TileId::new(1, 0, 0));
        source.request(TileId::new(1, 0, 1));

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 1);
        assert_eq!(diagnostics.max_concurrent_requests, 1);
        assert_eq!(diagnostics.known_requests, 2);
    }

    #[test]
    fn cancel_removes_queued_request_only() {
        let source = source_over(MockClient::new()).with_max_concurrent_requests(1);

        let a = TileId::new(1, 0, 0);
        let b = TileId::new(1, 0, 1);
        source.request(a);
        source.request(b);

        assert!(!source.cancel(a), "in-flight request should not cancel");
        assert!(source.cancel(b), "queued request should cancel");

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 0);
        assert_eq!(diagnostics.known_requests, 1);
    }
}