use crate::io::{HttpClient, HttpRequest};
use crate::terrain::elevation_source::{
ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
TerrainRgbEncoding,
};
use crate::terrain::error::TerrainError;
use crate::tile_source::TileDecoder;
use rustial_math::{ElevationGrid, TileId};
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
/// Default cap on the number of terrain tile requests kept in flight at once.
///
/// Both constructors of `HttpElevationSource` start from this value; it can be
/// overridden with `HttpElevationSource::with_max_concurrent_requests`.
const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
/// Elevation source that fetches terrain-RGB tiles over HTTP.
///
/// Requests are issued through a boxed [`HttpClient`]; at most
/// `max_concurrent` of them are in flight at a time and the rest wait in a
/// FIFO queue. Completed responses are picked up and decoded in `poll`.
pub struct HttpElevationSource {
/// URL template; `{z}`, `{x}` and `{y}` are substituted per tile.
url_template: String,
/// Client used to send GET requests and to poll for completed responses.
client: Box<dyn HttpClient>,
/// Optional image decoder applied to response bodies. When absent, bodies
/// are interpreted directly as raw 256x256 RGBA pixel data.
decoder: Option<Box<dyn TileDecoder>>,
/// Encoding used to turn each pixel's R, G, B bytes into an elevation value.
encoding: TerrainRgbEncoding,
/// In-flight requests, keyed by request URL.
pending: Mutex<HashMap<String, TileId>>,
/// Requests waiting for an in-flight slot, in arrival order.
queued: Mutex<VecDeque<(String, TileId)>>,
/// Maximum number of simultaneously in-flight requests.
max_concurrent: usize,
/// Running failure counters reported through `diagnostics()`.
diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
}
/// Hand-written `Debug` that prints only the configuration of the source.
///
/// The boxed client, the mutex-guarded request state and the failure counters
/// are omitted; the decoder is reported only as a `has_decoder` flag.
impl std::fmt::Debug for HttpElevationSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_decoder = self.decoder.is_some();

        let mut out = f.debug_struct("HttpElevationSource");
        out.field("url_template", &self.url_template);
        out.field("encoding", &self.encoding);
        out.field("has_decoder", &has_decoder);
        out.field("max_concurrent", &self.max_concurrent);
        out.finish()
    }
}
impl HttpElevationSource {
    /// Creates a source without an image decoder.
    ///
    /// Successful response bodies are then interpreted as raw 256x256 RGBA
    /// pixel data when polled.
    ///
    /// * `url_template` - tile URL containing `{z}`, `{x}` and `{y}` placeholders.
    /// * `client` - HTTP client used to send requests and poll responses.
    /// * `encoding` - terrain-RGB encoding used to turn pixels into elevations.
    pub fn new(
        url_template: impl Into<String>,
        client: Box<dyn HttpClient>,
        encoding: TerrainRgbEncoding,
    ) -> Self {
        Self::from_parts(url_template.into(), client, encoding, None)
    }

    /// Creates a source that runs every successful response body through
    /// `decoder` before extracting elevations from the decoded pixels.
    ///
    /// The remaining parameters have the same meaning as in [`Self::new`].
    pub fn with_decoder(
        url_template: impl Into<String>,
        client: Box<dyn HttpClient>,
        encoding: TerrainRgbEncoding,
        decoder: Box<dyn TileDecoder>,
    ) -> Self {
        Self::from_parts(url_template.into(), client, encoding, Some(decoder))
    }

    /// Shared constructor body: empty request state, default concurrency cap.
    fn from_parts(
        url_template: String,
        client: Box<dyn HttpClient>,
        encoding: TerrainRgbEncoding,
        decoder: Option<Box<dyn TileDecoder>>,
    ) -> Self {
        Self {
            url_template,
            client,
            decoder,
            encoding,
            pending: Mutex::new(HashMap::new()),
            queued: Mutex::new(VecDeque::new()),
            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
        }
    }

    /// Builder-style override of the in-flight request cap.
    ///
    /// A value of `0` is raised to `1` so that requests can always progress.
    pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
        self.max_concurrent = max_concurrent.max(1);
        self
    }

    /// Expands the URL template for `id` by substituting `{z}`, `{x}`, `{y}`.
    fn tile_url(&self, id: &TileId) -> String {
        let (z, x, y) = (id.zoom.to_string(), id.x.to_string(), id.y.to_string());
        self.url_template
            .replace("{z}", &z)
            .replace("{x}", &x)
            .replace("{y}", &y)
    }

    /// Converts a tightly packed RGBA buffer into an elevation grid for `id`.
    ///
    /// Each 4-byte pixel is decoded from its R, G and B bytes using the
    /// configured encoding; the alpha byte is ignored.
    ///
    /// # Errors
    ///
    /// Returns [`TerrainError::Decode`] when `width * height * 4` does not fit
    /// in `usize`, when `rgba` is not exactly that many bytes long, or when the
    /// grid cannot be built from the decoded samples.
    fn decode_terrain_rgb(
        &self,
        id: TileId,
        rgba: &[u8],
        width: u32,
        height: u32,
    ) -> Result<ElevationGrid, TerrainError> {
        // Do the size arithmetic in `usize` with overflow checks: multiplying
        // the `u32` dimensions directly can overflow, and on 32-bit targets
        // even the `usize` byte count can.
        let expected = (width as usize)
            .checked_mul(height as usize)
            .and_then(|pixels| pixels.checked_mul(4))
            .ok_or_else(|| {
                TerrainError::Decode(format!(
                    "{}x{} RGBA image is too large to address",
                    width, height
                ))
            })?;
        if rgba.len() != expected {
            return Err(TerrainError::Decode(format!(
                "expected {} bytes for {}x{} RGBA, got {}",
                expected,
                width,
                height,
                rgba.len()
            )));
        }

        // `chunks_exact` reports an exact length, so `collect` allocates once.
        let data: Vec<_> = rgba
            .chunks_exact(4)
            .map(|pixel| self.encoding.decode(pixel[0], pixel[1], pixel[2]))
            .collect();

        ElevationGrid::from_data(id, width, height, data)
            .ok_or_else(|| TerrainError::Decode("grid size mismatch".into()))
    }

    /// Promotes queued requests to in-flight until the concurrency cap is hit
    /// or the queue is empty, then sends them.
    ///
    /// Does nothing if either state mutex is poisoned.
    fn flush_queued(&self) {
        // Decide what to send while holding the locks, but perform the sends
        // only after both guards are released, mirroring how `request` drops
        // its guards before calling into the client.
        let ready: Vec<String> = {
            let Ok(mut pending) = self.pending.lock() else {
                return;
            };
            let Ok(mut queued) = self.queued.lock() else {
                return;
            };
            let mut ready = Vec::new();
            while pending.len() < self.max_concurrent {
                let Some((url, id)) = queued.pop_front() else {
                    break;
                };
                pending.insert(url.clone(), id);
                ready.push(url);
            }
            ready
        };

        for url in ready {
            self.client.send(HttpRequest::get(url));
        }
    }
}
impl ElevationSource for HttpElevationSource {
    /// Requests the elevation tile `id`.
    ///
    /// A tile that is already in flight or queued is not requested again. The
    /// request is sent immediately while fewer than `max_concurrent` requests
    /// are in flight; otherwise it is appended to the queue. The call is a
    /// no-op if a state mutex is poisoned.
    fn request(&self, id: TileId) {
        let url = self.tile_url(&id);

        let Ok(mut pending) = self.pending.lock() else {
            return;
        };
        if pending.values().any(|in_flight| *in_flight == id) {
            return;
        }

        let Ok(mut queued) = self.queued.lock() else {
            return;
        };
        if queued.iter().any(|(_, waiting)| *waiting == id) {
            return;
        }

        if pending.len() >= self.max_concurrent {
            queued.push_back((url, id));
            return;
        }

        pending.insert(url.clone(), id);
        // Release both guards before calling into the client.
        drop(queued);
        drop(pending);
        self.client.send(HttpRequest::get(url));
    }

    /// Collects completed HTTP responses and turns them into elevation grids.
    ///
    /// Responses whose URL is not in flight are counted as ignored and
    /// skipped. Each remaining response yields one `(tile, result)` entry, in
    /// the order the client returned them; failures are also tallied in the
    /// diagnostics counters. Afterwards queued requests are promoted into the
    /// freed slots. Returns an empty vector when the client has nothing to
    /// report or the in-flight map's mutex is poisoned.
    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
        let responses = self.client.poll();
        if responses.is_empty() {
            return Vec::new();
        }

        // Phase 1: match responses to tiles. Only this part needs the
        // in-flight map, so the lock is released before any decoding starts
        // and concurrent `request` calls are not blocked behind image decode.
        let mut matched = Vec::with_capacity(responses.len());
        {
            let Ok(mut pending) = self.pending.lock() else {
                return Vec::new();
            };
            for (url, response) in responses {
                match pending.remove(&url) {
                    Some(tile_id) => matched.push((tile_id, response)),
                    None => {
                        if let Ok(mut diagnostics) = self.diagnostics.lock() {
                            diagnostics.ignored_completed_responses += 1;
                        }
                    }
                }
            }
        }

        // Phase 2: decode without holding the in-flight lock.
        let mut results = Vec::with_capacity(matched.len());
        for (tile_id, response) in matched {
            let outcome = match response {
                Ok(resp) if resp.is_success() => match &self.decoder {
                    Some(decoder) => decoder
                        .decode(&resp.body)
                        .map_err(|e| TerrainError::Decode(e.to_string()))
                        .and_then(|img| {
                            self.decode_terrain_rgb(tile_id, &img.data, img.width, img.height)
                        }),
                    // No decoder configured: the body is taken as raw
                    // 256x256 RGBA pixel data.
                    None => self.decode_terrain_rgb(tile_id, &resp.body, 256, 256),
                },
                Ok(resp) => Err(TerrainError::Network(format!("HTTP {}", resp.status))),
                Err(err) => Err(TerrainError::Network(err)),
            };

            // Single place where failures are tallied, by error category.
            if let Err(err) = &outcome {
                if let Ok(mut diagnostics) = self.diagnostics.lock() {
                    match err {
                        TerrainError::Decode(_) => diagnostics.decode_failures += 1,
                        TerrainError::Network(_) => diagnostics.network_failures += 1,
                        TerrainError::UnsupportedFormat(_) => {
                            diagnostics.unsupported_format_failures += 1
                        }
                        TerrainError::Other(_) => diagnostics.other_failures += 1,
                    }
                }
            }

            results.push((tile_id, outcome));
        }

        self.flush_queued();
        results
    }

    /// Reports queue depth, in-flight count, the concurrency cap and the
    /// accumulated failure counters.
    ///
    /// A poisoned mutex contributes `0` (or default counters) instead of
    /// failing. `cancelled_in_flight_requests` is always `0`.
    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
        let in_flight = self.pending.lock().map_or(0, |guard| guard.len());
        let waiting = self.queued.lock().map_or(0, |guard| guard.len());
        let failures = match self.diagnostics.lock() {
            Ok(guard) => (*guard).clone(),
            Err(_) => Default::default(),
        };

        Some(ElevationSourceDiagnostics {
            queued_requests: waiting,
            in_flight_requests: in_flight,
            max_concurrent_requests: self.max_concurrent,
            known_requests: waiting + in_flight,
            cancelled_in_flight_requests: 0,
            failure_diagnostics: failures,
        })
    }

    /// Removes `id` from the waiting queue.
    ///
    /// Returns `true` if a queued entry was removed. In-flight requests are
    /// not affected, and `false` is returned if the queue mutex is poisoned.
    fn cancel(&self, id: TileId) -> bool {
        let Ok(mut queued) = self.queued.lock() else {
            return false;
        };
        let before = queued.len();
        queued.retain(|(_, waiting)| *waiting != id);
        queued.len() != before
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// URL template shared by every test in this module.
    const TEMPLATE: &str = "https://example.com/{z}/{x}/{y}.png";

    /// In-memory `HttpClient`: records sent URLs and hands back canned
    /// responses on the next `poll`.
    struct MockClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
    }

    impl MockClient {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
                responses: StdMutex::new(Vec::new()),
            }
        }

        /// Queues a response with the given status code for `url`.
        fn queue_status(&self, url: &str, status: u16, body: Vec<u8>) {
            let response = HttpResponse {
                status,
                body,
                headers: vec![],
            };
            self.responses
                .lock()
                .unwrap()
                .push((url.to_string(), Ok(response)));
        }

        /// Queues a successful (HTTP 200) response for `url`.
        fn queue_response(&self, url: &str, body: Vec<u8>) {
            self.queue_status(url, 200, body);
        }
    }

    impl HttpClient for MockClient {
        fn send(&self, request: Req) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let mut queued = self.responses.lock().unwrap();
            std::mem::take(&mut *queued)
        }
    }

    /// Four RGBA pixels (a 2x2 image) that each decode to sea level under the
    /// Terrarium encoding.
    fn terrarium_sea_level_tile() -> Vec<u8> {
        [128u8, 0, 0, 255].repeat(4)
    }

    /// Decoder-less Terrarium source built on top of `client`.
    fn terrarium_source(client: MockClient) -> HttpElevationSource {
        HttpElevationSource::new(TEMPLATE, Box::new(client), TerrainRgbEncoding::Terrarium)
    }

    #[test]
    fn request_and_poll() {
        /// Pass-through decoder that labels the body as a 2x2 RGBA image.
        struct SeaLevelDecoder;
        impl crate::tile_source::TileDecoder for SeaLevelDecoder {
            fn decode(
                &self,
                bytes: &[u8],
            ) -> Result<crate::tile_source::DecodedImage, crate::tile_source::TileError>
            {
                Ok(crate::tile_source::DecodedImage {
                    width: 2,
                    height: 2,
                    data: std::sync::Arc::new(bytes.to_vec()),
                })
            }
        }

        let client = MockClient::new();
        client.queue_response("https://example.com/0/0/0.png", terrarium_sea_level_tile());

        let source = HttpElevationSource::with_decoder(
            TEMPLATE,
            Box::new(client),
            TerrainRgbEncoding::Terrarium,
            Box::new(SeaLevelDecoder),
        );
        let root = TileId::new(0, 0, 0);
        source.request(root);

        let results = source.poll();
        assert_eq!(results.len(), 1);
        let (id, grid_result) = &results[0];
        assert_eq!(*id, TileId::new(0, 0, 0));
        let grid = grid_result.as_ref().expect("should decode");
        assert_eq!(grid.width, 2);
        assert_eq!(grid.height, 2);
    }

    #[test]
    fn decode_terrarium_sea_level() {
        let source = terrarium_source(MockClient::new());
        let pixels = terrarium_sea_level_tile();

        let grid = source
            .decode_terrain_rgb(TileId::new(0, 0, 0), &pixels, 2, 2)
            .expect("decode");

        assert_eq!(grid.width, 2);
        assert_eq!(grid.height, 2);
        assert!((grid.data[0] - 0.0).abs() < 0.01);
    }

    #[test]
    fn http_error_maps_to_terrain_error() {
        let client = MockClient::new();
        client.queue_status("https://example.com/0/0/0.png", 404, vec![]);

        let source = terrarium_source(client);
        source.request(TileId::new(0, 0, 0));

        let results = source.poll();
        assert_eq!(results.len(), 1);
        assert!(results[0].1.is_err());
    }

    #[test]
    fn debug_impl() {
        let source = terrarium_source(MockClient::new());
        let dbg = format!("{source:?}");
        assert!(dbg.contains("HttpElevationSource"));
    }

    #[test]
    fn respects_concurrency_cap_and_queues_excess_requests() {
        let source = terrarium_source(MockClient::new()).with_max_concurrent_requests(1);

        for y in 0..2 {
            source.request(TileId::new(1, 0, y));
        }

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 1);
        assert_eq!(diagnostics.max_concurrent_requests, 1);
        assert_eq!(diagnostics.known_requests, 2);
    }

    #[test]
    fn cancel_removes_queued_request_only() {
        let source = terrarium_source(MockClient::new()).with_max_concurrent_requests(1);
        let a = TileId::new(1, 0, 0);
        let b = TileId::new(1, 0, 1);
        source.request(a);
        source.request(b);

        assert!(!source.cancel(a), "in-flight request should not cancel");
        assert!(source.cancel(b), "queued request should cancel");

        let diagnostics = source.diagnostics().expect("terrain diagnostics");
        assert_eq!(diagnostics.in_flight_requests, 1);
        assert_eq!(diagnostics.queued_requests, 0);
        assert_eq!(diagnostics.known_requests, 1);
    }
}