Skip to main content

rustial_engine/
async_data.rs

1//! Async retained data pipeline (worker-equivalent architecture.
2//!
3//! This module provides the infrastructure for offloading heavy per-frame CPU
4//! work (terrain mesh generation, vector tessellation, symbol placement) to
5//! background tasks, matching the MapLibre/Mapbox web worker model.
6//!
7//! ## Architecture
8//!
9//! The [`DataTaskPool`] trait is an executor-agnostic interface that the host
10//! or renderer injects into [`MapState`](crate::MapState).  The engine
11//! submits work items via typed spawn methods and polls completed results
12//! via [`DataTaskResultReceiver::try_recv`] each frame.
13//!
14//! When no task pool is set, `MapState` falls back to synchronous inline
15//! execution (current behavior), ensuring backward compatibility.
16//!
17//! ## Task families
18//!
19//! | Family | Input | Output |
20//! |--------|-------|--------|
21//! | Terrain | `TerrainTaskInput` (tile, elevation grid, config) | `TerrainTaskOutput` (mesh + hillshade) |
22//! | Vector | `VectorTaskInput` (features, style, terrain samples, projection) | `VectorTaskOutput` (tessellated mesh + symbol candidates) |
23//!
24//! ## Retained model
25//!
26//! Completed results are stored per cache key and reused across frames until
27//! invalidated by source data changes, camera origin movement, or zoom-level
28//! style property changes.
29//!
30//! ## Per-tile vector buckets (MapLibre parity)
31//!
32//! Vector features can be spatially partitioned into per-tile buckets using
33//! [`partition_features_by_tile`].  Each bucket is dispatched, tessellated,
34//! and cached independently via [`VectorBucketKey`], matching MapLibre's
35//! per-tile bucket architecture.  Only buckets whose tiles are in the
36//! visible set are collected for rendering; off-screen buckets are retained
37//! in the cache for reuse when the camera pans back.
38
39use crate::camera_projection::CameraProjection;
40use crate::layers::{FeatureProvenance, VectorMeshData, VectorStyle};
41use crate::symbols::SymbolCandidate;
42use crate::terrain::{PreparedHillshadeRaster, TerrainMeshData};
43use crate::tile_manager::TileTextureRegion;
44use crate::geometry::{Feature, FeatureCollection, Geometry};
45use rustial_math::{geo_to_tile, ElevationGrid, GeoCoord, TileId};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49// ---------------------------------------------------------------------------
50// DataTaskPool trait
51// ---------------------------------------------------------------------------
52
53/// A result receiver for completed async data tasks.
54///
55/// Implementations should be non-blocking: [`try_recv`](Self::try_recv)
56/// returns `None` when no result is available yet.
57pub trait DataTaskResultReceiver<T: Send + 'static>: Send + Sync {
58    /// Try to receive a completed result without blocking.
59    ///
60    /// Returns `Some(result)` if a result is ready, `None` otherwise.
61    fn try_recv(&self) -> Option<T>;
62}
63
64/// An executor-agnostic task pool for offloading heavy data pipeline work.
65///
66/// The engine submits work items and polls result channels without depending
67/// on a specific async runtime.  Implementations may use:
68///
69/// - Bevy's `AsyncComputeTaskPool`
70/// - A `rayon` thread pool
71/// - A `tokio` runtime
72/// - A simple `std::thread`-based pool
73///
74/// # Contract
75///
76/// - Spawn methods must not block.  They should enqueue the closure
77///   for execution on a background thread and return a receiver immediately.
78/// - The returned receiver must be `Send + Sync` so it can be stored in
79///   `MapState`.
80/// - The closure must be `Send + 'static` because it will execute on a
81///   different thread.
82pub trait DataTaskPool: Send + Sync {
83    /// Spawn a terrain mesh generation task.
84    fn spawn_terrain(
85        &self,
86        task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
87    ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>;
88
89    /// Spawn a vector tessellation task.
90    fn spawn_vector(
91        &self,
92        task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
93    ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>>;
94
95    /// Spawn an MVT decode task.
96    ///
97    /// The closure decodes raw PBF bytes into a [`VectorTileData`] on a
98    /// background thread, matching MapLibre's web worker model.
99    fn spawn_decode(
100        &self,
101        task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
102    ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>;
103}
104
105// ---------------------------------------------------------------------------
106// Built-in std::thread task pool
107// ---------------------------------------------------------------------------
108
109/// A simple `std::thread`-based [`DataTaskPool`] implementation.
110///
111/// Each spawned task runs on a new OS thread.  This is suitable for
112/// host applications that do not use Bevy or another async runtime.
113///
114/// For production use with many concurrent tasks, prefer a thread-pool
115/// based implementation (e.g. `rayon` or Bevy's `ComputeTaskPool`).
116pub struct ThreadDataTaskPool;
117
118impl ThreadDataTaskPool {
119    /// Create a new thread-based task pool.
120    pub fn new() -> Self {
121        Self
122    }
123}
124
125impl Default for ThreadDataTaskPool {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
131struct ChannelReceiver<T> {
132    rx: std::sync::mpsc::Receiver<T>,
133}
134
135impl<T: Send + 'static> DataTaskResultReceiver<T> for ChannelReceiver<T> {
136    fn try_recv(&self) -> Option<T> {
137        self.rx.try_recv().ok()
138    }
139}
140
141// Safety: `std::sync::mpsc::Receiver` is `Send` but not `Sync` by default.
142// However, we only access it via `try_recv()` which is safe to call from
143// any single thread (MapState is behind RwLock with exclusive write access).
144// We wrap it to satisfy the Sync bound required by MapState's Send + Sync.
145unsafe impl<T: Send> Sync for ChannelReceiver<T> {}
146
147fn spawn_on_thread<T: Send + 'static>(
148    task: Box<dyn FnOnce() -> T + Send + 'static>,
149) -> Box<dyn DataTaskResultReceiver<T>> {
150    let (tx, rx) = std::sync::mpsc::channel();
151    std::thread::spawn(move || {
152        let result = task();
153        let _ = tx.send(result);
154    });
155    Box::new(ChannelReceiver { rx })
156}
157
158impl DataTaskPool for ThreadDataTaskPool {
159    fn spawn_terrain(
160        &self,
161        task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
162    ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>> {
163        spawn_on_thread(task)
164    }
165
166    fn spawn_vector(
167        &self,
168        task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
169    ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>> {
170        spawn_on_thread(task)
171    }
172
173    fn spawn_decode(
174        &self,
175        task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
176    ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>> {
177        spawn_on_thread(task)
178    }
179}
180
181// ---------------------------------------------------------------------------
182// Task inputs and outputs
183// ---------------------------------------------------------------------------
184
185/// Input for an async terrain mesh generation task.
186#[derive(Clone)]
187pub struct TerrainTaskInput {
188    /// Tile to generate terrain for.
189    pub tile: TileId,
190    /// DEM source tile backing this visible terrain tile.
191    pub elevation_source_tile: TileId,
192    /// Normalized sub-region inside the DEM source tile.
193    pub elevation_region: TileTextureRegion,
194    /// Elevation grid data for this tile.
195    pub elevation: ElevationGrid,
196    /// Mesh grid resolution.
197    pub resolution: u16,
198    /// Vertical exaggeration factor.
199    pub vertical_exaggeration: f64,
200    /// Generation counter for cache invalidation.
201    pub generation: u64,
202}
203
204/// Output from an async terrain mesh generation task.
205pub struct TerrainTaskOutput {
206    /// The tile this result is for.
207    pub tile: TileId,
208    /// Generated terrain mesh descriptor.
209    pub mesh: TerrainMeshData,
210    /// Prepared hillshade raster for this tile.
211    pub hillshade: PreparedHillshadeRaster,
212    /// Generation counter for cache invalidation.
213    pub generation: u64,
214}
215
216/// Cache key for a retained terrain task result.
217#[derive(Debug, Clone, PartialEq, Eq, Hash)]
218pub struct TerrainCacheKey {
219    /// Tile id.
220    pub tile: TileId,
221    /// Elevation data generation.
222    pub generation: u64,
223    /// Grid resolution.
224    pub resolution: u16,
225}
226
227/// Input for an async vector tessellation task.
228#[derive(Clone)]
229pub struct VectorTaskInput {
230    /// Unique key for this (layer, data generation) pair.
231    pub cache_key: VectorCacheKey,
232    /// The feature collection to tessellate.
233    pub features: FeatureCollection,
234    /// The vector style to apply.
235    pub style: VectorStyle,
236    /// Originating style/runtime layer id, when known.
237    pub query_layer_id: Option<String>,
238    /// Originating style source id, when known.
239    pub query_source_id: Option<String>,
240    /// Originating style source-layer id, when known.
241    pub query_source_layer: Option<String>,
242    /// Per-feature tile/source-layer provenance.
243    pub feature_provenance: Vec<Option<FeatureProvenance>>,
244    /// Active camera projection.
245    pub projection: CameraProjection,
246    /// Per-vertex terrain elevation samples (coord -> elevation).
247    /// Empty when terrain is disabled.
248    pub terrain_samples: Vec<(GeoCoord, f64)>,
249}
250
251/// Output from an async vector tessellation task.
252pub struct VectorTaskOutput {
253    /// Cache key matching the input.
254    pub cache_key: VectorCacheKey,
255    /// Tessellated mesh data.
256    pub mesh: VectorMeshData,
257    /// Symbol candidates extracted during tessellation.
258    pub symbol_candidates: Vec<SymbolCandidate>,
259}
260
261/// Output from an async MVT decode task.
262///
263/// Produced by [`DataTaskPool::spawn_decode`] on a background thread and
264/// consumed by [`AsyncDataPipeline::poll_decodes`] to promote raw vector
265/// payloads in the tile cache to fully decoded [`TileData::Vector`] data.
266pub struct MvtDecodeOutput {
267    /// The tile that was decoded.
268    pub tile: TileId,
269    /// Decode result -- either decoded vector data or an error.
270    pub result: Result<crate::tile_source::VectorTileData, crate::tile_source::TileError>,
271    /// Freshness metadata from the original HTTP response, carried through
272    /// so that the promoted cache entry retains correct TTL information.
273    pub freshness: crate::tile_source::TileFreshness,
274}
275
276/// Cache key for a retained vector task result.
277#[derive(Debug, Clone, PartialEq, Eq, Hash)]
278pub struct VectorCacheKey {
279    /// Layer identifier (name or style layer id).
280    pub layer_id: String,
281    /// Data generation counter.
282    pub data_generation: u64,
283    /// Projection used for tessellation.
284    pub projection: CameraProjection,
285}
286
287// ---------------------------------------------------------------------------
288// Per-tile vector bucket key and partitioning
289// ---------------------------------------------------------------------------
290
291/// Cache key for a per-tile vector bucket.
292///
293/// This is the fine-grained cache key that matches MapLibre's per-tile
294/// bucket architecture.  Each `(layer, tile, generation, projection)` tuple
295/// maps to an independently tessellated and cached mesh.
296#[derive(Debug, Clone, PartialEq, Eq, Hash)]
297pub struct VectorBucketKey {
298    /// Layer identifier.
299    pub layer_id: String,
300    /// Tile that this bucket covers.
301    pub tile: TileId,
302    /// Data generation counter for cache invalidation.
303    pub data_generation: u64,
304    /// Projection used for tessellation.
305    pub projection: CameraProjection,
306}
307
308/// Return the representative coordinate for a feature's geometry.
309///
310/// Uses the first coordinate found in the geometry (point position,
311/// first line vertex, first polygon vertex, etc.).  This is sufficient
312/// for tile assignment: a feature is assigned to exactly one tile based
313/// on its representative point, matching MapLibre's bucket-assignment
314/// strategy.
315fn representative_coord(geometry: &Geometry) -> Option<GeoCoord> {
316    match geometry {
317        Geometry::Point(p) => Some(p.coord),
318        Geometry::LineString(ls) => ls.coords.first().copied(),
319        Geometry::Polygon(poly) => poly.exterior.first().copied(),
320        Geometry::MultiPoint(mp) => mp.points.first().map(|p| p.coord),
321        Geometry::MultiLineString(mls) => {
322            mls.lines.first().and_then(|ls| ls.coords.first().copied())
323        }
324        Geometry::MultiPolygon(mpoly) => {
325            mpoly.polygons.first().and_then(|p| p.exterior.first().copied())
326        }
327        Geometry::GeometryCollection(geoms) => {
328            geoms.iter().find_map(representative_coord)
329        }
330    }
331}
332
333/// Partition a feature collection into per-tile sub-collections.
334///
335/// Each feature is assigned to exactly one tile based on its representative
336/// coordinate (first vertex).  Features whose geometry has no coordinates
337/// are dropped.
338///
339/// # Arguments
340///
341/// * `features` - The source feature collection to partition.
342/// * `zoom` - The zoom level at which to compute tile assignments.
343///
344/// # Returns
345///
346/// A map from `TileId` to the subset of features that fall within that tile.
347pub fn partition_features_by_tile(
348    features: &FeatureCollection,
349    zoom: u8,
350) -> HashMap<TileId, FeatureCollection> {
351    let mut buckets: HashMap<TileId, Vec<Feature>> = HashMap::new();
352    for feature in &features.features {
353        if let Some(coord) = representative_coord(&feature.geometry) {
354            let tile_coord = geo_to_tile(&coord, zoom);
355            let tile_id = tile_coord.tile_id();
356            buckets.entry(tile_id).or_default().push(feature.clone());
357        }
358    }
359    buckets
360        .into_iter()
361        .map(|(tile, feats)| (tile, FeatureCollection { features: feats }))
362        .collect()
363}
364
365// ---------------------------------------------------------------------------
366// Pending task handles
367// ---------------------------------------------------------------------------
368
369struct PendingTerrainTask {
370    key: TerrainCacheKey,
371    receiver: Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>,
372}
373
374struct PendingVectorTask {
375    key: VectorCacheKey,
376    receiver: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
377}
378
379struct PendingVectorBucketTask {
380    key: VectorBucketKey,
381    receiver: Box<dyn DataTaskResultReceiver<VectorBucketOutput>>,
382}
383
384struct PendingDecodeTask {
385    tile: TileId,
386    receiver: Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>,
387}
388
389/// Output from an async per-tile vector bucket tessellation task.
390struct VectorBucketOutput {
391    mesh: VectorMeshData,
392    symbol_candidates: Vec<SymbolCandidate>,
393}
394
395// ---------------------------------------------------------------------------
396// AsyncDataPipeline
397// ---------------------------------------------------------------------------
398
399/// Manages async data task dispatch and result polling.
400///
401/// This is the internal coordinator that [`MapState`](crate::MapState) uses
402/// to submit terrain/vector/symbol work to a [`DataTaskPool`] and collect
403/// completed results each frame.
404pub(crate) struct AsyncDataPipeline {
405    pool: Arc<dyn DataTaskPool>,
406    pending_terrain: Vec<PendingTerrainTask>,
407    pending_vector: Vec<PendingVectorTask>,
408    terrain_cache: HashMap<TerrainCacheKey, (TerrainMeshData, PreparedHillshadeRaster)>,
409    vector_cache: HashMap<VectorCacheKey, (VectorMeshData, Vec<SymbolCandidate>)>,
410    next_vector_generation: u64,
411    layer_generations: HashMap<String, u64>,
412    // Per-tile vector bucket state
413    pending_buckets: Vec<PendingVectorBucketTask>,
414    bucket_cache: HashMap<VectorBucketKey, (VectorMeshData, Vec<SymbolCandidate>)>,
415    // Background MVT decode state
416    pending_decodes: Vec<PendingDecodeTask>,
417    /// Completed decode results waiting to be promoted into the tile cache.
418    ///
419    /// Each entry is a `(TileId, TileResponse)` ready to replace the
420    /// `TileData::RawVector` entry in the tile cache with a fully decoded
421    /// `TileData::Vector`.
422    pub(crate) decoded_tiles: Vec<(TileId, crate::tile_source::TileResponse)>,
423}
424
425impl AsyncDataPipeline {
426    pub(crate) fn new(pool: Arc<dyn DataTaskPool>) -> Self {
427        Self {
428            pool,
429            pending_terrain: Vec::new(),
430            pending_vector: Vec::new(),
431            terrain_cache: HashMap::new(),
432            vector_cache: HashMap::new(),
433            next_vector_generation: 1,
434            layer_generations: HashMap::new(),
435            pending_buckets: Vec::new(),
436            bucket_cache: HashMap::new(),
437            pending_decodes: Vec::new(),
438            decoded_tiles: Vec::new(),
439        }
440    }
441
442    // -- Terrain ----------------------------------------------------------
443
444    pub(crate) fn dispatch_terrain(&mut self, input: TerrainTaskInput) {
445        let key = TerrainCacheKey {
446            tile: input.tile,
447            generation: input.generation,
448            resolution: input.resolution,
449        };
450        if self.terrain_cache.contains_key(&key) {
451            return;
452        }
453        if self.pending_terrain.iter().any(|p| p.key == key) {
454            return;
455        }
456        let receiver = self.pool.spawn_terrain(Box::new(move || {
457            let mesh = crate::terrain::build_terrain_descriptor_with_source(
458                &input.tile,
459                input.elevation_source_tile,
460                input.elevation_region,
461                &input.elevation,
462                input.resolution,
463                input.vertical_exaggeration,
464                input.generation,
465            );
466            let hillshade = crate::terrain::prepare_hillshade_raster(
467                &input.elevation,
468                input.vertical_exaggeration,
469                input.generation,
470            );
471            TerrainTaskOutput {
472                tile: input.tile,
473                mesh,
474                hillshade,
475                generation: input.generation,
476            }
477        }));
478        self.pending_terrain.push(PendingTerrainTask { key, receiver });
479    }
480
481    pub(crate) fn poll_terrain(&mut self) {
482        let mut still_pending = Vec::new();
483        for task in self.pending_terrain.drain(..) {
484            if let Some(output) = task.receiver.try_recv() {
485                self.terrain_cache
486                    .insert(task.key, (output.mesh, output.hillshade));
487            } else {
488                still_pending.push(task);
489            }
490        }
491        self.pending_terrain = still_pending;
492    }
493
494    #[allow(dead_code)]
495    pub(crate) fn collect_terrain(
496        &self,
497        desired_tiles: &[TileId],
498        tile_generations: &HashMap<TileId, u64>,
499        resolution: u16,
500    ) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
501        let mut meshes = Vec::with_capacity(desired_tiles.len());
502        let mut hillshades = Vec::with_capacity(desired_tiles.len());
503        for tile in desired_tiles {
504            let generation = tile_generations.get(tile).copied().unwrap_or(0);
505            let key = TerrainCacheKey {
506                tile: *tile,
507                generation,
508                resolution,
509            };
510            if let Some((mesh, hs)) = self.terrain_cache.get(&key) {
511                meshes.push(mesh.clone());
512                hillshades.push(hs.clone());
513            }
514        }
515        (meshes, hillshades)
516    }
517
518    /// Collect all currently cached terrain results regardless of tile key.
519    pub(crate) fn collect_all_terrain(&self) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
520        let mut meshes = Vec::with_capacity(self.terrain_cache.len());
521        let mut hillshades = Vec::with_capacity(self.terrain_cache.len());
522        for (mesh, hs) in self.terrain_cache.values() {
523            meshes.push(mesh.clone());
524            hillshades.push(hs.clone());
525        }
526        (meshes, hillshades)
527    }
528
529    pub(crate) fn prune_terrain(&mut self, visible_tiles: &[TileId]) {
530        let visible_set: std::collections::HashSet<TileId> =
531            visible_tiles.iter().copied().collect();
532        self.terrain_cache
533            .retain(|key, _| visible_set.contains(&key.tile));
534    }
535
536    // -- Per-layer vector (legacy) ----------------------------------------
537
538    pub(crate) fn layer_generation(&mut self, layer_id: &str, features_changed: bool) -> u64 {
539        if features_changed {
540            let gen = self.next_vector_generation;
541            self.next_vector_generation += 1;
542            self.layer_generations.insert(layer_id.to_owned(), gen);
543            gen
544        } else {
545            self.layer_generations
546                .get(layer_id)
547                .copied()
548                .unwrap_or_else(|| {
549                    let gen = self.next_vector_generation;
550                    self.next_vector_generation += 1;
551                    self.layer_generations.insert(layer_id.to_owned(), gen);
552                    gen
553                })
554        }
555    }
556
557    pub(crate) fn dispatch_vector(&mut self, input: VectorTaskInput) {
558        let key = input.cache_key.clone();
559        if self.vector_cache.contains_key(&key) {
560            return;
561        }
562        if self.pending_vector.iter().any(|p| p.key == key) {
563            return;
564        }
565        let receiver = self.pool.spawn_vector(Box::new(move || {
566            let mut features = input.features;
567            if !input.terrain_samples.is_empty() {
568                apply_terrain_samples(&mut features, &input.terrain_samples);
569            }
570            let mut temp_layer = crate::layers::VectorLayer::new(
571                &input.cache_key.layer_id,
572                FeatureCollection::default(),
573                input.style,
574            )
575            .with_query_metadata(input.query_layer_id, input.query_source_id)
576            .with_source_layer(input.query_source_layer);
577            temp_layer.set_features_with_provenance(features, input.feature_provenance);
578            let mesh = temp_layer.tessellate(input.projection);
579            let candidates = temp_layer.symbol_candidates();
580            VectorTaskOutput {
581                cache_key: input.cache_key,
582                mesh,
583                symbol_candidates: candidates,
584            }
585        }));
586        self.pending_vector.push(PendingVectorTask { key, receiver });
587    }
588
589    pub(crate) fn poll_vector(&mut self) {
590        let mut still_pending = Vec::new();
591        for task in self.pending_vector.drain(..) {
592            if let Some(output) = task.receiver.try_recv() {
593                self.vector_cache
594                    .insert(task.key, (output.mesh, output.symbol_candidates));
595            } else {
596                still_pending.push(task);
597            }
598        }
599        self.pending_vector = still_pending;
600    }
601
602    pub(crate) fn collect_vectors(
603        &self,
604        layer_keys: &[VectorCacheKey],
605    ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
606        let mut meshes = Vec::new();
607        let mut candidates = Vec::new();
608        for key in layer_keys {
609            if let Some((mesh, cands)) = self.vector_cache.get(key) {
610                if !mesh.is_empty() {
611                    meshes.push(mesh.clone());
612                }
613                candidates.extend(cands.iter().cloned());
614            }
615        }
616        (meshes, candidates)
617    }
618
619    pub(crate) fn prune_vectors(&mut self, active_layer_ids: &[String]) {
620        let active_set: std::collections::HashSet<&str> =
621            active_layer_ids.iter().map(String::as_str).collect();
622        self.vector_cache
623            .retain(|key, _| active_set.contains(key.layer_id.as_str()));
624        self.layer_generations
625            .retain(|id, _| active_set.contains(id.as_str()));
626    }
627
628    #[cfg(test)]
629    pub(crate) fn current_layer_generation(&self, layer_id: &str) -> Option<u64> {
630        self.layer_generations.get(layer_id).copied()
631    }
632
633    // -- Per-tile vector buckets ------------------------------------------
634
635    /// Dispatch per-tile vector tessellation tasks for a partitioned feature set.
636    ///
637    /// For each `(tile, features)` pair, a background task is spawned that
638    /// tessellates only the features belonging to that tile.  Results are
639    /// cached by [`VectorBucketKey`] and deduplicated against both the
640    /// pending queue and the completed cache.
641    pub(crate) fn dispatch_vector_buckets(
642        &mut self,
643        layer_id: &str,
644        data_generation: u64,
645        projection: CameraProjection,
646        style: &VectorStyle,
647        tile_features: &HashMap<TileId, FeatureCollection>,
648        query_source_id: Option<&str>,
649        query_source_layer: Option<&str>,
650        tile_provenance: &HashMap<TileId, Vec<Option<FeatureProvenance>>>,
651        terrain_samples: &[(GeoCoord, f64)],
652    ) {
653        for (tile, features) in tile_features {
654            let key = VectorBucketKey {
655                layer_id: layer_id.to_owned(),
656                tile: *tile,
657                data_generation,
658                projection,
659            };
660            if self.bucket_cache.contains_key(&key) {
661                continue;
662            }
663            if self.pending_buckets.iter().any(|p| p.key == key) {
664                continue;
665            }
666            let task_key = key.clone();
667            let task_features = features.clone();
668            let task_style = style.clone();
669            let task_samples = terrain_samples.to_vec();
670            let task_layer_id = layer_id.to_owned();
671            let task_source_id = query_source_id.map(ToOwned::to_owned);
672            let task_source_layer = query_source_layer.map(ToOwned::to_owned);
673            let task_provenance = tile_provenance.get(tile).cloned().unwrap_or_default();
674
675            let receiver = self.pool.spawn_vector(Box::new(move || {
676                let mut feats = task_features;
677                if !task_samples.is_empty() {
678                    apply_terrain_samples(&mut feats, &task_samples);
679                }
680                let mut temp_layer = crate::layers::VectorLayer::new(
681                    &task_layer_id,
682                    FeatureCollection::default(),
683                    task_style,
684                )
685                .with_query_metadata(Some(task_layer_id.clone()), task_source_id)
686                .with_source_layer(task_source_layer);
687                temp_layer.set_features_with_provenance(feats, task_provenance);
688                let mesh = temp_layer.tessellate(task_key.projection);
689                let candidates = temp_layer.symbol_candidates();
690                VectorTaskOutput {
691                    cache_key: VectorCacheKey {
692                        layer_id: task_key.layer_id.clone(),
693                        data_generation: task_key.data_generation,
694                        projection: task_key.projection,
695                    },
696                    mesh,
697                    symbol_candidates: candidates,
698                }
699            }));
700
701            // Wrap the VectorTaskOutput receiver into a VectorBucketOutput receiver
702            self.pending_buckets.push(PendingVectorBucketTask {
703                key,
704                receiver: Box::new(BucketReceiverAdapter {
705                    inner: receiver,
706                }),
707            });
708        }
709    }
710
711    /// Poll all pending per-tile vector bucket tasks.
712    pub(crate) fn poll_vector_buckets(&mut self) {
713        let mut still_pending = Vec::new();
714        for task in self.pending_buckets.drain(..) {
715            if let Some(output) = task.receiver.try_recv() {
716                self.bucket_cache
717                    .insert(task.key, (output.mesh, output.symbol_candidates));
718            } else {
719                still_pending.push(task);
720            }
721        }
722        self.pending_buckets = still_pending;
723    }
724
725    // -- Background MVT decode --------------------------------------------
726
727    /// Submit a raw vector tile payload for background decoding.
728    ///
729    /// The decode closure runs on the [`DataTaskPool`] and produces an
730    /// [`MvtDecodeOutput`] that is collected by [`poll_decodes`](Self::poll_decodes).
731    pub(crate) fn dispatch_decode(
732        &mut self,
733        tile: TileId,
734        raw: &crate::tile_source::RawVectorPayload,
735        freshness: crate::tile_source::TileFreshness,
736    ) {
737        // Do not double-dispatch if already pending.
738        if self.pending_decodes.iter().any(|t| t.tile == tile) {
739            return;
740        }
741
742        let bytes = Arc::clone(&raw.bytes);
743        let opts = raw.decode_options.clone();
744        let task_tile = tile;
745
746        let receiver = self.pool.spawn_decode(Box::new(move || {
747            let result = crate::mvt::decode_mvt(&bytes, &task_tile, &opts)
748                .map(|layers| crate::tile_source::VectorTileData { layers })
749                .map_err(|e| {
750                    crate::tile_source::TileError::Decode(format!("MVT decode: {e}"))
751                });
752            MvtDecodeOutput {
753                tile: task_tile,
754                result,
755                freshness,
756            }
757        }));
758
759        self.pending_decodes.push(PendingDecodeTask { tile, receiver });
760    }
761
762    /// Poll all pending background MVT decode tasks.
763    ///
764    /// Successfully decoded tiles are collected into [`decoded_tiles`](Self::decoded_tiles)
765    /// for the caller to promote into the tile cache.  Failed decodes are
766    /// also collected so the tile cache can transition to the Failed state.
767    pub(crate) fn poll_decodes(&mut self) {
768        let mut still_pending = Vec::new();
769        for task in self.pending_decodes.drain(..) {
770            if let Some(output) = task.receiver.try_recv() {
771                match output.result {
772                    Ok(vector_data) => {
773                        self.decoded_tiles.push((
774                            output.tile,
775                            crate::tile_source::TileResponse {
776                                data: crate::tile_source::TileData::Vector(vector_data),
777                                freshness: output.freshness,
778                                not_modified: false,
779                            },
780                        ));
781                    }
782                    Err(_err) => {
783                        // Failed decodes: the tile manager will handle the
784                        // cache transition when it sees the tile is still
785                        // in RawVector state on the next frame.
786                    }
787                }
788            } else {
789                still_pending.push(task);
790            }
791        }
792        self.pending_decodes = still_pending;
793    }
794
795    /// Take all completed decode results, leaving the internal buffer empty.
796    pub(crate) fn take_decoded_tiles(
797        &mut self,
798    ) -> Vec<(TileId, crate::tile_source::TileResponse)> {
799        std::mem::take(&mut self.decoded_tiles)
800    }
801
802    /// Number of decode tasks currently in flight.
803    #[allow(dead_code)]
804    pub(crate) fn pending_decode_count(&self) -> usize {
805        self.pending_decodes.len()
806    }
807
808    /// Collect tessellated vector data for the visible tile set.
809    ///
810    /// Only buckets whose tile is in `visible_tiles` are included in the
811    /// output.  Buckets for off-screen tiles remain in the cache for reuse
812    /// when the camera pans back.
813    pub(crate) fn collect_vector_buckets(
814        &self,
815        layer_id: &str,
816        data_generation: u64,
817        projection: CameraProjection,
818        visible_tiles: &[TileId],
819    ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
820        let mut meshes = Vec::new();
821        let mut candidates = Vec::new();
822        for tile in visible_tiles {
823            let key = VectorBucketKey {
824                layer_id: layer_id.to_owned(),
825                tile: *tile,
826                data_generation,
827                projection,
828            };
829            if let Some((mesh, cands)) = self.bucket_cache.get(&key) {
830                if !mesh.is_empty() {
831                    meshes.push(mesh.clone());
832                }
833                candidates.extend(cands.iter().cloned());
834            }
835        }
836        (meshes, candidates)
837    }
838
839    /// Prune per-tile vector buckets for layers that are no longer active.
840    ///
841    /// Buckets for active layers whose tiles are off-screen are **retained**
842    /// (they will be reused when the camera pans back).  Only buckets for
843    /// entirely removed layers are evicted.
844    #[allow(dead_code)]
845    pub(crate) fn prune_vector_buckets(&mut self, active_layer_ids: &[String]) {
846        let active_set: std::collections::HashSet<&str> =
847            active_layer_ids.iter().map(String::as_str).collect();
848        self.bucket_cache
849            .retain(|key, _| active_set.contains(key.layer_id.as_str()));
850    }
851
852    pub(crate) fn evict_vector_buckets(&mut self, layer_id: &str, tiles: &[TileId]) {
853        if tiles.is_empty() {
854            return;
855        }
856        let tile_set: std::collections::HashSet<TileId> = tiles.iter().copied().collect();
857        self.bucket_cache
858            .retain(|key, _| key.layer_id != layer_id || !tile_set.contains(&key.tile));
859        self.pending_buckets
860            .retain(|task| task.key.layer_id != layer_id || !tile_set.contains(&task.key.tile));
861    }
862
863    /// Prune per-tile vector buckets for a specific layer to only retain
864    /// buckets matching the given visible tile set.
865    ///
866    /// This is a more aggressive pruning strategy that evicts off-screen
867    /// tile buckets for a layer, useful when memory pressure is high.
868    #[allow(dead_code)]
869    pub(crate) fn prune_vector_buckets_for_layer(
870        &mut self,
871        layer_id: &str,
872        visible_tiles: &[TileId],
873    ) {
874        let visible_set: std::collections::HashSet<TileId> =
875            visible_tiles.iter().copied().collect();
876        self.bucket_cache.retain(|key, _| {
877            key.layer_id != layer_id || visible_set.contains(&key.tile)
878        });
879    }
880
881    /// Return the number of cached per-tile vector buckets.
882    #[allow(dead_code)]
883    pub(crate) fn bucket_cache_len(&self) -> usize {
884        self.bucket_cache.len()
885    }
886
887    // -- General ----------------------------------------------------------
888
889    #[allow(dead_code)]
890    pub(crate) fn has_pending_tasks(&self) -> bool {
891        !self.pending_terrain.is_empty()
892            || !self.pending_vector.is_empty()
893            || !self.pending_buckets.is_empty()
894    }
895}
896
897/// Adapter that converts a `DataTaskResultReceiver<VectorTaskOutput>` into
898/// a `DataTaskResultReceiver<VectorBucketOutput>` by attaching the bucket key.
899struct BucketReceiverAdapter {
900    inner: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
901}
902
903impl DataTaskResultReceiver<VectorBucketOutput> for BucketReceiverAdapter {
904    fn try_recv(&self) -> Option<VectorBucketOutput> {
905        self.inner.try_recv().map(|output| VectorBucketOutput {
906            mesh: output.mesh,
907            symbol_candidates: output.symbol_candidates,
908        })
909    }
910}
911
912fn apply_terrain_samples(features: &mut FeatureCollection, samples: &[(GeoCoord, f64)]) {
913    let lookup: HashMap<(i64, i64), f64> = samples
914        .iter()
915        .map(|(coord, elev)| {
916            let key = (
917                (coord.lat * 100_000.0).round() as i64,
918                (coord.lon * 100_000.0).round() as i64,
919            );
920            (key, *elev)
921        })
922        .collect();
923    for feature in &mut features.features {
924        apply_terrain_to_geometry(&mut feature.geometry, &lookup);
925    }
926}
927
928fn apply_terrain_to_geometry(
929    geometry: &mut crate::geometry::Geometry,
930    lookup: &HashMap<(i64, i64), f64>,
931) {
932    use crate::geometry::Geometry;
933    match geometry {
934        Geometry::Point(p) => {
935            if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
936                p.coord.alt = elev;
937            }
938        }
939        Geometry::LineString(ls) => {
940            for coord in &mut ls.coords {
941                if let Some(&elev) = lookup.get(&coord_key(coord)) {
942                    coord.alt = elev;
943                }
944            }
945        }
946        Geometry::Polygon(poly) => {
947            for coord in &mut poly.exterior {
948                if let Some(&elev) = lookup.get(&coord_key(coord)) {
949                    coord.alt = elev;
950                }
951            }
952            for hole in &mut poly.interiors {
953                for coord in hole {
954                    if let Some(&elev) = lookup.get(&coord_key(coord)) {
955                        coord.alt = elev;
956                    }
957                }
958            }
959        }
960        Geometry::MultiPoint(mp) => {
961            for p in &mut mp.points {
962                if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
963                    p.coord.alt = elev;
964                }
965            }
966        }
967        Geometry::MultiLineString(mls) => {
968            for ls in &mut mls.lines {
969                for coord in &mut ls.coords {
970                    if let Some(&elev) = lookup.get(&coord_key(coord)) {
971                        coord.alt = elev;
972                    }
973                }
974            }
975        }
976        Geometry::MultiPolygon(mpoly) => {
977            for poly in &mut mpoly.polygons {
978                for coord in &mut poly.exterior {
979                    if let Some(&elev) = lookup.get(&coord_key(coord)) {
980                        coord.alt = elev;
981                    }
982                }
983                for hole in &mut poly.interiors {
984                    for coord in hole {
985                        if let Some(&elev) = lookup.get(&coord_key(coord)) {
986                            coord.alt = elev;
987                        }
988                    }
989                }
990            }
991        }
992        Geometry::GeometryCollection(geoms) => {
993            for g in geoms {
994                apply_terrain_to_geometry(g, lookup);
995            }
996        }
997    }
998}
999
1000fn coord_key(coord: &GeoCoord) -> (i64, i64) {
1001    (
1002        (coord.lat * 100_000.0).round() as i64,
1003        (coord.lon * 100_000.0).round() as i64,
1004    )
1005}
1006
1007// ---------------------------------------------------------------------------
1008// Async visualization preprocessing pipeline
1009// ---------------------------------------------------------------------------
1010
1011/// Output of an asynchronous visualization preprocessing task.
1012///
1013/// Contains the preprocessed data ready to be applied to a layer via
1014/// the corresponding `MapState::set_*` method.
1015pub enum VisualizationTaskOutput {
1016    /// Preprocessed scalar field values (for grid scalar / grid extrusion).
1017    ScalarFieldUpdate {
1018        /// Layer name to update.
1019        layer_name: String,
1020        /// New scalar values.
1021        values: Vec<f32>,
1022    },
1023    /// Preprocessed column instances (for instanced column layers).
1024    ColumnUpdate {
1025        /// Layer name to update.
1026        layer_name: String,
1027        /// New column instance set.
1028        columns: crate::visualization::ColumnInstanceSet,
1029        /// Colour ramp (included in case the ramp changes with the data).
1030        ramp: crate::visualization::ColorRamp,
1031    },
1032    /// Preprocessed point instances (for point cloud layers).
1033    PointCloudUpdate {
1034        /// Layer name to update.
1035        layer_name: String,
1036        /// New point instance set.
1037        points: crate::visualization::PointInstanceSet,
1038        /// Colour ramp.
1039        ramp: crate::visualization::ColorRamp,
1040    },
1041}
1042
1043/// Asynchronous preprocessing pipeline for large visualization data updates.
1044///
1045/// Use this when visualization data preparation (binning, normalisation,
1046/// aggregation, coordinate transforms) is too heavy for the main thread's
1047/// per-frame budget.  The pipeline offloads work to the same
1048/// [`DataTaskPool`] used for terrain and vector tessellation.
1049///
1050/// # Usage
1051///
1052/// ```rust,no_run
1053/// # use rustial_engine::{AsyncVisualizationPipeline, ThreadDataTaskPool, VisualizationTaskOutput};
1054/// # use std::sync::Arc;
1055/// let pool: Arc<dyn rustial_engine::DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1056/// let mut viz_pipeline = AsyncVisualizationPipeline::new(pool);
1057///
1058/// // Dispatch a heavy computation on a background thread.
1059/// viz_pipeline.dispatch(Box::new(|| {
1060///     // Expensive data processing...
1061///     let values: Vec<f32> = (0..10_000).map(|i| (i as f32).sin()).collect();
1062///     VisualizationTaskOutput::ScalarFieldUpdate {
1063///         layer_name: "heatmap".to_string(),
1064///         values,
1065///     }
1066/// }));
1067///
1068/// // Each frame, poll for completed results.
1069/// for result in viz_pipeline.poll() {
1070///     match result {
1071///         VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values } => {
1072///             // Apply: state.update_scalar_field(&layer_name, values);
1073///         }
1074///         _ => {}
1075///     }
1076/// }
1077/// ```
1078pub struct AsyncVisualizationPipeline {
1079    _pool: Arc<dyn DataTaskPool>,
1080    pending: Vec<Box<dyn DataTaskResultReceiver<VisualizationTaskOutput>>>,
1081}
1082
1083impl AsyncVisualizationPipeline {
1084    /// Create a new async visualization pipeline backed by the given task pool.
1085    pub fn new(pool: Arc<dyn DataTaskPool>) -> Self {
1086        Self {
1087            _pool: pool,
1088            pending: Vec::new(),
1089        }
1090    }
1091
1092    /// Dispatch a visualization preprocessing task to the background thread pool.
1093    ///
1094    /// The closure runs on a background thread and should return a
1095    /// [`VisualizationTaskOutput`] with the preprocessed data.
1096    pub fn dispatch(
1097        &mut self,
1098        task: Box<dyn FnOnce() -> VisualizationTaskOutput + Send + 'static>,
1099    ) {
1100        let receiver = spawn_on_thread(task);
1101        self.pending.push(receiver);
1102    }
1103
1104    /// Poll all pending tasks and return completed results.
1105    ///
1106    /// Call this once per frame from the main thread.  Incomplete tasks
1107    /// remain in the pending queue for the next poll.
1108    pub fn poll(&mut self) -> Vec<VisualizationTaskOutput> {
1109        let mut completed = Vec::new();
1110        let mut still_pending = Vec::new();
1111
1112        for receiver in self.pending.drain(..) {
1113            match receiver.try_recv() {
1114                Some(result) => completed.push(result),
1115                None => still_pending.push(receiver),
1116            }
1117        }
1118
1119        self.pending = still_pending;
1120        completed
1121    }
1122
1123    /// Return the number of tasks currently in-flight.
1124    pub fn pending_count(&self) -> usize {
1125        self.pending.len()
1126    }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131    use super::*;
1132    use crate::camera_projection::CameraProjection;
1133    use crate::geometry::{Feature, Geometry, Point, LineString, Polygon};
1134    use crate::layers::VectorStyle;
1135    use rustial_math::{ElevationGrid, GeoCoord, TileId};
1136    use std::collections::HashMap;
1137
1138    #[test]
1139    fn thread_pool_dispatches_and_receives_terrain() {
1140        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1141        let mut pipeline = AsyncDataPipeline::new(pool);
1142
1143        let tile = TileId::new(0, 0, 0);
1144        let elev = ElevationGrid::flat(tile, 4, 4);
1145        pipeline.dispatch_terrain(TerrainTaskInput {
1146            tile,
1147            elevation_source_tile: tile,
1148            elevation_region: TileTextureRegion::FULL,
1149            elevation: elev,
1150            resolution: 4,
1151            vertical_exaggeration: 1.0,
1152            generation: 1,
1153        });
1154
1155        for _ in 0..100 {
1156            pipeline.poll_terrain();
1157            if !pipeline.has_pending_tasks() {
1158                break;
1159            }
1160            std::thread::sleep(std::time::Duration::from_millis(10));
1161        }
1162
1163        let mut gens = HashMap::new();
1164        gens.insert(tile, 1u64);
1165        let (meshes, hillshades) = pipeline.collect_terrain(&[tile], &gens, 4);
1166        assert_eq!(meshes.len(), 1);
1167        assert_eq!(hillshades.len(), 1);
1168        assert_eq!(meshes[0].tile, tile);
1169    }
1170
1171    #[test]
1172    fn thread_pool_dispatches_and_receives_vector() {
1173        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1174        let mut pipeline = AsyncDataPipeline::new(pool);
1175
1176        let features = crate::geometry::FeatureCollection {
1177            features: vec![Feature {
1178                geometry: Geometry::Point(Point {
1179                    coord: GeoCoord::from_lat_lon(0.0, 0.0),
1180                }),
1181                properties: HashMap::new(),
1182            }],
1183        };
1184
1185        let key = VectorCacheKey {
1186            layer_id: "test".into(),
1187            data_generation: 1,
1188            projection: CameraProjection::WebMercator,
1189        };
1190
1191        pipeline.dispatch_vector(VectorTaskInput {
1192            cache_key: key.clone(),
1193            features,
1194            style: VectorStyle::default(),
1195            query_layer_id: Some("test".into()),
1196            query_source_id: None,
1197            query_source_layer: None,
1198            feature_provenance: Vec::new(),
1199            projection: CameraProjection::WebMercator,
1200            terrain_samples: Vec::new(),
1201        });
1202
1203        for _ in 0..100 {
1204            pipeline.poll_vector();
1205            if !pipeline.has_pending_tasks() {
1206                break;
1207            }
1208            std::thread::sleep(std::time::Duration::from_millis(10));
1209        }
1210
1211        let (meshes, _candidates) = pipeline.collect_vectors(&[key]);
1212        assert_eq!(meshes.len(), 1);
1213        assert!(meshes[0].vertex_count() > 0);
1214    }
1215
1216    #[test]
1217    fn duplicate_dispatch_is_deduplicated() {
1218        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1219        let mut pipeline = AsyncDataPipeline::new(pool);
1220
1221        let tile = TileId::new(0, 0, 0);
1222        let elev = ElevationGrid::flat(tile, 4, 4);
1223        let input = TerrainTaskInput {
1224            tile,
1225            elevation_source_tile: tile,
1226            elevation_region: TileTextureRegion::FULL,
1227            elevation: elev,
1228            resolution: 4,
1229            vertical_exaggeration: 1.0,
1230            generation: 1,
1231        };
1232
1233        pipeline.dispatch_terrain(input.clone());
1234        pipeline.dispatch_terrain(input.clone());
1235        assert_eq!(pipeline.pending_terrain.len(), 1);
1236    }
1237
1238    #[test]
1239    fn cached_result_prevents_redispatch() {
1240        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1241        let mut pipeline = AsyncDataPipeline::new(pool);
1242
1243        let tile = TileId::new(0, 0, 0);
1244        let elev = ElevationGrid::flat(tile, 4, 4);
1245        let input = TerrainTaskInput {
1246            tile,
1247            elevation_source_tile: tile,
1248            elevation_region: TileTextureRegion::FULL,
1249            elevation: elev,
1250            resolution: 4,
1251            vertical_exaggeration: 1.0,
1252            generation: 1,
1253        };
1254
1255        pipeline.dispatch_terrain(input.clone());
1256
1257        for _ in 0..100 {
1258            pipeline.poll_terrain();
1259            if !pipeline.has_pending_tasks() {
1260                break;
1261            }
1262            std::thread::sleep(std::time::Duration::from_millis(10));
1263        }
1264
1265        pipeline.dispatch_terrain(input);
1266        assert!(pipeline.pending_terrain.is_empty());
1267    }
1268
1269    #[test]
1270    fn prune_terrain_removes_stale_entries() {
1271        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1272        let mut pipeline = AsyncDataPipeline::new(pool);
1273
1274        let tile_a = TileId::new(1, 0, 0);
1275        let tile_b = TileId::new(1, 1, 0);
1276        let elev_a = ElevationGrid::flat(tile_a, 4, 4);
1277        let elev_b = ElevationGrid::flat(tile_b, 4, 4);
1278        let mesh_a = crate::terrain::build_terrain_descriptor(&tile_a, &elev_a, 4, 1.0, 1);
1279        let mesh_b = crate::terrain::build_terrain_descriptor(&tile_b, &elev_b, 4, 1.0, 1);
1280        let hs_a = crate::terrain::prepare_hillshade_raster(&elev_a, 1.0, 1);
1281        let hs_b = crate::terrain::prepare_hillshade_raster(&elev_b, 1.0, 1);
1282
1283        pipeline.terrain_cache.insert(
1284            TerrainCacheKey { tile: tile_a, generation: 1, resolution: 4 },
1285            (mesh_a, hs_a),
1286        );
1287        pipeline.terrain_cache.insert(
1288            TerrainCacheKey { tile: tile_b, generation: 1, resolution: 4 },
1289            (mesh_b, hs_b),
1290        );
1291
1292        assert_eq!(pipeline.terrain_cache.len(), 2);
1293        pipeline.prune_terrain(&[tile_a]);
1294        assert_eq!(pipeline.terrain_cache.len(), 1);
1295        assert!(pipeline.terrain_cache.keys().all(|k| k.tile == tile_a));
1296    }
1297
1298    #[test]
1299    fn synchronous_fallback_path_still_works() {
1300        let mut state = crate::MapState::new();
1301        state.set_viewport(800, 600);
1302        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1303        state.set_camera_distance(1_000.0);
1304        state.update();
1305        assert!(state.zoom_level() <= 22);
1306    }
1307
1308    #[test]
1309    fn layer_generation_tracking() {
1310        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1311        let mut pipeline = AsyncDataPipeline::new(pool);
1312
1313        let gen1 = pipeline.layer_generation("layer-a", false);
1314        let gen2 = pipeline.layer_generation("layer-a", false);
1315        assert_eq!(gen1, gen2);
1316
1317        let gen3 = pipeline.layer_generation("layer-a", true);
1318        assert!(gen3 > gen2);
1319
1320        let gen4 = pipeline.layer_generation("layer-a", false);
1321        assert_eq!(gen3, gen4);
1322    }
1323
1324    #[test]
1325    fn collect_all_terrain_returns_cached_entries() {
1326        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1327        let mut pipeline = AsyncDataPipeline::new(pool);
1328
1329        let tile = TileId::new(0, 0, 0);
1330        let elev = ElevationGrid::flat(tile, 4, 4);
1331        pipeline.dispatch_terrain(TerrainTaskInput {
1332            tile,
1333            elevation_source_tile: tile,
1334            elevation_region: TileTextureRegion::FULL,
1335            elevation: elev,
1336            resolution: 4,
1337            vertical_exaggeration: 1.0,
1338            generation: 1,
1339        });
1340
1341        for _ in 0..100 {
1342            pipeline.poll_terrain();
1343            if !pipeline.has_pending_tasks() {
1344                break;
1345            }
1346            std::thread::sleep(std::time::Duration::from_millis(10));
1347        }
1348
1349        let (meshes, hillshades) = pipeline.collect_all_terrain();
1350        assert_eq!(meshes.len(), 1);
1351        assert_eq!(hillshades.len(), 1);
1352        assert_eq!(meshes[0].tile, tile);
1353    }
1354
1355    #[test]
1356    fn async_pipeline_terrain_via_map_state() {
1357        use crate::terrain::{FlatElevationSource, TerrainConfig};
1358
1359        let config = TerrainConfig {
1360            enabled: true,
1361            mesh_resolution: 4,
1362            source: Box::new(FlatElevationSource::new(4, 4)),
1363            ..TerrainConfig::default()
1364        };
1365        let mut state = crate::MapState::with_terrain(config, 100);
1366        state.set_viewport(800, 600);
1367        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1368        state.set_camera_distance(10_000_000.0);
1369
1370        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1371        state.set_task_pool(pool);
1372        assert!(state.has_async_pipeline());
1373
1374        // First update: dispatches terrain tasks.
1375        state.update_with_dt(1.0 / 60.0);
1376
1377        // Allow background tasks to complete.
1378        for _ in 0..100 {
1379            std::thread::sleep(std::time::Duration::from_millis(10));
1380            state.update_with_dt(1.0 / 60.0);
1381            if !state.terrain_meshes().is_empty() {
1382                break;
1383            }
1384        }
1385
1386        assert!(
1387            !state.terrain_meshes().is_empty(),
1388            "async pipeline should produce terrain meshes"
1389        );
1390    }
1391
1392    #[test]
1393    fn async_pipeline_vectors_via_map_state() {
1394        use crate::layers::{VectorLayer, VectorStyle};
1395
1396        let mut state = crate::MapState::new();
1397        state.set_viewport(800, 600);
1398        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1399        state.set_camera_distance(1_000.0);
1400
1401        let fc = crate::geometry::FeatureCollection {
1402            features: vec![Feature {
1403                geometry: Geometry::Point(Point {
1404                    coord: GeoCoord::from_lat_lon(0.0, 0.0),
1405                }),
1406                properties: HashMap::new(),
1407            }],
1408        };
1409        let vl = VectorLayer::new("test-vec", fc, VectorStyle::default());
1410        state.push_layer(Box::new(vl));
1411
1412        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1413        state.set_task_pool(pool);
1414
1415        // First update: dispatches vector tasks.
1416        state.update_with_dt(1.0 / 60.0);
1417
1418        // Allow background tasks to complete.
1419        for _ in 0..100 {
1420            std::thread::sleep(std::time::Duration::from_millis(10));
1421            state.update_with_dt(1.0 / 60.0);
1422            if !state.vector_meshes().is_empty() {
1423                break;
1424            }
1425        }
1426
1427        assert!(
1428            !state.vector_meshes().is_empty(),
1429            "async pipeline should produce vector meshes"
1430        );
1431    }
1432
1433    #[test]
1434    fn async_pipeline_clear_reverts_to_sync() {
1435        let mut state = crate::MapState::new();
1436        state.set_viewport(800, 600);
1437        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1438        state.set_camera_distance(1_000.0);
1439
1440        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1441        state.set_task_pool(pool);
1442        assert!(state.has_async_pipeline());
1443
1444        state.clear_task_pool();
1445        assert!(!state.has_async_pipeline());
1446
1447        // Synchronous update should still work.
1448        state.update();
1449        assert!(state.zoom_level() <= 22);
1450    }
1451
1452    #[test]
1453    fn data_update_interval_ignored_with_async_pipeline() {
1454        use crate::terrain::{FlatElevationSource, TerrainConfig};
1455
1456        let config = TerrainConfig {
1457            enabled: true,
1458            mesh_resolution: 4,
1459            source: Box::new(FlatElevationSource::new(4, 4)),
1460            ..TerrainConfig::default()
1461        };
1462        let mut state = crate::MapState::with_terrain(config, 100);
1463        state.set_viewport(800, 600);
1464        state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1465        state.set_camera_distance(10_000_000.0);
1466        state.set_data_update_interval(100.0); // Very high throttle
1467
1468        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1469        state.set_task_pool(pool);
1470
1471        // Even with a very high throttle, async dispatch should still run
1472        // because the throttle is ignored when an async pipeline is active.
1473        for _ in 0..100 {
1474            state.update_with_dt(1.0 / 60.0);
1475            std::thread::sleep(std::time::Duration::from_millis(10));
1476            if !state.terrain_meshes().is_empty() {
1477                break;
1478            }
1479        }
1480
1481        assert!(
1482            !state.terrain_meshes().is_empty(),
1483            "async pipeline should produce terrain despite high throttle"
1484        );
1485    }
1486
1487    // =====================================================================
1488    // Per-tile vector bucket tests
1489    // =====================================================================
1490
1491    #[test]
1492    fn partition_features_by_tile_separates_spatially() {
1493        let features = FeatureCollection {
1494            features: vec![
1495                Feature {
1496                    geometry: Geometry::Point(Point {
1497                        coord: GeoCoord::from_lat_lon(10.0, 10.0),
1498                    }),
1499                    properties: HashMap::new(),
1500                },
1501                Feature {
1502                    geometry: Geometry::Point(Point {
1503                        coord: GeoCoord::from_lat_lon(-30.0, -60.0),
1504                    }),
1505                    properties: HashMap::new(),
1506                },
1507                Feature {
1508                    geometry: Geometry::Point(Point {
1509                        coord: GeoCoord::from_lat_lon(10.01, 10.01),
1510                    }),
1511                    properties: HashMap::new(),
1512                },
1513            ],
1514        };
1515
1516        let buckets = partition_features_by_tile(&features, 4);
1517        let total: usize = buckets.values().map(|fc| fc.len()).sum();
1518        assert_eq!(total, 3, "all features should be partitioned");
1519        // The first and third points are very close and should be in the same tile at zoom 4
1520        let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
1521        let tile_c = geo_to_tile(&GeoCoord::from_lat_lon(10.01, 10.01), 4).tile_id();
1522        assert_eq!(tile_a, tile_c);
1523        assert_eq!(buckets.get(&tile_a).map(|fc| fc.len()), Some(2));
1524    }
1525
1526    #[test]
1527    fn partition_features_handles_linestring_and_polygon() {
1528        let features = FeatureCollection {
1529            features: vec![
1530                Feature {
1531                    geometry: Geometry::LineString(LineString {
1532                        coords: vec![
1533                            GeoCoord::from_lat_lon(45.0, 90.0),
1534                            GeoCoord::from_lat_lon(46.0, 91.0),
1535                        ],
1536                    }),
1537                    properties: HashMap::new(),
1538                },
1539                Feature {
1540                    geometry: Geometry::Polygon(Polygon {
1541                        exterior: vec![
1542                            GeoCoord::from_lat_lon(-10.0, -20.0),
1543                            GeoCoord::from_lat_lon(-10.0, -19.0),
1544                            GeoCoord::from_lat_lon(-9.0, -19.0),
1545                        ],
1546                        interiors: vec![],
1547                    }),
1548                    properties: HashMap::new(),
1549                },
1550            ],
1551        };
1552
1553        let buckets = partition_features_by_tile(&features, 6);
1554        let total: usize = buckets.values().map(|fc| fc.len()).sum();
1555        assert_eq!(total, 2);
1556    }
1557
1558    #[test]
1559    fn partition_features_drops_empty_geometry() {
1560        let features = FeatureCollection {
1561            features: vec![Feature {
1562                geometry: Geometry::GeometryCollection(vec![]),
1563                properties: HashMap::new(),
1564            }],
1565        };
1566
1567        let buckets = partition_features_by_tile(&features, 4);
1568        assert!(buckets.is_empty(), "empty geometry should not produce a bucket");
1569    }
1570
1571    #[test]
1572    fn dispatch_and_poll_vector_buckets() {
1573        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1574        let mut pipeline = AsyncDataPipeline::new(pool);
1575
1576        let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
1577        let tile_b = geo_to_tile(&GeoCoord::from_lat_lon(-30.0, -60.0), 4).tile_id();
1578
1579        let features = FeatureCollection {
1580            features: vec![
1581                Feature {
1582                    geometry: Geometry::Point(Point {
1583                        coord: GeoCoord::from_lat_lon(10.0, 10.0),
1584                    }),
1585                    properties: HashMap::new(),
1586                },
1587                Feature {
1588                    geometry: Geometry::Point(Point {
1589                        coord: GeoCoord::from_lat_lon(-30.0, -60.0),
1590                    }),
1591                    properties: HashMap::new(),
1592                },
1593            ],
1594        };
1595
1596        let tile_features = partition_features_by_tile(&features, 4);
1597        assert!(tile_features.len() >= 2);
1598
1599        pipeline.dispatch_vector_buckets(
1600            "test-layer",
1601            1,
1602            CameraProjection::WebMercator,
1603            &VectorStyle::default(),
1604            &tile_features,
1605            None,
1606            None,
1607            &HashMap::new(),
1608            &[],
1609        );
1610
1611        for _ in 0..100 {
1612            pipeline.poll_vector_buckets();
1613            if !pipeline.has_pending_tasks() {
1614                break;
1615            }
1616            std::thread::sleep(std::time::Duration::from_millis(10));
1617        }
1618
1619        // Collect only tile_a
1620        let (meshes_a, _) = pipeline.collect_vector_buckets(
1621            "test-layer", 1, CameraProjection::WebMercator, &[tile_a],
1622        );
1623        assert_eq!(meshes_a.len(), 1, "should have one mesh for tile_a");
1624
1625        // Collect only tile_b
1626        let (meshes_b, _) = pipeline.collect_vector_buckets(
1627            "test-layer", 1, CameraProjection::WebMercator, &[tile_b],
1628        );
1629        assert_eq!(meshes_b.len(), 1, "should have one mesh for tile_b");
1630
1631        // Collect both
1632        let (meshes_both, _) = pipeline.collect_vector_buckets(
1633            "test-layer", 1, CameraProjection::WebMercator, &[tile_a, tile_b],
1634        );
1635        assert_eq!(meshes_both.len(), 2, "should have meshes for both tiles");
1636    }
1637
1638    #[test]
1639    fn bucket_dispatch_deduplicates() {
1640        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1641        let mut pipeline = AsyncDataPipeline::new(pool);
1642
1643        let features = FeatureCollection {
1644            features: vec![Feature {
1645                geometry: Geometry::Point(Point {
1646                    coord: GeoCoord::from_lat_lon(10.0, 10.0),
1647                }),
1648                properties: HashMap::new(),
1649            }],
1650        };
1651        let tile_features = partition_features_by_tile(&features, 4);
1652
1653        pipeline.dispatch_vector_buckets(
1654            "layer", 1, CameraProjection::WebMercator,
1655            &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1656        );
1657        let first_count = pipeline.pending_buckets.len();
1658
1659        pipeline.dispatch_vector_buckets(
1660            "layer", 1, CameraProjection::WebMercator,
1661            &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1662        );
1663        assert_eq!(pipeline.pending_buckets.len(), first_count, "duplicate dispatch should be deduped");
1664    }
1665
1666    #[test]
1667    fn bucket_cache_hit_prevents_redispatch() {
1668        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1669        let mut pipeline = AsyncDataPipeline::new(pool);
1670
1671        let features = FeatureCollection {
1672            features: vec![Feature {
1673                geometry: Geometry::Point(Point {
1674                    coord: GeoCoord::from_lat_lon(10.0, 10.0),
1675                }),
1676                properties: HashMap::new(),
1677            }],
1678        };
1679        let tile_features = partition_features_by_tile(&features, 4);
1680
1681        pipeline.dispatch_vector_buckets(
1682            "layer", 1, CameraProjection::WebMercator,
1683            &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1684        );
1685
1686        for _ in 0..100 {
1687            pipeline.poll_vector_buckets();
1688            if !pipeline.has_pending_tasks() {
1689                break;
1690            }
1691            std::thread::sleep(std::time::Duration::from_millis(10));
1692        }
1693
1694        assert!(pipeline.bucket_cache_len() > 0);
1695
1696        pipeline.dispatch_vector_buckets(
1697            "layer", 1, CameraProjection::WebMercator,
1698            &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1699        );
1700        assert!(pipeline.pending_buckets.is_empty(), "cached result should prevent redispatch");
1701    }
1702
1703    #[test]
1704    fn prune_vector_buckets_removes_inactive_layers() {
1705        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1706        let mut pipeline = AsyncDataPipeline::new(pool);
1707
1708        let tile = TileId::new(4, 0, 0);
1709        let key_a = VectorBucketKey {
1710            layer_id: "keep".into(),
1711            tile,
1712            data_generation: 1,
1713            projection: CameraProjection::WebMercator,
1714        };
1715        let key_b = VectorBucketKey {
1716            layer_id: "remove".into(),
1717            tile,
1718            data_generation: 1,
1719            projection: CameraProjection::WebMercator,
1720        };
1721        pipeline.bucket_cache.insert(key_a, (VectorMeshData::default(), vec![]));
1722        pipeline.bucket_cache.insert(key_b, (VectorMeshData::default(), vec![]));
1723        assert_eq!(pipeline.bucket_cache_len(), 2);
1724
1725        pipeline.prune_vector_buckets(&["keep".to_string()]);
1726        assert_eq!(pipeline.bucket_cache_len(), 1);
1727        assert!(pipeline.bucket_cache.keys().all(|k| k.layer_id == "keep"));
1728    }
1729
1730    #[test]
1731    fn prune_vector_buckets_for_layer_removes_offscreen_tiles() {
1732        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1733        let mut pipeline = AsyncDataPipeline::new(pool);
1734
1735        let tile_a = TileId::new(4, 0, 0);
1736        let tile_b = TileId::new(4, 1, 0);
1737        let tile_c = TileId::new(4, 2, 0);
1738
1739        for tile in &[tile_a, tile_b, tile_c] {
1740            pipeline.bucket_cache.insert(
1741                VectorBucketKey {
1742                    layer_id: "layer".into(),
1743                    tile: *tile,
1744                    data_generation: 1,
1745                    projection: CameraProjection::WebMercator,
1746                },
1747                (VectorMeshData::default(), vec![]),
1748            );
1749        }
1750        assert_eq!(pipeline.bucket_cache_len(), 3);
1751
1752        pipeline.prune_vector_buckets_for_layer("layer", &[tile_a, tile_c]);
1753        assert_eq!(pipeline.bucket_cache_len(), 2);
1754        assert!(pipeline.bucket_cache.keys().all(|k| k.tile == tile_a || k.tile == tile_c));
1755    }
1756
1757    #[test]
1758    fn collect_vector_buckets_only_returns_visible() {
1759        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1760        let mut pipeline = AsyncDataPipeline::new(pool);
1761
1762        let tile_visible = TileId::new(4, 5, 5);
1763        let tile_offscreen = TileId::new(4, 10, 10);
1764
1765        let mesh = VectorMeshData {
1766            positions: vec![[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
1767            colors: vec![[1.0; 4]; 3],
1768            indices: vec![0, 1, 2],
1769            ..Default::default()
1770        };
1771        for tile in &[tile_visible, tile_offscreen] {
1772            pipeline.bucket_cache.insert(
1773                VectorBucketKey {
1774                    layer_id: "layer".into(),
1775                    tile: *tile,
1776                    data_generation: 1,
1777                    projection: CameraProjection::WebMercator,
1778                },
1779                (mesh.clone(), vec![]),
1780            );
1781        }
1782
1783        let (meshes, _) = pipeline.collect_vector_buckets(
1784            "layer", 1, CameraProjection::WebMercator, &[tile_visible],
1785        );
1786        assert_eq!(meshes.len(), 1, "only visible tile bucket should be collected");
1787    }
1788
1789    #[test]
1790    fn representative_coord_all_geometry_types() {
1791        let p = Geometry::Point(Point { coord: GeoCoord::from_lat_lon(1.0, 2.0) });
1792        assert_eq!(representative_coord(&p).map(|c| (c.lat, c.lon)), Some((1.0, 2.0)));
1793
1794        let ls = Geometry::LineString(LineString {
1795            coords: vec![GeoCoord::from_lat_lon(3.0, 4.0), GeoCoord::from_lat_lon(5.0, 6.0)],
1796        });
1797        assert_eq!(representative_coord(&ls).map(|c| (c.lat, c.lon)), Some((3.0, 4.0)));
1798
1799        let poly = Geometry::Polygon(Polygon {
1800            exterior: vec![
1801                GeoCoord::from_lat_lon(7.0, 8.0),
1802                GeoCoord::from_lat_lon(7.0, 9.0),
1803                GeoCoord::from_lat_lon(8.0, 9.0),
1804            ],
1805            interiors: vec![],
1806        });
1807        assert_eq!(representative_coord(&poly).map(|c| (c.lat, c.lon)), Some((7.0, 8.0)));
1808
1809        let gc = Geometry::GeometryCollection(vec![]);
1810        assert!(representative_coord(&gc).is_none());
1811
1812        let gc_with = Geometry::GeometryCollection(vec![p.clone()]);
1813        assert!(representative_coord(&gc_with).is_some());
1814    }
1815
1816    #[test]
1817    fn dispatch_and_poll_decode_produces_decoded_tile() {
1818        use crate::tile_source::{RawVectorPayload, TileFreshness};
1819        use std::sync::Arc;
1820
1821        // Build a minimal valid MVT tile.
1822        fn build_test_mvt() -> Vec<u8> {
1823            fn encode_varint(mut val: u64) -> Vec<u8> {
1824                let mut buf = Vec::new();
1825                loop {
1826                    let mut byte = (val & 0x7F) as u8;
1827                    val >>= 7;
1828                    if val != 0 { byte |= 0x80; }
1829                    buf.push(byte);
1830                    if val == 0 { break; }
1831                }
1832                buf
1833            }
1834            fn encode_tag(field: u32, wt: u8) -> Vec<u8> {
1835                encode_varint(((field as u64) << 3) | wt as u64)
1836            }
1837            fn encode_ld(field: u32, data: &[u8]) -> Vec<u8> {
1838                let mut b = encode_tag(field, 2);
1839                b.extend(encode_varint(data.len() as u64));
1840                b.extend_from_slice(data);
1841                b
1842            }
1843            fn encode_vi(field: u32, val: u64) -> Vec<u8> {
1844                let mut b = encode_tag(field, 0);
1845                b.extend(encode_varint(val));
1846                b
1847            }
1848            fn zigzag(n: i32) -> u32 { ((n << 1) ^ (n >> 31)) as u32 }
1849
1850            let mut geom = Vec::new();
1851            geom.extend(encode_varint(((1u64) << 3) | 1));
1852            geom.extend(encode_varint(zigzag(2048) as u64));
1853            geom.extend(encode_varint(zigzag(2048) as u64));
1854
1855            let mut feat = Vec::new();
1856            feat.extend(encode_vi(3, 1));
1857            feat.extend(encode_ld(4, &geom));
1858
1859            let mut layer = Vec::new();
1860            layer.extend(encode_ld(1, b"test"));
1861            layer.extend(encode_ld(2, &feat));
1862            layer.extend(encode_vi(5, 4096));
1863            layer.extend(encode_vi(15, 2));
1864
1865            encode_ld(3, &layer)
1866        }
1867
1868        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1869        let mut pipeline = AsyncDataPipeline::new(pool);
1870
1871        let tile = TileId::new(0, 0, 0);
1872        let raw = RawVectorPayload {
1873            tile_id: tile,
1874            bytes: Arc::new(build_test_mvt()),
1875            decode_options: crate::mvt::MvtDecodeOptions::default(),
1876        };
1877
1878        pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
1879
1880        // Poll until the decode completes (should be near-instant).
1881        for _ in 0..50 {
1882            pipeline.poll_decodes();
1883            if !pipeline.decoded_tiles.is_empty() {
1884                break;
1885            }
1886            std::thread::sleep(std::time::Duration::from_millis(10));
1887        }
1888
1889        let decoded = pipeline.take_decoded_tiles();
1890        assert_eq!(decoded.len(), 1, "should have one decoded tile");
1891        let (decoded_id, response) = &decoded[0];
1892        assert_eq!(*decoded_id, tile);
1893        assert!(response.data.is_vector(), "decoded data should be Vector");
1894        let vt = response.data.as_vector().expect("should be Vector");
1895        assert!(vt.layers.contains_key("test"));
1896        assert_eq!(vt.layers["test"].len(), 1);
1897    }
1898
1899    #[test]
1900    fn dispatch_decode_deduplicates() {
1901        use crate::tile_source::{RawVectorPayload, TileFreshness};
1902        use std::sync::Arc;
1903
1904        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1905        let mut pipeline = AsyncDataPipeline::new(pool);
1906
1907        let tile = TileId::new(0, 0, 0);
1908        let raw = RawVectorPayload {
1909            tile_id: tile,
1910            bytes: Arc::new(vec![]),
1911            decode_options: crate::mvt::MvtDecodeOptions::default(),
1912        };
1913
1914        pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
1915        pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
1916
1917        // Only one task should be pending despite two dispatches.
1918        assert_eq!(pipeline.pending_decodes.len(), 1);
1919    }
1920
1921    // -----------------------------------------------------------------------
1922    // AsyncVisualizationPipeline tests
1923    // -----------------------------------------------------------------------
1924
1925    #[test]
1926    fn async_viz_pipeline_dispatches_and_receives_scalar_update() {
1927        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1928        let mut viz = AsyncVisualizationPipeline::new(pool);
1929
1930        viz.dispatch(Box::new(|| {
1931            VisualizationTaskOutput::ScalarFieldUpdate {
1932                layer_name: "density".to_string(),
1933                values: vec![1.0, 2.0, 3.0, 4.0],
1934            }
1935        }));
1936
1937        assert_eq!(viz.pending_count(), 1);
1938
1939        // Busy-poll until complete.
1940        let mut results = Vec::new();
1941        for _ in 0..200 {
1942            results = viz.poll();
1943            if !results.is_empty() {
1944                break;
1945            }
1946            std::thread::sleep(std::time::Duration::from_millis(5));
1947        }
1948
1949        assert_eq!(results.len(), 1);
1950        match &results[0] {
1951            VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values } => {
1952                assert_eq!(layer_name, "density");
1953                assert_eq!(values, &[1.0, 2.0, 3.0, 4.0]);
1954            }
1955            _ => panic!("expected ScalarFieldUpdate"),
1956        }
1957        assert_eq!(viz.pending_count(), 0);
1958    }
1959
1960    #[test]
1961    fn async_viz_pipeline_dispatches_and_receives_column_update() {
1962        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1963        let mut viz = AsyncVisualizationPipeline::new(pool);
1964
1965        viz.dispatch(Box::new(|| {
1966            VisualizationTaskOutput::ColumnUpdate {
1967                layer_name: "bars".to_string(),
1968                columns: crate::visualization::ColumnInstanceSet::new(vec![
1969                    crate::visualization::ColumnInstance::new(
1970                        GeoCoord::from_lat_lon(0.0, 0.0),
1971                        100.0,
1972                        10.0,
1973                    ),
1974                ]),
1975                ramp: crate::visualization::ColorRamp::new(vec![
1976                    crate::visualization::ColorStop { value: 0.0, color: [0.0, 0.0, 1.0, 1.0] },
1977                    crate::visualization::ColorStop { value: 1.0, color: [1.0, 0.0, 0.0, 1.0] },
1978                ]),
1979            }
1980        }));
1981
1982        let mut results = Vec::new();
1983        for _ in 0..200 {
1984            results = viz.poll();
1985            if !results.is_empty() { break; }
1986            std::thread::sleep(std::time::Duration::from_millis(5));
1987        }
1988
1989        assert_eq!(results.len(), 1);
1990        match &results[0] {
1991            VisualizationTaskOutput::ColumnUpdate { layer_name, columns, .. } => {
1992                assert_eq!(layer_name, "bars");
1993                assert_eq!(columns.columns.len(), 1);
1994            }
1995            _ => panic!("expected ColumnUpdate"),
1996        }
1997    }
1998
1999    #[test]
2000    fn async_viz_pipeline_dispatches_and_receives_point_cloud_update() {
2001        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2002        let mut viz = AsyncVisualizationPipeline::new(pool);
2003
2004        viz.dispatch(Box::new(|| {
2005            VisualizationTaskOutput::PointCloudUpdate {
2006                layer_name: "scatter".to_string(),
2007                points: crate::visualization::PointInstanceSet::new(vec![
2008                    crate::visualization::PointInstance::new(
2009                        GeoCoord::from_lat_lon(1.0, 2.0),
2010                        5.0,
2011                    ),
2012                    crate::visualization::PointInstance::new(
2013                        GeoCoord::from_lat_lon(3.0, 4.0),
2014                        8.0,
2015                    ),
2016                ]),
2017                ramp: crate::visualization::ColorRamp::new(vec![
2018                    crate::visualization::ColorStop { value: 0.0, color: [0.0, 1.0, 0.0, 1.0] },
2019                    crate::visualization::ColorStop { value: 1.0, color: [1.0, 1.0, 0.0, 1.0] },
2020                ]),
2021            }
2022        }));
2023
2024        let mut results = Vec::new();
2025        for _ in 0..200 {
2026            results = viz.poll();
2027            if !results.is_empty() { break; }
2028            std::thread::sleep(std::time::Duration::from_millis(5));
2029        }
2030
2031        assert_eq!(results.len(), 1);
2032        match &results[0] {
2033            VisualizationTaskOutput::PointCloudUpdate { layer_name, points, .. } => {
2034                assert_eq!(layer_name, "scatter");
2035                assert_eq!(points.points.len(), 2);
2036            }
2037            _ => panic!("expected PointCloudUpdate"),
2038        }
2039    }
2040
2041    #[test]
2042    fn async_viz_pipeline_multiple_tasks_complete_independently() {
2043        let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2044        let mut viz = AsyncVisualizationPipeline::new(pool);
2045
2046        for i in 0..5 {
2047            let name = format!("layer_{i}");
2048            viz.dispatch(Box::new(move || {
2049                VisualizationTaskOutput::ScalarFieldUpdate {
2050                    layer_name: name,
2051                    values: vec![i as f32; 4],
2052                }
2053            }));
2054        }
2055
2056        assert_eq!(viz.pending_count(), 5);
2057
2058        let mut all_results = Vec::new();
2059        for _ in 0..200 {
2060            let batch = viz.poll();
2061            all_results.extend(batch);
2062            if all_results.len() >= 5 { break; }
2063            std::thread::sleep(std::time::Duration::from_millis(10));
2064        }
2065
2066        assert_eq!(all_results.len(), 5);
2067        assert_eq!(viz.pending_count(), 0);
2068    }
2069}