use crate::camera_projection::CameraProjection;
use crate::geometry::{Feature, FeatureCollection, Geometry};
use crate::layers::{FeatureProvenance, VectorMeshData, VectorStyle};
use crate::symbols::SymbolCandidate;
use crate::terrain::{PreparedHillshadeRaster, TerrainMeshData};
use crate::tile_manager::TileTextureRegion;
use rustial_math::{geo_to_tile, ElevationGrid, GeoCoord, TileId};
use std::collections::HashMap;
use std::sync::Arc;
/// Pollable handle to the output of a task that runs elsewhere.
///
/// `T` is the value the task produces. Implementors must be `Send + Sync`.
pub trait DataTaskResultReceiver<T>: Send + Sync
where
    T: Send + 'static,
{
    /// Returns the task output if one can be obtained right now, without
    /// blocking; `None` otherwise.
    fn try_recv(&self) -> Option<T>;
}
/// Executor for the pipeline's background closures.
///
/// Each method takes a boxed one-shot closure and returns a receiver that can
/// be polled for the closure's output. The trait is consumed as
/// `Arc<dyn DataTaskPool>`, and every output type has its own non-generic
/// entry point.
pub trait DataTaskPool: Send + Sync {
    /// Schedules a closure that produces a [`TerrainTaskOutput`].
    fn spawn_terrain(
        &self,
        task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>;

    /// Schedules a closure that produces a [`VectorTaskOutput`].
    fn spawn_vector(
        &self,
        task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>>;

    /// Schedules a closure that produces an [`MvtDecodeOutput`].
    fn spawn_decode(
        &self,
        task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>;
}
/// Stateless [`DataTaskPool`] whose tasks are each run on a freshly spawned
/// OS thread.
///
/// The type carries no data, so it is trivially copyable and its default
/// value equals [`ThreadDataTaskPool::new`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ThreadDataTaskPool;

impl ThreadDataTaskPool {
    /// Creates the pool. No resources are acquired until a task is spawned.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
/// Receiving end of the one-shot channel fed by a worker thread.
///
/// `std::sync::mpsc::Receiver` is `Send` but deliberately not `Sync`, while
/// [`DataTaskResultReceiver`] requires `Sync`. The receiver is kept behind a
/// `Mutex` so the wrapper is `Sync` through safe code alone; no `unsafe impl`
/// is needed.
struct ChannelReceiver<T> {
    rx: std::sync::Mutex<std::sync::mpsc::Receiver<T>>,
}

impl<T: Send + 'static> DataTaskResultReceiver<T> for ChannelReceiver<T> {
    fn try_recv(&self) -> Option<T> {
        // A poisoned lock only records that some other caller panicked while
        // holding the guard; the receiver inside is still usable.
        let rx = self
            .rx
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // "Nothing yet" and "sender dropped" both surface as `None`.
        // NOTE(review): a task that panics drops its sender without sending,
        // so callers will see `None` forever for that task.
        rx.try_recv().ok()
    }
}

/// Runs `task` on a newly spawned OS thread and returns a receiver that
/// yields the task's return value once it has finished.
fn spawn_on_thread<T: Send + 'static>(
    task: Box<dyn FnOnce() -> T + Send + 'static>,
) -> Box<dyn DataTaskResultReceiver<T>> {
    let (tx, rx) = std::sync::mpsc::channel();
    std::thread::spawn(move || {
        // Sending fails only when the receiver was dropped, in which case
        // nobody wants the result any more.
        let _ = tx.send(task());
    });
    Box::new(ChannelReceiver {
        rx: std::sync::Mutex::new(rx),
    })
}
/// Every task kind is handled the same way: the closure is handed to
/// `spawn_on_thread`, which runs it on its own thread.
impl DataTaskPool for ThreadDataTaskPool {
    fn spawn_terrain(
        &self,
        task: Box<dyn FnOnce() -> TerrainTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<TerrainTaskOutput>> {
        let receiver = spawn_on_thread(task);
        receiver
    }

    fn spawn_vector(
        &self,
        task: Box<dyn FnOnce() -> VectorTaskOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<VectorTaskOutput>> {
        let receiver = spawn_on_thread(task);
        receiver
    }

    fn spawn_decode(
        &self,
        task: Box<dyn FnOnce() -> MvtDecodeOutput + Send + 'static>,
    ) -> Box<dyn DataTaskResultReceiver<MvtDecodeOutput>> {
        let receiver = spawn_on_thread(task);
        receiver
    }
}
/// Everything a background terrain task needs to build one tile's mesh and
/// hillshade raster.
#[derive(Clone)]
pub struct TerrainTaskInput {
    /// Tile the mesh is built for; also part of the cache key.
    pub tile: TileId,
    /// Passed to the mesh builder together with `elevation_region`.
    pub elevation_source_tile: TileId,
    /// Passed to the mesh builder together with `elevation_source_tile`.
    pub elevation_region: TileTextureRegion,
    /// Elevation samples used for both the mesh and the hillshade.
    pub elevation: ElevationGrid,
    /// Mesh resolution handed to the mesh builder; also part of the cache key.
    pub resolution: u16,
    /// Vertical scale factor handed to the mesh and hillshade builders.
    pub vertical_exaggeration: f64,
    /// Generation stamp; part of the cache key and echoed in the output.
    pub generation: u64,
}
/// Result of a background terrain task.
pub struct TerrainTaskOutput {
    /// Tile the result belongs to (copied from the input).
    pub tile: TileId,
    /// Mesh built for the tile.
    pub mesh: TerrainMeshData,
    /// Hillshade raster prepared from the same elevation grid.
    pub hillshade: PreparedHillshadeRaster,
    /// Generation stamp copied from the input.
    pub generation: u64,
}
/// Identifies one cached terrain result: a tile at a given generation and
/// mesh resolution.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TerrainCacheKey {
    /// Tile the cached result belongs to.
    pub tile: TileId,
    /// Generation stamp of the input the result was built from.
    pub generation: u64,
    /// Mesh resolution the result was built with.
    pub resolution: u16,
}
/// Everything a background vector task needs to tessellate one layer.
#[derive(Clone)]
pub struct VectorTaskInput {
    /// Key used for deduplication and for storing the finished result.
    pub cache_key: VectorCacheKey,
    /// Features to tessellate.
    pub features: FeatureCollection,
    /// Style given to the temporary layer that performs the tessellation.
    pub style: VectorStyle,
    /// Layer id forwarded as query metadata.
    pub query_layer_id: Option<String>,
    /// Source id forwarded as query metadata.
    pub query_source_id: Option<String>,
    /// Source-layer name forwarded to the temporary layer.
    pub query_source_layer: Option<String>,
    /// Provenance records installed together with the features.
    pub feature_provenance: Vec<Option<FeatureProvenance>>,
    /// Projection the mesh is tessellated for.
    pub projection: CameraProjection,
    /// Coordinate/elevation pairs; when non-empty, matching vertices get
    /// their altitude overwritten before tessellation.
    pub terrain_samples: Vec<(GeoCoord, f64)>,
}
/// Result of a background vector tessellation task.
pub struct VectorTaskOutput {
    /// Key of the request that produced this result.
    pub cache_key: VectorCacheKey,
    /// Tessellated mesh.
    pub mesh: VectorMeshData,
    /// Symbol candidates reported by the temporary layer.
    pub symbol_candidates: Vec<SymbolCandidate>,
}
/// Result of a background MVT decode task.
pub struct MvtDecodeOutput {
    /// Tile the payload was decoded for.
    pub tile: TileId,
    /// Decoded vector data, or the decode failure.
    pub result: Result<crate::tile_source::VectorTileData, crate::tile_source::TileError>,
    /// Freshness value passed through unchanged from the dispatch call.
    pub freshness: crate::tile_source::TileFreshness,
}
/// Identifies one cached whole-layer tessellation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VectorCacheKey {
    /// Layer the mesh belongs to.
    pub layer_id: String,
    /// Generation of the layer's feature data.
    pub data_generation: u64,
    /// Projection the mesh was tessellated for.
    pub projection: CameraProjection,
}
/// Identifies one cached per-tile bucket of a layer's tessellation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct VectorBucketKey {
    /// Layer the bucket belongs to.
    pub layer_id: String,
    /// Tile the bucket covers.
    pub tile: TileId,
    /// Generation of the layer's feature data.
    pub data_generation: u64,
    /// Projection the bucket was tessellated for.
    pub projection: CameraProjection,
}
/// Picks the single coordinate that stands in for a geometry when it is
/// assigned to a tile.
///
/// This is always the first vertex of the first part (exterior ring for
/// polygons). For a collection it is the first member that yields a
/// coordinate. Returns `None` when that first part has no vertices.
fn representative_coord(geometry: &Geometry) -> Option<GeoCoord> {
    match geometry {
        Geometry::Point(point) => Some(point.coord),
        Geometry::LineString(line) => line.coords.first().copied(),
        Geometry::Polygon(polygon) => polygon.exterior.first().copied(),
        Geometry::MultiPoint(multi) => {
            let head = multi.points.first()?;
            Some(head.coord)
        }
        Geometry::MultiLineString(multi) => {
            let head = multi.lines.first()?;
            head.coords.first().copied()
        }
        Geometry::MultiPolygon(multi) => {
            let head = multi.polygons.first()?;
            head.exterior.first().copied()
        }
        Geometry::GeometryCollection(members) => {
            members.iter().find_map(representative_coord)
        }
    }
}
/// Groups features by the tile, at `zoom`, that contains each feature's
/// representative coordinate.
///
/// A feature lands in exactly one bucket even if its geometry spans several
/// tiles. Features without a representative coordinate are left out.
pub fn partition_features_by_tile(
    features: &FeatureCollection,
    zoom: u8,
) -> HashMap<TileId, FeatureCollection> {
    let mut grouped: HashMap<TileId, Vec<Feature>> = HashMap::new();
    let located = features.features.iter().filter_map(|feature| {
        representative_coord(&feature.geometry).map(|anchor| (anchor, feature))
    });
    for (anchor, feature) in located {
        let tile = geo_to_tile(&anchor, zoom).tile_id();
        grouped.entry(tile).or_default().push(feature.clone());
    }

    let mut partitioned = HashMap::with_capacity(grouped.len());
    for (tile, members) in grouped {
        partitioned.insert(tile, FeatureCollection { features: members });
    }
    partitioned
}
/// A terrain task that has been dispatched but whose result has not been
/// collected yet.
struct PendingTerrainTask {
    /// Cache slot the result will be stored under.
    key: TerrainCacheKey,
    /// Handle polled for the finished result.
    receiver: Box<dyn DataTaskResultReceiver<TerrainTaskOutput>>,
}
/// A whole-layer vector task that has been dispatched but whose result has
/// not been collected yet.
struct PendingVectorTask {
    /// Cache slot the result will be stored under.
    key: VectorCacheKey,
    /// Handle polled for the finished result.
    receiver: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
}
/// A per-tile vector bucket task that has been dispatched but whose result
/// has not been collected yet.
struct PendingVectorBucketTask {
    /// Cache slot the result will be stored under.
    key: VectorBucketKey,
    /// Handle polled for the finished result.
    receiver: Box<dyn DataTaskResultReceiver<VectorBucketOutput>>,
}
/// An MVT decode task that has been dispatched but whose result has not been
/// collected yet.
struct PendingDecodeTask {
    /// Tile being decoded; used to suppress duplicate dispatches.
    tile: TileId,
    /// Handle polled for the finished result.
    receiver: Box<dyn DataTaskResultReceiver<MvtDecodeOutput>>,
}
/// Result of a per-tile bucket task: a `VectorTaskOutput` without its key.
struct VectorBucketOutput {
    /// Tessellated mesh of the bucket.
    mesh: VectorMeshData,
    /// Symbol candidates of the bucket.
    symbol_candidates: Vec<SymbolCandidate>,
}
/// Dispatches terrain, vector and decode work to a [`DataTaskPool`] and
/// caches the results as they are polled back in.
pub(crate) struct AsyncDataPipeline {
    /// Executor every task is submitted to.
    pool: Arc<dyn DataTaskPool>,
    /// Terrain tasks still in flight.
    pending_terrain: Vec<PendingTerrainTask>,
    /// Whole-layer vector tasks still in flight.
    pending_vector: Vec<PendingVectorTask>,
    /// Finished terrain results.
    terrain_cache: HashMap<TerrainCacheKey, (TerrainMeshData, PreparedHillshadeRaster)>,
    /// Finished whole-layer vector results.
    vector_cache: HashMap<VectorCacheKey, (VectorMeshData, Vec<SymbolCandidate>)>,
    /// Next generation number to hand out; starts at 1.
    next_vector_generation: u64,
    /// Generation currently assigned to each layer id.
    layer_generations: HashMap<String, u64>,
    /// Per-tile bucket tasks still in flight.
    pending_buckets: Vec<PendingVectorBucketTask>,
    /// Finished per-tile bucket results.
    bucket_cache: HashMap<VectorBucketKey, (VectorMeshData, Vec<SymbolCandidate>)>,
    /// Decode tasks still in flight.
    pending_decodes: Vec<PendingDecodeTask>,
    /// Successfully decoded tiles waiting to be taken by the owner.
    pub(crate) decoded_tiles: Vec<(TileId, crate::tile_source::TileResponse)>,
}
impl AsyncDataPipeline {
/// Creates an empty pipeline that submits its work to `pool`.
///
/// All queues and caches start empty; the first vector generation handed
/// out is 1.
pub(crate) fn new(pool: Arc<dyn DataTaskPool>) -> Self {
    Self {
        pool,
        next_vector_generation: 1,
        layer_generations: HashMap::new(),
        // In-flight work.
        pending_terrain: Vec::new(),
        pending_vector: Vec::new(),
        pending_buckets: Vec::new(),
        pending_decodes: Vec::new(),
        // Finished results.
        terrain_cache: HashMap::new(),
        vector_cache: HashMap::new(),
        bucket_cache: HashMap::new(),
        decoded_tiles: Vec::new(),
    }
}
pub(crate) fn dispatch_terrain(&mut self, input: TerrainTaskInput) {
let key = TerrainCacheKey {
tile: input.tile,
generation: input.generation,
resolution: input.resolution,
};
if self.terrain_cache.contains_key(&key) {
return;
}
if self.pending_terrain.iter().any(|p| p.key == key) {
return;
}
let receiver = self.pool.spawn_terrain(Box::new(move || {
let mesh = crate::terrain::build_terrain_descriptor_with_source(
&input.tile,
input.elevation_source_tile,
input.elevation_region,
&input.elevation,
input.resolution,
input.vertical_exaggeration,
input.generation,
);
let hillshade = crate::terrain::prepare_hillshade_raster(
&input.elevation,
input.vertical_exaggeration,
input.generation,
);
TerrainTaskOutput {
tile: input.tile,
mesh,
hillshade,
generation: input.generation,
}
}));
self.pending_terrain
.push(PendingTerrainTask { key, receiver });
}
/// Moves every finished terrain task into the terrain cache.
///
/// Tasks without a result yet stay queued, in their original order. Never
/// blocks.
pub(crate) fn poll_terrain(&mut self) {
    let in_flight = std::mem::take(&mut self.pending_terrain);
    for task in in_flight {
        match task.receiver.try_recv() {
            Some(done) => {
                self.terrain_cache
                    .insert(task.key, (done.mesh, done.hillshade));
            }
            None => self.pending_terrain.push(task),
        }
    }
}
/// Clones the cached mesh/hillshade pairs for `desired_tiles`.
///
/// The lookup key for each tile uses its entry in `tile_generations`
/// (0 when absent) and `resolution`. Tiles without a cached result are
/// skipped, so the two returned vectors are equal in length but may be
/// shorter than `desired_tiles`.
#[allow(dead_code)]
pub(crate) fn collect_terrain(
    &self,
    desired_tiles: &[TileId],
    tile_generations: &HashMap<TileId, u64>,
    resolution: u16,
) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
    desired_tiles
        .iter()
        .filter_map(|tile| {
            let key = TerrainCacheKey {
                tile: *tile,
                generation: tile_generations.get(tile).copied().unwrap_or(0),
                resolution,
            };
            self.terrain_cache.get(&key).cloned()
        })
        .unzip()
}
/// Clones every cached mesh/hillshade pair, in the cache's iteration order.
///
/// NOTE(review): entries are not filtered by generation or resolution, so a
/// tile cached under several keys appears once per key — confirm callers
/// expect that.
pub(crate) fn collect_all_terrain(
    &self,
) -> (Vec<TerrainMeshData>, Vec<PreparedHillshadeRaster>) {
    self.terrain_cache.values().cloned().unzip()
}
/// Drops every cached terrain result whose tile is not in `visible_tiles`.
///
/// In-flight tasks are left alone.
pub(crate) fn prune_terrain(&mut self, visible_tiles: &[TileId]) {
    use std::collections::HashSet;

    let keep: HashSet<TileId> = visible_tiles.iter().copied().collect();
    self.terrain_cache
        .retain(|cached, _| keep.contains(&cached.tile));
}
/// Returns the data generation for `layer_id`.
///
/// A fresh generation number is allocated and recorded when
/// `features_changed` is true or when the layer has none yet. Otherwise the
/// recorded value is returned unchanged.
pub(crate) fn layer_generation(&mut self, layer_id: &str, features_changed: bool) -> u64 {
    if !features_changed {
        if let Some(&known) = self.layer_generations.get(layer_id) {
            return known;
        }
    }

    let generation = self.next_vector_generation;
    self.next_vector_generation += 1;
    self.layer_generations
        .insert(layer_id.to_owned(), generation);
    generation
}
pub(crate) fn dispatch_vector(&mut self, input: VectorTaskInput) {
let key = input.cache_key.clone();
if self.vector_cache.contains_key(&key) {
return;
}
if self.pending_vector.iter().any(|p| p.key == key) {
return;
}
let receiver = self.pool.spawn_vector(Box::new(move || {
let mut features = input.features;
if !input.terrain_samples.is_empty() {
apply_terrain_samples(&mut features, &input.terrain_samples);
}
let mut temp_layer = crate::layers::VectorLayer::new(
&input.cache_key.layer_id,
FeatureCollection::default(),
input.style,
)
.with_query_metadata(input.query_layer_id, input.query_source_id)
.with_source_layer(input.query_source_layer);
temp_layer.set_features_with_provenance(features, input.feature_provenance);
let mesh = temp_layer.tessellate(input.projection);
let candidates = temp_layer.symbol_candidates();
VectorTaskOutput {
cache_key: input.cache_key,
mesh,
symbol_candidates: candidates,
}
}));
self.pending_vector
.push(PendingVectorTask { key, receiver });
}
/// Moves every finished whole-layer vector task into the vector cache.
///
/// Tasks without a result yet stay queued, in their original order. Never
/// blocks.
pub(crate) fn poll_vector(&mut self) {
    let in_flight = std::mem::take(&mut self.pending_vector);
    for task in in_flight {
        match task.receiver.try_recv() {
            Some(done) => {
                self.vector_cache
                    .insert(task.key, (done.mesh, done.symbol_candidates));
            }
            None => self.pending_vector.push(task),
        }
    }
}
/// Clones the cached results for `layer_keys`, in the given order.
///
/// Keys without a cached result are skipped. Empty meshes are left out of
/// the mesh list, but their symbol candidates are still returned.
pub(crate) fn collect_vectors(
    &self,
    layer_keys: &[VectorCacheKey],
) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
    let mut meshes = Vec::new();
    let mut candidates = Vec::new();
    let cached = layer_keys
        .iter()
        .filter_map(|key| self.vector_cache.get(key));
    for (mesh, layer_candidates) in cached {
        if !mesh.is_empty() {
            meshes.push(mesh.clone());
        }
        candidates.extend_from_slice(layer_candidates);
    }
    (meshes, candidates)
}
/// Forgets whole-layer results and generation numbers of every layer that is
/// not listed in `active_layer_ids`.
///
/// In-flight tasks and the per-tile bucket cache are left alone.
pub(crate) fn prune_vectors(&mut self, active_layer_ids: &[String]) {
    use std::collections::HashSet;

    let keep: HashSet<&str> = active_layer_ids.iter().map(|id| id.as_str()).collect();
    let is_active = |id: &str| keep.contains(id);
    self.vector_cache
        .retain(|key, _| is_active(&key.layer_id));
    self.layer_generations.retain(|id, _| is_active(id));
}
/// Test-only view of the generation recorded for `layer_id`, if any.
///
/// Unlike `layer_generation`, this never allocates a new generation.
#[cfg(test)]
pub(crate) fn current_layer_generation(&self, layer_id: &str) -> Option<u64> {
    let recorded = self.layer_generations.get(layer_id);
    recorded.copied()
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn dispatch_vector_buckets(
&mut self,
layer_id: &str,
data_generation: u64,
projection: CameraProjection,
style: &VectorStyle,
tile_features: &HashMap<TileId, FeatureCollection>,
query_source_id: Option<&str>,
query_source_layer: Option<&str>,
tile_provenance: &HashMap<TileId, Vec<Option<FeatureProvenance>>>,
terrain_samples: &[(GeoCoord, f64)],
) {
for (tile, features) in tile_features {
let key = VectorBucketKey {
layer_id: layer_id.to_owned(),
tile: *tile,
data_generation,
projection,
};
if self.bucket_cache.contains_key(&key) {
continue;
}
if self.pending_buckets.iter().any(|p| p.key == key) {
continue;
}
let task_key = key.clone();
let task_features = features.clone();
let task_style = style.clone();
let task_samples = terrain_samples.to_vec();
let task_layer_id = layer_id.to_owned();
let task_source_id = query_source_id.map(ToOwned::to_owned);
let task_source_layer = query_source_layer.map(ToOwned::to_owned);
let task_provenance = tile_provenance.get(tile).cloned().unwrap_or_default();
let receiver = self.pool.spawn_vector(Box::new(move || {
let mut feats = task_features;
if !task_samples.is_empty() {
apply_terrain_samples(&mut feats, &task_samples);
}
let mut temp_layer = crate::layers::VectorLayer::new(
&task_layer_id,
FeatureCollection::default(),
task_style,
)
.with_query_metadata(Some(task_layer_id.clone()), task_source_id)
.with_source_layer(task_source_layer);
temp_layer.set_features_with_provenance(feats, task_provenance);
let mesh = temp_layer.tessellate(task_key.projection);
let candidates = temp_layer.symbol_candidates();
VectorTaskOutput {
cache_key: VectorCacheKey {
layer_id: task_key.layer_id.clone(),
data_generation: task_key.data_generation,
projection: task_key.projection,
},
mesh,
symbol_candidates: candidates,
}
}));
self.pending_buckets.push(PendingVectorBucketTask {
key,
receiver: Box::new(BucketReceiverAdapter { inner: receiver }),
});
}
}
/// Moves every finished bucket task into the bucket cache.
///
/// Tasks without a result yet stay queued, in their original order. Never
/// blocks.
pub(crate) fn poll_vector_buckets(&mut self) {
    let in_flight = std::mem::take(&mut self.pending_buckets);
    for task in in_flight {
        match task.receiver.try_recv() {
            Some(done) => {
                self.bucket_cache
                    .insert(task.key, (done.mesh, done.symbol_candidates));
            }
            None => self.pending_buckets.push(task),
        }
    }
}
/// Queues a background MVT decode of `raw` for `tile`.
///
/// Does nothing when a decode for the same tile is already in flight. The
/// payload bytes are shared with the task through their `Arc`, not copied.
/// `freshness` is carried through to the result untouched.
pub(crate) fn dispatch_decode(
    &mut self,
    tile: TileId,
    raw: &crate::tile_source::RawVectorPayload,
    freshness: crate::tile_source::TileFreshness,
) {
    let already_decoding = self
        .pending_decodes
        .iter()
        .any(|pending| pending.tile == tile);
    if already_decoding {
        return;
    }

    let bytes = Arc::clone(&raw.bytes);
    let options = raw.decode_options.clone();
    let job = move || {
        let result = match crate::mvt::decode_mvt(&bytes, &tile, &options) {
            Ok(layers) => Ok(crate::tile_source::VectorTileData { layers }),
            Err(e) => Err(crate::tile_source::TileError::Decode(format!(
                "MVT decode: {e}"
            ))),
        };
        MvtDecodeOutput {
            tile,
            result,
            freshness,
        }
    };
    let receiver = self.pool.spawn_decode(Box::new(job));
    self.pending_decodes
        .push(PendingDecodeTask { tile, receiver });
}
pub(crate) fn poll_decodes(&mut self) {
let mut still_pending = Vec::new();
for task in self.pending_decodes.drain(..) {
if let Some(output) = task.receiver.try_recv() {
match output.result {
Ok(vector_data) => {
self.decoded_tiles.push((
output.tile,
crate::tile_source::TileResponse {
data: crate::tile_source::TileData::Vector(vector_data),
freshness: output.freshness,
not_modified: false,
},
));
}
Err(_err) => {
}
}
} else {
still_pending.push(task);
}
}
self.pending_decodes = still_pending;
}
/// Hands over all decoded tiles gathered so far and leaves the internal
/// list empty.
pub(crate) fn take_decoded_tiles(
    &mut self,
) -> Vec<(TileId, crate::tile_source::TileResponse)> {
    let ready = std::mem::take(&mut self.decoded_tiles);
    ready
}
/// Number of decode tasks that are still in flight.
#[allow(dead_code)]
pub(crate) fn pending_decode_count(&self) -> usize {
    let in_flight = &self.pending_decodes;
    in_flight.len()
}
/// Clones the cached buckets of one layer for `visible_tiles`, in the given
/// tile order.
///
/// Tiles without a cached bucket for this layer, generation and projection
/// are skipped. Empty meshes are left out of the mesh list, but their symbol
/// candidates are still returned.
///
/// A single lookup key is built up front and only its `tile` field is
/// changed per iteration, so the layer id is allocated once per call rather
/// than once per visible tile.
pub(crate) fn collect_vector_buckets(
    &self,
    layer_id: &str,
    data_generation: u64,
    projection: CameraProjection,
    visible_tiles: &[TileId],
) -> (Vec<VectorMeshData>, Vec<SymbolCandidate>) {
    let mut meshes = Vec::new();
    let mut candidates = Vec::new();

    let mut probe = match visible_tiles.first() {
        Some(&tile) => VectorBucketKey {
            layer_id: layer_id.to_owned(),
            tile,
            data_generation,
            projection,
        },
        // Nothing visible: nothing to look up, nothing to allocate.
        None => return (meshes, candidates),
    };

    for tile in visible_tiles {
        probe.tile = *tile;
        if let Some((mesh, cands)) = self.bucket_cache.get(&probe) {
            if !mesh.is_empty() {
                meshes.push(mesh.clone());
            }
            candidates.extend(cands.iter().cloned());
        }
    }
    (meshes, candidates)
}
/// Drops every cached bucket whose layer is not in `active_layer_ids`.
///
/// In-flight bucket tasks are left alone.
#[allow(dead_code)]
pub(crate) fn prune_vector_buckets(&mut self, active_layer_ids: &[String]) {
    use std::collections::HashSet;

    let keep: HashSet<&str> = active_layer_ids.iter().map(|id| id.as_str()).collect();
    self.bucket_cache
        .retain(|bucket, _| keep.contains(bucket.layer_id.as_str()));
}
pub(crate) fn evict_vector_buckets(&mut self, layer_id: &str, tiles: &[TileId]) {
if tiles.is_empty() {
return;
}
let tile_set: std::collections::HashSet<TileId> = tiles.iter().copied().collect();
self.bucket_cache
.retain(|key, _| key.layer_id != layer_id || !tile_set.contains(&key.tile));
self.pending_buckets
.retain(|task| task.key.layer_id != layer_id || !tile_set.contains(&task.key.tile));
}
/// Drops the cached buckets of `layer_id` whose tile is not in
/// `visible_tiles`.
///
/// Buckets of other layers and in-flight tasks are left alone.
#[allow(dead_code)]
pub(crate) fn prune_vector_buckets_for_layer(
    &mut self,
    layer_id: &str,
    visible_tiles: &[TileId],
) {
    use std::collections::HashSet;

    let keep: HashSet<TileId> = visible_tiles.iter().copied().collect();
    self.bucket_cache.retain(|bucket, _| {
        let off_screen_for_layer = bucket.layer_id == layer_id && !keep.contains(&bucket.tile);
        !off_screen_for_layer
    });
}
/// Number of finished buckets currently cached, across all layers.
#[allow(dead_code)]
pub(crate) fn bucket_cache_len(&self) -> usize {
    let cached = &self.bucket_cache;
    cached.len()
}
/// Returns `true` while any dispatched task has not been collected yet.
///
/// All four queues are checked: terrain, whole-layer vector, per-tile
/// bucket, and MVT decode. Decode tasks are included so the pipeline is not
/// reported idle while decodes are still in flight.
// Used by this module's tests as the "everything finished" signal.
#[allow(dead_code)]
pub(crate) fn has_pending_tasks(&self) -> bool {
    !self.pending_terrain.is_empty()
        || !self.pending_vector.is_empty()
        || !self.pending_buckets.is_empty()
        || !self.pending_decodes.is_empty()
}
}
struct BucketReceiverAdapter {
inner: Box<dyn DataTaskResultReceiver<VectorTaskOutput>>,
}
impl DataTaskResultReceiver<VectorBucketOutput> for BucketReceiverAdapter {
fn try_recv(&self) -> Option<VectorBucketOutput> {
self.inner.try_recv().map(|output| VectorBucketOutput {
mesh: output.mesh,
symbol_candidates: output.symbol_candidates,
})
}
}
fn apply_terrain_samples(features: &mut FeatureCollection, samples: &[(GeoCoord, f64)]) {
let lookup: HashMap<(i64, i64), f64> = samples
.iter()
.map(|(coord, elev)| {
let key = (
(coord.lat * 100_000.0).round() as i64,
(coord.lon * 100_000.0).round() as i64,
);
(key, *elev)
})
.collect();
for feature in &mut features.features {
apply_terrain_to_geometry(&mut feature.geometry, &lookup);
}
}
fn apply_terrain_to_geometry(
geometry: &mut crate::geometry::Geometry,
lookup: &HashMap<(i64, i64), f64>,
) {
use crate::geometry::Geometry;
match geometry {
Geometry::Point(p) => {
if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
p.coord.alt = elev;
}
}
Geometry::LineString(ls) => {
for coord in &mut ls.coords {
if let Some(&elev) = lookup.get(&coord_key(coord)) {
coord.alt = elev;
}
}
}
Geometry::Polygon(poly) => {
for coord in &mut poly.exterior {
if let Some(&elev) = lookup.get(&coord_key(coord)) {
coord.alt = elev;
}
}
for hole in &mut poly.interiors {
for coord in hole {
if let Some(&elev) = lookup.get(&coord_key(coord)) {
coord.alt = elev;
}
}
}
}
Geometry::MultiPoint(mp) => {
for p in &mut mp.points {
if let Some(&elev) = lookup.get(&coord_key(&p.coord)) {
p.coord.alt = elev;
}
}
}
Geometry::MultiLineString(mls) => {
for ls in &mut mls.lines {
for coord in &mut ls.coords {
if let Some(&elev) = lookup.get(&coord_key(coord)) {
coord.alt = elev;
}
}
}
}
Geometry::MultiPolygon(mpoly) => {
for poly in &mut mpoly.polygons {
for coord in &mut poly.exterior {
if let Some(&elev) = lookup.get(&coord_key(coord)) {
coord.alt = elev;
}
}
for hole in &mut poly.interiors {
for coord in hole {
if let Some(&elev) = lookup.get(&coord_key(coord)) {
coord.alt = elev;
}
}
}
}
}
Geometry::GeometryCollection(geoms) => {
for g in geoms {
apply_terrain_to_geometry(g, lookup);
}
}
}
}
/// Quantises a coordinate into an integer `(lat, lon)` key.
///
/// Each component is multiplied by 100 000 and rounded to the nearest
/// integer, i.e. snapped to a grid of 1e-5 degrees. Altitude is ignored.
fn coord_key(coord: &GeoCoord) -> (i64, i64) {
    let quantize = |degrees: f64| (degrees * 100_000.0).round() as i64;
    (quantize(coord.lat), quantize(coord.lon))
}
/// Result of a background visualization task; each variant names the layer
/// it is meant for.
pub enum VisualizationTaskOutput {
    /// New scalar values for a layer.
    ScalarFieldUpdate {
        /// Name of the target layer.
        layer_name: String,
        /// Replacement values.
        values: Vec<f32>,
    },
    /// New column instances and colour ramp for a layer.
    ColumnUpdate {
        /// Name of the target layer.
        layer_name: String,
        /// Replacement column instances.
        columns: crate::visualization::ColumnInstanceSet,
        /// Colour ramp delivered with the columns.
        ramp: crate::visualization::ColorRamp,
    },
    /// New point instances and colour ramp for a layer.
    PointCloudUpdate {
        /// Name of the target layer.
        layer_name: String,
        /// Replacement point instances.
        points: crate::visualization::PointInstanceSet,
        /// Colour ramp delivered with the points.
        ramp: crate::visualization::ColorRamp,
    },
}
/// Runs visualization tasks in the background and gathers their results.
pub struct AsyncVisualizationPipeline {
    /// Held but not used for dispatch: tasks are started through
    /// `spawn_on_thread` directly.
    _pool: Arc<dyn DataTaskPool>,
    /// Receivers of tasks whose result has not been collected yet.
    pending: Vec<Box<dyn DataTaskResultReceiver<VisualizationTaskOutput>>>,
}

impl AsyncVisualizationPipeline {
    /// Creates a pipeline with no tasks in flight.
    pub fn new(pool: Arc<dyn DataTaskPool>) -> Self {
        let pending = Vec::new();
        Self {
            _pool: pool,
            pending,
        }
    }

    /// Starts `task` on its own thread and remembers its receiver.
    pub fn dispatch(
        &mut self,
        task: Box<dyn FnOnce() -> VisualizationTaskOutput + Send + 'static>,
    ) {
        self.pending.push(spawn_on_thread(task));
    }

    /// Returns the results of all tasks that have finished since the last
    /// call, in dispatch order. Unfinished tasks stay queued. Never blocks.
    pub fn poll(&mut self) -> Vec<VisualizationTaskOutput> {
        let mut completed = Vec::new();
        for receiver in std::mem::take(&mut self.pending) {
            if let Some(result) = receiver.try_recv() {
                completed.push(result);
            } else {
                self.pending.push(receiver);
            }
        }
        completed
    }

    /// Number of tasks whose result has not been collected yet.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::camera_projection::CameraProjection;
use crate::geometry::{Feature, Geometry, LineString, Point, Polygon};
use crate::layers::VectorStyle;
use rustial_math::{ElevationGrid, GeoCoord, TileId};
use std::collections::HashMap;
/// A terrain task run on the thread pool ends up in the cache and can be
/// collected by tile, generation and resolution.
#[test]
fn thread_pool_dispatches_and_receives_terrain() {
    let tile = TileId::new(0, 0, 0);
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));

    pipeline.dispatch_terrain(TerrainTaskInput {
        tile,
        elevation_source_tile: tile,
        elevation_region: TileTextureRegion::FULL,
        elevation: ElevationGrid::flat(tile, 4, 4),
        resolution: 4,
        vertical_exaggeration: 1.0,
        generation: 1,
    });

    // Poll for up to ~1 s until the worker has delivered.
    for _ in 0..100 {
        pipeline.poll_terrain();
        if !pipeline.has_pending_tasks() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    let generations: HashMap<TileId, u64> = std::iter::once((tile, 1u64)).collect();
    let (meshes, hillshades) = pipeline.collect_terrain(&[tile], &generations, 4);
    assert_eq!(meshes.len(), 1);
    assert_eq!(hillshades.len(), 1);
    assert_eq!(meshes[0].tile, tile);
}
/// A vector task run on the thread pool produces a cached, non-empty mesh.
#[test]
fn thread_pool_dispatches_and_receives_vector() {
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));

    let origin_point = Feature {
        geometry: Geometry::Point(Point {
            coord: GeoCoord::from_lat_lon(0.0, 0.0),
        }),
        properties: HashMap::new(),
    };
    let key = VectorCacheKey {
        layer_id: "test".into(),
        data_generation: 1,
        projection: CameraProjection::WebMercator,
    };
    pipeline.dispatch_vector(VectorTaskInput {
        cache_key: key.clone(),
        features: crate::geometry::FeatureCollection {
            features: vec![origin_point],
        },
        style: VectorStyle::default(),
        query_layer_id: Some("test".into()),
        query_source_id: None,
        query_source_layer: None,
        feature_provenance: Vec::new(),
        projection: CameraProjection::WebMercator,
        terrain_samples: Vec::new(),
    });

    // Poll for up to ~1 s until the worker has delivered.
    for _ in 0..100 {
        pipeline.poll_vector();
        if !pipeline.has_pending_tasks() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    let (meshes, _candidates) = pipeline.collect_vectors(&[key]);
    assert_eq!(meshes.len(), 1);
    assert!(meshes[0].vertex_count() > 0);
}
/// Dispatching the same terrain input twice queues only one task.
#[test]
fn duplicate_dispatch_is_deduplicated() {
    let tile = TileId::new(0, 0, 0);
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));
    let input = TerrainTaskInput {
        tile,
        elevation_source_tile: tile,
        elevation_region: TileTextureRegion::FULL,
        elevation: ElevationGrid::flat(tile, 4, 4),
        resolution: 4,
        vertical_exaggeration: 1.0,
        generation: 1,
    };

    for _ in 0..2 {
        pipeline.dispatch_terrain(input.clone());
    }

    assert_eq!(pipeline.pending_terrain.len(), 1);
}
/// Once a terrain result is cached, dispatching the same input again queues
/// nothing.
#[test]
fn cached_result_prevents_redispatch() {
    let tile = TileId::new(0, 0, 0);
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));
    let input = TerrainTaskInput {
        tile,
        elevation_source_tile: tile,
        elevation_region: TileTextureRegion::FULL,
        elevation: ElevationGrid::flat(tile, 4, 4),
        resolution: 4,
        vertical_exaggeration: 1.0,
        generation: 1,
    };

    pipeline.dispatch_terrain(input.clone());
    // Poll for up to ~1 s until the first result has been cached.
    for _ in 0..100 {
        pipeline.poll_terrain();
        if !pipeline.has_pending_tasks() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    pipeline.dispatch_terrain(input);
    assert!(pipeline.pending_terrain.is_empty());
}
/// Pruning keeps only the cache entries whose tile is still visible.
#[test]
fn prune_terrain_removes_stale_entries() {
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));
    let tile_a = TileId::new(1, 0, 0);
    let tile_b = TileId::new(1, 1, 0);

    // Seed the cache directly, without going through the worker threads.
    let elev_a = ElevationGrid::flat(tile_a, 4, 4);
    let elev_b = ElevationGrid::flat(tile_b, 4, 4);
    let mesh_a = crate::terrain::build_terrain_descriptor(&tile_a, &elev_a, 4, 1.0, 1);
    let mesh_b = crate::terrain::build_terrain_descriptor(&tile_b, &elev_b, 4, 1.0, 1);
    let hs_a = crate::terrain::prepare_hillshade_raster(&elev_a, 1.0, 1);
    let hs_b = crate::terrain::prepare_hillshade_raster(&elev_b, 1.0, 1);
    let key_for = |tile: TileId| TerrainCacheKey {
        tile,
        generation: 1,
        resolution: 4,
    };
    pipeline.terrain_cache.insert(key_for(tile_a), (mesh_a, hs_a));
    pipeline.terrain_cache.insert(key_for(tile_b), (mesh_b, hs_b));
    assert_eq!(pipeline.terrain_cache.len(), 2);

    pipeline.prune_terrain(&[tile_a]);

    assert_eq!(pipeline.terrain_cache.len(), 1);
    assert!(pipeline.terrain_cache.keys().all(|k| k.tile == tile_a));
}
/// Without a task pool, `MapState::update` still runs and reports a zoom
/// level no greater than 22.
#[test]
fn synchronous_fallback_path_still_works() {
    let origin = GeoCoord::from_lat_lon(0.0, 0.0);
    let mut state = crate::MapState::new();
    state.set_viewport(800, 600);
    state.set_camera_target(origin);
    state.set_camera_distance(1_000.0);

    state.update();

    assert!(state.zoom_level() <= 22);
}
/// A layer's generation is stable until its features change, then increases
/// and is stable again.
#[test]
fn layer_generation_tracking() {
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));

    let first = pipeline.layer_generation("layer-a", false);
    let repeated = pipeline.layer_generation("layer-a", false);
    assert_eq!(first, repeated);

    let bumped = pipeline.layer_generation("layer-a", true);
    assert!(bumped > repeated);

    let after_bump = pipeline.layer_generation("layer-a", false);
    assert_eq!(bumped, after_bump);
}
/// `collect_all_terrain` returns the single entry produced by one finished
/// terrain task.
#[test]
fn collect_all_terrain_returns_cached_entries() {
    let tile = TileId::new(0, 0, 0);
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));

    pipeline.dispatch_terrain(TerrainTaskInput {
        tile,
        elevation_source_tile: tile,
        elevation_region: TileTextureRegion::FULL,
        elevation: ElevationGrid::flat(tile, 4, 4),
        resolution: 4,
        vertical_exaggeration: 1.0,
        generation: 1,
    });

    // Poll for up to ~1 s until the worker has delivered.
    for _ in 0..100 {
        pipeline.poll_terrain();
        if !pipeline.has_pending_tasks() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    let (meshes, hillshades) = pipeline.collect_all_terrain();
    assert_eq!(meshes.len(), 1);
    assert_eq!(hillshades.len(), 1);
    assert_eq!(meshes[0].tile, tile);
}
/// With a task pool installed, `MapState` eventually exposes terrain meshes.
#[test]
fn async_pipeline_terrain_via_map_state() {
    use crate::terrain::{FlatElevationSource, TerrainConfig};

    let mut state = crate::MapState::with_terrain(
        TerrainConfig {
            enabled: true,
            mesh_resolution: 4,
            source: Box::new(FlatElevationSource::new(4, 4)),
            ..TerrainConfig::default()
        },
        100,
    );
    state.set_viewport(800, 600);
    state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
    state.set_camera_distance(10_000_000.0);
    state.set_task_pool(Arc::new(ThreadDataTaskPool::new()));
    assert!(state.has_async_pipeline());

    state.update_with_dt(1.0 / 60.0);
    // Keep ticking for up to ~1 s until meshes show up.
    for _ in 0..100 {
        std::thread::sleep(std::time::Duration::from_millis(10));
        state.update_with_dt(1.0 / 60.0);
        if !state.terrain_meshes().is_empty() {
            break;
        }
    }

    assert!(
        !state.terrain_meshes().is_empty(),
        "async pipeline should produce terrain meshes"
    );
}
/// With a task pool installed, a pushed vector layer eventually yields
/// vector meshes on `MapState`.
#[test]
fn async_pipeline_vectors_via_map_state() {
    use crate::layers::{VectorLayer, VectorStyle};

    let mut state = crate::MapState::new();
    state.set_viewport(800, 600);
    state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
    state.set_camera_distance(1_000.0);

    let origin_point = Feature {
        geometry: Geometry::Point(Point {
            coord: GeoCoord::from_lat_lon(0.0, 0.0),
        }),
        properties: HashMap::new(),
    };
    let collection = crate::geometry::FeatureCollection {
        features: vec![origin_point],
    };
    state.push_layer(Box::new(VectorLayer::new(
        "test-vec",
        collection,
        VectorStyle::default(),
    )));
    state.set_task_pool(Arc::new(ThreadDataTaskPool::new()));

    state.update_with_dt(1.0 / 60.0);
    // Keep ticking for up to ~1 s until meshes show up.
    for _ in 0..100 {
        std::thread::sleep(std::time::Duration::from_millis(10));
        state.update_with_dt(1.0 / 60.0);
        if !state.vector_meshes().is_empty() {
            break;
        }
    }

    assert!(
        !state.vector_meshes().is_empty(),
        "async pipeline should produce vector meshes"
    );
}
/// Clearing the task pool removes the async pipeline, and `update` still
/// runs afterwards.
#[test]
fn async_pipeline_clear_reverts_to_sync() {
    let mut state = crate::MapState::new();
    state.set_viewport(800, 600);
    state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
    state.set_camera_distance(1_000.0);

    state.set_task_pool(Arc::new(ThreadDataTaskPool::new()));
    assert!(state.has_async_pipeline());

    state.clear_task_pool();
    assert!(!state.has_async_pipeline());

    state.update();
    assert!(state.zoom_level() <= 22);
}
/// A very large data-update interval does not stop the async pipeline from
/// producing terrain meshes.
#[test]
fn data_update_interval_ignored_with_async_pipeline() {
    use crate::terrain::{FlatElevationSource, TerrainConfig};

    let mut state = crate::MapState::with_terrain(
        TerrainConfig {
            enabled: true,
            mesh_resolution: 4,
            source: Box::new(FlatElevationSource::new(4, 4)),
            ..TerrainConfig::default()
        },
        100,
    );
    state.set_viewport(800, 600);
    state.set_camera_target(GeoCoord::from_lat_lon(0.0, 0.0));
    state.set_camera_distance(10_000_000.0);
    state.set_data_update_interval(100.0);
    state.set_task_pool(Arc::new(ThreadDataTaskPool::new()));

    // Tick for up to ~1 s until meshes show up.
    for _ in 0..100 {
        state.update_with_dt(1.0 / 60.0);
        std::thread::sleep(std::time::Duration::from_millis(10));
        if !state.terrain_meshes().is_empty() {
            break;
        }
    }

    assert!(
        !state.terrain_meshes().is_empty(),
        "async pipeline should produce terrain despite high throttle"
    );
}
/// Two nearby points share a bucket, a far-away point gets its own, and no
/// feature is lost.
#[test]
fn partition_features_by_tile_separates_spatially() {
    let point_at = |lat: f64, lon: f64| Feature {
        geometry: Geometry::Point(Point {
            coord: GeoCoord::from_lat_lon(lat, lon),
        }),
        properties: HashMap::new(),
    };
    let features = FeatureCollection {
        features: vec![
            point_at(10.0, 10.0),
            point_at(-30.0, -60.0),
            point_at(10.01, 10.01),
        ],
    };

    let buckets = partition_features_by_tile(&features, 4);

    let total: usize = buckets.values().map(|fc| fc.len()).sum();
    assert_eq!(total, 3, "all features should be partitioned");
    let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
    let tile_c = geo_to_tile(&GeoCoord::from_lat_lon(10.01, 10.01), 4).tile_id();
    assert_eq!(tile_a, tile_c);
    assert_eq!(buckets.get(&tile_a).map(|fc| fc.len()), Some(2));
}
/// Line strings and polygons are partitioned as well; none is dropped.
#[test]
fn partition_features_handles_linestring_and_polygon() {
    let line = Feature {
        geometry: Geometry::LineString(LineString {
            coords: vec![
                GeoCoord::from_lat_lon(45.0, 90.0),
                GeoCoord::from_lat_lon(46.0, 91.0),
            ],
        }),
        properties: HashMap::new(),
    };
    let triangle = Feature {
        geometry: Geometry::Polygon(Polygon {
            exterior: vec![
                GeoCoord::from_lat_lon(-10.0, -20.0),
                GeoCoord::from_lat_lon(-10.0, -19.0),
                GeoCoord::from_lat_lon(-9.0, -19.0),
            ],
            interiors: vec![],
        }),
        properties: HashMap::new(),
    };
    let features = FeatureCollection {
        features: vec![line, triangle],
    };

    let buckets = partition_features_by_tile(&features, 6);

    let total: usize = buckets.values().map(|fc| fc.len()).sum();
    assert_eq!(total, 2);
}
/// A feature whose geometry has no coordinates produces no bucket at all.
#[test]
fn partition_features_drops_empty_geometry() {
    let hollow = Feature {
        geometry: Geometry::GeometryCollection(vec![]),
        properties: HashMap::new(),
    };
    let features = FeatureCollection {
        features: vec![hollow],
    };

    let buckets = partition_features_by_tile(&features, 4);

    assert!(
        buckets.is_empty(),
        "empty geometry should not produce a bucket"
    );
}
/// Buckets dispatched for two distant tiles can be collected per tile and
/// together.
#[test]
fn dispatch_and_poll_vector_buckets() {
    let mut pipeline = AsyncDataPipeline::new(Arc::new(ThreadDataTaskPool::new()));
    let tile_a = geo_to_tile(&GeoCoord::from_lat_lon(10.0, 10.0), 4).tile_id();
    let tile_b = geo_to_tile(&GeoCoord::from_lat_lon(-30.0, -60.0), 4).tile_id();

    let point_at = |lat: f64, lon: f64| Feature {
        geometry: Geometry::Point(Point {
            coord: GeoCoord::from_lat_lon(lat, lon),
        }),
        properties: HashMap::new(),
    };
    let features = FeatureCollection {
        features: vec![point_at(10.0, 10.0), point_at(-30.0, -60.0)],
    };
    let tile_features = partition_features_by_tile(&features, 4);
    assert!(tile_features.len() >= 2);

    pipeline.dispatch_vector_buckets(
        "test-layer",
        1,
        CameraProjection::WebMercator,
        &VectorStyle::default(),
        &tile_features,
        None,
        None,
        &HashMap::new(),
        &[],
    );
    // Poll for up to ~1 s until every bucket has been delivered.
    for _ in 0..100 {
        pipeline.poll_vector_buckets();
        if !pipeline.has_pending_tasks() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    let (meshes_a, _) = pipeline.collect_vector_buckets(
        "test-layer",
        1,
        CameraProjection::WebMercator,
        &[tile_a],
    );
    assert_eq!(meshes_a.len(), 1, "should have one mesh for tile_a");

    let (meshes_b, _) = pipeline.collect_vector_buckets(
        "test-layer",
        1,
        CameraProjection::WebMercator,
        &[tile_b],
    );
    assert_eq!(meshes_b.len(), 1, "should have one mesh for tile_b");

    let (meshes_both, _) = pipeline.collect_vector_buckets(
        "test-layer",
        1,
        CameraProjection::WebMercator,
        &[tile_a, tile_b],
    );
    assert_eq!(meshes_both.len(), 2, "should have meshes for both tiles");
}
/// Issuing the identical dispatch twice must leave `pending_buckets` at the
/// size it had after the first dispatch.
#[test]
fn bucket_dispatch_deduplicates() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);

    let lone_point = Feature {
        geometry: Geometry::Point(Point {
            coord: GeoCoord::from_lat_lon(10.0, 10.0),
        }),
        properties: HashMap::new(),
    };
    let features = FeatureCollection {
        features: vec![lone_point],
    };
    let tile_features = partition_features_by_tile(&features, 4);

    let style = VectorStyle::default();
    let empty_map = HashMap::new();

    // Record the pending count after each of the two identical dispatches.
    let mut pending_counts = Vec::with_capacity(2);
    for _ in 0..2 {
        pipeline.dispatch_vector_buckets(
            "layer",
            1,
            CameraProjection::WebMercator,
            &style,
            &tile_features,
            None,
            None,
            &empty_map,
            &[],
        );
        pending_counts.push(pipeline.pending_buckets.len());
    }

    assert_eq!(
        pending_counts[1], pending_counts[0],
        "duplicate dispatch should be deduped"
    );
}
/// Once a bucket result sits in the cache, dispatching the same work again
/// must leave `pending_buckets` empty.
#[test]
fn bucket_cache_hit_prevents_redispatch() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);

    let lone_point = Feature {
        geometry: Geometry::Point(Point {
            coord: GeoCoord::from_lat_lon(10.0, 10.0),
        }),
        properties: HashMap::new(),
    };
    let features = FeatureCollection {
        features: vec![lone_point],
    };
    let tile_features = partition_features_by_tile(&features, 4);

    let style = VectorStyle::default();
    let empty_map = HashMap::new();

    // First dispatch: expected to be picked up by the worker threads.
    pipeline.dispatch_vector_buckets(
        "layer",
        1,
        CameraProjection::WebMercator,
        &style,
        &tile_features,
        None,
        None,
        &empty_map,
        &[],
    );

    // Poll at most 100 times, pausing 10 ms after each unsuccessful poll.
    let mut polls_done = 0;
    loop {
        if polls_done == 100 {
            break;
        }
        pipeline.poll_vector_buckets();
        if !pipeline.has_pending_tasks() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
        polls_done += 1;
    }
    assert!(pipeline.bucket_cache_len() > 0);

    // Second, identical dispatch: should be answered by the cache.
    pipeline.dispatch_vector_buckets(
        "layer",
        1,
        CameraProjection::WebMercator,
        &style,
        &tile_features,
        None,
        None,
        &empty_map,
        &[],
    );
    assert!(
        pipeline.pending_buckets.is_empty(),
        "cached result should prevent redispatch"
    );
}
/// Pruning with only "keep" listed must evict the cache entry that belongs to
/// the "remove" layer.
#[test]
fn prune_vector_buckets_removes_inactive_layers() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);
    let tile = TileId::new(4, 0, 0);

    // Seed one cache entry per layer, identical apart from the layer id.
    for layer in ["keep", "remove"].iter() {
        let key = VectorBucketKey {
            layer_id: (*layer).into(),
            tile,
            data_generation: 1,
            projection: CameraProjection::WebMercator,
        };
        pipeline
            .bucket_cache
            .insert(key, (VectorMeshData::default(), vec![]));
    }
    assert_eq!(pipeline.bucket_cache_len(), 2);

    let active_layers = ["keep".to_string()];
    pipeline.prune_vector_buckets(&active_layers);

    assert_eq!(pipeline.bucket_cache_len(), 1);
    assert!(pipeline
        .bucket_cache
        .keys()
        .all(|key| key.layer_id == "keep"));
}
/// Per-layer pruning must keep only the tiles passed in and drop the third
/// cached tile.
#[test]
fn prune_vector_buckets_for_layer_removes_offscreen_tiles() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);

    let tile_a = TileId::new(4, 0, 0);
    let tile_b = TileId::new(4, 1, 0);
    let tile_c = TileId::new(4, 2, 0);

    // Seed the cache with one entry per tile for the same layer.
    for &tile in [tile_a, tile_b, tile_c].iter() {
        let key = VectorBucketKey {
            layer_id: "layer".into(),
            tile,
            data_generation: 1,
            projection: CameraProjection::WebMercator,
        };
        pipeline
            .bucket_cache
            .insert(key, (VectorMeshData::default(), vec![]));
    }
    assert_eq!(pipeline.bucket_cache_len(), 3);

    pipeline.prune_vector_buckets_for_layer("layer", &[tile_a, tile_c]);

    assert_eq!(pipeline.bucket_cache_len(), 2);
    assert!(pipeline
        .bucket_cache
        .keys()
        .all(|key| key.tile == tile_a || key.tile == tile_c));
}
/// With two tiles cached, collecting for a single tile must return exactly
/// one mesh.
#[test]
fn collect_vector_buckets_only_returns_visible() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);

    let tile_visible = TileId::new(4, 5, 5);
    let tile_offscreen = TileId::new(4, 10, 10);

    // A single triangle: three vertices, three colors, one index triple.
    let triangle = VectorMeshData {
        indices: vec![0, 1, 2],
        colors: vec![[1.0; 4]; 3],
        positions: vec![[0.0, 0.0, 0.0], [1.0, 0.0, 0.0], [0.0, 1.0, 0.0]],
        ..Default::default()
    };

    for &tile in [tile_visible, tile_offscreen].iter() {
        let key = VectorBucketKey {
            layer_id: "layer".into(),
            tile,
            data_generation: 1,
            projection: CameraProjection::WebMercator,
        };
        pipeline.bucket_cache.insert(key, (triangle.clone(), vec![]));
    }

    let collected = pipeline
        .collect_vector_buckets("layer", 1, CameraProjection::WebMercator, &[tile_visible])
        .0;
    assert_eq!(
        collected.len(),
        1,
        "only visible tile bucket should be collected"
    );
}
/// `representative_coord` must yield the point itself, the first vertex of a
/// line string, the first exterior vertex of a polygon, `None` for an empty
/// collection, and `Some` for a collection holding a point.
#[test]
fn representative_coord_all_geometry_types() {
    // Project the representative coordinate to a plain (lat, lon) pair.
    let lat_lon_of =
        |geometry: &Geometry| representative_coord(geometry).map(|c| (c.lat, c.lon));

    let p = Geometry::Point(Point {
        coord: GeoCoord::from_lat_lon(1.0, 2.0),
    });
    assert_eq!(lat_lon_of(&p), Some((1.0, 2.0)));

    let ls = Geometry::LineString(LineString {
        coords: vec![
            GeoCoord::from_lat_lon(3.0, 4.0),
            GeoCoord::from_lat_lon(5.0, 6.0),
        ],
    });
    assert_eq!(lat_lon_of(&ls), Some((3.0, 4.0)));

    let poly = Geometry::Polygon(Polygon {
        exterior: vec![
            GeoCoord::from_lat_lon(7.0, 8.0),
            GeoCoord::from_lat_lon(7.0, 9.0),
            GeoCoord::from_lat_lon(8.0, 9.0),
        ],
        interiors: vec![],
    });
    assert_eq!(lat_lon_of(&poly), Some((7.0, 8.0)));

    let empty_collection = Geometry::GeometryCollection(vec![]);
    assert!(representative_coord(&empty_collection).is_none());

    let filled_collection = Geometry::GeometryCollection(vec![p.clone()]);
    assert!(representative_coord(&filled_collection).is_some());
}
/// Hand-encodes a tiny protobuf tile (one layer named "test" holding one
/// feature), dispatches it for decoding, and checks the decoded result.
#[test]
fn dispatch_and_poll_decode_produces_decoded_tile() {
    use crate::tile_source::{RawVectorPayload, TileFreshness};
    use std::sync::Arc;

    /// Builds the raw bytes of the test tile.
    fn build_test_mvt() -> Vec<u8> {
        /// Base-128 varint, least significant group first.
        fn encode_varint(mut val: u64) -> Vec<u8> {
            let mut out = Vec::new();
            while val >= 0x80 {
                // More groups follow: emit low 7 bits with the continuation bit.
                out.push((val & 0x7F) as u8 | 0x80);
                val >>= 7;
            }
            out.push(val as u8);
            out
        }
        /// Field key: field number shifted left by three, OR-ed with the wire type.
        fn encode_tag(field: u32, wt: u8) -> Vec<u8> {
            encode_varint((u64::from(field) << 3) | u64::from(wt))
        }
        /// Length-delimited field (wire type 2): tag, byte length, payload.
        fn encode_ld(field: u32, data: &[u8]) -> Vec<u8> {
            [
                encode_tag(field, 2),
                encode_varint(data.len() as u64),
                data.to_vec(),
            ]
            .concat()
        }
        /// Varint field (wire type 0): tag followed by the value.
        fn encode_vi(field: u32, val: u64) -> Vec<u8> {
            [encode_tag(field, 0), encode_varint(val)].concat()
        }
        /// Zigzag mapping of a signed value onto an unsigned one.
        fn zigzag(n: i32) -> u32 {
            let doubled = n << 1;
            let sign_fill = n >> 31;
            (doubled ^ sign_fill) as u32
        }

        // Geometry stream: one command integer followed by two zigzag values.
        let geom = [
            encode_varint((1u64 << 3) | 1),
            encode_varint(zigzag(2048) as u64),
            encode_varint(zigzag(2048) as u64),
        ]
        .concat();
        let feat = [encode_vi(3, 1), encode_ld(4, &geom)].concat();
        let layer = [
            encode_ld(1, b"test"),
            encode_ld(2, &feat),
            encode_vi(5, 4096),
            encode_vi(15, 2),
        ]
        .concat();
        encode_ld(3, &layer)
    }

    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);
    let tile = TileId::new(0, 0, 0);
    let raw = RawVectorPayload {
        tile_id: tile,
        bytes: Arc::new(build_test_mvt()),
        decode_options: crate::mvt::MvtDecodeOptions::default(),
    };
    pipeline.dispatch_decode(tile, &raw, TileFreshness::default());

    // Poll at most 50 times, 10 ms apart, until a decoded tile shows up.
    let mut polls_left = 50;
    while polls_left > 0 {
        pipeline.poll_decodes();
        if !pipeline.decoded_tiles.is_empty() {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
        polls_left -= 1;
    }

    let decoded = pipeline.take_decoded_tiles();
    assert_eq!(decoded.len(), 1, "should have one decoded tile");

    let first = &decoded[0];
    assert_eq!(first.0, tile);
    assert!(first.1.data.is_vector(), "decoded data should be Vector");

    let vt = first.1.data.as_vector().expect("should be Vector");
    assert!(vt.layers.contains_key("test"));
    assert_eq!(vt.layers["test"].len(), 1);
}
/// Dispatching a decode for the same tile twice must leave a single pending
/// decode.
#[test]
fn dispatch_decode_deduplicates() {
    use crate::tile_source::{RawVectorPayload, TileFreshness};
    use std::sync::Arc;

    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut pipeline = AsyncDataPipeline::new(pool);

    let tile = TileId::new(0, 0, 0);
    let raw = RawVectorPayload {
        tile_id: tile,
        bytes: Arc::new(Vec::new()),
        decode_options: crate::mvt::MvtDecodeOptions::default(),
    };

    for _ in 0..2 {
        pipeline.dispatch_decode(tile, &raw, TileFreshness::default());
    }

    assert_eq!(pipeline.pending_decodes.len(), 1);
}
/// A dispatched scalar-field task must come back through `poll` with its
/// payload intact, after which nothing is pending.
#[test]
fn async_viz_pipeline_dispatches_and_receives_scalar_update() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(pool);

    viz.dispatch(Box::new(|| VisualizationTaskOutput::ScalarFieldUpdate {
        layer_name: "density".to_string(),
        values: vec![1.0, 2.0, 3.0, 4.0],
    }));
    assert_eq!(viz.pending_count(), 1);

    // Poll at most 200 times, sleeping 5 ms after every empty batch.
    let mut empty_polls = 0;
    let results = loop {
        let batch = viz.poll();
        if !batch.is_empty() {
            break batch;
        }
        std::thread::sleep(std::time::Duration::from_millis(5));
        empty_polls += 1;
        if empty_polls == 200 {
            break batch;
        }
    };

    assert_eq!(results.len(), 1);
    if let VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values } = &results[0] {
        assert_eq!(layer_name, "density");
        assert_eq!(values, &[1.0, 2.0, 3.0, 4.0]);
    } else {
        panic!("expected ScalarFieldUpdate");
    }
    assert_eq!(viz.pending_count(), 0);
}
/// A dispatched column task must come back through `poll` carrying its layer
/// name and its single column instance.
#[test]
fn async_viz_pipeline_dispatches_and_receives_column_update() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(pool);

    viz.dispatch(Box::new(|| {
        let columns = crate::visualization::ColumnInstanceSet::new(vec![
            crate::visualization::ColumnInstance::new(
                GeoCoord::from_lat_lon(0.0, 0.0),
                100.0,
                10.0,
            ),
        ]);
        let ramp = crate::visualization::ColorRamp::new(vec![
            crate::visualization::ColorStop {
                value: 0.0,
                color: [0.0, 0.0, 1.0, 1.0],
            },
            crate::visualization::ColorStop {
                value: 1.0,
                color: [1.0, 0.0, 0.0, 1.0],
            },
        ]);
        VisualizationTaskOutput::ColumnUpdate {
            layer_name: "bars".to_string(),
            columns,
            ramp,
        }
    }));

    // Poll at most 200 times, sleeping 5 ms after every empty batch.
    let mut empty_polls = 0;
    let results = loop {
        let batch = viz.poll();
        if !batch.is_empty() {
            break batch;
        }
        std::thread::sleep(std::time::Duration::from_millis(5));
        empty_polls += 1;
        if empty_polls == 200 {
            break batch;
        }
    };

    assert_eq!(results.len(), 1);
    if let VisualizationTaskOutput::ColumnUpdate {
        layer_name,
        columns,
        ..
    } = &results[0]
    {
        assert_eq!(layer_name, "bars");
        assert_eq!(columns.columns.len(), 1);
    } else {
        panic!("expected ColumnUpdate");
    }
}
/// A dispatched point-cloud task must come back through `poll` carrying its
/// layer name and both point instances.
#[test]
fn async_viz_pipeline_dispatches_and_receives_point_cloud_update() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(pool);

    viz.dispatch(Box::new(|| {
        let points = crate::visualization::PointInstanceSet::new(vec![
            crate::visualization::PointInstance::new(GeoCoord::from_lat_lon(1.0, 2.0), 5.0),
            crate::visualization::PointInstance::new(GeoCoord::from_lat_lon(3.0, 4.0), 8.0),
        ]);
        let ramp = crate::visualization::ColorRamp::new(vec![
            crate::visualization::ColorStop {
                value: 0.0,
                color: [0.0, 1.0, 0.0, 1.0],
            },
            crate::visualization::ColorStop {
                value: 1.0,
                color: [1.0, 1.0, 0.0, 1.0],
            },
        ]);
        VisualizationTaskOutput::PointCloudUpdate {
            layer_name: "scatter".to_string(),
            points,
            ramp,
        }
    }));

    // Poll at most 200 times, sleeping 5 ms after every empty batch.
    let mut empty_polls = 0;
    let results = loop {
        let batch = viz.poll();
        if !batch.is_empty() {
            break batch;
        }
        std::thread::sleep(std::time::Duration::from_millis(5));
        empty_polls += 1;
        if empty_polls == 200 {
            break batch;
        }
    };

    assert_eq!(results.len(), 1);
    if let VisualizationTaskOutput::PointCloudUpdate {
        layer_name, points, ..
    } = &results[0]
    {
        assert_eq!(layer_name, "scatter");
        assert_eq!(points.points.len(), 2);
    } else {
        panic!("expected PointCloudUpdate");
    }
}
/// Five tasks dispatched back to back must all be delivered by `poll`,
/// leaving no task pending.
#[test]
fn async_viz_pipeline_multiple_tasks_complete_independently() {
    let pool: Arc<dyn DataTaskPool> = Arc::new(ThreadDataTaskPool::new());
    let mut viz = AsyncVisualizationPipeline::new(pool);

    for idx in 0..5 {
        let layer_name = format!("layer_{idx}");
        viz.dispatch(Box::new(move || {
            let values = vec![idx as f32; 4];
            VisualizationTaskOutput::ScalarFieldUpdate { layer_name, values }
        }));
    }
    assert_eq!(viz.pending_count(), 5);

    // Accumulate batches over at most 200 polls, 10 ms apart.
    let mut all_results = Vec::new();
    let mut polls_left = 200;
    while polls_left > 0 {
        all_results.extend(viz.poll());
        if all_results.len() >= 5 {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
        polls_left -= 1;
    }

    assert_eq!(all_results.len(), 5);
    assert_eq!(viz.pending_count(), 0);
}
}