use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader};
use arrow_array::{BinaryArray, BinaryViewArray};
use arrow_array::{Float64Array, Int32Array};
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
use datafusion_common::{exec_datafusion_err, plan_err, DataFusionError, Result};
use geo_types::{
Coord, Geometry, GeometryCollection, LineString, MultiLineString, MultiPoint, MultiPolygon,
Point, Polygon, Rect,
};
use rand::{distr::Uniform, rngs::StdRng, Rng, RngExt, SeedableRng};
use sedona_common::sedona_internal_err;
use sedona_geometry::types::GeometryTypeId;
use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY};
use std::f64::consts::PI;
use std::sync::Arc;
use wkb::writer::WriteOptions;
use wkb::Endianness;
/// Builder that generates deterministic, pseudo-random record batches split into partitions.
///
/// Every batch has an `id`, a `dist` and a nullable `geometry` column (see `schema`).
#[derive(Debug, Clone)]
pub struct RandomPartitionedDataBuilder {
/// Base RNG seed; `build` offsets it by the partition index for each partition.
pub seed: u64,
/// Number of partitions produced by `build`.
pub num_partitions: usize,
/// Number of batches generated for each partition.
pub batches_per_partition: usize,
/// Number of rows in every generated batch.
pub rows_per_batch: usize,
// Geometry type that decides the storage field and array layout of the `geometry` column.
sedona_type: SedonaType,
// Probability that a geometry value is null.
null_rate: f64,
// Settings forwarded to the random geometry generators.
options: RandomGeometryOptions,
}
impl Default for RandomPartitionedDataBuilder {
    /// Returns a builder seeded with 42 that produces one partition holding a
    /// single batch of ten rows, with no null geometries and the default
    /// geometry options.
    fn default() -> Self {
        Self {
            seed: 42,
            num_partitions: 1,
            batches_per_partition: 1,
            rows_per_batch: 10,
            sedona_type: WKB_GEOMETRY,
            null_rate: 0.0,
            options: RandomGeometryOptions::new(),
        }
    }
}
impl RandomPartitionedDataBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn seed(mut self, seed: u64) -> Self {
self.seed = seed;
self
}
pub fn num_partitions(mut self, num_partitions: usize) -> Self {
self.num_partitions = num_partitions;
self
}
pub fn batches_per_partition(mut self, batches_per_partition: usize) -> Self {
self.batches_per_partition = batches_per_partition;
self
}
pub fn rows_per_batch(mut self, rows_per_batch: usize) -> Self {
self.rows_per_batch = rows_per_batch;
self
}
pub fn geometry_type(mut self, geom_type: GeometryTypeId) -> Self {
self.options.geom_type = geom_type;
self
}
pub fn sedona_type(mut self, sedona_type: SedonaType) -> Self {
self.sedona_type = sedona_type;
self
}
pub fn bounds(mut self, bounds: Rect) -> Self {
self.options.bounds = bounds;
self
}
pub fn size_range(mut self, size_range: (f64, f64)) -> Self {
self.options.size_range = size_range;
self
}
pub fn null_rate(mut self, null_rate: f64) -> Self {
self.null_rate = null_rate;
self
}
pub fn empty_rate(mut self, empty_rate: f64) -> Self {
self.options.empty_rate = empty_rate;
self
}
pub fn vertices_per_linestring_range(
mut self,
vertices_per_linestring_range: (usize, usize),
) -> Self {
self.options.vertices_per_linestring_range = vertices_per_linestring_range;
self
}
pub fn num_parts_range(mut self, num_parts_range: (usize, usize)) -> Self {
self.options.num_parts_range = num_parts_range;
self
}
pub fn polygon_hole_rate(mut self, polygon_hole_rate: f64) -> Self {
self.options.polygon_hole_rate = polygon_hole_rate;
self
}
pub fn schema(&self) -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("dist", DataType::Float64, false),
self.sedona_type.to_storage_field("geometry", true).unwrap(),
]))
}
pub fn build(&self) -> Result<(SchemaRef, Vec<Vec<RecordBatch>>)> {
let schema = self.schema();
let mut result = Vec::with_capacity(self.num_partitions);
for partition_idx in 0..self.num_partitions {
let rng = Self::default_rng(self.seed + partition_idx as u64);
let partition_batches = self
.partition_reader(rng, partition_idx)
.collect::<Result<Vec<_>, ArrowError>>()?;
result.push(partition_batches);
}
Ok((schema, result))
}
pub fn validate(&self) -> Result<()> {
self.options.validate()?;
if self.null_rate < 0.0 || self.null_rate > 1.0 {
return plan_err!(
"Expected null_rate between 0.0 and 1.0 but got {}",
self.null_rate
);
}
if self.rows_per_batch == 0 {
return plan_err!("Expected rows_per_batch > 0 but got 0");
}
if self.num_partitions == 0 {
return plan_err!("Expected num_partitions > 0 but got 0");
}
Ok(())
}
pub fn default_rng(seed: u64) -> impl Rng {
StdRng::seed_from_u64(seed)
}
pub fn partition_reader<R: Rng + Send + 'static>(
&self,
rng: R,
partition_idx: usize,
) -> Box<dyn RecordBatchReader + Send> {
let reader = RandomPartitionedDataReader {
builder: self.clone(),
schema: self.schema(),
partition_idx,
batch_idx: 0,
rng,
};
Box::new(reader)
}
fn generate_batch<R: Rng>(
&self,
rng: &mut R,
schema: &SchemaRef,
partition_idx: usize,
batch_idx: usize,
) -> Result<RecordBatch> {
self.validate()?;
let id_start =
(partition_idx * self.batches_per_partition + batch_idx) * self.rows_per_batch;
let ids: Vec<i32> = (0..self.rows_per_batch)
.map(|i| (id_start + i) as i32)
.collect();
let max_dist = self
.options
.bounds
.width()
.min(self.options.bounds.height());
let distance_dist = Uniform::new(0.0, max_dist).expect("valid input to Uniform::new()");
let distances: Vec<f64> = (0..self.rows_per_batch)
.map(|_| rng.sample(distance_dist))
.collect();
let wkb_geometries = (0..self.rows_per_batch)
.map(|_| -> Result<Option<Vec<u8>>> {
if rng.random_bool(self.null_rate) {
Ok(None)
} else {
Ok(Some(generate_random_wkb(rng, &self.options)?))
}
})
.collect::<Result<Vec<Option<Vec<u8>>>>>()?;
let id_array = Arc::new(Int32Array::from(ids));
let dist_array = Arc::new(Float64Array::from(distances));
let geometry_array = create_wkb_array(wkb_geometries, &self.sedona_type)?;
Ok(RecordBatch::try_new(
schema.clone(),
vec![id_array, dist_array, geometry_array],
)?)
}
}
/// Packs optional WKB byte buffers into the Arrow array layout that matches
/// `sedona_type`: a `BinaryArray` for `Wkb` and a `BinaryViewArray` for `WkbView`.
///
/// Any other type yields an internal error.
fn create_wkb_array(
    wkb_values: Vec<Option<Vec<u8>>>,
    sedona_type: &SedonaType,
) -> Result<ArrayRef> {
    let array: ArrayRef = match sedona_type {
        SedonaType::Wkb(_, _) => Arc::new(BinaryArray::from_iter(wkb_values)),
        SedonaType::WkbView(_, _) => Arc::new(BinaryViewArray::from_iter(wkb_values)),
        _ => {
            return sedona_internal_err!("create_wkb_array not implemented for {sedona_type:?}")
        }
    };
    Ok(array)
}
/// Lazily generates the record batches of a single partition.
struct RandomPartitionedDataReader<R> {
// Copy of the builder configuration used to generate every batch.
builder: RandomPartitionedDataBuilder,
// Schema handed to each generated batch.
schema: SchemaRef,
// Index of the partition this reader produces.
partition_idx: usize,
// Index of the next batch to generate; iteration ends at `batches_per_partition`.
batch_idx: usize,
// Random generator that drives all sampling for this partition.
rng: R,
}
impl<R: Rng> RecordBatchReader for RandomPartitionedDataReader<R> {
fn schema(&self) -> SchemaRef {
self.builder.schema()
}
}
impl<R: Rng> Iterator for RandomPartitionedDataReader<R> {
    type Item = std::result::Result<RecordBatch, ArrowError>;

    /// Generates the next batch of the partition, or returns `None` once
    /// `batches_per_partition` batches have been produced.
    ///
    /// Generation failures are wrapped in `ArrowError::ExternalError`; the
    /// batch counter advances even when a batch fails.
    fn next(&mut self) -> Option<Self::Item> {
        let current = self.batch_idx;
        if current == self.builder.batches_per_partition {
            return None;
        }
        self.batch_idx = current + 1;

        let outcome = self.builder.generate_batch(
            &mut self.rng,
            &self.schema,
            self.partition_idx,
            current,
        );
        Some(outcome.map_err(|err| ArrowError::ExternalError(Box::new(err))))
    }
}
/// Settings that control how a single random geometry is generated.
#[derive(Debug, Clone)]
struct RandomGeometryOptions {
// Kind of geometry to generate; `Geometry` picks a random concrete type for each value.
geom_type: GeometryTypeId,
// Rectangle inside which generated features are placed.
bounds: Rect,
// Inclusive (min, max) range from which the size of a feature is sampled.
size_range: (f64, f64),
// Inclusive (min, max) range for the number of vertices of a linestring or polygon ring.
vertices_per_linestring_range: (usize, usize),
// Probability that a generated geometry is empty.
empty_rate: f64,
// Probability that a generated polygon gets a hole.
polygon_hole_rate: f64,
// Inclusive (min, max) range for the number of parts of multi-geometries and collections.
num_parts_range: (usize, usize),
}
impl RandomGeometryOptions {
    /// Returns the default options: non-empty points inside the square from
    /// (0, 0) to (100, 100), feature sizes between 1 and 10, four vertices per
    /// linestring, no polygon holes and one to three parts per multi-geometry.
    fn new() -> Self {
        Self {
            geom_type: GeometryTypeId::Point,
            empty_rate: 0.0,
            bounds: Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 }),
            size_range: (1.0, 10.0),
            vertices_per_linestring_range: (4, 4),
            polygon_hole_rate: 0.0,
            num_parts_range: (1, 3),
        }
    }

    /// Checks that every option is usable by the geometry generators.
    ///
    /// # Errors
    ///
    /// Returns a planning error when:
    /// - the bounds do not have a positive, finite width and height;
    /// - `size_range` is not a positive, finite, ordered range;
    /// - a vertex or part count range starts at zero or is reversed;
    /// - `empty_rate` or `polygon_hole_rate` is outside `0.0..=1.0`.
    fn validate(&self) -> Result<()> {
        // The checks are phrased positively so that NaN (for which every
        // comparison is false) is rejected instead of slipping through and
        // failing later inside the random distributions.
        let (width, height) = (self.bounds.width(), self.bounds.height());
        let bounds_ok = width.is_finite() && height.is_finite() && width > 0.0 && height > 0.0;
        if !bounds_ok {
            return plan_err!("Expected valid bounds but got {:?}", self.bounds);
        }

        let (min_size, max_size) = self.size_range;
        let size_range_ok = min_size > 0.0 && min_size <= max_size && max_size.is_finite();
        if !size_range_ok {
            return plan_err!("Expected valid size_range but got {:?}", self.size_range);
        }

        if self.vertices_per_linestring_range.0 == 0
            || self.vertices_per_linestring_range.0 > self.vertices_per_linestring_range.1
        {
            return plan_err!(
                "Expected valid vertices_per_linestring_range but got {:?}",
                self.vertices_per_linestring_range
            );
        }

        if !(0.0..=1.0).contains(&self.empty_rate) {
            return plan_err!(
                "Expected empty_rate between 0.0 and 1.0 but got {}",
                self.empty_rate
            );
        }

        if !(0.0..=1.0).contains(&self.polygon_hole_rate) {
            return plan_err!(
                "Expected polygon_hole_rate between 0.0 and 1.0 but got {}",
                self.polygon_hole_rate
            );
        }

        if self.num_parts_range.0 == 0 || self.num_parts_range.0 > self.num_parts_range.1 {
            return plan_err!(
                "Expected valid num_parts_range but got {:?}",
                self.num_parts_range
            );
        }

        Ok(())
    }
}
impl Default for RandomGeometryOptions {
fn default() -> Self {
Self::new()
}
}
/// Generates one random geometry according to `options` and serializes it as
/// little-endian WKB.
///
/// Errors from the geometry generators are returned unchanged; serialization
/// failures are wrapped in `DataFusionError::External`.
fn generate_random_wkb<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<Vec<u8>> {
    let geometry = generate_random_geometry(rng, options)?;

    let write_options = WriteOptions {
        endianness: Endianness::LittleEndian,
    };
    let mut buffer: Vec<u8> = Vec::new();
    wkb::writer::write_geometry(&mut buffer, &geometry, &write_options)
        .map_err(|err| DataFusionError::External(Box::new(err)))?;
    Ok(buffer)
}
/// Generates a random geometry of the kind requested by `options.geom_type`.
///
/// For `GeometryTypeId::Geometry` a concrete type is first picked at random
/// (see `pick_random_geometry_type`) and the generation is delegated to it
/// using otherwise identical options.
///
/// # Errors
///
/// Propagates any error raised by the type-specific generator.
fn generate_random_geometry<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<Geometry> {
    Ok(match options.geom_type {
        GeometryTypeId::Point => Geometry::Point(generate_random_point(rng, options)?),
        GeometryTypeId::LineString => {
            Geometry::LineString(generate_random_linestring(rng, options)?)
        }
        GeometryTypeId::Polygon => Geometry::Polygon(generate_random_polygon(rng, options)?),
        GeometryTypeId::MultiPoint => {
            Geometry::MultiPoint(generate_random_multipoint(rng, options)?)
        }
        GeometryTypeId::MultiLineString => {
            Geometry::MultiLineString(generate_random_multilinestring(rng, options)?)
        }
        GeometryTypeId::MultiPolygon => {
            Geometry::MultiPolygon(generate_random_multipolygon(rng, options)?)
        }
        GeometryTypeId::GeometryCollection => {
            Geometry::GeometryCollection(generate_random_geometrycollection(rng, options)?)
        }
        GeometryTypeId::Geometry => {
            // Resolve the generic type to a concrete one, then recurse with a
            // private copy of the options so the caller's settings stay untouched.
            let mut copy_options = options.clone();
            copy_options.geom_type = pick_random_geometry_type(rng);
            generate_random_geometry(rng, &copy_options)?
        }
    })
}
/// Generates a point placed uniformly inside `options.bounds`.
///
/// With probability `options.empty_rate` the point is empty, which is
/// represented by NaN coordinates.
fn generate_random_point<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<Point> {
    if rng.random_bool(options.empty_rate) {
        return Ok(Point::new(f64::NAN, f64::NAN));
    }

    let (lower, upper) = (options.bounds.min(), options.bounds.max());
    let x_dist = Uniform::new(lower.x, upper.x)
        .map_err(|e| exec_datafusion_err!("Invalid x bounds for random point: {e}"))?;
    let y_dist = Uniform::new(lower.y, upper.y)
        .map_err(|e| exec_datafusion_err!("Invalid y bounds for random point: {e}"))?;

    // x is drawn before y, keeping the random stream order stable.
    Ok(Point::new(rng.sample(x_dist), rng.sample(y_dist)))
}
/// Generates an open linestring whose vertices lie on a random circle inside
/// `options.bounds`.
///
/// With probability `options.empty_rate` the linestring has no vertices;
/// otherwise it has at least two, even when the configured range allows fewer.
fn generate_random_linestring<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<LineString> {
    if rng.random_bool(options.empty_rate) {
        return Ok(LineString::new(vec![]));
    }

    let (cx, cy, radius) = generate_random_circle(rng, options)?;
    let (min_vertices, max_vertices) = options.vertices_per_linestring_range;
    let vertex_count_dist = Uniform::new_inclusive(min_vertices, max_vertices)
        .map_err(|e| exec_datafusion_err!("Invalid vertex count range for linestring: {e}"))?;
    let vertex_count = rng.sample(vertex_count_dist).max(2);
    let start_angle = rng.random_range(0.0..(2.0 * PI));

    let vertices = generate_circular_vertices(start_angle, cx, cy, radius, vertex_count, false)?;
    Ok(LineString::from(vertices))
}
/// Generates a polygon whose shell is a closed ring of vertices on a random
/// circle inside `options.bounds`.
///
/// With probability `options.empty_rate` the polygon is empty. With
/// probability `options.polygon_hole_rate` it gets one hole: a scaled-down,
/// reversed copy of the shell that shares its center.
fn generate_random_polygon<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<Polygon> {
    if rng.random_bool(options.empty_rate) {
        return Ok(Polygon::new(LineString::new(vec![]), vec![]));
    }

    let (cx, cy, outer_radius) = generate_random_circle(rng, options)?;
    let (min_vertices, max_vertices) = options.vertices_per_linestring_range;
    let vertex_count_dist = Uniform::new_inclusive(min_vertices, max_vertices)
        .map_err(|e| exec_datafusion_err!("Invalid vertex count range for polygon: {e}"))?;
    // A ring needs at least three distinct vertices.
    let ring_vertices = rng.sample(vertex_count_dist).max(3);
    let start_angle = rng.random_range(0.0..=(2.0 * PI));

    let exterior = LineString::from(generate_circular_vertices(
        start_angle,
        cx,
        cy,
        outer_radius,
        ring_vertices,
        true,
    )?);

    // Both values are drawn unconditionally so that the random stream does
    // not depend on whether a hole is actually added.
    let wants_hole = rng.random_bool(options.polygon_hole_rate);
    let shrink = rng.random_range(0.1..0.5);

    let interiors = if wants_hole {
        let mut ring = generate_circular_vertices(
            start_angle,
            cx,
            cy,
            outer_radius * shrink,
            ring_vertices,
            true,
        )?;
        // The hole winds in the opposite direction of the shell.
        ring.reverse();
        vec![LineString::from(ring)]
    } else {
        Vec::new()
    };

    Ok(Polygon::new(exterior, interiors))
}
/// Generates a multipoint that is empty with probability `options.empty_rate`
/// and otherwise consists of non-empty random points.
fn generate_random_multipoint<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<MultiPoint> {
    if rng.random_bool(options.empty_rate) {
        return Ok(MultiPoint::new(vec![]));
    }
    generate_random_children(rng, options, generate_random_point).map(MultiPoint::new)
}
/// Generates a multilinestring that is empty with probability
/// `options.empty_rate` and otherwise consists of non-empty random linestrings.
fn generate_random_multilinestring<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<MultiLineString> {
    if rng.random_bool(options.empty_rate) {
        return Ok(MultiLineString::new(vec![]));
    }
    generate_random_children(rng, options, generate_random_linestring).map(MultiLineString::new)
}
/// Generates a multipolygon that is empty with probability
/// `options.empty_rate` and otherwise consists of non-empty random polygons.
fn generate_random_multipolygon<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<MultiPolygon> {
    if rng.random_bool(options.empty_rate) {
        return Ok(MultiPolygon::new(vec![]));
    }
    generate_random_children(rng, options, generate_random_polygon).map(MultiPolygon::new)
}
/// Generates a geometry collection that is empty with probability
/// `options.empty_rate` and otherwise holds non-empty children of randomly
/// chosen types.
fn generate_random_geometrycollection<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<GeometryCollection> {
    if rng.random_bool(options.empty_rate) {
        return Ok(GeometryCollection::new_from(vec![]));
    }
    generate_random_children(rng, options, generate_random_geometry)
        .map(GeometryCollection::new_from)
}
fn generate_random_children<R: Rng, T, F: Fn(&mut R, &RandomGeometryOptions) -> Result<T>>(
rng: &mut R,
options: &RandomGeometryOptions,
func: F,
) -> Result<Vec<T>> {
let num_parts_dist =
Uniform::new_inclusive(options.num_parts_range.0, options.num_parts_range.1)
.map_err(|e| exec_datafusion_err!("Invalid part count range: {e}"))?;
let num_parts = rng.sample(num_parts_dist);
let (center_x, center_y, half_width) = generate_random_circle(rng, options)?;
let feature_bounds = Rect::new(
Coord {
x: center_x - half_width,
y: center_y - half_width,
},
Coord {
x: center_x + half_width,
y: center_y + half_width,
},
);
let child_bounds = generate_non_overlapping_sub_rectangles(num_parts, &feature_bounds);
let mut child_options = options.clone();
child_options.empty_rate = 0.0;
let mut children = Vec::new();
for bounds in child_bounds {
child_options.bounds = bounds;
let child_size = bounds.height().min(bounds.width());
child_options.size_range = (child_size * 0.9, child_size);
if options.geom_type == GeometryTypeId::GeometryCollection {
child_options.geom_type = pick_random_geometry_type(rng);
}
children.push(func(rng, &child_options)?);
}
Ok(children)
}
/// Picks one of the six concrete, non-collection geometry types uniformly at random.
fn pick_random_geometry_type<R: Rng>(rng: &mut R) -> GeometryTypeId {
    const CANDIDATES: [GeometryTypeId; 6] = [
        GeometryTypeId::Point,
        GeometryTypeId::LineString,
        GeometryTypeId::Polygon,
        GeometryTypeId::MultiPoint,
        GeometryTypeId::MultiLineString,
        GeometryTypeId::MultiPolygon,
    ];
    CANDIDATES[rng.random_range(0..CANDIDATES.len())]
}
/// Picks a random circle, returned as `(center_x, center_y, radius)`, that
/// fits inside `options.bounds`.
///
/// The diameter is sampled from `options.size_range`. Along each axis the
/// center is sampled so the circle stays inside the bounds. When the bounds
/// leave no room to move the circle (the extent is smaller than or equal to
/// the diameter), the center is placed in the middle of the bounds. The
/// returned radius is clamped to half of the smaller bounds extent.
///
/// # Errors
///
/// Returns an execution error when `size_range` or the center range cannot be
/// turned into a uniform distribution (e.g. non-finite values).
fn generate_random_circle<R: rand::Rng>(
    rng: &mut R,
    options: &RandomGeometryOptions,
) -> Result<(f64, f64, f64)> {
    let size_dist = Uniform::new_inclusive(options.size_range.0, options.size_range.1)
        .map_err(|e| exec_datafusion_err!("Invalid size range for random region: {e}"))?;
    let size = rng.sample(size_dist);
    let half_size = size / 2.0;

    let height = options.bounds.height();
    let width = options.bounds.width();
    let (lower, upper) = (options.bounds.min(), options.bounds.max());

    // x is resolved before y so the random stream order stays stable.
    let center_x = sample_circle_center(rng, lower.x, upper.x, width, size, "x")?;
    let center_y = sample_circle_center(rng, lower.y, upper.y, height, size, "y")?;

    Ok((
        center_x,
        center_y,
        half_size.min(height / 2.0).min(width / 2.0),
    ))
}

/// Chooses the circle center along one axis of the bounds.
///
/// Samples uniformly from `[min + size / 2, max - size / 2)` when that range
/// is non-empty; otherwise (the circle is at least as large as the extent)
/// returns the midpoint of the axis without consuming randomness.
fn sample_circle_center<R: rand::Rng>(
    rng: &mut R,
    min: f64,
    max: f64,
    extent: f64,
    size: f64,
    axis: &str,
) -> Result<f64> {
    let half_size = size / 2.0;
    let (low, high) = (min + half_size, max - half_size);
    // `Uniform::new` rejects an empty range, which happens when the extent is
    // exactly as large as the circle; the only valid center is then the midpoint.
    if extent >= size && low < high {
        let center_dist = Uniform::new(low, high).map_err(|e| {
            exec_datafusion_err!("Invalid {axis} bounds for random circle center: {e}")
        })?;
        Ok(rng.sample(center_dist))
    } else {
        Ok(min + extent / 2.0)
    }
}
/// Splits `bounds` into `num_parts` non-overlapping rectangles.
///
/// Starting from the whole rectangle, the tile with the largest area is
/// repeatedly halved, alternating between a split along x and a split along
/// y, until enough tiles exist. For `num_parts` of zero or one the original
/// rectangle is returned as the only tile.
fn generate_non_overlapping_sub_rectangles(num_parts: usize, bounds: &Rect) -> Vec<Rect> {
    let mut tiles = vec![*bounds];

    // Every step replaces one tile with two, so `num_parts - 1` steps are needed.
    for step in 0..num_parts.saturating_sub(1) {
        let largest_idx = tiles
            .iter()
            .map(|tile| tile.height() * tile.width())
            .enumerate()
            .max_by(|(_, lhs), (_, rhs)| lhs.partial_cmp(rhs).unwrap())
            .map_or(0, |(idx, _)| idx);

        let target = tiles[largest_idx];
        let [first_half, second_half] = if step % 2 == 0 {
            target.split_x()
        } else {
            target.split_y()
        };

        // The second half ends up in front of the first half.
        tiles[largest_idx] = first_half;
        tiles.insert(largest_idx, second_half);
    }

    tiles
}
/// Generates `num_vertices` coordinates on the circle of the given `radius`
/// around (`center_x`, `center_y`).
///
/// The first vertex sits at `angle` (radians) and each following vertex is
/// advanced by a fixed step of `2π / max(num_vertices, 3)`. When `closed` is
/// set, the first vertex is appended again to close the ring. Requesting zero
/// vertices yields an empty list, whether or not `closed` is set.
fn generate_circular_vertices(
    mut angle: f64,
    center_x: f64,
    center_y: f64,
    radius: f64,
    num_vertices: usize,
    closed: bool,
) -> Result<Vec<Coord>> {
    // One extra slot for the closing vertex.
    let mut out = Vec::with_capacity(num_vertices.saturating_add(1));
    let dangle = 2.0 * PI / (num_vertices as f64).max(3.0);
    for _ in 0..num_vertices {
        out.push(Coord {
            x: angle.cos() * radius + center_x,
            y: angle.sin() * radius + center_y,
        });
        angle += dangle;
    }

    if closed {
        // Nothing to close when no vertex was generated.
        if let Some(first) = out.first().copied() {
            out.push(first);
        }
    }
    Ok(out)
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::DataType;
use geo_traits::{MultiLineStringTrait, MultiPolygonTrait};
use geo_types::Coord;
use rand::rngs::StdRng;
use rand::SeedableRng;
use rstest::rstest;
use sedona_geometry::{
analyze::analyze_geometry, bounds::wkb_bounds_xy, interval::IntervalTrait,
};
#[test]
fn test_generate_random_geometry_produces_valid_wkb() {
let bounds = Rect::new(Coord { x: 10.0, y: 10.0 }, Coord { x: 90.0, y: 90.0 });
let size_range = (1.0, 10.0);
let test_cases = vec![
(GeometryTypeId::Point, 42, 100, 20, 50), (GeometryTypeId::Polygon, 123, 50, 80, 200),
];
for (geom_type, seed, iterations, min_size, max_size) in test_cases {
let mut rng = StdRng::seed_from_u64(seed);
let options = RandomGeometryOptions {
geom_type,
bounds,
size_range,
..Default::default()
};
for _ in 0..iterations {
let wkb_bytes = generate_random_wkb(&mut rng, &options).unwrap();
assert!(!wkb_bytes.is_empty());
assert!(
wkb_bytes.len() >= min_size,
"WKB size {} is smaller than expected minimum {} for {:?}",
wkb_bytes.len(),
min_size,
geom_type
);
assert!(
wkb_bytes.len() <= max_size,
"WKB size {} is larger than expected maximum {} for {:?}",
wkb_bytes.len(),
max_size,
geom_type
);
wkb::reader::read_wkb(&wkb_bytes).unwrap();
}
}
}
#[test]
fn test_generate_random_geometry_deterministic() {
let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 });
let size_range = (1.0, 10.0);
let geom_types = [GeometryTypeId::Point, GeometryTypeId::Polygon];
let mut rng1 = StdRng::seed_from_u64(42);
let mut rng2 = StdRng::seed_from_u64(42);
for geom_type in geom_types {
let options = RandomGeometryOptions {
geom_type,
bounds,
size_range,
..Default::default()
};
let wkb1 = generate_random_wkb(&mut rng1, &options).unwrap();
let wkb2 = generate_random_wkb(&mut rng2, &options).unwrap();
assert_eq!(wkb1, wkb2);
}
}
#[test]
fn test_random_partitioned_data_builder_build_basic() {
let (schema, partitions) = RandomPartitionedDataBuilder::new()
.num_partitions(2)
.batches_per_partition(3)
.rows_per_batch(4)
.null_rate(0.0) .build()
.unwrap();
assert_eq!(schema.fields().len(), 3);
assert_eq!(schema.field(0).name(), "id");
assert_eq!(schema.field(0).data_type(), &DataType::Int32);
assert_eq!(schema.field(1).name(), "dist");
assert_eq!(schema.field(1).data_type(), &DataType::Float64);
assert_eq!(schema.field(2).name(), "geometry");
assert_eq!(partitions.len(), 2);
for partition in &partitions {
assert_eq!(partition.len(), 3);
for batch in partition {
assert_eq!(batch.num_rows(), 4); assert_eq!(batch.num_columns(), 3);
}
}
}
#[test]
fn test_random_partitioned_data_builder_unique_ids() {
let (_, partitions) = RandomPartitionedDataBuilder::new()
.num_partitions(2)
.batches_per_partition(2)
.rows_per_batch(3)
.build()
.unwrap();
let mut all_ids = Vec::new();
for partition in &partitions {
for batch in partition {
let id_array = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
for i in 0..id_array.len() {
all_ids.push(id_array.value(i));
}
}
}
all_ids.sort();
for i in 1..all_ids.len() {
assert_ne!(
all_ids[i - 1],
all_ids[i],
"Found duplicate ID: {}",
all_ids[i]
);
}
for (i, &id) in all_ids.iter().enumerate() {
assert_eq!(id, i as i32);
}
}
#[test]
fn test_random_partitioned_data_builder_null_rate() {
let (_, partitions) = RandomPartitionedDataBuilder::new()
.rows_per_batch(100)
.null_rate(0.5) .build()
.unwrap();
let batch = &partitions[0][0];
let geometry_array = batch.column(2);
let null_count = geometry_array.null_count();
let total_count = geometry_array.len();
let null_rate = null_count as f64 / total_count as f64;
assert!(
(0.3..=0.7).contains(&null_rate),
"Expected null rate around 0.5, got {null_rate}"
);
}
#[test]
fn test_random_partitioned_data_builder_deterministic() {
let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 });
let (schema1, partitions1) = RandomPartitionedDataBuilder::new()
.seed(999)
.num_partitions(2)
.batches_per_partition(2)
.rows_per_batch(5)
.bounds(bounds)
.build()
.unwrap();
let (schema2, partitions2) = RandomPartitionedDataBuilder::new()
.seed(999) .num_partitions(2)
.batches_per_partition(2)
.rows_per_batch(5)
.bounds(bounds)
.build()
.unwrap();
assert_eq!(schema1, schema2);
assert_eq!(partitions1.len(), partitions2.len());
for (partition1, partition2) in partitions1.iter().zip(partitions2.iter()) {
assert_eq!(partition1.len(), partition2.len());
for (batch1, batch2) in partition1.iter().zip(partition2.iter()) {
let ids1 = batch1
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let ids2 = batch2
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(ids1, ids2);
let dists1 = batch1
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
let dists2 = batch2
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(dists1, dists2);
}
}
}
#[test]
fn test_random_partitioned_data_builder_different_seeds() {
let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 });
let (_, partitions1) = RandomPartitionedDataBuilder::new()
.seed(111)
.rows_per_batch(10)
.bounds(bounds)
.build()
.unwrap();
let (_, partitions2) = RandomPartitionedDataBuilder::new()
.seed(222) .rows_per_batch(10)
.bounds(bounds)
.build()
.unwrap();
let dists1 = partitions1[0][0]
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
let dists2 = partitions2[0][0]
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
let mut found_difference = false;
for i in 0..dists1.len() {
if (dists1.value(i) - dists2.value(i)).abs() > f64::EPSILON {
found_difference = true;
break;
}
}
assert!(
found_difference,
"Expected different random data with different seeds"
);
}
#[test]
fn test_random_linestring_num_vertices() {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.vertices_per_linestring_range = (3, 3);
for _ in 0..100 {
let geom = generate_random_linestring(&mut rng, &options).unwrap();
assert_eq!(geom.coords().count(), 3);
}
options.vertices_per_linestring_range = (50, 50);
for _ in 0..100 {
let geom = generate_random_linestring(&mut rng, &options).unwrap();
assert_eq!(geom.coords().count(), 50);
}
}
#[test]
fn test_random_polygon_has_hole() {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.polygon_hole_rate = 0.0;
for _ in 0..100 {
let geom = generate_random_polygon(&mut rng, &options).unwrap();
assert_eq!(geom.interiors().len(), 0);
}
options.polygon_hole_rate = 1.0;
for _ in 0..100 {
let geom = generate_random_polygon(&mut rng, &options).unwrap();
assert!(!geom.interiors().is_empty());
}
}
#[test]
fn test_random_multipoint_part_count() {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.num_parts_range = (3, 3);
for _ in 0..100 {
let geom = generate_random_multipoint(&mut rng, &options).unwrap();
assert_eq!(geom.len(), 3);
}
options.num_parts_range = (10, 10);
for _ in 0..100 {
let geom = generate_random_multipoint(&mut rng, &options).unwrap();
assert_eq!(geom.len(), 10);
}
}
#[test]
fn test_random_multilinestring_part_count() {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.num_parts_range = (3, 3);
for _ in 0..100 {
let geom = generate_random_multilinestring(&mut rng, &options).unwrap();
assert_eq!(geom.num_line_strings(), 3);
}
options.num_parts_range = (10, 10);
for _ in 0..100 {
let geom = generate_random_multilinestring(&mut rng, &options).unwrap();
assert_eq!(geom.num_line_strings(), 10);
}
}
#[test]
fn test_random_multipolygon_part_count() {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.num_parts_range = (3, 3);
for _ in 0..100 {
let geom = generate_random_multipolygon(&mut rng, &options).unwrap();
assert_eq!(geom.num_polygons(), 3);
}
options.num_parts_range = (10, 10);
for _ in 0..100 {
let geom = generate_random_multipolygon(&mut rng, &options).unwrap();
assert_eq!(geom.num_polygons(), 10);
}
}
#[test]
fn test_random_geometrycollection_part_count() {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.num_parts_range = (3, 3);
for _ in 0..100 {
let geom = generate_random_geometrycollection(&mut rng, &options).unwrap();
assert_eq!(geom.len(), 3);
}
options.num_parts_range = (10, 10);
for _ in 0..100 {
let geom = generate_random_geometrycollection(&mut rng, &options).unwrap();
assert_eq!(geom.len(), 10);
}
}
#[rstest]
fn test_random_geometry_type(
#[values(
GeometryTypeId::Point,
GeometryTypeId::LineString,
GeometryTypeId::Polygon,
GeometryTypeId::MultiPoint,
GeometryTypeId::MultiLineString,
GeometryTypeId::MultiPolygon,
GeometryTypeId::GeometryCollection
)]
geom_type: GeometryTypeId,
) {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.geom_type = geom_type;
options.empty_rate = 0.0;
for _ in 0..100 {
let geom = generate_random_wkb(&mut rng, &options).unwrap();
let wkb = wkb::reader::read_wkb(&geom).unwrap();
let analysis = analyze_geometry(&wkb).unwrap();
assert_eq!(analysis.geometry_type.geometry_type(), geom_type);
}
}
#[rstest]
fn test_random_emptiness(
#[values(
GeometryTypeId::Point,
GeometryTypeId::LineString,
GeometryTypeId::Polygon,
GeometryTypeId::MultiPoint,
GeometryTypeId::MultiLineString,
GeometryTypeId::MultiPolygon,
GeometryTypeId::GeometryCollection
)]
geom_type: GeometryTypeId,
) {
let mut rng = StdRng::seed_from_u64(123);
let mut options = RandomGeometryOptions::new();
options.geom_type = geom_type;
options.empty_rate = 0.0;
for _ in 0..100 {
let geom = generate_random_wkb(&mut rng, &options).unwrap();
let bounds = wkb_bounds_xy(&geom).unwrap();
assert!(!bounds.x().is_empty());
assert!(!bounds.y().is_empty());
assert!(
bounds.x().lo() >= options.bounds.min().x
&& bounds.y().lo() >= options.bounds.min().y
&& bounds.x().hi() <= options.bounds.max().x
&& bounds.y().hi() <= options.bounds.max().y
);
}
options.empty_rate = 1.0;
for _ in 0..100 {
let geom = generate_random_wkb(&mut rng, &options).unwrap();
let bounds = wkb_bounds_xy(&geom).unwrap();
assert!(bounds.x().is_empty());
assert!(bounds.y().is_empty());
}
}
#[test]
fn test_random_partitioned_data_builder_validation() {
let err = RandomPartitionedDataBuilder::new()
.null_rate(-0.1)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected null_rate between 0.0 and 1.0 but got -0.1"
);
let err = RandomPartitionedDataBuilder::new()
.null_rate(1.5)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected null_rate between 0.0 and 1.0 but got 1.5"
);
let err = RandomPartitionedDataBuilder::new()
.rows_per_batch(0)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected rows_per_batch > 0 but got 0"
);
let err = RandomPartitionedDataBuilder::new()
.num_partitions(0)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected num_partitions > 0 but got 0"
);
let err = RandomPartitionedDataBuilder::new()
.empty_rate(-0.1)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected empty_rate between 0.0 and 1.0 but got -0.1"
);
let err = RandomPartitionedDataBuilder::new()
.empty_rate(1.5)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected empty_rate between 0.0 and 1.0 but got 1.5"
);
let err = RandomPartitionedDataBuilder::new()
.polygon_hole_rate(-0.1)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected polygon_hole_rate between 0.0 and 1.0 but got -0.1"
);
let err = RandomPartitionedDataBuilder::new()
.polygon_hole_rate(1.5)
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected polygon_hole_rate between 0.0 and 1.0 but got 1.5"
);
let err = RandomPartitionedDataBuilder::new()
.size_range((0.0, 10.0))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid size_range but got (0.0, 10.0)"
);
let err = RandomPartitionedDataBuilder::new()
.size_range((5.0, -1.0))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid size_range but got (5.0, -1.0)"
);
let err = RandomPartitionedDataBuilder::new()
.size_range((10.0, 5.0))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid size_range but got (10.0, 5.0)"
);
let err = RandomPartitionedDataBuilder::new()
.vertices_per_linestring_range((0, 5))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid vertices_per_linestring_range but got (0, 5)"
);
let err = RandomPartitionedDataBuilder::new()
.vertices_per_linestring_range((10, 5))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid vertices_per_linestring_range but got (10, 5)"
);
let err = RandomPartitionedDataBuilder::new()
.num_parts_range((0, 5))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid num_parts_range but got (0, 5)"
);
let err = RandomPartitionedDataBuilder::new()
.num_parts_range((10, 5))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid num_parts_range but got (10, 5)"
);
let err = RandomPartitionedDataBuilder::new()
.bounds(Rect::new(
Coord { x: 10.0, y: 10.0 },
Coord { x: 10.0, y: 20.0 },
))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid bounds but got RECT(10.0 10.0,10.0 20.0)"
);
let err = RandomPartitionedDataBuilder::new()
.bounds(Rect::new(
Coord { x: 10.0, y: 10.0 },
Coord { x: 20.0, y: 10.0 },
))
.validate()
.unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: Expected valid bounds but got RECT(10.0 10.0,20.0 10.0)"
);
}
}