geoarrow_array/geozero/export/data_source/
mod.rs1mod record_batch_reader;
2
3use std::str::FromStr;
4use std::sync::Arc;
5
6use arrow_array::cast::AsArray;
7use arrow_array::timezone::Tz;
8use arrow_array::types::*;
9use arrow_array::{Array, RecordBatch};
10use arrow_json::writer::make_encoder;
11use arrow_schema::{DataType, Schema, TimeUnit};
12use geoarrow_schema::GeoArrowType;
13use geozero::error::GeozeroError;
14use geozero::{ColumnValue, FeatureProcessor, GeomProcessor, GeozeroDatasource, PropertyProcessor};
15pub use record_batch_reader::GeozeroRecordBatchReader;
16
17use crate::GeoArrowArray;
18use crate::array::from_arrow_array;
19use crate::builder::geo_trait_wrappers::RectWrapper;
20use crate::cast::AsGeoArrowArray;
21use crate::geozero::export::scalar::{
22 process_geometry, process_geometry_collection, process_line_string, process_multi_line_string,
23 process_multi_point, process_multi_polygon, process_point, process_polygon,
24};
25use crate::trait_::GeoArrowArrayAccessor;
26
27impl GeozeroDatasource for GeozeroRecordBatchReader {
28 fn process<P: FeatureProcessor>(&mut self, processor: &mut P) -> Result<(), GeozeroError> {
29 let reader = self.as_mut();
30 let schema = reader.schema();
31 let geom_indices = geometry_columns(&schema);
32 let geometry_column_index = if geom_indices.len() != 1 {
33 Err(GeozeroError::Dataset(
34 "Writing through geozero not supported with multiple geometries".to_string(),
35 ))?
36 } else {
37 geom_indices[0]
38 };
39
40 processor.dataset_begin(None)?;
41
42 let mut overall_row_idx = 0;
43 for batch in reader.into_iter() {
44 let batch = batch.map_err(|err| GeozeroError::Dataset(err.to_string()))?;
45 process_batch(
46 &batch,
47 &schema,
48 geometry_column_index,
49 overall_row_idx,
50 processor,
51 )?;
52 overall_row_idx += batch.num_rows();
53 }
54
55 processor.dataset_end()?;
56
57 Ok(())
58 }
59}
60
61fn process_batch<P: FeatureProcessor>(
62 batch: &RecordBatch,
63 schema: &Schema,
64 geometry_column_index: usize,
65 batch_start_idx: usize,
66 processor: &mut P,
67) -> Result<(), GeozeroError> {
68 let num_rows = batch.num_rows();
69 let geometry_field = schema.field(geometry_column_index);
70 let geometry_column_box = &batch.columns()[geometry_column_index];
71 let geometry_column = from_arrow_array(&geometry_column_box, geometry_field)
72 .map_err(|err| GeozeroError::Dataset(err.to_string()))?;
73
74 for within_batch_row_idx in 0..num_rows {
75 processor.feature_begin((within_batch_row_idx + batch_start_idx) as u64)?;
76
77 processor.properties_begin()?;
78 process_properties(
79 batch,
80 schema,
81 within_batch_row_idx,
82 geometry_column_index,
83 processor,
84 )?;
85 processor.properties_end()?;
86
87 processor.geometry_begin()?;
88 process_geometry_n(&geometry_column, within_batch_row_idx, processor)?;
89 processor.geometry_end()?;
90
91 processor.feature_end((within_batch_row_idx + batch_start_idx) as u64)?;
92 }
93
94 Ok(())
95}
96
97fn process_properties<P: PropertyProcessor>(
98 batch: &RecordBatch,
99 schema: &Schema,
100 within_batch_row_idx: usize,
101 geometry_column_index: usize,
102 processor: &mut P,
103) -> Result<(), GeozeroError> {
104 let mut property_idx = 0;
107 for (column_idx, (field, array)) in schema.fields.iter().zip(batch.columns().iter()).enumerate()
108 {
109 if column_idx == geometry_column_index {
111 continue;
112 }
113 let name = field.name();
114
115 if array.is_null(within_batch_row_idx) {
117 continue;
118 }
119
120 match field.data_type() {
121 DataType::Boolean => {
122 let arr = array.as_boolean();
123 processor.property(
124 property_idx,
125 name,
126 &ColumnValue::Bool(arr.value(within_batch_row_idx)),
127 )?;
128 }
129 DataType::UInt8 => {
130 let arr = array.as_primitive::<UInt8Type>();
131 processor.property(
132 property_idx,
133 name,
134 &ColumnValue::UByte(arr.value(within_batch_row_idx)),
135 )?;
136 }
137 DataType::Int8 => {
138 let arr = array.as_primitive::<Int8Type>();
139 processor.property(
140 property_idx,
141 name,
142 &ColumnValue::Byte(arr.value(within_batch_row_idx)),
143 )?;
144 }
145 DataType::UInt16 => {
146 let arr = array.as_primitive::<UInt16Type>();
147 processor.property(
148 property_idx,
149 name,
150 &ColumnValue::UShort(arr.value(within_batch_row_idx)),
151 )?;
152 }
153 DataType::Int16 => {
154 let arr = array.as_primitive::<Int16Type>();
155 processor.property(
156 property_idx,
157 name,
158 &ColumnValue::Short(arr.value(within_batch_row_idx)),
159 )?;
160 }
161 DataType::UInt32 => {
162 let arr = array.as_primitive::<UInt32Type>();
163 processor.property(
164 property_idx,
165 name,
166 &ColumnValue::UInt(arr.value(within_batch_row_idx)),
167 )?;
168 }
169 DataType::Int32 => {
170 let arr = array.as_primitive::<Int32Type>();
171 processor.property(
172 property_idx,
173 name,
174 &ColumnValue::Int(arr.value(within_batch_row_idx)),
175 )?;
176 }
177 DataType::UInt64 => {
178 let arr = array.as_primitive::<UInt64Type>();
179 processor.property(
180 property_idx,
181 name,
182 &ColumnValue::ULong(arr.value(within_batch_row_idx)),
183 )?;
184 }
185 DataType::Int64 => {
186 let arr = array.as_primitive::<Int64Type>();
187 processor.property(
188 property_idx,
189 name,
190 &ColumnValue::Long(arr.value(within_batch_row_idx)),
191 )?;
192 }
193 DataType::Float16 => {
194 let arr = array.as_primitive::<Float16Type>();
195 processor.property(
196 property_idx,
197 name,
198 &ColumnValue::Float(arr.value(within_batch_row_idx).to_f32()),
199 )?;
200 }
201 DataType::Float32 => {
202 let arr = array.as_primitive::<Float32Type>();
203 processor.property(
204 property_idx,
205 name,
206 &ColumnValue::Float(arr.value(within_batch_row_idx)),
207 )?;
208 }
209 DataType::Float64 => {
210 let arr = array.as_primitive::<Float64Type>();
211 processor.property(
212 property_idx,
213 name,
214 &ColumnValue::Double(arr.value(within_batch_row_idx)),
215 )?;
216 }
217 DataType::Utf8 => {
218 let arr = array.as_string::<i32>();
219 processor.property(
220 property_idx,
221 name,
222 &ColumnValue::String(arr.value(within_batch_row_idx)),
223 )?;
224 }
225 DataType::LargeUtf8 => {
226 let arr = array.as_string::<i64>();
227 processor.property(
228 property_idx,
229 name,
230 &ColumnValue::String(arr.value(within_batch_row_idx)),
231 )?;
232 }
233 DataType::Binary => {
234 let arr = array.as_binary::<i32>();
235 processor.property(
236 property_idx,
237 name,
238 &ColumnValue::Binary(arr.value(within_batch_row_idx)),
239 )?;
240 }
241 DataType::LargeBinary => {
242 let arr = array.as_binary::<i64>();
243 processor.property(
244 property_idx,
245 name,
246 &ColumnValue::Binary(arr.value(within_batch_row_idx)),
247 )?;
248 }
249 DataType::Struct(_)
250 | DataType::List(_)
251 | DataType::LargeList(_)
252 | DataType::Map(_, _) => {
253 let options = Default::default();
255 let mut enc = make_encoder(field, array, &options)
256 .map_err(|err| GeozeroError::Property(err.to_string()))?;
257 let mut out = vec![];
258 enc.encode(within_batch_row_idx, &mut out);
259 let json_string = String::from_utf8(out)
260 .map_err(|err| GeozeroError::Property(err.to_string()))?;
261 processor.property(property_idx, name, &ColumnValue::Json(&json_string))?;
262 }
263 DataType::Date32 => {
264 let arr = array.as_primitive::<Date32Type>();
265 let datetime = arr.value_as_datetime(within_batch_row_idx).unwrap();
266 let dt_str = datetime.and_utc().to_rfc3339();
267 processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
268 }
269 DataType::Date64 => {
270 let arr = array.as_primitive::<Date64Type>();
271 let datetime = arr.value_as_datetime(within_batch_row_idx).unwrap();
272 let dt_str = datetime.and_utc().to_rfc3339();
273 processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
274 }
275 DataType::Timestamp(unit, tz) => {
276 let arrow_tz = if let Some(tz) = tz {
277 Some(Tz::from_str(tz).map_err(|err| GeozeroError::Property(err.to_string()))?)
278 } else {
279 None
280 };
281
282 macro_rules! impl_timestamp {
283 ($arrow_type:ty) => {{
284 let arr = array.as_primitive::<$arrow_type>();
285 let dt_str = if let Some(arrow_tz) = arrow_tz {
286 arr.value_as_datetime_with_tz(within_batch_row_idx, arrow_tz)
287 .unwrap()
288 .to_rfc3339()
289 } else {
290 arr.value_as_datetime(within_batch_row_idx)
291 .unwrap()
292 .and_utc()
293 .to_rfc3339()
294 };
295 processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
296 }};
297 }
298
299 match unit {
300 TimeUnit::Microsecond => impl_timestamp!(TimestampMicrosecondType),
301 TimeUnit::Millisecond => impl_timestamp!(TimestampMillisecondType),
302 TimeUnit::Nanosecond => impl_timestamp!(TimestampNanosecondType),
303 TimeUnit::Second => impl_timestamp!(TimestampSecondType),
304 }
305 }
306 dt => {
307 return Err(GeozeroError::Properties(format!(
308 "unsupported type: {dt:?}",
309 )));
310 }
311 }
312 property_idx += 1;
313 }
314
315 Ok(())
316}
317
318fn process_geometry_n<P: GeomProcessor>(
319 geometry_column: &Arc<dyn GeoArrowArray>,
320 within_batch_row_idx: usize,
321 processor: &mut P,
322) -> Result<(), GeozeroError> {
323 let arr = geometry_column.as_ref();
324 let i = within_batch_row_idx;
325
326 use GeoArrowType::*;
327 match arr.data_type() {
329 Point(_) => {
330 let geom = arr.as_point().value(i).unwrap();
331 process_point(&geom, 0, processor)?;
332 }
333 LineString(_) => {
334 let geom = arr.as_line_string().value(i).unwrap();
335 process_line_string(&geom, 0, processor)?;
336 }
337 Polygon(_) => {
338 let geom = arr.as_polygon().value(i).unwrap();
339 process_polygon(&geom, true, 0, processor)?;
340 }
341 MultiPoint(_) => {
342 let geom = arr.as_multi_point().value(i).unwrap();
343 process_multi_point(&geom, 0, processor)?;
344 }
345 MultiLineString(_) => {
346 let geom = arr.as_multi_line_string().value(i).unwrap();
347 process_multi_line_string(&geom, 0, processor)?;
348 }
349 MultiPolygon(_) => {
350 let geom = arr.as_multi_polygon().value(i).unwrap();
351 process_multi_polygon(&geom, 0, processor)?;
352 }
353 GeometryCollection(_) => {
354 let geom = arr.as_geometry_collection().value(i).unwrap();
355 process_geometry_collection(&geom, 0, processor)?;
356 }
357 Wkb(_) => {
358 let geom = arr
359 .as_wkb::<i32>()
360 .value(i)
361 .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
362 process_geometry(&geom, 0, processor)?;
363 }
364 LargeWkb(_) => {
365 let geom = arr
366 .as_wkb::<i64>()
367 .value(i)
368 .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
369 process_geometry(&geom, 0, processor)?;
370 }
371 WkbView(_) => {
372 let geom = arr
373 .as_wkb_view()
374 .value(i)
375 .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
376 process_geometry(&geom, 0, processor)?;
377 }
378 Wkt(_) => {
379 let geom = arr
380 .as_wkt::<i32>()
381 .value(i)
382 .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
383 process_geometry(&geom, 0, processor)?;
384 }
385 LargeWkt(_) => {
386 let geom = arr
387 .as_wkt::<i64>()
388 .value(i)
389 .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
390 process_geometry(&geom, 0, processor)?;
391 }
392 WktView(_) => {
393 let geom = arr
394 .as_wkt_view()
395 .value(i)
396 .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
397 process_geometry(&geom, 0, processor)?;
398 }
399 Rect(_) => {
400 let geom = arr.as_rect().value(i).unwrap();
401 let wrapper = RectWrapper::try_new(&geom)
402 .map_err(|err| geozero::error::GeozeroError::Geometry(err.to_string()))?;
403 process_polygon(&wrapper, true, 0, processor)?
404 }
405 Geometry(_) => {
406 let geom = arr.as_geometry().value(i).unwrap();
407 process_geometry(&geom, 0, processor)?;
408 }
409 }
410
411 Ok(())
412}
413
414fn geometry_columns(schema: &Schema) -> Vec<usize> {
415 let mut geom_indices = vec![];
416 for (field_idx, field) in schema.fields().iter().enumerate() {
417 if GeoArrowType::from_extension_field(field.as_ref()).is_ok() {
418 geom_indices.push(field_idx);
419 }
420 }
421 geom_indices
422}