use rustial_engine::{
ElevationGrid, ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
TerrainError, TerrainRgbEncoding, TileId,
};
use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Default cap on the number of terrain tile requests that may be in flight
/// at once; `TerrainSourceInner::new` uses it as the initial `max_concurrent`.
const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;
fn fetch_terrain_rgb_tile(
id: TileId,
url: &str,
client: &reqwest::blocking::Client,
encoding: TerrainRgbEncoding,
) -> Result<ElevationGrid, TerrainError> {
let resp = client
.get(url)
.send()
.map_err(|e| TerrainError::Network(format!("{e}")))?;
if !resp.status().is_success() {
return Err(TerrainError::Network(format!("HTTP {}", resp.status())));
}
let bytes = resp
.bytes()
.map_err(|e| TerrainError::Network(format!("{e}")))?;
let img = image::load_from_memory(&bytes).map_err(|e| TerrainError::Decode(format!("{e}")))?;
let rgba = img.to_rgba8();
let (w, h) = (rgba.width(), rgba.height());
let data: Vec<f32> = rgba
.pixels()
.map(|p| encoding.decode(p[0], p[1], p[2]))
.collect();
ElevationGrid::from_data(id, w, h, data)
.ok_or_else(|| TerrainError::Decode("grid size mismatch".into()))
}
/// Shared implementation behind the public terrain sources in this file:
/// fetches terrain-RGB tiles on worker threads with a cap on concurrency.
// NOTE(review): the lint allowance is presumably needed for the nested type of
// `results` — confirm before removing.
#[allow(clippy::type_complexity)]
struct TerrainSourceInner {
/// URL template; `{z}`, `{x}` and `{y}` are substituted by `tile_url`.
url_template: String,
/// Pixel encoding handed to `fetch_terrain_rgb_tile` for decoding samples.
encoding: TerrainRgbEncoding,
/// Blocking HTTP client, shared with every worker thread.
client: Arc<reqwest::blocking::Client>,
/// Finished fetches pushed by worker threads and drained by `poll`.
results: Arc<Mutex<Vec<(TileId, Result<ElevationGrid, TerrainError>)>>>,
/// Tiles whose request has been spawned and whose result has not yet been
/// collected by `poll`.
in_flight: Mutex<HashSet<TileId>>,
/// Tiles waiting, in FIFO order, for a free concurrency slot.
queued: Mutex<VecDeque<TileId>>,
/// Upper bound on the size of `in_flight`.
max_concurrent: usize,
/// Per-category failure counters, updated by `poll`.
diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
}
impl TerrainSourceInner {
    /// Builds a source for `url_template` whose tiles are decoded with `encoding`.
    ///
    /// The HTTP client is configured with the `rustial/0.1` user agent and a
    /// 15-second timeout; if that configuration cannot be built, a default
    /// client is used instead. Concurrency starts at
    /// `DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS`.
    fn new(url_template: String, encoding: TerrainRgbEncoding) -> Self {
        let configured = reqwest::blocking::Client::builder()
            .user_agent("rustial/0.1")
            .timeout(Duration::from_secs(15))
            .build();
        let client = match configured {
            Ok(client) => client,
            Err(_) => reqwest::blocking::Client::new(),
        };

        Self {
            url_template,
            encoding,
            client: Arc::new(client),
            results: Arc::new(Mutex::new(Vec::new())),
            in_flight: Mutex::new(HashSet::new()),
            queued: Mutex::new(VecDeque::new()),
            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
        }
    }

    /// Sets the concurrency cap, treating any value below 1 as 1.
    fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
        self.max_concurrent = usize::max(max_concurrent, 1);
        self
    }

    /// Expands the URL template for `id` by replacing `{z}`, `{x}` and `{y}`
    /// (in that order) with the tile's zoom, x and y.
    fn tile_url(&self, id: TileId) -> String {
        let mut url = self.url_template.clone();
        for (placeholder, value) in [
            ("{z}", id.zoom.to_string()),
            ("{x}", id.x.to_string()),
            ("{y}", id.y.to_string()),
        ] {
            url = url.replace(placeholder, &value);
        }
        url
    }

    /// Starts a detached worker thread that fetches tile `id` and appends the
    /// outcome to `results`.
    ///
    /// The caller is responsible for having recorded `id` in `in_flight`.
    fn spawn_request(&self, id: TileId) {
        let url = self.tile_url(id);
        let encoding = self.encoding;
        let client = Arc::clone(&self.client);
        let sink = Arc::clone(&self.results);

        std::thread::spawn(move || {
            let outcome = fetch_terrain_rgb_tile(id, &url, &client, encoding);
            // A poisoned results lock means the outcome is discarded.
            let Ok(mut finished) = sink.lock() else {
                return;
            };
            finished.push((id, outcome));
        });
    }

    /// Picks the next queued tile that may start now and marks it in-flight.
    ///
    /// Returns `None` when the concurrency cap is reached, the queue is empty,
    /// or either lock is poisoned. Queued ids that are already in-flight are
    /// dropped from the queue. Locks are taken in the order `in_flight`, then
    /// `queued`, and both are released before returning.
    fn claim_next_queued(&self) -> Option<TileId> {
        let mut active = self.in_flight.lock().ok()?;
        let mut waiting = self.queued.lock().ok()?;
        while active.len() < self.max_concurrent {
            let candidate = waiting.pop_front()?;
            if active.insert(candidate) {
                return Some(candidate);
            }
        }
        None
    }

    /// Promotes queued tiles to in-flight requests until the concurrency cap
    /// is reached or the queue runs dry.
    ///
    /// No lock is held while a worker thread is being spawned.
    fn flush_queued(&self) {
        while let Some(id) = self.claim_next_queued() {
            self.spawn_request(id);
        }
    }
}
// Lock-poison policy for this impl: every method recovers the guard with
// `PoisonError::into_inner`. The guarded values are plain collections and
// counters, so they stay structurally valid even if a holder panicked, and
// skipping the work instead would leave in-flight slots occupied forever.
impl ElevationSource for TerrainSourceInner {
    /// Requests tile `id`.
    ///
    /// Does nothing if the tile is already in flight or queued. Otherwise the
    /// fetch starts immediately when a concurrency slot is free, or the tile
    /// is appended to the queue.
    fn request(&self, id: TileId) {
        // Lock order: `in_flight` before `queued`.
        let mut in_flight = self.in_flight.lock().unwrap_or_else(|e| e.into_inner());
        if in_flight.contains(&id) {
            return;
        }
        let mut queued = self.queued.lock().unwrap_or_else(|e| e.into_inner());
        if queued.contains(&id) {
            return;
        }
        if in_flight.len() < self.max_concurrent {
            in_flight.insert(id);
            // Release both locks before spawning the worker thread.
            drop(queued);
            drop(in_flight);
            self.spawn_request(id);
        } else {
            queued.push_back(id);
        }
    }

    /// Drains and returns every fetch that finished since the previous call.
    ///
    /// Finished tiles are removed from the in-flight set, queued tiles are
    /// started in the freed slots via `flush_queued`, and the failure counters
    /// are updated for each returned error.
    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
        let completed =
            std::mem::take(&mut *self.results.lock().unwrap_or_else(|e| e.into_inner()));

        {
            let mut in_flight = self.in_flight.lock().unwrap_or_else(|e| e.into_inner());
            for (id, _) in &completed {
                in_flight.remove(id);
            }
        }

        self.flush_queued();

        {
            let mut diagnostics = self.diagnostics.lock().unwrap_or_else(|e| e.into_inner());
            let failures = completed
                .iter()
                .filter_map(|(_, outcome)| outcome.as_ref().err());
            for failure in failures {
                match failure {
                    TerrainError::Network(_) => diagnostics.network_failures += 1,
                    TerrainError::Decode(_) => diagnostics.decode_failures += 1,
                    TerrainError::UnsupportedFormat(_) => {
                        diagnostics.unsupported_format_failures += 1
                    }
                    TerrainError::Other(_) => diagnostics.other_failures += 1,
                }
            }
        }

        completed
    }

    /// Reports the queue length, in-flight count, concurrency cap and
    /// accumulated failure counters.
    ///
    /// The three locks are taken one after another, so the counts are not an
    /// atomic snapshot. `cancelled_in_flight_requests` is always 0.
    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
        let in_flight_requests = self
            .in_flight
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let queued_requests = self.queued.lock().unwrap_or_else(|e| e.into_inner()).len();
        let failure_diagnostics = self
            .diagnostics
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clone();

        Some(ElevationSourceDiagnostics {
            queued_requests,
            in_flight_requests,
            max_concurrent_requests: self.max_concurrent,
            known_requests: queued_requests + in_flight_requests,
            cancelled_in_flight_requests: 0,
            failure_diagnostics,
        })
    }

    /// Removes `id` from the queue.
    ///
    /// Returns `true` if a queued entry was removed. Requests that are already
    /// in flight are not affected and yield `false`.
    fn cancel(&self, id: TileId) -> bool {
        let mut queued = self.queued.lock().unwrap_or_else(|e| e.into_inner());
        let before = queued.len();
        queued.retain(|queued_id| *queued_id != id);
        queued.len() != before
    }
}
/// Elevation source that fetches tiles from a URL template and decodes them
/// with [`TerrainRgbEncoding::Terrarium`].
pub struct AwsTerrainSource(TerrainSourceInner);

impl AwsTerrainSource {
    /// URL template used by [`AwsTerrainSource::new`]; `{z}`, `{x}` and `{y}`
    /// are replaced per tile.
    pub const DEFAULT_URL: &str =
        "https://s3.amazonaws.com/elevation-tiles-prod/terrarium/{z}/{x}/{y}.png";

    /// Creates a source that reads from [`Self::DEFAULT_URL`].
    pub fn new() -> Self {
        Self::with_url(Self::DEFAULT_URL)
    }

    /// Creates a source that reads from a custom URL template while keeping
    /// the Terrarium encoding.
    pub fn with_url(url_template: impl Into<String>) -> Self {
        let inner = TerrainSourceInner::new(url_template.into(), TerrainRgbEncoding::Terrarium);
        Self(inner)
    }

    /// Sets the maximum number of concurrent requests; values below 1 are
    /// treated as 1.
    pub fn with_max_concurrent_requests(self, max_concurrent: usize) -> Self {
        let Self(inner) = self;
        Self(inner.with_max_concurrent_requests(max_concurrent))
    }
}

impl Default for AwsTerrainSource {
    /// Equivalent to [`AwsTerrainSource::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Forwards every call to the wrapped `TerrainSourceInner`.
impl ElevationSource for AwsTerrainSource {
    fn request(&self, id: TileId) {
        let Self(inner) = self;
        inner.request(id);
    }

    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
        let Self(inner) = self;
        inner.poll()
    }

    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
        let Self(inner) = self;
        inner.diagnostics()
    }

    fn cancel(&self, id: TileId) -> bool {
        let Self(inner) = self;
        inner.cancel(id)
    }
}
/// Elevation source that fetches tiles from a caller-supplied URL template
/// and decodes them with [`TerrainRgbEncoding::Mapbox`].
pub struct MapboxTerrainSource(TerrainSourceInner);

impl MapboxTerrainSource {
    /// Creates a source for `url_template`, in which `{z}`, `{x}` and `{y}`
    /// are replaced per tile.
    pub fn new(url_template: impl Into<String>) -> Self {
        let inner = TerrainSourceInner::new(url_template.into(), TerrainRgbEncoding::Mapbox);
        Self(inner)
    }

    /// Sets the maximum number of concurrent requests; values below 1 are
    /// treated as 1.
    pub fn with_max_concurrent_requests(self, max_concurrent: usize) -> Self {
        let Self(inner) = self;
        Self(inner.with_max_concurrent_requests(max_concurrent))
    }
}
/// Forwards every call to the wrapped `TerrainSourceInner`.
impl ElevationSource for MapboxTerrainSource {
    fn request(&self, id: TileId) {
        let Self(inner) = self;
        inner.request(id);
    }

    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
        let Self(inner) = self;
        inner.poll()
    }

    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
        let Self(inner) = self;
        inner.diagnostics()
    }

    fn cancel(&self, id: TileId) -> bool {
        let Self(inner) = self;
        inner.cancel(id)
    }
}