use crate::io::{FetchPool, HttpClient, HttpResponse};
use crate::tile_source::{
DecodedImage, TileData, TileDecoder, TileError, TileFreshness, TileResponse, TileSource,
TileSourceDiagnostics, TileSourceFailureDiagnostics,
};
use rustial_math::TileId;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
/// Default raster tile URL template.
///
/// Contains `{z}`, `{x}` and `{y}` placeholders and is the `url_template`
/// installed by `PooledRasterTileSourceConfig::default()`.
pub const DEFAULT_RASTER_TILE_URL: &str =
"https://a.basemaps.cartocdn.com/light_all/{z}/{x}/{y}.png";
/// Default `User-Agent` header value.
///
/// `PooledRasterTileSourceConfig::default()` installs it as its only header.
pub const DEFAULT_RASTER_TILE_USER_AGENT: &str =
"rustial-example/0.1 (+https://github.com/user/rustial25d)";
/// Configuration values for a pooled raster tile source.
///
/// NOTE(review): nothing in this file reads this struct, so the field notes
/// below are derived from the field names and the `Default` values only;
/// confirm them against the code that consumes the configuration.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PooledRasterTileSourceConfig {
/// Tile URL template (default: `DEFAULT_RASTER_TILE_URL`).
pub url_template: String,
/// Header `(name, value)` pairs (default: a single `User-Agent` entry).
pub headers: Vec<(String, String)>,
/// Presumably the lowest zoom level the source serves (default: 0) — TODO confirm.
pub source_min_zoom: u8,
/// Presumably the highest zoom level the source serves (default: 19) — TODO confirm.
pub source_max_zoom: u8,
/// Presumably the cap on simultaneous requests (default: 32) — TODO confirm.
pub max_concurrent: usize,
/// Presumably the cap on cached tiles (default: 768) — TODO confirm.
pub max_cached: usize,
}
impl Default for PooledRasterTileSourceConfig {
    /// Builds the shared raster defaults: the default tile URL template, a
    /// single `User-Agent` header, zoom range 0..=19, 32 concurrent requests
    /// and 768 cached tiles.
    fn default() -> Self {
        let user_agent = (
            String::from("User-Agent"),
            String::from(DEFAULT_RASTER_TILE_USER_AGENT),
        );
        Self {
            url_template: String::from(DEFAULT_RASTER_TILE_URL),
            headers: vec![user_agent],
            source_min_zoom: 0,
            source_max_zoom: 19,
            max_concurrent: 32,
            max_cached: 768,
        }
    }
}
/// Heuristically decides whether a transport error message describes a timeout.
///
/// The message is compared case-insensitively (ASCII) and is treated as a
/// timeout when it contains either `"timeout"` or `"timed out"`. The second
/// phrasing is what the standard library's `io::ErrorKind::TimedOut` renders
/// as, so matching only `"timeout"` would miss such errors.
///
/// Returns `false` for an empty message or any other wording.
fn is_timeout_error(error: &str) -> bool {
    let lowered = error.to_ascii_lowercase();
    lowered.contains("timeout") || lowered.contains("timed out")
}
/// Extracts the `max-age` value, in seconds, from a `Cache-Control` header value.
///
/// The header is split on commas and each directive is inspected in order.
/// The directive name is compared case-insensitively, because HTTP cache
/// directive names are case-insensitive tokens (`Max-Age=60` is as valid as
/// `max-age=60`). The argument may be wrapped in double quotes and may carry
/// surrounding whitespace.
///
/// Returns the first `max-age` argument that parses as `u64`; a `max-age`
/// directive with an unparsable argument is skipped and later directives are
/// still considered. Returns `None` when no usable directive exists.
fn parse_cache_control_max_age(value: &str) -> Option<u64> {
    value.split(',').find_map(|directive| {
        // Directives without an argument (e.g. `no-store`) have no `=`.
        let (name, argument) = directive.trim().split_once('=')?;
        if !name.trim().eq_ignore_ascii_case("max-age") {
            return None;
        }
        argument.trim().trim_matches('"').parse::<u64>().ok()
    })
}
/// Reads the response's `age` header as a whole number of seconds.
///
/// Yields 0 when the header is missing or its value does not parse as `u64`.
fn parse_age_seconds(response: &HttpResponse) -> u64 {
    match response.header("age") {
        Some(raw) => raw.parse::<u64>().unwrap_or(0),
        None => 0,
    }
}
/// Derives tile freshness metadata from an HTTP response's caching headers.
///
/// The expiry time is computed from `cache-control: max-age` minus the `age`
/// header (saturating at zero) added to the current system time. When that
/// yields nothing (no usable `max-age`, or the addition overflows), the
/// `expires` header is parsed as an HTTP date instead. The `etag` and
/// `last-modified` header values are copied through unchanged.
fn parse_http_freshness(response: &HttpResponse) -> TileFreshness {
    let now = SystemTime::now();
    let age = parse_age_seconds(response);

    // Preferred source: remaining lifetime according to max-age and age.
    let from_max_age = response
        .header("cache-control")
        .and_then(parse_cache_control_max_age)
        .and_then(|max_age| {
            let remaining = Duration::from_secs(max_age.saturating_sub(age));
            now.checked_add(remaining)
        });

    // Fallback: absolute expiry date from the `expires` header.
    let expires_at = match from_max_age {
        Some(deadline) => Some(deadline),
        None => response
            .header("expires")
            .and_then(|raw| httpdate::parse_http_date(raw).ok()),
    };

    let etag = response.header("etag").map(|text| text.to_owned());
    let last_modified = response
        .header("last-modified")
        .map(|text| text.to_owned());

    TileFreshness {
        expires_at,
        etag,
        last_modified,
    }
}
/// Tile source that fetches tiles over HTTP through a `FetchPool`.
///
/// Requests are keyed by their URL: each issued request is remembered in
/// `pending` so that a completed response can be mapped back to its tile.
pub struct PooledTileSource {
/// URL template; `{z}`, `{x}` and `{y}` are substituted per tile.
url_template: String,
/// Pool that requests are enqueued on and completed responses are polled from.
pool: FetchPool,
/// Request URL -> tile it was issued for. Entries are inserted when a tile
/// is requested and removed when its response is polled or it is cancelled.
pending: Mutex<HashMap<String, TileId>>,
/// Optional decoder for successful response bodies. When absent, the raw
/// body is wrapped as a 256x256 image without decoding.
decoder: Option<Box<dyn TileDecoder>>,
/// Headers attached to every outgoing request.
default_headers: Vec<(String, String)>,
/// Running counters for failures, forced cancellations and ignored responses.
failure_diagnostics: Mutex<TileSourceFailureDiagnostics>,
}
impl std::fmt::Debug for PooledTileSource {
    /// Formats a summary of the source: its URL template, whether a decoder
    /// is installed, and the pool's queued / in-flight request counts.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_decoder = self.decoder.is_some();
        let queued = self.pool.queued_count();
        let in_flight = self.pool.in_flight_count();

        let mut builder = f.debug_struct("PooledTileSource");
        builder.field("url_template", &self.url_template);
        builder.field("has_decoder", &has_decoder);
        builder.field("queued", &queued);
        builder.field("in_flight", &in_flight);
        builder.finish()
    }
}
impl PooledTileSource {
    /// Creates a source without a decoder.
    ///
    /// `url_template` may contain `{z}`, `{x}` and `{y}` placeholders;
    /// `client` and `max_concurrent` are handed to the underlying `FetchPool`.
    pub fn new(
        url_template: impl Into<String>,
        client: Box<dyn HttpClient>,
        max_concurrent: usize,
    ) -> Self {
        Self::assemble(url_template.into(), client, max_concurrent, None)
    }

    /// Creates a source whose successful response bodies are passed through
    /// `decoder`. All other arguments behave as in [`PooledTileSource::new`].
    pub fn with_decoder(
        url_template: impl Into<String>,
        client: Box<dyn HttpClient>,
        max_concurrent: usize,
        decoder: Box<dyn TileDecoder>,
    ) -> Self {
        Self::assemble(url_template.into(), client, max_concurrent, Some(decoder))
    }

    /// Shared constructor body: empty pending map, no default headers and
    /// zeroed failure diagnostics.
    fn assemble(
        template: String,
        client: Box<dyn HttpClient>,
        max_concurrent: usize,
        decoder: Option<Box<dyn TileDecoder>>,
    ) -> Self {
        Self {
            url_template: template,
            pool: FetchPool::new(client, max_concurrent),
            pending: Mutex::new(HashMap::new()),
            decoder,
            default_headers: Vec::new(),
            failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
        }
    }

    /// Appends a header that will be attached to every outgoing request and
    /// returns the source for chaining.
    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        let entry = (name.into(), value.into());
        self.default_headers.push(entry);
        self
    }

    /// Expands the URL template for `id` by substituting `{z}`, `{x}` and
    /// `{y}` with the tile's zoom, x and y.
    fn tile_url(&self, id: &TileId) -> String {
        let zoom = id.zoom.to_string();
        let column = id.x.to_string();
        let row = id.y.to_string();
        self.url_template
            .replace("{z}", &zoom)
            .replace("{x}", &column)
            .replace("{y}", &row)
    }

    /// Priority passed to the fetch pool for `id`: the tile's zoom level.
    // NOTE(review): how the pool orders priorities is not visible here.
    fn tile_priority(id: &TileId) -> f64 {
        let zoom = id.zoom;
        zoom as f64
    }
}
impl TileSource for PooledTileSource {
fn request(&self, id: TileId) {
let url = self.tile_url(&id);
if let Ok(mut pending) = self.pending.lock() {
pending.insert(url.clone(), id);
}
let mut request = crate::HttpRequest::get(&url);
for (name, value) in &self.default_headers {
request = request.with_header(name.clone(), value.clone());
}
self.pool.enqueue(request, Self::tile_priority(&id));
self.pool.flush();
}
fn request_revalidate(&self, id: TileId, hint: crate::tile_source::RevalidationHint) {
let url = self.tile_url(&id);
if let Ok(mut pending) = self.pending.lock() {
pending.insert(url.clone(), id);
}
let mut request = crate::HttpRequest::get(&url);
for (name, value) in &self.default_headers {
request = request.with_header(name.clone(), value.clone());
}
if let Some(etag) = &hint.etag {
request = request.with_header("If-None-Match", etag.clone());
}
if let Some(last_modified) = &hint.last_modified {
request = request.with_header("If-Modified-Since", last_modified.clone());
}
self.pool.enqueue(request, Self::tile_priority(&id));
self.pool.flush();
}
fn poll(&self) -> Vec<(TileId, Result<TileResponse, TileError>)> {
let responses = self.pool.poll();
if responses.is_empty() {
return Vec::new();
}
let mut pending = match self.pending.lock() {
Ok(p) => p,
Err(_) => return Vec::new(),
};
let mut results = Vec::with_capacity(responses.len());
for (url, response) in responses {
let tile_id = match pending.remove(&url) {
Some(id) => id,
None => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.ignored_completed_responses += 1;
}
continue;
}
};
match response {
Ok(resp) if resp.status == 304 => {
let freshness = parse_http_freshness(&resp);
results.push((tile_id, Ok(TileResponse::not_modified(freshness))));
}
Ok(resp) if resp.is_success() => {
let freshness = parse_http_freshness(&resp);
let tile_result = if let Some(ref decoder) = self.decoder {
decoder
.decode(&resp.body)
.map(TileData::Raster)
.map(|data| TileResponse {
data,
freshness,
not_modified: false,
})
} else {
Ok(TileResponse {
data: TileData::Raster(DecodedImage {
width: 256,
height: 256,
data: Arc::new(resp.body),
}),
freshness,
not_modified: false,
})
};
if tile_result.is_err() {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.decode_failures += 1;
}
}
results.push((tile_id, tile_result));
}
Ok(resp) if resp.status == 404 => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.not_found_failures += 1;
}
results.push((tile_id, Err(TileError::NotFound(tile_id))));
}
Ok(resp) => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.http_status_failures += 1;
}
results.push((
tile_id,
Err(TileError::Network(format!("HTTP {}", resp.status))),
));
}
Err(err) => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.transport_failures += 1;
if is_timeout_error(&err) {
diagnostics.timeout_failures += 1;
}
}
results.push((tile_id, Err(TileError::Network(err))));
}
}
}
results
}
fn cancel(&self, id: TileId) {
let url = self.tile_url(&id);
let removed = if let Ok(mut pending) = self.pending.lock() {
pending.remove(&url).is_some()
} else {
false
};
if removed {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.forced_cancellations += 1;
}
}
self.pool.force_cancel(&url);
}
fn diagnostics(&self) -> Option<TileSourceDiagnostics> {
let failure_diagnostics = self
.failure_diagnostics
.lock()
.map(|diagnostics| diagnostics.clone())
.unwrap_or_default();
Some(TileSourceDiagnostics {
queued_requests: self.pool.queued_count(),
in_flight_requests: self.pool.in_flight_count(),
known_requests: self.pool.known_count(),
cancelled_in_flight_requests: self.pool.cancelled_in_flight_count(),
max_concurrent_requests: self.pool.max_concurrent(),
pending_decode_tasks: 0,
failure_diagnostics,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// Decoder that rejects every payload; drives the decode-failure path.
    struct FailingDecoder;

    impl TileDecoder for FailingDecoder {
        fn decode(&self, _bytes: &[u8]) -> Result<DecodedImage, TileError> {
            Err(TileError::Decode("bad image".into()))
        }
    }

    /// HTTP client that answers every sent request on the next `poll` with a
    /// 200 response carrying `Cache-Control: max-age=30`.
    struct InstantMock {
        sent: StdMutex<Vec<String>>,
    }

    impl InstantMock {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for InstantMock {
        fn send(&self, request: Req) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let sent = std::mem::take(&mut *self.sent.lock().unwrap());
            sent.into_iter()
                .map(|url| {
                    (
                        url,
                        Ok(HttpResponse {
                            status: 200,
                            body: vec![0u8; 256 * 256 * 4],
                            headers: vec![("Cache-Control".into(), "max-age=30".into())],
                        }),
                    )
                })
                .collect()
        }
    }

    #[test]
    fn request_and_poll_cycle() {
        let source = PooledTileSource::new(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(InstantMock::new()),
            4,
        );
        source.request(TileId::new(5, 10, 20));
        let results = source.poll();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, TileId::new(5, 10, 20));
        assert!(results[0].1.is_ok());
        // The mock's max-age header must translate into an expiry time.
        assert!(results[0]
            .1
            .as_ref()
            .unwrap()
            .freshness
            .expires_at
            .is_some());
    }

    #[test]
    fn cancel_removes_pending() {
        let source = PooledTileSource::new(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(InstantMock::new()),
            4,
        );
        source.request(TileId::new(0, 0, 0));
        source.cancel(TileId::new(0, 0, 0));
        let results = source.poll();
        assert!(results.is_empty());
    }

    #[test]
    fn debug_impl() {
        let source = PooledTileSource::new(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(InstantMock::new()),
            4,
        );
        let dbg = format!("{source:?}");
        assert!(dbg.contains("PooledTileSource"));
    }

    #[test]
    fn default_config_has_shared_raster_defaults() {
        let config = PooledRasterTileSourceConfig::default();
        assert_eq!(config.url_template, DEFAULT_RASTER_TILE_URL);
        assert_eq!(config.source_min_zoom, 0);
        assert_eq!(config.source_max_zoom, 19);
        assert_eq!(config.max_concurrent, 32);
        assert_eq!(config.max_cached, 768);
        assert_eq!(config.headers.len(), 1);
        assert_eq!(config.headers[0].0, "User-Agent");
    }

    /// HTTP client whose responses are scripted by the test: `send` only
    /// records the URL, and `poll` hands back whatever was queued.
    struct QueueClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
    }

    impl QueueClient {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
                responses: StdMutex::new(Vec::new()),
            }
        }

        /// Queues a header-less response with the given status and body.
        fn queue_response(&self, url: String, status: u16, body: Vec<u8>) {
            self.responses.lock().unwrap().push((
                url,
                Ok(HttpResponse {
                    status,
                    body,
                    headers: Vec::new(),
                }),
            ));
        }

        /// Queues a transport-level error for `url`.
        fn queue_error(&self, url: String, error: &str) {
            self.responses
                .lock()
                .unwrap()
                .push((url, Err(error.to_string())));
        }
    }

    // Implemented on `Arc<QueueClient>` so the test keeps a handle to the
    // client after boxing a clone into the source.
    impl HttpClient for Arc<QueueClient> {
        fn send(&self, request: Req) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            std::mem::take(&mut *self.responses.lock().unwrap())
        }
    }

    #[test]
    fn diagnostics_count_categorized_failures() {
        let client = Arc::new(QueueClient::new());
        let source = PooledTileSource::with_decoder(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(client.clone()),
            4,
            Box::new(FailingDecoder),
        );
        let decode_tile = TileId::new(3, 0, 0);
        let timeout_tile = TileId::new(3, 0, 1);
        let status_tile = TileId::new(3, 0, 2);
        let not_found_tile = TileId::new(3, 0, 3);
        let cancelled_tile = TileId::new(3, 0, 4);
        for tile_id in [
            decode_tile,
            timeout_tile,
            status_tile,
            not_found_tile,
            cancelled_tile,
        ] {
            source.request(tile_id);
        }
        // One scripted outcome per failure category.
        client.queue_response(source.tile_url(&decode_tile), 200, vec![1, 2, 3]);
        client.queue_error(source.tile_url(&timeout_tile), "request timeout");
        client.queue_response(source.tile_url(&status_tile), 500, Vec::new());
        client.queue_response(source.tile_url(&not_found_tile), 404, Vec::new());
        // A response arriving after cancellation must be ignored, not delivered.
        source.cancel(cancelled_tile);
        client.queue_response(source.tile_url(&cancelled_tile), 200, vec![0; 4]);
        let _ = source.poll();
        let diagnostics = source.diagnostics().expect("pooled source diagnostics");
        assert_eq!(diagnostics.failure_diagnostics.decode_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.transport_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.timeout_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.http_status_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.not_found_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.forced_cancellations, 1);
        assert_eq!(
            diagnostics.failure_diagnostics.ignored_completed_responses,
            1
        );
    }
}