use crate::io::{HttpClient, HttpRequest, HttpResponse};
use crate::tile_source::{
DecodedImage, RevalidationHint, TileData, TileDecoder, TileError, TileFreshness, TileResponse,
TileSource, TileSourceDiagnostics, TileSourceFailureDiagnostics,
};
use rustial_math::TileId;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, SystemTime};
/// Extracts the `max-age` value, in seconds, from a `Cache-Control` header value.
///
/// The header is split on commas and every directive is trimmed. The directive name is
/// matched ASCII case-insensitively (so `Max-Age=60` is accepted) and the value may be
/// wrapped in double quotes.
///
/// Returns the first `max-age` directive whose value parses as a `u64`; directives with
/// an unparsable value are skipped. Returns `None` when no such directive is present.
fn parse_cache_control_max_age(value: &str) -> Option<u64> {
    const DIRECTIVE: &str = "max-age=";
    value.split(',').map(str::trim).find_map(|directive| {
        // `get` yields `None` when the directive is too short or the cut would fall
        // inside a multi-byte character, so the slice below cannot panic.
        let name = directive.get(..DIRECTIVE.len())?;
        if !name.eq_ignore_ascii_case(DIRECTIVE) {
            return None;
        }
        directive[DIRECTIVE.len()..]
            .trim_matches('"')
            .parse::<u64>()
            .ok()
    })
}
/// Reads the `age` response header as a whole number of seconds.
///
/// Yields `0` when the header is absent or its value does not parse as a `u64`.
fn parse_age_seconds(response: &HttpResponse) -> u64 {
    match response.header("age") {
        Some(raw) => raw.parse::<u64>().unwrap_or(0),
        None => 0,
    }
}
/// Derives cache-freshness metadata from the headers of an HTTP response.
///
/// The expiry is computed from the `cache-control` `max-age` value, reduced by the `age`
/// header (saturating at zero) and added to the current system time. When that produces
/// no expiry, the `expires` header is parsed as an HTTP date instead. The `etag` and
/// `last-modified` header values are copied into the result as owned strings.
fn parse_http_freshness(response: &HttpResponse) -> TileFreshness {
    let now = SystemTime::now();
    let age = parse_age_seconds(response);

    let max_age_deadline = match response
        .header("cache-control")
        .and_then(parse_cache_control_max_age)
    {
        Some(max_age) => now.checked_add(Duration::from_secs(max_age.saturating_sub(age))),
        None => None,
    };

    let expires_at = match max_age_deadline {
        Some(deadline) => Some(deadline),
        // No usable max-age: fall back to the absolute `expires` date, if any.
        None => response
            .header("expires")
            .and_then(|value| httpdate::parse_http_date(value).ok()),
    };

    let etag = response.header("etag").map(ToOwned::to_owned);
    let last_modified = response.header("last-modified").map(ToOwned::to_owned);

    TileFreshness {
        expires_at,
        etag,
        last_modified,
    }
}
/// Heuristically decides whether a transport error message describes a timeout.
///
/// The comparison is ASCII case-insensitive. Both the "timeout" spelling and the
/// "timed out" phrasing (as in "Connection timed out") are recognised.
fn is_timeout_error(error: &str) -> bool {
    let lowered = error.to_ascii_lowercase();
    lowered.contains("timeout") || lowered.contains("timed out")
}
/// A tile source that fetches tiles over HTTP using a URL template.
///
/// Requests are handed to an [`HttpClient`]; completed responses are collected and
/// turned into tile results when the source is polled.
pub struct HttpTileSource {
/// URL pattern in which `{z}`, `{x}` and `{y}` are replaced by the tile's zoom, x and y.
url_template: String,
/// Transport used to send requests and to poll for completed responses.
client: Box<dyn HttpClient>,
/// Optional decoder for successful response bodies. When absent, the body is wrapped
/// unmodified as a 256x256 raster image.
decoder: Option<Box<dyn TileDecoder>>,
/// Headers attached to every outgoing request, in the order they were added.
default_headers: Vec<(String, String)>,
/// Outstanding requests, keyed by tile URL.
pending: Mutex<HashMap<String, TileId>>,
/// Failure counters exposed through `diagnostics()`.
failure_diagnostics: Mutex<TileSourceFailureDiagnostics>,
}
/// Summarised `Debug` output: the boxed client and decoder are not printed; the decoder
/// is shown as a presence flag, and headers and pending requests as counts.
impl std::fmt::Debug for HttpTileSource {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A poisoned `pending` lock is reported as zero instead of panicking.
        let pending_count = match self.pending.lock() {
            Ok(guard) => guard.len(),
            Err(_) => 0,
        };
        let has_decoder = self.decoder.is_some();
        let header_count = self.default_headers.len();

        let mut out = f.debug_struct("HttpTileSource");
        out.field("url_template", &self.url_template);
        out.field("has_decoder", &has_decoder);
        out.field("default_headers", &header_count);
        out.field("pending", &pending_count);
        out.finish()
    }
}
impl HttpTileSource {
    /// Creates a source without a decoder.
    ///
    /// Successful response bodies are then passed through unmodified as 256x256 raster
    /// data when the source is polled.
    pub fn new(url_template: impl Into<String>, client: Box<dyn HttpClient>) -> Self {
        Self::from_parts(url_template.into(), client, None)
    }

    /// Creates a source whose successful response bodies are run through `decoder`.
    pub fn with_decoder(
        url_template: impl Into<String>,
        client: Box<dyn HttpClient>,
        decoder: Box<dyn TileDecoder>,
    ) -> Self {
        Self::from_parts(url_template.into(), client, Some(decoder))
    }

    /// Shared constructor body: no default headers, nothing pending, zeroed diagnostics.
    fn from_parts(
        url_template: String,
        client: Box<dyn HttpClient>,
        decoder: Option<Box<dyn TileDecoder>>,
    ) -> Self {
        Self {
            url_template,
            client,
            decoder,
            default_headers: Vec::new(),
            pending: Mutex::new(HashMap::new()),
            failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
        }
    }

    /// Builder-style method that appends a header to send with every request.
    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        let header = (name.into(), value.into());
        self.default_headers.push(header);
        self
    }

    /// Expands the URL template for `id` by substituting `{z}`, `{x}` and `{y}`.
    pub fn tile_url(&self, id: &TileId) -> String {
        let zoom = id.zoom.to_string();
        let column = id.x.to_string();
        let row = id.y.to_string();
        self.url_template
            .replace("{z}", &zoom)
            .replace("{x}", &column)
            .replace("{y}", &row)
    }

    /// The URL template this source was created with.
    #[inline]
    pub fn url_template(&self) -> &str {
        self.url_template.as_str()
    }

    /// Number of requests currently tracked as pending (`0` if the lock is poisoned).
    pub fn pending_count(&self) -> usize {
        match self.pending.lock() {
            Ok(guard) => guard.len(),
            Err(_) => 0,
        }
    }
}
impl TileSource for HttpTileSource {
fn request(&self, id: TileId) {
let url = self.tile_url(&id);
if let Ok(mut pending) = self.pending.lock() {
pending.insert(url.clone(), id);
}
let mut req = HttpRequest::get(&url);
for (name, value) in &self.default_headers {
req = req.with_header(name.clone(), value.clone());
}
self.client.send(req);
}
fn request_revalidate(&self, id: TileId, hint: RevalidationHint) {
let url = self.tile_url(&id);
if let Ok(mut pending) = self.pending.lock() {
pending.insert(url.clone(), id);
}
let mut req = HttpRequest::get(&url);
for (name, value) in &self.default_headers {
req = req.with_header(name.clone(), value.clone());
}
if let Some(etag) = &hint.etag {
req = req.with_header("If-None-Match", etag.clone());
}
if let Some(last_modified) = &hint.last_modified {
req = req.with_header("If-Modified-Since", last_modified.clone());
}
self.client.send(req);
}
fn poll(&self) -> Vec<(TileId, Result<TileResponse, TileError>)> {
let responses = self.client.poll();
if responses.is_empty() {
return Vec::new();
}
let mut pending = match self.pending.lock() {
Ok(p) => p,
Err(_) => return Vec::new(),
};
let mut results = Vec::with_capacity(responses.len());
for (url, response) in responses {
let tile_id = match pending.remove(&url) {
Some(id) => id,
None => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.ignored_completed_responses += 1;
}
continue;
}
};
match response {
Ok(resp) if resp.status == 304 => {
let freshness = parse_http_freshness(&resp);
results.push((tile_id, Ok(TileResponse::not_modified(freshness))));
}
Ok(resp) if resp.is_success() => {
let freshness = parse_http_freshness(&resp);
let tile_result = if let Some(ref decoder) = self.decoder {
decoder
.decode(&resp.body)
.map(TileData::Raster)
.map(|data| TileResponse {
data,
freshness,
not_modified: false,
})
} else {
Ok(TileResponse {
data: TileData::Raster(DecodedImage {
width: 256,
height: 256,
data: std::sync::Arc::new(resp.body),
}),
freshness,
not_modified: false,
})
};
if tile_result.is_err() {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.decode_failures += 1;
}
}
results.push((tile_id, tile_result));
}
Ok(resp) if resp.status == 404 => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.not_found_failures += 1;
}
results.push((tile_id, Err(TileError::NotFound(tile_id))));
}
Ok(resp) => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.http_status_failures += 1;
}
results.push((
tile_id,
Err(TileError::Network(format!("HTTP {}", resp.status))),
));
}
Err(err) => {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.transport_failures += 1;
if is_timeout_error(&err) {
diagnostics.timeout_failures += 1;
}
}
results.push((tile_id, Err(TileError::Network(err))));
}
}
}
results
}
fn cancel(&self, id: TileId) {
if let Ok(mut pending) = self.pending.lock() {
let url = self.tile_url(&id);
if pending.remove(&url).is_some() {
if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
diagnostics.forced_cancellations += 1;
}
}
}
}
fn diagnostics(&self) -> Option<TileSourceDiagnostics> {
let failure_diagnostics = self
.failure_diagnostics
.lock()
.map(|diagnostics| diagnostics.clone())
.unwrap_or_default();
Some(TileSourceDiagnostics {
queued_requests: 0,
in_flight_requests: self.pending_count(),
known_requests: self.pending_count(),
cancelled_in_flight_requests: 0,
max_concurrent_requests: 0,
pending_decode_tasks: 0,
failure_diagnostics,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::io::HttpResponse;
use std::sync::{Arc, Mutex as StdMutex};
/// Decoder stub whose `decode` always fails with a `TileError::Decode`.
struct FailingDecoder;

impl TileDecoder for FailingDecoder {
    fn decode(&self, _bytes: &[u8]) -> Result<DecodedImage, TileError> {
        let failure = TileError::Decode("bad image".into());
        Err(failure)
    }
}
/// State shared between every clone of `MockHttpClient`.
struct MockHttpClientInner {
/// Every request passed to `send`, in call order.
sent: StdMutex<Vec<HttpRequest>>,
/// Responses queued by the test, keyed by URL; handed out (and emptied) by `poll`.
responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
}
/// In-memory `HttpClient` for tests. Clones share the same state through an `Arc`, so a
/// test can keep a handle after boxing another clone into the source.
#[derive(Clone)]
struct MockHttpClient {
inner: Arc<MockHttpClientInner>,
}
impl MockHttpClient {
    /// Creates a mock with no recorded requests and no queued responses.
    fn new() -> Self {
        let inner = MockHttpClientInner {
            sent: StdMutex::new(Vec::new()),
            responses: StdMutex::new(Vec::new()),
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Queues a header-less response for `url`.
    fn queue_response(&self, url: &str, status: u16, body: Vec<u8>) {
        self.queue_response_with_headers(url, status, body, Vec::new());
    }

    /// Queues a response for `url` with the given status, body and headers.
    fn queue_response_with_headers(
        &self,
        url: &str,
        status: u16,
        body: Vec<u8>,
        headers: Vec<(String, String)>,
    ) {
        let response = HttpResponse {
            status,
            body,
            headers,
        };
        let mut queue = self.inner.responses.lock().unwrap();
        queue.push((url.to_string(), Ok(response)));
    }

    /// Queues a transport-level failure for `url`.
    fn queue_error(&self, url: &str, error: &str) {
        let entry = (url.to_string(), Err(error.to_string()));
        self.inner.responses.lock().unwrap().push(entry);
    }

    /// URLs of all requests sent so far, in send order.
    fn sent_urls(&self) -> Vec<String> {
        let sent = self.inner.sent.lock().unwrap();
        let mut urls = Vec::with_capacity(sent.len());
        for request in sent.iter() {
            urls.push(request.url.clone());
        }
        urls
    }

    /// Headers of the most recently sent request, or an empty list if none was sent.
    fn last_sent_headers(&self) -> Vec<(String, String)> {
        let sent = self.inner.sent.lock().unwrap();
        match sent.last() {
            Some(request) => request.headers.clone(),
            None => Vec::new(),
        }
    }
}
impl HttpClient for MockHttpClient {
    /// Records the request; nothing is sent anywhere.
    fn send(&self, request: HttpRequest) {
        let mut sent = self.inner.sent.lock().unwrap();
        sent.push(request);
    }

    /// Returns every queued response and leaves the queue empty.
    fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
        let mut queued = self.inner.responses.lock().unwrap();
        std::mem::take(&mut *queued)
    }
}
/// Decoder stub that ignores its input and always yields a 2x2 image of `255` bytes.
struct MockDecoder;

impl TileDecoder for MockDecoder {
    fn decode(&self, _bytes: &[u8]) -> Result<DecodedImage, TileError> {
        let (width, height) = (2, 2);
        let pixels = vec![255u8; 2 * 2 * 4];
        Ok(DecodedImage {
            width,
            height,
            data: std::sync::Arc::new(pixels),
        })
    }
}
/// URL template shared by the tests below.
const TEMPLATE: &str = "https://tiles.example.com/{z}/{x}/{y}.png";
/// Shorthand for `TileId::new(z, x, y)`.
fn tile(z: u8, x: u32, y: u32) -> TileId {
TileId::new(z, x, y)
}
/// All three placeholders are replaced by the tile's coordinates.
#[test]
fn tile_url_substitution() {
    let source = HttpTileSource::new(TEMPLATE, Box::new(MockHttpClient::new()));
    let id = tile(10, 512, 340);
    assert_eq!(
        source.tile_url(&id),
        "https://tiles.example.com/10/512/340.png"
    );
}
/// The root tile expands to an all-zero path.
#[test]
fn tile_url_zoom_zero() {
    let source = HttpTileSource::new(TEMPLATE, Box::new(MockHttpClient::new()));
    let root = tile(0, 0, 0);
    assert_eq!(source.tile_url(&root), "https://tiles.example.com/0/0/0.png");
}
/// The accessor returns the template the source was built with.
#[test]
fn url_template_accessor() {
    let source = HttpTileSource::new(TEMPLATE, Box::new(MockHttpClient::new()));
    let template = source.url_template();
    assert_eq!(template, TEMPLATE);
}
/// A single `request` sends exactly one request, to the expanded tile URL.
#[test]
fn request_sends_http_get() {
    let handle = MockHttpClient::new();
    let source = HttpTileSource::new(TEMPLATE, Box::new(handle.clone()));

    source.request(tile(5, 10, 20));

    let expected = vec!["https://tiles.example.com/5/10/20.png".to_string()];
    assert_eq!(handle.sent_urls(), expected);
}
/// Polling with nothing queued yields no results.
#[test]
fn poll_returns_empty_when_no_responses() {
    let source = HttpTileSource::new(TEMPLATE, Box::new(MockHttpClient::new()));
    assert!(source.poll().is_empty());
}
/// Without a decoder, a 200 body is passed through as a 256x256 raster tile.
#[test]
fn successful_fetch_without_decoder() {
    let client = MockHttpClient::new();
    client.queue_response(
        "https://tiles.example.com/0/0/0.png",
        200,
        vec![0u8; 256 * 256 * 4],
    );
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    assert_eq!(results.len(), 1);

    let (id, result) = &results[0];
    assert_eq!(*id, tile(0, 0, 0));

    let response = result.as_ref().expect("should succeed");
    if let TileData::Raster(img) = &response.data {
        assert_eq!(img.width, 256);
        assert_eq!(img.height, 256);
        assert_eq!(img.data.len(), 256 * 256 * 4);
    } else {
        panic!("expected raster tile data");
    }
}
/// With a decoder, the tile dimensions come from the decoder's output.
#[test]
fn successful_fetch_with_decoder() {
    let client = MockHttpClient::new();
    client.queue_response("https://tiles.example.com/0/0/0.png", 200, vec![1, 2, 3]);
    let source =
        HttpTileSource::with_decoder(TEMPLATE, Box::new(client), Box::new(MockDecoder));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    assert_eq!(results.len(), 1);

    let response = results[0].1.as_ref().unwrap();
    if let TileData::Raster(img) = &response.data {
        assert_eq!(img.width, 2);
        assert_eq!(img.height, 2);
    } else {
        panic!("expected raster tile data");
    }
}
/// A `Cache-Control: max-age` header produces a future expiry.
#[test]
fn cache_control_max_age_populates_freshness() {
    let client = MockHttpClient::new();
    client.queue_response_with_headers(
        "https://tiles.example.com/0/0/0.png",
        200,
        vec![0u8; 256 * 256 * 4],
        vec![("Cache-Control".into(), "public, max-age=60".into())],
    );
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    let response = results[0].1.as_ref().unwrap();
    assert!(response.freshness.expires_at.is_some());
    assert!(!response.freshness.is_expired());
}
/// An `Expires` header alone is enough to populate the expiry.
#[test]
fn expires_header_populates_freshness() {
    let client = MockHttpClient::new();
    client.queue_response_with_headers(
        "https://tiles.example.com/0/0/0.png",
        200,
        vec![0u8; 256 * 256 * 4],
        vec![("Expires".into(), "Wed, 21 Oct 2099 07:28:00 GMT".into())],
    );
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    let response = results[0].1.as_ref().unwrap();
    assert!(response.freshness.expires_at.is_some());
}
/// A 404 status maps to `TileError::NotFound`.
#[test]
fn http_404_returns_not_found() {
    let client = MockHttpClient::new();
    client.queue_response("https://tiles.example.com/0/0/0.png", 404, Vec::new());
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    assert_eq!(results.len(), 1);
    let outcome = &results[0].1;
    assert!(matches!(outcome, Err(TileError::NotFound(_))));
}
/// A 500 status maps to `TileError::Network` and the message names the status code.
#[test]
fn http_500_returns_network_error() {
    let client = MockHttpClient::new();
    client.queue_response("https://tiles.example.com/0/0/0.png", 500, Vec::new());
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    assert_eq!(results.len(), 1);
    match &results[0].1 {
        Err(TileError::Network(msg)) => assert!(msg.contains("500")),
        other => panic!("expected Network error, got {other:?}"),
    }
}
/// Non-200 statuses in the 2xx range are still treated as successful fetches.
#[test]
fn http_2xx_range_is_success() {
    let client = MockHttpClient::new();
    client.queue_response(
        "https://tiles.example.com/0/0/0.png",
        204,
        vec![0u8; 256 * 256 * 4],
    );
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    assert_eq!(results.len(), 1);
    let outcome = &results[0].1;
    assert!(outcome.is_ok(), "2xx should be treated as success");
}
/// A transport failure maps to `TileError::Network` carrying the original message.
#[test]
fn transport_error_returns_network_error() {
    let client = MockHttpClient::new();
    client.queue_error("https://tiles.example.com/0/0/0.png", "connection refused");
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    source.request(tile(0, 0, 0));

    let results = source.poll();
    assert_eq!(results.len(), 1);
    let (_, outcome) = &results[0];
    match outcome {
        Err(TileError::Network(msg)) => assert!(msg.contains("connection refused")),
        other => panic!("expected Network error, got {other:?}"),
    }
}
/// Headers added with `with_header` are sent, in the order they were added.
#[test]
fn default_headers_are_sent() {
    let handle = MockHttpClient::new();
    let source = HttpTileSource::new(TEMPLATE, Box::new(handle.clone()))
        .with_header("Authorization", "Bearer test-key")
        .with_header("User-Agent", "rustial/0.1");

    source.request(tile(0, 0, 0));

    let expected = vec![
        ("Authorization".to_string(), "Bearer test-key".to_string()),
        ("User-Agent".to_string(), "rustial/0.1".to_string()),
    ];
    assert_eq!(handle.last_sent_headers(), expected);
}
/// Cancelling a tile stops tracking it, and a response arriving later is dropped.
#[test]
fn cancel_removes_pending_entry() {
    let client = MockHttpClient::new();
    client.queue_response(
        "https://tiles.example.com/0/0/0.png",
        200,
        vec![0u8; 256 * 256 * 4],
    );
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));
    let id = tile(0, 0, 0);

    source.request(id);
    assert_eq!(source.pending_count(), 1);

    source.cancel(id);
    assert_eq!(source.pending_count(), 0);

    let results = source.poll();
    assert!(
        results.is_empty(),
        "cancelled tile should not appear in poll results"
    );
}
/// Each distinct request increases the pending count by one.
#[test]
fn pending_count_tracks_inflight() {
    let source = HttpTileSource::new(TEMPLATE, Box::new(MockHttpClient::new()));
    assert_eq!(source.pending_count(), 0);

    for (already_pending, zoom) in [0u8, 1].into_iter().enumerate() {
        source.request(tile(zoom, 0, 0));
        assert_eq!(source.pending_count(), already_pending + 1);
    }
}
/// A response for a URL that was never requested produces no result.
#[test]
fn unknown_url_response_is_ignored() {
    let client = MockHttpClient::new();
    client.queue_response("https://other.example.com/tile.png", 200, Vec::new());
    let source = HttpTileSource::new(TEMPLATE, Box::new(client));

    let results = source.poll();
    assert!(results.is_empty());
}
/// The `Debug` output names the type and its `url_template` field.
#[test]
fn debug_impl() {
    let source = HttpTileSource::new(TEMPLATE, Box::new(MockHttpClient::new()));
    let rendered = format!("{source:?}");
    for needle in ["HttpTileSource", "url_template"] {
        assert!(rendered.contains(needle));
    }
}
/// An ETag hint is sent as an `If-None-Match` header.
#[test]
fn request_revalidate_sends_if_none_match_header() {
    let handle = MockHttpClient::new();
    let source = HttpTileSource::new(TEMPLATE, Box::new(handle.clone()));
    let hint = RevalidationHint {
        etag: Some("abc123".into()),
        last_modified: None,
    };

    source.request_revalidate(TileId::new(1, 0, 0), hint);

    let headers = handle.last_sent_headers();
    let etag_value = headers
        .iter()
        .find(|(name, _)| name == "If-None-Match")
        .map(|(_, value)| value.as_str());
    assert!(etag_value.is_some(), "should include If-None-Match header");
    assert_eq!(etag_value, Some("abc123"));
}
/// A last-modified hint is sent as an `If-Modified-Since` header.
#[test]
fn request_revalidate_sends_if_modified_since_header() {
    let handle = MockHttpClient::new();
    let source = HttpTileSource::new(TEMPLATE, Box::new(handle.clone()));
    let hint = RevalidationHint {
        etag: None,
        last_modified: Some("Wed, 01 Jan 2025 00:00:00 GMT".into()),
    };

    source.request_revalidate(TileId::new(1, 0, 0), hint);

    let headers = handle.last_sent_headers();
    let modified_since = headers
        .iter()
        .find(|(name, _)| name == "If-Modified-Since")
        .map(|(_, value)| value.as_str());
    assert!(
        modified_since.is_some(),
        "should include If-Modified-Since header"
    );
    assert_eq!(modified_since, Some("Wed, 01 Jan 2025 00:00:00 GMT"));
}
/// A 304 response yields a not-modified result carrying refreshed freshness metadata.
#[test]
fn poll_returns_not_modified_for_304_response() {
    let handle = MockHttpClient::new();
    let source = HttpTileSource::new(TEMPLATE, Box::new(handle.clone()));
    let id = TileId::new(1, 0, 0);

    source.request(id);
    handle.queue_response_with_headers(
        &source.tile_url(&id),
        304,
        Vec::new(),
        vec![
            ("cache-control".into(), "max-age=600".into()),
            ("etag".into(), "new-etag".into()),
        ],
    );

    let results = source.poll();
    assert_eq!(results.len(), 1);

    let (result_id, result) = &results[0];
    assert_eq!(*result_id, id);

    let response = result.as_ref().expect("should be Ok");
    assert!(response.not_modified, "should be a not-modified response");
    assert_eq!(response.freshness.etag.as_deref(), Some("new-etag"));
    assert!(response.freshness.expires_at.is_some());
}
/// Drives one tile through each failure path (decode failure, transport timeout, HTTP
/// 500, HTTP 404, cancellation followed by a late response) and checks that each
/// counter in the failure diagnostics is incremented exactly once.
#[test]
fn diagnostics_count_categorized_failures() {
    let client = MockHttpClient::new();
    let source = HttpTileSource::with_decoder(
        TEMPLATE,
        Box::new(client.clone()),
        Box::new(FailingDecoder),
    );
    let decode_tile = tile(3, 0, 0);
    let timeout_tile = tile(3, 0, 1);
    let status_tile = tile(3, 0, 2);
    let not_found_tile = tile(3, 0, 3);
    let cancelled_tile = tile(3, 0, 4);
    for tile_id in [
        decode_tile,
        timeout_tile,
        status_tile,
        not_found_tile,
        cancelled_tile,
    ] {
        source.request(tile_id);
    }

    client.queue_response(&source.tile_url(&decode_tile), 200, vec![1, 2, 3]);
    client.queue_error(&source.tile_url(&timeout_tile), "request timeout");
    client.queue_response(&source.tile_url(&status_tile), 500, Vec::new());
    client.queue_response(&source.tile_url(&not_found_tile), 404, Vec::new());
    // Cancel first, then deliver the response: it must be counted as ignored.
    source.cancel(cancelled_tile);
    client.queue_response(&source.tile_url(&cancelled_tile), 200, vec![0; 4]);

    let _ = source.poll();

    let diagnostics = source.diagnostics().expect("http source diagnostics");
    let failures = &diagnostics.failure_diagnostics;
    assert_eq!(failures.decode_failures, 1);
    // The timeout is counted both as a transport failure and as a timeout.
    assert_eq!(failures.transport_failures, 1);
    assert_eq!(failures.timeout_failures, 1);
    assert_eq!(failures.http_status_failures, 1);
    assert_eq!(failures.not_found_failures, 1);
    assert_eq!(failures.forced_cancellations, 1);
    assert_eq!(failures.ignored_completed_responses, 1);
}
}