Skip to main content

rustial_engine/terrain/
quantized_mesh_source.rs

//! Quantized-mesh elevation source.
//!
//! Implements an HTTP-backed [`ElevationSource`] for Cesium-style
//! quantized-mesh terrain tiles.  The parser decodes quantized
//! `(u, v, height)` vertex streams and rasterizes them to an
//! [`ElevationGrid`] using nearest-vertex sampling.
7
8use crate::io::{HttpClient, HttpRequest};
9use crate::terrain::elevation_source::{
10    ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
11};
12use crate::terrain::error::TerrainError;
13use rustial_math::{ElevationGrid, TileId};
14use std::collections::{HashMap, VecDeque};
15use std::sync::Mutex;
16
/// Default cap on the number of terrain HTTP requests that may be in flight at once.
const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
18
19/// HTTP-backed quantized-mesh elevation source.
20pub struct QuantizedMeshSource {
21    url_template: String,
22    client: Box<dyn HttpClient>,
23    pending: Mutex<HashMap<String, TileId>>,
24    queued: Mutex<VecDeque<(String, TileId)>>,
25    grid_size: u32,
26    max_concurrent: usize,
27    diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
28}
29
30impl std::fmt::Debug for QuantizedMeshSource {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("QuantizedMeshSource")
33            .field("url_template", &self.url_template)
34            .field("grid_size", &self.grid_size)
35            .field("max_concurrent", &self.max_concurrent)
36            .finish()
37    }
38}
39
40impl QuantizedMeshSource {
41    /// Create a quantized-mesh source.
42    ///
43    /// `grid_size` is the output elevation grid resolution per edge
44    /// (e.g. 65 for typical terrain tiles).
45    pub fn new(
46        url_template: impl Into<String>,
47        client: Box<dyn HttpClient>,
48        grid_size: u32,
49    ) -> Self {
50        Self {
51            url_template: url_template.into(),
52            client,
53            pending: Mutex::new(HashMap::new()),
54            queued: Mutex::new(VecDeque::new()),
55            grid_size: grid_size.max(2),
56            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
57            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
58        }
59    }
60
61    /// Override the maximum number of concurrent quantized-mesh HTTP requests.
62    pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
63        self.max_concurrent = max_concurrent.max(1);
64        self
65    }
66
67    fn tile_url(&self, id: &TileId) -> String {
68        self.url_template
69            .replace("{z}", &id.zoom.to_string())
70            .replace("{x}", &id.x.to_string())
71            .replace("{y}", &id.y.to_string())
72    }
73
74    fn parse_quantized_mesh(
75        &self,
76        id: TileId,
77        bytes: &[u8],
78    ) -> Result<ElevationGrid, TerrainError> {
79        // Cesium quantized-mesh header length in bytes:
80        // 3*f64 + 2*f32 + 4*f64 + 3*f64 = 88
81        const HEADER_LEN: usize = 88;
82        if bytes.len() < HEADER_LEN + 4 {
83            return Err(TerrainError::Decode("quantized-mesh tile too short".into()));
84        }
85
86        let mut cursor = 0usize;
87
88        // Skip centerX/Y/Z (f64), min/max height (f32),
89        // bounding sphere (4*f64), horizon occlusion point (3*f64).
90        cursor += HEADER_LEN;
91
92        let vertex_count = read_u32_le(bytes, &mut cursor)? as usize;
93        if vertex_count == 0 {
94            return Err(TerrainError::Decode(
95                "quantized-mesh has zero vertices".into(),
96            ));
97        }
98
99        let mut u = vec![0i32; vertex_count];
100        let mut v = vec![0i32; vertex_count];
101        let mut h = vec![0i32; vertex_count];
102
103        decode_delta_zigzag_stream(bytes, &mut cursor, &mut u)?;
104        decode_delta_zigzag_stream(bytes, &mut cursor, &mut v)?;
105        decode_delta_zigzag_stream(bytes, &mut cursor, &mut h)?;
106
107        // Read min/max height from header for de-quantization.
108        // minHeight starts at byte 24 (after 3*f64).
109        let min_h = read_f32_le_at(bytes, 24)?;
110        let max_h = read_f32_le_at(bytes, 28)?;
111        let height_span = max_h - min_h;
112
113        let vertices: Vec<(f32, f32, f32)> = (0..vertex_count)
114            .map(|i| {
115                let uu = (u[i].clamp(0, 32767) as f32) / 32767.0;
116                let vv = (v[i].clamp(0, 32767) as f32) / 32767.0;
117                let hh = (h[i].clamp(0, 32767) as f32) / 32767.0;
118                let elev = min_h + hh * height_span;
119                (uu, vv, elev)
120            })
121            .collect();
122
123        // Rasterize to a regular grid by nearest-vertex lookup.
124        let n = self.grid_size;
125        let mut data = Vec::with_capacity((n * n) as usize);
126        for gy in 0..n {
127            let tv = gy as f32 / (n - 1) as f32;
128            for gx in 0..n {
129                let tu = gx as f32 / (n - 1) as f32;
130                let mut best_d2 = f32::MAX;
131                let mut best_h = 0.0f32;
132                for &(uu, vv, hh) in &vertices {
133                    let du = uu - tu;
134                    let dv = vv - tv;
135                    let d2 = du * du + dv * dv;
136                    if d2 < best_d2 {
137                        best_d2 = d2;
138                        best_h = hh;
139                    }
140                }
141                data.push(best_h);
142            }
143        }
144
145        ElevationGrid::from_data(id, n, n, data)
146            .ok_or_else(|| TerrainError::Decode("failed to build elevation grid".into()))
147    }
148
149    fn flush_queued(&self) {
150        let mut pending = match self.pending.lock() {
151            Ok(p) => p,
152            Err(_) => return,
153        };
154        let mut queued = match self.queued.lock() {
155            Ok(q) => q,
156            Err(_) => return,
157        };
158
159        while pending.len() < self.max_concurrent {
160            let Some((url, id)) = queued.pop_front() else {
161                break;
162            };
163            pending.insert(url.clone(), id);
164            self.client.send(HttpRequest::get(url));
165        }
166    }
167}
168
169impl ElevationSource for QuantizedMeshSource {
170    fn request(&self, id: TileId) {
171        let url = self.tile_url(&id);
172        let mut pending = match self.pending.lock() {
173            Ok(p) => p,
174            Err(_) => return,
175        };
176        if pending.values().any(|existing| *existing == id) {
177            return;
178        }
179        let mut queued = match self.queued.lock() {
180            Ok(q) => q,
181            Err(_) => return,
182        };
183        if queued.iter().any(|(_, existing)| *existing == id) {
184            return;
185        }
186
187        if pending.len() < self.max_concurrent {
188            pending.insert(url.clone(), id);
189            drop(queued);
190            drop(pending);
191            self.client.send(HttpRequest::get(url));
192        } else {
193            queued.push_back((url, id));
194        }
195    }
196
197    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
198        let responses = self.client.poll();
199        if responses.is_empty() {
200            return Vec::new();
201        }
202
203        let mut pending = match self.pending.lock() {
204            Ok(p) => p,
205            Err(_) => return Vec::new(),
206        };
207
208        let mut out = Vec::with_capacity(responses.len());
209        for (url, result) in responses {
210            let id = match pending.remove(&url) {
211                Some(id) => id,
212                None => {
213                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
214                        diagnostics.ignored_completed_responses += 1;
215                    }
216                    continue;
217                }
218            };
219
220            match result {
221                Ok(resp) if resp.is_success() => {
222                    let parsed = self.parse_quantized_mesh(id, &resp.body);
223                    if let Err(err) = &parsed {
224                        if let Ok(mut diagnostics) = self.diagnostics.lock() {
225                            match err {
226                                TerrainError::Decode(_) => diagnostics.decode_failures += 1,
227                                TerrainError::Network(_) => diagnostics.network_failures += 1,
228                                TerrainError::UnsupportedFormat(_) => {
229                                    diagnostics.unsupported_format_failures += 1
230                                }
231                                TerrainError::Other(_) => diagnostics.other_failures += 1,
232                            }
233                        }
234                    }
235                    out.push((id, parsed));
236                }
237                Ok(resp) => {
238                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
239                        diagnostics.network_failures += 1;
240                    }
241                    out.push((
242                        id,
243                        Err(TerrainError::Network(format!("HTTP {}", resp.status))),
244                    ));
245                }
246                Err(e) => {
247                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
248                        diagnostics.network_failures += 1;
249                    }
250                    out.push((id, Err(TerrainError::Network(e))))
251                }
252            }
253        }
254        drop(pending);
255        self.flush_queued();
256        out
257    }
258
259    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
260        let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
261        let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
262        let failures = self
263            .diagnostics
264            .lock()
265            .map(|d| d.clone())
266            .unwrap_or_default();
267        Some(ElevationSourceDiagnostics {
268            queued_requests: queued,
269            in_flight_requests: pending,
270            max_concurrent_requests: self.max_concurrent,
271            known_requests: queued + pending,
272            cancelled_in_flight_requests: 0,
273            failure_diagnostics: failures,
274        })
275    }
276
277    fn cancel(&self, id: TileId) -> bool {
278        if let Ok(mut queued) = self.queued.lock() {
279            let before = queued.len();
280            queued.retain(|(_, queued_id)| *queued_id != id);
281            return queued.len() != before;
282        }
283        false
284    }
285}
286
287fn read_u16_le(bytes: &[u8], cursor: &mut usize) -> Result<u16, TerrainError> {
288    if *cursor + 2 > bytes.len() {
289        return Err(TerrainError::Decode("unexpected EOF (u16)".into()));
290    }
291    let v = u16::from_le_bytes([bytes[*cursor], bytes[*cursor + 1]]);
292    *cursor += 2;
293    Ok(v)
294}
295
296fn read_u32_le(bytes: &[u8], cursor: &mut usize) -> Result<u32, TerrainError> {
297    if *cursor + 4 > bytes.len() {
298        return Err(TerrainError::Decode("unexpected EOF (u32)".into()));
299    }
300    let v = u32::from_le_bytes([
301        bytes[*cursor],
302        bytes[*cursor + 1],
303        bytes[*cursor + 2],
304        bytes[*cursor + 3],
305    ]);
306    *cursor += 4;
307    Ok(v)
308}
309
310fn read_f32_le_at(bytes: &[u8], offset: usize) -> Result<f32, TerrainError> {
311    if offset + 4 > bytes.len() {
312        return Err(TerrainError::Decode("unexpected EOF (f32)".into()));
313    }
314    Ok(f32::from_le_bytes([
315        bytes[offset],
316        bytes[offset + 1],
317        bytes[offset + 2],
318        bytes[offset + 3],
319    ]))
320}
321
/// Undo zigzag encoding: even inputs map to `v / 2`, odd inputs to
/// `-(v / 2) - 1`.
fn zigzag_decode(v: u16) -> i32 {
    let half = i32::from(v >> 1);
    if v & 1 == 0 {
        half
    } else {
        // XOR with -1 (all ones) is a bitwise NOT.
        !half
    }
}
325
326fn decode_delta_zigzag_stream(
327    bytes: &[u8],
328    cursor: &mut usize,
329    out: &mut [i32],
330) -> Result<(), TerrainError> {
331    let mut acc = 0i32;
332    for item in out.iter_mut() {
333        let enc = read_u16_le(bytes, cursor)?;
334        let delta = zigzag_decode(enc);
335        acc += delta;
336        *item = acc;
337    }
338    Ok(())
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpClient, HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// URL template shared by every test source.
    const URL_TEMPLATE: &str = "https://example.com/{z}/{x}/{y}.terrain";

    /// HTTP client double: records sent URLs and replays canned responses.
    struct MockClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
    }

    impl MockClient {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
                responses: StdMutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for MockClient {
        fn send(&self, request: Req) {
            let mut sent = self.sent.lock().unwrap();
            sent.push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let mut canned = self.responses.lock().unwrap();
            std::mem::take(&mut *canned)
        }
    }

    /// Build a source over `client` with a 2x2 output grid.
    fn make_source(client: MockClient) -> QuantizedMeshSource {
        QuantizedMeshSource::new(URL_TEMPLATE, Box::new(client), 2)
    }

    fn zigzag_encode(v: i32) -> u16 {
        ((v << 1) ^ (v >> 31)) as u16
    }

    /// Append `values` to `bytes` as a delta + zigzag encoded u16 stream.
    fn push_delta_stream(bytes: &mut Vec<u8>, values: &[i32]) {
        let mut prev = 0i32;
        for &cur in values {
            let delta = cur - prev;
            prev = cur;
            bytes.extend_from_slice(&zigzag_encode(delta).to_le_bytes());
        }
    }

    /// Four-vertex tile: one vertex per corner, heights spanning 0..=100.
    fn make_synthetic_quantized_mesh() -> Vec<u8> {
        let mut bytes = vec![0u8; 88];
        // minHeight=0.0 at byte 24, maxHeight=100.0 at byte 28.
        bytes[24..28].copy_from_slice(&0.0f32.to_le_bytes());
        bytes[28..32].copy_from_slice(&100.0f32.to_le_bytes());

        let vertex_count = 4u32;
        bytes.extend_from_slice(&vertex_count.to_le_bytes());

        // Corners in quantized space.
        push_delta_stream(&mut bytes, &[0, 32767, 0, 32767]);
        push_delta_stream(&mut bytes, &[0, 0, 32767, 32767]);
        push_delta_stream(&mut bytes, &[0, 8192, 16384, 32767]);

        bytes
    }

    #[test]
    fn parse_quantized_mesh_to_grid() {
        let source = make_source(MockClient::new());
        let tile = make_synthetic_quantized_mesh();

        let grid = source
            .parse_quantized_mesh(TileId::new(0, 0, 0), &tile)
            .expect("parse");

        assert_eq!(grid.width, 2);
        assert_eq!(grid.height, 2);
        assert_eq!(grid.data.len(), 4);
        assert!(grid.max_elev > grid.min_elev);
    }

    #[test]
    fn poll_returns_decoded_grid() {
        let client = MockClient::new();
        let response = HttpResponse {
            status: 200,
            body: make_synthetic_quantized_mesh(),
            headers: vec![],
        };
        client
            .responses
            .lock()
            .unwrap()
            .push(("https://example.com/0/0/0.terrain".into(), Ok(response)));

        let source = make_source(client);

        source.request(TileId::new(0, 0, 0));
        let out = source.poll();
        assert_eq!(out.len(), 1);
        assert!(out[0].1.is_ok());
    }

    #[test]
    fn invalid_bytes_error() {
        let source = make_source(MockClient::new());
        let err = source
            .parse_quantized_mesh(TileId::new(0, 0, 0), &[1, 2, 3])
            .unwrap_err();
        assert!(matches!(err, TerrainError::Decode(_)));
    }

    #[test]
    fn respects_concurrency_cap_and_queues_excess_requests() {
        let source = make_source(MockClient::new()).with_max_concurrent_requests(1);

        source.request(TileId::new(1, 0, 0));
        source.request(TileId::new(1, 0, 1));

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 1);
        assert_eq!(diagnostics.max_concurrent_requests, 1);
        assert_eq!(diagnostics.known_requests, 2);
    }

    #[test]
    fn cancel_removes_queued_request_only() {
        let source = make_source(MockClient::new()).with_max_concurrent_requests(1);

        let a = TileId::new(1, 0, 0);
        let b = TileId::new(1, 0, 1);
        source.request(a);
        source.request(b);

        assert!(!source.cancel(a), "in-flight request should not cancel");
        assert!(source.cancel(b), "queued request should cancel");

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 0);
        assert_eq!(diagnostics.known_requests, 1);
    }
}
501}