1use crate::io::{FetchPool, HttpClient, HttpResponse};
26use crate::tile_source::{
27 DecodedImage, TileData, TileDecoder, TileError, TileFreshness, TileResponse, TileSource,
28 TileSourceDiagnostics, TileSourceFailureDiagnostics,
29};
30use rustial_math::TileId;
31use std::collections::HashMap;
32use std::sync::{Arc, Mutex};
33use std::time::{Duration, SystemTime};
34
/// Default raster tile URL template.
///
/// The `{z}`, `{x}` and `{y}` placeholders are substituted with the tile's
/// zoom, column and row when a request URL is built.
pub const DEFAULT_RASTER_TILE_URL: &str =
    "https://a.basemaps.cartocdn.com/light_all/{z}/{x}/{y}.png";

/// Default `User-Agent` header value used by [`PooledRasterTileSourceConfig::default`].
pub const DEFAULT_RASTER_TILE_USER_AGENT: &str =
    "rustial-example/0.1 (+https://github.com/user/rustial25d)";

/// Configuration bundle for a pooled raster tile source.
///
/// NOTE(review): the consumer of this struct is not visible in this file; the
/// per-field notes below describe the defaults and the apparent intent only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PooledRasterTileSourceConfig {
    /// Tile URL template; defaults to [`DEFAULT_RASTER_TILE_URL`].
    pub url_template: String,
    /// `(name, value)` header pairs; defaults to a single `User-Agent` entry.
    pub headers: Vec<(String, String)>,
    /// Lowest zoom level of the source; defaults to `0`.
    pub source_min_zoom: u8,
    /// Highest zoom level of the source; defaults to `19`.
    pub source_max_zoom: u8,
    /// Concurrency limit; defaults to `32`.
    /// Presumably forwarded to the fetch pool — confirm against the caller.
    pub max_concurrent: usize,
    /// Cache capacity; defaults to `768`.
    /// Presumably a tile count — confirm against the caller.
    pub max_cached: usize,
}

impl Default for PooledRasterTileSourceConfig {
    /// Returns the shared raster defaults: the default URL template, one
    /// `User-Agent` header, zoom range `0..=19`, 32 concurrent requests and
    /// 768 cached entries.
    fn default() -> Self {
        Self {
            url_template: DEFAULT_RASTER_TILE_URL.into(),
            headers: vec![(
                "User-Agent".into(),
                DEFAULT_RASTER_TILE_USER_AGENT.into(),
            )],
            source_min_zoom: 0,
            source_max_zoom: 19,
            max_concurrent: 32,
            max_cached: 768,
        }
    }
}

/// Returns `true` when a transport error message describes a timeout.
///
/// Matching is ASCII case-insensitive and accepts both the "timeout" and the
/// "timed out" phrasing, since error strings commonly use either form
/// (e.g. "request timeout" vs. "operation timed out").
fn is_timeout_error(error: &str) -> bool {
    let lowered = error.to_ascii_lowercase();
    lowered.contains("timeout") || lowered.contains("timed out")
}
79
/// Extracts the `max-age` value, in seconds, from a `Cache-Control` header.
///
/// Directives are comma-separated and trimmed. The directive name is compared
/// ASCII case-insensitively (so `Max-Age=60` is accepted), and the value may
/// be wrapped in double quotes. A `max-age` directive whose value does not
/// parse as `u64` is skipped, so a later well-formed one can still match.
///
/// Returns `None` when no usable `max-age` directive is present.
fn parse_cache_control_max_age(value: &str) -> Option<u64> {
    value.split(',').find_map(|directive| {
        let (name, argument) = directive.trim().split_once('=')?;
        if !name.eq_ignore_ascii_case("max-age") {
            return None;
        }
        argument.trim_matches('"').parse::<u64>().ok()
    })
}
91
92fn parse_age_seconds(response: &HttpResponse) -> u64 {
93 response
94 .header("age")
95 .and_then(|value| value.parse::<u64>().ok())
96 .unwrap_or(0)
97}
98
99fn parse_http_freshness(response: &HttpResponse) -> TileFreshness {
100 let now = SystemTime::now();
101 let age = parse_age_seconds(response);
102
103 let expires_at = response
104 .header("cache-control")
105 .and_then(parse_cache_control_max_age)
106 .map(|max_age| max_age.saturating_sub(age))
107 .map(Duration::from_secs)
108 .and_then(|ttl| now.checked_add(ttl))
109 .or_else(|| {
110 response
111 .header("expires")
112 .and_then(|value| httpdate::parse_http_date(value).ok())
113 });
114
115 TileFreshness {
116 expires_at,
117 etag: response.header("etag").map(ToOwned::to_owned),
118 last_modified: response.header("last-modified").map(ToOwned::to_owned),
119 }
120}
121
122pub struct PooledTileSource {
125 url_template: String,
127 pool: FetchPool,
129 pending: Mutex<HashMap<String, TileId>>,
131 decoder: Option<Box<dyn TileDecoder>>,
133 default_headers: Vec<(String, String)>,
135 failure_diagnostics: Mutex<TileSourceFailureDiagnostics>,
137}
138
139impl std::fmt::Debug for PooledTileSource {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("PooledTileSource")
142 .field("url_template", &self.url_template)
143 .field("has_decoder", &self.decoder.is_some())
144 .field("queued", &self.pool.queued_count())
145 .field("in_flight", &self.pool.in_flight_count())
146 .finish()
147 }
148}
149
150impl PooledTileSource {
151 pub fn new(
156 url_template: impl Into<String>,
157 client: Box<dyn HttpClient>,
158 max_concurrent: usize,
159 ) -> Self {
160 Self {
161 url_template: url_template.into(),
162 pool: FetchPool::new(client, max_concurrent),
163 pending: Mutex::new(HashMap::new()),
164 decoder: None,
165 default_headers: Vec::new(),
166 failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
167 }
168 }
169
170 pub fn with_decoder(
172 url_template: impl Into<String>,
173 client: Box<dyn HttpClient>,
174 max_concurrent: usize,
175 decoder: Box<dyn TileDecoder>,
176 ) -> Self {
177 Self {
178 url_template: url_template.into(),
179 pool: FetchPool::new(client, max_concurrent),
180 pending: Mutex::new(HashMap::new()),
181 decoder: Some(decoder),
182 default_headers: Vec::new(),
183 failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
184 }
185 }
186
187 pub fn with_header(
189 mut self,
190 name: impl Into<String>,
191 value: impl Into<String>,
192 ) -> Self {
193 self.default_headers.push((name.into(), value.into()));
194 self
195 }
196
197 fn tile_url(&self, id: &TileId) -> String {
199 self.url_template
200 .replace("{z}", &id.zoom.to_string())
201 .replace("{x}", &id.x.to_string())
202 .replace("{y}", &id.y.to_string())
203 }
204
205 fn tile_priority(id: &TileId) -> f64 {
211 id.zoom as f64
212 }
213}
214
215impl TileSource for PooledTileSource {
216 fn request(&self, id: TileId) {
217 let url = self.tile_url(&id);
218
219 if let Ok(mut pending) = self.pending.lock() {
220 pending.insert(url.clone(), id);
221 }
222
223 let mut request = crate::HttpRequest::get(&url);
224 for (name, value) in &self.default_headers {
225 request = request.with_header(name.clone(), value.clone());
226 }
227
228 self.pool.enqueue(request, Self::tile_priority(&id));
229 self.pool.flush();
230 }
231
232 fn request_revalidate(&self, id: TileId, hint: crate::tile_source::RevalidationHint) {
233 let url = self.tile_url(&id);
234
235 if let Ok(mut pending) = self.pending.lock() {
236 pending.insert(url.clone(), id);
237 }
238
239 let mut request = crate::HttpRequest::get(&url);
240 for (name, value) in &self.default_headers {
241 request = request.with_header(name.clone(), value.clone());
242 }
243 if let Some(etag) = &hint.etag {
244 request = request.with_header("If-None-Match", etag.clone());
245 }
246 if let Some(last_modified) = &hint.last_modified {
247 request = request.with_header("If-Modified-Since", last_modified.clone());
248 }
249
250 self.pool.enqueue(request, Self::tile_priority(&id));
251 self.pool.flush();
252 }
253
254 fn poll(&self) -> Vec<(TileId, Result<TileResponse, TileError>)> {
255 let responses = self.pool.poll();
256 if responses.is_empty() {
257 return Vec::new();
258 }
259
260 let mut pending = match self.pending.lock() {
261 Ok(p) => p,
262 Err(_) => return Vec::new(),
263 };
264
265 let mut results = Vec::with_capacity(responses.len());
266
267 for (url, response) in responses {
268 let tile_id = match pending.remove(&url) {
269 Some(id) => id,
270 None => {
271 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
272 diagnostics.ignored_completed_responses += 1;
273 }
274 continue;
275 }
276 };
277
278 match response {
279 Ok(resp) if resp.status == 304 => {
280 let freshness = parse_http_freshness(&resp);
281 results.push((tile_id, Ok(TileResponse::not_modified(freshness))));
282 }
283 Ok(resp) if resp.is_success() => {
284 let freshness = parse_http_freshness(&resp);
285 let tile_result = if let Some(ref decoder) = self.decoder {
286 decoder
287 .decode(&resp.body)
288 .map(TileData::Raster)
289 .map(|data| TileResponse { data, freshness, not_modified: false })
290 } else {
291 Ok(TileResponse {
292 data: TileData::Raster(DecodedImage {
293 width: 256,
294 height: 256,
295 data: Arc::new(resp.body),
296 }),
297 freshness,
298 not_modified: false,
299 })
300 };
301 if tile_result.is_err() {
302 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
303 diagnostics.decode_failures += 1;
304 }
305 }
306 results.push((tile_id, tile_result));
307 }
308 Ok(resp) if resp.status == 404 => {
309 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
310 diagnostics.not_found_failures += 1;
311 }
312 results.push((tile_id, Err(TileError::NotFound(tile_id))));
313 }
314 Ok(resp) => {
315 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
316 diagnostics.http_status_failures += 1;
317 }
318 results.push((
319 tile_id,
320 Err(TileError::Network(format!("HTTP {}", resp.status))),
321 ));
322 }
323 Err(err) => {
324 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
325 diagnostics.transport_failures += 1;
326 if is_timeout_error(&err) {
327 diagnostics.timeout_failures += 1;
328 }
329 }
330 results.push((tile_id, Err(TileError::Network(err))));
331 }
332 }
333 }
334
335 results
336 }
337
338 fn cancel(&self, id: TileId) {
339 let url = self.tile_url(&id);
340 let removed = if let Ok(mut pending) = self.pending.lock() {
341 pending.remove(&url).is_some()
342 } else {
343 false
344 };
345 if removed {
346 if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
347 diagnostics.forced_cancellations += 1;
348 }
349 }
350 self.pool.force_cancel(&url);
351 }
352
353 fn diagnostics(&self) -> Option<TileSourceDiagnostics> {
354 let failure_diagnostics = self
355 .failure_diagnostics
356 .lock()
357 .map(|diagnostics| diagnostics.clone())
358 .unwrap_or_default();
359 Some(TileSourceDiagnostics {
360 queued_requests: self.pool.queued_count(),
361 in_flight_requests: self.pool.in_flight_count(),
362 known_requests: self.pool.known_count(),
363 cancelled_in_flight_requests: self.pool.cancelled_in_flight_count(),
364 max_concurrent_requests: self.pool.max_concurrent(),
365 pending_decode_tasks: 0,
366 failure_diagnostics,
367 })
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// Decoder that rejects every payload, used to exercise the
    /// decode-failure path.
    struct FailingDecoder;

    impl TileDecoder for FailingDecoder {
        fn decode(&self, _bytes: &[u8]) -> Result<DecodedImage, TileError> {
            Err(TileError::Decode("bad image".into()))
        }
    }

    /// HTTP client double that answers every sent request with a 200
    /// response on the next `poll`.
    struct InstantMock {
        sent: StdMutex<Vec<String>>,
    }

    impl InstantMock {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
            }
        }
    }

    impl HttpClient for InstantMock {
        fn send(&self, request: Req) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let sent = std::mem::take(&mut *self.sent.lock().unwrap());
            sent.into_iter()
                .map(|url| {
                    (
                        url,
                        Ok(HttpResponse {
                            status: 200,
                            body: vec![0u8; 256 * 256 * 4],
                            headers: vec![("Cache-Control".into(), "max-age=30".into())],
                        }),
                    )
                })
                .collect()
        }
    }

    /// A requested tile comes back from `poll` with freshness derived from
    /// the response's `Cache-Control` header.
    #[test]
    fn request_and_poll_cycle() {
        let source = PooledTileSource::new(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(InstantMock::new()),
            4,
        );

        source.request(TileId::new(5, 10, 20));
        let results = source.poll();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, TileId::new(5, 10, 20));
        assert!(results[0].1.is_ok());
        assert!(results[0].1.as_ref().unwrap().freshness.expires_at.is_some());
    }

    /// A cancelled tile never shows up in `poll` results.
    #[test]
    fn cancel_removes_pending() {
        let source = PooledTileSource::new(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(InstantMock::new()),
            4,
        );

        source.request(TileId::new(0, 0, 0));
        source.cancel(TileId::new(0, 0, 0));

        let results = source.poll();
        assert!(results.is_empty());
    }

    /// The `Debug` output names the type.
    #[test]
    fn debug_impl() {
        let source = PooledTileSource::new(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(InstantMock::new()),
            4,
        );
        let dbg = format!("{source:?}");
        assert!(dbg.contains("PooledTileSource"));
    }

    /// `PooledRasterTileSourceConfig::default` exposes the shared defaults.
    #[test]
    fn default_config_has_shared_raster_defaults() {
        let config = PooledRasterTileSourceConfig::default();
        assert_eq!(config.url_template, DEFAULT_RASTER_TILE_URL);
        assert_eq!(config.source_min_zoom, 0);
        assert_eq!(config.source_max_zoom, 19);
        assert_eq!(config.max_concurrent, 32);
        assert_eq!(config.max_cached, 768);
        assert_eq!(config.headers.len(), 1);
        assert_eq!(config.headers[0].0, "User-Agent");
    }

    /// HTTP client double whose responses are scripted by the test: `send`
    /// only records the URL, `poll` returns whatever was queued.
    struct QueueClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
    }

    impl QueueClient {
        fn new() -> Self {
            Self {
                sent: StdMutex::new(Vec::new()),
                responses: StdMutex::new(Vec::new()),
            }
        }

        /// Queues a header-less response with the given status and body.
        fn queue_response(&self, url: String, status: u16, body: Vec<u8>) {
            self.responses.lock().unwrap().push((
                url,
                Ok(HttpResponse {
                    status,
                    body,
                    headers: Vec::new(),
                }),
            ));
        }

        /// Queues a transport-level error for `url`.
        fn queue_error(&self, url: String, error: &str) {
            self.responses
                .lock()
                .unwrap()
                .push((url, Err(error.to_string())));
        }
    }

    // Implemented on `Arc<QueueClient>` so the test keeps a handle for
    // scripting responses after the client has been boxed into the source.
    impl HttpClient for Arc<QueueClient> {
        fn send(&self, request: Req) {
            self.sent.lock().unwrap().push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            std::mem::take(&mut *self.responses.lock().unwrap())
        }
    }

    /// Each failure category increments exactly its own counter(s).
    #[test]
    fn diagnostics_count_categorized_failures() {
        let client = Arc::new(QueueClient::new());
        let source = PooledTileSource::with_decoder(
            "https://example.com/{z}/{x}/{y}.png",
            Box::new(client.clone()),
            4,
            Box::new(FailingDecoder),
        );

        let decode_tile = TileId::new(3, 0, 0);
        let timeout_tile = TileId::new(3, 0, 1);
        let status_tile = TileId::new(3, 0, 2);
        let not_found_tile = TileId::new(3, 0, 3);
        let cancelled_tile = TileId::new(3, 0, 4);

        for tile_id in [decode_tile, timeout_tile, status_tile, not_found_tile, cancelled_tile] {
            source.request(tile_id);
        }

        client.queue_response(source.tile_url(&decode_tile), 200, vec![1, 2, 3]);
        client.queue_error(source.tile_url(&timeout_tile), "request timeout");
        client.queue_response(source.tile_url(&status_tile), 500, Vec::new());
        client.queue_response(source.tile_url(&not_found_tile), 404, Vec::new());

        // The response for a cancelled tile must be counted as ignored.
        source.cancel(cancelled_tile);
        client.queue_response(source.tile_url(&cancelled_tile), 200, vec![0; 4]);

        let _ = source.poll();
        let diagnostics = source.diagnostics().expect("pooled source diagnostics");

        assert_eq!(diagnostics.failure_diagnostics.decode_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.transport_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.timeout_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.http_status_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.not_found_failures, 1);
        assert_eq!(diagnostics.failure_diagnostics.forced_cancellations, 1);
        assert_eq!(diagnostics.failure_diagnostics.ignored_completed_responses, 1);
    }
}