1use crate::io::{HttpClient, HttpRequest};
31use crate::terrain::elevation_source::{
32 ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
33 TerrainRgbEncoding,
34};
35use crate::terrain::error::TerrainError;
36use crate::tile_source::TileDecoder;
37use rustial_math::{ElevationGrid, TileId};
38use std::collections::{HashMap, VecDeque};
39use std::sync::Mutex;
40
41const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
42
43pub struct HttpElevationSource {
45 url_template: String,
46 client: Box<dyn HttpClient>,
47 decoder: Option<Box<dyn TileDecoder>>,
48 encoding: TerrainRgbEncoding,
49 pending: Mutex<HashMap<String, TileId>>,
50 queued: Mutex<VecDeque<(String, TileId)>>,
51 max_concurrent: usize,
52 diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
53}
54
55impl std::fmt::Debug for HttpElevationSource {
56 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 f.debug_struct("HttpElevationSource")
58 .field("url_template", &self.url_template)
59 .field("encoding", &self.encoding)
60 .field("has_decoder", &self.decoder.is_some())
61 .field("max_concurrent", &self.max_concurrent)
62 .finish()
63 }
64}
65
66impl HttpElevationSource {
67 pub fn new(
75 url_template: impl Into<String>,
76 client: Box<dyn HttpClient>,
77 encoding: TerrainRgbEncoding,
78 ) -> Self {
79 Self {
80 url_template: url_template.into(),
81 client,
82 decoder: None,
83 encoding,
84 pending: Mutex::new(HashMap::new()),
85 queued: Mutex::new(VecDeque::new()),
86 max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
87 diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
88 }
89 }
90
91 pub fn with_decoder(
94 url_template: impl Into<String>,
95 client: Box<dyn HttpClient>,
96 encoding: TerrainRgbEncoding,
97 decoder: Box<dyn TileDecoder>,
98 ) -> Self {
99 Self {
100 url_template: url_template.into(),
101 client,
102 decoder: Some(decoder),
103 encoding,
104 pending: Mutex::new(HashMap::new()),
105 queued: Mutex::new(VecDeque::new()),
106 max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
107 diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
108 }
109 }
110
111 pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
113 self.max_concurrent = max_concurrent.max(1);
114 self
115 }
116
117 fn tile_url(&self, id: &TileId) -> String {
118 self.url_template
119 .replace("{z}", &id.zoom.to_string())
120 .replace("{x}", &id.x.to_string())
121 .replace("{y}", &id.y.to_string())
122 }
123
124 fn decode_terrain_rgb(
126 &self,
127 id: TileId,
128 rgba: &[u8],
129 width: u32,
130 height: u32,
131 ) -> Result<ElevationGrid, TerrainError> {
132 let expected = (width as usize) * (height as usize) * 4;
133 if rgba.len() != expected {
134 return Err(TerrainError::Decode(format!(
135 "expected {} bytes for {}x{} RGBA, got {}",
136 expected,
137 width,
138 height,
139 rgba.len()
140 )));
141 }
142
143 let mut data = Vec::with_capacity((width * height) as usize);
144 for pixel in rgba.chunks_exact(4) {
145 data.push(self.encoding.decode(pixel[0], pixel[1], pixel[2]));
146 }
147
148 ElevationGrid::from_data(id, width, height, data)
149 .ok_or_else(|| TerrainError::Decode("grid size mismatch".into()))
150 }
151
152 fn flush_queued(&self) {
153 let mut pending = match self.pending.lock() {
154 Ok(p) => p,
155 Err(_) => return,
156 };
157 let mut queued = match self.queued.lock() {
158 Ok(q) => q,
159 Err(_) => return,
160 };
161
162 while pending.len() < self.max_concurrent {
163 let Some((url, id)) = queued.pop_front() else {
164 break;
165 };
166 pending.insert(url.clone(), id);
167 self.client.send(HttpRequest::get(url));
168 }
169 }
170}
171
172impl ElevationSource for HttpElevationSource {
173 fn request(&self, id: TileId) {
174 let url = self.tile_url(&id);
175 let mut pending = match self.pending.lock() {
176 Ok(p) => p,
177 Err(_) => return,
178 };
179 if pending.values().any(|existing| *existing == id) {
180 return;
181 }
182 let mut queued = match self.queued.lock() {
183 Ok(q) => q,
184 Err(_) => return,
185 };
186 if queued.iter().any(|(_, existing)| *existing == id) {
187 return;
188 }
189
190 if pending.len() < self.max_concurrent {
191 pending.insert(url.clone(), id);
192 drop(queued);
193 drop(pending);
194 self.client.send(HttpRequest::get(url));
195 } else {
196 queued.push_back((url, id));
197 }
198 }
199
200 fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
201 let responses = self.client.poll();
202 if responses.is_empty() {
203 return Vec::new();
204 }
205
206 let mut pending = match self.pending.lock() {
207 Ok(p) => p,
208 Err(_) => return Vec::new(),
209 };
210
211 let mut results = Vec::with_capacity(responses.len());
212
213 for (url, response) in responses {
214 let tile_id = match pending.remove(&url) {
215 Some(id) => id,
216 None => {
217 if let Ok(mut diagnostics) = self.diagnostics.lock() {
218 diagnostics.ignored_completed_responses += 1;
219 }
220 continue;
221 }
222 };
223
224 match response {
225 Ok(resp) if resp.is_success() => {
226 let decode_result = if let Some(ref decoder) = self.decoder {
227 match decoder.decode(&resp.body) {
229 Ok(img) => self.decode_terrain_rgb(
230 tile_id,
231 &img.data,
232 img.width,
233 img.height,
234 ),
235 Err(e) => Err(TerrainError::Decode(e.to_string())),
236 }
237 } else {
238 self.decode_terrain_rgb(tile_id, &resp.body, 256, 256)
240 };
241 if let Err(err) = &decode_result {
242 if let Ok(mut diagnostics) = self.diagnostics.lock() {
243 match err {
244 TerrainError::Decode(_) => diagnostics.decode_failures += 1,
245 TerrainError::Network(_) => diagnostics.network_failures += 1,
246 TerrainError::UnsupportedFormat(_) => {
247 diagnostics.unsupported_format_failures += 1
248 }
249 TerrainError::Other(_) => diagnostics.other_failures += 1,
250 }
251 }
252 }
253 results.push((tile_id, decode_result));
254 }
255 Ok(resp) => {
256 if let Ok(mut diagnostics) = self.diagnostics.lock() {
257 diagnostics.network_failures += 1;
258 }
259 results.push((
260 tile_id,
261 Err(TerrainError::Network(format!("HTTP {}", resp.status))),
262 ));
263 }
264 Err(err) => {
265 if let Ok(mut diagnostics) = self.diagnostics.lock() {
266 diagnostics.network_failures += 1;
267 }
268 results.push((tile_id, Err(TerrainError::Network(err))));
269 }
270 }
271 }
272
273 drop(pending);
274 self.flush_queued();
275
276 results
277 }
278
279 fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
280 let pending = self.pending.lock().map(|p| p.len()).unwrap_or(0);
281 let queued = self.queued.lock().map(|q| q.len()).unwrap_or(0);
282 let failures = self
283 .diagnostics
284 .lock()
285 .map(|d| d.clone())
286 .unwrap_or_default();
287 Some(ElevationSourceDiagnostics {
288 queued_requests: queued,
289 in_flight_requests: pending,
290 max_concurrent_requests: self.max_concurrent,
291 known_requests: queued + pending,
292 cancelled_in_flight_requests: 0,
293 failure_diagnostics: failures,
294 })
295 }
296
297 fn cancel(&self, id: TileId) -> bool {
298 if let Ok(mut queued) = self.queued.lock() {
299 let before = queued.len();
300 queued.retain(|(_, queued_id)| *queued_id != id);
301 return queued.len() != before;
302 }
303 false
304 }
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310 use crate::io::{HttpRequest as Req, HttpResponse};
311 use std::sync::Mutex as StdMutex;
312
313 struct MockClient {
314 sent: StdMutex<Vec<String>>,
315 responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
316 }
317
318 impl MockClient {
319 fn new() -> Self {
320 Self {
321 sent: StdMutex::new(Vec::new()),
322 responses: StdMutex::new(Vec::new()),
323 }
324 }
325
326 fn queue_response(&self, url: &str, body: Vec<u8>) {
327 self.responses.lock().unwrap().push((
328 url.to_string(),
329 Ok(HttpResponse {
330 status: 200,
331 body,
332 headers: vec![],
333 }),
334 ));
335 }
336 }
337
338 impl HttpClient for MockClient {
339 fn send(&self, request: Req) {
340 self.sent.lock().unwrap().push(request.url);
341 }
342
343 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
344 std::mem::take(&mut *self.responses.lock().unwrap())
345 }
346 }
347
348 fn terrarium_sea_level_tile() -> Vec<u8> {
349 vec![128, 0, 0, 255, 128, 0, 0, 255, 128, 0, 0, 255, 128, 0, 0, 255]
351 }
352
353 #[test]
354 fn request_and_poll() {
355 let client = MockClient::new();
356 let url = "https://example.com/0/0/0.png";
357 client.queue_response(url, terrarium_sea_level_tile());
359
360 struct SeaLevelDecoder;
362 impl crate::tile_source::TileDecoder for SeaLevelDecoder {
363 fn decode(
364 &self,
365 bytes: &[u8],
366 ) -> Result<crate::tile_source::DecodedImage, crate::tile_source::TileError> {
367 Ok(crate::tile_source::DecodedImage {
368 width: 2,
369 height: 2,
370 data: std::sync::Arc::new(bytes.to_vec()),
371 })
372 }
373 }
374
375 let source = HttpElevationSource::with_decoder(
376 "https://example.com/{z}/{x}/{y}.png",
377 Box::new(client),
378 TerrainRgbEncoding::Terrarium,
379 Box::new(SeaLevelDecoder),
380 );
381
382 source.request(TileId::new(0, 0, 0));
383 let results = source.poll();
384 assert_eq!(results.len(), 1);
385 let (id, grid_result) = &results[0];
386 assert_eq!(*id, TileId::new(0, 0, 0));
387 let grid = grid_result.as_ref().expect("should decode");
388 assert_eq!(grid.width, 2);
389 assert_eq!(grid.height, 2);
390 }
391
392 #[test]
393 fn decode_terrarium_sea_level() {
394 let client = MockClient::new();
395 let source = HttpElevationSource::new(
396 "https://example.com/{z}/{x}/{y}.png",
397 Box::new(client),
398 TerrainRgbEncoding::Terrarium,
399 );
400 let grid = source
401 .decode_terrain_rgb(TileId::new(0, 0, 0), &terrarium_sea_level_tile(), 2, 2)
402 .expect("decode");
403 assert_eq!(grid.width, 2);
404 assert_eq!(grid.height, 2);
405 assert!((grid.data[0] - 0.0).abs() < 0.01);
406 }
407
408 #[test]
409 fn http_error_maps_to_terrain_error() {
410 let client = MockClient::new();
411 client.responses.lock().unwrap().push((
412 "https://example.com/0/0/0.png".into(),
413 Ok(HttpResponse {
414 status: 404,
415 body: vec![],
416 headers: vec![],
417 }),
418 ));
419
420 let source = HttpElevationSource::new(
421 "https://example.com/{z}/{x}/{y}.png",
422 Box::new(client),
423 TerrainRgbEncoding::Terrarium,
424 );
425
426 source.request(TileId::new(0, 0, 0));
427 let results = source.poll();
428 assert_eq!(results.len(), 1);
429 assert!(results[0].1.is_err());
430 }
431
432 #[test]
433 fn debug_impl() {
434 let client = MockClient::new();
435 let source = HttpElevationSource::new(
436 "https://example.com/{z}/{x}/{y}.png",
437 Box::new(client),
438 TerrainRgbEncoding::Terrarium,
439 );
440 let dbg = format!("{source:?}");
441 assert!(dbg.contains("HttpElevationSource"));
442 }
443
444 #[test]
445 fn respects_concurrency_cap_and_queues_excess_requests() {
446 let client = MockClient::new();
447 let source = HttpElevationSource::new(
448 "https://example.com/{z}/{x}/{y}.png",
449 Box::new(client),
450 TerrainRgbEncoding::Terrarium,
451 )
452 .with_max_concurrent_requests(1);
453
454 source.request(TileId::new(1, 0, 0));
455 source.request(TileId::new(1, 0, 1));
456
457 let diagnostics = source.diagnostics().expect("terrain diagnostics");
458 assert_eq!(diagnostics.in_flight_requests, 1);
459 assert_eq!(diagnostics.queued_requests, 1);
460 assert_eq!(diagnostics.max_concurrent_requests, 1);
461 assert_eq!(diagnostics.known_requests, 2);
462 }
463
464 #[test]
465 fn cancel_removes_queued_request_only() {
466 let client = MockClient::new();
467 let source = HttpElevationSource::new(
468 "https://example.com/{z}/{x}/{y}.png",
469 Box::new(client),
470 TerrainRgbEncoding::Terrarium,
471 )
472 .with_max_concurrent_requests(1);
473
474 let a = TileId::new(1, 0, 0);
475 let b = TileId::new(1, 0, 1);
476 source.request(a);
477 source.request(b);
478
479 assert!(!source.cancel(a), "in-flight request should not cancel");
480 assert!(source.cancel(b), "queued request should cancel");
481
482 let diagnostics = source.diagnostics().expect("terrain diagnostics");
483 assert_eq!(diagnostics.in_flight_requests, 1);
484 assert_eq!(diagnostics.queued_requests, 0);
485 assert_eq!(diagnostics.known_requests, 1);
486 }
487}