geoarrow/algorithm/native/
explode.rs

1use std::sync::Arc;
2
3use arrow::compute::take;
4use arrow_array::{Int32Array, OffsetSizeTrait, RecordBatch};
5use arrow_buffer::OffsetBuffer;
6use arrow_schema::SchemaBuilder;
7
8use crate::array::*;
9use crate::chunked_array::{
10    from_geoarrow_chunks, ChunkedArray, ChunkedGeometryArray, ChunkedGeometryArrayTrait,
11};
12use crate::datatypes::{Dimension, GeoDataType};
13use crate::error::{GeoArrowError, Result};
14use crate::table::Table;
15use crate::GeometryArrayTrait;
16
/// Flattens multi-part geometries into their single-part components.
///
/// Implementations for arrays that are already single-part return the input unchanged and no
/// take indices; multi-part implementations additionally report which input row each output
/// geometry came from.
pub trait Explode {
    /// What [`Explode::explode`] produces for this input type.
    type Output;

    /// Explodes `self`.
    ///
    /// The output carries the exploded geometries and, when rows actually had to be split, the
    /// indices to feed into a [`take`][arrow::compute::take] over the other columns.
    fn explode(&self) -> Self::Output;
}
24
25impl Explode for PointArray<2> {
26    type Output = (Self, Option<Int32Array>);
27
28    fn explode(&self) -> Self::Output {
29        (self.clone(), None)
30    }
31}
32
33impl<O: OffsetSizeTrait> Explode for LineStringArray<O, 2> {
34    type Output = (Self, Option<Int32Array>);
35
36    fn explode(&self) -> Self::Output {
37        (self.clone(), None)
38    }
39}
40
41impl<O: OffsetSizeTrait> Explode for PolygonArray<O, 2> {
42    type Output = (Self, Option<Int32Array>);
43
44    fn explode(&self) -> Self::Output {
45        (self.clone(), None)
46    }
47}
48
49/// Convert from offsets into a buffer to indices that need to be taken
50///
51/// e.g. if `offsets` is `[0, 2, 5, 10]`, then there are 2, 3, and 5 elements. The indices needed
52/// for a take to explode this array are
53/// ```notest
54/// [0, 0, 1, 1, 1, 2, 2, 2, 2, 2]
55/// ```
56/// Also note that the length of the `indices` created is the same as the last value of the
57/// offsets.
58fn explode_offsets<O: OffsetSizeTrait>(offsets: &OffsetBuffer<O>) -> Int32Array {
59    let mut take_indices: Vec<i32> =
60        Vec::with_capacity(offsets.last().unwrap().to_usize().unwrap());
61    for (offset_idx, offset_start_end) in offsets.as_ref().windows(2).enumerate() {
62        let offset_start = offset_start_end[0].to_usize().unwrap();
63        let offset_end = offset_start_end[1].to_usize().unwrap();
64        for _ in offset_start..offset_end {
65            take_indices.push(offset_idx.try_into().unwrap());
66        }
67    }
68    Int32Array::new(take_indices.into(), None)
69}
70
71impl<O: OffsetSizeTrait> Explode for MultiPointArray<O, 2> {
72    type Output = (PointArray<2>, Option<Int32Array>);
73
74    fn explode(&self) -> Self::Output {
75        assert_eq!(
76            self.null_count(),
77            0,
78            "Null values not yet supported in explode"
79        );
80
81        let exploded_geoms = PointArray::new(self.coords.clone(), None, self.metadata());
82        let take_indices = explode_offsets(self.geom_offsets());
83        (exploded_geoms, Some(take_indices))
84    }
85}
86
87impl<O: OffsetSizeTrait> Explode for MultiLineStringArray<O, 2> {
88    type Output = (LineStringArray<O, 2>, Option<Int32Array>);
89
90    fn explode(&self) -> Self::Output {
91        assert_eq!(
92            self.null_count(),
93            0,
94            "Null values not yet supported in explode"
95        );
96
97        let exploded_geoms = LineStringArray::new(
98            self.coords.clone(),
99            self.ring_offsets.clone(),
100            None,
101            self.metadata(),
102        );
103        let take_indices = explode_offsets(self.geom_offsets());
104        (exploded_geoms, Some(take_indices))
105    }
106}
107
108impl<O: OffsetSizeTrait> Explode for MultiPolygonArray<O, 2> {
109    type Output = (PolygonArray<O, 2>, Option<Int32Array>);
110
111    fn explode(&self) -> Self::Output {
112        assert_eq!(
113            self.null_count(),
114            0,
115            "Null values not yet supported in explode"
116        );
117
118        let exploded_geoms = PolygonArray::new(
119            self.coords.clone(),
120            self.polygon_offsets.clone(),
121            self.ring_offsets.clone(),
122            None,
123            self.metadata(),
124        );
125        let take_indices = explode_offsets(self.geom_offsets());
126        (exploded_geoms, Some(take_indices))
127    }
128}
129
130impl Explode for &dyn GeometryArrayTrait {
131    type Output = Result<(Arc<dyn GeometryArrayTrait>, Option<Int32Array>)>;
132
133    fn explode(&self) -> Self::Output {
134        macro_rules! call_explode {
135            ($as_func:ident) => {{
136                let (exploded_geoms, take_indices) = self.$as_func().explode();
137                (Arc::new(exploded_geoms), take_indices)
138            }};
139        }
140
141        use Dimension::*;
142        use GeoDataType::*;
143
144        let result: (Arc<dyn GeometryArrayTrait>, Option<Int32Array>) = match self.data_type() {
145            Point(_, XY) => call_explode!(as_point),
146            LineString(_, XY) => call_explode!(as_line_string),
147            LargeLineString(_, XY) => call_explode!(as_large_line_string),
148            Polygon(_, XY) => call_explode!(as_polygon),
149            LargePolygon(_, XY) => call_explode!(as_large_polygon),
150            MultiPoint(_, XY) => call_explode!(as_multi_point),
151            LargeMultiPoint(_, XY) => call_explode!(as_large_multi_point),
152            MultiLineString(_, XY) => call_explode!(as_multi_line_string),
153            LargeMultiLineString(_, XY) => call_explode!(as_large_multi_line_string),
154            MultiPolygon(_, XY) => call_explode!(as_multi_polygon),
155            LargeMultiPolygon(_, XY) => call_explode!(as_large_multi_polygon),
156            // Mixed(_, XY) => self.as_mixed::<2>().explode(),
157            // LargeMixed(_, XY) => self.as_large_mixed::<2>().explode(),
158            // GeometryCollection(_, XY) => self.as_geometry_collection::<2>().explode(),
159            // LargeGeometryCollection(_, XY) => self.as_large_geometry_collection::<2>().explode(),
160            _ => return Err(GeoArrowError::IncorrectType("".into())),
161        };
162        Ok(result)
163    }
164}
165
166impl<G: GeometryArrayTrait> Explode for ChunkedGeometryArray<G> {
167    type Output = Result<(
168        Arc<dyn ChunkedGeometryArrayTrait>,
169        Option<ChunkedArray<Int32Array>>,
170    )>;
171
172    fn explode(&self) -> Self::Output {
173        let result = self.try_map(|chunk| chunk.as_ref().explode())?;
174
175        // Convert Vec of tuples to tuple of vecs
176        let (geometry_arrays, take_indices): (Vec<_>, Vec<_>) = result.into_iter().unzip();
177        let geometry_array_refs = geometry_arrays
178            .iter()
179            .map(|x| x.as_ref())
180            .collect::<Vec<_>>();
181
182        // Convert Vec<Option<_>> to Option<Vec<_>>
183        let take_indices: Option<Vec<_>> = take_indices.into_iter().collect();
184        Ok((
185            from_geoarrow_chunks(geometry_array_refs.as_slice())?,
186            take_indices.map(ChunkedArray::new),
187        ))
188    }
189}
190
191impl Explode for &dyn ChunkedGeometryArrayTrait {
192    type Output = Result<(
193        Arc<dyn ChunkedGeometryArrayTrait>,
194        Option<ChunkedArray<Int32Array>>,
195    )>;
196
197    fn explode(&self) -> Self::Output {
198        use Dimension::*;
199        use GeoDataType::*;
200
201        match self.data_type() {
202            Point(_, XY) => self.as_point::<2>().explode(),
203            LineString(_, XY) => self.as_line_string::<2>().explode(),
204            LargeLineString(_, XY) => self.as_large_line_string::<2>().explode(),
205            Polygon(_, XY) => self.as_polygon::<2>().explode(),
206            LargePolygon(_, XY) => self.as_large_polygon::<2>().explode(),
207            MultiPoint(_, XY) => self.as_multi_point::<2>().explode(),
208            LargeMultiPoint(_, XY) => self.as_large_multi_point::<2>().explode(),
209            MultiLineString(_, XY) => self.as_multi_line_string::<2>().explode(),
210            LargeMultiLineString(_, XY) => self.as_large_multi_line_string::<2>().explode(),
211            MultiPolygon(_, XY) => self.as_multi_polygon::<2>().explode(),
212            LargeMultiPolygon(_, XY) => self.as_large_multi_polygon::<2>().explode(),
213            Mixed(_, XY) => self.as_mixed::<2>().explode(),
214            LargeMixed(_, XY) => self.as_large_mixed::<2>().explode(),
215            GeometryCollection(_, XY) => self.as_geometry_collection::<2>().explode(),
216            LargeGeometryCollection(_, XY) => self.as_large_geometry_collection::<2>().explode(),
217            Rect(XY) => self.as_rect::<2>().explode(),
218            _ => todo!(),
219        }
220    }
221}
222
223pub trait ExplodeTable {
224    /// Returns the exploded geometries and, if an explode needs to happen, the indices that should
225    /// be passed into a [`take`][arrow::compute::take] operation.
226    fn explode(&self, index: Option<usize>) -> Result<Table>;
227}
228
229impl ExplodeTable for Table {
230    fn explode(&self, index: Option<usize>) -> Result<Table> {
231        let index = if let Some(index) = index {
232            index
233        } else {
234            self.default_geometry_column_idx()?
235        };
236
237        let geometry_column = self.geometry_column(Some(index))?;
238        let (exploded_geometry, take_indices) = geometry_column.as_ref().explode()?;
239
240        // TODO: optionally use rayon?
241        if let Some(take_indices) = take_indices {
242            // Remove existing geometry column
243            let mut new_table = self.clone();
244            new_table.remove_column(index);
245
246            let field = exploded_geometry.extension_field();
247
248            // Call take on each chunk and append geometry chunk
249            let new_batches = new_table
250                .batches()
251                .iter()
252                .zip(take_indices.chunks())
253                .zip(exploded_geometry.geometry_chunks())
254                .map(|((batch, indices), geom_chunk)| {
255                    let mut schema_builder = SchemaBuilder::from(batch.schema().as_ref().clone());
256
257                    let mut new_columns = batch
258                        .columns()
259                        .iter()
260                        .map(|values| Ok(take(values, indices, None)?))
261                        .collect::<Result<Vec<_>>>()?;
262
263                    // Add geometry column
264                    new_columns.push(geom_chunk.to_array_ref());
265                    schema_builder.push(field.clone());
266
267                    Ok(RecordBatch::try_new(
268                        schema_builder.finish().into(),
269                        new_columns,
270                    )?)
271                })
272                .collect::<Result<Vec<_>>>()?;
273
274            // Update top-level schema
275            let mut schema_builder = SchemaBuilder::from(new_table.schema().as_ref().clone());
276            schema_builder.push(field.clone());
277            let schema = schema_builder.finish();
278
279            Table::try_new(new_batches, schema.into())
280        } else {
281            // No take is necessary; nothing happens
282            Ok(self.clone())
283        }
284    }
285}
286
#[cfg(test)]
mod test {
    use super::*;
    use crate::test::multipoint;
    use crate::trait_::GeometryArrayAccessor;

    #[test]
    fn explode_multi_point() {
        let arr = multipoint::mp_array();
        let (exploded_geoms, take_indices) = arr.explode();

        // Each member point of mp0 and mp1, in order.
        let mp0 = multipoint::mp0();
        let mp1 = multipoint::mp1();
        let expected_points = [mp0.0[0], mp0.0[1], mp1.0[0], mp1.0[1]];
        for (i, expected) in expected_points.iter().enumerate() {
            assert_eq!(exploded_geoms.value_as_geo(i), *expected);
        }

        // Each point maps back to the row of the multipoint it came from.
        let take_indices = take_indices.unwrap();
        let expected_rows = [0, 0, 1, 1];
        for (i, expected) in expected_rows.iter().enumerate() {
            assert_eq!(take_indices.value(i), *expected);
        }
    }
}