geoarrow_array/geozero/export/data_source/
mod.rs

1mod record_batch_reader;
2
3use std::str::FromStr;
4use std::sync::Arc;
5
6use arrow_array::cast::AsArray;
7use arrow_array::timezone::Tz;
8use arrow_array::types::*;
9use arrow_array::{Array, RecordBatch};
10use arrow_json::writer::make_encoder;
11use arrow_schema::{DataType, Schema, TimeUnit};
12use geoarrow_schema::GeoArrowType;
13use geozero::error::GeozeroError;
14use geozero::{ColumnValue, FeatureProcessor, GeomProcessor, GeozeroDatasource, PropertyProcessor};
15pub use record_batch_reader::GeozeroRecordBatchReader;
16
17use crate::GeoArrowArray;
18use crate::array::from_arrow_array;
19use crate::builder::geo_trait_wrappers::RectWrapper;
20use crate::cast::AsGeoArrowArray;
21use crate::geozero::export::scalar::{
22    process_geometry, process_geometry_collection, process_line_string, process_multi_line_string,
23    process_multi_point, process_multi_polygon, process_point, process_polygon,
24};
25use crate::trait_::GeoArrowArrayAccessor;
26
27impl GeozeroDatasource for GeozeroRecordBatchReader {
28    fn process<P: FeatureProcessor>(&mut self, processor: &mut P) -> Result<(), GeozeroError> {
29        let reader = self.as_mut();
30        let schema = reader.schema();
31        let geom_indices = geometry_columns(&schema);
32        let geometry_column_index = if geom_indices.len() != 1 {
33            Err(GeozeroError::Dataset(
34                "Writing through geozero not supported with multiple geometries".to_string(),
35            ))?
36        } else {
37            geom_indices[0]
38        };
39
40        processor.dataset_begin(None)?;
41
42        let mut overall_row_idx = 0;
43        for batch in reader.into_iter() {
44            let batch = batch.map_err(|err| GeozeroError::Dataset(err.to_string()))?;
45            process_batch(
46                &batch,
47                &schema,
48                geometry_column_index,
49                overall_row_idx,
50                processor,
51            )?;
52            overall_row_idx += batch.num_rows();
53        }
54
55        processor.dataset_end()?;
56
57        Ok(())
58    }
59}
60
61fn process_batch<P: FeatureProcessor>(
62    batch: &RecordBatch,
63    schema: &Schema,
64    geometry_column_index: usize,
65    batch_start_idx: usize,
66    processor: &mut P,
67) -> Result<(), GeozeroError> {
68    let num_rows = batch.num_rows();
69    let geometry_field = schema.field(geometry_column_index);
70    let geometry_column_box = &batch.columns()[geometry_column_index];
71    let geometry_column = from_arrow_array(&geometry_column_box, geometry_field)
72        .map_err(|err| GeozeroError::Dataset(err.to_string()))?;
73
74    for within_batch_row_idx in 0..num_rows {
75        processor.feature_begin((within_batch_row_idx + batch_start_idx) as u64)?;
76
77        processor.properties_begin()?;
78        process_properties(
79            batch,
80            schema,
81            within_batch_row_idx,
82            geometry_column_index,
83            processor,
84        )?;
85        processor.properties_end()?;
86
87        processor.geometry_begin()?;
88        process_geometry_n(&geometry_column, within_batch_row_idx, processor)?;
89        processor.geometry_end()?;
90
91        processor.feature_end((within_batch_row_idx + batch_start_idx) as u64)?;
92    }
93
94    Ok(())
95}
96
97fn process_properties<P: PropertyProcessor>(
98    batch: &RecordBatch,
99    schema: &Schema,
100    within_batch_row_idx: usize,
101    geometry_column_index: usize,
102    processor: &mut P,
103) -> Result<(), GeozeroError> {
104    // Note: the `column_idx` will be off by one if the geometry column is not the last column in
105    // the table, so we maintain a separate property index counter
106    let mut property_idx = 0;
107    for (column_idx, (field, array)) in schema.fields.iter().zip(batch.columns().iter()).enumerate()
108    {
109        // Don't include geometry column in properties
110        if column_idx == geometry_column_index {
111            continue;
112        }
113        let name = field.name();
114
115        // Don't pass null properties to geozero
116        if array.is_null(within_batch_row_idx) {
117            continue;
118        }
119
120        match field.data_type() {
121            DataType::Boolean => {
122                let arr = array.as_boolean();
123                processor.property(
124                    property_idx,
125                    name,
126                    &ColumnValue::Bool(arr.value(within_batch_row_idx)),
127                )?;
128            }
129            DataType::UInt8 => {
130                let arr = array.as_primitive::<UInt8Type>();
131                processor.property(
132                    property_idx,
133                    name,
134                    &ColumnValue::UByte(arr.value(within_batch_row_idx)),
135                )?;
136            }
137            DataType::Int8 => {
138                let arr = array.as_primitive::<Int8Type>();
139                processor.property(
140                    property_idx,
141                    name,
142                    &ColumnValue::Byte(arr.value(within_batch_row_idx)),
143                )?;
144            }
145            DataType::UInt16 => {
146                let arr = array.as_primitive::<UInt16Type>();
147                processor.property(
148                    property_idx,
149                    name,
150                    &ColumnValue::UShort(arr.value(within_batch_row_idx)),
151                )?;
152            }
153            DataType::Int16 => {
154                let arr = array.as_primitive::<Int16Type>();
155                processor.property(
156                    property_idx,
157                    name,
158                    &ColumnValue::Short(arr.value(within_batch_row_idx)),
159                )?;
160            }
161            DataType::UInt32 => {
162                let arr = array.as_primitive::<UInt32Type>();
163                processor.property(
164                    property_idx,
165                    name,
166                    &ColumnValue::UInt(arr.value(within_batch_row_idx)),
167                )?;
168            }
169            DataType::Int32 => {
170                let arr = array.as_primitive::<Int32Type>();
171                processor.property(
172                    property_idx,
173                    name,
174                    &ColumnValue::Int(arr.value(within_batch_row_idx)),
175                )?;
176            }
177            DataType::UInt64 => {
178                let arr = array.as_primitive::<UInt64Type>();
179                processor.property(
180                    property_idx,
181                    name,
182                    &ColumnValue::ULong(arr.value(within_batch_row_idx)),
183                )?;
184            }
185            DataType::Int64 => {
186                let arr = array.as_primitive::<Int64Type>();
187                processor.property(
188                    property_idx,
189                    name,
190                    &ColumnValue::Long(arr.value(within_batch_row_idx)),
191                )?;
192            }
193            DataType::Float16 => {
194                let arr = array.as_primitive::<Float16Type>();
195                processor.property(
196                    property_idx,
197                    name,
198                    &ColumnValue::Float(arr.value(within_batch_row_idx).to_f32()),
199                )?;
200            }
201            DataType::Float32 => {
202                let arr = array.as_primitive::<Float32Type>();
203                processor.property(
204                    property_idx,
205                    name,
206                    &ColumnValue::Float(arr.value(within_batch_row_idx)),
207                )?;
208            }
209            DataType::Float64 => {
210                let arr = array.as_primitive::<Float64Type>();
211                processor.property(
212                    property_idx,
213                    name,
214                    &ColumnValue::Double(arr.value(within_batch_row_idx)),
215                )?;
216            }
217            DataType::Utf8 => {
218                let arr = array.as_string::<i32>();
219                processor.property(
220                    property_idx,
221                    name,
222                    &ColumnValue::String(arr.value(within_batch_row_idx)),
223                )?;
224            }
225            DataType::LargeUtf8 => {
226                let arr = array.as_string::<i64>();
227                processor.property(
228                    property_idx,
229                    name,
230                    &ColumnValue::String(arr.value(within_batch_row_idx)),
231                )?;
232            }
233            DataType::Binary => {
234                let arr = array.as_binary::<i32>();
235                processor.property(
236                    property_idx,
237                    name,
238                    &ColumnValue::Binary(arr.value(within_batch_row_idx)),
239                )?;
240            }
241            DataType::LargeBinary => {
242                let arr = array.as_binary::<i64>();
243                processor.property(
244                    property_idx,
245                    name,
246                    &ColumnValue::Binary(arr.value(within_batch_row_idx)),
247                )?;
248            }
249            DataType::Struct(_)
250            | DataType::List(_)
251            | DataType::LargeList(_)
252            | DataType::Map(_, _) => {
253                // TODO(Perf): refactor so that we don't make a new encoder on every row
254                let options = Default::default();
255                let mut enc = make_encoder(field, array, &options)
256                    .map_err(|err| GeozeroError::Property(err.to_string()))?;
257                let mut out = vec![];
258                enc.encode(within_batch_row_idx, &mut out);
259                let json_string = String::from_utf8(out)
260                    .map_err(|err| GeozeroError::Property(err.to_string()))?;
261                processor.property(property_idx, name, &ColumnValue::Json(&json_string))?;
262            }
263            DataType::Date32 => {
264                let arr = array.as_primitive::<Date32Type>();
265                let datetime = arr.value_as_datetime(within_batch_row_idx).unwrap();
266                let dt_str = datetime.and_utc().to_rfc3339();
267                processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
268            }
269            DataType::Date64 => {
270                let arr = array.as_primitive::<Date64Type>();
271                let datetime = arr.value_as_datetime(within_batch_row_idx).unwrap();
272                let dt_str = datetime.and_utc().to_rfc3339();
273                processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
274            }
275            DataType::Timestamp(unit, tz) => {
276                let arrow_tz = if let Some(tz) = tz {
277                    Some(Tz::from_str(tz).map_err(|err| GeozeroError::Property(err.to_string()))?)
278                } else {
279                    None
280                };
281
282                macro_rules! impl_timestamp {
283                    ($arrow_type:ty) => {{
284                        let arr = array.as_primitive::<$arrow_type>();
285                        let dt_str = if let Some(arrow_tz) = arrow_tz {
286                            arr.value_as_datetime_with_tz(within_batch_row_idx, arrow_tz)
287                                .unwrap()
288                                .to_rfc3339()
289                        } else {
290                            arr.value_as_datetime(within_batch_row_idx)
291                                .unwrap()
292                                .and_utc()
293                                .to_rfc3339()
294                        };
295                        processor.property(property_idx, name, &ColumnValue::DateTime(&dt_str))?;
296                    }};
297                }
298
299                match unit {
300                    TimeUnit::Microsecond => impl_timestamp!(TimestampMicrosecondType),
301                    TimeUnit::Millisecond => impl_timestamp!(TimestampMillisecondType),
302                    TimeUnit::Nanosecond => impl_timestamp!(TimestampNanosecondType),
303                    TimeUnit::Second => impl_timestamp!(TimestampSecondType),
304                }
305            }
306            dt => {
307                return Err(GeozeroError::Properties(format!(
308                    "unsupported type: {dt:?}",
309                )));
310            }
311        }
312        property_idx += 1;
313    }
314
315    Ok(())
316}
317
318fn process_geometry_n<P: GeomProcessor>(
319    geometry_column: &Arc<dyn GeoArrowArray>,
320    within_batch_row_idx: usize,
321    processor: &mut P,
322) -> Result<(), GeozeroError> {
323    let arr = geometry_column.as_ref();
324    let i = within_batch_row_idx;
325
326    use GeoArrowType::*;
327    // TODO: should we be passing the geom_idx down into these process* functions?
328    match arr.data_type() {
329        Point(_) => {
330            let geom = arr.as_point().value(i).unwrap();
331            process_point(&geom, 0, processor)?;
332        }
333        LineString(_) => {
334            let geom = arr.as_line_string().value(i).unwrap();
335            process_line_string(&geom, 0, processor)?;
336        }
337        Polygon(_) => {
338            let geom = arr.as_polygon().value(i).unwrap();
339            process_polygon(&geom, true, 0, processor)?;
340        }
341        MultiPoint(_) => {
342            let geom = arr.as_multi_point().value(i).unwrap();
343            process_multi_point(&geom, 0, processor)?;
344        }
345        MultiLineString(_) => {
346            let geom = arr.as_multi_line_string().value(i).unwrap();
347            process_multi_line_string(&geom, 0, processor)?;
348        }
349        MultiPolygon(_) => {
350            let geom = arr.as_multi_polygon().value(i).unwrap();
351            process_multi_polygon(&geom, 0, processor)?;
352        }
353        GeometryCollection(_) => {
354            let geom = arr.as_geometry_collection().value(i).unwrap();
355            process_geometry_collection(&geom, 0, processor)?;
356        }
357        Wkb(_) => {
358            let geom = arr
359                .as_wkb::<i32>()
360                .value(i)
361                .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
362            process_geometry(&geom, 0, processor)?;
363        }
364        LargeWkb(_) => {
365            let geom = arr
366                .as_wkb::<i64>()
367                .value(i)
368                .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
369            process_geometry(&geom, 0, processor)?;
370        }
371        WkbView(_) => {
372            let geom = arr
373                .as_wkb_view()
374                .value(i)
375                .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
376            process_geometry(&geom, 0, processor)?;
377        }
378        Wkt(_) => {
379            let geom = arr
380                .as_wkt::<i32>()
381                .value(i)
382                .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
383            process_geometry(&geom, 0, processor)?;
384        }
385        LargeWkt(_) => {
386            let geom = arr
387                .as_wkt::<i64>()
388                .value(i)
389                .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
390            process_geometry(&geom, 0, processor)?;
391        }
392        WktView(_) => {
393            let geom = arr
394                .as_wkt_view()
395                .value(i)
396                .map_err(|err| GeozeroError::Geometry(err.to_string()))?;
397            process_geometry(&geom, 0, processor)?;
398        }
399        Rect(_) => {
400            let geom = arr.as_rect().value(i).unwrap();
401            let wrapper = RectWrapper::try_new(&geom)
402                .map_err(|err| geozero::error::GeozeroError::Geometry(err.to_string()))?;
403            process_polygon(&wrapper, true, 0, processor)?
404        }
405        Geometry(_) => {
406            let geom = arr.as_geometry().value(i).unwrap();
407            process_geometry(&geom, 0, processor)?;
408        }
409    }
410
411    Ok(())
412}
413
414fn geometry_columns(schema: &Schema) -> Vec<usize> {
415    let mut geom_indices = vec![];
416    for (field_idx, field) in schema.fields().iter().enumerate() {
417        if GeoArrowType::from_extension_field(field.as_ref()).is_ok() {
418            geom_indices.push(field_idx);
419        }
420    }
421    geom_indices
422}