1use std::collections::{BTreeMap, HashSet};
2use std::io::BufRead;
3use std::path::Path;
4
5use polars::prelude::{DataFrame, NamedFrom, Series};
6use serde_json::Value;
7
8use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
9use crate::io::read::json_selector::{
10 compact_json, evaluate_selector, parse_selector, SelectorToken, SelectorValue,
11};
12use crate::{config, FloeResult};
13
/// Stateless adapter that reads JSON inputs (`array` or `ndjson` mode) into
/// string-typed dataframes; registered via [`json_input_adapter`].
struct JsonInputAdapter;
15
/// Process-wide singleton instance handed out by [`json_input_adapter`].
static JSON_INPUT_ADAPTER: JsonInputAdapter = JsonInputAdapter;
17
/// Returns the shared JSON [`InputAdapter`] implementation.
pub(crate) fn json_input_adapter() -> &'static dyn InputAdapter {
    &JSON_INPUT_ADAPTER
}
21
/// Error produced while reading or selecting from a JSON input file.
#[derive(Debug, Clone)]
pub struct JsonReadError {
    /// Machine-readable rule identifier (e.g. `json_parse_error`,
    /// `json_selector_invalid`, `json_selector_non_scalar`).
    pub rule: String,
    /// Human-readable detail, including file/line/index context where known.
    pub message: String,
}
27
28impl std::fmt::Display for JsonReadError {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 write!(f, "{}: {}", self.rule, self.message)
31 }
32}
33
// Marker impl: JsonReadError wraps no underlying source error, so the
// default `std::error::Error` methods suffice.
impl std::error::Error for JsonReadError {}
35
/// A pre-parsed column selector: the raw `source` string (also used as the
/// output column name / row key) plus its parsed token sequence.
struct SelectorPlan {
    /// Raw selector string taken from the column config.
    source: String,
    /// Tokens from `parse_selector`, evaluated against each JSON value.
    tokens: Vec<SelectorToken>,
}
40
41fn build_selector_plan(
42 columns: &[config::ColumnConfig],
43) -> Result<Vec<SelectorPlan>, JsonReadError> {
44 let mut plans = Vec::with_capacity(columns.len());
45 let mut seen = HashSet::new();
46 for column in columns {
47 let source = column.source_or_name().to_string();
48 if !seen.insert(source.clone()) {
49 return Err(JsonReadError {
50 rule: "json_selector_invalid".to_string(),
51 message: format!("duplicate json selector source: {}", source),
52 });
53 }
54 let tokens = parse_selector(&source).map_err(|err| JsonReadError {
55 rule: "json_selector_invalid".to_string(),
56 message: format!("invalid selector {}: {}", source, err.message),
57 })?;
58 plans.push(SelectorPlan { source, tokens });
59 }
60 Ok(plans)
61}
62
63fn extract_row(
64 value: &Value,
65 plans: &[SelectorPlan],
66 cast_mode: &str,
67 location: &str,
68) -> Result<BTreeMap<String, Option<String>>, JsonReadError> {
69 let mut row = BTreeMap::new();
70 for plan in plans {
71 let selected = evaluate_selector(value, &plan.tokens).map_err(|err| JsonReadError {
72 rule: "json_selector_invalid".to_string(),
73 message: format!("invalid selector {}: {}", plan.source, err.message),
74 })?;
75 let cell = match selected {
76 SelectorValue::Null => None,
77 SelectorValue::Scalar(value) => Some(value),
78 SelectorValue::NonScalar(value) => {
79 if cast_mode == "coerce" {
80 Some(compact_json(&value).map_err(|err| JsonReadError {
81 rule: "json_selector_non_scalar".to_string(),
82 message: format!(
83 "failed to stringify selector {} at {}: {}",
84 plan.source, location, err.message
85 ),
86 })?)
87 } else {
88 return Err(JsonReadError {
89 rule: "json_selector_non_scalar".to_string(),
90 message: format!(
91 "non-scalar value for selector {} at {}",
92 plan.source, location
93 ),
94 });
95 }
96 }
97 };
98 row.insert(plan.source.clone(), cell);
99 }
100 Ok(row)
101}
102
103fn read_ndjson_file(
104 input_path: &Path,
105 columns: &[config::ColumnConfig],
106 cast_mode: &str,
107) -> Result<DataFrame, JsonReadError> {
108 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
109 rule: "json_parse_error".to_string(),
110 message: format!("failed to read json at {}: {err}", input_path.display()),
111 })?;
112
113 let plans = build_selector_plan(columns)?;
114 let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::new();
115 for (idx, line) in content.lines().enumerate() {
116 let line = line.trim();
117 if line.is_empty() {
118 continue;
119 }
120 let value: Value = serde_json::from_str(line).map_err(|err| JsonReadError {
121 rule: "json_parse_error".to_string(),
122 message: format!("json parse error at line {}: {err}", idx + 1),
123 })?;
124 if !value.is_object() {
125 return Err(JsonReadError {
126 rule: "json_parse_error".to_string(),
127 message: format!("expected json object at line {}", idx + 1),
128 });
129 }
130
131 let row = extract_row(&value, &plans, cast_mode, &format!("line {}", idx + 1))?;
132 rows.push(row);
133 }
134
135 let columns = plans
136 .iter()
137 .map(|plan| plan.source.clone())
138 .collect::<Vec<_>>();
139
140 build_dataframe(&columns, &rows)
141}
142
143fn read_ndjson_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
144 let file = std::fs::File::open(input_path).map_err(|err| JsonReadError {
145 rule: "json_parse_error".to_string(),
146 message: format!("failed to read json at {}: {err}", input_path.display()),
147 })?;
148 let reader = std::io::BufReader::new(file);
149 let mut first_error: Option<JsonReadError> = None;
150
151 for (idx, line) in reader.lines().enumerate() {
152 let line = line.map_err(|err| JsonReadError {
153 rule: "json_parse_error".to_string(),
154 message: format!("failed to read json at {}: {err}", input_path.display()),
155 })?;
156 let line = line.trim();
157 if line.is_empty() {
158 continue;
159 }
160 match serde_json::from_str::<Value>(line) {
161 Ok(value) => {
162 let object = value.as_object().ok_or_else(|| JsonReadError {
163 rule: "json_parse_error".to_string(),
164 message: format!("expected json object at line {}", idx + 1),
165 })?;
166 let mut keys = object.keys().cloned().collect::<Vec<_>>();
167 keys.sort();
168 return Ok(keys);
169 }
170 Err(err) => {
171 if first_error.is_none() {
172 first_error = Some(JsonReadError {
173 rule: "json_parse_error".to_string(),
174 message: format!("json parse error at line {}: {err}", idx + 1),
175 });
176 }
177 continue;
178 }
179 }
180 }
181
182 Err(first_error.unwrap_or_else(|| JsonReadError {
183 rule: "json_parse_error".to_string(),
184 message: format!("no json objects found in {}", input_path.display()),
185 }))
186}
187
188fn read_json_array_file(
189 input_path: &Path,
190 columns: &[config::ColumnConfig],
191 cast_mode: &str,
192) -> Result<DataFrame, JsonReadError> {
193 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
194 rule: "json_parse_error".to_string(),
195 message: format!("failed to read json at {}: {err}", input_path.display()),
196 })?;
197
198 let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
199 rule: "json_parse_error".to_string(),
200 message: format!("json parse error: {err}"),
201 })?;
202 let array = value.as_array().ok_or_else(|| JsonReadError {
203 rule: "json_parse_error".to_string(),
204 message: "expected json array at root".to_string(),
205 })?;
206
207 let plans = build_selector_plan(columns)?;
208 let mut rows: Vec<BTreeMap<String, Option<String>>> = Vec::with_capacity(array.len());
209
210 for (idx, value) in array.iter().enumerate() {
211 if !value.is_object() {
212 return Err(JsonReadError {
213 rule: "json_parse_error".to_string(),
214 message: format!("expected json object at index {}", idx),
215 });
216 }
217 let row = extract_row(value, &plans, cast_mode, &format!("index {}", idx))?;
218 rows.push(row);
219 }
220
221 let columns = plans
222 .iter()
223 .map(|plan| plan.source.clone())
224 .collect::<Vec<_>>();
225
226 build_dataframe(&columns, &rows)
227}
228
229fn read_json_array_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
230 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
231 rule: "json_parse_error".to_string(),
232 message: format!("failed to read json at {}: {err}", input_path.display()),
233 })?;
234 let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
235 rule: "json_parse_error".to_string(),
236 message: format!("json parse error: {err}"),
237 })?;
238 let array = value.as_array().ok_or_else(|| JsonReadError {
239 rule: "json_parse_error".to_string(),
240 message: "expected json array at root".to_string(),
241 })?;
242 if array.is_empty() {
243 return Err(JsonReadError {
244 rule: "json_parse_error".to_string(),
245 message: "json array is empty".to_string(),
246 });
247 }
248 let object = array[0].as_object().ok_or_else(|| JsonReadError {
249 rule: "json_parse_error".to_string(),
250 message: "expected json object at index 0".to_string(),
251 })?;
252 let mut keys = object.keys().cloned().collect::<Vec<_>>();
253 keys.sort();
254 Ok(keys)
255}
256
257fn build_dataframe(
258 columns: &[String],
259 rows: &[BTreeMap<String, Option<String>>],
260) -> Result<DataFrame, JsonReadError> {
261 let mut series = Vec::with_capacity(columns.len());
262 for name in columns {
263 let mut values = Vec::with_capacity(rows.len());
264 for row in rows {
265 values.push(row.get(name).cloned().unwrap_or(None));
266 }
267 series.push(Series::new(name.as_str().into(), values).into());
268 }
269
270 DataFrame::new(series).map_err(|err| JsonReadError {
271 rule: "json_parse_error".to_string(),
272 message: format!("failed to build dataframe: {err}"),
273 })
274}
275
276impl InputAdapter for JsonInputAdapter {
277 fn format(&self) -> &'static str {
278 "json"
279 }
280
281 fn read_input_columns(
282 &self,
283 entity: &config::EntityConfig,
284 input_file: &InputFile,
285 _columns: &[config::ColumnConfig],
286 ) -> Result<Vec<String>, FileReadError> {
287 let json_mode = entity
288 .source
289 .options
290 .as_ref()
291 .and_then(|options| options.json_mode.as_deref())
292 .unwrap_or("array");
293 let path = &input_file.source_local_path;
294 let read_result = match json_mode {
295 "ndjson" => read_ndjson_columns(path),
296 "array" => read_json_array_columns(path),
297 other => Err(JsonReadError {
298 rule: "json_parse_error".to_string(),
299 message: format!(
300 "unsupported source.options.json_mode={} (allowed: array, ndjson)",
301 other
302 ),
303 }),
304 };
305 read_result.map_err(|err| FileReadError {
306 rule: err.rule,
307 message: err.message,
308 })
309 }
310
311 fn read_inputs(
312 &self,
313 entity: &config::EntityConfig,
314 files: &[InputFile],
315 columns: &[config::ColumnConfig],
316 normalize_strategy: Option<&str>,
317 collect_raw: bool,
318 ) -> FloeResult<Vec<ReadInput>> {
319 let mut inputs = Vec::with_capacity(files.len());
320 let json_mode = entity
321 .source
322 .options
323 .as_ref()
324 .and_then(|options| options.json_mode.as_deref())
325 .unwrap_or("array");
326 let cast_mode = entity.source.cast_mode.as_deref().unwrap_or("strict");
327 for input_file in files {
328 let path = &input_file.source_local_path;
329 let read_result = match json_mode {
330 "ndjson" => read_ndjson_file(path, columns, cast_mode),
331 "array" => read_json_array_file(path, columns, cast_mode),
332 other => Err(JsonReadError {
333 rule: "json_parse_error".to_string(),
334 message: format!(
335 "unsupported source.options.json_mode={} (allowed: array, ndjson)",
336 other
337 ),
338 }),
339 };
340 match read_result {
341 Ok(df) => {
342 let input = format::read_input_from_df(
343 input_file,
344 &df,
345 columns,
346 normalize_strategy,
347 collect_raw,
348 )?;
349 inputs.push(input);
350 }
351 Err(err) => {
352 inputs.push(ReadInput::FileError {
353 input_file: input_file.clone(),
354 error: FileReadError {
355 rule: err.rule,
356 message: err.message,
357 },
358 });
359 }
360 }
361 }
362 Ok(inputs)
363 }
364}