Skip to main content

rustial_engine/terrain/
http_elevation_source.rs

1//! HTTP-backed elevation source using Terrain-RGB tiles.
2//!
3//! [`HttpElevationSource`] fetches elevation tiles over HTTP (using the
4//! abstract [`HttpClient`]) and decodes them from Terrain-RGB PNG format
5//! into [`ElevationGrid`] values.  It implements [`ElevationSource`] so
6//! it can be plugged directly into [`TerrainConfig`].
7//!
8//! # Supported encodings
9//!
10//! | Encoding | Provider |
11//! |----------|----------|
12//! | [`TerrainRgbEncoding::Terrarium`] | AWS Terrain Tiles, Terrarium |
13//! | [`TerrainRgbEncoding::Mapbox`] | Mapbox Terrain-RGB, MapTiler |
14//!
15//! # URL template
16//!
17//! The URL template uses `{z}`, `{x}`, `{y}` placeholders:
18//!
19//! ```text
20//! https://s3.amazonaws.com/elevation-tiles-prod/terrarium/{z}/{x}/{y}.png
21//! ```
22//!
23//! # Image decoding
24//!
25//! The source expects raw RGBA8 pixel data from the HTTP response
26//! (via a [`TileDecoder`](crate::TileDecoder) or pre-decoded).
27//! Each pixel's RGB channels are decoded into a height sample using
28//! the configured [`TerrainRgbEncoding`].
29
30use crate::io::{HttpClient, HttpRequest};
31use crate::terrain::elevation_source::{
32    ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
33    TerrainRgbEncoding,
34};
35use crate::terrain::error::TerrainError;
36use crate::tile_source::TileDecoder;
37use rustial_math::{ElevationGrid, TileId};
38use std::collections::{HashMap, VecDeque};
39use std::sync::Mutex;
40
41const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
42
43/// An [`ElevationSource`] that fetches Terrain-RGB tiles over HTTP.
44pub struct HttpElevationSource {
45    url_template: String,
46    client: Box<dyn HttpClient>,
47    decoder: Option<Box<dyn TileDecoder>>,
48    encoding: TerrainRgbEncoding,
49    pending: Mutex<HashMap<String, TileId>>,
50    queued: Mutex<VecDeque<(String, TileId)>>,
51    max_concurrent: usize,
52    diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
53}
54
55impl std::fmt::Debug for HttpElevationSource {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("HttpElevationSource")
58            .field("url_template", &self.url_template)
59            .field("encoding", &self.encoding)
60            .field("has_decoder", &self.decoder.is_some())
61            .field("max_concurrent", &self.max_concurrent)
62            .finish()
63    }
64}
65
66impl HttpElevationSource {
67    /// Create a new HTTP elevation source.
68    ///
69    /// # Arguments
70    ///
71    /// * `url_template` - URL with `{z}`, `{x}`, `{y}` placeholders.
72    /// * `client` - The HTTP transport provided by the host application.
73    /// * `encoding` - How to decode RGB pixels into elevation values.
74    pub fn new(
75        url_template: impl Into<String>,
76        client: Box<dyn HttpClient>,
77        encoding: TerrainRgbEncoding,
78    ) -> Self {
79        Self {
80            url_template: url_template.into(),
81            client,
82            decoder: None,
83            encoding,
84            pending: Mutex::new(HashMap::new()),
85            queued: Mutex::new(VecDeque::new()),
86            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
87            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
88        }
89    }
90
91    /// Create with a [`TileDecoder`] for decoding PNG/JPEG response
92    /// bodies into RGBA8 pixel data before Terrain-RGB decode.
93    pub fn with_decoder(
94        url_template: impl Into<String>,
95        client: Box<dyn HttpClient>,
96        encoding: TerrainRgbEncoding,
97        decoder: Box<dyn TileDecoder>,
98    ) -> Self {
99        Self {
100            url_template: url_template.into(),
101            client,
102            decoder: Some(decoder),
103            encoding,
104            pending: Mutex::new(HashMap::new()),
105            queued: Mutex::new(VecDeque::new()),
106            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
107            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
108        }
109    }
110
111    /// Override the maximum number of concurrent terrain HTTP requests.
112    pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
113        self.max_concurrent = max_concurrent.max(1);
114        self
115    }
116
117    fn tile_url(&self, id: &TileId) -> String {
118        self.url_template
119            .replace("{z}", &id.zoom.to_string())
120            .replace("{x}", &id.x.to_string())
121            .replace("{y}", &id.y.to_string())
122    }
123
124    /// Decode RGBA8 pixel data into an [`ElevationGrid`].
125    fn decode_terrain_rgb(
126        &self,
127        id: TileId,
128        rgba: &[u8],
129        width: u32,
130        height: u32,
131    ) -> Result<ElevationGrid, TerrainError> {
132        let expected = (width as usize) * (height as usize) * 4;
133        if rgba.len() != expected {
134            return Err(TerrainError::Decode(format!(
135                "expected {} bytes for {}x{} RGBA, got {}",
136                expected,
137                width,
138                height,
139                rgba.len()
140            )));
141        }
142
143        let mut data = Vec::with_capacity((width * height) as usize);
144        for pixel in rgba.chunks_exact(4) {
145            data.push(self.encoding.decode(pixel[0], pixel[1], pixel[2]));
146        }
147
148        ElevationGrid::from_data(id, width, height, data)
149            .ok_or_else(|| TerrainError::Decode("grid size mismatch".into()))
150    }
151
152    fn flush_queued(&self) {
153        let mut pending = match self.pending.lock() {
154            Ok(p) => p,
155            Err(_) => return,
156        };
157        let mut queued = match self.queued.lock() {
158            Ok(q) => q,
159            Err(_) => return,
160        };
161
162        while pending.len() < self.max_concurrent {
163            let Some((url, id)) = queued.pop_front() else {
164                break;
165            };
166            pending.insert(url.clone(), id);
167            self.client.send(HttpRequest::get(url));
168        }
169    }
170}
171
172impl ElevationSource for HttpElevationSource {
173    fn request(&self, id: TileId) {
174        let url = self.tile_url(&id);
175        let mut pending = match self.pending.lock() {
176            Ok(p) => p,
177            Err(_) => return,
178        };
179        if pending.values().any(|existing| *existing == id) {
180            return;
181        }
182        let mut queued = match self.queued.lock() {
183            Ok(q) => q,
184            Err(_) => return,
185        };
186        if queued.iter().any(|(_, existing)| *existing == id) {
187            return;
188        }
189
190        if pending.len() < self.max_concurrent {
191            pending.insert(url.clone(), id);
192            drop(queued);
193            drop(pending);
194            self.client.send(HttpRequest::get(url));
195        } else {
196            queued.push_back((url, id));
197        }
198    }
199
200    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
201        let responses = self.client.poll();
202        if responses.is_empty() {
203            return Vec::new();
204        }
205
206        let mut pending = match self.pending.lock() {
207            Ok(p) => p,
208            Err(_) => return Vec::new(),
209        };
210
211        let mut results = Vec::with_capacity(responses.len());
212
213        for (url, response) in responses {
214            let tile_id = match pending.remove(&url) {
215                Some(id) => id,
216                None => {
217                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
218                        diagnostics.ignored_completed_responses += 1;
219                    }
220                    continue;
221                }
222            };
223
224            match response {
225                Ok(resp) if resp.is_success() => {
226                    let decode_result = if let Some(ref decoder) = self.decoder {
227                        // Decode PNG/JPEG to RGBA8, then terrain-RGB decode.
228                        match decoder.decode(&resp.body) {
229                            Ok(img) => {
230                                self.decode_terrain_rgb(tile_id, &img.data, img.width, img.height)
231                            }
232                            Err(e) => Err(TerrainError::Decode(e.to_string())),
233                        }
234                    } else {
235                        // Assume raw RGBA8 256x256.
236                        self.decode_terrain_rgb(tile_id, &resp.body, 256, 256)
237                    };
238                    if let Err(err) = &decode_result {
239                        if let Ok(mut diagnostics) = self.diagnostics.lock() {
240                            match err {
241                                TerrainError::Decode(_) => diagnostics.decode_failures += 1,
242                                TerrainError::Network(_) => diagnostics.network_failures += 1,
243                                TerrainError::UnsupportedFormat(_) => {
244                                    diagnostics.unsupported_format_failures += 1
245                                }
246                                TerrainError::Other(_) => diagnostics.other_failures += 1,
247                            }
248                        }
249                    }
250                    results.push((tile_id, decode_result));
251                }
252                Ok(resp) => {
253                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
254                        diagnostics.network_failures += 1;
255                    }
256                    results.push((
257                        tile_id,
258                        Err(TerrainError::Network(format!("HTTP {}", resp.status))),
259                    ));
260                }
261                Err(err) => {
262                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
263                        diagnostics.network_failures += 1;
264                    }
265                    results.push((tile_id, Err(TerrainError::Network(err))));
266                }
267            }
268        }
269
270        drop(pending);
271        self.flush_queued();
272
273        results
274    }
275
276    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
277        let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
278        let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
279        let failures = self
280            .diagnostics
281            .lock()
282            .map(|d| d.clone())
283            .unwrap_or_default();
284        Some(ElevationSourceDiagnostics {
285            queued_requests: queued,
286            in_flight_requests: pending,
287            max_concurrent_requests: self.max_concurrent,
288            known_requests: queued + pending,
289            cancelled_in_flight_requests: 0,
290            failure_diagnostics: failures,
291        })
292    }
293
294    fn cancel(&self, id: TileId) -> bool {
295        if let Ok(mut queued) = self.queued.lock() {
296            let before = queued.len();
297            queued.retain(|(_, queued_id)| *queued_id != id);
298            return queued.len() != before;
299        }
300        false
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307    use crate::io::{HttpRequest as Req, HttpResponse};
308    use std::sync::Mutex as StdMutex;
309
310    struct MockClient {
311        sent: StdMutex<Vec<String>>,
312        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
313    }
314
315    impl MockClient {
316        fn new() -> Self {
317            Self {
318                sent: StdMutex::new(Vec::new()),
319                responses: StdMutex::new(Vec::new()),
320            }
321        }
322
323        fn queue_response(&self, url: &str, body: Vec<u8>) {
324            self.responses.lock().unwrap().push((
325                url.to_string(),
326                Ok(HttpResponse {
327                    status: 200,
328                    body,
329                    headers: vec![],
330                }),
331            ));
332        }
333    }
334
335    impl HttpClient for MockClient {
336        fn send(&self, request: Req) {
337            self.sent.lock().unwrap().push(request.url);
338        }
339
340        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
341            std::mem::take(&mut *self.responses.lock().unwrap())
342        }
343    }
344
345    fn terrarium_sea_level_tile() -> Vec<u8> {
346        // 2x2 RGBA tile with Terrarium sea level (R=128, G=0, B=0, A=255)
347        vec![
348            128, 0, 0, 255, 128, 0, 0, 255, 128, 0, 0, 255, 128, 0, 0, 255,
349        ]
350    }
351
352    #[test]
353    fn request_and_poll() {
354        let client = MockClient::new();
355        let url = "https://example.com/0/0/0.png";
356        // 2x2 RGBA sea-level tile
357        client.queue_response(url, terrarium_sea_level_tile());
358
359        // Use a mock decoder that returns 2x2 image from any input.
360        struct SeaLevelDecoder;
361        impl crate::tile_source::TileDecoder for SeaLevelDecoder {
362            fn decode(
363                &self,
364                bytes: &[u8],
365            ) -> Result<crate::tile_source::DecodedImage, crate::tile_source::TileError>
366            {
367                Ok(crate::tile_source::DecodedImage {
368                    width: 2,
369                    height: 2,
370                    data: std::sync::Arc::new(bytes.to_vec()),
371                })
372            }
373        }
374
375        let source = HttpElevationSource::with_decoder(
376            "https://example.com/{z}/{x}/{y}.png",
377            Box::new(client),
378            TerrainRgbEncoding::Terrarium,
379            Box::new(SeaLevelDecoder),
380        );
381
382        source.request(TileId::new(0, 0, 0));
383        let results = source.poll();
384        assert_eq!(results.len(), 1);
385        let (id, grid_result) = &results[0];
386        assert_eq!(*id, TileId::new(0, 0, 0));
387        let grid = grid_result.as_ref().expect("should decode");
388        assert_eq!(grid.width, 2);
389        assert_eq!(grid.height, 2);
390    }
391
392    #[test]
393    fn decode_terrarium_sea_level() {
394        let client = MockClient::new();
395        let source = HttpElevationSource::new(
396            "https://example.com/{z}/{x}/{y}.png",
397            Box::new(client),
398            TerrainRgbEncoding::Terrarium,
399        );
400        let grid = source
401            .decode_terrain_rgb(TileId::new(0, 0, 0), &terrarium_sea_level_tile(), 2, 2)
402            .expect("decode");
403        assert_eq!(grid.width, 2);
404        assert_eq!(grid.height, 2);
405        assert!((grid.data[0] - 0.0).abs() < 0.01);
406    }
407
408    #[test]
409    fn http_error_maps_to_terrain_error() {
410        let client = MockClient::new();
411        client.responses.lock().unwrap().push((
412            "https://example.com/0/0/0.png".into(),
413            Ok(HttpResponse {
414                status: 404,
415                body: vec![],
416                headers: vec![],
417            }),
418        ));
419
420        let source = HttpElevationSource::new(
421            "https://example.com/{z}/{x}/{y}.png",
422            Box::new(client),
423            TerrainRgbEncoding::Terrarium,
424        );
425
426        source.request(TileId::new(0, 0, 0));
427        let results = source.poll();
428        assert_eq!(results.len(), 1);
429        assert!(results[0].1.is_err());
430    }
431
432    #[test]
433    fn debug_impl() {
434        let client = MockClient::new();
435        let source = HttpElevationSource::new(
436            "https://example.com/{z}/{x}/{y}.png",
437            Box::new(client),
438            TerrainRgbEncoding::Terrarium,
439        );
440        let dbg = format!("{source:?}");
441        assert!(dbg.contains("HttpElevationSource"));
442    }
443
444    #[test]
445    fn respects_concurrency_cap_and_queues_excess_requests() {
446        let client = MockClient::new();
447        let source = HttpElevationSource::new(
448            "https://example.com/{z}/{x}/{y}.png",
449            Box::new(client),
450            TerrainRgbEncoding::Terrarium,
451        )
452        .with_max_concurrent_requests(1);
453
454        source.request(TileId::new(1, 0, 0));
455        source.request(TileId::new(1, 0, 1));
456
457        let diagnostics = source.diagnostics().expect("terrain diagnostics");
458        assert_eq!(diagnostics.in_flight_requests, 1);
459        assert_eq!(diagnostics.queued_requests, 1);
460        assert_eq!(diagnostics.max_concurrent_requests, 1);
461        assert_eq!(diagnostics.known_requests, 2);
462    }
463
464    #[test]
465    fn cancel_removes_queued_request_only() {
466        let client = MockClient::new();
467        let source = HttpElevationSource::new(
468            "https://example.com/{z}/{x}/{y}.png",
469            Box::new(client),
470            TerrainRgbEncoding::Terrarium,
471        )
472        .with_max_concurrent_requests(1);
473
474        let a = TileId::new(1, 0, 0);
475        let b = TileId::new(1, 0, 1);
476        source.request(a);
477        source.request(b);
478
479        assert!(!source.cancel(a), "in-flight request should not cancel");
480        assert!(source.cancel(b), "queued request should cancel");
481
482        let diagnostics = source.diagnostics().expect("terrain diagnostics");
483        assert_eq!(diagnostics.in_flight_requests, 1);
484        assert_eq!(diagnostics.queued_requests, 0);
485        assert_eq!(diagnostics.known_requests, 1);
486    }
487}