1use crate::io::{FetchPool, HttpClient, HttpResponse};
26use crate::tile_source::{
27 DecodedImage, TileData, TileDecoder, TileError, TileFreshness, TileResponse, TileSource,
28 TileSourceDiagnostics, TileSourceFailureDiagnostics,
29};
30use rustial_math::TileId;
31use std::collections::HashMap;
32use std::sync::{Arc, Mutex};
33use std::time::{Duration, SystemTime};
34
35pub const DEFAULT_RASTER_TILE_URL: &str =
37 "https://a.basemaps.cartocdn.com/light_all/{z}/{x}/{y}.png";
38
39pub const DEFAULT_RASTER_TILE_USER_AGENT: &str =
41 "rustial-example/0.1 (+https://github.com/user/rustial25d)";
42
43#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct PooledRasterTileSourceConfig {
46 pub url_template: String,
48 pub headers: Vec<(String, String)>,
50 pub source_min_zoom: u8,
52 pub source_max_zoom: u8,
54 pub max_concurrent: usize,
56 pub max_cached: usize,
58}
59
60impl Default for PooledRasterTileSourceConfig {
61 fn default() -> Self {
62 Self {
63 url_template: DEFAULT_RASTER_TILE_URL.into(),
64 headers: vec![("User-Agent".into(), DEFAULT_RASTER_TILE_USER_AGENT.into())],
65 source_min_zoom: 0,
66 source_max_zoom: 19,
67 max_concurrent: 32,
68 max_cached: 768,
69 }
70 }
71} fn is_timeout_error(error: &str) -> bool {
74 error.to_ascii_lowercase().contains("timeout")
75}
76
77fn parse_cache_control_max_age(value: &str) -> Option<u64> {
78 for directive in value.split(',') {
79 let directive = directive.trim();
80 if let Some(rest) = directive.strip_prefix("max-age=") {
81 if let Ok(seconds) = rest.trim_matches('"').parse::<u64>() {
82 return Some(seconds);
83 }
84 }
85 }
86 None
87}
88
89fn parse_age_seconds(response: &HttpResponse) -> u64 {
90 response
91 .header("age")
92 .and_then(|value| value.parse::<u64>().ok())
93 .unwrap_or(0)
94}
95
96fn parse_http_freshness(response: &HttpResponse) -> TileFreshness {
97 let now = SystemTime::now();
98 let age = parse_age_seconds(response);
99
100 let expires_at = response
101 .header("cache-control")
102 .and_then(parse_cache_control_max_age)
103 .map(|max_age| max_age.saturating_sub(age))
104 .map(Duration::from_secs)
105 .and_then(|ttl| now.checked_add(ttl))
106 .or_else(|| {
107 response
108 .header("expires")
109 .and_then(|value| httpdate::parse_http_date(value).ok())
110 });
111
112 TileFreshness {
113 expires_at,
114 etag: response.header("etag").map(ToOwned::to_owned),
115 last_modified: response.header("last-modified").map(ToOwned::to_owned),
116 }
117}
118
119pub struct PooledTileSource {
122 url_template: String,
124 pool: FetchPool,
126 pending: Mutex<HashMap<String, TileId>>,
128 decoder: Option<Box<dyn TileDecoder>>,
130 default_headers: Vec<(String, String)>,
132 failure_diagnostics: Mutex<TileSourceFailureDiagnostics>,
134}
135
136impl std::fmt::Debug for PooledTileSource {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("PooledTileSource")
139 .field("url_template", &self.url_template)
140 .field("has_decoder", &self.decoder.is_some())
141 .field("queued", &self.pool.queued_count())
142 .field("in_flight", &self.pool.in_flight_count())
143 .finish()
144 }
145}
146
147impl PooledTileSource {
148 pub fn new(
153 url_template: impl Into<String>,
154 client: Box<dyn HttpClient>,
155 max_concurrent: usize,
156 ) -> Self {
157 Self {
158 url_template: url_template.into(),
159 pool: FetchPool::new(client, max_concurrent),
160 pending: Mutex::new(HashMap::new()),
161 decoder: None,
162 default_headers: Vec::new(),
163 failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
164 }
165 }
166
167 pub fn with_decoder(
169 url_template: impl Into<String>,
170 client: Box<dyn HttpClient>,
171 max_concurrent: usize,
172 decoder: Box<dyn TileDecoder>,
173 ) -> Self {
174 Self {
175 url_template: url_template.into(),
176 pool: FetchPool::new(client, max_concurrent),
177 pending: Mutex::new(HashMap::new()),
178 decoder: Some(decoder),
179 default_headers: Vec::new(),
180 failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
181 }
182 }
183
184 pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
186 self.default_headers.push((name.into(), value.into()));
187 self
188 }
189
190 fn tile_url(&self, id: &TileId) -> String {
192 self.url_template
193 .replace("{z}", &id.zoom.to_string())
194 .replace("{x}", &id.x.to_string())
195 .replace("{y}", &id.y.to_string())
196 }
197
198 fn tile_priority(id: &TileId) -> f64 {
204 id.zoom as f64
205 }
206}
207
208impl TileSource for PooledTileSource {
209 fn request(&self, id: TileId) {
210 let url = self.tile_url(&id);
211
212 if let Ok(mut pending) = self.pending.lock() {
213 pending.insert(url.clone(), id);
214 }
215
216 let mut request = crate::HttpRequest::get(&url);
217 for (name, value) in &self.default_headers {
218 request = request.with_header(name.clone(), value.clone());
219 }
220
221 self.pool.enqueue(request, Self::tile_priority(&id));
222 self.pool.flush();
223 }
224
225 fn request_revalidate(&self, id: TileId, hint: crate::tile_source::RevalidationHint) {
226 let url = self.tile_url(&id);
227
228 if let Ok(mut pending) = self.pending.lock() {
229 pending.insert(url.clone(), id);
230 }
231
232 let mut request = crate::HttpRequest::get(&url);
233 for (name, value) in &self.default_headers {
234 request = request.with_header(name.clone(), value.clone());
235 }
236 if let Some(etag) = &hint.etag {
237 request = request.with_header("If-None-Match", etag.clone());
238 }
239 if let Some(last_modified) = &hint.last_modified {
240 request = request.with_header("If-Modified-Since", last_modified.clone());
241 }
242
243 self.pool.enqueue(request, Self::tile_priority(&id));
244 self.pool.flush();
245 }
246
247 fn poll(&self) -> Vec<(TileId, Result<TileResponse, TileError>)> {
248 let responses = self.pool.poll();
249 if responses.is_empty() {
250 return Vec::new();
251 }
252
253 let mut pending = match self.pending.lock() {
254 Ok(p) => p,
255 Err(_) => return Vec::new(),
256 };
257
258 let mut results = Vec::with_capacity(responses.len());
259
260 for (url, response) in responses {
261 let tile_id = match pending.remove(&url) {
262 Some(id) => id,
263 None => {
264 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
265 diagnostics.ignored_completed_responses += 1;
266 }
267 continue;
268 }
269 };
270
271 match response {
272 Ok(resp) if resp.status == 304 => {
273 let freshness = parse_http_freshness(&resp);
274 results.push((tile_id, Ok(TileResponse::not_modified(freshness))));
275 }
276 Ok(resp) if resp.is_success() => {
277 let freshness = parse_http_freshness(&resp);
278 let tile_result = if let Some(ref decoder) = self.decoder {
279 decoder
280 .decode(&resp.body)
281 .map(TileData::Raster)
282 .map(|data| TileResponse {
283 data,
284 freshness,
285 not_modified: false,
286 })
287 } else {
288 Ok(TileResponse {
289 data: TileData::Raster(DecodedImage {
290 width: 256,
291 height: 256,
292 data: Arc::new(resp.body),
293 }),
294 freshness,
295 not_modified: false,
296 })
297 };
298 if tile_result.is_err() {
299 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
300 diagnostics.decode_failures += 1;
301 }
302 }
303 results.push((tile_id, tile_result));
304 }
305 Ok(resp) if resp.status == 404 => {
306 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
307 diagnostics.not_found_failures += 1;
308 }
309 results.push((tile_id, Err(TileError::NotFound(tile_id))));
310 }
311 Ok(resp) => {
312 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
313 diagnostics.http_status_failures += 1;
314 }
315 results.push((
316 tile_id,
317 Err(TileError::Network(format!("HTTP {}", resp.status))),
318 ));
319 }
320 Err(err) => {
321 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
322 diagnostics.transport_failures += 1;
323 if is_timeout_error(&err) {
324 diagnostics.timeout_failures += 1;
325 }
326 }
327 results.push((tile_id, Err(TileError::Network(err))));
328 }
329 }
330 }
331
332 results
333 }
334
335 fn cancel(&self, id: TileId) {
336 let url = self.tile_url(&id);
337 let removed = if let Ok(mut pending) = self.pending.lock() {
338 pending.remove(&url).is_some()
339 } else {
340 false
341 };
342 if removed {
343 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
344 diagnostics.forced_cancellations += 1;
345 }
346 }
347 self.pool.force_cancel(&url);
348 }
349
350 fn diagnostics(&self) -> Option<TileSourceDiagnostics> {
351 let failure_diagnostics = self
352 .failure_diagnostics
353 .lock()
354 .map(|diagnostics| diagnostics.clone())
355 .unwrap_or_default();
356 Some(TileSourceDiagnostics {
357 queued_requests: self.pool.queued_count(),
358 in_flight_requests: self.pool.in_flight_count(),
359 known_requests: self.pool.known_count(),
360 cancelled_in_flight_requests: self.pool.cancelled_in_flight_count(),
361 max_concurrent_requests: self.pool.max_concurrent(),
362 pending_decode_tasks: 0,
363 failure_diagnostics,
364 })
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371 use crate::io::{HttpRequest as Req, HttpResponse};
372 use std::sync::Mutex as StdMutex;
373
374 struct FailingDecoder;
375
376 impl TileDecoder for FailingDecoder {
377 fn decode(&self, _bytes: &[u8]) -> Result<DecodedImage, TileError> {
378 Err(TileError::Decode("bad image".into()))
379 }
380 }
381
382 struct InstantMock {
383 sent: StdMutex<Vec<String>>,
384 }
385
386 impl InstantMock {
387 fn new() -> Self {
388 Self {
389 sent: StdMutex::new(Vec::new()),
390 }
391 }
392 }
393
394 impl HttpClient for InstantMock {
395 fn send(&self, request: Req) {
396 self.sent.lock().unwrap().push(request.url);
397 }
398
399 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
400 let sent = std::mem::take(&mut *self.sent.lock().unwrap());
401 sent.into_iter()
402 .map(|url| {
403 (
404 url,
405 Ok(HttpResponse {
406 status: 200,
407 body: vec![0u8; 256 * 256 * 4],
408 headers: vec![("Cache-Control".into(), "max-age=30".into())],
409 }),
410 )
411 })
412 .collect()
413 }
414 }
415
416 #[test]
417 fn request_and_poll_cycle() {
418 let source = PooledTileSource::new(
419 "https://example.com/{z}/{x}/{y}.png",
420 Box::new(InstantMock::new()),
421 4,
422 );
423
424 source.request(TileId::new(5, 10, 20));
425 let results = source.poll();
426 assert_eq!(results.len(), 1);
427 assert_eq!(results[0].0, TileId::new(5, 10, 20));
428 assert!(results[0].1.is_ok());
429 assert!(results[0]
430 .1
431 .as_ref()
432 .unwrap()
433 .freshness
434 .expires_at
435 .is_some());
436 }
437
438 #[test]
439 fn cancel_removes_pending() {
440 let source = PooledTileSource::new(
441 "https://example.com/{z}/{x}/{y}.png",
442 Box::new(InstantMock::new()),
443 4,
444 );
445
446 source.request(TileId::new(0, 0, 0));
447 source.cancel(TileId::new(0, 0, 0));
448
449 let results = source.poll();
450 assert!(results.is_empty());
453 }
454
455 #[test]
456 fn debug_impl() {
457 let source = PooledTileSource::new(
458 "https://example.com/{z}/{x}/{y}.png",
459 Box::new(InstantMock::new()),
460 4,
461 );
462 let dbg = format!("{source:?}");
463 assert!(dbg.contains("PooledTileSource"));
464 }
465
466 #[test]
467 fn default_config_has_shared_raster_defaults() {
468 let config = PooledRasterTileSourceConfig::default();
469 assert_eq!(config.url_template, DEFAULT_RASTER_TILE_URL);
470 assert_eq!(config.source_min_zoom, 0);
471 assert_eq!(config.source_max_zoom, 19);
472 assert_eq!(config.max_concurrent, 32);
473 assert_eq!(config.max_cached, 768);
474 assert_eq!(config.headers.len(), 1);
475 assert_eq!(config.headers[0].0, "User-Agent");
476 }
477
478 struct QueueClient {
479 sent: StdMutex<Vec<String>>,
480 responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
481 }
482
483 impl QueueClient {
484 fn new() -> Self {
485 Self {
486 sent: StdMutex::new(Vec::new()),
487 responses: StdMutex::new(Vec::new()),
488 }
489 }
490
491 fn queue_response(&self, url: String, status: u16, body: Vec<u8>) {
492 self.responses.lock().unwrap().push((
493 url,
494 Ok(HttpResponse {
495 status,
496 body,
497 headers: Vec::new(),
498 }),
499 ));
500 }
501
502 fn queue_error(&self, url: String, error: &str) {
503 self.responses
504 .lock()
505 .unwrap()
506 .push((url, Err(error.to_string())));
507 }
508 }
509
510 impl HttpClient for Arc<QueueClient> {
511 fn send(&self, request: Req) {
512 self.sent.lock().unwrap().push(request.url);
513 }
514
515 fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
516 std::mem::take(&mut *self.responses.lock().unwrap())
517 }
518 }
519
520 #[test]
521 fn diagnostics_count_categorized_failures() {
522 let client = Arc::new(QueueClient::new());
523 let source = PooledTileSource::with_decoder(
524 "https://example.com/{z}/{x}/{y}.png",
525 Box::new(client.clone()),
526 4,
527 Box::new(FailingDecoder),
528 );
529
530 let decode_tile = TileId::new(3, 0, 0);
531 let timeout_tile = TileId::new(3, 0, 1);
532 let status_tile = TileId::new(3, 0, 2);
533 let not_found_tile = TileId::new(3, 0, 3);
534 let cancelled_tile = TileId::new(3, 0, 4);
535
536 for tile_id in [
537 decode_tile,
538 timeout_tile,
539 status_tile,
540 not_found_tile,
541 cancelled_tile,
542 ] {
543 source.request(tile_id);
544 }
545
546 client.queue_response(source.tile_url(&decode_tile), 200, vec![1, 2, 3]);
547 client.queue_error(source.tile_url(&timeout_tile), "request timeout");
548 client.queue_response(source.tile_url(&status_tile), 500, Vec::new());
549 client.queue_response(source.tile_url(¬_found_tile), 404, Vec::new());
550
551 source.cancel(cancelled_tile);
552 client.queue_response(source.tile_url(&cancelled_tile), 200, vec![0; 4]);
553
554 let _ = source.poll();
555 let diagnostics = source.diagnostics().expect("pooled source diagnostics");
556
557 assert_eq!(diagnostics.failure_diagnostics.decode_failures, 1);
558 assert_eq!(diagnostics.failure_diagnostics.transport_failures, 1);
559 assert_eq!(diagnostics.failure_diagnostics.timeout_failures, 1);
560 assert_eq!(diagnostics.failure_diagnostics.http_status_failures, 1);
561 assert_eq!(diagnostics.failure_diagnostics.not_found_failures, 1);
562 assert_eq!(diagnostics.failure_diagnostics.forced_cancellations, 1);
563 assert_eq!(
564 diagnostics.failure_diagnostics.ignored_completed_responses,
565 1
566 );
567 }
568}