// floe_core/io/read/json.rs

1use std::collections::{BTreeMap, HashSet};
2use std::io::BufRead;
3use std::path::Path;
4
5use polars::prelude::{DataFrame, NamedFrom, Series};
6use serde_json::Value;
7
8use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
9use crate::io::read::json_selector::{
10    compact_json, evaluate_selector, parse_selector, SelectorToken, SelectorValue,
11};
12use crate::{config, FloeResult};
13
14struct JsonInputAdapter;
15
16static JSON_INPUT_ADAPTER: JsonInputAdapter = JsonInputAdapter;
17
18pub(crate) fn json_input_adapter() -> &'static dyn InputAdapter {
19    &JSON_INPUT_ADAPTER
20}
21
/// Error raised while reading a JSON input or applying selectors to it.
///
/// Displayed as `"<rule>: <message>"`.
#[derive(Debug, Clone)]
pub struct JsonReadError {
    /// Identifier of the failed check (for example `json_parse_error`).
    pub rule: String,
    /// Human-readable description of what went wrong.
    pub message: String,
}

impl std::fmt::Display for JsonReadError {
    /// Writes the rule and the message separated by `": "`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        write!(f, "{rule}: {message}")
    }
}

impl std::error::Error for JsonReadError {}
35
36struct SelectorPlan {
37    source: String,
38    tokens: Vec<SelectorToken>,
39}
40
41fn build_selector_plan(
42    columns: &[config::ColumnConfig],
43) -> Result<Vec<SelectorPlan>, JsonReadError> {
44    let mut plans = Vec::with_capacity(columns.len());
45    let mut seen = HashSet::new();
46    for column in columns {
47        let source = column.source_or_name().to_string();
48        if !seen.insert(source.clone()) {
49            return Err(JsonReadError {
50                rule: "json_selector_invalid".to_string(),
51                message: format!("duplicate json selector source: {}", source),
52            });
53        }
54        let tokens = parse_selector(&source).map_err(|err| JsonReadError {
55            rule: "json_selector_invalid".to_string(),
56            message: format!("invalid selector {}: {}", source, err.message),
57        })?;
58        plans.push(SelectorPlan { source, tokens });
59    }
60    Ok(plans)
61}
62
63fn extract_row(
64    value: &Value,
65    plans: &[SelectorPlan],
66    cast_mode: &str,
67    location: &str,
68) -> Result<BTreeMap<String, Option<String>>, JsonReadError> {
69    let mut row = BTreeMap::new();
70    for plan in plans {
71        let selected = evaluate_selector(value, &plan.tokens).map_err(|err| JsonReadError {
72            rule: "json_selector_invalid".to_string(),
73            message: format!("invalid selector {}: {}", plan.source, err.message),
74        })?;
75        let cell = match selected {
76            SelectorValue::Null => None,
77            SelectorValue::Scalar(value) => Some(value),
78            SelectorValue::NonScalar(value) => {
79                if cast_mode == "coerce" {
80                    Some(compact_json(&value).map_err(|err| JsonReadError {
81                        rule: "json_selector_non_scalar".to_string(),
82                        message: format!(
83                            "failed to stringify selector {} at {}: {}",
84                            plan.source, location, err.message
85                        ),
86                    })?)
87                } else {
88                    return Err(JsonReadError {
89                        rule: "json_selector_non_scalar".to_string(),
90                        message: format!(
91                            "non-scalar value for selector {} at {}",
92                            plan.source, location
93                        ),
94                    });
95                }
96            }
97        };
98        row.insert(plan.source.clone(), cell);
99    }
100    Ok(row)
101}
102
103fn read_ndjson_file(
104    input_path: &Path,
105    columns: &[config::ColumnConfig],
106    cast_mode: &str,
107) -> Result<DataFrame, JsonReadError> {
108    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
109        rule: "json_parse_error".to_string(),
110        message: format!("failed to read json at {}: {err}", input_path.display()),
111    })?;
112
113    let plans = build_selector_plan(columns)?;
114    let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::new();
115    for (idx, line) in content.lines().enumerate() {
116        let line = line.trim();
117        if line.is_empty() {
118            continue;
119        }
120        let value: Value = serde_json::from_str(line).map_err(|err| JsonReadError {
121            rule: "json_parse_error".to_string(),
122            message: format!("json parse error at line {}: {err}", idx + 1),
123        })?;
124        if !value.is_object() {
125            return Err(JsonReadError {
126                rule: "json_parse_error".to_string(),
127                message: format!("expected json object at line {}", idx + 1),
128            });
129        }
130
131        let row = extract_row(&value, &plans, cast_mode, &format!("line {}", idx + 1))?;
132        rows.push(row);
133    }
134
135    let columns = plans
136        .iter()
137        .map(|plan| plan.source.clone())
138        .collect::<Vec<_>>();
139
140    build_dataframe(&columns, &rows)
141}
142
143fn read_ndjson_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
144    let file = std::fs::File::open(input_path).map_err(|err| JsonReadError {
145        rule: "json_parse_error".to_string(),
146        message: format!("failed to read json at {}: {err}", input_path.display()),
147    })?;
148    let reader = std::io::BufReader::new(file);
149    let mut first_error: Option<JsonReadError> = None;
150
151    for (idx, line) in reader.lines().enumerate() {
152        let line = line.map_err(|err| JsonReadError {
153            rule: "json_parse_error".to_string(),
154            message: format!("failed to read json at {}: {err}", input_path.display()),
155        })?;
156        let line = line.trim();
157        if line.is_empty() {
158            continue;
159        }
160        match serde_json::from_str::<Value>(line) {
161            Ok(value) => {
162                let object = value.as_object().ok_or_else(|| JsonReadError {
163                    rule: "json_parse_error".to_string(),
164                    message: format!("expected json object at line {}", idx + 1),
165                })?;
166                let mut keys = object.keys().cloned().collect::<Vec<_>>();
167                keys.sort();
168                return Ok(keys);
169            }
170            Err(err) => {
171                if first_error.is_none() {
172                    first_error = Some(JsonReadError {
173                        rule: "json_parse_error".to_string(),
174                        message: format!("json parse error at line {}: {err}", idx + 1),
175                    });
176                }
177                continue;
178            }
179        }
180    }
181
182    Err(first_error.unwrap_or_else(|| JsonReadError {
183        rule: "json_parse_error".to_string(),
184        message: format!("no json objects found in {}", input_path.display()),
185    }))
186}
187
188fn read_json_array_file(
189    input_path: &Path,
190    columns: &[config::ColumnConfig],
191    cast_mode: &str,
192) -> Result<DataFrame, JsonReadError> {
193    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
194        rule: "json_parse_error".to_string(),
195        message: format!("failed to read json at {}: {err}", input_path.display()),
196    })?;
197
198    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
199        rule: "json_parse_error".to_string(),
200        message: format!("json parse error: {err}"),
201    })?;
202    let array = value.as_array().ok_or_else(|| JsonReadError {
203        rule: "json_parse_error".to_string(),
204        message: "expected json array at root".to_string(),
205    })?;
206
207    let plans = build_selector_plan(columns)?;
208    let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::with_capacity(array.len());
209
210    for (idx, value) in array.iter().enumerate() {
211        if !value.is_object() {
212            return Err(JsonReadError {
213                rule: "json_parse_error".to_string(),
214                message: format!("expected json object at index {}", idx),
215            });
216        }
217        let row = extract_row(value, &plans, cast_mode, &format!("index {}", idx))?;
218        rows.push(row);
219    }
220
221    let columns = plans
222        .iter()
223        .map(|plan| plan.source.clone())
224        .collect::<Vec<_>>();
225
226    build_dataframe(&columns, &rows)
227}
228
229fn read_json_array_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
230    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
231        rule: "json_parse_error".to_string(),
232        message: format!("failed to read json at {}: {err}", input_path.display()),
233    })?;
234    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
235        rule: "json_parse_error".to_string(),
236        message: format!("json parse error: {err}"),
237    })?;
238    let array = value.as_array().ok_or_else(|| JsonReadError {
239        rule: "json_parse_error".to_string(),
240        message: "expected json array at root".to_string(),
241    })?;
242    if array.is_empty() {
243        return Err(JsonReadError {
244            rule: "json_parse_error".to_string(),
245            message: "json array is empty".to_string(),
246        });
247    }
248    let object = array[0].as_object().ok_or_else(|| JsonReadError {
249        rule: "json_parse_error".to_string(),
250        message: "expected json object at index 0".to_string(),
251    })?;
252    let mut keys = object.keys().cloned().collect::<Vec<_>>();
253    keys.sort();
254    Ok(keys)
255}
256
257fn build_dataframe(
258    columns: &[String],
259    rows: &[BTreeMap<String, Option<String>>],
260) -> Result<DataFrame, JsonReadError> {
261    let mut series = Vec::with_capacity(columns.len());
262    for name in columns {
263        let mut values = Vec::with_capacity(rows.len());
264        for row in rows {
265            values.push(row.get(name).cloned().unwrap_or(None));
266        }
267        series.push(Series::new(name.as_str().into(), values).into());
268    }
269
270    DataFrame::new(series).map_err(|err| JsonReadError {
271        rule: "json_parse_error".to_string(),
272        message: format!("failed to build dataframe: {err}"),
273    })
274}
275
276impl InputAdapter for JsonInputAdapter {
277    fn format(&self) -> &'static str {
278        "json"
279    }
280
281    fn read_input_columns(
282        &self,
283        entity: &config::EntityConfig,
284        input_file: &InputFile,
285        _columns: &[config::ColumnConfig],
286    ) -> Result<Vec<String>, FileReadError> {
287        let json_mode = entity
288            .source
289            .options
290            .as_ref()
291            .and_then(|options| options.json_mode.as_deref())
292            .unwrap_or("array");
293        let path = &input_file.source_local_path;
294        let read_result = match json_mode {
295            "ndjson" => read_ndjson_columns(path),
296            "array" => read_json_array_columns(path),
297            other => Err(JsonReadError {
298                rule: "json_parse_error".to_string(),
299                message: format!(
300                    "unsupported source.options.json_mode={} (allowed: array, ndjson)",
301                    other
302                ),
303            }),
304        };
305        read_result.map_err(|err| FileReadError {
306            rule: err.rule,
307            message: err.message,
308        })
309    }
310
311    fn read_inputs(
312        &self,
313        entity: &config::EntityConfig,
314        files: &[InputFile],
315        columns: &[config::ColumnConfig],
316        normalize_strategy: Option<&str>,
317        collect_raw: bool,
318    ) -> FloeResult<Vec<ReadInput>> {
319        let mut inputs = Vec::with_capacity(files.len());
320        let json_mode = entity
321            .source
322            .options
323            .as_ref()
324            .and_then(|options| options.json_mode.as_deref())
325            .unwrap_or("array");
326        let cast_mode = entity.source.cast_mode.as_deref().unwrap_or("strict");
327        for input_file in files {
328            let path = &input_file.source_local_path;
329            let read_result = match json_mode {
330                "ndjson" => read_ndjson_file(path, columns, cast_mode),
331                "array" => read_json_array_file(path, columns, cast_mode),
332                other => Err(JsonReadError {
333                    rule: "json_parse_error".to_string(),
334                    message: format!(
335                        "unsupported source.options.json_mode={} (allowed: array, ndjson)",
336                        other
337                    ),
338                }),
339            };
340            match read_result {
341                Ok(df) => {
342                    let input = format::read_input_from_df(
343                        input_file,
344                        &df,
345                        columns,
346                        normalize_strategy,
347                        collect_raw,
348                    )?;
349                    inputs.push(input);
350                }
351                Err(err) => {
352                    inputs.push(ReadInput::FileError {
353                        input_file: input_file.clone(),
354                        error: FileReadError {
355                            rule: err.rule,
356                            message: err.message,
357                        },
358                    });
359                }
360            }
361        }
362        Ok(inputs)
363    }
364}