1use std::collections::HashSet;
2use std::io::BufRead;
3use std::path::Path;
4
5use polars::prelude::{DataFrame, NamedFrom, Series};
6use serde_json::Value;
7
8use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
9use crate::io::read::json_selector::{
10 compact_json, evaluate_selector, parse_selector, SelectorToken, SelectorValue,
11};
12use crate::{config, FloeResult};
13
/// Stateless [`InputAdapter`] for JSON sources.
///
/// The layout (`array` or `ndjson`) is chosen per entity via
/// `source.options.json_mode`.
struct JsonInputAdapter;

/// Shared adapter instance handed out by [`json_input_adapter`]; the adapter
/// is a unit struct, so one static instance is enough.
static JSON_INPUT_ADAPTER: JsonInputAdapter = JsonInputAdapter;

/// Returns the JSON input adapter as a `'static` trait object.
pub(crate) fn json_input_adapter() -> &'static dyn InputAdapter {
    &JSON_INPUT_ADAPTER
}
21
/// Error raised while reading a JSON input or extracting columns from it.
///
/// `rule` is a short machine-readable identifier (for example
/// `json_parse_error` or `json_selector_invalid`) and `message` carries the
/// human-readable detail. It renders as `"<rule>: <message>"`.
#[derive(Debug, Clone)]
pub struct JsonReadError {
    /// Identifier of the rule that was violated.
    pub rule: String,
    /// Human-readable description of the failure.
    pub message: String,
}

impl std::fmt::Display for JsonReadError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { rule, message } = self;
        write!(formatter, "{}: {}", rule, message)
    }
}

impl std::error::Error for JsonReadError {}
35
/// A column selector parsed once per file read by `build_selector_plan` and
/// then evaluated against every JSON record of that file.
struct SelectorPlan {
    /// Selector text as configured; also used as the output column name.
    source: String,
    /// Parsed selector tokens, passed to `evaluate_selector` on the slow path.
    tokens: Vec<SelectorToken>,
    /// Set only when the selector is exactly one field token, so evaluation
    /// can use a direct object lookup instead of the generic selector walk.
    top_level_key: Option<String>,
}
41
42fn build_selector_plan(
43 columns: &[config::ColumnConfig],
44) -> Result<Vec<SelectorPlan>, JsonReadError> {
45 let mut plans = Vec::with_capacity(columns.len());
46 let mut seen = HashSet::new();
47 for column in columns {
48 let source = column.source_or_name().to_string();
49 if !seen.insert(source.clone()) {
50 return Err(JsonReadError {
51 rule: "json_selector_invalid".to_string(),
52 message: format!("duplicate json selector source: {}", source),
53 });
54 }
55 let tokens = parse_selector(&source).map_err(|err| JsonReadError {
56 rule: "json_selector_invalid".to_string(),
57 message: format!("invalid selector {}: {}", source, err.message),
58 })?;
59 let top_level_key = match tokens.as_slice() {
60 [SelectorToken::Field(name)] => Some(name.clone()),
61 _ => None,
62 };
63 plans.push(SelectorPlan {
64 source,
65 tokens,
66 top_level_key,
67 });
68 }
69 Ok(plans)
70}
71
72fn evaluate_selector_plan(
73 value: &Value,
74 plan: &SelectorPlan,
75) -> Result<SelectorValue, JsonReadError> {
76 if let Some(key) = plan.top_level_key.as_deref() {
77 let Some(object) = value.as_object() else {
78 return Ok(SelectorValue::Null);
79 };
80 let Some(current) = object.get(key) else {
81 return Ok(SelectorValue::Null);
82 };
83 if current.is_null() {
84 return Ok(SelectorValue::Null);
85 }
86 if current.is_object() || current.is_array() {
87 return Ok(SelectorValue::NonScalar(current.clone()));
88 }
89 return match current {
90 Value::String(value) => Ok(SelectorValue::Scalar(value.clone())),
91 Value::Bool(value) => Ok(SelectorValue::Scalar(value.to_string())),
92 Value::Number(value) => Ok(SelectorValue::Scalar(value.to_string())),
93 Value::Null => Ok(SelectorValue::Null),
94 Value::Object(_) | Value::Array(_) => Ok(SelectorValue::NonScalar(current.clone())),
95 };
96 }
97
98 evaluate_selector(value, &plan.tokens).map_err(|err| JsonReadError {
99 rule: "json_selector_invalid".to_string(),
100 message: format!("invalid selector {}: {}", plan.source, err.message),
101 })
102}
103
104fn append_extracted_row_into(
105 value: &Value,
106 plans: &[SelectorPlan],
107 cast_mode: &str,
108 location_kind: &'static str,
109 location_index: usize,
110 output_columns: &mut [Vec<Option<String>>],
111) -> Result<(), JsonReadError> {
112 for (output, plan) in output_columns.iter_mut().zip(plans) {
113 let selected = evaluate_selector_plan(value, plan)?;
114 let cell = match selected {
115 SelectorValue::Null => None,
116 SelectorValue::Scalar(value) => Some(value),
117 SelectorValue::NonScalar(value) => {
118 let location = format!("{location_kind} {location_index}");
119 if cast_mode == "coerce" {
120 Some(compact_json(&value).map_err(|err| JsonReadError {
121 rule: "json_selector_non_scalar".to_string(),
122 message: format!(
123 "failed to stringify selector {} at {}: {}",
124 plan.source, location, err.message
125 ),
126 })?)
127 } else {
128 return Err(JsonReadError {
129 rule: "json_selector_non_scalar".to_string(),
130 message: format!(
131 "non-scalar value for selector {} at {}",
132 plan.source, location
133 ),
134 });
135 }
136 }
137 };
138 output.push(cell);
139 }
140 Ok(())
141}
142
143fn read_ndjson_file(
144 input_path: &Path,
145 columns: &[config::ColumnConfig],
146 cast_mode: &str,
147) -> Result<DataFrame, JsonReadError> {
148 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
149 rule: "json_parse_error".to_string(),
150 message: format!("failed to read json at {}: {err}", input_path.display()),
151 })?;
152
153 let plans = build_selector_plan(columns)?;
154 let mut column_values: Vec<Vec<Option<String>>> = vec![Vec::new(); plans.len()];
155 for (idx, line) in content.lines().enumerate() {
156 let line = line.trim();
157 if line.is_empty() {
158 continue;
159 }
160 let value: Value = serde_json::from_str(line).map_err(|err| JsonReadError {
161 rule: "json_parse_error".to_string(),
162 message: format!("json parse error at line {}: {err}", idx + 1),
163 })?;
164 if !value.is_object() {
165 return Err(JsonReadError {
166 rule: "json_parse_error".to_string(),
167 message: format!("expected json object at line {}", idx + 1),
168 });
169 }
170
171 append_extracted_row_into(
172 &value,
173 &plans,
174 cast_mode,
175 "line",
176 idx + 1,
177 &mut column_values,
178 )?;
179 }
180
181 build_dataframe(&plans, column_values)
182}
183
184fn read_ndjson_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
185 let file = std::fs::File::open(input_path).map_err(|err| JsonReadError {
186 rule: "json_parse_error".to_string(),
187 message: format!("failed to read json at {}: {err}", input_path.display()),
188 })?;
189 let reader = std::io::BufReader::new(file);
190 let mut first_error: Option<JsonReadError> = None;
191
192 for (idx, line) in reader.lines().enumerate() {
193 let line = line.map_err(|err| JsonReadError {
194 rule: "json_parse_error".to_string(),
195 message: format!("failed to read json at {}: {err}", input_path.display()),
196 })?;
197 let line = line.trim();
198 if line.is_empty() {
199 continue;
200 }
201 match serde_json::from_str::<Value>(line) {
202 Ok(value) => {
203 let object = value.as_object().ok_or_else(|| JsonReadError {
204 rule: "json_parse_error".to_string(),
205 message: format!("expected json object at line {}", idx + 1),
206 })?;
207 let mut keys = object.keys().cloned().collect::<Vec<_>>();
208 keys.sort();
209 return Ok(keys);
210 }
211 Err(err) => {
212 if first_error.is_none() {
213 first_error = Some(JsonReadError {
214 rule: "json_parse_error".to_string(),
215 message: format!("json parse error at line {}: {err}", idx + 1),
216 });
217 }
218 continue;
219 }
220 }
221 }
222
223 Err(first_error.unwrap_or_else(|| JsonReadError {
224 rule: "json_parse_error".to_string(),
225 message: format!("no json objects found in {}", input_path.display()),
226 }))
227}
228
229fn read_json_array_file(
230 input_path: &Path,
231 columns: &[config::ColumnConfig],
232 cast_mode: &str,
233) -> Result<DataFrame, JsonReadError> {
234 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
235 rule: "json_parse_error".to_string(),
236 message: format!("failed to read json at {}: {err}", input_path.display()),
237 })?;
238
239 let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
240 rule: "json_parse_error".to_string(),
241 message: format!("json parse error: {err}"),
242 })?;
243 let array = value.as_array().ok_or_else(|| JsonReadError {
244 rule: "json_parse_error".to_string(),
245 message: "expected json array at root".to_string(),
246 })?;
247
248 let plans = build_selector_plan(columns)?;
249 let mut column_values: Vec<Vec<Option<String>>> = (0..plans.len())
250 .map(|_| Vec::with_capacity(array.len()))
251 .collect();
252
253 for (idx, value) in array.iter().enumerate() {
254 if !value.is_object() {
255 return Err(JsonReadError {
256 rule: "json_parse_error".to_string(),
257 message: format!("expected json object at index {}", idx),
258 });
259 }
260 append_extracted_row_into(value, &plans, cast_mode, "index", idx, &mut column_values)?;
261 }
262
263 build_dataframe(&plans, column_values)
264}
265
266fn read_json_array_columns(input_path: &Path) -> Result<Vec<String>, JsonReadError> {
267 let content = std::fs::read_to_string(input_path).map_err(|err| JsonReadError {
268 rule: "json_parse_error".to_string(),
269 message: format!("failed to read json at {}: {err}", input_path.display()),
270 })?;
271 let value: Value = serde_json::from_str(&content).map_err(|err| JsonReadError {
272 rule: "json_parse_error".to_string(),
273 message: format!("json parse error: {err}"),
274 })?;
275 let array = value.as_array().ok_or_else(|| JsonReadError {
276 rule: "json_parse_error".to_string(),
277 message: "expected json array at root".to_string(),
278 })?;
279 if array.is_empty() {
280 return Err(JsonReadError {
281 rule: "json_parse_error".to_string(),
282 message: "json array is empty".to_string(),
283 });
284 }
285 let object = array[0].as_object().ok_or_else(|| JsonReadError {
286 rule: "json_parse_error".to_string(),
287 message: "expected json object at index 0".to_string(),
288 })?;
289 let mut keys = object.keys().cloned().collect::<Vec<_>>();
290 keys.sort();
291 Ok(keys)
292}
293
294fn build_dataframe(
295 plans: &[SelectorPlan],
296 column_values: Vec<Vec<Option<String>>>,
297) -> Result<DataFrame, JsonReadError> {
298 let mut series = Vec::with_capacity(plans.len());
299 for (plan, values) in plans.iter().zip(column_values.into_iter()) {
300 series.push(Series::new(plan.source.as_str().into(), values).into());
301 }
302
303 DataFrame::new(series).map_err(|err| JsonReadError {
304 rule: "json_parse_error".to_string(),
305 message: format!("failed to build dataframe: {err}"),
306 })
307}
308
309impl InputAdapter for JsonInputAdapter {
310 fn format(&self) -> &'static str {
311 "json"
312 }
313
314 fn read_input_columns(
315 &self,
316 entity: &config::EntityConfig,
317 input_file: &InputFile,
318 _columns: &[config::ColumnConfig],
319 ) -> Result<Vec<String>, FileReadError> {
320 let json_mode = entity
321 .source
322 .options
323 .as_ref()
324 .and_then(|options| options.json_mode.as_deref())
325 .unwrap_or("array");
326 let path = &input_file.source_local_path;
327 let read_result = match json_mode {
328 "ndjson" => read_ndjson_columns(path),
329 "array" => read_json_array_columns(path),
330 other => Err(JsonReadError {
331 rule: "json_parse_error".to_string(),
332 message: format!(
333 "unsupported source.options.json_mode={} (allowed: array, ndjson)",
334 other
335 ),
336 }),
337 };
338 read_result.map_err(|err| FileReadError {
339 rule: err.rule,
340 message: err.message,
341 })
342 }
343
344 fn read_inputs(
345 &self,
346 entity: &config::EntityConfig,
347 files: &[InputFile],
348 columns: &[config::ColumnConfig],
349 normalize_strategy: Option<&str>,
350 collect_raw: bool,
351 ) -> FloeResult<Vec<ReadInput>> {
352 let mut inputs = Vec::with_capacity(files.len());
353 let json_mode = entity
354 .source
355 .options
356 .as_ref()
357 .and_then(|options| options.json_mode.as_deref())
358 .unwrap_or("array");
359 let cast_mode = entity.source.cast_mode.as_deref().unwrap_or("strict");
360 for input_file in files {
361 let path = &input_file.source_local_path;
362 let read_result = match json_mode {
363 "ndjson" => read_ndjson_file(path, columns, cast_mode),
364 "array" => read_json_array_file(path, columns, cast_mode),
365 other => Err(JsonReadError {
366 rule: "json_parse_error".to_string(),
367 message: format!(
368 "unsupported source.options.json_mode={} (allowed: array, ndjson)",
369 other
370 ),
371 }),
372 };
373 match read_result {
374 Ok(df) => {
375 let input = format::read_input_from_df(
376 input_file,
377 &df,
378 columns,
379 normalize_strategy,
380 collect_raw,
381 )?;
382 inputs.push(input);
383 }
384 Err(err) => {
385 inputs.push(ReadInput::FileError {
386 input_file: input_file.clone(),
387 error: FileReadError {
388 rule: err.rule,
389 message: err.message,
390 },
391 });
392 }
393 }
394 }
395 Ok(inputs)
396 }
397}