Skip to main content

rustial_engine/terrain/
http_elevation_source.rs

1//! HTTP-backed elevation source using Terrain-RGB tiles.
2//!
3//! [`HttpElevationSource`] fetches elevation tiles over HTTP (using the
4//! abstract [`HttpClient`]) and decodes them from Terrain-RGB PNG format
5//! into [`ElevationGrid`] values.  It implements [`ElevationSource`] so
6//! it can be plugged directly into [`TerrainConfig`].
7//!
8//! # Supported encodings
9//!
10//! | Encoding | Provider |
11//! |----------|----------|
12//! | [`TerrainRgbEncoding::Terrarium`] | AWS Terrain Tiles, Terrarium |
13//! | [`TerrainRgbEncoding::Mapbox`] | Mapbox Terrain-RGB, MapTiler |
14//!
15//! # URL template
16//!
17//! The URL template uses `{z}`, `{x}`, `{y}` placeholders:
18//!
19//! ```text
20//! https://s3.amazonaws.com/elevation-tiles-prod/terrarium/{z}/{x}/{y}.png
21//! ```
22//!
23//! # Image decoding
24//!
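//! The Terrain-RGB decode operates on RGBA8 pixel data.  When a
//! [`TileDecoder`](crate::TileDecoder) is configured, each response body
//! (e.g. a PNG) is first decoded into RGBA8; without one, the body must
//! already be raw RGBA8 pixels for a 256x256 tile.  Each pixel's RGB
//! channels are then decoded into a height sample using the configured
//! [`TerrainRgbEncoding`].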
29
30use crate::io::{HttpClient, HttpRequest};
31use crate::terrain::elevation_source::{
32    ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
33    TerrainRgbEncoding,
34};
35use crate::terrain::error::TerrainError;
36use crate::tile_source::TileDecoder;
37use rustial_math::{ElevationGrid, TileId};
38use std::collections::{HashMap, VecDeque};
39use std::sync::Mutex;
40
41const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
42
/// An [`ElevationSource`] that fetches Terrain-RGB tiles over HTTP.
///
/// At most `max_concurrent` requests are in flight at once; further
/// requests wait in a FIFO queue and are sent from `poll` as in-flight
/// requests complete.
pub struct HttpElevationSource {
    /// URL pattern containing `{z}`, `{x}`, `{y}` placeholders.
    url_template: String,
    /// Transport used to send requests and to poll for completed responses.
    client: Box<dyn HttpClient>,
    /// Optional image decoder applied to response bodies before the
    /// Terrain-RGB decode.
    decoder: Option<Box<dyn TileDecoder>>,
    /// Pixel-to-elevation encoding.
    encoding: TerrainRgbEncoding,
    /// In-flight requests, keyed by request URL.
    pending: Mutex<HashMap<String, TileId>>,
    /// Requests waiting for a free in-flight slot, oldest first.
    queued: Mutex<VecDeque<(String, TileId)>>,
    /// Upper bound on the number of in-flight requests.
    max_concurrent: usize,
    /// Running failure counters reported through `diagnostics()`.
    diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
}
54
55impl std::fmt::Debug for HttpElevationSource {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        f.debug_struct("HttpElevationSource")
58            .field("url_template", &self.url_template)
59            .field("encoding", &self.encoding)
60            .field("has_decoder", &self.decoder.is_some())
61            .field("max_concurrent", &self.max_concurrent)
62            .finish()
63    }
64}
65
66impl HttpElevationSource {
67    /// Create a new HTTP elevation source.
68    ///
69    /// # Arguments
70    ///
71    /// * `url_template` - URL with `{z}`, `{x}`, `{y}` placeholders.
72    /// * `client` - The HTTP transport provided by the host application.
73    /// * `encoding` - How to decode RGB pixels into elevation values.
74    pub fn new(
75        url_template: impl Into<String>,
76        client: Box<dyn HttpClient>,
77        encoding: TerrainRgbEncoding,
78    ) -> Self {
79        Self {
80            url_template: url_template.into(),
81            client,
82            decoder: None,
83            encoding,
84            pending: Mutex::new(HashMap::new()),
85            queued: Mutex::new(VecDeque::new()),
86            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
87            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
88        }
89    }
90
91    /// Create with a [`TileDecoder`] for decoding PNG/JPEG response
92    /// bodies into RGBA8 pixel data before Terrain-RGB decode.
93    pub fn with_decoder(
94        url_template: impl Into<String>,
95        client: Box<dyn HttpClient>,
96        encoding: TerrainRgbEncoding,
97        decoder: Box<dyn TileDecoder>,
98    ) -> Self {
99        Self {
100            url_template: url_template.into(),
101            client,
102            decoder: Some(decoder),
103            encoding,
104            pending: Mutex::new(HashMap::new()),
105            queued: Mutex::new(VecDeque::new()),
106            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
107            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
108        }
109    }
110
111    /// Override the maximum number of concurrent terrain HTTP requests.
112    pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
113        self.max_concurrent = max_concurrent.max(1);
114        self
115    }
116
117    fn tile_url(&self, id: &TileId) -> String {
118        self.url_template
119            .replace("{z}", &id.zoom.to_string())
120            .replace("{x}", &id.x.to_string())
121            .replace("{y}", &id.y.to_string())
122    }
123
124    /// Decode RGBA8 pixel data into an [`ElevationGrid`].
125    fn decode_terrain_rgb(
126        &self,
127        id: TileId,
128        rgba: &[u8],
129        width: u32,
130        height: u32,
131    ) -> Result<ElevationGrid, TerrainError> {
132        let expected = (width as usize) * (height as usize) * 4;
133        if rgba.len() != expected {
134            return Err(TerrainError::Decode(format!(
135                "expected {} bytes for {}x{} RGBA, got {}",
136                expected,
137                width,
138                height,
139                rgba.len()
140            )));
141        }
142
143        let mut data = Vec::with_capacity((width * height) as usize);
144        for pixel in rgba.chunks_exact(4) {
145            data.push(self.encoding.decode(pixel[0], pixel[1], pixel[2]));
146        }
147
148        ElevationGrid::from_data(id, width, height, data)
149            .ok_or_else(|| TerrainError::Decode("grid size mismatch".into()))
150    }
151
152    fn flush_queued(&self) {
153        let mut pending = match self.pending.lock() {
154            Ok(p) => p,
155            Err(_) => return,
156        };
157        let mut queued = match self.queued.lock() {
158            Ok(q) => q,
159            Err(_) => return,
160        };
161
162        while pending.len() < self.max_concurrent {
163            let Some((url, id)) = queued.pop_front() else {
164                break;
165            };
166            pending.insert(url.clone(), id);
167            self.client.send(HttpRequest::get(url));
168        }
169    }
170}
171
172impl ElevationSource for HttpElevationSource {
173    fn request(&self, id: TileId) {
174        let url = self.tile_url(&id);
175        let mut pending = match self.pending.lock() {
176            Ok(p) => p,
177            Err(_) => return,
178        };
179        if pending.values().any(|existing| *existing == id) {
180            return;
181        }
182        let mut queued = match self.queued.lock() {
183            Ok(q) => q,
184            Err(_) => return,
185        };
186        if queued.iter().any(|(_, existing)| *existing == id) {
187            return;
188        }
189
190        if pending.len() < self.max_concurrent {
191            pending.insert(url.clone(), id);
192            drop(queued);
193            drop(pending);
194            self.client.send(HttpRequest::get(url));
195        } else {
196            queued.push_back((url, id));
197        }
198    }
199
200    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
201        let responses = self.client.poll();
202        if responses.is_empty() {
203            return Vec::new();
204        }
205
206        let mut pending = match self.pending.lock() {
207            Ok(p) => p,
208            Err(_) => return Vec::new(),
209        };
210
211        let mut results = Vec::with_capacity(responses.len());
212
213        for (url, response) in responses {
214            let tile_id = match pending.remove(&url) {
215                Some(id) => id,
216                None => {
217                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
218                        diagnostics.ignored_completed_responses += 1;
219                    }
220                    continue;
221                }
222            };
223
224            match response {
225                Ok(resp) if resp.is_success() => {
226                    let decode_result = if let Some(ref decoder) = self.decoder {
227                        // Decode PNG/JPEG to RGBA8, then terrain-RGB decode.
228                        match decoder.decode(&resp.body) {
229                            Ok(img) => self.decode_terrain_rgb(
230                                tile_id,
231                                &img.data,
232                                img.width,
233                                img.height,
234                            ),
235                            Err(e) => Err(TerrainError::Decode(e.to_string())),
236                        }
237                    } else {
238                        // Assume raw RGBA8 256x256.
239                        self.decode_terrain_rgb(tile_id, &resp.body, 256, 256)
240                    };
241                    if let Err(err) = &decode_result {
242                        if let Ok(mut diagnostics) = self.diagnostics.lock() {
243                            match err {
244                                TerrainError::Decode(_) => diagnostics.decode_failures += 1,
245                                TerrainError::Network(_) => diagnostics.network_failures += 1,
246                                TerrainError::UnsupportedFormat(_) => {
247                                    diagnostics.unsupported_format_failures += 1
248                                }
249                                TerrainError::Other(_) => diagnostics.other_failures += 1,
250                            }
251                        }
252                    }
253                    results.push((tile_id, decode_result));
254                }
255                Ok(resp) => {
256                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
257                        diagnostics.network_failures += 1;
258                    }
259                    results.push((
260                        tile_id,
261                        Err(TerrainError::Network(format!("HTTP {}", resp.status))),
262                    ));
263                }
264                Err(err) => {
265                    if let Ok(mut diagnostics) = self.diagnostics.lock() {
266                        diagnostics.network_failures += 1;
267                    }
268                    results.push((tile_id, Err(TerrainError::Network(err))));
269                }
270            }
271        }
272
273        drop(pending);
274        self.flush_queued();
275
276        results
277    }
278
279    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
280        let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
281        let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
282        let failures = self
283            .diagnostics
284            .lock()
285            .map(|d| d.clone())
286            .unwrap_or_default();
287        Some(ElevationSourceDiagnostics {
288            queued_requests: queued,
289            in_flight_requests: pending,
290            max_concurrent_requests: self.max_concurrent,
291            known_requests: queued + pending,
292            cancelled_in_flight_requests: 0,
293            failure_diagnostics: failures,
294        })
295    }
296
297    fn cancel(&self, id: TileId) -> bool {
298        if let Ok(mut queued) = self.queued.lock() {
299            let before = queued.len();
300            queued.retain(|(_, queued_id)| *queued_id != id);
301            return queued.len() != before;
302        }
303        false
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// URL template shared by every test.
    const TEMPLATE: &str = "https://example.com/{z}/{x}/{y}.png";

    /// A canned `(url, response)` pair handed back by [`MockClient::poll`].
    type Canned = (String, Result<HttpResponse, String>);

    /// In-memory client: records sent URLs and replays canned responses.
    struct MockClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<Canned>>,
    }

    impl MockClient {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
                responses: StdMutex::new(Vec::new()),
            }
        }

        /// Queue a `200` response with `body` for `url`.
        fn queue_response(&self, url: &str, body: Vec<u8>) {
            let response = HttpResponse {
                status: 200,
                body,
                headers: vec![],
            };
            self.responses
                .lock()
                .unwrap()
                .push((url.to_string(), Ok(response)));
        }
    }

    impl HttpClient for MockClient {
        fn send(&self, request: Req) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let mut canned = self.responses.lock().unwrap();
            std::mem::take(&mut *canned)
        }
    }

    /// Mock decoder that passes the input bytes through as a 2x2 image.
    struct SeaLevelDecoder;

    impl crate::tile_source::TileDecoder for SeaLevelDecoder {
        fn decode(
            &self,
            bytes: &[u8],
        ) -> Result<crate::tile_source::DecodedImage, crate::tile_source::TileError> {
            Ok(crate::tile_source::DecodedImage {
                width: 2,
                height: 2,
                data: std::sync::Arc::new(bytes.to_vec()),
            })
        }
    }

    /// 2x2 RGBA tile with Terrarium sea level (R=128, G=0, B=0, A=255).
    fn terrarium_sea_level_tile() -> Vec<u8> {
        [128u8, 0, 0, 255].repeat(4)
    }

    /// Decoder-less Terrarium source backed by `client`.
    fn terrarium_source(client: MockClient) -> HttpElevationSource {
        HttpElevationSource::new(TEMPLATE, Box::new(client), TerrainRgbEncoding::Terrarium)
    }

    #[test]
    fn request_and_poll() {
        let client = MockClient::new();
        client.queue_response("https://example.com/0/0/0.png", terrarium_sea_level_tile());

        let source = HttpElevationSource::with_decoder(
            TEMPLATE,
            Box::new(client),
            TerrainRgbEncoding::Terrarium,
            Box::new(SeaLevelDecoder),
        );

        let root = TileId::new(0, 0, 0);
        source.request(root);
        let results = source.poll();
        assert_eq!(results.len(), 1);

        let (id, outcome) = &results[0];
        assert_eq!(*id, root);
        let grid = outcome.as_ref().expect("should decode");
        assert_eq!(grid.width, 2);
        assert_eq!(grid.height, 2);
    }

    #[test]
    fn decode_terrarium_sea_level() {
        let source = terrarium_source(MockClient::new());
        let pixels = terrarium_sea_level_tile();

        let grid = source
            .decode_terrain_rgb(TileId::new(0, 0, 0), &pixels, 2, 2)
            .expect("decode");

        assert_eq!(grid.width, 2);
        assert_eq!(grid.height, 2);
        assert!((grid.data[0] - 0.0).abs() < 0.01);
    }

    #[test]
    fn http_error_maps_to_terrain_error() {
        let client = MockClient::new();
        let not_found = HttpResponse {
            status: 404,
            body: vec![],
            headers: vec![],
        };
        client
            .responses
            .lock()
            .unwrap()
            .push(("https://example.com/0/0/0.png".into(), Ok(not_found)));

        let source = terrarium_source(client);
        source.request(TileId::new(0, 0, 0));

        let results = source.poll();
        assert_eq!(results.len(), 1);
        assert!(results[0].1.is_err());
    }

    #[test]
    fn debug_impl() {
        let source = terrarium_source(MockClient::new());
        let rendered = format!("{:?}", source);
        assert!(rendered.contains("HttpElevationSource"));
    }

    #[test]
    fn respects_concurrency_cap_and_queues_excess_requests() {
        let source = terrarium_source(MockClient::new()).with_max_concurrent_requests(1);

        source.request(TileId::new(1, 0, 0));
        source.request(TileId::new(1, 0, 1));

        let snapshot = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(snapshot.in_flight_requests, 1);
        assert_eq!(snapshot.queued_requests, 1);
        assert_eq!(snapshot.max_concurrent_requests, 1);
        assert_eq!(snapshot.known_requests, 2);
    }

    #[test]
    fn cancel_removes_queued_request_only() {
        let source = terrarium_source(MockClient::new()).with_max_concurrent_requests(1);

        let in_flight = TileId::new(1, 0, 0);
        let waiting = TileId::new(1, 0, 1);
        source.request(in_flight);
        source.request(waiting);

        assert!(
            !source.cancel(in_flight),
            "in-flight request should not cancel"
        );
        assert!(source.cancel(waiting), "queued request should cancel");

        let snapshot = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(snapshot.in_flight_requests, 1);
        assert_eq!(snapshot.queued_requests, 0);
        assert_eq!(snapshot.known_requests, 1);
    }
}