Skip to main content

rustial_engine/
pooled_tile_source.rs

1//! Adapter that combines [`HttpTileSource`] with [`FetchPool`] for
2//! priority-ordered, concurrency-limited tile fetching.
3//!
4//! [`PooledTileSource`] implements [`TileSource`] and delegates to a
5//! [`FetchPool`] for scheduling.  This wires together the two utilities
6//! that previously existed as independent components.
7//!
8//! # Example
9//!
10//! ```rust,ignore
11//! use rustial_engine::{PooledTileSource, FetchPool};
12//!
13//! let client: Box<dyn HttpClient> = /* ... */;
14//! let source = PooledTileSource::new(
15//!     "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
16//!     client,
17//!     6,   // max concurrent
18//! );
19//!
20//! // Use as any TileSource:
21//! source.request(tile_id);
22//! let completed = source.poll();
23//! ```
24
25use crate::io::{FetchPool, HttpClient, HttpResponse};
26use crate::tile_source::{
27    DecodedImage, TileData, TileDecoder, TileError, TileFreshness, TileResponse, TileSource,
28    TileSourceDiagnostics, TileSourceFailureDiagnostics,
29};
30use rustial_math::TileId;
31use std::collections::HashMap;
32use std::sync::{Arc, Mutex};
33use std::time::{Duration, SystemTime};
34
35/// Default shared raster tile URL used by built-in examples and integrations.
36pub const DEFAULT_RASTER_TILE_URL: &str =
37    "https://a.basemaps.cartocdn.com/light_all/{z}/{x}/{y}.png";
38
39/// Default shared `User-Agent` header for built-in HTTP tile requests.
40pub const DEFAULT_RASTER_TILE_USER_AGENT: &str =
41    "rustial-example/0.1 (+https://github.com/user/rustial25d)";
42
43/// Renderer-agnostic configuration for a pooled HTTP raster tile source.
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct PooledRasterTileSourceConfig {
46    /// URL template with `{z}`, `{x}`, and `{y}` placeholders.
47    pub url_template: String,
48    /// Default request headers applied to every outgoing HTTP request.
49    pub headers: Vec<(String, String)>,
50    /// Minimum zoom level supported by the source.
51    pub source_min_zoom: u8,
52    /// Maximum zoom level supported by the source.
53    pub source_max_zoom: u8,
54    /// Maximum number of concurrent in-flight requests.
55    pub max_concurrent: usize,
56    /// Maximum retained tile count / visible budget for the layer using this source.
57    pub max_cached: usize,
58}
59
60impl Default for PooledRasterTileSourceConfig {
61    fn default() -> Self {
62        Self {
63            url_template: DEFAULT_RASTER_TILE_URL.into(),
64            headers: vec![(
65                "User-Agent".into(),
66                DEFAULT_RASTER_TILE_USER_AGENT.into(),
67            )],
68            source_min_zoom: 0,
69            source_max_zoom: 19,
70            max_concurrent: 32,
71            max_cached: 768,
72        }
73    }
74} // Close the Default implementation here
75
76fn is_timeout_error(error: &str) -> bool {
77    error.to_ascii_lowercase().contains("timeout")
78}
79
80fn parse_cache_control_max_age(value: &str) -> Option<u64> {
81    for directive in value.split(',') {
82        let directive = directive.trim();
83        if let Some(rest) = directive.strip_prefix("max-age=") {
84            if let Ok(seconds) = rest.trim_matches('"').parse::<u64>() {
85                return Some(seconds);
86            }
87        }
88    }
89    None
90}
91
92fn parse_age_seconds(response: &HttpResponse) -> u64 {
93    response
94        .header("age")
95        .and_then(|value| value.parse::<u64>().ok())
96        .unwrap_or(0)
97}
98
99fn parse_http_freshness(response: &HttpResponse) -> TileFreshness {
100    let now = SystemTime::now();
101    let age = parse_age_seconds(response);
102
103    let expires_at = response
104        .header("cache-control")
105        .and_then(parse_cache_control_max_age)
106        .map(|max_age| max_age.saturating_sub(age))
107        .map(Duration::from_secs)
108        .and_then(|ttl| now.checked_add(ttl))
109        .or_else(|| {
110            response
111                .header("expires")
112                .and_then(|value| httpdate::parse_http_date(value).ok())
113        });
114
115    TileFreshness {
116        expires_at,
117        etag: response.header("etag").map(ToOwned::to_owned),
118        last_modified: response.header("last-modified").map(ToOwned::to_owned),
119    }
120}
121
122/// A [`TileSource`] that fetches tiles over HTTP with concurrency limiting
123/// and viewport-center priority ordering via [`FetchPool`].
124pub struct PooledTileSource {
125    /// URL template with `{z}`, `{x}`, `{y}` placeholders.
126    url_template: String,
127    /// The underlying fetch pool that schedules HTTP requests.
128    pool: FetchPool,
129    /// Mapping from request URL to originating `TileId`.
130    pending: Mutex<HashMap<String, TileId>>,
131    /// Optional image decoder (PNG / JPEG / WebP -> RGBA8).
132    decoder: Option<Box<dyn TileDecoder>>,
133    /// Extra headers added to every outgoing request.
134    default_headers: Vec<(String, String)>,
135    /// Categorized source-side failure counters.
136    failure_diagnostics: Mutex<TileSourceFailureDiagnostics>,
137}
138
139impl std::fmt::Debug for PooledTileSource {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct("PooledTileSource")
142            .field("url_template", &self.url_template)
143            .field("has_decoder", &self.decoder.is_some())
144            .field("queued", &self.pool.queued_count())
145            .field("in_flight", &self.pool.in_flight_count())
146            .finish()
147    }
148}
149
150impl PooledTileSource {
151    /// Create a new pooled tile source without image decoding.
152    ///
153    /// Response bodies are assumed to be raw RGBA8 256x256 pixel data.
154    /// For real tile providers use [`with_decoder`](Self::with_decoder).
155    pub fn new(
156        url_template: impl Into<String>,
157        client: Box<dyn HttpClient>,
158        max_concurrent: usize,
159    ) -> Self {
160        Self {
161            url_template: url_template.into(),
162            pool: FetchPool::new(client, max_concurrent),
163            pending: Mutex::new(HashMap::new()),
164            decoder: None,
165            default_headers: Vec::new(),
166            failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
167        }
168    }
169
170    /// Create a new pooled tile source with a [`TileDecoder`].
171    pub fn with_decoder(
172        url_template: impl Into<String>,
173        client: Box<dyn HttpClient>,
174        max_concurrent: usize,
175        decoder: Box<dyn TileDecoder>,
176    ) -> Self {
177        Self {
178            url_template: url_template.into(),
179            pool: FetchPool::new(client, max_concurrent),
180            pending: Mutex::new(HashMap::new()),
181            decoder: Some(decoder),
182            default_headers: Vec::new(),
183            failure_diagnostics: Mutex::new(TileSourceFailureDiagnostics::default()),
184        }
185    }
186
187    /// Add a default header that will be sent with every tile request.
188    pub fn with_header(
189        mut self,
190        name: impl Into<String>,
191        value: impl Into<String>,
192    ) -> Self {
193        self.default_headers.push((name.into(), value.into()));
194        self
195    }
196
197    /// Expand the URL template for a given tile ID.
198    fn tile_url(&self, id: &TileId) -> String {
199        self.url_template
200            .replace("{z}", &id.zoom.to_string())
201            .replace("{x}", &id.x.to_string())
202            .replace("{y}", &id.y.to_string())
203    }
204
205    /// Compute a priority score for a tile.
206    ///
207    /// Lower zoom tiles are higher priority (loaded first as fallbacks).
208    /// Within the same zoom, tiles are equal priority (the pool's
209    /// insertion order breaks ties).
210    fn tile_priority(id: &TileId) -> f64 {
211        id.zoom as f64
212    }
213}
214
215impl TileSource for PooledTileSource {
216    fn request(&self, id: TileId) {
217        let url = self.tile_url(&id);
218
219        if let Ok(mut pending) = self.pending.lock() {
220            pending.insert(url.clone(), id);
221        }
222
223        let mut request = crate::HttpRequest::get(&url);
224        for (name, value) in &self.default_headers {
225            request = request.with_header(name.clone(), value.clone());
226        }
227
228        self.pool.enqueue(request, Self::tile_priority(&id));
229        self.pool.flush();
230    }
231
232    fn request_revalidate(&self, id: TileId, hint: crate::tile_source::RevalidationHint) {
233        let url = self.tile_url(&id);
234
235        if let Ok(mut pending) = self.pending.lock() {
236            pending.insert(url.clone(), id);
237        }
238
239        let mut request = crate::HttpRequest::get(&url);
240        for (name, value) in &self.default_headers {
241            request = request.with_header(name.clone(), value.clone());
242        }
243        if let Some(etag) = &hint.etag {
244            request = request.with_header("If-None-Match", etag.clone());
245        }
246        if let Some(last_modified) = &hint.last_modified {
247            request = request.with_header("If-Modified-Since", last_modified.clone());
248        }
249
250        self.pool.enqueue(request, Self::tile_priority(&id));
251        self.pool.flush();
252    }
253
254    fn poll(&self) -> Vec<(TileId, Result<TileResponse, TileError>)> {
255        let responses = self.pool.poll();
256        if responses.is_empty() {
257            return Vec::new();
258        }
259
260        let mut pending = match self.pending.lock() {
261            Ok(p) => p,
262            Err(_) => return Vec::new(),
263        };
264
265        let mut results = Vec::with_capacity(responses.len());
266
267        for (url, response) in responses {
268            let tile_id = match pending.remove(&url) {
269                Some(id) => id,
270                None => {
271                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
272                        diagnostics.ignored_completed_responses += 1;
273                    }
274                    continue;
275                }
276            };
277
278            match response {
279                Ok(resp) if resp.status == 304 => {
280                    let freshness = parse_http_freshness(&resp);
281                    results.push((tile_id, Ok(TileResponse::not_modified(freshness))));
282                }
283                Ok(resp) if resp.is_success() => {
284                    let freshness = parse_http_freshness(&resp);
285                    let tile_result = if let Some(ref decoder) = self.decoder {
286                        decoder
287                            .decode(&resp.body)
288                            .map(TileData::Raster)
289                            .map(|data| TileResponse { data, freshness, not_modified: false })
290                    } else {
291                        Ok(TileResponse {
292                            data: TileData::Raster(DecodedImage {
293                                width: 256,
294                                height: 256,
295                                data: Arc::new(resp.body),
296                            }),
297                            freshness,
298                            not_modified: false,
299                        })
300                    };
301                    if tile_result.is_err() {
302                        if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
303                            diagnostics.decode_failures += 1;
304                        }
305                    }
306                    results.push((tile_id, tile_result));
307                }
308                Ok(resp) if resp.status == 404 => {
309                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
310                        diagnostics.not_found_failures += 1;
311                    }
312                    results.push((tile_id, Err(TileError::NotFound(tile_id))));
313                }
314                Ok(resp) => {
315                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
316                        diagnostics.http_status_failures += 1;
317                    }
318                    results.push((
319                        tile_id,
320                        Err(TileError::Network(format!("HTTP {}", resp.status))),
321                    ));
322                }
323                Err(err) => {
324                    if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
325                        diagnostics.transport_failures += 1;
326                        if is_timeout_error(&err) {
327                            diagnostics.timeout_failures += 1;
328                        }
329                    }
330                    results.push((tile_id, Err(TileError::Network(err))));
331                }
332            }
333        }
334
335        results
336    }
337
338    fn cancel(&self, id: TileId) {
339        let url = self.tile_url(&id);
340        let removed = if let Ok(mut pending) = self.pending.lock() {
341            pending.remove(&url).is_some()
342        } else {
343            false
344        };
345        if removed {
346            if let Ok(mut diagnostics) = self.failure_diagnostics.lock() {
347                diagnostics.forced_cancellations += 1;
348            }
349        }
350        self.pool.force_cancel(&url);
351    }
352
353    fn diagnostics(&self) -> Option<TileSourceDiagnostics> {
354        let failure_diagnostics = self
355            .failure_diagnostics
356            .lock()
357            .map(|diagnostics| diagnostics.clone())
358            .unwrap_or_default();
359        Some(TileSourceDiagnostics {
360            queued_requests: self.pool.queued_count(),
361            in_flight_requests: self.pool.in_flight_count(),
362            known_requests: self.pool.known_count(),
363            cancelled_in_flight_requests: self.pool.cancelled_in_flight_count(),
364            max_concurrent_requests: self.pool.max_concurrent(),
365            pending_decode_tasks: 0,
366            failure_diagnostics,
367        })
368    }
369}
370
371#[cfg(test)]
372mod tests {
373    use super::*;
374    use crate::io::{HttpRequest as Req, HttpResponse};
375    use std::sync::Mutex as StdMutex;
376
377    struct FailingDecoder;
378
379    impl TileDecoder for FailingDecoder {
380        fn decode(&self, _bytes: &[u8]) -> Result<DecodedImage, TileError> {
381            Err(TileError::Decode("bad image".into()))
382        }
383    }
384
385    struct InstantMock {
386        sent: StdMutex<Vec<String>>,
387    }
388
389    impl InstantMock {
390        fn new() -> Self {
391            Self {
392                sent: StdMutex::new(Vec::new()),
393            }
394        }
395    }
396
397    impl HttpClient for InstantMock {
398        fn send(&self, request: Req) {
399            self.sent.lock().unwrap().push(request.url);
400        }
401
402        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
403            let sent = std::mem::take(&mut *self.sent.lock().unwrap());
404            sent.into_iter()
405                .map(|url| {
406                    (
407                        url,
408                        Ok(HttpResponse {
409                            status: 200,
410                            body: vec![0u8; 256 * 256 * 4],
411                            headers: vec![("Cache-Control".into(), "max-age=30".into())],
412                        }),
413                    )
414                })
415                .collect()
416        }
417    }
418
419    #[test]
420    fn request_and_poll_cycle() {
421        let source = PooledTileSource::new(
422            "https://example.com/{z}/{x}/{y}.png",
423            Box::new(InstantMock::new()),
424            4,
425        );
426
427        source.request(TileId::new(5, 10, 20));
428        let results = source.poll();
429        assert_eq!(results.len(), 1);
430        assert_eq!(results[0].0, TileId::new(5, 10, 20));
431        assert!(results[0].1.is_ok());
432        assert!(results[0].1.as_ref().unwrap().freshness.expires_at.is_some());
433    }
434
435    #[test]
436    fn cancel_removes_pending() {
437        let source = PooledTileSource::new(
438            "https://example.com/{z}/{x}/{y}.png",
439            Box::new(InstantMock::new()),
440            4,
441        );
442
443        source.request(TileId::new(0, 0, 0));
444        source.cancel(TileId::new(0, 0, 0));
445
446        let results = source.poll();
447        // The HTTP mock still returns a response, but we removed the
448        // pending entry so the result is discarded.
449        assert!(results.is_empty());
450    }
451
452    #[test]
453    fn debug_impl() {
454        let source = PooledTileSource::new(
455            "https://example.com/{z}/{x}/{y}.png",
456            Box::new(InstantMock::new()),
457            4,
458        );
459        let dbg = format!("{source:?}");
460        assert!(dbg.contains("PooledTileSource"));
461    }
462
463    #[test]
464    fn default_config_has_shared_raster_defaults() {
465        let config = PooledRasterTileSourceConfig::default();
466        assert_eq!(config.url_template, DEFAULT_RASTER_TILE_URL);
467        assert_eq!(config.source_min_zoom, 0);
468        assert_eq!(config.source_max_zoom, 19);
469        assert_eq!(config.max_concurrent, 32);
470        assert_eq!(config.max_cached, 768);
471        assert_eq!(config.headers.len(), 1);
472        assert_eq!(config.headers[0].0, "User-Agent");
473    }
474
475    struct QueueClient {
476        sent: StdMutex<Vec<String>>,
477        responses: StdMutex<Vec<(String, Result<HttpResponse, String>)>>,
478    }
479
480    impl QueueClient {
481        fn new() -> Self {
482            Self {
483                sent: StdMutex::new(Vec::new()),
484                responses: StdMutex::new(Vec::new()),
485            }
486        }
487
488        fn queue_response(&self, url: String, status: u16, body: Vec<u8>) {
489            self.responses.lock().unwrap().push((
490                url,
491                Ok(HttpResponse {
492                    status,
493                    body,
494                    headers: Vec::new(),
495                }),
496            ));
497        }
498
499        fn queue_error(&self, url: String, error: &str) {
500            self.responses
501                .lock()
502                .unwrap()
503                .push((url, Err(error.to_string())));
504        }
505    }
506
507    impl HttpClient for Arc<QueueClient> {
508        fn send(&self, request: Req) {
509            self.sent.lock().unwrap().push(request.url);
510        }
511
512        fn poll(&self) -> Vec<(String, Result<HttpResponse, String>)> {
513            std::mem::take(&mut *self.responses.lock().unwrap())
514        }
515    }
516
517    #[test]
518    fn diagnostics_count_categorized_failures() {
519        let client = Arc::new(QueueClient::new());
520        let source = PooledTileSource::with_decoder(
521            "https://example.com/{z}/{x}/{y}.png",
522            Box::new(client.clone()),
523            4,
524            Box::new(FailingDecoder),
525        );
526
527        let decode_tile = TileId::new(3, 0, 0);
528        let timeout_tile = TileId::new(3, 0, 1);
529        let status_tile = TileId::new(3, 0, 2);
530        let not_found_tile = TileId::new(3, 0, 3);
531        let cancelled_tile = TileId::new(3, 0, 4);
532
533        for tile_id in [decode_tile, timeout_tile, status_tile, not_found_tile, cancelled_tile] {
534            source.request(tile_id);
535        }
536
537        client.queue_response(source.tile_url(&decode_tile), 200, vec![1, 2, 3]);
538        client.queue_error(source.tile_url(&timeout_tile), "request timeout");
539        client.queue_response(source.tile_url(&status_tile), 500, Vec::new());
540        client.queue_response(source.tile_url(&not_found_tile), 404, Vec::new());
541
542        source.cancel(cancelled_tile);
543        client.queue_response(source.tile_url(&cancelled_tile), 200, vec![0; 4]);
544
545        let _ = source.poll();
546        let diagnostics = source.diagnostics().expect("pooled source diagnostics");
547
548        assert_eq!(diagnostics.failure_diagnostics.decode_failures, 1);
549        assert_eq!(diagnostics.failure_diagnostics.transport_failures, 1);
550        assert_eq!(diagnostics.failure_diagnostics.timeout_failures, 1);
551        assert_eq!(diagnostics.failure_diagnostics.http_status_failures, 1);
552        assert_eq!(diagnostics.failure_diagnostics.not_found_failures, 1);
553        assert_eq!(diagnostics.failure_diagnostics.forced_cancellations, 1);
554        assert_eq!(diagnostics.failure_diagnostics.ignored_completed_responses, 1);
555    }
556}