1use crate::io::{HttpClient, HttpRequest};
9use crate::terrain::elevation_source::{
10 ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
11};
12use crate::terrain::error::TerrainError;
13use rustial_math::{ElevationGrid, TileId};
14use std::collections::{HashMap, VecDeque};
15use std::sync::Mutex;
16
/// Cap on simultaneously in-flight tile requests that `QuantizedMeshSource::new`
/// applies until it is overridden with `with_max_concurrent_requests`.
const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
18
/// Elevation source that downloads quantized-mesh tiles over HTTP and
/// resamples each one onto a square elevation grid.
///
/// Requests beyond `max_concurrent` are parked in a FIFO queue and sent as
/// earlier requests complete.
pub struct QuantizedMeshSource {
    /// Tile URL pattern; `{z}`, `{x}` and `{y}` are substituted per tile.
    url_template: String,
    /// Transport used both to send requests and to poll for completed responses.
    client: Box<dyn HttpClient>,
    /// Requests that have been sent but not yet answered, keyed by request URL.
    pending: Mutex<HashMap<String, TileId>>,
    /// Requests waiting for a free in-flight slot, in arrival order.
    queued: Mutex<VecDeque<(String, TileId)>>,
    /// Side length of the output grid (the constructor raises it to at least 2).
    grid_size: u32,
    /// Maximum number of requests allowed in `pending` at once (at least 1).
    max_concurrent: usize,
    /// Running failure counters reported through `diagnostics()`.
    diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
}
29
30impl std::fmt::Debug for QuantizedMeshSource {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 f.debug_struct("QuantizedMeshSource")
33 .field("url_template", &self.url_template)
34 .field("grid_size", &self.grid_size)
35 .field("max_concurrent", &self.max_concurrent)
36 .finish()
37 }
38}
39
40impl QuantizedMeshSource {
41 pub fn new(
46 url_template: impl Into<String>,
47 client: Box<dyn HttpClient>,
48 grid_size: u32,
49 ) -> Self {
50 Self {
51 url_template: url_template.into(),
52 client,
53 pending: Mutex::new(HashMap::new()),
54 queued: Mutex::new(VecDeque::new()),
55 grid_size: grid_size.max(2),
56 max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
57 diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
58 }
59 }
60
61 pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
63 self.max_concurrent = max_concurrent.max(1);
64 self
65 }
66
67 fn tile_url(&self, id: &TileId) -> String {
68 self.url_template
69 .replace("{z}", &id.zoom.to_string())
70 .replace("{x}", &id.x.to_string())
71 .replace("{y}", &id.y.to_string())
72 }
73
74 fn parse_quantized_mesh(
75 &self,
76 id: TileId,
77 bytes: &[u8],
78 ) -> Result<ElevationGrid, TerrainError> {
79 const HEADER_LEN: usize = 88;
82 if bytes.len() < HEADER_LEN + 4 {
83 return Err(TerrainError::Decode("quantized-mesh tile too short".into()));
84 }
85
86 let mut cursor = 0usize;
87
88 cursor += HEADER_LEN;
91
92 let vertex_count = read_u32_le(bytes, &mut cursor)? as usize;
93 if vertex_count == 0 {
94 return Err(TerrainError::Decode("quantized-mesh has zero vertices".into()));
95 }
96
97 let mut u = vec![0i32; vertex_count];
98 let mut v = vec![0i32; vertex_count];
99 let mut h = vec![0i32; vertex_count];
100
101 decode_delta_zigzag_stream(bytes, &mut cursor, &mut u)?;
102 decode_delta_zigzag_stream(bytes, &mut cursor, &mut v)?;
103 decode_delta_zigzag_stream(bytes, &mut cursor, &mut h)?;
104
105 let min_h = read_f32_le_at(bytes, 24)?;
108 let max_h = read_f32_le_at(bytes, 28)?;
109 let height_span = max_h - min_h;
110
111 let vertices: Vec<(f32, f32, f32)> = (0..vertex_count)
112 .map(|i| {
113 let uu = (u[i].clamp(0, 32767) as f32) / 32767.0;
114 let vv = (v[i].clamp(0, 32767) as f32) / 32767.0;
115 let hh = (h[i].clamp(0, 32767) as f32) / 32767.0;
116 let elev = min_h + hh * height_span;
117 (uu, vv, elev)
118 })
119 .collect();
120
121 let n = self.grid_size;
123 let mut data = Vec::with_capacity((n * n) as usize);
124 for gy in 0..n {
125 let tv = gy as f32 / (n - 1) as f32;
126 for gx in 0..n {
127 let tu = gx as f32 / (n - 1) as f32;
128 let mut best_d2 = f32::MAX;
129 let mut best_h = 0.0f32;
130 for &(uu, vv, hh) in &vertices {
131 let du = uu - tu;
132 let dv = vv - tv;
133 let d2 = du * du + dv * dv;
134 if d2 < best_d2 {
135 best_d2 = d2;
136 best_h = hh;
137 }
138 }
139 data.push(best_h);
140 }
141 }
142
143 ElevationGrid::from_data(id, n, n, data)
144 .ok_or_else(|| TerrainError::Decode("failed to build elevation grid".into()))
145 }
146
147 fn flush_queued(&self) {
148 let mut pending = match self.pending.lock() {
149 Ok(p) => p,
150 Err(_) => return,
151 };
152 let mut queued = match self.queued.lock() {
153 Ok(q) => q,
154 Err(_) => return,
155 };
156
157 while pending.len() < self.max_concurrent {
158 let Some((url, id)) = queued.pop_front() else {
159 break;
160 };
161 pending.insert(url.clone(), id);
162 self.client.send(HttpRequest::get(url));
163 }
164 }
165}
166
167impl ElevationSource for QuantizedMeshSource {
168 fn request(&self, id: TileId) {
169 let url = self.tile_url(&id);
170 let mut pending = match self.pending.lock() {
171 Ok(p) => p,
172 Err(_) => return,
173 };
174 if pending.values().any(|existing| *existing == id) {
175 return;
176 }
177 let mut queued = match self.queued.lock() {
178 Ok(q) => q,
179 Err(_) => return,
180 };
181 if queued.iter().any(|(_, existing)| *existing == id) {
182 return;
183 }
184
185 if pending.len() < self.max_concurrent {
186 pending.insert(url.clone(), id);
187 drop(queued);
188 drop(pending);
189 self.client.send(HttpRequest::get(url));
190 } else {
191 queued.push_back((url, id));
192 }
193 }
194
195 fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
196 let responses = self.client.poll();
197 if responses.is_empty() {
198 return Vec::new();
199 }
200
201 let mut pending = match self.pending.lock() {
202 Ok(p) => p,
203 Err(_) => return Vec::new(),
204 };
205
206 let mut out = Vec::with_capacity(responses.len());
207 for (url, result) in responses {
208 let id = match pending.remove(&url) {
209 Some(id) => id,
210 None => {
211 if let Ok(mut diagnostics) = self.diagnostics.lock() {
212 diagnostics.ignored_completed_responses += 1;
213 }
214 continue;
215 }
216 };
217
218 match result {
219 Ok(resp) if resp.is_success() => {
220 let parsed = self.parse_quantized_mesh(id, &resp.body);
221 if let Err(err) = &parsed {
222 if let Ok(mut diagnostics) = self.diagnostics.lock() {
223 match err {
224 TerrainError::Decode(_) => diagnostics.decode_failures += 1,
225 TerrainError::Network(_) => diagnostics.network_failures += 1,
226 TerrainError::UnsupportedFormat(_) => {
227 diagnostics.unsupported_format_failures += 1
228 }
229 TerrainError::Other(_) => diagnostics.other_failures += 1,
230 }
231 }
232 }
233 out.push((id, parsed));
234 }
235 Ok(resp) => {
236 if let Ok(mut diagnostics) = self.diagnostics.lock() {
237 diagnostics.network_failures += 1;
238 }
239 out.push((
240 id,
241 Err(TerrainError::Network(format!("HTTP {}", resp.status))),
242 ));
243 }
244 Err(e) => {
245 if let Ok(mut diagnostics) = self.diagnostics.lock() {
246 diagnostics.network_failures += 1;
247 }
248 out.push((id, Err(TerrainError::Network(e))))
249 }
250 }
251 }
252 drop(pending);
253 self.flush_queued();
254 out
255 }
256
257 fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
258 let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
259 let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
260 let failures = self
261 .diagnostics
262 .lock()
263 .map(|d| d.clone())
264 .unwrap_or_default();
265 Some(ElevationSourceDiagnostics {
266 queued_requests: queued,
267 in_flight_requests: pending,
268 max_concurrent_requests: self.max_concurrent,
269 known_requests: queued + pending,
270 cancelled_in_flight_requests: 0,
271 failure_diagnostics: failures,
272 })
273 }
274
275 fn cancel(&self, id: TileId) -> bool {
276 if let Ok(mut queued) = self.queued.lock() {
277 let before = queued.len();
278 queued.retain(|(_, queued_id)| *queued_id != id);
279 return queued.len() != before;
280 }
281 false
282 }
283}
284
285fn read_u16_le(bytes: &[u8], cursor: &mut usize) -> Result<u16, TerrainError> {
286 if *cursor + 2 > bytes.len() {
287 return Err(TerrainError::Decode("unexpected EOF (u16)".into()));
288 }
289 let v = u16::from_le_bytes([bytes[*cursor], bytes[*cursor + 1]]);
290 *cursor += 2;
291 Ok(v)
292}
293
294fn read_u32_le(bytes: &[u8], cursor: &mut usize) -> Result<u32, TerrainError> {
295 if *cursor + 4 > bytes.len() {
296 return Err(TerrainError::Decode("unexpected EOF (u32)".into()));
297 }
298 let v = u32::from_le_bytes([
299 bytes[*cursor],
300 bytes[*cursor + 1],
301 bytes[*cursor + 2],
302 bytes[*cursor + 3],
303 ]);
304 *cursor += 4;
305 Ok(v)
306}
307
308fn read_f32_le_at(bytes: &[u8], offset: usize) -> Result<f32, TerrainError> {
309 if offset + 4 > bytes.len() {
310 return Err(TerrainError::Decode("unexpected EOF (f32)".into()));
311 }
312 Ok(f32::from_le_bytes([
313 bytes[offset],
314 bytes[offset + 1],
315 bytes[offset + 2],
316 bytes[offset + 3],
317 ]))
318}
319
/// Undoes zigzag encoding: even inputs map to `v / 2`, odd inputs to
/// `-(v / 2) - 1`.
fn zigzag_decode(v: u16) -> i32 {
    let half = i32::from(v >> 1);
    if v & 1 == 0 {
        half
    } else {
        // Bitwise NOT equals xor with -1, i.e. -(half) - 1.
        !half
    }
}
323
324fn decode_delta_zigzag_stream(
325 bytes: &[u8],
326 cursor: &mut usize,
327 out: &mut [i32],
328) -> Result<(), TerrainError> {
329 let mut acc = 0i32;
330 for item in out.iter_mut() {
331 let enc = read_u16_le(bytes, cursor)?;
332 let delta = zigzag_decode(enc);
333 acc += delta;
334 *item = acc;
335 }
336 Ok(())
337}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpClient, HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// URL template shared by every test source.
    const TEMPLATE: &str = "https://example.com/{z}/{x}/{y}.terrain";

    /// One canned entry returned by `MockClient::poll`.
    type CannedResponse = (String, Result<HttpResponse, String>);

    /// HTTP client double: records sent URLs and replays canned responses.
    #[derive(Default)]
    struct MockClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<CannedResponse>>,
    }

    impl MockClient {
        fn new() -> Self {
            Self::default()
        }
    }

    impl HttpClient for MockClient {
        fn send(&self, request: Req) {
            let mut log = self.sent.lock().unwrap();
            log.push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let mut inbox = self.responses.lock().unwrap();
            std::mem::take(&mut *inbox)
        }
    }

    /// Builds a 2x2-grid source around `client`.
    fn source_with(client: MockClient) -> QuantizedMeshSource {
        QuantizedMeshSource::new(TEMPLATE, Box::new(client), 2)
    }

    fn zigzag_encode(v: i32) -> u16 {
        ((v << 1) ^ (v >> 31)) as u16
    }

    /// Four-vertex tile: one vertex per corner, heights spanning 0..=100.
    fn make_synthetic_quantized_mesh() -> Vec<u8> {
        let mut tile = vec![0u8; 88];
        // Header min/max height.
        tile[24..28].copy_from_slice(&0.0f32.to_le_bytes());
        tile[28..32].copy_from_slice(&100.0f32.to_le_bytes());

        tile.extend_from_slice(&4u32.to_le_bytes());

        // Absolute u, v and height values, delta + zigzag encoded below.
        let streams: [[i32; 4]; 3] = [
            [0, 32767, 0, 32767],
            [0, 0, 32767, 32767],
            [0, 8192, 16384, 32767],
        ];
        for stream in &streams {
            let mut previous = 0i32;
            for &value in stream {
                let encoded = zigzag_encode(value - previous);
                previous = value;
                tile.extend_from_slice(&encoded.to_le_bytes());
            }
        }

        tile
    }

    #[test]
    fn parse_quantized_mesh_to_grid() {
        let source = source_with(MockClient::new());
        let tile = make_synthetic_quantized_mesh();

        let grid = source
            .parse_quantized_mesh(TileId::new(0, 0, 0), &tile)
            .expect("parse");

        assert_eq!(grid.width, 2);
        assert_eq!(grid.height, 2);
        assert_eq!(grid.data.len(), 4);
        assert!(grid.max_elev > grid.min_elev);
    }

    #[test]
    fn poll_returns_decoded_grid() {
        let client = MockClient::new();
        let canned = HttpResponse {
            status: 200,
            body: make_synthetic_quantized_mesh(),
            headers: vec![],
        };
        client
            .responses
            .lock()
            .unwrap()
            .push(("https://example.com/0/0/0.terrain".into(), Ok(canned)));

        let source = source_with(client);
        source.request(TileId::new(0, 0, 0));

        let out = source.poll();
        assert_eq!(out.len(), 1);
        assert!(out[0].1.is_ok());
    }

    #[test]
    fn invalid_bytes_error() {
        let source = source_with(MockClient::new());

        let err = source
            .parse_quantized_mesh(TileId::new(0, 0, 0), &[1, 2, 3])
            .unwrap_err();

        assert!(matches!(err, TerrainError::Decode(_)));
    }

    #[test]
    fn respects_concurrency_cap_and_queues_excess_requests() {
        let source = source_with(MockClient::new()).with_max_concurrent_requests(1);

        source.request(TileId::new(1, 0, 0));
        source.request(TileId::new(1, 0, 1));

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 1);
        assert_eq!(diagnostics.max_concurrent_requests, 1);
        assert_eq!(diagnostics.known_requests, 2);
    }

    #[test]
    fn cancel_removes_queued_request_only() {
        let source = source_with(MockClient::new()).with_max_concurrent_requests(1);

        let a = TileId::new(1, 0, 0);
        let b = TileId::new(1, 0, 1);
        source.request(a);
        source.request(b);

        assert!(!source.cancel(a), "in-flight request should not cancel");
        assert!(source.cancel(b), "queued request should cancel");

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 0);
        assert_eq!(diagnostics.known_requests, 1);
    }
}