popgetter_core/
formatters.rs

1use anyhow::{anyhow, Result};
2use enum_dispatch::enum_dispatch;
3use geo::geometry::Geometry;
4use geojson;
5use polars::prelude::*;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use serde_json::Value;
9use std::fmt::Write as FmtWrite;
10use std::io::Cursor;
11use std::io::Write;
12use wkb::geom_to_wkb;
13use wkt::TryFromWkt;
14
15/// Utility function to convert a polars series from WKT geometries to
16/// WKB geometries (as a string)
17fn convert_wkt_to_wkb_string(s: &Series) -> PolarsResult<Option<Series>> {
18    let ca = s.str()?;
19    let wkb_series = ca
20        .into_iter()
21        .map(|opt_wkt| {
22            opt_wkt
23                .map(|wkt_str| {
24                    Geometry::try_from_wkt_str(wkt_str)
25                        .map_err(|err| {
26                            PolarsError::ComputeError(
27                                format!("Failed to parse wkt: {err:?}").into(),
28                            )
29                        })
30                        .and_then(|geom: Geometry<f64>| {
31                            geom_to_wkb(&geom).map_err(|_| {
32                                PolarsError::ComputeError("Failed to format geom: {err:?}".into())
33                            })
34                        })
35                })
36                .unwrap_or_else(|| Ok(Vec::new()))
37        })
38        .collect::<Result<Vec<Vec<u8>>, _>>()?;
39
40    let wkb_string_series: Vec<String> = wkb_series
41        .into_iter()
42        .map(|v| {
43            v.iter().fold(String::new(), |mut acc, s| {
44                let _ = write!(acc, "{s}");
45                acc
46            })
47        })
48        .collect();
49    Ok(Some(Series::new("geometry", wkb_string_series)))
50}
51
52/// Utility function to convert from polars `AnyValue` to `serde_json::Value`
53/// Doesn't cover all types but most of them.
54fn any_value_to_json(value: &AnyValue) -> Result<Value> {
55    match value {
56        AnyValue::Null => Ok(Value::Null),
57        AnyValue::Boolean(b) => Ok(Value::Bool(*b)),
58        AnyValue::String(s) => Ok(Value::String((*s).to_string())),
59        AnyValue::Int8(n) => Ok(json!(*n)),
60        AnyValue::Int16(n) => Ok(json!(*n)),
61        AnyValue::Int32(n) => Ok(json!(*n)),
62        AnyValue::Int64(n) => Ok(json!(*n)),
63        AnyValue::UInt8(n) => Ok(json!(*n)),
64        AnyValue::UInt16(n) => Ok(json!(*n)),
65        AnyValue::UInt32(n) => Ok(json!(*n)),
66        AnyValue::UInt64(n) => Ok(json!(*n)),
67        AnyValue::Float32(n) => Ok(json!(*n)),
68        AnyValue::Float64(n) => Ok(json!(*n)),
69        AnyValue::Date(d) => Ok(json!(d.to_string())), // You might want to format this
70        AnyValue::Datetime(dt, _, _) => Ok(json!(dt.to_string())), // You might want to format this
71        AnyValue::Time(t) => Ok(json!(t.to_string())), // You might want to format this
72        AnyValue::List(series) => {
73            let json_values: Result<Vec<Value>> =
74                series.iter().map(|val| any_value_to_json(&val)).collect();
75            Ok(Value::Array(json_values?))
76        }
77        _ => Err(anyhow!("Failed to convert type")),
78    }
79}
80
81/// Trait to define different output generators. Defines two
82/// functions, format which generates a serialized string of the
83/// `DataFrame` and save which generates a file with the generated
84/// file
85#[enum_dispatch]
86pub trait OutputGenerator {
87    fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()>;
88    fn format(&self, df: &mut DataFrame) -> Result<String> {
89        // Just creating an empty vec to store the buffered output
90        let mut data: Vec<u8> = vec![];
91        let mut buff = Cursor::new(&mut data);
92        self.save(&mut buff, df)?;
93
94        Ok(String::from_utf8(data)?)
95    }
96}
97
98/// Enum of OutputFormatters one for each potential
99/// output type
100#[enum_dispatch(OutputGenerator)]
101#[derive(Serialize, Deserialize, Debug)]
102pub enum OutputFormatter {
103    GeoJSON(GeoJSONFormatter),
104    GeoJSONSeq(GeoJSONSeqFormatter),
105    Csv(CSVFormatter),
106}
107
108/// Format the results as geojson sequence format
109/// This is one line per feature serialized as a
110/// geojson feature
111#[derive(Serialize, Deserialize, Debug)]
112pub struct GeoJSONSeqFormatter;
113
114impl OutputGenerator for GeoJSONSeqFormatter {
115    fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()> {
116        let geometry_col = df.column("geometry")?;
117        let other_cols = df.drop("geometry")?;
118        for (idx, geom) in geometry_col.str()?.into_iter().enumerate() {
119            if let Some(wkt_str) = geom {
120                let geom: Geometry<f64> = Geometry::try_from_wkt_str(wkt_str).map_err(|err| {
121                    anyhow!("Invalid `Geometry<f64>` from well-known text string: {err}")
122                })?;
123                let mut properties = serde_json::Map::new();
124                for col in other_cols.get_columns() {
125                    let val = any_value_to_json(&col.get(idx)?)?;
126                    properties.insert(col.name().to_string(), val);
127                }
128                let feature = geojson::Feature {
129                    bbox: None,
130                    geometry: Some(geojson::Geometry::from(&geom)),
131                    id: None,
132                    properties: Some(properties),
133                    foreign_members: None,
134                };
135                writeln!(writer, "{feature}")?;
136            }
137        }
138        Ok(())
139    }
140}
141
142/// Define what format geometries are represented in
143///
144/// Wkb: Well-known binary
145/// Wkt: Well-known text
146#[derive(Serialize, Deserialize, Debug)]
147pub enum GeoFormat {
148    Wkb,
149    Wkt,
150}
151
152/// Format the results as a CSV file with the
153/// geometry encoded in the specified format
154#[derive(Serialize, Deserialize, Debug, Default)]
155pub struct CSVFormatter {
156    pub geo_format: Option<GeoFormat>,
157}
158
159impl OutputGenerator for CSVFormatter {
160    fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()> {
161        if let Some(GeoFormat::Wkb) = self.geo_format {
162            let mut df = df
163                .clone()
164                .lazy()
165                .with_column(
166                    col("geometry")
167                        .map(
168                            |s: Series| convert_wkt_to_wkb_string(&s),
169                            GetOutput::from_type(DataType::String),
170                        )
171                        .alias("geometry"),
172                )
173                .collect()?;
174            CsvWriter::new(writer).finish(&mut df)?;
175        } else {
176            CsvWriter::new(writer).finish(df)?;
177        };
178        Ok(())
179    }
180}
181
182/// Format the results as a geojson file
183/// TODO there is probably a better way to do this using
184/// geozero to process the dataframe to a file without
185/// having to construct the entire thing in memory first
186#[derive(Serialize, Deserialize, Debug, Default)]
187pub struct GeoJSONFormatter;
188
189impl OutputGenerator for GeoJSONFormatter {
190    fn format(&self, df: &mut DataFrame) -> Result<String> {
191        let geometry_col = df.column("geometry")?;
192        let other_cols = df.drop("geometry")?;
193        let mut features: Vec<geojson::Feature> = vec![];
194
195        for (idx, geom) in geometry_col.str()?.into_iter().enumerate() {
196            if let Some(wkt_str) = geom {
197                let geom: Geometry<f64> = Geometry::try_from_wkt_str(wkt_str)
198                    .map_err(|_| anyhow!("Failed to parse geometry"))?;
199                let mut properties = serde_json::Map::new();
200
201                for col in other_cols.get_columns() {
202                    let val = any_value_to_json(&col.get(idx)?)?;
203                    properties.insert(col.name().to_string(), val);
204                }
205
206                let feature = geojson::Feature {
207                    geometry: Some(geojson::Geometry::from(&geom)),
208                    properties: Some(properties),
209                    bbox: None,
210                    id: None,
211                    foreign_members: None,
212                };
213                features.push(feature);
214            }
215        }
216
217        let feature_collection = geojson::FeatureCollection {
218            bbox: None,
219            features,
220            foreign_members: None,
221        };
222        Ok(feature_collection.to_string())
223    }
224
225    fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()> {
226        let result = self.format(df)?;
227        writer.write_all(result.as_bytes())?;
228
229        Ok(())
230    }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-row fixture with an integer, a float, a string and a WKT point
    /// column.
    fn test_df() -> DataFrame {
        let frame = df!(
             "int_val" => &[2, 3, 4],
             "float_val" => &[2.0, 3.0, 4.0],
             "str_val" => &["two", "three", "four"],
             "geometry" => &["POINT (0 0)", "POINT (20 20)", "POINT (30 44)"]
        );
        frame.unwrap()
    }

    #[test]
    fn geojson_formatter_should_work() {
        let expected_json = r#"{"features":[{"geometry":{"coordinates":[0.0,0.0],"type":"Point"},"properties":{"float_val":2.0,"int_val":2,"str_val":"two"},"type":"Feature"},{"geometry":{"coordinates":[20.0,20.0],"type":"Point"},"properties":{"float_val":3.0,"int_val":3,"str_val":"three"},"type":"Feature"},{"geometry":{"coordinates":[30.0,44.0],"type":"Point"},"properties":{"float_val":4.0,"int_val":4,"str_val":"four"},"type":"Feature"}],"type":"FeatureCollection"}"#;

        let formatter = GeoJSONFormatter;
        let mut frame = test_df();
        let result = formatter.format(&mut frame);
        assert!(result.is_ok(), "Output should not error");

        // Compare parsed values rather than text: the order of fields within
        // the serialized JSON is not guaranteed.
        let rendered = result.unwrap();
        let actual_value: Value = serde_json::from_str(&rendered).unwrap();
        let correct_value: Value = serde_json::from_str(expected_json).unwrap();
        assert_eq!(actual_value, correct_value, "Output should be correct");
    }

    #[test]
    fn geojsonseq_formatter_should_work() {
        let expected_lines = [
            r#"{"geometry":{"coordinates":[0.0,0.0],"type":"Point"},"properties":{"float_val":2.0,"int_val":2,"str_val":"two"},"type":"Feature"}"#,
            r#"{"geometry":{"coordinates":[20.0,20.0],"type":"Point"},"properties":{"float_val":3.0,"int_val":3,"str_val":"three"},"type":"Feature"}"#,
            r#"{"geometry":{"coordinates":[30.0,44.0],"type":"Point"},"properties":{"float_val":4.0,"int_val":4,"str_val":"four"},"type":"Feature"}"#,
        ];

        let formatter = GeoJSONSeqFormatter;
        let mut frame = test_df();
        let result = formatter.format(&mut frame);
        assert!(result.is_ok(), "Output should not error");

        let rendered = result.unwrap();
        assert_eq!(expected_lines.len(), rendered.lines().count());

        // Parse each line before comparing: the order of fields within the
        // serialized JSON is not guaranteed.
        for (correct, actual) in expected_lines.iter().zip(rendered.lines()) {
            assert_eq!(
                serde_json::from_str::<Value>(correct).unwrap(),
                serde_json::from_str::<Value>(actual).unwrap()
            );
        }
    }

    #[test]
    fn csv_formatter_should_work() {
        let expected = concat!(
            "int_val,float_val,str_val,geometry\n",
            "2,2.0,two,POINT (0 0)\n",
            "3,3.0,three,POINT (20 20)\n",
            "4,4.0,four,POINT (30 44)\n",
        );

        let formatter = CSVFormatter { geo_format: None };
        let mut frame = test_df();
        let result = formatter.format(&mut frame);

        assert!(result.is_ok(), "Output should not error");
        assert_eq!(result.unwrap(), expected, "Output should be correct");
    }

    #[test]
    fn csv_formatter_with_wkb_should_work() {
        let expected = concat!(
            "int_val,float_val,str_val,geometry\n",
            "2,2.0,two,110000000000000000000\n",
            "3,3.0,three,1100000000052640000005264\n",
            "4,4.0,four,1100000000062640000007064\n",
        );

        let formatter = CSVFormatter {
            geo_format: Some(GeoFormat::Wkb),
        };
        let mut frame = test_df();
        let result = formatter.format(&mut frame);

        assert!(result.is_ok(), "Output should not error");
        assert_eq!(result.unwrap(), expected, "Output should be correct");
    }
}