Skip to main content

floe_core/io/read/
json.rs

1use std::collections::HashSet;
2use std::io::BufRead;
3use std::path::Path;
4
5use polars::prelude::{DataFrame, NamedFrom, Series};
6use serde_json::Value;
7
8use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
9use crate::io::read::json_selector::{
10    compact_json, evaluate_selector, parse_selector, SelectorToken, SelectorValue,
11};
12use crate::{config, FloeResult};
13
14struct JsonInputAdapter;
15
16static JSON_INPUT_ADAPTER: JsonInputAdapter = JsonInputAdapter;
17
18pub(crate) fn json_input_adapter() -> &'static dyn InputAdapter {
19    &JSON_INPUT_ADAPTER
20}
21
/// Error produced while reading JSON input or extracting selector values from it.
///
/// `rule` is a machine-readable category such as `json_parse_error` or
/// `json_selector_invalid`; `message` carries the human-readable detail.
#[derive(Debug, Clone)]
pub struct JsonReadError {
    pub rule: String,
    pub message: String,
}

impl std::fmt::Display for JsonReadError {
    /// Renders the error as `<rule>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        write!(f, "{rule}: {message}")
    }
}

impl std::error::Error for JsonReadError {}
35
36struct SelectorPlan {
37    source: String,
38    tokens: Vec<SelectorToken>,
39    top_level_key: Option<String>,
40}
41
42fn build_selector_plan(
43    columns: &[config::ColumnConfig],
44) -> Result<Vec<SelectorPlan>, JsonReadError> {
45    let mut plans = Vec::with_capacity(columns.len());
46    let mut seen = HashSet::new();
47    for column in columns {
48        let source = column.source_or_name().to_string();
49        if !seen.insert(source.clone()) {
50            return Err(JsonReadError {
51                rule: "json_selector_invalid".to_string(),
52                message: format!("duplicate json selector source: {}", source),
53            });
54        }
55        let tokens = parse_selector(&source).map_err(|err| JsonReadError {
56            rule: "json_selector_invalid".to_string(),
57            message: format!("invalid selector {}: {}", source, err.message),
58        })?;
59        let top_level_key = match tokens.as_slice() {
60            [SelectorToken::Field(name)] => Some(name.clone()),
61            _ => None,
62        };
63        plans.push(SelectorPlan {
64            source,
65            tokens,
66            top_level_key,
67        });
68    }
69    Ok(plans)
70}
71
72fn evaluate_selector_plan(
73    value: &Value,
74    plan: &SelectorPlan,
75) -> Result<SelectorValue, JsonReadError> {
76    if let Some(key) = plan.top_level_key.as_deref() {
77        let Some(object) = value.as_object() else {
78            return Ok(SelectorValue::Null);
79        };
80        let Some(current) = object.get(key) else {
81            return Ok(SelectorValue::Null);
82        };
83        if current.is_null() {
84            return Ok(SelectorValue::Null);
85        }
86        if current.is_object() || current.is_array() {
87            return Ok(SelectorValue::NonScalar(current.clone()));
88        }
89        return match current {
90            Value::String(value) => Ok(SelectorValue::Scalar(value.clone())),
91            Value::Bool(value) => Ok(SelectorValue::Scalar(value.to_string())),
92            Value::Number(value) => Ok(SelectorValue::Scalar(value.to_string())),
93            Value::Null => Ok(SelectorValue::Null),
94            Value::Object(_) | Value::Array(_) => Ok(SelectorValue::NonScalar(current.clone())),
95        };
96    }
97
98    evaluate_selector(value, &plan.tokens).map_err(|err| JsonReadError {
99        rule: "json_selector_invalid".to_string(),
100        message: format!("invalid selector {}: {}", plan.source, err.message),
101    })
102}
103
104fn append_extracted_row_into(
105    value: &Value,
106    plans: &[SelectorPlan],
107    cast_mode: &str,
108    location_kind: &'static str,
109    location_index: usize,
110    output_columns: &mut [Vec<Option<String>>],
111) -> Result<(), JsonReadError> {
112    for (output, plan) in output_columns.iter_mut().zip(plans) {
113        let selected = evaluate_selector_plan(value, plan)?;
114        let cell = match selected {
115            SelectorValue::Null => None,
116            SelectorValue::Scalar(value) => Some(value),
117            SelectorValue::NonScalar(value) => {
118                let location = format!("{location_kind} {location_index}");
119                if cast_mode == "coerce" {
120                    Some(compact_json(&value).map_err(|err| JsonReadError {
121                        rule: "json_selector_non_scalar".to_string(),
122                        message: format!(
123                            "failed to stringify selector {} at {}: {}",
124                            plan.source, location, err.message
125                        ),
126                    })?)
127                } else {
128                    return Err(JsonReadError {
129                        rule: "json_selector_non_scalar".to_string(),
130                        message: format!(
131                            "non-scalar value for selector {} at {}",
132                            plan.source, location
133                        ),
134                    });
135                }
136            }
137        };
138        output.push(cell);
139    }
140    Ok(())
141}
142
143fn read_ndjson_file(
144    input_path: &Path,
145    columns: &[config::ColumnConfig],
146    cast_mode: &str,
147) -> Result<DataFrame, JsonReadError> {
148    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
149        rule: "json_parse_error".to_string(),
150        message: format!("failed to read json at {}: {err}", input_path.display()),
151    })?;
152
153    let plans = build_selector_plan(columns)?;
154    let mut column_values: Vec<Vec<Option<String>>> = vec![Vec::new(); plans.len()];
155    for (idx, line) in content.lines().enumerate() {
156        let line = line.trim();
157        if line.is_empty() {
158            continue;
159        }
160        let value: Value = serde_json::from_str(line).map_err(|err| JsonReadError {
161            rule: "json_parse_error".to_string(),
162            message: format!("json parse error at line {}: {err}", idx + 1),
163        })?;
164        if !value.is_object() {
165            return Err(JsonReadError {
166                rule: "json_parse_error".to_string(),
167                message: format!("expected json object at line {}", idx + 1),
168            });
169        }
170
171        append_extracted_row_into(
172            &value,
173            &plans,
174            cast_mode,
175            "line",
176            idx + 1,
177            &mut column_values,
178        )?;
179    }
180
181    build_dataframe(&plans, column_values)
182}
183
184fn read_ndjson_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
185    let file = std::fs::File::open(input_path).map_err(|err| JsonReadError {
186        rule: "json_parse_error".to_string(),
187        message: format!("failed to read json at {}: {err}", input_path.display()),
188    })?;
189    let reader = std::io::BufReader::new(file);
190    let mut first_error: Option<JsonReadError> = None;
191
192    for (idx, line) in reader.lines().enumerate() {
193        let line = line.map_err(|err| JsonReadError {
194            rule: "json_parse_error".to_string(),
195            message: format!("failed to read json at {}: {err}", input_path.display()),
196        })?;
197        let line = line.trim();
198        if line.is_empty() {
199            continue;
200        }
201        match serde_json::from_str::<Value>(line) {
202            Ok(value) => {
203                let object = value.as_object().ok_or_else(|| JsonReadError {
204                    rule: "json_parse_error".to_string(),
205                    message: format!("expected json object at line {}", idx + 1),
206                })?;
207                let mut keys = object.keys().cloned().collect::<Vec<_>>();
208                keys.sort();
209                return Ok(keys);
210            }
211            Err(err) => {
212                if first_error.is_none() {
213                    first_error = Some(JsonReadError {
214                        rule: "json_parse_error".to_string(),
215                        message: format!("json parse error at line {}: {err}", idx + 1),
216                    });
217                }
218                continue;
219            }
220        }
221    }
222
223    Err(first_error.unwrap_or_else(|| JsonReadError {
224        rule: "json_parse_error".to_string(),
225        message: format!("no json objects found in {}", input_path.display()),
226    }))
227}
228
229fn read_json_array_file(
230    input_path: &Path,
231    columns: &[config::ColumnConfig],
232    cast_mode: &str,
233) -> Result<DataFrame, JsonReadError> {
234    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
235        rule: "json_parse_error".to_string(),
236        message: format!("failed to read json at {}: {err}", input_path.display()),
237    })?;
238
239    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
240        rule: "json_parse_error".to_string(),
241        message: format!("json parse error: {err}"),
242    })?;
243    let array = value.as_array().ok_or_else(|| JsonReadError {
244        rule: "json_parse_error".to_string(),
245        message: "expected json array at root".to_string(),
246    })?;
247
248    let plans = build_selector_plan(columns)?;
249    let mut column_values: Vec<Vec<Option<String>>> = (0..plans.len())
250        .map(|_| Vec::with_capacity(array.len()))
251        .collect();
252
253    for (idx, value) in array.iter().enumerate() {
254        if !value.is_object() {
255            return Err(JsonReadError {
256                rule: "json_parse_error".to_string(),
257                message: format!("expected json object at index {}", idx),
258            });
259        }
260        append_extracted_row_into(value, &plans, cast_mode, "index", idx, &mut column_values)?;
261    }
262
263    build_dataframe(&plans, column_values)
264}
265
266fn read_json_array_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
267    let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
268        rule: "json_parse_error".to_string(),
269        message: format!("failed to read json at {}: {err}", input_path.display()),
270    })?;
271    let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
272        rule: "json_parse_error".to_string(),
273        message: format!("json parse error: {err}"),
274    })?;
275    let array = value.as_array().ok_or_else(|| JsonReadError {
276        rule: "json_parse_error".to_string(),
277        message: "expected json array at root".to_string(),
278    })?;
279    if array.is_empty() {
280        return Err(JsonReadError {
281            rule: "json_parse_error".to_string(),
282            message: "json array is empty".to_string(),
283        });
284    }
285    let object = array[0].as_object().ok_or_else(|| JsonReadError {
286        rule: "json_parse_error".to_string(),
287        message: "expected json object at index 0".to_string(),
288    })?;
289    let mut keys = object.keys().cloned().collect::<Vec<_>>();
290    keys.sort();
291    Ok(keys)
292}
293
294fn build_dataframe(
295    plans: &[SelectorPlan],
296    column_values: Vec<Vec<Option<String>>>,
297) -> Result<DataFrame, JsonReadError> {
298    let mut series = Vec::with_capacity(plans.len());
299    for (plan, values) in plans.iter().zip(column_values.into_iter()) {
300        series.push(Series::new(plan.source.as_str().into(), values).into());
301    }
302
303    DataFrame::new(series).map_err(|err| JsonReadError {
304        rule: "json_parse_error".to_string(),
305        message: format!("failed to build dataframe: {err}"),
306    })
307}
308
309impl InputAdapter for JsonInputAdapter {
310    fn format(&self) -> &'static str {
311        "json"
312    }
313
314    fn read_input_columns(
315        &self,
316        entity: &config::EntityConfig,
317        input_file: &InputFile,
318        _columns: &[config::ColumnConfig],
319    ) -> Result<Vec<String>, FileReadError> {
320        let json_mode = entity
321            .source
322            .options
323            .as_ref()
324            .and_then(|options| options.json_mode.as_deref())
325            .unwrap_or("array");
326        let path = &input_file.source_local_path;
327        let read_result = match json_mode {
328            "ndjson" => read_ndjson_columns(path),
329            "array" => read_json_array_columns(path),
330            other => Err(JsonReadError {
331                rule: "json_parse_error".to_string(),
332                message: format!(
333                    "unsupported source.options.json_mode={} (allowed: array, ndjson)",
334                    other
335                ),
336            }),
337        };
338        read_result.map_err(|err| FileReadError {
339            rule: err.rule,
340            message: err.message,
341        })
342    }
343
344    fn read_inputs(
345        &self,
346        entity: &config::EntityConfig,
347        files: &[InputFile],
348        columns: &[config::ColumnConfig],
349        normalize_strategy: Option<&str>,
350        collect_raw: bool,
351    ) -> FloeResult<Vec<ReadInput>> {
352        let mut inputs = Vec::with_capacity(files.len());
353        let json_mode = entity
354            .source
355            .options
356            .as_ref()
357            .and_then(|options| options.json_mode.as_deref())
358            .unwrap_or("array");
359        let cast_mode = entity.source.cast_mode.as_deref().unwrap_or("strict");
360        for input_file in files {
361            let path = &input_file.source_local_path;
362            let read_result = match json_mode {
363                "ndjson" => read_ndjson_file(path, columns, cast_mode),
364                "array" => read_json_array_file(path, columns, cast_mode),
365                other => Err(JsonReadError {
366                    rule: "json_parse_error".to_string(),
367                    message: format!(
368                        "unsupported source.options.json_mode={} (allowed: array, ndjson)",
369                        other
370                    ),
371                }),
372            };
373            match read_result {
374                Ok(df) => {
375                    let input = format::read_input_from_df(
376                        input_file,
377                        &df,
378                        columns,
379                        normalize_strategy,
380                        collect_raw,
381                    )?;
382                    inputs.push(input);
383                }
384                Err(err) => {
385                    inputs.push(ReadInput::FileError {
386                        input_file: input_file.clone(),
387                        error: FileReadError {
388                            rule: err.rule,
389                            message: err.message,
390                        },
391                    });
392                }
393            }
394        }
395        Ok(inputs)
396    }
397}