Skip to main content

rustial_engine/
pooled_tile_source.rs

1//! Adapter that combines [`HttpTileSource`] with [`FetchPool`] for
2//! priority-ordered, concurrency-limited tile fetching.
3//!
4//! [`PooledTileSource`] implements [`TileSource`] and delegates to a
5//! [`FetchPool`] for scheduling.  This wires together the two utilities
6//! that previously existed as independent components.
7//!
8//! # Example
9//!
10//! ```rust,ignore
11//! use rustial_engine::{PooledTileSource, FetchPool};
12//!
13//! let client: Box<dyn HttpClient> = /* ... */;
14//! let source = PooledTileSource::new(
15//!     "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
16//!     client,
17//!     6,   // max concurrent
18//! );
19//!
20//! // Use as any TileSource:
21//! source.request(tile_id);
22//! let completed = source.poll();
23//! ```
24
25use crate::io::{FetchPool, HttpClient, HttpResponse};
26use crate::tile_source::{
27    DecodedImage, TileData, TileDecoder, TileError, TileFreshness, TileResponse, TileSource,
28    TileSourceDiagnostics, TileSourceFailureDiagnostics,
29};
30use rustial_math::TileId;
31use std::collections::HashMap;
32use std::sync::{Arc, Mutex};
33use std::time::{Duration, SystemTime};
34
/// URL template that [`PooledRasterTileSourceConfig::default`] installs for
/// raster tiles; `{z}`, `{x}` and `{y}` are substituted per tile.
pub const DEFAULT_RASTER_TILE_URL: &str =
    "https://a.basemaps.cartocdn.com/light_all/{z}/{x}/{y}.png";
38
/// `User-Agent` value that [`PooledRasterTileSourceConfig::default`] attaches
/// to every built-in HTTP tile request.
pub const DEFAULT_RASTER_TILE_USER_AGENT: &str =
    "rustial-example/0.1 (+https://github.com/user/rustial25d)";
42
/// Plain-data description of a pooled HTTP raster tile source.
///
/// The struct carries no renderer-specific state; it only bundles the values
/// needed to set up a pooled source and the layer that consumes it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PooledRasterTileSourceConfig {
    /// Tile URL pattern; `{z}`, `{x}` and `{y}` mark where the tile
    /// coordinates are substituted.
    pub url_template: String,
    /// `(name, value)` header pairs sent with every outgoing HTTP request.
    pub headers: Vec<(String, String)>,
    /// Lowest zoom level the source serves.
    pub source_min_zoom: u8,
    /// Highest zoom level the source serves.
    pub source_max_zoom: u8,
    /// Upper bound on simultaneously in-flight requests.
    pub max_concurrent: usize,
    /// Retained tile count / visible budget for the layer fed by this source.
    pub max_cached: usize,
}
59
60impl Default for PooledRasterTileSourceConfig {
61    fn default() -> Self {
62        Self {
63            url_template: DEFAULT_RASTER_TILE_URL.into(),
64            headers: vec![("User-Agent".into(), DEFAULT_RASTER_TILE_USER_AGENT.into())],
65            source_min_zoom: 0,
66            source_max_zoom: 19,
67            max_concurrent: 32,
68            max_cached: 768,
69        }
70    }
71} // Close the Default implementation here
72
/// Report whether a transport error message describes a timeout.
///
/// The comparison is ASCII case-insensitive. Both the "timeout" and the
/// "timed out" spellings are recognised; the latter is the wording used by
/// `std::io::ErrorKind::TimedOut`, so matching only "timeout" would leave
/// those failures out of the timeout counter.
fn is_timeout_error(error: &str) -> bool {
    const TIMEOUT_MARKERS: [&str; 2] = ["timeout", "timed out"];

    let lowered = error.to_ascii_lowercase();
    TIMEOUT_MARKERS
        .iter()
        .any(|marker| lowered.contains(marker))
}
76
/// Extract the `max-age` value (in seconds) from a `Cache-Control` header.
///
/// Directives are comma-separated. The directive name is compared
/// case-insensitively, so `Max-Age=60` is accepted. The argument may be
/// wrapped in double quotes. A `max-age` directive whose argument is not a
/// valid `u64` is skipped and later directives are still considered.
///
/// Returns `None` when no usable `max-age` directive is present.
fn parse_cache_control_max_age(value: &str) -> Option<u64> {
    value.split(',').find_map(|directive| {
        let (name, argument) = directive.split_once('=')?;
        if !name.trim().eq_ignore_ascii_case("max-age") {
            // Other directives (including `s-maxage`) are not handled here.
            return None;
        }
        argument.trim().trim_matches('"').parse::<u64>().ok()
    })
}
88
89fn parse_age_seconds(response: &HttpResponse) -> u64 {
90    response
91        .header("age")
92        .and_then(|value| value.parse::<u64>().ok())
93        .unwrap_or(0)
94}
95
96fn parse_http_freshness(response: &HttpResponse) -> TileFreshness {
97    let now = SystemTime::now();
98    let age = parse_age_seconds(response);
99
100    let expires_at = response
101        .header("cache-control")
102        .and_then(parse_cache_control_max_age)
103        .map(|max_age| max_age.saturating_sub(age))
104        .map(Duration::from_secs)
105        .and_then(|ttl| now.checked_add(ttl))
106        .or_else(|| {
107            response
108                .header("expires")
109                .and_then(|value| httpdate::parse_http_date(value).ok())
110        });
111
112    TileFreshness {
113        expires_at,
114        etag: response.header("etag").map(ToOwned::to_owned),
115        last_modified: response.header("last-modified").map(ToOwned::to_owned),
116    }
117}
118
119/// A [`TileSource`] that fetches tiles over HTTP with concurrency limiting
120/// and viewport-center priority ordering via [`FetchPool`].
121pub struct PooledTileSource {
122    /// URL template with `{z}`, `{x}`, `{y}` placeholders.
123    url_template: String,
124    /// The underlying fetch pool that schedules HTTP requests.
125    pool: FetchPool,
126    /// Mapping from request URL to originating `TileId`.
127    pending: Mutex<HashMap<String, TileId>>,
128    /// Optional image decoder (PNG / JPEG / WebP -> RGBA8).
129    decoder: Option<Box<dyn TileDecoder>>,
130    /// Extra headers added to every outgoing request.
131    default_headers: Vec<(String, String)>,
132    /// Categorized source-side failure counters.
133    failure_diagnostics: Mutex<TileSourceFailureDiagnostics>,
134}
135
136impl std::fmt::Debug for PooledTileSource {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("PooledTileSource")
139            .field("url_template", &self.url_template)
140            .field("has_decoder", &self.decoder.is_some())
141            .field("queued", &self.pool.queued_count())
142            .field("in_flight", &self.pool.in_flight_count())
143            .finish()
144    }
145}
146
147impl PooledTileSource {
148    /// Create a new pooled tile source without image decoding.
149    ///
150    /// Response bodies are assumed to be raw RGBA8 256x256 pixel data.
151    /// For real tile providers use [`with_decoder`](Self::with_decoder).
152    pub fn new(
153        url_template: impl Into<String>,
154        client: Box<dyn HttpClient>,
155        max_concurrent: usize,
156    ) -> Self {
157        Self {
158            url_template: url_template.into(),
159            pool: FetchPool::new(client, max_concurrent),
160            pending: Mutex::new(HashMap::new()),
161            decoder: None,
162            default_headers: Vec::new(),
163            failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
164        }
165    }
166
167    /// Create a new pooled tile source with a [`TileDecoder`].
168    pub fn with_decoder(
169        url_template: impl Into<String>,
170        client: Box<dyn HttpClient>,
171        max_concurrent: usize,
172        decoder: Box<dyn TileDecoder>,
173    ) -> Self {
174        Self {
175            url_template: url_template.into(),
176            pool: FetchPool::new(client, max_concurrent),
177            pending: Mutex::new(HashMap::new()),
178            decoder: Some(decoder),
179            default_headers: Vec::new(),
180            failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
181        }
182    }
183
184    /// Add a default header that will be sent with every tile request.
185    pub fn with_header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
186        self.default_headers.push((name.into(), value.into()));
187        self
188    }
189
190    /// Expand the URL template for a given tile ID.
191    fn tile_url(&self, id: &TileId) -> String {
192        self.url_template
193            .replace("{z}", &id.zoom.to_string())
194            .replace("{x}", &id.x.to_string())
195            .replace("{y}", &id.y.to_string())
196    }
197
198    /// Compute a priority score for a tile.
199    ///
200    /// Lower zoom tiles are higher priority (loaded first as fallbacks).
201    /// Within the same zoom, tiles are equal priority (the pool's
202    /// insertion order breaks ties).
203    fn tile_priority(id: &TileId) -> f64 {
204        id.zoom as f64
205    }
206}
207
208impl TileSource for PooledTileSource {
209    fn request(&self, id: TileId) {
210        let url = self.tile_url(&id);
211
212        if let Ok(mut pending) = self.pending.lock() {
213            pending.insert(url.clone(), id);
214        }
215
216        let mut request = crate::HttpRequest::get(&url);
217        for (name, value) in &self.default_headers {
218            request = request.with_header(name.clone(), value.clone());
219        }
220
221        self.pool.enqueue(request, Self::tile_priority(&id));
222        self.pool.flush();
223    }
224
225    fn request_revalidate(&self, id: TileId, hint: crate::tile_source::RevalidationHint) {
226        let url = self.tile_url(&id);
227
228        if let Ok(mut pending) = self.pending.lock() {
229            pending.insert(url.clone(), id);
230        }
231
232        let mut request = crate::HttpRequest::get(&url);
233        for (name, value) in &self.default_headers {
234            request = request.with_header(name.clone(), value.clone());
235        }
236        if let Some(etag) = &hint.etag {
237            request = request.with_header("If-None-Match", etag.clone());
238        }
239        if let Some(last_modified) = &hint.last_modified {
240            request = request.with_header("If-Modified-Since", last_modified.clone());
241        }
242
243        self.pool.enqueue(request, Self::tile_priority(&id));
244        self.pool.flush();
245    }
246
247    fn poll(&self) -> Vec<(TileId, Result<TileResponse, TileError>)> {
248        let responses = self.pool.poll();
249        if responses.is_empty() {
250            return Vec::new();
251        }
252
253        let mut pending = match self.pending.lock() {
254            Ok(p) => p,
255            Err(_) => return Vec::new(),
256        };
257
258        let mut results = Vec::with_capacity(responses.len());
259
260        for (url, response) in responses {
261            let tile_id = match pending.remove(&url) {
262                Some(id) => id,
263                None => {
264                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
265                        diagnostics.ignored_completed_responses += 1;
266                    }
267                    continue;
268                }
269            };
270
271            match response {
272                Ok(resp) if resp.status == 304 => {
273                    let freshness = parse_http_freshness(&resp);
274                    results.push((tile_id, Ok(TileResponse::not_modified(freshness))));
275                }
276                Ok(resp) if resp.is_success() => {
277                    let freshness = parse_http_freshness(&resp);
278                    let tile_result = if let Some(ref decoder) = self.decoder {
279                        decoder
280                            .decode(&resp.body)
281                            .map(TileData::Raster)
282                            .map(|data| TileResponse {
283                                data,
284                                freshness,
285                                not_modified: false,
286                            })
287                    } else {
288                        Ok(TileResponse {
289                            data: TileData::Raster(DecodedImage {
290                                width: 256,
291                                height: 256,
292                                data: Arc::new(resp.body),
293                            }),
294                            freshness,
295                            not_modified: false,
296                        })
297                    };
298                    if tile_result.is_err() {
299                        if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
300                            diagnostics.decode_failures += 1;
301                        }
302                    }
303                    results.push((tile_id, tile_result));
304                }
305                Ok(resp) if resp.status == 404 => {
306                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
307                        diagnostics.not_found_failures += 1;
308                    }
309                    results.push((tile_id, Err(TileError::NotFound(tile_id))));
310                }
311                Ok(resp) => {
312                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
313                        diagnostics.http_status_failures += 1;
314                    }
315                    results.push((
316                        tile_id,
317                        Err(TileError::Network(format!("HTTP {}", resp.status))),
318                    ));
319                }
320                Err(err) => {
321                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
322                        diagnostics.transport_failures += 1;
323                        if is_timeout_error(&err) {
324                            diagnostics.timeout_failures += 1;
325                        }
326                    }
327                    results.push((tile_id, Err(TileError::Network(err))));
328                }
329            }
330        }
331
332        results
333    }
334
335    fn cancel(&self, id: TileId) {
336        let url = self.tile_url(&id);
337        let removed = if let Ok(mut pending) = self.pending.lock() {
338            pending.remove(&url).is_some()
339        } else {
340            false
341        };
342        if removed {
343            if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
344                diagnostics.forced_cancellations += 1;
345            }
346        }
347        self.pool.force_cancel(&url);
348    }
349
350    fn diagnostics(&self) -> Option<TileSourceDiagnostics> {
351        let failure_diagnostics = self
352            .failure_diagnostics
353            .lock()
354            .map(|diagnostics| diagnostics.clone())
355            .unwrap_or_default();
356        Some(TileSourceDiagnostics {
357            queued_requests: self.pool.queued_count(),
358            in_flight_requests: self.pool.in_flight_count(),
359            known_requests: self.pool.known_count(),
360            cancelled_in_flight_requests: self.pool.cancelled_in_flight_count(),
361            max_concurrent_requests: self.pool.max_concurrent(),
362            pending_decode_tasks: 0,
363            failure_diagnostics,
364        })
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use crate::io::{HttpRequest as Req, HttpResponse};
    use std::sync::Mutex as StdMutex;

    /// URL template shared by every test source.
    const TEMPLATE: &str = "https://example.com/{z}/{x}/{y}.png";

    /// Decoder stub that rejects every payload.
    struct FailingDecoder;

    impl TileDecoder for FailingDecoder {
        fn decode(&self, _bytes: &[u8]) -> Result<DecodedImage, TileError> {
            Err(TileError::Decode("bad image".into()))
        }
    }

    /// HTTP client stub that answers every sent request with a `200` on the
    /// next `poll`.
    struct InstantMock {
        sent: StdMutex<Vec<String>>,
    }

    impl InstantMock {
        fn new() -> Self {
            Self {
                sent: StdMutex::default(),
            }
        }
    }

    impl HttpClient for InstantMock {
        fn send(&self, request: Req) {
            let mut outbox = self.sent.lock().unwrap();
            outbox.push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let drained: Vec<String> = self.sent.lock().unwrap().drain(..).collect();

            let mut completed = Vec::with_capacity(drained.len());
            for url in drained {
                let response = HttpResponse {
                    status: 200,
                    body: vec![0u8; 256 * 256 * 4],
                    headers: vec![("Cache-Control".into(), "max-age=30".into())],
                };
                completed.push((url, Ok(response)));
            }
            completed
        }
    }

    /// Decoder-less source backed by [`InstantMock`] with four slots.
    fn instant_source() -> PooledTileSource {
        PooledTileSource::new(TEMPLATE, Box::new(InstantMock::new()), 4)
    }

    #[test]
    fn request_and_poll_cycle() {
        let tile = TileId::new(5, 10, 20);
        let source = instant_source();

        source.request(tile);
        let results = source.poll();

        assert_eq!(results.len(), 1);
        let (id, outcome) = &results[0];
        assert_eq!(*id, tile);
        assert!(outcome.is_ok());
        assert!(outcome.as_ref().unwrap().freshness.expires_at.is_some());
    }

    #[test]
    fn cancel_removes_pending() {
        let tile = TileId::new(0, 0, 0);
        let source = instant_source();

        source.request(tile);
        source.cancel(tile);

        // The mock still produces a response, but with the pending entry gone
        // the source has nothing to attribute it to and discards it.
        let results = source.poll();
        assert!(results.is_empty());
    }

    #[test]
    fn debug_impl() {
        let source = instant_source();
        let rendered = format!("{:?}", source);
        assert!(rendered.contains("PooledTileSource"));
    }

    #[test]
    fn default_config_has_shared_raster_defaults() {
        let config = PooledRasterTileSourceConfig::default();

        assert_eq!(config.url_template, DEFAULT_RASTER_TILE_URL);
        assert_eq!(config.source_min_zoom, 0);
        assert_eq!(config.source_max_zoom, 19);
        assert_eq!(config.max_concurrent, 32);
        assert_eq!(config.max_cached, 768);
        assert_eq!(config.headers.len(), 1);
        assert_eq!(config.headers[0].0, "User-Agent");
    }

    /// HTTP client stub whose responses are scripted by the test rather than
    /// derived from the requests it receives.
    struct QueueClient {
        sent: StdMutex<Vec<String>>,
        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
    }

    impl QueueClient {
        fn new() -> Self {
            Self {
                sent: StdMutex::default(),
                responses: StdMutex::default(),
            }
        }

        /// Script an HTTP response with the given status and body, no headers.
        fn queue_response(&self, url: String, status: u16, body: Vec<u8>) {
            let response = HttpResponse {
                status,
                body,
                headers: Vec::new(),
            };
            self.responses.lock().unwrap().push((url, Ok(response)));
        }

        /// Script a transport-level failure.
        fn queue_error(&self, url: String, error: &str) {
            let failure = Err(error.to_string());
            self.responses.lock().unwrap().push((url, failure));
        }
    }

    impl HttpClient for Arc<QueueClient> {
        fn send(&self, request: Req) {
            let mut outbox = self.sent.lock().unwrap();
            outbox.push(request.url);
        }

        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
            let mut scripted = self.responses.lock().unwrap();
            std::mem::take(&mut *scripted)
        }
    }

    #[test]
    fn diagnostics_count_categorized_failures() {
        let client = Arc::new(QueueClient::new());
        let source = PooledTileSource::with_decoder(
            TEMPLATE,
            Box::new(client.clone()),
            4,
            Box::new(FailingDecoder),
        );

        let decode_tile = TileId::new(3, 0, 0);
        let timeout_tile = TileId::new(3, 0, 1);
        let status_tile = TileId::new(3, 0, 2);
        let not_found_tile = TileId::new(3, 0, 3);
        let cancelled_tile = TileId::new(3, 0, 4);

        let requested = [
            decode_tile,
            timeout_tile,
            status_tile,
            not_found_tile,
            cancelled_tile,
        ];
        for tile_id in requested {
            source.request(tile_id);
        }

        let url_of = |tile: &TileId| source.tile_url(tile);

        client.queue_response(url_of(&decode_tile), 200, vec![1, 2, 3]);
        client.queue_error(url_of(&timeout_tile), "request timeout");
        client.queue_response(url_of(&status_tile), 500, Vec::new());
        client.queue_response(url_of(&not_found_tile), 404, Vec::new());

        // Cancel first, then let a response for the cancelled tile arrive so
        // it is counted as an ignored completion.
        source.cancel(cancelled_tile);
        client.queue_response(url_of(&cancelled_tile), 200, vec![0; 4]);

        let _ = source.poll();
        let diagnostics = source.diagnostics().expect("pooled source diagnostics");
        let failures = &diagnostics.failure_diagnostics;

        assert_eq!(failures.decode_failures, 1);
        assert_eq!(failures.transport_failures, 1);
        assert_eq!(failures.timeout_failures, 1);
        assert_eq!(failures.http_status_failures, 1);
        assert_eq!(failures.not_found_failures, 1);
        assert_eq!(failures.forced_cancellations, 1);
        assert_eq!(failures.ignored_completed_responses, 1);
    }
}