1use std::fs::File;
18
19use arrow_array::{ArrayRef, RecordBatchReader};
20use datafusion_common::{DataFusionError, Result};
21use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
22use sedona_common::sedona_internal_err;
23use sedona_schema::datatypes::SedonaType;
24
25use crate::data::test_geoparquet;
26
27#[derive(Debug, Clone)]
29pub struct TestReadOptions {
30 pub sedona_type: SedonaType,
32
33 pub chunk_size: usize,
35
36 pub output_size: Option<usize>,
43}
44
45impl TestReadOptions {
46 pub fn new(sedona_type: SedonaType) -> Self {
48 TestReadOptions {
49 sedona_type,
50 chunk_size: 8192,
51 output_size: None,
52 }
53 }
54
55 pub fn with_output_size(self, output_size: usize) -> Self {
57 TestReadOptions {
58 sedona_type: self.sedona_type,
59 chunk_size: self.chunk_size,
60 output_size: Some(output_size),
61 }
62 }
63}
64
65pub fn read_geoarrow_data_geometry(
69 group: &str,
70 name: &str,
71 options: &TestReadOptions,
72) -> Result<Vec<ArrayRef>> {
73 let path = test_geoparquet(group, name)?;
74 let file = File::open(path).map_err(DataFusionError::IoError)?;
75 let reader = ParquetRecordBatchReader::try_new(file, options.chunk_size)
76 .map_err(|e| DataFusionError::External(Box::new(e)))?;
77
78 if reader.schema().fields().is_empty() {
79 return sedona_internal_err!("Unexpected schema: zero columns");
80 }
81
82 let geometry_index = reader.schema().fields().len() - 1;
84 let raw_arrays = reader
85 .map(|batch| -> Result<ArrayRef> {
86 let array = batch?.column(geometry_index).clone();
87 Ok(arrow_cast::cast(
90 &array,
91 options.sedona_type.storage_type(),
92 )?)
93 })
94 .collect::<Result<Vec<_>>>()?;
95
96 apply_output_size(raw_arrays, options)
97}
98
99fn apply_output_size(arrays: Vec<ArrayRef>, options: &TestReadOptions) -> Result<Vec<ArrayRef>> {
100 if let Some(output_size) = options.output_size {
101 let mut out = Vec::new();
102 let mut i = 0;
103 let mut out_size = 0;
104 while out_size < output_size {
105 let array = &arrays[i % arrays.len()];
106 out_size += array.len();
107 i += 1;
108 out.push(array.clone());
109 }
110
111 Ok(out)
112 } else {
113 Ok(arrays)
114 }
115}
116
#[cfg(test)]
mod test {
    use super::*;

    use sedona_schema::datatypes::WKB_GEOMETRY;

    #[test]
    fn read() {
        // Without an output size the arrays come back exactly as read
        let defaults = TestReadOptions::new(WKB_GEOMETRY);
        let as_read = read_geoarrow_data_geometry("example", "geometry", &defaults).unwrap();
        assert_eq!(as_read.len(), 1);

        let only = &as_read[0];
        assert_eq!(only.len(), 9);
        assert_eq!(only.data_type(), WKB_GEOMETRY.storage_type());

        // The single 9-row array is repeated until at least 100 rows are
        // collected: 11 * 9 = 99 is not enough, so 12 copies are expected.
        let resized_options = TestReadOptions::new(WKB_GEOMETRY).with_output_size(100);
        let resized =
            read_geoarrow_data_geometry("example", "geometry", &resized_options).unwrap();
        assert_eq!(resized.len(), 12);
    }
}