Skip to main content

rustial_engine/
async_data.rs

//! Async retained data pipeline (worker-equivalent architecture).
2//!
3//! This module provides the infrastructure for offloading heavy per-frame CPU
4//! work (terrain mesh generation, vector tessellation, symbol placement) to
5//! background tasks, matching the MapLibre/Mapbox web worker model.
6//!
7//! ## Architecture
8//!
9//! The [`DataTaskPool`] trait is an executor-agnostic interface that the host
10//! or renderer injects into [`MapState`](crate::MapState).  The engine
11//! submits work items via typed spawn methods and polls completed results
12//! via [`DataTaskResultReceiver::try_recv`] each frame.
13//!
14//! When no task pool is set, `MapState` falls back to synchronous inline
15//! execution (current behavior), ensuring backward compatibility.
16//!
17//! ## Task families
18//!
19//! | Family | Input | Output |
20//! |--------|-------|--------|
21//! | Terrain | `TerrainTaskInput` (tile, elevation grid, config) | `TerrainTaskOutput` (mesh + hillshade) |
22//! | Vector | `VectorTaskInput` (features, style, terrain samples, projection) | `VectorTaskOutput` (tessellated mesh + symbol candidates) |
23//!
24//! ## Retained model
25//!
26//! Completed results are stored per cache key and reused across frames until
27//! invalidated by source data changes, camera origin movement, or zoom-level
28//! style property changes.
29//!
30//! ## Per-tile vector buckets (MapLibre parity)
31//!
32//! Vector features can be spatially partitioned into per-tile buckets using
33//! [`partition_features_by_tile`].  Each bucket is dispatched, tessellated,
34//! and cached independently via [`VectorBucketKey`], matching MapLibre's
35//! per-tile bucket architecture.  Only buckets whose tiles are in the
36//! visible set are collected for rendering; off-screen buckets are retained
37//! in the cache for reuse when the camera pans back.
38
39use crate::camera_projection::CameraProjection;
40use crate::geometry::{Feature, FeatureCollection, Geometry};
41use crate::layers::{FeatureProvenance, VectorMeshData, VectorStyle};
42use crate::symbols::SymbolCandidate;
43use crate::terrain::{PreparedHillshadeRaster, TerrainMeshData};
44use crate::tile_manager::TileTextureRegion;
45use rustial_math::{geo_to_tile, ElevationGrid, GeoCoord, TileId};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49// ---------------------------------------------------------------------------
50// DataTaskPool trait
51// ---------------------------------------------------------------------------
52
/// A result receiver for completed async data tasks.
///
/// One receiver is returned by each [`DataTaskPool`] spawn method.  The
/// pipeline's `poll_*` methods call [`try_recv`](Self::try_recv) on every
/// pending receiver and drop the receiver as soon as it yields `Some`.
///
/// Implementations should be non-blocking: [`try_recv`](Self::try_recv)
/// returns `None` when no result is available yet.
pub trait DataTaskResultReceiver<T: Send + 'static>: Send + Sync {
    /// Try to receive a completed result without blocking.
    ///
    /// Returns `Some(result)` if a result is ready, `None` otherwise.
    fn try_recv(&self) -> Option<T>;
}
63
/// An executor-agnostic task pool for offloading heavy data pipeline work.
///
/// The engine submits work items and polls result channels without depending
/// on a specific async runtime.  Implementations may use:
///
/// - Bevy's `AsyncComputeTaskPool`
/// - A `rayon` thread pool
/// - A `tokio` runtime
/// - A simple `std::thread`-based pool
///
/// # Contract
///
/// - Spawn methods must not block.  They should enqueue the closure
///   for execution on a background thread and return a receiver immediately.
/// - The returned receiver must be `Send + Sync` so it can be stored in
///   `MapState`.
/// - The closure must be `Send + 'static` because it will execute on a
///   different thread.
pub trait DataTaskPool: Send + Sync {
    /// Spawn a terrain mesh generation task.
    ///
    /// The closure produces the mesh and hillshade for one tile as a
    /// [`TerrainTaskOutput`].
    fn spawn_terrain(
        &self,
        task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>;

    /// Spawn a vector tessellation task.
    ///
    /// Used both for whole-layer tessellation and for per-tile vector
    /// buckets; in both cases the closure returns a [`VectorTaskOutput`].
    fn spawn_vector(
        &self,
        task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>>;

    /// Spawn an MVT decode task.
    ///
    /// The closure decodes raw PBF bytes into a
    /// [`VectorTileData`](crate::tile_source::VectorTileData) on a
    /// background thread, matching MapLibre's web worker model.
    fn spawn_decode(
        &self,
        task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>;
}
104
105// ---------------------------------------------------------------------------
106// Built-in std::thread task pool
107// ---------------------------------------------------------------------------
108
/// A simple `std::thread`-based [`DataTaskPool`] implementation.
///
/// Each spawned task runs on a new OS thread.  This is suitable for
/// host applications that do not use Bevy or another async runtime.
///
/// For production use with many concurrent tasks, prefer a thread-pool
/// based implementation (e.g. `rayon` or Bevy's `ComputeTaskPool`).
///
/// The type is a stateless unit struct, so it is `Copy` and its
/// [`Default`] value is identical to [`ThreadDataTaskPool::new`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ThreadDataTaskPool;

impl ThreadDataTaskPool {
    /// Create a new thread-based task pool.
    pub fn new() -> Self {
        Self
    }
}
130
131struct ChannelReceiver<T> {
132    rx: std::sync::mpsc::Receiver<T>,
133}
134
135impl<T: Send + 'static> DataTaskResultReceiver<T> for ChannelReceiver<T> {
136    fn try_recv(&self) -> Option<T> {
137        self.rx.try_recv().ok()
138    }
139}
140
141// Safety: `std::sync::mpsc::Receiver` is `Send` but not `Sync` by default.
142// However, we only access it via `try_recv()` which is safe to call from
143// any single thread (MapState is behind RwLock with exclusive write access).
144// We wrap it to satisfy the Sync bound required by MapState's Send + Sync.
145unsafe impl<T: Send> Sync for ChannelReceiver<T> {}
146
147fn spawn_on_thread<T: Send + 'static>(
148    task: Box<dyn FnOnce() -> T + Send + 'static>,
149) -> Box<dyn DataTaskResultReceiver<T>> {
150    let (tx, rx) = std::sync::mpsc::channel();
151    std::thread::spawn(move || {
152        let result = task();
153        let _ = tx.send(result);
154    });
155    Box::new(ChannelReceiver { rx })
156}
157
// Every task family is handled identically: the closure is handed to
// `spawn_on_thread`, and the receiver it returns is passed straight back to
// the caller.
impl DataTaskPool for ThreadDataTaskPool {
    // Terrain mesh + hillshade generation.
    fn spawn_terrain(
        &self,
        task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>> {
        spawn_on_thread(task)
    }

    // Vector tessellation (whole-layer and per-tile bucket tasks).
    fn spawn_vector(
        &self,
        task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>> {
        spawn_on_thread(task)
    }

    // Raw MVT payload decoding.
    fn spawn_decode(
        &self,
        task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>> {
        spawn_on_thread(task)
    }
}
180
181// ---------------------------------------------------------------------------
182// Task inputs and outputs
183// ---------------------------------------------------------------------------
184
/// Input for an async terrain mesh generation task.
///
/// The whole value is moved into the background closure by
/// `AsyncDataPipeline::dispatch_terrain`.  `tile`, `generation` and
/// `resolution` together form the [`TerrainCacheKey`] used to deduplicate
/// and cache the result.
#[derive(Clone)]
pub struct TerrainTaskInput {
    /// Tile to generate terrain for.
    pub tile: TileId,
    /// DEM source tile backing this visible terrain tile.
    pub elevation_source_tile: TileId,
    /// Normalized sub-region inside the DEM source tile.
    pub elevation_region: TileTextureRegion,
    /// Elevation grid data for this tile.
    ///
    /// Used for both the terrain mesh and the hillshade raster.
    pub elevation: ElevationGrid,
    /// Mesh grid resolution.
    pub resolution: u16,
    /// Vertical exaggeration factor.
    pub vertical_exaggeration: f64,
    /// Generation counter for cache invalidation.
    pub generation: u64,
}
203
/// Output from an async terrain mesh generation task.
///
/// `tile` and `generation` echo the values of the [`TerrainTaskInput`] the
/// task was started with.
pub struct TerrainTaskOutput {
    /// The tile this result is for.
    pub tile: TileId,
    /// Generated terrain mesh descriptor.
    pub mesh: TerrainMeshData,
    /// Prepared hillshade raster for this tile.
    pub hillshade: PreparedHillshadeRaster,
    /// Generation counter for cache invalidation.
    pub generation: u64,
}
215
/// Cache key for a retained terrain task result.
///
/// Two terrain requests are considered the same work item when all three
/// fields match; the pipeline uses this both to skip already cached tiles
/// and to avoid dispatching a task that is still in flight.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TerrainCacheKey {
    /// Tile id.
    pub tile: TileId,
    /// Elevation data generation.
    pub generation: u64,
    /// Grid resolution.
    pub resolution: u16,
}
226
/// Input for an async vector tessellation task.
///
/// The whole value is moved into the background closure by
/// `AsyncDataPipeline::dispatch_vector`.
#[derive(Clone)]
pub struct VectorTaskInput {
    /// Unique key for this (layer, data generation, projection) triple.
    ///
    /// Also used to deduplicate against cached and in-flight tasks.
    pub cache_key: VectorCacheKey,
    /// The feature collection to tessellate.
    pub features: FeatureCollection,
    /// The vector style to apply.
    pub style: VectorStyle,
    /// Originating style/runtime layer id, when known.
    pub query_layer_id: Option<String>,
    /// Originating style source id, when known.
    pub query_source_id: Option<String>,
    /// Originating style source-layer id, when known.
    pub query_source_layer: Option<String>,
    /// Per-feature tile/source-layer provenance.
    pub feature_provenance: Vec<Option<FeatureProvenance>>,
    /// Active camera projection.
    pub projection: CameraProjection,
    /// Per-vertex terrain elevation samples (coord -> elevation).
    /// Empty when terrain is disabled.
    ///
    /// When non-empty, the samples are applied to `features` before
    /// tessellation.
    pub terrain_samples: Vec<(GeoCoord, f64)>,
}
250
/// Output from an async vector tessellation task.
///
/// Returned by closures passed to [`DataTaskPool::spawn_vector`].
pub struct VectorTaskOutput {
    /// Cache key matching the input.
    pub cache_key: VectorCacheKey,
    /// Tessellated mesh data.
    pub mesh: VectorMeshData,
    /// Symbol candidates extracted during tessellation.
    pub symbol_candidates: Vec<SymbolCandidate>,
}
260
/// Output from an async MVT decode task.
///
/// Produced by [`DataTaskPool::spawn_decode`] on a background thread and
/// consumed by `AsyncDataPipeline::poll_decodes` to promote raw vector
/// payloads in the tile cache to fully decoded
/// [`TileData::Vector`](crate::tile_source::TileData::Vector) data.
pub struct MvtDecodeOutput {
    /// The tile that was decoded.
    pub tile: TileId,
    /// Decode result -- either decoded vector data or an error.
    pub result: Result<crate::tile_source::VectorTileData, crate::tile_source::TileError>,
    /// Freshness metadata from the original HTTP response, carried through
    /// so that the promoted cache entry retains correct TTL information.
    pub freshness: crate::tile_source::TileFreshness,
}
275
/// Cache key for a retained vector task result.
///
/// This is the coarse, whole-layer key used by the per-layer (legacy)
/// vector path.  See [`VectorBucketKey`] for the per-tile variant.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VectorCacheKey {
    /// Layer identifier (name or style layer id).
    pub layer_id: String,
    /// Data generation counter.
    pub data_generation: u64,
    /// Projection used for tessellation.
    pub projection: CameraProjection,
}
286
287// ---------------------------------------------------------------------------
288// Per-tile vector bucket key and partitioning
289// ---------------------------------------------------------------------------
290
/// Cache key for a per-tile vector bucket.
///
/// This is the fine-grained cache key that matches MapLibre's per-tile
/// bucket architecture.  Each `(layer, tile, generation, projection)` tuple
/// maps to an independently tessellated and cached mesh.
///
/// It carries the same fields as [`VectorCacheKey`] plus the tile.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VectorBucketKey {
    /// Layer identifier.
    pub layer_id: String,
    /// Tile that this bucket covers.
    pub tile: TileId,
    /// Data generation counter for cache invalidation.
    pub data_generation: u64,
    /// Projection used for tessellation.
    pub projection: CameraProjection,
}
307
308/// Return the representative coordinate for a feature's geometry.
309///
310/// Uses the first coordinate found in the geometry (point position,
311/// first line vertex, first polygon vertex, etc.).  This is sufficient
312/// for tile assignment: a feature is assigned to exactly one tile based
313/// on its representative point, matching MapLibre's bucket-assignment
314/// strategy.
315fn representative_coord(geometry: &Geometry) -> Option<GeoCoord> {
316    match geometry {
317        Geometry::Point(p) => Some(p.coord),
318        Geometry::LineString(ls) => ls.coords.first().copied(),
319        Geometry::Polygon(poly) => poly.exterior.first().copied(),
320        Geometry::MultiPoint(mp) => mp.points.first().map(|p| p.coord),
321        Geometry::MultiLineString(mls) => {
322            mls.lines.first().and_then(|ls| ls.coords.first().copied())
323        }
324        Geometry::MultiPolygon(mpoly) => mpoly
325            .polygons
326            .first()
327            .and_then(|p| p.exterior.first().copied()),
328        Geometry::GeometryCollection(geoms) => geoms.iter().find_map(representative_coord),
329    }
330}
331
332/// Partition a feature collection into per-tile sub-collections.
333///
334/// Each feature is assigned to exactly one tile based on its representative
335/// coordinate (first vertex).  Features whose geometry has no coordinates
336/// are dropped.
337///
338/// # Arguments
339///
340/// * `features` - The source feature collection to partition.
341/// * `zoom` - The zoom level at which to compute tile assignments.
342///
343/// # Returns
344///
345/// A map from `TileId` to the subset of features that fall within that tile.
346pub fn partition_features_by_tile(
347    features: &FeatureCollection,
348    zoom: u8,
349) -> HashMap<TileId, FeatureCollection> {
350    let mut buckets: HashMap<TileId, Vec<Feature>> = HashMap::new();
351    for feature in &features.features {
352        if let Some(coord) = representative_coord(&feature.geometry) {
353            let tile_coord = geo_to_tile(&coord, zoom);
354            let tile_id = tile_coord.tile_id();
355            buckets.entry(tile_id).or_default().push(feature.clone());
356        }
357    }
358    buckets
359        .into_iter()
360        .map(|(tile, feats)| (tile, FeatureCollection { features: feats }))
361        .collect()
362}
363
364// ---------------------------------------------------------------------------
365// Pending task handles
366// ---------------------------------------------------------------------------
367
/// An in-flight terrain task: the cache key it will populate plus the
/// receiver that yields its result.
struct PendingTerrainTask {
    key: TerrainCacheKey,
    receiver: Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>,
}
372
/// An in-flight whole-layer vector task: the cache key it will populate
/// plus the receiver that yields its result.
struct PendingVectorTask {
    key: VectorCacheKey,
    receiver: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
}
377
/// An in-flight per-tile vector bucket task: the bucket key it will
/// populate plus the receiver that yields its result.
struct PendingVectorBucketTask {
    key: VectorBucketKey,
    receiver: Box<dyn DataTaskResultReceiver<VectorBucketOutput>>,
}
382
/// An in-flight MVT decode task.
///
/// `tile` identifies the tile being decoded and is what `dispatch_decode`
/// compares against to avoid dispatching the same tile twice.
struct PendingDecodeTask {
    tile: TileId,
    receiver: Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>,
}
387
/// Output from an async per-tile vector bucket tessellation task.
///
/// Carries the same payload that is stored in the bucket cache: the
/// tessellated mesh and the symbol candidates for one bucket.
struct VectorBucketOutput {
    mesh: VectorMeshData,
    symbol_candidates: Vec<SymbolCandidate>,
}
393
394// ---------------------------------------------------------------------------
395// AsyncDataPipeline
396// ---------------------------------------------------------------------------
397
/// Manages async data task dispatch and result polling.
///
/// This is the internal coordinator that [`MapState`](crate::MapState) uses
/// to submit terrain/vector/symbol work to a [`DataTaskPool`] and collect
/// completed results each frame.
///
/// Each task family has a `pending_*` list of in-flight tasks and, except
/// for decodes, a cache of completed results keyed by the task's cache key.
pub(crate) struct AsyncDataPipeline {
    /// Executor that runs all dispatched closures.
    pool: Arc<dyn DataTaskPool>,
    /// Terrain tasks that have been dispatched but not yet completed.
    pending_terrain: Vec<PendingTerrainTask>,
    /// Whole-layer vector tasks that have been dispatched but not yet completed.
    pending_vector: Vec<PendingVectorTask>,
    /// Completed terrain results (mesh + hillshade) by cache key.
    terrain_cache: HashMap<TerrainCacheKey, (TerrainMeshData, PreparedHillshadeRaster)>,
    /// Completed whole-layer vector results (mesh + symbol candidates) by cache key.
    vector_cache: HashMap<VectorCacheKey, (VectorMeshData, Vec<SymbolCandidate>)>,
    /// Next value handed out by `layer_generation`; starts at 1.
    next_vector_generation: u64,
    /// Most recently assigned data generation per layer id.
    layer_generations: HashMap<String, u64>,
    // Per-tile vector bucket state
    /// Per-tile bucket tasks that have been dispatched but not yet completed.
    pending_buckets: Vec<PendingVectorBucketTask>,
    /// Completed per-tile bucket results (mesh + symbol candidates) by bucket key.
    bucket_cache: HashMap<VectorBucketKey, (VectorMeshData, Vec<SymbolCandidate>)>,
    // Background MVT decode state
    /// Decode tasks that have been dispatched but not yet completed.
    pending_decodes: Vec<PendingDecodeTask>,
    /// Completed decode results waiting to be promoted into the tile cache.
    ///
    /// Each entry is a `(TileId, TileResponse)` ready to replace the
    /// `TileData::RawVector` entry in the tile cache with a fully decoded
    /// `TileData::Vector`.
    pub(crate) decoded_tiles: Vec<(TileId, crate::tile_source::TileResponse)>,
}
423
424impl AsyncDataPipeline {
425    pub(crate) fn new(pool: Arc<dyn DataTaskPool>) -> Self {
426        Self {
427            pool,
428            pending_terrain: Vec::new(),
429            pending_vector: Vec::new(),
430            terrain_cache: HashMap::new(),
431            vector_cache: HashMap::new(),
432            next_vector_generation: 1,
433            layer_generations: HashMap::new(),
434            pending_buckets: Vec::new(),
435            bucket_cache: HashMap::new(),
436            pending_decodes: Vec::new(),
437            decoded_tiles: Vec::new(),
438        }
439    }
440
441    // -- Terrain ----------------------------------------------------------
442
443    pub(crate) fn dispatch_terrain(&mut self, input: TerrainTaskInput) {
444        let key = TerrainCacheKey {
445            tile: input.tile,
446            generation: input.generation,
447            resolution: input.resolution,
448        };
449        if self.terrain_cache.contains_key(&key) {
450            return;
451        }
452        if self.pending_terrain.iter().any(|p| p.key == key) {
453            return;
454        }
455        let receiver = self.pool.spawn_terrain(Box::new(move || {
456            let mesh = crate::terrain::build_terrain_descriptor_with_source(
457                &input.tile,
458                input.elevation_source_tile,
459                input.elevation_region,
460                &input.elevation,
461                input.resolution,
462                input.vertical_exaggeration,
463                input.generation,
464            );
465            let hillshade = crate::terrain::prepare_hillshade_raster(
466                &input.elevation,
467                input.vertical_exaggeration,
468                input.generation,
469            );
470            TerrainTaskOutput {
471                tile: input.tile,
472                mesh,
473                hillshade,
474                generation: input.generation,
475            }
476        }));
477        self.pending_terrain
478            .push(PendingTerrainTask { key, receiver });
479    }
480
481    pub(crate) fn poll_terrain(&mut self) {
482        let mut still_pending = Vec::new();
483        for task in self.pending_terrain.drain(..) {
484            if let Some(output) = task.receiver.try_recv() {
485                self.terrain_cache
486                    .insert(task.key, (output.mesh, output.hillshade));
487            } else {
488                still_pending.push(task);
489            }
490        }
491        self.pending_terrain = still_pending;
492    }
493
494    #[allow(dead_code)]
495    pub(crate) fn collect_terrain(
496        &self,
497        desired_tiles: &[TileId],
498        tile_generations: &HashMap<TileId, u64>,
499        resolution: u16,
500    ) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
501        let mut meshes = Vec::with_capacity(desired_tiles.len());
502        let mut hillshades = Vec::with_capacity(desired_tiles.len());
503        for tile in desired_tiles {
504            let generation = tile_generations.get(tile).copied().unwrap_or(0);
505            let key = TerrainCacheKey {
506                tile: *tile,
507                generation,
508                resolution,
509            };
510            if let Some((mesh, hs)) = self.terrain_cache.get(&key) {
511                meshes.push(mesh.clone());
512                hillshades.push(hs.clone());
513            }
514        }
515        (meshes, hillshades)
516    }
517
518    /// Collect all currently cached terrain results regardless of tile key.
519    pub(crate) fn collect_all_terrain(
520        &self,
521    ) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
522        let mut meshes = Vec::with_capacity(self.terrain_cache.len());
523        let mut hillshades = Vec::with_capacity(self.terrain_cache.len());
524        for (mesh, hs) in self.terrain_cache.values() {
525            meshes.push(mesh.clone());
526            hillshades.push(hs.clone());
527        }
528        (meshes, hillshades)
529    }
530
531    pub(crate) fn prune_terrain(&mut self, visible_tiles: &[TileId]) {
532        let visible_set: std::collections::HashSet<TileId> =
533            visible_tiles.iter().copied().collect();
534        self.terrain_cache
535            .retain(|key, _| visible_set.contains(&key.tile));
536    }
537
538    // -- Per-layer vector (legacy) ----------------------------------------
539
540    pub(crate) fn layer_generation(&mut self, layer_id: &str, features_changed: bool) -> u64 {
541        if features_changed {
542            let gen = self.next_vector_generation;
543            self.next_vector_generation += 1;
544            self.layer_generations.insert(layer_id.to_owned(), gen);
545            gen
546        } else {
547            self.layer_generations
548                .get(layer_id)
549                .copied()
550                .unwrap_or_else(|| {
551                    let gen = self.next_vector_generation;
552                    self.next_vector_generation += 1;
553                    self.layer_generations.insert(layer_id.to_owned(), gen);
554                    gen
555                })
556        }
557    }
558
559    pub(crate) fn dispatch_vector(&mut self, input: VectorTaskInput) {
560        let key = input.cache_key.clone();
561        if self.vector_cache.contains_key(&key) {
562            return;
563        }
564        if self.pending_vector.iter().any(|p| p.key == key) {
565            return;
566        }
567        let receiver = self.pool.spawn_vector(Box::new(move || {
568            let mut features = input.features;
569            if !input.terrain_samples.is_empty() {
570                apply_terrain_samples(&mut features, &input.terrain_samples);
571            }
572            let mut temp_layer = crate::layers::VectorLayer::new(
573                &input.cache_key.layer_id,
574                FeatureCollection::default(),
575                input.style,
576            )
577            .with_query_metadata(input.query_layer_id, input.query_source_id)
578            .with_source_layer(input.query_source_layer);
579            temp_layer.set_features_with_provenance(features, input.feature_provenance);
580            let mesh = temp_layer.tessellate(input.projection);
581            let candidates = temp_layer.symbol_candidates();
582            VectorTaskOutput {
583                cache_key: input.cache_key,
584                mesh,
585                symbol_candidates: candidates,
586            }
587        }));
588        self.pending_vector
589            .push(PendingVectorTask { key, receiver });
590    }
591
592    pub(crate) fn poll_vector(&mut self) {
593        let mut still_pending = Vec::new();
594        for task in self.pending_vector.drain(..) {
595            if let Some(output) = task.receiver.try_recv() {
596                self.vector_cache
597                    .insert(task.key, (output.mesh, output.symbol_candidates));
598            } else {
599                still_pending.push(task);
600            }
601        }
602        self.pending_vector = still_pending;
603    }
604
605    pub(crate) fn collect_vectors(
606        &self,
607        layer_keys: &[VectorCacheKey],
608    ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
609        let mut meshes = Vec::new();
610        let mut candidates = Vec::new();
611        for key in layer_keys {
612            if let Some((mesh, cands)) = self.vector_cache.get(key) {
613                if !mesh.is_empty() {
614                    meshes.push(mesh.clone());
615                }
616                candidates.extend(cands.iter().cloned());
617            }
618        }
619        (meshes, candidates)
620    }
621
622    pub(crate) fn prune_vectors(&mut self, active_layer_ids: &[String]) {
623        let active_set: std::collections::HashSet<&str> =
624            active_layer_ids.iter().map(String::as_str).collect();
625        self.vector_cache
626            .retain(|key, _| active_set.contains(key.layer_id.as_str()));
627        self.layer_generations
628            .retain(|id, _| active_set.contains(id.as_str()));
629    }
630
    /// Test-only accessor: the generation currently recorded for
    /// `layer_id`, or `None` if the layer has no recorded generation.
    #[cfg(test)]
    pub(crate) fn current_layer_generation(&self, layer_id: &str) -> Option<u64> {
        self.layer_generations.get(layer_id).copied()
    }
635
636    // -- Per-tile vector buckets ------------------------------------------
637
638    /// Dispatch per-tile vector tessellation tasks for a partitioned feature set.
639    ///
640    /// For each `(tile, features)` pair, a background task is spawned that
641    /// tessellates only the features belonging to that tile.  Results are
642    /// cached by [`VectorBucketKey`] and deduplicated against both the
643    /// pending queue and the completed cache.
644    #[allow(clippy::too_many_arguments)]
645    pub(crate) fn dispatch_vector_buckets(
646        &mut self,
647        layer_id: &str,
648        data_generation: u64,
649        projection: CameraProjection,
650        style: &VectorStyle,
651        tile_features: &HashMap<TileId, FeatureCollection>,
652        query_source_id: Option<&str>,
653        query_source_layer: Option<&str>,
654        tile_provenance: &HashMap<TileId, Vec<Option<FeatureProvenance>>>,
655        terrain_samples: &[(GeoCoord, f64)],
656    ) {
657        for (tile, features) in tile_features {
658            let key = VectorBucketKey {
659                layer_id: layer_id.to_owned(),
660                tile: *tile,
661                data_generation,
662                projection,
663            };
664            if self.bucket_cache.contains_key(&key) {
665                continue;
666            }
667            if self.pending_buckets.iter().any(|p| p.key == key) {
668                continue;
669            }
670            let task_key = key.clone();
671            let task_features = features.clone();
672            let task_style = style.clone();
673            let task_samples = terrain_samples.to_vec();
674            let task_layer_id = layer_id.to_owned();
675            let task_source_id = query_source_id.map(ToOwned::to_owned);
676            let task_source_layer = query_source_layer.map(ToOwned::to_owned);
677            let task_provenance = tile_provenance.get(tile).cloned().unwrap_or_default();
678
679            let receiver = self.pool.spawn_vector(Box::new(move || {
680                let mut feats = task_features;
681                if !task_samples.is_empty() {
682                    apply_terrain_samples(&mut feats, &task_samples);
683                }
684                let mut temp_layer = crate::layers::VectorLayer::new(
685                    &task_layer_id,
686                    FeatureCollection::default(),
687                    task_style,
688                )
689                .with_query_metadata(Some(task_layer_id.clone()), task_source_id)
690                .with_source_layer(task_source_layer);
691                temp_layer.set_features_with_provenance(feats, task_provenance);
692                let mesh = temp_layer.tessellate(task_key.projection);
693                let candidates = temp_layer.symbol_candidates();
694                VectorTaskOutput {
695                    cache_key: VectorCacheKey {
696                        layer_id: task_key.layer_id.clone(),
697                        data_generation: task_key.data_generation,
698                        projection: task_key.projection,
699                    },
700                    mesh,
701                    symbol_candidates: candidates,
702                }
703            }));
704
705            // Wrap the VectorTaskOutput receiver into a VectorBucketOutput receiver
706            self.pending_buckets.push(PendingVectorBucketTask {
707                key,
708                receiver: Box::new(BucketReceiverAdapter { inner: receiver }),
709            });
710        }
711    }
712
713    /// Poll all pending per-tile vector bucket tasks.
714    pub(crate) fn poll_vector_buckets(&mut self) {
715        let mut still_pending = Vec::new();
716        for task in self.pending_buckets.drain(..) {
717            if let Some(output) = task.receiver.try_recv() {
718                self.bucket_cache
719                    .insert(task.key, (output.mesh, output.symbol_candidates));
720            } else {
721                still_pending.push(task);
722            }
723        }
724        self.pending_buckets = still_pending;
725    }
726
727    // -- Background MVT decode --------------------------------------------
728
729    /// Submit a raw vector tile payload for background decoding.
730    ///
731    /// The decode closure runs on the [`DataTaskPool`] and produces an
732    /// [`MvtDecodeOutput`] that is collected by [`poll_decodes`](Self::poll_decodes).
733    pub(crate) fn dispatch_decode(
734        &mut self,
735        tile: TileId,
736        raw: &crate::tile_source::RawVectorPayload,
737        freshness: crate::tile_source::TileFreshness,
738    ) {
739        // Do not double-dispatch if already pending.
740        if self.pending_decodes.iter().any(|t| t.tile == tile) {
741            return;
742        }
743
744        let bytes = Arc::clone(&raw.bytes);
745        let opts = raw.decode_options.clone();
746        let task_tile = tile;
747
748        let receiver = self.pool.spawn_decode(Box::new(move || {
749            let result = crate::mvt::decode_mvt(&bytes, &task_tile, &opts)
750                .map(|layers| crate::tile_source::VectorTileData { layers })
751                .map_err(|e| crate::tile_source::TileError::Decode(format!("MVT decode: {e}")));
752            MvtDecodeOutput {
753                tile: task_tile,
754                result,
755                freshness,
756            }
757        }));
758
759        self.pending_decodes
760            .push(PendingDecodeTask { tile, receiver });
761    }
762
763    /// Poll all pending background MVT decode tasks.
764    ///
765    /// Successfully decoded tiles are collected into [`decoded_tiles`](Self::decoded_tiles)
766    /// for the caller to promote into the tile cache.  Failed decodes are
767    /// also collected so the tile cache can transition to the Failed state.
768    pub(crate) fn poll_decodes(&mut self) {
769        let mut still_pending = Vec::new();
770        for task in self.pending_decodes.drain(..) {
771            if let Some(output) = task.receiver.try_recv() {
772                match output.result {
773                    Ok(vector_data) => {
774                        self.decoded_tiles.push((
775                            output.tile,
776                            crate::tile_source::TileResponse {
777                                data: crate::tile_source::TileData::Vector(vector_data),
778                                freshness: output.freshness,
779                                not_modified: false,
780                            },
781                        ));
782                    }
783                    Err(_err) => {
784                        // Failed decodes: the tile manager will handle the
785                        // cache transition when it sees the tile is still
786                        // in RawVector state on the next frame.
787                    }
788                }
789            } else {
790                still_pending.push(task);
791            }
792        }
793        self.pending_decodes = still_pending;
794    }
795
796    /// Take all completed decode results, leaving the internal buffer empty.
797    pub(crate) fn take_decoded_tiles(&mut self) -> Vec<(TileId, crate::tile_source::TileResponse)> {
798        std::mem::take(&mut self.decoded_tiles)
799    }
800
    /// Number of decode tasks currently in flight.
    ///
    /// This is the length of the pending-decode list, i.e. tasks that have
    /// been dispatched and not yet removed by
    /// [`poll_decodes`](Self::poll_decodes).
    #[allow(dead_code)]
    pub(crate) fn pending_decode_count(&self) -> usize {
        self.pending_decodes.len()
    }
806
807    /// Collect tessellated vector data for the visible tile set.
808    ///
809    /// Only buckets whose tile is in `visible_tiles` are included in the
810    /// output.  Buckets for off-screen tiles remain in the cache for reuse
811    /// when the camera pans back.
812    pub(crate) fn collect_vector_buckets(
813        &self,
814        layer_id: &str,
815        data_generation: u64,
816        projection: CameraProjection,
817        visible_tiles: &[TileId],
818    ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
819        let mut meshes = Vec::new();
820        let mut candidates = Vec::new();
821        for tile in visible_tiles {
822            let key = VectorBucketKey {
823                layer_id: layer_id.to_owned(),
824                tile: *tile,
825                data_generation,
826                projection,
827            };
828            if let Some((mesh, cands)) = self.bucket_cache.get(&key) {
829                if !mesh.is_empty() {
830                    meshes.push(mesh.clone());
831                }
832                candidates.extend(cands.iter().cloned());
833            }
834        }
835        (meshes, candidates)
836    }
837
838    /// Prune per-tile vector buckets for layers that are no longer active.
839    ///
840    /// Buckets for active layers whose tiles are off-screen are **retained**
841    /// (they will be reused when the camera pans back).  Only buckets for
842    /// entirely removed layers are evicted.
843    #[allow(dead_code)]
844    pub(crate) fn prune_vector_buckets(&mut self, active_layer_ids: &[String]) {
845        let active_set: std::collections::HashSet<&str> =
846            active_layer_ids.iter().map(String::as_str).collect();
847        self.bucket_cache
848            .retain(|key, _| active_set.contains(key.layer_id.as_str()));
849    }
850
851    pub(crate) fn evict_vector_buckets(&mut self, layer_id: &str, tiles: &[TileId]) {
852        if tiles.is_empty() {
853            return;
854        }
855        let tile_set: std::collections::HashSet<TileId> = tiles.iter().copied().collect();
856        self.bucket_cache
857            .retain(|key, _| key.layer_id != layer_id || !tile_set.contains(&key.tile));
858        self.pending_buckets
859            .retain(|task| task.key.layer_id != layer_id || !tile_set.contains(&task.key.tile));
860    }
861
862    /// Prune per-tile vector buckets for a specific layer to only retain
863    /// buckets matching the given visible tile set.
864    ///
865    /// This is a more aggressive pruning strategy that evicts off-screen
866    /// tile buckets for a layer, useful when memory pressure is high.
867    #[allow(dead_code)]
868    pub(crate) fn prune_vector_buckets_for_layer(
869        &mut self,
870        layer_id: &str,
871        visible_tiles: &[TileId],
872    ) {
873        let visible_set: std::collections::HashSet<TileId> =
874            visible_tiles.iter().copied().collect();
875        self.bucket_cache
876            .retain(|key, _| key.layer_id != layer_id || visible_set.contains(&key.tile));
877    }
878
    /// Return the number of cached per-tile vector buckets.
    ///
    /// Counts every entry in the bucket cache, across all layers and tiles.
    #[allow(dead_code)]
    pub(crate) fn bucket_cache_len(&self) -> usize {
        self.bucket_cache.len()
    }
884
885    // -- General ----------------------------------------------------------
886
887    #[allow(dead_code)]
888    pub(crate) fn has_pending_tasks(&self) -> bool {
889        !self.pending_terrain.is_empty()
890            || !self.pending_vector.is_empty()
891            || !self.pending_buckets.is_empty()
892    }
893}
894
/// Adapter that exposes a `DataTaskResultReceiver<VectorTaskOutput>` as a
/// `DataTaskResultReceiver<VectorBucketOutput>`.
///
/// The adapter only wraps the inner receiver; it does not store a bucket key
/// itself, so any key association has to be tracked by whoever owns it.
struct BucketReceiverAdapter {
    /// The wrapped receiver that yields the raw vector task output.
    inner: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
}
900
901impl DataTaskResultReceiver<VectorBucketOutput> for BucketReceiverAdapter {
902    fn try_recv(&self) -> Option<VectorBucketOutput> {
903        self.inner.try_recv().map(|output| VectorBucketOutput {
904            mesh: output.mesh,
905            symbol_candidates: output.symbol_candidates,
906        })
907    }
908}
909
910fn apply_terrain_samples(features: &mut FeatureCollection, samples: &[(GeoCoord, f64)]) {
911    let lookup: HashMap<(i64, i64), f64> = samples
912        .iter()
913        .map(|(coord, elev)| {
914            let key = (
915                (coord.lat * 100_000.0).round() as i64,
916                (coord.lon * 100_000.0).round() as i64,
917            );
918            (key, *elev)
919        })
920        .collect();
921    for feature in &mut features.features {
922        apply_terrain_to_geometry(&mut feature.geometry, &lookup);
923    }
924}
925
926fn apply_terrain_to_geometry(
927    geometry: &mut crate::geometry::Geometry,
928    lookup: &HashMap<(i64, i64), f64>,
929) {
930    use crate::geometry::Geometry;
931    match geometry {
932        Geometry::Point(p) => {
933            if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
934                p.coord.alt = elev;
935            }
936        }
937        Geometry::LineString(ls) => {
938            for coord in &mut ls.coords {
939                if let Some(&elev) = lookup.get(&coord_key(coord)) {
940                    coord.alt = elev;
941                }
942            }
943        }
944        Geometry::Polygon(poly) => {
945            for coord in &mut poly.exterior {
946                if let Some(&elev) = lookup.get(&coord_key(coord)) {
947                    coord.alt = elev;
948                }
949            }
950            for hole in &mut poly.interiors {
951                for coord in hole {
952                    if let Some(&elev) = lookup.get(&coord_key(coord)) {
953                        coord.alt = elev;
954                    }
955                }
956            }
957        }
958        Geometry::MultiPoint(mp) => {
959            for p in &mut mp.points {
960                if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
961                    p.coord.alt = elev;
962                }
963            }
964        }
965        Geometry::MultiLineString(mls) => {
966            for ls in &mut mls.lines {
967                for coord in &mut ls.coords {
968                    if let Some(&elev) = lookup.get(&coord_key(coord)) {
969                        coord.alt = elev;
970                    }
971                }
972            }
973        }
974        Geometry::MultiPolygon(mpoly) => {
975            for poly in &mut mpoly.polygons {
976                for coord in &mut poly.exterior {
977                    if let Some(&elev) = lookup.get(&coord_key(coord)) {
978                        coord.alt = elev;
979                    }
980                }
981                for hole in &mut poly.interiors {
982                    for coord in hole {
983                        if let Some(&elev) = lookup.get(&coord_key(coord)) {
984                            coord.alt = elev;
985                        }
986                    }
987                }
988            }
989        }
990        Geometry::GeometryCollection(geoms) => {
991            for g in geoms {
992                apply_terrain_to_geometry(g, lookup);
993            }
994        }
995    }
996}
997
998fn coord_key(coord: &GeoCoord) -> (i64, i64) {
999    (
1000        (coord.lat * 100_000.0).round() as i64,
1001        (coord.lon * 100_000.0).round() as i64,
1002    )
1003}
1004
1005// ---------------------------------------------------------------------------
1006// Async visualization preprocessing pipeline
1007// ---------------------------------------------------------------------------
1008
/// Output of an asynchronous visualization preprocessing task.
///
/// Each variant names the layer it targets and carries the preprocessed
/// payload for that layer.  The caller receives these values from
/// `AsyncVisualizationPipeline::poll` and is responsible for applying them.
// NOTE(review): the previous doc referred to `MapState::set_*` methods as the
// way to apply a result; those are not visible from this file, so confirm.
pub enum VisualizationTaskOutput {
    /// Preprocessed scalar field values (for grid scalar / grid extrusion).
    ScalarFieldUpdate {
        /// Layer name to update.
        layer_name: String,
        /// New scalar values.
        values: Vec<f32>,
    },
    /// Preprocessed column instances (for instanced column layers).
    ColumnUpdate {
        /// Layer name to update.
        layer_name: String,
        /// New column instance set.
        columns: crate::visualization::ColumnInstanceSet,
        /// Colour ramp (included in case the ramp changes with the data).
        ramp: crate::visualization::ColorRamp,
    },
    /// Preprocessed point instances (for point cloud layers).
    PointCloudUpdate {
        /// Layer name to update.
        layer_name: String,
        /// New point instance set.
        points: crate::visualization::PointInstanceSet,
        /// Colour ramp.
        ramp: crate::visualization::ColorRamp,
    },
}
1040
/// Asynchronous preprocessing pipeline for large visualization data updates.
///
/// Use this when visualization data preparation (binning, normalisation,
/// aggregation, coordinate transforms) is too heavy for the main thread's
/// per-frame budget.
///
/// The pipeline is constructed with a [`DataTaskPool`] and keeps it alive,
/// but `dispatch` currently hands each closure to `spawn_on_thread` rather
/// than to the pool.
// NOTE(review): the stored pool is never read in this file; confirm whether
// visualization tasks are meant to be routed through it.
///
/// # Usage
///
/// ```rust,no_run
/// # use rustial_engine::{AsyncVisualizationPipeline, ThreadDataTaskPool, VisualizationTaskOutput};
/// # use std::sync::Arc;
/// let pool: Arc<dyn rustial_engine::DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
/// let mut viz_pipeline = AsyncVisualizationPipeline::new(pool);
///
/// // Dispatch a heavy computation on a background thread.
/// viz_pipeline.dispatch(Box::new(|| {
///     // Expensive data processing...
///     let values: Vec<f32> = (0..10_000).map(|i| (i as f32).sin()).collect();
///     VisualizationTaskOutput::ScalarFieldUpdate {
///         layer_name: "heatmap".to_string(),
///         values,
///     }
/// }));
///
/// // Each frame, poll for completed results.
/// for result in viz_pipeline.poll() {
///     match result {
///         VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values } => {
///             // Apply: state.update_scalar_field(&layer_name, values);
///         }
///         _ => {}
///     }
/// }
/// ```
pub struct AsyncVisualizationPipeline {
    /// Task pool supplied at construction; held but not otherwise used here.
    _pool: Arc<dyn DataTaskPool>,
    /// Receivers for tasks that have been dispatched but not yet completed.
    pending: Vec<Box<dyn DataTaskResultReceiver<VisualizationTaskOutput>>>,
}
1080
1081impl AsyncVisualizationPipeline {
1082    /// Create a new async visualization pipeline backed by the given task pool.
1083    pub fn new(pool: Arc<dyn DataTaskPool>) -> Self {
1084        Self {
1085            _pool: pool,
1086            pending: Vec::new(),
1087        }
1088    }
1089
1090    /// Dispatch a visualization preprocessing task to the background thread pool.
1091    ///
1092    /// The closure runs on a background thread and should return a
1093    /// [`VisualizationTaskOutput`] with the preprocessed data.
1094    pub fn dispatch(
1095        &mut self,
1096        task: Box<dyn FnOnce() -> VisualizationTaskOutput + Send + 'static>,
1097    ) {
1098        let receiver = spawn_on_thread(task);
1099        self.pending.push(receiver);
1100    }
1101
1102    /// Poll all pending tasks and return completed results.
1103    ///
1104    /// Call this once per frame from the main thread.  Incomplete tasks
1105    /// remain in the pending queue for the next poll.
1106    pub fn poll(&mut self) -> Vec<VisualizationTaskOutput> {
1107        let mut completed = Vec::new();
1108        let mut still_pending = Vec::new();
1109
1110        for receiver in self.pending.drain(..) {
1111            match receiver.try_recv() {
1112                Some(result) => completed.push(result),
1113                None => still_pending.push(receiver),
1114            }
1115        }
1116
1117        self.pending = still_pending;
1118        completed
1119    }
1120
1121    /// Return the number of tasks currently in-flight.
1122    pub fn pending_count(&self) -> usize {
1123        self.pending.len()
1124    }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129    use super::*;
1130    use crate::camera_projection::CameraProjection;
1131    use crate::geometry::{Feature, Geometry, LineString, Point, Polygon};
1132    use crate::layers::VectorStyle;
1133    use rustial_math::{ElevationGrid, GeoCoord, TileId};
1134    use std::collections::HashMap;
1135
1136    #[test]
1137    fn thread_pool_dispatches_and_receives_terrain() {
1138        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1139        let mut pipeline = AsyncDataPipeline::new(pool);
1140
1141        let tile = TileId::new(0, 0, 0);
1142        let elev = ElevationGrid::flat(tile, 4, 4);
1143        pipeline.dispatch_terrain(TerrainTaskInput {
1144            tile,
1145            elevation_source_tile: tile,
1146            elevation_region: TileTextureRegion::FULL,
1147            elevation: elev,
1148            resolution: 4,
1149            vertical_exaggeration: 1.0,
1150            generation: 1,
1151        });
1152
1153        for _ in 0..100 {
1154            pipeline.poll_terrain();
1155            if !pipeline.has_pending_tasks() {
1156                break;
1157            }
1158            std::thread::sleep(std::time::Duration::from_millis(10));
1159        }
1160
1161        let mut gens = HashMap::new();
1162        gens.insert(tile, 1u64);
1163        let (meshes, hillshades) = pipeline.collect_terrain(&[tile], &gens, 4);
1164        assert_eq!(meshes.len(), 1);
1165        assert_eq!(hillshades.len(), 1);
1166        assert_eq!(meshes[0].tile, tile);
1167    }
1168
1169    #[test]
1170    fn thread_pool_dispatches_and_receives_vector() {
1171        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1172        let mut pipeline = AsyncDataPipeline::new(pool);
1173
1174        let features = crate::geometry::FeatureCollection {
1175            features: vec![Feature {
1176                geometry: Geometry::Point(Point {
1177                    coord: GeoCoord::from_lat_lon(0.0, 0.0),
1178                }),
1179                properties: HashMap::new(),
1180            }],
1181        };
1182
1183        let key = VectorCacheKey {
1184            layer_id: "test".into(),
1185            data_generation: 1,
1186            projection: CameraProjection::WebMercator,
1187        };
1188
1189        pipeline.dispatch_vector(VectorTaskInput {
1190            cache_key: key.clone(),
1191            features,
1192            style: VectorStyle::default(),
1193            query_layer_id: Some("test".into()),
1194            query_source_id: None,
1195            query_source_layer: None,
1196            feature_provenance: Vec::new(),
1197            projection: CameraProjection::WebMercator,
1198            terrain_samples: Vec::new(),
1199        });
1200
1201        for _ in 0..100 {
1202            pipeline.poll_vector();
1203            if !pipeline.has_pending_tasks() {
1204                break;
1205            }
1206            std::thread::sleep(std::time::Duration::from_millis(10));
1207        }
1208
1209        let (meshes, _candidates) = pipeline.collect_vectors(&[key]);
1210        assert_eq!(meshes.len(), 1);
1211        assert!(meshes[0].vertex_count() > 0);
1212    }
1213
1214    #[test]
1215    fn duplicate_dispatch_is_deduplicated() {
1216        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1217        let mut pipeline = AsyncDataPipeline::new(pool);
1218
1219        let tile = TileId::new(0, 0, 0);
1220        let elev = ElevationGrid::flat(tile, 4, 4);
1221        let input = TerrainTaskInput {
1222            tile,
1223            elevation_source_tile: tile,
1224            elevation_region: TileTextureRegion::FULL,
1225            elevation: elev,
1226            resolution: 4,
1227            vertical_exaggeration: 1.0,
1228            generation: 1,
1229        };
1230
1231        pipeline.dispatch_terrain(input.clone());
1232        pipeline.dispatch_terrain(input.clone());
1233        assert_eq!(pipeline.pending_terrain.len(), 1);
1234    }
1235
1236    #[test]
1237    fn cached_result_prevents_redispatch() {
1238        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1239        let mut pipeline = AsyncDataPipeline::new(pool);
1240
1241        let tile = TileId::new(0, 0, 0);
1242        let elev = ElevationGrid::flat(tile, 4, 4);
1243        let input = TerrainTaskInput {
1244            tile,
1245            elevation_source_tile: tile,
1246            elevation_region: TileTextureRegion::FULL,
1247            elevation: elev,
1248            resolution: 4,
1249            vertical_exaggeration: 1.0,
1250            generation: 1,
1251        };
1252
1253        pipeline.dispatch_terrain(input.clone());
1254
1255        for _ in 0..100 {
1256            pipeline.poll_terrain();
1257            if !pipeline.has_pending_tasks() {
1258                break;
1259            }
1260            std::thread::sleep(std::time::Duration::from_millis(10));
1261        }
1262
1263        pipeline.dispatch_terrain(input);
1264        assert!(pipeline.pending_terrain.is_empty());
1265    }
1266
1267    #[test]
1268    fn prune_terrain_removes_stale_entries() {
1269        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1270        let mut pipeline = AsyncDataPipeline::new(pool);
1271
1272        let tile_a = TileId::new(1, 0, 0);
1273        let tile_b = TileId::new(1, 1, 0);
1274        let elev_a = ElevationGrid::flat(tile_a, 4, 4);
1275        let elev_b = ElevationGrid::flat(tile_b, 4, 4);
1276        let mesh_a = crate::terrain::build_terrain_descriptor(&tile_a, &elev_a, 4, 1.0, 1);
1277        let mesh_b = crate::terrain::build_terrain_descriptor(&tile_b, &elev_b, 4, 1.0, 1);
1278        let hs_a = crate::terrain::prepare_hillshade_raster(&elev_a, 1.0, 1);
1279        let hs_b = crate::terrain::prepare_hillshade_raster(&elev_b, 1.0, 1);
1280
1281        pipeline.terrain_cache.insert(
1282            TerrainCacheKey {
1283                tile: tile_a,
1284                generation: 1,
1285                resolution: 4,
1286            },
1287            (mesh_a, hs_a),
1288        );
1289        pipeline.terrain_cache.insert(
1290            TerrainCacheKey {
1291                tile: tile_b,
1292                generation: 1,
1293                resolution: 4,
1294            },
1295            (mesh_b, hs_b),
1296        );
1297
1298        assert_eq!(pipeline.terrain_cache.len(), 2);
1299        pipeline.prune_terrain(&[tile_a]);
1300        assert_eq!(pipeline.terrain_cache.len(), 1);
1301        assert!(pipeline.terrain_cache.keys().all(|k| k.tile == tile_a));
1302    }
1303
1304    #[test]
1305    fn synchronous_fallback_path_still_works() {
1306        let mut state = crate::MapState::new();
1307        state.set_viewport(800, 600);
1308        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1309        state.set_camera_distance(1_000.0);
1310        state.update();
1311        assert!(state.zoom_level() <= 22);
1312    }
1313
1314    #[test]
1315    fn layer_generation_tracking() {
1316        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1317        let mut pipeline = AsyncDataPipeline::new(pool);
1318
1319        let gen1 = pipeline.layer_generation("layer-a", false);
1320        let gen2 = pipeline.layer_generation("layer-a", false);
1321        assert_eq!(gen1, gen2);
1322
1323        let gen3 = pipeline.layer_generation("layer-a", true);
1324        assert!(gen3 > gen2);
1325
1326        let gen4 = pipeline.layer_generation("layer-a", false);
1327        assert_eq!(gen3, gen4);
1328    }
1329
1330    #[test]
1331    fn collect_all_terrain_returns_cached_entries() {
1332        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1333        let mut pipeline = AsyncDataPipeline::new(pool);
1334
1335        let tile = TileId::new(0, 0, 0);
1336        let elev = ElevationGrid::flat(tile, 4, 4);
1337        pipeline.dispatch_terrain(TerrainTaskInput {
1338            tile,
1339            elevation_source_tile: tile,
1340            elevation_region: TileTextureRegion::FULL,
1341            elevation: elev,
1342            resolution: 4,
1343            vertical_exaggeration: 1.0,
1344            generation: 1,
1345        });
1346
1347        for _ in 0..100 {
1348            pipeline.poll_terrain();
1349            if !pipeline.has_pending_tasks() {
1350                break;
1351            }
1352            std::thread::sleep(std::time::Duration::from_millis(10));
1353        }
1354
1355        let (meshes, hillshades) = pipeline.collect_all_terrain();
1356        assert_eq!(meshes.len(), 1);
1357        assert_eq!(hillshades.len(), 1);
1358        assert_eq!(meshes[0].tile, tile);
1359    }
1360
1361    #[test]
1362    fn async_pipeline_terrain_via_map_state() {
1363        use crate::terrain::{FlatElevationSource, TerrainConfig};
1364
1365        let config = TerrainConfig {
1366            enabled: true,
1367            mesh_resolution: 4,
1368            source: Box::new(FlatElevationSource::new(4, 4)),
1369            ..TerrainConfig::default()
1370        };
1371        let mut state = crate::MapState::with_terrain(config, 100);
1372        state.set_viewport(800, 600);
1373        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1374        state.set_camera_distance(10_000_000.0);
1375
1376        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1377        state.set_task_pool(pool);
1378        assert!(state.has_async_pipeline());
1379
1380        // First update: dispatches terrain tasks.
1381        state.update_with_dt(1.0 / 60.0);
1382
1383        // Allow background tasks to complete.
1384        for _ in 0..100 {
1385            std::thread::sleep(std::time::Duration::from_millis(10));
1386            state.update_with_dt(1.0 / 60.0);
1387            if !state.terrain_meshes().is_empty() {
1388                break;
1389            }
1390        }
1391
1392        assert!(
1393            !state.terrain_meshes().is_empty(),
1394            "async pipeline should produce terrain meshes"
1395        );
1396    }
1397
1398    #[test]
1399    fn async_pipeline_vectors_via_map_state() {
1400        use crate::layers::{VectorLayer, VectorStyle};
1401
1402        let mut state = crate::MapState::new();
1403        state.set_viewport(800, 600);
1404        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1405        state.set_camera_distance(1_000.0);
1406
1407        let fc = crate::geometry::FeatureCollection {
1408            features: vec![Feature {
1409                geometry: Geometry::Point(Point {
1410                    coord: GeoCoord::from_lat_lon(0.0, 0.0),
1411                }),
1412                properties: HashMap::new(),
1413            }],
1414        };
1415        let vl = VectorLayer::new("test-vec", fc, VectorStyle::default());
1416        state.push_layer(Box::new(vl));
1417
1418        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1419        state.set_task_pool(pool);
1420
1421        // First update: dispatches vector tasks.
1422        state.update_with_dt(1.0 / 60.0);
1423
1424        // Allow background tasks to complete.
1425        for _ in 0..100 {
1426            std::thread::sleep(std::time::Duration::from_millis(10));
1427            state.update_with_dt(1.0 / 60.0);
1428            if !state.vector_meshes().is_empty() {
1429                break;
1430            }
1431        }
1432
1433        assert!(
1434            !state.vector_meshes().is_empty(),
1435            "async pipeline should produce vector meshes"
1436        );
1437    }
1438
1439    #[test]
1440    fn async_pipeline_clear_reverts_to_sync() {
1441        let mut state = crate::MapState::new();
1442        state.set_viewport(800, 600);
1443        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1444        state.set_camera_distance(1_000.0);
1445
1446        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1447        state.set_task_pool(pool);
1448        assert!(state.has_async_pipeline());
1449
1450        state.clear_task_pool();
1451        assert!(!state.has_async_pipeline());
1452
1453        // Synchronous update should still work.
1454        state.update();
1455        assert!(state.zoom_level() <= 22);
1456    }
1457
1458    #[test]
1459    fn data_update_interval_ignored_with_async_pipeline() {
1460        use crate::terrain::{FlatElevationSource, TerrainConfig};
1461
1462        let config = TerrainConfig {
1463            enabled: true,
1464            mesh_resolution: 4,
1465            source: Box::new(FlatElevationSource::new(4, 4)),
1466            ..TerrainConfig::default()
1467        };
1468        let mut state = crate::MapState::with_terrain(config, 100);
1469        state.set_viewport(800, 600);
1470        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1471        state.set_camera_distance(10_000_000.0);
1472        state.set_data_update_interval(100.0); // Very high throttle
1473
1474        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1475        state.set_task_pool(pool);
1476
1477        // Even with a very high throttle, async dispatch should still run
1478        // because the throttle is ignored when an async pipeline is active.
1479        for _ in 0..100 {
1480            state.update_with_dt(1.0 / 60.0);
1481            std::thread::sleep(std::time::Duration::from_millis(10));
1482            if !state.terrain_meshes().is_empty() {
1483                break;
1484            }
1485        }
1486
1487        assert!(
1488            !state.terrain_meshes().is_empty(),
1489            "async pipeline should produce terrain despite high throttle"
1490        );
1491    }
1492
1493    // =====================================================================
1494    // Per-tile vector bucket tests
1495    // =====================================================================
1496
1497    #[test]
1498    fn partition_features_by_tile_separates_spatially() {
1499        let features = FeatureCollection {
1500            features: vec![
1501                Feature {
1502                    geometry: Geometry::Point(Point {
1503                        coord: GeoCoord::from_lat_lon(10.0, 10.0),
1504                    }),
1505                    properties: HashMap::new(),
1506                },
1507                Feature {
1508                    geometry: Geometry::Point(Point {
1509                        coord: GeoCoord::from_lat_lon(-30.0, -60.0),
1510                    }),
1511                    properties: HashMap::new(),
1512                },
1513                Feature {
1514                    geometry: Geometry::Point(Point {
1515                        coord: GeoCoord::from_lat_lon(10.01, 10.01),
1516                    }),
1517                    properties: HashMap::new(),
1518                },
1519            ],
1520        };
1521
1522        let buckets = partition_features_by_tile(&features, 4);
1523        let total: usize = buckets.values().map(|fc| fc.len()).sum();
1524        assert_eq!(total, 3, "all features should be partitioned");
1525        // The first and third points are very close and should be in the same tile at zoom 4
1526        let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
1527        let tile_c = geo_to_tile(&GeoCoord::from_lat_lon(10.01, 10.01), 4).tile_id();
1528        assert_eq!(tile_a, tile_c);
1529        assert_eq!(buckets.get(&tile_a).map(|fc| fc.len()), Some(2));
1530    }
1531
1532    #[test]
1533    fn partition_features_handles_linestring_and_polygon() {
1534        let features = FeatureCollection {
1535            features: vec![
1536                Feature {
1537                    geometry: Geometry::LineString(LineString {
1538                        coords: vec![
1539                            GeoCoord::from_lat_lon(45.0, 90.0),
1540                            GeoCoord::from_lat_lon(46.0, 91.0),
1541                        ],
1542                    }),
1543                    properties: HashMap::new(),
1544                },
1545                Feature {
1546                    geometry: Geometry::Polygon(Polygon {
1547                        exterior: vec![
1548                            GeoCoord::from_lat_lon(-10.0, -20.0),
1549                            GeoCoord::from_lat_lon(-10.0, -19.0),
1550                            GeoCoord::from_lat_lon(-9.0, -19.0),
1551                        ],
1552                        interiors: vec![],
1553                    }),
1554                    properties: HashMap::new(),
1555                },
1556            ],
1557        };
1558
1559        let buckets = partition_features_by_tile(&features, 6);
1560        let total: usize = buckets.values().map(|fc| fc.len()).sum();
1561        assert_eq!(total, 2);
1562    }
1563
1564    #[test]
1565    fn partition_features_drops_empty_geometry() {
1566        let features = FeatureCollection {
1567            features: vec![Feature {
1568                geometry: Geometry::GeometryCollection(vec![]),
1569                properties: HashMap::new(),
1570            }],
1571        };
1572
1573        let buckets = partition_features_by_tile(&features, 4);
1574        assert!(
1575            buckets.is_empty(),
1576            "empty geometry should not produce a bucket"
1577        );
1578    }
1579
/// Dispatches the per-tile buckets of two point features, polls until the
/// background work has finished, and checks that `collect_vector_buckets`
/// returns one mesh per tile listed as visible.
#[test]
fn dispatch_and_poll_vector_buckets() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);

    let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
    let tile_b = geo_to_tile(&GeoCoord::from_lat_lon(-30.0, -60.0), 4).tile_id();

    let features = FeatureCollection {
        features: vec![
            Feature {
                geometry: Geometry::Point(Point {
                    coord: GeoCoord::from_lat_lon(10.0, 10.0),
                }),
                properties: HashMap::new(),
            },
            Feature {
                geometry: Geometry::Point(Point {
                    coord: GeoCoord::from_lat_lon(-30.0, -60.0),
                }),
                properties: HashMap::new(),
            },
        ],
    };

    let tile_features = partition_features_by_tile(&features, 4);
    assert!(tile_features.len() >= 2);

    pipeline.dispatch_vector_buckets(
        "test-layer",
        1,
        CameraProjection::WebMercator,
        &VectorStyle::default(),
        &tile_features,
        None,
        None,
        &HashMap::new(),
        &[],
    );

    // Give the worker threads up to ~1 s (100 polls, 10 ms apart) to finish.
    for _ in 0..100 {
        pipeline.poll_vector_buckets();
        if !pipeline.has_pending_tasks() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    // Fail here with a precise message on timeout, instead of letting the
    // collection asserts below report a misleading "missing mesh" error.
    assert!(
        !pipeline.has_pending_tasks(),
        "vector bucket tasks did not complete within the polling window"
    );

    // Each case: (visible tiles, expected mesh count, failure message).
    let both = [tile_a, tile_b];
    let cases = [
        (&both[..1], 1, "should have one mesh for tile_a"),
        (&both[1..], 1, "should have one mesh for tile_b"),
        (&both[..], 2, "should have meshes for both tiles"),
    ];
    for (visible, expected, message) in cases {
        let (meshes, _) = pipeline.collect_vector_buckets(
            "test-layer",
            1,
            CameraProjection::WebMercator,
            visible,
        );
        assert_eq!(meshes.len(), expected, "{}", message);
    }
}
1655
/// Dispatching the same buckets twice without polling in between must not
/// grow the set of pending bucket tasks.
#[test]
fn bucket_dispatch_deduplicates() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);

    let features = FeatureCollection {
        features: vec![Feature {
            geometry: Geometry::Point(Point {
                coord: GeoCoord::from_lat_lon(10.0, 10.0),
            }),
            properties: HashMap::new(),
        }],
    };
    let tile_features = partition_features_by_tile(&features, 4);
    let style = VectorStyle::default();

    // Pending-bucket count observed after the first and second dispatch.
    let mut pending_after = [0usize; 2];
    for slot in pending_after.iter_mut() {
        pipeline.dispatch_vector_buckets(
            "layer",
            1,
            CameraProjection::WebMercator,
            &style,
            &tile_features,
            None,
            None,
            &HashMap::new(),
            &[],
        );
        *slot = pipeline.pending_buckets.len();
    }

    // Without this guard the equality below would also hold when nothing
    // was dispatched at all (0 == 0), making the test vacuous.
    assert!(
        pending_after[0] > 0,
        "first dispatch should register at least one pending bucket"
    );
    assert_eq!(
        pending_after[1], pending_after[0],
        "duplicate dispatch should be deduped"
    );
}
1701
/// Once a bucket result is in the cache, dispatching the same bucket again
/// must leave `pending_buckets` empty.
#[test]
fn bucket_cache_hit_prevents_redispatch() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(task_pool);

    let single_point = Feature {
        geometry: Geometry::Point(Point {
            coord: GeoCoord::from_lat_lon(10.0, 10.0),
        }),
        properties: HashMap::new(),
    };
    let collection = FeatureCollection {
        features: vec![single_point],
    };
    let tile_features = partition_features_by_tile(&collection, 4);
    let style = VectorStyle::default();

    // First dispatch: nothing is cached yet.
    pipeline.dispatch_vector_buckets(
        "layer",
        1,
        CameraProjection::WebMercator,
        &style,
        &tile_features,
        None,
        None,
        &HashMap::new(),
        &[],
    );

    // Poll up to 100 times, 10 ms apart, until no task is pending.
    for _attempt in 0..100 {
        pipeline.poll_vector_buckets();
        if pipeline.has_pending_tasks() {
            std::thread::sleep(std::time::Duration::from_millis(10));
        } else {
            break;
        }
    }

    assert!(pipeline.bucket_cache_len() > 0);

    // Second dispatch with identical arguments.
    pipeline.dispatch_vector_buckets(
        "layer",
        1,
        CameraProjection::WebMercator,
        &style,
        &tile_features,
        None,
        None,
        &HashMap::new(),
        &[],
    );
    assert!(
        pipeline.pending_buckets.is_empty(),
        "cached result should prevent redispatch"
    );
}
1755
/// `prune_vector_buckets` must drop cached buckets whose layer id is not in
/// the list it is given.
#[test]
fn prune_vector_buckets_removes_inactive_layers() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(task_pool);

    // Seed the cache with one bucket per layer, both on the same tile.
    let tile = TileId::new(4, 0, 0);
    for layer in ["keep", "remove"] {
        let key = VectorBucketKey {
            layer_id: layer.into(),
            tile,
            data_generation: 1,
            projection: CameraProjection::WebMercator,
        };
        pipeline
            .bucket_cache
            .insert(key, (VectorMeshData::default(), vec![]));
    }
    assert_eq!(pipeline.bucket_cache_len(), 2);

    let active_layers = ["keep".to_string()];
    pipeline.prune_vector_buckets(&active_layers);

    assert_eq!(pipeline.bucket_cache_len(), 1);
    assert!(!pipeline
        .bucket_cache
        .keys()
        .any(|key| key.layer_id != "keep"));
}
1786
/// `prune_vector_buckets_for_layer` must keep only the cached buckets of
/// the layer whose tiles are in the list it is given.
#[test]
fn prune_vector_buckets_for_layer_removes_offscreen_tiles() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(task_pool);

    let tile_a = TileId::new(4, 0, 0);
    let tile_b = TileId::new(4, 1, 0);
    let tile_c = TileId::new(4, 2, 0);

    // Cache one empty bucket per tile under the same layer.
    for tile in [tile_a, tile_b, tile_c] {
        let key = VectorBucketKey {
            layer_id: "layer".into(),
            tile,
            data_generation: 1,
            projection: CameraProjection::WebMercator,
        };
        pipeline
            .bucket_cache
            .insert(key, (VectorMeshData::default(), vec![]));
    }
    assert_eq!(pipeline.bucket_cache_len(), 3);

    // tile_b is left out of the list and is expected to be pruned.
    let retained = [tile_a, tile_c];
    pipeline.prune_vector_buckets_for_layer("layer", &retained);

    assert_eq!(pipeline.bucket_cache_len(), 2);
    assert!(pipeline
        .bucket_cache
        .keys()
        .all(|key| retained.contains(&key.tile)));
}
1816
/// With two buckets cached for a layer, collecting with a single visible
/// tile must yield exactly one mesh.
#[test]
fn collect_vector_buckets_only_returns_visible() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(task_pool);

    let tile_visible = TileId::new(4, 5, 5);
    let tile_offscreen = TileId::new(4, 10, 10);

    // A single triangle with three vertex colours.
    let triangle = VectorMeshData {
        positions: vec![[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
        colors: vec![[1.0; 4]; 3],
        indices: vec![0, 1, 2],
        ..Default::default()
    };

    // Cache the same mesh under both tiles.
    for tile in [tile_visible, tile_offscreen] {
        let key = VectorBucketKey {
            layer_id: "layer".into(),
            tile,
            data_generation: 1,
            projection: CameraProjection::WebMercator,
        };
        pipeline.bucket_cache.insert(key, (triangle.clone(), vec![]));
    }

    let visible = [tile_visible];
    let (meshes, _) =
        pipeline.collect_vector_buckets("layer", 1, CameraProjection::WebMercator, &visible);

    assert_eq!(
        meshes.len(),
        1,
        "only visible tile bucket should be collected"
    );
}
1855
/// Checks `representative_coord` for a point, a line string, a polygon, an
/// empty geometry collection and a collection holding one point.
#[test]
fn representative_coord_all_geometry_types() {
    // Reduce the result to a plain `(lat, lon)` pair for comparison.
    let lat_lon = |geometry: &Geometry| representative_coord(geometry).map(|c| (c.lat, c.lon));

    // Point: the expected pair is the point's own coordinate.
    let point = Geometry::Point(Point {
        coord: GeoCoord::from_lat_lon(1.0, 2.0),
    });
    assert_eq!(lat_lon(&point), Some((1.0, 2.0)));

    // Line string: the expected pair is the first vertex.
    let line = Geometry::LineString(LineString {
        coords: vec![
            GeoCoord::from_lat_lon(3.0, 4.0),
            GeoCoord::from_lat_lon(5.0, 6.0),
        ],
    });
    assert_eq!(lat_lon(&line), Some((3.0, 4.0)));

    // Polygon: the expected pair is the first exterior vertex.
    let polygon = Geometry::Polygon(Polygon {
        exterior: vec![
            GeoCoord::from_lat_lon(7.0, 8.0),
            GeoCoord::from_lat_lon(7.0, 9.0),
            GeoCoord::from_lat_lon(8.0, 9.0),
        ],
        interiors: vec![],
    });
    assert_eq!(lat_lon(&polygon), Some((7.0, 8.0)));

    // An empty collection has no representative coordinate.
    let empty_collection = Geometry::GeometryCollection(vec![]);
    assert!(representative_coord(&empty_collection).is_none());

    // A collection holding a point has one.
    let point_collection = Geometry::GeometryCollection(vec![point.clone()]);
    assert!(representative_coord(&point_collection).is_some());
}
1896
/// Dispatches a hand-encoded MVT payload for decoding and checks that the
/// decoded tile holds a `"test"` layer of length one.
#[test]
fn dispatch_and_poll_decode_produces_decoded_tile() {
    use crate::tile_source::{RawVectorPayload, TileFreshness};
    use std::sync::Arc;

    /// Hand-encodes a minimal MVT tile: one layer named `"test"` holding a
    /// single feature whose geometry is one MoveTo to (2048, 2048).
    fn build_test_mvt() -> Vec<u8> {
        /// Protobuf base-128 varint, least-significant group first.
        fn encode_varint(mut val: u64) -> Vec<u8> {
            let mut out = Vec::new();
            while val >= 0x80 {
                out.push((val & 0x7F) as u8 | 0x80);
                val >>= 7;
            }
            out.push(val as u8);
            out
        }
        /// Field key: `(field << 3) | wire_type`.
        fn encode_tag(field: u32, wt: u8) -> Vec<u8> {
            encode_varint((u64::from(field) << 3) | u64::from(wt))
        }
        /// Length-delimited field (wire type 2).
        fn encode_ld(field: u32, data: &[u8]) -> Vec<u8> {
            [
                encode_tag(field, 2),
                encode_varint(data.len() as u64),
                data.to_vec(),
            ]
            .concat()
        }
        /// Varint field (wire type 0).
        fn encode_vi(field: u32, val: u64) -> Vec<u8> {
            [encode_tag(field, 0), encode_varint(val)].concat()
        }
        /// Zigzag encoding of a signed 32-bit value.
        fn zigzag(n: i32) -> u32 {
            ((n << 1) ^ (n >> 31)) as u32
        }

        // Geometry: command integer (id 1, count 1) followed by dx, dy.
        let move_to = (1u64 << 3) | 1;
        let geom = [
            encode_varint(move_to),
            encode_varint(u64::from(zigzag(2048))),
            encode_varint(u64::from(zigzag(2048))),
        ]
        .concat();

        // Feature: field 3 (type) = 1, field 4 = geometry.
        let feat = [encode_vi(3, 1), encode_ld(4, &geom)].concat();

        // Layer: field 1 = name, 2 = feature, 5 = extent, 15 = version.
        let layer = [
            encode_ld(1, b"test"),
            encode_ld(2, &feat),
            encode_vi(5, 4096),
            encode_vi(15, 2),
        ]
        .concat();

        // Tile: field 3 = layer.
        encode_ld(3, &layer)
    }

    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(task_pool);

    let tile = TileId::new(0, 0, 0);
    let payload = RawVectorPayload {
        tile_id: tile,
        bytes: Arc::new(build_test_mvt()),
        decode_options: crate::mvt::MvtDecodeOptions::default(),
    };

    pipeline.dispatch_decode(tile, &payload, TileFreshness::default());

    // Poll up to 50 times, 10 ms apart, until a decoded tile shows up.
    for _attempt in 0..50 {
        pipeline.poll_decodes();
        if pipeline.decoded_tiles.is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(10));
        } else {
            break;
        }
    }

    let decoded = pipeline.take_decoded_tiles();
    assert_eq!(decoded.len(), 1, "should have one decoded tile");

    let (decoded_id, response) = &decoded[0];
    assert_eq!(*decoded_id, tile);
    assert!(response.data.is_vector(), "decoded data should be Vector");

    let vector_tile = response.data.as_vector().expect("should be Vector");
    assert!(vector_tile.layers.contains_key("test"));
    assert_eq!(vector_tile.layers["test"].len(), 1);
}
1985
/// Dispatching a decode for the same tile twice must leave a single entry
/// in `pending_decodes`.
#[test]
fn dispatch_decode_deduplicates() {
    use crate::tile_source::{RawVectorPayload, TileFreshness};
    use std::sync::Arc;

    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(task_pool);

    let tile = TileId::new(0, 0, 0);
    let payload = RawVectorPayload {
        tile_id: tile,
        bytes: Arc::new(Vec::new()),
        decode_options: crate::mvt::MvtDecodeOptions::default(),
    };

    // Submit the identical request twice in a row.
    for _ in 0..2 {
        pipeline.dispatch_decode(tile, &payload, TileFreshness::default());
    }

    // Only one task should be pending despite two dispatches.
    assert_eq!(pipeline.pending_decodes.len(), 1);
}
2007
2008    // -----------------------------------------------------------------------
2009    // AsyncVisualizationPipeline tests
2010    // -----------------------------------------------------------------------
2011
/// A dispatched task producing a `ScalarFieldUpdate` must come back from
/// `poll` with its payload intact, leaving no task pending.
#[test]
fn async_viz_pipeline_dispatches_and_receives_scalar_update() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(task_pool);

    viz.dispatch(Box::new(|| VisualizationTaskOutput::ScalarFieldUpdate {
        layer_name: "density".to_string(),
        values: vec![1.0, 2.0, 3.0, 4.0],
    }));

    assert_eq!(viz.pending_count(), 1);

    // Poll up to 200 times, 5 ms apart, until a result is delivered.
    let mut results = Vec::new();
    let mut attempts_left = 200;
    while attempts_left > 0 {
        attempts_left -= 1;
        results = viz.poll();
        if results.is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(5));
        } else {
            break;
        }
    }

    assert_eq!(results.len(), 1);
    if let VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values } = &results[0] {
        assert_eq!(layer_name, "density");
        assert_eq!(values, &[1.0, 2.0, 3.0, 4.0]);
    } else {
        panic!("expected ScalarFieldUpdate");
    }
    assert_eq!(viz.pending_count(), 0);
}
2044
/// A dispatched task producing a `ColumnUpdate` must come back from `poll`
/// with its layer name and single column instance.
#[test]
fn async_viz_pipeline_dispatches_and_receives_column_update() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(task_pool);

    viz.dispatch(Box::new(|| {
        let columns = crate::visualization::ColumnInstanceSet::new(vec![
            crate::visualization::ColumnInstance::new(
                GeoCoord::from_lat_lon(0.0, 0.0),
                100.0,
                10.0,
            ),
        ]);
        // Two-stop ramp between the colours given for values 0.0 and 1.0.
        let ramp = crate::visualization::ColorRamp::new(vec![
            crate::visualization::ColorStop {
                value: 0.0,
                color: [0.0, 0.0, 1.0, 1.0],
            },
            crate::visualization::ColorStop {
                value: 1.0,
                color: [1.0, 0.0, 0.0, 1.0],
            },
        ]);
        VisualizationTaskOutput::ColumnUpdate {
            layer_name: "bars".to_string(),
            columns,
            ramp,
        }
    }));

    // Poll up to 200 times, 5 ms apart, until a result is delivered.
    let mut results = Vec::new();
    let mut attempts_left = 200;
    while attempts_left > 0 {
        attempts_left -= 1;
        results = viz.poll();
        if results.is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(5));
        } else {
            break;
        }
    }

    assert_eq!(results.len(), 1);
    if let VisualizationTaskOutput::ColumnUpdate {
        layer_name,
        columns,
        ..
    } = &results[0]
    {
        assert_eq!(layer_name, "bars");
        assert_eq!(columns.columns.len(), 1);
    } else {
        panic!("expected ColumnUpdate");
    }
}
2093
/// A dispatched task producing a `PointCloudUpdate` must come back from
/// `poll` with its layer name and both point instances.
#[test]
fn async_viz_pipeline_dispatches_and_receives_point_cloud_update() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(task_pool);

    viz.dispatch(Box::new(|| {
        let points = crate::visualization::PointInstanceSet::new(vec![
            crate::visualization::PointInstance::new(GeoCoord::from_lat_lon(1.0, 2.0), 5.0),
            crate::visualization::PointInstance::new(GeoCoord::from_lat_lon(3.0, 4.0), 8.0),
        ]);
        // Two-stop ramp between the colours given for values 0.0 and 1.0.
        let ramp = crate::visualization::ColorRamp::new(vec![
            crate::visualization::ColorStop {
                value: 0.0,
                color: [0.0, 1.0, 0.0, 1.0],
            },
            crate::visualization::ColorStop {
                value: 1.0,
                color: [1.0, 1.0, 0.0, 1.0],
            },
        ]);
        VisualizationTaskOutput::PointCloudUpdate {
            layer_name: "scatter".to_string(),
            points,
            ramp,
        }
    }));

    // Poll up to 200 times, 5 ms apart, until a result is delivered.
    let mut results = Vec::new();
    let mut attempts_left = 200;
    while attempts_left > 0 {
        attempts_left -= 1;
        results = viz.poll();
        if results.is_empty() {
            std::thread::sleep(std::time::Duration::from_millis(5));
        } else {
            break;
        }
    }

    assert_eq!(results.len(), 1);
    if let VisualizationTaskOutput::PointCloudUpdate {
        layer_name, points, ..
    } = &results[0]
    {
        assert_eq!(layer_name, "scatter");
        assert_eq!(points.points.len(), 2);
    } else {
        panic!("expected PointCloudUpdate");
    }
}
2137
/// Five tasks dispatched back to back must all be delivered by `poll`,
/// leaving no task pending.
#[test]
fn async_viz_pipeline_multiple_tasks_complete_independently() {
    let task_pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(task_pool);

    // Each task reports a distinct layer name and a constant-valued field.
    for i in 0..5 {
        let layer_name = format!("layer_{i}");
        viz.dispatch(Box::new(move || {
            let values = vec![i as f32; 4];
            VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values }
        }));
    }

    assert_eq!(viz.pending_count(), 5);

    // Accumulate results over up to 200 polls, 10 ms apart.
    let mut collected = Vec::new();
    for _attempt in 0..200 {
        collected.extend(viz.poll());
        if collected.len() < 5 {
            std::thread::sleep(std::time::Duration::from_millis(10));
        } else {
            break;
        }
    }

    assert_eq!(collected.len(), 5);
    assert_eq!(viz.pending_count(), 0);
}
2168}