1use anyhow::{anyhow, Result};
2use enum_dispatch::enum_dispatch;
3use geo::geometry::Geometry;
4use geojson;
5use polars::prelude::*;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use serde_json::Value;
9use std::fmt::Write as FmtWrite;
10use std::io::Cursor;
11use std::io::Write;
12use wkb::geom_to_wkb;
13use wkt::TryFromWkt;
14
15fn convert_wkt_to_wkb_string(s: &Series) -> PolarsResult<Option<Series>> {
18 let ca = s.str()?;
19 let wkb_series = ca
20 .into_iter()
21 .map(|opt_wkt| {
22 opt_wkt
23 .map(|wkt_str| {
24 Geometry::try_from_wkt_str(wkt_str)
25 .map_err(|err| {
26 PolarsError::ComputeError(
27 format!("Failed to parse wkt: {err:?}").into(),
28 )
29 })
30 .and_then(|geom: Geometry<f64>| {
31 geom_to_wkb(&geom).map_err(|_| {
32 PolarsError::ComputeError("Failed to format geom: {err:?}".into())
33 })
34 })
35 })
36 .unwrap_or_else(|| Ok(Vec::new()))
37 })
38 .collect::<Result<Vec<Vec<u8>>, _>>()?;
39
40 let wkb_string_series: Vec<String> = wkb_series
41 .into_iter()
42 .map(|v| {
43 v.iter().fold(String::new(), |mut acc, s| {
44 let _ = write!(acc, "{s}");
45 acc
46 })
47 })
48 .collect();
49 Ok(Some(Series::new("geometry", wkb_string_series)))
50}
51
52fn any_value_to_json(value: &AnyValue) -> Result<Value> {
55 match value {
56 AnyValue::Null => Ok(Value::Null),
57 AnyValue::Boolean(b) => Ok(Value::Bool(*b)),
58 AnyValue::String(s) => Ok(Value::String((*s).to_string())),
59 AnyValue::Int8(n) => Ok(json!(*n)),
60 AnyValue::Int16(n) => Ok(json!(*n)),
61 AnyValue::Int32(n) => Ok(json!(*n)),
62 AnyValue::Int64(n) => Ok(json!(*n)),
63 AnyValue::UInt8(n) => Ok(json!(*n)),
64 AnyValue::UInt16(n) => Ok(json!(*n)),
65 AnyValue::UInt32(n) => Ok(json!(*n)),
66 AnyValue::UInt64(n) => Ok(json!(*n)),
67 AnyValue::Float32(n) => Ok(json!(*n)),
68 AnyValue::Float64(n) => Ok(json!(*n)),
69 AnyValue::Date(d) => Ok(json!(d.to_string())), AnyValue::Datetime(dt, _, _) => Ok(json!(dt.to_string())), AnyValue::Time(t) => Ok(json!(t.to_string())), AnyValue::List(series) => {
73 let json_values: Result<Vec<Value>> =
74 series.iter().map(|val| any_value_to_json(&val)).collect();
75 Ok(Value::Array(json_values?))
76 }
77 _ => Err(anyhow!("Failed to convert type")),
78 }
79}
80
81#[enum_dispatch]
86pub trait OutputGenerator {
87 fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()>;
88 fn format(&self, df: &mut DataFrame) -> Result<String> {
89 let mut data: Vec<u8> = vec![];
91 let mut buff = Cursor::new(&mut data);
92 self.save(&mut buff, df)?;
93
94 Ok(String::from_utf8(data)?)
95 }
96}
97
/// Serializable selection of an output format.
///
/// Each variant wraps the formatter struct that implements `OutputGenerator`
/// for that format; the enum is annotated with `enum_dispatch(OutputGenerator)`.
#[enum_dispatch(OutputGenerator)]
#[derive(Serialize, Deserialize, Debug)]
pub enum OutputFormatter {
    /// Output produced by `GeoJSONFormatter`.
    GeoJSON(GeoJSONFormatter),
    /// Output produced by `GeoJSONSeqFormatter`.
    GeoJSONSeq(GeoJSONSeqFormatter),
    /// Output produced by `CSVFormatter`.
    Csv(CSVFormatter),
}
107
/// Formatter whose `OutputGenerator::save` writes one GeoJSON feature per line,
/// one for every row that has a non-null `geometry` value.
#[derive(Serialize, Deserialize, Debug)]
pub struct GeoJSONSeqFormatter;
113
114impl OutputGenerator for GeoJSONSeqFormatter {
115 fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()> {
116 let geometry_col = df.column("geometry")?;
117 let other_cols = df.drop("geometry")?;
118 for (idx, geom) in geometry_col.str()?.into_iter().enumerate() {
119 if let Some(wkt_str) = geom {
120 let geom: Geometry<f64> = Geometry::try_from_wkt_str(wkt_str).map_err(|err| {
121 anyhow!("Invalid `Geometry<f64>` from well-known text string: {err}")
122 })?;
123 let mut properties = serde_json::Map::new();
124 for col in other_cols.get_columns() {
125 let val = any_value_to_json(&col.get(idx)?)?;
126 properties.insert(col.name().to_string(), val);
127 }
128 let feature = geojson::Feature {
129 bbox: None,
130 geometry: Some(geojson::Geometry::from(&geom)),
131 id: None,
132 properties: Some(properties),
133 foreign_members: None,
134 };
135 writeln!(writer, "{feature}")?;
136 }
137 }
138 Ok(())
139 }
140}
141
/// Geometry encoding that can be requested for CSV output through
/// `CSVFormatter::geo_format`.
#[derive(Serialize, Deserialize, Debug)]
pub enum GeoFormat {
    /// The `geometry` column is converted with `convert_wkt_to_wkb_string`
    /// before the CSV is written.
    Wkb,
    /// The data frame is written without touching the `geometry` column.
    Wkt,
}
151
/// Formatter that writes a data frame as CSV.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct CSVFormatter {
    /// Requested geometry encoding. Only `Some(GeoFormat::Wkb)` triggers a
    /// conversion of the `geometry` column; any other value writes the data
    /// frame unchanged.
    pub geo_format: Option<GeoFormat>,
}
158
159impl OutputGenerator for CSVFormatter {
160 fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()> {
161 if let Some(GeoFormat::Wkb) = self.geo_format {
162 let mut df = df
163 .clone()
164 .lazy()
165 .with_column(
166 col("geometry")
167 .map(
168 |s: Series| convert_wkt_to_wkb_string(&s),
169 GetOutput::from_type(DataType::String),
170 )
171 .alias("geometry"),
172 )
173 .collect()?;
174 CsvWriter::new(writer).finish(&mut df)?;
175 } else {
176 CsvWriter::new(writer).finish(df)?;
177 };
178 Ok(())
179 }
180}
181
/// Formatter that renders a data frame as a single GeoJSON `FeatureCollection`
/// holding one feature per row with a non-null `geometry` value.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct GeoJSONFormatter;
188
189impl OutputGenerator for GeoJSONFormatter {
190 fn format(&self, df: &mut DataFrame) -> Result<String> {
191 let geometry_col = df.column("geometry")?;
192 let other_cols = df.drop("geometry")?;
193 let mut features: Vec<geojson::Feature> = vec![];
194
195 for (idx, geom) in geometry_col.str()?.into_iter().enumerate() {
196 if let Some(wkt_str) = geom {
197 let geom: Geometry<f64> = Geometry::try_from_wkt_str(wkt_str)
198 .map_err(|_| anyhow!("Failed to parse geometry"))?;
199 let mut properties = serde_json::Map::new();
200
201 for col in other_cols.get_columns() {
202 let val = any_value_to_json(&col.get(idx)?)?;
203 properties.insert(col.name().to_string(), val);
204 }
205
206 let feature = geojson::Feature {
207 geometry: Some(geojson::Geometry::from(&geom)),
208 properties: Some(properties),
209 bbox: None,
210 id: None,
211 foreign_members: None,
212 };
213 features.push(feature);
214 }
215 }
216
217 let feature_collection = geojson::FeatureCollection {
218 bbox: None,
219 features,
220 foreign_members: None,
221 };
222 Ok(feature_collection.to_string())
223 }
224
225 fn save(&self, writer: &mut impl Write, df: &mut DataFrame) -> Result<()> {
226 let result = self.format(df)?;
227 writer.write_all(result.as_bytes())?;
228
229 Ok(())
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Three-row fixture with an integer, a float, a string and a WKT `geometry`
    /// column.
    fn sample_frame() -> DataFrame {
        df!(
            "int_val" => &[2, 3, 4],
            "float_val" => &[2.0, 3.0, 4.0],
            "str_val" => &["two", "three", "four"],
            "geometry" => &["POINT (0 0)", "POINT (20 20)", "POINT (30 44)"]
        )
        .unwrap()
    }

    #[test]
    fn geojson_formatter_should_work() {
        let mut frame = sample_frame();
        let rendered = GeoJSONFormatter.format(&mut frame);
        assert!(rendered.is_ok(), "Output should not error");

        let expected_json = r#"{"features":[{"geometry":{"coordinates":[0.0,0.0],"type":"Point"},"properties":{"float_val":2.0,"int_val":2,"str_val":"two"},"type":"Feature"},{"geometry":{"coordinates":[20.0,20.0],"type":"Point"},"properties":{"float_val":3.0,"int_val":3,"str_val":"three"},"type":"Feature"},{"geometry":{"coordinates":[30.0,44.0],"type":"Point"},"properties":{"float_val":4.0,"int_val":4,"str_val":"four"},"type":"Feature"}],"type":"FeatureCollection"}"#;

        // Compare parsed JSON values rather than the raw strings.
        let actual: Value = serde_json::from_str(rendered.as_ref().unwrap()).unwrap();
        let expected: Value = serde_json::from_str(expected_json).unwrap();
        assert_eq!(actual, expected, "Output should be correct");
    }

    #[test]
    fn geojsonseq_formatter_should_work() {
        let mut frame = sample_frame();
        let rendered = GeoJSONSeqFormatter.format(&mut frame);

        let expected_lines = [
            r#"{"geometry":{"coordinates":[0.0,0.0],"type":"Point"},"properties":{"float_val":2.0,"int_val":2,"str_val":"two"},"type":"Feature"}"#,
            r#"{"geometry":{"coordinates":[20.0,20.0],"type":"Point"},"properties":{"float_val":3.0,"int_val":3,"str_val":"three"},"type":"Feature"}"#,
            r#"{"geometry":{"coordinates":[30.0,44.0],"type":"Point"},"properties":{"float_val":4.0,"int_val":4,"str_val":"four"},"type":"Feature"}"#,
        ];

        assert!(rendered.is_ok(), "Output should not error");
        let rendered = rendered.unwrap();
        assert_eq!(expected_lines.len(), rendered.lines().count());

        // Each output line is compared as parsed JSON against its expected line.
        for (expected, actual) in expected_lines.iter().zip(rendered.lines()) {
            assert_eq!(
                serde_json::from_str::<Value>(expected).unwrap(),
                serde_json::from_str::<Value>(actual).unwrap()
            );
        }
    }

    #[test]
    fn csv_formatter_should_work() {
        let mut frame = sample_frame();
        let rendered = CSVFormatter { geo_format: None }.format(&mut frame);

        let expected = concat!(
            "int_val,float_val,str_val,geometry\n",
            "2,2.0,two,POINT (0 0)\n",
            "3,3.0,three,POINT (20 20)\n",
            "4,4.0,four,POINT (30 44)\n",
        );

        assert!(rendered.is_ok(), "Output should not error");
        assert_eq!(rendered.unwrap(), expected, "Output should be correct");
    }

    #[test]
    fn csv_formatter_with_wkb_should_work() {
        let wkb_formatter = CSVFormatter {
            geo_format: Some(GeoFormat::Wkb),
        };
        let mut frame = sample_frame();
        let rendered = wkb_formatter.format(&mut frame);

        // The geometry column holds the WKB bytes written as decimal numbers.
        let expected = concat!(
            "int_val,float_val,str_val,geometry\n",
            "2,2.0,two,110000000000000000000\n",
            "3,3.0,three,1100000000052640000005264\n",
            "4,4.0,four,1100000000062640000007064\n",
        );

        assert!(rendered.is_ok(), "Output should not error");
        assert_eq!(rendered.unwrap(), expected, "Output should be correct");
    }
}