1use crate::camera_projection::CameraProjection;
40use crate::geometry::{Feature, FeatureCollection, Geometry};
41use crate::layers::{FeatureProvenance, VectorMeshData, VectorStyle};
42use crate::symbols::SymbolCandidate;
43use crate::terrain::{PreparedHillshadeRaster, TerrainMeshData};
44use crate::tile_manager::TileTextureRegion;
45use rustial_math::{geo_to_tile, ElevationGrid, GeoCoord, TileId};
46use std::collections::HashMap;
47use std::sync::Arc;
48
49pub trait DataTaskResultReceiver<T: Send + 'static>: Send + Sync {
58 fn try_recv(&self) -> Option<T>;
62}
63
64pub trait DataTaskPool: Send + Sync {
83 fn spawn_terrain(
85 &self,
86 task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
87 ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>;
88
89 fn spawn_vector(
91 &self,
92 task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
93 ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>>;
94
95 fn spawn_decode(
100 &self,
101 task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
102 ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>;
103}
104
105pub struct ThreadDataTaskPool;
117
118impl ThreadDataTaskPool {
119 pub fn new() -> Self {
121 Self
122 }
123}
124
125impl Default for ThreadDataTaskPool {
126 fn default() -> Self {
127 Self::new()
128 }
129}
130
131struct ChannelReceiver<T> {
132 rx: std::sync::mpsc::Receiver<T>,
133}
134
135impl<T: Send + 'static> DataTaskResultReceiver<T> for ChannelReceiver<T> {
136 fn try_recv(&self) -> Option<T> {
137 self.rx.try_recv().ok()
138 }
139}
140
141unsafe impl<T: Send> Sync for ChannelReceiver<T> {}
146
147fn spawn_on_thread<T: Send + 'static>(
148 task: Box<dyn FnOnce() -> T + Send + 'static>,
149) -> Box<dyn DataTaskResultReceiver<T>> {
150 let (tx, rx) = std::sync::mpsc::channel();
151 std::thread::spawn(move || {
152 let result = task();
153 let _ = tx.send(result);
154 });
155 Box::new(ChannelReceiver { rx })
156}
157
158impl DataTaskPool for ThreadDataTaskPool {
159 fn spawn_terrain(
160 &self,
161 task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
162 ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>> {
163 spawn_on_thread(task)
164 }
165
166 fn spawn_vector(
167 &self,
168 task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
169 ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>> {
170 spawn_on_thread(task)
171 }
172
173 fn spawn_decode(
174 &self,
175 task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
176 ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>> {
177 spawn_on_thread(task)
178 }
179}
180
181#[derive(Clone)]
187pub struct TerrainTaskInput {
188 pub tile: TileId,
190 pub elevation_source_tile: TileId,
192 pub elevation_region: TileTextureRegion,
194 pub elevation: ElevationGrid,
196 pub resolution: u16,
198 pub vertical_exaggeration: f64,
200 pub generation: u64,
202}
203
204pub struct TerrainTaskOutput {
206 pub tile: TileId,
208 pub mesh: TerrainMeshData,
210 pub hillshade: PreparedHillshadeRaster,
212 pub generation: u64,
214}
215
216#[derive(Debug, Clone, PartialEq, Eq, Hash)]
218pub struct TerrainCacheKey {
219 pub tile: TileId,
221 pub generation: u64,
223 pub resolution: u16,
225}
226
227#[derive(Clone)]
229pub struct VectorTaskInput {
230 pub cache_key: VectorCacheKey,
232 pub features: FeatureCollection,
234 pub style: VectorStyle,
236 pub query_layer_id: Option<String>,
238 pub query_source_id: Option<String>,
240 pub query_source_layer: Option<String>,
242 pub feature_provenance: Vec<Option<FeatureProvenance>>,
244 pub projection: CameraProjection,
246 pub terrain_samples: Vec<(GeoCoord, f64)>,
249}
250
251pub struct VectorTaskOutput {
253 pub cache_key: VectorCacheKey,
255 pub mesh: VectorMeshData,
257 pub symbol_candidates: Vec<SymbolCandidate>,
259}
260
261pub struct MvtDecodeOutput {
267 pub tile: TileId,
269 pub result: Result<crate::tile_source::VectorTileData, crate::tile_source::TileError>,
271 pub freshness: crate::tile_source::TileFreshness,
274}
275
276#[derive(Debug, Clone, PartialEq, Eq, Hash)]
278pub struct VectorCacheKey {
279 pub layer_id: String,
281 pub data_generation: u64,
283 pub projection: CameraProjection,
285}
286
287#[derive(Debug, Clone, PartialEq, Eq, Hash)]
297pub struct VectorBucketKey {
298 pub layer_id: String,
300 pub tile: TileId,
302 pub data_generation: u64,
304 pub projection: CameraProjection,
306}
307
308fn representative_coord(geometry: &Geometry) -> Option<GeoCoord> {
316 match geometry {
317 Geometry::Point(p) => Some(p.coord),
318 Geometry::LineString(ls) => ls.coords.first().copied(),
319 Geometry::Polygon(poly) => poly.exterior.first().copied(),
320 Geometry::MultiPoint(mp) => mp.points.first().map(|p| p.coord),
321 Geometry::MultiLineString(mls) => {
322 mls.lines.first().and_then(|ls| ls.coords.first().copied())
323 }
324 Geometry::MultiPolygon(mpoly) => mpoly
325 .polygons
326 .first()
327 .and_then(|p| p.exterior.first().copied()),
328 Geometry::GeometryCollection(geoms) => geoms.iter().find_map(representative_coord),
329 }
330}
331
332pub fn partition_features_by_tile(
347 features: &FeatureCollection,
348 zoom: u8,
349) -> HashMap<TileId, FeatureCollection> {
350 let mut buckets: HashMap<TileId, Vec<Feature>> = HashMap::new();
351 for feature in &features.features {
352 if let Some(coord) = representative_coord(&feature.geometry) {
353 let tile_coord = geo_to_tile(&coord, zoom);
354 let tile_id = tile_coord.tile_id();
355 buckets.entry(tile_id).or_default().push(feature.clone());
356 }
357 }
358 buckets
359 .into_iter()
360 .map(|(tile, feats)| (tile, FeatureCollection { features: feats }))
361 .collect()
362}
363
364struct PendingTerrainTask {
369 key: TerrainCacheKey,
370 receiver: Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>,
371}
372
373struct PendingVectorTask {
374 key: VectorCacheKey,
375 receiver: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
376}
377
378struct PendingVectorBucketTask {
379 key: VectorBucketKey,
380 receiver: Box<dyn DataTaskResultReceiver<VectorBucketOutput>>,
381}
382
383struct PendingDecodeTask {
384 tile: TileId,
385 receiver: Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>,
386}
387
388struct VectorBucketOutput {
390 mesh: VectorMeshData,
391 symbol_candidates: Vec<SymbolCandidate>,
392}
393
394pub(crate) struct AsyncDataPipeline {
404 pool: Arc<dyn DataTaskPool>,
405 pending_terrain: Vec<PendingTerrainTask>,
406 pending_vector: Vec<PendingVectorTask>,
407 terrain_cache: HashMap<TerrainCacheKey, (TerrainMeshData, PreparedHillshadeRaster)>,
408 vector_cache: HashMap<VectorCacheKey, (VectorMeshData, Vec<SymbolCandidate>)>,
409 next_vector_generation: u64,
410 layer_generations: HashMap<String, u64>,
411 pending_buckets: Vec<PendingVectorBucketTask>,
413 bucket_cache: HashMap<VectorBucketKey, (VectorMeshData, Vec<SymbolCandidate>)>,
414 pending_decodes: Vec<PendingDecodeTask>,
416 pub(crate) decoded_tiles: Vec<(TileId, crate::tile_source::TileResponse)>,
422}
423
424impl AsyncDataPipeline {
425 pub(crate) fn new(pool: Arc<dyn DataTaskPool>) -> Self {
426 Self {
427 pool,
428 pending_terrain: Vec::new(),
429 pending_vector: Vec::new(),
430 terrain_cache: HashMap::new(),
431 vector_cache: HashMap::new(),
432 next_vector_generation: 1,
433 layer_generations: HashMap::new(),
434 pending_buckets: Vec::new(),
435 bucket_cache: HashMap::new(),
436 pending_decodes: Vec::new(),
437 decoded_tiles: Vec::new(),
438 }
439 }
440
441 pub(crate) fn dispatch_terrain(&mut self, input: TerrainTaskInput) {
444 let key = TerrainCacheKey {
445 tile: input.tile,
446 generation: input.generation,
447 resolution: input.resolution,
448 };
449 if self.terrain_cache.contains_key(&key) {
450 return;
451 }
452 if self.pending_terrain.iter().any(|p| p.key == key) {
453 return;
454 }
455 let receiver = self.pool.spawn_terrain(Box::new(move || {
456 let mesh = crate::terrain::build_terrain_descriptor_with_source(
457 &input.tile,
458 input.elevation_source_tile,
459 input.elevation_region,
460 &input.elevation,
461 input.resolution,
462 input.vertical_exaggeration,
463 input.generation,
464 );
465 let hillshade = crate::terrain::prepare_hillshade_raster(
466 &input.elevation,
467 input.vertical_exaggeration,
468 input.generation,
469 );
470 TerrainTaskOutput {
471 tile: input.tile,
472 mesh,
473 hillshade,
474 generation: input.generation,
475 }
476 }));
477 self.pending_terrain
478 .push(PendingTerrainTask { key, receiver });
479 }
480
481 pub(crate) fn poll_terrain(&mut self) {
482 let mut still_pending = Vec::new();
483 for task in self.pending_terrain.drain(..) {
484 if let Some(output) = task.receiver.try_recv() {
485 self.terrain_cache
486 .insert(task.key, (output.mesh, output.hillshade));
487 } else {
488 still_pending.push(task);
489 }
490 }
491 self.pending_terrain = still_pending;
492 }
493
494 #[allow(dead_code)]
495 pub(crate) fn collect_terrain(
496 &self,
497 desired_tiles: &[TileId],
498 tile_generations: &HashMap<TileId, u64>,
499 resolution: u16,
500 ) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
501 let mut meshes = Vec::with_capacity(desired_tiles.len());
502 let mut hillshades = Vec::with_capacity(desired_tiles.len());
503 for tile in desired_tiles {
504 let generation = tile_generations.get(tile).copied().unwrap_or(0);
505 let key = TerrainCacheKey {
506 tile: *tile,
507 generation,
508 resolution,
509 };
510 if let Some((mesh, hs)) = self.terrain_cache.get(&key) {
511 meshes.push(mesh.clone());
512 hillshades.push(hs.clone());
513 }
514 }
515 (meshes, hillshades)
516 }
517
518 pub(crate) fn collect_all_terrain(
520 &self,
521 ) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
522 let mut meshes = Vec::with_capacity(self.terrain_cache.len());
523 let mut hillshades = Vec::with_capacity(self.terrain_cache.len());
524 for (mesh, hs) in self.terrain_cache.values() {
525 meshes.push(mesh.clone());
526 hillshades.push(hs.clone());
527 }
528 (meshes, hillshades)
529 }
530
531 pub(crate) fn prune_terrain(&mut self, visible_tiles: &[TileId]) {
532 let visible_set: std::collections::HashSet<TileId> =
533 visible_tiles.iter().copied().collect();
534 self.terrain_cache
535 .retain(|key, _| visible_set.contains(&key.tile));
536 }
537
538 pub(crate) fn layer_generation(&mut self, layer_id: &str, features_changed: bool) -> u64 {
541 if features_changed {
542 let gen = self.next_vector_generation;
543 self.next_vector_generation += 1;
544 self.layer_generations.insert(layer_id.to_owned(), gen);
545 gen
546 } else {
547 self.layer_generations
548 .get(layer_id)
549 .copied()
550 .unwrap_or_else(|| {
551 let gen = self.next_vector_generation;
552 self.next_vector_generation += 1;
553 self.layer_generations.insert(layer_id.to_owned(), gen);
554 gen
555 })
556 }
557 }
558
559 pub(crate) fn dispatch_vector(&mut self, input: VectorTaskInput) {
560 let key = input.cache_key.clone();
561 if self.vector_cache.contains_key(&key) {
562 return;
563 }
564 if self.pending_vector.iter().any(|p| p.key == key) {
565 return;
566 }
567 let receiver = self.pool.spawn_vector(Box::new(move || {
568 let mut features = input.features;
569 if !input.terrain_samples.is_empty() {
570 apply_terrain_samples(&mut features, &input.terrain_samples);
571 }
572 let mut temp_layer = crate::layers::VectorLayer::new(
573 &input.cache_key.layer_id,
574 FeatureCollection::default(),
575 input.style,
576 )
577 .with_query_metadata(input.query_layer_id, input.query_source_id)
578 .with_source_layer(input.query_source_layer);
579 temp_layer.set_features_with_provenance(features, input.feature_provenance);
580 let mesh = temp_layer.tessellate(input.projection);
581 let candidates = temp_layer.symbol_candidates();
582 VectorTaskOutput {
583 cache_key: input.cache_key,
584 mesh,
585 symbol_candidates: candidates,
586 }
587 }));
588 self.pending_vector
589 .push(PendingVectorTask { key, receiver });
590 }
591
592 pub(crate) fn poll_vector(&mut self) {
593 let mut still_pending = Vec::new();
594 for task in self.pending_vector.drain(..) {
595 if let Some(output) = task.receiver.try_recv() {
596 self.vector_cache
597 .insert(task.key, (output.mesh, output.symbol_candidates));
598 } else {
599 still_pending.push(task);
600 }
601 }
602 self.pending_vector = still_pending;
603 }
604
605 pub(crate) fn collect_vectors(
606 &self,
607 layer_keys: &[VectorCacheKey],
608 ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
609 let mut meshes = Vec::new();
610 let mut candidates = Vec::new();
611 for key in layer_keys {
612 if let Some((mesh, cands)) = self.vector_cache.get(key) {
613 if !mesh.is_empty() {
614 meshes.push(mesh.clone());
615 }
616 candidates.extend(cands.iter().cloned());
617 }
618 }
619 (meshes, candidates)
620 }
621
622 pub(crate) fn prune_vectors(&mut self, active_layer_ids: &[String]) {
623 let active_set: std::collections::HashSet<&str> =
624 active_layer_ids.iter().map(String::as_str).collect();
625 self.vector_cache
626 .retain(|key, _| active_set.contains(key.layer_id.as_str()));
627 self.layer_generations
628 .retain(|id, _| active_set.contains(id.as_str()));
629 }
630
631 #[cfg(test)]
632 pub(crate) fn current_layer_generation(&self, layer_id: &str) -> Option<u64> {
633 self.layer_generations.get(layer_id).copied()
634 }
635
636 #[allow(clippy::too_many_arguments)]
645 pub(crate) fn dispatch_vector_buckets(
646 &mut self,
647 layer_id: &str,
648 data_generation: u64,
649 projection: CameraProjection,
650 style: &VectorStyle,
651 tile_features: &HashMap<TileId, FeatureCollection>,
652 query_source_id: Option<&str>,
653 query_source_layer: Option<&str>,
654 tile_provenance: &HashMap<TileId, Vec<Option<FeatureProvenance>>>,
655 terrain_samples: &[(GeoCoord, f64)],
656 ) {
657 for (tile, features) in tile_features {
658 let key = VectorBucketKey {
659 layer_id: layer_id.to_owned(),
660 tile: *tile,
661 data_generation,
662 projection,
663 };
664 if self.bucket_cache.contains_key(&key) {
665 continue;
666 }
667 if self.pending_buckets.iter().any(|p| p.key == key) {
668 continue;
669 }
670 let task_key = key.clone();
671 let task_features = features.clone();
672 let task_style = style.clone();
673 let task_samples = terrain_samples.to_vec();
674 let task_layer_id = layer_id.to_owned();
675 let task_source_id = query_source_id.map(ToOwned::to_owned);
676 let task_source_layer = query_source_layer.map(ToOwned::to_owned);
677 let task_provenance = tile_provenance.get(tile).cloned().unwrap_or_default();
678
679 let receiver = self.pool.spawn_vector(Box::new(move || {
680 let mut feats = task_features;
681 if !task_samples.is_empty() {
682 apply_terrain_samples(&mut feats, &task_samples);
683 }
684 let mut temp_layer = crate::layers::VectorLayer::new(
685 &task_layer_id,
686 FeatureCollection::default(),
687 task_style,
688 )
689 .with_query_metadata(Some(task_layer_id.clone()), task_source_id)
690 .with_source_layer(task_source_layer);
691 temp_layer.set_features_with_provenance(feats, task_provenance);
692 let mesh = temp_layer.tessellate(task_key.projection);
693 let candidates = temp_layer.symbol_candidates();
694 VectorTaskOutput {
695 cache_key: VectorCacheKey {
696 layer_id: task_key.layer_id.clone(),
697 data_generation: task_key.data_generation,
698 projection: task_key.projection,
699 },
700 mesh,
701 symbol_candidates: candidates,
702 }
703 }));
704
705 self.pending_buckets.push(PendingVectorBucketTask {
707 key,
708 receiver: Box::new(BucketReceiverAdapter { inner: receiver }),
709 });
710 }
711 }
712
713 pub(crate) fn poll_vector_buckets(&mut self) {
715 let mut still_pending = Vec::new();
716 for task in self.pending_buckets.drain(..) {
717 if let Some(output) = task.receiver.try_recv() {
718 self.bucket_cache
719 .insert(task.key, (output.mesh, output.symbol_candidates));
720 } else {
721 still_pending.push(task);
722 }
723 }
724 self.pending_buckets = still_pending;
725 }
726
727 pub(crate) fn dispatch_decode(
734 &mut self,
735 tile: TileId,
736 raw: &crate::tile_source::RawVectorPayload,
737 freshness: crate::tile_source::TileFreshness,
738 ) {
739 if self.pending_decodes.iter().any(|t| t.tile == tile) {
741 return;
742 }
743
744 let bytes = Arc::clone(&raw.bytes);
745 let opts = raw.decode_options.clone();
746 let task_tile = tile;
747
748 let receiver = self.pool.spawn_decode(Box::new(move || {
749 let result = crate::mvt::decode_mvt(&bytes, &task_tile, &opts)
750 .map(|layers| crate::tile_source::VectorTileData { layers })
751 .map_err(|e| crate::tile_source::TileError::Decode(format!("MVT decode: {e}")));
752 MvtDecodeOutput {
753 tile: task_tile,
754 result,
755 freshness,
756 }
757 }));
758
759 self.pending_decodes
760 .push(PendingDecodeTask { tile, receiver });
761 }
762
763 pub(crate) fn poll_decodes(&mut self) {
769 let mut still_pending = Vec::new();
770 for task in self.pending_decodes.drain(..) {
771 if let Some(output) = task.receiver.try_recv() {
772 match output.result {
773 Ok(vector_data) => {
774 self.decoded_tiles.push((
775 output.tile,
776 crate::tile_source::TileResponse {
777 data: crate::tile_source::TileData::Vector(vector_data),
778 freshness: output.freshness,
779 not_modified: false,
780 },
781 ));
782 }
783 Err(_err) => {
784 }
788 }
789 } else {
790 still_pending.push(task);
791 }
792 }
793 self.pending_decodes = still_pending;
794 }
795
796 pub(crate) fn take_decoded_tiles(&mut self) -> Vec<(TileId, crate::tile_source::TileResponse)> {
798 std::mem::take(&mut self.decoded_tiles)
799 }
800
801 #[allow(dead_code)]
803 pub(crate) fn pending_decode_count(&self) -> usize {
804 self.pending_decodes.len()
805 }
806
807 pub(crate) fn collect_vector_buckets(
813 &self,
814 layer_id: &str,
815 data_generation: u64,
816 projection: CameraProjection,
817 visible_tiles: &[TileId],
818 ) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
819 let mut meshes = Vec::new();
820 let mut candidates = Vec::new();
821 for tile in visible_tiles {
822 let key = VectorBucketKey {
823 layer_id: layer_id.to_owned(),
824 tile: *tile,
825 data_generation,
826 projection,
827 };
828 if let Some((mesh, cands)) = self.bucket_cache.get(&key) {
829 if !mesh.is_empty() {
830 meshes.push(mesh.clone());
831 }
832 candidates.extend(cands.iter().cloned());
833 }
834 }
835 (meshes, candidates)
836 }
837
838 #[allow(dead_code)]
844 pub(crate) fn prune_vector_buckets(&mut self, active_layer_ids: &[String]) {
845 let active_set: std::collections::HashSet<&str> =
846 active_layer_ids.iter().map(String::as_str).collect();
847 self.bucket_cache
848 .retain(|key, _| active_set.contains(key.layer_id.as_str()));
849 }
850
851 pub(crate) fn evict_vector_buckets(&mut self, layer_id: &str, tiles: &[TileId]) {
852 if tiles.is_empty() {
853 return;
854 }
855 let tile_set: std::collections::HashSet<TileId> = tiles.iter().copied().collect();
856 self.bucket_cache
857 .retain(|key, _| key.layer_id != layer_id || !tile_set.contains(&key.tile));
858 self.pending_buckets
859 .retain(|task| task.key.layer_id != layer_id || !tile_set.contains(&task.key.tile));
860 }
861
862 #[allow(dead_code)]
868 pub(crate) fn prune_vector_buckets_for_layer(
869 &mut self,
870 layer_id: &str,
871 visible_tiles: &[TileId],
872 ) {
873 let visible_set: std::collections::HashSet<TileId> =
874 visible_tiles.iter().copied().collect();
875 self.bucket_cache
876 .retain(|key, _| key.layer_id != layer_id || visible_set.contains(&key.tile));
877 }
878
879 #[allow(dead_code)]
881 pub(crate) fn bucket_cache_len(&self) -> usize {
882 self.bucket_cache.len()
883 }
884
885 #[allow(dead_code)]
888 pub(crate) fn has_pending_tasks(&self) -> bool {
889 !self.pending_terrain.is_empty()
890 || !self.pending_vector.is_empty()
891 || !self.pending_buckets.is_empty()
892 }
893}
894
895struct BucketReceiverAdapter {
898 inner: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
899}
900
901impl DataTaskResultReceiver<VectorBucketOutput> for BucketReceiverAdapter {
902 fn try_recv(&self) -> Option<VectorBucketOutput> {
903 self.inner.try_recv().map(|output| VectorBucketOutput {
904 mesh: output.mesh,
905 symbol_candidates: output.symbol_candidates,
906 })
907 }
908}
909
910fn apply_terrain_samples(features: &mut FeatureCollection, samples: &[(GeoCoord, f64)]) {
911 let lookup: HashMap<(i64, i64), f64> = samples
912 .iter()
913 .map(|(coord, elev)| {
914 let key = (
915 (coord.lat * 100_000.0).round() as i64,
916 (coord.lon * 100_000.0).round() as i64,
917 );
918 (key, *elev)
919 })
920 .collect();
921 for feature in &mut features.features {
922 apply_terrain_to_geometry(&mut feature.geometry, &lookup);
923 }
924}
925
926fn apply_terrain_to_geometry(
927 geometry: &mut crate::geometry::Geometry,
928 lookup: &HashMap<(i64, i64), f64>,
929) {
930 use crate::geometry::Geometry;
931 match geometry {
932 Geometry::Point(p) => {
933 if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
934 p.coord.alt = elev;
935 }
936 }
937 Geometry::LineString(ls) => {
938 for coord in &mut ls.coords {
939 if let Some(&elev) = lookup.get(&coord_key(coord)) {
940 coord.alt = elev;
941 }
942 }
943 }
944 Geometry::Polygon(poly) => {
945 for coord in &mut poly.exterior {
946 if let Some(&elev) = lookup.get(&coord_key(coord)) {
947 coord.alt = elev;
948 }
949 }
950 for hole in &mut poly.interiors {
951 for coord in hole {
952 if let Some(&elev) = lookup.get(&coord_key(coord)) {
953 coord.alt = elev;
954 }
955 }
956 }
957 }
958 Geometry::MultiPoint(mp) => {
959 for p in &mut mp.points {
960 if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
961 p.coord.alt = elev;
962 }
963 }
964 }
965 Geometry::MultiLineString(mls) => {
966 for ls in &mut mls.lines {
967 for coord in &mut ls.coords {
968 if let Some(&elev) = lookup.get(&coord_key(coord)) {
969 coord.alt = elev;
970 }
971 }
972 }
973 }
974 Geometry::MultiPolygon(mpoly) => {
975 for poly in &mut mpoly.polygons {
976 for coord in &mut poly.exterior {
977 if let Some(&elev) = lookup.get(&coord_key(coord)) {
978 coord.alt = elev;
979 }
980 }
981 for hole in &mut poly.interiors {
982 for coord in hole {
983 if let Some(&elev) = lookup.get(&coord_key(coord)) {
984 coord.alt = elev;
985 }
986 }
987 }
988 }
989 }
990 Geometry::GeometryCollection(geoms) => {
991 for g in geoms {
992 apply_terrain_to_geometry(g, lookup);
993 }
994 }
995 }
996}
997
998fn coord_key(coord: &GeoCoord) -> (i64, i64) {
999 (
1000 (coord.lat * 100_000.0).round() as i64,
1001 (coord.lon * 100_000.0).round() as i64,
1002 )
1003}
1004
1005pub enum VisualizationTaskOutput {
1014 ScalarFieldUpdate {
1016 layer_name: String,
1018 values: Vec<f32>,
1020 },
1021 ColumnUpdate {
1023 layer_name: String,
1025 columns: crate::visualization::ColumnInstanceSet,
1027 ramp: crate::visualization::ColorRamp,
1029 },
1030 PointCloudUpdate {
1032 layer_name: String,
1034 points: crate::visualization::PointInstanceSet,
1036 ramp: crate::visualization::ColorRamp,
1038 },
1039}
1040
1041pub struct AsyncVisualizationPipeline {
1077 _pool: Arc<dyn DataTaskPool>,
1078 pending: Vec<Box<dyn DataTaskResultReceiver<VisualizationTaskOutput>>>,
1079}
1080
1081impl AsyncVisualizationPipeline {
1082 pub fn new(pool: Arc<dyn DataTaskPool>) -> Self {
1084 Self {
1085 _pool: pool,
1086 pending: Vec::new(),
1087 }
1088 }
1089
1090 pub fn dispatch(
1095 &mut self,
1096 task: Box<dyn FnOnce() -> VisualizationTaskOutput + Send + 'static>,
1097 ) {
1098 let receiver = spawn_on_thread(task);
1099 self.pending.push(receiver);
1100 }
1101
1102 pub fn poll(&mut self) -> Vec<VisualizationTaskOutput> {
1107 let mut completed = Vec::new();
1108 let mut still_pending = Vec::new();
1109
1110 for receiver in self.pending.drain(..) {
1111 match receiver.try_recv() {
1112 Some(result) => completed.push(result),
1113 None => still_pending.push(receiver),
1114 }
1115 }
1116
1117 self.pending = still_pending;
1118 completed
1119 }
1120
1121 pub fn pending_count(&self) -> usize {
1123 self.pending.len()
1124 }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129 use super::*;
1130 use crate::camera_projection::CameraProjection;
1131 use crate::geometry::{Feature, Geometry, LineString, Point, Polygon};
1132 use crate::layers::VectorStyle;
1133 use rustial_math::{ElevationGrid, GeoCoord, TileId};
1134 use std::collections::HashMap;
1135
1136 #[test]
1137 fn thread_pool_dispatches_and_receives_terrain() {
1138 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1139 let mut pipeline = AsyncDataPipeline::new(pool);
1140
1141 let tile = TileId::new(0, 0, 0);
1142 let elev = ElevationGrid::flat(tile, 4, 4);
1143 pipeline.dispatch_terrain(TerrainTaskInput {
1144 tile,
1145 elevation_source_tile: tile,
1146 elevation_region: TileTextureRegion::FULL,
1147 elevation: elev,
1148 resolution: 4,
1149 vertical_exaggeration: 1.0,
1150 generation: 1,
1151 });
1152
1153 for _ in 0..100 {
1154 pipeline.poll_terrain();
1155 if !pipeline.has_pending_tasks() {
1156 break;
1157 }
1158 std::thread::sleep(std::time::Duration::from_millis(10));
1159 }
1160
1161 let mut gens = HashMap::new();
1162 gens.insert(tile, 1u64);
1163 let (meshes, hillshades) = pipeline.collect_terrain(&[tile], &gens, 4);
1164 assert_eq!(meshes.len(), 1);
1165 assert_eq!(hillshades.len(), 1);
1166 assert_eq!(meshes[0].tile, tile);
1167 }
1168
1169 #[test]
1170 fn thread_pool_dispatches_and_receives_vector() {
1171 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1172 let mut pipeline = AsyncDataPipeline::new(pool);
1173
1174 let features = crate::geometry::FeatureCollection {
1175 features: vec![Feature {
1176 geometry: Geometry::Point(Point {
1177 coord: GeoCoord::from_lat_lon(0.0, 0.0),
1178 }),
1179 properties: HashMap::new(),
1180 }],
1181 };
1182
1183 let key = VectorCacheKey {
1184 layer_id: "test".into(),
1185 data_generation: 1,
1186 projection: CameraProjection::WebMercator,
1187 };
1188
1189 pipeline.dispatch_vector(VectorTaskInput {
1190 cache_key: key.clone(),
1191 features,
1192 style: VectorStyle::default(),
1193 query_layer_id: Some("test".into()),
1194 query_source_id: None,
1195 query_source_layer: None,
1196 feature_provenance: Vec::new(),
1197 projection: CameraProjection::WebMercator,
1198 terrain_samples: Vec::new(),
1199 });
1200
1201 for _ in 0..100 {
1202 pipeline.poll_vector();
1203 if !pipeline.has_pending_tasks() {
1204 break;
1205 }
1206 std::thread::sleep(std::time::Duration::from_millis(10));
1207 }
1208
1209 let (meshes, _candidates) = pipeline.collect_vectors(&[key]);
1210 assert_eq!(meshes.len(), 1);
1211 assert!(meshes[0].vertex_count() > 0);
1212 }
1213
1214 #[test]
1215 fn duplicate_dispatch_is_deduplicated() {
1216 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1217 let mut pipeline = AsyncDataPipeline::new(pool);
1218
1219 let tile = TileId::new(0, 0, 0);
1220 let elev = ElevationGrid::flat(tile, 4, 4);
1221 let input = TerrainTaskInput {
1222 tile,
1223 elevation_source_tile: tile,
1224 elevation_region: TileTextureRegion::FULL,
1225 elevation: elev,
1226 resolution: 4,
1227 vertical_exaggeration: 1.0,
1228 generation: 1,
1229 };
1230
1231 pipeline.dispatch_terrain(input.clone());
1232 pipeline.dispatch_terrain(input.clone());
1233 assert_eq!(pipeline.pending_terrain.len(), 1);
1234 }
1235
1236 #[test]
1237 fn cached_result_prevents_redispatch() {
1238 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1239 let mut pipeline = AsyncDataPipeline::new(pool);
1240
1241 let tile = TileId::new(0, 0, 0);
1242 let elev = ElevationGrid::flat(tile, 4, 4);
1243 let input = TerrainTaskInput {
1244 tile,
1245 elevation_source_tile: tile,
1246 elevation_region: TileTextureRegion::FULL,
1247 elevation: elev,
1248 resolution: 4,
1249 vertical_exaggeration: 1.0,
1250 generation: 1,
1251 };
1252
1253 pipeline.dispatch_terrain(input.clone());
1254
1255 for _ in 0..100 {
1256 pipeline.poll_terrain();
1257 if !pipeline.has_pending_tasks() {
1258 break;
1259 }
1260 std::thread::sleep(std::time::Duration::from_millis(10));
1261 }
1262
1263 pipeline.dispatch_terrain(input);
1264 assert!(pipeline.pending_terrain.is_empty());
1265 }
1266
1267 #[test]
1268 fn prune_terrain_removes_stale_entries() {
1269 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1270 let mut pipeline = AsyncDataPipeline::new(pool);
1271
1272 let tile_a = TileId::new(1, 0, 0);
1273 let tile_b = TileId::new(1, 1, 0);
1274 let elev_a = ElevationGrid::flat(tile_a, 4, 4);
1275 let elev_b = ElevationGrid::flat(tile_b, 4, 4);
1276 let mesh_a = crate::terrain::build_terrain_descriptor(&tile_a, &elev_a, 4, 1.0, 1);
1277 let mesh_b = crate::terrain::build_terrain_descriptor(&tile_b, &elev_b, 4, 1.0, 1);
1278 let hs_a = crate::terrain::prepare_hillshade_raster(&elev_a, 1.0, 1);
1279 let hs_b = crate::terrain::prepare_hillshade_raster(&elev_b, 1.0, 1);
1280
1281 pipeline.terrain_cache.insert(
1282 TerrainCacheKey {
1283 tile: tile_a,
1284 generation: 1,
1285 resolution: 4,
1286 },
1287 (mesh_a, hs_a),
1288 );
1289 pipeline.terrain_cache.insert(
1290 TerrainCacheKey {
1291 tile: tile_b,
1292 generation: 1,
1293 resolution: 4,
1294 },
1295 (mesh_b, hs_b),
1296 );
1297
1298 assert_eq!(pipeline.terrain_cache.len(), 2);
1299 pipeline.prune_terrain(&[tile_a]);
1300 assert_eq!(pipeline.terrain_cache.len(), 1);
1301 assert!(pipeline.terrain_cache.keys().all(|k| k.tile == tile_a));
1302 }
1303
1304 #[test]
1305 fn synchronous_fallback_path_still_works() {
1306 let mut state = crate::MapState::new();
1307 state.set_viewport(800, 600);
1308 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1309 state.set_camera_distance(1_000.0);
1310 state.update();
1311 assert!(state.zoom_level() <= 22);
1312 }
1313
1314 #[test]
1315 fn layer_generation_tracking() {
1316 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1317 let mut pipeline = AsyncDataPipeline::new(pool);
1318
1319 let gen1 = pipeline.layer_generation("layer-a", false);
1320 let gen2 = pipeline.layer_generation("layer-a", false);
1321 assert_eq!(gen1, gen2);
1322
1323 let gen3 = pipeline.layer_generation("layer-a", true);
1324 assert!(gen3 > gen2);
1325
1326 let gen4 = pipeline.layer_generation("layer-a", false);
1327 assert_eq!(gen3, gen4);
1328 }
1329
1330 #[test]
1331 fn collect_all_terrain_returns_cached_entries() {
1332 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1333 let mut pipeline = AsyncDataPipeline::new(pool);
1334
1335 let tile = TileId::new(0, 0, 0);
1336 let elev = ElevationGrid::flat(tile, 4, 4);
1337 pipeline.dispatch_terrain(TerrainTaskInput {
1338 tile,
1339 elevation_source_tile: tile,
1340 elevation_region: TileTextureRegion::FULL,
1341 elevation: elev,
1342 resolution: 4,
1343 vertical_exaggeration: 1.0,
1344 generation: 1,
1345 });
1346
1347 for _ in 0..100 {
1348 pipeline.poll_terrain();
1349 if !pipeline.has_pending_tasks() {
1350 break;
1351 }
1352 std::thread::sleep(std::time::Duration::from_millis(10));
1353 }
1354
1355 let (meshes, hillshades) = pipeline.collect_all_terrain();
1356 assert_eq!(meshes.len(), 1);
1357 assert_eq!(hillshades.len(), 1);
1358 assert_eq!(meshes[0].tile, tile);
1359 }
1360
1361 #[test]
1362 fn async_pipeline_terrain_via_map_state() {
1363 use crate::terrain::{FlatElevationSource, TerrainConfig};
1364
1365 let config = TerrainConfig {
1366 enabled: true,
1367 mesh_resolution: 4,
1368 source: Box::new(FlatElevationSource::new(4, 4)),
1369 ..TerrainConfig::default()
1370 };
1371 let mut state = crate::MapState::with_terrain(config, 100);
1372 state.set_viewport(800, 600);
1373 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1374 state.set_camera_distance(10_000_000.0);
1375
1376 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1377 state.set_task_pool(pool);
1378 assert!(state.has_async_pipeline());
1379
1380 state.update_with_dt(1.0 / 60.0);
1382
1383 for _ in 0..100 {
1385 std::thread::sleep(std::time::Duration::from_millis(10));
1386 state.update_with_dt(1.0 / 60.0);
1387 if !state.terrain_meshes().is_empty() {
1388 break;
1389 }
1390 }
1391
1392 assert!(
1393 !state.terrain_meshes().is_empty(),
1394 "async pipeline should produce terrain meshes"
1395 );
1396 }
1397
1398 #[test]
1399 fn async_pipeline_vectors_via_map_state() {
1400 use crate::layers::{VectorLayer, VectorStyle};
1401
1402 let mut state = crate::MapState::new();
1403 state.set_viewport(800, 600);
1404 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1405 state.set_camera_distance(1_000.0);
1406
1407 let fc = crate::geometry::FeatureCollection {
1408 features: vec![Feature {
1409 geometry: Geometry::Point(Point {
1410 coord: GeoCoord::from_lat_lon(0.0, 0.0),
1411 }),
1412 properties: HashMap::new(),
1413 }],
1414 };
1415 let vl = VectorLayer::new("test-vec", fc, VectorStyle::default());
1416 state.push_layer(Box::new(vl));
1417
1418 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1419 state.set_task_pool(pool);
1420
1421 state.update_with_dt(1.0 / 60.0);
1423
1424 for _ in 0..100 {
1426 std::thread::sleep(std::time::Duration::from_millis(10));
1427 state.update_with_dt(1.0 / 60.0);
1428 if !state.vector_meshes().is_empty() {
1429 break;
1430 }
1431 }
1432
1433 assert!(
1434 !state.vector_meshes().is_empty(),
1435 "async pipeline should produce vector meshes"
1436 );
1437 }
1438
1439 #[test]
1440 fn async_pipeline_clear_reverts_to_sync() {
1441 let mut state = crate::MapState::new();
1442 state.set_viewport(800, 600);
1443 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1444 state.set_camera_distance(1_000.0);
1445
1446 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1447 state.set_task_pool(pool);
1448 assert!(state.has_async_pipeline());
1449
1450 state.clear_task_pool();
1451 assert!(!state.has_async_pipeline());
1452
1453 state.update();
1455 assert!(state.zoom_level() <= 22);
1456 }
1457
1458 #[test]
1459 fn data_update_interval_ignored_with_async_pipeline() {
1460 use crate::terrain::{FlatElevationSource, TerrainConfig};
1461
1462 let config = TerrainConfig {
1463 enabled: true,
1464 mesh_resolution: 4,
1465 source: Box::new(FlatElevationSource::new(4, 4)),
1466 ..TerrainConfig::default()
1467 };
1468 let mut state = crate::MapState::with_terrain(config, 100);
1469 state.set_viewport(800, 600);
1470 state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
1471 state.set_camera_distance(10_000_000.0);
1472 state.set_data_update_interval(100.0); let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1475 state.set_task_pool(pool);
1476
1477 for _ in 0..100 {
1480 state.update_with_dt(1.0 / 60.0);
1481 std::thread::sleep(std::time::Duration::from_millis(10));
1482 if !state.terrain_meshes().is_empty() {
1483 break;
1484 }
1485 }
1486
1487 assert!(
1488 !state.terrain_meshes().is_empty(),
1489 "async pipeline should produce terrain despite high throttle"
1490 );
1491 }
1492
1493 #[test]
1498 fn partition_features_by_tile_separates_spatially() {
1499 let features = FeatureCollection {
1500 features: vec![
1501 Feature {
1502 geometry: Geometry::Point(Point {
1503 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1504 }),
1505 properties: HashMap::new(),
1506 },
1507 Feature {
1508 geometry: Geometry::Point(Point {
1509 coord: GeoCoord::from_lat_lon(-30.0, -60.0),
1510 }),
1511 properties: HashMap::new(),
1512 },
1513 Feature {
1514 geometry: Geometry::Point(Point {
1515 coord: GeoCoord::from_lat_lon(10.01, 10.01),
1516 }),
1517 properties: HashMap::new(),
1518 },
1519 ],
1520 };
1521
1522 let buckets = partition_features_by_tile(&features, 4);
1523 let total: usize = buckets.values().map(|fc| fc.len()).sum();
1524 assert_eq!(total, 3, "all features should be partitioned");
1525 let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
1527 let tile_c = geo_to_tile(&GeoCoord::from_lat_lon(10.01, 10.01), 4).tile_id();
1528 assert_eq!(tile_a, tile_c);
1529 assert_eq!(buckets.get(&tile_a).map(|fc| fc.len()), Some(2));
1530 }
1531
1532 #[test]
1533 fn partition_features_handles_linestring_and_polygon() {
1534 let features = FeatureCollection {
1535 features: vec![
1536 Feature {
1537 geometry: Geometry::LineString(LineString {
1538 coords: vec![
1539 GeoCoord::from_lat_lon(45.0, 90.0),
1540 GeoCoord::from_lat_lon(46.0, 91.0),
1541 ],
1542 }),
1543 properties: HashMap::new(),
1544 },
1545 Feature {
1546 geometry: Geometry::Polygon(Polygon {
1547 exterior: vec![
1548 GeoCoord::from_lat_lon(-10.0, -20.0),
1549 GeoCoord::from_lat_lon(-10.0, -19.0),
1550 GeoCoord::from_lat_lon(-9.0, -19.0),
1551 ],
1552 interiors: vec![],
1553 }),
1554 properties: HashMap::new(),
1555 },
1556 ],
1557 };
1558
1559 let buckets = partition_features_by_tile(&features, 6);
1560 let total: usize = buckets.values().map(|fc| fc.len()).sum();
1561 assert_eq!(total, 2);
1562 }
1563
1564 #[test]
1565 fn partition_features_drops_empty_geometry() {
1566 let features = FeatureCollection {
1567 features: vec![Feature {
1568 geometry: Geometry::GeometryCollection(vec![]),
1569 properties: HashMap::new(),
1570 }],
1571 };
1572
1573 let buckets = partition_features_by_tile(&features, 4);
1574 assert!(
1575 buckets.is_empty(),
1576 "empty geometry should not produce a bucket"
1577 );
1578 }
1579
1580 #[test]
1581 fn dispatch_and_poll_vector_buckets() {
1582 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1583 let mut pipeline = AsyncDataPipeline::new(pool);
1584
1585 let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
1586 let tile_b = geo_to_tile(&GeoCoord::from_lat_lon(-30.0, -60.0), 4).tile_id();
1587
1588 let features = FeatureCollection {
1589 features: vec![
1590 Feature {
1591 geometry: Geometry::Point(Point {
1592 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1593 }),
1594 properties: HashMap::new(),
1595 },
1596 Feature {
1597 geometry: Geometry::Point(Point {
1598 coord: GeoCoord::from_lat_lon(-30.0, -60.0),
1599 }),
1600 properties: HashMap::new(),
1601 },
1602 ],
1603 };
1604
1605 let tile_features = partition_features_by_tile(&features, 4);
1606 assert!(tile_features.len() >= 2);
1607
1608 pipeline.dispatch_vector_buckets(
1609 "test-layer",
1610 1,
1611 CameraProjection::WebMercator,
1612 &VectorStyle::default(),
1613 &tile_features,
1614 None,
1615 None,
1616 &HashMap::new(),
1617 &[],
1618 );
1619
1620 for _ in 0..100 {
1621 pipeline.poll_vector_buckets();
1622 if !pipeline.has_pending_tasks() {
1623 break;
1624 }
1625 std::thread::sleep(std::time::Duration::from_millis(10));
1626 }
1627
1628 let (meshes_a, _) = pipeline.collect_vector_buckets(
1630 "test-layer",
1631 1,
1632 CameraProjection::WebMercator,
1633 &[tile_a],
1634 );
1635 assert_eq!(meshes_a.len(), 1, "should have one mesh for tile_a");
1636
1637 let (meshes_b, _) = pipeline.collect_vector_buckets(
1639 "test-layer",
1640 1,
1641 CameraProjection::WebMercator,
1642 &[tile_b],
1643 );
1644 assert_eq!(meshes_b.len(), 1, "should have one mesh for tile_b");
1645
1646 let (meshes_both, _) = pipeline.collect_vector_buckets(
1648 "test-layer",
1649 1,
1650 CameraProjection::WebMercator,
1651 &[tile_a, tile_b],
1652 );
1653 assert_eq!(meshes_both.len(), 2, "should have meshes for both tiles");
1654 }
1655
1656 #[test]
1657 fn bucket_dispatch_deduplicates() {
1658 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1659 let mut pipeline = AsyncDataPipeline::new(pool);
1660
1661 let features = FeatureCollection {
1662 features: vec![Feature {
1663 geometry: Geometry::Point(Point {
1664 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1665 }),
1666 properties: HashMap::new(),
1667 }],
1668 };
1669 let tile_features = partition_features_by_tile(&features, 4);
1670
1671 pipeline.dispatch_vector_buckets(
1672 "layer",
1673 1,
1674 CameraProjection::WebMercator,
1675 &VectorStyle::default(),
1676 &tile_features,
1677 None,
1678 None,
1679 &HashMap::new(),
1680 &[],
1681 );
1682 let first_count = pipeline.pending_buckets.len();
1683
1684 pipeline.dispatch_vector_buckets(
1685 "layer",
1686 1,
1687 CameraProjection::WebMercator,
1688 &VectorStyle::default(),
1689 &tile_features,
1690 None,
1691 None,
1692 &HashMap::new(),
1693 &[],
1694 );
1695 assert_eq!(
1696 pipeline.pending_buckets.len(),
1697 first_count,
1698 "duplicate dispatch should be deduped"
1699 );
1700 }
1701
1702 #[test]
1703 fn bucket_cache_hit_prevents_redispatch() {
1704 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1705 let mut pipeline = AsyncDataPipeline::new(pool);
1706
1707 let features = FeatureCollection {
1708 features: vec![Feature {
1709 geometry: Geometry::Point(Point {
1710 coord: GeoCoord::from_lat_lon(10.0, 10.0),
1711 }),
1712 properties: HashMap::new(),
1713 }],
1714 };
1715 let tile_features = partition_features_by_tile(&features, 4);
1716
1717 pipeline.dispatch_vector_buckets(
1718 "layer",
1719 1,
1720 CameraProjection::WebMercator,
1721 &VectorStyle::default(),
1722 &tile_features,
1723 None,
1724 None,
1725 &HashMap::new(),
1726 &[],
1727 );
1728
1729 for _ in 0..100 {
1730 pipeline.poll_vector_buckets();
1731 if !pipeline.has_pending_tasks() {
1732 break;
1733 }
1734 std::thread::sleep(std::time::Duration::from_millis(10));
1735 }
1736
1737 assert!(pipeline.bucket_cache_len() > 0);
1738
1739 pipeline.dispatch_vector_buckets(
1740 "layer",
1741 1,
1742 CameraProjection::WebMercator,
1743 &VectorStyle::default(),
1744 &tile_features,
1745 None,
1746 None,
1747 &HashMap::new(),
1748 &[],
1749 );
1750 assert!(
1751 pipeline.pending_buckets.is_empty(),
1752 "cached result should prevent redispatch"
1753 );
1754 }
1755
1756 #[test]
1757 fn prune_vector_buckets_removes_inactive_layers() {
1758 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1759 let mut pipeline = AsyncDataPipeline::new(pool);
1760
1761 let tile = TileId::new(4, 0, 0);
1762 let key_a = VectorBucketKey {
1763 layer_id: "keep".into(),
1764 tile,
1765 data_generation: 1,
1766 projection: CameraProjection::WebMercator,
1767 };
1768 let key_b = VectorBucketKey {
1769 layer_id: "remove".into(),
1770 tile,
1771 data_generation: 1,
1772 projection: CameraProjection::WebMercator,
1773 };
1774 pipeline
1775 .bucket_cache
1776 .insert(key_a, (VectorMeshData::default(), vec![]));
1777 pipeline
1778 .bucket_cache
1779 .insert(key_b, (VectorMeshData::default(), vec![]));
1780 assert_eq!(pipeline.bucket_cache_len(), 2);
1781
1782 pipeline.prune_vector_buckets(&["keep".to_string()]);
1783 assert_eq!(pipeline.bucket_cache_len(), 1);
1784 assert!(pipeline.bucket_cache.keys().all(|k| k.layer_id == "keep"));
1785 }
1786
1787 #[test]
1788 fn prune_vector_buckets_for_layer_removes_offscreen_tiles() {
1789 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1790 let mut pipeline = AsyncDataPipeline::new(pool);
1791
1792 let tile_a = TileId::new(4, 0, 0);
1793 let tile_b = TileId::new(4, 1, 0);
1794 let tile_c = TileId::new(4, 2, 0);
1795
1796 for tile in &[tile_a, tile_b, tile_c] {
1797 pipeline.bucket_cache.insert(
1798 VectorBucketKey {
1799 layer_id: "layer".into(),
1800 tile: *tile,
1801 data_generation: 1,
1802 projection: CameraProjection::WebMercator,
1803 },
1804 (VectorMeshData::default(), vec![]),
1805 );
1806 }
1807 assert_eq!(pipeline.bucket_cache_len(), 3);
1808
1809 pipeline.prune_vector_buckets_for_layer("layer", &[tile_a, tile_c]);
1810 assert_eq!(pipeline.bucket_cache_len(), 2);
1811 assert!(pipeline
1812 .bucket_cache
1813 .keys()
1814 .all(|k| k.tile == tile_a || k.tile == tile_c));
1815 }
1816
1817 #[test]
1818 fn collect_vector_buckets_only_returns_visible() {
1819 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1820 let mut pipeline = AsyncDataPipeline::new(pool);
1821
1822 let tile_visible = TileId::new(4, 5, 5);
1823 let tile_offscreen = TileId::new(4, 10, 10);
1824
1825 let mesh = VectorMeshData {
1826 positions: vec![[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
1827 colors: vec![[1.0; 4]; 3],
1828 indices: vec![0, 1, 2],
1829 ..Default::default()
1830 };
1831 for tile in &[tile_visible, tile_offscreen] {
1832 pipeline.bucket_cache.insert(
1833 VectorBucketKey {
1834 layer_id: "layer".into(),
1835 tile: *tile,
1836 data_generation: 1,
1837 projection: CameraProjection::WebMercator,
1838 },
1839 (mesh.clone(), vec![]),
1840 );
1841 }
1842
1843 let (meshes, _) = pipeline.collect_vector_buckets(
1844 "layer",
1845 1,
1846 CameraProjection::WebMercator,
1847 &[tile_visible],
1848 );
1849 assert_eq!(
1850 meshes.len(),
1851 1,
1852 "only visible tile bucket should be collected"
1853 );
1854 }
1855
1856 #[test]
1857 fn representative_coord_all_geometry_types() {
1858 let p = Geometry::Point(Point {
1859 coord: GeoCoord::from_lat_lon(1.0, 2.0),
1860 });
1861 assert_eq!(
1862 representative_coord(&p).map(|c| (c.lat, c.lon)),
1863 Some((1.0, 2.0))
1864 );
1865
1866 let ls = Geometry::LineString(LineString {
1867 coords: vec![
1868 GeoCoord::from_lat_lon(3.0, 4.0),
1869 GeoCoord::from_lat_lon(5.0, 6.0),
1870 ],
1871 });
1872 assert_eq!(
1873 representative_coord(&ls).map(|c| (c.lat, c.lon)),
1874 Some((3.0, 4.0))
1875 );
1876
1877 let poly = Geometry::Polygon(Polygon {
1878 exterior: vec![
1879 GeoCoord::from_lat_lon(7.0, 8.0),
1880 GeoCoord::from_lat_lon(7.0, 9.0),
1881 GeoCoord::from_lat_lon(8.0, 9.0),
1882 ],
1883 interiors: vec![],
1884 });
1885 assert_eq!(
1886 representative_coord(&poly).map(|c| (c.lat, c.lon)),
1887 Some((7.0, 8.0))
1888 );
1889
1890 let gc = Geometry::GeometryCollection(vec![]);
1891 assert!(representative_coord(&gc).is_none());
1892
1893 let gc_with = Geometry::GeometryCollection(vec![p.clone()]);
1894 assert!(representative_coord(&gc_with).is_some());
1895 }
1896
1897 #[test]
1898 fn dispatch_and_poll_decode_produces_decoded_tile() {
1899 use crate::tile_source::{RawVectorPayload, TileFreshness};
1900 use std::sync::Arc;
1901
1902 fn build_test_mvt() -> Vec<u8> {
1904 fn encode_varint(mut val: u64) -> Vec<u8> {
1905 let mut buf = Vec::new();
1906 loop {
1907 let mut byte = (val & 0x7F) as u8;
1908 val >>= 7;
1909 if val != 0 {
1910 byte |= 0x80;
1911 }
1912 buf.push(byte);
1913 if val == 0 {
1914 break;
1915 }
1916 }
1917 buf
1918 }
1919 fn encode_tag(field: u32, wt: u8) -> Vec<u8> {
1920 encode_varint(((field as u64) << 3) | wt as u64)
1921 }
1922 fn encode_ld(field: u32, data: &[u8]) -> Vec<u8> {
1923 let mut b = encode_tag(field, 2);
1924 b.extend(encode_varint(data.len() as u64));
1925 b.extend_from_slice(data);
1926 b
1927 }
1928 fn encode_vi(field: u32, val: u64) -> Vec<u8> {
1929 let mut b = encode_tag(field, 0);
1930 b.extend(encode_varint(val));
1931 b
1932 }
1933 fn zigzag(n: i32) -> u32 {
1934 ((n << 1) ^ (n >> 31)) as u32
1935 }
1936
1937 let mut geom = Vec::new();
1938 geom.extend(encode_varint(((1u64) << 3) | 1));
1939 geom.extend(encode_varint(zigzag(2048) as u64));
1940 geom.extend(encode_varint(zigzag(2048) as u64));
1941
1942 let mut feat = Vec::new();
1943 feat.extend(encode_vi(3, 1));
1944 feat.extend(encode_ld(4, &geom));
1945
1946 let mut layer = Vec::new();
1947 layer.extend(encode_ld(1, b"test"));
1948 layer.extend(encode_ld(2, &feat));
1949 layer.extend(encode_vi(5, 4096));
1950 layer.extend(encode_vi(15, 2));
1951
1952 encode_ld(3, &layer)
1953 }
1954
1955 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1956 let mut pipeline = AsyncDataPipeline::new(pool);
1957
1958 let tile = TileId::new(0, 0, 0);
1959 let raw = RawVectorPayload {
1960 tile_id: tile,
1961 bytes: Arc::new(build_test_mvt()),
1962 decode_options: crate::mvt::MvtDecodeOptions::default(),
1963 };
1964
1965 pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
1966
1967 for _ in 0..50 {
1969 pipeline.poll_decodes();
1970 if !pipeline.decoded_tiles.is_empty() {
1971 break;
1972 }
1973 std::thread::sleep(std::time::Duration::from_millis(10));
1974 }
1975
1976 let decoded = pipeline.take_decoded_tiles();
1977 assert_eq!(decoded.len(), 1, "should have one decoded tile");
1978 let (decoded_id, response) = &decoded[0];
1979 assert_eq!(*decoded_id, tile);
1980 assert!(response.data.is_vector(), "decoded data should be Vector");
1981 let vt = response.data.as_vector().expect("should be Vector");
1982 assert!(vt.layers.contains_key("test"));
1983 assert_eq!(vt.layers["test"].len(), 1);
1984 }
1985
1986 #[test]
1987 fn dispatch_decode_deduplicates() {
1988 use crate::tile_source::{RawVectorPayload, TileFreshness};
1989 use std::sync::Arc;
1990
1991 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
1992 let mut pipeline = AsyncDataPipeline::new(pool);
1993
1994 let tile = TileId::new(0, 0, 0);
1995 let raw = RawVectorPayload {
1996 tile_id: tile,
1997 bytes: Arc::new(vec![]),
1998 decode_options: crate::mvt::MvtDecodeOptions::default(),
1999 };
2000
2001 pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
2002 pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
2003
2004 assert_eq!(pipeline.pending_decodes.len(), 1);
2006 }
2007
2008 #[test]
2013 fn async_viz_pipeline_dispatches_and_receives_scalar_update() {
2014 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2015 let mut viz = AsyncVisualizationPipeline::new(pool);
2016
2017 viz.dispatch(Box::new(|| VisualizationTaskOutput::ScalarFieldUpdate {
2018 layer_name: "density".to_string(),
2019 values: vec![1.0, 2.0, 3.0, 4.0],
2020 }));
2021
2022 assert_eq!(viz.pending_count(), 1);
2023
2024 let mut results = Vec::new();
2026 for _ in 0..200 {
2027 results = viz.poll();
2028 if !results.is_empty() {
2029 break;
2030 }
2031 std::thread::sleep(std::time::Duration::from_millis(5));
2032 }
2033
2034 assert_eq!(results.len(), 1);
2035 match &results[0] {
2036 VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values } => {
2037 assert_eq!(layer_name, "density");
2038 assert_eq!(values, &[1.0, 2.0, 3.0, 4.0]);
2039 }
2040 _ => panic!("expected ScalarFieldUpdate"),
2041 }
2042 assert_eq!(viz.pending_count(), 0);
2043 }
2044
2045 #[test]
2046 fn async_viz_pipeline_dispatches_and_receives_column_update() {
2047 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2048 let mut viz = AsyncVisualizationPipeline::new(pool);
2049
2050 viz.dispatch(Box::new(|| VisualizationTaskOutput::ColumnUpdate {
2051 layer_name: "bars".to_string(),
2052 columns: crate::visualization::ColumnInstanceSet::new(vec![
2053 crate::visualization::ColumnInstance::new(
2054 GeoCoord::from_lat_lon(0.0, 0.0),
2055 100.0,
2056 10.0,
2057 ),
2058 ]),
2059 ramp: crate::visualization::ColorRamp::new(vec![
2060 crate::visualization::ColorStop {
2061 value: 0.0,
2062 color: [0.0, 0.0, 1.0, 1.0],
2063 },
2064 crate::visualization::ColorStop {
2065 value: 1.0,
2066 color: [1.0, 0.0, 0.0, 1.0],
2067 },
2068 ]),
2069 }));
2070
2071 let mut results = Vec::new();
2072 for _ in 0..200 {
2073 results = viz.poll();
2074 if !results.is_empty() {
2075 break;
2076 }
2077 std::thread::sleep(std::time::Duration::from_millis(5));
2078 }
2079
2080 assert_eq!(results.len(), 1);
2081 match &results[0] {
2082 VisualizationTaskOutput::ColumnUpdate {
2083 layer_name,
2084 columns,
2085 ..
2086 } => {
2087 assert_eq!(layer_name, "bars");
2088 assert_eq!(columns.columns.len(), 1);
2089 }
2090 _ => panic!("expected ColumnUpdate"),
2091 }
2092 }
2093
2094 #[test]
2095 fn async_viz_pipeline_dispatches_and_receives_point_cloud_update() {
2096 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2097 let mut viz = AsyncVisualizationPipeline::new(pool);
2098
2099 viz.dispatch(Box::new(|| VisualizationTaskOutput::PointCloudUpdate {
2100 layer_name: "scatter".to_string(),
2101 points: crate::visualization::PointInstanceSet::new(vec![
2102 crate::visualization::PointInstance::new(GeoCoord::from_lat_lon(1.0, 2.0), 5.0),
2103 crate::visualization::PointInstance::new(GeoCoord::from_lat_lon(3.0, 4.0), 8.0),
2104 ]),
2105 ramp: crate::visualization::ColorRamp::new(vec![
2106 crate::visualization::ColorStop {
2107 value: 0.0,
2108 color: [0.0, 1.0, 0.0, 1.0],
2109 },
2110 crate::visualization::ColorStop {
2111 value: 1.0,
2112 color: [1.0, 1.0, 0.0, 1.0],
2113 },
2114 ]),
2115 }));
2116
2117 let mut results = Vec::new();
2118 for _ in 0..200 {
2119 results = viz.poll();
2120 if !results.is_empty() {
2121 break;
2122 }
2123 std::thread::sleep(std::time::Duration::from_millis(5));
2124 }
2125
2126 assert_eq!(results.len(), 1);
2127 match &results[0] {
2128 VisualizationTaskOutput::PointCloudUpdate {
2129 layer_name, points, ..
2130 } => {
2131 assert_eq!(layer_name, "scatter");
2132 assert_eq!(points.points.len(), 2);
2133 }
2134 _ => panic!("expected PointCloudUpdate"),
2135 }
2136 }
2137
2138 #[test]
2139 fn async_viz_pipeline_multiple_tasks_complete_independently() {
2140 let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
2141 let mut viz = AsyncVisualizationPipeline::new(pool);
2142
2143 for i in 0..5 {
2144 let name = format!("layer_{i}");
2145 viz.dispatch(Box::new(move || {
2146 VisualizationTaskOutput::ScalarFieldUpdate {
2147 layer_name: name,
2148 values: vec![i as f32; 4],
2149 }
2150 }));
2151 }
2152
2153 assert_eq!(viz.pending_count(), 5);
2154
2155 let mut all_results = Vec::new();
2156 for _ in 0..200 {
2157 let batch = viz.poll();
2158 all_results.extend(batch);
2159 if all_results.len() >= 5 {
2160 break;
2161 }
2162 std::thread::sleep(std::time::Duration::from_millis(10));
2163 }
2164
2165 assert_eq!(all_results.len(), 5);
2166 assert_eq!(viz.pending_count(), 0);
2167 }
2168}