1use crate::camera_projection::CameraProjection;
40use crate::layers::{FeatureProvenance, VectorMeshData, VectorStyle};
41use crate::symbols::SymbolCandidate;
42use crate::terrain::{PreparedHillshadeRaster, TerrainMeshData};
43use crate::tile_manager::TileTextureRegion;
44use crate::geometry::{Feature, FeatureCollection, Geometry};
45use rustial_math::{geo_to_tile, ElevationGrid, GeoCoord, TileId};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49pub trait DataTaskResultReceiver<T: Send + 'static>: Send + Sync {
58 fn try_recv(&self) -> Option<T>;
62}
63
64pub trait DataTaskPool: Send + Sync {
83 fn spawn_terrain(
85 &self,
86 task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
87 ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>;
88
89 fn spawn_vector(
91 &self,
92 task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
93 ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>>;
94
95 fn spawn_decode(
100 &self,
101 task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
102 ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>;
103}
104
105pub struct ThreadDataTaskPool;
117
118impl ThreadDataTaskPool {
119 pub fn new() -> Self {
121 Self
122 }
123}
124
125impl Default for ThreadDataTaskPool {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131struct ChannelReceiver<T> {
132 rx: std::sync::mpsc::Receiver<T>,
133}
134
135impl<T: Send + 'static> DataTaskResultReceiver<T> for ChannelReceiver<T> {
136 fn try_recv(&self) -> Option<T> {
137 self.rx.try_recv().ok()
138 }
139}
140
141unsafe impl<T: Send> Sync for ChannelReceiver<T> {}
146
147fn spawn_on_thread<T: Send + 'static>(
148 task: Box<dyn FnOnce() -> T + Send + 'static>,
149) -> Box<dyn DataTaskResultReceiver<T>> {
150 let (tx, rx) = std::sync::mpsc::channel();
151 std::thread::spawn(move || {
152 let result = task();
153 let _ = tx.send(result);
154 });
155 Box::new(ChannelReceiver { rx })
156}
157
158impl DataTaskPool for ThreadDataTaskPool {
159 fn spawn_terrain(
160 &self,
161 task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
162 ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>> {
163 spawn_on_thread(task)
164 }
165
166 fn spawn_vector(
167 &self,
168 task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
169 ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>> {
170 spawn_on_thread(task)
171 }
172
173 fn spawn_decode(
174 &self,
175 task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
176 ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>> {
177 spawn_on_thread(task)
178 }
179}
180
181#[derive(Clone)]
187pub struct TerrainTaskInput {
188 pub tile: TileId,
190 pub elevation_source_tile: TileId,
192 pub elevation_region: TileTextureRegion,
194 pub elevation: ElevationGrid,
196 pub resolution: u16,
198 pub vertical_exaggeration: f64,
200 pub generation: u64,
202}
203
204pub struct TerrainTaskOutput {
206 pub tile: TileId,
208 pub mesh: TerrainMeshData,
210 pub hillshade: PreparedHillshadeRaster,
212 pub generation: u64,
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, Hash)]
218pub struct TerrainCacheKey {
219 pub tile: TileId,
221 pub generation: u64,
223 pub resolution: u16,
225}
226
227#[derive(Clone)]
229pub struct VectorTaskInput {
230 pub cache_key: VectorCacheKey,
232 pub features: FeatureCollection,
234 pub style: VectorStyle,
236 pub query_layer_id: Option<String>,
238 pub query_source_id: Option<String>,
240 pub query_source_layer: Option<String>,
242 pub feature_provenance: Vec<Option<FeatureProvenance>>,
244 pub projection: CameraProjection,
246 pub terrain_samples: Vec<(GeoCoord, f64)>,
249}
250
251pub struct VectorTaskOutput {
253 pub cache_key: VectorCacheKey,
255 pub mesh: VectorMeshData,
257 pub symbol_candidates: Vec<SymbolCandidate>,
259}
260
261pub struct MvtDecodeOutput {
267 pub tile: TileId,
269 pub result: Result<crate::tile_source::VectorTileData, crate::tile_source::TileError>,
271 pub freshness: crate::tile_source::TileFreshness,
274}
275
276#[derive(Debug, Clone, PartialEq, Eq, Hash)]
278pub struct VectorCacheKey {
279 pub layer_id: String,
281 pub data_generation: u64,
283 pub projection: CameraProjection,
285}
286
287#[derive(Debug, Clone, PartialEq, Eq, Hash)]
297pub struct VectorBucketKey {
298 pub layer_id: String,
300 pub tile: TileId,
302 pub data_generation: u64,
304 pub projection: CameraProjection,
306}
307
308fn representative_coord(geometry: &Geometry) -> Option<GeoCoord> {
316 match geometry {
317 Geometry::Point(p) => Some(p.coord),
318 Geometry::LineString(ls) => ls.coords.first().copied(),
319 Geometry::Polygon(poly) => poly.exterior.first().copied(),
320 Geometry::MultiPoint(mp) => mp.points.first().map(|p| p.coord),
321 Geometry::MultiLineString(mls) => {
322 mls.lines.first().and_then(|ls| ls.coords.first().copied())
323 }
324 Geometry::MultiPolygon(mpoly) => {
325 mpoly.polygons.first().and_then(|p| p.exterior.first().copied())
326 }
327 Geometry::GeometryCollection(geoms) => {
328 geoms.iter().find_map(representative_coord)
329 }
330 }
331}
332
333pub fn partition_features_by_tile(
348 features: &FeatureCollection,
349 zoom: u8,
350) -> HashMap<TileId, FeatureCollection> {
351 let mut buckets: HashMap<TileId, Vec<Feature>> = HashMap::new();
352 for feature in &features.features {
353 if let Some(coord) = representative_coord(&feature.geometry) {
354 let tile_coord = geo_to_tile(&coord, zoom);
355 let tile_id = tile_coord.tile_id();
356 buckets.entry(tile_id).or_default().push(feature.clone());
357 }
358 }
359 buckets
360 .into_iter()
361 .map(|(tile, feats)| (tile, FeatureCollection { features: feats }))
362 .collect()
363}
364
365struct PendingTerrainTask {
370 key: TerrainCacheKey,
371 receiver: Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>,
372}
373
374struct PendingVectorTask {
375 key: VectorCacheKey,
376 receiver: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
377}
378
379struct PendingVectorBucketTask {
380 key: VectorBucketKey,
381 receiver: Box<dyn DataTaskResultReceiver<VectorBucketOutput>>,
382}
383
384struct PendingDecodeTask {
385 tile: TileId,
386 receiver: Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>,
387}
388
389struct VectorBucketOutput {
391 mesh: VectorMeshData,
392 symbol_candidates: Vec<SymbolCandidate>,
393}
394
395pub(crate) struct AsyncDataPipeline {
405 pool: Arc<dyn DataTaskPool>,
406 pending_terrain: Vec<PendingTerrainTask>,
407 pending_vector: Vec<PendingVectorTask>,
408 terrain_cache: HashMap<TerrainCacheKey, (TerrainMeshData, PreparedHillshadeRaster)>,
409 vector_cache: HashMap<VectorCacheKey, (VectorMeshData, Vec<SymbolCandidate>)>,
410 next_vector_generation: u64,
411 layer_generations: HashMap<String, u64>,
412 pending_buckets: Vec<PendingVectorBucketTask>,
414 bucket_cache: HashMap<VectorBucketKey, (VectorMeshData, Vec<SymbolCandidate>)>,
415 pending_decodes: Vec<PendingDecodeTask>,
417 pub(crate) decoded_tiles: Vec<(TileId, crate::tile_source::TileResponse)>,
423}
424
425impl AsyncDataPipeline {
426 pub(crate) fn new(pool: Arc<dyn DataTaskPool>) -> Self {
427 Self {
428 pool,
429 pending_terrain: Vec::new(),
430 pending_vector: Vec::new(),
431 terrain_cache: HashMap::new(),
432 vector_cache: HashMap::new(),
433 next_vector_generation: 1,
434 layer_generations: HashMap::new(),
435 pending_buckets: Vec::new(),
436 bucket_cache: HashMap::new(),
437 pending_decodes: Vec::new(),
438 decoded_tiles: Vec::new(),
439 }
440 }
441
442 pub(crate) fn dispatch_terrain(&mut self, input: TerrainTaskInput) {
445 let key = TerrainCacheKey {
446 tile: input.tile,
447 generation: input.generation,
448 resolution: input.resolution,
449 };
450 if self.terrain_cache.contains_key(&key) {
451 return;
452 }
453 if self.pending_terrain.iter().any(|p| p.key == key) {
454 return;
455 }
456 let receiver = self.pool.spawn_terrain(Box::new(move || {
457 let mesh = crate::terrain::build_terrain_descriptor_with_source(
458 &input.tile,
459 input.elevation_source_tile,
460 input.elevation_region,
461 &input.elevation,
462 input.resolution,
463 input.vertical_exaggeration,
464 input.generation,
465 );
466 let hillshade = crate::terrain::prepare_hillshade_raster(
467 &input.elevation,
468 input.vertical_exaggeration,
469 input.generation,
470 );
471 TerrainTaskOutput {
472 tile: input.tile,
473 mesh,
474 hillshade,
475 generation: input.generation,
476 }
477 }));
478 self.pending_terrain.push(PendingTerrainTask { key, receiver });
479 }
480
481 pub(crate) fn poll_terrain(&mut self) {
482 let mut still_pending = Vec::new();
483 for task in self.pending_terrain.drain(..) {
484 if let Some(output) = task.receiver.try_recv() {
485 self.terrain_cache
486 .insert(task.key, (output.mesh, output.hillshade));
487 } else {
488 still_pending.push(task);
489 }
490 }
491 self.pending_terrain = still_pending;
492 }
493
494 #[allow(dead_code)]
495 pub(crate) fn collect_terrain(
496 &self,
497 desired_tiles: &[TileId],
498 tile_generations: &HashMap<TileId, u64>,
499 resolution: u16,
500 ) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
501 let mut meshes = Vec::with_capacity(desired_tiles.len());
502 let mut hillshades = Vec::with_capacity(desired_tiles.len());
503 for tile in desired_tiles {
504 let generation = tile_generations.get(tile).copied().unwrap_or(0);
505 let key = TerrainCacheKey {
506 tile: *tile,
507 generation,
508 resolution,
509 };
510 if let Some((mesh, hs)) = self.terrain_cache.get(&key) {
511 meshes.push(mesh.clone());
512 hillshades.push(hs.clone());
513 }
514 }
515 (meshes, hillshades)
516 }
517
518 pub(crate) fn collect_all_terrain(&self) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
520 let mut meshes = Vec::with_capacity(self.terrain_cache.len());
521 let mut hillshades = Vec::with_capacity(self.terrain_cache.len());
522 for (mesh, hs) in self.terrain_cache.values() {
523 meshes.push(mesh.clone());
524 hillshades.push(hs.clone());
525 }
526 (meshes, hillshades)
527 }
528
529 pub(crate) fn prune_terrain(&mut self, visible_tiles: &[TileId]) {
530 let visible_set: std::collections::HashSet<TileId> =
531 visible_tiles.iter().copied().collect();
532 self.terrain_cache
533 .retain(|key, _| visible_set.contains(&key.tile));
534 }
535
536 pub(crate) fn layer_generation(&mut self, layer_id: &str, features_changed: bool) -> u64 {
539 if features_changed {
540 let gen = self.next_vector_generation;
541 self.next_vector_generation += 1;
542 self.layer_generations.insert(layer_id.to_owned(), gen);
543 gen
544 } else {
545 self.layer_generations
546 .get(layer_id)
547 .copied()
548 .unwrap_or_else(|| {
549 let gen = self.next_vector_generation;
550 self.next_vector_generation += 1;
551 self.layer_generations.insert(layer_id.to_owned(), gen);
552 gen
553 })
554 }
555 }
556
557 pub(crate) fn dispatch_vector(&mut self, input: VectorTaskInput) {
558 let key = input.cache_key.clone();
559 if self.vector_cache.contains_key(&key) {
560 return;
561 }
562 if self.pending_vector.iter().any(|p| p.key == key) {
563 return;
564 }
565 let receiver = self.pool.spawn_vector(Box::new(move || {
566 let mut features = input.features;
567 if !input.terrain_samples.is_empty() {
568 apply_terrain_samples(&mut features, &input.terrain_samples);
569 }
570 let mut temp_layer = crate::layers::VectorLayer::new(
571 &input.cache_key.layer_id,
572 FeatureCollection::default(),
573 input.style,
574 )
575 .with_query_metadata(input.query_layer_id, input.query_source_id)
576 .with_source_layer(input.query_source_layer);
577 temp_layer.set_features_with_provenance(features, input.feature_provenance);
578 let mesh = temp_layer.tessellate(input.projection);
579 let candidates = temp_layer.symbol_candidates();
580 VectorTaskOutput {
581 cache_key: input.cache_key,
582 mesh,
583 symbol_candidates: candidates,
584 }
585 }));
586 self.pending_vector.push(PendingVectorTask { key, receiver });
587 }
588
589 pub(crate) fn poll_vector(&mut self) {
590 let mut still_pending = Vec::new();
591 for task in self.pending_vector.drain(..) {
592 if let Some(output) = task.receiver.try_recv() {
593 self.vector_cache
594 .insert(task.key, (output.mesh, output.symbol_candidates));
595 } else {
596 still_pending.push(task);
597 }
598 }
599 self.pending_vector = still_pending;
600 }
601
602 pub(crate) fn collect_vectors(
603 &self,
604 layer_keys: &[VectorCacheKey],
605 ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
606 let mut meshes = Vec::new();
607 let mut candidates = Vec::new();
608 for key in layer_keys {
609 if let Some((mesh, cands)) = self.vector_cache.get(key) {
610 if !mesh.is_empty() {
611 meshes.push(mesh.clone());
612 }
613 candidates.extend(cands.iter().cloned());
614 }
615 }
616 (meshes, candidates)
617 }
618
619 pub(crate) fn prune_vectors(&mut self, active_layer_ids: &[String]) {
620 let active_set: std::collections::HashSet<&str> =
621 active_layer_ids.iter().map(String::as_str).collect();
622 self.vector_cache
623 .retain(|key, _| active_set.contains(key.layer_id.as_str()));
624 self.layer_generations
625 .retain(|id, _| active_set.contains(id.as_str()));
626 }
627
628 #[cfg(test)]
629 pub(crate) fn current_layer_generation(&self, layer_id: &str) -> Option<u64> {
630 self.layer_generations.get(layer_id).copied()
631 }
632
633 pub(crate) fn dispatch_vector_buckets(
642 &mut self,
643 layer_id: &str,
644 data_generation: u64,
645 projection: CameraProjection,
646 style: &VectorStyle,
647 tile_features: &HashMap<TileId, FeatureCollection>,
648 query_source_id: Option<&str>,
649 query_source_layer: Option<&str>,
650 tile_provenance: &HashMap<TileId, Vec<Option<FeatureProvenance>>>,
651 terrain_samples: &[(GeoCoord, f64)],
652 ) {
653 for (tile, features) in tile_features {
654 let key = VectorBucketKey {
655 layer_id: layer_id.to_owned(),
656 tile: *tile,
657 data_generation,
658 projection,
659 };
660 if self.bucket_cache.contains_key(&key) {
661 continue;
662 }
663 if self.pending_buckets.iter().any(|p| p.key == key) {
664 continue;
665 }
666 let task_key = key.clone();
667 let task_features = features.clone();
668 let task_style = style.clone();
669 let task_samples = terrain_samples.to_vec();
670 let task_layer_id = layer_id.to_owned();
671 let task_source_id = query_source_id.map(ToOwned::to_owned);
672 let task_source_layer = query_source_layer.map(ToOwned::to_owned);
673 let task_provenance = tile_provenance.get(tile).cloned().unwrap_or_default();
674
675 let receiver = self.pool.spawn_vector(Box::new(move || {
676 let mut feats = task_features;
677 if !task_samples.is_empty() {
678 apply_terrain_samples(&mut feats, &task_samples);
679 }
680 let mut temp_layer = crate::layers::VectorLayer::new(
681 &task_layer_id,
682 FeatureCollection::default(),
683 task_style,
684 )
685 .with_query_metadata(Some(task_layer_id.clone()), task_source_id)
686 .with_source_layer(task_source_layer);
687 temp_layer.set_features_with_provenance(feats, task_provenance);
688 let mesh = temp_layer.tessellate(task_key.projection);
689 let candidates = temp_layer.symbol_candidates();
690 VectorTaskOutput {
691 cache_key: VectorCacheKey {
692 layer_id: task_key.layer_id.clone(),
693 data_generation: task_key.data_generation,
694 projection: task_key.projection,
695 },
696 mesh,
697 symbol_candidates: candidates,
698 }
699 }));
700
701 self.pending_buckets.push(PendingVectorBucketTask {
703 key,
704 receiver: Box::new(BucketReceiverAdapter {
705 inner: receiver,
706 }),
707 });
708 }
709 }
710
711 pub(crate) fn poll_vector_buckets(&mut self) {
713 let mut still_pending = Vec::new();
714 for task in self.pending_buckets.drain(..) {
715 if let Some(output) = task.receiver.try_recv() {
716 self.bucket_cache
717 .insert(task.key, (output.mesh, output.symbol_candidates));
718 } else {
719 still_pending.push(task);
720 }
721 }
722 self.pending_buckets = still_pending;
723 }
724
725 pub(crate) fn dispatch_decode(
732 &mut self,
733 tile: TileId,
734 raw: &crate::tile_source::RawVectorPayload,
735 freshness: crate::tile_source::TileFreshness,
736 ) {
737 if self.pending_decodes.iter().any(|t| t.tile == tile) {
739 return;
740 }
741
742 let bytes = Arc::clone(&raw.bytes);
743 let opts = raw.decode_options.clone();
744 let task_tile = tile;
745
746 let receiver = self.pool.spawn_decode(Box::new(move || {
747 let result = crate::mvt::decode_mvt(&bytes, &task_tile, &opts)
748 .map(|layers| crate::tile_source::VectorTileData { layers })
749 .map_err(|e| {
750 crate::tile_source::TileError::Decode(format!("MVT decode: {e}"))
751 });
752 MvtDecodeOutput {
753 tile: task_tile,
754 result,
755 freshness,
756 }
757 }));
758
759 self.pending_decodes.push(PendingDecodeTask { tile, receiver });
760 }
761
762 pub(crate) fn poll_decodes(&mut self) {
768 let mut still_pending = Vec::new();
769 for task in self.pending_decodes.drain(..) {
770 if let Some(output) = task.receiver.try_recv() {
771 match output.result {
772 Ok(vector_data) => {
773 self.decoded_tiles.push((
774 output.tile,
775 crate::tile_source::TileResponse {
776 data: crate::tile_source::TileData::Vector(vector_data),
777 freshness: output.freshness,
778 not_modified: false,
779 },
780 ));
781 }
782 Err(_err) => {
783 }
787 }
788 } else {
789 still_pending.push(task);
790 }
791 }
792 self.pending_decodes = still_pending;
793 }
794
795 pub(crate) fn take_decoded_tiles(
797 &mut self,
798 ) -> Vec<(TileId, crate::tile_source::TileResponse)> {
799 std::mem::take(&mut self.decoded_tiles)
800 }
801
802 #[allow(dead_code)]
804 pub(crate) fn pending_decode_count(&self) -> usize {
805 self.pending_decodes.len()
806 }
807
808 pub(crate) fn collect_vector_buckets(
814 &self,
815 layer_id: &str,
816 data_generation: u64,
817 projection: CameraProjection,
818 visible_tiles: &[TileId],
819 ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
820 let mut meshes = Vec::new();
821 let mut candidates = Vec::new();
822 for tile in visible_tiles {
823 let key = VectorBucketKey {
824 layer_id: layer_id.to_owned(),
825 tile: *tile,
826 data_generation,
827 projection,
828 };
829 if let Some((mesh, cands)) = self.bucket_cache.get(&key) {
830 if !mesh.is_empty() {
831 meshes.push(mesh.clone());
832 }
833 candidates.extend(cands.iter().cloned());
834 }
835 }
836 (meshes, candidates)
837 }
838
839 #[allow(dead_code)]
845 pub(crate) fn prune_vector_buckets(&mut self, active_layer_ids: &[String]) {
846 let active_set: std::collections::HashSet<&str> =
847 active_layer_ids.iter().map(String::as_str).collect();
848 self.bucket_cache
849 .retain(|key, _| active_set.contains(key.layer_id.as_str()));
850 }
851
852 pub(crate) fn evict_vector_buckets(&mut self, layer_id: &str, tiles: &[TileId]) {
853 if tiles.is_empty() {
854 return;
855 }
856 let tile_set: std::collections::HashSet<TileId> = tiles.iter().copied().collect();
857 self.bucket_cache
858 .retain(|key, _| key.layer_id != layer_id || !tile_set.contains(&key.tile));
859 self.pending_buckets
860 .retain(|task| task.key.layer_id != layer_id || !tile_set.contains(&task.key.tile));
861 }
862
863 #[allow(dead_code)]
869 pub(crate) fn prune_vector_buckets_for_layer(
870 &mut self,
871 layer_id: &str,
872 visible_tiles: &[TileId],
873 ) {
874 let visible_set: std::collections::HashSet<TileId> =
875 visible_tiles.iter().copied().collect();
876 self.bucket_cache.retain(|key, _| {
877 key.layer_id != layer_id || visible_set.contains(&key.tile)
878 });
879 }
880
881 #[allow(dead_code)]
883 pub(crate) fn bucket_cache_len(&self) -> usize {
884 self.bucket_cache.len()
885 }
886
887 #[allow(dead_code)]
890 pub(crate) fn has_pending_tasks(&self) -> bool {
891 !self.pending_terrain.is_empty()
892 || !self.pending_vector.is_empty()
893 || !self.pending_buckets.is_empty()
894 }
895}
896
897struct BucketReceiverAdapter {
900 inner: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
901}
902
903impl DataTaskResultReceiver<VectorBucketOutput> for BucketReceiverAdapter {
904 fn try_recv(&self) -> Option<VectorBucketOutput> {
905 self.inner.try_recv().map(|output| VectorBucketOutput {
906 mesh: output.mesh,
907 symbol_candidates: output.symbol_candidates,
908 })
909 }
910}
911
912fn apply_terrain_samples(features: &mut FeatureCollection, samples: &[(GeoCoord, f64)]) {
913 let lookup: HashMap<(i64, i64), f64> = samples
914 .iter()
915 .map(|(coord, elev)| {
916 let key = (
917 (coord.lat * 100_000.0).round() as i64,
918 (coord.lon * 100_000.0).round() as i64,
919 );
920 (key, *elev)
921 })
922 .collect();
923 for feature in &mut features.features {
924 apply_terrain_to_geometry(&mut feature.geometry, &lookup);
925 }
926}
927
928fn apply_terrain_to_geometry(
929 geometry: &mut crate::geometry::Geometry,
930 lookup: &HashMap<(i64, i64), f64>,
931) {
932 use crate::geometry::Geometry;
933 match geometry {
934 Geometry::Point(p) => {
935 if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
936 p.coord.alt = elev;
937 }
938 }
939 Geometry::LineString(ls) => {
940 for coord in &mut ls.coords {
941 if let Some(&elev) = lookup.get(&coord_key(coord)) {
942 coord.alt = elev;
943 }
944 }
945 }
946 Geometry::Polygon(poly) => {
947 for coord in &mut poly.exterior {
948 if let Some(&elev) = lookup.get(&coord_key(coord)) {
949 coord.alt = elev;
950 }
951 }
952 for hole in &mut poly.interiors {
953 for coord in hole {
954 if let Some(&elev) = lookup.get(&coord_key(coord)) {
955 coord.alt = elev;
956 }
957 }
958 }
959 }
960 Geometry::MultiPoint(mp) => {
961 for p in &mut mp.points {
962 if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
963 p.coord.alt = elev;
964 }
965 }
966 }
967 Geometry::MultiLineString(mls) => {
968 for ls in &mut mls.lines {
969 for coord in &mut ls.coords {
970 if let Some(&elev) = lookup.get(&coord_key(coord)) {
971 coord.alt = elev;
972 }
973 }
974 }
975 }
976 Geometry::MultiPolygon(mpoly) => {
977 for poly in &mut mpoly.polygons {
978 for coord in &mut poly.exterior {
979 if let Some(&elev) = lookup.get(&coord_key(coord)) {
980 coord.alt = elev;
981 }
982 }
983 for hole in &mut poly.interiors {
984 for coord in hole {
985 if let Some(&elev) = lookup.get(&coord_key(coord)) {
986 coord.alt = elev;
987 }
988 }
989 }
990 }
991 }
992 Geometry::GeometryCollection(geoms) => {
993 for g in geoms {
994 apply_terrain_to_geometry(g, lookup);
995 }
996 }
997 }
998}
999
1000fn coord_key(coord: &GeoCoord) -> (i64, i64) {
1001 (
1002 (coord.lat * 100_000.0).round() as i64,
1003 (coord.lon * 100_000.0).round() as i64,
1004 )
1005}
1006
1007pub enum VisualizationTaskOutput {
1016 ScalarFieldUpdate {
1018 layer_name: String,
1020 values: Vec<f32>,
1022 },
1023 ColumnUpdate {
1025 layer_name: String,
1027 columns: crate::visualization::ColumnInstanceSet,
1029 ramp: crate::visualization::ColorRamp,
1031 },
1032 PointCloudUpdate {
1034 layer_name: String,
1036 points: crate::visualization::PointInstanceSet,
1038 ramp: crate::visualization::ColorRamp,
1040 },
1041}
1042
1043pub struct AsyncVisualizationPipeline {
1079 _pool: Arc<dyn DataTaskPool>,
1080 pending: Vec<Box<dyn DataTaskResultReceiver<VisualizationTaskOutput>>>,
1081}
1082
1083impl AsyncVisualizationPipeline {
1084 pub fn new(pool: Arc<dyn DataTaskPool>) -> Self {
1086 Self {
1087 _pool: pool,
1088 pending: Vec::new(),
1089 }
1090 }
1091
1092 pub fn dispatch(
1097 &mut self,
1098 task: Box<dyn FnOnce() -> VisualizationTaskOutput + Send + 'static>,
1099 ) {
1100 let receiver = spawn_on_thread(task);
1101 self.pending.push(receiver);
1102 }
1103
1104 pub fn poll(&mut self) -> Vec<VisualizationTaskOutput> {
1109 let mut completed = Vec::new();
1110 let mut still_pending = Vec::new();
1111
1112 for receiver in self.pending.drain(..) {
1113 match receiver.try_recv() {
1114 Some(result) => completed.push(result),
1115 None => still_pending.push(receiver),
1116 }
1117 }
1118
1119 self.pending = still_pending;
1120 completed
1121 }
1122
1123 pub fn pending_count(&self) -> usize {
1125 self.pending.len()
1126 }
1127}
1128
1129#[cfg(test)]
1130mod tests {
1131 use super::*;
1132 use crate::camera_projection::CameraProjection;
1133 use crate::geometry::{Feature, Geometry, Point, LineString, Polygon};
1134 use crate::layers::VectorStyle;
1135 use rustial_math::{ElevationGrid, GeoCoord, TileId};
1136 use std::collections::HashMap;
1137
1138 #[test]
1139 fn thread_pool_dispatches_and_receives_terrain() {
1140 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1141 let mut pipeline = AsyncDataPipeline::new(pool);
1142
1143 let tile = TileId::new(0, 0, 0);
1144 let elev = ElevationGrid::flat(tile, 4, 4);
1145 pipeline.dispatch_terrain(TerrainTaskInput {
1146 tile,
1147 elevation_source_tile: tile,
1148 elevation_region: TileTextureRegion::FULL,
1149 elevation: elev,
1150 resolution: 4,
1151 vertical_exaggeration: 1.0,
1152 generation: 1,
1153 });
1154
1155 for _ in 0..100 {
1156 pipeline.poll_terrain();
1157 if !pipeline.has_pending_tasks() {
1158 break;
1159 }
1160 std::thread::sleep(std::time::Duration::from_millis(10));
1161 }
1162
1163 let mut gens = HashMap::new();
1164 gens.insert(tile, 1u64);
1165 let (meshes, hillshades) = pipeline.collect_terrain(&[tile], &gens, 4);
1166 assert_eq!(meshes.len(), 1);
1167 assert_eq!(hillshades.len(), 1);
1168 assert_eq!(meshes[0].tile, tile);
1169 }
1170
1171 #[test]
1172 fn thread_pool_dispatches_and_receives_vector() {
1173 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1174 let mut pipeline = AsyncDataPipeline::new(pool);
1175
1176 let features = crate::geometry::FeatureCollection {
1177 features: vec![Feature {
1178 geometry: Geometry::Point(Point {
1179 coord: GeoCoord::from_lat_lon(0.0, 0.0),
1180 }),
1181 properties: HashMap::new(),
1182 }],
1183 };
1184
1185 let key = VectorCacheKey {
1186 layer_id: "test".into(),
1187 data_generation: 1,
1188 projection: CameraProjection::WebMercator,
1189 };
1190
1191 pipeline.dispatch_vector(VectorTaskInput {
1192 cache_key: key.clone(),
1193 features,
1194 style: VectorStyle::default(),
1195 query_layer_id: Some("test".into()),
1196 query_source_id: None,
1197 query_source_layer: None,
1198 feature_provenance: Vec::new(),
1199 projection: CameraProjection::WebMercator,
1200 terrain_samples: Vec::new(),
1201 });
1202
1203 for _ in 0..100 {
1204 pipeline.poll_vector();
1205 if !pipeline.has_pending_tasks() {
1206 break;
1207 }
1208 std::thread::sleep(std::time::Duration::from_millis(10));
1209 }
1210
1211 let (meshes, _candidates) = pipeline.collect_vectors(&[key]);
1212 assert_eq!(meshes.len(), 1);
1213 assert!(meshes[0].vertex_count() > 0);
1214 }
1215
1216 #[test]
1217 fn duplicate_dispatch_is_deduplicated() {
1218 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1219 let mut pipeline = AsyncDataPipeline::new(pool);
1220
1221 let tile = TileId::new(0, 0, 0);
1222 let elev = ElevationGrid::flat(tile, 4, 4);
1223 let input = TerrainTaskInput {
1224 tile,
1225 elevation_source_tile: tile,
1226 elevation_region: TileTextureRegion::FULL,
1227 elevation: elev,
1228 resolution: 4,
1229 vertical_exaggeration: 1.0,
1230 generation: 1,
1231 };
1232
1233 pipeline.dispatch_terrain(input.clone());
1234 pipeline.dispatch_terrain(input.clone());
1235 assert_eq!(pipeline.pending_terrain.len(), 1);
1236 }
1237
1238 #[test]
1239 fn cached_result_prevents_redispatch() {
1240 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1241 let mut pipeline = AsyncDataPipeline::new(pool);
1242
1243 let tile = TileId::new(0, 0, 0);
1244 let elev = ElevationGrid::flat(tile, 4, 4);
1245 let input = TerrainTaskInput {
1246 tile,
1247 elevation_source_tile: tile,
1248 elevation_region: TileTextureRegion::FULL,
1249 elevation: elev,
1250 resolution: 4,
1251 vertical_exaggeration: 1.0,
1252 generation: 1,
1253 };
1254
1255 pipeline.dispatch_terrain(input.clone());
1256
1257 for _ in 0..100 {
1258 pipeline.poll_terrain();
1259 if !pipeline.has_pending_tasks() {
1260 break;
1261 }
1262 std::thread::sleep(std::time::Duration::from_millis(10));
1263 }
1264
1265 pipeline.dispatch_terrain(input);
1266 assert!(pipeline.pending_terrain.is_empty());
1267 }
1268
1269 #[test]
1270 fn prune_terrain_removes_stale_entries() {
1271 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1272 let mut pipeline = AsyncDataPipeline::new(pool);
1273
1274 let tile_a = TileId::new(1, 0, 0);
1275 let tile_b = TileId::new(1, 1, 0);
1276 let elev_a = ElevationGrid::flat(tile_a, 4, 4);
1277 let elev_b = ElevationGrid::flat(tile_b, 4, 4);
1278 let mesh_a = crate::terrain::build_terrain_descriptor(&tile_a, &elev_a, 4, 1.0, 1);
1279 let mesh_b = crate::terrain::build_terrain_descriptor(&tile_b, &elev_b, 4, 1.0, 1);
1280 let hs_a = crate::terrain::prepare_hillshade_raster(&elev_a, 1.0, 1);
1281 let hs_b = crate::terrain::prepare_hillshade_raster(&elev_b, 1.0, 1);
1282
1283 pipeline.terrain_cache.insert(
1284 TerrainCacheKey { tile: tile_a, generation: 1, resolution: 4 },
1285 (mesh_a, hs_a),
1286 );
1287 pipeline.terrain_cache.insert(
1288 TerrainCacheKey { tile: tile_b, generation: 1, resolution: 4 },
1289 (mesh_b, hs_b),
1290 );
1291
1292 assert_eq!(pipeline.terrain_cache.len(), 2);
1293 pipeline.prune_terrain(&[tile_a]);
1294 assert_eq!(pipeline.terrain_cache.len(), 1);
1295 assert!(pipeline.terrain_cache.keys().all(|k| k.tile == tile_a));
1296 }
1297
1298 #[test]
1299 fn synchronous_fallback_path_still_works() {
1300 let mut state = crate::MapState::new();
1301 state.set_viewport(800, 600);
1302 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1303 state.set_camera_distance(1_000.0);
1304 state.update();
1305 assert!(state.zoom_level() <= 22);
1306 }
1307
1308 #[test]
1309 fn layer_generation_tracking() {
1310 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1311 let mut pipeline = AsyncDataPipeline::new(pool);
1312
1313 let gen1 = pipeline.layer_generation("layer-a", false);
1314 let gen2 = pipeline.layer_generation("layer-a", false);
1315 assert_eq!(gen1, gen2);
1316
1317 let gen3 = pipeline.layer_generation("layer-a", true);
1318 assert!(gen3 > gen2);
1319
1320 let gen4 = pipeline.layer_generation("layer-a", false);
1321 assert_eq!(gen3, gen4);
1322 }
1323
1324 #[test]
1325 fn collect_all_terrain_returns_cached_entries() {
1326 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1327 let mut pipeline = AsyncDataPipeline::new(pool);
1328
1329 let tile = TileId::new(0, 0, 0);
1330 let elev = ElevationGrid::flat(tile, 4, 4);
1331 pipeline.dispatch_terrain(TerrainTaskInput {
1332 tile,
1333 elevation_source_tile: tile,
1334 elevation_region: TileTextureRegion::FULL,
1335 elevation: elev,
1336 resolution: 4,
1337 vertical_exaggeration: 1.0,
1338 generation: 1,
1339 });
1340
1341 for _ in 0..100 {
1342 pipeline.poll_terrain();
1343 if !pipeline.has_pending_tasks() {
1344 break;
1345 }
1346 std::thread::sleep(std::time::Duration::from_millis(10));
1347 }
1348
1349 let (meshes, hillshades) = pipeline.collect_all_terrain();
1350 assert_eq!(meshes.len(), 1);
1351 assert_eq!(hillshades.len(), 1);
1352 assert_eq!(meshes[0].tile, tile);
1353 }
1354
1355 #[test]
1356 fn async_pipeline_terrain_via_map_state() {
1357 use crate::terrain::{FlatElevationSource, TerrainConfig};
1358
1359 let config = TerrainConfig {
1360 enabled: true,
1361 mesh_resolution: 4,
1362 source: Box::new(FlatElevationSource::new(4, 4)),
1363 ..TerrainConfig::default()
1364 };
1365 let mut state = crate::MapState::with_terrain(config, 100);
1366 state.set_viewport(800, 600);
1367 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1368 state.set_camera_distance(10_000_000.0);
1369
1370 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1371 state.set_task_pool(pool);
1372 assert!(state.has_async_pipeline());
1373
1374 state.update_with_dt(1.0 / 60.0);
1376
1377 for _ in 0..100 {
1379 std::thread::sleep(std::time::Duration::from_millis(10));
1380 state.update_with_dt(1.0 / 60.0);
1381 if !state.terrain_meshes().is_empty() {
1382 break;
1383 }
1384 }
1385
1386 assert!(
1387 !state.terrain_meshes().is_empty(),
1388 "async pipeline should produce terrain meshes"
1389 );
1390 }
1391
1392 #[test]
1393 fn async_pipeline_vectors_via_map_state() {
1394 use crate::layers::{VectorLayer, VectorStyle};
1395
1396 let mut state = crate::MapState::new();
1397 state.set_viewport(800, 600);
1398 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1399 state.set_camera_distance(1_000.0);
1400
1401 let fc = crate::geometry::FeatureCollection {
1402 features: vec![Feature {
1403 geometry: Geometry::Point(Point {
1404 coord: GeoCoord::from_lat_lon(0.0, 0.0),
1405 }),
1406 properties: HashMap::new(),
1407 }],
1408 };
1409 let vl = VectorLayer::new("test-vec", fc, VectorStyle::default());
1410 state.push_layer(Box::new(vl));
1411
1412 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1413 state.set_task_pool(pool);
1414
1415 state.update_with_dt(1.0 / 60.0);
1417
1418 for _ in 0..100 {
1420 std::thread::sleep(std::time::Duration::from_millis(10));
1421 state.update_with_dt(1.0 / 60.0);
1422 if !state.vector_meshes().is_empty() {
1423 break;
1424 }
1425 }
1426
1427 assert!(
1428 !state.vector_meshes().is_empty(),
1429 "async pipeline should produce vector meshes"
1430 );
1431 }
1432
1433 #[test]
1434 fn async_pipeline_clear_reverts_to_sync() {
1435 let mut state = crate::MapState::new();
1436 state.set_viewport(800, 600);
1437 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1438 state.set_camera_distance(1_000.0);
1439
1440 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1441 state.set_task_pool(pool);
1442 assert!(state.has_async_pipeline());
1443
1444 state.clear_task_pool();
1445 assert!(!state.has_async_pipeline());
1446
1447 state.update();
1449 assert!(state.zoom_level() <= 22);
1450 }
1451
1452 #[test]
1453 fn data_update_interval_ignored_with_async_pipeline() {
1454 use crate::terrain::{FlatElevationSource, TerrainConfig};
1455
1456 let config = TerrainConfig {
1457 enabled: true,
1458 mesh_resolution: 4,
1459 source: Box::new(FlatElevationSource::new(4, 4)),
1460 ..TerrainConfig::default()
1461 };
1462 let mut state = crate::MapState::with_terrain(config, 100);
1463 state.set_viewport(800, 600);
1464 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1465 state.set_camera_distance(10_000_000.0);
1466 state.set_data_update_interval(100.0); let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1469 state.set_task_pool(pool);
1470
1471 for _ in 0..100 {
1474 state.update_with_dt(1.0 / 60.0);
1475 std::thread::sleep(std::time::Duration::from_millis(10));
1476 if !state.terrain_meshes().is_empty() {
1477 break;
1478 }
1479 }
1480
1481 assert!(
1482 !state.terrain_meshes().is_empty(),
1483 "async pipeline should produce terrain despite high throttle"
1484 );
1485 }
1486
1487 #[test]
1492 fn partition_features_by_tile_separates_spatially() {
1493 let features = FeatureCollection {
1494 features: vec![
1495 Feature {
1496 geometry: Geometry::Point(Point {
1497 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1498 }),
1499 properties: HashMap::new(),
1500 },
1501 Feature {
1502 geometry: Geometry::Point(Point {
1503 coord: GeoCoord::from_lat_lon(-30.0, -60.0),
1504 }),
1505 properties: HashMap::new(),
1506 },
1507 Feature {
1508 geometry: Geometry::Point(Point {
1509 coord: GeoCoord::from_lat_lon(10.01, 10.01),
1510 }),
1511 properties: HashMap::new(),
1512 },
1513 ],
1514 };
1515
1516 let buckets = partition_features_by_tile(&features, 4);
1517 let total: usize = buckets.values().map(|fc| fc.len()).sum();
1518 assert_eq!(total, 3, "all features should be partitioned");
1519 let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
1521 let tile_c = geo_to_tile(&GeoCoord::from_lat_lon(10.01, 10.01), 4).tile_id();
1522 assert_eq!(tile_a, tile_c);
1523 assert_eq!(buckets.get(&tile_a).map(|fc| fc.len()), Some(2));
1524 }
1525
1526 #[test]
1527 fn partition_features_handles_linestring_and_polygon() {
1528 let features = FeatureCollection {
1529 features: vec![
1530 Feature {
1531 geometry: Geometry::LineString(LineString {
1532 coords: vec![
1533 GeoCoord::from_lat_lon(45.0, 90.0),
1534 GeoCoord::from_lat_lon(46.0, 91.0),
1535 ],
1536 }),
1537 properties: HashMap::new(),
1538 },
1539 Feature {
1540 geometry: Geometry::Polygon(Polygon {
1541 exterior: vec![
1542 GeoCoord::from_lat_lon(-10.0, -20.0),
1543 GeoCoord::from_lat_lon(-10.0, -19.0),
1544 GeoCoord::from_lat_lon(-9.0, -19.0),
1545 ],
1546 interiors: vec![],
1547 }),
1548 properties: HashMap::new(),
1549 },
1550 ],
1551 };
1552
1553 let buckets = partition_features_by_tile(&features, 6);
1554 let total: usize = buckets.values().map(|fc| fc.len()).sum();
1555 assert_eq!(total, 2);
1556 }
1557
1558 #[test]
1559 fn partition_features_drops_empty_geometry() {
1560 let features = FeatureCollection {
1561 features: vec![Feature {
1562 geometry: Geometry::GeometryCollection(vec![]),
1563 properties: HashMap::new(),
1564 }],
1565 };
1566
1567 let buckets = partition_features_by_tile(&features, 4);
1568 assert!(buckets.is_empty(), "empty geometry should not produce a bucket");
1569 }
1570
1571 #[test]
1572 fn dispatch_and_poll_vector_buckets() {
1573 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1574 let mut pipeline = AsyncDataPipeline::new(pool);
1575
1576 let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
1577 let tile_b = geo_to_tile(&GeoCoord::from_lat_lon(-30.0, -60.0), 4).tile_id();
1578
1579 let features = FeatureCollection {
1580 features: vec![
1581 Feature {
1582 geometry: Geometry::Point(Point {
1583 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1584 }),
1585 properties: HashMap::new(),
1586 },
1587 Feature {
1588 geometry: Geometry::Point(Point {
1589 coord: GeoCoord::from_lat_lon(-30.0, -60.0),
1590 }),
1591 properties: HashMap::new(),
1592 },
1593 ],
1594 };
1595
1596 let tile_features = partition_features_by_tile(&features, 4);
1597 assert!(tile_features.len() >= 2);
1598
1599 pipeline.dispatch_vector_buckets(
1600 "test-layer",
1601 1,
1602 CameraProjection::WebMercator,
1603 &VectorStyle::default(),
1604 &tile_features,
1605 None,
1606 None,
1607 &HashMap::new(),
1608 &[],
1609 );
1610
1611 for _ in 0..100 {
1612 pipeline.poll_vector_buckets();
1613 if !pipeline.has_pending_tasks() {
1614 break;
1615 }
1616 std::thread::sleep(std::time::Duration::from_millis(10));
1617 }
1618
1619 let (meshes_a, _) = pipeline.collect_vector_buckets(
1621 "test-layer", 1, CameraProjection::WebMercator, &[tile_a],
1622 );
1623 assert_eq!(meshes_a.len(), 1, "should have one mesh for tile_a");
1624
1625 let (meshes_b, _) = pipeline.collect_vector_buckets(
1627 "test-layer", 1, CameraProjection::WebMercator, &[tile_b],
1628 );
1629 assert_eq!(meshes_b.len(), 1, "should have one mesh for tile_b");
1630
1631 let (meshes_both, _) = pipeline.collect_vector_buckets(
1633 "test-layer", 1, CameraProjection::WebMercator, &[tile_a, tile_b],
1634 );
1635 assert_eq!(meshes_both.len(), 2, "should have meshes for both tiles");
1636 }
1637
1638 #[test]
1639 fn bucket_dispatch_deduplicates() {
1640 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1641 let mut pipeline = AsyncDataPipeline::new(pool);
1642
1643 let features = FeatureCollection {
1644 features: vec![Feature {
1645 geometry: Geometry::Point(Point {
1646 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1647 }),
1648 properties: HashMap::new(),
1649 }],
1650 };
1651 let tile_features = partition_features_by_tile(&features, 4);
1652
1653 pipeline.dispatch_vector_buckets(
1654 "layer", 1, CameraProjection::WebMercator,
1655 &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1656 );
1657 let first_count = pipeline.pending_buckets.len();
1658
1659 pipeline.dispatch_vector_buckets(
1660 "layer", 1, CameraProjection::WebMercator,
1661 &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1662 );
1663 assert_eq!(pipeline.pending_buckets.len(), first_count, "duplicate dispatch should be deduped");
1664 }
1665
1666 #[test]
1667 fn bucket_cache_hit_prevents_redispatch() {
1668 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1669 let mut pipeline = AsyncDataPipeline::new(pool);
1670
1671 let features = FeatureCollection {
1672 features: vec![Feature {
1673 geometry: Geometry::Point(Point {
1674 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1675 }),
1676 properties: HashMap::new(),
1677 }],
1678 };
1679 let tile_features = partition_features_by_tile(&features, 4);
1680
1681 pipeline.dispatch_vector_buckets(
1682 "layer", 1, CameraProjection::WebMercator,
1683 &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1684 );
1685
1686 for _ in 0..100 {
1687 pipeline.poll_vector_buckets();
1688 if !pipeline.has_pending_tasks() {
1689 break;
1690 }
1691 std::thread::sleep(std::time::Duration::from_millis(10));
1692 }
1693
1694 assert!(pipeline.bucket_cache_len() > 0);
1695
1696 pipeline.dispatch_vector_buckets(
1697 "layer", 1, CameraProjection::WebMercator,
1698 &VectorStyle::default(), &tile_features, None, None, &HashMap::new(), &[],
1699 );
1700 assert!(pipeline.pending_buckets.is_empty(), "cached result should prevent redispatch");
1701 }
1702
1703 #[test]
1704 fn prune_vector_buckets_removes_inactive_layers() {
1705 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1706 let mut pipeline = AsyncDataPipeline::new(pool);
1707
1708 let tile = TileId::new(4, 0, 0);
1709 let key_a = VectorBucketKey {
1710 layer_id: "keep".into(),
1711 tile,
1712 data_generation: 1,
1713 projection: CameraProjection::WebMercator,
1714 };
1715 let key_b = VectorBucketKey {
1716 layer_id: "remove".into(),
1717 tile,
1718 data_generation: 1,
1719 projection: CameraProjection::WebMercator,
1720 };
1721 pipeline.bucket_cache.insert(key_a, (VectorMeshData::default(), vec![]));
1722 pipeline.bucket_cache.insert(key_b, (VectorMeshData::default(), vec![]));
1723 assert_eq!(pipeline.bucket_cache_len(), 2);
1724
1725 pipeline.prune_vector_buckets(&["keep".to_string()]);
1726 assert_eq!(pipeline.bucket_cache_len(), 1);
1727 assert!(pipeline.bucket_cache.keys().all(|k| k.layer_id == "keep"));
1728 }
1729
1730 #[test]
1731 fn prune_vector_buckets_for_layer_removes_offscreen_tiles() {
1732 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1733 let mut pipeline = AsyncDataPipeline::new(pool);
1734
1735 let tile_a = TileId::new(4, 0, 0);
1736 let tile_b = TileId::new(4, 1, 0);
1737 let tile_c = TileId::new(4, 2, 0);
1738
1739 for tile in &[tile_a, tile_b, tile_c] {
1740 pipeline.bucket_cache.insert(
1741 VectorBucketKey {
1742 layer_id: "layer".into(),
1743 tile: *tile,
1744 data_generation: 1,
1745 projection: CameraProjection::WebMercator,
1746 },
1747 (VectorMeshData::default(), vec![]),
1748 );
1749 }
1750 assert_eq!(pipeline.bucket_cache_len(), 3);
1751
1752 pipeline.prune_vector_buckets_for_layer("layer", &[tile_a, tile_c]);
1753 assert_eq!(pipeline.bucket_cache_len(), 2);
1754 assert!(pipeline.bucket_cache.keys().all(|k| k.tile == tile_a || k.tile == tile_c));
1755 }
1756
1757 #[test]
1758 fn collect_vector_buckets_only_returns_visible() {
1759 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1760 let mut pipeline = AsyncDataPipeline::new(pool);
1761
1762 let tile_visible = TileId::new(4, 5, 5);
1763 let tile_offscreen = TileId::new(4, 10, 10);
1764
1765 let mesh = VectorMeshData {
1766 positions: vec![[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
1767 colors: vec![[1.0; 4]; 3],
1768 indices: vec![0, 1, 2],
1769 ..Default::default()
1770 };
1771 for tile in &[tile_visible, tile_offscreen] {
1772 pipeline.bucket_cache.insert(
1773 VectorBucketKey {
1774 layer_id: "layer".into(),
1775 tile: *tile,
1776 data_generation: 1,
1777 projection: CameraProjection::WebMercator,
1778 },
1779 (mesh.clone(), vec![]),
1780 );
1781 }
1782
1783 let (meshes, _) = pipeline.collect_vector_buckets(
1784 "layer", 1, CameraProjection::WebMercator, &[tile_visible],
1785 );
1786 assert_eq!(meshes.len(), 1, "only visible tile bucket should be collected");
1787 }
1788
1789 #[test]
1790 fn representative_coord_all_geometry_types() {
1791 let p = Geometry::Point(Point { coord: GeoCoord::from_lat_lon(1.0, 2.0) });
1792 assert_eq!(representative_coord(&p).map(|c| (c.lat, c.lon)), Some((1.0, 2.0)));
1793
1794 let ls = Geometry::LineString(LineString {
1795 coords: vec![GeoCoord::from_lat_lon(3.0, 4.0), GeoCoord::from_lat_lon(5.0, 6.0)],
1796 });
1797 assert_eq!(representative_coord(&ls).map(|c| (c.lat, c.lon)), Some((3.0, 4.0)));
1798
1799 let poly = Geometry::Polygon(Polygon {
1800 exterior: vec![
1801 GeoCoord::from_lat_lon(7.0, 8.0),
1802 GeoCoord::from_lat_lon(7.0, 9.0),
1803 GeoCoord::from_lat_lon(8.0, 9.0),
1804 ],
1805 interiors: vec![],
1806 });
1807 assert_eq!(representative_coord(&poly).map(|c| (c.lat, c.lon)), Some((7.0, 8.0)));
1808
1809 let gc = Geometry::GeometryCollection(vec![]);
1810 assert!(representative_coord(&gc).is_none());
1811
1812 let gc_with = Geometry::GeometryCollection(vec![p.clone()]);
1813 assert!(representative_coord(&gc_with).is_some());
1814 }
1815
1816 #[test]
1817 fn dispatch_and_poll_decode_produces_decoded_tile() {
1818 use crate::tile_source::{RawVectorPayload, TileFreshness};
1819 use std::sync::Arc;
1820
1821 fn build_test_mvt() -> Vec<u8> {
1823 fn encode_varint(mut val: u64) -> Vec<u8> {
1824 let mut buf = Vec::new();
1825 loop {
1826 let mut byte = (val & 0x7F) as u8;
1827 val >>= 7;
1828 if val != 0 { byte |= 0x80; }
1829 buf.push(byte);
1830 if val == 0 { break; }
1831 }
1832 buf
1833 }
1834 fn encode_tag(field: u32, wt: u8) -> Vec<u8> {
1835 encode_varint(((field as u64) << 3) | wt as u64)
1836 }
1837 fn encode_ld(field: u32, data: &[u8]) -> Vec<u8> {
1838 let mut b = encode_tag(field, 2);
1839 b.extend(encode_varint(data.len() as u64));
1840 b.extend_from_slice(data);
1841 b
1842 }
1843 fn encode_vi(field: u32, val: u64) -> Vec<u8> {
1844 let mut b = encode_tag(field, 0);
1845 b.extend(encode_varint(val));
1846 b
1847 }
1848 fn zigzag(n: i32) -> u32 { ((n << 1) ^ (n >> 31)) as u32 }
1849
1850 let mut geom = Vec::new();
1851 geom.extend(encode_varint(((1u64) << 3) | 1));
1852 geom.extend(encode_varint(zigzag(2048) as u64));
1853 geom.extend(encode_varint(zigzag(2048) as u64));
1854
1855 let mut feat = Vec::new();
1856 feat.extend(encode_vi(3, 1));
1857 feat.extend(encode_ld(4, &geom));
1858
1859 let mut layer = Vec::new();
1860 layer.extend(encode_ld(1, b"test"));
1861 layer.extend(encode_ld(2, &feat));
1862 layer.extend(encode_vi(5, 4096));
1863 layer.extend(encode_vi(15, 2));
1864
1865 encode_ld(3, &layer)
1866 }
1867
1868 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1869 let mut pipeline = AsyncDataPipeline::new(pool);
1870
1871 let tile = TileId::new(0, 0, 0);
1872 let raw = RawVectorPayload {
1873 tile_id: tile,
1874 bytes: Arc::new(build_test_mvt()),
1875 decode_options: crate::mvt::MvtDecodeOptions::default(),
1876 };
1877
1878 pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
1879
1880 for _ in 0..50 {
1882 pipeline.poll_decodes();
1883 if !pipeline.decoded_tiles.is_empty() {
1884 break;
1885 }
1886 std::thread::sleep(std::time::Duration::from_millis(10));
1887 }
1888
1889 let decoded = pipeline.take_decoded_tiles();
1890 assert_eq!(decoded.len(), 1, "should have one decoded tile");
1891 let (decoded_id, response) = &decoded[0];
1892 assert_eq!(*decoded_id, tile);
1893 assert!(response.data.is_vector(), "decoded data should be Vector");
1894 let vt = response.data.as_vector().expect("should be Vector");
1895 assert!(vt.layers.contains_key("test"));
1896 assert_eq!(vt.layers["test"].len(), 1);
1897 }
1898
1899 #[test]
1900 fn dispatch_decode_deduplicates() {
1901 use crate::tile_source::{RawVectorPayload, TileFreshness};
1902 use std::sync::Arc;
1903
1904 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1905 let mut pipeline = AsyncDataPipeline::new(pool);
1906
1907 let tile = TileId::new(0, 0, 0);
1908 let raw = RawVectorPayload {
1909 tile_id: tile,
1910 bytes: Arc::new(vec![]),
1911 decode_options: crate::mvt::MvtDecodeOptions::default(),
1912 };
1913
1914 pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
1915 pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
1916
1917 assert_eq!(pipeline.pending_decodes.len(), 1);
1919 }
1920
1921 #[test]
1926 fn async_viz_pipeline_dispatches_and_receives_scalar_update() {
1927 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1928 let mut viz = AsyncVisualizationPipeline::new(pool);
1929
1930 viz.dispatch(Box::new(|| {
1931 VisualizationTaskOutput::ScalarFieldUpdate {
1932 layer_name: "density".to_string(),
1933 values: vec![1.0, 2.0, 3.0, 4.0],
1934 }
1935 }));
1936
1937 assert_eq!(viz.pending_count(), 1);
1938
1939 let mut results = Vec::new();
1941 for _ in 0..200 {
1942 results = viz.poll();
1943 if !results.is_empty() {
1944 break;
1945 }
1946 std::thread::sleep(std::time::Duration::from_millis(5));
1947 }
1948
1949 assert_eq!(results.len(), 1);
1950 match &results[0] {
1951 VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values } => {
1952 assert_eq!(layer_name, "density");
1953 assert_eq!(values, &[1.0, 2.0, 3.0, 4.0]);
1954 }
1955 _ => panic!("expected ScalarFieldUpdate"),
1956 }
1957 assert_eq!(viz.pending_count(), 0);
1958 }
1959
1960 #[test]
1961 fn async_viz_pipeline_dispatches_and_receives_column_update() {
1962 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1963 let mut viz = AsyncVisualizationPipeline::new(pool);
1964
1965 viz.dispatch(Box::new(|| {
1966 VisualizationTaskOutput::ColumnUpdate {
1967 layer_name: "bars".to_string(),
1968 columns: crate::visualization::ColumnInstanceSet::new(vec![
1969 crate::visualization::ColumnInstance::new(
1970 GeoCoord::from_lat_lon(0.0, 0.0),
1971 100.0,
1972 10.0,
1973 ),
1974 ]),
1975 ramp: crate::visualization::ColorRamp::new(vec![
1976 crate::visualization::ColorStop { value: 0.0, color: [0.0, 0.0, 1.0, 1.0] },
1977 crate::visualization::ColorStop { value: 1.0, color: [1.0, 0.0, 0.0, 1.0] },
1978 ]),
1979 }
1980 }));
1981
1982 let mut results = Vec::new();
1983 for _ in 0..200 {
1984 results = viz.poll();
1985 if !results.is_empty() { break; }
1986 std::thread::sleep(std::time::Duration::from_millis(5));
1987 }
1988
1989 assert_eq!(results.len(), 1);
1990 match &results[0] {
1991 VisualizationTaskOutput::ColumnUpdate { layer_name, columns, .. } => {
1992 assert_eq!(layer_name, "bars");
1993 assert_eq!(columns.columns.len(), 1);
1994 }
1995 _ => panic!("expected ColumnUpdate"),
1996 }
1997 }
1998
1999 #[test]
2000 fn async_viz_pipeline_dispatches_and_receives_point_cloud_update() {
2001 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2002 let mut viz = AsyncVisualizationPipeline::new(pool);
2003
2004 viz.dispatch(Box::new(|| {
2005 VisualizationTaskOutput::PointCloudUpdate {
2006 layer_name: "scatter".to_string(),
2007 points: crate::visualization::PointInstanceSet::new(vec![
2008 crate::visualization::PointInstance::new(
2009 GeoCoord::from_lat_lon(1.0, 2.0),
2010 5.0,
2011 ),
2012 crate::visualization::PointInstance::new(
2013 GeoCoord::from_lat_lon(3.0, 4.0),
2014 8.0,
2015 ),
2016 ]),
2017 ramp: crate::visualization::ColorRamp::new(vec![
2018 crate::visualization::ColorStop { value: 0.0, color: [0.0, 1.0, 0.0, 1.0] },
2019 crate::visualization::ColorStop { value: 1.0, color: [1.0, 1.0, 0.0, 1.0] },
2020 ]),
2021 }
2022 }));
2023
2024 let mut results = Vec::new();
2025 for _ in 0..200 {
2026 results = viz.poll();
2027 if !results.is_empty() { break; }
2028 std::thread::sleep(std::time::Duration::from_millis(5));
2029 }
2030
2031 assert_eq!(results.len(), 1);
2032 match &results[0] {
2033 VisualizationTaskOutput::PointCloudUpdate { layer_name, points, .. } => {
2034 assert_eq!(layer_name, "scatter");
2035 assert_eq!(points.points.len(), 2);
2036 }
2037 _ => panic!("expected PointCloudUpdate"),
2038 }
2039 }
2040
2041 #[test]
2042 fn async_viz_pipeline_multiple_tasks_complete_independently() {
2043 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2044 let mut viz = AsyncVisualizationPipeline::new(pool);
2045
2046 for i in 0..5 {
2047 let name = format!("layer_{i}");
2048 viz.dispatch(Box::new(move || {
2049 VisualizationTaskOutput::ScalarFieldUpdate {
2050 layer_name: name,
2051 values: vec![i as f32; 4],
2052 }
2053 }));
2054 }
2055
2056 assert_eq!(viz.pending_count(), 5);
2057
2058 let mut all_results = Vec::new();
2059 for _ in 0..200 {
2060 let batch = viz.poll();
2061 all_results.extend(batch);
2062 if all_results.len() >= 5 { break; }
2063 std::thread::sleep(std::time::Duration::from_millis(10));
2064 }
2065
2066 assert_eq!(all_results.len(), 5);
2067 assert_eq!(viz.pending_count(), 0);
2068 }
2069}