Skip to main content

floe_core/io/read/
json.rs

1use std::collections::{BTreeMap, HashSet};
2use std::io::BufRead;
3use std::path::Path;
4
5use polars::prelude::{DataFrame, NamedFrom, Series};
6use serde_json::Value;
7
8use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
9use crate::{config, FloeResult};
10
11struct JsonInputAdapter;
12
13static JSON_INPUT_ADAPTER: JsonInputAdapter = JsonInputAdapter;
14
15pub(crate) fn json_input_adapter() -> &'static dyn InputAdapter {
16    &JSON_INPUT_ADAPTER
17}
18
/// Error raised by the JSON readers in this file.
///
/// `rule` holds a short identifier for the kind of failure (the readers here
/// use `json_parse_error` and `json_unsupported_value`), while `message`
/// holds the human-readable detail.
#[derive(Debug, Clone)]
pub struct JsonReadError {
    /// Short identifier of the failure category.
    pub rule: String,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl std::fmt::Display for JsonReadError {
    /// Formats the error as `<rule>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        write!(f, "{}: {}", rule, message)
    }
}

impl std::error::Error for JsonReadError {}
32
33fn read_ndjson_file(input_path: &Path) -> Result<DataFrame, JsonReadError> {
34    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
35        rule: "json_parse_error".to_string(),
36        message: format!("failed to read json at {}: {err}", input_path.display()),
37    })?;
38
39    let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::new();
40    let mut keys = HashSet::new();
41    for (idx, line) in content.lines().enumerate() {
42        let line = line.trim();
43        if line.is_empty() {
44            continue;
45        }
46        let value: Value = serde_json::from_str(line).map_err(|err| JsonReadError {
47            rule: "json_parse_error".to_string(),
48            message: format!("json parse error at line {}: {err}", idx + 1),
49        })?;
50        let object = value.as_object().ok_or_else(|| JsonReadError {
51            rule: "json_parse_error".to_string(),
52            message: format!("expected json object at line {}", idx + 1),
53        })?;
54
55        let mut row = BTreeMap::new();
56        for (key, value) in object {
57            if value.is_object() || value.is_array() {
58                return Err(JsonReadError {
59                    rule: "json_unsupported_value".to_string(),
60                    message: format!(
61                        "nested json values are not supported (line {}, key {})",
62                        idx + 1,
63                        key
64                    ),
65                });
66            }
67            let cell = match value {
68                Value::Null => None,
69                Value::String(value) => Some(value.clone()),
70                Value::Bool(value) => Some(value.to_string()),
71                Value::Number(value) => Some(value.to_string()),
72                _ => None,
73            };
74            row.insert(key.clone(), cell);
75            keys.insert(key.clone());
76        }
77        rows.push(row);
78    }
79
80    let mut columns = keys.into_iter().collect::<Vec<String>>();
81    columns.sort();
82
83    build_dataframe(&columns, &rows)
84}
85
86fn read_ndjson_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
87    let file = std::fs::File::open(input_path).map_err(|err| JsonReadError {
88        rule: "json_parse_error".to_string(),
89        message: format!("failed to read json at {}: {err}", input_path.display()),
90    })?;
91    let reader = std::io::BufReader::new(file);
92    let mut first_error: Option<JsonReadError> = None;
93
94    for (idx, line) in reader.lines().enumerate() {
95        let line = line.map_err(|err| JsonReadError {
96            rule: "json_parse_error".to_string(),
97            message: format!("failed to read json at {}: {err}", input_path.display()),
98        })?;
99        let line = line.trim();
100        if line.is_empty() {
101            continue;
102        }
103        match serde_json::from_str::<Value>(line) {
104            Ok(value) => {
105                let object = value.as_object().ok_or_else(|| JsonReadError {
106                    rule: "json_parse_error".to_string(),
107                    message: format!("expected json object at line {}", idx + 1),
108                })?;
109                let mut keys = Vec::with_capacity(object.len());
110                for (key, value) in object {
111                    if value.is_object() || value.is_array() {
112                        return Err(JsonReadError {
113                            rule: "json_unsupported_value".to_string(),
114                            message: format!(
115                                "nested json values are not supported (line {}, key {})",
116                                idx + 1,
117                                key
118                            ),
119                        });
120                    }
121                    keys.push(key.clone());
122                }
123                keys.sort();
124                return Ok(keys);
125            }
126            Err(err) => {
127                if first_error.is_none() {
128                    first_error = Some(JsonReadError {
129                        rule: "json_parse_error".to_string(),
130                        message: format!("json parse error at line {}: {err}", idx + 1),
131                    });
132                }
133                continue;
134            }
135        }
136    }
137
138    Err(first_error.unwrap_or_else(|| JsonReadError {
139        rule: "json_parse_error".to_string(),
140        message: format!("no json objects found in {}", input_path.display()),
141    }))
142}
143
144fn read_json_array_file(input_path: &Path) -> Result<DataFrame, JsonReadError> {
145    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
146        rule: "json_parse_error".to_string(),
147        message: format!("failed to read json at {}: {err}", input_path.display()),
148    })?;
149
150    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
151        rule: "json_parse_error".to_string(),
152        message: format!("json parse error: {err}"),
153    })?;
154    let array = value.as_array().ok_or_else(|| JsonReadError {
155        rule: "json_parse_error".to_string(),
156        message: "expected json array at root".to_string(),
157    })?;
158
159    let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::with_capacity(array.len());
160    let mut columns = Vec::new();
161
162    for (idx, value) in array.iter().enumerate() {
163        let object = value.as_object().ok_or_else(|| JsonReadError {
164            rule: "json_parse_error".to_string(),
165            message: format!("expected json object at index {}", idx),
166        })?;
167
168        let mut row = BTreeMap::new();
169        if columns.is_empty() {
170            for key in object.keys() {
171                columns.push(key.clone());
172            }
173        }
174
175        for (key, value) in object {
176            if value.is_object() || value.is_array() {
177                return Err(JsonReadError {
178                    rule: "json_unsupported_value".to_string(),
179                    message: format!(
180                        "nested json values are not supported (index {}, key {})",
181                        idx, key
182                    ),
183                });
184            }
185            let cell = match value {
186                Value::Null => None,
187                Value::String(value) => Some(value.clone()),
188                Value::Bool(value) => Some(value.to_string()),
189                Value::Number(value) => Some(value.to_string()),
190                _ => None,
191            };
192            row.insert(key.clone(), cell);
193        }
194        rows.push(row);
195    }
196
197    build_dataframe(&columns, &rows)
198}
199
200fn build_dataframe(
201    columns: &[String],
202    rows: &[BTreeMap<String, Option<String>>],
203) -> Result<DataFrame, JsonReadError> {
204    let mut series = Vec::with_capacity(columns.len());
205    for name in columns {
206        let mut values = Vec::with_capacity(rows.len());
207        for row in rows {
208            values.push(row.get(name).cloned().unwrap_or(None));
209        }
210        series.push(Series::new(name.as_str().into(), values).into());
211    }
212
213    DataFrame::new(series).map_err(|err| JsonReadError {
214        rule: "json_parse_error".to_string(),
215        message: format!("failed to build dataframe: {err}"),
216    })
217}
218
219fn read_json_array_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
220    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
221        rule: "json_parse_error".to_string(),
222        message: format!("failed to read json at {}: {err}", input_path.display()),
223    })?;
224    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
225        rule: "json_parse_error".to_string(),
226        message: format!("json parse error: {err}"),
227    })?;
228    let array = value.as_array().ok_or_else(|| JsonReadError {
229        rule: "json_parse_error".to_string(),
230        message: "expected json array at root".to_string(),
231    })?;
232    if array.is_empty() {
233        return Err(JsonReadError {
234            rule: "json_parse_error".to_string(),
235            message: "json array is empty".to_string(),
236        });
237    }
238    let object = array[0].as_object().ok_or_else(|| JsonReadError {
239        rule: "json_parse_error".to_string(),
240        message: "expected json object at index 0".to_string(),
241    })?;
242    let mut keys = Vec::with_capacity(object.len());
243    for (key, value) in object {
244        if value.is_object() || value.is_array() {
245            return Err(JsonReadError {
246                rule: "json_unsupported_value".to_string(),
247                message: format!(
248                    "nested json values are not supported (index 0, key {})",
249                    key
250                ),
251            });
252        }
253        keys.push(key.clone());
254    }
255    keys.sort();
256    Ok(keys)
257}
258
259impl InputAdapter for JsonInputAdapter {
260    fn format(&self) -> &'static str {
261        "json"
262    }
263
264    fn read_input_columns(
265        &self,
266        entity: &config::EntityConfig,
267        input_file: &InputFile,
268        _columns: &[config::ColumnConfig],
269    ) -> Result<Vec<String>, FileReadError> {
270        let json_mode = entity
271            .source
272            .options
273            .as_ref()
274            .and_then(|options| options.json_mode.as_deref())
275            .unwrap_or("array");
276        let path = &input_file.source_local_path;
277        let read_result = match json_mode {
278            "ndjson" => read_ndjson_columns(path),
279            "array" => read_json_array_columns(path),
280            other => Err(JsonReadError {
281                rule: "json_parse_error".to_string(),
282                message: format!(
283                    "unsupported source.options.json_mode={} (allowed: array, ndjson)",
284                    other
285                ),
286            }),
287        };
288        read_result.map_err(|err| FileReadError {
289            rule: err.rule,
290            message: err.message,
291        })
292    }
293
294    fn read_inputs(
295        &self,
296        entity: &config::EntityConfig,
297        files: &[InputFile],
298        columns: &[config::ColumnConfig],
299        normalize_strategy: Option<&str>,
300        collect_raw: bool,
301    ) -> FloeResult<Vec<ReadInput>> {
302        let mut inputs = Vec::with_capacity(files.len());
303        let json_mode = entity
304            .source
305            .options
306            .as_ref()
307            .and_then(|options| options.json_mode.as_deref())
308            .unwrap_or("array");
309        for input_file in files {
310            let path = &input_file.source_local_path;
311            let read_result = match json_mode {
312                "ndjson" => read_ndjson_file(path),
313                "array" => read_json_array_file(path),
314                other => Err(JsonReadError {
315                    rule: "json_parse_error".to_string(),
316                    message: format!(
317                        "unsupported source.options.json_mode={} (allowed: array, ndjson)",
318                        other
319                    ),
320                }),
321            };
322            match read_result {
323                Ok(df) => {
324                    let input = format::read_input_from_df(
325                        input_file,
326                        &df,
327                        columns,
328                        normalize_strategy,
329                        collect_raw,
330                    )?;
331                    inputs.push(input);
332                }
333                Err(err) => {
334                    inputs.push(ReadInput::FileError {
335                        input_file: input_file.clone(),
336                        error: FileReadError {
337                            rule: err.rule,
338                            message: err.message,
339                        },
340                    });
341                }
342            }
343        }
344        Ok(inputs)
345    }
346}