Skip to main content

rustial_engine/terrain/
quantized_mesh_source.rs

1//! Quantized-mesh elevation source.
2//!
3//! Implements an HTTP-backed [`ElevationSource`] for Cesium-style
4//! quantized-mesh terrain tiles.  The parser decodes quantized
5//! `(u, v, height)` vertex streams and rasterizes them to an
6//! [`ElevationGrid`] using nearest-vertex sampling.
7
8use crate::io::{HttpClient, HttpRequest};
9use crate::terrain::elevation_source::{
10    ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
11};
12use crate::terrain::error::TerrainError;
13use rustial_math::{ElevationGrid, TileId};
14use std::collections::{HashMap, VecDeque};
15use std::sync::Mutex;
16
17const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
18
19/// HTTP-backed quantized-mesh elevation source.
20pub struct QuantizedMeshSource {
21    url_template: String,
22    client: Box<dyn HttpClient>,
23    pending: Mutex<HashMap<String, TileId>>,
24    queued: Mutex<VecDeque<(String, TileId)>>,
25    grid_size: u32,
26    max_concurrent: usize,
27    diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
28}
29
30impl std::fmt::Debug for QuantizedMeshSource {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("QuantizedMeshSource")
33            .field("url_template", &self.url_template)
34            .field("grid_size", &self.grid_size)
35            .field("max_concurrent", &self.max_concurrent)
36            .finish()
37    }
38}
39
40impl QuantizedMeshSource {
41    /// Create a quantized-mesh source.
42    ///
43    /// `grid_size` is the output elevation grid resolution per edge
44    /// (e.g. 65 for typical terrain tiles).
45    pub fn new(
46        url_template: impl Into<String>,
47        client: Box<dyn HttpClient>,
48        grid_size: u32,
49    ) -> Self {
50        Self {
51            url_template: url_template.into(),
52            client,
53            pending: Mutex::new(HashMap::new()),
54            queued: Mutex::new(VecDeque::new()),
55            grid_size: grid_size.max(2),
56            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
57            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
58        }
59    }
60
61    /// Override the maximum number of concurrent quantized-mesh HTTP requests.
62    pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
63        self.max_concurrent = max_concurrent.max(1);
64        self
65    }
66
67    fn tile_url(&self, id: &TileId) -> String {
68        self.url_template
69            .replace("{z}", &id.zoom.to_string())
70            .replace("{x}", &id.x.to_string())
71            .replace("{y}", &id.y.to_string())
72    }
73
74    fn parse_quantized_mesh(
75        &self,
76        id: TileId,
77        bytes: &[u8],
78    ) -> Result<ElevationGrid, TerrainError> {
79        // Cesium quantized-mesh header length in bytes:
80        // 3*f64 + 2*f32 + 4*f64 + 3*f64 = 88
81        const HEADER_LEN: usize = 88;
82        if bytes.len() < HEADER_LEN + 4 {
83            return Err(TerrainError::Decode("quantized-mesh tile too short".into()));
84        }
85
86        let mut cursor = 0usize;
87
88        // Skip centerX/Y/Z (f64), min/max height (f32),
89        // bounding sphere (4*f64), horizon occlusion point (3*f64).
90        cursor += HEADER_LEN;
91
92        let vertex_count = read_u32_le(bytes, &mut cursor)? as usize;
93        if vertex_count == 0 {
94            return Err(TerrainError::Decode("quantized-mesh has zero vertices".into()));
95        }
96
97        let mut u = vec![0i32; vertex_count];
98        let mut v = vec![0i32; vertex_count];
99        let mut h = vec![0i32; vertex_count];
100
101        decode_delta_zigzag_stream(bytes, &mut cursor, &mut u)?;
102        decode_delta_zigzag_stream(bytes, &mut cursor, &mut v)?;
103        decode_delta_zigzag_stream(bytes, &mut cursor, &mut h)?;
104
105        // Read min/max height from header for de-quantization.
106        // minHeight starts at byte 24 (after 3*f64).
107        let min_h = read_f32_le_at(bytes, 24)?;
108        let max_h = read_f32_le_at(bytes, 28)?;
109        let height_span = max_h - min_h;
110
111        let vertices: Vec<(f32, f32, f32)> = (0..vertex_count)
112            .map(|i| {
113                let uu = (u[i].clamp(0, 32767) as f32) / 32767.0;
114                let vv = (v[i].clamp(0, 32767) as f32) / 32767.0;
115                let hh = (h[i].clamp(0, 32767) as f32) / 32767.0;
116                let elev = min_h + hh * height_span;
117                (uu, vv, elev)
118            })
119            .collect();
120
121        // Rasterize to a regular grid by nearest-vertex lookup.
122        let n = self.grid_size;
123        let mut data = Vec::with_capacity((n * n) as usize);
124        for gy in 0..n {
125            let tv = gy as f32 / (n - 1) as f32;
126            for gx in 0..n {
127                let tu = gx as f32 / (n - 1) as f32;
128                let mut best_d2 = f32::MAX;
129                let mut best_h = 0.0f32;
130                for &(uu, vv, hh) in &vertices {
131                    let du = uu - tu;
132                    let dv = vv - tv;
133                    let d2 = du * du + dv * dv;
134                    if d2 < best_d2 {
135                        best_d2 = d2;
136                        best_h = hh;
137                    }
138                }
139                data.push(best_h);
140            }
141        }
142
143        ElevationGrid::from_data(id, n, n, data)
144            .ok_or_else(|| TerrainError::Decode("failed to build elevation grid".into()))
145    }
146
147    fn flush_queued(&self) {
148        let mut pending = match self.pending.lock() {
149            Ok(p) => p,
150            Err(_) => return,
151        };
152        let mut queued = match self.queued.lock() {
153            Ok(q) => q,
154            Err(_) => return,
155        };
156
157        while pending.len() < self.max_concurrent {
158            let Some((url, id)) = queued.pop_front() else {
159                break;
160            };
161            pending.insert(url.clone(), id);
162            self.client.send(HttpRequest::get(url));
163        }
164    }
165}
166
167impl ElevationSource for QuantizedMeshSource {
168    fn request(&self, id: TileId) {
169        let url = self.tile_url(&id);
170        let mut pending = match self.pending.lock() {
171            Ok(p) => p,
172            Err(_) => return,
173        };
174        if pending.values().any(|existing| *existing == id) {
175            return;
176        }
177        let mut queued = match self.queued.lock() {
178            Ok(q) => q,
179            Err(_) => return,
180        };
181        if queued.iter().any(|(_, existing)| *existing == id) {
182            return;
183        }
184
185        if pending.len() < self.max_concurrent {
186            pending.insert(url.clone(), id);
187            drop(queued);
188            drop(pending);
189            self.client.send(HttpRequest::get(url));
190        } else {
191            queued.push_back((url, id));
192        }
193    }
194
195    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
196        let responses = self.client.poll();
197        if responses.is_empty() {
198            return Vec::new();
199        }
200
201        let mut pending = match self.pending.lock() {
202            Ok(p) => p,
203            Err(_) => return Vec::new(),
204        };
205
206        let mut out = Vec::with_capacity(responses.len());
207        for (url, result) in responses {
208            let id = match pending.remove(&url) {
209                Some(id) => id,
210                None => {
211                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
212                        diagnostics.ignored_completed_responses += 1;
213                    }
214                    continue;
215                }
216            };
217
218            match result {
219                Ok(resp) if resp.is_success() => {
220                    let parsed = self.parse_quantized_mesh(id, &resp.body);
221                    if let Err(err) = &parsed {
222                        if let Ok(mut diagnostics) = self.diagnostics.lock() {
223                            match err {
224                                TerrainError::Decode(_) => diagnostics.decode_failures += 1,
225                                TerrainError::Network(_) => diagnostics.network_failures += 1,
226                                TerrainError::UnsupportedFormat(_) => {
227                                    diagnostics.unsupported_format_failures += 1
228                                }
229                                TerrainError::Other(_) => diagnostics.other_failures += 1,
230                            }
231                        }
232                    }
233                    out.push((id, parsed));
234                }
235                Ok(resp) => {
236                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
237                        diagnostics.network_failures += 1;
238                    }
239                    out.push((
240                        id,
241                        Err(TerrainError::Network(format!("HTTP {}", resp.status))),
242                    ));
243                }
244                Err(e) => {
245                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
246                        diagnostics.network_failures += 1;
247                    }
248                    out.push((id, Err(TerrainError::Network(e))))
249                }
250            }
251        }
252        drop(pending);
253        self.flush_queued();
254        out
255    }
256
257    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
258        let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
259        let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
260        let failures = self
261            .diagnostics
262            .lock()
263            .map(|d| d.clone())
264            .unwrap_or_default();
265        Some(ElevationSourceDiagnostics {
266            queued_requests: queued,
267            in_flight_requests: pending,
268            max_concurrent_requests: self.max_concurrent,
269            known_requests: queued + pending,
270            cancelled_in_flight_requests: 0,
271            failure_diagnostics: failures,
272        })
273    }
274
275    fn cancel(&self, id: TileId) -> bool {
276        if let Ok(mut queued) = self.queued.lock() {
277            let before = queued.len();
278            queued.retain(|(_, queued_id)| *queued_id != id);
279            return queued.len() != before;
280        }
281        false
282    }
283}
284
285fn read_u16_le(bytes: &[u8], cursor: &mut usize) -> Result<u16, TerrainError> {
286    if *cursor + 2 > bytes.len() {
287        return Err(TerrainError::Decode("unexpected EOF (u16)".into()));
288    }
289    let v = u16::from_le_bytes([bytes[*cursor], bytes[*cursor + 1]]);
290    *cursor += 2;
291    Ok(v)
292}
293
294fn read_u32_le(bytes: &[u8], cursor: &mut usize) -> Result<u32, TerrainError> {
295    if *cursor + 4 > bytes.len() {
296        return Err(TerrainError::Decode("unexpected EOF (u32)".into()));
297    }
298    let v = u32::from_le_bytes([
299        bytes[*cursor],
300        bytes[*cursor + 1],
301        bytes[*cursor + 2],
302        bytes[*cursor + 3],
303    ]);
304    *cursor += 4;
305    Ok(v)
306}
307
308fn read_f32_le_at(bytes: &[u8], offset: usize) -> Result<f32, TerrainError> {
309    if offset + 4 > bytes.len() {
310        return Err(TerrainError::Decode("unexpected EOF (f32)".into()));
311    }
312    Ok(f32::from_le_bytes([
313        bytes[offset],
314        bytes[offset + 1],
315        bytes[offset + 2],
316        bytes[offset + 3],
317    ]))
318}
319
320fn zigzag_decode(v: u16) -> i32 {
321    ((v >> 1) as i32) ^ -((v & 1) as i32)
322}
323
324fn decode_delta_zigzag_stream(
325    bytes: &[u8],
326    cursor: &mut usize,
327    out: &mut [i32],
328) -> Result<(), TerrainError> {
329    let mut acc = 0i32;
330    for item in out.iter_mut() {
331        let enc = read_u16_le(bytes, cursor)?;
332        let delta = zigzag_decode(enc);
333        acc += delta;
334        *item = acc;
335    }
336    Ok(())
337}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342    use crate::io::{HttpClient, HttpRequest as Req, HttpResponse};
343    use std::sync::Mutex as StdMutex;
344
345    struct MockClient {
346        sent: StdMutex<Vec<String>>,
347        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
348    }
349
350    impl MockClient {
351        fn new() -> Self {
352            Self {
353                sent: StdMutex::new(Vec::new()),
354                responses: StdMutex::new(Vec::new()),
355            }
356        }
357    }
358
359    impl HttpClient for MockClient {
360        fn send(&self, request: Req) {
361            self.sent.lock().unwrap().push(request.url);
362        }
363
364        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
365            std::mem::take(&mut *self.responses.lock().unwrap())
366        }
367    }
368
369    fn zigzag_encode(v: i32) -> u16 {
370        ((v << 1) ^ (v >> 31)) as u16
371    }
372
373    fn make_synthetic_quantized_mesh() -> Vec<u8> {
374        let mut bytes = vec![0u8; 88];
375        // minHeight=0.0 at byte 24, maxHeight=100.0 at byte 28.
376        bytes[24..28].copy_from_slice(&0.0f32.to_le_bytes());
377        bytes[28..32].copy_from_slice(&100.0f32.to_le_bytes());
378
379        let vertex_count = 4u32;
380        bytes.extend_from_slice(&vertex_count.to_le_bytes());
381
382        // Corners in quantized space.
383        let u_abs = [0i32, 32767, 0, 32767];
384        let v_abs = [0i32, 0, 32767, 32767];
385        let h_abs = [0i32, 8192, 16384, 32767];
386
387        for stream in [&u_abs, &v_abs, &h_abs] {
388            let mut prev = 0i32;
389            for &cur in stream {
390                let delta = cur - prev;
391                prev = cur;
392                bytes.extend_from_slice(&zigzag_encode(delta).to_le_bytes());
393            }
394        }
395
396        bytes
397    }
398
399    #[test]
400    fn parse_quantized_mesh_to_grid() {
401        let client = MockClient::new();
402        let source = QuantizedMeshSource::new(
403            "https://example.com/{z}/{x}/{y}.terrain",
404            Box::new(client),
405            2,
406        );
407
408        let grid = source
409            .parse_quantized_mesh(TileId::new(0, 0, 0), &make_synthetic_quantized_mesh())
410            .expect("parse");
411
412        assert_eq!(grid.width, 2);
413        assert_eq!(grid.height, 2);
414        assert_eq!(grid.data.len(), 4);
415        assert!(grid.max_elev > grid.min_elev);
416    }
417
418    #[test]
419    fn poll_returns_decoded_grid() {
420        let client = MockClient::new();
421        client.responses.lock().unwrap().push((
422            "https://example.com/0/0/0.terrain".into(),
423            Ok(HttpResponse {
424                status: 200,
425                body: make_synthetic_quantized_mesh(),
426                headers: vec![],
427            }),
428        ));
429
430        let source = QuantizedMeshSource::new(
431            "https://example.com/{z}/{x}/{y}.terrain",
432            Box::new(client),
433            2,
434        );
435
436        source.request(TileId::new(0, 0, 0));
437        let out = source.poll();
438        assert_eq!(out.len(), 1);
439        assert!(out[0].1.is_ok());
440    }
441
442    #[test]
443    fn invalid_bytes_error() {
444        let client = MockClient::new();
445        let source = QuantizedMeshSource::new(
446            "https://example.com/{z}/{x}/{y}.terrain",
447            Box::new(client),
448            2,
449        );
450        let err = source
451            .parse_quantized_mesh(TileId::new(0, 0, 0), &[1, 2, 3])
452            .unwrap_err();
453        assert!(matches!(err, TerrainError::Decode(_)));
454    }
455
456    #[test]
457    fn respects_concurrency_cap_and_queues_excess_requests() {
458        let client = MockClient::new();
459        let source = QuantizedMeshSource::new(
460            "https://example.com/{z}/{x}/{y}.terrain",
461            Box::new(client),
462            2,
463        )
464        .with_max_concurrent_requests(1);
465
466        source.request(TileId::new(1, 0, 0));
467        source.request(TileId::new(1, 0, 1));
468
469        let diagnostics = source.diagnostics().expect("terrain diagnostics");
470        assert_eq!(diagnostics.in_flight_requests, 1);
471        assert_eq!(diagnostics.queued_requests, 1);
472        assert_eq!(diagnostics.max_concurrent_requests, 1);
473        assert_eq!(diagnostics.known_requests, 2);
474    }
475
476    #[test]
477    fn cancel_removes_queued_request_only() {
478        let client = MockClient::new();
479        let source = QuantizedMeshSource::new(
480            "https://example.com/{z}/{x}/{y}.terrain",
481            Box::new(client),
482            2,
483        )
484        .with_max_concurrent_requests(1);
485
486        let a = TileId::new(1, 0, 0);
487        let b = TileId::new(1, 0, 1);
488        source.request(a);
489        source.request(b);
490
491        assert!(!source.cancel(a), "in-flight request should not cancel");
492        assert!(source.cancel(b), "queued request should cancel");
493
494        let diagnostics = source.diagnostics().expect("terrain diagnostics");
495        assert_eq!(diagnostics.in_flight_requests, 1);
496        assert_eq!(diagnostics.queued_requests, 0);
497        assert_eq!(diagnostics.known_requests, 1);
498    }
499}