// rustial 0.0.1
//
// A geospatial map library for Rust
//! Built-in [`ElevationSource`] implementations backed by `reqwest` + `image`.
//!
//! Enabled by the `terrain-rgb` feature flag.
//!
//! # Provided sources
//!
//! | Source | URL template | Encoding |
//! |--------|------|----------|
//! | [`AwsTerrainSource`] | AWS Terrain Tiles (Terrarium) | [`TerrainRgbEncoding::Terrarium`] |
//! | [`MapboxTerrainSource`] | Mapbox / MapTiler (user-supplied URL) | [`TerrainRgbEncoding::Mapbox`] |
//!
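//! Both sources fetch each tile on its own background thread so the
//! calling thread (engine update loop) is never blocked.  At most 32
//! fetches run concurrently by default; further requests are queued until
//! a slot frees up.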
//!
//! # Example
//!
//! ```rust,ignore
//! use rustial::{AwsTerrainSource, TerrainConfig};
//!
//! let config = TerrainConfig {
//!     enabled: true,
//!     source: Box::new(AwsTerrainSource::new()),
//!     ..TerrainConfig::default()
//! };
//! ```

use rustial_engine::{
    ElevationGrid, ElevationSource, ElevationSourceDiagnostics, ElevationSourceFailureDiagnostics,
    TerrainError, TerrainRgbEncoding, TileId,
};
use std::collections::{HashSet, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Default cap on simultaneously running tile fetches.
///
/// Used as the initial `max_concurrent` value by `TerrainSourceInner::new`;
/// callers can override it through `with_max_concurrent_requests`.
const DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS: usize = 32;

// ---------------------------------------------------------------------------
// Shared internals
// ---------------------------------------------------------------------------

/// Fetch a Terrain-RGB PNG tile and decode it into an [`ElevationGrid`].
fn fetch_terrain_rgb_tile(
    id: TileId,
    url: &str,
    client: &reqwest::blocking::Client,
    encoding: TerrainRgbEncoding,
) -> Result<ElevationGrid, TerrainError> {
    let resp = client
        .get(url)
        .send()
        .map_err(|e| TerrainError::Network(format!("{e}")))?;

    if !resp.status().is_success() {
        return Err(TerrainError::Network(format!("HTTP {}", resp.status())));
    }

    let bytes = resp
        .bytes()
        .map_err(|e| TerrainError::Network(format!("{e}")))?;

    let img = image::load_from_memory(&bytes).map_err(|e| TerrainError::Decode(format!("{e}")))?;
    let rgba = img.to_rgba8();
    let (w, h) = (rgba.width(), rgba.height());

    let data: Vec<f32> = rgba
        .pixels()
        .map(|p| encoding.decode(p[0], p[1], p[2]))
        .collect();

    ElevationGrid::from_data(id, w, h, data)
        .ok_or_else(|| TerrainError::Decode("grid size mismatch".into()))
}

/// Shared plumbing for a thread-per-request terrain source.
///
/// Holds the HTTP client, the bookkeeping for running and waiting tile
/// requests, and the buffer that worker threads deliver results into.
// The nested tuple/`Result` type of `results` is what trips clippy's
// `type_complexity` lint.
#[allow(clippy::type_complexity)]
struct TerrainSourceInner {
    /// URL template containing `{z}`, `{x}` and `{y}` placeholders.
    url_template: String,
    /// Pixel-to-elevation encoding used when decoding fetched tiles.
    encoding: TerrainRgbEncoding,
    /// Blocking HTTP client shared with every worker thread.
    client: Arc<reqwest::blocking::Client>,
    /// Finished fetches pushed by worker threads and drained by `poll`.
    results: Arc<Mutex<Vec<(TileId, Result<ElevationGrid, TerrainError>)>>>,
    /// Tiles that have been handed to a worker thread and whose result has
    /// not yet been returned from `poll`.
    in_flight: Mutex<HashSet<TileId>>,
    /// Tiles waiting (in FIFO order) for an in-flight slot to free up.
    queued: Mutex<VecDeque<TileId>>,
    /// Upper bound on the size of `in_flight`.
    max_concurrent: usize,
    /// Failure counters, updated by `poll` for every failed result.
    diagnostics: Mutex<ElevationSourceFailureDiagnostics>,
}

impl TerrainSourceInner {
    fn new(url_template: String, encoding: TerrainRgbEncoding) -> Self {
        let client = reqwest::blocking::Client::builder()
            .user_agent("rustial/0.1")
            .timeout(Duration::from_secs(15))
            .build()
            .unwrap_or_else(|_| reqwest::blocking::Client::new());

        Self {
            url_template,
            encoding,
            client: Arc::new(client),
            results: Arc::new(Mutex::new(Vec::new())),
            in_flight: Mutex::new(HashSet::new()),
            queued: Mutex::new(VecDeque::new()),
            max_concurrent: DEFAULT_MAX_CONCURRENT_TERRAIN_REQUESTS,
            diagnostics: Mutex::new(ElevationSourceFailureDiagnostics::default()),
        }
    }

    fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
        self.max_concurrent = max_concurrent.max(1);
        self
    }

    fn tile_url(&self, id: TileId) -> String {
        self.url_template
            .replace("{z}", &id.zoom.to_string())
            .replace("{x}", &id.x.to_string())
            .replace("{y}", &id.y.to_string())
    }

    fn spawn_request(&self, id: TileId) {
        let url = self.tile_url(id);
        let client = Arc::clone(&self.client);
        let results = Arc::clone(&self.results);
        let encoding = self.encoding;

        std::thread::spawn(move || {
            let result = fetch_terrain_rgb_tile(id, &url, &client, encoding);
            if let Ok(mut r) = results.lock() {
                r.push((id, result));
            }
        });
    }

    fn flush_queued(&self) {
        let mut in_flight = match self.in_flight.lock() {
            Ok(s) => s,
            Err(_) => return,
        };
        let mut queued = match self.queued.lock() {
            Ok(q) => q,
            Err(_) => return,
        };

        while in_flight.len() < self.max_concurrent {
            let Some(id) = queued.pop_front() else {
                break;
            };
            if !in_flight.insert(id) {
                continue;
            }
            drop(queued);
            drop(in_flight);
            self.spawn_request(id);
            in_flight = match self.in_flight.lock() {
                Ok(s) => s,
                Err(_) => return,
            };
            queued = match self.queued.lock() {
                Ok(q) => q,
                Err(_) => return,
            };
        }
    }
}

// Every lock acquisition in this impl recovers from poisoning with
// `into_inner`.  The guarded values are plain collections and counters, so
// continuing with their current contents is preferable to silently
// skipping bookkeeping (which would leave tiles stuck in `in_flight`).
impl ElevationSource for TerrainSourceInner {
    /// Request tile `id`.
    ///
    /// Does nothing if the tile is already in flight or queued.  Otherwise
    /// the tile is fetched immediately when a concurrency slot is free, or
    /// appended to the queue when the cap has been reached.
    fn request(&self, id: TileId) {
        let mut in_flight = self.in_flight.lock().unwrap_or_else(|e| e.into_inner());
        if in_flight.contains(&id) {
            return;
        }
        let mut queued = self.queued.lock().unwrap_or_else(|e| e.into_inner());
        if queued.contains(&id) {
            return;
        }

        if in_flight.len() < self.max_concurrent {
            in_flight.insert(id);
            // Release both locks before spawning the worker thread.
            drop(queued);
            drop(in_flight);
            self.spawn_request(id);
        } else {
            queued.push_back(id);
        }
    }

    /// Drain and return every result delivered since the previous call.
    ///
    /// Completed tiles give up their in-flight slot, queued tiles are
    /// promoted into the freed slots, and the failure counters are updated
    /// for each failed result.
    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
        let completed = {
            let mut results = self.results.lock().unwrap_or_else(|e| e.into_inner());
            std::mem::take(&mut *results)
        };

        {
            let mut in_flight = self.in_flight.lock().unwrap_or_else(|e| e.into_inner());
            for (id, _) in &completed {
                in_flight.remove(id);
            }
            // The guard must be released here: `flush_queued` locks
            // `in_flight` itself.
        }

        self.flush_queued();

        {
            let mut diagnostics = self.diagnostics.lock().unwrap_or_else(|e| e.into_inner());
            for err in completed.iter().filter_map(|(_, result)| result.as_ref().err()) {
                match err {
                    TerrainError::Network(_) => diagnostics.network_failures += 1,
                    TerrainError::Decode(_) => diagnostics.decode_failures += 1,
                    TerrainError::UnsupportedFormat(_) => {
                        diagnostics.unsupported_format_failures += 1
                    }
                    TerrainError::Other(_) => diagnostics.other_failures += 1,
                }
            }
        }

        completed
    }

    /// Snapshot of the queue depth, in-flight count, concurrency cap and
    /// accumulated failure counters.
    ///
    /// `cancelled_in_flight_requests` is always `0`: this source never
    /// cancels a fetch that has already started.
    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
        // Each lock is taken and released on its own line; no two locks
        // are held at the same time.
        let pending = self
            .in_flight
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .len();
        let queued = self.queued.lock().unwrap_or_else(|e| e.into_inner()).len();
        let failures = self
            .diagnostics
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .clone();

        Some(ElevationSourceDiagnostics {
            queued_requests: queued,
            in_flight_requests: pending,
            max_concurrent_requests: self.max_concurrent,
            known_requests: queued + pending,
            cancelled_in_flight_requests: 0,
            failure_diagnostics: failures,
        })
    }

    /// Remove `id` from the queue of not-yet-started requests.
    ///
    /// Returns `true` if the tile was queued and has been removed.  Tiles
    /// that are already in flight are not affected and yield `false`.
    fn cancel(&self, id: TileId) -> bool {
        let mut queued = self.queued.lock().unwrap_or_else(|e| e.into_inner());
        let before = queued.len();
        queued.retain(|queued_id| *queued_id != id);
        queued.len() != before
    }
}

// ---------------------------------------------------------------------------
// AWS Terrain Tiles (Terrarium encoding)
// ---------------------------------------------------------------------------

/// Ready-to-use [`ElevationSource`] that fetches Terrain-RGB tiles from
/// the [AWS Terrain Tiles](https://registry.opendata.aws/terrain-tiles/)
/// public S3 bucket using **Terrarium** encoding.
///
/// Tiles are fetched as PNG over HTTPS on background threads.  No API
/// key is required.  The number of fetches running at the same time is
/// capped; see [`AwsTerrainSource::with_max_concurrent_requests`].
///
/// # Default URL
///
/// The `{z}`, `{x}` and `{y}` placeholders are replaced with the tile
/// coordinates for every request.
///
/// ```text
/// https://s3.amazonaws.com/elevation-tiles-prod/terrarium/{z}/{x}/{y}.png
/// ```
///
/// # Example
///
/// ```rust,ignore
/// use rustial::{AwsTerrainSource, TerrainConfig};
///
/// let config = TerrainConfig {
///     enabled: true,
///     source: Box::new(AwsTerrainSource::new()),
///     ..TerrainConfig::default()
/// };
/// ```
pub struct AwsTerrainSource(TerrainSourceInner);

impl AwsTerrainSource {
    /// Default AWS Terrain Tiles URL template.
    pub const DEFAULT_URL: &str =
        "https://s3.amazonaws.com/elevation-tiles-prod/terrarium/{z}/{x}/{y}.png";

    /// Create a new source using the default AWS URL.
    pub fn new() -> Self {
        Self(TerrainSourceInner::new(
            Self::DEFAULT_URL.to_string(),
            TerrainRgbEncoding::Terrarium,
        ))
    }

    /// Create a source with a custom Terrarium-encoded tile URL.
    pub fn with_url(url_template: impl Into<String>) -> Self {
        Self(TerrainSourceInner::new(
            url_template.into(),
            TerrainRgbEncoding::Terrarium,
        ))
    }

    /// Override the maximum number of concurrent terrain HTTP requests.
    pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
        self.0 = self.0.with_max_concurrent_requests(max_concurrent);
        self
    }
}

impl Default for AwsTerrainSource {
    fn default() -> Self {
        Self::new()
    }
}

// Pure delegation: every call is forwarded unchanged to the wrapped
// `TerrainSourceInner`.
impl ElevationSource for AwsTerrainSource {
    fn request(&self, id: TileId) {
        let Self(inner) = self;
        inner.request(id);
    }

    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
        let Self(inner) = self;
        inner.poll()
    }

    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
        let Self(inner) = self;
        inner.diagnostics()
    }

    fn cancel(&self, id: TileId) -> bool {
        let Self(inner) = self;
        inner.cancel(id)
    }
}

// ---------------------------------------------------------------------------
// Mapbox / MapTiler Terrain-RGB
// ---------------------------------------------------------------------------

/// Ready-to-use [`ElevationSource`] for Mapbox / MapTiler terrain tiles
/// using **Mapbox Terrain-RGB** encoding.
///
/// Requires a user-supplied URL template (which typically includes an
/// API key).  Tiles are fetched on background threads, and the number of
/// fetches running at the same time is capped; see
/// [`MapboxTerrainSource::with_max_concurrent_requests`].
///
/// # URL template example
///
/// The `{z}`, `{x}` and `{y}` placeholders are replaced with the tile
/// coordinates for every request.
///
/// ```text
/// https://api.mapbox.com/v4/mapbox.terrain-rgb/{z}/{x}/{y}.pngraw?access_token=YOUR_TOKEN
/// ```
///
/// # Example
///
/// ```rust,ignore
/// use rustial::{MapboxTerrainSource, TerrainConfig};
///
/// let url = "https://api.mapbox.com/v4/mapbox.terrain-rgb/{z}/{x}/{y}.pngraw?access_token=pk.xxx";
/// let config = TerrainConfig {
///     enabled: true,
///     source: Box::new(MapboxTerrainSource::new(url)),
///     ..TerrainConfig::default()
/// };
/// ```
pub struct MapboxTerrainSource(TerrainSourceInner);

impl MapboxTerrainSource {
    /// Create a new Mapbox terrain source with the given URL template.
    pub fn new(url_template: impl Into<String>) -> Self {
        Self(TerrainSourceInner::new(
            url_template.into(),
            TerrainRgbEncoding::Mapbox,
        ))
    }

    /// Override the maximum number of concurrent terrain HTTP requests.
    pub fn with_max_concurrent_requests(mut self, max_concurrent: usize) -> Self {
        self.0 = self.0.with_max_concurrent_requests(max_concurrent);
        self
    }
}

// Pure delegation: every call is forwarded unchanged to the wrapped
// `TerrainSourceInner`.
impl ElevationSource for MapboxTerrainSource {
    fn request(&self, id: TileId) {
        let Self(inner) = self;
        inner.request(id);
    }

    fn poll(&self) -> Vec<(TileId, Result<ElevationGrid, TerrainError>)> {
        let Self(inner) = self;
        inner.poll()
    }

    fn diagnostics(&self) -> Option<ElevationSourceDiagnostics> {
        let Self(inner) = self;
        inner.diagnostics()
    }

    fn cancel(&self, id: TileId) -> bool {
        let Self(inner) = self;
        inner.cancel(id)
    }
}